diff options
Diffstat (limited to 'packages/taler-wallet-core/src/refresh.ts')
-rw-r--r-- | packages/taler-wallet-core/src/refresh.ts | 1883 |
1 files changed, 1883 insertions, 0 deletions
diff --git a/packages/taler-wallet-core/src/refresh.ts b/packages/taler-wallet-core/src/refresh.ts new file mode 100644 index 000000000..7800967e6 --- /dev/null +++ b/packages/taler-wallet-core/src/refresh.ts @@ -0,0 +1,1883 @@ +/* + This file is part of GNU Taler + (C) 2019-2024 Taler Systems S.A. + + GNU Taler is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + */ + +/** + * @fileoverview + * Implementation of the refresh transaction. + */ + +/** + * Imports. + */ +import { + AgeCommitment, + AgeRestriction, + AmountJson, + Amounts, + amountToPretty, + assertUnreachable, + AsyncFlag, + checkDbInvariant, + codecForCoinHistoryResponse, + codecForExchangeMeltResponse, + codecForExchangeRevealResponse, + CoinPublicKeyString, + CoinRefreshRequest, + CoinStatus, + DenominationInfo, + DenomKeyType, + Duration, + encodeCrock, + ExchangeMeltRequest, + ExchangeProtocolVersion, + ExchangeRefreshRevealRequest, + fnutil, + ForceRefreshRequest, + getErrorDetailFromException, + getRandomBytes, + HashCodeString, + HttpStatusCode, + j2s, + Logger, + makeErrorDetail, + NotificationType, + RefreshReason, + TalerError, + TalerErrorCode, + TalerErrorDetail, + TalerPreciseTimestamp, + TransactionAction, + TransactionIdStr, + TransactionMajorState, + TransactionState, + TransactionType, + URL, + WalletNotification, +} from "@gnu-taler/taler-util"; +import { + readSuccessResponseJsonOrThrow, + readTalerErrorResponse, + throwUnexpectedRequestError, +} from "@gnu-taler/taler-util/http"; +import { + constructTaskIdentifier, + makeCoinsVisible, + PendingTaskType, + TaskIdStr, + TaskRunResult, + TaskRunResultType, + TombstoneTag, + TransactionContext, + TransitionResult, + TransitionResultType, +} from "./common.js"; +import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js"; +import { + DerivedRefreshSession, + RefreshNewDenomInfo, +} from "./crypto/cryptoTypes.js"; +import { CryptoApiStoppedError } from "./crypto/workers/crypto-dispatcher.js"; +import { + CoinAvailabilityRecord, + CoinRecord, + CoinSourceType, + DenominationRecord, + RefreshCoinStatus, + RefreshGroupPerExchangeInfo, + RefreshGroupRecord, + RefreshOperationStatus, + RefreshSessionRecord, + timestampPreciseToDb, + WalletDbReadOnlyTransaction, + WalletDbReadWriteTransaction, + WalletDbStoresArr, +} from "./db.js"; +import { selectWithdrawalDenominations } from "./denomSelection.js"; +import { + constructTransactionIdentifier, + notifyTransition, + TransitionInfo, +} from "./transactions.js"; +import { + EXCHANGE_COINS_LOCK, + getDenomInfo, + WalletExecutionContext, +} from "./wallet.js"; +import { getCandidateWithdrawalDenomsTx } from "./withdraw.js"; + +const logger = new Logger("refresh.ts"); + +/** + * Update the materialized refresh transaction based + * on the refresh group record. + */ +async function updateRefreshTransaction( + ctx: RefreshTransactionContext, + tx: WalletDbReadWriteTransaction< + [ + "refreshGroups", + "transactions", + "operationRetries", + "exchanges", + "exchangeDetails", + ] + >, +): Promise<void> {} + +export class RefreshTransactionContext implements TransactionContext { + readonly transactionId: TransactionIdStr; + readonly taskId: TaskIdStr; + + constructor( + public wex: WalletExecutionContext, + public refreshGroupId: string, + ) { + this.transactionId = constructTransactionIdentifier({ + tag: TransactionType.Refresh, + refreshGroupId, + }); + this.taskId = constructTaskIdentifier({ + tag: PendingTaskType.Refresh, + refreshGroupId, + }); + } + + /** + * Transition a withdrawal transaction. + * Extra object stores may be accessed during the transition. + */ + async transition<StoreNameArray extends WalletDbStoresArr = []>( + opts: { extraStores?: StoreNameArray; transactionLabel?: string }, + f: ( + rec: RefreshGroupRecord | undefined, + tx: WalletDbReadWriteTransaction< + [ + "refreshGroups", + "transactions", + "operationRetries", + "exchanges", + "exchangeDetails", + ...StoreNameArray, + ] + >, + ) => Promise<TransitionResult<RefreshGroupRecord>>, + ): Promise<TransitionInfo | undefined> { + const baseStores = [ + "refreshGroups" as const, + "transactions" as const, + "operationRetries" as const, + "exchanges" as const, + "exchangeDetails" as const, + ]; + let stores = opts.extraStores + ? [...baseStores, ...opts.extraStores] + : baseStores; + const transitionInfo = await this.wex.db.runReadWriteTx( + { storeNames: stores }, + async (tx) => { + const wgRec = await tx.refreshGroups.get(this.refreshGroupId); + let oldTxState: TransactionState; + if (wgRec) { + oldTxState = computeRefreshTransactionState(wgRec); + } else { + oldTxState = { + major: TransactionMajorState.None, + }; + } + const res = await f(wgRec, tx); + switch (res.type) { + case TransitionResultType.Transition: { + await tx.refreshGroups.put(res.rec); + await updateRefreshTransaction(this, tx); + const newTxState = computeRefreshTransactionState(res.rec); + return { + oldTxState, + newTxState, + }; + } + case TransitionResultType.Delete: + await tx.refreshGroups.delete(this.refreshGroupId); + await updateRefreshTransaction(this, tx); + return { + oldTxState, + newTxState: { + major: TransactionMajorState.None, + }, + }; + default: + return undefined; + } + }, + ); + notifyTransition(this.wex, this.transactionId, transitionInfo); + return transitionInfo; + } + + async deleteTransaction(): Promise<void> { + await this.transition( + { + extraStores: ["tombstones"], + }, + async (rec, tx) => { + if (!rec) { + return TransitionResult.stay(); + } + await tx.tombstones.put({ + id: TombstoneTag.DeleteRefreshGroup + ":" + this.refreshGroupId, + }); + return TransitionResult.delete(); + }, + ); + } + + async suspendTransaction(): Promise<void> { + await this.transition({}, async (rec, tx) => { + if (!rec) { + return TransitionResult.stay(); + } + switch (rec.operationStatus) { + case RefreshOperationStatus.Finished: + case RefreshOperationStatus.Suspended: + case RefreshOperationStatus.Failed: + return TransitionResult.stay(); + case RefreshOperationStatus.Pending: { + rec.operationStatus = RefreshOperationStatus.Suspended; + return TransitionResult.transition(rec); + } + default: + assertUnreachable(rec.operationStatus); + } + }); + } + + async abortTransaction(): Promise<void> { + // Refresh transactions only support fail, not abort. + throw new Error("refresh transactions cannot be aborted"); + } + + async resumeTransaction(): Promise<void> { + await this.transition({}, async (rec, tx) => { + if (!rec) { + return TransitionResult.stay(); + } + switch (rec.operationStatus) { + case RefreshOperationStatus.Finished: + case RefreshOperationStatus.Failed: + case RefreshOperationStatus.Pending: + return TransitionResult.stay(); + case RefreshOperationStatus.Suspended: { + rec.operationStatus = RefreshOperationStatus.Pending; + return TransitionResult.transition(rec); + } + default: + assertUnreachable(rec.operationStatus); + } + }); + } + + async failTransaction(): Promise<void> { + await this.transition({}, async (rec, tx) => { + if (!rec) { + return TransitionResult.stay(); + } + switch (rec.operationStatus) { + case RefreshOperationStatus.Finished: + case RefreshOperationStatus.Failed: + return TransitionResult.stay(); + case RefreshOperationStatus.Pending: + case RefreshOperationStatus.Suspended: { + rec.operationStatus = RefreshOperationStatus.Failed; + return TransitionResult.transition(rec); + } + default: + assertUnreachable(rec.operationStatus); + } + }); + } +} + +export async function getTotalRefreshCost( + wex: WalletExecutionContext, + tx: WalletDbReadOnlyTransaction<["denominations"]>, + refreshedDenom: DenominationInfo, + amountLeft: AmountJson, +): Promise<AmountJson> { + const cacheKey = `denom=${refreshedDenom.exchangeBaseUrl}/${ + refreshedDenom.denomPubHash + };left=${Amounts.stringify(amountLeft)}`; + const cacheRes = wex.ws.refreshCostCache.get(cacheKey); + if (cacheRes) { + return cacheRes; + } + const allDenoms = await getCandidateWithdrawalDenomsTx( + wex, + tx, + refreshedDenom.exchangeBaseUrl, + Amounts.currencyOf(amountLeft), + ); + const res = getTotalRefreshCostInternal( + allDenoms, + refreshedDenom, + amountLeft, + ); + wex.ws.refreshCostCache.put(cacheKey, res); + return res; +} + +/** + * Get the amount that we lose when refreshing a coin of the given denomination + * with a certain amount left. + * + * If the amount left is zero, then the refresh cost + * is also considered to be zero. If a refresh isn't possible (e.g. due to lack of + * the right denominations), then the cost is the full amount left. + * + * Considers refresh fees, withdrawal fees after refresh and amounts too small + * to refresh. + */ +export function getTotalRefreshCostInternal( + denoms: DenominationRecord[], + refreshedDenom: DenominationInfo, + amountLeft: AmountJson, +): AmountJson { + const withdrawAmount = Amounts.sub( + amountLeft, + refreshedDenom.feeRefresh, + ).amount; + const denomMap = Object.fromEntries(denoms.map((x) => [x.denomPubHash, x])); + const withdrawDenoms = selectWithdrawalDenominations( + withdrawAmount, + denoms, + false, + ); + const resultingAmount = Amounts.add( + Amounts.zeroOfCurrency(withdrawAmount.currency), + ...withdrawDenoms.selectedDenoms.map( + (d) => Amounts.mult(denomMap[d.denomPubHash].value, d.count).amount, + ), + ).amount; + const totalCost = Amounts.sub(amountLeft, resultingAmount).amount; + logger.trace( + `total refresh cost for ${amountToPretty(amountLeft)} is ${amountToPretty( + totalCost, + )}`, + ); + return totalCost; +} + +async function getCoinAvailabilityForDenom( + wex: WalletExecutionContext, + tx: WalletDbReadWriteTransaction< + ["coins", "coinAvailability", "denominations"] + >, + denom: DenominationInfo, + ageRestriction: number, +): Promise<CoinAvailabilityRecord> { + checkDbInvariant(!!denom); + let car = await tx.coinAvailability.get([ + denom.exchangeBaseUrl, + denom.denomPubHash, + ageRestriction, + ]); + if (!car) { + car = { + maxAge: ageRestriction, + value: denom.value, + currency: Amounts.currencyOf(denom.value), + denomPubHash: denom.denomPubHash, + exchangeBaseUrl: denom.exchangeBaseUrl, + freshCoinCount: 0, + visibleCoinCount: 0, + }; + } + return car; +} + +/** + * Create a refresh session for one particular coin inside a refresh group. + */ +async function initRefreshSession( + wex: WalletExecutionContext, + tx: WalletDbReadWriteTransaction< + ["refreshSessions", "coinAvailability", "coins", "denominations"] + >, + refreshGroup: RefreshGroupRecord, + coinIndex: number, +): Promise<void> { + const refreshGroupId = refreshGroup.refreshGroupId; + logger.trace( + `creating refresh session for coin ${coinIndex} in refresh group ${refreshGroupId}`, + ); + const oldCoinPub = refreshGroup.oldCoinPubs[coinIndex]; + const oldCoin = await tx.coins.get(oldCoinPub); + if (!oldCoin) { + throw Error("Can't refresh, coin not found"); + } + + const exchangeBaseUrl = oldCoin.exchangeBaseUrl; + + const sessionSecretSeed = encodeCrock(getRandomBytes(64)); + + const oldDenom = await getDenomInfo( + wex, + tx, + exchangeBaseUrl, + oldCoin.denomPubHash, + ); + + if (!oldDenom) { + throw Error("db inconsistent: denomination for coin not found"); + } + + const currency = refreshGroup.currency; + + const availableDenoms = await getCandidateWithdrawalDenomsTx( + wex, + tx, + exchangeBaseUrl, + currency, + ); + + const availableAmount = Amounts.sub( + refreshGroup.inputPerCoin[coinIndex], + oldDenom.feeRefresh, + ).amount; + + const newCoinDenoms = selectWithdrawalDenominations( + availableAmount, + availableDenoms, + wex.ws.config.testing.denomselAllowLate, + ); + + if (newCoinDenoms.selectedDenoms.length === 0) { + logger.trace( + `not refreshing, available amount ${amountToPretty( + availableAmount, + )} too small`, + ); + refreshGroup.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished; + return; + } + + for (let i = 0; i < newCoinDenoms.selectedDenoms.length; i++) { + const dph = newCoinDenoms.selectedDenoms[i].denomPubHash; + const denom = await getDenomInfo(wex, tx, oldDenom.exchangeBaseUrl, dph); + if (!denom) { + logger.error(`denom ${dph} not in DB`); + continue; + } + const car = await getCoinAvailabilityForDenom( + wex, + tx, + denom, + oldCoin.maxAge, + ); + car.pendingRefreshOutputCount = + (car.pendingRefreshOutputCount ?? 0) + + newCoinDenoms.selectedDenoms[i].count; + await tx.coinAvailability.put(car); + } + + const newSession: RefreshSessionRecord = { + coinIndex, + refreshGroupId, + norevealIndex: undefined, + sessionSecretSeed: sessionSecretSeed, + newDenoms: newCoinDenoms.selectedDenoms.map((x) => ({ + count: x.count, + denomPubHash: x.denomPubHash, + })), + amountRefreshOutput: Amounts.stringify(newCoinDenoms.totalCoinValue), + }; + await tx.refreshSessions.put(newSession); +} + +/** + * Uninitialize a refresh session. + * + * Adjust the coin availability of involved coins. + */ +async function destroyRefreshSession( + wex: WalletExecutionContext, + tx: WalletDbReadWriteTransaction< + ["denominations", "coinAvailability", "coins"] + >, + refreshGroup: RefreshGroupRecord, + refreshSession: RefreshSessionRecord, +): Promise<void> { + for (let i = 0; i < refreshSession.newDenoms.length; i++) { + const oldCoin = await tx.coins.get( + refreshGroup.oldCoinPubs[refreshSession.coinIndex], + ); + if (!oldCoin) { + continue; + } + const dph = refreshSession.newDenoms[i].denomPubHash; + const denom = await getDenomInfo(wex, tx, oldCoin.exchangeBaseUrl, dph); + if (!denom) { + logger.error(`denom ${dph} not in DB`); + continue; + } + const car = await getCoinAvailabilityForDenom( + wex, + tx, + denom, + oldCoin.maxAge, + ); + checkDbInvariant(car.pendingRefreshOutputCount != null); + car.pendingRefreshOutputCount = + car.pendingRefreshOutputCount - refreshSession.newDenoms[i].count; + await tx.coinAvailability.put(car); + } +} + +function getRefreshRequestTimeout(rg: RefreshGroupRecord): Duration { + return Duration.fromSpec({ + seconds: 5, + }); +} + +/** + * Run the melt step of a refresh session. + * + * If the melt step succeeds or fails permanently, + * the status in the refresh group is updated. + * + * When a transient error occurs, an exception is thrown. + */ +async function refreshMelt( + wex: WalletExecutionContext, + refreshGroupId: string, + coinIndex: number, +): Promise<void> { + const ctx = new RefreshTransactionContext(wex, refreshGroupId); + const d = await wex.db.runReadWriteTx( + { + storeNames: [ + "refreshGroups", + "refreshSessions", + "coins", + "denominations", + ], + }, + async (tx) => { + const refreshGroup = await tx.refreshGroups.get(refreshGroupId); + if (!refreshGroup) { + return; + } + const refreshSession = await tx.refreshSessions.get([ + refreshGroupId, + coinIndex, + ]); + if (!refreshSession) { + return; + } + if (refreshSession.norevealIndex !== undefined) { + return; + } + + const oldCoin = await tx.coins.get(refreshGroup.oldCoinPubs[coinIndex]); + checkDbInvariant(!!oldCoin, "melt coin doesn't exist"); + const oldDenom = await getDenomInfo( + wex, + tx, + oldCoin.exchangeBaseUrl, + oldCoin.denomPubHash, + ); + checkDbInvariant( + !!oldDenom, + "denomination for melted coin doesn't exist", + ); + + const newCoinDenoms: RefreshNewDenomInfo[] = []; + + for (const dh of refreshSession.newDenoms) { + const newDenom = await getDenomInfo( + wex, + tx, + oldCoin.exchangeBaseUrl, + dh.denomPubHash, + ); + checkDbInvariant( + !!newDenom, + "new denomination for refresh not in database", + ); + newCoinDenoms.push({ + count: dh.count, + denomPub: newDenom.denomPub, + denomPubHash: newDenom.denomPubHash, + feeWithdraw: newDenom.feeWithdraw, + value: Amounts.stringify(newDenom.value), + }); + } + return { newCoinDenoms, oldCoin, oldDenom, refreshGroup, refreshSession }; + }, + ); + + if (!d) { + return; + } + + const { newCoinDenoms, oldCoin, oldDenom, refreshGroup, refreshSession } = d; + + let exchangeProtocolVersion: ExchangeProtocolVersion; + switch (d.oldDenom.denomPub.cipher) { + case DenomKeyType.Rsa: { + exchangeProtocolVersion = ExchangeProtocolVersion.V12; + break; + } + default: + throw Error("unsupported key type"); + } + + const derived = await wex.cryptoApi.deriveRefreshSession({ + exchangeProtocolVersion, + kappa: 3, + meltCoinDenomPubHash: oldCoin.denomPubHash, + meltCoinPriv: oldCoin.coinPriv, + meltCoinPub: oldCoin.coinPub, + feeRefresh: Amounts.parseOrThrow(oldDenom.feeRefresh), + meltCoinMaxAge: oldCoin.maxAge, + meltCoinAgeCommitmentProof: oldCoin.ageCommitmentProof, + newCoinDenoms, + sessionSecretSeed: refreshSession.sessionSecretSeed, + }); + + const reqUrl = new URL( + `coins/${oldCoin.coinPub}/melt`, + oldCoin.exchangeBaseUrl, + ); + + let maybeAch: HashCodeString | undefined; + if (oldCoin.ageCommitmentProof) { + maybeAch = AgeRestriction.hashCommitment( + oldCoin.ageCommitmentProof.commitment, + ); + } + + const meltReqBody: ExchangeMeltRequest = { + coin_pub: oldCoin.coinPub, + confirm_sig: derived.confirmSig, + denom_pub_hash: oldCoin.denomPubHash, + denom_sig: oldCoin.denomSig, + rc: derived.hash, + value_with_fee: Amounts.stringify(derived.meltValueWithFee), + age_commitment_hash: maybeAch, + }; + + const resp = await wex.ws.runSequentialized( + [EXCHANGE_COINS_LOCK], + async () => { + return await wex.http.fetch(reqUrl.href, { + method: "POST", + body: meltReqBody, + timeout: getRefreshRequestTimeout(refreshGroup), + cancellationToken: wex.cancellationToken, + }); + }, + ); + + switch (resp.status) { + case HttpStatusCode.NotFound: { + const errDetail = await readTalerErrorResponse(resp); + await handleRefreshMeltNotFound(ctx, coinIndex, errDetail); + return; + } + case HttpStatusCode.Gone: { + const errDetail = await readTalerErrorResponse(resp); + await handleRefreshMeltGone(ctx, coinIndex, errDetail); + return; + } + case HttpStatusCode.Conflict: { + const errDetail = await readTalerErrorResponse(resp); + await handleRefreshMeltConflict( + ctx, + coinIndex, + errDetail, + derived, + oldCoin, + ); + return; + } + case HttpStatusCode.Ok: + break; + default: { + const errDetail = await readTalerErrorResponse(resp); + throwUnexpectedRequestError(resp, errDetail); + } + } + + const meltResponse = await readSuccessResponseJsonOrThrow( + resp, + codecForExchangeMeltResponse(), + ); + + const norevealIndex = meltResponse.noreveal_index; + + refreshSession.norevealIndex = norevealIndex; + + await wex.db.runReadWriteTx( + { storeNames: ["refreshGroups", "refreshSessions"] }, + async (tx) => { + const rg = await tx.refreshGroups.get(refreshGroupId); + if (!rg) { + return; + } + if (rg.timestampFinished) { + return; + } + const rs = await tx.refreshSessions.get([refreshGroupId, coinIndex]); + if (!rs) { + return; + } + if (rs.norevealIndex !== undefined) { + return; + } + rs.norevealIndex = norevealIndex; + await tx.refreshSessions.put(rs); + }, + ); +} + +async function handleRefreshMeltGone( + ctx: RefreshTransactionContext, + coinIndex: number, + errDetails: TalerErrorDetail, +): Promise<void> { + // const expiredMsg = codecForDenominationExpiredMessage().decode(errDetails); + + // FIXME: Validate signature. + + await ctx.wex.db.runReadWriteTx( + { + storeNames: [ + "refreshGroups", + "refreshSessions", + "coins", + "denominations", + "coinAvailability", + ], + }, + async (tx) => { + const rg = await tx.refreshGroups.get(ctx.refreshGroupId); + if (!rg) { + return; + } + if (rg.timestampFinished) { + return; + } + if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) { + return; + } + rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Failed; + const refreshSession = await tx.refreshSessions.get([ + ctx.refreshGroupId, + coinIndex, + ]); + if (!refreshSession) { + throw Error("db invariant failed: missing refresh session in database"); + } + refreshSession.lastError = errDetails; + await destroyRefreshSession(ctx.wex, tx, rg, refreshSession); + await tx.refreshGroups.put(rg); + await tx.refreshSessions.put(refreshSession); + }, + ); +} + +async function handleRefreshMeltConflict( + ctx: RefreshTransactionContext, + coinIndex: number, + errDetails: TalerErrorDetail, + derived: DerivedRefreshSession, + oldCoin: CoinRecord, +): Promise<void> { + // Just log for better diagnostics here, error status + // will be handled later. + logger.error( + `melt request for ${Amounts.stringify( + derived.meltValueWithFee, + )} failed in refresh group ${ctx.refreshGroupId} due to conflict`, + ); + + const historySig = await ctx.wex.cryptoApi.signCoinHistoryRequest({ + coinPriv: oldCoin.coinPriv, + coinPub: oldCoin.coinPub, + startOffset: 0, + }); + + const historyUrl = new URL( + `coins/${oldCoin.coinPub}/history`, + oldCoin.exchangeBaseUrl, + ); + + const historyResp = await ctx.wex.http.fetch(historyUrl.href, { + method: "GET", + headers: { + "Taler-Coin-History-Signature": historySig.sig, + }, + cancellationToken: ctx.wex.cancellationToken, + }); + + const historyJson = await readSuccessResponseJsonOrThrow( + historyResp, + codecForCoinHistoryResponse(), + ); + logger.info(`coin history: ${j2s(historyJson)}`); + + // FIXME: If response seems wrong, report to auditor (in the future!); + + await ctx.wex.db.runReadWriteTx( + { + storeNames: [ + "refreshGroups", + "refreshSessions", + "denominations", + "coins", + "coinAvailability", + ], + }, + async (tx) => { + const rg = await tx.refreshGroups.get(ctx.refreshGroupId); + if (!rg) { + return; + } + if (rg.timestampFinished) { + return; + } + if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) { + return; + } + if (Amounts.isZero(historyJson.balance)) { + rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Failed; + const refreshSession = await tx.refreshSessions.get([ + ctx.refreshGroupId, + coinIndex, + ]); + if (!refreshSession) { + throw Error( + "db invariant failed: missing refresh session in database", + ); + } + refreshSession.lastError = errDetails; + await tx.refreshGroups.put(rg); + await tx.refreshSessions.put(refreshSession); + } else { + // Try again with new denoms! + rg.inputPerCoin[coinIndex] = historyJson.balance; + const refreshSession = await tx.refreshSessions.get([ + ctx.refreshGroupId, + coinIndex, + ]); + if (!refreshSession) { + throw Error( + "db invariant failed: missing refresh session in database", + ); + } + await destroyRefreshSession(ctx.wex, tx, rg, refreshSession); + await tx.refreshSessions.delete([ctx.refreshGroupId, coinIndex]); + await initRefreshSession(ctx.wex, tx, rg, coinIndex); + } + }, + ); +} + +async function handleRefreshMeltNotFound( + ctx: RefreshTransactionContext, + coinIndex: number, + errDetails: TalerErrorDetail, +): Promise<void> { + // FIXME: Validate the exchange's error response + await ctx.wex.db.runReadWriteTx( + { + storeNames: [ + "refreshGroups", + "refreshSessions", + "coins", + "denominations", + "coinAvailability", + ], + }, + async (tx) => { + const rg = await tx.refreshGroups.get(ctx.refreshGroupId); + if (!rg) { + return; + } + if (rg.timestampFinished) { + return; + } + if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) { + return; + } + rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Failed; + const refreshSession = await tx.refreshSessions.get([ + ctx.refreshGroupId, + coinIndex, + ]); + if (!refreshSession) { + throw Error("db invariant failed: missing refresh session in database"); + } + await destroyRefreshSession(ctx.wex, tx, rg, refreshSession); + refreshSession.lastError = errDetails; + await tx.refreshGroups.put(rg); + await tx.refreshSessions.put(refreshSession); + }, + ); +} + +export async function assembleRefreshRevealRequest(args: { + cryptoApi: TalerCryptoInterface; + derived: DerivedRefreshSession; + norevealIndex: number; + oldCoinPub: CoinPublicKeyString; + oldCoinPriv: string; + newDenoms: { + denomPubHash: string; + count: number; + }[]; + oldAgeCommitment?: AgeCommitment; +}): Promise<ExchangeRefreshRevealRequest> { + const { + derived, + norevealIndex, + cryptoApi, + oldCoinPriv, + oldCoinPub, + newDenoms, + } = args; + const privs = Array.from(derived.transferPrivs); + privs.splice(norevealIndex, 1); + + const planchets = derived.planchetsForGammas[norevealIndex]; + if (!planchets) { + throw Error("refresh index error"); + } + + const newDenomsFlat: string[] = []; + const linkSigs: string[] = []; + + for (let i = 0; i < newDenoms.length; i++) { + const dsel = newDenoms[i]; + for (let j = 0; j < dsel.count; j++) { + const newCoinIndex = linkSigs.length; + const linkSig = await cryptoApi.signCoinLink({ + coinEv: planchets[newCoinIndex].coinEv, + newDenomHash: dsel.denomPubHash, + oldCoinPriv: oldCoinPriv, + oldCoinPub: oldCoinPub, + transferPub: derived.transferPubs[norevealIndex], + }); + linkSigs.push(linkSig.sig); + newDenomsFlat.push(dsel.denomPubHash); + } + } + + const req: ExchangeRefreshRevealRequest = { + coin_evs: planchets.map((x) => x.coinEv), + new_denoms_h: newDenomsFlat, + transfer_privs: privs, + transfer_pub: derived.transferPubs[norevealIndex], + link_sigs: linkSigs, + old_age_commitment: args.oldAgeCommitment?.publicKeys, + }; + return req; +} + +async function refreshReveal( + wex: WalletExecutionContext, + refreshGroupId: string, + coinIndex: number, +): Promise<void> { + logger.trace( + `doing refresh reveal for ${refreshGroupId} (old coin ${coinIndex})`, + ); + const ctx = new RefreshTransactionContext(wex, refreshGroupId); + const d = await wex.db.runReadOnlyTx( + { + storeNames: [ + "refreshGroups", + "refreshSessions", + "coins", + "denominations", + ], + }, + async (tx) => { + const refreshGroup = await tx.refreshGroups.get(refreshGroupId); + if (!refreshGroup) { + return; + } + const refreshSession = await tx.refreshSessions.get([ + refreshGroupId, + coinIndex, + ]); + if (!refreshSession) { + return; + } + const norevealIndex = refreshSession.norevealIndex; + if (norevealIndex === undefined) { + throw Error("can't reveal without melting first"); + } + + const oldCoin = await tx.coins.get(refreshGroup.oldCoinPubs[coinIndex]); + checkDbInvariant(!!oldCoin, "melt coin doesn't exist"); + const oldDenom = await getDenomInfo( + wex, + tx, + oldCoin.exchangeBaseUrl, + oldCoin.denomPubHash, + ); + checkDbInvariant( + !!oldDenom, + "denomination for melted coin doesn't exist", + ); + + const newCoinDenoms: RefreshNewDenomInfo[] = []; + + for (const dh of refreshSession.newDenoms) { + const newDenom = await getDenomInfo( + wex, + tx, + oldCoin.exchangeBaseUrl, + dh.denomPubHash, + ); + checkDbInvariant( + !!newDenom, + "new denomination for refresh not in database", + ); + newCoinDenoms.push({ + count: dh.count, + denomPub: newDenom.denomPub, + denomPubHash: newDenom.denomPubHash, + feeWithdraw: newDenom.feeWithdraw, + value: Amounts.stringify(newDenom.value), + }); + } + return { + oldCoin, + oldDenom, + newCoinDenoms, + refreshSession, + refreshGroup, + norevealIndex, + }; + }, + ); + + if (!d) { + return; + } + + const { + oldCoin, + oldDenom, + newCoinDenoms, + refreshSession, + refreshGroup, + norevealIndex, + } = d; + + let exchangeProtocolVersion: ExchangeProtocolVersion; + switch (d.oldDenom.denomPub.cipher) { + case DenomKeyType.Rsa: { + exchangeProtocolVersion = ExchangeProtocolVersion.V12; + break; + } + default: + throw Error("unsupported key type"); + } + + const derived = await wex.cryptoApi.deriveRefreshSession({ + exchangeProtocolVersion, + kappa: 3, + meltCoinDenomPubHash: oldCoin.denomPubHash, + meltCoinPriv: oldCoin.coinPriv, + meltCoinPub: oldCoin.coinPub, + feeRefresh: Amounts.parseOrThrow(oldDenom.feeRefresh), + newCoinDenoms, + meltCoinMaxAge: oldCoin.maxAge, + meltCoinAgeCommitmentProof: oldCoin.ageCommitmentProof, + sessionSecretSeed: refreshSession.sessionSecretSeed, + }); + + const reqUrl = new URL( + `refreshes/${derived.hash}/reveal`, + oldCoin.exchangeBaseUrl, + ); + + const req = await assembleRefreshRevealRequest({ + cryptoApi: wex.cryptoApi, + derived, + newDenoms: newCoinDenoms, + norevealIndex: norevealIndex, + oldCoinPriv: oldCoin.coinPriv, + oldCoinPub: oldCoin.coinPub, + oldAgeCommitment: oldCoin.ageCommitmentProof?.commitment, + }); + + const resp = await wex.ws.runSequentialized( + [EXCHANGE_COINS_LOCK], + async () => { + return await wex.http.fetch(reqUrl.href, { + body: req, + method: "POST", + timeout: getRefreshRequestTimeout(refreshGroup), + cancellationToken: wex.cancellationToken, + }); + }, + ); + + switch (resp.status) { + case HttpStatusCode.Ok: + break; + case HttpStatusCode.Conflict: + case HttpStatusCode.Gone: { + const errDetail = await readTalerErrorResponse(resp); + await handleRefreshRevealError(ctx, coinIndex, errDetail); + return; + } + default: { + const errDetail = await readTalerErrorResponse(resp); + throwUnexpectedRequestError(resp, errDetail); + } + } + + const reveal = await readSuccessResponseJsonOrThrow( + resp, + codecForExchangeRevealResponse(), + ); + + const coins: CoinRecord[] = []; + + const transactionId = constructTransactionIdentifier({ + tag: TransactionType.Refresh, + refreshGroupId, + }); + + for (let i = 0; i < refreshSession.newDenoms.length; i++) { + const ncd = newCoinDenoms[i]; + for (let j = 0; j < refreshSession.newDenoms[i].count; j++) { + const newCoinIndex = coins.length; + const pc = derived.planchetsForGammas[norevealIndex][newCoinIndex]; + if (ncd.denomPub.cipher !== DenomKeyType.Rsa) { + throw Error("cipher unsupported"); + } + const evSig = reveal.ev_sigs[newCoinIndex].ev_sig; + const denomSig = await wex.cryptoApi.unblindDenominationSignature({ + planchet: { + blindingKey: pc.blindingKey, + denomPub: ncd.denomPub, + }, + evSig, + }); + const coin: CoinRecord = { + blindingKey: pc.blindingKey, + coinPriv: pc.coinPriv, + coinPub: pc.coinPub, + denomPubHash: ncd.denomPubHash, + denomSig, + exchangeBaseUrl: oldCoin.exchangeBaseUrl, + status: CoinStatus.Fresh, + coinSource: { + type: CoinSourceType.Refresh, + refreshGroupId, + oldCoinPub: refreshGroup.oldCoinPubs[coinIndex], + }, + sourceTransactionId: transactionId, + coinEvHash: pc.coinEvHash, + maxAge: pc.maxAge, + ageCommitmentProof: pc.ageCommitmentProof, + spendAllocation: undefined, + }; + + coins.push(coin); + } + } + + await wex.db.runReadWriteTx( + { + storeNames: [ + "coins", + "denominations", + "coinAvailability", + "refreshGroups", + "refreshSessions", + ], + }, + async (tx) => { + const rg = await tx.refreshGroups.get(refreshGroupId); + if (!rg) { + logger.warn("no refresh session found"); + return; + } + if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) { + return; + } + const rs = await tx.refreshSessions.get([refreshGroupId, coinIndex]); + if (!rs) { + return; + } + rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished; + for (const coin of coins) { + const existingCoin = await tx.coins.get(coin.coinPub); + if (existingCoin) { + continue; + } + await tx.coins.add(coin); + const denomInfo = await getDenomInfo( + wex, + tx, + coin.exchangeBaseUrl, + coin.denomPubHash, + ); + checkDbInvariant(!!denomInfo); + const car = await getCoinAvailabilityForDenom( + wex, + tx, + denomInfo, + coin.maxAge, + ); + checkDbInvariant( + car.pendingRefreshOutputCount != null && + car.pendingRefreshOutputCount > 0, + ); + car.pendingRefreshOutputCount--; + car.freshCoinCount++; + await tx.coinAvailability.put(car); + } + await tx.refreshGroups.put(rg); + }, + ); + logger.trace("refresh finished (end of reveal)"); +} + +async function handleRefreshRevealError( + ctx: RefreshTransactionContext, + coinIndex: number, + errDetails: TalerErrorDetail, +): Promise<void> { + await ctx.wex.db.runReadWriteTx( + { + storeNames: [ + "refreshGroups", + "refreshSessions", + "coins", + "denominations", + "coinAvailability", + ], + }, + async (tx) => { + const rg = await tx.refreshGroups.get(ctx.refreshGroupId); + if (!rg) { + return; + } + if (rg.timestampFinished) { + return; + } + if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) { + return; + } + rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Failed; + const refreshSession = await tx.refreshSessions.get([ + ctx.refreshGroupId, + coinIndex, + ]); + if (!refreshSession) { + throw Error("db invariant failed: missing refresh session in database"); + } + refreshSession.lastError = errDetails; + await destroyRefreshSession(ctx.wex, tx, rg, refreshSession); + await tx.refreshGroups.put(rg); + await tx.refreshSessions.put(refreshSession); + }, + ); +} + +export async function processRefreshGroup( + wex: WalletExecutionContext, + refreshGroupId: string, +): Promise<TaskRunResult> { + logger.trace(`processing refresh group ${refreshGroupId}`); + + const refreshGroup = await wex.db.runReadOnlyTx( + { storeNames: ["refreshGroups"] }, + async (tx) => tx.refreshGroups.get(refreshGroupId), + ); + if (!refreshGroup) { + return TaskRunResult.finished(); + } + if (refreshGroup.timestampFinished) { + return TaskRunResult.finished(); + } + + if ( + wex.ws.config.testing.devModeActive && + wex.ws.devExperimentState.blockRefreshes + ) { + throw Error("refresh blocked"); + } + + // Process refresh sessions of the group in parallel. + logger.trace( + `processing refresh sessions for ${refreshGroup.oldCoinPubs.length} old coins`, + ); + let errors: TalerErrorDetail[] = []; + let inShutdown = false; + const ps = refreshGroup.oldCoinPubs.map((x, i) => + processRefreshSession(wex, refreshGroupId, i).catch((x) => { + if (x instanceof CryptoApiStoppedError) { + inShutdown = true; + logger.info( + "crypto API stopped while processing refresh group, probably the wallet is currently shutting down.", + ); + return; + } + if (x instanceof TalerError) { + logger.warn("process refresh session got exception (TalerError)"); + logger.warn(`exc ${x}`); + logger.warn(`exc stack ${x.stack}`); + logger.warn(`error detail: ${j2s(x.errorDetail)}`); + } else { + logger.warn("process refresh session got exception"); + logger.warn(`exc ${x}`); + logger.warn(`exc stack ${x.stack}`); + } + errors.push(getErrorDetailFromException(x)); + }), + ); + await Promise.all(ps); + if (inShutdown) { + return TaskRunResult.finished(); + } + + const ctx = new RefreshTransactionContext(wex, refreshGroupId); + + // We've processed all refresh session and can now update the + // status of the whole refresh group. + + const transitionInfo = await wex.db.runReadWriteTx( + { storeNames: ["coins", "coinAvailability", "refreshGroups"] }, + async (tx) => { + const rg = await tx.refreshGroups.get(refreshGroupId); + if (!rg) { + return; + } + switch (rg.operationStatus) { + case RefreshOperationStatus.Pending: + break; + default: + return undefined; + } + const oldTxState = computeRefreshTransactionState(rg); + const allFinal = fnutil.all( + rg.statusPerCoin, + (x) => + x === RefreshCoinStatus.Finished || x === RefreshCoinStatus.Failed, + ); + const anyFailed = fnutil.any( + rg.statusPerCoin, + (x) => x === RefreshCoinStatus.Failed, + ); + if (allFinal) { + if (anyFailed) { + rg.timestampFinished = timestampPreciseToDb( + TalerPreciseTimestamp.now(), + ); + rg.operationStatus = RefreshOperationStatus.Failed; + } else { + rg.timestampFinished = timestampPreciseToDb( + TalerPreciseTimestamp.now(), + ); + rg.operationStatus = RefreshOperationStatus.Finished; + } + await makeCoinsVisible(wex, tx, ctx.transactionId); + await tx.refreshGroups.put(rg); + const newTxState = computeRefreshTransactionState(rg); + return { + oldTxState, + newTxState, + }; + } + return undefined; + }, + ); + + if (transitionInfo) { + notifyTransition(wex, ctx.transactionId, transitionInfo); + return TaskRunResult.progress(); + } + + if (errors.length > 0) { + return { + type: TaskRunResultType.Error, + errorDetail: makeErrorDetail( + TalerErrorCode.WALLET_REFRESH_GROUP_INCOMPLETE, + { + numErrors: errors.length, + errors: errors.slice(0, 5), + }, + ), + }; + } + + return TaskRunResult.backoff(); +} + +async function processRefreshSession( + wex: WalletExecutionContext, + refreshGroupId: string, + coinIndex: number, +): Promise<void> { + logger.trace( + `processing refresh session for coin ${coinIndex} of group ${refreshGroupId}`, + ); + let { refreshGroup, refreshSession } = await wex.db.runReadOnlyTx( + { storeNames: ["refreshGroups", "refreshSessions"] }, + async (tx) => { + const rg = await tx.refreshGroups.get(refreshGroupId); + const rs = await tx.refreshSessions.get([refreshGroupId, coinIndex]); + return { + refreshGroup: rg, + refreshSession: rs, + }; + }, + ); + if (!refreshGroup) { + return; + } + if (refreshGroup.statusPerCoin[coinIndex] === RefreshCoinStatus.Finished) { + return; + } + if (!refreshSession) { + // No refresh session for that coin. + return; + } + if (refreshSession.norevealIndex === undefined) { + await refreshMelt(wex, refreshGroupId, coinIndex); + } + await refreshReveal(wex, refreshGroupId, coinIndex); +} + +export interface RefreshOutputInfo { + outputPerCoin: AmountJson[]; + perExchangeInfo: Record<string, RefreshGroupPerExchangeInfo>; +} + +export async function calculateRefreshOutput( + wex: WalletExecutionContext, + tx: WalletDbReadOnlyTransaction< + ["denominations", "coins", "refreshGroups", "coinAvailability"] + >, + currency: string, + oldCoinPubs: CoinRefreshRequest[], +): Promise<RefreshOutputInfo> { + const estimatedOutputPerCoin: AmountJson[] = []; + + const denomsPerExchange: Record<string, DenominationRecord[]> = {}; + + const infoPerExchange: Record<string, RefreshGroupPerExchangeInfo> = {}; + + for (const ocp of oldCoinPubs) { + const coin = await tx.coins.get(ocp.coinPub); + checkDbInvariant(!!coin, "coin must be in database"); + const denom = await getDenomInfo( + wex, + tx, + coin.exchangeBaseUrl, + coin.denomPubHash, + ); + checkDbInvariant( + !!denom, + "denomination for existing coin must be in database", + ); + const refreshAmount = ocp.amount; + const cost = await getTotalRefreshCost( + wex, + tx, + denom, + Amounts.parseOrThrow(refreshAmount), + ); + const output = Amounts.sub(refreshAmount, cost).amount; + let exchInfo = infoPerExchange[coin.exchangeBaseUrl]; + if (!exchInfo) { + infoPerExchange[coin.exchangeBaseUrl] = exchInfo = { + outputEffective: Amounts.stringify(Amounts.zeroOfAmount(cost)), + }; + } + exchInfo.outputEffective = Amounts.stringify( + Amounts.add(exchInfo.outputEffective, output).amount, + ); + estimatedOutputPerCoin.push(output); + } + + return { + outputPerCoin: estimatedOutputPerCoin, + perExchangeInfo: infoPerExchange, + }; +} + +async function applyRefreshToOldCoins( + wex: WalletExecutionContext, + tx: WalletDbReadWriteTransaction< + ["denominations", "coins", "refreshGroups", "coinAvailability"] + >, + oldCoinPubs: CoinRefreshRequest[], + refreshGroupId: string, +): Promise<void> { + for (const ocp of oldCoinPubs) { + const coin = await tx.coins.get(ocp.coinPub); + checkDbInvariant(!!coin, "coin must be in database"); + const denom = await getDenomInfo( + wex, + tx, + coin.exchangeBaseUrl, + coin.denomPubHash, + ); + checkDbInvariant( + !!denom, + "denomination for existing coin must be in database", + ); + switch (coin.status) { + case CoinStatus.Dormant: + break; + case CoinStatus.Fresh: { + coin.status = CoinStatus.Dormant; + const coinAv = await tx.coinAvailability.get([ + coin.exchangeBaseUrl, + coin.denomPubHash, + coin.maxAge, + ]); + checkDbInvariant(!!coinAv); + checkDbInvariant(coinAv.freshCoinCount > 0); + coinAv.freshCoinCount--; + await tx.coinAvailability.put(coinAv); + break; + } + case CoinStatus.FreshSuspended: { + // For suspended coins, we don't have to adjust coin + // availability, as they are not counted as available. + coin.status = CoinStatus.Dormant; + break; + } + case CoinStatus.DenomLoss: + break; + default: + assertUnreachable(coin.status); + } + if (!coin.spendAllocation) { + coin.spendAllocation = { + amount: Amounts.stringify(ocp.amount), + // id: `txn:refresh:${refreshGroupId}`, + id: constructTransactionIdentifier({ + tag: TransactionType.Refresh, + refreshGroupId, + }), + }; + } + await tx.coins.put(coin); + } +} + +export interface CreateRefreshGroupResult { + refreshGroupId: string; + notifications: WalletNotification[]; +} + +/** + * Create a refresh group for a list of coins. + * + * Refreshes the remaining amount on the coin, effectively capturing the remaining + * value in the refresh group. + * + * The caller must also ensure that the coins that should be refreshed exist + * in the current database transaction. + */ +export async function createRefreshGroup( + wex: WalletExecutionContext, + tx: WalletDbReadWriteTransaction< + [ + "denominations", + "coins", + "refreshGroups", + "refreshSessions", + "coinAvailability", + ] + >, + currency: string, + oldCoinPubs: CoinRefreshRequest[], + refreshReason: RefreshReason, + originatingTransactionId: string | undefined, +): Promise<CreateRefreshGroupResult> { + // FIXME: Check that involved exchanges are reasonably up-to-date. + // Otherwise, error out. + + const refreshGroupId = encodeCrock(getRandomBytes(32)); + + const outInfo = await calculateRefreshOutput(wex, tx, currency, oldCoinPubs); + + const estimatedOutputPerCoin = outInfo.outputPerCoin; + + await applyRefreshToOldCoins(wex, tx, oldCoinPubs, refreshGroupId); + + const refreshGroup: RefreshGroupRecord = { + operationStatus: RefreshOperationStatus.Pending, + currency, + timestampFinished: undefined, + statusPerCoin: oldCoinPubs.map(() => RefreshCoinStatus.Pending), + oldCoinPubs: oldCoinPubs.map((x) => x.coinPub), + originatingTransactionId, + reason: refreshReason, + refreshGroupId, + inputPerCoin: oldCoinPubs.map((x) => x.amount), + expectedOutputPerCoin: estimatedOutputPerCoin.map((x) => + Amounts.stringify(x), + ), + infoPerExchange: outInfo.perExchangeInfo, + timestampCreated: timestampPreciseToDb(TalerPreciseTimestamp.now()), + }; + + if (oldCoinPubs.length == 0) { + logger.warn("created refresh group with zero coins"); + refreshGroup.timestampFinished = timestampPreciseToDb( + TalerPreciseTimestamp.now(), + ); + refreshGroup.operationStatus = RefreshOperationStatus.Finished; + } + + for (let i = 0; i < oldCoinPubs.length; i++) { + await initRefreshSession(wex, tx, refreshGroup, i); + } + + await tx.refreshGroups.put(refreshGroup); + + const newTxState = computeRefreshTransactionState(refreshGroup); + + logger.trace(`created refresh group ${refreshGroupId}`); + + const ctx = new RefreshTransactionContext(wex, refreshGroupId); + + // Shepherd the task. + // If the current transaction fails to commit the refresh + // group to the DB, the shepherd will give up. + wex.taskScheduler.startShepherdTask(ctx.taskId); + + return { + refreshGroupId, + notifications: [ + { + type: NotificationType.TransactionStateTransition, + transactionId: ctx.transactionId, + oldTxState: { + major: TransactionMajorState.None, + }, + newTxState, + }, + ], + }; +} + +export function computeRefreshTransactionState( + rg: RefreshGroupRecord, +): TransactionState { + switch (rg.operationStatus) { + case RefreshOperationStatus.Finished: + return { + major: TransactionMajorState.Done, + }; + case RefreshOperationStatus.Failed: + return { + major: TransactionMajorState.Failed, + }; + case RefreshOperationStatus.Pending: + return { + major: TransactionMajorState.Pending, + }; + case RefreshOperationStatus.Suspended: + return { + major: TransactionMajorState.Suspended, + }; + } +} + +export function computeRefreshTransactionActions( + rg: RefreshGroupRecord, +): TransactionAction[] { + switch (rg.operationStatus) { + case RefreshOperationStatus.Finished: + return [TransactionAction.Delete]; + case RefreshOperationStatus.Failed: + return [TransactionAction.Delete]; + case RefreshOperationStatus.Pending: + return [ + TransactionAction.Retry, + TransactionAction.Suspend, + TransactionAction.Fail, + ]; + case RefreshOperationStatus.Suspended: + return [TransactionAction.Resume, TransactionAction.Fail]; + } +} + +export function getRefreshesForTransaction( + wex: WalletExecutionContext, + transactionId: string, +): Promise<string[]> { + return wex.db.runReadOnlyTx({ storeNames: ["refreshGroups"] }, async (tx) => { + const groups = + await tx.refreshGroups.indexes.byOriginatingTransactionId.getAll( + transactionId, + ); + return groups.map((x) => + constructTransactionIdentifier({ + tag: TransactionType.Refresh, + refreshGroupId: x.refreshGroupId, + }), + ); + }); +} + +export interface ForceRefreshResult { + refreshGroupId: string; +} + +export async function forceRefresh( + wex: WalletExecutionContext, + req: ForceRefreshRequest, +): Promise<ForceRefreshResult> { + if (req.refreshCoinSpecs.length == 0) { + throw Error("refusing to create empty refresh group"); + } + const res = await wex.db.runReadWriteTx( + { + storeNames: [ + "refreshGroups", + "coinAvailability", + "refreshSessions", + "denominations", + "coins", + ], + }, + async (tx) => { + let coinPubs: CoinRefreshRequest[] = []; + for (const c of req.refreshCoinSpecs) { + const coin = await tx.coins.get(c.coinPub); + if (!coin) { + throw Error(`coin (pubkey ${c}) not found`); + } + const denom = await getDenomInfo( + wex, + tx, + coin.exchangeBaseUrl, + coin.denomPubHash, + ); + checkDbInvariant(!!denom); + coinPubs.push({ + coinPub: c.coinPub, + amount: c.amount ?? denom.value, + }); + } + return await createRefreshGroup( + wex, + tx, + Amounts.currencyOf(coinPubs[0].amount), + coinPubs, + RefreshReason.Manual, + undefined, + ); + }, + ); + + for (const notif of res.notifications) { + wex.ws.notify(notif); + } + + return { + refreshGroupId: res.refreshGroupId, + }; +} + +/** + * Wait until a refresh operation is final. + */ +export async function waitRefreshFinal( + wex: WalletExecutionContext, + refreshGroupId: string, +): Promise<void> { + const ctx = new RefreshTransactionContext(wex, refreshGroupId); + wex.taskScheduler.startShepherdTask(ctx.taskId); + + // FIXME: Clean up using the new JS "using" / Symbol.dispose syntax. + const refreshNotifFlag = new AsyncFlag(); + // Raise purchaseNotifFlag whenever we get a notification + // about our refresh. + const cancelNotif = wex.ws.addNotificationListener((notif) => { + if ( + notif.type === NotificationType.TransactionStateTransition && + notif.transactionId === ctx.transactionId + ) { + refreshNotifFlag.raise(); + } + }); + const unregisterOnCancelled = wex.cancellationToken.onCancelled(() => { + cancelNotif(); + refreshNotifFlag.raise(); + }); + + try { + await internalWaitRefreshFinal(ctx, refreshNotifFlag); + } catch (e) { + unregisterOnCancelled(); + cancelNotif(); + } +} + +async function internalWaitRefreshFinal( + ctx: RefreshTransactionContext, + flag: AsyncFlag, +): Promise<void> { + while (true) { + if (ctx.wex.cancellationToken.isCancelled) { + throw Error("cancelled"); + } + + // Check if refresh is final + const res = await ctx.wex.db.runReadOnlyTx( + { storeNames: ["refreshGroups", "operationRetries"] }, + async (tx) => { + return { + rg: await tx.refreshGroups.get(ctx.refreshGroupId), + }; + }, + ); + const { rg } = res; + if (!rg) { + // Must've been deleted, we consider that final. + return; + } + switch (rg.operationStatus) { + case RefreshOperationStatus.Failed: + case RefreshOperationStatus.Finished: + // Transaction is final + return; + case RefreshOperationStatus.Pending: + case RefreshOperationStatus.Suspended: + break; + } + + // Wait for the next transition + await flag.wait(); + flag.reset(); + } +} |