commit b1598757f43a5f1e1a5cef0e950ff1a9caafc768
parent f51fd65ad2d0227ac84b1b5e28c774f79f9d7188
Author: Antoine A <>
Date: Tue, 15 Apr 2025 14:52:28 +0200
wallet-core: clean pay-peer-pull-credit
Diffstat:
1 file changed, 293 insertions(+), 566 deletions(-)
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
@@ -1,6 +1,6 @@
/*
This file is part of GNU Taler
- (C) 2022-2023 Taler Systems S.A.
+ (C) 2022-2025 Taler Systems S.A.
GNU Taler is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
@@ -14,9 +14,6 @@
GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
-/**
- * Imports.
- */
import {
Amounts,
CheckPeerPullCreditRequest,
@@ -68,7 +65,6 @@ import {
TaskIdentifiers,
TaskRunResult,
TransactionContext,
- TransitionResult,
TransitionResultType,
constructTaskIdentifier,
genericWaitForStateVal,
@@ -137,31 +133,25 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
/**
- * Transition a peer-pull-credit transaction.
+ * Transition an existing peer-pull-credit transaction.
* Extra object stores may be accessed during the transition.
*/
- async transition<StoreNameArray extends WalletDbStoresArr = []>(
- opts: { extraStores?: StoreNameArray; transactionLabel?: string },
- f: (
- rec: PeerPullCreditRecord | undefined,
+ async transition<StoreNameArray extends WalletDbStoresArr>(
+ opts: { extraStores?: StoreNameArray },
+ lambda: (
+ rec: PeerPullCreditRecord,
tx: WalletDbReadWriteTransaction<
[
"peerPullCredit",
"transactionsMeta",
- "operationRetries",
- "exchanges",
- "exchangeDetails",
...StoreNameArray,
]
>,
- ) => Promise<TransitionResult<PeerPullCreditRecord>>,
+ ) => Promise<TransitionResultType>,
): Promise<TransitionInfo | undefined> {
const baseStores = [
"peerPullCredit" as const,
- "transactionsMeta" as const,
- "operationRetries" as const,
- "exchanges" as const,
- "exchangeDetails" as const,
+ "transactionsMeta" as const
];
const stores = opts.extraStores
? [...baseStores, ...opts.extraStores]
@@ -172,17 +162,14 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
{ storeNames: stores },
async (tx) => {
const rec = await tx.peerPullCredit.get(this.pursePub);
- let oldTxState: TransactionState;
- if (rec) {
- oldTxState = computePeerPullCreditTransactionState(rec);
- } else {
- oldTxState = {
- major: TransactionMajorState.None,
- };
+ if (rec == null) {
+ logger.warn(`peer pull credit ${this.pursePub} not found`);
+ return;
}
- let res: TransitionResult<PeerPullCreditRecord> | undefined;
+ let oldTxState = computePeerPullCreditTransactionState(rec);
+ let res: TransitionResultType;
try {
- res = await f(rec, tx);
+ res = await lambda(rec, tx);
} catch (error) {
if (error instanceof Error) {
errorThrown = error;
@@ -190,54 +177,108 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
return undefined;
}
- switch (res.type) {
+ switch (res) {
case TransitionResultType.Transition: {
- await tx.peerPullCredit.put(res.rec);
- await this.updateTransactionMeta(tx);
- const newTxState = computePeerPullCreditTransactionState(res.rec);
+ await tx.peerPullCredit.put(rec)
+ await tx.transactionsMeta.put({
+ transactionId: this.transactionId,
+ status: rec.status,
+ timestamp: rec.mergeTimestamp,
+ currency: Amounts.currencyOf(rec.amount),
+ exchanges: [rec.exchangeBaseUrl],
+ })
+ const newTxState = computePeerPullCreditTransactionState(rec);
return {
oldTxState,
newTxState,
- balanceEffect: res.balanceEffect,
+ balanceEffect: BalanceEffect.Any,
};
}
case TransitionResultType.Delete:
- await tx.peerPullCredit.delete(this.pursePub);
- await this.updateTransactionMeta(tx);
- return {
- oldTxState,
- newTxState: {
- major: TransactionMajorState.None,
- },
- balanceEffect: BalanceEffect.None,
- };
- default:
- return undefined;
+ throw new Error("Cannot delete using transition");
+ case TransitionResultType.Stay:
+ return;
}
},
);
- if (errorThrown) {
+ if (errorThrown != null) {
throw errorThrown;
}
notifyTransition(this.wex, this.transactionId, transitionInfo);
return transitionInfo;
}
+ /**
+ * Transition an existing peer-pull-credit transaction status
+ */
+ async transitionStatus(
+ from: PeerPullPaymentCreditStatus,
+ to: PeerPullPaymentCreditStatus
+ ) {
+ await this.transition({}, async (rec) => {
+ if (rec.status !== from) {
+ return TransitionResultType.Stay
+ } else {
+ rec.status = to;
+ return TransitionResultType.Transition
+ }
+ });
+ }
+
async updateTransactionMeta(
tx: WalletDbReadWriteTransaction<["peerPullCredit", "transactionsMeta"]>,
): Promise<void> {
- const ppcRec = await tx.peerPullCredit.get(this.pursePub);
- if (!ppcRec) {
+ const rec = await tx.peerPullCredit.get(this.pursePub);
+ if (rec == null) {
await tx.transactionsMeta.delete(this.transactionId);
- return;
+ } else {
+ await tx.transactionsMeta.put({
+ transactionId: this.transactionId,
+ status: rec.status,
+ timestamp: rec.mergeTimestamp,
+ currency: Amounts.currencyOf(rec.amount),
+ exchanges: [rec.exchangeBaseUrl],
+ });
+ }
+ }
+
+ async deleteTransactionInTx(
+ tx: WalletDbReadWriteTransaction<
+ [
+ "withdrawalGroups",
+ "peerPullCredit",
+ "planchets",
+ "tombstones",
+ "transactionsMeta",
+ ]
+ >,
+ ): Promise<{ notifs: WalletNotification[] }> {
+ const notifs: WalletNotification[] = [];
+ const rec = await tx.peerPullCredit.get(this.pursePub);
+ if (!rec) {
+ return { notifs };
}
- await tx.transactionsMeta.put({
+ const oldTxState = computePeerPullCreditTransactionState(rec);
+ if (rec.withdrawalGroupId) {
+ const withdrawalGroupId = rec.withdrawalGroupId;
+ const withdrawalCtx = new WithdrawTransactionContext(
+ this.wex,
+ withdrawalGroupId,
+ );
+ const res = await withdrawalCtx.deleteTransactionInTx(tx);
+ notifs.push(...res.notifs);
+ }
+ await tx.peerPullCredit.delete(this.pursePub)
+ await tx.transactionsMeta.delete(this.transactionId)
+ notifs.push({
+ type: NotificationType.TransactionStateTransition,
transactionId: this.transactionId,
- status: ppcRec.status,
- timestamp: ppcRec.mergeTimestamp,
- currency: Amounts.currencyOf(ppcRec.amount),
- exchanges: [ppcRec.exchangeBaseUrl],
+ oldTxState,
+ newTxState: {
+ major: TransactionMajorState.Deleted,
+ },
});
+ return { notifs };
}
/**
@@ -291,7 +332,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
const silentWithdrawalErrorForInvoice =
wsrOrt?.lastError &&
wsrOrt.lastError.code ===
- TalerErrorCode.WALLET_WITHDRAWAL_GROUP_INCOMPLETE &&
+ TalerErrorCode.WALLET_WITHDRAWAL_GROUP_INCOMPLETE &&
Object.values(wsrOrt.lastError.errorsPerCoin ?? {}).every((e) => {
return (
e.code === TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR &&
@@ -331,10 +372,10 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
kycUrl: kycUrl,
...(wsrOrt?.lastError
? {
- error: silentWithdrawalErrorForInvoice
- ? undefined
- : wsrOrt.lastError,
- }
+ error: silentWithdrawalErrorForInvoice
+ ? undefined
+ : wsrOrt.lastError,
+ }
: {}),
};
}
@@ -383,312 +424,167 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
"transactionsMeta",
],
},
- async (tx) => {
- return this.deleteTransactionInTx(tx);
- },
+ this.deleteTransactionInTx
);
for (const notif of res.notifs) {
this.wex.ws.notify(notif);
}
}
- async deleteTransactionInTx(
- tx: WalletDbReadWriteTransaction<
- [
- "withdrawalGroups",
- "peerPullCredit",
- "planchets",
- "tombstones",
- "transactionsMeta",
- ]
- >,
- ): Promise<{ notifs: WalletNotification[] }> {
- const notifs: WalletNotification[] = [];
- const rec = await tx.peerPullCredit.get(this.pursePub);
- if (!rec) {
- return { notifs };
- }
- const oldTxState = computePeerPullCreditTransactionState(rec);
- if (rec.withdrawalGroupId) {
- const withdrawalGroupId = rec.withdrawalGroupId;
- const withdrawalCtx = new WithdrawTransactionContext(
- this.wex,
- withdrawalGroupId,
- );
- const res = await withdrawalCtx.deleteTransactionInTx(tx);
- notifs.push(...res.notifs);
- }
- await tx.peerPullCredit.delete(rec.pursePub);
- await this.updateTransactionMeta(tx);
- notifs.push({
- type: NotificationType.TransactionStateTransition,
- transactionId: this.transactionId,
- oldTxState,
- newTxState: {
- major: TransactionMajorState.Deleted,
- },
- });
- return { notifs };
- }
-
async suspendTransaction(): Promise<void> {
- const { wex, pursePub, taskId: retryTag, transactionId } = this;
- const transitionInfo = await wex.db.runReadWriteTx(
- { storeNames: ["peerPullCredit", "transactionsMeta"] },
- async (tx) => {
- const pullCreditRec = await tx.peerPullCredit.get(pursePub);
- if (!pullCreditRec) {
- logger.warn(`peer pull credit ${pursePub} not found`);
- return;
- }
- let newStatus: PeerPullPaymentCreditStatus | undefined = undefined;
- switch (pullCreditRec.status) {
- case PeerPullPaymentCreditStatus.PendingCreatePurse:
- newStatus = PeerPullPaymentCreditStatus.SuspendedCreatePurse;
- break;
- case PeerPullPaymentCreditStatus.PendingMergeKycRequired:
- newStatus = PeerPullPaymentCreditStatus.SuspendedMergeKycRequired;
- break;
- case PeerPullPaymentCreditStatus.PendingWithdrawing:
- newStatus = PeerPullPaymentCreditStatus.SuspendedWithdrawing;
- break;
- case PeerPullPaymentCreditStatus.PendingReady:
- newStatus = PeerPullPaymentCreditStatus.SuspendedReady;
- break;
- case PeerPullPaymentCreditStatus.AbortingDeletePurse:
- newStatus =
- PeerPullPaymentCreditStatus.SuspendedAbortingDeletePurse;
- break;
- case PeerPullPaymentCreditStatus.PendingBalanceKycRequired:
- newStatus = PeerPullPaymentCreditStatus.SuspendedBalanceKycRequired;
- break;
- case PeerPullPaymentCreditStatus.PendingBalanceKycInit:
- newStatus = PeerPullPaymentCreditStatus.SuspendedBalanceKycInit;
- break;
- case PeerPullPaymentCreditStatus.SuspendedBalanceKycRequired:
- case PeerPullPaymentCreditStatus.SuspendedCreatePurse:
- case PeerPullPaymentCreditStatus.SuspendedMergeKycRequired:
- case PeerPullPaymentCreditStatus.SuspendedReady:
- case PeerPullPaymentCreditStatus.SuspendedWithdrawing:
- case PeerPullPaymentCreditStatus.SuspendedBalanceKycInit:
- case PeerPullPaymentCreditStatus.SuspendedAbortingDeletePurse:
- case PeerPullPaymentCreditStatus.Done:
- case PeerPullPaymentCreditStatus.Aborted:
- case PeerPullPaymentCreditStatus.Failed:
- case PeerPullPaymentCreditStatus.Expired:
- break;
- default:
- assertUnreachable(pullCreditRec.status);
- }
- if (newStatus != null) {
- const oldTxState =
- computePeerPullCreditTransactionState(pullCreditRec);
- pullCreditRec.status = newStatus;
- const newTxState =
- computePeerPullCreditTransactionState(pullCreditRec);
- await tx.peerPullCredit.put(pullCreditRec);
- await this.updateTransactionMeta(tx);
- return {
- oldTxState,
- newTxState,
- balanceEffect: BalanceEffect.None,
- };
- }
- return undefined;
- },
- );
- wex.taskScheduler.stopShepherdTask(retryTag);
- notifyTransition(wex, transactionId, transitionInfo);
+ await this.transition({}, async (rec) => {
+ switch (rec.status) {
+ case PeerPullPaymentCreditStatus.PendingCreatePurse:
+ rec.status = PeerPullPaymentCreditStatus.SuspendedCreatePurse;
+ return TransitionResultType.Transition;
+ case PeerPullPaymentCreditStatus.PendingMergeKycRequired:
+ rec.status = PeerPullPaymentCreditStatus.SuspendedMergeKycRequired;
+ return TransitionResultType.Transition;
+ case PeerPullPaymentCreditStatus.PendingWithdrawing:
+ rec.status = PeerPullPaymentCreditStatus.SuspendedWithdrawing;
+ return TransitionResultType.Transition;
+ case PeerPullPaymentCreditStatus.PendingReady:
+ rec.status = PeerPullPaymentCreditStatus.SuspendedReady;
+ return TransitionResultType.Transition;
+ case PeerPullPaymentCreditStatus.AbortingDeletePurse:
+ rec.status =
+ PeerPullPaymentCreditStatus.SuspendedAbortingDeletePurse;
+ return TransitionResultType.Transition;
+ case PeerPullPaymentCreditStatus.PendingBalanceKycRequired:
+ rec.status = PeerPullPaymentCreditStatus.SuspendedBalanceKycRequired;
+ return TransitionResultType.Transition;
+ case PeerPullPaymentCreditStatus.PendingBalanceKycInit:
+ rec.status = PeerPullPaymentCreditStatus.SuspendedBalanceKycInit;
+ return TransitionResultType.Transition;
+ case PeerPullPaymentCreditStatus.SuspendedBalanceKycRequired:
+ case PeerPullPaymentCreditStatus.SuspendedCreatePurse:
+ case PeerPullPaymentCreditStatus.SuspendedMergeKycRequired:
+ case PeerPullPaymentCreditStatus.SuspendedReady:
+ case PeerPullPaymentCreditStatus.SuspendedWithdrawing:
+ case PeerPullPaymentCreditStatus.SuspendedBalanceKycInit:
+ case PeerPullPaymentCreditStatus.SuspendedAbortingDeletePurse:
+ case PeerPullPaymentCreditStatus.Done:
+ case PeerPullPaymentCreditStatus.Aborted:
+ case PeerPullPaymentCreditStatus.Failed:
+ case PeerPullPaymentCreditStatus.Expired:
+ return TransitionResultType.Stay;
+ default:
+ assertUnreachable(rec.status);
+ }
+ })
+ this.wex.taskScheduler.stopShepherdTask(this.taskId);
}
async failTransaction(reason?: TalerErrorDetail): Promise<void> {
- const { wex, pursePub, taskId: retryTag, transactionId } = this;
- const transitionInfo = await wex.db.runReadWriteTx(
- { storeNames: ["peerPullCredit", "transactionsMeta"] },
- async (tx) => {
- const pullCreditRec = await tx.peerPullCredit.get(pursePub);
- if (!pullCreditRec) {
- logger.warn(`peer pull credit ${pursePub} not found`);
- return;
- }
- let newStatus: PeerPullPaymentCreditStatus | undefined = undefined;
- switch (pullCreditRec.status) {
- case PeerPullPaymentCreditStatus.PendingCreatePurse:
- case PeerPullPaymentCreditStatus.PendingMergeKycRequired:
- case PeerPullPaymentCreditStatus.PendingWithdrawing:
- case PeerPullPaymentCreditStatus.PendingReady:
- case PeerPullPaymentCreditStatus.Done:
- case PeerPullPaymentCreditStatus.SuspendedCreatePurse:
- case PeerPullPaymentCreditStatus.SuspendedMergeKycRequired:
- case PeerPullPaymentCreditStatus.SuspendedReady:
- case PeerPullPaymentCreditStatus.SuspendedWithdrawing:
- case PeerPullPaymentCreditStatus.Aborted:
- case PeerPullPaymentCreditStatus.Failed:
- case PeerPullPaymentCreditStatus.Expired:
- case PeerPullPaymentCreditStatus.PendingBalanceKycRequired:
- case PeerPullPaymentCreditStatus.SuspendedBalanceKycRequired:
- case PeerPullPaymentCreditStatus.PendingBalanceKycInit:
- case PeerPullPaymentCreditStatus.SuspendedBalanceKycInit:
- break;
- case PeerPullPaymentCreditStatus.AbortingDeletePurse:
- case PeerPullPaymentCreditStatus.SuspendedAbortingDeletePurse:
- newStatus = PeerPullPaymentCreditStatus.Failed;
- break;
- default:
- assertUnreachable(pullCreditRec.status);
- }
- if (newStatus != null) {
- const oldTxState =
- computePeerPullCreditTransactionState(pullCreditRec);
- pullCreditRec.status = newStatus;
- pullCreditRec.failReason = reason;
- const newTxState =
- computePeerPullCreditTransactionState(pullCreditRec);
- await tx.peerPullCredit.put(pullCreditRec);
- await this.updateTransactionMeta(tx);
- return {
- oldTxState,
- newTxState,
- balanceEffect: BalanceEffect.Any,
- };
- }
- return undefined;
- },
- );
- notifyTransition(wex, transactionId, transitionInfo);
- wex.taskScheduler.stopShepherdTask(retryTag);
+ await this.transition({}, async (rec) => {
+ switch (rec.status) {
+ case PeerPullPaymentCreditStatus.PendingCreatePurse:
+ case PeerPullPaymentCreditStatus.PendingMergeKycRequired:
+ case PeerPullPaymentCreditStatus.PendingWithdrawing:
+ case PeerPullPaymentCreditStatus.PendingReady:
+ case PeerPullPaymentCreditStatus.Done:
+ case PeerPullPaymentCreditStatus.SuspendedCreatePurse:
+ case PeerPullPaymentCreditStatus.SuspendedMergeKycRequired:
+ case PeerPullPaymentCreditStatus.SuspendedReady:
+ case PeerPullPaymentCreditStatus.SuspendedWithdrawing:
+ case PeerPullPaymentCreditStatus.Aborted:
+ case PeerPullPaymentCreditStatus.Failed:
+ case PeerPullPaymentCreditStatus.Expired:
+ case PeerPullPaymentCreditStatus.PendingBalanceKycRequired:
+ case PeerPullPaymentCreditStatus.SuspendedBalanceKycRequired:
+ case PeerPullPaymentCreditStatus.PendingBalanceKycInit:
+ case PeerPullPaymentCreditStatus.SuspendedBalanceKycInit:
+ return TransitionResultType.Stay;
+ case PeerPullPaymentCreditStatus.AbortingDeletePurse:
+ case PeerPullPaymentCreditStatus.SuspendedAbortingDeletePurse:
+ rec.status = PeerPullPaymentCreditStatus.Failed;
+ rec.failReason = reason;
+ return TransitionResultType.Transition;
+ default:
+ assertUnreachable(rec.status);
+ }
+ })
+ this.wex.taskScheduler.stopShepherdTask(this.taskId);
}
async resumeTransaction(): Promise<void> {
- const { wex, pursePub, taskId: retryTag, transactionId } = this;
- const transitionInfo = await wex.db.runReadWriteTx(
- { storeNames: ["peerPullCredit", "transactionsMeta"] },
- async (tx) => {
- const pullCreditRec = await tx.peerPullCredit.get(pursePub);
- if (!pullCreditRec) {
- logger.warn(`peer pull credit ${pursePub} not found`);
- return;
- }
- let newStatus: PeerPullPaymentCreditStatus | undefined = undefined;
- switch (pullCreditRec.status) {
- case PeerPullPaymentCreditStatus.PendingCreatePurse:
- case PeerPullPaymentCreditStatus.PendingMergeKycRequired:
- case PeerPullPaymentCreditStatus.PendingWithdrawing:
- case PeerPullPaymentCreditStatus.PendingReady:
- case PeerPullPaymentCreditStatus.PendingBalanceKycRequired:
- case PeerPullPaymentCreditStatus.PendingBalanceKycInit:
- case PeerPullPaymentCreditStatus.AbortingDeletePurse:
- case PeerPullPaymentCreditStatus.Done:
- case PeerPullPaymentCreditStatus.Failed:
- case PeerPullPaymentCreditStatus.Expired:
- case PeerPullPaymentCreditStatus.Aborted:
- break;
- case PeerPullPaymentCreditStatus.SuspendedBalanceKycInit:
- newStatus = PeerPullPaymentCreditStatus.PendingBalanceKycInit;
- break;
- case PeerPullPaymentCreditStatus.SuspendedCreatePurse:
- newStatus = PeerPullPaymentCreditStatus.PendingCreatePurse;
- break;
- case PeerPullPaymentCreditStatus.SuspendedMergeKycRequired:
- newStatus = PeerPullPaymentCreditStatus.PendingMergeKycRequired;
- break;
- case PeerPullPaymentCreditStatus.SuspendedReady:
- newStatus = PeerPullPaymentCreditStatus.PendingReady;
- break;
- case PeerPullPaymentCreditStatus.SuspendedWithdrawing:
- newStatus = PeerPullPaymentCreditStatus.PendingWithdrawing;
- break;
- case PeerPullPaymentCreditStatus.SuspendedAbortingDeletePurse:
- newStatus = PeerPullPaymentCreditStatus.AbortingDeletePurse;
- break;
- case PeerPullPaymentCreditStatus.SuspendedBalanceKycRequired:
- newStatus = PeerPullPaymentCreditStatus.PendingBalanceKycRequired;
- break;
- default:
- assertUnreachable(pullCreditRec.status);
- }
- if (newStatus != null) {
- const oldTxState =
- computePeerPullCreditTransactionState(pullCreditRec);
- pullCreditRec.status = newStatus;
- const newTxState =
- computePeerPullCreditTransactionState(pullCreditRec);
- await tx.peerPullCredit.put(pullCreditRec);
- await this.updateTransactionMeta(tx);
- return {
- oldTxState,
- newTxState,
- balanceEffect: BalanceEffect.None,
- };
- }
- return undefined;
- },
- );
- notifyTransition(wex, transactionId, transitionInfo);
- wex.taskScheduler.startShepherdTask(retryTag);
+ await this.transition({}, async (rec) => {
+ switch (rec.status) {
+ case PeerPullPaymentCreditStatus.PendingCreatePurse:
+ case PeerPullPaymentCreditStatus.PendingMergeKycRequired:
+ case PeerPullPaymentCreditStatus.PendingWithdrawing:
+ case PeerPullPaymentCreditStatus.PendingReady:
+ case PeerPullPaymentCreditStatus.PendingBalanceKycRequired:
+ case PeerPullPaymentCreditStatus.PendingBalanceKycInit:
+ case PeerPullPaymentCreditStatus.AbortingDeletePurse:
+ case PeerPullPaymentCreditStatus.Done:
+ case PeerPullPaymentCreditStatus.Failed:
+ case PeerPullPaymentCreditStatus.Expired:
+ case PeerPullPaymentCreditStatus.Aborted:
+ return TransitionResultType.Stay;
+ case PeerPullPaymentCreditStatus.SuspendedBalanceKycInit:
+ rec.status = PeerPullPaymentCreditStatus.PendingBalanceKycInit;
+ return TransitionResultType.Transition;
+ case PeerPullPaymentCreditStatus.SuspendedCreatePurse:
+ rec.status = PeerPullPaymentCreditStatus.PendingCreatePurse;
+ return TransitionResultType.Transition;
+ case PeerPullPaymentCreditStatus.SuspendedMergeKycRequired:
+ rec.status = PeerPullPaymentCreditStatus.PendingMergeKycRequired;
+ return TransitionResultType.Transition;
+ case PeerPullPaymentCreditStatus.SuspendedReady:
+ rec.status = PeerPullPaymentCreditStatus.PendingReady;
+ return TransitionResultType.Transition;
+ case PeerPullPaymentCreditStatus.SuspendedWithdrawing:
+ rec.status = PeerPullPaymentCreditStatus.PendingWithdrawing;
+ return TransitionResultType.Transition;
+ case PeerPullPaymentCreditStatus.SuspendedAbortingDeletePurse:
+ rec.status = PeerPullPaymentCreditStatus.AbortingDeletePurse;
+ return TransitionResultType.Transition;
+ case PeerPullPaymentCreditStatus.SuspendedBalanceKycRequired:
+ rec.status = PeerPullPaymentCreditStatus.PendingBalanceKycRequired;
+ return TransitionResultType.Transition;
+ default:
+ assertUnreachable(rec.status);
+ }
+ });
+ this.wex.taskScheduler.startShepherdTask(this.taskId);
}
async abortTransaction(reason?: TalerErrorDetail): Promise<void> {
- const { wex, pursePub, taskId: retryTag, transactionId } = this;
- const transitionInfo = await wex.db.runReadWriteTx(
- { storeNames: ["peerPullCredit", "transactionsMeta"] },
- async (tx) => {
- const pullCreditRec = await tx.peerPullCredit.get(pursePub);
- if (!pullCreditRec) {
- logger.warn(`peer pull credit ${pursePub} not found`);
- return;
- }
- let newStatus: PeerPullPaymentCreditStatus | undefined = undefined;
- switch (pullCreditRec.status) {
- case PeerPullPaymentCreditStatus.PendingBalanceKycRequired:
- case PeerPullPaymentCreditStatus.SuspendedBalanceKycRequired:
- case PeerPullPaymentCreditStatus.PendingBalanceKycInit:
- case PeerPullPaymentCreditStatus.SuspendedBalanceKycInit:
- case PeerPullPaymentCreditStatus.PendingCreatePurse:
- case PeerPullPaymentCreditStatus.PendingMergeKycRequired:
- newStatus = PeerPullPaymentCreditStatus.AbortingDeletePurse;
- pullCreditRec.abortReason = reason;
- break;
- case PeerPullPaymentCreditStatus.PendingWithdrawing:
- throw Error("can't abort anymore");
- case PeerPullPaymentCreditStatus.PendingReady:
- pullCreditRec.abortReason = reason;
- newStatus = PeerPullPaymentCreditStatus.AbortingDeletePurse;
- break;
- case PeerPullPaymentCreditStatus.Done:
- case PeerPullPaymentCreditStatus.SuspendedCreatePurse:
- case PeerPullPaymentCreditStatus.SuspendedMergeKycRequired:
- case PeerPullPaymentCreditStatus.SuspendedReady:
- case PeerPullPaymentCreditStatus.SuspendedWithdrawing:
- case PeerPullPaymentCreditStatus.Aborted:
- case PeerPullPaymentCreditStatus.AbortingDeletePurse:
- case PeerPullPaymentCreditStatus.Failed:
- case PeerPullPaymentCreditStatus.Expired:
- case PeerPullPaymentCreditStatus.SuspendedAbortingDeletePurse:
- break;
- default:
- assertUnreachable(pullCreditRec.status);
- }
- if (newStatus != null) {
- const oldTxState =
- computePeerPullCreditTransactionState(pullCreditRec);
- pullCreditRec.status = newStatus;
- const newTxState =
- computePeerPullCreditTransactionState(pullCreditRec);
- await tx.peerPullCredit.put(pullCreditRec);
- await this.updateTransactionMeta(tx);
- return {
- oldTxState,
- newTxState,
- balanceEffect: BalanceEffect.Any,
- };
- }
- return undefined;
- },
- );
- wex.taskScheduler.stopShepherdTask(retryTag);
- notifyTransition(wex, transactionId, transitionInfo);
- wex.taskScheduler.startShepherdTask(retryTag);
+ await this.transition({}, async (rec) => {
+ switch (rec.status) {
+ case PeerPullPaymentCreditStatus.PendingBalanceKycRequired:
+ case PeerPullPaymentCreditStatus.SuspendedBalanceKycRequired:
+ case PeerPullPaymentCreditStatus.PendingBalanceKycInit:
+ case PeerPullPaymentCreditStatus.SuspendedBalanceKycInit:
+ case PeerPullPaymentCreditStatus.PendingCreatePurse:
+ case PeerPullPaymentCreditStatus.PendingMergeKycRequired:
+ rec.status = PeerPullPaymentCreditStatus.AbortingDeletePurse;
+ rec.abortReason = reason;
+ return TransitionResultType.Transition;
+ case PeerPullPaymentCreditStatus.PendingWithdrawing:
+ throw Error("can't abort anymore");
+ case PeerPullPaymentCreditStatus.PendingReady:
+ rec.abortReason = reason;
+ rec.status = PeerPullPaymentCreditStatus.AbortingDeletePurse;
+ return TransitionResultType.Transition;
+ case PeerPullPaymentCreditStatus.Done:
+ case PeerPullPaymentCreditStatus.SuspendedCreatePurse:
+ case PeerPullPaymentCreditStatus.SuspendedMergeKycRequired:
+ case PeerPullPaymentCreditStatus.SuspendedReady:
+ case PeerPullPaymentCreditStatus.SuspendedWithdrawing:
+ case PeerPullPaymentCreditStatus.Aborted:
+ case PeerPullPaymentCreditStatus.AbortingDeletePurse:
+ case PeerPullPaymentCreditStatus.Failed:
+ case PeerPullPaymentCreditStatus.Expired:
+ case PeerPullPaymentCreditStatus.SuspendedAbortingDeletePurse:
+ return TransitionResultType.Stay
+ default:
+ assertUnreachable(rec.status);
+ }
+ });
+ this.wex.taskScheduler.stopShepherdTask(this.taskId);
+ this.wex.taskScheduler.startShepherdTask(this.taskId);
}
}
@@ -719,25 +615,7 @@ async function queryPurseForPeerPullCredit(
switch (resp.status) {
case HttpStatusCode.Gone: {
// Exchange says that purse doesn't exist anymore => expired!
- const transitionInfo = await wex.db.runReadWriteTx(
- { storeNames: ["peerPullCredit", "transactionsMeta"] },
- async (tx) => {
- const finPi = await tx.peerPullCredit.get(pullIni.pursePub);
- if (!finPi) {
- logger.warn("peerPullCredit not found anymore");
- return;
- }
- const oldTxState = computePeerPullCreditTransactionState(finPi);
- if (finPi.status === PeerPullPaymentCreditStatus.PendingReady) {
- finPi.status = PeerPullPaymentCreditStatus.Expired;
- }
- await tx.peerPullCredit.put(finPi);
- await ctx.updateTransactionMeta(tx);
- const newTxState = computePeerPullCreditTransactionState(finPi);
- return { oldTxState, newTxState, balanceEffect: BalanceEffect.Any };
- },
- );
- notifyTransition(wex, ctx.transactionId, transitionInfo);
+ await ctx.transitionStatus(PeerPullPaymentCreditStatus.PendingReady, PeerPullPaymentCreditStatus.Expired);
return TaskRunResult.backoff();
}
case HttpStatusCode.NotFound:
@@ -784,25 +662,7 @@ async function queryPurseForPeerPullCredit(
pub: reserve.reservePub,
},
});
- const transitionInfo = await wex.db.runReadWriteTx(
- { storeNames: ["peerPullCredit", "transactionsMeta"] },
- async (tx) => {
- const finPi = await tx.peerPullCredit.get(pullIni.pursePub);
- if (!finPi) {
- logger.warn("peerPullCredit not found anymore");
- return;
- }
- const oldTxState = computePeerPullCreditTransactionState(finPi);
- if (finPi.status === PeerPullPaymentCreditStatus.PendingReady) {
- finPi.status = PeerPullPaymentCreditStatus.PendingWithdrawing;
- }
- await tx.peerPullCredit.put(finPi);
- await ctx.updateTransactionMeta(tx);
- const newTxState = computePeerPullCreditTransactionState(finPi);
- return { oldTxState, newTxState, balanceEffect: BalanceEffect.Any };
- },
- );
- notifyTransition(wex, ctx.transactionId, transitionInfo);
+ await ctx.transitionStatus(PeerPullPaymentCreditStatus.PendingReady, PeerPullPaymentCreditStatus.PendingWithdrawing);
return TaskRunResult.backoff();
}
@@ -845,27 +705,7 @@ async function longpollKycStatus(
// remove after the exchange is fixed or clarified
kycStatusRes.status === HttpStatusCode.NoContent
) {
- const transitionInfo = await wex.db.runReadWriteTx(
- { storeNames: ["peerPullCredit", "transactionsMeta"] },
- async (tx) => {
- const peerIni = await tx.peerPullCredit.get(pursePub);
- if (!peerIni) {
- return;
- }
- if (
- peerIni.status !== PeerPullPaymentCreditStatus.PendingMergeKycRequired
- ) {
- return;
- }
- const oldTxState = computePeerPullCreditTransactionState(peerIni);
- peerIni.status = PeerPullPaymentCreditStatus.PendingCreatePurse;
- const newTxState = computePeerPullCreditTransactionState(peerIni);
- await tx.peerPullCredit.put(peerIni);
- await ctx.updateTransactionMeta(tx);
- return { oldTxState, newTxState, balanceEffect: BalanceEffect.Any };
- },
- );
- notifyTransition(wex, ctx.transactionId, transitionInfo);
+ await ctx.transitionStatus(PeerPullPaymentCreditStatus.PendingMergeKycRequired, PeerPullPaymentCreditStatus.PendingCreatePurse);
return TaskRunResult.progress();
} else if (kycStatusRes.status === HttpStatusCode.Accepted) {
return TaskRunResult.longpollReturnedPending();
@@ -894,38 +734,7 @@ async function processPeerPullCreditAbortingDeletePurse(
});
logger.info(`deleted purse with response status ${resp.status}`);
- const transitionInfo = await wex.db.runReadWriteTx(
- {
- storeNames: [
- "peerPullCredit",
- "refreshGroups",
- "denominations",
- "coinAvailability",
- "coins",
- "transactionsMeta",
- ],
- },
- async (tx) => {
- const ppiRec = await tx.peerPullCredit.get(pursePub);
- if (!ppiRec) {
- return undefined;
- }
- if (ppiRec.status !== PeerPullPaymentCreditStatus.AbortingDeletePurse) {
- return undefined;
- }
- const oldTxState = computePeerPullCreditTransactionState(ppiRec);
- ppiRec.status = PeerPullPaymentCreditStatus.Aborted;
- await tx.peerPullCredit.put(ppiRec);
- await ctx.updateTransactionMeta(tx);
- const newTxState = computePeerPullCreditTransactionState(ppiRec);
- return {
- oldTxState,
- newTxState,
- balanceEffect: BalanceEffect.Any,
- };
- },
- );
- notifyTransition(wex, ctx.transactionId, transitionInfo);
+ await ctx.transitionStatus(PeerPullPaymentCreditStatus.AbortingDeletePurse, PeerPullPaymentCreditStatus.Aborted);
return TaskRunResult.backoff();
}
@@ -940,44 +749,26 @@ async function handlePeerPullCreditWithdrawing(
await waitWithdrawalFinal(wex, pullIni.withdrawalGroupId);
const ctx = new PeerPullCreditTransactionContext(wex, pullIni.pursePub);
const wgId = pullIni.withdrawalGroupId;
- let finished: boolean = false;
- const transitionInfo = await wex.db.runReadWriteTx(
- { storeNames: ["peerPullCredit", "withdrawalGroups", "transactionsMeta"] },
- async (tx) => {
- const ppi = await tx.peerPullCredit.get(pullIni.pursePub);
- if (!ppi) {
- finished = true;
- return;
- }
- if (ppi.status !== PeerPullPaymentCreditStatus.PendingWithdrawing) {
- finished = true;
- return;
- }
- const oldTxState = computePeerPullCreditTransactionState(ppi);
- const wg = await tx.withdrawalGroups.get(wgId);
- if (!wg) {
- // FIXME: Fail the operation instead?
- return undefined;
- }
- switch (wg.status) {
- case WithdrawalGroupStatus.Done:
- finished = true;
- ppi.status = PeerPullPaymentCreditStatus.Done;
- break;
- // FIXME: Also handle other final states!
- }
- await tx.peerPullCredit.put(ppi);
- await ctx.updateTransactionMeta(tx);
- const newTxState = computePeerPullCreditTransactionState(ppi);
- return {
- oldTxState,
- newTxState,
- balanceEffect: BalanceEffect.Any,
- };
- },
- );
- notifyTransition(wex, ctx.transactionId, transitionInfo);
- if (finished) {
+ const info = await ctx.transition({
+ extraStores: ["withdrawalGroups"]
+ }, async (rec, tx) => {
+ if (rec.status !== PeerPullPaymentCreditStatus.PendingWithdrawing) {
+ return TransitionResultType.Stay
+ }
+ const wg = await tx.withdrawalGroups.get(wgId);
+ if (!wg) {
+ // FIXME: Fail the operation instead?
+ return TransitionResultType.Stay
+ }
+ switch (wg.status) {
+ case WithdrawalGroupStatus.Done:
+ rec.status = PeerPullPaymentCreditStatus.Done;
+ break;
+ // FIXME: Also handle other final states!
+ }
+ return TransitionResultType.Transition
+ })
+ if (info?.newTxState.major != TransactionMajorState.Pending) {
return TaskRunResult.finished();
} else {
// FIXME: Return indicator that we depend on the other operation!
@@ -1003,39 +794,24 @@ async function handlePeerPullCreditCreatePurse(
amount: kycCheckRes.nextThreshold,
exchangeBaseUrl: pullIni.exchangeBaseUrl,
});
- await ctx.transition({}, async (rec) => {
- if (!rec) {
- return TransitionResult.stay();
- }
- if (rec.status !== PeerPullPaymentCreditStatus.PendingCreatePurse) {
- return TransitionResult.stay();
- }
- rec.status = PeerPullPaymentCreditStatus.PendingBalanceKycInit;
- return TransitionResult.transition(rec);
- });
+ await ctx.transitionStatus(PeerPullPaymentCreditStatus.PendingCreatePurse, PeerPullPaymentCreditStatus.PendingBalanceKycInit);
return TaskRunResult.progress();
}
const purseFee = Amounts.stringify(Amounts.zeroOfAmount(pullIni.amount));
- const pursePub = pullIni.pursePub;
+
const mergeReserve = await wex.db.runReadOnlyTx(
{ storeNames: ["reserves"] },
- async (tx) => {
- return tx.reserves.get(pullIni.mergeReserveRowId);
- },
+ async (tx) => tx.reserves.get(pullIni.mergeReserveRowId)
);
-
if (!mergeReserve) {
throw Error("merge reserve for peer pull payment not found in database");
}
const contractTermsRecord = await wex.db.runReadOnlyTx(
{ storeNames: ["contractTerms"] },
- async (tx) => {
- return tx.contractTerms.get(pullIni.contractTermsHash);
- },
+ async (tx) => tx.contractTerms.get(pullIni.contractTermsHash)
);
-
if (!contractTermsRecord) {
throw Error("contract terms for peer pull payment not found in database");
}
@@ -1112,22 +888,10 @@ async function handlePeerPullCreditCreatePurse(
logger.info(`reserve merge response: ${j2s(resp)}`);
- const transitionInfo = await wex.db.runReadWriteTx(
- { storeNames: ["peerPullCredit", "transactionsMeta"] },
- async (tx) => {
- const pi2 = await tx.peerPullCredit.get(pursePub);
- if (!pi2) {
- return;
- }
- const oldTxState = computePeerPullCreditTransactionState(pi2);
- pi2.status = PeerPullPaymentCreditStatus.PendingReady;
- await tx.peerPullCredit.put(pi2);
- await ctx.updateTransactionMeta(tx);
- const newTxState = computePeerPullCreditTransactionState(pi2);
- return { oldTxState, newTxState, balanceEffect: BalanceEffect.Any };
- },
- );
- notifyTransition(wex, ctx.transactionId, transitionInfo);
+ await ctx.transition({}, async (rec, _) => {
+ rec.status = PeerPullPaymentCreditStatus.PendingReady
+ return TransitionResultType.Transition
+ })
return TaskRunResult.backoff();
}
@@ -1141,9 +905,7 @@ export async function processPeerPullCredit(
const pullIni = await wex.db.runReadOnlyTx(
{ storeNames: ["peerPullCredit"] },
- async (tx) => {
- return tx.peerPullCredit.get(pursePub);
- },
+ async (tx) => tx.peerPullCredit.get(pursePub),
);
if (!pullIni) {
throw Error("peer pull payment initiation not found in database");
@@ -1246,33 +1008,19 @@ async function processPeerPullCreditBalanceKyc(
});
if (ret.result === "ok") {
- await ctx.transition({}, async (rec) => {
- if (!rec) {
- return TransitionResult.stay();
- }
- if (
- rec.status !== PeerPullPaymentCreditStatus.PendingBalanceKycRequired
- ) {
- return TransitionResult.stay();
- }
- rec.status = PeerPullPaymentCreditStatus.PendingCreatePurse;
- return TransitionResult.transition(rec);
- });
+ await ctx.transitionStatus(PeerPullPaymentCreditStatus.PendingBalanceKycRequired, PeerPullPaymentCreditStatus.PendingCreatePurse);
return TaskRunResult.progress();
} else if (
peerInc.status === PeerPullPaymentCreditStatus.PendingBalanceKycInit &&
ret.walletKycStatus === ExchangeWalletKycStatus.Legi
) {
await ctx.transition({}, async (rec) => {
- if (!rec) {
- return TransitionResult.stay();
- }
if (rec.status !== PeerPullPaymentCreditStatus.PendingBalanceKycInit) {
- return TransitionResult.stay();
+ return TransitionResultType.Stay
}
rec.status = PeerPullPaymentCreditStatus.PendingBalanceKycRequired;
rec.kycAccessToken = ret.walletKycAccessToken;
- return TransitionResult.transition(rec);
+ return TransitionResultType.Transition
});
return TaskRunResult.progress();
} else {
@@ -1286,7 +1034,6 @@ async function processPeerPullCreditKycRequired(
kycPayoHash: string,
): Promise<TaskRunResult> {
const ctx = new PeerPullCreditTransactionContext(wex, peerIni.pursePub);
- const { pursePub } = peerIni;
// FIXME: What if this changes? Should be part of the p2p record
const mergeReserveInfo = await getMergeReserveInfo(wex, {
@@ -1321,38 +1068,20 @@ async function processPeerPullCreditKycRequired(
codecForAccountKycStatus(),
);
logger.info(`kyc status: ${j2s(kycStatus)}`);
- const { transitionInfo, result } = await wex.db.runReadWriteTx(
- { storeNames: ["peerPullCredit", "transactionsMeta"] },
- async (tx) => {
- const peerInc = await tx.peerPullCredit.get(pursePub);
- if (!peerInc) {
- return {
- transitionInfo: undefined,
- result: TaskRunResult.finished(),
- };
- }
- const oldTxState = computePeerPullCreditTransactionState(peerInc);
- peerInc.kycPaytoHash = kycPayoHash;
- logger.info(
- `setting peer-pull-credit kyc payto hash to ${kycPayoHash}`,
- );
- peerInc.kycAccessToken = kycStatus.access_token;
- peerInc.status = PeerPullPaymentCreditStatus.PendingMergeKycRequired;
- const newTxState = computePeerPullCreditTransactionState(peerInc);
- await tx.peerPullCredit.put(peerInc);
- await ctx.updateTransactionMeta(tx);
- return {
- transitionInfo: {
- oldTxState,
- newTxState,
- balanceEffect: BalanceEffect.Any,
- },
- result: TaskRunResult.progress(),
- };
- },
- );
- notifyTransition(wex, ctx.transactionId, transitionInfo);
- return result;
+ const info = await ctx.transition({}, async (rec) => {
+ rec.kycPaytoHash = kycPayoHash;
+ logger.info(
+ `setting peer-pull-credit kyc payto hash to ${kycPayoHash}`,
+ );
+ rec.kycAccessToken = kycStatus.access_token;
+ rec.status = PeerPullPaymentCreditStatus.PendingMergeKycRequired;
+ return TransitionResultType.Transition
+ })
+ if (info == null) {
+ return TaskRunResult.finished()
+ } else {
+ return TaskRunResult.progress()
+ }
} else {
throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`);
}
@@ -1513,7 +1242,6 @@ export async function initiatePeerPullPayment(
const mergeTimestamp = TalerPreciseTimestamp.now();
const ctx = new PeerPullCreditTransactionContext(wex, pursePair.pub);
-
const transitionInfo = await wex.db.runReadWriteTx(
{ storeNames: ["peerPullCredit", "contractTerms", "transactionsMeta"] },
async (tx) => {
@@ -1547,7 +1275,6 @@ export async function initiatePeerPullPayment(
return { oldTxState, newTxState, balanceEffect: BalanceEffect.Any };
},
);
-
notifyTransition(wex, ctx.transactionId, transitionInfo);
wex.taskScheduler.startShepherdTask(ctx.taskId);