diff options
Diffstat (limited to 'packages/taler-wallet-core/src/operations/pay-merchant.ts')
-rw-r--r-- | packages/taler-wallet-core/src/operations/pay-merchant.ts | 552 |
1 files changed, 309 insertions, 243 deletions
diff --git a/packages/taler-wallet-core/src/operations/pay-merchant.ts b/packages/taler-wallet-core/src/operations/pay-merchant.ts index 52f9c70b1..e00432bd0 100644 --- a/packages/taler-wallet-core/src/operations/pay-merchant.ts +++ b/packages/taler-wallet-core/src/operations/pay-merchant.ts @@ -77,6 +77,7 @@ import { TalerProtocolViolationError, TalerUriAction, TransactionAction, + TransactionIdStr, TransactionMajorState, TransactionMinorState, TransactionState, @@ -102,12 +103,14 @@ import { WalletStoresV1, } from "../db.js"; import { + AsyncFlag, getCandidateWithdrawalDenomsTx, PendingTaskType, RefundGroupRecord, RefundGroupStatus, RefundItemRecord, RefundItemStatus, + TaskId, timestampPreciseToDb, timestampProtocolFromDb, timestampProtocolToDb, @@ -128,8 +131,6 @@ import { import { constructTaskIdentifier, DbRetryInfo, - runLongpollAsync, - runTaskWithErrorReporting, spendCoins, TaskIdentifiers, TaskRunResult, @@ -147,7 +148,6 @@ import { constructTransactionIdentifier, notifyTransition, parseTransactionIdentifier, - stopLongpolling, } from "./transactions.js"; /** @@ -156,8 +156,8 @@ import { const logger = new Logger("pay-merchant.ts"); export class PayMerchantTransactionContext implements TransactionContext { - private transactionId: string; - private retryTag: string; + readonly transactionId: TransactionIdStr; + readonly taskId: TaskId; constructor( public ws: InternalWalletState, @@ -167,7 +167,7 @@ export class PayMerchantTransactionContext implements TransactionContext { tag: TransactionType.Payment, proposalId, }); - this.retryTag = constructTaskIdentifier({ + this.taskId = constructTaskIdentifier({ tag: PendingTaskType.Purchase, proposalId, }); @@ -252,7 +252,7 @@ export class PayMerchantTransactionContext implements TransactionContext { async suspendTransaction(): Promise<void> { const { ws, proposalId, transactionId } = this; - stopLongpolling(ws, this.retryTag); + ws.taskScheduler.stopShepherdTask(this.taskId); const transitionInfo = await ws.db .mktx((x) => [x.purchases]) .runReadWrite(async (tx) => { @@ -270,7 +270,6 @@ export class PayMerchantTransactionContext implements TransactionContext { return { oldTxState, newTxState }; }); notifyTransition(ws, transactionId, transitionInfo); - ws.workAvailable.trigger(); } async abortTransaction(): Promise<void> { @@ -330,18 +329,18 @@ export class PayMerchantTransactionContext implements TransactionContext { break; } await tx.purchases.put(purchase); - await tx.operationRetries.delete(this.retryTag); + await tx.operationRetries.delete(this.taskId); const newTxState = computePayMerchantTransactionState(purchase); return { oldTxState, newTxState }; }, ); + ws.taskScheduler.stopShepherdTask(this.taskId); notifyTransition(ws, transactionId, transitionInfo); - ws.workAvailable.trigger(); + ws.taskScheduler.startShepherdTask(this.taskId); } async resumeTransaction(): Promise<void> { - const { ws, proposalId, transactionId, retryTag } = this; - stopLongpolling(ws, retryTag); + const { ws, proposalId, transactionId, taskId: retryTag } = this; const transitionInfo = await ws.db .mktx((x) => [x.purchases]) .runReadWrite(async (tx) => { @@ -358,9 +357,8 @@ export class PayMerchantTransactionContext implements TransactionContext { const newTxState = computePayMerchantTransactionState(purchase); return { oldTxState, newTxState }; }); - ws.workAvailable.trigger(); notifyTransition(ws, transactionId, transitionInfo); - ws.workAvailable.trigger(); + ws.taskScheduler.startShepherdTask(this.taskId); } async failTransaction(): Promise<void> { @@ -394,7 +392,7 @@ export class PayMerchantTransactionContext implements TransactionContext { return { oldTxState, newTxState }; }); notifyTransition(ws, transactionId, transitionInfo); - ws.workAvailable.trigger(); + ws.taskScheduler.stopShepherdTask(this.taskId); } } @@ -638,14 +636,18 @@ async function processDownloadProposal( return TaskRunResult.finished(); } + const ctx = new PayMerchantTransactionContext(ws, proposalId); + if (proposal.purchaseStatus != PurchaseStatus.PendingDownloadingProposal) { + logger.error( + `unexpected state ${proposal.purchaseStatus}/${ + PurchaseStatus[proposal.purchaseStatus] + } for ${ctx.transactionId} in processDownloadProposal`, + ); return TaskRunResult.finished(); } - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Payment, - proposalId, - }); + const transactionId = ctx.transactionId; const orderClaimUrl = new URL( `orders/${proposal.orderId}/claim`, @@ -857,7 +859,7 @@ async function processDownloadProposal( notifyTransition(ws, transactionId, transitionInfo); - return TaskRunResult.finished(); + return TaskRunResult.progress(); } /** @@ -865,7 +867,7 @@ async function processDownloadProposal( * record for the provided arguments already exists, * return the old proposal ID. */ -async function createPurchase( +async function createOrReusePurchase( ws: InternalWalletState, merchantBaseUrl: string, orderId: string, @@ -889,23 +891,26 @@ async function createPurchase( p.claimToken === claimToken ); }); - /* If we have already claimed this proposal with the same sessionId - * nonce and claim token, reuse it. */ + // If we have already claimed this proposal with the same sessionId + // nonce and claim token, reuse it. */ if ( oldProposal && oldProposal.downloadSessionId === sessionId && (!noncePriv || oldProposal.noncePriv === noncePriv) && oldProposal.claimToken === claimToken ) { - // FIXME: This lacks proper error handling - await processDownloadProposal(ws, oldProposal.proposalId); - + logger.info( + `Found old proposal (status=${ + PurchaseStatus[oldProposal.purchaseStatus] + }) for order ${orderId} at ${merchantBaseUrl}`, + ); if (oldProposal.purchaseStatus === PurchaseStatus.DialogShared) { const download = await expectProposalDownload(ws, oldProposal); const paid = await checkIfOrderIsAlreadyPaid(ws, download.contractData); + logger.info(`old proposal paid: ${paid}`); if (paid) { - //if this transaction was shared and the order is paid then it - //means that another wallet already paid the proposal + // if this transaction was shared and the order is paid then it + // means that another wallet already paid the proposal const transitionInfo = await ws.db .mktx((x) => [x.purchases]) .runReadWrite(async (tx) => { @@ -990,8 +995,6 @@ async function createPurchase( proposalId, }); notifyTransition(ws, transactionId, transitionInfo); - - await processDownloadProposal(ws, proposalId); return proposalId; } @@ -1244,11 +1247,10 @@ async function handleInsufficientFunds( }); } -// FIXME: Should probably not be exported in its current state // FIXME: Should take a transaction ID instead of a proposal ID // FIXME: Does way more than checking the payment // FIXME: Should return immediately. -export async function checkPaymentByProposalId( +async function checkPaymentByProposalId( ws: InternalWalletState, proposalId: string, sessionId?: string, @@ -1284,10 +1286,9 @@ export async function checkPaymentByProposalId( proposalId = proposal.proposalId; - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Payment, - proposalId, - }); + const ctx = new PayMerchantTransactionContext(ws, proposalId); + + const transactionId = ctx.transactionId; const talerUri = stringifyTalerUri({ type: TalerUriAction.Pay, @@ -1377,12 +1378,12 @@ export async function checkPaymentByProposalId( return { oldTxState, newTxState }; }); notifyTransition(ws, transactionId, transitionInfo); - // FIXME: What about error handling?! This doesn't properly store errors in the DB. - const r = await processPurchasePay(ws, proposalId); - if (r.type !== TaskRunResultType.Finished) { - // FIXME: This does not surface the original error - throw Error("submitting pay failed"); - } + ws.taskScheduler.startShepherdTask(ctx.taskId); + + // FIXME: Consider changing the API here so that we don't have to + // wait inline for the repurchase. + + await waitPaymentResult(ws, proposalId, sessionId); const download = await expectProposalDownload(ws, purchase); return { status: PreparePayResultType.AlreadyConfirmed, @@ -1476,7 +1477,7 @@ export async function preparePayForUri( ); } - const proposalId = await createPurchase( + const proposalId = await createOrReusePurchase( ws, uriResult.merchantBaseUrl, uriResult.orderId, @@ -1485,9 +1486,79 @@ export async function preparePayForUri( uriResult.noncePriv, ); + await waitProposalDownloaded(ws, proposalId); + return checkPaymentByProposalId(ws, proposalId, uriResult.sessionId); } +/** + * Wait until a proposal is at least downloaded. + */ +async function waitProposalDownloaded( + ws: InternalWalletState, + proposalId: string, +): Promise<void> { + const ctx = new PayMerchantTransactionContext(ws, proposalId); + + logger.info(`waiting for ${ctx.transactionId} to be downloaded`); + + ws.taskScheduler.startShepherdTask(ctx.taskId); + + // FIXME: We should use Symbol.dispose magic here for cleanup! + + const payNotifFlag = new AsyncFlag(); + // Raise exchangeNotifFlag whenever we get a notification + // about our exchange. + const cancelNotif = ws.addNotificationListener((notif) => { + if ( + notif.type === NotificationType.TransactionStateTransition && + notif.transactionId === ctx.transactionId + ) { + logger.info(`raising update notification: ${j2s(notif)}`); + payNotifFlag.raise(); + } + }); + + try { + await internalWaitProposalDownloaded(ctx, payNotifFlag); + logger.info(`done waiting for ${ctx.transactionId} to be downloaded`); + } finally { + cancelNotif(); + } +} + +async function internalWaitProposalDownloaded( + ctx: PayMerchantTransactionContext, + payNotifFlag: AsyncFlag, +): Promise<void> { + while (true) { + const { purchase, retryInfo } = await ctx.ws.db.runReadOnlyTx( + ["purchases", "operationRetries"], + async (tx) => { + return { + purchase: await tx.purchases.get(ctx.proposalId), + retryInfo: await tx.operationRetries.get(ctx.taskId), + }; + }, + ); + if (!purchase) { + throw Error("purchase does not exist anymore"); + } + if (purchase.download) { + return; + } + if (retryInfo) { + if (retryInfo.lastError) { + throw TalerError.fromUncheckedDetail(retryInfo.lastError); + } else { + throw Error("transient error while waiting for proposal download"); + } + } + await payNotifFlag.wait(); + payNotifFlag.reset(); + } +} + export async function preparePayForTemplate( ws: InternalWalletState, req: PreparePayTemplateRequest, @@ -1598,71 +1669,101 @@ export async function generateDepositPermissions( return depositPermissions; } -/** - * Run the operation handler for a payment - * and return the result as a {@link ConfirmPayResult}. - */ -async function runPayForConfirmPay( - ws: InternalWalletState, - proposalId: string, +async function internalWaitPaymentResult( + ctx: PayMerchantTransactionContext, + purchaseNotifFlag: AsyncFlag, + waitSessionId?: string, ): Promise<ConfirmPayResult> { - logger.trace("processing proposal for confirmPay"); - const taskId = constructTaskIdentifier({ - tag: PendingTaskType.Purchase, - proposalId, - }); - const res = await runTaskWithErrorReporting(ws, taskId, async () => { - return await processPurchasePay(ws, proposalId); - }); - logger.trace(`processPurchasePay response type ${res.type}`); - switch (res.type) { - case TaskRunResultType.Finished: { - const purchase = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { - return tx.purchases.get(proposalId); - }); - if (!purchase) { - throw Error("purchase record not available anymore"); + while (true) { + const txRes = await ctx.ws.db.runReadOnlyTx( + ["purchases", "operationRetries"], + async (tx) => { + const purchase = await tx.purchases.get(ctx.proposalId); + const retryRecord = await tx.operationRetries.get(ctx.taskId); + return { purchase, retryRecord }; + }, + ); + + if (!txRes.purchase) { + throw Error("purchase gone"); + } + + const purchase = txRes.purchase; + + logger.info( + `purchase is in state ${PurchaseStatus[purchase.purchaseStatus]}`, + ); + + const d = await expectProposalDownload(ctx.ws, purchase); + + if (txRes.purchase.timestampFirstSuccessfulPay) { + if ( + waitSessionId == null || + txRes.purchase.lastSessionId === waitSessionId + ) { + return { + type: ConfirmPayResultType.Done, + contractTerms: d.contractTermsRaw, + transactionId: ctx.transactionId, + }; } - const d = await expectProposalDownload(ws, purchase); - return { - type: ConfirmPayResultType.Done, - contractTerms: d.contractTermsRaw, - transactionId: constructTransactionIdentifier({ - tag: TransactionType.Payment, - proposalId, - }), - }; } - case TaskRunResultType.Error: { - // We hide transient errors from the caller. - const opRetry = await ws.db - .mktx((x) => [x.operationRetries]) - .runReadOnly(async (tx) => tx.operationRetries.get(taskId)); + + if (txRes.retryRecord) { return { type: ConfirmPayResultType.Pending, - lastError: opRetry?.lastError, - transactionId: constructTransactionIdentifier({ - tag: TransactionType.Payment, - proposalId, - }), + lastError: txRes.retryRecord.lastError, + transactionId: ctx.transactionId, }; } - case TaskRunResultType.Pending: - logger.trace("reporting pending as confirmPay response"); - return { - type: ConfirmPayResultType.Pending, - transactionId: constructTransactionIdentifier({ - tag: TransactionType.Payment, - proposalId, - }), - lastError: undefined, - }; - case TaskRunResultType.Longpoll: - throw Error("unexpected processPurchasePay result (longpoll)"); - default: - assertUnreachable(res); + + await purchaseNotifFlag.wait(); + purchaseNotifFlag.reset(); + } +} + +/** + * Wait until either: + * a) the payment succeeded (if provided under the {@param waitSessionId}), or + * b) the attempt to pay failed (merchant unavailable, etc.) + */ +async function waitPaymentResult( + ws: InternalWalletState, + proposalId: string, + waitSessionId?: string, +): Promise<ConfirmPayResult> { + const ctx = new PayMerchantTransactionContext(ws, proposalId); + + ws.ensureTaskLoopRunning(); + + ws.taskScheduler.startShepherdTask(ctx.taskId); + + // FIXME: Clean up using the new JS "using" / Symbol.dispose syntax. + const purchaseNotifFlag = new AsyncFlag(); + // Raise purchaseNotifFlag whenever we get a notification + // about our purchase. + const cancelNotif = ws.addNotificationListener((notif) => { + if ( + notif.type === NotificationType.TransactionStateTransition && + notif.transactionId === ctx.transactionId + ) { + purchaseNotifFlag.raise(); + } + }); + + try { + logger.info(`waiting for first payment success on ${ctx.transactionId}`); + const res = await internalWaitPaymentResult( + ctx, + purchaseNotifFlag, + waitSessionId, + ); + logger.info( + `done waiting for first payment success on ${ctx.transactionId}, result ${res.type}`, + ); + return res; + } finally { + cancelNotif(); } } @@ -1719,7 +1820,12 @@ export async function confirmPay( if (existingPurchase && existingPurchase.payInfo) { logger.trace("confirmPay: submitting payment for existing purchase"); - return runPayForConfirmPay(ws, proposalId); + const ctx = new PayMerchantTransactionContext( + ws, + existingPurchase.proposalId, + ); + await ws.taskScheduler.resetTaskRetries(ctx.taskId); + return waitPaymentResult(ws, proposalId); } logger.trace("confirmPay: purchase record does not exist yet"); @@ -1817,9 +1923,8 @@ export async function confirmPay( hintTransactionId: transactionId, }); - // We directly make a first attempt to pay. - // FIXME: In the future we should just wait for the right event - return runPayForConfirmPay(ws, proposalId); + // Wait until we have completed the first attempt to pay. + return waitPaymentResult(ws, proposalId); } export async function processPurchase( @@ -2017,7 +2122,7 @@ async function processPurchasePay( // FIXME: Should we really consider this to be pending? - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } } @@ -2076,7 +2181,7 @@ async function processPurchasePay( await storePayReplaySuccess(ws, proposalId, sessionId); } - return TaskRunResult.finished(); + return TaskRunResult.progress(); } export async function refuseProposal( @@ -2365,7 +2470,7 @@ export async function sharePayment( p.purchaseStatus !== PurchaseStatus.DialogProposed && p.purchaseStatus !== PurchaseStatus.DialogShared ) { - //FIXME: purchase can be shared before being paid + // FIXME: purchase can be shared before being paid return undefined; } if (p.purchaseStatus === PurchaseStatus.DialogProposed) { @@ -2426,57 +2531,37 @@ async function processPurchaseDialogShared( ): Promise<TaskRunResult> { const proposalId = purchase.proposalId; logger.trace(`processing dialog-shared for proposal ${proposalId}`); - - const taskId = constructTaskIdentifier({ - tag: PendingTaskType.Purchase, - proposalId, - }); - - // FIXME: Put this logic into runLongpollAsync? - if (ws.activeLongpoll[taskId]) { - return TaskRunResult.longpoll(); - } const download = await expectProposalDownload(ws, purchase); if (purchase.purchaseStatus !== PurchaseStatus.DialogShared) { return TaskRunResult.finished(); } - runLongpollAsync(ws, taskId, async (ct) => { - const paid = await checkIfOrderIsAlreadyPaid(ws, download.contractData); - if (paid) { - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { - const p = await tx.purchases.get(purchase.proposalId); - if (!p) { - logger.warn("purchase does not exist anymore"); - return; - } - const oldTxState = computePayMerchantTransactionState(p); - p.purchaseStatus = PurchaseStatus.FailedClaim; - const newTxState = computePayMerchantTransactionState(p); - await tx.purchases.put(p); - return { oldTxState, newTxState }; - }); - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Payment, - proposalId, + const paid = await checkIfOrderIsAlreadyPaid(ws, download.contractData); + if (paid) { + const transitionInfo = await ws.db + .mktx((x) => [x.purchases]) + .runReadWrite(async (tx) => { + const p = await tx.purchases.get(purchase.proposalId); + if (!p) { + logger.warn("purchase does not exist anymore"); + return; + } + const oldTxState = computePayMerchantTransactionState(p); + p.purchaseStatus = PurchaseStatus.FailedClaim; + const newTxState = computePayMerchantTransactionState(p); + await tx.purchases.put(p); + return { oldTxState, newTxState }; }); + const transactionId = constructTransactionIdentifier({ + tag: TransactionType.Payment, + proposalId, + }); - notifyTransition(ws, transactionId, transitionInfo); - - return { - ready: true, - }; - } - - return { - ready: false, - }; - }); + notifyTransition(ws, transactionId, transitionInfo); + } - return TaskRunResult.longpoll(); + return TaskRunResult.backoff(); } async function processPurchaseAutoRefund( @@ -2496,97 +2581,81 @@ async function processPurchaseAutoRefund( proposalId, }); - // FIXME: Put this logic into runLongpollAsync? - if (ws.activeLongpoll[taskId]) { - return TaskRunResult.longpoll(); - } - const download = await expectProposalDownload(ws, purchase); - runLongpollAsync(ws, taskId, async (ct) => { - if ( - !purchase.autoRefundDeadline || - AbsoluteTime.isExpired( - AbsoluteTime.fromProtocolTimestamp( - timestampProtocolFromDb(purchase.autoRefundDeadline), - ), - ) - ) { - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { - const p = await tx.purchases.get(purchase.proposalId); - if (!p) { - logger.warn("purchase does not exist anymore"); - return; - } - if (p.purchaseStatus !== PurchaseStatus.PendingQueryingRefund) { - return; - } - const oldTxState = computePayMerchantTransactionState(p); - p.purchaseStatus = PurchaseStatus.Done; - p.refundAmountAwaiting = undefined; - const newTxState = computePayMerchantTransactionState(p); - await tx.purchases.put(p); - return { oldTxState, newTxState }; - }); - notifyTransition(ws, transactionId, transitionInfo); - return { - ready: true, - }; - } + if ( + !purchase.autoRefundDeadline || + AbsoluteTime.isExpired( + AbsoluteTime.fromProtocolTimestamp( + timestampProtocolFromDb(purchase.autoRefundDeadline), + ), + ) + ) { + const transitionInfo = await ws.db + .mktx((x) => [x.purchases]) + .runReadWrite(async (tx) => { + const p = await tx.purchases.get(purchase.proposalId); + if (!p) { + logger.warn("purchase does not exist anymore"); + return; + } + if (p.purchaseStatus !== PurchaseStatus.PendingQueryingRefund) { + return; + } + const oldTxState = computePayMerchantTransactionState(p); + p.purchaseStatus = PurchaseStatus.Done; + p.refundAmountAwaiting = undefined; + const newTxState = computePayMerchantTransactionState(p); + await tx.purchases.put(p); + return { oldTxState, newTxState }; + }); + notifyTransition(ws, transactionId, transitionInfo); + return TaskRunResult.finished(); + } - const requestUrl = new URL( - `orders/${download.contractData.orderId}`, - download.contractData.merchantBaseUrl, - ); - requestUrl.searchParams.set( - "h_contract", - download.contractData.contractTermsHash, - ); + const requestUrl = new URL( + `orders/${download.contractData.orderId}`, + download.contractData.merchantBaseUrl, + ); + requestUrl.searchParams.set( + "h_contract", + download.contractData.contractTermsHash, + ); - requestUrl.searchParams.set("timeout_ms", "1000"); - requestUrl.searchParams.set("await_refund_obtained", "yes"); + requestUrl.searchParams.set("timeout_ms", "1000"); + requestUrl.searchParams.set("await_refund_obtained", "yes"); - const resp = await ws.http.fetch(requestUrl.href); + const resp = await ws.http.fetch(requestUrl.href); - // FIXME: Check other status codes! + // FIXME: Check other status codes! - const orderStatus = await readSuccessResponseJsonOrThrow( - resp, - codecForMerchantOrderStatusPaid(), - ); + const orderStatus = await readSuccessResponseJsonOrThrow( + resp, + codecForMerchantOrderStatusPaid(), + ); - if (orderStatus.refund_pending) { - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { - const p = await tx.purchases.get(purchase.proposalId); - if (!p) { - logger.warn("purchase does not exist anymore"); - return; - } - if (p.purchaseStatus !== PurchaseStatus.PendingQueryingAutoRefund) { - return; - } - const oldTxState = computePayMerchantTransactionState(p); - p.purchaseStatus = PurchaseStatus.PendingAcceptRefund; - const newTxState = computePayMerchantTransactionState(p); - await tx.purchases.put(p); - return { oldTxState, newTxState }; - }); - notifyTransition(ws, transactionId, transitionInfo); - return { - ready: true, - }; - } else { - return { - ready: false, - }; - } - }); + if (orderStatus.refund_pending) { + const transitionInfo = await ws.db + .mktx((x) => [x.purchases]) + .runReadWrite(async (tx) => { + const p = await tx.purchases.get(purchase.proposalId); + if (!p) { + logger.warn("purchase does not exist anymore"); + return; + } + if (p.purchaseStatus !== PurchaseStatus.PendingQueryingAutoRefund) { + return; + } + const oldTxState = computePayMerchantTransactionState(p); + p.purchaseStatus = PurchaseStatus.PendingAcceptRefund; + const newTxState = computePayMerchantTransactionState(p); + await tx.purchases.put(p); + return { oldTxState, newTxState }; + }); + notifyTransition(ws, transactionId, transitionInfo); + } - return TaskRunResult.longpoll(); + return TaskRunResult.backoff(); } async function processPurchaseAbortingRefund( @@ -2734,7 +2803,7 @@ async function processPurchaseQueryRefund( return { oldTxState, newTxState }; }); notifyTransition(ws, transactionId, transitionInfo); - return TaskRunResult.finished(); + return TaskRunResult.progress(); } else { const refundAwaiting = Amounts.sub( Amounts.parseOrThrow(orderStatus.refund_amount), @@ -2760,7 +2829,7 @@ async function processPurchaseQueryRefund( return { oldTxState, newTxState }; }); notifyTransition(ws, transactionId, transitionInfo); - return TaskRunResult.finished(); + return TaskRunResult.progress(); } } @@ -2836,10 +2905,7 @@ export async function startQueryRefund( ws: InternalWalletState, proposalId: string, ): Promise<void> { - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Payment, - proposalId, - }); + const ctx = new PayMerchantTransactionContext(ws, proposalId); const transitionInfo = await ws.db .mktx((x) => [x.purchases]) .runReadWrite(async (tx) => { @@ -2857,8 +2923,8 @@ export async function startQueryRefund( await tx.purchases.put(p); return { oldTxState, newTxState }; }); - notifyTransition(ws, transactionId, transitionInfo); - ws.workAvailable.trigger(); + notifyTransition(ws, ctx.transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(ctx.taskId); } async function computeRefreshRequest( @@ -3128,10 +3194,10 @@ async function storeRefunds( notifyTransition(ws, transactionId, result.transitionInfo); if (result.numPendingItemsTotal > 0) { - return TaskRunResult.pending(); + return TaskRunResult.backoff(); + } else { + return TaskRunResult.progress(); } - - return TaskRunResult.finished(); } export function computeRefundTransactionState( |