diff options
Diffstat (limited to 'packages/taler-wallet-core/src/refresh.ts')
-rw-r--r-- | packages/taler-wallet-core/src/refresh.ts | 222 |
1 files changed, 113 insertions, 109 deletions
diff --git a/packages/taler-wallet-core/src/refresh.ts b/packages/taler-wallet-core/src/refresh.ts index cc5eff12c..b467a1c47 100644 --- a/packages/taler-wallet-core/src/refresh.ts +++ b/packages/taler-wallet-core/src/refresh.ts @@ -21,7 +21,6 @@ import { Amounts, amountToPretty, assertUnreachable, - CancellationToken, checkDbInvariant, codecForExchangeMeltResponse, codecForExchangeRevealResponse, @@ -103,6 +102,7 @@ import { EXCHANGE_COINS_LOCK, getDenomInfo, InternalWalletState, + WalletExecutionContext, } from "./wallet.js"; import { getCandidateWithdrawalDenomsTx } from "./withdraw.js"; @@ -113,7 +113,7 @@ export class RefreshTransactionContext implements TransactionContext { readonly taskId: TaskIdStr; constructor( - public ws: InternalWalletState, + public wex: WalletExecutionContext, public refreshGroupId: string, ) { this.transactionId = constructTransactionIdentifier({ @@ -128,7 +128,7 @@ export class RefreshTransactionContext implements TransactionContext { async deleteTransaction(): Promise<void> { const refreshGroupId = this.refreshGroupId; - const ws = this.ws; + const ws = this.wex; await ws.db.runReadWriteTx(["refreshGroups", "tombstones"], async (tx) => { const rg = await tx.refreshGroups.get(refreshGroupId); if (rg) { @@ -141,8 +141,8 @@ export class RefreshTransactionContext implements TransactionContext { } async suspendTransaction(): Promise<void> { - const { ws, refreshGroupId, transactionId } = this; - let res = await ws.db.runReadWriteTx(["refreshGroups"], async (tx) => { + const { wex, refreshGroupId, transactionId } = this; + let res = await wex.db.runReadWriteTx(["refreshGroups"], async (tx) => { const dg = await tx.refreshGroups.get(refreshGroupId); if (!dg) { logger.warn( @@ -168,7 +168,7 @@ export class RefreshTransactionContext implements TransactionContext { return undefined; }); if (res) { - ws.notify({ + wex.ws.notify({ type: NotificationType.TransactionStateTransition, transactionId, oldTxState: res.oldTxState, @@ -183,7 +183,7 @@ export class RefreshTransactionContext implements TransactionContext { } async resumeTransaction(): Promise<void> { - const { ws, refreshGroupId, transactionId } = this; + const { wex: ws, refreshGroupId, transactionId } = this; const transitionInfo = await ws.db.runReadWriteTx( ["refreshGroups"], async (tx) => { @@ -217,7 +217,7 @@ export class RefreshTransactionContext implements TransactionContext { } async failTransaction(): Promise<void> { - const { ws, refreshGroupId, transactionId } = this; + const { wex: ws, refreshGroupId, transactionId } = this; const transitionInfo = await ws.db.runReadWriteTx( ["refreshGroups"], async (tx) => { @@ -331,7 +331,7 @@ function updateGroupStatus(rg: RefreshGroupRecord): { final: boolean } { * finished), return undefined. */ async function provideRefreshSession( - ws: InternalWalletState, + wex: WalletExecutionContext, refreshGroupId: string, coinIndex: number, ): Promise<RefreshSessionRecord | undefined> { @@ -339,7 +339,7 @@ async function provideRefreshSession( `creating refresh session for coin ${coinIndex} in refresh group ${refreshGroupId}`, ); - const d = await ws.db.runReadWriteTx( + const d = await wex.db.runReadWriteTx( ["coins", "refreshGroups", "refreshSessions"], async (tx) => { const refreshGroup = await tx.refreshGroups.get(refreshGroupId); @@ -374,16 +374,16 @@ async function provideRefreshSession( const { refreshGroup, coin } = d; - const exch = await fetchFreshExchange(ws, coin.exchangeBaseUrl); + const exch = await fetchFreshExchange(wex, coin.exchangeBaseUrl); // FIXME: use helper functions from withdraw.ts // to update and filter withdrawable denoms. - const { availableAmount, availableDenoms } = await ws.db.runReadOnlyTx( + const { availableAmount, availableDenoms } = await wex.db.runReadOnlyTx( ["denominations"], async (tx) => { const oldDenom = await getDenomInfo( - ws, + wex, tx, exch.exchangeBaseUrl, coin.denomPubHash, @@ -410,7 +410,7 @@ async function provideRefreshSession( const newCoinDenoms = selectWithdrawalDenominations( availableAmount, availableDenoms, - ws.config.testing.denomselAllowLate, + wex.ws.config.testing.denomselAllowLate, ); const transactionId = constructTransactionIdentifier({ @@ -424,7 +424,7 @@ async function provideRefreshSession( availableAmount, )} too small`, ); - const transitionInfo = await ws.db.runReadWriteTx( + const transitionInfo = await wex.db.runReadWriteTx( ["refreshGroups", "coins", "coinAvailability"], async (tx) => { const rg = await tx.refreshGroups.get(refreshGroupId); @@ -435,25 +435,25 @@ async function provideRefreshSession( rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished; const updateRes = updateGroupStatus(rg); if (updateRes.final) { - await makeCoinsVisible(ws, tx, transactionId); + await makeCoinsVisible(wex, tx, transactionId); } await tx.refreshGroups.put(rg); const newTxState = computeRefreshTransactionState(rg); return { oldTxState, newTxState }; }, ); - ws.notify({ + wex.ws.notify({ type: NotificationType.BalanceChange, hintTransactionId: transactionId, }); - notifyTransition(ws, transactionId, transitionInfo); + notifyTransition(wex, transactionId, transitionInfo); return; } const sessionSecretSeed = encodeCrock(getRandomBytes(64)); // Store refresh session for this coin in the database. - const mySession = await ws.db.runReadWriteTx( + const mySession = await wex.db.runReadWriteTx( ["refreshGroups", "refreshSessions"], async (tx) => { const rg = await tx.refreshGroups.get(refreshGroupId); @@ -495,12 +495,11 @@ function getRefreshRequestTimeout(rg: RefreshGroupRecord): Duration { } async function refreshMelt( - ws: InternalWalletState, + wex: WalletExecutionContext, refreshGroupId: string, coinIndex: number, - cancellationToken: CancellationToken, ): Promise<void> { - const d = await ws.db.runReadWriteTx( + const d = await wex.db.runReadWriteTx( ["refreshGroups", "refreshSessions", "coins", "denominations"], async (tx) => { const refreshGroup = await tx.refreshGroups.get(refreshGroupId); @@ -521,7 +520,7 @@ async function refreshMelt( const oldCoin = await tx.coins.get(refreshGroup.oldCoinPubs[coinIndex]); checkDbInvariant(!!oldCoin, "melt coin doesn't exist"); const oldDenom = await getDenomInfo( - ws, + wex, tx, oldCoin.exchangeBaseUrl, oldCoin.denomPubHash, @@ -535,7 +534,7 @@ async function refreshMelt( for (const dh of refreshSession.newDenoms) { const newDenom = await getDenomInfo( - ws, + wex, tx, oldCoin.exchangeBaseUrl, dh.denomPubHash, @@ -572,7 +571,7 @@ async function refreshMelt( throw Error("unsupported key type"); } - const derived = await ws.cryptoApi.deriveRefreshSession({ + const derived = await wex.cryptoApi.deriveRefreshSession({ exchangeProtocolVersion, kappa: 3, meltCoinDenomPubHash: oldCoin.denomPubHash, @@ -607,14 +606,17 @@ async function refreshMelt( age_commitment_hash: maybeAch, }; - const resp = await ws.runSequentialized([EXCHANGE_COINS_LOCK], async () => { - return await ws.http.fetch(reqUrl.href, { - method: "POST", - body: meltReqBody, - timeout: getRefreshRequestTimeout(refreshGroup), - cancellationToken, - }); - }); + const resp = await wex.ws.runSequentialized( + [EXCHANGE_COINS_LOCK], + async () => { + return await wex.http.fetch(reqUrl.href, { + method: "POST", + body: meltReqBody, + timeout: getRefreshRequestTimeout(refreshGroup), + cancellationToken: wex.cancellationToken, + }); + }, + ); const transactionId = constructTransactionIdentifier({ tag: TransactionType.Refresh, @@ -623,7 +625,7 @@ async function refreshMelt( if (resp.status === HttpStatusCode.NotFound) { const errDetails = await readUnexpectedResponseDetails(resp); - const transitionInfo = await ws.db.runReadWriteTx( + const transitionInfo = await wex.db.runReadWriteTx( ["refreshGroups", "refreshSessions", "coins", "coinAvailability"], async (tx) => { const rg = await tx.refreshGroups.get(refreshGroupId); @@ -650,7 +652,7 @@ async function refreshMelt( refreshSession.lastError = errDetails; const updateRes = updateGroupStatus(rg); if (updateRes.final) { - await makeCoinsVisible(ws, tx, transactionId); + await makeCoinsVisible(wex, tx, transactionId); } await tx.refreshGroups.put(rg); await tx.refreshSessions.put(refreshSession); @@ -661,11 +663,11 @@ async function refreshMelt( }; }, ); - ws.notify({ + wex.ws.notify({ type: NotificationType.BalanceChange, hintTransactionId: transactionId, }); - notifyTransition(ws, transactionId, transitionInfo); + notifyTransition(wex, transactionId, transitionInfo); return; } @@ -678,7 +680,7 @@ async function refreshMelt( )} failed in refresh group ${refreshGroupId} due to conflict`, ); - const historySig = await ws.cryptoApi.signCoinHistoryRequest({ + const historySig = await wex.cryptoApi.signCoinHistoryRequest({ coinPriv: oldCoin.coinPriv, coinPub: oldCoin.coinPub, startOffset: 0, @@ -689,12 +691,12 @@ async function refreshMelt( oldCoin.exchangeBaseUrl, ); - const historyResp = await ws.http.fetch(historyUrl.href, { + const historyResp = await wex.http.fetch(historyUrl.href, { method: "GET", headers: { "Taler-Coin-History-Signature": historySig.sig, }, - cancellationToken, + cancellationToken: wex.cancellationToken, }); const historyJson = await historyResp.json(); @@ -712,7 +714,7 @@ async function refreshMelt( refreshSession.norevealIndex = norevealIndex; - await ws.db.runReadWriteTx( + await wex.db.runReadWriteTx( ["refreshGroups", "refreshSessions"], async (tx) => { const rg = await tx.refreshGroups.get(refreshGroupId); @@ -794,15 +796,14 @@ export async function assembleRefreshRevealRequest(args: { } async function refreshReveal( - ws: InternalWalletState, + wex: WalletExecutionContext, refreshGroupId: string, coinIndex: number, - cancellationToken: CancellationToken, ): Promise<void> { logger.trace( `doing refresh reveal for ${refreshGroupId} (old coin ${coinIndex})`, ); - const d = await ws.db.runReadOnlyTx( + const d = await wex.db.runReadOnlyTx( ["refreshGroups", "refreshSessions", "coins", "denominations"], async (tx) => { const refreshGroup = await tx.refreshGroups.get(refreshGroupId); @@ -824,7 +825,7 @@ async function refreshReveal( const oldCoin = await tx.coins.get(refreshGroup.oldCoinPubs[coinIndex]); checkDbInvariant(!!oldCoin, "melt coin doesn't exist"); const oldDenom = await getDenomInfo( - ws, + wex, tx, oldCoin.exchangeBaseUrl, oldCoin.denomPubHash, @@ -838,7 +839,7 @@ async function refreshReveal( for (const dh of refreshSession.newDenoms) { const newDenom = await getDenomInfo( - ws, + wex, tx, oldCoin.exchangeBaseUrl, dh.denomPubHash, @@ -889,7 +890,7 @@ async function refreshReveal( throw Error("unsupported key type"); } - const derived = await ws.cryptoApi.deriveRefreshSession({ + const derived = await wex.cryptoApi.deriveRefreshSession({ exchangeProtocolVersion, kappa: 3, meltCoinDenomPubHash: oldCoin.denomPubHash, @@ -908,7 +909,7 @@ async function refreshReveal( ); const req = await assembleRefreshRevealRequest({ - cryptoApi: ws.cryptoApi, + cryptoApi: wex.cryptoApi, derived, newDenoms: newCoinDenoms, norevealIndex: norevealIndex, @@ -917,14 +918,17 @@ async function refreshReveal( oldAgeCommitment: oldCoin.ageCommitmentProof?.commitment, }); - const resp = await ws.runSequentialized([EXCHANGE_COINS_LOCK], async () => { - return await ws.http.fetch(reqUrl.href, { - body: req, - method: "POST", - timeout: getRefreshRequestTimeout(refreshGroup), - cancellationToken, - }); - }); + const resp = await wex.ws.runSequentialized( + [EXCHANGE_COINS_LOCK], + async () => { + return await wex.http.fetch(reqUrl.href, { + body: req, + method: "POST", + timeout: getRefreshRequestTimeout(refreshGroup), + cancellationToken: wex.cancellationToken, + }); + }, + ); const reveal = await readSuccessResponseJsonOrThrow( resp, @@ -947,7 +951,7 @@ async function refreshReveal( throw Error("cipher unsupported"); } const evSig = reveal.ev_sigs[newCoinIndex].ev_sig; - const denomSig = await ws.cryptoApi.unblindDenominationSignature({ + const denomSig = await wex.cryptoApi.unblindDenominationSignature({ planchet: { blindingKey: pc.blindingKey, denomPub: ncd.denomPub, @@ -978,7 +982,7 @@ async function refreshReveal( } } - const transitionInfo = await ws.db.runReadWriteTx( + const transitionInfo = await wex.db.runReadWriteTx( [ "coins", "denominations", @@ -1000,26 +1004,25 @@ async function refreshReveal( rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished; updateGroupStatus(rg); for (const coin of coins) { - await makeCoinAvailable(ws, tx, coin); + await makeCoinAvailable(wex, tx, coin); } - await makeCoinsVisible(ws, tx, transactionId); + await makeCoinsVisible(wex, tx, transactionId); await tx.refreshGroups.put(rg); const newTxState = computeRefreshTransactionState(rg); return { oldTxState, newTxState }; }, ); - notifyTransition(ws, transactionId, transitionInfo); + notifyTransition(wex, transactionId, transitionInfo); logger.trace("refresh finished (end of reveal)"); } export async function processRefreshGroup( - ws: InternalWalletState, + wex: WalletExecutionContext, refreshGroupId: string, - cancellationToken: CancellationToken, ): Promise<TaskRunResult> { logger.trace(`processing refresh group ${refreshGroupId}`); - const refreshGroup = await ws.db.runReadOnlyTx( + const refreshGroup = await wex.db.runReadOnlyTx( ["refreshGroups"], async (tx) => tx.refreshGroups.get(refreshGroupId), ); @@ -1036,28 +1039,26 @@ export async function processRefreshGroup( let errors: TalerErrorDetail[] = []; let inShutdown = false; const ps = refreshGroup.oldCoinPubs.map((x, i) => - processRefreshSession(ws, refreshGroupId, i, cancellationToken).catch( - (x) => { - if (x instanceof CryptoApiStoppedError) { - inShutdown = true; - logger.info( - "crypto API stopped while processing refresh group, probably the wallet is currently shutting down.", - ); - return; - } - if (x instanceof TalerError) { - logger.warn("process refresh session got exception (TalerError)"); - logger.warn(`exc ${x}`); - logger.warn(`exc stack ${x.stack}`); - logger.warn(`error detail: ${j2s(x.errorDetail)}`); - } else { - logger.warn("process refresh session got exception"); - logger.warn(`exc ${x}`); - logger.warn(`exc stack ${x.stack}`); - } - errors.push(getErrorDetailFromException(x)); - }, - ), + processRefreshSession(wex, refreshGroupId, i).catch((x) => { + if (x instanceof CryptoApiStoppedError) { + inShutdown = true; + logger.info( + "crypto API stopped while processing refresh group, probably the wallet is currently shutting down.", + ); + return; + } + if (x instanceof TalerError) { + logger.warn("process refresh session got exception (TalerError)"); + logger.warn(`exc ${x}`); + logger.warn(`exc stack ${x.stack}`); + logger.warn(`error detail: ${j2s(x.errorDetail)}`); + } else { + logger.warn("process refresh session got exception"); + logger.warn(`exc ${x}`); + logger.warn(`exc stack ${x.stack}`); + } + errors.push(getErrorDetailFromException(x)); + }), ); try { logger.info("waiting for refreshes"); @@ -1087,15 +1088,14 @@ export async function processRefreshGroup( } async function processRefreshSession( - ws: InternalWalletState, + wex: WalletExecutionContext, refreshGroupId: string, coinIndex: number, - cancellationToken: CancellationToken, ): Promise<void> { logger.trace( `processing refresh session for coin ${coinIndex} of group ${refreshGroupId}`, ); - let { refreshGroup, refreshSession } = await ws.db.runReadOnlyTx( + let { refreshGroup, refreshSession } = await wex.db.runReadOnlyTx( ["refreshGroups", "refreshSessions"], async (tx) => { const rg = await tx.refreshGroups.get(refreshGroupId); @@ -1113,7 +1113,11 @@ async function processRefreshSession( return; } if (!refreshSession) { - refreshSession = await provideRefreshSession(ws, refreshGroupId, coinIndex); + refreshSession = await provideRefreshSession( + wex, + refreshGroupId, + coinIndex, + ); } if (!refreshSession) { // We tried to create the refresh session, but didn't get a result back. @@ -1122,9 +1126,9 @@ async function processRefreshSession( return; } if (refreshSession.norevealIndex === undefined) { - await refreshMelt(ws, refreshGroupId, coinIndex, cancellationToken); + await refreshMelt(wex, refreshGroupId, coinIndex); } - await refreshReveal(ws, refreshGroupId, coinIndex, cancellationToken); + await refreshReveal(wex, refreshGroupId, coinIndex); } export interface RefreshOutputInfo { @@ -1133,7 +1137,7 @@ export interface RefreshOutputInfo { } export async function calculateRefreshOutput( - ws: InternalWalletState, + wex: WalletExecutionContext, tx: WalletDbReadOnlyTransaction< ["denominations", "coins", "refreshGroups", "coinAvailability"] >, @@ -1154,7 +1158,7 @@ export async function calculateRefreshOutput( return denomsPerExchange[exchangeBaseUrl]; } const allDenoms = await getCandidateWithdrawalDenomsTx( - ws, + wex, tx, exchangeBaseUrl, currency, @@ -1167,7 +1171,7 @@ export async function calculateRefreshOutput( const coin = await tx.coins.get(ocp.coinPub); checkDbInvariant(!!coin, "coin must be in database"); const denom = await getDenomInfo( - ws, + wex, tx, coin.exchangeBaseUrl, coin.denomPubHash, @@ -1182,7 +1186,7 @@ export async function calculateRefreshOutput( denoms, denom, Amounts.parseOrThrow(refreshAmount), - ws.config.testing.denomselAllowLate, + wex.ws.config.testing.denomselAllowLate, ); const output = Amounts.sub(refreshAmount, cost).amount; let exchInfo = infoPerExchange[coin.exchangeBaseUrl]; @@ -1204,7 +1208,7 @@ export async function calculateRefreshOutput( } async function applyRefresh( - ws: InternalWalletState, + wex: WalletExecutionContext, tx: WalletDbReadWriteTransaction< ["denominations", "coins", "refreshGroups", "coinAvailability"] >, @@ -1215,7 +1219,7 @@ async function applyRefresh( const coin = await tx.coins.get(ocp.coinPub); checkDbInvariant(!!coin, "coin must be in database"); const denom = await getDenomInfo( - ws, + wex, tx, coin.exchangeBaseUrl, coin.denomPubHash, @@ -1278,7 +1282,7 @@ export interface CreateRefreshGroupResult { * in the current database transaction. */ export async function createRefreshGroup( - ws: InternalWalletState, + wex: WalletExecutionContext, tx: WalletDbReadWriteTransaction< ["denominations", "coins", "refreshGroups", "coinAvailability"] >, @@ -1289,11 +1293,11 @@ export async function createRefreshGroup( ): Promise<CreateRefreshGroupResult> { const refreshGroupId = encodeCrock(getRandomBytes(32)); - const outInfo = await calculateRefreshOutput(ws, tx, currency, oldCoinPubs); + const outInfo = await calculateRefreshOutput(wex, tx, currency, oldCoinPubs); const estimatedOutputPerCoin = outInfo.outputPerCoin; - await applyRefresh(ws, tx, oldCoinPubs, refreshGroupId); + await applyRefresh(wex, tx, oldCoinPubs, refreshGroupId); const refreshGroup: RefreshGroupRecord = { operationStatus: RefreshOperationStatus.Pending, @@ -1326,12 +1330,12 @@ export async function createRefreshGroup( logger.trace(`created refresh group ${refreshGroupId}`); - const ctx = new RefreshTransactionContext(ws, refreshGroupId); + const ctx = new RefreshTransactionContext(wex, refreshGroupId); // Shepherd the task. // If the current transaction fails to commit the refresh // group to the DB, the shepherd will give up. - ws.taskScheduler.startShepherdTask(ctx.taskId); + wex.taskScheduler.startShepherdTask(ctx.taskId); return { refreshGroupId, @@ -1391,10 +1395,10 @@ export function computeRefreshTransactionActions( } export function getRefreshesForTransaction( - ws: InternalWalletState, + wex: WalletExecutionContext, transactionId: string, ): Promise<string[]> { - return ws.db.runReadOnlyTx(["refreshGroups"], async (tx) => { + return wex.db.runReadOnlyTx(["refreshGroups"], async (tx) => { const groups = await tx.refreshGroups.indexes.byOriginatingTransactionId.getAll( transactionId, @@ -1409,13 +1413,13 @@ export function getRefreshesForTransaction( } export async function forceRefresh( - ws: InternalWalletState, + wex: WalletExecutionContext, req: ForceRefreshRequest, ): Promise<{ refreshGroupId: RefreshGroupId }> { if (req.coinPubList.length == 0) { throw Error("refusing to create empty refresh group"); } - const refreshGroupId = await ws.db.runReadWriteTx( + const refreshGroupId = await wex.db.runReadWriteTx( ["refreshGroups", "coinAvailability", "denominations", "coins"], async (tx) => { let coinPubs: CoinRefreshRequest[] = []; @@ -1425,7 +1429,7 @@ export async function forceRefresh( throw Error(`coin (pubkey ${c}) not found`); } const denom = await getDenomInfo( - ws, + wex, tx, coin.exchangeBaseUrl, coin.denomPubHash, @@ -1437,7 +1441,7 @@ export async function forceRefresh( }); } return await createRefreshGroup( - ws, + wex, tx, Amounts.currencyOf(coinPubs[0].amount), coinPubs, |