diff options
author | Florian Dold <florian@dold.me> | 2024-02-13 10:53:43 +0100 |
---|---|---|
committer | Florian Dold <florian@dold.me> | 2024-02-15 21:56:54 +0100 |
commit | 70a803038f1cbe05dc4779bdd87376fd073421be (patch) | |
tree | 6607d69f6906ada9f912e31d9a9e3b65560a7326 /packages/taler-wallet-core/src/shepherd.ts | |
parent | 2c17e98c336d96f955ec82ad0a1b164e3da90103 (diff) | |
download | wallet-core-70a803038f1cbe05dc4779bdd87376fd073421be.tar.gz wallet-core-70a803038f1cbe05dc4779bdd87376fd073421be.tar.bz2 wallet-core-70a803038f1cbe05dc4779bdd87376fd073421be.zip |
implement task shepherd, many small fixes and tweaksdev/dold/task-shepherd
Diffstat (limited to 'packages/taler-wallet-core/src/shepherd.ts')
-rw-r--r-- | packages/taler-wallet-core/src/shepherd.ts | 851 |
1 files changed, 851 insertions, 0 deletions
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts new file mode 100644 index 000000000..d1648acc7 --- /dev/null +++ b/packages/taler-wallet-core/src/shepherd.ts @@ -0,0 +1,851 @@ +/* + This file is part of GNU Taler + (C) 2024 Taler Systems SA + + GNU Taler is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + */ + +/** + * Imports. + */ +import { GlobalIDB } from "@gnu-taler/idb-bridge"; +import { + AbsoluteTime, + CancellationToken, + Duration, + Logger, + NotificationType, + RetryLoopOpts, + TalerError, + TalerErrorCode, + TalerErrorDetail, + TaskThrottler, + TransactionIdStr, + TransactionType, + WalletNotification, + assertUnreachable, + j2s, + makeErrorDetail, +} from "@gnu-taler/taler-util"; +import { CryptoApiStoppedError } from "./crypto/workers/crypto-dispatcher.js"; +import { + GetReadOnlyAccess, + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, + WalletStoresV1, + timestampAbsoluteFromDb, +} from "./index.js"; +import { InternalWalletState } from "./internal-wallet-state.js"; +import { processBackupForProvider } from "./operations/backup/index.js"; +import { + DbRetryInfo, + TaskRunResult, + TaskRunResultType, + constructTaskIdentifier, + getExchangeState, + parseTaskIdentifier, +} from "./operations/common.js"; +import { processDepositGroup } from "./operations/deposits.js"; +import { updateExchangeFromUrlHandler } from "./operations/exchanges.js"; +import { processPurchase } from "./operations/pay-merchant.js"; +import { processPeerPullCredit } from "./operations/pay-peer-pull-credit.js"; +import { processPeerPullDebit } from "./operations/pay-peer-pull-debit.js"; +import { processPeerPushCredit } from "./operations/pay-peer-push-credit.js"; +import { processPeerPushDebit } from "./operations/pay-peer-push-debit.js"; +import { processRecoupGroup } from "./operations/recoup.js"; +import { processRefreshGroup } from "./operations/refresh.js"; +import { constructTransactionIdentifier } from "./operations/transactions.js"; +import { processWithdrawalGroup } from "./operations/withdraw.js"; +import { PendingTaskType, TaskId } from "./pending-types.js"; +import { AsyncCondition } from "./util/promiseUtils.js"; + +const logger = new Logger("shepherd.ts"); + +/** + * Info about one task being shepherded. + */ +interface ShepherdInfo { + cts: CancellationToken.Source; +} + +export class TaskScheduler { + private sheps: Map<TaskId, ShepherdInfo> = new Map(); + + private iterCond = new AsyncCondition(); + + private throttler = new TaskThrottler(); + + constructor(private ws: InternalWalletState) {} + + async loadTasksFromDb(): Promise<void> { + const activeTasks = await getActiveTaskIds(this.ws); + + for (const tid of activeTasks.taskIds) { + this.startShepherdTask(tid); + } + } + + async run(opts: RetryLoopOpts = {}): Promise<void> { + logger.info("Running task loop."); + this.ws.isTaskLoopRunning = true; + await this.loadTasksFromDb(); + while (true) { + if (opts.stopWhenDone && this.sheps.size === 0) { + logger.info("Breaking out of task loop (no more work)."); + break; + } + if (this.ws.stopped) { + logger.info("Breaking out of task loop (wallet stopped)."); + break; + } + await this.iterCond.wait(); + } + this.ws.isTaskLoopRunning = false; + logger.info("Done with task loop."); + } + + startShepherdTask(taskId: TaskId): void { + // Run in the background, no await! + this.internalStartShepherdTask(taskId); + } + + /** + * Stop and re-load all existing tasks. + * + * Mostly useful to interrupt all waits when time-travelling. + */ + reload() { + const tasksIds = [...this.sheps.keys()]; + logger.info(`reloading sheperd with ${tasksIds.length} tasks`); + for (const taskId of tasksIds) { + this.stopShepherdTask(taskId); + } + for (const taskId of tasksIds) { + this.startShepherdTask(taskId); + } + } + + private async internalStartShepherdTask(taskId: TaskId): Promise<void> { + logger.trace(`Starting to shepherd task ${taskId}`); + const oldShep = this.sheps.get(taskId); + if (oldShep) { + logger.trace(`Already have a shepherd for ${taskId}`); + return; + } + logger.trace(`Creating new shepherd for ${taskId}`); + const newShep: ShepherdInfo = { + cts: CancellationToken.create(), + }; + this.sheps.set(taskId, newShep); + try { + await this.internalShepherdTask(taskId, newShep); + } finally { + logger.trace(`Done shepherding ${taskId}`); + this.sheps.delete(taskId); + this.iterCond.trigger(); + } + } + + stopShepherdTask(taskId: TaskId): void { + logger.trace(`Stopping shepherding of ${taskId}`); + const oldShep = this.sheps.get(taskId); + if (oldShep) { + logger.trace(`Cancelling old shepherd for ${taskId}`); + oldShep.cts.cancel(); + this.sheps.delete(taskId); + this.iterCond.trigger(); + } + } + + restartShepherdTask(taskId: TaskId): void { + this.stopShepherdTask(taskId); + this.startShepherdTask(taskId); + } + + async resetTaskRetries(taskId: TaskId): Promise<void> { + const maybeNotification = await this.ws.db + .mktxAll() + .runReadWrite(async (tx) => { + await tx.operationRetries.delete(taskId); + return taskToRetryNotification(this.ws, tx, taskId, undefined); + }); + this.stopShepherdTask(taskId); + if (maybeNotification) { + this.ws.notify(maybeNotification); + } + this.startShepherdTask(taskId); + } + + private async internalShepherdTask( + taskId: TaskId, + info: ShepherdInfo, + ): Promise<void> { + while (true) { + if (this.ws.stopped) { + logger.trace(`Shepherd for ${taskId} stopping as wallet is stopped`); + return; + } + if (info.cts.token.isCancelled) { + logger.trace(`Shepherd for ${taskId} got cancelled`); + return; + } + const isThrottled = this.throttler.applyThrottle(taskId); + if (isThrottled) { + logger.warn( + `task ${taskId} throttled, this is very likely a bug in wallet-core, please report`, + ); + logger.warn("waiting for 60 seconds"); + await this.ws.timerGroup.resolveAfter( + Duration.fromSpec({ seconds: 60 }), + ); + } + logger.trace(`Shepherd for ${taskId} will call handler`); + // FIXME: This should already return the retry record. + const res = await runTaskWithErrorReporting(this.ws, taskId, async () => { + return await callOperationHandlerForTaskId( + this.ws, + taskId, + info.cts.token, + ); + }); + const retryRecord = await this.ws.db.runReadOnlyTx( + ["operationRetries"], + async (tx) => { + return tx.operationRetries.get(taskId); + }, + ); + switch (res.type) { + case TaskRunResultType.Error: { + logger.trace(`Shepherd for ${taskId} got error result.`); + if (retryRecord) { + let delay: Duration; + const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry); + delay = AbsoluteTime.remaining(t); + logger.trace(`Waiting for ${delay.d_ms} ms`); + try { + await info.cts.token.racePromise( + this.ws.timerGroup.resolveAfter(delay), + ); + } catch (e) { + logger.info(`waiting for ${taskId} interrupted`); + } + } else { + logger.trace("Retrying immediately."); + } + break; + } + case TaskRunResultType.Backoff: { + logger.trace(`Shepherd for ${taskId} got backoff result.`); + if (retryRecord) { + let delay: Duration; + const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry); + delay = AbsoluteTime.remaining(t); + logger.trace(`Waiting for ${delay.d_ms} ms`); + try { + await info.cts.token.racePromise( + this.ws.timerGroup.resolveAfter(delay), + ); + } catch (e) { + logger.info(`waiting for ${taskId} interrupted`); + } + } else { + logger.trace("Retrying immediately."); + } + break; + } + case TaskRunResultType.Progress: { + logger.trace( + `Shepherd for ${taskId} got progress result, re-running immediately.`, + ); + break; + } + case TaskRunResultType.ScheduleLater: + logger.trace(`Shepherd for ${taskId} got schedule-later result.`); + const delay = AbsoluteTime.remaining(res.runAt); + logger.trace(`Waiting for ${delay.d_ms} ms`); + try { + await info.cts.token.racePromise( + this.ws.timerGroup.resolveAfter(delay), + ); + } catch (e) { + logger.info(`waiting for ${taskId} interrupted`); + } + break; + case TaskRunResultType.Finished: + logger.trace(`Shepherd for ${taskId} got finished result.`); + return; + default: + assertUnreachable(res); + } + } + } +} + +async function storePendingTaskError( + ws: InternalWalletState, + pendingTaskId: string, + e: TalerErrorDetail, +): Promise<void> { + logger.info(`storing pending task error for ${pendingTaskId}`); + const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => { + let retryRecord = await tx.operationRetries.get(pendingTaskId); + if (!retryRecord) { + retryRecord = { + id: pendingTaskId, + lastError: e, + retryInfo: DbRetryInfo.reset(), + }; + } else { + retryRecord.lastError = e; + retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo); + } + await tx.operationRetries.put(retryRecord); + return taskToRetryNotification(ws, tx, pendingTaskId, e); + }); + if (maybeNotification) { + ws.notify(maybeNotification); + } +} + +/** + * Task made progress, clear error. + */ +async function storeTaskProgress( + ws: InternalWalletState, + pendingTaskId: string, +): Promise<void> { + await ws.db.mktxAll().runReadWrite(async (tx) => { + await tx.operationRetries.delete(pendingTaskId); + }); +} + +async function storePendingTaskPending( + ws: InternalWalletState, + pendingTaskId: string, +): Promise<void> { + const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => { + let retryRecord = await tx.operationRetries.get(pendingTaskId); + let hadError = false; + if (!retryRecord) { + retryRecord = { + id: pendingTaskId, + retryInfo: DbRetryInfo.reset(), + }; + } else { + if (retryRecord.lastError) { + hadError = true; + } + delete retryRecord.lastError; + retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo); + } + await tx.operationRetries.put(retryRecord); + if (hadError) { + return taskToRetryNotification(ws, tx, pendingTaskId, undefined); + } else { + return undefined; + } + }); + if (maybeNotification) { + ws.notify(maybeNotification); + } +} + +async function storePendingTaskFinished( + ws: InternalWalletState, + pendingTaskId: string, +): Promise<void> { + await ws.db + .mktx((x) => [x.operationRetries]) + .runReadWrite(async (tx) => { + await tx.operationRetries.delete(pendingTaskId); + }); +} + +async function runTaskWithErrorReporting( + ws: InternalWalletState, + opId: TaskId, + f: () => Promise<TaskRunResult>, +): Promise<TaskRunResult> { + let maybeError: TalerErrorDetail | undefined; + try { + const resp = await f(); + switch (resp.type) { + case TaskRunResultType.Error: + await storePendingTaskError(ws, opId, resp.errorDetail); + return resp; + case TaskRunResultType.Finished: + await storePendingTaskFinished(ws, opId); + return resp; + case TaskRunResultType.Backoff: + await storePendingTaskPending(ws, opId); + return resp; + case TaskRunResultType.ScheduleLater: + // Task succeeded but wants to be run again. + await storeTaskProgress(ws, opId); + return resp; + case TaskRunResultType.Progress: + await storeTaskProgress(ws, opId); + return resp; + } + } catch (e) { + if (e instanceof CryptoApiStoppedError) { + if (ws.stopped) { + logger.warn("crypto API stopped during shutdown, ignoring error"); + return { + type: TaskRunResultType.Error, + errorDetail: makeErrorDetail( + TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, + {}, + "Crypto API stopped during shutdown", + ), + }; + } + } + if (e instanceof TalerError) { + logger.warn("operation processed resulted in error"); + logger.warn(`error was: ${j2s(e.errorDetail)}`); + maybeError = e.errorDetail; + await storePendingTaskError(ws, opId, maybeError!); + return { + type: TaskRunResultType.Error, + errorDetail: e.errorDetail, + }; + } else if (e instanceof Error) { + // This is a bug, as we expect pending operations to always + // do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED + // or return something. + logger.error(`Uncaught exception: ${e.message}`); + logger.error(`Stack: ${e.stack}`); + maybeError = makeErrorDetail( + TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, + { + stack: e.stack, + }, + `unexpected exception (message: ${e.message})`, + ); + await storePendingTaskError(ws, opId, maybeError); + return { + type: TaskRunResultType.Error, + errorDetail: maybeError, + }; + } else { + logger.error("Uncaught exception, value is not even an error."); + maybeError = makeErrorDetail( + TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, + {}, + `unexpected exception (not even an error)`, + ); + await storePendingTaskError(ws, opId, maybeError); + return { + type: TaskRunResultType.Error, + errorDetail: maybeError, + }; + } + } +} + +async function callOperationHandlerForTaskId( + ws: InternalWalletState, + taskId: TaskId, + cancellationToken: CancellationToken, +): Promise<TaskRunResult> { + const pending = parseTaskIdentifier(taskId); + switch (pending.tag) { + case PendingTaskType.ExchangeUpdate: + return await updateExchangeFromUrlHandler( + ws, + pending.exchangeBaseUrl, + cancellationToken, + ); + case PendingTaskType.Refresh: + return await processRefreshGroup( + ws, + pending.refreshGroupId, + cancellationToken, + ); + case PendingTaskType.Withdraw: + return await processWithdrawalGroup( + ws, + pending.withdrawalGroupId, + cancellationToken, + ); + case PendingTaskType.Purchase: + return await processPurchase(ws, pending.proposalId); + case PendingTaskType.Recoup: + return await processRecoupGroup(ws, pending.recoupGroupId); + case PendingTaskType.Deposit: + return await processDepositGroup( + ws, + pending.depositGroupId, + cancellationToken, + ); + case PendingTaskType.Backup: + return await processBackupForProvider(ws, pending.backupProviderBaseUrl); + case PendingTaskType.PeerPushDebit: + return await processPeerPushDebit( + ws, + pending.pursePub, + cancellationToken, + ); + case PendingTaskType.PeerPullCredit: + return await processPeerPullCredit( + ws, + pending.pursePub, + cancellationToken, + ); + case PendingTaskType.PeerPullDebit: + return await processPeerPullDebit(ws, pending.peerPullDebitId); + case PendingTaskType.PeerPushCredit: + return await processPeerPushCredit( + ws, + pending.peerPushCreditId, + cancellationToken, + ); + case PendingTaskType.RewardPickup: + throw Error("not supported anymore"); + default: + return assertUnreachable(pending); + } + throw Error(`not reached ${pending.tag}`); +} + +/** + * Generate an appropriate error transition notification + * for applicable tasks. + * + * Namely, transition notifications are generated for: + * - exchange update errors + * - transactions + */ +async function taskToRetryNotification( + ws: InternalWalletState, + tx: GetReadOnlyAccess<typeof WalletStoresV1>, + pendingTaskId: string, + e: TalerErrorDetail | undefined, +): Promise<WalletNotification | undefined> { + const parsedTaskId = parseTaskIdentifier(pendingTaskId); + + switch (parsedTaskId.tag) { + case PendingTaskType.ExchangeUpdate: + return makeExchangeRetryNotification(ws, tx, pendingTaskId, e); + case PendingTaskType.PeerPullCredit: + case PendingTaskType.PeerPullDebit: + case PendingTaskType.Withdraw: + case PendingTaskType.PeerPushCredit: + case PendingTaskType.Deposit: + case PendingTaskType.Refresh: + case PendingTaskType.RewardPickup: + case PendingTaskType.PeerPushDebit: + case PendingTaskType.Purchase: + return makeTransactionRetryNotification(ws, tx, pendingTaskId, e); + case PendingTaskType.Backup: + case PendingTaskType.Recoup: + return undefined; + } +} + +async function makeTransactionRetryNotification( + ws: InternalWalletState, + tx: GetReadOnlyAccess<typeof WalletStoresV1>, + pendingTaskId: string, + e: TalerErrorDetail | undefined, +): Promise<WalletNotification | undefined> { + const txId = convertTaskToTransactionId(pendingTaskId); + if (!txId) { + return undefined; + } + const txState = await ws.getTransactionState(ws, tx, txId); + if (!txState) { + return undefined; + } + const notif: WalletNotification = { + type: NotificationType.TransactionStateTransition, + transactionId: txId, + oldTxState: txState, + newTxState: txState, + }; + if (e) { + notif.errorInfo = { + code: e.code as number, + hint: e.hint, + }; + } + return notif; +} + +async function makeExchangeRetryNotification( + ws: InternalWalletState, + tx: GetReadOnlyAccess<typeof WalletStoresV1>, + pendingTaskId: string, + e: TalerErrorDetail | undefined, +): Promise<WalletNotification | undefined> { + logger.info("making exchange retry notification"); + const parsedTaskId = parseTaskIdentifier(pendingTaskId); + if (parsedTaskId.tag !== PendingTaskType.ExchangeUpdate) { + throw Error("invalid task identifier"); + } + const rec = await tx.exchanges.get(parsedTaskId.exchangeBaseUrl); + + if (!rec) { + logger.info(`exchange ${parsedTaskId.exchangeBaseUrl} not found`); + return undefined; + } + + const notif: WalletNotification = { + type: NotificationType.ExchangeStateTransition, + exchangeBaseUrl: parsedTaskId.exchangeBaseUrl, + oldExchangeState: getExchangeState(rec), + newExchangeState: getExchangeState(rec), + }; + if (e) { + notif.errorInfo = { + code: e.code as number, + hint: e.hint, + }; + } + return notif; +} + +/** + * Convert the task ID for a task that processes a transaction int + * the ID for the transaction. + */ +function convertTaskToTransactionId( + taskId: string, +): TransactionIdStr | undefined { + const parsedTaskId = parseTaskIdentifier(taskId); + switch (parsedTaskId.tag) { + case PendingTaskType.PeerPullCredit: + return constructTransactionIdentifier({ + tag: TransactionType.PeerPullCredit, + pursePub: parsedTaskId.pursePub, + }); + case PendingTaskType.PeerPullDebit: + return constructTransactionIdentifier({ + tag: TransactionType.PeerPullDebit, + peerPullDebitId: parsedTaskId.peerPullDebitId, + }); + // FIXME: This doesn't distinguish internal-withdrawal. + // Maybe we should have a different task type for that as well? + // Or maybe transaction IDs should be valid task identifiers? + case PendingTaskType.Withdraw: + return constructTransactionIdentifier({ + tag: TransactionType.Withdrawal, + withdrawalGroupId: parsedTaskId.withdrawalGroupId, + }); + case PendingTaskType.PeerPushCredit: + return constructTransactionIdentifier({ + tag: TransactionType.PeerPushCredit, + peerPushCreditId: parsedTaskId.peerPushCreditId, + }); + case PendingTaskType.Deposit: + return constructTransactionIdentifier({ + tag: TransactionType.Deposit, + depositGroupId: parsedTaskId.depositGroupId, + }); + case PendingTaskType.Refresh: + return constructTransactionIdentifier({ + tag: TransactionType.Refresh, + refreshGroupId: parsedTaskId.refreshGroupId, + }); + case PendingTaskType.RewardPickup: + return constructTransactionIdentifier({ + tag: TransactionType.Reward, + walletRewardId: parsedTaskId.walletRewardId, + }); + case PendingTaskType.PeerPushDebit: + return constructTransactionIdentifier({ + tag: TransactionType.PeerPushDebit, + pursePub: parsedTaskId.pursePub, + }); + case PendingTaskType.Purchase: + return constructTransactionIdentifier({ + tag: TransactionType.Payment, + proposalId: parsedTaskId.proposalId, + }); + default: + return undefined; + } +} + +export interface ActiveTaskIdsResult { + taskIds: TaskId[]; +} + +export async function getActiveTaskIds( + ws: InternalWalletState, +): Promise<ActiveTaskIdsResult> { + const res: ActiveTaskIdsResult = { + taskIds: [], + }; + await ws.db + .mktx((x) => [ + x.exchanges, + x.refreshGroups, + x.withdrawalGroups, + x.purchases, + x.depositGroups, + x.recoupGroups, + x.peerPullCredit, + x.peerPushDebit, + x.peerPullDebit, + x.peerPushCredit, + ]) + .runReadWrite(async (tx) => { + const active = GlobalIDB.KeyRange.bound( + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, + ); + + // Withdrawals + + { + const activeRecs = + await tx.withdrawalGroups.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Withdraw, + withdrawalGroupId: rec.withdrawalGroupId, + }); + res.taskIds.push(taskId); + } + } + + // Deposits + + { + const activeRecs = + await tx.depositGroups.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Deposit, + depositGroupId: rec.depositGroupId, + }); + res.taskIds.push(taskId); + } + } + + // Refreshes + + { + const activeRecs = + await tx.refreshGroups.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Refresh, + refreshGroupId: rec.refreshGroupId, + }); + res.taskIds.push(taskId); + } + } + + // Purchases + + { + const activeRecs = await tx.purchases.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Purchase, + proposalId: rec.proposalId, + }); + res.taskIds.push(taskId); + } + } + + // peer-push-debit + + { + const activeRecs = + await tx.peerPushDebit.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.PeerPushDebit, + pursePub: rec.pursePub, + }); + res.taskIds.push(taskId); + } + } + + // peer-push-credit + + { + const activeRecs = + await tx.peerPushCredit.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.PeerPushCredit, + peerPushCreditId: rec.peerPushCreditId, + }); + res.taskIds.push(taskId); + } + } + + // peer-pull-debit + + { + const activeRecs = + await tx.peerPullDebit.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.PeerPullDebit, + peerPullDebitId: rec.peerPullDebitId, + }); + res.taskIds.push(taskId); + } + } + + // peer-pull-credit + + { + const activeRecs = + await tx.peerPullCredit.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.PeerPullCredit, + pursePub: rec.pursePub, + }); + res.taskIds.push(taskId); + } + } + + // recoup + + { + const activeRecs = + await tx.recoupGroups.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Recoup, + recoupGroupId: rec.recoupGroupId, + }); + res.taskIds.push(taskId); + } + } + + // exchange update + + { + const exchanges = await tx.exchanges.getAll(); + for (const rec of exchanges) { + const taskIdUpdate = constructTaskIdentifier({ + tag: PendingTaskType.ExchangeUpdate, + exchangeBaseUrl: rec.baseUrl, + }); + res.taskIds.push(taskIdUpdate); + } + } + + // FIXME: Recoup! + }); + + return res; +} |