From 862ac9a16aa891c26355f9ad5858283c3aa029d6 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Mon, 19 Feb 2024 21:13:00 +0100 Subject: wallet-core: safer long-polling We now wait for some time if long-polling returns too early --- packages/taler-wallet-core/src/common.ts | 15 ++++++++ packages/taler-wallet-core/src/shepherd.ts | 57 +++++++++++++++++++----------- packages/taler-wallet-core/src/withdraw.ts | 28 ++++++++++----- 3 files changed, 71 insertions(+), 29 deletions(-) diff --git a/packages/taler-wallet-core/src/common.ts b/packages/taler-wallet-core/src/common.ts index 9d7f2e763..45351f680 100644 --- a/packages/taler-wallet-core/src/common.ts +++ b/packages/taler-wallet-core/src/common.ts @@ -350,6 +350,7 @@ export enum TaskRunResultType { Backoff = "backoff", Progress = "progress", Error = "error", + LongpollReturnedPending = "longpoll-returned-pending", ScheduleLater = "schedule-later", } @@ -358,6 +359,7 @@ export type TaskRunResult = | TaskRunErrorResult | TaskRunBackoffResult | TaskRunProgressResult + | TaskRunLongpollReturnedPendingResult | TaskRunScheduleLaterResult; export namespace TaskRunResult { @@ -396,6 +398,15 @@ export namespace TaskRunResult { runAt, }; } + /** + * Longpolling returned, but what we're waiting for + * is still pending on the other side. + */ + export function longpollReturnedPending(): TaskRunLongpollReturnedPendingResult { + return { + type: TaskRunResultType.LongpollReturnedPending, + }; + } } export interface TaskRunFinishedResult { @@ -415,6 +426,10 @@ export interface TaskRunScheduleLaterResult { runAt: AbsoluteTime; } +export interface TaskRunLongpollReturnedPendingResult { + type: TaskRunResultType.LongpollReturnedPending; +} + export interface TaskRunErrorResult { type: TaskRunResultType.Error; errorDetail: TalerErrorDetail; diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts index d6fc604e8..0639b7976 100644 --- a/packages/taler-wallet-core/src/shepherd.ts +++ b/packages/taler-wallet-core/src/shepherd.ts @@ -227,6 +227,18 @@ export class TaskScheduler { this.startShepherdTask(taskId); } + private async wait( + taskId: TaskId, + info: ShepherdInfo, + delay: Duration, + ): Promise { + try { + await info.cts.token.racePromise(this.ws.timerGroup.resolveAfter(delay)); + } catch (e) { + logger.info(`waiting for ${taskId} interrupted`); + } + } + private async internalShepherdTask( taskId: TaskId, info: ShepherdInfo, @@ -250,6 +262,7 @@ export class TaskScheduler { Duration.fromSpec({ seconds: 60 }), ); } + const startTime = AbsoluteTime.now(); logger.trace(`Shepherd for ${taskId} will call handler`); // FIXME: This should already return the retry record. const res = await runTaskWithErrorReporting(this.ws, taskId, async () => { @@ -273,13 +286,7 @@ export class TaskScheduler { const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry); delay = AbsoluteTime.remaining(t); logger.trace(`Waiting for ${delay.d_ms} ms`); - try { - await info.cts.token.racePromise( - this.ws.timerGroup.resolveAfter(delay), - ); - } catch (e) { - logger.info(`waiting for ${taskId} interrupted`); - } + await this.wait(taskId, info, delay); } else { logger.trace("Retrying immediately."); } @@ -292,13 +299,7 @@ export class TaskScheduler { const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry); delay = AbsoluteTime.remaining(t); logger.trace(`Waiting for ${delay.d_ms} ms`); - try { - await info.cts.token.racePromise( - this.ws.timerGroup.resolveAfter(delay), - ); - } catch (e) { - logger.info(`waiting for ${taskId} interrupted`); - } + await this.wait(taskId, info, delay); } else { logger.trace("Retrying immediately."); } @@ -314,17 +315,27 @@ export class TaskScheduler { logger.trace(`Shepherd for ${taskId} got schedule-later result.`); const delay = AbsoluteTime.remaining(res.runAt); logger.trace(`Waiting for ${delay.d_ms} ms`); - try { - await info.cts.token.racePromise( - this.ws.timerGroup.resolveAfter(delay), - ); - } catch (e) { - logger.info(`waiting for ${taskId} interrupted`); - } + await this.wait(taskId, info, delay); break; case TaskRunResultType.Finished: logger.trace(`Shepherd for ${taskId} got finished result.`); return; + case TaskRunResultType.LongpollReturnedPending: { + // Make sure that we are waiting a bit if long-polling returned too early. + const endTime = AbsoluteTime.now(); + const taskDuration = AbsoluteTime.difference(endTime, startTime); + if ( + Duration.cmp(taskDuration, Duration.fromSpec({ seconds: 20 })) < 0 + ) { + logger.info( + `long-poller for ${taskId} returned unexpectedly early (${taskDuration.d_ms} ms), waiting 10 seconds`, + ); + await this.wait(taskId, info, Duration.fromSpec({ seconds: 10 })); + } else { + logger.info(`task ${taskId} will long-poll again`); + } + break; + } default: assertUnreachable(res); } @@ -435,6 +446,10 @@ async function runTaskWithErrorReporting( case TaskRunResultType.Progress: await storeTaskProgress(ws, opId); return resp; + case TaskRunResultType.LongpollReturnedPending: + // Longpoll should be run again immediately. + await storeTaskProgress(ws, opId); + return resp; } } catch (e) { if (e instanceof CryptoApiStoppedError) { diff --git a/packages/taler-wallet-core/src/withdraw.ts b/packages/taler-wallet-core/src/withdraw.ts index 9cf1ad36d..bfcf23588 100644 --- a/packages/taler-wallet-core/src/withdraw.ts +++ b/packages/taler-wallet-core/src/withdraw.ts @@ -911,6 +911,7 @@ async function processPlanchetExchangeBatchRequest( ws: InternalWalletState, wgContext: WithdrawalGroupContext, args: WithdrawalRequestBatchArgs, + cancellationToken: CancellationToken, ): Promise { const withdrawalGroup: WithdrawalGroupRecord = wgContext.wgRecord; logger.info( @@ -997,6 +998,8 @@ async function processPlanchetExchangeBatchRequest( const resp = await ws.http.fetch(reqUrl, { method: "POST", body: batchReq, + cancellationToken, + timeout: Duration.fromSpec({ seconds: 40 }), }); if (resp.status === HttpStatusCode.UnavailableForLegalReasons) { await handleKycRequired(ws, withdrawalGroup, resp, 0, requestCoinIdxs); @@ -1300,7 +1303,7 @@ async function processQueryReserve( `got reserve status error, EC=${result.talerErrorResponse.code}`, ); if (resp.status === HttpStatusCode.NotFound) { - return TaskRunResult.backoff(); + return TaskRunResult.longpollReturnedPending(); } else { throwUnexpectedRequestError(resp, result.talerErrorResponse); } @@ -1491,6 +1494,7 @@ async function processWithdrawalGroupPendingKyc( async function processWithdrawalGroupPendingReady( ws: InternalWalletState, withdrawalGroup: WithdrawalGroupRecord, + cancellationToken: CancellationToken, ): Promise { const { withdrawalGroupId } = withdrawalGroup; const transactionId = constructTransactionIdentifier({ @@ -1553,10 +1557,15 @@ async function processWithdrawalGroupPendingReady( const maxBatchSize = 100; for (let i = 0; i < numTotalCoins; i += maxBatchSize) { - const resp = await processPlanchetExchangeBatchRequest(ws, wgContext, { - batchSize: maxBatchSize, - coinStartIndex: i, - }); + const resp = await processPlanchetExchangeBatchRequest( + ws, + wgContext, + { + batchSize: maxBatchSize, + coinStartIndex: i, + }, + cancellationToken, + ); let work: Promise[] = []; work = []; for (let j = 0; j < resp.coinIdxs.length; j++) { @@ -1688,7 +1697,11 @@ export async function processWithdrawalGroup( ); case WithdrawalGroupStatus.PendingReady: // Continue with the actual withdrawal! - return await processWithdrawalGroupPendingReady(ws, withdrawalGroup); + return await processWithdrawalGroupPendingReady( + ws, + withdrawalGroup, + cancellationToken, + ); case WithdrawalGroupStatus.AbortingBank: return await processWithdrawalGroupAbortingBank(ws, withdrawalGroup); case WithdrawalGroupStatus.AbortedBank: @@ -2265,8 +2278,7 @@ async function processReserveBankStatus( } if (!status.transfer_done) { - // FIXME: This is a long-poll result - return TaskRunResult.backoff(); + return TaskRunResult.longpollReturnedPending(); } const transitionInfo = await ws.db.runReadWriteTx( -- cgit v1.2.3