From 52a1f63e0a8cc2ca78910e8b56326376eb1d75d0 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Wed, 21 Feb 2024 14:23:01 +0100 Subject: wallet-core: use cancellation tokens when possible --- packages/taler-wallet-core/src/deposits.ts | 148 +++++++++++++++-------------- 1 file changed, 75 insertions(+), 73 deletions(-) (limited to 'packages/taler-wallet-core/src/deposits.ts') diff --git a/packages/taler-wallet-core/src/deposits.ts b/packages/taler-wallet-core/src/deposits.ts index 617f32887..ed8778368 100644 --- a/packages/taler-wallet-core/src/deposits.ts +++ b/packages/taler-wallet-core/src/deposits.ts @@ -75,7 +75,7 @@ import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http"; import { selectPayCoinsNew } from "./coinSelection.js"; import { PendingTaskType, - TaskId, + TaskIdStr, TaskRunResult, TombstoneTag, TransactionContext, @@ -119,7 +119,8 @@ const logger = new Logger("deposits.ts"); export class DepositTransactionContext implements TransactionContext { readonly transactionId: TransactionIdStr; - readonly taskId: TaskId; + readonly taskId: TaskIdStr; + constructor( public ws: InternalWalletState, public depositGroupId: string, @@ -210,7 +211,8 @@ export class DepositTransactionContext implements TransactionContext { switch (dg.operationStatus) { case DepositOperationStatus.Finished: return undefined; - case DepositOperationStatus.PendingDeposit: { + case DepositOperationStatus.PendingDeposit: + case DepositOperationStatus.SuspendedDeposit: { dg.operationStatus = DepositOperationStatus.Aborting; await tx.depositGroups.put(dg); return { @@ -218,9 +220,6 @@ export class DepositTransactionContext implements TransactionContext { newTxState: computeDepositTransactionStatus(dg), }; } - case DepositOperationStatus.SuspendedDeposit: - // FIXME: Can we abort a suspended transaction?! - return undefined; } return undefined; }, @@ -410,74 +409,10 @@ export function computeDepositTransactionActions( } } -/** - * Check whether the refresh associated with the - * aborting deposit group is done. - * - * If done, mark the deposit transaction as aborted. - * - * Otherwise continue waiting. - * - * FIXME: Wait for the refresh group notifications instead of periodically - * checking the refresh group status. - * FIXME: This is just one transaction, can't we do this in the initial - * transaction of processDepositGroup? - */ -async function waitForRefreshOnDepositGroup( - ws: InternalWalletState, - depositGroup: DepositGroupRecord, -): Promise { - const abortRefreshGroupId = depositGroup.abortRefreshGroupId; - checkLogicInvariant(!!abortRefreshGroupId); - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Deposit, - depositGroupId: depositGroup.depositGroupId, - }); - const transitionInfo = await ws.db.runReadWriteTx( - ["depositGroups", "refreshGroups"], - async (tx) => { - const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId); - let newOpState: DepositOperationStatus | undefined; - if (!refreshGroup) { - // Maybe it got manually deleted? Means that we should - // just go into aborted. - logger.warn("no aborting refresh group found for deposit group"); - newOpState = DepositOperationStatus.Aborted; - } else { - if (refreshGroup.operationStatus === RefreshOperationStatus.Finished) { - newOpState = DepositOperationStatus.Aborted; - } else if ( - refreshGroup.operationStatus === RefreshOperationStatus.Failed - ) { - newOpState = DepositOperationStatus.Aborted; - } - } - if (newOpState) { - const newDg = await tx.depositGroups.get(depositGroup.depositGroupId); - if (!newDg) { - return; - } - const oldTxState = computeDepositTransactionStatus(newDg); - newDg.operationStatus = newOpState; - const newTxState = computeDepositTransactionStatus(newDg); - await tx.depositGroups.put(newDg); - return { oldTxState, newTxState }; - } - return undefined; - }, - ); - - notifyTransition(ws, transactionId, transitionInfo); - ws.notify({ - type: NotificationType.BalanceChange, - hintTransactionId: transactionId, - }); - return TaskRunResult.backoff(); -} - async function refundDepositGroup( ws: InternalWalletState, depositGroup: DepositGroupRecord, + cancellationToken: CancellationToken, ): Promise { const newTxPerCoin = [...depositGroup.statusPerCoin]; logger.info(`status per coin: ${j2s(depositGroup.statusPerCoin)}`); @@ -520,6 +455,7 @@ async function refundDepositGroup( const httpResp = await ws.http.fetch(refundUrl.href, { method: "POST", body: refundReq, + cancellationToken, }); logger.info( `coin ${i} refund HTTP status for coin: ${httpResp.status}`, @@ -600,15 +536,81 @@ async function refundDepositGroup( return TaskRunResult.backoff(); } +/** + * Check whether the refresh associated with the + * aborting deposit group is done. + * + * If done, mark the deposit transaction as aborted. + * + * Otherwise continue waiting. + * + * FIXME: Wait for the refresh group notifications instead of periodically + * checking the refresh group status. + * FIXME: This is just one transaction, can't we do this in the initial + * transaction of processDepositGroup? + */ +async function waitForRefreshOnDepositGroup( + ws: InternalWalletState, + depositGroup: DepositGroupRecord, +): Promise { + const abortRefreshGroupId = depositGroup.abortRefreshGroupId; + checkLogicInvariant(!!abortRefreshGroupId); + const transactionId = constructTransactionIdentifier({ + tag: TransactionType.Deposit, + depositGroupId: depositGroup.depositGroupId, + }); + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups", "refreshGroups"], + async (tx) => { + const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId); + let newOpState: DepositOperationStatus | undefined; + if (!refreshGroup) { + // Maybe it got manually deleted? Means that we should + // just go into aborted. + logger.warn("no aborting refresh group found for deposit group"); + newOpState = DepositOperationStatus.Aborted; + } else { + if (refreshGroup.operationStatus === RefreshOperationStatus.Finished) { + newOpState = DepositOperationStatus.Aborted; + } else if ( + refreshGroup.operationStatus === RefreshOperationStatus.Failed + ) { + newOpState = DepositOperationStatus.Aborted; + } + } + if (newOpState) { + const newDg = await tx.depositGroups.get(depositGroup.depositGroupId); + if (!newDg) { + return; + } + const oldTxState = computeDepositTransactionStatus(newDg); + newDg.operationStatus = newOpState; + const newTxState = computeDepositTransactionStatus(newDg); + await tx.depositGroups.put(newDg); + return { oldTxState, newTxState }; + } + return undefined; + }, + ); + + notifyTransition(ws, transactionId, transitionInfo); + ws.notify({ + type: NotificationType.BalanceChange, + hintTransactionId: transactionId, + }); + return TaskRunResult.backoff(); +} + async function processDepositGroupAborting( ws: InternalWalletState, depositGroup: DepositGroupRecord, + cancellationToken: CancellationToken, ): Promise { logger.info("processing deposit tx in 'aborting'"); const abortRefreshGroupId = depositGroup.abortRefreshGroupId; if (!abortRefreshGroupId) { logger.info("refunding deposit group"); - return refundDepositGroup(ws, depositGroup); + return refundDepositGroup(ws, depositGroup, cancellationToken); } logger.info("waiting for refresh"); return waitForRefreshOnDepositGroup(ws, depositGroup); @@ -1059,7 +1061,7 @@ export async function processDepositGroup( cancellationToken, ); case DepositOperationStatus.Aborting: - return processDepositGroupAborting(ws, depositGroup); + return processDepositGroupAborting(ws, depositGroup, cancellationToken); } return TaskRunResult.finished(); -- cgit v1.2.3