diff options
Diffstat (limited to 'packages/taler-wallet-core/src/shepherd.ts')
-rw-r--r-- | packages/taler-wallet-core/src/shepherd.ts | 184 |
1 files changed, 106 insertions, 78 deletions
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts index f04bcd2c2..d662bd7ae 100644 --- a/packages/taler-wallet-core/src/shepherd.ts +++ b/packages/taler-wallet-core/src/shepherd.ts @@ -27,7 +27,6 @@ import { NotificationType, ObservabilityContext, ObservabilityEventType, - RetryLoopOpts, TalerErrorDetail, TaskThrottler, TransactionIdStr, @@ -37,6 +36,7 @@ import { assertUnreachable, getErrorDetailFromException, j2s, + safeStringifyException, } from "@gnu-taler/taler-util"; import { processBackupForProvider } from "./backup/index.js"; import { @@ -61,7 +61,10 @@ import { computeDepositTransactionStatus, processDepositGroup, } from "./deposits.js"; -import { updateExchangeFromUrlHandler } from "./exchanges.js"; +import { + computeDenomLossTransactionStatus, + updateExchangeFromUrlHandler, +} from "./exchanges.js"; import { computePayMerchantTransactionState, computeRefundTransactionState, @@ -88,7 +91,6 @@ import { computeRefreshTransactionState, processRefreshGroup, } from "./refresh.js"; -import { computeRewardTransactionStatus } from "./reward.js"; import { constructTransactionIdentifier, parseTransactionIdentifier, @@ -140,13 +142,14 @@ function taskGivesLiveness(taskId: string): boolean { } export interface TaskScheduler { - ensureRunning(): void; - run(opts?: RetryLoopOpts): Promise<void>; + ensureRunning(): Promise<void>; startShepherdTask(taskId: TaskIdStr): void; stopShepherdTask(taskId: TaskIdStr): void; resetTaskRetries(taskId: TaskIdStr): Promise<void>; - reload(): void; + reload(): Promise<void>; getActiveTasks(): TaskIdStr[]; + isIdle(): boolean; + shutdown(): Promise<void>; } export class TaskSchedulerImpl implements TaskScheduler { @@ -174,58 +177,73 @@ export class TaskSchedulerImpl implements TaskScheduler { return [...this.sheps.keys()]; } - ensureRunning(): void { + async shutdown(): Promise<void> { + const tasksIds = [...this.sheps.keys()]; + logger.info(`Stopping task shepherd.`); + for (const taskId of tasksIds) { + this.stopShepherdTask(taskId); + } + } + + async ensureRunning(): Promise<void> { if (this.isRunning) { return; } + this.isRunning = true; + try { + await this.loadTasksFromDb(); + } catch (e) { + this.isRunning = false; + throw e; + } this.run() .catch((e) => { logger.error("error running task loop"); logger.error(`err: ${e}`); }) .then(() => { - logger.info("done running task loop"); + logger.trace("done running task loop"); + this.isRunning = false; }); } - async run(opts: RetryLoopOpts = {}): Promise<void> { - if (this.isRunning) { - throw Error("task loop already running"); + isIdle(): boolean { + let alive = false; + const taskIds = [...this.sheps.keys()]; + for (const taskId of taskIds) { + if (taskGivesLiveness(taskId)) { + alive = true; + break; + } } - logger.info("Running task loop."); - this.isRunning = true; - await this.loadTasksFromDb(); - logger.info("loaded!"); - logger.info(`sheps: ${this.sheps.size}`); + // We're idle if no task is alive anymore. + return !alive; + } + + private async run(): Promise<void> { + logger.trace("Running task loop."); + logger.trace(`sheps: ${this.sheps.size}`); while (true) { - if (opts.stopWhenDone) { - let alive = false; - const taskIds = [...this.sheps.keys()]; - logger.info(`current task IDs: ${j2s(taskIds)}`); - logger.info(`sheps: ${this.sheps.size}`); - for (const taskId of taskIds) { - if (taskGivesLiveness(taskId)) { - alive = true; - break; - } - } - if (!alive) { - logger.info("Breaking out of task loop (no more work)."); - break; - } - } if (this.ws.stopped) { - logger.info("Breaking out of task loop (wallet stopped)."); + logger.trace("Breaking out of task loop (wallet stopped)."); break; } + + if (this.isIdle()) { + this.ws.notify({ + type: NotificationType.Idle, + }); + } + await this.iterCond.wait(); } - this.isRunning = false; - logger.info("Done with task loop."); + logger.trace("Done with task loop."); } startShepherdTask(taskId: TaskIdStr): void { - this.ensureRunning(); + this.ensureRunning().catch((e) => { + logger.error(`error running scheduler: ${safeStringifyException(e)}`); + }); // Run in the background, no await! this.internalStartShepherdTask(taskId); } @@ -235,10 +253,10 @@ export class TaskSchedulerImpl implements TaskScheduler { * * Mostly useful to interrupt all waits when time-travelling. */ - reload() { - this.ensureRunning(); + async reload(): Promise<void> { + await this.ensureRunning(); const tasksIds = [...this.sheps.keys()]; - logger.info(`reloading sheperd with ${tasksIds.length} tasks`); + logger.info(`reloading shepherd with ${tasksIds.length} tasks`); for (const taskId of tasksIds) { this.stopShepherdTask(taskId); } @@ -351,11 +369,11 @@ export class TaskSchedulerImpl implements TaskScheduler { }; } if (info.cts.token.isCancelled) { - logger.info("task cancelled, not processing result"); + logger.trace("task cancelled, not processing result"); return; } if (this.ws.stopped) { - logger.info("wallet stopped, not processing result"); + logger.trace("wallet stopped, not processing result"); return; } wex.oc.observe({ @@ -364,15 +382,20 @@ export class TaskSchedulerImpl implements TaskScheduler { }); switch (res.type) { case TaskRunResultType.Error: { - logger.trace(`Shepherd for ${taskId} got error result.`); + if (logger.shouldLogTrace()) { + logger.trace( + `Shepherd for ${taskId} got error result: ${j2s( + res.errorDetail, + )}`, + ); + } const retryRecord = await storePendingTaskError( this.ws, taskId, res.errorDetail, ); - let delay: Duration; const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry); - delay = AbsoluteTime.remaining(t); + const delay = AbsoluteTime.remaining(t); logger.trace(`Waiting for ${delay.d_ms} ms`); await this.wait(taskId, info, delay); break; @@ -380,9 +403,8 @@ export class TaskSchedulerImpl implements TaskScheduler { case TaskRunResultType.Backoff: { logger.trace(`Shepherd for ${taskId} got backoff result.`); const retryRecord = await storePendingTaskPending(this.ws, taskId); - let delay: Duration; const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry); - delay = AbsoluteTime.remaining(t); + const delay = AbsoluteTime.remaining(t); logger.trace(`Waiting for ${delay.d_ms} ms`); await this.wait(taskId, info, delay); break; @@ -394,13 +416,14 @@ export class TaskSchedulerImpl implements TaskScheduler { await storeTaskProgress(this.ws, taskId); break; } - case TaskRunResultType.ScheduleLater: + case TaskRunResultType.ScheduleLater: { logger.trace(`Shepherd for ${taskId} got schedule-later result.`); await storeTaskProgress(this.ws, taskId); const delay = AbsoluteTime.remaining(res.runAt); logger.trace(`Waiting for ${delay.d_ms} ms`); await this.wait(taskId, info, delay); break; + } case TaskRunResultType.Finished: logger.trace(`Shepherd for ${taskId} got finished result.`); await storePendingTaskFinished(this.ws, taskId); @@ -466,9 +489,12 @@ async function storeTaskProgress( ws: InternalWalletState, pendingTaskId: string, ): Promise<void> { - await ws.db.runReadWriteTx(["operationRetries"], async (tx) => { - await tx.operationRetries.delete(pendingTaskId); - }); + await ws.db.runReadWriteTx( + { storeNames: ["operationRetries"] }, + async (tx) => { + await tx.operationRetries.delete(pendingTaskId); + }, + ); } async function storePendingTaskPending( @@ -515,9 +541,12 @@ async function storePendingTaskFinished( ws: InternalWalletState, pendingTaskId: string, ): Promise<void> { - await ws.db.runReadWriteTx(["operationRetries"], async (tx) => { - await tx.operationRetries.delete(pendingTaskId); - }); + await ws.db.runReadWriteTx( + { storeNames: ["operationRetries"] }, + async (tx) => { + await tx.operationRetries.delete(pendingTaskId); + }, + ); } function getWalletExecutionContextForTask( @@ -636,6 +665,7 @@ async function getTransactionState( "peerPushCredit", "rewards", "refreshGroups", + "denomLossEvents", ] >, transactionId: string, @@ -674,12 +704,13 @@ async function getTransactionState( } return computeRefundTransactionState(rec); } - case TransactionType.PeerPullCredit: + case TransactionType.PeerPullCredit: { const rec = await tx.peerPullCredit.get(parsedTxId.pursePub); if (!rec) { return undefined; } return computePeerPullCreditTransactionState(rec); + } case TransactionType.PeerPullDebit: { const rec = await tx.peerPullDebit.get(parsedTxId.peerPullDebitId); if (!rec) { @@ -708,15 +739,15 @@ async function getTransactionState( } return computeRefreshTransactionState(rec); } - case TransactionType.Reward: { - const rec = await tx.rewards.get(parsedTxId.walletRewardId); + case TransactionType.Recoup: + throw Error("not yet supported"); + case TransactionType.DenomLoss: { + const rec = await tx.denomLossEvents.get(parsedTxId.denomLossEventId); if (!rec) { return undefined; } - return computeRewardTransactionStatus(rec); + return computeDenomLossTransactionStatus(rec); } - case TransactionType.Recoup: - throw Error("not yet supported"); default: assertUnreachable(parsedTxId); } @@ -855,8 +886,6 @@ export function listTaskForTransactionId(transactionId: string): TaskIdStr[] { ]; case TransactionType.Refund: return []; - case TransactionType.Reward: - return []; case TransactionType.Withdrawal: return [ constructTaskIdentifier({ @@ -864,6 +893,8 @@ export function listTaskForTransactionId(transactionId: string): TaskIdStr[] { withdrawalGroupId: tid.withdrawalGroupId, }), ]; + case TransactionType.DenomLoss: + return []; default: assertUnreachable(tid); } @@ -911,11 +942,6 @@ export function convertTaskToTransactionId( tag: TransactionType.Refresh, refreshGroupId: parsedTaskId.refreshGroupId, }); - case PendingTaskType.RewardPickup: - return constructTransactionIdentifier({ - tag: TransactionType.Reward, - walletRewardId: parsedTaskId.walletRewardId, - }); case PendingTaskType.PeerPushDebit: return constructTransactionIdentifier({ tag: TransactionType.PeerPushDebit, @@ -942,18 +968,20 @@ export async function getActiveTaskIds( taskIds: [], }; await ws.db.runReadWriteTx( - [ - "exchanges", - "refreshGroups", - "withdrawalGroups", - "purchases", - "depositGroups", - "recoupGroups", - "peerPullCredit", - "peerPushDebit", - "peerPullDebit", - "peerPushCredit", - ], + { + storeNames: [ + "exchanges", + "refreshGroups", + "withdrawalGroups", + "purchases", + "depositGroups", + "recoupGroups", + "peerPullCredit", + "peerPushDebit", + "peerPullDebit", + "peerPushCredit", + ], + }, async (tx) => { const active = GlobalIDB.KeyRange.bound( OPERATION_STATUS_ACTIVE_FIRST, |