commit 1db9018c79b14e8b605526cdd13775bcd2345801
parent 296c0e2afd2cb4b5c7fc3c076976daee55965d9e
Author: Antoine A <>
Date: Tue, 15 Apr 2025 18:05:18 +0200
wallet-core: clean pay-peer-push-debit
Diffstat:
1 file changed, 315 insertions(+), 417 deletions(-)
diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/pay-peer-push-debit.ts
@@ -1,6 +1,6 @@
/*
This file is part of GNU Taler
- (C) 2022-2023 Taler Systems S.A.
+ (C) 2022-2025 Taler Systems S.A.
GNU Taler is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
@@ -68,6 +68,7 @@ import {
TaskRunResult,
TaskRunResultType,
TransactionContext,
+ TransitionResultType,
constructTaskIdentifier,
runWithClientCancellation,
spendCoins,
@@ -78,13 +79,13 @@ import {
PeerPushDebitStatus,
WalletDbAllStoresReadOnlyTransaction,
WalletDbReadWriteTransaction,
+ WalletDbStoresArr,
timestampPreciseFromDb,
timestampPreciseToDb,
timestampProtocolFromDb,
timestampProtocolToDb,
} from "./db.js";
import {
- fetchFreshExchange,
getPreferredExchangeForCurrency,
getScopeForAllExchanges,
} from "./exchanges.js";
@@ -100,6 +101,7 @@ import {
constructTransactionIdentifier,
isUnsuccessfulTransaction,
notifyTransition,
+ TransitionInfo,
} from "./transactions.js";
import { WalletExecutionContext } from "./wallet.js";
import { updateWithdrawalDenomsForCurrency } from "./withdraw.js";
@@ -124,21 +126,114 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
});
}
+ /**
+ * Transition an existing peer-pull-debit transaction.
+ * Extra object stores may be accessed during the transition.
+ */
+ async transition<StoreNameArray extends WalletDbStoresArr>(
+ opts: { extraStores?: StoreNameArray },
+ lambda: (
+ rec: PeerPushDebitRecord,
+ tx: WalletDbReadWriteTransaction<
+ [
+ "peerPushDebit",
+ "transactionsMeta",
+ ...StoreNameArray,
+ ]
+ >,
+ ) => Promise<TransitionResultType>,
+ ): Promise<TransitionInfo | undefined> {
+ const baseStores = [
+ "peerPushDebit" as const,
+ "transactionsMeta" as const
+ ];
+ const stores = opts.extraStores
+ ? [...baseStores, ...opts.extraStores]
+ : baseStores;
+
+ let errorThrown: Error | undefined;
+ const transitionInfo = await this.wex.db.runReadWriteTx(
+ { storeNames: stores },
+ async (tx) => {
+ const rec = await tx.peerPushDebit.get(this.pursePub);
+ if (rec == null) {
+ logger.warn(`peer pull debit ${this.pursePub} not found`);
+ return;
+ }
+ let oldTxState = computePeerPushDebitTransactionState(rec);
+ let res: TransitionResultType;
+ try {
+ res = await lambda(rec, tx);
+ } catch (error) {
+ if (error instanceof Error) {
+ errorThrown = error;
+ }
+ return undefined;
+ }
+
+ switch (res) {
+ case TransitionResultType.Transition: {
+ await tx.peerPushDebit.put(rec)
+ await tx.transactionsMeta.put({
+ transactionId: this.transactionId,
+ status: rec.status,
+ timestamp: rec.timestampCreated,
+ currency: Amounts.currencyOf(rec.amount),
+ exchanges: [rec.exchangeBaseUrl],
+ })
+ const newTxState = computePeerPushDebitTransactionState(rec);
+ return {
+ oldTxState,
+ newTxState,
+ balanceEffect: BalanceEffect.Any,
+ };
+ }
+ case TransitionResultType.Delete:
+ throw new Error("Cannot delete using transition");
+ case TransitionResultType.Stay:
+ return;
+ }
+ },
+ );
+ if (errorThrown != null) {
+ throw errorThrown;
+ }
+ notifyTransition(this.wex, this.transactionId, transitionInfo);
+ return transitionInfo;
+ }
+
+ /**
+ * Transition an existing peer-push-debit transaction status
+ */
+ async transitionStatus(
+ from: PeerPushDebitStatus,
+ to: PeerPushDebitStatus
+ ) {
+ await this.transition({}, async (rec) => {
+ if (rec.status !== from) {
+ return TransitionResultType.Stay
+ } else {
+ rec.status = to;
+ return TransitionResultType.Transition
+ }
+ });
+ }
+
async updateTransactionMeta(
tx: WalletDbReadWriteTransaction<["peerPushDebit", "transactionsMeta"]>,
): Promise<void> {
- const ppdRec = await tx.peerPushDebit.get(this.pursePub);
- if (!ppdRec) {
+ const rec = await tx.peerPushDebit.get(this.pursePub);
+ if (rec == null) {
await tx.transactionsMeta.delete(this.transactionId);
- return;
+ } else {
+ await tx.transactionsMeta.put({
+ transactionId: this.transactionId,
+ status: rec.status,
+ timestamp: rec.timestampCreated,
+ currency: Amounts.currencyOf(rec.amount),
+ exchanges: [rec.exchangeBaseUrl],
+ });
}
- await tx.transactionsMeta.put({
- transactionId: this.transactionId,
- status: ppdRec.status,
- timestamp: ppdRec.timestampCreated,
- currency: Amounts.currencyOf(ppdRec.amount),
- exchanges: [ppdRec.exchangeBaseUrl],
- });
}
/**
@@ -151,7 +246,7 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
tx: WalletDbAllStoresReadOnlyTransaction,
): Promise<Transaction | undefined> {
const pushDebitRec = await tx.peerPushDebit.get(this.pursePub);
- if (!pushDebitRec) {
+ if (pushDebitRec == null) {
return undefined;
}
const retryRec = await tx.operationRetries.get(this.taskId);
@@ -202,12 +297,8 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
async deleteTransaction(): Promise<void> {
const res = await this.wex.db.runReadWriteTx(
- {
- storeNames: ["peerPushDebit", "tombstones", "transactionsMeta"],
- },
- async (tx) => {
- return this.deleteTransactionInTx(tx);
- },
+ { storeNames: ["peerPushDebit", "tombstones", "transactionsMeta"] },
+ async (tx) => this.deleteTransactionInTx(tx)
);
for (const notif of res.notifs) {
this.wex.ws.notify(notif);
@@ -239,208 +330,115 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
}
async suspendTransaction(): Promise<void> {
- const { wex, pursePub, transactionId, taskId: retryTag } = this;
- const transitionInfo = await wex.db.runReadWriteTx(
- { storeNames: ["peerPushDebit", "transactionsMeta"] },
- async (tx) => {
- const pushDebitRec = await tx.peerPushDebit.get(pursePub);
- if (!pushDebitRec) {
- logger.warn(`peer push debit ${pursePub} not found`);
- return;
- }
- let newStatus: PeerPushDebitStatus | undefined = undefined;
- switch (pushDebitRec.status) {
- case PeerPushDebitStatus.PendingCreatePurse:
- newStatus = PeerPushDebitStatus.SuspendedCreatePurse;
- break;
- case PeerPushDebitStatus.AbortingDeletePurse:
- newStatus = PeerPushDebitStatus.SuspendedAbortingDeletePurse;
- break;
- case PeerPushDebitStatus.PendingReady:
- newStatus = PeerPushDebitStatus.SuspendedReady;
- break;
- case PeerPushDebitStatus.SuspendedAbortingDeletePurse:
- case PeerPushDebitStatus.SuspendedReady:
- case PeerPushDebitStatus.SuspendedCreatePurse:
- case PeerPushDebitStatus.Done:
- case PeerPushDebitStatus.Aborted:
- case PeerPushDebitStatus.Failed:
- case PeerPushDebitStatus.Expired:
- // Do nothing
- break;
- default:
- assertUnreachable(pushDebitRec.status);
- }
- if (newStatus != null) {
- const oldTxState = computePeerPushDebitTransactionState(pushDebitRec);
- pushDebitRec.status = newStatus;
- const newTxState = computePeerPushDebitTransactionState(pushDebitRec);
- await tx.peerPushDebit.put(pushDebitRec);
- await this.updateTransactionMeta(tx);
- return {
- oldTxState,
- newTxState,
- balanceEffect: BalanceEffect.None,
- };
- }
- return undefined;
- },
- );
- wex.taskScheduler.stopShepherdTask(retryTag);
- notifyTransition(wex, transactionId, transitionInfo);
+ this.transition({}, async (rec) => {
+ switch (rec.status) {
+ case PeerPushDebitStatus.PendingCreatePurse:
+ rec.status = PeerPushDebitStatus.SuspendedCreatePurse;
+ return TransitionResultType.Transition;
+ case PeerPushDebitStatus.AbortingDeletePurse:
+ rec.status = PeerPushDebitStatus.SuspendedAbortingDeletePurse;
+ return TransitionResultType.Transition;
+ case PeerPushDebitStatus.PendingReady:
+ rec.status = PeerPushDebitStatus.SuspendedReady;
+ return TransitionResultType.Transition;
+ case PeerPushDebitStatus.SuspendedAbortingDeletePurse:
+ case PeerPushDebitStatus.SuspendedReady:
+ case PeerPushDebitStatus.SuspendedCreatePurse:
+ case PeerPushDebitStatus.Done:
+ case PeerPushDebitStatus.Aborted:
+ case PeerPushDebitStatus.Failed:
+ case PeerPushDebitStatus.Expired:
+ // Do nothing
+ return TransitionResultType.Stay;
+ default:
+ assertUnreachable(rec.status);
+ }
+ })
+ this.wex.taskScheduler.stopShepherdTask(this.taskId)
}
async abortTransaction(reason?: TalerErrorDetail): Promise<void> {
- const { wex, pursePub, transactionId, taskId: retryTag } = this;
- const transitionInfo = await wex.db.runReadWriteTx(
- { storeNames: ["peerPushDebit", "transactionsMeta"] },
- async (tx) => {
- const pushDebitRec = await tx.peerPushDebit.get(pursePub);
- if (!pushDebitRec) {
- logger.warn(`peer push debit ${pursePub} not found`);
- return;
- }
- let newStatus: PeerPushDebitStatus | undefined = undefined;
- switch (pushDebitRec.status) {
- case PeerPushDebitStatus.PendingReady:
- case PeerPushDebitStatus.SuspendedReady:
- pushDebitRec.abortReason = reason;
- newStatus = PeerPushDebitStatus.AbortingDeletePurse;
- break;
- case PeerPushDebitStatus.SuspendedCreatePurse:
- case PeerPushDebitStatus.PendingCreatePurse:
- // Network request might already be in-flight!
- pushDebitRec.abortReason = reason;
- newStatus = PeerPushDebitStatus.AbortingDeletePurse;
- break;
- case PeerPushDebitStatus.SuspendedAbortingDeletePurse:
- case PeerPushDebitStatus.Done:
- case PeerPushDebitStatus.AbortingDeletePurse:
- case PeerPushDebitStatus.Aborted:
- case PeerPushDebitStatus.Expired:
- case PeerPushDebitStatus.Failed:
- // Do nothing
- break;
- default:
- assertUnreachable(pushDebitRec.status);
- }
- if (newStatus != null) {
- const oldTxState = computePeerPushDebitTransactionState(pushDebitRec);
- pushDebitRec.status = newStatus;
- const newTxState = computePeerPushDebitTransactionState(pushDebitRec);
- await tx.peerPushDebit.put(pushDebitRec);
- await this.updateTransactionMeta(tx);
- return {
- oldTxState,
- newTxState,
- balanceEffect: BalanceEffect.Any,
- };
- }
- return undefined;
- },
- );
- wex.taskScheduler.stopShepherdTask(retryTag);
- notifyTransition(wex, transactionId, transitionInfo);
- wex.taskScheduler.startShepherdTask(retryTag);
+ this.transition({}, async (rec) => {
+ switch (rec.status) {
+ case PeerPushDebitStatus.PendingReady:
+ case PeerPushDebitStatus.SuspendedReady:
+ rec.abortReason = reason;
+ rec.status = PeerPushDebitStatus.AbortingDeletePurse;
+ return TransitionResultType.Transition;
+ case PeerPushDebitStatus.SuspendedCreatePurse:
+ case PeerPushDebitStatus.PendingCreatePurse:
+ // Network request might already be in-flight!
+ rec.abortReason = reason;
+ rec.status = PeerPushDebitStatus.AbortingDeletePurse;
+ return TransitionResultType.Transition;
+ case PeerPushDebitStatus.SuspendedAbortingDeletePurse:
+ case PeerPushDebitStatus.Done:
+ case PeerPushDebitStatus.AbortingDeletePurse:
+ case PeerPushDebitStatus.Aborted:
+ case PeerPushDebitStatus.Expired:
+ case PeerPushDebitStatus.Failed:
+ // Do nothing
+ return TransitionResultType.Stay;
+ default:
+ assertUnreachable(rec.status);
+ }
+ })
+ this.wex.taskScheduler.stopShepherdTask(this.taskId);
+ this.wex.taskScheduler.startShepherdTask(this.taskId);
}
async resumeTransaction(): Promise<void> {
- const { wex, pursePub, transactionId, taskId: retryTag } = this;
- const transitionInfo = await wex.db.runReadWriteTx(
- { storeNames: ["peerPushDebit", "transactionsMeta"] },
- async (tx) => {
- const pushDebitRec = await tx.peerPushDebit.get(pursePub);
- if (!pushDebitRec) {
- logger.warn(`peer push debit ${pursePub} not found`);
- return;
- }
- let newStatus: PeerPushDebitStatus | undefined = undefined;
- switch (pushDebitRec.status) {
- case PeerPushDebitStatus.SuspendedAbortingDeletePurse:
- newStatus = PeerPushDebitStatus.AbortingDeletePurse;
- break;
- case PeerPushDebitStatus.SuspendedReady:
- newStatus = PeerPushDebitStatus.PendingReady;
- break;
- case PeerPushDebitStatus.SuspendedCreatePurse:
- newStatus = PeerPushDebitStatus.PendingCreatePurse;
- break;
- case PeerPushDebitStatus.PendingCreatePurse:
- case PeerPushDebitStatus.AbortingDeletePurse:
- case PeerPushDebitStatus.PendingReady:
- case PeerPushDebitStatus.Done:
- case PeerPushDebitStatus.Aborted:
- case PeerPushDebitStatus.Failed:
- case PeerPushDebitStatus.Expired:
- // Do nothing
- break;
- default:
- assertUnreachable(pushDebitRec.status);
- }
- if (newStatus != null) {
- const oldTxState = computePeerPushDebitTransactionState(pushDebitRec);
- pushDebitRec.status = newStatus;
- const newTxState = computePeerPushDebitTransactionState(pushDebitRec);
- await tx.peerPushDebit.put(pushDebitRec);
- await this.updateTransactionMeta(tx);
- return {
- oldTxState,
- newTxState,
- balanceEffect: BalanceEffect.None,
- };
- }
- return undefined;
- },
- );
- wex.taskScheduler.startShepherdTask(retryTag);
- notifyTransition(wex, transactionId, transitionInfo);
+ this.transition({}, async (rec) => {
+ switch (rec.status) {
+ case PeerPushDebitStatus.SuspendedAbortingDeletePurse:
+ rec.status = PeerPushDebitStatus.AbortingDeletePurse;
+ return TransitionResultType.Transition;
+ case PeerPushDebitStatus.SuspendedReady:
+ rec.status = PeerPushDebitStatus.PendingReady;
+ return TransitionResultType.Transition;
+ case PeerPushDebitStatus.SuspendedCreatePurse:
+ rec.status = PeerPushDebitStatus.PendingCreatePurse;
+ return TransitionResultType.Transition;
+ case PeerPushDebitStatus.PendingCreatePurse:
+ case PeerPushDebitStatus.AbortingDeletePurse:
+ case PeerPushDebitStatus.PendingReady:
+ case PeerPushDebitStatus.Done:
+ case PeerPushDebitStatus.Aborted:
+ case PeerPushDebitStatus.Failed:
+ case PeerPushDebitStatus.Expired:
+ // Do nothing
+ return TransitionResultType.Stay;
+ default:
+ assertUnreachable(rec.status);
+ }
+ })
+ this.wex.taskScheduler.startShepherdTask(this.taskId);
}
async failTransaction(reason?: TalerErrorDetail): Promise<void> {
- const { wex, pursePub, transactionId, taskId: retryTag } = this;
- const transitionInfo = await wex.db.runReadWriteTx(
- { storeNames: ["peerPushDebit", "transactionsMeta"] },
- async (tx) => {
- const pushDebitRec = await tx.peerPushDebit.get(pursePub);
- if (!pushDebitRec) {
- logger.warn(`peer push debit ${pursePub} not found`);
- return;
- }
- let newStatus: PeerPushDebitStatus;
- switch (pushDebitRec.status) {
- case PeerPushDebitStatus.AbortingDeletePurse:
- case PeerPushDebitStatus.SuspendedAbortingDeletePurse:
- case PeerPushDebitStatus.PendingReady:
- case PeerPushDebitStatus.SuspendedReady:
- case PeerPushDebitStatus.SuspendedCreatePurse:
- case PeerPushDebitStatus.PendingCreatePurse:
- newStatus = PeerPushDebitStatus.Failed;
- break;
- case PeerPushDebitStatus.Done:
- case PeerPushDebitStatus.Aborted:
- case PeerPushDebitStatus.Failed:
- case PeerPushDebitStatus.Expired:
- // Do nothing
- return undefined;
- default:
- assertUnreachable(pushDebitRec.status);
- }
- const oldTxState = computePeerPushDebitTransactionState(pushDebitRec);
- pushDebitRec.status = newStatus;
- pushDebitRec.failReason = reason;
- const newTxState = computePeerPushDebitTransactionState(pushDebitRec);
- await tx.peerPushDebit.put(pushDebitRec);
- await this.updateTransactionMeta(tx);
- return {
- oldTxState,
- newTxState,
- balanceEffect: BalanceEffect.Any,
- };
- },
- );
- wex.taskScheduler.stopShepherdTask(retryTag);
- notifyTransition(wex, transactionId, transitionInfo);
- wex.taskScheduler.startShepherdTask(retryTag);
+ this.transition({}, async (rec) => {
+ switch (rec.status) {
+ case PeerPushDebitStatus.AbortingDeletePurse:
+ case PeerPushDebitStatus.SuspendedAbortingDeletePurse:
+ case PeerPushDebitStatus.PendingReady:
+ case PeerPushDebitStatus.SuspendedReady:
+ case PeerPushDebitStatus.SuspendedCreatePurse:
+ case PeerPushDebitStatus.PendingCreatePurse:
+ rec.status = PeerPushDebitStatus.Failed;
+ rec.failReason = reason;
+ return TransitionResultType.Transition;
+ case PeerPushDebitStatus.Done:
+ case PeerPushDebitStatus.Aborted:
+ case PeerPushDebitStatus.Failed:
+ case PeerPushDebitStatus.Expired:
+ // Do nothing
+ return TransitionResultType.Stay;
+ default:
+ assertUnreachable(rec.status);
+ }
+ })
+ this.wex.taskScheduler.stopShepherdTask(this.taskId);
+ this.wex.taskScheduler.startShepherdTask(this.taskId);
}
}
@@ -609,30 +607,21 @@ async function handlePurseCreationConflict(
assertUnreachable(coinSelRes);
}
- await wex.db.runReadWriteTx(
- { storeNames: ["peerPushDebit", "transactionsMeta"] },
- async (tx) => {
- const myPpi = await tx.peerPushDebit.get(peerPushInitiation.pursePub);
- if (!myPpi) {
- return;
- }
- switch (myPpi.status) {
- case PeerPushDebitStatus.PendingCreatePurse:
- case PeerPushDebitStatus.SuspendedCreatePurse: {
- const sel = coinSelRes.result;
- myPpi.coinSel = {
- coinPubs: sel.coins.map((x) => x.coinPub),
- contributions: sel.coins.map((x) => x.contribution),
- };
- break;
- }
- default:
- return;
+ await ctx.transition({}, async (rec) => {
+ switch (rec.status) {
+ case PeerPushDebitStatus.PendingCreatePurse:
+ case PeerPushDebitStatus.SuspendedCreatePurse: {
+ const sel = coinSelRes.result;
+ rec.coinSel = {
+ coinPubs: sel.coins.map((x) => x.coinPub),
+ contributions: sel.coins.map((x) => x.contribution),
+ };
+ return TransitionResultType.Transition
}
- await tx.peerPushDebit.put(myPpi);
- await ctx.updateTransactionMeta(tx);
- },
- );
+ default:
+ return TransitionResultType.Stay
+ }
+ })
return TaskRunResult.progress();
}
@@ -640,24 +629,21 @@ async function processPeerPushDebitCreateReserve(
wex: WalletExecutionContext,
peerPushInitiation: PeerPushDebitRecord,
): Promise<TaskRunResult> {
- const pursePub = peerPushInitiation.pursePub;
- const purseExpiration = peerPushInitiation.purseExpiration;
- const hContractTerms = peerPushInitiation.contractTermsHash;
+ const { pursePub, purseExpiration, contractTermsHash } = peerPushInitiation;
const ctx = new PeerPushDebitTransactionContext(wex, pursePub);
- const transactionId = ctx.transactionId;
- logger.trace(`processing ${transactionId} pending(create-reserve)`);
+ logger.trace(`processing ${ctx.transactionId} pending(create-reserve)`);
const contractTermsRecord = await wex.db.runReadOnlyTx(
{ storeNames: ["contractTerms"] },
async (tx) => {
- return tx.contractTerms.get(hContractTerms);
+ return tx.contractTerms.get(contractTermsHash);
},
);
if (!contractTermsRecord) {
throw Error(
- `db invariant failed, contract terms for ${transactionId} missing`,
+ `db invariant failed, contract terms for ${ctx.transactionId} missing`,
);
}
@@ -683,31 +669,27 @@ async function processPeerPushDebitCreateReserve(
default:
assertUnreachable(coinSelRes);
}
- const transitionDone = await wex.db.runReadWriteTx(
+
+ let transitionDone = false
+ await ctx.transition(
{
- storeNames: [
+ extraStores: [
"coinAvailability",
"coinHistory",
"coins",
"contractTerms",
"denominations",
"exchanges",
- "peerPushDebit",
"refreshGroups",
"refreshSessions",
- "transactionsMeta",
],
},
- async (tx) => {
- const ppi = await tx.peerPushDebit.get(pursePub);
- if (!ppi) {
- return false;
- }
- if (ppi.coinSel) {
- return false;
+ async (rec, tx) => {
+ if (rec.coinSel != null) {
+ return TransitionResultType.Stay;
}
- ppi.coinSel = {
+ rec.coinSel = {
coinPubs: coinSelRes.result.coins.map((x) => x.coinPub),
contributions: coinSelRes.result.coins.map((x) => x.contribution),
};
@@ -723,21 +705,21 @@ async function processPeerPushDebitCreateReserve(
refreshReason: RefreshReason.PayPeerPush,
});
- await tx.peerPushDebit.put(ppi);
- await ctx.updateTransactionMeta(tx);
- return true;
+ transitionDone = true;
+ return TransitionResultType.Transition
},
);
if (transitionDone) {
return TaskRunResult.progress();
+ } else {
+ return TaskRunResult.backoff();
}
- return TaskRunResult.backoff();
}
const purseAmount = peerPushInitiation.amount;
const purseSigResp = await wex.cryptoApi.signPurseCreation({
- hContractTerms,
+ hContractTerms: contractTermsHash,
mergePub: peerPushInitiation.mergePub,
minAge: 0,
purseAmount,
@@ -791,7 +773,7 @@ async function processPeerPushDebitCreateReserve(
amount: purseAmount,
merge_pub: peerPushInitiation.mergePub,
purse_sig: purseSigResp.sig,
- h_contract_terms: hContractTerms,
+ h_contract_terms: contractTermsHash,
purse_expiration: timestampProtocolFromDb(purseExpiration),
deposits: depositSigsResp.deposits,
min_age: 0,
@@ -889,10 +871,7 @@ async function processPeerPushDebitCreateReserve(
logger.trace(`purse status: ${j2s(purseStatus)}`);
}
- await transitionPeerPushDebitTransaction(wex, pursePub, {
- stFrom: PeerPushDebitStatus.PendingCreatePurse,
- stTo: PeerPushDebitStatus.PendingReady,
- });
+ await ctx.transitionStatus(PeerPushDebitStatus.PendingCreatePurse, PeerPushDebitStatus.PendingReady)
return TaskRunResult.backoff();
}
@@ -920,105 +899,49 @@ async function processPeerPushDebitAbortingDeletePurse(
});
logger.info(`deleted purse with response status ${resp.status}`);
- const transitionInfo = await wex.db.runReadWriteTx(
- {
- storeNames: [
- "coinAvailability",
- "coinHistory",
- "coins",
- "denominations",
- "peerPushDebit",
- "refreshGroups",
- "refreshSessions",
- "transactionsMeta",
- ],
- },
- async (tx) => {
- const ppiRec = await tx.peerPushDebit.get(pursePub);
- if (!ppiRec) {
- return undefined;
- }
- if (ppiRec.status !== PeerPushDebitStatus.AbortingDeletePurse) {
- return undefined;
- }
- const currency = Amounts.currencyOf(ppiRec.amount);
- const oldTxState = computePeerPushDebitTransactionState(ppiRec);
- const coinPubs: CoinRefreshRequest[] = [];
-
- if (!ppiRec.coinSel) {
- return undefined;
- }
+ await ctx.transition({
+ extraStores: [
+ "coinAvailability",
+ "coinHistory",
+ "coins",
+ "denominations",
+ "refreshGroups",
+ "refreshSessions",
+ ]
+ }, async (rec, tx) => {
+ if (rec.status !== PeerPushDebitStatus.AbortingDeletePurse) {
+ return TransitionResultType.Stay;
+ }
+ const currency = Amounts.currencyOf(rec.amount);
+ const coinPubs: CoinRefreshRequest[] = [];
- for (let i = 0; i < ppiRec.coinSel.coinPubs.length; i++) {
- coinPubs.push({
- amount: ppiRec.coinSel.contributions[i],
- coinPub: ppiRec.coinSel.coinPubs[i],
- });
- }
+ if (!rec.coinSel) {
+ return TransitionResultType.Stay;
+ }
- ppiRec.status = PeerPushDebitStatus.Aborted;
+ for (let i = 0; i < rec.coinSel.coinPubs.length; i++) {
+ coinPubs.push({
+ amount: rec.coinSel.contributions[i],
+ coinPub: rec.coinSel.coinPubs[i],
+ });
+ }
+ rec.status = PeerPushDebitStatus.Aborted;
- const refresh = await createRefreshGroup(
- wex,
- tx,
- currency,
- coinPubs,
- RefreshReason.AbortPeerPushDebit,
- ctx.transactionId,
- );
- ppiRec.abortRefreshGroupId = refresh.refreshGroupId;
- await tx.peerPushDebit.put(ppiRec);
- await ctx.updateTransactionMeta(tx);
- const newTxState = computePeerPushDebitTransactionState(ppiRec);
- return {
- oldTxState,
- newTxState,
- balanceEffect: BalanceEffect.Any,
- };
- },
- );
- notifyTransition(wex, ctx.transactionId, transitionInfo);
+ const refresh = await createRefreshGroup(
+ wex,
+ tx,
+ currency,
+ coinPubs,
+ RefreshReason.AbortPeerPushDebit,
+ ctx.transactionId,
+ );
+ rec.abortRefreshGroupId = refresh.refreshGroupId;
+ return TransitionResultType.Transition
+ })
return TaskRunResult.backoff();
}
-interface SimpleTransition {
- stFrom: PeerPushDebitStatus;
- stTo: PeerPushDebitStatus;
-}
-
-// FIXME: This should be a transition on the peer push debit transaction context!
-async function transitionPeerPushDebitTransaction(
- wex: WalletExecutionContext,
- pursePub: string,
- transitionSpec: SimpleTransition,
-): Promise<void> {
- const ctx = new PeerPushDebitTransactionContext(wex, pursePub);
- const transitionInfo = await wex.db.runReadWriteTx(
- { storeNames: ["peerPushDebit", "transactionsMeta"] },
- async (tx) => {
- const ppiRec = await tx.peerPushDebit.get(pursePub);
- if (!ppiRec) {
- return undefined;
- }
- if (ppiRec.status !== transitionSpec.stFrom) {
- return undefined;
- }
- const oldTxState = computePeerPushDebitTransactionState(ppiRec);
- ppiRec.status = transitionSpec.stTo;
- await tx.peerPushDebit.put(ppiRec);
- await ctx.updateTransactionMeta(tx);
- const newTxState = computePeerPushDebitTransactionState(ppiRec);
- return {
- oldTxState,
- newTxState,
- balanceEffect: BalanceEffect.Any,
- };
- },
- );
- notifyTransition(wex, ctx.transactionId, transitionInfo);
-}
-
/**
* Process the "pending(ready)" state of a peer-push-debit transaction.
*/
@@ -1055,74 +978,49 @@ async function processPeerPushDebitReady(
if (!mergeTimestamp || TalerProtocolTimestamp.isNever(mergeTimestamp)) {
return TaskRunResult.backoff();
} else {
- await transitionPeerPushDebitTransaction(
- wex,
- peerPushInitiation.pursePub,
- {
- stFrom: PeerPushDebitStatus.PendingReady,
- stTo: PeerPushDebitStatus.Done,
- },
- );
+ await ctx.transitionStatus(PeerPushDebitStatus.PendingReady, PeerPushDebitStatus.Done);
return TaskRunResult.progress();
}
} else if (resp.status === HttpStatusCode.Gone) {
logger.info(`purse ${pursePub} is gone, aborting peer-push-debit`);
- const transitionInfo = await wex.db.runReadWriteTx(
- {
- storeNames: [
- "coinAvailability",
- "coinHistory",
- "coins",
- "denominations",
- "peerPushDebit",
- "refreshGroups",
- "refreshSessions",
- "transactionsMeta",
- ],
- },
- async (tx) => {
- const ppiRec = await tx.peerPushDebit.get(pursePub);
- if (!ppiRec) {
- return undefined;
- }
- if (ppiRec.status !== PeerPushDebitStatus.PendingReady) {
- return undefined;
+ await ctx.transition({
+ extraStores: [
+ "coinAvailability",
+ "coinHistory",
+ "coins",
+ "denominations",
+ "refreshGroups",
+ "refreshSessions",
+ ]
+ }, async (rec, tx) => {
+ if (rec.status !== PeerPushDebitStatus.PendingReady) {
+ return TransitionResultType.Stay;
+ }
+ const currency = Amounts.currencyOf(rec.amount);
+ const coinPubs: CoinRefreshRequest[] = [];
+
+ if (rec.coinSel) {
+ for (let i = 0; i < rec.coinSel.coinPubs.length; i++) {
+ coinPubs.push({
+ amount: rec.coinSel.contributions[i],
+ coinPub: rec.coinSel.coinPubs[i],
+ });
}
- const currency = Amounts.currencyOf(ppiRec.amount);
- const oldTxState = computePeerPushDebitTransactionState(ppiRec);
- const coinPubs: CoinRefreshRequest[] = [];
-
- if (ppiRec.coinSel) {
- for (let i = 0; i < ppiRec.coinSel.coinPubs.length; i++) {
- coinPubs.push({
- amount: ppiRec.coinSel.contributions[i],
- coinPub: ppiRec.coinSel.coinPubs[i],
- });
- }
- const refresh = await createRefreshGroup(
- wex,
- tx,
- currency,
- coinPubs,
- RefreshReason.AbortPeerPushDebit,
- ctx.transactionId,
- );
+ const refresh = await createRefreshGroup(
+ wex,
+ tx,
+ currency,
+ coinPubs,
+ RefreshReason.AbortPeerPushDebit,
+ ctx.transactionId,
+ );
- ppiRec.abortRefreshGroupId = refresh.refreshGroupId;
- }
- ppiRec.status = PeerPushDebitStatus.Aborted;
- await tx.peerPushDebit.put(ppiRec);
- await ctx.updateTransactionMeta(tx);
- const newTxState = computePeerPushDebitTransactionState(ppiRec);
- return {
- oldTxState,
- newTxState,
- balanceEffect: BalanceEffect.Any,
- };
- },
- );
- notifyTransition(wex, ctx.transactionId, transitionInfo);
+ rec.abortRefreshGroupId = refresh.refreshGroupId;
+ }
+ rec.status = PeerPushDebitStatus.Aborted;
+ return TransitionResultType.Transition
+ })
return TaskRunResult.backoff();
} else {
logger.warn(`unexpected HTTP status for purse: ${resp.status}`);