diff options
Diffstat (limited to 'packages/taler-wallet-core/src/operations/deposits.ts')
-rw-r--r-- | packages/taler-wallet-core/src/operations/deposits.ts | 140 |
1 files changed, 66 insertions, 74 deletions
diff --git a/packages/taler-wallet-core/src/operations/deposits.ts b/packages/taler-wallet-core/src/operations/deposits.ts index 3619ac4f4..38b5d43f0 100644 --- a/packages/taler-wallet-core/src/operations/deposits.ts +++ b/packages/taler-wallet-core/src/operations/deposits.ts @@ -48,6 +48,7 @@ import { TalerProtocolTimestamp, TrackTransaction, TransactionAction, + TransactionIdStr, TransactionMajorState, TransactionMinorState, TransactionState, @@ -75,6 +76,7 @@ import { KycPendingInfo, PendingTaskType, RefreshOperationStatus, + TaskId, createRefreshGroup, getCandidateWithdrawalDenomsTx, getTotalRefreshCost, @@ -90,7 +92,6 @@ import { TombstoneTag, TransactionContext, constructTaskIdentifier, - runLongpollAsync, spendCoins, } from "./common.js"; import { getExchangeWireDetailsInTx } from "./exchanges.js"; @@ -103,7 +104,6 @@ import { constructTransactionIdentifier, notifyTransition, parseTransactionIdentifier, - stopLongpolling, } from "./transactions.js"; /** @@ -112,8 +112,8 @@ import { const logger = new Logger("deposits.ts"); export class DepositTransactionContext implements TransactionContext { - private transactionId: string; - private retryTag: string; + readonly transactionId: TransactionIdStr; + readonly taskId: TaskId; constructor( public ws: InternalWalletState, public depositGroupId: string, @@ -122,7 +122,7 @@ export class DepositTransactionContext implements TransactionContext { tag: TransactionType.Deposit, depositGroupId, }); - this.retryTag = constructTaskIdentifier({ + this.taskId = constructTaskIdentifier({ tag: PendingTaskType.Deposit, depositGroupId, }); @@ -148,7 +148,7 @@ export class DepositTransactionContext implements TransactionContext { } async suspendTransaction(): Promise<void> { - const { ws, depositGroupId, transactionId, retryTag } = this; + const { ws, depositGroupId, transactionId, taskId: retryTag } = this; const transitionInfo = await ws.db .mktx((x) => [x.depositGroups]) .runReadWrite(async (tx) => { @@ -185,12 +185,12 @@ export class DepositTransactionContext implements TransactionContext { newTxState: computeDepositTransactionStatus(dg), }; }); - stopLongpolling(ws, retryTag); + ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); } async abortTransaction(): Promise<void> { - const { ws, depositGroupId, transactionId, retryTag } = this; + const { ws, depositGroupId, transactionId, taskId: retryTag } = this; const transitionInfo = await ws.db .mktx((x) => [x.depositGroups]) .runReadWrite(async (tx) => { @@ -219,14 +219,13 @@ export class DepositTransactionContext implements TransactionContext { } return undefined; }); - stopLongpolling(ws, retryTag); - // Need to process the operation again. - ws.workAvailable.trigger(); + ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(retryTag); } async resumeTransaction(): Promise<void> { - const { ws, depositGroupId, transactionId, retryTag } = this; + const { ws, depositGroupId, transactionId, taskId: retryTag } = this; const transitionInfo = await ws.db .mktx((x) => [x.depositGroups]) .runReadWrite(async (tx) => { @@ -263,12 +262,12 @@ export class DepositTransactionContext implements TransactionContext { newTxState: computeDepositTransactionStatus(dg), }; }); - ws.workAvailable.trigger(); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(retryTag); } async failTransaction(): Promise<void> { - const { ws, depositGroupId, transactionId, retryTag } = this; + const { ws, depositGroupId, transactionId, taskId: retryTag } = this; const transitionInfo = await ws.db .mktx((x) => [x.depositGroups]) .runReadWrite(async (tx) => { @@ -294,7 +293,7 @@ export class DepositTransactionContext implements TransactionContext { return undefined; }); // FIXME: Also cancel ongoing work (via cancellation token, once implemented) - stopLongpolling(ws, retryTag); + ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); } } @@ -453,7 +452,7 @@ async function waitForRefreshOnDepositGroup( }); notifyTransition(ws, transactionId, transitionInfo); - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } async function refundDepositGroup( @@ -568,7 +567,7 @@ async function refundDepositGroup( await tx.depositGroups.put(newDg); }); - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } async function processDepositGroupAborting( @@ -588,6 +587,7 @@ async function processDepositGroupAborting( async function processDepositGroupPendingKyc( ws: InternalWalletState, depositGroup: DepositGroupRecord, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { const { depositGroupId } = depositGroup; const transactionId = constructTransactionIdentifier({ @@ -606,51 +606,45 @@ async function processDepositGroupPendingKyc( throw Error("invalid DB state, in pending(kyc), but no kycInfo present"); } - runLongpollAsync(ws, retryTag, async (ct) => { - const url = new URL( - `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`, - kycInfo.exchangeBaseUrl, - ); - url.searchParams.set("timeout_ms", "10000"); - logger.info(`kyc url ${url.href}`); - const kycStatusRes = await ws.http.fetch(url.href, { - method: "GET", - cancellationToken: ct, - }); - if ( - kycStatusRes.status === HttpStatusCode.Ok || - //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge - // remove after the exchange is fixed or clarified - kycStatusRes.status === HttpStatusCode.NoContent - ) { - const transitionInfo = await ws.db - .mktx((x) => [x.depositGroups]) - .runReadWrite(async (tx) => { - const newDg = await tx.depositGroups.get(depositGroupId); - if (!newDg) { - return; - } - if (newDg.operationStatus !== DepositOperationStatus.PendingKyc) { - return; - } - const oldTxState = computeDepositTransactionStatus(newDg); - newDg.operationStatus = DepositOperationStatus.PendingTrack; - const newTxState = computeDepositTransactionStatus(newDg); - await tx.depositGroups.put(newDg); - return { oldTxState, newTxState }; - }); - notifyTransition(ws, transactionId, transitionInfo); - return { ready: true }; - } else if (kycStatusRes.status === HttpStatusCode.Accepted) { - // FIXME: Do we have to update the URL here? - return { ready: false }; - } else { - throw Error( - `unexpected response from kyc-check (${kycStatusRes.status})`, - ); - } + const url = new URL( + `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`, + kycInfo.exchangeBaseUrl, + ); + url.searchParams.set("timeout_ms", "10000"); + logger.info(`kyc url ${url.href}`); + const kycStatusRes = await ws.http.fetch(url.href, { + method: "GET", + cancellationToken, }); - return TaskRunResult.longpoll(); + if ( + kycStatusRes.status === HttpStatusCode.Ok || + //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge + // remove after the exchange is fixed or clarified + kycStatusRes.status === HttpStatusCode.NoContent + ) { + const transitionInfo = await ws.db + .mktx((x) => [x.depositGroups]) + .runReadWrite(async (tx) => { + const newDg = await tx.depositGroups.get(depositGroupId); + if (!newDg) { + return; + } + if (newDg.operationStatus !== DepositOperationStatus.PendingKyc) { + return; + } + const oldTxState = computeDepositTransactionStatus(newDg); + newDg.operationStatus = DepositOperationStatus.PendingTrack; + const newTxState = computeDepositTransactionStatus(newDg); + await tx.depositGroups.put(newDg); + return { oldTxState, newTxState }; + }); + notifyTransition(ws, transactionId, transitionInfo); + } else if (kycStatusRes.status === HttpStatusCode.Accepted) { + // FIXME: Do we have to update the URL here? + } else { + throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`); + } + return TaskRunResult.backoff(); } /** @@ -682,7 +676,7 @@ async function transitionToKycRequired( }); if (kycStatusReq.status === HttpStatusCode.Ok) { logger.warn("kyc requested, but already fulfilled"); - return TaskRunResult.finished(); + return TaskRunResult.backoff(); } else if (kycStatusReq.status === HttpStatusCode.Accepted) { const kycStatus = await kycStatusReq.json(); logger.info(`kyc status: ${j2s(kycStatus)}`); @@ -864,7 +858,7 @@ async function processDepositGroupPendingTrack( return TaskRunResult.finished(); } else { // FIXME: Use long-polling. - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } } @@ -993,7 +987,7 @@ async function processDepositGroupPendingDeposit( }); notifyTransition(ws, transactionId, transitionInfo); - return TaskRunResult.finished(); + return TaskRunResult.progress(); } /** @@ -1002,9 +996,7 @@ async function processDepositGroupPendingDeposit( export async function processDepositGroup( ws: InternalWalletState, depositGroupId: string, - options: { - cancellationToken?: CancellationToken; - } = {}, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { const depositGroup = await ws.db .mktx((x) => [x.depositGroups]) @@ -1021,15 +1013,15 @@ export async function processDepositGroup( return processDepositGroupPendingTrack( ws, depositGroup, - options.cancellationToken, + cancellationToken, ); case DepositOperationStatus.PendingKyc: - return processDepositGroupPendingKyc(ws, depositGroup); + return processDepositGroupPendingKyc(ws, depositGroup, cancellationToken); case DepositOperationStatus.PendingDeposit: return processDepositGroupPendingDeposit( ws, depositGroup, - options.cancellationToken, + cancellationToken, ); case DepositOperationStatus.Aborting: return processDepositGroupAborting(ws, depositGroup); @@ -1393,10 +1385,8 @@ export async function createDepositGroup( operationStatus: DepositOperationStatus.PendingDeposit, }; - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Deposit, - depositGroupId, - }); + const ctx = new DepositTransactionContext(ws, depositGroupId); + const transactionId = ctx.transactionId; const newTxState = await ws.db .mktx((x) => [ @@ -1439,6 +1429,8 @@ export async function createDepositGroup( hintTransactionId: transactionId, }); + ws.taskScheduler.startShepherdTask(ctx.taskId); + return { depositGroupId, transactionId, |