diff options
author | Florian Dold <florian@dold.me> | 2024-04-22 23:29:07 +0200 |
---|---|---|
committer | Florian Dold <florian@dold.me> | 2024-04-22 23:29:07 +0200 |
commit | a181ee06e4b52cb35e00ff8c86acff315135faf2 (patch) | |
tree | 9961ae277d861f93818c253e3992ad25128f6377 | |
parent | e944c27e43474e8db464fbc593607e4e9d89576d (diff) | |
download | wallet-core-a181ee06e4b52cb35e00ff8c86acff315135faf2.tar.gz wallet-core-a181ee06e4b52cb35e00ff8c86acff315135faf2.tar.bz2 wallet-core-a181ee06e4b52cb35e00ff8c86acff315135faf2.zip |
wallet-core: unify handling of run-until-done, simplify waiter implementation
-rw-r--r-- | packages/taler-harness/src/bench1.ts | 9 | ||||
-rw-r--r-- | packages/taler-harness/src/bench3.ts | 8 | ||||
-rw-r--r-- | packages/taler-util/src/notifications.ts | 8 | ||||
-rw-r--r-- | packages/taler-util/src/wallet-types.ts | 10 | ||||
-rw-r--r-- | packages/taler-wallet-cli/src/index.ts | 31 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/common.ts | 52 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/observable-wrappers.ts | 15 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/shepherd.ts | 54 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/testing.ts | 297 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/wallet-api-types.ts | 11 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/wallet.ts | 15 |
11 files changed, 247 insertions, 263 deletions
diff --git a/packages/taler-harness/src/bench1.ts b/packages/taler-harness/src/bench1.ts index 428114e0e..216760260 100644 --- a/packages/taler-harness/src/bench1.ts +++ b/packages/taler-harness/src/bench1.ts @@ -29,7 +29,6 @@ import { } from "@gnu-taler/taler-util"; import { AccessStats, - applyRunConfigDefaults, createNativeWalletHost2, Wallet, WalletApiOperation, @@ -105,9 +104,7 @@ export async function runBench1(configJson: any): Promise<void> { exchangeBaseUrl: b1conf.exchange, }); - await wallet.runTaskLoop({ - stopWhenDone: true, - }); + await wallet.client.call(WalletApiOperation.TestingWaitTasksDone, {}); logger.info( `Finished withdrawal amount=${withdrawAmount} time=${Date.now() - start}`, @@ -123,9 +120,7 @@ export async function runBench1(configJson: any): Promise<void> { depositPaytoUri: b1conf.payto, }); - await wallet.runTaskLoop({ - stopWhenDone: true, - }); + await wallet.client.call(WalletApiOperation.TestingWaitTasksDone, {}); logger.info(`Finished deposit amount=10 time=${Date.now() - start}`); } diff --git a/packages/taler-harness/src/bench3.ts b/packages/taler-harness/src/bench3.ts index f138dff68..a5bc094df 100644 --- a/packages/taler-harness/src/bench3.ts +++ b/packages/taler-harness/src/bench3.ts @@ -115,9 +115,7 @@ export async function runBench3(configJson: any): Promise<void> { exchangeBaseUrl: b3conf.exchange, }); - await wallet.runTaskLoop({ - stopWhenDone: true, - }); + await wallet.client.call(WalletApiOperation.TestingWaitTasksDone, {}); logger.info( `Finished withdrawal amount=${withdrawAmount} time=${Date.now() - start}`, @@ -135,9 +133,7 @@ export async function runBench3(configJson: any): Promise<void> { depositPaytoUri: payto, }); - await wallet.runTaskLoop({ - stopWhenDone: true, - }); + await wallet.client.call(WalletApiOperation.TestingWaitTasksDone, {}); logger.info(`Finished deposit amount=10 time=${Date.now() - start}`); } diff --git a/packages/taler-util/src/notifications.ts b/packages/taler-util/src/notifications.ts index 1c6ca4b85..b60fb267c 100644 --- a/packages/taler-util/src/notifications.ts +++ b/packages/taler-util/src/notifications.ts @@ -32,6 +32,7 @@ export enum NotificationType { TransactionStateTransition = "transaction-state-transition", WithdrawalOperationTransition = "withdrawal-operation-transition", ExchangeStateTransition = "exchange-state-transition", + Idle = "idle", TaskObservabilityEvent = "task-observability-event", RequestObservabilityEvent = "request-observability-event", } @@ -230,6 +231,10 @@ export interface WithdrawalOperationTransitionNotification { uri: string; } +export interface IdleNotification { + type: NotificationType.Idle; +} + export type WalletNotification = | BalanceChangeNotification | WithdrawalOperationTransitionNotification @@ -237,4 +242,5 @@ export type WalletNotification = | ExchangeStateTransitionNotification | TransactionStateTransitionNotification | TaskProgressNotification - | RequestProgressNotification; + | RequestProgressNotification + | IdleNotification; diff --git a/packages/taler-util/src/wallet-types.ts b/packages/taler-util/src/wallet-types.ts index 0653bc473..d39eb3ce9 100644 --- a/packages/taler-util/src/wallet-types.ts +++ b/packages/taler-util/src/wallet-types.ts @@ -3213,16 +3213,6 @@ export const codecForRemoveGlobalCurrencyAuditorRequest = .property("auditorPub", codecForString()) .build("RemoveGlobalCurrencyAuditorRequest"); -export interface RetryLoopOpts { - /** - * Stop the retry loop when all lifeness-giving pending operations - * are done. - * - * Defaults to false. - */ - stopWhenDone?: boolean; -} - /** * Information about one provider. * diff --git a/packages/taler-wallet-cli/src/index.ts b/packages/taler-wallet-cli/src/index.ts index 8c4760223..f0ff02903 100644 --- a/packages/taler-wallet-cli/src/index.ts +++ b/packages/taler-wallet-cli/src/index.ts @@ -358,25 +358,6 @@ async function withWallet<T>( } } -/** - * Run a function with a local wallet. - * - * Stops the wallet after the function is done. - */ -async function withLocalWallet<T>( - walletCliArgs: WalletCliArgsType, - f: (w: { client: WalletCoreApiClient; ws: Wallet }) => Promise<T>, -): Promise<T> { - const wh = await createLocalWallet(walletCliArgs, (notif) => { - writeObservabilityLog(notif); - }); - const w = wh.wallet; - const res = await f({ client: w.client, ws: w }); - logger.info("Work done, stopping wallet."); - w.stop(); - return res; -} - walletCli .subcommand("balance", "balance", { help: "Show wallet balance." }) .flag("json", ["--json"], { @@ -584,12 +565,8 @@ walletCli help: "Run until no more work is left.", }) .action(async (args) => { - await withLocalWallet(args, async (wallet) => { - logger.info("running until pending operations are finished"); - await wallet.ws.runTaskLoop({ - stopWhenDone: true, - }); - wallet.ws.stop(); + await withWallet(args, async (ctx) => { + await ctx.client.call(WalletApiOperation.TestingWaitTasksDone, {}); }); }); @@ -1330,9 +1307,7 @@ advancedCli exchangeBaseUrl: "http://localhost:8081/", merchantBaseUrl: "http://localhost:8083/", }); - await wallet.runTaskLoop({ - stopWhenDone: true, - }); + await wallet.client.call(WalletApiOperation.TestingWaitTasksDone, {}); wallet.stop(); }); diff --git a/packages/taler-wallet-core/src/common.ts b/packages/taler-wallet-core/src/common.ts index 6d116c47e..edaba5ba4 100644 --- a/packages/taler-wallet-core/src/common.ts +++ b/packages/taler-wallet-core/src/common.ts @@ -21,6 +21,7 @@ import { AbsoluteTime, AmountJson, Amounts, + AsyncFlag, CoinRefreshRequest, CoinStatus, Duration, @@ -35,6 +36,7 @@ import { TalerProtocolTimestamp, TombstoneIdStr, TransactionIdStr, + WalletNotification, assertUnreachable, checkDbInvariant, checkLogicInvariant, @@ -769,3 +771,53 @@ export enum PendingTaskType { declare const __taskIdStr: unique symbol; export type TaskIdStr = string & { [__taskIdStr]: true }; + +/** + * Wait until the wallet is in a particular state. + * + * Two functions must be provided: + * 1. checkState, which checks if the wallet is in the + * desired state. + * 2. filterNotification, which checks whether a notification + * might have lead to a state change. + */ +export async function genericWaitForState( + wex: WalletExecutionContext, + args: { + checkState: () => Promise<boolean>; + filterNotification: (notif: WalletNotification) => boolean; + }, +): Promise<void> { + await wex.taskScheduler.ensureRunning(); + + // FIXME: Clean up using the new JS "using" / Symbol.dispose syntax. + const flag = new AsyncFlag(); + // Raise purchaseNotifFlag whenever we get a notification + // about our refresh. + const cancelNotif = wex.ws.addNotificationListener((notif) => { + if (args.filterNotification(notif)) { + flag.raise(); + } + }); + const unregisterOnCancelled = wex.cancellationToken.onCancelled(() => { + cancelNotif(); + flag.raise(); + }); + + try { + while (true) { + if (wex.cancellationToken.isCancelled) { + throw Error("cancelled"); + } + if (await args.checkState()) { + return; + } + // Wait for the next transition + await flag.wait(); + flag.reset(); + } + } catch (e) { + unregisterOnCancelled(); + cancelNotif(); + } +} diff --git a/packages/taler-wallet-core/src/observable-wrappers.ts b/packages/taler-wallet-core/src/observable-wrappers.ts index b36f41611..7cd65f38e 100644 --- a/packages/taler-wallet-core/src/observable-wrappers.ts +++ b/packages/taler-wallet-core/src/observable-wrappers.ts @@ -25,7 +25,6 @@ import { IDBDatabase } from "@gnu-taler/idb-bridge"; import { ObservabilityContext, ObservabilityEventType, - RetryLoopOpts, } from "@gnu-taler/taler-util"; import { TaskIdStr } from "./common.js"; import { TalerCryptoInterface } from "./index.js"; @@ -65,13 +64,14 @@ export class ObservableTaskScheduler implements TaskScheduler { return this.impl.getActiveTasks(); } - ensureRunning(): void { - return this.impl.ensureRunning(); + isIdle(): boolean { + return this.impl.isIdle(); } - run(opts?: RetryLoopOpts | undefined): Promise<void> { - return this.impl.run(opts); + ensureRunning(): Promise<void> { + return this.impl.ensureRunning(); } + startShepherdTask(taskId: TaskIdStr): void { this.declareDep(taskId); this.oc.observe({ @@ -80,6 +80,7 @@ export class ObservableTaskScheduler implements TaskScheduler { }); return this.impl.startShepherdTask(taskId); } + stopShepherdTask(taskId: TaskIdStr): void { this.declareDep(taskId); this.oc.observe({ @@ -88,6 +89,7 @@ export class ObservableTaskScheduler implements TaskScheduler { }); return this.impl.stopShepherdTask(taskId); } + resetTaskRetries(taskId: TaskIdStr): Promise<void> { this.declareDep(taskId); if (this.taskDepCache.size > 500) { @@ -99,7 +101,8 @@ export class ObservableTaskScheduler implements TaskScheduler { }); return this.impl.resetTaskRetries(taskId); } - reload(): void { + + async reload(): Promise<void> { return this.impl.reload(); } } diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts index 58bdcf0dd..aae6d5a18 100644 --- a/packages/taler-wallet-core/src/shepherd.ts +++ b/packages/taler-wallet-core/src/shepherd.ts @@ -27,7 +27,6 @@ import { NotificationType, ObservabilityContext, ObservabilityEventType, - RetryLoopOpts, TalerErrorDetail, TaskThrottler, TransactionIdStr, @@ -142,13 +141,13 @@ function taskGivesLiveness(taskId: string): boolean { } export interface TaskScheduler { - ensureRunning(): void; - run(opts?: RetryLoopOpts): Promise<void>; + ensureRunning(): Promise<void>; startShepherdTask(taskId: TaskIdStr): void; stopShepherdTask(taskId: TaskIdStr): void; resetTaskRetries(taskId: TaskIdStr): Promise<void>; - reload(): void; + reload(): Promise<void>; getActiveTasks(): TaskIdStr[]; + isIdle(): boolean; } export class TaskSchedulerImpl implements TaskScheduler { @@ -176,10 +175,11 @@ export class TaskSchedulerImpl implements TaskScheduler { return [...this.sheps.keys()]; } - ensureRunning(): void { + async ensureRunning(): Promise<void> { if (this.isRunning) { return; } + await this.loadTasksFromDb(); this.run() .catch((e) => { logger.error("error running task loop"); @@ -190,7 +190,22 @@ export class TaskSchedulerImpl implements TaskScheduler { }); } - async run(opts: RetryLoopOpts = {}): Promise<void> { + isIdle(): boolean { + let alive = false; + const taskIds = [...this.sheps.keys()]; + logger.info(`current task IDs: ${j2s(taskIds)}`); + logger.info(`sheps: ${this.sheps.size}`); + for (const taskId of taskIds) { + if (taskGivesLiveness(taskId)) { + alive = true; + break; + } + } + // We're idle if no task is alive anymore. + return !alive; + } + + private async run(): Promise<void> { if (this.isRunning) { throw Error("task loop already running"); } @@ -200,26 +215,17 @@ export class TaskSchedulerImpl implements TaskScheduler { logger.info("loaded!"); logger.info(`sheps: ${this.sheps.size}`); while (true) { - if (opts.stopWhenDone) { - let alive = false; - const taskIds = [...this.sheps.keys()]; - logger.info(`current task IDs: ${j2s(taskIds)}`); - logger.info(`sheps: ${this.sheps.size}`); - for (const taskId of taskIds) { - if (taskGivesLiveness(taskId)) { - alive = true; - break; - } - } - if (!alive) { - logger.info("Breaking out of task loop (no more work)."); - break; - } - } if (this.ws.stopped) { logger.info("Breaking out of task loop (wallet stopped)."); break; } + + if (this.isIdle()) { + this.ws.notify({ + type: NotificationType.Idle, + }); + } + await this.iterCond.wait(); } this.isRunning = false; @@ -237,8 +243,8 @@ export class TaskSchedulerImpl implements TaskScheduler { * * Mostly useful to interrupt all waits when time-travelling. */ - reload(): void { - this.ensureRunning(); + async reload(): Promise<void> { + await this.ensureRunning(); const tasksIds = [...this.sheps.keys()]; logger.info(`reloading sheperd with ${tasksIds.length} tasks`); for (const taskId of tasksIds) { diff --git a/packages/taler-wallet-core/src/testing.ts b/packages/taler-wallet-core/src/testing.ts index 32c0765b4..2f149cfa8 100644 --- a/packages/taler-wallet-core/src/testing.ts +++ b/packages/taler-wallet-core/src/testing.ts @@ -39,8 +39,6 @@ import { j2s, Logger, NotificationType, - OpenedPromise, - openPromise, parsePaytoUri, PreparePayResultType, TalerCorebankApiClient, @@ -58,6 +56,7 @@ import { readSuccessResponseJsonOrThrow, } from "@gnu-taler/taler-util/http"; import { getBalances } from "./balance.js"; +import { genericWaitForState } from "./common.js"; import { createDepositGroup } from "./deposits.js"; import { fetchFreshExchange } from "./exchanges.js"; import { @@ -402,52 +401,56 @@ export async function waitUntilAllTransactionsFinal( wex: WalletExecutionContext, ): Promise<void> { logger.info("waiting until all transactions are in a final state"); - wex.taskScheduler.ensureRunning(); - let p: OpenedPromise<void> | undefined = undefined; - const cancelNotifs = wex.ws.addNotificationListener((notif) => { - if (!p) { - return; - } - if (notif.type === NotificationType.TransactionStateTransition) { + await wex.taskScheduler.ensureRunning(); + await genericWaitForState(wex, { + filterNotification(notif) { + if (notif.type !== NotificationType.TransactionStateTransition) { + return false; + } switch (notif.newTxState.major) { case TransactionMajorState.Pending: case TransactionMajorState.Aborting: - break; + return false; default: - p.resolve(); + return true; } - } - }); - while (1) { - p = openPromise(); - const txs = await getTransactions(wex, { - includeRefreshes: true, - filterByState: "nonfinal", - }); - let finished = true; - for (const tx of txs.transactions) { - switch (tx.txState.major) { - case TransactionMajorState.Pending: - case TransactionMajorState.Aborting: - case TransactionMajorState.Suspended: - case TransactionMajorState.SuspendedAborting: - finished = false; - logger.info( - `continuing waiting, ${tx.transactionId} in ${tx.txState.major}(${tx.txState.minor})`, - ); - break; + }, + async checkState() { + const txs = await getTransactions(wex, { + includeRefreshes: true, + filterByState: "nonfinal", + }); + for (const tx of txs.transactions) { + switch (tx.txState.major) { + case TransactionMajorState.Pending: + case TransactionMajorState.Aborting: + case TransactionMajorState.Suspended: + case TransactionMajorState.SuspendedAborting: + logger.info( + `continuing waiting, ${tx.transactionId} in ${tx.txState.major}(${tx.txState.minor})`, + ); + return false; + } } - } - if (finished) { - break; - } - // Wait until transaction state changed - await p.promise; - } - cancelNotifs(); + return true; + }, + }); logger.info("done waiting until all transactions are in a final state"); } +export async function waitTasksDone( + wex: WalletExecutionContext, +): Promise<void> { + await genericWaitForState(wex, { + async checkState() { + return wex.taskScheduler.isIdle(); + }, + filterNotification(notif) { + return notif.type === NotificationType.Idle; + }, + }); +} + /** * Wait until all chosen transactions are in a final state. */ @@ -462,59 +465,51 @@ export async function waitUntilGivenTransactionsFinal( if (transactionIds.length === 0) { return; } - wex.taskScheduler.ensureRunning(); + const txIdSet = new Set(transactionIds); - let p: OpenedPromise<void> | undefined = undefined; - const cancelNotifs = wex.ws.addNotificationListener((notif) => { - if (!p) { - return; - } - if (notif.type === NotificationType.TransactionStateTransition) { + + await genericWaitForState(wex, { + filterNotification(notif) { + if (notif.type !== NotificationType.TransactionStateTransition) { + return false; + } if (!txIdSet.has(notif.transactionId)) { - return; + return false; } switch (notif.newTxState.major) { case TransactionMajorState.Pending: case TransactionMajorState.Aborting: case TransactionMajorState.Suspended: case TransactionMajorState.SuspendedAborting: - break; - default: - p.resolve(); - } - } - }); - while (1) { - p = openPromise(); - const txs = await getTransactions(wex, { - includeRefreshes: true, - filterByState: "nonfinal", - }); - let finished = true; - for (const tx of txs.transactions) { - if (!txIdSet.has(tx.transactionId)) { - // Don't look at this transaction, we're not interested in it. - continue; + return false; } - switch (tx.txState.major) { - case TransactionMajorState.Pending: - case TransactionMajorState.Aborting: - case TransactionMajorState.Suspended: - case TransactionMajorState.SuspendedAborting: - finished = false; - logger.info( - `continuing waiting, ${tx.transactionId} in ${tx.txState.major}(${tx.txState.minor})`, - ); - break; + return true; + }, + async checkState() { + const txs = await getTransactions(wex, { + includeRefreshes: true, + filterByState: "nonfinal", + }); + for (const tx of txs.transactions) { + if (!txIdSet.has(tx.transactionId)) { + // Don't look at this transaction, we're not interested in it. + continue; + } + switch (tx.txState.major) { + case TransactionMajorState.Pending: + case TransactionMajorState.Aborting: + case TransactionMajorState.Suspended: + case TransactionMajorState.SuspendedAborting: + logger.info( + `continuing waiting, ${tx.transactionId} in ${tx.txState.major}(${tx.txState.minor})`, + ); + return false; + } } - } - if (finished) { - break; - } - // Wait until transaction state changed - await p.promise; - } - cancelNotifs(); + // No transaction is pending, we're done waiting! + return true; + }, + }); logger.info("done waiting until given transactions are in a final state"); } @@ -522,52 +517,43 @@ export async function waitUntilRefreshesDone( wex: WalletExecutionContext, ): Promise<void> { logger.info("waiting until all refresh transactions are in a final state"); - wex.taskScheduler.ensureRunning(); - let p: OpenedPromise<void> | undefined = undefined; - const cancelNotifs = wex.ws.addNotificationListener((notif) => { - if (!p) { - return; - } - if (notif.type === NotificationType.TransactionStateTransition) { + + await genericWaitForState(wex, { + filterNotification(notif) { + if (notif.type !== NotificationType.TransactionStateTransition) { + return false; + } switch (notif.newTxState.major) { case TransactionMajorState.Pending: case TransactionMajorState.Aborting: - break; + return false; default: - p.resolve(); - } - } - }); - while (1) { - p = openPromise(); - const txs = await getTransactions(wex, { - includeRefreshes: true, - filterByState: "nonfinal", - }); - let finished = true; - for (const tx of txs.transactions) { - if (tx.type !== TransactionType.Refresh) { - continue; + return true; } - switch (tx.txState.major) { - case TransactionMajorState.Pending: - case TransactionMajorState.Aborting: - case TransactionMajorState.Suspended: - case TransactionMajorState.SuspendedAborting: - finished = false; - logger.info( - `continuing waiting, ${tx.transactionId} in ${tx.txState.major}(${tx.txState.minor})`, - ); - break; + }, + async checkState() { + const txs = await getTransactions(wex, { + includeRefreshes: true, + filterByState: "nonfinal", + }); + for (const tx of txs.transactions) { + if (tx.type !== TransactionType.Refresh) { + continue; + } + switch (tx.txState.major) { + case TransactionMajorState.Pending: + case TransactionMajorState.Aborting: + case TransactionMajorState.Suspended: + case TransactionMajorState.SuspendedAborting: + logger.info( + `continuing waiting, ${tx.transactionId} in ${tx.txState.major}(${tx.txState.minor})`, + ); + return false; + } } - } - if (finished) { - break; - } - // Wait until transaction state changed - await p.promise; - } - cancelNotifs(); + return true; + }, + }); logger.info("done waiting until all refreshes are in a final state"); } @@ -575,33 +561,10 @@ async function waitUntilTransactionPendingReady( wex: WalletExecutionContext, transactionId: string, ): Promise<void> { - logger.info(`starting waiting for ${transactionId} to be in pending(ready)`); - wex.taskScheduler.ensureRunning(); - let p: OpenedPromise<void> | undefined = undefined; - const cancelNotifs = wex.ws.addNotificationListener((notif) => { - if (!p) { - return; - } - if (notif.type === NotificationType.TransactionStateTransition) { - p.resolve(); - } + return await waitTransactionState(wex, transactionId, { + major: TransactionMajorState.Pending, + minor: TransactionMinorState.Ready, }); - while (1) { - p = openPromise(); - const tx = await getTransactionById(wex, { - transactionId, - }); - if ( - tx.txState.major == TransactionMajorState.Pending && - tx.txState.minor === TransactionMinorState.Ready - ) { - break; - } - // Wait until transaction state changed - await p.promise; - } - logger.info(`done waiting for ${transactionId} to be in pending(ready)`); - cancelNotifs(); } /** @@ -617,34 +580,22 @@ export async function waitTransactionState( txState, )})`, ); - wex.taskScheduler.ensureRunning(); - let p: OpenedPromise<void> | undefined = undefined; - const cancelNotifs = wex.ws.addNotificationListener((notif) => { - if (!p) { - return; - } - if (notif.type === NotificationType.TransactionStateTransition) { - p.resolve(); - } + await genericWaitForState(wex, { + async checkState() { + const tx = await getTransactionById(wex, { + transactionId, + }); + return ( + tx.txState.major === txState.major && tx.txState.minor === txState.minor + ); + }, + filterNotification(notif) { + return notif.type === NotificationType.TransactionStateTransition; + }, }); - while (1) { - p = openPromise(); - const tx = await getTransactionById(wex, { - transactionId, - }); - if ( - tx.txState.major === txState.major && - tx.txState.minor === txState.minor - ) { - break; - } - // Wait until transaction state changed - await p.promise; - } logger.info( `done waiting for ${transactionId} to be in ${JSON.stringify(txState)}`, ); - cancelNotifs(); } export async function waitUntilTransactionWithAssociatedRefreshesFinal( @@ -669,7 +620,7 @@ export async function runIntegrationTest2( wex: WalletExecutionContext, args: IntegrationTestV2Args, ): Promise<void> { - wex.taskScheduler.ensureRunning(); + await wex.taskScheduler.ensureRunning(); logger.info("running test with arguments", args); const exchangeInfo = await fetchFreshExchange(wex, args.exchangeBaseUrl); diff --git a/packages/taler-wallet-core/src/wallet-api-types.ts b/packages/taler-wallet-core/src/wallet-api-types.ts index 15803ce8d..f493a6b8b 100644 --- a/packages/taler-wallet-core/src/wallet-api-types.ts +++ b/packages/taler-wallet-core/src/wallet-api-types.ts @@ -238,6 +238,7 @@ export enum WalletApiOperation { TestingWaitTransactionsFinal = "testingWaitTransactionsFinal", TestingWaitRefreshesFinal = "testingWaitRefreshesFinal", TestingWaitTransactionState = "testingWaitTransactionState", + TestingWaitTasksDone = "testingWaitTasksDone", TestingSetTimetravel = "testingSetTimetravel", GetCurrencySpecification = "getCurrencySpecification", ListStoredBackups = "listStoredBackups", @@ -1113,6 +1114,15 @@ export type TestingWaitTransactionsFinalOp = { }; /** + * Wait until all transactions are in a final state. + */ +export type TestingWaitTasksDoneOp = { + op: WalletApiOperation.TestingWaitTasksDone; + request: EmptyObject; + response: EmptyObject; +}; + +/** * Wait until all refresh transactions are in a final state. */ export type TestingWaitRefreshesFinalOp = { @@ -1253,6 +1263,7 @@ export type WalletOperations = { [WalletApiOperation.TestingWaitRefreshesFinal]: TestingWaitRefreshesFinalOp; [WalletApiOperation.TestingSetTimetravel]: TestingSetTimetravelOp; [WalletApiOperation.TestingWaitTransactionState]: TestingWaitTransactionStateOp; + [WalletApiOperation.TestingWaitTasksDone]: TestingWaitTasksDoneOp; [WalletApiOperation.GetCurrencySpecification]: GetCurrencySpecificationOp; [WalletApiOperation.CreateStoredBackup]: CreateStoredBackupsOp; [WalletApiOperation.ListStoredBackups]: ListStoredBackupsOp; diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index e3aca1ac5..45f9e6078 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -55,7 +55,6 @@ import { PrepareWithdrawExchangeRequest, PrepareWithdrawExchangeResponse, RecoverStoredBackupRequest, - RetryLoopOpts, StoredBackupList, TalerError, TalerErrorCode, @@ -256,6 +255,7 @@ import { runIntegrationTest, runIntegrationTest2, testPay, + waitTasksDone, waitTransactionState, waitUntilAllTransactionsFinal, waitUntilRefreshesDone, @@ -734,7 +734,7 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>( }; // After initialization, task loop should run. - wex.taskScheduler.ensureRunning(); + await wex.taskScheduler.ensureRunning(); wex.ws.initCalled = true; return resp; @@ -1325,6 +1325,10 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>( }); return {}; } + case WalletApiOperation.TestingWaitTasksDone: { + await waitTasksDone(wex); + return {}; + } case WalletApiOperation.RemoveGlobalCurrencyAuditor: { const req = codecForRemoveGlobalCurrencyAuditorRequest().decode(payload); await wex.db.runReadWriteTx(["globalCurrencyAuditors"], async (tx) => { @@ -1394,7 +1398,7 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>( case WalletApiOperation.TestingSetTimetravel: { const req = codecForTestingSetTimetravelRequest().decode(payload); setDangerousTimetravel(req.offsetMs); - wex.taskScheduler.reload(); + await wex.taskScheduler.reload(); return {}; } case WalletApiOperation.DeleteExchange: { @@ -1656,11 +1660,6 @@ export class Wallet { this.ws.stop(); } - async runTaskLoop(opts?: RetryLoopOpts): Promise<void> { - await this.ws.ensureWalletDbOpen(); - return this.ws.taskScheduler.run(opts); - } - async handleCoreApiRequest( operation: string, id: string, |