summaryrefslogtreecommitdiff
path: root/packages
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2023-02-10 13:21:37 +0100
committerFlorian Dold <florian@dold.me>2023-02-10 13:21:37 +0100
commit18c30b9a00a4e5dee629f4e06c261509ff7ba455 (patch)
treed6a546c98d86b15cfab017233f3ddc682822ff62 /packages
parentc4180e1290261c15d4c8cd081aec12252edf1939 (diff)
downloadwallet-core-18c30b9a00a4e5dee629f4e06c261509ff7ba455.tar.gz
wallet-core-18c30b9a00a4e5dee629f4e06c261509ff7ba455.tar.bz2
wallet-core-18c30b9a00a4e5dee629f4e06c261509ff7ba455.zip
wallet-core: implement partial withdrawal batching, don't block when generating planchets
Diffstat (limited to 'packages')
-rw-r--r--packages/taler-harness/src/harness/harness.ts10
-rw-r--r--packages/taler-harness/src/integrationtests/test-withdrawal-huge.ts3
-rw-r--r--packages/taler-util/src/taler-types.ts19
-rw-r--r--packages/taler-wallet-core/src/operations/withdraw.ts378
4 files changed, 207 insertions, 203 deletions
diff --git a/packages/taler-harness/src/harness/harness.ts b/packages/taler-harness/src/harness/harness.ts
index 3403c266e..4e5d8238c 100644
--- a/packages/taler-harness/src/harness/harness.ts
+++ b/packages/taler-harness/src/harness/harness.ts
@@ -1361,7 +1361,12 @@ export class ExchangeService implements ExchangeServiceInterface {
this.exchangeWirewatchProc = this.globalState.spawnService(
"taler-exchange-wirewatch",
- ["-c", this.configFilename, ...this.timetravelArgArr],
+ [
+ "-c",
+ this.configFilename,
+ "--longpoll-timeout=5s",
+ ...this.timetravelArgArr,
+ ],
`exchange-wirewatch-${this.name}`,
);
@@ -1951,6 +1956,9 @@ export class WalletService {
],
`wallet-${this.opts.name}`,
);
+ logger.info(
+ `hint: connect to wallet using taler-wallet-cli --wallet-connection=${unixPath}`,
+ );
}
async pingUntilAvailable(): Promise<void> {
diff --git a/packages/taler-harness/src/integrationtests/test-withdrawal-huge.ts b/packages/taler-harness/src/integrationtests/test-withdrawal-huge.ts
index 579d727b1..437d799b8 100644
--- a/packages/taler-harness/src/integrationtests/test-withdrawal-huge.ts
+++ b/packages/taler-harness/src/integrationtests/test-withdrawal-huge.ts
@@ -87,9 +87,10 @@ export async function runWithdrawalHugeTest(t: GlobalTestState) {
exchangeBaseUrl: exchange.baseUrl,
});
+ // Results in about 1K coins withdrawn
await wallet.client.call(WalletApiOperation.WithdrawFakebank, {
exchange: exchange.baseUrl,
- amount: "TESTKUDOS:5000",
+ amount: "TESTKUDOS:10000",
bank: bank.baseUrl,
});
diff --git a/packages/taler-util/src/taler-types.ts b/packages/taler-util/src/taler-types.ts
index a9303ed9c..bb15f0494 100644
--- a/packages/taler-util/src/taler-types.ts
+++ b/packages/taler-util/src/taler-types.ts
@@ -951,12 +951,12 @@ export const codecForBlindedDenominationSignature = () =>
.alternative(DenomKeyType.Rsa, codecForRsaBlindedDenominationSignature())
.build("BlindedDenominationSignature");
-export class WithdrawResponse {
+export class ExchangeWithdrawResponse {
ev_sig: BlindedDenominationSignature;
}
-export class WithdrawBatchResponse {
- ev_sigs: WithdrawResponse[];
+export class ExchangeWithdrawBatchResponse {
+ ev_sigs: ExchangeWithdrawResponse[];
}
export interface MerchantPayResponse {
@@ -1476,13 +1476,13 @@ export const codecForRecoupConfirmation = (): Codec<RecoupConfirmation> =>
.property("old_coin_pub", codecOptional(codecForString()))
.build("RecoupConfirmation");
-export const codecForWithdrawResponse = (): Codec<WithdrawResponse> =>
- buildCodecForObject<WithdrawResponse>()
+export const codecForWithdrawResponse = (): Codec<ExchangeWithdrawResponse> =>
+ buildCodecForObject<ExchangeWithdrawResponse>()
.property("ev_sig", codecForBlindedDenominationSignature())
.build("WithdrawResponse");
-export const codecForWithdrawBatchResponse = (): Codec<WithdrawBatchResponse> =>
- buildCodecForObject<WithdrawBatchResponse>()
+export const codecForWithdrawBatchResponse = (): Codec<ExchangeWithdrawBatchResponse> =>
+ buildCodecForObject<ExchangeWithdrawBatchResponse>()
.property("ev_sigs", codecForList(codecForWithdrawResponse()))
.build("WithdrawBatchResponse");
@@ -1753,6 +1753,11 @@ export interface ExchangeWithdrawRequest {
coin_ev: CoinEnvelope;
}
+export interface ExchangeBatchWithdrawRequest {
+ planchets: ExchangeWithdrawRequest[];
+}
+
+
export interface ExchangeRefreshRevealRequest {
new_denoms_h: HashCodeString[];
coin_evs: CoinEnvelope[];
diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts
index caa280fe5..987a5e062 100644
--- a/packages/taler-wallet-core/src/operations/withdraw.ts
+++ b/packages/taler-wallet-core/src/operations/withdraw.ts
@@ -59,9 +59,11 @@ import {
TransactionType,
UnblindedSignature,
URL,
- WithdrawBatchResponse,
- WithdrawResponse,
+ ExchangeWithdrawBatchResponse,
+ ExchangeWithdrawResponse,
WithdrawUriInfoResponse,
+ ExchangeBatchWithdrawRequest,
+ WalletNotification,
} from "@gnu-taler/taler-util";
import { EddsaKeypair } from "../crypto/cryptoImplementation.js";
import {
@@ -93,6 +95,7 @@ import {
import { walletCoreDebugFlags } from "../util/debugFlags.js";
import {
HttpRequestLibrary,
+ HttpResponse,
readSuccessResponseJsonOrErrorCode,
readSuccessResponseJsonOrThrow,
throwUnexpectedRequestError,
@@ -455,136 +458,21 @@ async function processPlanchetGenerate(
});
}
-/**
- * Send the withdrawal request for a generated planchet to the exchange.
- *
- * The verification of the response is done asynchronously to enable parallelism.
- */
-async function processPlanchetExchangeRequest(
- ws: InternalWalletState,
- wgContext: WithdrawalGroupContext,
- coinIdx: number,
-): Promise<WithdrawResponse | undefined> {
- const withdrawalGroup: WithdrawalGroupRecord = wgContext.wgRecord;
- logger.info(
- `processing planchet exchange request ${withdrawalGroup.withdrawalGroupId}/${coinIdx}`,
- );
- const d = await ws.db
- .mktx((x) => [
- x.withdrawalGroups,
- x.planchets,
- x.exchanges,
- x.denominations,
- ])
- .runReadOnly(async (tx) => {
- let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
- withdrawalGroup.withdrawalGroupId,
- coinIdx,
- ]);
- if (!planchet) {
- return;
- }
- if (planchet.planchetStatus === PlanchetStatus.WithdrawalDone) {
- logger.warn("processPlanchet: planchet already withdrawn");
- return;
- }
- const exchange = await tx.exchanges.get(withdrawalGroup.exchangeBaseUrl);
- if (!exchange) {
- logger.error("db inconsistent: exchange for planchet not found");
- return;
- }
-
- const denom = await ws.getDenomInfo(
- ws,
- tx,
- withdrawalGroup.exchangeBaseUrl,
- planchet.denomPubHash,
- );
-
- if (!denom) {
- logger.error("db inconsistent: denom for planchet not found");
- return;
- }
-
- logger.trace(
- `processing planchet #${coinIdx} in withdrawal ${withdrawalGroup.withdrawalGroupId}`,
- );
+interface WithdrawalRequestBatchArgs {
+ /**
+ * Use the batched request on the network level.
+ * Not supported by older exchanges.
+ */
+ useBatchRequest: boolean;
- const reqBody: ExchangeWithdrawRequest = {
- denom_pub_hash: planchet.denomPubHash,
- reserve_sig: planchet.withdrawSig,
- coin_ev: planchet.coinEv,
- };
- const reqUrl = new URL(
- `reserves/${withdrawalGroup.reservePub}/withdraw`,
- exchange.baseUrl,
- ).href;
+ coinStartIndex: number;
- return { reqUrl, reqBody };
- });
+ batchSize: number;
+}
- if (!d) {
- return;
- }
- const { reqUrl, reqBody } = d;
-
- try {
- const resp = await ws.http.postJson(reqUrl, reqBody);
- if (resp.status === HttpStatusCode.UnavailableForLegalReasons) {
- logger.info("withdrawal requires KYC");
- const respJson = await resp.json();
- const uuidResp = codecForWalletKycUuid().decode(respJson);
- logger.info(`kyc uuid response: ${j2s(uuidResp)}`);
- await ws.db
- .mktx((x) => [x.planchets, x.withdrawalGroups])
- .runReadWrite(async (tx) => {
- let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
- withdrawalGroup.withdrawalGroupId,
- coinIdx,
- ]);
- if (!planchet) {
- return;
- }
- planchet.planchetStatus = PlanchetStatus.KycRequired;
- const wg2 = await tx.withdrawalGroups.get(
- withdrawalGroup.withdrawalGroupId,
- );
- if (!wg2) {
- return;
- }
- wg2.kycPending = {
- paytoHash: uuidResp.h_payto,
- requirementRow: uuidResp.requirement_row,
- };
- await tx.planchets.put(planchet);
- await tx.withdrawalGroups.put(wg2);
- });
- return;
- }
- const r = await readSuccessResponseJsonOrThrow(
- resp,
- codecForWithdrawResponse(),
- );
- return r;
- } catch (e) {
- const errDetail = getErrorDetailFromException(e);
- logger.trace("withdrawal request failed", e);
- logger.trace(String(e));
- await ws.db
- .mktx((x) => [x.planchets])
- .runReadWrite(async (tx) => {
- let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
- withdrawalGroup.withdrawalGroupId,
- coinIdx,
- ]);
- if (!planchet) {
- return;
- }
- planchet.lastError = errDetail;
- await tx.planchets.put(planchet);
- });
- return;
- }
+interface WithdrawalBatchResult {
+ coinIdxs: number[];
+ batchResp: ExchangeWithdrawBatchResponse;
}
/**
@@ -595,15 +483,18 @@ async function processPlanchetExchangeRequest(
async function processPlanchetExchangeBatchRequest(
ws: InternalWalletState,
wgContext: WithdrawalGroupContext,
-): Promise<WithdrawBatchResponse | undefined> {
+ args: WithdrawalRequestBatchArgs,
+): Promise<WithdrawalBatchResult> {
const withdrawalGroup: WithdrawalGroupRecord = wgContext.wgRecord;
logger.info(
- `processing planchet exchange batch request ${withdrawalGroup.withdrawalGroupId}`,
+ `processing planchet exchange batch request ${withdrawalGroup.withdrawalGroupId}, start=${args.coinStartIndex}, len=${args.batchSize}`,
);
- const numTotalCoins = withdrawalGroup.denomsSel.selectedDenoms
- .map((x) => x.count)
- .reduce((a, b) => a + b);
- const d = await ws.db
+
+ const batchReq: ExchangeBatchWithdrawRequest = { planchets: [] };
+ // Indices of coins that are included in the batch request
+ const coinIdxs: number[] = [];
+
+ await ws.db
.mktx((x) => [
x.withdrawalGroups,
x.planchets,
@@ -611,26 +502,22 @@ async function processPlanchetExchangeBatchRequest(
x.denominations,
])
.runReadOnly(async (tx) => {
- const reqBody: { planchets: ExchangeWithdrawRequest[] } = {
- planchets: [],
- };
- const exchange = await tx.exchanges.get(withdrawalGroup.exchangeBaseUrl);
- if (!exchange) {
- logger.error("db inconsistent: exchange for planchet not found");
- return;
- }
-
- for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) {
+ for (
+ let coinIdx = args.coinStartIndex;
+ coinIdx < args.coinStartIndex + args.batchSize &&
+ coinIdx < wgContext.numPlanchets;
+ coinIdx++
+ ) {
let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
withdrawalGroup.withdrawalGroupId,
coinIdx,
]);
if (!planchet) {
- return;
+ continue;
}
if (planchet.planchetStatus === PlanchetStatus.WithdrawalDone) {
logger.warn("processPlanchet: planchet already withdrawn");
- return;
+ continue;
}
const denom = await ws.getDenomInfo(
ws,
@@ -641,7 +528,7 @@ async function processPlanchetExchangeBatchRequest(
if (!denom) {
logger.error("db inconsistent: denom for planchet not found");
- return;
+ continue;
}
const planchetReq: ExchangeWithdrawRequest = {
@@ -649,35 +536,145 @@ async function processPlanchetExchangeBatchRequest(
reserve_sig: planchet.withdrawSig,
coin_ev: planchet.coinEv,
};
- reqBody.planchets.push(planchetReq);
+ batchReq.planchets.push(planchetReq);
+ coinIdxs.push(coinIdx);
}
- return reqBody;
});
- if (!d) {
+ if (batchReq.planchets.length == 0) {
+ logger.warn("empty withdrawal batch");
+ return {
+ batchResp: { ev_sigs: [] },
+ coinIdxs: [],
+ };
+ }
+
+ async function handleKycRequired(resp: HttpResponse, startIdx: number) {
+ logger.info("withdrawal requires KYC");
+ const respJson = await resp.json();
+ const uuidResp = codecForWalletKycUuid().decode(respJson);
+ logger.info(`kyc uuid response: ${j2s(uuidResp)}`);
+ await ws.db
+ .mktx((x) => [x.planchets, x.withdrawalGroups])
+ .runReadWrite(async (tx) => {
+ for (let i = 0; i < startIdx; i++) {
+ let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
+ withdrawalGroup.withdrawalGroupId,
+ coinIdxs[i],
+ ]);
+ if (!planchet) {
+ continue;
+ }
+ planchet.planchetStatus = PlanchetStatus.KycRequired;
+ await tx.planchets.put(planchet);
+ }
+ const wg2 = await tx.withdrawalGroups.get(
+ withdrawalGroup.withdrawalGroupId,
+ );
+ if (!wg2) {
+ return;
+ }
+ wg2.kycPending = {
+ paytoHash: uuidResp.h_payto,
+ requirementRow: uuidResp.requirement_row,
+ };
+ await tx.withdrawalGroups.put(wg2);
+ });
return;
}
- const reqUrl = new URL(
- `reserves/${withdrawalGroup.reservePub}/batch-withdraw`,
- withdrawalGroup.exchangeBaseUrl,
- ).href;
+ async function storeCoinError(e: any, coinIdx: number) {
+ const errDetail = getErrorDetailFromException(e);
+ logger.trace("withdrawal request failed", e);
+ logger.trace(String(e));
+ await ws.db
+ .mktx((x) => [x.planchets])
+ .runReadWrite(async (tx) => {
+ let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
+ withdrawalGroup.withdrawalGroupId,
+ coinIdx,
+ ]);
+ if (!planchet) {
+ return;
+ }
+ planchet.lastError = errDetail;
+ await tx.planchets.put(planchet);
+ });
+ }
- const resp = await ws.http.postJson(reqUrl, d);
- const r = await readSuccessResponseJsonOrThrow(
- resp,
- codecForWithdrawBatchResponse(),
- );
- return r;
+ // FIXME: handle individual error codes better!
+
+ if (args.useBatchRequest) {
+ const reqUrl = new URL(
+ `reserves/${withdrawalGroup.reservePub}/batch-withdraw`,
+ withdrawalGroup.exchangeBaseUrl,
+ ).href;
+
+ try {
+ const resp = await ws.http.postJson(reqUrl, batchReq);
+ if (resp.status === HttpStatusCode.UnavailableForLegalReasons) {
+ await handleKycRequired(resp, 0);
+ }
+ const r = await readSuccessResponseJsonOrThrow(
+ resp,
+ codecForWithdrawBatchResponse(),
+ );
+ return {
+ coinIdxs,
+ batchResp: r,
+ };
+ } catch (e) {
+ await storeCoinError(e, coinIdxs[0]);
+ return {
+ batchResp: { ev_sigs: [] },
+ coinIdxs: [],
+ };
+ }
+ } else {
+ // We emulate the batch response here by making multiple individual requests
+ const responses: ExchangeWithdrawBatchResponse = {
+ ev_sigs: [],
+ };
+ for (let i = 0; i < batchReq.planchets.length; i++) {
+ try {
+ const p = batchReq.planchets[i];
+ const reqUrl = new URL(
+ `reserves/${withdrawalGroup.reservePub}/withdraw`,
+ withdrawalGroup.exchangeBaseUrl,
+ ).href;
+ const resp = await ws.http.postJson(reqUrl, p);
+ if (resp.status === HttpStatusCode.UnavailableForLegalReasons) {
+ await handleKycRequired(resp, i);
+ // We still return blinded coins that we could actually withdraw.
+ return {
+ coinIdxs,
+ batchResp: responses,
+ };
+ }
+ const r = await readSuccessResponseJsonOrThrow(
+ resp,
+ codecForWithdrawResponse(),
+ );
+ responses.ev_sigs.push(r);
+ } catch (e) {
+ await storeCoinError(e, coinIdxs[i]);
+ }
+ }
+ return {
+ coinIdxs,
+ batchResp: responses,
+ };
+ }
}
async function processPlanchetVerifyAndStoreCoin(
ws: InternalWalletState,
wgContext: WithdrawalGroupContext,
coinIdx: number,
- resp: WithdrawResponse,
+ resp: ExchangeWithdrawResponse,
): Promise<void> {
const withdrawalGroup = wgContext.wgRecord;
+ logger.info(`checking and storing planchet idx=${coinIdx}`);
const d = await ws.db
.mktx((x) => [x.withdrawalGroups, x.planchets, x.denominations])
.runReadOnly(async (tx) => {
@@ -791,6 +788,14 @@ async function processPlanchetVerifyAndStoreCoin(
wgContext.planchetsFinished.add(planchet.coinPub);
+ // We create the notification here, as the async transaction below
+ // allows other planchet withdrawals to change wgContext.planchetsFinished
+ const notification: WalletNotification = {
+ type: NotificationType.CoinWithdrawn,
+ numTotal: wgContext.numPlanchets,
+ numWithdrawn: wgContext.planchetsFinished.size,
+ }
+
// Check if this is the first time that the whole
// withdrawal succeeded. If so, mark the withdrawal
// group as finished.
@@ -814,11 +819,7 @@ async function processPlanchetVerifyAndStoreCoin(
});
if (firstSuccess) {
- ws.notify({
- type: NotificationType.CoinWithdrawn,
- numTotal: wgContext.numPlanchets,
- numWithdrawn: wgContext.planchetsFinished.size,
- });
+ ws.notify(notification);
}
}
@@ -1150,8 +1151,6 @@ export async function processWithdrawalGroup(
wgRecord: withdrawalGroup,
};
- let work: Promise<void>[] = [];
-
await ws.db
.mktx((x) => [x.planchets])
.runReadOnly(async (tx) => {
@@ -1165,44 +1164,35 @@ export async function processWithdrawalGroup(
}
});
+ // We sequentially generate planchets, so that
+ // large withdrawal groups don't make the wallet unresponsive.
for (let i = 0; i < numTotalCoins; i++) {
- work.push(processPlanchetGenerate(ws, withdrawalGroup, i));
+ await processPlanchetGenerate(ws, withdrawalGroup, i);
}
- // Generate coins concurrently (parallelism only happens in the crypto API workers)
- await Promise.all(work);
-
- work = [];
+ const maxBatchSize = 100;
- if (ws.batchWithdrawal) {
- const resp = await processPlanchetExchangeBatchRequest(ws, wgContext);
- if (!resp) {
- throw Error("unable to do batch withdrawal");
- }
- for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) {
+ for (let i = 0; i < numTotalCoins; i += maxBatchSize) {
+ const resp = await processPlanchetExchangeBatchRequest(ws, wgContext, {
+ batchSize: maxBatchSize,
+ coinStartIndex: i,
+ useBatchRequest: ws.batchWithdrawal,
+ });
+ let work: Promise<void>[] = [];
+ work = [];
+ for (let j = 0; j < resp.coinIdxs.length; j++) {
work.push(
processPlanchetVerifyAndStoreCoin(
ws,
wgContext,
- coinIdx,
- resp.ev_sigs[coinIdx],
+ resp.coinIdxs[j],
+ resp.batchResp.ev_sigs[j],
),
);
}
- } else {
- for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) {
- const resp = await processPlanchetExchangeRequest(ws, wgContext, coinIdx);
- if (!resp) {
- continue;
- }
- work.push(
- processPlanchetVerifyAndStoreCoin(ws, wgContext, coinIdx, resp),
- );
- }
+ await Promise.all(work);
}
- await Promise.all(work);
-
let numFinished = 0;
let numKycRequired = 0;
let finishedForFirstTime = false;