diff options
Diffstat (limited to 'packages/taler-wallet-core/src')
-rw-r--r-- | packages/taler-wallet-core/src/db.ts | 5 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/host-common.ts | 2 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/host-impl.node.ts | 69 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/host-impl.qtart.ts | 116 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/host.ts | 1 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/operations/pending.ts | 350 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/operations/testing.ts | 3 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/operations/transactions.ts | 31 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/util/query.ts | 4 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/wallet.ts | 1 |
10 files changed, 453 insertions, 129 deletions
diff --git a/packages/taler-wallet-core/src/db.ts b/packages/taler-wallet-core/src/db.ts index c7d0b0bda..1d0d3a6e5 100644 --- a/packages/taler-wallet-core/src/db.ts +++ b/packages/taler-wallet-core/src/db.ts @@ -119,7 +119,7 @@ export const CURRENT_DB_CONFIG_KEY = "currentMainDbName"; * backwards-compatible way or object stores and indices * are added. */ -export const WALLET_DB_MINOR_VERSION = 9; +export const WALLET_DB_MINOR_VERSION = 10; /** * Ranges for operation status fields. @@ -2675,6 +2675,9 @@ export const WalletStoresV1 = { }), { byProposalId: describeIndex("byProposalId", "proposalId"), + byStatus: describeIndex("byStatus", "status", { + versionAdded: 10, + }), }, ), refundItems: describeStore( diff --git a/packages/taler-wallet-core/src/host-common.ts b/packages/taler-wallet-core/src/host-common.ts index 21e7f1157..c56d7ed1c 100644 --- a/packages/taler-wallet-core/src/host-common.ts +++ b/packages/taler-wallet-core/src/host-common.ts @@ -16,7 +16,7 @@ import { WalletNotification } from "@gnu-taler/taler-util"; import { HttpRequestLibrary } from "@gnu-taler/taler-util/http"; -import { WalletConfig, WalletConfigParameter } from "./index.js"; +import { WalletConfigParameter } from "./index.js"; /** * Helpers to initiate a wallet in a host environment. diff --git a/packages/taler-wallet-core/src/host-impl.node.ts b/packages/taler-wallet-core/src/host-impl.node.ts index 150bba49a..ceda7243f 100644 --- a/packages/taler-wallet-core/src/host-impl.node.ts +++ b/packages/taler-wallet-core/src/host-impl.node.ts @@ -27,6 +27,7 @@ import type { IDBFactory } from "@gnu-taler/idb-bridge"; import { BridgeIDBFactory, MemoryBackend, + createSqliteBackend, shimIndexedDB, } from "@gnu-taler/idb-bridge"; import { AccessStats } from "@gnu-taler/idb-bridge"; @@ -39,24 +40,21 @@ import { createPlatformHttpLib } from "@gnu-taler/taler-util/http"; import { SetTimeoutTimerAPI } from "./util/timer.js"; import { Wallet } from "./wallet.js"; import { DefaultNodeWalletArgs, makeTempfileId } from "./host-common.js"; +import { createNodeSqlite3Impl } from "@gnu-taler/idb-bridge/node-sqlite3-bindings"; const logger = new Logger("host-impl.node.ts"); -/** - * Get a wallet instance with default settings for node. - * - * Extended version that allows getting DB stats. - */ -export async function createNativeWalletHost2( +interface MakeDbResult { + idbFactory: BridgeIDBFactory; + getStats: () => AccessStats; +} + +async function makeFileDb( args: DefaultNodeWalletArgs = {}, -): Promise<{ - wallet: Wallet; - getDbStats: () => AccessStats; -}> { +): Promise<MakeDbResult> { BridgeIDBFactory.enableTracing = false; const myBackend = new MemoryBackend(); myBackend.enableTracing = false; - const storagePath = args.persistentStoragePath; if (storagePath) { try { @@ -96,8 +94,41 @@ export async function createNativeWalletHost2( BridgeIDBFactory.enableTracing = false; const myBridgeIdbFactory = new BridgeIDBFactory(myBackend); - const myIdbFactory: IDBFactory = myBridgeIdbFactory as any as IDBFactory; + return { + idbFactory: myBridgeIdbFactory, + getStats: () => myBackend.accessStats, + }; +} +async function makeSqliteDb( + args: DefaultNodeWalletArgs, +): Promise<MakeDbResult> { + BridgeIDBFactory.enableTracing = false; + const imp = await createNodeSqlite3Impl(); + const myBackend = await createSqliteBackend(imp, { + filename: args.persistentStoragePath ?? ":memory:", + }); + myBackend.enableTracing = false; + const myBridgeIdbFactory = new BridgeIDBFactory(myBackend); + return { + getStats() { + throw Error("not implemented"); + }, + idbFactory: myBridgeIdbFactory, + }; +} + +/** + * Get a wallet instance with default settings for node. + * + * Extended version that allows getting DB stats. + */ +export async function createNativeWalletHost2( + args: DefaultNodeWalletArgs = {}, +): Promise<{ + wallet: Wallet; + getDbStats: () => AccessStats; +}> { let myHttpLib; if (args.httpLib) { myHttpLib = args.httpLib; @@ -115,7 +146,17 @@ export async function createNativeWalletHost2( ); }; - shimIndexedDB(myBridgeIdbFactory); + let dbResp: MakeDbResult; + + if (!args.persistentStoragePath || args.persistentStoragePath.endsWith(".json")) { + dbResp = await makeFileDb(args); + } else { + dbResp = await makeSqliteDb(args); + } + + const myIdbFactory: IDBFactory = dbResp.idbFactory as any as IDBFactory; + + shimIndexedDB(dbResp.idbFactory); const myDb = await openTalerDatabase(myIdbFactory, myVersionChange); @@ -158,6 +199,6 @@ export async function createNativeWalletHost2( } return { wallet: w, - getDbStats: () => myBackend.accessStats, + getDbStats: dbResp.getStats, }; } diff --git a/packages/taler-wallet-core/src/host-impl.qtart.ts b/packages/taler-wallet-core/src/host-impl.qtart.ts index d10914b10..390282f8c 100644 --- a/packages/taler-wallet-core/src/host-impl.qtart.ts +++ b/packages/taler-wallet-core/src/host-impl.qtart.ts @@ -22,11 +22,17 @@ /** * Imports. */ -import type { IDBFactory } from "@gnu-taler/idb-bridge"; +import type { + IDBFactory, + ResultRow, + Sqlite3Interface, + Sqlite3Statement, +} from "@gnu-taler/idb-bridge"; // eslint-disable-next-line no-duplicate-imports import { BridgeIDBFactory, MemoryBackend, + createSqliteBackend, shimIndexedDB, } from "@gnu-taler/idb-bridge"; import { AccessStats } from "@gnu-taler/idb-bridge"; @@ -41,12 +47,78 @@ import { DefaultNodeWalletArgs, makeTempfileId } from "./host-common.js"; const logger = new Logger("host-impl.qtart.ts"); -export async function createNativeWalletHost2( +interface MakeDbResult { + idbFactory: BridgeIDBFactory; + getStats: () => AccessStats; +} + +let numStmt = 0; + +export async function createQtartSqlite3Impl(): Promise<Sqlite3Interface> { + const tart: any = (globalThis as any)._tart; + if (!tart) { + throw Error("globalThis._qtart not defined"); + } + return { + open(filename: string) { + const internalDbHandle = tart.sqlite3Open(filename); + return { + internalDbHandle, + close() { + tart.sqlite3Close(internalDbHandle); + }, + prepare(stmtStr): Sqlite3Statement { + const stmtHandle = tart.sqlite3Prepare(internalDbHandle, stmtStr); + return { + internalStatement: stmtHandle, + getAll(params): ResultRow[] { + numStmt++; + return tart.sqlite3StmtGetAll(stmtHandle, params); + }, + getFirst(params): ResultRow | undefined { + numStmt++; + return tart.sqlite3StmtGetFirst(stmtHandle, params); + }, + run(params) { + numStmt++; + return tart.sqlite3StmtRun(stmtHandle, params); + }, + }; + }, + exec(sqlStr): void { + numStmt++; + tart.sqlite3Exec(internalDbHandle, sqlStr); + }, + }; + }, + }; +} + +async function makeSqliteDb( + args: DefaultNodeWalletArgs, +): Promise<MakeDbResult> { + BridgeIDBFactory.enableTracing = false; + const imp = await createQtartSqlite3Impl(); + const myBackend = await createSqliteBackend(imp, { + filename: args.persistentStoragePath ?? ":memory:", + }); + myBackend.trackStats = true; + myBackend.enableTracing = false; + const myBridgeIdbFactory = new BridgeIDBFactory(myBackend); + return { + getStats() { + return { + ...myBackend.accessStats, + primitiveStatements: numStmt, + } + }, + idbFactory: myBridgeIdbFactory, + }; +} + +async function makeFileDb( args: DefaultNodeWalletArgs = {}, -): Promise<{ - wallet: Wallet; - getDbStats: () => AccessStats; -}> { +): Promise<MakeDbResult> { BridgeIDBFactory.enableTracing = false; const myBackend = new MemoryBackend(); myBackend.enableTracing = false; @@ -78,12 +150,34 @@ export async function createNativeWalletHost2( }; } - logger.info("done processing storage path"); + const myBridgeIdbFactory = new BridgeIDBFactory(myBackend); + return { + idbFactory: myBridgeIdbFactory, + getStats: () => myBackend.accessStats, + }; +} +export async function createNativeWalletHost2( + args: DefaultNodeWalletArgs = {}, +): Promise<{ + wallet: Wallet; + getDbStats: () => AccessStats; +}> { BridgeIDBFactory.enableTracing = false; - const myBridgeIdbFactory = new BridgeIDBFactory(myBackend); - const myIdbFactory: IDBFactory = myBridgeIdbFactory as any as IDBFactory; + let dbResp: MakeDbResult; + + if (args.persistentStoragePath && args.persistentStoragePath.endsWith(".json")) { + logger.info("using JSON file backend (slow!)"); + dbResp = await makeFileDb(args); + } else { + logger.info("using JSON file backend (experimental!)"); + dbResp = await makeSqliteDb(args) + } + + const myIdbFactory: IDBFactory = dbResp.idbFactory as any as IDBFactory; + + shimIndexedDB(dbResp.idbFactory); let myHttpLib; if (args.httpLib) { @@ -102,8 +196,6 @@ export async function createNativeWalletHost2( ); }; - shimIndexedDB(myBridgeIdbFactory); - const myDb = await openTalerDatabase(myIdbFactory, myVersionChange); let workerFactory; @@ -124,6 +216,6 @@ export async function createNativeWalletHost2( } return { wallet: w, - getDbStats: () => myBackend.accessStats, + getDbStats: dbResp.getStats, }; } diff --git a/packages/taler-wallet-core/src/host.ts b/packages/taler-wallet-core/src/host.ts index 4b319f081..feccf42a6 100644 --- a/packages/taler-wallet-core/src/host.ts +++ b/packages/taler-wallet-core/src/host.ts @@ -16,7 +16,6 @@ import { DefaultNodeWalletArgs } from "./host-common.js"; import { Wallet } from "./index.js"; - import * as hostImpl from "#host-impl"; import { AccessStats } from "@gnu-taler/idb-bridge"; diff --git a/packages/taler-wallet-core/src/operations/pending.ts b/packages/taler-wallet-core/src/operations/pending.ts index cc9217d67..6c6546f83 100644 --- a/packages/taler-wallet-core/src/operations/pending.ts +++ b/packages/taler-wallet-core/src/operations/pending.ts @@ -34,13 +34,24 @@ import { WithdrawalGroupStatus, RewardRecordStatus, DepositOperationStatus, + RefreshGroupRecord, + WithdrawalGroupRecord, + DepositGroupRecord, + RewardRecord, + PurchaseRecord, + PeerPullPaymentInitiationRecord, + PeerPullPaymentIncomingRecord, + PeerPushPaymentInitiationRecord, + PeerPushPaymentIncomingRecord, + RefundGroupRecord, + RefundGroupStatus, } from "../db.js"; import { PendingOperationsResponse, PendingTaskType, TaskId, } from "../pending-types.js"; -import { AbsoluteTime } from "@gnu-taler/taler-util"; +import { AbsoluteTime, TransactionRecordFilter } from "@gnu-taler/taler-util"; import { InternalWalletState } from "../internal-wallet-state.js"; import { GetReadOnlyAccess } from "../util/query.js"; import { GlobalIDB } from "@gnu-taler/idb-bridge"; @@ -105,6 +116,32 @@ async function gatherExchangePending( }); } +/** + * Iterate refresh records based on a filter. + */ +export async function iterRecordsForRefresh( + tx: GetReadOnlyAccess<{ + refreshGroups: typeof WalletStoresV1.refreshGroups; + }>, + filter: TransactionRecordFilter, + f: (r: RefreshGroupRecord) => Promise<void>, +): Promise<void> { + let refreshGroups: RefreshGroupRecord[]; + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + OperationStatusRange.ACTIVE_START, + OperationStatusRange.ACTIVE_END, + ); + refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll(keyRange); + } else { + refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll(); + } + + for (const r of refreshGroups) { + await f(r); + } +} + async function gatherRefreshPending( ws: InternalWalletState, tx: GetReadOnlyAccess<{ @@ -114,22 +151,13 @@ async function gatherRefreshPending( now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - const keyRange = GlobalIDB.KeyRange.bound( - OperationStatusRange.ACTIVE_START, - OperationStatusRange.ACTIVE_END, - ); - const refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll( - keyRange, - ); - for (const r of refreshGroups) { + await iterRecordsForRefresh(tx, { onlyState: "nonfinal" }, async (r) => { if (r.timestampFinished) { return; } const opId = TaskIdentifiers.forRefresh(r); const retryRecord = await tx.operationRetries.get(opId); - const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); - resp.pendingOperations.push({ type: PendingTaskType.Refresh, ...getPendingCommon(ws, opId, timestampDue), @@ -140,6 +168,30 @@ async function gatherRefreshPending( ), retryInfo: retryRecord?.retryInfo, }); + }); +} + +export async function iterRecordsForWithdrawal( + tx: GetReadOnlyAccess<{ + withdrawalGroups: typeof WalletStoresV1.withdrawalGroups; + }>, + filter: TransactionRecordFilter, + f: (r: WithdrawalGroupRecord) => Promise<void>, +): Promise<void> { + let withdrawalGroupRecords: WithdrawalGroupRecord[]; + if (filter.onlyState === "nonfinal") { + const range = GlobalIDB.KeyRange.bound( + WithdrawalGroupStatus.PendingRegisteringBank, + WithdrawalGroupStatus.PendingAml, + ); + withdrawalGroupRecords = + await tx.withdrawalGroups.indexes.byStatus.getAll(range); + } else { + withdrawalGroupRecords = + await tx.withdrawalGroups.indexes.byStatus.getAll(); + } + for (const wgr of withdrawalGroupRecords) { + await f(wgr); } } @@ -153,12 +205,7 @@ async function gatherWithdrawalPending( now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - const range = GlobalIDB.KeyRange.bound( - WithdrawalGroupStatus.PendingRegisteringBank, - WithdrawalGroupStatus.PendingAml, - ); - const wsrs = await tx.withdrawalGroups.indexes.byStatus.getAll(range); - for (const wsr of wsrs) { + await iterRecordsForWithdrawal(tx, { onlyState: "nonfinal" }, async (wsr) => { const opTag = TaskIdentifiers.forWithdrawal(wsr); let opr = await tx.operationRetries.get(opTag); const now = AbsoluteTime.now(); @@ -184,6 +231,30 @@ async function gatherWithdrawalPending( lastError: opr.lastError, retryInfo: opr.retryInfo, }); + }); +} + +export async function iterRecordsForDeposit( + tx: GetReadOnlyAccess<{ + depositGroups: typeof WalletStoresV1.depositGroups; + }>, + filter: TransactionRecordFilter, + f: (r: DepositGroupRecord) => Promise<void>, +): Promise<void> { + let dgs: DepositGroupRecord[]; + if (filter.onlyState === "nonfinal") { + dgs = await tx.depositGroups.indexes.byStatus.getAll( + GlobalIDB.KeyRange.bound( + DepositOperationStatus.PendingDeposit, + DepositOperationStatus.PendingKyc, + ), + ); + } else { + dgs = await tx.depositGroups.indexes.byStatus.getAll(); + } + + for (const dg of dgs) { + await f(dg); } } @@ -196,16 +267,7 @@ async function gatherDepositPending( now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - const dgs = await tx.depositGroups.indexes.byStatus.getAll( - GlobalIDB.KeyRange.bound( - DepositOperationStatus.PendingDeposit, - DepositOperationStatus.PendingKyc, - ), - ); - for (const dg of dgs) { - if (dg.timestampFinished) { - return; - } + await iterRecordsForDeposit(tx, { onlyState: "nonfinal" }, async (dg) => { let deposited = true; for (const d of dg.depositedPerCoin) { if (!d) { @@ -226,10 +288,28 @@ async function gatherDepositPending( lastError: retryRecord?.lastError, retryInfo: retryRecord?.retryInfo, }); + }); +} + +export async function iterRecordsForReward( + tx: GetReadOnlyAccess<{ + rewards: typeof WalletStoresV1.rewards; + }>, + filter: TransactionRecordFilter, + f: (r: RewardRecord) => Promise<void>, +): Promise<void> { + if (filter.onlyState === "nonfinal") { + const range = GlobalIDB.KeyRange.bound( + RewardRecordStatus.PendingPickup, + RewardRecordStatus.PendingPickup, + ); + await tx.rewards.indexes.byStatus.iter(range).forEachAsync(f); + } else { + await tx.rewards.indexes.byStatus.iter().forEachAsync(f); } } -async function gatherTipPending( +async function gatherRewardPending( ws: InternalWalletState, tx: GetReadOnlyAccess<{ rewards: typeof WalletStoresV1.rewards; @@ -238,15 +318,7 @@ async function gatherTipPending( now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - const range = GlobalIDB.KeyRange.bound( - RewardRecordStatus.PendingPickup, - RewardRecordStatus.PendingPickup, - ); - await tx.rewards.indexes.byStatus.iter(range).forEachAsync(async (tip) => { - // FIXME: The tip record needs a proper status field! - if (tip.pickedUpTimestamp) { - return; - } + await iterRecordsForReward(tx, { onlyState: "nonfinal" }, async (tip) => { const opId = TaskIdentifiers.forTipPickup(tip); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); @@ -264,6 +336,43 @@ async function gatherTipPending( }); } +export async function iterRecordsForRefund( + tx: GetReadOnlyAccess<{ + refundGroups: typeof WalletStoresV1.refundGroups; + }>, + filter: TransactionRecordFilter, + f: (r: RefundGroupRecord) => Promise<void>, +): Promise<void> { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.only( + RefundGroupStatus.Pending + ); + await tx.refundGroups.indexes.byStatus + .iter(keyRange) + .forEachAsync(f); + } else { + await tx.refundGroups.iter().forEachAsync(f); + } +} + +export async function iterRecordsForPurchase( + tx: GetReadOnlyAccess<{ + purchases: typeof WalletStoresV1.purchases; + }>, + filter: TransactionRecordFilter, + f: (r: PurchaseRecord) => Promise<void>, +): Promise<void> { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + PurchaseStatus.PendingDownloadingProposal, + PurchaseStatus.PendingAcceptRefund, + ); + await tx.purchases.indexes.byStatus.iter(keyRange).forEachAsync(f); + } else { + await tx.purchases.indexes.byStatus.iter().forEachAsync(f); + } +} + async function gatherPurchasePending( ws: InternalWalletState, tx: GetReadOnlyAccess<{ @@ -273,27 +382,20 @@ async function gatherPurchasePending( now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - const keyRange = GlobalIDB.KeyRange.bound( - PurchaseStatus.PendingDownloadingProposal, - PurchaseStatus.PendingAcceptRefund, - ); - await tx.purchases.indexes.byStatus - .iter(keyRange) - .forEachAsync(async (pr) => { - const opId = TaskIdentifiers.forPay(pr); - const retryRecord = await tx.operationRetries.get(opId); - const timestampDue = - retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); - resp.pendingOperations.push({ - type: PendingTaskType.Purchase, - ...getPendingCommon(ws, opId, timestampDue), - givesLifeness: true, - statusStr: PurchaseStatus[pr.purchaseStatus], - proposalId: pr.proposalId, - retryInfo: retryRecord?.retryInfo, - lastError: retryRecord?.lastError, - }); + await iterRecordsForPurchase(tx, { onlyState: "nonfinal" }, async (pr) => { + const opId = TaskIdentifiers.forPay(pr); + const retryRecord = await tx.operationRetries.get(opId); + const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); + resp.pendingOperations.push({ + type: PendingTaskType.Purchase, + ...getPendingCommon(ws, opId, timestampDue), + givesLifeness: true, + statusStr: PurchaseStatus[pr.purchaseStatus], + proposalId: pr.proposalId, + retryInfo: retryRecord?.retryInfo, + lastError: retryRecord?.lastError, }); + }); } async function gatherRecoupPending( @@ -362,6 +464,26 @@ async function gatherBackupPending( }); } +export async function iterRecordsForPeerPullInitiation( + tx: GetReadOnlyAccess<{ + peerPullPaymentInitiations: typeof WalletStoresV1.peerPullPaymentInitiations; + }>, + filter: TransactionRecordFilter, + f: (r: PeerPullPaymentInitiationRecord) => Promise<void>, +): Promise<void> { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + PeerPullPaymentInitiationStatus.PendingCreatePurse, + PeerPullPaymentInitiationStatus.AbortingDeletePurse, + ); + await tx.peerPullPaymentInitiations.indexes.byStatus + .iter(keyRange) + .forEachAsync(f); + } else { + await tx.peerPullPaymentInitiations.indexes.byStatus.iter().forEachAsync(f); + } +} + async function gatherPeerPullInitiationPending( ws: InternalWalletState, tx: GetReadOnlyAccess<{ @@ -371,13 +493,10 @@ async function gatherPeerPullInitiationPending( now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - const keyRange = GlobalIDB.KeyRange.bound( - PeerPullPaymentInitiationStatus.PendingCreatePurse, - PeerPullPaymentInitiationStatus.AbortingDeletePurse, - ); - await tx.peerPullPaymentInitiations.indexes.byStatus - .iter(keyRange) - .forEachAsync(async (pi) => { + await iterRecordsForPeerPullInitiation( + tx, + { onlyState: "nonfinal" }, + async (pi) => { const opId = TaskIdentifiers.forPeerPullPaymentInitiation(pi); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = @@ -389,7 +508,28 @@ async function gatherPeerPullInitiationPending( retryInfo: retryRecord?.retryInfo, pursePub: pi.pursePub, }); - }); + }, + ); +} + +export async function iterRecordsForPeerPullDebit( + tx: GetReadOnlyAccess<{ + peerPullPaymentIncoming: typeof WalletStoresV1.peerPullPaymentIncoming; + }>, + filter: TransactionRecordFilter, + f: (r: PeerPullPaymentIncomingRecord) => Promise<void>, +): Promise<void> { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + PeerPullDebitRecordStatus.PendingDeposit, + PeerPullDebitRecordStatus.AbortingRefresh, + ); + await tx.peerPullPaymentIncoming.indexes.byStatus + .iter(keyRange) + .forEachAsync(f); + } else { + await tx.peerPullPaymentIncoming.indexes.byStatus.iter().forEachAsync(f); + } } async function gatherPeerPullDebitPending( @@ -401,13 +541,10 @@ async function gatherPeerPullDebitPending( now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - const keyRange = GlobalIDB.KeyRange.bound( - PeerPullDebitRecordStatus.PendingDeposit, - PeerPullDebitRecordStatus.AbortingRefresh, - ); - await tx.peerPullPaymentIncoming.indexes.byStatus - .iter(keyRange) - .forEachAsync(async (pi) => { + await iterRecordsForPeerPullDebit( + tx, + { onlyState: "nonfinal" }, + async (pi) => { const opId = TaskIdentifiers.forPeerPullPaymentDebit(pi); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = @@ -419,7 +556,28 @@ async function gatherPeerPullDebitPending( retryInfo: retryRecord?.retryInfo, peerPullPaymentIncomingId: pi.peerPullPaymentIncomingId, }); - }); + }, + ); +} + +export async function iterRecordsForPeerPushInitiation( + tx: GetReadOnlyAccess<{ + peerPushPaymentInitiations: typeof WalletStoresV1.peerPushPaymentInitiations; + }>, + filter: TransactionRecordFilter, + f: (r: PeerPushPaymentInitiationRecord) => Promise<void>, +): Promise<void> { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + PeerPushPaymentInitiationStatus.PendingCreatePurse, + PeerPushPaymentInitiationStatus.AbortingRefresh, + ); + await tx.peerPushPaymentInitiations.indexes.byStatus + .iter(keyRange) + .forEachAsync(f); + } else { + await tx.peerPushPaymentInitiations.indexes.byStatus.iter().forEachAsync(f); + } } async function gatherPeerPushInitiationPending( @@ -431,13 +589,10 @@ async function gatherPeerPushInitiationPending( now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - const keyRange = GlobalIDB.KeyRange.bound( - PeerPushPaymentInitiationStatus.PendingCreatePurse, - PeerPushPaymentInitiationStatus.AbortingRefresh, - ); - await tx.peerPushPaymentInitiations.indexes.byStatus - .iter(keyRange) - .forEachAsync(async (pi) => { + await iterRecordsForPeerPushInitiation( + tx, + { onlyState: "nonfinal" }, + async (pi) => { const opId = TaskIdentifiers.forPeerPushPaymentInitiation(pi); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = @@ -449,7 +604,28 @@ async function gatherPeerPushInitiationPending( retryInfo: retryRecord?.retryInfo, pursePub: pi.pursePub, }); - }); + }, + ); +} + +export async function iterRecordsForPeerPushCredit( + tx: GetReadOnlyAccess<{ + peerPushPaymentIncoming: typeof WalletStoresV1.peerPushPaymentIncoming; + }>, + filter: TransactionRecordFilter, + f: (r: PeerPushPaymentIncomingRecord) => Promise<void>, +): Promise<void> { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + PeerPushPaymentIncomingStatus.PendingMerge, + PeerPushPaymentIncomingStatus.PendingWithdrawing, + ); + await tx.peerPushPaymentIncoming.indexes.byStatus + .iter(keyRange) + .forEachAsync(f); + } else { + await tx.peerPushPaymentIncoming.indexes.byStatus.iter().forEachAsync(f); + } } async function gatherPeerPushCreditPending( @@ -465,9 +641,10 @@ async function gatherPeerPushCreditPending( PeerPushPaymentIncomingStatus.PendingMerge, PeerPushPaymentIncomingStatus.PendingWithdrawing, ); - await tx.peerPushPaymentIncoming.indexes.byStatus - .iter(keyRange) - .forEachAsync(async (pi) => { + await iterRecordsForPeerPushCredit( + tx, + { onlyState: "nonfinal" }, + async (pi) => { const opId = TaskIdentifiers.forPeerPushCredit(pi); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = @@ -479,7 +656,8 @@ async function gatherPeerPushCreditPending( retryInfo: retryRecord?.retryInfo, peerPushPaymentIncomingId: pi.peerPushPaymentIncomingId, }); - }); + }, + ); } export async function getPendingOperations( @@ -513,7 +691,7 @@ export async function getPendingOperations( await gatherRefreshPending(ws, tx, now, resp); await gatherWithdrawalPending(ws, tx, now, resp); await gatherDepositPending(ws, tx, now, resp); - await gatherTipPending(ws, tx, now, resp); + await gatherRewardPending(ws, tx, now, resp); await gatherPurchasePending(ws, tx, now, resp); await gatherRecoupPending(ws, tx, now, resp); await gatherBackupPending(ws, tx, now, resp); diff --git a/packages/taler-wallet-core/src/operations/testing.ts b/packages/taler-wallet-core/src/operations/testing.ts index ea373e914..3090549d5 100644 --- a/packages/taler-wallet-core/src/operations/testing.ts +++ b/packages/taler-wallet-core/src/operations/testing.ts @@ -472,12 +472,15 @@ export async function waitUntilDone(ws: InternalWalletState): Promise<void> { p = openPromise(); const txs = await getTransactions(ws, { includeRefreshes: true, + filterByState: "nonfinal", }); let finished = true; for (const tx of txs.transactions) { switch (tx.txState.major) { case TransactionMajorState.Pending: case TransactionMajorState.Aborting: + case TransactionMajorState.Suspended: + case TransactionMajorState.SuspendedAborting: finished = false; logger.info( `continuing waiting, ${tx.transactionId} in ${tx.txState.major}(${tx.txState.minor})`, diff --git a/packages/taler-wallet-core/src/operations/transactions.ts b/packages/taler-wallet-core/src/operations/transactions.ts index a16809b36..af04cb161 100644 --- a/packages/taler-wallet-core/src/operations/transactions.ts +++ b/packages/taler-wallet-core/src/operations/transactions.ts @@ -36,6 +36,7 @@ import { TransactionByIdRequest, TransactionIdStr, TransactionMajorState, + TransactionRecordFilter, TransactionsRequest, TransactionsResponse, TransactionState, @@ -153,6 +154,7 @@ import { resumePeerPushDebitTransaction, abortPeerPushDebitTransaction, } from "./pay-peer-push-debit.js"; +import { iterRecordsForDeposit, iterRecordsForPeerPullDebit, iterRecordsForPeerPullInitiation, iterRecordsForPeerPushCredit, iterRecordsForPeerPushInitiation, iterRecordsForPurchase, iterRecordsForRefresh, iterRecordsForRefund, iterRecordsForReward, iterRecordsForWithdrawal } from "./pending.js"; const logger = new Logger("taler-wallet-core:transactions.ts"); @@ -929,6 +931,11 @@ export async function getTransactions( ): Promise<TransactionsResponse> { const transactions: Transaction[] = []; + const filter: TransactionRecordFilter = {}; + if (transactionsRequest?.filterByState) { + filter.onlyState = transactionsRequest.filterByState; + } + await ws.db .mktx((x) => [ x.coins, @@ -952,7 +959,7 @@ export async function getTransactions( x.refundGroups, ]) .runReadOnly(async (tx) => { - tx.peerPushPaymentInitiations.iter().forEachAsync(async (pi) => { + await iterRecordsForPeerPushInitiation(tx, filter, async (pi) => { const amount = Amounts.parseOrThrow(pi.amount); if (shouldSkipCurrency(transactionsRequest, amount.currency)) { @@ -968,7 +975,7 @@ export async function getTransactions( ); }); - tx.peerPullPaymentIncoming.iter().forEachAsync(async (pi) => { + await iterRecordsForPeerPullDebit(tx, filter, async (pi) => { const amount = Amounts.parseOrThrow(pi.contractTerms.amount); if (shouldSkipCurrency(transactionsRequest, amount.currency)) { return; @@ -986,7 +993,7 @@ export async function getTransactions( transactions.push(buildTransactionForPullPaymentDebit(pi)); }); - tx.peerPushPaymentIncoming.iter().forEachAsync(async (pi) => { + await iterRecordsForPeerPushCredit(tx, filter, async (pi) => { if (!pi.currency) { // Legacy transaction return; @@ -1026,8 +1033,8 @@ export async function getTransactions( ), ); }); - - tx.peerPullPaymentInitiations.iter().forEachAsync(async (pi) => { + + await iterRecordsForPeerPullInitiation(tx, filter, async (pi) => { const currency = Amounts.currencyOf(pi.amount); if (shouldSkipCurrency(transactionsRequest, currency)) { return; @@ -1060,7 +1067,7 @@ export async function getTransactions( ); }); - tx.refundGroups.iter().forEachAsync(async (refundGroup) => { + await iterRecordsForRefund(tx, filter, async (refundGroup) => { const currency = Amounts.currencyOf(refundGroup.amountRaw); if (shouldSkipCurrency(transactionsRequest, currency)) { return; @@ -1071,8 +1078,8 @@ export async function getTransactions( ); transactions.push(buildTransactionForRefund(refundGroup, contractData)); }); - - tx.refreshGroups.iter().forEachAsync(async (rg) => { + + await iterRecordsForRefresh(tx, filter, async (rg) => { if (shouldSkipCurrency(transactionsRequest, rg.currency)) { return; } @@ -1092,7 +1099,7 @@ export async function getTransactions( } }); - tx.withdrawalGroups.iter().forEachAsync(async (wsr) => { + await iterRecordsForWithdrawal(tx, filter ,async (wsr) => { if ( shouldSkipCurrency( transactionsRequest, @@ -1146,7 +1153,7 @@ export async function getTransactions( } }); - tx.depositGroups.iter().forEachAsync(async (dg) => { + await iterRecordsForDeposit(tx, filter, async (dg) => { const amount = Amounts.parseOrThrow(dg.contractTermsRaw.amount); if (shouldSkipCurrency(transactionsRequest, amount.currency)) { return; @@ -1157,7 +1164,7 @@ export async function getTransactions( transactions.push(buildTransactionForDeposit(dg, retryRecord)); }); - tx.purchases.iter().forEachAsync(async (purchase) => { + await iterRecordsForPurchase(tx, filter, async (purchase) => { const download = purchase.download; if (!download) { return; @@ -1200,7 +1207,7 @@ export async function getTransactions( ); }); - tx.rewards.iter().forEachAsync(async (tipRecord) => { + await iterRecordsForReward(tx, filter, async (tipRecord) => { if ( shouldSkipCurrency( transactionsRequest, diff --git a/packages/taler-wallet-core/src/util/query.ts b/packages/taler-wallet-core/src/util/query.ts index 1de1e9a0d..527cbdf63 100644 --- a/packages/taler-wallet-core/src/util/query.ts +++ b/packages/taler-wallet-core/src/util/query.ts @@ -338,7 +338,7 @@ interface IndexReadOnlyAccessor<RecordType> { iter(query?: IDBKeyRange | IDBValidKey): ResultStream<RecordType>; get(query: IDBValidKey): Promise<RecordType | undefined>; getAll( - query: IDBKeyRange | IDBValidKey, + query?: IDBKeyRange | IDBValidKey, count?: number, ): Promise<RecordType[]>; } @@ -351,7 +351,7 @@ interface IndexReadWriteAccessor<RecordType> { iter(query: IDBKeyRange | IDBValidKey): ResultStream<RecordType>; get(query: IDBValidKey): Promise<RecordType | undefined>; getAll( - query: IDBKeyRange | IDBValidKey, + query?: IDBKeyRange | IDBValidKey, count?: number, ): Promise<RecordType[]>; } diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index 796a96f14..8cd9bb8c3 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -479,6 +479,7 @@ async function runTaskLoop( // Wait until either the timeout, or we are notified (via the latch) // that more work might be available. await Promise.race([timeout, ws.workAvailable.wait()]); + logger.trace(`done waiting for available work`); } else { logger.trace( `running ${pending.pendingOperations.length} pending operations`, |