From 523280b3862b528512ff93c651bc0d9ed632fbf6 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Tue, 27 Feb 2024 17:39:58 +0100 Subject: wallet-core: thread through wallet execution context --- .../taler-wallet-core/src/pay-peer-pull-credit.ts | 157 +++++++++------------ 1 file changed, 70 insertions(+), 87 deletions(-) (limited to 'packages/taler-wallet-core/src/pay-peer-pull-credit.ts') diff --git a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts index 7774dfd5f..c999a8d1f 100644 --- a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts +++ b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts @@ -17,7 +17,6 @@ import { AbsoluteTime, Amounts, - CancellationToken, CheckPeerPullCreditRequest, CheckPeerPullCreditResponse, ContractTermsUtil, @@ -81,7 +80,7 @@ import { constructTransactionIdentifier, notifyTransition, } from "./transactions.js"; -import { InternalWalletState } from "./wallet.js"; +import { WalletExecutionContext } from "./wallet.js"; import { getExchangeWithdrawalInfo, internalCreateWithdrawalGroup, @@ -94,7 +93,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext { readonly taskId: TaskIdStr; constructor( - public ws: InternalWalletState, + public wex: WalletExecutionContext, public pursePub: string, ) { this.taskId = constructTaskIdentifier({ @@ -108,7 +107,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext { } async deleteTransaction(): Promise { - const { ws, pursePub } = this; + const { wex: ws, pursePub } = this; await ws.db.runReadWriteTx( ["withdrawalGroups", "peerPullCredit", "tombstones"], async (tx) => { @@ -138,7 +137,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext { } async suspendTransaction(): Promise { - const { ws, pursePub, taskId: retryTag, transactionId } = this; + const { wex: ws, pursePub, taskId: retryTag, transactionId } = this; const transitionInfo = await ws.db.runReadWriteTx( ["peerPullCredit"], async (tx) => { @@ -198,7 +197,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext { } async failTransaction(): Promise { - const { ws, pursePub, taskId: retryTag, transactionId } = this; + const { wex: ws, pursePub, taskId: retryTag, transactionId } = this; const transitionInfo = await ws.db.runReadWriteTx( ["peerPullCredit"], async (tx) => { @@ -249,7 +248,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext { } async resumeTransaction(): Promise { - const { ws, pursePub, taskId: retryTag, transactionId } = this; + const { wex: ws, pursePub, taskId: retryTag, transactionId } = this; const transitionInfo = await ws.db.runReadWriteTx( ["peerPullCredit"], async (tx) => { @@ -308,7 +307,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext { } async abortTransaction(): Promise { - const { ws, pursePub, taskId: retryTag, transactionId } = this; + const { wex: ws, pursePub, taskId: retryTag, transactionId } = this; const transitionInfo = await ws.db.runReadWriteTx( ["peerPullCredit"], async (tx) => { @@ -364,9 +363,8 @@ export class PeerPullCreditTransactionContext implements TransactionContext { } async function queryPurseForPeerPullCredit( - ws: InternalWalletState, + wex: WalletExecutionContext, pullIni: PeerPullCreditRecord, - cancellationToken: CancellationToken, ): Promise { const purseDepositUrl = new URL( `purses/${pullIni.pursePub}/deposit`, @@ -374,9 +372,9 @@ async function queryPurseForPeerPullCredit( ); purseDepositUrl.searchParams.set("timeout_ms", "30000"); logger.info(`querying purse status via ${purseDepositUrl.href}`); - const resp = await ws.http.fetch(purseDepositUrl.href, { + const resp = await wex.http.fetch(purseDepositUrl.href, { timeout: { d_ms: 60000 }, - cancellationToken, + cancellationToken: wex.cancellationToken, }); const transactionId = constructTransactionIdentifier({ tag: TransactionType.PeerPullCredit, @@ -388,7 +386,7 @@ async function queryPurseForPeerPullCredit( switch (resp.status) { case HttpStatusCode.Gone: { // Exchange says that purse doesn't exist anymore => expired! - const transitionInfo = await ws.db.runReadWriteTx( + const transitionInfo = await wex.db.runReadWriteTx( ["peerPullCredit"], async (tx) => { const finPi = await tx.peerPullCredit.get(pullIni.pursePub); @@ -405,7 +403,7 @@ async function queryPurseForPeerPullCredit( return { oldTxState, newTxState }; }, ); - notifyTransition(ws, transactionId, transitionInfo); + notifyTransition(wex, transactionId, transitionInfo); return TaskRunResult.backoff(); } case HttpStatusCode.NotFound: @@ -427,7 +425,7 @@ async function queryPurseForPeerPullCredit( return TaskRunResult.backoff(); } - const reserve = await ws.db.runReadOnlyTx(["reserves"], async (tx) => { + const reserve = await wex.db.runReadOnlyTx(["reserves"], async (tx) => { return await tx.reserves.get(pullIni.mergeReserveRowId); }); @@ -435,7 +433,7 @@ async function queryPurseForPeerPullCredit( throw Error("reserve for peer pull credit not found in wallet DB"); } - await internalCreateWithdrawalGroup(ws, { + await internalCreateWithdrawalGroup(wex, { amount: Amounts.parseOrThrow(pullIni.amount), wgInfo: { withdrawalType: WithdrawalRecordType.PeerPullCredit, @@ -449,7 +447,7 @@ async function queryPurseForPeerPullCredit( pub: reserve.reservePub, }, }); - const transitionInfo = await ws.db.runReadWriteTx( + const transitionInfo = await wex.db.runReadWriteTx( ["peerPullCredit"], async (tx) => { const finPi = await tx.peerPullCredit.get(pullIni.pursePub); @@ -466,17 +464,16 @@ async function queryPurseForPeerPullCredit( return { oldTxState, newTxState }; }, ); - notifyTransition(ws, transactionId, transitionInfo); + notifyTransition(wex, transactionId, transitionInfo); return TaskRunResult.backoff(); } async function longpollKycStatus( - ws: InternalWalletState, + wex: WalletExecutionContext, pursePub: string, exchangeUrl: string, kycInfo: KycPendingInfo, userType: KycUserType, - cancellationToken: CancellationToken, ): Promise { const transactionId = constructTransactionIdentifier({ tag: TransactionType.PeerPullCredit, @@ -488,9 +485,9 @@ async function longpollKycStatus( ); url.searchParams.set("timeout_ms", "10000"); logger.info(`kyc url ${url.href}`); - const kycStatusRes = await ws.http.fetch(url.href, { + const kycStatusRes = await wex.http.fetch(url.href, { method: "GET", - cancellationToken, + cancellationToken: wex.cancellationToken, }); if ( kycStatusRes.status === HttpStatusCode.Ok || @@ -498,7 +495,7 @@ async function longpollKycStatus( // remove after the exchange is fixed or clarified kycStatusRes.status === HttpStatusCode.NoContent ) { - const transitionInfo = await ws.db.runReadWriteTx( + const transitionInfo = await wex.db.runReadWriteTx( ["peerPullCredit"], async (tx) => { const peerIni = await tx.peerPullCredit.get(pursePub); @@ -517,7 +514,7 @@ async function longpollKycStatus( return { oldTxState, newTxState }; }, ); - notifyTransition(ws, transactionId, transitionInfo); + notifyTransition(wex, transactionId, transitionInfo); return TaskRunResult.progress(); } else if (kycStatusRes.status === HttpStatusCode.Accepted) { return TaskRunResult.longpollReturnedPending(); @@ -527,9 +524,8 @@ async function longpollKycStatus( } async function processPeerPullCreditAbortingDeletePurse( - ws: InternalWalletState, + wex: WalletExecutionContext, peerPullIni: PeerPullCreditRecord, - cancellationToken: CancellationToken, ): Promise { const { pursePub, pursePriv } = peerPullIni; const transactionId = constructTransactionIdentifier({ @@ -537,20 +533,20 @@ async function processPeerPullCreditAbortingDeletePurse( pursePub, }); - const sigResp = await ws.cryptoApi.signDeletePurse({ + const sigResp = await wex.cryptoApi.signDeletePurse({ pursePriv, }); const purseUrl = new URL(`purses/${pursePub}`, peerPullIni.exchangeBaseUrl); - const resp = await ws.http.fetch(purseUrl.href, { + const resp = await wex.http.fetch(purseUrl.href, { method: "DELETE", headers: { "taler-purse-signature": sigResp.sig, }, - cancellationToken, + cancellationToken: wex.cancellationToken, }); logger.info(`deleted purse with response status ${resp.status}`); - const transitionInfo = await ws.db.runReadWriteTx( + const transitionInfo = await wex.db.runReadWriteTx( [ "peerPullCredit", "refreshGroups", @@ -576,13 +572,13 @@ async function processPeerPullCreditAbortingDeletePurse( }; }, ); - notifyTransition(ws, transactionId, transitionInfo); + notifyTransition(wex, transactionId, transitionInfo); return TaskRunResult.backoff(); } async function handlePeerPullCreditWithdrawing( - ws: InternalWalletState, + wex: WalletExecutionContext, pullIni: PeerPullCreditRecord, ): Promise { if (!pullIni.withdrawalGroupId) { @@ -594,7 +590,7 @@ async function handlePeerPullCreditWithdrawing( }); const wgId = pullIni.withdrawalGroupId; let finished: boolean = false; - const transitionInfo = await ws.db.runReadWriteTx( + const transitionInfo = await wex.db.runReadWriteTx( ["peerPullCredit", "withdrawalGroups"], async (tx) => { const ppi = await tx.peerPullCredit.get(pullIni.pursePub); @@ -627,7 +623,7 @@ async function handlePeerPullCreditWithdrawing( }; }, ); - notifyTransition(ws, transactionId, transitionInfo); + notifyTransition(wex, transactionId, transitionInfo); if (finished) { return TaskRunResult.finished(); } else { @@ -637,13 +633,12 @@ async function handlePeerPullCreditWithdrawing( } async function handlePeerPullCreditCreatePurse( - ws: InternalWalletState, + wex: WalletExecutionContext, pullIni: PeerPullCreditRecord, - cancellationToken: CancellationToken, ): Promise { const purseFee = Amounts.stringify(Amounts.zeroOfAmount(pullIni.amount)); const pursePub = pullIni.pursePub; - const mergeReserve = await ws.db.runReadOnlyTx(["reserves"], async (tx) => { + const mergeReserve = await wex.db.runReadOnlyTx(["reserves"], async (tx) => { return tx.reserves.get(pullIni.mergeReserveRowId); }); @@ -651,7 +646,7 @@ async function handlePeerPullCreditCreatePurse( throw Error("merge reserve for peer pull payment not found in database"); } - const contractTermsRecord = await ws.db.runReadOnlyTx( + const contractTermsRecord = await wex.db.runReadOnlyTx( ["contractTerms"], async (tx) => { return tx.contractTerms.get(pullIni.contractTermsHash); @@ -669,7 +664,7 @@ async function handlePeerPullCreditCreatePurse( mergeReserve.reservePub, ); - const econtractResp = await ws.cryptoApi.encryptContractForDeposit({ + const econtractResp = await wex.cryptoApi.encryptContractForDeposit({ contractPriv: pullIni.contractPriv, contractPub: pullIni.contractPub, contractTerms: contractTermsRecord.contractTermsRaw, @@ -681,7 +676,7 @@ async function handlePeerPullCreditCreatePurse( const mergeTimestamp = timestampPreciseFromDb(pullIni.mergeTimestamp); const purseExpiration = contractTerms.purse_expiration; - const sigRes = await ws.cryptoApi.signReservePurseCreate({ + const sigRes = await wex.cryptoApi.signReservePurseCreate({ contractTermsHash: pullIni.contractTermsHash, flags: WalletAccountMergeFlags.CreateWithPurseFee, mergePriv: pullIni.mergePriv, @@ -717,22 +712,17 @@ async function handlePeerPullCreditCreatePurse( pullIni.exchangeBaseUrl, ); - const httpResp = await ws.http.fetch(reservePurseMergeUrl.href, { + const httpResp = await wex.http.fetch(reservePurseMergeUrl.href, { method: "POST", body: reservePurseReqBody, - cancellationToken, + cancellationToken: wex.cancellationToken, }); if (httpResp.status === HttpStatusCode.UnavailableForLegalReasons) { const respJson = await httpResp.json(); const kycPending = codecForWalletKycUuid().decode(respJson); logger.info(`kyc uuid response: ${j2s(kycPending)}`); - return processPeerPullCreditKycRequired( - ws, - pullIni, - kycPending, - cancellationToken, - ); + return processPeerPullCreditKycRequired(wex, pullIni, kycPending); } const resp = await readSuccessResponseJsonOrThrow(httpResp, codecForAny()); @@ -744,7 +734,7 @@ async function handlePeerPullCreditCreatePurse( pursePub: pullIni.pursePub, }); - const transitionInfo = await ws.db.runReadWriteTx( + const transitionInfo = await wex.db.runReadWriteTx( ["peerPullCredit"], async (tx) => { const pi2 = await tx.peerPullCredit.get(pursePub); @@ -758,16 +748,15 @@ async function handlePeerPullCreditCreatePurse( return { oldTxState, newTxState }; }, ); - notifyTransition(ws, transactionId, transitionInfo); + notifyTransition(wex, transactionId, transitionInfo); return TaskRunResult.backoff(); } export async function processPeerPullCredit( - ws: InternalWalletState, + wex: WalletExecutionContext, pursePub: string, - cancellationToken: CancellationToken, ): Promise { - const pullIni = await ws.db.runReadOnlyTx(["peerPullCredit"], async (tx) => { + const pullIni = await wex.db.runReadOnlyTx(["peerPullCredit"], async (tx) => { return tx.peerPullCredit.get(pursePub); }); if (!pullIni) { @@ -786,30 +775,25 @@ export async function processPeerPullCredit( return TaskRunResult.finished(); } case PeerPullPaymentCreditStatus.PendingReady: - return queryPurseForPeerPullCredit(ws, pullIni, cancellationToken); + return queryPurseForPeerPullCredit(wex, pullIni); case PeerPullPaymentCreditStatus.PendingMergeKycRequired: { if (!pullIni.kycInfo) { throw Error("invalid state, kycInfo required"); } return await longpollKycStatus( - ws, + wex, pursePub, pullIni.exchangeBaseUrl, pullIni.kycInfo, "individual", - cancellationToken, ); } case PeerPullPaymentCreditStatus.PendingCreatePurse: - return handlePeerPullCreditCreatePurse(ws, pullIni, cancellationToken); + return handlePeerPullCreditCreatePurse(wex, pullIni); case PeerPullPaymentCreditStatus.AbortingDeletePurse: - return await processPeerPullCreditAbortingDeletePurse( - ws, - pullIni, - cancellationToken, - ); + return await processPeerPullCreditAbortingDeletePurse(wex, pullIni); case PeerPullPaymentCreditStatus.PendingWithdrawing: - return handlePeerPullCreditWithdrawing(ws, pullIni); + return handlePeerPullCreditWithdrawing(wex, pullIni); case PeerPullPaymentCreditStatus.Aborted: case PeerPullPaymentCreditStatus.Failed: case PeerPullPaymentCreditStatus.Expired: @@ -827,10 +811,9 @@ export async function processPeerPullCredit( } async function processPeerPullCreditKycRequired( - ws: InternalWalletState, + wex: WalletExecutionContext, peerIni: PeerPullCreditRecord, kycPending: WalletKycUuid, - cancellationToken: CancellationToken, ): Promise { const transactionId = constructTransactionIdentifier({ tag: TransactionType.PeerPullCredit, @@ -845,9 +828,9 @@ async function processPeerPullCreditKycRequired( ); logger.info(`kyc url ${url.href}`); - const kycStatusRes = await ws.http.fetch(url.href, { + const kycStatusRes = await wex.http.fetch(url.href, { method: "GET", - cancellationToken, + cancellationToken: wex.cancellationToken, }); if ( @@ -861,7 +844,7 @@ async function processPeerPullCreditKycRequired( } else if (kycStatusRes.status === HttpStatusCode.Accepted) { const kycStatus = await kycStatusRes.json(); logger.info(`kyc status: ${j2s(kycStatus)}`); - const { transitionInfo, result } = await ws.db.runReadWriteTx( + const { transitionInfo, result } = await wex.db.runReadWriteTx( ["peerPullCredit"], async (tx) => { const peerInc = await tx.peerPullCredit.get(pursePub); @@ -897,7 +880,7 @@ async function processPeerPullCreditKycRequired( }; }, ); - notifyTransition(ws, transactionId, transitionInfo); + notifyTransition(wex, transactionId, transitionInfo); return TaskRunResult.backoff(); } else { throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`); @@ -908,7 +891,7 @@ async function processPeerPullCreditKycRequired( * Check fees and available exchanges for a peer push payment initiation. */ export async function checkPeerPullPaymentInitiation( - ws: InternalWalletState, + wex: WalletExecutionContext, req: CheckPeerPullCreditRequest, ): Promise { // FIXME: We don't support exchanges with purse fees yet. @@ -922,7 +905,7 @@ export async function checkPeerPullPaymentInitiation( if (req.exchangeBaseUrl) { exchangeUrl = req.exchangeBaseUrl; } else { - exchangeUrl = await getPreferredExchangeForCurrency(ws, currency); + exchangeUrl = await getPreferredExchangeForCurrency(wex, currency); } if (!exchangeUrl) { @@ -932,7 +915,7 @@ export async function checkPeerPullPaymentInitiation( logger.trace(`found ${exchangeUrl} as preferred exchange`); const wi = await getExchangeWithdrawalInfo( - ws, + wex, exchangeUrl, Amounts.parseOrThrow(req.amount), undefined, @@ -957,12 +940,12 @@ export async function checkPeerPullPaymentInitiation( * Find a preferred exchange based on when we withdrew last from this exchange. */ async function getPreferredExchangeForCurrency( - ws: InternalWalletState, + wex: WalletExecutionContext, currency: string, ): Promise { // Find an exchange with the matching currency. // Prefer exchanges with the most recent withdrawal. - const url = await ws.db.runReadOnlyTx(["exchanges"], async (tx) => { + const url = await wex.db.runReadOnlyTx(["exchanges"], async (tx) => { const exchanges = await tx.exchanges.iter().toArray(); let candidate = undefined; for (const e of exchanges) { @@ -1005,7 +988,7 @@ async function getPreferredExchangeForCurrency( * Initiate a peer pull payment. */ export async function initiatePeerPullPayment( - ws: InternalWalletState, + wex: WalletExecutionContext, req: InitiatePeerPullCreditRequest, ): Promise { const currency = Amounts.currencyOf(req.partialContractTerms.amount); @@ -1013,7 +996,7 @@ export async function initiatePeerPullPayment( if (req.exchangeBaseUrl) { maybeExchangeBaseUrl = req.exchangeBaseUrl; } else { - maybeExchangeBaseUrl = await getPreferredExchangeForCurrency(ws, currency); + maybeExchangeBaseUrl = await getPreferredExchangeForCurrency(wex, currency); } if (!maybeExchangeBaseUrl) { @@ -1022,20 +1005,20 @@ export async function initiatePeerPullPayment( const exchangeBaseUrl = maybeExchangeBaseUrl; - await fetchFreshExchange(ws, exchangeBaseUrl); + await fetchFreshExchange(wex, exchangeBaseUrl); - const mergeReserveInfo = await getMergeReserveInfo(ws, { + const mergeReserveInfo = await getMergeReserveInfo(wex, { exchangeBaseUrl: exchangeBaseUrl, }); - const pursePair = await ws.cryptoApi.createEddsaKeypair({}); - const mergePair = await ws.cryptoApi.createEddsaKeypair({}); + const pursePair = await wex.cryptoApi.createEddsaKeypair({}); + const mergePair = await wex.cryptoApi.createEddsaKeypair({}); const contractTerms = req.partialContractTerms; const hContractTerms = ContractTermsUtil.hashContractTerms(contractTerms); - const contractKeyPair = await ws.cryptoApi.createEddsaKeypair({}); + const contractKeyPair = await wex.cryptoApi.createEddsaKeypair({}); const withdrawalGroupId = encodeCrock(getRandomBytes(32)); @@ -1045,7 +1028,7 @@ export async function initiatePeerPullPayment( const contractEncNonce = encodeCrock(getRandomBytes(24)); const wi = await getExchangeWithdrawalInfo( - ws, + wex, exchangeBaseUrl, Amounts.parseOrThrow(req.partialContractTerms.amount), undefined, @@ -1053,7 +1036,7 @@ export async function initiatePeerPullPayment( const mergeTimestamp = TalerPreciseTimestamp.now(); - const transitionInfo = await ws.db.runReadWriteTx( + const transitionInfo = await wex.db.runReadWriteTx( ["peerPullCredit", "contractTerms"], async (tx) => { const ppi: PeerPullCreditRecord = { @@ -1086,16 +1069,16 @@ export async function initiatePeerPullPayment( }, ); - const ctx = new PeerPullCreditTransactionContext(ws, pursePair.pub); + const ctx = new PeerPullCreditTransactionContext(wex, pursePair.pub); // The pending-incoming balance has changed. - ws.notify({ + wex.ws.notify({ type: NotificationType.BalanceChange, hintTransactionId: ctx.transactionId, }); - notifyTransition(ws, ctx.transactionId, transitionInfo); - ws.taskScheduler.startShepherdTask(ctx.taskId); + notifyTransition(wex, ctx.transactionId, transitionInfo); + wex.taskScheduler.startShepherdTask(ctx.taskId); return { talerUri: stringifyTalerUri({ -- cgit v1.2.3