/*
 This file is part of GNU Taler
 (C) 2019-2024 Taler Systems S.A.

 GNU Taler is free software; you can redistribute it and/or modify it under the
 terms of the GNU General Public License as published by the Free Software
 Foundation; either version 3, or (at your option) any later version.

 GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
 A PARTICULAR PURPOSE.  See the GNU General Public License for more details.

 You should have received a copy of the GNU General Public License along with
 GNU Taler; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
 */

/**
 * @fileoverview
 * Implementation of the refresh transaction.
 */

/**
 * Imports.
 */
import {
  AgeCommitment,
  AgeRestriction,
  AmountJson,
  Amounts,
  amountToPretty,
  assertUnreachable,
  AsyncFlag,
  checkDbInvariant,
  codecForCoinHistoryResponse,
  codecForExchangeMeltResponse,
  codecForExchangeRevealResponse,
  CoinPublicKeyString,
  CoinRefreshRequest,
  CoinStatus,
  DenominationInfo,
  DenomKeyType,
  Duration,
  encodeCrock,
  ExchangeMeltRequest,
  ExchangeProtocolVersion,
  ExchangeRefreshRevealRequest,
  fnutil,
  ForceRefreshRequest,
  getErrorDetailFromException,
  getRandomBytes,
  HashCodeString,
  HttpStatusCode,
  j2s,
  Logger,
  makeErrorDetail,
  NotificationType,
  RefreshReason,
  TalerError,
  TalerErrorCode,
  TalerErrorDetail,
  TalerPreciseTimestamp,
  TransactionAction,
  TransactionIdStr,
  TransactionMajorState,
  TransactionState,
  TransactionType,
  URL,
  WalletNotification,
} from "@gnu-taler/taler-util";
import {
  readSuccessResponseJsonOrThrow,
  readTalerErrorResponse,
  throwUnexpectedRequestError,
} from "@gnu-taler/taler-util/http";
import {
  constructTaskIdentifier,
  makeCoinsVisible,
  PendingTaskType,
  TaskIdStr,
  TaskRunResult,
  TaskRunResultType,
  TombstoneTag,
  TransactionContext,
  TransitionResult,
  TransitionResultType,
} from "./common.js";
import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js";
import {
  DerivedRefreshSession,
  RefreshNewDenomInfo,
} from "./crypto/cryptoTypes.js";
import { CryptoApiStoppedError } from "./crypto/workers/crypto-dispatcher.js";
import {
  CoinAvailabilityRecord,
  CoinRecord,
  CoinSourceType,
  DenominationRecord,
  RefreshCoinStatus,
  RefreshGroupPerExchangeInfo,
  RefreshGroupRecord,
  RefreshOperationStatus,
  RefreshSessionRecord,
  timestampPreciseToDb,
  WalletDbReadOnlyTransaction,
  WalletDbReadWriteTransaction,
  WalletDbStoresArr,
} from "./db.js";
import { selectWithdrawalDenominations } from "./denomSelection.js";
import {
  constructTransactionIdentifier,
  notifyTransition,
  TransitionInfo,
} from "./transactions.js";
import {
  EXCHANGE_COINS_LOCK,
  getDenomInfo,
  WalletExecutionContext,
} from "./wallet.js";
import { getCandidateWithdrawalDenomsTx } from "./withdraw.js";

const logger = new Logger("refresh.ts");

/**
 * Update the materialized refresh transaction based
 * on the refresh group record.
 *
 * Currently a no-op placeholder: the body is empty.
 */
async function updateRefreshTransaction(
  ctx: RefreshTransactionContext,
  tx: WalletDbReadWriteTransaction<
    [
      "refreshGroups",
      "transactions",
      "operationRetries",
      "exchanges",
      "exchangeDetails",
    ]
  >,
): Promise<void> {}

/**
 * Transaction context for a single refresh group.
 *
 * Bundles the execution context with the refresh group ID and derives
 * the transaction identifier and task identifier from it.
 */
export class RefreshTransactionContext implements TransactionContext {
  readonly transactionId: TransactionIdStr;
  readonly taskId: TaskIdStr;

  constructor(
    public wex: WalletExecutionContext,
    public refreshGroupId: string,
  ) {
    this.transactionId = constructTransactionIdentifier({
      tag: TransactionType.Refresh,
      refreshGroupId,
    });
    this.taskId = constructTaskIdentifier({
      tag: PendingTaskType.Refresh,
      refreshGroupId,
    });
  }

  /**
   * Transition a refresh transaction.
   * Extra object stores may be accessed during the transition.
   *
   * The callback receives the current refresh group record (or undefined
   * if there is none) and decides whether to stay, store an updated
   * record or delete the record.  Observers are notified afterwards via
   * notifyTransition.
   *
   * @param opts extra object stores to open in the read-write transaction
   * @param f callback computing the transition result
   * @returns old/new transaction state if a transition or deletion
   *   happened, undefined otherwise
   */
  async transition<StoreNameArray extends WalletDbStoresArr = []>(
    opts: { extraStores?: StoreNameArray; transactionLabel?: string },
    f: (
      rec: RefreshGroupRecord | undefined,
      tx: WalletDbReadWriteTransaction<
        [
          "refreshGroups",
          "transactions",
          "operationRetries",
          "exchanges",
          "exchangeDetails",
          ...StoreNameArray,
        ]
      >,
    ) => Promise<TransitionResult<RefreshGroupRecord>>,
  ): Promise<TransitionInfo | undefined> {
    const baseStores = [
      "refreshGroups" as const,
      "transactions" as const,
      "operationRetries" as const,
      "exchanges" as const,
      "exchangeDetails" as const,
    ];
    const stores = opts.extraStores
      ? [...baseStores, ...opts.extraStores]
      : baseStores;
    const transitionInfo = await this.wex.db.runReadWriteTx(
      { storeNames: stores },
      async (tx) => {
        const wgRec = await tx.refreshGroups.get(this.refreshGroupId);
        let oldTxState: TransactionState;
        if (wgRec) {
          oldTxState = computeRefreshTransactionState(wgRec);
        } else {
          // No record yet: the transaction starts from the "none" state.
          oldTxState = {
            major: TransactionMajorState.None,
          };
        }
        const res = await f(wgRec, tx);
        switch (res.type) {
          case TransitionResultType.Transition: {
            await tx.refreshGroups.put(res.rec);
            await updateRefreshTransaction(this, tx);
            const newTxState = computeRefreshTransactionState(res.rec);
            return {
              oldTxState,
              newTxState,
            };
          }
          case TransitionResultType.Delete:
            await tx.refreshGroups.delete(this.refreshGroupId);
            await updateRefreshTransaction(this, tx);
            return {
              oldTxState,
              newTxState: {
                major: TransactionMajorState.None,
              },
            };
          default:
            // Any other result type (e.g. "stay") leaves the DB untouched.
            return undefined;
        }
      },
    );
    notifyTransition(this.wex, this.transactionId, transitionInfo);
    return transitionInfo;
  }

  /**
   * Delete the refresh group and leave a tombstone for it.
   */
  async deleteTransaction(): Promise<void> {
    await this.transition(
      {
        extraStores: ["tombstones"],
      },
      async (rec, tx) => {
        if (!rec) {
          return TransitionResult.stay();
        }
        await tx.tombstones.put({
          id: TombstoneTag.DeleteRefreshGroup + ":" + this.refreshGroupId,
        });
        return TransitionResult.delete();
      },
    );
  }

  /**
   * Move a pending refresh group to the suspended state.
   * Finished, failed and already suspended groups are left unchanged.
   */
  async suspendTransaction(): Promise<void> {
    await this.transition({}, async (rec) => {
      if (!rec) {
        return TransitionResult.stay();
      }
      switch (rec.operationStatus) {
        case RefreshOperationStatus.Finished:
        case RefreshOperationStatus.Suspended:
        case RefreshOperationStatus.Failed:
          return TransitionResult.stay();
        case RefreshOperationStatus.Pending: {
          rec.operationStatus = RefreshOperationStatus.Suspended;
          return TransitionResult.transition(rec);
        }
        default:
          assertUnreachable(rec.operationStatus);
      }
    });
  }

  /**
   * Not supported for refresh transactions.
   *
   * @throws Error always
   */
  async abortTransaction(): Promise<void> {
    // Refresh transactions only support fail, not abort.
    throw new Error("refresh transactions cannot be aborted");
  }

  /**
   * Move a suspended refresh group back to the pending state.
   * Groups in any other state are left unchanged.
   */
  async resumeTransaction(): Promise<void> {
    await this.transition({}, async (rec) => {
      if (!rec) {
        return TransitionResult.stay();
      }
      switch (rec.operationStatus) {
        case RefreshOperationStatus.Finished:
        case RefreshOperationStatus.Failed:
        case RefreshOperationStatus.Pending:
          return TransitionResult.stay();
        case RefreshOperationStatus.Suspended: {
          rec.operationStatus = RefreshOperationStatus.Pending;
          return TransitionResult.transition(rec);
        }
        default:
          assertUnreachable(rec.operationStatus);
      }
    });
  }

  /**
   * Mark a pending or suspended refresh group as failed.
   * Finished and already failed groups are left unchanged.
   */
  async failTransaction(): Promise<void> {
    await this.transition({}, async (rec) => {
      if (!rec) {
        return TransitionResult.stay();
      }
      switch (rec.operationStatus) {
        case RefreshOperationStatus.Finished:
        case RefreshOperationStatus.Failed:
          return TransitionResult.stay();
        case RefreshOperationStatus.Pending:
        case RefreshOperationStatus.Suspended: {
          rec.operationStatus = RefreshOperationStatus.Failed;
          return TransitionResult.transition(rec);
        }
        default:
          assertUnreachable(rec.operationStatus);
      }
    });
  }
}

/**
 * Cached wrapper around getTotalRefreshCostInternal.
 *
 * The result is cached in wex.ws.refreshCostCache under a key built from
 * the exchange base URL, the denomination hash and the amount left.
 *
 * @param refreshedDenom denomination of the coin being refreshed
 * @param amountLeft remaining value on the coin
 * @returns the amount lost by refreshing
 */
export async function getTotalRefreshCost(
  wex: WalletExecutionContext,
  tx: WalletDbReadOnlyTransaction<["denominations"]>,
  refreshedDenom: DenominationInfo,
  amountLeft: AmountJson,
): Promise<AmountJson> {
  const cacheKey = `denom=${refreshedDenom.exchangeBaseUrl}/${
    refreshedDenom.denomPubHash
  };left=${Amounts.stringify(amountLeft)}`;
  const cacheRes = wex.ws.refreshCostCache.get(cacheKey);
  if (cacheRes) {
    return cacheRes;
  }
  const allDenoms = await getCandidateWithdrawalDenomsTx(
    wex,
    tx,
    refreshedDenom.exchangeBaseUrl,
    Amounts.currencyOf(amountLeft),
  );
  const res = getTotalRefreshCostInternal(
    allDenoms,
    refreshedDenom,
    amountLeft,
  );
  wex.ws.refreshCostCache.put(cacheKey, res);
  return res;
}

/**
 * Get the amount that we lose when refreshing a coin of the given denomination
 * with a certain amount left.
 *
 * If the amount left is zero, then the refresh cost
 * is also considered to be zero.  If a refresh isn't possible (e.g. due to lack of
 * the right denominations), then the cost is the full amount left.
 *
 * Considers refresh fees, withdrawal fees after refresh and amounts too small
 * to refresh.
 */
export function getTotalRefreshCostInternal(
  denoms: DenominationRecord[],
  refreshedDenom: DenominationInfo,
  amountLeft: AmountJson,
): AmountJson {
  // What remains for new coins after paying the refresh fee.
  const withdrawAmount = Amounts.sub(
    amountLeft,
    refreshedDenom.feeRefresh,
  ).amount;
  const denomMap = Object.fromEntries(denoms.map((x) => [x.denomPubHash, x]));
  const withdrawDenoms = selectWithdrawalDenominations(
    withdrawAmount,
    denoms,
    false,
  );
  // Sum of the face values of all selected new coins.
  const resultingAmount = Amounts.add(
    Amounts.zeroOfCurrency(withdrawAmount.currency),
    ...withdrawDenoms.selectedDenoms.map(
      (d) => Amounts.mult(denomMap[d.denomPubHash].value, d.count).amount,
    ),
  ).amount;
  const totalCost = Amounts.sub(amountLeft, resultingAmount).amount;
  logger.trace(
    `total refresh cost for ${amountToPretty(amountLeft)} is ${amountToPretty(
      totalCost,
    )}`,
  );
  return totalCost;
}

/**
 * Load the coin availability record for a denomination and age restriction.
 *
 * If no record exists, a fresh in-memory record with zero counts is
 * returned; it is NOT written to the database by this function.
 */
async function getCoinAvailabilityForDenom(
  wex: WalletExecutionContext,
  tx: WalletDbReadWriteTransaction<
    ["coins", "coinAvailability", "denominations"]
  >,
  denom: DenominationInfo,
  ageRestriction: number,
): Promise<CoinAvailabilityRecord> {
  checkDbInvariant(!!denom);
  let car = await tx.coinAvailability.get([
    denom.exchangeBaseUrl,
    denom.denomPubHash,
    ageRestriction,
  ]);
  if (!car) {
    car = {
      maxAge: ageRestriction,
      value: denom.value,
      currency: Amounts.currencyOf(denom.value),
      denomPubHash: denom.denomPubHash,
      exchangeBaseUrl: denom.exchangeBaseUrl,
      freshCoinCount: 0,
      visibleCoinCount: 0,
    };
  }
  return car;
}

/**
 * Create a refresh session for one particular coin inside a
refresh group. */
// Looks up the old coin and its denomination, selects new denominations for
// the amount left after the refresh fee, reserves the expected outputs in the
// coin availability records and stores a new refresh session.  When nothing
// can be selected, the coin is marked as Finished on the passed-in
// refreshGroup object and no session is created; persisting the group is
// left to the caller.
async function initRefreshSession(
  wex: WalletExecutionContext,
  tx: WalletDbReadWriteTransaction<
    ["refreshSessions", "coinAvailability", "coins", "denominations"]
  >,
  refreshGroup: RefreshGroupRecord,
  coinIndex: number,
): Promise<void> {
  const refreshGroupId = refreshGroup.refreshGroupId;
  logger.trace(
    `creating refresh session for coin ${coinIndex} in refresh group ${refreshGroupId}`,
  );
  const oldCoinPub = refreshGroup.oldCoinPubs[coinIndex];
  const oldCoin = await tx.coins.get(oldCoinPub);
  if (!oldCoin) {
    throw Error("Can't refresh, coin not found");
  }

  const exchangeBaseUrl = oldCoin.exchangeBaseUrl;

  const sessionSecretSeed = encodeCrock(getRandomBytes(64));

  const oldDenom = await getDenomInfo(
    wex,
    tx,
    exchangeBaseUrl,
    oldCoin.denomPubHash,
  );

  if (!oldDenom) {
    throw Error("db inconsistent: denomination for coin not found");
  }

  const currency = refreshGroup.currency;

  const availableDenoms = await getCandidateWithdrawalDenomsTx(
    wex,
    tx,
    exchangeBaseUrl,
    currency,
  );

  // Value that can be turned into new coins once the refresh fee is paid.
  const availableAmount = Amounts.sub(
    refreshGroup.inputPerCoin[coinIndex],
    oldDenom.feeRefresh,
  ).amount;

  const newCoinDenoms = selectWithdrawalDenominations(
    availableAmount,
    availableDenoms,
    wex.ws.config.testing.denomselAllowLate,
  );

  if (newCoinDenoms.selectedDenoms.length === 0) {
    logger.trace(
      `not refreshing, available amount ${amountToPretty(
        availableAmount,
      )} too small`,
    );
    refreshGroup.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished;
    return;
  }

  // Account for the coins we expect to receive from this session.
  for (let i = 0; i < newCoinDenoms.selectedDenoms.length; i++) {
    const dph = newCoinDenoms.selectedDenoms[i].denomPubHash;
    const denom = await getDenomInfo(wex, tx, oldDenom.exchangeBaseUrl, dph);
    if (!denom) {
      logger.error(`denom ${dph} not in DB`);
      continue;
    }
    const car = await getCoinAvailabilityForDenom(
      wex,
      tx,
      denom,
      oldCoin.maxAge,
    );
    car.pendingRefreshOutputCount =
      (car.pendingRefreshOutputCount ?? 0) +
      newCoinDenoms.selectedDenoms[i].count;
    await tx.coinAvailability.put(car);
  }

  const newSession: RefreshSessionRecord = {
    coinIndex,
    refreshGroupId,
    norevealIndex: undefined,
    sessionSecretSeed: sessionSecretSeed,
    newDenoms: newCoinDenoms.selectedDenoms.map((x) => ({
      count: x.count,
      denomPubHash: x.denomPubHash,
    })),
    amountRefreshOutput: Amounts.stringify(newCoinDenoms.totalCoinValue),
  };
  await tx.refreshSessions.put(newSession);
}

/**
 * Uninitialize a refresh session.
 *
 * Adjust the coin availability of involved coins: the pending refresh
 * output count of every denomination selected by the session is reduced
 * by the number of coins the session had reserved.
 *
 * The session record itself is not deleted here.
 */
async function destroyRefreshSession(
  wex: WalletExecutionContext,
  tx: WalletDbReadWriteTransaction<
    ["denominations", "coinAvailability", "coins"]
  >,
  refreshGroup: RefreshGroupRecord,
  refreshSession: RefreshSessionRecord,
): Promise<void> {
  // The old coin is the same for every denomination of the session, so it
  // is looked up once instead of once per loop iteration.
  const oldCoin = await tx.coins.get(
    refreshGroup.oldCoinPubs[refreshSession.coinIndex],
  );
  if (!oldCoin) {
    // Without the old coin nothing can be adjusted.
    return;
  }
  for (const newDenom of refreshSession.newDenoms) {
    const dph = newDenom.denomPubHash;
    const denom = await getDenomInfo(wex, tx, oldCoin.exchangeBaseUrl, dph);
    if (!denom) {
      logger.error(`denom ${dph} not in DB`);
      continue;
    }
    const car = await getCoinAvailabilityForDenom(
      wex,
      tx,
      denom,
      oldCoin.maxAge,
    );
    checkDbInvariant(car.pendingRefreshOutputCount != null);
    car.pendingRefreshOutputCount =
      car.pendingRefreshOutputCount - newDenom.count;
    await tx.coinAvailability.put(car);
  }
}

/**
 * Timeout used for melt and reveal requests.
 *
 * Currently a fixed 5 seconds; the refresh group argument is not used.
 */
function getRefreshRequestTimeout(rg: RefreshGroupRecord): Duration {
  return Duration.fromSpec({
    seconds: 5,
  });
}

/**
 * Run the melt step of a refresh session.
 *
 * If the melt step succeeds or fails permanently,
 * the status in the refresh group is updated.
 *
 * When a transient error occurs, an exception is thrown.
*/ async function refreshMelt( wex: WalletExecutionContext, refreshGroupId: string, coinIndex: number, ): Promise { const ctx = new RefreshTransactionContext(wex, refreshGroupId); const d = await wex.db.runReadWriteTx( { storeNames: [ "refreshGroups", "refreshSessions", "coins", "denominations", ], }, async (tx) => { const refreshGroup = await tx.refreshGroups.get(refreshGroupId); if (!refreshGroup) { return; } const refreshSession = await tx.refreshSessions.get([ refreshGroupId, coinIndex, ]); if (!refreshSession) { return; } if (refreshSession.norevealIndex !== undefined) { return; } const oldCoin = await tx.coins.get(refreshGroup.oldCoinPubs[coinIndex]); checkDbInvariant(!!oldCoin, "melt coin doesn't exist"); const oldDenom = await getDenomInfo( wex, tx, oldCoin.exchangeBaseUrl, oldCoin.denomPubHash, ); checkDbInvariant( !!oldDenom, "denomination for melted coin doesn't exist", ); const newCoinDenoms: RefreshNewDenomInfo[] = []; for (const dh of refreshSession.newDenoms) { const newDenom = await getDenomInfo( wex, tx, oldCoin.exchangeBaseUrl, dh.denomPubHash, ); checkDbInvariant( !!newDenom, "new denomination for refresh not in database", ); newCoinDenoms.push({ count: dh.count, denomPub: newDenom.denomPub, denomPubHash: newDenom.denomPubHash, feeWithdraw: newDenom.feeWithdraw, value: Amounts.stringify(newDenom.value), }); } return { newCoinDenoms, oldCoin, oldDenom, refreshGroup, refreshSession }; }, ); if (!d) { return; } const { newCoinDenoms, oldCoin, oldDenom, refreshGroup, refreshSession } = d; let exchangeProtocolVersion: ExchangeProtocolVersion; switch (d.oldDenom.denomPub.cipher) { case DenomKeyType.Rsa: { exchangeProtocolVersion = ExchangeProtocolVersion.V12; break; } default: throw Error("unsupported key type"); } const derived = await wex.cryptoApi.deriveRefreshSession({ exchangeProtocolVersion, kappa: 3, meltCoinDenomPubHash: oldCoin.denomPubHash, meltCoinPriv: oldCoin.coinPriv, meltCoinPub: oldCoin.coinPub, feeRefresh: 
Amounts.parseOrThrow(oldDenom.feeRefresh), meltCoinMaxAge: oldCoin.maxAge, meltCoinAgeCommitmentProof: oldCoin.ageCommitmentProof, newCoinDenoms, sessionSecretSeed: refreshSession.sessionSecretSeed, }); const reqUrl = new URL( `coins/${oldCoin.coinPub}/melt`, oldCoin.exchangeBaseUrl, ); let maybeAch: HashCodeString | undefined; if (oldCoin.ageCommitmentProof) { maybeAch = AgeRestriction.hashCommitment( oldCoin.ageCommitmentProof.commitment, ); } const meltReqBody: ExchangeMeltRequest = { coin_pub: oldCoin.coinPub, confirm_sig: derived.confirmSig, denom_pub_hash: oldCoin.denomPubHash, denom_sig: oldCoin.denomSig, rc: derived.hash, value_with_fee: Amounts.stringify(derived.meltValueWithFee), age_commitment_hash: maybeAch, }; const resp = await wex.ws.runSequentialized( [EXCHANGE_COINS_LOCK], async () => { return await wex.http.fetch(reqUrl.href, { method: "POST", body: meltReqBody, timeout: getRefreshRequestTimeout(refreshGroup), cancellationToken: wex.cancellationToken, }); }, ); switch (resp.status) { case HttpStatusCode.NotFound: { const errDetail = await readTalerErrorResponse(resp); await handleRefreshMeltNotFound(ctx, coinIndex, errDetail); return; } case HttpStatusCode.Gone: { const errDetail = await readTalerErrorResponse(resp); await handleRefreshMeltGone(ctx, coinIndex, errDetail); return; } case HttpStatusCode.Conflict: { const errDetail = await readTalerErrorResponse(resp); await handleRefreshMeltConflict( ctx, coinIndex, errDetail, derived, oldCoin, ); return; } case HttpStatusCode.Ok: break; default: { const errDetail = await readTalerErrorResponse(resp); throwUnexpectedRequestError(resp, errDetail); } } const meltResponse = await readSuccessResponseJsonOrThrow( resp, codecForExchangeMeltResponse(), ); const norevealIndex = meltResponse.noreveal_index; refreshSession.norevealIndex = norevealIndex; await wex.db.runReadWriteTx( { storeNames: ["refreshGroups", "refreshSessions"] }, async (tx) => { const rg = await tx.refreshGroups.get(refreshGroupId); if 
(!rg) { return; } if (rg.timestampFinished) { return; } const rs = await tx.refreshSessions.get([refreshGroupId, coinIndex]); if (!rs) { return; } if (rs.norevealIndex !== undefined) { return; } rs.norevealIndex = norevealIndex; await tx.refreshSessions.put(rs); }, ); } async function handleRefreshMeltGone( ctx: RefreshTransactionContext, coinIndex: number, errDetails: TalerErrorDetail, ): Promise { // const expiredMsg = codecForDenominationExpiredMessage().decode(errDetails); // FIXME: Validate signature. await ctx.wex.db.runReadWriteTx( { storeNames: [ "refreshGroups", "refreshSessions", "coins", "denominations", "coinAvailability", ], }, async (tx) => { const rg = await tx.refreshGroups.get(ctx.refreshGroupId); if (!rg) { return; } if (rg.timestampFinished) { return; } if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) { return; } rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Failed; const refreshSession = await tx.refreshSessions.get([ ctx.refreshGroupId, coinIndex, ]); if (!refreshSession) { throw Error("db invariant failed: missing refresh session in database"); } refreshSession.lastError = errDetails; await destroyRefreshSession(ctx.wex, tx, rg, refreshSession); await tx.refreshGroups.put(rg); await tx.refreshSessions.put(refreshSession); }, ); } async function handleRefreshMeltConflict( ctx: RefreshTransactionContext, coinIndex: number, errDetails: TalerErrorDetail, derived: DerivedRefreshSession, oldCoin: CoinRecord, ): Promise { // Just log for better diagnostics here, error status // will be handled later. 
logger.error( `melt request for ${Amounts.stringify( derived.meltValueWithFee, )} failed in refresh group ${ctx.refreshGroupId} due to conflict`, ); const historySig = await ctx.wex.cryptoApi.signCoinHistoryRequest({ coinPriv: oldCoin.coinPriv, coinPub: oldCoin.coinPub, startOffset: 0, }); const historyUrl = new URL( `coins/${oldCoin.coinPub}/history`, oldCoin.exchangeBaseUrl, ); const historyResp = await ctx.wex.http.fetch(historyUrl.href, { method: "GET", headers: { "Taler-Coin-History-Signature": historySig.sig, }, cancellationToken: ctx.wex.cancellationToken, }); const historyJson = await readSuccessResponseJsonOrThrow( historyResp, codecForCoinHistoryResponse(), ); logger.info(`coin history: ${j2s(historyJson)}`); // FIXME: If response seems wrong, report to auditor (in the future!); await ctx.wex.db.runReadWriteTx( { storeNames: [ "refreshGroups", "refreshSessions", "denominations", "coins", "coinAvailability", ], }, async (tx) => { const rg = await tx.refreshGroups.get(ctx.refreshGroupId); if (!rg) { return; } if (rg.timestampFinished) { return; } if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) { return; } if (Amounts.isZero(historyJson.balance)) { rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Failed; const refreshSession = await tx.refreshSessions.get([ ctx.refreshGroupId, coinIndex, ]); if (!refreshSession) { throw Error( "db invariant failed: missing refresh session in database", ); } refreshSession.lastError = errDetails; await tx.refreshGroups.put(rg); await tx.refreshSessions.put(refreshSession); } else { // Try again with new denoms! 
rg.inputPerCoin[coinIndex] = historyJson.balance; const refreshSession = await tx.refreshSessions.get([ ctx.refreshGroupId, coinIndex, ]); if (!refreshSession) { throw Error( "db invariant failed: missing refresh session in database", ); } await destroyRefreshSession(ctx.wex, tx, rg, refreshSession); await tx.refreshSessions.delete([ctx.refreshGroupId, coinIndex]); await initRefreshSession(ctx.wex, tx, rg, coinIndex); } }, ); } async function handleRefreshMeltNotFound( ctx: RefreshTransactionContext, coinIndex: number, errDetails: TalerErrorDetail, ): Promise { // FIXME: Validate the exchange's error response await ctx.wex.db.runReadWriteTx( { storeNames: [ "refreshGroups", "refreshSessions", "coins", "denominations", "coinAvailability", ], }, async (tx) => { const rg = await tx.refreshGroups.get(ctx.refreshGroupId); if (!rg) { return; } if (rg.timestampFinished) { return; } if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) { return; } rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Failed; const refreshSession = await tx.refreshSessions.get([ ctx.refreshGroupId, coinIndex, ]); if (!refreshSession) { throw Error("db invariant failed: missing refresh session in database"); } await destroyRefreshSession(ctx.wex, tx, rg, refreshSession); refreshSession.lastError = errDetails; await tx.refreshGroups.put(rg); await tx.refreshSessions.put(refreshSession); }, ); } export async function assembleRefreshRevealRequest(args: { cryptoApi: TalerCryptoInterface; derived: DerivedRefreshSession; norevealIndex: number; oldCoinPub: CoinPublicKeyString; oldCoinPriv: string; newDenoms: { denomPubHash: string; count: number; }[]; oldAgeCommitment?: AgeCommitment; }): Promise { const { derived, norevealIndex, cryptoApi, oldCoinPriv, oldCoinPub, newDenoms, } = args; const privs = Array.from(derived.transferPrivs); privs.splice(norevealIndex, 1); const planchets = derived.planchetsForGammas[norevealIndex]; if (!planchets) { throw Error("refresh index error"); } const 
newDenomsFlat: string[] = []; const linkSigs: string[] = []; for (let i = 0; i < newDenoms.length; i++) { const dsel = newDenoms[i]; for (let j = 0; j < dsel.count; j++) { const newCoinIndex = linkSigs.length; const linkSig = await cryptoApi.signCoinLink({ coinEv: planchets[newCoinIndex].coinEv, newDenomHash: dsel.denomPubHash, oldCoinPriv: oldCoinPriv, oldCoinPub: oldCoinPub, transferPub: derived.transferPubs[norevealIndex], }); linkSigs.push(linkSig.sig); newDenomsFlat.push(dsel.denomPubHash); } } const req: ExchangeRefreshRevealRequest = { coin_evs: planchets.map((x) => x.coinEv), new_denoms_h: newDenomsFlat, transfer_privs: privs, transfer_pub: derived.transferPubs[norevealIndex], link_sigs: linkSigs, old_age_commitment: args.oldAgeCommitment?.publicKeys, }; return req; } async function refreshReveal( wex: WalletExecutionContext, refreshGroupId: string, coinIndex: number, ): Promise { logger.trace( `doing refresh reveal for ${refreshGroupId} (old coin ${coinIndex})`, ); const ctx = new RefreshTransactionContext(wex, refreshGroupId); const d = await wex.db.runReadOnlyTx( { storeNames: [ "refreshGroups", "refreshSessions", "coins", "denominations", ], }, async (tx) => { const refreshGroup = await tx.refreshGroups.get(refreshGroupId); if (!refreshGroup) { return; } const refreshSession = await tx.refreshSessions.get([ refreshGroupId, coinIndex, ]); if (!refreshSession) { return; } const norevealIndex = refreshSession.norevealIndex; if (norevealIndex === undefined) { throw Error("can't reveal without melting first"); } const oldCoin = await tx.coins.get(refreshGroup.oldCoinPubs[coinIndex]); checkDbInvariant(!!oldCoin, "melt coin doesn't exist"); const oldDenom = await getDenomInfo( wex, tx, oldCoin.exchangeBaseUrl, oldCoin.denomPubHash, ); checkDbInvariant( !!oldDenom, "denomination for melted coin doesn't exist", ); const newCoinDenoms: RefreshNewDenomInfo[] = []; for (const dh of refreshSession.newDenoms) { const newDenom = await getDenomInfo( wex, tx, 
oldCoin.exchangeBaseUrl, dh.denomPubHash, ); checkDbInvariant( !!newDenom, "new denomination for refresh not in database", ); newCoinDenoms.push({ count: dh.count, denomPub: newDenom.denomPub, denomPubHash: newDenom.denomPubHash, feeWithdraw: newDenom.feeWithdraw, value: Amounts.stringify(newDenom.value), }); } return { oldCoin, oldDenom, newCoinDenoms, refreshSession, refreshGroup, norevealIndex, }; }, ); if (!d) { return; } const { oldCoin, oldDenom, newCoinDenoms, refreshSession, refreshGroup, norevealIndex, } = d; let exchangeProtocolVersion: ExchangeProtocolVersion; switch (d.oldDenom.denomPub.cipher) { case DenomKeyType.Rsa: { exchangeProtocolVersion = ExchangeProtocolVersion.V12; break; } default: throw Error("unsupported key type"); } const derived = await wex.cryptoApi.deriveRefreshSession({ exchangeProtocolVersion, kappa: 3, meltCoinDenomPubHash: oldCoin.denomPubHash, meltCoinPriv: oldCoin.coinPriv, meltCoinPub: oldCoin.coinPub, feeRefresh: Amounts.parseOrThrow(oldDenom.feeRefresh), newCoinDenoms, meltCoinMaxAge: oldCoin.maxAge, meltCoinAgeCommitmentProof: oldCoin.ageCommitmentProof, sessionSecretSeed: refreshSession.sessionSecretSeed, }); const reqUrl = new URL( `refreshes/${derived.hash}/reveal`, oldCoin.exchangeBaseUrl, ); const req = await assembleRefreshRevealRequest({ cryptoApi: wex.cryptoApi, derived, newDenoms: newCoinDenoms, norevealIndex: norevealIndex, oldCoinPriv: oldCoin.coinPriv, oldCoinPub: oldCoin.coinPub, oldAgeCommitment: oldCoin.ageCommitmentProof?.commitment, }); const resp = await wex.ws.runSequentialized( [EXCHANGE_COINS_LOCK], async () => { return await wex.http.fetch(reqUrl.href, { body: req, method: "POST", timeout: getRefreshRequestTimeout(refreshGroup), cancellationToken: wex.cancellationToken, }); }, ); switch (resp.status) { case HttpStatusCode.Ok: break; case HttpStatusCode.Conflict: case HttpStatusCode.Gone: { const errDetail = await readTalerErrorResponse(resp); await handleRefreshRevealError(ctx, coinIndex, errDetail); 
return; } default: { const errDetail = await readTalerErrorResponse(resp); throwUnexpectedRequestError(resp, errDetail); } } const reveal = await readSuccessResponseJsonOrThrow( resp, codecForExchangeRevealResponse(), ); const coins: CoinRecord[] = []; const transactionId = constructTransactionIdentifier({ tag: TransactionType.Refresh, refreshGroupId, }); for (let i = 0; i < refreshSession.newDenoms.length; i++) { const ncd = newCoinDenoms[i]; for (let j = 0; j < refreshSession.newDenoms[i].count; j++) { const newCoinIndex = coins.length; const pc = derived.planchetsForGammas[norevealIndex][newCoinIndex]; if (ncd.denomPub.cipher !== DenomKeyType.Rsa) { throw Error("cipher unsupported"); } const evSig = reveal.ev_sigs[newCoinIndex].ev_sig; const denomSig = await wex.cryptoApi.unblindDenominationSignature({ planchet: { blindingKey: pc.blindingKey, denomPub: ncd.denomPub, }, evSig, }); const coin: CoinRecord = { blindingKey: pc.blindingKey, coinPriv: pc.coinPriv, coinPub: pc.coinPub, denomPubHash: ncd.denomPubHash, denomSig, exchangeBaseUrl: oldCoin.exchangeBaseUrl, status: CoinStatus.Fresh, coinSource: { type: CoinSourceType.Refresh, refreshGroupId, oldCoinPub: refreshGroup.oldCoinPubs[coinIndex], }, sourceTransactionId: transactionId, coinEvHash: pc.coinEvHash, maxAge: pc.maxAge, ageCommitmentProof: pc.ageCommitmentProof, spendAllocation: undefined, }; coins.push(coin); } } await wex.db.runReadWriteTx( { storeNames: [ "coins", "denominations", "coinAvailability", "refreshGroups", "refreshSessions", ], }, async (tx) => { const rg = await tx.refreshGroups.get(refreshGroupId); if (!rg) { logger.warn("no refresh session found"); return; } if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) { return; } const rs = await tx.refreshSessions.get([refreshGroupId, coinIndex]); if (!rs) { return; } rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished; for (const coin of coins) { const existingCoin = await tx.coins.get(coin.coinPub); if (existingCoin) { continue; 
} await tx.coins.add(coin); const denomInfo = await getDenomInfo( wex, tx, coin.exchangeBaseUrl, coin.denomPubHash, ); checkDbInvariant(!!denomInfo); const car = await getCoinAvailabilityForDenom( wex, tx, denomInfo, coin.maxAge, ); checkDbInvariant( car.pendingRefreshOutputCount != null && car.pendingRefreshOutputCount > 0, ); car.pendingRefreshOutputCount--; car.freshCoinCount++; await tx.coinAvailability.put(car); } await tx.refreshGroups.put(rg); }, ); logger.trace("refresh finished (end of reveal)"); }

/**
 * Record a reveal failure for a single coin of a refresh group.
 *
 * Inside one read-write transaction the coin's status is switched from
 * Pending to Failed, the error is stored as `lastError` on the coin's
 * refresh session, and `destroyRefreshSession` is invoked for it.
 *
 * Does nothing when the refresh group no longer exists, is already
 * finished, or when the coin is not in the Pending state anymore.
 *
 * @param ctx transaction context identifying the refresh group
 * @param coinIndex index of the coin within the refresh group
 * @param errDetails error to persist on the refresh session
 * @throws Error when the refresh session record for the coin is missing
 */
async function handleRefreshRevealError(
  ctx: RefreshTransactionContext,
  coinIndex: number,
  errDetails: TalerErrorDetail,
): Promise<void> {
  await ctx.wex.db.runReadWriteTx(
    {
      storeNames: [
        "refreshGroups",
        "refreshSessions",
        "coins",
        "denominations",
        "coinAvailability",
      ],
    },
    async (tx) => {
      const rg = await tx.refreshGroups.get(ctx.refreshGroupId);
      if (!rg) {
        return;
      }
      if (rg.timestampFinished) {
        return;
      }
      if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) {
        return;
      }
      rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Failed;
      const refreshSession = await tx.refreshSessions.get([
        ctx.refreshGroupId,
        coinIndex,
      ]);
      if (!refreshSession) {
        throw Error("db invariant failed: missing refresh session in database");
      }
      refreshSession.lastError = errDetails;
      await destroyRefreshSession(ctx.wex, tx, rg, refreshSession);
      await tx.refreshGroups.put(rg);
      await tx.refreshSessions.put(refreshSession);
    },
  );
}

/**
 * Task handler for a refresh group.
 *
 * Processes the refresh sessions of all old coins in parallel and
 * afterwards, if every coin is either Finished or Failed, moves the
 * whole group to Finished (no failures) or Failed (at least one failure).
 *
 * @param wex wallet execution context
 * @param refreshGroupId ID of the refresh group to process
 * @returns
 *  - `finished` when the group does not exist, is already finished, or
 *    the crypto API was stopped while processing,
 *  - `progress` when the group made a state transition,
 *  - an error result (carrying at most five error details) when at least
 *    one session threw and the group did not transition,
 *  - `backoff` otherwise.
 * @throws Error when dev mode is active and refreshes are blocked by a
 *   dev experiment
 */
export async function processRefreshGroup(
  wex: WalletExecutionContext,
  refreshGroupId: string,
): Promise<TaskRunResult> {
  logger.trace(`processing refresh group ${refreshGroupId}`);

  const refreshGroup = await wex.db.runReadOnlyTx(
    { storeNames: ["refreshGroups"] },
    async (tx) => tx.refreshGroups.get(refreshGroupId),
  );
  if (!refreshGroup) {
    return TaskRunResult.finished();
  }
  if (refreshGroup.timestampFinished) {
    return TaskRunResult.finished();
  }

  if (
    wex.ws.config.testing.devModeActive &&
    wex.ws.devExperimentState.blockRefreshes
  ) {
    throw Error("refresh blocked");
  }

  // Process refresh sessions of the group in parallel.
  logger.trace(
    `processing refresh sessions for ${refreshGroup.oldCoinPubs.length} old coins`,
  );
  const errors: TalerErrorDetail[] = [];
  let inShutdown = false;
  const ps = refreshGroup.oldCoinPubs.map((_coinPub, i) =>
    // Every session swallows its own failure, so that one failing coin
    // does not prevent the other sessions from being awaited.
    processRefreshSession(wex, refreshGroupId, i).catch((e) => {
      if (e instanceof CryptoApiStoppedError) {
        inShutdown = true;
        logger.info(
          "crypto API stopped while processing refresh group, probably the wallet is currently shutting down.",
        );
        return;
      }
      if (e instanceof TalerError) {
        logger.warn("process refresh session got exception (TalerError)");
        logger.warn(`exc ${e}`);
        logger.warn(`exc stack ${e.stack}`);
        logger.warn(`error detail: ${j2s(e.errorDetail)}`);
      } else {
        logger.warn("process refresh session got exception");
        logger.warn(`exc ${e}`);
        logger.warn(`exc stack ${e.stack}`);
      }
      errors.push(getErrorDetailFromException(e));
    }),
  );
  await Promise.all(ps);

  if (inShutdown) {
    return TaskRunResult.finished();
  }

  const ctx = new RefreshTransactionContext(wex, refreshGroupId);

  // We've processed all refresh session and can now update the
  // status of the whole refresh group.
  const transitionInfo = await wex.db.runReadWriteTx(
    { storeNames: ["coins", "coinAvailability", "refreshGroups"] },
    async (tx) => {
      const rg = await tx.refreshGroups.get(refreshGroupId);
      if (!rg) {
        return undefined;
      }
      // Only a pending group may be moved to a final state here.
      if (rg.operationStatus !== RefreshOperationStatus.Pending) {
        return undefined;
      }
      const oldTxState = computeRefreshTransactionState(rg);
      const allFinal = fnutil.all(
        rg.statusPerCoin,
        (x) =>
          x === RefreshCoinStatus.Finished || x === RefreshCoinStatus.Failed,
      );
      const anyFailed = fnutil.any(
        rg.statusPerCoin,
        (x) => x === RefreshCoinStatus.Failed,
      );
      if (!allFinal) {
        return undefined;
      }
      rg.timestampFinished = timestampPreciseToDb(TalerPreciseTimestamp.now());
      rg.operationStatus = anyFailed
        ? RefreshOperationStatus.Failed
        : RefreshOperationStatus.Finished;
      await makeCoinsVisible(wex, tx, ctx.transactionId);
      await tx.refreshGroups.put(rg);
      const newTxState = computeRefreshTransactionState(rg);
      return {
        oldTxState,
        newTxState,
      };
    },
  );

  if (transitionInfo) {
    notifyTransition(wex, ctx.transactionId, transitionInfo);
    return TaskRunResult.progress();
  }

  if (errors.length > 0) {
    return {
      type: TaskRunResultType.Error,
      errorDetail: makeErrorDetail(
        TalerErrorCode.WALLET_REFRESH_GROUP_INCOMPLETE,
        {
          numErrors: errors.length,
          // Report only the first few errors.
          errors: errors.slice(0, 5),
        },
      ),
    };
  }

  return TaskRunResult.backoff();
}

/**
 * Process the refresh session of one coin of a refresh group.
 *
 * Returns without doing anything when the group does not exist, the coin
 * is already Finished, or no refresh session exists for the coin.
 * Otherwise the coin is melted first (only when the session has no
 * `norevealIndex` yet) and then revealed.
 *
 * @param wex wallet execution context
 * @param refreshGroupId ID of the refresh group
 * @param coinIndex index of the coin within the refresh group
 */
async function processRefreshSession(
  wex: WalletExecutionContext,
  refreshGroupId: string,
  coinIndex: number,
): Promise<void> {
  logger.trace(
    `processing refresh session for coin ${coinIndex} of group ${refreshGroupId}`,
  );
  const { refreshGroup, refreshSession } = await wex.db.runReadOnlyTx(
    { storeNames: ["refreshGroups", "refreshSessions"] },
    async (tx) => {
      const rg = await tx.refreshGroups.get(refreshGroupId);
      const rs = await tx.refreshSessions.get([refreshGroupId, coinIndex]);
      return {
        refreshGroup: rg,
        refreshSession: rs,
      };
    },
  );
  if (!refreshGroup) {
    return;
  }
  if (refreshGroup.statusPerCoin[coinIndex] === RefreshCoinStatus.Finished) {
    return;
  }
  if (!refreshSession) {
    // No refresh session for that coin.
    return;
  }
  if (refreshSession.norevealIndex === undefined) {
    await refreshMelt(wex, refreshGroupId, coinIndex);
  }
  await refreshReveal(wex, refreshGroupId, coinIndex);
}

/**
 * Result of estimating the output of a refresh.
 */
export interface RefreshOutputInfo {
  /** Estimated output amount for each refreshed coin, in request order. */
  outputPerCoin: AmountJson[];
  /** Accumulated information, keyed by exchange base URL. */
  perExchangeInfo: Record<string, RefreshGroupPerExchangeInfo>;
}

/**
 * Estimate the output of refreshing the given coins.
 *
 * For every coin the estimated output is the requested refresh amount
 * minus the cost reported by `getTotalRefreshCost`.  The outputs are also
 * summed up per exchange.
 *
 * @param wex wallet execution context
 * @param tx database transaction used to look up coins and denominations
 * @param currency currency of the refresh (not read by this function)
 * @param oldCoinPubs coins and amounts to refresh
 * @returns per-coin and per-exchange output estimates
 */
export async function calculateRefreshOutput(
  wex: WalletExecutionContext,
  tx: WalletDbReadOnlyTransaction<
    ["denominations", "coins", "refreshGroups", "coinAvailability"]
  >,
  currency: string,
  oldCoinPubs: CoinRefreshRequest[],
): Promise<RefreshOutputInfo> {
  const estimatedOutputPerCoin: AmountJson[] = [];
  const infoPerExchange: Record<string, RefreshGroupPerExchangeInfo> = {};

  for (const ocp of oldCoinPubs) {
    const coin = await tx.coins.get(ocp.coinPub);
    checkDbInvariant(!!coin, "coin must be in database");
    const denom = await getDenomInfo(
      wex,
      tx,
      coin.exchangeBaseUrl,
      coin.denomPubHash,
    );
    checkDbInvariant(
      !!denom,
      "denomination for existing coin must be in database",
    );
    const refreshAmount = ocp.amount;
    const cost = await getTotalRefreshCost(
      wex,
      tx,
      denom,
      Amounts.parseOrThrow(refreshAmount),
    );
    const output = Amounts.sub(refreshAmount, cost).amount;
    let exchInfo = infoPerExchange[coin.exchangeBaseUrl];
    if (!exchInfo) {
      // First coin of this exchange: start the sum at zero.
      infoPerExchange[coin.exchangeBaseUrl] = exchInfo = {
        outputEffective: Amounts.stringify(Amounts.zeroOfAmount(cost)),
      };
    }
    exchInfo.outputEffective = Amounts.stringify(
      Amounts.add(exchInfo.outputEffective, output).amount,
    );
    estimatedOutputPerCoin.push(output);
  }

  return {
    outputPerCoin: estimatedOutputPerCoin,
    perExchangeInfo: infoPerExchange,
  };
}

/**
 * Update the old coins of a new refresh group in the database.
 *
 * Fresh and suspended coins become dormant (for fresh coins the coin
 * availability counter is decremented as well).  Coins that are already
 * dormant or in the denom-loss state keep their status.  Coins without a
 * spend allocation get one that points to the refresh transaction.
 *
 * @param wex wallet execution context
 * @param tx database transaction to apply the changes in
 * @param oldCoinPubs coins and amounts that are being refreshed
 * @param refreshGroupId ID of the refresh group the coins belong to
 */
async function applyRefreshToOldCoins(
  wex: WalletExecutionContext,
  tx: WalletDbReadWriteTransaction<
    ["denominations", "coins", "refreshGroups", "coinAvailability"]
  >,
  oldCoinPubs: CoinRefreshRequest[],
  refreshGroupId: string,
): Promise<void> {
  for (const ocp of oldCoinPubs) {
    const coin = await tx.coins.get(ocp.coinPub);
    checkDbInvariant(!!coin, "coin must be in database");
    const denom = await getDenomInfo(
      wex,
      tx,
      coin.exchangeBaseUrl,
      coin.denomPubHash,
    );
    checkDbInvariant(
      !!denom,
      "denomination for existing coin must be in database",
    );
    switch (coin.status) {
      case CoinStatus.Dormant:
        break;
      case CoinStatus.Fresh: {
        coin.status = CoinStatus.Dormant;
        const coinAv = await tx.coinAvailability.get([
          coin.exchangeBaseUrl,
          coin.denomPubHash,
          coin.maxAge,
        ]);
        checkDbInvariant(!!coinAv);
        checkDbInvariant(coinAv.freshCoinCount > 0);
        coinAv.freshCoinCount--;
        await tx.coinAvailability.put(coinAv);
        break;
      }
      case CoinStatus.FreshSuspended: {
        // For suspended coins, we don't have to adjust coin
        // availability, as they are not counted as available.
        coin.status = CoinStatus.Dormant;
        break;
      }
      case CoinStatus.DenomLoss:
        break;
      default:
        assertUnreachable(coin.status);
    }
    if (!coin.spendAllocation) {
      coin.spendAllocation = {
        amount: Amounts.stringify(ocp.amount),
        // id: `txn:refresh:${refreshGroupId}`,
        id: constructTransactionIdentifier({
          tag: TransactionType.Refresh,
          refreshGroupId,
        }),
      };
    }
    await tx.coins.put(coin);
  }
}

/**
 * Result of creating a refresh group.
 */
export interface CreateRefreshGroupResult {
  /** ID of the newly created refresh group. */
  refreshGroupId: string;
  /**
   * Notifications describing the creation of the refresh transaction.
   * They are only returned here, not emitted by `createRefreshGroup`.
   */
  notifications: WalletNotification[];
}

/**
 * Create a refresh group for a list of coins.
 *
 * Refreshes the remaining amount on the coin, effectively capturing the remaining
 * value in the refresh group.
 *
 * The caller must also ensure that the coins that should be refreshed exist
 * in the current database transaction.
 *
 * A group created for zero coins is stored as Finished right away.
 *
 * @param wex wallet execution context
 * @param tx database transaction the group is created in
 * @param currency currency of the refresh group
 * @param oldCoinPubs coins and amounts to refresh
 * @param refreshReason reason recorded on the refresh group
 * @param originatingTransactionId transaction that caused the refresh, if any
 * @returns the new group's ID and the notifications for its creation
 */
export async function createRefreshGroup(
  wex: WalletExecutionContext,
  tx: WalletDbReadWriteTransaction<
    [
      "denominations",
      "coins",
      "refreshGroups",
      "refreshSessions",
      "coinAvailability",
    ]
  >,
  currency: string,
  oldCoinPubs: CoinRefreshRequest[],
  refreshReason: RefreshReason,
  originatingTransactionId: string | undefined,
): Promise<CreateRefreshGroupResult> {
  // FIXME: Check that involved exchanges are reasonably up-to-date.
  // Otherwise, error out.

  const refreshGroupId = encodeCrock(getRandomBytes(32));

  const outInfo = await calculateRefreshOutput(wex, tx, currency, oldCoinPubs);
  const estimatedOutputPerCoin = outInfo.outputPerCoin;

  await applyRefreshToOldCoins(wex, tx, oldCoinPubs, refreshGroupId);

  const refreshGroup: RefreshGroupRecord = {
    operationStatus: RefreshOperationStatus.Pending,
    currency,
    timestampFinished: undefined,
    statusPerCoin: oldCoinPubs.map(() => RefreshCoinStatus.Pending),
    oldCoinPubs: oldCoinPubs.map((x) => x.coinPub),
    originatingTransactionId,
    reason: refreshReason,
    refreshGroupId,
    inputPerCoin: oldCoinPubs.map((x) => x.amount),
    expectedOutputPerCoin: estimatedOutputPerCoin.map((x) =>
      Amounts.stringify(x),
    ),
    infoPerExchange: outInfo.perExchangeInfo,
    timestampCreated: timestampPreciseToDb(TalerPreciseTimestamp.now()),
  };

  if (oldCoinPubs.length === 0) {
    logger.warn("created refresh group with zero coins");
    refreshGroup.timestampFinished = timestampPreciseToDb(
      TalerPreciseTimestamp.now(),
    );
    refreshGroup.operationStatus = RefreshOperationStatus.Finished;
  }

  for (let i = 0; i < oldCoinPubs.length; i++) {
    await initRefreshSession(wex, tx, refreshGroup, i);
  }

  await tx.refreshGroups.put(refreshGroup);

  const newTxState = computeRefreshTransactionState(refreshGroup);

  logger.trace(`created refresh group ${refreshGroupId}`);

  const ctx = new RefreshTransactionContext(wex, refreshGroupId);

  // Shepherd the task.
  // If the current transaction fails to commit the refresh
  // group to the DB, the shepherd will give up.
  wex.taskScheduler.startShepherdTask(ctx.taskId);

  return {
    refreshGroupId,
    notifications: [
      {
        type: NotificationType.TransactionStateTransition,
        transactionId: ctx.transactionId,
        oldTxState: {
          major: TransactionMajorState.None,
        },
        newTxState,
      },
    ],
  };
}

/**
 * Map the operation status of a refresh group to the state of the
 * refresh transaction (Finished is reported as Done, the other
 * statuses map to the major state of the same name).
 *
 * @param rg refresh group record
 * @returns transaction state for the refresh group
 */
export function computeRefreshTransactionState(
  rg: RefreshGroupRecord,
): TransactionState {
  switch (rg.operationStatus) {
    case RefreshOperationStatus.Finished:
      return {
        major: TransactionMajorState.Done,
      };
    case RefreshOperationStatus.Failed:
      return {
        major: TransactionMajorState.Failed,
      };
    case RefreshOperationStatus.Pending:
      return {
        major: TransactionMajorState.Pending,
      };
    case RefreshOperationStatus.Suspended:
      return {
        major: TransactionMajorState.Suspended,
      };
  }
}

/**
 * Compute the actions that are offered for a refresh transaction,
 * based on the operation status of its refresh group.
 *
 * @param rg refresh group record
 * @returns actions available in the group's current status
 */
export function computeRefreshTransactionActions(
  rg: RefreshGroupRecord,
): TransactionAction[] {
  switch (rg.operationStatus) {
    case RefreshOperationStatus.Finished:
      return [TransactionAction.Delete];
    case RefreshOperationStatus.Failed:
      return [TransactionAction.Delete];
    case RefreshOperationStatus.Pending:
      return [
        TransactionAction.Retry,
        TransactionAction.Suspend,
        TransactionAction.Fail,
      ];
    case RefreshOperationStatus.Suspended:
      return [TransactionAction.Resume, TransactionAction.Fail];
  }
}

/**
 * Look up the refresh transactions that were caused by a transaction.
 *
 * @param wex wallet execution context
 * @param transactionId ID of the originating transaction
 * @returns transaction identifiers of all refresh groups whose
 *   originating transaction is `transactionId`
 */
export function getRefreshesForTransaction(
  wex: WalletExecutionContext,
  transactionId: string,
): Promise<TransactionIdStr[]> {
  return wex.db.runReadOnlyTx({ storeNames: ["refreshGroups"] }, async (tx) => {
    const groups =
      await tx.refreshGroups.indexes.byOriginatingTransactionId.getAll(
        transactionId,
      );
    return groups.map((x) =>
      constructTransactionIdentifier({
        tag: TransactionType.Refresh,
        refreshGroupId: x.refreshGroupId,
      }),
    );
  });
}

/**
 * Result of a forced refresh.
 */
export interface ForceRefreshResult {
  /** ID of the refresh group that was created. */
  refreshGroupId: string;
}

/**
 * Create a refresh group with reason `Manual` for the coins named in
 * the request and emit the resulting notifications.
 *
 * A coin spec without an amount is refreshed with the value of the
 * coin's denomination.  The currency of the group is taken from the
 * first coin's amount.
 *
 * @param wex wallet execution context
 * @param req coins (and optionally amounts) to refresh
 * @returns the ID of the created refresh group
 * @throws Error when the request names no coins or a coin is not in
 *   the database
 */
export async function forceRefresh(
  wex: WalletExecutionContext,
  req: ForceRefreshRequest,
): Promise<ForceRefreshResult> {
  if (req.refreshCoinSpecs.length === 0) {
    throw Error("refusing to create empty refresh group");
  }
  const res = await wex.db.runReadWriteTx(
    {
      storeNames: [
        "refreshGroups",
        "coinAvailability",
        "refreshSessions",
        "denominations",
        "coins",
      ],
    },
    async (tx) => {
      const coinPubs: CoinRefreshRequest[] = [];
      for (const c of req.refreshCoinSpecs) {
        const coin = await tx.coins.get(c.coinPub);
        if (!coin) {
          // Interpolate the key itself; the spec object would be
          // rendered as "[object Object]".
          throw Error(`coin (pubkey ${c.coinPub}) not found`);
        }
        const denom = await getDenomInfo(
          wex,
          tx,
          coin.exchangeBaseUrl,
          coin.denomPubHash,
        );
        checkDbInvariant(!!denom);
        coinPubs.push({
          coinPub: c.coinPub,
          amount: c.amount ?? denom.value,
        });
      }
      return await createRefreshGroup(
        wex,
        tx,
        // Non-empty: the request was checked above and every spec
        // either pushed an entry or threw.
        Amounts.currencyOf(coinPubs[0].amount),
        coinPubs,
        RefreshReason.Manual,
        undefined,
      );
    },
  );

  for (const notif of res.notifications) {
    wex.ws.notify(notif);
  }

  return {
    refreshGroupId: res.refreshGroupId,
  };
}

/**
 * Wait until a refresh operation is final.
 *
 * Resolves once the refresh group is Failed, Finished, or no longer in
 * the database.  Rejects when waiting fails, e.g. because the execution
 * context was cancelled.  The notification listener and the cancellation
 * hook are removed in both cases.
 *
 * @param wex wallet execution context
 * @param refreshGroupId ID of the refresh group to wait for
 */
export async function waitRefreshFinal(
  wex: WalletExecutionContext,
  refreshGroupId: string,
): Promise<void> {
  const ctx = new RefreshTransactionContext(wex, refreshGroupId);
  wex.taskScheduler.startShepherdTask(ctx.taskId);

  // FIXME: Clean up using the new JS "using" / Symbol.dispose syntax.
  const refreshNotifFlag = new AsyncFlag();
  // Raise refreshNotifFlag whenever we get a notification
  // about our refresh.
  const cancelNotif = wex.ws.addNotificationListener((notif) => {
    if (
      notif.type === NotificationType.TransactionStateTransition &&
      notif.transactionId === ctx.transactionId
    ) {
      refreshNotifFlag.raise();
    }
  });
  const unregisterOnCancelled = wex.cancellationToken.onCancelled(() => {
    cancelNotif();
    // Wake up the waiter so that it can observe the cancellation.
    refreshNotifFlag.raise();
  });

  try {
    await internalWaitRefreshFinal(ctx, refreshNotifFlag);
  } finally {
    // Clean up on success as well as on failure, and let errors
    // propagate to the caller instead of swallowing them.
    unregisterOnCancelled();
    cancelNotif();
  }
}

/**
 * Poll the refresh group until it is in a final state, sleeping on
 * `flag` between checks.
 *
 * @param ctx transaction context identifying the refresh group
 * @param flag flag that is raised when the refresh group may have changed
 * @throws Error with message "cancelled" when the execution context is
 *   cancelled
 */
async function internalWaitRefreshFinal(
  ctx: RefreshTransactionContext,
  flag: AsyncFlag,
): Promise<void> {
  while (true) {
    if (ctx.wex.cancellationToken.isCancelled) {
      throw Error("cancelled");
    }

    // Check if refresh is final
    const res = await ctx.wex.db.runReadOnlyTx(
      { storeNames: ["refreshGroups", "operationRetries"] },
      async (tx) => {
        return {
          rg: await tx.refreshGroups.get(ctx.refreshGroupId),
        };
      },
    );
    const { rg } = res;
    if (!rg) {
      // Must've been deleted, we consider that final.
      return;
    }
    switch (rg.operationStatus) {
      case RefreshOperationStatus.Failed:
      case RefreshOperationStatus.Finished:
        // Transaction is final
        return;
      case RefreshOperationStatus.Pending:
      case RefreshOperationStatus.Suspended:
        break;
    }

    // Wait for the next transition
    await flag.wait();
    flag.reset();
  }
}