commit 5af24b3d192b8c84caab02a0bcec9108d828b0e3
parent bb88585c7cc80907acd4c811c049698b7c60be5f
Author: Antoine A <>
Date: Thu, 24 Apr 2025 13:07:48 +0200
wallet-core: use common transition logic for all P2P transactions
Diffstat:
7 files changed, 404 insertions(+), 738 deletions(-)
diff --git a/packages/taler-wallet-core/src/db.ts b/packages/taler-wallet-core/src/db.ts
@@ -3255,7 +3255,8 @@ export const WalletStoresV1 = {
),
};
-export type WalletDbStoresArr = Array<StoreNames<typeof WalletStoresV1>>;
+export type WalletDbStoresName = StoreNames<typeof WalletStoresV1>;
+export type WalletDbStoresArr = Array<WalletDbStoresName>;
export type WalletDbReadWriteTransaction<StoresArr extends WalletDbStoresArr> =
DbReadWriteTransaction<typeof WalletStoresV1, StoresArr>;
diff --git a/packages/taler-wallet-core/src/pay-peer-common.ts b/packages/taler-wallet-core/src/pay-peer-common.ts
@@ -19,18 +19,31 @@ import {
AmountJson,
Amounts,
ExchangePurseStatus,
+ NotificationType,
SelectedProspectiveCoin,
TalerProtocolTimestamp,
+ TransactionIdStr,
+ TransactionMajorState,
+ TransactionState,
+ WalletNotification,
checkDbInvariant,
} from "@gnu-taler/taler-util";
import { SpendCoinDetails } from "./crypto/cryptoImplementation.js";
import {
DbPeerPushPaymentCoinSelection,
ReserveRecord,
+ TransactionMetaRecord,
WalletDbReadOnlyTransaction,
+ WalletDbReadWriteTransaction,
+ WalletDbStoresArr,
+ WalletDbStoresName,
+ WalletStoresV1,
} from "./db.js";
import { getTotalRefreshCost } from "./refresh.js";
import { WalletExecutionContext, getDenomInfo } from "./wallet.js";
+import { BalanceEffect, notifyTransition, TransitionInfo } from "./transactions.js";
+import { TransitionResultType } from "./common.js";
+import { IDBValidKey } from "@gnu-taler/idb-bridge";
/**
* Get information about the coin selected for signatures.
@@ -171,3 +184,151 @@ export function isPurseDeposited(purse: ExchangePurseStatus): boolean {
return depositTimestamp != null &&
!TalerProtocolTimestamp.isNever(depositTimestamp)
}
+
+/** Extract the stored type of a DB store */
+type StoreType<Store extends WalletDbStoresName> = (typeof WalletStoresV1)[Store]['store']['_dummy'];
+
+interface RecordCtx<Store extends WalletDbStoresName> {
+ store: Store,
+ transactionId: TransactionIdStr;
+ recordId: IDBValidKey;
+ wex: WalletExecutionContext;
+ recordMeta: ((rec: StoreType<Store>) => TransactionMetaRecord);
+ recordState: ((rec: StoreType<Store>) => TransactionState),
+}
+
+/** Create a new record, update its metadata and notify its creation */
+export async function recordCreate<Store extends WalletDbStoresName, ExtraStores extends WalletDbStoresArr = []>(
+ ctx: RecordCtx<Store>,
+ opts: { extraStores?: ExtraStores, label?: string },
+ lambda: ((tx: WalletDbReadWriteTransaction<[Store, "transactionsMeta", ...ExtraStores]>) => Promise<StoreType<Store>>)
+) {
+ const baseStore = [ctx.store, "transactionsMeta" as const]
+ const storeNames = opts.extraStores
+ ? [...baseStore, ...opts.extraStores]
+ : baseStore;
+ const transitionInfo = await ctx.wex.db.runReadWriteTx(
+ { storeNames, label: opts.label },
+ async (tx) => {
+ const oldTxState: TransactionState = {
+ major: TransactionMajorState.None,
+ };
+ const rec = await lambda(tx);
+ // FIXME: DbReadWriteTransaction conditional type confuse Typescript, we should simplify invariable
+ await (tx[ctx.store] as any).add(rec);
+ await tx.transactionsMeta.put(ctx.recordMeta(rec))
+ const newTxState = ctx.recordState(rec);
+ return { oldTxState, newTxState, balanceEffect: BalanceEffect.Any };
+ },
+ );
+ notifyTransition(ctx.wex, ctx.transactionId, transitionInfo);
+}
+
+/**
+ * Optionally update an existing record, ignore if missing.
+ * If a transition occurs, update its metadata and notify.
+ **/
+export async function recordTransition<Store extends WalletDbStoresName, ExtraStores extends WalletDbStoresArr = []>(
+ ctx: RecordCtx<Store>,
+ opts: { extraStores?: ExtraStores, label?: string },
+ lambda: ((rec: StoreType<Store>, tx: WalletDbReadWriteTransaction<[Store, "transactionsMeta", ...ExtraStores]>) => Promise<TransitionResultType.Stay | TransitionResultType.Transition>)
+): Promise<TransitionInfo | undefined> {
+ const baseStore = [ctx.store, "transactionsMeta" as const]
+ const storeNames = opts.extraStores
+ ? [...baseStore, ...opts.extraStores]
+ : baseStore;
+ const transitionInfo = await ctx.wex.db.runReadWriteTx(
+ { storeNames, label: opts.label },
+ async (tx) => {
+ const rec = await tx[ctx.store].get(ctx.recordId);
+ if (rec == null) {
+ // FIXME warn
+ return;
+ }
+ const oldTxState = ctx.recordState(rec);
+ const res = await lambda(rec, tx);
+ switch (res) {
+ case TransitionResultType.Transition: {
+ await tx[ctx.store].put(rec);
+ await tx.transactionsMeta.put(ctx.recordMeta(rec))
+ const newTxState = ctx.recordState(rec);
+ return {
+ oldTxState,
+ newTxState,
+ balanceEffect: BalanceEffect.Any,
+ };
+ }
+ case TransitionResultType.Stay:
+ return;
+ }
+ },
+ );
+ notifyTransition(ctx.wex, ctx.transactionId, transitionInfo);
+ return transitionInfo;
+}
+
+/** Extract the stored type status if any */
+type StoreTypeStatus<Store extends WalletDbStoresName> = StoreType<Store> extends { status: infer Status } ? Status : never;
+
+/**
+ * Optionally update an existing record status from a state to another, ignore if missing.
+ * If a transition occurs, update its metadata and notify.
+ **/
+export async function recordTransitionStatus<Store extends WalletDbStoresName>(
+ ctx: RecordCtx<Store>,
+ from: StoreTypeStatus<Store>,
+ to: StoreTypeStatus<Store>
+): Promise<TransitionInfo | undefined> {
+ return recordTransition(ctx, {}, async (rec, _) => {
+ const it = rec as { status: StoreTypeStatus<Store> };
+ if (it.status !== from) {
+ return TransitionResultType.Stay
+ } else {
+ it.status = to;
+ return TransitionResultType.Transition
+ }
+ })
+}
+
+/**
+ * Optionally delete a record, update its metadata and notify.
+ **/
+export async function recordDelete<Store extends WalletDbStoresName>(
+ ctx: RecordCtx<Store>,
+ tx: WalletDbReadWriteTransaction<[Store, "transactionsMeta"]>,
+ lambda: ((rec: StoreType<Store>, notifs: WalletNotification[]) => Promise<void>) = async () => { }
+): Promise<{ notifs: WalletNotification[] }> {
+ const notifs: WalletNotification[] = [];
+ const rec = tx[ctx.store].get(ctx.recordId);
+ if (rec == null) {
+ return { notifs };
+ }
+ const oldTxState = ctx.recordState(rec);
+ await lambda(rec, notifs)
+ await tx[ctx.store].delete(ctx.recordId);
+ await tx.transactionsMeta.delete(ctx.transactionId);
+ notifs.push({
+ type: NotificationType.TransactionStateTransition,
+ transactionId: ctx.transactionId,
+ oldTxState,
+ newTxState: {
+ major: TransactionMajorState.Deleted,
+ },
+ });
+ return { notifs };
+}
+
+/**
+ * Update record stored transaction metadata
+ **/
+export async function recordUpdateMeta<Store extends WalletDbStoresName>(
+ ctx: RecordCtx<Store>,
+ tx: WalletDbReadWriteTransaction<[Store, "transactionsMeta"]>
+): Promise<void> {
+ const rec = await tx[ctx.store].get(ctx.recordId);
+ if (rec == null) {
+ await tx.transactionsMeta.delete(ctx.transactionId);
+ } else {
+ await tx.transactionsMeta.put(ctx.recordMeta(rec));
+ }
+}
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
@@ -72,7 +72,6 @@ import {
PeerPullPaymentCreditStatus,
WalletDbAllStoresReadOnlyTransaction,
WalletDbReadWriteTransaction,
- WalletDbStoresArr,
WithdrawalGroupRecord,
WithdrawalGroupStatus,
WithdrawalRecordType,
@@ -89,15 +88,17 @@ import {
} from "./exchanges.js";
import { checkPeerCreditHardLimitExceeded } from "./kyc.js";
import {
- getMergeReserveInfo,
isPurseDeposited,
+ recordCreate,
+ getMergeReserveInfo,
+ recordTransitionStatus,
+ recordTransition,
+ recordUpdateMeta,
+ recordDelete,
} from "./pay-peer-common.js";
import {
- BalanceEffect,
- TransitionInfo,
constructTransactionIdentifier,
isUnsuccessfulTransaction,
- notifyTransition,
} from "./transactions.js";
import { WalletExecutionContext } from "./wallet.js";
import {
@@ -127,115 +128,17 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
});
}
- /**
- * Transition an existing peer-pull-credit transaction.
- * Extra object stores may be accessed during the transition.
- */
- async transition<StoreNameArray extends WalletDbStoresArr>(
- opts: { extraStores?: StoreNameArray },
- lambda: (
- rec: PeerPullCreditRecord,
- tx: WalletDbReadWriteTransaction<
- [
- "peerPullCredit",
- "transactionsMeta",
- ...StoreNameArray,
- ]
- >,
- ) => Promise<TransitionResultType>,
- ): Promise<TransitionInfo | undefined> {
- const baseStores = [
- "peerPullCredit" as const,
- "transactionsMeta" as const
- ];
- const stores = opts.extraStores
- ? [...baseStores, ...opts.extraStores]
- : baseStores;
-
- let errorThrown: Error | undefined;
- const transitionInfo = await this.wex.db.runReadWriteTx(
- { storeNames: stores },
- async (tx) => {
- const rec = await tx.peerPullCredit.get(this.pursePub);
- if (rec == null) {
- logger.warn(`peer pull credit ${this.pursePub} not found`);
- return;
- }
- let oldTxState = computePeerPullCreditTransactionState(rec);
- let res: TransitionResultType;
- try {
- res = await lambda(rec, tx);
- } catch (error) {
- if (error instanceof Error) {
- errorThrown = error;
- }
- return undefined;
- }
-
- switch (res) {
- case TransitionResultType.Transition: {
- await tx.peerPullCredit.put(rec)
- await tx.transactionsMeta.put({
- transactionId: this.transactionId,
- status: rec.status,
- timestamp: rec.mergeTimestamp,
- currency: Amounts.currencyOf(rec.amount),
- exchanges: [rec.exchangeBaseUrl],
- })
- const newTxState = computePeerPullCreditTransactionState(rec);
- return {
- oldTxState,
- newTxState,
- balanceEffect: BalanceEffect.Any,
- };
- }
- case TransitionResultType.Delete:
- throw new Error("Cannot delete using transition");
- case TransitionResultType.Stay:
- return;
- }
- },
- );
- if (errorThrown != null) {
- throw errorThrown;
- }
- notifyTransition(this.wex, this.transactionId, transitionInfo);
- return transitionInfo;
- }
-
- /**
- * Transition an existing peer-pull-credit transaction status
- */
- async transitionStatus(
- from: PeerPullPaymentCreditStatus,
- to: PeerPullPaymentCreditStatus
- ) {
- await this.transition({}, async (rec) => {
- if (rec.status !== from) {
- return TransitionResultType.Stay
- } else {
- rec.status = to;
- return TransitionResultType.Transition
- }
- });
- }
-
- async updateTransactionMeta(
- tx: WalletDbReadWriteTransaction<["peerPullCredit", "transactionsMeta"]>,
- ): Promise<void> {
- const rec = await tx.peerPullCredit.get(this.pursePub);
- if (rec == null) {
- await tx.transactionsMeta.delete(this.transactionId);
- } else {
- await tx.transactionsMeta.put({
- transactionId: this.transactionId,
- status: rec.status,
- timestamp: rec.mergeTimestamp,
- currency: Amounts.currencyOf(rec.amount),
- exchanges: [rec.exchangeBaseUrl],
- });
- }
- }
+ readonly store = "peerPullCredit";
+ readonly recordId = this.pursePub
+ readonly recordState = computePeerPullCreditTransactionState
+ readonly recordMeta = (rec: PeerPullCreditRecord) => ({
+ transactionId: this.transactionId,
+ status: rec.status,
+ timestamp: rec.mergeTimestamp,
+ currency: Amounts.currencyOf(rec.amount),
+ exchanges: [rec.exchangeBaseUrl],
+ });
+ updateTransactionMeta = (tx: WalletDbReadWriteTransaction<["peerPullCredit", "transactionsMeta"]>) => recordUpdateMeta(this, tx)
async deleteTransactionInTx(
tx: WalletDbReadWriteTransaction<
@@ -248,32 +151,17 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
]
>,
): Promise<{ notifs: WalletNotification[] }> {
- const notifs: WalletNotification[] = [];
- const rec = await tx.peerPullCredit.get(this.pursePub);
- if (!rec) {
- return { notifs };
- }
- const oldTxState = computePeerPullCreditTransactionState(rec);
- if (rec.withdrawalGroupId) {
- const withdrawalGroupId = rec.withdrawalGroupId;
- const withdrawalCtx = new WithdrawTransactionContext(
- this.wex,
- withdrawalGroupId,
- );
- const res = await withdrawalCtx.deleteTransactionInTx(tx);
- notifs.push(...res.notifs);
- }
- await tx.peerPullCredit.delete(this.pursePub)
- await tx.transactionsMeta.delete(this.transactionId)
- notifs.push({
- type: NotificationType.TransactionStateTransition,
- transactionId: this.transactionId,
- oldTxState,
- newTxState: {
- major: TransactionMajorState.Deleted,
- },
- });
- return { notifs };
+ return recordDelete(this, tx, async (rec, notifs) => {
+ if (rec.withdrawalGroupId) {
+ const withdrawalGroupId = rec.withdrawalGroupId;
+ const withdrawalCtx = new WithdrawTransactionContext(
+ this.wex,
+ withdrawalGroupId,
+ );
+ const res = await withdrawalCtx.deleteTransactionInTx(tx);
+ notifs.push(...res.notifs);
+ }
+ })
}
/**
@@ -421,7 +309,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
async suspendTransaction(): Promise<void> {
- await this.transition({}, async (rec) => {
+ await recordTransition(this, {}, async (rec) => {
switch (rec.status) {
case PeerPullPaymentCreditStatus.PendingCreatePurse:
rec.status = PeerPullPaymentCreditStatus.SuspendedCreatePurse;
@@ -465,7 +353,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
async failTransaction(reason?: TalerErrorDetail): Promise<void> {
- await this.transition({}, async (rec) => {
+ await recordTransition(this, {}, async (rec) => {
switch (rec.status) {
case PeerPullPaymentCreditStatus.PendingCreatePurse:
case PeerPullPaymentCreditStatus.PendingMergeKycRequired:
@@ -497,7 +385,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
async resumeTransaction(): Promise<void> {
- await this.transition({}, async (rec) => {
+ await recordTransition(this, {}, async (rec) => {
switch (rec.status) {
case PeerPullPaymentCreditStatus.PendingCreatePurse:
case PeerPullPaymentCreditStatus.PendingMergeKycRequired:
@@ -540,7 +428,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
async abortTransaction(reason?: TalerErrorDetail): Promise<void> {
- await this.transition({}, async (rec) => {
+ await recordTransition(this, {}, async (rec) => {
switch (rec.status) {
case PeerPullPaymentCreditStatus.PendingBalanceKycRequired:
case PeerPullPaymentCreditStatus.SuspendedBalanceKycRequired:
@@ -593,7 +481,7 @@ async function queryPurseForPeerPullCredit(
break;
case HttpStatusCode.Gone:
// Exchange says that purse doesn't exist anymore => expired!
- await ctx.transitionStatus(PeerPullPaymentCreditStatus.PendingReady, PeerPullPaymentCreditStatus.Expired);
+ await recordTransitionStatus(ctx, PeerPullPaymentCreditStatus.PendingReady, PeerPullPaymentCreditStatus.Expired);
return TaskRunResult.finished();
case HttpStatusCode.NotFound:
// FIXME: Maybe check error code? 404 could also mean something else.
@@ -630,7 +518,7 @@ async function queryPurseForPeerPullCredit(
pub: reserve.reservePub,
},
});
- await ctx.transitionStatus(PeerPullPaymentCreditStatus.PendingReady, PeerPullPaymentCreditStatus.PendingWithdrawing);
+ await recordTransitionStatus(ctx, PeerPullPaymentCreditStatus.PendingReady, PeerPullPaymentCreditStatus.PendingWithdrawing);
return TaskRunResult.progress();
}
@@ -664,7 +552,7 @@ async function longpollKycStatus(
// remove after the exchange is fixed or clarified
kycStatusRes.status === HttpStatusCode.NoContent
) {
- await ctx.transitionStatus(PeerPullPaymentCreditStatus.PendingMergeKycRequired, PeerPullPaymentCreditStatus.PendingCreatePurse);
+ await recordTransitionStatus(ctx, PeerPullPaymentCreditStatus.PendingMergeKycRequired, PeerPullPaymentCreditStatus.PendingCreatePurse);
return TaskRunResult.progress();
} else if (kycStatusRes.status === HttpStatusCode.Accepted) {
return TaskRunResult.longpollReturnedPending();
@@ -692,7 +580,7 @@ async function processPeerPullCreditAbortingDeletePurse(
});
logger.info(`deleted purse with response status ${resp.status}`);
- await ctx.transitionStatus(PeerPullPaymentCreditStatus.AbortingDeletePurse, PeerPullPaymentCreditStatus.Aborted);
+ await recordTransitionStatus(ctx, PeerPullPaymentCreditStatus.AbortingDeletePurse, PeerPullPaymentCreditStatus.Aborted);
return TaskRunResult.backoff();
}
@@ -707,7 +595,7 @@ async function handlePeerPullCreditWithdrawing(
await waitWithdrawalFinal(wex, pullIni.withdrawalGroupId);
const ctx = new PeerPullCreditTransactionContext(wex, pullIni.pursePub);
const wgId = pullIni.withdrawalGroupId;
- const info = await ctx.transition({
+ const info = await recordTransition(ctx, {
extraStores: ["withdrawalGroups"]
}, async (rec, tx) => {
if (rec.status !== PeerPullPaymentCreditStatus.PendingWithdrawing) {
@@ -752,7 +640,7 @@ async function handlePeerPullCreditCreatePurse(
amount: kycCheckRes.nextThreshold,
exchangeBaseUrl: pullIni.exchangeBaseUrl,
});
- await ctx.transitionStatus(PeerPullPaymentCreditStatus.PendingCreatePurse, PeerPullPaymentCreditStatus.PendingBalanceKycInit);
+ await recordTransitionStatus(ctx, PeerPullPaymentCreditStatus.PendingCreatePurse, PeerPullPaymentCreditStatus.PendingBalanceKycInit);
return TaskRunResult.progress();
}
@@ -847,7 +735,7 @@ async function handlePeerPullCreditCreatePurse(
assertUnreachable(httpResp);
}
- await ctx.transition({}, async (rec, _) => {
+ await recordTransition(ctx, {}, async (rec, _) => {
rec.status = PeerPullPaymentCreditStatus.PendingReady
return TransitionResultType.Transition
})
@@ -954,13 +842,13 @@ async function processPeerPullCreditBalanceKyc(
});
if (ret.result === "ok") {
- await ctx.transitionStatus(PeerPullPaymentCreditStatus.PendingBalanceKycRequired, PeerPullPaymentCreditStatus.PendingCreatePurse);
+ await recordTransitionStatus(ctx, PeerPullPaymentCreditStatus.PendingBalanceKycRequired, PeerPullPaymentCreditStatus.PendingCreatePurse);
return TaskRunResult.progress();
} else if (
peerInc.status === PeerPullPaymentCreditStatus.PendingBalanceKycInit &&
ret.walletKycStatus === ExchangeWalletKycStatus.Legi
) {
- await ctx.transition({}, async (rec) => {
+ await recordTransition(ctx, {}, async (rec) => {
if (rec.status !== PeerPullPaymentCreditStatus.PendingBalanceKycInit) {
return TransitionResultType.Stay
}
@@ -1004,7 +892,7 @@ async function processPeerPullCreditKycRequired(
return TaskRunResult.finished();
case HttpStatusCode.Accepted: {
logger.info(`kyc status: ${j2s(res.body)}`);
- await ctx.transition({}, async (rec) => {
+ await recordTransition(ctx, {}, async (rec) => {
rec.kycPaytoHash = kycPayoHash;
logger.info(
`setting peer-pull-credit kyc payto hash to ${kycPayoHash}`,
@@ -1175,40 +1063,32 @@ export async function initiatePeerPullPayment(
const mergeTimestamp = TalerPreciseTimestamp.now();
const ctx = new PeerPullCreditTransactionContext(wex, pursePair.pub);
- const transitionInfo = await wex.db.runReadWriteTx(
- { storeNames: ["peerPullCredit", "contractTerms", "transactionsMeta"] },
- async (tx) => {
- const ppi: PeerPullCreditRecord = {
- amount: req.partialContractTerms.amount,
- contractTermsHash: hContractTerms,
- exchangeBaseUrl: exchangeBaseUrl,
- pursePriv: pursePair.priv,
- pursePub: pursePair.pub,
- mergePriv: mergePair.priv,
- mergePub: mergePair.pub,
- status: PeerPullPaymentCreditStatus.PendingCreatePurse,
- mergeTimestamp: timestampPreciseToDb(mergeTimestamp),
- contractEncNonce,
- mergeReserveRowId: mergeReserveRowId,
- contractPriv: contractKeyPair.priv,
- contractPub: contractKeyPair.pub,
- withdrawalGroupId,
- estimatedAmountEffective: wi.withdrawalAmountEffective,
- };
- await tx.peerPullCredit.put(ppi);
- await ctx.updateTransactionMeta(tx);
- const oldTxState: TransactionState = {
- major: TransactionMajorState.None,
- };
- const newTxState = computePeerPullCreditTransactionState(ppi);
- await tx.contractTerms.put({
- contractTermsRaw: contractTerms,
- h: hContractTerms,
- });
- return { oldTxState, newTxState, balanceEffect: BalanceEffect.Any };
- },
- );
- notifyTransition(wex, ctx.transactionId, transitionInfo);
+ await recordCreate(ctx, {
+ extraStores: ["contractTerms"],
+ label: "create-transaction-peer-pull-credit"
+ }, async (tx) => {
+ await tx.contractTerms.put({
+ contractTermsRaw: contractTerms,
+ h: hContractTerms,
+ });
+ return {
+ amount: req.partialContractTerms.amount,
+ contractTermsHash: hContractTerms,
+ exchangeBaseUrl: exchangeBaseUrl,
+ pursePriv: pursePair.priv,
+ pursePub: pursePair.pub,
+ mergePriv: mergePair.priv,
+ mergePub: mergePair.pub,
+ status: PeerPullPaymentCreditStatus.PendingCreatePurse,
+ mergeTimestamp: timestampPreciseToDb(mergeTimestamp),
+ contractEncNonce,
+ mergeReserveRowId: mergeReserveRowId,
+ contractPriv: contractKeyPair.priv,
+ contractPub: contractKeyPair.pub,
+ withdrawalGroupId,
+ estimatedAmountEffective: wi.withdrawalAmountEffective,
+ };
+ });
wex.taskScheduler.startShepherdTask(ctx.taskId);
return {
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
@@ -87,7 +87,6 @@ import {
RefreshOperationStatus,
WalletDbAllStoresReadOnlyTransaction,
WalletDbReadWriteTransaction,
- WalletDbStoresArr,
timestampPreciseFromDb,
timestampPreciseToDb,
} from "./db.js";
@@ -96,15 +95,17 @@ import {
getTotalPeerPaymentCost,
isPurseDeposited,
queryCoinInfosForSelection,
+ recordCreate,
+ recordDelete,
+ recordTransition,
+ recordTransitionStatus,
+ recordUpdateMeta,
} from "./pay-peer-common.js";
import { createRefreshGroup } from "./refresh.js";
import {
- BalanceEffect,
constructTransactionIdentifier,
isUnsuccessfulTransaction,
- notifyTransition,
parseTransactionIdentifier,
- TransitionInfo,
} from "./transactions.js";
import { WalletExecutionContext } from "./wallet.js";
@@ -131,22 +132,17 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
});
}
- async updateTransactionMeta(
- tx: WalletDbReadWriteTransaction<["peerPullDebit", "transactionsMeta"]>,
- ): Promise<void> {
- const ppdRec = await tx.peerPullDebit.get(this.peerPullDebitId);
- if (!ppdRec) {
- await tx.transactionsMeta.delete(this.transactionId);
- return;
- }
- await tx.transactionsMeta.put({
- transactionId: this.transactionId,
- status: ppdRec.status,
- timestamp: ppdRec.timestampCreated,
- currency: Amounts.currencyOf(ppdRec.amount),
- exchanges: [ppdRec.exchangeBaseUrl],
- });
- }
+ readonly store = "peerPullDebit";
+ readonly recordId = this.peerPullDebitId
+ readonly recordState = computePeerPullDebitTransactionState
+ readonly recordMeta = (rec: PeerPullPaymentIncomingRecord) => ({
+ transactionId: this.transactionId,
+ status: rec.status,
+ timestamp: rec.timestampCreated,
+ currency: Amounts.currencyOf(rec.amount),
+ exchanges: [rec.exchangeBaseUrl],
+ });
+ updateTransactionMeta = (tx: WalletDbReadWriteTransaction<["peerPullDebit", "transactionsMeta"]>) => recordUpdateMeta(this, tx)
/**
* Get the full transaction details for the transaction.
@@ -192,8 +188,8 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
async deleteTransaction(): Promise<void> {
const res = await this.wex.db.runReadWriteTx(
- { storeNames: ["peerPullDebit", "tombstones", "transactionsMeta"] },
- async (tx) => this.deleteTransactionInTx(tx)
+ { storeNames: ["peerPullDebit", "transactionsMeta"] },
+ this.deleteTransactionInTx
);
for (const notif of res.notifs) {
this.wex.ws.notify(notif);
@@ -202,30 +198,14 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
async deleteTransactionInTx(
tx: WalletDbReadWriteTransaction<
- ["peerPullDebit", "tombstones", "transactionsMeta"]
+ ["peerPullDebit", "transactionsMeta"]
>,
): Promise<{ notifs: WalletNotification[] }> {
- const notifs: WalletNotification[] = [];
- const rec = await tx.peerPullDebit.get(this.peerPullDebitId);
- if (!rec) {
- return { notifs };
- }
- const oldTxState = computePeerPullDebitTransactionState(rec);
- await tx.peerPullDebit.delete(rec.peerPullDebitId);
- await tx.transactionsMeta.delete(this.transactionId);
- notifs.push({
- type: NotificationType.TransactionStateTransition,
- transactionId: this.transactionId,
- oldTxState,
- newTxState: {
- major: TransactionMajorState.Deleted,
- },
- });
- return { notifs };
+ return recordDelete(this, tx)
}
async suspendTransaction(): Promise<void> {
- await this.transition({}, async (rec) => {
+ await recordTransition(this, {}, async (rec) => {
switch (rec.status) {
case PeerPullDebitRecordStatus.DialogProposed:
case PeerPullDebitRecordStatus.Done:
@@ -248,7 +228,7 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
}
async resumeTransaction(): Promise<void> {
- await this.transition({}, async (rec) => {
+ await recordTransition(this, {}, async (rec) => {
switch (rec.status) {
case PeerPullDebitRecordStatus.SuspendedDeposit:
rec.status = PeerPullDebitRecordStatus.PendingDeposit;
@@ -269,7 +249,7 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
}
async failTransaction(reason?: TalerErrorDetail): Promise<void> {
- await this.transition({}, async (rec) => {
+ await recordTransition(this, {}, async (rec) => {
switch (rec.status) {
case PeerPullDebitRecordStatus.SuspendedDeposit:
case PeerPullDebitRecordStatus.PendingDeposit:
@@ -287,16 +267,15 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
}
async abortTransaction(reason?: TalerErrorDetail): Promise<void> {
- await this.transition(
+ await recordTransition(this,
{
extraStores: [
"coinAvailability",
- "coinAvailability",
"coinHistory",
"coins",
"denominations",
"refreshGroups",
- "refreshSessions",
+ "refreshSessions"
],
},
async (pi, tx) => {
@@ -339,95 +318,6 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
this.wex.taskScheduler.stopShepherdTask(this.taskId);
this.wex.taskScheduler.startShepherdTask(this.taskId);
}
-
- /**
- * Transition an existing peer-pull-debit transaction.
- * Extra object stores may be accessed during the transition.
- */
- async transition<StoreNameArray extends WalletDbStoresArr>(
- opts: { extraStores?: StoreNameArray },
- lambda: (
- rec: PeerPullPaymentIncomingRecord,
- tx: WalletDbReadWriteTransaction<
- [
- "peerPullDebit",
- "transactionsMeta",
- ...StoreNameArray,
- ]
- >,
- ) => Promise<TransitionResultType>,
- ): Promise<TransitionInfo | undefined> {
- const baseStores = [
- "peerPullDebit" as const,
- "transactionsMeta" as const
- ];
- const stores = opts.extraStores
- ? [...baseStores, ...opts.extraStores]
- : baseStores;
-
- const transitionInfo = await this.wex.db.runReadWriteTx(
- { storeNames: stores },
- async (tx) => {
- const rec = await tx.peerPullDebit.get(this.peerPullDebitId);
- if (!rec) {
- logger.warn(`peer pull payment ${this.peerPullDebitId} not found`);
- return;
- }
- const oldTxState = computePeerPullDebitTransactionState(rec);
- const res = await lambda(rec, tx);
- switch (res) {
- case TransitionResultType.Transition: {
- await tx.peerPullDebit.put(rec);
- await tx.transactionsMeta.put({
- transactionId: this.transactionId,
- status: rec.status,
- timestamp: rec.timestampCreated,
- currency: Amounts.currencyOf(rec.amount),
- exchanges: [rec.exchangeBaseUrl],
- });
- const newTxState = computePeerPullDebitTransactionState(rec);
- return {
- oldTxState,
- newTxState,
- balanceEffect: BalanceEffect.Any,
- };
- }
- case TransitionResultType.Delete: {
- await tx.peerPullDebit.delete(this.peerPullDebitId);
- await tx.transactionsMeta.delete(this.transactionId);
- return {
- oldTxState,
- newTxState: {
- major: TransactionMajorState.None,
- },
- balanceEffect: BalanceEffect.Any,
- };
- }
- case TransitionResultType.Stay:
- return;
- }
- },
- );
- notifyTransition(this.wex, this.transactionId, transitionInfo);
- return transitionInfo;
- }
-
- /**
- * Transition an existing peer-pull-debit transaction status
- */
- async transitionStatus(
- from: PeerPullDebitRecordStatus,
- to: PeerPullDebitRecordStatus
- ) {
- await this.transition({}, async (rec) => {
- if (rec.status !== from) {
- return TransitionResultType.Stay
- } else {
- rec.status = to;
- return TransitionResultType.Transition
- }
- });
- }
}
async function handlePurseCreationConflict(
@@ -495,7 +385,7 @@ async function handlePurseCreationConflict(
coinSelRes.result.coins,
);
- await ctx.transition({}, async (rec) => {
+ await recordTransition(ctx, {}, async (rec) => {
switch (rec.status) {
case PeerPullDebitRecordStatus.PendingDeposit:
case PeerPullDebitRecordStatus.SuspendedDeposit: {
@@ -530,7 +420,7 @@ async function processPeerPullDebitDialogProposed(
break;
case HttpStatusCode.Gone:
// Exchange says that purse doesn't exist anymore => expired!
- await ctx.transitionStatus(PeerPullDebitRecordStatus.DialogProposed, PeerPullDebitRecordStatus.Aborted);
+ await recordTransitionStatus(ctx, PeerPullDebitRecordStatus.DialogProposed, PeerPullDebitRecordStatus.Aborted);
return TaskRunResult.finished();
case HttpStatusCode.NotFound:
// FIXME: Maybe check error code? 404 could also mean something else.
@@ -541,7 +431,7 @@ async function processPeerPullDebitDialogProposed(
if (isPurseDeposited(resp.body)) {
logger.info("purse completed by another wallet");
- await ctx.transitionStatus(PeerPullDebitRecordStatus.DialogProposed, PeerPullDebitRecordStatus.Aborted);
+ await recordTransitionStatus(ctx, PeerPullDebitRecordStatus.DialogProposed, PeerPullDebitRecordStatus.Aborted);
return TaskRunResult.finished();
}
@@ -590,7 +480,7 @@ async function processPeerPullDebitPendingDeposit(
const totalAmount = await getTotalPeerPaymentCost(wex, coins);
// FIXME: Missing notification here!
- const info = await ctx.transition(
+ const info = await recordTransition(ctx,
{
extraStores: [
"coinAvailability",
@@ -700,7 +590,7 @@ async function processPeerPullDebitPendingDeposit(
}
// All batches succeeded, we can transition!
- await ctx.transitionStatus(PeerPullDebitRecordStatus.PendingDeposit, PeerPullDebitRecordStatus.Done);
+ await recordTransitionStatus(ctx, PeerPullDebitRecordStatus.PendingDeposit, PeerPullDebitRecordStatus.Done);
return TaskRunResult.finished();
}
@@ -712,7 +602,7 @@ async function processPeerPullDebitAbortingRefresh(
const abortRefreshGroupId = peerPullInc.abortRefreshGroupId;
checkLogicInvariant(!!abortRefreshGroupId);
const ctx = new PeerPullDebitTransactionContext(wex, peerPullDebitId);
- await ctx.transition({ extraStores: ["refreshGroups"] }, async (rec, tx) => {
+ await recordTransition(ctx, { extraStores: ["refreshGroups"] }, async (rec, tx) => {
const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId);
if (refreshGroup == null) {
// Maybe it got manually deleted? Means that we should
@@ -825,7 +715,7 @@ export async function confirmPeerPullDebit(
const totalAmount = await getTotalPeerPaymentCost(wex, coins);
- await ctx.transition({
+ await recordTransition(ctx, {
extraStores: [
"coinAvailability",
"coinHistory",
@@ -1029,14 +919,14 @@ export async function preparePeerPullDebit(
const currency = Amounts.currencyOf(totalAmount);
const ctx = new PeerPullDebitTransactionContext(wex, peerPullDebitId);
- const transitionInfo = await wex.db.runReadWriteTx(
- { storeNames: ["peerPullDebit", "contractTerms", "transactionsMeta"] },
+ await recordCreate(ctx,
+ { extraStores: ["contractTerms"], label: "crate-transaction-peer-pull-credit" },
async (tx) => {
await tx.contractTerms.put({
h: contractTermsHash,
contractTermsRaw: contractTerms,
});
- const record = {
+ return {
peerPullDebitId,
contractPriv: contractPriv,
exchangeBaseUrl: exchangeBaseUrl,
@@ -1047,16 +937,8 @@ export async function preparePeerPullDebit(
status: PeerPullDebitRecordStatus.DialogProposed,
totalCostEstimated: Amounts.stringify(totalAmount),
}
- await tx.peerPullDebit.add(record);
- await ctx.updateTransactionMeta(tx);
- const oldTxState: TransactionState = {
- major: TransactionMajorState.None,
- };
- const newTxState = computePeerPullDebitTransactionState(record);
- return { oldTxState, newTxState, balanceEffect: BalanceEffect.Any };
},
);
- notifyTransition(wex, ctx.transactionId, transitionInfo);
wex.taskScheduler.startShepherdTask(ctx.taskId)
const scopeInfo = await wex.db.runAllStoresReadOnlyTx({}, (tx) => {
diff --git a/packages/taler-wallet-core/src/pay-peer-push-credit.ts b/packages/taler-wallet-core/src/pay-peer-push-credit.ts
@@ -77,7 +77,6 @@ import {
PeerPushPaymentIncomingRecord,
WalletDbAllStoresReadOnlyTransaction,
WalletDbReadWriteTransaction,
- WalletDbStoresArr,
WithdrawalGroupRecord,
WithdrawalGroupStatus,
WithdrawalRecordType,
@@ -99,10 +98,14 @@ import {
import {
getMergeReserveInfo,
isPurseMerged,
+ recordCreate,
+ recordDelete,
+ recordTransition,
+ recordTransitionStatus,
+ recordUpdateMeta,
} from "./pay-peer-common.js";
import {
BalanceEffect,
- TransitionInfo,
constructTransactionIdentifier,
isUnsuccessfulTransaction,
notifyTransition,
@@ -138,99 +141,17 @@ export class PeerPushCreditTransactionContext implements TransactionContext {
});
}
- /**
- * Transition an existing peer-push-credit transaction.
- * Extra object stores may be accessed during the transition.
- */
- async transition<StoreNameArray extends WalletDbStoresArr>(
- opts: { extraStores?: StoreNameArray },
- f: (
- rec: PeerPushPaymentIncomingRecord,
- tx: WalletDbReadWriteTransaction<
- ["peerPushCredit", "transactionsMeta", ...StoreNameArray]
- >,
- ) => Promise<TransitionResultType>,
- ): Promise<TransitionInfo | undefined> {
- const baseStores = ["peerPushCredit" as const, "transactionsMeta" as const];
- const stores = opts.extraStores
- ? [...baseStores, ...opts.extraStores]
- : baseStores;
-
- let errorThrown: Error | undefined;
- const transitionInfo = await this.wex.db.runReadWriteTx(
- { storeNames: stores },
- async (tx) => {
- const rec = await tx.peerPushCredit.get(this.peerPushCreditId);
- if (rec == null) {
- logger.warn(`peer push credit ${this.peerPushCreditId} not found`);
- return;
- }
- const oldTxState = computePeerPushCreditTransactionState(rec);
- let res: TransitionResultType;
- try {
- res = await f(rec, tx);
- } catch (error) {
- if (error instanceof Error) {
- errorThrown = error;
- }
- return undefined;
- }
-
- switch (res) {
- case TransitionResultType.Transition: {
- await tx.peerPushCredit.put(rec);
- await this.updateTransactionMeta(tx);
- const newTxState = computePeerPushCreditTransactionState(rec);
- return {
- oldTxState,
- newTxState,
- balanceEffect: BalanceEffect.Any,
- };
- }
- case TransitionResultType.Delete:
- throw new Error("Cannot delete using transition");
- case TransitionResultType.Stay:
- return;
- }
- },
- );
- if (errorThrown) {
- throw errorThrown;
- }
- notifyTransition(this.wex, this.transactionId, transitionInfo);
- return transitionInfo;
- }
-
- /**
- * Transition an existing peer-push-credit transaction status
- */
- async transitionStatus(from: PeerPushCreditStatus, to: PeerPushCreditStatus) {
- await this.transition({}, async (rec) => {
- if (rec.status !== from) {
- return TransitionResultType.Stay;
- } else {
- rec.status = to;
- return TransitionResultType.Transition;
- }
- });
- }
-
- async updateTransactionMeta(
- tx: WalletDbReadWriteTransaction<["peerPushCredit", "transactionsMeta"]>,
- ): Promise<void> {
- const ppdRec = await tx.peerPushCredit.get(this.peerPushCreditId);
- if (!ppdRec) {
- await tx.transactionsMeta.delete(this.transactionId);
- return;
- }
- await tx.transactionsMeta.put({
- transactionId: this.transactionId,
- status: ppdRec.status,
- timestamp: ppdRec.timestamp,
- currency: Amounts.currencyOf(ppdRec.estimatedAmountEffective),
- exchanges: [ppdRec.exchangeBaseUrl],
- });
- }
+ readonly store = "peerPushCredit";
+ readonly recordId = this.peerPushCreditId
+ readonly recordState = computePeerPushCreditTransactionState
+ readonly recordMeta = (rec: PeerPushPaymentIncomingRecord) => ({
+ transactionId: this.transactionId,
+ status: rec.status,
+ timestamp: rec.timestamp,
+ currency: Amounts.currencyOf(rec.estimatedAmountEffective),
+ exchanges: [rec.exchangeBaseUrl],
+ });
+ updateTransactionMeta = (tx: WalletDbReadWriteTransaction<["peerPushCredit", "transactionsMeta"]>) => recordUpdateMeta(this, tx)
/**
* Get the full transaction details for the transaction.
@@ -361,36 +282,21 @@ export class PeerPushCreditTransactionContext implements TransactionContext {
]
>,
): Promise<{ notifs: WalletNotification[] }> {
- const notifs: WalletNotification[] = [];
- const rec = await tx.peerPushCredit.get(this.peerPushCreditId);
- if (!rec) {
- return { notifs };
- }
- const oldTxState = computePeerPushCreditTransactionState(rec);
- if (rec.withdrawalGroupId) {
- const withdrawalGroupId = rec.withdrawalGroupId;
- const withdrawalCtx = new WithdrawTransactionContext(
- this.wex,
- withdrawalGroupId,
- );
- const res = await withdrawalCtx.deleteTransactionInTx(tx);
- notifs.push(...res.notifs);
- }
- await tx.peerPushCredit.delete(rec.peerPushCreditId);
- await this.updateTransactionMeta(tx);
- notifs.push({
- type: NotificationType.TransactionStateTransition,
- transactionId: this.transactionId,
- oldTxState,
- newTxState: {
- major: TransactionMajorState.Deleted,
- },
- });
- return { notifs };
+ return recordDelete(this, tx, async (rec, notifs) => {
+ if (rec.withdrawalGroupId) {
+ const withdrawalGroupId = rec.withdrawalGroupId;
+ const withdrawalCtx = new WithdrawTransactionContext(
+ this.wex,
+ withdrawalGroupId,
+ );
+ const res = await withdrawalCtx.deleteTransactionInTx(tx);
+ notifs.push(...res.notifs);
+ }
+ })
}
async suspendTransaction(): Promise<void> {
- await this.transition({}, async (rec) => {
+ await recordTransition(this, {}, async (rec) => {
switch (rec.status) {
case PeerPushCreditStatus.DialogProposed:
case PeerPushCreditStatus.Done:
@@ -426,7 +332,7 @@ export class PeerPushCreditTransactionContext implements TransactionContext {
}
async abortTransaction(): Promise<void> {
- await this.transition({}, async (rec) => {
+ await recordTransition(this, {}, async (rec) => {
switch (rec.status) {
case PeerPushCreditStatus.Failed:
case PeerPushCreditStatus.Aborted:
@@ -453,7 +359,7 @@ export class PeerPushCreditTransactionContext implements TransactionContext {
}
async resumeTransaction(): Promise<void> {
- await this.transition({}, async (rec) => {
+ await recordTransition(this, {}, async (rec) => {
switch (rec.status) {
case PeerPushCreditStatus.DialogProposed:
case PeerPushCreditStatus.PendingMergeKycRequired:
@@ -489,7 +395,7 @@ export class PeerPushCreditTransactionContext implements TransactionContext {
}
async failTransaction(reason?: TalerErrorDetail): Promise<void> {
- await this.transition({}, async (rec) => {
+ await recordTransition(this, {}, async (rec) => {
switch (rec.status) {
case PeerPushCreditStatus.Done:
case PeerPushCreditStatus.Aborted:
@@ -648,44 +554,29 @@ export async function preparePeerPushCredit(
}
const ctx = new PeerPushCreditTransactionContext(wex, peerPushCreditId);
-
- const transitionInfo = await wex.db.runReadWriteTx(
- { storeNames: ["contractTerms", "peerPushCredit", "transactionsMeta"] },
- async (tx) => {
- const rec: PeerPushPaymentIncomingRecord = {
- peerPushCreditId,
- contractPriv: contractPriv,
- exchangeBaseUrl: exchangeBaseUrl,
- mergePriv: dec.mergePriv,
- pursePub: pursePub,
- timestamp: timestampPreciseToDb(TalerPreciseTimestamp.now()),
- contractTermsHash,
- status: PeerPushCreditStatus.DialogProposed,
- withdrawalGroupId,
- currency: Amounts.currencyOf(purseStatus.balance),
- estimatedAmountEffective: Amounts.stringify(
- wi.withdrawalAmountEffective,
- ),
- };
- await tx.peerPushCredit.add(rec);
- await ctx.updateTransactionMeta(tx);
- await tx.contractTerms.put({
- h: contractTermsHash,
- contractTermsRaw: dec.contractTerms,
- });
-
- const newTxState = computePeerPushCreditTransactionState(rec);
-
- return {
- oldTxState: {
- major: TransactionMajorState.None,
- },
- newTxState,
- balanceEffect: BalanceEffect.Any,
- } satisfies TransitionInfo;
- },
- );
- notifyTransition(wex, ctx.transactionId, transitionInfo);
+ await recordCreate(ctx, {
+ extraStores: ["contractTerms"]
+ }, async (tx) => {
+ await tx.contractTerms.put({
+ h: contractTermsHash,
+ contractTermsRaw: dec.contractTerms,
+ });
+ return {
+ peerPushCreditId,
+ contractPriv: contractPriv,
+ exchangeBaseUrl: exchangeBaseUrl,
+ mergePriv: dec.mergePriv,
+ pursePub: pursePub,
+ timestamp: timestampPreciseToDb(TalerPreciseTimestamp.now()),
+ contractTermsHash,
+ status: PeerPushCreditStatus.DialogProposed,
+ withdrawalGroupId,
+ currency: Amounts.currencyOf(purseStatus.balance),
+ estimatedAmountEffective: Amounts.stringify(
+ wi.withdrawalAmountEffective,
+ ),
+ };
+ });
wex.taskScheduler.startShepherdTask(ctx.taskId);
const currency = Amounts.currencyOf(wi.withdrawalAmountRaw);
@@ -736,10 +627,7 @@ async function longpollKycStatus(
kycStatusRes.status === HttpStatusCode.Ok ||
kycStatusRes.status === HttpStatusCode.NoContent
) {
- await ctx.transitionStatus(
- PeerPushCreditStatus.PendingMergeKycRequired,
- PeerPushCreditStatus.PendingMerge,
- );
+ await recordTransitionStatus(ctx, PeerPushCreditStatus.PendingMergeKycRequired, PeerPushCreditStatus.PendingMerge);
return TaskRunResult.progress();
} else if (kycStatusRes.status === HttpStatusCode.Accepted) {
// Access token / URL stays the same, just long-poll again.
@@ -849,10 +737,7 @@ async function handlePendingMerge(
amount: kycCheckRes.nextThreshold,
exchangeBaseUrl: peerInc.exchangeBaseUrl,
});
- await ctx.transitionStatus(
- PeerPushCreditStatus.PendingMerge,
- PeerPushCreditStatus.PendingBalanceKycInit,
- );
+ await recordTransitionStatus(ctx, PeerPushCreditStatus.PendingMerge, PeerPushCreditStatus.PendingBalanceKycInit);
return TaskRunResult.progress();
}
@@ -919,7 +804,7 @@ async function handlePendingMerge(
case HttpStatusCode.Conflict: {
// FIXME: Check signature.
// FIXME: status completed by other
- await ctx.transitionStatus(
+ await recordTransitionStatus(ctx,
PeerPushCreditStatus.PendingMerge,
PeerPushCreditStatus.Aborted,
);
@@ -1091,10 +976,7 @@ async function processPeerPushDebitDialogProposed(
break;
case HttpStatusCode.Gone:
// Exchange says that purse doesn't exist anymore => expired!
- await ctx.transitionStatus(
- PeerPushCreditStatus.DialogProposed,
- PeerPushCreditStatus.Aborted,
- );
+ await recordTransitionStatus(ctx, PeerPushCreditStatus.DialogProposed, PeerPushCreditStatus.Aborted);
return TaskRunResult.finished();
case HttpStatusCode.NotFound:
// FIXME: Maybe check error code? 404 could also mean something else.
@@ -1105,10 +987,7 @@ async function processPeerPushDebitDialogProposed(
if (isPurseMerged(resp.body)) {
logger.info("purse completed by another wallet");
- await ctx.transitionStatus(
- PeerPushCreditStatus.DialogProposed,
- PeerPushCreditStatus.Aborted,
- );
+ await recordTransitionStatus(ctx, PeerPushCreditStatus.DialogProposed, PeerPushCreditStatus.Aborted);
return TaskRunResult.finished();
}
@@ -1230,16 +1109,13 @@ async function processPeerPushCreditBalanceKyc(
});
if (ret.result === "ok") {
- await ctx.transitionStatus(
- PeerPushCreditStatus.PendingBalanceKycRequired,
- PeerPushCreditStatus.PendingMerge,
- );
+ await recordTransitionStatus(ctx, PeerPushCreditStatus.PendingBalanceKycRequired, PeerPushCreditStatus.PendingMerge);
return TaskRunResult.progress();
} else if (
peerInc.status === PeerPushCreditStatus.PendingBalanceKycInit &&
ret.walletKycStatus === ExchangeWalletKycStatus.Legi
) {
- await ctx.transition({}, async (rec) => {
+ await recordTransition(ctx, {}, async (rec) => {
if (rec.status === PeerPushCreditStatus.PendingBalanceKycInit) {
rec.status = PeerPushCreditStatus.PendingBalanceKycRequired;
rec.kycAccessToken = ret.walletKycAccessToken;
@@ -1304,11 +1180,7 @@ export async function confirmPeerPushCredit(
if (checkPeerCreditHardLimitExceeded(exchange, res.contractTerms.amount)) {
throw Error("peer credit would exceed hard KYC limit");
}
-
- await ctx.transitionStatus(
- PeerPushCreditStatus.DialogProposed,
- PeerPushCreditStatus.PendingMerge,
- );
+ await recordTransitionStatus(ctx, PeerPushCreditStatus.DialogProposed, PeerPushCreditStatus.PendingMerge);
wex.taskScheduler.stopShepherdTask(ctx.taskId);
wex.taskScheduler.startShepherdTask(ctx.taskId);
diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/pay-peer-push-debit.ts
@@ -26,7 +26,6 @@ import {
InitiatePeerPushDebitRequest,
InitiatePeerPushDebitResponse,
Logger,
- NotificationType,
RefreshReason,
ScopeInfo,
ScopeType,
@@ -80,7 +79,6 @@ import {
PeerPushDebitStatus,
WalletDbAllStoresReadOnlyTransaction,
WalletDbReadWriteTransaction,
- WalletDbStoresArr,
timestampPreciseFromDb,
timestampPreciseToDb,
timestampProtocolFromDb,
@@ -95,14 +93,16 @@ import {
getTotalPeerPaymentCostInTx,
isPurseMerged,
queryCoinInfosForSelection,
+ recordCreate,
+ recordDelete,
+ recordTransition,
+ recordTransitionStatus,
+ recordUpdateMeta,
} from "./pay-peer-common.js";
import { createRefreshGroup } from "./refresh.js";
import {
- BalanceEffect,
constructTransactionIdentifier,
isUnsuccessfulTransaction,
- notifyTransition,
- TransitionInfo,
} from "./transactions.js";
import { WalletExecutionContext } from "./wallet.js";
import { updateWithdrawalDenomsForCurrency } from "./withdraw.js";
@@ -127,115 +127,17 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
});
}
- /**
- * Transition an existing peer-pull-debit transaction.
- * Extra object stores may be accessed during the transition.
- */
- async transition<StoreNameArray extends WalletDbStoresArr>(
- opts: { extraStores?: StoreNameArray },
- lambda: (
- rec: PeerPushDebitRecord,
- tx: WalletDbReadWriteTransaction<
- [
- "peerPushDebit",
- "transactionsMeta",
- ...StoreNameArray,
- ]
- >,
- ) => Promise<TransitionResultType>,
- ): Promise<TransitionInfo | undefined> {
- const baseStores = [
- "peerPushDebit" as const,
- "transactionsMeta" as const
- ];
- const stores = opts.extraStores
- ? [...baseStores, ...opts.extraStores]
- : baseStores;
-
- let errorThrown: Error | undefined;
- const transitionInfo = await this.wex.db.runReadWriteTx(
- { storeNames: stores },
- async (tx) => {
- const rec = await tx.peerPushDebit.get(this.pursePub);
- if (rec == null) {
- logger.warn(`peer pull debit ${this.pursePub} not found`);
- return;
- }
- const oldTxState = computePeerPushDebitTransactionState(rec);
- let res: TransitionResultType;
- try {
- res = await lambda(rec, tx);
- } catch (error) {
- if (error instanceof Error) {
- errorThrown = error;
- }
- return undefined;
- }
-
- switch (res) {
- case TransitionResultType.Transition: {
- await tx.peerPushDebit.put(rec)
- await tx.transactionsMeta.put({
- transactionId: this.transactionId,
- status: rec.status,
- timestamp: rec.timestampCreated,
- currency: Amounts.currencyOf(rec.amount),
- exchanges: [rec.exchangeBaseUrl],
- })
- const newTxState = computePeerPushDebitTransactionState(rec);
- return {
- oldTxState,
- newTxState,
- balanceEffect: BalanceEffect.Any,
- };
- }
- case TransitionResultType.Delete:
- throw new Error("Cannot delete using transition");
- case TransitionResultType.Stay:
- return;
- }
- },
- );
- if (errorThrown != null) {
- throw errorThrown;
- }
- notifyTransition(this.wex, this.transactionId, transitionInfo);
- return transitionInfo;
- }
-
- /**
- * Transition an existing peer-push-debit transaction status
- */
- async transitionStatus(
- from: PeerPushDebitStatus,
- to: PeerPushDebitStatus
- ) {
- await this.transition({}, async (rec) => {
- if (rec.status !== from) {
- return TransitionResultType.Stay
- } else {
- rec.status = to;
- return TransitionResultType.Transition
- }
- });
- }
-
- async updateTransactionMeta(
- tx: WalletDbReadWriteTransaction<["peerPushDebit", "transactionsMeta"]>,
- ): Promise<void> {
- const rec = await tx.peerPushDebit.get(this.pursePub);
- if (rec == null) {
- await tx.transactionsMeta.delete(this.transactionId);
- } else {
- await tx.transactionsMeta.put({
- transactionId: this.transactionId,
- status: rec.status,
- timestamp: rec.timestampCreated,
- currency: Amounts.currencyOf(rec.amount),
- exchanges: [rec.exchangeBaseUrl],
- });
- }
- }
+ readonly store = "peerPushDebit";
+ readonly recordId = this.pursePub
+ readonly recordState = computePeerPushDebitTransactionState
+ readonly recordMeta = (rec: PeerPushDebitRecord) => ({
+ transactionId: this.transactionId,
+ status: rec.status,
+ timestamp: rec.timestampCreated,
+ currency: Amounts.currencyOf(rec.amount),
+ exchanges: [rec.exchangeBaseUrl],
+ });
+ updateTransactionMeta = (tx: WalletDbReadWriteTransaction<["peerPushDebit", "transactionsMeta"]>) => recordUpdateMeta(this, tx)
/**
* Get the full transaction details for the transaction.
@@ -298,8 +200,8 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
async deleteTransaction(): Promise<void> {
const res = await this.wex.db.runReadWriteTx(
- { storeNames: ["peerPushDebit", "tombstones", "transactionsMeta"] },
- async (tx) => this.deleteTransactionInTx(tx)
+ { storeNames: ["peerPushDebit", "transactionsMeta"] },
+ this.deleteTransactionInTx
);
for (const notif of res.notifs) {
this.wex.ws.notify(notif);
@@ -308,30 +210,14 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
async deleteTransactionInTx(
tx: WalletDbReadWriteTransaction<
- ["peerPushDebit", "tombstones", "transactionsMeta"]
+ ["peerPushDebit", "transactionsMeta"]
>,
): Promise<{ notifs: WalletNotification[] }> {
- const notifs: WalletNotification[] = [];
- const rec = await tx.peerPushDebit.get(this.pursePub);
- if (!rec) {
- return { notifs };
- }
- const oldTxState = computePeerPushDebitTransactionState(rec);
- await tx.peerPushDebit.delete(rec.pursePub);
- await this.updateTransactionMeta(tx);
- notifs.push({
- type: NotificationType.TransactionStateTransition,
- transactionId: this.transactionId,
- oldTxState,
- newTxState: {
- major: TransactionMajorState.Deleted,
- },
- });
- return { notifs };
+ return recordDelete(this, tx);
}
async suspendTransaction(): Promise<void> {
- await this.transition({}, async (rec) => {
+ await recordTransition(this, {}, async (rec) => {
switch (rec.status) {
case PeerPushDebitStatus.PendingCreatePurse:
rec.status = PeerPushDebitStatus.SuspendedCreatePurse;
@@ -359,7 +245,7 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
}
async abortTransaction(reason?: TalerErrorDetail): Promise<void> {
- await this.transition({}, async (rec) => {
+ await recordTransition(this, {}, async (rec) => {
switch (rec.status) {
case PeerPushDebitStatus.PendingReady:
case PeerPushDebitStatus.SuspendedReady:
@@ -389,7 +275,7 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
}
async resumeTransaction(): Promise<void> {
- await this.transition({}, async (rec) => {
+ await recordTransition(this, {}, async (rec) => {
switch (rec.status) {
case PeerPushDebitStatus.SuspendedAbortingDeletePurse:
rec.status = PeerPushDebitStatus.AbortingDeletePurse;
@@ -417,7 +303,7 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
}
async failTransaction(reason?: TalerErrorDetail): Promise<void> {
- await this.transition({}, async (rec) => {
+ await recordTransition(this, {}, async (rec) => {
switch (rec.status) {
case PeerPushDebitStatus.AbortingDeletePurse:
case PeerPushDebitStatus.SuspendedAbortingDeletePurse:
@@ -608,7 +494,7 @@ async function handlePurseCreationConflict(
assertUnreachable(coinSelRes);
}
- await ctx.transition({}, async (rec) => {
+ await recordTransition(ctx, {}, async (rec) => {
switch (rec.status) {
case PeerPushDebitStatus.PendingCreatePurse:
case PeerPushDebitStatus.SuspendedCreatePurse: {
@@ -672,7 +558,7 @@ async function processPeerPushDebitCreateReserve(
}
let transitionDone = false
- await ctx.transition(
+ await recordTransition(ctx,
{
extraStores: [
"coinAvailability",
@@ -860,11 +746,11 @@ async function processPeerPushDebitCreateReserve(
logger.info(j2s(resp));
switch (resp.case) {
case "ok":
- await ctx.transitionStatus(PeerPushDebitStatus.PendingCreatePurse, PeerPushDebitStatus.PendingReady)
+ await recordTransitionStatus(ctx, PeerPushDebitStatus.PendingCreatePurse, PeerPushDebitStatus.PendingReady)
return TaskRunResult.progress()
case HttpStatusCode.Gone:
// FIXME we need PeerPushDebitStatus.ExpiredDeletePurse
- await ctx.transitionStatus(PeerPushDebitStatus.PendingCreatePurse, PeerPushDebitStatus.AbortingDeletePurse)
+ await recordTransitionStatus(ctx, PeerPushDebitStatus.PendingCreatePurse, PeerPushDebitStatus.AbortingDeletePurse)
return TaskRunResult.progress()
case HttpStatusCode.NotFound:
throw Error("peer push credit disappeared");
@@ -895,45 +781,46 @@ async function processPeerPushDebitAbortingDeletePurse(
});
logger.info(`deleted purse with response status ${resp.status}`);
- await ctx.transition({
- extraStores: [
- "coinAvailability",
- "coinHistory",
- "coins",
- "denominations",
- "refreshGroups",
- "refreshSessions",
- ]
- }, async (rec, tx) => {
- if (rec.status !== PeerPushDebitStatus.AbortingDeletePurse) {
- return TransitionResultType.Stay;
- }
- const currency = Amounts.currencyOf(rec.amount);
- const coinPubs: CoinRefreshRequest[] = [];
-
- if (!rec.coinSel) {
- return TransitionResultType.Stay;
- }
+ await recordTransition(ctx,
+ {
+ extraStores: [
+ "coinAvailability",
+ "coinHistory",
+ "coins",
+ "denominations",
+ "refreshGroups",
+ "refreshSessions",
+ ]
+ }, async (rec, tx) => {
+ if (rec.status !== PeerPushDebitStatus.AbortingDeletePurse) {
+ return TransitionResultType.Stay;
+ }
+ const currency = Amounts.currencyOf(rec.amount);
+ const coinPubs: CoinRefreshRequest[] = [];
- for (let i = 0; i < rec.coinSel.coinPubs.length; i++) {
- coinPubs.push({
- amount: rec.coinSel.contributions[i],
- coinPub: rec.coinSel.coinPubs[i],
- });
- }
- rec.status = PeerPushDebitStatus.Aborted;
+ if (!rec.coinSel) {
+ return TransitionResultType.Stay;
+ }
- const refresh = await createRefreshGroup(
- wex,
- tx,
- currency,
- coinPubs,
- RefreshReason.AbortPeerPushDebit,
- ctx.transactionId,
- );
- rec.abortRefreshGroupId = refresh.refreshGroupId;
- return TransitionResultType.Transition
- })
+ for (let i = 0; i < rec.coinSel.coinPubs.length; i++) {
+ coinPubs.push({
+ amount: rec.coinSel.contributions[i],
+ coinPub: rec.coinSel.coinPubs[i],
+ });
+ }
+ rec.status = PeerPushDebitStatus.Aborted;
+
+ const refresh = await createRefreshGroup(
+ wex,
+ tx,
+ currency,
+ coinPubs,
+ RefreshReason.AbortPeerPushDebit,
+ ctx.transactionId,
+ );
+ rec.abortRefreshGroupId = refresh.refreshGroupId;
+ return TransitionResultType.Transition
+ })
return TaskRunResult.backoff();
}
@@ -953,20 +840,19 @@ async function processPeerPushDebitReady(
const resp = await exchangeClient.getPurseStatusAtMerge(pursePub, wex.cancellationToken, {
timeout: 30000
});
- console.log(j2s("RESERVE"))
switch (resp.case) {
case "ok": {
if (!isPurseMerged(resp.body)) {
return TaskRunResult.longpollReturnedPending();
} else {
- await ctx.transitionStatus(PeerPushDebitStatus.PendingReady, PeerPushDebitStatus.Done);
+ await recordTransitionStatus(ctx, PeerPushDebitStatus.PendingReady, PeerPushDebitStatus.Done);
return TaskRunResult.progress();
}
}
case HttpStatusCode.Gone:
logger.info(`purse ${pursePub} is gone, aborting peer-push-debit`);
- await ctx.transition({
+ await recordTransition(ctx, {
extraStores: [
"coinAvailability",
"coinHistory",
@@ -1070,17 +956,16 @@ export async function initiatePeerPushDebit(
const ctx = new PeerPushDebitTransactionContext(wex, pursePub);
- const transactionId = ctx.transactionId;
-
const contractEncNonce = encodeCrock(getRandomBytes(24));
const hContractTerms = ContractTermsUtil.hashContractTerms(contractTerms);
await updateWithdrawalDenomsForCurrency(wex, instructedAmount.currency);
- const res = await wex.db.runReadWriteTx(
+ let exchangeBaseUrl;
+ await recordCreate(ctx,
{
- storeNames: [
+ extraStores: [
"coinAvailability",
"coinHistory",
"coins",
@@ -1088,13 +973,11 @@ export async function initiatePeerPushDebit(
"denominations",
"exchangeDetails",
"exchanges",
- "peerPushDebit",
"refreshGroups",
"refreshSessions",
- "transactionsMeta",
"globalCurrencyExchanges",
"globalCurrencyAuditors",
- ],
+ ]
},
async (tx) => {
const coinSelRes = await selectPeerCoinsInTx(wex, tx, {
@@ -1175,27 +1058,14 @@ export async function initiatePeerPushDebit(
refreshReason: RefreshReason.PayPeerPush,
});
}
-
- await tx.peerPushDebit.add(ppi);
- await ctx.updateTransactionMeta(tx);
-
await tx.contractTerms.put({
h: hContractTerms,
contractTermsRaw: contractTerms,
});
-
- const newTxState = computePeerPushDebitTransactionState(ppi);
- return {
- transitionInfo: {
- oldTxState: { major: TransactionMajorState.None },
- newTxState,
- balanceEffect: BalanceEffect.Any,
- },
- exchangeBaseUrl: coinSelRes.result.exchangeBaseUrl,
- };
+ exchangeBaseUrl = coinSelRes.result.exchangeBaseUrl;
+ return ppi;
},
);
- notifyTransition(wex, transactionId, res.transitionInfo);
wex.taskScheduler.startShepherdTask(ctx.taskId);
@@ -1203,7 +1073,7 @@ export async function initiatePeerPushDebit(
contractPriv: contractKeyPair.priv,
mergePriv: mergePair.priv,
pursePub: pursePair.pub,
- exchangeBaseUrl: res.exchangeBaseUrl,
+ exchangeBaseUrl: exchangeBaseUrl!,
transactionId: constructTransactionIdentifier({
tag: TransactionType.PeerPushDebit,
pursePub: pursePair.pub,
diff --git a/packages/taler-wallet-core/src/query.ts b/packages/taler-wallet-core/src/query.ts
@@ -547,7 +547,7 @@ type ValidateKeyPath<T, P> = P extends `${infer PX extends keyof T &
// foo({x: [0,1,2]}, "x.0");
-export type StoreMap = { [P in string]: StoreWithIndexes<any, any, any> }
+export type StoreMap = { [Store: string]: StoreWithIndexes<any, any, any> }
export type StoreNames<Stores extends StoreMap> = keyof Stores
export type DbReadWriteTransaction<
Stores extends StoreMap,