diff options
Diffstat (limited to 'packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts')
-rw-r--r-- | packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts | 167 |
1 files changed, 68 insertions, 99 deletions
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts index e655eba4b..cc41abde9 100644 --- a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts +++ b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts @@ -33,6 +33,7 @@ import { TalerProtocolTimestamp, TalerUriAction, TransactionAction, + TransactionIdStr, TransactionMajorState, TransactionMinorState, TransactionState, @@ -62,17 +63,15 @@ import { timestampPreciseToDb, } from "../index.js"; import { InternalWalletState } from "../internal-wallet-state.js"; -import { PendingTaskType } from "../pending-types.js"; +import { PendingTaskType, TaskId } from "../pending-types.js"; import { assertUnreachable } from "../util/assertUnreachable.js"; import { checkDbInvariant } from "../util/invariants.js"; import { - LongpollResult, TaskRunResult, TaskRunResultType, TombstoneTag, TransactionContext, constructTaskIdentifier, - runLongpollAsync, } from "./common.js"; import { codecForExchangePurseStatus, @@ -81,7 +80,6 @@ import { import { constructTransactionIdentifier, notifyTransition, - stopLongpolling, } from "./transactions.js"; import { getExchangeWithdrawalInfo, @@ -91,8 +89,8 @@ import { const logger = new Logger("pay-peer-pull-credit.ts"); export class PeerPullCreditTransactionContext implements TransactionContext { - private transactionId: string; - private retryTag: string; + readonly transactionId: TransactionIdStr; + readonly retryTag: TaskId; constructor( public ws: InternalWalletState, @@ -139,7 +137,6 @@ export class PeerPullCreditTransactionContext implements TransactionContext { async suspendTransaction(): Promise<void> { const { ws, pursePub, retryTag, transactionId } = this; - stopLongpolling(ws, retryTag); const transitionInfo = await ws.db .mktx((x) => [x.peerPullCredit]) .runReadWrite(async (tx) => { @@ -193,12 +190,12 @@ export class PeerPullCreditTransactionContext implements TransactionContext { } return undefined; }); + ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); } async failTransaction(): Promise<void> { const { ws, pursePub, retryTag, transactionId } = this; - stopLongpolling(ws, retryTag); const transitionInfo = await ws.db .mktx((x) => [x.peerPullCredit]) .runReadWrite(async (tx) => { @@ -244,11 +241,11 @@ export class PeerPullCreditTransactionContext implements TransactionContext { return undefined; }); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.stopShepherdTask(retryTag); } async resumeTransaction(): Promise<void> { const { ws, pursePub, retryTag, transactionId } = this; - stopLongpolling(ws, retryTag); const transitionInfo = await ws.db .mktx((x) => [x.peerPullCredit]) .runReadWrite(async (tx) => { @@ -301,13 +298,12 @@ export class PeerPullCreditTransactionContext implements TransactionContext { } return undefined; }); - ws.workAvailable.trigger(); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(retryTag); } async abortTransaction(): Promise<void> { const { ws, pursePub, retryTag, transactionId } = this; - stopLongpolling(ws, retryTag); const transitionInfo = await ws.db .mktx((x) => [x.peerPullCredit]) .runReadWrite(async (tx) => { @@ -355,7 +351,9 @@ export class PeerPullCreditTransactionContext implements TransactionContext { } return undefined; }); + ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(retryTag); } } @@ -363,7 +361,7 @@ async function queryPurseForPeerPullCredit( ws: InternalWalletState, pullIni: PeerPullCreditRecord, cancellationToken: CancellationToken, -): Promise<LongpollResult> { +): Promise<TaskRunResult> { const purseDepositUrl = new URL( `purses/${pullIni.pursePub}/deposit`, pullIni.exchangeBaseUrl, @@ -401,10 +399,10 @@ async function queryPurseForPeerPullCredit( return { oldTxState, newTxState }; }); notifyTransition(ws, transactionId, transitionInfo); - return { ready: true }; + return TaskRunResult.backoff(); } case HttpStatusCode.NotFound: - return { ready: false }; + return TaskRunResult.backoff(); } const result = await readSuccessResponseJsonOrThrow( @@ -418,7 +416,7 @@ async function queryPurseForPeerPullCredit( if (!depositTimestamp || TalerProtocolTimestamp.isNever(depositTimestamp)) { logger.info("purse not ready yet (no deposit)"); - return { ready: false }; + return TaskRunResult.backoff(); } const reserve = await ws.db @@ -462,9 +460,7 @@ async function queryPurseForPeerPullCredit( return { oldTxState, newTxState }; }); notifyTransition(ws, transactionId, transitionInfo); - return { - ready: true, - }; + return TaskRunResult.backoff(); } async function longpollKycStatus( @@ -473,6 +469,7 @@ async function longpollKycStatus( exchangeUrl: string, kycInfo: KycPendingInfo, userType: KycUserType, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { const transactionId = constructTransactionIdentifier({ tag: TransactionType.PeerPullCredit, @@ -483,56 +480,47 @@ async function longpollKycStatus( pursePub, }); - runLongpollAsync(ws, retryTag, async (ct) => { - const url = new URL( - `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`, - exchangeUrl, - ); - url.searchParams.set("timeout_ms", "10000"); - logger.info(`kyc url ${url.href}`); - const kycStatusRes = await ws.http.fetch(url.href, { - method: "GET", - cancellationToken: ct, - }); - if ( - kycStatusRes.status === HttpStatusCode.Ok || - //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge - // remove after the exchange is fixed or clarified - kycStatusRes.status === HttpStatusCode.NoContent - ) { - const transitionInfo = await ws.db - .mktx((x) => [x.peerPullCredit]) - .runReadWrite(async (tx) => { - const peerIni = await tx.peerPullCredit.get(pursePub); - if (!peerIni) { - return; - } - if ( - peerIni.status !== - PeerPullPaymentCreditStatus.PendingMergeKycRequired - ) { - return; - } - const oldTxState = computePeerPullCreditTransactionState(peerIni); - peerIni.status = PeerPullPaymentCreditStatus.PendingCreatePurse; - const newTxState = computePeerPullCreditTransactionState(peerIni); - await tx.peerPullCredit.put(peerIni); - return { oldTxState, newTxState }; - }); - notifyTransition(ws, transactionId, transitionInfo); - return { ready: true }; - } else if (kycStatusRes.status === HttpStatusCode.Accepted) { - // FIXME: Do we have to update the URL here? - return { ready: false }; - } else { - throw Error( - `unexpected response from kyc-check (${kycStatusRes.status})`, - ); - } + const url = new URL( + `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`, + exchangeUrl, + ); + url.searchParams.set("timeout_ms", "10000"); + logger.info(`kyc url ${url.href}`); + const kycStatusRes = await ws.http.fetch(url.href, { + method: "GET", + cancellationToken, }); - return { - type: TaskRunResultType.Longpoll, - }; + if ( + kycStatusRes.status === HttpStatusCode.Ok || + //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge + // remove after the exchange is fixed or clarified + kycStatusRes.status === HttpStatusCode.NoContent + ) { + const transitionInfo = await ws.db + .mktx((x) => [x.peerPullCredit]) + .runReadWrite(async (tx) => { + const peerIni = await tx.peerPullCredit.get(pursePub); + if (!peerIni) { + return; + } + if ( + peerIni.status !== PeerPullPaymentCreditStatus.PendingMergeKycRequired + ) { + return; + } + const oldTxState = computePeerPullCreditTransactionState(peerIni); + peerIni.status = PeerPullPaymentCreditStatus.PendingCreatePurse; + const newTxState = computePeerPullCreditTransactionState(peerIni); + await tx.peerPullCredit.put(peerIni); + return { oldTxState, newTxState }; + }); + notifyTransition(ws, transactionId, transitionInfo); + } else if (kycStatusRes.status === HttpStatusCode.Accepted) { + // FIXME: Do we have to update the URL here? + } else { + throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`); + } + return TaskRunResult.backoff(); } async function processPeerPullCreditAbortingDeletePurse( @@ -584,7 +572,7 @@ async function processPeerPullCreditAbortingDeletePurse( }); notifyTransition(ws, transactionId, transitionInfo); - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } async function handlePeerPullCreditWithdrawing( @@ -637,7 +625,7 @@ async function handlePeerPullCreditWithdrawing( return TaskRunResult.finished(); } else { // FIXME: Return indicator that we depend on the other operation! - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } } @@ -757,13 +745,13 @@ async function handlePeerPullCreditCreatePurse( return { oldTxState, newTxState }; }); notifyTransition(ws, transactionId, transitionInfo); - - return TaskRunResult.finished(); + return TaskRunResult.backoff(); } export async function processPeerPullCredit( ws: InternalWalletState, pursePub: string, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { const pullIni = await ws.db .mktx((x) => [x.peerPullCredit]) @@ -779,14 +767,6 @@ export async function processPeerPullCredit( pursePub, }); - // We're already running! - if (ws.activeLongpoll[retryTag]) { - logger.info("peer-pull-credit already in long-polling, returning!"); - return { - type: TaskRunResultType.Longpoll, - }; - } - logger.trace(`processing ${retryTag}, status=${pullIni.status}`); switch (pullIni.status) { @@ -794,15 +774,7 @@ export async function processPeerPullCredit( return TaskRunResult.finished(); } case PeerPullPaymentCreditStatus.PendingReady: - runLongpollAsync(ws, retryTag, async (cancellationToken) => - queryPurseForPeerPullCredit(ws, pullIni, cancellationToken), - ); - logger.trace( - "returning early from processPeerPullCredit for long-polling in background", - ); - return { - type: TaskRunResultType.Longpoll, - }; + return queryPurseForPeerPullCredit(ws, pullIni, cancellationToken); case PeerPullPaymentCreditStatus.PendingMergeKycRequired: { if (!pullIni.kycInfo) { throw Error("invalid state, kycInfo required"); @@ -813,6 +785,7 @@ export async function processPeerPullCredit( pullIni.exchangeBaseUrl, pullIni.kycInfo, "individual", + cancellationToken, ); } case PeerPullPaymentCreditStatus.PendingCreatePurse: @@ -866,7 +839,7 @@ async function processPeerPullCreditKycRequired( kycStatusRes.status === HttpStatusCode.NoContent ) { logger.warn("kyc requested, but already fulfilled"); - return TaskRunResult.finished(); + return TaskRunResult.backoff(); } else if (kycStatusRes.status === HttpStatusCode.Accepted) { const kycStatus = await kycStatusRes.json(); logger.info(`kyc status: ${j2s(kycStatus)}`); @@ -906,7 +879,7 @@ async function processPeerPullCreditKycRequired( }; }); notifyTransition(ws, transactionId, transitionInfo); - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } else { throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`); } @@ -1095,20 +1068,16 @@ export async function initiatePeerPullPayment( return { oldTxState, newTxState }; }); - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.PeerPullCredit, - pursePub: pursePair.pub, - }); + const ctx = new PeerPullCreditTransactionContext(ws, pursePair.pub); // The pending-incoming balance has changed. ws.notify({ type: NotificationType.BalanceChange, - hintTransactionId: transactionId, + hintTransactionId: ctx.transactionId, }); - notifyTransition(ws, transactionId, transitionInfo); - - ws.workAvailable.trigger(); + notifyTransition(ws, ctx.transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(ctx.retryTag); return { talerUri: stringifyTalerUri({ @@ -1116,7 +1085,7 @@ export async function initiatePeerPullPayment( exchangeBaseUrl: exchangeBaseUrl, contractPriv: contractKeyPair.priv, }), - transactionId, + transactionId: ctx.transactionId, }; } |