summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/shepherd.ts
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2024-03-05 01:28:30 +0100
committerFlorian Dold <florian@dold.me>2024-03-05 01:28:30 +0100
commit4ffb4a94e8279896a11d65b66d71beb66ed6d009 (patch)
treee740f5601a09f6ba2269b8fdd56dd8159d247b1e /packages/taler-wallet-core/src/shepherd.ts
parent8a111862400915627227fa432fab017833ec93a7 (diff)
downloadwallet-core-4ffb4a94e8279896a11d65b66d71beb66ed6d009.tar.gz
wallet-core-4ffb4a94e8279896a11d65b66d71beb66ed6d009.tar.bz2
wallet-core-4ffb4a94e8279896a11d65b66d71beb66ed6d009.zip
wallet-core: simplify shepherd, handle results of cancelled tasks properly
Diffstat (limited to 'packages/taler-wallet-core/src/shepherd.ts')
-rw-r--r--packages/taler-wallet-core/src/shepherd.ts171
1 files changed, 68 insertions, 103 deletions
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts
index db090c352..0544288ba 100644
--- a/packages/taler-wallet-core/src/shepherd.ts
+++ b/packages/taler-wallet-core/src/shepherd.ts
@@ -28,7 +28,6 @@ import {
ObservabilityContext,
ObservabilityEventType,
RetryLoopOpts,
- TalerError,
TalerErrorCode,
TalerErrorDetail,
TaskThrottler,
@@ -37,6 +36,7 @@ import {
TransactionType,
WalletNotification,
assertUnreachable,
+ getErrorDetailFromException,
j2s,
makeErrorDetail,
} from "@gnu-taler/taler-util";
@@ -55,6 +55,7 @@ import { CryptoApiStoppedError } from "./crypto/workers/crypto-dispatcher.js";
import {
OPERATION_STATUS_ACTIVE_FIRST,
OPERATION_STATUS_ACTIVE_LAST,
+ OperationRetryRecord,
WalletDbAllStoresReadOnlyTransaction,
WalletDbReadOnlyTransaction,
timestampAbsoluteFromDb,
@@ -173,7 +174,7 @@ export class TaskSchedulerImpl implements TaskScheduler {
}
getActiveTasks(): TaskIdStr[] {
- return [...this.sheps.keys()]
+ return [...this.sheps.keys()];
}
ensureRunning(): void {
@@ -343,15 +344,21 @@ export class TaskSchedulerImpl implements TaskScheduler {
const startTime = AbsoluteTime.now();
logger.trace(`Shepherd for ${taskId} will call handler`);
// FIXME: This should already return the retry record.
- const res = await runTaskWithErrorReporting(this.ws, taskId, async () => {
- return await callOperationHandlerForTaskId(wex, taskId);
- });
- const retryRecord = await this.ws.db.runReadOnlyTx(
- ["operationRetries"],
- async (tx) => {
- return tx.operationRetries.get(taskId);
+ const res = await runTaskWithErrorReporting(
+ this.ws,
+ taskId,
+ info,
+ async () => {
+ return await callOperationHandlerForTaskId(wex, taskId);
},
);
+ if (info.cts.token.isCancelled) {
+ logger.info("task cancelled, not processing result");
+ return;
+ }
+ if (this.ws.stopped) {
+ logger.info("wallet stopped, not processing result");
+ }
wex.oc.observe({
type: ObservabilityEventType.ShepherdTaskResult,
resultType: res.type,
@@ -359,46 +366,48 @@ export class TaskSchedulerImpl implements TaskScheduler {
switch (res.type) {
case TaskRunResultType.Error: {
logger.trace(`Shepherd for ${taskId} got error result.`);
- if (retryRecord) {
- let delay: Duration;
- const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
- delay = AbsoluteTime.remaining(t);
- logger.trace(`Waiting for ${delay.d_ms} ms`);
- await this.wait(taskId, info, delay);
- } else {
- logger.trace("Retrying immediately.");
- }
+ const retryRecord = await storePendingTaskError(
+ this.ws,
+ taskId,
+ res.errorDetail,
+ );
+ let delay: Duration;
+ const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
+ delay = AbsoluteTime.remaining(t);
+ logger.trace(`Waiting for ${delay.d_ms} ms`);
+ await this.wait(taskId, info, delay);
break;
}
case TaskRunResultType.Backoff: {
logger.trace(`Shepherd for ${taskId} got backoff result.`);
- if (retryRecord) {
- let delay: Duration;
- const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
- delay = AbsoluteTime.remaining(t);
- logger.trace(`Waiting for ${delay.d_ms} ms`);
- await this.wait(taskId, info, delay);
- } else {
- logger.trace("Retrying immediately.");
- }
+ const retryRecord = await storePendingTaskPending(this.ws, taskId);
+ let delay: Duration;
+ const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
+ delay = AbsoluteTime.remaining(t);
+ logger.trace(`Waiting for ${delay.d_ms} ms`);
+ await this.wait(taskId, info, delay);
break;
}
case TaskRunResultType.Progress: {
logger.trace(
`Shepherd for ${taskId} got progress result, re-running immediately.`,
);
+ await storeTaskProgress(this.ws, taskId);
break;
}
case TaskRunResultType.ScheduleLater:
logger.trace(`Shepherd for ${taskId} got schedule-later result.`);
+ await storeTaskProgress(this.ws, taskId);
const delay = AbsoluteTime.remaining(res.runAt);
logger.trace(`Waiting for ${delay.d_ms} ms`);
await this.wait(taskId, info, delay);
break;
case TaskRunResultType.Finished:
logger.trace(`Shepherd for ${taskId} got finished result.`);
+ await storePendingTaskFinished(this.ws, taskId);
return;
case TaskRunResultType.LongpollReturnedPending: {
+ await storeTaskProgress(this.ws, taskId);
// Make sure that we are waiting a bit if long-polling returned too early.
const endTime = AbsoluteTime.now();
const taskDuration = AbsoluteTime.difference(endTime, startTime);
@@ -425,9 +434,9 @@ async function storePendingTaskError(
ws: InternalWalletState,
pendingTaskId: string,
e: TalerErrorDetail,
-): Promise<void> {
+): Promise<OperationRetryRecord> {
logger.info(`storing pending task error for ${pendingTaskId}`);
- const maybeNotification = await ws.db.runAllStoresReadWriteTx(async (tx) => {
+ const res = await ws.db.runAllStoresReadWriteTx(async (tx) => {
let retryRecord = await tx.operationRetries.get(pendingTaskId);
if (!retryRecord) {
retryRecord = {
@@ -440,11 +449,15 @@ async function storePendingTaskError(
retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
}
await tx.operationRetries.put(retryRecord);
- return taskToRetryNotification(ws, tx, pendingTaskId, e);
+ return {
+ notification: await taskToRetryNotification(ws, tx, pendingTaskId, e),
+ retryRecord,
+ };
});
- if (maybeNotification) {
- ws.notify(maybeNotification);
+ if (res?.notification) {
+ ws.notify(res.notification);
}
+ return res.retryRecord;
}
/**
@@ -462,8 +475,8 @@ async function storeTaskProgress(
async function storePendingTaskPending(
ws: InternalWalletState,
pendingTaskId: string,
-): Promise<void> {
- const maybeNotification = await ws.db.runAllStoresReadWriteTx(async (tx) => {
+): Promise<OperationRetryRecord> {
+ const res = await ws.db.runAllStoresReadWriteTx(async (tx) => {
let retryRecord = await tx.operationRetries.get(pendingTaskId);
let hadError = false;
if (!retryRecord) {
@@ -479,15 +492,24 @@ async function storePendingTaskPending(
retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
}
await tx.operationRetries.put(retryRecord);
+ let notification: WalletNotification | undefined = undefined;
if (hadError) {
- return taskToRetryNotification(ws, tx, pendingTaskId, undefined);
- } else {
- return undefined;
+ notification = await taskToRetryNotification(
+ ws,
+ tx,
+ pendingTaskId,
+ undefined,
+ );
}
+ return {
+ notification,
+ retryRecord,
+ };
});
- if (maybeNotification) {
- ws.notify(maybeNotification);
+ if (res.notification) {
+ ws.notify(res.notification);
}
+ return res.retryRecord;
}
async function storePendingTaskFinished(
@@ -502,33 +524,11 @@ async function storePendingTaskFinished(
async function runTaskWithErrorReporting(
ws: InternalWalletState,
opId: TaskIdStr,
+ info: ShepherdInfo,
f: () => Promise<TaskRunResult>,
): Promise<TaskRunResult> {
- let maybeError: TalerErrorDetail | undefined;
try {
- const resp = await f();
- switch (resp.type) {
- case TaskRunResultType.Error:
- await storePendingTaskError(ws, opId, resp.errorDetail);
- return resp;
- case TaskRunResultType.Finished:
- await storePendingTaskFinished(ws, opId);
- return resp;
- case TaskRunResultType.Backoff:
- await storePendingTaskPending(ws, opId);
- return resp;
- case TaskRunResultType.ScheduleLater:
- // Task succeeded but wants to be run again.
- await storeTaskProgress(ws, opId);
- return resp;
- case TaskRunResultType.Progress:
- await storeTaskProgress(ws, opId);
- return resp;
- case TaskRunResultType.LongpollReturnedPending:
- // Longpoll should be run again immediately.
- await storeTaskProgress(ws, opId);
- return resp;
- }
+ return await f();
} catch (e) {
if (e instanceof CryptoApiStoppedError) {
if (ws.stopped) {
@@ -543,46 +543,11 @@ async function runTaskWithErrorReporting(
};
}
}
- if (e instanceof TalerError) {
- logger.warn("operation processed resulted in error");
- logger.warn(`error was: ${j2s(e.errorDetail)}`);
- maybeError = e.errorDetail;
- await storePendingTaskError(ws, opId, maybeError!);
- return {
- type: TaskRunResultType.Error,
- errorDetail: e.errorDetail,
- };
- } else if (e instanceof Error) {
- // This is a bug, as we expect pending operations to always
- // do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED
- // or return something.
- logger.error(`Uncaught exception: ${e.message}`);
- logger.error(`Stack: ${e.stack}`);
- maybeError = makeErrorDetail(
- TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
- {
- stack: e.stack,
- },
- `unexpected exception (message: ${e.message})`,
- );
- await storePendingTaskError(ws, opId, maybeError);
- return {
- type: TaskRunResultType.Error,
- errorDetail: maybeError,
- };
- } else {
- logger.error("Uncaught exception, value is not even an error.");
- maybeError = makeErrorDetail(
- TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
- {},
- `unexpected exception (not even an error)`,
- );
- await storePendingTaskError(ws, opId, maybeError);
- return {
- type: TaskRunResultType.Error,
- errorDetail: maybeError,
- };
- }
+ const errorDetail = getErrorDetailFromException(e);
+ return {
+ type: TaskRunResultType.Error,
+ errorDetail,
+ };
}
}