summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/refresh.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/refresh.ts')
-rw-r--r--packages/taler-wallet-core/src/refresh.ts222
1 files changed, 113 insertions, 109 deletions
diff --git a/packages/taler-wallet-core/src/refresh.ts b/packages/taler-wallet-core/src/refresh.ts
index cc5eff12c..b467a1c47 100644
--- a/packages/taler-wallet-core/src/refresh.ts
+++ b/packages/taler-wallet-core/src/refresh.ts
@@ -21,7 +21,6 @@ import {
Amounts,
amountToPretty,
assertUnreachable,
- CancellationToken,
checkDbInvariant,
codecForExchangeMeltResponse,
codecForExchangeRevealResponse,
@@ -103,6 +102,7 @@ import {
EXCHANGE_COINS_LOCK,
getDenomInfo,
InternalWalletState,
+ WalletExecutionContext,
} from "./wallet.js";
import { getCandidateWithdrawalDenomsTx } from "./withdraw.js";
@@ -113,7 +113,7 @@ export class RefreshTransactionContext implements TransactionContext {
readonly taskId: TaskIdStr;
constructor(
- public ws: InternalWalletState,
+ public wex: WalletExecutionContext,
public refreshGroupId: string,
) {
this.transactionId = constructTransactionIdentifier({
@@ -128,7 +128,7 @@ export class RefreshTransactionContext implements TransactionContext {
async deleteTransaction(): Promise<void> {
const refreshGroupId = this.refreshGroupId;
- const ws = this.ws;
+ const ws = this.wex;
await ws.db.runReadWriteTx(["refreshGroups", "tombstones"], async (tx) => {
const rg = await tx.refreshGroups.get(refreshGroupId);
if (rg) {
@@ -141,8 +141,8 @@ export class RefreshTransactionContext implements TransactionContext {
}
async suspendTransaction(): Promise<void> {
- const { ws, refreshGroupId, transactionId } = this;
- let res = await ws.db.runReadWriteTx(["refreshGroups"], async (tx) => {
+ const { wex, refreshGroupId, transactionId } = this;
+ let res = await wex.db.runReadWriteTx(["refreshGroups"], async (tx) => {
const dg = await tx.refreshGroups.get(refreshGroupId);
if (!dg) {
logger.warn(
@@ -168,7 +168,7 @@ export class RefreshTransactionContext implements TransactionContext {
return undefined;
});
if (res) {
- ws.notify({
+ wex.ws.notify({
type: NotificationType.TransactionStateTransition,
transactionId,
oldTxState: res.oldTxState,
@@ -183,7 +183,7 @@ export class RefreshTransactionContext implements TransactionContext {
}
async resumeTransaction(): Promise<void> {
- const { ws, refreshGroupId, transactionId } = this;
+ const { wex: ws, refreshGroupId, transactionId } = this;
const transitionInfo = await ws.db.runReadWriteTx(
["refreshGroups"],
async (tx) => {
@@ -217,7 +217,7 @@ export class RefreshTransactionContext implements TransactionContext {
}
async failTransaction(): Promise<void> {
- const { ws, refreshGroupId, transactionId } = this;
+ const { wex: ws, refreshGroupId, transactionId } = this;
const transitionInfo = await ws.db.runReadWriteTx(
["refreshGroups"],
async (tx) => {
@@ -331,7 +331,7 @@ function updateGroupStatus(rg: RefreshGroupRecord): { final: boolean } {
* finished), return undefined.
*/
async function provideRefreshSession(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
refreshGroupId: string,
coinIndex: number,
): Promise<RefreshSessionRecord | undefined> {
@@ -339,7 +339,7 @@ async function provideRefreshSession(
`creating refresh session for coin ${coinIndex} in refresh group ${refreshGroupId}`,
);
- const d = await ws.db.runReadWriteTx(
+ const d = await wex.db.runReadWriteTx(
["coins", "refreshGroups", "refreshSessions"],
async (tx) => {
const refreshGroup = await tx.refreshGroups.get(refreshGroupId);
@@ -374,16 +374,16 @@ async function provideRefreshSession(
const { refreshGroup, coin } = d;
- const exch = await fetchFreshExchange(ws, coin.exchangeBaseUrl);
+ const exch = await fetchFreshExchange(wex, coin.exchangeBaseUrl);
// FIXME: use helper functions from withdraw.ts
// to update and filter withdrawable denoms.
- const { availableAmount, availableDenoms } = await ws.db.runReadOnlyTx(
+ const { availableAmount, availableDenoms } = await wex.db.runReadOnlyTx(
["denominations"],
async (tx) => {
const oldDenom = await getDenomInfo(
- ws,
+ wex,
tx,
exch.exchangeBaseUrl,
coin.denomPubHash,
@@ -410,7 +410,7 @@ async function provideRefreshSession(
const newCoinDenoms = selectWithdrawalDenominations(
availableAmount,
availableDenoms,
- ws.config.testing.denomselAllowLate,
+ wex.ws.config.testing.denomselAllowLate,
);
const transactionId = constructTransactionIdentifier({
@@ -424,7 +424,7 @@ async function provideRefreshSession(
availableAmount,
)} too small`,
);
- const transitionInfo = await ws.db.runReadWriteTx(
+ const transitionInfo = await wex.db.runReadWriteTx(
["refreshGroups", "coins", "coinAvailability"],
async (tx) => {
const rg = await tx.refreshGroups.get(refreshGroupId);
@@ -435,25 +435,25 @@ async function provideRefreshSession(
rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished;
const updateRes = updateGroupStatus(rg);
if (updateRes.final) {
- await makeCoinsVisible(ws, tx, transactionId);
+ await makeCoinsVisible(wex, tx, transactionId);
}
await tx.refreshGroups.put(rg);
const newTxState = computeRefreshTransactionState(rg);
return { oldTxState, newTxState };
},
);
- ws.notify({
+ wex.ws.notify({
type: NotificationType.BalanceChange,
hintTransactionId: transactionId,
});
- notifyTransition(ws, transactionId, transitionInfo);
+ notifyTransition(wex, transactionId, transitionInfo);
return;
}
const sessionSecretSeed = encodeCrock(getRandomBytes(64));
// Store refresh session for this coin in the database.
- const mySession = await ws.db.runReadWriteTx(
+ const mySession = await wex.db.runReadWriteTx(
["refreshGroups", "refreshSessions"],
async (tx) => {
const rg = await tx.refreshGroups.get(refreshGroupId);
@@ -495,12 +495,11 @@ function getRefreshRequestTimeout(rg: RefreshGroupRecord): Duration {
}
async function refreshMelt(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
refreshGroupId: string,
coinIndex: number,
- cancellationToken: CancellationToken,
): Promise<void> {
- const d = await ws.db.runReadWriteTx(
+ const d = await wex.db.runReadWriteTx(
["refreshGroups", "refreshSessions", "coins", "denominations"],
async (tx) => {
const refreshGroup = await tx.refreshGroups.get(refreshGroupId);
@@ -521,7 +520,7 @@ async function refreshMelt(
const oldCoin = await tx.coins.get(refreshGroup.oldCoinPubs[coinIndex]);
checkDbInvariant(!!oldCoin, "melt coin doesn't exist");
const oldDenom = await getDenomInfo(
- ws,
+ wex,
tx,
oldCoin.exchangeBaseUrl,
oldCoin.denomPubHash,
@@ -535,7 +534,7 @@ async function refreshMelt(
for (const dh of refreshSession.newDenoms) {
const newDenom = await getDenomInfo(
- ws,
+ wex,
tx,
oldCoin.exchangeBaseUrl,
dh.denomPubHash,
@@ -572,7 +571,7 @@ async function refreshMelt(
throw Error("unsupported key type");
}
- const derived = await ws.cryptoApi.deriveRefreshSession({
+ const derived = await wex.cryptoApi.deriveRefreshSession({
exchangeProtocolVersion,
kappa: 3,
meltCoinDenomPubHash: oldCoin.denomPubHash,
@@ -607,14 +606,17 @@ async function refreshMelt(
age_commitment_hash: maybeAch,
};
- const resp = await ws.runSequentialized([EXCHANGE_COINS_LOCK], async () => {
- return await ws.http.fetch(reqUrl.href, {
- method: "POST",
- body: meltReqBody,
- timeout: getRefreshRequestTimeout(refreshGroup),
- cancellationToken,
- });
- });
+ const resp = await wex.ws.runSequentialized(
+ [EXCHANGE_COINS_LOCK],
+ async () => {
+ return await wex.http.fetch(reqUrl.href, {
+ method: "POST",
+ body: meltReqBody,
+ timeout: getRefreshRequestTimeout(refreshGroup),
+ cancellationToken: wex.cancellationToken,
+ });
+ },
+ );
const transactionId = constructTransactionIdentifier({
tag: TransactionType.Refresh,
@@ -623,7 +625,7 @@ async function refreshMelt(
if (resp.status === HttpStatusCode.NotFound) {
const errDetails = await readUnexpectedResponseDetails(resp);
- const transitionInfo = await ws.db.runReadWriteTx(
+ const transitionInfo = await wex.db.runReadWriteTx(
["refreshGroups", "refreshSessions", "coins", "coinAvailability"],
async (tx) => {
const rg = await tx.refreshGroups.get(refreshGroupId);
@@ -650,7 +652,7 @@ async function refreshMelt(
refreshSession.lastError = errDetails;
const updateRes = updateGroupStatus(rg);
if (updateRes.final) {
- await makeCoinsVisible(ws, tx, transactionId);
+ await makeCoinsVisible(wex, tx, transactionId);
}
await tx.refreshGroups.put(rg);
await tx.refreshSessions.put(refreshSession);
@@ -661,11 +663,11 @@ async function refreshMelt(
};
},
);
- ws.notify({
+ wex.ws.notify({
type: NotificationType.BalanceChange,
hintTransactionId: transactionId,
});
- notifyTransition(ws, transactionId, transitionInfo);
+ notifyTransition(wex, transactionId, transitionInfo);
return;
}
@@ -678,7 +680,7 @@ async function refreshMelt(
)} failed in refresh group ${refreshGroupId} due to conflict`,
);
- const historySig = await ws.cryptoApi.signCoinHistoryRequest({
+ const historySig = await wex.cryptoApi.signCoinHistoryRequest({
coinPriv: oldCoin.coinPriv,
coinPub: oldCoin.coinPub,
startOffset: 0,
@@ -689,12 +691,12 @@ async function refreshMelt(
oldCoin.exchangeBaseUrl,
);
- const historyResp = await ws.http.fetch(historyUrl.href, {
+ const historyResp = await wex.http.fetch(historyUrl.href, {
method: "GET",
headers: {
"Taler-Coin-History-Signature": historySig.sig,
},
- cancellationToken,
+ cancellationToken: wex.cancellationToken,
});
const historyJson = await historyResp.json();
@@ -712,7 +714,7 @@ async function refreshMelt(
refreshSession.norevealIndex = norevealIndex;
- await ws.db.runReadWriteTx(
+ await wex.db.runReadWriteTx(
["refreshGroups", "refreshSessions"],
async (tx) => {
const rg = await tx.refreshGroups.get(refreshGroupId);
@@ -794,15 +796,14 @@ export async function assembleRefreshRevealRequest(args: {
}
async function refreshReveal(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
refreshGroupId: string,
coinIndex: number,
- cancellationToken: CancellationToken,
): Promise<void> {
logger.trace(
`doing refresh reveal for ${refreshGroupId} (old coin ${coinIndex})`,
);
- const d = await ws.db.runReadOnlyTx(
+ const d = await wex.db.runReadOnlyTx(
["refreshGroups", "refreshSessions", "coins", "denominations"],
async (tx) => {
const refreshGroup = await tx.refreshGroups.get(refreshGroupId);
@@ -824,7 +825,7 @@ async function refreshReveal(
const oldCoin = await tx.coins.get(refreshGroup.oldCoinPubs[coinIndex]);
checkDbInvariant(!!oldCoin, "melt coin doesn't exist");
const oldDenom = await getDenomInfo(
- ws,
+ wex,
tx,
oldCoin.exchangeBaseUrl,
oldCoin.denomPubHash,
@@ -838,7 +839,7 @@ async function refreshReveal(
for (const dh of refreshSession.newDenoms) {
const newDenom = await getDenomInfo(
- ws,
+ wex,
tx,
oldCoin.exchangeBaseUrl,
dh.denomPubHash,
@@ -889,7 +890,7 @@ async function refreshReveal(
throw Error("unsupported key type");
}
- const derived = await ws.cryptoApi.deriveRefreshSession({
+ const derived = await wex.cryptoApi.deriveRefreshSession({
exchangeProtocolVersion,
kappa: 3,
meltCoinDenomPubHash: oldCoin.denomPubHash,
@@ -908,7 +909,7 @@ async function refreshReveal(
);
const req = await assembleRefreshRevealRequest({
- cryptoApi: ws.cryptoApi,
+ cryptoApi: wex.cryptoApi,
derived,
newDenoms: newCoinDenoms,
norevealIndex: norevealIndex,
@@ -917,14 +918,17 @@ async function refreshReveal(
oldAgeCommitment: oldCoin.ageCommitmentProof?.commitment,
});
- const resp = await ws.runSequentialized([EXCHANGE_COINS_LOCK], async () => {
- return await ws.http.fetch(reqUrl.href, {
- body: req,
- method: "POST",
- timeout: getRefreshRequestTimeout(refreshGroup),
- cancellationToken,
- });
- });
+ const resp = await wex.ws.runSequentialized(
+ [EXCHANGE_COINS_LOCK],
+ async () => {
+ return await wex.http.fetch(reqUrl.href, {
+ body: req,
+ method: "POST",
+ timeout: getRefreshRequestTimeout(refreshGroup),
+ cancellationToken: wex.cancellationToken,
+ });
+ },
+ );
const reveal = await readSuccessResponseJsonOrThrow(
resp,
@@ -947,7 +951,7 @@ async function refreshReveal(
throw Error("cipher unsupported");
}
const evSig = reveal.ev_sigs[newCoinIndex].ev_sig;
- const denomSig = await ws.cryptoApi.unblindDenominationSignature({
+ const denomSig = await wex.cryptoApi.unblindDenominationSignature({
planchet: {
blindingKey: pc.blindingKey,
denomPub: ncd.denomPub,
@@ -978,7 +982,7 @@ async function refreshReveal(
}
}
- const transitionInfo = await ws.db.runReadWriteTx(
+ const transitionInfo = await wex.db.runReadWriteTx(
[
"coins",
"denominations",
@@ -1000,26 +1004,25 @@ async function refreshReveal(
rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished;
updateGroupStatus(rg);
for (const coin of coins) {
- await makeCoinAvailable(ws, tx, coin);
+ await makeCoinAvailable(wex, tx, coin);
}
- await makeCoinsVisible(ws, tx, transactionId);
+ await makeCoinsVisible(wex, tx, transactionId);
await tx.refreshGroups.put(rg);
const newTxState = computeRefreshTransactionState(rg);
return { oldTxState, newTxState };
},
);
- notifyTransition(ws, transactionId, transitionInfo);
+ notifyTransition(wex, transactionId, transitionInfo);
logger.trace("refresh finished (end of reveal)");
}
export async function processRefreshGroup(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
refreshGroupId: string,
- cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
logger.trace(`processing refresh group ${refreshGroupId}`);
- const refreshGroup = await ws.db.runReadOnlyTx(
+ const refreshGroup = await wex.db.runReadOnlyTx(
["refreshGroups"],
async (tx) => tx.refreshGroups.get(refreshGroupId),
);
@@ -1036,28 +1039,26 @@ export async function processRefreshGroup(
let errors: TalerErrorDetail[] = [];
let inShutdown = false;
const ps = refreshGroup.oldCoinPubs.map((x, i) =>
- processRefreshSession(ws, refreshGroupId, i, cancellationToken).catch(
- (x) => {
- if (x instanceof CryptoApiStoppedError) {
- inShutdown = true;
- logger.info(
- "crypto API stopped while processing refresh group, probably the wallet is currently shutting down.",
- );
- return;
- }
- if (x instanceof TalerError) {
- logger.warn("process refresh session got exception (TalerError)");
- logger.warn(`exc ${x}`);
- logger.warn(`exc stack ${x.stack}`);
- logger.warn(`error detail: ${j2s(x.errorDetail)}`);
- } else {
- logger.warn("process refresh session got exception");
- logger.warn(`exc ${x}`);
- logger.warn(`exc stack ${x.stack}`);
- }
- errors.push(getErrorDetailFromException(x));
- },
- ),
+ processRefreshSession(wex, refreshGroupId, i).catch((x) => {
+ if (x instanceof CryptoApiStoppedError) {
+ inShutdown = true;
+ logger.info(
+ "crypto API stopped while processing refresh group, probably the wallet is currently shutting down.",
+ );
+ return;
+ }
+ if (x instanceof TalerError) {
+ logger.warn("process refresh session got exception (TalerError)");
+ logger.warn(`exc ${x}`);
+ logger.warn(`exc stack ${x.stack}`);
+ logger.warn(`error detail: ${j2s(x.errorDetail)}`);
+ } else {
+ logger.warn("process refresh session got exception");
+ logger.warn(`exc ${x}`);
+ logger.warn(`exc stack ${x.stack}`);
+ }
+ errors.push(getErrorDetailFromException(x));
+ }),
);
try {
logger.info("waiting for refreshes");
@@ -1087,15 +1088,14 @@ export async function processRefreshGroup(
}
async function processRefreshSession(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
refreshGroupId: string,
coinIndex: number,
- cancellationToken: CancellationToken,
): Promise<void> {
logger.trace(
`processing refresh session for coin ${coinIndex} of group ${refreshGroupId}`,
);
- let { refreshGroup, refreshSession } = await ws.db.runReadOnlyTx(
+ let { refreshGroup, refreshSession } = await wex.db.runReadOnlyTx(
["refreshGroups", "refreshSessions"],
async (tx) => {
const rg = await tx.refreshGroups.get(refreshGroupId);
@@ -1113,7 +1113,11 @@ async function processRefreshSession(
return;
}
if (!refreshSession) {
- refreshSession = await provideRefreshSession(ws, refreshGroupId, coinIndex);
+ refreshSession = await provideRefreshSession(
+ wex,
+ refreshGroupId,
+ coinIndex,
+ );
}
if (!refreshSession) {
// We tried to create the refresh session, but didn't get a result back.
@@ -1122,9 +1126,9 @@ async function processRefreshSession(
return;
}
if (refreshSession.norevealIndex === undefined) {
- await refreshMelt(ws, refreshGroupId, coinIndex, cancellationToken);
+ await refreshMelt(wex, refreshGroupId, coinIndex);
}
- await refreshReveal(ws, refreshGroupId, coinIndex, cancellationToken);
+ await refreshReveal(wex, refreshGroupId, coinIndex);
}
export interface RefreshOutputInfo {
@@ -1133,7 +1137,7 @@ export interface RefreshOutputInfo {
}
export async function calculateRefreshOutput(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
tx: WalletDbReadOnlyTransaction<
["denominations", "coins", "refreshGroups", "coinAvailability"]
>,
@@ -1154,7 +1158,7 @@ export async function calculateRefreshOutput(
return denomsPerExchange[exchangeBaseUrl];
}
const allDenoms = await getCandidateWithdrawalDenomsTx(
- ws,
+ wex,
tx,
exchangeBaseUrl,
currency,
@@ -1167,7 +1171,7 @@ export async function calculateRefreshOutput(
const coin = await tx.coins.get(ocp.coinPub);
checkDbInvariant(!!coin, "coin must be in database");
const denom = await getDenomInfo(
- ws,
+ wex,
tx,
coin.exchangeBaseUrl,
coin.denomPubHash,
@@ -1182,7 +1186,7 @@ export async function calculateRefreshOutput(
denoms,
denom,
Amounts.parseOrThrow(refreshAmount),
- ws.config.testing.denomselAllowLate,
+ wex.ws.config.testing.denomselAllowLate,
);
const output = Amounts.sub(refreshAmount, cost).amount;
let exchInfo = infoPerExchange[coin.exchangeBaseUrl];
@@ -1204,7 +1208,7 @@ export async function calculateRefreshOutput(
}
async function applyRefresh(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
tx: WalletDbReadWriteTransaction<
["denominations", "coins", "refreshGroups", "coinAvailability"]
>,
@@ -1215,7 +1219,7 @@ async function applyRefresh(
const coin = await tx.coins.get(ocp.coinPub);
checkDbInvariant(!!coin, "coin must be in database");
const denom = await getDenomInfo(
- ws,
+ wex,
tx,
coin.exchangeBaseUrl,
coin.denomPubHash,
@@ -1278,7 +1282,7 @@ export interface CreateRefreshGroupResult {
* in the current database transaction.
*/
export async function createRefreshGroup(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
tx: WalletDbReadWriteTransaction<
["denominations", "coins", "refreshGroups", "coinAvailability"]
>,
@@ -1289,11 +1293,11 @@ export async function createRefreshGroup(
): Promise<CreateRefreshGroupResult> {
const refreshGroupId = encodeCrock(getRandomBytes(32));
- const outInfo = await calculateRefreshOutput(ws, tx, currency, oldCoinPubs);
+ const outInfo = await calculateRefreshOutput(wex, tx, currency, oldCoinPubs);
const estimatedOutputPerCoin = outInfo.outputPerCoin;
- await applyRefresh(ws, tx, oldCoinPubs, refreshGroupId);
+ await applyRefresh(wex, tx, oldCoinPubs, refreshGroupId);
const refreshGroup: RefreshGroupRecord = {
operationStatus: RefreshOperationStatus.Pending,
@@ -1326,12 +1330,12 @@ export async function createRefreshGroup(
logger.trace(`created refresh group ${refreshGroupId}`);
- const ctx = new RefreshTransactionContext(ws, refreshGroupId);
+ const ctx = new RefreshTransactionContext(wex, refreshGroupId);
// Shepherd the task.
// If the current transaction fails to commit the refresh
// group to the DB, the shepherd will give up.
- ws.taskScheduler.startShepherdTask(ctx.taskId);
+ wex.taskScheduler.startShepherdTask(ctx.taskId);
return {
refreshGroupId,
@@ -1391,10 +1395,10 @@ export function computeRefreshTransactionActions(
}
export function getRefreshesForTransaction(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
transactionId: string,
): Promise<string[]> {
- return ws.db.runReadOnlyTx(["refreshGroups"], async (tx) => {
+ return wex.db.runReadOnlyTx(["refreshGroups"], async (tx) => {
const groups =
await tx.refreshGroups.indexes.byOriginatingTransactionId.getAll(
transactionId,
@@ -1409,13 +1413,13 @@ export function getRefreshesForTransaction(
}
export async function forceRefresh(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
req: ForceRefreshRequest,
): Promise<{ refreshGroupId: RefreshGroupId }> {
if (req.coinPubList.length == 0) {
throw Error("refusing to create empty refresh group");
}
- const refreshGroupId = await ws.db.runReadWriteTx(
+ const refreshGroupId = await wex.db.runReadWriteTx(
["refreshGroups", "coinAvailability", "denominations", "coins"],
async (tx) => {
let coinPubs: CoinRefreshRequest[] = [];
@@ -1425,7 +1429,7 @@ export async function forceRefresh(
throw Error(`coin (pubkey ${c}) not found`);
}
const denom = await getDenomInfo(
- ws,
+ wex,
tx,
coin.exchangeBaseUrl,
coin.denomPubHash,
@@ -1437,7 +1441,7 @@ export async function forceRefresh(
});
}
return await createRefreshGroup(
- ws,
+ wex,
tx,
Amounts.currencyOf(coinPubs[0].amount),
coinPubs,