summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/pay-peer-pull-debit.ts')
-rw-r--r--packages/taler-wallet-core/src/pay-peer-pull-debit.ts120
1 files changed, 69 insertions, 51 deletions
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
index 705317eb6..92eb44a87 100644
--- a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
@@ -33,6 +33,7 @@ import {
HttpStatusCode,
Logger,
NotificationType,
+ ObservabilityEventType,
PeerContractTerms,
PreparePeerPullDebitRequest,
PreparePeerPullDebitResponse,
@@ -425,6 +426,11 @@ async function processPeerPullDebitPendingDeposit(
wex: WalletExecutionContext,
peerPullInc: PeerPullPaymentIncomingRecord,
): Promise<TaskRunResult> {
+ const ctx = new PeerPullDebitTransactionContext(
+ wex,
+ peerPullInc.peerPullDebitId,
+ );
+
const pursePub = peerPullInc.pursePub;
const coinSel = peerPullInc.coinSel;
@@ -512,70 +518,82 @@ async function processPeerPullDebitPendingDeposit(
}
}
- const coins = await queryCoinInfosForSelection(wex, coinSel);
-
- const depositSigsResp = await wex.cryptoApi.signPurseDeposits({
- exchangeBaseUrl: peerPullInc.exchangeBaseUrl,
- pursePub: peerPullInc.pursePub,
- coins,
- });
-
const purseDepositUrl = new URL(
`purses/${pursePub}/deposit`,
peerPullInc.exchangeBaseUrl,
);
- const depositPayload: ExchangePurseDeposits = {
- deposits: depositSigsResp.deposits,
- };
+ // FIXME: We could skip batches that we've already submitted.
- if (logger.shouldLogTrace()) {
- logger.trace(`purse deposit payload: ${j2s(depositPayload)}`);
- }
+ const coins = await queryCoinInfosForSelection(wex, coinSel);
- const httpResp = await wex.http.fetch(purseDepositUrl.href, {
- method: "POST",
- body: depositPayload,
- cancellationToken: wex.cancellationToken,
- });
+ const maxBatchSize = 100;
- const ctx = new PeerPullDebitTransactionContext(
- wex,
- peerPullInc.peerPullDebitId,
- );
+ for (let i = 0; i < coins.length; i += maxBatchSize) {
+ const batchSize = Math.min(maxBatchSize, coins.length - i);
- switch (httpResp.status) {
- case HttpStatusCode.Ok: {
- const resp = await readSuccessResponseJsonOrThrow(
- httpResp,
- codecForAny(),
- );
- logger.trace(`purse deposit response: ${j2s(resp)}`);
+ wex.oc.observe({
+ type: ObservabilityEventType.Message,
+ contents: `Depositing batch at ${i}/${coins.length} of size ${batchSize}`,
+ });
- await ctx.transition(async (r) => {
- if (r.status !== PeerPullDebitRecordStatus.PendingDeposit) {
- return TransitionResultType.Stay;
- }
- r.status = PeerPullDebitRecordStatus.Done;
- return TransitionResultType.Transition;
- });
- return TaskRunResult.finished();
- }
- case HttpStatusCode.Gone: {
- await ctx.abortTransaction();
- return TaskRunResult.backoff();
- }
- case HttpStatusCode.Conflict: {
- return handlePurseCreationConflict(ctx, peerPullInc, httpResp);
+ const batchCoins = coins.slice(i, i + batchSize);
+ const depositSigsResp = await wex.cryptoApi.signPurseDeposits({
+ exchangeBaseUrl: peerPullInc.exchangeBaseUrl,
+ pursePub: peerPullInc.pursePub,
+ coins: batchCoins,
+ });
+
+ const depositPayload: ExchangePurseDeposits = {
+ deposits: depositSigsResp.deposits,
+ };
+
+ if (logger.shouldLogTrace()) {
+ logger.trace(`purse deposit payload: ${j2s(depositPayload)}`);
}
- default: {
- const errResp = await readTalerErrorResponse(httpResp);
- return {
- type: TaskRunResultType.Error,
- errorDetail: errResp,
- };
+
+ const httpResp = await wex.http.fetch(purseDepositUrl.href, {
+ method: "POST",
+ body: depositPayload,
+ cancellationToken: wex.cancellationToken,
+ });
+
+ switch (httpResp.status) {
+ case HttpStatusCode.Ok: {
+ const resp = await readSuccessResponseJsonOrThrow(
+ httpResp,
+ codecForAny(),
+ );
+ logger.trace(`purse deposit response: ${j2s(resp)}`);
+ continue;
+ }
+ case HttpStatusCode.Gone: {
+ await ctx.abortTransaction();
+ return TaskRunResult.backoff();
+ }
+ case HttpStatusCode.Conflict: {
+ return handlePurseCreationConflict(ctx, peerPullInc, httpResp);
+ }
+ default: {
+ const errResp = await readTalerErrorResponse(httpResp);
+ return {
+ type: TaskRunResultType.Error,
+ errorDetail: errResp,
+ };
+ }
}
}
+
+ // All batches succeeded, we can transition!
+
+ await ctx.transition(async (r) => {
+ if (r.status !== PeerPullDebitRecordStatus.PendingDeposit) {
+ return TransitionResultType.Stay;
+ }
+ r.status = PeerPullDebitRecordStatus.Done;
+ return TransitionResultType.Transition;
+ });
+ return TaskRunResult.finished();
}
async function processPeerPullDebitAbortingRefresh(