/*
 This file is part of GNU Taler
 (C) 2021-2023 Taler Systems S.A.

 GNU Taler is free software; you can redistribute it and/or modify it under the
 terms of the GNU General Public License as published by the Free Software
 Foundation; either version 3, or (at your option) any later version.

 GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
 A PARTICULAR PURPOSE.  See the GNU General Public License for more details.

 You should have received a copy of the GNU General Public License along with
 GNU Taler; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
 */

/**
 * Implementation of the deposit transaction.
 */

/**
 * Imports.
 */
import {
  AbsoluteTime,
  AmountJson,
  Amounts,
  BatchDepositRequestCoin,
  CancellationToken,
  CoinRefreshRequest,
  CreateDepositGroupRequest,
  CreateDepositGroupResponse,
  DepositGroupFees,
  Duration,
  ExchangeBatchDepositRequest,
  ExchangeRefundRequest,
  HttpStatusCode,
  Logger,
  MerchantContractTerms,
  NotificationType,
  PayCoinSelection,
  PrepareDepositRequest,
  PrepareDepositResponse,
  RefreshReason,
  TalerError,
  TalerErrorCode,
  TalerPreciseTimestamp,
  TalerProtocolTimestamp,
  TrackTransaction,
  TransactionAction,
  TransactionIdStr,
  TransactionMajorState,
  TransactionMinorState,
  TransactionState,
  TransactionType,
  URL,
  WireFee,
  assertUnreachable,
  canonicalJson,
  checkDbInvariant,
  checkLogicInvariant,
  codecForBatchDepositSuccess,
  codecForTackTransactionAccepted,
  codecForTackTransactionWired,
  durationFromSpec,
  encodeCrock,
  getRandomBytes,
  hashTruncate32,
  hashWire,
  j2s,
  parsePaytoUri,
  stringToBytes,
} from "@gnu-taler/taler-util";
import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http";
import { selectPayCoinsNew } from "./coinSelection.js";
import {
  PendingTaskType,
  TaskIdStr,
  TaskRunResult,
  TombstoneTag,
  TransactionContext,
  constructTaskIdentifier,
  spendCoins,
} from "./common.js";
import {
  DepositElementStatus,
  DepositGroupRecord,
  DepositInfoPerExchange,
  DepositOperationStatus,
  DepositTrackingInfo,
  KycPendingInfo,
  RefreshOperationStatus,
  timestampPreciseToDb,
  timestampProtocolToDb,
} from "./db.js";
import { getExchangeWireDetailsInTx } from "./exchanges.js";
import {
  extractContractData,
  generateDepositPermissions,
  getTotalPaymentCost,
} from "./pay-merchant.js";
import {
  CreateRefreshGroupResult,
  createRefreshGroup,
  getTotalRefreshCost,
} from "./refresh.js";
import {
  constructTransactionIdentifier,
  notifyTransition,
  parseTransactionIdentifier,
} from "./transactions.js";
import {
  InternalWalletState,
  WalletExecutionContext,
  getDenomInfo,
} from "./wallet.js";
import { getCandidateWithdrawalDenomsTx } from "./withdraw.js";

/**
 * Logger.
 */
const logger = new Logger("deposits.ts");

/**
 * Transaction context for a single deposit group.
 *
 * Bundles the transaction identifier and the task identifier derived
 * from the deposit group ID and implements the user-triggered
 * state transitions (delete, suspend, abort, resume, fail).
 */
export class DepositTransactionContext implements TransactionContext {
  readonly transactionId: TransactionIdStr;
  readonly taskId: TaskIdStr;

  constructor(
    public wex: WalletExecutionContext,
    public depositGroupId: string,
  ) {
    this.transactionId = constructTransactionIdentifier({
      tag: TransactionType.Deposit,
      depositGroupId,
    });
    this.taskId = constructTaskIdentifier({
      tag: PendingTaskType.Deposit,
      depositGroupId,
    });
  }

  /**
   * Delete the deposit group record (if it exists) and leave a
   * tombstone for it in the same database transaction.
   */
  async deleteTransaction(): Promise<void> {
    const depositGroupId = this.depositGroupId;
    const ws = this.wex;
    // FIXME: We should check first if we are in a final state
    // where deletion is allowed.
    await ws.db.runReadWriteTx(["depositGroups", "tombstones"], async (tx) => {
      const depositGroup = await tx.depositGroups.get(depositGroupId);
      if (depositGroup) {
        await tx.depositGroups.delete(depositGroupId);
        await tx.tombstones.put({
          id: TombstoneTag.DeleteDepositGroup + ":" + depositGroupId,
        });
      }
    });
    return;
  }

  /**
   * Move a pending or aborting deposit group into the corresponding
   * suspended status, then stop its task and notify about the transition.
   *
   * Statuses without a suspended counterpart are left untouched.
   */
  async suspendTransaction(): Promise<void> {
    const { wex, depositGroupId, transactionId, taskId: retryTag } = this;
    const transitionInfo = await wex.db.runReadWriteTx(
      ["depositGroups"],
      async (tx) => {
        const dg = await tx.depositGroups.get(depositGroupId);
        if (!dg) {
          logger.warn(
            `can't suspend deposit group, depositGroupId=${depositGroupId} not found`,
          );
          return undefined;
        }
        const oldState = computeDepositTransactionStatus(dg);
        let newOpStatus: DepositOperationStatus | undefined;
        switch (dg.operationStatus) {
          case DepositOperationStatus.PendingDeposit:
            newOpStatus = DepositOperationStatus.SuspendedDeposit;
            break;
          case DepositOperationStatus.PendingKyc:
            newOpStatus = DepositOperationStatus.SuspendedKyc;
            break;
          case DepositOperationStatus.PendingTrack:
            newOpStatus = DepositOperationStatus.SuspendedTrack;
            break;
          case DepositOperationStatus.Aborting:
            newOpStatus = DepositOperationStatus.SuspendedAborting;
            break;
        }
        if (!newOpStatus) {
          return undefined;
        }
        dg.operationStatus = newOpStatus;
        await tx.depositGroups.put(dg);
        return {
          oldTxState: oldState,
          newTxState: computeDepositTransactionStatus(dg),
        };
      },
    );
    wex.taskScheduler.stopShepherdTask(retryTag);
    notifyTransition(wex, transactionId, transitionInfo);
  }

  /**
   * Start aborting a deposit group that is in pending(deposit) or
   * suspended(deposit).  Other statuses are left untouched.
   *
   * The task is restarted afterwards and a balance change
   * notification is emitted.
   */
  async abortTransaction(): Promise<void> {
    const { wex, depositGroupId, transactionId, taskId: retryTag } = this;
    const transitionInfo = await wex.db.runReadWriteTx(
      ["depositGroups"],
      async (tx) => {
        const dg = await tx.depositGroups.get(depositGroupId);
        if (!dg) {
          logger.warn(
            `can't abort deposit group, depositGroupId=${depositGroupId} not found`,
          );
          return undefined;
        }
        const oldState = computeDepositTransactionStatus(dg);
        switch (dg.operationStatus) {
          case DepositOperationStatus.Finished:
            return undefined;
          case DepositOperationStatus.PendingDeposit:
          case DepositOperationStatus.SuspendedDeposit: {
            dg.operationStatus = DepositOperationStatus.Aborting;
            await tx.depositGroups.put(dg);
            return {
              oldTxState: oldState,
              newTxState: computeDepositTransactionStatus(dg),
            };
          }
        }
        return undefined;
      },
    );
    wex.taskScheduler.stopShepherdTask(retryTag);
    notifyTransition(wex, transactionId, transitionInfo);
    wex.taskScheduler.startShepherdTask(retryTag);
    wex.ws.notify({
      type: NotificationType.BalanceChange,
      hintTransactionId: transactionId,
    });
  }

  /**
   * Move a suspended deposit group back into the corresponding
   * pending/aborting status and (re-)start its task.
   */
  async resumeTransaction(): Promise<void> {
    const { wex, depositGroupId, transactionId, taskId: retryTag } = this;
    const transitionInfo = await wex.db.runReadWriteTx(
      ["depositGroups"],
      async (tx) => {
        const dg = await tx.depositGroups.get(depositGroupId);
        if (!dg) {
          logger.warn(
            `can't resume deposit group, depositGroupId=${depositGroupId} not found`,
          );
          return;
        }
        const oldState = computeDepositTransactionStatus(dg);
        let newOpStatus: DepositOperationStatus | undefined;
        switch (dg.operationStatus) {
          case DepositOperationStatus.SuspendedDeposit:
            newOpStatus = DepositOperationStatus.PendingDeposit;
            break;
          case DepositOperationStatus.SuspendedAborting:
            newOpStatus = DepositOperationStatus.Aborting;
            break;
          case DepositOperationStatus.SuspendedKyc:
            newOpStatus = DepositOperationStatus.PendingKyc;
            break;
          case DepositOperationStatus.SuspendedTrack:
            newOpStatus = DepositOperationStatus.PendingTrack;
            break;
        }
        if (!newOpStatus) {
          return undefined;
        }
        dg.operationStatus = newOpStatus;
        await tx.depositGroups.put(dg);
        return {
          oldTxState: oldState,
          newTxState: computeDepositTransactionStatus(dg),
        };
      },
    );
    notifyTransition(wex, transactionId, transitionInfo);
    wex.taskScheduler.startShepherdTask(retryTag);
  }

  /**
   * Give up on aborting: move a deposit group from aborting or
   * suspended(aborting) to failed, stop its task and emit a
   * balance change notification.
   */
  async failTransaction(): Promise<void> {
    const { wex, depositGroupId, transactionId, taskId } = this;
    const transitionInfo = await wex.db.runReadWriteTx(
      ["depositGroups"],
      async (tx) => {
        const dg = await tx.depositGroups.get(depositGroupId);
        if (!dg) {
          logger.warn(
            `can't cancel aborting deposit group, depositGroupId=${depositGroupId} not found`,
          );
          return undefined;
        }
        const oldState = computeDepositTransactionStatus(dg);
        switch (dg.operationStatus) {
          case DepositOperationStatus.SuspendedAborting:
          case DepositOperationStatus.Aborting: {
            dg.operationStatus = DepositOperationStatus.Failed;
            await tx.depositGroups.put(dg);
            return {
              oldTxState: oldState,
              newTxState: computeDepositTransactionStatus(dg),
            };
          }
        }
        return undefined;
      },
    );
    wex.taskScheduler.stopShepherdTask(taskId);
    notifyTransition(wex, transactionId, transitionInfo);
    wex.ws.notify({
      type: NotificationType.BalanceChange,
      hintTransactionId: transactionId,
    });
  }
}

/**
 * Get the (DD37-style) transaction status based on the
 * database record of a deposit group.
 */
export function computeDepositTransactionStatus(
  dg: DepositGroupRecord,
): TransactionState {
  switch (dg.operationStatus) {
    case DepositOperationStatus.Finished:
      return {
        major: TransactionMajorState.Done,
      };
    case DepositOperationStatus.PendingDeposit:
      return {
        major: TransactionMajorState.Pending,
        minor: TransactionMinorState.Deposit,
      };
    case DepositOperationStatus.PendingKyc:
      return {
        major: TransactionMajorState.Pending,
        minor: TransactionMinorState.KycRequired,
      };
    case DepositOperationStatus.PendingTrack:
      return {
        major: TransactionMajorState.Pending,
        minor: TransactionMinorState.Track,
      };
    case DepositOperationStatus.SuspendedKyc:
      return {
        major: TransactionMajorState.Suspended,
        minor: TransactionMinorState.KycRequired,
      };
    case DepositOperationStatus.SuspendedTrack:
      return {
        major: TransactionMajorState.Suspended,
        minor: TransactionMinorState.Track,
      };
    case DepositOperationStatus.SuspendedDeposit:
      return {
        major: TransactionMajorState.Suspended,
      };
    case DepositOperationStatus.Aborting:
      return {
        major: TransactionMajorState.Aborting,
      };
    case DepositOperationStatus.Aborted:
      return {
        major: TransactionMajorState.Aborted,
      };
    case DepositOperationStatus.Failed:
      return {
        major: TransactionMajorState.Failed,
      };
    case DepositOperationStatus.SuspendedAborting:
      return {
        major: TransactionMajorState.SuspendedAborting,
      };
    default:
      assertUnreachable(dg.operationStatus);
  }
}

/**
 * Compute the actions that are possible on a deposit transaction
 * based on the current transaction state.
 */
export function computeDepositTransactionActions(
  dg: DepositGroupRecord,
): TransactionAction[] {
  switch (dg.operationStatus) {
    case DepositOperationStatus.Finished:
      return [TransactionAction.Delete];
    case DepositOperationStatus.PendingDeposit:
      return [TransactionAction.Suspend, TransactionAction.Abort];
    case DepositOperationStatus.SuspendedDeposit:
      return [TransactionAction.Resume];
    case DepositOperationStatus.Aborting:
      return [TransactionAction.Fail, TransactionAction.Suspend];
    case DepositOperationStatus.Aborted:
      return [TransactionAction.Delete];
    case DepositOperationStatus.Failed:
      return [TransactionAction.Delete];
    case DepositOperationStatus.SuspendedAborting:
      return [TransactionAction.Resume, TransactionAction.Fail];
    case DepositOperationStatus.PendingKyc:
      return [TransactionAction.Suspend, TransactionAction.Fail];
    case DepositOperationStatus.PendingTrack:
      return [TransactionAction.Suspend, TransactionAction.Abort];
    case DepositOperationStatus.SuspendedKyc:
      return [TransactionAction.Resume, TransactionAction.Fail];
    case DepositOperationStatus.SuspendedTrack:
      return [TransactionAction.Resume, TransactionAction.Abort];
    default:
      assertUnreachable(dg.operationStatus);
  }
}

/**
 * Request a refund from the exchange for every coin of the deposit group
 * that does not yet have a refund result, record the per-coin outcome,
 * and, once every coin has a refund result, create the refresh group
 * that is stored as the deposit group's abort refresh group.
 *
 * Always returns a backoff result; the follow-up work (waiting for the
 * refresh) happens in a later task run.
 */
async function refundDepositGroup(
  wex: WalletExecutionContext,
  depositGroup: DepositGroupRecord,
): Promise<TaskRunResult> {
  const newTxPerCoin = [...depositGroup.statusPerCoin];
  logger.info(`status per coin: ${j2s(depositGroup.statusPerCoin)}`);
  for (let i = 0; i < depositGroup.statusPerCoin.length; i++) {
    const st = depositGroup.statusPerCoin[i];
    switch (st) {
      case DepositElementStatus.RefundFailed:
      case DepositElementStatus.RefundSuccess:
        // Refund already attempted for this coin.
        break;
      default: {
        const coinPub = depositGroup.payCoinSelection.coinPubs[i];
        const coinExchange = await wex.db.runReadOnlyTx(
          ["coins"],
          async (tx) => {
            const coinRecord = await tx.coins.get(coinPub);
            checkDbInvariant(!!coinRecord);
            return coinRecord.exchangeBaseUrl;
          },
        );
        const refundAmount = depositGroup.payCoinSelection.coinContributions[i];
        // We use a constant refund transaction ID, since there can
        // only be one refund.
        const rtid = 1;
        const sig = await wex.cryptoApi.signRefund({
          coinPub,
          contractTermsHash: depositGroup.contractTermsHash,
          merchantPriv: depositGroup.merchantPriv,
          merchantPub: depositGroup.merchantPub,
          refundAmount: refundAmount,
          rtransactionId: rtid,
        });
        const refundReq: ExchangeRefundRequest = {
          h_contract_terms: depositGroup.contractTermsHash,
          merchant_pub: depositGroup.merchantPub,
          merchant_sig: sig.sig,
          refund_amount: refundAmount,
          rtransaction_id: rtid,
        };
        const refundUrl = new URL(`coins/${coinPub}/refund`, coinExchange);
        const httpResp = await wex.http.fetch(refundUrl.href, {
          method: "POST",
          body: refundReq,
          cancellationToken: wex.cancellationToken,
        });
        logger.info(
          `coin ${i} refund HTTP status for coin: ${httpResp.status}`,
        );
        let newStatus: DepositElementStatus;
        if (httpResp.status === 200) {
          // FIXME: validate response
          newStatus = DepositElementStatus.RefundSuccess;
        } else {
          // FIXME: Store problem somewhere!
          newStatus = DepositElementStatus.RefundFailed;
        }
        // FIXME: Handle case where refund request needs to be tried again
        newTxPerCoin[i] = newStatus;
        break;
      }
    }
  }
  let isDone = true;
  for (let i = 0; i < newTxPerCoin.length; i++) {
    if (
      newTxPerCoin[i] != DepositElementStatus.RefundFailed &&
      newTxPerCoin[i] != DepositElementStatus.RefundSuccess
    ) {
      isDone = false;
    }
  }
  const currency = Amounts.currencyOf(depositGroup.totalPayCost);
  const res = await wex.db.runReadWriteTx(
    [
      "depositGroups",
      "refreshGroups",
      "coins",
      "denominations",
      "coinAvailability",
    ],
    async (tx) => {
      const newDg = await tx.depositGroups.get(depositGroup.depositGroupId);
      if (!newDg) {
        return;
      }
      newDg.statusPerCoin = newTxPerCoin;
      const refreshCoins: CoinRefreshRequest[] = [];
      for (let i = 0; i < newTxPerCoin.length; i++) {
        refreshCoins.push({
          amount: depositGroup.payCoinSelection.coinContributions[i],
          coinPub: depositGroup.payCoinSelection.coinPubs[i],
        });
      }
      let refreshRes: CreateRefreshGroupResult | undefined = undefined;
      if (isDone) {
        refreshRes = await createRefreshGroup(
          wex,
          tx,
          currency,
          refreshCoins,
          RefreshReason.AbortDeposit,
          constructTransactionIdentifier({
            tag: TransactionType.Deposit,
            depositGroupId: newDg.depositGroupId,
          }),
        );
        newDg.abortRefreshGroupId = refreshRes.refreshGroupId;
      }
      await tx.depositGroups.put(newDg);
      return { refreshRes };
    },
  );

  if (res?.refreshRes) {
    for (const notif of res.refreshRes.notifications) {
      wex.ws.notify(notif);
    }
  }

  return TaskRunResult.backoff();
}

/**
 * Check whether the refresh associated with the
 * aborting deposit group is done.
 *
 * If done, mark the deposit transaction as aborted.
 *
 * Otherwise continue waiting.
 *
 * FIXME:  Wait for the refresh group notifications instead of periodically
 * checking the refresh group status.
 * FIXME: This is just one transaction, can't we do this in the initial
 * transaction of processDepositGroup?
 */
async function waitForRefreshOnDepositGroup(
  wex: WalletExecutionContext,
  depositGroup: DepositGroupRecord,
): Promise<TaskRunResult> {
  const abortRefreshGroupId = depositGroup.abortRefreshGroupId;
  checkLogicInvariant(!!abortRefreshGroupId);
  const transactionId = constructTransactionIdentifier({
    tag: TransactionType.Deposit,
    depositGroupId: depositGroup.depositGroupId,
  });
  const transitionInfo = await wex.db.runReadWriteTx(
    ["depositGroups", "refreshGroups"],
    async (tx) => {
      const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId);
      let newOpState: DepositOperationStatus | undefined;
      if (!refreshGroup) {
        // Maybe it got manually deleted? Means that we should
        // just go into aborted.
        logger.warn("no aborting refresh group found for deposit group");
        newOpState = DepositOperationStatus.Aborted;
      } else {
        if (refreshGroup.operationStatus === RefreshOperationStatus.Finished) {
          newOpState = DepositOperationStatus.Aborted;
        } else if (
          refreshGroup.operationStatus === RefreshOperationStatus.Failed
        ) {
          newOpState = DepositOperationStatus.Aborted;
        }
      }
      if (newOpState) {
        const newDg = await tx.depositGroups.get(depositGroup.depositGroupId);
        if (!newDg) {
          return;
        }
        const oldTxState = computeDepositTransactionStatus(newDg);
        newDg.operationStatus = newOpState;
        const newTxState = computeDepositTransactionStatus(newDg);
        await tx.depositGroups.put(newDg);
        return { oldTxState, newTxState };
      }
      return undefined;
    },
  );

  notifyTransition(wex, transactionId, transitionInfo);
  wex.ws.notify({
    type: NotificationType.BalanceChange,
    hintTransactionId: transactionId,
  });
  return TaskRunResult.backoff();
}

/**
 * Process a deposit group in the aborting state.
 *
 * As long as no abort refresh group has been recorded, the coins are
 * refunded; afterwards we wait for the refresh group.
 */
async function processDepositGroupAborting(
  wex: WalletExecutionContext,
  depositGroup: DepositGroupRecord,
): Promise<TaskRunResult> {
  logger.info("processing deposit tx in 'aborting'");
  const abortRefreshGroupId = depositGroup.abortRefreshGroupId;
  if (!abortRefreshGroupId) {
    logger.info("refunding deposit group");
    return refundDepositGroup(wex, depositGroup);
  }
  logger.info("waiting for refresh");
  return waitForRefreshOnDepositGroup(wex, depositGroup);
}

/**
 * Process a deposit group in the pending(kyc) state.
 *
 * Queries the exchange's kyc-check endpoint; on 200 (or 204) the deposit
 * group moves on to pending(track), on 202 nothing changes.
 *
 * @throws Error if the record has no kycInfo or the exchange answers
 *   with an unexpected HTTP status.
 */
async function processDepositGroupPendingKyc(
  wex: WalletExecutionContext,
  depositGroup: DepositGroupRecord,
): Promise<TaskRunResult> {
  const { depositGroupId } = depositGroup;
  const transactionId = constructTransactionIdentifier({
    tag: TransactionType.Deposit,
    depositGroupId,
  });

  const kycInfo = depositGroup.kycInfo;
  const userType = "individual";

  if (!kycInfo) {
    throw Error("invalid DB state, in pending(kyc), but no kycInfo present");
  }

  const url = new URL(
    `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`,
    kycInfo.exchangeBaseUrl,
  );
  url.searchParams.set("timeout_ms", "10000");
  logger.info(`kyc url ${url.href}`);
  const kycStatusRes = await wex.http.fetch(url.href, {
    method: "GET",
    cancellationToken: wex.cancellationToken,
  });
  if (
    kycStatusRes.status === HttpStatusCode.Ok ||
    //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
    // remove after the exchange is fixed or clarified
    kycStatusRes.status === HttpStatusCode.NoContent
  ) {
    const transitionInfo = await wex.db.runReadWriteTx(
      ["depositGroups"],
      async (tx) => {
        const newDg = await tx.depositGroups.get(depositGroupId);
        if (!newDg) {
          return;
        }
        if (newDg.operationStatus !== DepositOperationStatus.PendingKyc) {
          return;
        }
        const oldTxState = computeDepositTransactionStatus(newDg);
        newDg.operationStatus = DepositOperationStatus.PendingTrack;
        const newTxState = computeDepositTransactionStatus(newDg);
        await tx.depositGroups.put(newDg);
        return { oldTxState, newTxState };
      },
    );
    notifyTransition(wex, transactionId, transitionInfo);
  } else if (kycStatusRes.status === HttpStatusCode.Accepted) {
    // FIXME: Do we have to update the URL here?
  } else {
    throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`);
  }
  return TaskRunResult.backoff();
}

/**
 * Tracking information from the exchange indicated that
 * KYC is required.  We need to check the KYC info
 * and transition the transaction to the KYC required state.
 *
 * NOTE(review): the database transaction below stores kycInfo but does
 * not change operationStatus, so the record stays in pending(track).
 * Confirm whether it should move to DepositOperationStatus.PendingKyc.
 *
 * @throws Error if the exchange answers with an unexpected HTTP status.
 */
async function transitionToKycRequired(
  wex: WalletExecutionContext,
  depositGroup: DepositGroupRecord,
  kycInfo: KycPendingInfo,
  exchangeUrl: string,
): Promise<TaskRunResult> {
  const { depositGroupId } = depositGroup;
  const userType = "individual";

  const transactionId = constructTransactionIdentifier({
    tag: TransactionType.Deposit,
    depositGroupId,
  });

  const url = new URL(
    `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`,
    exchangeUrl,
  );
  logger.info(`kyc url ${url.href}`);
  // Pass the cancellation token like every other exchange request here.
  const kycStatusReq = await wex.http.fetch(url.href, {
    method: "GET",
    cancellationToken: wex.cancellationToken,
  });
  if (kycStatusReq.status === HttpStatusCode.Ok) {
    logger.warn("kyc requested, but already fulfilled");
    return TaskRunResult.backoff();
  } else if (kycStatusReq.status === HttpStatusCode.Accepted) {
    const kycStatus = await kycStatusReq.json();
    logger.info(`kyc status: ${j2s(kycStatus)}`);
    const transitionInfo = await wex.db.runReadWriteTx(
      ["depositGroups"],
      async (tx) => {
        const dg = await tx.depositGroups.get(depositGroupId);
        if (!dg) {
          return undefined;
        }
        if (dg.operationStatus !== DepositOperationStatus.PendingTrack) {
          return undefined;
        }
        const oldTxState = computeDepositTransactionStatus(dg);
        dg.kycInfo = {
          exchangeBaseUrl: exchangeUrl,
          kycUrl: kycStatus.kyc_url,
          paytoHash: kycInfo.paytoHash,
          requirementRow: kycInfo.requirementRow,
        };
        await tx.depositGroups.put(dg);
        const newTxState = computeDepositTransactionStatus(dg);
        return { oldTxState, newTxState };
      },
    );
    notifyTransition(wex, transactionId, transitionInfo);
    return TaskRunResult.finished();
  } else {
    throw Error(`unexpected response from kyc-check (${kycStatusReq.status})`);
  }
}

/**
 * Process a deposit group in the pending(track) state.
 *
 * Asks the exchange for the tracking status of every coin that is not
 * yet wired, stores the per-coin result, and finishes the deposit group
 * once all coins are wired.  If the exchange reports a KYC requirement,
 * processing is handed over to transitionToKycRequired.
 */
async function processDepositGroupPendingTrack(
  wex: WalletExecutionContext,
  depositGroup: DepositGroupRecord,
): Promise<TaskRunResult> {
  const { depositGroupId } = depositGroup;
  for (let i = 0; i < depositGroup.statusPerCoin.length; i++) {
    const coinPub = depositGroup.payCoinSelection.coinPubs[i];
    // FIXME: Make the URL part of the coin selection?
    // Pure lookup, so a read-only transaction is sufficient.
    const exchangeBaseUrl = await wex.db.runReadOnlyTx(
      ["coins"],
      async (tx) => {
        const coinRecord = await tx.coins.get(coinPub);
        checkDbInvariant(!!coinRecord);
        return coinRecord.exchangeBaseUrl;
      },
    );

    let updatedTxStatus: DepositElementStatus | undefined = undefined;
    let newWiredCoin:
      | {
          id: string;
          value: DepositTrackingInfo;
        }
      | undefined;

    if (depositGroup.statusPerCoin[i] !== DepositElementStatus.Wired) {
      const track = await trackDeposit(
        wex,
        depositGroup,
        coinPub,
        exchangeBaseUrl,
      );

      if (track.type === "accepted") {
        if (!track.kyc_ok && track.requirement_row !== undefined) {
          const paytoHash = encodeCrock(
            hashTruncate32(stringToBytes(depositGroup.wire.payto_uri + "\0")),
          );
          const { requirement_row: requirementRow } = track;
          const kycInfo: KycPendingInfo = {
            paytoHash,
            requirementRow,
          };
          return transitionToKycRequired(
            wex,
            depositGroup,
            kycInfo,
            exchangeBaseUrl,
          );
        } else {
          updatedTxStatus = DepositElementStatus.Tracking;
        }
      } else if (track.type === "wired") {
        updatedTxStatus = DepositElementStatus.Wired;

        const payto = parsePaytoUri(depositGroup.wire.payto_uri);
        if (!payto) {
          throw Error(`unparsable payto: ${depositGroup.wire.payto_uri}`);
        }

        const fee = await getExchangeWireFee(
          wex,
          payto.targetType,
          exchangeBaseUrl,
          track.execution_time,
        );
        const raw = Amounts.parseOrThrow(track.coin_contribution);
        const wireFee = Amounts.parseOrThrow(fee.wireFee);

        newWiredCoin = {
          value: {
            amountRaw: Amounts.stringify(raw),
            wireFee: Amounts.stringify(wireFee),
            exchangePub: track.exchange_pub,
            timestampExecuted: timestampProtocolToDb(track.execution_time),
            wireTransferId: track.wtid,
          },
          id: track.exchange_sig,
        };
      } else {
        updatedTxStatus = DepositElementStatus.DepositPending;
      }
    }

    if (updatedTxStatus !== undefined) {
      await wex.db.runReadWriteTx(["depositGroups"], async (tx) => {
        const dg = await tx.depositGroups.get(depositGroupId);
        if (!dg) {
          return;
        }
        if (updatedTxStatus !== undefined) {
          dg.statusPerCoin[i] = updatedTxStatus;
        }
        if (newWiredCoin) {
          /**
           * FIXME: if there is a new wire information from the exchange
           * it should add up to the previous tracking states.
           *
           * This may lose information by overriding prev state.
           *
           * And: add checks to integration tests
           */
          if (!dg.trackingState) {
            dg.trackingState = {};
          }

          dg.trackingState[newWiredCoin.id] = newWiredCoin.value;
        }
        await tx.depositGroups.put(dg);
      });
    }
  }

  let allWired = true;

  const transitionInfo = await wex.db.runReadWriteTx(
    ["depositGroups"],
    async (tx) => {
      const dg = await tx.depositGroups.get(depositGroupId);
      if (!dg) {
        return undefined;
      }
      const oldTxState = computeDepositTransactionStatus(dg);
      // Check the record just loaded from the database: the per-coin
      // statuses written in the loop above are not reflected in the
      // depositGroup argument, which was read before tracking started.
      for (let i = 0; i < dg.statusPerCoin.length; i++) {
        if (dg.statusPerCoin[i] !== DepositElementStatus.Wired) {
          allWired = false;
          break;
        }
      }
      if (allWired) {
        dg.timestampFinished = timestampPreciseToDb(
          TalerPreciseTimestamp.now(),
        );
        dg.operationStatus = DepositOperationStatus.Finished;
        await tx.depositGroups.put(dg);
      }
      const newTxState = computeDepositTransactionStatus(dg);
      return { oldTxState, newTxState };
    },
  );
  const transactionId = constructTransactionIdentifier({
    tag: TransactionType.Deposit,
    depositGroupId,
  });
  notifyTransition(wex, transactionId, transitionInfo);
  if (allWired) {
    wex.ws.notify({
      type: NotificationType.BalanceChange,
      hintTransactionId: transactionId,
    });
    return TaskRunResult.finished();
  } else {
    // Backing off until deposit long-polling works in the exchange.
    return TaskRunResult.backoff();
    // return TaskRunResult.longpollReturnedPending();
  }
}

/**
 * Process a deposit group in the pending(deposit) state.
 *
 * Sends one batch-deposit request per involved exchange, marks the
 * deposited coins as being tracked and finally moves the deposit group
 * to pending(track).
 *
 * @param cancellationToken token used for the cancellation checks and
 *   the HTTP requests; defaults to the execution context's token.
 * @throws Error if the contract terms are missing from the database.
 */
async function processDepositGroupPendingDeposit(
  wex: WalletExecutionContext,
  depositGroup: DepositGroupRecord,
  cancellationToken?: CancellationToken,
): Promise<TaskRunResult> {
  logger.info("processing deposit group in pending(deposit)");
  const depositGroupId = depositGroup.depositGroupId;
  // Callers in this file do not pass a token, so fall back to the one of
  // the execution context to keep the requests cancellable.
  const effectiveToken = cancellationToken ?? wex.cancellationToken;
  const contractTermsRec = await wex.db.runReadOnlyTx(
    ["contractTerms"],
    async (tx) => {
      return tx.contractTerms.get(depositGroup.contractTermsHash);
    },
  );
  if (!contractTermsRec) {
    throw Error("contract terms for deposit not found in database");
  }
  const contractTerms: MerchantContractTerms =
    contractTermsRec.contractTermsRaw;
  const contractData = extractContractData(
    contractTermsRec.contractTermsRaw,
    depositGroup.contractTermsHash,
    "",
  );

  const transactionId = constructTransactionIdentifier({
    tag: TransactionType.Deposit,
    depositGroupId,
  });

  // Check for cancellation before expensive operations.
  effectiveToken?.throwIfCancelled();

  // FIXME: Cache these!
  const depositPermissions = await generateDepositPermissions(
    wex,
    depositGroup.payCoinSelection,
    contractData,
  );

  // Exchanges involved in the deposit
  const exchanges: Set<string> = new Set();

  for (const dp of depositPermissions) {
    exchanges.add(dp.exchange_url);
  }

  // We need to do one batch per exchange.
  for (const exchangeUrl of exchanges.values()) {
    const coins: BatchDepositRequestCoin[] = [];
    const batchIndexes: number[] = [];

    const batchReq: ExchangeBatchDepositRequest = {
      coins,
      h_contract_terms: depositGroup.contractTermsHash,
      merchant_payto_uri: depositGroup.wire.payto_uri,
      merchant_pub: contractTerms.merchant_pub,
      timestamp: contractTerms.timestamp,
      wire_salt: depositGroup.wire.salt,
      wire_transfer_deadline: contractTerms.wire_transfer_deadline,
      refund_deadline: contractTerms.refund_deadline,
    };

    for (let i = 0; i < depositPermissions.length; i++) {
      const perm = depositPermissions[i];
      if (perm.exchange_url != exchangeUrl) {
        continue;
      }
      coins.push({
        coin_pub: perm.coin_pub,
        coin_sig: perm.coin_sig,
        contribution: Amounts.stringify(perm.contribution),
        denom_pub_hash: perm.h_denom,
        ub_sig: perm.ub_sig,
        h_age_commitment: perm.h_age_commitment,
      });
      batchIndexes.push(i);
    }

    // Check for cancellation before making network request.
    effectiveToken?.throwIfCancelled();
    const url = new URL(`batch-deposit`, exchangeUrl);
    logger.info(`depositing to ${url.href}`);
    logger.trace(`deposit request: ${j2s(batchReq)}`);
    const httpResp = await wex.http.fetch(url.href, {
      method: "POST",
      body: batchReq,
      cancellationToken: effectiveToken,
    });
    await readSuccessResponseJsonOrThrow(
      httpResp,
      codecForBatchDepositSuccess(),
    );

    await wex.db.runReadWriteTx(["depositGroups"], async (tx) => {
      const dg = await tx.depositGroups.get(depositGroupId);
      if (!dg) {
        return;
      }
      let changed = false;
      for (const batchIndex of batchIndexes) {
        if (
          dg.statusPerCoin[batchIndex] === DepositElementStatus.DepositPending
        ) {
          dg.statusPerCoin[batchIndex] = DepositElementStatus.Tracking;
          changed = true;
        }
      }
      // Write the record once per batch instead of once per coin.
      if (changed) {
        await tx.depositGroups.put(dg);
      }
    });
  }

  const transitionInfo = await wex.db.runReadWriteTx(
    ["depositGroups"],
    async (tx) => {
      const dg = await tx.depositGroups.get(depositGroupId);
      if (!dg) {
        return undefined;
      }
      const oldTxState = computeDepositTransactionStatus(dg);
      dg.operationStatus = DepositOperationStatus.PendingTrack;
      await tx.depositGroups.put(dg);
      const newTxState = computeDepositTransactionStatus(dg);
      return { oldTxState, newTxState };
    },
  );

  notifyTransition(wex, transactionId, transitionInfo);
  return TaskRunResult.progress();
}

/**
 * Process a deposit group that is not in its final state yet.
 */
export async function processDepositGroup(
  wex: WalletExecutionContext,
  depositGroupId: string,
): Promise<TaskRunResult> {
  const depositGroup = await wex.db.runReadOnlyTx(
    ["depositGroups"],
    async (tx) => {
      return tx.depositGroups.get(depositGroupId);
    },
  );
  if (!depositGroup) {
    logger.warn(`deposit group ${depositGroupId} not found`);
    return TaskRunResult.finished();
  }

  switch (depositGroup.operationStatus) {
    case DepositOperationStatus.PendingTrack:
      return processDepositGroupPendingTrack(wex, depositGroup);
    case DepositOperationStatus.PendingKyc:
      return processDepositGroupPendingKyc(wex, depositGroup);
    case DepositOperationStatus.PendingDeposit:
      return processDepositGroupPendingDeposit(wex, depositGroup);
    case DepositOperationStatus.Aborting:
      return processDepositGroupAborting(wex, depositGroup);
  }

  // Nothing to do for any other status.
  return TaskRunResult.finished();
}

/**
 * Look up the wire fee that the given exchange charges for the given
 * wire type at the given point in time.
 *
 * FIXME: Consider moving this to exchanges.ts.
 *
 * @throws Error if the exchange (details) are not in the database or
 *   no fee entry covers the wire type and time.
 */
async function getExchangeWireFee(
  wex: WalletExecutionContext,
  wireType: string,
  baseUrl: string,
  time: TalerProtocolTimestamp,
): Promise<WireFee> {
  const exchangeDetails = await wex.db.runReadOnlyTx(
    ["exchangeDetails", "exchanges"],
    async (tx) => {
      const ex = await tx.exchanges.get(baseUrl);
      if (!ex || !ex.detailsPointer) return undefined;
      return await tx.exchangeDetails.indexes.byPointer.get([
        baseUrl,
        ex.detailsPointer.currency,
        ex.detailsPointer.masterPublicKey,
      ]);
    },
  );

  if (!exchangeDetails) {
    throw Error(`exchange missing: ${baseUrl}`);
  }

  const fees = exchangeDetails.wireInfo.feesForType[wireType];
  if (!fees || fees.length === 0) {
    throw Error(
      `exchange ${baseUrl} doesn't have fees for wire type ${wireType}`,
    );
  }
  const fee = fees.find((x) => {
    return AbsoluteTime.isBetween(
      AbsoluteTime.fromProtocolTimestamp(time),
      AbsoluteTime.fromProtocolTimestamp(x.startStamp),
      AbsoluteTime.fromProtocolTimestamp(x.endStamp),
    );
  });
  if (!fee) {
    throw Error(
      `exchange ${exchangeDetails.exchangeBaseUrl} doesn't have fees for wire type ${wireType} at ${time.t_s}`,
    );
  }

  return fee;
}

/**
 * Ask the exchange for the tracking status of one deposited coin.
 *
 * The request is signed with the deposit group's merchant key.  A 202
 * response yields an "accepted" result, a 200 response a "wired" result.
 *
 * @throws Error on any other HTTP status.
 */
async function trackDeposit(
  wex: WalletExecutionContext,
  depositGroup: DepositGroupRecord,
  coinPub: string,
  exchangeUrl: string,
): Promise<TrackTransaction> {
  const wireHash = hashWire(
    depositGroup.wire.payto_uri,
    depositGroup.wire.salt,
  );

  const url = new URL(
    `deposits/${wireHash}/${depositGroup.merchantPub}/${depositGroup.contractTermsHash}/${coinPub}`,
    exchangeUrl,
  );
  const sigResp = await wex.cryptoApi.signTrackTransaction({
    coinPub,
    contractTermsHash: depositGroup.contractTermsHash,
    merchantPriv: depositGroup.merchantPriv,
    merchantPub: depositGroup.merchantPub,
    wireHash,
  });
  url.searchParams.set("merchant_sig", sigResp.sig);
  // Not doing long-polling yet as it looks like it's broken in the exchange (2024-02-20)
  // url.searchParams.set("timeout_ms", "30000");
  const httpResp = await wex.http.fetch(url.href, {
    method: "GET",
    cancellationToken: wex.cancellationToken,
  });
  logger.trace(`deposits response status: ${httpResp.status}`);
  switch (httpResp.status) {
    case HttpStatusCode.Accepted: {
      const accepted = await readSuccessResponseJsonOrThrow(
        httpResp,
        codecForTackTransactionAccepted(),
      );
      return { type: "accepted", ...accepted };
    }
    case HttpStatusCode.Ok: {
      const wired = await readSuccessResponseJsonOrThrow(
        httpResp,
        codecForTackTransactionWired(),
      );
      return { type: "wired", ...wired };
    }
    default: {
      throw Error(
        `unexpected response from track-transaction (${httpResp.status})`,
      );
    }
  }
}

/**
 * Check if creating a deposit group is possible and calculate
 * the associated fees.
 *
 * FIXME: This should be renamed to checkDepositGroup,
 * as it doesn't prepare anything
 *
 * @throws Error if the payto URI cannot be parsed.
 * @throws TalerError (WALLET_DEPOSIT_GROUP_INSUFFICIENT_BALANCE) if coin
 *   selection does not succeed.
 */
export async function prepareDepositGroup(
  wex: WalletExecutionContext,
  req: PrepareDepositRequest,
): Promise<PrepareDepositResponse> {
  const p = parsePaytoUri(req.depositPaytoUri);
  if (!p) {
    throw Error("invalid payto URI");
  }
  const amount = Amounts.parseOrThrow(req.amount);

  const exchangeInfos: { url: string; master_pub: string }[] = [];

  await wex.db.runReadOnlyTx(["exchangeDetails", "exchanges"], async (tx) => {
    const allExchanges = await tx.exchanges.iter().toArray();
    for (const e of allExchanges) {
      const details = await getExchangeWireDetailsInTx(tx, e.baseUrl);
      if (!details || amount.currency !== details.currency) {
        continue;
      }
      exchangeInfos.push({
        master_pub: details.masterPublicKey,
        url: e.baseUrl,
      });
    }
  });

  const now = AbsoluteTime.now();
  const nowRounded = AbsoluteTime.toProtocolTimestamp(now);
  // Placeholder contract terms, only used for coin selection and
  // fee calculation.
  const contractTerms: MerchantContractTerms = {
    exchanges: exchangeInfos,
    amount: req.amount,
    max_fee: Amounts.stringify(amount),
    max_wire_fee: Amounts.stringify(amount),
    wire_method: p.targetType,
    timestamp: nowRounded,
    merchant_base_url: "",
    summary: "",
    nonce: "",
    wire_transfer_deadline: nowRounded,
    order_id: "",
    h_wire: "",
    pay_deadline: AbsoluteTime.toProtocolTimestamp(
      AbsoluteTime.addDuration(now, durationFromSpec({ hours: 1 })),
    ),
    merchant: {
      name: "(wallet)",
    },
    merchant_pub: "",
    refund_deadline: TalerProtocolTimestamp.zero(),
  };

  const { h: contractTermsHash } = await wex.cryptoApi.hashString({
    str: canonicalJson(contractTerms),
  });

  const contractData = extractContractData(
    contractTerms,
    contractTermsHash,
    "",
  );

  const payCoinSel = await selectPayCoinsNew(wex, {
    auditors: [],
    exchanges: contractData.allowedExchanges,
    wireMethod: contractData.wireMethod,
    contractTermsAmount: Amounts.parseOrThrow(contractData.amount),
    depositFeeLimit: Amounts.parseOrThrow(contractData.maxDepositFee),
    wireFeeAmortization: contractData.wireFeeAmortization ?? 1,
    wireFeeLimit: Amounts.parseOrThrow(contractData.maxWireFee),
    prevPayCoins: [],
  });

  if (payCoinSel.type !== "success") {
    throw TalerError.fromDetail(
      TalerErrorCode.WALLET_DEPOSIT_GROUP_INSUFFICIENT_BALANCE,
      {
        insufficientBalanceDetails: payCoinSel.insufficientBalanceDetails,
      },
    );
  }

  const totalDepositCost = await getTotalPaymentCost(wex, payCoinSel.coinSel);

  const effectiveDepositAmount = await getCounterpartyEffectiveDepositAmount(
    wex,
    p.targetType,
    payCoinSel.coinSel,
  );

  const fees = await getTotalFeesForDepositAmount(
    wex,
    p.targetType,
    amount,
    payCoinSel.coinSel,
  );

  return {
    totalDepositCost: Amounts.stringify(totalDepositCost),
    effectiveDepositAmount: Amounts.stringify(effectiveDepositAmount),
    fees,
  };
}

/**
 * Generate a transaction identifier for a new deposit group,
 * based on a fresh random deposit group ID.
 */
export function generateDepositGroupTxId(): string {
  const depositGroupId = encodeCrock(getRandomBytes(32));
  return constructTransactionIdentifier({
    tag: TransactionType.Deposit,
    depositGroupId: depositGroupId,
  });
}

export async function createDepositGroup(
  wex: WalletExecutionContext,
  req: CreateDepositGroupRequest,
): Promise<CreateDepositGroupResponse> {
  const p = parsePaytoUri(req.depositPaytoUri);
  if (!p) {
    throw Error("invalid payto URI");
  }

  const amount = Amounts.parseOrThrow(req.amount);

  const exchangeInfos: { url: string; master_pub: string }[] = [];

  await wex.db.runReadOnlyTx(["exchanges", "exchangeDetails"], async (tx) => {
    const allExchanges = await tx.exchanges.iter().toArray();
    for (const e of allExchanges) {
      const details = await getExchangeWireDetailsInTx(tx, e.baseUrl);
      if (!details || amount.currency !== details.currency) {
        continue;
      }
      exchangeInfos.push({
        master_pub: details.masterPublicKey,
        url: e.baseUrl,
      });
    }
  });

  const now = AbsoluteTime.now();
  const wireDeadline = AbsoluteTime.toProtocolTimestamp(
    AbsoluteTime.addDuration(now, Duration.fromSpec({ minutes: 5 })),
  );
  const nowRounded = AbsoluteTime.toProtocolTimestamp(now);
  const noncePair = await wex.cryptoApi.createEddsaKeypair({});
  const merchantPair = await wex.cryptoApi.createEddsaKeypair({});
  const wireSalt = encodeCrock(getRandomBytes(16));
  const wireHash = hashWire(req.depositPaytoUri, wireSalt);
  const contractTerms: MerchantContractTerms = {
    exchanges: exchangeInfos,
    amount: req.amount,
    max_fee: Amounts.stringify(amount),
    max_wire_fee: Amounts.stringify(amount),
    wire_method: p.targetType,
    timestamp: nowRounded,
    merchant_base_url: "",
    summary: "",
    nonce: noncePair.pub,
    wire_transfer_deadline: wireDeadline,
    order_id: "",
    h_wire: wireHash,
    pay_deadline: AbsoluteTime.toProtocolTimestamp(
      AbsoluteTime.addDuration(now, durationFromSpec({ hours: 1 })),
    ),
    merchant: {
      name: "(wallet)",
    },
    merchant_pub: merchantPair.pub,
    refund_deadline: TalerProtocolTimestamp.zero(),
  };

  const { h: contractTermsHash } = await wex.cryptoApi.hashString({
    str: canonicalJson(contractTerms),
  });

  const contractData = extractContractData(
    contractTerms,
    contractTermsHash,
    "",
  );

  const payCoinSel = await selectPayCoinsNew(wex, {
    auditors: [],
    exchanges: contractData.allowedExchanges,
    wireMethod: contractData.wireMethod,
    contractTermsAmount: Amounts.parseOrThrow(contractData.amount),
    depositFeeLimit: Amounts.parseOrThrow(contractData.maxDepositFee),
    wireFeeAmortization: contractData.wireFeeAmortization ??
1, wireFeeLimit: Amounts.parseOrThrow(contractData.maxWireFee), prevPayCoins: [], }); if (payCoinSel.type !== "success") { throw TalerError.fromDetail( TalerErrorCode.WALLET_DEPOSIT_GROUP_INSUFFICIENT_BALANCE, { insufficientBalanceDetails: payCoinSel.insufficientBalanceDetails, }, ); } const totalDepositCost = await getTotalPaymentCost(wex, payCoinSel.coinSel); let depositGroupId: string; if (req.transactionId) { const txId = parseTransactionIdentifier(req.transactionId); if (!txId || txId.tag !== TransactionType.Deposit) { throw Error("invalid transaction ID"); } depositGroupId = txId.depositGroupId; } else { depositGroupId = encodeCrock(getRandomBytes(32)); } const infoPerExchange: Record = {}; await wex.db.runReadOnlyTx(["coins"], async (tx) => { for (let i = 0; i < payCoinSel.coinSel.coinPubs.length; i++) { const coin = await tx.coins.get(payCoinSel.coinSel.coinPubs[i]); if (!coin) { logger.error("coin not found anymore"); continue; } let depPerExchange = infoPerExchange[coin.exchangeBaseUrl]; if (!depPerExchange) { infoPerExchange[coin.exchangeBaseUrl] = depPerExchange = { amountEffective: Amounts.stringify( Amounts.zeroOfAmount(totalDepositCost), ), }; } const contrib = payCoinSel.coinSel.coinContributions[i]; depPerExchange.amountEffective = Amounts.stringify( Amounts.add(depPerExchange.amountEffective, contrib).amount, ); } }); const counterpartyEffectiveDepositAmount = await getCounterpartyEffectiveDepositAmount( wex, p.targetType, payCoinSel.coinSel, ); const depositGroup: DepositGroupRecord = { contractTermsHash, depositGroupId, currency: Amounts.currencyOf(totalDepositCost), amount: contractData.amount, noncePriv: noncePair.priv, noncePub: noncePair.pub, timestampCreated: timestampPreciseToDb( AbsoluteTime.toPreciseTimestamp(now), ), timestampFinished: undefined, statusPerCoin: payCoinSel.coinSel.coinPubs.map( () => DepositElementStatus.DepositPending, ), payCoinSelection: payCoinSel.coinSel, payCoinSelectionUid: encodeCrock(getRandomBytes(32)), 
merchantPriv: merchantPair.priv, merchantPub: merchantPair.pub, totalPayCost: Amounts.stringify(totalDepositCost), counterpartyEffectiveDepositAmount: Amounts.stringify( counterpartyEffectiveDepositAmount, ), wireTransferDeadline: timestampProtocolToDb( contractTerms.wire_transfer_deadline, ), wire: { payto_uri: req.depositPaytoUri, salt: wireSalt, }, operationStatus: DepositOperationStatus.PendingDeposit, infoPerExchange, }; const ctx = new DepositTransactionContext(wex, depositGroupId); const transactionId = ctx.transactionId; const newTxState = await wex.db.runReadWriteTx( [ "depositGroups", "coins", "recoupGroups", "denominations", "refreshGroups", "coinAvailability", "contractTerms", ], async (tx) => { await spendCoins(wex, tx, { allocationId: transactionId, coinPubs: payCoinSel.coinSel.coinPubs, contributions: payCoinSel.coinSel.coinContributions.map((x) => Amounts.parseOrThrow(x), ), refreshReason: RefreshReason.PayDeposit, }); await tx.depositGroups.put(depositGroup); await tx.contractTerms.put({ contractTermsRaw: contractTerms, h: contractTermsHash, }); return computeDepositTransactionStatus(depositGroup); }, ); wex.ws.notify({ type: NotificationType.TransactionStateTransition, transactionId, oldTxState: { major: TransactionMajorState.None, }, newTxState, }); wex.ws.notify({ type: NotificationType.BalanceChange, hintTransactionId: transactionId, }); wex.taskScheduler.startShepherdTask(ctx.taskId); return { depositGroupId, transactionId, }; } /** * Get the amount that will be deposited on the users bank * account after depositing, not considering aggregation. 
*/
export async function getCounterpartyEffectiveDepositAmount(
  wex: WalletExecutionContext,
  wireType: string,
  pcs: PayCoinSelection,
): Promise<AmountJson> {
  // Result = (sum of all coin contributions)
  //        - (deposit fee of each coin's denomination)
  //        - (currently valid wire fee of each involved exchange, if any).
  //
  // Throws if a selected coin or its denomination is not in the database.
  const amt: AmountJson[] = [];
  const fees: AmountJson[] = [];
  const exchangeSet: Set<string> = new Set();

  await wex.db.runReadOnlyTx(
    ["coins", "denominations", "exchangeDetails", "exchanges"],
    async (tx) => {
      for (let i = 0; i < pcs.coinPubs.length; i++) {
        const coin = await tx.coins.get(pcs.coinPubs[i]);
        if (!coin) {
          throw Error("can't calculate deposit amount, coin not found");
        }
        const denom = await getDenomInfo(
          wex,
          tx,
          coin.exchangeBaseUrl,
          coin.denomPubHash,
        );
        if (!denom) {
          throw Error("can't find denomination to calculate deposit amount");
        }
        amt.push(Amounts.parseOrThrow(pcs.coinContributions[i]));
        fees.push(Amounts.parseOrThrow(denom.feeDeposit));
        exchangeSet.add(coin.exchangeBaseUrl);
      }

      // One reference time for all fee period checks below.
      const now = AbsoluteTime.now();

      for (const exchangeUrl of exchangeSet.values()) {
        const exchangeDetails = await getExchangeWireDetailsInTx(
          tx,
          exchangeUrl,
        );
        if (!exchangeDetails) {
          continue;
        }

        // The exchange might not list any fees for this wire type; in that
        // case (just like when no fee period matches) no wire fee is
        // subtracted, instead of failing with a TypeError on `undefined`.
        const fee = exchangeDetails.wireInfo.feesForType[wireType]?.find(
          (x) => {
            return AbsoluteTime.isBetween(
              now,
              AbsoluteTime.fromProtocolTimestamp(x.startStamp),
              AbsoluteTime.fromProtocolTimestamp(x.endStamp),
            );
          },
        )?.wireFee;
        if (fee) {
          fees.push(Amounts.parseOrThrow(fee));
        }
      }
    },
  );
  return Amounts.sub(Amounts.sum(amt).amount, Amounts.sum(fees).amount).amount;
}

/**
 * Get the fee amount that will be charged when trying to deposit the
 * specified amount using the selected coins and the wire method.
*/
async function getTotalFeesForDepositAmount(
  wex: WalletExecutionContext,
  wireType: string,
  total: AmountJson,
  pcs: PayCoinSelection,
): Promise<DepositGroupFees> {
  // The returned breakdown has three components, each summed up in the
  // currency of `total` (zero if there is nothing to sum):
  //  - coin:    deposit fee of every selected coin's denomination
  //  - wire:    currently valid wire fee of every involved exchange
  //  - refresh: cost of refreshing what remains on each coin after its
  //             contribution has been subtracted from the denomination value
  //
  // Throws if a selected coin or its denomination is not in the database.
  const wireFee: AmountJson[] = [];
  const coinFee: AmountJson[] = [];
  const refreshFee: AmountJson[] = [];
  const exchangeSet: Set<string> = new Set();
  const currency = Amounts.currencyOf(total);

  await wex.db.runReadOnlyTx(
    ["coins", "denominations", "exchanges", "exchangeDetails"],
    async (tx) => {
      for (let i = 0; i < pcs.coinPubs.length; i++) {
        const coin = await tx.coins.get(pcs.coinPubs[i]);
        if (!coin) {
          throw Error("can't calculate deposit amount, coin not found");
        }
        const denom = await getDenomInfo(
          wex,
          tx,
          coin.exchangeBaseUrl,
          coin.denomPubHash,
        );
        if (!denom) {
          throw Error("can't find denomination to calculate deposit amount");
        }
        coinFee.push(Amounts.parseOrThrow(denom.feeDeposit));
        exchangeSet.add(coin.exchangeBaseUrl);

        const allDenoms = await getCandidateWithdrawalDenomsTx(
          wex,
          tx,
          coin.exchangeBaseUrl,
          currency,
        );
        // Value left on the coin once its contribution has been deposited.
        const amountLeft = Amounts.sub(
          denom.value,
          pcs.coinContributions[i],
        ).amount;
        const refreshCost = getTotalRefreshCost(
          allDenoms,
          denom,
          amountLeft,
          wex.ws.config.testing.denomselAllowLate,
        );
        refreshFee.push(refreshCost);
      }

      // One reference time for all fee period checks below.
      const now = AbsoluteTime.now();

      for (const exchangeUrl of exchangeSet.values()) {
        const exchangeDetails = await getExchangeWireDetailsInTx(
          tx,
          exchangeUrl,
        );
        if (!exchangeDetails) {
          continue;
        }
        // Exchanges without fees for this wire type (or without a fee
        // period covering the current time) contribute no wire fee.
        const fee = exchangeDetails.wireInfo.feesForType[wireType]?.find(
          (x) => {
            return AbsoluteTime.isBetween(
              now,
              AbsoluteTime.fromProtocolTimestamp(x.startStamp),
              AbsoluteTime.fromProtocolTimestamp(x.endStamp),
            );
          },
        )?.wireFee;
        if (fee) {
          wireFee.push(Amounts.parseOrThrow(fee));
        }
      }
    },
  );

  return {
    coin: Amounts.stringify(Amounts.sumOrZero(currency, coinFee).amount),
    wire: Amounts.stringify(Amounts.sumOrZero(currency, wireFee).amount),
    refresh: Amounts.stringify(
      Amounts.sumOrZero(currency, refreshFee).amount,
    ),
  };
}