From d3572014b06f60250e3bb9e99898b89cd11a4294 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Tue, 27 Feb 2024 22:23:11 +0100 Subject: observability --- packages/taler-wallet-core/src/deposits.ts | 8 +- packages/taler-wallet-core/src/exchanges.ts | 6 +- .../taler-wallet-core/src/observable-wrappers.ts | 131 +++++++++++++++++++++ packages/taler-wallet-core/src/pay-merchant.ts | 25 ++-- .../taler-wallet-core/src/pay-peer-pull-credit.ts | 34 +++--- .../taler-wallet-core/src/pay-peer-pull-debit.ts | 10 +- .../taler-wallet-core/src/pay-peer-push-credit.ts | 27 ++--- .../taler-wallet-core/src/pay-peer-push-debit.ts | 10 +- packages/taler-wallet-core/src/refresh.ts | 18 +-- packages/taler-wallet-core/src/shepherd.ts | 36 +++++- packages/taler-wallet-core/src/wallet-api-types.ts | 1 + packages/taler-wallet-core/src/wallet.ts | 47 ++++++-- packages/taler-wallet-core/src/withdraw.ts | 44 +++---- 13 files changed, 282 insertions(+), 115 deletions(-) create mode 100644 packages/taler-wallet-core/src/observable-wrappers.ts (limited to 'packages/taler-wallet-core/src') diff --git a/packages/taler-wallet-core/src/deposits.ts b/packages/taler-wallet-core/src/deposits.ts index 3abb614bd..68ebc9507 100644 --- a/packages/taler-wallet-core/src/deposits.ts +++ b/packages/taler-wallet-core/src/deposits.ts @@ -238,8 +238,8 @@ export class DepositTransactionContext implements TransactionContext { } async resumeTransaction(): Promise { - const { wex: ws, depositGroupId, transactionId, taskId: retryTag } = this; - const transitionInfo = await ws.db.runReadWriteTx( + const { wex, depositGroupId, transactionId, taskId: retryTag } = this; + const transitionInfo = await wex.db.runReadWriteTx( ["depositGroups"], async (tx) => { const dg = await tx.depositGroups.get(depositGroupId); @@ -276,8 +276,8 @@ export class DepositTransactionContext implements TransactionContext { }; }, ); - notifyTransition(ws, transactionId, transitionInfo); - ws.taskScheduler.startShepherdTask(retryTag); + notifyTransition(wex, transactionId, transitionInfo); + wex.taskScheduler.startShepherdTask(retryTag); } async failTransaction(): Promise { diff --git a/packages/taler-wallet-core/src/exchanges.ts b/packages/taler-wallet-core/src/exchanges.ts index 4453123fa..5b0d3e823 100644 --- a/packages/taler-wallet-core/src/exchanges.ts +++ b/packages/taler-wallet-core/src/exchanges.ts @@ -957,7 +957,7 @@ async function startUpdateExchangeEntry( newExchangeState: newExchangeState, oldExchangeState: oldExchangeState, }); - await wex.ws.taskScheduler.resetTaskRetries(taskId); + await wex.taskScheduler.resetTaskRetries(taskId); } /** @@ -1114,7 +1114,7 @@ export async function fetchFreshExchange( ): Promise { const canonUrl = canonicalizeBaseUrl(baseUrl); - wex.ws.taskScheduler.ensureRunning(); + wex.taskScheduler.ensureRunning(); await startUpdateExchangeEntry(wex, canonUrl, { forceUpdate: options.forceUpdate, @@ -1514,7 +1514,7 @@ export async function updateExchangeFromUrlHandler( }); // Asynchronously start recoup. This doesn't need to finish // for the exchange update to be considered finished. - wex.ws.taskScheduler.startShepherdTask(recoupTaskId); + wex.taskScheduler.startShepherdTask(recoupTaskId); } if (!updated) { diff --git a/packages/taler-wallet-core/src/observable-wrappers.ts b/packages/taler-wallet-core/src/observable-wrappers.ts new file mode 100644 index 000000000..77839e047 --- /dev/null +++ b/packages/taler-wallet-core/src/observable-wrappers.ts @@ -0,0 +1,131 @@ +/* + This file is part of GNU Taler + (C) 2024 Taler Systems SA + + GNU Taler is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + GNU Taler; see the file COPYING. If not, see + */ + +/** + * @fileoverview Wrappers/proxies to make various interfaces observable. + */ + +/** + * Imports. + */ +import { + ObservabilityContext, + ObservabilityEventType, + RetryLoopOpts, + getErrorDetailFromException, +} from "@gnu-taler/taler-util"; +import { + HttpRequestLibrary, + HttpRequestOptions, + HttpResponse, +} from "@gnu-taler/taler-util/http"; +import { TaskIdStr } from "./common.js"; +import { TaskScheduler } from "./shepherd.js"; + +/** + * Task scheduler with extra observability events. + */ +export class ObservableTaskScheduler implements TaskScheduler { + constructor( + private impl: TaskScheduler, + private oc: ObservabilityContext, + ) {} + + private taskDepCache = new Set(); + + private declareDep(taskId: TaskIdStr): void { + if (this.taskDepCache.size > 500) { + this.taskDepCache.clear(); + } + if (!this.taskDepCache.has(taskId)) { + this.taskDepCache.add(taskId); + this.oc.observe({ + type: ObservabilityEventType.TaskDependency, + taskId, + }); + } + } + + ensureRunning(): void { + return this.impl.ensureRunning(); + } + + run(opts?: RetryLoopOpts | undefined): Promise { + return this.impl.run(opts); + } + startShepherdTask(taskId: TaskIdStr): void { + this.declareDep(taskId); + this.oc.observe({ + type: ObservabilityEventType.StartTask, + taskId, + }); + return this.impl.startShepherdTask(taskId); + } + stopShepherdTask(taskId: TaskIdStr): void { + this.declareDep(taskId); + this.oc.observe({ + type: ObservabilityEventType.StopTask, + taskId, + }); + return this.impl.stopShepherdTask(taskId); + } + resetTaskRetries(taskId: TaskIdStr): Promise { + this.declareDep(taskId); + if (this.taskDepCache.size > 500) { + this.taskDepCache.clear(); + } + this.oc.observe({ + type: ObservabilityEventType.ResetTask, + taskId, + }); + return this.impl.resetTaskRetries(taskId); + } + reload(): void { + return this.impl.reload(); + } +} + +export class ObservableHttpClientLibrary implements HttpRequestLibrary { + constructor( + private impl: HttpRequestLibrary, + private oc: ObservabilityContext, + ) {} + async fetch( + url: string, + opt?: HttpRequestOptions | undefined, + ): Promise { + this.oc.observe({ + type: ObservabilityEventType.HttpFetchStart, + url: url, + }); + try { + const res = await this.impl.fetch(url, opt); + this.oc.observe({ + type: ObservabilityEventType.HttpFetchFinishSuccess, + url, + status: res.status, + }); + return res; + } catch (e) { + this.oc.observe({ + type: ObservabilityEventType.HttpFetchFinishError, + url, + error: getErrorDetailFromException(e), + }); + throw e; + } + } +} diff --git a/packages/taler-wallet-core/src/pay-merchant.ts b/packages/taler-wallet-core/src/pay-merchant.ts index a576930ba..e92156572 100644 --- a/packages/taler-wallet-core/src/pay-merchant.ts +++ b/packages/taler-wallet-core/src/pay-merchant.ts @@ -140,7 +140,6 @@ import { import { EXCHANGE_COINS_LOCK, getDenomInfo, - InternalWalletState, WalletExecutionContext, } from "./wallet.js"; import { getCandidateWithdrawalDenomsTx } from "./withdraw.js"; @@ -244,9 +243,9 @@ export class PayMerchantTransactionContext implements TransactionContext { } async suspendTransaction(): Promise { - const { wex: ws, proposalId, transactionId } = this; - ws.taskScheduler.stopShepherdTask(this.taskId); - const transitionInfo = await ws.db.runReadWriteTx( + const { wex, proposalId, transactionId } = this; + wex.taskScheduler.stopShepherdTask(this.taskId); + const transitionInfo = await wex.db.runReadWriteTx( ["purchases"], async (tx) => { const purchase = await tx.purchases.get(proposalId); @@ -263,7 +262,7 @@ export class PayMerchantTransactionContext implements TransactionContext { return { oldTxState, newTxState }; }, ); - notifyTransition(ws, transactionId, transitionInfo); + notifyTransition(wex, transactionId, transitionInfo); } async abortTransaction(): Promise { @@ -334,8 +333,8 @@ export class PayMerchantTransactionContext implements TransactionContext { } async resumeTransaction(): Promise { - const { wex: ws, proposalId, transactionId, taskId: retryTag } = this; - const transitionInfo = await ws.db.runReadWriteTx( + const { wex, proposalId, transactionId, taskId: retryTag } = this; + const transitionInfo = await wex.db.runReadWriteTx( ["purchases"], async (tx) => { const purchase = await tx.purchases.get(proposalId); @@ -352,13 +351,13 @@ export class PayMerchantTransactionContext implements TransactionContext { return { oldTxState, newTxState }; }, ); - notifyTransition(ws, transactionId, transitionInfo); - ws.taskScheduler.startShepherdTask(this.taskId); + notifyTransition(wex, transactionId, transitionInfo); + wex.taskScheduler.startShepherdTask(this.taskId); } async failTransaction(): Promise { - const { wex: ws, proposalId, transactionId } = this; - const transitionInfo = await ws.db.runReadWriteTx( + const { wex, proposalId, transactionId } = this; + const transitionInfo = await wex.db.runReadWriteTx( [ "purchases", "refreshGroups", @@ -387,8 +386,8 @@ export class PayMerchantTransactionContext implements TransactionContext { return { oldTxState, newTxState }; }, ); - notifyTransition(ws, transactionId, transitionInfo); - ws.taskScheduler.stopShepherdTask(this.taskId); + notifyTransition(wex, transactionId, transitionInfo); + wex.taskScheduler.stopShepherdTask(this.taskId); } } diff --git a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts index c999a8d1f..de30f66d2 100644 --- a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts +++ b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts @@ -137,8 +137,8 @@ export class PeerPullCreditTransactionContext implements TransactionContext { } async suspendTransaction(): Promise { - const { wex: ws, pursePub, taskId: retryTag, transactionId } = this; - const transitionInfo = await ws.db.runReadWriteTx( + const { wex, pursePub, taskId: retryTag, transactionId } = this; + const transitionInfo = await wex.db.runReadWriteTx( ["peerPullCredit"], async (tx) => { const pullCreditRec = await tx.peerPullCredit.get(pursePub); @@ -192,13 +192,13 @@ export class PeerPullCreditTransactionContext implements TransactionContext { return undefined; }, ); - ws.taskScheduler.stopShepherdTask(retryTag); - notifyTransition(ws, transactionId, transitionInfo); + wex.taskScheduler.stopShepherdTask(retryTag); + notifyTransition(wex, transactionId, transitionInfo); } async failTransaction(): Promise { - const { wex: ws, pursePub, taskId: retryTag, transactionId } = this; - const transitionInfo = await ws.db.runReadWriteTx( + const { wex, pursePub, taskId: retryTag, transactionId } = this; + const transitionInfo = await wex.db.runReadWriteTx( ["peerPullCredit"], async (tx) => { const pullCreditRec = await tx.peerPullCredit.get(pursePub); @@ -243,13 +243,13 @@ export class PeerPullCreditTransactionContext implements TransactionContext { return undefined; }, ); - notifyTransition(ws, transactionId, transitionInfo); - ws.taskScheduler.stopShepherdTask(retryTag); + notifyTransition(wex, transactionId, transitionInfo); + wex.taskScheduler.stopShepherdTask(retryTag); } async resumeTransaction(): Promise { - const { wex: ws, pursePub, taskId: retryTag, transactionId } = this; - const transitionInfo = await ws.db.runReadWriteTx( + const { wex, pursePub, taskId: retryTag, transactionId } = this; + const transitionInfo = await wex.db.runReadWriteTx( ["peerPullCredit"], async (tx) => { const pullCreditRec = await tx.peerPullCredit.get(pursePub); @@ -302,13 +302,13 @@ export class PeerPullCreditTransactionContext implements TransactionContext { return undefined; }, ); - notifyTransition(ws, transactionId, transitionInfo); - ws.taskScheduler.startShepherdTask(retryTag); + notifyTransition(wex, transactionId, transitionInfo); + wex.taskScheduler.startShepherdTask(retryTag); } async abortTransaction(): Promise { - const { wex: ws, pursePub, taskId: retryTag, transactionId } = this; - const transitionInfo = await ws.db.runReadWriteTx( + const { wex, pursePub, taskId: retryTag, transactionId } = this; + const transitionInfo = await wex.db.runReadWriteTx( ["peerPullCredit"], async (tx) => { const pullCreditRec = await tx.peerPullCredit.get(pursePub); @@ -356,9 +356,9 @@ export class PeerPullCreditTransactionContext implements TransactionContext { return undefined; }, ); - ws.taskScheduler.stopShepherdTask(retryTag); - notifyTransition(ws, transactionId, transitionInfo); - ws.taskScheduler.startShepherdTask(retryTag); + wex.taskScheduler.stopShepherdTask(retryTag); + notifyTransition(wex, transactionId, transitionInfo); + wex.taskScheduler.startShepherdTask(retryTag); } } diff --git a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts index 828f68113..7348a30ce 100644 --- a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts +++ b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts @@ -300,9 +300,9 @@ export class PeerPullDebitTransactionContext implements TransactionContext { >, ) => Promise, ): Promise { - const ws = this.wex; + const wex = this.wex; const extraStores = opts.extraStores ?? []; - const transitionInfo = await ws.db.runReadWriteTx( + const transitionInfo = await wex.db.runReadWriteTx( ["peerPullDebit", ...extraStores], async (tx) => { const pi = await tx.peerPullDebit.get(this.peerPullDebitId); @@ -325,9 +325,9 @@ export class PeerPullDebitTransactionContext implements TransactionContext { } }, ); - ws.taskScheduler.stopShepherdTask(this.taskId); - notifyTransition(ws, this.transactionId, transitionInfo); - ws.taskScheduler.startShepherdTask(this.taskId); + wex.taskScheduler.stopShepherdTask(this.taskId); + notifyTransition(wex, this.transactionId, transitionInfo); + wex.taskScheduler.startShepherdTask(this.taskId); } } diff --git a/packages/taler-wallet-core/src/pay-peer-push-credit.ts b/packages/taler-wallet-core/src/pay-peer-push-credit.ts index 772007bb6..be2b3a7bc 100644 --- a/packages/taler-wallet-core/src/pay-peer-push-credit.ts +++ b/packages/taler-wallet-core/src/pay-peer-push-credit.ts @@ -17,7 +17,6 @@ import { AcceptPeerPushPaymentResponse, Amounts, - CancellationToken, ConfirmPeerPushCreditRequest, ContractTermsUtil, ExchangePurseMergeRequest, @@ -83,7 +82,7 @@ import { notifyTransition, parseTransactionIdentifier, } from "./transactions.js"; -import { InternalWalletState, WalletExecutionContext } from "./wallet.js"; +import { WalletExecutionContext } from "./wallet.js"; import { PerformCreateWithdrawalGroupResult, getExchangeWithdrawalInfo, @@ -252,8 +251,8 @@ export class PeerPushCreditTransactionContext implements TransactionContext { } async resumeTransaction(): Promise { - const { wex: ws, peerPushCreditId, taskId: retryTag, transactionId } = this; - const transitionInfo = await ws.db.runReadWriteTx( + const { wex, peerPushCreditId, taskId: retryTag, transactionId } = this; + const transitionInfo = await wex.db.runReadWriteTx( ["peerPushCredit"], async (tx) => { const pushCreditRec = await tx.peerPushCredit.get(peerPushCreditId); @@ -300,13 +299,13 @@ export class PeerPushCreditTransactionContext implements TransactionContext { return undefined; }, ); - notifyTransition(ws, transactionId, transitionInfo); - ws.taskScheduler.startShepherdTask(retryTag); + notifyTransition(wex, transactionId, transitionInfo); + wex.taskScheduler.startShepherdTask(retryTag); } async failTransaction(): Promise { - const { wex: ws, peerPushCreditId, taskId: retryTag, transactionId } = this; - const transitionInfo = await ws.db.runReadWriteTx( + const { wex, peerPushCreditId, taskId: retryTag, transactionId } = this; + const transitionInfo = await wex.db.runReadWriteTx( ["peerPushCredit"], async (tx) => { const pushCreditRec = await tx.peerPushCredit.get(peerPushCreditId); @@ -348,9 +347,9 @@ export class PeerPushCreditTransactionContext implements TransactionContext { return undefined; }, ); - ws.taskScheduler.stopShepherdTask(retryTag); - notifyTransition(ws, transactionId, transitionInfo); - ws.taskScheduler.startShepherdTask(retryTag); + wex.taskScheduler.stopShepherdTask(retryTag); + notifyTransition(wex, transactionId, transitionInfo); + wex.taskScheduler.startShepherdTask(retryTag); } } @@ -706,11 +705,7 @@ async function handlePendingMerge( const respJson = await mergeHttpResp.json(); const kycPending = codecForWalletKycUuid().decode(respJson); logger.info(`kyc uuid response: ${j2s(kycPending)}`); - return processPeerPushCreditKycRequired( - wex, - peerInc, - kycPending, - ); + return processPeerPushCreditKycRequired(wex, peerInc, kycPending); } logger.trace(`merge request: ${j2s(mergeReq)}`); diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/pay-peer-push-debit.ts index 5ee4d642b..b621b9e0e 100644 --- a/packages/taler-wallet-core/src/pay-peer-push-debit.ts +++ b/packages/taler-wallet-core/src/pay-peer-push-debit.ts @@ -281,8 +281,8 @@ export class PeerPushDebitTransactionContext implements TransactionContext { } async failTransaction(): Promise { - const { wex: ws, pursePub, transactionId, taskId: retryTag } = this; - const transitionInfo = await ws.db.runReadWriteTx( + const { wex, pursePub, transactionId, taskId: retryTag } = this; + const transitionInfo = await wex.db.runReadWriteTx( ["peerPushDebit"], async (tx) => { const pushDebitRec = await tx.peerPushDebit.get(pursePub); @@ -329,9 +329,9 @@ export class PeerPushDebitTransactionContext implements TransactionContext { return undefined; }, ); - ws.taskScheduler.stopShepherdTask(retryTag); - notifyTransition(ws, transactionId, transitionInfo); - ws.taskScheduler.startShepherdTask(retryTag); + wex.taskScheduler.stopShepherdTask(retryTag); + notifyTransition(wex, transactionId, transitionInfo); + wex.taskScheduler.startShepherdTask(retryTag); } } diff --git a/packages/taler-wallet-core/src/refresh.ts b/packages/taler-wallet-core/src/refresh.ts index b467a1c47..f67fb5015 100644 --- a/packages/taler-wallet-core/src/refresh.ts +++ b/packages/taler-wallet-core/src/refresh.ts @@ -183,8 +183,8 @@ export class RefreshTransactionContext implements TransactionContext { } async resumeTransaction(): Promise { - const { wex: ws, refreshGroupId, transactionId } = this; - const transitionInfo = await ws.db.runReadWriteTx( + const { wex, refreshGroupId, transactionId } = this; + const transitionInfo = await wex.db.runReadWriteTx( ["refreshGroups"], async (tx) => { const dg = await tx.refreshGroups.get(refreshGroupId); @@ -212,13 +212,13 @@ export class RefreshTransactionContext implements TransactionContext { return undefined; }, ); - notifyTransition(ws, transactionId, transitionInfo); - ws.taskScheduler.startShepherdTask(this.taskId); + notifyTransition(wex, transactionId, transitionInfo); + wex.taskScheduler.startShepherdTask(this.taskId); } async failTransaction(): Promise { - const { wex: ws, refreshGroupId, transactionId } = this; - const transitionInfo = await ws.db.runReadWriteTx( + const { wex, refreshGroupId, transactionId } = this; + const transitionInfo = await wex.db.runReadWriteTx( ["refreshGroups"], async (tx) => { const dg = await tx.refreshGroups.get(refreshGroupId); @@ -252,9 +252,9 @@ export class RefreshTransactionContext implements TransactionContext { }; }, ); - ws.taskScheduler.stopShepherdTask(this.taskId); - notifyTransition(ws, transactionId, transitionInfo); - ws.taskScheduler.startShepherdTask(this.taskId); + wex.taskScheduler.stopShepherdTask(this.taskId); + notifyTransition(wex, transactionId, transitionInfo); + wex.taskScheduler.startShepherdTask(this.taskId); } } diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts index e3a0bd609..1c23ab6b5 100644 --- a/packages/taler-wallet-core/src/shepherd.ts +++ b/packages/taler-wallet-core/src/shepherd.ts @@ -25,6 +25,7 @@ import { Duration, Logger, NotificationType, + ObservabilityContext, RetryLoopOpts, TalerError, TalerErrorCode, @@ -62,6 +63,10 @@ import { processDepositGroup, } from "./deposits.js"; import { updateExchangeFromUrlHandler } from "./exchanges.js"; +import { + ObservableHttpClientLibrary, + ObservableTaskScheduler, +} from "./observable-wrappers.js"; import { computePayMerchantTransactionState, computeRefundTransactionState, @@ -134,7 +139,16 @@ function taskGivesLiveness(taskId: string): boolean { } } -export class TaskScheduler { +export interface TaskScheduler { + ensureRunning(): void; + run(opts?: RetryLoopOpts): Promise; + startShepherdTask(taskId: TaskIdStr): void; + stopShepherdTask(taskId: TaskIdStr): void; + resetTaskRetries(taskId: TaskIdStr): Promise; + reload(): void; +} + +export class TaskSchedulerImpl implements TaskScheduler { private sheps: Map = new Map(); private iterCond = new AsyncCondition(); @@ -563,16 +577,26 @@ async function callOperationHandlerForTaskId( taskId: TaskIdStr, cancellationToken: CancellationToken, ): Promise { + const oc: ObservabilityContext = { + observe(evt) { + if (ws.config.testing.emitObservabilityEvents) { + ws.notify({ + type: NotificationType.TaskObservabilityEvent, + taskId, + event: evt, + }); + } + }, + }; + const wex: WalletExecutionContext = { ws, cancellationToken, cryptoApi: ws.cryptoApi, db: ws.db, - http: ws.http, - taskScheduler: ws.taskScheduler, - oc: { - observe(event) {}, - }, + http: new ObservableHttpClientLibrary(ws.http, oc), + taskScheduler: new ObservableTaskScheduler(ws.taskScheduler, oc), + oc, }; const pending = parseTaskIdentifier(taskId); switch (pending.tag) { diff --git a/packages/taler-wallet-core/src/wallet-api-types.ts b/packages/taler-wallet-core/src/wallet-api-types.ts index 9cd8e0135..cf8f93ed1 100644 --- a/packages/taler-wallet-core/src/wallet-api-types.ts +++ b/packages/taler-wallet-core/src/wallet-api-types.ts @@ -308,6 +308,7 @@ export interface WalletConfig { insecureTrustExchange: boolean; preventThrottling: boolean; skipDefaults: boolean; + emitObservabilityEvents: boolean; }; /** diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index a779fdcec..9185ee48c 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -42,7 +42,9 @@ import { ListGlobalCurrencyAuditorsResponse, ListGlobalCurrencyExchangesResponse, Logger, + NotificationType, ObservabilityContext, + ObservabilityEventType, OpenedPromise, PrepareWithdrawExchangeRequest, PrepareWithdrawExchangeResponse, @@ -198,6 +200,10 @@ import { getMaxDepositAmount, getMaxPeerPushAmount, } from "./instructedAmountConversion.js"; +import { + ObservableHttpClientLibrary, + ObservableTaskScheduler, +} from "./observable-wrappers.js"; import { confirmPay, getContractTermsDetails, @@ -225,7 +231,7 @@ import { } from "./pay-peer-push-debit.js"; import { DbAccess } from "./query.js"; import { forceRefresh } from "./refresh.js"; -import { TaskScheduler } from "./shepherd.js"; +import { TaskScheduler, TaskSchedulerImpl } from "./shepherd.js"; import { runIntegrationTest, runIntegrationTest2, @@ -1365,21 +1371,42 @@ async function handleCoreApiRequest( id: string, payload: unknown, ): Promise { + const oc: ObservabilityContext = { + observe(evt) { + if (ws.config.testing.emitObservabilityEvents) { + ws.notify({ + type: NotificationType.RequestObservabilityEvent, + operation, + requestId: id, + event: evt, + }); + } + }, + }; + const wex: WalletExecutionContext = { ws, cancellationToken: CancellationToken.CONTINUE, cryptoApi: ws.cryptoApi, db: ws.db, - http: ws.http, - taskScheduler: ws.taskScheduler, - oc: { - observe(event) {}, - }, + http: new ObservableHttpClientLibrary(ws.http, oc), + taskScheduler: new ObservableTaskScheduler(ws.taskScheduler, oc), + oc, }; try { await ws.ensureWalletDbOpen(); - const result = await dispatchRequestInternal(wex, operation as any, payload); + oc.observe({ + type: ObservabilityEventType.RequestStart, + }); + const result = await dispatchRequestInternal( + wex, + operation as any, + payload, + ); + oc.observe({ + type: ObservabilityEventType.RequestFinishSuccess, + }); return { type: "response", operation, @@ -1391,6 +1418,9 @@ async function handleCoreApiRequest( logger.info( `finished wallet core request ${operation} with error: ${j2s(err)}`, ); + oc.observe({ + type: ObservabilityEventType.RequestFinishError, + }); return { type: "error", operation, @@ -1460,6 +1490,7 @@ export class Wallet { insecureTrustExchange: false, denomselAllowLate: false, skipDefaults: false, + emitObservabilityEvents: false, }, }; @@ -1522,7 +1553,7 @@ export class InternalWalletState { */ private resourceLocks: Set = new Set(); - taskScheduler: TaskScheduler = new TaskScheduler(this); + taskScheduler: TaskScheduler = new TaskSchedulerImpl(this); config: Readonly; diff --git a/packages/taler-wallet-core/src/withdraw.ts b/packages/taler-wallet-core/src/withdraw.ts index 44f1ee4f9..e3c4e66a2 100644 --- a/packages/taler-wallet-core/src/withdraw.ts +++ b/packages/taler-wallet-core/src/withdraw.ts @@ -146,11 +146,7 @@ import { WALLET_BANK_INTEGRATION_PROTOCOL_VERSION, WALLET_EXCHANGE_PROTOCOL_VERSION, } from "./versions.js"; -import { - WalletExecutionContext, - getDenomInfo, - type InternalWalletState, -} from "./wallet.js"; +import { WalletExecutionContext, getDenomInfo } from "./wallet.js"; /** * Logger for this file. @@ -249,8 +245,8 @@ export class WithdrawTransactionContext implements TransactionContext { } async abortTransaction(): Promise { - const { wex: ws, withdrawalGroupId, transactionId, taskId } = this; - const transitionInfo = await ws.db.runReadWriteTx( + const { wex, withdrawalGroupId, transactionId, taskId } = this; + const transitionInfo = await wex.db.runReadWriteTx( ["withdrawalGroups"], async (tx) => { const wg = await tx.withdrawalGroups.get(withdrawalGroupId); @@ -305,19 +301,14 @@ export class WithdrawTransactionContext implements TransactionContext { return undefined; }, ); - ws.taskScheduler.stopShepherdTask(taskId); - notifyTransition(ws, transactionId, transitionInfo); - ws.taskScheduler.startShepherdTask(taskId); + wex.taskScheduler.stopShepherdTask(taskId); + notifyTransition(wex, transactionId, transitionInfo); + wex.taskScheduler.startShepherdTask(taskId); } async resumeTransaction(): Promise { - const { - wex: ws, - withdrawalGroupId, - transactionId, - taskId: retryTag, - } = this; - const transitionInfo = await ws.db.runReadWriteTx( + const { wex, withdrawalGroupId, transactionId, taskId: retryTag } = this; + const transitionInfo = await wex.db.runReadWriteTx( ["withdrawalGroups"], async (tx) => { const wg = await tx.withdrawalGroups.get(withdrawalGroupId); @@ -366,18 +357,13 @@ export class WithdrawTransactionContext implements TransactionContext { return undefined; }, ); - notifyTransition(ws, transactionId, transitionInfo); - ws.taskScheduler.startShepherdTask(retryTag); + notifyTransition(wex, transactionId, transitionInfo); + wex.taskScheduler.startShepherdTask(retryTag); } async failTransaction(): Promise { - const { - wex: ws, - withdrawalGroupId, - transactionId, - taskId: retryTag, - } = this; - const stateUpdate = await ws.db.runReadWriteTx( + const { wex, withdrawalGroupId, transactionId, taskId: retryTag } = this; + const stateUpdate = await wex.db.runReadWriteTx( ["withdrawalGroups"], async (tx) => { const wg = await tx.withdrawalGroups.get(withdrawalGroupId); @@ -407,9 +393,9 @@ export class WithdrawTransactionContext implements TransactionContext { return undefined; }, ); - ws.taskScheduler.stopShepherdTask(retryTag); - notifyTransition(ws, transactionId, stateUpdate); - ws.taskScheduler.startShepherdTask(retryTag); + wex.taskScheduler.stopShepherdTask(retryTag); + notifyTransition(wex, transactionId, stateUpdate); + wex.taskScheduler.startShepherdTask(retryTag); } } -- cgit v1.2.3