From 4ffb4a94e8279896a11d65b66d71beb66ed6d009 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Tue, 5 Mar 2024 01:28:30 +0100 Subject: wallet-core: simplify shepherd, handle results of cancelled tasks properly --- packages/taler-wallet-core/src/shepherd.ts | 171 ++++++++++++----------------- 1 file changed, 68 insertions(+), 103 deletions(-) (limited to 'packages/taler-wallet-core/src/shepherd.ts') diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts index db090c352..0544288ba 100644 --- a/packages/taler-wallet-core/src/shepherd.ts +++ b/packages/taler-wallet-core/src/shepherd.ts @@ -28,7 +28,6 @@ import { ObservabilityContext, ObservabilityEventType, RetryLoopOpts, - TalerError, TalerErrorCode, TalerErrorDetail, TaskThrottler, @@ -37,6 +36,7 @@ import { TransactionType, WalletNotification, assertUnreachable, + getErrorDetailFromException, j2s, makeErrorDetail, } from "@gnu-taler/taler-util"; @@ -55,6 +55,7 @@ import { CryptoApiStoppedError } from "./crypto/workers/crypto-dispatcher.js"; import { OPERATION_STATUS_ACTIVE_FIRST, OPERATION_STATUS_ACTIVE_LAST, + OperationRetryRecord, WalletDbAllStoresReadOnlyTransaction, WalletDbReadOnlyTransaction, timestampAbsoluteFromDb, @@ -173,7 +174,7 @@ export class TaskSchedulerImpl implements TaskScheduler { } getActiveTasks(): TaskIdStr[] { - return [...this.sheps.keys()] + return [...this.sheps.keys()]; } ensureRunning(): void { @@ -343,15 +344,21 @@ export class TaskSchedulerImpl implements TaskScheduler { const startTime = AbsoluteTime.now(); logger.trace(`Shepherd for ${taskId} will call handler`); // FIXME: This should already return the retry record. - const res = await runTaskWithErrorReporting(this.ws, taskId, async () => { - return await callOperationHandlerForTaskId(wex, taskId); - }); - const retryRecord = await this.ws.db.runReadOnlyTx( - ["operationRetries"], - async (tx) => { - return tx.operationRetries.get(taskId); + const res = await runTaskWithErrorReporting( + this.ws, + taskId, + info, + async () => { + return await callOperationHandlerForTaskId(wex, taskId); }, ); + if (info.cts.token.isCancelled) { + logger.info("task cancelled, not processing result"); + return; + } + if (this.ws.stopped) { + logger.info("wallet stopped, not processing result"); + } wex.oc.observe({ type: ObservabilityEventType.ShepherdTaskResult, resultType: res.type, @@ -359,46 +366,48 @@ export class TaskSchedulerImpl implements TaskScheduler { switch (res.type) { case TaskRunResultType.Error: { logger.trace(`Shepherd for ${taskId} got error result.`); - if (retryRecord) { - let delay: Duration; - const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry); - delay = AbsoluteTime.remaining(t); - logger.trace(`Waiting for ${delay.d_ms} ms`); - await this.wait(taskId, info, delay); - } else { - logger.trace("Retrying immediately."); - } + const retryRecord = await storePendingTaskError( + this.ws, + taskId, + res.errorDetail, + ); + let delay: Duration; + const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry); + delay = AbsoluteTime.remaining(t); + logger.trace(`Waiting for ${delay.d_ms} ms`); + await this.wait(taskId, info, delay); break; } case TaskRunResultType.Backoff: { logger.trace(`Shepherd for ${taskId} got backoff result.`); - if (retryRecord) { - let delay: Duration; - const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry); - delay = AbsoluteTime.remaining(t); - logger.trace(`Waiting for ${delay.d_ms} ms`); - await this.wait(taskId, info, delay); - } else { - logger.trace("Retrying immediately."); - } + const retryRecord = await storePendingTaskPending(this.ws, taskId); + let delay: Duration; + const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry); + delay = AbsoluteTime.remaining(t); + logger.trace(`Waiting for ${delay.d_ms} ms`); + await this.wait(taskId, info, delay); break; } case TaskRunResultType.Progress: { logger.trace( `Shepherd for ${taskId} got progress result, re-running immediately.`, ); + await storeTaskProgress(this.ws, taskId); break; } case TaskRunResultType.ScheduleLater: logger.trace(`Shepherd for ${taskId} got schedule-later result.`); + await storeTaskProgress(this.ws, taskId); const delay = AbsoluteTime.remaining(res.runAt); logger.trace(`Waiting for ${delay.d_ms} ms`); await this.wait(taskId, info, delay); break; case TaskRunResultType.Finished: logger.trace(`Shepherd for ${taskId} got finished result.`); + await storePendingTaskFinished(this.ws, taskId); return; case TaskRunResultType.LongpollReturnedPending: { + await storeTaskProgress(this.ws, taskId); // Make sure that we are waiting a bit if long-polling returned too early. const endTime = AbsoluteTime.now(); const taskDuration = AbsoluteTime.difference(endTime, startTime); @@ -425,9 +434,9 @@ async function storePendingTaskError( ws: InternalWalletState, pendingTaskId: string, e: TalerErrorDetail, -): Promise { +): Promise { logger.info(`storing pending task error for ${pendingTaskId}`); - const maybeNotification = await ws.db.runAllStoresReadWriteTx(async (tx) => { + const res = await ws.db.runAllStoresReadWriteTx(async (tx) => { let retryRecord = await tx.operationRetries.get(pendingTaskId); if (!retryRecord) { retryRecord = { @@ -440,11 +449,15 @@ async function storePendingTaskError( retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo); } await tx.operationRetries.put(retryRecord); - return taskToRetryNotification(ws, tx, pendingTaskId, e); + return { + notification: await taskToRetryNotification(ws, tx, pendingTaskId, e), + retryRecord, + }; }); - if (maybeNotification) { - ws.notify(maybeNotification); + if (res?.notification) { + ws.notify(res.notification); } + return res.retryRecord; } /** @@ -462,8 +475,8 @@ async function storeTaskProgress( async function storePendingTaskPending( ws: InternalWalletState, pendingTaskId: string, -): Promise { - const maybeNotification = await ws.db.runAllStoresReadWriteTx(async (tx) => { +): Promise { + const res = await ws.db.runAllStoresReadWriteTx(async (tx) => { let retryRecord = await tx.operationRetries.get(pendingTaskId); let hadError = false; if (!retryRecord) { @@ -479,15 +492,24 @@ async function storePendingTaskPending( retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo); } await tx.operationRetries.put(retryRecord); + let notification: WalletNotification | undefined = undefined; if (hadError) { - return taskToRetryNotification(ws, tx, pendingTaskId, undefined); - } else { - return undefined; + notification = await taskToRetryNotification( + ws, + tx, + pendingTaskId, + undefined, + ); } + return { + notification, + retryRecord, + }; }); - if (maybeNotification) { - ws.notify(maybeNotification); + if (res.notification) { + ws.notify(res.notification); } + return res.retryRecord; } async function storePendingTaskFinished( @@ -502,33 +524,11 @@ async function storePendingTaskFinished( async function runTaskWithErrorReporting( ws: InternalWalletState, opId: TaskIdStr, + info: ShepherdInfo, f: () => Promise, ): Promise { - let maybeError: TalerErrorDetail | undefined; try { - const resp = await f(); - switch (resp.type) { - case TaskRunResultType.Error: - await storePendingTaskError(ws, opId, resp.errorDetail); - return resp; - case TaskRunResultType.Finished: - await storePendingTaskFinished(ws, opId); - return resp; - case TaskRunResultType.Backoff: - await storePendingTaskPending(ws, opId); - return resp; - case TaskRunResultType.ScheduleLater: - // Task succeeded but wants to be run again. - await storeTaskProgress(ws, opId); - return resp; - case TaskRunResultType.Progress: - await storeTaskProgress(ws, opId); - return resp; - case TaskRunResultType.LongpollReturnedPending: - // Longpoll should be run again immediately. - await storeTaskProgress(ws, opId); - return resp; - } + return await f(); } catch (e) { if (e instanceof CryptoApiStoppedError) { if (ws.stopped) { @@ -543,46 +543,11 @@ async function runTaskWithErrorReporting( }; } } - if (e instanceof TalerError) { - logger.warn("operation processed resulted in error"); - logger.warn(`error was: ${j2s(e.errorDetail)}`); - maybeError = e.errorDetail; - await storePendingTaskError(ws, opId, maybeError!); - return { - type: TaskRunResultType.Error, - errorDetail: e.errorDetail, - }; - } else if (e instanceof Error) { - // This is a bug, as we expect pending operations to always - // do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED - // or return something. - logger.error(`Uncaught exception: ${e.message}`); - logger.error(`Stack: ${e.stack}`); - maybeError = makeErrorDetail( - TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, - { - stack: e.stack, - }, - `unexpected exception (message: ${e.message})`, - ); - await storePendingTaskError(ws, opId, maybeError); - return { - type: TaskRunResultType.Error, - errorDetail: maybeError, - }; - } else { - logger.error("Uncaught exception, value is not even an error."); - maybeError = makeErrorDetail( - TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, - {}, - `unexpected exception (not even an error)`, - ); - await storePendingTaskError(ws, opId, maybeError); - return { - type: TaskRunResultType.Error, - errorDetail: maybeError, - }; - } + const errorDetail = getErrorDetailFromException(e); + return { + type: TaskRunResultType.Error, + errorDetail, + }; } } -- cgit v1.2.3