diff options
Diffstat (limited to 'packages/taler-wallet-core/src/operations/refresh.ts')
-rw-r--r-- | packages/taler-wallet-core/src/operations/refresh.ts | 155 |
1 files changed, 22 insertions, 133 deletions
diff --git a/packages/taler-wallet-core/src/operations/refresh.ts b/packages/taler-wallet-core/src/operations/refresh.ts index 5f7169dbd..b9ac12518 100644 --- a/packages/taler-wallet-core/src/operations/refresh.ts +++ b/packages/taler-wallet-core/src/operations/refresh.ts @@ -15,12 +15,12 @@ */ import { - AbsoluteTime, AgeCommitment, AgeRestriction, AmountJson, Amounts, amountToPretty, + CancellationToken, codecForExchangeMeltResponse, codecForExchangeRevealResponse, CoinPublicKeyString, @@ -29,8 +29,6 @@ import { DenominationInfo, DenomKeyType, Duration, - durationFromSpec, - durationMul, encodeCrock, ExchangeMeltRequest, ExchangeProtocolVersion, @@ -51,7 +49,6 @@ import { TalerErrorCode, TalerErrorDetail, TalerPreciseTimestamp, - TalerProtocolTimestamp, TransactionAction, TransactionMajorState, TransactionState, @@ -79,10 +76,11 @@ import { } from "../db.js"; import { getCandidateWithdrawalDenomsTx, + PendingTaskType, RefreshGroupPerExchangeInfo, RefreshSessionRecord, + TaskId, timestampPreciseToDb, - timestampProtocolFromDb, WalletDbReadWriteTransactionArr, } from "../index.js"; import { @@ -94,6 +92,7 @@ import { selectWithdrawalDenominations } from "../util/coinSelection.js"; import { checkDbInvariant } from "../util/invariants.js"; import { GetReadOnlyAccess, GetReadWriteAccess } from "../util/query.js"; import { + constructTaskIdentifier, makeCoinAvailable, makeCoinsVisible, TaskRunResult, @@ -111,6 +110,7 @@ const logger = new Logger("refresh.ts"); export class RefreshTransactionContext implements TransactionContext { public transactionId: string; + readonly taskId: TaskId; constructor( public ws: InternalWalletState, @@ -120,6 +120,10 @@ export class RefreshTransactionContext implements TransactionContext { tag: TransactionType.Refresh, refreshGroupId, }); + this.taskId = constructTaskIdentifier({ + tag: PendingTaskType.Refresh, + refreshGroupId, + }); } async deleteTransaction(): Promise<void> { @@ -211,8 +215,8 @@ export class RefreshTransactionContext implements TransactionContext { } return undefined; }); - ws.workAvailable.trigger(); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(this.taskId); } async failTransaction(): Promise<void> { @@ -250,8 +254,9 @@ export class RefreshTransactionContext implements TransactionContext { newTxState: computeRefreshTransactionState(dg), }; }); - ws.workAvailable.trigger(); + ws.taskScheduler.stopShepherdTask(this.taskId); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(this.taskId); } } @@ -1003,7 +1008,7 @@ async function refreshReveal( export async function processRefreshGroup( ws: InternalWalletState, refreshGroupId: string, - options: Record<string, never> = {}, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { logger.trace(`processing refresh group ${refreshGroupId}`); @@ -1053,7 +1058,7 @@ export async function processRefreshGroup( logger.warn(`exception: ${e}`); } if (inShutdown) { - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } if (errors.length > 0) { return { @@ -1068,7 +1073,7 @@ export async function processRefreshGroup( }; } - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } async function processRefreshSession( @@ -1311,134 +1316,18 @@ export async function createRefreshGroup( logger.trace(`created refresh group ${refreshGroupId}`); + const ctx = new RefreshTransactionContext(ws, refreshGroupId); + + // Shepherd the task. + // If the current transaction fails to commit the refresh + // group to the DB, the shepherd will give up. + ws.taskScheduler.startShepherdTask(ctx.taskId); + return { refreshGroupId, }; } -/** - * Timestamp after which the wallet would do the next check for an auto-refresh. - */ -function getAutoRefreshCheckThreshold(d: DenominationRecord): AbsoluteTime { - const expireWithdraw = AbsoluteTime.fromProtocolTimestamp( - timestampProtocolFromDb(d.stampExpireWithdraw), - ); - const expireDeposit = AbsoluteTime.fromProtocolTimestamp( - timestampProtocolFromDb(d.stampExpireDeposit), - ); - const delta = AbsoluteTime.difference(expireWithdraw, expireDeposit); - const deltaDiv = durationMul(delta, 0.75); - return AbsoluteTime.addDuration(expireWithdraw, deltaDiv); -} - -/** - * Timestamp after which the wallet would do an auto-refresh. - */ -export function getAutoRefreshExecuteThreshold(d: { - stampExpireWithdraw: TalerProtocolTimestamp; - stampExpireDeposit: TalerProtocolTimestamp; -}): AbsoluteTime { - const expireWithdraw = AbsoluteTime.fromProtocolTimestamp( - d.stampExpireWithdraw, - ); - const expireDeposit = AbsoluteTime.fromProtocolTimestamp( - d.stampExpireDeposit, - ); - const delta = AbsoluteTime.difference(expireWithdraw, expireDeposit); - const deltaDiv = durationMul(delta, 0.5); - return AbsoluteTime.addDuration(expireWithdraw, deltaDiv); -} - -function getAutoRefreshExecuteThresholdForDenom( - d: DenominationRecord, -): AbsoluteTime { - return getAutoRefreshExecuteThreshold({ - stampExpireWithdraw: timestampProtocolFromDb(d.stampExpireWithdraw), - stampExpireDeposit: timestampProtocolFromDb(d.stampExpireDeposit), - }); -} - -export async function autoRefresh( - ws: InternalWalletState, - exchangeBaseUrl: string, -): Promise<TaskRunResult> { - logger.trace(`doing auto-refresh check for '${exchangeBaseUrl}'`); - - // We must make sure that the exchange is up-to-date so that - // can refresh into new denominations. - await fetchFreshExchange(ws, exchangeBaseUrl); - - let minCheckThreshold = AbsoluteTime.addDuration( - AbsoluteTime.now(), - durationFromSpec({ days: 1 }), - ); - await ws.db - .mktx((x) => [ - x.coins, - x.denominations, - x.coinAvailability, - x.refreshGroups, - x.exchanges, - ]) - .runReadWrite(async (tx) => { - const exchange = await tx.exchanges.get(exchangeBaseUrl); - if (!exchange || !exchange.detailsPointer) { - return; - } - const coins = await tx.coins.indexes.byBaseUrl - .iter(exchangeBaseUrl) - .toArray(); - const refreshCoins: CoinRefreshRequest[] = []; - for (const coin of coins) { - if (coin.status !== CoinStatus.Fresh) { - continue; - } - const denom = await tx.denominations.get([ - exchangeBaseUrl, - coin.denomPubHash, - ]); - if (!denom) { - logger.warn("denomination not in database"); - continue; - } - const executeThreshold = getAutoRefreshExecuteThresholdForDenom(denom); - if (AbsoluteTime.isExpired(executeThreshold)) { - refreshCoins.push({ - coinPub: coin.coinPub, - amount: denom.value, - }); - } else { - const checkThreshold = getAutoRefreshCheckThreshold(denom); - minCheckThreshold = AbsoluteTime.min( - minCheckThreshold, - checkThreshold, - ); - } - } - if (refreshCoins.length > 0) { - const res = await createRefreshGroup( - ws, - tx, - exchange.detailsPointer?.currency, - refreshCoins, - RefreshReason.Scheduled, - undefined, - ); - logger.trace( - `created refresh group for auto-refresh (${res.refreshGroupId})`, - ); - } - logger.trace( - `next refresh check at ${AbsoluteTime.toIsoString(minCheckThreshold)}`, - ); - exchange.nextRefreshCheckStamp = timestampPreciseToDb( - AbsoluteTime.toPreciseTimestamp(minCheckThreshold), - ); - await tx.exchanges.put(exchange); - }); - return TaskRunResult.finished(); -} - export function computeRefreshTransactionState( rg: RefreshGroupRecord, ): TransactionState { |