commit cc78da1ddfd8148d9be314dc7038a887c8974c5f
parent 89bc314de7aa7613c2f54a4c2b9ca3d13a5f1e78
Author: Florian Dold <florian@dold.me>
Date: Thu, 13 Nov 2025 12:24:41 +0100
refactor pay-merchant state transitions
Diffstat:
7 files changed, 306 insertions(+), 375 deletions(-)
diff --git a/packages/taler-wallet-core/src/common.ts b/packages/taler-wallet-core/src/common.ts
@@ -38,6 +38,8 @@ import {
TombstoneIdStr,
Transaction,
TransactionIdStr,
+ TransactionMajorState,
+ TransactionState,
WalletNotification,
assertUnreachable,
checkDbInvariant,
@@ -68,7 +70,7 @@ import {
} from "./db.js";
import { ReadyExchangeSummary } from "./exchanges.js";
import { createRefreshGroup } from "./refresh.js";
-import { BalanceEffect } from "./transactions.js";
+import { BalanceEffect, applyNotifyTransition } from "./transactions.js";
import { WalletExecutionContext, getDenomInfo } from "./wallet.js";
const logger = new Logger("operations/common.ts");
@@ -1087,3 +1089,52 @@ export async function cancelableFetch(
cancellationToken: wex.cancellationToken,
});
}
+
+export interface RecordHandle<T> {
+ rec: T | undefined;
+ update(newRec: T | undefined, eff?: BalanceEffect): Promise<void>;
+}
+
+/**
+ * Generic helper for transitioning transactions.
+ */
+export async function getGenericRecordHandle<T>(
+ ctx: TransactionContext,
+ tx: WalletDbReadWriteTransaction<any>,
+ getRec: () => Promise<T | undefined>,
+ getSt: (r: T) => TransactionState,
+ getStId: (r: T) => number,
+): Promise<[T | undefined, RecordHandle<T>]> {
+ const rec = await getRec();
+ let oldTxState: TransactionState;
+ let oldStId: number;
+ if (rec) {
+ oldTxState = getSt(rec);
+ oldStId = getStId(rec);
+ } else {
+ oldTxState = { major: TransactionMajorState.None };
+ oldStId = 0;
+ }
+ const update = async (newRec: T | undefined, eff?: BalanceEffect) => {
+ let newTxState: TransactionState;
+ let newStId: number;
+ if (newRec != null) {
+ newTxState = getSt(newRec);
+ newStId = getStId(newRec);
+ } else if (rec != null) {
+ newTxState = { major: TransactionMajorState.Deleted };
+ newStId = -1;
+ } else {
+ // Neither old or new record. Nothing to do.
+ return;
+ }
+ applyNotifyTransition(tx.notify, ctx.transactionId, {
+ oldTxState,
+ newTxState,
+ balanceEffect: eff ?? BalanceEffect.Any,
+ oldStId,
+ newStId,
+ });
+ };
+ return [rec, { rec, update }];
+}
diff --git a/packages/taler-wallet-core/src/exchanges.ts b/packages/taler-wallet-core/src/exchanges.ts
@@ -3102,9 +3102,8 @@ async function purgeExchange(
tx: WalletDbAllStoresReadWriteTransaction,
exchangeRec: ExchangeEntryRecord,
purgeTransactions?: boolean,
-): Promise<{ notifs: WalletNotification[] }> {
+): Promise<void> {
const exchangeBaseUrl = exchangeRec.baseUrl;
- const notifs: WalletNotification[] = [];
const detRecs = await tx.exchangeDetails.indexes.byExchangeBaseUrl.getAll();
// Remove all exchange detail records for that exchange
for (const r of detRecs) {
@@ -3126,7 +3125,7 @@ async function purgeExchange(
const oldExchangeState = getExchangeState(exchangeRec);
await tx.exchanges.delete(exchangeBaseUrl);
- notifs.push({
+ tx.notify({
type: NotificationType.ExchangeStateTransition,
oldExchangeState,
newExchangeState: undefined,
@@ -3172,8 +3171,7 @@ async function purgeExchange(
);
for (const wg of withdrawalGroupRecs) {
const ctx = new WithdrawTransactionContext(wex, wg.withdrawalGroupId);
- const res = await ctx.deleteTransactionInTx(tx);
- notifs.push(...res.notifs);
+ await ctx.deleteTransactionInTx(tx);
}
}
@@ -3183,8 +3181,7 @@ async function purgeExchange(
await tx.refreshGroups.iter().forEachAsync(async (rg) => {
if (rg.infoPerExchange && rg.infoPerExchange[exchangeBaseUrl] != null) {
const ctx = new RefreshTransactionContext(wex, rg.refreshGroupId);
- const res = await ctx.deleteTransactionInTx(tx);
- notifs.push(...res.notifs);
+ await ctx.deleteTransactionInTx(tx);
}
});
}
@@ -3194,8 +3191,7 @@ async function purgeExchange(
await tx.recoupGroups.indexes.byExchangeBaseUrl.getAll(exchangeBaseUrl);
for (const rg of recoupGroups) {
const ctx = new RecoupTransactionContext(wex, rg.recoupGroupId);
- const res = await ctx.deleteTransactionInTx(tx);
- notifs.push(...res.notifs);
+ await ctx.deleteTransactionInTx(tx);
}
}
// Remove from deposits
@@ -3204,8 +3200,7 @@ async function purgeExchange(
await tx.depositGroups.iter().forEachAsync(async (dg) => {
if (dg.infoPerExchange && dg.infoPerExchange[exchangeBaseUrl]) {
const ctx = new DepositTransactionContext(wex, dg.depositGroupId);
- const res = await ctx.deleteTransactionInTx(tx);
- notifs.push(...res.notifs);
+ await ctx.deleteTransactionInTx(tx);
}
});
}
@@ -3219,12 +3214,10 @@ async function purgeExchange(
);
for (const r of refunds) {
const refundCtx = new RefundTransactionContext(wex, r.refundGroupId);
- const res = await refundCtx.deleteTransactionInTx(tx);
- notifs.push(...res.notifs);
+ await refundCtx.deleteTransactionInTx(tx);
}
const payCtx = new PayMerchantTransactionContext(wex, purch.proposalId);
const res = await payCtx.deleteTransactionInTx(tx);
- notifs.push(...res.notifs);
}
}
// Remove from peerPullCredit
@@ -3232,8 +3225,7 @@ async function purgeExchange(
await tx.peerPullCredit.iter().forEachAsync(async (rec) => {
if (rec.exchangeBaseUrl === exchangeBaseUrl) {
const ctx = new PeerPullCreditTransactionContext(wex, rec.pursePub);
- const res = await ctx.deleteTransactionInTx(tx);
- notifs.push(...res.notifs);
+ await ctx.deleteTransactionInTx(tx);
}
});
}
@@ -3245,8 +3237,7 @@ async function purgeExchange(
wex,
rec.peerPullDebitId,
);
- const res = await ctx.deleteTransactionInTx(tx);
- notifs.push(...res.notifs);
+ await ctx.deleteTransactionInTx(tx);
}
});
}
@@ -3258,8 +3249,7 @@ async function purgeExchange(
wex,
rec.peerPushCreditId,
);
- const res = await ctx.deleteTransactionInTx(tx);
- notifs.push(...res.notifs);
+ await ctx.deleteTransactionInTx(tx);
}
});
}
@@ -3268,8 +3258,7 @@ async function purgeExchange(
await tx.peerPushDebit.iter().forEachAsync(async (rec) => {
if (rec.exchangeBaseUrl === exchangeBaseUrl) {
const ctx = new PeerPushDebitTransactionContext(wex, rec.pursePub);
- const res = await ctx.deleteTransactionInTx(tx);
- notifs.push(...res.notifs);
+ await ctx.deleteTransactionInTx(tx);
}
});
}
@@ -3277,8 +3266,6 @@ async function purgeExchange(
// FIXME: Is this even necessary? Each deletion should already do it.
await rematerializeTransactions(wex, tx);
-
- return { notifs };
}
export async function deleteExchange(
@@ -3287,7 +3274,7 @@ export async function deleteExchange(
): Promise<void> {
let inUse: boolean = false;
const exchangeBaseUrl = req.exchangeBaseUrl;
- const notifs = await wex.db.runAllStoresReadWriteTx({}, async (tx) => {
+ await wex.db.runAllStoresReadWriteTx({}, async (tx) => {
const exchangeRec = await tx.exchanges.get(exchangeBaseUrl);
if (!exchangeRec) {
// Nothing to delete!
@@ -3299,9 +3286,9 @@ export async function deleteExchange(
inUse = true;
return;
}
- const purgeRes = await purgeExchange(wex, tx, exchangeRec, true);
+ await purgeExchange(wex, tx, exchangeRec, true);
wex.ws.exchangeCache.clear();
- return purgeRes.notifs;
+ return;
});
if (inUse) {
@@ -3310,9 +3297,6 @@ export async function deleteExchange(
hint: "Exchange in use.",
});
}
- for (const notif of notifs ?? []) {
- wex.ws.notify(notif);
- }
}
export async function getExchangeResources(
diff --git a/packages/taler-wallet-core/src/pay-merchant.ts b/packages/taler-wallet-core/src/pay-merchant.ts
@@ -134,8 +134,10 @@ import {
constructTaskIdentifier,
genericWaitForState,
genericWaitForStateVal,
+ getGenericRecordHandle,
LookupFullTransactionOpts,
PendingTaskType,
+ RecordHandle,
spendCoins,
spendTokens,
TaskIdentifiers,
@@ -143,7 +145,6 @@ import {
TaskRunResult,
TaskRunResultType,
TransactionContext,
- TransitionResultType,
} from "./common.js";
import { EddsaKeyPairStrings } from "./crypto/cryptoImplementation.js";
import {
@@ -170,7 +171,6 @@ import {
WalletDbHelpers,
WalletDbReadOnlyTransaction,
WalletDbReadWriteTransaction,
- WalletDbStoresArr,
} from "./db.js";
import { acceptDonauBlindSigs, generateDonauPlanchets } from "./donau.js";
import { getScopeForAllCoins, getScopeForAllExchanges } from "./exchanges.js";
@@ -202,6 +202,11 @@ import {
*/
const logger = new Logger("pay-merchant.ts");
+/**
+ * Context for a merchant pay transaction.
+ *
+ * Used in every task and request handlers.
+ */
export class PayMerchantTransactionContext implements TransactionContext {
readonly transactionId: TransactionIdStr;
readonly taskId: TaskIdStr;
@@ -387,84 +392,8 @@ export class PayMerchantTransactionContext implements TransactionContext {
};
}
- /**
- * Transition a payment transition.
- */
- async transition(
- f: (rec: PurchaseRecord) => Promise<TransitionResultType>,
- ): Promise<void> {
- return this.transitionExtra(
- {
- extraStores: [],
- },
- f,
- );
- }
-
- /**
- * Transition a payment transition.
- * Extra object stores may be accessed during the transition.
- */
- async transitionExtra<StoreNameArray extends WalletDbStoresArr = []>(
- opts: { extraStores: StoreNameArray },
- f: (
- rec: PurchaseRecord,
- tx: WalletDbReadWriteTransaction<
- ["purchases", "transactionsMeta", ...StoreNameArray]
- >,
- ) => Promise<TransitionResultType>,
- ): Promise<void> {
- const ws = this.wex;
- const extraStores = opts.extraStores ?? [];
- await ws.db.runReadWriteTx(
- { storeNames: ["purchases", "transactionsMeta", ...extraStores] },
- async (tx) => {
- const purchaseRec = await tx.purchases.get(this.proposalId);
- if (!purchaseRec) {
- throw Error("purchase not found anymore");
- }
- const oldTxState = computePayMerchantTransactionState(purchaseRec);
- const oldStId = purchaseRec.purchaseStatus;
- const res = await f(purchaseRec, tx);
- switch (res) {
- case TransitionResultType.Transition: {
- await tx.purchases.put(purchaseRec);
- await this.updateTransactionMeta(tx);
- const newTxState = computePayMerchantTransactionState(purchaseRec);
- const newStId = purchaseRec.purchaseStatus;
- applyNotifyTransition(tx.notify, this.transactionId, {
- oldTxState,
- newTxState,
- // FIXME: The transition function should really return the effect
- // and not just the status.
- balanceEffect: BalanceEffect.Any,
- oldStId,
- newStId,
- });
- return;
- }
- case TransitionResultType.Delete:
- await tx.purchases.delete(this.proposalId);
- await this.updateTransactionMeta(tx);
- applyNotifyTransition(tx.notify, this.transactionId, {
- oldTxState,
- newTxState: {
- major: TransactionMajorState.None,
- },
- balanceEffect: BalanceEffect.Any,
- oldStId,
- newStId: -1,
- });
- return;
- default:
- return undefined;
- }
- },
- );
- }
-
async deleteTransaction(): Promise<void> {
- const res = await this.wex.db.runReadWriteTx(
+ await this.wex.db.runReadWriteTx(
{
storeNames: ["purchases", "tombstones", "transactionsMeta"],
},
@@ -472,25 +401,26 @@ export class PayMerchantTransactionContext implements TransactionContext {
return this.deleteTransactionInTx(tx);
},
);
- for (const notif of res.notifs) {
- this.wex.ws.notify(notif);
- }
}
async deleteTransactionInTx(
tx: WalletDbReadWriteTransaction<
["purchases", "tombstones", "transactionsMeta"]
>,
- ): Promise<{ notifs: WalletNotification[] }> {
- const notifs: WalletNotification[] = [];
- const rec = await tx.purchases.get(this.proposalId);
+ opts: { keepRelated?: boolean } = {},
+ ): Promise<void> {
+ const [rec, h] = await this.getRecordHandle(tx);
if (!rec) {
- return { notifs };
+ return;
}
let relatedTransactions: PurchaseRecord[] = [];
// Automatically delete transactions that are a repurchase of this transaction,
// as they're typically hidden.
- if (rec.download?.fulfillmentUrl) {
+ if (
+ !opts?.keepRelated &&
+ rec.download?.fulfillmentUrl &&
+ rec.purchaseStatus !== PurchaseStatus.DoneRepurchaseDetected
+ ) {
const otherTransactions =
await tx.purchases.indexes.byFulfillmentUrl.getAll(
rec.download.fulfillmentUrl,
@@ -509,31 +439,9 @@ export class PayMerchantTransactionContext implements TransactionContext {
this.wex,
rt.proposalId,
);
- await tx.purchases.delete(rt.proposalId);
- await otherCtx.updateTransactionMeta(tx);
- notifs.push({
- type: NotificationType.TransactionStateTransition,
- transactionId: this.transactionId,
- oldTxState: computePayMerchantTransactionState(rt),
- newTxState: {
- major: TransactionMajorState.Deleted,
- },
- newStId: -1,
- });
+ await otherCtx.deleteTransactionInTx(tx, { keepRelated: true });
}
- const oldTxState = computePayMerchantTransactionState(rec);
- await tx.purchases.delete(rec.proposalId);
- await this.updateTransactionMeta(tx);
- notifs.push({
- type: NotificationType.TransactionStateTransition,
- transactionId: this.transactionId,
- oldTxState,
- newTxState: {
- major: TransactionMajorState.Deleted,
- },
- newStId: -1,
- });
- return { notifs };
+ await h.update(undefined);
}
async suspendTransaction(): Promise<void> {
@@ -568,168 +476,115 @@ export class PayMerchantTransactionContext implements TransactionContext {
}
async abortTransaction(reason?: TalerErrorDetail): Promise<void> {
- const { wex, proposalId, transactionId } = this;
- await wex.db.runReadWriteTx(
- {
- storeNames: [
- "coinAvailability",
- "coinHistory",
- "coins",
- "denominations",
- "operationRetries",
- "purchases",
- "refreshGroups",
- "refreshSessions",
- "transactionsMeta",
- ],
- },
- async (tx) => {
- const purchase = await tx.purchases.get(proposalId);
- if (!purchase) {
- throw Error("purchase not found");
- }
- const oldTxState = computePayMerchantTransactionState(purchase);
- const oldStId = purchase.purchaseStatus;
- switch (oldStId) {
- case PurchaseStatus.Done:
- return;
- case PurchaseStatus.PendingPaying:
- case PurchaseStatus.SuspendedPaying: {
- purchase.abortReason = reason;
- purchase.purchaseStatus = PurchaseStatus.AbortingWithRefund;
- if (purchase.payInfo && purchase.payInfo.payCoinSelection) {
- const coinSel = purchase.payInfo.payCoinSelection;
- const currency = Amounts.currencyOf(
- purchase.payInfo.totalPayCost,
- );
- const refreshCoins: CoinRefreshRequest[] = [];
- for (let i = 0; i < coinSel.coinPubs.length; i++) {
- refreshCoins.push({
- amount: coinSel.coinContributions[i],
- coinPub: coinSel.coinPubs[i],
- });
- }
- await createRefreshGroup(
- wex,
- tx,
- currency,
- refreshCoins,
- RefreshReason.AbortPay,
- this.transactionId,
- );
+ const { wex } = this;
+ await wex.db.runAllStoresReadWriteTx({}, async (tx) => {
+ const [purchase, h] = await this.getRecordHandle(tx);
+ if (!purchase) {
+ throw Error("purchase not found");
+ }
+ switch (purchase.purchaseStatus) {
+ case PurchaseStatus.Done:
+ return;
+ case PurchaseStatus.PendingPaying:
+ case PurchaseStatus.SuspendedPaying: {
+ purchase.abortReason = reason;
+ purchase.purchaseStatus = PurchaseStatus.AbortingWithRefund;
+ if (purchase.payInfo && purchase.payInfo.payCoinSelection) {
+ const coinSel = purchase.payInfo.payCoinSelection;
+ const currency = Amounts.currencyOf(purchase.payInfo.totalPayCost);
+ const refreshCoins: CoinRefreshRequest[] = [];
+ for (let i = 0; i < coinSel.coinPubs.length; i++) {
+ refreshCoins.push({
+ amount: coinSel.coinContributions[i],
+ coinPub: coinSel.coinPubs[i],
+ });
}
- break;
+ await createRefreshGroup(
+ wex,
+ tx,
+ currency,
+ refreshCoins,
+ RefreshReason.AbortPay,
+ this.transactionId,
+ );
}
- case PurchaseStatus.PendingQueryingAutoRefund:
- case PurchaseStatus.SuspendedQueryingAutoRefund:
- case PurchaseStatus.PendingAcceptRefund:
- case PurchaseStatus.SuspendedPendingAcceptRefund:
- case PurchaseStatus.PendingQueryingRefund:
- case PurchaseStatus.SuspendedQueryingRefund:
- if (!purchase.timestampFirstSuccessfulPay) {
- throw Error("invalid state");
- }
- purchase.purchaseStatus = PurchaseStatus.Done;
- break;
- case PurchaseStatus.DialogProposed:
- purchase.purchaseStatus = PurchaseStatus.AbortedProposalRefused;
- break;
- default:
- return;
+ break;
}
- await tx.purchases.put(purchase);
- await this.updateTransactionMeta(tx);
- const newTxState = computePayMerchantTransactionState(purchase);
- const newStId = purchase.purchaseStatus;
- applyNotifyTransition(tx.notify, transactionId, {
- oldTxState,
- newTxState,
- balanceEffect: BalanceEffect.Any,
- oldStId,
- newStId,
- });
- },
- );
+ case PurchaseStatus.PendingQueryingAutoRefund:
+ case PurchaseStatus.SuspendedQueryingAutoRefund:
+ case PurchaseStatus.PendingAcceptRefund:
+ case PurchaseStatus.SuspendedPendingAcceptRefund:
+ case PurchaseStatus.PendingQueryingRefund:
+ case PurchaseStatus.SuspendedQueryingRefund:
+ if (!purchase.timestampFirstSuccessfulPay) {
+ throw Error("invalid state");
+ }
+ purchase.purchaseStatus = PurchaseStatus.Done;
+ break;
+ case PurchaseStatus.DialogProposed:
+ purchase.purchaseStatus = PurchaseStatus.AbortedProposalRefused;
+ break;
+ default:
+ return;
+ }
+ await h.update(purchase);
+ });
wex.taskScheduler.stopShepherdTask(this.taskId);
wex.taskScheduler.startShepherdTask(this.taskId);
}
async resumeTransaction(): Promise<void> {
- const { wex, proposalId, transactionId } = this;
+ const { wex } = this;
await wex.db.runReadWriteTx(
{ storeNames: ["purchases", "transactionsMeta"] },
async (tx) => {
- const purchase = await tx.purchases.get(proposalId);
+ const [purchase, h] = await this.getRecordHandle(tx);
if (!purchase) {
throw Error("purchase not found");
}
- const oldTxState = computePayMerchantTransactionState(purchase);
- const oldStId = purchase.purchaseStatus;
let newStatus = transitionResume[purchase.purchaseStatus];
- if (!newStatus) {
- return undefined;
+ if (!newStatus?.next) {
+ return;
}
- await tx.purchases.put(purchase);
- await this.updateTransactionMeta(tx);
- const newTxState = computePayMerchantTransactionState(purchase);
- const newStId = purchase.purchaseStatus;
- applyNotifyTransition(tx.notify, transactionId, {
- oldTxState,
- newTxState,
- balanceEffect: BalanceEffect.Any,
- oldStId,
- newStId,
- });
+ purchase.purchaseStatus = newStatus.next;
+ await h.update(purchase, BalanceEffect.Any);
},
);
wex.taskScheduler.startShepherdTask(this.taskId);
}
async failTransaction(reason?: TalerErrorDetail): Promise<void> {
- const { wex, proposalId, transactionId } = this;
- await wex.db.runReadWriteTx(
- {
- storeNames: [
- "purchases",
- "refreshGroups",
- "denominations",
- "coinAvailability",
- "coins",
- "operationRetries",
- "transactionsMeta",
- ],
- },
- async (tx) => {
- const purchase = await tx.purchases.get(proposalId);
- if (!purchase) {
- throw Error("purchase not found");
- }
- const oldTxState = computePayMerchantTransactionState(purchase);
- const oldStId = purchase.purchaseStatus;
- let newState: PurchaseStatus | undefined = undefined;
- switch (purchase.purchaseStatus) {
- case PurchaseStatus.AbortingWithRefund:
- newState = PurchaseStatus.FailedAbort;
- break;
- }
- if (newState) {
- purchase.purchaseStatus = newState;
+ const { wex } = this;
+ await wex.db.runAllStoresReadWriteTx({}, async (tx) => {
+ const [purchase, h] = await this.getRecordHandle(tx);
+ if (!purchase) {
+ throw Error("purchase not found");
+ }
+ switch (purchase.purchaseStatus) {
+ case PurchaseStatus.AbortingWithRefund:
+ purchase.purchaseStatus = PurchaseStatus.FailedAbort;
purchase.failReason = reason;
- await tx.purchases.put(purchase);
- }
- await this.updateTransactionMeta(tx);
- const newTxState = computePayMerchantTransactionState(purchase);
- const newStId = purchase.purchaseStatus;
- applyNotifyTransition(tx.notify, transactionId, {
- oldTxState,
- newTxState,
- balanceEffect: BalanceEffect.Any,
- oldStId,
- newStId,
- });
- },
+ break;
+ default:
+ return;
+ }
+ await h.update(purchase, BalanceEffect.Any);
+ tx._util.scheduleOnCommit(() =>
+ wex.taskScheduler.stopShepherdTask(this.taskId),
+ );
+ });
+ }
+
+ async getRecordHandle(
+ tx: WalletDbReadWriteTransaction<["purchases", "transactionsMeta"]>,
+ ): Promise<[PurchaseRecord | undefined, RecordHandle<PurchaseRecord>]> {
+ return getGenericRecordHandle<PurchaseRecord>(
+ this,
+ tx as any,
+ async () => tx.purchases.get(this.proposalId),
+ (r) => computePayMerchantTransactionState(r),
+ (r) => r.purchaseStatus,
);
- wex.taskScheduler.stopShepherdTask(this.taskId);
}
}
@@ -1499,19 +1354,20 @@ async function createOrReusePurchase(
// if this transaction was shared and the order is paid then it
// means that another wallet already paid the proposal
if (paid) {
- await oldCtx.transition(async (p) => {
+ await wex.db.runAllStoresReadWriteTx({}, async (tx) => {
+ const [rec, h] = await oldCtx.getRecordHandle(tx);
// The order is only paid by another wallet
// if the merchant says it's paid but the local
// wallet is still in a dialog state.
- switch (p.purchaseStatus) {
+ switch (rec?.purchaseStatus) {
case PurchaseStatus.DialogProposed:
case PurchaseStatus.DialogShared:
break;
default:
- return TransitionResultType.Stay;
+ return;
}
- p.purchaseStatus = PurchaseStatus.FailedPaidByOther;
- return TransitionResultType.Transition;
+ rec.purchaseStatus = PurchaseStatus.FailedPaidByOther;
+ await h.update(rec);
});
}
}
@@ -1604,52 +1460,51 @@ async function storeFirstPaySuccess(
): Promise<void> {
const ctx = new PayMerchantTransactionContext(wex, proposalId);
const now = AbsoluteTime.toPreciseTimestamp(AbsoluteTime.now());
- await ctx.transitionExtra(
- {
- extraStores: ["contractTerms"],
- },
- async (purchase, tx) => {
- const isFirst = purchase.timestampFirstSuccessfulPay === undefined;
- if (!isFirst) {
- logger.warn("payment success already stored");
- return TransitionResultType.Stay;
- }
- if (purchase.purchaseStatus === PurchaseStatus.PendingPaying) {
- purchase.purchaseStatus = PurchaseStatus.Done;
- }
- purchase.timestampFirstSuccessfulPay = timestampPreciseToDb(now);
- purchase.lastSessionId = sessionId;
- purchase.merchantPaySig = payResponse.sig;
- purchase.posConfirmation = payResponse.pos_confirmation;
- const dl = purchase.download;
- checkDbInvariant(
- !!dl,
- `purchase ${purchase.orderId} without ct downloaded`,
- );
- const contractTermsRecord = await tx.contractTerms.get(
- dl.contractTermsHash,
- );
- checkDbInvariant(
- !!contractTermsRecord,
- `no contract terms found for purchase ${purchase.orderId}`,
- );
- const contractTerms = codecForMerchantContractTerms().decode(
- contractTermsRecord.contractTermsRaw,
+ await wex.db.runAllStoresReadWriteTx({}, async (tx) => {
+ const [purchase, h] = await ctx.getRecordHandle(tx);
+ if (!purchase) {
+ return;
+ }
+ const isFirst = purchase.timestampFirstSuccessfulPay === undefined;
+ if (!isFirst) {
+ logger.warn("payment success already stored");
+ return;
+ }
+ if (purchase.purchaseStatus === PurchaseStatus.PendingPaying) {
+ purchase.purchaseStatus = PurchaseStatus.Done;
+ }
+ purchase.timestampFirstSuccessfulPay = timestampPreciseToDb(now);
+ purchase.lastSessionId = sessionId;
+ purchase.merchantPaySig = payResponse.sig;
+ purchase.posConfirmation = payResponse.pos_confirmation;
+ const dl = purchase.download;
+ checkDbInvariant(
+ !!dl,
+ `purchase ${purchase.orderId} without ct downloaded`,
+ );
+ const contractTermsRecord = await tx.contractTerms.get(
+ dl.contractTermsHash,
+ );
+ checkDbInvariant(
+ !!contractTermsRecord,
+ `no contract terms found for purchase ${purchase.orderId}`,
+ );
+ const contractTerms = codecForMerchantContractTerms().decode(
+ contractTermsRecord.contractTermsRaw,
+ );
+ const protoAr = contractTerms.auto_refund;
+ if (protoAr) {
+ const ar = Duration.fromTalerProtocolDuration(protoAr);
+ logger.info("auto_refund present");
+ purchase.purchaseStatus = PurchaseStatus.FinalizingQueryingAutoRefund;
+ purchase.autoRefundDeadline = timestampProtocolToDb(
+ AbsoluteTime.toProtocolTimestamp(
+ AbsoluteTime.addDuration(AbsoluteTime.now(), ar),
+ ),
);
- const protoAr = contractTerms.auto_refund;
- if (protoAr) {
- const ar = Duration.fromTalerProtocolDuration(protoAr);
- logger.info("auto_refund present");
- purchase.purchaseStatus = PurchaseStatus.FinalizingQueryingAutoRefund;
- purchase.autoRefundDeadline = timestampProtocolToDb(
- AbsoluteTime.toProtocolTimestamp(
- AbsoluteTime.addDuration(AbsoluteTime.now(), ar),
- ),
- );
- }
- return TransitionResultType.Transition;
- },
- );
+ }
+ await h.update(purchase);
+ });
}
async function storePayReplaySuccess(
@@ -1658,7 +1513,11 @@ async function storePayReplaySuccess(
sessionId: string | undefined,
): Promise<void> {
const ctx = new PayMerchantTransactionContext(wex, proposalId);
- await ctx.transition(async (purchase) => {
+ await wex.db.runAllStoresReadWriteTx({}, async (tx) => {
+ const [purchase, h] = await ctx.getRecordHandle(tx);
+ if (!purchase) {
+ return;
+ }
const isFirst = purchase.timestampFirstSuccessfulPay === undefined;
if (isFirst) {
throw Error("invalid payment state");
@@ -1670,7 +1529,7 @@ async function storePayReplaySuccess(
break;
}
purchase.lastSessionId = sessionId;
- return TransitionResultType.Transition;
+ await h.update(purchase);
});
}
@@ -2080,10 +1939,14 @@ async function checkPaymentByProposalId(
"automatically re-submitting payment with different session ID",
);
logger.trace(`last: ${purchase.lastSessionId}, current: ${sessionId}`);
- await ctx.transition(async (p) => {
+ await wex.db.runAllStoresReadWriteTx({}, async (tx) => {
+ const [p, h] = await ctx.getRecordHandle(tx);
+ if (!p) {
+ return;
+ }
p.lastSessionId = sessionId;
p.purchaseStatus = PurchaseStatus.PendingPayingReplay;
- return TransitionResultType.Transition;
+ await h.update(p);
});
wex.taskScheduler.startShepherdTask(ctx.taskId);
@@ -3224,9 +3087,13 @@ async function processPurchasePay(
const paid = await checkIfOrderIsAlreadyPaid(wex, download, false);
if (paid) {
- await ctx.transition(async (p) => {
+ await wex.db.runAllStoresReadWriteTx({}, async (tx) => {
+ const [p, h] = await ctx.getRecordHandle(tx);
+ if (!p) {
+ return;
+ }
p.purchaseStatus = PurchaseStatus.FailedPaidByOther;
- return TransitionResultType.Transition;
+ await h.update(p);
});
return {
@@ -3810,16 +3677,17 @@ export async function refuseProposal(
proposalId: string,
): Promise<void> {
const ctx = new PayMerchantTransactionContext(wex, proposalId);
- await ctx.transition(async (proposal) => {
- switch (proposal.purchaseStatus) {
+ await wex.db.runAllStoresReadWriteTx({}, async (tx) => {
+ const [proposal, h] = await ctx.getRecordHandle(tx);
+ switch (proposal?.purchaseStatus) {
case PurchaseStatus.DialogProposed:
case PurchaseStatus.DialogShared:
break;
default:
- return TransitionResultType.Stay;
+ return;
}
proposal.purchaseStatus = PurchaseStatus.AbortedProposalRefused;
- return TransitionResultType.Transition;
+ await h.update(proposal);
});
}
@@ -4255,9 +4123,13 @@ async function processPurchaseDialogShared(
const paid = await checkIfOrderIsAlreadyPaid(wex, download, true);
if (paid) {
- await ctx.transition(async (p) => {
+ await wex.db.runAllStoresReadWriteTx({}, async (tx) => {
+ const [p, h] = await ctx.getRecordHandle(tx);
+ if (!p) {
+ return;
+ }
p.purchaseStatus = PurchaseStatus.FailedPaidByOther;
- return TransitionResultType.Transition;
+ await h.update(p);
});
return TaskRunResult.progress();
}
@@ -4316,17 +4188,18 @@ async function processPurchaseAutoRefund(
// is over or the product is already fully refunded.
if (noAutoRefundOrExpired || fullyRefunded) {
- await ctx.transition(async (p) => {
- switch (p.purchaseStatus) {
+ await wex.db.runAllStoresReadWriteTx({}, async (tx) => {
+ const [p, h] = await ctx.getRecordHandle(tx);
+ switch (p?.purchaseStatus) {
case PurchaseStatus.PendingQueryingAutoRefund:
case PurchaseStatus.FinalizingQueryingAutoRefund:
break;
default:
- return TransitionResultType.Stay;
+ return;
}
p.purchaseStatus = PurchaseStatus.Done;
p.refundAmountAwaiting = undefined;
- return TransitionResultType.Transition;
+ await h.update(p);
});
return TaskRunResult.progress();
}
@@ -4352,16 +4225,17 @@ async function processPurchaseAutoRefund(
return TaskRunResult.longpollReturnedPending();
}
- await ctx.transition(async (p) => {
- switch (p.purchaseStatus) {
+ await wex.db.runAllStoresReadWriteTx({}, async (tx) => {
+ const [rec, h] = await ctx.getRecordHandle(tx);
+ switch (rec?.purchaseStatus) {
case PurchaseStatus.PendingQueryingAutoRefund:
case PurchaseStatus.FinalizingQueryingAutoRefund:
break;
default:
- return TransitionResultType.Stay;
+ return;
}
- p.purchaseStatus = PurchaseStatus.PendingAcceptRefund;
- return TransitionResultType.Transition;
+ rec.purchaseStatus = PurchaseStatus.PendingAcceptRefund;
+ await h.update(rec);
});
return TaskRunResult.progress();
}
@@ -4420,12 +4294,13 @@ async function processPurchaseAbortingRefund(
TalerErrorCode.MERCHANT_POST_ORDERS_ID_ABORT_CONTRACT_NOT_FOUND
) {
const ctx = new PayMerchantTransactionContext(wex, proposalId);
- await ctx.transition(async (rec) => {
- if (rec.purchaseStatus === PurchaseStatus.AbortingWithRefund) {
- rec.purchaseStatus = PurchaseStatus.AbortedOrderDeleted;
- return TransitionResultType.Transition;
+ await wex.db.runAllStoresReadWriteTx({}, async (tx) => {
+ const [rec, h] = await ctx.getRecordHandle(tx);
+ if (rec?.purchaseStatus !== PurchaseStatus.AbortingWithRefund) {
+ return;
}
- return TransitionResultType.Stay;
+ rec.purchaseStatus = PurchaseStatus.AbortedOrderDeleted;
+ await h.update(rec);
});
}
}
@@ -4491,13 +4366,14 @@ async function processPurchaseQueryRefund(
const ctx = new PayMerchantTransactionContext(wex, proposalId);
if (!orderStatus.refund_pending) {
- await ctx.transition(async (p) => {
- if (p.purchaseStatus !== PurchaseStatus.PendingQueryingRefund) {
- return TransitionResultType.Stay;
+ await wex.db.runAllStoresReadWriteTx({}, async (tx) => {
+ const [p, h] = await ctx.getRecordHandle(tx);
+ if (p?.purchaseStatus !== PurchaseStatus.PendingQueryingRefund) {
+ return;
}
p.purchaseStatus = PurchaseStatus.Done;
p.refundAmountAwaiting = undefined;
- return TransitionResultType.Transition;
+ await h.update(p);
});
} else {
const refundAwaiting = Amounts.sub(
@@ -4505,13 +4381,14 @@ async function processPurchaseQueryRefund(
Amounts.parseOrThrow(orderStatus.refund_taken),
).amount;
- await ctx.transition(async (p) => {
- if (p.purchaseStatus !== PurchaseStatus.PendingQueryingRefund) {
- return TransitionResultType.Stay;
+ await wex.db.runAllStoresReadWriteTx({}, async (tx) => {
+ const [p, h] = await ctx.getRecordHandle(tx);
+ if (p?.purchaseStatus !== PurchaseStatus.PendingQueryingRefund) {
+ return;
}
p.refundAmountAwaiting = Amounts.stringify(refundAwaiting);
p.purchaseStatus = PurchaseStatus.PendingAcceptRefund;
- return TransitionResultType.Transition;
+ await h.update(p);
});
}
return TaskRunResult.progress();
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
@@ -169,8 +169,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
this.wex,
withdrawalGroupId,
);
- const res = await withdrawalCtx.deleteTransactionInTx(tx);
- notifs.push(...res.notifs);
+ await withdrawalCtx.deleteTransactionInTx(tx);
}
});
}
diff --git a/packages/taler-wallet-core/src/pay-peer-push-credit.ts b/packages/taler-wallet-core/src/pay-peer-push-credit.ts
@@ -304,8 +304,7 @@ export class PeerPushCreditTransactionContext implements TransactionContext {
this.wex,
withdrawalGroupId,
);
- const res = await withdrawalCtx.deleteTransactionInTx(tx);
- notifs.push(...res.notifs);
+ await withdrawalCtx.deleteTransactionInTx(tx);
}
});
}
diff --git a/packages/taler-wallet-core/src/query.ts b/packages/taler-wallet-core/src/query.ts
@@ -548,6 +548,11 @@ type ValidateKeyPath<T, P> = P extends `${infer PX extends keyof T &
// foo({x: [0,1,2]}, "x.0");
+export interface TransactionUtil {
+ notify: (w: WalletNotification) => void;
+ scheduleOnCommit(f: () => void): void;
+}
+
export type StoreMap = { [Store: string]: StoreWithIndexes<any, any, any> };
export type StoreNames<Stores extends StoreMap> = keyof Stores;
export type DbReadWriteTransaction<
@@ -559,6 +564,7 @@ export type DbReadWriteTransaction<
Stores[X]["indexMap"]
>;
} & {
+ _util: TransactionUtil;
notify: (w: WalletNotification) => void;
};
@@ -571,6 +577,7 @@ export type DbReadOnlyTransaction<
Stores[X]["indexMap"]
>;
} & {
+ _util: TransactionUtil;
notify: (w: WalletNotification) => void;
};
@@ -728,11 +735,20 @@ function makeTxClientContext(
const ctx: {
[s: string]:
| StoreReadWriteAccessor<any, any>
- | ((notif: WalletNotification) => void);
+ | ((notif: WalletNotification) => void)
+ | TransactionUtil;
} = {
notify(notif: WalletNotification): void {
internalContext.scheduleNotification(notif);
},
+ _util: {
+ notify(notif: WalletNotification): void {
+ internalContext.scheduleNotification(notif);
+ },
+ scheduleOnCommit(f: () => void): void {
+ internalContext.scheduleAfterCommit(f);
+ },
+ },
};
for (const storeAlias in storePick) {
const indexes: { [s: string]: IndexReadWriteAccessor<any> } = {};
@@ -954,6 +970,7 @@ class InternalTransactionContext {
allowWrite: boolean;
abortExn: TransactionAbortedError | undefined;
notifications: WalletNotification[] = [];
+ afterCommitHandlers: (() => void)[];
constructor(
private readonly triggerSpec: TriggerSpec,
@@ -970,6 +987,10 @@ class InternalTransactionContext {
this.notifications.push(notif);
}
+ scheduleAfterCommit(f: () => void) {
+ this.afterCommitHandlers.push(f);
+ }
+
handleAfterCommit() {
if (this.triggerSpec.afterCommit) {
this.triggerSpec.afterCommit({
@@ -979,6 +1000,9 @@ class InternalTransactionContext {
scope: this.storesScope,
});
}
+ for (const f of this.afterCommitHandlers) {
+ f();
+ }
}
throwIfInactive() {
diff --git a/packages/taler-wallet-core/src/withdraw.ts b/packages/taler-wallet-core/src/withdraw.ts
@@ -595,20 +595,17 @@ export class WithdrawTransactionContext implements TransactionContext {
return this.deleteTransactionInTx(tx);
},
);
- for (const notif of res.notifs) {
- this.wex.ws.notify(notif);
- }
}
async deleteTransactionInTx(
tx: WalletDbReadWriteTransaction<
["withdrawalGroups", "planchets", "tombstones", "transactionsMeta"]
>,
- ): Promise<{ notifs: WalletNotification[] }> {
+ ): Promise<void> {
const notifs: WalletNotification[] = [];
const rec = await tx.withdrawalGroups.get(this.withdrawalGroupId);
if (!rec) {
- return { notifs };
+ return;
}
const oldTxState = computeWithdrawalTransactionStatus(rec);
await tx.withdrawalGroups.delete(rec.withdrawalGroupId);
@@ -628,7 +625,7 @@ export class WithdrawTransactionContext implements TransactionContext {
},
newStId: -1,
});
- return { notifs };
+ return;
}
async suspendTransaction(): Promise<void> {