/* This file is part of GNU Taler (C) 2022-2023 Taler Systems S.A. GNU Taler is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3, or (at your option) any later version. GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with GNU Taler; see the file COPYING. If not, see */ import { Amounts, CheckPeerPushDebitRequest, CheckPeerPushDebitResponse, CoinRefreshRequest, ContractTermsUtil, ExchangePurseDeposits, HttpStatusCode, InitiatePeerPushDebitRequest, InitiatePeerPushDebitResponse, Logger, NotificationType, RefreshReason, SelectedProspectiveCoin, TalerError, TalerErrorCode, TalerPreciseTimestamp, TalerProtocolTimestamp, TalerProtocolViolationError, TransactionAction, TransactionIdStr, TransactionMajorState, TransactionMinorState, TransactionState, TransactionType, assertUnreachable, checkDbInvariant, checkLogicInvariant, encodeCrock, getRandomBytes, j2s, } from "@gnu-taler/taler-util"; import { HttpResponse, readSuccessResponseJsonOrThrow, readTalerErrorResponse, } from "@gnu-taler/taler-util/http"; import { PreviousPayCoins, selectPeerCoins } from "./coinSelection.js"; import { PendingTaskType, TaskIdStr, TaskRunResult, TaskRunResultType, TransactionContext, constructTaskIdentifier, spendCoins, } from "./common.js"; import { EncryptContractRequest } from "./crypto/cryptoTypes.js"; import { PeerPushDebitRecord, PeerPushDebitStatus, RefreshOperationStatus, timestampPreciseToDb, timestampProtocolFromDb, timestampProtocolToDb, } from "./db.js"; import { codecForExchangePurseStatus, getTotalPeerPaymentCost, queryCoinInfosForSelection, } from "./pay-peer-common.js"; import { createRefreshGroup, waitRefreshFinal } from "./refresh.js"; import { constructTransactionIdentifier, notifyTransition, } from "./transactions.js"; import { WalletExecutionContext } from "./wallet.js"; const logger = new Logger("pay-peer-push-debit.ts"); export class PeerPushDebitTransactionContext implements TransactionContext { readonly transactionId: TransactionIdStr; readonly taskId: TaskIdStr; constructor( public wex: WalletExecutionContext, public pursePub: string, ) { this.transactionId = constructTransactionIdentifier({ tag: TransactionType.PeerPushDebit, pursePub, }); this.taskId = constructTaskIdentifier({ tag: PendingTaskType.PeerPushDebit, pursePub, }); } async deleteTransaction(): Promise { const { wex, pursePub, transactionId } = this; await wex.db.runReadWriteTx( { storeNames: ["peerPushDebit", "tombstones"] }, async (tx) => { const debit = await tx.peerPushDebit.get(pursePub); if (debit) { await tx.peerPushDebit.delete(pursePub); await tx.tombstones.put({ id: transactionId }); } }, ); } async suspendTransaction(): Promise { const { wex, pursePub, transactionId, taskId: retryTag } = this; const transitionInfo = await wex.db.runReadWriteTx( { storeNames: ["peerPushDebit"] }, async (tx) => { const pushDebitRec = await tx.peerPushDebit.get(pursePub); if (!pushDebitRec) { logger.warn(`peer push debit ${pursePub} not found`); return; } let newStatus: PeerPushDebitStatus | undefined = undefined; switch (pushDebitRec.status) { case PeerPushDebitStatus.PendingCreatePurse: newStatus = PeerPushDebitStatus.SuspendedCreatePurse; break; case PeerPushDebitStatus.AbortingRefreshDeleted: newStatus = PeerPushDebitStatus.SuspendedAbortingRefreshDeleted; break; case PeerPushDebitStatus.AbortingRefreshExpired: newStatus = PeerPushDebitStatus.SuspendedAbortingRefreshExpired; break; case PeerPushDebitStatus.AbortingDeletePurse: newStatus = PeerPushDebitStatus.SuspendedAbortingDeletePurse; break; case PeerPushDebitStatus.PendingReady: newStatus = PeerPushDebitStatus.SuspendedReady; break; case PeerPushDebitStatus.SuspendedAbortingDeletePurse: case PeerPushDebitStatus.SuspendedAbortingRefreshDeleted: case PeerPushDebitStatus.SuspendedAbortingRefreshExpired: case PeerPushDebitStatus.SuspendedReady: case PeerPushDebitStatus.SuspendedCreatePurse: case PeerPushDebitStatus.Done: case PeerPushDebitStatus.Aborted: case PeerPushDebitStatus.Failed: case PeerPushDebitStatus.Expired: // Do nothing break; default: assertUnreachable(pushDebitRec.status); } if (newStatus != null) { const oldTxState = computePeerPushDebitTransactionState(pushDebitRec); pushDebitRec.status = newStatus; const newTxState = computePeerPushDebitTransactionState(pushDebitRec); await tx.peerPushDebit.put(pushDebitRec); return { oldTxState, newTxState, }; } return undefined; }, ); wex.taskScheduler.stopShepherdTask(retryTag); notifyTransition(wex, transactionId, transitionInfo); } async abortTransaction(): Promise { const { wex, pursePub, transactionId, taskId: retryTag } = this; const transitionInfo = await wex.db.runReadWriteTx( { storeNames: ["peerPushDebit"] }, async (tx) => { const pushDebitRec = await tx.peerPushDebit.get(pursePub); if (!pushDebitRec) { logger.warn(`peer push debit ${pursePub} not found`); return; } let newStatus: PeerPushDebitStatus | undefined = undefined; switch (pushDebitRec.status) { case PeerPushDebitStatus.PendingReady: case PeerPushDebitStatus.SuspendedReady: newStatus = PeerPushDebitStatus.AbortingDeletePurse; break; case PeerPushDebitStatus.SuspendedCreatePurse: case PeerPushDebitStatus.PendingCreatePurse: // Network request might already be in-flight! newStatus = PeerPushDebitStatus.AbortingDeletePurse; break; case PeerPushDebitStatus.SuspendedAbortingRefreshDeleted: case PeerPushDebitStatus.SuspendedAbortingDeletePurse: case PeerPushDebitStatus.SuspendedAbortingRefreshExpired: case PeerPushDebitStatus.AbortingRefreshDeleted: case PeerPushDebitStatus.AbortingRefreshExpired: case PeerPushDebitStatus.Done: case PeerPushDebitStatus.AbortingDeletePurse: case PeerPushDebitStatus.Aborted: case PeerPushDebitStatus.Expired: case PeerPushDebitStatus.Failed: // Do nothing break; default: assertUnreachable(pushDebitRec.status); } if (newStatus != null) { const oldTxState = computePeerPushDebitTransactionState(pushDebitRec); pushDebitRec.status = newStatus; const newTxState = computePeerPushDebitTransactionState(pushDebitRec); await tx.peerPushDebit.put(pushDebitRec); return { oldTxState, newTxState, }; } return undefined; }, ); wex.taskScheduler.stopShepherdTask(retryTag); notifyTransition(wex, transactionId, transitionInfo); wex.taskScheduler.startShepherdTask(retryTag); } async resumeTransaction(): Promise { const { wex, pursePub, transactionId, taskId: retryTag } = this; const transitionInfo = await wex.db.runReadWriteTx( { storeNames: ["peerPushDebit"] }, async (tx) => { const pushDebitRec = await tx.peerPushDebit.get(pursePub); if (!pushDebitRec) { logger.warn(`peer push debit ${pursePub} not found`); return; } let newStatus: PeerPushDebitStatus | undefined = undefined; switch (pushDebitRec.status) { case PeerPushDebitStatus.SuspendedAbortingDeletePurse: newStatus = PeerPushDebitStatus.AbortingDeletePurse; break; case PeerPushDebitStatus.SuspendedAbortingRefreshDeleted: newStatus = PeerPushDebitStatus.AbortingRefreshDeleted; break; case PeerPushDebitStatus.SuspendedAbortingRefreshExpired: newStatus = PeerPushDebitStatus.AbortingRefreshExpired; break; case PeerPushDebitStatus.SuspendedReady: newStatus = PeerPushDebitStatus.PendingReady; break; case PeerPushDebitStatus.SuspendedCreatePurse: newStatus = PeerPushDebitStatus.PendingCreatePurse; break; case PeerPushDebitStatus.PendingCreatePurse: case PeerPushDebitStatus.AbortingRefreshDeleted: case PeerPushDebitStatus.AbortingRefreshExpired: case PeerPushDebitStatus.AbortingDeletePurse: case PeerPushDebitStatus.PendingReady: case PeerPushDebitStatus.Done: case PeerPushDebitStatus.Aborted: case PeerPushDebitStatus.Failed: case PeerPushDebitStatus.Expired: // Do nothing break; default: assertUnreachable(pushDebitRec.status); } if (newStatus != null) { const oldTxState = computePeerPushDebitTransactionState(pushDebitRec); pushDebitRec.status = newStatus; const newTxState = computePeerPushDebitTransactionState(pushDebitRec); await tx.peerPushDebit.put(pushDebitRec); return { oldTxState, newTxState, }; } return undefined; }, ); wex.taskScheduler.startShepherdTask(retryTag); notifyTransition(wex, transactionId, transitionInfo); } async failTransaction(): Promise { const { wex, pursePub, transactionId, taskId: retryTag } = this; const transitionInfo = await wex.db.runReadWriteTx( { storeNames: ["peerPushDebit"] }, async (tx) => { const pushDebitRec = await tx.peerPushDebit.get(pursePub); if (!pushDebitRec) { logger.warn(`peer push debit ${pursePub} not found`); return; } let newStatus: PeerPushDebitStatus | undefined = undefined; switch (pushDebitRec.status) { case PeerPushDebitStatus.AbortingRefreshDeleted: case PeerPushDebitStatus.SuspendedAbortingRefreshDeleted: // FIXME: What to do about the refresh group? newStatus = PeerPushDebitStatus.Failed; break; case PeerPushDebitStatus.AbortingDeletePurse: case PeerPushDebitStatus.SuspendedAbortingDeletePurse: case PeerPushDebitStatus.AbortingRefreshExpired: case PeerPushDebitStatus.SuspendedAbortingRefreshExpired: case PeerPushDebitStatus.PendingReady: case PeerPushDebitStatus.SuspendedReady: case PeerPushDebitStatus.SuspendedCreatePurse: case PeerPushDebitStatus.PendingCreatePurse: newStatus = PeerPushDebitStatus.Failed; break; case PeerPushDebitStatus.Done: case PeerPushDebitStatus.Aborted: case PeerPushDebitStatus.Failed: case PeerPushDebitStatus.Expired: // Do nothing break; default: assertUnreachable(pushDebitRec.status); } if (newStatus != null) { const oldTxState = computePeerPushDebitTransactionState(pushDebitRec); pushDebitRec.status = newStatus; const newTxState = computePeerPushDebitTransactionState(pushDebitRec); await tx.peerPushDebit.put(pushDebitRec); return { oldTxState, newTxState, }; } return undefined; }, ); wex.taskScheduler.stopShepherdTask(retryTag); notifyTransition(wex, transactionId, transitionInfo); wex.taskScheduler.startShepherdTask(retryTag); } } export async function checkPeerPushDebit( wex: WalletExecutionContext, req: CheckPeerPushDebitRequest, ): Promise { const instructedAmount = Amounts.parseOrThrow(req.amount); logger.trace( `checking peer push debit for ${Amounts.stringify(instructedAmount)}`, ); const coinSelRes = await selectPeerCoins(wex, { instructedAmount, }); let coins: SelectedProspectiveCoin[] | undefined = undefined; switch (coinSelRes.type) { case "failure": throw TalerError.fromDetail( TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE, { insufficientBalanceDetails: coinSelRes.insufficientBalanceDetails, }, ); case "prospective": coins = coinSelRes.result.prospectiveCoins; break; case "success": coins = coinSelRes.result.coins; break; default: assertUnreachable(coinSelRes); } logger.trace(`selected peer coins (len=${coins.length})`); const totalAmount = await getTotalPeerPaymentCost(wex, coins); logger.trace("computed total peer payment cost"); return { exchangeBaseUrl: coinSelRes.result.exchangeBaseUrl, amountEffective: Amounts.stringify(totalAmount), amountRaw: req.amount, maxExpirationDate: coinSelRes.result.maxExpirationDate, }; } async function handlePurseCreationConflict( wex: WalletExecutionContext, peerPushInitiation: PeerPushDebitRecord, resp: HttpResponse, ): Promise { const pursePub = peerPushInitiation.pursePub; const errResp = await readTalerErrorResponse(resp); const ctx = new PeerPushDebitTransactionContext(wex, pursePub); if (errResp.code !== TalerErrorCode.EXCHANGE_GENERIC_INSUFFICIENT_FUNDS) { await ctx.failTransaction(); return TaskRunResult.finished(); } // FIXME: Properly parse! const brokenCoinPub = (errResp as any).coin_pub; logger.trace(`excluded broken coin pub=${brokenCoinPub}`); if (!brokenCoinPub) { // FIXME: Details! throw new TalerProtocolViolationError(); } const instructedAmount = Amounts.parseOrThrow(peerPushInitiation.amount); const sel = peerPushInitiation.coinSel; checkDbInvariant(!!sel); const repair: PreviousPayCoins = []; for (let i = 0; i < sel.coinPubs.length; i++) { if (sel.coinPubs[i] != brokenCoinPub) { repair.push({ coinPub: sel.coinPubs[i], contribution: Amounts.parseOrThrow(sel.contributions[i]), }); } } const coinSelRes = await selectPeerCoins(wex, { instructedAmount, repair, }); switch (coinSelRes.type) { case "failure": case "prospective": // FIXME: Details! throw Error( "insufficient balance to re-select coins to repair double spending", ); case "success": break; default: assertUnreachable(coinSelRes); } await wex.db.runReadWriteTx({ storeNames: ["peerPushDebit"] }, async (tx) => { const myPpi = await tx.peerPushDebit.get(peerPushInitiation.pursePub); if (!myPpi) { return; } switch (myPpi.status) { case PeerPushDebitStatus.PendingCreatePurse: case PeerPushDebitStatus.SuspendedCreatePurse: { const sel = coinSelRes.result; myPpi.coinSel = { coinPubs: sel.coins.map((x) => x.coinPub), contributions: sel.coins.map((x) => x.contribution), }; break; } default: return; } await tx.peerPushDebit.put(myPpi); }); return TaskRunResult.progress(); } async function processPeerPushDebitCreateReserve( wex: WalletExecutionContext, peerPushInitiation: PeerPushDebitRecord, ): Promise { const pursePub = peerPushInitiation.pursePub; const purseExpiration = peerPushInitiation.purseExpiration; const hContractTerms = peerPushInitiation.contractTermsHash; const ctx = new PeerPushDebitTransactionContext(wex, pursePub); const transactionId = ctx.transactionId; logger.trace(`processing ${transactionId} pending(create-reserve)`); const contractTermsRecord = await wex.db.runReadOnlyTx( { storeNames: ["contractTerms"] }, async (tx) => { return tx.contractTerms.get(hContractTerms); }, ); if (!contractTermsRecord) { throw Error( `db invariant failed, contract terms for ${transactionId} missing`, ); } if (!peerPushInitiation.coinSel) { const coinSelRes = await selectPeerCoins(wex, { instructedAmount: Amounts.parseOrThrow(peerPushInitiation.amount), }); switch (coinSelRes.type) { case "failure": throw TalerError.fromDetail( TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE, { insufficientBalanceDetails: coinSelRes.insufficientBalanceDetails, }, ); case "prospective": throw Error("insufficient funds (blocked on refresh)"); case "success": break; default: assertUnreachable(coinSelRes); } const transitionDone = await wex.db.runReadWriteTx( { storeNames: [ "exchanges", "contractTerms", "coins", "coinAvailability", "denominations", "refreshGroups", "refreshSessions", "peerPushDebit", ], }, async (tx) => { const ppi = await tx.peerPushDebit.get(pursePub); if (!ppi) { return false; } if (ppi.coinSel) { return false; } ppi.coinSel = { coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), contributions: coinSelRes.result.coins.map((x) => x.contribution), }; // FIXME: Instead of directly doing a spendCoin here, // we might want to mark the coins as used and spend them // after we've been able to create the purse. await spendCoins(wex, tx, { allocationId: constructTransactionIdentifier({ tag: TransactionType.PeerPushDebit, pursePub, }), coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), contributions: coinSelRes.result.coins.map((x) => Amounts.parseOrThrow(x.contribution), ), refreshReason: RefreshReason.PayPeerPush, }); await tx.peerPushDebit.put(ppi); return true; }, ); if (transitionDone) { return TaskRunResult.progress(); } return TaskRunResult.backoff(); } const purseSigResp = await wex.cryptoApi.signPurseCreation({ hContractTerms, mergePub: peerPushInitiation.mergePub, minAge: 0, purseAmount: peerPushInitiation.amount, purseExpiration: timestampProtocolFromDb(purseExpiration), pursePriv: peerPushInitiation.pursePriv, }); const coins = await queryCoinInfosForSelection( wex, peerPushInitiation.coinSel, ); const encryptContractRequest: EncryptContractRequest = { contractTerms: contractTermsRecord.contractTermsRaw, mergePriv: peerPushInitiation.mergePriv, pursePriv: peerPushInitiation.pursePriv, pursePub: peerPushInitiation.pursePub, contractPriv: peerPushInitiation.contractPriv, contractPub: peerPushInitiation.contractPub, nonce: peerPushInitiation.contractEncNonce, }; const econtractResp = await wex.cryptoApi.encryptContractForMerge( encryptContractRequest, ); const maxBatchSize = 100; for (let i = 0; i < coins.length; i += maxBatchSize) { const batchSize = Math.min(maxBatchSize, coins.length - i); const batchCoins = coins.slice(i, i + batchSize); const depositSigsResp = await wex.cryptoApi.signPurseDeposits({ exchangeBaseUrl: peerPushInitiation.exchangeBaseUrl, pursePub: peerPushInitiation.pursePub, coins: batchCoins, }); if (i == 0) { // First batch creates the purse! logger.trace(`encrypt contract request: ${j2s(encryptContractRequest)}`); const createPurseUrl = new URL( `purses/${peerPushInitiation.pursePub}/create`, peerPushInitiation.exchangeBaseUrl, ); const reqBody = { amount: peerPushInitiation.amount, merge_pub: peerPushInitiation.mergePub, purse_sig: purseSigResp.sig, h_contract_terms: hContractTerms, purse_expiration: timestampProtocolFromDb(purseExpiration), deposits: depositSigsResp.deposits, min_age: 0, econtract: econtractResp.econtract, }; if (logger.shouldLogTrace()) { logger.trace(`request body: ${j2s(reqBody)}`); } const httpResp = await wex.http.fetch(createPurseUrl.href, { method: "POST", body: reqBody, cancellationToken: wex.cancellationToken, }); switch (httpResp.status) { case HttpStatusCode.Ok: // Possibly on to the next batch. continue; case HttpStatusCode.Forbidden: { // FIXME: Store this error! await ctx.failTransaction(); return TaskRunResult.finished(); } case HttpStatusCode.Conflict: { // Handle double-spending return handlePurseCreationConflict(wex, peerPushInitiation, httpResp); } default: { const errResp = await readTalerErrorResponse(httpResp); return { type: TaskRunResultType.Error, errorDetail: errResp, }; } } } else { const purseDepositUrl = new URL( `purses/${pursePub}/deposit`, peerPushInitiation.exchangeBaseUrl, ); const depositPayload: ExchangePurseDeposits = { deposits: depositSigsResp.deposits, }; const httpResp = await wex.http.fetch(purseDepositUrl.href, { method: "POST", body: depositPayload, cancellationToken: wex.cancellationToken, }); switch (httpResp.status) { case HttpStatusCode.Ok: // Possibly on to the next batch. continue; case HttpStatusCode.Forbidden: { // FIXME: Store this error! await ctx.failTransaction(); return TaskRunResult.finished(); } case HttpStatusCode.Conflict: { // Handle double-spending return handlePurseCreationConflict(wex, peerPushInitiation, httpResp); } default: { const errResp = await readTalerErrorResponse(httpResp); return { type: TaskRunResultType.Error, errorDetail: errResp, }; } } } } // All batches done! await transitionPeerPushDebitTransaction(wex, pursePub, { stFrom: PeerPushDebitStatus.PendingCreatePurse, stTo: PeerPushDebitStatus.PendingReady, }); return TaskRunResult.backoff(); } async function processPeerPushDebitAbortingDeletePurse( wex: WalletExecutionContext, peerPushInitiation: PeerPushDebitRecord, ): Promise { const { pursePub, pursePriv } = peerPushInitiation; const transactionId = constructTransactionIdentifier({ tag: TransactionType.PeerPushDebit, pursePub, }); const sigResp = await wex.cryptoApi.signDeletePurse({ pursePriv, }); const purseUrl = new URL( `purses/${pursePub}`, peerPushInitiation.exchangeBaseUrl, ); const resp = await wex.http.fetch(purseUrl.href, { method: "DELETE", headers: { "taler-purse-signature": sigResp.sig, }, cancellationToken: wex.cancellationToken, }); logger.info(`deleted purse with response status ${resp.status}`); const transitionInfo = await wex.db.runReadWriteTx( { storeNames: [ "peerPushDebit", "refreshGroups", "refreshSessions", "denominations", "coinAvailability", "coins", ], }, async (tx) => { const ppiRec = await tx.peerPushDebit.get(pursePub); if (!ppiRec) { return undefined; } if (ppiRec.status !== PeerPushDebitStatus.AbortingDeletePurse) { return undefined; } const currency = Amounts.currencyOf(ppiRec.amount); const oldTxState = computePeerPushDebitTransactionState(ppiRec); const coinPubs: CoinRefreshRequest[] = []; if (!ppiRec.coinSel) { return undefined; } for (let i = 0; i < ppiRec.coinSel.coinPubs.length; i++) { coinPubs.push({ amount: ppiRec.coinSel.contributions[i], coinPub: ppiRec.coinSel.coinPubs[i], }); } const refresh = await createRefreshGroup( wex, tx, currency, coinPubs, RefreshReason.AbortPeerPushDebit, transactionId, ); ppiRec.status = PeerPushDebitStatus.AbortingRefreshDeleted; ppiRec.abortRefreshGroupId = refresh.refreshGroupId; await tx.peerPushDebit.put(ppiRec); const newTxState = computePeerPushDebitTransactionState(ppiRec); return { oldTxState, newTxState, }; }, ); notifyTransition(wex, transactionId, transitionInfo); return TaskRunResult.backoff(); } interface SimpleTransition { stFrom: PeerPushDebitStatus; stTo: PeerPushDebitStatus; } // FIXME: This should be a transition on the peer push debit transaction context! async function transitionPeerPushDebitTransaction( wex: WalletExecutionContext, pursePub: string, transitionSpec: SimpleTransition, ): Promise { const transactionId = constructTransactionIdentifier({ tag: TransactionType.PeerPushDebit, pursePub, }); const transitionInfo = await wex.db.runReadWriteTx( { storeNames: ["peerPushDebit"] }, async (tx) => { const ppiRec = await tx.peerPushDebit.get(pursePub); if (!ppiRec) { return undefined; } if (ppiRec.status !== transitionSpec.stFrom) { return undefined; } const oldTxState = computePeerPushDebitTransactionState(ppiRec); ppiRec.status = transitionSpec.stTo; await tx.peerPushDebit.put(ppiRec); const newTxState = computePeerPushDebitTransactionState(ppiRec); return { oldTxState, newTxState, }; }, ); notifyTransition(wex, transactionId, transitionInfo); } async function processPeerPushDebitAbortingRefreshDeleted( wex: WalletExecutionContext, peerPushInitiation: PeerPushDebitRecord, ): Promise { const pursePub = peerPushInitiation.pursePub; const abortRefreshGroupId = peerPushInitiation.abortRefreshGroupId; checkLogicInvariant(!!abortRefreshGroupId); const transactionId = constructTransactionIdentifier({ tag: TransactionType.PeerPushDebit, pursePub: peerPushInitiation.pursePub, }); if (peerPushInitiation.abortRefreshGroupId) { await waitRefreshFinal(wex, peerPushInitiation.abortRefreshGroupId); } const transitionInfo = await wex.db.runReadWriteTx( { storeNames: ["refreshGroups", "peerPushDebit"] }, async (tx) => { const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId); let newOpState: PeerPushDebitStatus | undefined; if (!refreshGroup) { // Maybe it got manually deleted? Means that we should // just go into failed. logger.warn("no aborting refresh group found for deposit group"); newOpState = PeerPushDebitStatus.Failed; } else { if (refreshGroup.operationStatus === RefreshOperationStatus.Finished) { newOpState = PeerPushDebitStatus.Aborted; } else if ( refreshGroup.operationStatus === RefreshOperationStatus.Failed ) { newOpState = PeerPushDebitStatus.Failed; } } if (newOpState) { const newDg = await tx.peerPushDebit.get(pursePub); if (!newDg) { return; } const oldTxState = computePeerPushDebitTransactionState(newDg); newDg.status = newOpState; const newTxState = computePeerPushDebitTransactionState(newDg); await tx.peerPushDebit.put(newDg); return { oldTxState, newTxState }; } return undefined; }, ); notifyTransition(wex, transactionId, transitionInfo); // FIXME: Shouldn't this be finished in some cases?! return TaskRunResult.backoff(); } async function processPeerPushDebitAbortingRefreshExpired( wex: WalletExecutionContext, peerPushInitiation: PeerPushDebitRecord, ): Promise { const pursePub = peerPushInitiation.pursePub; const abortRefreshGroupId = peerPushInitiation.abortRefreshGroupId; checkLogicInvariant(!!abortRefreshGroupId); const transactionId = constructTransactionIdentifier({ tag: TransactionType.PeerPushDebit, pursePub: peerPushInitiation.pursePub, }); const transitionInfo = await wex.db.runReadWriteTx( { storeNames: ["peerPushDebit", "refreshGroups"] }, async (tx) => { const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId); let newOpState: PeerPushDebitStatus | undefined; if (!refreshGroup) { // Maybe it got manually deleted? Means that we should // just go into failed. logger.warn("no aborting refresh group found for deposit group"); newOpState = PeerPushDebitStatus.Failed; } else { if (refreshGroup.operationStatus === RefreshOperationStatus.Finished) { newOpState = PeerPushDebitStatus.Expired; } else if ( refreshGroup.operationStatus === RefreshOperationStatus.Failed ) { newOpState = PeerPushDebitStatus.Failed; } } if (newOpState) { const newDg = await tx.peerPushDebit.get(pursePub); if (!newDg) { return; } const oldTxState = computePeerPushDebitTransactionState(newDg); newDg.status = newOpState; const newTxState = computePeerPushDebitTransactionState(newDg); await tx.peerPushDebit.put(newDg); return { oldTxState, newTxState }; } return undefined; }, ); notifyTransition(wex, transactionId, transitionInfo); // FIXME: Shouldn't this be finished in some cases?! return TaskRunResult.backoff(); } /** * Process the "pending(ready)" state of a peer-push-debit transaction. */ async function processPeerPushDebitReady( wex: WalletExecutionContext, peerPushInitiation: PeerPushDebitRecord, ): Promise { logger.trace("processing peer-push-debit pending(ready)"); const pursePub = peerPushInitiation.pursePub; const transactionId = constructTaskIdentifier({ tag: PendingTaskType.PeerPushDebit, pursePub, }); const mergeUrl = new URL( `purses/${pursePub}/merge`, peerPushInitiation.exchangeBaseUrl, ); mergeUrl.searchParams.set("timeout_ms", "30000"); logger.info(`long-polling on purse status at ${mergeUrl.href}`); const resp = await wex.http.fetch(mergeUrl.href, { // timeout: getReserveRequestTimeout(withdrawalGroup), cancellationToken: wex.cancellationToken, }); if (resp.status === HttpStatusCode.Ok) { const purseStatus = await readSuccessResponseJsonOrThrow( resp, codecForExchangePurseStatus(), ); const mergeTimestamp = purseStatus.merge_timestamp; logger.info(`got purse status ${j2s(purseStatus)}`); if (!mergeTimestamp || TalerProtocolTimestamp.isNever(mergeTimestamp)) { return TaskRunResult.backoff(); } else { await transitionPeerPushDebitTransaction( wex, peerPushInitiation.pursePub, { stFrom: PeerPushDebitStatus.PendingReady, stTo: PeerPushDebitStatus.Done, }, ); return TaskRunResult.progress(); } } else if (resp.status === HttpStatusCode.Gone) { logger.info(`purse ${pursePub} is gone, aborting peer-push-debit`); const transitionInfo = await wex.db.runReadWriteTx( { storeNames: [ "peerPushDebit", "refreshGroups", "refreshSessions", "denominations", "coinAvailability", "coins", ], }, async (tx) => { const ppiRec = await tx.peerPushDebit.get(pursePub); if (!ppiRec) { return undefined; } if (ppiRec.status !== PeerPushDebitStatus.PendingReady) { return undefined; } const currency = Amounts.currencyOf(ppiRec.amount); const oldTxState = computePeerPushDebitTransactionState(ppiRec); const coinPubs: CoinRefreshRequest[] = []; if (ppiRec.coinSel) { for (let i = 0; i < ppiRec.coinSel.coinPubs.length; i++) { coinPubs.push({ amount: ppiRec.coinSel.contributions[i], coinPub: ppiRec.coinSel.coinPubs[i], }); } const refresh = await createRefreshGroup( wex, tx, currency, coinPubs, RefreshReason.AbortPeerPushDebit, transactionId, ); ppiRec.abortRefreshGroupId = refresh.refreshGroupId; } ppiRec.status = PeerPushDebitStatus.AbortingRefreshExpired; await tx.peerPushDebit.put(ppiRec); const newTxState = computePeerPushDebitTransactionState(ppiRec); return { oldTxState, newTxState, }; }, ); notifyTransition(wex, transactionId, transitionInfo); return TaskRunResult.backoff(); } else { logger.warn(`unexpected HTTP status for purse: ${resp.status}`); return TaskRunResult.longpollReturnedPending(); } } export async function processPeerPushDebit( wex: WalletExecutionContext, pursePub: string, ): Promise { const peerPushInitiation = await wex.db.runReadOnlyTx( { storeNames: ["peerPushDebit"] }, async (tx) => { return tx.peerPushDebit.get(pursePub); }, ); if (!peerPushInitiation) { throw Error("peer push payment not found"); } switch (peerPushInitiation.status) { case PeerPushDebitStatus.PendingCreatePurse: return processPeerPushDebitCreateReserve(wex, peerPushInitiation); case PeerPushDebitStatus.PendingReady: return processPeerPushDebitReady(wex, peerPushInitiation); case PeerPushDebitStatus.AbortingDeletePurse: return processPeerPushDebitAbortingDeletePurse(wex, peerPushInitiation); case PeerPushDebitStatus.AbortingRefreshDeleted: return processPeerPushDebitAbortingRefreshDeleted( wex, peerPushInitiation, ); case PeerPushDebitStatus.AbortingRefreshExpired: return processPeerPushDebitAbortingRefreshExpired( wex, peerPushInitiation, ); default: { const txState = computePeerPushDebitTransactionState(peerPushInitiation); logger.warn( `not processing peer-push-debit transaction in state ${j2s(txState)}`, ); } } return TaskRunResult.finished(); } /** * Initiate sending a peer-to-peer push payment. */ export async function initiatePeerPushDebit( wex: WalletExecutionContext, req: InitiatePeerPushDebitRequest, ): Promise { const instructedAmount = Amounts.parseOrThrow( req.partialContractTerms.amount, ); const purseExpiration = req.partialContractTerms.purse_expiration; const contractTerms = req.partialContractTerms; const pursePair = await wex.cryptoApi.createEddsaKeypair({}); const mergePair = await wex.cryptoApi.createEddsaKeypair({}); const hContractTerms = ContractTermsUtil.hashContractTerms(contractTerms); const contractKeyPair = await wex.cryptoApi.createEddsaKeypair({}); const coinSelRes = await selectPeerCoins(wex, { instructedAmount, }); let coins: SelectedProspectiveCoin[] | undefined = undefined; switch (coinSelRes.type) { case "failure": throw TalerError.fromDetail( TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE, { insufficientBalanceDetails: coinSelRes.insufficientBalanceDetails, }, ); case "prospective": coins = coinSelRes.result.prospectiveCoins; break; case "success": coins = coinSelRes.result.coins; break; default: assertUnreachable(coinSelRes); } const sel = coinSelRes.result; logger.info(`selected p2p coins (push):`); logger.trace(`${j2s(coinSelRes)}`); const totalAmount = await getTotalPeerPaymentCost(wex, coins); logger.info(`computed total peer payment cost`); const pursePub = pursePair.pub; const ctx = new PeerPushDebitTransactionContext(wex, pursePub); const transactionId = ctx.transactionId; const contractEncNonce = encodeCrock(getRandomBytes(24)); const transitionInfo = await wex.db.runReadWriteTx( { storeNames: [ "exchanges", "contractTerms", "coins", "coinAvailability", "denominations", "refreshGroups", "refreshSessions", "peerPushDebit", ], }, async (tx) => { const ppi: PeerPushDebitRecord = { amount: Amounts.stringify(instructedAmount), contractPriv: contractKeyPair.priv, contractPub: contractKeyPair.pub, contractTermsHash: hContractTerms, exchangeBaseUrl: sel.exchangeBaseUrl, mergePriv: mergePair.priv, mergePub: mergePair.pub, purseExpiration: timestampProtocolToDb(purseExpiration), pursePriv: pursePair.priv, pursePub: pursePair.pub, timestampCreated: timestampPreciseToDb(TalerPreciseTimestamp.now()), status: PeerPushDebitStatus.PendingCreatePurse, contractEncNonce, totalCost: Amounts.stringify(totalAmount), }; if (coinSelRes.type === "success") { ppi.coinSel = { coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), contributions: coinSelRes.result.coins.map((x) => x.contribution), }; // FIXME: Instead of directly doing a spendCoin here, // we might want to mark the coins as used and spend them // after we've been able to create the purse. await spendCoins(wex, tx, { allocationId: constructTransactionIdentifier({ tag: TransactionType.PeerPushDebit, pursePub: pursePair.pub, }), coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), contributions: coinSelRes.result.coins.map((x) => Amounts.parseOrThrow(x.contribution), ), refreshReason: RefreshReason.PayPeerPush, }); } await tx.peerPushDebit.add(ppi); await tx.contractTerms.put({ h: hContractTerms, contractTermsRaw: contractTerms, }); const newTxState = computePeerPushDebitTransactionState(ppi); return { oldTxState: { major: TransactionMajorState.None }, newTxState, }; }, ); notifyTransition(wex, transactionId, transitionInfo); wex.ws.notify({ type: NotificationType.BalanceChange, hintTransactionId: transactionId, }); wex.taskScheduler.startShepherdTask(ctx.taskId); return { contractPriv: contractKeyPair.priv, mergePriv: mergePair.priv, pursePub: pursePair.pub, exchangeBaseUrl: coinSelRes.result.exchangeBaseUrl, transactionId: constructTransactionIdentifier({ tag: TransactionType.PeerPushDebit, pursePub: pursePair.pub, }), }; } export function computePeerPushDebitTransactionActions( ppiRecord: PeerPushDebitRecord, ): TransactionAction[] { switch (ppiRecord.status) { case PeerPushDebitStatus.PendingCreatePurse: return [TransactionAction.Abort, TransactionAction.Suspend]; case PeerPushDebitStatus.PendingReady: return [TransactionAction.Abort, TransactionAction.Suspend]; case PeerPushDebitStatus.Aborted: return [TransactionAction.Delete]; case PeerPushDebitStatus.AbortingDeletePurse: return [TransactionAction.Suspend, TransactionAction.Fail]; case PeerPushDebitStatus.AbortingRefreshDeleted: return [TransactionAction.Suspend, TransactionAction.Fail]; case PeerPushDebitStatus.AbortingRefreshExpired: return [TransactionAction.Suspend, TransactionAction.Fail]; case PeerPushDebitStatus.SuspendedAbortingRefreshExpired: return [TransactionAction.Resume, TransactionAction.Fail]; case PeerPushDebitStatus.SuspendedAbortingDeletePurse: return [TransactionAction.Resume, TransactionAction.Fail]; case PeerPushDebitStatus.SuspendedAbortingRefreshDeleted: return [TransactionAction.Resume, TransactionAction.Fail]; case PeerPushDebitStatus.SuspendedCreatePurse: return [TransactionAction.Resume, TransactionAction.Abort]; case PeerPushDebitStatus.SuspendedReady: return [TransactionAction.Resume, TransactionAction.Abort]; case PeerPushDebitStatus.Done: return [TransactionAction.Delete]; case PeerPushDebitStatus.Expired: return [TransactionAction.Delete]; case PeerPushDebitStatus.Failed: return [TransactionAction.Delete]; } } export function computePeerPushDebitTransactionState( ppiRecord: PeerPushDebitRecord, ): TransactionState { switch (ppiRecord.status) { case PeerPushDebitStatus.PendingCreatePurse: return { major: TransactionMajorState.Pending, minor: TransactionMinorState.CreatePurse, }; case PeerPushDebitStatus.PendingReady: return { major: TransactionMajorState.Pending, minor: TransactionMinorState.Ready, }; case PeerPushDebitStatus.Aborted: return { major: TransactionMajorState.Aborted, }; case PeerPushDebitStatus.AbortingDeletePurse: return { major: TransactionMajorState.Aborting, minor: TransactionMinorState.DeletePurse, }; case PeerPushDebitStatus.AbortingRefreshDeleted: return { major: TransactionMajorState.Aborting, minor: TransactionMinorState.Refresh, }; case PeerPushDebitStatus.AbortingRefreshExpired: return { major: TransactionMajorState.Aborting, minor: TransactionMinorState.RefreshExpired, }; case PeerPushDebitStatus.SuspendedAbortingDeletePurse: return { major: TransactionMajorState.SuspendedAborting, minor: TransactionMinorState.DeletePurse, }; case PeerPushDebitStatus.SuspendedAbortingRefreshExpired: return { major: TransactionMajorState.SuspendedAborting, minor: TransactionMinorState.RefreshExpired, }; case PeerPushDebitStatus.SuspendedAbortingRefreshDeleted: return { major: TransactionMajorState.SuspendedAborting, minor: TransactionMinorState.Refresh, }; case PeerPushDebitStatus.SuspendedCreatePurse: return { major: TransactionMajorState.Suspended, minor: TransactionMinorState.CreatePurse, }; case PeerPushDebitStatus.SuspendedReady: return { major: TransactionMajorState.Suspended, minor: TransactionMinorState.Ready, }; case PeerPushDebitStatus.Done: return { major: TransactionMajorState.Done, }; case PeerPushDebitStatus.Failed: return { major: TransactionMajorState.Failed, }; case PeerPushDebitStatus.Expired: return { major: TransactionMajorState.Expired, }; } }