summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/shepherd.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/shepherd.ts')
-rw-r--r--packages/taler-wallet-core/src/shepherd.ts1128
1 files changed, 1128 insertions, 0 deletions
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts
new file mode 100644
index 000000000..3b160d97f
--- /dev/null
+++ b/packages/taler-wallet-core/src/shepherd.ts
@@ -0,0 +1,1128 @@
+/*
+ This file is part of GNU Taler
+ (C) 2024 Taler Systems SA
+
+ GNU Taler is free software; you can redistribute it and/or modify it under the
+ terms of the GNU General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along with
+ GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+ */
+
+/**
+ * Imports.
+ */
+import { GlobalIDB } from "@gnu-taler/idb-bridge";
+import {
+ AbsoluteTime,
+ AsyncCondition,
+ CancellationToken,
+ Duration,
+ Logger,
+ NotificationType,
+ ObservabilityContext,
+ ObservabilityEventType,
+ TalerErrorDetail,
+ TaskThrottler,
+ TransactionIdStr,
+ TransactionState,
+ TransactionType,
+ WalletNotification,
+ assertUnreachable,
+ getErrorDetailFromException,
+ j2s,
+ safeStringifyException,
+} from "@gnu-taler/taler-util";
+import { processBackupForProvider } from "./backup/index.js";
+import {
+ DbRetryInfo,
+ PendingTaskType,
+ TaskIdStr,
+ TaskRunResult,
+ TaskRunResultType,
+ constructTaskIdentifier,
+ getExchangeState,
+ parseTaskIdentifier,
+} from "./common.js";
+import {
+ OPERATION_STATUS_ACTIVE_FIRST,
+ OPERATION_STATUS_ACTIVE_LAST,
+ OperationRetryRecord,
+ WalletDbAllStoresReadOnlyTransaction,
+ WalletDbReadOnlyTransaction,
+ timestampAbsoluteFromDb,
+} from "./db.js";
+import {
+ computeDepositTransactionStatus,
+ processDepositGroup,
+} from "./deposits.js";
+import {
+ computeDenomLossTransactionStatus,
+ updateExchangeFromUrlHandler,
+} from "./exchanges.js";
+import {
+ computePayMerchantTransactionState,
+ computeRefundTransactionState,
+ processPurchase,
+} from "./pay-merchant.js";
+import {
+ computePeerPullCreditTransactionState,
+ processPeerPullCredit,
+} from "./pay-peer-pull-credit.js";
+import {
+ computePeerPullDebitTransactionState,
+ processPeerPullDebit,
+} from "./pay-peer-pull-debit.js";
+import {
+ computePeerPushCreditTransactionState,
+ processPeerPushCredit,
+} from "./pay-peer-push-credit.js";
+import {
+ computePeerPushDebitTransactionState,
+ processPeerPushDebit,
+} from "./pay-peer-push-debit.js";
+import { processRecoupGroup } from "./recoup.js";
+import {
+ computeRefreshTransactionState,
+ processRefreshGroup,
+} from "./refresh.js";
+import {
+ constructTransactionIdentifier,
+ parseTransactionIdentifier,
+} from "./transactions.js";
+import {
+ InternalWalletState,
+ WalletExecutionContext,
+ getNormalWalletExecutionContext,
+ getObservedWalletExecutionContext,
+} from "./wallet.js";
+import {
+ computeWithdrawalTransactionStatus,
+ processWithdrawalGroup,
+} from "./withdraw.js";
+
+const logger = new Logger("shepherd.ts");
+
+/**
+ * Info about one task being shepherded.
+ */
+interface ShepherdInfo {
+ cts: CancellationToken.Source;
+}
+
+/**
+ * Check if a task is alive, i.e. whether it prevents
+ * the main task loop from exiting.
+ */
+function taskGivesLiveness(taskId: string): boolean {
+ const parsedTaskId = parseTaskIdentifier(taskId);
+ switch (parsedTaskId.tag) {
+ case PendingTaskType.Backup:
+ case PendingTaskType.ExchangeUpdate:
+ return false;
+ case PendingTaskType.Deposit:
+ case PendingTaskType.PeerPullCredit:
+ case PendingTaskType.PeerPullDebit:
+ case PendingTaskType.PeerPushCredit:
+ case PendingTaskType.Refresh:
+ case PendingTaskType.Recoup:
+ case PendingTaskType.RewardPickup:
+ case PendingTaskType.Withdraw:
+ case PendingTaskType.PeerPushDebit:
+ case PendingTaskType.Purchase:
+ return true;
+ default:
+ assertUnreachable(parsedTaskId);
+ }
+}
+
+export interface TaskScheduler {
+ ensureRunning(): Promise<void>;
+ startShepherdTask(taskId: TaskIdStr): void;
+ stopShepherdTask(taskId: TaskIdStr): void;
+ resetTaskRetries(taskId: TaskIdStr): Promise<void>;
+ reload(): Promise<void>;
+ getActiveTasks(): TaskIdStr[];
+ isIdle(): boolean;
+ shutdown(): Promise<void>;
+}
+
+export class TaskSchedulerImpl implements TaskScheduler {
+ private sheps: Map<TaskIdStr, ShepherdInfo> = new Map();
+
+ private iterCond = new AsyncCondition();
+
+ private throttler = new TaskThrottler();
+
+ isRunning: boolean = false;
+
+ constructor(private ws: InternalWalletState) {}
+
+ private async loadTasksFromDb(): Promise<void> {
+ const activeTasks = await getActiveTaskIds(this.ws);
+
+ logger.info(`active tasks from DB: ${j2s(activeTasks)}`);
+
+ for (const tid of activeTasks.taskIds) {
+ this.startShepherdTask(tid);
+ }
+ }
+
+ getActiveTasks(): TaskIdStr[] {
+ return [...this.sheps.keys()];
+ }
+
+ async shutdown(): Promise<void> {
+ const tasksIds = [...this.sheps.keys()];
+ logger.info(`Stopping task shepherd.`);
+ for (const taskId of tasksIds) {
+ this.stopShepherdTask(taskId);
+ }
+ }
+
+ async ensureRunning(): Promise<void> {
+ if (this.isRunning) {
+ return;
+ }
+ this.isRunning = true;
+ try {
+ await this.loadTasksFromDb();
+ } catch (e) {
+ this.isRunning = false;
+ throw e;
+ }
+ this.run()
+ .catch((e) => {
+ logger.error("error running task loop");
+ logger.error(`err: ${e}`);
+ })
+ .then(() => {
+ logger.trace("done running task loop");
+ this.isRunning = false;
+ });
+ }
+
+ isIdle(): boolean {
+ let alive = false;
+ const taskIds = [...this.sheps.keys()];
+ for (const taskId of taskIds) {
+ if (taskGivesLiveness(taskId)) {
+ alive = true;
+ break;
+ }
+ }
+ // We're idle if no task is alive anymore.
+ return !alive;
+ }
+
+ private async run(): Promise<void> {
+ logger.trace("Running task loop.");
+ logger.trace(`sheps: ${this.sheps.size}`);
+ while (true) {
+ if (this.ws.stopped) {
+ logger.trace("Breaking out of task loop (wallet stopped).");
+ break;
+ }
+
+ if (this.isIdle()) {
+ this.ws.notify({
+ type: NotificationType.Idle,
+ });
+ }
+
+ await this.iterCond.wait();
+ }
+ logger.trace("Done with task loop.");
+ }
+
+ startShepherdTask(taskId: TaskIdStr): void {
+ this.ensureRunning().catch((e) => {
+ logger.error(`error running scheduler: ${safeStringifyException(e)}`);
+ });
+ // Run in the background, no await!
+ this.internalStartShepherdTask(taskId);
+ }
+
+ /**
+ * Stop and re-load all existing tasks.
+ *
+ * Mostly useful to interrupt all waits when time-travelling.
+ */
+ async reload(): Promise<void> {
+ await this.ensureRunning();
+ const tasksIds = [...this.sheps.keys()];
+ logger.info(`reloading sheperd with ${tasksIds.length} tasks`);
+ for (const taskId of tasksIds) {
+ this.stopShepherdTask(taskId);
+ }
+ for (const taskId of tasksIds) {
+ this.startShepherdTask(taskId);
+ }
+ }
+
+ private async internalStartShepherdTask(taskId: TaskIdStr): Promise<void> {
+ logger.trace(`Starting to shepherd task ${taskId}`);
+ const oldShep = this.sheps.get(taskId);
+ if (oldShep) {
+ logger.trace(`Already have a shepherd for ${taskId}`);
+ return;
+ }
+ logger.trace(`Creating new shepherd for ${taskId}`);
+ const newShep: ShepherdInfo = {
+ cts: CancellationToken.create(),
+ };
+ this.sheps.set(taskId, newShep);
+ try {
+ await this.internalShepherdTask(taskId, newShep);
+ } finally {
+ logger.trace(`Done shepherding ${taskId}`);
+ this.sheps.delete(taskId);
+ this.iterCond.trigger();
+ }
+ }
+
+ stopShepherdTask(taskId: TaskIdStr): void {
+ logger.trace(`Stopping shepherding of ${taskId}`);
+ const oldShep = this.sheps.get(taskId);
+ if (oldShep) {
+ logger.trace(`Cancelling old shepherd for ${taskId}`);
+ oldShep.cts.cancel();
+ this.sheps.delete(taskId);
+ this.iterCond.trigger();
+ }
+ }
+
+ restartShepherdTask(taskId: TaskIdStr): void {
+ this.stopShepherdTask(taskId);
+ this.startShepherdTask(taskId);
+ }
+
+ async resetTaskRetries(taskId: TaskIdStr): Promise<void> {
+ const maybeNotification = await this.ws.db.runAllStoresReadWriteTx(
+ {},
+ async (tx) => {
+ await tx.operationRetries.delete(taskId);
+ return taskToRetryNotification(this.ws, tx, taskId, undefined);
+ },
+ );
+ this.stopShepherdTask(taskId);
+ if (maybeNotification) {
+ this.ws.notify(maybeNotification);
+ }
+ this.startShepherdTask(taskId);
+ }
+
+ private async wait(
+ taskId: TaskIdStr,
+ info: ShepherdInfo,
+ delay: Duration,
+ ): Promise<void> {
+ try {
+ await info.cts.token.racePromise(this.ws.timerGroup.resolveAfter(delay));
+ } catch (e) {
+ logger.info(`waiting for ${taskId} interrupted`);
+ }
+ }
+
+ private async internalShepherdTask(
+ taskId: TaskIdStr,
+ info: ShepherdInfo,
+ ): Promise<void> {
+ while (true) {
+ if (this.ws.stopped) {
+ logger.trace(`Shepherd for ${taskId} stopping as wallet is stopped`);
+ return;
+ }
+ if (info.cts.token.isCancelled) {
+ logger.trace(`Shepherd for ${taskId} got cancelled`);
+ return;
+ }
+ const isThrottled = this.throttler.applyThrottle(taskId);
+ if (isThrottled) {
+ logger.warn(
+ `task ${taskId} throttled, this is very likely a bug in wallet-core, please report`,
+ );
+ logger.warn("waiting for 60 seconds");
+ await this.ws.timerGroup.resolveAfter(
+ Duration.fromSpec({ seconds: 60 }),
+ );
+ }
+ const wex = getWalletExecutionContextForTask(
+ this.ws,
+ taskId,
+ info.cts.token,
+ );
+ const startTime = AbsoluteTime.now();
+ logger.trace(`Shepherd for ${taskId} will call handler`);
+ let res: TaskRunResult;
+ try {
+ res = await callOperationHandlerForTaskId(wex, taskId);
+ } catch (e) {
+ res = {
+ type: TaskRunResultType.Error,
+ errorDetail: getErrorDetailFromException(e),
+ };
+ }
+ if (info.cts.token.isCancelled) {
+ logger.trace("task cancelled, not processing result");
+ return;
+ }
+ if (this.ws.stopped) {
+ logger.trace("wallet stopped, not processing result");
+ return;
+ }
+ wex.oc.observe({
+ type: ObservabilityEventType.ShepherdTaskResult,
+ resultType: res.type,
+ });
+ switch (res.type) {
+ case TaskRunResultType.Error: {
+ logger.trace(`Shepherd for ${taskId} got error result.`);
+ const retryRecord = await storePendingTaskError(
+ this.ws,
+ taskId,
+ res.errorDetail,
+ );
+ const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
+ const delay = AbsoluteTime.remaining(t);
+ logger.trace(`Waiting for ${delay.d_ms} ms`);
+ await this.wait(taskId, info, delay);
+ break;
+ }
+ case TaskRunResultType.Backoff: {
+ logger.trace(`Shepherd for ${taskId} got backoff result.`);
+ const retryRecord = await storePendingTaskPending(this.ws, taskId);
+ const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
+ const delay = AbsoluteTime.remaining(t);
+ logger.trace(`Waiting for ${delay.d_ms} ms`);
+ await this.wait(taskId, info, delay);
+ break;
+ }
+ case TaskRunResultType.Progress: {
+ logger.trace(
+ `Shepherd for ${taskId} got progress result, re-running immediately.`,
+ );
+ await storeTaskProgress(this.ws, taskId);
+ break;
+ }
+ case TaskRunResultType.ScheduleLater: {
+ logger.trace(`Shepherd for ${taskId} got schedule-later result.`);
+ await storeTaskProgress(this.ws, taskId);
+ const delay = AbsoluteTime.remaining(res.runAt);
+ logger.trace(`Waiting for ${delay.d_ms} ms`);
+ await this.wait(taskId, info, delay);
+ break;
+ }
+ case TaskRunResultType.Finished:
+ logger.trace(`Shepherd for ${taskId} got finished result.`);
+ await storePendingTaskFinished(this.ws, taskId);
+ return;
+ case TaskRunResultType.LongpollReturnedPending: {
+ await storeTaskProgress(this.ws, taskId);
+ // Make sure that we are waiting a bit if long-polling returned too early.
+ const endTime = AbsoluteTime.now();
+ const taskDuration = AbsoluteTime.difference(endTime, startTime);
+ if (
+ Duration.cmp(taskDuration, Duration.fromSpec({ seconds: 20 })) < 0
+ ) {
+ logger.info(
+ `long-poller for ${taskId} returned unexpectedly early (${taskDuration.d_ms} ms), waiting 10 seconds`,
+ );
+ await this.wait(taskId, info, Duration.fromSpec({ seconds: 10 }));
+ } else {
+ logger.info(`task ${taskId} will long-poll again`);
+ }
+ break;
+ }
+ default:
+ assertUnreachable(res);
+ }
+ }
+ }
+}
+
+async function storePendingTaskError(
+ ws: InternalWalletState,
+ pendingTaskId: string,
+ e: TalerErrorDetail,
+): Promise<OperationRetryRecord> {
+ logger.info(`storing pending task error for ${pendingTaskId}`);
+ const res = await ws.db.runAllStoresReadWriteTx({}, async (tx) => {
+ let retryRecord = await tx.operationRetries.get(pendingTaskId);
+ if (!retryRecord) {
+ retryRecord = {
+ id: pendingTaskId,
+ lastError: e,
+ retryInfo: DbRetryInfo.reset(),
+ };
+ } else {
+ retryRecord.lastError = e;
+ retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
+ }
+ await tx.operationRetries.put(retryRecord);
+ return {
+ notification: await taskToRetryNotification(ws, tx, pendingTaskId, e),
+ retryRecord,
+ };
+ });
+ if (res?.notification) {
+ ws.notify(res.notification);
+ }
+ return res.retryRecord;
+}
+
+/**
+ * Task made progress, clear error.
+ */
+async function storeTaskProgress(
+ ws: InternalWalletState,
+ pendingTaskId: string,
+): Promise<void> {
+ await ws.db.runReadWriteTx(
+ { storeNames: ["operationRetries"] },
+ async (tx) => {
+ await tx.operationRetries.delete(pendingTaskId);
+ },
+ );
+}
+
+async function storePendingTaskPending(
+ ws: InternalWalletState,
+ pendingTaskId: string,
+): Promise<OperationRetryRecord> {
+ const res = await ws.db.runAllStoresReadWriteTx({}, async (tx) => {
+ let retryRecord = await tx.operationRetries.get(pendingTaskId);
+ let hadError = false;
+ if (!retryRecord) {
+ retryRecord = {
+ id: pendingTaskId,
+ retryInfo: DbRetryInfo.reset(),
+ };
+ } else {
+ if (retryRecord.lastError) {
+ hadError = true;
+ }
+ delete retryRecord.lastError;
+ retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
+ }
+ await tx.operationRetries.put(retryRecord);
+ let notification: WalletNotification | undefined = undefined;
+ if (hadError) {
+ notification = await taskToRetryNotification(
+ ws,
+ tx,
+ pendingTaskId,
+ undefined,
+ );
+ }
+ return {
+ notification,
+ retryRecord,
+ };
+ });
+ if (res.notification) {
+ ws.notify(res.notification);
+ }
+ return res.retryRecord;
+}
+
+async function storePendingTaskFinished(
+ ws: InternalWalletState,
+ pendingTaskId: string,
+): Promise<void> {
+ await ws.db.runReadWriteTx(
+ { storeNames: ["operationRetries"] },
+ async (tx) => {
+ await tx.operationRetries.delete(pendingTaskId);
+ },
+ );
+}
+
+function getWalletExecutionContextForTask(
+ ws: InternalWalletState,
+ taskId: TaskIdStr,
+ cancellationToken: CancellationToken,
+): WalletExecutionContext {
+ let oc: ObservabilityContext;
+ let wex: WalletExecutionContext;
+
+ if (ws.config.testing.emitObservabilityEvents) {
+ oc = {
+ observe(evt) {
+ if (ws.config.testing.emitObservabilityEvents) {
+ ws.notify({
+ type: NotificationType.TaskObservabilityEvent,
+ taskId,
+ event: evt,
+ });
+ }
+ },
+ };
+
+ wex = getObservedWalletExecutionContext(ws, cancellationToken, oc);
+ } else {
+ oc = {
+ observe(evt) {},
+ };
+ wex = getNormalWalletExecutionContext(ws, cancellationToken, oc);
+ }
+ return wex;
+}
+
+async function callOperationHandlerForTaskId(
+ wex: WalletExecutionContext,
+ taskId: TaskIdStr,
+): Promise<TaskRunResult> {
+ const pending = parseTaskIdentifier(taskId);
+ switch (pending.tag) {
+ case PendingTaskType.ExchangeUpdate:
+ return await updateExchangeFromUrlHandler(wex, pending.exchangeBaseUrl);
+ case PendingTaskType.Refresh:
+ return await processRefreshGroup(wex, pending.refreshGroupId);
+ case PendingTaskType.Withdraw:
+ return await processWithdrawalGroup(wex, pending.withdrawalGroupId);
+ case PendingTaskType.Purchase:
+ return await processPurchase(wex, pending.proposalId);
+ case PendingTaskType.Recoup:
+ return await processRecoupGroup(wex, pending.recoupGroupId);
+ case PendingTaskType.Deposit:
+ return await processDepositGroup(wex, pending.depositGroupId);
+ case PendingTaskType.Backup:
+ return await processBackupForProvider(wex, pending.backupProviderBaseUrl);
+ case PendingTaskType.PeerPushDebit:
+ return await processPeerPushDebit(wex, pending.pursePub);
+ case PendingTaskType.PeerPullCredit:
+ return await processPeerPullCredit(wex, pending.pursePub);
+ case PendingTaskType.PeerPullDebit:
+ return await processPeerPullDebit(wex, pending.peerPullDebitId);
+ case PendingTaskType.PeerPushCredit:
+ return await processPeerPushCredit(wex, pending.peerPushCreditId);
+ case PendingTaskType.RewardPickup:
+ throw Error("not supported anymore");
+ default:
+ return assertUnreachable(pending);
+ }
+ throw Error(`not reached ${pending.tag}`);
+}
+
+/**
+ * Generate an appropriate error transition notification
+ * for applicable tasks.
+ *
+ * Namely, transition notifications are generated for:
+ * - exchange update errors
+ * - transactions
+ */
+async function taskToRetryNotification(
+ ws: InternalWalletState,
+ tx: WalletDbAllStoresReadOnlyTransaction,
+ pendingTaskId: string,
+ e: TalerErrorDetail | undefined,
+): Promise<WalletNotification | undefined> {
+ const parsedTaskId = parseTaskIdentifier(pendingTaskId);
+
+ switch (parsedTaskId.tag) {
+ case PendingTaskType.ExchangeUpdate:
+ return makeExchangeRetryNotification(ws, tx, pendingTaskId, e);
+ case PendingTaskType.PeerPullCredit:
+ case PendingTaskType.PeerPullDebit:
+ case PendingTaskType.Withdraw:
+ case PendingTaskType.PeerPushCredit:
+ case PendingTaskType.Deposit:
+ case PendingTaskType.Refresh:
+ case PendingTaskType.RewardPickup:
+ case PendingTaskType.PeerPushDebit:
+ case PendingTaskType.Purchase:
+ return makeTransactionRetryNotification(ws, tx, pendingTaskId, e);
+ case PendingTaskType.Backup:
+ case PendingTaskType.Recoup:
+ return undefined;
+ }
+}
+
+async function getTransactionState(
+ ws: InternalWalletState,
+ tx: WalletDbReadOnlyTransaction<
+ [
+ "depositGroups",
+ "withdrawalGroups",
+ "purchases",
+ "refundGroups",
+ "peerPullCredit",
+ "peerPullDebit",
+ "peerPushDebit",
+ "peerPushCredit",
+ "rewards",
+ "refreshGroups",
+ "denomLossEvents",
+ ]
+ >,
+ transactionId: string,
+): Promise<TransactionState | undefined> {
+ const parsedTxId = parseTransactionIdentifier(transactionId);
+ if (!parsedTxId) {
+ throw Error("invalid tx identifier");
+ }
+ switch (parsedTxId.tag) {
+ case TransactionType.Deposit: {
+ const rec = await tx.depositGroups.get(parsedTxId.depositGroupId);
+ if (!rec) {
+ return undefined;
+ }
+ return computeDepositTransactionStatus(rec);
+ }
+ case TransactionType.InternalWithdrawal:
+ case TransactionType.Withdrawal: {
+ const rec = await tx.withdrawalGroups.get(parsedTxId.withdrawalGroupId);
+ if (!rec) {
+ return undefined;
+ }
+ return computeWithdrawalTransactionStatus(rec);
+ }
+ case TransactionType.Payment: {
+ const rec = await tx.purchases.get(parsedTxId.proposalId);
+ if (!rec) {
+ return;
+ }
+ return computePayMerchantTransactionState(rec);
+ }
+ case TransactionType.Refund: {
+ const rec = await tx.refundGroups.get(parsedTxId.refundGroupId);
+ if (!rec) {
+ return undefined;
+ }
+ return computeRefundTransactionState(rec);
+ }
+ case TransactionType.PeerPullCredit: {
+ const rec = await tx.peerPullCredit.get(parsedTxId.pursePub);
+ if (!rec) {
+ return undefined;
+ }
+ return computePeerPullCreditTransactionState(rec);
+ }
+ case TransactionType.PeerPullDebit: {
+ const rec = await tx.peerPullDebit.get(parsedTxId.peerPullDebitId);
+ if (!rec) {
+ return undefined;
+ }
+ return computePeerPullDebitTransactionState(rec);
+ }
+ case TransactionType.PeerPushCredit: {
+ const rec = await tx.peerPushCredit.get(parsedTxId.peerPushCreditId);
+ if (!rec) {
+ return undefined;
+ }
+ return computePeerPushCreditTransactionState(rec);
+ }
+ case TransactionType.PeerPushDebit: {
+ const rec = await tx.peerPushDebit.get(parsedTxId.pursePub);
+ if (!rec) {
+ return undefined;
+ }
+ return computePeerPushDebitTransactionState(rec);
+ }
+ case TransactionType.Refresh: {
+ const rec = await tx.refreshGroups.get(parsedTxId.refreshGroupId);
+ if (!rec) {
+ return undefined;
+ }
+ return computeRefreshTransactionState(rec);
+ }
+ case TransactionType.Recoup:
+ throw Error("not yet supported");
+ case TransactionType.DenomLoss: {
+ const rec = await tx.denomLossEvents.get(parsedTxId.denomLossEventId);
+ if (!rec) {
+ return undefined;
+ }
+ return computeDenomLossTransactionStatus(rec);
+ }
+ default:
+ assertUnreachable(parsedTxId);
+ }
+}
+
+async function makeTransactionRetryNotification(
+ ws: InternalWalletState,
+ tx: WalletDbAllStoresReadOnlyTransaction,
+ pendingTaskId: string,
+ e: TalerErrorDetail | undefined,
+): Promise<WalletNotification | undefined> {
+ const txId = convertTaskToTransactionId(pendingTaskId);
+ if (!txId) {
+ return undefined;
+ }
+ const txState = await getTransactionState(ws, tx, txId);
+ if (!txState) {
+ return undefined;
+ }
+ const notif: WalletNotification = {
+ type: NotificationType.TransactionStateTransition,
+ transactionId: txId,
+ oldTxState: txState,
+ newTxState: txState,
+ };
+ if (e) {
+ notif.errorInfo = {
+ code: e.code as number,
+ hint: e.hint,
+ };
+ }
+ return notif;
+}
+
+async function makeExchangeRetryNotification(
+ ws: InternalWalletState,
+ tx: WalletDbAllStoresReadOnlyTransaction,
+ pendingTaskId: string,
+ e: TalerErrorDetail | undefined,
+): Promise<WalletNotification | undefined> {
+ logger.info("making exchange retry notification");
+ const parsedTaskId = parseTaskIdentifier(pendingTaskId);
+ if (parsedTaskId.tag !== PendingTaskType.ExchangeUpdate) {
+ throw Error("invalid task identifier");
+ }
+ const rec = await tx.exchanges.get(parsedTaskId.exchangeBaseUrl);
+
+ if (!rec) {
+ logger.info(`exchange ${parsedTaskId.exchangeBaseUrl} not found`);
+ return undefined;
+ }
+
+ const notif: WalletNotification = {
+ type: NotificationType.ExchangeStateTransition,
+ exchangeBaseUrl: parsedTaskId.exchangeBaseUrl,
+ oldExchangeState: getExchangeState(rec),
+ newExchangeState: getExchangeState(rec),
+ };
+ if (e) {
+ notif.errorInfo = {
+ code: e.code as number,
+ hint: e.hint,
+ };
+ }
+ return notif;
+}
+
+export function listTaskForTransactionId(transactionId: string): TaskIdStr[] {
+ const tid = parseTransactionIdentifier(transactionId);
+ if (!tid) {
+ throw Error("invalid task ID");
+ }
+ switch (tid.tag) {
+ case TransactionType.Deposit:
+ return [
+ constructTaskIdentifier({
+ tag: PendingTaskType.Deposit,
+ depositGroupId: tid.depositGroupId,
+ }),
+ ];
+ case TransactionType.InternalWithdrawal:
+ return [
+ constructTaskIdentifier({
+ tag: PendingTaskType.Withdraw,
+ withdrawalGroupId: tid.withdrawalGroupId,
+ }),
+ ];
+ case TransactionType.Payment:
+ return [
+ constructTaskIdentifier({
+ tag: PendingTaskType.Purchase,
+ proposalId: tid.proposalId,
+ }),
+ ];
+ case TransactionType.PeerPullCredit:
+ return [
+ constructTaskIdentifier({
+ tag: PendingTaskType.PeerPullCredit,
+ pursePub: tid.pursePub,
+ }),
+ ];
+ case TransactionType.PeerPullDebit:
+ return [
+ constructTaskIdentifier({
+ tag: PendingTaskType.PeerPullDebit,
+ peerPullDebitId: tid.peerPullDebitId,
+ }),
+ ];
+ case TransactionType.PeerPushCredit:
+ return [
+ constructTaskIdentifier({
+ tag: PendingTaskType.PeerPullCredit,
+ pursePub: tid.peerPushCreditId,
+ }),
+ ];
+ case TransactionType.PeerPushDebit:
+ return [
+ constructTaskIdentifier({
+ tag: PendingTaskType.PeerPullCredit,
+ pursePub: tid.pursePub,
+ }),
+ ];
+ case TransactionType.Recoup:
+ return [
+ constructTaskIdentifier({
+ tag: PendingTaskType.Recoup,
+ recoupGroupId: tid.recoupGroupId,
+ }),
+ ];
+ case TransactionType.Refresh:
+ return [
+ constructTaskIdentifier({
+ tag: PendingTaskType.Refresh,
+ refreshGroupId: tid.refreshGroupId,
+ }),
+ ];
+ case TransactionType.Refund:
+ return [];
+ case TransactionType.Withdrawal:
+ return [
+ constructTaskIdentifier({
+ tag: PendingTaskType.Withdraw,
+ withdrawalGroupId: tid.withdrawalGroupId,
+ }),
+ ];
+ case TransactionType.DenomLoss:
+ return [];
+ default:
+ assertUnreachable(tid);
+ }
+}
+
+/**
+ * Convert the task ID for a task that processes a transaction int
+ * the ID for the transaction.
+ */
+export function convertTaskToTransactionId(
+ taskId: string,
+): TransactionIdStr | undefined {
+ const parsedTaskId = parseTaskIdentifier(taskId);
+ switch (parsedTaskId.tag) {
+ case PendingTaskType.PeerPullCredit:
+ return constructTransactionIdentifier({
+ tag: TransactionType.PeerPullCredit,
+ pursePub: parsedTaskId.pursePub,
+ });
+ case PendingTaskType.PeerPullDebit:
+ return constructTransactionIdentifier({
+ tag: TransactionType.PeerPullDebit,
+ peerPullDebitId: parsedTaskId.peerPullDebitId,
+ });
+ // FIXME: This doesn't distinguish internal-withdrawal.
+ // Maybe we should have a different task type for that as well?
+ // Or maybe transaction IDs should be valid task identifiers?
+ case PendingTaskType.Withdraw:
+ return constructTransactionIdentifier({
+ tag: TransactionType.Withdrawal,
+ withdrawalGroupId: parsedTaskId.withdrawalGroupId,
+ });
+ case PendingTaskType.PeerPushCredit:
+ return constructTransactionIdentifier({
+ tag: TransactionType.PeerPushCredit,
+ peerPushCreditId: parsedTaskId.peerPushCreditId,
+ });
+ case PendingTaskType.Deposit:
+ return constructTransactionIdentifier({
+ tag: TransactionType.Deposit,
+ depositGroupId: parsedTaskId.depositGroupId,
+ });
+ case PendingTaskType.Refresh:
+ return constructTransactionIdentifier({
+ tag: TransactionType.Refresh,
+ refreshGroupId: parsedTaskId.refreshGroupId,
+ });
+ case PendingTaskType.PeerPushDebit:
+ return constructTransactionIdentifier({
+ tag: TransactionType.PeerPushDebit,
+ pursePub: parsedTaskId.pursePub,
+ });
+ case PendingTaskType.Purchase:
+ return constructTransactionIdentifier({
+ tag: TransactionType.Payment,
+ proposalId: parsedTaskId.proposalId,
+ });
+ default:
+ return undefined;
+ }
+}
+
+export interface ActiveTaskIdsResult {
+ taskIds: TaskIdStr[];
+}
+
+export async function getActiveTaskIds(
+ ws: InternalWalletState,
+): Promise<ActiveTaskIdsResult> {
+ const res: ActiveTaskIdsResult = {
+ taskIds: [],
+ };
+ await ws.db.runReadWriteTx(
+ {
+ storeNames: [
+ "exchanges",
+ "refreshGroups",
+ "withdrawalGroups",
+ "purchases",
+ "depositGroups",
+ "recoupGroups",
+ "peerPullCredit",
+ "peerPushDebit",
+ "peerPullDebit",
+ "peerPushCredit",
+ ],
+ },
+ async (tx) => {
+ const active = GlobalIDB.KeyRange.bound(
+ OPERATION_STATUS_ACTIVE_FIRST,
+ OPERATION_STATUS_ACTIVE_LAST,
+ );
+
+ // Withdrawals
+
+ {
+ const activeRecs =
+ await tx.withdrawalGroups.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Withdraw,
+ withdrawalGroupId: rec.withdrawalGroupId,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // Deposits
+
+ {
+ const activeRecs =
+ await tx.depositGroups.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Deposit,
+ depositGroupId: rec.depositGroupId,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // Refreshes
+
+ {
+ const activeRecs =
+ await tx.refreshGroups.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Refresh,
+ refreshGroupId: rec.refreshGroupId,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // Purchases
+
+ {
+ const activeRecs = await tx.purchases.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Purchase,
+ proposalId: rec.proposalId,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // peer-push-debit
+
+ {
+ const activeRecs =
+ await tx.peerPushDebit.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.PeerPushDebit,
+ pursePub: rec.pursePub,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // peer-push-credit
+
+ {
+ const activeRecs =
+ await tx.peerPushCredit.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.PeerPushCredit,
+ peerPushCreditId: rec.peerPushCreditId,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // peer-pull-debit
+
+ {
+ const activeRecs =
+ await tx.peerPullDebit.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.PeerPullDebit,
+ peerPullDebitId: rec.peerPullDebitId,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // peer-pull-credit
+
+ {
+ const activeRecs =
+ await tx.peerPullCredit.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.PeerPullCredit,
+ pursePub: rec.pursePub,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // recoup
+
+ {
+ const activeRecs =
+ await tx.recoupGroups.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Recoup,
+ recoupGroupId: rec.recoupGroupId,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // exchange update
+
+ {
+ const exchanges = await tx.exchanges.getAll();
+ for (const rec of exchanges) {
+ const taskIdUpdate = constructTaskIdentifier({
+ tag: PendingTaskType.ExchangeUpdate,
+ exchangeBaseUrl: rec.baseUrl,
+ });
+ res.taskIds.push(taskIdUpdate);
+ }
+ }
+
+ // FIXME: Recoup!
+ },
+ );
+
+ return res;
+}