taler-typescript-core

Wallet core logic and WebUIs for various components
Log | Files | Refs | Submodules | README | LICENSE

commit 296c0e2afd2cb4b5c7fc3c076976daee55965d9e
parent b1598757f43a5f1e1a5cef0e950ff1a9caafc768
Author: Antoine A <>
Date:   Tue, 15 Apr 2025 16:54:47 +0200

wallet-core: clean pay-peer-pull-debit

Diffstat:
Mpackages/taler-wallet-core/src/pay-peer-pull-debit.ts | 456++++++++++++++++++++++++++++++++-----------------------------------------------
1 file changed, 187 insertions(+), 269 deletions(-)

diff --git a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts @@ -1,6 +1,6 @@ /* This file is part of GNU Taler - (C) 2022-2024 Taler Systems S.A. + (C) 2022-2025 Taler Systems S.A. GNU Taler is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -20,9 +20,6 @@ * paying for an invoice the wallet received from another wallet. */ -/** - * Imports. - */ import { AcceptPeerPullPaymentResponse, Amounts, @@ -88,7 +85,7 @@ import { RefreshOperationStatus, WalletDbAllStoresReadOnlyTransaction, WalletDbReadWriteTransaction, - WalletStoresV1, + WalletDbStoresArr, timestampPreciseFromDb, timestampPreciseToDb, } from "./db.js"; @@ -98,7 +95,6 @@ import { getTotalPeerPaymentCost, queryCoinInfosForSelection, } from "./pay-peer-common.js"; -import { DbReadWriteTransaction, StoreNames } from "./query.js"; import { createRefreshGroup } from "./refresh.js"; import { BalanceEffect, @@ -106,6 +102,7 @@ import { isUnsuccessfulTransaction, notifyTransition, parseTransactionIdentifier, + TransitionInfo, } from "./transactions.js"; import { WalletExecutionContext } from "./wallet.js"; @@ -115,13 +112,13 @@ const logger = new Logger("pay-peer-pull-debit.ts"); * Common context for a peer-pull-debit transaction. */ export class PeerPullDebitTransactionContext implements TransactionContext { - wex: WalletExecutionContext; readonly transactionId: TransactionIdStr; readonly taskId: TaskIdStr; - peerPullDebitId: string; - constructor(wex: WalletExecutionContext, peerPullDebitId: string) { - this.wex = wex; + constructor( + public wex: WalletExecutionContext, + public peerPullDebitId: string + ) { this.transactionId = constructTransactionIdentifier({ tag: TransactionType.PeerPullDebit, peerPullDebitId, @@ -130,7 +127,6 @@ export class PeerPullDebitTransactionContext implements TransactionContext { tag: PendingTaskType.PeerPullDebit, peerPullDebitId, }); - this.peerPullDebitId = peerPullDebitId; } async updateTransactionMeta( @@ -197,12 +193,8 @@ export class PeerPullDebitTransactionContext implements TransactionContext { async deleteTransaction(): Promise<void> { const res = await this.wex.db.runReadWriteTx( - { - storeNames: ["peerPullDebit", "tombstones", "transactionsMeta"], - }, - async (tx) => { - return this.deleteTransactionInTx(tx); - }, + { storeNames: ["peerPullDebit", "tombstones", "transactionsMeta"] }, + async (tx) => this.deleteTransactionInTx(tx) ); for (const notif of res.notifs) { this.wex.ws.notify(notif); @@ -221,7 +213,7 @@ export class PeerPullDebitTransactionContext implements TransactionContext { } const oldTxState = computePeerPullDebitTransactionState(rec); await tx.peerPullDebit.delete(rec.peerPullDebitId); - await this.updateTransactionMeta(tx); + await tx.transactionsMeta.delete(this.transactionId); notifs.push({ type: NotificationType.TransactionStateTransition, transactionId: this.transactionId, @@ -234,69 +226,36 @@ export class PeerPullDebitTransactionContext implements TransactionContext { } async suspendTransaction(): Promise<void> { - const taskId = this.taskId; - const transactionId = this.transactionId; - const wex = this.wex; - const peerPullDebitId = this.peerPullDebitId; - const transitionInfo = await wex.db.runReadWriteTx( - { storeNames: ["peerPullDebit", "transactionsMeta"] }, - async (tx) => { - const pullDebitRec = await tx.peerPullDebit.get(peerPullDebitId); - if (!pullDebitRec) { - logger.warn(`peer pull debit ${peerPullDebitId} not found`); - return; - } - let newStatus: PeerPullDebitRecordStatus | undefined = undefined; - switch (pullDebitRec.status) { - case PeerPullDebitRecordStatus.DialogProposed: - break; - case PeerPullDebitRecordStatus.Done: - break; - case PeerPullDebitRecordStatus.PendingDeposit: - newStatus = PeerPullDebitRecordStatus.SuspendedDeposit; - break; - case PeerPullDebitRecordStatus.SuspendedDeposit: - break; - case PeerPullDebitRecordStatus.Aborted: - break; - case PeerPullDebitRecordStatus.AbortingRefresh: - newStatus = PeerPullDebitRecordStatus.SuspendedAbortingRefresh; - break; - case PeerPullDebitRecordStatus.Failed: - break; - case PeerPullDebitRecordStatus.SuspendedAbortingRefresh: - break; - default: - assertUnreachable(pullDebitRec.status); - } - if (newStatus != null) { - const oldTxState = computePeerPullDebitTransactionState(pullDebitRec); - pullDebitRec.status = newStatus; - const newTxState = computePeerPullDebitTransactionState(pullDebitRec); - await tx.peerPullDebit.put(pullDebitRec); - await this.updateTransactionMeta(tx); - return { - oldTxState, - newTxState, - balanceEffect: BalanceEffect.None, - }; - } - return undefined; - }, - ); - notifyTransition(wex, transactionId, transitionInfo); - wex.taskScheduler.stopShepherdTask(taskId); + await this.transition({}, async (rec) => { + switch (rec.status) { + case PeerPullDebitRecordStatus.DialogProposed: + case PeerPullDebitRecordStatus.Done: + case PeerPullDebitRecordStatus.SuspendedDeposit: + case PeerPullDebitRecordStatus.Aborted: + case PeerPullDebitRecordStatus.Failed: + case PeerPullDebitRecordStatus.SuspendedAbortingRefresh: + return TransitionResultType.Stay; + case PeerPullDebitRecordStatus.PendingDeposit: + rec.status = PeerPullDebitRecordStatus.SuspendedDeposit; + return TransitionResultType.Transition; + case PeerPullDebitRecordStatus.AbortingRefresh: + rec.status = PeerPullDebitRecordStatus.SuspendedAbortingRefresh; + return TransitionResultType.Transition; + default: + assertUnreachable(rec.status); + } + }) + this.wex.taskScheduler.stopShepherdTask(this.taskId); } async resumeTransaction(): Promise<void> { - const ctx = this; - await ctx.transition(async (pi) => { - switch (pi.status) { + await this.transition({}, async (rec) => { + switch (rec.status) { case PeerPullDebitRecordStatus.SuspendedDeposit: - pi.status = PeerPullDebitRecordStatus.PendingDeposit; + rec.status = PeerPullDebitRecordStatus.PendingDeposit; return TransitionResultType.Transition; case PeerPullDebitRecordStatus.SuspendedAbortingRefresh: - pi.status = PeerPullDebitRecordStatus.AbortingRefresh; + rec.status = PeerPullDebitRecordStatus.AbortingRefresh; return TransitionResultType.Transition; case PeerPullDebitRecordStatus.Aborted: case PeerPullDebitRecordStatus.AbortingRefresh: @@ -311,16 +270,15 @@ export class PeerPullDebitTransactionContext implements TransactionContext { } async failTransaction(reason?: TalerErrorDetail): Promise<void> { - const ctx = this; - await ctx.transition(async (pi) => { - switch (pi.status) { + await this.transition({}, async (rec) => { + switch (rec.status) { case PeerPullDebitRecordStatus.SuspendedDeposit: case PeerPullDebitRecordStatus.PendingDeposit: case PeerPullDebitRecordStatus.AbortingRefresh: case PeerPullDebitRecordStatus.SuspendedAbortingRefresh: // FIXME: Should we also abort the corresponding refresh session?! - pi.status = PeerPullDebitRecordStatus.Failed; - pi.failReason = reason; + rec.status = PeerPullDebitRecordStatus.Failed; + rec.failReason = reason; return TransitionResultType.Transition; default: return TransitionResultType.Stay; @@ -330,8 +288,7 @@ export class PeerPullDebitTransactionContext implements TransactionContext { } async abortTransaction(reason?: TalerErrorDetail): Promise<void> { - const ctx = this; - await ctx.transitionExtra( + await this.transition( { extraStores: [ "coinAvailability", @@ -366,7 +323,7 @@ export class PeerPullDebitTransactionContext implements TransactionContext { } const refresh = await createRefreshGroup( - ctx.wex, + this.wex, tx, currency, coinPubs, @@ -380,47 +337,56 @@ export class PeerPullDebitTransactionContext implements TransactionContext { return TransitionResultType.Transition; }, ); + this.wex.taskScheduler.stopShepherdTask(this.taskId); + this.wex.taskScheduler.startShepherdTask(this.taskId); } - async transition( - f: (rec: PeerPullPaymentIncomingRecord) => Promise<TransitionResultType>, - ): Promise<void> { - return this.transitionExtra( - { - extraStores: [], - }, - f, - ); - } - - async transitionExtra< - StoreNameArray extends Array<StoreNames<typeof WalletStoresV1>> = [], - >( - opts: { extraStores: StoreNameArray }, - f: ( + /** + * Transition an existing peer-pull-debit transaction. + * Extra object stores may be accessed during the transition. + */ + async transition<StoreNameArray extends WalletDbStoresArr>( + opts: { extraStores?: StoreNameArray }, + lambda: ( rec: PeerPullPaymentIncomingRecord, - tx: DbReadWriteTransaction< - typeof WalletStoresV1, - ["peerPullDebit", "transactionsMeta", ...StoreNameArray] + tx: WalletDbReadWriteTransaction< + [ + "peerPullDebit", + "transactionsMeta", + ...StoreNameArray, + ] >, ) => Promise<TransitionResultType>, - ): Promise<void> { - const wex = this.wex; - const extraStores = opts.extraStores ?? []; - const transitionInfo = await wex.db.runReadWriteTx( - { storeNames: ["peerPullDebit", "transactionsMeta", ...extraStores] }, + ): Promise<TransitionInfo | undefined> { + const baseStores = [ + "peerPullDebit" as const, + "transactionsMeta" as const + ]; + const stores = opts.extraStores + ? [...baseStores, ...opts.extraStores] + : baseStores; + + const transitionInfo = await this.wex.db.runReadWriteTx( + { storeNames: stores }, async (tx) => { - const pi = await tx.peerPullDebit.get(this.peerPullDebitId); - if (!pi) { - throw Error("peer pull payment not found anymore"); + const rec = await tx.peerPullDebit.get(this.peerPullDebitId); + if (!rec) { + logger.warn(`peer pull payment ${this.peerPullDebitId} not found`); + return; } - const oldTxState = computePeerPullDebitTransactionState(pi); - const res = await f(pi, tx); + const oldTxState = computePeerPullDebitTransactionState(rec); + const res = await lambda(rec, tx); switch (res) { case TransitionResultType.Transition: { - await tx.peerPullDebit.put(pi); - await this.updateTransactionMeta(tx); - const newTxState = computePeerPullDebitTransactionState(pi); + await tx.peerPullDebit.put(rec); + await tx.transactionsMeta.put({ + transactionId: this.transactionId, + status: rec.status, + timestamp: rec.timestampCreated, + currency: Amounts.currencyOf(rec.amount), + exchanges: [rec.exchangeBaseUrl], + }); + const newTxState = computePeerPullDebitTransactionState(rec); return { oldTxState, newTxState, @@ -429,7 +395,7 @@ export class PeerPullDebitTransactionContext implements TransactionContext { } case TransitionResultType.Delete: { await tx.peerPullDebit.delete(this.peerPullDebitId); - await this.updateTransactionMeta(tx); + await tx.transactionsMeta.delete(this.transactionId); return { oldTxState, newTxState: { @@ -438,14 +404,30 @@ export class PeerPullDebitTransactionContext implements TransactionContext { balanceEffect: BalanceEffect.Any, }; } - default: - return undefined; + case TransitionResultType.Stay: + return; } }, ); - wex.taskScheduler.stopShepherdTask(this.taskId); - notifyTransition(wex, this.transactionId, transitionInfo); - wex.taskScheduler.startShepherdTask(this.taskId); + notifyTransition(this.wex, this.transactionId, transitionInfo); + return transitionInfo; + } + + /** + * Transition an existing peer-pull-debit transaction status + */ + async transitionStatus( + from: PeerPullDebitRecordStatus, + to: PeerPullDebitRecordStatus + ) { + await this.transition({}, async (rec) => { + if (rec.status !== from) { + return TransitionResultType.Stay + } else { + rec.status = to; + return TransitionResultType.Transition + } + }); } } @@ -514,31 +496,22 @@ async function handlePurseCreationConflict( coinSelRes.result.coins, ); - await ws.db.runReadWriteTx( - { storeNames: ["peerPullDebit", "transactionsMeta"] }, - async (tx) => { - const myPpi = await tx.peerPullDebit.get(peerPullInc.peerPullDebitId); - if (!myPpi) { - return; - } - switch (myPpi.status) { - case PeerPullDebitRecordStatus.PendingDeposit: - case PeerPullDebitRecordStatus.SuspendedDeposit: { - const sel = coinSelRes.result; - myPpi.coinSel = { - coinPubs: sel.coins.map((x) => x.coinPub), - contributions: sel.coins.map((x) => x.contribution), - totalCost: Amounts.stringify(totalAmount), - }; - break; - } - default: - return; + await ctx.transition({}, async (rec) => { + switch (rec.status) { + case PeerPullDebitRecordStatus.PendingDeposit: + case PeerPullDebitRecordStatus.SuspendedDeposit: { + const sel = coinSelRes.result; + rec.coinSel = { + coinPubs: sel.coins.map((x) => x.coinPub), + contributions: sel.coins.map((x) => x.contribution), + totalCost: Amounts.stringify(totalAmount), + }; + return TransitionResultType.Transition } - await tx.peerPullDebit.put(myPpi); - await ctx.updateTransactionMeta(tx); - }, - ); + default: + return TransitionResultType.Stay + } + }); return TaskRunResult.backoff(); } @@ -584,35 +557,25 @@ async function processPeerPullDebitPendingDeposit( assertUnreachable(coinSelRes); } - const peerPullDebitId = peerPullInc.peerPullDebitId; const totalAmount = await getTotalPeerPaymentCost(wex, coins); // FIXME: Missing notification here! - - const transitionDone = await wex.db.runReadWriteTx( + let transitionDone = false; + await ctx.transition( { - storeNames: [ + extraStores: [ "coinAvailability", "coinHistory", "coins", "denominations", "exchanges", - "peerPullDebit", "refreshGroups", "refreshSessions", - "transactionsMeta", ], }, - async (tx) => { - const pi = await tx.peerPullDebit.get(peerPullDebitId); - if (!pi) { - return false; - } - if (pi.status !== PeerPullDebitRecordStatus.PendingDeposit) { - return false; - } - if (pi.coinSel) { - return false; + async (rec, tx) => { + if (rec.status !== PeerPullDebitRecordStatus.PendingDeposit || rec.coinSel == null) { + return TransitionResultType.Stay; } await spendCoins(wex, tx, { transactionId: ctx.transactionId, @@ -622,14 +585,13 @@ async function processPeerPullDebitPendingDeposit( ), refreshReason: RefreshReason.PayPeerPull, }); - pi.coinSel = { + rec.coinSel = { coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), contributions: coinSelRes.result.coins.map((x) => x.contribution), totalCost: Amounts.stringify(totalAmount), }; - await tx.peerPullDebit.put(pi); - await ctx.updateTransactionMeta(tx); - return true; + transitionDone = true; + return TransitionResultType.Transition; }, ); if (transitionDone) { @@ -711,14 +673,7 @@ async function processPeerPullDebitPendingDeposit( } // All batches succeeded, we can transition! - - await ctx.transition(async (r) => { - if (r.status !== PeerPullDebitRecordStatus.PendingDeposit) { - return TransitionResultType.Stay; - } - r.status = PeerPullDebitRecordStatus.Done; - return TransitionResultType.Transition; - }); + await ctx.transitionStatus(PeerPullDebitRecordStatus.PendingDeposit, PeerPullDebitRecordStatus.Done); return TaskRunResult.finished(); } @@ -730,41 +685,28 @@ async function processPeerPullDebitAbortingRefresh( const abortRefreshGroupId = peerPullInc.abortRefreshGroupId; checkLogicInvariant(!!abortRefreshGroupId); const ctx = new PeerPullDebitTransactionContext(wex, peerPullDebitId); - const transitionInfo = await wex.db.runReadWriteTx( - { storeNames: ["peerPullDebit", "refreshGroups", "transactionsMeta"] }, - async (tx) => { - const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId); - let newOpState: PeerPullDebitRecordStatus | undefined; - if (!refreshGroup) { - // Maybe it got manually deleted? Means that we should - // just go into failed. - logger.warn("no aborting refresh group found for deposit group"); - newOpState = PeerPullDebitRecordStatus.Failed; - } else { - if (refreshGroup.operationStatus === RefreshOperationStatus.Finished) { - newOpState = PeerPullDebitRecordStatus.Aborted; - } else if ( - refreshGroup.operationStatus === RefreshOperationStatus.Failed - ) { - newOpState = PeerPullDebitRecordStatus.Failed; - } - } - if (newOpState) { - const newDg = await tx.peerPullDebit.get(peerPullDebitId); - if (!newDg) { - return; + await ctx.transition({ extraStores: ["refreshGroups"] }, async (rec, tx) => { + const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId); + if (refreshGroup == null) { + // Maybe it got manually deleted? Means that we should + // just go into failed. + logger.warn("no aborting refresh group found for deposit group"); + rec.status = PeerPullDebitRecordStatus.Failed; + return TransitionResultType.Transition + } else { + switch (refreshGroup.operationStatus) { + case RefreshOperationStatus.Finished: + rec.status = PeerPullDebitRecordStatus.Aborted; + return TransitionResultType.Transition + case RefreshOperationStatus.Failed: { + rec.status = PeerPullDebitRecordStatus.Failed; + return TransitionResultType.Transition } - const oldTxState = computePeerPullDebitTransactionState(newDg); - newDg.status = newOpState; - const newTxState = computePeerPullDebitTransactionState(newDg); - await tx.peerPullDebit.put(newDg); - await ctx.updateTransactionMeta(tx); - return { oldTxState, newTxState, balanceEffect: BalanceEffect.Any }; + default: + return TransitionResultType.Stay } - return undefined; - }, - ); - notifyTransition(wex, ctx.transactionId, transitionInfo); + } + }) // FIXME: Shouldn't this be finished in some cases?! return TaskRunResult.backoff(); } @@ -779,9 +721,7 @@ export async function processPeerPullDebit( const peerPullInc = await wex.db.runReadOnlyTx( { storeNames: ["peerPullDebit"] }, - async (tx) => { - return tx.peerPullDebit.get(peerPullDebitId); - }, + async (tx) => tx.peerPullDebit.get(peerPullDebitId) ); if (!peerPullInc) { throw Error("peer pull debit not found"); @@ -800,28 +740,23 @@ export async function confirmPeerPullDebit( wex: WalletExecutionContext, req: ConfirmPeerPullDebitRequest, ): Promise<AcceptPeerPullPaymentResponse> { - let peerPullDebitId: string; - const parsedTx = parseTransactionIdentifier(req.transactionId); - if (!parsedTx || parsedTx.tag !== TransactionType.PeerPullDebit) { + const parsed = parseTransactionIdentifier(req.transactionId); + if (!parsed || parsed.tag !== TransactionType.PeerPullDebit) { throw Error("invalid peer-pull-debit transaction identifier"); } - peerPullDebitId = parsedTx.peerPullDebitId; const peerPullInc = await wex.db.runReadOnlyTx( { storeNames: ["peerPullDebit"] }, - async (tx) => { - return tx.peerPullDebit.get(peerPullDebitId); - }, + async (tx) => tx.peerPullDebit.get(parsed.peerPullDebitId) ); - if (!peerPullInc) { + if (peerPullInc == null) { throw Error( `can't accept unknown incoming p2p pull payment (${req.transactionId})`, ); } - const ctx = new PeerPullDebitTransactionContext(wex, peerPullDebitId); - const transactionId = ctx.transactionId; + const ctx = new PeerPullDebitTransactionContext(wex, parsed.peerPullDebitId); const instructedAmount = Amounts.parseOrThrow(peerPullInc.amount); @@ -854,58 +789,42 @@ export async function confirmPeerPullDebit( const totalAmount = await getTotalPeerPaymentCost(wex, coins); - const transitionInfo = await wex.db.runReadWriteTx( - { - storeNames: [ - "coinAvailability", - "coinHistory", - "coins", - "denominations", - "exchanges", - "peerPullDebit", - "refreshGroups", - "refreshSessions", - "transactionsMeta", - ], - }, - async (tx) => { - const pi = await tx.peerPullDebit.get(peerPullDebitId); - if (!pi) { - throw Error(); - } - if (pi.status !== PeerPullDebitRecordStatus.DialogProposed) { - return; - } - const oldTxState = computePeerPullDebitTransactionState(pi); - if (coinSelRes.type == "success") { - await spendCoins(wex, tx, { - transactionId, - coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), - contributions: coinSelRes.result.coins.map((x) => - Amounts.parseOrThrow(x.contribution), - ), - refreshReason: RefreshReason.PayPeerPull, - }); - pi.coinSel = { - coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), - contributions: coinSelRes.result.coins.map((x) => x.contribution), - totalCost: Amounts.stringify(totalAmount), - }; - } - pi.status = PeerPullDebitRecordStatus.PendingDeposit; - await ctx.updateTransactionMeta(tx); - await tx.peerPullDebit.put(pi); - const newTxState = computePeerPullDebitTransactionState(pi); - return { oldTxState, newTxState, balanceEffect: BalanceEffect.Any }; - }, - ); - - notifyTransition(wex, transactionId, transitionInfo); - + await ctx.transition({ + extraStores: [ + "coinAvailability", + "coinHistory", + "coins", + "denominations", + "exchanges", + "refreshGroups", + "refreshSessions", + ] + }, async (rec, tx) => { + if (rec.status !== PeerPullDebitRecordStatus.DialogProposed) { + return TransitionResultType.Stay; + } + if (coinSelRes.type == "success") { + await spendCoins(wex, tx, { + transactionId: ctx.transactionId, + coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), + contributions: coinSelRes.result.coins.map((x) => + Amounts.parseOrThrow(x.contribution), + ), + refreshReason: RefreshReason.PayPeerPull, + }); + rec.coinSel = { + coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), + contributions: coinSelRes.result.coins.map((x) => x.contribution), + totalCost: Amounts.stringify(totalAmount), + }; + } + rec.status = PeerPullDebitRecordStatus.PendingDeposit; + return TransitionResultType.Transition + }) wex.taskScheduler.startShepherdTask(ctx.taskId); return { - transactionId, + transactionId: ctx.transactionId, }; } @@ -1059,7 +978,6 @@ export async function preparePeerPullDebit( const currency = Amounts.currencyOf(totalAmount); const ctx = new PeerPullDebitTransactionContext(wex, peerPullDebitId); - await wex.db.runReadWriteTx( { storeNames: ["peerPullDebit", "contractTerms", "transactionsMeta"] }, async (tx) => {