From a181ee06e4b52cb35e00ff8c86acff315135faf2 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Mon, 22 Apr 2024 23:29:07 +0200 Subject: wallet-core: unify handling of run-until-done, simplify waiter implementation --- packages/taler-wallet-core/src/shepherd.ts | 54 +++++++++++++++++------------- 1 file changed, 30 insertions(+), 24 deletions(-) (limited to 'packages/taler-wallet-core/src/shepherd.ts') diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts index 58bdcf0dd..aae6d5a18 100644 --- a/packages/taler-wallet-core/src/shepherd.ts +++ b/packages/taler-wallet-core/src/shepherd.ts @@ -27,7 +27,6 @@ import { NotificationType, ObservabilityContext, ObservabilityEventType, - RetryLoopOpts, TalerErrorDetail, TaskThrottler, TransactionIdStr, @@ -142,13 +141,13 @@ function taskGivesLiveness(taskId: string): boolean { } export interface TaskScheduler { - ensureRunning(): void; - run(opts?: RetryLoopOpts): Promise; + ensureRunning(): Promise; startShepherdTask(taskId: TaskIdStr): void; stopShepherdTask(taskId: TaskIdStr): void; resetTaskRetries(taskId: TaskIdStr): Promise; - reload(): void; + reload(): Promise; getActiveTasks(): TaskIdStr[]; + isIdle(): boolean; } export class TaskSchedulerImpl implements TaskScheduler { @@ -176,10 +175,11 @@ export class TaskSchedulerImpl implements TaskScheduler { return [...this.sheps.keys()]; } - ensureRunning(): void { + async ensureRunning(): Promise { if (this.isRunning) { return; } + await this.loadTasksFromDb(); this.run() .catch((e) => { logger.error("error running task loop"); @@ -190,7 +190,22 @@ export class TaskSchedulerImpl implements TaskScheduler { }); } - async run(opts: RetryLoopOpts = {}): Promise { + isIdle(): boolean { + let alive = false; + const taskIds = [...this.sheps.keys()]; + logger.info(`current task IDs: ${j2s(taskIds)}`); + logger.info(`sheps: ${this.sheps.size}`); + for (const taskId of taskIds) { + if (taskGivesLiveness(taskId)) { + alive = true; + break; + } + } + // We're idle if no task is alive anymore. + return !alive; + } + + private async run(): Promise { if (this.isRunning) { throw Error("task loop already running"); } @@ -200,26 +215,17 @@ export class TaskSchedulerImpl implements TaskScheduler { logger.info("loaded!"); logger.info(`sheps: ${this.sheps.size}`); while (true) { - if (opts.stopWhenDone) { - let alive = false; - const taskIds = [...this.sheps.keys()]; - logger.info(`current task IDs: ${j2s(taskIds)}`); - logger.info(`sheps: ${this.sheps.size}`); - for (const taskId of taskIds) { - if (taskGivesLiveness(taskId)) { - alive = true; - break; - } - } - if (!alive) { - logger.info("Breaking out of task loop (no more work)."); - break; - } - } if (this.ws.stopped) { logger.info("Breaking out of task loop (wallet stopped)."); break; } + + if (this.isIdle()) { + this.ws.notify({ + type: NotificationType.Idle, + }); + } + await this.iterCond.wait(); } this.isRunning = false; @@ -237,8 +243,8 @@ export class TaskSchedulerImpl implements TaskScheduler { * * Mostly useful to interrupt all waits when time-travelling. */ - reload(): void { - this.ensureRunning(); + async reload(): Promise { + await this.ensureRunning(); const tasksIds = [...this.sheps.keys()]; logger.info(`reloading sheperd with ${tasksIds.length} tasks`); for (const taskId of tasksIds) { -- cgit v1.2.3