From 1ec521b9d214b286e747b3ccb3113730ac3a2509 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Mon, 19 Feb 2024 12:49:17 +0100 Subject: wallet-core: simplify/unify DB access --- packages/taler-wallet-core/src/db.ts | 98 ++-- .../taler-wallet-core/src/internal-wallet-state.ts | 28 +- .../taler-wallet-core/src/operations/attention.ts | 110 ++-- .../src/operations/backup/index.ts | 472 ++++++++------- .../taler-wallet-core/src/operations/balance.ts | 188 +++--- .../taler-wallet-core/src/operations/common.ts | 25 +- .../taler-wallet-core/src/operations/deposits.ts | 343 +++++------ .../taler-wallet-core/src/operations/exchanges.ts | 211 ++++--- .../src/operations/pay-merchant.ts | 647 ++++++++++----------- .../src/operations/pay-peer-common.ts | 143 +++-- .../src/operations/pay-peer-pull-credit.ts | 230 ++++---- .../src/operations/pay-peer-pull-debit.ts | 124 ++-- .../src/operations/pay-peer-push-credit.ts | 86 +-- .../src/operations/pay-peer-push-debit.ts | 204 +++---- .../taler-wallet-core/src/operations/recoup.ts | 160 +++-- .../taler-wallet-core/src/operations/refresh.ts | 239 ++++---- .../taler-wallet-core/src/operations/reward.ts | 461 ++------------- .../taler-wallet-core/src/operations/testing.ts | 8 +- .../src/operations/transactions.ts | 234 ++++---- .../taler-wallet-core/src/operations/withdraw.ts | 530 ++++++++--------- packages/taler-wallet-core/src/shepherd.ts | 62 +- .../taler-wallet-core/src/util/coinSelection.ts | 79 ++- .../src/util/instructedAmountConversion.ts | 14 +- packages/taler-wallet-core/src/util/query.ts | 209 +------ packages/taler-wallet-core/src/wallet.ts | 303 +++++----- 25 files changed, 2276 insertions(+), 2932 deletions(-) (limited to 'packages/taler-wallet-core/src') diff --git a/packages/taler-wallet-core/src/db.ts b/packages/taler-wallet-core/src/db.ts index 0aae2ddff..ff1e87ccb 100644 --- a/packages/taler-wallet-core/src/db.ts +++ b/packages/taler-wallet-core/src/db.ts @@ -64,10 +64,7 @@ import { DbRetryInfo, TaskIdentifiers } from "./operations/common.js"; import { DbAccess, DbReadOnlyTransaction, - DbReadOnlyTransactionArr, DbReadWriteTransaction, - DbReadWriteTransactionArr, - GetReadWriteAccess, IndexDescriptor, StoreDescriptor, StoreNames, @@ -1338,9 +1335,9 @@ export enum ConfigRecordKey { */ export type ConfigRecord = | { - key: ConfigRecordKey.WalletBackupState; - value: WalletBackupConfState; - } + key: ConfigRecordKey.WalletBackupState; + value: WalletBackupConfState; + } | { key: ConfigRecordKey.CurrencyDefaultsApplied; value: boolean } | { key: ConfigRecordKey.DevMode; value: boolean } | { key: ConfigRecordKey.TestLoopTx; value: number }; @@ -1618,15 +1615,15 @@ export enum BackupProviderStateTag { export type BackupProviderState = | { - tag: BackupProviderStateTag.Provisional; - } + tag: BackupProviderStateTag.Provisional; + } | { - tag: BackupProviderStateTag.Ready; - nextBackupTimestamp: DbPreciseTimestamp; - } + tag: BackupProviderStateTag.Ready; + nextBackupTimestamp: DbPreciseTimestamp; + } | { - tag: BackupProviderStateTag.Retrying; - }; + tag: BackupProviderStateTag.Retrying; + }; export interface BackupProviderTerms { supportedProtocolVersion: string; @@ -2715,21 +2712,23 @@ export const WalletStoresV1 = { type WalletStoreNames = StoreNames; -export type WalletDbReadOnlyTransaction< - Stores extends StoreNames & string, -> = DbReadOnlyTransaction; - export type WalletDbReadWriteTransaction< - Stores extends StoreNames & string, -> = DbReadWriteTransaction; - -export type WalletDbReadWriteTransactionArr< StoresArr extends Array>, -> = DbReadWriteTransactionArr; +> = DbReadWriteTransaction; -export type WalletDbReadOnlyTransactionArr< +export type WalletDbReadOnlyTransaction< StoresArr extends Array>, -> = DbReadOnlyTransactionArr; +> = DbReadOnlyTransaction; + +export type WalletDbAllStoresReadOnlyTransaction<> = DbReadOnlyTransaction< + typeof WalletStoresV1, + Array> +>; + +export type WalletDbAllStoresReadWriteTransaction<> = DbReadWriteTransaction< + typeof WalletStoresV1, + Array> +>; /** * An applied migration. @@ -2939,7 +2938,12 @@ export async function importDb(db: IDBDatabase, dumpJson: any): Promise { export interface FixupDescription { name: string; - fn(tx: GetReadWriteAccess): Promise; + fn( + tx: DbReadWriteTransaction< + typeof WalletStoresV1, + Array> + >, + ): Promise; } /** @@ -2953,7 +2957,7 @@ export async function applyFixups( db: DbAccess, ): Promise { logger.trace("applying fixups"); - await db.mktxAll().runReadWrite(async (tx) => { + await db.runAllStoresReadWriteTx(async (tx) => { for (const fixupInstruction of walletDbFixups) { logger.trace(`checking fixup ${fixupInstruction.name}`); const fixupRecord = await tx.fixups.get(fixupInstruction.name); @@ -3156,7 +3160,7 @@ export async function openStoredBackupsDatabase( idbFactory, TALER_WALLET_STORED_BACKUPS_DB_NAME, 1, - () => { }, + () => {}, onStoredBackupsDbUpgradeNeeded, ); @@ -3179,26 +3183,24 @@ export async function openTalerDatabase( idbFactory, TALER_WALLET_META_DB_NAME, 1, - () => { }, + () => {}, onMetaDbUpgradeNeeded, ); const metaDb = new DbAccess(metaDbHandle, walletMetadataStore); let currentMainVersion: string | undefined; - await metaDb - .mktx((stores) => [stores.metaConfig]) - .runReadWrite(async (tx) => { - const dbVersionRecord = await tx.metaConfig.get(CURRENT_DB_CONFIG_KEY); - if (!dbVersionRecord) { - currentMainVersion = TALER_WALLET_MAIN_DB_NAME; - await tx.metaConfig.put({ - key: CURRENT_DB_CONFIG_KEY, - value: TALER_WALLET_MAIN_DB_NAME, - }); - } else { - currentMainVersion = dbVersionRecord.value; - } - }); + await metaDb.runReadWriteTx(["metaConfig"], async (tx) => { + const dbVersionRecord = await tx.metaConfig.get(CURRENT_DB_CONFIG_KEY); + if (!dbVersionRecord) { + currentMainVersion = TALER_WALLET_MAIN_DB_NAME; + await tx.metaConfig.put({ + key: CURRENT_DB_CONFIG_KEY, + value: TALER_WALLET_MAIN_DB_NAME, + }); + } else { + currentMainVersion = dbVersionRecord.value; + } + }); if (currentMainVersion !== TALER_WALLET_MAIN_DB_NAME) { switch (currentMainVersion) { @@ -3212,14 +3214,12 @@ export async function openTalerDatabase( case "taler-wallet-main-v9": // We consider this a pre-release // development version, no migration is done. - await metaDb - .mktx((stores) => [stores.metaConfig]) - .runReadWrite(async (tx) => { - await tx.metaConfig.put({ - key: CURRENT_DB_CONFIG_KEY, - value: TALER_WALLET_MAIN_DB_NAME, - }); + await metaDb.runReadWriteTx(["metaConfig"], async (tx) => { + await tx.metaConfig.put({ + key: CURRENT_DB_CONFIG_KEY, + value: TALER_WALLET_MAIN_DB_NAME, }); + }); break; default: throw Error( diff --git a/packages/taler-wallet-core/src/internal-wallet-state.ts b/packages/taler-wallet-core/src/internal-wallet-state.ts index 13578adda..3ff9d8064 100644 --- a/packages/taler-wallet-core/src/internal-wallet-state.ts +++ b/packages/taler-wallet-core/src/internal-wallet-state.ts @@ -37,13 +37,14 @@ import { } from "@gnu-taler/taler-util"; import { HttpRequestLibrary } from "@gnu-taler/taler-util/http"; import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js"; -import { WalletStoresV1 } from "./db.js"; -import { TaskScheduler } from "./shepherd.js"; import { - DbAccess, - GetReadOnlyAccess, - GetReadWriteAccess, -} from "./util/query.js"; + WalletDbAllStoresReadOnlyTransaction, + WalletDbReadOnlyTransaction, + WalletDbReadWriteTransaction, + WalletStoresV1, +} from "./db.js"; +import { TaskScheduler } from "./shepherd.js"; +import { DbAccess } from "./util/query.js"; import { TimerGroup } from "./util/timer.js"; import { WalletConfig } from "./wallet-api-types.js"; @@ -62,12 +63,9 @@ export interface MerchantInfo { export interface RecoupOperations { createRecoupGroup( ws: InternalWalletState, - tx: GetReadWriteAccess<{ - recoupGroups: typeof WalletStoresV1.recoupGroups; - denominations: typeof WalletStoresV1.denominations; - refreshGroups: typeof WalletStoresV1.refreshGroups; - coins: typeof WalletStoresV1.coins; - }>, + tx: WalletDbReadWriteTransaction< + ["recoupGroups", "denominations", "refreshGroups"] + >, exchangeBaseUrl: string, coinPubs: string[], ): Promise; @@ -106,15 +104,13 @@ export interface InternalWalletState { getTransactionState( ws: InternalWalletState, - tx: GetReadOnlyAccess, + tx: WalletDbAllStoresReadOnlyTransaction, transactionId: string, ): Promise; getDenomInfo( ws: InternalWalletState, - tx: GetReadOnlyAccess<{ - denominations: typeof WalletStoresV1.denominations; - }>, + tx: WalletDbReadOnlyTransaction<["denominations"]>, exchangeBaseUrl: string, denomPubHash: string, ): Promise; diff --git a/packages/taler-wallet-core/src/operations/attention.ts b/packages/taler-wallet-core/src/operations/attention.ts index 92d69e93e..7d8b11e79 100644 --- a/packages/taler-wallet-core/src/operations/attention.ts +++ b/packages/taler-wallet-core/src/operations/attention.ts @@ -18,20 +18,18 @@ * Imports. */ import { - AbsoluteTime, AttentionInfo, Logger, - TalerProtocolTimestamp, TalerPreciseTimestamp, UserAttentionByIdRequest, UserAttentionPriority, + UserAttentionUnreadList, UserAttentionsCountResponse, UserAttentionsRequest, UserAttentionsResponse, - UserAttentionUnreadList, } from "@gnu-taler/taler-util"; -import { InternalWalletState } from "../internal-wallet-state.js"; import { timestampPreciseFromDb, timestampPreciseToDb } from "../index.js"; +import { InternalWalletState } from "../internal-wallet-state.js"; const logger = new Logger("operations/attention.ts"); @@ -39,23 +37,21 @@ export async function getUserAttentionsUnreadCount( ws: InternalWalletState, req: UserAttentionsRequest, ): Promise { - const total = await ws.db - .mktx((x) => [x.userAttention]) - .runReadOnly(async (tx) => { - let count = 0; - await tx.userAttention.iter().forEach((x) => { - if ( - req.priority !== undefined && - UserAttentionPriority[x.info.type] !== req.priority - ) - return; - if (x.read !== undefined) return; - count++; - }); - - return count; + const total = await ws.db.runReadOnlyTx(["userAttention"], async (tx) => { + let count = 0; + await tx.userAttention.iter().forEach((x) => { + if ( + req.priority !== undefined && + UserAttentionPriority[x.info.type] !== req.priority + ) + return; + if (x.read !== undefined) return; + count++; }); + return count; + }); + return { total }; } @@ -63,41 +59,37 @@ export async function getUserAttentions( ws: InternalWalletState, req: UserAttentionsRequest, ): Promise { - return await ws.db - .mktx((x) => [x.userAttention]) - .runReadOnly(async (tx) => { - const pending: UserAttentionUnreadList = []; - await tx.userAttention.iter().forEach((x) => { - if ( - req.priority !== undefined && - UserAttentionPriority[x.info.type] !== req.priority - ) - return; - pending.push({ - info: x.info, - when: timestampPreciseFromDb(x.created), - read: x.read !== undefined, - }); + return await ws.db.runReadOnlyTx(["userAttention"], async (tx) => { + const pending: UserAttentionUnreadList = []; + await tx.userAttention.iter().forEach((x) => { + if ( + req.priority !== undefined && + UserAttentionPriority[x.info.type] !== req.priority + ) + return; + pending.push({ + info: x.info, + when: timestampPreciseFromDb(x.created), + read: x.read !== undefined, }); - - return { pending }; }); + + return { pending }; + }); } export async function markAttentionRequestAsRead( ws: InternalWalletState, req: UserAttentionByIdRequest, ): Promise { - await ws.db - .mktx((x) => [x.userAttention]) - .runReadWrite(async (tx) => { - const ua = await tx.userAttention.get([req.entityId, req.type]); - if (!ua) throw Error("attention request not found"); - tx.userAttention.put({ - ...ua, - read: timestampPreciseToDb(TalerPreciseTimestamp.now()), - }); + await ws.db.runReadWriteTx(["userAttention"], async (tx) => { + const ua = await tx.userAttention.get([req.entityId, req.type]); + if (!ua) throw Error("attention request not found"); + tx.userAttention.put({ + ...ua, + read: timestampPreciseToDb(TalerPreciseTimestamp.now()), }); + }); } /** @@ -112,16 +104,14 @@ export async function addAttentionRequest( info: AttentionInfo, entityId: string, ): Promise { - await ws.db - .mktx((x) => [x.userAttention]) - .runReadWrite(async (tx) => { - await tx.userAttention.put({ - info, - entityId, - created: timestampPreciseToDb(TalerPreciseTimestamp.now()), - read: undefined, - }); + await ws.db.runReadWriteTx(["userAttention"], async (tx) => { + await tx.userAttention.put({ + info, + entityId, + created: timestampPreciseToDb(TalerPreciseTimestamp.now()), + read: undefined, }); + }); } /** @@ -135,11 +125,9 @@ export async function removeAttentionRequest( ws: InternalWalletState, req: UserAttentionByIdRequest, ): Promise { - await ws.db - .mktx((x) => [x.userAttention]) - .runReadWrite(async (tx) => { - const ua = await tx.userAttention.get([req.entityId, req.type]); - if (!ua) throw Error("attention request not found"); - await tx.userAttention.delete([req.entityId, req.type]); - }); + await ws.db.runReadWriteTx(["userAttention"], async (tx) => { + const ua = await tx.userAttention.get([req.entityId, req.type]); + if (!ua) throw Error("attention request not found"); + await tx.userAttention.delete([req.entityId, req.type]); + }); } diff --git a/packages/taler-wallet-core/src/operations/backup/index.ts b/packages/taler-wallet-core/src/operations/backup/index.ts index e4e4e43f6..948b8eb85 100644 --- a/packages/taler-wallet-core/src/operations/backup/index.ts +++ b/packages/taler-wallet-core/src/operations/backup/index.ts @@ -79,7 +79,7 @@ import { ConfigRecord, ConfigRecordKey, WalletBackupConfState, - WalletStoresV1, + WalletDbReadOnlyTransaction, timestampOptionalPreciseFromDb, timestampPreciseToDb, } from "../../db.js"; @@ -88,7 +88,6 @@ import { checkDbInvariant, checkLogicInvariant, } from "../../util/invariants.js"; -import { GetReadOnlyAccess } from "../../util/query.js"; import { addAttentionRequest, removeAttentionRequest } from "../attention.js"; import { TaskIdentifiers, @@ -187,11 +186,12 @@ async function runBackupCycleForProvider( ws: InternalWalletState, args: BackupForProviderArgs, ): Promise { - const provider = await ws.db - .mktx((x) => [x.backupProviders]) - .runReadOnly(async (tx) => { + const provider = await ws.db.runReadOnlyTx( + ["backupProviders"], + async (tx) => { return tx.backupProviders.get(args.backupProviderBaseUrl); - }); + }, + ); if (!provider) { logger.warn("provider disappeared"); @@ -248,22 +248,20 @@ async function runBackupCycleForProvider( logger.trace(`sync response status: ${resp.status}`); if (resp.status === HttpStatusCode.NotModified) { - await ws.db - .mktx((x) => [x.backupProviders]) - .runReadWrite(async (tx) => { - const prov = await tx.backupProviders.get(provider.baseUrl); - if (!prov) { - return; - } - prov.lastBackupCycleTimestamp = timestampPreciseToDb( - TalerPreciseTimestamp.now(), - ); - prov.state = { - tag: BackupProviderStateTag.Ready, - nextBackupTimestamp: timestampPreciseToDb(getNextBackupTimestamp()), - }; - await tx.backupProviders.put(prov); - }); + await ws.db.runReadWriteTx(["backupProviders"], async (tx) => { + const prov = await tx.backupProviders.get(provider.baseUrl); + if (!prov) { + return; + } + prov.lastBackupCycleTimestamp = timestampPreciseToDb( + TalerPreciseTimestamp.now(), + ); + prov.state = { + tag: BackupProviderStateTag.Ready, + nextBackupTimestamp: timestampPreciseToDb(getNextBackupTimestamp()), + }; + await tx.backupProviders.put(prov); + }); removeAttentionRequest(ws, { entityId: provider.baseUrl, @@ -296,46 +294,42 @@ async function runBackupCycleForProvider( if (res === undefined) { //claimed - await ws.db - .mktx((x) => [x.backupProviders, x.operationRetries]) - .runReadWrite(async (tx) => { - const prov = await tx.backupProviders.get(provider.baseUrl); - if (!prov) { - logger.warn("backup provider not found anymore"); - return; - } - prov.shouldRetryFreshProposal = true; - prov.state = { - tag: BackupProviderStateTag.Retrying, - }; - await tx.backupProviders.put(prov); - }); - - throw Error("not implemented"); - // return { - // type: TaskRunResultType.Pending, - // }; - } - const result = res; - - await ws.db - .mktx((x) => [x.backupProviders, x.operationRetries]) - .runReadWrite(async (tx) => { + await ws.db.runReadWriteTx(["backupProviders"], async (tx) => { const prov = await tx.backupProviders.get(provider.baseUrl); if (!prov) { logger.warn("backup provider not found anymore"); return; } - const opId = TaskIdentifiers.forBackup(prov); - //await scheduleRetryInTx(ws, tx, opId); - prov.currentPaymentProposalId = result.proposalId; - prov.shouldRetryFreshProposal = false; + prov.shouldRetryFreshProposal = true; prov.state = { tag: BackupProviderStateTag.Retrying, }; await tx.backupProviders.put(prov); }); + throw Error("not implemented"); + // return { + // type: TaskRunResultType.Pending, + // }; + } + const result = res; + + await ws.db.runReadWriteTx(["backupProviders"], async (tx) => { + const prov = await tx.backupProviders.get(provider.baseUrl); + if (!prov) { + logger.warn("backup provider not found anymore"); + return; + } + // const opId = TaskIdentifiers.forBackup(prov); + // await scheduleRetryInTx(ws, tx, opId); + prov.currentPaymentProposalId = result.proposalId; + prov.shouldRetryFreshProposal = false; + prov.state = { + tag: BackupProviderStateTag.Retrying, + }; + await tx.backupProviders.put(prov); + }); + addAttentionRequest( ws, { @@ -353,23 +347,21 @@ async function runBackupCycleForProvider( } if (resp.status === HttpStatusCode.NoContent) { - await ws.db - .mktx((x) => [x.backupProviders]) - .runReadWrite(async (tx) => { - const prov = await tx.backupProviders.get(provider.baseUrl); - if (!prov) { - return; - } - prov.lastBackupHash = encodeCrock(currentBackupHash); - prov.lastBackupCycleTimestamp = timestampPreciseToDb( - TalerPreciseTimestamp.now(), - ); - prov.state = { - tag: BackupProviderStateTag.Ready, - nextBackupTimestamp: timestampPreciseToDb(getNextBackupTimestamp()), - }; - await tx.backupProviders.put(prov); - }); + await ws.db.runReadWriteTx(["backupProviders"], async (tx) => { + const prov = await tx.backupProviders.get(provider.baseUrl); + if (!prov) { + return; + } + prov.lastBackupHash = encodeCrock(currentBackupHash); + prov.lastBackupCycleTimestamp = timestampPreciseToDb( + TalerPreciseTimestamp.now(), + ); + prov.state = { + tag: BackupProviderStateTag.Ready, + nextBackupTimestamp: timestampPreciseToDb(getNextBackupTimestamp()), + }; + await tx.backupProviders.put(prov); + }); removeAttentionRequest(ws, { entityId: provider.baseUrl, @@ -388,24 +380,22 @@ async function runBackupCycleForProvider( // const blob = await decryptBackup(backupConfig, backupEnc); // FIXME: Re-implement backup import with merging // await importBackup(ws, blob, cryptoData); - await ws.db - .mktx((x) => [x.backupProviders, x.operationRetries]) - .runReadWrite(async (tx) => { - const prov = await tx.backupProviders.get(provider.baseUrl); - if (!prov) { - logger.warn("backup provider not found anymore"); - return; - } - prov.lastBackupHash = encodeCrock(hash(backupEnc)); - // FIXME: Allocate error code for this situation? - // FIXME: Add operation retry record! - const opId = TaskIdentifiers.forBackup(prov); - //await scheduleRetryInTx(ws, tx, opId); - prov.state = { - tag: BackupProviderStateTag.Retrying, - }; - await tx.backupProviders.put(prov); - }); + await ws.db.runReadWriteTx(["backupProviders"], async (tx) => { + const prov = await tx.backupProviders.get(provider.baseUrl); + if (!prov) { + logger.warn("backup provider not found anymore"); + return; + } + prov.lastBackupHash = encodeCrock(hash(backupEnc)); + // FIXME: Allocate error code for this situation? + // FIXME: Add operation retry record! + const opId = TaskIdentifiers.forBackup(prov); + //await scheduleRetryInTx(ws, tx, opId); + prov.state = { + tag: BackupProviderStateTag.Retrying, + }; + await tx.backupProviders.put(prov); + }); logger.info("processed existing backup"); // Now upload our own, merged backup. return await runBackupCycleForProvider(ws, args); @@ -427,11 +417,12 @@ export async function processBackupForProvider( ws: InternalWalletState, backupProviderBaseUrl: string, ): Promise { - const provider = await ws.db - .mktx((x) => [x.backupProviders]) - .runReadOnly(async (tx) => { + const provider = await ws.db.runReadOnlyTx( + ["backupProviders"], + async (tx) => { return await tx.backupProviders.get(backupProviderBaseUrl); - }); + }, + ); if (!provider) { throw Error("unknown backup provider"); } @@ -457,11 +448,9 @@ export async function removeBackupProvider( ws: InternalWalletState, req: RemoveBackupProviderRequest, ): Promise { - await ws.db - .mktx((x) => [x.backupProviders]) - .runReadWrite(async (tx) => { - await tx.backupProviders.delete(req.provider); - }); + await ws.db.runReadWriteTx(["backupProviders"], async (tx) => { + await tx.backupProviders.delete(req.provider); + }); } export interface RunBackupCycleRequest { @@ -487,9 +476,9 @@ export async function runBackupCycle( ws: InternalWalletState, req: RunBackupCycleRequest, ): Promise { - const providers = await ws.db - .mktx((x) => [x.backupProviders]) - .runReadOnly(async (tx) => { + const providers = await ws.db.runReadOnlyTx( + ["backupProviders"], + async (tx) => { if (req.providers) { const rs = await Promise.all( req.providers.map((id) => tx.backupProviders.get(id)), @@ -497,7 +486,8 @@ export async function runBackupCycle( return rs.filter(notEmpty); } return await tx.backupProviders.iter().toArray(); - }); + }, + ); for (const provider of providers) { await runBackupCycleForProvider(ws, { @@ -587,62 +577,56 @@ export async function addBackupProvider( logger.info(`adding backup provider ${j2s(req)}`); await provideBackupState(ws); const canonUrl = canonicalizeBaseUrl(req.backupProviderBaseUrl); - await ws.db - .mktx((x) => [x.backupProviders]) - .runReadWrite(async (tx) => { - const oldProv = await tx.backupProviders.get(canonUrl); - if (oldProv) { - logger.info("old backup provider found"); - if (req.activate) { - oldProv.state = { - tag: BackupProviderStateTag.Ready, - nextBackupTimestamp: timestampPreciseToDb( - TalerPreciseTimestamp.now(), - ), - }; - logger.info("setting existing backup provider to active"); - await tx.backupProviders.put(oldProv); - } - return; - } - }); - const termsUrl = new URL("config", canonUrl); - const resp = await ws.http.fetch(termsUrl.href); - const terms = await readSuccessResponseJsonOrThrow( - resp, - codecForSyncTermsOfServiceResponse(), - ); - await ws.db - .mktx((x) => [x.backupProviders]) - .runReadWrite(async (tx) => { - let state: BackupProviderState; - //FIXME: what is the difference provisional and ready? + await ws.db.runReadWriteTx(["backupProviders"], async (tx) => { + const oldProv = await tx.backupProviders.get(canonUrl); + if (oldProv) { + logger.info("old backup provider found"); if (req.activate) { - state = { + oldProv.state = { tag: BackupProviderStateTag.Ready, nextBackupTimestamp: timestampPreciseToDb( TalerPreciseTimestamp.now(), ), }; - } else { - state = { - tag: BackupProviderStateTag.Provisional, - }; + logger.info("setting existing backup provider to active"); + await tx.backupProviders.put(oldProv); } - await tx.backupProviders.put({ - state, - name: req.name, - terms: { - annualFee: terms.annual_fee, - storageLimitInMegabytes: terms.storage_limit_in_megabytes, - supportedProtocolVersion: terms.version, - }, - shouldRetryFreshProposal: false, - paymentProposalIds: [], - baseUrl: canonUrl, - uids: [encodeCrock(getRandomBytes(32))], - }); + return; + } + }); + const termsUrl = new URL("config", canonUrl); + const resp = await ws.http.fetch(termsUrl.href); + const terms = await readSuccessResponseJsonOrThrow( + resp, + codecForSyncTermsOfServiceResponse(), + ); + await ws.db.runReadWriteTx(["backupProviders"], async (tx) => { + let state: BackupProviderState; + //FIXME: what is the difference provisional and ready? + if (req.activate) { + state = { + tag: BackupProviderStateTag.Ready, + nextBackupTimestamp: timestampPreciseToDb(TalerPreciseTimestamp.now()), + }; + } else { + state = { + tag: BackupProviderStateTag.Provisional, + }; + } + await tx.backupProviders.put({ + state, + name: req.name, + terms: { + annualFee: terms.annual_fee, + storageLimitInMegabytes: terms.storage_limit_in_megabytes, + supportedProtocolVersion: terms.version, + }, + shouldRetryFreshProposal: false, + paymentProposalIds: [], + baseUrl: canonUrl, + uids: [encodeCrock(getRandomBytes(32))], }); + }); return await runFirstBackupCycleForProvider(ws, { backupProviderBaseUrl: canonUrl, @@ -827,9 +811,9 @@ export async function getBackupInfo( ws: InternalWalletState, ): Promise { const backupConfig = await provideBackupState(ws); - const providerRecords = await ws.db - .mktx((x) => [x.backupProviders, x.operationRetries]) - .runReadOnly(async (tx) => { + const providerRecords = await ws.db.runReadOnlyTx( + ["backupProviders", "operationRetries"], + async (tx) => { return await tx.backupProviders.iter().mapAsync(async (bp) => { const opId = TaskIdentifiers.forBackup(bp); const retryRecord = await tx.operationRetries.get(opId); @@ -838,7 +822,8 @@ export async function getBackupInfo( retryRecord, }; }); - }); + }, + ); const providers: ProviderInfo[] = []; for (const x of providerRecords) { providers.push({ @@ -872,11 +857,12 @@ export async function getBackupRecovery( ws: InternalWalletState, ): Promise { const bs = await provideBackupState(ws); - const providers = await ws.db - .mktx((x) => [x.backupProviders]) - .runReadOnly(async (tx) => { + const providers = await ws.db.runReadOnlyTx( + ["backupProviders"], + async (tx) => { return await tx.backupProviders.iter().toArray(); - }); + }, + ); return { providers: providers .filter((x) => x.state.tag !== BackupProviderStateTag.Provisional) @@ -894,50 +880,48 @@ async function backupRecoveryTheirs( ws: InternalWalletState, br: BackupRecovery, ) { - await ws.db - .mktx((x) => [x.config, x.backupProviders]) - .runReadWrite(async (tx) => { - let backupStateEntry: ConfigRecord | undefined = await tx.config.get( - ConfigRecordKey.WalletBackupState, - ); - checkDbInvariant(!!backupStateEntry); - checkDbInvariant( - backupStateEntry.key === ConfigRecordKey.WalletBackupState, - ); - backupStateEntry.value.lastBackupNonce = undefined; - backupStateEntry.value.lastBackupTimestamp = undefined; - backupStateEntry.value.lastBackupCheckTimestamp = undefined; - backupStateEntry.value.lastBackupPlainHash = undefined; - backupStateEntry.value.walletRootPriv = br.walletRootPriv; - backupStateEntry.value.walletRootPub = encodeCrock( - eddsaGetPublic(decodeCrock(br.walletRootPriv)), - ); - await tx.config.put(backupStateEntry); - for (const prov of br.providers) { - const existingProv = await tx.backupProviders.get(prov.url); - if (!existingProv) { - await tx.backupProviders.put({ - baseUrl: prov.url, - name: prov.name, - paymentProposalIds: [], - shouldRetryFreshProposal: false, - state: { - tag: BackupProviderStateTag.Ready, - nextBackupTimestamp: timestampPreciseToDb( - TalerPreciseTimestamp.now(), - ), - }, - uids: [encodeCrock(getRandomBytes(32))], - }); - } - } - const providers = await tx.backupProviders.iter().toArray(); - for (const prov of providers) { - prov.lastBackupCycleTimestamp = undefined; - prov.lastBackupHash = undefined; - await tx.backupProviders.put(prov); + await ws.db.runReadWriteTx(["backupProviders", "config"], async (tx) => { + let backupStateEntry: ConfigRecord | undefined = await tx.config.get( + ConfigRecordKey.WalletBackupState, + ); + checkDbInvariant(!!backupStateEntry); + checkDbInvariant( + backupStateEntry.key === ConfigRecordKey.WalletBackupState, + ); + backupStateEntry.value.lastBackupNonce = undefined; + backupStateEntry.value.lastBackupTimestamp = undefined; + backupStateEntry.value.lastBackupCheckTimestamp = undefined; + backupStateEntry.value.lastBackupPlainHash = undefined; + backupStateEntry.value.walletRootPriv = br.walletRootPriv; + backupStateEntry.value.walletRootPub = encodeCrock( + eddsaGetPublic(decodeCrock(br.walletRootPriv)), + ); + await tx.config.put(backupStateEntry); + for (const prov of br.providers) { + const existingProv = await tx.backupProviders.get(prov.url); + if (!existingProv) { + await tx.backupProviders.put({ + baseUrl: prov.url, + name: prov.name, + paymentProposalIds: [], + shouldRetryFreshProposal: false, + state: { + tag: BackupProviderStateTag.Ready, + nextBackupTimestamp: timestampPreciseToDb( + TalerPreciseTimestamp.now(), + ), + }, + uids: [encodeCrock(getRandomBytes(32))], + }); } - }); + } + const providers = await tx.backupProviders.iter().toArray(); + for (const prov of providers) { + prov.lastBackupCycleTimestamp = undefined; + prov.lastBackupHash = undefined; + await tx.backupProviders.put(prov); + } + }); } async function backupRecoveryOurs(ws: InternalWalletState, br: BackupRecovery) { @@ -949,11 +933,12 @@ export async function loadBackupRecovery( br: RecoveryLoadRequest, ): Promise { const bs = await provideBackupState(ws); - const providers = await ws.db - .mktx((x) => [x.backupProviders]) - .runReadOnly(async (tx) => { + const providers = await ws.db.runReadOnlyTx( + ["backupProviders"], + async (tx) => { return await tx.backupProviders.iter().toArray(); - }); + }, + ); let strategy = br.strategy; if ( br.recovery.walletRootPriv != bs.walletRootPriv && @@ -996,11 +981,12 @@ export async function decryptBackup( export async function provideBackupState( ws: InternalWalletState, ): Promise { - const bs: ConfigRecord | undefined = await ws.db - .mktx((stores) => [stores.config]) - .runReadOnly(async (tx) => { + const bs: ConfigRecord | undefined = await ws.db.runReadOnlyTx( + ["config"], + async (tx) => { return await tx.config.get(ConfigRecordKey.WalletBackupState); - }); + }, + ); if (bs) { checkDbInvariant(bs.key === ConfigRecordKey.WalletBackupState); return bs.value; @@ -1012,34 +998,32 @@ export async function provideBackupState( // FIXME: device ID should be configured when wallet is initialized // and be based on hostname const deviceId = `wallet-core-${encodeCrock(d)}`; - return await ws.db - .mktx((x) => [x.config]) - .runReadWrite(async (tx) => { - let backupStateEntry: ConfigRecord | undefined = await tx.config.get( - ConfigRecordKey.WalletBackupState, - ); - if (!backupStateEntry) { - backupStateEntry = { - key: ConfigRecordKey.WalletBackupState, - value: { - deviceId, - walletRootPub: k.pub, - walletRootPriv: k.priv, - lastBackupPlainHash: undefined, - }, - }; - await tx.config.put(backupStateEntry); - } - checkDbInvariant( - backupStateEntry.key === ConfigRecordKey.WalletBackupState, - ); - return backupStateEntry.value; - }); + return await ws.db.runReadWriteTx(["config"], async (tx) => { + let backupStateEntry: ConfigRecord | undefined = await tx.config.get( + ConfigRecordKey.WalletBackupState, + ); + if (!backupStateEntry) { + backupStateEntry = { + key: ConfigRecordKey.WalletBackupState, + value: { + deviceId, + walletRootPub: k.pub, + walletRootPriv: k.priv, + lastBackupPlainHash: undefined, + }, + }; + await tx.config.put(backupStateEntry); + } + checkDbInvariant( + backupStateEntry.key === ConfigRecordKey.WalletBackupState, + ); + return backupStateEntry.value; + }); } export async function getWalletBackupState( ws: InternalWalletState, - tx: GetReadOnlyAccess<{ config: typeof WalletStoresV1.config }>, + tx: WalletDbReadOnlyTransaction<["config"]>, ): Promise { const bs = await tx.config.get(ConfigRecordKey.WalletBackupState); checkDbInvariant(!!bs, "wallet backup state should be in DB"); @@ -1052,21 +1036,19 @@ export async function setWalletDeviceId( deviceId: string, ): Promise { await provideBackupState(ws); - await ws.db - .mktx((x) => [x.config]) - .runReadWrite(async (tx) => { - let backupStateEntry: ConfigRecord | undefined = await tx.config.get( - ConfigRecordKey.WalletBackupState, - ); - if ( - !backupStateEntry || - backupStateEntry.key !== ConfigRecordKey.WalletBackupState - ) { - return; - } - backupStateEntry.value.deviceId = deviceId; - await tx.config.put(backupStateEntry); - }); + await ws.db.runReadWriteTx(["config"], async (tx) => { + let backupStateEntry: ConfigRecord | undefined = await tx.config.get( + ConfigRecordKey.WalletBackupState, + ); + if ( + !backupStateEntry || + backupStateEntry.key !== ConfigRecordKey.WalletBackupState + ) { + return; + } + backupStateEntry.value.deviceId = deviceId; + await tx.config.put(backupStateEntry); + }); } export async function getWalletDeviceId( diff --git a/packages/taler-wallet-core/src/operations/balance.ts b/packages/taler-wallet-core/src/operations/balance.ts index a73476e9c..12f9795b2 100644 --- a/packages/taler-wallet-core/src/operations/balance.ts +++ b/packages/taler-wallet-core/src/operations/balance.ts @@ -71,14 +71,12 @@ import { OPERATION_STATUS_ACTIVE_LAST, RefreshGroupRecord, RefreshOperationStatus, - WalletDbReadOnlyTransactionArr, - WalletStoresV1, + WalletDbReadOnlyTransaction, WithdrawalGroupStatus, } from "../db.js"; import { InternalWalletState } from "../internal-wallet-state.js"; import { assertUnreachable } from "../util/assertUnreachable.js"; import { checkLogicInvariant } from "../util/invariants.js"; -import { GetReadOnlyAccess } from "../util/query.js"; import { getExchangeScopeInfo, getExchangeWireDetailsInTx, @@ -134,7 +132,7 @@ class BalancesStore { constructor( private ws: InternalWalletState, - private tx: WalletDbReadOnlyTransactionArr< + private tx: WalletDbReadOnlyTransaction< [ "globalCurrencyAuditors", "globalCurrencyExchanges", @@ -275,7 +273,7 @@ class BalancesStore { */ export async function getBalancesInsideTransaction( ws: InternalWalletState, - tx: WalletDbReadOnlyTransactionArr< + tx: WalletDbReadOnlyTransaction< [ "exchanges", "exchangeDetails", @@ -465,81 +463,79 @@ export async function getAcceptableExchangeBaseUrls( ): Promise { const acceptableExchangeUrls = new Set(); const depositableExchangeUrls = new Set(); - await ws.db - .mktx((x) => [x.exchanges, x.exchangeDetails]) - .runReadOnly(async (tx) => { - // FIXME: We should have a DB index to look up all exchanges - // for a particular auditor ... - - const canonExchanges = new Set(); - const canonAuditors = new Set(); - - for (const exchangeHandle of req.acceptedExchanges) { - const normUrl = canonicalizeBaseUrl(exchangeHandle.exchangeBaseUrl); - canonExchanges.add(normUrl); - } + await ws.db.runReadOnlyTx(["exchanges", "exchangeDetails"], async (tx) => { + // FIXME: We should have a DB index to look up all exchanges + // for a particular auditor ... - for (const auditorHandle of req.acceptedAuditors) { - const normUrl = canonicalizeBaseUrl(auditorHandle.auditorBaseUrl); - canonAuditors.add(normUrl); - } + const canonExchanges = new Set(); + const canonAuditors = new Set(); - await tx.exchanges.iter().forEachAsync(async (exchange) => { - const dp = exchange.detailsPointer; - if (!dp) { - return; - } - const { currency, masterPublicKey } = dp; - const exchangeDetails = await tx.exchangeDetails.indexes.byPointer.get([ - exchange.baseUrl, - currency, - masterPublicKey, - ]); - if (!exchangeDetails) { - return; - } + for (const exchangeHandle of req.acceptedExchanges) { + const normUrl = canonicalizeBaseUrl(exchangeHandle.exchangeBaseUrl); + canonExchanges.add(normUrl); + } - let acceptable = false; + for (const auditorHandle of req.acceptedAuditors) { + const normUrl = canonicalizeBaseUrl(auditorHandle.auditorBaseUrl); + canonAuditors.add(normUrl); + } - if (canonExchanges.has(exchange.baseUrl)) { + await tx.exchanges.iter().forEachAsync(async (exchange) => { + const dp = exchange.detailsPointer; + if (!dp) { + return; + } + const { currency, masterPublicKey } = dp; + const exchangeDetails = await tx.exchangeDetails.indexes.byPointer.get([ + exchange.baseUrl, + currency, + masterPublicKey, + ]); + if (!exchangeDetails) { + return; + } + + let acceptable = false; + + if (canonExchanges.has(exchange.baseUrl)) { + acceptableExchangeUrls.add(exchange.baseUrl); + acceptable = true; + } + for (const exchangeAuditor of exchangeDetails.auditors) { + if (canonAuditors.has(exchangeAuditor.auditor_url)) { acceptableExchangeUrls.add(exchange.baseUrl); acceptable = true; + break; } - for (const exchangeAuditor of exchangeDetails.auditors) { - if (canonAuditors.has(exchangeAuditor.auditor_url)) { - acceptableExchangeUrls.add(exchange.baseUrl); - acceptable = true; + } + + if (!acceptable) { + return; + } + // FIXME: Also consider exchange and auditor public key + // instead of just base URLs? + + let wireMethodSupported = false; + for (const acc of exchangeDetails.wireInfo.accounts) { + const pp = parsePaytoUri(acc.payto_uri); + checkLogicInvariant(!!pp); + for (const wm of req.acceptedWireMethods) { + if (pp.targetType === wm) { + wireMethodSupported = true; break; } - } - - if (!acceptable) { - return; - } - // FIXME: Also consider exchange and auditor public key - // instead of just base URLs? - - let wireMethodSupported = false; - for (const acc of exchangeDetails.wireInfo.accounts) { - const pp = parsePaytoUri(acc.payto_uri); - checkLogicInvariant(!!pp); - for (const wm of req.acceptedWireMethods) { - if (pp.targetType === wm) { - wireMethodSupported = true; - break; - } - if (wireMethodSupported) { - break; - } + if (wireMethodSupported) { + break; } } + } - acceptableExchangeUrls.add(exchange.baseUrl); - if (wireMethodSupported) { - depositableExchangeUrls.add(exchange.baseUrl); - } - }); + acceptableExchangeUrls.add(exchange.baseUrl); + if (wireMethodSupported) { + depositableExchangeUrls.add(exchange.baseUrl); + } }); + }); return { acceptableExchanges: [...acceptableExchangeUrls], depositableExchanges: [...depositableExchangeUrls], @@ -587,15 +583,9 @@ export async function getMerchantPaymentBalanceDetails( balanceMerchantDepositable: Amounts.zeroOfCurrency(req.currency), }; - await ws.db - .mktx((x) => [ - x.coins, - x.coinAvailability, - x.refreshGroups, - x.purchases, - x.withdrawalGroups, - ]) - .runReadOnly(async (tx) => { + await ws.db.runReadOnlyTx( + ["coinAvailability", "refreshGroups"], + async (tx) => { await tx.coinAvailability.iter().forEach((ca) => { if (ca.currency != req.currency) { return; @@ -638,7 +628,8 @@ export async function getMerchantPaymentBalanceDetails( computeRefreshGroupAvailableAmount(r), ).amount; }); - }); + }, + ); return d; } @@ -649,27 +640,25 @@ export async function getBalanceDetail( ): Promise { const exchanges: { exchangeBaseUrl: string; exchangePub: string }[] = []; const wires = new Array(); - await ws.db - .mktx((x) => [x.exchanges, x.exchangeDetails]) - .runReadOnly(async (tx) => { - const allExchanges = await tx.exchanges.iter().toArray(); - for (const e of allExchanges) { - const details = await getExchangeWireDetailsInTx(tx, e.baseUrl); - if (!details || req.currency !== details.currency) { - continue; - } - details.wireInfo.accounts.forEach((a) => { - const payto = parsePaytoUri(a.payto_uri); - if (payto && !wires.includes(payto.targetType)) { - wires.push(payto.targetType); - } - }); - exchanges.push({ - exchangePub: details.masterPublicKey, - exchangeBaseUrl: e.baseUrl, - }); + await ws.db.runReadOnlyTx(["exchanges", "exchangeDetails"], async (tx) => { + const allExchanges = await tx.exchanges.iter().toArray(); + for (const e of allExchanges) { + const details = await getExchangeWireDetailsInTx(tx, e.baseUrl); + if (!details || req.currency !== details.currency) { + continue; } - }); + details.wireInfo.accounts.forEach((a) => { + const payto = parsePaytoUri(a.payto_uri); + if (payto && !wires.includes(payto.targetType)) { + wires.push(payto.targetType); + } + }); + exchanges.push({ + exchangePub: details.masterPublicKey, + exchangeBaseUrl: e.baseUrl, + }); + } + }); return await getMerchantPaymentBalanceDetails(ws, { currency: req.currency, @@ -699,10 +688,7 @@ export interface PeerPaymentBalanceDetails { export async function getPeerPaymentBalanceDetailsInTx( ws: InternalWalletState, - tx: GetReadOnlyAccess<{ - coinAvailability: typeof WalletStoresV1.coinAvailability; - refreshGroups: typeof WalletStoresV1.refreshGroups; - }>, + tx: WalletDbReadOnlyTransaction<["coinAvailability", "refreshGroups"]>, req: PeerPaymentRestrictionsForBalance, ): Promise { let balanceAvailable = Amounts.zeroOfCurrency(req.currency); diff --git a/packages/taler-wallet-core/src/operations/common.ts b/packages/taler-wallet-core/src/operations/common.ts index 92950b35b..6bafa632e 100644 --- a/packages/taler-wallet-core/src/operations/common.ts +++ b/packages/taler-wallet-core/src/operations/common.ts @@ -53,7 +53,7 @@ import { RecoupGroupRecord, RefreshGroupRecord, RewardRecord, - WalletStoresV1, + WalletDbReadWriteTransaction, WithdrawalGroupRecord, timestampPreciseToDb, } from "../db.js"; @@ -61,7 +61,6 @@ import { InternalWalletState } from "../internal-wallet-state.js"; import { PendingTaskType, TaskId } from "../pending-types.js"; import { assertUnreachable } from "../util/assertUnreachable.js"; import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js"; -import { GetReadWriteAccess } from "../util/query.js"; import { createRefreshGroup } from "./refresh.js"; const logger = new Logger("operations/common.ts"); @@ -78,10 +77,7 @@ export interface CoinsSpendInfo { export async function makeCoinsVisible( ws: InternalWalletState, - tx: GetReadWriteAccess<{ - coins: typeof WalletStoresV1.coins; - coinAvailability: typeof WalletStoresV1.coinAvailability; - }>, + tx: WalletDbReadWriteTransaction<["coins", "coinAvailability"]>, transactionId: string, ): Promise { const coins = @@ -109,11 +105,9 @@ export async function makeCoinsVisible( export async function makeCoinAvailable( ws: InternalWalletState, - tx: GetReadWriteAccess<{ - coins: typeof WalletStoresV1.coins; - coinAvailability: typeof WalletStoresV1.coinAvailability; - denominations: typeof WalletStoresV1.denominations; - }>, + tx: WalletDbReadWriteTransaction< + ["coins", "coinAvailability", "denominations"] + >, coinRecord: CoinRecord, ): Promise { checkLogicInvariant(coinRecord.status === CoinStatus.Fresh); @@ -150,12 +144,9 @@ export async function makeCoinAvailable( export async function spendCoins( ws: InternalWalletState, - tx: GetReadWriteAccess<{ - coins: typeof WalletStoresV1.coins; - coinAvailability: typeof WalletStoresV1.coinAvailability; - refreshGroups: typeof WalletStoresV1.refreshGroups; - denominations: typeof WalletStoresV1.denominations; - }>, + tx: WalletDbReadWriteTransaction< + ["coins", "coinAvailability", "refreshGroups", "denominations"] + >, csi: CoinsSpendInfo, ): Promise { if (csi.coinPubs.length != csi.contributions.length) { diff --git a/packages/taler-wallet-core/src/operations/deposits.ts b/packages/taler-wallet-core/src/operations/deposits.ts index decef7375..415f3cd72 100644 --- a/packages/taler-wallet-core/src/operations/deposits.ts +++ b/packages/taler-wallet-core/src/operations/deposits.ts @@ -133,25 +133,23 @@ export class DepositTransactionContext implements TransactionContext { const ws = this.ws; // FIXME: We should check first if we are in a final state // where deletion is allowed. - await ws.db - .mktx((x) => [x.depositGroups, x.tombstones]) - .runReadWrite(async (tx) => { - const tipRecord = await tx.depositGroups.get(depositGroupId); - if (tipRecord) { - await tx.depositGroups.delete(depositGroupId); - await tx.tombstones.put({ - id: TombstoneTag.DeleteDepositGroup + ":" + depositGroupId, - }); - } - }); + await ws.db.runReadWriteTx(["depositGroups", "tombstones"], async (tx) => { + const tipRecord = await tx.depositGroups.get(depositGroupId); + if (tipRecord) { + await tx.depositGroups.delete(depositGroupId); + await tx.tombstones.put({ + id: TombstoneTag.DeleteDepositGroup + ":" + depositGroupId, + }); + } + }); return; } async suspendTransaction(): Promise { const { ws, depositGroupId, transactionId, taskId: retryTag } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.depositGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups"], + async (tx) => { const dg = await tx.depositGroups.get(depositGroupId); if (!dg) { logger.warn( @@ -184,16 +182,17 @@ export class DepositTransactionContext implements TransactionContext { oldTxState: oldState, newTxState: computeDepositTransactionStatus(dg), }; - }); + }, + ); ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); } async abortTransaction(): Promise { const { ws, depositGroupId, transactionId, taskId: retryTag } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.depositGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups"], + async (tx) => { const dg = await tx.depositGroups.get(depositGroupId); if (!dg) { logger.warn( @@ -218,7 +217,8 @@ export class DepositTransactionContext implements TransactionContext { return undefined; } return undefined; - }); + }, + ); ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); ws.taskScheduler.startShepherdTask(retryTag); @@ -226,9 +226,9 @@ export class DepositTransactionContext implements TransactionContext { async resumeTransaction(): Promise { const { ws, depositGroupId, transactionId, taskId: retryTag } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.depositGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups"], + async (tx) => { const dg = await tx.depositGroups.get(depositGroupId); if (!dg) { logger.warn( @@ -261,16 +261,17 @@ export class DepositTransactionContext implements TransactionContext { oldTxState: oldState, newTxState: computeDepositTransactionStatus(dg), }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); ws.taskScheduler.startShepherdTask(retryTag); } async failTransaction(): Promise { const { ws, depositGroupId, transactionId, taskId: retryTag } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.depositGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups"], + async (tx) => { const dg = await tx.depositGroups.get(depositGroupId); if (!dg) { logger.warn( @@ -291,7 +292,8 @@ export class DepositTransactionContext implements TransactionContext { } } return undefined; - }); + }, + ); // FIXME: Also cancel ongoing work (via cancellation token, once implemented) ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); @@ -418,9 +420,9 @@ async function waitForRefreshOnDepositGroup( tag: TransactionType.Deposit, depositGroupId: depositGroup.depositGroupId, }); - const transitionInfo = await ws.db - .mktx((x) => [x.refreshGroups, x.depositGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups", "refreshGroups"], + async (tx) => { const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId); let newOpState: DepositOperationStatus | undefined; if (!refreshGroup) { @@ -449,7 +451,8 @@ async function waitForRefreshOnDepositGroup( return { oldTxState, newTxState }; } return undefined; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); return TaskRunResult.backoff(); @@ -469,13 +472,14 @@ async function refundDepositGroup( break; default: { const coinPub = depositGroup.payCoinSelection.coinPubs[i]; - const coinExchange = await ws.db - .mktx((x) => [x.coins]) - .runReadOnly(async (tx) => { + const coinExchange = await ws.db.runReadOnlyTx( + ["coins"], + async (tx) => { const coinRecord = await tx.coins.get(coinPub); checkDbInvariant(!!coinRecord); return coinRecord.exchangeBaseUrl; - }); + }, + ); const refundAmount = depositGroup.payCoinSelection.coinContributions[i]; // We use a constant refund transaction ID, since there can // only be one refund. @@ -529,15 +533,15 @@ async function refundDepositGroup( const currency = Amounts.currencyOf(depositGroup.totalPayCost); - await ws.db - .mktx((x) => [ - x.depositGroups, - x.refreshGroups, - x.coins, - x.denominations, - x.coinAvailability, - ]) - .runReadWrite(async (tx) => { + await ws.db.runReadWriteTx( + [ + "depositGroups", + "refreshGroups", + "coins", + "denominations", + "coinAvailability", + ], + async (tx) => { const newDg = await tx.depositGroups.get(depositGroup.depositGroupId); if (!newDg) { return; @@ -565,7 +569,8 @@ async function refundDepositGroup( newDg.abortRefreshGroupId = rgid.refreshGroupId; } await tx.depositGroups.put(newDg); - }); + }, + ); return TaskRunResult.backoff(); } @@ -622,9 +627,9 @@ async function processDepositGroupPendingKyc( // remove after the exchange is fixed or clarified kycStatusRes.status === HttpStatusCode.NoContent ) { - const transitionInfo = await ws.db - .mktx((x) => [x.depositGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups"], + async (tx) => { const newDg = await tx.depositGroups.get(depositGroupId); if (!newDg) { return; @@ -637,7 +642,8 @@ async function processDepositGroupPendingKyc( const newTxState = computeDepositTransactionStatus(newDg); await tx.depositGroups.put(newDg); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); } else if (kycStatusRes.status === HttpStatusCode.Accepted) { // FIXME: Do we have to update the URL here? @@ -680,9 +686,9 @@ async function transitionToKycRequired( } else if (kycStatusReq.status === HttpStatusCode.Accepted) { const kycStatus = await kycStatusReq.json(); logger.info(`kyc status: ${j2s(kycStatus)}`); - const transitionInfo = await ws.db - .mktx((x) => [x.depositGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups"], + async (tx) => { const dg = await tx.depositGroups.get(depositGroupId); if (!dg) { return undefined; @@ -700,7 +706,8 @@ async function transitionToKycRequired( await tx.depositGroups.put(dg); const newTxState = computeDepositTransactionStatus(dg); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); return TaskRunResult.finished(); } else { @@ -717,13 +724,14 @@ async function processDepositGroupPendingTrack( for (let i = 0; i < depositGroup.statusPerCoin.length; i++) { const coinPub = depositGroup.payCoinSelection.coinPubs[i]; // FIXME: Make the URL part of the coin selection? - const exchangeBaseUrl = await ws.db - .mktx((x) => [x.coins]) - .runReadWrite(async (tx) => { + const exchangeBaseUrl = await ws.db.runReadWriteTx( + ["coins"], + async (tx) => { const coinRecord = await tx.coins.get(coinPub); checkDbInvariant(!!coinRecord); return coinRecord.exchangeBaseUrl; - }); + }, + ); let updatedTxStatus: DepositElementStatus | undefined = undefined; let newWiredCoin: @@ -793,41 +801,39 @@ async function processDepositGroupPendingTrack( } if (updatedTxStatus !== undefined) { - await ws.db - .mktx((x) => [x.depositGroups]) - .runReadWrite(async (tx) => { - const dg = await tx.depositGroups.get(depositGroupId); - if (!dg) { - return; - } - if (updatedTxStatus !== undefined) { - dg.statusPerCoin[i] = updatedTxStatus; - } - if (newWiredCoin) { - /** - * FIXME: if there is a new wire information from the exchange - * it should add up to the previous tracking states. - * - * This may loose information by overriding prev state. - * - * And: add checks to integration tests - */ - if (!dg.trackingState) { - dg.trackingState = {}; - } - - dg.trackingState[newWiredCoin.id] = newWiredCoin.value; + await ws.db.runReadWriteTx(["depositGroups"], async (tx) => { + const dg = await tx.depositGroups.get(depositGroupId); + if (!dg) { + return; + } + if (updatedTxStatus !== undefined) { + dg.statusPerCoin[i] = updatedTxStatus; + } + if (newWiredCoin) { + /** + * FIXME: if there is a new wire information from the exchange + * it should add up to the previous tracking states. + * + * This may loose information by overriding prev state. + * + * And: add checks to integration tests + */ + if (!dg.trackingState) { + dg.trackingState = {}; } - await tx.depositGroups.put(dg); - }); + + dg.trackingState[newWiredCoin.id] = newWiredCoin.value; + } + await tx.depositGroups.put(dg); + }); } } let allWired = true; - const transitionInfo = await ws.db - .mktx((x) => [x.depositGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups"], + async (tx) => { const dg = await tx.depositGroups.get(depositGroupId); if (!dg) { return undefined; @@ -848,7 +854,8 @@ async function processDepositGroupPendingTrack( } const newTxState = computeDepositTransactionStatus(dg); return { oldTxState, newTxState }; - }); + }, + ); const transactionId = constructTransactionIdentifier({ tag: TransactionType.Deposit, depositGroupId, @@ -869,11 +876,12 @@ async function processDepositGroupPendingDeposit( ): Promise { logger.info("processing deposit group in pending(deposit)"); const depositGroupId = depositGroup.depositGroupId; - const contractTermsRec = await ws.db - .mktx((x) => [x.contractTerms]) - .runReadOnly(async (tx) => { + const contractTermsRec = await ws.db.runReadOnlyTx( + ["contractTerms"], + async (tx) => { return tx.contractTerms.get(depositGroup.contractTermsHash); - }); + }, + ); if (!contractTermsRec) { throw Error("contract terms for deposit not found in database"); } @@ -954,27 +962,25 @@ async function processDepositGroupPendingDeposit( codecForBatchDepositSuccess(), ); - await ws.db - .mktx((x) => [x.depositGroups]) - .runReadWrite(async (tx) => { - const dg = await tx.depositGroups.get(depositGroupId); - if (!dg) { - return; - } - for (const batchIndex of batchIndexes) { - const coinStatus = dg.statusPerCoin[batchIndex]; - switch (coinStatus) { - case DepositElementStatus.DepositPending: - dg.statusPerCoin[batchIndex] = DepositElementStatus.Tracking; - await tx.depositGroups.put(dg); - } + await ws.db.runReadWriteTx(["depositGroups"], async (tx) => { + const dg = await tx.depositGroups.get(depositGroupId); + if (!dg) { + return; + } + for (const batchIndex of batchIndexes) { + const coinStatus = dg.statusPerCoin[batchIndex]; + switch (coinStatus) { + case DepositElementStatus.DepositPending: + dg.statusPerCoin[batchIndex] = DepositElementStatus.Tracking; + await tx.depositGroups.put(dg); } - }); + } + }); } - const transitionInfo = await ws.db - .mktx((x) => [x.depositGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups"], + async (tx) => { const dg = await tx.depositGroups.get(depositGroupId); if (!dg) { return undefined; @@ -984,7 +990,8 @@ async function processDepositGroupPendingDeposit( await tx.depositGroups.put(dg); const newTxState = computeDepositTransactionStatus(dg); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); return TaskRunResult.progress(); @@ -998,11 +1005,12 @@ export async function processDepositGroup( depositGroupId: string, cancellationToken: CancellationToken, ): Promise { - const depositGroup = await ws.db - .mktx((x) => [x.depositGroups]) - .runReadOnly(async (tx) => { + const depositGroup = await ws.db.runReadOnlyTx( + ["depositGroups"], + async (tx) => { return tx.depositGroups.get(depositGroupId); - }); + }, + ); if (!depositGroup) { logger.warn(`deposit group ${depositGroupId} not found`); return TaskRunResult.finished(); @@ -1030,15 +1038,18 @@ export async function processDepositGroup( return TaskRunResult.finished(); } +/** + * FIXME: Consider moving this to exchanges.ts. + */ async function getExchangeWireFee( ws: InternalWalletState, wireType: string, baseUrl: string, time: TalerProtocolTimestamp, ): Promise { - const exchangeDetails = await ws.db - .mktx((x) => [x.exchanges, x.exchangeDetails]) - .runReadOnly(async (tx) => { + const exchangeDetails = await ws.db.runReadOnlyTx( + ["exchangeDetails", "exchanges"], + async (tx) => { const ex = await tx.exchanges.get(baseUrl); if (!ex || !ex.detailsPointer) return undefined; return await tx.exchangeDetails.indexes.byPointer.get([ @@ -1046,7 +1057,8 @@ async function getExchangeWireFee( ex.detailsPointer.currency, ex.detailsPointer.masterPublicKey, ]); - }); + }, + ); if (!exchangeDetails) { throw Error(`exchange missing: ${baseUrl}`); @@ -1141,21 +1153,19 @@ export async function prepareDepositGroup( const exchangeInfos: { url: string; master_pub: string }[] = []; - await ws.db - .mktx((x) => [x.exchanges, x.exchangeDetails]) - .runReadOnly(async (tx) => { - const allExchanges = await tx.exchanges.iter().toArray(); - for (const e of allExchanges) { - const details = await getExchangeWireDetailsInTx(tx, e.baseUrl); - if (!details || amount.currency !== details.currency) { - continue; - } - exchangeInfos.push({ - master_pub: details.masterPublicKey, - url: e.baseUrl, - }); + await ws.db.runReadOnlyTx(["exchangeDetails", "exchanges"], async (tx) => { + const allExchanges = await tx.exchanges.iter().toArray(); + for (const e of allExchanges) { + const details = await getExchangeWireDetailsInTx(tx, e.baseUrl); + if (!details || amount.currency !== details.currency) { + continue; } - }); + exchangeInfos.push({ + master_pub: details.masterPublicKey, + url: e.baseUrl, + }); + } + }); const now = AbsoluteTime.now(); const nowRounded = AbsoluteTime.toProtocolTimestamp(now); @@ -1255,21 +1265,19 @@ export async function createDepositGroup( const exchangeInfos: { url: string; master_pub: string }[] = []; - await ws.db - .mktx((x) => [x.exchanges, x.exchangeDetails]) - .runReadOnly(async (tx) => { - const allExchanges = await tx.exchanges.iter().toArray(); - for (const e of allExchanges) { - const details = await getExchangeWireDetailsInTx(tx, e.baseUrl); - if (!details || amount.currency !== details.currency) { - continue; - } - exchangeInfos.push({ - master_pub: details.masterPublicKey, - url: e.baseUrl, - }); + await ws.db.runReadOnlyTx(["exchanges", "exchangeDetails"], async (tx) => { + const allExchanges = await tx.exchanges.iter().toArray(); + for (const e of allExchanges) { + const details = await getExchangeWireDetailsInTx(tx, e.baseUrl); + if (!details || amount.currency !== details.currency) { + continue; } - }); + exchangeInfos.push({ + master_pub: details.masterPublicKey, + url: e.baseUrl, + }); + } + }); const now = AbsoluteTime.now(); const wireDeadline = AbsoluteTime.toProtocolTimestamp( @@ -1388,17 +1396,17 @@ export async function createDepositGroup( const ctx = new DepositTransactionContext(ws, depositGroupId); const transactionId = ctx.transactionId; - const newTxState = await ws.db - .mktx((x) => [ - x.depositGroups, - x.coins, - x.recoupGroups, - x.denominations, - x.refreshGroups, - x.coinAvailability, - x.contractTerms, - ]) - .runReadWrite(async (tx) => { + const newTxState = await ws.db.runReadWriteTx( + [ + "depositGroups", + "coins", + "recoupGroups", + "denominations", + "refreshGroups", + "coinAvailability", + "contractTerms", + ], + async (tx) => { await spendCoins(ws, tx, { allocationId: transactionId, coinPubs: payCoinSel.coinSel.coinPubs, @@ -1413,7 +1421,8 @@ export async function createDepositGroup( h: contractTermsHash, }); return computeDepositTransactionStatus(depositGroup); - }); + }, + ); ws.notify({ type: NotificationType.TransactionStateTransition, @@ -1450,9 +1459,9 @@ export async function getCounterpartyEffectiveDepositAmount( const fees: AmountJson[] = []; const exchangeSet: Set = new Set(); - await ws.db - .mktx((x) => [x.coins, x.denominations, x.exchanges, x.exchangeDetails]) - .runReadOnly(async (tx) => { + await ws.db.runReadOnlyTx( + ["coins", "denominations", "exchangeDetails", "exchanges"], + async (tx) => { for (let i = 0; i < pcs.coinPubs.length; i++) { const coin = await tx.coins.get(pcs.coinPubs[i]); if (!coin) { @@ -1495,7 +1504,8 @@ export async function getCounterpartyEffectiveDepositAmount( fees.push(Amounts.parseOrThrow(fee)); } } - }); + }, + ); return Amounts.sub(Amounts.sum(amt).amount, Amounts.sum(fees).amount).amount; } @@ -1515,9 +1525,9 @@ async function getTotalFeesForDepositAmount( const exchangeSet: Set = new Set(); const currency = Amounts.currencyOf(total); - await ws.db - .mktx((x) => [x.coins, x.denominations, x.exchanges, x.exchangeDetails]) - .runReadOnly(async (tx) => { + await ws.db.runReadOnlyTx( + ["coins", "denominations", "exchanges", "exchangeDetails"], + async (tx) => { for (let i = 0; i < pcs.coinPubs.length; i++) { const coin = await tx.coins.get(pcs.coinPubs[i]); if (!coin) { @@ -1575,7 +1585,8 @@ async function getTotalFeesForDepositAmount( wireFee.push(Amounts.parseOrThrow(fee)); } } - }); + }, + ); return { coin: Amounts.stringify(Amounts.sumOrZero(total.currency, coinFee).amount), diff --git a/packages/taler-wallet-core/src/operations/exchanges.ts b/packages/taler-wallet-core/src/operations/exchanges.ts index 460b47e73..678e48fb9 100644 --- a/packages/taler-wallet-core/src/operations/exchanges.ts +++ b/packages/taler-wallet-core/src/operations/exchanges.ts @@ -99,8 +99,8 @@ import { ExchangeEntryDbRecordStatus, ExchangeEntryDbUpdateStatus, PendingTaskType, - WalletDbReadOnlyTransactionArr, - WalletDbReadWriteTransactionArr, + WalletDbReadOnlyTransaction, + WalletDbReadWriteTransaction, createRefreshGroup, createTimeline, isWithdrawableDenom, @@ -115,11 +115,7 @@ import { } from "../index.js"; import { InternalWalletState } from "../internal-wallet-state.js"; import { checkDbInvariant } from "../util/invariants.js"; -import { - DbReadOnlyTransactionArr, - GetReadOnlyAccess, - GetReadWriteAccess, -} from "../util/query.js"; +import { DbReadOnlyTransaction } from "../util/query.js"; import { WALLET_EXCHANGE_PROTOCOL_VERSION } from "../versions.js"; import { TaskIdentifiers, @@ -197,10 +193,7 @@ async function downloadExchangeWithTermsOfService( * Get exchange details from the database. */ async function getExchangeRecordsInternal( - tx: GetReadOnlyAccess<{ - exchanges: typeof WalletStoresV1.exchanges; - exchangeDetails: typeof WalletStoresV1.exchangeDetails; - }>, + tx: WalletDbReadOnlyTransaction<["exchanges", "exchangeDetails"]>, exchangeBaseUrl: string, ): Promise { const r = await tx.exchanges.get(exchangeBaseUrl); @@ -220,7 +213,7 @@ async function getExchangeRecordsInternal( } export async function getExchangeScopeInfo( - tx: WalletDbReadOnlyTransactionArr< + tx: WalletDbReadOnlyTransaction< [ "exchanges", "exchangeDetails", @@ -243,7 +236,7 @@ export async function getExchangeScopeInfo( } async function internalGetExchangeScopeInfo( - tx: WalletDbReadOnlyTransactionArr< + tx: WalletDbReadOnlyTransaction< ["globalCurrencyExchanges", "globalCurrencyAuditors"] >, exchangeDetails: ExchangeDetailsRecord, @@ -284,7 +277,7 @@ async function internalGetExchangeScopeInfo( } async function makeExchangeListItem( - tx: WalletDbReadOnlyTransactionArr< + tx: WalletDbReadOnlyTransaction< ["globalCurrencyExchanges", "globalCurrencyAuditors"] >, r: ExchangeEntryRecord, @@ -328,10 +321,7 @@ export interface ExchangeWireDetails { } export async function getExchangeWireDetailsInTx( - tx: GetReadOnlyAccess<{ - exchanges: typeof WalletStoresV1.exchanges; - exchangeDetails: typeof WalletStoresV1.exchangeDetails; - }>, + tx: WalletDbReadOnlyTransaction<["exchanges", "exchangeDetails"]>, exchangeBaseUrl: string, ): Promise { const det = await getExchangeRecordsInternal(tx, exchangeBaseUrl); @@ -389,9 +379,9 @@ export async function acceptExchangeTermsOfService( ws: InternalWalletState, exchangeBaseUrl: string, ): Promise { - const notif = await ws.db - .mktx((x) => [x.exchanges, x.exchangeDetails]) - .runReadWrite(async (tx) => { + const notif = await ws.db.runReadWriteTx( + ["exchangeDetails", "exchanges"], + async (tx) => { const exch = await tx.exchanges.get(exchangeBaseUrl); if (exch && exch.tosCurrentEtag) { const oldExchangeState = getExchangeState(exch); @@ -409,7 +399,8 @@ export async function acceptExchangeTermsOfService( } satisfies WalletNotification; } return undefined; - }); + }, + ); if (notif) { ws.notify(notif); } @@ -537,7 +528,7 @@ async function validateGlobalFees( * if the DB transaction succeeds. */ export async function addPresetExchangeEntry( - tx: WalletDbReadWriteTransactionArr<["exchanges"]>, + tx: WalletDbReadWriteTransaction<["exchanges"]>, exchangeBaseUrl: string, currencyHint?: string, ): Promise<{ notification?: WalletNotification }> { @@ -577,10 +568,7 @@ export async function addPresetExchangeEntry( async function provideExchangeRecordInTx( ws: InternalWalletState, - tx: GetReadWriteAccess<{ - exchanges: typeof WalletStoresV1.exchanges; - exchangeDetails: typeof WalletStoresV1.exchangeDetails; - }>, + tx: WalletDbReadWriteTransaction<["exchanges", "exchangeDetails"]>, baseUrl: string, ): Promise<{ exchange: ExchangeEntryRecord; @@ -854,55 +842,58 @@ async function startUpdateExchangeEntry( }`, ); - const { notification } = await ws.db - .mktx((x) => [x.exchanges, x.exchangeDetails]) - .runReadWrite(async (tx) => { + const { notification } = await ws.db.runReadWriteTx( + ["exchanges", "exchangeDetails"], + async (tx) => { return provideExchangeRecordInTx(ws, tx, exchangeBaseUrl); - }); + }, + ); if (notification) { ws.notify(notification); } - const { oldExchangeState, newExchangeState, taskId } = await ws.db - .mktx((x) => [x.exchanges, x.operationRetries]) - .runReadWrite(async (tx) => { - const r = await tx.exchanges.get(canonBaseUrl); - if (!r) { - throw Error("exchange not found"); - } - const oldExchangeState = getExchangeState(r); - switch (r.updateStatus) { - case ExchangeEntryDbUpdateStatus.UnavailableUpdate: - break; - case ExchangeEntryDbUpdateStatus.Suspended: - break; - case ExchangeEntryDbUpdateStatus.ReadyUpdate: - break; - case ExchangeEntryDbUpdateStatus.Ready: { - const nextUpdateTimestamp = AbsoluteTime.fromPreciseTimestamp( - timestampPreciseFromDb(r.nextUpdateStamp), - ); - // Only update if entry is outdated or update is forced. - if ( - options.forceUpdate || - AbsoluteTime.isExpired(nextUpdateTimestamp) - ) { - r.updateStatus = ExchangeEntryDbUpdateStatus.ReadyUpdate; + const { oldExchangeState, newExchangeState, taskId } = + await ws.db.runReadWriteTx( + ["exchanges", "operationRetries"], + async (tx) => { + const r = await tx.exchanges.get(canonBaseUrl); + if (!r) { + throw Error("exchange not found"); + } + const oldExchangeState = getExchangeState(r); + switch (r.updateStatus) { + case ExchangeEntryDbUpdateStatus.UnavailableUpdate: + break; + case ExchangeEntryDbUpdateStatus.Suspended: + break; + case ExchangeEntryDbUpdateStatus.ReadyUpdate: + break; + case ExchangeEntryDbUpdateStatus.Ready: { + const nextUpdateTimestamp = AbsoluteTime.fromPreciseTimestamp( + timestampPreciseFromDb(r.nextUpdateStamp), + ); + // Only update if entry is outdated or update is forced. + if ( + options.forceUpdate || + AbsoluteTime.isExpired(nextUpdateTimestamp) + ) { + r.updateStatus = ExchangeEntryDbUpdateStatus.ReadyUpdate; + } + break; } - break; + case ExchangeEntryDbUpdateStatus.Initial: + r.updateStatus = ExchangeEntryDbUpdateStatus.InitialUpdate; + break; } - case ExchangeEntryDbUpdateStatus.Initial: - r.updateStatus = ExchangeEntryDbUpdateStatus.InitialUpdate; - break; - } - await tx.exchanges.put(r); - const newExchangeState = getExchangeState(r); - // Reset retries for updating the exchange entry. - const taskId = TaskIdentifiers.forExchangeUpdate(r); - await tx.operationRetries.delete(taskId); - return { oldExchangeState, newExchangeState, taskId }; - }); + await tx.exchanges.put(r); + const newExchangeState = getExchangeState(r); + // Reset retries for updating the exchange entry. + const taskId = TaskIdentifiers.forExchangeUpdate(r); + await tx.operationRetries.delete(taskId); + return { oldExchangeState, newExchangeState, taskId }; + }, + ); ws.notify({ type: NotificationType.ExchangeStateTransition, exchangeBaseUrl: canonBaseUrl, @@ -1284,17 +1275,17 @@ export async function updateExchangeFromUrlHandler( } } - const updated = await ws.db - .mktx((x) => [ - x.exchanges, - x.exchangeDetails, - x.exchangeSignKeys, - x.denominations, - x.coins, - x.refreshGroups, - x.recoupGroups, - ]) - .runReadWrite(async (tx) => { + const updated = await ws.db.runReadWriteTx( + [ + "exchanges", + "exchangeDetails", + "exchangeSignKeys", + "denominations", + "coins", + "refreshGroups", + "recoupGroups", + ], + async (tx) => { const r = await tx.exchanges.get(exchangeBaseUrl); if (!r) { logger.warn(`exchange ${exchangeBaseUrl} no longer present`); @@ -1454,7 +1445,8 @@ export async function updateExchangeFromUrlHandler( oldExchangeState, newExchangeState, }; - }); + }, + ); if (recoupGroupId) { const recoupTaskId = constructTaskIdentifier({ @@ -1481,15 +1473,15 @@ export async function updateExchangeFromUrlHandler( if (refreshCheckNecessary) { // Do auto-refresh. - await ws.db - .mktx((x) => [ - x.coins, - x.denominations, - x.coinAvailability, - x.refreshGroups, - x.exchanges, - ]) - .runReadWrite(async (tx) => { + await ws.db.runReadWriteTx( + [ + "coins", + "denominations", + "coinAvailability", + "refreshGroups", + "exchanges", + ], + async (tx) => { const exchange = await tx.exchanges.get(exchangeBaseUrl); if (!exchange || !exchange.detailsPointer) { return; @@ -1547,7 +1539,8 @@ export async function updateExchangeFromUrlHandler( AbsoluteTime.toPreciseTimestamp(minCheckThreshold), ); await tx.exchanges.put(exchange); - }); + }, + ); } ws.notify({ @@ -1599,11 +1592,12 @@ export async function getExchangePaytoUri( ): Promise { // We do the update here, since the exchange might not even exist // yet in our database. - const details = await ws.db - .mktx((x) => [x.exchangeDetails, x.exchanges]) - .runReadOnly(async (tx) => { + const details = await ws.db.runReadOnlyTx( + ["exchanges", "exchangeDetails"], + async (tx) => { return getExchangeRecordsInternal(tx, exchangeBaseUrl); - }); + }, + ); const accounts = details?.wireInfo.accounts ?? []; for (const account of accounts) { const res = parsePaytoUri(account.payto_uri); @@ -1641,15 +1635,13 @@ export async function getExchangeTos( acceptLanguage, ); - await ws.db - .mktx((x) => [x.exchanges, x.exchangeDetails]) - .runReadWrite(async (tx) => { - const updateExchangeEntry = await tx.exchanges.get(exchangeBaseUrl); - if (updateExchangeEntry) { - updateExchangeEntry.tosCurrentEtag = tosDownload.tosEtag; - await tx.exchanges.put(updateExchangeEntry); - } - }); + await ws.db.runReadWriteTx(["exchanges"], async (tx) => { + const updateExchangeEntry = await tx.exchanges.get(exchangeBaseUrl); + if (updateExchangeEntry) { + updateExchangeEntry.tosCurrentEtag = tosDownload.tosEtag; + await tx.exchanges.put(updateExchangeEntry); + } + }); return { acceptedEtag: exch.tosAcceptedEtag, @@ -1738,7 +1730,7 @@ export async function listExchanges( */ export async function markExchangeUsed( ws: InternalWalletState, - tx: GetReadWriteAccess<{ exchanges: typeof WalletStoresV1.exchanges }>, + tx: WalletDbReadWriteTransaction<["exchanges"]>, exchangeBaseUrl: string, ): Promise<{ notif: WalletNotification | undefined }> { exchangeBaseUrl = canonicalizeBaseUrl(exchangeBaseUrl); @@ -1780,9 +1772,9 @@ export async function getExchangeDetailedInfo( ws: InternalWalletState, exchangeBaseurl: string, ): Promise { - const exchange = await ws.db - .mktx((x) => [x.exchanges, x.exchangeDetails, x.denominations]) - .runReadOnly(async (tx) => { + const exchange = await ws.db.runReadOnlyTx( + ["exchanges", "exchangeDetails", "denominations"], + async (tx) => { const ex = await tx.exchanges.get(exchangeBaseurl); const dp = ex?.detailsPointer; if (!dp) { @@ -1815,7 +1807,8 @@ export async function getExchangeDetailedInfo( }, denominations, }; - }); + }, + ); if (!exchange) { throw Error(`exchange with base url "${exchangeBaseurl}" not found`); @@ -1930,7 +1923,7 @@ export async function getExchangeDetailedInfo( async function internalGetExchangeResources( ws: InternalWalletState, - tx: DbReadOnlyTransactionArr< + tx: DbReadOnlyTransaction< typeof WalletStoresV1, ["exchanges", "coins", "withdrawalGroups"] >, diff --git a/packages/taler-wallet-core/src/operations/pay-merchant.ts b/packages/taler-wallet-core/src/operations/pay-merchant.ts index 1039ac95e..260fc815a 100644 --- a/packages/taler-wallet-core/src/operations/pay-merchant.ts +++ b/packages/taler-wallet-core/src/operations/pay-merchant.ts @@ -114,7 +114,8 @@ import { timestampPreciseToDb, timestampProtocolFromDb, timestampProtocolToDb, - WalletDbReadWriteTransactionArr, + WalletDbReadOnlyTransaction, + WalletDbReadWriteTransaction, } from "../index.js"; import { EXCHANGE_COINS_LOCK, @@ -123,11 +124,7 @@ import { import { assertUnreachable } from "../util/assertUnreachable.js"; import { PreviousPayCoins, selectPayCoinsNew } from "../util/coinSelection.js"; import { checkDbInvariant } from "../util/invariants.js"; -import { - DbReadWriteTransactionArr, - GetReadOnlyAccess, - StoreNames, -} from "../util/query.js"; +import { DbReadWriteTransaction, StoreNames } from "../util/query.js"; import { constructTaskIdentifier, DbRetryInfo, @@ -197,7 +194,7 @@ export class PayMerchantTransactionContext implements TransactionContext { opts: { extraStores: StoreNameArray }, f: ( rec: PurchaseRecord, - tx: DbReadWriteTransactionArr< + tx: DbReadWriteTransaction< typeof WalletStoresV1, ["purchases", ...StoreNameArray] >, @@ -233,29 +230,27 @@ export class PayMerchantTransactionContext implements TransactionContext { async deleteTransaction(): Promise { const { ws, proposalId } = this; - await ws.db - .mktx((x) => [x.purchases, x.tombstones]) - .runReadWrite(async (tx) => { - let found = false; - const purchase = await tx.purchases.get(proposalId); - if (purchase) { - found = true; - await tx.purchases.delete(proposalId); - } - if (found) { - await tx.tombstones.put({ - id: TombstoneTag.DeletePayment + ":" + proposalId, - }); - } - }); + await ws.db.runReadWriteTx(["purchases", "tombstones"], async (tx) => { + let found = false; + const purchase = await tx.purchases.get(proposalId); + if (purchase) { + found = true; + await tx.purchases.delete(proposalId); + } + if (found) { + await tx.tombstones.put({ + id: TombstoneTag.DeletePayment + ":" + proposalId, + }); + } + }); } async suspendTransaction(): Promise { const { ws, proposalId, transactionId } = this; ws.taskScheduler.stopShepherdTask(this.taskId); - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const purchase = await tx.purchases.get(proposalId); if (!purchase) { throw Error("purchase not found"); @@ -268,7 +263,8 @@ export class PayMerchantTransactionContext implements TransactionContext { await tx.purchases.put(purchase); const newTxState = computePayMerchantTransactionState(purchase); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); } @@ -341,9 +337,9 @@ export class PayMerchantTransactionContext implements TransactionContext { async resumeTransaction(): Promise { const { ws, proposalId, transactionId, taskId: retryTag } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const purchase = await tx.purchases.get(proposalId); if (!purchase) { throw Error("purchase not found"); @@ -356,23 +352,24 @@ export class PayMerchantTransactionContext implements TransactionContext { await tx.purchases.put(purchase); const newTxState = computePayMerchantTransactionState(purchase); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); ws.taskScheduler.startShepherdTask(this.taskId); } async failTransaction(): Promise { const { ws, proposalId, transactionId } = this; - const transitionInfo = await ws.db - .mktx((x) => [ - x.purchases, - x.refreshGroups, - x.denominations, - x.coinAvailability, - x.coins, - x.operationRetries, - ]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + [ + "purchases", + "refreshGroups", + "denominations", + "coinAvailability", + "coins", + "operationRetries", + ], + async (tx) => { const purchase = await tx.purchases.get(proposalId); if (!purchase) { throw Error("purchase not found"); @@ -390,7 +387,8 @@ export class PayMerchantTransactionContext implements TransactionContext { } const newTxState = computePayMerchantTransactionState(purchase); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); ws.taskScheduler.stopShepherdTask(this.taskId); } @@ -410,17 +408,15 @@ export class RefundTransactionContext implements TransactionContext { async deleteTransaction(): Promise { const { ws, refundGroupId, transactionId } = this; - await ws.db - .mktx((x) => [x.refundGroups, x.tombstones]) - .runReadWrite(async (tx) => { - const refundRecord = await tx.refundGroups.get(refundGroupId); - if (!refundRecord) { - return; - } - await tx.refundGroups.delete(refundGroupId); - await tx.tombstones.put({ id: transactionId }); - // FIXME: Also tombstone the refund items, so that they won't reappear. - }); + await ws.db.runReadWriteTx(["refundGroups", "tombstones"], async (tx) => { + const refundRecord = await tx.refundGroups.get(refundGroupId); + if (!refundRecord) { + return; + } + await tx.refundGroups.delete(refundGroupId); + await tx.tombstones.put({ id: transactionId }); + // FIXME: Also tombstone the refund items, so that they won't reappear. + }); } suspendTransaction(): Promise { @@ -452,46 +448,44 @@ export async function getTotalPaymentCost( pcs: PayCoinSelection, ): Promise { const currency = Amounts.currencyOf(pcs.paymentAmount); - return ws.db - .mktx((x) => [x.coins, x.denominations]) - .runReadOnly(async (tx) => { - const costs: AmountJson[] = []; - for (let i = 0; i < pcs.coinPubs.length; i++) { - const coin = await tx.coins.get(pcs.coinPubs[i]); - if (!coin) { - throw Error("can't calculate payment cost, coin not found"); - } - const denom = await tx.denominations.get([ - coin.exchangeBaseUrl, - coin.denomPubHash, - ]); - if (!denom) { - throw Error( - "can't calculate payment cost, denomination for coin not found", - ); - } - const allDenoms = await getCandidateWithdrawalDenomsTx( - ws, - tx, - coin.exchangeBaseUrl, - currency, - ); - const amountLeft = Amounts.sub( - denom.value, - pcs.coinContributions[i], - ).amount; - const refreshCost = getTotalRefreshCost( - allDenoms, - DenominationRecord.toDenomInfo(denom), - amountLeft, - ws.config.testing.denomselAllowLate, + return ws.db.runReadOnlyTx(["coins", "denominations"], async (tx) => { + const costs: AmountJson[] = []; + for (let i = 0; i < pcs.coinPubs.length; i++) { + const coin = await tx.coins.get(pcs.coinPubs[i]); + if (!coin) { + throw Error("can't calculate payment cost, coin not found"); + } + const denom = await tx.denominations.get([ + coin.exchangeBaseUrl, + coin.denomPubHash, + ]); + if (!denom) { + throw Error( + "can't calculate payment cost, denomination for coin not found", ); - costs.push(Amounts.parseOrThrow(pcs.coinContributions[i])); - costs.push(refreshCost); } - const zero = Amounts.zeroOfAmount(pcs.paymentAmount); - return Amounts.sum([zero, ...costs]).amount; - }); + const allDenoms = await getCandidateWithdrawalDenomsTx( + ws, + tx, + coin.exchangeBaseUrl, + currency, + ); + const amountLeft = Amounts.sub( + denom.value, + pcs.coinContributions[i], + ).amount; + const refreshCost = getTotalRefreshCost( + allDenoms, + DenominationRecord.toDenomInfo(denom), + amountLeft, + ws.config.testing.denomselAllowLate, + ); + costs.push(Amounts.parseOrThrow(pcs.coinContributions[i])); + costs.push(refreshCost); + } + const zero = Amounts.zeroOfAmount(pcs.paymentAmount); + return Amounts.sum([zero, ...costs]).amount; + }); } async function failProposalPermanently( @@ -503,9 +497,9 @@ async function failProposalPermanently( tag: TransactionType.Payment, proposalId, }); - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const p = await tx.purchases.get(proposalId); if (!p) { return; @@ -516,7 +510,8 @@ async function failProposalPermanently( const newTxState = computePayMerchantTransactionState(p); await tx.purchases.put(p); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); } @@ -543,9 +538,7 @@ function getPayRequestTimeout(purchase: PurchaseRecord): Duration { export async function expectProposalDownload( ws: InternalWalletState, p: PurchaseRecord, - parentTx?: GetReadOnlyAccess<{ - contractTerms: typeof WalletStoresV1.contractTerms; - }>, + parentTx?: WalletDbReadOnlyTransaction<["contractTerms"]>, ): Promise<{ contractData: WalletContractData; contractTermsRaw: any; @@ -577,9 +570,7 @@ export async function expectProposalDownload( if (parentTx) { return getFromTransaction(parentTx); } - return await ws.db - .mktx((x) => [x.contractTerms]) - .runReadOnly(getFromTransaction); + return await ws.db.runReadOnlyTx(["contractTerms"], getFromTransaction); } export function extractContractData( @@ -626,11 +617,9 @@ async function processDownloadProposal( ws: InternalWalletState, proposalId: string, ): Promise { - const proposal = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { - return await tx.purchases.get(proposalId); - }); + const proposal = await ws.db.runReadOnlyTx(["purchases"], async (tx) => { + return await tx.purchases.get(proposalId); + }); if (!proposal) { return TaskRunResult.finished(); @@ -666,11 +655,12 @@ async function processDownloadProposal( } const opId = TaskIdentifiers.forPay(proposal); - const retryRecord = await ws.db - .mktx((x) => [x.operationRetries]) - .runReadOnly(async (tx) => { + const retryRecord = await ws.db.runReadOnlyTx( + ["operationRetries"], + async (tx) => { return tx.operationRetries.get(opId); - }); + }, + ); const httpResponse = await ws.http.fetch(orderClaimUrl, { method: "POST", @@ -807,9 +797,9 @@ async function processDownloadProposal( logger.trace(`extracted contract data: ${j2s(contractData)}`); - const transitionInfo = await ws.db - .mktx((x) => [x.purchases, x.contractTerms]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases", "contractTerms"], + async (tx) => { const p = await tx.purchases.get(proposalId); if (!p) { return; @@ -855,7 +845,8 @@ async function processDownloadProposal( oldTxState, newTxState, }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); @@ -875,14 +866,12 @@ async function createOrReusePurchase( claimToken: string | undefined, noncePriv: string | undefined, ): Promise { - const oldProposals = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { - return tx.purchases.indexes.byUrlAndOrderId.getAll([ - merchantBaseUrl, - orderId, - ]); - }); + const oldProposals = await ws.db.runReadOnlyTx(["purchases"], async (tx) => { + return tx.purchases.indexes.byUrlAndOrderId.getAll([ + merchantBaseUrl, + orderId, + ]); + }); const oldProposal = oldProposals.find((p) => { return ( @@ -911,9 +900,9 @@ async function createOrReusePurchase( if (paid) { // if this transaction was shared and the order is paid then it // means that another wallet already paid the proposal - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const p = await tx.purchases.get(oldProposal.proposalId); if (!p) { logger.warn("purchase does not exist anymore"); @@ -924,7 +913,8 @@ async function createOrReusePurchase( const newTxState = computePayMerchantTransactionState(p); await tx.purchases.put(p); return { oldTxState, newTxState }; - }); + }, + ); const transactionId = constructTransactionIdentifier({ tag: TransactionType.Payment, @@ -976,9 +966,9 @@ async function createOrReusePurchase( shared: shared, }; - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { await tx.purchases.put(proposalRecord); const oldTxState: TransactionState = { major: TransactionMajorState.None, @@ -988,7 +978,8 @@ async function createOrReusePurchase( oldTxState, newTxState, }; - }); + }, + ); const transactionId = constructTransactionIdentifier({ tag: TransactionType.Payment, @@ -1009,9 +1000,9 @@ async function storeFirstPaySuccess( proposalId, }); const now = AbsoluteTime.toPreciseTimestamp(AbsoluteTime.now()); - const transitionInfo = await ws.db - .mktx((x) => [x.purchases, x.contractTerms]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["contractTerms", "purchases"], + async (tx) => { const purchase = await tx.purchases.get(proposalId); if (!purchase) { @@ -1059,7 +1050,8 @@ async function storeFirstPaySuccess( oldTxState, newTxState, }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); } @@ -1072,9 +1064,9 @@ async function storePayReplaySuccess( tag: TransactionType.Payment, proposalId, }); - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const purchase = await tx.purchases.get(proposalId); if (!purchase) { @@ -1096,7 +1088,8 @@ async function storePayReplaySuccess( await tx.purchases.put(purchase); const newTxState = computePayMerchantTransactionState(purchase); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); } @@ -1115,11 +1108,9 @@ async function handleInsufficientFunds( ): Promise { logger.trace("handling insufficient funds, trying to re-select coins"); - const proposal = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { - return tx.purchases.get(proposalId); - }); + const proposal = await ws.db.runReadOnlyTx(["purchases"], async (tx) => { + return tx.purchases.get(proposalId); + }); if (!proposal) { return; } @@ -1156,34 +1147,32 @@ async function handleInsufficientFunds( const payCoinSelection = payInfo.payCoinSelection; - await ws.db - .mktx((x) => [x.coins, x.denominations]) - .runReadOnly(async (tx) => { - for (let i = 0; i < payCoinSelection.coinPubs.length; i++) { - const coinPub = payCoinSelection.coinPubs[i]; - if (coinPub === brokenCoinPub) { - continue; - } - const contrib = payCoinSelection.coinContributions[i]; - const coin = await tx.coins.get(coinPub); - if (!coin) { - continue; - } - const denom = await tx.denominations.get([ - coin.exchangeBaseUrl, - coin.denomPubHash, - ]); - if (!denom) { - continue; - } - prevPayCoins.push({ - coinPub, - contribution: Amounts.parseOrThrow(contrib), - exchangeBaseUrl: coin.exchangeBaseUrl, - feeDeposit: Amounts.parseOrThrow(denom.fees.feeDeposit), - }); + await ws.db.runReadOnlyTx(["coins", "denominations"], async (tx) => { + for (let i = 0; i < payCoinSelection.coinPubs.length; i++) { + const coinPub = payCoinSelection.coinPubs[i]; + if (coinPub === brokenCoinPub) { + continue; } - }); + const contrib = payCoinSelection.coinContributions[i]; + const coin = await tx.coins.get(coinPub); + if (!coin) { + continue; + } + const denom = await tx.denominations.get([ + coin.exchangeBaseUrl, + coin.denomPubHash, + ]); + if (!denom) { + continue; + } + prevPayCoins.push({ + coinPub, + contribution: Amounts.parseOrThrow(contrib), + exchangeBaseUrl: coin.exchangeBaseUrl, + feeDeposit: Amounts.parseOrThrow(denom.fees.feeDeposit), + }); + } + }); const res = await selectPayCoinsNew(ws, { auditors: [], @@ -1204,15 +1193,15 @@ async function handleInsufficientFunds( logger.trace("re-selected coins"); - await ws.db - .mktx((x) => [ - x.purchases, - x.coins, - x.coinAvailability, - x.denominations, - x.refreshGroups, - ]) - .runReadWrite(async (tx) => { + await ws.db.runReadWriteTx( + [ + "purchases", + "coins", + "coinAvailability", + "denominations", + "refreshGroups", + ], + async (tx) => { const p = await tx.purchases.get(proposalId); if (!p) { return; @@ -1236,7 +1225,8 @@ async function handleInsufficientFunds( ), refreshReason: RefreshReason.PayMerchant, }); - }); + }, + ); ws.notify({ type: NotificationType.BalanceChange, @@ -1255,11 +1245,9 @@ async function checkPaymentByProposalId( proposalId: string, sessionId?: string, ): Promise { - let proposal = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { - return tx.purchases.get(proposalId); - }); + let proposal = await ws.db.runReadOnlyTx(["purchases"], async (tx) => { + return tx.purchases.get(proposalId); + }); if (!proposal) { throw Error(`could not get proposal ${proposalId}`); } @@ -1267,11 +1255,12 @@ async function checkPaymentByProposalId( const existingProposalId = proposal.repurchaseProposalId; if (existingProposalId) { logger.trace("using existing purchase for same product"); - const oldProposal = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { + const oldProposal = await ws.db.runReadOnlyTx( + ["purchases"], + async (tx) => { return tx.purchases.get(existingProposalId); - }); + }, + ); if (oldProposal) { proposal = oldProposal; } @@ -1299,11 +1288,9 @@ async function checkPaymentByProposalId( }); // First check if we already paid for it. - const purchase = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { - return tx.purchases.get(proposalId); - }); + const purchase = await ws.db.runReadOnlyTx(["purchases"], async (tx) => { + return tx.purchases.get(proposalId); + }); if ( !purchase || @@ -1363,9 +1350,9 @@ async function checkPaymentByProposalId( "automatically re-submitting payment with different session ID", ); logger.trace(`last: ${purchase.lastSessionId}, current: ${sessionId}`); - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const p = await tx.purchases.get(proposalId); if (!p) { return; @@ -1376,7 +1363,8 @@ async function checkPaymentByProposalId( await tx.purchases.put(p); const newTxState = computePayMerchantTransactionState(p); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); ws.taskScheduler.startShepherdTask(ctx.taskId); @@ -1440,11 +1428,9 @@ export async function getContractTermsDetails( ws: InternalWalletState, proposalId: string, ): Promise { - const proposal = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { - return tx.purchases.get(proposalId); - }); + const proposal = await ws.db.runReadOnlyTx(["purchases"], async (tx) => { + return tx.purchases.get(proposalId); + }); if (!proposal) { throw Error(`proposal with id ${proposalId} not found`); @@ -1630,26 +1616,24 @@ export async function generateDepositPermissions( coin: CoinRecord; denom: DenominationRecord; }> = []; - await ws.db - .mktx((x) => [x.coins, x.denominations]) - .runReadOnly(async (tx) => { - for (let i = 0; i < payCoinSel.coinPubs.length; i++) { - const coin = await tx.coins.get(payCoinSel.coinPubs[i]); - if (!coin) { - throw Error("can't pay, allocated coin not found anymore"); - } - const denom = await tx.denominations.get([ - coin.exchangeBaseUrl, - coin.denomPubHash, - ]); - if (!denom) { - throw Error( - "can't pay, denomination of allocated coin not found anymore", - ); - } - coinWithDenom.push({ coin, denom }); + await ws.db.runReadOnlyTx(["coins", "denominations"], async (tx) => { + for (let i = 0; i < payCoinSel.coinPubs.length; i++) { + const coin = await tx.coins.get(payCoinSel.coinPubs[i]); + if (!coin) { + throw Error("can't pay, allocated coin not found anymore"); } - }); + const denom = await tx.denominations.get([ + coin.exchangeBaseUrl, + coin.denomPubHash, + ]); + if (!denom) { + throw Error( + "can't pay, denomination of allocated coin not found anymore", + ); + } + coinWithDenom.push({ coin, denom }); + } + }); for (let i = 0; i < payCoinSel.coinPubs.length; i++) { const { coin, denom } = coinWithDenom[i]; @@ -1805,11 +1789,9 @@ export async function confirmPay( logger.trace( `executing confirmPay with proposalId ${proposalId} and sessionIdOverride ${sessionIdOverride}`, ); - const proposal = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { - return tx.purchases.get(proposalId); - }); + const proposal = await ws.db.runReadOnlyTx(["purchases"], async (tx) => { + return tx.purchases.get(proposalId); + }); if (!proposal) { throw Error(`proposal with id ${proposalId} not found`); @@ -1820,9 +1802,9 @@ export async function confirmPay( throw Error("proposal is in invalid state"); } - const existingPurchase = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const existingPurchase = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const purchase = await tx.purchases.get(proposalId); if ( purchase && @@ -1837,7 +1819,8 @@ export async function confirmPay( await tx.purchases.put(purchase); } return purchase; - }); + }, + ); if (existingPurchase && existingPurchase.payInfo) { logger.trace("confirmPay: submitting payment for existing purchase"); @@ -1890,15 +1873,15 @@ export async function confirmPay( `recording payment on ${proposal.orderId} with session ID ${sessionId}`, ); - const transitionInfo = await ws.db - .mktx((x) => [ - x.purchases, - x.coins, - x.refreshGroups, - x.denominations, - x.coinAvailability, - ]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + [ + "purchases", + "coins", + "refreshGroups", + "denominations", + "coinAvailability", + ], + async (tx) => { const p = await tx.purchases.get(proposal.proposalId); if (!p) { return; @@ -1936,7 +1919,8 @@ export async function confirmPay( } const newTxState = computePayMerchantTransactionState(p); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); ws.notify({ @@ -1952,11 +1936,9 @@ export async function processPurchase( ws: InternalWalletState, proposalId: string, ): Promise { - const purchase = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { - return tx.purchases.get(proposalId); - }); + const purchase = await ws.db.runReadOnlyTx(["purchases"], async (tx) => { + return tx.purchases.get(proposalId); + }); if (!purchase) { return { type: TaskRunResultType.Error, @@ -2013,11 +1995,9 @@ async function processPurchasePay( ws: InternalWalletState, proposalId: string, ): Promise { - const purchase = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { - return tx.purchases.get(proposalId); - }); + const purchase = await ws.db.runReadOnlyTx(["purchases"], async (tx) => { + return tx.purchases.get(proposalId); + }); if (!purchase) { return { type: TaskRunResultType.Error, @@ -2051,9 +2031,9 @@ async function processPurchasePay( const paid = await checkIfOrderIsAlreadyPaid(ws, download.contractData); if (paid) { - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const p = await tx.purchases.get(purchase.proposalId); if (!p) { logger.warn("purchase does not exist anymore"); @@ -2064,7 +2044,8 @@ async function processPurchasePay( const newTxState = computePayMerchantTransactionState(p); await tx.purchases.put(p); return { oldTxState, newTxState }; - }); + }, + ); const transactionId = constructTransactionIdentifier({ tag: TransactionType.Payment, proposalId, @@ -2213,9 +2194,9 @@ export async function refuseProposal( tag: TransactionType.Payment, proposalId, }); - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const proposal = await tx.purchases.get(proposalId); if (!proposal) { logger.trace(`proposal ${proposalId} not found, won't refuse proposal`); @@ -2232,7 +2213,8 @@ export async function refuseProposal( const newTxState = computePayMerchantTransactionState(proposal); await tx.purchases.put(proposal); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); } @@ -2476,36 +2458,34 @@ export async function sharePayment( merchantBaseUrl: string, orderId: string, ): Promise { - const result = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { - const p = await tx.purchases.indexes.byUrlAndOrderId.get([ - merchantBaseUrl, - orderId, - ]); - if (!p) { - logger.warn("purchase does not exist anymore"); - return undefined; - } - if ( - p.purchaseStatus !== PurchaseStatus.DialogProposed && - p.purchaseStatus !== PurchaseStatus.DialogShared - ) { - // FIXME: purchase can be shared before being paid - return undefined; - } - if (p.purchaseStatus === PurchaseStatus.DialogProposed) { - p.purchaseStatus = PurchaseStatus.DialogShared; - p.shared = true; - tx.purchases.put(p); - } + const result = await ws.db.runReadWriteTx(["purchases"], async (tx) => { + const p = await tx.purchases.indexes.byUrlAndOrderId.get([ + merchantBaseUrl, + orderId, + ]); + if (!p) { + logger.warn("purchase does not exist anymore"); + return undefined; + } + if ( + p.purchaseStatus !== PurchaseStatus.DialogProposed && + p.purchaseStatus !== PurchaseStatus.DialogShared + ) { + // FIXME: purchase can be shared before being paid + return undefined; + } + if (p.purchaseStatus === PurchaseStatus.DialogProposed) { + p.purchaseStatus = PurchaseStatus.DialogShared; + p.shared = true; + tx.purchases.put(p); + } - return { - nonce: p.noncePriv, - session: p.lastSessionId ?? p.downloadSessionId, - token: p.claimToken, - }; - }); + return { + nonce: p.noncePriv, + session: p.lastSessionId ?? p.downloadSessionId, + token: p.claimToken, + }; + }); if (result === undefined) { throw Error("This purchase can't be shared"); @@ -2560,9 +2540,9 @@ async function processPurchaseDialogShared( const paid = await checkIfOrderIsAlreadyPaid(ws, download.contractData); if (paid) { - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const p = await tx.purchases.get(purchase.proposalId); if (!p) { logger.warn("purchase does not exist anymore"); @@ -2573,7 +2553,8 @@ async function processPurchaseDialogShared( const newTxState = computePayMerchantTransactionState(p); await tx.purchases.put(p); return { oldTxState, newTxState }; - }); + }, + ); const transactionId = constructTransactionIdentifier({ tag: TransactionType.Payment, proposalId, @@ -2612,9 +2593,9 @@ async function processPurchaseAutoRefund( ), ) ) { - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const p = await tx.purchases.get(purchase.proposalId); if (!p) { logger.warn("purchase does not exist anymore"); @@ -2629,7 +2610,8 @@ async function processPurchaseAutoRefund( const newTxState = computePayMerchantTransactionState(p); await tx.purchases.put(p); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); return TaskRunResult.finished(); } @@ -2656,9 +2638,9 @@ async function processPurchaseAutoRefund( ); if (orderStatus.refund_pending) { - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const p = await tx.purchases.get(purchase.proposalId); if (!p) { logger.warn("purchase does not exist anymore"); @@ -2672,7 +2654,8 @@ async function processPurchaseAutoRefund( const newTxState = computePayMerchantTransactionState(p); await tx.purchases.put(p); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); } @@ -2699,22 +2682,18 @@ async function processPurchaseAbortingRefund( throw Error("can't abort, no coins selected"); } - await ws.db - .mktx((x) => [x.coins]) - .runReadOnly(async (tx) => { - for (let i = 0; i < payCoinSelection.coinPubs.length; i++) { - const coinPub = payCoinSelection.coinPubs[i]; - const coin = await tx.coins.get(coinPub); - checkDbInvariant(!!coin, "expected coin to be present"); - abortingCoins.push({ - coin_pub: coinPub, - contribution: Amounts.stringify( - payCoinSelection.coinContributions[i], - ), - exchange_url: coin.exchangeBaseUrl, - }); - } - }); + await ws.db.runReadOnlyTx(["coins"], async (tx) => { + for (let i = 0; i < payCoinSelection.coinPubs.length; i++) { + const coinPub = payCoinSelection.coinPubs[i]; + const coin = await tx.coins.get(coinPub); + checkDbInvariant(!!coin, "expected coin to be present"); + abortingCoins.push({ + coin_pub: coinPub, + contribution: Amounts.stringify(payCoinSelection.coinContributions[i]), + exchange_url: coin.exchangeBaseUrl, + }); + } + }); const abortReq: AbortRequest = { h_contract: download.contractData.contractTermsHash, @@ -2805,9 +2784,9 @@ async function processPurchaseQueryRefund( }); if (!orderStatus.refund_pending) { - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const p = await tx.purchases.get(purchase.proposalId); if (!p) { logger.warn("purchase does not exist anymore"); @@ -2822,7 +2801,8 @@ async function processPurchaseQueryRefund( const newTxState = computePayMerchantTransactionState(p); await tx.purchases.put(p); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); return TaskRunResult.progress(); } else { @@ -2831,9 +2811,9 @@ async function processPurchaseQueryRefund( Amounts.parseOrThrow(orderStatus.refund_taken), ).amount; - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const p = await tx.purchases.get(purchase.proposalId); if (!p) { logger.warn("purchase does not exist anymore"); @@ -2848,7 +2828,8 @@ async function processPurchaseQueryRefund( const newTxState = computePayMerchantTransactionState(p); await tx.purchases.put(p); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); return TaskRunResult.progress(); } @@ -2897,14 +2878,15 @@ export async function startRefundQueryForUri( if (parsedUri.type !== TalerUriAction.Refund) { throw Error("expected taler://refund URI"); } - const purchaseRecord = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { + const purchaseRecord = await ws.db.runReadOnlyTx( + ["purchases"], + async (tx) => { return tx.purchases.indexes.byUrlAndOrderId.get([ parsedUri.merchantBaseUrl, parsedUri.orderId, ]); - }); + }, + ); if (!purchaseRecord) { logger.error( `no purchase for order ID "${parsedUri.orderId}" from merchant "${parsedUri.merchantBaseUrl}" when processing "${talerUri}"`, @@ -2927,9 +2909,9 @@ export async function startQueryRefund( proposalId: string, ): Promise { const ctx = new PayMerchantTransactionContext(ws, proposalId); - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const p = await tx.purchases.get(proposalId); if (!p) { logger.warn(`purchase ${proposalId} does not exist anymore`); @@ -2943,14 +2925,15 @@ export async function startQueryRefund( const newTxState = computePayMerchantTransactionState(p); await tx.purchases.put(p); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, ctx.transactionId, transitionInfo); ws.taskScheduler.startShepherdTask(ctx.taskId); } async function computeRefreshRequest( ws: InternalWalletState, - tx: WalletDbReadWriteTransactionArr<["coins", "denominations"]>, + tx: WalletDbReadWriteTransaction<["coins", "denominations"]>, items: RefundItemRecord[], ): Promise { const refreshCoins: CoinRefreshRequest[] = []; diff --git a/packages/taler-wallet-core/src/operations/pay-peer-common.ts b/packages/taler-wallet-core/src/operations/pay-peer-common.ts index 88eedb530..ae6f98ccd 100644 --- a/packages/taler-wallet-core/src/operations/pay-peer-common.ts +++ b/packages/taler-wallet-core/src/operations/pay-peer-common.ts @@ -30,11 +30,7 @@ import { codecOptional, } from "@gnu-taler/taler-util"; import { SpendCoinDetails } from "../crypto/cryptoImplementation.js"; -import { - DenominationRecord, - PeerPushPaymentCoinSelection, - ReserveRecord, -} from "../db.js"; +import { PeerPushPaymentCoinSelection, ReserveRecord } from "../db.js"; import { InternalWalletState } from "../internal-wallet-state.js"; import type { SelectedPeerCoin } from "../util/coinSelection.js"; import { checkDbInvariant } from "../util/invariants.js"; @@ -51,33 +47,31 @@ export async function queryCoinInfosForSelection( csel: PeerPushPaymentCoinSelection, ): Promise { let infos: SpendCoinDetails[] = []; - await ws.db - .mktx((x) => [x.coins, x.denominations]) - .runReadOnly(async (tx) => { - for (let i = 0; i < csel.coinPubs.length; i++) { - const coin = await tx.coins.get(csel.coinPubs[i]); - if (!coin) { - throw Error("coin not found anymore"); - } - const denom = await ws.getDenomInfo( - ws, - tx, - coin.exchangeBaseUrl, - coin.denomPubHash, - ); - if (!denom) { - throw Error("denom for coin not found anymore"); - } - infos.push({ - coinPriv: coin.coinPriv, - coinPub: coin.coinPub, - denomPubHash: coin.denomPubHash, - denomSig: coin.denomSig, - ageCommitmentProof: coin.ageCommitmentProof, - contribution: csel.contributions[i], - }); + await ws.db.runReadOnlyTx(["coins", "denominations"], async (tx) => { + for (let i = 0; i < csel.coinPubs.length; i++) { + const coin = await tx.coins.get(csel.coinPubs[i]); + if (!coin) { + throw Error("coin not found anymore"); + } + const denom = await ws.getDenomInfo( + ws, + tx, + coin.exchangeBaseUrl, + coin.denomPubHash, + ); + if (!denom) { + throw Error("denom for coin not found anymore"); } - }); + infos.push({ + coinPriv: coin.coinPriv, + coinPub: coin.coinPub, + denomPubHash: coin.denomPubHash, + denomSig: coin.denomSig, + ageCommitmentProof: coin.ageCommitmentProof, + contribution: csel.contributions[i], + }); + } + }); return infos; } @@ -86,48 +80,46 @@ export async function getTotalPeerPaymentCost( pcs: SelectedPeerCoin[], ): Promise { const currency = Amounts.currencyOf(pcs[0].contribution); - return ws.db - .mktx((x) => [x.coins, x.denominations]) - .runReadOnly(async (tx) => { - const costs: AmountJson[] = []; - for (let i = 0; i < pcs.length; i++) { - const coin = await tx.coins.get(pcs[i].coinPub); - if (!coin) { - throw Error("can't calculate payment cost, coin not found"); - } - const denomInfo = await ws.getDenomInfo( - ws, - tx, - coin.exchangeBaseUrl, - coin.denomPubHash, - ); - if (!denomInfo) { - throw Error( - "can't calculate payment cost, denomination for coin not found", - ); - } - const allDenoms = await getCandidateWithdrawalDenomsTx( - ws, - tx, - coin.exchangeBaseUrl, - currency, - ); - const amountLeft = Amounts.sub( - denomInfo.value, - pcs[i].contribution, - ).amount; - const refreshCost = getTotalRefreshCost( - allDenoms, - denomInfo, - amountLeft, - ws.config.testing.denomselAllowLate, + return ws.db.runReadOnlyTx(["coins", "denominations"], async (tx) => { + const costs: AmountJson[] = []; + for (let i = 0; i < pcs.length; i++) { + const coin = await tx.coins.get(pcs[i].coinPub); + if (!coin) { + throw Error("can't calculate payment cost, coin not found"); + } + const denomInfo = await ws.getDenomInfo( + ws, + tx, + coin.exchangeBaseUrl, + coin.denomPubHash, + ); + if (!denomInfo) { + throw Error( + "can't calculate payment cost, denomination for coin not found", ); - costs.push(Amounts.parseOrThrow(pcs[i].contribution)); - costs.push(refreshCost); } - const zero = Amounts.zeroOfAmount(pcs[0].contribution); - return Amounts.sum([zero, ...costs]).amount; - }); + const allDenoms = await getCandidateWithdrawalDenomsTx( + ws, + tx, + coin.exchangeBaseUrl, + currency, + ); + const amountLeft = Amounts.sub( + denomInfo.value, + pcs[i].contribution, + ).amount; + const refreshCost = getTotalRefreshCost( + allDenoms, + denomInfo, + amountLeft, + ws.config.testing.denomselAllowLate, + ); + costs.push(Amounts.parseOrThrow(pcs[i].contribution)); + costs.push(refreshCost); + } + const zero = Amounts.zeroOfAmount(pcs[0].contribution); + return Amounts.sum([zero, ...costs]).amount; + }); } interface ExchangePurseStatus { @@ -153,9 +145,9 @@ export async function getMergeReserveInfo( // due to the async crypto API. const newReservePair = await ws.cryptoApi.createEddsaKeypair({}); - const mergeReserveRecord: ReserveRecord = await ws.db - .mktx((x) => [x.exchanges, x.reserves, x.withdrawalGroups]) - .runReadWrite(async (tx) => { + const mergeReserveRecord: ReserveRecord = await ws.db.runReadWriteTx( + ["exchanges", "reserves"], + async (tx) => { const ex = await tx.exchanges.get(req.exchangeBaseUrl); checkDbInvariant(!!ex); if (ex.currentMergeReserveRowId != null) { @@ -173,7 +165,8 @@ export async function getMergeReserveInfo( ex.currentMergeReserveRowId = reserve.rowId; await tx.exchanges.put(ex); return reserve; - }); + }, + ); return mergeReserveRecord; } diff --git a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts index cc41abde9..e97466084 100644 --- a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts +++ b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts @@ -108,9 +108,9 @@ export class PeerPullCreditTransactionContext implements TransactionContext { async deleteTransaction(): Promise { const { ws, pursePub } = this; - await ws.db - .mktx((x) => [x.withdrawalGroups, x.peerPullCredit, x.tombstones]) - .runReadWrite(async (tx) => { + await ws.db.runReadWriteTx( + ["withdrawalGroups", "peerPullCredit", "tombstones"], + async (tx) => { const pullIni = await tx.peerPullCredit.get(pursePub); if (!pullIni) { return; @@ -130,16 +130,17 @@ export class PeerPullCreditTransactionContext implements TransactionContext { await tx.tombstones.put({ id: TombstoneTag.DeletePeerPullCredit + ":" + pursePub, }); - }); + }, + ); return; } async suspendTransaction(): Promise { const { ws, pursePub, retryTag, transactionId } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.peerPullCredit]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["peerPullCredit"], + async (tx) => { const pullCreditRec = await tx.peerPullCredit.get(pursePub); if (!pullCreditRec) { logger.warn(`peer pull credit ${pursePub} not found`); @@ -189,16 +190,17 @@ export class PeerPullCreditTransactionContext implements TransactionContext { }; } return undefined; - }); + }, + ); ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); } async failTransaction(): Promise { const { ws, pursePub, retryTag, transactionId } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.peerPullCredit]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["peerPullCredit"], + async (tx) => { const pullCreditRec = await tx.peerPullCredit.get(pursePub); if (!pullCreditRec) { logger.warn(`peer pull credit ${pursePub} not found`); @@ -239,16 +241,17 @@ export class PeerPullCreditTransactionContext implements TransactionContext { }; } return undefined; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); ws.taskScheduler.stopShepherdTask(retryTag); } async resumeTransaction(): Promise { const { ws, pursePub, retryTag, transactionId } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.peerPullCredit]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["peerPullCredit"], + async (tx) => { const pullCreditRec = await tx.peerPullCredit.get(pursePub); if (!pullCreditRec) { logger.warn(`peer pull credit ${pursePub} not found`); @@ -297,16 +300,17 @@ export class PeerPullCreditTransactionContext implements TransactionContext { }; } return undefined; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); ws.taskScheduler.startShepherdTask(retryTag); } async abortTransaction(): Promise { const { ws, pursePub, retryTag, transactionId } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.peerPullCredit]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["peerPullCredit"], + async (tx) => { const pullCreditRec = await tx.peerPullCredit.get(pursePub); if (!pullCreditRec) { logger.warn(`peer pull credit ${pursePub} not found`); @@ -350,7 +354,8 @@ export class PeerPullCreditTransactionContext implements TransactionContext { }; } return undefined; - }); + }, + ); ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); ws.taskScheduler.startShepherdTask(retryTag); @@ -382,9 +387,9 @@ async function queryPurseForPeerPullCredit( switch (resp.status) { case HttpStatusCode.Gone: { // Exchange says that purse doesn't exist anymore => expired! - const transitionInfo = await ws.db - .mktx((x) => [x.peerPullCredit]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["peerPullCredit"], + async (tx) => { const finPi = await tx.peerPullCredit.get(pullIni.pursePub); if (!finPi) { logger.warn("peerPullCredit not found anymore"); @@ -397,7 +402,8 @@ async function queryPurseForPeerPullCredit( await tx.peerPullCredit.put(finPi); const newTxState = computePeerPullCreditTransactionState(finPi); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); return TaskRunResult.backoff(); } @@ -419,11 +425,9 @@ async function queryPurseForPeerPullCredit( return TaskRunResult.backoff(); } - const reserve = await ws.db - .mktx((x) => [x.reserves]) - .runReadOnly(async (tx) => { - return await tx.reserves.get(pullIni.mergeReserveRowId); - }); + const reserve = await ws.db.runReadOnlyTx(["reserves"], async (tx) => { + return await tx.reserves.get(pullIni.mergeReserveRowId); + }); if (!reserve) { throw Error("reserve for peer pull credit not found in wallet DB"); @@ -443,9 +447,9 @@ async function queryPurseForPeerPullCredit( pub: reserve.reservePub, }, }); - const transitionInfo = await ws.db - .mktx((x) => [x.peerPullCredit]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["peerPullCredit"], + async (tx) => { const finPi = await tx.peerPullCredit.get(pullIni.pursePub); if (!finPi) { logger.warn("peerPullCredit not found anymore"); @@ -458,7 +462,8 @@ async function queryPurseForPeerPullCredit( await tx.peerPullCredit.put(finPi); const newTxState = computePeerPullCreditTransactionState(finPi); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); return TaskRunResult.backoff(); } @@ -496,9 +501,9 @@ async function longpollKycStatus( // remove after the exchange is fixed or clarified kycStatusRes.status === HttpStatusCode.NoContent ) { - const transitionInfo = await ws.db - .mktx((x) => [x.peerPullCredit]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["peerPullCredit"], + async (tx) => { const peerIni = await tx.peerPullCredit.get(pursePub); if (!peerIni) { return; @@ -513,7 +518,8 @@ async function longpollKycStatus( const newTxState = computePeerPullCreditTransactionState(peerIni); await tx.peerPullCredit.put(peerIni); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); } else if (kycStatusRes.status === HttpStatusCode.Accepted) { // FIXME: Do we have to update the URL here? @@ -545,15 +551,15 @@ async function processPeerPullCreditAbortingDeletePurse( }); logger.info(`deleted purse with response status ${resp.status}`); - const transitionInfo = await ws.db - .mktx((x) => [ - x.peerPullCredit, - x.refreshGroups, - x.denominations, - x.coinAvailability, - x.coins, - ]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + [ + "peerPullCredit", + "refreshGroups", + "denominations", + "coinAvailability", + "coins", + ], + async (tx) => { const ppiRec = await tx.peerPullCredit.get(pursePub); if (!ppiRec) { return undefined; @@ -569,7 +575,8 @@ async function processPeerPullCreditAbortingDeletePurse( oldTxState, newTxState, }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); return TaskRunResult.backoff(); @@ -588,9 +595,9 @@ async function handlePeerPullCreditWithdrawing( }); const wgId = pullIni.withdrawalGroupId; let finished: boolean = false; - const transitionInfo = await ws.db - .mktx((x) => [x.peerPullCredit, x.withdrawalGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["peerPullCredit", "withdrawalGroups"], + async (tx) => { const ppi = await tx.peerPullCredit.get(pullIni.pursePub); if (!ppi) { finished = true; @@ -619,7 +626,8 @@ async function handlePeerPullCreditWithdrawing( oldTxState, newTxState, }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); if (finished) { return TaskRunResult.finished(); @@ -635,21 +643,20 @@ async function handlePeerPullCreditCreatePurse( ): Promise { const purseFee = Amounts.stringify(Amounts.zeroOfAmount(pullIni.amount)); const pursePub = pullIni.pursePub; - const mergeReserve = await ws.db - .mktx((x) => [x.reserves]) - .runReadOnly(async (tx) => { - return tx.reserves.get(pullIni.mergeReserveRowId); - }); + const mergeReserve = await ws.db.runReadOnlyTx(["reserves"], async (tx) => { + return tx.reserves.get(pullIni.mergeReserveRowId); + }); if (!mergeReserve) { throw Error("merge reserve for peer pull payment not found in database"); } - const contractTermsRecord = await ws.db - .mktx((x) => [x.contractTerms]) - .runReadOnly(async (tx) => { + const contractTermsRecord = await ws.db.runReadOnlyTx( + ["contractTerms"], + async (tx) => { return tx.contractTerms.get(pullIni.contractTermsHash); - }); + }, + ); if (!contractTermsRecord) { throw Error("contract terms for peer pull payment not found in database"); @@ -731,9 +738,9 @@ async function handlePeerPullCreditCreatePurse( pursePub: pullIni.pursePub, }); - const transitionInfo = await ws.db - .mktx((x) => [x.peerPullCredit]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["peerPullCredit"], + async (tx) => { const pi2 = await tx.peerPullCredit.get(pursePub); if (!pi2) { return; @@ -743,7 +750,8 @@ async function handlePeerPullCreditCreatePurse( await tx.peerPullCredit.put(pi2); const newTxState = computePeerPullCreditTransactionState(pi2); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); return TaskRunResult.backoff(); } @@ -753,11 +761,9 @@ export async function processPeerPullCredit( pursePub: string, cancellationToken: CancellationToken, ): Promise { - const pullIni = await ws.db - .mktx((x) => [x.peerPullCredit]) - .runReadOnly(async (tx) => { - return tx.peerPullCredit.get(pursePub); - }); + const pullIni = await ws.db.runReadOnlyTx(["peerPullCredit"], async (tx) => { + return tx.peerPullCredit.get(pursePub); + }); if (!pullIni) { throw Error("peer pull payment initiation not found in database"); } @@ -843,9 +849,9 @@ async function processPeerPullCreditKycRequired( } else if (kycStatusRes.status === HttpStatusCode.Accepted) { const kycStatus = await kycStatusRes.json(); logger.info(`kyc status: ${j2s(kycStatus)}`); - const { transitionInfo, result } = await ws.db - .mktx((x) => [x.peerPullCredit]) - .runReadWrite(async (tx) => { + const { transitionInfo, result } = await ws.db.runReadWriteTx( + ["peerPullCredit"], + async (tx) => { const peerInc = await tx.peerPullCredit.get(pursePub); if (!peerInc) { return { @@ -877,7 +883,8 @@ async function processPeerPullCreditKycRequired( transitionInfo: { oldTxState, newTxState }, result: res, }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); return TaskRunResult.backoff(); } else { @@ -943,44 +950,42 @@ async function getPreferredExchangeForCurrency( ): Promise { // Find an exchange with the matching currency. // Prefer exchanges with the most recent withdrawal. - const url = await ws.db - .mktx((x) => [x.exchanges]) - .runReadOnly(async (tx) => { - const exchanges = await tx.exchanges.iter().toArray(); - let candidate = undefined; - for (const e of exchanges) { - if (e.detailsPointer?.currency !== currency) { - continue; - } - if (!candidate) { + const url = await ws.db.runReadOnlyTx(["exchanges"], async (tx) => { + const exchanges = await tx.exchanges.iter().toArray(); + let candidate = undefined; + for (const e of exchanges) { + if (e.detailsPointer?.currency !== currency) { + continue; + } + if (!candidate) { + candidate = e; + continue; + } + if (candidate.lastWithdrawal && !e.lastWithdrawal) { + continue; + } + const exchangeLastWithdrawal = timestampOptionalPreciseFromDb( + e.lastWithdrawal, + ); + const candidateLastWithdrawal = timestampOptionalPreciseFromDb( + candidate.lastWithdrawal, + ); + if (exchangeLastWithdrawal && candidateLastWithdrawal) { + if ( + AbsoluteTime.cmp( + AbsoluteTime.fromPreciseTimestamp(exchangeLastWithdrawal), + AbsoluteTime.fromPreciseTimestamp(candidateLastWithdrawal), + ) > 0 + ) { candidate = e; - continue; - } - if (candidate.lastWithdrawal && !e.lastWithdrawal) { - continue; - } - const exchangeLastWithdrawal = timestampOptionalPreciseFromDb( - e.lastWithdrawal, - ); - const candidateLastWithdrawal = timestampOptionalPreciseFromDb( - candidate.lastWithdrawal, - ); - if (exchangeLastWithdrawal && candidateLastWithdrawal) { - if ( - AbsoluteTime.cmp( - AbsoluteTime.fromPreciseTimestamp(exchangeLastWithdrawal), - AbsoluteTime.fromPreciseTimestamp(candidateLastWithdrawal), - ) > 0 - ) { - candidate = e; - } } } - if (candidate) { - return candidate.baseUrl; - } - return undefined; - }); + } + if (candidate) { + return candidate.baseUrl; + } + return undefined; + }); return url; } @@ -1036,9 +1041,9 @@ export async function initiatePeerPullPayment( const mergeTimestamp = TalerPreciseTimestamp.now(); - const transitionInfo = await ws.db - .mktx((x) => [x.peerPullCredit, x.contractTerms]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["peerPullCredit", "contractTerms"], + async (tx) => { const ppi: PeerPullCreditRecord = { amount: req.partialContractTerms.amount, contractTermsHash: hContractTerms, @@ -1066,7 +1071,8 @@ export async function initiatePeerPullPayment( h: hContractTerms, }); return { oldTxState, newTxState }; - }); + }, + ); const ctx = new PeerPullCreditTransactionContext(ws, pursePair.pub); diff --git a/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts index e5ae6b73b..1504f3d83 100644 --- a/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts +++ b/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts @@ -63,7 +63,7 @@ import { readTalerErrorResponse, } from "@gnu-taler/taler-util/http"; import { - DbReadWriteTransactionArr, + DbReadWriteTransaction, InternalWalletState, PeerPullDebitRecordStatus, PeerPullPaymentIncomingRecord, @@ -125,15 +125,13 @@ export class PeerPullDebitTransactionContext implements TransactionContext { const transactionId = this.transactionId; const ws = this.ws; const peerPullDebitId = this.peerPullDebitId; - await ws.db - .mktx((x) => [x.peerPullDebit, x.tombstones]) - .runReadWrite(async (tx) => { - const debit = await tx.peerPullDebit.get(peerPullDebitId); - if (debit) { - await tx.peerPullDebit.delete(peerPullDebitId); - await tx.tombstones.put({ id: transactionId }); - } - }); + await ws.db.runReadWriteTx(["peerPullDebit", "tombstones"], async (tx) => { + const debit = await tx.peerPullDebit.get(peerPullDebitId); + if (debit) { + await tx.peerPullDebit.delete(peerPullDebitId); + await tx.tombstones.put({ id: transactionId }); + } + }); } async suspendTransaction(): Promise { @@ -141,9 +139,9 @@ export class PeerPullDebitTransactionContext implements TransactionContext { const transactionId = this.transactionId; const ws = this.ws; const peerPullDebitId = this.peerPullDebitId; - const transitionInfo = await ws.db - .mktx((x) => [x.peerPullDebit]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["peerPullDebit"], + async (tx) => { const pullDebitRec = await tx.peerPullDebit.get(peerPullDebitId); if (!pullDebitRec) { logger.warn(`peer pull debit ${peerPullDebitId} not found`); @@ -183,7 +181,8 @@ export class PeerPullDebitTransactionContext implements TransactionContext { }; } return undefined; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); ws.taskScheduler.stopShepherdTask(taskId); } @@ -295,7 +294,7 @@ export class PeerPullDebitTransactionContext implements TransactionContext { opts: { extraStores: StoreNameArray }, f: ( rec: PeerPullPaymentIncomingRecord, - tx: DbReadWriteTransactionArr< + tx: DbReadWriteTransaction< typeof WalletStoresV1, ["peerPullDebit", ...StoreNameArray] >, @@ -498,9 +497,9 @@ async function processPeerPullDebitAbortingRefresh( tag: TransactionType.PeerPullDebit, peerPullDebitId, }); - const transitionInfo = await ws.db - .mktx((x) => [x.refreshGroups, x.peerPullDebit]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["peerPullDebit", "refreshGroups"], + async (tx) => { const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId); let newOpState: PeerPullDebitRecordStatus | undefined; if (!refreshGroup) { @@ -529,7 +528,8 @@ async function processPeerPullDebitAbortingRefresh( return { oldTxState, newTxState }; } return undefined; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); // FIXME: Shouldn't this be finished in some cases?! return TaskRunResult.backoff(); @@ -539,11 +539,12 @@ export async function processPeerPullDebit( ws: InternalWalletState, peerPullDebitId: string, ): Promise { - const peerPullInc = await ws.db - .mktx((x) => [x.peerPullDebit]) - .runReadOnly(async (tx) => { + const peerPullInc = await ws.db.runReadOnlyTx( + ["peerPullDebit"], + async (tx) => { return tx.peerPullDebit.get(peerPullDebitId); - }); + }, + ); if (!peerPullInc) { throw Error("peer pull debit not found"); } @@ -575,11 +576,12 @@ export async function confirmPeerPullDebit( throw Error("invalid request, transactionId or peerPullDebitId required"); } - const peerPullInc = await ws.db - .mktx((x) => [x.peerPullDebit]) - .runReadOnly(async (tx) => { + const peerPullInc = await ws.db.runReadOnlyTx( + ["peerPullDebit"], + async (tx) => { return tx.peerPullDebit.get(peerPullDebitId); - }); + }, + ); if (!peerPullInc) { throw Error( @@ -610,16 +612,16 @@ export async function confirmPeerPullDebit( coinSelRes.result.coins, ); - await ws.db - .mktx((x) => [ - x.exchanges, - x.coins, - x.denominations, - x.refreshGroups, - x.peerPullDebit, - x.coinAvailability, - ]) - .runReadWrite(async (tx) => { + await ws.db.runReadWriteTx( + [ + "exchanges", + "coins", + "denominations", + "refreshGroups", + "peerPullDebit", + "coinAvailability", + ], + async (tx) => { await spendCoins(ws, tx, { // allocationId: `txn:peer-pull-debit:${req.peerPullDebitId}`, allocationId: constructTransactionIdentifier({ @@ -646,7 +648,8 @@ export async function confirmPeerPullDebit( }; } await tx.peerPullDebit.put(pi); - }); + }, + ); const ctx = new PeerPullDebitTransactionContext(ws, peerPullDebitId); @@ -678,9 +681,9 @@ export async function preparePeerPullDebit( throw Error("got invalid taler://pay-pull URI"); } - const existing = await ws.db - .mktx((x) => [x.peerPullDebit, x.contractTerms]) - .runReadOnly(async (tx) => { + const existing = await ws.db.runReadOnlyTx( + ["peerPullDebit", "contractTerms"], + async (tx) => { const peerPullDebitRecord = await tx.peerPullDebit.indexes.byExchangeAndContractPriv.get([ uri.exchangeBaseUrl, @@ -696,7 +699,8 @@ export async function preparePeerPullDebit( return; } return { peerPullDebitRecord, contractTerms }; - }); + }, + ); if (existing) { return { @@ -780,25 +784,23 @@ export async function preparePeerPullDebit( coinSelRes.result.coins, ); - await ws.db - .mktx((x) => [x.peerPullDebit, x.contractTerms]) - .runReadWrite(async (tx) => { - await tx.contractTerms.put({ - h: contractTermsHash, - contractTermsRaw: contractTerms, - }), - await tx.peerPullDebit.add({ - peerPullDebitId, - contractPriv: contractPriv, - exchangeBaseUrl: exchangeBaseUrl, - pursePub: pursePub, - timestampCreated: timestampPreciseToDb(TalerPreciseTimestamp.now()), - contractTermsHash, - amount: contractTerms.amount, - status: PeerPullDebitRecordStatus.DialogProposed, - totalCostEstimated: Amounts.stringify(totalAmount), - }); - }); + await ws.db.runReadWriteTx(["peerPullDebit", "contractTerms"], async (tx) => { + await tx.contractTerms.put({ + h: contractTermsHash, + contractTermsRaw: contractTerms, + }), + await tx.peerPullDebit.add({ + peerPullDebitId, + contractPriv: contractPriv, + exchangeBaseUrl: exchangeBaseUrl, + pursePub: pursePub, + timestampCreated: timestampPreciseToDb(TalerPreciseTimestamp.now()), + contractTermsHash, + amount: contractTerms.amount, + status: PeerPullDebitRecordStatus.DialogProposed, + totalCostEstimated: Amounts.stringify(totalAmount), + }); + }); return { amount: contractTerms.amount, diff --git a/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts b/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts index 23976f11b..412631356 100644 --- a/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts +++ b/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts @@ -363,9 +363,9 @@ export async function preparePeerPushCredit( throw Error("got invalid taler://pay-push URI"); } - const existing = await ws.db - .mktx((x) => [x.contractTerms, x.peerPushCredit]) - .runReadOnly(async (tx) => { + const existing = await ws.db.runReadOnlyTx( + ["contractTerms", "peerPushCredit"], + async (tx) => { const existingPushInc = await tx.peerPushCredit.indexes.byExchangeAndContractPriv.get([ uri.exchangeBaseUrl, @@ -386,7 +386,8 @@ export async function preparePeerPushCredit( existingContractTermsRec.contractTermsRaw, ); return { existingPushInc, existingContractTerms }; - }); + }, + ); if (existing) { return { @@ -457,9 +458,9 @@ export async function preparePeerPushCredit( undefined, ); - const transitionInfo = await ws.db - .mktx((x) => [x.contractTerms, x.peerPushCredit]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["contractTerms", "peerPushCredit"], + async (tx) => { const rec: PeerPushPaymentIncomingRecord = { peerPushCreditId, contractPriv: contractPriv, @@ -489,7 +490,8 @@ export async function preparePeerPushCredit( }, newTxState, } satisfies TransitionInfo; - }); + }, + ); const transactionId = constructTransactionIdentifier({ tag: TransactionType.PeerPushCredit, @@ -542,9 +544,9 @@ async function longpollKycStatus( // remove after the exchange is fixed or clarified kycStatusRes.status === HttpStatusCode.NoContent ) { - const transitionInfo = await ws.db - .mktx((x) => [x.peerPushCredit]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["peerPushCredit"], + async (tx) => { const peerInc = await tx.peerPushCredit.get(peerPushCreditId); if (!peerInc) { return; @@ -557,7 +559,8 @@ async function longpollKycStatus( const newTxState = computePeerPushCreditTransactionState(peerInc); await tx.peerPushCredit.put(peerInc); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); } else if (kycStatusRes.status === HttpStatusCode.Accepted) { // FIXME: Do we have to update the URL here? @@ -600,9 +603,9 @@ async function processPeerPushCreditKycRequired( } else if (kycStatusRes.status === HttpStatusCode.Accepted) { const kycStatus = await kycStatusRes.json(); logger.info(`kyc status: ${j2s(kycStatus)}`); - const { transitionInfo, result } = await ws.db - .mktx((x) => [x.peerPushCredit]) - .runReadWrite(async (tx) => { + const { transitionInfo, result } = await ws.db.runReadWriteTx( + ["peerPushCredit"], + async (tx) => { const peerInc = await tx.peerPushCredit.get(peerPushCreditId); if (!peerInc) { return { @@ -634,7 +637,8 @@ async function processPeerPushCreditKycRequired( transitionInfo: { oldTxState, newTxState }, result: res, }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); return result; } else { @@ -724,16 +728,16 @@ async function handlePendingMerge( }, }); - const txRes = await ws.db - .mktx((x) => [ - x.contractTerms, - x.peerPushCredit, - x.withdrawalGroups, - x.reserves, - x.exchanges, - x.exchangeDetails, - ]) - .runReadWrite(async (tx) => { + const txRes = await ws.db.runReadWriteTx( + [ + "contractTerms", + "peerPushCredit", + "withdrawalGroups", + "reserves", + "exchanges", + "exchangeDetails", + ], + async (tx) => { const peerInc = await tx.peerPushCredit.get(peerPushCreditId); if (!peerInc) { return undefined; @@ -761,7 +765,8 @@ async function handlePendingMerge( peerPushCreditTransition: { oldTxState, newTxState }, wgCreateRes, }; - }); + }, + ); // Transaction was committed, now we can emit notifications. if (txRes?.wgCreateRes?.exchangeNotif) { ws.notify(txRes.wgCreateRes.exchangeNotif); @@ -789,9 +794,9 @@ async function handlePendingWithdrawing( }); const wgId = peerInc.withdrawalGroupId; let finished: boolean = false; - const transitionInfo = await ws.db - .mktx((x) => [x.peerPushCredit, x.withdrawalGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["peerPushCredit", "withdrawalGroups"], + async (tx) => { const ppi = await tx.peerPushCredit.get(peerInc.peerPushCreditId); if (!ppi) { finished = true; @@ -820,7 +825,8 @@ async function handlePendingWithdrawing( oldTxState, newTxState, }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); if (finished) { return TaskRunResult.finished(); @@ -837,9 +843,9 @@ export async function processPeerPushCredit( ): Promise { let peerInc: PeerPushPaymentIncomingRecord | undefined; let contractTerms: PeerContractTerms | undefined; - await ws.db - .mktx((x) => [x.contractTerms, x.peerPushCredit]) - .runReadWrite(async (tx) => { + await ws.db.runReadWriteTx( + ["contractTerms", "peerPushCredit"], + async (tx) => { peerInc = await tx.peerPushCredit.get(peerPushCreditId); if (!peerInc) { return; @@ -849,7 +855,8 @@ export async function processPeerPushCredit( contractTerms = ctRec.contractTermsRaw; } await tx.peerPushCredit.put(peerInc); - }); + }, + ); if (!peerInc) { throw Error( @@ -910,9 +917,9 @@ export async function confirmPeerPushCredit( throw Error("no transaction ID (or deprecated peerPushCreditId) provided"); } - await ws.db - .mktx((x) => [x.contractTerms, x.peerPushCredit]) - .runReadWrite(async (tx) => { + await ws.db.runReadWriteTx( + ["contractTerms", "peerPushCredit"], + async (tx) => { peerInc = await tx.peerPushCredit.get(peerPushCreditId); if (!peerInc) { return; @@ -921,7 +928,8 @@ export async function confirmPeerPushCredit( peerInc.status = PeerPushCreditStatus.PendingMerge; } await tx.peerPushCredit.put(peerInc); - }); + }, + ); if (!peerInc) { throw Error( diff --git a/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts index 165c8deee..91c5430be 100644 --- a/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts +++ b/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts @@ -101,22 +101,20 @@ export class PeerPushDebitTransactionContext implements TransactionContext { async deleteTransaction(): Promise { const { ws, pursePub, transactionId } = this; - await ws.db - .mktx((x) => [x.peerPushDebit, x.tombstones]) - .runReadWrite(async (tx) => { - const debit = await tx.peerPushDebit.get(pursePub); - if (debit) { - await tx.peerPushDebit.delete(pursePub); - await tx.tombstones.put({ id: transactionId }); - } - }); + await ws.db.runReadWriteTx(["peerPushDebit", "tombstones"], async (tx) => { + const debit = await tx.peerPushDebit.get(pursePub); + if (debit) { + await tx.peerPushDebit.delete(pursePub); + await tx.tombstones.put({ id: transactionId }); + } + }); } async suspendTransaction(): Promise { const { ws, pursePub, transactionId, retryTag } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.peerPushDebit]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["peerPushDebit"], + async (tx) => { const pushDebitRec = await tx.peerPushDebit.get(pursePub); if (!pushDebitRec) { logger.warn(`peer push debit ${pursePub} not found`); @@ -164,16 +162,17 @@ export class PeerPushDebitTransactionContext implements TransactionContext { }; } return undefined; - }); + }, + ); ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); } async abortTransaction(): Promise { const { ws, pursePub, transactionId, retryTag } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.peerPushDebit]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["peerPushDebit"], + async (tx) => { const pushDebitRec = await tx.peerPushDebit.get(pursePub); if (!pushDebitRec) { logger.warn(`peer push debit ${pursePub} not found`); @@ -216,7 +215,8 @@ export class PeerPushDebitTransactionContext implements TransactionContext { }; } return undefined; - }); + }, + ); ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); ws.taskScheduler.startShepherdTask(retryTag); @@ -224,9 +224,9 @@ export class PeerPushDebitTransactionContext implements TransactionContext { async resumeTransaction(): Promise { const { ws, pursePub, transactionId, retryTag } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.peerPushDebit]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["peerPushDebit"], + async (tx) => { const pushDebitRec = await tx.peerPushDebit.get(pursePub); if (!pushDebitRec) { logger.warn(`peer push debit ${pursePub} not found`); @@ -274,16 +274,17 @@ export class PeerPushDebitTransactionContext implements TransactionContext { }; } return undefined; - }); + }, + ); ws.taskScheduler.startShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); } async failTransaction(): Promise { const { ws, pursePub, transactionId, retryTag } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.peerPushDebit]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["peerPushDebit"], + async (tx) => { const pushDebitRec = await tx.peerPushDebit.get(pursePub); if (!pushDebitRec) { logger.warn(`peer push debit ${pursePub} not found`); @@ -326,7 +327,8 @@ export class PeerPushDebitTransactionContext implements TransactionContext { }; } return undefined; - }); + }, + ); ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); ws.taskScheduler.startShepherdTask(retryTag); @@ -411,28 +413,26 @@ async function handlePurseCreationConflict( ); } - await ws.db - .mktx((x) => [x.peerPushDebit]) - .runReadWrite(async (tx) => { - const myPpi = await tx.peerPushDebit.get(peerPushInitiation.pursePub); - if (!myPpi) { - return; - } - switch (myPpi.status) { - case PeerPushDebitStatus.PendingCreatePurse: - case PeerPushDebitStatus.SuspendedCreatePurse: { - const sel = coinSelRes.result; - myPpi.coinSel = { - coinPubs: sel.coins.map((x) => x.coinPub), - contributions: sel.coins.map((x) => x.contribution), - }; - break; - } - default: - return; + await ws.db.runReadWriteTx(["peerPushDebit"], async (tx) => { + const myPpi = await tx.peerPushDebit.get(peerPushInitiation.pursePub); + if (!myPpi) { + return; + } + switch (myPpi.status) { + case PeerPushDebitStatus.PendingCreatePurse: + case PeerPushDebitStatus.SuspendedCreatePurse: { + const sel = coinSelRes.result; + myPpi.coinSel = { + coinPubs: sel.coins.map((x) => x.coinPub), + contributions: sel.coins.map((x) => x.contribution), + }; + break; } - await tx.peerPushDebit.put(myPpi); - }); + default: + return; + } + await tx.peerPushDebit.put(myPpi); + }); return TaskRunResult.progress(); } @@ -448,11 +448,12 @@ async function processPeerPushDebitCreateReserve( logger.trace(`processing ${transactionId} pending(create-reserve)`); - const contractTermsRecord = await ws.db - .mktx((x) => [x.contractTerms]) - .runReadOnly(async (tx) => { + const contractTermsRecord = await ws.db.runReadOnlyTx( + ["contractTerms"], + async (tx) => { return tx.contractTerms.get(hContractTerms); - }); + }, + ); if (!contractTermsRecord) { throw Error( @@ -583,15 +584,15 @@ async function processPeerPushDebitAbortingDeletePurse( }); logger.info(`deleted purse with response status ${resp.status}`); - const transitionInfo = await ws.db - .mktx((x) => [ - x.peerPushDebit, - x.refreshGroups, - x.denominations, - x.coinAvailability, - x.coins, - ]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + [ + "peerPushDebit", + "refreshGroups", + "denominations", + "coinAvailability", + "coins", + ], + async (tx) => { const ppiRec = await tx.peerPushDebit.get(pursePub); if (!ppiRec) { return undefined; @@ -626,7 +627,8 @@ async function processPeerPushDebitAbortingDeletePurse( oldTxState, newTxState, }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); return TaskRunResult.backoff(); @@ -646,9 +648,9 @@ async function transitionPeerPushDebitTransaction( tag: TransactionType.PeerPushDebit, pursePub, }); - const transitionInfo = await ws.db - .mktx((x) => [x.peerPushDebit]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["peerPushDebit"], + async (tx) => { const ppiRec = await tx.peerPushDebit.get(pursePub); if (!ppiRec) { return undefined; @@ -664,7 +666,8 @@ async function transitionPeerPushDebitTransaction( oldTxState, newTxState, }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); } @@ -679,9 +682,9 @@ async function processPeerPushDebitAbortingRefreshDeleted( tag: TransactionType.PeerPushDebit, pursePub: peerPushInitiation.pursePub, }); - const transitionInfo = await ws.db - .mktx((x) => [x.refreshGroups, x.peerPushDebit]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["refreshGroups", "peerPushDebit"], + async (tx) => { const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId); let newOpState: PeerPushDebitStatus | undefined; if (!refreshGroup) { @@ -710,7 +713,8 @@ async function processPeerPushDebitAbortingRefreshDeleted( return { oldTxState, newTxState }; } return undefined; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); // FIXME: Shouldn't this be finished in some cases?! return TaskRunResult.backoff(); @@ -727,9 +731,9 @@ async function processPeerPushDebitAbortingRefreshExpired( tag: TransactionType.PeerPushDebit, pursePub: peerPushInitiation.pursePub, }); - const transitionInfo = await ws.db - .mktx((x) => [x.refreshGroups, x.peerPushDebit]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["peerPushDebit", "refreshGroups"], + async (tx) => { const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId); let newOpState: PeerPushDebitStatus | undefined; if (!refreshGroup) { @@ -758,7 +762,8 @@ async function processPeerPushDebitAbortingRefreshExpired( return { oldTxState, newTxState }; } return undefined; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); // FIXME: Shouldn't this be finished in some cases?! return TaskRunResult.backoff(); @@ -810,15 +815,15 @@ async function processPeerPushDebitReady( } } else if (resp.status === HttpStatusCode.Gone) { logger.info(`purse ${pursePub} is gone, aborting peer-push-debit`); - const transitionInfo = await ws.db - .mktx((x) => [ - x.peerPushDebit, - x.refreshGroups, - x.denominations, - x.coinAvailability, - x.coins, - ]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + [ + "peerPushDebit", + "refreshGroups", + "denominations", + "coinAvailability", + "coins", + ], + async (tx) => { const ppiRec = await tx.peerPushDebit.get(pursePub); if (!ppiRec) { return undefined; @@ -853,7 +858,8 @@ async function processPeerPushDebitReady( oldTxState, newTxState, }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); return TaskRunResult.backoff(); } else { @@ -867,11 +873,12 @@ export async function processPeerPushDebit( pursePub: string, cancellationToken: CancellationToken, ): Promise { - const peerPushInitiation = await ws.db - .mktx((x) => [x.peerPushDebit]) - .runReadOnly(async (tx) => { + const peerPushInitiation = await ws.db.runReadOnlyTx( + ["peerPushDebit"], + async (tx) => { return tx.peerPushDebit.get(pursePub); - }); + }, + ); if (!peerPushInitiation) { throw Error("peer push payment not found"); } @@ -953,17 +960,17 @@ export async function initiatePeerPushDebit( const contractEncNonce = encodeCrock(getRandomBytes(24)); - const transitionInfo = await ws.db - .mktx((x) => [ - x.exchanges, - x.contractTerms, - x.coins, - x.coinAvailability, - x.denominations, - x.refreshGroups, - x.peerPushDebit, - ]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + [ + "exchanges", + "contractTerms", + "coins", + "coinAvailability", + "denominations", + "refreshGroups", + "peerPushDebit", + ], + async (tx) => { // FIXME: Instead of directly doing a spendCoin here, // we might want to mark the coins as used and spend them // after we've been able to create the purse. @@ -1012,7 +1019,8 @@ export async function initiatePeerPushDebit( oldTxState: { major: TransactionMajorState.None }, newTxState, }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); ws.notify({ type: NotificationType.BalanceChange, diff --git a/packages/taler-wallet-core/src/operations/recoup.ts b/packages/taler-wallet-core/src/operations/recoup.ts index 2dd88b614..b88115d8e 100644 --- a/packages/taler-wallet-core/src/operations/recoup.ts +++ b/packages/taler-wallet-core/src/operations/recoup.ts @@ -45,7 +45,7 @@ import { RecoupGroupRecord, RecoupOperationStatus, RefreshCoinSource, - WalletStoresV1, + WalletDbReadWriteTransaction, WithdrawCoinSource, WithdrawalGroupStatus, WithdrawalRecordType, @@ -54,7 +54,6 @@ import { import { InternalWalletState } from "../internal-wallet-state.js"; import { PendingTaskType } from "../pending-types.js"; import { checkDbInvariant } from "../util/invariants.js"; -import { GetReadWriteAccess } from "../util/query.js"; import { TaskRunResult, TransactionContext, @@ -72,12 +71,9 @@ const logger = new Logger("operations/recoup.ts"); */ async function putGroupAsFinished( ws: InternalWalletState, - tx: GetReadWriteAccess<{ - recoupGroups: typeof WalletStoresV1.recoupGroups; - denominations: typeof WalletStoresV1.denominations; - refreshGroups: typeof WalletStoresV1.refreshGroups; - coins: typeof WalletStoresV1.coins; - }>, + tx: WalletDbReadWriteTransaction< + ["recoupGroups", "denominations", "refreshGroups", "coins"] + >, recoupGroup: RecoupGroupRecord, coinIdx: number, ): Promise { @@ -100,14 +96,9 @@ async function recoupRewardCoin( // We can't really recoup a coin we got via tipping. // Thus we just put the coin to sleep. // FIXME: somehow report this to the user - await ws.db - .mktx((stores) => [ - stores.recoupGroups, - stores.denominations, - stores.refreshGroups, - stores.coins, - ]) - .runReadWrite(async (tx) => { + await ws.db.runReadWriteTx( + ["recoupGroups", "denominations", "refreshGroups", "coins"], + async (tx) => { const recoupGroup = await tx.recoupGroups.get(recoupGroupId); if (!recoupGroup) { return; @@ -116,7 +107,8 @@ async function recoupRewardCoin( return; } await putGroupAsFinished(ws, tx, recoupGroup, coinIdx); - }); + }, + ); } async function recoupWithdrawCoin( @@ -127,17 +119,15 @@ async function recoupWithdrawCoin( cs: WithdrawCoinSource, ): Promise { const reservePub = cs.reservePub; - const denomInfo = await ws.db - .mktx((x) => [x.denominations]) - .runReadOnly(async (tx) => { - const denomInfo = await ws.getDenomInfo( - ws, - tx, - coin.exchangeBaseUrl, - coin.denomPubHash, - ); - return denomInfo; - }); + const denomInfo = await ws.db.runReadOnlyTx(["denominations"], async (tx) => { + const denomInfo = await ws.getDenomInfo( + ws, + tx, + coin.exchangeBaseUrl, + coin.denomPubHash, + ); + return denomInfo; + }); if (!denomInfo) { // FIXME: We should at least emit some pending operation / warning for this? return; @@ -170,9 +160,9 @@ async function recoupWithdrawCoin( // FIXME: verify that our expectations about the amount match - await ws.db - .mktx((x) => [x.coins, x.denominations, x.recoupGroups, x.refreshGroups]) - .runReadWrite(async (tx) => { + await ws.db.runReadWriteTx( + ["coins", "denominations", "recoupGroups", "refreshGroups"], + async (tx) => { const recoupGroup = await tx.recoupGroups.get(recoupGroupId); if (!recoupGroup) { return; @@ -187,7 +177,8 @@ async function recoupWithdrawCoin( updatedCoin.status = CoinStatus.Dormant; await tx.coins.put(updatedCoin); await putGroupAsFinished(ws, tx, recoupGroup, coinIdx); - }); + }, + ); } async function recoupRefreshCoin( @@ -197,9 +188,9 @@ async function recoupRefreshCoin( coin: CoinRecord, cs: RefreshCoinSource, ): Promise { - const d = await ws.db - .mktx((x) => [x.coins, x.denominations]) - .runReadOnly(async (tx) => { + const d = await ws.db.runReadOnlyTx( + ["coins", "denominations"], + async (tx) => { const denomInfo = await ws.getDenomInfo( ws, tx, @@ -210,7 +201,8 @@ async function recoupRefreshCoin( return; } return { denomInfo }; - }); + }, + ); if (!d) { // FIXME: We should at least emit some pending operation / warning for this? return; @@ -243,9 +235,9 @@ async function recoupRefreshCoin( throw Error(`Coin's oldCoinPub doesn't match reserve on recoup`); } - await ws.db - .mktx((x) => [x.coins, x.denominations, x.recoupGroups, x.refreshGroups]) - .runReadWrite(async (tx) => { + await ws.db.runReadWriteTx( + ["coins", "denominations", "recoupGroups", "refreshGroups"], + async (tx) => { const recoupGroup = await tx.recoupGroups.get(recoupGroupId); if (!recoupGroup) { return; @@ -296,18 +288,17 @@ async function recoupRefreshCoin( await tx.coins.put(revokedCoin); await tx.coins.put(oldCoin); await putGroupAsFinished(ws, tx, recoupGroup, coinIdx); - }); + }, + ); } export async function processRecoupGroup( ws: InternalWalletState, recoupGroupId: string, ): Promise { - let recoupGroup = await ws.db - .mktx((x) => [x.recoupGroups]) - .runReadOnly(async (tx) => { - return tx.recoupGroups.get(recoupGroupId); - }); + let recoupGroup = await ws.db.runReadOnlyTx(["recoupGroups"], async (tx) => { + return tx.recoupGroups.get(recoupGroupId); + }); if (!recoupGroup) { return TaskRunResult.finished(); } @@ -325,11 +316,9 @@ export async function processRecoupGroup( }); await Promise.all(ps); - recoupGroup = await ws.db - .mktx((x) => [x.recoupGroups]) - .runReadOnly(async (tx) => { - return tx.recoupGroups.get(recoupGroupId); - }); + recoupGroup = await ws.db.runReadOnlyTx(["recoupGroups"], async (tx) => { + return tx.recoupGroups.get(recoupGroupId); + }); if (!recoupGroup) { return TaskRunResult.finished(); } @@ -346,24 +335,22 @@ export async function processRecoupGroup( const reservePrivMap: Record = {}; for (let i = 0; i < recoupGroup.coinPubs.length; i++) { const coinPub = recoupGroup.coinPubs[i]; - await ws.db - .mktx((x) => [x.coins, x.reserves]) - .runReadOnly(async (tx) => { - const coin = await tx.coins.get(coinPub); - if (!coin) { - throw Error(`Coin ${coinPub} not found, can't request recoup`); - } - if (coin.coinSource.type === CoinSourceType.Withdraw) { - const reserve = await tx.reserves.indexes.byReservePub.get( - coin.coinSource.reservePub, - ); - if (!reserve) { - return; - } - reserveSet.add(coin.coinSource.reservePub); - reservePrivMap[coin.coinSource.reservePub] = reserve.reservePriv; + await ws.db.runReadOnlyTx(["coins", "reserves"], async (tx) => { + const coin = await tx.coins.get(coinPub); + if (!coin) { + throw Error(`Coin ${coinPub} not found, can't request recoup`); + } + if (coin.coinSource.type === CoinSourceType.Withdraw) { + const reserve = await tx.reserves.indexes.byReservePub.get( + coin.coinSource.reservePub, + ); + if (!reserve) { + return; } - }); + reserveSet.add(coin.coinSource.reservePub); + reservePrivMap[coin.coinSource.reservePub] = reserve.reservePriv; + } + }); } for (const reservePub of reserveSet) { @@ -393,15 +380,15 @@ export async function processRecoupGroup( }); } - await ws.db - .mktx((x) => [ - x.recoupGroups, - x.coinAvailability, - x.denominations, - x.refreshGroups, - x.coins, - ]) - .runReadWrite(async (tx) => { + await ws.db.runReadWriteTx( + [ + "recoupGroups", + "coinAvailability", + "denominations", + "refreshGroups", + "coins", + ], + async (tx) => { const rg2 = await tx.recoupGroups.get(recoupGroupId); if (!rg2) { return; @@ -422,7 +409,8 @@ export async function processRecoupGroup( ); } await tx.recoupGroups.put(rg2); - }); + }, + ); return TaskRunResult.finished(); } @@ -462,12 +450,9 @@ export class RewardTransactionContext implements TransactionContext { export async function createRecoupGroup( ws: InternalWalletState, - tx: GetReadWriteAccess<{ - recoupGroups: typeof WalletStoresV1.recoupGroups; - denominations: typeof WalletStoresV1.denominations; - refreshGroups: typeof WalletStoresV1.refreshGroups; - coins: typeof WalletStoresV1.coins; - }>, + tx: WalletDbReadWriteTransaction< + ["recoupGroups", "denominations", "refreshGroups", "coins"] + >, exchangeBaseUrl: string, coinPubs: string[], ): Promise { @@ -507,9 +492,9 @@ async function processRecoupForCoin( recoupGroupId: string, coinIdx: number, ): Promise { - const coin = await ws.db - .mktx((x) => [x.recoupGroups, x.coins]) - .runReadOnly(async (tx) => { + const coin = await ws.db.runReadOnlyTx( + ["coins", "recoupGroups"], + async (tx) => { const recoupGroup = await tx.recoupGroups.get(recoupGroupId); if (!recoupGroup) { return; @@ -528,7 +513,8 @@ async function processRecoupForCoin( throw Error(`Coin ${coinPub} not found, can't request recoup`); } return coin; - }); + }, + ); if (!coin) { return; diff --git a/packages/taler-wallet-core/src/operations/refresh.ts b/packages/taler-wallet-core/src/operations/refresh.ts index b9ac12518..ad9fdedb4 100644 --- a/packages/taler-wallet-core/src/operations/refresh.ts +++ b/packages/taler-wallet-core/src/operations/refresh.ts @@ -72,7 +72,6 @@ import { RefreshCoinStatus, RefreshGroupRecord, RefreshOperationStatus, - WalletStoresV1, } from "../db.js"; import { getCandidateWithdrawalDenomsTx, @@ -81,7 +80,8 @@ import { RefreshSessionRecord, TaskId, timestampPreciseToDb, - WalletDbReadWriteTransactionArr, + WalletDbReadOnlyTransaction, + WalletDbReadWriteTransaction, } from "../index.js"; import { EXCHANGE_COINS_LOCK, @@ -90,7 +90,6 @@ import { import { assertUnreachable } from "../util/assertUnreachable.js"; import { selectWithdrawalDenominations } from "../util/coinSelection.js"; import { checkDbInvariant } from "../util/invariants.js"; -import { GetReadOnlyAccess, GetReadWriteAccess } from "../util/query.js"; import { constructTaskIdentifier, makeCoinAvailable, @@ -129,48 +128,44 @@ export class RefreshTransactionContext implements TransactionContext { async deleteTransaction(): Promise { const refreshGroupId = this.refreshGroupId; const ws = this.ws; - await ws.db - .mktx((x) => [x.refreshGroups, x.tombstones]) - .runReadWrite(async (tx) => { - const rg = await tx.refreshGroups.get(refreshGroupId); - if (rg) { - await tx.refreshGroups.delete(refreshGroupId); - await tx.tombstones.put({ - id: TombstoneTag.DeleteRefreshGroup + ":" + refreshGroupId, - }); - } - }); + await ws.db.runReadWriteTx(["refreshGroups", "tombstones"], async (tx) => { + const rg = await tx.refreshGroups.get(refreshGroupId); + if (rg) { + await tx.refreshGroups.delete(refreshGroupId); + await tx.tombstones.put({ + id: TombstoneTag.DeleteRefreshGroup + ":" + refreshGroupId, + }); + } + }); } async suspendTransaction(): Promise { const { ws, refreshGroupId, transactionId } = this; - let res = await ws.db - .mktx((x) => [x.refreshGroups]) - .runReadWrite(async (tx) => { - const dg = await tx.refreshGroups.get(refreshGroupId); - if (!dg) { - logger.warn( - `can't suspend refresh group, refreshGroupId=${refreshGroupId} not found`, - ); + let res = await ws.db.runReadWriteTx(["refreshGroups"], async (tx) => { + const dg = await tx.refreshGroups.get(refreshGroupId); + if (!dg) { + logger.warn( + `can't suspend refresh group, refreshGroupId=${refreshGroupId} not found`, + ); + return undefined; + } + const oldState = computeRefreshTransactionState(dg); + switch (dg.operationStatus) { + case RefreshOperationStatus.Finished: return undefined; + case RefreshOperationStatus.Pending: { + dg.operationStatus = RefreshOperationStatus.Suspended; + await tx.refreshGroups.put(dg); + return { + oldTxState: oldState, + newTxState: computeRefreshTransactionState(dg), + }; } - const oldState = computeRefreshTransactionState(dg); - switch (dg.operationStatus) { - case RefreshOperationStatus.Finished: - return undefined; - case RefreshOperationStatus.Pending: { - dg.operationStatus = RefreshOperationStatus.Suspended; - await tx.refreshGroups.put(dg); - return { - oldTxState: oldState, - newTxState: computeRefreshTransactionState(dg), - }; - } - case RefreshOperationStatus.Suspended: - return undefined; - } - return undefined; - }); + case RefreshOperationStatus.Suspended: + return undefined; + } + return undefined; + }); if (res) { ws.notify({ type: NotificationType.TransactionStateTransition, @@ -188,9 +183,9 @@ export class RefreshTransactionContext implements TransactionContext { async resumeTransaction(): Promise { const { ws, refreshGroupId, transactionId } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.refreshGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["refreshGroups"], + async (tx) => { const dg = await tx.refreshGroups.get(refreshGroupId); if (!dg) { logger.warn( @@ -214,16 +209,17 @@ export class RefreshTransactionContext implements TransactionContext { }; } return undefined; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); ws.taskScheduler.startShepherdTask(this.taskId); } async failTransaction(): Promise { const { ws, refreshGroupId, transactionId } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.refreshGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["refreshGroups"], + async (tx) => { const dg = await tx.refreshGroups.get(refreshGroupId); if (!dg) { logger.warn( @@ -253,7 +249,8 @@ export class RefreshTransactionContext implements TransactionContext { oldTxState: oldState, newTxState: computeRefreshTransactionState(dg), }; - }); + }, + ); ws.taskScheduler.stopShepherdTask(this.taskId); notifyTransition(ws, transactionId, transitionInfo); ws.taskScheduler.startShepherdTask(this.taskId); @@ -341,9 +338,9 @@ async function provideRefreshSession( `creating refresh session for coin ${coinIndex} in refresh group ${refreshGroupId}`, ); - const d = await ws.db - .mktx((x) => [x.refreshGroups, x.coins, x.refreshSessions]) - .runReadWrite(async (tx) => { + const d = await ws.db.runReadWriteTx( + ["coins", "refreshGroups", "refreshSessions"], + async (tx) => { const refreshGroup = await tx.refreshGroups.get(refreshGroupId); if (!refreshGroup) { return; @@ -363,7 +360,8 @@ async function provideRefreshSession( throw Error("Can't refresh, coin not found"); } return { refreshGroup, coin, existingRefreshSession }; - }); + }, + ); if (!d) { return undefined; @@ -380,9 +378,9 @@ async function provideRefreshSession( // FIXME: use helper functions from withdraw.ts // to update and filter withdrawable denoms. - const { availableAmount, availableDenoms } = await ws.db - .mktx((x) => [x.denominations]) - .runReadOnly(async (tx) => { + const { availableAmount, availableDenoms } = await ws.db.runReadOnlyTx( + ["denominations"], + async (tx) => { const oldDenom = await ws.getDenomInfo( ws, tx, @@ -405,7 +403,8 @@ async function provideRefreshSession( oldDenom.feeRefresh, ).amount; return { availableAmount, availableDenoms }; - }); + }, + ); const newCoinDenoms = selectWithdrawalDenominations( availableAmount, @@ -424,9 +423,9 @@ async function provideRefreshSession( availableAmount, )} too small`, ); - const transitionInfo = await ws.db - .mktx((x) => [x.coins, x.coinAvailability, x.refreshGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["refreshGroups", "coins", "coinAvailability"], + async (tx) => { const rg = await tx.refreshGroups.get(refreshGroupId); if (!rg) { return; @@ -440,7 +439,8 @@ async function provideRefreshSession( await tx.refreshGroups.put(rg); const newTxState = computeRefreshTransactionState(rg); return { oldTxState, newTxState }; - }); + }, + ); ws.notify({ type: NotificationType.BalanceChange, hintTransactionId: transactionId, @@ -452,9 +452,9 @@ async function provideRefreshSession( const sessionSecretSeed = encodeCrock(getRandomBytes(64)); // Store refresh session for this coin in the database. - const mySession = await ws.db - .mktx((x) => [x.refreshGroups, x.coins, x.refreshSessions]) - .runReadWrite(async (tx) => { + const mySession = await ws.db.runReadWriteTx( + ["refreshGroups", "refreshSessions"], + async (tx) => { const rg = await tx.refreshGroups.get(refreshGroupId); if (!rg) { return; @@ -479,7 +479,8 @@ async function provideRefreshSession( }; await tx.refreshSessions.put(newSession); return newSession; - }); + }, + ); logger.trace( `found/created refresh session for coin #${coinIndex} in ${refreshGroupId}`, ); @@ -497,9 +498,9 @@ async function refreshMelt( refreshGroupId: string, coinIndex: number, ): Promise { - const d = await ws.db - .mktx((x) => [x.refreshGroups, x.refreshSessions, x.coins, x.denominations]) - .runReadWrite(async (tx) => { + const d = await ws.db.runReadWriteTx( + ["refreshGroups", "refreshSessions", "coins", "denominations"], + async (tx) => { const refreshGroup = await tx.refreshGroups.get(refreshGroupId); if (!refreshGroup) { return; @@ -550,7 +551,8 @@ async function refreshMelt( }); } return { newCoinDenoms, oldCoin, oldDenom, refreshGroup, refreshSession }; - }); + }, + ); if (!d) { return; @@ -618,14 +620,9 @@ async function refreshMelt( if (resp.status === HttpStatusCode.NotFound) { const errDetails = await readUnexpectedResponseDetails(resp); - const transitionInfo = await ws.db - .mktx((x) => [ - x.refreshGroups, - x.refreshSessions, - x.coins, - x.coinAvailability, - ]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["refreshGroups", "refreshSessions", "coins", "coinAvailability"], + async (tx) => { const rg = await tx.refreshGroups.get(refreshGroupId); if (!rg) { return; @@ -659,7 +656,8 @@ async function refreshMelt( oldTxState, newTxState, }; - }); + }, + ); ws.notify({ type: NotificationType.BalanceChange, hintTransactionId: transactionId, @@ -710,9 +708,9 @@ async function refreshMelt( refreshSession.norevealIndex = norevealIndex; - await ws.db - .mktx((x) => [x.refreshGroups, x.refreshSessions]) - .runReadWrite(async (tx) => { + await ws.db.runReadWriteTx( + ["refreshGroups", "refreshSessions"], + async (tx) => { const rg = await tx.refreshGroups.get(refreshGroupId); if (!rg) { return; @@ -729,7 +727,8 @@ async function refreshMelt( } rs.norevealIndex = norevealIndex; await tx.refreshSessions.put(rs); - }); + }, + ); } export async function assembleRefreshRevealRequest(args: { @@ -798,9 +797,9 @@ async function refreshReveal( logger.trace( `doing refresh reveal for ${refreshGroupId} (old coin ${coinIndex})`, ); - const d = await ws.db - .mktx((x) => [x.refreshGroups, x.refreshSessions, x.coins, x.denominations]) - .runReadOnly(async (tx) => { + const d = await ws.db.runReadOnlyTx( + ["refreshGroups", "refreshSessions", "coins", "denominations"], + async (tx) => { const refreshGroup = await tx.refreshGroups.get(refreshGroupId); if (!refreshGroup) { return; @@ -859,7 +858,8 @@ async function refreshReveal( refreshGroup, norevealIndex, }; - }); + }, + ); if (!d) { return; @@ -972,15 +972,15 @@ async function refreshReveal( } } - const transitionInfo = await ws.db - .mktx((x) => [ - x.coins, - x.denominations, - x.coinAvailability, - x.refreshGroups, - x.refreshSessions, - ]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + [ + "coins", + "denominations", + "coinAvailability", + "refreshGroups", + "refreshSessions", + ], + async (tx) => { const rg = await tx.refreshGroups.get(refreshGroupId); if (!rg) { logger.warn("no refresh session found"); @@ -1000,7 +1000,8 @@ async function refreshReveal( await tx.refreshGroups.put(rg); const newTxState = computeRefreshTransactionState(rg); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); logger.trace("refresh finished (end of reveal)"); } @@ -1012,9 +1013,10 @@ export async function processRefreshGroup( ): Promise { logger.trace(`processing refresh group ${refreshGroupId}`); - const refreshGroup = await ws.db - .mktx((x) => [x.refreshGroups]) - .runReadOnly(async (tx) => tx.refreshGroups.get(refreshGroupId)); + const refreshGroup = await ws.db.runReadOnlyTx( + ["refreshGroups"], + async (tx) => tx.refreshGroups.get(refreshGroupId), + ); if (!refreshGroup) { return TaskRunResult.finished(); } @@ -1084,16 +1086,17 @@ async function processRefreshSession( logger.trace( `processing refresh session for coin ${coinIndex} of group ${refreshGroupId}`, ); - let { refreshGroup, refreshSession } = await ws.db - .mktx((x) => [x.refreshGroups, x.refreshSessions]) - .runReadOnly(async (tx) => { + let { refreshGroup, refreshSession } = await ws.db.runReadOnlyTx( + ["refreshGroups", "refreshSessions"], + async (tx) => { const rg = await tx.refreshGroups.get(refreshGroupId); const rs = await tx.refreshSessions.get([refreshGroupId, coinIndex]); return { refreshGroup: rg, refreshSession: rs, }; - }); + }, + ); if (!refreshGroup) { return; } @@ -1122,12 +1125,9 @@ export interface RefreshOutputInfo { export async function calculateRefreshOutput( ws: InternalWalletState, - tx: GetReadOnlyAccess<{ - denominations: typeof WalletStoresV1.denominations; - coins: typeof WalletStoresV1.coins; - refreshGroups: typeof WalletStoresV1.refreshGroups; - coinAvailability: typeof WalletStoresV1.coinAvailability; - }>, + tx: WalletDbReadOnlyTransaction< + ["denominations", "coins", "refreshGroups", "coinAvailability"] + >, currency: string, oldCoinPubs: CoinRefreshRequest[], ): Promise { @@ -1196,12 +1196,9 @@ export async function calculateRefreshOutput( async function applyRefresh( ws: InternalWalletState, - tx: GetReadWriteAccess<{ - denominations: typeof WalletStoresV1.denominations; - coins: typeof WalletStoresV1.coins; - refreshGroups: typeof WalletStoresV1.refreshGroups; - coinAvailability: typeof WalletStoresV1.coinAvailability; - }>, + tx: WalletDbReadWriteTransaction< + ["denominations", "coins", "refreshGroups", "coinAvailability"] + >, oldCoinPubs: CoinRefreshRequest[], refreshGroupId: string, ): Promise { @@ -1272,7 +1269,7 @@ export interface CreateRefreshGroupResult { */ export async function createRefreshGroup( ws: InternalWalletState, - tx: WalletDbReadWriteTransactionArr< + tx: WalletDbReadWriteTransaction< ["denominations", "coins", "refreshGroups", "coinAvailability"] >, currency: string, @@ -1395,14 +1392,9 @@ export async function forceRefresh( if (req.coinPubList.length == 0) { throw Error("refusing to create empty refresh group"); } - const refreshGroupId = await ws.db - .mktx((x) => [ - x.refreshGroups, - x.coinAvailability, - x.denominations, - x.coins, - ]) - .runReadWrite(async (tx) => { + const refreshGroupId = await ws.db.runReadWriteTx( + ["refreshGroups", "coinAvailability", "denominations", "coins"], + async (tx) => { let coinPubs: CoinRefreshRequest[] = []; for (const c of req.coinPubList) { const coin = await tx.coins.get(c); @@ -1429,7 +1421,8 @@ export async function forceRefresh( RefreshReason.Manual, undefined, ); - }); + }, + ); return { refreshGroupId, diff --git a/packages/taler-wallet-core/src/operations/reward.ts b/packages/taler-wallet-core/src/operations/reward.ts index 4d8653a9d..7d826e630 100644 --- a/packages/taler-wallet-core/src/operations/reward.ts +++ b/packages/taler-wallet-core/src/operations/reward.ts @@ -19,73 +19,29 @@ */ import { AcceptTipResponse, - AgeRestriction, - Amounts, - BlindedDenominationSignature, - codecForMerchantTipResponseV2, - codecForRewardPickupGetResponse, - CoinStatus, - DenomKeyType, - encodeCrock, - getRandomBytes, - j2s, Logger, - NotificationType, - parseRewardUri, PrepareTipResult, - TalerErrorCode, - TalerPreciseTimestamp, - TipPlanchetDetail, TransactionAction, TransactionIdStr, TransactionMajorState, TransactionMinorState, TransactionState, TransactionType, - URL, } from "@gnu-taler/taler-util"; -import { DerivedTipPlanchet } from "../crypto/cryptoTypes.js"; -import { - CoinRecord, - CoinSourceType, - DenominationRecord, - RewardRecord, - RewardRecordStatus, - timestampPreciseFromDb, - timestampPreciseToDb, - timestampProtocolFromDb, - timestampProtocolToDb, -} from "../db.js"; -import { makeErrorDetail } from "@gnu-taler/taler-util"; +import { RewardRecord, RewardRecordStatus } from "../db.js"; import { InternalWalletState } from "../internal-wallet-state.js"; +import { PendingTaskType } from "../pending-types.js"; +import { assertUnreachable } from "../util/assertUnreachable.js"; import { - getHttpResponseErrorDetails, - readSuccessResponseJsonOrThrow, -} from "@gnu-taler/taler-util/http"; -import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js"; -import { - constructTaskIdentifier, - makeCoinAvailable, - makeCoinsVisible, TaskRunResult, - TaskRunResultType, TombstoneTag, TransactionContext, + constructTaskIdentifier, } from "./common.js"; -import { fetchFreshExchange } from "./exchanges.js"; -import { - getCandidateWithdrawalDenoms, - getExchangeWithdrawalInfo, - updateWithdrawalDenoms, -} from "./withdraw.js"; -import { selectWithdrawalDenominations } from "../util/coinSelection.js"; import { constructTransactionIdentifier, notifyTransition, - parseTransactionIdentifier, } from "./transactions.js"; -import { PendingTaskType } from "../pending-types.js"; -import { assertUnreachable } from "../util/assertUnreachable.js"; const logger = new Logger("operations/tip.ts"); @@ -109,24 +65,22 @@ export class RewardTransactionContext implements TransactionContext { async deleteTransaction(): Promise { const { ws, walletRewardId } = this; - await ws.db - .mktx((x) => [x.rewards, x.tombstones]) - .runReadWrite(async (tx) => { - const tipRecord = await tx.rewards.get(walletRewardId); - if (tipRecord) { - await tx.rewards.delete(walletRewardId); - await tx.tombstones.put({ - id: TombstoneTag.DeleteReward + ":" + walletRewardId, - }); - } - }); + await ws.db.runReadWriteTx(["rewards", "tombstones"], async (tx) => { + const tipRecord = await tx.rewards.get(walletRewardId); + if (tipRecord) { + await tx.rewards.delete(walletRewardId); + await tx.tombstones.put({ + id: TombstoneTag.DeleteReward + ":" + walletRewardId, + }); + } + }); } async suspendTransaction(): Promise { const { ws, walletRewardId, transactionId, retryTag } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.rewards]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["rewards"], + async (tx) => { const tipRec = await tx.rewards.get(walletRewardId); if (!tipRec) { logger.warn(`transaction tip ${walletRewardId} not found`); @@ -158,15 +112,16 @@ export class RewardTransactionContext implements TransactionContext { }; } return undefined; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); } async abortTransaction(): Promise { const { ws, walletRewardId, transactionId, retryTag } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.rewards]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["rewards"], + async (tx) => { const tipRec = await tx.rewards.get(walletRewardId); if (!tipRec) { logger.warn(`transaction tip ${walletRewardId} not found`); @@ -197,15 +152,16 @@ export class RewardTransactionContext implements TransactionContext { }; } return undefined; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); } async resumeTransaction(): Promise { const { ws, walletRewardId, transactionId, retryTag } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.rewards]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["rewards"], + async (tx) => { const rewardRec = await tx.rewards.get(walletRewardId); if (!rewardRec) { logger.warn(`transaction reward ${walletRewardId} not found`); @@ -236,15 +192,16 @@ export class RewardTransactionContext implements TransactionContext { }; } return undefined; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); } async failTransaction(): Promise { const { ws, walletRewardId, transactionId, retryTag } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.rewards]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["rewards"], + async (tx) => { const tipRec = await tx.rewards.get(walletRewardId); if (!tipRec) { logger.warn(`transaction tip ${walletRewardId} not found`); @@ -275,7 +232,8 @@ export class RewardTransactionContext implements TransactionContext { }; } return undefined; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); } } @@ -345,368 +303,19 @@ export async function prepareReward( ws: InternalWalletState, talerTipUri: string, ): Promise { - const res = parseRewardUri(talerTipUri); - if (!res) { - throw Error("invalid taler://tip URI"); - } - - let tipRecord = await ws.db - .mktx((x) => [x.rewards]) - .runReadOnly(async (tx) => { - return tx.rewards.indexes.byMerchantTipIdAndBaseUrl.get([ - res.merchantRewardId, - res.merchantBaseUrl, - ]); - }); - - if (!tipRecord) { - const tipStatusUrl = new URL( - `rewards/${res.merchantRewardId}`, - res.merchantBaseUrl, - ); - logger.trace("checking tip status from", tipStatusUrl.href); - const merchantResp = await ws.http.fetch(tipStatusUrl.href); - const rewardPickupStatus = await readSuccessResponseJsonOrThrow( - merchantResp, - codecForRewardPickupGetResponse(), - ); - logger.trace(`status ${j2s(rewardPickupStatus)}`); - - const amount = Amounts.parseOrThrow(rewardPickupStatus.reward_amount); - const currency = amount.currency; - - logger.trace("new tip, creating tip record"); - await fetchFreshExchange(ws, rewardPickupStatus.exchange_url); - - //FIXME: is this needed? withdrawDetails is not used - // * if the intention is to update the exchange information in the database - // maybe we can use another name. `get` seems like a pure-function - const withdrawDetails = await getExchangeWithdrawalInfo( - ws, - rewardPickupStatus.exchange_url, - amount, - undefined, - ); - - const walletRewardId = encodeCrock(getRandomBytes(32)); - await updateWithdrawalDenoms(ws, rewardPickupStatus.exchange_url); - const denoms = await getCandidateWithdrawalDenoms( - ws, - rewardPickupStatus.exchange_url, - currency, - ); - const selectedDenoms = selectWithdrawalDenominations(amount, denoms); - - const secretSeed = encodeCrock(getRandomBytes(64)); - const denomSelUid = encodeCrock(getRandomBytes(32)); - - const newTipRecord: RewardRecord = { - walletRewardId: walletRewardId, - acceptedTimestamp: undefined, - status: RewardRecordStatus.DialogAccept, - rewardAmountRaw: Amounts.stringify(amount), - rewardExpiration: timestampProtocolToDb(rewardPickupStatus.expiration), - exchangeBaseUrl: rewardPickupStatus.exchange_url, - next_url: rewardPickupStatus.next_url, - merchantBaseUrl: res.merchantBaseUrl, - createdTimestamp: timestampPreciseToDb(TalerPreciseTimestamp.now()), - merchantRewardId: res.merchantRewardId, - rewardAmountEffective: Amounts.stringify(selectedDenoms.totalCoinValue), - denomsSel: selectedDenoms, - pickedUpTimestamp: undefined, - secretSeed, - denomSelUid, - }; - await ws.db - .mktx((x) => [x.rewards]) - .runReadWrite(async (tx) => { - await tx.rewards.put(newTipRecord); - }); - tipRecord = newTipRecord; - } - - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Reward, - walletRewardId: tipRecord.walletRewardId, - }); - - const tipStatus: PrepareTipResult = { - accepted: !!tipRecord && !!tipRecord.acceptedTimestamp, - rewardAmountRaw: Amounts.stringify(tipRecord.rewardAmountRaw), - exchangeBaseUrl: tipRecord.exchangeBaseUrl, - merchantBaseUrl: tipRecord.merchantBaseUrl, - expirationTimestamp: timestampProtocolFromDb(tipRecord.rewardExpiration), - rewardAmountEffective: Amounts.stringify(tipRecord.rewardAmountEffective), - walletRewardId: tipRecord.walletRewardId, - transactionId, - }; - - return tipStatus; + throw Error("the rewards feature is not supported anymore"); } export async function processTip( ws: InternalWalletState, walletTipId: string, ): Promise { - const tipRecord = await ws.db - .mktx((x) => [x.rewards]) - .runReadOnly(async (tx) => { - return tx.rewards.get(walletTipId); - }); - if (!tipRecord) { - return TaskRunResult.finished(); - } - - switch (tipRecord.status) { - case RewardRecordStatus.Aborted: - case RewardRecordStatus.DialogAccept: - case RewardRecordStatus.Done: - case RewardRecordStatus.SuspendedPickup: - return TaskRunResult.finished(); - } - - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Reward, - walletRewardId: walletTipId, - }); - - const denomsForWithdraw = tipRecord.denomsSel; - - const planchets: DerivedTipPlanchet[] = []; - // Planchets in the form that the merchant expects - const planchetsDetail: TipPlanchetDetail[] = []; - const denomForPlanchet: { [index: number]: DenominationRecord } = []; - - for (const dh of denomsForWithdraw.selectedDenoms) { - const denom = await ws.db - .mktx((x) => [x.denominations]) - .runReadOnly(async (tx) => { - return tx.denominations.get([ - tipRecord.exchangeBaseUrl, - dh.denomPubHash, - ]); - }); - checkDbInvariant(!!denom, "denomination should be in database"); - for (let i = 0; i < dh.count; i++) { - const deriveReq = { - denomPub: denom.denomPub, - planchetIndex: planchets.length, - secretSeed: tipRecord.secretSeed, - }; - logger.trace(`deriving tip planchet: ${j2s(deriveReq)}`); - const p = await ws.cryptoApi.createTipPlanchet(deriveReq); - logger.trace(`derive result: ${j2s(p)}`); - denomForPlanchet[planchets.length] = denom; - planchets.push(p); - planchetsDetail.push({ - coin_ev: p.coinEv, - denom_pub_hash: denom.denomPubHash, - }); - } - } - - const tipStatusUrl = new URL( - `rewards/${tipRecord.merchantRewardId}/pickup`, - tipRecord.merchantBaseUrl, - ); - - const req = { planchets: planchetsDetail }; - logger.trace(`sending tip request: ${j2s(req)}`); - const merchantResp = await ws.http.fetch(tipStatusUrl.href, { - method: "POST", - body: req, - }); - - logger.trace(`got tip response, status ${merchantResp.status}`); - - // FIXME: Why do we do this? - if ( - (merchantResp.status >= 500 && merchantResp.status <= 599) || - merchantResp.status === 424 - ) { - logger.trace(`got transient tip error`); - // FIXME: wrap in another error code that indicates a transient error - return { - type: TaskRunResultType.Error, - errorDetail: makeErrorDetail( - TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR, - getHttpResponseErrorDetails(merchantResp), - "tip pickup failed (transient)", - ), - }; - } - let blindedSigs: BlindedDenominationSignature[] = []; - - const response = await readSuccessResponseJsonOrThrow( - merchantResp, - codecForMerchantTipResponseV2(), - ); - blindedSigs = response.blind_sigs.map((x) => x.blind_sig); - - if (blindedSigs.length !== planchets.length) { - throw Error("number of tip responses does not match requested planchets"); - } - - const newCoinRecords: CoinRecord[] = []; - - for (let i = 0; i < blindedSigs.length; i++) { - const blindedSig = blindedSigs[i]; - - const denom = denomForPlanchet[i]; - checkLogicInvariant(!!denom); - const planchet = planchets[i]; - checkLogicInvariant(!!planchet); - - if (denom.denomPub.cipher !== DenomKeyType.Rsa) { - throw Error("unsupported cipher"); - } - - if (blindedSig.cipher !== DenomKeyType.Rsa) { - throw Error("unsupported cipher"); - } - - const denomSigRsa = await ws.cryptoApi.rsaUnblind({ - bk: planchet.blindingKey, - blindedSig: blindedSig.blinded_rsa_signature, - pk: denom.denomPub.rsa_public_key, - }); - - const isValid = await ws.cryptoApi.rsaVerify({ - hm: planchet.coinPub, - pk: denom.denomPub.rsa_public_key, - sig: denomSigRsa.sig, - }); - - if (!isValid) { - return { - type: TaskRunResultType.Error, - errorDetail: makeErrorDetail( - TalerErrorCode.WALLET_REWARD_COIN_SIGNATURE_INVALID, - {}, - "invalid signature from the exchange (via merchant reward) after unblinding", - ), - }; - } - - newCoinRecords.push({ - blindingKey: planchet.blindingKey, - coinPriv: planchet.coinPriv, - coinPub: planchet.coinPub, - coinSource: { - type: CoinSourceType.Reward, - coinIndex: i, - walletRewardId: walletTipId, - }, - sourceTransactionId: transactionId, - denomPubHash: denom.denomPubHash, - denomSig: { cipher: DenomKeyType.Rsa, rsa_signature: denomSigRsa.sig }, - exchangeBaseUrl: tipRecord.exchangeBaseUrl, - status: CoinStatus.Fresh, - coinEvHash: planchet.coinEvHash, - maxAge: AgeRestriction.AGE_UNRESTRICTED, - ageCommitmentProof: planchet.ageCommitmentProof, - spendAllocation: undefined, - }); - } - - const transitionInfo = await ws.db - .mktx((x) => [x.coins, x.coinAvailability, x.denominations, x.rewards]) - .runReadWrite(async (tx) => { - const tr = await tx.rewards.get(walletTipId); - if (!tr) { - return; - } - if (tr.status !== RewardRecordStatus.PendingPickup) { - return; - } - const oldTxState = computeRewardTransactionStatus(tr); - tr.pickedUpTimestamp = timestampPreciseToDb(TalerPreciseTimestamp.now()); - tr.status = RewardRecordStatus.Done; - await tx.rewards.put(tr); - const newTxState = computeRewardTransactionStatus(tr); - for (const cr of newCoinRecords) { - await makeCoinAvailable(ws, tx, cr); - } - await makeCoinsVisible(ws, tx, transactionId); - return { oldTxState, newTxState }; - }); - notifyTransition(ws, transactionId, transitionInfo); - ws.notify({ - type: NotificationType.BalanceChange, - hintTransactionId: transactionId, - }); - return TaskRunResult.finished(); } -export async function acceptTipBackwardCompat( - ws: InternalWalletState, - walletTipId: string | undefined, - transactionIdParam: TransactionIdStr | undefined, -): Promise { - if (transactionIdParam) { - return acceptTip(ws, transactionIdParam); - } - if (walletTipId) { - /** - * old clients use still use tipId - */ - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Reward, - walletRewardId: walletTipId, - }); - return acceptTip(ws, transactionId); - } - throw Error( - "Unable to accept tip: neither tipId (deprecated) nor transactionId was specified", - ); -} - export async function acceptTip( ws: InternalWalletState, transactionId: TransactionIdStr, ): Promise { - const pTxId = parseTransactionIdentifier(transactionId); - if (!pTxId) - throw Error(`Unable to accept tip: invalid tx tag "${transactionId}"`); - const rewardId = - pTxId.tag === TransactionType.Reward ? pTxId.walletRewardId : undefined; - if (!rewardId) - throw Error( - `Unable to accept tip: txId is not a reward tag "${pTxId.tag}"`, - ); - const dbRes = await ws.db - .mktx((x) => [x.rewards]) - .runReadWrite(async (tx) => { - const tipRecord = await tx.rewards.get(rewardId); - if (!tipRecord) { - logger.error("tip not found"); - return; - } - if (tipRecord.status != RewardRecordStatus.DialogAccept) { - logger.warn("Unable to accept tip in the current state"); - return { tipRecord }; - } - const oldTxState = computeRewardTransactionStatus(tipRecord); - tipRecord.acceptedTimestamp = timestampPreciseToDb( - TalerPreciseTimestamp.now(), - ); - tipRecord.status = RewardRecordStatus.PendingPickup; - await tx.rewards.put(tipRecord); - const newTxState = computeRewardTransactionStatus(tipRecord); - return { tipRecord, transitionInfo: { oldTxState, newTxState } }; - }); - - if (!dbRes) { - throw Error("tip not found"); - } - - notifyTransition(ws, transactionId, dbRes.transitionInfo); - - const tipRecord = dbRes.tipRecord; - - return { - transactionId, - next_url: tipRecord.next_url, - }; + throw Error("the rewards feature is not supported anymore"); } diff --git a/packages/taler-wallet-core/src/operations/testing.ts b/packages/taler-wallet-core/src/operations/testing.ts index 17863450c..3c7845813 100644 --- a/packages/taler-wallet-core/src/operations/testing.ts +++ b/packages/taler-wallet-core/src/operations/testing.ts @@ -903,11 +903,9 @@ export async function testPay( if (r.type != ConfirmPayResultType.Done) { throw Error("payment not done"); } - const purchase = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { - return tx.purchases.get(result.proposalId); - }); + const purchase = await ws.db.runReadOnlyTx(["purchases"], async (tx) => { + return tx.purchases.get(result.proposalId); + }); checkLogicInvariant(!!purchase); return { payCoinSelection: purchase.payInfo?.payCoinSelection!, diff --git a/packages/taler-wallet-core/src/operations/transactions.ts b/packages/taler-wallet-core/src/operations/transactions.ts index 671ccd556..1d3ea3d5a 100644 --- a/packages/taler-wallet-core/src/operations/transactions.ts +++ b/packages/taler-wallet-core/src/operations/transactions.ts @@ -69,13 +69,12 @@ import { WithdrawalRecordType, } from "../db.js"; import { - GetReadOnlyAccess, OPERATION_STATUS_ACTIVE_FIRST, OPERATION_STATUS_ACTIVE_LAST, PeerPushDebitStatus, timestampPreciseFromDb, timestampProtocolFromDb, - WalletStoresV1, + WalletDbReadOnlyTransaction, } from "../index.js"; import { InternalWalletState } from "../internal-wallet-state.js"; import { PendingTaskType, TaskId } from "../pending-types.js"; @@ -229,14 +228,14 @@ export async function getTransactionById( case TransactionType.InternalWithdrawal: case TransactionType.Withdrawal: { const withdrawalGroupId = parsedTx.withdrawalGroupId; - return await ws.db - .mktx((x) => [ - x.withdrawalGroups, - x.exchangeDetails, - x.exchanges, - x.operationRetries, - ]) - .runReadWrite(async (tx) => { + return await ws.db.runReadWriteTx( + [ + "withdrawalGroups", + "exchangeDetails", + "exchanges", + "operationRetries", + ], + async (tx) => { const withdrawalGroupRecord = await tx.withdrawalGroups.get(withdrawalGroupId); @@ -265,7 +264,8 @@ export async function getTransactionById( exchangeDetails, ort, ); - }); + }, + ); } case TransactionType.Recoup: @@ -273,15 +273,15 @@ export async function getTransactionById( case TransactionType.Payment: { const proposalId = parsedTx.proposalId; - return await ws.db - .mktx((x) => [ - x.purchases, - x.tombstones, - x.operationRetries, - x.refundGroups, - x.contractTerms, - ]) - .runReadWrite(async (tx) => { + return await ws.db.runReadWriteTx( + [ + "purchases", + "tombstones", + "operationRetries", + "contractTerms", + "refundGroups", + ], + async (tx) => { const purchase = await tx.purchases.get(proposalId); if (!purchase) throw Error("not found"); const download = await expectProposalDownload(ws, purchase, tx); @@ -299,7 +299,8 @@ export async function getTransactionById( refunds, payRetryRecord, ); - }); + }, + ); } case TransactionType.Refresh: { @@ -322,9 +323,9 @@ export async function getTransactionById( case TransactionType.Reward: { const tipId = parsedTx.walletRewardId; - return await ws.db - .mktx((x) => [x.rewards, x.operationRetries]) - .runReadWrite(async (tx) => { + return await ws.db.runReadWriteTx( + ["rewards", "operationRetries"], + async (tx) => { const tipRecord = await tx.rewards.get(tipId); if (!tipRecord) throw Error("not found"); @@ -332,14 +333,15 @@ export async function getTransactionById( TaskIdentifiers.forTipPickup(tipRecord), ); return buildTransactionForTip(tipRecord, retries); - }); + }, + ); } case TransactionType.Deposit: { const depositGroupId = parsedTx.depositGroupId; - return await ws.db - .mktx((x) => [x.depositGroups, x.operationRetries]) - .runReadWrite(async (tx) => { + return await ws.db.runReadWriteTx( + ["depositGroups", "operationRetries"], + async (tx) => { const depositRecord = await tx.depositGroups.get(depositGroupId); if (!depositRecord) throw Error("not found"); @@ -347,13 +349,14 @@ export async function getTransactionById( TaskIdentifiers.forDeposit(depositRecord), ); return buildTransactionForDeposit(depositRecord, retries); - }); + }, + ); } case TransactionType.Refund: { - return await ws.db - .mktx((x) => [x.refundGroups, x.contractTerms, x.purchases]) - .runReadOnly(async (tx) => { + return await ws.db.runReadOnlyTx( + ["refundGroups", "purchases", "operationRetries", "contractTerms"], + async (tx) => { const refundRecord = await tx.refundGroups.get( parsedTx.refundGroupId, ); @@ -365,12 +368,13 @@ export async function getTransactionById( refundRecord?.proposalId, ); return buildTransactionForRefund(refundRecord, contractData); - }); + }, + ); } case TransactionType.PeerPullDebit: { - return await ws.db - .mktx((x) => [x.peerPullDebit, x.contractTerms]) - .runReadWrite(async (tx) => { + return await ws.db.runReadWriteTx( + ["peerPullDebit", "contractTerms"], + async (tx) => { const debit = await tx.peerPullDebit.get(parsedTx.peerPullDebitId); if (!debit) throw Error("not found"); const contractTermsRec = await tx.contractTerms.get( @@ -382,13 +386,14 @@ export async function getTransactionById( debit, contractTermsRec.contractTermsRaw, ); - }); + }, + ); } case TransactionType.PeerPushDebit: { - return await ws.db - .mktx((x) => [x.peerPushDebit, x.contractTerms]) - .runReadWrite(async (tx) => { + return await ws.db.runReadWriteTx( + ["peerPushDebit", "contractTerms"], + async (tx) => { const debit = await tx.peerPushDebit.get(parsedTx.pursePub); if (!debit) throw Error("not found"); const ct = await tx.contractTerms.get(debit.contractTermsHash); @@ -397,19 +402,20 @@ export async function getTransactionById( debit, ct.contractTermsRaw, ); - }); + }, + ); } case TransactionType.PeerPushCredit: { const peerPushCreditId = parsedTx.peerPushCreditId; - return await ws.db - .mktx((x) => [ - x.peerPushCredit, - x.contractTerms, - x.withdrawalGroups, - x.operationRetries, - ]) - .runReadWrite(async (tx) => { + return await ws.db.runReadWriteTx( + [ + "peerPushCredit", + "contractTerms", + "withdrawalGroups", + "operationRetries", + ], + async (tx) => { const pushInc = await tx.peerPushCredit.get(peerPushCreditId); if (!pushInc) throw Error("not found"); const ct = await tx.contractTerms.get(pushInc.contractTermsHash); @@ -434,19 +440,20 @@ export async function getTransactionById( wg, wgOrt, ); - }); + }, + ); } case TransactionType.PeerPullCredit: { const pursePub = parsedTx.pursePub; - return await ws.db - .mktx((x) => [ - x.peerPullCredit, - x.contractTerms, - x.withdrawalGroups, - x.operationRetries, - ]) - .runReadWrite(async (tx) => { + return await ws.db.runReadWriteTx( + [ + "peerPullCredit", + "contractTerms", + "withdrawalGroups", + "operationRetries", + ], + async (tx) => { const pushInc = await tx.peerPullCredit.get(pursePub); if (!pushInc) throw Error("not found"); const ct = await tx.contractTerms.get(pushInc.contractTermsHash); @@ -472,7 +479,8 @@ export async function getTransactionById( wg, wgOrt, ); - }); + }, + ); } } } @@ -941,10 +949,7 @@ function buildTransactionForTip( } async function lookupMaybeContractData( - tx: GetReadOnlyAccess<{ - purchases: typeof WalletStoresV1.purchases; - contractTerms: typeof WalletStoresV1.contractTerms; - }>, + tx: WalletDbReadOnlyTransaction<["purchases", "contractTerms"]>, proposalId: string, ): Promise { let contractData: WalletContractData | undefined = undefined; @@ -1037,14 +1042,9 @@ export async function getWithdrawalTransactionByUri( ws: InternalWalletState, request: WithdrawalTransactionByURIRequest, ): Promise { - return await ws.db - .mktx((x) => [ - x.withdrawalGroups, - x.exchangeDetails, - x.exchanges, - x.operationRetries, - ]) - .runReadWrite(async (tx) => { + return await ws.db.runReadWriteTx( + ["withdrawalGroups", "exchangeDetails", "exchanges", "operationRetries"], + async (tx) => { const withdrawalGroupRecord = await tx.withdrawalGroups.indexes.byTalerWithdrawUri.get( request.talerWithdrawUri, @@ -1077,7 +1077,8 @@ export async function getWithdrawalTransactionByUri( exchangeDetails, ort, ); - }); + }, + ); } /** @@ -1094,29 +1095,29 @@ export async function getTransactions( filter.onlyState = transactionsRequest.filterByState; } - await ws.db - .mktx((x) => [ - x.coins, - x.denominations, - x.depositGroups, - x.exchangeDetails, - x.exchanges, - x.operationRetries, - x.peerPullDebit, - x.peerPushDebit, - x.peerPushCredit, - x.peerPullCredit, - x.planchets, - x.purchases, - x.contractTerms, - x.recoupGroups, - x.rewards, - x.tombstones, - x.withdrawalGroups, - x.refreshGroups, - x.refundGroups, - ]) - .runReadOnly(async (tx) => { + await ws.db.runReadOnlyTx( + [ + "coins", + "denominations", + "depositGroups", + "exchangeDetails", + "exchanges", + "operationRetries", + "peerPullDebit", + "peerPushDebit", + "peerPushCredit", + "peerPullCredit", + "planchets", + "purchases", + "contractTerms", + "recoupGroups", + "rewards", + "tombstones", + "withdrawalGroups", + "refreshGroups", + "refundGroups", + ], + async (tx) => { await iterRecordsForPeerPushDebit(tx, filter, async (pi) => { const amount = Amounts.parseOrThrow(pi.amount); const exchangesInTx = [pi.exchangeBaseUrl]; @@ -1463,7 +1464,8 @@ export async function getTransactions( transactions.push(buildTransactionForTip(tipRecord, retryRecord)); }); //ends REMOVE REWARDS - }); + }, + ); // One-off checks, because of a bug where the wallet previously // did not migrate the DB correctly and caused these amounts @@ -1829,9 +1831,7 @@ export function notifyTransition( * Iterate refresh records based on a filter. */ async function iterRecordsForRefresh( - tx: GetReadOnlyAccess<{ - refreshGroups: typeof WalletStoresV1.refreshGroups; - }>, + tx: WalletDbReadOnlyTransaction<["refreshGroups"]>, filter: TransactionRecordFilter, f: (r: RefreshGroupRecord) => Promise, ): Promise { @@ -1852,9 +1852,7 @@ async function iterRecordsForRefresh( } async function iterRecordsForWithdrawal( - tx: GetReadOnlyAccess<{ - withdrawalGroups: typeof WalletStoresV1.withdrawalGroups; - }>, + tx: WalletDbReadOnlyTransaction<["withdrawalGroups"]>, filter: TransactionRecordFilter, f: (r: WithdrawalGroupRecord) => Promise, ): Promise { @@ -1876,9 +1874,7 @@ async function iterRecordsForWithdrawal( } async function iterRecordsForDeposit( - tx: GetReadOnlyAccess<{ - depositGroups: typeof WalletStoresV1.depositGroups; - }>, + tx: WalletDbReadOnlyTransaction<["depositGroups"]>, filter: TransactionRecordFilter, f: (r: DepositGroupRecord) => Promise, ): Promise { @@ -1899,9 +1895,7 @@ async function iterRecordsForDeposit( } async function iterRecordsForReward( - tx: GetReadOnlyAccess<{ - rewards: typeof WalletStoresV1.rewards; - }>, + tx: WalletDbReadOnlyTransaction<["rewards"]>, filter: TransactionRecordFilter, f: (r: RewardRecord) => Promise, ): Promise { @@ -1917,9 +1911,7 @@ async function iterRecordsForReward( } async function iterRecordsForRefund( - tx: GetReadOnlyAccess<{ - refundGroups: typeof WalletStoresV1.refundGroups; - }>, + tx: WalletDbReadOnlyTransaction<["refundGroups"]>, filter: TransactionRecordFilter, f: (r: RefundGroupRecord) => Promise, ): Promise { @@ -1935,9 +1927,7 @@ async function iterRecordsForRefund( } async function iterRecordsForPurchase( - tx: GetReadOnlyAccess<{ - purchases: typeof WalletStoresV1.purchases; - }>, + tx: WalletDbReadOnlyTransaction<["purchases"]>, filter: TransactionRecordFilter, f: (r: PurchaseRecord) => Promise, ): Promise { @@ -1953,9 +1943,7 @@ async function iterRecordsForPurchase( } async function iterRecordsForPeerPullCredit( - tx: GetReadOnlyAccess<{ - peerPullCredit: typeof WalletStoresV1.peerPullCredit; - }>, + tx: WalletDbReadOnlyTransaction<["peerPullCredit"]>, filter: TransactionRecordFilter, f: (r: PeerPullCreditRecord) => Promise, ): Promise { @@ -1971,9 +1959,7 @@ async function iterRecordsForPeerPullCredit( } async function iterRecordsForPeerPullDebit( - tx: GetReadOnlyAccess<{ - peerPullDebit: typeof WalletStoresV1.peerPullDebit; - }>, + tx: WalletDbReadOnlyTransaction<["peerPullDebit"]>, filter: TransactionRecordFilter, f: (r: PeerPullPaymentIncomingRecord) => Promise, ): Promise { @@ -1989,9 +1975,7 @@ async function iterRecordsForPeerPullDebit( } async function iterRecordsForPeerPushDebit( - tx: GetReadOnlyAccess<{ - peerPushDebit: typeof WalletStoresV1.peerPushDebit; - }>, + tx: WalletDbReadOnlyTransaction<["peerPushDebit"]>, filter: TransactionRecordFilter, f: (r: PeerPushDebitRecord) => Promise, ): Promise { @@ -2007,9 +1991,7 @@ async function iterRecordsForPeerPushDebit( } async function iterRecordsForPeerPushCredit( - tx: GetReadOnlyAccess<{ - peerPushCredit: typeof WalletStoresV1.peerPushCredit; - }>, + tx: WalletDbReadOnlyTransaction<["peerPushCredit"]>, filter: TransactionRecordFilter, f: (r: PeerPushPaymentIncomingRecord) => Promise, ): Promise { diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts index 4d9996cf4..542868de0 100644 --- a/packages/taler-wallet-core/src/operations/withdraw.ts +++ b/packages/taler-wallet-core/src/operations/withdraw.ts @@ -100,7 +100,12 @@ import { WithdrawalGroupStatus, WithdrawalRecordType, } from "../db.js"; -import { isWithdrawableDenom, timestampPreciseToDb } from "../index.js"; +import { + WalletDbReadOnlyTransaction, + WalletDbReadWriteTransaction, + isWithdrawableDenom, + timestampPreciseToDb, +} from "../index.js"; import { InternalWalletState } from "../internal-wallet-state.js"; import { TaskRunResult, @@ -118,11 +123,7 @@ import { selectWithdrawalDenominations, } from "../util/coinSelection.js"; import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js"; -import { - DbAccess, - GetReadOnlyAccess, - GetReadWriteAccess, -} from "../util/query.js"; +import { DbAccess } from "../util/query.js"; import { WALLET_BANK_INTEGRATION_PROTOCOL_VERSION, WALLET_EXCHANGE_PROTOCOL_VERSION, @@ -166,9 +167,9 @@ export class WithdrawTransactionContext implements TransactionContext { async deleteTransaction(): Promise { const { ws, withdrawalGroupId } = this; - await ws.db - .mktx((x) => [x.withdrawalGroups, x.tombstones]) - .runReadWrite(async (tx) => { + await ws.db.runReadWriteTx( + ["withdrawalGroups", "tombstones"], + async (tx) => { const withdrawalGroupRecord = await tx.withdrawalGroups.get(withdrawalGroupId); if (withdrawalGroupRecord) { @@ -178,14 +179,15 @@ export class WithdrawTransactionContext implements TransactionContext { }); return; } - }); + }, + ); } async suspendTransaction(): Promise { const { ws, withdrawalGroupId, transactionId, taskId } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.withdrawalGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["withdrawalGroups"], + async (tx) => { const wg = await tx.withdrawalGroups.get(withdrawalGroupId); if (!wg) { logger.warn(`withdrawal group ${withdrawalGroupId} not found`); @@ -230,16 +232,17 @@ export class WithdrawTransactionContext implements TransactionContext { }; } return undefined; - }); + }, + ); ws.taskScheduler.stopShepherdTask(taskId); notifyTransition(ws, transactionId, transitionInfo); } async abortTransaction(): Promise { const { ws, withdrawalGroupId, transactionId, taskId } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.withdrawalGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["withdrawalGroups"], + async (tx) => { const wg = await tx.withdrawalGroups.get(withdrawalGroupId); if (!wg) { logger.warn(`withdrawal group ${withdrawalGroupId} not found`); @@ -276,7 +279,6 @@ export class WithdrawTransactionContext implements TransactionContext { case WithdrawalGroupStatus.FailedAbortingBank: // Not allowed throw Error("abort not allowed in current state"); - break; default: assertUnreachable(wg.status); } @@ -291,7 +293,8 @@ export class WithdrawTransactionContext implements TransactionContext { }; } return undefined; - }); + }, + ); ws.taskScheduler.stopShepherdTask(taskId); notifyTransition(ws, transactionId, transitionInfo); ws.taskScheduler.startShepherdTask(taskId); @@ -299,9 +302,9 @@ export class WithdrawTransactionContext implements TransactionContext { async resumeTransaction(): Promise { const { ws, withdrawalGroupId, transactionId, taskId: retryTag } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.withdrawalGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["withdrawalGroups"], + async (tx) => { const wg = await tx.withdrawalGroups.get(withdrawalGroupId); if (!wg) { logger.warn(`withdrawal group ${withdrawalGroupId} not found`); @@ -346,16 +349,17 @@ export class WithdrawTransactionContext implements TransactionContext { }; } return undefined; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); ws.taskScheduler.startShepherdTask(retryTag); } async failTransaction(): Promise { const { ws, withdrawalGroupId, transactionId, taskId: retryTag } = this; - const stateUpdate = await ws.db - .mktx((x) => [x.withdrawalGroups]) - .runReadWrite(async (tx) => { + const stateUpdate = await ws.db.runReadWriteTx( + ["withdrawalGroups"], + async (tx) => { const wg = await tx.withdrawalGroups.get(withdrawalGroupId); if (!wg) { logger.warn(`withdrawal group ${withdrawalGroupId} not found`); @@ -381,13 +385,18 @@ export class WithdrawTransactionContext implements TransactionContext { }; } return undefined; - }); + }, + ); ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, stateUpdate); ws.taskScheduler.startShepherdTask(retryTag); } } +/** + * Compute the DD37 transaction state of a withdrawal transaction + * from the database's withdrawal group record. + */ export function computeWithdrawalTransactionStatus( wgRecord: WithdrawalGroupRecord, ): TransactionState { @@ -494,6 +503,10 @@ export function computeWithdrawalTransactionStatus( } } +/** + * Compute DD37 transaction actions for a withdrawal transaction + * based on the database's withdrawal group record. + */ export function computeWithdrawalTransactionActions( wgRecord: WithdrawalGroupRecord, ): TransactionAction[] { @@ -598,21 +611,19 @@ export async function getBankWithdrawalInfo( /** * Return denominations that can potentially used for a withdrawal. */ -export async function getCandidateWithdrawalDenoms( +async function getCandidateWithdrawalDenoms( ws: InternalWalletState, exchangeBaseUrl: string, currency: string, ): Promise { - return await ws.db - .mktx((x) => [x.denominations]) - .runReadOnly(async (tx) => { - return getCandidateWithdrawalDenomsTx(ws, tx, exchangeBaseUrl, currency); - }); + return await ws.db.runReadOnlyTx(["denominations"], async (tx) => { + return getCandidateWithdrawalDenomsTx(ws, tx, exchangeBaseUrl, currency); + }); } export async function getCandidateWithdrawalDenomsTx( ws: InternalWalletState, - tx: GetReadOnlyAccess<{ denominations: typeof WalletStoresV1.denominations }>, + tx: WalletDbReadOnlyTransaction<["denominations"]>, exchangeBaseUrl: string, currency: string, ): Promise { @@ -636,14 +647,12 @@ async function processPlanchetGenerate( withdrawalGroup: WithdrawalGroupRecord, coinIdx: number, ): Promise { - let planchet = await ws.db - .mktx((x) => [x.planchets]) - .runReadOnly(async (tx) => { - return tx.planchets.indexes.byGroupAndIndex.get([ - withdrawalGroup.withdrawalGroupId, - coinIdx, - ]); - }); + let planchet = await ws.db.runReadOnlyTx(["planchets"], async (tx) => { + return tx.planchets.indexes.byGroupAndIndex.get([ + withdrawalGroup.withdrawalGroupId, + coinIdx, + ]); + }); if (planchet) { return; } @@ -662,16 +671,14 @@ async function processPlanchetGenerate( } const denomPubHash = maybeDenomPubHash; - const denom = await ws.db - .mktx((x) => [x.denominations]) - .runReadOnly(async (tx) => { - return ws.getDenomInfo( - ws, - tx, - withdrawalGroup.exchangeBaseUrl, - denomPubHash, - ); - }); + const denom = await ws.db.runReadOnlyTx(["denominations"], async (tx) => { + return ws.getDenomInfo( + ws, + tx, + withdrawalGroup.exchangeBaseUrl, + denomPubHash, + ); + }); checkDbInvariant(!!denom); const r = await ws.cryptoApi.createPlanchet({ denomPub: denom.denomPub, @@ -697,20 +704,18 @@ async function processPlanchetGenerate( ageCommitmentProof: r.ageCommitmentProof, lastError: undefined, }; - await ws.db - .mktx((x) => [x.planchets]) - .runReadWrite(async (tx) => { - const p = await tx.planchets.indexes.byGroupAndIndex.get([ - withdrawalGroup.withdrawalGroupId, - coinIdx, - ]); - if (p) { - planchet = p; - return; - } - await tx.planchets.put(newPlanchet); - planchet = newPlanchet; - }); + await ws.db.runReadWriteTx(["planchets"], async (tx) => { + const p = await tx.planchets.indexes.byGroupAndIndex.get([ + withdrawalGroup.withdrawalGroupId, + coinIdx, + ]); + if (p) { + planchet = p; + return; + } + await tx.planchets.put(newPlanchet); + planchet = newPlanchet; + }); } interface WithdrawalRequestBatchArgs { @@ -744,9 +749,9 @@ async function transitionKycUrlUpdate( const ctx = new WithdrawTransactionContext(ws, withdrawalGroupId); const transactionId = ctx.transactionId; - const transitionInfo = await ws.db - .mktx((x) => [x.planchets, x.withdrawalGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["withdrawalGroups"], + async (tx) => { const wg2 = await tx.withdrawalGroups.get(withdrawalGroupId); if (!wg2) { return; @@ -766,7 +771,8 @@ async function transitionKycUrlUpdate( default: return undefined; } - }); + }, + ); if (transitionInfo) { // Always notify, even on self-transition, as the KYC URL might have changed. ws.notify({ @@ -814,7 +820,7 @@ async function handleKycRequired( let amlStatus: AmlStatus | undefined; if ( kycStatusRes.status === HttpStatusCode.Ok || - //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge + // FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge // remove after the exchange is fixed or clarified kycStatusRes.status === HttpStatusCode.NoContent ) { @@ -836,9 +842,9 @@ async function handleKycRequired( let notificationKycUrl: string | undefined = undefined; - const transitionInfo = await ws.db - .mktx((x) => [x.planchets, x.withdrawalGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["planchets", "withdrawalGroups"], + async (tx) => { for (let i = startIdx; i < requestCoinIdxs.length; i++) { let planchet = await tx.planchets.indexes.byGroupAndIndex.get([ withdrawalGroup.withdrawalGroupId, @@ -885,7 +891,8 @@ async function handleKycRequired( default: return undefined; } - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo, notificationKycUrl); } @@ -908,52 +915,45 @@ async function processPlanchetExchangeBatchRequest( // Indices of coins that are included in the batch request const requestCoinIdxs: number[] = []; - await ws.db - .mktx((x) => [ - x.withdrawalGroups, - x.planchets, - x.exchanges, - x.denominations, - ]) - .runReadOnly(async (tx) => { - for ( - let coinIdx = args.coinStartIndex; - coinIdx < args.coinStartIndex + args.batchSize && - coinIdx < wgContext.numPlanchets; - coinIdx++ - ) { - let planchet = await tx.planchets.indexes.byGroupAndIndex.get([ - withdrawalGroup.withdrawalGroupId, - coinIdx, - ]); - if (!planchet) { - continue; - } - if (planchet.planchetStatus === PlanchetStatus.WithdrawalDone) { - logger.warn("processPlanchet: planchet already withdrawn"); - continue; - } - const denom = await ws.getDenomInfo( - ws, - tx, - withdrawalGroup.exchangeBaseUrl, - planchet.denomPubHash, - ); - - if (!denom) { - logger.error("db inconsistent: denom for planchet not found"); - continue; - } + await ws.db.runReadOnlyTx(["planchets", "denominations"], async (tx) => { + for ( + let coinIdx = args.coinStartIndex; + coinIdx < args.coinStartIndex + args.batchSize && + coinIdx < wgContext.numPlanchets; + coinIdx++ + ) { + let planchet = await tx.planchets.indexes.byGroupAndIndex.get([ + withdrawalGroup.withdrawalGroupId, + coinIdx, + ]); + if (!planchet) { + continue; + } + if (planchet.planchetStatus === PlanchetStatus.WithdrawalDone) { + logger.warn("processPlanchet: planchet already withdrawn"); + continue; + } + const denom = await ws.getDenomInfo( + ws, + tx, + withdrawalGroup.exchangeBaseUrl, + planchet.denomPubHash, + ); - const planchetReq: ExchangeWithdrawRequest = { - denom_pub_hash: planchet.denomPubHash, - reserve_sig: planchet.withdrawSig, - coin_ev: planchet.coinEv, - }; - batchReq.planchets.push(planchetReq); - requestCoinIdxs.push(coinIdx); + if (!denom) { + logger.error("db inconsistent: denom for planchet not found"); + continue; } - }); + + const planchetReq: ExchangeWithdrawRequest = { + denom_pub_hash: planchet.denomPubHash, + reserve_sig: planchet.withdrawSig, + coin_ev: planchet.coinEv, + }; + batchReq.planchets.push(planchetReq); + requestCoinIdxs.push(coinIdx); + } + }); if (batchReq.planchets.length == 0) { logger.warn("empty withdrawal batch"); @@ -967,19 +967,17 @@ async function processPlanchetExchangeBatchRequest( const errDetail = getErrorDetailFromException(e); logger.trace("withdrawal request failed", e); logger.trace(String(e)); - await ws.db - .mktx((x) => [x.planchets]) - .runReadWrite(async (tx) => { - let planchet = await tx.planchets.indexes.byGroupAndIndex.get([ - withdrawalGroup.withdrawalGroupId, - coinIdx, - ]); - if (!planchet) { - return; - } - planchet.lastError = errDetail; - await tx.planchets.put(planchet); - }); + await ws.db.runReadWriteTx(["planchets"], async (tx) => { + let planchet = await tx.planchets.indexes.byGroupAndIndex.get([ + withdrawalGroup.withdrawalGroupId, + coinIdx, + ]); + if (!planchet) { + return; + } + planchet.lastError = errDetail; + await tx.planchets.put(planchet); + }); } // FIXME: handle individual error codes better! @@ -1026,9 +1024,9 @@ async function processPlanchetVerifyAndStoreCoin( ): Promise { const withdrawalGroup = wgContext.wgRecord; logger.trace(`checking and storing planchet idx=${coinIdx}`); - const d = await ws.db - .mktx((x) => [x.withdrawalGroups, x.planchets, x.denominations]) - .runReadOnly(async (tx) => { + const d = await ws.db.runReadOnlyTx( + ["planchets", "denominations"], + async (tx) => { let planchet = await tx.planchets.indexes.byGroupAndIndex.get([ withdrawalGroup.withdrawalGroupId, coinIdx, @@ -1054,7 +1052,8 @@ async function processPlanchetVerifyAndStoreCoin( denomInfo, exchangeBaseUrl: withdrawalGroup.exchangeBaseUrl, }; - }); + }, + ); if (!d) { return; @@ -1090,23 +1089,21 @@ async function processPlanchetVerifyAndStoreCoin( }); if (!isValid) { - await ws.db - .mktx((x) => [x.planchets]) - .runReadWrite(async (tx) => { - let planchet = await tx.planchets.indexes.byGroupAndIndex.get([ - withdrawalGroup.withdrawalGroupId, - coinIdx, - ]); - if (!planchet) { - return; - } - planchet.lastError = makeErrorDetail( - TalerErrorCode.WALLET_EXCHANGE_COIN_SIGNATURE_INVALID, - {}, - "invalid signature from the exchange after unblinding", - ); - await tx.planchets.put(planchet); - }); + await ws.db.runReadWriteTx(["planchets"], async (tx) => { + let planchet = await tx.planchets.indexes.byGroupAndIndex.get([ + withdrawalGroup.withdrawalGroupId, + coinIdx, + ]); + if (!planchet) { + return; + } + planchet.lastError = makeErrorDetail( + TalerErrorCode.WALLET_EXCHANGE_COIN_SIGNATURE_INVALID, + {}, + "invalid signature from the exchange after unblinding", + ); + await tx.planchets.put(planchet); + }); return; } @@ -1145,46 +1142,38 @@ async function processPlanchetVerifyAndStoreCoin( wgContext.planchetsFinished.add(planchet.coinPub); - // Check if this is the first time that the whole - // withdrawal succeeded. If so, mark the withdrawal - // group as finished. - const success = await ws.db - .mktx((x) => [ - x.coins, - x.denominations, - x.coinAvailability, - x.withdrawalGroups, - x.planchets, - ]) - .runReadWrite(async (tx) => { + await ws.db.runReadWriteTx( + ["planchets", "coins", "coinAvailability", "denominations"], + async (tx) => { const p = await tx.planchets.get(planchetCoinPub); if (!p || p.planchetStatus === PlanchetStatus.WithdrawalDone) { - return false; + return; } p.planchetStatus = PlanchetStatus.WithdrawalDone; p.lastError = undefined; await tx.planchets.put(p); await makeCoinAvailable(ws, tx, coin); - return true; - }); + }, + ); } /** * Make sure that denominations that currently can be used for withdrawal * are validated, and the result of validation is stored in the database. */ -export async function updateWithdrawalDenoms( +async function updateWithdrawalDenoms( ws: InternalWalletState, exchangeBaseUrl: string, ): Promise { logger.trace( `updating denominations used for withdrawal for ${exchangeBaseUrl}`, ); - const exchangeDetails = await ws.db - .mktx((x) => [x.exchanges, x.exchangeDetails]) - .runReadOnly(async (tx) => { + const exchangeDetails = await ws.db.runReadOnlyTx( + ["exchanges", "exchangeDetails"], + async (tx) => { return getExchangeWireDetailsInTx(tx, exchangeBaseUrl); - }); + }, + ); if (!exchangeDetails) { logger.error("exchange details not available"); throw Error(`exchange ${exchangeBaseUrl} details not available`); @@ -1243,14 +1232,12 @@ export async function updateWithdrawalDenoms( } if (updatedDenominations.length > 0) { logger.trace("writing denomination batch to db"); - await ws.db - .mktx((x) => [x.denominations]) - .runReadWrite(async (tx) => { - for (let i = 0; i < updatedDenominations.length; i++) { - const denom = updatedDenominations[i]; - await tx.denominations.put(denom); - } - }); + await ws.db.runReadWriteTx(["denominations"], async (tx) => { + for (let i = 0; i < updatedDenominations.length; i++) { + const denom = updatedDenominations[i]; + await tx.denominations.put(denom); + } + }); logger.trace("done with DB write"); } } @@ -1315,9 +1302,9 @@ async function queryReserve( logger.trace(`got reserve status ${j2s(result.response)}`); - const transitionResult = await ws.db - .mktx((x) => [x.withdrawalGroups]) - .runReadWrite(async (tx) => { + const transitionResult = await ws.db.runReadWriteTx( + ["withdrawalGroups"], + async (tx) => { const wg = await tx.withdrawalGroups.get(withdrawalGroupId); if (!wg) { logger.warn(`withdrawal group ${withdrawalGroupId} not found`); @@ -1332,7 +1319,8 @@ async function queryReserve( oldTxState: txStateOld, newTxState: txStateNew, }; - }); + }, + ); notifyTransition(ws, transactionId, transitionResult); @@ -1376,9 +1364,9 @@ async function processWithdrawalGroupAbortingBank( }); logger.info(`abort response status: ${abortResp.status}`); - const transitionInfo = await ws.db - .mktx((x) => [x.withdrawalGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["withdrawalGroups"], + async (tx) => { const wg = await tx.withdrawalGroups.get(withdrawalGroupId); if (!wg) { return undefined; @@ -1392,7 +1380,8 @@ async function processWithdrawalGroupAbortingBank( oldTxState: txStatusOld, newTxState: txStatusNew, }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); return TaskRunResult.finished(); } @@ -1409,9 +1398,9 @@ async function transitionKycSatisfied( tag: TransactionType.Withdrawal, withdrawalGroupId: withdrawalGroup.withdrawalGroupId, }); - const transitionInfo = await ws.db - .mktx((x) => [x.withdrawalGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["withdrawalGroups"], + async (tx) => { const wg2 = await tx.withdrawalGroups.get( withdrawalGroup.withdrawalGroupId, ); @@ -1434,7 +1423,8 @@ async function transitionKycSatisfied( default: return undefined; } - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); } @@ -1502,9 +1492,9 @@ async function processWithdrawalGroupPendingReady( if (withdrawalGroup.denomsSel.selectedDenoms.length === 0) { logger.warn("Finishing empty withdrawal group (no denoms)"); - const transitionInfo = await ws.db - .mktx((x) => [x.withdrawalGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["withdrawalGroups"], + async (tx) => { const wg = await tx.withdrawalGroups.get(withdrawalGroupId); if (!wg) { return undefined; @@ -1518,7 +1508,8 @@ async function processWithdrawalGroupPendingReady( oldTxState: txStatusOld, newTxState: txStatusNew, }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); return TaskRunResult.finished(); } @@ -1533,17 +1524,15 @@ async function processWithdrawalGroupPendingReady( wgRecord: withdrawalGroup, }; - await ws.db - .mktx((x) => [x.planchets]) - .runReadOnly(async (tx) => { - const planchets = - await tx.planchets.indexes.byGroup.getAll(withdrawalGroupId); - for (const p of planchets) { - if (p.planchetStatus === PlanchetStatus.WithdrawalDone) { - wgContext.planchetsFinished.add(p.coinPub); - } + await ws.db.runReadOnlyTx(["planchets"], async (tx) => { + const planchets = + await tx.planchets.indexes.byGroup.getAll(withdrawalGroupId); + for (const p of planchets) { + if (p.planchetStatus === PlanchetStatus.WithdrawalDone) { + wgContext.planchetsFinished.add(p.coinPub); } - }); + } + }); // We sequentially generate planchets, so that // large withdrawal groups don't make the wallet unresponsive. @@ -1582,9 +1571,9 @@ async function processWithdrawalGroupPendingReady( let numPlanchetErrors = 0; const maxReportedErrors = 5; - const res = await ws.db - .mktx((x) => [x.coins, x.coinAvailability, x.withdrawalGroups, x.planchets]) - .runReadWrite(async (tx) => { + const res = await ws.db.runReadWriteTx( + ["coins", "coinAvailability", "withdrawalGroups", "planchets"], + async (tx) => { const wg = await tx.withdrawalGroups.get(withdrawalGroupId); if (!wg) { return; @@ -1621,7 +1610,8 @@ async function processWithdrawalGroupPendingReady( newTxState, }, }; - }); + }, + ); if (!res) { throw Error("withdrawal group does not exist anymore"); @@ -1655,11 +1645,12 @@ export async function processWithdrawalGroup( cancellationToken: CancellationToken, ): Promise { logger.trace("processing withdrawal group", withdrawalGroupId); - const withdrawalGroup = await ws.db - .mktx((x) => [x.withdrawalGroups]) - .runReadOnly(async (tx) => { + const withdrawalGroup = await ws.db.runReadOnlyTx( + ["withdrawalGroups"], + async (tx) => { return tx.withdrawalGroups.get(withdrawalGroupId); - }); + }, + ); if (!withdrawalGroup) { throw Error(`withdrawal group ${withdrawalGroupId} not found`); @@ -1779,11 +1770,9 @@ export async function getExchangeWithdrawalInfo( for (let i = 0; i < selectedDenoms.selectedDenoms.length; i++) { const ds = selectedDenoms.selectedDenoms[i]; // FIXME: Do in one transaction! - const denom = await ws.db - .mktx((x) => [x.denominations]) - .runReadOnly(async (tx) => { - return ws.getDenomInfo(ws, tx, exchangeBaseUrl, ds.denomPubHash); - }); + const denom = await ws.db.runReadOnlyTx(["denominations"], async (tx) => { + return ws.getDenomInfo(ws, tx, exchangeBaseUrl, ds.denomPubHash); + }); checkDbInvariant(!!denom); hasDenomWithAgeRestriction = hasDenomWithAgeRestriction || denom.denomPub.age_mask > 0; @@ -1979,11 +1968,9 @@ export function augmentPaytoUrisForWithdrawal( * Get payto URIs that can be used to fund a withdrawal operation. */ export async function getFundingPaytoUris( - tx: GetReadOnlyAccess<{ - withdrawalGroups: typeof WalletStoresV1.withdrawalGroups; - exchanges: typeof WalletStoresV1.exchanges; - exchangeDetails: typeof WalletStoresV1.exchangeDetails; - }>, + tx: WalletDbReadOnlyTransaction< + ["withdrawalGroups", "exchanges", "exchangeDetails"] + >, withdrawalGroupId: string, ): Promise { const withdrawalGroup = await tx.withdrawalGroups.get(withdrawalGroupId); @@ -2017,11 +2004,9 @@ async function getWithdrawalGroupRecordTx( withdrawalGroupId: string; }, ): Promise { - return await db - .mktx((x) => [x.withdrawalGroups]) - .runReadOnly(async (tx) => { - return tx.withdrawalGroups.get(req.withdrawalGroupId); - }); + return await db.runReadOnlyTx(["withdrawalGroups"], async (tx) => { + return tx.withdrawalGroups.get(req.withdrawalGroupId); + }); } export function getReserveRequestTimeout(r: WithdrawalGroupRecord): Duration { @@ -2056,11 +2041,12 @@ async function registerReserveWithBank( ws: InternalWalletState, withdrawalGroupId: string, ): Promise { - const withdrawalGroup = await ws.db - .mktx((x) => [x.withdrawalGroups]) - .runReadOnly(async (tx) => { + const withdrawalGroup = await ws.db.runReadOnlyTx( + ["withdrawalGroups"], + async (tx) => { return await tx.withdrawalGroups.get(withdrawalGroupId); - }); + }, + ); const transactionId = constructTransactionIdentifier({ tag: TransactionType.Withdrawal, withdrawalGroupId, @@ -2094,9 +2080,9 @@ async function registerReserveWithBank( }); // FIXME: libeufin-bank currently doesn't return a response in the right format, so we don't validate at all. await readSuccessResponseJsonOrThrow(httpResp, codecForAny()); - const transitionInfo = await ws.db - .mktx((x) => [x.withdrawalGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["withdrawalGroups"], + async (tx) => { const r = await tx.withdrawalGroups.get(withdrawalGroupId); if (!r) { return undefined; @@ -2122,7 +2108,8 @@ async function registerReserveWithBank( oldTxState, newTxState, }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); } @@ -2168,9 +2155,9 @@ async function processReserveBankStatus( if (status.aborted) { logger.info("bank aborted the withdrawal"); - const transitionInfo = await ws.db - .mktx((x) => [x.withdrawalGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["withdrawalGroups"], + async (tx) => { const r = await tx.withdrawalGroups.get(withdrawalGroupId); if (!r) { return; @@ -2195,7 +2182,8 @@ async function processReserveBankStatus( oldTxState, newTxState, }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); return TaskRunResult.finished(); } @@ -2212,9 +2200,9 @@ async function processReserveBankStatus( return await processReserveBankStatus(ws, withdrawalGroupId); } - const transitionInfo = await ws.db - .mktx((x) => [x.withdrawalGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["withdrawalGroups"], + async (tx) => { const r = await tx.withdrawalGroups.get(withdrawalGroupId); if (!r) { return undefined; @@ -2247,7 +2235,8 @@ async function processReserveBankStatus( oldTxState, newTxState, }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); @@ -2293,11 +2282,12 @@ export async function internalPrepareCreateWithdrawalGroup( if (args.forcedWithdrawalGroupId) { withdrawalGroupId = args.forcedWithdrawalGroupId; const wgId = withdrawalGroupId; - const existingWg = await ws.db - .mktx((x) => [x.withdrawalGroups]) - .runReadOnly(async (tx) => { + const existingWg = await ws.db.runReadOnlyTx( + ["withdrawalGroups"], + async (tx) => { return tx.withdrawalGroups.get(wgId); - }); + }, + ); if (existingWg) { const transactionId = constructTransactionIdentifier({ @@ -2384,11 +2374,9 @@ export interface PerformCreateWithdrawalGroupResult { export async function internalPerformCreateWithdrawalGroup( ws: InternalWalletState, - tx: GetReadWriteAccess<{ - withdrawalGroups: typeof WalletStoresV1.withdrawalGroups; - reserves: typeof WalletStoresV1.reserves; - exchanges: typeof WalletStoresV1.exchanges; - }>, + tx: WalletDbReadWriteTransaction< + ["withdrawalGroups", "reserves", "exchanges"] + >, prep: PrepareCreateWithdrawalGroupResult, ): Promise { const { withdrawalGroup } = prep; @@ -2477,16 +2465,12 @@ export async function internalCreateWithdrawalGroup( tag: TransactionType.Withdrawal, withdrawalGroupId: prep.withdrawalGroup.withdrawalGroupId, }); - const res = await ws.db - .mktx((x) => [ - x.withdrawalGroups, - x.reserves, - x.exchanges, - x.exchangeDetails, - ]) - .runReadWrite(async (tx) => { + const res = await ws.db.runReadWriteTx( + ["withdrawalGroups", "reserves", "exchanges", "exchangeDetails"], + async (tx) => { return await internalPerformCreateWithdrawalGroup(ws, tx, prep); - }); + }, + ); if (res.exchangeNotif) { ws.notify(res.exchangeNotif); } @@ -2507,13 +2491,14 @@ export async function acceptWithdrawalFromUri( logger.info( `accepting withdrawal via ${req.talerWithdrawUri}, canonicalized selected exchange ${selectedExchange}`, ); - const existingWithdrawalGroup = await ws.db - .mktx((x) => [x.withdrawalGroups]) - .runReadOnly(async (tx) => { + const existingWithdrawalGroup = await ws.db.runReadOnlyTx( + ["withdrawalGroups"], + async (tx) => { return await tx.withdrawalGroups.indexes.byTalerWithdrawUri.get( req.talerWithdrawUri, ); - }); + }, + ); if (existingWithdrawalGroup) { let url: string | undefined; @@ -2751,11 +2736,12 @@ export async function createManualWithdrawal( const transactionId = ctx.transactionId; - const exchangePaytoUris = await ws.db - .mktx((x) => [x.withdrawalGroups, x.exchanges, x.exchangeDetails]) - .runReadOnly(async (tx) => { + const exchangePaytoUris = await ws.db.runReadOnlyTx( + ["withdrawalGroups", "exchanges", "exchangeDetails"], + async (tx) => { return await getFundingPaytoUris(tx, withdrawalGroup.withdrawalGroupId); - }); + }, + ); ws.taskScheduler.startShepherdTask(ctx.taskId); diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts index a3735e78f..f9290cdd9 100644 --- a/packages/taler-wallet-core/src/shepherd.ts +++ b/packages/taler-wallet-core/src/shepherd.ts @@ -39,10 +39,9 @@ import { } from "@gnu-taler/taler-util"; import { CryptoApiStoppedError } from "./crypto/workers/crypto-dispatcher.js"; import { - GetReadOnlyAccess, OPERATION_STATUS_ACTIVE_FIRST, OPERATION_STATUS_ACTIVE_LAST, - WalletStoresV1, + WalletDbAllStoresReadOnlyTransaction, timestampAbsoluteFromDb, } from "./index.js"; import { InternalWalletState } from "./internal-wallet-state.js"; @@ -214,12 +213,12 @@ export class TaskScheduler { } async resetTaskRetries(taskId: TaskId): Promise { - const maybeNotification = await this.ws.db - .mktxAll() - .runReadWrite(async (tx) => { + const maybeNotification = await this.ws.db.runAllStoresReadWriteTx( + async (tx) => { await tx.operationRetries.delete(taskId); return taskToRetryNotification(this.ws, tx, taskId, undefined); - }); + }, + ); this.stopShepherdTask(taskId); if (maybeNotification) { this.ws.notify(maybeNotification); @@ -338,7 +337,7 @@ async function storePendingTaskError( e: TalerErrorDetail, ): Promise { logger.info(`storing pending task error for ${pendingTaskId}`); - const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => { + const maybeNotification = await ws.db.runAllStoresReadWriteTx(async (tx) => { let retryRecord = await tx.operationRetries.get(pendingTaskId); if (!retryRecord) { retryRecord = { @@ -365,7 +364,7 @@ async function storeTaskProgress( ws: InternalWalletState, pendingTaskId: string, ): Promise { - await ws.db.mktxAll().runReadWrite(async (tx) => { + await ws.db.runReadWriteTx(["operationRetries"], async (tx) => { await tx.operationRetries.delete(pendingTaskId); }); } @@ -374,7 +373,7 @@ async function storePendingTaskPending( ws: InternalWalletState, pendingTaskId: string, ): Promise { - const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => { + const maybeNotification = await ws.db.runAllStoresReadWriteTx(async (tx) => { let retryRecord = await tx.operationRetries.get(pendingTaskId); let hadError = false; if (!retryRecord) { @@ -405,11 +404,9 @@ async function storePendingTaskFinished( ws: InternalWalletState, pendingTaskId: string, ): Promise { - await ws.db - .mktx((x) => [x.operationRetries]) - .runReadWrite(async (tx) => { - await tx.operationRetries.delete(pendingTaskId); - }); + await ws.db.runReadWriteTx(["operationRetries"], async (tx) => { + await tx.operationRetries.delete(pendingTaskId); + }); } async function runTaskWithErrorReporting( @@ -570,7 +567,7 @@ async function callOperationHandlerForTaskId( */ async function taskToRetryNotification( ws: InternalWalletState, - tx: GetReadOnlyAccess, + tx: WalletDbAllStoresReadOnlyTransaction, pendingTaskId: string, e: TalerErrorDetail | undefined, ): Promise { @@ -597,7 +594,7 @@ async function taskToRetryNotification( async function makeTransactionRetryNotification( ws: InternalWalletState, - tx: GetReadOnlyAccess, + tx: WalletDbAllStoresReadOnlyTransaction, pendingTaskId: string, e: TalerErrorDetail | undefined, ): Promise { @@ -626,7 +623,7 @@ async function makeTransactionRetryNotification( async function makeExchangeRetryNotification( ws: InternalWalletState, - tx: GetReadOnlyAccess, + tx: WalletDbAllStoresReadOnlyTransaction, pendingTaskId: string, e: TalerErrorDetail | undefined, ): Promise { @@ -729,20 +726,20 @@ export async function getActiveTaskIds( const res: ActiveTaskIdsResult = { taskIds: [], }; - await ws.db - .mktx((x) => [ - x.exchanges, - x.refreshGroups, - x.withdrawalGroups, - x.purchases, - x.depositGroups, - x.recoupGroups, - x.peerPullCredit, - x.peerPushDebit, - x.peerPullDebit, - x.peerPushCredit, - ]) - .runReadWrite(async (tx) => { + await ws.db.runReadWriteTx( + [ + "exchanges", + "refreshGroups", + "withdrawalGroups", + "purchases", + "depositGroups", + "recoupGroups", + "peerPullCredit", + "peerPushDebit", + "peerPullDebit", + "peerPushCredit", + ], + async (tx) => { const active = GlobalIDB.KeyRange.bound( OPERATION_STATUS_ACTIVE_FIRST, OPERATION_STATUS_ACTIVE_LAST, @@ -887,7 +884,8 @@ export async function getActiveTaskIds( } // FIXME: Recoup! - }); + }, + ); return res; } diff --git a/packages/taler-wallet-core/src/util/coinSelection.ts b/packages/taler-wallet-core/src/util/coinSelection.ts index be868867d..0f6316bce 100644 --- a/packages/taler-wallet-core/src/util/coinSelection.ts +++ b/packages/taler-wallet-core/src/util/coinSelection.ts @@ -58,7 +58,7 @@ import { DenominationRecord } from "../db.js"; import { getExchangeWireDetailsInTx, isWithdrawableDenom, - WalletDbReadOnlyTransactionArr, + WalletDbReadOnlyTransaction, } from "../index.js"; import { InternalWalletState } from "../internal-wallet-state.js"; import { @@ -359,33 +359,31 @@ export async function selectPayCoinsNew( logger.trace(`coin selection request ${j2s(req)}`); logger.trace(`selected coins (via denoms) for payment: ${j2s(finalSel)}`); - await ws.db - .mktx((x) => [x.coins, x.denominations]) - .runReadOnly(async (tx) => { - for (const dph of Object.keys(finalSel)) { - const selInfo = finalSel[dph]; - const numRequested = selInfo.contributions.length; - const query = [ - selInfo.exchangeBaseUrl, - selInfo.denomPubHash, - selInfo.maxAge, - CoinStatus.Fresh, - ]; - logger.trace(`query: ${j2s(query)}`); - const coins = - await tx.coins.indexes.byExchangeDenomPubHashAndAgeAndStatus.getAll( - query, - numRequested, - ); - if (coins.length != numRequested) { - throw Error( - `coin selection failed (not available anymore, got only ${coins.length}/${numRequested})`, - ); - } - coinPubs.push(...coins.map((x) => x.coinPub)); - coinContributions.push(...selInfo.contributions); + await ws.db.runReadOnlyTx(["coins", "denominations"], async (tx) => { + for (const dph of Object.keys(finalSel)) { + const selInfo = finalSel[dph]; + const numRequested = selInfo.contributions.length; + const query = [ + selInfo.exchangeBaseUrl, + selInfo.denomPubHash, + selInfo.maxAge, + CoinStatus.Fresh, + ]; + logger.trace(`query: ${j2s(query)}`); + const coins = + await tx.coins.indexes.byExchangeDenomPubHashAndAgeAndStatus.getAll( + query, + numRequested, + ); + if (coins.length != numRequested) { + throw Error( + `coin selection failed (not available anymore, got only ${coins.length}/${numRequested})`, + ); } - }); + coinPubs.push(...coins.map((x) => x.coinPub)); + coinContributions.push(...selInfo.contributions); + } + }); return { type: "success", @@ -911,7 +909,7 @@ export interface PeerCoinSelectionRequest { */ async function selectPayPeerCandidatesForExchange( ws: InternalWalletState, - tx: WalletDbReadOnlyTransactionArr<["coinAvailability", "denominations"]>, + tx: WalletDbReadOnlyTransaction<["coinAvailability", "denominations"]>, exchangeBaseUrl: string, ): Promise { const denoms: AvailableDenom[] = []; @@ -1048,17 +1046,17 @@ export async function selectPeerCoins( // one coin to spend. throw new Error("amount of zero not allowed"); } - return await ws.db - .mktx((x) => [ - x.exchanges, - x.contractTerms, - x.coins, - x.coinAvailability, - x.denominations, - x.refreshGroups, - x.peerPushDebit, - ]) - .runReadWrite(async (tx) => { + return await ws.db.runReadWriteTx( + [ + "exchanges", + "contractTerms", + "coins", + "coinAvailability", + "denominations", + "refreshGroups", + "peerPushDebit", + ], + async (tx) => { const exchanges = await tx.exchanges.iter().toArray(); const exchangeFeeGap: { [url: string]: AmountJson } = {}; const currency = Amounts.currencyOf(instructedAmount); @@ -1232,5 +1230,6 @@ export async function selectPeerCoins( }; return { type: "failure", insufficientBalanceDetails: errDetails }; - }); + }, + ); } diff --git a/packages/taler-wallet-core/src/util/instructedAmountConversion.ts b/packages/taler-wallet-core/src/util/instructedAmountConversion.ts index caa3fdca5..c4a2f2d5c 100644 --- a/packages/taler-wallet-core/src/util/instructedAmountConversion.ts +++ b/packages/taler-wallet-core/src/util/instructedAmountConversion.ts @@ -139,14 +139,9 @@ async function getAvailableDenoms( ): Promise { const operationType = getOperationType(TransactionType.Deposit); - return await ws.db - .mktx((x) => [ - x.exchanges, - x.exchangeDetails, - x.denominations, - x.coinAvailability, - ]) - .runReadOnly(async (tx) => { + return await ws.db.runReadOnlyTx( + ["exchanges", "exchangeDetails", "denominations", "coinAvailability"], + async (tx) => { const list: CoinInfo[] = []; const exchanges: Record = {}; @@ -304,7 +299,8 @@ async function getAvailableDenoms( } return { list, exchanges }; - }); + }, + ); } function buildCoinInfoFromDenom( diff --git a/packages/taler-wallet-core/src/util/query.ts b/packages/taler-wallet-core/src/util/query.ts index 19fa0dbfd..90a3cac70 100644 --- a/packages/taler-wallet-core/src/util/query.ts +++ b/packages/taler-wallet-core/src/util/query.ts @@ -490,77 +490,13 @@ type ValidateKeyPath = P extends `${infer PX extends keyof T & // foo({x: [0,1,2]}, "x.0"); -export type GetReadOnlyAccess = { - [P in keyof BoundStores]: BoundStores[P] extends StoreWithIndexes< - infer StoreName, - infer RecordType, - infer IndexMap - > - ? StoreReadOnlyAccessor - : unknown; -}; - export type StoreNames = StoreMap extends { [P in keyof StoreMap]: StoreWithIndexes; } ? keyof StoreMap : unknown; -export type DbReadOnlyTransaction< - StoreMap, - Stores extends StoreNames & string, -> = StoreMap extends { - [P in Stores]: StoreWithIndexes; -} - ? { - [P in Stores]: StoreMap[P] extends StoreWithIndexes< - infer StoreName, - infer RecordType, - infer IndexMap - > - ? StoreReadOnlyAccessor - : unknown; - } - : unknown; - export type DbReadWriteTransaction< - StoreMap, - Stores extends StoreNames & string, -> = StoreMap extends { - [P in Stores]: StoreWithIndexes; -} - ? { - [P in Stores]: StoreMap[P] extends StoreWithIndexes< - infer StoreName, - infer RecordType, - infer IndexMap - > - ? StoreReadWriteAccessor - : unknown; - } - : unknown; - -export type GetReadWriteAccess = { - [P in keyof BoundStores]: BoundStores[P] extends StoreWithIndexes< - infer StoreName, - infer RecordType, - infer IndexMap - > - ? StoreReadWriteAccessor - : unknown; -}; - -type ReadOnlyTransactionFunction = ( - t: GetReadOnlyAccess, - rawTx: IDBTransaction, -) => Promise; - -type ReadWriteTransactionFunction = ( - t: GetReadWriteAccess, - rawTx: IDBTransaction, -) => Promise; - -export type DbReadWriteTransactionArr< StoreMap, StoresArr extends Array>, > = StoreMap extends { @@ -578,7 +514,7 @@ export type DbReadWriteTransactionArr< } : never; -export type DbReadOnlyTransactionArr< +export type DbReadOnlyTransaction< StoreMap, StoresArr extends Array>, > = StoreMap extends { @@ -596,11 +532,6 @@ export type DbReadOnlyTransactionArr< } : never; -export interface DbTransactionContext { - runReadWrite(f: ReadWriteTransactionFunction): Promise; - runReadOnly(f: ReadOnlyTransactionFunction): Promise; -} - /** * Convert the type of an array to a union of the contents. * @@ -811,12 +742,6 @@ function makeWriteContext( return ctx; } -type StoreNamesOf = X extends { [x: number]: infer F } - ? F extends { storeName: infer I } - ? I - : never - : never; - /** * Type-safe access to a database with a particular store map. * @@ -832,65 +757,46 @@ export class DbAccess { return this.db; } - /** - * Run a transaction with all object stores. - */ - mktxAll(): DbTransactionContext { - const storeNames: string[] = []; + runAllStoresReadWriteTx( + txf: ( + tx: DbReadWriteTransaction>>, + ) => Promise, + ): Promise { const accessibleStores: { [x: string]: StoreWithIndexes } = {}; - for (let i = 0; i < this.db.objectStoreNames.length; i++) { - const sn = this.db.objectStoreNames[i]; + const strStoreNames: string[] = []; + for (const sn of Object.keys(this.stores as any)) { const swi = (this.stores as any)[sn] as StoreWithIndexes; - if (!swi) { - logger.warn(`store metadata not available (${sn})`); - continue; - } - storeNames.push(sn); - accessibleStores[sn] = swi; + strStoreNames.push(swi.storeName); + accessibleStores[swi.storeName] = swi; } + const tx = this.db.transaction(strStoreNames, "readwrite"); + const writeContext = makeWriteContext(tx, accessibleStores); + return runTx(tx, writeContext, txf); + } - const storeMapKeys = Object.keys(this.stores as any); - for (const storeMapKey of storeMapKeys) { - const swi = (this.stores as any)[storeMapKey] as StoreWithIndexes< - any, - any, - any - >; - if (!accessibleStores[swi.storeName]) { - const version = this.db.version; - throw Error( - `store '${swi.storeName}' required by schema but not in database (minver=${version})`, - ); - } + runAllStoresReadOnlyTx( + txf: ( + tx: DbReadOnlyTransaction>>, + ) => Promise, + ): Promise { + const accessibleStores: { [x: string]: StoreWithIndexes } = + {}; + const strStoreNames: string[] = []; + for (const sn of Object.keys(this.stores as any)) { + const swi = (this.stores as any)[sn] as StoreWithIndexes; + strStoreNames.push(swi.storeName); + accessibleStores[swi.storeName] = swi; } - - const runReadOnly = ( - txf: ReadOnlyTransactionFunction, - ): Promise => { - const tx = this.db.transaction(storeNames, "readonly"); - const readContext = makeReadContext(tx, accessibleStores); - return runTx(tx, readContext, txf); - }; - - const runReadWrite = ( - txf: ReadWriteTransactionFunction, - ): Promise => { - const tx = this.db.transaction(storeNames, "readwrite"); - const writeContext = makeWriteContext(tx, accessibleStores); - return runTx(tx, writeContext, txf); - }; - - return { - runReadOnly, - runReadWrite, - }; + const tx = this.db.transaction(strStoreNames, "readonly"); + const writeContext = makeReadContext(tx, accessibleStores); + return runTx(tx, writeContext, txf); } runReadWriteTx>>( storeNames: StoreNameArray, txf: ( - tx: DbReadWriteTransactionArr, + tx: DbReadWriteTransaction, ) => Promise, ): Promise { const accessibleStores: { [x: string]: StoreWithIndexes } = @@ -908,7 +814,7 @@ export class DbAccess { runReadOnlyTx>>( storeNames: StoreNameArray, - txf: (tx: DbReadOnlyTransactionArr) => Promise, + txf: (tx: DbReadOnlyTransaction) => Promise, ): Promise { const accessibleStores: { [x: string]: StoreWithIndexes } = {}; @@ -922,57 +828,4 @@ export class DbAccess { const readContext = makeReadContext(tx, accessibleStores); return runTx(tx, readContext, txf); } - - /** - * Run a transaction with selected object stores. - * - * The {@link namePicker} must be a function that selects a list of object - * stores from all available object stores. - */ - mktx< - StoreNames extends keyof StoreMap, - Stores extends StoreMap[StoreNames], - StoreList extends Stores[], - BoundStores extends { - [X in StoreNamesOf]: StoreList[number] & { storeName: X }; - }, - >(namePicker: (x: StoreMap) => StoreList): DbTransactionContext { - const storeNames: string[] = []; - const accessibleStores: { [x: string]: StoreWithIndexes } = - {}; - - const storePick = namePicker(this.stores) as any; - if (typeof storePick !== "object" || storePick === null) { - throw Error(); - } - for (const swiPicked of storePick) { - const swi = swiPicked as StoreWithIndexes; - if (swi.mark !== storeWithIndexesSymbol) { - throw Error("invalid store descriptor returned from selector function"); - } - storeNames.push(swi.storeName); - accessibleStores[swi.storeName] = swi; - } - - const runReadOnly = ( - txf: ReadOnlyTransactionFunction, - ): Promise => { - const tx = this.db.transaction(storeNames, "readonly"); - const readContext = makeReadContext(tx, accessibleStores); - return runTx(tx, readContext, txf); - }; - - const runReadWrite = ( - txf: ReadWriteTransactionFunction, - ): Promise => { - const tx = this.db.transaction(storeNames, "readwrite"); - const writeContext = makeWriteContext(tx, accessibleStores); - return runTx(tx, writeContext, txf); - }; - - return { - runReadOnly, - runReadWrite, - }; - } } diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index cfe171bd0..45970e770 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -141,6 +141,8 @@ import { CoinSourceType, ConfigRecordKey, DenominationRecord, + WalletDbReadOnlyTransaction, + WalletDbReadWriteTransaction, WalletStoresV1, clearDatabase, exportDb, @@ -268,11 +270,7 @@ import { getMaxPeerPushAmount, } from "./util/instructedAmountConversion.js"; import { checkDbInvariant } from "./util/invariants.js"; -import { - DbAccess, - GetReadOnlyAccess, - GetReadWriteAccess, -} from "./util/query.js"; +import { DbAccess } from "./util/query.js"; import { TimerAPI, TimerGroup } from "./util/timer.js"; import { WALLET_BANK_CONVERSION_API_PROTOCOL_VERSION, @@ -306,30 +304,28 @@ async function runTaskLoop( */ async function fillDefaults(ws: InternalWalletState): Promise { const notifications: WalletNotification[] = []; - await ws.db - .mktx((x) => [x.config, x.exchanges, x.exchangeDetails]) - .runReadWrite(async (tx) => { - const appliedRec = await tx.config.get("currencyDefaultsApplied"); - let alreadyApplied = appliedRec ? !!appliedRec.value : false; - if (alreadyApplied) { - logger.trace("defaults already applied"); - return; - } - for (const exch of ws.config.builtin.exchanges) { - const resp = await addPresetExchangeEntry( - tx, - exch.exchangeBaseUrl, - exch.currencyHint, - ); - if (resp.notification) { - notifications.push(resp.notification); - } + await ws.db.runReadWriteTx(["config", "exchanges"], async (tx) => { + const appliedRec = await tx.config.get("currencyDefaultsApplied"); + let alreadyApplied = appliedRec ? !!appliedRec.value : false; + if (alreadyApplied) { + logger.trace("defaults already applied"); + return; + } + for (const exch of ws.config.builtin.exchanges) { + const resp = await addPresetExchangeEntry( + tx, + exch.exchangeBaseUrl, + exch.currencyHint, + ); + if (resp.notification) { + notifications.push(resp.notification); } - await tx.config.put({ - key: ConfigRecordKey.CurrencyDefaultsApplied, - value: true, - }); + } + await tx.config.put({ + key: ConfigRecordKey.CurrencyDefaultsApplied, + value: true, }); + }); for (const notif of notifications) { ws.notify(notif); } @@ -344,25 +340,23 @@ async function listKnownBankAccounts( currency?: string, ): Promise { const accounts: KnownBankAccountsInfo[] = []; - await ws.db - .mktx((x) => [x.bankAccounts]) - .runReadOnly(async (tx) => { - const knownAccounts = await tx.bankAccounts.iter().toArray(); - for (const r of knownAccounts) { - if (currency && currency !== r.currency) { - continue; - } - const payto = parsePaytoUri(r.uri); - if (payto) { - accounts.push({ - uri: payto, - alias: r.alias, - kyc_completed: r.kycCompleted, - currency: r.currency, - }); - } + await ws.db.runReadOnlyTx(["bankAccounts"], async (tx) => { + const knownAccounts = await tx.bankAccounts.iter().toArray(); + for (const r of knownAccounts) { + if (currency && currency !== r.currency) { + continue; } - }); + const payto = parsePaytoUri(r.uri); + if (payto) { + accounts.push({ + uri: payto, + alias: r.alias, + kyc_completed: r.kycCompleted, + currency: r.currency, + }); + } + } + }); return { accounts }; } @@ -374,16 +368,14 @@ async function addKnownBankAccounts( alias: string, currency: string, ): Promise { - await ws.db - .mktx((x) => [x.bankAccounts]) - .runReadWrite(async (tx) => { - tx.bankAccounts.put({ - uri: payto, - alias: alias, - currency: currency, - kycCompleted: false, - }); + await ws.db.runReadWriteTx(["bankAccounts"], async (tx) => { + tx.bankAccounts.put({ + uri: payto, + alias: alias, + currency: currency, + kycCompleted: false, }); + }); return; } @@ -393,15 +385,13 @@ async function forgetKnownBankAccounts( ws: InternalWalletState, payto: string, ): Promise { - await ws.db - .mktx((x) => [x.bankAccounts]) - .runReadWrite(async (tx) => { - const account = await tx.bankAccounts.get(payto); - if (!account) { - throw Error(`account not found: ${payto}`); - } - tx.bankAccounts.delete(account.uri); - }); + await ws.db.runReadWriteTx(["bankAccounts"], async (tx) => { + const account = await tx.bankAccounts.get(payto); + if (!account) { + throw Error(`account not found: ${payto}`); + } + tx.bankAccounts.delete(account.uri); + }); return; } @@ -410,41 +400,39 @@ async function setCoinSuspended( coinPub: string, suspended: boolean, ): Promise { - await ws.db - .mktx((x) => [x.coins, x.coinAvailability]) - .runReadWrite(async (tx) => { - const c = await tx.coins.get(coinPub); - if (!c) { - logger.warn(`coin ${coinPub} not found, won't suspend`); + await ws.db.runReadWriteTx(["coins", "coinAvailability"], async (tx) => { + const c = await tx.coins.get(coinPub); + if (!c) { + logger.warn(`coin ${coinPub} not found, won't suspend`); + return; + } + const coinAvailability = await tx.coinAvailability.get([ + c.exchangeBaseUrl, + c.denomPubHash, + c.maxAge, + ]); + checkDbInvariant(!!coinAvailability); + if (suspended) { + if (c.status !== CoinStatus.Fresh) { return; } - const coinAvailability = await tx.coinAvailability.get([ - c.exchangeBaseUrl, - c.denomPubHash, - c.maxAge, - ]); - checkDbInvariant(!!coinAvailability); - if (suspended) { - if (c.status !== CoinStatus.Fresh) { - return; - } - if (coinAvailability.freshCoinCount === 0) { - throw Error( - `invalid coin count ${coinAvailability.freshCoinCount} in DB`, - ); - } - coinAvailability.freshCoinCount--; - c.status = CoinStatus.FreshSuspended; - } else { - if (c.status == CoinStatus.Dormant) { - return; - } - coinAvailability.freshCoinCount++; - c.status = CoinStatus.Fresh; + if (coinAvailability.freshCoinCount === 0) { + throw Error( + `invalid coin count ${coinAvailability.freshCoinCount} in DB`, + ); } - await tx.coins.put(c); - await tx.coinAvailability.put(coinAvailability); - }); + coinAvailability.freshCoinCount--; + c.status = CoinStatus.FreshSuspended; + } else { + if (c.status == CoinStatus.Dormant) { + return; + } + coinAvailability.freshCoinCount++; + c.status = CoinStatus.Fresh; + } + await tx.coins.put(c); + await tx.coinAvailability.put(coinAvailability); + }); } /** @@ -453,57 +441,55 @@ async function setCoinSuspended( async function dumpCoins(ws: InternalWalletState): Promise { const coinsJson: CoinDumpJson = { coins: [] }; logger.info("dumping coins"); - await ws.db - .mktx((x) => [x.coins, x.denominations, x.withdrawalGroups]) - .runReadOnly(async (tx) => { - const coins = await tx.coins.iter().toArray(); - for (const c of coins) { - const denom = await tx.denominations.get([ - c.exchangeBaseUrl, - c.denomPubHash, - ]); - if (!denom) { - logger.warn("no denom found for coin"); - continue; - } - const cs = c.coinSource; - let refreshParentCoinPub: string | undefined; - if (cs.type == CoinSourceType.Refresh) { - refreshParentCoinPub = cs.oldCoinPub; - } - let withdrawalReservePub: string | undefined; - if (cs.type == CoinSourceType.Withdraw) { - withdrawalReservePub = cs.reservePub; - } - const denomInfo = await ws.getDenomInfo( - ws, - tx, - c.exchangeBaseUrl, - c.denomPubHash, - ); - if (!denomInfo) { - logger.warn("no denomination found for coin"); - continue; - } - coinsJson.coins.push({ - coin_pub: c.coinPub, - denom_pub: denomInfo.denomPub, - denom_pub_hash: c.denomPubHash, - denom_value: denom.value, - exchange_base_url: c.exchangeBaseUrl, - refresh_parent_coin_pub: refreshParentCoinPub, - withdrawal_reserve_pub: withdrawalReservePub, - coin_status: c.status, - ageCommitmentProof: c.ageCommitmentProof, - spend_allocation: c.spendAllocation - ? { - amount: c.spendAllocation.amount, - id: c.spendAllocation.id, - } - : undefined, - }); + await ws.db.runReadOnlyTx(["coins", "denominations"], async (tx) => { + const coins = await tx.coins.iter().toArray(); + for (const c of coins) { + const denom = await tx.denominations.get([ + c.exchangeBaseUrl, + c.denomPubHash, + ]); + if (!denom) { + logger.warn("no denom found for coin"); + continue; } - }); + const cs = c.coinSource; + let refreshParentCoinPub: string | undefined; + if (cs.type == CoinSourceType.Refresh) { + refreshParentCoinPub = cs.oldCoinPub; + } + let withdrawalReservePub: string | undefined; + if (cs.type == CoinSourceType.Withdraw) { + withdrawalReservePub = cs.reservePub; + } + const denomInfo = await ws.getDenomInfo( + ws, + tx, + c.exchangeBaseUrl, + c.denomPubHash, + ); + if (!denomInfo) { + logger.warn("no denomination found for coin"); + continue; + } + coinsJson.coins.push({ + coin_pub: c.coinPub, + denom_pub: denomInfo.denomPub, + denom_pub_hash: c.denomPubHash, + denom_value: denom.value, + exchange_base_url: c.exchangeBaseUrl, + refresh_parent_coin_pub: refreshParentCoinPub, + withdrawal_reserve_pub: withdrawalReservePub, + coin_status: c.status, + ageCommitmentProof: c.ageCommitmentProof, + spend_allocation: c.spendAllocation + ? { + amount: c.spendAllocation.amount, + id: c.spendAllocation.id, + } + : undefined, + }); + } + }); return coinsJson; } @@ -534,7 +520,7 @@ async function createStoredBackup( const backup = await exportDb(ws.idb); const backupsDb = await openStoredBackupsDatabase(ws.idb); const name = `backup-${new Date().getTime()}`; - await backupsDb.mktxAll().runReadWrite(async (tx) => { + await backupsDb.runAllStoresReadWriteTx(async (tx) => { await tx.backupMeta.add({ name, }); @@ -552,7 +538,7 @@ async function listStoredBackups( storedBackups: [], }; const backupsDb = await openStoredBackupsDatabase(ws.idb); - await backupsDb.mktxAll().runReadWrite(async (tx) => { + await backupsDb.runAllStoresReadWriteTx(async (tx) => { await tx.backupMeta.iter().forEach((x) => { storedBackups.storedBackups.push({ name: x.name, @@ -567,7 +553,7 @@ async function deleteStoredBackup( req: DeleteStoredBackupRequest, ): Promise { const backupsDb = await openStoredBackupsDatabase(ws.idb); - await backupsDb.mktxAll().runReadWrite(async (tx) => { + await backupsDb.runAllStoresReadWriteTx(async (tx) => { await tx.backupData.delete(req.name); await tx.backupMeta.delete(req.name); }); @@ -580,7 +566,7 @@ async function recoverStoredBackup( logger.info(`Recovering stored backup ${req.name}`); const { name } = req; const backupsDb = await openStoredBackupsDatabase(ws.idb); - const bd = await backupsDb.mktxAll().runReadWrite(async (tx) => { + const bd = await backupsDb.runAllStoresReadWriteTx(async (tx) => { const backupMeta = tx.backupMeta.get(name); if (!backupMeta) { throw Error("backup not found"); @@ -1526,7 +1512,20 @@ class InternalWalletStateImpl implements InternalWalletState { async getTransactionState( ws: InternalWalletState, - tx: GetReadOnlyAccess, + tx: WalletDbReadOnlyTransaction< + [ + "depositGroups", + "withdrawalGroups", + "purchases", + "refundGroups", + "peerPullCredit", + "peerPullDebit", + "peerPushDebit", + "peerPushCredit", + "rewards", + "refreshGroups", + ] + >, transactionId: string, ): Promise { const parsedTxId = parseTransactionIdentifier(transactionId); @@ -1613,9 +1612,7 @@ class InternalWalletStateImpl implements InternalWalletState { async getDenomInfo( ws: InternalWalletState, - tx: GetReadWriteAccess<{ - denominations: typeof WalletStoresV1.denominations; - }>, + tx: WalletDbReadWriteTransaction<["denominations"]>, exchangeBaseUrl: string, denomPubHash: string, ): Promise { -- cgit v1.2.3