summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2024-02-27 23:36:37 +0100
committerFlorian Dold <florian@dold.me>2024-02-27 23:36:37 +0100
commitd394a6f02f7905813afb74f157badd11f609a18c (patch)
tree0ed89c8fdb85de4e43d818283151c4d1e9837019
parent6584d9e054faf9a927708f1f7f51bcbed7873afb (diff)
downloadwallet-core-d394a6f02f7905813afb74f157badd11f609a18c.tar.gz
wallet-core-d394a6f02f7905813afb74f157badd11f609a18c.tar.bz2
wallet-core-d394a6f02f7905813afb74f157badd11f609a18c.zip
also observe shepherd
-rw-r--r--packages/taler-util/src/notifications.ts19
-rw-r--r--packages/taler-wallet-cli/src/index.ts15
-rw-r--r--packages/taler-wallet-core/src/shepherd.ts30
3 files changed, 47 insertions, 17 deletions
diff --git a/packages/taler-util/src/notifications.ts b/packages/taler-util/src/notifications.ts
index d0095dcd2..023eb8ef4 100644
--- a/packages/taler-util/src/notifications.ts
+++ b/packages/taler-util/src/notifications.ts
@@ -111,17 +111,18 @@ export interface RequestProgressNotification {
export enum ObservabilityEventType {
HttpFetchStart = "http-fetch-start",
- HttpFetchFinishError = "http-fetch-finish-success",
- HttpFetchFinishSuccess = "http-fetch-finish-error",
+ HttpFetchFinishError = "http-fetch-finish-error",
+ HttpFetchFinishSuccess = "http-fetch-finish-success",
DbQueryStart = "db-query-start",
- DbQueryFinishSuccess = "db-query-finish-error",
- DbQueryFinishError = "db-query-finish-success",
+ DbQueryFinishSuccess = "db-query-finish-success",
+ DbQueryFinishError = "db-query-finish-error",
RequestStart = "request-start",
RequestFinishSuccess = "request-finish-success",
RequestFinishError = "request-finish-error",
- TaskStart = "start-task",
- TaskStop = "stop-task",
- TaskReset = "reset-task",
+ TaskStart = "task-start",
+ TaskStop = "task-stop",
+ TaskReset = "task-reset",
+ ShepherdTaskResult = "sheperd-task-result",
DeclareTaskDependency = "declare-task-dependency",
CryptoStart = "crypto-start",
CryptoFinishSuccess = "crypto-finish-success",
@@ -194,6 +195,10 @@ export type ObservabilityEvent =
| {
type: ObservabilityEventType.CryptoFinishError;
operation: string;
+ }
+ | {
+ type: ObservabilityEventType.ShepherdTaskResult;
+ resultType: string;
};
export interface BackupOperationErrorNotification {
diff --git a/packages/taler-wallet-cli/src/index.ts b/packages/taler-wallet-cli/src/index.ts
index 68919615c..b1d813e0d 100644
--- a/packages/taler-wallet-cli/src/index.ts
+++ b/packages/taler-wallet-cli/src/index.ts
@@ -1181,7 +1181,20 @@ advancedCli
})
.action(async (args) => {
logger.info(`serving at ${args.serve.unixPath}`);
- const wh = await createLocalWallet(args);
+ const onNotif = (notif: WalletNotification) => {
+ if (observabilityEventFile) {
+ switch (notif.type) {
+ case NotificationType.RequestObservabilityEvent:
+ case NotificationType.TaskObservabilityEvent:
+ fs.appendFileSync(
+ observabilityEventFile,
+ JSON.stringify(notif) + "\n",
+ );
+ break;
+ }
+ }
+ };
+ const wh = await createLocalWallet(args, onNotif);
const w = wh.wallet;
w.runTaskLoop()
.then((res) => {
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts
index 0bc548a60..e6ea412e3 100644
--- a/packages/taler-wallet-core/src/shepherd.ts
+++ b/packages/taler-wallet-core/src/shepherd.ts
@@ -26,6 +26,7 @@ import {
Logger,
NotificationType,
ObservabilityContext,
+ ObservabilityEventType,
RetryLoopOpts,
TalerError,
TalerErrorCode,
@@ -327,15 +328,16 @@ export class TaskSchedulerImpl implements TaskScheduler {
Duration.fromSpec({ seconds: 60 }),
);
}
+ const wex = getWalletExecutionContextForTask(
+ this.ws,
+ taskId,
+ info.cts.token,
+ );
const startTime = AbsoluteTime.now();
logger.trace(`Shepherd for ${taskId} will call handler`);
// FIXME: This should already return the retry record.
const res = await runTaskWithErrorReporting(this.ws, taskId, async () => {
- return await callOperationHandlerForTaskId(
- this.ws,
- taskId,
- info.cts.token,
- );
+ return await callOperationHandlerForTaskId(wex, taskId);
});
const retryRecord = await this.ws.db.runReadOnlyTx(
["operationRetries"],
@@ -343,6 +345,10 @@ export class TaskSchedulerImpl implements TaskScheduler {
return tx.operationRetries.get(taskId);
},
);
+ wex.oc.observe({
+ type: ObservabilityEventType.ShepherdTaskResult,
+ resultType: res.type,
+ });
switch (res.type) {
case TaskRunResultType.Error: {
logger.trace(`Shepherd for ${taskId} got error result.`);
@@ -573,11 +579,11 @@ async function runTaskWithErrorReporting(
}
}
-async function callOperationHandlerForTaskId(
+function getWalletExecutionContextForTask(
ws: InternalWalletState,
taskId: TaskIdStr,
cancellationToken: CancellationToken,
-): Promise<TaskRunResult> {
+): WalletExecutionContext {
let oc: ObservabilityContext;
let wex: WalletExecutionContext;
@@ -594,14 +600,20 @@ async function callOperationHandlerForTaskId(
},
};
- wex = getObservedWalletExecutionContext(ws, CancellationToken.CONTINUE, oc);
+ wex = getObservedWalletExecutionContext(ws, cancellationToken, oc);
} else {
oc = {
observe(evt) {},
};
- wex = getNormalWalletExecutionContext(ws, CancellationToken.CONTINUE, oc);
+ wex = getNormalWalletExecutionContext(ws, cancellationToken, oc);
}
+ return wex;
+}
+async function callOperationHandlerForTaskId(
+ wex: WalletExecutionContext,
+ taskId: TaskIdStr,
+): Promise<TaskRunResult> {
const pending = parseTaskIdentifier(taskId);
switch (pending.tag) {
case PendingTaskType.ExchangeUpdate: