diff options
Diffstat (limited to 'packages/taler-wallet-core/src')
3 files changed, 166 insertions, 107 deletions
diff --git a/packages/taler-wallet-core/src/crypto/cryptoImplementation.ts b/packages/taler-wallet-core/src/crypto/cryptoImplementation.ts index 77ee65e52..0745d70c4 100644 --- a/packages/taler-wallet-core/src/crypto/cryptoImplementation.ts +++ b/packages/taler-wallet-core/src/crypto/cryptoImplementation.ts @@ -1468,15 +1468,12 @@ export const nativeCryptoR: TalerCryptoInterfaceR = { const hExchangeBaseUrl = hash(stringToBytes(req.exchangeBaseUrl + "\0")); const deposits: PurseDeposit[] = []; for (const c of req.coins) { - let haveAch: boolean; let maybeAch: Uint8Array; if (c.ageCommitmentProof) { - haveAch = true; maybeAch = decodeCrock( AgeRestriction.hashCommitment(c.ageCommitmentProof.commitment), ); } else { - haveAch = false; maybeAch = new Uint8Array(32); } const sigBlob = buildSigPS(TalerSignaturePurpose.WALLET_PURSE_DEPOSIT) diff --git a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts index 705317eb6..92eb44a87 100644 --- a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts +++ b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts @@ -33,6 +33,7 @@ import { HttpStatusCode, Logger, NotificationType, + ObservabilityEventType, PeerContractTerms, PreparePeerPullDebitRequest, PreparePeerPullDebitResponse, @@ -425,6 +426,11 @@ async function processPeerPullDebitPendingDeposit( wex: WalletExecutionContext, peerPullInc: PeerPullPaymentIncomingRecord, ): Promise<TaskRunResult> { + const ctx = new PeerPullDebitTransactionContext( + wex, + peerPullInc.peerPullDebitId, + ); + const pursePub = peerPullInc.pursePub; const coinSel = peerPullInc.coinSel; @@ -512,70 +518,82 @@ async function processPeerPullDebitPendingDeposit( } } - const coins = await queryCoinInfosForSelection(wex, coinSel); - - const depositSigsResp = await wex.cryptoApi.signPurseDeposits({ - exchangeBaseUrl: peerPullInc.exchangeBaseUrl, - pursePub: peerPullInc.pursePub, - coins, - }); - const purseDepositUrl = new URL( `purses/${pursePub}/deposit`, peerPullInc.exchangeBaseUrl, ); - const depositPayload: ExchangePurseDeposits = { - deposits: depositSigsResp.deposits, - }; + // FIXME: We could skip batches that we've already submitted. - if (logger.shouldLogTrace()) { - logger.trace(`purse deposit payload: ${j2s(depositPayload)}`); - } + const coins = await queryCoinInfosForSelection(wex, coinSel); - const httpResp = await wex.http.fetch(purseDepositUrl.href, { - method: "POST", - body: depositPayload, - cancellationToken: wex.cancellationToken, - }); + const maxBatchSize = 100; - const ctx = new PeerPullDebitTransactionContext( - wex, - peerPullInc.peerPullDebitId, - ); + for (let i = 0; i < coins.length; i += maxBatchSize) { + const batchSize = Math.min(maxBatchSize, coins.length - i); - switch (httpResp.status) { - case HttpStatusCode.Ok: { - const resp = await readSuccessResponseJsonOrThrow( - httpResp, - codecForAny(), - ); - logger.trace(`purse deposit response: ${j2s(resp)}`); + wex.oc.observe({ + type: ObservabilityEventType.Message, + contents: `Depositing batch at ${i}/${coins.length} of size ${batchSize}`, + }); - await ctx.transition(async (r) => { - if (r.status !== PeerPullDebitRecordStatus.PendingDeposit) { - return TransitionResultType.Stay; - } - r.status = PeerPullDebitRecordStatus.Done; - return TransitionResultType.Transition; - }); - return TaskRunResult.finished(); - } - case HttpStatusCode.Gone: { - await ctx.abortTransaction(); - return TaskRunResult.backoff(); - } - case HttpStatusCode.Conflict: { - return handlePurseCreationConflict(ctx, peerPullInc, httpResp); + const batchCoins = coins.slice(i, i + batchSize); + const depositSigsResp = await wex.cryptoApi.signPurseDeposits({ + exchangeBaseUrl: peerPullInc.exchangeBaseUrl, + pursePub: peerPullInc.pursePub, + coins: batchCoins, + }); + + const depositPayload: ExchangePurseDeposits = { + deposits: depositSigsResp.deposits, + }; + + if (logger.shouldLogTrace()) { + logger.trace(`purse deposit payload: ${j2s(depositPayload)}`); } - default: { - const errResp = await readTalerErrorResponse(httpResp); - return { - type: TaskRunResultType.Error, - errorDetail: errResp, - }; + + const httpResp = await wex.http.fetch(purseDepositUrl.href, { + method: "POST", + body: depositPayload, + cancellationToken: wex.cancellationToken, + }); + + switch (httpResp.status) { + case HttpStatusCode.Ok: { + const resp = await readSuccessResponseJsonOrThrow( + httpResp, + codecForAny(), + ); + logger.trace(`purse deposit response: ${j2s(resp)}`); + continue; + } + case HttpStatusCode.Gone: { + await ctx.abortTransaction(); + return TaskRunResult.backoff(); + } + case HttpStatusCode.Conflict: { + return handlePurseCreationConflict(ctx, peerPullInc, httpResp); + } + default: { + const errResp = await readTalerErrorResponse(httpResp); + return { + type: TaskRunResultType.Error, + errorDetail: errResp, + }; + } } } + + // All batches succeeded, we can transition! + + await ctx.transition(async (r) => { + if (r.status !== PeerPullDebitRecordStatus.PendingDeposit) { + return TransitionResultType.Stay; + } + r.status = PeerPullDebitRecordStatus.Done; + return TransitionResultType.Transition; + }); + return TaskRunResult.finished(); } async function processPeerPullDebitAbortingRefresh( diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/pay-peer-push-debit.ts index b6771be89..63a02d7a7 100644 --- a/packages/taler-wallet-core/src/pay-peer-push-debit.ts +++ b/packages/taler-wallet-core/src/pay-peer-push-debit.ts @@ -20,6 +20,7 @@ import { CheckPeerPushDebitResponse, CoinRefreshRequest, ContractTermsUtil, + ExchangePurseDeposits, HttpStatusCode, InitiatePeerPushDebitRequest, InitiatePeerPushDebitResponse, @@ -564,12 +565,6 @@ async function processPeerPushDebitCreateReserve( peerPushInitiation.coinSel, ); - const depositSigsResp = await wex.cryptoApi.signPurseDeposits({ - exchangeBaseUrl: peerPushInitiation.exchangeBaseUrl, - pursePub: peerPushInitiation.pursePub, - coins, - }); - const encryptContractRequest: EncryptContractRequest = { contractTerms: contractTermsRecord.contractTermsRaw, mergePriv: peerPushInitiation.mergePriv, @@ -580,66 +575,115 @@ async function processPeerPushDebitCreateReserve( nonce: peerPushInitiation.contractEncNonce, }; - logger.trace(`encrypt contract request: ${j2s(encryptContractRequest)}`); - const econtractResp = await wex.cryptoApi.encryptContractForMerge( encryptContractRequest, ); - const createPurseUrl = new URL( - `purses/${peerPushInitiation.pursePub}/create`, - peerPushInitiation.exchangeBaseUrl, - ); + const maxBatchSize = 100; - const reqBody = { - amount: peerPushInitiation.amount, - merge_pub: peerPushInitiation.mergePub, - purse_sig: purseSigResp.sig, - h_contract_terms: hContractTerms, - purse_expiration: timestampProtocolFromDb(purseExpiration), - deposits: depositSigsResp.deposits, - min_age: 0, - econtract: econtractResp.econtract, - }; + for (let i = 0; i < coins.length; i += maxBatchSize) { + const batchSize = Math.min(maxBatchSize, coins.length - i); + const batchCoins = coins.slice(i, i + batchSize); - logger.trace(`request body: ${j2s(reqBody)}`); + const depositSigsResp = await wex.cryptoApi.signPurseDeposits({ + exchangeBaseUrl: peerPushInitiation.exchangeBaseUrl, + pursePub: peerPushInitiation.pursePub, + coins: batchCoins, + }); - const httpResp = await wex.http.fetch(createPurseUrl.href, { - method: "POST", - body: reqBody, - cancellationToken: wex.cancellationToken, - }); + if (i == 0) { + // First batch creates the purse! - { - const resp = await httpResp.json(); - logger.info(`resp: ${j2s(resp)}`); - } + logger.trace(`encrypt contract request: ${j2s(encryptContractRequest)}`); - switch (httpResp.status) { - case HttpStatusCode.Ok: - break; - case HttpStatusCode.Forbidden: { - // FIXME: Store this error! - await ctx.failTransaction(); - return TaskRunResult.finished(); - } - case HttpStatusCode.Conflict: { - // Handle double-spending - return handlePurseCreationConflict(wex, peerPushInitiation, httpResp); - } - default: { - const errResp = await readTalerErrorResponse(httpResp); - return { - type: TaskRunResultType.Error, - errorDetail: errResp, + const createPurseUrl = new URL( + `purses/${peerPushInitiation.pursePub}/create`, + peerPushInitiation.exchangeBaseUrl, + ); + + const reqBody = { + amount: peerPushInitiation.amount, + merge_pub: peerPushInitiation.mergePub, + purse_sig: purseSigResp.sig, + h_contract_terms: hContractTerms, + purse_expiration: timestampProtocolFromDb(purseExpiration), + deposits: depositSigsResp.deposits, + min_age: 0, + econtract: econtractResp.econtract, }; + + if (logger.shouldLogTrace()) { + logger.trace(`request body: ${j2s(reqBody)}`); + } + + const httpResp = await wex.http.fetch(createPurseUrl.href, { + method: "POST", + body: reqBody, + cancellationToken: wex.cancellationToken, + }); + + switch (httpResp.status) { + case HttpStatusCode.Ok: + // Possibly on to the next batch. + continue; + case HttpStatusCode.Forbidden: { + // FIXME: Store this error! + await ctx.failTransaction(); + return TaskRunResult.finished(); + } + case HttpStatusCode.Conflict: { + // Handle double-spending + return handlePurseCreationConflict(wex, peerPushInitiation, httpResp); + } + default: { + const errResp = await readTalerErrorResponse(httpResp); + return { + type: TaskRunResultType.Error, + errorDetail: errResp, + }; + } + } + } else { + const purseDepositUrl = new URL( + `purses/${pursePub}/deposit`, + peerPushInitiation.exchangeBaseUrl, + ); + + const depositPayload: ExchangePurseDeposits = { + deposits: depositSigsResp.deposits, + }; + + const httpResp = await wex.http.fetch(purseDepositUrl.href, { + method: "POST", + body: depositPayload, + cancellationToken: wex.cancellationToken, + }); + + switch (httpResp.status) { + case HttpStatusCode.Ok: + // Possibly on to the next batch. + continue; + case HttpStatusCode.Forbidden: { + // FIXME: Store this error! + await ctx.failTransaction(); + return TaskRunResult.finished(); + } + case HttpStatusCode.Conflict: { + // Handle double-spending + return handlePurseCreationConflict(wex, peerPushInitiation, httpResp); + } + default: { + const errResp = await readTalerErrorResponse(httpResp); + return { + type: TaskRunResultType.Error, + errorDetail: errResp, + }; + } + } } } - if (httpResp.status !== HttpStatusCode.Ok) { - // FIXME: do proper error reporting - throw Error("got error response from exchange"); - } + // All batches done! await transitionPeerPushDebitTransaction(wex, pursePub, { stFrom: PeerPushDebitStatus.PendingCreatePurse, |