diff options
author | Florian Dold <florian@dold.me> | 2024-02-27 23:36:37 +0100 |
---|---|---|
committer | Florian Dold <florian@dold.me> | 2024-02-27 23:36:37 +0100 |
commit | d394a6f02f7905813afb74f157badd11f609a18c (patch) | |
tree | 0ed89c8fdb85de4e43d818283151c4d1e9837019 | |
parent | 6584d9e054faf9a927708f1f7f51bcbed7873afb (diff) | |
download | wallet-core-d394a6f02f7905813afb74f157badd11f609a18c.tar.gz wallet-core-d394a6f02f7905813afb74f157badd11f609a18c.tar.bz2 wallet-core-d394a6f02f7905813afb74f157badd11f609a18c.zip |
also observe shepherd
-rw-r--r-- | packages/taler-util/src/notifications.ts | 19 | ||||
-rw-r--r-- | packages/taler-wallet-cli/src/index.ts | 15 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/shepherd.ts | 30 |
3 files changed, 47 insertions, 17 deletions
diff --git a/packages/taler-util/src/notifications.ts b/packages/taler-util/src/notifications.ts index d0095dcd2..023eb8ef4 100644 --- a/packages/taler-util/src/notifications.ts +++ b/packages/taler-util/src/notifications.ts @@ -111,17 +111,18 @@ export interface RequestProgressNotification { export enum ObservabilityEventType { HttpFetchStart = "http-fetch-start", - HttpFetchFinishError = "http-fetch-finish-success", - HttpFetchFinishSuccess = "http-fetch-finish-error", + HttpFetchFinishError = "http-fetch-finish-error", + HttpFetchFinishSuccess = "http-fetch-finish-success", DbQueryStart = "db-query-start", - DbQueryFinishSuccess = "db-query-finish-error", - DbQueryFinishError = "db-query-finish-success", + DbQueryFinishSuccess = "db-query-finish-success", + DbQueryFinishError = "db-query-finish-error", RequestStart = "request-start", RequestFinishSuccess = "request-finish-success", RequestFinishError = "request-finish-error", - TaskStart = "start-task", - TaskStop = "stop-task", - TaskReset = "reset-task", + TaskStart = "task-start", + TaskStop = "task-stop", + TaskReset = "task-reset", + ShepherdTaskResult = "sheperd-task-result", DeclareTaskDependency = "declare-task-dependency", CryptoStart = "crypto-start", CryptoFinishSuccess = "crypto-finish-success", @@ -194,6 +195,10 @@ export type ObservabilityEvent = | { type: ObservabilityEventType.CryptoFinishError; operation: string; + } + | { + type: ObservabilityEventType.ShepherdTaskResult; + resultType: string; }; export interface BackupOperationErrorNotification { diff --git a/packages/taler-wallet-cli/src/index.ts b/packages/taler-wallet-cli/src/index.ts index 68919615c..b1d813e0d 100644 --- a/packages/taler-wallet-cli/src/index.ts +++ b/packages/taler-wallet-cli/src/index.ts @@ -1181,7 +1181,20 @@ advancedCli }) .action(async (args) => { logger.info(`serving at ${args.serve.unixPath}`); - const wh = await createLocalWallet(args); + const onNotif = (notif: WalletNotification) => { + if (observabilityEventFile) { + switch (notif.type) { + case NotificationType.RequestObservabilityEvent: + case NotificationType.TaskObservabilityEvent: + fs.appendFileSync( + observabilityEventFile, + JSON.stringify(notif) + "\n", + ); + break; + } + } + }; + const wh = await createLocalWallet(args, onNotif); const w = wh.wallet; w.runTaskLoop() .then((res) => { diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts index 0bc548a60..e6ea412e3 100644 --- a/packages/taler-wallet-core/src/shepherd.ts +++ b/packages/taler-wallet-core/src/shepherd.ts @@ -26,6 +26,7 @@ import { Logger, NotificationType, ObservabilityContext, + ObservabilityEventType, RetryLoopOpts, TalerError, TalerErrorCode, @@ -327,15 +328,16 @@ export class TaskSchedulerImpl implements TaskScheduler { Duration.fromSpec({ seconds: 60 }), ); } + const wex = getWalletExecutionContextForTask( + this.ws, + taskId, + info.cts.token, + ); const startTime = AbsoluteTime.now(); logger.trace(`Shepherd for ${taskId} will call handler`); // FIXME: This should already return the retry record. const res = await runTaskWithErrorReporting(this.ws, taskId, async () => { - return await callOperationHandlerForTaskId( - this.ws, - taskId, - info.cts.token, - ); + return await callOperationHandlerForTaskId(wex, taskId); }); const retryRecord = await this.ws.db.runReadOnlyTx( ["operationRetries"], @@ -343,6 +345,10 @@ export class TaskSchedulerImpl implements TaskScheduler { return tx.operationRetries.get(taskId); }, ); + wex.oc.observe({ + type: ObservabilityEventType.ShepherdTaskResult, + resultType: res.type, + }); switch (res.type) { case TaskRunResultType.Error: { logger.trace(`Shepherd for ${taskId} got error result.`); @@ -573,11 +579,11 @@ async function runTaskWithErrorReporting( } } -async function callOperationHandlerForTaskId( +function getWalletExecutionContextForTask( ws: InternalWalletState, taskId: TaskIdStr, cancellationToken: CancellationToken, -): Promise<TaskRunResult> { +): WalletExecutionContext { let oc: ObservabilityContext; let wex: WalletExecutionContext; @@ -594,14 +600,20 @@ async function callOperationHandlerForTaskId( }, }; - wex = getObservedWalletExecutionContext(ws, CancellationToken.CONTINUE, oc); + wex = getObservedWalletExecutionContext(ws, cancellationToken, oc); } else { oc = { observe(evt) {}, }; - wex = getNormalWalletExecutionContext(ws, CancellationToken.CONTINUE, oc); + wex = getNormalWalletExecutionContext(ws, cancellationToken, oc); } + return wex; +} +async function callOperationHandlerForTaskId( + wex: WalletExecutionContext, + taskId: TaskIdStr, +): Promise<TaskRunResult> { const pending = parseTaskIdentifier(taskId); switch (pending.tag) { case PendingTaskType.ExchangeUpdate: |