taler-typescript-core

Wallet core logic and WebUIs for various components
Log | Files | Refs | Submodules | README | LICENSE

commit 1ce3866c9b6d1055d682b4a92d8cef902c0e95e9
parent 4c49969c58e84b0ea505487d475515b0e9e3f113
Author: Florian Dold <florian@dold.me>
Date:   Tue, 10 Mar 2026 20:32:49 +0100

wallet-core: use record handles instead of custom helpers

Diffstat:
Mpackages/taler-harness/src/integrationtests/test-peer-push-large.ts | 59++++++++++++++++++++++++-----------------------------------
Mpackages/taler-wallet-core/src/pay-peer-common.ts | 152-------------------------------------------------------------------------------
Mpackages/taler-wallet-core/src/pay-peer-pull-credit.ts | 361+++++++++++++++++++++++++++++++++++++++++++++++++------------------------------
Mpackages/taler-wallet-core/src/pay-peer-pull-debit.ts | 417++++++++++++++++++++++++++++++++++++++++---------------------------------------
Mpackages/taler-wallet-core/src/pay-peer-push-credit.ts | 322+++++++++++++++++++++++++++++++++++++++++++++----------------------------------
Mpackages/taler-wallet-core/src/pay-peer-push-debit.ts | 411+++++++++++++++++++++++++++++++++++++++++--------------------------------------
6 files changed, 858 insertions(+), 864 deletions(-)

diff --git a/packages/taler-harness/src/integrationtests/test-peer-push-large.ts b/packages/taler-harness/src/integrationtests/test-peer-push-large.ts @@ -51,10 +51,7 @@ const coinConfigList: CoinConfig[] = [ ]; const purse_expiration = AbsoluteTime.toProtocolTimestamp( - AbsoluteTime.addDuration( - AbsoluteTime.now(), - Duration.fromSpec({ days: 2 }), - ), + AbsoluteTime.addDuration(AbsoluteTime.now(), Duration.fromSpec({ days: 2 })), ); /** @@ -70,13 +67,11 @@ export async function runPeerPushLargeTest(t: GlobalTestState) { { walletClient: wallet2 }, ] = await Promise.all([ createSimpleTestkudosEnvironmentV3(t, coinConfigList, { - additionalBankConfig(b) { - - }, + additionalBankConfig(b) {}, }), createWalletDaemonWithClient(t, { - name: "w2" - }) + name: "w2", + }), ]); // Withdraw digital cash into the wallet. @@ -88,12 +83,9 @@ export async function runPeerPushLargeTest(t: GlobalTestState) { }); await withdrawRes.withdrawalFinishedCond; - const check = await wallet1.call( - WalletApiOperation.CheckPeerPushDebit, - { - amount: "TESTKUDOS:200", - }, - ); + const check = await wallet1.call(WalletApiOperation.CheckPeerPushDebit, { + amount: "TESTKUDOS:200", + }); t.assertAmountEquals(check.amountEffective, "TESTKUDOS:200"); const initiate = await wallet1.call( @@ -107,36 +99,33 @@ export async function runPeerPushLargeTest(t: GlobalTestState) { }, ); + t.logStep("start: waiting for push credit to become ready"); + await wallet1.call(WalletApiOperation.TestingWaitTransactionState, { transactionId: initiate.transactionId, txState: { major: TransactionMajorState.Pending, - minor: TransactionMinorState.Ready + minor: TransactionMinorState.Ready, }, }); - const tx = await wallet1.call( - WalletApiOperation.GetTransactionById, - { - transactionId: initiate.transactionId, - }, - ); + t.logStep("end: waiting for push credit to become ready"); + + const tx = await wallet1.call(WalletApiOperation.GetTransactionById, { + transactionId: initiate.transactionId, + }); t.assertDeepEqual(tx.type, TransactionType.PeerPushDebit); t.assertTrue(!!tx.talerUri); - const prepare = await wallet2.call( - WalletApiOperation.PreparePeerPushCredit, - { - talerUri: tx.talerUri, - }, - ); + const prepare = await wallet2.call(WalletApiOperation.PreparePeerPushCredit, { + talerUri: tx.talerUri, + }); - await wallet2.call( - WalletApiOperation.ConfirmPeerPushCredit, - { - transactionId: prepare.transactionId, - }, - ); + await wallet2.call(WalletApiOperation.ConfirmPeerPushCredit, { + transactionId: prepare.transactionId, + }); + + t.logStep("start: waiting for both transactions to finish"); await Promise.all([ wallet1.call(WalletApiOperation.TestingWaitTransactionState, { @@ -150,7 +139,7 @@ export async function runPeerPushLargeTest(t: GlobalTestState) { txState: { major: TransactionMajorState.Done, }, - }) + }), ]); } diff --git a/packages/taler-wallet-core/src/pay-peer-common.ts b/packages/taler-wallet-core/src/pay-peer-common.ts @@ -15,34 +15,21 @@ GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -import { IDBValidKey } from "@gnu-taler/idb-bridge"; import { AmountJson, Amounts, ExchangePurseStatus, - NotificationType, SelectedProspectiveCoin, TalerProtocolTimestamp, - TransactionIdStr, - TransactionMajorState, - TransactionState, - WalletNotification, checkDbInvariant, } from "@gnu-taler/taler-util"; -import { TransitionResultType } from "./common.js"; import { SpendCoinDetails } from "./crypto/cryptoImplementation.js"; import { DbPeerPushPaymentCoinSelection, ReserveRecord, - TransactionMetaRecord, WalletDbReadOnlyTransaction, - WalletDbReadWriteTransaction, - WalletDbStoresArr, - WalletDbStoresName, - WalletStoresV1, } from "./db.js"; import { getTotalRefreshCost } from "./refresh.js"; -import { BalanceEffect, applyNotifyTransition } from "./transactions.js"; import { WalletExecutionContext, getDenomInfo } from "./wallet.js"; import { updateWithdrawalDenomsForExchange } from "./withdraw.js"; @@ -198,142 +185,3 @@ export function isPurseDeposited(purse: ExchangePurseStatus): boolean { !TalerProtocolTimestamp.isNever(depositTimestamp) ); } - -/** Extract the stored type of a DB store */ -type StoreType<Store extends WalletDbStoresName> = - (typeof WalletStoresV1)[Store]["store"]["_dummy"]; - -interface RecordCtx<Store extends WalletDbStoresName> { - store: Store; - transactionId: TransactionIdStr; - recordId: IDBValidKey; - wex: WalletExecutionContext; - recordMeta: (rec: StoreType<Store>) => TransactionMetaRecord; - recordState: (rec: StoreType<Store>) => { - txState: TransactionState; - stId: number; - }; -} - -/** - * Optionally update an existing record, ignore if missing. - * If a transition occurs, update its metadata and notify. - **/ -export async function recordTransition< - Store extends WalletDbStoresName, - ExtraStores extends WalletDbStoresArr = [], ->( - ctx: RecordCtx<Store>, - opts: { extraStores?: ExtraStores; label?: string }, - lambda: ( - rec: StoreType<Store>, - tx: WalletDbReadWriteTransaction< - [Store, "transactionsMeta", ...ExtraStores] - >, - ) => Promise<TransitionResultType.Stay | TransitionResultType.Transition>, -): Promise<void> { - const baseStore = [ctx.store, "transactionsMeta" as const]; - const storeNames = opts.extraStores - ? [...baseStore, ...opts.extraStores] - : baseStore; - await ctx.wex.db.runReadWriteTx( - { storeNames, label: opts.label }, - async (tx) => { - const rec = await tx[ctx.store].get(ctx.recordId); - if (rec == null) { - // FIXME warn - return; - } - const oldTxState = ctx.recordState(rec); - const res = await lambda(rec, tx); - switch (res) { - case TransitionResultType.Transition: { - await tx[ctx.store].put(rec); - await tx.transactionsMeta.put(ctx.recordMeta(rec)); - const newTxState = ctx.recordState(rec); - applyNotifyTransition(tx.notify, ctx.transactionId, { - oldTxState: oldTxState.txState, - newTxState: newTxState.txState, - balanceEffect: BalanceEffect.Any, - oldStId: oldTxState.stId, - newStId: newTxState.stId, - }); - return; - } - case TransitionResultType.Stay: - return; - } - }, - ); -} - -/** Extract the stored type status if any */ -type StoreTypeStatus<Store extends WalletDbStoresName> = - StoreType<Store> extends { status: infer Status } ? Status : never; - -/** - * Optionally update an existing record status from a state to another, ignore if missing. - * If a transition occurs, update its metadata and notify. - */ -export async function recordTransitionStatus<Store extends WalletDbStoresName>( - ctx: RecordCtx<Store>, - from: StoreTypeStatus<Store>, - to: StoreTypeStatus<Store>, -): Promise<void> { - await recordTransition(ctx, {}, async (rec, _) => { - const it = rec as { status: StoreTypeStatus<Store> }; - if (it.status !== from) { - return TransitionResultType.Stay; - } else { - it.status = to; - return TransitionResultType.Transition; - } - }); -} - -/** - * Optionally delete a record, update its metadata and notify. - **/ -export async function recordDelete<Store extends WalletDbStoresName>( - ctx: RecordCtx<Store>, - tx: WalletDbReadWriteTransaction<[Store, "transactionsMeta"]>, - lambda: ( - rec: StoreType<Store>, - notifs: WalletNotification[], - ) => Promise<void> = async () => {}, -): Promise<{ notifs: WalletNotification[] }> { - const notifs: WalletNotification[] = []; - const rec = await tx[ctx.store].get(ctx.recordId); - if (rec == null) { - return { notifs }; - } - const oldTxState = ctx.recordState(rec); - await lambda(rec, notifs); - await tx[ctx.store].delete(ctx.recordId); - await tx.transactionsMeta.delete(ctx.transactionId); - notifs.push({ - type: NotificationType.TransactionStateTransition, - transactionId: ctx.transactionId, - oldTxState: oldTxState.txState, - newTxState: { - major: TransactionMajorState.Deleted, - }, - newStId: -1, - }); - return { notifs }; -} - -/** - * Update record stored transaction metadata - **/ -export async function recordUpdateMeta<Store extends WalletDbStoresName>( - ctx: RecordCtx<Store>, - tx: WalletDbReadWriteTransaction<[Store, "transactionsMeta"]>, -): Promise<void> { - const rec = await tx[ctx.store].get(ctx.recordId); - if (rec == null) { - await tx.transactionsMeta.delete(ctx.transactionId); - } else { - await tx.transactionsMeta.put(ctx.recordMeta(rec)); - } -} diff --git a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts @@ -42,7 +42,6 @@ import { TransactionState, TransactionType, WalletAccountMergeFlags, - WalletNotification, assertUnreachable, checkDbInvariant, checkProtocolInvariant, @@ -60,7 +59,6 @@ import { TaskIdentifiers, TaskRunResult, TransactionContext, - TransitionResultType, constructTaskIdentifier, genericWaitForStateVal, getGenericRecordHandle, @@ -93,17 +91,8 @@ import { isKycOperationDue, runKycCheckAlgo, } from "./kyc.js"; +import { getMergeReserveInfo, isPurseDeposited } from "./pay-peer-common.js"; import { - getMergeReserveInfo, - isPurseDeposited, - recordDelete, - recordTransition, - recordTransitionStatus, - recordUpdateMeta, -} from "./pay-peer-common.js"; -import { - BalanceEffect, - applyNotifyBalanceEffect, constructTransactionIdentifier, isUnsuccessfulTransaction, } from "./transactions.js"; @@ -135,22 +124,22 @@ export class PeerPullCreditTransactionContext implements TransactionContext { }); } - readonly store = "peerPullCredit"; - readonly recordId = this.pursePub; - readonly recordState = (rec: PeerPullCreditRecord) => ({ - txState: computePeerPullCreditTransactionState(rec), - stId: rec.status, - }); - readonly recordMeta = (rec: PeerPullCreditRecord) => ({ - transactionId: this.transactionId, - status: rec.status, - timestamp: rec.mergeTimestamp, - currency: Amounts.currencyOf(rec.amount), - exchanges: [rec.exchangeBaseUrl], - }); - updateTransactionMeta = ( + async updateTransactionMeta( tx: WalletDbReadWriteTransaction<["peerPullCredit", "transactionsMeta"]>, - ) => recordUpdateMeta(this, tx); + ): Promise<void> { + const rec = await tx.peerPullCredit.get(this.pursePub); + if (rec == null) { + await tx.transactionsMeta.delete(this.pursePub); + } else { + await tx.transactionsMeta.put({ + currency: Amounts.currencyOf(rec.estimatedAmountEffective), + exchanges: [rec.exchangeBaseUrl], + status: rec.status, + timestamp: rec.mergeTimestamp, + transactionId: this.transactionId, + }); + } + } async deleteTransactionInTx( tx: WalletDbReadWriteTransaction< @@ -162,17 +151,20 @@ export class PeerPullCreditTransactionContext implements TransactionContext { "transactionsMeta", ] >, - ): Promise<{ notifs: WalletNotification[] }> { - return recordDelete(this, tx, async (rec, notifs) => { - if (rec.withdrawalGroupId) { - const withdrawalGroupId = rec.withdrawalGroupId; - const withdrawalCtx = new WithdrawTransactionContext( - this.wex, - withdrawalGroupId, - ); - await withdrawalCtx.deleteTransactionInTx(tx); - } - }); + ): Promise<void> { + const [rec, h] = await this.getRecordHandle(tx); + if (!rec) { + return; + } + if (rec.withdrawalGroupId) { + const withdrawalGroupId = rec.withdrawalGroupId; + const withdrawalCtx = new WithdrawTransactionContext( + this.wex, + withdrawalGroupId, + ); + await withdrawalCtx.deleteTransactionInTx(tx); + } + await h.update(undefined); } /** @@ -335,37 +327,40 @@ export class PeerPullCreditTransactionContext implements TransactionContext { "transactionsMeta", ], }, - this.deleteTransactionInTx.bind(this), + async (tx) => { + await this.deleteTransactionInTx(tx); + }, ); - for (const notif of res.notifs) { - this.wex.ws.notify(notif); - } } async suspendTransaction(): Promise<void> { - await recordTransition(this, {}, async (rec) => { + await this.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await this.getRecordHandle(tx); + if (!rec) { + return; + } switch (rec.status) { case PeerPullPaymentCreditStatus.PendingCreatePurse: rec.status = PeerPullPaymentCreditStatus.SuspendedCreatePurse; - return TransitionResultType.Transition; + break; case PeerPullPaymentCreditStatus.PendingMergeKycRequired: rec.status = PeerPullPaymentCreditStatus.SuspendedMergeKycRequired; - return TransitionResultType.Transition; + break; case PeerPullPaymentCreditStatus.PendingWithdrawing: rec.status = PeerPullPaymentCreditStatus.SuspendedWithdrawing; - return TransitionResultType.Transition; + break; case PeerPullPaymentCreditStatus.PendingReady: rec.status = PeerPullPaymentCreditStatus.SuspendedReady; - return TransitionResultType.Transition; + break; case PeerPullPaymentCreditStatus.AbortingDeletePurse: rec.status = PeerPullPaymentCreditStatus.SuspendedAbortingDeletePurse; - return TransitionResultType.Transition; + break; case PeerPullPaymentCreditStatus.PendingBalanceKycRequired: rec.status = PeerPullPaymentCreditStatus.SuspendedBalanceKycRequired; - return TransitionResultType.Transition; + break; case PeerPullPaymentCreditStatus.PendingBalanceKycInit: rec.status = PeerPullPaymentCreditStatus.SuspendedBalanceKycInit; - return TransitionResultType.Transition; + break; case PeerPullPaymentCreditStatus.SuspendedBalanceKycRequired: case PeerPullPaymentCreditStatus.SuspendedCreatePurse: case PeerPullPaymentCreditStatus.SuspendedMergeKycRequired: @@ -377,16 +372,21 @@ export class PeerPullCreditTransactionContext implements TransactionContext { case PeerPullPaymentCreditStatus.Aborted: case PeerPullPaymentCreditStatus.Failed: case PeerPullPaymentCreditStatus.Expired: - return TransitionResultType.Stay; + return; default: assertUnreachable(rec.status); } + await h.update(rec); }); this.wex.taskScheduler.stopShepherdTask(this.taskId); } async failTransaction(reason?: TalerErrorDetail): Promise<void> { - await recordTransition(this, {}, async (rec) => { + await this.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await this.getRecordHandle(tx); + if (!rec) { + return; + } switch (rec.status) { case PeerPullPaymentCreditStatus.PendingCreatePurse: case PeerPullPaymentCreditStatus.PendingMergeKycRequired: @@ -404,21 +404,26 @@ export class PeerPullCreditTransactionContext implements TransactionContext { case PeerPullPaymentCreditStatus.SuspendedBalanceKycRequired: case PeerPullPaymentCreditStatus.PendingBalanceKycInit: case PeerPullPaymentCreditStatus.SuspendedBalanceKycInit: - return TransitionResultType.Stay; + return; case PeerPullPaymentCreditStatus.AbortingDeletePurse: case PeerPullPaymentCreditStatus.SuspendedAbortingDeletePurse: rec.status = PeerPullPaymentCreditStatus.Failed; rec.failReason = reason; - return TransitionResultType.Transition; + break; default: assertUnreachable(rec.status); } + await h.update(rec); }); this.wex.taskScheduler.stopShepherdTask(this.taskId); } async resumeTransaction(): Promise<void> { - await recordTransition(this, {}, async (rec) => { + await this.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await this.getRecordHandle(tx); + if (!rec) { + return; + } switch (rec.status) { case PeerPullPaymentCreditStatus.PendingCreatePurse: case PeerPullPaymentCreditStatus.PendingMergeKycRequired: @@ -431,37 +436,42 @@ export class PeerPullCreditTransactionContext implements TransactionContext { case PeerPullPaymentCreditStatus.Failed: case PeerPullPaymentCreditStatus.Expired: case PeerPullPaymentCreditStatus.Aborted: - return TransitionResultType.Stay; + return; case PeerPullPaymentCreditStatus.SuspendedBalanceKycInit: rec.status = PeerPullPaymentCreditStatus.PendingBalanceKycInit; - return TransitionResultType.Transition; + break; case PeerPullPaymentCreditStatus.SuspendedCreatePurse: rec.status = PeerPullPaymentCreditStatus.PendingCreatePurse; - return TransitionResultType.Transition; + break; case PeerPullPaymentCreditStatus.SuspendedMergeKycRequired: rec.status = PeerPullPaymentCreditStatus.PendingMergeKycRequired; - return TransitionResultType.Transition; + break; case PeerPullPaymentCreditStatus.SuspendedReady: rec.status = PeerPullPaymentCreditStatus.PendingReady; - return TransitionResultType.Transition; + break; case PeerPullPaymentCreditStatus.SuspendedWithdrawing: rec.status = PeerPullPaymentCreditStatus.PendingWithdrawing; - return TransitionResultType.Transition; + break; case PeerPullPaymentCreditStatus.SuspendedAbortingDeletePurse: rec.status = PeerPullPaymentCreditStatus.AbortingDeletePurse; - return TransitionResultType.Transition; + break; case PeerPullPaymentCreditStatus.SuspendedBalanceKycRequired: rec.status = PeerPullPaymentCreditStatus.PendingBalanceKycRequired; - return TransitionResultType.Transition; + break; default: assertUnreachable(rec.status); } + await h.update(rec); }); this.wex.taskScheduler.startShepherdTask(this.taskId); } async abortTransaction(reason?: TalerErrorDetail): Promise<void> { - await recordTransition(this, {}, async (rec) => { + await this.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await this.getRecordHandle(tx); + if (!rec) { + return; + } switch (rec.status) { case PeerPullPaymentCreditStatus.PendingBalanceKycRequired: case PeerPullPaymentCreditStatus.SuspendedBalanceKycRequired: @@ -471,13 +481,13 @@ export class PeerPullCreditTransactionContext implements TransactionContext { case PeerPullPaymentCreditStatus.PendingMergeKycRequired: rec.status = PeerPullPaymentCreditStatus.AbortingDeletePurse; rec.abortReason = reason; - return TransitionResultType.Transition; + break; case PeerPullPaymentCreditStatus.PendingWithdrawing: throw Error("can't abort anymore"); case PeerPullPaymentCreditStatus.PendingReady: rec.abortReason = reason; rec.status = PeerPullPaymentCreditStatus.AbortingDeletePurse; - return TransitionResultType.Transition; + break; case PeerPullPaymentCreditStatus.Done: case PeerPullPaymentCreditStatus.SuspendedCreatePurse: case PeerPullPaymentCreditStatus.SuspendedMergeKycRequired: @@ -488,10 +498,11 @@ export class PeerPullCreditTransactionContext implements TransactionContext { case PeerPullPaymentCreditStatus.Failed: case PeerPullPaymentCreditStatus.Expired: case PeerPullPaymentCreditStatus.SuspendedAbortingDeletePurse: - return TransitionResultType.Stay; + return; default: assertUnreachable(rec.status); } + await h.update(rec); }); this.wex.taskScheduler.stopShepherdTask(this.taskId); this.wex.taskScheduler.startShepherdTask(this.taskId); @@ -514,11 +525,20 @@ async function queryPurseForPeerPullCredit( break; case HttpStatusCode.Gone: // Exchange says that purse doesn't exist anymore => expired! - await recordTransitionStatus( - ctx, - PeerPullPaymentCreditStatus.PendingReady, - PeerPullPaymentCreditStatus.Expired, - ); + await wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } + switch (rec.status) { + case PeerPullPaymentCreditStatus.PendingReady: + rec.status = PeerPullPaymentCreditStatus.Expired; + break; + default: + return; + } + await h.update(rec); + }); return TaskRunResult.finished(); case HttpStatusCode.NotFound: await ctx.failTransaction(resp.detail); @@ -555,11 +575,20 @@ async function queryPurseForPeerPullCredit( pub: reserve.reservePub, }, }); - await recordTransitionStatus( - ctx, - PeerPullPaymentCreditStatus.PendingReady, - PeerPullPaymentCreditStatus.PendingWithdrawing, - ); + await wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } + switch (rec.status) { + case PeerPullPaymentCreditStatus.PendingReady: + rec.status = PeerPullPaymentCreditStatus.PendingWithdrawing; + break; + default: + return; + } + await h.update(rec); + }); return TaskRunResult.progress(); } @@ -612,15 +641,20 @@ async function processPendingMergeKycRequired( checkProtocolInvariant(algoRes.requiresAuth != true); - recordTransition(ctx, {}, async (rec) => { + await ctx.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } rec.kycLastAmlReview = updatedStatus.lastAmlReview; rec.kycLastCheckStatus = updatedStatus.lastCheckStatus; rec.kycLastCheckCode = updatedStatus.lastCheckCode; rec.kycLastDeny = updatedStatus.lastDeny; rec.kycLastRuleGen = updatedStatus.lastRuleGen; rec.kycAccessToken = updatedStatus.accessToken; - return TransitionResultType.Transition; + await h.update(rec); }); + return algoRes.taskResult; } @@ -638,11 +672,20 @@ async function processPeerPullCreditAbortingDeletePurse( switch (resp.case) { case "ok": case HttpStatusCode.NotFound: - await recordTransitionStatus( - ctx, - PeerPullPaymentCreditStatus.AbortingDeletePurse, - PeerPullPaymentCreditStatus.Aborted, - ); + await wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } + switch (rec.status) { + case PeerPullPaymentCreditStatus.AbortingDeletePurse: + rec.status = PeerPullPaymentCreditStatus.Aborted; + break; + default: + return; + } + await h.update(rec); + }); return TaskRunResult.finished(); case HttpStatusCode.Forbidden: await ctx.failTransaction(resp.detail); @@ -665,37 +708,52 @@ async function processPeerPullCreditWithdrawing( await waitWithdrawalFinal(wex, pullIni.withdrawalGroupId); const ctx = new PeerPullCreditTransactionContext(wex, pullIni.pursePub); const wgId = pullIni.withdrawalGroupId; - let newTxState: TransactionState | undefined; - await recordTransition( - ctx, - { - extraStores: ["withdrawalGroups"], - }, - async (rec, tx) => { - if (rec.status !== PeerPullPaymentCreditStatus.PendingWithdrawing) { - return TransitionResultType.Stay; - } - const wg = await tx.withdrawalGroups.get(wgId); - if (!wg) { - // FIXME: Fail the operation instead? - return TransitionResultType.Stay; - } - switch (wg.status) { - case WithdrawalGroupStatus.Done: - rec.status = PeerPullPaymentCreditStatus.Done; - break; - // FIXME: Also handle other final states! - } - newTxState = computePeerPullCreditTransactionState(rec); - return TransitionResultType.Transition; - }, - ); - if (newTxState && newTxState.major != TransactionMajorState.Pending) { - return TaskRunResult.finished(); - } else { - // FIXME: Return indicator that we depend on the other operation! + + return await wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (rec?.status !== PeerPullPaymentCreditStatus.PendingWithdrawing) { + return TaskRunResult.backoff(); + } + const wg = await tx.withdrawalGroups.get(wgId); + if (!wg) { + // FIXME: Fail the operation instead? + return TaskRunResult.backoff(); + } + switch (wg.status) { + case WithdrawalGroupStatus.Done: + rec.status = PeerPullPaymentCreditStatus.Done; + await h.update(rec); + return TaskRunResult.finished(); + case WithdrawalGroupStatus.AbortedBank: + case WithdrawalGroupStatus.AbortedExchange: + case WithdrawalGroupStatus.AbortedOtherWallet: + case WithdrawalGroupStatus.AbortedUserRefused: + case WithdrawalGroupStatus.FailedBankAborted: + throw Error("unexpected transaction state"); + case WithdrawalGroupStatus.AbortingBank: + case WithdrawalGroupStatus.DialogProposed: + case WithdrawalGroupStatus.FailedAbortingBank: + case WithdrawalGroupStatus.PendingBalanceKyc: + case WithdrawalGroupStatus.PendingBalanceKycInit: + case WithdrawalGroupStatus.PendingKyc: + case WithdrawalGroupStatus.PendingQueryingStatus: + case WithdrawalGroupStatus.PendingReady: + case WithdrawalGroupStatus.PendingRedenominate: + case WithdrawalGroupStatus.PendingRegisteringBank: + case WithdrawalGroupStatus.PendingWaitConfirmBank: + case WithdrawalGroupStatus.SuspendedAbortingBank: + case WithdrawalGroupStatus.SuspendedBalanceKyc: + case WithdrawalGroupStatus.SuspendedBalanceKycInit: + case WithdrawalGroupStatus.SuspendedKyc: + case WithdrawalGroupStatus.SuspendedQueryingStatus: + case WithdrawalGroupStatus.SuspendedReady: + case WithdrawalGroupStatus.SuspendedRedenominate: + case WithdrawalGroupStatus.SuspendedRegisteringBank: + case WithdrawalGroupStatus.SuspendedWaitConfirmBank: + break; + } return TaskRunResult.backoff(); - } + }); } async function processPeerPullCreditCreatePurse( @@ -716,11 +774,20 @@ async function processPeerPullCreditCreatePurse( amount: kycCheckRes.nextThreshold, exchangeBaseUrl: pullIni.exchangeBaseUrl, }); - await recordTransitionStatus( - ctx, - PeerPullPaymentCreditStatus.PendingCreatePurse, - PeerPullPaymentCreditStatus.PendingBalanceKycInit, - ); + await wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } + switch (rec.status) { + case PeerPullPaymentCreditStatus.PendingCreatePurse: + rec.status = PeerPullPaymentCreditStatus.PendingBalanceKycInit; + break; + default: + return; + } + await h.update(rec); + }); return TaskRunResult.progress(); } @@ -819,9 +886,13 @@ async function processPeerPullCreditCreatePurse( assertUnreachable(resp); } - await recordTransition(ctx, {}, async (rec, _) => { + await wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } rec.status = PeerPullPaymentCreditStatus.PendingReady; - return TransitionResultType.Transition; + await h.update(rec); }); return TaskRunResult.backoff(); } @@ -921,23 +992,40 @@ async function processPeerPullCreditBalanceKyc( }); if (ret.result === "ok") { - await recordTransitionStatus( - ctx, - PeerPullPaymentCreditStatus.PendingBalanceKycRequired, - PeerPullPaymentCreditStatus.PendingCreatePurse, - ); + await ctx.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } + switch (rec.status) { + case PeerPullPaymentCreditStatus.PendingBalanceKycRequired: + rec.status = PeerPullPaymentCreditStatus.PendingCreatePurse; + break; + default: + return; + } + await h.update(rec); + }); + return TaskRunResult.progress(); } else if ( peerInc.status === PeerPullPaymentCreditStatus.PendingBalanceKycInit && ret.walletKycStatus === ExchangeWalletKycStatus.Legi ) { - await recordTransition(ctx, {}, async (rec) => { - if (rec.status !== PeerPullPaymentCreditStatus.PendingBalanceKycInit) { - return TransitionResultType.Stay; + await ctx.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; } - rec.status = PeerPullPaymentCreditStatus.PendingBalanceKycRequired; - rec.kycAccessToken = ret.walletKycAccessToken; - return TransitionResultType.Transition; + switch (rec.status) { + case PeerPullPaymentCreditStatus.PendingBalanceKycInit: + rec.status = PeerPullPaymentCreditStatus.PendingBalanceKycRequired; + rec.kycAccessToken = ret.walletKycAccessToken; + break; + default: + return; + } + await h.update(rec); }); return TaskRunResult.progress(); } else { @@ -952,13 +1040,16 @@ async function handlePeerPullCreditKycRequired( ): Promise<TaskRunResult> { const ctx = new PeerPullCreditTransactionContext(wex, peerIni.pursePub); - await recordTransition(ctx, {}, async (rec, tx) => { + await ctx.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } logger.info(`setting peer-pull-credit kyc payto hash to ${kycPaytoHash}`); rec.kycLastDeny = timestampPreciseToDb(TalerPreciseTimestamp.now()); rec.kycPaytoHash = kycPaytoHash; rec.status = PeerPullPaymentCreditStatus.PendingMergeKycRequired; - applyNotifyBalanceEffect(tx.notify, ctx.transactionId, BalanceEffect.Flags); - return TransitionResultType.Transition; + await h.update(rec); }); return TaskRunResult.progress(); } diff --git a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts @@ -48,7 +48,6 @@ import { TransactionMinorState, TransactionState, TransactionType, - WalletNotification, assertUnreachable, checkDbInvariant, checkLogicInvariant, @@ -68,7 +67,6 @@ import { TaskIdStr, TaskRunResult, TransactionContext, - TransitionResultType, constructTaskIdentifier, getGenericRecordHandle, spendCoins, @@ -87,10 +85,6 @@ import { getTotalPeerPaymentCost, isPurseDeposited, queryCoinInfosForSelection, - recordDelete, - recordTransition, - recordTransitionStatus, - recordUpdateMeta, } from "./pay-peer-common.js"; import { createRefreshGroup } from "./refresh.js"; import { @@ -124,23 +118,22 @@ export class PeerPullDebitTransactionContext implements TransactionContext { }); } - readonly store = "peerPullDebit"; - readonly recordId = this.peerPullDebitId; - readonly recordState = (rec: PeerPullPaymentIncomingRecord) => ({ - txState: computePeerPullDebitTransactionState(rec), - stId: rec.status, - }); - readonly recordMeta = (rec: PeerPullPaymentIncomingRecord) => ({ - transactionId: this.transactionId, - status: rec.status, - timestamp: rec.timestampCreated, - currency: Amounts.currencyOf(rec.amount), - exchanges: [rec.exchangeBaseUrl], - }); - - updateTransactionMeta = ( + async updateTransactionMeta( tx: WalletDbReadWriteTransaction<["peerPullDebit", "transactionsMeta"]>, - ) => recordUpdateMeta(this, tx); + ): Promise<void> { + const rec = await tx.peerPullDebit.get(this.peerPullDebitId); + if (rec == null) { + await tx.transactionsMeta.delete(this.peerPullDebitId); + } else { + await tx.transactionsMeta.put({ + currency: Amounts.currencyOf(rec.amount), + exchanges: [rec.exchangeBaseUrl], + status: rec.status, + timestamp: rec.timestampCreated, + transactionId: this.transactionId, + }); + } + } /** * Get the full transaction details for the transaction. @@ -209,23 +202,30 @@ export class PeerPullDebitTransactionContext implements TransactionContext { } async deleteTransaction(): Promise<void> { - const res = await this.wex.db.runReadWriteTx( + await this.wex.db.runReadWriteTx( { storeNames: ["peerPullDebit", "transactionsMeta"] }, - this.deleteTransactionInTx.bind(this), + async (tx) => { + await this.deleteTransactionInTx(tx); + }, ); - for (const notif of res.notifs) { - this.wex.ws.notify(notif); - } } async deleteTransactionInTx( tx: WalletDbReadWriteTransaction<["peerPullDebit", "transactionsMeta"]>, - ): Promise<{ notifs: WalletNotification[] }> { - return recordDelete(this, tx); + ): Promise<void> { + const [rec, h] = await this.getRecordHandle(tx); + if (!rec) { + return; + } + h.update(undefined); } async suspendTransaction(): Promise<void> { - await recordTransition(this, {}, async (rec) => { + await this.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await this.getRecordHandle(tx); + if (!rec) { + return; + } switch (rec.status) { case PeerPullDebitRecordStatus.DialogProposed: case PeerPullDebitRecordStatus.Done: @@ -233,43 +233,53 @@ export class PeerPullDebitTransactionContext implements TransactionContext { case PeerPullDebitRecordStatus.Aborted: case PeerPullDebitRecordStatus.Failed: case PeerPullDebitRecordStatus.SuspendedAbortingRefresh: - return TransitionResultType.Stay; + return; case PeerPullDebitRecordStatus.PendingDeposit: rec.status = PeerPullDebitRecordStatus.SuspendedDeposit; - return TransitionResultType.Transition; + break; case PeerPullDebitRecordStatus.AbortingRefresh: rec.status = PeerPullDebitRecordStatus.SuspendedAbortingRefresh; - return TransitionResultType.Transition; + break; default: assertUnreachable(rec.status); } + await h.update(rec); }); this.wex.taskScheduler.stopShepherdTask(this.taskId); } async resumeTransaction(): Promise<void> { - await recordTransition(this, {}, async (rec) => { + await this.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await this.getRecordHandle(tx); + if (!rec) { + return; + } switch (rec.status) { case PeerPullDebitRecordStatus.SuspendedDeposit: rec.status = PeerPullDebitRecordStatus.PendingDeposit; - return TransitionResultType.Transition; + break; case PeerPullDebitRecordStatus.SuspendedAbortingRefresh: rec.status = PeerPullDebitRecordStatus.AbortingRefresh; - return TransitionResultType.Transition; + break; case PeerPullDebitRecordStatus.Aborted: case PeerPullDebitRecordStatus.AbortingRefresh: case PeerPullDebitRecordStatus.Failed: case PeerPullDebitRecordStatus.DialogProposed: case PeerPullDebitRecordStatus.Done: case PeerPullDebitRecordStatus.PendingDeposit: - return TransitionResultType.Stay; + return; } + await h.update(rec); }); this.wex.taskScheduler.startShepherdTask(this.taskId); } async failTransaction(reason?: TalerErrorDetail): Promise<void> { - await recordTransition(this, {}, async (rec) => { + await this.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await this.getRecordHandle(tx); + if (!rec) { + return; + } switch (rec.status) { case PeerPullDebitRecordStatus.SuspendedDeposit: case PeerPullDebitRecordStatus.PendingDeposit: @@ -278,10 +288,11 @@ export class PeerPullDebitTransactionContext implements TransactionContext { // FIXME: Should we also abort the corresponding refresh session?! rec.status = PeerPullDebitRecordStatus.Failed; rec.failReason = reason; - return TransitionResultType.Transition; + break; default: - return TransitionResultType.Stay; + return; } + await h.update(rec); }); this.wex.taskScheduler.stopShepherdTask(this.taskId); } @@ -296,56 +307,47 @@ export class PeerPullDebitTransactionContext implements TransactionContext { } const currency = Amounts.currencyOf(oldRec.amount); await updateWithdrawalDenomsForCurrency(this.wex, currency); - await recordTransition( - this, - { - extraStores: [ - "coinAvailability", - "coinHistory", - "coins", - "denominations", - "refreshGroups", - "refreshSessions", - "denominationFamilies", - ], - }, - async (pi, tx) => { - switch (pi.status) { - case PeerPullDebitRecordStatus.SuspendedDeposit: - case PeerPullDebitRecordStatus.PendingDeposit: - break; - default: - return TransitionResultType.Stay; - } - const currency = Amounts.currencyOf(pi.totalCostEstimated); - const coinPubs: CoinRefreshRequest[] = []; - if (!pi.coinSel) { - throw Error("invalid db state"); - } + await this.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [pi, h] = await this.getRecordHandle(tx); + if (!pi) { + return; + } + switch (pi.status) { + case PeerPullDebitRecordStatus.SuspendedDeposit: + case PeerPullDebitRecordStatus.PendingDeposit: + break; + default: + return; + } + const currency = Amounts.currencyOf(pi.totalCostEstimated); + const coinPubs: CoinRefreshRequest[] = []; - for (let i = 0; i < pi.coinSel.coinPubs.length; i++) { - coinPubs.push({ - amount: pi.coinSel.contributions[i], - coinPub: pi.coinSel.coinPubs[i], - }); - } + if (!pi.coinSel) { + throw Error("invalid db state"); + } - const refresh = await createRefreshGroup( - this.wex, - tx, - currency, - coinPubs, - RefreshReason.AbortPeerPullDebit, - this.transactionId, - ); + for (let i = 0; i < pi.coinSel.coinPubs.length; i++) { + coinPubs.push({ + amount: pi.coinSel.contributions[i], + coinPub: pi.coinSel.coinPubs[i], + }); + } - pi.status = PeerPullDebitRecordStatus.AbortingRefresh; - pi.abortRefreshGroupId = refresh.refreshGroupId; - pi.abortReason = reason; - return TransitionResultType.Transition; - }, - ); + const refresh = await createRefreshGroup( + this.wex, + tx, + currency, + coinPubs, + RefreshReason.AbortPeerPullDebit, + this.transactionId, + ); + + pi.status = PeerPullDebitRecordStatus.AbortingRefresh; + pi.abortRefreshGroupId = refresh.refreshGroupId; + pi.abortReason = reason; + await h.update(pi); + }); this.wex.taskScheduler.stopShepherdTask(this.taskId); this.wex.taskScheduler.startShepherdTask(this.taskId); } @@ -415,7 +417,11 @@ async function handlePurseCreationConflict( coinSelRes.result.coins, ); - await recordTransition(ctx, {}, async (rec) => { + await ctx.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } switch (rec.status) { case PeerPullDebitRecordStatus.PendingDeposit: case PeerPullDebitRecordStatus.SuspendedDeposit: { @@ -425,10 +431,11 @@ async function handlePurseCreationConflict( contributions: sel.coins.map((x) => x.contribution), totalCost: Amounts.stringify(totalAmount), }; - return TransitionResultType.Transition; + await h.update(rec); + break; } default: - return TransitionResultType.Stay; + break; } }); return TaskRunResult.progress(); @@ -450,11 +457,20 @@ async function processPeerPullDebitDialogProposed( break; case HttpStatusCode.Gone: // Exchange says that purse doesn't exist anymore => expired! - await recordTransitionStatus( - ctx, - PeerPullDebitRecordStatus.DialogProposed, - PeerPullDebitRecordStatus.Aborted, - ); + await ctx.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } + switch (rec.status) { + case PeerPullDebitRecordStatus.DialogProposed: + rec.status = PeerPullDebitRecordStatus.Aborted; + break; + default: + return; + } + await h.update(rec); + }); return TaskRunResult.finished(); case HttpStatusCode.NotFound: await ctx.failTransaction(resp.detail); @@ -465,11 +481,17 @@ async function processPeerPullDebitDialogProposed( if (isPurseDeposited(resp.body)) { logger.info("purse completed by another wallet"); - await recordTransitionStatus( - ctx, - PeerPullDebitRecordStatus.DialogProposed, - PeerPullDebitRecordStatus.Aborted, - ); + await ctx.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + switch (rec?.status) { + case PeerPullDebitRecordStatus.DialogProposed: + rec.status = PeerPullDebitRecordStatus.Aborted; + break; + default: + return; + } + await h.update(rec); + }); return TaskRunResult.finished(); } @@ -527,50 +549,33 @@ async function processPeerPullDebitPendingDeposit( } const totalAmount = await getTotalPeerPaymentCost(wex, coins); - - // FIXME: Missing notification here! - const info = await recordTransition( - ctx, - { - extraStores: [ - "coinAvailability", - "coinHistory", - "coins", - "denominations", - "exchanges", - "refreshGroups", - "refreshSessions", - "denominationFamilies", - ], - }, - async (rec, tx) => { - if ( - rec.status !== PeerPullDebitRecordStatus.PendingDeposit || - rec.coinSel != null - ) { - return TransitionResultType.Stay; - } - await spendCoins(wex, tx, { - transactionId: ctx.transactionId, - coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), - contributions: coinSelRes.result.coins.map((x) => - Amounts.parseOrThrow(x.contribution), - ), - refreshReason: RefreshReason.PayPeerPull, - }); - rec.coinSel = { - coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), - contributions: coinSelRes.result.coins.map((x) => x.contribution), - totalCost: Amounts.stringify(totalAmount), - }; - return TransitionResultType.Transition; - }, - ); - if (info != null) { + return await wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return TaskRunResult.finished(); + } + if ( + rec.status !== PeerPullDebitRecordStatus.PendingDeposit || + rec.coinSel != null + ) { + return TaskRunResult.backoff(); + } + await spendCoins(wex, tx, { + transactionId: ctx.transactionId, + coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), + contributions: coinSelRes.result.coins.map((x) => + Amounts.parseOrThrow(x.contribution), + ), + refreshReason: RefreshReason.PayPeerPull, + }); + rec.coinSel = { + coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), + contributions: coinSelRes.result.coins.map((x) => x.contribution), + totalCost: Amounts.stringify(totalAmount), + }; + await h.update(rec); return TaskRunResult.progress(); - } else { - return TaskRunResult.backoff(); - } + }); } const exchangeClient = walletExchangeClient(peerPullInc.exchangeBaseUrl, wex); @@ -629,11 +634,17 @@ async function processPeerPullDebitPendingDeposit( } } // All batches succeeded, we can transition! - await recordTransitionStatus( - ctx, - PeerPullDebitRecordStatus.PendingDeposit, - PeerPullDebitRecordStatus.Done, - ); + await ctx.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + switch (rec?.status) { + case PeerPullDebitRecordStatus.PendingDeposit: + rec.status = PeerPullDebitRecordStatus.Done; + break; + default: + return; + } + await h.update(rec); + }); return TaskRunResult.finished(); } @@ -645,32 +656,32 @@ async function processPeerPullDebitAbortingRefresh( const abortRefreshGroupId = peerPullInc.abortRefreshGroupId; checkLogicInvariant(!!abortRefreshGroupId); const ctx = new PeerPullDebitTransactionContext(wex, peerPullDebitId); - await recordTransition( - ctx, - { extraStores: ["refreshGroups"] }, - async (rec, tx) => { - const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId); - if (refreshGroup == null) { - // Maybe it got manually deleted? Means that we should - // just go into failed. - logger.warn("no aborting refresh group found for deposit group"); - rec.status = PeerPullDebitRecordStatus.Failed; - return TransitionResultType.Transition; - } else { - switch (refreshGroup.operationStatus) { - case RefreshOperationStatus.Finished: - rec.status = PeerPullDebitRecordStatus.Aborted; - return TransitionResultType.Transition; - case RefreshOperationStatus.Failed: { - rec.status = PeerPullDebitRecordStatus.Failed; - return TransitionResultType.Transition; - } - default: - return TransitionResultType.Stay; + await wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId); + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } + if (refreshGroup == null) { + // Maybe it got manually deleted? Means that we should + // just go into failed. + logger.warn("no aborting refresh group found for deposit group"); + rec.status = PeerPullDebitRecordStatus.Failed; + } else { + switch (refreshGroup.operationStatus) { + case RefreshOperationStatus.Finished: + rec.status = PeerPullDebitRecordStatus.Aborted; + break; + case RefreshOperationStatus.Failed: { + rec.status = PeerPullDebitRecordStatus.Failed; + break; } + default: + return; } - }, - ); + } + await h.update(rec); + }); // FIXME: Shouldn't this be finished in some cases?! return TaskRunResult.backoff(); } @@ -770,43 +781,33 @@ export async function confirmPeerPullDebit( const totalAmount = await getTotalPeerPaymentCost(wex, coins); - await recordTransition( - ctx, - { - extraStores: [ - "coinAvailability", - "coinHistory", - "coins", - "denominations", - "exchanges", - "refreshGroups", - "refreshSessions", - "denominationFamilies", - ], - }, - async (rec, tx) => { - if (rec.status !== PeerPullDebitRecordStatus.DialogProposed) { - return TransitionResultType.Stay; - } - if (coinSelRes.type == "success") { - await spendCoins(wex, tx, { - transactionId: ctx.transactionId, - coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), - contributions: coinSelRes.result.coins.map((x) => - Amounts.parseOrThrow(x.contribution), - ), - refreshReason: RefreshReason.PayPeerPull, - }); - rec.coinSel = { - coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), - contributions: coinSelRes.result.coins.map((x) => x.contribution), - totalCost: Amounts.stringify(totalAmount), - }; - } - rec.status = PeerPullDebitRecordStatus.PendingDeposit; - return TransitionResultType.Transition; - }, - ); + await wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } + if (rec.status !== PeerPullDebitRecordStatus.DialogProposed) { + return; + } + if (coinSelRes.type == "success") { + await spendCoins(wex, tx, { + transactionId: ctx.transactionId, + coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), + contributions: coinSelRes.result.coins.map((x) => + Amounts.parseOrThrow(x.contribution), + ), + refreshReason: RefreshReason.PayPeerPull, + }); + rec.coinSel = { + coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), + contributions: coinSelRes.result.coins.map((x) => x.contribution), + totalCost: Amounts.stringify(totalAmount), + }; + } + rec.status = PeerPullDebitRecordStatus.PendingDeposit; + await h.update(rec); + }); + wex.taskScheduler.stopShepherdTask(ctx.taskId); wex.taskScheduler.startShepherdTask(ctx.taskId); diff --git a/packages/taler-wallet-core/src/pay-peer-push-credit.ts b/packages/taler-wallet-core/src/pay-peer-push-credit.ts @@ -39,7 +39,6 @@ import { TransactionState, TransactionType, WalletAccountMergeFlags, - WalletNotification, assertUnreachable, checkDbInvariant, checkProtocolInvariant, @@ -59,7 +58,6 @@ import { TaskIdentifiers, TaskRunResult, TransactionContext, - TransitionResultType, constructTaskIdentifier, genericWaitForStateVal, getGenericRecordHandle, @@ -92,17 +90,8 @@ import { isKycOperationDue, runKycCheckAlgo, } from "./kyc.js"; +import { getMergeReserveInfo, isPurseMerged } from "./pay-peer-common.js"; import { - getMergeReserveInfo, - isPurseMerged, - recordDelete, - recordTransition, - recordTransitionStatus, - recordUpdateMeta, -} from "./pay-peer-common.js"; -import { - BalanceEffect, - applyNotifyTransition, constructTransactionIdentifier, isUnsuccessfulTransaction, parseTransactionIdentifier, @@ -137,22 +126,22 @@ export class PeerPushCreditTransactionContext implements TransactionContext { }); } - readonly store = "peerPushCredit"; - readonly recordId = this.peerPushCreditId; - readonly recordState = (rec: PeerPushPaymentIncomingRecord) => ({ - txState: computePeerPushCreditTransactionState(rec), - stId: rec.status, - }); - readonly recordMeta = (rec: PeerPushPaymentIncomingRecord) => ({ - transactionId: this.transactionId, - status: rec.status, - timestamp: rec.timestamp, - currency: Amounts.currencyOf(rec.estimatedAmountEffective), - exchanges: [rec.exchangeBaseUrl], - }); - updateTransactionMeta = ( + async updateTransactionMeta( tx: WalletDbReadWriteTransaction<["peerPushCredit", "transactionsMeta"]>, - ) => recordUpdateMeta(this, tx); + ): Promise<void> { + const rec = await tx.peerPushCredit.get(this.peerPushCreditId); + if (rec == null) { + await tx.transactionsMeta.delete(this.peerPushCreditId); + } else { + await tx.transactionsMeta.put({ + currency: Amounts.currencyOf(rec.estimatedAmountEffective), + exchanges: [rec.exchangeBaseUrl], + status: rec.status, + timestamp: rec.timestamp, + transactionId: this.transactionId, + }); + } + } /** * Get the full transaction details for the transaction. @@ -304,9 +293,6 @@ export class PeerPushCreditTransactionContext implements TransactionContext { }, async (tx) => this.deleteTransactionInTx(tx), ); - for (const notif of res.notifs) { - this.wex.ws.notify(notif); - } } async deleteTransactionInTx( @@ -319,21 +305,28 @@ export class PeerPushCreditTransactionContext implements TransactionContext { "transactionsMeta", ] >, - ): Promise<{ notifs: WalletNotification[] }> { - return recordDelete(this, tx, async (rec, notifs) => { - if (rec.withdrawalGroupId) { - const withdrawalGroupId = rec.withdrawalGroupId; - const withdrawalCtx = new WithdrawTransactionContext( - this.wex, - withdrawalGroupId, - ); - await withdrawalCtx.deleteTransactionInTx(tx); - } - }); + ): Promise<void> { + const [rec, h] = await this.getRecordHandle(tx); + if (!rec) { + return; + } + await h.update(undefined); + const withdrawalGroupId = rec.withdrawalGroupId; + if (withdrawalGroupId != null) { + const withdrawalCtx = new WithdrawTransactionContext( + this.wex, + withdrawalGroupId, + ); + await withdrawalCtx.deleteTransactionInTx(tx); + } } async suspendTransaction(): Promise<void> { - await recordTransition(this, {}, async (rec) => { + await this.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await this.getRecordHandle(tx); + if (!rec) { + return; + } switch (rec.status) { case PeerPushCreditStatus.DialogProposed: case PeerPushCreditStatus.Done: @@ -344,37 +337,42 @@ export class PeerPushCreditTransactionContext implements TransactionContext { case PeerPushCreditStatus.SuspendedBalanceKycInit: case PeerPushCreditStatus.Failed: case PeerPushCreditStatus.Aborted: - return TransitionResultType.Stay; + return; case PeerPushCreditStatus.PendingBalanceKycRequired: rec.status = PeerPushCreditStatus.SuspendedBalanceKycRequired; - return TransitionResultType.Transition; + break; case PeerPushCreditStatus.PendingBalanceKycInit: rec.status = PeerPushCreditStatus.SuspendedBalanceKycInit; - return TransitionResultType.Transition; + break; case PeerPushCreditStatus.PendingMergeKycRequired: rec.status = PeerPushCreditStatus.SuspendedMergeKycRequired; - return TransitionResultType.Transition; + break; case PeerPushCreditStatus.PendingMerge: rec.status = PeerPushCreditStatus.SuspendedMerge; - return TransitionResultType.Transition; + break; case PeerPushCreditStatus.PendingWithdrawing: // FIXME: Suspend internal withdrawal transaction! rec.status = PeerPushCreditStatus.SuspendedWithdrawing; - return TransitionResultType.Transition; + break; default: assertUnreachable(rec.status); } + await h.update(rec); }); this.wex.taskScheduler.stopShepherdTask(this.taskId); } async abortTransaction(): Promise<void> { - await recordTransition(this, {}, async (rec) => { + await this.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await this.getRecordHandle(tx); + if (!rec) { + return; + } switch (rec.status) { case PeerPushCreditStatus.Failed: case PeerPushCreditStatus.Aborted: case PeerPushCreditStatus.Done: - return TransitionResultType.Stay; + return; case PeerPushCreditStatus.SuspendedMerge: case PeerPushCreditStatus.DialogProposed: case PeerPushCreditStatus.SuspendedMergeKycRequired: @@ -387,16 +385,21 @@ export class PeerPushCreditTransactionContext implements TransactionContext { case PeerPushCreditStatus.PendingBalanceKycInit: case PeerPushCreditStatus.SuspendedBalanceKycInit: rec.status = PeerPushCreditStatus.Aborted; - return TransitionResultType.Transition; + break; default: assertUnreachable(rec.status); } + await h.update(rec); }); this.wex.taskScheduler.stopShepherdTask(this.taskId); } async resumeTransaction(): Promise<void> { - await recordTransition(this, {}, async (rec) => { + await this.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await this.getRecordHandle(tx); + if (!rec) { + return; + } switch (rec.status) { case PeerPushCreditStatus.DialogProposed: case PeerPushCreditStatus.PendingMergeKycRequired: @@ -407,38 +410,43 @@ export class PeerPushCreditTransactionContext implements TransactionContext { case PeerPushCreditStatus.Done: case PeerPushCreditStatus.Aborted: case PeerPushCreditStatus.Failed: - return TransitionResultType.Stay; + return; case PeerPushCreditStatus.SuspendedMerge: rec.status = PeerPushCreditStatus.PendingMerge; - return TransitionResultType.Transition; + break; case PeerPushCreditStatus.SuspendedMergeKycRequired: rec.status = PeerPushCreditStatus.PendingMergeKycRequired; - return TransitionResultType.Transition; + break; case PeerPushCreditStatus.SuspendedWithdrawing: // FIXME: resume underlying "internal-withdrawal" transaction. rec.status = PeerPushCreditStatus.PendingWithdrawing; - return TransitionResultType.Transition; + break; case PeerPushCreditStatus.SuspendedBalanceKycRequired: rec.status = PeerPushCreditStatus.PendingBalanceKycRequired; - return TransitionResultType.Transition; + break; case PeerPushCreditStatus.SuspendedBalanceKycInit: rec.status = PeerPushCreditStatus.PendingBalanceKycInit; - return TransitionResultType.Transition; + break; default: assertUnreachable(rec.status); } + await h.update(rec); }); this.wex.taskScheduler.startShepherdTask(this.taskId); } async failTransaction(reason?: TalerErrorDetail): Promise<void> { - await recordTransition(this, {}, async (rec) => { + await this.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await this.getRecordHandle(tx); + if (!rec) { + return; + } switch (rec.status) { case PeerPushCreditStatus.Done: case PeerPushCreditStatus.Aborted: case PeerPushCreditStatus.Failed: // Already in a final state. - return TransitionResultType.Stay; + return; case PeerPushCreditStatus.DialogProposed: case PeerPushCreditStatus.PendingMergeKycRequired: case PeerPushCreditStatus.PendingMerge: @@ -452,10 +460,11 @@ export class PeerPushCreditTransactionContext implements TransactionContext { case PeerPushCreditStatus.SuspendedBalanceKycInit: rec.status = PeerPushCreditStatus.Failed; rec.failReason = reason; - return TransitionResultType.Transition; + break; default: assertUnreachable(rec.status); } + await h.update(rec); }); this.wex.taskScheduler.stopShepherdTask(this.taskId); } @@ -693,15 +702,20 @@ async function processPeerPushDebitMergeKyc( checkProtocolInvariant(algoRes.requiresAuth != true); - recordTransition(ctx, {}, async (rec) => { + await ctx.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } rec.kycLastAmlReview = updatedStatus.lastAmlReview; rec.kycLastCheckStatus = updatedStatus.lastCheckStatus; rec.kycLastCheckCode = updatedStatus.lastCheckCode; rec.kycLastDeny = updatedStatus.lastDeny; rec.kycLastRuleGen = updatedStatus.lastRuleGen; rec.kycAccessToken = updatedStatus.accessToken; - return TransitionResultType.Transition; + await h.update(rec); }); + return algoRes.taskResult; } @@ -718,24 +732,14 @@ async function transitionPeerPushCreditKycRequired( return await wex.db.runReadWriteTx( { storeNames: ["peerPushCredit", "transactionsMeta"] }, async (tx) => { - const peerInc = await tx.peerPushCredit.get(ctx.peerPushCreditId); + const [peerInc, h] = await ctx.getRecordHandle(tx); if (!peerInc) { return TaskRunResult.finished(); } - const oldTxState = ctx.recordState(peerInc); peerInc.kycPaytoHash = kycPending.h_payto; peerInc.status = PeerPushCreditStatus.PendingMergeKycRequired; peerInc.kycLastDeny = timestampPreciseToDb(TalerPreciseTimestamp.now()); - const newTxState = computePeerPushCreditTransactionState(peerInc); - await tx.peerPushCredit.put(peerInc); - await ctx.updateTransactionMeta(tx); - applyNotifyTransition(tx.notify, ctx.transactionId, { - oldTxState: oldTxState.txState, - newTxState, - balanceEffect: BalanceEffect.Flags, - newStId: peerInc.status, - oldStId: oldTxState.stId, - }); + await h.update(peerInc); return TaskRunResult.progress(); }, ); @@ -761,11 +765,21 @@ async function processPendingMerge( amount: kycCheckRes.nextThreshold, exchangeBaseUrl: peerInc.exchangeBaseUrl, }); - await recordTransitionStatus( - ctx, - PeerPushCreditStatus.PendingMerge, - PeerPushCreditStatus.PendingBalanceKycInit, - ); + await ctx.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } + switch (rec.status) { + case PeerPushCreditStatus.PendingMerge: { + rec.status = PeerPushCreditStatus.PendingBalanceKycInit; + break; + } + default: + return; + } + await h.update(rec); + }); return TaskRunResult.progress(); } @@ -831,11 +845,21 @@ async function processPendingMerge( case HttpStatusCode.Conflict: // FIXME: Check signature. // FIXME: status completed by other - await recordTransitionStatus( - ctx, - PeerPushCreditStatus.PendingMerge, - PeerPushCreditStatus.Aborted, - ); + await ctx.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } + switch (rec.status) { + case PeerPushCreditStatus.PendingMerge: { + rec.status = PeerPushCreditStatus.Aborted; + break; + } + default: + return; + } + await h.update(rec); + }); return TaskRunResult.finished(); case HttpStatusCode.Gone: // FIXME: status expired @@ -876,11 +900,10 @@ async function processPendingMerge( ], }, async (tx) => { - const peerInc = await tx.peerPushCredit.get(peerPushCreditId); + const [peerInc, h] = await ctx.getRecordHandle(tx); if (!peerInc) { return undefined; } - const oldTxState = ctx.recordState(peerInc); let wgCreateRes: PerformCreateWithdrawalGroupResult | undefined = undefined; switch (peerInc.status) { @@ -897,16 +920,7 @@ async function processPendingMerge( break; } } - await tx.peerPushCredit.put(peerInc); - await ctx.updateTransactionMeta(tx); - const newTxState = computePeerPushCreditTransactionState(peerInc); - applyNotifyTransition(tx.notify, ctx.transactionId, { - oldTxState: oldTxState.txState, - newTxState, - balanceEffect: BalanceEffect.Any, - newStId: peerInc.status, - oldStId: oldTxState.stId, - }); + await h.update(peerInc); }, ); return TaskRunResult.backoff(); @@ -929,7 +943,7 @@ async function processPendingWithdrawing( await wex.db.runReadWriteTx( { storeNames: ["peerPushCredit", "withdrawalGroups", "transactionsMeta"] }, async (tx) => { - const ppi = await tx.peerPushCredit.get(peerInc.peerPushCreditId); + const [ppi, h] = await ctx.getRecordHandle(tx); if (!ppi) { finished = true; return; @@ -938,7 +952,6 @@ async function processPendingWithdrawing( finished = true; return; } - const oldTxState = ctx.recordState(ppi); const wg = await tx.withdrawalGroups.get(wgId); if (!wg) { // FIXME: Fail the operation instead? @@ -951,17 +964,7 @@ async function processPendingWithdrawing( break; // FIXME: Also handle other final states! } - await tx.peerPushCredit.put(ppi); - await ctx.updateTransactionMeta(tx); - const newTxState = ctx.recordState(ppi); - applyNotifyTransition(tx.notify, ctx.transactionId, { - oldTxState: oldTxState.txState, - newTxState: newTxState.txState, - balanceEffect: BalanceEffect.Any, - oldStId: oldTxState.stId, - newStId: newTxState.stId, - }); - return; + await h.update(ppi); }, ); if (finished) { @@ -991,11 +994,21 @@ async function processPeerPushDebitDialogProposed( break; case HttpStatusCode.Gone: // Exchange says that purse doesn't exist anymore => expired! - await recordTransitionStatus( - ctx, - PeerPushCreditStatus.DialogProposed, - PeerPushCreditStatus.Aborted, - ); + await ctx.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } + switch (rec.status) { + case PeerPushCreditStatus.DialogProposed: { + rec.status = PeerPushCreditStatus.Aborted; + break; + } + default: + return; + } + await h.update(rec); + }); return TaskRunResult.finished(); case HttpStatusCode.NotFound: await ctx.failTransaction(resp.detail); @@ -1006,11 +1019,21 @@ async function processPeerPushDebitDialogProposed( if (isPurseMerged(resp.body)) { logger.info("purse completed by another wallet"); - await recordTransitionStatus( - ctx, - PeerPushCreditStatus.DialogProposed, - PeerPushCreditStatus.Aborted, - ); + await ctx.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } + switch (rec.status) { + case PeerPushCreditStatus.DialogProposed: { + rec.status = PeerPushCreditStatus.Aborted; + break; + } + default: + return; + } + await h.update(rec); + }); return TaskRunResult.finished(); } @@ -1026,8 +1049,8 @@ export async function processPeerPushCredit( } const ctx = new PeerPushCreditTransactionContext(wex, peerPushCreditId); - const { peerInc, contractTerms } = await wex.db.runReadOnlyTx( - { storeNames: ["contractTerms", "peerPushCredit", "transactionsMeta"] }, + const { peerInc, contractTerms } = await wex.db.runAllStoresReadWriteTx( + {}, async (tx) => { const rec = await tx.peerPushCredit.get(peerPushCreditId); let contractTerms = null; @@ -1123,23 +1146,35 @@ async function processPeerPushCreditBalanceKyc( }); if (ret.result === "ok") { - await recordTransitionStatus( - ctx, - PeerPushCreditStatus.PendingBalanceKycRequired, - PeerPushCreditStatus.PendingMerge, - ); + await ctx.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } + switch (rec.status) { + case PeerPushCreditStatus.PendingBalanceKycRequired: { + rec.status = PeerPushCreditStatus.PendingMerge; + await h.update(rec); + break; + } + } + }); return TaskRunResult.progress(); } else if ( peerInc.status === PeerPushCreditStatus.PendingBalanceKycInit && ret.walletKycStatus === ExchangeWalletKycStatus.Legi ) { - await recordTransition(ctx, {}, async (rec) => { - if (rec.status === PeerPushCreditStatus.PendingBalanceKycInit) { - rec.status = PeerPushCreditStatus.PendingBalanceKycRequired; - rec.kycAccessToken = ret.walletKycAccessToken; - return TransitionResultType.Transition; - } else { - return TransitionResultType.Stay; + await ctx.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } + switch (rec.status) { + case PeerPushCreditStatus.PendingBalanceKycInit: { + rec.status = PeerPushCreditStatus.PendingBalanceKycRequired; + rec.kycAccessToken = ret.walletKycAccessToken; + await h.update(rec); + } } }); return TaskRunResult.progress(); @@ -1198,11 +1233,20 @@ export async function confirmPeerPushCredit( if (checkPeerCreditHardLimitExceeded(exchange, res.contractTerms.amount)) { throw Error("peer credit would exceed hard KYC limit"); } - await recordTransitionStatus( - ctx, - PeerPushCreditStatus.DialogProposed, - PeerPushCreditStatus.PendingMerge, - ); + + await wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } + switch (rec.status) { + case PeerPushCreditStatus.DialogProposed: { + rec.status = PeerPushCreditStatus.PendingMerge; + await h.update(rec); + break; + } + } + }); wex.taskScheduler.stopShepherdTask(ctx.taskId); wex.taskScheduler.startShepherdTask(ctx.taskId); diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/pay-peer-push-debit.ts @@ -44,7 +44,6 @@ import { TransactionMinorState, TransactionState, TransactionType, - WalletNotification, assertUnreachable, checkDbInvariant, encodeCrock, @@ -63,7 +62,6 @@ import { TaskIdStr, TaskRunResult, TransactionContext, - TransitionResultType, constructTaskIdentifier, getGenericRecordHandle, runWithClientCancellation, @@ -90,10 +88,6 @@ import { getTotalPeerPaymentCostInTx, isPurseMerged, queryCoinInfosForSelection, - recordDelete, - recordTransition, - recordTransitionStatus, - recordUpdateMeta, } from "./pay-peer-common.js"; import { createRefreshGroup } from "./refresh.js"; import { @@ -123,22 +117,22 @@ export class PeerPushDebitTransactionContext implements TransactionContext { }); } - readonly store = "peerPushDebit"; - readonly recordId = this.pursePub; - readonly recordState = (rec: PeerPushDebitRecord) => ({ - txState: computePeerPushDebitTransactionState(rec), - stId: rec.status, - }); - readonly recordMeta = (rec: PeerPushDebitRecord) => ({ - transactionId: this.transactionId, - status: rec.status, - timestamp: rec.timestampCreated, - currency: Amounts.currencyOf(rec.amount), - exchanges: [rec.exchangeBaseUrl], - }); - updateTransactionMeta = ( + async updateTransactionMeta( tx: WalletDbReadWriteTransaction<["peerPushDebit", "transactionsMeta"]>, - ) => recordUpdateMeta(this, tx); + ): Promise<void> { + const rec = await tx.peerPushDebit.get(this.pursePub); + if (rec == null) { + await tx.transactionsMeta.delete(this.pursePub); + } else { + await tx.transactionsMeta.put({ + currency: Amounts.currencyOf(rec.amount), + exchanges: [rec.exchangeBaseUrl], + status: rec.status, + timestamp: rec.timestampCreated, + transactionId: this.transactionId, + }); + } + } /** * Get the full transaction details for the transaction. @@ -221,33 +215,40 @@ export class PeerPushDebitTransactionContext implements TransactionContext { } async deleteTransaction(): Promise<void> { - const res = await this.wex.db.runReadWriteTx( + await this.wex.db.runReadWriteTx( { storeNames: ["peerPushDebit", "transactionsMeta"] }, - this.deleteTransactionInTx.bind(this), + async (tx) => { + await this.deleteTransactionInTx(tx); + }, ); - for (const notif of res.notifs) { - this.wex.ws.notify(notif); - } } async deleteTransactionInTx( tx: WalletDbReadWriteTransaction<["peerPushDebit", "transactionsMeta"]>, - ): Promise<{ notifs: WalletNotification[] }> { - return recordDelete(this, tx); + ): Promise<void> { + const [rec, h] = await this.getRecordHandle(tx); + if (!rec) { + return; + } + await h.update(undefined); } async suspendTransaction(): Promise<void> { - await recordTransition(this, {}, async (rec) => { + await this.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await this.getRecordHandle(tx); + if (!rec) { + return; + } switch (rec.status) { case PeerPushDebitStatus.PendingCreatePurse: rec.status = PeerPushDebitStatus.SuspendedCreatePurse; - return TransitionResultType.Transition; + break; case PeerPushDebitStatus.AbortingDeletePurse: rec.status = PeerPushDebitStatus.SuspendedAbortingDeletePurse; - return TransitionResultType.Transition; + break; case PeerPushDebitStatus.PendingReady: rec.status = PeerPushDebitStatus.SuspendedReady; - return TransitionResultType.Transition; + break; case PeerPushDebitStatus.SuspendedAbortingDeletePurse: case PeerPushDebitStatus.SuspendedReady: case PeerPushDebitStatus.SuspendedCreatePurse: @@ -256,28 +257,33 @@ export class PeerPushDebitTransactionContext implements TransactionContext { case PeerPushDebitStatus.Failed: case PeerPushDebitStatus.Expired: // Do nothing - return TransitionResultType.Stay; + return; default: assertUnreachable(rec.status); } + await h.update(rec); }); this.wex.taskScheduler.stopShepherdTask(this.taskId); } async abortTransaction(reason?: TalerErrorDetail): Promise<void> { - await recordTransition(this, {}, async (rec) => { + await this.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await this.getRecordHandle(tx); + if (!rec) { + return; + } switch (rec.status) { case PeerPushDebitStatus.PendingReady: case PeerPushDebitStatus.SuspendedReady: rec.abortReason = reason; rec.status = PeerPushDebitStatus.AbortingDeletePurse; - return TransitionResultType.Transition; + break; case PeerPushDebitStatus.SuspendedCreatePurse: case PeerPushDebitStatus.PendingCreatePurse: // Network request might already be in-flight! rec.abortReason = reason; rec.status = PeerPushDebitStatus.AbortingDeletePurse; - return TransitionResultType.Transition; + break; case PeerPushDebitStatus.SuspendedAbortingDeletePurse: case PeerPushDebitStatus.Done: case PeerPushDebitStatus.AbortingDeletePurse: @@ -285,27 +291,32 @@ export class PeerPushDebitTransactionContext implements TransactionContext { case PeerPushDebitStatus.Expired: case PeerPushDebitStatus.Failed: // Do nothing - return TransitionResultType.Stay; + return; default: assertUnreachable(rec.status); } + await h.update(rec); }); this.wex.taskScheduler.stopShepherdTask(this.taskId); this.wex.taskScheduler.startShepherdTask(this.taskId); } async resumeTransaction(): Promise<void> { - await recordTransition(this, {}, async (rec) => { + await this.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await this.getRecordHandle(tx); + if (!rec) { + return; + } switch (rec.status) { case PeerPushDebitStatus.SuspendedAbortingDeletePurse: rec.status = PeerPushDebitStatus.AbortingDeletePurse; - return TransitionResultType.Transition; + break; case PeerPushDebitStatus.SuspendedReady: rec.status = PeerPushDebitStatus.PendingReady; - return TransitionResultType.Transition; + break; case PeerPushDebitStatus.SuspendedCreatePurse: rec.status = PeerPushDebitStatus.PendingCreatePurse; - return TransitionResultType.Transition; + break; case PeerPushDebitStatus.PendingCreatePurse: case PeerPushDebitStatus.AbortingDeletePurse: case PeerPushDebitStatus.PendingReady: @@ -314,16 +325,21 @@ export class PeerPushDebitTransactionContext implements TransactionContext { case PeerPushDebitStatus.Failed: case PeerPushDebitStatus.Expired: // Do nothing - return TransitionResultType.Stay; + return; default: assertUnreachable(rec.status); } + await h.update(rec); }); this.wex.taskScheduler.startShepherdTask(this.taskId); } async failTransaction(reason?: TalerErrorDetail): Promise<void> { - await recordTransition(this, {}, async (rec) => { + await this.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await this.getRecordHandle(tx); + if (!rec) { + return; + } switch (rec.status) { case PeerPushDebitStatus.AbortingDeletePurse: case PeerPushDebitStatus.SuspendedAbortingDeletePurse: @@ -333,16 +349,17 @@ export class PeerPushDebitTransactionContext implements TransactionContext { case PeerPushDebitStatus.PendingCreatePurse: rec.status = PeerPushDebitStatus.Failed; rec.failReason = reason; - return TransitionResultType.Transition; + break; case PeerPushDebitStatus.Done: case PeerPushDebitStatus.Aborted: case PeerPushDebitStatus.Failed: case PeerPushDebitStatus.Expired: // Do nothing - return TransitionResultType.Stay; + return; default: assertUnreachable(rec.status); } + await h.update(rec); }); this.wex.taskScheduler.stopShepherdTask(this.taskId); this.wex.taskScheduler.startShepherdTask(this.taskId); @@ -519,7 +536,11 @@ async function handlePurseCreationConflict( assertUnreachable(coinSelRes); } - await recordTransition(ctx, {}, async (rec) => { + await ctx.wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } switch (rec.status) { case PeerPushDebitStatus.PendingCreatePurse: case PeerPushDebitStatus.SuspendedCreatePurse: { @@ -528,10 +549,8 @@ async function handlePurseCreationConflict( coinPubs: sel.coins.map((x) => x.coinPub), contributions: sel.coins.map((x) => x.contribution), }; - return TransitionResultType.Transition; + await h.update(rec); } - default: - return TransitionResultType.Stay; } }); return TaskRunResult.progress(); @@ -590,52 +609,31 @@ async function processPeerPushDebitCreateReserve( assertUnreachable(coinSelRes); } - let transitionDone = false; - await recordTransition( - ctx, - { - extraStores: [ - "coinAvailability", - "coinHistory", - "coins", - "contractTerms", - "denominations", - "denominationFamilies", - "exchanges", - "refreshGroups", - "refreshSessions", - ], - }, - async (rec, tx) => { - if (rec.coinSel != null) { - return TransitionResultType.Stay; - } - - rec.coinSel = { - coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), - contributions: coinSelRes.result.coins.map((x) => x.contribution), - }; - // FIXME: Instead of directly doing a spendCoin here, - // we might want to mark the coins as used and spend them - // after we've been able to create the purse. - await spendCoins(wex, tx, { - transactionId: ctx.transactionId, - coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), - contributions: coinSelRes.result.coins.map((x) => - Amounts.parseOrThrow(x.contribution), - ), - refreshReason: RefreshReason.PayPeerPush, - }); - - transitionDone = true; - return TransitionResultType.Transition; - }, - ); - if (transitionDone) { + return await wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return TaskRunResult.backoff(); + } + if (rec.coinSel != null) { + return TaskRunResult.backoff(); + } + rec.coinSel = { + coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), + contributions: coinSelRes.result.coins.map((x) => x.contribution), + }; + // FIXME: Instead of directly doing a spendCoin here, + // we might want to mark the coins as used and spend them + // after we've been able to create the purse. + await spendCoins(wex, tx, { + transactionId: ctx.transactionId, + coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), + contributions: coinSelRes.result.coins.map((x) => + Amounts.parseOrThrow(x.contribution), + ), + refreshReason: RefreshReason.PayPeerPush, + }); return TaskRunResult.progress(); - } else { - return TaskRunResult.backoff(); - } + }); } const purseAmount = peerPushInitiation.amount; @@ -735,11 +733,21 @@ async function processPeerPushDebitCreateReserve( continue; case HttpStatusCode.Gone: // FIXME we need PeerPushDebitStatus.ExpiredDeletePurse - await recordTransitionStatus( - ctx, - PeerPushDebitStatus.PendingCreatePurse, - PeerPushDebitStatus.AbortingDeletePurse, - ); + await wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } + switch (rec.status) { + case PeerPushDebitStatus.PendingCreatePurse: + rec.status = PeerPushDebitStatus.AbortingDeletePurse; + break; + default: + // Do nothing + return; + } + await h.update(rec); + }); return TaskRunResult.progress(); case HttpStatusCode.Conflict: // Handle double-spending @@ -762,19 +770,39 @@ async function processPeerPushDebitCreateReserve( const resp = await exchangeClient.getPurseStatusAtDeposit(pursePub); switch (resp.case) { case "ok": - await recordTransitionStatus( - ctx, - PeerPushDebitStatus.PendingCreatePurse, - PeerPushDebitStatus.PendingReady, - ); + await wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } + switch (rec.status) { + case PeerPushDebitStatus.PendingCreatePurse: + rec.status = PeerPushDebitStatus.PendingReady; + break; + default: + // Do nothing + return; + } + await h.update(rec); + }); return TaskRunResult.progress(); case HttpStatusCode.Gone: // FIXME we need PeerPushDebitStatus.ExpiredDeletePurse - await recordTransitionStatus( - ctx, - PeerPushDebitStatus.PendingCreatePurse, - PeerPushDebitStatus.AbortingDeletePurse, - ); + await wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } + switch (rec.status) { + case PeerPushDebitStatus.PendingCreatePurse: + rec.status = PeerPushDebitStatus.AbortingDeletePurse; + break; + default: + // Do nothing + return; + } + await h.update(rec); + }); return TaskRunResult.progress(); case HttpStatusCode.NotFound: await ctx.failTransaction(resp.detail); @@ -811,50 +839,43 @@ async function processPeerPushDebitAbortingDeletePurse( await updateWithdrawalDenomsForCurrency(wex, currency); - await recordTransition( - ctx, - { - extraStores: [ - "coinAvailability", - "coinHistory", - "coins", - "denominations", - "denominationFamilies", - "refreshGroups", - "refreshSessions", - ], - }, - async (rec, tx) => { - if (rec.status !== PeerPushDebitStatus.AbortingDeletePurse) { - return TransitionResultType.Stay; - } - const currency = Amounts.currencyOf(rec.amount); - const coinPubs: CoinRefreshRequest[] = []; + await wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } - if (!rec.coinSel) { - return TransitionResultType.Stay; - } + if (rec.status !== PeerPushDebitStatus.AbortingDeletePurse) { + return; + } - for (let i = 0; i < rec.coinSel.coinPubs.length; i++) { - coinPubs.push({ - amount: rec.coinSel.contributions[i], - coinPub: rec.coinSel.coinPubs[i], - }); - } - rec.status = PeerPushDebitStatus.Aborted; + const currency = Amounts.currencyOf(rec.amount); + const coinPubs: CoinRefreshRequest[] = []; - const refresh = await createRefreshGroup( - wex, - tx, - currency, - coinPubs, - RefreshReason.AbortPeerPushDebit, - ctx.transactionId, - ); - rec.abortRefreshGroupId = refresh.refreshGroupId; - return TransitionResultType.Transition; - }, - ); + if (!rec.coinSel) { + return; + } + + for (let i = 0; i < rec.coinSel.coinPubs.length; i++) { + coinPubs.push({ + amount: rec.coinSel.contributions[i], + coinPub: rec.coinSel.coinPubs[i], + }); + } + rec.status = PeerPushDebitStatus.Aborted; + + const refresh = await createRefreshGroup( + wex, + tx, + currency, + coinPubs, + RefreshReason.AbortPeerPushDebit, + ctx.transactionId, + ); + rec.abortRefreshGroupId = refresh.refreshGroupId; + + await h.update(rec); + }); return TaskRunResult.backoff(); } @@ -880,59 +901,59 @@ async function processPeerPushDebitReady( if (!isPurseMerged(resp.body)) { return TaskRunResult.longpollReturnedPending(); } else { - await recordTransitionStatus( - ctx, - PeerPushDebitStatus.PendingReady, - PeerPushDebitStatus.Done, - ); + await wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } + switch (rec.status) { + case PeerPushDebitStatus.PendingReady: + rec.status = PeerPushDebitStatus.Done; + break; + default: + // Do nothing + return; + } + await h.update(rec); + }); return TaskRunResult.progress(); } } case HttpStatusCode.Gone: logger.info(`purse ${pursePub} is gone, aborting peer-push-debit`); - await recordTransition( - ctx, - { - extraStores: [ - "coinAvailability", - "coinHistory", - "coins", - "denominations", - "denominationFamilies", - "refreshGroups", - "refreshSessions", - ], - }, - async (rec, tx) => { - if (rec.status !== PeerPushDebitStatus.PendingReady) { - return TransitionResultType.Stay; + await wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const [rec, h] = await ctx.getRecordHandle(tx); + if (!rec) { + return; + } + if (rec.status !== PeerPushDebitStatus.PendingReady) { + return; + } + const currency = Amounts.currencyOf(rec.amount); + const coinPubs: CoinRefreshRequest[] = []; + + if (rec.coinSel) { + for (let i = 0; i < rec.coinSel.coinPubs.length; i++) { + coinPubs.push({ + amount: rec.coinSel.contributions[i], + coinPub: rec.coinSel.coinPubs[i], + }); } - const currency = Amounts.currencyOf(rec.amount); - const coinPubs: CoinRefreshRequest[] = []; - - if (rec.coinSel) { - for (let i = 0; i < rec.coinSel.coinPubs.length; i++) { - coinPubs.push({ - amount: rec.coinSel.contributions[i], - coinPub: rec.coinSel.coinPubs[i], - }); - } - const refresh = await createRefreshGroup( - wex, - tx, - currency, - coinPubs, - RefreshReason.AbortPeerPushDebit, - ctx.transactionId, - ); + const refresh = await createRefreshGroup( + wex, + tx, + currency, + coinPubs, + RefreshReason.AbortPeerPushDebit, + ctx.transactionId, + ); - rec.abortRefreshGroupId = refresh.refreshGroupId; - } - rec.status = PeerPushDebitStatus.Aborted; - return TransitionResultType.Transition; - }, - ); + rec.abortRefreshGroupId = refresh.refreshGroupId; + } + rec.status = PeerPushDebitStatus.Aborted; + await h.update(rec); + }); return TaskRunResult.backoff(); case HttpStatusCode.NotFound: throw Error("peer push credit disappeared");