diff options
author | Florian Dold <florian@dold.me> | 2024-02-21 14:23:01 +0100 |
---|---|---|
committer | Florian Dold <florian@dold.me> | 2024-02-21 14:23:01 +0100 |
commit | 52a1f63e0a8cc2ca78910e8b56326376eb1d75d0 (patch) | |
tree | e59e898731a9eb76a9af3cec75256b5a07adf893 /packages/taler-wallet-core/src | |
parent | 612b85c18fc17af412d08e075e1fddaa67aa7bf0 (diff) | |
download | wallet-core-52a1f63e0a8cc2ca78910e8b56326376eb1d75d0.tar.gz wallet-core-52a1f63e0a8cc2ca78910e8b56326376eb1d75d0.tar.bz2 wallet-core-52a1f63e0a8cc2ca78910e8b56326376eb1d75d0.zip |
wallet-core: use cancellation tokens when possible
Diffstat (limited to 'packages/taler-wallet-core/src')
-rw-r--r-- | packages/taler-wallet-core/src/common.ts | 88 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/denominations.ts | 3 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/deposits.ts | 148 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/exchanges.ts | 7 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/pay-merchant.ts | 73 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/pay-peer-pull-credit.ts | 37 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/pay-peer-pull-debit.ts | 4 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/pay-peer-push-credit.ts | 31 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/pay-peer-push-debit.ts | 32 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/recoup.ts | 8 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/refresh.ts | 59 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/reward.ts | 5 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/shepherd.ts | 24 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/transactions.ts | 9 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/withdraw.ts | 78 |
15 files changed, 374 insertions, 232 deletions
diff --git a/packages/taler-wallet-core/src/common.ts b/packages/taler-wallet-core/src/common.ts index 9b69ad6c4..8c6650f4a 100644 --- a/packages/taler-wallet-core/src/common.ts +++ b/packages/taler-wallet-core/src/common.ts @@ -337,7 +337,7 @@ export function constructTombstone(p: ParsedTombstone): TombstoneIdStr { * Uniform interface for a particular wallet transaction. */ export interface TransactionManager { - get taskId(): TaskId; + get taskId(): TaskIdStr; get transactionId(): TransactionIdStr; fail(): Promise<void>; abort(): Promise<void>; @@ -600,90 +600,92 @@ export function parseTaskIdentifier(x: string): ParsedTaskIdentifier { } } -export function constructTaskIdentifier(p: ParsedTaskIdentifier): TaskId { +export function constructTaskIdentifier(p: ParsedTaskIdentifier): TaskIdStr { switch (p.tag) { case PendingTaskType.Backup: - return `${p.tag}:${p.backupProviderBaseUrl}` as TaskId; + return `${p.tag}:${p.backupProviderBaseUrl}` as TaskIdStr; case PendingTaskType.Deposit: - return `${p.tag}:${p.depositGroupId}` as TaskId; + return `${p.tag}:${p.depositGroupId}` as TaskIdStr; case PendingTaskType.ExchangeUpdate: - return `${p.tag}:${encodeURIComponent(p.exchangeBaseUrl)}` as TaskId; + return `${p.tag}:${encodeURIComponent(p.exchangeBaseUrl)}` as TaskIdStr; case PendingTaskType.PeerPullDebit: - return `${p.tag}:${p.peerPullDebitId}` as TaskId; + return `${p.tag}:${p.peerPullDebitId}` as TaskIdStr; case PendingTaskType.PeerPushCredit: - return `${p.tag}:${p.peerPushCreditId}` as TaskId; + return `${p.tag}:${p.peerPushCreditId}` as TaskIdStr; case PendingTaskType.PeerPullCredit: - return `${p.tag}:${p.pursePub}` as TaskId; + return `${p.tag}:${p.pursePub}` as TaskIdStr; case PendingTaskType.PeerPushDebit: - return `${p.tag}:${p.pursePub}` as TaskId; + return `${p.tag}:${p.pursePub}` as TaskIdStr; case PendingTaskType.Purchase: - return `${p.tag}:${p.proposalId}` as TaskId; + return `${p.tag}:${p.proposalId}` as TaskIdStr; case PendingTaskType.Recoup: - return `${p.tag}:${p.recoupGroupId}` as TaskId; + return `${p.tag}:${p.recoupGroupId}` as TaskIdStr; case PendingTaskType.Refresh: - return `${p.tag}:${p.refreshGroupId}` as TaskId; + return `${p.tag}:${p.refreshGroupId}` as TaskIdStr; case PendingTaskType.RewardPickup: - return `${p.tag}:${p.walletRewardId}` as TaskId; + return `${p.tag}:${p.walletRewardId}` as TaskIdStr; case PendingTaskType.Withdraw: - return `${p.tag}:${p.withdrawalGroupId}` as TaskId; + return `${p.tag}:${p.withdrawalGroupId}` as TaskIdStr; default: assertUnreachable(p); } } export namespace TaskIdentifiers { - export function forWithdrawal(wg: WithdrawalGroupRecord): TaskId { - return `${PendingTaskType.Withdraw}:${wg.withdrawalGroupId}` as TaskId; + export function forWithdrawal(wg: WithdrawalGroupRecord): TaskIdStr { + return `${PendingTaskType.Withdraw}:${wg.withdrawalGroupId}` as TaskIdStr; } - export function forExchangeUpdate(exch: ExchangeEntryRecord): TaskId { + export function forExchangeUpdate(exch: ExchangeEntryRecord): TaskIdStr { return `${PendingTaskType.ExchangeUpdate}:${encodeURIComponent( exch.baseUrl, - )}` as TaskId; + )}` as TaskIdStr; } - export function forExchangeUpdateFromUrl(exchBaseUrl: string): TaskId { + export function forExchangeUpdateFromUrl(exchBaseUrl: string): TaskIdStr { return `${PendingTaskType.ExchangeUpdate}:${encodeURIComponent( exchBaseUrl, - )}` as TaskId; + )}` as TaskIdStr; } - export function forTipPickup(tipRecord: RewardRecord): TaskId { - return `${PendingTaskType.RewardPickup}:${tipRecord.walletRewardId}` as TaskId; + export function forTipPickup(tipRecord: RewardRecord): TaskIdStr { + return `${PendingTaskType.RewardPickup}:${tipRecord.walletRewardId}` as TaskIdStr; } - export function forRefresh(refreshGroupRecord: RefreshGroupRecord): TaskId { - return `${PendingTaskType.Refresh}:${refreshGroupRecord.refreshGroupId}` as TaskId; + export function forRefresh( + refreshGroupRecord: RefreshGroupRecord, + ): TaskIdStr { + return `${PendingTaskType.Refresh}:${refreshGroupRecord.refreshGroupId}` as TaskIdStr; } - export function forPay(purchaseRecord: PurchaseRecord): TaskId { - return `${PendingTaskType.Purchase}:${purchaseRecord.proposalId}` as TaskId; + export function forPay(purchaseRecord: PurchaseRecord): TaskIdStr { + return `${PendingTaskType.Purchase}:${purchaseRecord.proposalId}` as TaskIdStr; } - export function forRecoup(recoupRecord: RecoupGroupRecord): TaskId { - return `${PendingTaskType.Recoup}:${recoupRecord.recoupGroupId}` as TaskId; + export function forRecoup(recoupRecord: RecoupGroupRecord): TaskIdStr { + return `${PendingTaskType.Recoup}:${recoupRecord.recoupGroupId}` as TaskIdStr; } - export function forDeposit(depositRecord: DepositGroupRecord): TaskId { - return `${PendingTaskType.Deposit}:${depositRecord.depositGroupId}` as TaskId; + export function forDeposit(depositRecord: DepositGroupRecord): TaskIdStr { + return `${PendingTaskType.Deposit}:${depositRecord.depositGroupId}` as TaskIdStr; } - export function forBackup(backupRecord: BackupProviderRecord): TaskId { + export function forBackup(backupRecord: BackupProviderRecord): TaskIdStr { return `${PendingTaskType.Backup}:${encodeURIComponent( backupRecord.baseUrl, - )}` as TaskId; + )}` as TaskIdStr; } export function forPeerPushPaymentInitiation( ppi: PeerPushDebitRecord, - ): TaskId { - return `${PendingTaskType.PeerPushDebit}:${ppi.pursePub}` as TaskId; + ): TaskIdStr { + return `${PendingTaskType.PeerPushDebit}:${ppi.pursePub}` as TaskIdStr; } export function forPeerPullPaymentInitiation( ppi: PeerPullCreditRecord, - ): TaskId { - return `${PendingTaskType.PeerPullCredit}:${ppi.pursePub}` as TaskId; + ): TaskIdStr { + return `${PendingTaskType.PeerPullCredit}:${ppi.pursePub}` as TaskIdStr; } export function forPeerPullPaymentDebit( ppi: PeerPullPaymentIncomingRecord, - ): TaskId { - return `${PendingTaskType.PeerPullDebit}:${ppi.peerPullDebitId}` as TaskId; + ): TaskIdStr { + return `${PendingTaskType.PeerPullDebit}:${ppi.peerPullDebitId}` as TaskIdStr; } export function forPeerPushCredit( ppi: PeerPushPaymentIncomingRecord, - ): TaskId { - return `${PendingTaskType.PeerPushCredit}:${ppi.peerPushCreditId}` as TaskId; + ): TaskIdStr { + return `${PendingTaskType.PeerPushCredit}:${ppi.peerPushCreditId}` as TaskIdStr; } } @@ -700,6 +702,8 @@ export enum TransitionResult { * Uniform interface to all transactions. */ export interface TransactionContext { + get taskId(): TaskIdStr | undefined; + get transactionId(): TransactionIdStr; abortTransaction(): Promise<void>; suspendTransaction(): Promise<void>; resumeTransaction(): Promise<void>; @@ -729,5 +733,5 @@ export enum PendingTaskType { PeerPullDebit = "peer-pull-debit", } -declare const __taskId: unique symbol; -export type TaskId = string & { [__taskId]: true }; +declare const __taskIdStr: unique symbol; +export type TaskIdStr = string & { [__taskIdStr]: true }; diff --git a/packages/taler-wallet-core/src/denominations.ts b/packages/taler-wallet-core/src/denominations.ts index 177070622..a539918de 100644 --- a/packages/taler-wallet-core/src/denominations.ts +++ b/packages/taler-wallet-core/src/denominations.ts @@ -14,6 +14,9 @@ GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ +/** + * Imports. + */ import { AbsoluteTime, AmountJson, diff --git a/packages/taler-wallet-core/src/deposits.ts b/packages/taler-wallet-core/src/deposits.ts index 617f32887..ed8778368 100644 --- a/packages/taler-wallet-core/src/deposits.ts +++ b/packages/taler-wallet-core/src/deposits.ts @@ -75,7 +75,7 @@ import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http"; import { selectPayCoinsNew } from "./coinSelection.js"; import { PendingTaskType, - TaskId, + TaskIdStr, TaskRunResult, TombstoneTag, TransactionContext, @@ -119,7 +119,8 @@ const logger = new Logger("deposits.ts"); export class DepositTransactionContext implements TransactionContext { readonly transactionId: TransactionIdStr; - readonly taskId: TaskId; + readonly taskId: TaskIdStr; + constructor( public ws: InternalWalletState, public depositGroupId: string, @@ -210,7 +211,8 @@ export class DepositTransactionContext implements TransactionContext { switch (dg.operationStatus) { case DepositOperationStatus.Finished: return undefined; - case DepositOperationStatus.PendingDeposit: { + case DepositOperationStatus.PendingDeposit: + case DepositOperationStatus.SuspendedDeposit: { dg.operationStatus = DepositOperationStatus.Aborting; await tx.depositGroups.put(dg); return { @@ -218,9 +220,6 @@ export class DepositTransactionContext implements TransactionContext { newTxState: computeDepositTransactionStatus(dg), }; } - case DepositOperationStatus.SuspendedDeposit: - // FIXME: Can we abort a suspended transaction?! - return undefined; } return undefined; }, @@ -410,74 +409,10 @@ export function computeDepositTransactionActions( } } -/** - * Check whether the refresh associated with the - * aborting deposit group is done. - * - * If done, mark the deposit transaction as aborted. - * - * Otherwise continue waiting. - * - * FIXME: Wait for the refresh group notifications instead of periodically - * checking the refresh group status. - * FIXME: This is just one transaction, can't we do this in the initial - * transaction of processDepositGroup? - */ -async function waitForRefreshOnDepositGroup( - ws: InternalWalletState, - depositGroup: DepositGroupRecord, -): Promise<TaskRunResult> { - const abortRefreshGroupId = depositGroup.abortRefreshGroupId; - checkLogicInvariant(!!abortRefreshGroupId); - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Deposit, - depositGroupId: depositGroup.depositGroupId, - }); - const transitionInfo = await ws.db.runReadWriteTx( - ["depositGroups", "refreshGroups"], - async (tx) => { - const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId); - let newOpState: DepositOperationStatus | undefined; - if (!refreshGroup) { - // Maybe it got manually deleted? Means that we should - // just go into aborted. - logger.warn("no aborting refresh group found for deposit group"); - newOpState = DepositOperationStatus.Aborted; - } else { - if (refreshGroup.operationStatus === RefreshOperationStatus.Finished) { - newOpState = DepositOperationStatus.Aborted; - } else if ( - refreshGroup.operationStatus === RefreshOperationStatus.Failed - ) { - newOpState = DepositOperationStatus.Aborted; - } - } - if (newOpState) { - const newDg = await tx.depositGroups.get(depositGroup.depositGroupId); - if (!newDg) { - return; - } - const oldTxState = computeDepositTransactionStatus(newDg); - newDg.operationStatus = newOpState; - const newTxState = computeDepositTransactionStatus(newDg); - await tx.depositGroups.put(newDg); - return { oldTxState, newTxState }; - } - return undefined; - }, - ); - - notifyTransition(ws, transactionId, transitionInfo); - ws.notify({ - type: NotificationType.BalanceChange, - hintTransactionId: transactionId, - }); - return TaskRunResult.backoff(); -} - async function refundDepositGroup( ws: InternalWalletState, depositGroup: DepositGroupRecord, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { const newTxPerCoin = [...depositGroup.statusPerCoin]; logger.info(`status per coin: ${j2s(depositGroup.statusPerCoin)}`); @@ -520,6 +455,7 @@ async function refundDepositGroup( const httpResp = await ws.http.fetch(refundUrl.href, { method: "POST", body: refundReq, + cancellationToken, }); logger.info( `coin ${i} refund HTTP status for coin: ${httpResp.status}`, @@ -600,15 +536,81 @@ async function refundDepositGroup( return TaskRunResult.backoff(); } +/** + * Check whether the refresh associated with the + * aborting deposit group is done. + * + * If done, mark the deposit transaction as aborted. + * + * Otherwise continue waiting. + * + * FIXME: Wait for the refresh group notifications instead of periodically + * checking the refresh group status. + * FIXME: This is just one transaction, can't we do this in the initial + * transaction of processDepositGroup? + */ +async function waitForRefreshOnDepositGroup( + ws: InternalWalletState, + depositGroup: DepositGroupRecord, +): Promise<TaskRunResult> { + const abortRefreshGroupId = depositGroup.abortRefreshGroupId; + checkLogicInvariant(!!abortRefreshGroupId); + const transactionId = constructTransactionIdentifier({ + tag: TransactionType.Deposit, + depositGroupId: depositGroup.depositGroupId, + }); + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups", "refreshGroups"], + async (tx) => { + const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId); + let newOpState: DepositOperationStatus | undefined; + if (!refreshGroup) { + // Maybe it got manually deleted? Means that we should + // just go into aborted. + logger.warn("no aborting refresh group found for deposit group"); + newOpState = DepositOperationStatus.Aborted; + } else { + if (refreshGroup.operationStatus === RefreshOperationStatus.Finished) { + newOpState = DepositOperationStatus.Aborted; + } else if ( + refreshGroup.operationStatus === RefreshOperationStatus.Failed + ) { + newOpState = DepositOperationStatus.Aborted; + } + } + if (newOpState) { + const newDg = await tx.depositGroups.get(depositGroup.depositGroupId); + if (!newDg) { + return; + } + const oldTxState = computeDepositTransactionStatus(newDg); + newDg.operationStatus = newOpState; + const newTxState = computeDepositTransactionStatus(newDg); + await tx.depositGroups.put(newDg); + return { oldTxState, newTxState }; + } + return undefined; + }, + ); + + notifyTransition(ws, transactionId, transitionInfo); + ws.notify({ + type: NotificationType.BalanceChange, + hintTransactionId: transactionId, + }); + return TaskRunResult.backoff(); +} + async function processDepositGroupAborting( ws: InternalWalletState, depositGroup: DepositGroupRecord, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { logger.info("processing deposit tx in 'aborting'"); const abortRefreshGroupId = depositGroup.abortRefreshGroupId; if (!abortRefreshGroupId) { logger.info("refunding deposit group"); - return refundDepositGroup(ws, depositGroup); + return refundDepositGroup(ws, depositGroup, cancellationToken); } logger.info("waiting for refresh"); return waitForRefreshOnDepositGroup(ws, depositGroup); @@ -1059,7 +1061,7 @@ export async function processDepositGroup( cancellationToken, ); case DepositOperationStatus.Aborting: - return processDepositGroupAborting(ws, depositGroup); + return processDepositGroupAborting(ws, depositGroup, cancellationToken); } return TaskRunResult.finished(); diff --git a/packages/taler-wallet-core/src/exchanges.ts b/packages/taler-wallet-core/src/exchanges.ts index a4732e474..4792c3c20 100644 --- a/packages/taler-wallet-core/src/exchanges.ts +++ b/packages/taler-wallet-core/src/exchanges.ts @@ -150,6 +150,7 @@ async function downloadExchangeWithTermsOfService( exchangeBaseUrl: string, http: HttpRequestLibrary, timeout: Duration, + cancellationToken: CancellationToken, acceptFormat: string, acceptLanguage: string | undefined, ): Promise<ExchangeTosDownloadResult> { @@ -169,6 +170,7 @@ async function downloadExchangeWithTermsOfService( const resp = await http.fetch(reqUrl.href, { headers, timeout, + cancellationToken, }); const tosText = await readSuccessResponseTextOrThrow(resp); const tosEtag = resp.headers.get("etag") || "unknown"; @@ -789,6 +791,7 @@ async function downloadTosFromAcceptedFormat( ws: InternalWalletState, baseUrl: string, timeout: Duration, + cancellationToken: CancellationToken, acceptedFormat?: string[], acceptLanguage?: string, ): Promise<ExchangeTosDownloadResult> { @@ -800,6 +803,7 @@ async function downloadTosFromAcceptedFormat( baseUrl, ws.http, timeout, + cancellationToken, format, acceptLanguage, ); @@ -816,6 +820,7 @@ async function downloadTosFromAcceptedFormat( baseUrl, ws.http, timeout, + cancellationToken, "text/plain", acceptLanguage, ); @@ -1256,6 +1261,7 @@ export async function updateExchangeFromUrlHandler( ws, exchangeBaseUrl, timeout, + cancellationToken, ["text/plain"], ); @@ -1632,6 +1638,7 @@ export async function getExchangeTos( ws, exchangeBaseUrl, getExchangeRequestTimeout(), + CancellationToken.CONTINUE, acceptedFormat, acceptLanguage, ); diff --git a/packages/taler-wallet-core/src/pay-merchant.ts b/packages/taler-wallet-core/src/pay-merchant.ts index 5e01ae716..09e9e1fb3 100644 --- a/packages/taler-wallet-core/src/pay-merchant.ts +++ b/packages/taler-wallet-core/src/pay-merchant.ts @@ -103,8 +103,8 @@ import { DbRetryInfo, PendingTaskType, spendCoins, - TaskId, TaskIdentifiers, + TaskIdStr, TaskRunResult, TaskRunResultType, TombstoneTag, @@ -150,7 +150,7 @@ const logger = new Logger("pay-merchant.ts"); export class PayMerchantTransactionContext implements TransactionContext { readonly transactionId: TransactionIdStr; - readonly taskId: TaskId; + readonly taskId: TaskIdStr; constructor( public ws: InternalWalletState, @@ -391,7 +391,8 @@ export class PayMerchantTransactionContext implements TransactionContext { } export class RefundTransactionContext implements TransactionContext { - public transactionId: string; + public transactionId: TransactionIdStr; + public taskId: TaskIdStr | undefined = undefined; constructor( public ws: InternalWalletState, public refundGroupId: string, @@ -612,6 +613,7 @@ export function extractContractData( async function processDownloadProposal( ws: InternalWalletState, proposalId: string, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { const proposal = await ws.db.runReadOnlyTx(["purchases"], async (tx) => { return await tx.purchases.get(proposalId); @@ -662,6 +664,7 @@ async function processDownloadProposal( method: "POST", body: requestBody, timeout: getProposalRequestTimeout(retryRecord?.retryInfo), + cancellationToken, }); const r = await readSuccessResponseJsonOrErrorCode( httpResponse, @@ -861,6 +864,7 @@ async function createOrReusePurchase( sessionId: string | undefined, claimToken: string | undefined, noncePriv: string | undefined, + cancellationToken: CancellationToken, ): Promise<string> { const oldProposals = await ws.db.runReadOnlyTx(["purchases"], async (tx) => { return tx.purchases.indexes.byUrlAndOrderId.getAll([ @@ -891,7 +895,11 @@ async function createOrReusePurchase( ); if (oldProposal.purchaseStatus === PurchaseStatus.DialogShared) { const download = await expectProposalDownload(ws, oldProposal); - const paid = await checkIfOrderIsAlreadyPaid(ws, download.contractData); + const paid = await checkIfOrderIsAlreadyPaid( + ws, + download.contractData, + cancellationToken, + ); logger.info(`old proposal paid: ${paid}`); if (paid) { // if this transaction was shared and the order is paid then it @@ -1466,6 +1474,7 @@ export async function preparePayForUri( uriResult.sessionId, uriResult.claimToken, uriResult.noncePriv, + CancellationToken.CONTINUE, ); await waitProposalDownloaded(ws, proposalId); @@ -1951,20 +1960,20 @@ export async function processPurchase( switch (purchase.purchaseStatus) { case PurchaseStatus.PendingDownloadingProposal: - return processDownloadProposal(ws, proposalId); + return processDownloadProposal(ws, proposalId, cancellationToken); case PurchaseStatus.PendingPaying: case PurchaseStatus.PendingPayingReplay: - return processPurchasePay(ws, proposalId); + return processPurchasePay(ws, proposalId, cancellationToken); case PurchaseStatus.PendingQueryingRefund: - return processPurchaseQueryRefund(ws, purchase); + return processPurchaseQueryRefund(ws, purchase, cancellationToken); case PurchaseStatus.PendingQueryingAutoRefund: - return processPurchaseAutoRefund(ws, purchase); + return processPurchaseAutoRefund(ws, purchase, cancellationToken); case PurchaseStatus.AbortingWithRefund: - return processPurchaseAbortingRefund(ws, purchase); + return processPurchaseAbortingRefund(ws, purchase, cancellationToken); case PurchaseStatus.PendingAcceptRefund: - return processPurchaseAcceptRefund(ws, purchase); + return processPurchaseAcceptRefund(ws, purchase, cancellationToken); case PurchaseStatus.DialogShared: - return processPurchaseDialogShared(ws, purchase); + return processPurchaseDialogShared(ws, purchase, cancellationToken); case PurchaseStatus.FailedClaim: case PurchaseStatus.Done: case PurchaseStatus.DoneRepurchaseDetected: @@ -1990,6 +1999,7 @@ export async function processPurchase( async function processPurchasePay( ws: InternalWalletState, proposalId: string, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { const purchase = await ws.db.runReadOnlyTx(["purchases"], async (tx) => { return tx.purchases.get(proposalId); @@ -2024,7 +2034,11 @@ async function processPurchasePay( const download = await expectProposalDownload(ws, purchase); if (purchase.shared) { - const paid = await checkIfOrderIsAlreadyPaid(ws, download.contractData); + const paid = await checkIfOrderIsAlreadyPaid( + ws, + download.contractData, + cancellationToken, + ); if (paid) { const transitionInfo = await ws.db.runReadWriteTx( @@ -2088,6 +2102,7 @@ async function processPurchasePay( method: "POST", body: reqBody, timeout: getPayRequestTimeout(purchase), + cancellationToken, }), ); @@ -2163,7 +2178,11 @@ async function processPurchasePay( }; logger.trace(`/paid request body: ${j2s(reqBody)}`); const resp = await ws.runSequentialized([EXCHANGE_COINS_LOCK], () => - ws.http.fetch(payAgainUrl, { method: "POST", body: reqBody }), + ws.http.fetch(payAgainUrl, { + method: "POST", + body: reqBody, + cancellationToken, + }), ); logger.trace(`/paid response status: ${resp.status}`); if ( @@ -2499,6 +2518,7 @@ export async function sharePayment( async function checkIfOrderIsAlreadyPaid( ws: InternalWalletState, contract: WalletContractData, + cancellationToken: CancellationToken, ) { const requestUrl = new URL( `orders/${contract.orderId}`, @@ -2508,7 +2528,9 @@ async function checkIfOrderIsAlreadyPaid( requestUrl.searchParams.set("timeout_ms", "1000"); - const resp = await ws.http.fetch(requestUrl.href); + const resp = await ws.http.fetch(requestUrl.href, { + cancellationToken, + }); if ( resp.status === HttpStatusCode.Ok || resp.status === HttpStatusCode.Accepted || @@ -2518,13 +2540,14 @@ async function checkIfOrderIsAlreadyPaid( } else if (resp.status === HttpStatusCode.PaymentRequired) { return false; } - //forbidden, not found, not acceptable + // forbidden, not found, not acceptable throw Error(`this order cant be paid: ${resp.status}`); } async function processPurchaseDialogShared( ws: InternalWalletState, purchase: PurchaseRecord, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { const proposalId = purchase.proposalId; logger.trace(`processing dialog-shared for proposal ${proposalId}`); @@ -2534,7 +2557,11 @@ async function processPurchaseDialogShared( return TaskRunResult.finished(); } - const paid = await checkIfOrderIsAlreadyPaid(ws, download.contractData); + const paid = await checkIfOrderIsAlreadyPaid( + ws, + download.contractData, + cancellationToken, + ); if (paid) { const transitionInfo = await ws.db.runReadWriteTx( ["purchases"], @@ -2565,6 +2592,7 @@ async function processPurchaseDialogShared( async function processPurchaseAutoRefund( ws: InternalWalletState, purchase: PurchaseRecord, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { const proposalId = purchase.proposalId; logger.trace(`processing auto-refund for proposal ${proposalId}`); @@ -2624,7 +2652,9 @@ async function processPurchaseAutoRefund( requestUrl.searchParams.set("timeout_ms", "1000"); requestUrl.searchParams.set("await_refund_obtained", "yes"); - const resp = await ws.http.fetch(requestUrl.href); + const resp = await ws.http.fetch(requestUrl.href, { + cancellationToken, + }); // FIXME: Check other status codes! @@ -2661,6 +2691,7 @@ async function processPurchaseAutoRefund( async function processPurchaseAbortingRefund( ws: InternalWalletState, purchase: PurchaseRecord, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { const proposalId = purchase.proposalId; const download = await expectProposalDownload(ws, purchase); @@ -2701,6 +2732,7 @@ async function processPurchaseAbortingRefund( const abortHttpResp = await ws.http.fetch(requestUrl.href, { method: "POST", body: abortReq, + cancellationToken, }); if (abortHttpResp.status === HttpStatusCode.NotFound) { @@ -2753,6 +2785,7 @@ async function processPurchaseAbortingRefund( async function processPurchaseQueryRefund( ws: InternalWalletState, purchase: PurchaseRecord, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { const proposalId = purchase.proposalId; logger.trace(`processing query-refund for proposal ${proposalId}`); @@ -2768,7 +2801,9 @@ async function processPurchaseQueryRefund( download.contractData.contractTermsHash, ); - const resp = await ws.http.fetch(requestUrl.href); + const resp = await ws.http.fetch(requestUrl.href, { + cancellationToken, + }); const orderStatus = await readSuccessResponseJsonOrThrow( resp, codecForMerchantOrderStatusPaid(), @@ -2834,6 +2869,7 @@ async function processPurchaseQueryRefund( async function processPurchaseAcceptRefund( ws: InternalWalletState, purchase: PurchaseRecord, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { const download = await expectProposalDownload(ws, purchase); @@ -2849,6 +2885,7 @@ async function processPurchaseAcceptRefund( body: { h_contract: download.contractData.contractTermsHash, }, + cancellationToken, }); const refundResponse = await readSuccessResponseJsonOrThrow( diff --git a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts index e764d2169..7774dfd5f 100644 --- a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts +++ b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts @@ -54,7 +54,7 @@ import { import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http"; import { PendingTaskType, - TaskId, + TaskIdStr, TaskRunResult, TaskRunResultType, TombstoneTag, @@ -91,13 +91,13 @@ const logger = new Logger("pay-peer-pull-credit.ts"); export class PeerPullCreditTransactionContext implements TransactionContext { readonly transactionId: TransactionIdStr; - readonly retryTag: TaskId; + readonly taskId: TaskIdStr; constructor( public ws: InternalWalletState, public pursePub: string, ) { - this.retryTag = constructTaskIdentifier({ + this.taskId = constructTaskIdentifier({ tag: PendingTaskType.PeerPullCredit, pursePub, }); @@ -138,7 +138,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext { } async suspendTransaction(): Promise<void> { - const { ws, pursePub, retryTag, transactionId } = this; + const { ws, pursePub, taskId: retryTag, transactionId } = this; const transitionInfo = await ws.db.runReadWriteTx( ["peerPullCredit"], async (tx) => { @@ -198,7 +198,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext { } async failTransaction(): Promise<void> { - const { ws, pursePub, retryTag, transactionId } = this; + const { ws, pursePub, taskId: retryTag, transactionId } = this; const transitionInfo = await ws.db.runReadWriteTx( ["peerPullCredit"], async (tx) => { @@ -249,7 +249,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext { } async resumeTransaction(): Promise<void> { - const { ws, pursePub, retryTag, transactionId } = this; + const { ws, pursePub, taskId: retryTag, transactionId } = this; const transitionInfo = await ws.db.runReadWriteTx( ["peerPullCredit"], async (tx) => { @@ -308,7 +308,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext { } async abortTransaction(): Promise<void> { - const { ws, pursePub, retryTag, transactionId } = this; + const { ws, pursePub, taskId: retryTag, transactionId } = this; const transitionInfo = await ws.db.runReadWriteTx( ["peerPullCredit"], async (tx) => { @@ -529,6 +529,7 @@ async function longpollKycStatus( async function processPeerPullCreditAbortingDeletePurse( ws: InternalWalletState, peerPullIni: PeerPullCreditRecord, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { const { pursePub, pursePriv } = peerPullIni; const transactionId = constructTransactionIdentifier({ @@ -545,6 +546,7 @@ async function processPeerPullCreditAbortingDeletePurse( headers: { "taler-purse-signature": sigResp.sig, }, + cancellationToken, }); logger.info(`deleted purse with response status ${resp.status}`); @@ -637,6 +639,7 @@ async function handlePeerPullCreditWithdrawing( async function handlePeerPullCreditCreatePurse( ws: InternalWalletState, pullIni: PeerPullCreditRecord, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { const purseFee = Amounts.stringify(Amounts.zeroOfAmount(pullIni.amount)); const pursePub = pullIni.pursePub; @@ -717,13 +720,19 @@ async function handlePeerPullCreditCreatePurse( const httpResp = await ws.http.fetch(reservePurseMergeUrl.href, { method: "POST", body: reservePurseReqBody, + cancellationToken, }); if (httpResp.status === HttpStatusCode.UnavailableForLegalReasons) { const respJson = await httpResp.json(); const kycPending = codecForWalletKycUuid().decode(respJson); logger.info(`kyc uuid response: ${j2s(kycPending)}`); - return processPeerPullCreditKycRequired(ws, pullIni, kycPending); + return processPeerPullCreditKycRequired( + ws, + pullIni, + kycPending, + cancellationToken, + ); } const resp = await readSuccessResponseJsonOrThrow(httpResp, codecForAny()); @@ -792,9 +801,13 @@ export async function processPeerPullCredit( ); } case PeerPullPaymentCreditStatus.PendingCreatePurse: - return handlePeerPullCreditCreatePurse(ws, pullIni); + return handlePeerPullCreditCreatePurse(ws, pullIni, cancellationToken); case PeerPullPaymentCreditStatus.AbortingDeletePurse: - return await processPeerPullCreditAbortingDeletePurse(ws, pullIni); + return await processPeerPullCreditAbortingDeletePurse( + ws, + pullIni, + cancellationToken, + ); case PeerPullPaymentCreditStatus.PendingWithdrawing: return handlePeerPullCreditWithdrawing(ws, pullIni); case PeerPullPaymentCreditStatus.Aborted: @@ -817,6 +830,7 @@ async function processPeerPullCreditKycRequired( ws: InternalWalletState, peerIni: PeerPullCreditRecord, kycPending: WalletKycUuid, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { const transactionId = constructTransactionIdentifier({ tag: TransactionType.PeerPullCredit, @@ -833,6 +847,7 @@ async function processPeerPullCreditKycRequired( logger.info(`kyc url ${url.href}`); const kycStatusRes = await ws.http.fetch(url.href, { method: "GET", + cancellationToken, }); if ( @@ -1080,7 +1095,7 @@ export async function initiatePeerPullPayment( }); notifyTransition(ws, ctx.transactionId, transitionInfo); - ws.taskScheduler.startShepherdTask(ctx.retryTag); + ws.taskScheduler.startShepherdTask(ctx.taskId); return { talerUri: stringifyTalerUri({ diff --git a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts index 9fa7eb575..30bd1a2c8 100644 --- a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts +++ b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts @@ -68,7 +68,7 @@ import { import { PeerCoinRepair, selectPeerCoins } from "./coinSelection.js"; import { PendingTaskType, - TaskId, + TaskIdStr, TaskRunResult, TaskRunResultType, TransactionContext, @@ -105,7 +105,7 @@ const logger = new Logger("pay-peer-pull-debit.ts"); export class PeerPullDebitTransactionContext implements TransactionContext { ws: InternalWalletState; readonly transactionId: TransactionIdStr; - readonly taskId: TaskId; + readonly taskId: TaskIdStr; peerPullDebitId: string; constructor(ws: InternalWalletState, peerPullDebitId: string) { diff --git a/packages/taler-wallet-core/src/pay-peer-push-credit.ts b/packages/taler-wallet-core/src/pay-peer-push-credit.ts index 42a5b19df..e629bffe4 100644 --- a/packages/taler-wallet-core/src/pay-peer-push-credit.ts +++ b/packages/taler-wallet-core/src/pay-peer-push-credit.ts @@ -31,6 +31,7 @@ import { TalerPreciseTimestamp, TalerProtocolTimestamp, TransactionAction, + TransactionIdStr, TransactionMajorState, TransactionMinorState, TransactionState, @@ -55,7 +56,7 @@ import { import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http"; import { PendingTaskType, - TaskId, + TaskIdStr, TaskRunResult, TaskRunResultType, TombstoneTag, @@ -93,8 +94,8 @@ import { const logger = new Logger("pay-peer-push-credit.ts"); export class PeerPushCreditTransactionContext implements TransactionContext { - readonly transactionId: string; - readonly retryTag: TaskId; + readonly transactionId: TransactionIdStr; + readonly taskId: TaskIdStr; constructor( public ws: InternalWalletState, @@ -104,7 +105,7 @@ export class PeerPushCreditTransactionContext implements TransactionContext { tag: TransactionType.PeerPushCredit, peerPushCreditId, }); - this.retryTag = constructTaskIdentifier({ + this.taskId = constructTaskIdentifier({ tag: PendingTaskType.PeerPushCredit, peerPushCreditId, }); @@ -140,7 +141,7 @@ export class PeerPushCreditTransactionContext implements TransactionContext { } async suspendTransaction(): Promise<void> { - const { ws, peerPushCreditId, retryTag, transactionId } = this; + const { ws, peerPushCreditId, taskId: retryTag, transactionId } = this; const transitionInfo = await ws.db.runReadWriteTx( ["peerPushCredit"], async (tx) => { @@ -194,7 +195,7 @@ export class PeerPushCreditTransactionContext implements TransactionContext { } async abortTransaction(): Promise<void> { - const { ws, peerPushCreditId, retryTag, transactionId } = this; + const { ws, peerPushCreditId, taskId: retryTag, transactionId } = this; const transitionInfo = await ws.db.runReadWriteTx( ["peerPushCredit"], async (tx) => { @@ -251,7 +252,7 @@ export class PeerPushCreditTransactionContext implements TransactionContext { } async resumeTransaction(): Promise<void> { - const { ws, peerPushCreditId, retryTag, transactionId } = this; + const { ws, peerPushCreditId, taskId: retryTag, transactionId } = this; const transitionInfo = await ws.db.runReadWriteTx( ["peerPushCredit"], async (tx) => { @@ -304,7 +305,7 @@ export class PeerPushCreditTransactionContext implements TransactionContext { } async failTransaction(): Promise<void> { - const { ws, peerPushCreditId, retryTag, transactionId } = this; + const { ws, peerPushCreditId, taskId: retryTag, transactionId } = this; const transitionInfo = await ws.db.runReadWriteTx( ["peerPushCredit"], async (tx) => { @@ -575,6 +576,7 @@ async function processPeerPushCreditKycRequired( ws: InternalWalletState, peerInc: PeerPushPaymentIncomingRecord, kycPending: WalletKycUuid, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { const transactionId = constructTransactionIdentifier({ tag: TransactionType.PeerPushCredit, @@ -591,6 +593,7 @@ async function processPeerPushCreditKycRequired( logger.info(`kyc url ${url.href}`); const kycStatusRes = await ws.http.fetch(url.href, { method: "GET", + cancellationToken, }); if ( @@ -651,6 +654,7 @@ async function handlePendingMerge( ws: InternalWalletState, peerInc: PeerPushPaymentIncomingRecord, contractTerms: PeerContractTerms, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { const { peerPushCreditId } = peerInc; const transactionId = constructTransactionIdentifier({ @@ -705,7 +709,12 @@ async function handlePendingMerge( const respJson = await mergeHttpResp.json(); const kycPending = codecForWalletKycUuid().decode(respJson); logger.info(`kyc uuid response: ${j2s(kycPending)}`); - return processPeerPushCreditKycRequired(ws, peerInc, kycPending); + return processPeerPushCreditKycRequired( + ws, + peerInc, + kycPending, + cancellationToken, + ); } logger.trace(`merge request: ${j2s(mergeReq)}`); @@ -887,7 +896,7 @@ export async function processPeerPushCredit( } case PeerPushCreditStatus.PendingMerge: - return handlePendingMerge(ws, peerInc, contractTerms); + return handlePendingMerge(ws, peerInc, contractTerms, cancellationToken); case PeerPushCreditStatus.PendingWithdrawing: return handlePendingWithdrawing(ws, peerInc); @@ -940,7 +949,7 @@ export async function confirmPeerPushCredit( const ctx = new PeerPushCreditTransactionContext(ws, peerPushCreditId); - ws.taskScheduler.startShepherdTask(ctx.retryTag); + ws.taskScheduler.startShepherdTask(ctx.taskId); const transactionId = constructTransactionIdentifier({ tag: TransactionType.PeerPushCredit, diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/pay-peer-push-debit.ts index 1bb3b8772..40a5d97a4 100644 --- a/packages/taler-wallet-core/src/pay-peer-push-debit.ts +++ b/packages/taler-wallet-core/src/pay-peer-push-debit.ts @@ -52,7 +52,7 @@ import { import { PeerCoinRepair, selectPeerCoins } from "./coinSelection.js"; import { PendingTaskType, - TaskId, + TaskIdStr, TaskRunResult, TaskRunResultType, TransactionContext, @@ -84,7 +84,7 @@ const logger = new Logger("pay-peer-push-debit.ts"); export class PeerPushDebitTransactionContext implements TransactionContext { readonly transactionId: TransactionIdStr; - readonly retryTag: TaskId; + readonly taskId: TaskIdStr; constructor( public ws: InternalWalletState, @@ -94,7 +94,7 @@ export class PeerPushDebitTransactionContext implements TransactionContext { tag: TransactionType.PeerPushDebit, pursePub, }); - this.retryTag = constructTaskIdentifier({ + this.taskId = constructTaskIdentifier({ tag: PendingTaskType.PeerPushDebit, pursePub, }); @@ -112,7 +112,7 @@ export class PeerPushDebitTransactionContext implements TransactionContext { } async suspendTransaction(): Promise<void> { - const { ws, pursePub, transactionId, retryTag } = this; + const { ws, pursePub, transactionId, taskId: retryTag } = this; const transitionInfo = await ws.db.runReadWriteTx( ["peerPushDebit"], async (tx) => { @@ -170,7 +170,7 @@ export class PeerPushDebitTransactionContext implements TransactionContext { } async abortTransaction(): Promise<void> { - const { ws, pursePub, transactionId, retryTag } = this; + const { ws, pursePub, transactionId, taskId: retryTag } = this; const transitionInfo = await ws.db.runReadWriteTx( ["peerPushDebit"], async (tx) => { @@ -224,7 +224,7 @@ export class PeerPushDebitTransactionContext implements TransactionContext { } async resumeTransaction(): Promise<void> { - const { ws, pursePub, transactionId, retryTag } = this; + const { ws, pursePub, transactionId, taskId: retryTag } = this; const transitionInfo = await ws.db.runReadWriteTx( ["peerPushDebit"], async (tx) => { @@ -282,7 +282,7 @@ export class PeerPushDebitTransactionContext implements TransactionContext { } async failTransaction(): Promise<void> { - const { ws, pursePub, transactionId, retryTag } = this; + const { ws, pursePub, transactionId, taskId: retryTag } = this; const transitionInfo = await ws.db.runReadWriteTx( ["peerPushDebit"], async (tx) => { @@ -440,6 +440,7 @@ async function handlePurseCreationConflict( async function processPeerPushDebitCreateReserve( ws: InternalWalletState, peerPushInitiation: PeerPushDebitRecord, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { const pursePub = peerPushInitiation.pursePub; const purseExpiration = peerPushInitiation.purseExpiration; @@ -519,6 +520,7 @@ async function processPeerPushDebitCreateReserve( const httpResp = await ws.http.fetch(createPurseUrl.href, { method: "POST", body: reqBody, + cancellationToken, }); { @@ -563,6 +565,7 @@ async function processPeerPushDebitCreateReserve( async function processPeerPushDebitAbortingDeletePurse( ws: InternalWalletState, peerPushInitiation: PeerPushDebitRecord, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { const { pursePub, pursePriv } = peerPushInitiation; const transactionId = constructTransactionIdentifier({ @@ -582,6 +585,7 @@ async function processPeerPushDebitAbortingDeletePurse( headers: { "taler-purse-signature": sigResp.sig, }, + cancellationToken, }); logger.info(`deleted purse with response status ${resp.status}`); @@ -886,7 +890,11 @@ export async function processPeerPushDebit( switch (peerPushInitiation.status) { case PeerPushDebitStatus.PendingCreatePurse: - return processPeerPushDebitCreateReserve(ws, peerPushInitiation); + return processPeerPushDebitCreateReserve( + ws, + peerPushInitiation, + cancellationToken, + ); case PeerPushDebitStatus.PendingReady: return processPeerPushDebitReady( ws, @@ -894,7 +902,11 @@ export async function processPeerPushDebit( cancellationToken, ); case PeerPushDebitStatus.AbortingDeletePurse: - return processPeerPushDebitAbortingDeletePurse(ws, peerPushInitiation); + return processPeerPushDebitAbortingDeletePurse( + ws, + peerPushInitiation, + cancellationToken, + ); case PeerPushDebitStatus.AbortingRefreshDeleted: return processPeerPushDebitAbortingRefreshDeleted(ws, peerPushInitiation); case PeerPushDebitStatus.AbortingRefreshExpired: @@ -1028,7 +1040,7 @@ export async function initiatePeerPushDebit( hintTransactionId: transactionId, }); - ws.taskScheduler.startShepherdTask(ctx.retryTag); + ws.taskScheduler.startShepherdTask(ctx.taskId); return { contractPriv: contractKeyPair.priv, diff --git a/packages/taler-wallet-core/src/recoup.ts b/packages/taler-wallet-core/src/recoup.ts index 6f1546d57..0ec71f4e7 100644 --- a/packages/taler-wallet-core/src/recoup.ts +++ b/packages/taler-wallet-core/src/recoup.ts @@ -31,6 +31,7 @@ import { Logger, RefreshReason, TalerPreciseTimestamp, + TransactionIdStr, TransactionType, URL, checkDbInvariant, @@ -43,6 +44,7 @@ import { import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http"; import { PendingTaskType, + TaskIdStr, TaskRunResult, TransactionContext, constructTaskIdentifier, @@ -432,8 +434,8 @@ export class RewardTransactionContext implements TransactionContext { deleteTransaction(): Promise<void> { throw new Error("Method not implemented."); } - public transactionId: string; - public retryTag: string; + public transactionId: TransactionIdStr; + public taskId: TaskIdStr; constructor( public ws: InternalWalletState, @@ -443,7 +445,7 @@ export class RewardTransactionContext implements TransactionContext { tag: TransactionType.Recoup, recoupGroupId, }); - this.retryTag = constructTaskIdentifier({ + this.taskId = constructTaskIdentifier({ tag: PendingTaskType.Recoup, recoupGroupId, }); diff --git a/packages/taler-wallet-core/src/refresh.ts b/packages/taler-wallet-core/src/refresh.ts index 3b75ae2f3..f1ee84f3e 100644 --- a/packages/taler-wallet-core/src/refresh.ts +++ b/packages/taler-wallet-core/src/refresh.ts @@ -52,6 +52,7 @@ import { TalerErrorDetail, TalerPreciseTimestamp, TransactionAction, + TransactionIdStr, TransactionMajorState, TransactionState, TransactionType, @@ -68,7 +69,7 @@ import { makeCoinAvailable, makeCoinsVisible, PendingTaskType, - TaskId, + TaskIdStr, TaskRunResult, TaskRunResultType, TombstoneTag, @@ -104,8 +105,8 @@ import { getCandidateWithdrawalDenomsTx } from "./withdraw.js"; const logger = new Logger("refresh.ts"); export class RefreshTransactionContext implements TransactionContext { - public transactionId: string; - readonly taskId: TaskId; + public transactionId: TransactionIdStr; + readonly taskId: TaskIdStr; constructor( public ws: InternalWalletState, @@ -493,6 +494,7 @@ async function refreshMelt( ws: InternalWalletState, refreshGroupId: string, coinIndex: number, + cancellationToken: CancellationToken, ): Promise<void> { const d = await ws.db.runReadWriteTx( ["refreshGroups", "refreshSessions", "coins", "denominations"], @@ -606,6 +608,7 @@ async function refreshMelt( method: "POST", body: meltReqBody, timeout: getRefreshRequestTimeout(refreshGroup), + cancellationToken, }); }); @@ -687,6 +690,7 @@ async function refreshMelt( headers: { "Taler-Coin-History-Signature": historySig.sig, }, + cancellationToken, }); const historyJson = await historyResp.json(); @@ -789,6 +793,7 @@ async function refreshReveal( ws: InternalWalletState, refreshGroupId: string, coinIndex: number, + cancellationToken: CancellationToken, ): Promise<void> { logger.trace( `doing refresh reveal for ${refreshGroupId} (old coin ${coinIndex})`, @@ -913,6 +918,7 @@ async function refreshReveal( body: req, method: "POST", timeout: getRefreshRequestTimeout(refreshGroup), + cancellationToken, }); }); @@ -1026,26 +1032,28 @@ export async function processRefreshGroup( let errors: TalerErrorDetail[] = []; let inShutdown = false; const ps = refreshGroup.oldCoinPubs.map((x, i) => - processRefreshSession(ws, refreshGroupId, i).catch((x) => { - if (x instanceof CryptoApiStoppedError) { - inShutdown = true; - logger.info( - "crypto API stopped while processing refresh group, probably the wallet is currently shutting down.", - ); - return; - } - if (x instanceof TalerError) { - logger.warn("process refresh session got exception (TalerError)"); - logger.warn(`exc ${x}`); - logger.warn(`exc stack ${x.stack}`); - logger.warn(`error detail: ${j2s(x.errorDetail)}`); - } else { - logger.warn("process refresh session got exception"); - logger.warn(`exc ${x}`); - logger.warn(`exc stack ${x.stack}`); - } - errors.push(getErrorDetailFromException(x)); - }), + processRefreshSession(ws, refreshGroupId, i, cancellationToken).catch( + (x) => { + if (x instanceof CryptoApiStoppedError) { + inShutdown = true; + logger.info( + "crypto API stopped while processing refresh group, probably the wallet is currently shutting down.", + ); + return; + } + if (x instanceof TalerError) { + logger.warn("process refresh session got exception (TalerError)"); + logger.warn(`exc ${x}`); + logger.warn(`exc stack ${x.stack}`); + logger.warn(`error detail: ${j2s(x.errorDetail)}`); + } else { + logger.warn("process refresh session got exception"); + logger.warn(`exc ${x}`); + logger.warn(`exc stack ${x.stack}`); + } + errors.push(getErrorDetailFromException(x)); + }, + ), ); try { logger.info("waiting for refreshes"); @@ -1078,6 +1086,7 @@ async function processRefreshSession( ws: InternalWalletState, refreshGroupId: string, coinIndex: number, + cancellationToken: CancellationToken, ): Promise<void> { logger.trace( `processing refresh session for coin ${coinIndex} of group ${refreshGroupId}`, @@ -1109,9 +1118,9 @@ async function processRefreshSession( return; } if (refreshSession.norevealIndex === undefined) { - await refreshMelt(ws, refreshGroupId, coinIndex); + await refreshMelt(ws, refreshGroupId, coinIndex, cancellationToken); } - await refreshReveal(ws, refreshGroupId, coinIndex); + await refreshReveal(ws, refreshGroupId, coinIndex, cancellationToken); } export interface RefreshOutputInfo { diff --git a/packages/taler-wallet-core/src/reward.ts b/packages/taler-wallet-core/src/reward.ts index 6bfd3b324..51eb0f5bd 100644 --- a/packages/taler-wallet-core/src/reward.ts +++ b/packages/taler-wallet-core/src/reward.ts @@ -31,6 +31,7 @@ import { } from "@gnu-taler/taler-util"; import { PendingTaskType, + TaskIdStr, TaskRunResult, TombstoneTag, TransactionContext, @@ -46,8 +47,8 @@ import { InternalWalletState } from "./wallet.js"; const logger = new Logger("operations/tip.ts"); export class RewardTransactionContext implements TransactionContext { - public transactionId: string; - public taskId: string; + public transactionId: TransactionIdStr; + public taskId: TaskIdStr; constructor( public ws: InternalWalletState, diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts index e8fddfc73..2fd260b11 100644 --- a/packages/taler-wallet-core/src/shepherd.ts +++ b/packages/taler-wallet-core/src/shepherd.ts @@ -41,7 +41,7 @@ import { processBackupForProvider } from "./backup/index.js"; import { DbRetryInfo, PendingTaskType, - TaskId, + TaskIdStr, TaskRunResult, TaskRunResultType, constructTaskIdentifier, @@ -104,7 +104,7 @@ function taskGivesLiveness(taskId: string): boolean { } export class TaskScheduler { - private sheps: Map<TaskId, ShepherdInfo> = new Map(); + private sheps: Map<TaskIdStr, ShepherdInfo> = new Map(); private iterCond = new AsyncCondition(); @@ -155,7 +155,7 @@ export class TaskScheduler { logger.info("Done with task loop."); } - startShepherdTask(taskId: TaskId): void { + startShepherdTask(taskId: TaskIdStr): void { // Run in the background, no await! this.internalStartShepherdTask(taskId); } @@ -176,7 +176,7 @@ export class TaskScheduler { } } - private async internalStartShepherdTask(taskId: TaskId): Promise<void> { + private async internalStartShepherdTask(taskId: TaskIdStr): Promise<void> { logger.trace(`Starting to shepherd task ${taskId}`); const oldShep = this.sheps.get(taskId); if (oldShep) { @@ -197,7 +197,7 @@ export class TaskScheduler { } } - stopShepherdTask(taskId: TaskId): void { + stopShepherdTask(taskId: TaskIdStr): void { logger.trace(`Stopping shepherding of ${taskId}`); const oldShep = this.sheps.get(taskId); if (oldShep) { @@ -208,12 +208,12 @@ export class TaskScheduler { } } - restartShepherdTask(taskId: TaskId): void { + restartShepherdTask(taskId: TaskIdStr): void { this.stopShepherdTask(taskId); this.startShepherdTask(taskId); } - async resetTaskRetries(taskId: TaskId): Promise<void> { + async resetTaskRetries(taskId: TaskIdStr): Promise<void> { const maybeNotification = await this.ws.db.runAllStoresReadWriteTx( async (tx) => { await tx.operationRetries.delete(taskId); @@ -228,7 +228,7 @@ export class TaskScheduler { } private async wait( - taskId: TaskId, + taskId: TaskIdStr, info: ShepherdInfo, delay: Duration, ): Promise<void> { @@ -240,7 +240,7 @@ export class TaskScheduler { } private async internalShepherdTask( - taskId: TaskId, + taskId: TaskIdStr, info: ShepherdInfo, ): Promise<void> { while (true) { @@ -423,7 +423,7 @@ async function storePendingTaskFinished( async function runTaskWithErrorReporting( ws: InternalWalletState, - opId: TaskId, + opId: TaskIdStr, f: () => Promise<TaskRunResult>, ): Promise<TaskRunResult> { let maybeError: TalerErrorDetail | undefined; @@ -510,7 +510,7 @@ async function runTaskWithErrorReporting( async function callOperationHandlerForTaskId( ws: InternalWalletState, - taskId: TaskId, + taskId: TaskIdStr, cancellationToken: CancellationToken, ): Promise<TaskRunResult> { const pending = parseTaskIdentifier(taskId); @@ -741,7 +741,7 @@ function convertTaskToTransactionId( } export interface ActiveTaskIdsResult { - taskIds: TaskId[]; + taskIds: TaskIdStr[]; } export async function getActiveTaskIds( diff --git a/packages/taler-wallet-core/src/transactions.ts b/packages/taler-wallet-core/src/transactions.ts index 7d54ca980..d7f0c0d18 100644 --- a/packages/taler-wallet-core/src/transactions.ts +++ b/packages/taler-wallet-core/src/transactions.ts @@ -54,8 +54,8 @@ import { import { constructTaskIdentifier, PendingTaskType, - TaskId, TaskIdentifiers, + TaskIdStr, TransactionContext, } from "./common.js"; import { @@ -1622,7 +1622,9 @@ export function parseTransactionIdentifier( } } -function maybeTaskFromTransaction(transactionId: string): TaskId | undefined { +function maybeTaskFromTransaction( + transactionId: string, +): TaskIdStr | undefined { const parsedTx = parseTransactionIdentifier(transactionId); if (!parsedTx) { @@ -1786,6 +1788,9 @@ export async function deleteTransaction( ): Promise<void> { const ctx = await getContextForTransaction(ws, transactionId); await ctx.deleteTransaction(); + if (ctx.taskId) { + ws.taskScheduler.stopShepherdTask(ctx.taskId); + } } export async function abortTransaction( diff --git a/packages/taler-wallet-core/src/withdraw.ts b/packages/taler-wallet-core/src/withdraw.ts index 2d9f5c35c..a54295613 100644 --- a/packages/taler-wallet-core/src/withdraw.ts +++ b/packages/taler-wallet-core/src/withdraw.ts @@ -100,7 +100,7 @@ import { } from "./coinSelection.js"; import { PendingTaskType, - TaskId, + TaskIdStr, TaskRunResult, TaskRunResultType, TombstoneTag, @@ -155,7 +155,7 @@ const logger = new Logger("operations/withdraw.ts"); export class WithdrawTransactionContext implements TransactionContext { readonly transactionId: TransactionIdStr; - readonly taskId: TaskId; + readonly taskId: TaskIdStr; constructor( public ws: InternalWalletState, @@ -799,6 +799,7 @@ async function handleKycRequired( resp: HttpResponse, startIdx: number, requestCoinIdxs: number[], + cancellationToken: CancellationToken, ): Promise<void> { logger.info("withdrawal requires KYC"); const respJson = await resp.json(); @@ -822,6 +823,7 @@ async function handleKycRequired( logger.info(`kyc url ${url.href}`); const kycStatusRes = await ws.http.fetch(url.href, { method: "GET", + cancellationToken, }); let kycUrl: string; let amlStatus: ExchangeAmlStatus | undefined; @@ -1003,7 +1005,14 @@ async function processPlanchetExchangeBatchRequest( timeout: Duration.fromSpec({ seconds: 40 }), }); if (resp.status === HttpStatusCode.UnavailableForLegalReasons) { - await handleKycRequired(ws, withdrawalGroup, resp, 0, requestCoinIdxs); + await handleKycRequired( + ws, + withdrawalGroup, + resp, + 0, + requestCoinIdxs, + cancellationToken, + ); return { batchResp: { ev_sigs: [] }, coinIdxs: [], @@ -1359,6 +1368,7 @@ interface WithdrawalGroupContext { async function processWithdrawalGroupAbortingBank( ws: InternalWalletState, withdrawalGroup: WithdrawalGroupRecord, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { const { withdrawalGroupId } = withdrawalGroup; const transactionId = constructTransactionIdentifier({ @@ -1375,6 +1385,7 @@ async function processWithdrawalGroupAbortingBank( const abortResp = await ws.http.fetch(abortUrl, { method: "POST", body: {}, + cancellationToken, }); logger.info(`abort response status: ${abortResp.status}`); @@ -1708,7 +1719,11 @@ export async function processWithdrawalGroup( cancellationToken, ); case WithdrawalGroupStatus.AbortingBank: - return await processWithdrawalGroupAbortingBank(ws, withdrawalGroup); + return await processWithdrawalGroupAbortingBank( + ws, + withdrawalGroup, + cancellationToken, + ); case WithdrawalGroupStatus.AbortedBank: case WithdrawalGroupStatus.AbortedExchange: case WithdrawalGroupStatus.FailedAbortingBank: @@ -1749,10 +1764,14 @@ export async function getExchangeWithdrawalInfo( ); } - const withdrawalAccountsList = await fetchWithdrawalAccountInfo(ws, { - exchange, - instructedAmount, - }); + const withdrawalAccountsList = await fetchWithdrawalAccountInfo( + ws, + { + exchange, + instructedAmount, + }, + CancellationToken.CONTINUE, + ); logger.trace("updating withdrawal denoms"); await updateWithdrawalDenoms(ws, exchangeBaseUrl); @@ -2069,6 +2088,7 @@ export function getBankAbortUrl(talerWithdrawUri: string): string { async function registerReserveWithBank( ws: InternalWalletState, withdrawalGroupId: string, + cancellationToken: CancellationToken, ): Promise<void> { const withdrawalGroup = await ws.db.runReadOnlyTx( ["withdrawalGroups"], @@ -2106,6 +2126,7 @@ async function registerReserveWithBank( method: "POST", body: reqBody, timeout: getReserveRequestTimeout(withdrawalGroup), + cancellationToken, }); const status = await readSuccessResponseJsonOrThrow( httpResp, @@ -2231,7 +2252,7 @@ async function processBankRegisterReserve( // FIXME: Put confirm transfer URL in the DB! - await registerReserveWithBank(ws, withdrawalGroupId); + await registerReserveWithBank(ws, withdrawalGroupId, cancellationToken); return TaskRunResult.progress(); } @@ -2631,10 +2652,14 @@ export async function acceptWithdrawalFromUri( const exchange = await fetchFreshExchange(ws, selectedExchange); - const withdrawalAccountList = await fetchWithdrawalAccountInfo(ws, { - exchange, - instructedAmount: withdrawInfo.amount, - }); + const withdrawalAccountList = await fetchWithdrawalAccountInfo( + ws, + { + exchange, + instructedAmount: withdrawInfo.amount, + }, + CancellationToken.CONTINUE, + ); const withdrawalGroup = await internalCreateWithdrawalGroup(ws, { amount: withdrawInfo.amount, @@ -2762,7 +2787,8 @@ async function fetchAccount( ws: InternalWalletState, instructedAmount: AmountJson, acct: ExchangeWireAccount, - reservePub?: string, + reservePub: string | undefined, + cancellationToken: CancellationToken, ): Promise<WithdrawalExchangeAccountDetails> { let paytoUri: string; let transferAmount: AmountString | undefined = undefined; @@ -2773,7 +2799,9 @@ async function fetchAccount( "amount_credit", Amounts.stringify(instructedAmount), ); - const httpResp = await ws.http.fetch(reqUrl.href); + const httpResp = await ws.http.fetch(reqUrl.href, { + cancellationToken, + }); const respOrErr = await readSuccessResponseJsonOrErrorCode( httpResp, codecForCashinConversionResponse(), @@ -2789,7 +2817,9 @@ async function fetchAccount( paytoUri = acct.payto_uri; transferAmount = resp.amount_debit; const configUrl = new URL("config", acct.conversion_url); - const configResp = await ws.http.fetch(configUrl.href); + const configResp = await ws.http.fetch(configUrl.href, { + cancellationToken, + }); const configRespOrError = await readSuccessResponseJsonOrErrorCode( configResp, codecForConversionBankConfig(), @@ -2840,6 +2870,7 @@ async function fetchWithdrawalAccountInfo( instructedAmount: AmountJson; reservePub?: string; }, + cancellationToken: CancellationToken, ): Promise<WithdrawalExchangeAccountDetails[]> { const { exchange } = req; const withdrawalAccounts: WithdrawalExchangeAccountDetails[] = []; @@ -2849,6 +2880,7 @@ async function fetchWithdrawalAccountInfo( req.instructedAmount, acct, req.reservePub, + cancellationToken, ); withdrawalAccounts.push(acctInfo); } @@ -2885,11 +2917,15 @@ export async function createManualWithdrawal( {}, ); - const withdrawalAccountsList = await fetchWithdrawalAccountInfo(ws, { - exchange, - instructedAmount: amount, - reservePub: reserveKeyPair.pub, - }); + const withdrawalAccountsList = await fetchWithdrawalAccountInfo( + ws, + { + exchange, + instructedAmount: amount, + reservePub: reserveKeyPair.pub, + }, + CancellationToken.CONTINUE, + ); const withdrawalGroup = await internalCreateWithdrawalGroup(ws, { amount: Amounts.jsonifyAmount(req.amount), |