summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2024-04-02 18:19:11 +0200
committerFlorian Dold <florian@dold.me>2024-04-02 18:40:02 +0200
commit651db75296bfe7c35dc7d29e39f25e6dacb72930 (patch)
treee508ebc04f93e652d6fbbfa4b0193878247ac413
parentb4185a6ef452fba47e481264af06e67524391584 (diff)
downloadwallet-core-651db75296bfe7c35dc7d29e39f25e6dacb72930.tar.gz
wallet-core-651db75296bfe7c35dc7d29e39f25e6dacb72930.tar.bz2
wallet-core-651db75296bfe7c35dc7d29e39f25e6dacb72930.zip
wallet-core: refresh cleanup, preparations for #8568
-rw-r--r--packages/taler-harness/src/integrationtests/test-wallet-refresh-blocked.ts66
-rw-r--r--packages/taler-harness/src/integrationtests/testrunner.ts2
-rw-r--r--packages/taler-wallet-core/src/common.ts10
-rw-r--r--packages/taler-wallet-core/src/db.ts5
-rw-r--r--packages/taler-wallet-core/src/deposits.ts8
-rw-r--r--packages/taler-wallet-core/src/exchanges.ts1
-rw-r--r--packages/taler-wallet-core/src/pay-merchant.ts4
-rw-r--r--packages/taler-wallet-core/src/pay-peer-pull-debit.ts2
-rw-r--r--packages/taler-wallet-core/src/pay-peer-push-debit.ts3
-rw-r--r--packages/taler-wallet-core/src/recoup.ts1
-rw-r--r--packages/taler-wallet-core/src/refresh.ts407
11 files changed, 306 insertions, 203 deletions
diff --git a/packages/taler-harness/src/integrationtests/test-wallet-refresh-blocked.ts b/packages/taler-harness/src/integrationtests/test-wallet-refresh-blocked.ts
new file mode 100644
index 000000000..806802612
--- /dev/null
+++ b/packages/taler-harness/src/integrationtests/test-wallet-refresh-blocked.ts
@@ -0,0 +1,66 @@
+/*
+ This file is part of GNU Taler
+ (C) 2020 Taler Systems S.A.
+
+ GNU Taler is free software; you can redistribute it and/or modify it under the
+ terms of the GNU General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along with
+ GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+ */
+
+/**
+ * Imports.
+ */
+import { WalletApiOperation } from "@gnu-taler/taler-wallet-core";
+import { GlobalTestState } from "../harness/harness.js";
+import {
+ createSimpleTestkudosEnvironmentV2,
+ createWalletDaemonWithClient,
+ withdrawViaBankV2,
+} from "../harness/helpers.js";
+
+/**
+ * Run test for refreshe after a payment.
+ */
+export async function runWalletRefreshBlockedTest(t: GlobalTestState) {
+ // Set up test environment
+
+ const { walletClient, bank, exchange, merchant } =
+ await createSimpleTestkudosEnvironmentV2(t);
+
+ // Withdraw digital cash into the wallet.
+
+ const { walletClient: w1 } = await createWalletDaemonWithClient(t, {
+ name: "w1",
+ config: {
+ testing: {
+ devModeActive: true,
+ },
+ },
+ });
+
+ await withdrawViaBankV2(t, {
+ walletClient: w1,
+ bank,
+ exchange,
+ amount: "TESTKUDOS:20",
+ });
+
+ await w1.call(WalletApiOperation.TestingWaitTransactionsFinal, {});
+
+ // Prevent the wallet from doing refreshes by injecting a 5xx
+ // status for all refresh requests.
+ await w1.call(WalletApiOperation.ApplyDevExperiment, {
+ devExperimentUri: "taler://dev-experiment/start-block-refresh",
+ });
+
+ // FIXME: Now force a refresh, check balance
+}
+
+runWalletRefreshBlockedTest.suites = ["wallet"];
diff --git a/packages/taler-harness/src/integrationtests/testrunner.ts b/packages/taler-harness/src/integrationtests/testrunner.ts
index 380251b76..2bca91e45 100644
--- a/packages/taler-harness/src/integrationtests/testrunner.ts
+++ b/packages/taler-harness/src/integrationtests/testrunner.ts
@@ -101,6 +101,7 @@ import { runWalletGenDbTest } from "./test-wallet-gendb.js";
import { runWalletInsufficientBalanceTest } from "./test-wallet-insufficient-balance.js";
import { runWalletNotificationsTest } from "./test-wallet-notifications.js";
import { runWalletObservabilityTest } from "./test-wallet-observability.js";
+import { runWalletRefreshBlockedTest } from "./test-wallet-refresh-blocked.js";
import { runWalletRefreshTest } from "./test-wallet-refresh.js";
import { runWalletWirefeesTest } from "./test-wallet-wirefees.js";
import { runWallettestingTest } from "./test-wallettesting.js";
@@ -212,6 +213,7 @@ const allTests: TestMainFunction[] = [
runWalletWirefeesTest,
runDenomLostTest,
runWalletDenomExpireTest,
+ runWalletRefreshBlockedTest,
];
export interface TestRunSpec {
diff --git a/packages/taler-wallet-core/src/common.ts b/packages/taler-wallet-core/src/common.ts
index eb06b8eb0..5b7ceeead 100644
--- a/packages/taler-wallet-core/src/common.ts
+++ b/packages/taler-wallet-core/src/common.ts
@@ -42,8 +42,10 @@ import {
} from "@gnu-taler/taler-util";
import {
BackupProviderRecord,
+ CoinAvailabilityRecord,
CoinRecord,
DbPreciseTimestamp,
+ DenominationRecord,
DepositGroupRecord,
ExchangeEntryDbRecordStatus,
ExchangeEntryDbUpdateStatus,
@@ -145,7 +147,13 @@ export async function makeCoinAvailable(
export async function spendCoins(
wex: WalletExecutionContext,
tx: WalletDbReadWriteTransaction<
- ["coins", "coinAvailability", "refreshGroups", "denominations"]
+ [
+ "coins",
+ "coinAvailability",
+ "refreshGroups",
+ "refreshSessions",
+ "denominations",
+ ]
>,
csi: CoinsSpendInfo,
): Promise<void> {
diff --git a/packages/taler-wallet-core/src/db.ts b/packages/taler-wallet-core/src/db.ts
index 487927b8f..de22d78a8 100644
--- a/packages/taler-wallet-core/src/db.ts
+++ b/packages/taler-wallet-core/src/db.ts
@@ -2198,6 +2198,11 @@ export interface CoinAvailabilityRecord {
* a final state.
*/
visibleCoinCount: number;
+
+ /**
+ * Number of coins that we expect to obtain via a pending refresh.
+ */
+ pendingRefreshOutputCount?: number;
}
export interface ContractTermsRecord {
diff --git a/packages/taler-wallet-core/src/deposits.ts b/packages/taler-wallet-core/src/deposits.ts
index 6c04b20de..a8612744f 100644
--- a/packages/taler-wallet-core/src/deposits.ts
+++ b/packages/taler-wallet-core/src/deposits.ts
@@ -489,6 +489,7 @@ async function refundDepositGroup(
[
"depositGroups",
"refreshGroups",
+ "refreshSessions",
"coins",
"denominations",
"coinAvailability",
@@ -755,9 +756,9 @@ async function processDepositGroupPendingTrack(
let updatedTxStatus: DepositElementStatus | undefined = undefined;
let newWiredCoin:
| {
- id: string;
- value: DepositTrackingInfo;
- }
+ id: string;
+ value: DepositTrackingInfo;
+ }
| undefined;
if (depositGroup.statusPerCoin[i] !== DepositElementStatus.Wired) {
@@ -1448,6 +1449,7 @@ export async function createDepositGroup(
"recoupGroups",
"denominations",
"refreshGroups",
+ "refreshSessions",
"coinAvailability",
"contractTerms",
],
diff --git a/packages/taler-wallet-core/src/exchanges.ts b/packages/taler-wallet-core/src/exchanges.ts
index 7fb387a9e..0fbe7297c 100644
--- a/packages/taler-wallet-core/src/exchanges.ts
+++ b/packages/taler-wallet-core/src/exchanges.ts
@@ -1614,6 +1614,7 @@ export async function updateExchangeFromUrlHandler(
"denominations",
"coinAvailability",
"refreshGroups",
+ "refreshSessions",
"exchanges",
],
async (tx) => {
diff --git a/packages/taler-wallet-core/src/pay-merchant.ts b/packages/taler-wallet-core/src/pay-merchant.ts
index 812d32429..3b58c1e0a 100644
--- a/packages/taler-wallet-core/src/pay-merchant.ts
+++ b/packages/taler-wallet-core/src/pay-merchant.ts
@@ -272,6 +272,7 @@ export class PayMerchantTransactionContext implements TransactionContext {
[
"purchases",
"refreshGroups",
+ "refreshSessions",
"denominations",
"coinAvailability",
"coins",
@@ -1175,6 +1176,7 @@ async function handleInsufficientFunds(
"coinAvailability",
"denominations",
"refreshGroups",
+ "refreshSessions",
],
async (tx) => {
const p = await tx.purchases.get(proposalId);
@@ -1854,6 +1856,7 @@ export async function confirmPay(
"purchases",
"coins",
"refreshGroups",
+ "refreshSessions",
"denominations",
"coinAvailability",
],
@@ -3047,6 +3050,7 @@ async function storeRefunds(
"coins",
"coinAvailability",
"refreshGroups",
+ "refreshSessions",
],
async (tx) => {
const myPurchase = await tx.purchases.get(purchase.proposalId);
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
index 6cc552714..da68d7839 100644
--- a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
@@ -234,6 +234,7 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
"coinAvailability",
"denominations",
"refreshGroups",
+ "refreshSessions",
"coins",
"coinAvailability",
],
@@ -609,6 +610,7 @@ export async function confirmPeerPullDebit(
"coins",
"denominations",
"refreshGroups",
+ "refreshSessions",
"peerPullDebit",
"coinAvailability",
],
diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/pay-peer-push-debit.ts
index ab80888eb..20001e040 100644
--- a/packages/taler-wallet-core/src/pay-peer-push-debit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-push-debit.ts
@@ -588,6 +588,7 @@ async function processPeerPushDebitAbortingDeletePurse(
[
"peerPushDebit",
"refreshGroups",
+ "refreshSessions",
"denominations",
"coinAvailability",
"coins",
@@ -821,6 +822,7 @@ async function processPeerPushDebitReady(
[
"peerPushDebit",
"refreshGroups",
+ "refreshSessions",
"denominations",
"coinAvailability",
"coins",
@@ -971,6 +973,7 @@ export async function initiatePeerPushDebit(
"coinAvailability",
"denominations",
"refreshGroups",
+ "refreshSessions",
"peerPushDebit",
],
async (tx) => {
diff --git a/packages/taler-wallet-core/src/recoup.ts b/packages/taler-wallet-core/src/recoup.ts
index b8b2cf808..758ba106d 100644
--- a/packages/taler-wallet-core/src/recoup.ts
+++ b/packages/taler-wallet-core/src/recoup.ts
@@ -390,6 +390,7 @@ export async function processRecoupGroup(
"coinAvailability",
"denominations",
"refreshGroups",
+ "refreshSessions",
"coins",
],
async (tx) => {
diff --git a/packages/taler-wallet-core/src/refresh.ts b/packages/taler-wallet-core/src/refresh.ts
index 516d5e3da..490b1b5f5 100644
--- a/packages/taler-wallet-core/src/refresh.ts
+++ b/packages/taler-wallet-core/src/refresh.ts
@@ -66,7 +66,6 @@ import {
} from "@gnu-taler/taler-util/http";
import {
constructTaskIdentifier,
- makeCoinAvailable,
makeCoinsVisible,
PendingTaskType,
TaskIdStr,
@@ -82,6 +81,7 @@ import {
} from "./crypto/cryptoTypes.js";
import { CryptoApiStoppedError } from "./crypto/workers/crypto-dispatcher.js";
import {
+ CoinAvailabilityRecord,
CoinRecord,
CoinSourceType,
DenominationRecord,
@@ -305,192 +305,130 @@ export function getTotalRefreshCost(
return totalCost;
}
-function updateGroupStatus(rg: RefreshGroupRecord): { final: boolean } {
- const allFinal = fnutil.all(
- rg.statusPerCoin,
- (x) => x === RefreshCoinStatus.Finished || x === RefreshCoinStatus.Failed,
- );
- const anyFailed = fnutil.any(
- rg.statusPerCoin,
- (x) => x === RefreshCoinStatus.Failed,
- );
- if (allFinal) {
- if (anyFailed) {
- rg.timestampFinished = timestampPreciseToDb(TalerPreciseTimestamp.now());
- rg.operationStatus = RefreshOperationStatus.Failed;
- } else {
- rg.timestampFinished = timestampPreciseToDb(TalerPreciseTimestamp.now());
- rg.operationStatus = RefreshOperationStatus.Finished;
- }
- return { final: true };
+export async function getCoinAvailabilityForDenom(
+ wex: WalletExecutionContext,
+ tx: WalletDbReadWriteTransaction<
+ ["coins", "coinAvailability", "denominations"]
+ >,
+ denom: DenominationInfo,
+ ageRestriction: number,
+): Promise<CoinAvailabilityRecord> {
+ checkDbInvariant(!!denom);
+ let car = await tx.coinAvailability.get([
+ denom.exchangeBaseUrl,
+ denom.denomPubHash,
+ ageRestriction,
+ ]);
+ if (!car) {
+ car = {
+ maxAge: ageRestriction,
+ value: denom.value,
+ currency: Amounts.currencyOf(denom.value),
+ denomPubHash: denom.denomPubHash,
+ exchangeBaseUrl: denom.exchangeBaseUrl,
+ freshCoinCount: 0,
+ visibleCoinCount: 0,
+ };
}
- return { final: false };
+ return car;
}
/**
* Create a refresh session for one particular coin inside a refresh group.
- *
- * If the session already exists, return the existing one.
- *
- * If the session doesn't need to be created (refresh group gone or session already
- * finished), return undefined.
*/
-async function provideRefreshSession(
+async function initRefreshSession(
wex: WalletExecutionContext,
- refreshGroupId: string,
+ tx: WalletDbReadWriteTransaction<
+ ["refreshSessions", "coinAvailability", "coins", "denominations"]
+ >,
+ refreshGroup: RefreshGroupRecord,
coinIndex: number,
-): Promise<RefreshSessionRecord | undefined> {
+): Promise<void> {
+ const refreshGroupId = refreshGroup.refreshGroupId;
logger.trace(
`creating refresh session for coin ${coinIndex} in refresh group ${refreshGroupId}`,
);
-
- const d = await wex.db.runReadWriteTx(
- ["coins", "refreshGroups", "refreshSessions"],
- async (tx) => {
- const refreshGroup = await tx.refreshGroups.get(refreshGroupId);
- if (!refreshGroup) {
- return;
- }
- if (
- refreshGroup.statusPerCoin[coinIndex] === RefreshCoinStatus.Finished
- ) {
- return;
- }
- const existingRefreshSession = await tx.refreshSessions.get([
- refreshGroupId,
- coinIndex,
- ]);
- const oldCoinPub = refreshGroup.oldCoinPubs[coinIndex];
- const coin = await tx.coins.get(oldCoinPub);
- if (!coin) {
- throw Error("Can't refresh, coin not found");
- }
- return { refreshGroup, coin, existingRefreshSession };
- },
- );
-
- if (!d) {
- return undefined;
- }
-
- if (d.existingRefreshSession) {
- return d.existingRefreshSession;
+ const oldCoinPub = refreshGroup.oldCoinPubs[coinIndex];
+ const oldCoin = await tx.coins.get(oldCoinPub);
+ if (!oldCoin) {
+ throw Error("Can't refresh, coin not found");
}
- const { refreshGroup, coin } = d;
-
- const exch = await fetchFreshExchange(wex, coin.exchangeBaseUrl);
+ const exchangeBaseUrl = oldCoin.exchangeBaseUrl;
- // FIXME: use helper functions from withdraw.ts
- // to update and filter withdrawable denoms.
+ const sessionSecretSeed = encodeCrock(getRandomBytes(64));
- const { availableAmount, availableDenoms } = await wex.db.runReadOnlyTx(
- ["denominations"],
- async (tx) => {
- const oldDenom = await getDenomInfo(
- wex,
- tx,
- exch.exchangeBaseUrl,
- coin.denomPubHash,
- );
+ const oldDenom = await getDenomInfo(
+ wex,
+ tx,
+ exchangeBaseUrl,
+ oldCoin.denomPubHash,
+ );
- if (!oldDenom) {
- throw Error("db inconsistent: denomination for coin not found");
- }
+ if (!oldDenom) {
+ throw Error("db inconsistent: denomination for coin not found");
+ }
- // FIXME: Use denom groups instead of querying all denominations!
- const availableDenoms: DenominationRecord[] =
- await tx.denominations.indexes.byExchangeBaseUrl.getAll(
- exch.exchangeBaseUrl,
- );
+ const currency = refreshGroup.currency;
- const availableAmount = Amounts.sub(
- refreshGroup.inputPerCoin[coinIndex],
- oldDenom.feeRefresh,
- ).amount;
- return { availableAmount, availableDenoms };
- },
+ const availableDenoms = await getCandidateWithdrawalDenomsTx(
+ wex,
+ tx,
+ exchangeBaseUrl,
+ currency,
);
+ const availableAmount = Amounts.sub(
+ refreshGroup.inputPerCoin[coinIndex],
+ oldDenom.feeRefresh,
+ ).amount;
+
const newCoinDenoms = selectWithdrawalDenominations(
availableAmount,
availableDenoms,
wex.ws.config.testing.denomselAllowLate,
);
- const transactionId = constructTransactionIdentifier({
- tag: TransactionType.Refresh,
- refreshGroupId,
- });
-
if (newCoinDenoms.selectedDenoms.length === 0) {
logger.trace(
`not refreshing, available amount ${amountToPretty(
availableAmount,
)} too small`,
);
- const transitionInfo = await wex.db.runReadWriteTx(
- ["refreshGroups", "coins", "coinAvailability"],
- async (tx) => {
- const rg = await tx.refreshGroups.get(refreshGroupId);
- if (!rg) {
- return;
- }
- const oldTxState = computeRefreshTransactionState(rg);
- rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished;
- const updateRes = updateGroupStatus(rg);
- if (updateRes.final) {
- await makeCoinsVisible(wex, tx, transactionId);
- }
- await tx.refreshGroups.put(rg);
- const newTxState = computeRefreshTransactionState(rg);
- return { oldTxState, newTxState };
- },
- );
- wex.ws.notify({
- type: NotificationType.BalanceChange,
- hintTransactionId: transactionId,
- });
- notifyTransition(wex, transactionId, transitionInfo);
- return;
+ refreshGroup.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished;
}
- const sessionSecretSeed = encodeCrock(getRandomBytes(64));
+ for (let i = 0; i < newCoinDenoms.selectedDenoms.length; i++) {
+ const dph = newCoinDenoms.selectedDenoms[i].denomPubHash;
+ const denom = await getDenomInfo(wex, tx, oldDenom.exchangeBaseUrl, dph);
+ if (!denom) {
+ logger.error(`denom ${dph} not in DB`);
+ continue;
+ }
+ const car = await getCoinAvailabilityForDenom(
+ wex,
+ tx,
+ denom,
+ oldCoin.maxAge,
+ );
+ car.pendingRefreshOutputCount =
+ (car.pendingRefreshOutputCount ?? 0) +
+ newCoinDenoms.selectedDenoms[i].count;
+ await tx.coinAvailability.put(car);
+ }
- // Store refresh session for this coin in the database.
- const mySession = await wex.db.runReadWriteTx(
- ["refreshGroups", "refreshSessions"],
- async (tx) => {
- const rg = await tx.refreshGroups.get(refreshGroupId);
- if (!rg) {
- return;
- }
- const existingSession = await tx.refreshSessions.get([
- refreshGroupId,
- coinIndex,
- ]);
- if (existingSession) {
- return existingSession;
- }
- const newSession: RefreshSessionRecord = {
- coinIndex,
- refreshGroupId,
- norevealIndex: undefined,
- sessionSecretSeed: sessionSecretSeed,
- newDenoms: newCoinDenoms.selectedDenoms.map((x) => ({
- count: x.count,
- denomPubHash: x.denomPubHash,
- })),
- amountRefreshOutput: Amounts.stringify(newCoinDenoms.totalCoinValue),
- };
- await tx.refreshSessions.put(newSession);
- return newSession;
- },
- );
- logger.trace(
- `found/created refresh session for coin #${coinIndex} in ${refreshGroupId}`,
- );
- return mySession;
+ const newSession: RefreshSessionRecord = {
+ coinIndex,
+ refreshGroupId,
+ norevealIndex: undefined,
+ sessionSecretSeed: sessionSecretSeed,
+ newDenoms: newCoinDenoms.selectedDenoms.map((x) => ({
+ count: x.count,
+ denomPubHash: x.denomPubHash,
+ })),
+ amountRefreshOutput: Amounts.stringify(newCoinDenoms.totalCoinValue),
+ };
+ await tx.refreshSessions.put(newSession);
}
function getRefreshRequestTimeout(rg: RefreshGroupRecord): Duration {
@@ -499,6 +437,14 @@ function getRefreshRequestTimeout(rg: RefreshGroupRecord): Duration {
});
}
+/**
+ * Run the melt step of a refresh session.
+ *
+ * If the melt step succeeds or fails permanently,
+ * the status in the refresh group is updated.
+ *
+ * When a transient error occurs, an exception is thrown.
+ */
async function refreshMelt(
wex: WalletExecutionContext,
refreshGroupId: string,
@@ -627,7 +573,7 @@ async function refreshMelt(
if (resp.status === HttpStatusCode.NotFound) {
const errDetails = await readUnexpectedResponseDetails(resp);
- const transitionInfo = await wex.db.runReadWriteTx(
+ await wex.db.runReadWriteTx(
["refreshGroups", "refreshSessions", "coins", "coinAvailability"],
async (tx) => {
const rg = await tx.refreshGroups.get(refreshGroupId);
@@ -640,7 +586,6 @@ async function refreshMelt(
if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) {
return;
}
- const oldTxState = computeRefreshTransactionState(rg);
rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Failed;
const refreshSession = await tx.refreshSessions.get([
refreshGroupId,
@@ -652,24 +597,10 @@ async function refreshMelt(
);
}
refreshSession.lastError = errDetails;
- const updateRes = updateGroupStatus(rg);
- if (updateRes.final) {
- await makeCoinsVisible(wex, tx, transactionId);
- }
await tx.refreshGroups.put(rg);
await tx.refreshSessions.put(refreshSession);
- const newTxState = computeRefreshTransactionState(rg);
- return {
- oldTxState,
- newTxState,
- };
},
);
- wex.ws.notify({
- type: NotificationType.BalanceChange,
- hintTransactionId: transactionId,
- });
- notifyTransition(wex, transactionId, transitionInfo);
return;
}
@@ -718,6 +649,7 @@ async function refreshMelt(
const input = Amounts.parseOrThrow(rg.inputPerCoin[rs.coinIndex]);
const newSel = selectWithdrawalDenominations(input, candidates);
rs.amountRefreshOutput = newSel.totalCoinValue;
+ // FIXME: This is wrong! When denoms are re-selected, the melt commitment breaks.
rs.newDenoms = newSel.selectedDenoms.map((x) => ({
count: x.count,
denomPubHash: x.denomPubHash,
@@ -1043,7 +975,7 @@ async function refreshReveal(
}
}
- const transitionInfo = await wex.db.runReadWriteTx(
+ await wex.db.runReadWriteTx(
[
"coins",
"denominations",
@@ -1061,19 +993,32 @@ async function refreshReveal(
if (!rs) {
return;
}
- const oldTxState = computeRefreshTransactionState(rg);
rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished;
- updateGroupStatus(rg);
for (const coin of coins) {
- await makeCoinAvailable(wex, tx, coin);
+ const denomInfo = await getDenomInfo(
+ wex,
+ tx,
+ coin.exchangeBaseUrl,
+ coin.denomPubHash,
+ );
+ checkDbInvariant(!!denomInfo);
+ const car = await getCoinAvailabilityForDenom(
+ wex,
+ tx,
+ denomInfo,
+ coin.maxAge,
+ );
+ checkDbInvariant(
+ car.pendingRefreshOutputCount != null &&
+ car.pendingRefreshOutputCount > 0,
+ );
+ car.pendingRefreshOutputCount--;
+ car.freshCoinCount++;
+ await tx.coinAvailability.put(car);
}
- await makeCoinsVisible(wex, tx, transactionId);
await tx.refreshGroups.put(rg);
- const newTxState = computeRefreshTransactionState(rg);
- return { oldTxState, newTxState };
},
);
- notifyTransition(wex, transactionId, transitionInfo);
logger.trace("refresh finished (end of reveal)");
}
@@ -1121,17 +1066,71 @@ export async function processRefreshGroup(
errors.push(getErrorDetailFromException(x));
}),
);
- try {
- logger.info("waiting for refreshes");
- await Promise.all(ps);
- logger.info("refresh group finished");
- } catch (e) {
- logger.warn("process refresh sessions got exception");
- logger.warn(`exception: ${e}`);
- }
+ await Promise.all(ps);
if (inShutdown) {
- return TaskRunResult.backoff();
+ return TaskRunResult.finished();
}
+
+ const ctx = new RefreshTransactionContext(wex, refreshGroupId);
+
+ // We've processed all refresh session and can now update the
+ // status of the whole refresh group.
+
+ const transitionInfo = await wex.db.runReadWriteTx(
+ ["coins", "coinAvailability", "refreshGroups"],
+ async (tx) => {
+ const rg = await tx.refreshGroups.get(refreshGroupId);
+ if (!rg) {
+ return;
+ }
+ switch (rg.operationStatus) {
+ case RefreshOperationStatus.Pending:
+ break;
+ default:
+ return undefined;
+ }
+ const oldTxState = computeRefreshTransactionState(rg);
+ const allFinal = fnutil.all(
+ rg.statusPerCoin,
+ (x) =>
+ x === RefreshCoinStatus.Finished || x === RefreshCoinStatus.Failed,
+ );
+ const anyFailed = fnutil.any(
+ rg.statusPerCoin,
+ (x) => x === RefreshCoinStatus.Failed,
+ );
+ if (allFinal) {
+ if (anyFailed) {
+ rg.timestampFinished = timestampPreciseToDb(
+ TalerPreciseTimestamp.now(),
+ );
+ rg.operationStatus = RefreshOperationStatus.Failed;
+ } else {
+ rg.timestampFinished = timestampPreciseToDb(
+ TalerPreciseTimestamp.now(),
+ );
+ rg.operationStatus = RefreshOperationStatus.Finished;
+ }
+ }
+ if (allFinal) {
+ await makeCoinsVisible(wex, tx, ctx.transactionId);
+ await tx.refreshGroups.put(rg);
+ const newTxState = computeRefreshTransactionState(rg);
+ return {
+ oldTxState,
+ newTxState,
+ };
+ } else {
+ return undefined;
+ }
+ },
+ );
+
+ if (transitionInfo) {
+ notifyTransition(wex, ctx.transactionId, transitionInfo);
+ return TaskRunResult.progress();
+ }
+
if (errors.length > 0) {
return {
type: TaskRunResultType.Error,
@@ -1174,16 +1173,7 @@ async function processRefreshSession(
return;
}
if (!refreshSession) {
- refreshSession = await provideRefreshSession(
- wex,
- refreshGroupId,
- coinIndex,
- );
- }
- if (!refreshSession) {
- // We tried to create the refresh session, but didn't get a result back.
- // This means that either the session is finished, or that creating
- // one isn't necessary.
+ // No refresh session for that coin.
return;
}
if (refreshSession.norevealIndex === undefined) {
@@ -1268,7 +1258,7 @@ export async function calculateRefreshOutput(
};
}
-async function applyRefresh(
+async function applyRefreshToOldCoins(
wex: WalletExecutionContext,
tx: WalletDbReadWriteTransaction<
["denominations", "coins", "refreshGroups", "coinAvailability"]
@@ -1347,20 +1337,29 @@ export interface CreateRefreshGroupResult {
export async function createRefreshGroup(
wex: WalletExecutionContext,
tx: WalletDbReadWriteTransaction<
- ["denominations", "coins", "refreshGroups", "coinAvailability"]
+ [
+ "denominations",
+ "coins",
+ "refreshGroups",
+ "refreshSessions",
+ "coinAvailability",
+ ]
>,
currency: string,
oldCoinPubs: CoinRefreshRequest[],
refreshReason: RefreshReason,
originatingTransactionId: string | undefined,
): Promise<CreateRefreshGroupResult> {
+ // FIXME: Check that involved exchanges are reasonably up-to-date.
+ // Otherwise, error out.
+
const refreshGroupId = encodeCrock(getRandomBytes(32));
const outInfo = await calculateRefreshOutput(wex, tx, currency, oldCoinPubs);
const estimatedOutputPerCoin = outInfo.outputPerCoin;
- await applyRefresh(wex, tx, oldCoinPubs, refreshGroupId);
+ await applyRefreshToOldCoins(wex, tx, oldCoinPubs, refreshGroupId);
const refreshGroup: RefreshGroupRecord = {
operationStatus: RefreshOperationStatus.Pending,
@@ -1387,6 +1386,10 @@ export async function createRefreshGroup(
refreshGroup.operationStatus = RefreshOperationStatus.Finished;
}
+ for (let i = 0; i < oldCoinPubs.length; i++) {
+ await initRefreshSession(wex, tx, refreshGroup, i);
+ }
+
await tx.refreshGroups.put(refreshGroup);
const newTxState = computeRefreshTransactionState(refreshGroup);
@@ -1487,7 +1490,13 @@ export async function forceRefresh(
throw Error("refusing to create empty refresh group");
}
const res = await wex.db.runReadWriteTx(
- ["refreshGroups", "coinAvailability", "denominations", "coins"],
+ [
+ "refreshGroups",
+ "coinAvailability",
+ "refreshSessions",
+ "denominations",
+ "coins",
+ ],
async (tx) => {
let coinPubs: CoinRefreshRequest[] = [];
for (const c of req.coinPubList) {