From 523280b3862b528512ff93c651bc0d9ed632fbf6 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Tue, 27 Feb 2024 17:39:58 +0100 Subject: wallet-core: thread through wallet execution context --- .../taler-wallet-core/src/pay-peer-push-debit.ts | 177 ++++++++++----------- 1 file changed, 83 insertions(+), 94 deletions(-) (limited to 'packages/taler-wallet-core/src/pay-peer-push-debit.ts') diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/pay-peer-push-debit.ts index 40a5d97a4..5ee4d642b 100644 --- a/packages/taler-wallet-core/src/pay-peer-push-debit.ts +++ b/packages/taler-wallet-core/src/pay-peer-push-debit.ts @@ -16,7 +16,6 @@ import { Amounts, - CancellationToken, CheckPeerPushDebitRequest, CheckPeerPushDebitResponse, CoinRefreshRequest, @@ -78,7 +77,7 @@ import { constructTransactionIdentifier, notifyTransition, } from "./transactions.js"; -import { InternalWalletState } from "./wallet.js"; +import { WalletExecutionContext } from "./wallet.js"; const logger = new Logger("pay-peer-push-debit.ts"); @@ -87,7 +86,7 @@ export class PeerPushDebitTransactionContext implements TransactionContext { readonly taskId: TaskIdStr; constructor( - public ws: InternalWalletState, + public wex: WalletExecutionContext, public pursePub: string, ) { this.transactionId = constructTransactionIdentifier({ @@ -101,8 +100,8 @@ export class PeerPushDebitTransactionContext implements TransactionContext { } async deleteTransaction(): Promise { - const { ws, pursePub, transactionId } = this; - await ws.db.runReadWriteTx(["peerPushDebit", "tombstones"], async (tx) => { + const { wex, pursePub, transactionId } = this; + await wex.db.runReadWriteTx(["peerPushDebit", "tombstones"], async (tx) => { const debit = await tx.peerPushDebit.get(pursePub); if (debit) { await tx.peerPushDebit.delete(pursePub); @@ -112,8 +111,8 @@ export class PeerPushDebitTransactionContext implements TransactionContext { } async suspendTransaction(): Promise { - const { ws, pursePub, transactionId, taskId: retryTag } = this; - const transitionInfo = await ws.db.runReadWriteTx( + const { wex, pursePub, transactionId, taskId: retryTag } = this; + const transitionInfo = await wex.db.runReadWriteTx( ["peerPushDebit"], async (tx) => { const pushDebitRec = await tx.peerPushDebit.get(pursePub); @@ -165,13 +164,13 @@ export class PeerPushDebitTransactionContext implements TransactionContext { return undefined; }, ); - ws.taskScheduler.stopShepherdTask(retryTag); - notifyTransition(ws, transactionId, transitionInfo); + wex.taskScheduler.stopShepherdTask(retryTag); + notifyTransition(wex, transactionId, transitionInfo); } async abortTransaction(): Promise { - const { ws, pursePub, transactionId, taskId: retryTag } = this; - const transitionInfo = await ws.db.runReadWriteTx( + const { wex, pursePub, transactionId, taskId: retryTag } = this; + const transitionInfo = await wex.db.runReadWriteTx( ["peerPushDebit"], async (tx) => { const pushDebitRec = await tx.peerPushDebit.get(pursePub); @@ -218,14 +217,14 @@ export class PeerPushDebitTransactionContext implements TransactionContext { return undefined; }, ); - ws.taskScheduler.stopShepherdTask(retryTag); - notifyTransition(ws, transactionId, transitionInfo); - ws.taskScheduler.startShepherdTask(retryTag); + wex.taskScheduler.stopShepherdTask(retryTag); + notifyTransition(wex, transactionId, transitionInfo); + wex.taskScheduler.startShepherdTask(retryTag); } async resumeTransaction(): Promise { - const { ws, pursePub, transactionId, taskId: retryTag } = this; - const transitionInfo = await ws.db.runReadWriteTx( + const { wex, pursePub, transactionId, taskId: retryTag } = this; + const transitionInfo = await wex.db.runReadWriteTx( ["peerPushDebit"], async (tx) => { const pushDebitRec = await tx.peerPushDebit.get(pursePub); @@ -277,12 +276,12 @@ export class PeerPushDebitTransactionContext implements TransactionContext { return undefined; }, ); - ws.taskScheduler.startShepherdTask(retryTag); - notifyTransition(ws, transactionId, transitionInfo); + wex.taskScheduler.startShepherdTask(retryTag); + notifyTransition(wex, transactionId, transitionInfo); } async failTransaction(): Promise { - const { ws, pursePub, transactionId, taskId: retryTag } = this; + const { wex: ws, pursePub, transactionId, taskId: retryTag } = this; const transitionInfo = await ws.db.runReadWriteTx( ["peerPushDebit"], async (tx) => { @@ -337,14 +336,14 @@ export class PeerPushDebitTransactionContext implements TransactionContext { } export async function checkPeerPushDebit( - ws: InternalWalletState, + wex: WalletExecutionContext, req: CheckPeerPushDebitRequest, ): Promise { const instructedAmount = Amounts.parseOrThrow(req.amount); logger.trace( `checking peer push debit for ${Amounts.stringify(instructedAmount)}`, ); - const coinSelRes = await selectPeerCoins(ws, { instructedAmount }); + const coinSelRes = await selectPeerCoins(wex, { instructedAmount }); if (coinSelRes.type === "failure") { throw TalerError.fromDetail( TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE, @@ -355,7 +354,7 @@ export async function checkPeerPushDebit( } logger.trace(`selected peer coins (len=${coinSelRes.result.coins.length})`); const totalAmount = await getTotalPeerPaymentCost( - ws, + wex, coinSelRes.result.coins, ); logger.trace("computed total peer payment cost"); @@ -368,13 +367,13 @@ export async function checkPeerPushDebit( } async function handlePurseCreationConflict( - ws: InternalWalletState, + wex: WalletExecutionContext, peerPushInitiation: PeerPushDebitRecord, resp: HttpResponse, ): Promise { const pursePub = peerPushInitiation.pursePub; const errResp = await readTalerErrorResponse(resp); - const ctx = new PeerPushDebitTransactionContext(ws, pursePub); + const ctx = new PeerPushDebitTransactionContext(wex, pursePub); if (errResp.code !== TalerErrorCode.EXCHANGE_GENERIC_INSUFFICIENT_FUNDS) { await ctx.failTransaction(); return TaskRunResult.finished(); @@ -405,7 +404,7 @@ async function handlePurseCreationConflict( } } - const coinSelRes = await selectPeerCoins(ws, { instructedAmount, repair }); + const coinSelRes = await selectPeerCoins(wex, { instructedAmount, repair }); if (coinSelRes.type == "failure") { // FIXME: Details! @@ -414,7 +413,7 @@ async function handlePurseCreationConflict( ); } - await ws.db.runReadWriteTx(["peerPushDebit"], async (tx) => { + await wex.db.runReadWriteTx(["peerPushDebit"], async (tx) => { const myPpi = await tx.peerPushDebit.get(peerPushInitiation.pursePub); if (!myPpi) { return; @@ -438,19 +437,18 @@ async function handlePurseCreationConflict( } async function processPeerPushDebitCreateReserve( - ws: InternalWalletState, + wex: WalletExecutionContext, peerPushInitiation: PeerPushDebitRecord, - cancellationToken: CancellationToken, ): Promise { const pursePub = peerPushInitiation.pursePub; const purseExpiration = peerPushInitiation.purseExpiration; const hContractTerms = peerPushInitiation.contractTermsHash; - const ctx = new PeerPushDebitTransactionContext(ws, pursePub); + const ctx = new PeerPushDebitTransactionContext(wex, pursePub); const transactionId = ctx.transactionId; logger.trace(`processing ${transactionId} pending(create-reserve)`); - const contractTermsRecord = await ws.db.runReadOnlyTx( + const contractTermsRecord = await wex.db.runReadOnlyTx( ["contractTerms"], async (tx) => { return tx.contractTerms.get(hContractTerms); @@ -463,7 +461,7 @@ async function processPeerPushDebitCreateReserve( ); } - const purseSigResp = await ws.cryptoApi.signPurseCreation({ + const purseSigResp = await wex.cryptoApi.signPurseCreation({ hContractTerms, mergePub: peerPushInitiation.mergePub, minAge: 0, @@ -473,11 +471,11 @@ async function processPeerPushDebitCreateReserve( }); const coins = await queryCoinInfosForSelection( - ws, + wex, peerPushInitiation.coinSel, ); - const depositSigsResp = await ws.cryptoApi.signPurseDeposits({ + const depositSigsResp = await wex.cryptoApi.signPurseDeposits({ exchangeBaseUrl: peerPushInitiation.exchangeBaseUrl, pursePub: peerPushInitiation.pursePub, coins, @@ -495,7 +493,7 @@ async function processPeerPushDebitCreateReserve( logger.trace(`encrypt contract request: ${j2s(encryptContractRequest)}`); - const econtractResp = await ws.cryptoApi.encryptContractForMerge( + const econtractResp = await wex.cryptoApi.encryptContractForMerge( encryptContractRequest, ); @@ -517,10 +515,10 @@ async function processPeerPushDebitCreateReserve( logger.trace(`request body: ${j2s(reqBody)}`); - const httpResp = await ws.http.fetch(createPurseUrl.href, { + const httpResp = await wex.http.fetch(createPurseUrl.href, { method: "POST", body: reqBody, - cancellationToken, + cancellationToken: wex.cancellationToken, }); { @@ -538,7 +536,7 @@ async function processPeerPushDebitCreateReserve( } case HttpStatusCode.Conflict: { // Handle double-spending - return handlePurseCreationConflict(ws, peerPushInitiation, httpResp); + return handlePurseCreationConflict(wex, peerPushInitiation, httpResp); } default: { const errResp = await readTalerErrorResponse(httpResp); @@ -554,7 +552,7 @@ async function processPeerPushDebitCreateReserve( throw Error("got error response from exchange"); } - await transitionPeerPushDebitTransaction(ws, pursePub, { + await transitionPeerPushDebitTransaction(wex, pursePub, { stFrom: PeerPushDebitStatus.PendingCreatePurse, stTo: PeerPushDebitStatus.PendingReady, }); @@ -563,9 +561,8 @@ async function processPeerPushDebitCreateReserve( } async function processPeerPushDebitAbortingDeletePurse( - ws: InternalWalletState, + wex: WalletExecutionContext, peerPushInitiation: PeerPushDebitRecord, - cancellationToken: CancellationToken, ): Promise { const { pursePub, pursePriv } = peerPushInitiation; const transactionId = constructTransactionIdentifier({ @@ -573,23 +570,23 @@ async function processPeerPushDebitAbortingDeletePurse( pursePub, }); - const sigResp = await ws.cryptoApi.signDeletePurse({ + const sigResp = await wex.cryptoApi.signDeletePurse({ pursePriv, }); const purseUrl = new URL( `purses/${pursePub}`, peerPushInitiation.exchangeBaseUrl, ); - const resp = await ws.http.fetch(purseUrl.href, { + const resp = await wex.http.fetch(purseUrl.href, { method: "DELETE", headers: { "taler-purse-signature": sigResp.sig, }, - cancellationToken, + cancellationToken: wex.cancellationToken, }); logger.info(`deleted purse with response status ${resp.status}`); - const transitionInfo = await ws.db.runReadWriteTx( + const transitionInfo = await wex.db.runReadWriteTx( [ "peerPushDebit", "refreshGroups", @@ -617,7 +614,7 @@ async function processPeerPushDebitAbortingDeletePurse( } const refresh = await createRefreshGroup( - ws, + wex, tx, currency, coinPubs, @@ -634,7 +631,7 @@ async function processPeerPushDebitAbortingDeletePurse( }; }, ); - notifyTransition(ws, transactionId, transitionInfo); + notifyTransition(wex, transactionId, transitionInfo); return TaskRunResult.backoff(); } @@ -645,7 +642,7 @@ interface SimpleTransition { } async function transitionPeerPushDebitTransaction( - ws: InternalWalletState, + wex: WalletExecutionContext, pursePub: string, transitionSpec: SimpleTransition, ): Promise { @@ -653,7 +650,7 @@ async function transitionPeerPushDebitTransaction( tag: TransactionType.PeerPushDebit, pursePub, }); - const transitionInfo = await ws.db.runReadWriteTx( + const transitionInfo = await wex.db.runReadWriteTx( ["peerPushDebit"], async (tx) => { const ppiRec = await tx.peerPushDebit.get(pursePub); @@ -673,11 +670,11 @@ async function transitionPeerPushDebitTransaction( }; }, ); - notifyTransition(ws, transactionId, transitionInfo); + notifyTransition(wex, transactionId, transitionInfo); } async function processPeerPushDebitAbortingRefreshDeleted( - ws: InternalWalletState, + wex: WalletExecutionContext, peerPushInitiation: PeerPushDebitRecord, ): Promise { const pursePub = peerPushInitiation.pursePub; @@ -687,7 +684,7 @@ async function processPeerPushDebitAbortingRefreshDeleted( tag: TransactionType.PeerPushDebit, pursePub: peerPushInitiation.pursePub, }); - const transitionInfo = await ws.db.runReadWriteTx( + const transitionInfo = await wex.db.runReadWriteTx( ["refreshGroups", "peerPushDebit"], async (tx) => { const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId); @@ -720,13 +717,13 @@ async function processPeerPushDebitAbortingRefreshDeleted( return undefined; }, ); - notifyTransition(ws, transactionId, transitionInfo); + notifyTransition(wex, transactionId, transitionInfo); // FIXME: Shouldn't this be finished in some cases?! return TaskRunResult.backoff(); } async function processPeerPushDebitAbortingRefreshExpired( - ws: InternalWalletState, + wex: WalletExecutionContext, peerPushInitiation: PeerPushDebitRecord, ): Promise { const pursePub = peerPushInitiation.pursePub; @@ -736,7 +733,7 @@ async function processPeerPushDebitAbortingRefreshExpired( tag: TransactionType.PeerPushDebit, pursePub: peerPushInitiation.pursePub, }); - const transitionInfo = await ws.db.runReadWriteTx( + const transitionInfo = await wex.db.runReadWriteTx( ["peerPushDebit", "refreshGroups"], async (tx) => { const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId); @@ -769,7 +766,7 @@ async function processPeerPushDebitAbortingRefreshExpired( return undefined; }, ); - notifyTransition(ws, transactionId, transitionInfo); + notifyTransition(wex, transactionId, transitionInfo); // FIXME: Shouldn't this be finished in some cases?! return TaskRunResult.backoff(); } @@ -778,9 +775,8 @@ async function processPeerPushDebitAbortingRefreshExpired( * Process the "pending(ready)" state of a peer-push-debit transaction. */ async function processPeerPushDebitReady( - ws: InternalWalletState, + wex: WalletExecutionContext, peerPushInitiation: PeerPushDebitRecord, - cancellationToken: CancellationToken, ): Promise { logger.trace("processing peer-push-debit pending(ready)"); const pursePub = peerPushInitiation.pursePub; @@ -794,9 +790,9 @@ async function processPeerPushDebitReady( ); mergeUrl.searchParams.set("timeout_ms", "30000"); logger.info(`long-polling on purse status at ${mergeUrl.href}`); - const resp = await ws.http.fetch(mergeUrl.href, { + const resp = await wex.http.fetch(mergeUrl.href, { // timeout: getReserveRequestTimeout(withdrawalGroup), - cancellationToken, + cancellationToken: wex.cancellationToken, }); if (resp.status === HttpStatusCode.Ok) { const purseStatus = await readSuccessResponseJsonOrThrow( @@ -809,7 +805,7 @@ async function processPeerPushDebitReady( return TaskRunResult.backoff(); } else { await transitionPeerPushDebitTransaction( - ws, + wex, peerPushInitiation.pursePub, { stFrom: PeerPushDebitStatus.PendingReady, @@ -820,7 +816,7 @@ async function processPeerPushDebitReady( } } else if (resp.status === HttpStatusCode.Gone) { logger.info(`purse ${pursePub} is gone, aborting peer-push-debit`); - const transitionInfo = await ws.db.runReadWriteTx( + const transitionInfo = await wex.db.runReadWriteTx( [ "peerPushDebit", "refreshGroups", @@ -848,7 +844,7 @@ async function processPeerPushDebitReady( } const refresh = await createRefreshGroup( - ws, + wex, tx, currency, coinPubs, @@ -865,7 +861,7 @@ async function processPeerPushDebitReady( }; }, ); - notifyTransition(ws, transactionId, transitionInfo); + notifyTransition(wex, transactionId, transitionInfo); return TaskRunResult.backoff(); } else { logger.warn(`unexpected HTTP status for purse: ${resp.status}`); @@ -874,11 +870,10 @@ async function processPeerPushDebitReady( } export async function processPeerPushDebit( - ws: InternalWalletState, + wex: WalletExecutionContext, pursePub: string, - cancellationToken: CancellationToken, ): Promise { - const peerPushInitiation = await ws.db.runReadOnlyTx( + const peerPushInitiation = await wex.db.runReadOnlyTx( ["peerPushDebit"], async (tx) => { return tx.peerPushDebit.get(pursePub); @@ -890,27 +885,21 @@ export async function processPeerPushDebit( switch (peerPushInitiation.status) { case PeerPushDebitStatus.PendingCreatePurse: - return processPeerPushDebitCreateReserve( - ws, - peerPushInitiation, - cancellationToken, - ); + return processPeerPushDebitCreateReserve(wex, peerPushInitiation); case PeerPushDebitStatus.PendingReady: - return processPeerPushDebitReady( - ws, - peerPushInitiation, - cancellationToken, - ); + return processPeerPushDebitReady(wex, peerPushInitiation); case PeerPushDebitStatus.AbortingDeletePurse: - return processPeerPushDebitAbortingDeletePurse( - ws, + return processPeerPushDebitAbortingDeletePurse(wex, peerPushInitiation); + case PeerPushDebitStatus.AbortingRefreshDeleted: + return processPeerPushDebitAbortingRefreshDeleted( + wex, peerPushInitiation, - cancellationToken, ); - case PeerPushDebitStatus.AbortingRefreshDeleted: - return processPeerPushDebitAbortingRefreshDeleted(ws, peerPushInitiation); case PeerPushDebitStatus.AbortingRefreshExpired: - return processPeerPushDebitAbortingRefreshExpired(ws, peerPushInitiation); + return processPeerPushDebitAbortingRefreshExpired( + wex, + peerPushInitiation, + ); default: { const txState = computePeerPushDebitTransactionState(peerPushInitiation); logger.warn( @@ -926,7 +915,7 @@ export async function processPeerPushDebit( * Initiate sending a peer-to-peer push payment. */ export async function initiatePeerPushDebit( - ws: InternalWalletState, + wex: WalletExecutionContext, req: InitiatePeerPushDebitRequest, ): Promise { const instructedAmount = Amounts.parseOrThrow( @@ -935,14 +924,14 @@ export async function initiatePeerPushDebit( const purseExpiration = req.partialContractTerms.purse_expiration; const contractTerms = req.partialContractTerms; - const pursePair = await ws.cryptoApi.createEddsaKeypair({}); - const mergePair = await ws.cryptoApi.createEddsaKeypair({}); + const pursePair = await wex.cryptoApi.createEddsaKeypair({}); + const mergePair = await wex.cryptoApi.createEddsaKeypair({}); const hContractTerms = ContractTermsUtil.hashContractTerms(contractTerms); - const contractKeyPair = await ws.cryptoApi.createEddsaKeypair({}); + const contractKeyPair = await wex.cryptoApi.createEddsaKeypair({}); - const coinSelRes = await selectPeerCoins(ws, { instructedAmount }); + const coinSelRes = await selectPeerCoins(wex, { instructedAmount }); if (coinSelRes.type !== "success") { throw TalerError.fromDetail( @@ -959,7 +948,7 @@ export async function initiatePeerPushDebit( logger.trace(`${j2s(coinSelRes)}`); const totalAmount = await getTotalPeerPaymentCost( - ws, + wex, coinSelRes.result.coins, ); @@ -967,13 +956,13 @@ export async function initiatePeerPushDebit( const pursePub = pursePair.pub; - const ctx = new PeerPushDebitTransactionContext(ws, pursePub); + const ctx = new PeerPushDebitTransactionContext(wex, pursePub); const transactionId = ctx.transactionId; const contractEncNonce = encodeCrock(getRandomBytes(24)); - const transitionInfo = await ws.db.runReadWriteTx( + const transitionInfo = await wex.db.runReadWriteTx( [ "exchanges", "contractTerms", @@ -987,7 +976,7 @@ export async function initiatePeerPushDebit( // FIXME: Instead of directly doing a spendCoin here, // we might want to mark the coins as used and spend them // after we've been able to create the purse. - await spendCoins(ws, tx, { + await spendCoins(wex, tx, { allocationId: constructTransactionIdentifier({ tag: TransactionType.PeerPushDebit, pursePub: pursePair.pub, @@ -1034,13 +1023,13 @@ export async function initiatePeerPushDebit( }; }, ); - notifyTransition(ws, transactionId, transitionInfo); - ws.notify({ + notifyTransition(wex, transactionId, transitionInfo); + wex.ws.notify({ type: NotificationType.BalanceChange, hintTransactionId: transactionId, }); - ws.taskScheduler.startShepherdTask(ctx.taskId); + wex.taskScheduler.startShepherdTask(ctx.taskId); return { contractPriv: contractKeyPair.priv, -- cgit v1.2.3