summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2023-07-11 15:41:48 +0200
committerFlorian Dold <florian@dold.me>2023-08-22 08:01:13 +0200
commitb2d0ad57ddf251a109d536cdc49fb6505dbdc50c (patch)
tree7eaeca3ad8ec97a9c1970c1004feda2d61c3441b /packages/taler-wallet-core
parent58fdf9dc091b076787a9746c405fe6a9366f5da6 (diff)
downloadwallet-core-b2d0ad57ddf251a109d536cdc49fb6505dbdc50c.tar.gz
wallet-core-b2d0ad57ddf251a109d536cdc49fb6505dbdc50c.tar.bz2
wallet-core-b2d0ad57ddf251a109d536cdc49fb6505dbdc50c.zip
sqlite3 backend for idb-bridge / wallet-core
Diffstat (limited to 'packages/taler-wallet-core')
-rw-r--r--packages/taler-wallet-core/src/db.ts5
-rw-r--r--packages/taler-wallet-core/src/host-common.ts2
-rw-r--r--packages/taler-wallet-core/src/host-impl.node.ts69
-rw-r--r--packages/taler-wallet-core/src/host-impl.qtart.ts116
-rw-r--r--packages/taler-wallet-core/src/host.ts1
-rw-r--r--packages/taler-wallet-core/src/operations/pending.ts350
-rw-r--r--packages/taler-wallet-core/src/operations/testing.ts3
-rw-r--r--packages/taler-wallet-core/src/operations/transactions.ts31
-rw-r--r--packages/taler-wallet-core/src/util/query.ts4
-rw-r--r--packages/taler-wallet-core/src/wallet.ts1
10 files changed, 453 insertions, 129 deletions
diff --git a/packages/taler-wallet-core/src/db.ts b/packages/taler-wallet-core/src/db.ts
index c7d0b0bda..1d0d3a6e5 100644
--- a/packages/taler-wallet-core/src/db.ts
+++ b/packages/taler-wallet-core/src/db.ts
@@ -119,7 +119,7 @@ export const CURRENT_DB_CONFIG_KEY = "currentMainDbName";
* backwards-compatible way or object stores and indices
* are added.
*/
-export const WALLET_DB_MINOR_VERSION = 9;
+export const WALLET_DB_MINOR_VERSION = 10;
/**
* Ranges for operation status fields.
@@ -2675,6 +2675,9 @@ export const WalletStoresV1 = {
}),
{
byProposalId: describeIndex("byProposalId", "proposalId"),
+ byStatus: describeIndex("byStatus", "status", {
+ versionAdded: 10,
+ }),
},
),
refundItems: describeStore(
diff --git a/packages/taler-wallet-core/src/host-common.ts b/packages/taler-wallet-core/src/host-common.ts
index 21e7f1157..c56d7ed1c 100644
--- a/packages/taler-wallet-core/src/host-common.ts
+++ b/packages/taler-wallet-core/src/host-common.ts
@@ -16,7 +16,7 @@
import { WalletNotification } from "@gnu-taler/taler-util";
import { HttpRequestLibrary } from "@gnu-taler/taler-util/http";
-import { WalletConfig, WalletConfigParameter } from "./index.js";
+import { WalletConfigParameter } from "./index.js";
/**
* Helpers to initiate a wallet in a host environment.
diff --git a/packages/taler-wallet-core/src/host-impl.node.ts b/packages/taler-wallet-core/src/host-impl.node.ts
index 150bba49a..ceda7243f 100644
--- a/packages/taler-wallet-core/src/host-impl.node.ts
+++ b/packages/taler-wallet-core/src/host-impl.node.ts
@@ -27,6 +27,7 @@ import type { IDBFactory } from "@gnu-taler/idb-bridge";
import {
BridgeIDBFactory,
MemoryBackend,
+ createSqliteBackend,
shimIndexedDB,
} from "@gnu-taler/idb-bridge";
import { AccessStats } from "@gnu-taler/idb-bridge";
@@ -39,24 +40,21 @@ import { createPlatformHttpLib } from "@gnu-taler/taler-util/http";
import { SetTimeoutTimerAPI } from "./util/timer.js";
import { Wallet } from "./wallet.js";
import { DefaultNodeWalletArgs, makeTempfileId } from "./host-common.js";
+import { createNodeSqlite3Impl } from "@gnu-taler/idb-bridge/node-sqlite3-bindings";
const logger = new Logger("host-impl.node.ts");
-/**
- * Get a wallet instance with default settings for node.
- *
- * Extended version that allows getting DB stats.
- */
-export async function createNativeWalletHost2(
+interface MakeDbResult {
+ idbFactory: BridgeIDBFactory;
+ getStats: () => AccessStats;
+}
+
+async function makeFileDb(
args: DefaultNodeWalletArgs = {},
-): Promise<{
- wallet: Wallet;
- getDbStats: () => AccessStats;
-}> {
+): Promise<MakeDbResult> {
BridgeIDBFactory.enableTracing = false;
const myBackend = new MemoryBackend();
myBackend.enableTracing = false;
-
const storagePath = args.persistentStoragePath;
if (storagePath) {
try {
@@ -96,8 +94,41 @@ export async function createNativeWalletHost2(
BridgeIDBFactory.enableTracing = false;
const myBridgeIdbFactory = new BridgeIDBFactory(myBackend);
- const myIdbFactory: IDBFactory = myBridgeIdbFactory as any as IDBFactory;
+ return {
+ idbFactory: myBridgeIdbFactory,
+ getStats: () => myBackend.accessStats,
+ };
+}
+async function makeSqliteDb(
+ args: DefaultNodeWalletArgs,
+): Promise<MakeDbResult> {
+ BridgeIDBFactory.enableTracing = false;
+ const imp = await createNodeSqlite3Impl();
+ const myBackend = await createSqliteBackend(imp, {
+ filename: args.persistentStoragePath ?? ":memory:",
+ });
+ myBackend.enableTracing = false;
+ const myBridgeIdbFactory = new BridgeIDBFactory(myBackend);
+ return {
+ getStats() {
+ throw Error("not implemented");
+ },
+ idbFactory: myBridgeIdbFactory,
+ };
+}
+
+/**
+ * Get a wallet instance with default settings for node.
+ *
+ * Extended version that allows getting DB stats.
+ */
+export async function createNativeWalletHost2(
+ args: DefaultNodeWalletArgs = {},
+): Promise<{
+ wallet: Wallet;
+ getDbStats: () => AccessStats;
+}> {
let myHttpLib;
if (args.httpLib) {
myHttpLib = args.httpLib;
@@ -115,7 +146,17 @@ export async function createNativeWalletHost2(
);
};
- shimIndexedDB(myBridgeIdbFactory);
+ let dbResp: MakeDbResult;
+
+ if (!args.persistentStoragePath || args.persistentStoragePath.endsWith(".json")) {
+ dbResp = await makeFileDb(args);
+ } else {
+ dbResp = await makeSqliteDb(args);
+ }
+
+ const myIdbFactory: IDBFactory = dbResp.idbFactory as any as IDBFactory;
+
+ shimIndexedDB(dbResp.idbFactory);
const myDb = await openTalerDatabase(myIdbFactory, myVersionChange);
@@ -158,6 +199,6 @@ export async function createNativeWalletHost2(
}
return {
wallet: w,
- getDbStats: () => myBackend.accessStats,
+ getDbStats: dbResp.getStats,
};
}
diff --git a/packages/taler-wallet-core/src/host-impl.qtart.ts b/packages/taler-wallet-core/src/host-impl.qtart.ts
index d10914b10..390282f8c 100644
--- a/packages/taler-wallet-core/src/host-impl.qtart.ts
+++ b/packages/taler-wallet-core/src/host-impl.qtart.ts
@@ -22,11 +22,17 @@
/**
* Imports.
*/
-import type { IDBFactory } from "@gnu-taler/idb-bridge";
+import type {
+ IDBFactory,
+ ResultRow,
+ Sqlite3Interface,
+ Sqlite3Statement,
+} from "@gnu-taler/idb-bridge";
// eslint-disable-next-line no-duplicate-imports
import {
BridgeIDBFactory,
MemoryBackend,
+ createSqliteBackend,
shimIndexedDB,
} from "@gnu-taler/idb-bridge";
import { AccessStats } from "@gnu-taler/idb-bridge";
@@ -41,12 +47,78 @@ import { DefaultNodeWalletArgs, makeTempfileId } from "./host-common.js";
const logger = new Logger("host-impl.qtart.ts");
-export async function createNativeWalletHost2(
+interface MakeDbResult {
+ idbFactory: BridgeIDBFactory;
+ getStats: () => AccessStats;
+}
+
+let numStmt = 0;
+
+export async function createQtartSqlite3Impl(): Promise<Sqlite3Interface> {
+ const tart: any = (globalThis as any)._tart;
+ if (!tart) {
+ throw Error("globalThis._qtart not defined");
+ }
+ return {
+ open(filename: string) {
+ const internalDbHandle = tart.sqlite3Open(filename);
+ return {
+ internalDbHandle,
+ close() {
+ tart.sqlite3Close(internalDbHandle);
+ },
+ prepare(stmtStr): Sqlite3Statement {
+ const stmtHandle = tart.sqlite3Prepare(internalDbHandle, stmtStr);
+ return {
+ internalStatement: stmtHandle,
+ getAll(params): ResultRow[] {
+ numStmt++;
+ return tart.sqlite3StmtGetAll(stmtHandle, params);
+ },
+ getFirst(params): ResultRow | undefined {
+ numStmt++;
+ return tart.sqlite3StmtGetFirst(stmtHandle, params);
+ },
+ run(params) {
+ numStmt++;
+ return tart.sqlite3StmtRun(stmtHandle, params);
+ },
+ };
+ },
+ exec(sqlStr): void {
+ numStmt++;
+ tart.sqlite3Exec(internalDbHandle, sqlStr);
+ },
+ };
+ },
+ };
+}
+
+async function makeSqliteDb(
+ args: DefaultNodeWalletArgs,
+): Promise<MakeDbResult> {
+ BridgeIDBFactory.enableTracing = false;
+ const imp = await createQtartSqlite3Impl();
+ const myBackend = await createSqliteBackend(imp, {
+ filename: args.persistentStoragePath ?? ":memory:",
+ });
+ myBackend.trackStats = true;
+ myBackend.enableTracing = false;
+ const myBridgeIdbFactory = new BridgeIDBFactory(myBackend);
+ return {
+ getStats() {
+ return {
+ ...myBackend.accessStats,
+ primitiveStatements: numStmt,
+ }
+ },
+ idbFactory: myBridgeIdbFactory,
+ };
+}
+
+async function makeFileDb(
args: DefaultNodeWalletArgs = {},
-): Promise<{
- wallet: Wallet;
- getDbStats: () => AccessStats;
-}> {
+): Promise<MakeDbResult> {
BridgeIDBFactory.enableTracing = false;
const myBackend = new MemoryBackend();
myBackend.enableTracing = false;
@@ -78,12 +150,34 @@ export async function createNativeWalletHost2(
};
}
- logger.info("done processing storage path");
+ const myBridgeIdbFactory = new BridgeIDBFactory(myBackend);
+ return {
+ idbFactory: myBridgeIdbFactory,
+ getStats: () => myBackend.accessStats,
+ };
+}
+export async function createNativeWalletHost2(
+ args: DefaultNodeWalletArgs = {},
+): Promise<{
+ wallet: Wallet;
+ getDbStats: () => AccessStats;
+}> {
BridgeIDBFactory.enableTracing = false;
- const myBridgeIdbFactory = new BridgeIDBFactory(myBackend);
- const myIdbFactory: IDBFactory = myBridgeIdbFactory as any as IDBFactory;
+ let dbResp: MakeDbResult;
+
+ if (args.persistentStoragePath && args.persistentStoragePath.endsWith(".json")) {
+ logger.info("using JSON file backend (slow!)");
+ dbResp = await makeFileDb(args);
+ } else {
+ logger.info("using JSON file backend (experimental!)");
+ dbResp = await makeSqliteDb(args)
+ }
+
+ const myIdbFactory: IDBFactory = dbResp.idbFactory as any as IDBFactory;
+
+ shimIndexedDB(dbResp.idbFactory);
let myHttpLib;
if (args.httpLib) {
@@ -102,8 +196,6 @@ export async function createNativeWalletHost2(
);
};
- shimIndexedDB(myBridgeIdbFactory);
-
const myDb = await openTalerDatabase(myIdbFactory, myVersionChange);
let workerFactory;
@@ -124,6 +216,6 @@ export async function createNativeWalletHost2(
}
return {
wallet: w,
- getDbStats: () => myBackend.accessStats,
+ getDbStats: dbResp.getStats,
};
}
diff --git a/packages/taler-wallet-core/src/host.ts b/packages/taler-wallet-core/src/host.ts
index 4b319f081..feccf42a6 100644
--- a/packages/taler-wallet-core/src/host.ts
+++ b/packages/taler-wallet-core/src/host.ts
@@ -16,7 +16,6 @@
import { DefaultNodeWalletArgs } from "./host-common.js";
import { Wallet } from "./index.js";
-
import * as hostImpl from "#host-impl";
import { AccessStats } from "@gnu-taler/idb-bridge";
diff --git a/packages/taler-wallet-core/src/operations/pending.ts b/packages/taler-wallet-core/src/operations/pending.ts
index cc9217d67..6c6546f83 100644
--- a/packages/taler-wallet-core/src/operations/pending.ts
+++ b/packages/taler-wallet-core/src/operations/pending.ts
@@ -34,13 +34,24 @@ import {
WithdrawalGroupStatus,
RewardRecordStatus,
DepositOperationStatus,
+ RefreshGroupRecord,
+ WithdrawalGroupRecord,
+ DepositGroupRecord,
+ RewardRecord,
+ PurchaseRecord,
+ PeerPullPaymentInitiationRecord,
+ PeerPullPaymentIncomingRecord,
+ PeerPushPaymentInitiationRecord,
+ PeerPushPaymentIncomingRecord,
+ RefundGroupRecord,
+ RefundGroupStatus,
} from "../db.js";
import {
PendingOperationsResponse,
PendingTaskType,
TaskId,
} from "../pending-types.js";
-import { AbsoluteTime } from "@gnu-taler/taler-util";
+import { AbsoluteTime, TransactionRecordFilter } from "@gnu-taler/taler-util";
import { InternalWalletState } from "../internal-wallet-state.js";
import { GetReadOnlyAccess } from "../util/query.js";
import { GlobalIDB } from "@gnu-taler/idb-bridge";
@@ -105,6 +116,32 @@ async function gatherExchangePending(
});
}
+/**
+ * Iterate refresh records based on a filter.
+ */
+export async function iterRecordsForRefresh(
+ tx: GetReadOnlyAccess<{
+ refreshGroups: typeof WalletStoresV1.refreshGroups;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: RefreshGroupRecord) => Promise<void>,
+): Promise<void> {
+ let refreshGroups: RefreshGroupRecord[];
+ if (filter.onlyState === "nonfinal") {
+ const keyRange = GlobalIDB.KeyRange.bound(
+ OperationStatusRange.ACTIVE_START,
+ OperationStatusRange.ACTIVE_END,
+ );
+ refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll(keyRange);
+ } else {
+ refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll();
+ }
+
+ for (const r of refreshGroups) {
+ await f(r);
+ }
+}
+
async function gatherRefreshPending(
ws: InternalWalletState,
tx: GetReadOnlyAccess<{
@@ -114,22 +151,13 @@ async function gatherRefreshPending(
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- const keyRange = GlobalIDB.KeyRange.bound(
- OperationStatusRange.ACTIVE_START,
- OperationStatusRange.ACTIVE_END,
- );
- const refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll(
- keyRange,
- );
- for (const r of refreshGroups) {
+ await iterRecordsForRefresh(tx, { onlyState: "nonfinal" }, async (r) => {
if (r.timestampFinished) {
return;
}
const opId = TaskIdentifiers.forRefresh(r);
const retryRecord = await tx.operationRetries.get(opId);
-
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
-
resp.pendingOperations.push({
type: PendingTaskType.Refresh,
...getPendingCommon(ws, opId, timestampDue),
@@ -140,6 +168,30 @@ async function gatherRefreshPending(
),
retryInfo: retryRecord?.retryInfo,
});
+ });
+}
+
+export async function iterRecordsForWithdrawal(
+ tx: GetReadOnlyAccess<{
+ withdrawalGroups: typeof WalletStoresV1.withdrawalGroups;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: WithdrawalGroupRecord) => Promise<void>,
+): Promise<void> {
+ let withdrawalGroupRecords: WithdrawalGroupRecord[];
+ if (filter.onlyState === "nonfinal") {
+ const range = GlobalIDB.KeyRange.bound(
+ WithdrawalGroupStatus.PendingRegisteringBank,
+ WithdrawalGroupStatus.PendingAml,
+ );
+ withdrawalGroupRecords =
+ await tx.withdrawalGroups.indexes.byStatus.getAll(range);
+ } else {
+ withdrawalGroupRecords =
+ await tx.withdrawalGroups.indexes.byStatus.getAll();
+ }
+ for (const wgr of withdrawalGroupRecords) {
+ await f(wgr);
}
}
@@ -153,12 +205,7 @@ async function gatherWithdrawalPending(
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- const range = GlobalIDB.KeyRange.bound(
- WithdrawalGroupStatus.PendingRegisteringBank,
- WithdrawalGroupStatus.PendingAml,
- );
- const wsrs = await tx.withdrawalGroups.indexes.byStatus.getAll(range);
- for (const wsr of wsrs) {
+ await iterRecordsForWithdrawal(tx, { onlyState: "nonfinal" }, async (wsr) => {
const opTag = TaskIdentifiers.forWithdrawal(wsr);
let opr = await tx.operationRetries.get(opTag);
const now = AbsoluteTime.now();
@@ -184,6 +231,30 @@ async function gatherWithdrawalPending(
lastError: opr.lastError,
retryInfo: opr.retryInfo,
});
+ });
+}
+
+export async function iterRecordsForDeposit(
+ tx: GetReadOnlyAccess<{
+ depositGroups: typeof WalletStoresV1.depositGroups;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: DepositGroupRecord) => Promise<void>,
+): Promise<void> {
+ let dgs: DepositGroupRecord[];
+ if (filter.onlyState === "nonfinal") {
+ dgs = await tx.depositGroups.indexes.byStatus.getAll(
+ GlobalIDB.KeyRange.bound(
+ DepositOperationStatus.PendingDeposit,
+ DepositOperationStatus.PendingKyc,
+ ),
+ );
+ } else {
+ dgs = await tx.depositGroups.indexes.byStatus.getAll();
+ }
+
+ for (const dg of dgs) {
+ await f(dg);
}
}
@@ -196,16 +267,7 @@ async function gatherDepositPending(
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- const dgs = await tx.depositGroups.indexes.byStatus.getAll(
- GlobalIDB.KeyRange.bound(
- DepositOperationStatus.PendingDeposit,
- DepositOperationStatus.PendingKyc,
- ),
- );
- for (const dg of dgs) {
- if (dg.timestampFinished) {
- return;
- }
+ await iterRecordsForDeposit(tx, { onlyState: "nonfinal" }, async (dg) => {
let deposited = true;
for (const d of dg.depositedPerCoin) {
if (!d) {
@@ -226,10 +288,28 @@ async function gatherDepositPending(
lastError: retryRecord?.lastError,
retryInfo: retryRecord?.retryInfo,
});
+ });
+}
+
+export async function iterRecordsForReward(
+ tx: GetReadOnlyAccess<{
+ rewards: typeof WalletStoresV1.rewards;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: RewardRecord) => Promise<void>,
+): Promise<void> {
+ if (filter.onlyState === "nonfinal") {
+ const range = GlobalIDB.KeyRange.bound(
+ RewardRecordStatus.PendingPickup,
+ RewardRecordStatus.PendingPickup,
+ );
+ await tx.rewards.indexes.byStatus.iter(range).forEachAsync(f);
+ } else {
+ await tx.rewards.indexes.byStatus.iter().forEachAsync(f);
}
}
-async function gatherTipPending(
+async function gatherRewardPending(
ws: InternalWalletState,
tx: GetReadOnlyAccess<{
rewards: typeof WalletStoresV1.rewards;
@@ -238,15 +318,7 @@ async function gatherTipPending(
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- const range = GlobalIDB.KeyRange.bound(
- RewardRecordStatus.PendingPickup,
- RewardRecordStatus.PendingPickup,
- );
- await tx.rewards.indexes.byStatus.iter(range).forEachAsync(async (tip) => {
- // FIXME: The tip record needs a proper status field!
- if (tip.pickedUpTimestamp) {
- return;
- }
+ await iterRecordsForReward(tx, { onlyState: "nonfinal" }, async (tip) => {
const opId = TaskIdentifiers.forTipPickup(tip);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
@@ -264,6 +336,43 @@ async function gatherTipPending(
});
}
+export async function iterRecordsForRefund(
+ tx: GetReadOnlyAccess<{
+ refundGroups: typeof WalletStoresV1.refundGroups;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: RefundGroupRecord) => Promise<void>,
+): Promise<void> {
+ if (filter.onlyState === "nonfinal") {
+ const keyRange = GlobalIDB.KeyRange.only(
+ RefundGroupStatus.Pending
+ );
+ await tx.refundGroups.indexes.byStatus
+ .iter(keyRange)
+ .forEachAsync(f);
+ } else {
+ await tx.refundGroups.iter().forEachAsync(f);
+ }
+}
+
+export async function iterRecordsForPurchase(
+ tx: GetReadOnlyAccess<{
+ purchases: typeof WalletStoresV1.purchases;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: PurchaseRecord) => Promise<void>,
+): Promise<void> {
+ if (filter.onlyState === "nonfinal") {
+ const keyRange = GlobalIDB.KeyRange.bound(
+ PurchaseStatus.PendingDownloadingProposal,
+ PurchaseStatus.PendingAcceptRefund,
+ );
+ await tx.purchases.indexes.byStatus.iter(keyRange).forEachAsync(f);
+ } else {
+ await tx.purchases.indexes.byStatus.iter().forEachAsync(f);
+ }
+}
+
async function gatherPurchasePending(
ws: InternalWalletState,
tx: GetReadOnlyAccess<{
@@ -273,27 +382,20 @@ async function gatherPurchasePending(
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- const keyRange = GlobalIDB.KeyRange.bound(
- PurchaseStatus.PendingDownloadingProposal,
- PurchaseStatus.PendingAcceptRefund,
- );
- await tx.purchases.indexes.byStatus
- .iter(keyRange)
- .forEachAsync(async (pr) => {
- const opId = TaskIdentifiers.forPay(pr);
- const retryRecord = await tx.operationRetries.get(opId);
- const timestampDue =
- retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
- resp.pendingOperations.push({
- type: PendingTaskType.Purchase,
- ...getPendingCommon(ws, opId, timestampDue),
- givesLifeness: true,
- statusStr: PurchaseStatus[pr.purchaseStatus],
- proposalId: pr.proposalId,
- retryInfo: retryRecord?.retryInfo,
- lastError: retryRecord?.lastError,
- });
+ await iterRecordsForPurchase(tx, { onlyState: "nonfinal" }, async (pr) => {
+ const opId = TaskIdentifiers.forPay(pr);
+ const retryRecord = await tx.operationRetries.get(opId);
+ const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
+ resp.pendingOperations.push({
+ type: PendingTaskType.Purchase,
+ ...getPendingCommon(ws, opId, timestampDue),
+ givesLifeness: true,
+ statusStr: PurchaseStatus[pr.purchaseStatus],
+ proposalId: pr.proposalId,
+ retryInfo: retryRecord?.retryInfo,
+ lastError: retryRecord?.lastError,
});
+ });
}
async function gatherRecoupPending(
@@ -362,6 +464,26 @@ async function gatherBackupPending(
});
}
+export async function iterRecordsForPeerPullInitiation(
+ tx: GetReadOnlyAccess<{
+ peerPullPaymentInitiations: typeof WalletStoresV1.peerPullPaymentInitiations;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: PeerPullPaymentInitiationRecord) => Promise<void>,
+): Promise<void> {
+ if (filter.onlyState === "nonfinal") {
+ const keyRange = GlobalIDB.KeyRange.bound(
+ PeerPullPaymentInitiationStatus.PendingCreatePurse,
+ PeerPullPaymentInitiationStatus.AbortingDeletePurse,
+ );
+ await tx.peerPullPaymentInitiations.indexes.byStatus
+ .iter(keyRange)
+ .forEachAsync(f);
+ } else {
+ await tx.peerPullPaymentInitiations.indexes.byStatus.iter().forEachAsync(f);
+ }
+}
+
async function gatherPeerPullInitiationPending(
ws: InternalWalletState,
tx: GetReadOnlyAccess<{
@@ -371,13 +493,10 @@ async function gatherPeerPullInitiationPending(
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- const keyRange = GlobalIDB.KeyRange.bound(
- PeerPullPaymentInitiationStatus.PendingCreatePurse,
- PeerPullPaymentInitiationStatus.AbortingDeletePurse,
- );
- await tx.peerPullPaymentInitiations.indexes.byStatus
- .iter(keyRange)
- .forEachAsync(async (pi) => {
+ await iterRecordsForPeerPullInitiation(
+ tx,
+ { onlyState: "nonfinal" },
+ async (pi) => {
const opId = TaskIdentifiers.forPeerPullPaymentInitiation(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue =
@@ -389,7 +508,28 @@ async function gatherPeerPullInitiationPending(
retryInfo: retryRecord?.retryInfo,
pursePub: pi.pursePub,
});
- });
+ },
+ );
+}
+
+export async function iterRecordsForPeerPullDebit(
+ tx: GetReadOnlyAccess<{
+ peerPullPaymentIncoming: typeof WalletStoresV1.peerPullPaymentIncoming;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: PeerPullPaymentIncomingRecord) => Promise<void>,
+): Promise<void> {
+ if (filter.onlyState === "nonfinal") {
+ const keyRange = GlobalIDB.KeyRange.bound(
+ PeerPullDebitRecordStatus.PendingDeposit,
+ PeerPullDebitRecordStatus.AbortingRefresh,
+ );
+ await tx.peerPullPaymentIncoming.indexes.byStatus
+ .iter(keyRange)
+ .forEachAsync(f);
+ } else {
+ await tx.peerPullPaymentIncoming.indexes.byStatus.iter().forEachAsync(f);
+ }
}
async function gatherPeerPullDebitPending(
@@ -401,13 +541,10 @@ async function gatherPeerPullDebitPending(
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- const keyRange = GlobalIDB.KeyRange.bound(
- PeerPullDebitRecordStatus.PendingDeposit,
- PeerPullDebitRecordStatus.AbortingRefresh,
- );
- await tx.peerPullPaymentIncoming.indexes.byStatus
- .iter(keyRange)
- .forEachAsync(async (pi) => {
+ await iterRecordsForPeerPullDebit(
+ tx,
+ { onlyState: "nonfinal" },
+ async (pi) => {
const opId = TaskIdentifiers.forPeerPullPaymentDebit(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue =
@@ -419,7 +556,28 @@ async function gatherPeerPullDebitPending(
retryInfo: retryRecord?.retryInfo,
peerPullPaymentIncomingId: pi.peerPullPaymentIncomingId,
});
- });
+ },
+ );
+}
+
+export async function iterRecordsForPeerPushInitiation(
+ tx: GetReadOnlyAccess<{
+ peerPushPaymentInitiations: typeof WalletStoresV1.peerPushPaymentInitiations;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: PeerPushPaymentInitiationRecord) => Promise<void>,
+): Promise<void> {
+ if (filter.onlyState === "nonfinal") {
+ const keyRange = GlobalIDB.KeyRange.bound(
+ PeerPushPaymentInitiationStatus.PendingCreatePurse,
+ PeerPushPaymentInitiationStatus.AbortingRefresh,
+ );
+ await tx.peerPushPaymentInitiations.indexes.byStatus
+ .iter(keyRange)
+ .forEachAsync(f);
+ } else {
+ await tx.peerPushPaymentInitiations.indexes.byStatus.iter().forEachAsync(f);
+ }
}
async function gatherPeerPushInitiationPending(
@@ -431,13 +589,10 @@ async function gatherPeerPushInitiationPending(
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- const keyRange = GlobalIDB.KeyRange.bound(
- PeerPushPaymentInitiationStatus.PendingCreatePurse,
- PeerPushPaymentInitiationStatus.AbortingRefresh,
- );
- await tx.peerPushPaymentInitiations.indexes.byStatus
- .iter(keyRange)
- .forEachAsync(async (pi) => {
+ await iterRecordsForPeerPushInitiation(
+ tx,
+ { onlyState: "nonfinal" },
+ async (pi) => {
const opId = TaskIdentifiers.forPeerPushPaymentInitiation(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue =
@@ -449,7 +604,28 @@ async function gatherPeerPushInitiationPending(
retryInfo: retryRecord?.retryInfo,
pursePub: pi.pursePub,
});
- });
+ },
+ );
+}
+
+export async function iterRecordsForPeerPushCredit(
+ tx: GetReadOnlyAccess<{
+ peerPushPaymentIncoming: typeof WalletStoresV1.peerPushPaymentIncoming;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: PeerPushPaymentIncomingRecord) => Promise<void>,
+): Promise<void> {
+ if (filter.onlyState === "nonfinal") {
+ const keyRange = GlobalIDB.KeyRange.bound(
+ PeerPushPaymentIncomingStatus.PendingMerge,
+ PeerPushPaymentIncomingStatus.PendingWithdrawing,
+ );
+ await tx.peerPushPaymentIncoming.indexes.byStatus
+ .iter(keyRange)
+ .forEachAsync(f);
+ } else {
+ await tx.peerPushPaymentIncoming.indexes.byStatus.iter().forEachAsync(f);
+ }
}
async function gatherPeerPushCreditPending(
@@ -465,9 +641,10 @@ async function gatherPeerPushCreditPending(
PeerPushPaymentIncomingStatus.PendingMerge,
PeerPushPaymentIncomingStatus.PendingWithdrawing,
);
- await tx.peerPushPaymentIncoming.indexes.byStatus
- .iter(keyRange)
- .forEachAsync(async (pi) => {
+ await iterRecordsForPeerPushCredit(
+ tx,
+ { onlyState: "nonfinal" },
+ async (pi) => {
const opId = TaskIdentifiers.forPeerPushCredit(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue =
@@ -479,7 +656,8 @@ async function gatherPeerPushCreditPending(
retryInfo: retryRecord?.retryInfo,
peerPushPaymentIncomingId: pi.peerPushPaymentIncomingId,
});
- });
+ },
+ );
}
export async function getPendingOperations(
@@ -513,7 +691,7 @@ export async function getPendingOperations(
await gatherRefreshPending(ws, tx, now, resp);
await gatherWithdrawalPending(ws, tx, now, resp);
await gatherDepositPending(ws, tx, now, resp);
- await gatherTipPending(ws, tx, now, resp);
+ await gatherRewardPending(ws, tx, now, resp);
await gatherPurchasePending(ws, tx, now, resp);
await gatherRecoupPending(ws, tx, now, resp);
await gatherBackupPending(ws, tx, now, resp);
diff --git a/packages/taler-wallet-core/src/operations/testing.ts b/packages/taler-wallet-core/src/operations/testing.ts
index ea373e914..3090549d5 100644
--- a/packages/taler-wallet-core/src/operations/testing.ts
+++ b/packages/taler-wallet-core/src/operations/testing.ts
@@ -472,12 +472,15 @@ export async function waitUntilDone(ws: InternalWalletState): Promise<void> {
p = openPromise();
const txs = await getTransactions(ws, {
includeRefreshes: true,
+ filterByState: "nonfinal",
});
let finished = true;
for (const tx of txs.transactions) {
switch (tx.txState.major) {
case TransactionMajorState.Pending:
case TransactionMajorState.Aborting:
+ case TransactionMajorState.Suspended:
+ case TransactionMajorState.SuspendedAborting:
finished = false;
logger.info(
`continuing waiting, ${tx.transactionId} in ${tx.txState.major}(${tx.txState.minor})`,
diff --git a/packages/taler-wallet-core/src/operations/transactions.ts b/packages/taler-wallet-core/src/operations/transactions.ts
index a16809b36..af04cb161 100644
--- a/packages/taler-wallet-core/src/operations/transactions.ts
+++ b/packages/taler-wallet-core/src/operations/transactions.ts
@@ -36,6 +36,7 @@ import {
TransactionByIdRequest,
TransactionIdStr,
TransactionMajorState,
+ TransactionRecordFilter,
TransactionsRequest,
TransactionsResponse,
TransactionState,
@@ -153,6 +154,7 @@ import {
resumePeerPushDebitTransaction,
abortPeerPushDebitTransaction,
} from "./pay-peer-push-debit.js";
+import { iterRecordsForDeposit, iterRecordsForPeerPullDebit, iterRecordsForPeerPullInitiation, iterRecordsForPeerPushCredit, iterRecordsForPeerPushInitiation, iterRecordsForPurchase, iterRecordsForRefresh, iterRecordsForRefund, iterRecordsForReward, iterRecordsForWithdrawal } from "./pending.js";
const logger = new Logger("taler-wallet-core:transactions.ts");
@@ -929,6 +931,11 @@ export async function getTransactions(
): Promise<TransactionsResponse> {
const transactions: Transaction[] = [];
+ const filter: TransactionRecordFilter = {};
+ if (transactionsRequest?.filterByState) {
+ filter.onlyState = transactionsRequest.filterByState;
+ }
+
await ws.db
.mktx((x) => [
x.coins,
@@ -952,7 +959,7 @@ export async function getTransactions(
x.refundGroups,
])
.runReadOnly(async (tx) => {
- tx.peerPushPaymentInitiations.iter().forEachAsync(async (pi) => {
+ await iterRecordsForPeerPushInitiation(tx, filter, async (pi) => {
const amount = Amounts.parseOrThrow(pi.amount);
if (shouldSkipCurrency(transactionsRequest, amount.currency)) {
@@ -968,7 +975,7 @@ export async function getTransactions(
);
});
- tx.peerPullPaymentIncoming.iter().forEachAsync(async (pi) => {
+ await iterRecordsForPeerPullDebit(tx, filter, async (pi) => {
const amount = Amounts.parseOrThrow(pi.contractTerms.amount);
if (shouldSkipCurrency(transactionsRequest, amount.currency)) {
return;
@@ -986,7 +993,7 @@ export async function getTransactions(
transactions.push(buildTransactionForPullPaymentDebit(pi));
});
- tx.peerPushPaymentIncoming.iter().forEachAsync(async (pi) => {
+ await iterRecordsForPeerPushCredit(tx, filter, async (pi) => {
if (!pi.currency) {
// Legacy transaction
return;
@@ -1026,8 +1033,8 @@ export async function getTransactions(
),
);
});
-
- tx.peerPullPaymentInitiations.iter().forEachAsync(async (pi) => {
+
+ await iterRecordsForPeerPullInitiation(tx, filter, async (pi) => {
const currency = Amounts.currencyOf(pi.amount);
if (shouldSkipCurrency(transactionsRequest, currency)) {
return;
@@ -1060,7 +1067,7 @@ export async function getTransactions(
);
});
- tx.refundGroups.iter().forEachAsync(async (refundGroup) => {
+ await iterRecordsForRefund(tx, filter, async (refundGroup) => {
const currency = Amounts.currencyOf(refundGroup.amountRaw);
if (shouldSkipCurrency(transactionsRequest, currency)) {
return;
@@ -1071,8 +1078,8 @@ export async function getTransactions(
);
transactions.push(buildTransactionForRefund(refundGroup, contractData));
});
-
- tx.refreshGroups.iter().forEachAsync(async (rg) => {
+
+ await iterRecordsForRefresh(tx, filter, async (rg) => {
if (shouldSkipCurrency(transactionsRequest, rg.currency)) {
return;
}
@@ -1092,7 +1099,7 @@ export async function getTransactions(
}
});
- tx.withdrawalGroups.iter().forEachAsync(async (wsr) => {
+ await iterRecordsForWithdrawal(tx, filter ,async (wsr) => {
if (
shouldSkipCurrency(
transactionsRequest,
@@ -1146,7 +1153,7 @@ export async function getTransactions(
}
});
- tx.depositGroups.iter().forEachAsync(async (dg) => {
+ await iterRecordsForDeposit(tx, filter, async (dg) => {
const amount = Amounts.parseOrThrow(dg.contractTermsRaw.amount);
if (shouldSkipCurrency(transactionsRequest, amount.currency)) {
return;
@@ -1157,7 +1164,7 @@ export async function getTransactions(
transactions.push(buildTransactionForDeposit(dg, retryRecord));
});
- tx.purchases.iter().forEachAsync(async (purchase) => {
+ await iterRecordsForPurchase(tx, filter, async (purchase) => {
const download = purchase.download;
if (!download) {
return;
@@ -1200,7 +1207,7 @@ export async function getTransactions(
);
});
- tx.rewards.iter().forEachAsync(async (tipRecord) => {
+ await iterRecordsForReward(tx, filter, async (tipRecord) => {
if (
shouldSkipCurrency(
transactionsRequest,
diff --git a/packages/taler-wallet-core/src/util/query.ts b/packages/taler-wallet-core/src/util/query.ts
index 1de1e9a0d..527cbdf63 100644
--- a/packages/taler-wallet-core/src/util/query.ts
+++ b/packages/taler-wallet-core/src/util/query.ts
@@ -338,7 +338,7 @@ interface IndexReadOnlyAccessor<RecordType> {
iter(query?: IDBKeyRange | IDBValidKey): ResultStream<RecordType>;
get(query: IDBValidKey): Promise<RecordType | undefined>;
getAll(
- query: IDBKeyRange | IDBValidKey,
+ query?: IDBKeyRange | IDBValidKey,
count?: number,
): Promise<RecordType[]>;
}
@@ -351,7 +351,7 @@ interface IndexReadWriteAccessor<RecordType> {
iter(query: IDBKeyRange | IDBValidKey): ResultStream<RecordType>;
get(query: IDBValidKey): Promise<RecordType | undefined>;
getAll(
- query: IDBKeyRange | IDBValidKey,
+ query?: IDBKeyRange | IDBValidKey,
count?: number,
): Promise<RecordType[]>;
}
diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts
index 796a96f14..8cd9bb8c3 100644
--- a/packages/taler-wallet-core/src/wallet.ts
+++ b/packages/taler-wallet-core/src/wallet.ts
@@ -479,6 +479,7 @@ async function runTaskLoop(
// Wait until either the timeout, or we are notified (via the latch)
// that more work might be available.
await Promise.race([timeout, ws.workAvailable.wait()]);
+ logger.trace(`done waiting for available work`);
} else {
logger.trace(
`running ${pending.pendingOperations.length} pending operations`,