summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/operations/withdraw.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/operations/withdraw.ts')
-rw-r--r--packages/taler-wallet-core/src/operations/withdraw.ts273
1 files changed, 177 insertions, 96 deletions
diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts
index 9719772a7..a72a70827 100644
--- a/packages/taler-wallet-core/src/operations/withdraw.ts
+++ b/packages/taler-wallet-core/src/operations/withdraw.ts
@@ -39,6 +39,7 @@ import {
codecForWithdrawOperationStatusResponse,
codecForWithdrawResponse,
WithdrawUriInfoResponse,
+ WithdrawResponse,
} from "../types/talerTypes";
import { InternalWalletState } from "./state";
import { parseWithdrawUri } from "../util/taleruri";
@@ -47,7 +48,7 @@ import { updateExchangeFromUrl, getExchangeTrust } from "./exchanges";
import { WALLET_EXCHANGE_PROTOCOL_VERSION } from "./versions";
import * as LibtoolVersion from "../util/libtoolVersion";
-import { guardOperationException } from "./errors";
+import { guardOperationException, makeErrorDetails, OperationFailedError } from "./errors";
import { NotificationType } from "../types/notifications";
import {
getTimestampNow,
@@ -57,6 +58,7 @@ import {
} from "../util/time";
import { readSuccessResponseJsonOrThrow } from "../util/http";
import { URL } from "../util/url";
+import { TalerErrorCode } from "../TalerErrorCode";
const logger = new Logger("withdraw.ts");
@@ -184,9 +186,13 @@ async function getPossibleDenoms(
}
/**
- * Given a planchet, withdraw a coin from the exchange.
+ * Generate a planchet for a coin index in a withdrawal group.
+ * Does not actually withdraw the coin yet.
+ *
+ * Split up so that we can parallelize the crypto, but serialize
+ * the exchange requests per reserve.
*/
-async function processPlanchet(
+async function processPlanchetGenerate(
ws: InternalWalletState,
withdrawalGroupId: string,
coinIdx: number,
@@ -259,6 +265,7 @@ async function processPlanchet(
withdrawalDone: false,
withdrawSig: r.withdrawSig,
withdrawalGroupId: withdrawalGroupId,
+ lastError: undefined,
};
await ws.db.runWithWriteTransaction([Stores.planchets], async (tx) => {
const p = await tx.getIndexed(Stores.planchets.byGroupAndIndex, [
@@ -273,8 +280,31 @@ async function processPlanchet(
planchet = newPlanchet;
});
}
+}
+
+/**
+ * Send the withdrawal request for a generated planchet to the exchange.
+ *
+ * The verification of the response is done asynchronously to enable parallelism.
+ */
+async function processPlanchetExchangeRequest(
+ ws: InternalWalletState,
+ withdrawalGroupId: string,
+ coinIdx: number,
+): Promise<WithdrawResponse | undefined> {
+ const withdrawalGroup = await ws.db.get(
+ Stores.withdrawalGroups,
+ withdrawalGroupId,
+ );
+ if (!withdrawalGroup) {
+ return;
+ }
+ let planchet = await ws.db.getIndexed(Stores.planchets.byGroupAndIndex, [
+ withdrawalGroupId,
+ coinIdx,
+ ]);
if (!planchet) {
- throw Error("invariant violated");
+ return;
}
if (planchet.withdrawalDone) {
logger.warn("processPlanchet: planchet already withdrawn");
@@ -313,16 +343,62 @@ async function processPlanchet(
exchange.baseUrl,
).href;
- const resp = await ws.http.postJson(reqUrl, wd);
- const r = await readSuccessResponseJsonOrThrow(
- resp,
- codecForWithdrawResponse(),
- );
+ try {
+ const resp = await ws.http.postJson(reqUrl, wd);
+ const r = await readSuccessResponseJsonOrThrow(
+ resp,
+ codecForWithdrawResponse(),
+ );
+
+ logger.trace(`got response for /withdraw`);
+ return r;
+ } catch (e) {
+ if (!(e instanceof OperationFailedError)) {
+ throw e;
+ }
+ const errDetails = e.operationError;
+ await ws.db.runWithWriteTransaction([Stores.planchets], async (tx) => {
+ let planchet = await ws.db.getIndexed(Stores.planchets.byGroupAndIndex, [
+ withdrawalGroupId,
+ coinIdx,
+ ]);
+ if (!planchet) {
+ return;
+ }
+ planchet.lastError = errDetails;
+ await tx.put(Stores.planchets, planchet);
+ });
+ return;
+ }
+}
- logger.trace(`got response for /withdraw`);
+async function processPlanchetVerifyAndStoreCoin(
+ ws: InternalWalletState,
+ withdrawalGroupId: string,
+ coinIdx: number,
+ resp: WithdrawResponse,
+): Promise<void> {
+ const withdrawalGroup = await ws.db.get(
+ Stores.withdrawalGroups,
+ withdrawalGroupId,
+ );
+ if (!withdrawalGroup) {
+ return;
+ }
+ let planchet = await ws.db.getIndexed(Stores.planchets.byGroupAndIndex, [
+ withdrawalGroupId,
+ coinIdx,
+ ]);
+ if (!planchet) {
+ return;
+ }
+ if (planchet.withdrawalDone) {
+ logger.warn("processPlanchet: planchet already withdrawn");
+ return;
+ }
const denomSig = await ws.cryptoApi.rsaUnblind(
- r.ev_sig,
+ resp.ev_sig,
planchet.blindingKey,
planchet.denomPub,
);
@@ -334,11 +410,24 @@ async function processPlanchet(
);
if (!isValid) {
- throw Error("invalid RSA signature by the exchange");
+ await ws.db.runWithWriteTransaction([Stores.planchets], async (tx) => {
+ let planchet = await ws.db.getIndexed(Stores.planchets.byGroupAndIndex, [
+ withdrawalGroupId,
+ coinIdx,
+ ]);
+ if (!planchet) {
+ return;
+ }
+ planchet.lastError = makeErrorDetails(
+ TalerErrorCode.WALLET_EXCHANGE_COIN_SIGNATURE_INVALID,
+ "invalid signature from the exchange after unblinding",
+ {},
+ );
+ await tx.put(Stores.planchets, planchet);
+ });
+ return;
}
- logger.trace(`unblinded and verified`);
-
const coin: CoinRecord = {
blindingKey: planchet.blindingKey,
coinPriv: planchet.coinPriv,
@@ -358,11 +447,9 @@ async function processPlanchet(
suspended: false,
};
- let withdrawalGroupFinished = false;
-
const planchetCoinPub = planchet.coinPub;
- const success = await ws.db.runWithWriteTransaction(
+ const firstSuccess = await ws.db.runWithWriteTransaction(
[Stores.coins, Stores.withdrawalGroups, Stores.reserves, Stores.planchets],
async (tx) => {
const ws = await tx.get(Stores.withdrawalGroups, withdrawalGroupId);
@@ -370,64 +457,21 @@ async function processPlanchet(
return false;
}
const p = await tx.get(Stores.planchets, planchetCoinPub);
- if (!p) {
- return false;
- }
- if (p.withdrawalDone) {
- // Already withdrawn
+ if (!p || p.withdrawalDone) {
return false;
}
p.withdrawalDone = true;
await tx.put(Stores.planchets, p);
-
- let numTotal = 0;
-
- for (const ds of ws.denomsSel.selectedDenoms) {
- numTotal += ds.count;
- }
-
- let numDone = 0;
-
- await tx
- .iterIndexed(Stores.planchets.byGroup, withdrawalGroupId)
- .forEach((x) => {
- if (x.withdrawalDone) {
- numDone++;
- }
- });
-
- if (numDone > numTotal) {
- throw Error(
- "invariant violated (created more planchets than expected)",
- );
- }
-
- if (numDone == numTotal) {
- ws.timestampFinish = getTimestampNow();
- ws.lastError = undefined;
- ws.retryInfo = initRetryInfo(false);
- withdrawalGroupFinished = true;
- }
- await tx.put(Stores.withdrawalGroups, ws);
await tx.add(Stores.coins, coin);
return true;
},
);
- logger.trace(`withdrawal result stored in DB`);
-
- if (success) {
+ if (firstSuccess) {
ws.notify({
type: NotificationType.CoinWithdrawn,
});
}
-
- if (withdrawalGroupFinished) {
- ws.notify({
- type: NotificationType.WithdrawGroupFinished,
- withdrawalSource: withdrawalGroup.source,
- });
- }
}
export function denomSelectionInfoToState(
@@ -552,27 +596,6 @@ async function resetWithdrawalGroupRetry(
});
}
-async function processInBatches(
- workGen: Iterator<Promise<void>>,
- batchSize: number,
-): Promise<void> {
- for (;;) {
- const batch: Promise<void>[] = [];
- for (let i = 0; i < batchSize; i++) {
- const wn = workGen.next();
- if (wn.done) {
- break;
- }
- batch.push(wn.value);
- }
- if (batch.length == 0) {
- break;
- }
- logger.trace(`processing withdrawal batch of ${batch.length} elements`);
- await Promise.all(batch);
- }
-}
-
async function processWithdrawGroupImpl(
ws: InternalWalletState,
withdrawalGroupId: string,
@@ -591,21 +614,79 @@ async function processWithdrawGroupImpl(
return;
}
- const numDenoms = withdrawalGroup.denomsSel.selectedDenoms.length;
- const genWork = function* (): Iterator<Promise<void>> {
- let coinIdx = 0;
- for (let i = 0; i < numDenoms; i++) {
- const count = withdrawalGroup.denomsSel.selectedDenoms[i].count;
- for (let j = 0; j < count; j++) {
- yield processPlanchet(ws, withdrawalGroupId, coinIdx);
- coinIdx++;
- }
+ const numTotalCoins = withdrawalGroup.denomsSel.selectedDenoms
+ .map((x) => x.count)
+ .reduce((a, b) => a + b);
+
+ let work: Promise<void>[] = [];
+
+ for (let i = 0; i < numTotalCoins; i++) {
+ work.push(processPlanchetGenerate(ws, withdrawalGroupId, i));
+ }
+
+ // Generate coins concurrently (parallelism only happens in the crypto API workers)
+ await Promise.all(work);
+
+ work = [];
+
+ for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) {
+ const resp = await processPlanchetExchangeRequest(
+ ws,
+ withdrawalGroupId,
+ coinIdx,
+ );
+ if (!resp) {
+ continue;
}
- };
+ work.push(
+ processPlanchetVerifyAndStoreCoin(ws, withdrawalGroupId, coinIdx, resp),
+ );
+ }
- // Withdraw coins in batches.
- // The batch size is relatively large
- await processInBatches(genWork(), 10);
+ await Promise.all(work);
+
+ let numFinished = 0;
+ let finishedForFirstTime = false;
+
+ await ws.db.runWithWriteTransaction(
+ [Stores.coins, Stores.withdrawalGroups, Stores.reserves, Stores.planchets],
+ async (tx) => {
+ const ws = await tx.get(Stores.withdrawalGroups, withdrawalGroupId);
+ if (!ws) {
+ return;
+ }
+
+ await tx
+ .iterIndexed(Stores.planchets.byGroup, withdrawalGroupId)
+ .forEach((x) => {
+ if (x.withdrawalDone) {
+ numFinished++;
+ }
+ });
+
+ if (ws.timestampFinish === undefined && numFinished == numTotalCoins) {
+ finishedForFirstTime = true;
+ ws.timestampFinish = getTimestampNow();
+ ws.lastError = undefined;
+ ws.retryInfo = initRetryInfo(false);
+ }
+ await tx.put(Stores.withdrawalGroups, ws);
+ },
+ );
+
+ if (numFinished != numTotalCoins) {
+ // FIXME: aggregate individual problems into the big error message here.
+ throw Error(
+ `withdrawal did not finish (${numFinished} / ${numTotalCoins} coins withdrawn)`,
+ );
+ }
+
+ if (finishedForFirstTime) {
+ ws.notify({
+ type: NotificationType.WithdrawGroupFinished,
+ withdrawalSource: withdrawalGroup.source,
+ });
+ }
}
export async function getExchangeWithdrawalInfo(