diff options
Diffstat (limited to 'packages/taler-wallet-core/src/shepherd.ts')
-rw-r--r-- | packages/taler-wallet-core/src/shepherd.ts | 1128 |
1 files changed, 1128 insertions, 0 deletions
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts new file mode 100644 index 000000000..3b160d97f --- /dev/null +++ b/packages/taler-wallet-core/src/shepherd.ts @@ -0,0 +1,1128 @@ +/* + This file is part of GNU Taler + (C) 2024 Taler Systems SA + + GNU Taler is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + */ + +/** + * Imports. + */ +import { GlobalIDB } from "@gnu-taler/idb-bridge"; +import { + AbsoluteTime, + AsyncCondition, + CancellationToken, + Duration, + Logger, + NotificationType, + ObservabilityContext, + ObservabilityEventType, + TalerErrorDetail, + TaskThrottler, + TransactionIdStr, + TransactionState, + TransactionType, + WalletNotification, + assertUnreachable, + getErrorDetailFromException, + j2s, + safeStringifyException, +} from "@gnu-taler/taler-util"; +import { processBackupForProvider } from "./backup/index.js"; +import { + DbRetryInfo, + PendingTaskType, + TaskIdStr, + TaskRunResult, + TaskRunResultType, + constructTaskIdentifier, + getExchangeState, + parseTaskIdentifier, +} from "./common.js"; +import { + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, + OperationRetryRecord, + WalletDbAllStoresReadOnlyTransaction, + WalletDbReadOnlyTransaction, + timestampAbsoluteFromDb, +} from "./db.js"; +import { + computeDepositTransactionStatus, + processDepositGroup, +} from "./deposits.js"; +import { + computeDenomLossTransactionStatus, + updateExchangeFromUrlHandler, +} from "./exchanges.js"; +import { + computePayMerchantTransactionState, + computeRefundTransactionState, + processPurchase, +} from "./pay-merchant.js"; +import { + computePeerPullCreditTransactionState, + processPeerPullCredit, +} from "./pay-peer-pull-credit.js"; +import { + computePeerPullDebitTransactionState, + processPeerPullDebit, +} from "./pay-peer-pull-debit.js"; +import { + computePeerPushCreditTransactionState, + processPeerPushCredit, +} from "./pay-peer-push-credit.js"; +import { + computePeerPushDebitTransactionState, + processPeerPushDebit, +} from "./pay-peer-push-debit.js"; +import { processRecoupGroup } from "./recoup.js"; +import { + computeRefreshTransactionState, + processRefreshGroup, +} from "./refresh.js"; +import { + constructTransactionIdentifier, + parseTransactionIdentifier, +} from "./transactions.js"; +import { + InternalWalletState, + WalletExecutionContext, + getNormalWalletExecutionContext, + getObservedWalletExecutionContext, +} from "./wallet.js"; +import { + computeWithdrawalTransactionStatus, + processWithdrawalGroup, +} from "./withdraw.js"; + +const logger = new Logger("shepherd.ts"); + +/** + * Info about one task being shepherded. + */ +interface ShepherdInfo { + cts: CancellationToken.Source; +} + +/** + * Check if a task is alive, i.e. whether it prevents + * the main task loop from exiting. + */ +function taskGivesLiveness(taskId: string): boolean { + const parsedTaskId = parseTaskIdentifier(taskId); + switch (parsedTaskId.tag) { + case PendingTaskType.Backup: + case PendingTaskType.ExchangeUpdate: + return false; + case PendingTaskType.Deposit: + case PendingTaskType.PeerPullCredit: + case PendingTaskType.PeerPullDebit: + case PendingTaskType.PeerPushCredit: + case PendingTaskType.Refresh: + case PendingTaskType.Recoup: + case PendingTaskType.RewardPickup: + case PendingTaskType.Withdraw: + case PendingTaskType.PeerPushDebit: + case PendingTaskType.Purchase: + return true; + default: + assertUnreachable(parsedTaskId); + } +} + +export interface TaskScheduler { + ensureRunning(): Promise<void>; + startShepherdTask(taskId: TaskIdStr): void; + stopShepherdTask(taskId: TaskIdStr): void; + resetTaskRetries(taskId: TaskIdStr): Promise<void>; + reload(): Promise<void>; + getActiveTasks(): TaskIdStr[]; + isIdle(): boolean; + shutdown(): Promise<void>; +} + +export class TaskSchedulerImpl implements TaskScheduler { + private sheps: Map<TaskIdStr, ShepherdInfo> = new Map(); + + private iterCond = new AsyncCondition(); + + private throttler = new TaskThrottler(); + + isRunning: boolean = false; + + constructor(private ws: InternalWalletState) {} + + private async loadTasksFromDb(): Promise<void> { + const activeTasks = await getActiveTaskIds(this.ws); + + logger.info(`active tasks from DB: ${j2s(activeTasks)}`); + + for (const tid of activeTasks.taskIds) { + this.startShepherdTask(tid); + } + } + + getActiveTasks(): TaskIdStr[] { + return [...this.sheps.keys()]; + } + + async shutdown(): Promise<void> { + const tasksIds = [...this.sheps.keys()]; + logger.info(`Stopping task shepherd.`); + for (const taskId of tasksIds) { + this.stopShepherdTask(taskId); + } + } + + async ensureRunning(): Promise<void> { + if (this.isRunning) { + return; + } + this.isRunning = true; + try { + await this.loadTasksFromDb(); + } catch (e) { + this.isRunning = false; + throw e; + } + this.run() + .catch((e) => { + logger.error("error running task loop"); + logger.error(`err: ${e}`); + }) + .then(() => { + logger.trace("done running task loop"); + this.isRunning = false; + }); + } + + isIdle(): boolean { + let alive = false; + const taskIds = [...this.sheps.keys()]; + for (const taskId of taskIds) { + if (taskGivesLiveness(taskId)) { + alive = true; + break; + } + } + // We're idle if no task is alive anymore. + return !alive; + } + + private async run(): Promise<void> { + logger.trace("Running task loop."); + logger.trace(`sheps: ${this.sheps.size}`); + while (true) { + if (this.ws.stopped) { + logger.trace("Breaking out of task loop (wallet stopped)."); + break; + } + + if (this.isIdle()) { + this.ws.notify({ + type: NotificationType.Idle, + }); + } + + await this.iterCond.wait(); + } + logger.trace("Done with task loop."); + } + + startShepherdTask(taskId: TaskIdStr): void { + this.ensureRunning().catch((e) => { + logger.error(`error running scheduler: ${safeStringifyException(e)}`); + }); + // Run in the background, no await! + this.internalStartShepherdTask(taskId); + } + + /** + * Stop and re-load all existing tasks. + * + * Mostly useful to interrupt all waits when time-travelling. + */ + async reload(): Promise<void> { + await this.ensureRunning(); + const tasksIds = [...this.sheps.keys()]; + logger.info(`reloading sheperd with ${tasksIds.length} tasks`); + for (const taskId of tasksIds) { + this.stopShepherdTask(taskId); + } + for (const taskId of tasksIds) { + this.startShepherdTask(taskId); + } + } + + private async internalStartShepherdTask(taskId: TaskIdStr): Promise<void> { + logger.trace(`Starting to shepherd task ${taskId}`); + const oldShep = this.sheps.get(taskId); + if (oldShep) { + logger.trace(`Already have a shepherd for ${taskId}`); + return; + } + logger.trace(`Creating new shepherd for ${taskId}`); + const newShep: ShepherdInfo = { + cts: CancellationToken.create(), + }; + this.sheps.set(taskId, newShep); + try { + await this.internalShepherdTask(taskId, newShep); + } finally { + logger.trace(`Done shepherding ${taskId}`); + this.sheps.delete(taskId); + this.iterCond.trigger(); + } + } + + stopShepherdTask(taskId: TaskIdStr): void { + logger.trace(`Stopping shepherding of ${taskId}`); + const oldShep = this.sheps.get(taskId); + if (oldShep) { + logger.trace(`Cancelling old shepherd for ${taskId}`); + oldShep.cts.cancel(); + this.sheps.delete(taskId); + this.iterCond.trigger(); + } + } + + restartShepherdTask(taskId: TaskIdStr): void { + this.stopShepherdTask(taskId); + this.startShepherdTask(taskId); + } + + async resetTaskRetries(taskId: TaskIdStr): Promise<void> { + const maybeNotification = await this.ws.db.runAllStoresReadWriteTx( + {}, + async (tx) => { + await tx.operationRetries.delete(taskId); + return taskToRetryNotification(this.ws, tx, taskId, undefined); + }, + ); + this.stopShepherdTask(taskId); + if (maybeNotification) { + this.ws.notify(maybeNotification); + } + this.startShepherdTask(taskId); + } + + private async wait( + taskId: TaskIdStr, + info: ShepherdInfo, + delay: Duration, + ): Promise<void> { + try { + await info.cts.token.racePromise(this.ws.timerGroup.resolveAfter(delay)); + } catch (e) { + logger.info(`waiting for ${taskId} interrupted`); + } + } + + private async internalShepherdTask( + taskId: TaskIdStr, + info: ShepherdInfo, + ): Promise<void> { + while (true) { + if (this.ws.stopped) { + logger.trace(`Shepherd for ${taskId} stopping as wallet is stopped`); + return; + } + if (info.cts.token.isCancelled) { + logger.trace(`Shepherd for ${taskId} got cancelled`); + return; + } + const isThrottled = this.throttler.applyThrottle(taskId); + if (isThrottled) { + logger.warn( + `task ${taskId} throttled, this is very likely a bug in wallet-core, please report`, + ); + logger.warn("waiting for 60 seconds"); + await this.ws.timerGroup.resolveAfter( + Duration.fromSpec({ seconds: 60 }), + ); + } + const wex = getWalletExecutionContextForTask( + this.ws, + taskId, + info.cts.token, + ); + const startTime = AbsoluteTime.now(); + logger.trace(`Shepherd for ${taskId} will call handler`); + let res: TaskRunResult; + try { + res = await callOperationHandlerForTaskId(wex, taskId); + } catch (e) { + res = { + type: TaskRunResultType.Error, + errorDetail: getErrorDetailFromException(e), + }; + } + if (info.cts.token.isCancelled) { + logger.trace("task cancelled, not processing result"); + return; + } + if (this.ws.stopped) { + logger.trace("wallet stopped, not processing result"); + return; + } + wex.oc.observe({ + type: ObservabilityEventType.ShepherdTaskResult, + resultType: res.type, + }); + switch (res.type) { + case TaskRunResultType.Error: { + logger.trace(`Shepherd for ${taskId} got error result.`); + const retryRecord = await storePendingTaskError( + this.ws, + taskId, + res.errorDetail, + ); + const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry); + const delay = AbsoluteTime.remaining(t); + logger.trace(`Waiting for ${delay.d_ms} ms`); + await this.wait(taskId, info, delay); + break; + } + case TaskRunResultType.Backoff: { + logger.trace(`Shepherd for ${taskId} got backoff result.`); + const retryRecord = await storePendingTaskPending(this.ws, taskId); + const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry); + const delay = AbsoluteTime.remaining(t); + logger.trace(`Waiting for ${delay.d_ms} ms`); + await this.wait(taskId, info, delay); + break; + } + case TaskRunResultType.Progress: { + logger.trace( + `Shepherd for ${taskId} got progress result, re-running immediately.`, + ); + await storeTaskProgress(this.ws, taskId); + break; + } + case TaskRunResultType.ScheduleLater: { + logger.trace(`Shepherd for ${taskId} got schedule-later result.`); + await storeTaskProgress(this.ws, taskId); + const delay = AbsoluteTime.remaining(res.runAt); + logger.trace(`Waiting for ${delay.d_ms} ms`); + await this.wait(taskId, info, delay); + break; + } + case TaskRunResultType.Finished: + logger.trace(`Shepherd for ${taskId} got finished result.`); + await storePendingTaskFinished(this.ws, taskId); + return; + case TaskRunResultType.LongpollReturnedPending: { + await storeTaskProgress(this.ws, taskId); + // Make sure that we are waiting a bit if long-polling returned too early. + const endTime = AbsoluteTime.now(); + const taskDuration = AbsoluteTime.difference(endTime, startTime); + if ( + Duration.cmp(taskDuration, Duration.fromSpec({ seconds: 20 })) < 0 + ) { + logger.info( + `long-poller for ${taskId} returned unexpectedly early (${taskDuration.d_ms} ms), waiting 10 seconds`, + ); + await this.wait(taskId, info, Duration.fromSpec({ seconds: 10 })); + } else { + logger.info(`task ${taskId} will long-poll again`); + } + break; + } + default: + assertUnreachable(res); + } + } + } +} + +async function storePendingTaskError( + ws: InternalWalletState, + pendingTaskId: string, + e: TalerErrorDetail, +): Promise<OperationRetryRecord> { + logger.info(`storing pending task error for ${pendingTaskId}`); + const res = await ws.db.runAllStoresReadWriteTx({}, async (tx) => { + let retryRecord = await tx.operationRetries.get(pendingTaskId); + if (!retryRecord) { + retryRecord = { + id: pendingTaskId, + lastError: e, + retryInfo: DbRetryInfo.reset(), + }; + } else { + retryRecord.lastError = e; + retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo); + } + await tx.operationRetries.put(retryRecord); + return { + notification: await taskToRetryNotification(ws, tx, pendingTaskId, e), + retryRecord, + }; + }); + if (res?.notification) { + ws.notify(res.notification); + } + return res.retryRecord; +} + +/** + * Task made progress, clear error. + */ +async function storeTaskProgress( + ws: InternalWalletState, + pendingTaskId: string, +): Promise<void> { + await ws.db.runReadWriteTx( + { storeNames: ["operationRetries"] }, + async (tx) => { + await tx.operationRetries.delete(pendingTaskId); + }, + ); +} + +async function storePendingTaskPending( + ws: InternalWalletState, + pendingTaskId: string, +): Promise<OperationRetryRecord> { + const res = await ws.db.runAllStoresReadWriteTx({}, async (tx) => { + let retryRecord = await tx.operationRetries.get(pendingTaskId); + let hadError = false; + if (!retryRecord) { + retryRecord = { + id: pendingTaskId, + retryInfo: DbRetryInfo.reset(), + }; + } else { + if (retryRecord.lastError) { + hadError = true; + } + delete retryRecord.lastError; + retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo); + } + await tx.operationRetries.put(retryRecord); + let notification: WalletNotification | undefined = undefined; + if (hadError) { + notification = await taskToRetryNotification( + ws, + tx, + pendingTaskId, + undefined, + ); + } + return { + notification, + retryRecord, + }; + }); + if (res.notification) { + ws.notify(res.notification); + } + return res.retryRecord; +} + +async function storePendingTaskFinished( + ws: InternalWalletState, + pendingTaskId: string, +): Promise<void> { + await ws.db.runReadWriteTx( + { storeNames: ["operationRetries"] }, + async (tx) => { + await tx.operationRetries.delete(pendingTaskId); + }, + ); +} + +function getWalletExecutionContextForTask( + ws: InternalWalletState, + taskId: TaskIdStr, + cancellationToken: CancellationToken, +): WalletExecutionContext { + let oc: ObservabilityContext; + let wex: WalletExecutionContext; + + if (ws.config.testing.emitObservabilityEvents) { + oc = { + observe(evt) { + if (ws.config.testing.emitObservabilityEvents) { + ws.notify({ + type: NotificationType.TaskObservabilityEvent, + taskId, + event: evt, + }); + } + }, + }; + + wex = getObservedWalletExecutionContext(ws, cancellationToken, oc); + } else { + oc = { + observe(evt) {}, + }; + wex = getNormalWalletExecutionContext(ws, cancellationToken, oc); + } + return wex; +} + +async function callOperationHandlerForTaskId( + wex: WalletExecutionContext, + taskId: TaskIdStr, +): Promise<TaskRunResult> { + const pending = parseTaskIdentifier(taskId); + switch (pending.tag) { + case PendingTaskType.ExchangeUpdate: + return await updateExchangeFromUrlHandler(wex, pending.exchangeBaseUrl); + case PendingTaskType.Refresh: + return await processRefreshGroup(wex, pending.refreshGroupId); + case PendingTaskType.Withdraw: + return await processWithdrawalGroup(wex, pending.withdrawalGroupId); + case PendingTaskType.Purchase: + return await processPurchase(wex, pending.proposalId); + case PendingTaskType.Recoup: + return await processRecoupGroup(wex, pending.recoupGroupId); + case PendingTaskType.Deposit: + return await processDepositGroup(wex, pending.depositGroupId); + case PendingTaskType.Backup: + return await processBackupForProvider(wex, pending.backupProviderBaseUrl); + case PendingTaskType.PeerPushDebit: + return await processPeerPushDebit(wex, pending.pursePub); + case PendingTaskType.PeerPullCredit: + return await processPeerPullCredit(wex, pending.pursePub); + case PendingTaskType.PeerPullDebit: + return await processPeerPullDebit(wex, pending.peerPullDebitId); + case PendingTaskType.PeerPushCredit: + return await processPeerPushCredit(wex, pending.peerPushCreditId); + case PendingTaskType.RewardPickup: + throw Error("not supported anymore"); + default: + return assertUnreachable(pending); + } + throw Error(`not reached ${pending.tag}`); +} + +/** + * Generate an appropriate error transition notification + * for applicable tasks. + * + * Namely, transition notifications are generated for: + * - exchange update errors + * - transactions + */ +async function taskToRetryNotification( + ws: InternalWalletState, + tx: WalletDbAllStoresReadOnlyTransaction, + pendingTaskId: string, + e: TalerErrorDetail | undefined, +): Promise<WalletNotification | undefined> { + const parsedTaskId = parseTaskIdentifier(pendingTaskId); + + switch (parsedTaskId.tag) { + case PendingTaskType.ExchangeUpdate: + return makeExchangeRetryNotification(ws, tx, pendingTaskId, e); + case PendingTaskType.PeerPullCredit: + case PendingTaskType.PeerPullDebit: + case PendingTaskType.Withdraw: + case PendingTaskType.PeerPushCredit: + case PendingTaskType.Deposit: + case PendingTaskType.Refresh: + case PendingTaskType.RewardPickup: + case PendingTaskType.PeerPushDebit: + case PendingTaskType.Purchase: + return makeTransactionRetryNotification(ws, tx, pendingTaskId, e); + case PendingTaskType.Backup: + case PendingTaskType.Recoup: + return undefined; + } +} + +async function getTransactionState( + ws: InternalWalletState, + tx: WalletDbReadOnlyTransaction< + [ + "depositGroups", + "withdrawalGroups", + "purchases", + "refundGroups", + "peerPullCredit", + "peerPullDebit", + "peerPushDebit", + "peerPushCredit", + "rewards", + "refreshGroups", + "denomLossEvents", + ] + >, + transactionId: string, +): Promise<TransactionState | undefined> { + const parsedTxId = parseTransactionIdentifier(transactionId); + if (!parsedTxId) { + throw Error("invalid tx identifier"); + } + switch (parsedTxId.tag) { + case TransactionType.Deposit: { + const rec = await tx.depositGroups.get(parsedTxId.depositGroupId); + if (!rec) { + return undefined; + } + return computeDepositTransactionStatus(rec); + } + case TransactionType.InternalWithdrawal: + case TransactionType.Withdrawal: { + const rec = await tx.withdrawalGroups.get(parsedTxId.withdrawalGroupId); + if (!rec) { + return undefined; + } + return computeWithdrawalTransactionStatus(rec); + } + case TransactionType.Payment: { + const rec = await tx.purchases.get(parsedTxId.proposalId); + if (!rec) { + return; + } + return computePayMerchantTransactionState(rec); + } + case TransactionType.Refund: { + const rec = await tx.refundGroups.get(parsedTxId.refundGroupId); + if (!rec) { + return undefined; + } + return computeRefundTransactionState(rec); + } + case TransactionType.PeerPullCredit: { + const rec = await tx.peerPullCredit.get(parsedTxId.pursePub); + if (!rec) { + return undefined; + } + return computePeerPullCreditTransactionState(rec); + } + case TransactionType.PeerPullDebit: { + const rec = await tx.peerPullDebit.get(parsedTxId.peerPullDebitId); + if (!rec) { + return undefined; + } + return computePeerPullDebitTransactionState(rec); + } + case TransactionType.PeerPushCredit: { + const rec = await tx.peerPushCredit.get(parsedTxId.peerPushCreditId); + if (!rec) { + return undefined; + } + return computePeerPushCreditTransactionState(rec); + } + case TransactionType.PeerPushDebit: { + const rec = await tx.peerPushDebit.get(parsedTxId.pursePub); + if (!rec) { + return undefined; + } + return computePeerPushDebitTransactionState(rec); + } + case TransactionType.Refresh: { + const rec = await tx.refreshGroups.get(parsedTxId.refreshGroupId); + if (!rec) { + return undefined; + } + return computeRefreshTransactionState(rec); + } + case TransactionType.Recoup: + throw Error("not yet supported"); + case TransactionType.DenomLoss: { + const rec = await tx.denomLossEvents.get(parsedTxId.denomLossEventId); + if (!rec) { + return undefined; + } + return computeDenomLossTransactionStatus(rec); + } + default: + assertUnreachable(parsedTxId); + } +} + +async function makeTransactionRetryNotification( + ws: InternalWalletState, + tx: WalletDbAllStoresReadOnlyTransaction, + pendingTaskId: string, + e: TalerErrorDetail | undefined, +): Promise<WalletNotification | undefined> { + const txId = convertTaskToTransactionId(pendingTaskId); + if (!txId) { + return undefined; + } + const txState = await getTransactionState(ws, tx, txId); + if (!txState) { + return undefined; + } + const notif: WalletNotification = { + type: NotificationType.TransactionStateTransition, + transactionId: txId, + oldTxState: txState, + newTxState: txState, + }; + if (e) { + notif.errorInfo = { + code: e.code as number, + hint: e.hint, + }; + } + return notif; +} + +async function makeExchangeRetryNotification( + ws: InternalWalletState, + tx: WalletDbAllStoresReadOnlyTransaction, + pendingTaskId: string, + e: TalerErrorDetail | undefined, +): Promise<WalletNotification | undefined> { + logger.info("making exchange retry notification"); + const parsedTaskId = parseTaskIdentifier(pendingTaskId); + if (parsedTaskId.tag !== PendingTaskType.ExchangeUpdate) { + throw Error("invalid task identifier"); + } + const rec = await tx.exchanges.get(parsedTaskId.exchangeBaseUrl); + + if (!rec) { + logger.info(`exchange ${parsedTaskId.exchangeBaseUrl} not found`); + return undefined; + } + + const notif: WalletNotification = { + type: NotificationType.ExchangeStateTransition, + exchangeBaseUrl: parsedTaskId.exchangeBaseUrl, + oldExchangeState: getExchangeState(rec), + newExchangeState: getExchangeState(rec), + }; + if (e) { + notif.errorInfo = { + code: e.code as number, + hint: e.hint, + }; + } + return notif; +} + +export function listTaskForTransactionId(transactionId: string): TaskIdStr[] { + const tid = parseTransactionIdentifier(transactionId); + if (!tid) { + throw Error("invalid task ID"); + } + switch (tid.tag) { + case TransactionType.Deposit: + return [ + constructTaskIdentifier({ + tag: PendingTaskType.Deposit, + depositGroupId: tid.depositGroupId, + }), + ]; + case TransactionType.InternalWithdrawal: + return [ + constructTaskIdentifier({ + tag: PendingTaskType.Withdraw, + withdrawalGroupId: tid.withdrawalGroupId, + }), + ]; + case TransactionType.Payment: + return [ + constructTaskIdentifier({ + tag: PendingTaskType.Purchase, + proposalId: tid.proposalId, + }), + ]; + case TransactionType.PeerPullCredit: + return [ + constructTaskIdentifier({ + tag: PendingTaskType.PeerPullCredit, + pursePub: tid.pursePub, + }), + ]; + case TransactionType.PeerPullDebit: + return [ + constructTaskIdentifier({ + tag: PendingTaskType.PeerPullDebit, + peerPullDebitId: tid.peerPullDebitId, + }), + ]; + case TransactionType.PeerPushCredit: + return [ + constructTaskIdentifier({ + tag: PendingTaskType.PeerPullCredit, + pursePub: tid.peerPushCreditId, + }), + ]; + case TransactionType.PeerPushDebit: + return [ + constructTaskIdentifier({ + tag: PendingTaskType.PeerPullCredit, + pursePub: tid.pursePub, + }), + ]; + case TransactionType.Recoup: + return [ + constructTaskIdentifier({ + tag: PendingTaskType.Recoup, + recoupGroupId: tid.recoupGroupId, + }), + ]; + case TransactionType.Refresh: + return [ + constructTaskIdentifier({ + tag: PendingTaskType.Refresh, + refreshGroupId: tid.refreshGroupId, + }), + ]; + case TransactionType.Refund: + return []; + case TransactionType.Withdrawal: + return [ + constructTaskIdentifier({ + tag: PendingTaskType.Withdraw, + withdrawalGroupId: tid.withdrawalGroupId, + }), + ]; + case TransactionType.DenomLoss: + return []; + default: + assertUnreachable(tid); + } +} + +/** + * Convert the task ID for a task that processes a transaction int + * the ID for the transaction. + */ +export function convertTaskToTransactionId( + taskId: string, +): TransactionIdStr | undefined { + const parsedTaskId = parseTaskIdentifier(taskId); + switch (parsedTaskId.tag) { + case PendingTaskType.PeerPullCredit: + return constructTransactionIdentifier({ + tag: TransactionType.PeerPullCredit, + pursePub: parsedTaskId.pursePub, + }); + case PendingTaskType.PeerPullDebit: + return constructTransactionIdentifier({ + tag: TransactionType.PeerPullDebit, + peerPullDebitId: parsedTaskId.peerPullDebitId, + }); + // FIXME: This doesn't distinguish internal-withdrawal. + // Maybe we should have a different task type for that as well? + // Or maybe transaction IDs should be valid task identifiers? + case PendingTaskType.Withdraw: + return constructTransactionIdentifier({ + tag: TransactionType.Withdrawal, + withdrawalGroupId: parsedTaskId.withdrawalGroupId, + }); + case PendingTaskType.PeerPushCredit: + return constructTransactionIdentifier({ + tag: TransactionType.PeerPushCredit, + peerPushCreditId: parsedTaskId.peerPushCreditId, + }); + case PendingTaskType.Deposit: + return constructTransactionIdentifier({ + tag: TransactionType.Deposit, + depositGroupId: parsedTaskId.depositGroupId, + }); + case PendingTaskType.Refresh: + return constructTransactionIdentifier({ + tag: TransactionType.Refresh, + refreshGroupId: parsedTaskId.refreshGroupId, + }); + case PendingTaskType.PeerPushDebit: + return constructTransactionIdentifier({ + tag: TransactionType.PeerPushDebit, + pursePub: parsedTaskId.pursePub, + }); + case PendingTaskType.Purchase: + return constructTransactionIdentifier({ + tag: TransactionType.Payment, + proposalId: parsedTaskId.proposalId, + }); + default: + return undefined; + } +} + +export interface ActiveTaskIdsResult { + taskIds: TaskIdStr[]; +} + +export async function getActiveTaskIds( + ws: InternalWalletState, +): Promise<ActiveTaskIdsResult> { + const res: ActiveTaskIdsResult = { + taskIds: [], + }; + await ws.db.runReadWriteTx( + { + storeNames: [ + "exchanges", + "refreshGroups", + "withdrawalGroups", + "purchases", + "depositGroups", + "recoupGroups", + "peerPullCredit", + "peerPushDebit", + "peerPullDebit", + "peerPushCredit", + ], + }, + async (tx) => { + const active = GlobalIDB.KeyRange.bound( + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, + ); + + // Withdrawals + + { + const activeRecs = + await tx.withdrawalGroups.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Withdraw, + withdrawalGroupId: rec.withdrawalGroupId, + }); + res.taskIds.push(taskId); + } + } + + // Deposits + + { + const activeRecs = + await tx.depositGroups.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Deposit, + depositGroupId: rec.depositGroupId, + }); + res.taskIds.push(taskId); + } + } + + // Refreshes + + { + const activeRecs = + await tx.refreshGroups.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Refresh, + refreshGroupId: rec.refreshGroupId, + }); + res.taskIds.push(taskId); + } + } + + // Purchases + + { + const activeRecs = await tx.purchases.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Purchase, + proposalId: rec.proposalId, + }); + res.taskIds.push(taskId); + } + } + + // peer-push-debit + + { + const activeRecs = + await tx.peerPushDebit.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.PeerPushDebit, + pursePub: rec.pursePub, + }); + res.taskIds.push(taskId); + } + } + + // peer-push-credit + + { + const activeRecs = + await tx.peerPushCredit.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.PeerPushCredit, + peerPushCreditId: rec.peerPushCreditId, + }); + res.taskIds.push(taskId); + } + } + + // peer-pull-debit + + { + const activeRecs = + await tx.peerPullDebit.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.PeerPullDebit, + peerPullDebitId: rec.peerPullDebitId, + }); + res.taskIds.push(taskId); + } + } + + // peer-pull-credit + + { + const activeRecs = + await tx.peerPullCredit.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.PeerPullCredit, + pursePub: rec.pursePub, + }); + res.taskIds.push(taskId); + } + } + + // recoup + + { + const activeRecs = + await tx.recoupGroups.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Recoup, + recoupGroupId: rec.recoupGroupId, + }); + res.taskIds.push(taskId); + } + } + + // exchange update + + { + const exchanges = await tx.exchanges.getAll(); + for (const rec of exchanges) { + const taskIdUpdate = constructTaskIdentifier({ + tag: PendingTaskType.ExchangeUpdate, + exchangeBaseUrl: rec.baseUrl, + }); + res.taskIds.push(taskIdUpdate); + } + } + + // FIXME: Recoup! + }, + ); + + return res; +} |