summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2024-02-27 22:23:11 +0100
committerFlorian Dold <florian@dold.me>2024-02-27 22:23:11 +0100
commitd3572014b06f60250e3bb9e99898b89cd11a4294 (patch)
treed98879e8198cc748513b4c331ee3a764ea10638e /packages/taler-wallet-core/src
parentf08798520ef9b8b0ff36c6aaf93653605b53b912 (diff)
downloadwallet-core-d3572014b06f60250e3bb9e99898b89cd11a4294.tar.gz
wallet-core-d3572014b06f60250e3bb9e99898b89cd11a4294.tar.bz2
wallet-core-d3572014b06f60250e3bb9e99898b89cd11a4294.zip
observability
Diffstat (limited to 'packages/taler-wallet-core/src')
-rw-r--r--packages/taler-wallet-core/src/deposits.ts8
-rw-r--r--packages/taler-wallet-core/src/exchanges.ts6
-rw-r--r--packages/taler-wallet-core/src/observable-wrappers.ts131
-rw-r--r--packages/taler-wallet-core/src/pay-merchant.ts25
-rw-r--r--packages/taler-wallet-core/src/pay-peer-pull-credit.ts34
-rw-r--r--packages/taler-wallet-core/src/pay-peer-pull-debit.ts10
-rw-r--r--packages/taler-wallet-core/src/pay-peer-push-credit.ts27
-rw-r--r--packages/taler-wallet-core/src/pay-peer-push-debit.ts10
-rw-r--r--packages/taler-wallet-core/src/refresh.ts18
-rw-r--r--packages/taler-wallet-core/src/shepherd.ts36
-rw-r--r--packages/taler-wallet-core/src/wallet-api-types.ts1
-rw-r--r--packages/taler-wallet-core/src/wallet.ts47
-rw-r--r--packages/taler-wallet-core/src/withdraw.ts44
13 files changed, 282 insertions, 115 deletions
diff --git a/packages/taler-wallet-core/src/deposits.ts b/packages/taler-wallet-core/src/deposits.ts
index 3abb614bd..68ebc9507 100644
--- a/packages/taler-wallet-core/src/deposits.ts
+++ b/packages/taler-wallet-core/src/deposits.ts
@@ -238,8 +238,8 @@ export class DepositTransactionContext implements TransactionContext {
}
async resumeTransaction(): Promise<void> {
- const { wex: ws, depositGroupId, transactionId, taskId: retryTag } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, depositGroupId, transactionId, taskId: retryTag } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["depositGroups"],
async (tx) => {
const dg = await tx.depositGroups.get(depositGroupId);
@@ -276,8 +276,8 @@ export class DepositTransactionContext implements TransactionContext {
};
},
);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(retryTag);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(retryTag);
}
async failTransaction(): Promise<void> {
diff --git a/packages/taler-wallet-core/src/exchanges.ts b/packages/taler-wallet-core/src/exchanges.ts
index 4453123fa..5b0d3e823 100644
--- a/packages/taler-wallet-core/src/exchanges.ts
+++ b/packages/taler-wallet-core/src/exchanges.ts
@@ -957,7 +957,7 @@ async function startUpdateExchangeEntry(
newExchangeState: newExchangeState,
oldExchangeState: oldExchangeState,
});
- await wex.ws.taskScheduler.resetTaskRetries(taskId);
+ await wex.taskScheduler.resetTaskRetries(taskId);
}
/**
@@ -1114,7 +1114,7 @@ export async function fetchFreshExchange(
): Promise<ReadyExchangeSummary> {
const canonUrl = canonicalizeBaseUrl(baseUrl);
- wex.ws.taskScheduler.ensureRunning();
+ wex.taskScheduler.ensureRunning();
await startUpdateExchangeEntry(wex, canonUrl, {
forceUpdate: options.forceUpdate,
@@ -1514,7 +1514,7 @@ export async function updateExchangeFromUrlHandler(
});
// Asynchronously start recoup. This doesn't need to finish
// for the exchange update to be considered finished.
- wex.ws.taskScheduler.startShepherdTask(recoupTaskId);
+ wex.taskScheduler.startShepherdTask(recoupTaskId);
}
if (!updated) {
diff --git a/packages/taler-wallet-core/src/observable-wrappers.ts b/packages/taler-wallet-core/src/observable-wrappers.ts
new file mode 100644
index 000000000..77839e047
--- /dev/null
+++ b/packages/taler-wallet-core/src/observable-wrappers.ts
@@ -0,0 +1,131 @@
+/*
+ This file is part of GNU Taler
+ (C) 2024 Taler Systems SA
+
+ GNU Taler is free software; you can redistribute it and/or modify it under the
+ terms of the GNU General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along with
+ GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+ */
+
+/**
+ * @fileoverview Wrappers/proxies to make various interfaces observable.
+ */
+
+/**
+ * Imports.
+ */
+import {
+ ObservabilityContext,
+ ObservabilityEventType,
+ RetryLoopOpts,
+ getErrorDetailFromException,
+} from "@gnu-taler/taler-util";
+import {
+ HttpRequestLibrary,
+ HttpRequestOptions,
+ HttpResponse,
+} from "@gnu-taler/taler-util/http";
+import { TaskIdStr } from "./common.js";
+import { TaskScheduler } from "./shepherd.js";
+
+/**
+ * Task scheduler with extra observability events.
+ */
+export class ObservableTaskScheduler implements TaskScheduler {
+ constructor(
+ private impl: TaskScheduler,
+ private oc: ObservabilityContext,
+ ) {}
+
+ private taskDepCache = new Set<string>();
+
+ private declareDep(taskId: TaskIdStr): void {
+ if (this.taskDepCache.size > 500) {
+ this.taskDepCache.clear();
+ }
+ if (!this.taskDepCache.has(taskId)) {
+ this.taskDepCache.add(taskId);
+ this.oc.observe({
+ type: ObservabilityEventType.TaskDependency,
+ taskId,
+ });
+ }
+ }
+
+ ensureRunning(): void {
+ return this.impl.ensureRunning();
+ }
+
+ run(opts?: RetryLoopOpts | undefined): Promise<void> {
+ return this.impl.run(opts);
+ }
+ startShepherdTask(taskId: TaskIdStr): void {
+ this.declareDep(taskId);
+ this.oc.observe({
+ type: ObservabilityEventType.StartTask,
+ taskId,
+ });
+ return this.impl.startShepherdTask(taskId);
+ }
+ stopShepherdTask(taskId: TaskIdStr): void {
+ this.declareDep(taskId);
+ this.oc.observe({
+ type: ObservabilityEventType.StopTask,
+ taskId,
+ });
+ return this.impl.stopShepherdTask(taskId);
+ }
+ resetTaskRetries(taskId: TaskIdStr): Promise<void> {
+ this.declareDep(taskId);
+ if (this.taskDepCache.size > 500) {
+ this.taskDepCache.clear();
+ }
+ this.oc.observe({
+ type: ObservabilityEventType.ResetTask,
+ taskId,
+ });
+ return this.impl.resetTaskRetries(taskId);
+ }
+ reload(): void {
+ return this.impl.reload();
+ }
+}
+
+export class ObservableHttpClientLibrary implements HttpRequestLibrary {
+ constructor(
+ private impl: HttpRequestLibrary,
+ private oc: ObservabilityContext,
+ ) {}
+ async fetch(
+ url: string,
+ opt?: HttpRequestOptions | undefined,
+ ): Promise<HttpResponse> {
+ this.oc.observe({
+ type: ObservabilityEventType.HttpFetchStart,
+ url: url,
+ });
+ try {
+ const res = await this.impl.fetch(url, opt);
+ this.oc.observe({
+ type: ObservabilityEventType.HttpFetchFinishSuccess,
+ url,
+ status: res.status,
+ });
+ return res;
+ } catch (e) {
+ this.oc.observe({
+ type: ObservabilityEventType.HttpFetchFinishError,
+ url,
+ error: getErrorDetailFromException(e),
+ });
+ throw e;
+ }
+ }
+}
diff --git a/packages/taler-wallet-core/src/pay-merchant.ts b/packages/taler-wallet-core/src/pay-merchant.ts
index a576930ba..e92156572 100644
--- a/packages/taler-wallet-core/src/pay-merchant.ts
+++ b/packages/taler-wallet-core/src/pay-merchant.ts
@@ -140,7 +140,6 @@ import {
import {
EXCHANGE_COINS_LOCK,
getDenomInfo,
- InternalWalletState,
WalletExecutionContext,
} from "./wallet.js";
import { getCandidateWithdrawalDenomsTx } from "./withdraw.js";
@@ -244,9 +243,9 @@ export class PayMerchantTransactionContext implements TransactionContext {
}
async suspendTransaction(): Promise<void> {
- const { wex: ws, proposalId, transactionId } = this;
- ws.taskScheduler.stopShepherdTask(this.taskId);
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, proposalId, transactionId } = this;
+ wex.taskScheduler.stopShepherdTask(this.taskId);
+ const transitionInfo = await wex.db.runReadWriteTx(
["purchases"],
async (tx) => {
const purchase = await tx.purchases.get(proposalId);
@@ -263,7 +262,7 @@ export class PayMerchantTransactionContext implements TransactionContext {
return { oldTxState, newTxState };
},
);
- notifyTransition(ws, transactionId, transitionInfo);
+ notifyTransition(wex, transactionId, transitionInfo);
}
async abortTransaction(): Promise<void> {
@@ -334,8 +333,8 @@ export class PayMerchantTransactionContext implements TransactionContext {
}
async resumeTransaction(): Promise<void> {
- const { wex: ws, proposalId, transactionId, taskId: retryTag } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, proposalId, transactionId, taskId: retryTag } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["purchases"],
async (tx) => {
const purchase = await tx.purchases.get(proposalId);
@@ -352,13 +351,13 @@ export class PayMerchantTransactionContext implements TransactionContext {
return { oldTxState, newTxState };
},
);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(this.taskId);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(this.taskId);
}
async failTransaction(): Promise<void> {
- const { wex: ws, proposalId, transactionId } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, proposalId, transactionId } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
[
"purchases",
"refreshGroups",
@@ -387,8 +386,8 @@ export class PayMerchantTransactionContext implements TransactionContext {
return { oldTxState, newTxState };
},
);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.stopShepherdTask(this.taskId);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.stopShepherdTask(this.taskId);
}
}
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
index c999a8d1f..de30f66d2 100644
--- a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
@@ -137,8 +137,8 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
async suspendTransaction(): Promise<void> {
- const { wex: ws, pursePub, taskId: retryTag, transactionId } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, pursePub, taskId: retryTag, transactionId } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPullCredit"],
async (tx) => {
const pullCreditRec = await tx.peerPullCredit.get(pursePub);
@@ -192,13 +192,13 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
return undefined;
},
);
- ws.taskScheduler.stopShepherdTask(retryTag);
- notifyTransition(ws, transactionId, transitionInfo);
+ wex.taskScheduler.stopShepherdTask(retryTag);
+ notifyTransition(wex, transactionId, transitionInfo);
}
async failTransaction(): Promise<void> {
- const { wex: ws, pursePub, taskId: retryTag, transactionId } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, pursePub, taskId: retryTag, transactionId } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPullCredit"],
async (tx) => {
const pullCreditRec = await tx.peerPullCredit.get(pursePub);
@@ -243,13 +243,13 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
return undefined;
},
);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.stopShepherdTask(retryTag);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.stopShepherdTask(retryTag);
}
async resumeTransaction(): Promise<void> {
- const { wex: ws, pursePub, taskId: retryTag, transactionId } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, pursePub, taskId: retryTag, transactionId } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPullCredit"],
async (tx) => {
const pullCreditRec = await tx.peerPullCredit.get(pursePub);
@@ -302,13 +302,13 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
return undefined;
},
);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(retryTag);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(retryTag);
}
async abortTransaction(): Promise<void> {
- const { wex: ws, pursePub, taskId: retryTag, transactionId } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, pursePub, taskId: retryTag, transactionId } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPullCredit"],
async (tx) => {
const pullCreditRec = await tx.peerPullCredit.get(pursePub);
@@ -356,9 +356,9 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
return undefined;
},
);
- ws.taskScheduler.stopShepherdTask(retryTag);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(retryTag);
+ wex.taskScheduler.stopShepherdTask(retryTag);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(retryTag);
}
}
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
index 828f68113..7348a30ce 100644
--- a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
@@ -300,9 +300,9 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
>,
) => Promise<TransitionResult>,
): Promise<void> {
- const ws = this.wex;
+ const wex = this.wex;
const extraStores = opts.extraStores ?? [];
- const transitionInfo = await ws.db.runReadWriteTx(
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPullDebit", ...extraStores],
async (tx) => {
const pi = await tx.peerPullDebit.get(this.peerPullDebitId);
@@ -325,9 +325,9 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
}
},
);
- ws.taskScheduler.stopShepherdTask(this.taskId);
- notifyTransition(ws, this.transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(this.taskId);
+ wex.taskScheduler.stopShepherdTask(this.taskId);
+ notifyTransition(wex, this.transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(this.taskId);
}
}
diff --git a/packages/taler-wallet-core/src/pay-peer-push-credit.ts b/packages/taler-wallet-core/src/pay-peer-push-credit.ts
index 772007bb6..be2b3a7bc 100644
--- a/packages/taler-wallet-core/src/pay-peer-push-credit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-push-credit.ts
@@ -17,7 +17,6 @@
import {
AcceptPeerPushPaymentResponse,
Amounts,
- CancellationToken,
ConfirmPeerPushCreditRequest,
ContractTermsUtil,
ExchangePurseMergeRequest,
@@ -83,7 +82,7 @@ import {
notifyTransition,
parseTransactionIdentifier,
} from "./transactions.js";
-import { InternalWalletState, WalletExecutionContext } from "./wallet.js";
+import { WalletExecutionContext } from "./wallet.js";
import {
PerformCreateWithdrawalGroupResult,
getExchangeWithdrawalInfo,
@@ -252,8 +251,8 @@ export class PeerPushCreditTransactionContext implements TransactionContext {
}
async resumeTransaction(): Promise<void> {
- const { wex: ws, peerPushCreditId, taskId: retryTag, transactionId } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, peerPushCreditId, taskId: retryTag, transactionId } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPushCredit"],
async (tx) => {
const pushCreditRec = await tx.peerPushCredit.get(peerPushCreditId);
@@ -300,13 +299,13 @@ export class PeerPushCreditTransactionContext implements TransactionContext {
return undefined;
},
);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(retryTag);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(retryTag);
}
async failTransaction(): Promise<void> {
- const { wex: ws, peerPushCreditId, taskId: retryTag, transactionId } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, peerPushCreditId, taskId: retryTag, transactionId } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPushCredit"],
async (tx) => {
const pushCreditRec = await tx.peerPushCredit.get(peerPushCreditId);
@@ -348,9 +347,9 @@ export class PeerPushCreditTransactionContext implements TransactionContext {
return undefined;
},
);
- ws.taskScheduler.stopShepherdTask(retryTag);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(retryTag);
+ wex.taskScheduler.stopShepherdTask(retryTag);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(retryTag);
}
}
@@ -706,11 +705,7 @@ async function handlePendingMerge(
const respJson = await mergeHttpResp.json();
const kycPending = codecForWalletKycUuid().decode(respJson);
logger.info(`kyc uuid response: ${j2s(kycPending)}`);
- return processPeerPushCreditKycRequired(
- wex,
- peerInc,
- kycPending,
- );
+ return processPeerPushCreditKycRequired(wex, peerInc, kycPending);
}
logger.trace(`merge request: ${j2s(mergeReq)}`);
diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/pay-peer-push-debit.ts
index 5ee4d642b..b621b9e0e 100644
--- a/packages/taler-wallet-core/src/pay-peer-push-debit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-push-debit.ts
@@ -281,8 +281,8 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
}
async failTransaction(): Promise<void> {
- const { wex: ws, pursePub, transactionId, taskId: retryTag } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, pursePub, transactionId, taskId: retryTag } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPushDebit"],
async (tx) => {
const pushDebitRec = await tx.peerPushDebit.get(pursePub);
@@ -329,9 +329,9 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
return undefined;
},
);
- ws.taskScheduler.stopShepherdTask(retryTag);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(retryTag);
+ wex.taskScheduler.stopShepherdTask(retryTag);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(retryTag);
}
}
diff --git a/packages/taler-wallet-core/src/refresh.ts b/packages/taler-wallet-core/src/refresh.ts
index b467a1c47..f67fb5015 100644
--- a/packages/taler-wallet-core/src/refresh.ts
+++ b/packages/taler-wallet-core/src/refresh.ts
@@ -183,8 +183,8 @@ export class RefreshTransactionContext implements TransactionContext {
}
async resumeTransaction(): Promise<void> {
- const { wex: ws, refreshGroupId, transactionId } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, refreshGroupId, transactionId } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["refreshGroups"],
async (tx) => {
const dg = await tx.refreshGroups.get(refreshGroupId);
@@ -212,13 +212,13 @@ export class RefreshTransactionContext implements TransactionContext {
return undefined;
},
);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(this.taskId);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(this.taskId);
}
async failTransaction(): Promise<void> {
- const { wex: ws, refreshGroupId, transactionId } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, refreshGroupId, transactionId } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["refreshGroups"],
async (tx) => {
const dg = await tx.refreshGroups.get(refreshGroupId);
@@ -252,9 +252,9 @@ export class RefreshTransactionContext implements TransactionContext {
};
},
);
- ws.taskScheduler.stopShepherdTask(this.taskId);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(this.taskId);
+ wex.taskScheduler.stopShepherdTask(this.taskId);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(this.taskId);
}
}
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts
index e3a0bd609..1c23ab6b5 100644
--- a/packages/taler-wallet-core/src/shepherd.ts
+++ b/packages/taler-wallet-core/src/shepherd.ts
@@ -25,6 +25,7 @@ import {
Duration,
Logger,
NotificationType,
+ ObservabilityContext,
RetryLoopOpts,
TalerError,
TalerErrorCode,
@@ -63,6 +64,10 @@ import {
} from "./deposits.js";
import { updateExchangeFromUrlHandler } from "./exchanges.js";
import {
+ ObservableHttpClientLibrary,
+ ObservableTaskScheduler,
+} from "./observable-wrappers.js";
+import {
computePayMerchantTransactionState,
computeRefundTransactionState,
processPurchase,
@@ -134,7 +139,16 @@ function taskGivesLiveness(taskId: string): boolean {
}
}
-export class TaskScheduler {
+export interface TaskScheduler {
+ ensureRunning(): void;
+ run(opts?: RetryLoopOpts): Promise<void>;
+ startShepherdTask(taskId: TaskIdStr): void;
+ stopShepherdTask(taskId: TaskIdStr): void;
+ resetTaskRetries(taskId: TaskIdStr): Promise<void>;
+ reload(): void;
+}
+
+export class TaskSchedulerImpl implements TaskScheduler {
private sheps: Map<TaskIdStr, ShepherdInfo> = new Map();
private iterCond = new AsyncCondition();
@@ -563,16 +577,26 @@ async function callOperationHandlerForTaskId(
taskId: TaskIdStr,
cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
+ const oc: ObservabilityContext = {
+ observe(evt) {
+ if (ws.config.testing.emitObservabilityEvents) {
+ ws.notify({
+ type: NotificationType.TaskObservabilityEvent,
+ taskId,
+ event: evt,
+ });
+ }
+ },
+ };
+
const wex: WalletExecutionContext = {
ws,
cancellationToken,
cryptoApi: ws.cryptoApi,
db: ws.db,
- http: ws.http,
- taskScheduler: ws.taskScheduler,
- oc: {
- observe(event) {},
- },
+ http: new ObservableHttpClientLibrary(ws.http, oc),
+ taskScheduler: new ObservableTaskScheduler(ws.taskScheduler, oc),
+ oc,
};
const pending = parseTaskIdentifier(taskId);
switch (pending.tag) {
diff --git a/packages/taler-wallet-core/src/wallet-api-types.ts b/packages/taler-wallet-core/src/wallet-api-types.ts
index 9cd8e0135..cf8f93ed1 100644
--- a/packages/taler-wallet-core/src/wallet-api-types.ts
+++ b/packages/taler-wallet-core/src/wallet-api-types.ts
@@ -308,6 +308,7 @@ export interface WalletConfig {
insecureTrustExchange: boolean;
preventThrottling: boolean;
skipDefaults: boolean;
+ emitObservabilityEvents: boolean;
};
/**
diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts
index a779fdcec..9185ee48c 100644
--- a/packages/taler-wallet-core/src/wallet.ts
+++ b/packages/taler-wallet-core/src/wallet.ts
@@ -42,7 +42,9 @@ import {
ListGlobalCurrencyAuditorsResponse,
ListGlobalCurrencyExchangesResponse,
Logger,
+ NotificationType,
ObservabilityContext,
+ ObservabilityEventType,
OpenedPromise,
PrepareWithdrawExchangeRequest,
PrepareWithdrawExchangeResponse,
@@ -199,6 +201,10 @@ import {
getMaxPeerPushAmount,
} from "./instructedAmountConversion.js";
import {
+ ObservableHttpClientLibrary,
+ ObservableTaskScheduler,
+} from "./observable-wrappers.js";
+import {
confirmPay,
getContractTermsDetails,
preparePayForTemplate,
@@ -225,7 +231,7 @@ import {
} from "./pay-peer-push-debit.js";
import { DbAccess } from "./query.js";
import { forceRefresh } from "./refresh.js";
-import { TaskScheduler } from "./shepherd.js";
+import { TaskScheduler, TaskSchedulerImpl } from "./shepherd.js";
import {
runIntegrationTest,
runIntegrationTest2,
@@ -1365,21 +1371,42 @@ async function handleCoreApiRequest(
id: string,
payload: unknown,
): Promise<CoreApiResponse> {
+ const oc: ObservabilityContext = {
+ observe(evt) {
+ if (ws.config.testing.emitObservabilityEvents) {
+ ws.notify({
+ type: NotificationType.RequestObservabilityEvent,
+ operation,
+ requestId: id,
+ event: evt,
+ });
+ }
+ },
+ };
+
const wex: WalletExecutionContext = {
ws,
cancellationToken: CancellationToken.CONTINUE,
cryptoApi: ws.cryptoApi,
db: ws.db,
- http: ws.http,
- taskScheduler: ws.taskScheduler,
- oc: {
- observe(event) {},
- },
+ http: new ObservableHttpClientLibrary(ws.http, oc),
+ taskScheduler: new ObservableTaskScheduler(ws.taskScheduler, oc),
+ oc,
};
try {
await ws.ensureWalletDbOpen();
- const result = await dispatchRequestInternal(wex, operation as any, payload);
+ oc.observe({
+ type: ObservabilityEventType.RequestStart,
+ });
+ const result = await dispatchRequestInternal(
+ wex,
+ operation as any,
+ payload,
+ );
+ oc.observe({
+ type: ObservabilityEventType.RequestFinishSuccess,
+ });
return {
type: "response",
operation,
@@ -1391,6 +1418,9 @@ async function handleCoreApiRequest(
logger.info(
`finished wallet core request ${operation} with error: ${j2s(err)}`,
);
+ oc.observe({
+ type: ObservabilityEventType.RequestFinishError,
+ });
return {
type: "error",
operation,
@@ -1460,6 +1490,7 @@ export class Wallet {
insecureTrustExchange: false,
denomselAllowLate: false,
skipDefaults: false,
+ emitObservabilityEvents: false,
},
};
@@ -1522,7 +1553,7 @@ export class InternalWalletState {
*/
private resourceLocks: Set<string> = new Set();
- taskScheduler: TaskScheduler = new TaskScheduler(this);
+ taskScheduler: TaskScheduler = new TaskSchedulerImpl(this);
config: Readonly<WalletConfig>;
diff --git a/packages/taler-wallet-core/src/withdraw.ts b/packages/taler-wallet-core/src/withdraw.ts
index 44f1ee4f9..e3c4e66a2 100644
--- a/packages/taler-wallet-core/src/withdraw.ts
+++ b/packages/taler-wallet-core/src/withdraw.ts
@@ -146,11 +146,7 @@ import {
WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,
WALLET_EXCHANGE_PROTOCOL_VERSION,
} from "./versions.js";
-import {
- WalletExecutionContext,
- getDenomInfo,
- type InternalWalletState,
-} from "./wallet.js";
+import { WalletExecutionContext, getDenomInfo } from "./wallet.js";
/**
* Logger for this file.
@@ -249,8 +245,8 @@ export class WithdrawTransactionContext implements TransactionContext {
}
async abortTransaction(): Promise<void> {
- const { wex: ws, withdrawalGroupId, transactionId, taskId } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, withdrawalGroupId, transactionId, taskId } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["withdrawalGroups"],
async (tx) => {
const wg = await tx.withdrawalGroups.get(withdrawalGroupId);
@@ -305,19 +301,14 @@ export class WithdrawTransactionContext implements TransactionContext {
return undefined;
},
);
- ws.taskScheduler.stopShepherdTask(taskId);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(taskId);
+ wex.taskScheduler.stopShepherdTask(taskId);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(taskId);
}
async resumeTransaction(): Promise<void> {
- const {
- wex: ws,
- withdrawalGroupId,
- transactionId,
- taskId: retryTag,
- } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, withdrawalGroupId, transactionId, taskId: retryTag } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["withdrawalGroups"],
async (tx) => {
const wg = await tx.withdrawalGroups.get(withdrawalGroupId);
@@ -366,18 +357,13 @@ export class WithdrawTransactionContext implements TransactionContext {
return undefined;
},
);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(retryTag);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(retryTag);
}
async failTransaction(): Promise<void> {
- const {
- wex: ws,
- withdrawalGroupId,
- transactionId,
- taskId: retryTag,
- } = this;
- const stateUpdate = await ws.db.runReadWriteTx(
+ const { wex, withdrawalGroupId, transactionId, taskId: retryTag } = this;
+ const stateUpdate = await wex.db.runReadWriteTx(
["withdrawalGroups"],
async (tx) => {
const wg = await tx.withdrawalGroups.get(withdrawalGroupId);
@@ -407,9 +393,9 @@ export class WithdrawTransactionContext implements TransactionContext {
return undefined;
},
);
- ws.taskScheduler.stopShepherdTask(retryTag);
- notifyTransition(ws, transactionId, stateUpdate);
- ws.taskScheduler.startShepherdTask(retryTag);
+ wex.taskScheduler.stopShepherdTask(retryTag);
+ notifyTransition(wex, transactionId, stateUpdate);
+ wex.taskScheduler.startShepherdTask(retryTag);
}
}