summaryrefslogtreecommitdiff
path: root/src/operations/refresh.ts
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2019-12-15 16:59:00 +0100
committerFlorian Dold <florian.dold@gmail.com>2019-12-15 16:59:00 +0100
commit4966376839365536923cd6cfbb86d15071432e1a (patch)
tree6658b4a84df5ba6a0189e6d79e37fb0cc7fb597a /src/operations/refresh.ts
parentf4043a0f8114b1b8612e01a5cdf65b8d6ffc6f00 (diff)
downloadwallet-core-4966376839365536923cd6cfbb86d15071432e1a.tar.gz
wallet-core-4966376839365536923cd6cfbb86d15071432e1a.tar.bz2
wallet-core-4966376839365536923cd6cfbb86d15071432e1a.zip
group refresh sessions into groups for nicer history
Diffstat (limited to 'src/operations/refresh.ts')
-rw-r--r--src/operations/refresh.ts364
1 files changed, 239 insertions, 125 deletions
diff --git a/src/operations/refresh.ts b/src/operations/refresh.ts
index 4ffc3ea60..be23a5bb0 100644
--- a/src/operations/refresh.ts
+++ b/src/operations/refresh.ts
@@ -25,16 +25,24 @@ import {
RefreshSessionRecord,
initRetryInfo,
updateRetryInfoTimeout,
+ RefreshGroupRecord,
} from "../types/dbTypes";
import { amountToPretty } from "../util/helpers";
-import { Database } from "../util/query";
+import { Database, TransactionHandle } from "../util/query";
import { InternalWalletState } from "./state";
import { Logger } from "../util/logging";
import { getWithdrawDenomList } from "./withdraw";
import { updateExchangeFromUrl } from "./exchanges";
-import { getTimestampNow, OperationError } from "../types/walletTypes";
+import {
+ getTimestampNow,
+ OperationError,
+ CoinPublicKey,
+ RefreshReason,
+ RefreshGroupId,
+} from "../types/walletTypes";
import { guardOperationException } from "./errors";
import { NotificationType } from "../types/notifications";
+import { getRandomBytes, encodeCrock } from "../crypto/talerCrypto";
const logger = new Logger("refresh.ts");
@@ -71,11 +79,130 @@ export function getTotalRefreshCost(
return totalCost;
}
+/**
+ * Create a refresh session inside a refresh group.
+ */
+async function refreshCreateSession(
+ ws: InternalWalletState,
+ refreshGroupId: string,
+ coinIndex: number,
+): Promise<void> {
+ logger.trace(
+ `creating refresh session for coin ${coinIndex} in refresh group ${refreshGroupId}`,
+ );
+ const refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId);
+ if (!refreshGroup) {
+ return;
+ }
+ if (refreshGroup.finishedPerCoin[coinIndex]) {
+ return;
+ }
+ const existingRefreshSession = refreshGroup.refreshSessionPerCoin[coinIndex];
+ if (existingRefreshSession) {
+ return;
+ }
+ const oldCoinPub = refreshGroup.oldCoinPubs[coinIndex];
+ const coin = await ws.db.get(Stores.coins, oldCoinPub);
+ if (!coin) {
+ throw Error("Can't refresh, coin not found");
+ }
+
+ const exchange = await updateExchangeFromUrl(ws, coin.exchangeBaseUrl);
+ if (!exchange) {
+ throw Error("db inconsistent: exchange of coin not found");
+ }
+
+ const oldDenom = await ws.db.get(Stores.denominations, [
+ exchange.baseUrl,
+ coin.denomPub,
+ ]);
+
+ if (!oldDenom) {
+ throw Error("db inconsistent: denomination for coin not found");
+ }
+
+ const availableDenoms: DenominationRecord[] = await ws.db
+ .iterIndex(Stores.denominations.exchangeBaseUrlIndex, exchange.baseUrl)
+ .toArray();
+
+ const availableAmount = Amounts.sub(coin.currentAmount, oldDenom.feeRefresh)
+ .amount;
+
+ const newCoinDenoms = getWithdrawDenomList(availableAmount, availableDenoms);
+
+ if (newCoinDenoms.length === 0) {
+ logger.trace(
+ `not refreshing, available amount ${amountToPretty(
+ availableAmount,
+ )} too small`,
+ );
+ await ws.db.runWithWriteTransaction(
+ [Stores.coins, Stores.refreshGroups],
+ async tx => {
+ const rg = await tx.get(Stores.refreshGroups, refreshGroupId);
+ if (!rg) {
+ return;
+ }
+ rg.finishedPerCoin[coinIndex] = true;
+ await tx.put(Stores.refreshGroups, rg);
+ },
+ );
+ ws.notify({ type: NotificationType.RefreshRefused });
+ return;
+ }
+
+ const refreshSession: RefreshSessionRecord = await ws.cryptoApi.createRefreshSession(
+ exchange.baseUrl,
+ 3,
+ coin,
+ newCoinDenoms,
+ oldDenom.feeRefresh,
+ );
+
+ // Store refresh session and subtract refreshed amount from
+ // coin in the same transaction.
+ await ws.db.runWithWriteTransaction(
+ [Stores.refreshGroups, Stores.coins],
+ async tx => {
+ const c = await tx.get(Stores.coins, coin.coinPub);
+ if (!c) {
+ throw Error("coin not found, but marked for refresh");
+ }
+ const r = Amounts.sub(c.currentAmount, refreshSession.valueWithFee);
+ if (r.saturated) {
+ console.log("can't refresh coin, no amount left");
+ return;
+ }
+ c.currentAmount = r.amount;
+ c.status = CoinStatus.Dormant;
+ const rg = await tx.get(Stores.refreshGroups, refreshGroupId);
+ if (!rg) {
+ return;
+ }
+ if (rg.refreshSessionPerCoin[coinIndex]) {
+ return;
+ }
+ rg.refreshSessionPerCoin[coinIndex] = refreshSession;
+ await tx.put(Stores.refreshGroups, rg);
+ await tx.put(Stores.coins, c);
+ },
+ );
+ logger.info(
+ `created refresh session for coin #${coinIndex} in ${refreshGroupId}`,
+ );
+ ws.notify({ type: NotificationType.RefreshStarted });
+}
+
async function refreshMelt(
ws: InternalWalletState,
- refreshSessionId: string,
+ refreshGroupId: string,
+ coinIndex: number,
): Promise<void> {
- const refreshSession = await ws.db.get(Stores.refresh, refreshSessionId);
+ const refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId);
+ if (!refreshGroup) {
+ return;
+ }
+ const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex];
if (!refreshSession) {
return;
}
@@ -122,7 +249,11 @@ async function refreshMelt(
refreshSession.norevealIndex = norevealIndex;
- await ws.db.mutate(Stores.refresh, refreshSessionId, rs => {
+ await ws.db.mutate(Stores.refreshGroups, refreshGroupId, rg => {
+ const rs = rg.refreshSessionPerCoin[coinIndex];
+ if (!rs) {
+ return;
+ }
if (rs.norevealIndex !== undefined) {
return;
}
@@ -130,7 +261,7 @@ async function refreshMelt(
return;
}
rs.norevealIndex = norevealIndex;
- return rs;
+ return rg;
});
ws.notify({
@@ -140,9 +271,14 @@ async function refreshMelt(
async function refreshReveal(
ws: InternalWalletState,
- refreshSessionId: string,
+ refreshGroupId: string,
+ coinIndex: number,
): Promise<void> {
- const refreshSession = await ws.db.get(Stores.refresh, refreshSessionId);
+ const refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId);
+ if (!refreshGroup) {
+ return;
+ }
+ const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex];
if (!refreshSession) {
return;
}
@@ -253,23 +389,38 @@ async function refreshReveal(
}
await ws.db.runWithWriteTransaction(
- [Stores.coins, Stores.refresh],
+ [Stores.coins, Stores.refreshGroups],
async tx => {
- const rs = await tx.get(Stores.refresh, refreshSessionId);
- if (!rs) {
+ const rg = await tx.get(Stores.refreshGroups, refreshGroupId);
+ if (!rg) {
console.log("no refresh session found");
return;
}
+ const rs = rg.refreshSessionPerCoin[coinIndex];
+ if (!rs) {
+ return;
+ }
if (rs.finishedTimestamp) {
console.log("refresh session already finished");
return;
}
rs.finishedTimestamp = getTimestampNow();
- rs.retryInfo = initRetryInfo(false);
+ rg.finishedPerCoin[coinIndex] = true;
+ let allDone = true;
+ for (const f of rg.finishedPerCoin) {
+ if (!f) {
+ allDone = false;
+ break;
+ }
+ }
+ if (allDone) {
+ rg.finishedTimestamp = getTimestampNow();
+ rg.retryInfo = initRetryInfo(false);
+ }
for (let coin of coins) {
await tx.put(Stores.coins, coin);
}
- await tx.put(Stores.refresh, rs);
+ await tx.put(Stores.refreshGroups, rg);
},
);
console.log("refresh finished (end of reveal)");
@@ -280,11 +431,11 @@ async function refreshReveal(
async function incrementRefreshRetry(
ws: InternalWalletState,
- refreshSessionId: string,
+ refreshGroupId: string,
err: OperationError | undefined,
): Promise<void> {
- await ws.db.runWithWriteTransaction([Stores.refresh], async tx => {
- const r = await tx.get(Stores.refresh, refreshSessionId);
+ await ws.db.runWithWriteTransaction([Stores.refreshGroups], async tx => {
+ const r = await tx.get(Stores.refreshGroups, refreshGroupId);
if (!r) {
return;
}
@@ -294,31 +445,31 @@ async function incrementRefreshRetry(
r.retryInfo.retryCounter++;
updateRetryInfoTimeout(r.retryInfo);
r.lastError = err;
- await tx.put(Stores.refresh, r);
+ await tx.put(Stores.refreshGroups, r);
});
ws.notify({ type: NotificationType.RefreshOperationError });
}
-export async function processRefreshSession(
+export async function processRefreshGroup(
ws: InternalWalletState,
- refreshSessionId: string,
+ refreshGroupId: string,
forceNow: boolean = false,
-) {
- return ws.memoProcessRefresh.memo(refreshSessionId, async () => {
+): Promise<void> {
+ await ws.memoProcessRefresh.memo(refreshGroupId, async () => {
const onOpErr = (e: OperationError) =>
- incrementRefreshRetry(ws, refreshSessionId, e);
- return guardOperationException(
- () => processRefreshSessionImpl(ws, refreshSessionId, forceNow),
+ incrementRefreshRetry(ws, refreshGroupId, e);
+ return await guardOperationException(
+ async () => await processRefreshGroupImpl(ws, refreshGroupId, forceNow),
onOpErr,
);
});
}
-async function resetRefreshSessionRetry(
+async function resetRefreshGroupRetry(
ws: InternalWalletState,
refreshSessionId: string,
) {
- await ws.db.mutate(Stores.refresh, refreshSessionId, x => {
+ await ws.db.mutate(Stores.refreshGroups, refreshSessionId, x => {
if (x.retryInfo.active) {
x.retryInfo = initRetryInfo();
}
@@ -326,124 +477,87 @@ async function resetRefreshSessionRetry(
});
}
-async function processRefreshSessionImpl(
+async function processRefreshGroupImpl(
ws: InternalWalletState,
- refreshSessionId: string,
+ refreshGroupId: string,
forceNow: boolean,
) {
if (forceNow) {
- await resetRefreshSessionRetry(ws, refreshSessionId);
+ await resetRefreshGroupRetry(ws, refreshGroupId);
}
- const refreshSession = await ws.db.get(Stores.refresh, refreshSessionId);
- if (!refreshSession) {
+ const refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId);
+ if (!refreshGroup) {
return;
}
- if (refreshSession.finishedTimestamp) {
+ if (refreshGroup.finishedTimestamp) {
return;
}
- if (typeof refreshSession.norevealIndex !== "number") {
- await refreshMelt(ws, refreshSession.refreshSessionId);
- }
- await refreshReveal(ws, refreshSession.refreshSessionId);
+ const ps = refreshGroup.oldCoinPubs.map((x, i) =>
+ processRefreshSession(ws, refreshGroupId, i),
+ );
+ await Promise.all(ps);
logger.trace("refresh finished");
}
-export async function refresh(
+async function processRefreshSession(
ws: InternalWalletState,
- oldCoinPub: string,
- force: boolean = false,
-): Promise<void> {
- const coin = await ws.db.get(Stores.coins, oldCoinPub);
- if (!coin) {
- console.warn("can't refresh, coin not in database");
+ refreshGroupId: string,
+ coinIndex: number,
+) {
+ logger.trace(`processing refresh session for coin ${coinIndex} of group ${refreshGroupId}`);
+ let refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId);
+ if (!refreshGroup) {
return;
}
- switch (coin.status) {
- case CoinStatus.Dirty:
- break;
- case CoinStatus.Dormant:
- return;
- case CoinStatus.Fresh:
- if (!force) {
- return;
- }
- break;
- }
-
- const exchange = await updateExchangeFromUrl(ws, coin.exchangeBaseUrl);
- if (!exchange) {
- throw Error("db inconsistent: exchange of coin not found");
+ if (refreshGroup.finishedPerCoin[coinIndex]) {
+ return;
}
-
- const oldDenom = await ws.db.get(Stores.denominations, [
- exchange.baseUrl,
- coin.denomPub,
- ]);
-
- if (!oldDenom) {
- throw Error("db inconsistent: denomination for coin not found");
+ if (!refreshGroup.refreshSessionPerCoin[coinIndex]) {
+ await refreshCreateSession(ws, refreshGroupId, coinIndex);
+ refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId);
+ if (!refreshGroup) {
+ return;
+ }
}
-
- const availableDenoms: DenominationRecord[] = await ws.db
- .iterIndex(Stores.denominations.exchangeBaseUrlIndex, exchange.baseUrl)
- .toArray();
-
- const availableAmount = Amounts.sub(coin.currentAmount, oldDenom.feeRefresh)
- .amount;
-
- const newCoinDenoms = getWithdrawDenomList(availableAmount, availableDenoms);
-
- if (newCoinDenoms.length === 0) {
- logger.trace(
- `not refreshing, available amount ${amountToPretty(
- availableAmount,
- )} too small`,
- );
- await ws.db.mutate(Stores.coins, oldCoinPub, x => {
- if (x.status != coin.status) {
- // Concurrent modification?
- return;
- }
- x.status = CoinStatus.Dormant;
- return x;
- });
- ws.notify({ type: NotificationType.RefreshRefused });
+ const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex];
+ if (!refreshSession) {
+ if (!refreshGroup.finishedPerCoin[coinIndex]) {
+ throw Error(
+ "BUG: refresh session was not created and coin not marked as finished",
+ );
+ }
return;
}
+ if (refreshSession.norevealIndex === undefined) {
+ await refreshMelt(ws, refreshGroupId, coinIndex);
+ }
+ await refreshReveal(ws, refreshGroupId, coinIndex);
+}
- const refreshSession: RefreshSessionRecord = await ws.cryptoApi.createRefreshSession(
- exchange.baseUrl,
- 3,
- coin,
- newCoinDenoms,
- oldDenom.feeRefresh,
- );
-
- // Store refresh session and subtract refreshed amount from
- // coin in the same transaction.
- await ws.db.runWithWriteTransaction(
- [Stores.refresh, Stores.coins],
- async tx => {
- const c = await tx.get(Stores.coins, coin.coinPub);
- if (!c) {
- return;
- }
- if (c.status !== CoinStatus.Dirty) {
- return;
- }
- const r = Amounts.sub(c.currentAmount, refreshSession.valueWithFee);
- if (r.saturated) {
- console.log("can't refresh coin, no amount left");
- return;
- }
- c.currentAmount = r.amount;
- c.status = CoinStatus.Dormant;
- await tx.put(Stores.refresh, refreshSession);
- await tx.put(Stores.coins, c);
- },
- );
- logger.info(`created refresh session ${refreshSession.refreshSessionId}`);
- ws.notify({ type: NotificationType.RefreshStarted });
+/**
+ * Create a refresh group for a list of coins.
+ */
+export async function createRefreshGroup(
+ tx: TransactionHandle,
+ oldCoinPubs: CoinPublicKey[],
+ reason: RefreshReason,
+): Promise<RefreshGroupId> {
+ const refreshGroupId = encodeCrock(getRandomBytes(32));
+
+ const refreshGroup: RefreshGroupRecord = {
+ finishedTimestamp: undefined,
+ finishedPerCoin: oldCoinPubs.map(x => false),
+ lastError: undefined,
+ lastErrorPerCoin: oldCoinPubs.map(x => undefined),
+ oldCoinPubs: oldCoinPubs.map(x => x.coinPub),
+ reason,
+ refreshGroupId,
+ refreshSessionPerCoin: oldCoinPubs.map(x => undefined),
+ retryInfo: initRetryInfo(),
+ };
- await processRefreshSession(ws, refreshSession.refreshSessionId);
+ await tx.put(Stores.refreshGroups, refreshGroup);
+ return {
+ refreshGroupId,
+ };
}