diff options
Diffstat (limited to 'packages/taler-wallet-core/src/operations/exchanges.ts')
-rw-r--r-- | packages/taler-wallet-core/src/operations/exchanges.ts | 468 |
1 files changed, 365 insertions, 103 deletions
diff --git a/packages/taler-wallet-core/src/operations/exchanges.ts b/packages/taler-wallet-core/src/operations/exchanges.ts index b4d45db2c..22be4102a 100644 --- a/packages/taler-wallet-core/src/operations/exchanges.ts +++ b/packages/taler-wallet-core/src/operations/exchanges.ts @@ -28,6 +28,8 @@ import { AgeRestriction, Amounts, CancellationToken, + CoinRefreshRequest, + CoinStatus, DeleteExchangeRequest, DenomKeyType, DenomOperationMap, @@ -53,6 +55,7 @@ import { NotificationType, OperationErrorInfo, Recoup, + RefreshReason, ScopeInfo, ScopeType, TalerError, @@ -67,8 +70,11 @@ import { WireFeeMap, WireFeesJson, WireInfo, + assertUnreachable, canonicalizeBaseUrl, codecForExchangeKeysJson, + durationFromSpec, + durationMul, encodeCrock, hashDenomPub, j2s, @@ -89,19 +95,22 @@ import { WalletStoresV1, } from "../db.js"; import { + AsyncFlag, ExchangeEntryDbRecordStatus, ExchangeEntryDbUpdateStatus, PendingTaskType, WalletDbReadOnlyTransactionArr, WalletDbReadWriteTransactionArr, + createRefreshGroup, createTimeline, isWithdrawableDenom, selectBestForOverlappingDenominations, selectMinimumFee, - timestampOptionalAbsoluteFromDb, + timestampAbsoluteFromDb, timestampOptionalPreciseFromDb, timestampPreciseFromDb, timestampPreciseToDb, + timestampProtocolFromDb, timestampProtocolToDb, } from "../index.js"; import { InternalWalletState } from "../internal-wallet-state.js"; @@ -117,11 +126,11 @@ import { TaskRunResult, TaskRunResultType, constructTaskIdentifier, + getAutoRefreshExecuteThreshold, getExchangeEntryStatusFromRecord, getExchangeState, getExchangeTosStatusFromRecord, getExchangeUpdateStatusFromRecord, - runTaskWithErrorReporting, } from "./common.js"; const logger = new Logger("exchanges.ts"); @@ -635,11 +644,13 @@ async function downloadExchangeKeysInfo( baseUrl: string, http: HttpRequestLibrary, timeout: Duration, + cancellationToken: CancellationToken, ): Promise<ExchangeKeysDownloadResult> { const keysUrl = new URL("keys", baseUrl); const resp = await http.fetch(keysUrl.href, { timeout, + cancellationToken, }); // We must make sure to parse out the protocol version @@ -828,13 +839,19 @@ async function downloadTosFromAcceptedFormat( * If the exchange entry doesn't exist, * a new ephemeral entry is created. */ -export async function startUpdateExchangeEntry( +async function startUpdateExchangeEntry( ws: InternalWalletState, exchangeBaseUrl: string, options: { forceUpdate?: boolean } = {}, ): Promise<void> { const canonBaseUrl = canonicalizeBaseUrl(exchangeBaseUrl); + logger.info( + `starting update of exchange entry ${canonBaseUrl}, forced=${ + options.forceUpdate ?? false + }`, + ); + const { notification } = await ws.db .mktx((x) => [x.exchanges, x.exchangeDetails]) .runReadWrite(async (tx) => { @@ -845,7 +862,7 @@ export async function startUpdateExchangeEntry( ws.notify(notification); } - const { oldExchangeState, newExchangeState } = await ws.db + const { oldExchangeState, newExchangeState, taskId } = await ws.db .mktx((x) => [x.exchanges, x.operationRetries]) .runReadWrite(async (tx) => { const r = await tx.exchanges.get(canonBaseUrl); @@ -882,7 +899,7 @@ export async function startUpdateExchangeEntry( // Reset retries for updating the exchange entry. const taskId = TaskIdentifiers.forExchangeUpdate(r); await tx.operationRetries.delete(taskId); - return { oldExchangeState, newExchangeState }; + return { oldExchangeState, newExchangeState, taskId }; }); ws.notify({ type: NotificationType.ExchangeStateTransition, @@ -890,7 +907,7 @@ export async function startUpdateExchangeEntry( newExchangeState: newExchangeState, oldExchangeState: oldExchangeState, }); - ws.workAvailable.trigger(); + ws.taskScheduler.restartShepherdTask(taskId); } /** @@ -909,6 +926,119 @@ export interface ReadyExchangeSummary { scopeInfo: ScopeInfo; } +async function internalWaitReadyExchange( + ws: InternalWalletState, + canonUrl: string, + exchangeNotifFlag: AsyncFlag, + options: { + cancellationToken?: CancellationToken; + forceUpdate?: boolean; + expectedMasterPub?: string; + } = {}, +): Promise<ReadyExchangeSummary> { + const operationId = constructTaskIdentifier({ + tag: PendingTaskType.ExchangeUpdate, + exchangeBaseUrl: canonUrl, + }); + while (true) { + logger.info(`waiting for ready exchange ${canonUrl}`); + const { exchange, exchangeDetails, retryInfo, scopeInfo } = + await ws.db.runReadOnlyTx( + [ + "exchanges", + "exchangeDetails", + "operationRetries", + "globalCurrencyAuditors", + "globalCurrencyExchanges", + ], + async (tx) => { + const exchange = await tx.exchanges.get(canonUrl); + const exchangeDetails = await getExchangeRecordsInternal( + tx, + canonUrl, + ); + const retryInfo = await tx.operationRetries.get(operationId); + let scopeInfo: ScopeInfo | undefined = undefined; + if (exchange && exchangeDetails) { + scopeInfo = await internalGetExchangeScopeInfo(tx, exchangeDetails); + } + return { exchange, exchangeDetails, retryInfo, scopeInfo }; + }, + ); + + if (!exchange) { + throw Error("exchange entry does not exist anymore"); + } + + let ready = false; + + switch (exchange.updateStatus) { + case ExchangeEntryDbUpdateStatus.Ready: + ready = true; + break; + case ExchangeEntryDbUpdateStatus.ReadyUpdate: + // If the update is forced, + // we wait until we're in a full "ready" state, + // as we're not happy with the stale information. + if (!options.forceUpdate) { + ready = true; + } + break; + default: { + if (retryInfo) { + throw TalerError.fromDetail( + TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE, + { + exchangeBaseUrl: canonUrl, + innerError: retryInfo?.lastError, + }, + ); + } + } + } + + if (!ready) { + logger.info("waiting for exchange update notification"); + await exchangeNotifFlag.wait(); + logger.info("done waiting for exchange update notification"); + exchangeNotifFlag.reset(); + continue; + } + + if (!exchangeDetails) { + throw Error("invariant failed"); + } + + if (!scopeInfo) { + throw Error("invariant failed"); + } + + const res: ReadyExchangeSummary = { + currency: exchangeDetails.currency, + exchangeBaseUrl: canonUrl, + masterPub: exchangeDetails.masterPublicKey, + tosStatus: getExchangeTosStatusFromRecord(exchange), + tosAcceptedEtag: exchange.tosAcceptedEtag, + wireInfo: exchangeDetails.wireInfo, + protocolVersionRange: exchangeDetails.protocolVersionRange, + tosCurrentEtag: exchange.tosCurrentEtag, + tosAcceptedTimestamp: timestampOptionalPreciseFromDb( + exchange.tosAcceptedTimestamp, + ), + scopeInfo, + }; + + if (options.expectedMasterPub) { + if (res.masterPub !== options.expectedMasterPub) { + throw Error( + "public key of the exchange does not match expected public key", + ); + } + } + return res; + } +} + /** * Ensure that a fresh exchange entry exists for the given * exchange base URL. @@ -933,127 +1063,149 @@ export async function fetchFreshExchange( } = {}, ): Promise<ReadyExchangeSummary> { const canonUrl = canonicalizeBaseUrl(baseUrl); - const operationId = constructTaskIdentifier({ - tag: PendingTaskType.ExchangeUpdate, - exchangeBaseUrl: canonUrl, + + ws.ensureTaskLoopRunning(); + + await startUpdateExchangeEntry(ws, canonUrl, { + forceUpdate: options.forceUpdate, }); - const oldExchange = await ws.db - .mktx((x) => [x.exchanges]) - .runReadOnly(async (tx) => { - return tx.exchanges.get(canonUrl); - }); + return waitReadyExchange(ws, canonUrl, options); +} - let needsUpdate = false; +async function waitReadyExchange( + ws: InternalWalletState, + canonUrl: string, + options: { + cancellationToken?: CancellationToken; + forceUpdate?: boolean; + expectedMasterPub?: string; + } = {}, +): Promise<ReadyExchangeSummary> { + // FIXME: We should use Symbol.dispose magic here for cleanup! - if (!oldExchange || options.forceUpdate) { - needsUpdate = true; - await startUpdateExchangeEntry(ws, canonUrl, { - forceUpdate: options.forceUpdate, - }); - } else { - const nextUpdate = timestampOptionalAbsoluteFromDb( - oldExchange.nextUpdateStamp, - ); + const exchangeNotifFlag = new AsyncFlag(); + // Raise exchangeNotifFlag whenever we get a notification + // about our exchange. + const cancelNotif = ws.addNotificationListener((notif) => { if ( - nextUpdate == null || - AbsoluteTime.isExpired(nextUpdate) || - oldExchange.updateStatus !== ExchangeEntryDbUpdateStatus.Ready + notif.type === NotificationType.ExchangeStateTransition && + notif.exchangeBaseUrl === canonUrl ) { - needsUpdate = true; + logger.info(`raising update notification: ${j2s(notif)}`); + exchangeNotifFlag.raise(); } - } + }); - if (needsUpdate) { - await runTaskWithErrorReporting(ws, operationId, () => - updateExchangeFromUrlHandler(ws, canonUrl), + try { + const res = await internalWaitReadyExchange( + ws, + canonUrl, + exchangeNotifFlag, + options, ); + logger.info("done waiting for ready exchange"); + return res; + } finally { + cancelNotif(); } +} - const { exchange, exchangeDetails, retryInfo, scopeInfo } = - await ws.db.runReadOnlyTx( - [ - "exchanges", - "exchangeDetails", - "operationRetries", - "globalCurrencyAuditors", - "globalCurrencyExchanges", - ], - async (tx) => { - const exchange = await tx.exchanges.get(canonUrl); - const exchangeDetails = await getExchangeRecordsInternal(tx, canonUrl); - const retryInfo = await tx.operationRetries.get(operationId); - let scopeInfo: ScopeInfo | undefined = undefined; - if (exchange && exchangeDetails) { - scopeInfo = await internalGetExchangeScopeInfo(tx, exchangeDetails); - } - return { exchange, exchangeDetails, retryInfo, scopeInfo }; - }, - ); +/** + * Update an exchange entry in the wallet's database + * by fetching the /keys and /wire information. + * Optionally link the reserve entry to the new or existing + * exchange entry in then DB. + */ +export async function updateExchangeFromUrlHandler( + ws: InternalWalletState, + exchangeBaseUrl: string, + cancellationToken: CancellationToken, +): Promise<TaskRunResult> { + logger.trace(`updating exchange info for ${exchangeBaseUrl}`); + exchangeBaseUrl = canonicalizeBaseUrl(exchangeBaseUrl); - if (!exchange) { - throw Error("exchange entry does not exist anymore"); + const oldExchangeRec = await ws.db.runReadOnlyTx( + ["exchanges"], + async (tx) => { + return tx.exchanges.get(exchangeBaseUrl); + }, + ); + + if (!oldExchangeRec) { + logger.info(`not updating exchange ${exchangeBaseUrl}, no record in DB`); + return TaskRunResult.finished(); } - switch (exchange.updateStatus) { - case ExchangeEntryDbUpdateStatus.Ready: + let updateRequestedExplicitly = false; + + switch (oldExchangeRec.updateStatus) { + case ExchangeEntryDbUpdateStatus.Suspended: + logger.info(`not updating exchange in status "suspended"`); + return TaskRunResult.finished(); + case ExchangeEntryDbUpdateStatus.Initial: + logger.info(`not updating exchange in status "initial"`); + return TaskRunResult.finished(); + case ExchangeEntryDbUpdateStatus.InitialUpdate: case ExchangeEntryDbUpdateStatus.ReadyUpdate: + case ExchangeEntryDbUpdateStatus.UnavailableUpdate: + updateRequestedExplicitly = true; + break; + case ExchangeEntryDbUpdateStatus.Ready: break; default: - throw TalerError.fromDetail(TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE, { - exchangeBaseUrl: canonUrl, - innerError: retryInfo?.lastError, - }); + assertUnreachable(oldExchangeRec.updateStatus); } - if (!exchangeDetails) { - throw Error("invariant failed"); - } + let refreshCheckNecessary = true; - if (!scopeInfo) { - throw Error("invariant failed"); - } + if (!updateRequestedExplicitly) { + // If the update wasn't requested explicitly, + // check if we really need to update. - const res: ReadyExchangeSummary = { - currency: exchangeDetails.currency, - exchangeBaseUrl: canonUrl, - masterPub: exchangeDetails.masterPublicKey, - tosStatus: getExchangeTosStatusFromRecord(exchange), - tosAcceptedEtag: exchange.tosAcceptedEtag, - wireInfo: exchangeDetails.wireInfo, - protocolVersionRange: exchangeDetails.protocolVersionRange, - tosCurrentEtag: exchange.tosCurrentEtag, - tosAcceptedTimestamp: timestampOptionalPreciseFromDb( - exchange.tosAcceptedTimestamp, - ), - scopeInfo, - }; + let nextUpdateStamp = timestampAbsoluteFromDb( + oldExchangeRec.nextUpdateStamp, + ); - if (options.expectedMasterPub) { - if (res.masterPub !== options.expectedMasterPub) { - throw Error( - "public key of the exchange does not match expected public key", + let nextRefreshCheckStamp = timestampAbsoluteFromDb( + oldExchangeRec.nextRefreshCheckStamp, + ); + + let updateNecessary = true; + + if ( + !AbsoluteTime.isNever(nextUpdateStamp) && + !AbsoluteTime.isExpired(nextUpdateStamp) + ) { + logger.info( + `exchange update for ${exchangeBaseUrl} not necessary, scheduled for ${AbsoluteTime.toIsoString( + nextUpdateStamp, + )}`, + ); + updateNecessary = false; + } + + if ( + !AbsoluteTime.isNever(nextRefreshCheckStamp) && + !AbsoluteTime.isExpired(nextRefreshCheckStamp) + ) { + logger.info( + `exchange refresh check for ${exchangeBaseUrl} not necessary, scheduled for ${AbsoluteTime.toIsoString( + nextRefreshCheckStamp, + )}`, + ); + refreshCheckNecessary = false; + } + + if (!(updateNecessary || refreshCheckNecessary)) { + return TaskRunResult.runAgainAt( + AbsoluteTime.min(nextUpdateStamp, nextRefreshCheckStamp), ); } } - return res; -} -/** - * Update an exchange entry in the wallet's database - * by fetching the /keys and /wire information. - * Optionally link the reserve entry to the new or existing - * exchange entry in then DB. - */ -export async function updateExchangeFromUrlHandler( - ws: InternalWalletState, - exchangeBaseUrl: string, - options: { - cancellationToken?: CancellationToken; - } = {}, -): Promise<TaskRunResult> { - logger.trace(`updating exchange info for ${exchangeBaseUrl}`); - exchangeBaseUrl = canonicalizeBaseUrl(exchangeBaseUrl); + // When doing the auto-refresh check, we always update + // the key info before that. logger.trace("updating exchange /keys info"); @@ -1063,6 +1215,7 @@ export async function updateExchangeFromUrlHandler( exchangeBaseUrl, ws.http, timeout, + cancellationToken, ); logger.trace("validating exchange wire info"); @@ -1302,9 +1455,13 @@ export async function updateExchangeFromUrlHandler( }); if (recoupGroupId) { + const recoupTaskId = constructTaskIdentifier({ + tag: PendingTaskType.Recoup, + recoupGroupId, + }); // Asynchronously start recoup. This doesn't need to finish // for the exchange update to be considered finished. - ws.workAvailable.trigger(); + ws.taskScheduler.startShepherdTask(recoupTaskId); } if (!updated) { @@ -1313,6 +1470,84 @@ export async function updateExchangeFromUrlHandler( logger.trace("done updating exchange info in database"); + logger.trace(`doing auto-refresh check for '${exchangeBaseUrl}'`); + + let minCheckThreshold = AbsoluteTime.addDuration( + AbsoluteTime.now(), + durationFromSpec({ days: 1 }), + ); + + if (refreshCheckNecessary) { + // Do auto-refresh. + await ws.db + .mktx((x) => [ + x.coins, + x.denominations, + x.coinAvailability, + x.refreshGroups, + x.exchanges, + ]) + .runReadWrite(async (tx) => { + const exchange = await tx.exchanges.get(exchangeBaseUrl); + if (!exchange || !exchange.detailsPointer) { + return; + } + const coins = await tx.coins.indexes.byBaseUrl + .iter(exchangeBaseUrl) + .toArray(); + const refreshCoins: CoinRefreshRequest[] = []; + for (const coin of coins) { + if (coin.status !== CoinStatus.Fresh) { + continue; + } + const denom = await tx.denominations.get([ + exchangeBaseUrl, + coin.denomPubHash, + ]); + if (!denom) { + logger.warn("denomination not in database"); + continue; + } + const executeThreshold = + getAutoRefreshExecuteThresholdForDenom(denom); + if (AbsoluteTime.isExpired(executeThreshold)) { + refreshCoins.push({ + coinPub: coin.coinPub, + amount: denom.value, + }); + } else { + const checkThreshold = getAutoRefreshCheckThreshold(denom); + minCheckThreshold = AbsoluteTime.min( + minCheckThreshold, + checkThreshold, + ); + } + } + if (refreshCoins.length > 0) { + const res = await createRefreshGroup( + ws, + tx, + exchange.detailsPointer?.currency, + refreshCoins, + RefreshReason.Scheduled, + undefined, + ); + logger.trace( + `created refresh group for auto-refresh (${res.refreshGroupId})`, + ); + } + logger.trace( + `next refresh check at ${AbsoluteTime.toIsoString( + minCheckThreshold, + )}`, + ); + exchange.nextRefreshCheckStamp = timestampPreciseToDb( + AbsoluteTime.toPreciseTimestamp(minCheckThreshold), + ); + await tx.exchanges.put(exchange); + }); + } + ws.notify({ type: NotificationType.ExchangeStateTransition, exchangeBaseUrl, @@ -1320,7 +1555,33 @@ export async function updateExchangeFromUrlHandler( oldExchangeState: updated.oldExchangeState, }); - return TaskRunResult.finished(); + // Next invocation will cause the task to be run again + // at the necessary time. + return TaskRunResult.progress(); +} + +function getAutoRefreshExecuteThresholdForDenom( + d: DenominationRecord, +): AbsoluteTime { + return getAutoRefreshExecuteThreshold({ + stampExpireWithdraw: timestampProtocolFromDb(d.stampExpireWithdraw), + stampExpireDeposit: timestampProtocolFromDb(d.stampExpireDeposit), + }); +} + +/** + * Timestamp after which the wallet would do the next check for an auto-refresh. + */ +function getAutoRefreshCheckThreshold(d: DenominationRecord): AbsoluteTime { + const expireWithdraw = AbsoluteTime.fromProtocolTimestamp( + timestampProtocolFromDb(d.stampExpireWithdraw), + ); + const expireDeposit = AbsoluteTime.fromProtocolTimestamp( + timestampProtocolFromDb(d.stampExpireDeposit), + ); + const delta = AbsoluteTime.difference(expireWithdraw, expireDeposit); + const deltaDiv = durationMul(delta, 0.75); + return AbsoluteTime.addDuration(expireWithdraw, deltaDiv); } /** @@ -1420,6 +1681,7 @@ export async function downloadExchangeInfo( exchangeBaseUrl, http, Duration.getForever(), + CancellationToken.CONTINUE, ); return { keys: keysInfo, |