From 599c8380f23584c24633927cafe8277fb9e41579 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Thu, 13 Aug 2020 15:15:01 +0530 Subject: make withdrawal requests sequentially, clean up withdrawal logic a bit --- .../taler-wallet-core/src/operations/reserves.ts | 1 - packages/taler-wallet-core/src/operations/tip.ts | 2 +- .../taler-wallet-core/src/operations/withdraw.ts | 273 +++++++++++++-------- 3 files changed, 178 insertions(+), 98 deletions(-) (limited to 'packages/taler-wallet-core/src/operations') diff --git a/packages/taler-wallet-core/src/operations/reserves.ts b/packages/taler-wallet-core/src/operations/reserves.ts index 58095affd..060226cab 100644 --- a/packages/taler-wallet-core/src/operations/reserves.ts +++ b/packages/taler-wallet-core/src/operations/reserves.ts @@ -758,7 +758,6 @@ async function depleteReserve( rawWithdrawalAmount: withdrawAmount, timestampStart: getTimestampNow(), retryInfo: initRetryInfo(), - lastErrorPerCoin: {}, lastError: undefined, denomsSel: denomSelectionInfoToState(denomsForWithdraw), }; diff --git a/packages/taler-wallet-core/src/operations/tip.ts b/packages/taler-wallet-core/src/operations/tip.ts index d6768bdb6..84cfa570a 100644 --- a/packages/taler-wallet-core/src/operations/tip.ts +++ b/packages/taler-wallet-core/src/operations/tip.ts @@ -281,6 +281,7 @@ async function processTipImpl( coinIdx: i, withdrawalDone: false, withdrawalGroupId: withdrawalGroupId, + lastError: undefined, }; planchets.push(planchet); } @@ -294,7 +295,6 @@ async function processTipImpl( timestampStart: getTimestampNow(), withdrawalGroupId: withdrawalGroupId, rawWithdrawalAmount: tipRecord.amount, - lastErrorPerCoin: {}, retryInfo: initRetryInfo(), timestampFinish: undefined, lastError: undefined, diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts index 9719772a7..a72a70827 100644 --- a/packages/taler-wallet-core/src/operations/withdraw.ts +++ b/packages/taler-wallet-core/src/operations/withdraw.ts @@ -39,6 +39,7 @@ import { codecForWithdrawOperationStatusResponse, codecForWithdrawResponse, WithdrawUriInfoResponse, + WithdrawResponse, } from "../types/talerTypes"; import { InternalWalletState } from "./state"; import { parseWithdrawUri } from "../util/taleruri"; @@ -47,7 +48,7 @@ import { updateExchangeFromUrl, getExchangeTrust } from "./exchanges"; import { WALLET_EXCHANGE_PROTOCOL_VERSION } from "./versions"; import * as LibtoolVersion from "../util/libtoolVersion"; -import { guardOperationException } from "./errors"; +import { guardOperationException, makeErrorDetails, OperationFailedError } from "./errors"; import { NotificationType } from "../types/notifications"; import { getTimestampNow, @@ -57,6 +58,7 @@ import { } from "../util/time"; import { readSuccessResponseJsonOrThrow } from "../util/http"; import { URL } from "../util/url"; +import { TalerErrorCode } from "../TalerErrorCode"; const logger = new Logger("withdraw.ts"); @@ -184,9 +186,13 @@ async function getPossibleDenoms( } /** - * Given a planchet, withdraw a coin from the exchange. + * Generate a planchet for a coin index in a withdrawal group. + * Does not actually withdraw the coin yet. + * + * Split up so that we can parallelize the crypto, but serialize + * the exchange requests per reserve. */ -async function processPlanchet( +async function processPlanchetGenerate( ws: InternalWalletState, withdrawalGroupId: string, coinIdx: number, @@ -259,6 +265,7 @@ async function processPlanchet( withdrawalDone: false, withdrawSig: r.withdrawSig, withdrawalGroupId: withdrawalGroupId, + lastError: undefined, }; await ws.db.runWithWriteTransaction([Stores.planchets], async (tx) => { const p = await tx.getIndexed(Stores.planchets.byGroupAndIndex, [ @@ -273,8 +280,31 @@ async function processPlanchet( planchet = newPlanchet; }); } +} + +/** + * Send the withdrawal request for a generated planchet to the exchange. + * + * The verification of the response is done asynchronously to enable parallelism. + */ +async function processPlanchetExchangeRequest( + ws: InternalWalletState, + withdrawalGroupId: string, + coinIdx: number, +): Promise { + const withdrawalGroup = await ws.db.get( + Stores.withdrawalGroups, + withdrawalGroupId, + ); + if (!withdrawalGroup) { + return; + } + let planchet = await ws.db.getIndexed(Stores.planchets.byGroupAndIndex, [ + withdrawalGroupId, + coinIdx, + ]); if (!planchet) { - throw Error("invariant violated"); + return; } if (planchet.withdrawalDone) { logger.warn("processPlanchet: planchet already withdrawn"); @@ -313,16 +343,62 @@ async function processPlanchet( exchange.baseUrl, ).href; - const resp = await ws.http.postJson(reqUrl, wd); - const r = await readSuccessResponseJsonOrThrow( - resp, - codecForWithdrawResponse(), - ); + try { + const resp = await ws.http.postJson(reqUrl, wd); + const r = await readSuccessResponseJsonOrThrow( + resp, + codecForWithdrawResponse(), + ); + + logger.trace(`got response for /withdraw`); + return r; + } catch (e) { + if (!(e instanceof OperationFailedError)) { + throw e; + } + const errDetails = e.operationError; + await ws.db.runWithWriteTransaction([Stores.planchets], async (tx) => { + let planchet = await ws.db.getIndexed(Stores.planchets.byGroupAndIndex, [ + withdrawalGroupId, + coinIdx, + ]); + if (!planchet) { + return; + } + planchet.lastError = errDetails; + await tx.put(Stores.planchets, planchet); + }); + return; + } +} - logger.trace(`got response for /withdraw`); +async function processPlanchetVerifyAndStoreCoin( + ws: InternalWalletState, + withdrawalGroupId: string, + coinIdx: number, + resp: WithdrawResponse, +): Promise { + const withdrawalGroup = await ws.db.get( + Stores.withdrawalGroups, + withdrawalGroupId, + ); + if (!withdrawalGroup) { + return; + } + let planchet = await ws.db.getIndexed(Stores.planchets.byGroupAndIndex, [ + withdrawalGroupId, + coinIdx, + ]); + if (!planchet) { + return; + } + if (planchet.withdrawalDone) { + logger.warn("processPlanchet: planchet already withdrawn"); + return; + } const denomSig = await ws.cryptoApi.rsaUnblind( - r.ev_sig, + resp.ev_sig, planchet.blindingKey, planchet.denomPub, ); @@ -334,11 +410,24 @@ async function processPlanchet( ); if (!isValid) { - throw Error("invalid RSA signature by the exchange"); + await ws.db.runWithWriteTransaction([Stores.planchets], async (tx) => { + let planchet = await ws.db.getIndexed(Stores.planchets.byGroupAndIndex, [ + withdrawalGroupId, + coinIdx, + ]); + if (!planchet) { + return; + } + planchet.lastError = makeErrorDetails( + TalerErrorCode.WALLET_EXCHANGE_COIN_SIGNATURE_INVALID, + "invalid signature from the exchange after unblinding", + {}, + ); + await tx.put(Stores.planchets, planchet); + }); + return; } - logger.trace(`unblinded and verified`); - const coin: CoinRecord = { blindingKey: planchet.blindingKey, coinPriv: planchet.coinPriv, @@ -358,11 +447,9 @@ async function processPlanchet( suspended: false, }; - let withdrawalGroupFinished = false; - const planchetCoinPub = planchet.coinPub; - const success = await ws.db.runWithWriteTransaction( + const firstSuccess = await ws.db.runWithWriteTransaction( [Stores.coins, Stores.withdrawalGroups, Stores.reserves, Stores.planchets], async (tx) => { const ws = await tx.get(Stores.withdrawalGroups, withdrawalGroupId); @@ -370,64 +457,21 @@ async function processPlanchet( return false; } const p = await tx.get(Stores.planchets, planchetCoinPub); - if (!p) { - return false; - } - if (p.withdrawalDone) { - // Already withdrawn + if (!p || p.withdrawalDone) { return false; } p.withdrawalDone = true; await tx.put(Stores.planchets, p); - - let numTotal = 0; - - for (const ds of ws.denomsSel.selectedDenoms) { - numTotal += ds.count; - } - - let numDone = 0; - - await tx - .iterIndexed(Stores.planchets.byGroup, withdrawalGroupId) - .forEach((x) => { - if (x.withdrawalDone) { - numDone++; - } - }); - - if (numDone > numTotal) { - throw Error( - "invariant violated (created more planchets than expected)", - ); - } - - if (numDone == numTotal) { - ws.timestampFinish = getTimestampNow(); - ws.lastError = undefined; - ws.retryInfo = initRetryInfo(false); - withdrawalGroupFinished = true; - } - await tx.put(Stores.withdrawalGroups, ws); await tx.add(Stores.coins, coin); return true; }, ); - logger.trace(`withdrawal result stored in DB`); - - if (success) { + if (firstSuccess) { ws.notify({ type: NotificationType.CoinWithdrawn, }); } - - if (withdrawalGroupFinished) { - ws.notify({ - type: NotificationType.WithdrawGroupFinished, - withdrawalSource: withdrawalGroup.source, - }); - } } export function denomSelectionInfoToState( @@ -552,27 +596,6 @@ async function resetWithdrawalGroupRetry( }); } -async function processInBatches( - workGen: Iterator>, - batchSize: number, -): Promise { - for (;;) { - const batch: Promise[] = []; - for (let i = 0; i < batchSize; i++) { - const wn = workGen.next(); - if (wn.done) { - break; - } - batch.push(wn.value); - } - if (batch.length == 0) { - break; - } - logger.trace(`processing withdrawal batch of ${batch.length} elements`); - await Promise.all(batch); - } -} - async function processWithdrawGroupImpl( ws: InternalWalletState, withdrawalGroupId: string, @@ -591,21 +614,79 @@ async function processWithdrawGroupImpl( return; } - const numDenoms = withdrawalGroup.denomsSel.selectedDenoms.length; - const genWork = function* (): Iterator> { - let coinIdx = 0; - for (let i = 0; i < numDenoms; i++) { - const count = withdrawalGroup.denomsSel.selectedDenoms[i].count; - for (let j = 0; j < count; j++) { - yield processPlanchet(ws, withdrawalGroupId, coinIdx); - coinIdx++; - } + const numTotalCoins = withdrawalGroup.denomsSel.selectedDenoms + .map((x) => x.count) + .reduce((a, b) => a + b); + + let work: Promise[] = []; + + for (let i = 0; i < numTotalCoins; i++) { + work.push(processPlanchetGenerate(ws, withdrawalGroupId, i)); + } + + // Generate coins concurrently (parallelism only happens in the crypto API workers) + await Promise.all(work); + + work = []; + + for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) { + const resp = await processPlanchetExchangeRequest( + ws, + withdrawalGroupId, + coinIdx, + ); + if (!resp) { + continue; } - }; + work.push( + processPlanchetVerifyAndStoreCoin(ws, withdrawalGroupId, coinIdx, resp), + ); + } - // Withdraw coins in batches. - // The batch size is relatively large - await processInBatches(genWork(), 10); + await Promise.all(work); + + let numFinished = 0; + let finishedForFirstTime = false; + + await ws.db.runWithWriteTransaction( + [Stores.coins, Stores.withdrawalGroups, Stores.reserves, Stores.planchets], + async (tx) => { + const ws = await tx.get(Stores.withdrawalGroups, withdrawalGroupId); + if (!ws) { + return; + } + + await tx + .iterIndexed(Stores.planchets.byGroup, withdrawalGroupId) + .forEach((x) => { + if (x.withdrawalDone) { + numFinished++; + } + }); + + if (ws.timestampFinish === undefined && numFinished == numTotalCoins) { + finishedForFirstTime = true; + ws.timestampFinish = getTimestampNow(); + ws.lastError = undefined; + ws.retryInfo = initRetryInfo(false); + } + await tx.put(Stores.withdrawalGroups, ws); + }, + ); + + if (numFinished != numTotalCoins) { + // FIXME: aggregate individual problems into the big error message here. + throw Error( + `withdrawal did not finish (${numFinished} / ${numTotalCoins} coins withdrawn)`, + ); + } + + if (finishedForFirstTime) { + ws.notify({ + type: NotificationType.WithdrawGroupFinished, + withdrawalSource: withdrawalGroup.source, + }); + } } export async function getExchangeWithdrawalInfo( -- cgit v1.2.3