diff options
Diffstat (limited to 'packages/taler-wallet-core/src/pay-peer-push-debit.ts')
-rw-r--r-- | packages/taler-wallet-core/src/pay-peer-push-debit.ts | 1322 |
1 files changed, 1322 insertions, 0 deletions
diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/pay-peer-push-debit.ts new file mode 100644 index 000000000..6452407ff --- /dev/null +++ b/packages/taler-wallet-core/src/pay-peer-push-debit.ts @@ -0,0 +1,1322 @@ +/* + This file is part of GNU Taler + (C) 2022-2023 Taler Systems S.A. + + GNU Taler is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + */ + +import { + Amounts, + CheckPeerPushDebitRequest, + CheckPeerPushDebitResponse, + CoinRefreshRequest, + ContractTermsUtil, + ExchangePurseDeposits, + HttpStatusCode, + InitiatePeerPushDebitRequest, + InitiatePeerPushDebitResponse, + Logger, + NotificationType, + RefreshReason, + SelectedProspectiveCoin, + TalerError, + TalerErrorCode, + TalerPreciseTimestamp, + TalerProtocolTimestamp, + TalerProtocolViolationError, + TransactionAction, + TransactionIdStr, + TransactionMajorState, + TransactionMinorState, + TransactionState, + TransactionType, + assertUnreachable, + checkDbInvariant, + checkLogicInvariant, + encodeCrock, + getRandomBytes, + j2s, +} from "@gnu-taler/taler-util"; +import { + HttpResponse, + readSuccessResponseJsonOrThrow, + readTalerErrorResponse, +} from "@gnu-taler/taler-util/http"; +import { PreviousPayCoins, selectPeerCoins } from "./coinSelection.js"; +import { + PendingTaskType, + TaskIdStr, + TaskRunResult, + TaskRunResultType, + TransactionContext, + constructTaskIdentifier, + spendCoins, +} from "./common.js"; +import { EncryptContractRequest } from "./crypto/cryptoTypes.js"; +import { + PeerPushDebitRecord, + PeerPushDebitStatus, + RefreshOperationStatus, + timestampPreciseToDb, + timestampProtocolFromDb, + timestampProtocolToDb, +} from "./db.js"; +import { + codecForExchangePurseStatus, + getTotalPeerPaymentCost, + queryCoinInfosForSelection, +} from "./pay-peer-common.js"; +import { createRefreshGroup, waitRefreshFinal } from "./refresh.js"; +import { + constructTransactionIdentifier, + notifyTransition, +} from "./transactions.js"; +import { WalletExecutionContext } from "./wallet.js"; + +const logger = new Logger("pay-peer-push-debit.ts"); + +export class PeerPushDebitTransactionContext implements TransactionContext { + readonly transactionId: TransactionIdStr; + readonly taskId: TaskIdStr; + + constructor( + public wex: WalletExecutionContext, + public pursePub: string, + ) { + this.transactionId = constructTransactionIdentifier({ + tag: TransactionType.PeerPushDebit, + pursePub, + }); + this.taskId = constructTaskIdentifier({ + tag: PendingTaskType.PeerPushDebit, + pursePub, + }); + } + + async deleteTransaction(): Promise<void> { + const { wex, pursePub, transactionId } = this; + await wex.db.runReadWriteTx( + { storeNames: ["peerPushDebit", "tombstones"] }, + async (tx) => { + const debit = await tx.peerPushDebit.get(pursePub); + if (debit) { + await tx.peerPushDebit.delete(pursePub); + await tx.tombstones.put({ id: transactionId }); + } + }, + ); + } + + async suspendTransaction(): Promise<void> { + const { wex, pursePub, transactionId, taskId: retryTag } = this; + const transitionInfo = await wex.db.runReadWriteTx( + { storeNames: ["peerPushDebit"] }, + async (tx) => { + const pushDebitRec = await tx.peerPushDebit.get(pursePub); + if (!pushDebitRec) { + logger.warn(`peer push debit ${pursePub} not found`); + return; + } + let newStatus: PeerPushDebitStatus | undefined = undefined; + switch (pushDebitRec.status) { + case PeerPushDebitStatus.PendingCreatePurse: + newStatus = PeerPushDebitStatus.SuspendedCreatePurse; + break; + case PeerPushDebitStatus.AbortingRefreshDeleted: + newStatus = PeerPushDebitStatus.SuspendedAbortingRefreshDeleted; + break; + case PeerPushDebitStatus.AbortingRefreshExpired: + newStatus = PeerPushDebitStatus.SuspendedAbortingRefreshExpired; + break; + case PeerPushDebitStatus.AbortingDeletePurse: + newStatus = PeerPushDebitStatus.SuspendedAbortingDeletePurse; + break; + case PeerPushDebitStatus.PendingReady: + newStatus = PeerPushDebitStatus.SuspendedReady; + break; + case PeerPushDebitStatus.SuspendedAbortingDeletePurse: + case PeerPushDebitStatus.SuspendedAbortingRefreshDeleted: + case PeerPushDebitStatus.SuspendedAbortingRefreshExpired: + case PeerPushDebitStatus.SuspendedReady: + case PeerPushDebitStatus.SuspendedCreatePurse: + case PeerPushDebitStatus.Done: + case PeerPushDebitStatus.Aborted: + case PeerPushDebitStatus.Failed: + case PeerPushDebitStatus.Expired: + // Do nothing + break; + default: + assertUnreachable(pushDebitRec.status); + } + if (newStatus != null) { + const oldTxState = computePeerPushDebitTransactionState(pushDebitRec); + pushDebitRec.status = newStatus; + const newTxState = computePeerPushDebitTransactionState(pushDebitRec); + await tx.peerPushDebit.put(pushDebitRec); + return { + oldTxState, + newTxState, + }; + } + return undefined; + }, + ); + wex.taskScheduler.stopShepherdTask(retryTag); + notifyTransition(wex, transactionId, transitionInfo); + } + + async abortTransaction(): Promise<void> { + const { wex, pursePub, transactionId, taskId: retryTag } = this; + const transitionInfo = await wex.db.runReadWriteTx( + { storeNames: ["peerPushDebit"] }, + async (tx) => { + const pushDebitRec = await tx.peerPushDebit.get(pursePub); + if (!pushDebitRec) { + logger.warn(`peer push debit ${pursePub} not found`); + return; + } + let newStatus: PeerPushDebitStatus | undefined = undefined; + switch (pushDebitRec.status) { + case PeerPushDebitStatus.PendingReady: + case PeerPushDebitStatus.SuspendedReady: + newStatus = PeerPushDebitStatus.AbortingDeletePurse; + break; + case PeerPushDebitStatus.SuspendedCreatePurse: + case PeerPushDebitStatus.PendingCreatePurse: + // Network request might already be in-flight! + newStatus = PeerPushDebitStatus.AbortingDeletePurse; + break; + case PeerPushDebitStatus.SuspendedAbortingRefreshDeleted: + case PeerPushDebitStatus.SuspendedAbortingDeletePurse: + case PeerPushDebitStatus.SuspendedAbortingRefreshExpired: + case PeerPushDebitStatus.AbortingRefreshDeleted: + case PeerPushDebitStatus.AbortingRefreshExpired: + case PeerPushDebitStatus.Done: + case PeerPushDebitStatus.AbortingDeletePurse: + case PeerPushDebitStatus.Aborted: + case PeerPushDebitStatus.Expired: + case PeerPushDebitStatus.Failed: + // Do nothing + break; + default: + assertUnreachable(pushDebitRec.status); + } + if (newStatus != null) { + const oldTxState = computePeerPushDebitTransactionState(pushDebitRec); + pushDebitRec.status = newStatus; + const newTxState = computePeerPushDebitTransactionState(pushDebitRec); + await tx.peerPushDebit.put(pushDebitRec); + return { + oldTxState, + newTxState, + }; + } + return undefined; + }, + ); + wex.taskScheduler.stopShepherdTask(retryTag); + notifyTransition(wex, transactionId, transitionInfo); + wex.taskScheduler.startShepherdTask(retryTag); + } + + async resumeTransaction(): Promise<void> { + const { wex, pursePub, transactionId, taskId: retryTag } = this; + const transitionInfo = await wex.db.runReadWriteTx( + { storeNames: ["peerPushDebit"] }, + async (tx) => { + const pushDebitRec = await tx.peerPushDebit.get(pursePub); + if (!pushDebitRec) { + logger.warn(`peer push debit ${pursePub} not found`); + return; + } + let newStatus: PeerPushDebitStatus | undefined = undefined; + switch (pushDebitRec.status) { + case PeerPushDebitStatus.SuspendedAbortingDeletePurse: + newStatus = PeerPushDebitStatus.AbortingDeletePurse; + break; + case PeerPushDebitStatus.SuspendedAbortingRefreshDeleted: + newStatus = PeerPushDebitStatus.AbortingRefreshDeleted; + break; + case PeerPushDebitStatus.SuspendedAbortingRefreshExpired: + newStatus = PeerPushDebitStatus.AbortingRefreshExpired; + break; + case PeerPushDebitStatus.SuspendedReady: + newStatus = PeerPushDebitStatus.PendingReady; + break; + case PeerPushDebitStatus.SuspendedCreatePurse: + newStatus = PeerPushDebitStatus.PendingCreatePurse; + break; + case PeerPushDebitStatus.PendingCreatePurse: + case PeerPushDebitStatus.AbortingRefreshDeleted: + case PeerPushDebitStatus.AbortingRefreshExpired: + case PeerPushDebitStatus.AbortingDeletePurse: + case PeerPushDebitStatus.PendingReady: + case PeerPushDebitStatus.Done: + case PeerPushDebitStatus.Aborted: + case PeerPushDebitStatus.Failed: + case PeerPushDebitStatus.Expired: + // Do nothing + break; + default: + assertUnreachable(pushDebitRec.status); + } + if (newStatus != null) { + const oldTxState = computePeerPushDebitTransactionState(pushDebitRec); + pushDebitRec.status = newStatus; + const newTxState = computePeerPushDebitTransactionState(pushDebitRec); + await tx.peerPushDebit.put(pushDebitRec); + return { + oldTxState, + newTxState, + }; + } + return undefined; + }, + ); + wex.taskScheduler.startShepherdTask(retryTag); + notifyTransition(wex, transactionId, transitionInfo); + } + + async failTransaction(): Promise<void> { + const { wex, pursePub, transactionId, taskId: retryTag } = this; + const transitionInfo = await wex.db.runReadWriteTx( + { storeNames: ["peerPushDebit"] }, + async (tx) => { + const pushDebitRec = await tx.peerPushDebit.get(pursePub); + if (!pushDebitRec) { + logger.warn(`peer push debit ${pursePub} not found`); + return; + } + let newStatus: PeerPushDebitStatus | undefined = undefined; + switch (pushDebitRec.status) { + case PeerPushDebitStatus.AbortingRefreshDeleted: + case PeerPushDebitStatus.SuspendedAbortingRefreshDeleted: + // FIXME: What to do about the refresh group? + newStatus = PeerPushDebitStatus.Failed; + break; + case PeerPushDebitStatus.AbortingDeletePurse: + case PeerPushDebitStatus.SuspendedAbortingDeletePurse: + case PeerPushDebitStatus.AbortingRefreshExpired: + case PeerPushDebitStatus.SuspendedAbortingRefreshExpired: + case PeerPushDebitStatus.PendingReady: + case PeerPushDebitStatus.SuspendedReady: + case PeerPushDebitStatus.SuspendedCreatePurse: + case PeerPushDebitStatus.PendingCreatePurse: + newStatus = PeerPushDebitStatus.Failed; + break; + case PeerPushDebitStatus.Done: + case PeerPushDebitStatus.Aborted: + case PeerPushDebitStatus.Failed: + case PeerPushDebitStatus.Expired: + // Do nothing + break; + default: + assertUnreachable(pushDebitRec.status); + } + if (newStatus != null) { + const oldTxState = computePeerPushDebitTransactionState(pushDebitRec); + pushDebitRec.status = newStatus; + const newTxState = computePeerPushDebitTransactionState(pushDebitRec); + await tx.peerPushDebit.put(pushDebitRec); + return { + oldTxState, + newTxState, + }; + } + return undefined; + }, + ); + wex.taskScheduler.stopShepherdTask(retryTag); + notifyTransition(wex, transactionId, transitionInfo); + wex.taskScheduler.startShepherdTask(retryTag); + } +} + +export async function checkPeerPushDebit( + wex: WalletExecutionContext, + req: CheckPeerPushDebitRequest, +): Promise<CheckPeerPushDebitResponse> { + const instructedAmount = Amounts.parseOrThrow(req.amount); + logger.trace( + `checking peer push debit for ${Amounts.stringify(instructedAmount)}`, + ); + const coinSelRes = await selectPeerCoins(wex, { + instructedAmount, + }); + let coins: SelectedProspectiveCoin[] | undefined = undefined; + switch (coinSelRes.type) { + case "failure": + throw TalerError.fromDetail( + TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE, + { + insufficientBalanceDetails: coinSelRes.insufficientBalanceDetails, + }, + ); + case "prospective": + coins = coinSelRes.result.prospectiveCoins; + break; + case "success": + coins = coinSelRes.result.coins; + break; + default: + assertUnreachable(coinSelRes); + } + logger.trace(`selected peer coins (len=${coins.length})`); + const totalAmount = await getTotalPeerPaymentCost(wex, coins); + logger.trace("computed total peer payment cost"); + return { + exchangeBaseUrl: coinSelRes.result.exchangeBaseUrl, + amountEffective: Amounts.stringify(totalAmount), + amountRaw: req.amount, + maxExpirationDate: coinSelRes.result.maxExpirationDate, + }; +} + +async function handlePurseCreationConflict( + wex: WalletExecutionContext, + peerPushInitiation: PeerPushDebitRecord, + resp: HttpResponse, +): Promise<TaskRunResult> { + const pursePub = peerPushInitiation.pursePub; + const errResp = await readTalerErrorResponse(resp); + const ctx = new PeerPushDebitTransactionContext(wex, pursePub); + if (errResp.code !== TalerErrorCode.EXCHANGE_GENERIC_INSUFFICIENT_FUNDS) { + await ctx.failTransaction(); + return TaskRunResult.finished(); + } + + // FIXME: Properly parse! + const brokenCoinPub = (errResp as any).coin_pub; + logger.trace(`excluded broken coin pub=${brokenCoinPub}`); + + if (!brokenCoinPub) { + // FIXME: Details! + throw new TalerProtocolViolationError(); + } + + const instructedAmount = Amounts.parseOrThrow(peerPushInitiation.amount); + const sel = peerPushInitiation.coinSel; + + checkDbInvariant(!!sel); + + const repair: PreviousPayCoins = []; + + for (let i = 0; i < sel.coinPubs.length; i++) { + if (sel.coinPubs[i] != brokenCoinPub) { + repair.push({ + coinPub: sel.coinPubs[i], + contribution: Amounts.parseOrThrow(sel.contributions[i]), + }); + } + } + + const coinSelRes = await selectPeerCoins(wex, { + instructedAmount, + repair, + }); + + switch (coinSelRes.type) { + case "failure": + case "prospective": + // FIXME: Details! + throw Error( + "insufficient balance to re-select coins to repair double spending", + ); + case "success": + break; + default: + assertUnreachable(coinSelRes); + } + + await wex.db.runReadWriteTx({ storeNames: ["peerPushDebit"] }, async (tx) => { + const myPpi = await tx.peerPushDebit.get(peerPushInitiation.pursePub); + if (!myPpi) { + return; + } + switch (myPpi.status) { + case PeerPushDebitStatus.PendingCreatePurse: + case PeerPushDebitStatus.SuspendedCreatePurse: { + const sel = coinSelRes.result; + myPpi.coinSel = { + coinPubs: sel.coins.map((x) => x.coinPub), + contributions: sel.coins.map((x) => x.contribution), + }; + break; + } + default: + return; + } + await tx.peerPushDebit.put(myPpi); + }); + return TaskRunResult.progress(); +} + +async function processPeerPushDebitCreateReserve( + wex: WalletExecutionContext, + peerPushInitiation: PeerPushDebitRecord, +): Promise<TaskRunResult> { + const pursePub = peerPushInitiation.pursePub; + const purseExpiration = peerPushInitiation.purseExpiration; + const hContractTerms = peerPushInitiation.contractTermsHash; + const ctx = new PeerPushDebitTransactionContext(wex, pursePub); + const transactionId = ctx.transactionId; + + logger.trace(`processing ${transactionId} pending(create-reserve)`); + + const contractTermsRecord = await wex.db.runReadOnlyTx( + { storeNames: ["contractTerms"] }, + async (tx) => { + return tx.contractTerms.get(hContractTerms); + }, + ); + + if (!contractTermsRecord) { + throw Error( + `db invariant failed, contract terms for ${transactionId} missing`, + ); + } + + if (!peerPushInitiation.coinSel) { + const coinSelRes = await selectPeerCoins(wex, { + instructedAmount: Amounts.parseOrThrow(peerPushInitiation.amount), + }); + + switch (coinSelRes.type) { + case "failure": + throw TalerError.fromDetail( + TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE, + { + insufficientBalanceDetails: coinSelRes.insufficientBalanceDetails, + }, + ); + case "prospective": + throw Error("insufficient funds (blocked on refresh)"); + case "success": + break; + default: + assertUnreachable(coinSelRes); + } + const transitionDone = await wex.db.runReadWriteTx( + { + storeNames: [ + "exchanges", + "contractTerms", + "coins", + "coinAvailability", + "denominations", + "refreshGroups", + "refreshSessions", + "peerPushDebit", + ], + }, + async (tx) => { + const ppi = await tx.peerPushDebit.get(pursePub); + if (!ppi) { + return false; + } + if (ppi.coinSel) { + return false; + } + + ppi.coinSel = { + coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), + contributions: coinSelRes.result.coins.map((x) => x.contribution), + }; + // FIXME: Instead of directly doing a spendCoin here, + // we might want to mark the coins as used and spend them + // after we've been able to create the purse. + await spendCoins(wex, tx, { + allocationId: constructTransactionIdentifier({ + tag: TransactionType.PeerPushDebit, + pursePub, + }), + coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), + contributions: coinSelRes.result.coins.map((x) => + Amounts.parseOrThrow(x.contribution), + ), + refreshReason: RefreshReason.PayPeerPush, + }); + + await tx.peerPushDebit.put(ppi); + return true; + }, + ); + if (transitionDone) { + return TaskRunResult.progress(); + } + return TaskRunResult.backoff(); + } + + const purseSigResp = await wex.cryptoApi.signPurseCreation({ + hContractTerms, + mergePub: peerPushInitiation.mergePub, + minAge: 0, + purseAmount: peerPushInitiation.amount, + purseExpiration: timestampProtocolFromDb(purseExpiration), + pursePriv: peerPushInitiation.pursePriv, + }); + + const coins = await queryCoinInfosForSelection( + wex, + peerPushInitiation.coinSel, + ); + + const encryptContractRequest: EncryptContractRequest = { + contractTerms: contractTermsRecord.contractTermsRaw, + mergePriv: peerPushInitiation.mergePriv, + pursePriv: peerPushInitiation.pursePriv, + pursePub: peerPushInitiation.pursePub, + contractPriv: peerPushInitiation.contractPriv, + contractPub: peerPushInitiation.contractPub, + nonce: peerPushInitiation.contractEncNonce, + }; + + const econtractResp = await wex.cryptoApi.encryptContractForMerge( + encryptContractRequest, + ); + + const maxBatchSize = 100; + + for (let i = 0; i < coins.length; i += maxBatchSize) { + const batchSize = Math.min(maxBatchSize, coins.length - i); + const batchCoins = coins.slice(i, i + batchSize); + + const depositSigsResp = await wex.cryptoApi.signPurseDeposits({ + exchangeBaseUrl: peerPushInitiation.exchangeBaseUrl, + pursePub: peerPushInitiation.pursePub, + coins: batchCoins, + }); + + if (i == 0) { + // First batch creates the purse! + + logger.trace(`encrypt contract request: ${j2s(encryptContractRequest)}`); + + const createPurseUrl = new URL( + `purses/${peerPushInitiation.pursePub}/create`, + peerPushInitiation.exchangeBaseUrl, + ); + + const reqBody = { + amount: peerPushInitiation.amount, + merge_pub: peerPushInitiation.mergePub, + purse_sig: purseSigResp.sig, + h_contract_terms: hContractTerms, + purse_expiration: timestampProtocolFromDb(purseExpiration), + deposits: depositSigsResp.deposits, + min_age: 0, + econtract: econtractResp.econtract, + }; + + if (logger.shouldLogTrace()) { + logger.trace(`request body: ${j2s(reqBody)}`); + } + + const httpResp = await wex.http.fetch(createPurseUrl.href, { + method: "POST", + body: reqBody, + cancellationToken: wex.cancellationToken, + }); + + switch (httpResp.status) { + case HttpStatusCode.Ok: + // Possibly on to the next batch. + continue; + case HttpStatusCode.Forbidden: { + // FIXME: Store this error! + await ctx.failTransaction(); + return TaskRunResult.finished(); + } + case HttpStatusCode.Conflict: { + // Handle double-spending + return handlePurseCreationConflict(wex, peerPushInitiation, httpResp); + } + default: { + const errResp = await readTalerErrorResponse(httpResp); + return { + type: TaskRunResultType.Error, + errorDetail: errResp, + }; + } + } + } else { + const purseDepositUrl = new URL( + `purses/${pursePub}/deposit`, + peerPushInitiation.exchangeBaseUrl, + ); + + const depositPayload: ExchangePurseDeposits = { + deposits: depositSigsResp.deposits, + }; + + const httpResp = await wex.http.fetch(purseDepositUrl.href, { + method: "POST", + body: depositPayload, + cancellationToken: wex.cancellationToken, + }); + + switch (httpResp.status) { + case HttpStatusCode.Ok: + // Possibly on to the next batch. + continue; + case HttpStatusCode.Forbidden: { + // FIXME: Store this error! + await ctx.failTransaction(); + return TaskRunResult.finished(); + } + case HttpStatusCode.Conflict: { + // Handle double-spending + return handlePurseCreationConflict(wex, peerPushInitiation, httpResp); + } + default: { + const errResp = await readTalerErrorResponse(httpResp); + return { + type: TaskRunResultType.Error, + errorDetail: errResp, + }; + } + } + } + } + + // All batches done! + + await transitionPeerPushDebitTransaction(wex, pursePub, { + stFrom: PeerPushDebitStatus.PendingCreatePurse, + stTo: PeerPushDebitStatus.PendingReady, + }); + + return TaskRunResult.backoff(); +} + +async function processPeerPushDebitAbortingDeletePurse( + wex: WalletExecutionContext, + peerPushInitiation: PeerPushDebitRecord, +): Promise<TaskRunResult> { + const { pursePub, pursePriv } = peerPushInitiation; + const transactionId = constructTransactionIdentifier({ + tag: TransactionType.PeerPushDebit, + pursePub, + }); + + const sigResp = await wex.cryptoApi.signDeletePurse({ + pursePriv, + }); + const purseUrl = new URL( + `purses/${pursePub}`, + peerPushInitiation.exchangeBaseUrl, + ); + const resp = await wex.http.fetch(purseUrl.href, { + method: "DELETE", + headers: { + "taler-purse-signature": sigResp.sig, + }, + cancellationToken: wex.cancellationToken, + }); + logger.info(`deleted purse with response status ${resp.status}`); + + const transitionInfo = await wex.db.runReadWriteTx( + { + storeNames: [ + "peerPushDebit", + "refreshGroups", + "refreshSessions", + "denominations", + "coinAvailability", + "coins", + ], + }, + async (tx) => { + const ppiRec = await tx.peerPushDebit.get(pursePub); + if (!ppiRec) { + return undefined; + } + if (ppiRec.status !== PeerPushDebitStatus.AbortingDeletePurse) { + return undefined; + } + const currency = Amounts.currencyOf(ppiRec.amount); + const oldTxState = computePeerPushDebitTransactionState(ppiRec); + const coinPubs: CoinRefreshRequest[] = []; + + if (!ppiRec.coinSel) { + return undefined; + } + + for (let i = 0; i < ppiRec.coinSel.coinPubs.length; i++) { + coinPubs.push({ + amount: ppiRec.coinSel.contributions[i], + coinPub: ppiRec.coinSel.coinPubs[i], + }); + } + + const refresh = await createRefreshGroup( + wex, + tx, + currency, + coinPubs, + RefreshReason.AbortPeerPushDebit, + transactionId, + ); + ppiRec.status = PeerPushDebitStatus.AbortingRefreshDeleted; + ppiRec.abortRefreshGroupId = refresh.refreshGroupId; + await tx.peerPushDebit.put(ppiRec); + const newTxState = computePeerPushDebitTransactionState(ppiRec); + return { + oldTxState, + newTxState, + }; + }, + ); + notifyTransition(wex, transactionId, transitionInfo); + + return TaskRunResult.backoff(); +} + +interface SimpleTransition { + stFrom: PeerPushDebitStatus; + stTo: PeerPushDebitStatus; +} + +// FIXME: This should be a transition on the peer push debit transaction context! +async function transitionPeerPushDebitTransaction( + wex: WalletExecutionContext, + pursePub: string, + transitionSpec: SimpleTransition, +): Promise<void> { + const transactionId = constructTransactionIdentifier({ + tag: TransactionType.PeerPushDebit, + pursePub, + }); + const transitionInfo = await wex.db.runReadWriteTx( + { storeNames: ["peerPushDebit"] }, + async (tx) => { + const ppiRec = await tx.peerPushDebit.get(pursePub); + if (!ppiRec) { + return undefined; + } + if (ppiRec.status !== transitionSpec.stFrom) { + return undefined; + } + const oldTxState = computePeerPushDebitTransactionState(ppiRec); + ppiRec.status = transitionSpec.stTo; + await tx.peerPushDebit.put(ppiRec); + const newTxState = computePeerPushDebitTransactionState(ppiRec); + return { + oldTxState, + newTxState, + }; + }, + ); + notifyTransition(wex, transactionId, transitionInfo); +} + +async function processPeerPushDebitAbortingRefreshDeleted( + wex: WalletExecutionContext, + peerPushInitiation: PeerPushDebitRecord, +): Promise<TaskRunResult> { + const pursePub = peerPushInitiation.pursePub; + const abortRefreshGroupId = peerPushInitiation.abortRefreshGroupId; + checkLogicInvariant(!!abortRefreshGroupId); + const transactionId = constructTransactionIdentifier({ + tag: TransactionType.PeerPushDebit, + pursePub: peerPushInitiation.pursePub, + }); + if (peerPushInitiation.abortRefreshGroupId) { + await waitRefreshFinal(wex, peerPushInitiation.abortRefreshGroupId); + } + const transitionInfo = await wex.db.runReadWriteTx( + { storeNames: ["refreshGroups", "peerPushDebit"] }, + async (tx) => { + const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId); + let newOpState: PeerPushDebitStatus | undefined; + if (!refreshGroup) { + // Maybe it got manually deleted? Means that we should + // just go into failed. + logger.warn("no aborting refresh group found for deposit group"); + newOpState = PeerPushDebitStatus.Failed; + } else { + if (refreshGroup.operationStatus === RefreshOperationStatus.Finished) { + newOpState = PeerPushDebitStatus.Aborted; + } else if ( + refreshGroup.operationStatus === RefreshOperationStatus.Failed + ) { + newOpState = PeerPushDebitStatus.Failed; + } + } + if (newOpState) { + const newDg = await tx.peerPushDebit.get(pursePub); + if (!newDg) { + return; + } + const oldTxState = computePeerPushDebitTransactionState(newDg); + newDg.status = newOpState; + const newTxState = computePeerPushDebitTransactionState(newDg); + await tx.peerPushDebit.put(newDg); + return { oldTxState, newTxState }; + } + return undefined; + }, + ); + notifyTransition(wex, transactionId, transitionInfo); + // FIXME: Shouldn't this be finished in some cases?! + return TaskRunResult.backoff(); +} + +async function processPeerPushDebitAbortingRefreshExpired( + wex: WalletExecutionContext, + peerPushInitiation: PeerPushDebitRecord, +): Promise<TaskRunResult> { + const pursePub = peerPushInitiation.pursePub; + const abortRefreshGroupId = peerPushInitiation.abortRefreshGroupId; + checkLogicInvariant(!!abortRefreshGroupId); + const transactionId = constructTransactionIdentifier({ + tag: TransactionType.PeerPushDebit, + pursePub: peerPushInitiation.pursePub, + }); + const transitionInfo = await wex.db.runReadWriteTx( + { storeNames: ["peerPushDebit", "refreshGroups"] }, + async (tx) => { + const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId); + let newOpState: PeerPushDebitStatus | undefined; + if (!refreshGroup) { + // Maybe it got manually deleted? Means that we should + // just go into failed. + logger.warn("no aborting refresh group found for deposit group"); + newOpState = PeerPushDebitStatus.Failed; + } else { + if (refreshGroup.operationStatus === RefreshOperationStatus.Finished) { + newOpState = PeerPushDebitStatus.Expired; + } else if ( + refreshGroup.operationStatus === RefreshOperationStatus.Failed + ) { + newOpState = PeerPushDebitStatus.Failed; + } + } + if (newOpState) { + const newDg = await tx.peerPushDebit.get(pursePub); + if (!newDg) { + return; + } + const oldTxState = computePeerPushDebitTransactionState(newDg); + newDg.status = newOpState; + const newTxState = computePeerPushDebitTransactionState(newDg); + await tx.peerPushDebit.put(newDg); + return { oldTxState, newTxState }; + } + return undefined; + }, + ); + notifyTransition(wex, transactionId, transitionInfo); + // FIXME: Shouldn't this be finished in some cases?! + return TaskRunResult.backoff(); +} + +/** + * Process the "pending(ready)" state of a peer-push-debit transaction. + */ +async function processPeerPushDebitReady( + wex: WalletExecutionContext, + peerPushInitiation: PeerPushDebitRecord, +): Promise<TaskRunResult> { + logger.trace("processing peer-push-debit pending(ready)"); + const pursePub = peerPushInitiation.pursePub; + const transactionId = constructTaskIdentifier({ + tag: PendingTaskType.PeerPushDebit, + pursePub, + }); + const mergeUrl = new URL( + `purses/${pursePub}/merge`, + peerPushInitiation.exchangeBaseUrl, + ); + mergeUrl.searchParams.set("timeout_ms", "30000"); + logger.info(`long-polling on purse status at ${mergeUrl.href}`); + const resp = await wex.http.fetch(mergeUrl.href, { + // timeout: getReserveRequestTimeout(withdrawalGroup), + cancellationToken: wex.cancellationToken, + }); + if (resp.status === HttpStatusCode.Ok) { + const purseStatus = await readSuccessResponseJsonOrThrow( + resp, + codecForExchangePurseStatus(), + ); + const mergeTimestamp = purseStatus.merge_timestamp; + logger.info(`got purse status ${j2s(purseStatus)}`); + if (!mergeTimestamp || TalerProtocolTimestamp.isNever(mergeTimestamp)) { + return TaskRunResult.backoff(); + } else { + await transitionPeerPushDebitTransaction( + wex, + peerPushInitiation.pursePub, + { + stFrom: PeerPushDebitStatus.PendingReady, + stTo: PeerPushDebitStatus.Done, + }, + ); + return TaskRunResult.progress(); + } + } else if (resp.status === HttpStatusCode.Gone) { + logger.info(`purse ${pursePub} is gone, aborting peer-push-debit`); + const transitionInfo = await wex.db.runReadWriteTx( + { + storeNames: [ + "peerPushDebit", + "refreshGroups", + "refreshSessions", + "denominations", + "coinAvailability", + "coins", + ], + }, + async (tx) => { + const ppiRec = await tx.peerPushDebit.get(pursePub); + if (!ppiRec) { + return undefined; + } + if (ppiRec.status !== PeerPushDebitStatus.PendingReady) { + return undefined; + } + const currency = Amounts.currencyOf(ppiRec.amount); + const oldTxState = computePeerPushDebitTransactionState(ppiRec); + const coinPubs: CoinRefreshRequest[] = []; + + if (ppiRec.coinSel) { + for (let i = 0; i < ppiRec.coinSel.coinPubs.length; i++) { + coinPubs.push({ + amount: ppiRec.coinSel.contributions[i], + coinPub: ppiRec.coinSel.coinPubs[i], + }); + } + + const refresh = await createRefreshGroup( + wex, + tx, + currency, + coinPubs, + RefreshReason.AbortPeerPushDebit, + transactionId, + ); + + ppiRec.abortRefreshGroupId = refresh.refreshGroupId; + } + ppiRec.status = PeerPushDebitStatus.AbortingRefreshExpired; + await tx.peerPushDebit.put(ppiRec); + const newTxState = computePeerPushDebitTransactionState(ppiRec); + return { + oldTxState, + newTxState, + }; + }, + ); + notifyTransition(wex, transactionId, transitionInfo); + return TaskRunResult.backoff(); + } else { + logger.warn(`unexpected HTTP status for purse: ${resp.status}`); + return TaskRunResult.longpollReturnedPending(); + } +} + +export async function processPeerPushDebit( + wex: WalletExecutionContext, + pursePub: string, +): Promise<TaskRunResult> { + const peerPushInitiation = await wex.db.runReadOnlyTx( + { storeNames: ["peerPushDebit"] }, + async (tx) => { + return tx.peerPushDebit.get(pursePub); + }, + ); + if (!peerPushInitiation) { + throw Error("peer push payment not found"); + } + + switch (peerPushInitiation.status) { + case PeerPushDebitStatus.PendingCreatePurse: + return processPeerPushDebitCreateReserve(wex, peerPushInitiation); + case PeerPushDebitStatus.PendingReady: + return processPeerPushDebitReady(wex, peerPushInitiation); + case PeerPushDebitStatus.AbortingDeletePurse: + return processPeerPushDebitAbortingDeletePurse(wex, peerPushInitiation); + case PeerPushDebitStatus.AbortingRefreshDeleted: + return processPeerPushDebitAbortingRefreshDeleted( + wex, + peerPushInitiation, + ); + case PeerPushDebitStatus.AbortingRefreshExpired: + return processPeerPushDebitAbortingRefreshExpired( + wex, + peerPushInitiation, + ); + default: { + const txState = computePeerPushDebitTransactionState(peerPushInitiation); + logger.warn( + `not processing peer-push-debit transaction in state ${j2s(txState)}`, + ); + } + } + + return TaskRunResult.finished(); +} + +/** + * Initiate sending a peer-to-peer push payment. + */ +export async function initiatePeerPushDebit( + wex: WalletExecutionContext, + req: InitiatePeerPushDebitRequest, +): Promise<InitiatePeerPushDebitResponse> { + const instructedAmount = Amounts.parseOrThrow( + req.partialContractTerms.amount, + ); + const purseExpiration = req.partialContractTerms.purse_expiration; + const contractTerms = req.partialContractTerms; + + const pursePair = await wex.cryptoApi.createEddsaKeypair({}); + const mergePair = await wex.cryptoApi.createEddsaKeypair({}); + + const hContractTerms = ContractTermsUtil.hashContractTerms(contractTerms); + + const contractKeyPair = await wex.cryptoApi.createEddsaKeypair({}); + + const coinSelRes = await selectPeerCoins(wex, { + instructedAmount, + }); + + let coins: SelectedProspectiveCoin[] | undefined = undefined; + + switch (coinSelRes.type) { + case "failure": + throw TalerError.fromDetail( + TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE, + { + insufficientBalanceDetails: coinSelRes.insufficientBalanceDetails, + }, + ); + case "prospective": + coins = coinSelRes.result.prospectiveCoins; + break; + case "success": + coins = coinSelRes.result.coins; + break; + default: + assertUnreachable(coinSelRes); + } + + const sel = coinSelRes.result; + + logger.info(`selected p2p coins (push):`); + logger.trace(`${j2s(coinSelRes)}`); + + const totalAmount = await getTotalPeerPaymentCost(wex, coins); + + logger.info(`computed total peer payment cost`); + + const pursePub = pursePair.pub; + + const ctx = new PeerPushDebitTransactionContext(wex, pursePub); + + const transactionId = ctx.transactionId; + + const contractEncNonce = encodeCrock(getRandomBytes(24)); + + const transitionInfo = await wex.db.runReadWriteTx( + { + storeNames: [ + "exchanges", + "contractTerms", + "coins", + "coinAvailability", + "denominations", + "refreshGroups", + "refreshSessions", + "peerPushDebit", + ], + }, + async (tx) => { + const ppi: PeerPushDebitRecord = { + amount: Amounts.stringify(instructedAmount), + contractPriv: contractKeyPair.priv, + contractPub: contractKeyPair.pub, + contractTermsHash: hContractTerms, + exchangeBaseUrl: sel.exchangeBaseUrl, + mergePriv: mergePair.priv, + mergePub: mergePair.pub, + purseExpiration: timestampProtocolToDb(purseExpiration), + pursePriv: pursePair.priv, + pursePub: pursePair.pub, + timestampCreated: timestampPreciseToDb(TalerPreciseTimestamp.now()), + status: PeerPushDebitStatus.PendingCreatePurse, + contractEncNonce, + totalCost: Amounts.stringify(totalAmount), + }; + + if (coinSelRes.type === "success") { + ppi.coinSel = { + coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), + contributions: coinSelRes.result.coins.map((x) => x.contribution), + }; + // FIXME: Instead of directly doing a spendCoin here, + // we might want to mark the coins as used and spend them + // after we've been able to create the purse. + await spendCoins(wex, tx, { + allocationId: constructTransactionIdentifier({ + tag: TransactionType.PeerPushDebit, + pursePub: pursePair.pub, + }), + coinPubs: coinSelRes.result.coins.map((x) => x.coinPub), + contributions: coinSelRes.result.coins.map((x) => + Amounts.parseOrThrow(x.contribution), + ), + refreshReason: RefreshReason.PayPeerPush, + }); + } + + await tx.peerPushDebit.add(ppi); + + await tx.contractTerms.put({ + h: hContractTerms, + contractTermsRaw: contractTerms, + }); + + const newTxState = computePeerPushDebitTransactionState(ppi); + return { + oldTxState: { major: TransactionMajorState.None }, + newTxState, + }; + }, + ); + notifyTransition(wex, transactionId, transitionInfo); + wex.ws.notify({ + type: NotificationType.BalanceChange, + hintTransactionId: transactionId, + }); + + wex.taskScheduler.startShepherdTask(ctx.taskId); + + return { + contractPriv: contractKeyPair.priv, + mergePriv: mergePair.priv, + pursePub: pursePair.pub, + exchangeBaseUrl: coinSelRes.result.exchangeBaseUrl, + transactionId: constructTransactionIdentifier({ + tag: TransactionType.PeerPushDebit, + pursePub: pursePair.pub, + }), + }; +} + +export function computePeerPushDebitTransactionActions( + ppiRecord: PeerPushDebitRecord, +): TransactionAction[] { + switch (ppiRecord.status) { + case PeerPushDebitStatus.PendingCreatePurse: + return [TransactionAction.Abort, TransactionAction.Suspend]; + case PeerPushDebitStatus.PendingReady: + return [TransactionAction.Abort, TransactionAction.Suspend]; + case PeerPushDebitStatus.Aborted: + return [TransactionAction.Delete]; + case PeerPushDebitStatus.AbortingDeletePurse: + return [TransactionAction.Suspend, TransactionAction.Fail]; + case PeerPushDebitStatus.AbortingRefreshDeleted: + return [TransactionAction.Suspend, TransactionAction.Fail]; + case PeerPushDebitStatus.AbortingRefreshExpired: + return [TransactionAction.Suspend, TransactionAction.Fail]; + case PeerPushDebitStatus.SuspendedAbortingRefreshExpired: + return [TransactionAction.Resume, TransactionAction.Fail]; + case PeerPushDebitStatus.SuspendedAbortingDeletePurse: + return [TransactionAction.Resume, TransactionAction.Fail]; + case PeerPushDebitStatus.SuspendedAbortingRefreshDeleted: + return [TransactionAction.Resume, TransactionAction.Fail]; + case PeerPushDebitStatus.SuspendedCreatePurse: + return [TransactionAction.Resume, TransactionAction.Abort]; + case PeerPushDebitStatus.SuspendedReady: + return [TransactionAction.Resume, TransactionAction.Abort]; + case PeerPushDebitStatus.Done: + return [TransactionAction.Delete]; + case PeerPushDebitStatus.Expired: + return [TransactionAction.Delete]; + case PeerPushDebitStatus.Failed: + return [TransactionAction.Delete]; + } +} + +export function computePeerPushDebitTransactionState( + ppiRecord: PeerPushDebitRecord, +): TransactionState { + switch (ppiRecord.status) { + case PeerPushDebitStatus.PendingCreatePurse: + return { + major: TransactionMajorState.Pending, + minor: TransactionMinorState.CreatePurse, + }; + case PeerPushDebitStatus.PendingReady: + return { + major: TransactionMajorState.Pending, + minor: TransactionMinorState.Ready, + }; + case PeerPushDebitStatus.Aborted: + return { + major: TransactionMajorState.Aborted, + }; + case PeerPushDebitStatus.AbortingDeletePurse: + return { + major: TransactionMajorState.Aborting, + minor: TransactionMinorState.DeletePurse, + }; + case PeerPushDebitStatus.AbortingRefreshDeleted: + return { + major: TransactionMajorState.Aborting, + minor: TransactionMinorState.Refresh, + }; + case PeerPushDebitStatus.AbortingRefreshExpired: + return { + major: TransactionMajorState.Aborting, + minor: TransactionMinorState.RefreshExpired, + }; + case PeerPushDebitStatus.SuspendedAbortingDeletePurse: + return { + major: TransactionMajorState.SuspendedAborting, + minor: TransactionMinorState.DeletePurse, + }; + case PeerPushDebitStatus.SuspendedAbortingRefreshExpired: + return { + major: TransactionMajorState.SuspendedAborting, + minor: TransactionMinorState.RefreshExpired, + }; + case PeerPushDebitStatus.SuspendedAbortingRefreshDeleted: + return { + major: TransactionMajorState.SuspendedAborting, + minor: TransactionMinorState.Refresh, + }; + case PeerPushDebitStatus.SuspendedCreatePurse: + return { + major: TransactionMajorState.Suspended, + minor: TransactionMinorState.CreatePurse, + }; + case PeerPushDebitStatus.SuspendedReady: + return { + major: TransactionMajorState.Suspended, + minor: TransactionMinorState.Ready, + }; + case PeerPushDebitStatus.Done: + return { + major: TransactionMajorState.Done, + }; + case PeerPushDebitStatus.Failed: + return { + major: TransactionMajorState.Failed, + }; + case PeerPushDebitStatus.Expired: + return { + major: TransactionMajorState.Expired, + }; + } +} |