summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/refresh.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/refresh.ts')
-rw-r--r--packages/taler-wallet-core/src/refresh.ts1883
1 files changed, 1883 insertions, 0 deletions
diff --git a/packages/taler-wallet-core/src/refresh.ts b/packages/taler-wallet-core/src/refresh.ts
new file mode 100644
index 000000000..7800967e6
--- /dev/null
+++ b/packages/taler-wallet-core/src/refresh.ts
@@ -0,0 +1,1883 @@
+/*
+ This file is part of GNU Taler
+ (C) 2019-2024 Taler Systems S.A.
+
+ GNU Taler is free software; you can redistribute it and/or modify it under the
+ terms of the GNU General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along with
+ GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+ */
+
+/**
+ * @fileoverview
+ * Implementation of the refresh transaction.
+ */
+
+/**
+ * Imports.
+ */
+import {
+ AgeCommitment,
+ AgeRestriction,
+ AmountJson,
+ Amounts,
+ amountToPretty,
+ assertUnreachable,
+ AsyncFlag,
+ checkDbInvariant,
+ codecForCoinHistoryResponse,
+ codecForExchangeMeltResponse,
+ codecForExchangeRevealResponse,
+ CoinPublicKeyString,
+ CoinRefreshRequest,
+ CoinStatus,
+ DenominationInfo,
+ DenomKeyType,
+ Duration,
+ encodeCrock,
+ ExchangeMeltRequest,
+ ExchangeProtocolVersion,
+ ExchangeRefreshRevealRequest,
+ fnutil,
+ ForceRefreshRequest,
+ getErrorDetailFromException,
+ getRandomBytes,
+ HashCodeString,
+ HttpStatusCode,
+ j2s,
+ Logger,
+ makeErrorDetail,
+ NotificationType,
+ RefreshReason,
+ TalerError,
+ TalerErrorCode,
+ TalerErrorDetail,
+ TalerPreciseTimestamp,
+ TransactionAction,
+ TransactionIdStr,
+ TransactionMajorState,
+ TransactionState,
+ TransactionType,
+ URL,
+ WalletNotification,
+} from "@gnu-taler/taler-util";
+import {
+ readSuccessResponseJsonOrThrow,
+ readTalerErrorResponse,
+ throwUnexpectedRequestError,
+} from "@gnu-taler/taler-util/http";
+import {
+ constructTaskIdentifier,
+ makeCoinsVisible,
+ PendingTaskType,
+ TaskIdStr,
+ TaskRunResult,
+ TaskRunResultType,
+ TombstoneTag,
+ TransactionContext,
+ TransitionResult,
+ TransitionResultType,
+} from "./common.js";
+import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js";
+import {
+ DerivedRefreshSession,
+ RefreshNewDenomInfo,
+} from "./crypto/cryptoTypes.js";
+import { CryptoApiStoppedError } from "./crypto/workers/crypto-dispatcher.js";
+import {
+ CoinAvailabilityRecord,
+ CoinRecord,
+ CoinSourceType,
+ DenominationRecord,
+ RefreshCoinStatus,
+ RefreshGroupPerExchangeInfo,
+ RefreshGroupRecord,
+ RefreshOperationStatus,
+ RefreshSessionRecord,
+ timestampPreciseToDb,
+ WalletDbReadOnlyTransaction,
+ WalletDbReadWriteTransaction,
+ WalletDbStoresArr,
+} from "./db.js";
+import { selectWithdrawalDenominations } from "./denomSelection.js";
+import {
+ constructTransactionIdentifier,
+ notifyTransition,
+ TransitionInfo,
+} from "./transactions.js";
+import {
+ EXCHANGE_COINS_LOCK,
+ getDenomInfo,
+ WalletExecutionContext,
+} from "./wallet.js";
+import { getCandidateWithdrawalDenomsTx } from "./withdraw.js";
+
+const logger = new Logger("refresh.ts");
+
+/**
+ * Update the materialized refresh transaction based
+ * on the refresh group record.
+ */
+async function updateRefreshTransaction(
+ ctx: RefreshTransactionContext,
+ tx: WalletDbReadWriteTransaction<
+ [
+ "refreshGroups",
+ "transactions",
+ "operationRetries",
+ "exchanges",
+ "exchangeDetails",
+ ]
+ >,
+): Promise<void> {}
+
+export class RefreshTransactionContext implements TransactionContext {
+ readonly transactionId: TransactionIdStr;
+ readonly taskId: TaskIdStr;
+
+ constructor(
+ public wex: WalletExecutionContext,
+ public refreshGroupId: string,
+ ) {
+ this.transactionId = constructTransactionIdentifier({
+ tag: TransactionType.Refresh,
+ refreshGroupId,
+ });
+ this.taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Refresh,
+ refreshGroupId,
+ });
+ }
+
+ /**
+ * Transition a withdrawal transaction.
+ * Extra object stores may be accessed during the transition.
+ */
+ async transition<StoreNameArray extends WalletDbStoresArr = []>(
+ opts: { extraStores?: StoreNameArray; transactionLabel?: string },
+ f: (
+ rec: RefreshGroupRecord | undefined,
+ tx: WalletDbReadWriteTransaction<
+ [
+ "refreshGroups",
+ "transactions",
+ "operationRetries",
+ "exchanges",
+ "exchangeDetails",
+ ...StoreNameArray,
+ ]
+ >,
+ ) => Promise<TransitionResult<RefreshGroupRecord>>,
+ ): Promise<TransitionInfo | undefined> {
+ const baseStores = [
+ "refreshGroups" as const,
+ "transactions" as const,
+ "operationRetries" as const,
+ "exchanges" as const,
+ "exchangeDetails" as const,
+ ];
+ let stores = opts.extraStores
+ ? [...baseStores, ...opts.extraStores]
+ : baseStores;
+ const transitionInfo = await this.wex.db.runReadWriteTx(
+ { storeNames: stores },
+ async (tx) => {
+ const wgRec = await tx.refreshGroups.get(this.refreshGroupId);
+ let oldTxState: TransactionState;
+ if (wgRec) {
+ oldTxState = computeRefreshTransactionState(wgRec);
+ } else {
+ oldTxState = {
+ major: TransactionMajorState.None,
+ };
+ }
+ const res = await f(wgRec, tx);
+ switch (res.type) {
+ case TransitionResultType.Transition: {
+ await tx.refreshGroups.put(res.rec);
+ await updateRefreshTransaction(this, tx);
+ const newTxState = computeRefreshTransactionState(res.rec);
+ return {
+ oldTxState,
+ newTxState,
+ };
+ }
+ case TransitionResultType.Delete:
+ await tx.refreshGroups.delete(this.refreshGroupId);
+ await updateRefreshTransaction(this, tx);
+ return {
+ oldTxState,
+ newTxState: {
+ major: TransactionMajorState.None,
+ },
+ };
+ default:
+ return undefined;
+ }
+ },
+ );
+ notifyTransition(this.wex, this.transactionId, transitionInfo);
+ return transitionInfo;
+ }
+
+ async deleteTransaction(): Promise<void> {
+ await this.transition(
+ {
+ extraStores: ["tombstones"],
+ },
+ async (rec, tx) => {
+ if (!rec) {
+ return TransitionResult.stay();
+ }
+ await tx.tombstones.put({
+ id: TombstoneTag.DeleteRefreshGroup + ":" + this.refreshGroupId,
+ });
+ return TransitionResult.delete();
+ },
+ );
+ }
+
+ async suspendTransaction(): Promise<void> {
+ await this.transition({}, async (rec, tx) => {
+ if (!rec) {
+ return TransitionResult.stay();
+ }
+ switch (rec.operationStatus) {
+ case RefreshOperationStatus.Finished:
+ case RefreshOperationStatus.Suspended:
+ case RefreshOperationStatus.Failed:
+ return TransitionResult.stay();
+ case RefreshOperationStatus.Pending: {
+ rec.operationStatus = RefreshOperationStatus.Suspended;
+ return TransitionResult.transition(rec);
+ }
+ default:
+ assertUnreachable(rec.operationStatus);
+ }
+ });
+ }
+
+ async abortTransaction(): Promise<void> {
+ // Refresh transactions only support fail, not abort.
+ throw new Error("refresh transactions cannot be aborted");
+ }
+
+ async resumeTransaction(): Promise<void> {
+ await this.transition({}, async (rec, tx) => {
+ if (!rec) {
+ return TransitionResult.stay();
+ }
+ switch (rec.operationStatus) {
+ case RefreshOperationStatus.Finished:
+ case RefreshOperationStatus.Failed:
+ case RefreshOperationStatus.Pending:
+ return TransitionResult.stay();
+ case RefreshOperationStatus.Suspended: {
+ rec.operationStatus = RefreshOperationStatus.Pending;
+ return TransitionResult.transition(rec);
+ }
+ default:
+ assertUnreachable(rec.operationStatus);
+ }
+ });
+ }
+
+ async failTransaction(): Promise<void> {
+ await this.transition({}, async (rec, tx) => {
+ if (!rec) {
+ return TransitionResult.stay();
+ }
+ switch (rec.operationStatus) {
+ case RefreshOperationStatus.Finished:
+ case RefreshOperationStatus.Failed:
+ return TransitionResult.stay();
+ case RefreshOperationStatus.Pending:
+ case RefreshOperationStatus.Suspended: {
+ rec.operationStatus = RefreshOperationStatus.Failed;
+ return TransitionResult.transition(rec);
+ }
+ default:
+ assertUnreachable(rec.operationStatus);
+ }
+ });
+ }
+}
+
+export async function getTotalRefreshCost(
+ wex: WalletExecutionContext,
+ tx: WalletDbReadOnlyTransaction<["denominations"]>,
+ refreshedDenom: DenominationInfo,
+ amountLeft: AmountJson,
+): Promise<AmountJson> {
+ const cacheKey = `denom=${refreshedDenom.exchangeBaseUrl}/${
+ refreshedDenom.denomPubHash
+ };left=${Amounts.stringify(amountLeft)}`;
+ const cacheRes = wex.ws.refreshCostCache.get(cacheKey);
+ if (cacheRes) {
+ return cacheRes;
+ }
+ const allDenoms = await getCandidateWithdrawalDenomsTx(
+ wex,
+ tx,
+ refreshedDenom.exchangeBaseUrl,
+ Amounts.currencyOf(amountLeft),
+ );
+ const res = getTotalRefreshCostInternal(
+ allDenoms,
+ refreshedDenom,
+ amountLeft,
+ );
+ wex.ws.refreshCostCache.put(cacheKey, res);
+ return res;
+}
+
+/**
+ * Get the amount that we lose when refreshing a coin of the given denomination
+ * with a certain amount left.
+ *
+ * If the amount left is zero, then the refresh cost
+ * is also considered to be zero. If a refresh isn't possible (e.g. due to lack of
+ * the right denominations), then the cost is the full amount left.
+ *
+ * Considers refresh fees, withdrawal fees after refresh and amounts too small
+ * to refresh.
+ */
+export function getTotalRefreshCostInternal(
+ denoms: DenominationRecord[],
+ refreshedDenom: DenominationInfo,
+ amountLeft: AmountJson,
+): AmountJson {
+ const withdrawAmount = Amounts.sub(
+ amountLeft,
+ refreshedDenom.feeRefresh,
+ ).amount;
+ const denomMap = Object.fromEntries(denoms.map((x) => [x.denomPubHash, x]));
+ const withdrawDenoms = selectWithdrawalDenominations(
+ withdrawAmount,
+ denoms,
+ false,
+ );
+ const resultingAmount = Amounts.add(
+ Amounts.zeroOfCurrency(withdrawAmount.currency),
+ ...withdrawDenoms.selectedDenoms.map(
+ (d) => Amounts.mult(denomMap[d.denomPubHash].value, d.count).amount,
+ ),
+ ).amount;
+ const totalCost = Amounts.sub(amountLeft, resultingAmount).amount;
+ logger.trace(
+ `total refresh cost for ${amountToPretty(amountLeft)} is ${amountToPretty(
+ totalCost,
+ )}`,
+ );
+ return totalCost;
+}
+
+async function getCoinAvailabilityForDenom(
+ wex: WalletExecutionContext,
+ tx: WalletDbReadWriteTransaction<
+ ["coins", "coinAvailability", "denominations"]
+ >,
+ denom: DenominationInfo,
+ ageRestriction: number,
+): Promise<CoinAvailabilityRecord> {
+ checkDbInvariant(!!denom);
+ let car = await tx.coinAvailability.get([
+ denom.exchangeBaseUrl,
+ denom.denomPubHash,
+ ageRestriction,
+ ]);
+ if (!car) {
+ car = {
+ maxAge: ageRestriction,
+ value: denom.value,
+ currency: Amounts.currencyOf(denom.value),
+ denomPubHash: denom.denomPubHash,
+ exchangeBaseUrl: denom.exchangeBaseUrl,
+ freshCoinCount: 0,
+ visibleCoinCount: 0,
+ };
+ }
+ return car;
+}
+
+/**
+ * Create a refresh session for one particular coin inside a refresh group.
+ */
+async function initRefreshSession(
+ wex: WalletExecutionContext,
+ tx: WalletDbReadWriteTransaction<
+ ["refreshSessions", "coinAvailability", "coins", "denominations"]
+ >,
+ refreshGroup: RefreshGroupRecord,
+ coinIndex: number,
+): Promise<void> {
+ const refreshGroupId = refreshGroup.refreshGroupId;
+ logger.trace(
+ `creating refresh session for coin ${coinIndex} in refresh group ${refreshGroupId}`,
+ );
+ const oldCoinPub = refreshGroup.oldCoinPubs[coinIndex];
+ const oldCoin = await tx.coins.get(oldCoinPub);
+ if (!oldCoin) {
+ throw Error("Can't refresh, coin not found");
+ }
+
+ const exchangeBaseUrl = oldCoin.exchangeBaseUrl;
+
+ const sessionSecretSeed = encodeCrock(getRandomBytes(64));
+
+ const oldDenom = await getDenomInfo(
+ wex,
+ tx,
+ exchangeBaseUrl,
+ oldCoin.denomPubHash,
+ );
+
+ if (!oldDenom) {
+ throw Error("db inconsistent: denomination for coin not found");
+ }
+
+ const currency = refreshGroup.currency;
+
+ const availableDenoms = await getCandidateWithdrawalDenomsTx(
+ wex,
+ tx,
+ exchangeBaseUrl,
+ currency,
+ );
+
+ const availableAmount = Amounts.sub(
+ refreshGroup.inputPerCoin[coinIndex],
+ oldDenom.feeRefresh,
+ ).amount;
+
+ const newCoinDenoms = selectWithdrawalDenominations(
+ availableAmount,
+ availableDenoms,
+ wex.ws.config.testing.denomselAllowLate,
+ );
+
+ if (newCoinDenoms.selectedDenoms.length === 0) {
+ logger.trace(
+ `not refreshing, available amount ${amountToPretty(
+ availableAmount,
+ )} too small`,
+ );
+ refreshGroup.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished;
+ return;
+ }
+
+ for (let i = 0; i < newCoinDenoms.selectedDenoms.length; i++) {
+ const dph = newCoinDenoms.selectedDenoms[i].denomPubHash;
+ const denom = await getDenomInfo(wex, tx, oldDenom.exchangeBaseUrl, dph);
+ if (!denom) {
+ logger.error(`denom ${dph} not in DB`);
+ continue;
+ }
+ const car = await getCoinAvailabilityForDenom(
+ wex,
+ tx,
+ denom,
+ oldCoin.maxAge,
+ );
+ car.pendingRefreshOutputCount =
+ (car.pendingRefreshOutputCount ?? 0) +
+ newCoinDenoms.selectedDenoms[i].count;
+ await tx.coinAvailability.put(car);
+ }
+
+ const newSession: RefreshSessionRecord = {
+ coinIndex,
+ refreshGroupId,
+ norevealIndex: undefined,
+ sessionSecretSeed: sessionSecretSeed,
+ newDenoms: newCoinDenoms.selectedDenoms.map((x) => ({
+ count: x.count,
+ denomPubHash: x.denomPubHash,
+ })),
+ amountRefreshOutput: Amounts.stringify(newCoinDenoms.totalCoinValue),
+ };
+ await tx.refreshSessions.put(newSession);
+}
+
+/**
+ * Uninitialize a refresh session.
+ *
+ * Adjust the coin availability of involved coins.
+ */
+async function destroyRefreshSession(
+ wex: WalletExecutionContext,
+ tx: WalletDbReadWriteTransaction<
+ ["denominations", "coinAvailability", "coins"]
+ >,
+ refreshGroup: RefreshGroupRecord,
+ refreshSession: RefreshSessionRecord,
+): Promise<void> {
+ for (let i = 0; i < refreshSession.newDenoms.length; i++) {
+ const oldCoin = await tx.coins.get(
+ refreshGroup.oldCoinPubs[refreshSession.coinIndex],
+ );
+ if (!oldCoin) {
+ continue;
+ }
+ const dph = refreshSession.newDenoms[i].denomPubHash;
+ const denom = await getDenomInfo(wex, tx, oldCoin.exchangeBaseUrl, dph);
+ if (!denom) {
+ logger.error(`denom ${dph} not in DB`);
+ continue;
+ }
+ const car = await getCoinAvailabilityForDenom(
+ wex,
+ tx,
+ denom,
+ oldCoin.maxAge,
+ );
+ checkDbInvariant(car.pendingRefreshOutputCount != null);
+ car.pendingRefreshOutputCount =
+ car.pendingRefreshOutputCount - refreshSession.newDenoms[i].count;
+ await tx.coinAvailability.put(car);
+ }
+}
+
+function getRefreshRequestTimeout(rg: RefreshGroupRecord): Duration {
+ return Duration.fromSpec({
+ seconds: 5,
+ });
+}
+
+/**
+ * Run the melt step of a refresh session.
+ *
+ * If the melt step succeeds or fails permanently,
+ * the status in the refresh group is updated.
+ *
+ * When a transient error occurs, an exception is thrown.
+ */
+async function refreshMelt(
+ wex: WalletExecutionContext,
+ refreshGroupId: string,
+ coinIndex: number,
+): Promise<void> {
+ const ctx = new RefreshTransactionContext(wex, refreshGroupId);
+ const d = await wex.db.runReadWriteTx(
+ {
+ storeNames: [
+ "refreshGroups",
+ "refreshSessions",
+ "coins",
+ "denominations",
+ ],
+ },
+ async (tx) => {
+ const refreshGroup = await tx.refreshGroups.get(refreshGroupId);
+ if (!refreshGroup) {
+ return;
+ }
+ const refreshSession = await tx.refreshSessions.get([
+ refreshGroupId,
+ coinIndex,
+ ]);
+ if (!refreshSession) {
+ return;
+ }
+ if (refreshSession.norevealIndex !== undefined) {
+ return;
+ }
+
+ const oldCoin = await tx.coins.get(refreshGroup.oldCoinPubs[coinIndex]);
+ checkDbInvariant(!!oldCoin, "melt coin doesn't exist");
+ const oldDenom = await getDenomInfo(
+ wex,
+ tx,
+ oldCoin.exchangeBaseUrl,
+ oldCoin.denomPubHash,
+ );
+ checkDbInvariant(
+ !!oldDenom,
+ "denomination for melted coin doesn't exist",
+ );
+
+ const newCoinDenoms: RefreshNewDenomInfo[] = [];
+
+ for (const dh of refreshSession.newDenoms) {
+ const newDenom = await getDenomInfo(
+ wex,
+ tx,
+ oldCoin.exchangeBaseUrl,
+ dh.denomPubHash,
+ );
+ checkDbInvariant(
+ !!newDenom,
+ "new denomination for refresh not in database",
+ );
+ newCoinDenoms.push({
+ count: dh.count,
+ denomPub: newDenom.denomPub,
+ denomPubHash: newDenom.denomPubHash,
+ feeWithdraw: newDenom.feeWithdraw,
+ value: Amounts.stringify(newDenom.value),
+ });
+ }
+ return { newCoinDenoms, oldCoin, oldDenom, refreshGroup, refreshSession };
+ },
+ );
+
+ if (!d) {
+ return;
+ }
+
+ const { newCoinDenoms, oldCoin, oldDenom, refreshGroup, refreshSession } = d;
+
+ let exchangeProtocolVersion: ExchangeProtocolVersion;
+ switch (d.oldDenom.denomPub.cipher) {
+ case DenomKeyType.Rsa: {
+ exchangeProtocolVersion = ExchangeProtocolVersion.V12;
+ break;
+ }
+ default:
+ throw Error("unsupported key type");
+ }
+
+ const derived = await wex.cryptoApi.deriveRefreshSession({
+ exchangeProtocolVersion,
+ kappa: 3,
+ meltCoinDenomPubHash: oldCoin.denomPubHash,
+ meltCoinPriv: oldCoin.coinPriv,
+ meltCoinPub: oldCoin.coinPub,
+ feeRefresh: Amounts.parseOrThrow(oldDenom.feeRefresh),
+ meltCoinMaxAge: oldCoin.maxAge,
+ meltCoinAgeCommitmentProof: oldCoin.ageCommitmentProof,
+ newCoinDenoms,
+ sessionSecretSeed: refreshSession.sessionSecretSeed,
+ });
+
+ const reqUrl = new URL(
+ `coins/${oldCoin.coinPub}/melt`,
+ oldCoin.exchangeBaseUrl,
+ );
+
+ let maybeAch: HashCodeString | undefined;
+ if (oldCoin.ageCommitmentProof) {
+ maybeAch = AgeRestriction.hashCommitment(
+ oldCoin.ageCommitmentProof.commitment,
+ );
+ }
+
+ const meltReqBody: ExchangeMeltRequest = {
+ coin_pub: oldCoin.coinPub,
+ confirm_sig: derived.confirmSig,
+ denom_pub_hash: oldCoin.denomPubHash,
+ denom_sig: oldCoin.denomSig,
+ rc: derived.hash,
+ value_with_fee: Amounts.stringify(derived.meltValueWithFee),
+ age_commitment_hash: maybeAch,
+ };
+
+ const resp = await wex.ws.runSequentialized(
+ [EXCHANGE_COINS_LOCK],
+ async () => {
+ return await wex.http.fetch(reqUrl.href, {
+ method: "POST",
+ body: meltReqBody,
+ timeout: getRefreshRequestTimeout(refreshGroup),
+ cancellationToken: wex.cancellationToken,
+ });
+ },
+ );
+
+ switch (resp.status) {
+ case HttpStatusCode.NotFound: {
+ const errDetail = await readTalerErrorResponse(resp);
+ await handleRefreshMeltNotFound(ctx, coinIndex, errDetail);
+ return;
+ }
+ case HttpStatusCode.Gone: {
+ const errDetail = await readTalerErrorResponse(resp);
+ await handleRefreshMeltGone(ctx, coinIndex, errDetail);
+ return;
+ }
+ case HttpStatusCode.Conflict: {
+ const errDetail = await readTalerErrorResponse(resp);
+ await handleRefreshMeltConflict(
+ ctx,
+ coinIndex,
+ errDetail,
+ derived,
+ oldCoin,
+ );
+ return;
+ }
+ case HttpStatusCode.Ok:
+ break;
+ default: {
+ const errDetail = await readTalerErrorResponse(resp);
+ throwUnexpectedRequestError(resp, errDetail);
+ }
+ }
+
+ const meltResponse = await readSuccessResponseJsonOrThrow(
+ resp,
+ codecForExchangeMeltResponse(),
+ );
+
+ const norevealIndex = meltResponse.noreveal_index;
+
+ refreshSession.norevealIndex = norevealIndex;
+
+ await wex.db.runReadWriteTx(
+ { storeNames: ["refreshGroups", "refreshSessions"] },
+ async (tx) => {
+ const rg = await tx.refreshGroups.get(refreshGroupId);
+ if (!rg) {
+ return;
+ }
+ if (rg.timestampFinished) {
+ return;
+ }
+ const rs = await tx.refreshSessions.get([refreshGroupId, coinIndex]);
+ if (!rs) {
+ return;
+ }
+ if (rs.norevealIndex !== undefined) {
+ return;
+ }
+ rs.norevealIndex = norevealIndex;
+ await tx.refreshSessions.put(rs);
+ },
+ );
+}
+
+async function handleRefreshMeltGone(
+ ctx: RefreshTransactionContext,
+ coinIndex: number,
+ errDetails: TalerErrorDetail,
+): Promise<void> {
+ // const expiredMsg = codecForDenominationExpiredMessage().decode(errDetails);
+
+ // FIXME: Validate signature.
+
+ await ctx.wex.db.runReadWriteTx(
+ {
+ storeNames: [
+ "refreshGroups",
+ "refreshSessions",
+ "coins",
+ "denominations",
+ "coinAvailability",
+ ],
+ },
+ async (tx) => {
+ const rg = await tx.refreshGroups.get(ctx.refreshGroupId);
+ if (!rg) {
+ return;
+ }
+ if (rg.timestampFinished) {
+ return;
+ }
+ if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) {
+ return;
+ }
+ rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Failed;
+ const refreshSession = await tx.refreshSessions.get([
+ ctx.refreshGroupId,
+ coinIndex,
+ ]);
+ if (!refreshSession) {
+ throw Error("db invariant failed: missing refresh session in database");
+ }
+ refreshSession.lastError = errDetails;
+ await destroyRefreshSession(ctx.wex, tx, rg, refreshSession);
+ await tx.refreshGroups.put(rg);
+ await tx.refreshSessions.put(refreshSession);
+ },
+ );
+}
+
+async function handleRefreshMeltConflict(
+ ctx: RefreshTransactionContext,
+ coinIndex: number,
+ errDetails: TalerErrorDetail,
+ derived: DerivedRefreshSession,
+ oldCoin: CoinRecord,
+): Promise<void> {
+ // Just log for better diagnostics here, error status
+ // will be handled later.
+ logger.error(
+ `melt request for ${Amounts.stringify(
+ derived.meltValueWithFee,
+ )} failed in refresh group ${ctx.refreshGroupId} due to conflict`,
+ );
+
+ const historySig = await ctx.wex.cryptoApi.signCoinHistoryRequest({
+ coinPriv: oldCoin.coinPriv,
+ coinPub: oldCoin.coinPub,
+ startOffset: 0,
+ });
+
+ const historyUrl = new URL(
+ `coins/${oldCoin.coinPub}/history`,
+ oldCoin.exchangeBaseUrl,
+ );
+
+ const historyResp = await ctx.wex.http.fetch(historyUrl.href, {
+ method: "GET",
+ headers: {
+ "Taler-Coin-History-Signature": historySig.sig,
+ },
+ cancellationToken: ctx.wex.cancellationToken,
+ });
+
+ const historyJson = await readSuccessResponseJsonOrThrow(
+ historyResp,
+ codecForCoinHistoryResponse(),
+ );
+ logger.info(`coin history: ${j2s(historyJson)}`);
+
+ // FIXME: If response seems wrong, report to auditor (in the future!);
+
+ await ctx.wex.db.runReadWriteTx(
+ {
+ storeNames: [
+ "refreshGroups",
+ "refreshSessions",
+ "denominations",
+ "coins",
+ "coinAvailability",
+ ],
+ },
+ async (tx) => {
+ const rg = await tx.refreshGroups.get(ctx.refreshGroupId);
+ if (!rg) {
+ return;
+ }
+ if (rg.timestampFinished) {
+ return;
+ }
+ if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) {
+ return;
+ }
+ if (Amounts.isZero(historyJson.balance)) {
+ rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Failed;
+ const refreshSession = await tx.refreshSessions.get([
+ ctx.refreshGroupId,
+ coinIndex,
+ ]);
+ if (!refreshSession) {
+ throw Error(
+ "db invariant failed: missing refresh session in database",
+ );
+ }
+ refreshSession.lastError = errDetails;
+ await tx.refreshGroups.put(rg);
+ await tx.refreshSessions.put(refreshSession);
+ } else {
+ // Try again with new denoms!
+ rg.inputPerCoin[coinIndex] = historyJson.balance;
+ const refreshSession = await tx.refreshSessions.get([
+ ctx.refreshGroupId,
+ coinIndex,
+ ]);
+ if (!refreshSession) {
+ throw Error(
+ "db invariant failed: missing refresh session in database",
+ );
+ }
+ await destroyRefreshSession(ctx.wex, tx, rg, refreshSession);
+ await tx.refreshSessions.delete([ctx.refreshGroupId, coinIndex]);
+ await initRefreshSession(ctx.wex, tx, rg, coinIndex);
+ }
+ },
+ );
+}
+
+async function handleRefreshMeltNotFound(
+ ctx: RefreshTransactionContext,
+ coinIndex: number,
+ errDetails: TalerErrorDetail,
+): Promise<void> {
+ // FIXME: Validate the exchange's error response
+ await ctx.wex.db.runReadWriteTx(
+ {
+ storeNames: [
+ "refreshGroups",
+ "refreshSessions",
+ "coins",
+ "denominations",
+ "coinAvailability",
+ ],
+ },
+ async (tx) => {
+ const rg = await tx.refreshGroups.get(ctx.refreshGroupId);
+ if (!rg) {
+ return;
+ }
+ if (rg.timestampFinished) {
+ return;
+ }
+ if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) {
+ return;
+ }
+ rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Failed;
+ const refreshSession = await tx.refreshSessions.get([
+ ctx.refreshGroupId,
+ coinIndex,
+ ]);
+ if (!refreshSession) {
+ throw Error("db invariant failed: missing refresh session in database");
+ }
+ await destroyRefreshSession(ctx.wex, tx, rg, refreshSession);
+ refreshSession.lastError = errDetails;
+ await tx.refreshGroups.put(rg);
+ await tx.refreshSessions.put(refreshSession);
+ },
+ );
+}
+
+export async function assembleRefreshRevealRequest(args: {
+ cryptoApi: TalerCryptoInterface;
+ derived: DerivedRefreshSession;
+ norevealIndex: number;
+ oldCoinPub: CoinPublicKeyString;
+ oldCoinPriv: string;
+ newDenoms: {
+ denomPubHash: string;
+ count: number;
+ }[];
+ oldAgeCommitment?: AgeCommitment;
+}): Promise<ExchangeRefreshRevealRequest> {
+ const {
+ derived,
+ norevealIndex,
+ cryptoApi,
+ oldCoinPriv,
+ oldCoinPub,
+ newDenoms,
+ } = args;
+ const privs = Array.from(derived.transferPrivs);
+ privs.splice(norevealIndex, 1);
+
+ const planchets = derived.planchetsForGammas[norevealIndex];
+ if (!planchets) {
+ throw Error("refresh index error");
+ }
+
+ const newDenomsFlat: string[] = [];
+ const linkSigs: string[] = [];
+
+ for (let i = 0; i < newDenoms.length; i++) {
+ const dsel = newDenoms[i];
+ for (let j = 0; j < dsel.count; j++) {
+ const newCoinIndex = linkSigs.length;
+ const linkSig = await cryptoApi.signCoinLink({
+ coinEv: planchets[newCoinIndex].coinEv,
+ newDenomHash: dsel.denomPubHash,
+ oldCoinPriv: oldCoinPriv,
+ oldCoinPub: oldCoinPub,
+ transferPub: derived.transferPubs[norevealIndex],
+ });
+ linkSigs.push(linkSig.sig);
+ newDenomsFlat.push(dsel.denomPubHash);
+ }
+ }
+
+ const req: ExchangeRefreshRevealRequest = {
+ coin_evs: planchets.map((x) => x.coinEv),
+ new_denoms_h: newDenomsFlat,
+ transfer_privs: privs,
+ transfer_pub: derived.transferPubs[norevealIndex],
+ link_sigs: linkSigs,
+ old_age_commitment: args.oldAgeCommitment?.publicKeys,
+ };
+ return req;
+}
+
+async function refreshReveal(
+ wex: WalletExecutionContext,
+ refreshGroupId: string,
+ coinIndex: number,
+): Promise<void> {
+ logger.trace(
+ `doing refresh reveal for ${refreshGroupId} (old coin ${coinIndex})`,
+ );
+ const ctx = new RefreshTransactionContext(wex, refreshGroupId);
+ const d = await wex.db.runReadOnlyTx(
+ {
+ storeNames: [
+ "refreshGroups",
+ "refreshSessions",
+ "coins",
+ "denominations",
+ ],
+ },
+ async (tx) => {
+ const refreshGroup = await tx.refreshGroups.get(refreshGroupId);
+ if (!refreshGroup) {
+ return;
+ }
+ const refreshSession = await tx.refreshSessions.get([
+ refreshGroupId,
+ coinIndex,
+ ]);
+ if (!refreshSession) {
+ return;
+ }
+ const norevealIndex = refreshSession.norevealIndex;
+ if (norevealIndex === undefined) {
+ throw Error("can't reveal without melting first");
+ }
+
+ const oldCoin = await tx.coins.get(refreshGroup.oldCoinPubs[coinIndex]);
+ checkDbInvariant(!!oldCoin, "melt coin doesn't exist");
+ const oldDenom = await getDenomInfo(
+ wex,
+ tx,
+ oldCoin.exchangeBaseUrl,
+ oldCoin.denomPubHash,
+ );
+ checkDbInvariant(
+ !!oldDenom,
+ "denomination for melted coin doesn't exist",
+ );
+
+ const newCoinDenoms: RefreshNewDenomInfo[] = [];
+
+ for (const dh of refreshSession.newDenoms) {
+ const newDenom = await getDenomInfo(
+ wex,
+ tx,
+ oldCoin.exchangeBaseUrl,
+ dh.denomPubHash,
+ );
+ checkDbInvariant(
+ !!newDenom,
+ "new denomination for refresh not in database",
+ );
+ newCoinDenoms.push({
+ count: dh.count,
+ denomPub: newDenom.denomPub,
+ denomPubHash: newDenom.denomPubHash,
+ feeWithdraw: newDenom.feeWithdraw,
+ value: Amounts.stringify(newDenom.value),
+ });
+ }
+ return {
+ oldCoin,
+ oldDenom,
+ newCoinDenoms,
+ refreshSession,
+ refreshGroup,
+ norevealIndex,
+ };
+ },
+ );
+
+ if (!d) {
+ return;
+ }
+
+ const {
+ oldCoin,
+ oldDenom,
+ newCoinDenoms,
+ refreshSession,
+ refreshGroup,
+ norevealIndex,
+ } = d;
+
+ let exchangeProtocolVersion: ExchangeProtocolVersion;
+ switch (d.oldDenom.denomPub.cipher) {
+ case DenomKeyType.Rsa: {
+ exchangeProtocolVersion = ExchangeProtocolVersion.V12;
+ break;
+ }
+ default:
+ throw Error("unsupported key type");
+ }
+
+ const derived = await wex.cryptoApi.deriveRefreshSession({
+ exchangeProtocolVersion,
+ kappa: 3,
+ meltCoinDenomPubHash: oldCoin.denomPubHash,
+ meltCoinPriv: oldCoin.coinPriv,
+ meltCoinPub: oldCoin.coinPub,
+ feeRefresh: Amounts.parseOrThrow(oldDenom.feeRefresh),
+ newCoinDenoms,
+ meltCoinMaxAge: oldCoin.maxAge,
+ meltCoinAgeCommitmentProof: oldCoin.ageCommitmentProof,
+ sessionSecretSeed: refreshSession.sessionSecretSeed,
+ });
+
+ const reqUrl = new URL(
+ `refreshes/${derived.hash}/reveal`,
+ oldCoin.exchangeBaseUrl,
+ );
+
+ const req = await assembleRefreshRevealRequest({
+ cryptoApi: wex.cryptoApi,
+ derived,
+ newDenoms: newCoinDenoms,
+ norevealIndex: norevealIndex,
+ oldCoinPriv: oldCoin.coinPriv,
+ oldCoinPub: oldCoin.coinPub,
+ oldAgeCommitment: oldCoin.ageCommitmentProof?.commitment,
+ });
+
+ const resp = await wex.ws.runSequentialized(
+ [EXCHANGE_COINS_LOCK],
+ async () => {
+ return await wex.http.fetch(reqUrl.href, {
+ body: req,
+ method: "POST",
+ timeout: getRefreshRequestTimeout(refreshGroup),
+ cancellationToken: wex.cancellationToken,
+ });
+ },
+ );
+
+ switch (resp.status) {
+ case HttpStatusCode.Ok:
+ break;
+ case HttpStatusCode.Conflict:
+ case HttpStatusCode.Gone: {
+ const errDetail = await readTalerErrorResponse(resp);
+ await handleRefreshRevealError(ctx, coinIndex, errDetail);
+ return;
+ }
+ default: {
+ const errDetail = await readTalerErrorResponse(resp);
+ throwUnexpectedRequestError(resp, errDetail);
+ }
+ }
+
+ const reveal = await readSuccessResponseJsonOrThrow(
+ resp,
+ codecForExchangeRevealResponse(),
+ );
+
+ const coins: CoinRecord[] = [];
+
+ const transactionId = constructTransactionIdentifier({
+ tag: TransactionType.Refresh,
+ refreshGroupId,
+ });
+
+ for (let i = 0; i < refreshSession.newDenoms.length; i++) {
+ const ncd = newCoinDenoms[i];
+ for (let j = 0; j < refreshSession.newDenoms[i].count; j++) {
+ const newCoinIndex = coins.length;
+ const pc = derived.planchetsForGammas[norevealIndex][newCoinIndex];
+ if (ncd.denomPub.cipher !== DenomKeyType.Rsa) {
+ throw Error("cipher unsupported");
+ }
+ const evSig = reveal.ev_sigs[newCoinIndex].ev_sig;
+ const denomSig = await wex.cryptoApi.unblindDenominationSignature({
+ planchet: {
+ blindingKey: pc.blindingKey,
+ denomPub: ncd.denomPub,
+ },
+ evSig,
+ });
+ const coin: CoinRecord = {
+ blindingKey: pc.blindingKey,
+ coinPriv: pc.coinPriv,
+ coinPub: pc.coinPub,
+ denomPubHash: ncd.denomPubHash,
+ denomSig,
+ exchangeBaseUrl: oldCoin.exchangeBaseUrl,
+ status: CoinStatus.Fresh,
+ coinSource: {
+ type: CoinSourceType.Refresh,
+ refreshGroupId,
+ oldCoinPub: refreshGroup.oldCoinPubs[coinIndex],
+ },
+ sourceTransactionId: transactionId,
+ coinEvHash: pc.coinEvHash,
+ maxAge: pc.maxAge,
+ ageCommitmentProof: pc.ageCommitmentProof,
+ spendAllocation: undefined,
+ };
+
+ coins.push(coin);
+ }
+ }
+
+ await wex.db.runReadWriteTx(
+ {
+ storeNames: [
+ "coins",
+ "denominations",
+ "coinAvailability",
+ "refreshGroups",
+ "refreshSessions",
+ ],
+ },
+ async (tx) => {
+ const rg = await tx.refreshGroups.get(refreshGroupId);
+ if (!rg) {
+ logger.warn("no refresh session found");
+ return;
+ }
+ if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) {
+ return;
+ }
+ const rs = await tx.refreshSessions.get([refreshGroupId, coinIndex]);
+ if (!rs) {
+ return;
+ }
+ rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished;
+ for (const coin of coins) {
+ const existingCoin = await tx.coins.get(coin.coinPub);
+ if (existingCoin) {
+ continue;
+ }
+ await tx.coins.add(coin);
+ const denomInfo = await getDenomInfo(
+ wex,
+ tx,
+ coin.exchangeBaseUrl,
+ coin.denomPubHash,
+ );
+ checkDbInvariant(!!denomInfo);
+ const car = await getCoinAvailabilityForDenom(
+ wex,
+ tx,
+ denomInfo,
+ coin.maxAge,
+ );
+ checkDbInvariant(
+ car.pendingRefreshOutputCount != null &&
+ car.pendingRefreshOutputCount > 0,
+ );
+ car.pendingRefreshOutputCount--;
+ car.freshCoinCount++;
+ await tx.coinAvailability.put(car);
+ }
+ await tx.refreshGroups.put(rg);
+ },
+ );
+ logger.trace("refresh finished (end of reveal)");
+}
+
+async function handleRefreshRevealError(
+ ctx: RefreshTransactionContext,
+ coinIndex: number,
+ errDetails: TalerErrorDetail,
+): Promise<void> {
+ await ctx.wex.db.runReadWriteTx(
+ {
+ storeNames: [
+ "refreshGroups",
+ "refreshSessions",
+ "coins",
+ "denominations",
+ "coinAvailability",
+ ],
+ },
+ async (tx) => {
+ const rg = await tx.refreshGroups.get(ctx.refreshGroupId);
+ if (!rg) {
+ return;
+ }
+ if (rg.timestampFinished) {
+ return;
+ }
+ if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) {
+ return;
+ }
+ rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Failed;
+ const refreshSession = await tx.refreshSessions.get([
+ ctx.refreshGroupId,
+ coinIndex,
+ ]);
+ if (!refreshSession) {
+ throw Error("db invariant failed: missing refresh session in database");
+ }
+ refreshSession.lastError = errDetails;
+ await destroyRefreshSession(ctx.wex, tx, rg, refreshSession);
+ await tx.refreshGroups.put(rg);
+ await tx.refreshSessions.put(refreshSession);
+ },
+ );
+}
+
+export async function processRefreshGroup(
+ wex: WalletExecutionContext,
+ refreshGroupId: string,
+): Promise<TaskRunResult> {
+ logger.trace(`processing refresh group ${refreshGroupId}`);
+
+ const refreshGroup = await wex.db.runReadOnlyTx(
+ { storeNames: ["refreshGroups"] },
+ async (tx) => tx.refreshGroups.get(refreshGroupId),
+ );
+ if (!refreshGroup) {
+ return TaskRunResult.finished();
+ }
+ if (refreshGroup.timestampFinished) {
+ return TaskRunResult.finished();
+ }
+
+ if (
+ wex.ws.config.testing.devModeActive &&
+ wex.ws.devExperimentState.blockRefreshes
+ ) {
+ throw Error("refresh blocked");
+ }
+
+ // Process refresh sessions of the group in parallel.
+ logger.trace(
+ `processing refresh sessions for ${refreshGroup.oldCoinPubs.length} old coins`,
+ );
+ let errors: TalerErrorDetail[] = [];
+ let inShutdown = false;
+ const ps = refreshGroup.oldCoinPubs.map((x, i) =>
+ processRefreshSession(wex, refreshGroupId, i).catch((x) => {
+ if (x instanceof CryptoApiStoppedError) {
+ inShutdown = true;
+ logger.info(
+ "crypto API stopped while processing refresh group, probably the wallet is currently shutting down.",
+ );
+ return;
+ }
+ if (x instanceof TalerError) {
+ logger.warn("process refresh session got exception (TalerError)");
+ logger.warn(`exc ${x}`);
+ logger.warn(`exc stack ${x.stack}`);
+ logger.warn(`error detail: ${j2s(x.errorDetail)}`);
+ } else {
+ logger.warn("process refresh session got exception");
+ logger.warn(`exc ${x}`);
+ logger.warn(`exc stack ${x.stack}`);
+ }
+ errors.push(getErrorDetailFromException(x));
+ }),
+ );
+ await Promise.all(ps);
+ if (inShutdown) {
+ return TaskRunResult.finished();
+ }
+
+ const ctx = new RefreshTransactionContext(wex, refreshGroupId);
+
+ // We've processed all refresh session and can now update the
+ // status of the whole refresh group.
+
+ const transitionInfo = await wex.db.runReadWriteTx(
+ { storeNames: ["coins", "coinAvailability", "refreshGroups"] },
+ async (tx) => {
+ const rg = await tx.refreshGroups.get(refreshGroupId);
+ if (!rg) {
+ return;
+ }
+ switch (rg.operationStatus) {
+ case RefreshOperationStatus.Pending:
+ break;
+ default:
+ return undefined;
+ }
+ const oldTxState = computeRefreshTransactionState(rg);
+ const allFinal = fnutil.all(
+ rg.statusPerCoin,
+ (x) =>
+ x === RefreshCoinStatus.Finished || x === RefreshCoinStatus.Failed,
+ );
+ const anyFailed = fnutil.any(
+ rg.statusPerCoin,
+ (x) => x === RefreshCoinStatus.Failed,
+ );
+ if (allFinal) {
+ if (anyFailed) {
+ rg.timestampFinished = timestampPreciseToDb(
+ TalerPreciseTimestamp.now(),
+ );
+ rg.operationStatus = RefreshOperationStatus.Failed;
+ } else {
+ rg.timestampFinished = timestampPreciseToDb(
+ TalerPreciseTimestamp.now(),
+ );
+ rg.operationStatus = RefreshOperationStatus.Finished;
+ }
+ await makeCoinsVisible(wex, tx, ctx.transactionId);
+ await tx.refreshGroups.put(rg);
+ const newTxState = computeRefreshTransactionState(rg);
+ return {
+ oldTxState,
+ newTxState,
+ };
+ }
+ return undefined;
+ },
+ );
+
+ if (transitionInfo) {
+ notifyTransition(wex, ctx.transactionId, transitionInfo);
+ return TaskRunResult.progress();
+ }
+
+ if (errors.length > 0) {
+ return {
+ type: TaskRunResultType.Error,
+ errorDetail: makeErrorDetail(
+ TalerErrorCode.WALLET_REFRESH_GROUP_INCOMPLETE,
+ {
+ numErrors: errors.length,
+ errors: errors.slice(0, 5),
+ },
+ ),
+ };
+ }
+
+ return TaskRunResult.backoff();
+}
+
+async function processRefreshSession(
+ wex: WalletExecutionContext,
+ refreshGroupId: string,
+ coinIndex: number,
+): Promise<void> {
+ logger.trace(
+ `processing refresh session for coin ${coinIndex} of group ${refreshGroupId}`,
+ );
+ let { refreshGroup, refreshSession } = await wex.db.runReadOnlyTx(
+ { storeNames: ["refreshGroups", "refreshSessions"] },
+ async (tx) => {
+ const rg = await tx.refreshGroups.get(refreshGroupId);
+ const rs = await tx.refreshSessions.get([refreshGroupId, coinIndex]);
+ return {
+ refreshGroup: rg,
+ refreshSession: rs,
+ };
+ },
+ );
+ if (!refreshGroup) {
+ return;
+ }
+ if (refreshGroup.statusPerCoin[coinIndex] === RefreshCoinStatus.Finished) {
+ return;
+ }
+ if (!refreshSession) {
+ // No refresh session for that coin.
+ return;
+ }
+ if (refreshSession.norevealIndex === undefined) {
+ await refreshMelt(wex, refreshGroupId, coinIndex);
+ }
+ await refreshReveal(wex, refreshGroupId, coinIndex);
+}
+
+export interface RefreshOutputInfo {
+ outputPerCoin: AmountJson[];
+ perExchangeInfo: Record<string, RefreshGroupPerExchangeInfo>;
+}
+
+export async function calculateRefreshOutput(
+ wex: WalletExecutionContext,
+ tx: WalletDbReadOnlyTransaction<
+ ["denominations", "coins", "refreshGroups", "coinAvailability"]
+ >,
+ currency: string,
+ oldCoinPubs: CoinRefreshRequest[],
+): Promise<RefreshOutputInfo> {
+ const estimatedOutputPerCoin: AmountJson[] = [];
+
+ const denomsPerExchange: Record<string, DenominationRecord[]> = {};
+
+ const infoPerExchange: Record<string, RefreshGroupPerExchangeInfo> = {};
+
+ for (const ocp of oldCoinPubs) {
+ const coin = await tx.coins.get(ocp.coinPub);
+ checkDbInvariant(!!coin, "coin must be in database");
+ const denom = await getDenomInfo(
+ wex,
+ tx,
+ coin.exchangeBaseUrl,
+ coin.denomPubHash,
+ );
+ checkDbInvariant(
+ !!denom,
+ "denomination for existing coin must be in database",
+ );
+ const refreshAmount = ocp.amount;
+ const cost = await getTotalRefreshCost(
+ wex,
+ tx,
+ denom,
+ Amounts.parseOrThrow(refreshAmount),
+ );
+ const output = Amounts.sub(refreshAmount, cost).amount;
+ let exchInfo = infoPerExchange[coin.exchangeBaseUrl];
+ if (!exchInfo) {
+ infoPerExchange[coin.exchangeBaseUrl] = exchInfo = {
+ outputEffective: Amounts.stringify(Amounts.zeroOfAmount(cost)),
+ };
+ }
+ exchInfo.outputEffective = Amounts.stringify(
+ Amounts.add(exchInfo.outputEffective, output).amount,
+ );
+ estimatedOutputPerCoin.push(output);
+ }
+
+ return {
+ outputPerCoin: estimatedOutputPerCoin,
+ perExchangeInfo: infoPerExchange,
+ };
+}
+
+async function applyRefreshToOldCoins(
+ wex: WalletExecutionContext,
+ tx: WalletDbReadWriteTransaction<
+ ["denominations", "coins", "refreshGroups", "coinAvailability"]
+ >,
+ oldCoinPubs: CoinRefreshRequest[],
+ refreshGroupId: string,
+): Promise<void> {
+ for (const ocp of oldCoinPubs) {
+ const coin = await tx.coins.get(ocp.coinPub);
+ checkDbInvariant(!!coin, "coin must be in database");
+ const denom = await getDenomInfo(
+ wex,
+ tx,
+ coin.exchangeBaseUrl,
+ coin.denomPubHash,
+ );
+ checkDbInvariant(
+ !!denom,
+ "denomination for existing coin must be in database",
+ );
+ switch (coin.status) {
+ case CoinStatus.Dormant:
+ break;
+ case CoinStatus.Fresh: {
+ coin.status = CoinStatus.Dormant;
+ const coinAv = await tx.coinAvailability.get([
+ coin.exchangeBaseUrl,
+ coin.denomPubHash,
+ coin.maxAge,
+ ]);
+ checkDbInvariant(!!coinAv);
+ checkDbInvariant(coinAv.freshCoinCount > 0);
+ coinAv.freshCoinCount--;
+ await tx.coinAvailability.put(coinAv);
+ break;
+ }
+ case CoinStatus.FreshSuspended: {
+ // For suspended coins, we don't have to adjust coin
+ // availability, as they are not counted as available.
+ coin.status = CoinStatus.Dormant;
+ break;
+ }
+ case CoinStatus.DenomLoss:
+ break;
+ default:
+ assertUnreachable(coin.status);
+ }
+ if (!coin.spendAllocation) {
+ coin.spendAllocation = {
+ amount: Amounts.stringify(ocp.amount),
+ // id: `txn:refresh:${refreshGroupId}`,
+ id: constructTransactionIdentifier({
+ tag: TransactionType.Refresh,
+ refreshGroupId,
+ }),
+ };
+ }
+ await tx.coins.put(coin);
+ }
+}
+
+export interface CreateRefreshGroupResult {
+ refreshGroupId: string;
+ notifications: WalletNotification[];
+}
+
+/**
+ * Create a refresh group for a list of coins.
+ *
+ * Refreshes the remaining amount on the coin, effectively capturing the remaining
+ * value in the refresh group.
+ *
+ * The caller must also ensure that the coins that should be refreshed exist
+ * in the current database transaction.
+ */
+export async function createRefreshGroup(
+ wex: WalletExecutionContext,
+ tx: WalletDbReadWriteTransaction<
+ [
+ "denominations",
+ "coins",
+ "refreshGroups",
+ "refreshSessions",
+ "coinAvailability",
+ ]
+ >,
+ currency: string,
+ oldCoinPubs: CoinRefreshRequest[],
+ refreshReason: RefreshReason,
+ originatingTransactionId: string | undefined,
+): Promise<CreateRefreshGroupResult> {
+ // FIXME: Check that involved exchanges are reasonably up-to-date.
+ // Otherwise, error out.
+
+ const refreshGroupId = encodeCrock(getRandomBytes(32));
+
+ const outInfo = await calculateRefreshOutput(wex, tx, currency, oldCoinPubs);
+
+ const estimatedOutputPerCoin = outInfo.outputPerCoin;
+
+ await applyRefreshToOldCoins(wex, tx, oldCoinPubs, refreshGroupId);
+
+ const refreshGroup: RefreshGroupRecord = {
+ operationStatus: RefreshOperationStatus.Pending,
+ currency,
+ timestampFinished: undefined,
+ statusPerCoin: oldCoinPubs.map(() => RefreshCoinStatus.Pending),
+ oldCoinPubs: oldCoinPubs.map((x) => x.coinPub),
+ originatingTransactionId,
+ reason: refreshReason,
+ refreshGroupId,
+ inputPerCoin: oldCoinPubs.map((x) => x.amount),
+ expectedOutputPerCoin: estimatedOutputPerCoin.map((x) =>
+ Amounts.stringify(x),
+ ),
+ infoPerExchange: outInfo.perExchangeInfo,
+ timestampCreated: timestampPreciseToDb(TalerPreciseTimestamp.now()),
+ };
+
+ if (oldCoinPubs.length == 0) {
+ logger.warn("created refresh group with zero coins");
+ refreshGroup.timestampFinished = timestampPreciseToDb(
+ TalerPreciseTimestamp.now(),
+ );
+ refreshGroup.operationStatus = RefreshOperationStatus.Finished;
+ }
+
+ for (let i = 0; i < oldCoinPubs.length; i++) {
+ await initRefreshSession(wex, tx, refreshGroup, i);
+ }
+
+ await tx.refreshGroups.put(refreshGroup);
+
+ const newTxState = computeRefreshTransactionState(refreshGroup);
+
+ logger.trace(`created refresh group ${refreshGroupId}`);
+
+ const ctx = new RefreshTransactionContext(wex, refreshGroupId);
+
+ // Shepherd the task.
+ // If the current transaction fails to commit the refresh
+ // group to the DB, the shepherd will give up.
+ wex.taskScheduler.startShepherdTask(ctx.taskId);
+
+ return {
+ refreshGroupId,
+ notifications: [
+ {
+ type: NotificationType.TransactionStateTransition,
+ transactionId: ctx.transactionId,
+ oldTxState: {
+ major: TransactionMajorState.None,
+ },
+ newTxState,
+ },
+ ],
+ };
+}
+
+export function computeRefreshTransactionState(
+ rg: RefreshGroupRecord,
+): TransactionState {
+ switch (rg.operationStatus) {
+ case RefreshOperationStatus.Finished:
+ return {
+ major: TransactionMajorState.Done,
+ };
+ case RefreshOperationStatus.Failed:
+ return {
+ major: TransactionMajorState.Failed,
+ };
+ case RefreshOperationStatus.Pending:
+ return {
+ major: TransactionMajorState.Pending,
+ };
+ case RefreshOperationStatus.Suspended:
+ return {
+ major: TransactionMajorState.Suspended,
+ };
+ }
+}
+
+export function computeRefreshTransactionActions(
+ rg: RefreshGroupRecord,
+): TransactionAction[] {
+ switch (rg.operationStatus) {
+ case RefreshOperationStatus.Finished:
+ return [TransactionAction.Delete];
+ case RefreshOperationStatus.Failed:
+ return [TransactionAction.Delete];
+ case RefreshOperationStatus.Pending:
+ return [
+ TransactionAction.Retry,
+ TransactionAction.Suspend,
+ TransactionAction.Fail,
+ ];
+ case RefreshOperationStatus.Suspended:
+ return [TransactionAction.Resume, TransactionAction.Fail];
+ }
+}
+
+export function getRefreshesForTransaction(
+ wex: WalletExecutionContext,
+ transactionId: string,
+): Promise<string[]> {
+ return wex.db.runReadOnlyTx({ storeNames: ["refreshGroups"] }, async (tx) => {
+ const groups =
+ await tx.refreshGroups.indexes.byOriginatingTransactionId.getAll(
+ transactionId,
+ );
+ return groups.map((x) =>
+ constructTransactionIdentifier({
+ tag: TransactionType.Refresh,
+ refreshGroupId: x.refreshGroupId,
+ }),
+ );
+ });
+}
+
+export interface ForceRefreshResult {
+ refreshGroupId: string;
+}
+
+export async function forceRefresh(
+ wex: WalletExecutionContext,
+ req: ForceRefreshRequest,
+): Promise<ForceRefreshResult> {
+ if (req.refreshCoinSpecs.length == 0) {
+ throw Error("refusing to create empty refresh group");
+ }
+ const res = await wex.db.runReadWriteTx(
+ {
+ storeNames: [
+ "refreshGroups",
+ "coinAvailability",
+ "refreshSessions",
+ "denominations",
+ "coins",
+ ],
+ },
+ async (tx) => {
+ let coinPubs: CoinRefreshRequest[] = [];
+ for (const c of req.refreshCoinSpecs) {
+ const coin = await tx.coins.get(c.coinPub);
+ if (!coin) {
+ throw Error(`coin (pubkey ${c}) not found`);
+ }
+ const denom = await getDenomInfo(
+ wex,
+ tx,
+ coin.exchangeBaseUrl,
+ coin.denomPubHash,
+ );
+ checkDbInvariant(!!denom);
+ coinPubs.push({
+ coinPub: c.coinPub,
+ amount: c.amount ?? denom.value,
+ });
+ }
+ return await createRefreshGroup(
+ wex,
+ tx,
+ Amounts.currencyOf(coinPubs[0].amount),
+ coinPubs,
+ RefreshReason.Manual,
+ undefined,
+ );
+ },
+ );
+
+ for (const notif of res.notifications) {
+ wex.ws.notify(notif);
+ }
+
+ return {
+ refreshGroupId: res.refreshGroupId,
+ };
+}
+
+/**
+ * Wait until a refresh operation is final.
+ */
+export async function waitRefreshFinal(
+ wex: WalletExecutionContext,
+ refreshGroupId: string,
+): Promise<void> {
+ const ctx = new RefreshTransactionContext(wex, refreshGroupId);
+ wex.taskScheduler.startShepherdTask(ctx.taskId);
+
+ // FIXME: Clean up using the new JS "using" / Symbol.dispose syntax.
+ const refreshNotifFlag = new AsyncFlag();
+ // Raise purchaseNotifFlag whenever we get a notification
+ // about our refresh.
+ const cancelNotif = wex.ws.addNotificationListener((notif) => {
+ if (
+ notif.type === NotificationType.TransactionStateTransition &&
+ notif.transactionId === ctx.transactionId
+ ) {
+ refreshNotifFlag.raise();
+ }
+ });
+ const unregisterOnCancelled = wex.cancellationToken.onCancelled(() => {
+ cancelNotif();
+ refreshNotifFlag.raise();
+ });
+
+ try {
+ await internalWaitRefreshFinal(ctx, refreshNotifFlag);
+ } catch (e) {
+ unregisterOnCancelled();
+ cancelNotif();
+ }
+}
+
+async function internalWaitRefreshFinal(
+ ctx: RefreshTransactionContext,
+ flag: AsyncFlag,
+): Promise<void> {
+ while (true) {
+ if (ctx.wex.cancellationToken.isCancelled) {
+ throw Error("cancelled");
+ }
+
+ // Check if refresh is final
+ const res = await ctx.wex.db.runReadOnlyTx(
+ { storeNames: ["refreshGroups", "operationRetries"] },
+ async (tx) => {
+ return {
+ rg: await tx.refreshGroups.get(ctx.refreshGroupId),
+ };
+ },
+ );
+ const { rg } = res;
+ if (!rg) {
+ // Must've been deleted, we consider that final.
+ return;
+ }
+ switch (rg.operationStatus) {
+ case RefreshOperationStatus.Failed:
+ case RefreshOperationStatus.Finished:
+ // Transaction is final
+ return;
+ case RefreshOperationStatus.Pending:
+ case RefreshOperationStatus.Suspended:
+ break;
+ }
+
+ // Wait for the next transition
+ await flag.wait();
+ flag.reset();
+ }
+}