summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/shepherd.ts
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2024-02-19 21:13:00 +0100
committerFlorian Dold <florian@dold.me>2024-02-19 21:13:00 +0100
commit862ac9a16aa891c26355f9ad5858283c3aa029d6 (patch)
treef8b4244463a8bba73e277a6113fe306b3a3658a7 /packages/taler-wallet-core/src/shepherd.ts
parent20397e3fba3fe4b274354047f76e3a8f3a92d6b8 (diff)
downloadwallet-core-862ac9a16aa891c26355f9ad5858283c3aa029d6.tar.gz
wallet-core-862ac9a16aa891c26355f9ad5858283c3aa029d6.tar.bz2
wallet-core-862ac9a16aa891c26355f9ad5858283c3aa029d6.zip
wallet-core: safer long-polling
We now wait for some time if long-polling returns too early
Diffstat (limited to 'packages/taler-wallet-core/src/shepherd.ts')
-rw-r--r--packages/taler-wallet-core/src/shepherd.ts57
1 files changed, 36 insertions, 21 deletions
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts
index d6fc604e8..0639b7976 100644
--- a/packages/taler-wallet-core/src/shepherd.ts
+++ b/packages/taler-wallet-core/src/shepherd.ts
@@ -227,6 +227,18 @@ export class TaskScheduler {
this.startShepherdTask(taskId);
}
+ private async wait(
+ taskId: TaskId,
+ info: ShepherdInfo,
+ delay: Duration,
+ ): Promise<void> {
+ try {
+ await info.cts.token.racePromise(this.ws.timerGroup.resolveAfter(delay));
+ } catch (e) {
+ logger.info(`waiting for ${taskId} interrupted`);
+ }
+ }
+
private async internalShepherdTask(
taskId: TaskId,
info: ShepherdInfo,
@@ -250,6 +262,7 @@ export class TaskScheduler {
Duration.fromSpec({ seconds: 60 }),
);
}
+ const startTime = AbsoluteTime.now();
logger.trace(`Shepherd for ${taskId} will call handler`);
// FIXME: This should already return the retry record.
const res = await runTaskWithErrorReporting(this.ws, taskId, async () => {
@@ -273,13 +286,7 @@ export class TaskScheduler {
const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
delay = AbsoluteTime.remaining(t);
logger.trace(`Waiting for ${delay.d_ms} ms`);
- try {
- await info.cts.token.racePromise(
- this.ws.timerGroup.resolveAfter(delay),
- );
- } catch (e) {
- logger.info(`waiting for ${taskId} interrupted`);
- }
+ await this.wait(taskId, info, delay);
} else {
logger.trace("Retrying immediately.");
}
@@ -292,13 +299,7 @@ export class TaskScheduler {
const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
delay = AbsoluteTime.remaining(t);
logger.trace(`Waiting for ${delay.d_ms} ms`);
- try {
- await info.cts.token.racePromise(
- this.ws.timerGroup.resolveAfter(delay),
- );
- } catch (e) {
- logger.info(`waiting for ${taskId} interrupted`);
- }
+ await this.wait(taskId, info, delay);
} else {
logger.trace("Retrying immediately.");
}
@@ -314,17 +315,27 @@ export class TaskScheduler {
logger.trace(`Shepherd for ${taskId} got schedule-later result.`);
const delay = AbsoluteTime.remaining(res.runAt);
logger.trace(`Waiting for ${delay.d_ms} ms`);
- try {
- await info.cts.token.racePromise(
- this.ws.timerGroup.resolveAfter(delay),
- );
- } catch (e) {
- logger.info(`waiting for ${taskId} interrupted`);
- }
+ await this.wait(taskId, info, delay);
break;
case TaskRunResultType.Finished:
logger.trace(`Shepherd for ${taskId} got finished result.`);
return;
+ case TaskRunResultType.LongpollReturnedPending: {
+ // Make sure that we are waiting a bit if long-polling returned too early.
+ const endTime = AbsoluteTime.now();
+ const taskDuration = AbsoluteTime.difference(endTime, startTime);
+ if (
+ Duration.cmp(taskDuration, Duration.fromSpec({ seconds: 20 })) < 0
+ ) {
+ logger.info(
+ `long-poller for ${taskId} returned unexpectedly early (${taskDuration.d_ms} ms), waiting 10 seconds`,
+ );
+ await this.wait(taskId, info, Duration.fromSpec({ seconds: 10 }));
+ } else {
+ logger.info(`task ${taskId} will long-poll again`);
+ }
+ break;
+ }
default:
assertUnreachable(res);
}
@@ -435,6 +446,10 @@ async function runTaskWithErrorReporting(
case TaskRunResultType.Progress:
await storeTaskProgress(ws, opId);
return resp;
+ case TaskRunResultType.LongpollReturnedPending:
+ // Longpoll should be run again immediately.
+ await storeTaskProgress(ws, opId);
+ return resp;
}
} catch (e) {
if (e instanceof CryptoApiStoppedError) {