diff options
Diffstat (limited to 'packages/taler-wallet-core/src/pay-peer-push-debit.ts')
-rw-r--r-- | packages/taler-wallet-core/src/pay-peer-push-debit.ts | 485 |
1 files changed, 327 insertions, 158 deletions
diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/pay-peer-push-debit.ts index ab80888eb..3a936fb04 100644 --- a/packages/taler-wallet-core/src/pay-peer-push-debit.ts +++ b/packages/taler-wallet-core/src/pay-peer-push-debit.ts @@ -20,12 +20,14 @@ import { CheckPeerPushDebitResponse, CoinRefreshRequest, ContractTermsUtil, + ExchangePurseDeposits, HttpStatusCode, InitiatePeerPushDebitRequest, InitiatePeerPushDebitResponse, Logger, NotificationType, RefreshReason, + SelectedProspectiveCoin, TalerError, TalerErrorCode, TalerPreciseTimestamp, @@ -38,6 +40,7 @@ import { TransactionState, TransactionType, assertUnreachable, + checkDbInvariant, checkLogicInvariant, encodeCrock, getRandomBytes, @@ -101,19 +104,22 @@ export class PeerPushDebitTransactionContext implements TransactionContext { async deleteTransaction(): Promise<void> { const { wex, pursePub, transactionId } = this; - await wex.db.runReadWriteTx(["peerPushDebit", "tombstones"], async (tx) => { - const debit = await tx.peerPushDebit.get(pursePub); - if (debit) { - await tx.peerPushDebit.delete(pursePub); - await tx.tombstones.put({ id: transactionId }); - } - }); + await wex.db.runReadWriteTx( + { storeNames: ["peerPushDebit", "tombstones"] }, + async (tx) => { + const debit = await tx.peerPushDebit.get(pursePub); + if (debit) { + await tx.peerPushDebit.delete(pursePub); + await tx.tombstones.put({ id: transactionId }); + } + }, + ); } async suspendTransaction(): Promise<void> { const { wex, pursePub, transactionId, taskId: retryTag } = this; const transitionInfo = await wex.db.runReadWriteTx( - ["peerPushDebit"], + { storeNames: ["peerPushDebit"] }, async (tx) => { const pushDebitRec = await tx.peerPushDebit.get(pursePub); if (!pushDebitRec) { @@ -171,7 +177,7 @@ export class PeerPushDebitTransactionContext implements TransactionContext { async abortTransaction(): Promise<void> { const { wex, pursePub, transactionId, taskId: retryTag } = this; const transitionInfo = await wex.db.runReadWriteTx( - ["peerPushDebit"], + { storeNames: ["peerPushDebit"] }, async (tx) => { const pushDebitRec = await tx.peerPushDebit.get(pursePub); if (!pushDebitRec) { @@ -225,7 +231,7 @@ export class PeerPushDebitTransactionContext implements TransactionContext { async resumeTransaction(): Promise<void> { const { wex, pursePub, transactionId, taskId: retryTag } = this; const transitionInfo = await wex.db.runReadWriteTx( - ["peerPushDebit"], + { storeNames: ["peerPushDebit"] }, async (tx) => { const pushDebitRec = await tx.peerPushDebit.get(pursePub); if (!pushDebitRec) { @@ -283,7 +289,7 @@ export class PeerPushDebitTransactionContext implements TransactionContext { async failTransaction(): Promise<void> { const { wex, pursePub, transactionId, taskId: retryTag } = this; const transitionInfo = await wex.db.runReadWriteTx( - ["peerPushDebit"], + { storeNames: ["peerPushDebit"] }, async (tx) => { const pushDebitRec = await tx.peerPushDebit.get(pursePub); if (!pushDebitRec) { @@ -343,20 +349,29 @@ export async function checkPeerPushDebit( logger.trace( `checking peer push debit for ${Amounts.stringify(instructedAmount)}`, ); - const coinSelRes = await selectPeerCoins(wex, { instructedAmount }); - if (coinSelRes.type === "failure") { - throw TalerError.fromDetail( - TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE, - { - insufficientBalanceDetails: coinSelRes.insufficientBalanceDetails, - }, - ); + const coinSelRes = await selectPeerCoins(wex, { + instructedAmount, + }); + let coins: SelectedProspectiveCoin[] | undefined = undefined; + switch (coinSelRes.type) { + case "failure": + throw TalerError.fromDetail( + TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE, + { + insufficientBalanceDetails: coinSelRes.insufficientBalanceDetails, + }, + ); + case "prospective": + coins = coinSelRes.result.prospectiveCoins; + break; + case "success": + coins = coinSelRes.result.coins; + break; + default: + assertUnreachable(coinSelRes); } - logger.trace(`selected peer coins (len=${coinSelRes.result.coins.length})`); - const totalAmount = await getTotalPeerPaymentCost( - wex, - coinSelRes.result.coins, - ); + logger.trace(`selected peer coins (len=${coins.length})`); + const totalAmount = await getTotalPeerPaymentCost(wex, coins); logger.trace("computed total peer payment cost"); return { exchangeBaseUrl: coinSelRes.result.exchangeBaseUrl, @@ -391,6 +406,8 @@ async function handlePurseCreationConflict( const instructedAmount = Amounts.parseOrThrow(peerPushInitiation.amount); const sel = peerPushInitiation.coinSel; + checkDbInvariant(!!sel, `no coin selected for peer push initiation ${peerPushInitiation.pursePub}`); + const repair: PreviousPayCoins = []; for (let i = 0; i < sel.coinPubs.length; i++) { @@ -402,16 +419,25 @@ async function handlePurseCreationConflict( } } - const coinSelRes = await selectPeerCoins(wex, { instructedAmount, repair }); + const coinSelRes = await selectPeerCoins(wex, { + instructedAmount, + repair, + }); - if (coinSelRes.type == "failure") { - // FIXME: Details! - throw Error( - "insufficient balance to re-select coins to repair double spending", - ); + switch (coinSelRes.type) { + case "failure": + case "prospective": + // FIXME: Details! + throw Error( + "insufficient balance to re-select coins to repair double spending", + ); + case "success": + break; + default: + assertUnreachable(coinSelRes); } - await wex.db.runReadWriteTx(["peerPushDebit"], async (tx) => { + await wex.db.runReadWriteTx({ storeNames: ["peerPushDebit"] }, async (tx) => { const myPpi = await tx.peerPushDebit.get(peerPushInitiation.pursePub); if (!myPpi) { return; @@ -447,7 +473,7 @@ async function processPeerPushDebitCreateReserve( logger.trace(`processing ${transactionId} pending(create-reserve)`); const contractTermsRecord = await wex.db.runReadOnlyTx( - ["contractTerms"], + { storeNames: ["contractTerms"] }, async (tx) => { return tx.contractTerms.get(hContractTerms); }, @@ -459,6 +485,77 @@ async function processPeerPushDebitCreateReserve( ); } + if (!peerPushInitiation.coinSel) { + const coinSelRes = await selectPeerCoins(wex, { + instructedAmount: Amounts.parseOrThrow(peerPushInitiation.amount), + }); + + switch (coinSelRes.type) { + case "failure": + throw TalerError.fromDetail( + TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE, + { + insufficientBalanceDetails: coinSelRes.insufficientBalanceDetails, + }, + ); + case "prospective": + throw Error("insufficient funds (blocked on refresh)"); + case "success": + break; + default: + assertUnreachable(coinSelRes); + } + const transitionDone = await wex.db.runReadWriteTx( + { + storeNames: [ + "exchanges", + "contractTerms", + "coins", + "coinAvailability", + "denominations", + "refreshGroups", + "refreshSessions", + "peerPushDebit", + ], + }, + async (tx) => { + const ppi = await tx.peerPushDebit.get(pursePub); + if (!ppi) { + return false; + } + if (ppi.coinSel) { + return false; + } + + ppi.coinSel = { + coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), + contributions: coinSelRes.result.coins.map((x) => x.contribution), + }; + // FIXME: Instead of directly doing a spendCoin here, + // we might want to mark the coins as used and spend them + // after we've been able to create the purse. + await spendCoins(wex, tx, { + allocationId: constructTransactionIdentifier({ + tag: TransactionType.PeerPushDebit, + pursePub, + }), + coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), + contributions: coinSelRes.result.coins.map((x) => + Amounts.parseOrThrow(x.contribution), + ), + refreshReason: RefreshReason.PayPeerPush, + }); + + await tx.peerPushDebit.put(ppi); + return true; + }, + ); + if (transitionDone) { + return TaskRunResult.progress(); + } + return TaskRunResult.backoff(); + } + const purseSigResp = await wex.cryptoApi.signPurseCreation({ hContractTerms, mergePub: peerPushInitiation.mergePub, @@ -473,12 +570,6 @@ async function processPeerPushDebitCreateReserve( peerPushInitiation.coinSel, ); - const depositSigsResp = await wex.cryptoApi.signPurseDeposits({ - exchangeBaseUrl: peerPushInitiation.exchangeBaseUrl, - pursePub: peerPushInitiation.pursePub, - coins, - }); - const encryptContractRequest: EncryptContractRequest = { contractTerms: contractTermsRecord.contractTermsRaw, mergePriv: peerPushInitiation.mergePriv, @@ -489,66 +580,115 @@ async function processPeerPushDebitCreateReserve( nonce: peerPushInitiation.contractEncNonce, }; - logger.trace(`encrypt contract request: ${j2s(encryptContractRequest)}`); - const econtractResp = await wex.cryptoApi.encryptContractForMerge( encryptContractRequest, ); - const createPurseUrl = new URL( - `purses/${peerPushInitiation.pursePub}/create`, - peerPushInitiation.exchangeBaseUrl, - ); + const maxBatchSize = 100; - const reqBody = { - amount: peerPushInitiation.amount, - merge_pub: peerPushInitiation.mergePub, - purse_sig: purseSigResp.sig, - h_contract_terms: hContractTerms, - purse_expiration: timestampProtocolFromDb(purseExpiration), - deposits: depositSigsResp.deposits, - min_age: 0, - econtract: econtractResp.econtract, - }; + for (let i = 0; i < coins.length; i += maxBatchSize) { + const batchSize = Math.min(maxBatchSize, coins.length - i); + const batchCoins = coins.slice(i, i + batchSize); - logger.trace(`request body: ${j2s(reqBody)}`); + const depositSigsResp = await wex.cryptoApi.signPurseDeposits({ + exchangeBaseUrl: peerPushInitiation.exchangeBaseUrl, + pursePub: peerPushInitiation.pursePub, + coins: batchCoins, + }); - const httpResp = await wex.http.fetch(createPurseUrl.href, { - method: "POST", - body: reqBody, - cancellationToken: wex.cancellationToken, - }); + if (i == 0) { + // First batch creates the purse! - { - const resp = await httpResp.json(); - logger.info(`resp: ${j2s(resp)}`); - } + logger.trace(`encrypt contract request: ${j2s(encryptContractRequest)}`); - switch (httpResp.status) { - case HttpStatusCode.Ok: - break; - case HttpStatusCode.Forbidden: { - // FIXME: Store this error! - await ctx.failTransaction(); - return TaskRunResult.finished(); - } - case HttpStatusCode.Conflict: { - // Handle double-spending - return handlePurseCreationConflict(wex, peerPushInitiation, httpResp); - } - default: { - const errResp = await readTalerErrorResponse(httpResp); - return { - type: TaskRunResultType.Error, - errorDetail: errResp, + const createPurseUrl = new URL( + `purses/${peerPushInitiation.pursePub}/create`, + peerPushInitiation.exchangeBaseUrl, + ); + + const reqBody = { + amount: peerPushInitiation.amount, + merge_pub: peerPushInitiation.mergePub, + purse_sig: purseSigResp.sig, + h_contract_terms: hContractTerms, + purse_expiration: timestampProtocolFromDb(purseExpiration), + deposits: depositSigsResp.deposits, + min_age: 0, + econtract: econtractResp.econtract, }; + + if (logger.shouldLogTrace()) { + logger.trace(`request body: ${j2s(reqBody)}`); + } + + const httpResp = await wex.http.fetch(createPurseUrl.href, { + method: "POST", + body: reqBody, + cancellationToken: wex.cancellationToken, + }); + + switch (httpResp.status) { + case HttpStatusCode.Ok: + // Possibly on to the next batch. + continue; + case HttpStatusCode.Forbidden: { + // FIXME: Store this error! + await ctx.failTransaction(); + return TaskRunResult.finished(); + } + case HttpStatusCode.Conflict: { + // Handle double-spending + return handlePurseCreationConflict(wex, peerPushInitiation, httpResp); + } + default: { + const errResp = await readTalerErrorResponse(httpResp); + return { + type: TaskRunResultType.Error, + errorDetail: errResp, + }; + } + } + } else { + const purseDepositUrl = new URL( + `purses/${pursePub}/deposit`, + peerPushInitiation.exchangeBaseUrl, + ); + + const depositPayload: ExchangePurseDeposits = { + deposits: depositSigsResp.deposits, + }; + + const httpResp = await wex.http.fetch(purseDepositUrl.href, { + method: "POST", + body: depositPayload, + cancellationToken: wex.cancellationToken, + }); + + switch (httpResp.status) { + case HttpStatusCode.Ok: + // Possibly on to the next batch. + continue; + case HttpStatusCode.Forbidden: { + // FIXME: Store this error! + await ctx.failTransaction(); + return TaskRunResult.finished(); + } + case HttpStatusCode.Conflict: { + // Handle double-spending + return handlePurseCreationConflict(wex, peerPushInitiation, httpResp); + } + default: { + const errResp = await readTalerErrorResponse(httpResp); + return { + type: TaskRunResultType.Error, + errorDetail: errResp, + }; + } + } } } - if (httpResp.status !== HttpStatusCode.Ok) { - // FIXME: do proper error reporting - throw Error("got error response from exchange"); - } + // All batches done! await transitionPeerPushDebitTransaction(wex, pursePub, { stFrom: PeerPushDebitStatus.PendingCreatePurse, @@ -585,13 +725,16 @@ async function processPeerPushDebitAbortingDeletePurse( logger.info(`deleted purse with response status ${resp.status}`); const transitionInfo = await wex.db.runReadWriteTx( - [ - "peerPushDebit", - "refreshGroups", - "denominations", - "coinAvailability", - "coins", - ], + { + storeNames: [ + "peerPushDebit", + "refreshGroups", + "refreshSessions", + "denominations", + "coinAvailability", + "coins", + ], + }, async (tx) => { const ppiRec = await tx.peerPushDebit.get(pursePub); if (!ppiRec) { @@ -604,6 +747,10 @@ async function processPeerPushDebitAbortingDeletePurse( const oldTxState = computePeerPushDebitTransactionState(ppiRec); const coinPubs: CoinRefreshRequest[] = []; + if (!ppiRec.coinSel) { + return undefined; + } + for (let i = 0; i < ppiRec.coinSel.coinPubs.length; i++) { coinPubs.push({ amount: ppiRec.coinSel.contributions[i], @@ -639,6 +786,7 @@ interface SimpleTransition { stTo: PeerPushDebitStatus; } +// FIXME: This should be a transition on the peer push debit transaction context! async function transitionPeerPushDebitTransaction( wex: WalletExecutionContext, pursePub: string, @@ -649,7 +797,7 @@ async function transitionPeerPushDebitTransaction( pursePub, }); const transitionInfo = await wex.db.runReadWriteTx( - ["peerPushDebit"], + { storeNames: ["peerPushDebit"] }, async (tx) => { const ppiRec = await tx.peerPushDebit.get(pursePub); if (!ppiRec) { @@ -686,7 +834,7 @@ async function processPeerPushDebitAbortingRefreshDeleted( await waitRefreshFinal(wex, peerPushInitiation.abortRefreshGroupId); } const transitionInfo = await wex.db.runReadWriteTx( - ["refreshGroups", "peerPushDebit"], + { storeNames: ["refreshGroups", "peerPushDebit"] }, async (tx) => { const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId); let newOpState: PeerPushDebitStatus | undefined; @@ -735,7 +883,7 @@ async function processPeerPushDebitAbortingRefreshExpired( pursePub: peerPushInitiation.pursePub, }); const transitionInfo = await wex.db.runReadWriteTx( - ["peerPushDebit", "refreshGroups"], + { storeNames: ["peerPushDebit", "refreshGroups"] }, async (tx) => { const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId); let newOpState: PeerPushDebitStatus | undefined; @@ -818,13 +966,16 @@ async function processPeerPushDebitReady( } else if (resp.status === HttpStatusCode.Gone) { logger.info(`purse ${pursePub} is gone, aborting peer-push-debit`); const transitionInfo = await wex.db.runReadWriteTx( - [ - "peerPushDebit", - "refreshGroups", - "denominations", - "coinAvailability", - "coins", - ], + { + storeNames: [ + "peerPushDebit", + "refreshGroups", + "refreshSessions", + "denominations", + "coinAvailability", + "coins", + ], + }, async (tx) => { const ppiRec = await tx.peerPushDebit.get(pursePub); if (!ppiRec) { @@ -837,23 +988,26 @@ async function processPeerPushDebitReady( const oldTxState = computePeerPushDebitTransactionState(ppiRec); const coinPubs: CoinRefreshRequest[] = []; - for (let i = 0; i < ppiRec.coinSel.coinPubs.length; i++) { - coinPubs.push({ - amount: ppiRec.coinSel.contributions[i], - coinPub: ppiRec.coinSel.coinPubs[i], - }); + if (ppiRec.coinSel) { + for (let i = 0; i < ppiRec.coinSel.coinPubs.length; i++) { + coinPubs.push({ + amount: ppiRec.coinSel.contributions[i], + coinPub: ppiRec.coinSel.coinPubs[i], + }); + } + + const refresh = await createRefreshGroup( + wex, + tx, + currency, + coinPubs, + RefreshReason.AbortPeerPushDebit, + transactionId, + ); + + ppiRec.abortRefreshGroupId = refresh.refreshGroupId; } - - const refresh = await createRefreshGroup( - wex, - tx, - currency, - coinPubs, - RefreshReason.AbortPeerPushDebit, - transactionId, - ); ppiRec.status = PeerPushDebitStatus.AbortingRefreshExpired; - ppiRec.abortRefreshGroupId = refresh.refreshGroupId; await tx.peerPushDebit.put(ppiRec); const newTxState = computePeerPushDebitTransactionState(ppiRec); return { @@ -875,7 +1029,7 @@ export async function processPeerPushDebit( pursePub: string, ): Promise<TaskRunResult> { const peerPushInitiation = await wex.db.runReadOnlyTx( - ["peerPushDebit"], + { storeNames: ["peerPushDebit"] }, async (tx) => { return tx.peerPushDebit.get(pursePub); }, @@ -932,15 +1086,28 @@ export async function initiatePeerPushDebit( const contractKeyPair = await wex.cryptoApi.createEddsaKeypair({}); - const coinSelRes = await selectPeerCoins(wex, { instructedAmount }); + const coinSelRes = await selectPeerCoins(wex, { + instructedAmount, + }); - if (coinSelRes.type !== "success") { - throw TalerError.fromDetail( - TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE, - { - insufficientBalanceDetails: coinSelRes.insufficientBalanceDetails, - }, - ); + let coins: SelectedProspectiveCoin[] | undefined = undefined; + + switch (coinSelRes.type) { + case "failure": + throw TalerError.fromDetail( + TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE, + { + insufficientBalanceDetails: coinSelRes.insufficientBalanceDetails, + }, + ); + case "prospective": + coins = coinSelRes.result.prospectiveCoins; + break; + case "success": + coins = coinSelRes.result.coins; + break; + default: + assertUnreachable(coinSelRes); } const sel = coinSelRes.result; @@ -948,10 +1115,7 @@ export async function initiatePeerPushDebit( logger.info(`selected p2p coins (push):`); logger.trace(`${j2s(coinSelRes)}`); - const totalAmount = await getTotalPeerPaymentCost( - wex, - coinSelRes.result.coins, - ); + const totalAmount = await getTotalPeerPaymentCost(wex, coins); logger.info(`computed total peer payment cost`); @@ -964,31 +1128,19 @@ export async function initiatePeerPushDebit( const contractEncNonce = encodeCrock(getRandomBytes(24)); const transitionInfo = await wex.db.runReadWriteTx( - [ - "exchanges", - "contractTerms", - "coins", - "coinAvailability", - "denominations", - "refreshGroups", - "peerPushDebit", - ], + { + storeNames: [ + "exchanges", + "contractTerms", + "coins", + "coinAvailability", + "denominations", + "refreshGroups", + "refreshSessions", + "peerPushDebit", + ], + }, async (tx) => { - // FIXME: Instead of directly doing a spendCoin here, - // we might want to mark the coins as used and spend them - // after we've been able to create the purse. - await spendCoins(wex, tx, { - allocationId: constructTransactionIdentifier({ - tag: TransactionType.PeerPushDebit, - pursePub: pursePair.pub, - }), - coinPubs: sel.coins.map((x) => x.coinPub), - contributions: sel.coins.map((x) => - Amounts.parseOrThrow(x.contribution), - ), - refreshReason: RefreshReason.PayPeerPush, - }); - const ppi: PeerPushDebitRecord = { amount: Amounts.stringify(instructedAmount), contractPriv: contractKeyPair.priv, @@ -1003,13 +1155,30 @@ export async function initiatePeerPushDebit( timestampCreated: timestampPreciseToDb(TalerPreciseTimestamp.now()), status: PeerPushDebitStatus.PendingCreatePurse, contractEncNonce, - coinSel: { - coinPubs: sel.coins.map((x) => x.coinPub), - contributions: sel.coins.map((x) => x.contribution), - }, totalCost: Amounts.stringify(totalAmount), }; + if (coinSelRes.type === "success") { + ppi.coinSel = { + coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), + contributions: coinSelRes.result.coins.map((x) => x.contribution), + }; + // FIXME: Instead of directly doing a spendCoin here, + // we might want to mark the coins as used and spend them + // after we've been able to create the purse. + await spendCoins(wex, tx, { + allocationId: constructTransactionIdentifier({ + tag: TransactionType.PeerPushDebit, + pursePub: pursePair.pub, + }), + coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), + contributions: coinSelRes.result.coins.map((x) => + Amounts.parseOrThrow(x.contribution), + ), + refreshReason: RefreshReason.PayPeerPush, + }); + } + await tx.peerPushDebit.add(ppi); await tx.contractTerms.put({ |