diff options
Diffstat (limited to 'packages/taler-wallet-core/src/pay-peer-pull-debit.ts')
-rw-r--r-- | packages/taler-wallet-core/src/pay-peer-pull-debit.ts | 386 |
1 files changed, 264 insertions, 122 deletions
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts index 6cc552714..0355b58ad 100644 --- a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts +++ b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts @@ -33,10 +33,12 @@ import { HttpStatusCode, Logger, NotificationType, + ObservabilityEventType, PeerContractTerms, PreparePeerPullDebitRequest, PreparePeerPullDebitResponse, RefreshReason, + SelectedProspectiveCoin, TalerError, TalerErrorCode, TalerPreciseTimestamp, @@ -124,13 +126,16 @@ export class PeerPullDebitTransactionContext implements TransactionContext { const transactionId = this.transactionId; const ws = this.wex; const peerPullDebitId = this.peerPullDebitId; - await ws.db.runReadWriteTx(["peerPullDebit", "tombstones"], async (tx) => { - const debit = await tx.peerPullDebit.get(peerPullDebitId); - if (debit) { - await tx.peerPullDebit.delete(peerPullDebitId); - await tx.tombstones.put({ id: transactionId }); - } - }); + await ws.db.runReadWriteTx( + { storeNames: ["peerPullDebit", "tombstones"] }, + async (tx) => { + const debit = await tx.peerPullDebit.get(peerPullDebitId); + if (debit) { + await tx.peerPullDebit.delete(peerPullDebitId); + await tx.tombstones.put({ id: transactionId }); + } + }, + ); } async suspendTransaction(): Promise<void> { @@ -139,7 +144,7 @@ export class PeerPullDebitTransactionContext implements TransactionContext { const wex = this.wex; const peerPullDebitId = this.peerPullDebitId; const transitionInfo = await wex.db.runReadWriteTx( - ["peerPullDebit"], + { storeNames: ["peerPullDebit"] }, async (tx) => { const pullDebitRec = await tx.peerPullDebit.get(peerPullDebitId); if (!pullDebitRec) { @@ -234,6 +239,7 @@ export class PeerPullDebitTransactionContext implements TransactionContext { "coinAvailability", "denominations", "refreshGroups", + "refreshSessions", "coins", "coinAvailability", ], @@ -302,7 +308,7 @@ export class PeerPullDebitTransactionContext implements TransactionContext { const wex = this.wex; const extraStores = opts.extraStores ?? []; const transitionInfo = await wex.db.runReadWriteTx( - ["peerPullDebit", ...extraStores], + { storeNames: ["peerPullDebit", ...extraStores] }, async (tx) => { const pi = await tx.peerPullDebit.get(this.peerPullDebitId); if (!pi) { @@ -369,13 +375,25 @@ async function handlePurseCreationConflict( } } - const coinSelRes = await selectPeerCoins(ws, { instructedAmount, repair }); + const coinSelRes = await selectPeerCoins(ws, { + instructedAmount, + repair, + }); - if (coinSelRes.type == "failure") { - // FIXME: Details! - throw Error( - "insufficient balance to re-select coins to repair double spending", - ); + switch (coinSelRes.type) { + case "failure": + // FIXME: Details! + throw Error( + "insufficient balance to re-select coins to repair double spending", + ); + case "prospective": + throw Error( + "insufficient balance to re-select coins to repair double spending (blocked on refresh)", + ); + case "success": + break; + default: + assertUnreachable(coinSelRes); } const totalAmount = await getTotalPeerPaymentCost( @@ -383,7 +401,7 @@ async function handlePurseCreationConflict( coinSelRes.result.coins, ); - await ws.db.runReadWriteTx(["peerPullDebit"], async (tx) => { + await ws.db.runReadWriteTx({ storeNames: ["peerPullDebit"] }, async (tx) => { const myPpi = await tx.peerPullDebit.get(peerPullInc.peerPullDebitId); if (!myPpi) { return; @@ -411,77 +429,176 @@ async function processPeerPullDebitPendingDeposit( wex: WalletExecutionContext, peerPullInc: PeerPullPaymentIncomingRecord, ): Promise<TaskRunResult> { + const ctx = new PeerPullDebitTransactionContext( + wex, + peerPullInc.peerPullDebitId, + ); + const pursePub = peerPullInc.pursePub; const coinSel = peerPullInc.coinSel; + if (!coinSel) { - throw Error("invalid state, no coins selected"); - } + const instructedAmount = Amounts.parseOrThrow(peerPullInc.amount); - const coins = await queryCoinInfosForSelection(wex, coinSel); + const coinSelRes = await selectPeerCoins(wex, { + instructedAmount, + }); + if (logger.shouldLogTrace()) { + logger.trace(`selected p2p coins (pull): ${j2s(coinSelRes)}`); + } - const depositSigsResp = await wex.cryptoApi.signPurseDeposits({ - exchangeBaseUrl: peerPullInc.exchangeBaseUrl, - pursePub: peerPullInc.pursePub, - coins, - }); + let coins: SelectedProspectiveCoin[] | undefined = undefined; + + switch (coinSelRes.type) { + case "failure": + throw TalerError.fromDetail( + TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE, + { + insufficientBalanceDetails: coinSelRes.insufficientBalanceDetails, + }, + ); + case "prospective": + throw Error("insufficient balance (locked behind refresh)"); + case "success": + coins = coinSelRes.result.coins; + break; + default: + assertUnreachable(coinSelRes); + } + + const peerPullDebitId = peerPullInc.peerPullDebitId; + const totalAmount = await getTotalPeerPaymentCost(wex, coins); + + // FIXME: Missing notification here! + + const transitionDone = await wex.db.runReadWriteTx( + { + storeNames: [ + "exchanges", + "coins", + "denominations", + "refreshGroups", + "refreshSessions", + "peerPullDebit", + "coinAvailability", + ], + }, + async (tx) => { + const pi = await tx.peerPullDebit.get(peerPullDebitId); + if (!pi) { + return false; + } + if (pi.status !== PeerPullDebitRecordStatus.PendingDeposit) { + return false; + } + if (pi.coinSel) { + return false; + } + await spendCoins(wex, tx, { + // allocationId: `txn:peer-pull-debit:${req.peerPullDebitId}`, + allocationId: constructTransactionIdentifier({ + tag: TransactionType.PeerPullDebit, + peerPullDebitId, + }), + coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), + contributions: coinSelRes.result.coins.map((x) => + Amounts.parseOrThrow(x.contribution), + ), + refreshReason: RefreshReason.PayPeerPull, + }); + pi.coinSel = { + coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), + contributions: coinSelRes.result.coins.map((x) => x.contribution), + totalCost: Amounts.stringify(totalAmount), + }; + await tx.peerPullDebit.put(pi); + return true; + }, + ); + if (transitionDone) { + return TaskRunResult.progress(); + } else { + return TaskRunResult.backoff(); + } + } const purseDepositUrl = new URL( `purses/${pursePub}/deposit`, peerPullInc.exchangeBaseUrl, ); - const depositPayload: ExchangePurseDeposits = { - deposits: depositSigsResp.deposits, - }; + // FIXME: We could skip batches that we've already submitted. - if (logger.shouldLogTrace()) { - logger.trace(`purse deposit payload: ${j2s(depositPayload)}`); - } + const coins = await queryCoinInfosForSelection(wex, coinSel); - const httpResp = await wex.http.fetch(purseDepositUrl.href, { - method: "POST", - body: depositPayload, - cancellationToken: wex.cancellationToken, - }); + const maxBatchSize = 100; - const ctx = new PeerPullDebitTransactionContext( - wex, - peerPullInc.peerPullDebitId, - ); + for (let i = 0; i < coins.length; i += maxBatchSize) { + const batchSize = Math.min(maxBatchSize, coins.length - i); - switch (httpResp.status) { - case HttpStatusCode.Ok: { - const resp = await readSuccessResponseJsonOrThrow( - httpResp, - codecForAny(), - ); - logger.trace(`purse deposit response: ${j2s(resp)}`); + wex.oc.observe({ + type: ObservabilityEventType.Message, + contents: `Depositing batch at ${i}/${coins.length} of size ${batchSize}`, + }); - await ctx.transition(async (r) => { - if (r.status !== PeerPullDebitRecordStatus.PendingDeposit) { - return TransitionResultType.Stay; - } - r.status = PeerPullDebitRecordStatus.Done; - return TransitionResultType.Transition; - }); - return TaskRunResult.finished(); - } - case HttpStatusCode.Gone: { - await ctx.abortTransaction(); - return TaskRunResult.backoff(); - } - case HttpStatusCode.Conflict: { - return handlePurseCreationConflict(ctx, peerPullInc, httpResp); + const batchCoins = coins.slice(i, i + batchSize); + const depositSigsResp = await wex.cryptoApi.signPurseDeposits({ + exchangeBaseUrl: peerPullInc.exchangeBaseUrl, + pursePub: peerPullInc.pursePub, + coins: batchCoins, + }); + + const depositPayload: ExchangePurseDeposits = { + deposits: depositSigsResp.deposits, + }; + + if (logger.shouldLogTrace()) { + logger.trace(`purse deposit payload: ${j2s(depositPayload)}`); } - default: { - const errResp = await readTalerErrorResponse(httpResp); - return { - type: TaskRunResultType.Error, - errorDetail: errResp, - }; + + const httpResp = await wex.http.fetch(purseDepositUrl.href, { + method: "POST", + body: depositPayload, + cancellationToken: wex.cancellationToken, + }); + + switch (httpResp.status) { + case HttpStatusCode.Ok: { + const resp = await readSuccessResponseJsonOrThrow( + httpResp, + codecForAny(), + ); + logger.trace(`purse deposit response: ${j2s(resp)}`); + continue; + } + case HttpStatusCode.Gone: { + await ctx.abortTransaction(); + return TaskRunResult.backoff(); + } + case HttpStatusCode.Conflict: { + return handlePurseCreationConflict(ctx, peerPullInc, httpResp); + } + default: { + const errResp = await readTalerErrorResponse(httpResp); + return { + type: TaskRunResultType.Error, + errorDetail: errResp, + }; + } } } + + // All batches succeeded, we can transition! + + await ctx.transition(async (r) => { + if (r.status !== PeerPullDebitRecordStatus.PendingDeposit) { + return TransitionResultType.Stay; + } + r.status = PeerPullDebitRecordStatus.Done; + return TransitionResultType.Transition; + }); + return TaskRunResult.finished(); } async function processPeerPullDebitAbortingRefresh( @@ -496,7 +613,7 @@ async function processPeerPullDebitAbortingRefresh( peerPullDebitId, }); const transitionInfo = await wex.db.runReadWriteTx( - ["peerPullDebit", "refreshGroups"], + { storeNames: ["peerPullDebit", "refreshGroups"] }, async (tx) => { const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId); let newOpState: PeerPullDebitRecordStatus | undefined; @@ -538,7 +655,7 @@ export async function processPeerPullDebit( peerPullDebitId: string, ): Promise<TaskRunResult> { const peerPullInc = await wex.db.runReadOnlyTx( - ["peerPullDebit"], + { storeNames: ["peerPullDebit"] }, async (tx) => { return tx.peerPullDebit.get(peerPullDebitId); }, @@ -568,7 +685,7 @@ export async function confirmPeerPullDebit( peerPullDebitId = parsedTx.peerPullDebitId; const peerPullInc = await wex.db.runReadOnlyTx( - ["peerPullDebit"], + { storeNames: ["peerPullDebit"] }, async (tx) => { return tx.peerPullDebit.get(peerPullDebitId); }, @@ -582,62 +699,77 @@ export async function confirmPeerPullDebit( const instructedAmount = Amounts.parseOrThrow(peerPullInc.amount); - const coinSelRes = await selectPeerCoins(wex, { instructedAmount }); + const coinSelRes = await selectPeerCoins(wex, { + instructedAmount, + }); if (logger.shouldLogTrace()) { logger.trace(`selected p2p coins (pull): ${j2s(coinSelRes)}`); } - if (coinSelRes.type !== "success") { - throw TalerError.fromDetail( - TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE, - { - insufficientBalanceDetails: coinSelRes.insufficientBalanceDetails, - }, - ); + let coins: SelectedProspectiveCoin[] | undefined = undefined; + + switch (coinSelRes.type) { + case "failure": + throw TalerError.fromDetail( + TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE, + { + insufficientBalanceDetails: coinSelRes.insufficientBalanceDetails, + }, + ); + case "prospective": + coins = coinSelRes.result.prospectiveCoins; + break; + case "success": + coins = coinSelRes.result.coins; + break; + default: + assertUnreachable(coinSelRes); } - const sel = coinSelRes.result; + const totalAmount = await getTotalPeerPaymentCost(wex, coins); - const totalAmount = await getTotalPeerPaymentCost( - wex, - coinSelRes.result.coins, - ); + // FIXME: Missing notification here! await wex.db.runReadWriteTx( - [ - "exchanges", - "coins", - "denominations", - "refreshGroups", - "peerPullDebit", - "coinAvailability", - ], + { + storeNames: [ + "exchanges", + "coins", + "denominations", + "refreshGroups", + "refreshSessions", + "peerPullDebit", + "coinAvailability", + ], + }, async (tx) => { - await spendCoins(wex, tx, { - // allocationId: `txn:peer-pull-debit:${req.peerPullDebitId}`, - allocationId: constructTransactionIdentifier({ - tag: TransactionType.PeerPullDebit, - peerPullDebitId, - }), - coinPubs: sel.coins.map((x) => x.coinPub), - contributions: sel.coins.map((x) => - Amounts.parseOrThrow(x.contribution), - ), - refreshReason: RefreshReason.PayPeerPull, - }); - const pi = await tx.peerPullDebit.get(peerPullDebitId); if (!pi) { throw Error(); } - if (pi.status === PeerPullDebitRecordStatus.DialogProposed) { - pi.status = PeerPullDebitRecordStatus.PendingDeposit; + if (pi.status !== PeerPullDebitRecordStatus.DialogProposed) { + return; + } + if (coinSelRes.type == "success") { + await spendCoins(wex, tx, { + // allocationId: `txn:peer-pull-debit:${req.peerPullDebitId}`, + allocationId: constructTransactionIdentifier({ + tag: TransactionType.PeerPullDebit, + peerPullDebitId, + }), + coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), + contributions: coinSelRes.result.coins.map((x) => + Amounts.parseOrThrow(x.contribution), + ), + refreshReason: RefreshReason.PayPeerPull, + }); pi.coinSel = { - coinPubs: sel.coins.map((x) => x.coinPub), - contributions: sel.coins.map((x) => x.contribution), + coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), + contributions: coinSelRes.result.coins.map((x) => x.contribution), totalCost: Amounts.stringify(totalAmount), }; } + pi.status = PeerPullDebitRecordStatus.PendingDeposit; await tx.peerPullDebit.put(pi); }, ); @@ -673,7 +805,7 @@ export async function preparePeerPullDebit( } const existing = await wex.db.runReadOnlyTx( - ["peerPullDebit", "contractTerms"], + { storeNames: ["peerPullDebit", "contractTerms"] }, async (tx) => { const peerPullDebitRecord = await tx.peerPullDebit.indexes.byExchangeAndContractPriv.get([ @@ -756,27 +888,37 @@ export async function preparePeerPullDebit( const instructedAmount = Amounts.parseOrThrow(contractTerms.amount); - const coinSelRes = await selectPeerCoins(wex, { instructedAmount }); + const coinSelRes = await selectPeerCoins(wex, { + instructedAmount, + }); if (logger.shouldLogTrace()) { logger.trace(`selected p2p coins (pull): ${j2s(coinSelRes)}`); } - if (coinSelRes.type !== "success") { - throw TalerError.fromDetail( - TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE, - { - insufficientBalanceDetails: coinSelRes.insufficientBalanceDetails, - }, - ); + let coins: SelectedProspectiveCoin[] | undefined = undefined; + + switch (coinSelRes.type) { + case "failure": + throw TalerError.fromDetail( + TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE, + { + insufficientBalanceDetails: coinSelRes.insufficientBalanceDetails, + }, + ); + case "prospective": + coins = coinSelRes.result.prospectiveCoins; + break; + case "success": + coins = coinSelRes.result.coins; + break; + default: + assertUnreachable(coinSelRes); } - const totalAmount = await getTotalPeerPaymentCost( - wex, - coinSelRes.result.coins, - ); + const totalAmount = await getTotalPeerPaymentCost(wex, coins); await wex.db.runReadWriteTx( - ["peerPullDebit", "contractTerms"], + { storeNames: ["peerPullDebit", "contractTerms"] }, async (tx) => { await tx.contractTerms.put({ h: contractTermsHash, |