From 6b9b4ba7cc1576e7cc91e2b30e9d5e75edbe0037 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Wed, 13 Mar 2024 21:07:46 +0100 Subject: wallet-core: preparations for materialzed txns --- packages/taler-wallet-core/src/common.ts | 29 +- packages/taler-wallet-core/src/db.ts | 53 +- packages/taler-wallet-core/src/pay-merchant.ts | 12 +- .../taler-wallet-core/src/pay-peer-pull-debit.ts | 26 +- packages/taler-wallet-core/src/transactions.ts | 2 +- packages/taler-wallet-core/src/withdraw.ts | 903 ++++++++++----------- 6 files changed, 536 insertions(+), 489 deletions(-) diff --git a/packages/taler-wallet-core/src/common.ts b/packages/taler-wallet-core/src/common.ts index 5acdeeba4..eb06b8eb0 100644 --- a/packages/taler-wallet-core/src/common.ts +++ b/packages/taler-wallet-core/src/common.ts @@ -61,11 +61,7 @@ import { timestampPreciseToDb, } from "./db.js"; import { createRefreshGroup } from "./refresh.js"; -import { - InternalWalletState, - WalletExecutionContext, - getDenomInfo, -} from "./wallet.js"; +import { WalletExecutionContext, getDenomInfo } from "./wallet.js"; const logger = new Logger("operations/common.ts"); @@ -696,11 +692,32 @@ export namespace TaskIdentifiers { /** * Result of a transaction transition. */ -export enum TransitionResult { +export enum TransitionResultType { Transition = 1, Stay = 2, + Delete = 3, } +export type TransitionResult = + | { type: TransitionResultType.Stay } + | { type: TransitionResultType.Transition; rec: R } + | { type: TransitionResultType.Delete }; + +export const TransitionResult = { + stay(): TransitionResult { + return { type: TransitionResultType.Stay }; + }, + delete(): TransitionResult { + return { type: TransitionResultType.Delete }; + }, + transition(rec: T): TransitionResult { + return { + type: TransitionResultType.Transition, + rec, + }; + }, +}; + /** * Transaction context. * Uniform interface to all transactions. diff --git a/packages/taler-wallet-core/src/db.ts b/packages/taler-wallet-core/src/db.ts index 997d9c90a..1d5ff4e7a 100644 --- a/packages/taler-wallet-core/src/db.ts +++ b/packages/taler-wallet-core/src/db.ts @@ -1,6 +1,6 @@ /* This file is part of GNU Taler - (C) 2021-2022 Taler Systems S.A. + (C) 2021-2024 Taler Systems S.A. GNU Taler is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -53,6 +53,7 @@ import { TalerPreciseTimestamp, TalerProtocolDuration, TalerProtocolTimestamp, + Transaction, TransactionIdStr, UnblindedSignature, WireInfo, @@ -148,7 +149,7 @@ export const CURRENT_DB_CONFIG_KEY = "currentMainDbName"; * backwards-compatible way or object stores and indices * are added. */ -export const WALLET_DB_MINOR_VERSION = 6; +export const WALLET_DB_MINOR_VERSION = 7; declare const symDbProtocolTimestamp: unique symbol; @@ -2327,11 +2328,43 @@ export interface GlobalCurrencyExchangeRecord { exchangeMasterPub: string; } +/** + * Primary key: transactionItem.transactionId + */ +export interface TransactionRecord { + /** + * Transaction item returned to the client. + */ + transactionItem: Transaction; + + /** + * Exchanges involved in the transaction. + */ + exchanges: string[]; + + currency: string; +} + /** * Schema definition for the IndexedDB * wallet database. */ export const WalletStoresV1 = { + transactions: describeStoreV2({ + recordCodec: passthroughCodec(), + storeName: "transactions", + keyPath: "transactionItem.transactionId", + versionAdded: 7, + indexes: { + byCurrency: describeIndex("byCurrency", "currency", { + versionAdded: 7, + }), + byExchange: describeIndex("byExchange", "exchanges", { + versionAdded: 7, + multiEntry: true, + }), + }, + }), globalCurrencyAuditors: describeStoreV2({ recordCodec: passthroughCodec(), storeName: "globalCurrencyAuditors", @@ -2721,22 +2754,22 @@ export const WalletStoresV1 = { ), }; -export type WalletDbReadWriteTransaction< - StoresArr extends Array>, -> = DbReadWriteTransaction; +export type WalletDbStoresArr = Array>; + +export type WalletDbReadWriteTransaction = + DbReadWriteTransaction; -export type WalletDbReadOnlyTransaction< - StoresArr extends Array>, -> = DbReadOnlyTransaction; +export type WalletDbReadOnlyTransaction = + DbReadOnlyTransaction; export type WalletDbAllStoresReadOnlyTransaction<> = DbReadOnlyTransaction< typeof WalletStoresV1, - Array> + WalletDbStoresArr >; export type WalletDbAllStoresReadWriteTransaction<> = DbReadWriteTransaction< typeof WalletStoresV1, - Array> + WalletDbStoresArr >; /** diff --git a/packages/taler-wallet-core/src/pay-merchant.ts b/packages/taler-wallet-core/src/pay-merchant.ts index 401020f47..40abd44a0 100644 --- a/packages/taler-wallet-core/src/pay-merchant.ts +++ b/packages/taler-wallet-core/src/pay-merchant.ts @@ -106,7 +106,7 @@ import { TaskRunResultType, TombstoneTag, TransactionContext, - TransitionResult, + TransitionResultType, } from "./common.js"; import { EddsaKeypair } from "./crypto/cryptoImplementation.js"; import { @@ -172,7 +172,7 @@ export class PayMerchantTransactionContext implements TransactionContext { * Transition a payment transition. */ async transition( - f: (rec: PurchaseRecord) => Promise, + f: (rec: PurchaseRecord) => Promise, ): Promise { return this.transitionExtra( { @@ -196,7 +196,7 @@ export class PayMerchantTransactionContext implements TransactionContext { typeof WalletStoresV1, ["purchases", ...StoreNameArray] >, - ) => Promise, + ) => Promise, ): Promise { const ws = this.wex; const extraStores = opts.extraStores ?? []; @@ -210,7 +210,7 @@ export class PayMerchantTransactionContext implements TransactionContext { const oldTxState = computePayMerchantTransactionState(purchaseRec); const res = await f(purchaseRec, tx); switch (res) { - case TransitionResult.Transition: { + case TransitionResultType.Transition: { await tx.purchases.put(purchaseRec); const newTxState = computePayMerchantTransactionState(purchaseRec); return { @@ -2746,9 +2746,9 @@ async function processPurchaseAbortingRefund( await ctx.transition(async (rec) => { if (rec.purchaseStatus === PurchaseStatus.AbortingWithRefund) { rec.purchaseStatus = PurchaseStatus.AbortedOrderDeleted; - return TransitionResult.Transition; + return TransitionResultType.Transition; } - return TransitionResult.Stay; + return TransitionResultType.Stay; }); } } diff --git a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts index 114d2366a..6cc552714 100644 --- a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts +++ b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts @@ -71,7 +71,7 @@ import { TaskRunResult, TaskRunResultType, TransactionContext, - TransitionResult, + TransitionResultType, constructTaskIdentifier, spendCoins, } from "./common.js"; @@ -192,17 +192,17 @@ export class PeerPullDebitTransactionContext implements TransactionContext { switch (pi.status) { case PeerPullDebitRecordStatus.SuspendedDeposit: pi.status = PeerPullDebitRecordStatus.PendingDeposit; - return TransitionResult.Transition; + return TransitionResultType.Transition; case PeerPullDebitRecordStatus.SuspendedAbortingRefresh: pi.status = PeerPullDebitRecordStatus.AbortingRefresh; - return TransitionResult.Transition; + return TransitionResultType.Transition; case PeerPullDebitRecordStatus.Aborted: case PeerPullDebitRecordStatus.AbortingRefresh: case PeerPullDebitRecordStatus.Failed: case PeerPullDebitRecordStatus.DialogProposed: case PeerPullDebitRecordStatus.Done: case PeerPullDebitRecordStatus.PendingDeposit: - return TransitionResult.Stay; + return TransitionResultType.Stay; } }); this.wex.taskScheduler.startShepherdTask(this.taskId); @@ -218,9 +218,9 @@ export class PeerPullDebitTransactionContext implements TransactionContext { case PeerPullDebitRecordStatus.SuspendedAbortingRefresh: // FIXME: Should we also abort the corresponding refresh session?! pi.status = PeerPullDebitRecordStatus.Failed; - return TransitionResult.Transition; + return TransitionResultType.Transition; default: - return TransitionResult.Stay; + return TransitionResultType.Stay; } }); this.wex.taskScheduler.stopShepherdTask(this.taskId); @@ -244,7 +244,7 @@ export class PeerPullDebitTransactionContext implements TransactionContext { case PeerPullDebitRecordStatus.PendingDeposit: break; default: - return TransitionResult.Stay; + return TransitionResultType.Stay; } const currency = Amounts.currencyOf(pi.totalCostEstimated); const coinPubs: CoinRefreshRequest[] = []; @@ -271,13 +271,13 @@ export class PeerPullDebitTransactionContext implements TransactionContext { pi.status = PeerPullDebitRecordStatus.AbortingRefresh; pi.abortRefreshGroupId = refresh.refreshGroupId; - return TransitionResult.Transition; + return TransitionResultType.Transition; }, ); } async transition( - f: (rec: PeerPullPaymentIncomingRecord) => Promise, + f: (rec: PeerPullPaymentIncomingRecord) => Promise, ): Promise { return this.transitionExtra( { @@ -297,7 +297,7 @@ export class PeerPullDebitTransactionContext implements TransactionContext { typeof WalletStoresV1, ["peerPullDebit", ...StoreNameArray] >, - ) => Promise, + ) => Promise, ): Promise { const wex = this.wex; const extraStores = opts.extraStores ?? []; @@ -311,7 +311,7 @@ export class PeerPullDebitTransactionContext implements TransactionContext { const oldTxState = computePeerPullDebitTransactionState(pi); const res = await f(pi, tx); switch (res) { - case TransitionResult.Transition: { + case TransitionResultType.Transition: { await tx.peerPullDebit.put(pi); const newTxState = computePeerPullDebitTransactionState(pi); return { @@ -460,10 +460,10 @@ async function processPeerPullDebitPendingDeposit( await ctx.transition(async (r) => { if (r.status !== PeerPullDebitRecordStatus.PendingDeposit) { - return TransitionResult.Stay; + return TransitionResultType.Stay; } r.status = PeerPullDebitRecordStatus.Done; - return TransitionResult.Transition; + return TransitionResultType.Transition; }); return TaskRunResult.finished(); } diff --git a/packages/taler-wallet-core/src/transactions.ts b/packages/taler-wallet-core/src/transactions.ts index 0e3f4a3fb..1660c8d09 100644 --- a/packages/taler-wallet-core/src/transactions.ts +++ b/packages/taler-wallet-core/src/transactions.ts @@ -739,7 +739,7 @@ function buildTransactionForBankIntegratedWithdraw( }; } -function isUnsuccessfulTransaction(state: TransactionState): boolean { +export function isUnsuccessfulTransaction(state: TransactionState): boolean { return ( state.major === TransactionMajorState.Aborted || state.major === TransactionMajorState.Expired || diff --git a/packages/taler-wallet-core/src/withdraw.ts b/packages/taler-wallet-core/src/withdraw.ts index 6a9d27fe8..424f370ff 100644 --- a/packages/taler-wallet-core/src/withdraw.ts +++ b/packages/taler-wallet-core/src/withdraw.ts @@ -59,6 +59,7 @@ import { TalerErrorDetail, TalerPreciseTimestamp, TalerProtocolTimestamp, + Transaction, TransactionAction, TransactionIdStr, TransactionMajorState, @@ -70,6 +71,7 @@ import { WalletNotification, WithdrawUriInfoResponse, WithdrawalExchangeAccountDetails, + WithdrawalType, addPaytoQueryParams, assertUnreachable, canonicalizeBaseUrl, @@ -104,6 +106,8 @@ import { TaskRunResultType, TombstoneTag, TransactionContext, + TransitionResult, + TransitionResultType, constructTaskIdentifier, makeCoinAvailable, makeCoinsVisible, @@ -119,11 +123,13 @@ import { PlanchetStatus, WalletDbReadOnlyTransaction, WalletDbReadWriteTransaction, + WalletDbStoresArr, WalletStoresV1, WgInfo, WithdrawalGroupRecord, WithdrawalGroupStatus, WithdrawalRecordType, + timestampPreciseFromDb, timestampPreciseToDb, } from "./db.js"; import { @@ -143,6 +149,7 @@ import { DbAccess } from "./query.js"; import { TransitionInfo, constructTransactionIdentifier, + isUnsuccessfulTransaction, notifyTransition, } from "./transactions.js"; import { @@ -156,6 +163,117 @@ import { WalletExecutionContext, getDenomInfo } from "./wallet.js"; */ const logger = new Logger("operations/withdraw.ts"); +/** + * Update the materialized withdrawal transaction based + * on the withdrawal group record. + */ +async function updateWithdrawalTransaction( + ctx: WithdrawTransactionContext, + tx: WalletDbReadWriteTransaction< + [ + "withdrawalGroups", + "transactions", + "operationRetries", + "exchanges", + "exchangeDetails", + ] + >, +): Promise { + const wgRecord = await tx.withdrawalGroups.get(ctx.withdrawalGroupId); + if (!wgRecord) { + await tx.transactions.delete(ctx.transactionId); + return; + } + const retryRecord = await tx.operationRetries.get(ctx.taskId); + + let transactionItem: Transaction; + + if (wgRecord.wgInfo.withdrawalType === WithdrawalRecordType.BankIntegrated) { + const txState = computeWithdrawalTransactionStatus(wgRecord); + transactionItem = { + type: TransactionType.Withdrawal, + txState, + txActions: computeWithdrawalTransactionActions(wgRecord), + amountEffective: isUnsuccessfulTransaction(txState) + ? Amounts.stringify(Amounts.zeroOfAmount(wgRecord.instructedAmount)) + : Amounts.stringify(wgRecord.denomsSel.totalCoinValue), + amountRaw: Amounts.stringify(wgRecord.instructedAmount), + withdrawalDetails: { + type: WithdrawalType.TalerBankIntegrationApi, + confirmed: wgRecord.wgInfo.bankInfo.timestampBankConfirmed + ? true + : false, + exchangeCreditAccountDetails: wgRecord.wgInfo.exchangeCreditAccounts, + reservePub: wgRecord.reservePub, + bankConfirmationUrl: wgRecord.wgInfo.bankInfo.confirmUrl, + reserveIsReady: + wgRecord.status === WithdrawalGroupStatus.Done || + wgRecord.status === WithdrawalGroupStatus.PendingReady, + }, + kycUrl: wgRecord.kycUrl, + exchangeBaseUrl: wgRecord.exchangeBaseUrl, + timestamp: timestampPreciseFromDb(wgRecord.timestampStart), + transactionId: ctx.transactionId, + }; + } else if ( + wgRecord.wgInfo.withdrawalType === WithdrawalRecordType.BankManual + ) { + const exchangeDetails = await getExchangeWireDetailsInTx( + tx, + wgRecord.exchangeBaseUrl, + ); + const plainPaytoUris = + exchangeDetails?.wireInfo?.accounts.map((x) => x.payto_uri) ?? []; + + const exchangePaytoUris = augmentPaytoUrisForWithdrawal( + plainPaytoUris, + wgRecord.reservePub, + wgRecord.instructedAmount, + ); + + const txState = computeWithdrawalTransactionStatus(wgRecord); + + transactionItem = { + type: TransactionType.Withdrawal, + txState, + txActions: computeWithdrawalTransactionActions(wgRecord), + amountEffective: isUnsuccessfulTransaction(txState) + ? Amounts.stringify(Amounts.zeroOfAmount(wgRecord.instructedAmount)) + : Amounts.stringify(wgRecord.denomsSel.totalCoinValue), + amountRaw: Amounts.stringify(wgRecord.instructedAmount), + withdrawalDetails: { + type: WithdrawalType.ManualTransfer, + reservePub: wgRecord.reservePub, + exchangePaytoUris, + exchangeCreditAccountDetails: wgRecord.wgInfo.exchangeCreditAccounts, + reserveIsReady: + wgRecord.status === WithdrawalGroupStatus.Done || + wgRecord.status === WithdrawalGroupStatus.PendingReady, + }, + kycUrl: wgRecord.kycUrl, + exchangeBaseUrl: wgRecord.exchangeBaseUrl, + timestamp: timestampPreciseFromDb(wgRecord.timestampStart), + transactionId: ctx.transactionId, + }; + } else { + // FIXME: If this is an orphaned withdrawal for a p2p transaction, we + // still might want to report the withdrawal. + return; + } + + if (retryRecord?.lastError) { + transactionItem.error = retryRecord.lastError; + } + + await tx.transactions.put({ + currency: Amounts.currencyOf(wgRecord.instructedAmount), + transactionItem, + exchanges: [wgRecord.exchangeBaseUrl], + }); + + // FIXME: Handle orphaned withdrawals where the p2p or recoup tx was deleted? +} + export class WithdrawTransactionContext implements TransactionContext { readonly transactionId: TransactionIdStr; readonly taskId: TaskIdStr; @@ -174,33 +292,108 @@ export class WithdrawTransactionContext implements TransactionContext { }); } - async deleteTransaction(): Promise { - const { wex: ws, withdrawalGroupId } = this; - await ws.db.runReadWriteTx( - ["withdrawalGroups", "tombstones"], + /** + * Transition a withdrawal transaction. + * Extra object stores may be accessed during the transition. + */ + async transition( + opts: { extraStores?: StoreNameArray; transactionLabel?: string }, + f: ( + rec: WithdrawalGroupRecord | undefined, + tx: WalletDbReadWriteTransaction< + [ + "withdrawalGroups", + "transactions", + "operationRetries", + "exchanges", + "exchangeDetails", + ...StoreNameArray, + ] + >, + ) => Promise>, + ): Promise { + const baseStores = [ + "withdrawalGroups" as const, + "transactions" as const, + "operationRetries" as const, + "exchanges" as const, + "exchangeDetails" as const, + ]; + let stores = opts.extraStores + ? [...baseStores, ...opts.extraStores] + : baseStores; + const transitionInfo = await this.wex.db.runReadWriteTx( + stores, async (tx) => { - const withdrawalGroupRecord = - await tx.withdrawalGroups.get(withdrawalGroupId); - if (withdrawalGroupRecord) { - await tx.withdrawalGroups.delete(withdrawalGroupId); + const wgRec = await tx.withdrawalGroups.get(this.withdrawalGroupId); + let oldTxState: TransactionState; + if (wgRec) { + oldTxState = computeWithdrawalTransactionStatus(wgRec); + } else { + oldTxState = { + major: TransactionMajorState.None, + }; + } + const res = await f(wgRec, tx); + switch (res.type) { + case TransitionResultType.Transition: { + await tx.withdrawalGroups.put(res.rec); + await updateWithdrawalTransaction(this, tx); + const newTxState = computeWithdrawalTransactionStatus(res.rec); + return { + oldTxState, + newTxState, + }; + } + case TransitionResultType.Delete: + await tx.withdrawalGroups.delete(this.withdrawalGroupId); + await updateWithdrawalTransaction(this, tx); + return { + oldTxState, + newTxState: { + major: TransactionMajorState.None, + }, + }; + default: + return undefined; + } + }, + ); + notifyTransition(this.wex, this.transactionId, transitionInfo); + return transitionInfo; + } + + async deleteTransaction(): Promise { + await this.transition( + { + extraStores: ["tombstones"], + transactionLabel: "delete-transaction-withdraw", + }, + async (rec, tx) => { + if (!rec) { + return TransitionResult.stay(); + } + if (rec) { await tx.tombstones.put({ - id: TombstoneTag.DeleteWithdrawalGroup + ":" + withdrawalGroupId, + id: + TombstoneTag.DeleteWithdrawalGroup + ":" + rec.withdrawalGroupId, }); - return; } + return TransitionResult.delete(); }, ); } async suspendTransaction(): Promise { - const { wex, withdrawalGroupId, transactionId, taskId } = this; - const transitionInfo = await wex.db.runReadWriteTx( - ["withdrawalGroups"], - async (tx) => { - const wg = await tx.withdrawalGroups.get(withdrawalGroupId); + const { withdrawalGroupId } = this; + await this.transition( + { + transactionLabel: "suspend-transaction-withdraw", + }, + async (wg, _tx) => { if (!wg) { logger.warn(`withdrawal group ${withdrawalGroupId} not found`); - return; + return TransitionResult.stay(); } let newStatus: WithdrawalGroupStatus | undefined = undefined; switch (wg.status) { @@ -229,33 +422,24 @@ export class WithdrawTransactionContext implements TransactionContext { logger.warn( `Unsupported 'suspend' on withdrawal transaction in status ${wg.status}`, ); + return TransitionResult.stay(); } - if (newStatus != null) { - const oldTxState = computeWithdrawalTransactionStatus(wg); - wg.status = newStatus; - const newTxState = computeWithdrawalTransactionStatus(wg); - await tx.withdrawalGroups.put(wg); - return { - oldTxState, - newTxState, - }; - } - return undefined; + wg.status = newStatus; + return TransitionResult.transition(wg); }, ); - wex.taskScheduler.stopShepherdTask(taskId); - notifyTransition(wex, transactionId, transitionInfo); } async abortTransaction(): Promise { - const { wex, withdrawalGroupId, transactionId, taskId } = this; - const transitionInfo = await wex.db.runReadWriteTx( - ["withdrawalGroups"], - async (tx) => { - const wg = await tx.withdrawalGroups.get(withdrawalGroupId); + const { withdrawalGroupId } = this; + await this.transition( + { + transactionLabel: "abort-transaction-withdraw", + }, + async (wg, _tx) => { if (!wg) { logger.warn(`withdrawal group ${withdrawalGroupId} not found`); - return; + return TransitionResult.stay(); } let newStatus: WithdrawalGroupStatus | undefined = undefined; switch (wg.status) { @@ -280,7 +464,7 @@ export class WithdrawTransactionContext implements TransactionContext { case WithdrawalGroupStatus.SuspendedAbortingBank: case WithdrawalGroupStatus.AbortingBank: // No transition needed, but not an error - break; + return TransitionResult.stay(); case WithdrawalGroupStatus.Done: case WithdrawalGroupStatus.FailedBankAborted: case WithdrawalGroupStatus.AbortedExchange: @@ -291,33 +475,22 @@ export class WithdrawTransactionContext implements TransactionContext { default: assertUnreachable(wg.status); } - if (newStatus != null) { - const oldTxState = computeWithdrawalTransactionStatus(wg); - wg.status = newStatus; - const newTxState = computeWithdrawalTransactionStatus(wg); - await tx.withdrawalGroups.put(wg); - return { - oldTxState, - newTxState, - }; - } - return undefined; + wg.status = newStatus; + return TransitionResult.transition(wg); }, ); - wex.taskScheduler.stopShepherdTask(taskId); - notifyTransition(wex, transactionId, transitionInfo); - wex.taskScheduler.startShepherdTask(taskId); } async resumeTransaction(): Promise { - const { wex, withdrawalGroupId, transactionId, taskId: retryTag } = this; - const transitionInfo = await wex.db.runReadWriteTx( - ["withdrawalGroups"], - async (tx) => { - const wg = await tx.withdrawalGroups.get(withdrawalGroupId); + const { withdrawalGroupId } = this; + await this.transition( + { + transactionLabel: "resume-transaction-withdraw", + }, + async (wg, _tx) => { if (!wg) { logger.warn(`withdrawal group ${withdrawalGroupId} not found`); - return; + return TransitionResult.stay(); } let newStatus: WithdrawalGroupStatus | undefined = undefined; switch (wg.status) { @@ -346,33 +519,24 @@ export class WithdrawTransactionContext implements TransactionContext { logger.warn( `Unsupported 'resume' on withdrawal transaction in status ${wg.status}`, ); + return TransitionResult.stay(); } - if (newStatus != null) { - const oldTxState = computeWithdrawalTransactionStatus(wg); - wg.status = newStatus; - const newTxState = computeWithdrawalTransactionStatus(wg); - await tx.withdrawalGroups.put(wg); - return { - oldTxState, - newTxState, - }; - } - return undefined; + wg.status = newStatus; + return TransitionResult.transition(wg); }, ); - notifyTransition(wex, transactionId, transitionInfo); - wex.taskScheduler.startShepherdTask(retryTag); } async failTransaction(): Promise { - const { wex, withdrawalGroupId, transactionId, taskId: retryTag } = this; - const stateUpdate = await wex.db.runReadWriteTx( - ["withdrawalGroups"], - async (tx) => { - const wg = await tx.withdrawalGroups.get(withdrawalGroupId); + const { withdrawalGroupId } = this; + await this.transition( + { + transactionLabel: "fail-transaction-withdraw", + }, + async (wg, _tx) => { if (!wg) { logger.warn(`withdrawal group ${withdrawalGroupId} not found`); - return; + return TransitionResult.stay(); } let newStatus: WithdrawalGroupStatus | undefined = undefined; switch (wg.status) { @@ -381,24 +545,12 @@ export class WithdrawTransactionContext implements TransactionContext { newStatus = WithdrawalGroupStatus.FailedAbortingBank; break; default: - break; + return TransitionResult.stay(); } - if (newStatus != null) { - const oldTxState = computeWithdrawalTransactionStatus(wg); - wg.status = newStatus; - const newTxState = computeWithdrawalTransactionStatus(wg); - await tx.withdrawalGroups.put(wg); - return { - oldTxState, - newTxState, - }; - } - return undefined; + wg.status = newStatus; + return TransitionResult.transition(wg); }, ); - wex.taskScheduler.stopShepherdTask(retryTag); - notifyTransition(wex, transactionId, stateUpdate); - wex.taskScheduler.startShepherdTask(retryTag); } } @@ -743,57 +895,6 @@ enum ExchangeAmlStatus { Frozen = 2, } -/** - * Transition a withdrawal transaction with a (new) KYC URL. - * - * Emit a notification for the (self-)transition. - */ -async function transitionKycUrlUpdate( - wex: WalletExecutionContext, - withdrawalGroupId: string, - kycUrl: string, -): Promise { - let notificationKycUrl: string | undefined = undefined; - const ctx = new WithdrawTransactionContext(wex, withdrawalGroupId); - const transactionId = ctx.transactionId; - - const transitionInfo = await wex.db.runReadWriteTx( - ["withdrawalGroups"], - async (tx) => { - const wg2 = await tx.withdrawalGroups.get(withdrawalGroupId); - if (!wg2) { - return; - } - const oldTxState = computeWithdrawalTransactionStatus(wg2); - switch (wg2.status) { - case WithdrawalGroupStatus.PendingReady: { - wg2.kycUrl = kycUrl; - notificationKycUrl = kycUrl; - await tx.withdrawalGroups.put(wg2); - const newTxState = computeWithdrawalTransactionStatus(wg2); - return { - oldTxState, - newTxState, - }; - } - default: - return undefined; - } - }, - ); - if (transitionInfo) { - // Always notify, even on self-transition, as the KYC URL might have changed. - wex.ws.notify({ - type: NotificationType.TransactionStateTransition, - oldTxState: transitionInfo.oldTxState, - newTxState: transitionInfo.newTxState, - transactionId, - experimentalUserData: notificationKycUrl, - }); - } - wex.taskScheduler.startShepherdTask(ctx.taskId); -} - async function handleKycRequired( wex: WalletExecutionContext, withdrawalGroup: WithdrawalGroupRecord, @@ -805,10 +906,7 @@ async function handleKycRequired( const respJson = await resp.json(); const uuidResp = codecForWalletKycUuid().decode(respJson); const withdrawalGroupId = withdrawalGroup.withdrawalGroupId; - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Withdrawal, - withdrawalGroupId, - }); + const ctx = new WithdrawTransactionContext(wex, withdrawalGroupId); logger.info(`kyc uuid response: ${j2s(uuidResp)}`); const exchangeUrl = withdrawalGroup.exchangeBaseUrl; const userType = "individual"; @@ -849,11 +947,14 @@ async function handleKycRequired( throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`); } - let notificationKycUrl: string | undefined = undefined; - - const transitionInfo = await wex.db.runReadWriteTx( - ["planchets", "withdrawalGroups"], - async (tx) => { + await ctx.transition( + { + extraStores: ["planchets"], + }, + async (wg2, tx) => { + if (!wg2) { + return TransitionResult.stay(); + } for (let i = startIdx; i < requestCoinIdxs.length; i++) { let planchet = await tx.planchets.indexes.byGroupAndIndex.get([ withdrawalGroup.withdrawalGroupId, @@ -865,44 +966,25 @@ async function handleKycRequired( planchet.planchetStatus = PlanchetStatus.KycRequired; await tx.planchets.put(planchet); } - const wg2 = await tx.withdrawalGroups.get( - withdrawalGroup.withdrawalGroupId, - ); - if (!wg2) { - return; - } - const oldTxState = computeWithdrawalTransactionStatus(wg2); - switch (wg2.status) { - case WithdrawalGroupStatus.PendingReady: { - wg2.kycPending = { - paytoHash: uuidResp.h_payto, - requirementRow: uuidResp.requirement_row, - }; - wg2.kycUrl = kycUrl; - wg2.status = - amlStatus === ExchangeAmlStatus.Normal || amlStatus === undefined - ? WithdrawalGroupStatus.PendingKyc - : amlStatus === ExchangeAmlStatus.Pending - ? WithdrawalGroupStatus.PendingAml - : amlStatus === ExchangeAmlStatus.Frozen - ? WithdrawalGroupStatus.SuspendedAml - : assertUnreachable(amlStatus); - - notificationKycUrl = kycUrl; - - await tx.withdrawalGroups.put(wg2); - const newTxState = computeWithdrawalTransactionStatus(wg2); - return { - oldTxState, - newTxState, - }; - } - default: - return undefined; + if (wg2.status !== WithdrawalGroupStatus.PendingReady) { + return TransitionResult.stay(); } + wg2.kycPending = { + paytoHash: uuidResp.h_payto, + requirementRow: uuidResp.requirement_row, + }; + wg2.kycUrl = kycUrl; + wg2.status = + amlStatus === ExchangeAmlStatus.Normal || amlStatus === undefined + ? WithdrawalGroupStatus.PendingKyc + : amlStatus === ExchangeAmlStatus.Pending + ? WithdrawalGroupStatus.PendingAml + : amlStatus === ExchangeAmlStatus.Frozen + ? WithdrawalGroupStatus.SuspendedAml + : assertUnreachable(amlStatus); + return TransitionResult.transition(wg2); }, ); - notifyTransition(wex, transactionId, transitionInfo, notificationKycUrl); } /** @@ -912,7 +994,7 @@ async function handleKycRequired( */ async function processPlanchetExchangeBatchRequest( wex: WalletExecutionContext, - wgContext: WithdrawalGroupContext, + wgContext: WithdrawalGroupStatusInfo, args: WithdrawalRequestBatchArgs, ): Promise { const withdrawalGroup: WithdrawalGroupRecord = wgContext.wgRecord; @@ -1045,7 +1127,7 @@ async function processPlanchetExchangeBatchRequest( async function processPlanchetVerifyAndStoreCoin( wex: WalletExecutionContext, - wgContext: WithdrawalGroupContext, + wgContext: WithdrawalGroupStatusInfo, coinIdx: number, resp: ExchangeWithdrawResponse, ): Promise { @@ -1282,14 +1364,13 @@ async function processQueryReserve( wex: WalletExecutionContext, withdrawalGroupId: string, ): Promise { - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Withdrawal, - withdrawalGroupId, - }); + const ctx = new WithdrawTransactionContext(wex, withdrawalGroupId); const withdrawalGroup = await getWithdrawalGroupRecordTx(wex.db, { withdrawalGroupId, }); - checkDbInvariant(!!withdrawalGroup); + if (!withdrawalGroup) { + return TaskRunResult.finished(); + } if (withdrawalGroup.status !== WithdrawalGroupStatus.PendingQueryingStatus) { return TaskRunResult.backoff(); } @@ -1328,27 +1409,15 @@ async function processQueryReserve( logger.trace(`got reserve status ${j2s(result.response)}`); - const transitionResult = await wex.db.runReadWriteTx( - ["withdrawalGroups"], - async (tx) => { - const wg = await tx.withdrawalGroups.get(withdrawalGroupId); - if (!wg) { - logger.warn(`withdrawal group ${withdrawalGroupId} not found`); - return undefined; - } - const txStateOld = computeWithdrawalTransactionStatus(wg); - wg.status = WithdrawalGroupStatus.PendingReady; - const txStateNew = computeWithdrawalTransactionStatus(wg); - wg.reserveBalanceAmount = Amounts.stringify(result.response.balance); - await tx.withdrawalGroups.put(wg); - return { - oldTxState: txStateOld, - newTxState: txStateNew, - }; - }, - ); - - notifyTransition(wex, transactionId, transitionResult); + const transitionResult = await ctx.transition({}, async (wg) => { + if (!wg) { + logger.warn(`withdrawal group ${withdrawalGroupId} not found`); + return TransitionResult.stay(); + } + wg.status = WithdrawalGroupStatus.PendingReady; + wg.reserveBalanceAmount = Amounts.stringify(result.response.balance); + return TransitionResult.transition(wg); + }); if (transitionResult) { return TaskRunResult.progress(); @@ -1362,7 +1431,7 @@ async function processQueryReserve( * * Used to store some cached info during a withdrawal operation. */ -interface WithdrawalGroupContext { +interface WithdrawalGroupStatusInfo { numPlanchets: number; planchetsFinished: Set; @@ -1377,11 +1446,7 @@ async function processWithdrawalGroupAbortingBank( withdrawalGroup: WithdrawalGroupRecord, ): Promise { const { withdrawalGroupId } = withdrawalGroup; - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Withdrawal, - withdrawalGroupId, - }); - + const ctx = new WithdrawTransactionContext(wex, withdrawalGroupId); const wgInfo = withdrawalGroup.wgInfo; if (wgInfo.withdrawalType != WithdrawalRecordType.BankIntegrated) { throw Error("invalid state (aborting(bank) without bank info"); @@ -1395,74 +1460,25 @@ async function processWithdrawalGroupAbortingBank( }); logger.info(`abort response status: ${abortResp.status}`); - const transitionInfo = await wex.db.runReadWriteTx( - ["withdrawalGroups"], - async (tx) => { - const wg = await tx.withdrawalGroups.get(withdrawalGroupId); - if (!wg) { - return undefined; - } - const txStatusOld = computeWithdrawalTransactionStatus(wg); - wg.status = WithdrawalGroupStatus.AbortedBank; - wg.timestampFinish = timestampPreciseToDb(TalerPreciseTimestamp.now()); - const txStatusNew = computeWithdrawalTransactionStatus(wg); - await tx.withdrawalGroups.put(wg); - return { - oldTxState: txStatusOld, - newTxState: txStatusNew, - }; - }, - ); - notifyTransition(wex, transactionId, transitionInfo); - return TaskRunResult.finished(); -} - -/** - * Store in the database that the KYC for a withdrawal is now - * satisfied. - */ -async function transitionKycSatisfied( - wex: WalletExecutionContext, - withdrawalGroup: WithdrawalGroupRecord, -): Promise { - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Withdrawal, - withdrawalGroupId: withdrawalGroup.withdrawalGroupId, + await ctx.transition({}, async (wg) => { + if (!wg) { + return TransitionResult.stay(); + } + wg.status = WithdrawalGroupStatus.AbortedBank; + wg.timestampFinish = timestampPreciseToDb(TalerPreciseTimestamp.now()); + return TransitionResult.transition(wg); }); - const transitionInfo = await wex.db.runReadWriteTx( - ["withdrawalGroups"], - async (tx) => { - const wg2 = await tx.withdrawalGroups.get( - withdrawalGroup.withdrawalGroupId, - ); - if (!wg2) { - return; - } - const oldTxState = computeWithdrawalTransactionStatus(wg2); - switch (wg2.status) { - case WithdrawalGroupStatus.PendingKyc: { - delete wg2.kycPending; - delete wg2.kycUrl; - wg2.status = WithdrawalGroupStatus.PendingReady; - await tx.withdrawalGroups.put(wg2); - const newTxState = computeWithdrawalTransactionStatus(wg2); - return { - oldTxState, - newTxState, - }; - } - default: - return undefined; - } - }, - ); - notifyTransition(wex, transactionId, transitionInfo); + return TaskRunResult.finished(); } async function processWithdrawalGroupPendingKyc( wex: WalletExecutionContext, withdrawalGroup: WithdrawalGroupRecord, ): Promise { + const ctx = new WithdrawTransactionContext( + wex, + withdrawalGroup.withdrawalGroupId, + ); const userType = "individual"; const kycInfo = withdrawalGroup.kycPending; if (!kycInfo) { @@ -1474,9 +1490,6 @@ async function processWithdrawalGroupPendingKyc( exchangeUrl, ); url.searchParams.set("timeout_ms", "30000"); - - const withdrawalGroupId = withdrawalGroup.withdrawalGroupId; - logger.info(`long-polling for withdrawal KYC status via ${url.href}`); const kycStatusRes = await wex.http.fetch(url.href, { method: "GET", @@ -1485,17 +1498,42 @@ async function processWithdrawalGroupPendingKyc( logger.info(`kyc long-polling response status: HTTP ${kycStatusRes.status}`); if ( kycStatusRes.status === HttpStatusCode.Ok || - //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge + // FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge // remove after the exchange is fixed or clarified kycStatusRes.status === HttpStatusCode.NoContent ) { - await transitionKycSatisfied(wex, withdrawalGroup); + await ctx.transition({}, async (rec) => { + if (!rec) { + return TransitionResult.stay(); + } + switch (rec.status) { + case WithdrawalGroupStatus.PendingKyc: { + delete rec.kycPending; + delete rec.kycUrl; + rec.status = WithdrawalGroupStatus.PendingReady; + return TransitionResult.transition(rec); + } + default: + return TransitionResult.stay(); + } + }); } else if (kycStatusRes.status === HttpStatusCode.Accepted) { const kycStatus = await kycStatusRes.json(); logger.info(`kyc status: ${j2s(kycStatus)}`); const kycUrl = kycStatus.kyc_url; if (typeof kycUrl === "string") { - await transitionKycUrlUpdate(wex, withdrawalGroupId, kycUrl); + await ctx.transition({}, async (rec) => { + if (!rec) { + return TransitionResult.stay(); + } + switch (rec.status) { + case WithdrawalGroupStatus.PendingReady: { + rec.kycUrl = kycUrl; + return TransitionResult.transition(rec); + } + } + return TransitionResult.stay(); + }); } } else if ( kycStatusRes.status === HttpStatusCode.UnavailableForLegalReasons @@ -1508,6 +1546,11 @@ async function processWithdrawalGroupPendingKyc( return TaskRunResult.backoff(); } +/** + * Select new denominations for a withdrawal group. + * Necessary when denominations expired or got revoked + * before the withdrawal could complete. + */ async function redenominateWithdrawal( wex: WalletExecutionContext, withdrawalGroupId: string, @@ -1635,10 +1678,7 @@ async function processWithdrawalGroupPendingReady( withdrawalGroup: WithdrawalGroupRecord, ): Promise { const { withdrawalGroupId } = withdrawalGroup; - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Withdrawal, - withdrawalGroupId, - }); + const ctx = new WithdrawTransactionContext(wex, withdrawalGroupId); const exchangeBaseUrl = withdrawalGroup.exchangeBaseUrl; @@ -1646,25 +1686,14 @@ async function processWithdrawalGroupPendingReady( if (withdrawalGroup.denomsSel.selectedDenoms.length === 0) { logger.warn("Finishing empty withdrawal group (no denoms)"); - const transitionInfo = await wex.db.runReadWriteTx( - ["withdrawalGroups"], - async (tx) => { - const wg = await tx.withdrawalGroups.get(withdrawalGroupId); - if (!wg) { - return undefined; - } - const txStatusOld = computeWithdrawalTransactionStatus(wg); - wg.status = WithdrawalGroupStatus.Done; - wg.timestampFinish = timestampPreciseToDb(TalerPreciseTimestamp.now()); - const txStatusNew = computeWithdrawalTransactionStatus(wg); - await tx.withdrawalGroups.put(wg); - return { - oldTxState: txStatusOld, - newTxState: txStatusNew, - }; - }, - ); - notifyTransition(wex, transactionId, transitionInfo); + await ctx.transition({}, async (wg) => { + if (!wg) { + return TransitionResult.stay(); + } + wg.status = WithdrawalGroupStatus.Done; + wg.timestampFinish = timestampPreciseToDb(TalerPreciseTimestamp.now()); + return TransitionResult.transition(wg); + }); return TaskRunResult.finished(); } @@ -1672,7 +1701,7 @@ async function processWithdrawalGroupPendingReady( .map((x) => x.count) .reduce((a, b) => a + b); - const wgContext: WithdrawalGroupContext = { + const wgContext: WithdrawalGroupStatusInfo = { numPlanchets: numTotalCoins, planchetsFinished: new Set(), wgRecord: withdrawalGroup, @@ -1757,51 +1786,41 @@ async function processWithdrawalGroupPendingReady( let numDone = 0; const maxReportedErrors = 5; - const res = await wex.db.runReadWriteTx( - ["coins", "coinAvailability", "withdrawalGroups", "planchets"], - async (tx) => { - const wg = await tx.withdrawalGroups.get(withdrawalGroupId); + const res = await ctx.transition( + { + extraStores: ["coins", "coinAvailability", "planchets"], + }, + async (wg, tx) => { if (!wg) { - return; + return TransitionResult.stay(); } - await tx.planchets.indexes.byGroup - .iter(withdrawalGroupId) - .forEach((x) => { - switch (x.planchetStatus) { - case PlanchetStatus.KycRequired: - case PlanchetStatus.Pending: - numActive++; - break; - case PlanchetStatus.WithdrawalDone: - numDone++; - break; - } - if (x.lastError) { - numPlanchetErrors++; - if (numPlanchetErrors < maxReportedErrors) { - errorsPerCoin[x.coinIdx] = x.lastError; - } + const groupPlanchets = + await tx.planchets.indexes.byGroup.getAll(withdrawalGroupId); + for (const x of groupPlanchets) { + switch (x.planchetStatus) { + case PlanchetStatus.KycRequired: + case PlanchetStatus.Pending: + numActive++; + break; + case PlanchetStatus.WithdrawalDone: + numDone++; + break; + } + if (x.lastError) { + numPlanchetErrors++; + if (numPlanchetErrors < maxReportedErrors) { + errorsPerCoin[x.coinIdx] = x.lastError; } - }); - const oldTxState = computeWithdrawalTransactionStatus(wg); - logger.info(`now withdrawn ${numDone} of ${numTotalCoins} coins`); + } + } + if (wg.timestampFinish === undefined && numActive === 0) { wg.timestampFinish = timestampPreciseToDb(TalerPreciseTimestamp.now()); wg.status = WithdrawalGroupStatus.Done; - await makeCoinsVisible(wex, tx, transactionId); + await makeCoinsVisible(wex, tx, ctx.transactionId); } - - const newTxState = computeWithdrawalTransactionStatus(wg); - await tx.withdrawalGroups.put(wg); - - return { - kycInfo: wg.kycPending, - transitionInfo: { - oldTxState, - newTxState, - }, - }; + return TransitionResult.transition(wg); }, ); @@ -1809,10 +1828,9 @@ async function processWithdrawalGroupPendingReady( throw Error("withdrawal group does not exist anymore"); } - notifyTransition(wex, transactionId, res.transitionInfo); wex.ws.notify({ type: NotificationType.BalanceChange, - hintTransactionId: transactionId, + hintTransactionId: ctx.transactionId, }); if (numPlanchetErrors > 0) { @@ -2051,7 +2069,9 @@ export interface GetWithdrawalDetailsForUriOpts { type WithdrawalOperationMemoryMap = { [uri: string]: boolean | undefined; }; + const ongoingChecks: WithdrawalOperationMemoryMap = {}; + /** * Get more information about a taler://withdraw URI. * @@ -2229,10 +2249,7 @@ async function registerReserveWithBank( return await tx.withdrawalGroups.get(withdrawalGroupId); }, ); - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Withdrawal, - withdrawalGroupId, - }); + const ctx = new WithdrawTransactionContext(wex, withdrawalGroupId); switch (withdrawalGroup?.status) { case WithdrawalGroupStatus.PendingWaitConfirmBank: case WithdrawalGroupStatus.PendingRegisteringBank: @@ -2265,76 +2282,54 @@ async function registerReserveWithBank( httpResp, codeForBankWithdrawalOperationPostResponse(), ); - const transitionInfo = await wex.db.runReadWriteTx( - ["withdrawalGroups"], - async (tx) => { - const r = await tx.withdrawalGroups.get(withdrawalGroupId); - if (!r) { - return undefined; - } - switch (r.status) { - case WithdrawalGroupStatus.PendingRegisteringBank: - case WithdrawalGroupStatus.PendingWaitConfirmBank: - break; - default: - return; - } - if (r.wgInfo.withdrawalType !== WithdrawalRecordType.BankIntegrated) { - throw Error("invariant failed"); - } - r.wgInfo.bankInfo.timestampReserveInfoPosted = timestampPreciseToDb( - AbsoluteTime.toPreciseTimestamp(AbsoluteTime.now()), - ); - const oldTxState = computeWithdrawalTransactionStatus(r); - r.status = WithdrawalGroupStatus.PendingWaitConfirmBank; - r.wgInfo.bankInfo.confirmUrl = status.confirm_transfer_url; - const newTxState = computeWithdrawalTransactionStatus(r); - await tx.withdrawalGroups.put(r); - return { - oldTxState, - newTxState, - }; - }, - ); - notifyTransition(wex, transactionId, transitionInfo); + await ctx.transition({}, async (r) => { + if (!r) { + return TransitionResult.stay(); + } + switch (r.status) { + case WithdrawalGroupStatus.PendingRegisteringBank: + case WithdrawalGroupStatus.PendingWaitConfirmBank: + break; + default: + return TransitionResult.stay(); + } + if (r.wgInfo.withdrawalType !== WithdrawalRecordType.BankIntegrated) { + throw Error("invariant failed"); + } + r.wgInfo.bankInfo.timestampReserveInfoPosted = timestampPreciseToDb( + AbsoluteTime.toPreciseTimestamp(AbsoluteTime.now()), + ); + r.status = WithdrawalGroupStatus.PendingWaitConfirmBank; + r.wgInfo.bankInfo.confirmUrl = status.confirm_transfer_url; + return TransitionResult.transition(r); + }); } async function transitionBankAborted( ctx: WithdrawTransactionContext, ): Promise { logger.info("bank aborted the withdrawal"); - const transitionInfo = await ctx.wex.db.runReadWriteTx( - ["withdrawalGroups"], - async (tx) => { - const r = await tx.withdrawalGroups.get(ctx.withdrawalGroupId); - if (!r) { - return; - } - switch (r.status) { - case WithdrawalGroupStatus.PendingRegisteringBank: - case WithdrawalGroupStatus.PendingWaitConfirmBank: - break; - default: - return; - } - if (r.wgInfo.withdrawalType !== WithdrawalRecordType.BankIntegrated) { - throw Error("invariant failed"); - } - const now = AbsoluteTime.toPreciseTimestamp(AbsoluteTime.now()); - const oldTxState = computeWithdrawalTransactionStatus(r); - r.wgInfo.bankInfo.timestampBankConfirmed = timestampPreciseToDb(now); - r.status = WithdrawalGroupStatus.FailedBankAborted; - const newTxState = computeWithdrawalTransactionStatus(r); - await tx.withdrawalGroups.put(r); - return { - oldTxState, - newTxState, - }; - }, - ); - notifyTransition(ctx.wex, ctx.transactionId, transitionInfo); - return TaskRunResult.finished(); + await ctx.transition({}, async (r) => { + if (!r) { + return TransitionResult.stay(); + } + switch (r.status) { + case WithdrawalGroupStatus.PendingRegisteringBank: + case WithdrawalGroupStatus.PendingWaitConfirmBank: + break; + default: + return TransitionResult.stay(); + } + if (r.wgInfo.withdrawalType !== WithdrawalRecordType.BankIntegrated) { + throw Error("invariant failed"); + } + const now = AbsoluteTime.toPreciseTimestamp(AbsoluteTime.now()); + r.wgInfo.bankInfo.timestampBankConfirmed = timestampPreciseToDb(now); + r.status = WithdrawalGroupStatus.FailedBankAborted; + return TransitionResult.transition(r); + }); + return TaskRunResult.progress(); } async function processBankRegisterReserve( @@ -2448,41 +2443,30 @@ async function processReserveBankStatus( return TaskRunResult.longpollReturnedPending(); } - const transitionInfo = await wex.db.runReadWriteTx( - ["withdrawalGroups"], - async (tx) => { - const r = await tx.withdrawalGroups.get(withdrawalGroupId); - if (!r) { - return undefined; - } - // Re-check reserve status within transaction - switch (r.status) { - case WithdrawalGroupStatus.PendingWaitConfirmBank: - break; - default: - return undefined; - } - if (r.wgInfo.withdrawalType !== WithdrawalRecordType.BankIntegrated) { - throw Error("invariant failed"); - } - const oldTxState = computeWithdrawalTransactionStatus(r); - if (status.transfer_done) { - logger.info("withdrawal: transfer confirmed by bank."); - const now = AbsoluteTime.toPreciseTimestamp(AbsoluteTime.now()); - r.wgInfo.bankInfo.timestampBankConfirmed = timestampPreciseToDb(now); - r.status = WithdrawalGroupStatus.PendingQueryingStatus; - } else { - } - const newTxState = computeWithdrawalTransactionStatus(r); - await tx.withdrawalGroups.put(r); - return { - oldTxState, - newTxState, - }; - }, - ); - - notifyTransition(wex, ctx.transactionId, transitionInfo); + const transitionInfo = await ctx.transition({}, async (r) => { + if (!r) { + return TransitionResult.stay(); + } + // Re-check reserve status within transaction + switch (r.status) { + case WithdrawalGroupStatus.PendingWaitConfirmBank: + break; + default: + return TransitionResult.stay(); + } + if (r.wgInfo.withdrawalType !== WithdrawalRecordType.BankIntegrated) { + throw Error("invariant failed"); + } + if (status.transfer_done) { + logger.info("withdrawal: transfer confirmed by bank."); + const now = AbsoluteTime.toPreciseTimestamp(AbsoluteTime.now()); + r.wgInfo.bankInfo.timestampBankConfirmed = timestampPreciseToDb(now); + r.status = WithdrawalGroupStatus.PendingQueryingStatus; + return TransitionResult.transition(r); + } else { + return TransitionResult.stay(); + } + }); if (transitionInfo) { return TaskRunResult.progress(); @@ -2709,10 +2693,23 @@ export async function internalCreateWithdrawalGroup( tag: TransactionType.Withdrawal, withdrawalGroupId: prep.withdrawalGroup.withdrawalGroupId, }); + const ctx = new WithdrawTransactionContext( + wex, + prep.withdrawalGroup.withdrawalGroupId, + ); const res = await wex.db.runReadWriteTx( - ["withdrawalGroups", "reserves", "exchanges", "exchangeDetails"], + [ + "withdrawalGroups", + "reserves", + "exchanges", + "exchangeDetails", + "transactions", + "operationRetries", + ], async (tx) => { - return await internalPerformCreateWithdrawalGroup(wex, tx, prep); + const res = await internalPerformCreateWithdrawalGroup(wex, tx, prep); + await updateWithdrawalTransaction(ctx, tx); + return res; }, ); if (res.exchangeNotif) { -- cgit v1.2.3