/*
 This file is part of GNU Taler
 (C) 2024 Taler Systems SA

 GNU Taler is free software; you can redistribute it and/or modify it under the
 terms of the GNU General Public License as published by the Free Software
 Foundation; either version 3, or (at your option) any later version.

 GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
 A PARTICULAR PURPOSE.  See the GNU General Public License for more details.

 You should have received a copy of the GNU General Public License along with
 GNU Taler; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
 */

/**
 * Imports.
 */
import { GlobalIDB } from "@gnu-taler/idb-bridge";
import {
  AbsoluteTime,
  AsyncCondition,
  CancellationToken,
  Duration,
  Logger,
  NotificationType,
  RetryLoopOpts,
  TalerError,
  TalerErrorCode,
  TalerErrorDetail,
  TaskThrottler,
  TransactionIdStr,
  TransactionState,
  TransactionType,
  WalletNotification,
  assertUnreachable,
  j2s,
  makeErrorDetail,
} from "@gnu-taler/taler-util";
import { processBackupForProvider } from "./backup/index.js";
import {
  DbRetryInfo,
  PendingTaskType,
  TaskIdStr,
  TaskRunResult,
  TaskRunResultType,
  constructTaskIdentifier,
  getExchangeState,
  parseTaskIdentifier,
} from "./common.js";
import { CryptoApiStoppedError } from "./crypto/workers/crypto-dispatcher.js";
import {
  OPERATION_STATUS_ACTIVE_FIRST,
  OPERATION_STATUS_ACTIVE_LAST,
  WalletDbAllStoresReadOnlyTransaction,
  WalletDbReadOnlyTransaction,
  timestampAbsoluteFromDb,
} from "./db.js";
import {
  computeDepositTransactionStatus,
  processDepositGroup,
} from "./deposits.js";
import { updateExchangeFromUrlHandler } from "./exchanges.js";
import {
  computePayMerchantTransactionState,
  computeRefundTransactionState,
  processPurchase,
} from "./pay-merchant.js";
import {
  computePeerPullCreditTransactionState,
  processPeerPullCredit,
} from "./pay-peer-pull-credit.js";
import {
  computePeerPullDebitTransactionState,
  processPeerPullDebit,
} from "./pay-peer-pull-debit.js";
import {
  computePeerPushCreditTransactionState,
  processPeerPushCredit,
} from "./pay-peer-push-credit.js";
import {
  computePeerPushDebitTransactionState,
  processPeerPushDebit,
} from "./pay-peer-push-debit.js";
import { processRecoupGroup } from "./recoup.js";
import {
  computeRefreshTransactionState,
  processRefreshGroup,
} from "./refresh.js";
import { computeRewardTransactionStatus } from "./reward.js";
import {
  constructTransactionIdentifier,
  parseTransactionIdentifier,
} from "./transactions.js";
import { InternalWalletState } from "./wallet.js";
import {
  computeWithdrawalTransactionStatus,
  processWithdrawalGroup,
} from "./withdraw.js";

const logger = new Logger("shepherd.ts");

/**
 * Info about one task being shepherded.
 */
interface ShepherdInfo {
  /** Cancellation source used to interrupt this shepherd. */
  cts: CancellationToken.Source;
}

/**
 * Check if a task is alive, i.e. whether it prevents
 * the main task loop from exiting.
 *
 * Backup and exchange-update tasks never give liveness;
 * all other task types do.
 */
function taskGivesLiveness(taskId: string): boolean {
  const parsedTaskId = parseTaskIdentifier(taskId);
  switch (parsedTaskId.tag) {
    case PendingTaskType.Backup:
    case PendingTaskType.ExchangeUpdate:
      return false;
    case PendingTaskType.Deposit:
    case PendingTaskType.PeerPullCredit:
    case PendingTaskType.PeerPullDebit:
    case PendingTaskType.PeerPushCredit:
    case PendingTaskType.Refresh:
    case PendingTaskType.Recoup:
    case PendingTaskType.RewardPickup:
    case PendingTaskType.Withdraw:
    case PendingTaskType.PeerPushDebit:
    case PendingTaskType.Purchase:
      return true;
    default:
      assertUnreachable(parsedTaskId);
  }
}

/**
 * Scheduler that runs one "shepherd" loop per task ID.
 *
 * A shepherd repeatedly calls the handler for its task and,
 * depending on the result, waits, re-runs immediately or finishes.
 */
export class TaskScheduler {
  /** Currently registered shepherds, keyed by task ID. */
  private sheps: Map<TaskIdStr, ShepherdInfo> = new Map();

  /** Triggered whenever a shepherd is removed, to wake up run(). */
  private iterCond = new AsyncCondition();

  private throttler = new TaskThrottler();

  constructor(private ws: InternalWalletState) {}

  /**
   * Start a shepherd for every task ID reported by getActiveTaskIds().
   */
  async loadTasksFromDb(): Promise<void> {
    const activeTasks = await getActiveTaskIds(this.ws);
    logger.info(`active tasks from DB: ${j2s(activeTasks)}`);
    for (const tid of activeTasks.taskIds) {
      this.startShepherdTask(tid);
    }
  }

  /**
   * Run the main task loop.
   *
   * The loop exits when the wallet is stopped or, if opts.stopWhenDone
   * is set, when no registered task gives liveness anymore.
   */
  async run(opts: RetryLoopOpts = {}): Promise<void> {
    logger.info("Running task loop.");
    this.ws.isTaskLoopRunning = true;
    try {
      await this.loadTasksFromDb();
      logger.info("loaded!");
      logger.info(`sheps: ${this.sheps.size}`);
      while (true) {
        if (opts.stopWhenDone) {
          const taskIds = [...this.sheps.keys()];
          logger.info(`current task IDs: ${j2s(taskIds)}`);
          logger.info(`sheps: ${this.sheps.size}`);
          const alive = taskIds.some((taskId) => taskGivesLiveness(taskId));
          if (!alive) {
            logger.info("Breaking out of task loop (no more work).");
            break;
          }
        }
        if (this.ws.stopped) {
          logger.info("Breaking out of task loop (wallet stopped).");
          break;
        }
        await this.iterCond.wait();
      }
    } finally {
      // Reset the flag even when loading tasks or the loop itself throws,
      // so that the wallet state does not claim a loop that is gone.
      this.ws.isTaskLoopRunning = false;
    }
    logger.info("Done with task loop.");
  }

  /**
   * Start shepherding a task in the background.
   *
   * Does nothing if a shepherd is already registered for the task.
   */
  startShepherdTask(taskId: TaskIdStr): void {
    // Run in the background, no await!
    // Failures are logged here since nobody else observes this promise.
    this.internalStartShepherdTask(taskId).catch((e) => {
      const msg = e instanceof Error ? e.message : String(e);
      logger.error(`shepherd for ${taskId} failed unexpectedly: ${msg}`);
    });
  }

  /**
   * Stop and re-load all existing tasks.
   *
   * Mostly useful to interrupt all waits when time-travelling.
   */
  reload(): void {
    const tasksIds = [...this.sheps.keys()];
    logger.info(`reloading sheperd with ${tasksIds.length} tasks`);
    for (const taskId of tasksIds) {
      this.stopShepherdTask(taskId);
    }
    for (const taskId of tasksIds) {
      this.startShepherdTask(taskId);
    }
  }

  /**
   * Register a new shepherd for the task (unless one exists) and
   * run it until it finishes, is cancelled or fails.
   */
  private async internalStartShepherdTask(taskId: TaskIdStr): Promise<void> {
    logger.trace(`Starting to shepherd task ${taskId}`);
    const oldShep = this.sheps.get(taskId);
    if (oldShep) {
      logger.trace(`Already have a shepherd for ${taskId}`);
      return;
    }
    logger.trace(`Creating new shepherd for ${taskId}`);
    const newShep: ShepherdInfo = {
      cts: CancellationToken.create(),
    };
    this.sheps.set(taskId, newShep);
    try {
      await this.internalShepherdTask(taskId, newShep);
    } finally {
      logger.trace(`Done shepherding ${taskId}`);
      // Only unregister our own entry.  After a stop + start (see
      // restartShepherdTask / reload), the map already holds the
      // replacement shepherd by the time this cancelled one winds down.
      if (this.sheps.get(taskId) === newShep) {
        this.sheps.delete(taskId);
      }
      this.iterCond.trigger();
    }
  }

  /**
   * Cancel and unregister the shepherd for a task, if there is one.
   */
  stopShepherdTask(taskId: TaskIdStr): void {
    logger.trace(`Stopping shepherding of ${taskId}`);
    const oldShep = this.sheps.get(taskId);
    if (oldShep) {
      logger.trace(`Cancelling old shepherd for ${taskId}`);
      oldShep.cts.cancel();
      this.sheps.delete(taskId);
      this.iterCond.trigger();
    }
  }

  /**
   * Stop the current shepherd for a task (if any) and start a new one.
   */
  restartShepherdTask(taskId: TaskIdStr): void {
    this.stopShepherdTask(taskId);
    this.startShepherdTask(taskId);
  }

  /**
   * Delete the stored retry record of a task, restart its shepherd and
   * emit the notification produced by taskToRetryNotification(), if any.
   */
  async resetTaskRetries(taskId: TaskIdStr): Promise<void> {
    const maybeNotification = await this.ws.db.runAllStoresReadWriteTx(
      async (tx) => {
        await tx.operationRetries.delete(taskId);
        return taskToRetryNotification(this.ws, tx, taskId, undefined);
      },
    );
    this.stopShepherdTask(taskId);
    if (maybeNotification) {
      this.ws.notify(maybeNotification);
    }
    this.startShepherdTask(taskId);
  }

  /**
   * Wait for the given delay, racing against the shepherd's
   * cancellation token.
   *
   * A rejection of the raced promise is logged and swallowed, so this
   * method does not throw when the wait is interrupted.
   */
  private async wait(
    taskId: TaskIdStr,
    info: ShepherdInfo,
    delay: Duration,
  ): Promise<void> {
    try {
      await info.cts.token.racePromise(this.ws.timerGroup.resolveAfter(delay));
    } catch (e) {
      logger.info(`waiting for ${taskId} interrupted`);
    }
  }

  /**
   * Wait until the next retry time stored in the task's retry record.
   *
   * Returns immediately when no retry record exists.
   */
  private async waitForStoredRetry(
    taskId: TaskIdStr,
    info: ShepherdInfo,
  ): Promise<void> {
    const retryRecord = await this.ws.db.runReadOnlyTx(
      ["operationRetries"],
      async (tx) => {
        return tx.operationRetries.get(taskId);
      },
    );
    if (!retryRecord) {
      logger.trace("Retrying immediately.");
      return;
    }
    const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
    const delay = AbsoluteTime.remaining(t);
    logger.trace(`Waiting for ${delay.d_ms} ms`);
    await this.wait(taskId, info, delay);
  }

  /**
   * Main loop of a single shepherd: call the task handler and act on
   * its result until the task finishes, the shepherd is cancelled or
   * the wallet is stopped.
   */
  private async internalShepherdTask(
    taskId: TaskIdStr,
    info: ShepherdInfo,
  ): Promise<void> {
    while (true) {
      if (this.ws.stopped) {
        logger.trace(`Shepherd for ${taskId} stopping as wallet is stopped`);
        return;
      }
      if (info.cts.token.isCancelled) {
        logger.trace(`Shepherd for ${taskId} got cancelled`);
        return;
      }
      const isThrottled = this.throttler.applyThrottle(taskId);
      if (isThrottled) {
        logger.warn(
          `task ${taskId} throttled, this is very likely a bug in wallet-core, please report`,
        );
        logger.warn("waiting for 60 seconds");
        // Use the cancellable wait so that stopping the shepherd does
        // not have to sit out the full throttling delay.
        await this.wait(taskId, info, Duration.fromSpec({ seconds: 60 }));
        if (this.ws.stopped || info.cts.token.isCancelled) {
          logger.trace(`Shepherd for ${taskId} stopped while throttled`);
          return;
        }
      }
      const startTime = AbsoluteTime.now();
      logger.trace(`Shepherd for ${taskId} will call handler`);
      // FIXME: This should already return the retry record.
      const res = await runTaskWithErrorReporting(this.ws, taskId, async () => {
        return await callOperationHandlerForTaskId(
          this.ws,
          taskId,
          info.cts.token,
        );
      });
      switch (res.type) {
        case TaskRunResultType.Error: {
          logger.trace(`Shepherd for ${taskId} got error result.`);
          await this.waitForStoredRetry(taskId, info);
          break;
        }
        case TaskRunResultType.Backoff: {
          logger.trace(`Shepherd for ${taskId} got backoff result.`);
          await this.waitForStoredRetry(taskId, info);
          break;
        }
        case TaskRunResultType.Progress: {
          logger.trace(
            `Shepherd for ${taskId} got progress result, re-running immediately.`,
          );
          break;
        }
        case TaskRunResultType.ScheduleLater: {
          logger.trace(`Shepherd for ${taskId} got schedule-later result.`);
          const delay = AbsoluteTime.remaining(res.runAt);
          logger.trace(`Waiting for ${delay.d_ms} ms`);
          await this.wait(taskId, info, delay);
          break;
        }
        case TaskRunResultType.Finished:
          logger.trace(`Shepherd for ${taskId} got finished result.`);
          return;
        case TaskRunResultType.LongpollReturnedPending: {
          // Make sure that we are waiting a bit if long-polling returned too early.
          const endTime = AbsoluteTime.now();
          const taskDuration = AbsoluteTime.difference(endTime, startTime);
          if (
            Duration.cmp(taskDuration, Duration.fromSpec({ seconds: 20 })) < 0
          ) {
            logger.info(
              `long-poller for ${taskId} returned unexpectedly early (${taskDuration.d_ms} ms), waiting 10 seconds`,
            );
            await this.wait(taskId, info, Duration.fromSpec({ seconds: 10 }));
          } else {
            logger.info(`task ${taskId} will long-poll again`);
          }
          break;
        }
        default:
          assertUnreachable(res);
      }
    }
  }
}

/**
 * Record an error for a task: create the retry record if it does not
 * exist yet, otherwise store the error and increment the retry info.
 *
 * Emits the notification from taskToRetryNotification(), if any.
 */
async function storePendingTaskError(
  ws: InternalWalletState,
  pendingTaskId: string,
  e: TalerErrorDetail,
): Promise<void> {
  logger.info(`storing pending task error for ${pendingTaskId}`);
  const maybeNotification = await ws.db.runAllStoresReadWriteTx(async (tx) => {
    let retryRecord = await tx.operationRetries.get(pendingTaskId);
    if (!retryRecord) {
      retryRecord = {
        id: pendingTaskId,
        lastError: e,
        retryInfo: DbRetryInfo.reset(),
      };
    } else {
      retryRecord.lastError = e;
      retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
    }
    await tx.operationRetries.put(retryRecord);
    return taskToRetryNotification(ws, tx, pendingTaskId, e);
  });
  if (maybeNotification) {
    ws.notify(maybeNotification);
  }
}

/**
 * Task made progress, clear error.
 */
async function storeTaskProgress(
  ws: InternalWalletState,
  pendingTaskId: string,
): Promise<void> {
  await ws.db.runReadWriteTx(["operationRetries"], async (tx) => {
    await tx.operationRetries.delete(pendingTaskId);
  });
}

/**
 * Record that a task is still pending (backoff): clear any stored error
 * and increment the retry info, or create a fresh retry record.
 *
 * A notification is only emitted when a previously stored error
 * was cleared.
 */
async function storePendingTaskPending(
  ws: InternalWalletState,
  pendingTaskId: string,
): Promise<void> {
  const maybeNotification = await ws.db.runAllStoresReadWriteTx(async (tx) => {
    let retryRecord = await tx.operationRetries.get(pendingTaskId);
    let hadError = false;
    if (!retryRecord) {
      retryRecord = {
        id: pendingTaskId,
        retryInfo: DbRetryInfo.reset(),
      };
    } else {
      if (retryRecord.lastError) {
        hadError = true;
      }
      delete retryRecord.lastError;
      retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
    }
    await tx.operationRetries.put(retryRecord);
    if (hadError) {
      return taskToRetryNotification(ws, tx, pendingTaskId, undefined);
    } else {
      return undefined;
    }
  });
  if (maybeNotification) {
    ws.notify(maybeNotification);
  }
}

/**
 * Task finished, delete its retry record.
 */
async function storePendingTaskFinished(
  ws: InternalWalletState,
  pendingTaskId: string,
): Promise<void> {
  await ws.db.runReadWriteTx(["operationRetries"], async (tx) => {
    await tx.operationRetries.delete(pendingTaskId);
  });
}

/**
 * Run a task handler and update the task's retry record according
 * to the result.
 *
 * Exceptions thrown by the handler are converted into an error result
 * and stored as the task's last error.  The one exception is a
 * CryptoApiStoppedError while the wallet is stopped, which is returned
 * as an error result without being stored.
 */
async function runTaskWithErrorReporting(
  ws: InternalWalletState,
  opId: TaskIdStr,
  f: () => Promise<TaskRunResult>,
): Promise<TaskRunResult> {
  try {
    const resp = await f();
    switch (resp.type) {
      case TaskRunResultType.Error:
        await storePendingTaskError(ws, opId, resp.errorDetail);
        return resp;
      case TaskRunResultType.Finished:
        await storePendingTaskFinished(ws, opId);
        return resp;
      case TaskRunResultType.Backoff:
        await storePendingTaskPending(ws, opId);
        return resp;
      case TaskRunResultType.ScheduleLater:
        // Task succeeded but wants to be run again.
        await storeTaskProgress(ws, opId);
        return resp;
      case TaskRunResultType.Progress:
        await storeTaskProgress(ws, opId);
        return resp;
      case TaskRunResultType.LongpollReturnedPending:
        // Longpoll should be run again immediately.
        await storeTaskProgress(ws, opId);
        return resp;
      default:
        return assertUnreachable(resp);
    }
  } catch (e) {
    if (e instanceof CryptoApiStoppedError) {
      if (ws.stopped) {
        logger.warn("crypto API stopped during shutdown, ignoring error");
        return {
          type: TaskRunResultType.Error,
          errorDetail: makeErrorDetail(
            TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
            {},
            "Crypto API stopped during shutdown",
          ),
        };
      }
    }
    if (e instanceof TalerError) {
      logger.warn("operation processed resulted in error");
      logger.warn(`error was: ${j2s(e.errorDetail)}`);
      await storePendingTaskError(ws, opId, e.errorDetail);
      return {
        type: TaskRunResultType.Error,
        errorDetail: e.errorDetail,
      };
    } else if (e instanceof Error) {
      // This is a bug, as we expect pending operations to always
      // do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED
      // or return something.
      logger.error(`Uncaught exception: ${e.message}`);
      logger.error(`Stack: ${e.stack}`);
      const errorDetail = makeErrorDetail(
        TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
        {
          stack: e.stack,
        },
        `unexpected exception (message: ${e.message})`,
      );
      await storePendingTaskError(ws, opId, errorDetail);
      return {
        type: TaskRunResultType.Error,
        errorDetail,
      };
    } else {
      logger.error("Uncaught exception, value is not even an error.");
      const errorDetail = makeErrorDetail(
        TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
        {},
        `unexpected exception (not even an error)`,
      );
      await storePendingTaskError(ws, opId, errorDetail);
      return {
        type: TaskRunResultType.Error,
        errorDetail,
      };
    }
  }
}

/**
 * Dispatch to the handler matching the type of the given task ID.
 *
 * Throws for reward pickup tasks, which are not supported anymore.
 */
async function callOperationHandlerForTaskId(
  ws: InternalWalletState,
  taskId: TaskIdStr,
  cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
  const pending = parseTaskIdentifier(taskId);
  switch (pending.tag) {
    case PendingTaskType.ExchangeUpdate:
      return await updateExchangeFromUrlHandler(
        ws,
        pending.exchangeBaseUrl,
        cancellationToken,
      );
    case PendingTaskType.Refresh:
      return await processRefreshGroup(
        ws,
        pending.refreshGroupId,
        cancellationToken,
      );
    case PendingTaskType.Withdraw:
      return await processWithdrawalGroup(
        ws,
        pending.withdrawalGroupId,
        cancellationToken,
      );
    case PendingTaskType.Purchase:
      return await processPurchase(ws, pending.proposalId, cancellationToken);
    case PendingTaskType.Recoup:
      return await processRecoupGroup(
        ws,
        pending.recoupGroupId,
        cancellationToken,
      );
    case PendingTaskType.Deposit:
      return await processDepositGroup(
        ws,
        pending.depositGroupId,
        cancellationToken,
      );
    case PendingTaskType.Backup:
      return await processBackupForProvider(ws, pending.backupProviderBaseUrl);
    case PendingTaskType.PeerPushDebit:
      return await processPeerPushDebit(
        ws,
        pending.pursePub,
        cancellationToken,
      );
    case PendingTaskType.PeerPullCredit:
      return await processPeerPullCredit(
        ws,
        pending.pursePub,
        cancellationToken,
      );
    case PendingTaskType.PeerPullDebit:
      return await processPeerPullDebit(
        ws,
        pending.peerPullDebitId,
        cancellationToken,
      );
    case PendingTaskType.PeerPushCredit:
      return await processPeerPushCredit(
        ws,
        pending.peerPushCreditId,
        cancellationToken,
      );
    case PendingTaskType.RewardPickup:
      throw Error("not supported anymore");
    default:
      return assertUnreachable(pending);
  }
}

/**
 * Generate an appropriate error transition notification
 * for applicable tasks.
 *
 * Namely, transition notifications are generated for:
 * - exchange update errors
 * - transactions
 *
 * Backup and recoup tasks never produce a notification.
 */
async function taskToRetryNotification(
  ws: InternalWalletState,
  tx: WalletDbAllStoresReadOnlyTransaction,
  pendingTaskId: string,
  e: TalerErrorDetail | undefined,
): Promise<WalletNotification | undefined> {
  const parsedTaskId = parseTaskIdentifier(pendingTaskId);
  switch (parsedTaskId.tag) {
    case PendingTaskType.ExchangeUpdate:
      return makeExchangeRetryNotification(ws, tx, pendingTaskId, e);
    case PendingTaskType.PeerPullCredit:
    case PendingTaskType.PeerPullDebit:
    case PendingTaskType.Withdraw:
    case PendingTaskType.PeerPushCredit:
    case PendingTaskType.Deposit:
    case PendingTaskType.Refresh:
    case PendingTaskType.RewardPickup:
    case PendingTaskType.PeerPushDebit:
    case PendingTaskType.Purchase:
      return makeTransactionRetryNotification(ws, tx, pendingTaskId, e);
    case PendingTaskType.Backup:
    case PendingTaskType.Recoup:
      return undefined;
    default:
      return assertUnreachable(parsedTaskId);
  }
}

/**
 * Look up the record behind a transaction ID and compute its
 * current transaction state.
 *
 * Returns undefined when the record does not exist.  Throws when the
 * identifier cannot be parsed and for recoup transactions, which are
 * not supported here yet.
 */
async function getTransactionState(
  ws: InternalWalletState,
  tx: WalletDbReadOnlyTransaction<
    [
      "depositGroups",
      "withdrawalGroups",
      "purchases",
      "refundGroups",
      "peerPullCredit",
      "peerPullDebit",
      "peerPushDebit",
      "peerPushCredit",
      "rewards",
      "refreshGroups",
    ]
  >,
  transactionId: string,
): Promise<TransactionState | undefined> {
  const parsedTxId = parseTransactionIdentifier(transactionId);
  if (!parsedTxId) {
    throw Error("invalid tx identifier");
  }
  switch (parsedTxId.tag) {
    case TransactionType.Deposit: {
      const rec = await tx.depositGroups.get(parsedTxId.depositGroupId);
      if (!rec) {
        return undefined;
      }
      return computeDepositTransactionStatus(rec);
    }
    case TransactionType.InternalWithdrawal:
    case TransactionType.Withdrawal: {
      const rec = await tx.withdrawalGroups.get(parsedTxId.withdrawalGroupId);
      if (!rec) {
        return undefined;
      }
      return computeWithdrawalTransactionStatus(rec);
    }
    case TransactionType.Payment: {
      const rec = await tx.purchases.get(parsedTxId.proposalId);
      if (!rec) {
        return undefined;
      }
      return computePayMerchantTransactionState(rec);
    }
    case TransactionType.Refund: {
      const rec = await tx.refundGroups.get(parsedTxId.refundGroupId);
      if (!rec) {
        return undefined;
      }
      return computeRefundTransactionState(rec);
    }
    case TransactionType.PeerPullCredit: {
      const rec = await tx.peerPullCredit.get(parsedTxId.pursePub);
      if (!rec) {
        return undefined;
      }
      return computePeerPullCreditTransactionState(rec);
    }
    case TransactionType.PeerPullDebit: {
      const rec = await tx.peerPullDebit.get(parsedTxId.peerPullDebitId);
      if (!rec) {
        return undefined;
      }
      return computePeerPullDebitTransactionState(rec);
    }
    case TransactionType.PeerPushCredit: {
      const rec = await tx.peerPushCredit.get(parsedTxId.peerPushCreditId);
      if (!rec) {
        return undefined;
      }
      return computePeerPushCreditTransactionState(rec);
    }
    case TransactionType.PeerPushDebit: {
      const rec = await tx.peerPushDebit.get(parsedTxId.pursePub);
      if (!rec) {
        return undefined;
      }
      return computePeerPushDebitTransactionState(rec);
    }
    case TransactionType.Refresh: {
      const rec = await tx.refreshGroups.get(parsedTxId.refreshGroupId);
      if (!rec) {
        return undefined;
      }
      return computeRefreshTransactionState(rec);
    }
    case TransactionType.Reward: {
      const rec = await tx.rewards.get(parsedTxId.walletRewardId);
      if (!rec) {
        return undefined;
      }
      return computeRewardTransactionStatus(rec);
    }
    case TransactionType.Recoup:
      throw Error("not yet supported");
    default:
      assertUnreachable(parsedTxId);
  }
}

/**
 * Build a transaction state transition notification for the transaction
 * that belongs to the given task.
 *
 * The old and new state in the notification are identical; only the
 * optional error info reflects the change.  Returns undefined when the
 * task has no corresponding transaction or the transaction is not found.
 */
async function makeTransactionRetryNotification(
  ws: InternalWalletState,
  tx: WalletDbAllStoresReadOnlyTransaction,
  pendingTaskId: string,
  e: TalerErrorDetail | undefined,
): Promise<WalletNotification | undefined> {
  const txId = convertTaskToTransactionId(pendingTaskId);
  if (!txId) {
    return undefined;
  }
  const txState = await getTransactionState(ws, tx, txId);
  if (!txState) {
    return undefined;
  }
  const notif: WalletNotification = {
    type: NotificationType.TransactionStateTransition,
    transactionId: txId,
    oldTxState: txState,
    newTxState: txState,
  };
  if (e) {
    notif.errorInfo = {
      code: e.code as number,
      hint: e.hint,
    };
  }
  return notif;
}

/**
 * Build an exchange state transition notification for an exchange
 * update task.
 *
 * The old and new exchange state in the notification are identical.
 * Returns undefined when the exchange record is not found; throws when
 * the task ID is not an exchange update task.
 */
async function makeExchangeRetryNotification(
  ws: InternalWalletState,
  tx: WalletDbAllStoresReadOnlyTransaction,
  pendingTaskId: string,
  e: TalerErrorDetail | undefined,
): Promise<WalletNotification | undefined> {
  logger.info("making exchange retry notification");
  const parsedTaskId = parseTaskIdentifier(pendingTaskId);
  if (parsedTaskId.tag !== PendingTaskType.ExchangeUpdate) {
    throw Error("invalid task identifier");
  }
  const rec = await tx.exchanges.get(parsedTaskId.exchangeBaseUrl);
  if (!rec) {
    logger.info(`exchange ${parsedTaskId.exchangeBaseUrl} not found`);
    return undefined;
  }
  const notif: WalletNotification = {
    type: NotificationType.ExchangeStateTransition,
    exchangeBaseUrl: parsedTaskId.exchangeBaseUrl,
    oldExchangeState: getExchangeState(rec),
    newExchangeState: getExchangeState(rec),
  };
  if (e) {
    notif.errorInfo = {
      code: e.code as number,
      hint: e.hint,
    };
  }
  return notif;
}

/**
 * Convert the task ID for a task that processes a transaction into
 * the ID for the transaction.
 *
 * Returns undefined for task types without a matching case below.
 */
function convertTaskToTransactionId(
  taskId: string,
): TransactionIdStr | undefined {
  const parsedTaskId = parseTaskIdentifier(taskId);
  switch (parsedTaskId.tag) {
    case PendingTaskType.PeerPullCredit:
      return constructTransactionIdentifier({
        tag: TransactionType.PeerPullCredit,
        pursePub: parsedTaskId.pursePub,
      });
    case PendingTaskType.PeerPullDebit:
      return constructTransactionIdentifier({
        tag: TransactionType.PeerPullDebit,
        peerPullDebitId: parsedTaskId.peerPullDebitId,
      });
    // FIXME: This doesn't distinguish internal-withdrawal.
    // Maybe we should have a different task type for that as well?
    // Or maybe transaction IDs should be valid task identifiers?
    case PendingTaskType.Withdraw:
      return constructTransactionIdentifier({
        tag: TransactionType.Withdrawal,
        withdrawalGroupId: parsedTaskId.withdrawalGroupId,
      });
    case PendingTaskType.PeerPushCredit:
      return constructTransactionIdentifier({
        tag: TransactionType.PeerPushCredit,
        peerPushCreditId: parsedTaskId.peerPushCreditId,
      });
    case PendingTaskType.Deposit:
      return constructTransactionIdentifier({
        tag: TransactionType.Deposit,
        depositGroupId: parsedTaskId.depositGroupId,
      });
    case PendingTaskType.Refresh:
      return constructTransactionIdentifier({
        tag: TransactionType.Refresh,
        refreshGroupId: parsedTaskId.refreshGroupId,
      });
    case PendingTaskType.RewardPickup:
      return constructTransactionIdentifier({
        tag: TransactionType.Reward,
        walletRewardId: parsedTaskId.walletRewardId,
      });
    case PendingTaskType.PeerPushDebit:
      return constructTransactionIdentifier({
        tag: TransactionType.PeerPushDebit,
        pursePub: parsedTaskId.pursePub,
      });
    case PendingTaskType.Purchase:
      return constructTransactionIdentifier({
        tag: TransactionType.Payment,
        proposalId: parsedTaskId.proposalId,
      });
    default:
      return undefined;
  }
}

/**
 * Result of getActiveTaskIds().
 */
export interface ActiveTaskIdsResult {
  taskIds: TaskIdStr[];
}

/**
 * Collect the task IDs for all records whose status lies in the active
 * range (OPERATION_STATUS_ACTIVE_FIRST to OPERATION_STATUS_ACTIVE_LAST),
 * plus one exchange update task for every stored exchange.
 */
export async function getActiveTaskIds(
  ws: InternalWalletState,
): Promise<ActiveTaskIdsResult> {
  const res: ActiveTaskIdsResult = {
    taskIds: [],
  };
  // Nothing is written below, so a read-only transaction is sufficient.
  await ws.db.runReadOnlyTx(
    [
      "exchanges",
      "refreshGroups",
      "withdrawalGroups",
      "purchases",
      "depositGroups",
      "recoupGroups",
      "peerPullCredit",
      "peerPushDebit",
      "peerPullDebit",
      "peerPushCredit",
    ],
    async (tx) => {
      const active = GlobalIDB.KeyRange.bound(
        OPERATION_STATUS_ACTIVE_FIRST,
        OPERATION_STATUS_ACTIVE_LAST,
      );

      // Withdrawals
      {
        const activeRecs =
          await tx.withdrawalGroups.indexes.byStatus.getAll(active);
        for (const rec of activeRecs) {
          const taskId = constructTaskIdentifier({
            tag: PendingTaskType.Withdraw,
            withdrawalGroupId: rec.withdrawalGroupId,
          });
          res.taskIds.push(taskId);
        }
      }

      // Deposits
      {
        const activeRecs =
          await tx.depositGroups.indexes.byStatus.getAll(active);
        for (const rec of activeRecs) {
          const taskId = constructTaskIdentifier({
            tag: PendingTaskType.Deposit,
            depositGroupId: rec.depositGroupId,
          });
          res.taskIds.push(taskId);
        }
      }

      // Refreshes
      {
        const activeRecs =
          await tx.refreshGroups.indexes.byStatus.getAll(active);
        for (const rec of activeRecs) {
          const taskId = constructTaskIdentifier({
            tag: PendingTaskType.Refresh,
            refreshGroupId: rec.refreshGroupId,
          });
          res.taskIds.push(taskId);
        }
      }

      // Purchases
      {
        const activeRecs = await tx.purchases.indexes.byStatus.getAll(active);
        for (const rec of activeRecs) {
          const taskId = constructTaskIdentifier({
            tag: PendingTaskType.Purchase,
            proposalId: rec.proposalId,
          });
          res.taskIds.push(taskId);
        }
      }

      // peer-push-debit
      {
        const activeRecs =
          await tx.peerPushDebit.indexes.byStatus.getAll(active);
        for (const rec of activeRecs) {
          const taskId = constructTaskIdentifier({
            tag: PendingTaskType.PeerPushDebit,
            pursePub: rec.pursePub,
          });
          res.taskIds.push(taskId);
        }
      }

      // peer-push-credit
      {
        const activeRecs =
          await tx.peerPushCredit.indexes.byStatus.getAll(active);
        for (const rec of activeRecs) {
          const taskId = constructTaskIdentifier({
            tag: PendingTaskType.PeerPushCredit,
            peerPushCreditId: rec.peerPushCreditId,
          });
          res.taskIds.push(taskId);
        }
      }

      // peer-pull-debit
      {
        const activeRecs =
          await tx.peerPullDebit.indexes.byStatus.getAll(active);
        for (const rec of activeRecs) {
          const taskId = constructTaskIdentifier({
            tag: PendingTaskType.PeerPullDebit,
            peerPullDebitId: rec.peerPullDebitId,
          });
          res.taskIds.push(taskId);
        }
      }

      // peer-pull-credit
      {
        const activeRecs =
          await tx.peerPullCredit.indexes.byStatus.getAll(active);
        for (const rec of activeRecs) {
          const taskId = constructTaskIdentifier({
            tag: PendingTaskType.PeerPullCredit,
            pursePub: rec.pursePub,
          });
          res.taskIds.push(taskId);
        }
      }

      // recoup
      {
        const activeRecs =
          await tx.recoupGroups.indexes.byStatus.getAll(active);
        for (const rec of activeRecs) {
          const taskId = constructTaskIdentifier({
            tag: PendingTaskType.Recoup,
            recoupGroupId: rec.recoupGroupId,
          });
          res.taskIds.push(taskId);
        }
      }

      // exchange update
      {
        const exchanges = await tx.exchanges.getAll();
        for (const rec of exchanges) {
          const taskIdUpdate = constructTaskIdentifier({
            tag: PendingTaskType.ExchangeUpdate,
            exchangeBaseUrl: rec.baseUrl,
          });
          res.taskIds.push(taskIdUpdate);
        }
      }
    },
  );
  return res;
}