diff options
Diffstat (limited to 'packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts')
-rw-r--r-- | packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts | 231 |
1 files changed, 104 insertions, 127 deletions
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts index 2e5af4e78..165c8deee 100644 --- a/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts +++ b/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts @@ -16,6 +16,7 @@ import { Amounts, + CancellationToken, CheckPeerPushDebitRequest, CheckPeerPushDebitResponse, CoinRefreshRequest, @@ -32,6 +33,7 @@ import { TalerProtocolTimestamp, TalerProtocolViolationError, TransactionAction, + TransactionIdStr, TransactionMajorState, TransactionMinorState, TransactionState, @@ -56,7 +58,7 @@ import { timestampProtocolToDb, } from "../index.js"; import { InternalWalletState } from "../internal-wallet-state.js"; -import { PendingTaskType } from "../pending-types.js"; +import { PendingTaskType, TaskId } from "../pending-types.js"; import { assertUnreachable } from "../util/assertUnreachable.js"; import { PeerCoinRepair, selectPeerCoins } from "../util/coinSelection.js"; import { checkLogicInvariant } from "../util/invariants.js"; @@ -65,7 +67,6 @@ import { TaskRunResultType, TransactionContext, constructTaskIdentifier, - runLongpollAsync, spendCoins, } from "./common.js"; import { @@ -76,14 +77,13 @@ import { import { constructTransactionIdentifier, notifyTransition, - stopLongpolling, } from "./transactions.js"; const logger = new Logger("pay-peer-push-debit.ts"); export class PeerPushDebitTransactionContext implements TransactionContext { - public transactionId: string; - public retryTag: string; + readonly transactionId: TransactionIdStr; + readonly retryTag: TaskId; constructor( public ws: InternalWalletState, @@ -114,7 +114,6 @@ export class PeerPushDebitTransactionContext implements TransactionContext { async suspendTransaction(): Promise<void> { const { ws, pursePub, transactionId, retryTag } = this; - stopLongpolling(ws, retryTag); const transitionInfo = await ws.db .mktx((x) => [x.peerPushDebit]) .runReadWrite(async (tx) => { @@ -166,12 +165,12 @@ export class PeerPushDebitTransactionContext implements TransactionContext { } return undefined; }); + ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); } async abortTransaction(): Promise<void> { const { ws, pursePub, transactionId, retryTag } = this; - stopLongpolling(ws, retryTag); const transitionInfo = await ws.db .mktx((x) => [x.peerPushDebit]) .runReadWrite(async (tx) => { @@ -218,12 +217,13 @@ export class PeerPushDebitTransactionContext implements TransactionContext { } return undefined; }); + ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(retryTag); } async resumeTransaction(): Promise<void> { const { ws, pursePub, transactionId, retryTag } = this; - stopLongpolling(ws, retryTag); const transitionInfo = await ws.db .mktx((x) => [x.peerPushDebit]) .runReadWrite(async (tx) => { @@ -275,13 +275,12 @@ export class PeerPushDebitTransactionContext implements TransactionContext { } return undefined; }); - ws.workAvailable.trigger(); + ws.taskScheduler.startShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); } async failTransaction(): Promise<void> { const { ws, pursePub, transactionId, retryTag } = this; - stopLongpolling(ws, retryTag); const transitionInfo = await ws.db .mktx((x) => [x.peerPushDebit]) .runReadWrite(async (tx) => { @@ -328,7 +327,9 @@ export class PeerPushDebitTransactionContext implements TransactionContext { } return undefined; }); + ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(retryTag); } } @@ -432,7 +433,7 @@ async function handlePurseCreationConflict( } await tx.peerPushDebit.put(myPpi); }); - return TaskRunResult.finished(); + return TaskRunResult.progress(); } async function processPeerPushDebitCreateReserve( @@ -554,7 +555,7 @@ async function processPeerPushDebitCreateReserve( stTo: PeerPushDebitStatus.PendingReady, }); - return TaskRunResult.finished(); + return TaskRunResult.backoff(); } async function processPeerPushDebitAbortingDeletePurse( @@ -628,7 +629,7 @@ async function processPeerPushDebitAbortingDeletePurse( }); notifyTransition(ws, transactionId, transitionInfo); - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } interface SimpleTransition { @@ -712,7 +713,7 @@ async function processPeerPushDebitAbortingRefreshDeleted( }); notifyTransition(ws, transactionId, transitionInfo); // FIXME: Shouldn't this be finished in some cases?! - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } async function processPeerPushDebitAbortingRefreshExpired( @@ -760,7 +761,7 @@ async function processPeerPushDebitAbortingRefreshExpired( }); notifyTransition(ws, transactionId, transitionInfo); // FIXME: Shouldn't this be finished in some cases?! - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } /** @@ -769,118 +770,102 @@ async function processPeerPushDebitAbortingRefreshExpired( async function processPeerPushDebitReady( ws: InternalWalletState, peerPushInitiation: PeerPushDebitRecord, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { logger.trace("processing peer-push-debit pending(ready)"); const pursePub = peerPushInitiation.pursePub; - const retryTag = constructTaskIdentifier({ - tag: PendingTaskType.PeerPushDebit, - pursePub, - }); const transactionId = constructTaskIdentifier({ tag: PendingTaskType.PeerPushDebit, pursePub, }); - runLongpollAsync(ws, retryTag, async (ct) => { - const mergeUrl = new URL( - `purses/${pursePub}/merge`, - peerPushInitiation.exchangeBaseUrl, + const mergeUrl = new URL( + `purses/${pursePub}/merge`, + peerPushInitiation.exchangeBaseUrl, + ); + mergeUrl.searchParams.set("timeout_ms", "30000"); + logger.info(`long-polling on purse status at ${mergeUrl.href}`); + const resp = await ws.http.fetch(mergeUrl.href, { + // timeout: getReserveRequestTimeout(withdrawalGroup), + cancellationToken, + }); + if (resp.status === HttpStatusCode.Ok) { + const purseStatus = await readSuccessResponseJsonOrThrow( + resp, + codecForExchangePurseStatus(), ); - mergeUrl.searchParams.set("timeout_ms", "30000"); - logger.info(`long-polling on purse status at ${mergeUrl.href}`); - const resp = await ws.http.fetch(mergeUrl.href, { - // timeout: getReserveRequestTimeout(withdrawalGroup), - cancellationToken: ct, - }); - if (resp.status === HttpStatusCode.Ok) { - const purseStatus = await readSuccessResponseJsonOrThrow( - resp, - codecForExchangePurseStatus(), + const mergeTimestamp = purseStatus.merge_timestamp; + logger.info(`got purse status ${j2s(purseStatus)}`); + if (!mergeTimestamp || TalerProtocolTimestamp.isNever(mergeTimestamp)) { + return TaskRunResult.backoff(); + } else { + await transitionPeerPushDebitTransaction( + ws, + peerPushInitiation.pursePub, + { + stFrom: PeerPushDebitStatus.PendingReady, + stTo: PeerPushDebitStatus.Done, + }, ); - const mergeTimestamp = purseStatus.merge_timestamp; - logger.info(`got purse status ${j2s(purseStatus)}`); - if (!mergeTimestamp || TalerProtocolTimestamp.isNever(mergeTimestamp)) { - return { ready: false }; - } else { - await transitionPeerPushDebitTransaction( + return TaskRunResult.finished(); + } + } else if (resp.status === HttpStatusCode.Gone) { + logger.info(`purse ${pursePub} is gone, aborting peer-push-debit`); + const transitionInfo = await ws.db + .mktx((x) => [ + x.peerPushDebit, + x.refreshGroups, + x.denominations, + x.coinAvailability, + x.coins, + ]) + .runReadWrite(async (tx) => { + const ppiRec = await tx.peerPushDebit.get(pursePub); + if (!ppiRec) { + return undefined; + } + if (ppiRec.status !== PeerPushDebitStatus.PendingReady) { + return undefined; + } + const currency = Amounts.currencyOf(ppiRec.amount); + const oldTxState = computePeerPushDebitTransactionState(ppiRec); + const coinPubs: CoinRefreshRequest[] = []; + + for (let i = 0; i < ppiRec.coinSel.coinPubs.length; i++) { + coinPubs.push({ + amount: ppiRec.coinSel.contributions[i], + coinPub: ppiRec.coinSel.coinPubs[i], + }); + } + + const refresh = await createRefreshGroup( ws, - peerPushInitiation.pursePub, - { - stFrom: PeerPushDebitStatus.PendingReady, - stTo: PeerPushDebitStatus.Done, - }, + tx, + currency, + coinPubs, + RefreshReason.AbortPeerPushDebit, + transactionId, ); + ppiRec.status = PeerPushDebitStatus.AbortingRefreshExpired; + ppiRec.abortRefreshGroupId = refresh.refreshGroupId; + await tx.peerPushDebit.put(ppiRec); + const newTxState = computePeerPushDebitTransactionState(ppiRec); return { - ready: true, + oldTxState, + newTxState, }; - } - } else if (resp.status === HttpStatusCode.Gone) { - logger.info(`purse ${pursePub} is gone, aborting peer-push-debit`); - const transitionInfo = await ws.db - .mktx((x) => [ - x.peerPushDebit, - x.refreshGroups, - x.denominations, - x.coinAvailability, - x.coins, - ]) - .runReadWrite(async (tx) => { - const ppiRec = await tx.peerPushDebit.get(pursePub); - if (!ppiRec) { - return undefined; - } - if (ppiRec.status !== PeerPushDebitStatus.PendingReady) { - return undefined; - } - const currency = Amounts.currencyOf(ppiRec.amount); - const oldTxState = computePeerPushDebitTransactionState(ppiRec); - const coinPubs: CoinRefreshRequest[] = []; - - for (let i = 0; i < ppiRec.coinSel.coinPubs.length; i++) { - coinPubs.push({ - amount: ppiRec.coinSel.contributions[i], - coinPub: ppiRec.coinSel.coinPubs[i], - }); - } - - const refresh = await createRefreshGroup( - ws, - tx, - currency, - coinPubs, - RefreshReason.AbortPeerPushDebit, - transactionId, - ); - ppiRec.status = PeerPushDebitStatus.AbortingRefreshExpired; - ppiRec.abortRefreshGroupId = refresh.refreshGroupId; - await tx.peerPushDebit.put(ppiRec); - const newTxState = computePeerPushDebitTransactionState(ppiRec); - return { - oldTxState, - newTxState, - }; - }); - notifyTransition(ws, transactionId, transitionInfo); - return { - ready: true, - }; - } else { - logger.warn(`unexpected HTTP status for purse: ${resp.status}`); - return { - ready: false, - }; - } - }); - logger.trace( - "returning early from peer-push-debit for long-polling in background", - ); - return { - type: TaskRunResultType.Longpoll, - }; + }); + notifyTransition(ws, transactionId, transitionInfo); + return TaskRunResult.backoff(); + } else { + logger.warn(`unexpected HTTP status for purse: ${resp.status}`); + return TaskRunResult.backoff(); + } } export async function processPeerPushDebit( ws: InternalWalletState, pursePub: string, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { const peerPushInitiation = await ws.db .mktx((x) => [x.peerPushDebit]) @@ -891,24 +876,15 @@ export async function processPeerPushDebit( throw Error("peer push payment not found"); } - const retryTag = constructTaskIdentifier({ - tag: PendingTaskType.PeerPushDebit, - pursePub, - }); - - // We're already running! - if (ws.activeLongpoll[retryTag]) { - logger.info("peer-push-debit task already in long-polling, returning!"); - return { - type: TaskRunResultType.Longpoll, - }; - } - switch (peerPushInitiation.status) { case PeerPushDebitStatus.PendingCreatePurse: return processPeerPushDebitCreateReserve(ws, peerPushInitiation); case PeerPushDebitStatus.PendingReady: - return processPeerPushDebitReady(ws, peerPushInitiation); + return processPeerPushDebitReady( + ws, + peerPushInitiation, + cancellationToken, + ); case PeerPushDebitStatus.AbortingDeletePurse: return processPeerPushDebitAbortingDeletePurse(ws, peerPushInitiation); case PeerPushDebitStatus.AbortingRefreshDeleted: @@ -971,10 +947,9 @@ export async function initiatePeerPushDebit( const pursePub = pursePair.pub; - const transactionId = constructTaskIdentifier({ - tag: PendingTaskType.PeerPushDebit, - pursePub, - }); + const ctx = new PeerPushDebitTransactionContext(ws, pursePub); + + const transactionId = ctx.transactionId; const contractEncNonce = encodeCrock(getRandomBytes(24)); @@ -1044,6 +1019,8 @@ export async function initiatePeerPushDebit( hintTransactionId: transactionId, }); + ws.taskScheduler.startShepherdTask(ctx.retryTag); + return { contractPriv: contractKeyPair.priv, mergePriv: mergePair.priv, |