/*
 This file is part of GNU Taler
 (C) 2019 GNUnet e.V.

 GNU Taler is free software; you can redistribute it and/or modify it under the
 terms of the GNU General Public License as published by the Free Software
 Foundation; either version 3, or (at your option) any later version.

 GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
 A PARTICULAR PURPOSE.  See the GNU General Public License for more details.

 You should have received a copy of the GNU General Public License along with
 GNU Taler; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
 */

import {
  AgeCommitment,
  AgeRestriction,
  AmountJson,
  Amounts,
  amountToPretty,
  assertUnreachable,
  AsyncFlag,
  checkDbInvariant,
  codecForExchangeMeltResponse,
  codecForExchangeRevealResponse,
  CoinPublicKeyString,
  CoinRefreshRequest,
  CoinStatus,
  DenominationInfo,
  DenomKeyType,
  Duration,
  encodeCrock,
  ExchangeMeltRequest,
  ExchangeProtocolVersion,
  ExchangeRefreshRevealRequest,
  fnutil,
  ForceRefreshRequest,
  getErrorDetailFromException,
  getRandomBytes,
  HashCodeString,
  HttpStatusCode,
  j2s,
  Logger,
  makeErrorDetail,
  NotificationType,
  RefreshReason,
  TalerError,
  TalerErrorCode,
  TalerErrorDetail,
  TalerPreciseTimestamp,
  TransactionAction,
  TransactionIdStr,
  TransactionMajorState,
  TransactionState,
  TransactionType,
  URL,
  WalletNotification,
} from "@gnu-taler/taler-util";
import {
  readSuccessResponseJsonOrThrow,
  readTalerErrorResponse,
  readUnexpectedResponseDetails,
  throwUnexpectedRequestError,
} from "@gnu-taler/taler-util/http";
import {
  constructTaskIdentifier,
  makeCoinAvailable,
  makeCoinsVisible,
  PendingTaskType,
  TaskIdStr,
  TaskRunResult,
  TaskRunResultType,
  TombstoneTag,
  TransactionContext,
} from "./common.js";
import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js";
import {
  DerivedRefreshSession,
  RefreshNewDenomInfo,
} from "./crypto/cryptoTypes.js";
import { CryptoApiStoppedError } from "./crypto/workers/crypto-dispatcher.js";
import {
  CoinRecord,
  CoinSourceType,
  DenominationRecord,
  RefreshCoinStatus,
  RefreshGroupPerExchangeInfo,
  RefreshGroupRecord,
  RefreshOperationStatus,
  RefreshSessionRecord,
  timestampPreciseToDb,
  WalletDbReadOnlyTransaction,
  WalletDbReadWriteTransaction,
} from "./db.js";
import { selectWithdrawalDenominations } from "./denomSelection.js";
import { fetchFreshExchange } from "./exchanges.js";
import {
  constructTransactionIdentifier,
  notifyTransition,
} from "./transactions.js";
import {
  EXCHANGE_COINS_LOCK,
  getDenomInfo,
  WalletExecutionContext,
} from "./wallet.js";
import {
  getCandidateWithdrawalDenomsTx,
  updateWithdrawalDenoms,
} from "./withdraw.js";

const logger = new Logger("refresh.ts");

/**
 * Transaction context for a single refresh group.
 *
 * Implements the user-triggered transaction actions (delete, suspend,
 * resume, fail) on top of the refresh group record in the database.
 */
export class RefreshTransactionContext implements TransactionContext {
  readonly transactionId: TransactionIdStr;
  readonly taskId: TaskIdStr;

  constructor(
    public wex: WalletExecutionContext,
    public refreshGroupId: string,
  ) {
    this.transactionId = constructTransactionIdentifier({
      tag: TransactionType.Refresh,
      refreshGroupId,
    });
    this.taskId = constructTaskIdentifier({
      tag: PendingTaskType.Refresh,
      refreshGroupId,
    });
  }

  /**
   * Delete the refresh group record (if present) and store a tombstone
   * for it.
   */
  async deleteTransaction(): Promise<void> {
    const refreshGroupId = this.refreshGroupId;
    await this.wex.db.runReadWriteTx(
      ["refreshGroups", "tombstones"],
      async (tx) => {
        const rg = await tx.refreshGroups.get(refreshGroupId);
        if (rg) {
          await tx.refreshGroups.delete(refreshGroupId);
          await tx.tombstones.put({
            id: TombstoneTag.DeleteRefreshGroup + ":" + refreshGroupId,
          });
        }
      },
    );
  }

  /**
   * Move a pending refresh group to the suspended state and stop its task.
   *
   * Groups that are finished, failed or already suspended are left untouched.
   */
  async suspendTransaction(): Promise<void> {
    const { wex, refreshGroupId, transactionId } = this;
    const transitionInfo = await wex.db.runReadWriteTx(
      ["refreshGroups"],
      async (tx) => {
        const dg = await tx.refreshGroups.get(refreshGroupId);
        if (!dg) {
          logger.warn(
            `can't suspend refresh group, refreshGroupId=${refreshGroupId} not found`,
          );
          return undefined;
        }
        const oldState = computeRefreshTransactionState(dg);
        switch (dg.operationStatus) {
          case RefreshOperationStatus.Finished:
          case RefreshOperationStatus.Suspended:
          case RefreshOperationStatus.Failed:
            return undefined;
          case RefreshOperationStatus.Pending: {
            dg.operationStatus = RefreshOperationStatus.Suspended;
            await tx.refreshGroups.put(dg);
            break;
          }
          default:
            assertUnreachable(dg.operationStatus);
        }
        return {
          oldTxState: oldState,
          newTxState: computeRefreshTransactionState(dg),
        };
      },
    );
    wex.taskScheduler.stopShepherdTask(this.taskId);
    notifyTransition(wex, transactionId, transitionInfo);
  }

  /**
   * Always throws: refresh transactions can only be failed, not aborted.
   */
  async abortTransaction(): Promise<void> {
    // Refresh transactions only support fail, not abort.
    throw new Error("refresh transactions cannot be aborted");
  }

  /**
   * Move a suspended refresh group back to pending and (re-)start its task.
   *
   * Groups in any other state are not modified; the task is started
   * regardless.
   */
  async resumeTransaction(): Promise<void> {
    const { wex, refreshGroupId, transactionId } = this;
    const transitionInfo = await wex.db.runReadWriteTx(
      ["refreshGroups"],
      async (tx) => {
        const dg = await tx.refreshGroups.get(refreshGroupId);
        if (!dg) {
          logger.warn(
            `can't resume refresh group, refreshGroupId=${refreshGroupId} not found`,
          );
          return;
        }
        const oldState = computeRefreshTransactionState(dg);
        switch (dg.operationStatus) {
          case RefreshOperationStatus.Finished:
            return;
          case RefreshOperationStatus.Pending: {
            return;
          }
          case RefreshOperationStatus.Suspended:
            dg.operationStatus = RefreshOperationStatus.Pending;
            await tx.refreshGroups.put(dg);
            return {
              oldTxState: oldState,
              newTxState: computeRefreshTransactionState(dg),
            };
        }
        return undefined;
      },
    );
    notifyTransition(wex, transactionId, transitionInfo);
    wex.taskScheduler.startShepherdTask(this.taskId);
  }

  /**
   * Mark a pending or suspended refresh group as failed.
   *
   * Finished and already failed groups keep their status.
   */
  async failTransaction(): Promise<void> {
    const { wex, refreshGroupId, transactionId } = this;
    const transitionInfo = await wex.db.runReadWriteTx(
      ["refreshGroups"],
      async (tx) => {
        const dg = await tx.refreshGroups.get(refreshGroupId);
        if (!dg) {
          logger.warn(
            `can't fail refresh group, refreshGroupId=${refreshGroupId} not found`,
          );
          return;
        }
        const oldState = computeRefreshTransactionState(dg);
        let newStatus: RefreshOperationStatus | undefined;
        switch (dg.operationStatus) {
          case RefreshOperationStatus.Finished:
            break;
          case RefreshOperationStatus.Pending:
          case RefreshOperationStatus.Suspended:
            newStatus = RefreshOperationStatus.Failed;
            break;
          case RefreshOperationStatus.Failed:
            break;
          default:
            assertUnreachable(dg.operationStatus);
        }
        if (newStatus) {
          dg.operationStatus = newStatus;
          await tx.refreshGroups.put(dg);
        }
        return {
          oldTxState: oldState,
          newTxState: computeRefreshTransactionState(dg),
        };
      },
    );
    wex.taskScheduler.stopShepherdTask(this.taskId);
    notifyTransition(wex, transactionId, transitionInfo);
    // NOTE(review): the task is restarted after failing.  This method does
    // not set timestampFinished, and processRefreshGroup only checks
    // timestampFinished, so the restarted task appears to keep working on
    // a failed group -- confirm that this is intended.
    wex.taskScheduler.startShepherdTask(this.taskId);
  }
}

/**
 * Get the amount that we lose when refreshing a coin of the given denomination
 * with a certain amount left.
 *
 * If the amount left is zero, then the refresh cost
 * is also considered to be zero.  If a refresh isn't possible (e.g. due to lack of
 * the right denominations), then the cost is the full amount left.
 *
 * Considers refresh fees, withdrawal fees after refresh and amounts too small
 * to refresh.
 *
 * @param denoms candidate denominations for the coins created by the refresh
 * @param refreshedDenom denomination of the coin that is refreshed
 * @param amountLeft amount remaining on the coin that is refreshed
 * @param denomselAllowLate passed through to the denomination selection
 * @returns difference between `amountLeft` and the value of the selected new coins
 */
export function getTotalRefreshCost(
  denoms: DenominationRecord[],
  refreshedDenom: DenominationInfo,
  amountLeft: AmountJson,
  denomselAllowLate: boolean,
): AmountJson {
  const withdrawAmount = Amounts.sub(
    amountLeft,
    refreshedDenom.feeRefresh,
  ).amount;
  const denomMap = Object.fromEntries(denoms.map((x) => [x.denomPubHash, x]));
  const withdrawDenoms = selectWithdrawalDenominations(
    withdrawAmount,
    denoms,
    denomselAllowLate,
  );
  const resultingAmount = Amounts.add(
    Amounts.zeroOfCurrency(withdrawAmount.currency),
    ...withdrawDenoms.selectedDenoms.map(
      (d) => Amounts.mult(denomMap[d.denomPubHash].value, d.count).amount,
    ),
  ).amount;
  const totalCost = Amounts.sub(amountLeft, resultingAmount).amount;
  logger.trace(
    `total refresh cost for ${amountToPretty(amountLeft)} is ${amountToPretty(
      totalCost,
    )}`,
  );
  return totalCost;
}

/**
 * Update the overall status of a refresh group from its per-coin status.
 *
 * Once every coin is either finished or failed, the group gets a finish
 * timestamp and becomes Failed (if any coin failed) or Finished.
 * Only mutates the record, the caller has to store it.
 *
 * @returns whether the group reached a final state
 */
function updateGroupStatus(rg: RefreshGroupRecord): { final: boolean } {
  const allFinal = fnutil.all(
    rg.statusPerCoin,
    (x) => x === RefreshCoinStatus.Finished || x === RefreshCoinStatus.Failed,
  );
  const anyFailed = fnutil.any(
    rg.statusPerCoin,
    (x) => x === RefreshCoinStatus.Failed,
  );
  if (!allFinal) {
    return { final: false };
  }
  rg.timestampFinished = timestampPreciseToDb(TalerPreciseTimestamp.now());
  rg.operationStatus = anyFailed
    ? RefreshOperationStatus.Failed
    : RefreshOperationStatus.Finished;
  return { final: true };
}

/**
 * Create a refresh session for one particular coin inside a refresh group.
 *
 * If the session already exists, return the existing one.
 *
 * If the session doesn't need to be created (refresh group gone or session already
 * finished), return undefined.  This is also the case when the amount left
 * after the refresh fee is too small to select any new denomination; the
 * coin is then marked as finished in the group.
 */
async function provideRefreshSession(
  wex: WalletExecutionContext,
  refreshGroupId: string,
  coinIndex: number,
): Promise<RefreshSessionRecord | undefined> {
  logger.trace(
    `creating refresh session for coin ${coinIndex} in refresh group ${refreshGroupId}`,
  );

  const d = await wex.db.runReadWriteTx(
    ["coins", "refreshGroups", "refreshSessions"],
    async (tx) => {
      const refreshGroup = await tx.refreshGroups.get(refreshGroupId);
      if (!refreshGroup) {
        return;
      }
      if (
        refreshGroup.statusPerCoin[coinIndex] === RefreshCoinStatus.Finished
      ) {
        return;
      }
      const existingRefreshSession = await tx.refreshSessions.get([
        refreshGroupId,
        coinIndex,
      ]);
      const oldCoinPub = refreshGroup.oldCoinPubs[coinIndex];
      const coin = await tx.coins.get(oldCoinPub);
      if (!coin) {
        throw Error("Can't refresh, coin not found");
      }
      return { refreshGroup, coin, existingRefreshSession };
    },
  );

  if (!d) {
    return undefined;
  }

  if (d.existingRefreshSession) {
    return d.existingRefreshSession;
  }

  const { refreshGroup, coin } = d;

  const exch = await fetchFreshExchange(wex, coin.exchangeBaseUrl);

  // FIXME: use helper functions from withdraw.ts
  // to update and filter withdrawable denoms.

  const { availableAmount, availableDenoms } = await wex.db.runReadOnlyTx(
    ["denominations"],
    async (tx) => {
      const oldDenom = await getDenomInfo(
        wex,
        tx,
        exch.exchangeBaseUrl,
        coin.denomPubHash,
      );

      if (!oldDenom) {
        throw Error("db inconsistent: denomination for coin not found");
      }

      // FIXME: Use denom groups instead of querying all denominations!
      const availableDenoms: DenominationRecord[] =
        await tx.denominations.indexes.byExchangeBaseUrl
          .iter(exch.exchangeBaseUrl)
          .toArray();

      // Only what is left after paying the refresh fee can go into new coins.
      const availableAmount = Amounts.sub(
        refreshGroup.inputPerCoin[coinIndex],
        oldDenom.feeRefresh,
      ).amount;
      return { availableAmount, availableDenoms };
    },
  );

  const newCoinDenoms = selectWithdrawalDenominations(
    availableAmount,
    availableDenoms,
    wex.ws.config.testing.denomselAllowLate,
  );

  const transactionId = constructTransactionIdentifier({
    tag: TransactionType.Refresh,
    refreshGroupId,
  });

  if (newCoinDenoms.selectedDenoms.length === 0) {
    logger.trace(
      `not refreshing, available amount ${amountToPretty(
        availableAmount,
      )} too small`,
    );
    // Nothing can be refreshed for this coin, so it counts as finished.
    const transitionInfo = await wex.db.runReadWriteTx(
      ["refreshGroups", "coins", "coinAvailability"],
      async (tx) => {
        const rg = await tx.refreshGroups.get(refreshGroupId);
        if (!rg) {
          return;
        }
        const oldTxState = computeRefreshTransactionState(rg);
        rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished;
        const updateRes = updateGroupStatus(rg);
        if (updateRes.final) {
          await makeCoinsVisible(wex, tx, transactionId);
        }
        await tx.refreshGroups.put(rg);
        const newTxState = computeRefreshTransactionState(rg);
        return { oldTxState, newTxState };
      },
    );
    wex.ws.notify({
      type: NotificationType.BalanceChange,
      hintTransactionId: transactionId,
    });
    notifyTransition(wex, transactionId, transitionInfo);
    return;
  }

  const sessionSecretSeed = encodeCrock(getRandomBytes(64));

  // Store refresh session for this coin in the database.
  const mySession = await wex.db.runReadWriteTx(
    ["refreshGroups", "refreshSessions"],
    async (tx) => {
      const rg = await tx.refreshGroups.get(refreshGroupId);
      if (!rg) {
        return;
      }
      // Another run may have stored a session in the meantime.
      const existingSession = await tx.refreshSessions.get([
        refreshGroupId,
        coinIndex,
      ]);
      if (existingSession) {
        return existingSession;
      }
      const newSession: RefreshSessionRecord = {
        coinIndex,
        refreshGroupId,
        norevealIndex: undefined,
        sessionSecretSeed: sessionSecretSeed,
        newDenoms: newCoinDenoms.selectedDenoms.map((x) => ({
          count: x.count,
          denomPubHash: x.denomPubHash,
        })),
        amountRefreshOutput: Amounts.stringify(newCoinDenoms.totalCoinValue),
      };
      await tx.refreshSessions.put(newSession);
      return newSession;
    },
  );
  logger.trace(
    `found/created refresh session for coin #${coinIndex} in ${refreshGroupId}`,
  );
  return mySession;
}

/**
 * Timeout for the melt/reveal requests of a refresh group.
 *
 * Currently a fixed 5 seconds, the refresh group is not consulted.
 */
function getRefreshRequestTimeout(rg: RefreshGroupRecord): Duration {
  return Duration.fromSpec({
    seconds: 5,
  });
}

/**
 * Run the melt step for one coin of a refresh group.
 *
 * Does nothing if the group or session is gone, or if the session already
 * has a noreveal index.  On success the noreveal index returned by the
 * exchange is stored in the refresh session.
 *
 * A 404 response marks the coin as failed in the group.  A 410 response
 * with a "denomination revoked/expired" error code triggers a new
 * denomination selection before the error is thrown.  Other unexpected
 * responses are thrown as errors.
 */
async function refreshMelt(
  wex: WalletExecutionContext,
  refreshGroupId: string,
  coinIndex: number,
): Promise<void> {
  const ctx = new RefreshTransactionContext(wex, refreshGroupId);
  const transactionId = ctx.transactionId;
  const d = await wex.db.runReadWriteTx(
    ["refreshGroups", "refreshSessions", "coins", "denominations"],
    async (tx) => {
      const refreshGroup = await tx.refreshGroups.get(refreshGroupId);
      if (!refreshGroup) {
        return;
      }
      const refreshSession = await tx.refreshSessions.get([
        refreshGroupId,
        coinIndex,
      ]);
      if (!refreshSession) {
        return;
      }
      if (refreshSession.norevealIndex !== undefined) {
        return;
      }

      const oldCoin = await tx.coins.get(refreshGroup.oldCoinPubs[coinIndex]);
      checkDbInvariant(!!oldCoin, "melt coin doesn't exist");
      const oldDenom = await getDenomInfo(
        wex,
        tx,
        oldCoin.exchangeBaseUrl,
        oldCoin.denomPubHash,
      );
      checkDbInvariant(
        !!oldDenom,
        "denomination for melted coin doesn't exist",
      );

      const newCoinDenoms: RefreshNewDenomInfo[] = [];

      for (const dh of refreshSession.newDenoms) {
        const newDenom = await getDenomInfo(
          wex,
          tx,
          oldCoin.exchangeBaseUrl,
          dh.denomPubHash,
        );
        checkDbInvariant(
          !!newDenom,
          "new denomination for refresh not in database",
        );
        newCoinDenoms.push({
          count: dh.count,
          denomPub: newDenom.denomPub,
          denomPubHash: newDenom.denomPubHash,
          feeWithdraw: newDenom.feeWithdraw,
          value: Amounts.stringify(newDenom.value),
        });
      }
      return { newCoinDenoms, oldCoin, oldDenom, refreshGroup, refreshSession };
    },
  );

  if (!d) {
    return;
  }

  const { newCoinDenoms, oldCoin, oldDenom, refreshGroup, refreshSession } = d;

  let exchangeProtocolVersion: ExchangeProtocolVersion;
  switch (d.oldDenom.denomPub.cipher) {
    case DenomKeyType.Rsa: {
      exchangeProtocolVersion = ExchangeProtocolVersion.V12;
      break;
    }
    default:
      throw Error("unsupported key type");
  }

  const derived = await wex.cryptoApi.deriveRefreshSession({
    exchangeProtocolVersion,
    kappa: 3,
    meltCoinDenomPubHash: oldCoin.denomPubHash,
    meltCoinPriv: oldCoin.coinPriv,
    meltCoinPub: oldCoin.coinPub,
    feeRefresh: Amounts.parseOrThrow(oldDenom.feeRefresh),
    meltCoinMaxAge: oldCoin.maxAge,
    meltCoinAgeCommitmentProof: oldCoin.ageCommitmentProof,
    newCoinDenoms,
    sessionSecretSeed: refreshSession.sessionSecretSeed,
  });

  const reqUrl = new URL(
    `coins/${oldCoin.coinPub}/melt`,
    oldCoin.exchangeBaseUrl,
  );

  let maybeAch: HashCodeString | undefined;
  if (oldCoin.ageCommitmentProof) {
    maybeAch = AgeRestriction.hashCommitment(
      oldCoin.ageCommitmentProof.commitment,
    );
  }

  const meltReqBody: ExchangeMeltRequest = {
    coin_pub: oldCoin.coinPub,
    confirm_sig: derived.confirmSig,
    denom_pub_hash: oldCoin.denomPubHash,
    denom_sig: oldCoin.denomSig,
    rc: derived.hash,
    value_with_fee: Amounts.stringify(derived.meltValueWithFee),
    age_commitment_hash: maybeAch,
  };

  const resp = await wex.ws.runSequentialized(
    [EXCHANGE_COINS_LOCK],
    async () => {
      return await wex.http.fetch(reqUrl.href, {
        method: "POST",
        body: meltReqBody,
        timeout: getRefreshRequestTimeout(refreshGroup),
        cancellationToken: wex.cancellationToken,
      });
    },
  );

  if (resp.status === HttpStatusCode.NotFound) {
    // Mark this coin as failed and remember the error in its session.
    const errDetails = await readUnexpectedResponseDetails(resp);
    const transitionInfo = await wex.db.runReadWriteTx(
      ["refreshGroups", "refreshSessions", "coins", "coinAvailability"],
      async (tx) => {
        const rg = await tx.refreshGroups.get(refreshGroupId);
        if (!rg) {
          return;
        }
        if (rg.timestampFinished) {
          return;
        }
        if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) {
          return;
        }
        const oldTxState = computeRefreshTransactionState(rg);
        rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Failed;
        const rs = await tx.refreshSessions.get([refreshGroupId, coinIndex]);
        if (!rs) {
          throw Error(
            "db invariant failed: missing refresh session in database",
          );
        }
        rs.lastError = errDetails;
        const updateRes = updateGroupStatus(rg);
        if (updateRes.final) {
          await makeCoinsVisible(wex, tx, transactionId);
        }
        await tx.refreshGroups.put(rg);
        await tx.refreshSessions.put(rs);
        const newTxState = computeRefreshTransactionState(rg);
        return {
          oldTxState,
          newTxState,
        };
      },
    );
    wex.ws.notify({
      type: NotificationType.BalanceChange,
      hintTransactionId: transactionId,
    });
    notifyTransition(wex, transactionId, transitionInfo);
    return;
  }

  const exchangeBaseUrl = oldCoin.exchangeBaseUrl;
  const currency = Amounts.currencyOf(oldDenom.value);

  if (resp.status === HttpStatusCode.Gone) {
    const errDetail = await readTalerErrorResponse(resp);
    switch (errDetail.code) {
      case TalerErrorCode.EXCHANGE_GENERIC_DENOMINATION_REVOKED:
      case TalerErrorCode.EXCHANGE_GENERIC_DENOMINATION_EXPIRED: {
        logger.warn(`refresh ${transactionId} requires redenomination`);
        await fetchFreshExchange(wex, exchangeBaseUrl, {
          forceUpdate: true,
        });
        await updateWithdrawalDenoms(wex, exchangeBaseUrl);
        await wex.db.runReadWriteTx(
          ["refreshGroups", "refreshSessions", "denominations"],
          async (tx) => {
            const rg = await tx.refreshGroups.get(refreshGroupId);
            if (!rg) {
              return;
            }
            if (rg.timestampFinished) {
              return;
            }
            const rs = await tx.refreshSessions.get([
              refreshGroupId,
              coinIndex,
            ]);
            if (!rs) {
              return;
            }
            if (rs.norevealIndex !== undefined) {
              return;
            }
            const candidates = await getCandidateWithdrawalDenomsTx(
              wex,
              tx,
              exchangeBaseUrl,
              currency,
            );
            // We can just replace the existing coin selection, because melt is atomic,
            // and thus it's not possible that some denoms in the selection were already
            // withdrawn.
            //
            // As in provideRefreshSession, the refresh fee must be
            // subtracted before selecting denominations.  Otherwise the
            // melt amount derived from the selection (new coins plus
            // refresh fee) can exceed the amount allocated for this coin.
            const input = Amounts.sub(
              rg.inputPerCoin[rs.coinIndex],
              oldDenom.feeRefresh,
            ).amount;
            const newSel = selectWithdrawalDenominations(input, candidates);
            rs.amountRefreshOutput = Amounts.stringify(newSel.totalCoinValue);
            rs.newDenoms = newSel.selectedDenoms.map((x) => ({
              count: x.count,
              denomPubHash: x.denomPubHash,
            }));
            await tx.refreshSessions.put(rs);
          },
        );
        break;
      }
    }
    // Always throw for a 410 response; the selection stored above (if any)
    // is only visible to a later run of this function.
    throwUnexpectedRequestError(resp, errDetail);
  }

  if (resp.status === HttpStatusCode.Conflict) {
    // Just log for better diagnostics here, error status
    // will be handled later.
    logger.error(
      `melt request for ${Amounts.stringify(
        derived.meltValueWithFee,
      )} failed in refresh group ${refreshGroupId} due to conflict`,
    );

    const historySig = await wex.cryptoApi.signCoinHistoryRequest({
      coinPriv: oldCoin.coinPriv,
      coinPub: oldCoin.coinPub,
      startOffset: 0,
    });

    const historyUrl = new URL(
      `coins/${oldCoin.coinPub}/history`,
      oldCoin.exchangeBaseUrl,
    );

    const historyResp = await wex.http.fetch(historyUrl.href, {
      method: "GET",
      headers: {
        "Taler-Coin-History-Signature": historySig.sig,
      },
      cancellationToken: wex.cancellationToken,
    });

    const historyJson = await historyResp.json();
    logger.info(`coin history: ${j2s(historyJson)}`);

    // FIXME: Before failing and re-trying, analyse response and adjust amount
  }

  const meltResponse = await readSuccessResponseJsonOrThrow(
    resp,
    codecForExchangeMeltResponse(),
  );

  const norevealIndex = meltResponse.noreveal_index;

  refreshSession.norevealIndex = norevealIndex;

  await wex.db.runReadWriteTx(
    ["refreshGroups", "refreshSessions"],
    async (tx) => {
      const rg = await tx.refreshGroups.get(refreshGroupId);
      if (!rg) {
        return;
      }
      if (rg.timestampFinished) {
        return;
      }
      const rs = await tx.refreshSessions.get([refreshGroupId, coinIndex]);
      if (!rs) {
        return;
      }
      if (rs.norevealIndex !== undefined) {
        return;
      }
      rs.norevealIndex = norevealIndex;
      await tx.refreshSessions.put(rs);
    },
  );
}

/**
 * Build the request body for the reveal step of a refresh.
 *
 * Sends all transfer private keys except the one at `norevealIndex`, the
 * planchets derived for `norevealIndex`, and one link signature per new coin.
 *
 * @throws Error if there are no planchets for `norevealIndex`
 */
export async function assembleRefreshRevealRequest(args: {
  cryptoApi: TalerCryptoInterface;
  derived: DerivedRefreshSession;
  norevealIndex: number;
  oldCoinPub: CoinPublicKeyString;
  oldCoinPriv: string;
  newDenoms: {
    denomPubHash: string;
    count: number;
  }[];
  oldAgeCommitment?: AgeCommitment;
}): Promise<ExchangeRefreshRevealRequest> {
  const {
    derived,
    norevealIndex,
    cryptoApi,
    oldCoinPriv,
    oldCoinPub,
    newDenoms,
  } = args;
  // Copy before splicing, so that the derived session is not modified.
  const privs = Array.from(derived.transferPrivs);
  privs.splice(norevealIndex, 1);

  const planchets = derived.planchetsForGammas[norevealIndex];
  if (!planchets) {
    throw Error("refresh index error");
  }

  const newDenomsFlat: string[] = [];
  const linkSigs: string[] = [];

  for (let i = 0; i < newDenoms.length; i++) {
    const dsel = newDenoms[i];
    for (let j = 0; j < dsel.count; j++) {
      // Flat index of the new coin across all selected denominations.
      const newCoinIndex = linkSigs.length;
      const linkSig = await cryptoApi.signCoinLink({
        coinEv: planchets[newCoinIndex].coinEv,
        newDenomHash: dsel.denomPubHash,
        oldCoinPriv: oldCoinPriv,
        oldCoinPub: oldCoinPub,
        transferPub: derived.transferPubs[norevealIndex],
      });
      linkSigs.push(linkSig.sig);
      newDenomsFlat.push(dsel.denomPubHash);
    }
  }

  const req: ExchangeRefreshRevealRequest = {
    coin_evs: planchets.map((x) => x.coinEv),
    new_denoms_h: newDenomsFlat,
    transfer_privs: privs,
    transfer_pub: derived.transferPubs[norevealIndex],
    link_sigs: linkSigs,
    old_age_commitment: args.oldAgeCommitment?.publicKeys,
  };
  return req;
}

/**
 * Run the reveal step for one coin of a refresh group.
 *
 * Requires that the melt step already stored a noreveal index in the
 * refresh session.  On success, the new coins are stored and the coin
 * is marked as finished in the group.
 *
 * @throws Error if the session has no noreveal index yet, or if the
 *   reveal request does not succeed
 */
async function refreshReveal(
  wex: WalletExecutionContext,
  refreshGroupId: string,
  coinIndex: number,
): Promise<void> {
  logger.trace(
    `doing refresh reveal for ${refreshGroupId} (old coin ${coinIndex})`,
  );
  const d = await wex.db.runReadOnlyTx(
    ["refreshGroups", "refreshSessions", "coins", "denominations"],
    async (tx) => {
      const refreshGroup = await tx.refreshGroups.get(refreshGroupId);
      if (!refreshGroup) {
        return;
      }
      const refreshSession = await tx.refreshSessions.get([
        refreshGroupId,
        coinIndex,
      ]);
      if (!refreshSession) {
        return;
      }
      const norevealIndex = refreshSession.norevealIndex;
      if (norevealIndex === undefined) {
        throw Error("can't reveal without melting first");
      }

      const oldCoin = await tx.coins.get(refreshGroup.oldCoinPubs[coinIndex]);
      checkDbInvariant(!!oldCoin, "melt coin doesn't exist");
      const oldDenom = await getDenomInfo(
        wex,
        tx,
        oldCoin.exchangeBaseUrl,
        oldCoin.denomPubHash,
      );
      checkDbInvariant(
        !!oldDenom,
        "denomination for melted coin doesn't exist",
      );

      const newCoinDenoms: RefreshNewDenomInfo[] = [];

      for (const dh of refreshSession.newDenoms) {
        const newDenom = await getDenomInfo(
          wex,
          tx,
          oldCoin.exchangeBaseUrl,
          dh.denomPubHash,
        );
        checkDbInvariant(
          !!newDenom,
          "new denomination for refresh not in database",
        );
        newCoinDenoms.push({
          count: dh.count,
          denomPub: newDenom.denomPub,
          denomPubHash: newDenom.denomPubHash,
          feeWithdraw: newDenom.feeWithdraw,
          value: Amounts.stringify(newDenom.value),
        });
      }
      return {
        oldCoin,
        oldDenom,
        newCoinDenoms,
        refreshSession,
        refreshGroup,
        norevealIndex,
      };
    },
  );

  if (!d) {
    return;
  }

  const {
    oldCoin,
    oldDenom,
    newCoinDenoms,
    refreshSession,
    refreshGroup,
    norevealIndex,
  } = d;

  let exchangeProtocolVersion: ExchangeProtocolVersion;
  switch (d.oldDenom.denomPub.cipher) {
    case DenomKeyType.Rsa: {
      exchangeProtocolVersion = ExchangeProtocolVersion.V12;
      break;
    }
    default:
      throw Error("unsupported key type");
  }

  // Same parameters as in the melt step, so the same session is derived.
  const derived = await wex.cryptoApi.deriveRefreshSession({
    exchangeProtocolVersion,
    kappa: 3,
    meltCoinDenomPubHash: oldCoin.denomPubHash,
    meltCoinPriv: oldCoin.coinPriv,
    meltCoinPub: oldCoin.coinPub,
    feeRefresh: Amounts.parseOrThrow(oldDenom.feeRefresh),
    newCoinDenoms,
    meltCoinMaxAge: oldCoin.maxAge,
    meltCoinAgeCommitmentProof: oldCoin.ageCommitmentProof,
    sessionSecretSeed: refreshSession.sessionSecretSeed,
  });

  const reqUrl = new URL(
    `refreshes/${derived.hash}/reveal`,
    oldCoin.exchangeBaseUrl,
  );

  const req = await assembleRefreshRevealRequest({
    cryptoApi: wex.cryptoApi,
    derived,
    newDenoms: newCoinDenoms,
    norevealIndex: norevealIndex,
    oldCoinPriv: oldCoin.coinPriv,
    oldCoinPub: oldCoin.coinPub,
    oldAgeCommitment: oldCoin.ageCommitmentProof?.commitment,
  });

  const resp = await wex.ws.runSequentialized(
    [EXCHANGE_COINS_LOCK],
    async () => {
      return await wex.http.fetch(reqUrl.href, {
        body: req,
        method: "POST",
        timeout: getRefreshRequestTimeout(refreshGroup),
        cancellationToken: wex.cancellationToken,
      });
    },
  );

  const reveal = await readSuccessResponseJsonOrThrow(
    resp,
    codecForExchangeRevealResponse(),
  );

  const coins: CoinRecord[] = [];

  const transactionId = constructTransactionIdentifier({
    tag: TransactionType.Refresh,
    refreshGroupId,
  });

  for (let i = 0; i < refreshSession.newDenoms.length; i++) {
    const ncd = newCoinDenoms[i];
    for (let j = 0; j < refreshSession.newDenoms[i].count; j++) {
      // Flat index of the new coin across all selected denominations.
      const newCoinIndex = coins.length;
      const pc = derived.planchetsForGammas[norevealIndex][newCoinIndex];
      if (ncd.denomPub.cipher !== DenomKeyType.Rsa) {
        throw Error("cipher unsupported");
      }
      const evSig = reveal.ev_sigs[newCoinIndex].ev_sig;
      const denomSig = await wex.cryptoApi.unblindDenominationSignature({
        planchet: {
          blindingKey: pc.blindingKey,
          denomPub: ncd.denomPub,
        },
        evSig,
      });
      const coin: CoinRecord = {
        blindingKey: pc.blindingKey,
        coinPriv: pc.coinPriv,
        coinPub: pc.coinPub,
        denomPubHash: ncd.denomPubHash,
        denomSig,
        exchangeBaseUrl: oldCoin.exchangeBaseUrl,
        status: CoinStatus.Fresh,
        coinSource: {
          type: CoinSourceType.Refresh,
          refreshGroupId,
          oldCoinPub: refreshGroup.oldCoinPubs[coinIndex],
        },
        sourceTransactionId: transactionId,
        coinEvHash: pc.coinEvHash,
        maxAge: pc.maxAge,
        ageCommitmentProof: pc.ageCommitmentProof,
        spendAllocation: undefined,
      };

      coins.push(coin);
    }
  }

  const transitionInfo = await wex.db.runReadWriteTx(
    [
      "coins",
      "denominations",
      "coinAvailability",
      "refreshGroups",
      "refreshSessions",
    ],
    async (tx) => {
      const rg = await tx.refreshGroups.get(refreshGroupId);
      if (!rg) {
        logger.warn("no refresh group found");
        return;
      }
      const rs = await tx.refreshSessions.get([refreshGroupId, coinIndex]);
      if (!rs) {
        logger.warn("no refresh session found");
        return;
      }
      const oldTxState = computeRefreshTransactionState(rg);
      rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished;
      updateGroupStatus(rg);
      for (const coin of coins) {
        await makeCoinAvailable(wex, tx, coin);
      }
      await makeCoinsVisible(wex, tx, transactionId);
      await tx.refreshGroups.put(rg);
      const newTxState = computeRefreshTransactionState(rg);
      return { oldTxState, newTxState };
    },
  );
  notifyTransition(wex, transactionId, transitionInfo);
  logger.trace("refresh finished (end of reveal)");
}

/**
 * Task entry point: process all coins of a refresh group.
 *
 * Returns "finished" if the group does not exist or already has a finish
 * timestamp.  Otherwise all refresh sessions are processed concurrently;
 * the result is an error result if any session threw, and a backoff
 * result in all other cases.
 */
export async function processRefreshGroup(
  wex: WalletExecutionContext,
  refreshGroupId: string,
): Promise<TaskRunResult> {
  logger.trace(`processing refresh group ${refreshGroupId}`);

  const refreshGroup = await wex.db.runReadOnlyTx(
    ["refreshGroups"],
    async (tx) => tx.refreshGroups.get(refreshGroupId),
  );
  if (!refreshGroup) {
    return TaskRunResult.finished();
  }
  if (refreshGroup.timestampFinished) {
    return TaskRunResult.finished();
  }
  // Process refresh sessions of the group in parallel.
  logger.trace(
    `processing refresh sessions for ${refreshGroup.oldCoinPubs.length} old coins`,
  );
  const errors: TalerErrorDetail[] = [];
  let inShutdown = false;
  const ps = refreshGroup.oldCoinPubs.map((_coinPub, i) =>
    processRefreshSession(wex, refreshGroupId, i).catch((e) => {
      if (e instanceof CryptoApiStoppedError) {
        inShutdown = true;
        logger.info(
          "crypto API stopped while processing refresh group, probably the wallet is currently shutting down.",
        );
        return;
      }
      if (e instanceof TalerError) {
        logger.warn("process refresh session got exception (TalerError)");
        logger.warn(`exc ${e}`);
        logger.warn(`exc stack ${e.stack}`);
        logger.warn(`error detail: ${j2s(e.errorDetail)}`);
      } else {
        logger.warn("process refresh session got exception");
        logger.warn(`exc ${e}`);
        logger.warn(`exc stack ${e.stack}`);
      }
      errors.push(getErrorDetailFromException(e));
    }),
  );
  try {
    logger.info("waiting for refreshes");
    await Promise.all(ps);
    logger.info("refresh group finished");
  } catch (e) {
    logger.warn("process refresh sessions got exception");
    logger.warn(`exception: ${e}`);
  }
  if (inShutdown) {
    return TaskRunResult.backoff();
  }
  if (errors.length > 0) {
    return {
      type: TaskRunResultType.Error,
      errorDetail: makeErrorDetail(
        TalerErrorCode.WALLET_REFRESH_GROUP_INCOMPLETE,
        {
          numErrors: errors.length,
          // Only report the first few errors.
          errors: errors.slice(0, 5),
        },
      ),
    };
  }

  return TaskRunResult.backoff();
}

/**
 * Process the refresh of one coin in a refresh group: create the session
 * if necessary, melt if there is no noreveal index yet, then reveal.
 */
async function processRefreshSession(
  wex: WalletExecutionContext,
  refreshGroupId: string,
  coinIndex: number,
): Promise<void> {
  logger.trace(
    `processing refresh session for coin ${coinIndex} of group ${refreshGroupId}`,
  );
  let { refreshGroup, refreshSession } = await wex.db.runReadOnlyTx(
    ["refreshGroups", "refreshSessions"],
    async (tx) => {
      const rg = await tx.refreshGroups.get(refreshGroupId);
      const rs = await tx.refreshSessions.get([refreshGroupId, coinIndex]);
      return {
        refreshGroup: rg,
        refreshSession: rs,
      };
    },
  );
  if (!refreshGroup) {
    return;
  }
  if (refreshGroup.statusPerCoin[coinIndex] === RefreshCoinStatus.Finished) {
    return;
  }
  if (!refreshSession) {
    refreshSession = await provideRefreshSession(
      wex,
      refreshGroupId,
      coinIndex,
    );
  }
  if (!refreshSession) {
    // We tried to create the refresh session, but didn't get a result back.
    // This means that either the session is finished, or that creating
    // one isn't necessary.
    return;
  }
  if (refreshSession.norevealIndex === undefined) {
    await refreshMelt(wex, refreshGroupId, coinIndex);
  }
  await refreshReveal(wex, refreshGroupId, coinIndex);
}

/**
 * Estimated result of refreshing a list of coins.
 */
export interface RefreshOutputInfo {
  /** Estimated output, in the same order as the refreshed coins. */
  outputPerCoin: AmountJson[];
  /** Per-exchange information, keyed by the coins' exchange base URL. */
  perExchangeInfo: Record<string, RefreshGroupPerExchangeInfo>;
}

/**
 * Estimate the refresh output for a list of coins.
 *
 * For each coin, the output is the refreshed amount minus the total
 * refresh cost (see getTotalRefreshCost).  Outputs are also summed up
 * per exchange.
 */
export async function calculateRefreshOutput(
  wex: WalletExecutionContext,
  tx: WalletDbReadOnlyTransaction<
    ["denominations", "coins", "refreshGroups", "coinAvailability"]
  >,
  currency: string,
  oldCoinPubs: CoinRefreshRequest[],
): Promise<RefreshOutputInfo> {
  const estimatedOutputPerCoin: AmountJson[] = [];

  // Candidate denominations per exchange, so they are loaded only once.
  const denomsPerExchange: Record<string, DenominationRecord[]> = {};

  const infoPerExchange: Record<string, RefreshGroupPerExchangeInfo> = {};

  // FIXME: Use denom groups instead of querying all denominations!
  const getDenoms = async (
    exchangeBaseUrl: string,
  ): Promise<DenominationRecord[]> => {
    if (denomsPerExchange[exchangeBaseUrl]) {
      return denomsPerExchange[exchangeBaseUrl];
    }
    const allDenoms = await getCandidateWithdrawalDenomsTx(
      wex,
      tx,
      exchangeBaseUrl,
      currency,
    );
    denomsPerExchange[exchangeBaseUrl] = allDenoms;
    return allDenoms;
  };

  for (const ocp of oldCoinPubs) {
    const coin = await tx.coins.get(ocp.coinPub);
    checkDbInvariant(!!coin, "coin must be in database");
    const denom = await getDenomInfo(
      wex,
      tx,
      coin.exchangeBaseUrl,
      coin.denomPubHash,
    );
    checkDbInvariant(
      !!denom,
      "denomination for existing coin must be in database",
    );
    const refreshAmount = ocp.amount;
    const denoms = await getDenoms(coin.exchangeBaseUrl);
    const cost = getTotalRefreshCost(
      denoms,
      denom,
      Amounts.parseOrThrow(refreshAmount),
      wex.ws.config.testing.denomselAllowLate,
    );
    const output = Amounts.sub(refreshAmount, cost).amount;
    let exchInfo = infoPerExchange[coin.exchangeBaseUrl];
    if (!exchInfo) {
      infoPerExchange[coin.exchangeBaseUrl] = exchInfo = {
        outputEffective: Amounts.stringify(Amounts.zeroOfAmount(cost)),
      };
    }
    exchInfo.outputEffective = Amounts.stringify(
      Amounts.add(exchInfo.outputEffective, output).amount,
    );
    estimatedOutputPerCoin.push(output);
  }

  return {
    outputPerCoin: estimatedOutputPerCoin,
    perExchangeInfo: infoPerExchange,
  };
}

/**
 * Mark the coins of a new refresh group as dormant.
 *
 * Fresh coins are also removed from the fresh coin count of their
 * denomination.  Coins without a spend allocation get one that points
 * to the refresh transaction.
 */
async function applyRefresh(
  wex: WalletExecutionContext,
  tx: WalletDbReadWriteTransaction<
    ["denominations", "coins", "refreshGroups", "coinAvailability"]
  >,
  oldCoinPubs: CoinRefreshRequest[],
  refreshGroupId: string,
): Promise<void> {
  for (const ocp of oldCoinPubs) {
    const coin = await tx.coins.get(ocp.coinPub);
    checkDbInvariant(!!coin, "coin must be in database");
    const denom = await getDenomInfo(
      wex,
      tx,
      coin.exchangeBaseUrl,
      coin.denomPubHash,
    );
    checkDbInvariant(
      !!denom,
      "denomination for existing coin must be in database",
    );
    switch (coin.status) {
      case CoinStatus.Dormant:
        break;
      case CoinStatus.Fresh: {
        coin.status = CoinStatus.Dormant;
        const coinAv = await tx.coinAvailability.get([
          coin.exchangeBaseUrl,
          coin.denomPubHash,
          coin.maxAge,
        ]);
        checkDbInvariant(!!coinAv);
        checkDbInvariant(coinAv.freshCoinCount > 0);
        coinAv.freshCoinCount--;
        await tx.coinAvailability.put(coinAv);
        break;
      }
      case CoinStatus.FreshSuspended: {
        // For suspended coins, we don't have to adjust coin
        // availability, as they are not counted as available.
        coin.status = CoinStatus.Dormant;
        break;
      }
      default:
        assertUnreachable(coin.status);
    }
    if (!coin.spendAllocation) {
      coin.spendAllocation = {
        amount: Amounts.stringify(ocp.amount),
        // id: `txn:refresh:${refreshGroupId}`,
        id: constructTransactionIdentifier({
          tag: TransactionType.Refresh,
          refreshGroupId,
        }),
      };
    }
    await tx.coins.put(coin);
  }
}

/**
 * Result of creating a refresh group.
 */
export interface CreateRefreshGroupResult {
  refreshGroupId: string;
  /** Notifications describing the state transition of the new transaction. */
  notifications: WalletNotification[];
}

/**
 * Create a refresh group for a list of coins.
 *
 * Refreshes the remaining amount on the coin, effectively capturing the remaining
 * value in the refresh group.
 *
 * The caller must also ensure that the coins that should be refreshed exist
 * in the current database transaction.
 *
 * A group without coins is stored as already finished.
 */
export async function createRefreshGroup(
  wex: WalletExecutionContext,
  tx: WalletDbReadWriteTransaction<
    ["denominations", "coins", "refreshGroups", "coinAvailability"]
  >,
  currency: string,
  oldCoinPubs: CoinRefreshRequest[],
  refreshReason: RefreshReason,
  originatingTransactionId: string | undefined,
): Promise<CreateRefreshGroupResult> {
  const refreshGroupId = encodeCrock(getRandomBytes(32));

  const outInfo = await calculateRefreshOutput(wex, tx, currency, oldCoinPubs);

  const estimatedOutputPerCoin = outInfo.outputPerCoin;

  await applyRefresh(wex, tx, oldCoinPubs, refreshGroupId);

  const refreshGroup: RefreshGroupRecord = {
    operationStatus: RefreshOperationStatus.Pending,
    currency,
    timestampFinished: undefined,
    statusPerCoin: oldCoinPubs.map(() => RefreshCoinStatus.Pending),
    oldCoinPubs: oldCoinPubs.map((x) => x.coinPub),
    originatingTransactionId,
    reason: refreshReason,
    refreshGroupId,
    inputPerCoin: oldCoinPubs.map((x) => x.amount),
    expectedOutputPerCoin: estimatedOutputPerCoin.map((x) =>
      Amounts.stringify(x),
    ),
    infoPerExchange: outInfo.perExchangeInfo,
    timestampCreated: timestampPreciseToDb(TalerPreciseTimestamp.now()),
  };

  if (oldCoinPubs.length === 0) {
    logger.warn("created refresh group with zero coins");
    refreshGroup.timestampFinished = timestampPreciseToDb(
      TalerPreciseTimestamp.now(),
    );
    refreshGroup.operationStatus = RefreshOperationStatus.Finished;
  }

  await tx.refreshGroups.put(refreshGroup);

  const newTxState = computeRefreshTransactionState(refreshGroup);

  logger.trace(`created refresh group ${refreshGroupId}`);

  const ctx = new RefreshTransactionContext(wex, refreshGroupId);

  // Shepherd the task.
  // If the current transaction fails to commit the refresh
  // group to the DB, the shepherd will give up.
  wex.taskScheduler.startShepherdTask(ctx.taskId);

  return {
    refreshGroupId,
    notifications: [
      {
        type: NotificationType.TransactionStateTransition,
        transactionId: ctx.transactionId,
        oldTxState: {
          major: TransactionMajorState.None,
        },
        newTxState,
      },
    ],
  };
}

/**
 * Map the operation status of a refresh group to a transaction state.
 */
export function computeRefreshTransactionState(
  rg: RefreshGroupRecord,
): TransactionState {
  switch (rg.operationStatus) {
    case RefreshOperationStatus.Finished:
      return {
        major: TransactionMajorState.Done,
      };
    case RefreshOperationStatus.Failed:
      return {
        major: TransactionMajorState.Failed,
      };
    case RefreshOperationStatus.Pending:
      return {
        major: TransactionMajorState.Pending,
      };
    case RefreshOperationStatus.Suspended:
      return {
        major: TransactionMajorState.Suspended,
      };
  }
}

/**
 * Get the transaction actions offered for a refresh group in its
 * current operation status.
 */
export function computeRefreshTransactionActions(
  rg: RefreshGroupRecord,
): TransactionAction[] {
  switch (rg.operationStatus) {
    case RefreshOperationStatus.Finished:
      return [TransactionAction.Delete];
    case RefreshOperationStatus.Failed:
      return [TransactionAction.Delete];
    case RefreshOperationStatus.Pending:
      return [
        TransactionAction.Retry,
        TransactionAction.Suspend,
        TransactionAction.Fail,
      ];
    case RefreshOperationStatus.Suspended:
      return [TransactionAction.Resume, TransactionAction.Fail];
  }
}

/**
 * Get the transaction identifiers of all refresh groups that were
 * created on behalf of the given transaction.
 */
export function getRefreshesForTransaction(
  wex: WalletExecutionContext,
  transactionId: string,
): Promise<string[]> {
  return wex.db.runReadOnlyTx(["refreshGroups"], async (tx) => {
    const groups =
      await tx.refreshGroups.indexes.byOriginatingTransactionId.getAll(
        transactionId,
      );
    return groups.map((x) =>
      constructTransactionIdentifier({
        tag: TransactionType.Refresh,
        refreshGroupId: x.refreshGroupId,
      }),
    );
  });
}

export interface ForceRefreshResult {
  refreshGroupId: string;
}

export async function forceRefresh(
  wex: WalletExecutionContext,
  req: ForceRefreshRequest,
): Promise {
  if (req.coinPubList.length == 0) {
    throw Error("refusing to create empty refresh group");
  }
  const res = await wex.db.runReadWriteTx(
    ["refreshGroups", "coinAvailability", "denominations",
"coins"], async (tx) => { let coinPubs: CoinRefreshRequest[] = []; for (const c of req.coinPubList) { const coin = await tx.coins.get(c); if (!coin) { throw Error(`coin (pubkey ${c}) not found`); } const denom = await getDenomInfo( wex, tx, coin.exchangeBaseUrl, coin.denomPubHash, ); checkDbInvariant(!!denom); coinPubs.push({ coinPub: c, amount: denom?.value, }); } return await createRefreshGroup( wex, tx, Amounts.currencyOf(coinPubs[0].amount), coinPubs, RefreshReason.Manual, undefined, ); }, ); for (const notif of res.notifications) { wex.ws.notify(notif); } return { refreshGroupId: res.refreshGroupId, }; } /** * Wait until a refresh operation is final. */ export async function waitRefreshFinal( wex: WalletExecutionContext, refreshGroupId: string, ): Promise { const ctx = new RefreshTransactionContext(wex, refreshGroupId); wex.taskScheduler.startShepherdTask(ctx.taskId); // FIXME: Clean up using the new JS "using" / Symbol.dispose syntax. const refreshNotifFlag = new AsyncFlag(); // Raise purchaseNotifFlag whenever we get a notification // about our refresh. const cancelNotif = wex.ws.addNotificationListener((notif) => { if ( notif.type === NotificationType.TransactionStateTransition && notif.transactionId === ctx.transactionId ) { refreshNotifFlag.raise(); } }); const unregisterOnCancelled = wex.cancellationToken.onCancelled(() => { cancelNotif(); refreshNotifFlag.raise(); }); try { await internalWaitRefreshFinal(ctx, refreshNotifFlag); } catch (e) { unregisterOnCancelled(); cancelNotif(); } } async function internalWaitRefreshFinal( ctx: RefreshTransactionContext, flag: AsyncFlag, ): Promise { while (true) { if (ctx.wex.cancellationToken.isCancelled) { throw Error("cancelled"); } // Check if refresh is final const res = await ctx.wex.db.runReadOnlyTx( ["refreshGroups", "operationRetries"], async (tx) => { return { rg: await tx.refreshGroups.get(ctx.refreshGroupId), }; }, ); const { rg } = res; if (!rg) { // Must've been deleted, we consider that final. 
return; } switch (rg.operationStatus) { case RefreshOperationStatus.Failed: case RefreshOperationStatus.Finished: // Transaction is final return; case RefreshOperationStatus.Pending: case RefreshOperationStatus.Suspended: break; } // Wait for the next transition await flag.wait(); flag.reset(); } }