summaryrefslogtreecommitdiff
path: root/packages
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2024-04-22 23:29:07 +0200
committerFlorian Dold <florian@dold.me>2024-04-22 23:29:07 +0200
commita181ee06e4b52cb35e00ff8c86acff315135faf2 (patch)
tree9961ae277d861f93818c253e3992ad25128f6377 /packages
parente944c27e43474e8db464fbc593607e4e9d89576d (diff)
downloadwallet-core-a181ee06e4b52cb35e00ff8c86acff315135faf2.tar.gz
wallet-core-a181ee06e4b52cb35e00ff8c86acff315135faf2.tar.bz2
wallet-core-a181ee06e4b52cb35e00ff8c86acff315135faf2.zip
wallet-core: unify handling of run-until-done, simplify waiter implementation
Diffstat (limited to 'packages')
-rw-r--r--packages/taler-harness/src/bench1.ts9
-rw-r--r--packages/taler-harness/src/bench3.ts8
-rw-r--r--packages/taler-util/src/notifications.ts8
-rw-r--r--packages/taler-util/src/wallet-types.ts10
-rw-r--r--packages/taler-wallet-cli/src/index.ts31
-rw-r--r--packages/taler-wallet-core/src/common.ts52
-rw-r--r--packages/taler-wallet-core/src/observable-wrappers.ts15
-rw-r--r--packages/taler-wallet-core/src/shepherd.ts54
-rw-r--r--packages/taler-wallet-core/src/testing.ts297
-rw-r--r--packages/taler-wallet-core/src/wallet-api-types.ts11
-rw-r--r--packages/taler-wallet-core/src/wallet.ts15
11 files changed, 247 insertions, 263 deletions
diff --git a/packages/taler-harness/src/bench1.ts b/packages/taler-harness/src/bench1.ts
index 428114e0e..216760260 100644
--- a/packages/taler-harness/src/bench1.ts
+++ b/packages/taler-harness/src/bench1.ts
@@ -29,7 +29,6 @@ import {
} from "@gnu-taler/taler-util";
import {
AccessStats,
- applyRunConfigDefaults,
createNativeWalletHost2,
Wallet,
WalletApiOperation,
@@ -105,9 +104,7 @@ export async function runBench1(configJson: any): Promise<void> {
exchangeBaseUrl: b1conf.exchange,
});
- await wallet.runTaskLoop({
- stopWhenDone: true,
- });
+ await wallet.client.call(WalletApiOperation.TestingWaitTasksDone, {});
logger.info(
`Finished withdrawal amount=${withdrawAmount} time=${Date.now() - start}`,
@@ -123,9 +120,7 @@ export async function runBench1(configJson: any): Promise<void> {
depositPaytoUri: b1conf.payto,
});
- await wallet.runTaskLoop({
- stopWhenDone: true,
- });
+ await wallet.client.call(WalletApiOperation.TestingWaitTasksDone, {});
logger.info(`Finished deposit amount=10 time=${Date.now() - start}`);
}
diff --git a/packages/taler-harness/src/bench3.ts b/packages/taler-harness/src/bench3.ts
index f138dff68..a5bc094df 100644
--- a/packages/taler-harness/src/bench3.ts
+++ b/packages/taler-harness/src/bench3.ts
@@ -115,9 +115,7 @@ export async function runBench3(configJson: any): Promise<void> {
exchangeBaseUrl: b3conf.exchange,
});
- await wallet.runTaskLoop({
- stopWhenDone: true,
- });
+ await wallet.client.call(WalletApiOperation.TestingWaitTasksDone, {});
logger.info(
`Finished withdrawal amount=${withdrawAmount} time=${Date.now() - start}`,
@@ -135,9 +133,7 @@ export async function runBench3(configJson: any): Promise<void> {
depositPaytoUri: payto,
});
- await wallet.runTaskLoop({
- stopWhenDone: true,
- });
+ await wallet.client.call(WalletApiOperation.TestingWaitTasksDone, {});
logger.info(`Finished deposit amount=10 time=${Date.now() - start}`);
}
diff --git a/packages/taler-util/src/notifications.ts b/packages/taler-util/src/notifications.ts
index 1c6ca4b85..b60fb267c 100644
--- a/packages/taler-util/src/notifications.ts
+++ b/packages/taler-util/src/notifications.ts
@@ -32,6 +32,7 @@ export enum NotificationType {
TransactionStateTransition = "transaction-state-transition",
WithdrawalOperationTransition = "withdrawal-operation-transition",
ExchangeStateTransition = "exchange-state-transition",
+ Idle = "idle",
TaskObservabilityEvent = "task-observability-event",
RequestObservabilityEvent = "request-observability-event",
}
@@ -230,6 +231,10 @@ export interface WithdrawalOperationTransitionNotification {
uri: string;
}
+export interface IdleNotification {
+ type: NotificationType.Idle;
+}
+
export type WalletNotification =
| BalanceChangeNotification
| WithdrawalOperationTransitionNotification
@@ -237,4 +242,5 @@ export type WalletNotification =
| ExchangeStateTransitionNotification
| TransactionStateTransitionNotification
| TaskProgressNotification
- | RequestProgressNotification;
+ | RequestProgressNotification
+ | IdleNotification;
diff --git a/packages/taler-util/src/wallet-types.ts b/packages/taler-util/src/wallet-types.ts
index 0653bc473..d39eb3ce9 100644
--- a/packages/taler-util/src/wallet-types.ts
+++ b/packages/taler-util/src/wallet-types.ts
@@ -3213,16 +3213,6 @@ export const codecForRemoveGlobalCurrencyAuditorRequest =
.property("auditorPub", codecForString())
.build("RemoveGlobalCurrencyAuditorRequest");
-export interface RetryLoopOpts {
- /**
- * Stop the retry loop when all lifeness-giving pending operations
- * are done.
- *
- * Defaults to false.
- */
- stopWhenDone?: boolean;
-}
-
/**
* Information about one provider.
*
diff --git a/packages/taler-wallet-cli/src/index.ts b/packages/taler-wallet-cli/src/index.ts
index 8c4760223..f0ff02903 100644
--- a/packages/taler-wallet-cli/src/index.ts
+++ b/packages/taler-wallet-cli/src/index.ts
@@ -358,25 +358,6 @@ async function withWallet<T>(
}
}
-/**
- * Run a function with a local wallet.
- *
- * Stops the wallet after the function is done.
- */
-async function withLocalWallet<T>(
- walletCliArgs: WalletCliArgsType,
- f: (w: { client: WalletCoreApiClient; ws: Wallet }) => Promise<T>,
-): Promise<T> {
- const wh = await createLocalWallet(walletCliArgs, (notif) => {
- writeObservabilityLog(notif);
- });
- const w = wh.wallet;
- const res = await f({ client: w.client, ws: w });
- logger.info("Work done, stopping wallet.");
- w.stop();
- return res;
-}
-
walletCli
.subcommand("balance", "balance", { help: "Show wallet balance." })
.flag("json", ["--json"], {
@@ -584,12 +565,8 @@ walletCli
help: "Run until no more work is left.",
})
.action(async (args) => {
- await withLocalWallet(args, async (wallet) => {
- logger.info("running until pending operations are finished");
- await wallet.ws.runTaskLoop({
- stopWhenDone: true,
- });
- wallet.ws.stop();
+ await withWallet(args, async (ctx) => {
+ await ctx.client.call(WalletApiOperation.TestingWaitTasksDone, {});
});
});
@@ -1330,9 +1307,7 @@ advancedCli
exchangeBaseUrl: "http://localhost:8081/",
merchantBaseUrl: "http://localhost:8083/",
});
- await wallet.runTaskLoop({
- stopWhenDone: true,
- });
+ await wallet.client.call(WalletApiOperation.TestingWaitTasksDone, {});
wallet.stop();
});
diff --git a/packages/taler-wallet-core/src/common.ts b/packages/taler-wallet-core/src/common.ts
index 6d116c47e..edaba5ba4 100644
--- a/packages/taler-wallet-core/src/common.ts
+++ b/packages/taler-wallet-core/src/common.ts
@@ -21,6 +21,7 @@ import {
AbsoluteTime,
AmountJson,
Amounts,
+ AsyncFlag,
CoinRefreshRequest,
CoinStatus,
Duration,
@@ -35,6 +36,7 @@ import {
TalerProtocolTimestamp,
TombstoneIdStr,
TransactionIdStr,
+ WalletNotification,
assertUnreachable,
checkDbInvariant,
checkLogicInvariant,
@@ -769,3 +771,53 @@ export enum PendingTaskType {
declare const __taskIdStr: unique symbol;
export type TaskIdStr = string & { [__taskIdStr]: true };
+
+/**
+ * Wait until the wallet is in a particular state.
+ *
+ * Two functions must be provided:
+ * 1. checkState, which checks if the wallet is in the
+ * desired state.
+ * 2. filterNotification, which checks whether a notification
+ * might have lead to a state change.
+ */
+export async function genericWaitForState(
+ wex: WalletExecutionContext,
+ args: {
+ checkState: () => Promise<boolean>;
+ filterNotification: (notif: WalletNotification) => boolean;
+ },
+): Promise<void> {
+ await wex.taskScheduler.ensureRunning();
+
+ // FIXME: Clean up using the new JS "using" / Symbol.dispose syntax.
+ const flag = new AsyncFlag();
+ // Raise purchaseNotifFlag whenever we get a notification
+ // about our refresh.
+ const cancelNotif = wex.ws.addNotificationListener((notif) => {
+ if (args.filterNotification(notif)) {
+ flag.raise();
+ }
+ });
+ const unregisterOnCancelled = wex.cancellationToken.onCancelled(() => {
+ cancelNotif();
+ flag.raise();
+ });
+
+ try {
+ while (true) {
+ if (wex.cancellationToken.isCancelled) {
+ throw Error("cancelled");
+ }
+ if (await args.checkState()) {
+ return;
+ }
+ // Wait for the next transition
+ await flag.wait();
+ flag.reset();
+ }
+ } catch (e) {
+ unregisterOnCancelled();
+ cancelNotif();
+ }
+}
diff --git a/packages/taler-wallet-core/src/observable-wrappers.ts b/packages/taler-wallet-core/src/observable-wrappers.ts
index b36f41611..7cd65f38e 100644
--- a/packages/taler-wallet-core/src/observable-wrappers.ts
+++ b/packages/taler-wallet-core/src/observable-wrappers.ts
@@ -25,7 +25,6 @@ import { IDBDatabase } from "@gnu-taler/idb-bridge";
import {
ObservabilityContext,
ObservabilityEventType,
- RetryLoopOpts,
} from "@gnu-taler/taler-util";
import { TaskIdStr } from "./common.js";
import { TalerCryptoInterface } from "./index.js";
@@ -65,13 +64,14 @@ export class ObservableTaskScheduler implements TaskScheduler {
return this.impl.getActiveTasks();
}
- ensureRunning(): void {
- return this.impl.ensureRunning();
+ isIdle(): boolean {
+ return this.impl.isIdle();
}
- run(opts?: RetryLoopOpts | undefined): Promise<void> {
- return this.impl.run(opts);
+ ensureRunning(): Promise<void> {
+ return this.impl.ensureRunning();
}
+
startShepherdTask(taskId: TaskIdStr): void {
this.declareDep(taskId);
this.oc.observe({
@@ -80,6 +80,7 @@ export class ObservableTaskScheduler implements TaskScheduler {
});
return this.impl.startShepherdTask(taskId);
}
+
stopShepherdTask(taskId: TaskIdStr): void {
this.declareDep(taskId);
this.oc.observe({
@@ -88,6 +89,7 @@ export class ObservableTaskScheduler implements TaskScheduler {
});
return this.impl.stopShepherdTask(taskId);
}
+
resetTaskRetries(taskId: TaskIdStr): Promise<void> {
this.declareDep(taskId);
if (this.taskDepCache.size > 500) {
@@ -99,7 +101,8 @@ export class ObservableTaskScheduler implements TaskScheduler {
});
return this.impl.resetTaskRetries(taskId);
}
- reload(): void {
+
+ async reload(): Promise<void> {
return this.impl.reload();
}
}
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts
index 58bdcf0dd..aae6d5a18 100644
--- a/packages/taler-wallet-core/src/shepherd.ts
+++ b/packages/taler-wallet-core/src/shepherd.ts
@@ -27,7 +27,6 @@ import {
NotificationType,
ObservabilityContext,
ObservabilityEventType,
- RetryLoopOpts,
TalerErrorDetail,
TaskThrottler,
TransactionIdStr,
@@ -142,13 +141,13 @@ function taskGivesLiveness(taskId: string): boolean {
}
export interface TaskScheduler {
- ensureRunning(): void;
- run(opts?: RetryLoopOpts): Promise<void>;
+ ensureRunning(): Promise<void>;
startShepherdTask(taskId: TaskIdStr): void;
stopShepherdTask(taskId: TaskIdStr): void;
resetTaskRetries(taskId: TaskIdStr): Promise<void>;
- reload(): void;
+ reload(): Promise<void>;
getActiveTasks(): TaskIdStr[];
+ isIdle(): boolean;
}
export class TaskSchedulerImpl implements TaskScheduler {
@@ -176,10 +175,11 @@ export class TaskSchedulerImpl implements TaskScheduler {
return [...this.sheps.keys()];
}
- ensureRunning(): void {
+ async ensureRunning(): Promise<void> {
if (this.isRunning) {
return;
}
+ await this.loadTasksFromDb();
this.run()
.catch((e) => {
logger.error("error running task loop");
@@ -190,7 +190,22 @@ export class TaskSchedulerImpl implements TaskScheduler {
});
}
- async run(opts: RetryLoopOpts = {}): Promise<void> {
+ isIdle(): boolean {
+ let alive = false;
+ const taskIds = [...this.sheps.keys()];
+ logger.info(`current task IDs: ${j2s(taskIds)}`);
+ logger.info(`sheps: ${this.sheps.size}`);
+ for (const taskId of taskIds) {
+ if (taskGivesLiveness(taskId)) {
+ alive = true;
+ break;
+ }
+ }
+ // We're idle if no task is alive anymore.
+ return !alive;
+ }
+
+ private async run(): Promise<void> {
if (this.isRunning) {
throw Error("task loop already running");
}
@@ -200,26 +215,17 @@ export class TaskSchedulerImpl implements TaskScheduler {
logger.info("loaded!");
logger.info(`sheps: ${this.sheps.size}`);
while (true) {
- if (opts.stopWhenDone) {
- let alive = false;
- const taskIds = [...this.sheps.keys()];
- logger.info(`current task IDs: ${j2s(taskIds)}`);
- logger.info(`sheps: ${this.sheps.size}`);
- for (const taskId of taskIds) {
- if (taskGivesLiveness(taskId)) {
- alive = true;
- break;
- }
- }
- if (!alive) {
- logger.info("Breaking out of task loop (no more work).");
- break;
- }
- }
if (this.ws.stopped) {
logger.info("Breaking out of task loop (wallet stopped).");
break;
}
+
+ if (this.isIdle()) {
+ this.ws.notify({
+ type: NotificationType.Idle,
+ });
+ }
+
await this.iterCond.wait();
}
this.isRunning = false;
@@ -237,8 +243,8 @@ export class TaskSchedulerImpl implements TaskScheduler {
*
* Mostly useful to interrupt all waits when time-travelling.
*/
- reload(): void {
- this.ensureRunning();
+ async reload(): Promise<void> {
+ await this.ensureRunning();
const tasksIds = [...this.sheps.keys()];
logger.info(`reloading sheperd with ${tasksIds.length} tasks`);
for (const taskId of tasksIds) {
diff --git a/packages/taler-wallet-core/src/testing.ts b/packages/taler-wallet-core/src/testing.ts
index 32c0765b4..2f149cfa8 100644
--- a/packages/taler-wallet-core/src/testing.ts
+++ b/packages/taler-wallet-core/src/testing.ts
@@ -39,8 +39,6 @@ import {
j2s,
Logger,
NotificationType,
- OpenedPromise,
- openPromise,
parsePaytoUri,
PreparePayResultType,
TalerCorebankApiClient,
@@ -58,6 +56,7 @@ import {
readSuccessResponseJsonOrThrow,
} from "@gnu-taler/taler-util/http";
import { getBalances } from "./balance.js";
+import { genericWaitForState } from "./common.js";
import { createDepositGroup } from "./deposits.js";
import { fetchFreshExchange } from "./exchanges.js";
import {
@@ -402,52 +401,56 @@ export async function waitUntilAllTransactionsFinal(
wex: WalletExecutionContext,
): Promise<void> {
logger.info("waiting until all transactions are in a final state");
- wex.taskScheduler.ensureRunning();
- let p: OpenedPromise<void> | undefined = undefined;
- const cancelNotifs = wex.ws.addNotificationListener((notif) => {
- if (!p) {
- return;
- }
- if (notif.type === NotificationType.TransactionStateTransition) {
+ await wex.taskScheduler.ensureRunning();
+ await genericWaitForState(wex, {
+ filterNotification(notif) {
+ if (notif.type !== NotificationType.TransactionStateTransition) {
+ return false;
+ }
switch (notif.newTxState.major) {
case TransactionMajorState.Pending:
case TransactionMajorState.Aborting:
- break;
+ return false;
default:
- p.resolve();
+ return true;
}
- }
- });
- while (1) {
- p = openPromise();
- const txs = await getTransactions(wex, {
- includeRefreshes: true,
- filterByState: "nonfinal",
- });
- let finished = true;
- for (const tx of txs.transactions) {
- switch (tx.txState.major) {
- case TransactionMajorState.Pending:
- case TransactionMajorState.Aborting:
- case TransactionMajorState.Suspended:
- case TransactionMajorState.SuspendedAborting:
- finished = false;
- logger.info(
- `continuing waiting, ${tx.transactionId} in ${tx.txState.major}(${tx.txState.minor})`,
- );
- break;
+ },
+ async checkState() {
+ const txs = await getTransactions(wex, {
+ includeRefreshes: true,
+ filterByState: "nonfinal",
+ });
+ for (const tx of txs.transactions) {
+ switch (tx.txState.major) {
+ case TransactionMajorState.Pending:
+ case TransactionMajorState.Aborting:
+ case TransactionMajorState.Suspended:
+ case TransactionMajorState.SuspendedAborting:
+ logger.info(
+ `continuing waiting, ${tx.transactionId} in ${tx.txState.major}(${tx.txState.minor})`,
+ );
+ return false;
+ }
}
- }
- if (finished) {
- break;
- }
- // Wait until transaction state changed
- await p.promise;
- }
- cancelNotifs();
+ return true;
+ },
+ });
logger.info("done waiting until all transactions are in a final state");
}
+export async function waitTasksDone(
+ wex: WalletExecutionContext,
+): Promise<void> {
+ await genericWaitForState(wex, {
+ async checkState() {
+ return wex.taskScheduler.isIdle();
+ },
+ filterNotification(notif) {
+ return notif.type === NotificationType.Idle;
+ },
+ });
+}
+
/**
* Wait until all chosen transactions are in a final state.
*/
@@ -462,59 +465,51 @@ export async function waitUntilGivenTransactionsFinal(
if (transactionIds.length === 0) {
return;
}
- wex.taskScheduler.ensureRunning();
+
const txIdSet = new Set(transactionIds);
- let p: OpenedPromise<void> | undefined = undefined;
- const cancelNotifs = wex.ws.addNotificationListener((notif) => {
- if (!p) {
- return;
- }
- if (notif.type === NotificationType.TransactionStateTransition) {
+
+ await genericWaitForState(wex, {
+ filterNotification(notif) {
+ if (notif.type !== NotificationType.TransactionStateTransition) {
+ return false;
+ }
if (!txIdSet.has(notif.transactionId)) {
- return;
+ return false;
}
switch (notif.newTxState.major) {
case TransactionMajorState.Pending:
case TransactionMajorState.Aborting:
case TransactionMajorState.Suspended:
case TransactionMajorState.SuspendedAborting:
- break;
- default:
- p.resolve();
- }
- }
- });
- while (1) {
- p = openPromise();
- const txs = await getTransactions(wex, {
- includeRefreshes: true,
- filterByState: "nonfinal",
- });
- let finished = true;
- for (const tx of txs.transactions) {
- if (!txIdSet.has(tx.transactionId)) {
- // Don't look at this transaction, we're not interested in it.
- continue;
+ return false;
}
- switch (tx.txState.major) {
- case TransactionMajorState.Pending:
- case TransactionMajorState.Aborting:
- case TransactionMajorState.Suspended:
- case TransactionMajorState.SuspendedAborting:
- finished = false;
- logger.info(
- `continuing waiting, ${tx.transactionId} in ${tx.txState.major}(${tx.txState.minor})`,
- );
- break;
+ return true;
+ },
+ async checkState() {
+ const txs = await getTransactions(wex, {
+ includeRefreshes: true,
+ filterByState: "nonfinal",
+ });
+ for (const tx of txs.transactions) {
+ if (!txIdSet.has(tx.transactionId)) {
+ // Don't look at this transaction, we're not interested in it.
+ continue;
+ }
+ switch (tx.txState.major) {
+ case TransactionMajorState.Pending:
+ case TransactionMajorState.Aborting:
+ case TransactionMajorState.Suspended:
+ case TransactionMajorState.SuspendedAborting:
+ logger.info(
+ `continuing waiting, ${tx.transactionId} in ${tx.txState.major}(${tx.txState.minor})`,
+ );
+ return false;
+ }
}
- }
- if (finished) {
- break;
- }
- // Wait until transaction state changed
- await p.promise;
- }
- cancelNotifs();
+ // No transaction is pending, we're done waiting!
+ return true;
+ },
+ });
logger.info("done waiting until given transactions are in a final state");
}
@@ -522,52 +517,43 @@ export async function waitUntilRefreshesDone(
wex: WalletExecutionContext,
): Promise<void> {
logger.info("waiting until all refresh transactions are in a final state");
- wex.taskScheduler.ensureRunning();
- let p: OpenedPromise<void> | undefined = undefined;
- const cancelNotifs = wex.ws.addNotificationListener((notif) => {
- if (!p) {
- return;
- }
- if (notif.type === NotificationType.TransactionStateTransition) {
+
+ await genericWaitForState(wex, {
+ filterNotification(notif) {
+ if (notif.type !== NotificationType.TransactionStateTransition) {
+ return false;
+ }
switch (notif.newTxState.major) {
case TransactionMajorState.Pending:
case TransactionMajorState.Aborting:
- break;
+ return false;
default:
- p.resolve();
- }
- }
- });
- while (1) {
- p = openPromise();
- const txs = await getTransactions(wex, {
- includeRefreshes: true,
- filterByState: "nonfinal",
- });
- let finished = true;
- for (const tx of txs.transactions) {
- if (tx.type !== TransactionType.Refresh) {
- continue;
+ return true;
}
- switch (tx.txState.major) {
- case TransactionMajorState.Pending:
- case TransactionMajorState.Aborting:
- case TransactionMajorState.Suspended:
- case TransactionMajorState.SuspendedAborting:
- finished = false;
- logger.info(
- `continuing waiting, ${tx.transactionId} in ${tx.txState.major}(${tx.txState.minor})`,
- );
- break;
+ },
+ async checkState() {
+ const txs = await getTransactions(wex, {
+ includeRefreshes: true,
+ filterByState: "nonfinal",
+ });
+ for (const tx of txs.transactions) {
+ if (tx.type !== TransactionType.Refresh) {
+ continue;
+ }
+ switch (tx.txState.major) {
+ case TransactionMajorState.Pending:
+ case TransactionMajorState.Aborting:
+ case TransactionMajorState.Suspended:
+ case TransactionMajorState.SuspendedAborting:
+ logger.info(
+ `continuing waiting, ${tx.transactionId} in ${tx.txState.major}(${tx.txState.minor})`,
+ );
+ return false;
+ }
}
- }
- if (finished) {
- break;
- }
- // Wait until transaction state changed
- await p.promise;
- }
- cancelNotifs();
+ return true;
+ },
+ });
logger.info("done waiting until all refreshes are in a final state");
}
@@ -575,33 +561,10 @@ async function waitUntilTransactionPendingReady(
wex: WalletExecutionContext,
transactionId: string,
): Promise<void> {
- logger.info(`starting waiting for ${transactionId} to be in pending(ready)`);
- wex.taskScheduler.ensureRunning();
- let p: OpenedPromise<void> | undefined = undefined;
- const cancelNotifs = wex.ws.addNotificationListener((notif) => {
- if (!p) {
- return;
- }
- if (notif.type === NotificationType.TransactionStateTransition) {
- p.resolve();
- }
+ return await waitTransactionState(wex, transactionId, {
+ major: TransactionMajorState.Pending,
+ minor: TransactionMinorState.Ready,
});
- while (1) {
- p = openPromise();
- const tx = await getTransactionById(wex, {
- transactionId,
- });
- if (
- tx.txState.major == TransactionMajorState.Pending &&
- tx.txState.minor === TransactionMinorState.Ready
- ) {
- break;
- }
- // Wait until transaction state changed
- await p.promise;
- }
- logger.info(`done waiting for ${transactionId} to be in pending(ready)`);
- cancelNotifs();
}
/**
@@ -617,34 +580,22 @@ export async function waitTransactionState(
txState,
)})`,
);
- wex.taskScheduler.ensureRunning();
- let p: OpenedPromise<void> | undefined = undefined;
- const cancelNotifs = wex.ws.addNotificationListener((notif) => {
- if (!p) {
- return;
- }
- if (notif.type === NotificationType.TransactionStateTransition) {
- p.resolve();
- }
+ await genericWaitForState(wex, {
+ async checkState() {
+ const tx = await getTransactionById(wex, {
+ transactionId,
+ });
+ return (
+ tx.txState.major === txState.major && tx.txState.minor === txState.minor
+ );
+ },
+ filterNotification(notif) {
+ return notif.type === NotificationType.TransactionStateTransition;
+ },
});
- while (1) {
- p = openPromise();
- const tx = await getTransactionById(wex, {
- transactionId,
- });
- if (
- tx.txState.major === txState.major &&
- tx.txState.minor === txState.minor
- ) {
- break;
- }
- // Wait until transaction state changed
- await p.promise;
- }
logger.info(
`done waiting for ${transactionId} to be in ${JSON.stringify(txState)}`,
);
- cancelNotifs();
}
export async function waitUntilTransactionWithAssociatedRefreshesFinal(
@@ -669,7 +620,7 @@ export async function runIntegrationTest2(
wex: WalletExecutionContext,
args: IntegrationTestV2Args,
): Promise<void> {
- wex.taskScheduler.ensureRunning();
+ await wex.taskScheduler.ensureRunning();
logger.info("running test with arguments", args);
const exchangeInfo = await fetchFreshExchange(wex, args.exchangeBaseUrl);
diff --git a/packages/taler-wallet-core/src/wallet-api-types.ts b/packages/taler-wallet-core/src/wallet-api-types.ts
index 15803ce8d..f493a6b8b 100644
--- a/packages/taler-wallet-core/src/wallet-api-types.ts
+++ b/packages/taler-wallet-core/src/wallet-api-types.ts
@@ -238,6 +238,7 @@ export enum WalletApiOperation {
TestingWaitTransactionsFinal = "testingWaitTransactionsFinal",
TestingWaitRefreshesFinal = "testingWaitRefreshesFinal",
TestingWaitTransactionState = "testingWaitTransactionState",
+ TestingWaitTasksDone = "testingWaitTasksDone",
TestingSetTimetravel = "testingSetTimetravel",
GetCurrencySpecification = "getCurrencySpecification",
ListStoredBackups = "listStoredBackups",
@@ -1113,6 +1114,15 @@ export type TestingWaitTransactionsFinalOp = {
};
/**
+ * Wait until all transactions are in a final state.
+ */
+export type TestingWaitTasksDoneOp = {
+ op: WalletApiOperation.TestingWaitTasksDone;
+ request: EmptyObject;
+ response: EmptyObject;
+};
+
+/**
* Wait until all refresh transactions are in a final state.
*/
export type TestingWaitRefreshesFinalOp = {
@@ -1253,6 +1263,7 @@ export type WalletOperations = {
[WalletApiOperation.TestingWaitRefreshesFinal]: TestingWaitRefreshesFinalOp;
[WalletApiOperation.TestingSetTimetravel]: TestingSetTimetravelOp;
[WalletApiOperation.TestingWaitTransactionState]: TestingWaitTransactionStateOp;
+ [WalletApiOperation.TestingWaitTasksDone]: TestingWaitTasksDoneOp;
[WalletApiOperation.GetCurrencySpecification]: GetCurrencySpecificationOp;
[WalletApiOperation.CreateStoredBackup]: CreateStoredBackupsOp;
[WalletApiOperation.ListStoredBackups]: ListStoredBackupsOp;
diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts
index e3aca1ac5..45f9e6078 100644
--- a/packages/taler-wallet-core/src/wallet.ts
+++ b/packages/taler-wallet-core/src/wallet.ts
@@ -55,7 +55,6 @@ import {
PrepareWithdrawExchangeRequest,
PrepareWithdrawExchangeResponse,
RecoverStoredBackupRequest,
- RetryLoopOpts,
StoredBackupList,
TalerError,
TalerErrorCode,
@@ -256,6 +255,7 @@ import {
runIntegrationTest,
runIntegrationTest2,
testPay,
+ waitTasksDone,
waitTransactionState,
waitUntilAllTransactionsFinal,
waitUntilRefreshesDone,
@@ -734,7 +734,7 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
};
// After initialization, task loop should run.
- wex.taskScheduler.ensureRunning();
+ await wex.taskScheduler.ensureRunning();
wex.ws.initCalled = true;
return resp;
@@ -1325,6 +1325,10 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
});
return {};
}
+ case WalletApiOperation.TestingWaitTasksDone: {
+ await waitTasksDone(wex);
+ return {};
+ }
case WalletApiOperation.RemoveGlobalCurrencyAuditor: {
const req = codecForRemoveGlobalCurrencyAuditorRequest().decode(payload);
await wex.db.runReadWriteTx(["globalCurrencyAuditors"], async (tx) => {
@@ -1394,7 +1398,7 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
case WalletApiOperation.TestingSetTimetravel: {
const req = codecForTestingSetTimetravelRequest().decode(payload);
setDangerousTimetravel(req.offsetMs);
- wex.taskScheduler.reload();
+ await wex.taskScheduler.reload();
return {};
}
case WalletApiOperation.DeleteExchange: {
@@ -1656,11 +1660,6 @@ export class Wallet {
this.ws.stop();
}
- async runTaskLoop(opts?: RetryLoopOpts): Promise<void> {
- await this.ws.ensureWalletDbOpen();
- return this.ws.taskScheduler.run(opts);
- }
-
async handleCoreApiRequest(
operation: string,
id: string,