From 651db75296bfe7c35dc7d29e39f25e6dacb72930 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Tue, 2 Apr 2024 18:19:11 +0200 Subject: wallet-core: refresh cleanup, preparations for #8568 --- .../test-wallet-refresh-blocked.ts | 66 ++++ .../src/integrationtests/testrunner.ts | 2 + packages/taler-wallet-core/src/common.ts | 10 +- packages/taler-wallet-core/src/db.ts | 5 + packages/taler-wallet-core/src/deposits.ts | 8 +- packages/taler-wallet-core/src/exchanges.ts | 1 + packages/taler-wallet-core/src/pay-merchant.ts | 4 + .../taler-wallet-core/src/pay-peer-pull-debit.ts | 2 + .../taler-wallet-core/src/pay-peer-push-debit.ts | 3 + packages/taler-wallet-core/src/recoup.ts | 1 + packages/taler-wallet-core/src/refresh.ts | 407 +++++++++++---------- 11 files changed, 306 insertions(+), 203 deletions(-) create mode 100644 packages/taler-harness/src/integrationtests/test-wallet-refresh-blocked.ts diff --git a/packages/taler-harness/src/integrationtests/test-wallet-refresh-blocked.ts b/packages/taler-harness/src/integrationtests/test-wallet-refresh-blocked.ts new file mode 100644 index 000000000..806802612 --- /dev/null +++ b/packages/taler-harness/src/integrationtests/test-wallet-refresh-blocked.ts @@ -0,0 +1,66 @@ +/* + This file is part of GNU Taler + (C) 2020 Taler Systems S.A. + + GNU Taler is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + GNU Taler; see the file COPYING. If not, see + */ + +/** + * Imports. + */ +import { WalletApiOperation } from "@gnu-taler/taler-wallet-core"; +import { GlobalTestState } from "../harness/harness.js"; +import { + createSimpleTestkudosEnvironmentV2, + createWalletDaemonWithClient, + withdrawViaBankV2, +} from "../harness/helpers.js"; + +/** + * Run test for refreshe after a payment. + */ +export async function runWalletRefreshBlockedTest(t: GlobalTestState) { + // Set up test environment + + const { walletClient, bank, exchange, merchant } = + await createSimpleTestkudosEnvironmentV2(t); + + // Withdraw digital cash into the wallet. + + const { walletClient: w1 } = await createWalletDaemonWithClient(t, { + name: "w1", + config: { + testing: { + devModeActive: true, + }, + }, + }); + + await withdrawViaBankV2(t, { + walletClient: w1, + bank, + exchange, + amount: "TESTKUDOS:20", + }); + + await w1.call(WalletApiOperation.TestingWaitTransactionsFinal, {}); + + // Prevent the wallet from doing refreshes by injecting a 5xx + // status for all refresh requests. + await w1.call(WalletApiOperation.ApplyDevExperiment, { + devExperimentUri: "taler://dev-experiment/start-block-refresh", + }); + + // FIXME: Now force a refresh, check balance +} + +runWalletRefreshBlockedTest.suites = ["wallet"]; diff --git a/packages/taler-harness/src/integrationtests/testrunner.ts b/packages/taler-harness/src/integrationtests/testrunner.ts index 380251b76..2bca91e45 100644 --- a/packages/taler-harness/src/integrationtests/testrunner.ts +++ b/packages/taler-harness/src/integrationtests/testrunner.ts @@ -101,6 +101,7 @@ import { runWalletGenDbTest } from "./test-wallet-gendb.js"; import { runWalletInsufficientBalanceTest } from "./test-wallet-insufficient-balance.js"; import { runWalletNotificationsTest } from "./test-wallet-notifications.js"; import { runWalletObservabilityTest } from "./test-wallet-observability.js"; +import { runWalletRefreshBlockedTest } from "./test-wallet-refresh-blocked.js"; import { runWalletRefreshTest } from "./test-wallet-refresh.js"; import { runWalletWirefeesTest } from "./test-wallet-wirefees.js"; import { runWallettestingTest } from "./test-wallettesting.js"; @@ -212,6 +213,7 @@ const allTests: TestMainFunction[] = [ runWalletWirefeesTest, runDenomLostTest, runWalletDenomExpireTest, + runWalletRefreshBlockedTest, ]; export interface TestRunSpec { diff --git a/packages/taler-wallet-core/src/common.ts b/packages/taler-wallet-core/src/common.ts index eb06b8eb0..5b7ceeead 100644 --- a/packages/taler-wallet-core/src/common.ts +++ b/packages/taler-wallet-core/src/common.ts @@ -42,8 +42,10 @@ import { } from "@gnu-taler/taler-util"; import { BackupProviderRecord, + CoinAvailabilityRecord, CoinRecord, DbPreciseTimestamp, + DenominationRecord, DepositGroupRecord, ExchangeEntryDbRecordStatus, ExchangeEntryDbUpdateStatus, @@ -145,7 +147,13 @@ export async function makeCoinAvailable( export async function spendCoins( wex: WalletExecutionContext, tx: WalletDbReadWriteTransaction< - ["coins", "coinAvailability", "refreshGroups", "denominations"] + [ + "coins", + "coinAvailability", + "refreshGroups", + "refreshSessions", + "denominations", + ] >, csi: CoinsSpendInfo, ): Promise { diff --git a/packages/taler-wallet-core/src/db.ts b/packages/taler-wallet-core/src/db.ts index 487927b8f..de22d78a8 100644 --- a/packages/taler-wallet-core/src/db.ts +++ b/packages/taler-wallet-core/src/db.ts @@ -2198,6 +2198,11 @@ export interface CoinAvailabilityRecord { * a final state. */ visibleCoinCount: number; + + /** + * Number of coins that we expect to obtain via a pending refresh. + */ + pendingRefreshOutputCount?: number; } export interface ContractTermsRecord { diff --git a/packages/taler-wallet-core/src/deposits.ts b/packages/taler-wallet-core/src/deposits.ts index 6c04b20de..a8612744f 100644 --- a/packages/taler-wallet-core/src/deposits.ts +++ b/packages/taler-wallet-core/src/deposits.ts @@ -489,6 +489,7 @@ async function refundDepositGroup( [ "depositGroups", "refreshGroups", + "refreshSessions", "coins", "denominations", "coinAvailability", @@ -755,9 +756,9 @@ async function processDepositGroupPendingTrack( let updatedTxStatus: DepositElementStatus | undefined = undefined; let newWiredCoin: | { - id: string; - value: DepositTrackingInfo; - } + id: string; + value: DepositTrackingInfo; + } | undefined; if (depositGroup.statusPerCoin[i] !== DepositElementStatus.Wired) { @@ -1448,6 +1449,7 @@ export async function createDepositGroup( "recoupGroups", "denominations", "refreshGroups", + "refreshSessions", "coinAvailability", "contractTerms", ], diff --git a/packages/taler-wallet-core/src/exchanges.ts b/packages/taler-wallet-core/src/exchanges.ts index 7fb387a9e..0fbe7297c 100644 --- a/packages/taler-wallet-core/src/exchanges.ts +++ b/packages/taler-wallet-core/src/exchanges.ts @@ -1614,6 +1614,7 @@ export async function updateExchangeFromUrlHandler( "denominations", "coinAvailability", "refreshGroups", + "refreshSessions", "exchanges", ], async (tx) => { diff --git a/packages/taler-wallet-core/src/pay-merchant.ts b/packages/taler-wallet-core/src/pay-merchant.ts index 812d32429..3b58c1e0a 100644 --- a/packages/taler-wallet-core/src/pay-merchant.ts +++ b/packages/taler-wallet-core/src/pay-merchant.ts @@ -272,6 +272,7 @@ export class PayMerchantTransactionContext implements TransactionContext { [ "purchases", "refreshGroups", + "refreshSessions", "denominations", "coinAvailability", "coins", @@ -1175,6 +1176,7 @@ async function handleInsufficientFunds( "coinAvailability", "denominations", "refreshGroups", + "refreshSessions", ], async (tx) => { const p = await tx.purchases.get(proposalId); @@ -1854,6 +1856,7 @@ export async function confirmPay( "purchases", "coins", "refreshGroups", + "refreshSessions", "denominations", "coinAvailability", ], @@ -3047,6 +3050,7 @@ async function storeRefunds( "coins", "coinAvailability", "refreshGroups", + "refreshSessions", ], async (tx) => { const myPurchase = await tx.purchases.get(purchase.proposalId); diff --git a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts index 6cc552714..da68d7839 100644 --- a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts +++ b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts @@ -234,6 +234,7 @@ export class PeerPullDebitTransactionContext implements TransactionContext { "coinAvailability", "denominations", "refreshGroups", + "refreshSessions", "coins", "coinAvailability", ], @@ -609,6 +610,7 @@ export async function confirmPeerPullDebit( "coins", "denominations", "refreshGroups", + "refreshSessions", "peerPullDebit", "coinAvailability", ], diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/pay-peer-push-debit.ts index ab80888eb..20001e040 100644 --- a/packages/taler-wallet-core/src/pay-peer-push-debit.ts +++ b/packages/taler-wallet-core/src/pay-peer-push-debit.ts @@ -588,6 +588,7 @@ async function processPeerPushDebitAbortingDeletePurse( [ "peerPushDebit", "refreshGroups", + "refreshSessions", "denominations", "coinAvailability", "coins", @@ -821,6 +822,7 @@ async function processPeerPushDebitReady( [ "peerPushDebit", "refreshGroups", + "refreshSessions", "denominations", "coinAvailability", "coins", @@ -971,6 +973,7 @@ export async function initiatePeerPushDebit( "coinAvailability", "denominations", "refreshGroups", + "refreshSessions", "peerPushDebit", ], async (tx) => { diff --git a/packages/taler-wallet-core/src/recoup.ts b/packages/taler-wallet-core/src/recoup.ts index b8b2cf808..758ba106d 100644 --- a/packages/taler-wallet-core/src/recoup.ts +++ b/packages/taler-wallet-core/src/recoup.ts @@ -390,6 +390,7 @@ export async function processRecoupGroup( "coinAvailability", "denominations", "refreshGroups", + "refreshSessions", "coins", ], async (tx) => { diff --git a/packages/taler-wallet-core/src/refresh.ts b/packages/taler-wallet-core/src/refresh.ts index 516d5e3da..490b1b5f5 100644 --- a/packages/taler-wallet-core/src/refresh.ts +++ b/packages/taler-wallet-core/src/refresh.ts @@ -66,7 +66,6 @@ import { } from "@gnu-taler/taler-util/http"; import { constructTaskIdentifier, - makeCoinAvailable, makeCoinsVisible, PendingTaskType, TaskIdStr, @@ -82,6 +81,7 @@ import { } from "./crypto/cryptoTypes.js"; import { CryptoApiStoppedError } from "./crypto/workers/crypto-dispatcher.js"; import { + CoinAvailabilityRecord, CoinRecord, CoinSourceType, DenominationRecord, @@ -305,192 +305,130 @@ export function getTotalRefreshCost( return totalCost; } -function updateGroupStatus(rg: RefreshGroupRecord): { final: boolean } { - const allFinal = fnutil.all( - rg.statusPerCoin, - (x) => x === RefreshCoinStatus.Finished || x === RefreshCoinStatus.Failed, - ); - const anyFailed = fnutil.any( - rg.statusPerCoin, - (x) => x === RefreshCoinStatus.Failed, - ); - if (allFinal) { - if (anyFailed) { - rg.timestampFinished = timestampPreciseToDb(TalerPreciseTimestamp.now()); - rg.operationStatus = RefreshOperationStatus.Failed; - } else { - rg.timestampFinished = timestampPreciseToDb(TalerPreciseTimestamp.now()); - rg.operationStatus = RefreshOperationStatus.Finished; - } - return { final: true }; +export async function getCoinAvailabilityForDenom( + wex: WalletExecutionContext, + tx: WalletDbReadWriteTransaction< + ["coins", "coinAvailability", "denominations"] + >, + denom: DenominationInfo, + ageRestriction: number, +): Promise { + checkDbInvariant(!!denom); + let car = await tx.coinAvailability.get([ + denom.exchangeBaseUrl, + denom.denomPubHash, + ageRestriction, + ]); + if (!car) { + car = { + maxAge: ageRestriction, + value: denom.value, + currency: Amounts.currencyOf(denom.value), + denomPubHash: denom.denomPubHash, + exchangeBaseUrl: denom.exchangeBaseUrl, + freshCoinCount: 0, + visibleCoinCount: 0, + }; } - return { final: false }; + return car; } /** * Create a refresh session for one particular coin inside a refresh group. - * - * If the session already exists, return the existing one. - * - * If the session doesn't need to be created (refresh group gone or session already - * finished), return undefined. */ -async function provideRefreshSession( +async function initRefreshSession( wex: WalletExecutionContext, - refreshGroupId: string, + tx: WalletDbReadWriteTransaction< + ["refreshSessions", "coinAvailability", "coins", "denominations"] + >, + refreshGroup: RefreshGroupRecord, coinIndex: number, -): Promise { +): Promise { + const refreshGroupId = refreshGroup.refreshGroupId; logger.trace( `creating refresh session for coin ${coinIndex} in refresh group ${refreshGroupId}`, ); - - const d = await wex.db.runReadWriteTx( - ["coins", "refreshGroups", "refreshSessions"], - async (tx) => { - const refreshGroup = await tx.refreshGroups.get(refreshGroupId); - if (!refreshGroup) { - return; - } - if ( - refreshGroup.statusPerCoin[coinIndex] === RefreshCoinStatus.Finished - ) { - return; - } - const existingRefreshSession = await tx.refreshSessions.get([ - refreshGroupId, - coinIndex, - ]); - const oldCoinPub = refreshGroup.oldCoinPubs[coinIndex]; - const coin = await tx.coins.get(oldCoinPub); - if (!coin) { - throw Error("Can't refresh, coin not found"); - } - return { refreshGroup, coin, existingRefreshSession }; - }, - ); - - if (!d) { - return undefined; - } - - if (d.existingRefreshSession) { - return d.existingRefreshSession; + const oldCoinPub = refreshGroup.oldCoinPubs[coinIndex]; + const oldCoin = await tx.coins.get(oldCoinPub); + if (!oldCoin) { + throw Error("Can't refresh, coin not found"); } - const { refreshGroup, coin } = d; - - const exch = await fetchFreshExchange(wex, coin.exchangeBaseUrl); + const exchangeBaseUrl = oldCoin.exchangeBaseUrl; - // FIXME: use helper functions from withdraw.ts - // to update and filter withdrawable denoms. + const sessionSecretSeed = encodeCrock(getRandomBytes(64)); - const { availableAmount, availableDenoms } = await wex.db.runReadOnlyTx( - ["denominations"], - async (tx) => { - const oldDenom = await getDenomInfo( - wex, - tx, - exch.exchangeBaseUrl, - coin.denomPubHash, - ); + const oldDenom = await getDenomInfo( + wex, + tx, + exchangeBaseUrl, + oldCoin.denomPubHash, + ); - if (!oldDenom) { - throw Error("db inconsistent: denomination for coin not found"); - } + if (!oldDenom) { + throw Error("db inconsistent: denomination for coin not found"); + } - // FIXME: Use denom groups instead of querying all denominations! - const availableDenoms: DenominationRecord[] = - await tx.denominations.indexes.byExchangeBaseUrl.getAll( - exch.exchangeBaseUrl, - ); + const currency = refreshGroup.currency; - const availableAmount = Amounts.sub( - refreshGroup.inputPerCoin[coinIndex], - oldDenom.feeRefresh, - ).amount; - return { availableAmount, availableDenoms }; - }, + const availableDenoms = await getCandidateWithdrawalDenomsTx( + wex, + tx, + exchangeBaseUrl, + currency, ); + const availableAmount = Amounts.sub( + refreshGroup.inputPerCoin[coinIndex], + oldDenom.feeRefresh, + ).amount; + const newCoinDenoms = selectWithdrawalDenominations( availableAmount, availableDenoms, wex.ws.config.testing.denomselAllowLate, ); - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Refresh, - refreshGroupId, - }); - if (newCoinDenoms.selectedDenoms.length === 0) { logger.trace( `not refreshing, available amount ${amountToPretty( availableAmount, )} too small`, ); - const transitionInfo = await wex.db.runReadWriteTx( - ["refreshGroups", "coins", "coinAvailability"], - async (tx) => { - const rg = await tx.refreshGroups.get(refreshGroupId); - if (!rg) { - return; - } - const oldTxState = computeRefreshTransactionState(rg); - rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished; - const updateRes = updateGroupStatus(rg); - if (updateRes.final) { - await makeCoinsVisible(wex, tx, transactionId); - } - await tx.refreshGroups.put(rg); - const newTxState = computeRefreshTransactionState(rg); - return { oldTxState, newTxState }; - }, - ); - wex.ws.notify({ - type: NotificationType.BalanceChange, - hintTransactionId: transactionId, - }); - notifyTransition(wex, transactionId, transitionInfo); - return; + refreshGroup.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished; } - const sessionSecretSeed = encodeCrock(getRandomBytes(64)); + for (let i = 0; i < newCoinDenoms.selectedDenoms.length; i++) { + const dph = newCoinDenoms.selectedDenoms[i].denomPubHash; + const denom = await getDenomInfo(wex, tx, oldDenom.exchangeBaseUrl, dph); + if (!denom) { + logger.error(`denom ${dph} not in DB`); + continue; + } + const car = await getCoinAvailabilityForDenom( + wex, + tx, + denom, + oldCoin.maxAge, + ); + car.pendingRefreshOutputCount = + (car.pendingRefreshOutputCount ?? 0) + + newCoinDenoms.selectedDenoms[i].count; + await tx.coinAvailability.put(car); + } - // Store refresh session for this coin in the database. - const mySession = await wex.db.runReadWriteTx( - ["refreshGroups", "refreshSessions"], - async (tx) => { - const rg = await tx.refreshGroups.get(refreshGroupId); - if (!rg) { - return; - } - const existingSession = await tx.refreshSessions.get([ - refreshGroupId, - coinIndex, - ]); - if (existingSession) { - return existingSession; - } - const newSession: RefreshSessionRecord = { - coinIndex, - refreshGroupId, - norevealIndex: undefined, - sessionSecretSeed: sessionSecretSeed, - newDenoms: newCoinDenoms.selectedDenoms.map((x) => ({ - count: x.count, - denomPubHash: x.denomPubHash, - })), - amountRefreshOutput: Amounts.stringify(newCoinDenoms.totalCoinValue), - }; - await tx.refreshSessions.put(newSession); - return newSession; - }, - ); - logger.trace( - `found/created refresh session for coin #${coinIndex} in ${refreshGroupId}`, - ); - return mySession; + const newSession: RefreshSessionRecord = { + coinIndex, + refreshGroupId, + norevealIndex: undefined, + sessionSecretSeed: sessionSecretSeed, + newDenoms: newCoinDenoms.selectedDenoms.map((x) => ({ + count: x.count, + denomPubHash: x.denomPubHash, + })), + amountRefreshOutput: Amounts.stringify(newCoinDenoms.totalCoinValue), + }; + await tx.refreshSessions.put(newSession); } function getRefreshRequestTimeout(rg: RefreshGroupRecord): Duration { @@ -499,6 +437,14 @@ function getRefreshRequestTimeout(rg: RefreshGroupRecord): Duration { }); } +/** + * Run the melt step of a refresh session. + * + * If the melt step succeeds or fails permanently, + * the status in the refresh group is updated. + * + * When a transient error occurs, an exception is thrown. + */ async function refreshMelt( wex: WalletExecutionContext, refreshGroupId: string, @@ -627,7 +573,7 @@ async function refreshMelt( if (resp.status === HttpStatusCode.NotFound) { const errDetails = await readUnexpectedResponseDetails(resp); - const transitionInfo = await wex.db.runReadWriteTx( + await wex.db.runReadWriteTx( ["refreshGroups", "refreshSessions", "coins", "coinAvailability"], async (tx) => { const rg = await tx.refreshGroups.get(refreshGroupId); @@ -640,7 +586,6 @@ async function refreshMelt( if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) { return; } - const oldTxState = computeRefreshTransactionState(rg); rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Failed; const refreshSession = await tx.refreshSessions.get([ refreshGroupId, @@ -652,24 +597,10 @@ async function refreshMelt( ); } refreshSession.lastError = errDetails; - const updateRes = updateGroupStatus(rg); - if (updateRes.final) { - await makeCoinsVisible(wex, tx, transactionId); - } await tx.refreshGroups.put(rg); await tx.refreshSessions.put(refreshSession); - const newTxState = computeRefreshTransactionState(rg); - return { - oldTxState, - newTxState, - }; }, ); - wex.ws.notify({ - type: NotificationType.BalanceChange, - hintTransactionId: transactionId, - }); - notifyTransition(wex, transactionId, transitionInfo); return; } @@ -718,6 +649,7 @@ async function refreshMelt( const input = Amounts.parseOrThrow(rg.inputPerCoin[rs.coinIndex]); const newSel = selectWithdrawalDenominations(input, candidates); rs.amountRefreshOutput = newSel.totalCoinValue; + // FIXME: This is wrong! When denoms are re-selected, the melt commitment breaks. rs.newDenoms = newSel.selectedDenoms.map((x) => ({ count: x.count, denomPubHash: x.denomPubHash, @@ -1043,7 +975,7 @@ async function refreshReveal( } } - const transitionInfo = await wex.db.runReadWriteTx( + await wex.db.runReadWriteTx( [ "coins", "denominations", @@ -1061,19 +993,32 @@ async function refreshReveal( if (!rs) { return; } - const oldTxState = computeRefreshTransactionState(rg); rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished; - updateGroupStatus(rg); for (const coin of coins) { - await makeCoinAvailable(wex, tx, coin); + const denomInfo = await getDenomInfo( + wex, + tx, + coin.exchangeBaseUrl, + coin.denomPubHash, + ); + checkDbInvariant(!!denomInfo); + const car = await getCoinAvailabilityForDenom( + wex, + tx, + denomInfo, + coin.maxAge, + ); + checkDbInvariant( + car.pendingRefreshOutputCount != null && + car.pendingRefreshOutputCount > 0, + ); + car.pendingRefreshOutputCount--; + car.freshCoinCount++; + await tx.coinAvailability.put(car); } - await makeCoinsVisible(wex, tx, transactionId); await tx.refreshGroups.put(rg); - const newTxState = computeRefreshTransactionState(rg); - return { oldTxState, newTxState }; }, ); - notifyTransition(wex, transactionId, transitionInfo); logger.trace("refresh finished (end of reveal)"); } @@ -1121,17 +1066,71 @@ export async function processRefreshGroup( errors.push(getErrorDetailFromException(x)); }), ); - try { - logger.info("waiting for refreshes"); - await Promise.all(ps); - logger.info("refresh group finished"); - } catch (e) { - logger.warn("process refresh sessions got exception"); - logger.warn(`exception: ${e}`); - } + await Promise.all(ps); if (inShutdown) { - return TaskRunResult.backoff(); + return TaskRunResult.finished(); } + + const ctx = new RefreshTransactionContext(wex, refreshGroupId); + + // We've processed all refresh session and can now update the + // status of the whole refresh group. + + const transitionInfo = await wex.db.runReadWriteTx( + ["coins", "coinAvailability", "refreshGroups"], + async (tx) => { + const rg = await tx.refreshGroups.get(refreshGroupId); + if (!rg) { + return; + } + switch (rg.operationStatus) { + case RefreshOperationStatus.Pending: + break; + default: + return undefined; + } + const oldTxState = computeRefreshTransactionState(rg); + const allFinal = fnutil.all( + rg.statusPerCoin, + (x) => + x === RefreshCoinStatus.Finished || x === RefreshCoinStatus.Failed, + ); + const anyFailed = fnutil.any( + rg.statusPerCoin, + (x) => x === RefreshCoinStatus.Failed, + ); + if (allFinal) { + if (anyFailed) { + rg.timestampFinished = timestampPreciseToDb( + TalerPreciseTimestamp.now(), + ); + rg.operationStatus = RefreshOperationStatus.Failed; + } else { + rg.timestampFinished = timestampPreciseToDb( + TalerPreciseTimestamp.now(), + ); + rg.operationStatus = RefreshOperationStatus.Finished; + } + } + if (allFinal) { + await makeCoinsVisible(wex, tx, ctx.transactionId); + await tx.refreshGroups.put(rg); + const newTxState = computeRefreshTransactionState(rg); + return { + oldTxState, + newTxState, + }; + } else { + return undefined; + } + }, + ); + + if (transitionInfo) { + notifyTransition(wex, ctx.transactionId, transitionInfo); + return TaskRunResult.progress(); + } + if (errors.length > 0) { return { type: TaskRunResultType.Error, @@ -1174,16 +1173,7 @@ async function processRefreshSession( return; } if (!refreshSession) { - refreshSession = await provideRefreshSession( - wex, - refreshGroupId, - coinIndex, - ); - } - if (!refreshSession) { - // We tried to create the refresh session, but didn't get a result back. - // This means that either the session is finished, or that creating - // one isn't necessary. + // No refresh session for that coin. return; } if (refreshSession.norevealIndex === undefined) { @@ -1268,7 +1258,7 @@ export async function calculateRefreshOutput( }; } -async function applyRefresh( +async function applyRefreshToOldCoins( wex: WalletExecutionContext, tx: WalletDbReadWriteTransaction< ["denominations", "coins", "refreshGroups", "coinAvailability"] @@ -1347,20 +1337,29 @@ export interface CreateRefreshGroupResult { export async function createRefreshGroup( wex: WalletExecutionContext, tx: WalletDbReadWriteTransaction< - ["denominations", "coins", "refreshGroups", "coinAvailability"] + [ + "denominations", + "coins", + "refreshGroups", + "refreshSessions", + "coinAvailability", + ] >, currency: string, oldCoinPubs: CoinRefreshRequest[], refreshReason: RefreshReason, originatingTransactionId: string | undefined, ): Promise { + // FIXME: Check that involved exchanges are reasonably up-to-date. + // Otherwise, error out. + const refreshGroupId = encodeCrock(getRandomBytes(32)); const outInfo = await calculateRefreshOutput(wex, tx, currency, oldCoinPubs); const estimatedOutputPerCoin = outInfo.outputPerCoin; - await applyRefresh(wex, tx, oldCoinPubs, refreshGroupId); + await applyRefreshToOldCoins(wex, tx, oldCoinPubs, refreshGroupId); const refreshGroup: RefreshGroupRecord = { operationStatus: RefreshOperationStatus.Pending, @@ -1387,6 +1386,10 @@ export async function createRefreshGroup( refreshGroup.operationStatus = RefreshOperationStatus.Finished; } + for (let i = 0; i < oldCoinPubs.length; i++) { + await initRefreshSession(wex, tx, refreshGroup, i); + } + await tx.refreshGroups.put(refreshGroup); const newTxState = computeRefreshTransactionState(refreshGroup); @@ -1487,7 +1490,13 @@ export async function forceRefresh( throw Error("refusing to create empty refresh group"); } const res = await wex.db.runReadWriteTx( - ["refreshGroups", "coinAvailability", "denominations", "coins"], + [ + "refreshGroups", + "coinAvailability", + "refreshSessions", + "denominations", + "coins", + ], async (tx) => { let coinPubs: CoinRefreshRequest[] = []; for (const c of req.coinPubList) { -- cgit v1.2.3