summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src')
-rw-r--r--packages/taler-wallet-core/src/crypto/cryptoImplementation.ts3
-rw-r--r--packages/taler-wallet-core/src/pay-peer-pull-debit.ts120
-rw-r--r--packages/taler-wallet-core/src/pay-peer-push-debit.ts150
3 files changed, 166 insertions, 107 deletions
diff --git a/packages/taler-wallet-core/src/crypto/cryptoImplementation.ts b/packages/taler-wallet-core/src/crypto/cryptoImplementation.ts
index 77ee65e52..0745d70c4 100644
--- a/packages/taler-wallet-core/src/crypto/cryptoImplementation.ts
+++ b/packages/taler-wallet-core/src/crypto/cryptoImplementation.ts
@@ -1468,15 +1468,12 @@ export const nativeCryptoR: TalerCryptoInterfaceR = {
const hExchangeBaseUrl = hash(stringToBytes(req.exchangeBaseUrl + "\0"));
const deposits: PurseDeposit[] = [];
for (const c of req.coins) {
- let haveAch: boolean;
let maybeAch: Uint8Array;
if (c.ageCommitmentProof) {
- haveAch = true;
maybeAch = decodeCrock(
AgeRestriction.hashCommitment(c.ageCommitmentProof.commitment),
);
} else {
- haveAch = false;
maybeAch = new Uint8Array(32);
}
const sigBlob = buildSigPS(TalerSignaturePurpose.WALLET_PURSE_DEPOSIT)
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
index 705317eb6..92eb44a87 100644
--- a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
@@ -33,6 +33,7 @@ import {
HttpStatusCode,
Logger,
NotificationType,
+ ObservabilityEventType,
PeerContractTerms,
PreparePeerPullDebitRequest,
PreparePeerPullDebitResponse,
@@ -425,6 +426,11 @@ async function processPeerPullDebitPendingDeposit(
wex: WalletExecutionContext,
peerPullInc: PeerPullPaymentIncomingRecord,
): Promise<TaskRunResult> {
+ const ctx = new PeerPullDebitTransactionContext(
+ wex,
+ peerPullInc.peerPullDebitId,
+ );
+
const pursePub = peerPullInc.pursePub;
const coinSel = peerPullInc.coinSel;
@@ -512,70 +518,82 @@ async function processPeerPullDebitPendingDeposit(
}
}
- const coins = await queryCoinInfosForSelection(wex, coinSel);
-
- const depositSigsResp = await wex.cryptoApi.signPurseDeposits({
- exchangeBaseUrl: peerPullInc.exchangeBaseUrl,
- pursePub: peerPullInc.pursePub,
- coins,
- });
-
const purseDepositUrl = new URL(
`purses/${pursePub}/deposit`,
peerPullInc.exchangeBaseUrl,
);
- const depositPayload: ExchangePurseDeposits = {
- deposits: depositSigsResp.deposits,
- };
+ // FIXME: We could skip batches that we've already submitted.
- if (logger.shouldLogTrace()) {
- logger.trace(`purse deposit payload: ${j2s(depositPayload)}`);
- }
+ const coins = await queryCoinInfosForSelection(wex, coinSel);
- const httpResp = await wex.http.fetch(purseDepositUrl.href, {
- method: "POST",
- body: depositPayload,
- cancellationToken: wex.cancellationToken,
- });
+ const maxBatchSize = 100;
- const ctx = new PeerPullDebitTransactionContext(
- wex,
- peerPullInc.peerPullDebitId,
- );
+ for (let i = 0; i < coins.length; i += maxBatchSize) {
+ const batchSize = Math.min(maxBatchSize, coins.length - i);
- switch (httpResp.status) {
- case HttpStatusCode.Ok: {
- const resp = await readSuccessResponseJsonOrThrow(
- httpResp,
- codecForAny(),
- );
- logger.trace(`purse deposit response: ${j2s(resp)}`);
+ wex.oc.observe({
+ type: ObservabilityEventType.Message,
+ contents: `Depositing batch at ${i}/${coins.length} of size ${batchSize}`,
+ });
- await ctx.transition(async (r) => {
- if (r.status !== PeerPullDebitRecordStatus.PendingDeposit) {
- return TransitionResultType.Stay;
- }
- r.status = PeerPullDebitRecordStatus.Done;
- return TransitionResultType.Transition;
- });
- return TaskRunResult.finished();
- }
- case HttpStatusCode.Gone: {
- await ctx.abortTransaction();
- return TaskRunResult.backoff();
- }
- case HttpStatusCode.Conflict: {
- return handlePurseCreationConflict(ctx, peerPullInc, httpResp);
+ const batchCoins = coins.slice(i, i + batchSize);
+ const depositSigsResp = await wex.cryptoApi.signPurseDeposits({
+ exchangeBaseUrl: peerPullInc.exchangeBaseUrl,
+ pursePub: peerPullInc.pursePub,
+ coins: batchCoins,
+ });
+
+ const depositPayload: ExchangePurseDeposits = {
+ deposits: depositSigsResp.deposits,
+ };
+
+ if (logger.shouldLogTrace()) {
+ logger.trace(`purse deposit payload: ${j2s(depositPayload)}`);
}
- default: {
- const errResp = await readTalerErrorResponse(httpResp);
- return {
- type: TaskRunResultType.Error,
- errorDetail: errResp,
- };
+
+ const httpResp = await wex.http.fetch(purseDepositUrl.href, {
+ method: "POST",
+ body: depositPayload,
+ cancellationToken: wex.cancellationToken,
+ });
+
+ switch (httpResp.status) {
+ case HttpStatusCode.Ok: {
+ const resp = await readSuccessResponseJsonOrThrow(
+ httpResp,
+ codecForAny(),
+ );
+ logger.trace(`purse deposit response: ${j2s(resp)}`);
+ continue;
+ }
+ case HttpStatusCode.Gone: {
+ await ctx.abortTransaction();
+ return TaskRunResult.backoff();
+ }
+ case HttpStatusCode.Conflict: {
+ return handlePurseCreationConflict(ctx, peerPullInc, httpResp);
+ }
+ default: {
+ const errResp = await readTalerErrorResponse(httpResp);
+ return {
+ type: TaskRunResultType.Error,
+ errorDetail: errResp,
+ };
+ }
}
}
+
+ // All batches succeeded, we can transition!
+
+ await ctx.transition(async (r) => {
+ if (r.status !== PeerPullDebitRecordStatus.PendingDeposit) {
+ return TransitionResultType.Stay;
+ }
+ r.status = PeerPullDebitRecordStatus.Done;
+ return TransitionResultType.Transition;
+ });
+ return TaskRunResult.finished();
}
async function processPeerPullDebitAbortingRefresh(
diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/pay-peer-push-debit.ts
index b6771be89..63a02d7a7 100644
--- a/packages/taler-wallet-core/src/pay-peer-push-debit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-push-debit.ts
@@ -20,6 +20,7 @@ import {
CheckPeerPushDebitResponse,
CoinRefreshRequest,
ContractTermsUtil,
+ ExchangePurseDeposits,
HttpStatusCode,
InitiatePeerPushDebitRequest,
InitiatePeerPushDebitResponse,
@@ -564,12 +565,6 @@ async function processPeerPushDebitCreateReserve(
peerPushInitiation.coinSel,
);
- const depositSigsResp = await wex.cryptoApi.signPurseDeposits({
- exchangeBaseUrl: peerPushInitiation.exchangeBaseUrl,
- pursePub: peerPushInitiation.pursePub,
- coins,
- });
-
const encryptContractRequest: EncryptContractRequest = {
contractTerms: contractTermsRecord.contractTermsRaw,
mergePriv: peerPushInitiation.mergePriv,
@@ -580,66 +575,115 @@ async function processPeerPushDebitCreateReserve(
nonce: peerPushInitiation.contractEncNonce,
};
- logger.trace(`encrypt contract request: ${j2s(encryptContractRequest)}`);
-
const econtractResp = await wex.cryptoApi.encryptContractForMerge(
encryptContractRequest,
);
- const createPurseUrl = new URL(
- `purses/${peerPushInitiation.pursePub}/create`,
- peerPushInitiation.exchangeBaseUrl,
- );
+ const maxBatchSize = 100;
- const reqBody = {
- amount: peerPushInitiation.amount,
- merge_pub: peerPushInitiation.mergePub,
- purse_sig: purseSigResp.sig,
- h_contract_terms: hContractTerms,
- purse_expiration: timestampProtocolFromDb(purseExpiration),
- deposits: depositSigsResp.deposits,
- min_age: 0,
- econtract: econtractResp.econtract,
- };
+ for (let i = 0; i < coins.length; i += maxBatchSize) {
+ const batchSize = Math.min(maxBatchSize, coins.length - i);
+ const batchCoins = coins.slice(i, i + batchSize);
- logger.trace(`request body: ${j2s(reqBody)}`);
+ const depositSigsResp = await wex.cryptoApi.signPurseDeposits({
+ exchangeBaseUrl: peerPushInitiation.exchangeBaseUrl,
+ pursePub: peerPushInitiation.pursePub,
+ coins: batchCoins,
+ });
- const httpResp = await wex.http.fetch(createPurseUrl.href, {
- method: "POST",
- body: reqBody,
- cancellationToken: wex.cancellationToken,
- });
+ if (i == 0) {
+ // First batch creates the purse!
- {
- const resp = await httpResp.json();
- logger.info(`resp: ${j2s(resp)}`);
- }
+ logger.trace(`encrypt contract request: ${j2s(encryptContractRequest)}`);
- switch (httpResp.status) {
- case HttpStatusCode.Ok:
- break;
- case HttpStatusCode.Forbidden: {
- // FIXME: Store this error!
- await ctx.failTransaction();
- return TaskRunResult.finished();
- }
- case HttpStatusCode.Conflict: {
- // Handle double-spending
- return handlePurseCreationConflict(wex, peerPushInitiation, httpResp);
- }
- default: {
- const errResp = await readTalerErrorResponse(httpResp);
- return {
- type: TaskRunResultType.Error,
- errorDetail: errResp,
+ const createPurseUrl = new URL(
+ `purses/${peerPushInitiation.pursePub}/create`,
+ peerPushInitiation.exchangeBaseUrl,
+ );
+
+ const reqBody = {
+ amount: peerPushInitiation.amount,
+ merge_pub: peerPushInitiation.mergePub,
+ purse_sig: purseSigResp.sig,
+ h_contract_terms: hContractTerms,
+ purse_expiration: timestampProtocolFromDb(purseExpiration),
+ deposits: depositSigsResp.deposits,
+ min_age: 0,
+ econtract: econtractResp.econtract,
};
+
+ if (logger.shouldLogTrace()) {
+ logger.trace(`request body: ${j2s(reqBody)}`);
+ }
+
+ const httpResp = await wex.http.fetch(createPurseUrl.href, {
+ method: "POST",
+ body: reqBody,
+ cancellationToken: wex.cancellationToken,
+ });
+
+ switch (httpResp.status) {
+ case HttpStatusCode.Ok:
+ // Possibly on to the next batch.
+ continue;
+ case HttpStatusCode.Forbidden: {
+ // FIXME: Store this error!
+ await ctx.failTransaction();
+ return TaskRunResult.finished();
+ }
+ case HttpStatusCode.Conflict: {
+ // Handle double-spending
+ return handlePurseCreationConflict(wex, peerPushInitiation, httpResp);
+ }
+ default: {
+ const errResp = await readTalerErrorResponse(httpResp);
+ return {
+ type: TaskRunResultType.Error,
+ errorDetail: errResp,
+ };
+ }
+ }
+ } else {
+ const purseDepositUrl = new URL(
+ `purses/${pursePub}/deposit`,
+ peerPushInitiation.exchangeBaseUrl,
+ );
+
+ const depositPayload: ExchangePurseDeposits = {
+ deposits: depositSigsResp.deposits,
+ };
+
+ const httpResp = await wex.http.fetch(purseDepositUrl.href, {
+ method: "POST",
+ body: depositPayload,
+ cancellationToken: wex.cancellationToken,
+ });
+
+ switch (httpResp.status) {
+ case HttpStatusCode.Ok:
+ // Possibly on to the next batch.
+ continue;
+ case HttpStatusCode.Forbidden: {
+ // FIXME: Store this error!
+ await ctx.failTransaction();
+ return TaskRunResult.finished();
+ }
+ case HttpStatusCode.Conflict: {
+ // Handle double-spending
+ return handlePurseCreationConflict(wex, peerPushInitiation, httpResp);
+ }
+ default: {
+ const errResp = await readTalerErrorResponse(httpResp);
+ return {
+ type: TaskRunResultType.Error,
+ errorDetail: errResp,
+ };
+ }
+ }
}
}
- if (httpResp.status !== HttpStatusCode.Ok) {
- // FIXME: do proper error reporting
- throw Error("got error response from exchange");
- }
+ // All batches done!
await transitionPeerPushDebitTransaction(wex, pursePub, {
stFrom: PeerPushDebitStatus.PendingCreatePurse,