commit 3847b84436df8985b6143606e7eeced2141e9baa
parent 3529b94fe7c59ac357b8b8e053b3b0c70be42d15
Author: Florian Dold <florian@dold.me>
Date: Mon, 14 Jul 2025 23:19:26 +0200
wallet-core: split deposit batches
The exchange only accepts a limited number of deposits per batch
deposit request, so we need to split batches.
Diffstat:
2 files changed, 294 insertions(+), 220 deletions(-)
diff --git a/packages/taler-wallet-core/src/deposits.ts b/packages/taler-wallet-core/src/deposits.ts
@@ -29,6 +29,7 @@ import {
BatchDepositRequestCoin,
CheckDepositRequest,
CheckDepositResponse,
+ CoinDepositPermission,
CoinRefreshRequest,
CreateDepositGroupRequest,
CreateDepositGroupResponse,
@@ -61,6 +62,7 @@ import {
TransactionState,
TransactionType,
URL,
+ WalletContractData,
WalletNotification,
assertUnreachable,
canonicalJson,
@@ -120,7 +122,7 @@ import {
getExchangeWireFee,
getScopeForAllExchanges,
} from "./exchanges.js";
-import { EddsaKeyPairStrings } from "./index.js";
+import { EddsaKeyPairStrings, SignContractTermsHashResponse } from "./index.js";
import {
GenericKycStatusReq,
checkDepositHardLimitExceeded,
@@ -134,7 +136,6 @@ import {
getTotalPaymentCost,
} from "./pay-merchant.js";
import {
- CreateRefreshGroupResult,
RefreshTransactionContext,
createRefreshGroup,
getTotalRefreshCost,
@@ -826,7 +827,7 @@ async function refundDepositGroup(
"unable to refund deposit group without coin selection (selection missing)",
);
}
- const newTxPerCoin = [...statusPerCoin];
+ let newTxPerCoin = [...statusPerCoin];
// Refunds that might need to be handed off to the refresh,
// as we don't know if deposit request will still arrive
// before doing the refresh.
@@ -838,6 +839,7 @@ async function refundDepositGroup(
switch (st) {
case DepositElementStatus.RefundFailed:
case DepositElementStatus.RefundSuccess:
+ case DepositElementStatus.RefundNotFound:
break;
default: {
const coinPub = payCoinSelection.coinPubs[i];
@@ -893,23 +895,28 @@ async function refundDepositGroup(
}
// FIXME: Handle case where refund request needs to be tried again
newTxPerCoin[i] = newStatus;
+ await wex.db.runReadWriteTx(
+ {
+ storeNames: ["depositGroups"],
+ },
+ async (tx) => {
+ const newDg = await tx.depositGroups.get(
+ depositGroup.depositGroupId,
+ );
+ if (!newDg || !newDg.statusPerCoin) {
+ return;
+ }
+ newDg.statusPerCoin[i] = newStatus;
+ await tx.depositGroups.put(newDg);
+ newTxPerCoin = [...newDg.statusPerCoin];
+ },
+ );
break;
}
}
}
// Check if we are done trying to refund.
- let refundsAllDone = true;
- for (let i = 0; i < newTxPerCoin.length; i++) {
- switch (newTxPerCoin[i]) {
- case DepositElementStatus.RefundFailed:
- case DepositElementStatus.RefundNotFound:
- case DepositElementStatus.RefundSuccess:
- break;
- default:
- refundsAllDone = false;
- }
- }
const res = await wex.db.runReadWriteTx(
{
@@ -926,10 +933,24 @@ async function refundDepositGroup(
},
async (tx) => {
const newDg = await tx.depositGroups.get(depositGroup.depositGroupId);
- if (!newDg) {
+ if (!newDg || !newDg.statusPerCoin) {
+ return;
+ }
+ let refundsAllDone = true;
+ for (let i = 0; i < newTxPerCoin.length; i++) {
+ switch (newTxPerCoin[i]) {
+ case DepositElementStatus.RefundFailed:
+ case DepositElementStatus.RefundNotFound:
+ case DepositElementStatus.RefundSuccess:
+ break;
+ default:
+ refundsAllDone = false;
+ }
+ }
+ if (!refundsAllDone) {
return;
}
- newDg.statusPerCoin = newTxPerCoin;
+ newTxPerCoin = [...newDg.statusPerCoin];
const refreshCoins: CoinRefreshRequest[] = [];
for (let i = 0; i < newTxPerCoin.length; i++) {
refreshCoins.push({
@@ -938,21 +959,18 @@ async function refundDepositGroup(
refundRequest: refundReqPerCoin[i],
});
}
- let refreshRes: CreateRefreshGroupResult | undefined = undefined;
- if (refundsAllDone) {
- refreshRes = await createRefreshGroup(
- wex,
- tx,
- currency,
- refreshCoins,
- RefreshReason.AbortDeposit,
- constructTransactionIdentifier({
- tag: TransactionType.Deposit,
- depositGroupId: newDg.depositGroupId,
- }),
- );
- newDg.abortRefreshGroupId = refreshRes.refreshGroupId;
- }
+ const refreshRes = await createRefreshGroup(
+ wex,
+ tx,
+ currency,
+ refreshCoins,
+ RefreshReason.AbortDeposit,
+ constructTransactionIdentifier({
+ tag: TransactionType.Deposit,
+ depositGroupId: newDg.depositGroupId,
+ }),
+ );
+ newDg.abortRefreshGroupId = refreshRes.refreshGroupId;
await tx.depositGroups.put(newDg);
await ctx.updateTransactionMeta(tx);
return { refreshRes };
@@ -960,12 +978,10 @@ async function refundDepositGroup(
);
if (res?.refreshRes) {
- for (const notif of res.refreshRes.notifications) {
- wex.ws.notify(notif);
- }
+ return TaskRunResult.progress();
}
- return TaskRunResult.progress();
+ return TaskRunResult.backoff();
}
/**
@@ -1506,6 +1522,217 @@ async function processDepositGroupTrack(
}
}
+async function doCoinSelection(
+ ctx: DepositTransactionContext,
+ depositGroup: DepositGroupRecord,
+ contractData: WalletContractData,
+): Promise<TaskRunResult> {
+ const wex = ctx.wex;
+ const depositGroupId = ctx.depositGroupId;
+
+ if (
+ contractData.version !== undefined &&
+ contractData.version !== MerchantContractVersion.V0
+ ) {
+ throw Error("assertion failed");
+ }
+
+ const transitionDone = await wex.db.runAllStoresReadWriteTx(
+ {},
+ async (tx) => {
+ const dg = await tx.depositGroups.get(depositGroupId);
+ if (!dg) {
+ return false;
+ }
+ if (dg.statusPerCoin) {
+ return false;
+ }
+
+ const contractTermsRec = tx.contractTerms.get(
+ depositGroup.contractTermsHash,
+ );
+ if (!contractTermsRec) {
+ throw Error("contract terms for deposit not found in database");
+ }
+
+ const payCoinSel = await selectPayCoinsInTx(wex, tx, {
+ restrictExchanges: {
+ auditors: [],
+ exchanges: contractData.exchanges.map((ex) => ({
+ exchangeBaseUrl: ex.url,
+ exchangePub: ex.master_pub,
+ })),
+ },
+ restrictWireMethod: contractData.wire_method,
+ depositPaytoUri: dg.wire.payto_uri,
+ contractTermsAmount: Amounts.parseOrThrow(contractData.amount),
+ depositFeeLimit: Amounts.parseOrThrow(contractData.max_fee),
+ prevPayCoins: [],
+ });
+
+ switch (payCoinSel.type) {
+ case "success":
+ logger.info("coin selection success");
+ break;
+ case "failure":
+ logger.info("coin selection failure");
+ throw TalerError.fromDetail(
+ TalerErrorCode.WALLET_DEPOSIT_GROUP_INSUFFICIENT_BALANCE,
+ {
+ insufficientBalanceDetails: payCoinSel.insufficientBalanceDetails,
+ },
+ );
+ case "prospective":
+ logger.info("coin selection prospective");
+ throw Error("insufficient balance (waiting on pending refresh)");
+ default:
+ assertUnreachable(payCoinSel);
+ }
+
+ dg.payCoinSelection = {
+ coinContributions: payCoinSel.coinSel.coins.map((x) => x.contribution),
+ coinPubs: payCoinSel.coinSel.coins.map((x) => x.coinPub),
+ };
+ dg.payCoinSelectionUid = encodeCrock(getRandomBytes(32));
+ dg.statusPerCoin = payCoinSel.coinSel.coins.map(
+ () => DepositElementStatus.DepositPending,
+ );
+ await tx.depositGroups.put(dg);
+ await ctx.updateTransactionMeta(tx);
+ await spendCoins(wex, tx, {
+ transactionId: ctx.transactionId,
+ coinPubs: dg.payCoinSelection.coinPubs,
+ contributions: dg.payCoinSelection.coinContributions.map((x) =>
+ Amounts.parseOrThrow(x),
+ ),
+ refreshReason: RefreshReason.PayDeposit,
+ });
+ return true;
+ },
+ );
+
+ if (transitionDone) {
+ return TaskRunResult.progress();
+ } else {
+ return TaskRunResult.backoff();
+ }
+}
+
+interface SubmitBatchArgs {
+ exchangeBaseUrl: string;
+ contractTerms: MerchantContractTermsV0;
+ depositPermissions: CoinDepositPermission[];
+ depositGroup: DepositGroupRecord;
+ merchantSigResp: SignContractTermsHashResponse;
+ coinIndexes: number[];
+}
+
+/**
+ * Submit a single deposit batch to the exchange.
+ *
+ * Returns null on success or a TaskRunResult
+ * to abort the task with on error.
+ */
+async function submitDepositBatch(
+ wex: WalletExecutionContext,
+ args: SubmitBatchArgs,
+): Promise<TaskRunResult | null> {
+ const coins: BatchDepositRequestCoin[] = [];
+ const {
+ merchantSigResp,
+ exchangeBaseUrl,
+ contractTerms,
+ depositGroup,
+ depositPermissions,
+ coinIndexes,
+ } = args;
+ const depositGroupId = depositGroup.depositGroupId;
+ const batchReq: ExchangeBatchDepositRequest = {
+ coins,
+ h_contract_terms: depositGroup.contractTermsHash,
+ merchant_payto_uri: depositGroup.wire.payto_uri,
+ merchant_pub: contractTerms.merchant_pub,
+ timestamp: contractTerms.timestamp,
+ wire_salt: depositGroup.wire.salt,
+ wire_transfer_deadline: contractTerms.wire_transfer_deadline,
+ refund_deadline: contractTerms.refund_deadline,
+ merchant_sig: merchantSigResp.sig,
+ };
+ const ctx = new DepositTransactionContext(wex, depositGroupId);
+ for (let i = 0; i < coinIndexes.length; i++) {
+ const perm = depositPermissions[coinIndexes[i]];
+ if (perm.exchange_url != exchangeBaseUrl) {
+ continue;
+ }
+ coins.push({
+ coin_pub: perm.coin_pub,
+ coin_sig: perm.coin_sig,
+ contribution: Amounts.stringify(perm.contribution),
+ denom_pub_hash: perm.h_denom,
+ ub_sig: perm.ub_sig,
+ h_age_commitment: perm.h_age_commitment,
+ });
+ }
+
+ // Check for cancellation before making network request.
+ wex.cancellationToken?.throwIfCancelled();
+ const url = new URL(`batch-deposit`, exchangeBaseUrl);
+ logger.info(`depositing to ${url.href}`);
+ logger.trace(`deposit request: ${j2s(batchReq)}`);
+ const httpResp = await cancelableFetch(wex, url, {
+ method: "POST",
+ body: batchReq,
+ });
+
+ logger.info(`deposit result status ${httpResp.status}`);
+
+ switch (httpResp.status) {
+ case HttpStatusCode.Accepted:
+ case HttpStatusCode.Ok:
+ break;
+ case HttpStatusCode.UnavailableForLegalReasons: {
+ const kycLegiNeededResp = await readResponseJsonOrThrow(
+ httpResp,
+ codecForLegitimizationNeededResponse(),
+ );
+ logger.info(
+ `kyc legitimization needed response: ${j2s(kycLegiNeededResp)}`,
+ );
+ return transitionToKycRequired(wex, depositGroup, {
+ exchangeUrl: exchangeBaseUrl,
+ kycPaytoHash: kycLegiNeededResp.h_payto,
+ kycAuth: kycLegiNeededResp.bad_kyc_auth ?? false,
+ });
+ }
+ }
+
+ await readSuccessResponseJsonOrThrow(httpResp, codecForBatchDepositSuccess());
+
+ await wex.db.runReadWriteTx(
+ { storeNames: ["depositGroups", "transactionsMeta"] },
+ async (tx) => {
+ const dg = await tx.depositGroups.get(depositGroupId);
+ if (!dg) {
+ return;
+ }
+ if (!dg.statusPerCoin) {
+ return;
+ }
+ for (const batchIndex of coinIndexes) {
+ const coinStatus = dg.statusPerCoin[batchIndex];
+ switch (coinStatus) {
+ case DepositElementStatus.DepositPending:
+ dg.statusPerCoin[batchIndex] = DepositElementStatus.Tracking;
+ await tx.depositGroups.put(dg);
+ }
+ }
+ await ctx.updateTransactionMeta(tx);
+ },
+ );
+
+ return null;
+}
+
/**
* Try to deposit coins with the exchange.
*
@@ -1536,12 +1763,6 @@ async function processDepositGroupPendingDeposit(
depositGroup.contractTermsHash,
"",
);
- if (
- contractData.version !== undefined &&
- contractData.version !== MerchantContractVersion.V0
- ) {
- throw Error("assertion failed");
- }
const ctx = new DepositTransactionContext(wex, depositGroupId);
@@ -1550,89 +1771,7 @@ async function processDepositGroupPendingDeposit(
if (!depositGroup.payCoinSelection) {
logger.info("missing coin selection for deposit group, selecting now");
-
- const transitionDone = await wex.db.runAllStoresReadWriteTx(
- {},
- async (tx) => {
- const dg = await tx.depositGroups.get(depositGroupId);
- if (!dg) {
- return false;
- }
- if (dg.statusPerCoin) {
- return false;
- }
-
- const contractTermsRec = tx.contractTerms.get(
- depositGroup.contractTermsHash,
- );
- if (!contractTermsRec) {
- throw Error("contract terms for deposit not found in database");
- }
-
- const payCoinSel = await selectPayCoinsInTx(wex, tx, {
- restrictExchanges: {
- auditors: [],
- exchanges: contractData.exchanges.map((ex) => ({
- exchangeBaseUrl: ex.url,
- exchangePub: ex.master_pub,
- })),
- },
- restrictWireMethod: contractData.wire_method,
- depositPaytoUri: dg.wire.payto_uri,
- contractTermsAmount: Amounts.parseOrThrow(contractData.amount),
- depositFeeLimit: Amounts.parseOrThrow(contractData.max_fee),
- prevPayCoins: [],
- });
-
- switch (payCoinSel.type) {
- case "success":
- logger.info("coin selection success");
- break;
- case "failure":
- logger.info("coin selection failure");
- throw TalerError.fromDetail(
- TalerErrorCode.WALLET_DEPOSIT_GROUP_INSUFFICIENT_BALANCE,
- {
- insufficientBalanceDetails:
- payCoinSel.insufficientBalanceDetails,
- },
- );
- case "prospective":
- logger.info("coin selection prospective");
- throw Error("insufficient balance (waiting on pending refresh)");
- default:
- assertUnreachable(payCoinSel);
- }
-
- dg.payCoinSelection = {
- coinContributions: payCoinSel.coinSel.coins.map(
- (x) => x.contribution,
- ),
- coinPubs: payCoinSel.coinSel.coins.map((x) => x.coinPub),
- };
- dg.payCoinSelectionUid = encodeCrock(getRandomBytes(32));
- dg.statusPerCoin = payCoinSel.coinSel.coins.map(
- () => DepositElementStatus.DepositPending,
- );
- await tx.depositGroups.put(dg);
- await ctx.updateTransactionMeta(tx);
- await spendCoins(wex, tx, {
- transactionId: ctx.transactionId,
- coinPubs: dg.payCoinSelection.coinPubs,
- contributions: dg.payCoinSelection.coinContributions.map((x) =>
- Amounts.parseOrThrow(x),
- ),
- refreshReason: RefreshReason.PayDeposit,
- });
- return true;
- },
- );
-
- if (transitionDone) {
- return TaskRunResult.progress();
- } else {
- return TaskRunResult.backoff();
- }
+ return await doCoinSelection(ctx, depositGroup, contractData);
}
await wex.db.runReadWriteTx({ storeNames: ["depositGroups"] }, async (tx) => {
@@ -1665,97 +1804,38 @@ async function processDepositGroupPendingDeposit(
merchantPriv: depositGroup.merchantPriv,
});
- // We need to do one batch per exchange.
- for (const exchangeBaseUrl of exchanges.values()) {
- const coins: BatchDepositRequestCoin[] = [];
- const batchIndexes: number[] = [];
+ let maxBatchSize = 63;
- const batchReq: ExchangeBatchDepositRequest = {
- coins,
- h_contract_terms: depositGroup.contractTermsHash,
- merchant_payto_uri: depositGroup.wire.payto_uri,
- merchant_pub: contractTerms.merchant_pub,
- timestamp: contractTerms.timestamp,
- wire_salt: depositGroup.wire.salt,
- wire_transfer_deadline: contractTerms.wire_transfer_deadline,
- refund_deadline: contractTerms.refund_deadline,
- merchant_sig: merchantSigResp.sig,
+ // We need to do one or mre batches per exchange.
+ for (const exchangeBaseUrl of exchanges.values()) {
+ const batchArgs: SubmitBatchArgs = {
+ exchangeBaseUrl,
+ contractTerms,
+ depositPermissions,
+ depositGroup,
+ merchantSigResp,
+ coinIndexes: [],
};
-
for (let i = 0; i < depositPermissions.length; i++) {
const perm = depositPermissions[i];
if (perm.exchange_url != exchangeBaseUrl) {
continue;
}
- coins.push({
- coin_pub: perm.coin_pub,
- coin_sig: perm.coin_sig,
- contribution: Amounts.stringify(perm.contribution),
- denom_pub_hash: perm.h_denom,
- ub_sig: perm.ub_sig,
- h_age_commitment: perm.h_age_commitment,
- });
- batchIndexes.push(i);
+ batchArgs.coinIndexes.push(i);
+ if (batchArgs.coinIndexes.length >= maxBatchSize) {
+ const r = await submitDepositBatch(wex, batchArgs);
+ if (r != null) {
+ return r;
+ }
+ batchArgs.coinIndexes = [];
+ }
}
-
- // Check for cancellation before making network request.
- wex.cancellationToken?.throwIfCancelled();
- const url = new URL(`batch-deposit`, exchangeBaseUrl);
- logger.info(`depositing to ${url.href}`);
- logger.trace(`deposit request: ${j2s(batchReq)}`);
- const httpResp = await cancelableFetch(wex, url, {
- method: "POST",
- body: batchReq,
- });
-
- logger.info(`deposit result status ${httpResp.status}`);
-
- switch (httpResp.status) {
- case HttpStatusCode.Accepted:
- case HttpStatusCode.Ok:
- break;
- case HttpStatusCode.UnavailableForLegalReasons: {
- const kycLegiNeededResp = await readResponseJsonOrThrow(
- httpResp,
- codecForLegitimizationNeededResponse(),
- );
- logger.info(
- `kyc legitimization needed response: ${j2s(kycLegiNeededResp)}`,
- );
- return transitionToKycRequired(wex, depositGroup, {
- exchangeUrl: exchangeBaseUrl,
- kycPaytoHash: kycLegiNeededResp.h_payto,
- kycAuth: kycLegiNeededResp.bad_kyc_auth ?? false,
- });
+ if (batchArgs.coinIndexes.length >= 0) {
+ const r = await submitDepositBatch(wex, batchArgs);
+ if (r != null) {
+ return r;
}
}
-
- await readSuccessResponseJsonOrThrow(
- httpResp,
- codecForBatchDepositSuccess(),
- );
-
- await wex.db.runReadWriteTx(
- { storeNames: ["depositGroups", "transactionsMeta"] },
- async (tx) => {
- const dg = await tx.depositGroups.get(depositGroupId);
- if (!dg) {
- return;
- }
- if (!dg.statusPerCoin) {
- return;
- }
- for (const batchIndex of batchIndexes) {
- const coinStatus = dg.statusPerCoin[batchIndex];
- switch (coinStatus) {
- case DepositElementStatus.DepositPending:
- dg.statusPerCoin[batchIndex] = DepositElementStatus.Tracking;
- await tx.depositGroups.put(dg);
- }
- }
- await ctx.updateTransactionMeta(tx);
- },
- );
}
await wex.db.runReadWriteTx(
diff --git a/packages/taler-wallet-core/src/refresh.ts b/packages/taler-wallet-core/src/refresh.ts
@@ -2010,7 +2010,6 @@ async function applyRefreshToOldCoins(
export interface CreateRefreshGroupResult {
refreshGroupId: string;
- notifications: WalletNotification[];
}
/**
@@ -2110,18 +2109,17 @@ export async function createRefreshGroup(
// group to the DB, the shepherd will give up.
wex.taskScheduler.startShepherdTask(ctx.taskId);
+ tx.notify({
+ type: NotificationType.TransactionStateTransition,
+ transactionId: ctx.transactionId,
+ oldTxState: {
+ major: TransactionMajorState.None,
+ },
+ newTxState,
+ });
+
return {
refreshGroupId,
- notifications: [
- {
- type: NotificationType.TransactionStateTransition,
- transactionId: ctx.transactionId,
- oldTxState: {
- major: TransactionMajorState.None,
- },
- newTxState,
- },
- ],
};
}
@@ -2238,10 +2236,6 @@ export async function forceRefresh(
},
);
- for (const notif of res.notifications) {
- wex.ws.notify(notif);
- }
-
return {
refreshGroupId: res.refreshGroupId,
};