summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/operations/exchanges.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/operations/exchanges.ts')
-rw-r--r--packages/taler-wallet-core/src/operations/exchanges.ts468
1 files changed, 365 insertions, 103 deletions
diff --git a/packages/taler-wallet-core/src/operations/exchanges.ts b/packages/taler-wallet-core/src/operations/exchanges.ts
index b4d45db2c..22be4102a 100644
--- a/packages/taler-wallet-core/src/operations/exchanges.ts
+++ b/packages/taler-wallet-core/src/operations/exchanges.ts
@@ -28,6 +28,8 @@ import {
AgeRestriction,
Amounts,
CancellationToken,
+ CoinRefreshRequest,
+ CoinStatus,
DeleteExchangeRequest,
DenomKeyType,
DenomOperationMap,
@@ -53,6 +55,7 @@ import {
NotificationType,
OperationErrorInfo,
Recoup,
+ RefreshReason,
ScopeInfo,
ScopeType,
TalerError,
@@ -67,8 +70,11 @@ import {
WireFeeMap,
WireFeesJson,
WireInfo,
+ assertUnreachable,
canonicalizeBaseUrl,
codecForExchangeKeysJson,
+ durationFromSpec,
+ durationMul,
encodeCrock,
hashDenomPub,
j2s,
@@ -89,19 +95,22 @@ import {
WalletStoresV1,
} from "../db.js";
import {
+ AsyncFlag,
ExchangeEntryDbRecordStatus,
ExchangeEntryDbUpdateStatus,
PendingTaskType,
WalletDbReadOnlyTransactionArr,
WalletDbReadWriteTransactionArr,
+ createRefreshGroup,
createTimeline,
isWithdrawableDenom,
selectBestForOverlappingDenominations,
selectMinimumFee,
- timestampOptionalAbsoluteFromDb,
+ timestampAbsoluteFromDb,
timestampOptionalPreciseFromDb,
timestampPreciseFromDb,
timestampPreciseToDb,
+ timestampProtocolFromDb,
timestampProtocolToDb,
} from "../index.js";
import { InternalWalletState } from "../internal-wallet-state.js";
@@ -117,11 +126,11 @@ import {
TaskRunResult,
TaskRunResultType,
constructTaskIdentifier,
+ getAutoRefreshExecuteThreshold,
getExchangeEntryStatusFromRecord,
getExchangeState,
getExchangeTosStatusFromRecord,
getExchangeUpdateStatusFromRecord,
- runTaskWithErrorReporting,
} from "./common.js";
const logger = new Logger("exchanges.ts");
@@ -635,11 +644,13 @@ async function downloadExchangeKeysInfo(
baseUrl: string,
http: HttpRequestLibrary,
timeout: Duration,
+ cancellationToken: CancellationToken,
): Promise<ExchangeKeysDownloadResult> {
const keysUrl = new URL("keys", baseUrl);
const resp = await http.fetch(keysUrl.href, {
timeout,
+ cancellationToken,
});
// We must make sure to parse out the protocol version
@@ -828,13 +839,19 @@ async function downloadTosFromAcceptedFormat(
* If the exchange entry doesn't exist,
* a new ephemeral entry is created.
*/
-export async function startUpdateExchangeEntry(
+async function startUpdateExchangeEntry(
ws: InternalWalletState,
exchangeBaseUrl: string,
options: { forceUpdate?: boolean } = {},
): Promise<void> {
const canonBaseUrl = canonicalizeBaseUrl(exchangeBaseUrl);
+ logger.info(
+ `starting update of exchange entry ${canonBaseUrl}, forced=${
+ options.forceUpdate ?? false
+ }`,
+ );
+
const { notification } = await ws.db
.mktx((x) => [x.exchanges, x.exchangeDetails])
.runReadWrite(async (tx) => {
@@ -845,7 +862,7 @@ export async function startUpdateExchangeEntry(
ws.notify(notification);
}
- const { oldExchangeState, newExchangeState } = await ws.db
+ const { oldExchangeState, newExchangeState, taskId } = await ws.db
.mktx((x) => [x.exchanges, x.operationRetries])
.runReadWrite(async (tx) => {
const r = await tx.exchanges.get(canonBaseUrl);
@@ -882,7 +899,7 @@ export async function startUpdateExchangeEntry(
// Reset retries for updating the exchange entry.
const taskId = TaskIdentifiers.forExchangeUpdate(r);
await tx.operationRetries.delete(taskId);
- return { oldExchangeState, newExchangeState };
+ return { oldExchangeState, newExchangeState, taskId };
});
ws.notify({
type: NotificationType.ExchangeStateTransition,
@@ -890,7 +907,7 @@ export async function startUpdateExchangeEntry(
newExchangeState: newExchangeState,
oldExchangeState: oldExchangeState,
});
- ws.workAvailable.trigger();
+ ws.taskScheduler.restartShepherdTask(taskId);
}
/**
@@ -909,6 +926,119 @@ export interface ReadyExchangeSummary {
scopeInfo: ScopeInfo;
}
+async function internalWaitReadyExchange(
+ ws: InternalWalletState,
+ canonUrl: string,
+ exchangeNotifFlag: AsyncFlag,
+ options: {
+ cancellationToken?: CancellationToken;
+ forceUpdate?: boolean;
+ expectedMasterPub?: string;
+ } = {},
+): Promise<ReadyExchangeSummary> {
+ const operationId = constructTaskIdentifier({
+ tag: PendingTaskType.ExchangeUpdate,
+ exchangeBaseUrl: canonUrl,
+ });
+ while (true) {
+ logger.info(`waiting for ready exchange ${canonUrl}`);
+ const { exchange, exchangeDetails, retryInfo, scopeInfo } =
+ await ws.db.runReadOnlyTx(
+ [
+ "exchanges",
+ "exchangeDetails",
+ "operationRetries",
+ "globalCurrencyAuditors",
+ "globalCurrencyExchanges",
+ ],
+ async (tx) => {
+ const exchange = await tx.exchanges.get(canonUrl);
+ const exchangeDetails = await getExchangeRecordsInternal(
+ tx,
+ canonUrl,
+ );
+ const retryInfo = await tx.operationRetries.get(operationId);
+ let scopeInfo: ScopeInfo | undefined = undefined;
+ if (exchange && exchangeDetails) {
+ scopeInfo = await internalGetExchangeScopeInfo(tx, exchangeDetails);
+ }
+ return { exchange, exchangeDetails, retryInfo, scopeInfo };
+ },
+ );
+
+ if (!exchange) {
+ throw Error("exchange entry does not exist anymore");
+ }
+
+ let ready = false;
+
+ switch (exchange.updateStatus) {
+ case ExchangeEntryDbUpdateStatus.Ready:
+ ready = true;
+ break;
+ case ExchangeEntryDbUpdateStatus.ReadyUpdate:
+ // If the update is forced,
+ // we wait until we're in a full "ready" state,
+ // as we're not happy with the stale information.
+ if (!options.forceUpdate) {
+ ready = true;
+ }
+ break;
+ default: {
+ if (retryInfo) {
+ throw TalerError.fromDetail(
+ TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE,
+ {
+ exchangeBaseUrl: canonUrl,
+ innerError: retryInfo?.lastError,
+ },
+ );
+ }
+ }
+ }
+
+ if (!ready) {
+ logger.info("waiting for exchange update notification");
+ await exchangeNotifFlag.wait();
+ logger.info("done waiting for exchange update notification");
+ exchangeNotifFlag.reset();
+ continue;
+ }
+
+ if (!exchangeDetails) {
+ throw Error("invariant failed");
+ }
+
+ if (!scopeInfo) {
+ throw Error("invariant failed");
+ }
+
+ const res: ReadyExchangeSummary = {
+ currency: exchangeDetails.currency,
+ exchangeBaseUrl: canonUrl,
+ masterPub: exchangeDetails.masterPublicKey,
+ tosStatus: getExchangeTosStatusFromRecord(exchange),
+ tosAcceptedEtag: exchange.tosAcceptedEtag,
+ wireInfo: exchangeDetails.wireInfo,
+ protocolVersionRange: exchangeDetails.protocolVersionRange,
+ tosCurrentEtag: exchange.tosCurrentEtag,
+ tosAcceptedTimestamp: timestampOptionalPreciseFromDb(
+ exchange.tosAcceptedTimestamp,
+ ),
+ scopeInfo,
+ };
+
+ if (options.expectedMasterPub) {
+ if (res.masterPub !== options.expectedMasterPub) {
+ throw Error(
+ "public key of the exchange does not match expected public key",
+ );
+ }
+ }
+ return res;
+ }
+}
+
/**
* Ensure that a fresh exchange entry exists for the given
* exchange base URL.
@@ -933,127 +1063,149 @@ export async function fetchFreshExchange(
} = {},
): Promise<ReadyExchangeSummary> {
const canonUrl = canonicalizeBaseUrl(baseUrl);
- const operationId = constructTaskIdentifier({
- tag: PendingTaskType.ExchangeUpdate,
- exchangeBaseUrl: canonUrl,
+
+ ws.ensureTaskLoopRunning();
+
+ await startUpdateExchangeEntry(ws, canonUrl, {
+ forceUpdate: options.forceUpdate,
});
- const oldExchange = await ws.db
- .mktx((x) => [x.exchanges])
- .runReadOnly(async (tx) => {
- return tx.exchanges.get(canonUrl);
- });
+ return waitReadyExchange(ws, canonUrl, options);
+}
- let needsUpdate = false;
+async function waitReadyExchange(
+ ws: InternalWalletState,
+ canonUrl: string,
+ options: {
+ cancellationToken?: CancellationToken;
+ forceUpdate?: boolean;
+ expectedMasterPub?: string;
+ } = {},
+): Promise<ReadyExchangeSummary> {
+ // FIXME: We should use Symbol.dispose magic here for cleanup!
- if (!oldExchange || options.forceUpdate) {
- needsUpdate = true;
- await startUpdateExchangeEntry(ws, canonUrl, {
- forceUpdate: options.forceUpdate,
- });
- } else {
- const nextUpdate = timestampOptionalAbsoluteFromDb(
- oldExchange.nextUpdateStamp,
- );
+ const exchangeNotifFlag = new AsyncFlag();
+ // Raise exchangeNotifFlag whenever we get a notification
+ // about our exchange.
+ const cancelNotif = ws.addNotificationListener((notif) => {
if (
- nextUpdate == null ||
- AbsoluteTime.isExpired(nextUpdate) ||
- oldExchange.updateStatus !== ExchangeEntryDbUpdateStatus.Ready
+ notif.type === NotificationType.ExchangeStateTransition &&
+ notif.exchangeBaseUrl === canonUrl
) {
- needsUpdate = true;
+ logger.info(`raising update notification: ${j2s(notif)}`);
+ exchangeNotifFlag.raise();
}
- }
+ });
- if (needsUpdate) {
- await runTaskWithErrorReporting(ws, operationId, () =>
- updateExchangeFromUrlHandler(ws, canonUrl),
+ try {
+ const res = await internalWaitReadyExchange(
+ ws,
+ canonUrl,
+ exchangeNotifFlag,
+ options,
);
+ logger.info("done waiting for ready exchange");
+ return res;
+ } finally {
+ cancelNotif();
}
+}
- const { exchange, exchangeDetails, retryInfo, scopeInfo } =
- await ws.db.runReadOnlyTx(
- [
- "exchanges",
- "exchangeDetails",
- "operationRetries",
- "globalCurrencyAuditors",
- "globalCurrencyExchanges",
- ],
- async (tx) => {
- const exchange = await tx.exchanges.get(canonUrl);
- const exchangeDetails = await getExchangeRecordsInternal(tx, canonUrl);
- const retryInfo = await tx.operationRetries.get(operationId);
- let scopeInfo: ScopeInfo | undefined = undefined;
- if (exchange && exchangeDetails) {
- scopeInfo = await internalGetExchangeScopeInfo(tx, exchangeDetails);
- }
- return { exchange, exchangeDetails, retryInfo, scopeInfo };
- },
- );
+/**
+ * Update an exchange entry in the wallet's database
+ * by fetching the /keys and /wire information.
+ * Optionally link the reserve entry to the new or existing
+ * exchange entry in then DB.
+ */
+export async function updateExchangeFromUrlHandler(
+ ws: InternalWalletState,
+ exchangeBaseUrl: string,
+ cancellationToken: CancellationToken,
+): Promise<TaskRunResult> {
+ logger.trace(`updating exchange info for ${exchangeBaseUrl}`);
+ exchangeBaseUrl = canonicalizeBaseUrl(exchangeBaseUrl);
- if (!exchange) {
- throw Error("exchange entry does not exist anymore");
+ const oldExchangeRec = await ws.db.runReadOnlyTx(
+ ["exchanges"],
+ async (tx) => {
+ return tx.exchanges.get(exchangeBaseUrl);
+ },
+ );
+
+ if (!oldExchangeRec) {
+ logger.info(`not updating exchange ${exchangeBaseUrl}, no record in DB`);
+ return TaskRunResult.finished();
}
- switch (exchange.updateStatus) {
- case ExchangeEntryDbUpdateStatus.Ready:
+ let updateRequestedExplicitly = false;
+
+ switch (oldExchangeRec.updateStatus) {
+ case ExchangeEntryDbUpdateStatus.Suspended:
+ logger.info(`not updating exchange in status "suspended"`);
+ return TaskRunResult.finished();
+ case ExchangeEntryDbUpdateStatus.Initial:
+ logger.info(`not updating exchange in status "initial"`);
+ return TaskRunResult.finished();
+ case ExchangeEntryDbUpdateStatus.InitialUpdate:
case ExchangeEntryDbUpdateStatus.ReadyUpdate:
+ case ExchangeEntryDbUpdateStatus.UnavailableUpdate:
+ updateRequestedExplicitly = true;
+ break;
+ case ExchangeEntryDbUpdateStatus.Ready:
break;
default:
- throw TalerError.fromDetail(TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE, {
- exchangeBaseUrl: canonUrl,
- innerError: retryInfo?.lastError,
- });
+ assertUnreachable(oldExchangeRec.updateStatus);
}
- if (!exchangeDetails) {
- throw Error("invariant failed");
- }
+ let refreshCheckNecessary = true;
- if (!scopeInfo) {
- throw Error("invariant failed");
- }
+ if (!updateRequestedExplicitly) {
+ // If the update wasn't requested explicitly,
+ // check if we really need to update.
- const res: ReadyExchangeSummary = {
- currency: exchangeDetails.currency,
- exchangeBaseUrl: canonUrl,
- masterPub: exchangeDetails.masterPublicKey,
- tosStatus: getExchangeTosStatusFromRecord(exchange),
- tosAcceptedEtag: exchange.tosAcceptedEtag,
- wireInfo: exchangeDetails.wireInfo,
- protocolVersionRange: exchangeDetails.protocolVersionRange,
- tosCurrentEtag: exchange.tosCurrentEtag,
- tosAcceptedTimestamp: timestampOptionalPreciseFromDb(
- exchange.tosAcceptedTimestamp,
- ),
- scopeInfo,
- };
+ let nextUpdateStamp = timestampAbsoluteFromDb(
+ oldExchangeRec.nextUpdateStamp,
+ );
- if (options.expectedMasterPub) {
- if (res.masterPub !== options.expectedMasterPub) {
- throw Error(
- "public key of the exchange does not match expected public key",
+ let nextRefreshCheckStamp = timestampAbsoluteFromDb(
+ oldExchangeRec.nextRefreshCheckStamp,
+ );
+
+ let updateNecessary = true;
+
+ if (
+ !AbsoluteTime.isNever(nextUpdateStamp) &&
+ !AbsoluteTime.isExpired(nextUpdateStamp)
+ ) {
+ logger.info(
+ `exchange update for ${exchangeBaseUrl} not necessary, scheduled for ${AbsoluteTime.toIsoString(
+ nextUpdateStamp,
+ )}`,
+ );
+ updateNecessary = false;
+ }
+
+ if (
+ !AbsoluteTime.isNever(nextRefreshCheckStamp) &&
+ !AbsoluteTime.isExpired(nextRefreshCheckStamp)
+ ) {
+ logger.info(
+ `exchange refresh check for ${exchangeBaseUrl} not necessary, scheduled for ${AbsoluteTime.toIsoString(
+ nextRefreshCheckStamp,
+ )}`,
+ );
+ refreshCheckNecessary = false;
+ }
+
+ if (!(updateNecessary || refreshCheckNecessary)) {
+ return TaskRunResult.runAgainAt(
+ AbsoluteTime.min(nextUpdateStamp, nextRefreshCheckStamp),
);
}
}
- return res;
-}
-/**
- * Update an exchange entry in the wallet's database
- * by fetching the /keys and /wire information.
- * Optionally link the reserve entry to the new or existing
- * exchange entry in then DB.
- */
-export async function updateExchangeFromUrlHandler(
- ws: InternalWalletState,
- exchangeBaseUrl: string,
- options: {
- cancellationToken?: CancellationToken;
- } = {},
-): Promise<TaskRunResult> {
- logger.trace(`updating exchange info for ${exchangeBaseUrl}`);
- exchangeBaseUrl = canonicalizeBaseUrl(exchangeBaseUrl);
+ // When doing the auto-refresh check, we always update
+ // the key info before that.
logger.trace("updating exchange /keys info");
@@ -1063,6 +1215,7 @@ export async function updateExchangeFromUrlHandler(
exchangeBaseUrl,
ws.http,
timeout,
+ cancellationToken,
);
logger.trace("validating exchange wire info");
@@ -1302,9 +1455,13 @@ export async function updateExchangeFromUrlHandler(
});
if (recoupGroupId) {
+ const recoupTaskId = constructTaskIdentifier({
+ tag: PendingTaskType.Recoup,
+ recoupGroupId,
+ });
// Asynchronously start recoup. This doesn't need to finish
// for the exchange update to be considered finished.
- ws.workAvailable.trigger();
+ ws.taskScheduler.startShepherdTask(recoupTaskId);
}
if (!updated) {
@@ -1313,6 +1470,84 @@ export async function updateExchangeFromUrlHandler(
logger.trace("done updating exchange info in database");
+ logger.trace(`doing auto-refresh check for '${exchangeBaseUrl}'`);
+
+ let minCheckThreshold = AbsoluteTime.addDuration(
+ AbsoluteTime.now(),
+ durationFromSpec({ days: 1 }),
+ );
+
+ if (refreshCheckNecessary) {
+ // Do auto-refresh.
+ await ws.db
+ .mktx((x) => [
+ x.coins,
+ x.denominations,
+ x.coinAvailability,
+ x.refreshGroups,
+ x.exchanges,
+ ])
+ .runReadWrite(async (tx) => {
+ const exchange = await tx.exchanges.get(exchangeBaseUrl);
+ if (!exchange || !exchange.detailsPointer) {
+ return;
+ }
+ const coins = await tx.coins.indexes.byBaseUrl
+ .iter(exchangeBaseUrl)
+ .toArray();
+ const refreshCoins: CoinRefreshRequest[] = [];
+ for (const coin of coins) {
+ if (coin.status !== CoinStatus.Fresh) {
+ continue;
+ }
+ const denom = await tx.denominations.get([
+ exchangeBaseUrl,
+ coin.denomPubHash,
+ ]);
+ if (!denom) {
+ logger.warn("denomination not in database");
+ continue;
+ }
+ const executeThreshold =
+ getAutoRefreshExecuteThresholdForDenom(denom);
+ if (AbsoluteTime.isExpired(executeThreshold)) {
+ refreshCoins.push({
+ coinPub: coin.coinPub,
+ amount: denom.value,
+ });
+ } else {
+ const checkThreshold = getAutoRefreshCheckThreshold(denom);
+ minCheckThreshold = AbsoluteTime.min(
+ minCheckThreshold,
+ checkThreshold,
+ );
+ }
+ }
+ if (refreshCoins.length > 0) {
+ const res = await createRefreshGroup(
+ ws,
+ tx,
+ exchange.detailsPointer?.currency,
+ refreshCoins,
+ RefreshReason.Scheduled,
+ undefined,
+ );
+ logger.trace(
+ `created refresh group for auto-refresh (${res.refreshGroupId})`,
+ );
+ }
+ logger.trace(
+ `next refresh check at ${AbsoluteTime.toIsoString(
+ minCheckThreshold,
+ )}`,
+ );
+ exchange.nextRefreshCheckStamp = timestampPreciseToDb(
+ AbsoluteTime.toPreciseTimestamp(minCheckThreshold),
+ );
+ await tx.exchanges.put(exchange);
+ });
+ }
+
ws.notify({
type: NotificationType.ExchangeStateTransition,
exchangeBaseUrl,
@@ -1320,7 +1555,33 @@ export async function updateExchangeFromUrlHandler(
oldExchangeState: updated.oldExchangeState,
});
- return TaskRunResult.finished();
+ // Next invocation will cause the task to be run again
+ // at the necessary time.
+ return TaskRunResult.progress();
+}
+
+function getAutoRefreshExecuteThresholdForDenom(
+ d: DenominationRecord,
+): AbsoluteTime {
+ return getAutoRefreshExecuteThreshold({
+ stampExpireWithdraw: timestampProtocolFromDb(d.stampExpireWithdraw),
+ stampExpireDeposit: timestampProtocolFromDb(d.stampExpireDeposit),
+ });
+}
+
+/**
+ * Timestamp after which the wallet would do the next check for an auto-refresh.
+ */
+function getAutoRefreshCheckThreshold(d: DenominationRecord): AbsoluteTime {
+ const expireWithdraw = AbsoluteTime.fromProtocolTimestamp(
+ timestampProtocolFromDb(d.stampExpireWithdraw),
+ );
+ const expireDeposit = AbsoluteTime.fromProtocolTimestamp(
+ timestampProtocolFromDb(d.stampExpireDeposit),
+ );
+ const delta = AbsoluteTime.difference(expireWithdraw, expireDeposit);
+ const deltaDiv = durationMul(delta, 0.75);
+ return AbsoluteTime.addDuration(expireWithdraw, deltaDiv);
}
/**
@@ -1420,6 +1681,7 @@ export async function downloadExchangeInfo(
exchangeBaseUrl,
http,
Duration.getForever(),
+ CancellationToken.CONTINUE,
);
return {
keys: keysInfo,