From 862ac9a16aa891c26355f9ad5858283c3aa029d6 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Mon, 19 Feb 2024 21:13:00 +0100 Subject: wallet-core: safer long-polling We now wait for some time if long-polling returns too early --- packages/taler-wallet-core/src/shepherd.ts | 57 +++++++++++++++++++----------- 1 file changed, 36 insertions(+), 21 deletions(-) (limited to 'packages/taler-wallet-core/src/shepherd.ts') diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts index d6fc604e8..0639b7976 100644 --- a/packages/taler-wallet-core/src/shepherd.ts +++ b/packages/taler-wallet-core/src/shepherd.ts @@ -227,6 +227,18 @@ export class TaskScheduler { this.startShepherdTask(taskId); } + private async wait( + taskId: TaskId, + info: ShepherdInfo, + delay: Duration, + ): Promise { + try { + await info.cts.token.racePromise(this.ws.timerGroup.resolveAfter(delay)); + } catch (e) { + logger.info(`waiting for ${taskId} interrupted`); + } + } + private async internalShepherdTask( taskId: TaskId, info: ShepherdInfo, @@ -250,6 +262,7 @@ export class TaskScheduler { Duration.fromSpec({ seconds: 60 }), ); } + const startTime = AbsoluteTime.now(); logger.trace(`Shepherd for ${taskId} will call handler`); // FIXME: This should already return the retry record. const res = await runTaskWithErrorReporting(this.ws, taskId, async () => { @@ -273,13 +286,7 @@ export class TaskScheduler { const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry); delay = AbsoluteTime.remaining(t); logger.trace(`Waiting for ${delay.d_ms} ms`); - try { - await info.cts.token.racePromise( - this.ws.timerGroup.resolveAfter(delay), - ); - } catch (e) { - logger.info(`waiting for ${taskId} interrupted`); - } + await this.wait(taskId, info, delay); } else { logger.trace("Retrying immediately."); } @@ -292,13 +299,7 @@ export class TaskScheduler { const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry); delay = AbsoluteTime.remaining(t); logger.trace(`Waiting for ${delay.d_ms} ms`); - try { - await info.cts.token.racePromise( - this.ws.timerGroup.resolveAfter(delay), - ); - } catch (e) { - logger.info(`waiting for ${taskId} interrupted`); - } + await this.wait(taskId, info, delay); } else { logger.trace("Retrying immediately."); } @@ -314,17 +315,27 @@ export class TaskScheduler { logger.trace(`Shepherd for ${taskId} got schedule-later result.`); const delay = AbsoluteTime.remaining(res.runAt); logger.trace(`Waiting for ${delay.d_ms} ms`); - try { - await info.cts.token.racePromise( - this.ws.timerGroup.resolveAfter(delay), - ); - } catch (e) { - logger.info(`waiting for ${taskId} interrupted`); - } + await this.wait(taskId, info, delay); break; case TaskRunResultType.Finished: logger.trace(`Shepherd for ${taskId} got finished result.`); return; + case TaskRunResultType.LongpollReturnedPending: { + // Make sure that we are waiting a bit if long-polling returned too early. + const endTime = AbsoluteTime.now(); + const taskDuration = AbsoluteTime.difference(endTime, startTime); + if ( + Duration.cmp(taskDuration, Duration.fromSpec({ seconds: 20 })) < 0 + ) { + logger.info( + `long-poller for ${taskId} returned unexpectedly early (${taskDuration.d_ms} ms), waiting 10 seconds`, + ); + await this.wait(taskId, info, Duration.fromSpec({ seconds: 10 })); + } else { + logger.info(`task ${taskId} will long-poll again`); + } + break; + } default: assertUnreachable(res); } @@ -435,6 +446,10 @@ async function runTaskWithErrorReporting( case TaskRunResultType.Progress: await storeTaskProgress(ws, opId); return resp; + case TaskRunResultType.LongpollReturnedPending: + // Longpoll should be run again immediately. + await storeTaskProgress(ws, opId); + return resp; } } catch (e) { if (e instanceof CryptoApiStoppedError) { -- cgit v1.2.3