/* This file is part of GNU Taler (C) 2021-2023 Taler Systems S.A. GNU Taler is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3, or (at your option) any later version. GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with GNU Taler; see the file COPYING. If not, see */ /** * Implementation of the deposit transaction. */ /** * Imports. */ import { AbsoluteTime, AmountJson, Amounts, BatchDepositRequestCoin, CancellationToken, CoinRefreshRequest, CreateDepositGroupRequest, CreateDepositGroupResponse, DepositGroupFees, Duration, ExchangeBatchDepositRequest, ExchangeHandle, ExchangeRefundRequest, HttpStatusCode, Logger, MerchantContractTerms, NotificationType, PrepareDepositRequest, PrepareDepositResponse, RefreshReason, SelectedProspectiveCoin, TalerError, TalerErrorCode, TalerPreciseTimestamp, TalerProtocolTimestamp, TrackTransaction, TransactionAction, TransactionIdStr, TransactionMajorState, TransactionMinorState, TransactionState, TransactionType, URL, WireFee, assertUnreachable, canonicalJson, checkDbInvariant, checkLogicInvariant, codecForBatchDepositSuccess, codecForTackTransactionAccepted, codecForTackTransactionWired, encodeCrock, getRandomBytes, hashTruncate32, hashWire, j2s, parsePaytoUri, stringToBytes, } from "@gnu-taler/taler-util"; import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http"; import { selectPayCoins } from "./coinSelection.js"; import { PendingTaskType, TaskIdStr, TaskRunResult, TombstoneTag, TransactionContext, constructTaskIdentifier, spendCoins, } from "./common.js"; import { DepositElementStatus, DepositGroupRecord, DepositInfoPerExchange, 
DepositOperationStatus,
  DepositTrackingInfo,
  KycPendingInfo,
  RefreshOperationStatus,
  timestampPreciseToDb,
  timestampProtocolToDb,
} from "./db.js";
import { getExchangeWireDetailsInTx } from "./exchanges.js";
import {
  extractContractData,
  generateDepositPermissions,
  getTotalPaymentCost,
} from "./pay-merchant.js";
import {
  CreateRefreshGroupResult,
  createRefreshGroup,
  getTotalRefreshCost,
} from "./refresh.js";
import {
  constructTransactionIdentifier,
  notifyTransition,
  parseTransactionIdentifier,
} from "./transactions.js";
import { WalletExecutionContext, getDenomInfo } from "./wallet.js";

/**
 * Logger.
 */
const logger = new Logger("deposits.ts");

/**
 * Transaction context for a single deposit group.
 *
 * Bundles the transaction identifier and the task identifier derived from
 * the deposit group ID and implements the user-triggered operations
 * (delete, suspend, abort, resume, fail) on the deposit group record.
 */
export class DepositTransactionContext implements TransactionContext {
  readonly transactionId: TransactionIdStr;
  readonly taskId: TaskIdStr;

  constructor(
    public wex: WalletExecutionContext,
    public depositGroupId: string,
  ) {
    this.transactionId = constructTransactionIdentifier({
      tag: TransactionType.Deposit,
      depositGroupId,
    });
    this.taskId = constructTaskIdentifier({
      tag: PendingTaskType.Deposit,
      depositGroupId,
    });
  }

  /**
   * Delete the deposit group record (if it exists) and leave a tombstone
   * for it, both in the same database transaction.
   */
  async deleteTransaction(): Promise<void> {
    const { wex, depositGroupId } = this;
    // FIXME: We should check first if we are in a final state
    // where deletion is allowed.
    await wex.db.runReadWriteTx(
      { storeNames: ["depositGroups", "tombstones"] },
      async (tx) => {
        const depositGroupRec = await tx.depositGroups.get(depositGroupId);
        if (depositGroupRec) {
          await tx.depositGroups.delete(depositGroupId);
          await tx.tombstones.put({
            id: TombstoneTag.DeleteDepositGroup + ":" + depositGroupId,
          });
        }
      },
    );
  }

  /**
   * Move a pending or aborting deposit group into the matching suspended
   * state, stop its task and notify about the transition.
   *
   * Records in any other state are left untouched.
   */
  async suspendTransaction(): Promise<void> {
    const { wex, depositGroupId, transactionId, taskId: retryTag } = this;
    const transitionInfo = await wex.db.runReadWriteTx(
      { storeNames: ["depositGroups"] },
      async (tx) => {
        const dg = await tx.depositGroups.get(depositGroupId);
        if (!dg) {
          logger.warn(
            `can't suspend deposit group, depositGroupId=${depositGroupId} not found`,
          );
          return undefined;
        }
        const oldState = computeDepositTransactionStatus(dg);
        let newOpStatus: DepositOperationStatus | undefined;
        switch (dg.operationStatus) {
          case DepositOperationStatus.PendingDeposit:
            newOpStatus = DepositOperationStatus.SuspendedDeposit;
            break;
          case DepositOperationStatus.PendingKyc:
            newOpStatus = DepositOperationStatus.SuspendedKyc;
            break;
          case DepositOperationStatus.PendingTrack:
            newOpStatus = DepositOperationStatus.SuspendedTrack;
            break;
          case DepositOperationStatus.Aborting:
            newOpStatus = DepositOperationStatus.SuspendedAborting;
            break;
        }
        if (!newOpStatus) {
          return undefined;
        }
        dg.operationStatus = newOpStatus;
        await tx.depositGroups.put(dg);
        return {
          oldTxState: oldState,
          newTxState: computeDepositTransactionStatus(dg),
        };
      },
    );
    wex.taskScheduler.stopShepherdTask(retryTag);
    notifyTransition(wex, transactionId, transitionInfo);
  }

  /**
   * Start aborting a deposit group that is in the pending(deposit) or
   * suspended(deposit) state.
   *
   * The task is restarted afterwards and a balance change notification
   * is emitted. Records in any other state are left untouched.
   */
  async abortTransaction(): Promise<void> {
    const { wex, depositGroupId, transactionId, taskId: retryTag } = this;
    const transitionInfo = await wex.db.runReadWriteTx(
      { storeNames: ["depositGroups"] },
      async (tx) => {
        const dg = await tx.depositGroups.get(depositGroupId);
        if (!dg) {
          logger.warn(
            `can't abort deposit group, depositGroupId=${depositGroupId} not found`,
          );
          return undefined;
        }
        const oldState = computeDepositTransactionStatus(dg);
        switch (dg.operationStatus) {
          case DepositOperationStatus.Finished:
            return undefined;
          case DepositOperationStatus.PendingDeposit:
          case DepositOperationStatus.SuspendedDeposit: {
            dg.operationStatus = DepositOperationStatus.Aborting;
            await tx.depositGroups.put(dg);
            return {
              oldTxState: oldState,
              newTxState: computeDepositTransactionStatus(dg),
            };
          }
        }
        return undefined;
      },
    );
    wex.taskScheduler.stopShepherdTask(retryTag);
    notifyTransition(wex, transactionId, transitionInfo);
    wex.taskScheduler.startShepherdTask(retryTag);
    wex.ws.notify({
      type: NotificationType.BalanceChange,
      hintTransactionId: transactionId,
    });
  }

  /**
   * Move a suspended deposit group back into the state it was suspended
   * from, notify about the transition and start its task.
   */
  async resumeTransaction(): Promise<void> {
    const { wex, depositGroupId, transactionId, taskId: retryTag } = this;
    const transitionInfo = await wex.db.runReadWriteTx(
      { storeNames: ["depositGroups"] },
      async (tx) => {
        const dg = await tx.depositGroups.get(depositGroupId);
        if (!dg) {
          logger.warn(
            `can't resume deposit group, depositGroupId=${depositGroupId} not found`,
          );
          return undefined;
        }
        const oldState = computeDepositTransactionStatus(dg);
        let newOpStatus: DepositOperationStatus | undefined;
        switch (dg.operationStatus) {
          case DepositOperationStatus.SuspendedDeposit:
            newOpStatus = DepositOperationStatus.PendingDeposit;
            break;
          case DepositOperationStatus.SuspendedAborting:
            newOpStatus = DepositOperationStatus.Aborting;
            break;
          case DepositOperationStatus.SuspendedKyc:
            newOpStatus = DepositOperationStatus.PendingKyc;
            break;
          case DepositOperationStatus.SuspendedTrack:
            newOpStatus = DepositOperationStatus.PendingTrack;
            break;
        }
        if (!newOpStatus) {
          return undefined;
        }
        dg.operationStatus = newOpStatus;
        await tx.depositGroups.put(dg);
        return {
          oldTxState: oldState,
          newTxState: computeDepositTransactionStatus(dg),
        };
      },
    );
    notifyTransition(wex, transactionId, transitionInfo);
    wex.taskScheduler.startShepherdTask(retryTag);
  }

  /**
   * Give up on an aborting (or suspended-aborting) deposit group by
   * marking it as failed, stop its task and emit a balance change
   * notification.
   */
  async failTransaction(): Promise<void> {
    const { wex, depositGroupId, transactionId, taskId } = this;
    const transitionInfo = await wex.db.runReadWriteTx(
      { storeNames: ["depositGroups"] },
      async (tx) => {
        const dg = await tx.depositGroups.get(depositGroupId);
        if (!dg) {
          logger.warn(
            `can't cancel aborting deposit group, depositGroupId=${depositGroupId} not found`,
          );
          return undefined;
        }
        const oldState = computeDepositTransactionStatus(dg);
        switch (dg.operationStatus) {
          case DepositOperationStatus.SuspendedAborting:
          case DepositOperationStatus.Aborting: {
            dg.operationStatus = DepositOperationStatus.Failed;
            await tx.depositGroups.put(dg);
            return {
              oldTxState: oldState,
              newTxState: computeDepositTransactionStatus(dg),
            };
          }
        }
        return undefined;
      },
    );
    wex.taskScheduler.stopShepherdTask(taskId);
    notifyTransition(wex, transactionId, transitionInfo);
    wex.ws.notify({
      type: NotificationType.BalanceChange,
      hintTransactionId: transactionId,
    });
  }
}

/**
 * Get the (DD37-style) transaction status based on the
 * database record of a deposit group.
 */
export function computeDepositTransactionStatus(
  dg: DepositGroupRecord,
): TransactionState {
  switch (dg.operationStatus) {
    case DepositOperationStatus.Finished:
      return {
        major: TransactionMajorState.Done,
      };
    case DepositOperationStatus.PendingDeposit:
      return {
        major: TransactionMajorState.Pending,
        minor: TransactionMinorState.Deposit,
      };
    case DepositOperationStatus.PendingKyc:
      return {
        major: TransactionMajorState.Pending,
        minor: TransactionMinorState.KycRequired,
      };
    case DepositOperationStatus.PendingTrack:
      return {
        major: TransactionMajorState.Pending,
        minor: TransactionMinorState.Track,
      };
    case DepositOperationStatus.SuspendedKyc:
      return {
        major: TransactionMajorState.Suspended,
        minor: TransactionMinorState.KycRequired,
      };
    case DepositOperationStatus.SuspendedTrack:
      return {
        major: TransactionMajorState.Suspended,
        minor: TransactionMinorState.Track,
      };
    case DepositOperationStatus.SuspendedDeposit:
      return {
        major: TransactionMajorState.Suspended,
      };
    case DepositOperationStatus.Aborting:
      return {
        major: TransactionMajorState.Aborting,
      };
    case DepositOperationStatus.Aborted:
      return {
        major:
TransactionMajorState.Aborted,
      };
    case DepositOperationStatus.Failed:
      return {
        major: TransactionMajorState.Failed,
      };
    case DepositOperationStatus.SuspendedAborting:
      return {
        major: TransactionMajorState.SuspendedAborting,
      };
    default:
      assertUnreachable(dg.operationStatus);
  }
}

/**
 * Compute the actions that are possible on a deposit transaction
 * based on the current transaction state.
 */
export function computeDepositTransactionActions(
  dg: DepositGroupRecord,
): TransactionAction[] {
  switch (dg.operationStatus) {
    case DepositOperationStatus.Finished:
      return [TransactionAction.Delete];
    case DepositOperationStatus.PendingDeposit:
      return [TransactionAction.Suspend, TransactionAction.Abort];
    case DepositOperationStatus.SuspendedDeposit:
      return [TransactionAction.Resume];
    case DepositOperationStatus.Aborting:
      return [TransactionAction.Fail, TransactionAction.Suspend];
    case DepositOperationStatus.Aborted:
      return [TransactionAction.Delete];
    case DepositOperationStatus.Failed:
      return [TransactionAction.Delete];
    case DepositOperationStatus.SuspendedAborting:
      return [TransactionAction.Resume, TransactionAction.Fail];
    case DepositOperationStatus.PendingKyc:
      return [TransactionAction.Suspend, TransactionAction.Fail];
    case DepositOperationStatus.PendingTrack:
      return [TransactionAction.Suspend, TransactionAction.Abort];
    case DepositOperationStatus.SuspendedKyc:
      return [TransactionAction.Resume, TransactionAction.Fail];
    case DepositOperationStatus.SuspendedTrack:
      return [TransactionAction.Resume, TransactionAction.Abort];
    default:
      assertUnreachable(dg.operationStatus);
  }
}

/**
 * Request a refund from the exchange for every coin of the deposit group
 * that has not yet reached a refund result, store the per-coin results and,
 * once every coin has a refund result, create the refresh group for the
 * aborted deposit.
 *
 * @param wex wallet execution context
 * @param depositGroup deposit group record that is being aborted
 * @returns always a backoff result; the follow-up work happens on a
 *   later run of the task
 * @throws Error if the record has no per-coin status or no coin selection
 */
async function refundDepositGroup(
  wex: WalletExecutionContext,
  depositGroup: DepositGroupRecord,
): Promise<TaskRunResult> {
  const statusPerCoin = depositGroup.statusPerCoin;
  const payCoinSelection = depositGroup.payCoinSelection;
  if (!statusPerCoin) {
    throw Error(
      "unable to refund deposit group without coin selection (status missing)",
    );
  }
  if (!payCoinSelection) {
    throw Error(
      "unable to refund deposit group without coin selection (selection missing)",
    );
  }
  // Work on a copy; the record is only updated in the transaction below.
  const newTxPerCoin = [...statusPerCoin];
  logger.info(`status per coin: ${j2s(depositGroup.statusPerCoin)}`);
  for (let i = 0; i < statusPerCoin.length; i++) {
    const st = statusPerCoin[i];
    switch (st) {
      case DepositElementStatus.RefundFailed:
      case DepositElementStatus.RefundSuccess:
        // Refund already attempted for this coin.
        break;
      default: {
        const coinPub = payCoinSelection.coinPubs[i];
        const coinExchange = await wex.db.runReadOnlyTx(
          { storeNames: ["coins"] },
          async (tx) => {
            const coinRecord = await tx.coins.get(coinPub);
            checkDbInvariant(!!coinRecord);
            return coinRecord.exchangeBaseUrl;
          },
        );
        const refundAmount = payCoinSelection.coinContributions[i];
        // We use a constant refund transaction ID, since there can
        // only be one refund.
        const rtid = 1;
        const sig = await wex.cryptoApi.signRefund({
          coinPub,
          contractTermsHash: depositGroup.contractTermsHash,
          merchantPriv: depositGroup.merchantPriv,
          merchantPub: depositGroup.merchantPub,
          refundAmount: refundAmount,
          rtransactionId: rtid,
        });
        const refundReq: ExchangeRefundRequest = {
          h_contract_terms: depositGroup.contractTermsHash,
          merchant_pub: depositGroup.merchantPub,
          merchant_sig: sig.sig,
          refund_amount: refundAmount,
          rtransaction_id: rtid,
        };
        const refundUrl = new URL(`coins/${coinPub}/refund`, coinExchange);
        const httpResp = await wex.http.fetch(refundUrl.href, {
          method: "POST",
          body: refundReq,
          cancellationToken: wex.cancellationToken,
        });
        logger.info(
          `coin ${i} refund HTTP status for coin: ${httpResp.status}`,
        );
        let newStatus: DepositElementStatus;
        if (httpResp.status === HttpStatusCode.Ok) {
          // FIXME: validate response
          newStatus = DepositElementStatus.RefundSuccess;
        } else {
          // FIXME: Store problem somewhere!
          newStatus = DepositElementStatus.RefundFailed;
        }
        // FIXME: Handle case where refund request needs to be tried again
        newTxPerCoin[i] = newStatus;
        break;
      }
    }
  }

  // The refresh group is only created once every coin has a refund result.
  const isDone = newTxPerCoin.every(
    (st) =>
      st === DepositElementStatus.RefundFailed ||
      st === DepositElementStatus.RefundSuccess,
  );

  const currency = Amounts.currencyOf(depositGroup.totalPayCost);

  const res = await wex.db.runReadWriteTx(
    {
      storeNames: [
        "depositGroups",
        "refreshGroups",
        "refreshSessions",
        "coins",
        "denominations",
        "coinAvailability",
      ],
    },
    async (tx) => {
      const newDg = await tx.depositGroups.get(depositGroup.depositGroupId);
      if (!newDg) {
        return;
      }
      newDg.statusPerCoin = newTxPerCoin;
      const refreshCoins: CoinRefreshRequest[] = [];
      for (let i = 0; i < newTxPerCoin.length; i++) {
        refreshCoins.push({
          amount: payCoinSelection.coinContributions[i],
          coinPub: payCoinSelection.coinPubs[i],
        });
      }
      let refreshRes: CreateRefreshGroupResult | undefined = undefined;
      if (isDone) {
        refreshRes = await createRefreshGroup(
          wex,
          tx,
          currency,
          refreshCoins,
          RefreshReason.AbortDeposit,
          constructTransactionIdentifier({
            tag: TransactionType.Deposit,
            depositGroupId: newDg.depositGroupId,
          }),
        );
        newDg.abortRefreshGroupId = refreshRes.refreshGroupId;
      }
      await tx.depositGroups.put(newDg);
      return { refreshRes };
    },
  );

  if (res?.refreshRes) {
    for (const notif of res.refreshRes.notifications) {
      wex.ws.notify(notif);
    }
  }

  return TaskRunResult.backoff();
}

/**
 * Check whether the refresh associated with the
 * aborting deposit group is done.
 *
 * If done, mark the deposit transaction as aborted.
 *
 * Otherwise continue waiting.
 *
 * FIXME: Wait for the refresh group notifications instead of periodically
 * checking the refresh group status.
 * FIXME: This is just one transaction, can't we do this in the initial
 * transaction of processDepositGroup?
*/
async function waitForRefreshOnDepositGroup(
  wex: WalletExecutionContext,
  depositGroup: DepositGroupRecord,
): Promise<TaskRunResult> {
  const abortRefreshGroupId = depositGroup.abortRefreshGroupId;
  checkLogicInvariant(!!abortRefreshGroupId);
  const transactionId = constructTransactionIdentifier({
    tag: TransactionType.Deposit,
    depositGroupId: depositGroup.depositGroupId,
  });
  const transitionInfo = await wex.db.runReadWriteTx(
    { storeNames: ["depositGroups", "refreshGroups"] },
    async (tx) => {
      const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId);
      let newOpState: DepositOperationStatus | undefined;
      if (!refreshGroup) {
        // Maybe it got manually deleted?  Means that we should
        // just go into aborted.
        logger.warn("no aborting refresh group found for deposit group");
        newOpState = DepositOperationStatus.Aborted;
      } else if (
        refreshGroup.operationStatus === RefreshOperationStatus.Finished ||
        refreshGroup.operationStatus === RefreshOperationStatus.Failed
      ) {
        // A finished and a failed refresh both end the abort.
        newOpState = DepositOperationStatus.Aborted;
      }
      if (newOpState) {
        const newDg = await tx.depositGroups.get(depositGroup.depositGroupId);
        if (!newDg) {
          return;
        }
        const oldTxState = computeDepositTransactionStatus(newDg);
        newDg.operationStatus = newOpState;
        const newTxState = computeDepositTransactionStatus(newDg);
        await tx.depositGroups.put(newDg);
        return { oldTxState, newTxState };
      }
      return undefined;
    },
  );

  notifyTransition(wex, transactionId, transitionInfo);
  wex.ws.notify({
    type: NotificationType.BalanceChange,
    hintTransactionId: transactionId,
  });
  return TaskRunResult.backoff();
}

/**
 * Process a deposit group in the aborting state.
 *
 * As long as no abort refresh group has been recorded, the coins are
 * refunded; afterwards the function waits for that refresh group.
 */
async function processDepositGroupAborting(
  wex: WalletExecutionContext,
  depositGroup: DepositGroupRecord,
): Promise<TaskRunResult> {
  logger.info("processing deposit tx in 'aborting'");
  const abortRefreshGroupId = depositGroup.abortRefreshGroupId;
  if (!abortRefreshGroupId) {
    logger.info("refunding deposit group");
    return refundDepositGroup(wex, depositGroup);
  }
  logger.info("waiting for refresh");
  return waitForRefreshOnDepositGroup(wex, depositGroup);
}

/**
 * Process a deposit group in the pending(kyc) state.
 *
 * Queries the exchange's kyc-check endpoint using the KYC information
 * stored in the record. On a 200 (or 204) response the group is moved
 * to pending(track); on 202 nothing changes.
 *
 * @returns always a backoff result
 * @throws Error if the record has no kycInfo or the exchange answers
 *   with an unexpected status code
 */
async function processDepositGroupPendingKyc(
  wex: WalletExecutionContext,
  depositGroup: DepositGroupRecord,
): Promise<TaskRunResult> {
  const { depositGroupId } = depositGroup;
  const transactionId = constructTransactionIdentifier({
    tag: TransactionType.Deposit,
    depositGroupId,
  });

  const kycInfo = depositGroup.kycInfo;
  const userType = "individual";

  if (!kycInfo) {
    throw Error("invalid DB state, in pending(kyc), but no kycInfo present");
  }

  const url = new URL(
    `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`,
    kycInfo.exchangeBaseUrl,
  );
  url.searchParams.set("timeout_ms", "10000");
  logger.info(`kyc url ${url.href}`);
  const kycStatusRes = await wex.http.fetch(url.href, {
    method: "GET",
    cancellationToken: wex.cancellationToken,
  });
  if (
    kycStatusRes.status === HttpStatusCode.Ok ||
    //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
    // remove after the exchange is fixed or clarified
    kycStatusRes.status === HttpStatusCode.NoContent
  ) {
    const transitionInfo = await wex.db.runReadWriteTx(
      { storeNames: ["depositGroups"] },
      async (tx) => {
        const newDg = await tx.depositGroups.get(depositGroupId);
        if (!newDg) {
          return;
        }
        // Only transition if the state did not change in the meantime.
        if (newDg.operationStatus !== DepositOperationStatus.PendingKyc) {
          return;
        }
        const oldTxState = computeDepositTransactionStatus(newDg);
        newDg.operationStatus = DepositOperationStatus.PendingTrack;
        const newTxState = computeDepositTransactionStatus(newDg);
        await tx.depositGroups.put(newDg);
        return { oldTxState, newTxState };
      },
    );
    notifyTransition(wex, transactionId, transitionInfo);
  } else if (kycStatusRes.status === HttpStatusCode.Accepted) {
    // FIXME: Do we have to update the URL here?
  } else {
    throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`);
  }
  return TaskRunResult.backoff();
}

/**
 * Tracking information from the exchange indicated that
 * KYC is required.
We need to check the KYC info
 * and transition the transaction to the KYC required state.
 *
 * @param wex wallet execution context
 * @param depositGroup deposit group the tracking request was made for
 * @param kycInfo payto hash and requirement row reported while tracking
 * @param exchangeUrl base URL of the exchange that requires KYC
 * @throws Error if kyc-check answers with an unexpected status code
 */
async function transitionToKycRequired(
  wex: WalletExecutionContext,
  depositGroup: DepositGroupRecord,
  kycInfo: KycPendingInfo,
  exchangeUrl: string,
): Promise<TaskRunResult> {
  const { depositGroupId } = depositGroup;
  const userType = "individual";

  const transactionId = constructTransactionIdentifier({
    tag: TransactionType.Deposit,
    depositGroupId,
  });

  const url = new URL(
    `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`,
    exchangeUrl,
  );
  logger.info(`kyc url ${url.href}`);
  const kycStatusReq = await wex.http.fetch(url.href, {
    method: "GET",
    cancellationToken: wex.cancellationToken,
  });
  if (kycStatusReq.status === HttpStatusCode.Ok) {
    logger.warn("kyc requested, but already fulfilled");
    return TaskRunResult.backoff();
  } else if (kycStatusReq.status === HttpStatusCode.Accepted) {
    const kycStatus = await kycStatusReq.json();
    logger.info(`kyc status: ${j2s(kycStatus)}`);
    const transitionInfo = await wex.db.runReadWriteTx(
      { storeNames: ["depositGroups"] },
      async (tx) => {
        const dg = await tx.depositGroups.get(depositGroupId);
        if (!dg) {
          return undefined;
        }
        // Only transition if the state did not change in the meantime.
        if (dg.operationStatus !== DepositOperationStatus.PendingTrack) {
          return undefined;
        }
        const oldTxState = computeDepositTransactionStatus(dg);
        dg.kycInfo = {
          exchangeBaseUrl: exchangeUrl,
          kycUrl: kycStatus.kyc_url,
          paytoHash: kycInfo.paytoHash,
          requirementRow: kycInfo.requirementRow,
        };
        // Actually enter the KYC state; without this the stored kycInfo
        // is never picked up by the pending(kyc) handler.
        dg.operationStatus = DepositOperationStatus.PendingKyc;
        await tx.depositGroups.put(dg);
        const newTxState = computeDepositTransactionStatus(dg);
        return { oldTxState, newTxState };
      },
    );
    notifyTransition(wex, transactionId, transitionInfo);
    // After a successful transition, report progress so that the task is
    // run again and dispatched on the new state.
    // NOTE(review): assumes the scheduler re-runs a task on progress();
    // confirm against the task shepherd.
    return transitionInfo
      ? TaskRunResult.progress()
      : TaskRunResult.finished();
  } else {
    throw Error(`unexpected response from kyc-check (${kycStatusReq.status})`);
  }
}

/**
 * Process a deposit group in the pending(track) state.
 *
 * For every coin that is not yet marked as wired, the exchange is asked
 * for the tracking status and the per-coin status (and, for wired coins,
 * the tracking information) is stored. If the exchange reports an open
 * KYC requirement, processing is handed over to transitionToKycRequired.
 * Once all coins are wired the deposit group is marked as finished.
 *
 * @throws Error if the record has no per-coin status or coin selection,
 *   or if the payto URI of the deposit cannot be parsed
 */
async function processDepositGroupPendingTrack(
  wex: WalletExecutionContext,
  depositGroup: DepositGroupRecord,
): Promise<TaskRunResult> {
  const statusPerCoin = depositGroup.statusPerCoin;
  const payCoinSelection = depositGroup.payCoinSelection;
  if (!statusPerCoin) {
    throw Error(
      "unable to track deposit group without coin selection (status missing)",
    );
  }
  if (!payCoinSelection) {
    throw Error(
      "unable to track deposit group without coin selection (selection missing)",
    );
  }
  const { depositGroupId } = depositGroup;
  for (let i = 0; i < statusPerCoin.length; i++) {
    if (statusPerCoin[i] === DepositElementStatus.Wired) {
      // Nothing left to track for this coin.
      continue;
    }
    const coinPub = payCoinSelection.coinPubs[i];
    // FIXME: Make the URL part of the coin selection?
    const exchangeBaseUrl = await wex.db.runReadOnlyTx(
      { storeNames: ["coins"] },
      async (tx) => {
        const coinRecord = await tx.coins.get(coinPub);
        checkDbInvariant(!!coinRecord);
        return coinRecord.exchangeBaseUrl;
      },
    );

    let updatedTxStatus: DepositElementStatus | undefined = undefined;
    let newWiredCoin:
      | {
          id: string;
          value: DepositTrackingInfo;
        }
      | undefined;

    const track = await trackDeposit(
      wex,
      depositGroup,
      coinPub,
      exchangeBaseUrl,
    );

    if (track.type === "accepted") {
      if (!track.kyc_ok && track.requirement_row !== undefined) {
        const paytoHash = encodeCrock(
          hashTruncate32(stringToBytes(depositGroup.wire.payto_uri + "\0")),
        );
        const { requirement_row: requirementRow } = track;
        const kycInfo: KycPendingInfo = {
          paytoHash,
          requirementRow,
        };
        return transitionToKycRequired(
          wex,
          depositGroup,
          kycInfo,
          exchangeBaseUrl,
        );
      } else {
        updatedTxStatus = DepositElementStatus.Tracking;
      }
    } else if (track.type === "wired") {
      updatedTxStatus = DepositElementStatus.Wired;

      const payto = parsePaytoUri(depositGroup.wire.payto_uri);
      if (!payto) {
        throw Error(`unparsable payto: ${depositGroup.wire.payto_uri}`);
      }

      const fee = await getExchangeWireFee(
        wex,
        payto.targetType,
        exchangeBaseUrl,
        track.execution_time,
      );
      const raw = Amounts.parseOrThrow(track.coin_contribution);
      const wireFee = Amounts.parseOrThrow(fee.wireFee);

      newWiredCoin = {
        value: {
          amountRaw: Amounts.stringify(raw),
          wireFee: Amounts.stringify(wireFee),
          exchangePub: track.exchange_pub,
          timestampExecuted: timestampProtocolToDb(track.execution_time),
          wireTransferId: track.wtid,
        },
        id: track.exchange_sig,
      };
    } else {
      updatedTxStatus = DepositElementStatus.DepositPending;
    }

    if (updatedTxStatus !== undefined) {
      await wex.db.runReadWriteTx(
        { storeNames: ["depositGroups"] },
        async (tx) => {
          const dg = await tx.depositGroups.get(depositGroupId);
          if (!dg) {
            return;
          }
          if (!dg.statusPerCoin) {
            return;
          }
          if (updatedTxStatus !== undefined) {
            dg.statusPerCoin[i] = updatedTxStatus;
          }
          if (newWiredCoin) {
            /**
             * FIXME: if there is a new wire information from the exchange
             * it should add up to the previous tracking states.
             *
             * This may loose information by overriding prev state.
             *
             * And: add checks to integration tests
             */
            if (!dg.trackingState) {
              dg.trackingState = {};
            }

            dg.trackingState[newWiredCoin.id] = newWiredCoin.value;
          }
          await tx.depositGroups.put(dg);
        },
      );
    }
  }

  // Stays true when the record (or its per-coin status) has vanished,
  // in which case the task is reported as finished below.
  let allWired = true;

  const transitionInfo = await wex.db.runReadWriteTx(
    { storeNames: ["depositGroups"] },
    async (tx) => {
      const dg = await tx.depositGroups.get(depositGroupId);
      if (!dg) {
        return undefined;
      }
      if (!dg.statusPerCoin) {
        return undefined;
      }
      const oldTxState = computeDepositTransactionStatus(dg);
      // Recompute from scratch so the flag reflects the stored record
      // every time this callback runs.
      allWired = dg.statusPerCoin.every(
        (st) => st === DepositElementStatus.Wired,
      );
      if (allWired) {
        dg.timestampFinished = timestampPreciseToDb(
          TalerPreciseTimestamp.now(),
        );
        dg.operationStatus = DepositOperationStatus.Finished;
        await tx.depositGroups.put(dg);
      }
      const newTxState = computeDepositTransactionStatus(dg);
      return { oldTxState, newTxState };
    },
  );
  const transactionId = constructTransactionIdentifier({
    tag: TransactionType.Deposit,
    depositGroupId,
  });
  notifyTransition(wex, transactionId, transitionInfo);
  if (allWired) {
    wex.ws.notify({
      type: NotificationType.BalanceChange,
      hintTransactionId: transactionId,
    });
    return TaskRunResult.finished();
  } else {
    return TaskRunResult.longpollReturnedPending();
  }
}

/**
 * Process a deposit group in the pending(deposit) state: select coins if
 * the record has no coin selection yet, otherwise submit one batch deposit
 * per involved exchange.
 */
async function processDepositGroupPendingDeposit(
  wex: WalletExecutionContext,
  depositGroup: DepositGroupRecord,
cancellationToken?: CancellationToken,
): Promise<TaskRunResult> {
  logger.info("processing deposit group in pending(deposit)");
  const depositGroupId = depositGroup.depositGroupId;
  // Fall back to the execution context's token so that the work stays
  // cancellable when the caller passes no explicit token.
  const effectiveCancellationToken =
    cancellationToken ?? wex.cancellationToken;
  const contractTermsRec = await wex.db.runReadOnlyTx(
    { storeNames: ["contractTerms"] },
    async (tx) => {
      return tx.contractTerms.get(depositGroup.contractTermsHash);
    },
  );
  if (!contractTermsRec) {
    throw Error("contract terms for deposit not found in database");
  }
  const contractTerms: MerchantContractTerms =
    contractTermsRec.contractTermsRaw;
  const contractData = extractContractData(
    contractTermsRec.contractTermsRaw,
    depositGroup.contractTermsHash,
    "",
  );

  const transactionId = constructTransactionIdentifier({
    tag: TransactionType.Deposit,
    depositGroupId,
  });

  // Check for cancellation before expensive operations.
  effectiveCancellationToken?.throwIfCancelled();

  if (!depositGroup.payCoinSelection) {
    logger.info("missing coin selection for deposit group, selecting now");
    // FIXME: Consider doing the coin selection inside the txn
    const payCoinSel = await selectPayCoins(wex, {
      restrictExchanges: {
        auditors: [],
        exchanges: contractData.allowedExchanges,
      },
      restrictWireMethod: contractData.wireMethod,
      contractTermsAmount: Amounts.parseOrThrow(contractData.amount),
      depositFeeLimit: Amounts.parseOrThrow(contractData.maxDepositFee),
      prevPayCoins: [],
    });

    switch (payCoinSel.type) {
      case "success":
        logger.info("coin selection success");
        break;
      case "failure":
        logger.info("coin selection failure");
        throw TalerError.fromDetail(
          TalerErrorCode.WALLET_DEPOSIT_GROUP_INSUFFICIENT_BALANCE,
          {
            insufficientBalanceDetails: payCoinSel.insufficientBalanceDetails,
          },
        );
      case "prospective":
        logger.info("coin selection prospective");
        throw Error("insufficient balance (waiting on pending refresh)");
      default:
        assertUnreachable(payCoinSel);
    }

    const transitionDone = await wex.db.runReadWriteTx(
      {
        storeNames: [
          "depositGroups",
          "coins",
          "coinAvailability",
          "refreshGroups",
          "refreshSessions",
          "denominations",
        ],
      },
      async (tx) => {
        const dg = await tx.depositGroups.get(depositGroupId);
        if (!dg) {
          return false;
        }
        // A per-coin status means a selection was already stored.
        if (dg.statusPerCoin) {
          return false;
        }
        dg.payCoinSelection = {
          coinContributions: payCoinSel.coinSel.coins.map(
            (x) => x.contribution,
          ),
          coinPubs: payCoinSel.coinSel.coins.map((x) => x.coinPub),
        };
        dg.payCoinSelectionUid = encodeCrock(getRandomBytes(32));
        dg.statusPerCoin = payCoinSel.coinSel.coins.map(
          () => DepositElementStatus.DepositPending,
        );
        await tx.depositGroups.put(dg);
        await spendCoins(wex, tx, {
          allocationId: transactionId,
          coinPubs: dg.payCoinSelection.coinPubs,
          contributions: dg.payCoinSelection.coinContributions.map((x) =>
            Amounts.parseOrThrow(x),
          ),
          refreshReason: RefreshReason.PayDeposit,
        });
        return true;
      },
    );

    if (transitionDone) {
      return TaskRunResult.progress();
    } else {
      return TaskRunResult.backoff();
    }
  }

  // FIXME: Cache these!
  const depositPermissions = await generateDepositPermissions(
    wex,
    depositGroup.payCoinSelection,
    contractData,
  );

  // Exchanges involved in the deposit
  const exchanges: Set<string> = new Set();

  for (const dp of depositPermissions) {
    exchanges.add(dp.exchange_url);
  }

  // We need to do one batch per exchange.
  for (const exchangeUrl of exchanges.values()) {
    const coins: BatchDepositRequestCoin[] = [];
    // Positions (in the deposit permissions) of the coins in this batch.
    const batchIndexes: number[] = [];

    const batchReq: ExchangeBatchDepositRequest = {
      coins,
      h_contract_terms: depositGroup.contractTermsHash,
      merchant_payto_uri: depositGroup.wire.payto_uri,
      merchant_pub: contractTerms.merchant_pub,
      timestamp: contractTerms.timestamp,
      wire_salt: depositGroup.wire.salt,
      wire_transfer_deadline: contractTerms.wire_transfer_deadline,
      refund_deadline: contractTerms.refund_deadline,
    };

    for (let i = 0; i < depositPermissions.length; i++) {
      const perm = depositPermissions[i];
      if (perm.exchange_url !== exchangeUrl) {
        continue;
      }
      coins.push({
        coin_pub: perm.coin_pub,
        coin_sig: perm.coin_sig,
        contribution: Amounts.stringify(perm.contribution),
        denom_pub_hash: perm.h_denom,
        ub_sig: perm.ub_sig,
        h_age_commitment: perm.h_age_commitment,
      });
      batchIndexes.push(i);
    }

    // Check for cancellation before making network request.
    effectiveCancellationToken?.throwIfCancelled();
    const url = new URL(`batch-deposit`, exchangeUrl);
    logger.info(`depositing to ${url.href}`);
    logger.trace(`deposit request: ${j2s(batchReq)}`);
    const httpResp = await wex.http.fetch(url.href, {
      method: "POST",
      body: batchReq,
      cancellationToken: effectiveCancellationToken,
    });
    await readSuccessResponseJsonOrThrow(
      httpResp,
      codecForBatchDepositSuccess(),
    );

    await wex.db.runReadWriteTx(
      { storeNames: ["depositGroups"] },
      async (tx) => {
        const dg = await tx.depositGroups.get(depositGroupId);
        if (!dg) {
          return;
        }
        if (!dg.statusPerCoin) {
          return;
        }
        let modified = false;
        for (const batchIndex of batchIndexes) {
          if (
            dg.statusPerCoin[batchIndex] ===
            DepositElementStatus.DepositPending
          ) {
            dg.statusPerCoin[batchIndex] = DepositElementStatus.Tracking;
            modified = true;
          }
        }
        // Write the record once instead of once per coin.
        if (modified) {
          await tx.depositGroups.put(dg);
        }
      },
    );
  }

  const transitionInfo = await wex.db.runReadWriteTx(
    { storeNames: ["depositGroups"] },
    async (tx) => {
      const dg = await tx.depositGroups.get(depositGroupId);
      if (!dg) {
        return undefined;
      }
      // Do not clobber a state change (e.g. suspend or abort) that
      // happened while the deposit requests were in flight.
      if (dg.operationStatus !== DepositOperationStatus.PendingDeposit) {
        return undefined;
      }
      const oldTxState = computeDepositTransactionStatus(dg);
      dg.operationStatus = DepositOperationStatus.PendingTrack;
      await tx.depositGroups.put(dg);
      const newTxState = computeDepositTransactionStatus(dg);
      return { oldTxState, newTxState };
    },
  );

  notifyTransition(wex, transactionId, transitionInfo);
  return TaskRunResult.progress();
}

/**
 * Process a deposit group that is not in its final state yet.
 *
 * Loads the record and dispatches on its operation status. A missing
 * record, or a status without a handler here, yields a finished result.
 */
export async function processDepositGroup(
  wex: WalletExecutionContext,
  depositGroupId: string,
): Promise<TaskRunResult> {
  const depositGroup = await wex.db.runReadOnlyTx(
    { storeNames: ["depositGroups"] },
    async (tx) => {
      return tx.depositGroups.get(depositGroupId);
    },
  );
  if (!depositGroup) {
    logger.warn(`deposit group ${depositGroupId} not found`);
    return TaskRunResult.finished();
  }

  switch (depositGroup.operationStatus) {
    case DepositOperationStatus.PendingTrack:
      return processDepositGroupPendingTrack(wex, depositGroup);
    case DepositOperationStatus.PendingKyc:
      return processDepositGroupPendingKyc(wex, depositGroup);
    case DepositOperationStatus.PendingDeposit:
      return processDepositGroupPendingDeposit(wex, depositGroup);
    case DepositOperationStatus.Aborting:
      return processDepositGroupAborting(wex, depositGroup);
  }

  return TaskRunResult.finished();
}

/**
 * FIXME: Consider moving this to exchanges.ts.
*/
/**
 * Look up the wire fee entry of an exchange for a wire type at a given time.
 *
 * @param wex wallet execution context used for database access
 * @param wireType wire method whose fee schedule is consulted
 * @param baseUrl base URL of the exchange
 * @param time point in time that must lie within the fee entry's
 *   start/end stamps (as decided by `AbsoluteTime.isBetween`)
 * @returns the matching fee entry
 * @throws Error if the exchange (or its details) is not in the database,
 *   if it has no fees for the wire type, or if no fee entry covers `time`
 */
async function getExchangeWireFee(
  wex: WalletExecutionContext,
  wireType: string,
  baseUrl: string,
  time: TalerProtocolTimestamp,
): Promise<WireFee> {
  const exchangeDetails = await wex.db.runReadOnlyTx(
    { storeNames: ["exchangeDetails", "exchanges"] },
    async (tx) => {
      const ex = await tx.exchanges.get(baseUrl);
      if (!ex || !ex.detailsPointer) return undefined;
      // The exchange record only points at its details; resolve the pointer.
      return await tx.exchangeDetails.indexes.byPointer.get([
        baseUrl,
        ex.detailsPointer.currency,
        ex.detailsPointer.masterPublicKey,
      ]);
    },
  );

  if (!exchangeDetails) {
    throw Error(`exchange missing: ${baseUrl}`);
  }

  const fees = exchangeDetails.wireInfo.feesForType[wireType];

  if (!fees || fees.length === 0) {
    throw Error(
      `exchange ${baseUrl} doesn't have fees for wire type ${wireType}`,
    );
  }

  // Convert once instead of once per fee entry.
  const wantedTime = AbsoluteTime.fromProtocolTimestamp(time);
  const fee = fees.find((x) =>
    AbsoluteTime.isBetween(
      wantedTime,
      AbsoluteTime.fromProtocolTimestamp(x.startStamp),
      AbsoluteTime.fromProtocolTimestamp(x.endStamp),
    ),
  );
  if (!fee) {
    throw Error(
      `exchange ${exchangeDetails.exchangeBaseUrl} doesn't have fees for wire type ${wireType} at ${time.t_s}`,
    );
  }

  return fee;
}

/**
 * Ask the exchange for the wire-transfer status of a single deposited coin.
 *
 * Sends a GET request to
 * `deposits/<wire hash>/<merchant pub>/<contract terms hash>/<coin pub>`
 * (resolved against `exchangeUrl`), signed with the deposit group's
 * merchant key.
 *
 * @param wex wallet execution context (crypto API, HTTP client, cancellation)
 * @param depositGroup deposit group the coin belongs to
 * @param coinPub public key of the coin to track
 * @param exchangeUrl base URL of the exchange to query
 * @returns the decoded response tagged with `type: "accepted"` for an
 *   `Accepted` status or `type: "wired"` for an `Ok` status
 * @throws Error on any other HTTP status
 */
async function trackDeposit(
  wex: WalletExecutionContext,
  depositGroup: DepositGroupRecord,
  coinPub: string,
  exchangeUrl: string,
): Promise<TrackTransaction> {
  const wireHash = hashWire(
    depositGroup.wire.payto_uri,
    depositGroup.wire.salt,
  );

  const url = new URL(
    `deposits/${wireHash}/${depositGroup.merchantPub}/${depositGroup.contractTermsHash}/${coinPub}`,
    exchangeUrl,
  );
  const sigResp = await wex.cryptoApi.signTrackTransaction({
    coinPub,
    contractTermsHash: depositGroup.contractTermsHash,
    merchantPriv: depositGroup.merchantPriv,
    merchantPub: depositGroup.merchantPub,
    wireHash,
  });
  url.searchParams.set("merchant_sig", sigResp.sig);
  url.searchParams.set("timeout_ms", "30000");
  const httpResp = await wex.http.fetch(url.href, {
    method: "GET",
    cancellationToken: wex.cancellationToken,
  });
  logger.trace(`deposits response status: ${httpResp.status}`);
  switch (httpResp.status) {
    case HttpStatusCode.Accepted: {
      const accepted = await readSuccessResponseJsonOrThrow(
        httpResp,
        codecForTackTransactionAccepted(),
      );
      return { type: "accepted", ...accepted };
    }
    case HttpStatusCode.Ok: {
      const wired = await readSuccessResponseJsonOrThrow(
        httpResp,
        codecForTackTransactionWired(),
      );
      return { type: "wired", ...wired };
    }
    default: {
      throw Error(
        `unexpected response from track-transaction (${httpResp.status})`,
      );
    }
  }
}

/**
 * Collect a handle (base URL and master public key) for every exchange in
 * the wallet database whose wire details report the given currency.
 *
 * Exchanges without wire details are skipped.
 */
async function getExchangeHandlesForCurrency(
  wex: WalletExecutionContext,
  currency: string,
): Promise<ExchangeHandle[]> {
  const handles: ExchangeHandle[] = [];
  await wex.db.runReadOnlyTx(
    { storeNames: ["exchangeDetails", "exchanges"] },
    async (tx) => {
      const allExchanges = await tx.exchanges.iter().toArray();
      for (const e of allExchanges) {
        const details = await getExchangeWireDetailsInTx(tx, e.baseUrl);
        if (!details || currency !== details.currency) {
          continue;
        }
        handles.push({
          master_pub: details.masterPublicKey,
          url: e.baseUrl,
        });
      }
    },
  );
  return handles;
}

/**
 * Check if creating a deposit group is possible and calculate
 * the associated fees.
 *
 * Builds throw-away contract terms for the requested amount, runs coin
 * selection against them and derives the cost figures from the selected
 * (or prospective) coins. Nothing is written to the database.
 *
 * @param wex wallet execution context
 * @param req target payto URI and amount to deposit
 * @returns total cost, effective deposit amount and fee breakdown
 * @throws Error if the payto URI cannot be parsed
 * @throws TalerError with code WALLET_DEPOSIT_GROUP_INSUFFICIENT_BALANCE
 *   if coin selection fails
 */
export async function checkDepositGroup(
  wex: WalletExecutionContext,
  req: PrepareDepositRequest,
): Promise<PrepareDepositResponse> {
  const p = parsePaytoUri(req.depositPaytoUri);
  if (!p) {
    throw Error("invalid payto URI");
  }
  const amount = Amounts.parseOrThrow(req.amount);
  const currency = Amounts.currencyOf(amount);

  const exchangeInfos = await getExchangeHandlesForCurrency(
    wex,
    amount.currency,
  );

  const now = AbsoluteTime.now();
  const nowRounded = AbsoluteTime.toProtocolTimestamp(now);
  // Placeholder contract terms: only the fields that influence coin
  // selection carry real values, the rest are left empty.
  const contractTerms: MerchantContractTerms = {
    exchanges: exchangeInfos,
    amount: req.amount,
    max_fee: Amounts.stringify(amount),
    wire_method: p.targetType,
    timestamp: nowRounded,
    merchant_base_url: "",
    summary: "",
    nonce: "",
    wire_transfer_deadline: nowRounded,
    order_id: "",
    h_wire: "",
    pay_deadline: AbsoluteTime.toProtocolTimestamp(
      AbsoluteTime.addDuration(now, Duration.fromSpec({ hours: 1 })),
    ),
    merchant: {
      name: "(wallet)",
    },
    merchant_pub: "",
    refund_deadline: TalerProtocolTimestamp.zero(),
  };

  const { h: contractTermsHash } = await wex.cryptoApi.hashString({
    str: canonicalJson(contractTerms),
  });

  const contractData = extractContractData(
    contractTerms,
    contractTermsHash,
    "",
  );

  const payCoinSel = await selectPayCoins(wex, {
    restrictExchanges: {
      auditors: [],
      exchanges: contractData.allowedExchanges,
    },
    restrictWireMethod: contractData.wireMethod,
    contractTermsAmount: Amounts.parseOrThrow(contractData.amount),
    depositFeeLimit: Amounts.parseOrThrow(contractData.maxDepositFee),
    prevPayCoins: [],
  });

  let selCoins: SelectedProspectiveCoin[] | undefined = undefined;

  switch (payCoinSel.type) {
    case "failure":
      throw TalerError.fromDetail(
        TalerErrorCode.WALLET_DEPOSIT_GROUP_INSUFFICIENT_BALANCE,
        {
          insufficientBalanceDetails: payCoinSel.insufficientBalanceDetails,
        },
      );
    case "prospective":
      selCoins = payCoinSel.result.prospectiveCoins;
      break;
    case "success":
      selCoins = payCoinSel.coinSel.coins;
      break;
    default:
      assertUnreachable(payCoinSel);
  }

  const totalDepositCost = await getTotalPaymentCost(wex, currency, selCoins);

  const effectiveDepositAmount = await getCounterpartyEffectiveDepositAmount(
    wex,
    p.targetType,
    selCoins,
  );

  const fees = await getTotalFeesForDepositAmount(
    wex,
    p.targetType,
    amount,
    selCoins,
  );

  return {
    totalDepositCost: Amounts.stringify(totalDepositCost),
    effectiveDepositAmount: Amounts.stringify(effectiveDepositAmount),
    fees,
  };
}

/**
 * Generate a transaction identifier for a new deposit transaction,
 * based on a freshly generated random 32-byte deposit group ID.
 */
export function generateDepositGroupTxId(): string {
  const depositGroupId = encodeCrock(getRandomBytes(32));
  return constructTransactionIdentifier({
    tag: TransactionType.Deposit,
    depositGroupId: depositGroupId,
  });
}

/**
 * Create a deposit group for the requested amount and payto URI, store it
 * together with its contract terms, and start the task that processes it.
 *
 * If coin selection succeeds, the selected coins are spent in the same
 * database transaction that stores the group. If the selection is only
 * prospective, the group is stored without a coin selection.
 *
 * @param wex wallet execution context
 * @param req payto URI, amount and an optional pre-allocated transaction ID
 * @returns the deposit group ID and the transaction ID of the new group
 * @throws Error if the payto URI or the supplied transaction ID is invalid
 * @throws TalerError with code WALLET_DEPOSIT_GROUP_INSUFFICIENT_BALANCE
 *   if coin selection fails
 */
export async function createDepositGroup(
  wex: WalletExecutionContext,
  req: CreateDepositGroupRequest,
): Promise<CreateDepositGroupResponse> {
  const p = parsePaytoUri(req.depositPaytoUri);
  if (!p) {
    throw Error("invalid payto URI");
  }

  const amount = Amounts.parseOrThrow(req.amount);
  const currency = amount.currency;

  const exchangeInfos = await getExchangeHandlesForCurrency(
    wex,
    amount.currency,
  );

  const now = AbsoluteTime.now();
  const wireDeadline = AbsoluteTime.toProtocolTimestamp(
    AbsoluteTime.addDuration(now, Duration.fromSpec({ minutes: 5 })),
  );
  const nowRounded = AbsoluteTime.toProtocolTimestamp(now);
  // Fresh key pairs: one provides the contract nonce, the other stands in
  // as the merchant key.
  const noncePair = await wex.cryptoApi.createEddsaKeypair({});
  const merchantPair = await wex.cryptoApi.createEddsaKeypair({});
  const wireSalt = encodeCrock(getRandomBytes(16));
  const wireHash = hashWire(req.depositPaytoUri, wireSalt);
  const contractTerms: MerchantContractTerms = {
    exchanges: exchangeInfos,
    amount: req.amount,
    max_fee: Amounts.stringify(amount),
    wire_method: p.targetType,
    timestamp: nowRounded,
    merchant_base_url: "",
    summary: "",
    nonce: noncePair.pub,
    wire_transfer_deadline: wireDeadline,
    order_id: "",
    h_wire: wireHash,
    pay_deadline: AbsoluteTime.toProtocolTimestamp(
      AbsoluteTime.addDuration(now, Duration.fromSpec({ hours: 1 })),
    ),
    merchant: {
      name: "(wallet)",
    },
    merchant_pub: merchantPair.pub,
    refund_deadline: TalerProtocolTimestamp.zero(),
  };

  const { h: contractTermsHash } = await wex.cryptoApi.hashString({
    str: canonicalJson(contractTerms),
  });

  const contractData = extractContractData(
    contractTerms,
    contractTermsHash,
    "",
  );

  const payCoinSel = await selectPayCoins(wex, {
    restrictExchanges: {
      auditors: [],
      exchanges: contractData.allowedExchanges,
    },
    restrictWireMethod: contractData.wireMethod,
    contractTermsAmount: Amounts.parseOrThrow(contractData.amount),
    depositFeeLimit: Amounts.parseOrThrow(contractData.maxDepositFee),
    prevPayCoins: [],
  });

  let coins: SelectedProspectiveCoin[] | undefined = undefined;

  switch (payCoinSel.type) {
    case "success":
      coins = payCoinSel.coinSel.coins;
      break;
    case "failure":
      throw TalerError.fromDetail(
        TalerErrorCode.WALLET_DEPOSIT_GROUP_INSUFFICIENT_BALANCE,
        {
          insufficientBalanceDetails: payCoinSel.insufficientBalanceDetails,
        },
      );
    case "prospective":
      coins = payCoinSel.result.prospectiveCoins;
      break;
    default:
      assertUnreachable(payCoinSel);
  }

  const totalDepositCost = await getTotalPaymentCost(wex, currency, coins);

  let depositGroupId: string;
  if (req.transactionId) {
    // The caller pre-allocated a transaction ID; reuse its group ID.
    const txId = parseTransactionIdentifier(req.transactionId);
    if (!txId || txId.tag !== TransactionType.Deposit) {
      throw Error("invalid transaction ID");
    }
    depositGroupId = txId.depositGroupId;
  } else {
    depositGroupId = encodeCrock(getRandomBytes(32));
  }

  // Sum up the coin contributions separately for each exchange.
  const infoPerExchange: Record<string, DepositInfoPerExchange> = {};

  for (const coin of coins) {
    let depPerExchange = infoPerExchange[coin.exchangeBaseUrl];
    if (!depPerExchange) {
      infoPerExchange[coin.exchangeBaseUrl] = depPerExchange = {
        amountEffective: Amounts.stringify(
          Amounts.zeroOfAmount(totalDepositCost),
        ),
      };
    }
    depPerExchange.amountEffective = Amounts.stringify(
      Amounts.add(depPerExchange.amountEffective, coin.contribution).amount,
    );
  }

  const counterpartyEffectiveDepositAmount =
    await getCounterpartyEffectiveDepositAmount(wex, p.targetType, coins);

  const depositGroup: DepositGroupRecord = {
    contractTermsHash,
    depositGroupId,
    currency: Amounts.currencyOf(totalDepositCost),
    amount: contractData.amount,
    noncePriv: noncePair.priv,
    noncePub: noncePair.pub,
    timestampCreated: timestampPreciseToDb(
      AbsoluteTime.toPreciseTimestamp(now),
    ),
    timestampFinished: undefined,
    statusPerCoin: undefined,
    payCoinSelection: undefined,
    payCoinSelectionUid: undefined,
    merchantPriv: merchantPair.priv,
    merchantPub: merchantPair.pub,
    totalPayCost: Amounts.stringify(totalDepositCost),
    counterpartyEffectiveDepositAmount: Amounts.stringify(
      counterpartyEffectiveDepositAmount,
    ),
    wireTransferDeadline: timestampProtocolToDb(
      contractTerms.wire_transfer_deadline,
    ),
    wire: {
      payto_uri: req.depositPaytoUri,
      salt: wireSalt,
    },
    operationStatus: DepositOperationStatus.PendingDeposit,
    infoPerExchange,
  };

  // Only a successful selection refers to concrete coins that can be
  // recorded (and spent below); a prospective one leaves these fields unset.
  if (payCoinSel.type === "success") {
    depositGroup.payCoinSelection = {
      coinContributions: payCoinSel.coinSel.coins.map((x) => x.contribution),
      coinPubs: payCoinSel.coinSel.coins.map((x) => x.coinPub),
    };
    depositGroup.payCoinSelectionUid = encodeCrock(getRandomBytes(32));
    depositGroup.statusPerCoin = payCoinSel.coinSel.coins.map(
      () => DepositElementStatus.DepositPending,
    );
  }

  const ctx = new DepositTransactionContext(wex, depositGroupId);
  const transactionId = ctx.transactionId;

  const newTxState = await wex.db.runReadWriteTx(
    {
      storeNames: [
        "depositGroups",
        "coins",
        "recoupGroups",
        "denominations",
        "refreshGroups",
        "refreshSessions",
        "coinAvailability",
        "contractTerms",
      ],
    },
    async (tx) => {
      if (depositGroup.payCoinSelection) {
        await spendCoins(wex, tx, {
          allocationId: transactionId,
          coinPubs: depositGroup.payCoinSelection.coinPubs,
          contributions: depositGroup.payCoinSelection.coinContributions.map(
            (x) => Amounts.parseOrThrow(x),
          ),
          refreshReason: RefreshReason.PayDeposit,
        });
      }
      await tx.depositGroups.put(depositGroup);
      await tx.contractTerms.put({
        contractTermsRaw: contractTerms,
        h: contractTermsHash,
      });
      return computeDepositTransactionStatus(depositGroup);
    },
  );

  wex.ws.notify({
    type: NotificationType.TransactionStateTransition,
    transactionId,
    oldTxState: {
      major: TransactionMajorState.None,
    },
    newTxState,
  });

  wex.ws.notify({
    type: NotificationType.BalanceChange,
    hintTransactionId: transactionId,
  });

  wex.taskScheduler.startShepherdTask(ctx.taskId);

  return {
    depositGroupId,
    transactionId,
  };
}

/**
 * Get the amount that will be deposited on the user's bank
 * account after depositing, not considering aggregation.
*/
/**
 * Compute the amount that arrives at the deposit target: the sum of the
 * coin contributions minus the deposit fee of every coin's denomination
 * and minus one wire fee per distinct exchange.
 *
 * The wire fee of an exchange is the fee entry for `wireType` that is valid
 * at the time of the call. Exchanges without wire details, without fees for
 * `wireType`, or without a currently valid fee entry contribute no wire fee.
 *
 * @param wex wallet execution context used for database access
 * @param wireType wire method of the deposit target
 * @param pcs coins (with their contributions) selected for the deposit
 * @returns contributions minus deposit fees and wire fees
 * @throws Error if the denomination of one of the coins cannot be found
 */
export async function getCounterpartyEffectiveDepositAmount(
  wex: WalletExecutionContext,
  wireType: string,
  pcs: SelectedProspectiveCoin[],
): Promise<AmountJson> {
  const amt: AmountJson[] = [];
  const fees: AmountJson[] = [];
  const exchangeSet: Set<string> = new Set();

  await wex.db.runReadOnlyTx(
    { storeNames: ["coins", "denominations", "exchangeDetails", "exchanges"] },
    async (tx) => {
      for (const coin of pcs) {
        const denom = await getDenomInfo(
          wex,
          tx,
          coin.exchangeBaseUrl,
          coin.denomPubHash,
        );
        if (!denom) {
          throw Error("can't find denomination to calculate deposit amount");
        }
        amt.push(Amounts.parseOrThrow(coin.contribution));
        fees.push(Amounts.parseOrThrow(denom.feeDeposit));
        exchangeSet.add(coin.exchangeBaseUrl);
      }

      // Compare every fee entry against the same instant.
      const now = AbsoluteTime.now();

      for (const exchangeUrl of exchangeSet.values()) {
        const exchangeDetails = await getExchangeWireDetailsInTx(
          tx,
          exchangeUrl,
        );
        if (!exchangeDetails) {
          continue;
        }

        // The fee list is undefined when the exchange does not support
        // the wire type; `?.` treats that as "no wire fee" instead of
        // throwing a TypeError.
        const fee = exchangeDetails.wireInfo.feesForType[wireType]?.find(
          (x) =>
            AbsoluteTime.isBetween(
              now,
              AbsoluteTime.fromProtocolTimestamp(x.startStamp),
              AbsoluteTime.fromProtocolTimestamp(x.endStamp),
            ),
        )?.wireFee;
        if (fee) {
          fees.push(Amounts.parseOrThrow(fee));
        }
      }
    },
  );
  return Amounts.sub(Amounts.sum(amt).amount, Amounts.sum(fees).amount).amount;
}

/**
 * Get the fee amount that will be charged when trying to deposit the
 * specified amount using the selected coins and the wire method.
*/
/**
 * Break down the fees of a deposit made with the given coins.
 *
 * - `coin`: sum of the deposit fees of the coins' denominations.
 * - `wire`: sum of the currently valid wire fee for `wireType` of every
 *   distinct exchange involved (exchanges without such a fee add nothing).
 * - `refresh`: sum of the refresh costs for what remains of each coin
 *   (denomination value minus contribution).
 *
 * @param wex wallet execution context used for database access
 * @param wireType wire method of the deposit target
 * @param total deposit amount; only its currency is used, as the currency
 *   of the zero value reported for an empty fee category
 * @param pcs coins (with their contributions) selected for the deposit
 * @returns the three fee sums as amount strings
 * @throws Error if the denomination of one of the coins cannot be found
 */
async function getTotalFeesForDepositAmount(
  wex: WalletExecutionContext,
  wireType: string,
  total: AmountJson,
  pcs: SelectedProspectiveCoin[],
): Promise<DepositGroupFees> {
  const wireFee: AmountJson[] = [];
  const coinFee: AmountJson[] = [];
  const refreshFee: AmountJson[] = [];
  const exchangeSet: Set<string> = new Set();

  await wex.db.runReadOnlyTx(
    { storeNames: ["coins", "denominations", "exchanges", "exchangeDetails"] },
    async (tx) => {
      for (const coin of pcs) {
        const denom = await getDenomInfo(
          wex,
          tx,
          coin.exchangeBaseUrl,
          coin.denomPubHash,
        );
        if (!denom) {
          throw Error("can't find denomination to calculate deposit amount");
        }
        coinFee.push(Amounts.parseOrThrow(denom.feeDeposit));
        exchangeSet.add(coin.exchangeBaseUrl);

        // Whatever is not contributed stays on the coin and is refreshed.
        const amountLeft = Amounts.sub(denom.value, coin.contribution).amount;
        const refreshCost = await getTotalRefreshCost(
          wex,
          tx,
          denom,
          amountLeft,
        );
        refreshFee.push(refreshCost);
      }

      // Compare every fee entry against the same instant.
      const now = AbsoluteTime.now();

      for (const exchangeUrl of exchangeSet.values()) {
        const exchangeDetails = await getExchangeWireDetailsInTx(
          tx,
          exchangeUrl,
        );
        if (!exchangeDetails) {
          continue;
        }
        const fee = exchangeDetails.wireInfo.feesForType[wireType]?.find(
          (x) =>
            AbsoluteTime.isBetween(
              now,
              AbsoluteTime.fromProtocolTimestamp(x.startStamp),
              AbsoluteTime.fromProtocolTimestamp(x.endStamp),
            ),
        )?.wireFee;
        if (fee) {
          wireFee.push(Amounts.parseOrThrow(fee));
        }
      }
    },
  );

  return {
    coin: Amounts.stringify(Amounts.sumOrZero(total.currency, coinFee).amount),
    wire: Amounts.stringify(Amounts.sumOrZero(total.currency, wireFee).amount),
    refresh: Amounts.stringify(
      Amounts.sumOrZero(total.currency, refreshFee).amount,
    ),
  };
}