summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2024-02-19 12:49:17 +0100
committerFlorian Dold <florian@dold.me>2024-02-19 17:47:35 +0100
commit1ec521b9d214b286e747b3ccb3113730ac3a2509 (patch)
tree2f3d2b2906810dca45859b8cbfb8d18d53b27e80
parent1034ecb5f20bd8c75e37e0b4b454ea6c1f4c1da6 (diff)
downloadwallet-core-1ec521b9d214b286e747b3ccb3113730ac3a2509.tar.gz
wallet-core-1ec521b9d214b286e747b3ccb3113730ac3a2509.tar.bz2
wallet-core-1ec521b9d214b286e747b3ccb3113730ac3a2509.zip
wallet-core: simplify/unify DB access
-rw-r--r--packages/taler-wallet-core/src/db.ts98
-rw-r--r--packages/taler-wallet-core/src/internal-wallet-state.ts28
-rw-r--r--packages/taler-wallet-core/src/operations/attention.ts110
-rw-r--r--packages/taler-wallet-core/src/operations/backup/index.ts472
-rw-r--r--packages/taler-wallet-core/src/operations/balance.ts188
-rw-r--r--packages/taler-wallet-core/src/operations/common.ts25
-rw-r--r--packages/taler-wallet-core/src/operations/deposits.ts343
-rw-r--r--packages/taler-wallet-core/src/operations/exchanges.ts211
-rw-r--r--packages/taler-wallet-core/src/operations/pay-merchant.ts647
-rw-r--r--packages/taler-wallet-core/src/operations/pay-peer-common.ts143
-rw-r--r--packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts230
-rw-r--r--packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts124
-rw-r--r--packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts86
-rw-r--r--packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts204
-rw-r--r--packages/taler-wallet-core/src/operations/recoup.ts160
-rw-r--r--packages/taler-wallet-core/src/operations/refresh.ts239
-rw-r--r--packages/taler-wallet-core/src/operations/reward.ts461
-rw-r--r--packages/taler-wallet-core/src/operations/testing.ts8
-rw-r--r--packages/taler-wallet-core/src/operations/transactions.ts234
-rw-r--r--packages/taler-wallet-core/src/operations/withdraw.ts530
-rw-r--r--packages/taler-wallet-core/src/shepherd.ts62
-rw-r--r--packages/taler-wallet-core/src/util/coinSelection.ts79
-rw-r--r--packages/taler-wallet-core/src/util/instructedAmountConversion.ts14
-rw-r--r--packages/taler-wallet-core/src/util/query.ts209
-rw-r--r--packages/taler-wallet-core/src/wallet.ts303
25 files changed, 2276 insertions, 2932 deletions
diff --git a/packages/taler-wallet-core/src/db.ts b/packages/taler-wallet-core/src/db.ts
index 0aae2ddff..ff1e87ccb 100644
--- a/packages/taler-wallet-core/src/db.ts
+++ b/packages/taler-wallet-core/src/db.ts
@@ -64,10 +64,7 @@ import { DbRetryInfo, TaskIdentifiers } from "./operations/common.js";
import {
DbAccess,
DbReadOnlyTransaction,
- DbReadOnlyTransactionArr,
DbReadWriteTransaction,
- DbReadWriteTransactionArr,
- GetReadWriteAccess,
IndexDescriptor,
StoreDescriptor,
StoreNames,
@@ -1338,9 +1335,9 @@ export enum ConfigRecordKey {
*/
export type ConfigRecord =
| {
- key: ConfigRecordKey.WalletBackupState;
- value: WalletBackupConfState;
- }
+ key: ConfigRecordKey.WalletBackupState;
+ value: WalletBackupConfState;
+ }
| { key: ConfigRecordKey.CurrencyDefaultsApplied; value: boolean }
| { key: ConfigRecordKey.DevMode; value: boolean }
| { key: ConfigRecordKey.TestLoopTx; value: number };
@@ -1618,15 +1615,15 @@ export enum BackupProviderStateTag {
export type BackupProviderState =
| {
- tag: BackupProviderStateTag.Provisional;
- }
+ tag: BackupProviderStateTag.Provisional;
+ }
| {
- tag: BackupProviderStateTag.Ready;
- nextBackupTimestamp: DbPreciseTimestamp;
- }
+ tag: BackupProviderStateTag.Ready;
+ nextBackupTimestamp: DbPreciseTimestamp;
+ }
| {
- tag: BackupProviderStateTag.Retrying;
- };
+ tag: BackupProviderStateTag.Retrying;
+ };
export interface BackupProviderTerms {
supportedProtocolVersion: string;
@@ -2715,21 +2712,23 @@ export const WalletStoresV1 = {
type WalletStoreNames = StoreNames<typeof WalletStoresV1>;
-export type WalletDbReadOnlyTransaction<
- Stores extends StoreNames<typeof WalletStoresV1> & string,
-> = DbReadOnlyTransaction<typeof WalletStoresV1, Stores>;
-
export type WalletDbReadWriteTransaction<
- Stores extends StoreNames<typeof WalletStoresV1> & string,
-> = DbReadWriteTransaction<typeof WalletStoresV1, Stores>;
-
-export type WalletDbReadWriteTransactionArr<
StoresArr extends Array<StoreNames<typeof WalletStoresV1>>,
-> = DbReadWriteTransactionArr<typeof WalletStoresV1, StoresArr>;
+> = DbReadWriteTransaction<typeof WalletStoresV1, StoresArr>;
-export type WalletDbReadOnlyTransactionArr<
+export type WalletDbReadOnlyTransaction<
StoresArr extends Array<StoreNames<typeof WalletStoresV1>>,
-> = DbReadOnlyTransactionArr<typeof WalletStoresV1, StoresArr>;
+> = DbReadOnlyTransaction<typeof WalletStoresV1, StoresArr>;
+
+export type WalletDbAllStoresReadOnlyTransaction<> = DbReadOnlyTransaction<
+ typeof WalletStoresV1,
+ Array<StoreNames<typeof WalletStoresV1>>
+>;
+
+export type WalletDbAllStoresReadWriteTransaction<> = DbReadWriteTransaction<
+ typeof WalletStoresV1,
+ Array<StoreNames<typeof WalletStoresV1>>
+>;
/**
* An applied migration.
@@ -2939,7 +2938,12 @@ export async function importDb(db: IDBDatabase, dumpJson: any): Promise<void> {
export interface FixupDescription {
name: string;
- fn(tx: GetReadWriteAccess<typeof WalletStoresV1>): Promise<void>;
+ fn(
+ tx: DbReadWriteTransaction<
+ typeof WalletStoresV1,
+ Array<StoreNames<typeof WalletStoresV1>>
+ >,
+ ): Promise<void>;
}
/**
@@ -2953,7 +2957,7 @@ export async function applyFixups(
db: DbAccess<typeof WalletStoresV1>,
): Promise<void> {
logger.trace("applying fixups");
- await db.mktxAll().runReadWrite(async (tx) => {
+ await db.runAllStoresReadWriteTx(async (tx) => {
for (const fixupInstruction of walletDbFixups) {
logger.trace(`checking fixup ${fixupInstruction.name}`);
const fixupRecord = await tx.fixups.get(fixupInstruction.name);
@@ -3156,7 +3160,7 @@ export async function openStoredBackupsDatabase(
idbFactory,
TALER_WALLET_STORED_BACKUPS_DB_NAME,
1,
- () => { },
+ () => {},
onStoredBackupsDbUpgradeNeeded,
);
@@ -3179,26 +3183,24 @@ export async function openTalerDatabase(
idbFactory,
TALER_WALLET_META_DB_NAME,
1,
- () => { },
+ () => {},
onMetaDbUpgradeNeeded,
);
const metaDb = new DbAccess(metaDbHandle, walletMetadataStore);
let currentMainVersion: string | undefined;
- await metaDb
- .mktx((stores) => [stores.metaConfig])
- .runReadWrite(async (tx) => {
- const dbVersionRecord = await tx.metaConfig.get(CURRENT_DB_CONFIG_KEY);
- if (!dbVersionRecord) {
- currentMainVersion = TALER_WALLET_MAIN_DB_NAME;
- await tx.metaConfig.put({
- key: CURRENT_DB_CONFIG_KEY,
- value: TALER_WALLET_MAIN_DB_NAME,
- });
- } else {
- currentMainVersion = dbVersionRecord.value;
- }
- });
+ await metaDb.runReadWriteTx(["metaConfig"], async (tx) => {
+ const dbVersionRecord = await tx.metaConfig.get(CURRENT_DB_CONFIG_KEY);
+ if (!dbVersionRecord) {
+ currentMainVersion = TALER_WALLET_MAIN_DB_NAME;
+ await tx.metaConfig.put({
+ key: CURRENT_DB_CONFIG_KEY,
+ value: TALER_WALLET_MAIN_DB_NAME,
+ });
+ } else {
+ currentMainVersion = dbVersionRecord.value;
+ }
+ });
if (currentMainVersion !== TALER_WALLET_MAIN_DB_NAME) {
switch (currentMainVersion) {
@@ -3212,14 +3214,12 @@ export async function openTalerDatabase(
case "taler-wallet-main-v9":
// We consider this a pre-release
// development version, no migration is done.
- await metaDb
- .mktx((stores) => [stores.metaConfig])
- .runReadWrite(async (tx) => {
- await tx.metaConfig.put({
- key: CURRENT_DB_CONFIG_KEY,
- value: TALER_WALLET_MAIN_DB_NAME,
- });
+ await metaDb.runReadWriteTx(["metaConfig"], async (tx) => {
+ await tx.metaConfig.put({
+ key: CURRENT_DB_CONFIG_KEY,
+ value: TALER_WALLET_MAIN_DB_NAME,
});
+ });
break;
default:
throw Error(
diff --git a/packages/taler-wallet-core/src/internal-wallet-state.ts b/packages/taler-wallet-core/src/internal-wallet-state.ts
index 13578adda..3ff9d8064 100644
--- a/packages/taler-wallet-core/src/internal-wallet-state.ts
+++ b/packages/taler-wallet-core/src/internal-wallet-state.ts
@@ -37,13 +37,14 @@ import {
} from "@gnu-taler/taler-util";
import { HttpRequestLibrary } from "@gnu-taler/taler-util/http";
import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js";
-import { WalletStoresV1 } from "./db.js";
-import { TaskScheduler } from "./shepherd.js";
import {
- DbAccess,
- GetReadOnlyAccess,
- GetReadWriteAccess,
-} from "./util/query.js";
+ WalletDbAllStoresReadOnlyTransaction,
+ WalletDbReadOnlyTransaction,
+ WalletDbReadWriteTransaction,
+ WalletStoresV1,
+} from "./db.js";
+import { TaskScheduler } from "./shepherd.js";
+import { DbAccess } from "./util/query.js";
import { TimerGroup } from "./util/timer.js";
import { WalletConfig } from "./wallet-api-types.js";
@@ -62,12 +63,9 @@ export interface MerchantInfo {
export interface RecoupOperations {
createRecoupGroup(
ws: InternalWalletState,
- tx: GetReadWriteAccess<{
- recoupGroups: typeof WalletStoresV1.recoupGroups;
- denominations: typeof WalletStoresV1.denominations;
- refreshGroups: typeof WalletStoresV1.refreshGroups;
- coins: typeof WalletStoresV1.coins;
- }>,
+ tx: WalletDbReadWriteTransaction<
+ ["recoupGroups", "denominations", "refreshGroups"]
+ >,
exchangeBaseUrl: string,
coinPubs: string[],
): Promise<string>;
@@ -106,15 +104,13 @@ export interface InternalWalletState {
getTransactionState(
ws: InternalWalletState,
- tx: GetReadOnlyAccess<typeof WalletStoresV1>,
+ tx: WalletDbAllStoresReadOnlyTransaction,
transactionId: string,
): Promise<TransactionState | undefined>;
getDenomInfo(
ws: InternalWalletState,
- tx: GetReadOnlyAccess<{
- denominations: typeof WalletStoresV1.denominations;
- }>,
+ tx: WalletDbReadOnlyTransaction<["denominations"]>,
exchangeBaseUrl: string,
denomPubHash: string,
): Promise<DenominationInfo | undefined>;
diff --git a/packages/taler-wallet-core/src/operations/attention.ts b/packages/taler-wallet-core/src/operations/attention.ts
index 92d69e93e..7d8b11e79 100644
--- a/packages/taler-wallet-core/src/operations/attention.ts
+++ b/packages/taler-wallet-core/src/operations/attention.ts
@@ -18,20 +18,18 @@
* Imports.
*/
import {
- AbsoluteTime,
AttentionInfo,
Logger,
- TalerProtocolTimestamp,
TalerPreciseTimestamp,
UserAttentionByIdRequest,
UserAttentionPriority,
+ UserAttentionUnreadList,
UserAttentionsCountResponse,
UserAttentionsRequest,
UserAttentionsResponse,
- UserAttentionUnreadList,
} from "@gnu-taler/taler-util";
-import { InternalWalletState } from "../internal-wallet-state.js";
import { timestampPreciseFromDb, timestampPreciseToDb } from "../index.js";
+import { InternalWalletState } from "../internal-wallet-state.js";
const logger = new Logger("operations/attention.ts");
@@ -39,23 +37,21 @@ export async function getUserAttentionsUnreadCount(
ws: InternalWalletState,
req: UserAttentionsRequest,
): Promise<UserAttentionsCountResponse> {
- const total = await ws.db
- .mktx((x) => [x.userAttention])
- .runReadOnly(async (tx) => {
- let count = 0;
- await tx.userAttention.iter().forEach((x) => {
- if (
- req.priority !== undefined &&
- UserAttentionPriority[x.info.type] !== req.priority
- )
- return;
- if (x.read !== undefined) return;
- count++;
- });
-
- return count;
+ const total = await ws.db.runReadOnlyTx(["userAttention"], async (tx) => {
+ let count = 0;
+ await tx.userAttention.iter().forEach((x) => {
+ if (
+ req.priority !== undefined &&
+ UserAttentionPriority[x.info.type] !== req.priority
+ )
+ return;
+ if (x.read !== undefined) return;
+ count++;
});
+ return count;
+ });
+
return { total };
}
@@ -63,41 +59,37 @@ export async function getUserAttentions(
ws: InternalWalletState,
req: UserAttentionsRequest,
): Promise<UserAttentionsResponse> {
- return await ws.db
- .mktx((x) => [x.userAttention])
- .runReadOnly(async (tx) => {
- const pending: UserAttentionUnreadList = [];
- await tx.userAttention.iter().forEach((x) => {
- if (
- req.priority !== undefined &&
- UserAttentionPriority[x.info.type] !== req.priority
- )
- return;
- pending.push({
- info: x.info,
- when: timestampPreciseFromDb(x.created),
- read: x.read !== undefined,
- });
+ return await ws.db.runReadOnlyTx(["userAttention"], async (tx) => {
+ const pending: UserAttentionUnreadList = [];
+ await tx.userAttention.iter().forEach((x) => {
+ if (
+ req.priority !== undefined &&
+ UserAttentionPriority[x.info.type] !== req.priority
+ )
+ return;
+ pending.push({
+ info: x.info,
+ when: timestampPreciseFromDb(x.created),
+ read: x.read !== undefined,
});
-
- return { pending };
});
+
+ return { pending };
+ });
}
export async function markAttentionRequestAsRead(
ws: InternalWalletState,
req: UserAttentionByIdRequest,
): Promise<void> {
- await ws.db
- .mktx((x) => [x.userAttention])
- .runReadWrite(async (tx) => {
- const ua = await tx.userAttention.get([req.entityId, req.type]);
- if (!ua) throw Error("attention request not found");
- tx.userAttention.put({
- ...ua,
- read: timestampPreciseToDb(TalerPreciseTimestamp.now()),
- });
+ await ws.db.runReadWriteTx(["userAttention"], async (tx) => {
+ const ua = await tx.userAttention.get([req.entityId, req.type]);
+ if (!ua) throw Error("attention request not found");
+ tx.userAttention.put({
+ ...ua,
+ read: timestampPreciseToDb(TalerPreciseTimestamp.now()),
});
+ });
}
/**
@@ -112,16 +104,14 @@ export async function addAttentionRequest(
info: AttentionInfo,
entityId: string,
): Promise<void> {
- await ws.db
- .mktx((x) => [x.userAttention])
- .runReadWrite(async (tx) => {
- await tx.userAttention.put({
- info,
- entityId,
- created: timestampPreciseToDb(TalerPreciseTimestamp.now()),
- read: undefined,
- });
+ await ws.db.runReadWriteTx(["userAttention"], async (tx) => {
+ await tx.userAttention.put({
+ info,
+ entityId,
+ created: timestampPreciseToDb(TalerPreciseTimestamp.now()),
+ read: undefined,
});
+ });
}
/**
@@ -135,11 +125,9 @@ export async function removeAttentionRequest(
ws: InternalWalletState,
req: UserAttentionByIdRequest,
): Promise<void> {
- await ws.db
- .mktx((x) => [x.userAttention])
- .runReadWrite(async (tx) => {
- const ua = await tx.userAttention.get([req.entityId, req.type]);
- if (!ua) throw Error("attention request not found");
- await tx.userAttention.delete([req.entityId, req.type]);
- });
+ await ws.db.runReadWriteTx(["userAttention"], async (tx) => {
+ const ua = await tx.userAttention.get([req.entityId, req.type]);
+ if (!ua) throw Error("attention request not found");
+ await tx.userAttention.delete([req.entityId, req.type]);
+ });
}
diff --git a/packages/taler-wallet-core/src/operations/backup/index.ts b/packages/taler-wallet-core/src/operations/backup/index.ts
index e4e4e43f6..948b8eb85 100644
--- a/packages/taler-wallet-core/src/operations/backup/index.ts
+++ b/packages/taler-wallet-core/src/operations/backup/index.ts
@@ -79,7 +79,7 @@ import {
ConfigRecord,
ConfigRecordKey,
WalletBackupConfState,
- WalletStoresV1,
+ WalletDbReadOnlyTransaction,
timestampOptionalPreciseFromDb,
timestampPreciseToDb,
} from "../../db.js";
@@ -88,7 +88,6 @@ import {
checkDbInvariant,
checkLogicInvariant,
} from "../../util/invariants.js";
-import { GetReadOnlyAccess } from "../../util/query.js";
import { addAttentionRequest, removeAttentionRequest } from "../attention.js";
import {
TaskIdentifiers,
@@ -187,11 +186,12 @@ async function runBackupCycleForProvider(
ws: InternalWalletState,
args: BackupForProviderArgs,
): Promise<TaskRunResult> {
- const provider = await ws.db
- .mktx((x) => [x.backupProviders])
- .runReadOnly(async (tx) => {
+ const provider = await ws.db.runReadOnlyTx(
+ ["backupProviders"],
+ async (tx) => {
return tx.backupProviders.get(args.backupProviderBaseUrl);
- });
+ },
+ );
if (!provider) {
logger.warn("provider disappeared");
@@ -248,22 +248,20 @@ async function runBackupCycleForProvider(
logger.trace(`sync response status: ${resp.status}`);
if (resp.status === HttpStatusCode.NotModified) {
- await ws.db
- .mktx((x) => [x.backupProviders])
- .runReadWrite(async (tx) => {
- const prov = await tx.backupProviders.get(provider.baseUrl);
- if (!prov) {
- return;
- }
- prov.lastBackupCycleTimestamp = timestampPreciseToDb(
- TalerPreciseTimestamp.now(),
- );
- prov.state = {
- tag: BackupProviderStateTag.Ready,
- nextBackupTimestamp: timestampPreciseToDb(getNextBackupTimestamp()),
- };
- await tx.backupProviders.put(prov);
- });
+ await ws.db.runReadWriteTx(["backupProviders"], async (tx) => {
+ const prov = await tx.backupProviders.get(provider.baseUrl);
+ if (!prov) {
+ return;
+ }
+ prov.lastBackupCycleTimestamp = timestampPreciseToDb(
+ TalerPreciseTimestamp.now(),
+ );
+ prov.state = {
+ tag: BackupProviderStateTag.Ready,
+ nextBackupTimestamp: timestampPreciseToDb(getNextBackupTimestamp()),
+ };
+ await tx.backupProviders.put(prov);
+ });
removeAttentionRequest(ws, {
entityId: provider.baseUrl,
@@ -296,46 +294,42 @@ async function runBackupCycleForProvider(
if (res === undefined) {
//claimed
- await ws.db
- .mktx((x) => [x.backupProviders, x.operationRetries])
- .runReadWrite(async (tx) => {
- const prov = await tx.backupProviders.get(provider.baseUrl);
- if (!prov) {
- logger.warn("backup provider not found anymore");
- return;
- }
- prov.shouldRetryFreshProposal = true;
- prov.state = {
- tag: BackupProviderStateTag.Retrying,
- };
- await tx.backupProviders.put(prov);
- });
-
- throw Error("not implemented");
- // return {
- // type: TaskRunResultType.Pending,
- // };
- }
- const result = res;
-
- await ws.db
- .mktx((x) => [x.backupProviders, x.operationRetries])
- .runReadWrite(async (tx) => {
+ await ws.db.runReadWriteTx(["backupProviders"], async (tx) => {
const prov = await tx.backupProviders.get(provider.baseUrl);
if (!prov) {
logger.warn("backup provider not found anymore");
return;
}
- const opId = TaskIdentifiers.forBackup(prov);
- //await scheduleRetryInTx(ws, tx, opId);
- prov.currentPaymentProposalId = result.proposalId;
- prov.shouldRetryFreshProposal = false;
+ prov.shouldRetryFreshProposal = true;
prov.state = {
tag: BackupProviderStateTag.Retrying,
};
await tx.backupProviders.put(prov);
});
+ throw Error("not implemented");
+ // return {
+ // type: TaskRunResultType.Pending,
+ // };
+ }
+ const result = res;
+
+ await ws.db.runReadWriteTx(["backupProviders"], async (tx) => {
+ const prov = await tx.backupProviders.get(provider.baseUrl);
+ if (!prov) {
+ logger.warn("backup provider not found anymore");
+ return;
+ }
+ // const opId = TaskIdentifiers.forBackup(prov);
+ // await scheduleRetryInTx(ws, tx, opId);
+ prov.currentPaymentProposalId = result.proposalId;
+ prov.shouldRetryFreshProposal = false;
+ prov.state = {
+ tag: BackupProviderStateTag.Retrying,
+ };
+ await tx.backupProviders.put(prov);
+ });
+
addAttentionRequest(
ws,
{
@@ -353,23 +347,21 @@ async function runBackupCycleForProvider(
}
if (resp.status === HttpStatusCode.NoContent) {
- await ws.db
- .mktx((x) => [x.backupProviders])
- .runReadWrite(async (tx) => {
- const prov = await tx.backupProviders.get(provider.baseUrl);
- if (!prov) {
- return;
- }
- prov.lastBackupHash = encodeCrock(currentBackupHash);
- prov.lastBackupCycleTimestamp = timestampPreciseToDb(
- TalerPreciseTimestamp.now(),
- );
- prov.state = {
- tag: BackupProviderStateTag.Ready,
- nextBackupTimestamp: timestampPreciseToDb(getNextBackupTimestamp()),
- };
- await tx.backupProviders.put(prov);
- });
+ await ws.db.runReadWriteTx(["backupProviders"], async (tx) => {
+ const prov = await tx.backupProviders.get(provider.baseUrl);
+ if (!prov) {
+ return;
+ }
+ prov.lastBackupHash = encodeCrock(currentBackupHash);
+ prov.lastBackupCycleTimestamp = timestampPreciseToDb(
+ TalerPreciseTimestamp.now(),
+ );
+ prov.state = {
+ tag: BackupProviderStateTag.Ready,
+ nextBackupTimestamp: timestampPreciseToDb(getNextBackupTimestamp()),
+ };
+ await tx.backupProviders.put(prov);
+ });
removeAttentionRequest(ws, {
entityId: provider.baseUrl,
@@ -388,24 +380,22 @@ async function runBackupCycleForProvider(
// const blob = await decryptBackup(backupConfig, backupEnc);
// FIXME: Re-implement backup import with merging
// await importBackup(ws, blob, cryptoData);
- await ws.db
- .mktx((x) => [x.backupProviders, x.operationRetries])
- .runReadWrite(async (tx) => {
- const prov = await tx.backupProviders.get(provider.baseUrl);
- if (!prov) {
- logger.warn("backup provider not found anymore");
- return;
- }
- prov.lastBackupHash = encodeCrock(hash(backupEnc));
- // FIXME: Allocate error code for this situation?
- // FIXME: Add operation retry record!
- const opId = TaskIdentifiers.forBackup(prov);
- //await scheduleRetryInTx(ws, tx, opId);
- prov.state = {
- tag: BackupProviderStateTag.Retrying,
- };
- await tx.backupProviders.put(prov);
- });
+ await ws.db.runReadWriteTx(["backupProviders"], async (tx) => {
+ const prov = await tx.backupProviders.get(provider.baseUrl);
+ if (!prov) {
+ logger.warn("backup provider not found anymore");
+ return;
+ }
+ prov.lastBackupHash = encodeCrock(hash(backupEnc));
+ // FIXME: Allocate error code for this situation?
+ // FIXME: Add operation retry record!
+ const opId = TaskIdentifiers.forBackup(prov);
+ //await scheduleRetryInTx(ws, tx, opId);
+ prov.state = {
+ tag: BackupProviderStateTag.Retrying,
+ };
+ await tx.backupProviders.put(prov);
+ });
logger.info("processed existing backup");
// Now upload our own, merged backup.
return await runBackupCycleForProvider(ws, args);
@@ -427,11 +417,12 @@ export async function processBackupForProvider(
ws: InternalWalletState,
backupProviderBaseUrl: string,
): Promise<TaskRunResult> {
- const provider = await ws.db
- .mktx((x) => [x.backupProviders])
- .runReadOnly(async (tx) => {
+ const provider = await ws.db.runReadOnlyTx(
+ ["backupProviders"],
+ async (tx) => {
return await tx.backupProviders.get(backupProviderBaseUrl);
- });
+ },
+ );
if (!provider) {
throw Error("unknown backup provider");
}
@@ -457,11 +448,9 @@ export async function removeBackupProvider(
ws: InternalWalletState,
req: RemoveBackupProviderRequest,
): Promise<void> {
- await ws.db
- .mktx((x) => [x.backupProviders])
- .runReadWrite(async (tx) => {
- await tx.backupProviders.delete(req.provider);
- });
+ await ws.db.runReadWriteTx(["backupProviders"], async (tx) => {
+ await tx.backupProviders.delete(req.provider);
+ });
}
export interface RunBackupCycleRequest {
@@ -487,9 +476,9 @@ export async function runBackupCycle(
ws: InternalWalletState,
req: RunBackupCycleRequest,
): Promise<void> {
- const providers = await ws.db
- .mktx((x) => [x.backupProviders])
- .runReadOnly(async (tx) => {
+ const providers = await ws.db.runReadOnlyTx(
+ ["backupProviders"],
+ async (tx) => {
if (req.providers) {
const rs = await Promise.all(
req.providers.map((id) => tx.backupProviders.get(id)),
@@ -497,7 +486,8 @@ export async function runBackupCycle(
return rs.filter(notEmpty);
}
return await tx.backupProviders.iter().toArray();
- });
+ },
+ );
for (const provider of providers) {
await runBackupCycleForProvider(ws, {
@@ -587,62 +577,56 @@ export async function addBackupProvider(
logger.info(`adding backup provider ${j2s(req)}`);
await provideBackupState(ws);
const canonUrl = canonicalizeBaseUrl(req.backupProviderBaseUrl);
- await ws.db
- .mktx((x) => [x.backupProviders])
- .runReadWrite(async (tx) => {
- const oldProv = await tx.backupProviders.get(canonUrl);
- if (oldProv) {
- logger.info("old backup provider found");
- if (req.activate) {
- oldProv.state = {
- tag: BackupProviderStateTag.Ready,
- nextBackupTimestamp: timestampPreciseToDb(
- TalerPreciseTimestamp.now(),
- ),
- };
- logger.info("setting existing backup provider to active");
- await tx.backupProviders.put(oldProv);
- }
- return;
- }
- });
- const termsUrl = new URL("config", canonUrl);
- const resp = await ws.http.fetch(termsUrl.href);
- const terms = await readSuccessResponseJsonOrThrow(
- resp,
- codecForSyncTermsOfServiceResponse(),
- );
- await ws.db
- .mktx((x) => [x.backupProviders])
- .runReadWrite(async (tx) => {
- let state: BackupProviderState;
- //FIXME: what is the difference provisional and ready?
+ await ws.db.runReadWriteTx(["backupProviders"], async (tx) => {
+ const oldProv = await tx.backupProviders.get(canonUrl);
+ if (oldProv) {
+ logger.info("old backup provider found");
if (req.activate) {
- state = {
+ oldProv.state = {
tag: BackupProviderStateTag.Ready,
nextBackupTimestamp: timestampPreciseToDb(
TalerPreciseTimestamp.now(),
),
};
- } else {
- state = {
- tag: BackupProviderStateTag.Provisional,
- };
+ logger.info("setting existing backup provider to active");
+ await tx.backupProviders.put(oldProv);
}
- await tx.backupProviders.put({
- state,
- name: req.name,
- terms: {
- annualFee: terms.annual_fee,
- storageLimitInMegabytes: terms.storage_limit_in_megabytes,
- supportedProtocolVersion: terms.version,
- },
- shouldRetryFreshProposal: false,
- paymentProposalIds: [],
- baseUrl: canonUrl,
- uids: [encodeCrock(getRandomBytes(32))],
- });
+ return;
+ }
+ });
+ const termsUrl = new URL("config", canonUrl);
+ const resp = await ws.http.fetch(termsUrl.href);
+ const terms = await readSuccessResponseJsonOrThrow(
+ resp,
+ codecForSyncTermsOfServiceResponse(),
+ );
+ await ws.db.runReadWriteTx(["backupProviders"], async (tx) => {
+ let state: BackupProviderState;
+ //FIXME: what is the difference provisional and ready?
+ if (req.activate) {
+ state = {
+ tag: BackupProviderStateTag.Ready,
+ nextBackupTimestamp: timestampPreciseToDb(TalerPreciseTimestamp.now()),
+ };
+ } else {
+ state = {
+ tag: BackupProviderStateTag.Provisional,
+ };
+ }
+ await tx.backupProviders.put({
+ state,
+ name: req.name,
+ terms: {
+ annualFee: terms.annual_fee,
+ storageLimitInMegabytes: terms.storage_limit_in_megabytes,
+ supportedProtocolVersion: terms.version,
+ },
+ shouldRetryFreshProposal: false,
+ paymentProposalIds: [],
+ baseUrl: canonUrl,
+ uids: [encodeCrock(getRandomBytes(32))],
});
+ });
return await runFirstBackupCycleForProvider(ws, {
backupProviderBaseUrl: canonUrl,
@@ -827,9 +811,9 @@ export async function getBackupInfo(
ws: InternalWalletState,
): Promise<BackupInfo> {
const backupConfig = await provideBackupState(ws);
- const providerRecords = await ws.db
- .mktx((x) => [x.backupProviders, x.operationRetries])
- .runReadOnly(async (tx) => {
+ const providerRecords = await ws.db.runReadOnlyTx(
+ ["backupProviders", "operationRetries"],
+ async (tx) => {
return await tx.backupProviders.iter().mapAsync(async (bp) => {
const opId = TaskIdentifiers.forBackup(bp);
const retryRecord = await tx.operationRetries.get(opId);
@@ -838,7 +822,8 @@ export async function getBackupInfo(
retryRecord,
};
});
- });
+ },
+ );
const providers: ProviderInfo[] = [];
for (const x of providerRecords) {
providers.push({
@@ -872,11 +857,12 @@ export async function getBackupRecovery(
ws: InternalWalletState,
): Promise<BackupRecovery> {
const bs = await provideBackupState(ws);
- const providers = await ws.db
- .mktx((x) => [x.backupProviders])
- .runReadOnly(async (tx) => {
+ const providers = await ws.db.runReadOnlyTx(
+ ["backupProviders"],
+ async (tx) => {
return await tx.backupProviders.iter().toArray();
- });
+ },
+ );
return {
providers: providers
.filter((x) => x.state.tag !== BackupProviderStateTag.Provisional)
@@ -894,50 +880,48 @@ async function backupRecoveryTheirs(
ws: InternalWalletState,
br: BackupRecovery,
) {
- await ws.db
- .mktx((x) => [x.config, x.backupProviders])
- .runReadWrite(async (tx) => {
- let backupStateEntry: ConfigRecord | undefined = await tx.config.get(
- ConfigRecordKey.WalletBackupState,
- );
- checkDbInvariant(!!backupStateEntry);
- checkDbInvariant(
- backupStateEntry.key === ConfigRecordKey.WalletBackupState,
- );
- backupStateEntry.value.lastBackupNonce = undefined;
- backupStateEntry.value.lastBackupTimestamp = undefined;
- backupStateEntry.value.lastBackupCheckTimestamp = undefined;
- backupStateEntry.value.lastBackupPlainHash = undefined;
- backupStateEntry.value.walletRootPriv = br.walletRootPriv;
- backupStateEntry.value.walletRootPub = encodeCrock(
- eddsaGetPublic(decodeCrock(br.walletRootPriv)),
- );
- await tx.config.put(backupStateEntry);
- for (const prov of br.providers) {
- const existingProv = await tx.backupProviders.get(prov.url);
- if (!existingProv) {
- await tx.backupProviders.put({
- baseUrl: prov.url,
- name: prov.name,
- paymentProposalIds: [],
- shouldRetryFreshProposal: false,
- state: {
- tag: BackupProviderStateTag.Ready,
- nextBackupTimestamp: timestampPreciseToDb(
- TalerPreciseTimestamp.now(),
- ),
- },
- uids: [encodeCrock(getRandomBytes(32))],
- });
- }
- }
- const providers = await tx.backupProviders.iter().toArray();
- for (const prov of providers) {
- prov.lastBackupCycleTimestamp = undefined;
- prov.lastBackupHash = undefined;
- await tx.backupProviders.put(prov);
+ await ws.db.runReadWriteTx(["backupProviders", "config"], async (tx) => {
+ let backupStateEntry: ConfigRecord | undefined = await tx.config.get(
+ ConfigRecordKey.WalletBackupState,
+ );
+ checkDbInvariant(!!backupStateEntry);
+ checkDbInvariant(
+ backupStateEntry.key === ConfigRecordKey.WalletBackupState,
+ );
+ backupStateEntry.value.lastBackupNonce = undefined;
+ backupStateEntry.value.lastBackupTimestamp = undefined;
+ backupStateEntry.value.lastBackupCheckTimestamp = undefined;
+ backupStateEntry.value.lastBackupPlainHash = undefined;
+ backupStateEntry.value.walletRootPriv = br.walletRootPriv;
+ backupStateEntry.value.walletRootPub = encodeCrock(
+ eddsaGetPublic(decodeCrock(br.walletRootPriv)),
+ );
+ await tx.config.put(backupStateEntry);
+ for (const prov of br.providers) {
+ const existingProv = await tx.backupProviders.get(prov.url);
+ if (!existingProv) {
+ await tx.backupProviders.put({
+ baseUrl: prov.url,
+ name: prov.name,
+ paymentProposalIds: [],
+ shouldRetryFreshProposal: false,
+ state: {
+ tag: BackupProviderStateTag.Ready,
+ nextBackupTimestamp: timestampPreciseToDb(
+ TalerPreciseTimestamp.now(),
+ ),
+ },
+ uids: [encodeCrock(getRandomBytes(32))],
+ });
}
- });
+ }
+ const providers = await tx.backupProviders.iter().toArray();
+ for (const prov of providers) {
+ prov.lastBackupCycleTimestamp = undefined;
+ prov.lastBackupHash = undefined;
+ await tx.backupProviders.put(prov);
+ }
+ });
}
async function backupRecoveryOurs(ws: InternalWalletState, br: BackupRecovery) {
@@ -949,11 +933,12 @@ export async function loadBackupRecovery(
br: RecoveryLoadRequest,
): Promise<void> {
const bs = await provideBackupState(ws);
- const providers = await ws.db
- .mktx((x) => [x.backupProviders])
- .runReadOnly(async (tx) => {
+ const providers = await ws.db.runReadOnlyTx(
+ ["backupProviders"],
+ async (tx) => {
return await tx.backupProviders.iter().toArray();
- });
+ },
+ );
let strategy = br.strategy;
if (
br.recovery.walletRootPriv != bs.walletRootPriv &&
@@ -996,11 +981,12 @@ export async function decryptBackup(
export async function provideBackupState(
ws: InternalWalletState,
): Promise<WalletBackupConfState> {
- const bs: ConfigRecord | undefined = await ws.db
- .mktx((stores) => [stores.config])
- .runReadOnly(async (tx) => {
+ const bs: ConfigRecord | undefined = await ws.db.runReadOnlyTx(
+ ["config"],
+ async (tx) => {
return await tx.config.get(ConfigRecordKey.WalletBackupState);
- });
+ },
+ );
if (bs) {
checkDbInvariant(bs.key === ConfigRecordKey.WalletBackupState);
return bs.value;
@@ -1012,34 +998,32 @@ export async function provideBackupState(
// FIXME: device ID should be configured when wallet is initialized
// and be based on hostname
const deviceId = `wallet-core-${encodeCrock(d)}`;
- return await ws.db
- .mktx((x) => [x.config])
- .runReadWrite(async (tx) => {
- let backupStateEntry: ConfigRecord | undefined = await tx.config.get(
- ConfigRecordKey.WalletBackupState,
- );
- if (!backupStateEntry) {
- backupStateEntry = {
- key: ConfigRecordKey.WalletBackupState,
- value: {
- deviceId,
- walletRootPub: k.pub,
- walletRootPriv: k.priv,
- lastBackupPlainHash: undefined,
- },
- };
- await tx.config.put(backupStateEntry);
- }
- checkDbInvariant(
- backupStateEntry.key === ConfigRecordKey.WalletBackupState,
- );
- return backupStateEntry.value;
- });
+ return await ws.db.runReadWriteTx(["config"], async (tx) => {
+ let backupStateEntry: ConfigRecord | undefined = await tx.config.get(
+ ConfigRecordKey.WalletBackupState,
+ );
+ if (!backupStateEntry) {
+ backupStateEntry = {
+ key: ConfigRecordKey.WalletBackupState,
+ value: {
+ deviceId,
+ walletRootPub: k.pub,
+ walletRootPriv: k.priv,
+ lastBackupPlainHash: undefined,
+ },
+ };
+ await tx.config.put(backupStateEntry);
+ }
+ checkDbInvariant(
+ backupStateEntry.key === ConfigRecordKey.WalletBackupState,
+ );
+ return backupStateEntry.value;
+ });
}
export async function getWalletBackupState(
ws: InternalWalletState,
- tx: GetReadOnlyAccess<{ config: typeof WalletStoresV1.config }>,
+ tx: WalletDbReadOnlyTransaction<["config"]>,
): Promise<WalletBackupConfState> {
const bs = await tx.config.get(ConfigRecordKey.WalletBackupState);
checkDbInvariant(!!bs, "wallet backup state should be in DB");
@@ -1052,21 +1036,19 @@ export async function setWalletDeviceId(
deviceId: string,
): Promise<void> {
await provideBackupState(ws);
- await ws.db
- .mktx((x) => [x.config])
- .runReadWrite(async (tx) => {
- let backupStateEntry: ConfigRecord | undefined = await tx.config.get(
- ConfigRecordKey.WalletBackupState,
- );
- if (
- !backupStateEntry ||
- backupStateEntry.key !== ConfigRecordKey.WalletBackupState
- ) {
- return;
- }
- backupStateEntry.value.deviceId = deviceId;
- await tx.config.put(backupStateEntry);
- });
+ await ws.db.runReadWriteTx(["config"], async (tx) => {
+ let backupStateEntry: ConfigRecord | undefined = await tx.config.get(
+ ConfigRecordKey.WalletBackupState,
+ );
+ if (
+ !backupStateEntry ||
+ backupStateEntry.key !== ConfigRecordKey.WalletBackupState
+ ) {
+ return;
+ }
+ backupStateEntry.value.deviceId = deviceId;
+ await tx.config.put(backupStateEntry);
+ });
}
export async function getWalletDeviceId(
diff --git a/packages/taler-wallet-core/src/operations/balance.ts b/packages/taler-wallet-core/src/operations/balance.ts
index a73476e9c..12f9795b2 100644
--- a/packages/taler-wallet-core/src/operations/balance.ts
+++ b/packages/taler-wallet-core/src/operations/balance.ts
@@ -71,14 +71,12 @@ import {
OPERATION_STATUS_ACTIVE_LAST,
RefreshGroupRecord,
RefreshOperationStatus,
- WalletDbReadOnlyTransactionArr,
- WalletStoresV1,
+ WalletDbReadOnlyTransaction,
WithdrawalGroupStatus,
} from "../db.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import { assertUnreachable } from "../util/assertUnreachable.js";
import { checkLogicInvariant } from "../util/invariants.js";
-import { GetReadOnlyAccess } from "../util/query.js";
import {
getExchangeScopeInfo,
getExchangeWireDetailsInTx,
@@ -134,7 +132,7 @@ class BalancesStore {
constructor(
private ws: InternalWalletState,
- private tx: WalletDbReadOnlyTransactionArr<
+ private tx: WalletDbReadOnlyTransaction<
[
"globalCurrencyAuditors",
"globalCurrencyExchanges",
@@ -275,7 +273,7 @@ class BalancesStore {
*/
export async function getBalancesInsideTransaction(
ws: InternalWalletState,
- tx: WalletDbReadOnlyTransactionArr<
+ tx: WalletDbReadOnlyTransaction<
[
"exchanges",
"exchangeDetails",
@@ -465,81 +463,79 @@ export async function getAcceptableExchangeBaseUrls(
): Promise<AcceptableExchanges> {
const acceptableExchangeUrls = new Set<string>();
const depositableExchangeUrls = new Set<string>();
- await ws.db
- .mktx((x) => [x.exchanges, x.exchangeDetails])
- .runReadOnly(async (tx) => {
- // FIXME: We should have a DB index to look up all exchanges
- // for a particular auditor ...
-
- const canonExchanges = new Set<string>();
- const canonAuditors = new Set<string>();
-
- for (const exchangeHandle of req.acceptedExchanges) {
- const normUrl = canonicalizeBaseUrl(exchangeHandle.exchangeBaseUrl);
- canonExchanges.add(normUrl);
- }
+ await ws.db.runReadOnlyTx(["exchanges", "exchangeDetails"], async (tx) => {
+ // FIXME: We should have a DB index to look up all exchanges
+ // for a particular auditor ...
- for (const auditorHandle of req.acceptedAuditors) {
- const normUrl = canonicalizeBaseUrl(auditorHandle.auditorBaseUrl);
- canonAuditors.add(normUrl);
- }
+ const canonExchanges = new Set<string>();
+ const canonAuditors = new Set<string>();
- await tx.exchanges.iter().forEachAsync(async (exchange) => {
- const dp = exchange.detailsPointer;
- if (!dp) {
- return;
- }
- const { currency, masterPublicKey } = dp;
- const exchangeDetails = await tx.exchangeDetails.indexes.byPointer.get([
- exchange.baseUrl,
- currency,
- masterPublicKey,
- ]);
- if (!exchangeDetails) {
- return;
- }
+ for (const exchangeHandle of req.acceptedExchanges) {
+ const normUrl = canonicalizeBaseUrl(exchangeHandle.exchangeBaseUrl);
+ canonExchanges.add(normUrl);
+ }
- let acceptable = false;
+ for (const auditorHandle of req.acceptedAuditors) {
+ const normUrl = canonicalizeBaseUrl(auditorHandle.auditorBaseUrl);
+ canonAuditors.add(normUrl);
+ }
- if (canonExchanges.has(exchange.baseUrl)) {
+ await tx.exchanges.iter().forEachAsync(async (exchange) => {
+ const dp = exchange.detailsPointer;
+ if (!dp) {
+ return;
+ }
+ const { currency, masterPublicKey } = dp;
+ const exchangeDetails = await tx.exchangeDetails.indexes.byPointer.get([
+ exchange.baseUrl,
+ currency,
+ masterPublicKey,
+ ]);
+ if (!exchangeDetails) {
+ return;
+ }
+
+ let acceptable = false;
+
+ if (canonExchanges.has(exchange.baseUrl)) {
+ acceptableExchangeUrls.add(exchange.baseUrl);
+ acceptable = true;
+ }
+ for (const exchangeAuditor of exchangeDetails.auditors) {
+ if (canonAuditors.has(exchangeAuditor.auditor_url)) {
acceptableExchangeUrls.add(exchange.baseUrl);
acceptable = true;
+ break;
}
- for (const exchangeAuditor of exchangeDetails.auditors) {
- if (canonAuditors.has(exchangeAuditor.auditor_url)) {
- acceptableExchangeUrls.add(exchange.baseUrl);
- acceptable = true;
+ }
+
+ if (!acceptable) {
+ return;
+ }
+ // FIXME: Also consider exchange and auditor public key
+ // instead of just base URLs?
+
+ let wireMethodSupported = false;
+ for (const acc of exchangeDetails.wireInfo.accounts) {
+ const pp = parsePaytoUri(acc.payto_uri);
+ checkLogicInvariant(!!pp);
+ for (const wm of req.acceptedWireMethods) {
+ if (pp.targetType === wm) {
+ wireMethodSupported = true;
break;
}
- }
-
- if (!acceptable) {
- return;
- }
- // FIXME: Also consider exchange and auditor public key
- // instead of just base URLs?
-
- let wireMethodSupported = false;
- for (const acc of exchangeDetails.wireInfo.accounts) {
- const pp = parsePaytoUri(acc.payto_uri);
- checkLogicInvariant(!!pp);
- for (const wm of req.acceptedWireMethods) {
- if (pp.targetType === wm) {
- wireMethodSupported = true;
- break;
- }
- if (wireMethodSupported) {
- break;
- }
+ if (wireMethodSupported) {
+ break;
}
}
+ }
- acceptableExchangeUrls.add(exchange.baseUrl);
- if (wireMethodSupported) {
- depositableExchangeUrls.add(exchange.baseUrl);
- }
- });
+ acceptableExchangeUrls.add(exchange.baseUrl);
+ if (wireMethodSupported) {
+ depositableExchangeUrls.add(exchange.baseUrl);
+ }
});
+ });
return {
acceptableExchanges: [...acceptableExchangeUrls],
depositableExchanges: [...depositableExchangeUrls],
@@ -587,15 +583,9 @@ export async function getMerchantPaymentBalanceDetails(
balanceMerchantDepositable: Amounts.zeroOfCurrency(req.currency),
};
- await ws.db
- .mktx((x) => [
- x.coins,
- x.coinAvailability,
- x.refreshGroups,
- x.purchases,
- x.withdrawalGroups,
- ])
- .runReadOnly(async (tx) => {
+ await ws.db.runReadOnlyTx(
+ ["coinAvailability", "refreshGroups"],
+ async (tx) => {
await tx.coinAvailability.iter().forEach((ca) => {
if (ca.currency != req.currency) {
return;
@@ -638,7 +628,8 @@ export async function getMerchantPaymentBalanceDetails(
computeRefreshGroupAvailableAmount(r),
).amount;
});
- });
+ },
+ );
return d;
}
@@ -649,27 +640,25 @@ export async function getBalanceDetail(
): Promise<MerchantPaymentBalanceDetails> {
const exchanges: { exchangeBaseUrl: string; exchangePub: string }[] = [];
const wires = new Array<string>();
- await ws.db
- .mktx((x) => [x.exchanges, x.exchangeDetails])
- .runReadOnly(async (tx) => {
- const allExchanges = await tx.exchanges.iter().toArray();
- for (const e of allExchanges) {
- const details = await getExchangeWireDetailsInTx(tx, e.baseUrl);
- if (!details || req.currency !== details.currency) {
- continue;
- }
- details.wireInfo.accounts.forEach((a) => {
- const payto = parsePaytoUri(a.payto_uri);
- if (payto && !wires.includes(payto.targetType)) {
- wires.push(payto.targetType);
- }
- });
- exchanges.push({
- exchangePub: details.masterPublicKey,
- exchangeBaseUrl: e.baseUrl,
- });
+ await ws.db.runReadOnlyTx(["exchanges", "exchangeDetails"], async (tx) => {
+ const allExchanges = await tx.exchanges.iter().toArray();
+ for (const e of allExchanges) {
+ const details = await getExchangeWireDetailsInTx(tx, e.baseUrl);
+ if (!details || req.currency !== details.currency) {
+ continue;
}
- });
+ details.wireInfo.accounts.forEach((a) => {
+ const payto = parsePaytoUri(a.payto_uri);
+ if (payto && !wires.includes(payto.targetType)) {
+ wires.push(payto.targetType);
+ }
+ });
+ exchanges.push({
+ exchangePub: details.masterPublicKey,
+ exchangeBaseUrl: e.baseUrl,
+ });
+ }
+ });
return await getMerchantPaymentBalanceDetails(ws, {
currency: req.currency,
@@ -699,10 +688,7 @@ export interface PeerPaymentBalanceDetails {
export async function getPeerPaymentBalanceDetailsInTx(
ws: InternalWalletState,
- tx: GetReadOnlyAccess<{
- coinAvailability: typeof WalletStoresV1.coinAvailability;
- refreshGroups: typeof WalletStoresV1.refreshGroups;
- }>,
+ tx: WalletDbReadOnlyTransaction<["coinAvailability", "refreshGroups"]>,
req: PeerPaymentRestrictionsForBalance,
): Promise<PeerPaymentBalanceDetails> {
let balanceAvailable = Amounts.zeroOfCurrency(req.currency);
diff --git a/packages/taler-wallet-core/src/operations/common.ts b/packages/taler-wallet-core/src/operations/common.ts
index 92950b35b..6bafa632e 100644
--- a/packages/taler-wallet-core/src/operations/common.ts
+++ b/packages/taler-wallet-core/src/operations/common.ts
@@ -53,7 +53,7 @@ import {
RecoupGroupRecord,
RefreshGroupRecord,
RewardRecord,
- WalletStoresV1,
+ WalletDbReadWriteTransaction,
WithdrawalGroupRecord,
timestampPreciseToDb,
} from "../db.js";
@@ -61,7 +61,6 @@ import { InternalWalletState } from "../internal-wallet-state.js";
import { PendingTaskType, TaskId } from "../pending-types.js";
import { assertUnreachable } from "../util/assertUnreachable.js";
import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js";
-import { GetReadWriteAccess } from "../util/query.js";
import { createRefreshGroup } from "./refresh.js";
const logger = new Logger("operations/common.ts");
@@ -78,10 +77,7 @@ export interface CoinsSpendInfo {
export async function makeCoinsVisible(
ws: InternalWalletState,
- tx: GetReadWriteAccess<{
- coins: typeof WalletStoresV1.coins;
- coinAvailability: typeof WalletStoresV1.coinAvailability;
- }>,
+ tx: WalletDbReadWriteTransaction<["coins", "coinAvailability"]>,
transactionId: string,
): Promise<void> {
const coins =
@@ -109,11 +105,9 @@ export async function makeCoinsVisible(
export async function makeCoinAvailable(
ws: InternalWalletState,
- tx: GetReadWriteAccess<{
- coins: typeof WalletStoresV1.coins;
- coinAvailability: typeof WalletStoresV1.coinAvailability;
- denominations: typeof WalletStoresV1.denominations;
- }>,
+ tx: WalletDbReadWriteTransaction<
+ ["coins", "coinAvailability", "denominations"]
+ >,
coinRecord: CoinRecord,
): Promise<void> {
checkLogicInvariant(coinRecord.status === CoinStatus.Fresh);
@@ -150,12 +144,9 @@ export async function makeCoinAvailable(
export async function spendCoins(
ws: InternalWalletState,
- tx: GetReadWriteAccess<{
- coins: typeof WalletStoresV1.coins;
- coinAvailability: typeof WalletStoresV1.coinAvailability;
- refreshGroups: typeof WalletStoresV1.refreshGroups;
- denominations: typeof WalletStoresV1.denominations;
- }>,
+ tx: WalletDbReadWriteTransaction<
+ ["coins", "coinAvailability", "refreshGroups", "denominations"]
+ >,
csi: CoinsSpendInfo,
): Promise<void> {
if (csi.coinPubs.length != csi.contributions.length) {
diff --git a/packages/taler-wallet-core/src/operations/deposits.ts b/packages/taler-wallet-core/src/operations/deposits.ts
index decef7375..415f3cd72 100644
--- a/packages/taler-wallet-core/src/operations/deposits.ts
+++ b/packages/taler-wallet-core/src/operations/deposits.ts
@@ -133,25 +133,23 @@ export class DepositTransactionContext implements TransactionContext {
const ws = this.ws;
// FIXME: We should check first if we are in a final state
// where deletion is allowed.
- await ws.db
- .mktx((x) => [x.depositGroups, x.tombstones])
- .runReadWrite(async (tx) => {
- const tipRecord = await tx.depositGroups.get(depositGroupId);
- if (tipRecord) {
- await tx.depositGroups.delete(depositGroupId);
- await tx.tombstones.put({
- id: TombstoneTag.DeleteDepositGroup + ":" + depositGroupId,
- });
- }
- });
+ await ws.db.runReadWriteTx(["depositGroups", "tombstones"], async (tx) => {
+ const tipRecord = await tx.depositGroups.get(depositGroupId);
+ if (tipRecord) {
+ await tx.depositGroups.delete(depositGroupId);
+ await tx.tombstones.put({
+ id: TombstoneTag.DeleteDepositGroup + ":" + depositGroupId,
+ });
+ }
+ });
return;
}
async suspendTransaction(): Promise<void> {
const { ws, depositGroupId, transactionId, taskId: retryTag } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.depositGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["depositGroups"],
+ async (tx) => {
const dg = await tx.depositGroups.get(depositGroupId);
if (!dg) {
logger.warn(
@@ -184,16 +182,17 @@ export class DepositTransactionContext implements TransactionContext {
oldTxState: oldState,
newTxState: computeDepositTransactionStatus(dg),
};
- });
+ },
+ );
ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
}
async abortTransaction(): Promise<void> {
const { ws, depositGroupId, transactionId, taskId: retryTag } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.depositGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["depositGroups"],
+ async (tx) => {
const dg = await tx.depositGroups.get(depositGroupId);
if (!dg) {
logger.warn(
@@ -218,7 +217,8 @@ export class DepositTransactionContext implements TransactionContext {
return undefined;
}
return undefined;
- });
+ },
+ );
ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
ws.taskScheduler.startShepherdTask(retryTag);
@@ -226,9 +226,9 @@ export class DepositTransactionContext implements TransactionContext {
async resumeTransaction(): Promise<void> {
const { ws, depositGroupId, transactionId, taskId: retryTag } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.depositGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["depositGroups"],
+ async (tx) => {
const dg = await tx.depositGroups.get(depositGroupId);
if (!dg) {
logger.warn(
@@ -261,16 +261,17 @@ export class DepositTransactionContext implements TransactionContext {
oldTxState: oldState,
newTxState: computeDepositTransactionStatus(dg),
};
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
ws.taskScheduler.startShepherdTask(retryTag);
}
async failTransaction(): Promise<void> {
const { ws, depositGroupId, transactionId, taskId: retryTag } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.depositGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["depositGroups"],
+ async (tx) => {
const dg = await tx.depositGroups.get(depositGroupId);
if (!dg) {
logger.warn(
@@ -291,7 +292,8 @@ export class DepositTransactionContext implements TransactionContext {
}
}
return undefined;
- });
+ },
+ );
// FIXME: Also cancel ongoing work (via cancellation token, once implemented)
ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
@@ -418,9 +420,9 @@ async function waitForRefreshOnDepositGroup(
tag: TransactionType.Deposit,
depositGroupId: depositGroup.depositGroupId,
});
- const transitionInfo = await ws.db
- .mktx((x) => [x.refreshGroups, x.depositGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["depositGroups", "refreshGroups"],
+ async (tx) => {
const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId);
let newOpState: DepositOperationStatus | undefined;
if (!refreshGroup) {
@@ -449,7 +451,8 @@ async function waitForRefreshOnDepositGroup(
return { oldTxState, newTxState };
}
return undefined;
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
return TaskRunResult.backoff();
@@ -469,13 +472,14 @@ async function refundDepositGroup(
break;
default: {
const coinPub = depositGroup.payCoinSelection.coinPubs[i];
- const coinExchange = await ws.db
- .mktx((x) => [x.coins])
- .runReadOnly(async (tx) => {
+ const coinExchange = await ws.db.runReadOnlyTx(
+ ["coins"],
+ async (tx) => {
const coinRecord = await tx.coins.get(coinPub);
checkDbInvariant(!!coinRecord);
return coinRecord.exchangeBaseUrl;
- });
+ },
+ );
const refundAmount = depositGroup.payCoinSelection.coinContributions[i];
// We use a constant refund transaction ID, since there can
// only be one refund.
@@ -529,15 +533,15 @@ async function refundDepositGroup(
const currency = Amounts.currencyOf(depositGroup.totalPayCost);
- await ws.db
- .mktx((x) => [
- x.depositGroups,
- x.refreshGroups,
- x.coins,
- x.denominations,
- x.coinAvailability,
- ])
- .runReadWrite(async (tx) => {
+ await ws.db.runReadWriteTx(
+ [
+ "depositGroups",
+ "refreshGroups",
+ "coins",
+ "denominations",
+ "coinAvailability",
+ ],
+ async (tx) => {
const newDg = await tx.depositGroups.get(depositGroup.depositGroupId);
if (!newDg) {
return;
@@ -565,7 +569,8 @@ async function refundDepositGroup(
newDg.abortRefreshGroupId = rgid.refreshGroupId;
}
await tx.depositGroups.put(newDg);
- });
+ },
+ );
return TaskRunResult.backoff();
}
@@ -622,9 +627,9 @@ async function processDepositGroupPendingKyc(
// remove after the exchange is fixed or clarified
kycStatusRes.status === HttpStatusCode.NoContent
) {
- const transitionInfo = await ws.db
- .mktx((x) => [x.depositGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["depositGroups"],
+ async (tx) => {
const newDg = await tx.depositGroups.get(depositGroupId);
if (!newDg) {
return;
@@ -637,7 +642,8 @@ async function processDepositGroupPendingKyc(
const newTxState = computeDepositTransactionStatus(newDg);
await tx.depositGroups.put(newDg);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
} else if (kycStatusRes.status === HttpStatusCode.Accepted) {
// FIXME: Do we have to update the URL here?
@@ -680,9 +686,9 @@ async function transitionToKycRequired(
} else if (kycStatusReq.status === HttpStatusCode.Accepted) {
const kycStatus = await kycStatusReq.json();
logger.info(`kyc status: ${j2s(kycStatus)}`);
- const transitionInfo = await ws.db
- .mktx((x) => [x.depositGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["depositGroups"],
+ async (tx) => {
const dg = await tx.depositGroups.get(depositGroupId);
if (!dg) {
return undefined;
@@ -700,7 +706,8 @@ async function transitionToKycRequired(
await tx.depositGroups.put(dg);
const newTxState = computeDepositTransactionStatus(dg);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
return TaskRunResult.finished();
} else {
@@ -717,13 +724,14 @@ async function processDepositGroupPendingTrack(
for (let i = 0; i < depositGroup.statusPerCoin.length; i++) {
const coinPub = depositGroup.payCoinSelection.coinPubs[i];
// FIXME: Make the URL part of the coin selection?
- const exchangeBaseUrl = await ws.db
- .mktx((x) => [x.coins])
- .runReadWrite(async (tx) => {
+ const exchangeBaseUrl = await ws.db.runReadWriteTx(
+ ["coins"],
+ async (tx) => {
const coinRecord = await tx.coins.get(coinPub);
checkDbInvariant(!!coinRecord);
return coinRecord.exchangeBaseUrl;
- });
+ },
+ );
let updatedTxStatus: DepositElementStatus | undefined = undefined;
let newWiredCoin:
@@ -793,41 +801,39 @@ async function processDepositGroupPendingTrack(
}
if (updatedTxStatus !== undefined) {
- await ws.db
- .mktx((x) => [x.depositGroups])
- .runReadWrite(async (tx) => {
- const dg = await tx.depositGroups.get(depositGroupId);
- if (!dg) {
- return;
- }
- if (updatedTxStatus !== undefined) {
- dg.statusPerCoin[i] = updatedTxStatus;
- }
- if (newWiredCoin) {
- /**
- * FIXME: if there is a new wire information from the exchange
- * it should add up to the previous tracking states.
- *
- * This may loose information by overriding prev state.
- *
- * And: add checks to integration tests
- */
- if (!dg.trackingState) {
- dg.trackingState = {};
- }
-
- dg.trackingState[newWiredCoin.id] = newWiredCoin.value;
+ await ws.db.runReadWriteTx(["depositGroups"], async (tx) => {
+ const dg = await tx.depositGroups.get(depositGroupId);
+ if (!dg) {
+ return;
+ }
+ if (updatedTxStatus !== undefined) {
+ dg.statusPerCoin[i] = updatedTxStatus;
+ }
+ if (newWiredCoin) {
+ /**
+ * FIXME: if there is a new wire information from the exchange
+ * it should add up to the previous tracking states.
+ *
+ * This may loose information by overriding prev state.
+ *
+ * And: add checks to integration tests
+ */
+ if (!dg.trackingState) {
+ dg.trackingState = {};
}
- await tx.depositGroups.put(dg);
- });
+
+ dg.trackingState[newWiredCoin.id] = newWiredCoin.value;
+ }
+ await tx.depositGroups.put(dg);
+ });
}
}
let allWired = true;
- const transitionInfo = await ws.db
- .mktx((x) => [x.depositGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["depositGroups"],
+ async (tx) => {
const dg = await tx.depositGroups.get(depositGroupId);
if (!dg) {
return undefined;
@@ -848,7 +854,8 @@ async function processDepositGroupPendingTrack(
}
const newTxState = computeDepositTransactionStatus(dg);
return { oldTxState, newTxState };
- });
+ },
+ );
const transactionId = constructTransactionIdentifier({
tag: TransactionType.Deposit,
depositGroupId,
@@ -869,11 +876,12 @@ async function processDepositGroupPendingDeposit(
): Promise<TaskRunResult> {
logger.info("processing deposit group in pending(deposit)");
const depositGroupId = depositGroup.depositGroupId;
- const contractTermsRec = await ws.db
- .mktx((x) => [x.contractTerms])
- .runReadOnly(async (tx) => {
+ const contractTermsRec = await ws.db.runReadOnlyTx(
+ ["contractTerms"],
+ async (tx) => {
return tx.contractTerms.get(depositGroup.contractTermsHash);
- });
+ },
+ );
if (!contractTermsRec) {
throw Error("contract terms for deposit not found in database");
}
@@ -954,27 +962,25 @@ async function processDepositGroupPendingDeposit(
codecForBatchDepositSuccess(),
);
- await ws.db
- .mktx((x) => [x.depositGroups])
- .runReadWrite(async (tx) => {
- const dg = await tx.depositGroups.get(depositGroupId);
- if (!dg) {
- return;
- }
- for (const batchIndex of batchIndexes) {
- const coinStatus = dg.statusPerCoin[batchIndex];
- switch (coinStatus) {
- case DepositElementStatus.DepositPending:
- dg.statusPerCoin[batchIndex] = DepositElementStatus.Tracking;
- await tx.depositGroups.put(dg);
- }
+ await ws.db.runReadWriteTx(["depositGroups"], async (tx) => {
+ const dg = await tx.depositGroups.get(depositGroupId);
+ if (!dg) {
+ return;
+ }
+ for (const batchIndex of batchIndexes) {
+ const coinStatus = dg.statusPerCoin[batchIndex];
+ switch (coinStatus) {
+ case DepositElementStatus.DepositPending:
+ dg.statusPerCoin[batchIndex] = DepositElementStatus.Tracking;
+ await tx.depositGroups.put(dg);
}
- });
+ }
+ });
}
- const transitionInfo = await ws.db
- .mktx((x) => [x.depositGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["depositGroups"],
+ async (tx) => {
const dg = await tx.depositGroups.get(depositGroupId);
if (!dg) {
return undefined;
@@ -984,7 +990,8 @@ async function processDepositGroupPendingDeposit(
await tx.depositGroups.put(dg);
const newTxState = computeDepositTransactionStatus(dg);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
return TaskRunResult.progress();
@@ -998,11 +1005,12 @@ export async function processDepositGroup(
depositGroupId: string,
cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
- const depositGroup = await ws.db
- .mktx((x) => [x.depositGroups])
- .runReadOnly(async (tx) => {
+ const depositGroup = await ws.db.runReadOnlyTx(
+ ["depositGroups"],
+ async (tx) => {
return tx.depositGroups.get(depositGroupId);
- });
+ },
+ );
if (!depositGroup) {
logger.warn(`deposit group ${depositGroupId} not found`);
return TaskRunResult.finished();
@@ -1030,15 +1038,18 @@ export async function processDepositGroup(
return TaskRunResult.finished();
}
+/**
+ * FIXME: Consider moving this to exchanges.ts.
+ */
async function getExchangeWireFee(
ws: InternalWalletState,
wireType: string,
baseUrl: string,
time: TalerProtocolTimestamp,
): Promise<WireFee> {
- const exchangeDetails = await ws.db
- .mktx((x) => [x.exchanges, x.exchangeDetails])
- .runReadOnly(async (tx) => {
+ const exchangeDetails = await ws.db.runReadOnlyTx(
+ ["exchangeDetails", "exchanges"],
+ async (tx) => {
const ex = await tx.exchanges.get(baseUrl);
if (!ex || !ex.detailsPointer) return undefined;
return await tx.exchangeDetails.indexes.byPointer.get([
@@ -1046,7 +1057,8 @@ async function getExchangeWireFee(
ex.detailsPointer.currency,
ex.detailsPointer.masterPublicKey,
]);
- });
+ },
+ );
if (!exchangeDetails) {
throw Error(`exchange missing: ${baseUrl}`);
@@ -1141,21 +1153,19 @@ export async function prepareDepositGroup(
const exchangeInfos: { url: string; master_pub: string }[] = [];
- await ws.db
- .mktx((x) => [x.exchanges, x.exchangeDetails])
- .runReadOnly(async (tx) => {
- const allExchanges = await tx.exchanges.iter().toArray();
- for (const e of allExchanges) {
- const details = await getExchangeWireDetailsInTx(tx, e.baseUrl);
- if (!details || amount.currency !== details.currency) {
- continue;
- }
- exchangeInfos.push({
- master_pub: details.masterPublicKey,
- url: e.baseUrl,
- });
+ await ws.db.runReadOnlyTx(["exchangeDetails", "exchanges"], async (tx) => {
+ const allExchanges = await tx.exchanges.iter().toArray();
+ for (const e of allExchanges) {
+ const details = await getExchangeWireDetailsInTx(tx, e.baseUrl);
+ if (!details || amount.currency !== details.currency) {
+ continue;
}
- });
+ exchangeInfos.push({
+ master_pub: details.masterPublicKey,
+ url: e.baseUrl,
+ });
+ }
+ });
const now = AbsoluteTime.now();
const nowRounded = AbsoluteTime.toProtocolTimestamp(now);
@@ -1255,21 +1265,19 @@ export async function createDepositGroup(
const exchangeInfos: { url: string; master_pub: string }[] = [];
- await ws.db
- .mktx((x) => [x.exchanges, x.exchangeDetails])
- .runReadOnly(async (tx) => {
- const allExchanges = await tx.exchanges.iter().toArray();
- for (const e of allExchanges) {
- const details = await getExchangeWireDetailsInTx(tx, e.baseUrl);
- if (!details || amount.currency !== details.currency) {
- continue;
- }
- exchangeInfos.push({
- master_pub: details.masterPublicKey,
- url: e.baseUrl,
- });
+ await ws.db.runReadOnlyTx(["exchanges", "exchangeDetails"], async (tx) => {
+ const allExchanges = await tx.exchanges.iter().toArray();
+ for (const e of allExchanges) {
+ const details = await getExchangeWireDetailsInTx(tx, e.baseUrl);
+ if (!details || amount.currency !== details.currency) {
+ continue;
}
- });
+ exchangeInfos.push({
+ master_pub: details.masterPublicKey,
+ url: e.baseUrl,
+ });
+ }
+ });
const now = AbsoluteTime.now();
const wireDeadline = AbsoluteTime.toProtocolTimestamp(
@@ -1388,17 +1396,17 @@ export async function createDepositGroup(
const ctx = new DepositTransactionContext(ws, depositGroupId);
const transactionId = ctx.transactionId;
- const newTxState = await ws.db
- .mktx((x) => [
- x.depositGroups,
- x.coins,
- x.recoupGroups,
- x.denominations,
- x.refreshGroups,
- x.coinAvailability,
- x.contractTerms,
- ])
- .runReadWrite(async (tx) => {
+ const newTxState = await ws.db.runReadWriteTx(
+ [
+ "depositGroups",
+ "coins",
+ "recoupGroups",
+ "denominations",
+ "refreshGroups",
+ "coinAvailability",
+ "contractTerms",
+ ],
+ async (tx) => {
await spendCoins(ws, tx, {
allocationId: transactionId,
coinPubs: payCoinSel.coinSel.coinPubs,
@@ -1413,7 +1421,8 @@ export async function createDepositGroup(
h: contractTermsHash,
});
return computeDepositTransactionStatus(depositGroup);
- });
+ },
+ );
ws.notify({
type: NotificationType.TransactionStateTransition,
@@ -1450,9 +1459,9 @@ export async function getCounterpartyEffectiveDepositAmount(
const fees: AmountJson[] = [];
const exchangeSet: Set<string> = new Set();
- await ws.db
- .mktx((x) => [x.coins, x.denominations, x.exchanges, x.exchangeDetails])
- .runReadOnly(async (tx) => {
+ await ws.db.runReadOnlyTx(
+ ["coins", "denominations", "exchangeDetails", "exchanges"],
+ async (tx) => {
for (let i = 0; i < pcs.coinPubs.length; i++) {
const coin = await tx.coins.get(pcs.coinPubs[i]);
if (!coin) {
@@ -1495,7 +1504,8 @@ export async function getCounterpartyEffectiveDepositAmount(
fees.push(Amounts.parseOrThrow(fee));
}
}
- });
+ },
+ );
return Amounts.sub(Amounts.sum(amt).amount, Amounts.sum(fees).amount).amount;
}
@@ -1515,9 +1525,9 @@ async function getTotalFeesForDepositAmount(
const exchangeSet: Set<string> = new Set();
const currency = Amounts.currencyOf(total);
- await ws.db
- .mktx((x) => [x.coins, x.denominations, x.exchanges, x.exchangeDetails])
- .runReadOnly(async (tx) => {
+ await ws.db.runReadOnlyTx(
+ ["coins", "denominations", "exchanges", "exchangeDetails"],
+ async (tx) => {
for (let i = 0; i < pcs.coinPubs.length; i++) {
const coin = await tx.coins.get(pcs.coinPubs[i]);
if (!coin) {
@@ -1575,7 +1585,8 @@ async function getTotalFeesForDepositAmount(
wireFee.push(Amounts.parseOrThrow(fee));
}
}
- });
+ },
+ );
return {
coin: Amounts.stringify(Amounts.sumOrZero(total.currency, coinFee).amount),
diff --git a/packages/taler-wallet-core/src/operations/exchanges.ts b/packages/taler-wallet-core/src/operations/exchanges.ts
index 460b47e73..678e48fb9 100644
--- a/packages/taler-wallet-core/src/operations/exchanges.ts
+++ b/packages/taler-wallet-core/src/operations/exchanges.ts
@@ -99,8 +99,8 @@ import {
ExchangeEntryDbRecordStatus,
ExchangeEntryDbUpdateStatus,
PendingTaskType,
- WalletDbReadOnlyTransactionArr,
- WalletDbReadWriteTransactionArr,
+ WalletDbReadOnlyTransaction,
+ WalletDbReadWriteTransaction,
createRefreshGroup,
createTimeline,
isWithdrawableDenom,
@@ -115,11 +115,7 @@ import {
} from "../index.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import { checkDbInvariant } from "../util/invariants.js";
-import {
- DbReadOnlyTransactionArr,
- GetReadOnlyAccess,
- GetReadWriteAccess,
-} from "../util/query.js";
+import { DbReadOnlyTransaction } from "../util/query.js";
import { WALLET_EXCHANGE_PROTOCOL_VERSION } from "../versions.js";
import {
TaskIdentifiers,
@@ -197,10 +193,7 @@ async function downloadExchangeWithTermsOfService(
* Get exchange details from the database.
*/
async function getExchangeRecordsInternal(
- tx: GetReadOnlyAccess<{
- exchanges: typeof WalletStoresV1.exchanges;
- exchangeDetails: typeof WalletStoresV1.exchangeDetails;
- }>,
+ tx: WalletDbReadOnlyTransaction<["exchanges", "exchangeDetails"]>,
exchangeBaseUrl: string,
): Promise<ExchangeDetailsRecord | undefined> {
const r = await tx.exchanges.get(exchangeBaseUrl);
@@ -220,7 +213,7 @@ async function getExchangeRecordsInternal(
}
export async function getExchangeScopeInfo(
- tx: WalletDbReadOnlyTransactionArr<
+ tx: WalletDbReadOnlyTransaction<
[
"exchanges",
"exchangeDetails",
@@ -243,7 +236,7 @@ export async function getExchangeScopeInfo(
}
async function internalGetExchangeScopeInfo(
- tx: WalletDbReadOnlyTransactionArr<
+ tx: WalletDbReadOnlyTransaction<
["globalCurrencyExchanges", "globalCurrencyAuditors"]
>,
exchangeDetails: ExchangeDetailsRecord,
@@ -284,7 +277,7 @@ async function internalGetExchangeScopeInfo(
}
async function makeExchangeListItem(
- tx: WalletDbReadOnlyTransactionArr<
+ tx: WalletDbReadOnlyTransaction<
["globalCurrencyExchanges", "globalCurrencyAuditors"]
>,
r: ExchangeEntryRecord,
@@ -328,10 +321,7 @@ export interface ExchangeWireDetails {
}
export async function getExchangeWireDetailsInTx(
- tx: GetReadOnlyAccess<{
- exchanges: typeof WalletStoresV1.exchanges;
- exchangeDetails: typeof WalletStoresV1.exchangeDetails;
- }>,
+ tx: WalletDbReadOnlyTransaction<["exchanges", "exchangeDetails"]>,
exchangeBaseUrl: string,
): Promise<ExchangeWireDetails | undefined> {
const det = await getExchangeRecordsInternal(tx, exchangeBaseUrl);
@@ -389,9 +379,9 @@ export async function acceptExchangeTermsOfService(
ws: InternalWalletState,
exchangeBaseUrl: string,
): Promise<void> {
- const notif = await ws.db
- .mktx((x) => [x.exchanges, x.exchangeDetails])
- .runReadWrite(async (tx) => {
+ const notif = await ws.db.runReadWriteTx(
+ ["exchangeDetails", "exchanges"],
+ async (tx) => {
const exch = await tx.exchanges.get(exchangeBaseUrl);
if (exch && exch.tosCurrentEtag) {
const oldExchangeState = getExchangeState(exch);
@@ -409,7 +399,8 @@ export async function acceptExchangeTermsOfService(
} satisfies WalletNotification;
}
return undefined;
- });
+ },
+ );
if (notif) {
ws.notify(notif);
}
@@ -537,7 +528,7 @@ async function validateGlobalFees(
* if the DB transaction succeeds.
*/
export async function addPresetExchangeEntry(
- tx: WalletDbReadWriteTransactionArr<["exchanges"]>,
+ tx: WalletDbReadWriteTransaction<["exchanges"]>,
exchangeBaseUrl: string,
currencyHint?: string,
): Promise<{ notification?: WalletNotification }> {
@@ -577,10 +568,7 @@ export async function addPresetExchangeEntry(
async function provideExchangeRecordInTx(
ws: InternalWalletState,
- tx: GetReadWriteAccess<{
- exchanges: typeof WalletStoresV1.exchanges;
- exchangeDetails: typeof WalletStoresV1.exchangeDetails;
- }>,
+ tx: WalletDbReadWriteTransaction<["exchanges", "exchangeDetails"]>,
baseUrl: string,
): Promise<{
exchange: ExchangeEntryRecord;
@@ -854,55 +842,58 @@ async function startUpdateExchangeEntry(
}`,
);
- const { notification } = await ws.db
- .mktx((x) => [x.exchanges, x.exchangeDetails])
- .runReadWrite(async (tx) => {
+ const { notification } = await ws.db.runReadWriteTx(
+ ["exchanges", "exchangeDetails"],
+ async (tx) => {
return provideExchangeRecordInTx(ws, tx, exchangeBaseUrl);
- });
+ },
+ );
if (notification) {
ws.notify(notification);
}
- const { oldExchangeState, newExchangeState, taskId } = await ws.db
- .mktx((x) => [x.exchanges, x.operationRetries])
- .runReadWrite(async (tx) => {
- const r = await tx.exchanges.get(canonBaseUrl);
- if (!r) {
- throw Error("exchange not found");
- }
- const oldExchangeState = getExchangeState(r);
- switch (r.updateStatus) {
- case ExchangeEntryDbUpdateStatus.UnavailableUpdate:
- break;
- case ExchangeEntryDbUpdateStatus.Suspended:
- break;
- case ExchangeEntryDbUpdateStatus.ReadyUpdate:
- break;
- case ExchangeEntryDbUpdateStatus.Ready: {
- const nextUpdateTimestamp = AbsoluteTime.fromPreciseTimestamp(
- timestampPreciseFromDb(r.nextUpdateStamp),
- );
- // Only update if entry is outdated or update is forced.
- if (
- options.forceUpdate ||
- AbsoluteTime.isExpired(nextUpdateTimestamp)
- ) {
- r.updateStatus = ExchangeEntryDbUpdateStatus.ReadyUpdate;
+ const { oldExchangeState, newExchangeState, taskId } =
+ await ws.db.runReadWriteTx(
+ ["exchanges", "operationRetries"],
+ async (tx) => {
+ const r = await tx.exchanges.get(canonBaseUrl);
+ if (!r) {
+ throw Error("exchange not found");
+ }
+ const oldExchangeState = getExchangeState(r);
+ switch (r.updateStatus) {
+ case ExchangeEntryDbUpdateStatus.UnavailableUpdate:
+ break;
+ case ExchangeEntryDbUpdateStatus.Suspended:
+ break;
+ case ExchangeEntryDbUpdateStatus.ReadyUpdate:
+ break;
+ case ExchangeEntryDbUpdateStatus.Ready: {
+ const nextUpdateTimestamp = AbsoluteTime.fromPreciseTimestamp(
+ timestampPreciseFromDb(r.nextUpdateStamp),
+ );
+ // Only update if entry is outdated or update is forced.
+ if (
+ options.forceUpdate ||
+ AbsoluteTime.isExpired(nextUpdateTimestamp)
+ ) {
+ r.updateStatus = ExchangeEntryDbUpdateStatus.ReadyUpdate;
+ }
+ break;
}
- break;
+ case ExchangeEntryDbUpdateStatus.Initial:
+ r.updateStatus = ExchangeEntryDbUpdateStatus.InitialUpdate;
+ break;
}
- case ExchangeEntryDbUpdateStatus.Initial:
- r.updateStatus = ExchangeEntryDbUpdateStatus.InitialUpdate;
- break;
- }
- await tx.exchanges.put(r);
- const newExchangeState = getExchangeState(r);
- // Reset retries for updating the exchange entry.
- const taskId = TaskIdentifiers.forExchangeUpdate(r);
- await tx.operationRetries.delete(taskId);
- return { oldExchangeState, newExchangeState, taskId };
- });
+ await tx.exchanges.put(r);
+ const newExchangeState = getExchangeState(r);
+ // Reset retries for updating the exchange entry.
+ const taskId = TaskIdentifiers.forExchangeUpdate(r);
+ await tx.operationRetries.delete(taskId);
+ return { oldExchangeState, newExchangeState, taskId };
+ },
+ );
ws.notify({
type: NotificationType.ExchangeStateTransition,
exchangeBaseUrl: canonBaseUrl,
@@ -1284,17 +1275,17 @@ export async function updateExchangeFromUrlHandler(
}
}
- const updated = await ws.db
- .mktx((x) => [
- x.exchanges,
- x.exchangeDetails,
- x.exchangeSignKeys,
- x.denominations,
- x.coins,
- x.refreshGroups,
- x.recoupGroups,
- ])
- .runReadWrite(async (tx) => {
+ const updated = await ws.db.runReadWriteTx(
+ [
+ "exchanges",
+ "exchangeDetails",
+ "exchangeSignKeys",
+ "denominations",
+ "coins",
+ "refreshGroups",
+ "recoupGroups",
+ ],
+ async (tx) => {
const r = await tx.exchanges.get(exchangeBaseUrl);
if (!r) {
logger.warn(`exchange ${exchangeBaseUrl} no longer present`);
@@ -1454,7 +1445,8 @@ export async function updateExchangeFromUrlHandler(
oldExchangeState,
newExchangeState,
};
- });
+ },
+ );
if (recoupGroupId) {
const recoupTaskId = constructTaskIdentifier({
@@ -1481,15 +1473,15 @@ export async function updateExchangeFromUrlHandler(
if (refreshCheckNecessary) {
// Do auto-refresh.
- await ws.db
- .mktx((x) => [
- x.coins,
- x.denominations,
- x.coinAvailability,
- x.refreshGroups,
- x.exchanges,
- ])
- .runReadWrite(async (tx) => {
+ await ws.db.runReadWriteTx(
+ [
+ "coins",
+ "denominations",
+ "coinAvailability",
+ "refreshGroups",
+ "exchanges",
+ ],
+ async (tx) => {
const exchange = await tx.exchanges.get(exchangeBaseUrl);
if (!exchange || !exchange.detailsPointer) {
return;
@@ -1547,7 +1539,8 @@ export async function updateExchangeFromUrlHandler(
AbsoluteTime.toPreciseTimestamp(minCheckThreshold),
);
await tx.exchanges.put(exchange);
- });
+ },
+ );
}
ws.notify({
@@ -1599,11 +1592,12 @@ export async function getExchangePaytoUri(
): Promise<string> {
// We do the update here, since the exchange might not even exist
// yet in our database.
- const details = await ws.db
- .mktx((x) => [x.exchangeDetails, x.exchanges])
- .runReadOnly(async (tx) => {
+ const details = await ws.db.runReadOnlyTx(
+ ["exchanges", "exchangeDetails"],
+ async (tx) => {
return getExchangeRecordsInternal(tx, exchangeBaseUrl);
- });
+ },
+ );
const accounts = details?.wireInfo.accounts ?? [];
for (const account of accounts) {
const res = parsePaytoUri(account.payto_uri);
@@ -1641,15 +1635,13 @@ export async function getExchangeTos(
acceptLanguage,
);
- await ws.db
- .mktx((x) => [x.exchanges, x.exchangeDetails])
- .runReadWrite(async (tx) => {
- const updateExchangeEntry = await tx.exchanges.get(exchangeBaseUrl);
- if (updateExchangeEntry) {
- updateExchangeEntry.tosCurrentEtag = tosDownload.tosEtag;
- await tx.exchanges.put(updateExchangeEntry);
- }
- });
+ await ws.db.runReadWriteTx(["exchanges"], async (tx) => {
+ const updateExchangeEntry = await tx.exchanges.get(exchangeBaseUrl);
+ if (updateExchangeEntry) {
+ updateExchangeEntry.tosCurrentEtag = tosDownload.tosEtag;
+ await tx.exchanges.put(updateExchangeEntry);
+ }
+ });
return {
acceptedEtag: exch.tosAcceptedEtag,
@@ -1738,7 +1730,7 @@ export async function listExchanges(
*/
export async function markExchangeUsed(
ws: InternalWalletState,
- tx: GetReadWriteAccess<{ exchanges: typeof WalletStoresV1.exchanges }>,
+ tx: WalletDbReadWriteTransaction<["exchanges"]>,
exchangeBaseUrl: string,
): Promise<{ notif: WalletNotification | undefined }> {
exchangeBaseUrl = canonicalizeBaseUrl(exchangeBaseUrl);
@@ -1780,9 +1772,9 @@ export async function getExchangeDetailedInfo(
ws: InternalWalletState,
exchangeBaseurl: string,
): Promise<ExchangeDetailedResponse> {
- const exchange = await ws.db
- .mktx((x) => [x.exchanges, x.exchangeDetails, x.denominations])
- .runReadOnly(async (tx) => {
+ const exchange = await ws.db.runReadOnlyTx(
+ ["exchanges", "exchangeDetails", "denominations"],
+ async (tx) => {
const ex = await tx.exchanges.get(exchangeBaseurl);
const dp = ex?.detailsPointer;
if (!dp) {
@@ -1815,7 +1807,8 @@ export async function getExchangeDetailedInfo(
},
denominations,
};
- });
+ },
+ );
if (!exchange) {
throw Error(`exchange with base url "${exchangeBaseurl}" not found`);
@@ -1930,7 +1923,7 @@ export async function getExchangeDetailedInfo(
async function internalGetExchangeResources(
ws: InternalWalletState,
- tx: DbReadOnlyTransactionArr<
+ tx: DbReadOnlyTransaction<
typeof WalletStoresV1,
["exchanges", "coins", "withdrawalGroups"]
>,
diff --git a/packages/taler-wallet-core/src/operations/pay-merchant.ts b/packages/taler-wallet-core/src/operations/pay-merchant.ts
index 1039ac95e..260fc815a 100644
--- a/packages/taler-wallet-core/src/operations/pay-merchant.ts
+++ b/packages/taler-wallet-core/src/operations/pay-merchant.ts
@@ -114,7 +114,8 @@ import {
timestampPreciseToDb,
timestampProtocolFromDb,
timestampProtocolToDb,
- WalletDbReadWriteTransactionArr,
+ WalletDbReadOnlyTransaction,
+ WalletDbReadWriteTransaction,
} from "../index.js";
import {
EXCHANGE_COINS_LOCK,
@@ -123,11 +124,7 @@ import {
import { assertUnreachable } from "../util/assertUnreachable.js";
import { PreviousPayCoins, selectPayCoinsNew } from "../util/coinSelection.js";
import { checkDbInvariant } from "../util/invariants.js";
-import {
- DbReadWriteTransactionArr,
- GetReadOnlyAccess,
- StoreNames,
-} from "../util/query.js";
+import { DbReadWriteTransaction, StoreNames } from "../util/query.js";
import {
constructTaskIdentifier,
DbRetryInfo,
@@ -197,7 +194,7 @@ export class PayMerchantTransactionContext implements TransactionContext {
opts: { extraStores: StoreNameArray },
f: (
rec: PurchaseRecord,
- tx: DbReadWriteTransactionArr<
+ tx: DbReadWriteTransaction<
typeof WalletStoresV1,
["purchases", ...StoreNameArray]
>,
@@ -233,29 +230,27 @@ export class PayMerchantTransactionContext implements TransactionContext {
async deleteTransaction(): Promise<void> {
const { ws, proposalId } = this;
- await ws.db
- .mktx((x) => [x.purchases, x.tombstones])
- .runReadWrite(async (tx) => {
- let found = false;
- const purchase = await tx.purchases.get(proposalId);
- if (purchase) {
- found = true;
- await tx.purchases.delete(proposalId);
- }
- if (found) {
- await tx.tombstones.put({
- id: TombstoneTag.DeletePayment + ":" + proposalId,
- });
- }
- });
+ await ws.db.runReadWriteTx(["purchases", "tombstones"], async (tx) => {
+ let found = false;
+ const purchase = await tx.purchases.get(proposalId);
+ if (purchase) {
+ found = true;
+ await tx.purchases.delete(proposalId);
+ }
+ if (found) {
+ await tx.tombstones.put({
+ id: TombstoneTag.DeletePayment + ":" + proposalId,
+ });
+ }
+ });
}
async suspendTransaction(): Promise<void> {
const { ws, proposalId, transactionId } = this;
ws.taskScheduler.stopShepherdTask(this.taskId);
- const transitionInfo = await ws.db
- .mktx((x) => [x.purchases])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["purchases"],
+ async (tx) => {
const purchase = await tx.purchases.get(proposalId);
if (!purchase) {
throw Error("purchase not found");
@@ -268,7 +263,8 @@ export class PayMerchantTransactionContext implements TransactionContext {
await tx.purchases.put(purchase);
const newTxState = computePayMerchantTransactionState(purchase);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
}
@@ -341,9 +337,9 @@ export class PayMerchantTransactionContext implements TransactionContext {
async resumeTransaction(): Promise<void> {
const { ws, proposalId, transactionId, taskId: retryTag } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.purchases])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["purchases"],
+ async (tx) => {
const purchase = await tx.purchases.get(proposalId);
if (!purchase) {
throw Error("purchase not found");
@@ -356,23 +352,24 @@ export class PayMerchantTransactionContext implements TransactionContext {
await tx.purchases.put(purchase);
const newTxState = computePayMerchantTransactionState(purchase);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
ws.taskScheduler.startShepherdTask(this.taskId);
}
async failTransaction(): Promise<void> {
const { ws, proposalId, transactionId } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [
- x.purchases,
- x.refreshGroups,
- x.denominations,
- x.coinAvailability,
- x.coins,
- x.operationRetries,
- ])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ [
+ "purchases",
+ "refreshGroups",
+ "denominations",
+ "coinAvailability",
+ "coins",
+ "operationRetries",
+ ],
+ async (tx) => {
const purchase = await tx.purchases.get(proposalId);
if (!purchase) {
throw Error("purchase not found");
@@ -390,7 +387,8 @@ export class PayMerchantTransactionContext implements TransactionContext {
}
const newTxState = computePayMerchantTransactionState(purchase);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
ws.taskScheduler.stopShepherdTask(this.taskId);
}
@@ -410,17 +408,15 @@ export class RefundTransactionContext implements TransactionContext {
async deleteTransaction(): Promise<void> {
const { ws, refundGroupId, transactionId } = this;
- await ws.db
- .mktx((x) => [x.refundGroups, x.tombstones])
- .runReadWrite(async (tx) => {
- const refundRecord = await tx.refundGroups.get(refundGroupId);
- if (!refundRecord) {
- return;
- }
- await tx.refundGroups.delete(refundGroupId);
- await tx.tombstones.put({ id: transactionId });
- // FIXME: Also tombstone the refund items, so that they won't reappear.
- });
+ await ws.db.runReadWriteTx(["refundGroups", "tombstones"], async (tx) => {
+ const refundRecord = await tx.refundGroups.get(refundGroupId);
+ if (!refundRecord) {
+ return;
+ }
+ await tx.refundGroups.delete(refundGroupId);
+ await tx.tombstones.put({ id: transactionId });
+ // FIXME: Also tombstone the refund items, so that they won't reappear.
+ });
}
suspendTransaction(): Promise<void> {
@@ -452,46 +448,44 @@ export async function getTotalPaymentCost(
pcs: PayCoinSelection,
): Promise<AmountJson> {
const currency = Amounts.currencyOf(pcs.paymentAmount);
- return ws.db
- .mktx((x) => [x.coins, x.denominations])
- .runReadOnly(async (tx) => {
- const costs: AmountJson[] = [];
- for (let i = 0; i < pcs.coinPubs.length; i++) {
- const coin = await tx.coins.get(pcs.coinPubs[i]);
- if (!coin) {
- throw Error("can't calculate payment cost, coin not found");
- }
- const denom = await tx.denominations.get([
- coin.exchangeBaseUrl,
- coin.denomPubHash,
- ]);
- if (!denom) {
- throw Error(
- "can't calculate payment cost, denomination for coin not found",
- );
- }
- const allDenoms = await getCandidateWithdrawalDenomsTx(
- ws,
- tx,
- coin.exchangeBaseUrl,
- currency,
- );
- const amountLeft = Amounts.sub(
- denom.value,
- pcs.coinContributions[i],
- ).amount;
- const refreshCost = getTotalRefreshCost(
- allDenoms,
- DenominationRecord.toDenomInfo(denom),
- amountLeft,
- ws.config.testing.denomselAllowLate,
+ return ws.db.runReadOnlyTx(["coins", "denominations"], async (tx) => {
+ const costs: AmountJson[] = [];
+ for (let i = 0; i < pcs.coinPubs.length; i++) {
+ const coin = await tx.coins.get(pcs.coinPubs[i]);
+ if (!coin) {
+ throw Error("can't calculate payment cost, coin not found");
+ }
+ const denom = await tx.denominations.get([
+ coin.exchangeBaseUrl,
+ coin.denomPubHash,
+ ]);
+ if (!denom) {
+ throw Error(
+ "can't calculate payment cost, denomination for coin not found",
);
- costs.push(Amounts.parseOrThrow(pcs.coinContributions[i]));
- costs.push(refreshCost);
}
- const zero = Amounts.zeroOfAmount(pcs.paymentAmount);
- return Amounts.sum([zero, ...costs]).amount;
- });
+ const allDenoms = await getCandidateWithdrawalDenomsTx(
+ ws,
+ tx,
+ coin.exchangeBaseUrl,
+ currency,
+ );
+ const amountLeft = Amounts.sub(
+ denom.value,
+ pcs.coinContributions[i],
+ ).amount;
+ const refreshCost = getTotalRefreshCost(
+ allDenoms,
+ DenominationRecord.toDenomInfo(denom),
+ amountLeft,
+ ws.config.testing.denomselAllowLate,
+ );
+ costs.push(Amounts.parseOrThrow(pcs.coinContributions[i]));
+ costs.push(refreshCost);
+ }
+ const zero = Amounts.zeroOfAmount(pcs.paymentAmount);
+ return Amounts.sum([zero, ...costs]).amount;
+ });
}
async function failProposalPermanently(
@@ -503,9 +497,9 @@ async function failProposalPermanently(
tag: TransactionType.Payment,
proposalId,
});
- const transitionInfo = await ws.db
- .mktx((x) => [x.purchases])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["purchases"],
+ async (tx) => {
const p = await tx.purchases.get(proposalId);
if (!p) {
return;
@@ -516,7 +510,8 @@ async function failProposalPermanently(
const newTxState = computePayMerchantTransactionState(p);
await tx.purchases.put(p);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
}
@@ -543,9 +538,7 @@ function getPayRequestTimeout(purchase: PurchaseRecord): Duration {
export async function expectProposalDownload(
ws: InternalWalletState,
p: PurchaseRecord,
- parentTx?: GetReadOnlyAccess<{
- contractTerms: typeof WalletStoresV1.contractTerms;
- }>,
+ parentTx?: WalletDbReadOnlyTransaction<["contractTerms"]>,
): Promise<{
contractData: WalletContractData;
contractTermsRaw: any;
@@ -577,9 +570,7 @@ export async function expectProposalDownload(
if (parentTx) {
return getFromTransaction(parentTx);
}
- return await ws.db
- .mktx((x) => [x.contractTerms])
- .runReadOnly(getFromTransaction);
+ return await ws.db.runReadOnlyTx(["contractTerms"], getFromTransaction);
}
export function extractContractData(
@@ -626,11 +617,9 @@ async function processDownloadProposal(
ws: InternalWalletState,
proposalId: string,
): Promise<TaskRunResult> {
- const proposal = await ws.db
- .mktx((x) => [x.purchases])
- .runReadOnly(async (tx) => {
- return await tx.purchases.get(proposalId);
- });
+ const proposal = await ws.db.runReadOnlyTx(["purchases"], async (tx) => {
+ return await tx.purchases.get(proposalId);
+ });
if (!proposal) {
return TaskRunResult.finished();
@@ -666,11 +655,12 @@ async function processDownloadProposal(
}
const opId = TaskIdentifiers.forPay(proposal);
- const retryRecord = await ws.db
- .mktx((x) => [x.operationRetries])
- .runReadOnly(async (tx) => {
+ const retryRecord = await ws.db.runReadOnlyTx(
+ ["operationRetries"],
+ async (tx) => {
return tx.operationRetries.get(opId);
- });
+ },
+ );
const httpResponse = await ws.http.fetch(orderClaimUrl, {
method: "POST",
@@ -807,9 +797,9 @@ async function processDownloadProposal(
logger.trace(`extracted contract data: ${j2s(contractData)}`);
- const transitionInfo = await ws.db
- .mktx((x) => [x.purchases, x.contractTerms])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["purchases", "contractTerms"],
+ async (tx) => {
const p = await tx.purchases.get(proposalId);
if (!p) {
return;
@@ -855,7 +845,8 @@ async function processDownloadProposal(
oldTxState,
newTxState,
};
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
@@ -875,14 +866,12 @@ async function createOrReusePurchase(
claimToken: string | undefined,
noncePriv: string | undefined,
): Promise<string> {
- const oldProposals = await ws.db
- .mktx((x) => [x.purchases])
- .runReadOnly(async (tx) => {
- return tx.purchases.indexes.byUrlAndOrderId.getAll([
- merchantBaseUrl,
- orderId,
- ]);
- });
+ const oldProposals = await ws.db.runReadOnlyTx(["purchases"], async (tx) => {
+ return tx.purchases.indexes.byUrlAndOrderId.getAll([
+ merchantBaseUrl,
+ orderId,
+ ]);
+ });
const oldProposal = oldProposals.find((p) => {
return (
@@ -911,9 +900,9 @@ async function createOrReusePurchase(
if (paid) {
// if this transaction was shared and the order is paid then it
// means that another wallet already paid the proposal
- const transitionInfo = await ws.db
- .mktx((x) => [x.purchases])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["purchases"],
+ async (tx) => {
const p = await tx.purchases.get(oldProposal.proposalId);
if (!p) {
logger.warn("purchase does not exist anymore");
@@ -924,7 +913,8 @@ async function createOrReusePurchase(
const newTxState = computePayMerchantTransactionState(p);
await tx.purchases.put(p);
return { oldTxState, newTxState };
- });
+ },
+ );
const transactionId = constructTransactionIdentifier({
tag: TransactionType.Payment,
@@ -976,9 +966,9 @@ async function createOrReusePurchase(
shared: shared,
};
- const transitionInfo = await ws.db
- .mktx((x) => [x.purchases])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["purchases"],
+ async (tx) => {
await tx.purchases.put(proposalRecord);
const oldTxState: TransactionState = {
major: TransactionMajorState.None,
@@ -988,7 +978,8 @@ async function createOrReusePurchase(
oldTxState,
newTxState,
};
- });
+ },
+ );
const transactionId = constructTransactionIdentifier({
tag: TransactionType.Payment,
@@ -1009,9 +1000,9 @@ async function storeFirstPaySuccess(
proposalId,
});
const now = AbsoluteTime.toPreciseTimestamp(AbsoluteTime.now());
- const transitionInfo = await ws.db
- .mktx((x) => [x.purchases, x.contractTerms])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["contractTerms", "purchases"],
+ async (tx) => {
const purchase = await tx.purchases.get(proposalId);
if (!purchase) {
@@ -1059,7 +1050,8 @@ async function storeFirstPaySuccess(
oldTxState,
newTxState,
};
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
}
@@ -1072,9 +1064,9 @@ async function storePayReplaySuccess(
tag: TransactionType.Payment,
proposalId,
});
- const transitionInfo = await ws.db
- .mktx((x) => [x.purchases])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["purchases"],
+ async (tx) => {
const purchase = await tx.purchases.get(proposalId);
if (!purchase) {
@@ -1096,7 +1088,8 @@ async function storePayReplaySuccess(
await tx.purchases.put(purchase);
const newTxState = computePayMerchantTransactionState(purchase);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
}
@@ -1115,11 +1108,9 @@ async function handleInsufficientFunds(
): Promise<void> {
logger.trace("handling insufficient funds, trying to re-select coins");
- const proposal = await ws.db
- .mktx((x) => [x.purchases])
- .runReadOnly(async (tx) => {
- return tx.purchases.get(proposalId);
- });
+ const proposal = await ws.db.runReadOnlyTx(["purchases"], async (tx) => {
+ return tx.purchases.get(proposalId);
+ });
if (!proposal) {
return;
}
@@ -1156,34 +1147,32 @@ async function handleInsufficientFunds(
const payCoinSelection = payInfo.payCoinSelection;
- await ws.db
- .mktx((x) => [x.coins, x.denominations])
- .runReadOnly(async (tx) => {
- for (let i = 0; i < payCoinSelection.coinPubs.length; i++) {
- const coinPub = payCoinSelection.coinPubs[i];
- if (coinPub === brokenCoinPub) {
- continue;
- }
- const contrib = payCoinSelection.coinContributions[i];
- const coin = await tx.coins.get(coinPub);
- if (!coin) {
- continue;
- }
- const denom = await tx.denominations.get([
- coin.exchangeBaseUrl,
- coin.denomPubHash,
- ]);
- if (!denom) {
- continue;
- }
- prevPayCoins.push({
- coinPub,
- contribution: Amounts.parseOrThrow(contrib),
- exchangeBaseUrl: coin.exchangeBaseUrl,
- feeDeposit: Amounts.parseOrThrow(denom.fees.feeDeposit),
- });
+ await ws.db.runReadOnlyTx(["coins", "denominations"], async (tx) => {
+ for (let i = 0; i < payCoinSelection.coinPubs.length; i++) {
+ const coinPub = payCoinSelection.coinPubs[i];
+ if (coinPub === brokenCoinPub) {
+ continue;
}
- });
+ const contrib = payCoinSelection.coinContributions[i];
+ const coin = await tx.coins.get(coinPub);
+ if (!coin) {
+ continue;
+ }
+ const denom = await tx.denominations.get([
+ coin.exchangeBaseUrl,
+ coin.denomPubHash,
+ ]);
+ if (!denom) {
+ continue;
+ }
+ prevPayCoins.push({
+ coinPub,
+ contribution: Amounts.parseOrThrow(contrib),
+ exchangeBaseUrl: coin.exchangeBaseUrl,
+ feeDeposit: Amounts.parseOrThrow(denom.fees.feeDeposit),
+ });
+ }
+ });
const res = await selectPayCoinsNew(ws, {
auditors: [],
@@ -1204,15 +1193,15 @@ async function handleInsufficientFunds(
logger.trace("re-selected coins");
- await ws.db
- .mktx((x) => [
- x.purchases,
- x.coins,
- x.coinAvailability,
- x.denominations,
- x.refreshGroups,
- ])
- .runReadWrite(async (tx) => {
+ await ws.db.runReadWriteTx(
+ [
+ "purchases",
+ "coins",
+ "coinAvailability",
+ "denominations",
+ "refreshGroups",
+ ],
+ async (tx) => {
const p = await tx.purchases.get(proposalId);
if (!p) {
return;
@@ -1236,7 +1225,8 @@ async function handleInsufficientFunds(
),
refreshReason: RefreshReason.PayMerchant,
});
- });
+ },
+ );
ws.notify({
type: NotificationType.BalanceChange,
@@ -1255,11 +1245,9 @@ async function checkPaymentByProposalId(
proposalId: string,
sessionId?: string,
): Promise<PreparePayResult> {
- let proposal = await ws.db
- .mktx((x) => [x.purchases])
- .runReadOnly(async (tx) => {
- return tx.purchases.get(proposalId);
- });
+ let proposal = await ws.db.runReadOnlyTx(["purchases"], async (tx) => {
+ return tx.purchases.get(proposalId);
+ });
if (!proposal) {
throw Error(`could not get proposal ${proposalId}`);
}
@@ -1267,11 +1255,12 @@ async function checkPaymentByProposalId(
const existingProposalId = proposal.repurchaseProposalId;
if (existingProposalId) {
logger.trace("using existing purchase for same product");
- const oldProposal = await ws.db
- .mktx((x) => [x.purchases])
- .runReadOnly(async (tx) => {
+ const oldProposal = await ws.db.runReadOnlyTx(
+ ["purchases"],
+ async (tx) => {
return tx.purchases.get(existingProposalId);
- });
+ },
+ );
if (oldProposal) {
proposal = oldProposal;
}
@@ -1299,11 +1288,9 @@ async function checkPaymentByProposalId(
});
// First check if we already paid for it.
- const purchase = await ws.db
- .mktx((x) => [x.purchases])
- .runReadOnly(async (tx) => {
- return tx.purchases.get(proposalId);
- });
+ const purchase = await ws.db.runReadOnlyTx(["purchases"], async (tx) => {
+ return tx.purchases.get(proposalId);
+ });
if (
!purchase ||
@@ -1363,9 +1350,9 @@ async function checkPaymentByProposalId(
"automatically re-submitting payment with different session ID",
);
logger.trace(`last: ${purchase.lastSessionId}, current: ${sessionId}`);
- const transitionInfo = await ws.db
- .mktx((x) => [x.purchases])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["purchases"],
+ async (tx) => {
const p = await tx.purchases.get(proposalId);
if (!p) {
return;
@@ -1376,7 +1363,8 @@ async function checkPaymentByProposalId(
await tx.purchases.put(p);
const newTxState = computePayMerchantTransactionState(p);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
ws.taskScheduler.startShepherdTask(ctx.taskId);
@@ -1440,11 +1428,9 @@ export async function getContractTermsDetails(
ws: InternalWalletState,
proposalId: string,
): Promise<WalletContractData> {
- const proposal = await ws.db
- .mktx((x) => [x.purchases])
- .runReadOnly(async (tx) => {
- return tx.purchases.get(proposalId);
- });
+ const proposal = await ws.db.runReadOnlyTx(["purchases"], async (tx) => {
+ return tx.purchases.get(proposalId);
+ });
if (!proposal) {
throw Error(`proposal with id ${proposalId} not found`);
@@ -1630,26 +1616,24 @@ export async function generateDepositPermissions(
coin: CoinRecord;
denom: DenominationRecord;
}> = [];
- await ws.db
- .mktx((x) => [x.coins, x.denominations])
- .runReadOnly(async (tx) => {
- for (let i = 0; i < payCoinSel.coinPubs.length; i++) {
- const coin = await tx.coins.get(payCoinSel.coinPubs[i]);
- if (!coin) {
- throw Error("can't pay, allocated coin not found anymore");
- }
- const denom = await tx.denominations.get([
- coin.exchangeBaseUrl,
- coin.denomPubHash,
- ]);
- if (!denom) {
- throw Error(
- "can't pay, denomination of allocated coin not found anymore",
- );
- }
- coinWithDenom.push({ coin, denom });
+ await ws.db.runReadOnlyTx(["coins", "denominations"], async (tx) => {
+ for (let i = 0; i < payCoinSel.coinPubs.length; i++) {
+ const coin = await tx.coins.get(payCoinSel.coinPubs[i]);
+ if (!coin) {
+ throw Error("can't pay, allocated coin not found anymore");
}
- });
+ const denom = await tx.denominations.get([
+ coin.exchangeBaseUrl,
+ coin.denomPubHash,
+ ]);
+ if (!denom) {
+ throw Error(
+ "can't pay, denomination of allocated coin not found anymore",
+ );
+ }
+ coinWithDenom.push({ coin, denom });
+ }
+ });
for (let i = 0; i < payCoinSel.coinPubs.length; i++) {
const { coin, denom } = coinWithDenom[i];
@@ -1805,11 +1789,9 @@ export async function confirmPay(
logger.trace(
`executing confirmPay with proposalId ${proposalId} and sessionIdOverride ${sessionIdOverride}`,
);
- const proposal = await ws.db
- .mktx((x) => [x.purchases])
- .runReadOnly(async (tx) => {
- return tx.purchases.get(proposalId);
- });
+ const proposal = await ws.db.runReadOnlyTx(["purchases"], async (tx) => {
+ return tx.purchases.get(proposalId);
+ });
if (!proposal) {
throw Error(`proposal with id ${proposalId} not found`);
@@ -1820,9 +1802,9 @@ export async function confirmPay(
throw Error("proposal is in invalid state");
}
- const existingPurchase = await ws.db
- .mktx((x) => [x.purchases])
- .runReadWrite(async (tx) => {
+ const existingPurchase = await ws.db.runReadWriteTx(
+ ["purchases"],
+ async (tx) => {
const purchase = await tx.purchases.get(proposalId);
if (
purchase &&
@@ -1837,7 +1819,8 @@ export async function confirmPay(
await tx.purchases.put(purchase);
}
return purchase;
- });
+ },
+ );
if (existingPurchase && existingPurchase.payInfo) {
logger.trace("confirmPay: submitting payment for existing purchase");
@@ -1890,15 +1873,15 @@ export async function confirmPay(
`recording payment on ${proposal.orderId} with session ID ${sessionId}`,
);
- const transitionInfo = await ws.db
- .mktx((x) => [
- x.purchases,
- x.coins,
- x.refreshGroups,
- x.denominations,
- x.coinAvailability,
- ])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ [
+ "purchases",
+ "coins",
+ "refreshGroups",
+ "denominations",
+ "coinAvailability",
+ ],
+ async (tx) => {
const p = await tx.purchases.get(proposal.proposalId);
if (!p) {
return;
@@ -1936,7 +1919,8 @@ export async function confirmPay(
}
const newTxState = computePayMerchantTransactionState(p);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
ws.notify({
@@ -1952,11 +1936,9 @@ export async function processPurchase(
ws: InternalWalletState,
proposalId: string,
): Promise<TaskRunResult> {
- const purchase = await ws.db
- .mktx((x) => [x.purchases])
- .runReadOnly(async (tx) => {
- return tx.purchases.get(proposalId);
- });
+ const purchase = await ws.db.runReadOnlyTx(["purchases"], async (tx) => {
+ return tx.purchases.get(proposalId);
+ });
if (!purchase) {
return {
type: TaskRunResultType.Error,
@@ -2013,11 +1995,9 @@ async function processPurchasePay(
ws: InternalWalletState,
proposalId: string,
): Promise<TaskRunResult> {
- const purchase = await ws.db
- .mktx((x) => [x.purchases])
- .runReadOnly(async (tx) => {
- return tx.purchases.get(proposalId);
- });
+ const purchase = await ws.db.runReadOnlyTx(["purchases"], async (tx) => {
+ return tx.purchases.get(proposalId);
+ });
if (!purchase) {
return {
type: TaskRunResultType.Error,
@@ -2051,9 +2031,9 @@ async function processPurchasePay(
const paid = await checkIfOrderIsAlreadyPaid(ws, download.contractData);
if (paid) {
- const transitionInfo = await ws.db
- .mktx((x) => [x.purchases])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["purchases"],
+ async (tx) => {
const p = await tx.purchases.get(purchase.proposalId);
if (!p) {
logger.warn("purchase does not exist anymore");
@@ -2064,7 +2044,8 @@ async function processPurchasePay(
const newTxState = computePayMerchantTransactionState(p);
await tx.purchases.put(p);
return { oldTxState, newTxState };
- });
+ },
+ );
const transactionId = constructTransactionIdentifier({
tag: TransactionType.Payment,
proposalId,
@@ -2213,9 +2194,9 @@ export async function refuseProposal(
tag: TransactionType.Payment,
proposalId,
});
- const transitionInfo = await ws.db
- .mktx((x) => [x.purchases])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["purchases"],
+ async (tx) => {
const proposal = await tx.purchases.get(proposalId);
if (!proposal) {
logger.trace(`proposal ${proposalId} not found, won't refuse proposal`);
@@ -2232,7 +2213,8 @@ export async function refuseProposal(
const newTxState = computePayMerchantTransactionState(proposal);
await tx.purchases.put(proposal);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
}
@@ -2476,36 +2458,34 @@ export async function sharePayment(
merchantBaseUrl: string,
orderId: string,
): Promise<SharePaymentResult> {
- const result = await ws.db
- .mktx((x) => [x.purchases])
- .runReadWrite(async (tx) => {
- const p = await tx.purchases.indexes.byUrlAndOrderId.get([
- merchantBaseUrl,
- orderId,
- ]);
- if (!p) {
- logger.warn("purchase does not exist anymore");
- return undefined;
- }
- if (
- p.purchaseStatus !== PurchaseStatus.DialogProposed &&
- p.purchaseStatus !== PurchaseStatus.DialogShared
- ) {
- // FIXME: purchase can be shared before being paid
- return undefined;
- }
- if (p.purchaseStatus === PurchaseStatus.DialogProposed) {
- p.purchaseStatus = PurchaseStatus.DialogShared;
- p.shared = true;
- tx.purchases.put(p);
- }
+ const result = await ws.db.runReadWriteTx(["purchases"], async (tx) => {
+ const p = await tx.purchases.indexes.byUrlAndOrderId.get([
+ merchantBaseUrl,
+ orderId,
+ ]);
+ if (!p) {
+ logger.warn("purchase does not exist anymore");
+ return undefined;
+ }
+ if (
+ p.purchaseStatus !== PurchaseStatus.DialogProposed &&
+ p.purchaseStatus !== PurchaseStatus.DialogShared
+ ) {
+ // FIXME: purchase can be shared before being paid
+ return undefined;
+ }
+ if (p.purchaseStatus === PurchaseStatus.DialogProposed) {
+ p.purchaseStatus = PurchaseStatus.DialogShared;
+ p.shared = true;
+ tx.purchases.put(p);
+ }
- return {
- nonce: p.noncePriv,
- session: p.lastSessionId ?? p.downloadSessionId,
- token: p.claimToken,
- };
- });
+ return {
+ nonce: p.noncePriv,
+ session: p.lastSessionId ?? p.downloadSessionId,
+ token: p.claimToken,
+ };
+ });
if (result === undefined) {
throw Error("This purchase can't be shared");
@@ -2560,9 +2540,9 @@ async function processPurchaseDialogShared(
const paid = await checkIfOrderIsAlreadyPaid(ws, download.contractData);
if (paid) {
- const transitionInfo = await ws.db
- .mktx((x) => [x.purchases])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["purchases"],
+ async (tx) => {
const p = await tx.purchases.get(purchase.proposalId);
if (!p) {
logger.warn("purchase does not exist anymore");
@@ -2573,7 +2553,8 @@ async function processPurchaseDialogShared(
const newTxState = computePayMerchantTransactionState(p);
await tx.purchases.put(p);
return { oldTxState, newTxState };
- });
+ },
+ );
const transactionId = constructTransactionIdentifier({
tag: TransactionType.Payment,
proposalId,
@@ -2612,9 +2593,9 @@ async function processPurchaseAutoRefund(
),
)
) {
- const transitionInfo = await ws.db
- .mktx((x) => [x.purchases])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["purchases"],
+ async (tx) => {
const p = await tx.purchases.get(purchase.proposalId);
if (!p) {
logger.warn("purchase does not exist anymore");
@@ -2629,7 +2610,8 @@ async function processPurchaseAutoRefund(
const newTxState = computePayMerchantTransactionState(p);
await tx.purchases.put(p);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
return TaskRunResult.finished();
}
@@ -2656,9 +2638,9 @@ async function processPurchaseAutoRefund(
);
if (orderStatus.refund_pending) {
- const transitionInfo = await ws.db
- .mktx((x) => [x.purchases])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["purchases"],
+ async (tx) => {
const p = await tx.purchases.get(purchase.proposalId);
if (!p) {
logger.warn("purchase does not exist anymore");
@@ -2672,7 +2654,8 @@ async function processPurchaseAutoRefund(
const newTxState = computePayMerchantTransactionState(p);
await tx.purchases.put(p);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
}
@@ -2699,22 +2682,18 @@ async function processPurchaseAbortingRefund(
throw Error("can't abort, no coins selected");
}
- await ws.db
- .mktx((x) => [x.coins])
- .runReadOnly(async (tx) => {
- for (let i = 0; i < payCoinSelection.coinPubs.length; i++) {
- const coinPub = payCoinSelection.coinPubs[i];
- const coin = await tx.coins.get(coinPub);
- checkDbInvariant(!!coin, "expected coin to be present");
- abortingCoins.push({
- coin_pub: coinPub,
- contribution: Amounts.stringify(
- payCoinSelection.coinContributions[i],
- ),
- exchange_url: coin.exchangeBaseUrl,
- });
- }
- });
+ await ws.db.runReadOnlyTx(["coins"], async (tx) => {
+ for (let i = 0; i < payCoinSelection.coinPubs.length; i++) {
+ const coinPub = payCoinSelection.coinPubs[i];
+ const coin = await tx.coins.get(coinPub);
+ checkDbInvariant(!!coin, "expected coin to be present");
+ abortingCoins.push({
+ coin_pub: coinPub,
+ contribution: Amounts.stringify(payCoinSelection.coinContributions[i]),
+ exchange_url: coin.exchangeBaseUrl,
+ });
+ }
+ });
const abortReq: AbortRequest = {
h_contract: download.contractData.contractTermsHash,
@@ -2805,9 +2784,9 @@ async function processPurchaseQueryRefund(
});
if (!orderStatus.refund_pending) {
- const transitionInfo = await ws.db
- .mktx((x) => [x.purchases])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["purchases"],
+ async (tx) => {
const p = await tx.purchases.get(purchase.proposalId);
if (!p) {
logger.warn("purchase does not exist anymore");
@@ -2822,7 +2801,8 @@ async function processPurchaseQueryRefund(
const newTxState = computePayMerchantTransactionState(p);
await tx.purchases.put(p);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
return TaskRunResult.progress();
} else {
@@ -2831,9 +2811,9 @@ async function processPurchaseQueryRefund(
Amounts.parseOrThrow(orderStatus.refund_taken),
).amount;
- const transitionInfo = await ws.db
- .mktx((x) => [x.purchases])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["purchases"],
+ async (tx) => {
const p = await tx.purchases.get(purchase.proposalId);
if (!p) {
logger.warn("purchase does not exist anymore");
@@ -2848,7 +2828,8 @@ async function processPurchaseQueryRefund(
const newTxState = computePayMerchantTransactionState(p);
await tx.purchases.put(p);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
return TaskRunResult.progress();
}
@@ -2897,14 +2878,15 @@ export async function startRefundQueryForUri(
if (parsedUri.type !== TalerUriAction.Refund) {
throw Error("expected taler://refund URI");
}
- const purchaseRecord = await ws.db
- .mktx((x) => [x.purchases])
- .runReadOnly(async (tx) => {
+ const purchaseRecord = await ws.db.runReadOnlyTx(
+ ["purchases"],
+ async (tx) => {
return tx.purchases.indexes.byUrlAndOrderId.get([
parsedUri.merchantBaseUrl,
parsedUri.orderId,
]);
- });
+ },
+ );
if (!purchaseRecord) {
logger.error(
`no purchase for order ID "${parsedUri.orderId}" from merchant "${parsedUri.merchantBaseUrl}" when processing "${talerUri}"`,
@@ -2927,9 +2909,9 @@ export async function startQueryRefund(
proposalId: string,
): Promise<void> {
const ctx = new PayMerchantTransactionContext(ws, proposalId);
- const transitionInfo = await ws.db
- .mktx((x) => [x.purchases])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["purchases"],
+ async (tx) => {
const p = await tx.purchases.get(proposalId);
if (!p) {
logger.warn(`purchase ${proposalId} does not exist anymore`);
@@ -2943,14 +2925,15 @@ export async function startQueryRefund(
const newTxState = computePayMerchantTransactionState(p);
await tx.purchases.put(p);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, ctx.transactionId, transitionInfo);
ws.taskScheduler.startShepherdTask(ctx.taskId);
}
async function computeRefreshRequest(
ws: InternalWalletState,
- tx: WalletDbReadWriteTransactionArr<["coins", "denominations"]>,
+ tx: WalletDbReadWriteTransaction<["coins", "denominations"]>,
items: RefundItemRecord[],
): Promise<CoinRefreshRequest[]> {
const refreshCoins: CoinRefreshRequest[] = [];
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-common.ts b/packages/taler-wallet-core/src/operations/pay-peer-common.ts
index 88eedb530..ae6f98ccd 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-common.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-common.ts
@@ -30,11 +30,7 @@ import {
codecOptional,
} from "@gnu-taler/taler-util";
import { SpendCoinDetails } from "../crypto/cryptoImplementation.js";
-import {
- DenominationRecord,
- PeerPushPaymentCoinSelection,
- ReserveRecord,
-} from "../db.js";
+import { PeerPushPaymentCoinSelection, ReserveRecord } from "../db.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import type { SelectedPeerCoin } from "../util/coinSelection.js";
import { checkDbInvariant } from "../util/invariants.js";
@@ -51,33 +47,31 @@ export async function queryCoinInfosForSelection(
csel: PeerPushPaymentCoinSelection,
): Promise<SpendCoinDetails[]> {
let infos: SpendCoinDetails[] = [];
- await ws.db
- .mktx((x) => [x.coins, x.denominations])
- .runReadOnly(async (tx) => {
- for (let i = 0; i < csel.coinPubs.length; i++) {
- const coin = await tx.coins.get(csel.coinPubs[i]);
- if (!coin) {
- throw Error("coin not found anymore");
- }
- const denom = await ws.getDenomInfo(
- ws,
- tx,
- coin.exchangeBaseUrl,
- coin.denomPubHash,
- );
- if (!denom) {
- throw Error("denom for coin not found anymore");
- }
- infos.push({
- coinPriv: coin.coinPriv,
- coinPub: coin.coinPub,
- denomPubHash: coin.denomPubHash,
- denomSig: coin.denomSig,
- ageCommitmentProof: coin.ageCommitmentProof,
- contribution: csel.contributions[i],
- });
+ await ws.db.runReadOnlyTx(["coins", "denominations"], async (tx) => {
+ for (let i = 0; i < csel.coinPubs.length; i++) {
+ const coin = await tx.coins.get(csel.coinPubs[i]);
+ if (!coin) {
+ throw Error("coin not found anymore");
+ }
+ const denom = await ws.getDenomInfo(
+ ws,
+ tx,
+ coin.exchangeBaseUrl,
+ coin.denomPubHash,
+ );
+ if (!denom) {
+ throw Error("denom for coin not found anymore");
}
- });
+ infos.push({
+ coinPriv: coin.coinPriv,
+ coinPub: coin.coinPub,
+ denomPubHash: coin.denomPubHash,
+ denomSig: coin.denomSig,
+ ageCommitmentProof: coin.ageCommitmentProof,
+ contribution: csel.contributions[i],
+ });
+ }
+ });
return infos;
}
@@ -86,48 +80,46 @@ export async function getTotalPeerPaymentCost(
pcs: SelectedPeerCoin[],
): Promise<AmountJson> {
const currency = Amounts.currencyOf(pcs[0].contribution);
- return ws.db
- .mktx((x) => [x.coins, x.denominations])
- .runReadOnly(async (tx) => {
- const costs: AmountJson[] = [];
- for (let i = 0; i < pcs.length; i++) {
- const coin = await tx.coins.get(pcs[i].coinPub);
- if (!coin) {
- throw Error("can't calculate payment cost, coin not found");
- }
- const denomInfo = await ws.getDenomInfo(
- ws,
- tx,
- coin.exchangeBaseUrl,
- coin.denomPubHash,
- );
- if (!denomInfo) {
- throw Error(
- "can't calculate payment cost, denomination for coin not found",
- );
- }
- const allDenoms = await getCandidateWithdrawalDenomsTx(
- ws,
- tx,
- coin.exchangeBaseUrl,
- currency,
- );
- const amountLeft = Amounts.sub(
- denomInfo.value,
- pcs[i].contribution,
- ).amount;
- const refreshCost = getTotalRefreshCost(
- allDenoms,
- denomInfo,
- amountLeft,
- ws.config.testing.denomselAllowLate,
+ return ws.db.runReadOnlyTx(["coins", "denominations"], async (tx) => {
+ const costs: AmountJson[] = [];
+ for (let i = 0; i < pcs.length; i++) {
+ const coin = await tx.coins.get(pcs[i].coinPub);
+ if (!coin) {
+ throw Error("can't calculate payment cost, coin not found");
+ }
+ const denomInfo = await ws.getDenomInfo(
+ ws,
+ tx,
+ coin.exchangeBaseUrl,
+ coin.denomPubHash,
+ );
+ if (!denomInfo) {
+ throw Error(
+ "can't calculate payment cost, denomination for coin not found",
);
- costs.push(Amounts.parseOrThrow(pcs[i].contribution));
- costs.push(refreshCost);
}
- const zero = Amounts.zeroOfAmount(pcs[0].contribution);
- return Amounts.sum([zero, ...costs]).amount;
- });
+ const allDenoms = await getCandidateWithdrawalDenomsTx(
+ ws,
+ tx,
+ coin.exchangeBaseUrl,
+ currency,
+ );
+ const amountLeft = Amounts.sub(
+ denomInfo.value,
+ pcs[i].contribution,
+ ).amount;
+ const refreshCost = getTotalRefreshCost(
+ allDenoms,
+ denomInfo,
+ amountLeft,
+ ws.config.testing.denomselAllowLate,
+ );
+ costs.push(Amounts.parseOrThrow(pcs[i].contribution));
+ costs.push(refreshCost);
+ }
+ const zero = Amounts.zeroOfAmount(pcs[0].contribution);
+ return Amounts.sum([zero, ...costs]).amount;
+ });
}
interface ExchangePurseStatus {
@@ -153,9 +145,9 @@ export async function getMergeReserveInfo(
// due to the async crypto API.
const newReservePair = await ws.cryptoApi.createEddsaKeypair({});
- const mergeReserveRecord: ReserveRecord = await ws.db
- .mktx((x) => [x.exchanges, x.reserves, x.withdrawalGroups])
- .runReadWrite(async (tx) => {
+ const mergeReserveRecord: ReserveRecord = await ws.db.runReadWriteTx(
+ ["exchanges", "reserves"],
+ async (tx) => {
const ex = await tx.exchanges.get(req.exchangeBaseUrl);
checkDbInvariant(!!ex);
if (ex.currentMergeReserveRowId != null) {
@@ -173,7 +165,8 @@ export async function getMergeReserveInfo(
ex.currentMergeReserveRowId = reserve.rowId;
await tx.exchanges.put(ex);
return reserve;
- });
+ },
+ );
return mergeReserveRecord;
}
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
index cc41abde9..e97466084 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
@@ -108,9 +108,9 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
async deleteTransaction(): Promise<void> {
const { ws, pursePub } = this;
- await ws.db
- .mktx((x) => [x.withdrawalGroups, x.peerPullCredit, x.tombstones])
- .runReadWrite(async (tx) => {
+ await ws.db.runReadWriteTx(
+ ["withdrawalGroups", "peerPullCredit", "tombstones"],
+ async (tx) => {
const pullIni = await tx.peerPullCredit.get(pursePub);
if (!pullIni) {
return;
@@ -130,16 +130,17 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
await tx.tombstones.put({
id: TombstoneTag.DeletePeerPullCredit + ":" + pursePub,
});
- });
+ },
+ );
return;
}
async suspendTransaction(): Promise<void> {
const { ws, pursePub, retryTag, transactionId } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.peerPullCredit])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["peerPullCredit"],
+ async (tx) => {
const pullCreditRec = await tx.peerPullCredit.get(pursePub);
if (!pullCreditRec) {
logger.warn(`peer pull credit ${pursePub} not found`);
@@ -189,16 +190,17 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
};
}
return undefined;
- });
+ },
+ );
ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
}
async failTransaction(): Promise<void> {
const { ws, pursePub, retryTag, transactionId } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.peerPullCredit])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["peerPullCredit"],
+ async (tx) => {
const pullCreditRec = await tx.peerPullCredit.get(pursePub);
if (!pullCreditRec) {
logger.warn(`peer pull credit ${pursePub} not found`);
@@ -239,16 +241,17 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
};
}
return undefined;
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
ws.taskScheduler.stopShepherdTask(retryTag);
}
async resumeTransaction(): Promise<void> {
const { ws, pursePub, retryTag, transactionId } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.peerPullCredit])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["peerPullCredit"],
+ async (tx) => {
const pullCreditRec = await tx.peerPullCredit.get(pursePub);
if (!pullCreditRec) {
logger.warn(`peer pull credit ${pursePub} not found`);
@@ -297,16 +300,17 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
};
}
return undefined;
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
ws.taskScheduler.startShepherdTask(retryTag);
}
async abortTransaction(): Promise<void> {
const { ws, pursePub, retryTag, transactionId } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.peerPullCredit])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["peerPullCredit"],
+ async (tx) => {
const pullCreditRec = await tx.peerPullCredit.get(pursePub);
if (!pullCreditRec) {
logger.warn(`peer pull credit ${pursePub} not found`);
@@ -350,7 +354,8 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
};
}
return undefined;
- });
+ },
+ );
ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
ws.taskScheduler.startShepherdTask(retryTag);
@@ -382,9 +387,9 @@ async function queryPurseForPeerPullCredit(
switch (resp.status) {
case HttpStatusCode.Gone: {
// Exchange says that purse doesn't exist anymore => expired!
- const transitionInfo = await ws.db
- .mktx((x) => [x.peerPullCredit])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["peerPullCredit"],
+ async (tx) => {
const finPi = await tx.peerPullCredit.get(pullIni.pursePub);
if (!finPi) {
logger.warn("peerPullCredit not found anymore");
@@ -397,7 +402,8 @@ async function queryPurseForPeerPullCredit(
await tx.peerPullCredit.put(finPi);
const newTxState = computePeerPullCreditTransactionState(finPi);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
return TaskRunResult.backoff();
}
@@ -419,11 +425,9 @@ async function queryPurseForPeerPullCredit(
return TaskRunResult.backoff();
}
- const reserve = await ws.db
- .mktx((x) => [x.reserves])
- .runReadOnly(async (tx) => {
- return await tx.reserves.get(pullIni.mergeReserveRowId);
- });
+ const reserve = await ws.db.runReadOnlyTx(["reserves"], async (tx) => {
+ return await tx.reserves.get(pullIni.mergeReserveRowId);
+ });
if (!reserve) {
throw Error("reserve for peer pull credit not found in wallet DB");
@@ -443,9 +447,9 @@ async function queryPurseForPeerPullCredit(
pub: reserve.reservePub,
},
});
- const transitionInfo = await ws.db
- .mktx((x) => [x.peerPullCredit])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["peerPullCredit"],
+ async (tx) => {
const finPi = await tx.peerPullCredit.get(pullIni.pursePub);
if (!finPi) {
logger.warn("peerPullCredit not found anymore");
@@ -458,7 +462,8 @@ async function queryPurseForPeerPullCredit(
await tx.peerPullCredit.put(finPi);
const newTxState = computePeerPullCreditTransactionState(finPi);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
return TaskRunResult.backoff();
}
@@ -496,9 +501,9 @@ async function longpollKycStatus(
// remove after the exchange is fixed or clarified
kycStatusRes.status === HttpStatusCode.NoContent
) {
- const transitionInfo = await ws.db
- .mktx((x) => [x.peerPullCredit])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["peerPullCredit"],
+ async (tx) => {
const peerIni = await tx.peerPullCredit.get(pursePub);
if (!peerIni) {
return;
@@ -513,7 +518,8 @@ async function longpollKycStatus(
const newTxState = computePeerPullCreditTransactionState(peerIni);
await tx.peerPullCredit.put(peerIni);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
} else if (kycStatusRes.status === HttpStatusCode.Accepted) {
// FIXME: Do we have to update the URL here?
@@ -545,15 +551,15 @@ async function processPeerPullCreditAbortingDeletePurse(
});
logger.info(`deleted purse with response status ${resp.status}`);
- const transitionInfo = await ws.db
- .mktx((x) => [
- x.peerPullCredit,
- x.refreshGroups,
- x.denominations,
- x.coinAvailability,
- x.coins,
- ])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ [
+ "peerPullCredit",
+ "refreshGroups",
+ "denominations",
+ "coinAvailability",
+ "coins",
+ ],
+ async (tx) => {
const ppiRec = await tx.peerPullCredit.get(pursePub);
if (!ppiRec) {
return undefined;
@@ -569,7 +575,8 @@ async function processPeerPullCreditAbortingDeletePurse(
oldTxState,
newTxState,
};
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
return TaskRunResult.backoff();
@@ -588,9 +595,9 @@ async function handlePeerPullCreditWithdrawing(
});
const wgId = pullIni.withdrawalGroupId;
let finished: boolean = false;
- const transitionInfo = await ws.db
- .mktx((x) => [x.peerPullCredit, x.withdrawalGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["peerPullCredit", "withdrawalGroups"],
+ async (tx) => {
const ppi = await tx.peerPullCredit.get(pullIni.pursePub);
if (!ppi) {
finished = true;
@@ -619,7 +626,8 @@ async function handlePeerPullCreditWithdrawing(
oldTxState,
newTxState,
};
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
if (finished) {
return TaskRunResult.finished();
@@ -635,21 +643,20 @@ async function handlePeerPullCreditCreatePurse(
): Promise<TaskRunResult> {
const purseFee = Amounts.stringify(Amounts.zeroOfAmount(pullIni.amount));
const pursePub = pullIni.pursePub;
- const mergeReserve = await ws.db
- .mktx((x) => [x.reserves])
- .runReadOnly(async (tx) => {
- return tx.reserves.get(pullIni.mergeReserveRowId);
- });
+ const mergeReserve = await ws.db.runReadOnlyTx(["reserves"], async (tx) => {
+ return tx.reserves.get(pullIni.mergeReserveRowId);
+ });
if (!mergeReserve) {
throw Error("merge reserve for peer pull payment not found in database");
}
- const contractTermsRecord = await ws.db
- .mktx((x) => [x.contractTerms])
- .runReadOnly(async (tx) => {
+ const contractTermsRecord = await ws.db.runReadOnlyTx(
+ ["contractTerms"],
+ async (tx) => {
return tx.contractTerms.get(pullIni.contractTermsHash);
- });
+ },
+ );
if (!contractTermsRecord) {
throw Error("contract terms for peer pull payment not found in database");
@@ -731,9 +738,9 @@ async function handlePeerPullCreditCreatePurse(
pursePub: pullIni.pursePub,
});
- const transitionInfo = await ws.db
- .mktx((x) => [x.peerPullCredit])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["peerPullCredit"],
+ async (tx) => {
const pi2 = await tx.peerPullCredit.get(pursePub);
if (!pi2) {
return;
@@ -743,7 +750,8 @@ async function handlePeerPullCreditCreatePurse(
await tx.peerPullCredit.put(pi2);
const newTxState = computePeerPullCreditTransactionState(pi2);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
return TaskRunResult.backoff();
}
@@ -753,11 +761,9 @@ export async function processPeerPullCredit(
pursePub: string,
cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
- const pullIni = await ws.db
- .mktx((x) => [x.peerPullCredit])
- .runReadOnly(async (tx) => {
- return tx.peerPullCredit.get(pursePub);
- });
+ const pullIni = await ws.db.runReadOnlyTx(["peerPullCredit"], async (tx) => {
+ return tx.peerPullCredit.get(pursePub);
+ });
if (!pullIni) {
throw Error("peer pull payment initiation not found in database");
}
@@ -843,9 +849,9 @@ async function processPeerPullCreditKycRequired(
} else if (kycStatusRes.status === HttpStatusCode.Accepted) {
const kycStatus = await kycStatusRes.json();
logger.info(`kyc status: ${j2s(kycStatus)}`);
- const { transitionInfo, result } = await ws.db
- .mktx((x) => [x.peerPullCredit])
- .runReadWrite(async (tx) => {
+ const { transitionInfo, result } = await ws.db.runReadWriteTx(
+ ["peerPullCredit"],
+ async (tx) => {
const peerInc = await tx.peerPullCredit.get(pursePub);
if (!peerInc) {
return {
@@ -877,7 +883,8 @@ async function processPeerPullCreditKycRequired(
transitionInfo: { oldTxState, newTxState },
result: res,
};
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
return TaskRunResult.backoff();
} else {
@@ -943,44 +950,42 @@ async function getPreferredExchangeForCurrency(
): Promise<string | undefined> {
// Find an exchange with the matching currency.
// Prefer exchanges with the most recent withdrawal.
- const url = await ws.db
- .mktx((x) => [x.exchanges])
- .runReadOnly(async (tx) => {
- const exchanges = await tx.exchanges.iter().toArray();
- let candidate = undefined;
- for (const e of exchanges) {
- if (e.detailsPointer?.currency !== currency) {
- continue;
- }
- if (!candidate) {
+ const url = await ws.db.runReadOnlyTx(["exchanges"], async (tx) => {
+ const exchanges = await tx.exchanges.iter().toArray();
+ let candidate = undefined;
+ for (const e of exchanges) {
+ if (e.detailsPointer?.currency !== currency) {
+ continue;
+ }
+ if (!candidate) {
+ candidate = e;
+ continue;
+ }
+ if (candidate.lastWithdrawal && !e.lastWithdrawal) {
+ continue;
+ }
+ const exchangeLastWithdrawal = timestampOptionalPreciseFromDb(
+ e.lastWithdrawal,
+ );
+ const candidateLastWithdrawal = timestampOptionalPreciseFromDb(
+ candidate.lastWithdrawal,
+ );
+ if (exchangeLastWithdrawal && candidateLastWithdrawal) {
+ if (
+ AbsoluteTime.cmp(
+ AbsoluteTime.fromPreciseTimestamp(exchangeLastWithdrawal),
+ AbsoluteTime.fromPreciseTimestamp(candidateLastWithdrawal),
+ ) > 0
+ ) {
candidate = e;
- continue;
- }
- if (candidate.lastWithdrawal && !e.lastWithdrawal) {
- continue;
- }
- const exchangeLastWithdrawal = timestampOptionalPreciseFromDb(
- e.lastWithdrawal,
- );
- const candidateLastWithdrawal = timestampOptionalPreciseFromDb(
- candidate.lastWithdrawal,
- );
- if (exchangeLastWithdrawal && candidateLastWithdrawal) {
- if (
- AbsoluteTime.cmp(
- AbsoluteTime.fromPreciseTimestamp(exchangeLastWithdrawal),
- AbsoluteTime.fromPreciseTimestamp(candidateLastWithdrawal),
- ) > 0
- ) {
- candidate = e;
- }
}
}
- if (candidate) {
- return candidate.baseUrl;
- }
- return undefined;
- });
+ }
+ if (candidate) {
+ return candidate.baseUrl;
+ }
+ return undefined;
+ });
return url;
}
@@ -1036,9 +1041,9 @@ export async function initiatePeerPullPayment(
const mergeTimestamp = TalerPreciseTimestamp.now();
- const transitionInfo = await ws.db
- .mktx((x) => [x.peerPullCredit, x.contractTerms])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["peerPullCredit", "contractTerms"],
+ async (tx) => {
const ppi: PeerPullCreditRecord = {
amount: req.partialContractTerms.amount,
contractTermsHash: hContractTerms,
@@ -1066,7 +1071,8 @@ export async function initiatePeerPullPayment(
h: hContractTerms,
});
return { oldTxState, newTxState };
- });
+ },
+ );
const ctx = new PeerPullCreditTransactionContext(ws, pursePair.pub);
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts
index e5ae6b73b..1504f3d83 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts
@@ -63,7 +63,7 @@ import {
readTalerErrorResponse,
} from "@gnu-taler/taler-util/http";
import {
- DbReadWriteTransactionArr,
+ DbReadWriteTransaction,
InternalWalletState,
PeerPullDebitRecordStatus,
PeerPullPaymentIncomingRecord,
@@ -125,15 +125,13 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
const transactionId = this.transactionId;
const ws = this.ws;
const peerPullDebitId = this.peerPullDebitId;
- await ws.db
- .mktx((x) => [x.peerPullDebit, x.tombstones])
- .runReadWrite(async (tx) => {
- const debit = await tx.peerPullDebit.get(peerPullDebitId);
- if (debit) {
- await tx.peerPullDebit.delete(peerPullDebitId);
- await tx.tombstones.put({ id: transactionId });
- }
- });
+ await ws.db.runReadWriteTx(["peerPullDebit", "tombstones"], async (tx) => {
+ const debit = await tx.peerPullDebit.get(peerPullDebitId);
+ if (debit) {
+ await tx.peerPullDebit.delete(peerPullDebitId);
+ await tx.tombstones.put({ id: transactionId });
+ }
+ });
}
async suspendTransaction(): Promise<void> {
@@ -141,9 +139,9 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
const transactionId = this.transactionId;
const ws = this.ws;
const peerPullDebitId = this.peerPullDebitId;
- const transitionInfo = await ws.db
- .mktx((x) => [x.peerPullDebit])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["peerPullDebit"],
+ async (tx) => {
const pullDebitRec = await tx.peerPullDebit.get(peerPullDebitId);
if (!pullDebitRec) {
logger.warn(`peer pull debit ${peerPullDebitId} not found`);
@@ -183,7 +181,8 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
};
}
return undefined;
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
ws.taskScheduler.stopShepherdTask(taskId);
}
@@ -295,7 +294,7 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
opts: { extraStores: StoreNameArray },
f: (
rec: PeerPullPaymentIncomingRecord,
- tx: DbReadWriteTransactionArr<
+ tx: DbReadWriteTransaction<
typeof WalletStoresV1,
["peerPullDebit", ...StoreNameArray]
>,
@@ -498,9 +497,9 @@ async function processPeerPullDebitAbortingRefresh(
tag: TransactionType.PeerPullDebit,
peerPullDebitId,
});
- const transitionInfo = await ws.db
- .mktx((x) => [x.refreshGroups, x.peerPullDebit])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["peerPullDebit", "refreshGroups"],
+ async (tx) => {
const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId);
let newOpState: PeerPullDebitRecordStatus | undefined;
if (!refreshGroup) {
@@ -529,7 +528,8 @@ async function processPeerPullDebitAbortingRefresh(
return { oldTxState, newTxState };
}
return undefined;
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
// FIXME: Shouldn't this be finished in some cases?!
return TaskRunResult.backoff();
@@ -539,11 +539,12 @@ export async function processPeerPullDebit(
ws: InternalWalletState,
peerPullDebitId: string,
): Promise<TaskRunResult> {
- const peerPullInc = await ws.db
- .mktx((x) => [x.peerPullDebit])
- .runReadOnly(async (tx) => {
+ const peerPullInc = await ws.db.runReadOnlyTx(
+ ["peerPullDebit"],
+ async (tx) => {
return tx.peerPullDebit.get(peerPullDebitId);
- });
+ },
+ );
if (!peerPullInc) {
throw Error("peer pull debit not found");
}
@@ -575,11 +576,12 @@ export async function confirmPeerPullDebit(
throw Error("invalid request, transactionId or peerPullDebitId required");
}
- const peerPullInc = await ws.db
- .mktx((x) => [x.peerPullDebit])
- .runReadOnly(async (tx) => {
+ const peerPullInc = await ws.db.runReadOnlyTx(
+ ["peerPullDebit"],
+ async (tx) => {
return tx.peerPullDebit.get(peerPullDebitId);
- });
+ },
+ );
if (!peerPullInc) {
throw Error(
@@ -610,16 +612,16 @@ export async function confirmPeerPullDebit(
coinSelRes.result.coins,
);
- await ws.db
- .mktx((x) => [
- x.exchanges,
- x.coins,
- x.denominations,
- x.refreshGroups,
- x.peerPullDebit,
- x.coinAvailability,
- ])
- .runReadWrite(async (tx) => {
+ await ws.db.runReadWriteTx(
+ [
+ "exchanges",
+ "coins",
+ "denominations",
+ "refreshGroups",
+ "peerPullDebit",
+ "coinAvailability",
+ ],
+ async (tx) => {
await spendCoins(ws, tx, {
// allocationId: `txn:peer-pull-debit:${req.peerPullDebitId}`,
allocationId: constructTransactionIdentifier({
@@ -646,7 +648,8 @@ export async function confirmPeerPullDebit(
};
}
await tx.peerPullDebit.put(pi);
- });
+ },
+ );
const ctx = new PeerPullDebitTransactionContext(ws, peerPullDebitId);
@@ -678,9 +681,9 @@ export async function preparePeerPullDebit(
throw Error("got invalid taler://pay-pull URI");
}
- const existing = await ws.db
- .mktx((x) => [x.peerPullDebit, x.contractTerms])
- .runReadOnly(async (tx) => {
+ const existing = await ws.db.runReadOnlyTx(
+ ["peerPullDebit", "contractTerms"],
+ async (tx) => {
const peerPullDebitRecord =
await tx.peerPullDebit.indexes.byExchangeAndContractPriv.get([
uri.exchangeBaseUrl,
@@ -696,7 +699,8 @@ export async function preparePeerPullDebit(
return;
}
return { peerPullDebitRecord, contractTerms };
- });
+ },
+ );
if (existing) {
return {
@@ -780,25 +784,23 @@ export async function preparePeerPullDebit(
coinSelRes.result.coins,
);
- await ws.db
- .mktx((x) => [x.peerPullDebit, x.contractTerms])
- .runReadWrite(async (tx) => {
- await tx.contractTerms.put({
- h: contractTermsHash,
- contractTermsRaw: contractTerms,
- }),
- await tx.peerPullDebit.add({
- peerPullDebitId,
- contractPriv: contractPriv,
- exchangeBaseUrl: exchangeBaseUrl,
- pursePub: pursePub,
- timestampCreated: timestampPreciseToDb(TalerPreciseTimestamp.now()),
- contractTermsHash,
- amount: contractTerms.amount,
- status: PeerPullDebitRecordStatus.DialogProposed,
- totalCostEstimated: Amounts.stringify(totalAmount),
- });
- });
+ await ws.db.runReadWriteTx(["peerPullDebit", "contractTerms"], async (tx) => {
+ await tx.contractTerms.put({
+ h: contractTermsHash,
+ contractTermsRaw: contractTerms,
+ }),
+ await tx.peerPullDebit.add({
+ peerPullDebitId,
+ contractPriv: contractPriv,
+ exchangeBaseUrl: exchangeBaseUrl,
+ pursePub: pursePub,
+ timestampCreated: timestampPreciseToDb(TalerPreciseTimestamp.now()),
+ contractTermsHash,
+ amount: contractTerms.amount,
+ status: PeerPullDebitRecordStatus.DialogProposed,
+ totalCostEstimated: Amounts.stringify(totalAmount),
+ });
+ });
return {
amount: contractTerms.amount,
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts b/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts
index 23976f11b..412631356 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts
@@ -363,9 +363,9 @@ export async function preparePeerPushCredit(
throw Error("got invalid taler://pay-push URI");
}
- const existing = await ws.db
- .mktx((x) => [x.contractTerms, x.peerPushCredit])
- .runReadOnly(async (tx) => {
+ const existing = await ws.db.runReadOnlyTx(
+ ["contractTerms", "peerPushCredit"],
+ async (tx) => {
const existingPushInc =
await tx.peerPushCredit.indexes.byExchangeAndContractPriv.get([
uri.exchangeBaseUrl,
@@ -386,7 +386,8 @@ export async function preparePeerPushCredit(
existingContractTermsRec.contractTermsRaw,
);
return { existingPushInc, existingContractTerms };
- });
+ },
+ );
if (existing) {
return {
@@ -457,9 +458,9 @@ export async function preparePeerPushCredit(
undefined,
);
- const transitionInfo = await ws.db
- .mktx((x) => [x.contractTerms, x.peerPushCredit])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["contractTerms", "peerPushCredit"],
+ async (tx) => {
const rec: PeerPushPaymentIncomingRecord = {
peerPushCreditId,
contractPriv: contractPriv,
@@ -489,7 +490,8 @@ export async function preparePeerPushCredit(
},
newTxState,
} satisfies TransitionInfo;
- });
+ },
+ );
const transactionId = constructTransactionIdentifier({
tag: TransactionType.PeerPushCredit,
@@ -542,9 +544,9 @@ async function longpollKycStatus(
// remove after the exchange is fixed or clarified
kycStatusRes.status === HttpStatusCode.NoContent
) {
- const transitionInfo = await ws.db
- .mktx((x) => [x.peerPushCredit])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["peerPushCredit"],
+ async (tx) => {
const peerInc = await tx.peerPushCredit.get(peerPushCreditId);
if (!peerInc) {
return;
@@ -557,7 +559,8 @@ async function longpollKycStatus(
const newTxState = computePeerPushCreditTransactionState(peerInc);
await tx.peerPushCredit.put(peerInc);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
} else if (kycStatusRes.status === HttpStatusCode.Accepted) {
// FIXME: Do we have to update the URL here?
@@ -600,9 +603,9 @@ async function processPeerPushCreditKycRequired(
} else if (kycStatusRes.status === HttpStatusCode.Accepted) {
const kycStatus = await kycStatusRes.json();
logger.info(`kyc status: ${j2s(kycStatus)}`);
- const { transitionInfo, result } = await ws.db
- .mktx((x) => [x.peerPushCredit])
- .runReadWrite(async (tx) => {
+ const { transitionInfo, result } = await ws.db.runReadWriteTx(
+ ["peerPushCredit"],
+ async (tx) => {
const peerInc = await tx.peerPushCredit.get(peerPushCreditId);
if (!peerInc) {
return {
@@ -634,7 +637,8 @@ async function processPeerPushCreditKycRequired(
transitionInfo: { oldTxState, newTxState },
result: res,
};
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
return result;
} else {
@@ -724,16 +728,16 @@ async function handlePendingMerge(
},
});
- const txRes = await ws.db
- .mktx((x) => [
- x.contractTerms,
- x.peerPushCredit,
- x.withdrawalGroups,
- x.reserves,
- x.exchanges,
- x.exchangeDetails,
- ])
- .runReadWrite(async (tx) => {
+ const txRes = await ws.db.runReadWriteTx(
+ [
+ "contractTerms",
+ "peerPushCredit",
+ "withdrawalGroups",
+ "reserves",
+ "exchanges",
+ "exchangeDetails",
+ ],
+ async (tx) => {
const peerInc = await tx.peerPushCredit.get(peerPushCreditId);
if (!peerInc) {
return undefined;
@@ -761,7 +765,8 @@ async function handlePendingMerge(
peerPushCreditTransition: { oldTxState, newTxState },
wgCreateRes,
};
- });
+ },
+ );
// Transaction was committed, now we can emit notifications.
if (txRes?.wgCreateRes?.exchangeNotif) {
ws.notify(txRes.wgCreateRes.exchangeNotif);
@@ -789,9 +794,9 @@ async function handlePendingWithdrawing(
});
const wgId = peerInc.withdrawalGroupId;
let finished: boolean = false;
- const transitionInfo = await ws.db
- .mktx((x) => [x.peerPushCredit, x.withdrawalGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["peerPushCredit", "withdrawalGroups"],
+ async (tx) => {
const ppi = await tx.peerPushCredit.get(peerInc.peerPushCreditId);
if (!ppi) {
finished = true;
@@ -820,7 +825,8 @@ async function handlePendingWithdrawing(
oldTxState,
newTxState,
};
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
if (finished) {
return TaskRunResult.finished();
@@ -837,9 +843,9 @@ export async function processPeerPushCredit(
): Promise<TaskRunResult> {
let peerInc: PeerPushPaymentIncomingRecord | undefined;
let contractTerms: PeerContractTerms | undefined;
- await ws.db
- .mktx((x) => [x.contractTerms, x.peerPushCredit])
- .runReadWrite(async (tx) => {
+ await ws.db.runReadWriteTx(
+ ["contractTerms", "peerPushCredit"],
+ async (tx) => {
peerInc = await tx.peerPushCredit.get(peerPushCreditId);
if (!peerInc) {
return;
@@ -849,7 +855,8 @@ export async function processPeerPushCredit(
contractTerms = ctRec.contractTermsRaw;
}
await tx.peerPushCredit.put(peerInc);
- });
+ },
+ );
if (!peerInc) {
throw Error(
@@ -910,9 +917,9 @@ export async function confirmPeerPushCredit(
throw Error("no transaction ID (or deprecated peerPushCreditId) provided");
}
- await ws.db
- .mktx((x) => [x.contractTerms, x.peerPushCredit])
- .runReadWrite(async (tx) => {
+ await ws.db.runReadWriteTx(
+ ["contractTerms", "peerPushCredit"],
+ async (tx) => {
peerInc = await tx.peerPushCredit.get(peerPushCreditId);
if (!peerInc) {
return;
@@ -921,7 +928,8 @@ export async function confirmPeerPushCredit(
peerInc.status = PeerPushCreditStatus.PendingMerge;
}
await tx.peerPushCredit.put(peerInc);
- });
+ },
+ );
if (!peerInc) {
throw Error(
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts
index 165c8deee..91c5430be 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts
@@ -101,22 +101,20 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
async deleteTransaction(): Promise<void> {
const { ws, pursePub, transactionId } = this;
- await ws.db
- .mktx((x) => [x.peerPushDebit, x.tombstones])
- .runReadWrite(async (tx) => {
- const debit = await tx.peerPushDebit.get(pursePub);
- if (debit) {
- await tx.peerPushDebit.delete(pursePub);
- await tx.tombstones.put({ id: transactionId });
- }
- });
+ await ws.db.runReadWriteTx(["peerPushDebit", "tombstones"], async (tx) => {
+ const debit = await tx.peerPushDebit.get(pursePub);
+ if (debit) {
+ await tx.peerPushDebit.delete(pursePub);
+ await tx.tombstones.put({ id: transactionId });
+ }
+ });
}
async suspendTransaction(): Promise<void> {
const { ws, pursePub, transactionId, retryTag } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.peerPushDebit])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["peerPushDebit"],
+ async (tx) => {
const pushDebitRec = await tx.peerPushDebit.get(pursePub);
if (!pushDebitRec) {
logger.warn(`peer push debit ${pursePub} not found`);
@@ -164,16 +162,17 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
};
}
return undefined;
- });
+ },
+ );
ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
}
async abortTransaction(): Promise<void> {
const { ws, pursePub, transactionId, retryTag } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.peerPushDebit])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["peerPushDebit"],
+ async (tx) => {
const pushDebitRec = await tx.peerPushDebit.get(pursePub);
if (!pushDebitRec) {
logger.warn(`peer push debit ${pursePub} not found`);
@@ -216,7 +215,8 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
};
}
return undefined;
- });
+ },
+ );
ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
ws.taskScheduler.startShepherdTask(retryTag);
@@ -224,9 +224,9 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
async resumeTransaction(): Promise<void> {
const { ws, pursePub, transactionId, retryTag } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.peerPushDebit])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["peerPushDebit"],
+ async (tx) => {
const pushDebitRec = await tx.peerPushDebit.get(pursePub);
if (!pushDebitRec) {
logger.warn(`peer push debit ${pursePub} not found`);
@@ -274,16 +274,17 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
};
}
return undefined;
- });
+ },
+ );
ws.taskScheduler.startShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
}
async failTransaction(): Promise<void> {
const { ws, pursePub, transactionId, retryTag } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.peerPushDebit])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["peerPushDebit"],
+ async (tx) => {
const pushDebitRec = await tx.peerPushDebit.get(pursePub);
if (!pushDebitRec) {
logger.warn(`peer push debit ${pursePub} not found`);
@@ -326,7 +327,8 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
};
}
return undefined;
- });
+ },
+ );
ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
ws.taskScheduler.startShepherdTask(retryTag);
@@ -411,28 +413,26 @@ async function handlePurseCreationConflict(
);
}
- await ws.db
- .mktx((x) => [x.peerPushDebit])
- .runReadWrite(async (tx) => {
- const myPpi = await tx.peerPushDebit.get(peerPushInitiation.pursePub);
- if (!myPpi) {
- return;
- }
- switch (myPpi.status) {
- case PeerPushDebitStatus.PendingCreatePurse:
- case PeerPushDebitStatus.SuspendedCreatePurse: {
- const sel = coinSelRes.result;
- myPpi.coinSel = {
- coinPubs: sel.coins.map((x) => x.coinPub),
- contributions: sel.coins.map((x) => x.contribution),
- };
- break;
- }
- default:
- return;
+ await ws.db.runReadWriteTx(["peerPushDebit"], async (tx) => {
+ const myPpi = await tx.peerPushDebit.get(peerPushInitiation.pursePub);
+ if (!myPpi) {
+ return;
+ }
+ switch (myPpi.status) {
+ case PeerPushDebitStatus.PendingCreatePurse:
+ case PeerPushDebitStatus.SuspendedCreatePurse: {
+ const sel = coinSelRes.result;
+ myPpi.coinSel = {
+ coinPubs: sel.coins.map((x) => x.coinPub),
+ contributions: sel.coins.map((x) => x.contribution),
+ };
+ break;
}
- await tx.peerPushDebit.put(myPpi);
- });
+ default:
+ return;
+ }
+ await tx.peerPushDebit.put(myPpi);
+ });
return TaskRunResult.progress();
}
@@ -448,11 +448,12 @@ async function processPeerPushDebitCreateReserve(
logger.trace(`processing ${transactionId} pending(create-reserve)`);
- const contractTermsRecord = await ws.db
- .mktx((x) => [x.contractTerms])
- .runReadOnly(async (tx) => {
+ const contractTermsRecord = await ws.db.runReadOnlyTx(
+ ["contractTerms"],
+ async (tx) => {
return tx.contractTerms.get(hContractTerms);
- });
+ },
+ );
if (!contractTermsRecord) {
throw Error(
@@ -583,15 +584,15 @@ async function processPeerPushDebitAbortingDeletePurse(
});
logger.info(`deleted purse with response status ${resp.status}`);
- const transitionInfo = await ws.db
- .mktx((x) => [
- x.peerPushDebit,
- x.refreshGroups,
- x.denominations,
- x.coinAvailability,
- x.coins,
- ])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ [
+ "peerPushDebit",
+ "refreshGroups",
+ "denominations",
+ "coinAvailability",
+ "coins",
+ ],
+ async (tx) => {
const ppiRec = await tx.peerPushDebit.get(pursePub);
if (!ppiRec) {
return undefined;
@@ -626,7 +627,8 @@ async function processPeerPushDebitAbortingDeletePurse(
oldTxState,
newTxState,
};
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
return TaskRunResult.backoff();
@@ -646,9 +648,9 @@ async function transitionPeerPushDebitTransaction(
tag: TransactionType.PeerPushDebit,
pursePub,
});
- const transitionInfo = await ws.db
- .mktx((x) => [x.peerPushDebit])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["peerPushDebit"],
+ async (tx) => {
const ppiRec = await tx.peerPushDebit.get(pursePub);
if (!ppiRec) {
return undefined;
@@ -664,7 +666,8 @@ async function transitionPeerPushDebitTransaction(
oldTxState,
newTxState,
};
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
}
@@ -679,9 +682,9 @@ async function processPeerPushDebitAbortingRefreshDeleted(
tag: TransactionType.PeerPushDebit,
pursePub: peerPushInitiation.pursePub,
});
- const transitionInfo = await ws.db
- .mktx((x) => [x.refreshGroups, x.peerPushDebit])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["refreshGroups", "peerPushDebit"],
+ async (tx) => {
const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId);
let newOpState: PeerPushDebitStatus | undefined;
if (!refreshGroup) {
@@ -710,7 +713,8 @@ async function processPeerPushDebitAbortingRefreshDeleted(
return { oldTxState, newTxState };
}
return undefined;
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
// FIXME: Shouldn't this be finished in some cases?!
return TaskRunResult.backoff();
@@ -727,9 +731,9 @@ async function processPeerPushDebitAbortingRefreshExpired(
tag: TransactionType.PeerPushDebit,
pursePub: peerPushInitiation.pursePub,
});
- const transitionInfo = await ws.db
- .mktx((x) => [x.refreshGroups, x.peerPushDebit])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["peerPushDebit", "refreshGroups"],
+ async (tx) => {
const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId);
let newOpState: PeerPushDebitStatus | undefined;
if (!refreshGroup) {
@@ -758,7 +762,8 @@ async function processPeerPushDebitAbortingRefreshExpired(
return { oldTxState, newTxState };
}
return undefined;
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
// FIXME: Shouldn't this be finished in some cases?!
return TaskRunResult.backoff();
@@ -810,15 +815,15 @@ async function processPeerPushDebitReady(
}
} else if (resp.status === HttpStatusCode.Gone) {
logger.info(`purse ${pursePub} is gone, aborting peer-push-debit`);
- const transitionInfo = await ws.db
- .mktx((x) => [
- x.peerPushDebit,
- x.refreshGroups,
- x.denominations,
- x.coinAvailability,
- x.coins,
- ])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ [
+ "peerPushDebit",
+ "refreshGroups",
+ "denominations",
+ "coinAvailability",
+ "coins",
+ ],
+ async (tx) => {
const ppiRec = await tx.peerPushDebit.get(pursePub);
if (!ppiRec) {
return undefined;
@@ -853,7 +858,8 @@ async function processPeerPushDebitReady(
oldTxState,
newTxState,
};
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
return TaskRunResult.backoff();
} else {
@@ -867,11 +873,12 @@ export async function processPeerPushDebit(
pursePub: string,
cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
- const peerPushInitiation = await ws.db
- .mktx((x) => [x.peerPushDebit])
- .runReadOnly(async (tx) => {
+ const peerPushInitiation = await ws.db.runReadOnlyTx(
+ ["peerPushDebit"],
+ async (tx) => {
return tx.peerPushDebit.get(pursePub);
- });
+ },
+ );
if (!peerPushInitiation) {
throw Error("peer push payment not found");
}
@@ -953,17 +960,17 @@ export async function initiatePeerPushDebit(
const contractEncNonce = encodeCrock(getRandomBytes(24));
- const transitionInfo = await ws.db
- .mktx((x) => [
- x.exchanges,
- x.contractTerms,
- x.coins,
- x.coinAvailability,
- x.denominations,
- x.refreshGroups,
- x.peerPushDebit,
- ])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ [
+ "exchanges",
+ "contractTerms",
+ "coins",
+ "coinAvailability",
+ "denominations",
+ "refreshGroups",
+ "peerPushDebit",
+ ],
+ async (tx) => {
// FIXME: Instead of directly doing a spendCoin here,
// we might want to mark the coins as used and spend them
// after we've been able to create the purse.
@@ -1012,7 +1019,8 @@ export async function initiatePeerPushDebit(
oldTxState: { major: TransactionMajorState.None },
newTxState,
};
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
ws.notify({
type: NotificationType.BalanceChange,
diff --git a/packages/taler-wallet-core/src/operations/recoup.ts b/packages/taler-wallet-core/src/operations/recoup.ts
index 2dd88b614..b88115d8e 100644
--- a/packages/taler-wallet-core/src/operations/recoup.ts
+++ b/packages/taler-wallet-core/src/operations/recoup.ts
@@ -45,7 +45,7 @@ import {
RecoupGroupRecord,
RecoupOperationStatus,
RefreshCoinSource,
- WalletStoresV1,
+ WalletDbReadWriteTransaction,
WithdrawCoinSource,
WithdrawalGroupStatus,
WithdrawalRecordType,
@@ -54,7 +54,6 @@ import {
import { InternalWalletState } from "../internal-wallet-state.js";
import { PendingTaskType } from "../pending-types.js";
import { checkDbInvariant } from "../util/invariants.js";
-import { GetReadWriteAccess } from "../util/query.js";
import {
TaskRunResult,
TransactionContext,
@@ -72,12 +71,9 @@ const logger = new Logger("operations/recoup.ts");
*/
async function putGroupAsFinished(
ws: InternalWalletState,
- tx: GetReadWriteAccess<{
- recoupGroups: typeof WalletStoresV1.recoupGroups;
- denominations: typeof WalletStoresV1.denominations;
- refreshGroups: typeof WalletStoresV1.refreshGroups;
- coins: typeof WalletStoresV1.coins;
- }>,
+ tx: WalletDbReadWriteTransaction<
+ ["recoupGroups", "denominations", "refreshGroups", "coins"]
+ >,
recoupGroup: RecoupGroupRecord,
coinIdx: number,
): Promise<void> {
@@ -100,14 +96,9 @@ async function recoupRewardCoin(
// We can't really recoup a coin we got via tipping.
// Thus we just put the coin to sleep.
// FIXME: somehow report this to the user
- await ws.db
- .mktx((stores) => [
- stores.recoupGroups,
- stores.denominations,
- stores.refreshGroups,
- stores.coins,
- ])
- .runReadWrite(async (tx) => {
+ await ws.db.runReadWriteTx(
+ ["recoupGroups", "denominations", "refreshGroups", "coins"],
+ async (tx) => {
const recoupGroup = await tx.recoupGroups.get(recoupGroupId);
if (!recoupGroup) {
return;
@@ -116,7 +107,8 @@ async function recoupRewardCoin(
return;
}
await putGroupAsFinished(ws, tx, recoupGroup, coinIdx);
- });
+ },
+ );
}
async function recoupWithdrawCoin(
@@ -127,17 +119,15 @@ async function recoupWithdrawCoin(
cs: WithdrawCoinSource,
): Promise<void> {
const reservePub = cs.reservePub;
- const denomInfo = await ws.db
- .mktx((x) => [x.denominations])
- .runReadOnly(async (tx) => {
- const denomInfo = await ws.getDenomInfo(
- ws,
- tx,
- coin.exchangeBaseUrl,
- coin.denomPubHash,
- );
- return denomInfo;
- });
+ const denomInfo = await ws.db.runReadOnlyTx(["denominations"], async (tx) => {
+ const denomInfo = await ws.getDenomInfo(
+ ws,
+ tx,
+ coin.exchangeBaseUrl,
+ coin.denomPubHash,
+ );
+ return denomInfo;
+ });
if (!denomInfo) {
// FIXME: We should at least emit some pending operation / warning for this?
return;
@@ -170,9 +160,9 @@ async function recoupWithdrawCoin(
// FIXME: verify that our expectations about the amount match
- await ws.db
- .mktx((x) => [x.coins, x.denominations, x.recoupGroups, x.refreshGroups])
- .runReadWrite(async (tx) => {
+ await ws.db.runReadWriteTx(
+ ["coins", "denominations", "recoupGroups", "refreshGroups"],
+ async (tx) => {
const recoupGroup = await tx.recoupGroups.get(recoupGroupId);
if (!recoupGroup) {
return;
@@ -187,7 +177,8 @@ async function recoupWithdrawCoin(
updatedCoin.status = CoinStatus.Dormant;
await tx.coins.put(updatedCoin);
await putGroupAsFinished(ws, tx, recoupGroup, coinIdx);
- });
+ },
+ );
}
async function recoupRefreshCoin(
@@ -197,9 +188,9 @@ async function recoupRefreshCoin(
coin: CoinRecord,
cs: RefreshCoinSource,
): Promise<void> {
- const d = await ws.db
- .mktx((x) => [x.coins, x.denominations])
- .runReadOnly(async (tx) => {
+ const d = await ws.db.runReadOnlyTx(
+ ["coins", "denominations"],
+ async (tx) => {
const denomInfo = await ws.getDenomInfo(
ws,
tx,
@@ -210,7 +201,8 @@ async function recoupRefreshCoin(
return;
}
return { denomInfo };
- });
+ },
+ );
if (!d) {
// FIXME: We should at least emit some pending operation / warning for this?
return;
@@ -243,9 +235,9 @@ async function recoupRefreshCoin(
throw Error(`Coin's oldCoinPub doesn't match reserve on recoup`);
}
- await ws.db
- .mktx((x) => [x.coins, x.denominations, x.recoupGroups, x.refreshGroups])
- .runReadWrite(async (tx) => {
+ await ws.db.runReadWriteTx(
+ ["coins", "denominations", "recoupGroups", "refreshGroups"],
+ async (tx) => {
const recoupGroup = await tx.recoupGroups.get(recoupGroupId);
if (!recoupGroup) {
return;
@@ -296,18 +288,17 @@ async function recoupRefreshCoin(
await tx.coins.put(revokedCoin);
await tx.coins.put(oldCoin);
await putGroupAsFinished(ws, tx, recoupGroup, coinIdx);
- });
+ },
+ );
}
export async function processRecoupGroup(
ws: InternalWalletState,
recoupGroupId: string,
): Promise<TaskRunResult> {
- let recoupGroup = await ws.db
- .mktx((x) => [x.recoupGroups])
- .runReadOnly(async (tx) => {
- return tx.recoupGroups.get(recoupGroupId);
- });
+ let recoupGroup = await ws.db.runReadOnlyTx(["recoupGroups"], async (tx) => {
+ return tx.recoupGroups.get(recoupGroupId);
+ });
if (!recoupGroup) {
return TaskRunResult.finished();
}
@@ -325,11 +316,9 @@ export async function processRecoupGroup(
});
await Promise.all(ps);
- recoupGroup = await ws.db
- .mktx((x) => [x.recoupGroups])
- .runReadOnly(async (tx) => {
- return tx.recoupGroups.get(recoupGroupId);
- });
+ recoupGroup = await ws.db.runReadOnlyTx(["recoupGroups"], async (tx) => {
+ return tx.recoupGroups.get(recoupGroupId);
+ });
if (!recoupGroup) {
return TaskRunResult.finished();
}
@@ -346,24 +335,22 @@ export async function processRecoupGroup(
const reservePrivMap: Record<string, string> = {};
for (let i = 0; i < recoupGroup.coinPubs.length; i++) {
const coinPub = recoupGroup.coinPubs[i];
- await ws.db
- .mktx((x) => [x.coins, x.reserves])
- .runReadOnly(async (tx) => {
- const coin = await tx.coins.get(coinPub);
- if (!coin) {
- throw Error(`Coin ${coinPub} not found, can't request recoup`);
- }
- if (coin.coinSource.type === CoinSourceType.Withdraw) {
- const reserve = await tx.reserves.indexes.byReservePub.get(
- coin.coinSource.reservePub,
- );
- if (!reserve) {
- return;
- }
- reserveSet.add(coin.coinSource.reservePub);
- reservePrivMap[coin.coinSource.reservePub] = reserve.reservePriv;
+ await ws.db.runReadOnlyTx(["coins", "reserves"], async (tx) => {
+ const coin = await tx.coins.get(coinPub);
+ if (!coin) {
+ throw Error(`Coin ${coinPub} not found, can't request recoup`);
+ }
+ if (coin.coinSource.type === CoinSourceType.Withdraw) {
+ const reserve = await tx.reserves.indexes.byReservePub.get(
+ coin.coinSource.reservePub,
+ );
+ if (!reserve) {
+ return;
}
- });
+ reserveSet.add(coin.coinSource.reservePub);
+ reservePrivMap[coin.coinSource.reservePub] = reserve.reservePriv;
+ }
+ });
}
for (const reservePub of reserveSet) {
@@ -393,15 +380,15 @@ export async function processRecoupGroup(
});
}
- await ws.db
- .mktx((x) => [
- x.recoupGroups,
- x.coinAvailability,
- x.denominations,
- x.refreshGroups,
- x.coins,
- ])
- .runReadWrite(async (tx) => {
+ await ws.db.runReadWriteTx(
+ [
+ "recoupGroups",
+ "coinAvailability",
+ "denominations",
+ "refreshGroups",
+ "coins",
+ ],
+ async (tx) => {
const rg2 = await tx.recoupGroups.get(recoupGroupId);
if (!rg2) {
return;
@@ -422,7 +409,8 @@ export async function processRecoupGroup(
);
}
await tx.recoupGroups.put(rg2);
- });
+ },
+ );
return TaskRunResult.finished();
}
@@ -462,12 +450,9 @@ export class RewardTransactionContext implements TransactionContext {
export async function createRecoupGroup(
ws: InternalWalletState,
- tx: GetReadWriteAccess<{
- recoupGroups: typeof WalletStoresV1.recoupGroups;
- denominations: typeof WalletStoresV1.denominations;
- refreshGroups: typeof WalletStoresV1.refreshGroups;
- coins: typeof WalletStoresV1.coins;
- }>,
+ tx: WalletDbReadWriteTransaction<
+ ["recoupGroups", "denominations", "refreshGroups", "coins"]
+ >,
exchangeBaseUrl: string,
coinPubs: string[],
): Promise<string> {
@@ -507,9 +492,9 @@ async function processRecoupForCoin(
recoupGroupId: string,
coinIdx: number,
): Promise<void> {
- const coin = await ws.db
- .mktx((x) => [x.recoupGroups, x.coins])
- .runReadOnly(async (tx) => {
+ const coin = await ws.db.runReadOnlyTx(
+ ["coins", "recoupGroups"],
+ async (tx) => {
const recoupGroup = await tx.recoupGroups.get(recoupGroupId);
if (!recoupGroup) {
return;
@@ -528,7 +513,8 @@ async function processRecoupForCoin(
throw Error(`Coin ${coinPub} not found, can't request recoup`);
}
return coin;
- });
+ },
+ );
if (!coin) {
return;
diff --git a/packages/taler-wallet-core/src/operations/refresh.ts b/packages/taler-wallet-core/src/operations/refresh.ts
index b9ac12518..ad9fdedb4 100644
--- a/packages/taler-wallet-core/src/operations/refresh.ts
+++ b/packages/taler-wallet-core/src/operations/refresh.ts
@@ -72,7 +72,6 @@ import {
RefreshCoinStatus,
RefreshGroupRecord,
RefreshOperationStatus,
- WalletStoresV1,
} from "../db.js";
import {
getCandidateWithdrawalDenomsTx,
@@ -81,7 +80,8 @@ import {
RefreshSessionRecord,
TaskId,
timestampPreciseToDb,
- WalletDbReadWriteTransactionArr,
+ WalletDbReadOnlyTransaction,
+ WalletDbReadWriteTransaction,
} from "../index.js";
import {
EXCHANGE_COINS_LOCK,
@@ -90,7 +90,6 @@ import {
import { assertUnreachable } from "../util/assertUnreachable.js";
import { selectWithdrawalDenominations } from "../util/coinSelection.js";
import { checkDbInvariant } from "../util/invariants.js";
-import { GetReadOnlyAccess, GetReadWriteAccess } from "../util/query.js";
import {
constructTaskIdentifier,
makeCoinAvailable,
@@ -129,48 +128,44 @@ export class RefreshTransactionContext implements TransactionContext {
async deleteTransaction(): Promise<void> {
const refreshGroupId = this.refreshGroupId;
const ws = this.ws;
- await ws.db
- .mktx((x) => [x.refreshGroups, x.tombstones])
- .runReadWrite(async (tx) => {
- const rg = await tx.refreshGroups.get(refreshGroupId);
- if (rg) {
- await tx.refreshGroups.delete(refreshGroupId);
- await tx.tombstones.put({
- id: TombstoneTag.DeleteRefreshGroup + ":" + refreshGroupId,
- });
- }
- });
+ await ws.db.runReadWriteTx(["refreshGroups", "tombstones"], async (tx) => {
+ const rg = await tx.refreshGroups.get(refreshGroupId);
+ if (rg) {
+ await tx.refreshGroups.delete(refreshGroupId);
+ await tx.tombstones.put({
+ id: TombstoneTag.DeleteRefreshGroup + ":" + refreshGroupId,
+ });
+ }
+ });
}
async suspendTransaction(): Promise<void> {
const { ws, refreshGroupId, transactionId } = this;
- let res = await ws.db
- .mktx((x) => [x.refreshGroups])
- .runReadWrite(async (tx) => {
- const dg = await tx.refreshGroups.get(refreshGroupId);
- if (!dg) {
- logger.warn(
- `can't suspend refresh group, refreshGroupId=${refreshGroupId} not found`,
- );
+ let res = await ws.db.runReadWriteTx(["refreshGroups"], async (tx) => {
+ const dg = await tx.refreshGroups.get(refreshGroupId);
+ if (!dg) {
+ logger.warn(
+ `can't suspend refresh group, refreshGroupId=${refreshGroupId} not found`,
+ );
+ return undefined;
+ }
+ const oldState = computeRefreshTransactionState(dg);
+ switch (dg.operationStatus) {
+ case RefreshOperationStatus.Finished:
return undefined;
+ case RefreshOperationStatus.Pending: {
+ dg.operationStatus = RefreshOperationStatus.Suspended;
+ await tx.refreshGroups.put(dg);
+ return {
+ oldTxState: oldState,
+ newTxState: computeRefreshTransactionState(dg),
+ };
}
- const oldState = computeRefreshTransactionState(dg);
- switch (dg.operationStatus) {
- case RefreshOperationStatus.Finished:
- return undefined;
- case RefreshOperationStatus.Pending: {
- dg.operationStatus = RefreshOperationStatus.Suspended;
- await tx.refreshGroups.put(dg);
- return {
- oldTxState: oldState,
- newTxState: computeRefreshTransactionState(dg),
- };
- }
- case RefreshOperationStatus.Suspended:
- return undefined;
- }
- return undefined;
- });
+ case RefreshOperationStatus.Suspended:
+ return undefined;
+ }
+ return undefined;
+ });
if (res) {
ws.notify({
type: NotificationType.TransactionStateTransition,
@@ -188,9 +183,9 @@ export class RefreshTransactionContext implements TransactionContext {
async resumeTransaction(): Promise<void> {
const { ws, refreshGroupId, transactionId } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.refreshGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["refreshGroups"],
+ async (tx) => {
const dg = await tx.refreshGroups.get(refreshGroupId);
if (!dg) {
logger.warn(
@@ -214,16 +209,17 @@ export class RefreshTransactionContext implements TransactionContext {
};
}
return undefined;
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
ws.taskScheduler.startShepherdTask(this.taskId);
}
async failTransaction(): Promise<void> {
const { ws, refreshGroupId, transactionId } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.refreshGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["refreshGroups"],
+ async (tx) => {
const dg = await tx.refreshGroups.get(refreshGroupId);
if (!dg) {
logger.warn(
@@ -253,7 +249,8 @@ export class RefreshTransactionContext implements TransactionContext {
oldTxState: oldState,
newTxState: computeRefreshTransactionState(dg),
};
- });
+ },
+ );
ws.taskScheduler.stopShepherdTask(this.taskId);
notifyTransition(ws, transactionId, transitionInfo);
ws.taskScheduler.startShepherdTask(this.taskId);
@@ -341,9 +338,9 @@ async function provideRefreshSession(
`creating refresh session for coin ${coinIndex} in refresh group ${refreshGroupId}`,
);
- const d = await ws.db
- .mktx((x) => [x.refreshGroups, x.coins, x.refreshSessions])
- .runReadWrite(async (tx) => {
+ const d = await ws.db.runReadWriteTx(
+ ["coins", "refreshGroups", "refreshSessions"],
+ async (tx) => {
const refreshGroup = await tx.refreshGroups.get(refreshGroupId);
if (!refreshGroup) {
return;
@@ -363,7 +360,8 @@ async function provideRefreshSession(
throw Error("Can't refresh, coin not found");
}
return { refreshGroup, coin, existingRefreshSession };
- });
+ },
+ );
if (!d) {
return undefined;
@@ -380,9 +378,9 @@ async function provideRefreshSession(
// FIXME: use helper functions from withdraw.ts
// to update and filter withdrawable denoms.
- const { availableAmount, availableDenoms } = await ws.db
- .mktx((x) => [x.denominations])
- .runReadOnly(async (tx) => {
+ const { availableAmount, availableDenoms } = await ws.db.runReadOnlyTx(
+ ["denominations"],
+ async (tx) => {
const oldDenom = await ws.getDenomInfo(
ws,
tx,
@@ -405,7 +403,8 @@ async function provideRefreshSession(
oldDenom.feeRefresh,
).amount;
return { availableAmount, availableDenoms };
- });
+ },
+ );
const newCoinDenoms = selectWithdrawalDenominations(
availableAmount,
@@ -424,9 +423,9 @@ async function provideRefreshSession(
availableAmount,
)} too small`,
);
- const transitionInfo = await ws.db
- .mktx((x) => [x.coins, x.coinAvailability, x.refreshGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["refreshGroups", "coins", "coinAvailability"],
+ async (tx) => {
const rg = await tx.refreshGroups.get(refreshGroupId);
if (!rg) {
return;
@@ -440,7 +439,8 @@ async function provideRefreshSession(
await tx.refreshGroups.put(rg);
const newTxState = computeRefreshTransactionState(rg);
return { oldTxState, newTxState };
- });
+ },
+ );
ws.notify({
type: NotificationType.BalanceChange,
hintTransactionId: transactionId,
@@ -452,9 +452,9 @@ async function provideRefreshSession(
const sessionSecretSeed = encodeCrock(getRandomBytes(64));
// Store refresh session for this coin in the database.
- const mySession = await ws.db
- .mktx((x) => [x.refreshGroups, x.coins, x.refreshSessions])
- .runReadWrite(async (tx) => {
+ const mySession = await ws.db.runReadWriteTx(
+ ["refreshGroups", "refreshSessions"],
+ async (tx) => {
const rg = await tx.refreshGroups.get(refreshGroupId);
if (!rg) {
return;
@@ -479,7 +479,8 @@ async function provideRefreshSession(
};
await tx.refreshSessions.put(newSession);
return newSession;
- });
+ },
+ );
logger.trace(
`found/created refresh session for coin #${coinIndex} in ${refreshGroupId}`,
);
@@ -497,9 +498,9 @@ async function refreshMelt(
refreshGroupId: string,
coinIndex: number,
): Promise<void> {
- const d = await ws.db
- .mktx((x) => [x.refreshGroups, x.refreshSessions, x.coins, x.denominations])
- .runReadWrite(async (tx) => {
+ const d = await ws.db.runReadWriteTx(
+ ["refreshGroups", "refreshSessions", "coins", "denominations"],
+ async (tx) => {
const refreshGroup = await tx.refreshGroups.get(refreshGroupId);
if (!refreshGroup) {
return;
@@ -550,7 +551,8 @@ async function refreshMelt(
});
}
return { newCoinDenoms, oldCoin, oldDenom, refreshGroup, refreshSession };
- });
+ },
+ );
if (!d) {
return;
@@ -618,14 +620,9 @@ async function refreshMelt(
if (resp.status === HttpStatusCode.NotFound) {
const errDetails = await readUnexpectedResponseDetails(resp);
- const transitionInfo = await ws.db
- .mktx((x) => [
- x.refreshGroups,
- x.refreshSessions,
- x.coins,
- x.coinAvailability,
- ])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["refreshGroups", "refreshSessions", "coins", "coinAvailability"],
+ async (tx) => {
const rg = await tx.refreshGroups.get(refreshGroupId);
if (!rg) {
return;
@@ -659,7 +656,8 @@ async function refreshMelt(
oldTxState,
newTxState,
};
- });
+ },
+ );
ws.notify({
type: NotificationType.BalanceChange,
hintTransactionId: transactionId,
@@ -710,9 +708,9 @@ async function refreshMelt(
refreshSession.norevealIndex = norevealIndex;
- await ws.db
- .mktx((x) => [x.refreshGroups, x.refreshSessions])
- .runReadWrite(async (tx) => {
+ await ws.db.runReadWriteTx(
+ ["refreshGroups", "refreshSessions"],
+ async (tx) => {
const rg = await tx.refreshGroups.get(refreshGroupId);
if (!rg) {
return;
@@ -729,7 +727,8 @@ async function refreshMelt(
}
rs.norevealIndex = norevealIndex;
await tx.refreshSessions.put(rs);
- });
+ },
+ );
}
export async function assembleRefreshRevealRequest(args: {
@@ -798,9 +797,9 @@ async function refreshReveal(
logger.trace(
`doing refresh reveal for ${refreshGroupId} (old coin ${coinIndex})`,
);
- const d = await ws.db
- .mktx((x) => [x.refreshGroups, x.refreshSessions, x.coins, x.denominations])
- .runReadOnly(async (tx) => {
+ const d = await ws.db.runReadOnlyTx(
+ ["refreshGroups", "refreshSessions", "coins", "denominations"],
+ async (tx) => {
const refreshGroup = await tx.refreshGroups.get(refreshGroupId);
if (!refreshGroup) {
return;
@@ -859,7 +858,8 @@ async function refreshReveal(
refreshGroup,
norevealIndex,
};
- });
+ },
+ );
if (!d) {
return;
@@ -972,15 +972,15 @@ async function refreshReveal(
}
}
- const transitionInfo = await ws.db
- .mktx((x) => [
- x.coins,
- x.denominations,
- x.coinAvailability,
- x.refreshGroups,
- x.refreshSessions,
- ])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ [
+ "coins",
+ "denominations",
+ "coinAvailability",
+ "refreshGroups",
+ "refreshSessions",
+ ],
+ async (tx) => {
const rg = await tx.refreshGroups.get(refreshGroupId);
if (!rg) {
logger.warn("no refresh session found");
@@ -1000,7 +1000,8 @@ async function refreshReveal(
await tx.refreshGroups.put(rg);
const newTxState = computeRefreshTransactionState(rg);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
logger.trace("refresh finished (end of reveal)");
}
@@ -1012,9 +1013,10 @@ export async function processRefreshGroup(
): Promise<TaskRunResult> {
logger.trace(`processing refresh group ${refreshGroupId}`);
- const refreshGroup = await ws.db
- .mktx((x) => [x.refreshGroups])
- .runReadOnly(async (tx) => tx.refreshGroups.get(refreshGroupId));
+ const refreshGroup = await ws.db.runReadOnlyTx(
+ ["refreshGroups"],
+ async (tx) => tx.refreshGroups.get(refreshGroupId),
+ );
if (!refreshGroup) {
return TaskRunResult.finished();
}
@@ -1084,16 +1086,17 @@ async function processRefreshSession(
logger.trace(
`processing refresh session for coin ${coinIndex} of group ${refreshGroupId}`,
);
- let { refreshGroup, refreshSession } = await ws.db
- .mktx((x) => [x.refreshGroups, x.refreshSessions])
- .runReadOnly(async (tx) => {
+ let { refreshGroup, refreshSession } = await ws.db.runReadOnlyTx(
+ ["refreshGroups", "refreshSessions"],
+ async (tx) => {
const rg = await tx.refreshGroups.get(refreshGroupId);
const rs = await tx.refreshSessions.get([refreshGroupId, coinIndex]);
return {
refreshGroup: rg,
refreshSession: rs,
};
- });
+ },
+ );
if (!refreshGroup) {
return;
}
@@ -1122,12 +1125,9 @@ export interface RefreshOutputInfo {
export async function calculateRefreshOutput(
ws: InternalWalletState,
- tx: GetReadOnlyAccess<{
- denominations: typeof WalletStoresV1.denominations;
- coins: typeof WalletStoresV1.coins;
- refreshGroups: typeof WalletStoresV1.refreshGroups;
- coinAvailability: typeof WalletStoresV1.coinAvailability;
- }>,
+ tx: WalletDbReadOnlyTransaction<
+ ["denominations", "coins", "refreshGroups", "coinAvailability"]
+ >,
currency: string,
oldCoinPubs: CoinRefreshRequest[],
): Promise<RefreshOutputInfo> {
@@ -1196,12 +1196,9 @@ export async function calculateRefreshOutput(
async function applyRefresh(
ws: InternalWalletState,
- tx: GetReadWriteAccess<{
- denominations: typeof WalletStoresV1.denominations;
- coins: typeof WalletStoresV1.coins;
- refreshGroups: typeof WalletStoresV1.refreshGroups;
- coinAvailability: typeof WalletStoresV1.coinAvailability;
- }>,
+ tx: WalletDbReadWriteTransaction<
+ ["denominations", "coins", "refreshGroups", "coinAvailability"]
+ >,
oldCoinPubs: CoinRefreshRequest[],
refreshGroupId: string,
): Promise<void> {
@@ -1272,7 +1269,7 @@ export interface CreateRefreshGroupResult {
*/
export async function createRefreshGroup(
ws: InternalWalletState,
- tx: WalletDbReadWriteTransactionArr<
+ tx: WalletDbReadWriteTransaction<
["denominations", "coins", "refreshGroups", "coinAvailability"]
>,
currency: string,
@@ -1395,14 +1392,9 @@ export async function forceRefresh(
if (req.coinPubList.length == 0) {
throw Error("refusing to create empty refresh group");
}
- const refreshGroupId = await ws.db
- .mktx((x) => [
- x.refreshGroups,
- x.coinAvailability,
- x.denominations,
- x.coins,
- ])
- .runReadWrite(async (tx) => {
+ const refreshGroupId = await ws.db.runReadWriteTx(
+ ["refreshGroups", "coinAvailability", "denominations", "coins"],
+ async (tx) => {
let coinPubs: CoinRefreshRequest[] = [];
for (const c of req.coinPubList) {
const coin = await tx.coins.get(c);
@@ -1429,7 +1421,8 @@ export async function forceRefresh(
RefreshReason.Manual,
undefined,
);
- });
+ },
+ );
return {
refreshGroupId,
diff --git a/packages/taler-wallet-core/src/operations/reward.ts b/packages/taler-wallet-core/src/operations/reward.ts
index 4d8653a9d..7d826e630 100644
--- a/packages/taler-wallet-core/src/operations/reward.ts
+++ b/packages/taler-wallet-core/src/operations/reward.ts
@@ -19,73 +19,29 @@
*/
import {
AcceptTipResponse,
- AgeRestriction,
- Amounts,
- BlindedDenominationSignature,
- codecForMerchantTipResponseV2,
- codecForRewardPickupGetResponse,
- CoinStatus,
- DenomKeyType,
- encodeCrock,
- getRandomBytes,
- j2s,
Logger,
- NotificationType,
- parseRewardUri,
PrepareTipResult,
- TalerErrorCode,
- TalerPreciseTimestamp,
- TipPlanchetDetail,
TransactionAction,
TransactionIdStr,
TransactionMajorState,
TransactionMinorState,
TransactionState,
TransactionType,
- URL,
} from "@gnu-taler/taler-util";
-import { DerivedTipPlanchet } from "../crypto/cryptoTypes.js";
-import {
- CoinRecord,
- CoinSourceType,
- DenominationRecord,
- RewardRecord,
- RewardRecordStatus,
- timestampPreciseFromDb,
- timestampPreciseToDb,
- timestampProtocolFromDb,
- timestampProtocolToDb,
-} from "../db.js";
-import { makeErrorDetail } from "@gnu-taler/taler-util";
+import { RewardRecord, RewardRecordStatus } from "../db.js";
import { InternalWalletState } from "../internal-wallet-state.js";
+import { PendingTaskType } from "../pending-types.js";
+import { assertUnreachable } from "../util/assertUnreachable.js";
import {
- getHttpResponseErrorDetails,
- readSuccessResponseJsonOrThrow,
-} from "@gnu-taler/taler-util/http";
-import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js";
-import {
- constructTaskIdentifier,
- makeCoinAvailable,
- makeCoinsVisible,
TaskRunResult,
- TaskRunResultType,
TombstoneTag,
TransactionContext,
+ constructTaskIdentifier,
} from "./common.js";
-import { fetchFreshExchange } from "./exchanges.js";
-import {
- getCandidateWithdrawalDenoms,
- getExchangeWithdrawalInfo,
- updateWithdrawalDenoms,
-} from "./withdraw.js";
-import { selectWithdrawalDenominations } from "../util/coinSelection.js";
import {
constructTransactionIdentifier,
notifyTransition,
- parseTransactionIdentifier,
} from "./transactions.js";
-import { PendingTaskType } from "../pending-types.js";
-import { assertUnreachable } from "../util/assertUnreachable.js";
const logger = new Logger("operations/tip.ts");
@@ -109,24 +65,22 @@ export class RewardTransactionContext implements TransactionContext {
async deleteTransaction(): Promise<void> {
const { ws, walletRewardId } = this;
- await ws.db
- .mktx((x) => [x.rewards, x.tombstones])
- .runReadWrite(async (tx) => {
- const tipRecord = await tx.rewards.get(walletRewardId);
- if (tipRecord) {
- await tx.rewards.delete(walletRewardId);
- await tx.tombstones.put({
- id: TombstoneTag.DeleteReward + ":" + walletRewardId,
- });
- }
- });
+ await ws.db.runReadWriteTx(["rewards", "tombstones"], async (tx) => {
+ const tipRecord = await tx.rewards.get(walletRewardId);
+ if (tipRecord) {
+ await tx.rewards.delete(walletRewardId);
+ await tx.tombstones.put({
+ id: TombstoneTag.DeleteReward + ":" + walletRewardId,
+ });
+ }
+ });
}
async suspendTransaction(): Promise<void> {
const { ws, walletRewardId, transactionId, retryTag } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.rewards])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["rewards"],
+ async (tx) => {
const tipRec = await tx.rewards.get(walletRewardId);
if (!tipRec) {
logger.warn(`transaction tip ${walletRewardId} not found`);
@@ -158,15 +112,16 @@ export class RewardTransactionContext implements TransactionContext {
};
}
return undefined;
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
}
async abortTransaction(): Promise<void> {
const { ws, walletRewardId, transactionId, retryTag } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.rewards])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["rewards"],
+ async (tx) => {
const tipRec = await tx.rewards.get(walletRewardId);
if (!tipRec) {
logger.warn(`transaction tip ${walletRewardId} not found`);
@@ -197,15 +152,16 @@ export class RewardTransactionContext implements TransactionContext {
};
}
return undefined;
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
}
async resumeTransaction(): Promise<void> {
const { ws, walletRewardId, transactionId, retryTag } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.rewards])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["rewards"],
+ async (tx) => {
const rewardRec = await tx.rewards.get(walletRewardId);
if (!rewardRec) {
logger.warn(`transaction reward ${walletRewardId} not found`);
@@ -236,15 +192,16 @@ export class RewardTransactionContext implements TransactionContext {
};
}
return undefined;
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
}
async failTransaction(): Promise<void> {
const { ws, walletRewardId, transactionId, retryTag } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.rewards])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["rewards"],
+ async (tx) => {
const tipRec = await tx.rewards.get(walletRewardId);
if (!tipRec) {
logger.warn(`transaction tip ${walletRewardId} not found`);
@@ -275,7 +232,8 @@ export class RewardTransactionContext implements TransactionContext {
};
}
return undefined;
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
}
}
@@ -345,368 +303,19 @@ export async function prepareReward(
ws: InternalWalletState,
talerTipUri: string,
): Promise<PrepareTipResult> {
- const res = parseRewardUri(talerTipUri);
- if (!res) {
- throw Error("invalid taler://tip URI");
- }
-
- let tipRecord = await ws.db
- .mktx((x) => [x.rewards])
- .runReadOnly(async (tx) => {
- return tx.rewards.indexes.byMerchantTipIdAndBaseUrl.get([
- res.merchantRewardId,
- res.merchantBaseUrl,
- ]);
- });
-
- if (!tipRecord) {
- const tipStatusUrl = new URL(
- `rewards/${res.merchantRewardId}`,
- res.merchantBaseUrl,
- );
- logger.trace("checking tip status from", tipStatusUrl.href);
- const merchantResp = await ws.http.fetch(tipStatusUrl.href);
- const rewardPickupStatus = await readSuccessResponseJsonOrThrow(
- merchantResp,
- codecForRewardPickupGetResponse(),
- );
- logger.trace(`status ${j2s(rewardPickupStatus)}`);
-
- const amount = Amounts.parseOrThrow(rewardPickupStatus.reward_amount);
- const currency = amount.currency;
-
- logger.trace("new tip, creating tip record");
- await fetchFreshExchange(ws, rewardPickupStatus.exchange_url);
-
- //FIXME: is this needed? withdrawDetails is not used
- // * if the intention is to update the exchange information in the database
- // maybe we can use another name. `get` seems like a pure-function
- const withdrawDetails = await getExchangeWithdrawalInfo(
- ws,
- rewardPickupStatus.exchange_url,
- amount,
- undefined,
- );
-
- const walletRewardId = encodeCrock(getRandomBytes(32));
- await updateWithdrawalDenoms(ws, rewardPickupStatus.exchange_url);
- const denoms = await getCandidateWithdrawalDenoms(
- ws,
- rewardPickupStatus.exchange_url,
- currency,
- );
- const selectedDenoms = selectWithdrawalDenominations(amount, denoms);
-
- const secretSeed = encodeCrock(getRandomBytes(64));
- const denomSelUid = encodeCrock(getRandomBytes(32));
-
- const newTipRecord: RewardRecord = {
- walletRewardId: walletRewardId,
- acceptedTimestamp: undefined,
- status: RewardRecordStatus.DialogAccept,
- rewardAmountRaw: Amounts.stringify(amount),
- rewardExpiration: timestampProtocolToDb(rewardPickupStatus.expiration),
- exchangeBaseUrl: rewardPickupStatus.exchange_url,
- next_url: rewardPickupStatus.next_url,
- merchantBaseUrl: res.merchantBaseUrl,
- createdTimestamp: timestampPreciseToDb(TalerPreciseTimestamp.now()),
- merchantRewardId: res.merchantRewardId,
- rewardAmountEffective: Amounts.stringify(selectedDenoms.totalCoinValue),
- denomsSel: selectedDenoms,
- pickedUpTimestamp: undefined,
- secretSeed,
- denomSelUid,
- };
- await ws.db
- .mktx((x) => [x.rewards])
- .runReadWrite(async (tx) => {
- await tx.rewards.put(newTipRecord);
- });
- tipRecord = newTipRecord;
- }
-
- const transactionId = constructTransactionIdentifier({
- tag: TransactionType.Reward,
- walletRewardId: tipRecord.walletRewardId,
- });
-
- const tipStatus: PrepareTipResult = {
- accepted: !!tipRecord && !!tipRecord.acceptedTimestamp,
- rewardAmountRaw: Amounts.stringify(tipRecord.rewardAmountRaw),
- exchangeBaseUrl: tipRecord.exchangeBaseUrl,
- merchantBaseUrl: tipRecord.merchantBaseUrl,
- expirationTimestamp: timestampProtocolFromDb(tipRecord.rewardExpiration),
- rewardAmountEffective: Amounts.stringify(tipRecord.rewardAmountEffective),
- walletRewardId: tipRecord.walletRewardId,
- transactionId,
- };
-
- return tipStatus;
+ throw Error("the rewards feature is not supported anymore");
}
export async function processTip(
ws: InternalWalletState,
walletTipId: string,
): Promise<TaskRunResult> {
- const tipRecord = await ws.db
- .mktx((x) => [x.rewards])
- .runReadOnly(async (tx) => {
- return tx.rewards.get(walletTipId);
- });
- if (!tipRecord) {
- return TaskRunResult.finished();
- }
-
- switch (tipRecord.status) {
- case RewardRecordStatus.Aborted:
- case RewardRecordStatus.DialogAccept:
- case RewardRecordStatus.Done:
- case RewardRecordStatus.SuspendedPickup:
- return TaskRunResult.finished();
- }
-
- const transactionId = constructTransactionIdentifier({
- tag: TransactionType.Reward,
- walletRewardId: walletTipId,
- });
-
- const denomsForWithdraw = tipRecord.denomsSel;
-
- const planchets: DerivedTipPlanchet[] = [];
- // Planchets in the form that the merchant expects
- const planchetsDetail: TipPlanchetDetail[] = [];
- const denomForPlanchet: { [index: number]: DenominationRecord } = [];
-
- for (const dh of denomsForWithdraw.selectedDenoms) {
- const denom = await ws.db
- .mktx((x) => [x.denominations])
- .runReadOnly(async (tx) => {
- return tx.denominations.get([
- tipRecord.exchangeBaseUrl,
- dh.denomPubHash,
- ]);
- });
- checkDbInvariant(!!denom, "denomination should be in database");
- for (let i = 0; i < dh.count; i++) {
- const deriveReq = {
- denomPub: denom.denomPub,
- planchetIndex: planchets.length,
- secretSeed: tipRecord.secretSeed,
- };
- logger.trace(`deriving tip planchet: ${j2s(deriveReq)}`);
- const p = await ws.cryptoApi.createTipPlanchet(deriveReq);
- logger.trace(`derive result: ${j2s(p)}`);
- denomForPlanchet[planchets.length] = denom;
- planchets.push(p);
- planchetsDetail.push({
- coin_ev: p.coinEv,
- denom_pub_hash: denom.denomPubHash,
- });
- }
- }
-
- const tipStatusUrl = new URL(
- `rewards/${tipRecord.merchantRewardId}/pickup`,
- tipRecord.merchantBaseUrl,
- );
-
- const req = { planchets: planchetsDetail };
- logger.trace(`sending tip request: ${j2s(req)}`);
- const merchantResp = await ws.http.fetch(tipStatusUrl.href, {
- method: "POST",
- body: req,
- });
-
- logger.trace(`got tip response, status ${merchantResp.status}`);
-
- // FIXME: Why do we do this?
- if (
- (merchantResp.status >= 500 && merchantResp.status <= 599) ||
- merchantResp.status === 424
- ) {
- logger.trace(`got transient tip error`);
- // FIXME: wrap in another error code that indicates a transient error
- return {
- type: TaskRunResultType.Error,
- errorDetail: makeErrorDetail(
- TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR,
- getHttpResponseErrorDetails(merchantResp),
- "tip pickup failed (transient)",
- ),
- };
- }
- let blindedSigs: BlindedDenominationSignature[] = [];
-
- const response = await readSuccessResponseJsonOrThrow(
- merchantResp,
- codecForMerchantTipResponseV2(),
- );
- blindedSigs = response.blind_sigs.map((x) => x.blind_sig);
-
- if (blindedSigs.length !== planchets.length) {
- throw Error("number of tip responses does not match requested planchets");
- }
-
- const newCoinRecords: CoinRecord[] = [];
-
- for (let i = 0; i < blindedSigs.length; i++) {
- const blindedSig = blindedSigs[i];
-
- const denom = denomForPlanchet[i];
- checkLogicInvariant(!!denom);
- const planchet = planchets[i];
- checkLogicInvariant(!!planchet);
-
- if (denom.denomPub.cipher !== DenomKeyType.Rsa) {
- throw Error("unsupported cipher");
- }
-
- if (blindedSig.cipher !== DenomKeyType.Rsa) {
- throw Error("unsupported cipher");
- }
-
- const denomSigRsa = await ws.cryptoApi.rsaUnblind({
- bk: planchet.blindingKey,
- blindedSig: blindedSig.blinded_rsa_signature,
- pk: denom.denomPub.rsa_public_key,
- });
-
- const isValid = await ws.cryptoApi.rsaVerify({
- hm: planchet.coinPub,
- pk: denom.denomPub.rsa_public_key,
- sig: denomSigRsa.sig,
- });
-
- if (!isValid) {
- return {
- type: TaskRunResultType.Error,
- errorDetail: makeErrorDetail(
- TalerErrorCode.WALLET_REWARD_COIN_SIGNATURE_INVALID,
- {},
- "invalid signature from the exchange (via merchant reward) after unblinding",
- ),
- };
- }
-
- newCoinRecords.push({
- blindingKey: planchet.blindingKey,
- coinPriv: planchet.coinPriv,
- coinPub: planchet.coinPub,
- coinSource: {
- type: CoinSourceType.Reward,
- coinIndex: i,
- walletRewardId: walletTipId,
- },
- sourceTransactionId: transactionId,
- denomPubHash: denom.denomPubHash,
- denomSig: { cipher: DenomKeyType.Rsa, rsa_signature: denomSigRsa.sig },
- exchangeBaseUrl: tipRecord.exchangeBaseUrl,
- status: CoinStatus.Fresh,
- coinEvHash: planchet.coinEvHash,
- maxAge: AgeRestriction.AGE_UNRESTRICTED,
- ageCommitmentProof: planchet.ageCommitmentProof,
- spendAllocation: undefined,
- });
- }
-
- const transitionInfo = await ws.db
- .mktx((x) => [x.coins, x.coinAvailability, x.denominations, x.rewards])
- .runReadWrite(async (tx) => {
- const tr = await tx.rewards.get(walletTipId);
- if (!tr) {
- return;
- }
- if (tr.status !== RewardRecordStatus.PendingPickup) {
- return;
- }
- const oldTxState = computeRewardTransactionStatus(tr);
- tr.pickedUpTimestamp = timestampPreciseToDb(TalerPreciseTimestamp.now());
- tr.status = RewardRecordStatus.Done;
- await tx.rewards.put(tr);
- const newTxState = computeRewardTransactionStatus(tr);
- for (const cr of newCoinRecords) {
- await makeCoinAvailable(ws, tx, cr);
- }
- await makeCoinsVisible(ws, tx, transactionId);
- return { oldTxState, newTxState };
- });
- notifyTransition(ws, transactionId, transitionInfo);
- ws.notify({
- type: NotificationType.BalanceChange,
- hintTransactionId: transactionId,
- });
-
return TaskRunResult.finished();
}
-export async function acceptTipBackwardCompat(
- ws: InternalWalletState,
- walletTipId: string | undefined,
- transactionIdParam: TransactionIdStr | undefined,
-): Promise<AcceptTipResponse> {
- if (transactionIdParam) {
- return acceptTip(ws, transactionIdParam);
- }
- if (walletTipId) {
- /**
- * old clients use still use tipId
- */
- const transactionId = constructTransactionIdentifier({
- tag: TransactionType.Reward,
- walletRewardId: walletTipId,
- });
- return acceptTip(ws, transactionId);
- }
- throw Error(
- "Unable to accept tip: neither tipId (deprecated) nor transactionId was specified",
- );
-}
-
export async function acceptTip(
ws: InternalWalletState,
transactionId: TransactionIdStr,
): Promise<AcceptTipResponse> {
- const pTxId = parseTransactionIdentifier(transactionId);
- if (!pTxId)
- throw Error(`Unable to accept tip: invalid tx tag "${transactionId}"`);
- const rewardId =
- pTxId.tag === TransactionType.Reward ? pTxId.walletRewardId : undefined;
- if (!rewardId)
- throw Error(
- `Unable to accept tip: txId is not a reward tag "${pTxId.tag}"`,
- );
- const dbRes = await ws.db
- .mktx((x) => [x.rewards])
- .runReadWrite(async (tx) => {
- const tipRecord = await tx.rewards.get(rewardId);
- if (!tipRecord) {
- logger.error("tip not found");
- return;
- }
- if (tipRecord.status != RewardRecordStatus.DialogAccept) {
- logger.warn("Unable to accept tip in the current state");
- return { tipRecord };
- }
- const oldTxState = computeRewardTransactionStatus(tipRecord);
- tipRecord.acceptedTimestamp = timestampPreciseToDb(
- TalerPreciseTimestamp.now(),
- );
- tipRecord.status = RewardRecordStatus.PendingPickup;
- await tx.rewards.put(tipRecord);
- const newTxState = computeRewardTransactionStatus(tipRecord);
- return { tipRecord, transitionInfo: { oldTxState, newTxState } };
- });
-
- if (!dbRes) {
- throw Error("tip not found");
- }
-
- notifyTransition(ws, transactionId, dbRes.transitionInfo);
-
- const tipRecord = dbRes.tipRecord;
-
- return {
- transactionId,
- next_url: tipRecord.next_url,
- };
+ throw Error("the rewards feature is not supported anymore");
}
diff --git a/packages/taler-wallet-core/src/operations/testing.ts b/packages/taler-wallet-core/src/operations/testing.ts
index 17863450c..3c7845813 100644
--- a/packages/taler-wallet-core/src/operations/testing.ts
+++ b/packages/taler-wallet-core/src/operations/testing.ts
@@ -903,11 +903,9 @@ export async function testPay(
if (r.type != ConfirmPayResultType.Done) {
throw Error("payment not done");
}
- const purchase = await ws.db
- .mktx((x) => [x.purchases])
- .runReadOnly(async (tx) => {
- return tx.purchases.get(result.proposalId);
- });
+ const purchase = await ws.db.runReadOnlyTx(["purchases"], async (tx) => {
+ return tx.purchases.get(result.proposalId);
+ });
checkLogicInvariant(!!purchase);
return {
payCoinSelection: purchase.payInfo?.payCoinSelection!,
diff --git a/packages/taler-wallet-core/src/operations/transactions.ts b/packages/taler-wallet-core/src/operations/transactions.ts
index 671ccd556..1d3ea3d5a 100644
--- a/packages/taler-wallet-core/src/operations/transactions.ts
+++ b/packages/taler-wallet-core/src/operations/transactions.ts
@@ -69,13 +69,12 @@ import {
WithdrawalRecordType,
} from "../db.js";
import {
- GetReadOnlyAccess,
OPERATION_STATUS_ACTIVE_FIRST,
OPERATION_STATUS_ACTIVE_LAST,
PeerPushDebitStatus,
timestampPreciseFromDb,
timestampProtocolFromDb,
- WalletStoresV1,
+ WalletDbReadOnlyTransaction,
} from "../index.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import { PendingTaskType, TaskId } from "../pending-types.js";
@@ -229,14 +228,14 @@ export async function getTransactionById(
case TransactionType.InternalWithdrawal:
case TransactionType.Withdrawal: {
const withdrawalGroupId = parsedTx.withdrawalGroupId;
- return await ws.db
- .mktx((x) => [
- x.withdrawalGroups,
- x.exchangeDetails,
- x.exchanges,
- x.operationRetries,
- ])
- .runReadWrite(async (tx) => {
+ return await ws.db.runReadWriteTx(
+ [
+ "withdrawalGroups",
+ "exchangeDetails",
+ "exchanges",
+ "operationRetries",
+ ],
+ async (tx) => {
const withdrawalGroupRecord =
await tx.withdrawalGroups.get(withdrawalGroupId);
@@ -265,7 +264,8 @@ export async function getTransactionById(
exchangeDetails,
ort,
);
- });
+ },
+ );
}
case TransactionType.Recoup:
@@ -273,15 +273,15 @@ export async function getTransactionById(
case TransactionType.Payment: {
const proposalId = parsedTx.proposalId;
- return await ws.db
- .mktx((x) => [
- x.purchases,
- x.tombstones,
- x.operationRetries,
- x.refundGroups,
- x.contractTerms,
- ])
- .runReadWrite(async (tx) => {
+ return await ws.db.runReadWriteTx(
+ [
+ "purchases",
+ "tombstones",
+ "operationRetries",
+ "contractTerms",
+ "refundGroups",
+ ],
+ async (tx) => {
const purchase = await tx.purchases.get(proposalId);
if (!purchase) throw Error("not found");
const download = await expectProposalDownload(ws, purchase, tx);
@@ -299,7 +299,8 @@ export async function getTransactionById(
refunds,
payRetryRecord,
);
- });
+ },
+ );
}
case TransactionType.Refresh: {
@@ -322,9 +323,9 @@ export async function getTransactionById(
case TransactionType.Reward: {
const tipId = parsedTx.walletRewardId;
- return await ws.db
- .mktx((x) => [x.rewards, x.operationRetries])
- .runReadWrite(async (tx) => {
+ return await ws.db.runReadWriteTx(
+ ["rewards", "operationRetries"],
+ async (tx) => {
const tipRecord = await tx.rewards.get(tipId);
if (!tipRecord) throw Error("not found");
@@ -332,14 +333,15 @@ export async function getTransactionById(
TaskIdentifiers.forTipPickup(tipRecord),
);
return buildTransactionForTip(tipRecord, retries);
- });
+ },
+ );
}
case TransactionType.Deposit: {
const depositGroupId = parsedTx.depositGroupId;
- return await ws.db
- .mktx((x) => [x.depositGroups, x.operationRetries])
- .runReadWrite(async (tx) => {
+ return await ws.db.runReadWriteTx(
+ ["depositGroups", "operationRetries"],
+ async (tx) => {
const depositRecord = await tx.depositGroups.get(depositGroupId);
if (!depositRecord) throw Error("not found");
@@ -347,13 +349,14 @@ export async function getTransactionById(
TaskIdentifiers.forDeposit(depositRecord),
);
return buildTransactionForDeposit(depositRecord, retries);
- });
+ },
+ );
}
case TransactionType.Refund: {
- return await ws.db
- .mktx((x) => [x.refundGroups, x.contractTerms, x.purchases])
- .runReadOnly(async (tx) => {
+ return await ws.db.runReadOnlyTx(
+ ["refundGroups", "purchases", "operationRetries", "contractTerms"],
+ async (tx) => {
const refundRecord = await tx.refundGroups.get(
parsedTx.refundGroupId,
);
@@ -365,12 +368,13 @@ export async function getTransactionById(
refundRecord?.proposalId,
);
return buildTransactionForRefund(refundRecord, contractData);
- });
+ },
+ );
}
case TransactionType.PeerPullDebit: {
- return await ws.db
- .mktx((x) => [x.peerPullDebit, x.contractTerms])
- .runReadWrite(async (tx) => {
+ return await ws.db.runReadWriteTx(
+ ["peerPullDebit", "contractTerms"],
+ async (tx) => {
const debit = await tx.peerPullDebit.get(parsedTx.peerPullDebitId);
if (!debit) throw Error("not found");
const contractTermsRec = await tx.contractTerms.get(
@@ -382,13 +386,14 @@ export async function getTransactionById(
debit,
contractTermsRec.contractTermsRaw,
);
- });
+ },
+ );
}
case TransactionType.PeerPushDebit: {
- return await ws.db
- .mktx((x) => [x.peerPushDebit, x.contractTerms])
- .runReadWrite(async (tx) => {
+ return await ws.db.runReadWriteTx(
+ ["peerPushDebit", "contractTerms"],
+ async (tx) => {
const debit = await tx.peerPushDebit.get(parsedTx.pursePub);
if (!debit) throw Error("not found");
const ct = await tx.contractTerms.get(debit.contractTermsHash);
@@ -397,19 +402,20 @@ export async function getTransactionById(
debit,
ct.contractTermsRaw,
);
- });
+ },
+ );
}
case TransactionType.PeerPushCredit: {
const peerPushCreditId = parsedTx.peerPushCreditId;
- return await ws.db
- .mktx((x) => [
- x.peerPushCredit,
- x.contractTerms,
- x.withdrawalGroups,
- x.operationRetries,
- ])
- .runReadWrite(async (tx) => {
+ return await ws.db.runReadWriteTx(
+ [
+ "peerPushCredit",
+ "contractTerms",
+ "withdrawalGroups",
+ "operationRetries",
+ ],
+ async (tx) => {
const pushInc = await tx.peerPushCredit.get(peerPushCreditId);
if (!pushInc) throw Error("not found");
const ct = await tx.contractTerms.get(pushInc.contractTermsHash);
@@ -434,19 +440,20 @@ export async function getTransactionById(
wg,
wgOrt,
);
- });
+ },
+ );
}
case TransactionType.PeerPullCredit: {
const pursePub = parsedTx.pursePub;
- return await ws.db
- .mktx((x) => [
- x.peerPullCredit,
- x.contractTerms,
- x.withdrawalGroups,
- x.operationRetries,
- ])
- .runReadWrite(async (tx) => {
+ return await ws.db.runReadWriteTx(
+ [
+ "peerPullCredit",
+ "contractTerms",
+ "withdrawalGroups",
+ "operationRetries",
+ ],
+ async (tx) => {
const pushInc = await tx.peerPullCredit.get(pursePub);
if (!pushInc) throw Error("not found");
const ct = await tx.contractTerms.get(pushInc.contractTermsHash);
@@ -472,7 +479,8 @@ export async function getTransactionById(
wg,
wgOrt,
);
- });
+ },
+ );
}
}
}
@@ -941,10 +949,7 @@ function buildTransactionForTip(
}
async function lookupMaybeContractData(
- tx: GetReadOnlyAccess<{
- purchases: typeof WalletStoresV1.purchases;
- contractTerms: typeof WalletStoresV1.contractTerms;
- }>,
+ tx: WalletDbReadOnlyTransaction<["purchases", "contractTerms"]>,
proposalId: string,
): Promise<WalletContractData | undefined> {
let contractData: WalletContractData | undefined = undefined;
@@ -1037,14 +1042,9 @@ export async function getWithdrawalTransactionByUri(
ws: InternalWalletState,
request: WithdrawalTransactionByURIRequest,
): Promise<TransactionWithdrawal | undefined> {
- return await ws.db
- .mktx((x) => [
- x.withdrawalGroups,
- x.exchangeDetails,
- x.exchanges,
- x.operationRetries,
- ])
- .runReadWrite(async (tx) => {
+ return await ws.db.runReadWriteTx(
+ ["withdrawalGroups", "exchangeDetails", "exchanges", "operationRetries"],
+ async (tx) => {
const withdrawalGroupRecord =
await tx.withdrawalGroups.indexes.byTalerWithdrawUri.get(
request.talerWithdrawUri,
@@ -1077,7 +1077,8 @@ export async function getWithdrawalTransactionByUri(
exchangeDetails,
ort,
);
- });
+ },
+ );
}
/**
@@ -1094,29 +1095,29 @@ export async function getTransactions(
filter.onlyState = transactionsRequest.filterByState;
}
- await ws.db
- .mktx((x) => [
- x.coins,
- x.denominations,
- x.depositGroups,
- x.exchangeDetails,
- x.exchanges,
- x.operationRetries,
- x.peerPullDebit,
- x.peerPushDebit,
- x.peerPushCredit,
- x.peerPullCredit,
- x.planchets,
- x.purchases,
- x.contractTerms,
- x.recoupGroups,
- x.rewards,
- x.tombstones,
- x.withdrawalGroups,
- x.refreshGroups,
- x.refundGroups,
- ])
- .runReadOnly(async (tx) => {
+ await ws.db.runReadOnlyTx(
+ [
+ "coins",
+ "denominations",
+ "depositGroups",
+ "exchangeDetails",
+ "exchanges",
+ "operationRetries",
+ "peerPullDebit",
+ "peerPushDebit",
+ "peerPushCredit",
+ "peerPullCredit",
+ "planchets",
+ "purchases",
+ "contractTerms",
+ "recoupGroups",
+ "rewards",
+ "tombstones",
+ "withdrawalGroups",
+ "refreshGroups",
+ "refundGroups",
+ ],
+ async (tx) => {
await iterRecordsForPeerPushDebit(tx, filter, async (pi) => {
const amount = Amounts.parseOrThrow(pi.amount);
const exchangesInTx = [pi.exchangeBaseUrl];
@@ -1463,7 +1464,8 @@ export async function getTransactions(
transactions.push(buildTransactionForTip(tipRecord, retryRecord));
});
//ends REMOVE REWARDS
- });
+ },
+ );
// One-off checks, because of a bug where the wallet previously
// did not migrate the DB correctly and caused these amounts
@@ -1829,9 +1831,7 @@ export function notifyTransition(
* Iterate refresh records based on a filter.
*/
async function iterRecordsForRefresh(
- tx: GetReadOnlyAccess<{
- refreshGroups: typeof WalletStoresV1.refreshGroups;
- }>,
+ tx: WalletDbReadOnlyTransaction<["refreshGroups"]>,
filter: TransactionRecordFilter,
f: (r: RefreshGroupRecord) => Promise<void>,
): Promise<void> {
@@ -1852,9 +1852,7 @@ async function iterRecordsForRefresh(
}
async function iterRecordsForWithdrawal(
- tx: GetReadOnlyAccess<{
- withdrawalGroups: typeof WalletStoresV1.withdrawalGroups;
- }>,
+ tx: WalletDbReadOnlyTransaction<["withdrawalGroups"]>,
filter: TransactionRecordFilter,
f: (r: WithdrawalGroupRecord) => Promise<void>,
): Promise<void> {
@@ -1876,9 +1874,7 @@ async function iterRecordsForWithdrawal(
}
async function iterRecordsForDeposit(
- tx: GetReadOnlyAccess<{
- depositGroups: typeof WalletStoresV1.depositGroups;
- }>,
+ tx: WalletDbReadOnlyTransaction<["depositGroups"]>,
filter: TransactionRecordFilter,
f: (r: DepositGroupRecord) => Promise<void>,
): Promise<void> {
@@ -1899,9 +1895,7 @@ async function iterRecordsForDeposit(
}
async function iterRecordsForReward(
- tx: GetReadOnlyAccess<{
- rewards: typeof WalletStoresV1.rewards;
- }>,
+ tx: WalletDbReadOnlyTransaction<["rewards"]>,
filter: TransactionRecordFilter,
f: (r: RewardRecord) => Promise<void>,
): Promise<void> {
@@ -1917,9 +1911,7 @@ async function iterRecordsForReward(
}
async function iterRecordsForRefund(
- tx: GetReadOnlyAccess<{
- refundGroups: typeof WalletStoresV1.refundGroups;
- }>,
+ tx: WalletDbReadOnlyTransaction<["refundGroups"]>,
filter: TransactionRecordFilter,
f: (r: RefundGroupRecord) => Promise<void>,
): Promise<void> {
@@ -1935,9 +1927,7 @@ async function iterRecordsForRefund(
}
async function iterRecordsForPurchase(
- tx: GetReadOnlyAccess<{
- purchases: typeof WalletStoresV1.purchases;
- }>,
+ tx: WalletDbReadOnlyTransaction<["purchases"]>,
filter: TransactionRecordFilter,
f: (r: PurchaseRecord) => Promise<void>,
): Promise<void> {
@@ -1953,9 +1943,7 @@ async function iterRecordsForPurchase(
}
async function iterRecordsForPeerPullCredit(
- tx: GetReadOnlyAccess<{
- peerPullCredit: typeof WalletStoresV1.peerPullCredit;
- }>,
+ tx: WalletDbReadOnlyTransaction<["peerPullCredit"]>,
filter: TransactionRecordFilter,
f: (r: PeerPullCreditRecord) => Promise<void>,
): Promise<void> {
@@ -1971,9 +1959,7 @@ async function iterRecordsForPeerPullCredit(
}
async function iterRecordsForPeerPullDebit(
- tx: GetReadOnlyAccess<{
- peerPullDebit: typeof WalletStoresV1.peerPullDebit;
- }>,
+ tx: WalletDbReadOnlyTransaction<["peerPullDebit"]>,
filter: TransactionRecordFilter,
f: (r: PeerPullPaymentIncomingRecord) => Promise<void>,
): Promise<void> {
@@ -1989,9 +1975,7 @@ async function iterRecordsForPeerPullDebit(
}
async function iterRecordsForPeerPushDebit(
- tx: GetReadOnlyAccess<{
- peerPushDebit: typeof WalletStoresV1.peerPushDebit;
- }>,
+ tx: WalletDbReadOnlyTransaction<["peerPushDebit"]>,
filter: TransactionRecordFilter,
f: (r: PeerPushDebitRecord) => Promise<void>,
): Promise<void> {
@@ -2007,9 +1991,7 @@ async function iterRecordsForPeerPushDebit(
}
async function iterRecordsForPeerPushCredit(
- tx: GetReadOnlyAccess<{
- peerPushCredit: typeof WalletStoresV1.peerPushCredit;
- }>,
+ tx: WalletDbReadOnlyTransaction<["peerPushCredit"]>,
filter: TransactionRecordFilter,
f: (r: PeerPushPaymentIncomingRecord) => Promise<void>,
): Promise<void> {
diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts
index 4d9996cf4..542868de0 100644
--- a/packages/taler-wallet-core/src/operations/withdraw.ts
+++ b/packages/taler-wallet-core/src/operations/withdraw.ts
@@ -100,7 +100,12 @@ import {
WithdrawalGroupStatus,
WithdrawalRecordType,
} from "../db.js";
-import { isWithdrawableDenom, timestampPreciseToDb } from "../index.js";
+import {
+ WalletDbReadOnlyTransaction,
+ WalletDbReadWriteTransaction,
+ isWithdrawableDenom,
+ timestampPreciseToDb,
+} from "../index.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import {
TaskRunResult,
@@ -118,11 +123,7 @@ import {
selectWithdrawalDenominations,
} from "../util/coinSelection.js";
import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js";
-import {
- DbAccess,
- GetReadOnlyAccess,
- GetReadWriteAccess,
-} from "../util/query.js";
+import { DbAccess } from "../util/query.js";
import {
WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,
WALLET_EXCHANGE_PROTOCOL_VERSION,
@@ -166,9 +167,9 @@ export class WithdrawTransactionContext implements TransactionContext {
async deleteTransaction(): Promise<void> {
const { ws, withdrawalGroupId } = this;
- await ws.db
- .mktx((x) => [x.withdrawalGroups, x.tombstones])
- .runReadWrite(async (tx) => {
+ await ws.db.runReadWriteTx(
+ ["withdrawalGroups", "tombstones"],
+ async (tx) => {
const withdrawalGroupRecord =
await tx.withdrawalGroups.get(withdrawalGroupId);
if (withdrawalGroupRecord) {
@@ -178,14 +179,15 @@ export class WithdrawTransactionContext implements TransactionContext {
});
return;
}
- });
+ },
+ );
}
async suspendTransaction(): Promise<void> {
const { ws, withdrawalGroupId, transactionId, taskId } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.withdrawalGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["withdrawalGroups"],
+ async (tx) => {
const wg = await tx.withdrawalGroups.get(withdrawalGroupId);
if (!wg) {
logger.warn(`withdrawal group ${withdrawalGroupId} not found`);
@@ -230,16 +232,17 @@ export class WithdrawTransactionContext implements TransactionContext {
};
}
return undefined;
- });
+ },
+ );
ws.taskScheduler.stopShepherdTask(taskId);
notifyTransition(ws, transactionId, transitionInfo);
}
async abortTransaction(): Promise<void> {
const { ws, withdrawalGroupId, transactionId, taskId } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.withdrawalGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["withdrawalGroups"],
+ async (tx) => {
const wg = await tx.withdrawalGroups.get(withdrawalGroupId);
if (!wg) {
logger.warn(`withdrawal group ${withdrawalGroupId} not found`);
@@ -276,7 +279,6 @@ export class WithdrawTransactionContext implements TransactionContext {
case WithdrawalGroupStatus.FailedAbortingBank:
// Not allowed
throw Error("abort not allowed in current state");
- break;
default:
assertUnreachable(wg.status);
}
@@ -291,7 +293,8 @@ export class WithdrawTransactionContext implements TransactionContext {
};
}
return undefined;
- });
+ },
+ );
ws.taskScheduler.stopShepherdTask(taskId);
notifyTransition(ws, transactionId, transitionInfo);
ws.taskScheduler.startShepherdTask(taskId);
@@ -299,9 +302,9 @@ export class WithdrawTransactionContext implements TransactionContext {
async resumeTransaction(): Promise<void> {
const { ws, withdrawalGroupId, transactionId, taskId: retryTag } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.withdrawalGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["withdrawalGroups"],
+ async (tx) => {
const wg = await tx.withdrawalGroups.get(withdrawalGroupId);
if (!wg) {
logger.warn(`withdrawal group ${withdrawalGroupId} not found`);
@@ -346,16 +349,17 @@ export class WithdrawTransactionContext implements TransactionContext {
};
}
return undefined;
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
ws.taskScheduler.startShepherdTask(retryTag);
}
async failTransaction(): Promise<void> {
const { ws, withdrawalGroupId, transactionId, taskId: retryTag } = this;
- const stateUpdate = await ws.db
- .mktx((x) => [x.withdrawalGroups])
- .runReadWrite(async (tx) => {
+ const stateUpdate = await ws.db.runReadWriteTx(
+ ["withdrawalGroups"],
+ async (tx) => {
const wg = await tx.withdrawalGroups.get(withdrawalGroupId);
if (!wg) {
logger.warn(`withdrawal group ${withdrawalGroupId} not found`);
@@ -381,13 +385,18 @@ export class WithdrawTransactionContext implements TransactionContext {
};
}
return undefined;
- });
+ },
+ );
ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, stateUpdate);
ws.taskScheduler.startShepherdTask(retryTag);
}
}
+/**
+ * Compute the DD37 transaction state of a withdrawal transaction
+ * from the database's withdrawal group record.
+ */
export function computeWithdrawalTransactionStatus(
wgRecord: WithdrawalGroupRecord,
): TransactionState {
@@ -494,6 +503,10 @@ export function computeWithdrawalTransactionStatus(
}
}
+/**
+ * Compute DD37 transaction actions for a withdrawal transaction
+ * based on the database's withdrawal group record.
+ */
export function computeWithdrawalTransactionActions(
wgRecord: WithdrawalGroupRecord,
): TransactionAction[] {
@@ -598,21 +611,19 @@ export async function getBankWithdrawalInfo(
/**
* Return denominations that can potentially used for a withdrawal.
*/
-export async function getCandidateWithdrawalDenoms(
+async function getCandidateWithdrawalDenoms(
ws: InternalWalletState,
exchangeBaseUrl: string,
currency: string,
): Promise<DenominationRecord[]> {
- return await ws.db
- .mktx((x) => [x.denominations])
- .runReadOnly(async (tx) => {
- return getCandidateWithdrawalDenomsTx(ws, tx, exchangeBaseUrl, currency);
- });
+ return await ws.db.runReadOnlyTx(["denominations"], async (tx) => {
+ return getCandidateWithdrawalDenomsTx(ws, tx, exchangeBaseUrl, currency);
+ });
}
export async function getCandidateWithdrawalDenomsTx(
ws: InternalWalletState,
- tx: GetReadOnlyAccess<{ denominations: typeof WalletStoresV1.denominations }>,
+ tx: WalletDbReadOnlyTransaction<["denominations"]>,
exchangeBaseUrl: string,
currency: string,
): Promise<DenominationRecord[]> {
@@ -636,14 +647,12 @@ async function processPlanchetGenerate(
withdrawalGroup: WithdrawalGroupRecord,
coinIdx: number,
): Promise<void> {
- let planchet = await ws.db
- .mktx((x) => [x.planchets])
- .runReadOnly(async (tx) => {
- return tx.planchets.indexes.byGroupAndIndex.get([
- withdrawalGroup.withdrawalGroupId,
- coinIdx,
- ]);
- });
+ let planchet = await ws.db.runReadOnlyTx(["planchets"], async (tx) => {
+ return tx.planchets.indexes.byGroupAndIndex.get([
+ withdrawalGroup.withdrawalGroupId,
+ coinIdx,
+ ]);
+ });
if (planchet) {
return;
}
@@ -662,16 +671,14 @@ async function processPlanchetGenerate(
}
const denomPubHash = maybeDenomPubHash;
- const denom = await ws.db
- .mktx((x) => [x.denominations])
- .runReadOnly(async (tx) => {
- return ws.getDenomInfo(
- ws,
- tx,
- withdrawalGroup.exchangeBaseUrl,
- denomPubHash,
- );
- });
+ const denom = await ws.db.runReadOnlyTx(["denominations"], async (tx) => {
+ return ws.getDenomInfo(
+ ws,
+ tx,
+ withdrawalGroup.exchangeBaseUrl,
+ denomPubHash,
+ );
+ });
checkDbInvariant(!!denom);
const r = await ws.cryptoApi.createPlanchet({
denomPub: denom.denomPub,
@@ -697,20 +704,18 @@ async function processPlanchetGenerate(
ageCommitmentProof: r.ageCommitmentProof,
lastError: undefined,
};
- await ws.db
- .mktx((x) => [x.planchets])
- .runReadWrite(async (tx) => {
- const p = await tx.planchets.indexes.byGroupAndIndex.get([
- withdrawalGroup.withdrawalGroupId,
- coinIdx,
- ]);
- if (p) {
- planchet = p;
- return;
- }
- await tx.planchets.put(newPlanchet);
- planchet = newPlanchet;
- });
+ await ws.db.runReadWriteTx(["planchets"], async (tx) => {
+ const p = await tx.planchets.indexes.byGroupAndIndex.get([
+ withdrawalGroup.withdrawalGroupId,
+ coinIdx,
+ ]);
+ if (p) {
+ planchet = p;
+ return;
+ }
+ await tx.planchets.put(newPlanchet);
+ planchet = newPlanchet;
+ });
}
interface WithdrawalRequestBatchArgs {
@@ -744,9 +749,9 @@ async function transitionKycUrlUpdate(
const ctx = new WithdrawTransactionContext(ws, withdrawalGroupId);
const transactionId = ctx.transactionId;
- const transitionInfo = await ws.db
- .mktx((x) => [x.planchets, x.withdrawalGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["withdrawalGroups"],
+ async (tx) => {
const wg2 = await tx.withdrawalGroups.get(withdrawalGroupId);
if (!wg2) {
return;
@@ -766,7 +771,8 @@ async function transitionKycUrlUpdate(
default:
return undefined;
}
- });
+ },
+ );
if (transitionInfo) {
// Always notify, even on self-transition, as the KYC URL might have changed.
ws.notify({
@@ -814,7 +820,7 @@ async function handleKycRequired(
let amlStatus: AmlStatus | undefined;
if (
kycStatusRes.status === HttpStatusCode.Ok ||
- //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
+ // FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
// remove after the exchange is fixed or clarified
kycStatusRes.status === HttpStatusCode.NoContent
) {
@@ -836,9 +842,9 @@ async function handleKycRequired(
let notificationKycUrl: string | undefined = undefined;
- const transitionInfo = await ws.db
- .mktx((x) => [x.planchets, x.withdrawalGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["planchets", "withdrawalGroups"],
+ async (tx) => {
for (let i = startIdx; i < requestCoinIdxs.length; i++) {
let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
withdrawalGroup.withdrawalGroupId,
@@ -885,7 +891,8 @@ async function handleKycRequired(
default:
return undefined;
}
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo, notificationKycUrl);
}
@@ -908,52 +915,45 @@ async function processPlanchetExchangeBatchRequest(
// Indices of coins that are included in the batch request
const requestCoinIdxs: number[] = [];
- await ws.db
- .mktx((x) => [
- x.withdrawalGroups,
- x.planchets,
- x.exchanges,
- x.denominations,
- ])
- .runReadOnly(async (tx) => {
- for (
- let coinIdx = args.coinStartIndex;
- coinIdx < args.coinStartIndex + args.batchSize &&
- coinIdx < wgContext.numPlanchets;
- coinIdx++
- ) {
- let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
- withdrawalGroup.withdrawalGroupId,
- coinIdx,
- ]);
- if (!planchet) {
- continue;
- }
- if (planchet.planchetStatus === PlanchetStatus.WithdrawalDone) {
- logger.warn("processPlanchet: planchet already withdrawn");
- continue;
- }
- const denom = await ws.getDenomInfo(
- ws,
- tx,
- withdrawalGroup.exchangeBaseUrl,
- planchet.denomPubHash,
- );
-
- if (!denom) {
- logger.error("db inconsistent: denom for planchet not found");
- continue;
- }
+ await ws.db.runReadOnlyTx(["planchets", "denominations"], async (tx) => {
+ for (
+ let coinIdx = args.coinStartIndex;
+ coinIdx < args.coinStartIndex + args.batchSize &&
+ coinIdx < wgContext.numPlanchets;
+ coinIdx++
+ ) {
+ let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
+ withdrawalGroup.withdrawalGroupId,
+ coinIdx,
+ ]);
+ if (!planchet) {
+ continue;
+ }
+ if (planchet.planchetStatus === PlanchetStatus.WithdrawalDone) {
+ logger.warn("processPlanchet: planchet already withdrawn");
+ continue;
+ }
+ const denom = await ws.getDenomInfo(
+ ws,
+ tx,
+ withdrawalGroup.exchangeBaseUrl,
+ planchet.denomPubHash,
+ );
- const planchetReq: ExchangeWithdrawRequest = {
- denom_pub_hash: planchet.denomPubHash,
- reserve_sig: planchet.withdrawSig,
- coin_ev: planchet.coinEv,
- };
- batchReq.planchets.push(planchetReq);
- requestCoinIdxs.push(coinIdx);
+ if (!denom) {
+ logger.error("db inconsistent: denom for planchet not found");
+ continue;
}
- });
+
+ const planchetReq: ExchangeWithdrawRequest = {
+ denom_pub_hash: planchet.denomPubHash,
+ reserve_sig: planchet.withdrawSig,
+ coin_ev: planchet.coinEv,
+ };
+ batchReq.planchets.push(planchetReq);
+ requestCoinIdxs.push(coinIdx);
+ }
+ });
if (batchReq.planchets.length == 0) {
logger.warn("empty withdrawal batch");
@@ -967,19 +967,17 @@ async function processPlanchetExchangeBatchRequest(
const errDetail = getErrorDetailFromException(e);
logger.trace("withdrawal request failed", e);
logger.trace(String(e));
- await ws.db
- .mktx((x) => [x.planchets])
- .runReadWrite(async (tx) => {
- let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
- withdrawalGroup.withdrawalGroupId,
- coinIdx,
- ]);
- if (!planchet) {
- return;
- }
- planchet.lastError = errDetail;
- await tx.planchets.put(planchet);
- });
+ await ws.db.runReadWriteTx(["planchets"], async (tx) => {
+ let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
+ withdrawalGroup.withdrawalGroupId,
+ coinIdx,
+ ]);
+ if (!planchet) {
+ return;
+ }
+ planchet.lastError = errDetail;
+ await tx.planchets.put(planchet);
+ });
}
// FIXME: handle individual error codes better!
@@ -1026,9 +1024,9 @@ async function processPlanchetVerifyAndStoreCoin(
): Promise<void> {
const withdrawalGroup = wgContext.wgRecord;
logger.trace(`checking and storing planchet idx=${coinIdx}`);
- const d = await ws.db
- .mktx((x) => [x.withdrawalGroups, x.planchets, x.denominations])
- .runReadOnly(async (tx) => {
+ const d = await ws.db.runReadOnlyTx(
+ ["planchets", "denominations"],
+ async (tx) => {
let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
withdrawalGroup.withdrawalGroupId,
coinIdx,
@@ -1054,7 +1052,8 @@ async function processPlanchetVerifyAndStoreCoin(
denomInfo,
exchangeBaseUrl: withdrawalGroup.exchangeBaseUrl,
};
- });
+ },
+ );
if (!d) {
return;
@@ -1090,23 +1089,21 @@ async function processPlanchetVerifyAndStoreCoin(
});
if (!isValid) {
- await ws.db
- .mktx((x) => [x.planchets])
- .runReadWrite(async (tx) => {
- let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
- withdrawalGroup.withdrawalGroupId,
- coinIdx,
- ]);
- if (!planchet) {
- return;
- }
- planchet.lastError = makeErrorDetail(
- TalerErrorCode.WALLET_EXCHANGE_COIN_SIGNATURE_INVALID,
- {},
- "invalid signature from the exchange after unblinding",
- );
- await tx.planchets.put(planchet);
- });
+ await ws.db.runReadWriteTx(["planchets"], async (tx) => {
+ let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
+ withdrawalGroup.withdrawalGroupId,
+ coinIdx,
+ ]);
+ if (!planchet) {
+ return;
+ }
+ planchet.lastError = makeErrorDetail(
+ TalerErrorCode.WALLET_EXCHANGE_COIN_SIGNATURE_INVALID,
+ {},
+ "invalid signature from the exchange after unblinding",
+ );
+ await tx.planchets.put(planchet);
+ });
return;
}
@@ -1145,46 +1142,38 @@ async function processPlanchetVerifyAndStoreCoin(
wgContext.planchetsFinished.add(planchet.coinPub);
- // Check if this is the first time that the whole
- // withdrawal succeeded. If so, mark the withdrawal
- // group as finished.
- const success = await ws.db
- .mktx((x) => [
- x.coins,
- x.denominations,
- x.coinAvailability,
- x.withdrawalGroups,
- x.planchets,
- ])
- .runReadWrite(async (tx) => {
+ await ws.db.runReadWriteTx(
+ ["planchets", "coins", "coinAvailability", "denominations"],
+ async (tx) => {
const p = await tx.planchets.get(planchetCoinPub);
if (!p || p.planchetStatus === PlanchetStatus.WithdrawalDone) {
- return false;
+ return;
}
p.planchetStatus = PlanchetStatus.WithdrawalDone;
p.lastError = undefined;
await tx.planchets.put(p);
await makeCoinAvailable(ws, tx, coin);
- return true;
- });
+ },
+ );
}
/**
* Make sure that denominations that currently can be used for withdrawal
* are validated, and the result of validation is stored in the database.
*/
-export async function updateWithdrawalDenoms(
+async function updateWithdrawalDenoms(
ws: InternalWalletState,
exchangeBaseUrl: string,
): Promise<void> {
logger.trace(
`updating denominations used for withdrawal for ${exchangeBaseUrl}`,
);
- const exchangeDetails = await ws.db
- .mktx((x) => [x.exchanges, x.exchangeDetails])
- .runReadOnly(async (tx) => {
+ const exchangeDetails = await ws.db.runReadOnlyTx(
+ ["exchanges", "exchangeDetails"],
+ async (tx) => {
return getExchangeWireDetailsInTx(tx, exchangeBaseUrl);
- });
+ },
+ );
if (!exchangeDetails) {
logger.error("exchange details not available");
throw Error(`exchange ${exchangeBaseUrl} details not available`);
@@ -1243,14 +1232,12 @@ export async function updateWithdrawalDenoms(
}
if (updatedDenominations.length > 0) {
logger.trace("writing denomination batch to db");
- await ws.db
- .mktx((x) => [x.denominations])
- .runReadWrite(async (tx) => {
- for (let i = 0; i < updatedDenominations.length; i++) {
- const denom = updatedDenominations[i];
- await tx.denominations.put(denom);
- }
- });
+ await ws.db.runReadWriteTx(["denominations"], async (tx) => {
+ for (let i = 0; i < updatedDenominations.length; i++) {
+ const denom = updatedDenominations[i];
+ await tx.denominations.put(denom);
+ }
+ });
logger.trace("done with DB write");
}
}
@@ -1315,9 +1302,9 @@ async function queryReserve(
logger.trace(`got reserve status ${j2s(result.response)}`);
- const transitionResult = await ws.db
- .mktx((x) => [x.withdrawalGroups])
- .runReadWrite(async (tx) => {
+ const transitionResult = await ws.db.runReadWriteTx(
+ ["withdrawalGroups"],
+ async (tx) => {
const wg = await tx.withdrawalGroups.get(withdrawalGroupId);
if (!wg) {
logger.warn(`withdrawal group ${withdrawalGroupId} not found`);
@@ -1332,7 +1319,8 @@ async function queryReserve(
oldTxState: txStateOld,
newTxState: txStateNew,
};
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionResult);
@@ -1376,9 +1364,9 @@ async function processWithdrawalGroupAbortingBank(
});
logger.info(`abort response status: ${abortResp.status}`);
- const transitionInfo = await ws.db
- .mktx((x) => [x.withdrawalGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["withdrawalGroups"],
+ async (tx) => {
const wg = await tx.withdrawalGroups.get(withdrawalGroupId);
if (!wg) {
return undefined;
@@ -1392,7 +1380,8 @@ async function processWithdrawalGroupAbortingBank(
oldTxState: txStatusOld,
newTxState: txStatusNew,
};
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
return TaskRunResult.finished();
}
@@ -1409,9 +1398,9 @@ async function transitionKycSatisfied(
tag: TransactionType.Withdrawal,
withdrawalGroupId: withdrawalGroup.withdrawalGroupId,
});
- const transitionInfo = await ws.db
- .mktx((x) => [x.withdrawalGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["withdrawalGroups"],
+ async (tx) => {
const wg2 = await tx.withdrawalGroups.get(
withdrawalGroup.withdrawalGroupId,
);
@@ -1434,7 +1423,8 @@ async function transitionKycSatisfied(
default:
return undefined;
}
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
}
@@ -1502,9 +1492,9 @@ async function processWithdrawalGroupPendingReady(
if (withdrawalGroup.denomsSel.selectedDenoms.length === 0) {
logger.warn("Finishing empty withdrawal group (no denoms)");
- const transitionInfo = await ws.db
- .mktx((x) => [x.withdrawalGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["withdrawalGroups"],
+ async (tx) => {
const wg = await tx.withdrawalGroups.get(withdrawalGroupId);
if (!wg) {
return undefined;
@@ -1518,7 +1508,8 @@ async function processWithdrawalGroupPendingReady(
oldTxState: txStatusOld,
newTxState: txStatusNew,
};
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
return TaskRunResult.finished();
}
@@ -1533,17 +1524,15 @@ async function processWithdrawalGroupPendingReady(
wgRecord: withdrawalGroup,
};
- await ws.db
- .mktx((x) => [x.planchets])
- .runReadOnly(async (tx) => {
- const planchets =
- await tx.planchets.indexes.byGroup.getAll(withdrawalGroupId);
- for (const p of planchets) {
- if (p.planchetStatus === PlanchetStatus.WithdrawalDone) {
- wgContext.planchetsFinished.add(p.coinPub);
- }
+ await ws.db.runReadOnlyTx(["planchets"], async (tx) => {
+ const planchets =
+ await tx.planchets.indexes.byGroup.getAll(withdrawalGroupId);
+ for (const p of planchets) {
+ if (p.planchetStatus === PlanchetStatus.WithdrawalDone) {
+ wgContext.planchetsFinished.add(p.coinPub);
}
- });
+ }
+ });
// We sequentially generate planchets, so that
// large withdrawal groups don't make the wallet unresponsive.
@@ -1582,9 +1571,9 @@ async function processWithdrawalGroupPendingReady(
let numPlanchetErrors = 0;
const maxReportedErrors = 5;
- const res = await ws.db
- .mktx((x) => [x.coins, x.coinAvailability, x.withdrawalGroups, x.planchets])
- .runReadWrite(async (tx) => {
+ const res = await ws.db.runReadWriteTx(
+ ["coins", "coinAvailability", "withdrawalGroups", "planchets"],
+ async (tx) => {
const wg = await tx.withdrawalGroups.get(withdrawalGroupId);
if (!wg) {
return;
@@ -1621,7 +1610,8 @@ async function processWithdrawalGroupPendingReady(
newTxState,
},
};
- });
+ },
+ );
if (!res) {
throw Error("withdrawal group does not exist anymore");
@@ -1655,11 +1645,12 @@ export async function processWithdrawalGroup(
cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
logger.trace("processing withdrawal group", withdrawalGroupId);
- const withdrawalGroup = await ws.db
- .mktx((x) => [x.withdrawalGroups])
- .runReadOnly(async (tx) => {
+ const withdrawalGroup = await ws.db.runReadOnlyTx(
+ ["withdrawalGroups"],
+ async (tx) => {
return tx.withdrawalGroups.get(withdrawalGroupId);
- });
+ },
+ );
if (!withdrawalGroup) {
throw Error(`withdrawal group ${withdrawalGroupId} not found`);
@@ -1779,11 +1770,9 @@ export async function getExchangeWithdrawalInfo(
for (let i = 0; i < selectedDenoms.selectedDenoms.length; i++) {
const ds = selectedDenoms.selectedDenoms[i];
// FIXME: Do in one transaction!
- const denom = await ws.db
- .mktx((x) => [x.denominations])
- .runReadOnly(async (tx) => {
- return ws.getDenomInfo(ws, tx, exchangeBaseUrl, ds.denomPubHash);
- });
+ const denom = await ws.db.runReadOnlyTx(["denominations"], async (tx) => {
+ return ws.getDenomInfo(ws, tx, exchangeBaseUrl, ds.denomPubHash);
+ });
checkDbInvariant(!!denom);
hasDenomWithAgeRestriction =
hasDenomWithAgeRestriction || denom.denomPub.age_mask > 0;
@@ -1979,11 +1968,9 @@ export function augmentPaytoUrisForWithdrawal(
* Get payto URIs that can be used to fund a withdrawal operation.
*/
export async function getFundingPaytoUris(
- tx: GetReadOnlyAccess<{
- withdrawalGroups: typeof WalletStoresV1.withdrawalGroups;
- exchanges: typeof WalletStoresV1.exchanges;
- exchangeDetails: typeof WalletStoresV1.exchangeDetails;
- }>,
+ tx: WalletDbReadOnlyTransaction<
+ ["withdrawalGroups", "exchanges", "exchangeDetails"]
+ >,
withdrawalGroupId: string,
): Promise<string[]> {
const withdrawalGroup = await tx.withdrawalGroups.get(withdrawalGroupId);
@@ -2017,11 +2004,9 @@ async function getWithdrawalGroupRecordTx(
withdrawalGroupId: string;
},
): Promise<WithdrawalGroupRecord | undefined> {
- return await db
- .mktx((x) => [x.withdrawalGroups])
- .runReadOnly(async (tx) => {
- return tx.withdrawalGroups.get(req.withdrawalGroupId);
- });
+ return await db.runReadOnlyTx(["withdrawalGroups"], async (tx) => {
+ return tx.withdrawalGroups.get(req.withdrawalGroupId);
+ });
}
export function getReserveRequestTimeout(r: WithdrawalGroupRecord): Duration {
@@ -2056,11 +2041,12 @@ async function registerReserveWithBank(
ws: InternalWalletState,
withdrawalGroupId: string,
): Promise<void> {
- const withdrawalGroup = await ws.db
- .mktx((x) => [x.withdrawalGroups])
- .runReadOnly(async (tx) => {
+ const withdrawalGroup = await ws.db.runReadOnlyTx(
+ ["withdrawalGroups"],
+ async (tx) => {
return await tx.withdrawalGroups.get(withdrawalGroupId);
- });
+ },
+ );
const transactionId = constructTransactionIdentifier({
tag: TransactionType.Withdrawal,
withdrawalGroupId,
@@ -2094,9 +2080,9 @@ async function registerReserveWithBank(
});
// FIXME: libeufin-bank currently doesn't return a response in the right format, so we don't validate at all.
await readSuccessResponseJsonOrThrow(httpResp, codecForAny());
- const transitionInfo = await ws.db
- .mktx((x) => [x.withdrawalGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["withdrawalGroups"],
+ async (tx) => {
const r = await tx.withdrawalGroups.get(withdrawalGroupId);
if (!r) {
return undefined;
@@ -2122,7 +2108,8 @@ async function registerReserveWithBank(
oldTxState,
newTxState,
};
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
}
@@ -2168,9 +2155,9 @@ async function processReserveBankStatus(
if (status.aborted) {
logger.info("bank aborted the withdrawal");
- const transitionInfo = await ws.db
- .mktx((x) => [x.withdrawalGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["withdrawalGroups"],
+ async (tx) => {
const r = await tx.withdrawalGroups.get(withdrawalGroupId);
if (!r) {
return;
@@ -2195,7 +2182,8 @@ async function processReserveBankStatus(
oldTxState,
newTxState,
};
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
return TaskRunResult.finished();
}
@@ -2212,9 +2200,9 @@ async function processReserveBankStatus(
return await processReserveBankStatus(ws, withdrawalGroupId);
}
- const transitionInfo = await ws.db
- .mktx((x) => [x.withdrawalGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["withdrawalGroups"],
+ async (tx) => {
const r = await tx.withdrawalGroups.get(withdrawalGroupId);
if (!r) {
return undefined;
@@ -2247,7 +2235,8 @@ async function processReserveBankStatus(
oldTxState,
newTxState,
};
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
@@ -2293,11 +2282,12 @@ export async function internalPrepareCreateWithdrawalGroup(
if (args.forcedWithdrawalGroupId) {
withdrawalGroupId = args.forcedWithdrawalGroupId;
const wgId = withdrawalGroupId;
- const existingWg = await ws.db
- .mktx((x) => [x.withdrawalGroups])
- .runReadOnly(async (tx) => {
+ const existingWg = await ws.db.runReadOnlyTx(
+ ["withdrawalGroups"],
+ async (tx) => {
return tx.withdrawalGroups.get(wgId);
- });
+ },
+ );
if (existingWg) {
const transactionId = constructTransactionIdentifier({
@@ -2384,11 +2374,9 @@ export interface PerformCreateWithdrawalGroupResult {
export async function internalPerformCreateWithdrawalGroup(
ws: InternalWalletState,
- tx: GetReadWriteAccess<{
- withdrawalGroups: typeof WalletStoresV1.withdrawalGroups;
- reserves: typeof WalletStoresV1.reserves;
- exchanges: typeof WalletStoresV1.exchanges;
- }>,
+ tx: WalletDbReadWriteTransaction<
+ ["withdrawalGroups", "reserves", "exchanges"]
+ >,
prep: PrepareCreateWithdrawalGroupResult,
): Promise<PerformCreateWithdrawalGroupResult> {
const { withdrawalGroup } = prep;
@@ -2477,16 +2465,12 @@ export async function internalCreateWithdrawalGroup(
tag: TransactionType.Withdrawal,
withdrawalGroupId: prep.withdrawalGroup.withdrawalGroupId,
});
- const res = await ws.db
- .mktx((x) => [
- x.withdrawalGroups,
- x.reserves,
- x.exchanges,
- x.exchangeDetails,
- ])
- .runReadWrite(async (tx) => {
+ const res = await ws.db.runReadWriteTx(
+ ["withdrawalGroups", "reserves", "exchanges", "exchangeDetails"],
+ async (tx) => {
return await internalPerformCreateWithdrawalGroup(ws, tx, prep);
- });
+ },
+ );
if (res.exchangeNotif) {
ws.notify(res.exchangeNotif);
}
@@ -2507,13 +2491,14 @@ export async function acceptWithdrawalFromUri(
logger.info(
`accepting withdrawal via ${req.talerWithdrawUri}, canonicalized selected exchange ${selectedExchange}`,
);
- const existingWithdrawalGroup = await ws.db
- .mktx((x) => [x.withdrawalGroups])
- .runReadOnly(async (tx) => {
+ const existingWithdrawalGroup = await ws.db.runReadOnlyTx(
+ ["withdrawalGroups"],
+ async (tx) => {
return await tx.withdrawalGroups.indexes.byTalerWithdrawUri.get(
req.talerWithdrawUri,
);
- });
+ },
+ );
if (existingWithdrawalGroup) {
let url: string | undefined;
@@ -2751,11 +2736,12 @@ export async function createManualWithdrawal(
const transactionId = ctx.transactionId;
- const exchangePaytoUris = await ws.db
- .mktx((x) => [x.withdrawalGroups, x.exchanges, x.exchangeDetails])
- .runReadOnly(async (tx) => {
+ const exchangePaytoUris = await ws.db.runReadOnlyTx(
+ ["withdrawalGroups", "exchanges", "exchangeDetails"],
+ async (tx) => {
return await getFundingPaytoUris(tx, withdrawalGroup.withdrawalGroupId);
- });
+ },
+ );
ws.taskScheduler.startShepherdTask(ctx.taskId);
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts
index a3735e78f..f9290cdd9 100644
--- a/packages/taler-wallet-core/src/shepherd.ts
+++ b/packages/taler-wallet-core/src/shepherd.ts
@@ -39,10 +39,9 @@ import {
} from "@gnu-taler/taler-util";
import { CryptoApiStoppedError } from "./crypto/workers/crypto-dispatcher.js";
import {
- GetReadOnlyAccess,
OPERATION_STATUS_ACTIVE_FIRST,
OPERATION_STATUS_ACTIVE_LAST,
- WalletStoresV1,
+ WalletDbAllStoresReadOnlyTransaction,
timestampAbsoluteFromDb,
} from "./index.js";
import { InternalWalletState } from "./internal-wallet-state.js";
@@ -214,12 +213,12 @@ export class TaskScheduler {
}
async resetTaskRetries(taskId: TaskId): Promise<void> {
- const maybeNotification = await this.ws.db
- .mktxAll()
- .runReadWrite(async (tx) => {
+ const maybeNotification = await this.ws.db.runAllStoresReadWriteTx(
+ async (tx) => {
await tx.operationRetries.delete(taskId);
return taskToRetryNotification(this.ws, tx, taskId, undefined);
- });
+ },
+ );
this.stopShepherdTask(taskId);
if (maybeNotification) {
this.ws.notify(maybeNotification);
@@ -338,7 +337,7 @@ async function storePendingTaskError(
e: TalerErrorDetail,
): Promise<void> {
logger.info(`storing pending task error for ${pendingTaskId}`);
- const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => {
+ const maybeNotification = await ws.db.runAllStoresReadWriteTx(async (tx) => {
let retryRecord = await tx.operationRetries.get(pendingTaskId);
if (!retryRecord) {
retryRecord = {
@@ -365,7 +364,7 @@ async function storeTaskProgress(
ws: InternalWalletState,
pendingTaskId: string,
): Promise<void> {
- await ws.db.mktxAll().runReadWrite(async (tx) => {
+ await ws.db.runReadWriteTx(["operationRetries"], async (tx) => {
await tx.operationRetries.delete(pendingTaskId);
});
}
@@ -374,7 +373,7 @@ async function storePendingTaskPending(
ws: InternalWalletState,
pendingTaskId: string,
): Promise<void> {
- const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => {
+ const maybeNotification = await ws.db.runAllStoresReadWriteTx(async (tx) => {
let retryRecord = await tx.operationRetries.get(pendingTaskId);
let hadError = false;
if (!retryRecord) {
@@ -405,11 +404,9 @@ async function storePendingTaskFinished(
ws: InternalWalletState,
pendingTaskId: string,
): Promise<void> {
- await ws.db
- .mktx((x) => [x.operationRetries])
- .runReadWrite(async (tx) => {
- await tx.operationRetries.delete(pendingTaskId);
- });
+ await ws.db.runReadWriteTx(["operationRetries"], async (tx) => {
+ await tx.operationRetries.delete(pendingTaskId);
+ });
}
async function runTaskWithErrorReporting(
@@ -570,7 +567,7 @@ async function callOperationHandlerForTaskId(
*/
async function taskToRetryNotification(
ws: InternalWalletState,
- tx: GetReadOnlyAccess<typeof WalletStoresV1>,
+ tx: WalletDbAllStoresReadOnlyTransaction,
pendingTaskId: string,
e: TalerErrorDetail | undefined,
): Promise<WalletNotification | undefined> {
@@ -597,7 +594,7 @@ async function taskToRetryNotification(
async function makeTransactionRetryNotification(
ws: InternalWalletState,
- tx: GetReadOnlyAccess<typeof WalletStoresV1>,
+ tx: WalletDbAllStoresReadOnlyTransaction,
pendingTaskId: string,
e: TalerErrorDetail | undefined,
): Promise<WalletNotification | undefined> {
@@ -626,7 +623,7 @@ async function makeTransactionRetryNotification(
async function makeExchangeRetryNotification(
ws: InternalWalletState,
- tx: GetReadOnlyAccess<typeof WalletStoresV1>,
+ tx: WalletDbAllStoresReadOnlyTransaction,
pendingTaskId: string,
e: TalerErrorDetail | undefined,
): Promise<WalletNotification | undefined> {
@@ -729,20 +726,20 @@ export async function getActiveTaskIds(
const res: ActiveTaskIdsResult = {
taskIds: [],
};
- await ws.db
- .mktx((x) => [
- x.exchanges,
- x.refreshGroups,
- x.withdrawalGroups,
- x.purchases,
- x.depositGroups,
- x.recoupGroups,
- x.peerPullCredit,
- x.peerPushDebit,
- x.peerPullDebit,
- x.peerPushCredit,
- ])
- .runReadWrite(async (tx) => {
+ await ws.db.runReadWriteTx(
+ [
+ "exchanges",
+ "refreshGroups",
+ "withdrawalGroups",
+ "purchases",
+ "depositGroups",
+ "recoupGroups",
+ "peerPullCredit",
+ "peerPushDebit",
+ "peerPullDebit",
+ "peerPushCredit",
+ ],
+ async (tx) => {
const active = GlobalIDB.KeyRange.bound(
OPERATION_STATUS_ACTIVE_FIRST,
OPERATION_STATUS_ACTIVE_LAST,
@@ -887,7 +884,8 @@ export async function getActiveTaskIds(
}
// FIXME: Recoup!
- });
+ },
+ );
return res;
}
diff --git a/packages/taler-wallet-core/src/util/coinSelection.ts b/packages/taler-wallet-core/src/util/coinSelection.ts
index be868867d..0f6316bce 100644
--- a/packages/taler-wallet-core/src/util/coinSelection.ts
+++ b/packages/taler-wallet-core/src/util/coinSelection.ts
@@ -58,7 +58,7 @@ import { DenominationRecord } from "../db.js";
import {
getExchangeWireDetailsInTx,
isWithdrawableDenom,
- WalletDbReadOnlyTransactionArr,
+ WalletDbReadOnlyTransaction,
} from "../index.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import {
@@ -359,33 +359,31 @@ export async function selectPayCoinsNew(
logger.trace(`coin selection request ${j2s(req)}`);
logger.trace(`selected coins (via denoms) for payment: ${j2s(finalSel)}`);
- await ws.db
- .mktx((x) => [x.coins, x.denominations])
- .runReadOnly(async (tx) => {
- for (const dph of Object.keys(finalSel)) {
- const selInfo = finalSel[dph];
- const numRequested = selInfo.contributions.length;
- const query = [
- selInfo.exchangeBaseUrl,
- selInfo.denomPubHash,
- selInfo.maxAge,
- CoinStatus.Fresh,
- ];
- logger.trace(`query: ${j2s(query)}`);
- const coins =
- await tx.coins.indexes.byExchangeDenomPubHashAndAgeAndStatus.getAll(
- query,
- numRequested,
- );
- if (coins.length != numRequested) {
- throw Error(
- `coin selection failed (not available anymore, got only ${coins.length}/${numRequested})`,
- );
- }
- coinPubs.push(...coins.map((x) => x.coinPub));
- coinContributions.push(...selInfo.contributions);
+ await ws.db.runReadOnlyTx(["coins", "denominations"], async (tx) => {
+ for (const dph of Object.keys(finalSel)) {
+ const selInfo = finalSel[dph];
+ const numRequested = selInfo.contributions.length;
+ const query = [
+ selInfo.exchangeBaseUrl,
+ selInfo.denomPubHash,
+ selInfo.maxAge,
+ CoinStatus.Fresh,
+ ];
+ logger.trace(`query: ${j2s(query)}`);
+ const coins =
+ await tx.coins.indexes.byExchangeDenomPubHashAndAgeAndStatus.getAll(
+ query,
+ numRequested,
+ );
+ if (coins.length != numRequested) {
+ throw Error(
+ `coin selection failed (not available anymore, got only ${coins.length}/${numRequested})`,
+ );
}
- });
+ coinPubs.push(...coins.map((x) => x.coinPub));
+ coinContributions.push(...selInfo.contributions);
+ }
+ });
return {
type: "success",
@@ -911,7 +909,7 @@ export interface PeerCoinSelectionRequest {
*/
async function selectPayPeerCandidatesForExchange(
ws: InternalWalletState,
- tx: WalletDbReadOnlyTransactionArr<["coinAvailability", "denominations"]>,
+ tx: WalletDbReadOnlyTransaction<["coinAvailability", "denominations"]>,
exchangeBaseUrl: string,
): Promise<AvailableDenom[]> {
const denoms: AvailableDenom[] = [];
@@ -1048,17 +1046,17 @@ export async function selectPeerCoins(
// one coin to spend.
throw new Error("amount of zero not allowed");
}
- return await ws.db
- .mktx((x) => [
- x.exchanges,
- x.contractTerms,
- x.coins,
- x.coinAvailability,
- x.denominations,
- x.refreshGroups,
- x.peerPushDebit,
- ])
- .runReadWrite(async (tx) => {
+ return await ws.db.runReadWriteTx(
+ [
+ "exchanges",
+ "contractTerms",
+ "coins",
+ "coinAvailability",
+ "denominations",
+ "refreshGroups",
+ "peerPushDebit",
+ ],
+ async (tx) => {
const exchanges = await tx.exchanges.iter().toArray();
const exchangeFeeGap: { [url: string]: AmountJson } = {};
const currency = Amounts.currencyOf(instructedAmount);
@@ -1232,5 +1230,6 @@ export async function selectPeerCoins(
};
return { type: "failure", insufficientBalanceDetails: errDetails };
- });
+ },
+ );
}
diff --git a/packages/taler-wallet-core/src/util/instructedAmountConversion.ts b/packages/taler-wallet-core/src/util/instructedAmountConversion.ts
index caa3fdca5..c4a2f2d5c 100644
--- a/packages/taler-wallet-core/src/util/instructedAmountConversion.ts
+++ b/packages/taler-wallet-core/src/util/instructedAmountConversion.ts
@@ -139,14 +139,9 @@ async function getAvailableDenoms(
): Promise<AvailableCoins> {
const operationType = getOperationType(TransactionType.Deposit);
- return await ws.db
- .mktx((x) => [
- x.exchanges,
- x.exchangeDetails,
- x.denominations,
- x.coinAvailability,
- ])
- .runReadOnly(async (tx) => {
+ return await ws.db.runReadOnlyTx(
+ ["exchanges", "exchangeDetails", "denominations", "coinAvailability"],
+ async (tx) => {
const list: CoinInfo[] = [];
const exchanges: Record<string, ExchangeInfo> = {};
@@ -304,7 +299,8 @@ async function getAvailableDenoms(
}
return { list, exchanges };
- });
+ },
+ );
}
function buildCoinInfoFromDenom(
diff --git a/packages/taler-wallet-core/src/util/query.ts b/packages/taler-wallet-core/src/util/query.ts
index 19fa0dbfd..90a3cac70 100644
--- a/packages/taler-wallet-core/src/util/query.ts
+++ b/packages/taler-wallet-core/src/util/query.ts
@@ -490,78 +490,14 @@ type ValidateKeyPath<T, P> = P extends `${infer PX extends keyof T &
// foo({x: [0,1,2]}, "x.0");
-export type GetReadOnlyAccess<BoundStores> = {
- [P in keyof BoundStores]: BoundStores[P] extends StoreWithIndexes<
- infer StoreName,
- infer RecordType,
- infer IndexMap
- >
- ? StoreReadOnlyAccessor<RecordType, IndexMap>
- : unknown;
-};
-
export type StoreNames<StoreMap> = StoreMap extends {
[P in keyof StoreMap]: StoreWithIndexes<infer SN1, infer SD1, infer IM1>;
}
? keyof StoreMap
: unknown;
-export type DbReadOnlyTransaction<
- StoreMap,
- Stores extends StoreNames<StoreMap> & string,
-> = StoreMap extends {
- [P in Stores]: StoreWithIndexes<infer SN1, infer SD1, infer IM1>;
-}
- ? {
- [P in Stores]: StoreMap[P] extends StoreWithIndexes<
- infer StoreName,
- infer RecordType,
- infer IndexMap
- >
- ? StoreReadOnlyAccessor<RecordType, IndexMap>
- : unknown;
- }
- : unknown;
-
export type DbReadWriteTransaction<
StoreMap,
- Stores extends StoreNames<StoreMap> & string,
-> = StoreMap extends {
- [P in Stores]: StoreWithIndexes<infer SN1, infer SD1, infer IM1>;
-}
- ? {
- [P in Stores]: StoreMap[P] extends StoreWithIndexes<
- infer StoreName,
- infer RecordType,
- infer IndexMap
- >
- ? StoreReadWriteAccessor<RecordType, IndexMap>
- : unknown;
- }
- : unknown;
-
-export type GetReadWriteAccess<BoundStores> = {
- [P in keyof BoundStores]: BoundStores[P] extends StoreWithIndexes<
- infer StoreName,
- infer RecordType,
- infer IndexMap
- >
- ? StoreReadWriteAccessor<RecordType, IndexMap>
- : unknown;
-};
-
-type ReadOnlyTransactionFunction<BoundStores, T> = (
- t: GetReadOnlyAccess<BoundStores>,
- rawTx: IDBTransaction,
-) => Promise<T>;
-
-type ReadWriteTransactionFunction<BoundStores, T> = (
- t: GetReadWriteAccess<BoundStores>,
- rawTx: IDBTransaction,
-) => Promise<T>;
-
-export type DbReadWriteTransactionArr<
- StoreMap,
StoresArr extends Array<StoreNames<StoreMap>>,
> = StoreMap extends {
[P in string]: StoreWithIndexes<infer _SN1, infer _SD1, infer _IM1>;
@@ -578,7 +514,7 @@ export type DbReadWriteTransactionArr<
}
: never;
-export type DbReadOnlyTransactionArr<
+export type DbReadOnlyTransaction<
StoreMap,
StoresArr extends Array<StoreNames<StoreMap>>,
> = StoreMap extends {
@@ -596,11 +532,6 @@ export type DbReadOnlyTransactionArr<
}
: never;
-export interface DbTransactionContext<BoundStores> {
- runReadWrite<T>(f: ReadWriteTransactionFunction<BoundStores, T>): Promise<T>;
- runReadOnly<T>(f: ReadOnlyTransactionFunction<BoundStores, T>): Promise<T>;
-}
-
/**
* Convert the type of an array to a union of the contents.
*
@@ -811,12 +742,6 @@ function makeWriteContext(
return ctx;
}
-type StoreNamesOf<X> = X extends { [x: number]: infer F }
- ? F extends { storeName: infer I }
- ? I
- : never
- : never;
-
/**
* Type-safe access to a database with a particular store map.
*
@@ -832,65 +757,46 @@ export class DbAccess<StoreMap> {
return this.db;
}
- /**
- * Run a transaction with all object stores.
- */
- mktxAll(): DbTransactionContext<StoreMap> {
- const storeNames: string[] = [];
+ runAllStoresReadWriteTx<T>(
+ txf: (
+ tx: DbReadWriteTransaction<StoreMap, Array<StoreNames<StoreMap>>>,
+ ) => Promise<T>,
+ ): Promise<T> {
const accessibleStores: { [x: string]: StoreWithIndexes<any, any, any> } =
{};
- for (let i = 0; i < this.db.objectStoreNames.length; i++) {
- const sn = this.db.objectStoreNames[i];
+ const strStoreNames: string[] = [];
+ for (const sn of Object.keys(this.stores as any)) {
const swi = (this.stores as any)[sn] as StoreWithIndexes<any, any, any>;
- if (!swi) {
- logger.warn(`store metadata not available (${sn})`);
- continue;
- }
- storeNames.push(sn);
- accessibleStores[sn] = swi;
+ strStoreNames.push(swi.storeName);
+ accessibleStores[swi.storeName] = swi;
}
+ const tx = this.db.transaction(strStoreNames, "readwrite");
+ const writeContext = makeWriteContext(tx, accessibleStores);
+ return runTx(tx, writeContext, txf);
+ }
- const storeMapKeys = Object.keys(this.stores as any);
- for (const storeMapKey of storeMapKeys) {
- const swi = (this.stores as any)[storeMapKey] as StoreWithIndexes<
- any,
- any,
- any
- >;
- if (!accessibleStores[swi.storeName]) {
- const version = this.db.version;
- throw Error(
- `store '${swi.storeName}' required by schema but not in database (minver=${version})`,
- );
- }
+ runAllStoresReadOnlyTx<T>(
+ txf: (
+ tx: DbReadOnlyTransaction<StoreMap, Array<StoreNames<StoreMap>>>,
+ ) => Promise<T>,
+ ): Promise<T> {
+ const accessibleStores: { [x: string]: StoreWithIndexes<any, any, any> } =
+ {};
+ const strStoreNames: string[] = [];
+ for (const sn of Object.keys(this.stores as any)) {
+ const swi = (this.stores as any)[sn] as StoreWithIndexes<any, any, any>;
+ strStoreNames.push(swi.storeName);
+ accessibleStores[swi.storeName] = swi;
}
-
- const runReadOnly = <T>(
- txf: ReadOnlyTransactionFunction<StoreMap, T>,
- ): Promise<T> => {
- const tx = this.db.transaction(storeNames, "readonly");
- const readContext = makeReadContext(tx, accessibleStores);
- return runTx(tx, readContext, txf);
- };
-
- const runReadWrite = <T>(
- txf: ReadWriteTransactionFunction<StoreMap, T>,
- ): Promise<T> => {
- const tx = this.db.transaction(storeNames, "readwrite");
- const writeContext = makeWriteContext(tx, accessibleStores);
- return runTx(tx, writeContext, txf);
- };
-
- return {
- runReadOnly,
- runReadWrite,
- };
+ const tx = this.db.transaction(strStoreNames, "readonly");
+ const writeContext = makeReadContext(tx, accessibleStores);
+ return runTx(tx, writeContext, txf);
}
runReadWriteTx<T, StoreNameArray extends Array<StoreNames<StoreMap>>>(
storeNames: StoreNameArray,
txf: (
- tx: DbReadWriteTransactionArr<StoreMap, StoreNameArray>,
+ tx: DbReadWriteTransaction<StoreMap, StoreNameArray>,
) => Promise<T>,
): Promise<T> {
const accessibleStores: { [x: string]: StoreWithIndexes<any, any, any> } =
@@ -908,7 +814,7 @@ export class DbAccess<StoreMap> {
runReadOnlyTx<T, StoreNameArray extends Array<StoreNames<StoreMap>>>(
storeNames: StoreNameArray,
- txf: (tx: DbReadOnlyTransactionArr<StoreMap, StoreNameArray>) => Promise<T>,
+ txf: (tx: DbReadOnlyTransaction<StoreMap, StoreNameArray>) => Promise<T>,
): Promise<T> {
const accessibleStores: { [x: string]: StoreWithIndexes<any, any, any> } =
{};
@@ -922,57 +828,4 @@ export class DbAccess<StoreMap> {
const readContext = makeReadContext(tx, accessibleStores);
return runTx(tx, readContext, txf);
}
-
- /**
- * Run a transaction with selected object stores.
- *
- * The {@link namePicker} must be a function that selects a list of object
- * stores from all available object stores.
- */
- mktx<
- StoreNames extends keyof StoreMap,
- Stores extends StoreMap[StoreNames],
- StoreList extends Stores[],
- BoundStores extends {
- [X in StoreNamesOf<StoreList>]: StoreList[number] & { storeName: X };
- },
- >(namePicker: (x: StoreMap) => StoreList): DbTransactionContext<BoundStores> {
- const storeNames: string[] = [];
- const accessibleStores: { [x: string]: StoreWithIndexes<any, any, any> } =
- {};
-
- const storePick = namePicker(this.stores) as any;
- if (typeof storePick !== "object" || storePick === null) {
- throw Error();
- }
- for (const swiPicked of storePick) {
- const swi = swiPicked as StoreWithIndexes<any, any, any>;
- if (swi.mark !== storeWithIndexesSymbol) {
- throw Error("invalid store descriptor returned from selector function");
- }
- storeNames.push(swi.storeName);
- accessibleStores[swi.storeName] = swi;
- }
-
- const runReadOnly = <T>(
- txf: ReadOnlyTransactionFunction<BoundStores, T>,
- ): Promise<T> => {
- const tx = this.db.transaction(storeNames, "readonly");
- const readContext = makeReadContext(tx, accessibleStores);
- return runTx(tx, readContext, txf);
- };
-
- const runReadWrite = <T>(
- txf: ReadWriteTransactionFunction<BoundStores, T>,
- ): Promise<T> => {
- const tx = this.db.transaction(storeNames, "readwrite");
- const writeContext = makeWriteContext(tx, accessibleStores);
- return runTx(tx, writeContext, txf);
- };
-
- return {
- runReadOnly,
- runReadWrite,
- };
- }
}
diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts
index cfe171bd0..45970e770 100644
--- a/packages/taler-wallet-core/src/wallet.ts
+++ b/packages/taler-wallet-core/src/wallet.ts
@@ -141,6 +141,8 @@ import {
CoinSourceType,
ConfigRecordKey,
DenominationRecord,
+ WalletDbReadOnlyTransaction,
+ WalletDbReadWriteTransaction,
WalletStoresV1,
clearDatabase,
exportDb,
@@ -268,11 +270,7 @@ import {
getMaxPeerPushAmount,
} from "./util/instructedAmountConversion.js";
import { checkDbInvariant } from "./util/invariants.js";
-import {
- DbAccess,
- GetReadOnlyAccess,
- GetReadWriteAccess,
-} from "./util/query.js";
+import { DbAccess } from "./util/query.js";
import { TimerAPI, TimerGroup } from "./util/timer.js";
import {
WALLET_BANK_CONVERSION_API_PROTOCOL_VERSION,
@@ -306,30 +304,28 @@ async function runTaskLoop(
*/
async function fillDefaults(ws: InternalWalletState): Promise<void> {
const notifications: WalletNotification[] = [];
- await ws.db
- .mktx((x) => [x.config, x.exchanges, x.exchangeDetails])
- .runReadWrite(async (tx) => {
- const appliedRec = await tx.config.get("currencyDefaultsApplied");
- let alreadyApplied = appliedRec ? !!appliedRec.value : false;
- if (alreadyApplied) {
- logger.trace("defaults already applied");
- return;
- }
- for (const exch of ws.config.builtin.exchanges) {
- const resp = await addPresetExchangeEntry(
- tx,
- exch.exchangeBaseUrl,
- exch.currencyHint,
- );
- if (resp.notification) {
- notifications.push(resp.notification);
- }
+ await ws.db.runReadWriteTx(["config", "exchanges"], async (tx) => {
+ const appliedRec = await tx.config.get("currencyDefaultsApplied");
+ let alreadyApplied = appliedRec ? !!appliedRec.value : false;
+ if (alreadyApplied) {
+ logger.trace("defaults already applied");
+ return;
+ }
+ for (const exch of ws.config.builtin.exchanges) {
+ const resp = await addPresetExchangeEntry(
+ tx,
+ exch.exchangeBaseUrl,
+ exch.currencyHint,
+ );
+ if (resp.notification) {
+ notifications.push(resp.notification);
}
- await tx.config.put({
- key: ConfigRecordKey.CurrencyDefaultsApplied,
- value: true,
- });
+ }
+ await tx.config.put({
+ key: ConfigRecordKey.CurrencyDefaultsApplied,
+ value: true,
});
+ });
for (const notif of notifications) {
ws.notify(notif);
}
@@ -344,25 +340,23 @@ async function listKnownBankAccounts(
currency?: string,
): Promise<KnownBankAccounts> {
const accounts: KnownBankAccountsInfo[] = [];
- await ws.db
- .mktx((x) => [x.bankAccounts])
- .runReadOnly(async (tx) => {
- const knownAccounts = await tx.bankAccounts.iter().toArray();
- for (const r of knownAccounts) {
- if (currency && currency !== r.currency) {
- continue;
- }
- const payto = parsePaytoUri(r.uri);
- if (payto) {
- accounts.push({
- uri: payto,
- alias: r.alias,
- kyc_completed: r.kycCompleted,
- currency: r.currency,
- });
- }
+ await ws.db.runReadOnlyTx(["bankAccounts"], async (tx) => {
+ const knownAccounts = await tx.bankAccounts.iter().toArray();
+ for (const r of knownAccounts) {
+ if (currency && currency !== r.currency) {
+ continue;
}
- });
+ const payto = parsePaytoUri(r.uri);
+ if (payto) {
+ accounts.push({
+ uri: payto,
+ alias: r.alias,
+ kyc_completed: r.kycCompleted,
+ currency: r.currency,
+ });
+ }
+ }
+ });
return { accounts };
}
@@ -374,16 +368,14 @@ async function addKnownBankAccounts(
alias: string,
currency: string,
): Promise<void> {
- await ws.db
- .mktx((x) => [x.bankAccounts])
- .runReadWrite(async (tx) => {
- tx.bankAccounts.put({
- uri: payto,
- alias: alias,
- currency: currency,
- kycCompleted: false,
- });
+ await ws.db.runReadWriteTx(["bankAccounts"], async (tx) => {
+ tx.bankAccounts.put({
+ uri: payto,
+ alias: alias,
+ currency: currency,
+ kycCompleted: false,
});
+ });
return;
}
@@ -393,15 +385,13 @@ async function forgetKnownBankAccounts(
ws: InternalWalletState,
payto: string,
): Promise<void> {
- await ws.db
- .mktx((x) => [x.bankAccounts])
- .runReadWrite(async (tx) => {
- const account = await tx.bankAccounts.get(payto);
- if (!account) {
- throw Error(`account not found: ${payto}`);
- }
- tx.bankAccounts.delete(account.uri);
- });
+ await ws.db.runReadWriteTx(["bankAccounts"], async (tx) => {
+ const account = await tx.bankAccounts.get(payto);
+ if (!account) {
+ throw Error(`account not found: ${payto}`);
+ }
+ tx.bankAccounts.delete(account.uri);
+ });
return;
}
@@ -410,41 +400,39 @@ async function setCoinSuspended(
coinPub: string,
suspended: boolean,
): Promise<void> {
- await ws.db
- .mktx((x) => [x.coins, x.coinAvailability])
- .runReadWrite(async (tx) => {
- const c = await tx.coins.get(coinPub);
- if (!c) {
- logger.warn(`coin ${coinPub} not found, won't suspend`);
+ await ws.db.runReadWriteTx(["coins", "coinAvailability"], async (tx) => {
+ const c = await tx.coins.get(coinPub);
+ if (!c) {
+ logger.warn(`coin ${coinPub} not found, won't suspend`);
+ return;
+ }
+ const coinAvailability = await tx.coinAvailability.get([
+ c.exchangeBaseUrl,
+ c.denomPubHash,
+ c.maxAge,
+ ]);
+ checkDbInvariant(!!coinAvailability);
+ if (suspended) {
+ if (c.status !== CoinStatus.Fresh) {
return;
}
- const coinAvailability = await tx.coinAvailability.get([
- c.exchangeBaseUrl,
- c.denomPubHash,
- c.maxAge,
- ]);
- checkDbInvariant(!!coinAvailability);
- if (suspended) {
- if (c.status !== CoinStatus.Fresh) {
- return;
- }
- if (coinAvailability.freshCoinCount === 0) {
- throw Error(
- `invalid coin count ${coinAvailability.freshCoinCount} in DB`,
- );
- }
- coinAvailability.freshCoinCount--;
- c.status = CoinStatus.FreshSuspended;
- } else {
- if (c.status == CoinStatus.Dormant) {
- return;
- }
- coinAvailability.freshCoinCount++;
- c.status = CoinStatus.Fresh;
+ if (coinAvailability.freshCoinCount === 0) {
+ throw Error(
+ `invalid coin count ${coinAvailability.freshCoinCount} in DB`,
+ );
}
- await tx.coins.put(c);
- await tx.coinAvailability.put(coinAvailability);
- });
+ coinAvailability.freshCoinCount--;
+ c.status = CoinStatus.FreshSuspended;
+ } else {
+ if (c.status == CoinStatus.Dormant) {
+ return;
+ }
+ coinAvailability.freshCoinCount++;
+ c.status = CoinStatus.Fresh;
+ }
+ await tx.coins.put(c);
+ await tx.coinAvailability.put(coinAvailability);
+ });
}
/**
@@ -453,57 +441,55 @@ async function setCoinSuspended(
async function dumpCoins(ws: InternalWalletState): Promise<CoinDumpJson> {
const coinsJson: CoinDumpJson = { coins: [] };
logger.info("dumping coins");
- await ws.db
- .mktx((x) => [x.coins, x.denominations, x.withdrawalGroups])
- .runReadOnly(async (tx) => {
- const coins = await tx.coins.iter().toArray();
- for (const c of coins) {
- const denom = await tx.denominations.get([
- c.exchangeBaseUrl,
- c.denomPubHash,
- ]);
- if (!denom) {
- logger.warn("no denom found for coin");
- continue;
- }
- const cs = c.coinSource;
- let refreshParentCoinPub: string | undefined;
- if (cs.type == CoinSourceType.Refresh) {
- refreshParentCoinPub = cs.oldCoinPub;
- }
- let withdrawalReservePub: string | undefined;
- if (cs.type == CoinSourceType.Withdraw) {
- withdrawalReservePub = cs.reservePub;
- }
- const denomInfo = await ws.getDenomInfo(
- ws,
- tx,
- c.exchangeBaseUrl,
- c.denomPubHash,
- );
- if (!denomInfo) {
- logger.warn("no denomination found for coin");
- continue;
- }
- coinsJson.coins.push({
- coin_pub: c.coinPub,
- denom_pub: denomInfo.denomPub,
- denom_pub_hash: c.denomPubHash,
- denom_value: denom.value,
- exchange_base_url: c.exchangeBaseUrl,
- refresh_parent_coin_pub: refreshParentCoinPub,
- withdrawal_reserve_pub: withdrawalReservePub,
- coin_status: c.status,
- ageCommitmentProof: c.ageCommitmentProof,
- spend_allocation: c.spendAllocation
- ? {
- amount: c.spendAllocation.amount,
- id: c.spendAllocation.id,
- }
- : undefined,
- });
+ await ws.db.runReadOnlyTx(["coins", "denominations"], async (tx) => {
+ const coins = await tx.coins.iter().toArray();
+ for (const c of coins) {
+ const denom = await tx.denominations.get([
+ c.exchangeBaseUrl,
+ c.denomPubHash,
+ ]);
+ if (!denom) {
+ logger.warn("no denom found for coin");
+ continue;
}
- });
+ const cs = c.coinSource;
+ let refreshParentCoinPub: string | undefined;
+ if (cs.type == CoinSourceType.Refresh) {
+ refreshParentCoinPub = cs.oldCoinPub;
+ }
+ let withdrawalReservePub: string | undefined;
+ if (cs.type == CoinSourceType.Withdraw) {
+ withdrawalReservePub = cs.reservePub;
+ }
+ const denomInfo = await ws.getDenomInfo(
+ ws,
+ tx,
+ c.exchangeBaseUrl,
+ c.denomPubHash,
+ );
+ if (!denomInfo) {
+ logger.warn("no denomination found for coin");
+ continue;
+ }
+ coinsJson.coins.push({
+ coin_pub: c.coinPub,
+ denom_pub: denomInfo.denomPub,
+ denom_pub_hash: c.denomPubHash,
+ denom_value: denom.value,
+ exchange_base_url: c.exchangeBaseUrl,
+ refresh_parent_coin_pub: refreshParentCoinPub,
+ withdrawal_reserve_pub: withdrawalReservePub,
+ coin_status: c.status,
+ ageCommitmentProof: c.ageCommitmentProof,
+ spend_allocation: c.spendAllocation
+ ? {
+ amount: c.spendAllocation.amount,
+ id: c.spendAllocation.id,
+ }
+ : undefined,
+ });
+ }
+ });
return coinsJson;
}
@@ -534,7 +520,7 @@ async function createStoredBackup(
const backup = await exportDb(ws.idb);
const backupsDb = await openStoredBackupsDatabase(ws.idb);
const name = `backup-${new Date().getTime()}`;
- await backupsDb.mktxAll().runReadWrite(async (tx) => {
+ await backupsDb.runAllStoresReadWriteTx(async (tx) => {
await tx.backupMeta.add({
name,
});
@@ -552,7 +538,7 @@ async function listStoredBackups(
storedBackups: [],
};
const backupsDb = await openStoredBackupsDatabase(ws.idb);
- await backupsDb.mktxAll().runReadWrite(async (tx) => {
+ await backupsDb.runAllStoresReadWriteTx(async (tx) => {
await tx.backupMeta.iter().forEach((x) => {
storedBackups.storedBackups.push({
name: x.name,
@@ -567,7 +553,7 @@ async function deleteStoredBackup(
req: DeleteStoredBackupRequest,
): Promise<void> {
const backupsDb = await openStoredBackupsDatabase(ws.idb);
- await backupsDb.mktxAll().runReadWrite(async (tx) => {
+ await backupsDb.runAllStoresReadWriteTx(async (tx) => {
await tx.backupData.delete(req.name);
await tx.backupMeta.delete(req.name);
});
@@ -580,7 +566,7 @@ async function recoverStoredBackup(
logger.info(`Recovering stored backup ${req.name}`);
const { name } = req;
const backupsDb = await openStoredBackupsDatabase(ws.idb);
- const bd = await backupsDb.mktxAll().runReadWrite(async (tx) => {
+ const bd = await backupsDb.runAllStoresReadWriteTx(async (tx) => {
const backupMeta = tx.backupMeta.get(name);
if (!backupMeta) {
throw Error("backup not found");
@@ -1526,7 +1512,20 @@ class InternalWalletStateImpl implements InternalWalletState {
async getTransactionState(
ws: InternalWalletState,
- tx: GetReadOnlyAccess<typeof WalletStoresV1>,
+ tx: WalletDbReadOnlyTransaction<
+ [
+ "depositGroups",
+ "withdrawalGroups",
+ "purchases",
+ "refundGroups",
+ "peerPullCredit",
+ "peerPullDebit",
+ "peerPushDebit",
+ "peerPushCredit",
+ "rewards",
+ "refreshGroups",
+ ]
+ >,
transactionId: string,
): Promise<TransactionState | undefined> {
const parsedTxId = parseTransactionIdentifier(transactionId);
@@ -1613,9 +1612,7 @@ class InternalWalletStateImpl implements InternalWalletState {
async getDenomInfo(
ws: InternalWalletState,
- tx: GetReadWriteAccess<{
- denominations: typeof WalletStoresV1.denominations;
- }>,
+ tx: WalletDbReadWriteTransaction<["denominations"]>,
exchangeBaseUrl: string,
denomPubHash: string,
): Promise<DenominationInfo | undefined> {