summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/shepherd.ts
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2024-02-13 10:53:43 +0100
committerFlorian Dold <florian@dold.me>2024-02-15 21:56:54 +0100
commit70a803038f1cbe05dc4779bdd87376fd073421be (patch)
tree6607d69f6906ada9f912e31d9a9e3b65560a7326 /packages/taler-wallet-core/src/shepherd.ts
parent2c17e98c336d96f955ec82ad0a1b164e3da90103 (diff)
downloadwallet-core-70a803038f1cbe05dc4779bdd87376fd073421be.tar.gz
wallet-core-70a803038f1cbe05dc4779bdd87376fd073421be.tar.bz2
wallet-core-70a803038f1cbe05dc4779bdd87376fd073421be.zip
implement task shepherd, many small fixes and tweaksdev/dold/task-shepherd
Diffstat (limited to 'packages/taler-wallet-core/src/shepherd.ts')
-rw-r--r--packages/taler-wallet-core/src/shepherd.ts851
1 files changed, 851 insertions, 0 deletions
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts
new file mode 100644
index 000000000..d1648acc7
--- /dev/null
+++ b/packages/taler-wallet-core/src/shepherd.ts
@@ -0,0 +1,851 @@
+/*
+ This file is part of GNU Taler
+ (C) 2024 Taler Systems SA
+
+ GNU Taler is free software; you can redistribute it and/or modify it under the
+ terms of the GNU General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along with
+ GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+ */
+
+/**
+ * Imports.
+ */
+import { GlobalIDB } from "@gnu-taler/idb-bridge";
+import {
+ AbsoluteTime,
+ CancellationToken,
+ Duration,
+ Logger,
+ NotificationType,
+ RetryLoopOpts,
+ TalerError,
+ TalerErrorCode,
+ TalerErrorDetail,
+ TaskThrottler,
+ TransactionIdStr,
+ TransactionType,
+ WalletNotification,
+ assertUnreachable,
+ j2s,
+ makeErrorDetail,
+} from "@gnu-taler/taler-util";
+import { CryptoApiStoppedError } from "./crypto/workers/crypto-dispatcher.js";
+import {
+ GetReadOnlyAccess,
+ OPERATION_STATUS_ACTIVE_FIRST,
+ OPERATION_STATUS_ACTIVE_LAST,
+ WalletStoresV1,
+ timestampAbsoluteFromDb,
+} from "./index.js";
+import { InternalWalletState } from "./internal-wallet-state.js";
+import { processBackupForProvider } from "./operations/backup/index.js";
+import {
+ DbRetryInfo,
+ TaskRunResult,
+ TaskRunResultType,
+ constructTaskIdentifier,
+ getExchangeState,
+ parseTaskIdentifier,
+} from "./operations/common.js";
+import { processDepositGroup } from "./operations/deposits.js";
+import { updateExchangeFromUrlHandler } from "./operations/exchanges.js";
+import { processPurchase } from "./operations/pay-merchant.js";
+import { processPeerPullCredit } from "./operations/pay-peer-pull-credit.js";
+import { processPeerPullDebit } from "./operations/pay-peer-pull-debit.js";
+import { processPeerPushCredit } from "./operations/pay-peer-push-credit.js";
+import { processPeerPushDebit } from "./operations/pay-peer-push-debit.js";
+import { processRecoupGroup } from "./operations/recoup.js";
+import { processRefreshGroup } from "./operations/refresh.js";
+import { constructTransactionIdentifier } from "./operations/transactions.js";
+import { processWithdrawalGroup } from "./operations/withdraw.js";
+import { PendingTaskType, TaskId } from "./pending-types.js";
+import { AsyncCondition } from "./util/promiseUtils.js";
+
+const logger = new Logger("shepherd.ts");
+
+/**
+ * Info about one task being shepherded.
+ */
+interface ShepherdInfo {
+ cts: CancellationToken.Source;
+}
+
+export class TaskScheduler {
+ private sheps: Map<TaskId, ShepherdInfo> = new Map();
+
+ private iterCond = new AsyncCondition();
+
+ private throttler = new TaskThrottler();
+
+ constructor(private ws: InternalWalletState) {}
+
+ async loadTasksFromDb(): Promise<void> {
+ const activeTasks = await getActiveTaskIds(this.ws);
+
+ for (const tid of activeTasks.taskIds) {
+ this.startShepherdTask(tid);
+ }
+ }
+
+ async run(opts: RetryLoopOpts = {}): Promise<void> {
+ logger.info("Running task loop.");
+ this.ws.isTaskLoopRunning = true;
+ await this.loadTasksFromDb();
+ while (true) {
+ if (opts.stopWhenDone && this.sheps.size === 0) {
+ logger.info("Breaking out of task loop (no more work).");
+ break;
+ }
+ if (this.ws.stopped) {
+ logger.info("Breaking out of task loop (wallet stopped).");
+ break;
+ }
+ await this.iterCond.wait();
+ }
+ this.ws.isTaskLoopRunning = false;
+ logger.info("Done with task loop.");
+ }
+
+ startShepherdTask(taskId: TaskId): void {
+ // Run in the background, no await!
+ this.internalStartShepherdTask(taskId);
+ }
+
+ /**
+ * Stop and re-load all existing tasks.
+ *
+ * Mostly useful to interrupt all waits when time-travelling.
+ */
+ reload() {
+ const tasksIds = [...this.sheps.keys()];
+ logger.info(`reloading sheperd with ${tasksIds.length} tasks`);
+ for (const taskId of tasksIds) {
+ this.stopShepherdTask(taskId);
+ }
+ for (const taskId of tasksIds) {
+ this.startShepherdTask(taskId);
+ }
+ }
+
+ private async internalStartShepherdTask(taskId: TaskId): Promise<void> {
+ logger.trace(`Starting to shepherd task ${taskId}`);
+ const oldShep = this.sheps.get(taskId);
+ if (oldShep) {
+ logger.trace(`Already have a shepherd for ${taskId}`);
+ return;
+ }
+ logger.trace(`Creating new shepherd for ${taskId}`);
+ const newShep: ShepherdInfo = {
+ cts: CancellationToken.create(),
+ };
+ this.sheps.set(taskId, newShep);
+ try {
+ await this.internalShepherdTask(taskId, newShep);
+ } finally {
+ logger.trace(`Done shepherding ${taskId}`);
+ this.sheps.delete(taskId);
+ this.iterCond.trigger();
+ }
+ }
+
+ stopShepherdTask(taskId: TaskId): void {
+ logger.trace(`Stopping shepherding of ${taskId}`);
+ const oldShep = this.sheps.get(taskId);
+ if (oldShep) {
+ logger.trace(`Cancelling old shepherd for ${taskId}`);
+ oldShep.cts.cancel();
+ this.sheps.delete(taskId);
+ this.iterCond.trigger();
+ }
+ }
+
+ restartShepherdTask(taskId: TaskId): void {
+ this.stopShepherdTask(taskId);
+ this.startShepherdTask(taskId);
+ }
+
+ async resetTaskRetries(taskId: TaskId): Promise<void> {
+ const maybeNotification = await this.ws.db
+ .mktxAll()
+ .runReadWrite(async (tx) => {
+ await tx.operationRetries.delete(taskId);
+ return taskToRetryNotification(this.ws, tx, taskId, undefined);
+ });
+ this.stopShepherdTask(taskId);
+ if (maybeNotification) {
+ this.ws.notify(maybeNotification);
+ }
+ this.startShepherdTask(taskId);
+ }
+
+ private async internalShepherdTask(
+ taskId: TaskId,
+ info: ShepherdInfo,
+ ): Promise<void> {
+ while (true) {
+ if (this.ws.stopped) {
+ logger.trace(`Shepherd for ${taskId} stopping as wallet is stopped`);
+ return;
+ }
+ if (info.cts.token.isCancelled) {
+ logger.trace(`Shepherd for ${taskId} got cancelled`);
+ return;
+ }
+ const isThrottled = this.throttler.applyThrottle(taskId);
+ if (isThrottled) {
+ logger.warn(
+ `task ${taskId} throttled, this is very likely a bug in wallet-core, please report`,
+ );
+ logger.warn("waiting for 60 seconds");
+ await this.ws.timerGroup.resolveAfter(
+ Duration.fromSpec({ seconds: 60 }),
+ );
+ }
+ logger.trace(`Shepherd for ${taskId} will call handler`);
+ // FIXME: This should already return the retry record.
+ const res = await runTaskWithErrorReporting(this.ws, taskId, async () => {
+ return await callOperationHandlerForTaskId(
+ this.ws,
+ taskId,
+ info.cts.token,
+ );
+ });
+ const retryRecord = await this.ws.db.runReadOnlyTx(
+ ["operationRetries"],
+ async (tx) => {
+ return tx.operationRetries.get(taskId);
+ },
+ );
+ switch (res.type) {
+ case TaskRunResultType.Error: {
+ logger.trace(`Shepherd for ${taskId} got error result.`);
+ if (retryRecord) {
+ let delay: Duration;
+ const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
+ delay = AbsoluteTime.remaining(t);
+ logger.trace(`Waiting for ${delay.d_ms} ms`);
+ try {
+ await info.cts.token.racePromise(
+ this.ws.timerGroup.resolveAfter(delay),
+ );
+ } catch (e) {
+ logger.info(`waiting for ${taskId} interrupted`);
+ }
+ } else {
+ logger.trace("Retrying immediately.");
+ }
+ break;
+ }
+ case TaskRunResultType.Backoff: {
+ logger.trace(`Shepherd for ${taskId} got backoff result.`);
+ if (retryRecord) {
+ let delay: Duration;
+ const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
+ delay = AbsoluteTime.remaining(t);
+ logger.trace(`Waiting for ${delay.d_ms} ms`);
+ try {
+ await info.cts.token.racePromise(
+ this.ws.timerGroup.resolveAfter(delay),
+ );
+ } catch (e) {
+ logger.info(`waiting for ${taskId} interrupted`);
+ }
+ } else {
+ logger.trace("Retrying immediately.");
+ }
+ break;
+ }
+ case TaskRunResultType.Progress: {
+ logger.trace(
+ `Shepherd for ${taskId} got progress result, re-running immediately.`,
+ );
+ break;
+ }
+ case TaskRunResultType.ScheduleLater:
+ logger.trace(`Shepherd for ${taskId} got schedule-later result.`);
+ const delay = AbsoluteTime.remaining(res.runAt);
+ logger.trace(`Waiting for ${delay.d_ms} ms`);
+ try {
+ await info.cts.token.racePromise(
+ this.ws.timerGroup.resolveAfter(delay),
+ );
+ } catch (e) {
+ logger.info(`waiting for ${taskId} interrupted`);
+ }
+ break;
+ case TaskRunResultType.Finished:
+ logger.trace(`Shepherd for ${taskId} got finished result.`);
+ return;
+ default:
+ assertUnreachable(res);
+ }
+ }
+ }
+}
+
+async function storePendingTaskError(
+ ws: InternalWalletState,
+ pendingTaskId: string,
+ e: TalerErrorDetail,
+): Promise<void> {
+ logger.info(`storing pending task error for ${pendingTaskId}`);
+ const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => {
+ let retryRecord = await tx.operationRetries.get(pendingTaskId);
+ if (!retryRecord) {
+ retryRecord = {
+ id: pendingTaskId,
+ lastError: e,
+ retryInfo: DbRetryInfo.reset(),
+ };
+ } else {
+ retryRecord.lastError = e;
+ retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
+ }
+ await tx.operationRetries.put(retryRecord);
+ return taskToRetryNotification(ws, tx, pendingTaskId, e);
+ });
+ if (maybeNotification) {
+ ws.notify(maybeNotification);
+ }
+}
+
+/**
+ * Task made progress, clear error.
+ */
+async function storeTaskProgress(
+ ws: InternalWalletState,
+ pendingTaskId: string,
+): Promise<void> {
+ await ws.db.mktxAll().runReadWrite(async (tx) => {
+ await tx.operationRetries.delete(pendingTaskId);
+ });
+}
+
+async function storePendingTaskPending(
+ ws: InternalWalletState,
+ pendingTaskId: string,
+): Promise<void> {
+ const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => {
+ let retryRecord = await tx.operationRetries.get(pendingTaskId);
+ let hadError = false;
+ if (!retryRecord) {
+ retryRecord = {
+ id: pendingTaskId,
+ retryInfo: DbRetryInfo.reset(),
+ };
+ } else {
+ if (retryRecord.lastError) {
+ hadError = true;
+ }
+ delete retryRecord.lastError;
+ retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
+ }
+ await tx.operationRetries.put(retryRecord);
+ if (hadError) {
+ return taskToRetryNotification(ws, tx, pendingTaskId, undefined);
+ } else {
+ return undefined;
+ }
+ });
+ if (maybeNotification) {
+ ws.notify(maybeNotification);
+ }
+}
+
+async function storePendingTaskFinished(
+ ws: InternalWalletState,
+ pendingTaskId: string,
+): Promise<void> {
+ await ws.db
+ .mktx((x) => [x.operationRetries])
+ .runReadWrite(async (tx) => {
+ await tx.operationRetries.delete(pendingTaskId);
+ });
+}
+
+async function runTaskWithErrorReporting(
+ ws: InternalWalletState,
+ opId: TaskId,
+ f: () => Promise<TaskRunResult>,
+): Promise<TaskRunResult> {
+ let maybeError: TalerErrorDetail | undefined;
+ try {
+ const resp = await f();
+ switch (resp.type) {
+ case TaskRunResultType.Error:
+ await storePendingTaskError(ws, opId, resp.errorDetail);
+ return resp;
+ case TaskRunResultType.Finished:
+ await storePendingTaskFinished(ws, opId);
+ return resp;
+ case TaskRunResultType.Backoff:
+ await storePendingTaskPending(ws, opId);
+ return resp;
+ case TaskRunResultType.ScheduleLater:
+ // Task succeeded but wants to be run again.
+ await storeTaskProgress(ws, opId);
+ return resp;
+ case TaskRunResultType.Progress:
+ await storeTaskProgress(ws, opId);
+ return resp;
+ }
+ } catch (e) {
+ if (e instanceof CryptoApiStoppedError) {
+ if (ws.stopped) {
+ logger.warn("crypto API stopped during shutdown, ignoring error");
+ return {
+ type: TaskRunResultType.Error,
+ errorDetail: makeErrorDetail(
+ TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
+ {},
+ "Crypto API stopped during shutdown",
+ ),
+ };
+ }
+ }
+ if (e instanceof TalerError) {
+ logger.warn("operation processed resulted in error");
+ logger.warn(`error was: ${j2s(e.errorDetail)}`);
+ maybeError = e.errorDetail;
+ await storePendingTaskError(ws, opId, maybeError!);
+ return {
+ type: TaskRunResultType.Error,
+ errorDetail: e.errorDetail,
+ };
+ } else if (e instanceof Error) {
+ // This is a bug, as we expect pending operations to always
+ // do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED
+ // or return something.
+ logger.error(`Uncaught exception: ${e.message}`);
+ logger.error(`Stack: ${e.stack}`);
+ maybeError = makeErrorDetail(
+ TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
+ {
+ stack: e.stack,
+ },
+ `unexpected exception (message: ${e.message})`,
+ );
+ await storePendingTaskError(ws, opId, maybeError);
+ return {
+ type: TaskRunResultType.Error,
+ errorDetail: maybeError,
+ };
+ } else {
+ logger.error("Uncaught exception, value is not even an error.");
+ maybeError = makeErrorDetail(
+ TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
+ {},
+ `unexpected exception (not even an error)`,
+ );
+ await storePendingTaskError(ws, opId, maybeError);
+ return {
+ type: TaskRunResultType.Error,
+ errorDetail: maybeError,
+ };
+ }
+ }
+}
+
+async function callOperationHandlerForTaskId(
+ ws: InternalWalletState,
+ taskId: TaskId,
+ cancellationToken: CancellationToken,
+): Promise<TaskRunResult> {
+ const pending = parseTaskIdentifier(taskId);
+ switch (pending.tag) {
+ case PendingTaskType.ExchangeUpdate:
+ return await updateExchangeFromUrlHandler(
+ ws,
+ pending.exchangeBaseUrl,
+ cancellationToken,
+ );
+ case PendingTaskType.Refresh:
+ return await processRefreshGroup(
+ ws,
+ pending.refreshGroupId,
+ cancellationToken,
+ );
+ case PendingTaskType.Withdraw:
+ return await processWithdrawalGroup(
+ ws,
+ pending.withdrawalGroupId,
+ cancellationToken,
+ );
+ case PendingTaskType.Purchase:
+ return await processPurchase(ws, pending.proposalId);
+ case PendingTaskType.Recoup:
+ return await processRecoupGroup(ws, pending.recoupGroupId);
+ case PendingTaskType.Deposit:
+ return await processDepositGroup(
+ ws,
+ pending.depositGroupId,
+ cancellationToken,
+ );
+ case PendingTaskType.Backup:
+ return await processBackupForProvider(ws, pending.backupProviderBaseUrl);
+ case PendingTaskType.PeerPushDebit:
+ return await processPeerPushDebit(
+ ws,
+ pending.pursePub,
+ cancellationToken,
+ );
+ case PendingTaskType.PeerPullCredit:
+ return await processPeerPullCredit(
+ ws,
+ pending.pursePub,
+ cancellationToken,
+ );
+ case PendingTaskType.PeerPullDebit:
+ return await processPeerPullDebit(ws, pending.peerPullDebitId);
+ case PendingTaskType.PeerPushCredit:
+ return await processPeerPushCredit(
+ ws,
+ pending.peerPushCreditId,
+ cancellationToken,
+ );
+ case PendingTaskType.RewardPickup:
+ throw Error("not supported anymore");
+ default:
+ return assertUnreachable(pending);
+ }
+ throw Error(`not reached ${pending.tag}`);
+}
+
+/**
+ * Generate an appropriate error transition notification
+ * for applicable tasks.
+ *
+ * Namely, transition notifications are generated for:
+ * - exchange update errors
+ * - transactions
+ */
+async function taskToRetryNotification(
+ ws: InternalWalletState,
+ tx: GetReadOnlyAccess<typeof WalletStoresV1>,
+ pendingTaskId: string,
+ e: TalerErrorDetail | undefined,
+): Promise<WalletNotification | undefined> {
+ const parsedTaskId = parseTaskIdentifier(pendingTaskId);
+
+ switch (parsedTaskId.tag) {
+ case PendingTaskType.ExchangeUpdate:
+ return makeExchangeRetryNotification(ws, tx, pendingTaskId, e);
+ case PendingTaskType.PeerPullCredit:
+ case PendingTaskType.PeerPullDebit:
+ case PendingTaskType.Withdraw:
+ case PendingTaskType.PeerPushCredit:
+ case PendingTaskType.Deposit:
+ case PendingTaskType.Refresh:
+ case PendingTaskType.RewardPickup:
+ case PendingTaskType.PeerPushDebit:
+ case PendingTaskType.Purchase:
+ return makeTransactionRetryNotification(ws, tx, pendingTaskId, e);
+ case PendingTaskType.Backup:
+ case PendingTaskType.Recoup:
+ return undefined;
+ }
+}
+
+async function makeTransactionRetryNotification(
+ ws: InternalWalletState,
+ tx: GetReadOnlyAccess<typeof WalletStoresV1>,
+ pendingTaskId: string,
+ e: TalerErrorDetail | undefined,
+): Promise<WalletNotification | undefined> {
+ const txId = convertTaskToTransactionId(pendingTaskId);
+ if (!txId) {
+ return undefined;
+ }
+ const txState = await ws.getTransactionState(ws, tx, txId);
+ if (!txState) {
+ return undefined;
+ }
+ const notif: WalletNotification = {
+ type: NotificationType.TransactionStateTransition,
+ transactionId: txId,
+ oldTxState: txState,
+ newTxState: txState,
+ };
+ if (e) {
+ notif.errorInfo = {
+ code: e.code as number,
+ hint: e.hint,
+ };
+ }
+ return notif;
+}
+
+async function makeExchangeRetryNotification(
+ ws: InternalWalletState,
+ tx: GetReadOnlyAccess<typeof WalletStoresV1>,
+ pendingTaskId: string,
+ e: TalerErrorDetail | undefined,
+): Promise<WalletNotification | undefined> {
+ logger.info("making exchange retry notification");
+ const parsedTaskId = parseTaskIdentifier(pendingTaskId);
+ if (parsedTaskId.tag !== PendingTaskType.ExchangeUpdate) {
+ throw Error("invalid task identifier");
+ }
+ const rec = await tx.exchanges.get(parsedTaskId.exchangeBaseUrl);
+
+ if (!rec) {
+ logger.info(`exchange ${parsedTaskId.exchangeBaseUrl} not found`);
+ return undefined;
+ }
+
+ const notif: WalletNotification = {
+ type: NotificationType.ExchangeStateTransition,
+ exchangeBaseUrl: parsedTaskId.exchangeBaseUrl,
+ oldExchangeState: getExchangeState(rec),
+ newExchangeState: getExchangeState(rec),
+ };
+ if (e) {
+ notif.errorInfo = {
+ code: e.code as number,
+ hint: e.hint,
+ };
+ }
+ return notif;
+}
+
+/**
+ * Convert the task ID for a task that processes a transaction int
+ * the ID for the transaction.
+ */
+function convertTaskToTransactionId(
+ taskId: string,
+): TransactionIdStr | undefined {
+ const parsedTaskId = parseTaskIdentifier(taskId);
+ switch (parsedTaskId.tag) {
+ case PendingTaskType.PeerPullCredit:
+ return constructTransactionIdentifier({
+ tag: TransactionType.PeerPullCredit,
+ pursePub: parsedTaskId.pursePub,
+ });
+ case PendingTaskType.PeerPullDebit:
+ return constructTransactionIdentifier({
+ tag: TransactionType.PeerPullDebit,
+ peerPullDebitId: parsedTaskId.peerPullDebitId,
+ });
+ // FIXME: This doesn't distinguish internal-withdrawal.
+ // Maybe we should have a different task type for that as well?
+ // Or maybe transaction IDs should be valid task identifiers?
+ case PendingTaskType.Withdraw:
+ return constructTransactionIdentifier({
+ tag: TransactionType.Withdrawal,
+ withdrawalGroupId: parsedTaskId.withdrawalGroupId,
+ });
+ case PendingTaskType.PeerPushCredit:
+ return constructTransactionIdentifier({
+ tag: TransactionType.PeerPushCredit,
+ peerPushCreditId: parsedTaskId.peerPushCreditId,
+ });
+ case PendingTaskType.Deposit:
+ return constructTransactionIdentifier({
+ tag: TransactionType.Deposit,
+ depositGroupId: parsedTaskId.depositGroupId,
+ });
+ case PendingTaskType.Refresh:
+ return constructTransactionIdentifier({
+ tag: TransactionType.Refresh,
+ refreshGroupId: parsedTaskId.refreshGroupId,
+ });
+ case PendingTaskType.RewardPickup:
+ return constructTransactionIdentifier({
+ tag: TransactionType.Reward,
+ walletRewardId: parsedTaskId.walletRewardId,
+ });
+ case PendingTaskType.PeerPushDebit:
+ return constructTransactionIdentifier({
+ tag: TransactionType.PeerPushDebit,
+ pursePub: parsedTaskId.pursePub,
+ });
+ case PendingTaskType.Purchase:
+ return constructTransactionIdentifier({
+ tag: TransactionType.Payment,
+ proposalId: parsedTaskId.proposalId,
+ });
+ default:
+ return undefined;
+ }
+}
+
+export interface ActiveTaskIdsResult {
+ taskIds: TaskId[];
+}
+
+export async function getActiveTaskIds(
+ ws: InternalWalletState,
+): Promise<ActiveTaskIdsResult> {
+ const res: ActiveTaskIdsResult = {
+ taskIds: [],
+ };
+ await ws.db
+ .mktx((x) => [
+ x.exchanges,
+ x.refreshGroups,
+ x.withdrawalGroups,
+ x.purchases,
+ x.depositGroups,
+ x.recoupGroups,
+ x.peerPullCredit,
+ x.peerPushDebit,
+ x.peerPullDebit,
+ x.peerPushCredit,
+ ])
+ .runReadWrite(async (tx) => {
+ const active = GlobalIDB.KeyRange.bound(
+ OPERATION_STATUS_ACTIVE_FIRST,
+ OPERATION_STATUS_ACTIVE_LAST,
+ );
+
+ // Withdrawals
+
+ {
+ const activeRecs =
+ await tx.withdrawalGroups.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Withdraw,
+ withdrawalGroupId: rec.withdrawalGroupId,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // Deposits
+
+ {
+ const activeRecs =
+ await tx.depositGroups.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Deposit,
+ depositGroupId: rec.depositGroupId,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // Refreshes
+
+ {
+ const activeRecs =
+ await tx.refreshGroups.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Refresh,
+ refreshGroupId: rec.refreshGroupId,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // Purchases
+
+ {
+ const activeRecs = await tx.purchases.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Purchase,
+ proposalId: rec.proposalId,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // peer-push-debit
+
+ {
+ const activeRecs =
+ await tx.peerPushDebit.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.PeerPushDebit,
+ pursePub: rec.pursePub,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // peer-push-credit
+
+ {
+ const activeRecs =
+ await tx.peerPushCredit.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.PeerPushCredit,
+ peerPushCreditId: rec.peerPushCreditId,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // peer-pull-debit
+
+ {
+ const activeRecs =
+ await tx.peerPullDebit.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.PeerPullDebit,
+ peerPullDebitId: rec.peerPullDebitId,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // peer-pull-credit
+
+ {
+ const activeRecs =
+ await tx.peerPullCredit.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.PeerPullCredit,
+ pursePub: rec.pursePub,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // recoup
+
+ {
+ const activeRecs =
+ await tx.recoupGroups.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Recoup,
+ recoupGroupId: rec.recoupGroupId,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // exchange update
+
+ {
+ const exchanges = await tx.exchanges.getAll();
+ for (const rec of exchanges) {
+ const taskIdUpdate = constructTaskIdentifier({
+ tag: PendingTaskType.ExchangeUpdate,
+ exchangeBaseUrl: rec.baseUrl,
+ });
+ res.taskIds.push(taskIdUpdate);
+ }
+ }
+
+ // FIXME: Recoup!
+ });
+
+ return res;
+}