From e35c2f581b49f6441b6f75bb9ce0a1677d5fb46f Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Tue, 22 Jun 2021 13:52:28 +0200 Subject: simplify task loop, test coin suspension --- packages/taler-wallet-core/src/common.ts | 10 +- packages/taler-wallet-core/src/operations/pay.ts | 9 +- packages/taler-wallet-core/src/wallet.ts | 120 ++++++++--------------- 3 files changed, 52 insertions(+), 87 deletions(-) (limited to 'packages/taler-wallet-core') diff --git a/packages/taler-wallet-core/src/common.ts b/packages/taler-wallet-core/src/common.ts index 128138eb2..b0b975e7b 100644 --- a/packages/taler-wallet-core/src/common.ts +++ b/packages/taler-wallet-core/src/common.ts @@ -116,9 +116,15 @@ export interface InternalWalletState { cryptoApi: CryptoApi; timerGroup: TimerGroup; - latch: AsyncCondition; stopped: boolean; - memoRunRetryLoop: AsyncOpMemoSingle; + + /** + * Asynchronous condition to interrupt the sleep of the + * retry loop. + * + * Used to allow processing of new work faster. + */ + latch: AsyncCondition; listeners: NotificationListener[]; diff --git a/packages/taler-wallet-core/src/operations/pay.ts b/packages/taler-wallet-core/src/operations/pay.ts index 464c3136f..e662ee72f 100644 --- a/packages/taler-wallet-core/src/operations/pay.ts +++ b/packages/taler-wallet-core/src/operations/pay.ts @@ -205,10 +205,7 @@ export async function getEffectiveDepositAmount( return Amounts.sub(Amounts.sum(amt).amount, Amounts.sum(fees).amount).amount; } -export function isSpendableCoin( - coin: CoinRecord, - denom: DenominationRecord, -): boolean { +function isSpendableCoin(coin: CoinRecord, denom: DenominationRecord): boolean { if (coin.suspended) { return false; } @@ -721,7 +718,9 @@ async function processDownloadProposalImpl( ); if (!isWellFormed) { - logger.trace(`malformed contract terms: ${j2s(proposalResp.contract_terms)}`); + logger.trace( + `malformed contract terms: ${j2s(proposalResp.contract_terms)}`, + ); const err = makeErrorDetails( TalerErrorCode.WALLET_CONTRACT_TERMS_MALFORMED, "validation for well-formedness failed", diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index 6a7ee9de1..de0675cd6 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -276,81 +276,31 @@ export async function runPending( } } -/** - * Run the wallet until there are no more pending operations that give - * liveness left. The wallet will be in a stopped state when this function - * returns without resolving to an exception. - */ -export async function runUntilDone( - ws: InternalWalletState, - req: { - maxRetries?: number; - } = {}, -): Promise { - let done = false; - const p = new Promise((resolve, reject) => { - // Monitor for conditions that means we're done or we - // should quit with an error (due to exceeded retries). - ws.addNotificationListener((n) => { - if (done) { - return; - } - if ( - n.type === NotificationType.WaitingForRetry && - n.numGivingLiveness == 0 - ) { - done = true; - logger.trace("no liveness-giving operations left"); - resolve(); - } - const maxRetries = req.maxRetries; - if (!maxRetries) { - return; - } - getPendingOperations(ws) - .then((pending) => { - for (const p of pending.pendingOperations) { - if (p.retryInfo && p.retryInfo.retryCounter > maxRetries) { - console.warn( - `stopping, as ${maxRetries} retries are exceeded in an operation of type ${p.type}`, - ); - ws.stop(); - done = true; - resolve(); - } - } - }) - .catch((e) => { - logger.error(e); - reject(e); - }); - }); - // Run this asynchronously - runRetryLoop(ws).catch((e) => { - logger.error("exception in wallet retry loop"); - reject(e); - }); - }); - await p; +export interface RetryLoopOpts { + /** + * Stop when the number of retries is exceeded for any pending + * operation. + */ + maxRetries?: number; + + /** + * Stop the retry loop when all lifeness-giving pending operations + * are done. + * + * Defaults to false. + */ + stopWhenDone?: boolean; } /** - * Process pending operations and wait for scheduled operations in - * a loop until the wallet is stopped explicitly. + * Main retry loop of the wallet. + * + * Looks up pending operations from the wallet, runs them, repeat. */ -export async function runRetryLoop(ws: InternalWalletState): Promise { - // Make sure we only run one main loop at a time. - return ws.memoRunRetryLoop.memo(async () => { - try { - await runRetryLoopImpl(ws); - } catch (e) { - console.error("error during retry loop execution", e); - throw e; - } - }); -} - -async function runRetryLoopImpl(ws: InternalWalletState): Promise { +async function runTaskLoop( + ws: InternalWalletState, + opts: RetryLoopOpts = {}, +): Promise { for (let iteration = 0; !ws.stopped; iteration++) { const pending = await getPendingOperations(ws); logger.trace(`pending operations: ${j2s(pending)}`); @@ -365,7 +315,22 @@ async function runRetryLoopImpl(ws: InternalWalletState): Promise { if (p.givesLifeness) { numGivingLiveness++; } + + const maxRetries = opts.maxRetries; + + if (maxRetries && p.retryInfo && p.retryInfo.retryCounter > maxRetries) { + logger.warn( + `stopping, as ${maxRetries} retries are exceeded in an operation of type ${p.type}`, + ); + return; + } } + + if (opts.stopWhenDone && numGivingLiveness === 0) { + logger.warn(`stopping, as no pending operations have lifeness`); + return; + } + // Make sure that we run tasks that don't give lifeness at least // one time. if (iteration !== 0 && numDue === 0) { @@ -993,19 +958,15 @@ export class Wallet { } runRetryLoop(): Promise { - return runRetryLoop(this.ws); + return runTaskLoop(this.ws); } runPending(forceNow: boolean = false) { return runPending(this.ws, forceNow); } - runUntilDone( - req: { - maxRetries?: number; - } = {}, - ) { - return runUntilDone(this.ws, req); + runTaskLoop(opts: RetryLoopOpts) { + return runTaskLoop(this.ws, opts); } handleCoreApiRequest( @@ -1035,7 +996,6 @@ class InternalWalletStateImpl implements InternalWalletState { timerGroup: TimerGroup = new TimerGroup(); latch = new AsyncCondition(); stopped = false; - memoRunRetryLoop = new AsyncOpMemoSingle(); listeners: NotificationListener[] = []; @@ -1102,7 +1062,7 @@ class InternalWalletStateImpl implements InternalWalletState { maxRetries?: number; } = {}, ): Promise { - runUntilDone(this, req); + await runTaskLoop(this, { ...req, stopWhenDone: true }); } /** -- cgit v1.2.3