From 70a803038f1cbe05dc4779bdd87376fd073421be Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Tue, 13 Feb 2024 10:53:43 +0100 Subject: implement task shepherd, many small fixes and tweaks --- packages/taler-wallet-core/src/wallet.ts | 244 ++----------------------------- 1 file changed, 15 insertions(+), 229 deletions(-) (limited to 'packages/taler-wallet-core/src/wallet.ts') diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index 42aa8cdfc..0246597be 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -24,7 +24,6 @@ */ import { IDBFactory } from "@gnu-taler/idb-bridge"; import { - AbsoluteTime, AmountString, Amounts, CoinDumpJson, @@ -33,7 +32,6 @@ import { CreateStoredBackupResponse, DeleteStoredBackupRequest, DenominationInfo, - Duration, ExchangesShortListResponse, GetCurrencySpecificationResponse, InitResponse, @@ -42,15 +40,14 @@ import { ListGlobalCurrencyAuditorsResponse, ListGlobalCurrencyExchangesResponse, Logger, - NotificationType, PrepareWithdrawExchangeRequest, PrepareWithdrawExchangeResponse, RecoverStoredBackupRequest, + RetryLoopOpts, StoredBackupList, TalerError, TalerErrorCode, TalerUriAction, - TaskThrottler, TestingWaitTransactionRequest, TransactionState, TransactionType, @@ -123,8 +120,6 @@ import { codecForUserAttentionsRequest, codecForValidateIbanRequest, codecForWithdrawTestBalance, - durationFromSpec, - durationMin, getErrorDetailFromException, j2s, parsePaytoUri, @@ -152,7 +147,6 @@ import { } from "./db.js"; import { DevExperimentHttpLib, applyDevExperiment } from "./dev-experiments.js"; import { - ActiveLongpollInfo, CancelFn, InternalWalletState, MerchantInfo, @@ -172,23 +166,16 @@ import { getBackupInfo, getBackupRecovery, loadBackupRecovery, - processBackupForProvider, removeBackupProvider, runBackupCycle, setWalletDeviceId, } from "./operations/backup/index.js"; import { getBalanceDetail, getBalances } from "./operations/balance.js"; -import { - TaskRunResult, - TaskRunResultType, - runTaskWithErrorReporting, -} from "./operations/common.js"; import { computeDepositTransactionStatus, createDepositGroup, generateDepositGroupTxId, prepareDepositGroup, - processDepositGroup, } from "./operations/deposits.js"; import { acceptExchangeTermsOfService, @@ -200,7 +187,6 @@ import { getExchangeTos, listExchanges, lookupExchangeByUri, - updateExchangeFromUrlHandler, } from "./operations/exchanges.js"; import { computePayMerchantTransactionState, @@ -209,7 +195,6 @@ import { getContractTermsDetails, preparePayForTemplate, preparePayForUri, - processPurchase, sharePayment, startQueryRefund, startRefundQueryForUri, @@ -218,38 +203,28 @@ import { checkPeerPullPaymentInitiation, computePeerPullCreditTransactionState, initiatePeerPullPayment, - processPeerPullCredit, } from "./operations/pay-peer-pull-credit.js"; import { computePeerPullDebitTransactionState, confirmPeerPullDebit, preparePeerPullDebit, - processPeerPullDebit, } from "./operations/pay-peer-pull-debit.js"; import { computePeerPushCreditTransactionState, confirmPeerPushCredit, preparePeerPushCredit, - processPeerPushCredit, } from "./operations/pay-peer-push-credit.js"; import { checkPeerPushDebit, computePeerPushDebitTransactionState, initiatePeerPushDebit, - processPeerPushDebit, } from "./operations/pay-peer-push-debit.js"; -import { getPendingOperations } from "./operations/pending.js"; -import { createRecoupGroup, processRecoupGroup } from "./operations/recoup.js"; +import { createRecoupGroup } from "./operations/recoup.js"; import { - autoRefresh, computeRefreshTransactionState, forceRefresh, - processRefreshGroup, } from "./operations/refresh.js"; -import { - computeRewardTransactionStatus, - processTip, -} from "./operations/reward.js"; +import { computeRewardTransactionStatus } from "./operations/reward.js"; import { runIntegrationTest, runIntegrationTest2, @@ -257,7 +232,6 @@ import { waitTransactionState, waitUntilAllTransactionsFinal, waitUntilRefreshesDone, - waitUntilTasksProcessed, withdrawTestBalance, } from "./operations/testing.js"; import { @@ -279,9 +253,9 @@ import { createManualWithdrawal, getExchangeWithdrawalInfo, getWithdrawalDetailsForUri, - processWithdrawalGroup, } from "./operations/withdraw.js"; -import { PendingTaskInfo, PendingTaskType } from "./pending-types.js"; +import { PendingOperationsResponse } from "./pending-types.js"; +import { TaskScheduler } from "./shepherd.js"; import { assertUnreachable } from "./util/assertUnreachable.js"; import { convertDepositAmount, @@ -320,184 +294,11 @@ import { const logger = new Logger("wallet.ts"); -/** - * Call the right handler for a pending operation without doing - * any special error handling. - */ -async function callOperationHandler( - ws: InternalWalletState, - pending: PendingTaskInfo, -): Promise { - switch (pending.type) { - case PendingTaskType.ExchangeUpdate: - return await updateExchangeFromUrlHandler(ws, pending.exchangeBaseUrl); - case PendingTaskType.Refresh: - return await processRefreshGroup(ws, pending.refreshGroupId); - case PendingTaskType.Withdraw: - return await processWithdrawalGroup(ws, pending.withdrawalGroupId); - case PendingTaskType.RewardPickup: - return await processTip(ws, pending.tipId); - case PendingTaskType.Purchase: - return await processPurchase(ws, pending.proposalId); - case PendingTaskType.Recoup: - return await processRecoupGroup(ws, pending.recoupGroupId); - case PendingTaskType.ExchangeCheckRefresh: - return await autoRefresh(ws, pending.exchangeBaseUrl); - case PendingTaskType.Deposit: - return await processDepositGroup(ws, pending.depositGroupId); - case PendingTaskType.Backup: - return await processBackupForProvider(ws, pending.backupProviderBaseUrl); - case PendingTaskType.PeerPushDebit: - return await processPeerPushDebit(ws, pending.pursePub); - case PendingTaskType.PeerPullCredit: - return await processPeerPullCredit(ws, pending.pursePub); - case PendingTaskType.PeerPullDebit: - return await processPeerPullDebit(ws, pending.peerPullDebitId); - case PendingTaskType.PeerPushCredit: - return await processPeerPushCredit(ws, pending.peerPushCreditId); - default: - return assertUnreachable(pending); - } - throw Error(`not reached ${pending.type}`); -} - -/** - * Process pending operations. - */ -export async function runPending(ws: InternalWalletState): Promise { - const pendingOpsResponse = await getPendingOperations(ws); - for (const p of pendingOpsResponse.pendingOperations) { - if (!AbsoluteTime.isExpired(p.timestampDue)) { - continue; - } - await runTaskWithErrorReporting(ws, p.id, async () => { - logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`); - return await callOperationHandler(ws, p); - }); - } -} - -export interface RetryLoopOpts { - /** - * Stop the retry loop when all lifeness-giving pending operations - * are done. - * - * Defaults to false. - */ - stopWhenDone?: boolean; -} - -/** - * Main retry loop of the wallet. - * - * Looks up pending operations from the wallet, runs them, repeat. - */ async function runTaskLoop( ws: InternalWalletState, opts: RetryLoopOpts = {}, ): Promise { - logger.trace(`running task loop opts=${j2s(opts)}`); - if (ws.isTaskLoopRunning) { - logger.warn( - "task loop already running, nesting the wallet-core task loop is deprecated and should be avoided", - ); - } - const throttler = new TaskThrottler(); - ws.isTaskLoopRunning = true; - for (let iteration = 0; !ws.stopped; iteration++) { - const pending = await getPendingOperations(ws); - logger.trace(`pending operations: ${j2s(pending)}`); - let numGivingLiveness = 0; - let numDue = 0; - let numThrottled = 0; - let minDue: AbsoluteTime = AbsoluteTime.never(); - - for (const p of pending.pendingOperations) { - if (p.givesLifeness) { - numGivingLiveness++; - } - if (!p.isDue) { - continue; - } - numDue++; - - const isThrottled = throttler.applyThrottle(p.id); - - if (isThrottled) { - logger.warn( - `task ${p.id} throttled, this is very likely a bug in wallet-core, please report`, - ); - numDue--; - numThrottled++; - } else { - minDue = AbsoluteTime.min(minDue, p.timestampDue); - } - } - - logger.trace( - `running task loop, iter=${iteration}, #tasks=${pending.pendingOperations.length} #lifeness=${numGivingLiveness}, #due=${numDue} #throttled=${numThrottled}`, - ); - - if (opts.stopWhenDone && numGivingLiveness === 0 && iteration !== 0) { - logger.warn(`stopping, as no pending operations have lifeness`); - ws.isTaskLoopRunning = false; - return; - } - - if (ws.stopped) { - ws.isTaskLoopRunning = false; - return; - } - - // Make sure that we run tasks that don't give lifeness at least - // one time. - if (iteration !== 0 && numDue === 0) { - // We've executed pending, due operations at least one. - // Now we don't have any more operations available, - // and need to wait. - - // Wait for at most 5 seconds to the next check. - const dt = durationMin( - durationFromSpec({ - seconds: 5, - }), - Duration.getRemaining(minDue), - ); - logger.trace(`waiting for at most ${dt.d_ms} ms`); - const timeout = ws.timerGroup.resolveAfter(dt); - // Wait until either the timeout, or we are notified (via the latch) - // that more work might be available. - await Promise.race([timeout, ws.workAvailable.wait()]); - logger.trace(`done waiting for available work`); - } else { - logger.trace( - `running ${pending.pendingOperations.length} pending operations`, - ); - for (const p of pending.pendingOperations) { - if (!AbsoluteTime.isExpired(p.timestampDue)) { - continue; - } - logger.trace(`running task ${p.id}`); - const res = await runTaskWithErrorReporting(ws, p.id, async () => { - return await callOperationHandler(ws, p); - }); - if (!(ws.stopped && res.type === TaskRunResultType.Error)) { - ws.notify({ - type: NotificationType.PendingOperationProcessed, - id: p.id, - taskResultType: res.type, - }); - } - if (ws.stopped) { - ws.isTaskLoopRunning = false; - return; - } - } - } - } - logger.trace("exiting wallet task loop"); - ws.isTaskLoopRunning = false; - return; + await ws.taskScheduler.run(opts); } /** @@ -1035,7 +836,10 @@ async function dispatchRequestInternal( return await getUserAttentionsUnreadCount(ws, req); } case WalletApiOperation.GetPendingOperations: { - return await getPendingOperations(ws); + // FIXME: Eventually remove the handler after deprecation period. + return { + pendingOperations: [], + } satisfies PendingOperationsResponse; } case WalletApiOperation.SetExchangeTosAccepted: { const req = codecForAcceptExchangeTosRequest().decode(payload); @@ -1066,8 +870,7 @@ async function dispatchRequestInternal( return getContractTermsDetails(ws, req.proposalId); } case WalletApiOperation.RetryPendingNow: { - // FIXME: Should we reset all operation retries here? - await runPending(ws); + logger.error("retryPendingNow currently not implemented"); return {}; } case WalletApiOperation.SharePayment: { @@ -1175,10 +978,6 @@ async function dispatchRequestInternal( await waitTransactionState(ws, req.transactionId, req.txState); return {}; } - case WalletApiOperation.TestingWaitTasksProcessed: { - await waitUntilTasksProcessed(ws); - return {}; - } case WalletApiOperation.GetCurrencySpecification: { // Ignore result, just validate in this mock implementation const req = codecForGetCurrencyInfoRequest().decode(payload); @@ -1451,7 +1250,7 @@ async function dispatchRequestInternal( case WalletApiOperation.TestingSetTimetravel: { const req = codecForTestingSetTimetravelRequest().decode(payload); setDangerousTimetravel(req.offsetMs); - ws.workAvailable.trigger(); + ws.taskScheduler.reload(); return {}; } case WalletApiOperation.DeleteExchange: { @@ -1634,11 +1433,6 @@ export class Wallet { this.ws.stop(); } - async runPending(): Promise { - await this.ws.ensureWalletDbOpen(); - return runPending(this.ws); - } - async runTaskLoop(opts?: RetryLoopOpts): Promise { await this.ws.ensureWalletDbOpen(); return runTaskLoop(this.ws, opts); @@ -1660,11 +1454,6 @@ export class Wallet { * This ties together all the operation implementations. */ class InternalWalletStateImpl implements InternalWalletState { - /** - * @see {@link InternalWalletState.activeLongpoll} - */ - activeLongpoll: ActiveLongpollInfo = {}; - cryptoApi: TalerCryptoInterface; cryptoDispatcher: CryptoDispatcher; @@ -1697,6 +1486,8 @@ class InternalWalletStateImpl implements InternalWalletState { isTaskLoopRunning: boolean = false; + taskScheduler: TaskScheduler = new TaskScheduler(this); + config: Readonly; private _db: DbAccess | undefined = undefined; @@ -1843,7 +1634,7 @@ class InternalWalletStateImpl implements InternalWalletState { } notify(n: WalletNotification): void { - logger.trace("Notification", j2s(n)); + logger.trace(`Notification: ${j2s(n)}`); for (const l of this.listeners) { const nc = JSON.parse(JSON.stringify(n)); setTimeout(() => { @@ -1870,11 +1661,6 @@ class InternalWalletStateImpl implements InternalWalletState { this.stopped = true; this.timerGroup.stopCurrentAndFutureTimers(); this.cryptoDispatcher.stop(); - for (const key of Object.keys(this.activeLongpoll)) { - logger.trace(`cancelling active longpoll ${key}`); - this.activeLongpoll[key].cancel(); - delete this.activeLongpoll[key]; - } } /** -- cgit v1.2.3