summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/shepherd.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/shepherd.ts')
-rw-r--r--packages/taler-wallet-core/src/shepherd.ts36
1 files changed, 30 insertions, 6 deletions
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts
index e3a0bd609..1c23ab6b5 100644
--- a/packages/taler-wallet-core/src/shepherd.ts
+++ b/packages/taler-wallet-core/src/shepherd.ts
@@ -25,6 +25,7 @@ import {
Duration,
Logger,
NotificationType,
+ ObservabilityContext,
RetryLoopOpts,
TalerError,
TalerErrorCode,
@@ -63,6 +64,10 @@ import {
} from "./deposits.js";
import { updateExchangeFromUrlHandler } from "./exchanges.js";
import {
+ ObservableHttpClientLibrary,
+ ObservableTaskScheduler,
+} from "./observable-wrappers.js";
+import {
computePayMerchantTransactionState,
computeRefundTransactionState,
processPurchase,
@@ -134,7 +139,16 @@ function taskGivesLiveness(taskId: string): boolean {
}
}
-export class TaskScheduler {
+export interface TaskScheduler {
+ ensureRunning(): void;
+ run(opts?: RetryLoopOpts): Promise<void>;
+ startShepherdTask(taskId: TaskIdStr): void;
+ stopShepherdTask(taskId: TaskIdStr): void;
+ resetTaskRetries(taskId: TaskIdStr): Promise<void>;
+ reload(): void;
+}
+
+export class TaskSchedulerImpl implements TaskScheduler {
private sheps: Map<TaskIdStr, ShepherdInfo> = new Map();
private iterCond = new AsyncCondition();
@@ -563,16 +577,26 @@ async function callOperationHandlerForTaskId(
taskId: TaskIdStr,
cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
+ const oc: ObservabilityContext = {
+ observe(evt) {
+ if (ws.config.testing.emitObservabilityEvents) {
+ ws.notify({
+ type: NotificationType.TaskObservabilityEvent,
+ taskId,
+ event: evt,
+ });
+ }
+ },
+ };
+
const wex: WalletExecutionContext = {
ws,
cancellationToken,
cryptoApi: ws.cryptoApi,
db: ws.db,
- http: ws.http,
- taskScheduler: ws.taskScheduler,
- oc: {
- observe(event) {},
- },
+ http: new ObservableHttpClientLibrary(ws.http, oc),
+ taskScheduler: new ObservableTaskScheduler(ws.taskScheduler, oc),
+ oc,
};
const pending = parseTaskIdentifier(taskId);
switch (pending.tag) {