summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/shepherd.ts
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2024-04-22 23:29:07 +0200
committerFlorian Dold <florian@dold.me>2024-04-22 23:29:07 +0200
commita181ee06e4b52cb35e00ff8c86acff315135faf2 (patch)
tree9961ae277d861f93818c253e3992ad25128f6377 /packages/taler-wallet-core/src/shepherd.ts
parente944c27e43474e8db464fbc593607e4e9d89576d (diff)
downloadwallet-core-a181ee06e4b52cb35e00ff8c86acff315135faf2.tar.gz
wallet-core-a181ee06e4b52cb35e00ff8c86acff315135faf2.tar.bz2
wallet-core-a181ee06e4b52cb35e00ff8c86acff315135faf2.zip
wallet-core: unify handling of run-until-done, simplify waiter implementation
Diffstat (limited to 'packages/taler-wallet-core/src/shepherd.ts')
-rw-r--r--packages/taler-wallet-core/src/shepherd.ts54
1 files changed, 30 insertions, 24 deletions
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts
index 58bdcf0dd..aae6d5a18 100644
--- a/packages/taler-wallet-core/src/shepherd.ts
+++ b/packages/taler-wallet-core/src/shepherd.ts
@@ -27,7 +27,6 @@ import {
NotificationType,
ObservabilityContext,
ObservabilityEventType,
- RetryLoopOpts,
TalerErrorDetail,
TaskThrottler,
TransactionIdStr,
@@ -142,13 +141,13 @@ function taskGivesLiveness(taskId: string): boolean {
}
export interface TaskScheduler {
- ensureRunning(): void;
- run(opts?: RetryLoopOpts): Promise<void>;
+ ensureRunning(): Promise<void>;
startShepherdTask(taskId: TaskIdStr): void;
stopShepherdTask(taskId: TaskIdStr): void;
resetTaskRetries(taskId: TaskIdStr): Promise<void>;
- reload(): void;
+ reload(): Promise<void>;
getActiveTasks(): TaskIdStr[];
+ isIdle(): boolean;
}
export class TaskSchedulerImpl implements TaskScheduler {
@@ -176,10 +175,11 @@ export class TaskSchedulerImpl implements TaskScheduler {
return [...this.sheps.keys()];
}
- ensureRunning(): void {
+ async ensureRunning(): Promise<void> {
if (this.isRunning) {
return;
}
+ await this.loadTasksFromDb();
this.run()
.catch((e) => {
logger.error("error running task loop");
@@ -190,7 +190,22 @@ export class TaskSchedulerImpl implements TaskScheduler {
});
}
- async run(opts: RetryLoopOpts = {}): Promise<void> {
+ isIdle(): boolean {
+ let alive = false;
+ const taskIds = [...this.sheps.keys()];
+ logger.info(`current task IDs: ${j2s(taskIds)}`);
+ logger.info(`sheps: ${this.sheps.size}`);
+ for (const taskId of taskIds) {
+ if (taskGivesLiveness(taskId)) {
+ alive = true;
+ break;
+ }
+ }
+ // We're idle if no task is alive anymore.
+ return !alive;
+ }
+
+ private async run(): Promise<void> {
if (this.isRunning) {
throw Error("task loop already running");
}
@@ -200,26 +215,17 @@ export class TaskSchedulerImpl implements TaskScheduler {
logger.info("loaded!");
logger.info(`sheps: ${this.sheps.size}`);
while (true) {
- if (opts.stopWhenDone) {
- let alive = false;
- const taskIds = [...this.sheps.keys()];
- logger.info(`current task IDs: ${j2s(taskIds)}`);
- logger.info(`sheps: ${this.sheps.size}`);
- for (const taskId of taskIds) {
- if (taskGivesLiveness(taskId)) {
- alive = true;
- break;
- }
- }
- if (!alive) {
- logger.info("Breaking out of task loop (no more work).");
- break;
- }
- }
if (this.ws.stopped) {
logger.info("Breaking out of task loop (wallet stopped).");
break;
}
+
+ if (this.isIdle()) {
+ this.ws.notify({
+ type: NotificationType.Idle,
+ });
+ }
+
await this.iterCond.wait();
}
this.isRunning = false;
@@ -237,8 +243,8 @@ export class TaskSchedulerImpl implements TaskScheduler {
*
* Mostly useful to interrupt all waits when time-travelling.
*/
- reload(): void {
- this.ensureRunning();
+ async reload(): Promise<void> {
+ await this.ensureRunning();
const tasksIds = [...this.sheps.keys()];
logger.info(`reloading sheperd with ${tasksIds.length} tasks`);
for (const taskId of tasksIds) {