From 70a803038f1cbe05dc4779bdd87376fd073421be Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Tue, 13 Feb 2024 10:53:43 +0100 Subject: implement task shepherd, many small fixes and tweaks --- .../src/operations/transactions.ts | 442 +++++++++++++++------ 1 file changed, 315 insertions(+), 127 deletions(-) (limited to 'packages/taler-wallet-core/src/operations/transactions.ts') diff --git a/packages/taler-wallet-core/src/operations/transactions.ts b/packages/taler-wallet-core/src/operations/transactions.ts index 3b4e75427..10e018d23 100644 --- a/packages/taler-wallet-core/src/operations/transactions.ts +++ b/packages/taler-wallet-core/src/operations/transactions.ts @@ -17,6 +17,7 @@ /** * Imports. */ +import { GlobalIDB } from "@gnu-taler/idb-bridge"; import { AbsoluteTime, Amounts, @@ -69,18 +70,19 @@ import { } from "../db.js"; import { GetReadOnlyAccess, + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, PeerPushDebitStatus, timestampPreciseFromDb, timestampProtocolFromDb, WalletStoresV1, } from "../index.js"; import { InternalWalletState } from "../internal-wallet-state.js"; -import { PendingTaskType } from "../pending-types.js"; +import { PendingTaskType, TaskId } from "../pending-types.js"; import { assertUnreachable } from "../util/assertUnreachable.js"; import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js"; import { constructTaskIdentifier, - resetPendingTaskTimeout, TaskIdentifiers, TransactionContext, } from "./common.js"; @@ -122,18 +124,6 @@ import { computePeerPushDebitTransactionState, PeerPushDebitTransactionContext, } from "./pay-peer-push-debit.js"; -import { - iterRecordsForDeposit, - iterRecordsForPeerPullInitiation as iterRecordsForPeerPullCredit, - iterRecordsForPeerPullDebit, - iterRecordsForPeerPushCredit, - iterRecordsForPeerPushInitiation as iterRecordsForPeerPushDebit, - iterRecordsForPurchase, - iterRecordsForRefresh, - iterRecordsForRefund, - iterRecordsForReward, - iterRecordsForWithdrawal, -} from "./pending.js"; import { computeRefreshTransactionActions, computeRefreshTransactionState, @@ -159,25 +149,32 @@ function shouldSkipCurrency( exchangesInTransaction: string[], ): boolean { if (transactionsRequest?.scopeInfo) { - const sameCurrency = transactionsRequest.scopeInfo.currency.toLowerCase() === currency.toLowerCase() + const sameCurrency = + transactionsRequest.scopeInfo.currency.toLowerCase() === + currency.toLowerCase(); switch (transactionsRequest.scopeInfo.type) { case ScopeType.Global: { - return !sameCurrency + return !sameCurrency; } case ScopeType.Exchange: { - const exchangeInvolveInTransaction = exchangesInTransaction.indexOf(transactionsRequest.scopeInfo.url) !== -1 - return !sameCurrency || !exchangeInvolveInTransaction + const exchangeInvolveInTransaction = + exchangesInTransaction.indexOf(transactionsRequest.scopeInfo.url) !== + -1; + return !sameCurrency || !exchangeInvolveInTransaction; } case ScopeType.Auditor: { // same currency and same auditor - throw Error("filering balance in auditor scope is not implemented") + throw Error("filering balance in auditor scope is not implemented"); } - default: assertUnreachable(transactionsRequest.scopeInfo) + default: + assertUnreachable(transactionsRequest.scopeInfo); } } // FIXME: remove next release if (transactionsRequest?.currency) { - return transactionsRequest.currency.toLowerCase() !== currency.toLowerCase(); + return ( + transactionsRequest.currency.toLowerCase() !== currency.toLowerCase() + ); } return false; } @@ -565,7 +562,7 @@ function buildTransactionForPeerPullCredit( const silentWithdrawalErrorForInvoice = wsrOrt?.lastError && wsrOrt.lastError.code === - TalerErrorCode.WALLET_WITHDRAWAL_GROUP_INCOMPLETE && + TalerErrorCode.WALLET_WITHDRAWAL_GROUP_INCOMPLETE && Object.values(wsrOrt.lastError.errorsPerCoin ?? {}).every((e) => { return ( e.code === TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR && @@ -598,10 +595,10 @@ function buildTransactionForPeerPullCredit( kycUrl: pullCredit.kycUrl, ...(wsrOrt?.lastError ? { - error: silentWithdrawalErrorForInvoice - ? undefined - : wsrOrt.lastError, - } + error: silentWithdrawalErrorForInvoice + ? undefined + : wsrOrt.lastError, + } : {}), }; } @@ -1118,8 +1115,14 @@ export async function getTransactions( .runReadOnly(async (tx) => { await iterRecordsForPeerPushDebit(tx, filter, async (pi) => { const amount = Amounts.parseOrThrow(pi.amount); - const exchangesInTx = [pi.exchangeBaseUrl] - if (shouldSkipCurrency(transactionsRequest, amount.currency, exchangesInTx)) { + const exchangesInTx = [pi.exchangeBaseUrl]; + if ( + shouldSkipCurrency( + transactionsRequest, + amount.currency, + exchangesInTx, + ) + ) { return; } if (shouldSkipSearch(transactionsRequest, [])) { @@ -1134,8 +1137,14 @@ export async function getTransactions( await iterRecordsForPeerPullDebit(tx, filter, async (pi) => { const amount = Amounts.parseOrThrow(pi.amount); - const exchangesInTx = [pi.exchangeBaseUrl] - if (shouldSkipCurrency(transactionsRequest, amount.currency, exchangesInTx)) { + const exchangesInTx = [pi.exchangeBaseUrl]; + if ( + shouldSkipCurrency( + transactionsRequest, + amount.currency, + exchangesInTx, + ) + ) { return; } if (shouldSkipSearch(transactionsRequest, [])) { @@ -1169,8 +1178,10 @@ export async function getTransactions( // Legacy transaction return; } - const exchangesInTx = [pi.exchangeBaseUrl] - if (shouldSkipCurrency(transactionsRequest, pi.currency, exchangesInTx)) { + const exchangesInTx = [pi.exchangeBaseUrl]; + if ( + shouldSkipCurrency(transactionsRequest, pi.currency, exchangesInTx) + ) { return; } if (shouldSkipSearch(transactionsRequest, [])) { @@ -1208,7 +1219,7 @@ export async function getTransactions( await iterRecordsForPeerPullCredit(tx, filter, async (pi) => { const currency = Amounts.currencyOf(pi.amount); - const exchangesInTx = [pi.exchangeBaseUrl] + const exchangesInTx = [pi.exchangeBaseUrl]; if (shouldSkipCurrency(transactionsRequest, currency, exchangesInTx)) { return; } @@ -1243,16 +1254,16 @@ export async function getTransactions( await iterRecordsForRefund(tx, filter, async (refundGroup) => { const currency = Amounts.currencyOf(refundGroup.amountRaw); - const exchangesInTx: string[] = [] - const p = await tx.purchases.get(refundGroup.proposalId) + const exchangesInTx: string[] = []; + const p = await tx.purchases.get(refundGroup.proposalId); if (!p || !p.payInfo) return; //refund with no payment p.payInfo.payCoinSelection.coinPubs.forEach(async (cp) => { - const c = await tx.coins.get(cp) + const c = await tx.coins.get(cp); if (c?.exchangeBaseUrl) { - exchangesInTx.push(c.exchangeBaseUrl) + exchangesInTx.push(c.exchangeBaseUrl); } - }) + }); if (shouldSkipCurrency(transactionsRequest, currency, exchangesInTx)) { return; @@ -1265,8 +1276,12 @@ export async function getTransactions( }); await iterRecordsForRefresh(tx, filter, async (rg) => { - const exchangesInTx = rg.infoPerExchange ? Object.keys(rg.infoPerExchange) : [] - if (shouldSkipCurrency(transactionsRequest, rg.currency, exchangesInTx)) { + const exchangesInTx = rg.infoPerExchange + ? Object.keys(rg.infoPerExchange) + : []; + if ( + shouldSkipCurrency(transactionsRequest, rg.currency, exchangesInTx) + ) { return; } let required = false; @@ -1286,7 +1301,7 @@ export async function getTransactions( }); await iterRecordsForWithdrawal(tx, filter, async (wsr) => { - const exchangesInTx = [wsr.exchangeBaseUrl] + const exchangesInTx = [wsr.exchangeBaseUrl]; if ( shouldSkipCurrency( transactionsRequest, @@ -1343,8 +1358,16 @@ export async function getTransactions( await iterRecordsForDeposit(tx, filter, async (dg) => { const amount = Amounts.parseOrThrow(dg.amount); - const exchangesInTx = dg.infoPerExchange ? Object.keys(dg.infoPerExchange) : [] - if (shouldSkipCurrency(transactionsRequest, amount.currency, exchangesInTx)) { + const exchangesInTx = dg.infoPerExchange + ? Object.keys(dg.infoPerExchange) + : []; + if ( + shouldSkipCurrency( + transactionsRequest, + amount.currency, + exchangesInTx, + ) + ) { return; } const opId = TaskIdentifiers.forDeposit(dg); @@ -1362,15 +1385,21 @@ export async function getTransactions( return; } - const exchangesInTx: string[] = [] + const exchangesInTx: string[] = []; purchase.payInfo.payCoinSelection.coinPubs.forEach(async (cp) => { - const c = await tx.coins.get(cp) + const c = await tx.coins.get(cp); if (c?.exchangeBaseUrl) { - exchangesInTx.push(c.exchangeBaseUrl) + exchangesInTx.push(c.exchangeBaseUrl); } - }) + }); - if (shouldSkipCurrency(transactionsRequest, download.currency, exchangesInTx)) { + if ( + shouldSkipCurrency( + transactionsRequest, + download.currency, + exchangesInTx, + ) + ) { return; } const contractTermsRecord = await tx.contractTerms.get( @@ -1429,7 +1458,6 @@ export async function getTransactions( transactions.push(buildTransactionForTip(tipRecord, retryRecord)); }); //ends REMOVE REWARDS - }); // One-off checks, because of a bug where the wallet previously @@ -1587,25 +1615,7 @@ export function parseTransactionIdentifier( } } -export function stopLongpolling(ws: InternalWalletState, taskId: string) { - const longpoll = ws.activeLongpoll[taskId]; - if (longpoll) { - logger.info(`cancelling long-polling for ${taskId}`); - longpoll.cancel(); - delete ws.activeLongpoll[taskId]; - } -} - -/** - * Immediately retry the underlying operation - * of a transaction. - */ -export async function retryTransaction( - ws: InternalWalletState, - transactionId: string, -): Promise { - logger.info(`resetting retry timeout for ${transactionId}`); - +function maybeTaskFromTransaction(transactionId: string): TaskId | undefined { const parsedTx = parseTransactionIdentifier(transactionId); if (!parsedTx) { @@ -1615,100 +1625,80 @@ export async function retryTransaction( // FIXME: We currently don't cancel active long-polling tasks here. switch (parsedTx.tag) { - case TransactionType.PeerPullCredit: { - const taskId = constructTaskIdentifier({ + case TransactionType.PeerPullCredit: + return constructTaskIdentifier({ tag: PendingTaskType.PeerPullCredit, pursePub: parsedTx.pursePub, }); - await resetPendingTaskTimeout(ws, taskId); - stopLongpolling(ws, taskId); - break; - } - case TransactionType.Deposit: { - const taskId = constructTaskIdentifier({ + case TransactionType.Deposit: + return constructTaskIdentifier({ tag: PendingTaskType.Deposit, depositGroupId: parsedTx.depositGroupId, }); - await resetPendingTaskTimeout(ws, taskId); - stopLongpolling(ws, taskId); - break; - } case TransactionType.InternalWithdrawal: - case TransactionType.Withdrawal: { - // FIXME: Abort current long-poller! - const taskId = constructTaskIdentifier({ + case TransactionType.Withdrawal: + return constructTaskIdentifier({ tag: PendingTaskType.Withdraw, withdrawalGroupId: parsedTx.withdrawalGroupId, }); - await resetPendingTaskTimeout(ws, taskId); - stopLongpolling(ws, taskId); - break; - } - case TransactionType.Payment: { - const taskId = constructTaskIdentifier({ + case TransactionType.Payment: + return constructTaskIdentifier({ tag: PendingTaskType.Purchase, proposalId: parsedTx.proposalId, }); - await resetPendingTaskTimeout(ws, taskId); - stopLongpolling(ws, taskId); - break; - } - case TransactionType.Reward: { - const taskId = constructTaskIdentifier({ + case TransactionType.Reward: + return constructTaskIdentifier({ tag: PendingTaskType.RewardPickup, walletRewardId: parsedTx.walletRewardId, }); - await resetPendingTaskTimeout(ws, taskId); - stopLongpolling(ws, taskId); - break; - } - case TransactionType.Refresh: { - const taskId = constructTaskIdentifier({ + case TransactionType.Refresh: + return constructTaskIdentifier({ tag: PendingTaskType.Refresh, refreshGroupId: parsedTx.refreshGroupId, }); - await resetPendingTaskTimeout(ws, taskId); - stopLongpolling(ws, taskId); - break; - } - case TransactionType.PeerPullDebit: { - const taskId = constructTaskIdentifier({ + case TransactionType.PeerPullDebit: + return constructTaskIdentifier({ tag: PendingTaskType.PeerPullDebit, peerPullDebitId: parsedTx.peerPullDebitId, }); - await resetPendingTaskTimeout(ws, taskId); - stopLongpolling(ws, taskId); - break; - } - case TransactionType.PeerPushCredit: { - const taskId = constructTaskIdentifier({ + case TransactionType.PeerPushCredit: + return constructTaskIdentifier({ tag: PendingTaskType.PeerPushCredit, peerPushCreditId: parsedTx.peerPushCreditId, }); - await resetPendingTaskTimeout(ws, taskId); - stopLongpolling(ws, taskId); - break; - } - case TransactionType.PeerPushDebit: { - const taskId = constructTaskIdentifier({ + case TransactionType.PeerPushDebit: + return constructTaskIdentifier({ tag: PendingTaskType.PeerPushDebit, pursePub: parsedTx.pursePub, }); - await resetPendingTaskTimeout(ws, taskId); - stopLongpolling(ws, taskId); - break; - } case TransactionType.Refund: // Nothing to do for a refund transaction. - break; + return undefined; case TransactionType.Recoup: - // FIXME! - throw Error("not implemented"); + return constructTaskIdentifier({ + tag: PendingTaskType.Recoup, + recoupGroupId: parsedTx.recoupGroupId, + }); default: assertUnreachable(parsedTx); } } +/** + * Immediately retry the underlying operation + * of a transaction. + */ +export async function retryTransaction( + ws: InternalWalletState, + transactionId: string, +): Promise { + logger.info(`resetting retry timeout for ${transactionId}`); + const taskId = maybeTaskFromTransaction(transactionId); + if (taskId) { + ws.taskScheduler.resetTaskRetries(taskId); + } +} + async function getContextForTransaction( ws: InternalWalletState, transactionId: string, @@ -1828,5 +1818,203 @@ export function notifyTransition( experimentalUserData, }); } - ws.workAvailable.trigger(); +} + +/** + * Iterate refresh records based on a filter. + */ +async function iterRecordsForRefresh( + tx: GetReadOnlyAccess<{ + refreshGroups: typeof WalletStoresV1.refreshGroups; + }>, + filter: TransactionRecordFilter, + f: (r: RefreshGroupRecord) => Promise, +): Promise { + let refreshGroups: RefreshGroupRecord[]; + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + RefreshOperationStatus.Pending, + RefreshOperationStatus.Suspended, + ); + refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll(keyRange); + } else { + refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll(); + } + + for (const r of refreshGroups) { + await f(r); + } +} + +async function iterRecordsForWithdrawal( + tx: GetReadOnlyAccess<{ + withdrawalGroups: typeof WalletStoresV1.withdrawalGroups; + }>, + filter: TransactionRecordFilter, + f: (r: WithdrawalGroupRecord) => Promise, +): Promise { + let withdrawalGroupRecords: WithdrawalGroupRecord[]; + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, + ); + withdrawalGroupRecords = + await tx.withdrawalGroups.indexes.byStatus.getAll(keyRange); + } else { + withdrawalGroupRecords = + await tx.withdrawalGroups.indexes.byStatus.getAll(); + } + for (const wgr of withdrawalGroupRecords) { + await f(wgr); + } +} + +async function iterRecordsForDeposit( + tx: GetReadOnlyAccess<{ + depositGroups: typeof WalletStoresV1.depositGroups; + }>, + filter: TransactionRecordFilter, + f: (r: DepositGroupRecord) => Promise, +): Promise { + let dgs: DepositGroupRecord[]; + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, + ); + dgs = await tx.depositGroups.indexes.byStatus.getAll(keyRange); + } else { + dgs = await tx.depositGroups.indexes.byStatus.getAll(); + } + + for (const dg of dgs) { + await f(dg); + } +} + +async function iterRecordsForReward( + tx: GetReadOnlyAccess<{ + rewards: typeof WalletStoresV1.rewards; + }>, + filter: TransactionRecordFilter, + f: (r: RewardRecord) => Promise, +): Promise { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, + ); + await tx.rewards.indexes.byStatus.iter(keyRange).forEachAsync(f); + } else { + await tx.rewards.indexes.byStatus.iter().forEachAsync(f); + } +} + +async function iterRecordsForRefund( + tx: GetReadOnlyAccess<{ + refundGroups: typeof WalletStoresV1.refundGroups; + }>, + filter: TransactionRecordFilter, + f: (r: RefundGroupRecord) => Promise, +): Promise { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, + ); + await tx.refundGroups.indexes.byStatus.iter(keyRange).forEachAsync(f); + } else { + await tx.refundGroups.iter().forEachAsync(f); + } +} + +async function iterRecordsForPurchase( + tx: GetReadOnlyAccess<{ + purchases: typeof WalletStoresV1.purchases; + }>, + filter: TransactionRecordFilter, + f: (r: PurchaseRecord) => Promise, +): Promise { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, + ); + await tx.purchases.indexes.byStatus.iter(keyRange).forEachAsync(f); + } else { + await tx.purchases.indexes.byStatus.iter().forEachAsync(f); + } +} + +async function iterRecordsForPeerPullCredit( + tx: GetReadOnlyAccess<{ + peerPullCredit: typeof WalletStoresV1.peerPullCredit; + }>, + filter: TransactionRecordFilter, + f: (r: PeerPullCreditRecord) => Promise, +): Promise { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, + ); + await tx.peerPullCredit.indexes.byStatus.iter(keyRange).forEachAsync(f); + } else { + await tx.peerPullCredit.indexes.byStatus.iter().forEachAsync(f); + } +} + +async function iterRecordsForPeerPullDebit( + tx: GetReadOnlyAccess<{ + peerPullDebit: typeof WalletStoresV1.peerPullDebit; + }>, + filter: TransactionRecordFilter, + f: (r: PeerPullPaymentIncomingRecord) => Promise, +): Promise { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, + ); + await tx.peerPullDebit.indexes.byStatus.iter(keyRange).forEachAsync(f); + } else { + await tx.peerPullDebit.indexes.byStatus.iter().forEachAsync(f); + } +} + +async function iterRecordsForPeerPushDebit( + tx: GetReadOnlyAccess<{ + peerPushDebit: typeof WalletStoresV1.peerPushDebit; + }>, + filter: TransactionRecordFilter, + f: (r: PeerPushDebitRecord) => Promise, +): Promise { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, + ); + await tx.peerPushDebit.indexes.byStatus.iter(keyRange).forEachAsync(f); + } else { + await tx.peerPushDebit.indexes.byStatus.iter().forEachAsync(f); + } +} + +async function iterRecordsForPeerPushCredit( + tx: GetReadOnlyAccess<{ + peerPushCredit: typeof WalletStoresV1.peerPushCredit; + }>, + filter: TransactionRecordFilter, + f: (r: PeerPushPaymentIncomingRecord) => Promise, +): Promise { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, + ); + await tx.peerPushCredit.indexes.byStatus.iter(keyRange).forEachAsync(f); + } else { + await tx.peerPushCredit.indexes.byStatus.iter().forEachAsync(f); + } } -- cgit v1.2.3