summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2024-02-27 22:23:11 +0100
committerFlorian Dold <florian@dold.me>2024-02-27 22:23:11 +0100
commitd3572014b06f60250e3bb9e99898b89cd11a4294 (patch)
treed98879e8198cc748513b4c331ee3a764ea10638e
parentf08798520ef9b8b0ff36c6aaf93653605b53b912 (diff)
downloadwallet-core-d3572014b06f60250e3bb9e99898b89cd11a4294.tar.gz
wallet-core-d3572014b06f60250e3bb9e99898b89cd11a4294.tar.bz2
wallet-core-d3572014b06f60250e3bb9e99898b89cd11a4294.zip
observability
-rw-r--r--packages/taler-util/src/notifications.ts84
-rw-r--r--packages/taler-wallet-cli/src/index.ts30
-rw-r--r--packages/taler-wallet-core/src/deposits.ts8
-rw-r--r--packages/taler-wallet-core/src/exchanges.ts6
-rw-r--r--packages/taler-wallet-core/src/observable-wrappers.ts131
-rw-r--r--packages/taler-wallet-core/src/pay-merchant.ts25
-rw-r--r--packages/taler-wallet-core/src/pay-peer-pull-credit.ts34
-rw-r--r--packages/taler-wallet-core/src/pay-peer-pull-debit.ts10
-rw-r--r--packages/taler-wallet-core/src/pay-peer-push-credit.ts27
-rw-r--r--packages/taler-wallet-core/src/pay-peer-push-debit.ts10
-rw-r--r--packages/taler-wallet-core/src/refresh.ts18
-rw-r--r--packages/taler-wallet-core/src/shepherd.ts36
-rw-r--r--packages/taler-wallet-core/src/wallet-api-types.ts1
-rw-r--r--packages/taler-wallet-core/src/wallet.ts47
-rw-r--r--packages/taler-wallet-core/src/withdraw.ts44
15 files changed, 368 insertions, 143 deletions
diff --git a/packages/taler-util/src/notifications.ts b/packages/taler-util/src/notifications.ts
index 5e335f7ad..9a62362e7 100644
--- a/packages/taler-util/src/notifications.ts
+++ b/packages/taler-util/src/notifications.ts
@@ -30,10 +30,8 @@ export enum NotificationType {
BackupOperationError = "backup-error",
TransactionStateTransition = "transaction-state-transition",
ExchangeStateTransition = "exchange-state-transition",
- TaskProgress = "task-progress",
- RequestProgress = "request-progress",
- RequestOnTaskDependency = "request-on-task-dependency",
- TaskOnTaskDependency = "task-on-task-dependency",
+ TaskObservabilityEvent = "task-observability-event",
+ RequestObservabilityEvent = "request-observability-event",
}
export interface ErrorInfoSummary {
@@ -99,50 +97,88 @@ export interface BalanceChangeNotification {
}
export interface TaskProgressNotification {
- type: NotificationType.TaskProgress;
+ type: NotificationType.TaskObservabilityEvent;
taskId: string;
+ event: ObservabilityEvent;
}
export interface RequestProgressNotification {
- type: NotificationType.RequestProgress;
+ type: NotificationType.RequestObservabilityEvent;
requestId: string;
+ operation: string;
+ event: ObservabilityEvent;
}
-export interface RequestOnTaskDependencyNotification {
- type: NotificationType.RequestOnTaskDependency;
- parentTaskId: string;
- childTaskId: string;
-}
-
-export interface TaskOnTaskDependencyNotification {
- type: NotificationType.TaskOnTaskDependency;
- parentRequestId: string;
- childTaskId: string;
+export enum ObservabilityEventType {
+ HttpFetchStart = "http-fetch-start",
+ HttpFetchFinishError = "http-fetch-finish-success",
+ HttpFetchFinishSuccess = "http-fetch-finish-error",
+ DbQueryStart = "db-query-start",
+ DbQueryCommit = "db-query-commit",
+ DbQueryAbort = "db-query-abort",
+ RequestStart = "request-start",
+ RequestFinishSuccess = "request-finish-success",
+ RequestFinishError = "request-finish-error",
+ StartTask = "start-task",
+ StopTask = "stop-task",
+ ResetTask = "reset-task",
+ TaskDependency = "task-dependency",
}
export type ObservabilityEvent =
| {
- type: "start-http-fetch";
+ type: ObservabilityEventType.HttpFetchStart;
url: string;
}
| {
- type: "finish-http-fetch";
+ type: ObservabilityEventType.HttpFetchFinishSuccess;
url: string;
status: number;
}
| {
- type: "start-db-query";
+ type: ObservabilityEventType.HttpFetchFinishError;
+ url: string;
+ error: TalerErrorDetail;
+ }
+ | {
+ type: ObservabilityEventType.DbQueryStart;
name: string;
location: string;
}
| {
- type: "finish-db-query";
+ type: ObservabilityEventType.DbQueryCommit;
name: string;
location: string;
}
| {
- type: "task-processed";
- taskResultType: string;
+ type: ObservabilityEventType.DbQueryAbort;
+ name: string;
+ location: string;
+ }
+ | {
+ type: ObservabilityEventType.RequestStart;
+ }
+ | {
+ type: ObservabilityEventType.RequestFinishSuccess;
+ }
+ | {
+ type: ObservabilityEventType.RequestFinishError;
+ }
+ | {
+ type: ObservabilityEventType.StartTask;
+ taskId: string;
+ }
+ | {
+ type: ObservabilityEventType.StopTask;
+ taskId: string;
+ }
+ | {
+ type: ObservabilityEventType.ResetTask;
+ taskId: string;
+ }
+ | {
+ type: ObservabilityEventType.TaskDependency;
+ taskId: string;
};
export interface BackupOperationErrorNotification {
@@ -156,6 +192,4 @@ export type WalletNotification =
| ExchangeStateTransitionNotification
| TransactionStateTransitionNotification
| TaskProgressNotification
- | RequestProgressNotification
- | RequestOnTaskDependencyNotification
- | TaskOnTaskDependencyNotification;
+ | RequestProgressNotification;
diff --git a/packages/taler-wallet-cli/src/index.ts b/packages/taler-wallet-cli/src/index.ts
index e676f3950..68919615c 100644
--- a/packages/taler-wallet-cli/src/index.ts
+++ b/packages/taler-wallet-cli/src/index.ts
@@ -31,6 +31,7 @@ import {
getRandomBytes,
j2s,
Logger,
+ NotificationType,
parsePaytoUri,
parseTalerUri,
PreparePayResultType,
@@ -66,6 +67,8 @@ import {
makeNotificationWaiter,
} from "@gnu-taler/taler-wallet-core/remote";
+import * as fs from "node:fs";
+
// This module also serves as the entry point for the crypto
// thread worker, and thus must expose these two handlers.
export {
@@ -75,9 +78,10 @@ export {
const logger = new Logger("taler-wallet-cli.ts");
+let observabilityEventFile: string | undefined = undefined;
+
const EXIT_EXCEPTION = 4;
const EXIT_API_ERROR = 5;
-const EXIT_RETRIES_EXCEEDED = 6;
setUnhandledRejectionHandler((error: any) => {
logger.error("unhandledRejection", error.message);
@@ -270,6 +274,7 @@ async function createLocalWallet(
denomselAllowLate: checkEnvFlag(
"TALER_WALLET_DEBUG_DENOMSEL_ALLOW_LATE",
),
+ emitObservabilityEvents: observabilityEventFile != null,
skipDefaults: walletCliArgs.wallet.skipDefaults,
},
},
@@ -293,11 +298,26 @@ async function withWallet<T>(
): Promise<T> {
const waiter = makeNotificationWaiter();
+ const onNotif = (notif: WalletNotification) => {
+ waiter.notify(notif);
+ if (observabilityEventFile) {
+ switch (notif.type) {
+ case NotificationType.RequestObservabilityEvent:
+ case NotificationType.TaskObservabilityEvent:
+ fs.appendFileSync(
+ observabilityEventFile,
+ JSON.stringify(notif) + "\n",
+ );
+ break;
+ }
+ }
+ };
+
if (walletCliArgs.wallet.walletConnection) {
logger.info("creating remote wallet");
const w = await createRemoteWallet({
name: "wallet",
- notificationHandler: waiter.notify,
+ notificationHandler: onNotif,
socketFilename: walletCliArgs.wallet.walletConnection,
});
const ctx: WalletContext = {
@@ -311,7 +331,7 @@ async function withWallet<T>(
w.close();
return res;
} else {
- const wh = await createLocalWallet(walletCliArgs, waiter.notify);
+ const wh = await createLocalWallet(walletCliArgs, onNotif);
const ctx: WalletContext = {
client: wh.wallet.client,
waitForNotificationCond: waiter.waitForNotificationCond,
@@ -1719,5 +1739,9 @@ async function read(stream: NodeJS.ReadStream) {
}
export function main() {
+ const maybeFilename = getenv("TALER_WALLET_DEBUG_OBSERVE");
+ if (!!maybeFilename) {
+ observabilityEventFile = maybeFilename;
+ }
walletCli.run();
}
diff --git a/packages/taler-wallet-core/src/deposits.ts b/packages/taler-wallet-core/src/deposits.ts
index 3abb614bd..68ebc9507 100644
--- a/packages/taler-wallet-core/src/deposits.ts
+++ b/packages/taler-wallet-core/src/deposits.ts
@@ -238,8 +238,8 @@ export class DepositTransactionContext implements TransactionContext {
}
async resumeTransaction(): Promise<void> {
- const { wex: ws, depositGroupId, transactionId, taskId: retryTag } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, depositGroupId, transactionId, taskId: retryTag } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["depositGroups"],
async (tx) => {
const dg = await tx.depositGroups.get(depositGroupId);
@@ -276,8 +276,8 @@ export class DepositTransactionContext implements TransactionContext {
};
},
);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(retryTag);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(retryTag);
}
async failTransaction(): Promise<void> {
diff --git a/packages/taler-wallet-core/src/exchanges.ts b/packages/taler-wallet-core/src/exchanges.ts
index 4453123fa..5b0d3e823 100644
--- a/packages/taler-wallet-core/src/exchanges.ts
+++ b/packages/taler-wallet-core/src/exchanges.ts
@@ -957,7 +957,7 @@ async function startUpdateExchangeEntry(
newExchangeState: newExchangeState,
oldExchangeState: oldExchangeState,
});
- await wex.ws.taskScheduler.resetTaskRetries(taskId);
+ await wex.taskScheduler.resetTaskRetries(taskId);
}
/**
@@ -1114,7 +1114,7 @@ export async function fetchFreshExchange(
): Promise<ReadyExchangeSummary> {
const canonUrl = canonicalizeBaseUrl(baseUrl);
- wex.ws.taskScheduler.ensureRunning();
+ wex.taskScheduler.ensureRunning();
await startUpdateExchangeEntry(wex, canonUrl, {
forceUpdate: options.forceUpdate,
@@ -1514,7 +1514,7 @@ export async function updateExchangeFromUrlHandler(
});
// Asynchronously start recoup. This doesn't need to finish
// for the exchange update to be considered finished.
- wex.ws.taskScheduler.startShepherdTask(recoupTaskId);
+ wex.taskScheduler.startShepherdTask(recoupTaskId);
}
if (!updated) {
diff --git a/packages/taler-wallet-core/src/observable-wrappers.ts b/packages/taler-wallet-core/src/observable-wrappers.ts
new file mode 100644
index 000000000..77839e047
--- /dev/null
+++ b/packages/taler-wallet-core/src/observable-wrappers.ts
@@ -0,0 +1,131 @@
+/*
+ This file is part of GNU Taler
+ (C) 2024 Taler Systems SA
+
+ GNU Taler is free software; you can redistribute it and/or modify it under the
+ terms of the GNU General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along with
+ GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+ */
+
+/**
+ * @fileoverview Wrappers/proxies to make various interfaces observable.
+ */
+
+/**
+ * Imports.
+ */
+import {
+ ObservabilityContext,
+ ObservabilityEventType,
+ RetryLoopOpts,
+ getErrorDetailFromException,
+} from "@gnu-taler/taler-util";
+import {
+ HttpRequestLibrary,
+ HttpRequestOptions,
+ HttpResponse,
+} from "@gnu-taler/taler-util/http";
+import { TaskIdStr } from "./common.js";
+import { TaskScheduler } from "./shepherd.js";
+
+/**
+ * Task scheduler with extra observability events.
+ */
+export class ObservableTaskScheduler implements TaskScheduler {
+ constructor(
+ private impl: TaskScheduler,
+ private oc: ObservabilityContext,
+ ) {}
+
+ private taskDepCache = new Set<string>();
+
+ private declareDep(taskId: TaskIdStr): void {
+ if (this.taskDepCache.size > 500) {
+ this.taskDepCache.clear();
+ }
+ if (!this.taskDepCache.has(taskId)) {
+ this.taskDepCache.add(taskId);
+ this.oc.observe({
+ type: ObservabilityEventType.TaskDependency,
+ taskId,
+ });
+ }
+ }
+
+ ensureRunning(): void {
+ return this.impl.ensureRunning();
+ }
+
+ run(opts?: RetryLoopOpts | undefined): Promise<void> {
+ return this.impl.run(opts);
+ }
+ startShepherdTask(taskId: TaskIdStr): void {
+ this.declareDep(taskId);
+ this.oc.observe({
+ type: ObservabilityEventType.StartTask,
+ taskId,
+ });
+ return this.impl.startShepherdTask(taskId);
+ }
+ stopShepherdTask(taskId: TaskIdStr): void {
+ this.declareDep(taskId);
+ this.oc.observe({
+ type: ObservabilityEventType.StopTask,
+ taskId,
+ });
+ return this.impl.stopShepherdTask(taskId);
+ }
+ resetTaskRetries(taskId: TaskIdStr): Promise<void> {
+ this.declareDep(taskId);
+ if (this.taskDepCache.size > 500) {
+ this.taskDepCache.clear();
+ }
+ this.oc.observe({
+ type: ObservabilityEventType.ResetTask,
+ taskId,
+ });
+ return this.impl.resetTaskRetries(taskId);
+ }
+ reload(): void {
+ return this.impl.reload();
+ }
+}
+
+export class ObservableHttpClientLibrary implements HttpRequestLibrary {
+ constructor(
+ private impl: HttpRequestLibrary,
+ private oc: ObservabilityContext,
+ ) {}
+ async fetch(
+ url: string,
+ opt?: HttpRequestOptions | undefined,
+ ): Promise<HttpResponse> {
+ this.oc.observe({
+ type: ObservabilityEventType.HttpFetchStart,
+ url: url,
+ });
+ try {
+ const res = await this.impl.fetch(url, opt);
+ this.oc.observe({
+ type: ObservabilityEventType.HttpFetchFinishSuccess,
+ url,
+ status: res.status,
+ });
+ return res;
+ } catch (e) {
+ this.oc.observe({
+ type: ObservabilityEventType.HttpFetchFinishError,
+ url,
+ error: getErrorDetailFromException(e),
+ });
+ throw e;
+ }
+ }
+}
diff --git a/packages/taler-wallet-core/src/pay-merchant.ts b/packages/taler-wallet-core/src/pay-merchant.ts
index a576930ba..e92156572 100644
--- a/packages/taler-wallet-core/src/pay-merchant.ts
+++ b/packages/taler-wallet-core/src/pay-merchant.ts
@@ -140,7 +140,6 @@ import {
import {
EXCHANGE_COINS_LOCK,
getDenomInfo,
- InternalWalletState,
WalletExecutionContext,
} from "./wallet.js";
import { getCandidateWithdrawalDenomsTx } from "./withdraw.js";
@@ -244,9 +243,9 @@ export class PayMerchantTransactionContext implements TransactionContext {
}
async suspendTransaction(): Promise<void> {
- const { wex: ws, proposalId, transactionId } = this;
- ws.taskScheduler.stopShepherdTask(this.taskId);
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, proposalId, transactionId } = this;
+ wex.taskScheduler.stopShepherdTask(this.taskId);
+ const transitionInfo = await wex.db.runReadWriteTx(
["purchases"],
async (tx) => {
const purchase = await tx.purchases.get(proposalId);
@@ -263,7 +262,7 @@ export class PayMerchantTransactionContext implements TransactionContext {
return { oldTxState, newTxState };
},
);
- notifyTransition(ws, transactionId, transitionInfo);
+ notifyTransition(wex, transactionId, transitionInfo);
}
async abortTransaction(): Promise<void> {
@@ -334,8 +333,8 @@ export class PayMerchantTransactionContext implements TransactionContext {
}
async resumeTransaction(): Promise<void> {
- const { wex: ws, proposalId, transactionId, taskId: retryTag } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, proposalId, transactionId, taskId: retryTag } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["purchases"],
async (tx) => {
const purchase = await tx.purchases.get(proposalId);
@@ -352,13 +351,13 @@ export class PayMerchantTransactionContext implements TransactionContext {
return { oldTxState, newTxState };
},
);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(this.taskId);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(this.taskId);
}
async failTransaction(): Promise<void> {
- const { wex: ws, proposalId, transactionId } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, proposalId, transactionId } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
[
"purchases",
"refreshGroups",
@@ -387,8 +386,8 @@ export class PayMerchantTransactionContext implements TransactionContext {
return { oldTxState, newTxState };
},
);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.stopShepherdTask(this.taskId);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.stopShepherdTask(this.taskId);
}
}
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
index c999a8d1f..de30f66d2 100644
--- a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
@@ -137,8 +137,8 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
async suspendTransaction(): Promise<void> {
- const { wex: ws, pursePub, taskId: retryTag, transactionId } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, pursePub, taskId: retryTag, transactionId } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPullCredit"],
async (tx) => {
const pullCreditRec = await tx.peerPullCredit.get(pursePub);
@@ -192,13 +192,13 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
return undefined;
},
);
- ws.taskScheduler.stopShepherdTask(retryTag);
- notifyTransition(ws, transactionId, transitionInfo);
+ wex.taskScheduler.stopShepherdTask(retryTag);
+ notifyTransition(wex, transactionId, transitionInfo);
}
async failTransaction(): Promise<void> {
- const { wex: ws, pursePub, taskId: retryTag, transactionId } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, pursePub, taskId: retryTag, transactionId } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPullCredit"],
async (tx) => {
const pullCreditRec = await tx.peerPullCredit.get(pursePub);
@@ -243,13 +243,13 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
return undefined;
},
);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.stopShepherdTask(retryTag);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.stopShepherdTask(retryTag);
}
async resumeTransaction(): Promise<void> {
- const { wex: ws, pursePub, taskId: retryTag, transactionId } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, pursePub, taskId: retryTag, transactionId } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPullCredit"],
async (tx) => {
const pullCreditRec = await tx.peerPullCredit.get(pursePub);
@@ -302,13 +302,13 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
return undefined;
},
);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(retryTag);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(retryTag);
}
async abortTransaction(): Promise<void> {
- const { wex: ws, pursePub, taskId: retryTag, transactionId } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, pursePub, taskId: retryTag, transactionId } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPullCredit"],
async (tx) => {
const pullCreditRec = await tx.peerPullCredit.get(pursePub);
@@ -356,9 +356,9 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
return undefined;
},
);
- ws.taskScheduler.stopShepherdTask(retryTag);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(retryTag);
+ wex.taskScheduler.stopShepherdTask(retryTag);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(retryTag);
}
}
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
index 828f68113..7348a30ce 100644
--- a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
@@ -300,9 +300,9 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
>,
) => Promise<TransitionResult>,
): Promise<void> {
- const ws = this.wex;
+ const wex = this.wex;
const extraStores = opts.extraStores ?? [];
- const transitionInfo = await ws.db.runReadWriteTx(
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPullDebit", ...extraStores],
async (tx) => {
const pi = await tx.peerPullDebit.get(this.peerPullDebitId);
@@ -325,9 +325,9 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
}
},
);
- ws.taskScheduler.stopShepherdTask(this.taskId);
- notifyTransition(ws, this.transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(this.taskId);
+ wex.taskScheduler.stopShepherdTask(this.taskId);
+ notifyTransition(wex, this.transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(this.taskId);
}
}
diff --git a/packages/taler-wallet-core/src/pay-peer-push-credit.ts b/packages/taler-wallet-core/src/pay-peer-push-credit.ts
index 772007bb6..be2b3a7bc 100644
--- a/packages/taler-wallet-core/src/pay-peer-push-credit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-push-credit.ts
@@ -17,7 +17,6 @@
import {
AcceptPeerPushPaymentResponse,
Amounts,
- CancellationToken,
ConfirmPeerPushCreditRequest,
ContractTermsUtil,
ExchangePurseMergeRequest,
@@ -83,7 +82,7 @@ import {
notifyTransition,
parseTransactionIdentifier,
} from "./transactions.js";
-import { InternalWalletState, WalletExecutionContext } from "./wallet.js";
+import { WalletExecutionContext } from "./wallet.js";
import {
PerformCreateWithdrawalGroupResult,
getExchangeWithdrawalInfo,
@@ -252,8 +251,8 @@ export class PeerPushCreditTransactionContext implements TransactionContext {
}
async resumeTransaction(): Promise<void> {
- const { wex: ws, peerPushCreditId, taskId: retryTag, transactionId } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, peerPushCreditId, taskId: retryTag, transactionId } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPushCredit"],
async (tx) => {
const pushCreditRec = await tx.peerPushCredit.get(peerPushCreditId);
@@ -300,13 +299,13 @@ export class PeerPushCreditTransactionContext implements TransactionContext {
return undefined;
},
);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(retryTag);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(retryTag);
}
async failTransaction(): Promise<void> {
- const { wex: ws, peerPushCreditId, taskId: retryTag, transactionId } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, peerPushCreditId, taskId: retryTag, transactionId } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPushCredit"],
async (tx) => {
const pushCreditRec = await tx.peerPushCredit.get(peerPushCreditId);
@@ -348,9 +347,9 @@ export class PeerPushCreditTransactionContext implements TransactionContext {
return undefined;
},
);
- ws.taskScheduler.stopShepherdTask(retryTag);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(retryTag);
+ wex.taskScheduler.stopShepherdTask(retryTag);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(retryTag);
}
}
@@ -706,11 +705,7 @@ async function handlePendingMerge(
const respJson = await mergeHttpResp.json();
const kycPending = codecForWalletKycUuid().decode(respJson);
logger.info(`kyc uuid response: ${j2s(kycPending)}`);
- return processPeerPushCreditKycRequired(
- wex,
- peerInc,
- kycPending,
- );
+ return processPeerPushCreditKycRequired(wex, peerInc, kycPending);
}
logger.trace(`merge request: ${j2s(mergeReq)}`);
diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/pay-peer-push-debit.ts
index 5ee4d642b..b621b9e0e 100644
--- a/packages/taler-wallet-core/src/pay-peer-push-debit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-push-debit.ts
@@ -281,8 +281,8 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
}
async failTransaction(): Promise<void> {
- const { wex: ws, pursePub, transactionId, taskId: retryTag } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, pursePub, transactionId, taskId: retryTag } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPushDebit"],
async (tx) => {
const pushDebitRec = await tx.peerPushDebit.get(pursePub);
@@ -329,9 +329,9 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
return undefined;
},
);
- ws.taskScheduler.stopShepherdTask(retryTag);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(retryTag);
+ wex.taskScheduler.stopShepherdTask(retryTag);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(retryTag);
}
}
diff --git a/packages/taler-wallet-core/src/refresh.ts b/packages/taler-wallet-core/src/refresh.ts
index b467a1c47..f67fb5015 100644
--- a/packages/taler-wallet-core/src/refresh.ts
+++ b/packages/taler-wallet-core/src/refresh.ts
@@ -183,8 +183,8 @@ export class RefreshTransactionContext implements TransactionContext {
}
async resumeTransaction(): Promise<void> {
- const { wex: ws, refreshGroupId, transactionId } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, refreshGroupId, transactionId } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["refreshGroups"],
async (tx) => {
const dg = await tx.refreshGroups.get(refreshGroupId);
@@ -212,13 +212,13 @@ export class RefreshTransactionContext implements TransactionContext {
return undefined;
},
);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(this.taskId);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(this.taskId);
}
async failTransaction(): Promise<void> {
- const { wex: ws, refreshGroupId, transactionId } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, refreshGroupId, transactionId } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["refreshGroups"],
async (tx) => {
const dg = await tx.refreshGroups.get(refreshGroupId);
@@ -252,9 +252,9 @@ export class RefreshTransactionContext implements TransactionContext {
};
},
);
- ws.taskScheduler.stopShepherdTask(this.taskId);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(this.taskId);
+ wex.taskScheduler.stopShepherdTask(this.taskId);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(this.taskId);
}
}
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts
index e3a0bd609..1c23ab6b5 100644
--- a/packages/taler-wallet-core/src/shepherd.ts
+++ b/packages/taler-wallet-core/src/shepherd.ts
@@ -25,6 +25,7 @@ import {
Duration,
Logger,
NotificationType,
+ ObservabilityContext,
RetryLoopOpts,
TalerError,
TalerErrorCode,
@@ -63,6 +64,10 @@ import {
} from "./deposits.js";
import { updateExchangeFromUrlHandler } from "./exchanges.js";
import {
+ ObservableHttpClientLibrary,
+ ObservableTaskScheduler,
+} from "./observable-wrappers.js";
+import {
computePayMerchantTransactionState,
computeRefundTransactionState,
processPurchase,
@@ -134,7 +139,16 @@ function taskGivesLiveness(taskId: string): boolean {
}
}
-export class TaskScheduler {
+export interface TaskScheduler {
+ ensureRunning(): void;
+ run(opts?: RetryLoopOpts): Promise<void>;
+ startShepherdTask(taskId: TaskIdStr): void;
+ stopShepherdTask(taskId: TaskIdStr): void;
+ resetTaskRetries(taskId: TaskIdStr): Promise<void>;
+ reload(): void;
+}
+
+export class TaskSchedulerImpl implements TaskScheduler {
private sheps: Map<TaskIdStr, ShepherdInfo> = new Map();
private iterCond = new AsyncCondition();
@@ -563,16 +577,26 @@ async function callOperationHandlerForTaskId(
taskId: TaskIdStr,
cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
+ const oc: ObservabilityContext = {
+ observe(evt) {
+ if (ws.config.testing.emitObservabilityEvents) {
+ ws.notify({
+ type: NotificationType.TaskObservabilityEvent,
+ taskId,
+ event: evt,
+ });
+ }
+ },
+ };
+
const wex: WalletExecutionContext = {
ws,
cancellationToken,
cryptoApi: ws.cryptoApi,
db: ws.db,
- http: ws.http,
- taskScheduler: ws.taskScheduler,
- oc: {
- observe(event) {},
- },
+ http: new ObservableHttpClientLibrary(ws.http, oc),
+ taskScheduler: new ObservableTaskScheduler(ws.taskScheduler, oc),
+ oc,
};
const pending = parseTaskIdentifier(taskId);
switch (pending.tag) {
diff --git a/packages/taler-wallet-core/src/wallet-api-types.ts b/packages/taler-wallet-core/src/wallet-api-types.ts
index 9cd8e0135..cf8f93ed1 100644
--- a/packages/taler-wallet-core/src/wallet-api-types.ts
+++ b/packages/taler-wallet-core/src/wallet-api-types.ts
@@ -308,6 +308,7 @@ export interface WalletConfig {
insecureTrustExchange: boolean;
preventThrottling: boolean;
skipDefaults: boolean;
+ emitObservabilityEvents: boolean;
};
/**
diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts
index a779fdcec..9185ee48c 100644
--- a/packages/taler-wallet-core/src/wallet.ts
+++ b/packages/taler-wallet-core/src/wallet.ts
@@ -42,7 +42,9 @@ import {
ListGlobalCurrencyAuditorsResponse,
ListGlobalCurrencyExchangesResponse,
Logger,
+ NotificationType,
ObservabilityContext,
+ ObservabilityEventType,
OpenedPromise,
PrepareWithdrawExchangeRequest,
PrepareWithdrawExchangeResponse,
@@ -199,6 +201,10 @@ import {
getMaxPeerPushAmount,
} from "./instructedAmountConversion.js";
import {
+ ObservableHttpClientLibrary,
+ ObservableTaskScheduler,
+} from "./observable-wrappers.js";
+import {
confirmPay,
getContractTermsDetails,
preparePayForTemplate,
@@ -225,7 +231,7 @@ import {
} from "./pay-peer-push-debit.js";
import { DbAccess } from "./query.js";
import { forceRefresh } from "./refresh.js";
-import { TaskScheduler } from "./shepherd.js";
+import { TaskScheduler, TaskSchedulerImpl } from "./shepherd.js";
import {
runIntegrationTest,
runIntegrationTest2,
@@ -1365,21 +1371,42 @@ async function handleCoreApiRequest(
id: string,
payload: unknown,
): Promise<CoreApiResponse> {
+ const oc: ObservabilityContext = {
+ observe(evt) {
+ if (ws.config.testing.emitObservabilityEvents) {
+ ws.notify({
+ type: NotificationType.RequestObservabilityEvent,
+ operation,
+ requestId: id,
+ event: evt,
+ });
+ }
+ },
+ };
+
const wex: WalletExecutionContext = {
ws,
cancellationToken: CancellationToken.CONTINUE,
cryptoApi: ws.cryptoApi,
db: ws.db,
- http: ws.http,
- taskScheduler: ws.taskScheduler,
- oc: {
- observe(event) {},
- },
+ http: new ObservableHttpClientLibrary(ws.http, oc),
+ taskScheduler: new ObservableTaskScheduler(ws.taskScheduler, oc),
+ oc,
};
try {
await ws.ensureWalletDbOpen();
- const result = await dispatchRequestInternal(wex, operation as any, payload);
+ oc.observe({
+ type: ObservabilityEventType.RequestStart,
+ });
+ const result = await dispatchRequestInternal(
+ wex,
+ operation as any,
+ payload,
+ );
+ oc.observe({
+ type: ObservabilityEventType.RequestFinishSuccess,
+ });
return {
type: "response",
operation,
@@ -1391,6 +1418,9 @@ async function handleCoreApiRequest(
logger.info(
`finished wallet core request ${operation} with error: ${j2s(err)}`,
);
+ oc.observe({
+ type: ObservabilityEventType.RequestFinishError,
+ });
return {
type: "error",
operation,
@@ -1460,6 +1490,7 @@ export class Wallet {
insecureTrustExchange: false,
denomselAllowLate: false,
skipDefaults: false,
+ emitObservabilityEvents: false,
},
};
@@ -1522,7 +1553,7 @@ export class InternalWalletState {
*/
private resourceLocks: Set<string> = new Set();
- taskScheduler: TaskScheduler = new TaskScheduler(this);
+ taskScheduler: TaskScheduler = new TaskSchedulerImpl(this);
config: Readonly<WalletConfig>;
diff --git a/packages/taler-wallet-core/src/withdraw.ts b/packages/taler-wallet-core/src/withdraw.ts
index 44f1ee4f9..e3c4e66a2 100644
--- a/packages/taler-wallet-core/src/withdraw.ts
+++ b/packages/taler-wallet-core/src/withdraw.ts
@@ -146,11 +146,7 @@ import {
WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,
WALLET_EXCHANGE_PROTOCOL_VERSION,
} from "./versions.js";
-import {
- WalletExecutionContext,
- getDenomInfo,
- type InternalWalletState,
-} from "./wallet.js";
+import { WalletExecutionContext, getDenomInfo } from "./wallet.js";
/**
* Logger for this file.
@@ -249,8 +245,8 @@ export class WithdrawTransactionContext implements TransactionContext {
}
async abortTransaction(): Promise<void> {
- const { wex: ws, withdrawalGroupId, transactionId, taskId } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, withdrawalGroupId, transactionId, taskId } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["withdrawalGroups"],
async (tx) => {
const wg = await tx.withdrawalGroups.get(withdrawalGroupId);
@@ -305,19 +301,14 @@ export class WithdrawTransactionContext implements TransactionContext {
return undefined;
},
);
- ws.taskScheduler.stopShepherdTask(taskId);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(taskId);
+ wex.taskScheduler.stopShepherdTask(taskId);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(taskId);
}
async resumeTransaction(): Promise<void> {
- const {
- wex: ws,
- withdrawalGroupId,
- transactionId,
- taskId: retryTag,
- } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, withdrawalGroupId, transactionId, taskId: retryTag } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["withdrawalGroups"],
async (tx) => {
const wg = await tx.withdrawalGroups.get(withdrawalGroupId);
@@ -366,18 +357,13 @@ export class WithdrawTransactionContext implements TransactionContext {
return undefined;
},
);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(retryTag);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(retryTag);
}
async failTransaction(): Promise<void> {
- const {
- wex: ws,
- withdrawalGroupId,
- transactionId,
- taskId: retryTag,
- } = this;
- const stateUpdate = await ws.db.runReadWriteTx(
+ const { wex, withdrawalGroupId, transactionId, taskId: retryTag } = this;
+ const stateUpdate = await wex.db.runReadWriteTx(
["withdrawalGroups"],
async (tx) => {
const wg = await tx.withdrawalGroups.get(withdrawalGroupId);
@@ -407,9 +393,9 @@ export class WithdrawTransactionContext implements TransactionContext {
return undefined;
},
);
- ws.taskScheduler.stopShepherdTask(retryTag);
- notifyTransition(ws, transactionId, stateUpdate);
- ws.taskScheduler.startShepherdTask(retryTag);
+ wex.taskScheduler.stopShepherdTask(retryTag);
+ notifyTransition(wex, transactionId, stateUpdate);
+ wex.taskScheduler.startShepherdTask(retryTag);
}
}