From 70a803038f1cbe05dc4779bdd87376fd073421be Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Tue, 13 Feb 2024 10:53:43 +0100 Subject: implement task shepherd, many small fixes and tweaks --- packages/taler-harness/src/harness/harness.ts | 11 + .../src/integrationtests/test-payment-fault.ts | 82 +- .../src/integrationtests/test-payment-share.ts | 33 +- .../src/integrationtests/test-peer-repair.ts | 4 +- .../src/integrationtests/test-peer-to-peer-pull.ts | 1 - .../test-timetravel-autorefresh.ts | 19 +- packages/taler-util/src/time.ts | 23 +- packages/taler-util/src/wallet-types.ts | 12 +- packages/taler-wallet-cli/src/index.ts | 11 +- .../taler-wallet-core/src/internal-wallet-state.ts | 23 +- .../taler-wallet-core/src/operations/README.md | 7 - .../src/operations/backup/index.ts | 165 ++-- .../taler-wallet-core/src/operations/common.ts | 466 ++--------- .../taler-wallet-core/src/operations/deposits.ts | 140 ++-- .../taler-wallet-core/src/operations/exchanges.ts | 468 ++++++++--- .../src/operations/pay-merchant.ts | 552 +++++++------ .../src/operations/pay-peer-pull-credit.ts | 167 ++-- .../src/operations/pay-peer-pull-debit.ts | 33 +- .../src/operations/pay-peer-push-credit.ts | 123 ++- .../src/operations/pay-peer-push-debit.ts | 231 +++--- .../taler-wallet-core/src/operations/pending.ts | 814 -------------------- .../taler-wallet-core/src/operations/refresh.ts | 155 +--- .../taler-wallet-core/src/operations/reward.ts | 6 - .../taler-wallet-core/src/operations/testing.ts | 41 +- .../src/operations/transactions.ts | 442 ++++++++--- .../taler-wallet-core/src/operations/withdraw.ts | 233 +++--- packages/taler-wallet-core/src/pending-types.ts | 10 - packages/taler-wallet-core/src/shepherd.ts | 851 +++++++++++++++++++++ .../taler-wallet-core/src/util/coinSelection.ts | 3 +- .../taler-wallet-core/src/util/promiseUtils.ts | 40 + packages/taler-wallet-core/src/util/query.ts | 2 +- packages/taler-wallet-core/src/wallet-api-types.ts | 15 - packages/taler-wallet-core/src/wallet.ts | 244 +----- 33 files changed, 2589 insertions(+), 2838 deletions(-) delete mode 100644 packages/taler-wallet-core/src/operations/README.md delete mode 100644 packages/taler-wallet-core/src/operations/pending.ts create mode 100644 packages/taler-wallet-core/src/shepherd.ts diff --git a/packages/taler-harness/src/harness/harness.ts b/packages/taler-harness/src/harness/harness.ts index 48f8450fd..975d73cf8 100644 --- a/packages/taler-harness/src/harness/harness.ts +++ b/packages/taler-harness/src/harness/harness.ts @@ -412,6 +412,17 @@ export class GlobalTestState { } } } + + /** + * Log that the test arrived a certain step. + * + * The step name should be unique across the whole + */ + logStep(stepName: string): void { + // Now we just log, later we may report the steps that were done + // to easily see where the test hangs. + console.info(`STEP: ${stepName}`); + } } export function shouldLingerInTest(): boolean { diff --git a/packages/taler-harness/src/integrationtests/test-payment-fault.ts b/packages/taler-harness/src/integrationtests/test-payment-fault.ts index af6751ef4..cadcc9056 100644 --- a/packages/taler-harness/src/integrationtests/test-payment-fault.ts +++ b/packages/taler-harness/src/integrationtests/test-payment-fault.ts @@ -21,11 +21,7 @@ /** * Imports. */ -import { - TalerCorebankApiClient, - CoreApiResponse, - MerchantApiClient, -} from "@gnu-taler/taler-util"; +import { ConfirmPayResultType, MerchantApiClient } from "@gnu-taler/taler-util"; import { WalletApiOperation } from "@gnu-taler/taler-wallet-core"; import { defaultCoinConfig } from "../harness/denomStructures.js"; import { @@ -38,10 +34,13 @@ import { ExchangeService, GlobalTestState, MerchantService, - WalletCli, generateRandomPayto, setupDb, } from "../harness/harness.js"; +import { + createWalletDaemonWithClient, + withdrawViaBankV2, +} from "../harness/helpers.js"; /** * Run test for basic, bank-integrated withdrawal. @@ -123,45 +122,20 @@ export async function runPaymentFaultTest(t: GlobalTestState) { console.log("setup done!"); - const wallet = new WalletCli(t); - - // Create withdrawal operation - - const bankClient = new TalerCorebankApiClient(bank.corebankApiBaseUrl); - - const user = await bankClient.createRandomBankUser(); - const wop = await bankClient.createWithdrawalOperation( - user.username, - "TESTKUDOS:20", - ); - - // Hand it to the wallet - - await wallet.client.call(WalletApiOperation.GetWithdrawalDetailsForUri, { - talerWithdrawUri: wop.taler_withdraw_uri, - }); - - await wallet.runPending(); - - // Withdraw - - await wallet.client.call(WalletApiOperation.AcceptBankIntegratedWithdrawal, { - exchangeBaseUrl: faultyExchange.baseUrl, - talerWithdrawUri: wop.taler_withdraw_uri, + const { walletClient } = await createWalletDaemonWithClient(t, { + name: "default", }); - await wallet.runPending(); - // Confirm it + await walletClient.call(WalletApiOperation.GetBalances, {}); - await bankClient.confirmWithdrawalOperation(user.username, { - withdrawalOperationId: wop.withdrawal_id, + const wres = await withdrawViaBankV2(t, { + walletClient, + bank, + exchange: faultyExchange, + amount: "TESTKUDOS:20", }); - await wallet.runUntilDone(); - - // Check balance - - await wallet.client.call(WalletApiOperation.GetBalances, {}); + await wres.withdrawalFinishedCond; // Set up order. @@ -181,24 +155,22 @@ export async function runPaymentFaultTest(t: GlobalTestState) { // Make wallet pay for the order - let apiResp: CoreApiResponse; - - const prepResp = await wallet.client.call( + const prepResp = await walletClient.call( WalletApiOperation.PreparePayForUri, { talerPayUri: orderStatus.taler_pay_uri, }, ); - const proposalId = prepResp.proposalId; - - await wallet.runPending(); - // Drop 3 responses from the exchange. let faultCount = 0; faultyExchange.faultProxy.addFault({ async modifyResponse(ctx: FaultInjectionResponseContext) { - if (!ctx.request.requestUrl.endsWith("/deposit")) { + console.log(`in modifyResponse for ${ctx.request.requestUrl}`); + if ( + !ctx.request.requestUrl.endsWith("/deposit") && + !ctx.request.requestUrl.endsWith("/batch-deposit") + ) { return; } if (faultCount < 3) { @@ -213,12 +185,16 @@ export async function runPaymentFaultTest(t: GlobalTestState) { // confirmPay won't work, as the exchange is unreachable - await wallet.client.call(WalletApiOperation.ConfirmPay, { - // FIXME: should be validated, don't cast! - proposalId: proposalId, - }); + const confirmPayResp = await walletClient.call( + WalletApiOperation.ConfirmPay, + { + transactionId: prepResp.transactionId, + }, + ); + + t.assertDeepEqual(confirmPayResp.type, ConfirmPayResultType.Pending); - await wallet.runUntilDone(); + await walletClient.call(WalletApiOperation.TestingWaitTransactionsFinal, {}); // Check if payment was successful. diff --git a/packages/taler-harness/src/integrationtests/test-payment-share.ts b/packages/taler-harness/src/integrationtests/test-payment-share.ts index ef4f8adeb..034bbc98d 100644 --- a/packages/taler-harness/src/integrationtests/test-payment-share.ts +++ b/packages/taler-harness/src/integrationtests/test-payment-share.ts @@ -65,6 +65,8 @@ export async function runPaymentShareTest(t: GlobalTestState) { }); await secondWallet.call(WalletApiOperation.TestingWaitTransactionsFinal, {}); + t.logStep("setup-done"); + // create two orders to pay async function createOrder(amount: string) { const order = { @@ -74,7 +76,6 @@ export async function runPaymentShareTest(t: GlobalTestState) { }; const args = { order }; - const auth = {}; const orderResp = await merchantClient.createOrder({ order: args.order, @@ -88,6 +89,8 @@ export async function runPaymentShareTest(t: GlobalTestState) { return { id: orderResp.order_id, uri: orderStatus.taler_pay_uri }; } + t.logStep("orders-created"); + /** * FIRST CASE, create in first wallet and pay in the second wallet * first wallet should not be able to continue @@ -104,6 +107,8 @@ export async function runPaymentShareTest(t: GlobalTestState) { claimFirstWallet.status === PreparePayResultType.PaymentPossible, ); + t.logStep("w1-payment-possible"); + // share order from the first wallet const { privatePayUri } = await firstWallet.call( WalletApiOperation.SharePayment, @@ -113,6 +118,8 @@ export async function runPaymentShareTest(t: GlobalTestState) { }, ); + t.logStep("w1-payment-shared"); + // claim from the second wallet const claimSecondWallet = await secondWallet.call( WalletApiOperation.PreparePayForUri, @@ -123,18 +130,25 @@ export async function runPaymentShareTest(t: GlobalTestState) { claimSecondWallet.status === PreparePayResultType.PaymentPossible, ); + t.logStep("w2-claimed"); + // pay from the second wallet const r2 = await secondWallet.call(WalletApiOperation.ConfirmPay, { - proposalId: claimSecondWallet.proposalId, + transactionId: claimSecondWallet.transactionId, }); + t.assertTrue(r2.type === ConfirmPayResultType.Done); + + t.logStep("w2-confirmed"); + // Wait for refresh to settle before we do checks await secondWallet.call( WalletApiOperation.TestingWaitTransactionsFinal, {}, ); - t.assertTrue(r2.type === ConfirmPayResultType.Done); + t.logStep("w2-refresh-settled"); + { const first = await firstWallet.call(WalletApiOperation.GetBalances, {}); const second = await secondWallet.call( @@ -155,11 +169,16 @@ export async function runPaymentShareTest(t: GlobalTestState) { claimFirstWalletAgain.status === PreparePayResultType.AlreadyConfirmed, ); + t.logStep("w1-prepared-again"); + const r1 = await firstWallet.call(WalletApiOperation.ConfirmPay, { - proposalId: claimFirstWallet.proposalId, + transactionId: claimFirstWallet.transactionId, }); t.assertTrue(r1.type === ConfirmPayResultType.Done); + + t.logStep("w1-confirmed-shared"); + { const first = await firstWallet.call(WalletApiOperation.GetBalances, {}); const second = await secondWallet.call( @@ -171,6 +190,8 @@ export async function runPaymentShareTest(t: GlobalTestState) { } } + t.logStep("first-case-done"); + /** * SECOND CASE, create in first wallet and share to the second wallet * pay with the first wallet, second wallet should not be able to continue @@ -208,7 +229,7 @@ export async function runPaymentShareTest(t: GlobalTestState) { // pay from the second wallet const r2 = await firstWallet.call(WalletApiOperation.ConfirmPay, { - proposalId: claimFirstWallet.proposalId, + transactionId: claimFirstWallet.transactionId, }); t.assertTrue(r2.type === ConfirmPayResultType.Done); @@ -232,6 +253,8 @@ export async function runPaymentShareTest(t: GlobalTestState) { claimSecondWalletAgain.status === PreparePayResultType.AlreadyConfirmed, ); } + + t.logStep("second-case-done"); } runPaymentShareTest.suites = ["wallet"]; diff --git a/packages/taler-harness/src/integrationtests/test-peer-repair.ts b/packages/taler-harness/src/integrationtests/test-peer-repair.ts index a225a2057..22664bcc1 100644 --- a/packages/taler-harness/src/integrationtests/test-peer-repair.ts +++ b/packages/taler-harness/src/integrationtests/test-peer-repair.ts @@ -22,21 +22,19 @@ import { AmountString, Duration, NotificationType, - TalerUriAction, TransactionMajorState, TransactionMinorState, TransactionType, WalletNotification, - stringifyTalerUri, } from "@gnu-taler/taler-util"; import { WalletApiOperation } from "@gnu-taler/taler-wallet-core"; +import * as fs from "node:fs"; import { GlobalTestState } from "../harness/harness.js"; import { createSimpleTestkudosEnvironmentV2, createWalletDaemonWithClient, withdrawViaBankV2, } from "../harness/helpers.js"; -import * as fs from "node:fs"; export async function runPeerRepairTest(t: GlobalTestState) { // Set up test environment diff --git a/packages/taler-harness/src/integrationtests/test-peer-to-peer-pull.ts b/packages/taler-harness/src/integrationtests/test-peer-to-peer-pull.ts index e8d34e288..b61a3941b 100644 --- a/packages/taler-harness/src/integrationtests/test-peer-to-peer-pull.ts +++ b/packages/taler-harness/src/integrationtests/test-peer-to-peer-pull.ts @@ -33,7 +33,6 @@ import { BankServiceHandle, ExchangeService, GlobalTestState, - WalletCli, WalletClient, } from "../harness/harness.js"; import { diff --git a/packages/taler-harness/src/integrationtests/test-timetravel-autorefresh.ts b/packages/taler-harness/src/integrationtests/test-timetravel-autorefresh.ts index def2462e0..3c47f30db 100644 --- a/packages/taler-harness/src/integrationtests/test-timetravel-autorefresh.ts +++ b/packages/taler-harness/src/integrationtests/test-timetravel-autorefresh.ts @@ -22,6 +22,7 @@ import { Duration, durationFromSpec, MerchantApiClient, + NotificationType, PreparePayResultType, } from "@gnu-taler/taler-util"; import { @@ -124,6 +125,12 @@ export async function runTimetravelAutorefreshTest(t: GlobalTestState) { }); await wres.withdrawalFinishedCond; + const exchangeUpdated1Cond = walletClient.waitForNotificationCond( + (x) => + x.type === NotificationType.ExchangeStateTransition && + x.exchangeBaseUrl === exchange.baseUrl, + ); + // Travel into the future, the deposit expiration is two years // into the future. console.log("applying first time travel"); @@ -142,7 +149,8 @@ export async function runTimetravelAutorefreshTest(t: GlobalTestState) { console.log("pending operations after first time travel"); console.log(JSON.stringify(p, undefined, 2)); - await walletClient.call(WalletApiOperation.TestingWaitTasksProcessed, {}); + // The time travel should cause exchanges to update. + await exchangeUpdated1Cond; await walletClient.call(WalletApiOperation.TestingWaitTransactionsFinal, {}); const wres2 = await withdrawViaBankV2(t, { @@ -155,6 +163,12 @@ export async function runTimetravelAutorefreshTest(t: GlobalTestState) { await walletClient.call(WalletApiOperation.TestingWaitTransactionsFinal, {}); + const exchangeUpdated2Cond = walletClient.waitForNotificationCond( + (x) => + x.type === NotificationType.ExchangeStateTransition && + x.exchangeBaseUrl === exchange.baseUrl, + ); + // Travel into the future, the deposit expiration is two years // into the future. console.log("applying second time travel"); @@ -167,7 +181,8 @@ export async function runTimetravelAutorefreshTest(t: GlobalTestState) { }, ); - await walletClient.call(WalletApiOperation.TestingWaitTasksProcessed, {}); + // The time travel should cause exchanges to update. + await exchangeUpdated2Cond; await walletClient.call(WalletApiOperation.TestingWaitTransactionsFinal, {}); // At this point, the original coins should've been refreshed. diff --git a/packages/taler-util/src/time.ts b/packages/taler-util/src/time.ts index c677d52ae..5702b2947 100644 --- a/packages/taler-util/src/time.ts +++ b/packages/taler-util/src/time.ts @@ -21,7 +21,7 @@ /** * Imports. */ -import { Codec, renderContext, Context } from "./codec.js"; +import { Codec, Context, renderContext } from "./codec.js"; declare const flavor_AbsoluteTime: unique symbol; declare const flavor_TalerProtocolTimestamp: unique symbol; @@ -412,6 +412,10 @@ export namespace AbsoluteTime { return cmp(t, now()) <= 0; } + export function isNever(t: AbsoluteTime): boolean { + return t.t_ms === "never"; + } + export function fromProtocolTimestamp( t: TalerProtocolTimestamp, ): AbsoluteTime { @@ -503,6 +507,23 @@ export namespace AbsoluteTime { return { t_ms: t1.t_ms + d.d_ms, [opaque_AbsoluteTime]: true }; } + /** + * Get the remaining duration until {@param t1}. + * + * If {@param t1} already happened, the remaining duration + * is zero. + */ + export function remaining(t1: AbsoluteTime): Duration { + if (t1.t_ms === "never") { + return Duration.getForever(); + } + const stampNow = now(); + if (stampNow.t_ms === "never") { + throw Error("invariant violated"); + } + return Duration.fromMilliseconds(Math.max(0, t1.t_ms - stampNow.t_ms)); + } + export function subtractDuraction( t1: AbsoluteTime, d: Duration, diff --git a/packages/taler-util/src/wallet-types.ts b/packages/taler-util/src/wallet-types.ts index 0749df9f9..b79bfe4fe 100644 --- a/packages/taler-util/src/wallet-types.ts +++ b/packages/taler-util/src/wallet-types.ts @@ -71,12 +71,10 @@ import { } from "./taler-types.js"; import { AbsoluteTime, - Duration, TalerPreciseTimestamp, TalerProtocolDuration, TalerProtocolTimestamp, codecForAbsoluteTime, - codecForDuration, codecForTimestamp, } from "./time.js"; import { @@ -3062,3 +3060,13 @@ export const codecForRemoveGlobalCurrencyAuditorRequest = .property("auditorBaseUrl", codecForString()) .property("auditorPub", codecForString()) .build("RemoveGlobalCurrencyAuditorRequest"); + +export interface RetryLoopOpts { + /** + * Stop the retry loop when all lifeness-giving pending operations + * are done. + * + * Defaults to false. + */ + stopWhenDone?: boolean; +} diff --git a/packages/taler-wallet-cli/src/index.ts b/packages/taler-wallet-cli/src/index.ts index 91dcd2702..f81236cd4 100644 --- a/packages/taler-wallet-cli/src/index.ts +++ b/packages/taler-wallet-cli/src/index.ts @@ -284,9 +284,6 @@ async function createLocalWallet( console.error("Operation failed: " + summarizeTalerErrorDetail(ed)); console.error("Error details:", JSON.stringify(ed, undefined, 2)); processExit(1); - } finally { - logger.trace("operation with wallet finished, stopping"); - logger.trace("stopped wallet"); } } @@ -343,6 +340,7 @@ async function withLocalWallet( const wh = await createLocalWallet(walletCliArgs); const w = wh.wallet; const res = await f({ client: w.client, ws: w }); + logger.info("Work done, stopping wallet."); w.stop(); return res; } @@ -956,7 +954,6 @@ depositCli }, ); console.log(`Created deposit ${resp.depositGroupId}`); - await wallet.ws.runPending(); }); }); @@ -1231,9 +1228,9 @@ advancedCli help: "Run pending operations.", }) .action(async (args) => { - await withLocalWallet(args, async (wallet) => { - await wallet.ws.runPending(); - }); + logger.error( + "Subcommand run-pending not supported anymore. Please use run-until-done or the client/server wallet.", + ); }); advancedCli diff --git a/packages/taler-wallet-core/src/internal-wallet-state.ts b/packages/taler-wallet-core/src/internal-wallet-state.ts index fdf04a65f..4379f20b5 100644 --- a/packages/taler-wallet-core/src/internal-wallet-state.ts +++ b/packages/taler-wallet-core/src/internal-wallet-state.ts @@ -31,16 +31,14 @@ */ import { IDBFactory } from "@gnu-taler/idb-bridge"; import { - CoinRefreshRequest, DenominationInfo, - RefreshGroupId, - RefreshReason, TransactionState, WalletNotification, } from "@gnu-taler/taler-util"; import { HttpRequestLibrary } from "@gnu-taler/taler-util/http"; import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js"; import { WalletStoresV1 } from "./db.js"; +import { TaskScheduler } from "./shepherd.js"; import { AsyncCondition } from "./util/promiseUtils.js"; import { DbAccess, @@ -78,12 +76,6 @@ export interface RecoupOperations { export type NotificationListener = (n: WalletNotification) => void; -export interface ActiveLongpollInfo { - [opId: string]: { - cancel: () => void; - }; -} - export type CancelFn = () => void; /** @@ -94,11 +86,6 @@ export type CancelFn = () => void; * as it's an opaque implementation detail. */ export interface InternalWalletState { - /** - * Active longpoll operations. - */ - activeLongpoll: ActiveLongpollInfo; - cryptoApi: TalerCryptoInterface; timerGroup: TimerGroup; @@ -106,13 +93,7 @@ export interface InternalWalletState { config: Readonly; - /** - * Asynchronous condition to interrupt the sleep of the - * retry loop. - * - * Used to allow processing of new work faster. - */ - workAvailable: AsyncCondition; + taskScheduler: TaskScheduler; listeners: NotificationListener[]; diff --git a/packages/taler-wallet-core/src/operations/README.md b/packages/taler-wallet-core/src/operations/README.md deleted file mode 100644 index a40349d37..000000000 --- a/packages/taler-wallet-core/src/operations/README.md +++ /dev/null @@ -1,7 +0,0 @@ -# Wallet Operations - -This folder contains the implementations for all wallet operations that operate on the wallet state. - -To avoid cyclic dependencies, these files must **not** reference each other. Instead, other operations should only be accessed via injected dependencies. - -Avoiding cyclic dependencies is important for module bundlers. diff --git a/packages/taler-wallet-core/src/operations/backup/index.ts b/packages/taler-wallet-core/src/operations/backup/index.ts index 7a2771c57..e4e4e43f6 100644 --- a/packages/taler-wallet-core/src/operations/backup/index.ts +++ b/packages/taler-wallet-core/src/operations/backup/index.ts @@ -30,12 +30,10 @@ import { AttentionType, BackupRecovery, Codec, - DenomKeyType, EddsaKeyPair, HttpStatusCode, Logger, PreparePayResult, - PreparePayResultType, RecoveryLoadRequest, RecoveryMergeStrategy, TalerError, @@ -61,11 +59,9 @@ import { encodeCrock, getRandomBytes, hash, - hashDenomPub, j2s, kdf, notEmpty, - rsaBlind, secretbox, secretbox_open, stringToBytes, @@ -75,7 +71,6 @@ import { readTalerErrorResponse, } from "@gnu-taler/taler-util/http"; import { gunzipSync, gzipSync } from "fflate"; -import { TalerCryptoInterface } from "../../crypto/cryptoImplementation.js"; import { BackupProviderRecord, BackupProviderState, @@ -84,25 +79,23 @@ import { ConfigRecord, ConfigRecordKey, WalletBackupConfState, + WalletStoresV1, timestampOptionalPreciseFromDb, - timestampPreciseFromDb, timestampPreciseToDb, } from "../../db.js"; import { InternalWalletState } from "../../internal-wallet-state.js"; -import { assertUnreachable } from "../../util/assertUnreachable.js"; import { checkDbInvariant, checkLogicInvariant, } from "../../util/invariants.js"; +import { GetReadOnlyAccess } from "../../util/query.js"; import { addAttentionRequest, removeAttentionRequest } from "../attention.js"; import { + TaskIdentifiers, TaskRunResult, TaskRunResultType, - TaskIdentifiers, } from "../common.js"; -import { checkPaymentByProposalId, preparePayForUri } from "../pay-merchant.js"; -import { WalletStoresV1 } from "../../db.js"; -import { GetReadOnlyAccess } from "../../util/query.js"; +import { preparePayForUri } from "../pay-merchant.js"; const logger = new Logger("operations/backup.ts"); @@ -318,9 +311,10 @@ async function runBackupCycleForProvider( await tx.backupProviders.put(prov); }); - return { - type: TaskRunResultType.Pending, - }; + throw Error("not implemented"); + // return { + // type: TaskRunResultType.Pending, + // }; } const result = res; @@ -352,9 +346,10 @@ async function runBackupCycleForProvider( provider.baseUrl, ); - return { - type: TaskRunResultType.Pending, - }; + throw Error("not implemented"); + // return { + // type: TaskRunResultType.Pending, + // }; } if (resp.status === HttpStatusCode.NoContent) { @@ -658,30 +653,27 @@ async function runFirstBackupCycleForProvider( ws: InternalWalletState, args: BackupForProviderArgs, ): Promise { - const resp = await runBackupCycleForProvider(ws, args); - switch (resp.type) { - case TaskRunResultType.Error: - throw TalerError.fromDetail( - TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, - resp.errorDetail as any, //FIXME create an error for backup problems - ); - case TaskRunResultType.Finished: - return { - status: "ok", - }; - case TaskRunResultType.Longpoll: - throw Error( - "unexpected runFirstBackupCycleForProvider result (longpoll)", - ); - case TaskRunResultType.Pending: - return { - status: "payment-required", - talerUri: "FIXME", - //talerUri: resp.result.talerUri, - }; - default: - assertUnreachable(resp); - } + throw Error("not implemented"); + // const resp = await runBackupCycleForProvider(ws, args); + // switch (resp.type) { + // case TaskRunResultType.Error: + // throw TalerError.fromDetail( + // TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, + // resp.errorDetail as any, //FIXME create an error for backup problems + // ); + // case TaskRunResultType.Finished: + // return { + // status: "ok", + // }; + // case TaskRunResultType.Pending: + // return { + // status: "payment-required", + // talerUri: "FIXME", + // //talerUri: resp.result.talerUri, + // }; + // default: + // assertUnreachable(resp); + // } } export async function restoreFromRecoverySecret(): Promise { @@ -780,51 +772,52 @@ async function getProviderPaymentInfo( ws: InternalWalletState, provider: BackupProviderRecord, ): Promise { - if (!provider.currentPaymentProposalId) { - return { - type: ProviderPaymentType.Unpaid, - }; - } - const status = await checkPaymentByProposalId( - ws, - provider.currentPaymentProposalId, - ).catch(() => undefined); - - if (!status) { - return { - type: ProviderPaymentType.Unpaid, - }; - } - - switch (status.status) { - case PreparePayResultType.InsufficientBalance: - return { - type: ProviderPaymentType.InsufficientBalance, - amount: status.amountRaw, - }; - case PreparePayResultType.PaymentPossible: - return { - type: ProviderPaymentType.Pending, - talerUri: status.talerUri, - }; - case PreparePayResultType.AlreadyConfirmed: - if (status.paid) { - return { - type: ProviderPaymentType.Paid, - paidUntil: AbsoluteTime.addDuration( - AbsoluteTime.fromProtocolTimestamp(status.contractTerms.timestamp), - durationFromSpec({ years: 1 }), //FIXME: take this from the contract term - ), - }; - } else { - return { - type: ProviderPaymentType.Pending, - talerUri: status.talerUri, - }; - } - default: - assertUnreachable(status); - } + throw Error("not implemented"); + // if (!provider.currentPaymentProposalId) { + // return { + // type: ProviderPaymentType.Unpaid, + // }; + // } + // const status = await checkPaymentByProposalId( + // ws, + // provider.currentPaymentProposalId, + // ).catch(() => undefined); + + // if (!status) { + // return { + // type: ProviderPaymentType.Unpaid, + // }; + // } + + // switch (status.status) { + // case PreparePayResultType.InsufficientBalance: + // return { + // type: ProviderPaymentType.InsufficientBalance, + // amount: status.amountRaw, + // }; + // case PreparePayResultType.PaymentPossible: + // return { + // type: ProviderPaymentType.Pending, + // talerUri: status.talerUri, + // }; + // case PreparePayResultType.AlreadyConfirmed: + // if (status.paid) { + // return { + // type: ProviderPaymentType.Paid, + // paidUntil: AbsoluteTime.addDuration( + // AbsoluteTime.fromProtocolTimestamp(status.contractTerms.timestamp), + // durationFromSpec({ years: 1 }), //FIXME: take this from the contract term + // ), + // }; + // } else { + // return { + // type: ProviderPaymentType.Pending, + // talerUri: status.talerUri, + // }; + // } + // default: + // assertUnreachable(status); + // } } /** diff --git a/packages/taler-wallet-core/src/operations/common.ts b/packages/taler-wallet-core/src/operations/common.ts index 4c7c55212..92950b35b 100644 --- a/packages/taler-wallet-core/src/operations/common.ts +++ b/packages/taler-wallet-core/src/operations/common.ts @@ -21,7 +21,6 @@ import { AbsoluteTime, AmountJson, Amounts, - CancellationToken, CoinRefreshRequest, CoinStatus, Duration, @@ -29,22 +28,15 @@ import { ExchangeEntryStatus, ExchangeTosStatus, ExchangeUpdateStatus, - getErrorDetailFromException, - j2s, Logger, - makeErrorDetail, - NotificationType, RefreshReason, - TalerError, - TalerErrorCode, TalerErrorDetail, TalerPreciseTimestamp, + TalerProtocolTimestamp, TombstoneIdStr, TransactionIdStr, - TransactionType, - WalletNotification, + durationMul, } from "@gnu-taler/taler-util"; -import { CryptoApiStoppedError } from "../crypto/workers/crypto-dispatcher.js"; import { BackupProviderRecord, CoinRecord, @@ -61,17 +53,16 @@ import { RecoupGroupRecord, RefreshGroupRecord, RewardRecord, - timestampPreciseToDb, WalletStoresV1, WithdrawalGroupRecord, + timestampPreciseToDb, } from "../db.js"; import { InternalWalletState } from "../internal-wallet-state.js"; import { PendingTaskType, TaskId } from "../pending-types.js"; import { assertUnreachable } from "../util/assertUnreachable.js"; import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js"; -import { GetReadOnlyAccess, GetReadWriteAccess } from "../util/query.js"; +import { GetReadWriteAccess } from "../util/query.js"; import { createRefreshGroup } from "./refresh.js"; -import { constructTransactionIdentifier } from "./transactions.js"; const logger = new Logger("operations/common.ts"); @@ -251,331 +242,6 @@ export async function spendCoins( ); } -/** - * Convert the task ID for a task that processes a transaction int - * the ID for the transaction. - */ -function convertTaskToTransactionId( - taskId: string, -): TransactionIdStr | undefined { - const parsedTaskId = parseTaskIdentifier(taskId); - switch (parsedTaskId.tag) { - case PendingTaskType.PeerPullCredit: - return constructTransactionIdentifier({ - tag: TransactionType.PeerPullCredit, - pursePub: parsedTaskId.pursePub, - }); - case PendingTaskType.PeerPullDebit: - return constructTransactionIdentifier({ - tag: TransactionType.PeerPullDebit, - peerPullDebitId: parsedTaskId.peerPullDebitId, - }); - // FIXME: This doesn't distinguish internal-withdrawal. - // Maybe we should have a different task type for that as well? - // Or maybe transaction IDs should be valid task identifiers? - case PendingTaskType.Withdraw: - return constructTransactionIdentifier({ - tag: TransactionType.Withdrawal, - withdrawalGroupId: parsedTaskId.withdrawalGroupId, - }); - case PendingTaskType.PeerPushCredit: - return constructTransactionIdentifier({ - tag: TransactionType.PeerPushCredit, - peerPushCreditId: parsedTaskId.peerPushCreditId, - }); - case PendingTaskType.Deposit: - return constructTransactionIdentifier({ - tag: TransactionType.Deposit, - depositGroupId: parsedTaskId.depositGroupId, - }); - case PendingTaskType.Refresh: - return constructTransactionIdentifier({ - tag: TransactionType.Refresh, - refreshGroupId: parsedTaskId.refreshGroupId, - }); - case PendingTaskType.RewardPickup: - return constructTransactionIdentifier({ - tag: TransactionType.Reward, - walletRewardId: parsedTaskId.walletRewardId, - }); - case PendingTaskType.PeerPushDebit: - return constructTransactionIdentifier({ - tag: TransactionType.PeerPushDebit, - pursePub: parsedTaskId.pursePub, - }); - case PendingTaskType.Purchase: - return constructTransactionIdentifier({ - tag: TransactionType.Payment, - proposalId: parsedTaskId.proposalId, - }); - default: - return undefined; - } -} - -async function makeTransactionRetryNotification( - ws: InternalWalletState, - tx: GetReadOnlyAccess, - pendingTaskId: string, - e: TalerErrorDetail | undefined, -): Promise { - const txId = convertTaskToTransactionId(pendingTaskId); - if (!txId) { - return undefined; - } - const txState = await ws.getTransactionState(ws, tx, txId); - if (!txState) { - return undefined; - } - const notif: WalletNotification = { - type: NotificationType.TransactionStateTransition, - transactionId: txId, - oldTxState: txState, - newTxState: txState, - }; - if (e) { - notif.errorInfo = { - code: e.code as number, - hint: e.hint, - }; - } - return notif; -} - -async function makeExchangeRetryNotification( - ws: InternalWalletState, - tx: GetReadOnlyAccess, - pendingTaskId: string, - e: TalerErrorDetail | undefined, -): Promise { - logger.info("making exchange retry notification"); - const parsedTaskId = parseTaskIdentifier(pendingTaskId); - if (parsedTaskId.tag !== PendingTaskType.ExchangeUpdate) { - throw Error("invalid task identifier"); - } - const rec = await tx.exchanges.get(parsedTaskId.exchangeBaseUrl); - - if (!rec) { - logger.info(`exchange ${parsedTaskId.exchangeBaseUrl} not found`); - return undefined; - } - - const notif: WalletNotification = { - type: NotificationType.ExchangeStateTransition, - exchangeBaseUrl: parsedTaskId.exchangeBaseUrl, - oldExchangeState: getExchangeState(rec), - newExchangeState: getExchangeState(rec), - }; - if (e) { - notif.errorInfo = { - code: e.code as number, - hint: e.hint, - }; - } - return notif; -} - -/** - * Generate an appropriate error transition notification - * for applicable tasks. - * - * Namely, transition notifications are generated for: - * - exchange update errors - * - transactions - */ -async function taskToRetryNotification( - ws: InternalWalletState, - tx: GetReadOnlyAccess, - pendingTaskId: string, - e: TalerErrorDetail | undefined, -): Promise { - const parsedTaskId = parseTaskIdentifier(pendingTaskId); - - switch (parsedTaskId.tag) { - case PendingTaskType.ExchangeUpdate: - return makeExchangeRetryNotification(ws, tx, pendingTaskId, e); - case PendingTaskType.PeerPullCredit: - case PendingTaskType.PeerPullDebit: - case PendingTaskType.Withdraw: - case PendingTaskType.PeerPushCredit: - case PendingTaskType.Deposit: - case PendingTaskType.Refresh: - case PendingTaskType.RewardPickup: - case PendingTaskType.PeerPushDebit: - case PendingTaskType.Purchase: - return makeTransactionRetryNotification(ws, tx, pendingTaskId, e); - case PendingTaskType.Backup: - case PendingTaskType.ExchangeCheckRefresh: - case PendingTaskType.Recoup: - return undefined; - } -} - -async function storePendingTaskError( - ws: InternalWalletState, - pendingTaskId: string, - e: TalerErrorDetail, -): Promise { - logger.info(`storing pending task error for ${pendingTaskId}`); - const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => { - let retryRecord = await tx.operationRetries.get(pendingTaskId); - if (!retryRecord) { - retryRecord = { - id: pendingTaskId, - lastError: e, - retryInfo: DbRetryInfo.reset(), - }; - } else { - retryRecord.lastError = e; - retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo); - } - await tx.operationRetries.put(retryRecord); - return taskToRetryNotification(ws, tx, pendingTaskId, e); - }); - if (maybeNotification) { - ws.notify(maybeNotification); - } -} - -export async function resetPendingTaskTimeout( - ws: InternalWalletState, - pendingTaskId: string, -): Promise { - const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => { - let retryRecord = await tx.operationRetries.get(pendingTaskId); - if (retryRecord) { - // Note that we don't reset the lastError, it should still be visible - // while the retry runs. - retryRecord.retryInfo = DbRetryInfo.reset(); - await tx.operationRetries.put(retryRecord); - } - return taskToRetryNotification(ws, tx, pendingTaskId, undefined); - }); - if (maybeNotification) { - ws.notify(maybeNotification); - } -} - -async function storePendingTaskPending( - ws: InternalWalletState, - pendingTaskId: string, -): Promise { - const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => { - let retryRecord = await tx.operationRetries.get(pendingTaskId); - let hadError = false; - if (!retryRecord) { - retryRecord = { - id: pendingTaskId, - retryInfo: DbRetryInfo.reset(), - }; - } else { - if (retryRecord.lastError) { - hadError = true; - } - delete retryRecord.lastError; - retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo); - } - await tx.operationRetries.put(retryRecord); - if (hadError) { - return taskToRetryNotification(ws, tx, pendingTaskId, undefined); - } else { - return undefined; - } - }); - if (maybeNotification) { - ws.notify(maybeNotification); - } -} - -async function storePendingTaskFinished( - ws: InternalWalletState, - pendingTaskId: string, -): Promise { - await ws.db - .mktx((x) => [x.operationRetries]) - .runReadWrite(async (tx) => { - await tx.operationRetries.delete(pendingTaskId); - }); -} - -export async function runTaskWithErrorReporting( - ws: InternalWalletState, - opId: TaskId, - f: () => Promise, -): Promise { - let maybeError: TalerErrorDetail | undefined; - try { - const resp = await f(); - switch (resp.type) { - case TaskRunResultType.Error: - await storePendingTaskError(ws, opId, resp.errorDetail); - return resp; - case TaskRunResultType.Finished: - await storePendingTaskFinished(ws, opId); - return resp; - case TaskRunResultType.Pending: - await storePendingTaskPending(ws, opId); - return resp; - case TaskRunResultType.Longpoll: - return resp; - } - } catch (e) { - if (e instanceof CryptoApiStoppedError) { - if (ws.stopped) { - logger.warn("crypto API stopped during shutdown, ignoring error"); - return { - type: TaskRunResultType.Error, - errorDetail: makeErrorDetail( - TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, - {}, - "Crypto API stopped during shutdown", - ), - }; - } - } - if (e instanceof TalerError) { - logger.warn("operation processed resulted in error"); - logger.warn(`error was: ${j2s(e.errorDetail)}`); - maybeError = e.errorDetail; - await storePendingTaskError(ws, opId, maybeError!); - return { - type: TaskRunResultType.Error, - errorDetail: e.errorDetail, - }; - } else if (e instanceof Error) { - // This is a bug, as we expect pending operations to always - // do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED - // or return something. - logger.error(`Uncaught exception: ${e.message}`); - logger.error(`Stack: ${e.stack}`); - maybeError = makeErrorDetail( - TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, - { - stack: e.stack, - }, - `unexpected exception (message: ${e.message})`, - ); - await storePendingTaskError(ws, opId, maybeError); - return { - type: TaskRunResultType.Error, - errorDetail: maybeError, - }; - } else { - logger.error("Uncaught exception, value is not even an error."); - maybeError = makeErrorDetail( - TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, - {}, - `unexpected exception (not even an error)`, - ); - await storePendingTaskError(ws, opId, maybeError); - return { - type: TaskRunResultType.Error, - errorDetail: maybeError, - }; - } - } -} - export enum TombstoneTag { DeleteWithdrawalGroup = "delete-withdrawal-group", DeleteReserve = "delete-reserve", @@ -646,47 +312,6 @@ export function getExchangeState(r: ExchangeEntryRecord): ExchangeEntryState { }; } -export interface LongpollResult { - ready: boolean; -} - -export function runLongpollAsync( - ws: InternalWalletState, - retryTag: string, - reqFn: (ct: CancellationToken) => Promise, -): void { - const asyncFn = async () => { - if (ws.stopped) { - logger.trace("not long-polling reserve, wallet already stopped"); - await storePendingTaskPending(ws, retryTag); - return; - } - const cts = CancellationToken.create(); - let res: { ready: boolean } | undefined = undefined; - try { - ws.activeLongpoll[retryTag] = { - cancel: () => { - logger.trace("cancel of reserve longpoll requested"); - cts.cancel(); - }, - }; - res = await reqFn(cts.token); - } catch (e) { - const errDetail = getErrorDetailFromException(e); - logger.warn(`got error during long-polling: ${j2s(errDetail)}`); - await storePendingTaskError(ws, retryTag, errDetail); - return; - } finally { - delete ws.activeLongpoll[retryTag]; - } - if (!res.ready) { - await storePendingTaskPending(ws, retryTag); - } - ws.workAvailable.trigger(); - }; - asyncFn(); -} - export type ParsedTombstone = | { tag: TombstoneTag.DeleteWithdrawalGroup; @@ -732,31 +357,53 @@ export interface TransactionManager { export enum TaskRunResultType { Finished = "finished", - Pending = "pending", + Backoff = "backoff", + Progress = "progress", Error = "error", - Longpoll = "longpoll", + ScheduleLater = "schedule-later", } export type TaskRunResult = | TaskRunFinishedResult | TaskRunErrorResult - | TaskRunLongpollResult - | TaskRunPendingResult; + | TaskRunBackoffResult + | TaskRunProgressResult + | TaskRunScheduleLaterResult; export namespace TaskRunResult { + /** + * Task is finished and does not need to be processed again. + */ export function finished(): TaskRunResult { return { type: TaskRunResultType.Finished, }; } - export function pending(): TaskRunResult { + /** + * Task is waiting for something, should be invoked + * again with exponentiall back-off until some other + * result is returned. + */ + export function backoff(): TaskRunResult { + return { + type: TaskRunResultType.Backoff, + }; + } + /** + * Task made progress and should be processed again. + */ + export function progress(): TaskRunResult { return { - type: TaskRunResultType.Pending, + type: TaskRunResultType.Progress, }; } - export function longpoll(): TaskRunResult { + /** + * Run the task again at a fixed time in the future. + */ + export function runAgainAt(runAt: AbsoluteTime): TaskRunResult { return { - type: TaskRunResultType.Longpoll, + type: TaskRunResultType.ScheduleLater, + runAt, }; } } @@ -765,8 +412,17 @@ export interface TaskRunFinishedResult { type: TaskRunResultType.Finished; } -export interface TaskRunPendingResult { - type: TaskRunResultType.Pending; +export interface TaskRunBackoffResult { + type: TaskRunResultType.Backoff; +} + +export interface TaskRunProgressResult { + type: TaskRunResultType.Progress; +} + +export interface TaskRunScheduleLaterResult { + type: TaskRunResultType.ScheduleLater; + runAt: AbsoluteTime; } export interface TaskRunErrorResult { @@ -774,10 +430,6 @@ export interface TaskRunErrorResult { errorDetail: TalerErrorDetail; } -export interface TaskRunLongpollResult { - type: TaskRunResultType.Longpoll; -} - export interface DbRetryInfo { firstTry: DbPreciseTimestamp; nextRetry: DbPreciseTimestamp; @@ -866,6 +518,24 @@ export namespace DbRetryInfo { } } +/** + * Timestamp after which the wallet would do an auto-refresh. + */ +export function getAutoRefreshExecuteThreshold(d: { + stampExpireWithdraw: TalerProtocolTimestamp; + stampExpireDeposit: TalerProtocolTimestamp; +}): AbsoluteTime { + const expireWithdraw = AbsoluteTime.fromProtocolTimestamp( + d.stampExpireWithdraw, + ); + const expireDeposit = AbsoluteTime.fromProtocolTimestamp( + d.stampExpireDeposit, + ); + const delta = AbsoluteTime.difference(expireWithdraw, expireDeposit); + const deltaDiv = durationMul(delta, 0.5); + return AbsoluteTime.addDuration(expireWithdraw, deltaDiv); +} + /** * Parsed representation of task identifiers. */ @@ -877,7 +547,6 @@ export type ParsedTaskIdentifier = | { tag: PendingTaskType.ExchangeUpdate; exchangeBaseUrl: string } | { tag: PendingTaskType.Backup; backupProviderBaseUrl: string } | { tag: PendingTaskType.Deposit; depositGroupId: string } - | { tag: PendingTaskType.ExchangeCheckRefresh; exchangeBaseUrl: string } | { tag: PendingTaskType.PeerPullDebit; peerPullDebitId: string } | { tag: PendingTaskType.PeerPullCredit; pursePub: string } | { tag: PendingTaskType.PeerPushCredit; peerPushCreditId: string } @@ -900,8 +569,6 @@ export function parseTaskIdentifier(x: string): ParsedTaskIdentifier { return { tag: type, backupProviderBaseUrl: decodeURIComponent(rest[0]) }; case PendingTaskType.Deposit: return { tag: type, depositGroupId: rest[0] }; - case PendingTaskType.ExchangeCheckRefresh: - return { tag: type, exchangeBaseUrl: decodeURIComponent(rest[0]) }; case PendingTaskType.ExchangeUpdate: return { tag: type, exchangeBaseUrl: decodeURIComponent(rest[0]) }; case PendingTaskType.PeerPullCredit: @@ -933,8 +600,6 @@ export function constructTaskIdentifier(p: ParsedTaskIdentifier): TaskId { return `${p.tag}:${p.backupProviderBaseUrl}` as TaskId; case PendingTaskType.Deposit: return `${p.tag}:${p.depositGroupId}` as TaskId; - case PendingTaskType.ExchangeCheckRefresh: - return `${p.tag}:${encodeURIComponent(p.exchangeBaseUrl)}` as TaskId; case PendingTaskType.ExchangeUpdate: return `${p.tag}:${encodeURIComponent(p.exchangeBaseUrl)}` as TaskId; case PendingTaskType.PeerPullDebit: @@ -974,11 +639,6 @@ export namespace TaskIdentifiers { exchBaseUrl, )}` as TaskId; } - export function forExchangeCheckRefresh(exch: ExchangeEntryRecord): TaskId { - return `${PendingTaskType.ExchangeCheckRefresh}:${encodeURIComponent( - exch.baseUrl, - )}` as TaskId; - } export function forTipPickup(tipRecord: RewardRecord): TaskId { return `${PendingTaskType.RewardPickup}:${tipRecord.walletRewardId}` as TaskId; } diff --git a/packages/taler-wallet-core/src/operations/deposits.ts b/packages/taler-wallet-core/src/operations/deposits.ts index 3619ac4f4..38b5d43f0 100644 --- a/packages/taler-wallet-core/src/operations/deposits.ts +++ b/packages/taler-wallet-core/src/operations/deposits.ts @@ -48,6 +48,7 @@ import { TalerProtocolTimestamp, TrackTransaction, TransactionAction, + TransactionIdStr, TransactionMajorState, TransactionMinorState, TransactionState, @@ -75,6 +76,7 @@ import { KycPendingInfo, PendingTaskType, RefreshOperationStatus, + TaskId, createRefreshGroup, getCandidateWithdrawalDenomsTx, getTotalRefreshCost, @@ -90,7 +92,6 @@ import { TombstoneTag, TransactionContext, constructTaskIdentifier, - runLongpollAsync, spendCoins, } from "./common.js"; import { getExchangeWireDetailsInTx } from "./exchanges.js"; @@ -103,7 +104,6 @@ import { constructTransactionIdentifier, notifyTransition, parseTransactionIdentifier, - stopLongpolling, } from "./transactions.js"; /** @@ -112,8 +112,8 @@ import { const logger = new Logger("deposits.ts"); export class DepositTransactionContext implements TransactionContext { - private transactionId: string; - private retryTag: string; + readonly transactionId: TransactionIdStr; + readonly taskId: TaskId; constructor( public ws: InternalWalletState, public depositGroupId: string, @@ -122,7 +122,7 @@ export class DepositTransactionContext implements TransactionContext { tag: TransactionType.Deposit, depositGroupId, }); - this.retryTag = constructTaskIdentifier({ + this.taskId = constructTaskIdentifier({ tag: PendingTaskType.Deposit, depositGroupId, }); @@ -148,7 +148,7 @@ export class DepositTransactionContext implements TransactionContext { } async suspendTransaction(): Promise { - const { ws, depositGroupId, transactionId, retryTag } = this; + const { ws, depositGroupId, transactionId, taskId: retryTag } = this; const transitionInfo = await ws.db .mktx((x) => [x.depositGroups]) .runReadWrite(async (tx) => { @@ -185,12 +185,12 @@ export class DepositTransactionContext implements TransactionContext { newTxState: computeDepositTransactionStatus(dg), }; }); - stopLongpolling(ws, retryTag); + ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); } async abortTransaction(): Promise { - const { ws, depositGroupId, transactionId, retryTag } = this; + const { ws, depositGroupId, transactionId, taskId: retryTag } = this; const transitionInfo = await ws.db .mktx((x) => [x.depositGroups]) .runReadWrite(async (tx) => { @@ -219,14 +219,13 @@ export class DepositTransactionContext implements TransactionContext { } return undefined; }); - stopLongpolling(ws, retryTag); - // Need to process the operation again. - ws.workAvailable.trigger(); + ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(retryTag); } async resumeTransaction(): Promise { - const { ws, depositGroupId, transactionId, retryTag } = this; + const { ws, depositGroupId, transactionId, taskId: retryTag } = this; const transitionInfo = await ws.db .mktx((x) => [x.depositGroups]) .runReadWrite(async (tx) => { @@ -263,12 +262,12 @@ export class DepositTransactionContext implements TransactionContext { newTxState: computeDepositTransactionStatus(dg), }; }); - ws.workAvailable.trigger(); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(retryTag); } async failTransaction(): Promise { - const { ws, depositGroupId, transactionId, retryTag } = this; + const { ws, depositGroupId, transactionId, taskId: retryTag } = this; const transitionInfo = await ws.db .mktx((x) => [x.depositGroups]) .runReadWrite(async (tx) => { @@ -294,7 +293,7 @@ export class DepositTransactionContext implements TransactionContext { return undefined; }); // FIXME: Also cancel ongoing work (via cancellation token, once implemented) - stopLongpolling(ws, retryTag); + ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); } } @@ -453,7 +452,7 @@ async function waitForRefreshOnDepositGroup( }); notifyTransition(ws, transactionId, transitionInfo); - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } async function refundDepositGroup( @@ -568,7 +567,7 @@ async function refundDepositGroup( await tx.depositGroups.put(newDg); }); - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } async function processDepositGroupAborting( @@ -588,6 +587,7 @@ async function processDepositGroupAborting( async function processDepositGroupPendingKyc( ws: InternalWalletState, depositGroup: DepositGroupRecord, + cancellationToken: CancellationToken, ): Promise { const { depositGroupId } = depositGroup; const transactionId = constructTransactionIdentifier({ @@ -606,51 +606,45 @@ async function processDepositGroupPendingKyc( throw Error("invalid DB state, in pending(kyc), but no kycInfo present"); } - runLongpollAsync(ws, retryTag, async (ct) => { - const url = new URL( - `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`, - kycInfo.exchangeBaseUrl, - ); - url.searchParams.set("timeout_ms", "10000"); - logger.info(`kyc url ${url.href}`); - const kycStatusRes = await ws.http.fetch(url.href, { - method: "GET", - cancellationToken: ct, - }); - if ( - kycStatusRes.status === HttpStatusCode.Ok || - //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge - // remove after the exchange is fixed or clarified - kycStatusRes.status === HttpStatusCode.NoContent - ) { - const transitionInfo = await ws.db - .mktx((x) => [x.depositGroups]) - .runReadWrite(async (tx) => { - const newDg = await tx.depositGroups.get(depositGroupId); - if (!newDg) { - return; - } - if (newDg.operationStatus !== DepositOperationStatus.PendingKyc) { - return; - } - const oldTxState = computeDepositTransactionStatus(newDg); - newDg.operationStatus = DepositOperationStatus.PendingTrack; - const newTxState = computeDepositTransactionStatus(newDg); - await tx.depositGroups.put(newDg); - return { oldTxState, newTxState }; - }); - notifyTransition(ws, transactionId, transitionInfo); - return { ready: true }; - } else if (kycStatusRes.status === HttpStatusCode.Accepted) { - // FIXME: Do we have to update the URL here? - return { ready: false }; - } else { - throw Error( - `unexpected response from kyc-check (${kycStatusRes.status})`, - ); - } + const url = new URL( + `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`, + kycInfo.exchangeBaseUrl, + ); + url.searchParams.set("timeout_ms", "10000"); + logger.info(`kyc url ${url.href}`); + const kycStatusRes = await ws.http.fetch(url.href, { + method: "GET", + cancellationToken, }); - return TaskRunResult.longpoll(); + if ( + kycStatusRes.status === HttpStatusCode.Ok || + //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge + // remove after the exchange is fixed or clarified + kycStatusRes.status === HttpStatusCode.NoContent + ) { + const transitionInfo = await ws.db + .mktx((x) => [x.depositGroups]) + .runReadWrite(async (tx) => { + const newDg = await tx.depositGroups.get(depositGroupId); + if (!newDg) { + return; + } + if (newDg.operationStatus !== DepositOperationStatus.PendingKyc) { + return; + } + const oldTxState = computeDepositTransactionStatus(newDg); + newDg.operationStatus = DepositOperationStatus.PendingTrack; + const newTxState = computeDepositTransactionStatus(newDg); + await tx.depositGroups.put(newDg); + return { oldTxState, newTxState }; + }); + notifyTransition(ws, transactionId, transitionInfo); + } else if (kycStatusRes.status === HttpStatusCode.Accepted) { + // FIXME: Do we have to update the URL here? + } else { + throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`); + } + return TaskRunResult.backoff(); } /** @@ -682,7 +676,7 @@ async function transitionToKycRequired( }); if (kycStatusReq.status === HttpStatusCode.Ok) { logger.warn("kyc requested, but already fulfilled"); - return TaskRunResult.finished(); + return TaskRunResult.backoff(); } else if (kycStatusReq.status === HttpStatusCode.Accepted) { const kycStatus = await kycStatusReq.json(); logger.info(`kyc status: ${j2s(kycStatus)}`); @@ -864,7 +858,7 @@ async function processDepositGroupPendingTrack( return TaskRunResult.finished(); } else { // FIXME: Use long-polling. - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } } @@ -993,7 +987,7 @@ async function processDepositGroupPendingDeposit( }); notifyTransition(ws, transactionId, transitionInfo); - return TaskRunResult.finished(); + return TaskRunResult.progress(); } /** @@ -1002,9 +996,7 @@ async function processDepositGroupPendingDeposit( export async function processDepositGroup( ws: InternalWalletState, depositGroupId: string, - options: { - cancellationToken?: CancellationToken; - } = {}, + cancellationToken: CancellationToken, ): Promise { const depositGroup = await ws.db .mktx((x) => [x.depositGroups]) @@ -1021,15 +1013,15 @@ export async function processDepositGroup( return processDepositGroupPendingTrack( ws, depositGroup, - options.cancellationToken, + cancellationToken, ); case DepositOperationStatus.PendingKyc: - return processDepositGroupPendingKyc(ws, depositGroup); + return processDepositGroupPendingKyc(ws, depositGroup, cancellationToken); case DepositOperationStatus.PendingDeposit: return processDepositGroupPendingDeposit( ws, depositGroup, - options.cancellationToken, + cancellationToken, ); case DepositOperationStatus.Aborting: return processDepositGroupAborting(ws, depositGroup); @@ -1393,10 +1385,8 @@ export async function createDepositGroup( operationStatus: DepositOperationStatus.PendingDeposit, }; - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Deposit, - depositGroupId, - }); + const ctx = new DepositTransactionContext(ws, depositGroupId); + const transactionId = ctx.transactionId; const newTxState = await ws.db .mktx((x) => [ @@ -1439,6 +1429,8 @@ export async function createDepositGroup( hintTransactionId: transactionId, }); + ws.taskScheduler.startShepherdTask(ctx.taskId); + return { depositGroupId, transactionId, diff --git a/packages/taler-wallet-core/src/operations/exchanges.ts b/packages/taler-wallet-core/src/operations/exchanges.ts index b4d45db2c..22be4102a 100644 --- a/packages/taler-wallet-core/src/operations/exchanges.ts +++ b/packages/taler-wallet-core/src/operations/exchanges.ts @@ -28,6 +28,8 @@ import { AgeRestriction, Amounts, CancellationToken, + CoinRefreshRequest, + CoinStatus, DeleteExchangeRequest, DenomKeyType, DenomOperationMap, @@ -53,6 +55,7 @@ import { NotificationType, OperationErrorInfo, Recoup, + RefreshReason, ScopeInfo, ScopeType, TalerError, @@ -67,8 +70,11 @@ import { WireFeeMap, WireFeesJson, WireInfo, + assertUnreachable, canonicalizeBaseUrl, codecForExchangeKeysJson, + durationFromSpec, + durationMul, encodeCrock, hashDenomPub, j2s, @@ -89,19 +95,22 @@ import { WalletStoresV1, } from "../db.js"; import { + AsyncFlag, ExchangeEntryDbRecordStatus, ExchangeEntryDbUpdateStatus, PendingTaskType, WalletDbReadOnlyTransactionArr, WalletDbReadWriteTransactionArr, + createRefreshGroup, createTimeline, isWithdrawableDenom, selectBestForOverlappingDenominations, selectMinimumFee, - timestampOptionalAbsoluteFromDb, + timestampAbsoluteFromDb, timestampOptionalPreciseFromDb, timestampPreciseFromDb, timestampPreciseToDb, + timestampProtocolFromDb, timestampProtocolToDb, } from "../index.js"; import { InternalWalletState } from "../internal-wallet-state.js"; @@ -117,11 +126,11 @@ import { TaskRunResult, TaskRunResultType, constructTaskIdentifier, + getAutoRefreshExecuteThreshold, getExchangeEntryStatusFromRecord, getExchangeState, getExchangeTosStatusFromRecord, getExchangeUpdateStatusFromRecord, - runTaskWithErrorReporting, } from "./common.js"; const logger = new Logger("exchanges.ts"); @@ -635,11 +644,13 @@ async function downloadExchangeKeysInfo( baseUrl: string, http: HttpRequestLibrary, timeout: Duration, + cancellationToken: CancellationToken, ): Promise { const keysUrl = new URL("keys", baseUrl); const resp = await http.fetch(keysUrl.href, { timeout, + cancellationToken, }); // We must make sure to parse out the protocol version @@ -828,13 +839,19 @@ async function downloadTosFromAcceptedFormat( * If the exchange entry doesn't exist, * a new ephemeral entry is created. */ -export async function startUpdateExchangeEntry( +async function startUpdateExchangeEntry( ws: InternalWalletState, exchangeBaseUrl: string, options: { forceUpdate?: boolean } = {}, ): Promise { const canonBaseUrl = canonicalizeBaseUrl(exchangeBaseUrl); + logger.info( + `starting update of exchange entry ${canonBaseUrl}, forced=${ + options.forceUpdate ?? false + }`, + ); + const { notification } = await ws.db .mktx((x) => [x.exchanges, x.exchangeDetails]) .runReadWrite(async (tx) => { @@ -845,7 +862,7 @@ export async function startUpdateExchangeEntry( ws.notify(notification); } - const { oldExchangeState, newExchangeState } = await ws.db + const { oldExchangeState, newExchangeState, taskId } = await ws.db .mktx((x) => [x.exchanges, x.operationRetries]) .runReadWrite(async (tx) => { const r = await tx.exchanges.get(canonBaseUrl); @@ -882,7 +899,7 @@ export async function startUpdateExchangeEntry( // Reset retries for updating the exchange entry. const taskId = TaskIdentifiers.forExchangeUpdate(r); await tx.operationRetries.delete(taskId); - return { oldExchangeState, newExchangeState }; + return { oldExchangeState, newExchangeState, taskId }; }); ws.notify({ type: NotificationType.ExchangeStateTransition, @@ -890,7 +907,7 @@ export async function startUpdateExchangeEntry( newExchangeState: newExchangeState, oldExchangeState: oldExchangeState, }); - ws.workAvailable.trigger(); + ws.taskScheduler.restartShepherdTask(taskId); } /** @@ -909,6 +926,119 @@ export interface ReadyExchangeSummary { scopeInfo: ScopeInfo; } +async function internalWaitReadyExchange( + ws: InternalWalletState, + canonUrl: string, + exchangeNotifFlag: AsyncFlag, + options: { + cancellationToken?: CancellationToken; + forceUpdate?: boolean; + expectedMasterPub?: string; + } = {}, +): Promise { + const operationId = constructTaskIdentifier({ + tag: PendingTaskType.ExchangeUpdate, + exchangeBaseUrl: canonUrl, + }); + while (true) { + logger.info(`waiting for ready exchange ${canonUrl}`); + const { exchange, exchangeDetails, retryInfo, scopeInfo } = + await ws.db.runReadOnlyTx( + [ + "exchanges", + "exchangeDetails", + "operationRetries", + "globalCurrencyAuditors", + "globalCurrencyExchanges", + ], + async (tx) => { + const exchange = await tx.exchanges.get(canonUrl); + const exchangeDetails = await getExchangeRecordsInternal( + tx, + canonUrl, + ); + const retryInfo = await tx.operationRetries.get(operationId); + let scopeInfo: ScopeInfo | undefined = undefined; + if (exchange && exchangeDetails) { + scopeInfo = await internalGetExchangeScopeInfo(tx, exchangeDetails); + } + return { exchange, exchangeDetails, retryInfo, scopeInfo }; + }, + ); + + if (!exchange) { + throw Error("exchange entry does not exist anymore"); + } + + let ready = false; + + switch (exchange.updateStatus) { + case ExchangeEntryDbUpdateStatus.Ready: + ready = true; + break; + case ExchangeEntryDbUpdateStatus.ReadyUpdate: + // If the update is forced, + // we wait until we're in a full "ready" state, + // as we're not happy with the stale information. + if (!options.forceUpdate) { + ready = true; + } + break; + default: { + if (retryInfo) { + throw TalerError.fromDetail( + TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE, + { + exchangeBaseUrl: canonUrl, + innerError: retryInfo?.lastError, + }, + ); + } + } + } + + if (!ready) { + logger.info("waiting for exchange update notification"); + await exchangeNotifFlag.wait(); + logger.info("done waiting for exchange update notification"); + exchangeNotifFlag.reset(); + continue; + } + + if (!exchangeDetails) { + throw Error("invariant failed"); + } + + if (!scopeInfo) { + throw Error("invariant failed"); + } + + const res: ReadyExchangeSummary = { + currency: exchangeDetails.currency, + exchangeBaseUrl: canonUrl, + masterPub: exchangeDetails.masterPublicKey, + tosStatus: getExchangeTosStatusFromRecord(exchange), + tosAcceptedEtag: exchange.tosAcceptedEtag, + wireInfo: exchangeDetails.wireInfo, + protocolVersionRange: exchangeDetails.protocolVersionRange, + tosCurrentEtag: exchange.tosCurrentEtag, + tosAcceptedTimestamp: timestampOptionalPreciseFromDb( + exchange.tosAcceptedTimestamp, + ), + scopeInfo, + }; + + if (options.expectedMasterPub) { + if (res.masterPub !== options.expectedMasterPub) { + throw Error( + "public key of the exchange does not match expected public key", + ); + } + } + return res; + } +} + /** * Ensure that a fresh exchange entry exists for the given * exchange base URL. @@ -933,127 +1063,149 @@ export async function fetchFreshExchange( } = {}, ): Promise { const canonUrl = canonicalizeBaseUrl(baseUrl); - const operationId = constructTaskIdentifier({ - tag: PendingTaskType.ExchangeUpdate, - exchangeBaseUrl: canonUrl, + + ws.ensureTaskLoopRunning(); + + await startUpdateExchangeEntry(ws, canonUrl, { + forceUpdate: options.forceUpdate, }); - const oldExchange = await ws.db - .mktx((x) => [x.exchanges]) - .runReadOnly(async (tx) => { - return tx.exchanges.get(canonUrl); - }); + return waitReadyExchange(ws, canonUrl, options); +} - let needsUpdate = false; +async function waitReadyExchange( + ws: InternalWalletState, + canonUrl: string, + options: { + cancellationToken?: CancellationToken; + forceUpdate?: boolean; + expectedMasterPub?: string; + } = {}, +): Promise { + // FIXME: We should use Symbol.dispose magic here for cleanup! - if (!oldExchange || options.forceUpdate) { - needsUpdate = true; - await startUpdateExchangeEntry(ws, canonUrl, { - forceUpdate: options.forceUpdate, - }); - } else { - const nextUpdate = timestampOptionalAbsoluteFromDb( - oldExchange.nextUpdateStamp, - ); + const exchangeNotifFlag = new AsyncFlag(); + // Raise exchangeNotifFlag whenever we get a notification + // about our exchange. + const cancelNotif = ws.addNotificationListener((notif) => { if ( - nextUpdate == null || - AbsoluteTime.isExpired(nextUpdate) || - oldExchange.updateStatus !== ExchangeEntryDbUpdateStatus.Ready + notif.type === NotificationType.ExchangeStateTransition && + notif.exchangeBaseUrl === canonUrl ) { - needsUpdate = true; + logger.info(`raising update notification: ${j2s(notif)}`); + exchangeNotifFlag.raise(); } - } + }); - if (needsUpdate) { - await runTaskWithErrorReporting(ws, operationId, () => - updateExchangeFromUrlHandler(ws, canonUrl), + try { + const res = await internalWaitReadyExchange( + ws, + canonUrl, + exchangeNotifFlag, + options, ); + logger.info("done waiting for ready exchange"); + return res; + } finally { + cancelNotif(); } +} - const { exchange, exchangeDetails, retryInfo, scopeInfo } = - await ws.db.runReadOnlyTx( - [ - "exchanges", - "exchangeDetails", - "operationRetries", - "globalCurrencyAuditors", - "globalCurrencyExchanges", - ], - async (tx) => { - const exchange = await tx.exchanges.get(canonUrl); - const exchangeDetails = await getExchangeRecordsInternal(tx, canonUrl); - const retryInfo = await tx.operationRetries.get(operationId); - let scopeInfo: ScopeInfo | undefined = undefined; - if (exchange && exchangeDetails) { - scopeInfo = await internalGetExchangeScopeInfo(tx, exchangeDetails); - } - return { exchange, exchangeDetails, retryInfo, scopeInfo }; - }, - ); +/** + * Update an exchange entry in the wallet's database + * by fetching the /keys and /wire information. + * Optionally link the reserve entry to the new or existing + * exchange entry in then DB. + */ +export async function updateExchangeFromUrlHandler( + ws: InternalWalletState, + exchangeBaseUrl: string, + cancellationToken: CancellationToken, +): Promise { + logger.trace(`updating exchange info for ${exchangeBaseUrl}`); + exchangeBaseUrl = canonicalizeBaseUrl(exchangeBaseUrl); - if (!exchange) { - throw Error("exchange entry does not exist anymore"); + const oldExchangeRec = await ws.db.runReadOnlyTx( + ["exchanges"], + async (tx) => { + return tx.exchanges.get(exchangeBaseUrl); + }, + ); + + if (!oldExchangeRec) { + logger.info(`not updating exchange ${exchangeBaseUrl}, no record in DB`); + return TaskRunResult.finished(); } - switch (exchange.updateStatus) { - case ExchangeEntryDbUpdateStatus.Ready: + let updateRequestedExplicitly = false; + + switch (oldExchangeRec.updateStatus) { + case ExchangeEntryDbUpdateStatus.Suspended: + logger.info(`not updating exchange in status "suspended"`); + return TaskRunResult.finished(); + case ExchangeEntryDbUpdateStatus.Initial: + logger.info(`not updating exchange in status "initial"`); + return TaskRunResult.finished(); + case ExchangeEntryDbUpdateStatus.InitialUpdate: case ExchangeEntryDbUpdateStatus.ReadyUpdate: + case ExchangeEntryDbUpdateStatus.UnavailableUpdate: + updateRequestedExplicitly = true; + break; + case ExchangeEntryDbUpdateStatus.Ready: break; default: - throw TalerError.fromDetail(TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE, { - exchangeBaseUrl: canonUrl, - innerError: retryInfo?.lastError, - }); + assertUnreachable(oldExchangeRec.updateStatus); } - if (!exchangeDetails) { - throw Error("invariant failed"); - } + let refreshCheckNecessary = true; - if (!scopeInfo) { - throw Error("invariant failed"); - } + if (!updateRequestedExplicitly) { + // If the update wasn't requested explicitly, + // check if we really need to update. - const res: ReadyExchangeSummary = { - currency: exchangeDetails.currency, - exchangeBaseUrl: canonUrl, - masterPub: exchangeDetails.masterPublicKey, - tosStatus: getExchangeTosStatusFromRecord(exchange), - tosAcceptedEtag: exchange.tosAcceptedEtag, - wireInfo: exchangeDetails.wireInfo, - protocolVersionRange: exchangeDetails.protocolVersionRange, - tosCurrentEtag: exchange.tosCurrentEtag, - tosAcceptedTimestamp: timestampOptionalPreciseFromDb( - exchange.tosAcceptedTimestamp, - ), - scopeInfo, - }; + let nextUpdateStamp = timestampAbsoluteFromDb( + oldExchangeRec.nextUpdateStamp, + ); - if (options.expectedMasterPub) { - if (res.masterPub !== options.expectedMasterPub) { - throw Error( - "public key of the exchange does not match expected public key", + let nextRefreshCheckStamp = timestampAbsoluteFromDb( + oldExchangeRec.nextRefreshCheckStamp, + ); + + let updateNecessary = true; + + if ( + !AbsoluteTime.isNever(nextUpdateStamp) && + !AbsoluteTime.isExpired(nextUpdateStamp) + ) { + logger.info( + `exchange update for ${exchangeBaseUrl} not necessary, scheduled for ${AbsoluteTime.toIsoString( + nextUpdateStamp, + )}`, + ); + updateNecessary = false; + } + + if ( + !AbsoluteTime.isNever(nextRefreshCheckStamp) && + !AbsoluteTime.isExpired(nextRefreshCheckStamp) + ) { + logger.info( + `exchange refresh check for ${exchangeBaseUrl} not necessary, scheduled for ${AbsoluteTime.toIsoString( + nextRefreshCheckStamp, + )}`, + ); + refreshCheckNecessary = false; + } + + if (!(updateNecessary || refreshCheckNecessary)) { + return TaskRunResult.runAgainAt( + AbsoluteTime.min(nextUpdateStamp, nextRefreshCheckStamp), ); } } - return res; -} -/** - * Update an exchange entry in the wallet's database - * by fetching the /keys and /wire information. - * Optionally link the reserve entry to the new or existing - * exchange entry in then DB. - */ -export async function updateExchangeFromUrlHandler( - ws: InternalWalletState, - exchangeBaseUrl: string, - options: { - cancellationToken?: CancellationToken; - } = {}, -): Promise { - logger.trace(`updating exchange info for ${exchangeBaseUrl}`); - exchangeBaseUrl = canonicalizeBaseUrl(exchangeBaseUrl); + // When doing the auto-refresh check, we always update + // the key info before that. logger.trace("updating exchange /keys info"); @@ -1063,6 +1215,7 @@ export async function updateExchangeFromUrlHandler( exchangeBaseUrl, ws.http, timeout, + cancellationToken, ); logger.trace("validating exchange wire info"); @@ -1302,9 +1455,13 @@ export async function updateExchangeFromUrlHandler( }); if (recoupGroupId) { + const recoupTaskId = constructTaskIdentifier({ + tag: PendingTaskType.Recoup, + recoupGroupId, + }); // Asynchronously start recoup. This doesn't need to finish // for the exchange update to be considered finished. - ws.workAvailable.trigger(); + ws.taskScheduler.startShepherdTask(recoupTaskId); } if (!updated) { @@ -1313,6 +1470,84 @@ export async function updateExchangeFromUrlHandler( logger.trace("done updating exchange info in database"); + logger.trace(`doing auto-refresh check for '${exchangeBaseUrl}'`); + + let minCheckThreshold = AbsoluteTime.addDuration( + AbsoluteTime.now(), + durationFromSpec({ days: 1 }), + ); + + if (refreshCheckNecessary) { + // Do auto-refresh. + await ws.db + .mktx((x) => [ + x.coins, + x.denominations, + x.coinAvailability, + x.refreshGroups, + x.exchanges, + ]) + .runReadWrite(async (tx) => { + const exchange = await tx.exchanges.get(exchangeBaseUrl); + if (!exchange || !exchange.detailsPointer) { + return; + } + const coins = await tx.coins.indexes.byBaseUrl + .iter(exchangeBaseUrl) + .toArray(); + const refreshCoins: CoinRefreshRequest[] = []; + for (const coin of coins) { + if (coin.status !== CoinStatus.Fresh) { + continue; + } + const denom = await tx.denominations.get([ + exchangeBaseUrl, + coin.denomPubHash, + ]); + if (!denom) { + logger.warn("denomination not in database"); + continue; + } + const executeThreshold = + getAutoRefreshExecuteThresholdForDenom(denom); + if (AbsoluteTime.isExpired(executeThreshold)) { + refreshCoins.push({ + coinPub: coin.coinPub, + amount: denom.value, + }); + } else { + const checkThreshold = getAutoRefreshCheckThreshold(denom); + minCheckThreshold = AbsoluteTime.min( + minCheckThreshold, + checkThreshold, + ); + } + } + if (refreshCoins.length > 0) { + const res = await createRefreshGroup( + ws, + tx, + exchange.detailsPointer?.currency, + refreshCoins, + RefreshReason.Scheduled, + undefined, + ); + logger.trace( + `created refresh group for auto-refresh (${res.refreshGroupId})`, + ); + } + logger.trace( + `next refresh check at ${AbsoluteTime.toIsoString( + minCheckThreshold, + )}`, + ); + exchange.nextRefreshCheckStamp = timestampPreciseToDb( + AbsoluteTime.toPreciseTimestamp(minCheckThreshold), + ); + await tx.exchanges.put(exchange); + }); + } + ws.notify({ type: NotificationType.ExchangeStateTransition, exchangeBaseUrl, @@ -1320,7 +1555,33 @@ export async function updateExchangeFromUrlHandler( oldExchangeState: updated.oldExchangeState, }); - return TaskRunResult.finished(); + // Next invocation will cause the task to be run again + // at the necessary time. + return TaskRunResult.progress(); +} + +function getAutoRefreshExecuteThresholdForDenom( + d: DenominationRecord, +): AbsoluteTime { + return getAutoRefreshExecuteThreshold({ + stampExpireWithdraw: timestampProtocolFromDb(d.stampExpireWithdraw), + stampExpireDeposit: timestampProtocolFromDb(d.stampExpireDeposit), + }); +} + +/** + * Timestamp after which the wallet would do the next check for an auto-refresh. + */ +function getAutoRefreshCheckThreshold(d: DenominationRecord): AbsoluteTime { + const expireWithdraw = AbsoluteTime.fromProtocolTimestamp( + timestampProtocolFromDb(d.stampExpireWithdraw), + ); + const expireDeposit = AbsoluteTime.fromProtocolTimestamp( + timestampProtocolFromDb(d.stampExpireDeposit), + ); + const delta = AbsoluteTime.difference(expireWithdraw, expireDeposit); + const deltaDiv = durationMul(delta, 0.75); + return AbsoluteTime.addDuration(expireWithdraw, deltaDiv); } /** @@ -1420,6 +1681,7 @@ export async function downloadExchangeInfo( exchangeBaseUrl, http, Duration.getForever(), + CancellationToken.CONTINUE, ); return { keys: keysInfo, diff --git a/packages/taler-wallet-core/src/operations/pay-merchant.ts b/packages/taler-wallet-core/src/operations/pay-merchant.ts index 52f9c70b1..e00432bd0 100644 --- a/packages/taler-wallet-core/src/operations/pay-merchant.ts +++ b/packages/taler-wallet-core/src/operations/pay-merchant.ts @@ -77,6 +77,7 @@ import { TalerProtocolViolationError, TalerUriAction, TransactionAction, + TransactionIdStr, TransactionMajorState, TransactionMinorState, TransactionState, @@ -102,12 +103,14 @@ import { WalletStoresV1, } from "../db.js"; import { + AsyncFlag, getCandidateWithdrawalDenomsTx, PendingTaskType, RefundGroupRecord, RefundGroupStatus, RefundItemRecord, RefundItemStatus, + TaskId, timestampPreciseToDb, timestampProtocolFromDb, timestampProtocolToDb, @@ -128,8 +131,6 @@ import { import { constructTaskIdentifier, DbRetryInfo, - runLongpollAsync, - runTaskWithErrorReporting, spendCoins, TaskIdentifiers, TaskRunResult, @@ -147,7 +148,6 @@ import { constructTransactionIdentifier, notifyTransition, parseTransactionIdentifier, - stopLongpolling, } from "./transactions.js"; /** @@ -156,8 +156,8 @@ import { const logger = new Logger("pay-merchant.ts"); export class PayMerchantTransactionContext implements TransactionContext { - private transactionId: string; - private retryTag: string; + readonly transactionId: TransactionIdStr; + readonly taskId: TaskId; constructor( public ws: InternalWalletState, @@ -167,7 +167,7 @@ export class PayMerchantTransactionContext implements TransactionContext { tag: TransactionType.Payment, proposalId, }); - this.retryTag = constructTaskIdentifier({ + this.taskId = constructTaskIdentifier({ tag: PendingTaskType.Purchase, proposalId, }); @@ -252,7 +252,7 @@ export class PayMerchantTransactionContext implements TransactionContext { async suspendTransaction(): Promise { const { ws, proposalId, transactionId } = this; - stopLongpolling(ws, this.retryTag); + ws.taskScheduler.stopShepherdTask(this.taskId); const transitionInfo = await ws.db .mktx((x) => [x.purchases]) .runReadWrite(async (tx) => { @@ -270,7 +270,6 @@ export class PayMerchantTransactionContext implements TransactionContext { return { oldTxState, newTxState }; }); notifyTransition(ws, transactionId, transitionInfo); - ws.workAvailable.trigger(); } async abortTransaction(): Promise { @@ -330,18 +329,18 @@ export class PayMerchantTransactionContext implements TransactionContext { break; } await tx.purchases.put(purchase); - await tx.operationRetries.delete(this.retryTag); + await tx.operationRetries.delete(this.taskId); const newTxState = computePayMerchantTransactionState(purchase); return { oldTxState, newTxState }; }, ); + ws.taskScheduler.stopShepherdTask(this.taskId); notifyTransition(ws, transactionId, transitionInfo); - ws.workAvailable.trigger(); + ws.taskScheduler.startShepherdTask(this.taskId); } async resumeTransaction(): Promise { - const { ws, proposalId, transactionId, retryTag } = this; - stopLongpolling(ws, retryTag); + const { ws, proposalId, transactionId, taskId: retryTag } = this; const transitionInfo = await ws.db .mktx((x) => [x.purchases]) .runReadWrite(async (tx) => { @@ -358,9 +357,8 @@ export class PayMerchantTransactionContext implements TransactionContext { const newTxState = computePayMerchantTransactionState(purchase); return { oldTxState, newTxState }; }); - ws.workAvailable.trigger(); notifyTransition(ws, transactionId, transitionInfo); - ws.workAvailable.trigger(); + ws.taskScheduler.startShepherdTask(this.taskId); } async failTransaction(): Promise { @@ -394,7 +392,7 @@ export class PayMerchantTransactionContext implements TransactionContext { return { oldTxState, newTxState }; }); notifyTransition(ws, transactionId, transitionInfo); - ws.workAvailable.trigger(); + ws.taskScheduler.stopShepherdTask(this.taskId); } } @@ -638,14 +636,18 @@ async function processDownloadProposal( return TaskRunResult.finished(); } + const ctx = new PayMerchantTransactionContext(ws, proposalId); + if (proposal.purchaseStatus != PurchaseStatus.PendingDownloadingProposal) { + logger.error( + `unexpected state ${proposal.purchaseStatus}/${ + PurchaseStatus[proposal.purchaseStatus] + } for ${ctx.transactionId} in processDownloadProposal`, + ); return TaskRunResult.finished(); } - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Payment, - proposalId, - }); + const transactionId = ctx.transactionId; const orderClaimUrl = new URL( `orders/${proposal.orderId}/claim`, @@ -857,7 +859,7 @@ async function processDownloadProposal( notifyTransition(ws, transactionId, transitionInfo); - return TaskRunResult.finished(); + return TaskRunResult.progress(); } /** @@ -865,7 +867,7 @@ async function processDownloadProposal( * record for the provided arguments already exists, * return the old proposal ID. */ -async function createPurchase( +async function createOrReusePurchase( ws: InternalWalletState, merchantBaseUrl: string, orderId: string, @@ -889,23 +891,26 @@ async function createPurchase( p.claimToken === claimToken ); }); - /* If we have already claimed this proposal with the same sessionId - * nonce and claim token, reuse it. */ + // If we have already claimed this proposal with the same sessionId + // nonce and claim token, reuse it. */ if ( oldProposal && oldProposal.downloadSessionId === sessionId && (!noncePriv || oldProposal.noncePriv === noncePriv) && oldProposal.claimToken === claimToken ) { - // FIXME: This lacks proper error handling - await processDownloadProposal(ws, oldProposal.proposalId); - + logger.info( + `Found old proposal (status=${ + PurchaseStatus[oldProposal.purchaseStatus] + }) for order ${orderId} at ${merchantBaseUrl}`, + ); if (oldProposal.purchaseStatus === PurchaseStatus.DialogShared) { const download = await expectProposalDownload(ws, oldProposal); const paid = await checkIfOrderIsAlreadyPaid(ws, download.contractData); + logger.info(`old proposal paid: ${paid}`); if (paid) { - //if this transaction was shared and the order is paid then it - //means that another wallet already paid the proposal + // if this transaction was shared and the order is paid then it + // means that another wallet already paid the proposal const transitionInfo = await ws.db .mktx((x) => [x.purchases]) .runReadWrite(async (tx) => { @@ -990,8 +995,6 @@ async function createPurchase( proposalId, }); notifyTransition(ws, transactionId, transitionInfo); - - await processDownloadProposal(ws, proposalId); return proposalId; } @@ -1244,11 +1247,10 @@ async function handleInsufficientFunds( }); } -// FIXME: Should probably not be exported in its current state // FIXME: Should take a transaction ID instead of a proposal ID // FIXME: Does way more than checking the payment // FIXME: Should return immediately. -export async function checkPaymentByProposalId( +async function checkPaymentByProposalId( ws: InternalWalletState, proposalId: string, sessionId?: string, @@ -1284,10 +1286,9 @@ export async function checkPaymentByProposalId( proposalId = proposal.proposalId; - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Payment, - proposalId, - }); + const ctx = new PayMerchantTransactionContext(ws, proposalId); + + const transactionId = ctx.transactionId; const talerUri = stringifyTalerUri({ type: TalerUriAction.Pay, @@ -1377,12 +1378,12 @@ export async function checkPaymentByProposalId( return { oldTxState, newTxState }; }); notifyTransition(ws, transactionId, transitionInfo); - // FIXME: What about error handling?! This doesn't properly store errors in the DB. - const r = await processPurchasePay(ws, proposalId); - if (r.type !== TaskRunResultType.Finished) { - // FIXME: This does not surface the original error - throw Error("submitting pay failed"); - } + ws.taskScheduler.startShepherdTask(ctx.taskId); + + // FIXME: Consider changing the API here so that we don't have to + // wait inline for the repurchase. + + await waitPaymentResult(ws, proposalId, sessionId); const download = await expectProposalDownload(ws, purchase); return { status: PreparePayResultType.AlreadyConfirmed, @@ -1476,7 +1477,7 @@ export async function preparePayForUri( ); } - const proposalId = await createPurchase( + const proposalId = await createOrReusePurchase( ws, uriResult.merchantBaseUrl, uriResult.orderId, @@ -1485,9 +1486,79 @@ export async function preparePayForUri( uriResult.noncePriv, ); + await waitProposalDownloaded(ws, proposalId); + return checkPaymentByProposalId(ws, proposalId, uriResult.sessionId); } +/** + * Wait until a proposal is at least downloaded. + */ +async function waitProposalDownloaded( + ws: InternalWalletState, + proposalId: string, +): Promise { + const ctx = new PayMerchantTransactionContext(ws, proposalId); + + logger.info(`waiting for ${ctx.transactionId} to be downloaded`); + + ws.taskScheduler.startShepherdTask(ctx.taskId); + + // FIXME: We should use Symbol.dispose magic here for cleanup! + + const payNotifFlag = new AsyncFlag(); + // Raise exchangeNotifFlag whenever we get a notification + // about our exchange. + const cancelNotif = ws.addNotificationListener((notif) => { + if ( + notif.type === NotificationType.TransactionStateTransition && + notif.transactionId === ctx.transactionId + ) { + logger.info(`raising update notification: ${j2s(notif)}`); + payNotifFlag.raise(); + } + }); + + try { + await internalWaitProposalDownloaded(ctx, payNotifFlag); + logger.info(`done waiting for ${ctx.transactionId} to be downloaded`); + } finally { + cancelNotif(); + } +} + +async function internalWaitProposalDownloaded( + ctx: PayMerchantTransactionContext, + payNotifFlag: AsyncFlag, +): Promise { + while (true) { + const { purchase, retryInfo } = await ctx.ws.db.runReadOnlyTx( + ["purchases", "operationRetries"], + async (tx) => { + return { + purchase: await tx.purchases.get(ctx.proposalId), + retryInfo: await tx.operationRetries.get(ctx.taskId), + }; + }, + ); + if (!purchase) { + throw Error("purchase does not exist anymore"); + } + if (purchase.download) { + return; + } + if (retryInfo) { + if (retryInfo.lastError) { + throw TalerError.fromUncheckedDetail(retryInfo.lastError); + } else { + throw Error("transient error while waiting for proposal download"); + } + } + await payNotifFlag.wait(); + payNotifFlag.reset(); + } +} + export async function preparePayForTemplate( ws: InternalWalletState, req: PreparePayTemplateRequest, @@ -1598,71 +1669,101 @@ export async function generateDepositPermissions( return depositPermissions; } -/** - * Run the operation handler for a payment - * and return the result as a {@link ConfirmPayResult}. - */ -async function runPayForConfirmPay( - ws: InternalWalletState, - proposalId: string, +async function internalWaitPaymentResult( + ctx: PayMerchantTransactionContext, + purchaseNotifFlag: AsyncFlag, + waitSessionId?: string, ): Promise { - logger.trace("processing proposal for confirmPay"); - const taskId = constructTaskIdentifier({ - tag: PendingTaskType.Purchase, - proposalId, - }); - const res = await runTaskWithErrorReporting(ws, taskId, async () => { - return await processPurchasePay(ws, proposalId); - }); - logger.trace(`processPurchasePay response type ${res.type}`); - switch (res.type) { - case TaskRunResultType.Finished: { - const purchase = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { - return tx.purchases.get(proposalId); - }); - if (!purchase) { - throw Error("purchase record not available anymore"); + while (true) { + const txRes = await ctx.ws.db.runReadOnlyTx( + ["purchases", "operationRetries"], + async (tx) => { + const purchase = await tx.purchases.get(ctx.proposalId); + const retryRecord = await tx.operationRetries.get(ctx.taskId); + return { purchase, retryRecord }; + }, + ); + + if (!txRes.purchase) { + throw Error("purchase gone"); + } + + const purchase = txRes.purchase; + + logger.info( + `purchase is in state ${PurchaseStatus[purchase.purchaseStatus]}`, + ); + + const d = await expectProposalDownload(ctx.ws, purchase); + + if (txRes.purchase.timestampFirstSuccessfulPay) { + if ( + waitSessionId == null || + txRes.purchase.lastSessionId === waitSessionId + ) { + return { + type: ConfirmPayResultType.Done, + contractTerms: d.contractTermsRaw, + transactionId: ctx.transactionId, + }; } - const d = await expectProposalDownload(ws, purchase); - return { - type: ConfirmPayResultType.Done, - contractTerms: d.contractTermsRaw, - transactionId: constructTransactionIdentifier({ - tag: TransactionType.Payment, - proposalId, - }), - }; } - case TaskRunResultType.Error: { - // We hide transient errors from the caller. - const opRetry = await ws.db - .mktx((x) => [x.operationRetries]) - .runReadOnly(async (tx) => tx.operationRetries.get(taskId)); + + if (txRes.retryRecord) { return { type: ConfirmPayResultType.Pending, - lastError: opRetry?.lastError, - transactionId: constructTransactionIdentifier({ - tag: TransactionType.Payment, - proposalId, - }), + lastError: txRes.retryRecord.lastError, + transactionId: ctx.transactionId, }; } - case TaskRunResultType.Pending: - logger.trace("reporting pending as confirmPay response"); - return { - type: ConfirmPayResultType.Pending, - transactionId: constructTransactionIdentifier({ - tag: TransactionType.Payment, - proposalId, - }), - lastError: undefined, - }; - case TaskRunResultType.Longpoll: - throw Error("unexpected processPurchasePay result (longpoll)"); - default: - assertUnreachable(res); + + await purchaseNotifFlag.wait(); + purchaseNotifFlag.reset(); + } +} + +/** + * Wait until either: + * a) the payment succeeded (if provided under the {@param waitSessionId}), or + * b) the attempt to pay failed (merchant unavailable, etc.) + */ +async function waitPaymentResult( + ws: InternalWalletState, + proposalId: string, + waitSessionId?: string, +): Promise { + const ctx = new PayMerchantTransactionContext(ws, proposalId); + + ws.ensureTaskLoopRunning(); + + ws.taskScheduler.startShepherdTask(ctx.taskId); + + // FIXME: Clean up using the new JS "using" / Symbol.dispose syntax. + const purchaseNotifFlag = new AsyncFlag(); + // Raise purchaseNotifFlag whenever we get a notification + // about our purchase. + const cancelNotif = ws.addNotificationListener((notif) => { + if ( + notif.type === NotificationType.TransactionStateTransition && + notif.transactionId === ctx.transactionId + ) { + purchaseNotifFlag.raise(); + } + }); + + try { + logger.info(`waiting for first payment success on ${ctx.transactionId}`); + const res = await internalWaitPaymentResult( + ctx, + purchaseNotifFlag, + waitSessionId, + ); + logger.info( + `done waiting for first payment success on ${ctx.transactionId}, result ${res.type}`, + ); + return res; + } finally { + cancelNotif(); } } @@ -1719,7 +1820,12 @@ export async function confirmPay( if (existingPurchase && existingPurchase.payInfo) { logger.trace("confirmPay: submitting payment for existing purchase"); - return runPayForConfirmPay(ws, proposalId); + const ctx = new PayMerchantTransactionContext( + ws, + existingPurchase.proposalId, + ); + await ws.taskScheduler.resetTaskRetries(ctx.taskId); + return waitPaymentResult(ws, proposalId); } logger.trace("confirmPay: purchase record does not exist yet"); @@ -1817,9 +1923,8 @@ export async function confirmPay( hintTransactionId: transactionId, }); - // We directly make a first attempt to pay. - // FIXME: In the future we should just wait for the right event - return runPayForConfirmPay(ws, proposalId); + // Wait until we have completed the first attempt to pay. + return waitPaymentResult(ws, proposalId); } export async function processPurchase( @@ -2017,7 +2122,7 @@ async function processPurchasePay( // FIXME: Should we really consider this to be pending? - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } } @@ -2076,7 +2181,7 @@ async function processPurchasePay( await storePayReplaySuccess(ws, proposalId, sessionId); } - return TaskRunResult.finished(); + return TaskRunResult.progress(); } export async function refuseProposal( @@ -2365,7 +2470,7 @@ export async function sharePayment( p.purchaseStatus !== PurchaseStatus.DialogProposed && p.purchaseStatus !== PurchaseStatus.DialogShared ) { - //FIXME: purchase can be shared before being paid + // FIXME: purchase can be shared before being paid return undefined; } if (p.purchaseStatus === PurchaseStatus.DialogProposed) { @@ -2426,57 +2531,37 @@ async function processPurchaseDialogShared( ): Promise { const proposalId = purchase.proposalId; logger.trace(`processing dialog-shared for proposal ${proposalId}`); - - const taskId = constructTaskIdentifier({ - tag: PendingTaskType.Purchase, - proposalId, - }); - - // FIXME: Put this logic into runLongpollAsync? - if (ws.activeLongpoll[taskId]) { - return TaskRunResult.longpoll(); - } const download = await expectProposalDownload(ws, purchase); if (purchase.purchaseStatus !== PurchaseStatus.DialogShared) { return TaskRunResult.finished(); } - runLongpollAsync(ws, taskId, async (ct) => { - const paid = await checkIfOrderIsAlreadyPaid(ws, download.contractData); - if (paid) { - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { - const p = await tx.purchases.get(purchase.proposalId); - if (!p) { - logger.warn("purchase does not exist anymore"); - return; - } - const oldTxState = computePayMerchantTransactionState(p); - p.purchaseStatus = PurchaseStatus.FailedClaim; - const newTxState = computePayMerchantTransactionState(p); - await tx.purchases.put(p); - return { oldTxState, newTxState }; - }); - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Payment, - proposalId, + const paid = await checkIfOrderIsAlreadyPaid(ws, download.contractData); + if (paid) { + const transitionInfo = await ws.db + .mktx((x) => [x.purchases]) + .runReadWrite(async (tx) => { + const p = await tx.purchases.get(purchase.proposalId); + if (!p) { + logger.warn("purchase does not exist anymore"); + return; + } + const oldTxState = computePayMerchantTransactionState(p); + p.purchaseStatus = PurchaseStatus.FailedClaim; + const newTxState = computePayMerchantTransactionState(p); + await tx.purchases.put(p); + return { oldTxState, newTxState }; }); + const transactionId = constructTransactionIdentifier({ + tag: TransactionType.Payment, + proposalId, + }); - notifyTransition(ws, transactionId, transitionInfo); - - return { - ready: true, - }; - } - - return { - ready: false, - }; - }); + notifyTransition(ws, transactionId, transitionInfo); + } - return TaskRunResult.longpoll(); + return TaskRunResult.backoff(); } async function processPurchaseAutoRefund( @@ -2496,97 +2581,81 @@ async function processPurchaseAutoRefund( proposalId, }); - // FIXME: Put this logic into runLongpollAsync? - if (ws.activeLongpoll[taskId]) { - return TaskRunResult.longpoll(); - } - const download = await expectProposalDownload(ws, purchase); - runLongpollAsync(ws, taskId, async (ct) => { - if ( - !purchase.autoRefundDeadline || - AbsoluteTime.isExpired( - AbsoluteTime.fromProtocolTimestamp( - timestampProtocolFromDb(purchase.autoRefundDeadline), - ), - ) - ) { - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { - const p = await tx.purchases.get(purchase.proposalId); - if (!p) { - logger.warn("purchase does not exist anymore"); - return; - } - if (p.purchaseStatus !== PurchaseStatus.PendingQueryingRefund) { - return; - } - const oldTxState = computePayMerchantTransactionState(p); - p.purchaseStatus = PurchaseStatus.Done; - p.refundAmountAwaiting = undefined; - const newTxState = computePayMerchantTransactionState(p); - await tx.purchases.put(p); - return { oldTxState, newTxState }; - }); - notifyTransition(ws, transactionId, transitionInfo); - return { - ready: true, - }; - } + if ( + !purchase.autoRefundDeadline || + AbsoluteTime.isExpired( + AbsoluteTime.fromProtocolTimestamp( + timestampProtocolFromDb(purchase.autoRefundDeadline), + ), + ) + ) { + const transitionInfo = await ws.db + .mktx((x) => [x.purchases]) + .runReadWrite(async (tx) => { + const p = await tx.purchases.get(purchase.proposalId); + if (!p) { + logger.warn("purchase does not exist anymore"); + return; + } + if (p.purchaseStatus !== PurchaseStatus.PendingQueryingRefund) { + return; + } + const oldTxState = computePayMerchantTransactionState(p); + p.purchaseStatus = PurchaseStatus.Done; + p.refundAmountAwaiting = undefined; + const newTxState = computePayMerchantTransactionState(p); + await tx.purchases.put(p); + return { oldTxState, newTxState }; + }); + notifyTransition(ws, transactionId, transitionInfo); + return TaskRunResult.finished(); + } - const requestUrl = new URL( - `orders/${download.contractData.orderId}`, - download.contractData.merchantBaseUrl, - ); - requestUrl.searchParams.set( - "h_contract", - download.contractData.contractTermsHash, - ); + const requestUrl = new URL( + `orders/${download.contractData.orderId}`, + download.contractData.merchantBaseUrl, + ); + requestUrl.searchParams.set( + "h_contract", + download.contractData.contractTermsHash, + ); - requestUrl.searchParams.set("timeout_ms", "1000"); - requestUrl.searchParams.set("await_refund_obtained", "yes"); + requestUrl.searchParams.set("timeout_ms", "1000"); + requestUrl.searchParams.set("await_refund_obtained", "yes"); - const resp = await ws.http.fetch(requestUrl.href); + const resp = await ws.http.fetch(requestUrl.href); - // FIXME: Check other status codes! + // FIXME: Check other status codes! - const orderStatus = await readSuccessResponseJsonOrThrow( - resp, - codecForMerchantOrderStatusPaid(), - ); + const orderStatus = await readSuccessResponseJsonOrThrow( + resp, + codecForMerchantOrderStatusPaid(), + ); - if (orderStatus.refund_pending) { - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { - const p = await tx.purchases.get(purchase.proposalId); - if (!p) { - logger.warn("purchase does not exist anymore"); - return; - } - if (p.purchaseStatus !== PurchaseStatus.PendingQueryingAutoRefund) { - return; - } - const oldTxState = computePayMerchantTransactionState(p); - p.purchaseStatus = PurchaseStatus.PendingAcceptRefund; - const newTxState = computePayMerchantTransactionState(p); - await tx.purchases.put(p); - return { oldTxState, newTxState }; - }); - notifyTransition(ws, transactionId, transitionInfo); - return { - ready: true, - }; - } else { - return { - ready: false, - }; - } - }); + if (orderStatus.refund_pending) { + const transitionInfo = await ws.db + .mktx((x) => [x.purchases]) + .runReadWrite(async (tx) => { + const p = await tx.purchases.get(purchase.proposalId); + if (!p) { + logger.warn("purchase does not exist anymore"); + return; + } + if (p.purchaseStatus !== PurchaseStatus.PendingQueryingAutoRefund) { + return; + } + const oldTxState = computePayMerchantTransactionState(p); + p.purchaseStatus = PurchaseStatus.PendingAcceptRefund; + const newTxState = computePayMerchantTransactionState(p); + await tx.purchases.put(p); + return { oldTxState, newTxState }; + }); + notifyTransition(ws, transactionId, transitionInfo); + } - return TaskRunResult.longpoll(); + return TaskRunResult.backoff(); } async function processPurchaseAbortingRefund( @@ -2734,7 +2803,7 @@ async function processPurchaseQueryRefund( return { oldTxState, newTxState }; }); notifyTransition(ws, transactionId, transitionInfo); - return TaskRunResult.finished(); + return TaskRunResult.progress(); } else { const refundAwaiting = Amounts.sub( Amounts.parseOrThrow(orderStatus.refund_amount), @@ -2760,7 +2829,7 @@ async function processPurchaseQueryRefund( return { oldTxState, newTxState }; }); notifyTransition(ws, transactionId, transitionInfo); - return TaskRunResult.finished(); + return TaskRunResult.progress(); } } @@ -2836,10 +2905,7 @@ export async function startQueryRefund( ws: InternalWalletState, proposalId: string, ): Promise { - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Payment, - proposalId, - }); + const ctx = new PayMerchantTransactionContext(ws, proposalId); const transitionInfo = await ws.db .mktx((x) => [x.purchases]) .runReadWrite(async (tx) => { @@ -2857,8 +2923,8 @@ export async function startQueryRefund( await tx.purchases.put(p); return { oldTxState, newTxState }; }); - notifyTransition(ws, transactionId, transitionInfo); - ws.workAvailable.trigger(); + notifyTransition(ws, ctx.transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(ctx.taskId); } async function computeRefreshRequest( @@ -3128,10 +3194,10 @@ async function storeRefunds( notifyTransition(ws, transactionId, result.transitionInfo); if (result.numPendingItemsTotal > 0) { - return TaskRunResult.pending(); + return TaskRunResult.backoff(); + } else { + return TaskRunResult.progress(); } - - return TaskRunResult.finished(); } export function computeRefundTransactionState( diff --git a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts index e655eba4b..cc41abde9 100644 --- a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts +++ b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts @@ -33,6 +33,7 @@ import { TalerProtocolTimestamp, TalerUriAction, TransactionAction, + TransactionIdStr, TransactionMajorState, TransactionMinorState, TransactionState, @@ -62,17 +63,15 @@ import { timestampPreciseToDb, } from "../index.js"; import { InternalWalletState } from "../internal-wallet-state.js"; -import { PendingTaskType } from "../pending-types.js"; +import { PendingTaskType, TaskId } from "../pending-types.js"; import { assertUnreachable } from "../util/assertUnreachable.js"; import { checkDbInvariant } from "../util/invariants.js"; import { - LongpollResult, TaskRunResult, TaskRunResultType, TombstoneTag, TransactionContext, constructTaskIdentifier, - runLongpollAsync, } from "./common.js"; import { codecForExchangePurseStatus, @@ -81,7 +80,6 @@ import { import { constructTransactionIdentifier, notifyTransition, - stopLongpolling, } from "./transactions.js"; import { getExchangeWithdrawalInfo, @@ -91,8 +89,8 @@ import { const logger = new Logger("pay-peer-pull-credit.ts"); export class PeerPullCreditTransactionContext implements TransactionContext { - private transactionId: string; - private retryTag: string; + readonly transactionId: TransactionIdStr; + readonly retryTag: TaskId; constructor( public ws: InternalWalletState, @@ -139,7 +137,6 @@ export class PeerPullCreditTransactionContext implements TransactionContext { async suspendTransaction(): Promise { const { ws, pursePub, retryTag, transactionId } = this; - stopLongpolling(ws, retryTag); const transitionInfo = await ws.db .mktx((x) => [x.peerPullCredit]) .runReadWrite(async (tx) => { @@ -193,12 +190,12 @@ export class PeerPullCreditTransactionContext implements TransactionContext { } return undefined; }); + ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); } async failTransaction(): Promise { const { ws, pursePub, retryTag, transactionId } = this; - stopLongpolling(ws, retryTag); const transitionInfo = await ws.db .mktx((x) => [x.peerPullCredit]) .runReadWrite(async (tx) => { @@ -244,11 +241,11 @@ export class PeerPullCreditTransactionContext implements TransactionContext { return undefined; }); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.stopShepherdTask(retryTag); } async resumeTransaction(): Promise { const { ws, pursePub, retryTag, transactionId } = this; - stopLongpolling(ws, retryTag); const transitionInfo = await ws.db .mktx((x) => [x.peerPullCredit]) .runReadWrite(async (tx) => { @@ -301,13 +298,12 @@ export class PeerPullCreditTransactionContext implements TransactionContext { } return undefined; }); - ws.workAvailable.trigger(); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(retryTag); } async abortTransaction(): Promise { const { ws, pursePub, retryTag, transactionId } = this; - stopLongpolling(ws, retryTag); const transitionInfo = await ws.db .mktx((x) => [x.peerPullCredit]) .runReadWrite(async (tx) => { @@ -355,7 +351,9 @@ export class PeerPullCreditTransactionContext implements TransactionContext { } return undefined; }); + ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(retryTag); } } @@ -363,7 +361,7 @@ async function queryPurseForPeerPullCredit( ws: InternalWalletState, pullIni: PeerPullCreditRecord, cancellationToken: CancellationToken, -): Promise { +): Promise { const purseDepositUrl = new URL( `purses/${pullIni.pursePub}/deposit`, pullIni.exchangeBaseUrl, @@ -401,10 +399,10 @@ async function queryPurseForPeerPullCredit( return { oldTxState, newTxState }; }); notifyTransition(ws, transactionId, transitionInfo); - return { ready: true }; + return TaskRunResult.backoff(); } case HttpStatusCode.NotFound: - return { ready: false }; + return TaskRunResult.backoff(); } const result = await readSuccessResponseJsonOrThrow( @@ -418,7 +416,7 @@ async function queryPurseForPeerPullCredit( if (!depositTimestamp || TalerProtocolTimestamp.isNever(depositTimestamp)) { logger.info("purse not ready yet (no deposit)"); - return { ready: false }; + return TaskRunResult.backoff(); } const reserve = await ws.db @@ -462,9 +460,7 @@ async function queryPurseForPeerPullCredit( return { oldTxState, newTxState }; }); notifyTransition(ws, transactionId, transitionInfo); - return { - ready: true, - }; + return TaskRunResult.backoff(); } async function longpollKycStatus( @@ -473,6 +469,7 @@ async function longpollKycStatus( exchangeUrl: string, kycInfo: KycPendingInfo, userType: KycUserType, + cancellationToken: CancellationToken, ): Promise { const transactionId = constructTransactionIdentifier({ tag: TransactionType.PeerPullCredit, @@ -483,56 +480,47 @@ async function longpollKycStatus( pursePub, }); - runLongpollAsync(ws, retryTag, async (ct) => { - const url = new URL( - `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`, - exchangeUrl, - ); - url.searchParams.set("timeout_ms", "10000"); - logger.info(`kyc url ${url.href}`); - const kycStatusRes = await ws.http.fetch(url.href, { - method: "GET", - cancellationToken: ct, - }); - if ( - kycStatusRes.status === HttpStatusCode.Ok || - //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge - // remove after the exchange is fixed or clarified - kycStatusRes.status === HttpStatusCode.NoContent - ) { - const transitionInfo = await ws.db - .mktx((x) => [x.peerPullCredit]) - .runReadWrite(async (tx) => { - const peerIni = await tx.peerPullCredit.get(pursePub); - if (!peerIni) { - return; - } - if ( - peerIni.status !== - PeerPullPaymentCreditStatus.PendingMergeKycRequired - ) { - return; - } - const oldTxState = computePeerPullCreditTransactionState(peerIni); - peerIni.status = PeerPullPaymentCreditStatus.PendingCreatePurse; - const newTxState = computePeerPullCreditTransactionState(peerIni); - await tx.peerPullCredit.put(peerIni); - return { oldTxState, newTxState }; - }); - notifyTransition(ws, transactionId, transitionInfo); - return { ready: true }; - } else if (kycStatusRes.status === HttpStatusCode.Accepted) { - // FIXME: Do we have to update the URL here? - return { ready: false }; - } else { - throw Error( - `unexpected response from kyc-check (${kycStatusRes.status})`, - ); - } + const url = new URL( + `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`, + exchangeUrl, + ); + url.searchParams.set("timeout_ms", "10000"); + logger.info(`kyc url ${url.href}`); + const kycStatusRes = await ws.http.fetch(url.href, { + method: "GET", + cancellationToken, }); - return { - type: TaskRunResultType.Longpoll, - }; + if ( + kycStatusRes.status === HttpStatusCode.Ok || + //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge + // remove after the exchange is fixed or clarified + kycStatusRes.status === HttpStatusCode.NoContent + ) { + const transitionInfo = await ws.db + .mktx((x) => [x.peerPullCredit]) + .runReadWrite(async (tx) => { + const peerIni = await tx.peerPullCredit.get(pursePub); + if (!peerIni) { + return; + } + if ( + peerIni.status !== PeerPullPaymentCreditStatus.PendingMergeKycRequired + ) { + return; + } + const oldTxState = computePeerPullCreditTransactionState(peerIni); + peerIni.status = PeerPullPaymentCreditStatus.PendingCreatePurse; + const newTxState = computePeerPullCreditTransactionState(peerIni); + await tx.peerPullCredit.put(peerIni); + return { oldTxState, newTxState }; + }); + notifyTransition(ws, transactionId, transitionInfo); + } else if (kycStatusRes.status === HttpStatusCode.Accepted) { + // FIXME: Do we have to update the URL here? + } else { + throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`); + } + return TaskRunResult.backoff(); } async function processPeerPullCreditAbortingDeletePurse( @@ -584,7 +572,7 @@ async function processPeerPullCreditAbortingDeletePurse( }); notifyTransition(ws, transactionId, transitionInfo); - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } async function handlePeerPullCreditWithdrawing( @@ -637,7 +625,7 @@ async function handlePeerPullCreditWithdrawing( return TaskRunResult.finished(); } else { // FIXME: Return indicator that we depend on the other operation! - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } } @@ -757,13 +745,13 @@ async function handlePeerPullCreditCreatePurse( return { oldTxState, newTxState }; }); notifyTransition(ws, transactionId, transitionInfo); - - return TaskRunResult.finished(); + return TaskRunResult.backoff(); } export async function processPeerPullCredit( ws: InternalWalletState, pursePub: string, + cancellationToken: CancellationToken, ): Promise { const pullIni = await ws.db .mktx((x) => [x.peerPullCredit]) @@ -779,14 +767,6 @@ export async function processPeerPullCredit( pursePub, }); - // We're already running! - if (ws.activeLongpoll[retryTag]) { - logger.info("peer-pull-credit already in long-polling, returning!"); - return { - type: TaskRunResultType.Longpoll, - }; - } - logger.trace(`processing ${retryTag}, status=${pullIni.status}`); switch (pullIni.status) { @@ -794,15 +774,7 @@ export async function processPeerPullCredit( return TaskRunResult.finished(); } case PeerPullPaymentCreditStatus.PendingReady: - runLongpollAsync(ws, retryTag, async (cancellationToken) => - queryPurseForPeerPullCredit(ws, pullIni, cancellationToken), - ); - logger.trace( - "returning early from processPeerPullCredit for long-polling in background", - ); - return { - type: TaskRunResultType.Longpoll, - }; + return queryPurseForPeerPullCredit(ws, pullIni, cancellationToken); case PeerPullPaymentCreditStatus.PendingMergeKycRequired: { if (!pullIni.kycInfo) { throw Error("invalid state, kycInfo required"); @@ -813,6 +785,7 @@ export async function processPeerPullCredit( pullIni.exchangeBaseUrl, pullIni.kycInfo, "individual", + cancellationToken, ); } case PeerPullPaymentCreditStatus.PendingCreatePurse: @@ -866,7 +839,7 @@ async function processPeerPullCreditKycRequired( kycStatusRes.status === HttpStatusCode.NoContent ) { logger.warn("kyc requested, but already fulfilled"); - return TaskRunResult.finished(); + return TaskRunResult.backoff(); } else if (kycStatusRes.status === HttpStatusCode.Accepted) { const kycStatus = await kycStatusRes.json(); logger.info(`kyc status: ${j2s(kycStatus)}`); @@ -906,7 +879,7 @@ async function processPeerPullCreditKycRequired( }; }); notifyTransition(ws, transactionId, transitionInfo); - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } else { throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`); } @@ -1095,20 +1068,16 @@ export async function initiatePeerPullPayment( return { oldTxState, newTxState }; }); - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.PeerPullCredit, - pursePub: pursePair.pub, - }); + const ctx = new PeerPullCreditTransactionContext(ws, pursePair.pub); // The pending-incoming balance has changed. ws.notify({ type: NotificationType.BalanceChange, - hintTransactionId: transactionId, + hintTransactionId: ctx.transactionId, }); - notifyTransition(ws, transactionId, transitionInfo); - - ws.workAvailable.trigger(); + notifyTransition(ws, ctx.transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(ctx.retryTag); return { talerUri: stringifyTalerUri({ @@ -1116,7 +1085,7 @@ export async function initiatePeerPullPayment( exchangeBaseUrl: exchangeBaseUrl, contractPriv: contractKeyPair.priv, }), - transactionId, + transactionId: ctx.transactionId, }; } diff --git a/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts index c7e447dab..e5ae6b73b 100644 --- a/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts +++ b/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts @@ -42,6 +42,7 @@ import { TalerPreciseTimestamp, TalerProtocolViolationError, TransactionAction, + TransactionIdStr, TransactionMajorState, TransactionMinorState, TransactionState, @@ -69,6 +70,7 @@ import { PendingTaskType, RefreshOperationStatus, StoreNames, + TaskId, WalletStoresV1, createRefreshGroup, timestampPreciseToDb, @@ -93,7 +95,6 @@ import { constructTransactionIdentifier, notifyTransition, parseTransactionIdentifier, - stopLongpolling, } from "./transactions.js"; const logger = new Logger("pay-peer-pull-debit.ts"); @@ -103,8 +104,8 @@ const logger = new Logger("pay-peer-pull-debit.ts"); */ export class PeerPullDebitTransactionContext implements TransactionContext { ws: InternalWalletState; - transactionId: string; - taskId: string; + readonly transactionId: TransactionIdStr; + readonly taskId: TaskId; peerPullDebitId: string; constructor(ws: InternalWalletState, peerPullDebitId: string) { @@ -140,7 +141,6 @@ export class PeerPullDebitTransactionContext implements TransactionContext { const transactionId = this.transactionId; const ws = this.ws; const peerPullDebitId = this.peerPullDebitId; - stopLongpolling(ws, taskId); const transitionInfo = await ws.db .mktx((x) => [x.peerPullDebit]) .runReadWrite(async (tx) => { @@ -185,11 +185,11 @@ export class PeerPullDebitTransactionContext implements TransactionContext { return undefined; }); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.stopShepherdTask(taskId); } async resumeTransaction(): Promise { const ctx = this; - stopLongpolling(ctx.ws, ctx.taskId); await ctx.transition(async (pi) => { switch (pi.status) { case PeerPullDebitRecordStatus.SuspendedDeposit: @@ -207,11 +207,11 @@ export class PeerPullDebitTransactionContext implements TransactionContext { return TransitionResult.Stay; } }); + this.ws.taskScheduler.startShepherdTask(this.taskId); } async failTransaction(): Promise { const ctx = this; - stopLongpolling(ctx.ws, ctx.taskId); await ctx.transition(async (pi) => { switch (pi.status) { case PeerPullDebitRecordStatus.SuspendedDeposit: @@ -225,6 +225,7 @@ export class PeerPullDebitTransactionContext implements TransactionContext { return TransitionResult.Stay; } }); + this.ws.taskScheduler.stopShepherdTask(this.taskId); } async abortTransaction(): Promise { @@ -325,7 +326,9 @@ export class PeerPullDebitTransactionContext implements TransactionContext { } }, ); + ws.taskScheduler.stopShepherdTask(this.taskId); notifyTransition(ws, this.transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(this.taskId); } } @@ -405,7 +408,7 @@ async function handlePurseCreationConflict( } await tx.peerPullDebit.put(myPpi); }); - return TaskRunResult.finished(); + return TaskRunResult.backoff(); } async function processPeerPullDebitPendingDeposit( @@ -469,7 +472,7 @@ async function processPeerPullDebitPendingDeposit( } case HttpStatusCode.Gone: { await ctx.abortTransaction(); - return TaskRunResult.finished(); + return TaskRunResult.backoff(); } case HttpStatusCode.Conflict: { return handlePurseCreationConflict(ctx, peerPullInc, httpResp); @@ -529,7 +532,7 @@ async function processPeerPullDebitAbortingRefresh( }); notifyTransition(ws, transactionId, transitionInfo); // FIXME: Shouldn't this be finished in some cases?! - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } export async function processPeerPullDebit( @@ -607,7 +610,7 @@ export async function confirmPeerPullDebit( coinSelRes.result.coins, ); - const ppi = await ws.db + await ws.db .mktx((x) => [ x.exchanges, x.coins, @@ -643,19 +646,19 @@ export async function confirmPeerPullDebit( }; } await tx.peerPullDebit.put(pi); - return pi; }); - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.PeerPullDebit, - peerPullDebitId, - }); + const ctx = new PeerPullDebitTransactionContext(ws, peerPullDebitId); + + const transactionId = ctx.transactionId; ws.notify({ type: NotificationType.BalanceChange, hintTransactionId: transactionId, }); + ws.taskScheduler.startShepherdTask(ctx.taskId); + return { transactionId, }; diff --git a/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts b/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts index 427961f44..23976f11b 100644 --- a/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts +++ b/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts @@ -17,6 +17,7 @@ import { AcceptPeerPushPaymentResponse, Amounts, + CancellationToken, ConfirmPeerPushCreditRequest, ContractTermsUtil, ExchangePurseMergeRequest, @@ -54,9 +55,10 @@ import { InternalWalletState, KycPendingInfo, KycUserType, - PeerPushPaymentIncomingRecord, PeerPushCreditStatus, + PeerPushPaymentIncomingRecord, PendingTaskType, + TaskId, WithdrawalGroupStatus, WithdrawalRecordType, timestampPreciseToDb, @@ -69,9 +71,8 @@ import { TombstoneTag, TransactionContext, constructTaskIdentifier, - runLongpollAsync, } from "./common.js"; -import { fetchFreshExchange, markExchangeUsed } from "./exchanges.js"; +import { fetchFreshExchange } from "./exchanges.js"; import { codecForExchangePurseStatus, getMergeReserveInfo, @@ -81,7 +82,6 @@ import { constructTransactionIdentifier, notifyTransition, parseTransactionIdentifier, - stopLongpolling, } from "./transactions.js"; import { PerformCreateWithdrawalGroupResult, @@ -93,8 +93,8 @@ import { const logger = new Logger("pay-peer-push-credit.ts"); export class PeerPushCreditTransactionContext implements TransactionContext { - private transactionId: string; - private retryTag: string; + readonly transactionId: string; + readonly retryTag: TaskId; constructor( public ws: InternalWalletState, @@ -141,7 +141,6 @@ export class PeerPushCreditTransactionContext implements TransactionContext { async suspendTransaction(): Promise { const { ws, peerPushCreditId, retryTag, transactionId } = this; - stopLongpolling(ws, retryTag); const transitionInfo = await ws.db.runReadWriteTx( ["peerPushCredit"], async (tx) => { @@ -191,11 +190,11 @@ export class PeerPushCreditTransactionContext implements TransactionContext { }, ); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.stopShepherdTask(retryTag); } async abortTransaction(): Promise { const { ws, peerPushCreditId, retryTag, transactionId } = this; - stopLongpolling(ws, retryTag); const transitionInfo = await ws.db.runReadWriteTx( ["peerPushCredit"], async (tx) => { @@ -248,11 +247,11 @@ export class PeerPushCreditTransactionContext implements TransactionContext { }, ); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(retryTag); } async resumeTransaction(): Promise { const { ws, peerPushCreditId, retryTag, transactionId } = this; - stopLongpolling(ws, retryTag); const transitionInfo = await ws.db.runReadWriteTx( ["peerPushCredit"], async (tx) => { @@ -300,13 +299,12 @@ export class PeerPushCreditTransactionContext implements TransactionContext { return undefined; }, ); - ws.workAvailable.trigger(); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(retryTag); } async failTransaction(): Promise { const { ws, peerPushCreditId, retryTag, transactionId } = this; - stopLongpolling(ws, retryTag); const transitionInfo = await ws.db.runReadWriteTx( ["peerPushCredit"], async (tx) => { @@ -349,8 +347,9 @@ export class PeerPushCreditTransactionContext implements TransactionContext { return undefined; }, ); - ws.workAvailable.trigger(); + ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(retryTag); } } @@ -521,63 +520,51 @@ async function longpollKycStatus( exchangeUrl: string, kycInfo: KycPendingInfo, userType: KycUserType, + cancellationToken: CancellationToken, ): Promise { const transactionId = constructTransactionIdentifier({ tag: TransactionType.PeerPushCredit, peerPushCreditId, }); - const retryTag = constructTaskIdentifier({ - tag: PendingTaskType.PeerPushCredit, - peerPushCreditId, - }); - - runLongpollAsync(ws, retryTag, async (ct) => { - const url = new URL( - `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`, - exchangeUrl, - ); - url.searchParams.set("timeout_ms", "10000"); - logger.info(`kyc url ${url.href}`); - const kycStatusRes = await ws.http.fetch(url.href, { - method: "GET", - cancellationToken: ct, - }); - if ( - kycStatusRes.status === HttpStatusCode.Ok || - //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge - // remove after the exchange is fixed or clarified - kycStatusRes.status === HttpStatusCode.NoContent - ) { - const transitionInfo = await ws.db - .mktx((x) => [x.peerPushCredit]) - .runReadWrite(async (tx) => { - const peerInc = await tx.peerPushCredit.get(peerPushCreditId); - if (!peerInc) { - return; - } - if (peerInc.status !== PeerPushCreditStatus.PendingMergeKycRequired) { - return; - } - const oldTxState = computePeerPushCreditTransactionState(peerInc); - peerInc.status = PeerPushCreditStatus.PendingMerge; - const newTxState = computePeerPushCreditTransactionState(peerInc); - await tx.peerPushCredit.put(peerInc); - return { oldTxState, newTxState }; - }); - notifyTransition(ws, transactionId, transitionInfo); - return { ready: true }; - } else if (kycStatusRes.status === HttpStatusCode.Accepted) { - // FIXME: Do we have to update the URL here? - return { ready: false }; - } else { - throw Error( - `unexpected response from kyc-check (${kycStatusRes.status})`, - ); - } + const url = new URL( + `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`, + exchangeUrl, + ); + url.searchParams.set("timeout_ms", "10000"); + logger.info(`kyc url ${url.href}`); + const kycStatusRes = await ws.http.fetch(url.href, { + method: "GET", + cancellationToken, }); - return { - type: TaskRunResultType.Longpoll, - }; + if ( + kycStatusRes.status === HttpStatusCode.Ok || + //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge + // remove after the exchange is fixed or clarified + kycStatusRes.status === HttpStatusCode.NoContent + ) { + const transitionInfo = await ws.db + .mktx((x) => [x.peerPushCredit]) + .runReadWrite(async (tx) => { + const peerInc = await tx.peerPushCredit.get(peerPushCreditId); + if (!peerInc) { + return; + } + if (peerInc.status !== PeerPushCreditStatus.PendingMergeKycRequired) { + return; + } + const oldTxState = computePeerPushCreditTransactionState(peerInc); + peerInc.status = PeerPushCreditStatus.PendingMerge; + const newTxState = computePeerPushCreditTransactionState(peerInc); + await tx.peerPushCredit.put(peerInc); + return { oldTxState, newTxState }; + }); + notifyTransition(ws, transactionId, transitionInfo); + } else if (kycStatusRes.status === HttpStatusCode.Accepted) { + // FIXME: Do we have to update the URL here? + } else { + throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`); + } + return TaskRunResult.backoff(); } async function processPeerPushCreditKycRequired( @@ -786,7 +773,7 @@ async function handlePendingMerge( ); notifyTransition(ws, transactionId, txRes?.peerPushCreditTransition); - return TaskRunResult.finished(); + return TaskRunResult.backoff(); } async function handlePendingWithdrawing( @@ -839,13 +826,14 @@ async function handlePendingWithdrawing( return TaskRunResult.finished(); } else { // FIXME: Return indicator that we depend on the other operation! - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } } export async function processPeerPushCredit( ws: InternalWalletState, peerPushCreditId: string, + cancellationToken: CancellationToken, ): Promise { let peerInc: PeerPushPaymentIncomingRecord | undefined; let contractTerms: PeerContractTerms | undefined; @@ -886,6 +874,7 @@ export async function processPeerPushCredit( peerInc.exchangeBaseUrl, peerInc.kycInfo, "individual", + cancellationToken, ); } @@ -940,7 +929,9 @@ export async function confirmPeerPushCredit( ); } - ws.workAvailable.trigger(); + const ctx = new PeerPushCreditTransactionContext(ws, peerPushCreditId); + + ws.taskScheduler.startShepherdTask(ctx.retryTag); const transactionId = constructTransactionIdentifier({ tag: TransactionType.PeerPushCredit, diff --git a/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts index 2e5af4e78..165c8deee 100644 --- a/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts +++ b/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts @@ -16,6 +16,7 @@ import { Amounts, + CancellationToken, CheckPeerPushDebitRequest, CheckPeerPushDebitResponse, CoinRefreshRequest, @@ -32,6 +33,7 @@ import { TalerProtocolTimestamp, TalerProtocolViolationError, TransactionAction, + TransactionIdStr, TransactionMajorState, TransactionMinorState, TransactionState, @@ -56,7 +58,7 @@ import { timestampProtocolToDb, } from "../index.js"; import { InternalWalletState } from "../internal-wallet-state.js"; -import { PendingTaskType } from "../pending-types.js"; +import { PendingTaskType, TaskId } from "../pending-types.js"; import { assertUnreachable } from "../util/assertUnreachable.js"; import { PeerCoinRepair, selectPeerCoins } from "../util/coinSelection.js"; import { checkLogicInvariant } from "../util/invariants.js"; @@ -65,7 +67,6 @@ import { TaskRunResultType, TransactionContext, constructTaskIdentifier, - runLongpollAsync, spendCoins, } from "./common.js"; import { @@ -76,14 +77,13 @@ import { import { constructTransactionIdentifier, notifyTransition, - stopLongpolling, } from "./transactions.js"; const logger = new Logger("pay-peer-push-debit.ts"); export class PeerPushDebitTransactionContext implements TransactionContext { - public transactionId: string; - public retryTag: string; + readonly transactionId: TransactionIdStr; + readonly retryTag: TaskId; constructor( public ws: InternalWalletState, @@ -114,7 +114,6 @@ export class PeerPushDebitTransactionContext implements TransactionContext { async suspendTransaction(): Promise { const { ws, pursePub, transactionId, retryTag } = this; - stopLongpolling(ws, retryTag); const transitionInfo = await ws.db .mktx((x) => [x.peerPushDebit]) .runReadWrite(async (tx) => { @@ -166,12 +165,12 @@ export class PeerPushDebitTransactionContext implements TransactionContext { } return undefined; }); + ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); } async abortTransaction(): Promise { const { ws, pursePub, transactionId, retryTag } = this; - stopLongpolling(ws, retryTag); const transitionInfo = await ws.db .mktx((x) => [x.peerPushDebit]) .runReadWrite(async (tx) => { @@ -218,12 +217,13 @@ export class PeerPushDebitTransactionContext implements TransactionContext { } return undefined; }); + ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(retryTag); } async resumeTransaction(): Promise { const { ws, pursePub, transactionId, retryTag } = this; - stopLongpolling(ws, retryTag); const transitionInfo = await ws.db .mktx((x) => [x.peerPushDebit]) .runReadWrite(async (tx) => { @@ -275,13 +275,12 @@ export class PeerPushDebitTransactionContext implements TransactionContext { } return undefined; }); - ws.workAvailable.trigger(); + ws.taskScheduler.startShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); } async failTransaction(): Promise { const { ws, pursePub, transactionId, retryTag } = this; - stopLongpolling(ws, retryTag); const transitionInfo = await ws.db .mktx((x) => [x.peerPushDebit]) .runReadWrite(async (tx) => { @@ -328,7 +327,9 @@ export class PeerPushDebitTransactionContext implements TransactionContext { } return undefined; }); + ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(retryTag); } } @@ -432,7 +433,7 @@ async function handlePurseCreationConflict( } await tx.peerPushDebit.put(myPpi); }); - return TaskRunResult.finished(); + return TaskRunResult.progress(); } async function processPeerPushDebitCreateReserve( @@ -554,7 +555,7 @@ async function processPeerPushDebitCreateReserve( stTo: PeerPushDebitStatus.PendingReady, }); - return TaskRunResult.finished(); + return TaskRunResult.backoff(); } async function processPeerPushDebitAbortingDeletePurse( @@ -628,7 +629,7 @@ async function processPeerPushDebitAbortingDeletePurse( }); notifyTransition(ws, transactionId, transitionInfo); - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } interface SimpleTransition { @@ -712,7 +713,7 @@ async function processPeerPushDebitAbortingRefreshDeleted( }); notifyTransition(ws, transactionId, transitionInfo); // FIXME: Shouldn't this be finished in some cases?! - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } async function processPeerPushDebitAbortingRefreshExpired( @@ -760,7 +761,7 @@ async function processPeerPushDebitAbortingRefreshExpired( }); notifyTransition(ws, transactionId, transitionInfo); // FIXME: Shouldn't this be finished in some cases?! - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } /** @@ -769,118 +770,102 @@ async function processPeerPushDebitAbortingRefreshExpired( async function processPeerPushDebitReady( ws: InternalWalletState, peerPushInitiation: PeerPushDebitRecord, + cancellationToken: CancellationToken, ): Promise { logger.trace("processing peer-push-debit pending(ready)"); const pursePub = peerPushInitiation.pursePub; - const retryTag = constructTaskIdentifier({ - tag: PendingTaskType.PeerPushDebit, - pursePub, - }); const transactionId = constructTaskIdentifier({ tag: PendingTaskType.PeerPushDebit, pursePub, }); - runLongpollAsync(ws, retryTag, async (ct) => { - const mergeUrl = new URL( - `purses/${pursePub}/merge`, - peerPushInitiation.exchangeBaseUrl, + const mergeUrl = new URL( + `purses/${pursePub}/merge`, + peerPushInitiation.exchangeBaseUrl, + ); + mergeUrl.searchParams.set("timeout_ms", "30000"); + logger.info(`long-polling on purse status at ${mergeUrl.href}`); + const resp = await ws.http.fetch(mergeUrl.href, { + // timeout: getReserveRequestTimeout(withdrawalGroup), + cancellationToken, + }); + if (resp.status === HttpStatusCode.Ok) { + const purseStatus = await readSuccessResponseJsonOrThrow( + resp, + codecForExchangePurseStatus(), ); - mergeUrl.searchParams.set("timeout_ms", "30000"); - logger.info(`long-polling on purse status at ${mergeUrl.href}`); - const resp = await ws.http.fetch(mergeUrl.href, { - // timeout: getReserveRequestTimeout(withdrawalGroup), - cancellationToken: ct, - }); - if (resp.status === HttpStatusCode.Ok) { - const purseStatus = await readSuccessResponseJsonOrThrow( - resp, - codecForExchangePurseStatus(), + const mergeTimestamp = purseStatus.merge_timestamp; + logger.info(`got purse status ${j2s(purseStatus)}`); + if (!mergeTimestamp || TalerProtocolTimestamp.isNever(mergeTimestamp)) { + return TaskRunResult.backoff(); + } else { + await transitionPeerPushDebitTransaction( + ws, + peerPushInitiation.pursePub, + { + stFrom: PeerPushDebitStatus.PendingReady, + stTo: PeerPushDebitStatus.Done, + }, ); - const mergeTimestamp = purseStatus.merge_timestamp; - logger.info(`got purse status ${j2s(purseStatus)}`); - if (!mergeTimestamp || TalerProtocolTimestamp.isNever(mergeTimestamp)) { - return { ready: false }; - } else { - await transitionPeerPushDebitTransaction( + return TaskRunResult.finished(); + } + } else if (resp.status === HttpStatusCode.Gone) { + logger.info(`purse ${pursePub} is gone, aborting peer-push-debit`); + const transitionInfo = await ws.db + .mktx((x) => [ + x.peerPushDebit, + x.refreshGroups, + x.denominations, + x.coinAvailability, + x.coins, + ]) + .runReadWrite(async (tx) => { + const ppiRec = await tx.peerPushDebit.get(pursePub); + if (!ppiRec) { + return undefined; + } + if (ppiRec.status !== PeerPushDebitStatus.PendingReady) { + return undefined; + } + const currency = Amounts.currencyOf(ppiRec.amount); + const oldTxState = computePeerPushDebitTransactionState(ppiRec); + const coinPubs: CoinRefreshRequest[] = []; + + for (let i = 0; i < ppiRec.coinSel.coinPubs.length; i++) { + coinPubs.push({ + amount: ppiRec.coinSel.contributions[i], + coinPub: ppiRec.coinSel.coinPubs[i], + }); + } + + const refresh = await createRefreshGroup( ws, - peerPushInitiation.pursePub, - { - stFrom: PeerPushDebitStatus.PendingReady, - stTo: PeerPushDebitStatus.Done, - }, + tx, + currency, + coinPubs, + RefreshReason.AbortPeerPushDebit, + transactionId, ); + ppiRec.status = PeerPushDebitStatus.AbortingRefreshExpired; + ppiRec.abortRefreshGroupId = refresh.refreshGroupId; + await tx.peerPushDebit.put(ppiRec); + const newTxState = computePeerPushDebitTransactionState(ppiRec); return { - ready: true, + oldTxState, + newTxState, }; - } - } else if (resp.status === HttpStatusCode.Gone) { - logger.info(`purse ${pursePub} is gone, aborting peer-push-debit`); - const transitionInfo = await ws.db - .mktx((x) => [ - x.peerPushDebit, - x.refreshGroups, - x.denominations, - x.coinAvailability, - x.coins, - ]) - .runReadWrite(async (tx) => { - const ppiRec = await tx.peerPushDebit.get(pursePub); - if (!ppiRec) { - return undefined; - } - if (ppiRec.status !== PeerPushDebitStatus.PendingReady) { - return undefined; - } - const currency = Amounts.currencyOf(ppiRec.amount); - const oldTxState = computePeerPushDebitTransactionState(ppiRec); - const coinPubs: CoinRefreshRequest[] = []; - - for (let i = 0; i < ppiRec.coinSel.coinPubs.length; i++) { - coinPubs.push({ - amount: ppiRec.coinSel.contributions[i], - coinPub: ppiRec.coinSel.coinPubs[i], - }); - } - - const refresh = await createRefreshGroup( - ws, - tx, - currency, - coinPubs, - RefreshReason.AbortPeerPushDebit, - transactionId, - ); - ppiRec.status = PeerPushDebitStatus.AbortingRefreshExpired; - ppiRec.abortRefreshGroupId = refresh.refreshGroupId; - await tx.peerPushDebit.put(ppiRec); - const newTxState = computePeerPushDebitTransactionState(ppiRec); - return { - oldTxState, - newTxState, - }; - }); - notifyTransition(ws, transactionId, transitionInfo); - return { - ready: true, - }; - } else { - logger.warn(`unexpected HTTP status for purse: ${resp.status}`); - return { - ready: false, - }; - } - }); - logger.trace( - "returning early from peer-push-debit for long-polling in background", - ); - return { - type: TaskRunResultType.Longpoll, - }; + }); + notifyTransition(ws, transactionId, transitionInfo); + return TaskRunResult.backoff(); + } else { + logger.warn(`unexpected HTTP status for purse: ${resp.status}`); + return TaskRunResult.backoff(); + } } export async function processPeerPushDebit( ws: InternalWalletState, pursePub: string, + cancellationToken: CancellationToken, ): Promise { const peerPushInitiation = await ws.db .mktx((x) => [x.peerPushDebit]) @@ -891,24 +876,15 @@ export async function processPeerPushDebit( throw Error("peer push payment not found"); } - const retryTag = constructTaskIdentifier({ - tag: PendingTaskType.PeerPushDebit, - pursePub, - }); - - // We're already running! - if (ws.activeLongpoll[retryTag]) { - logger.info("peer-push-debit task already in long-polling, returning!"); - return { - type: TaskRunResultType.Longpoll, - }; - } - switch (peerPushInitiation.status) { case PeerPushDebitStatus.PendingCreatePurse: return processPeerPushDebitCreateReserve(ws, peerPushInitiation); case PeerPushDebitStatus.PendingReady: - return processPeerPushDebitReady(ws, peerPushInitiation); + return processPeerPushDebitReady( + ws, + peerPushInitiation, + cancellationToken, + ); case PeerPushDebitStatus.AbortingDeletePurse: return processPeerPushDebitAbortingDeletePurse(ws, peerPushInitiation); case PeerPushDebitStatus.AbortingRefreshDeleted: @@ -971,10 +947,9 @@ export async function initiatePeerPushDebit( const pursePub = pursePair.pub; - const transactionId = constructTaskIdentifier({ - tag: PendingTaskType.PeerPushDebit, - pursePub, - }); + const ctx = new PeerPushDebitTransactionContext(ws, pursePub); + + const transactionId = ctx.transactionId; const contractEncNonce = encodeCrock(getRandomBytes(24)); @@ -1044,6 +1019,8 @@ export async function initiatePeerPushDebit( hintTransactionId: transactionId, }); + ws.taskScheduler.startShepherdTask(ctx.retryTag); + return { contractPriv: contractKeyPair.priv, mergePriv: mergePair.priv, diff --git a/packages/taler-wallet-core/src/operations/pending.ts b/packages/taler-wallet-core/src/operations/pending.ts deleted file mode 100644 index 990d9a7b3..000000000 --- a/packages/taler-wallet-core/src/operations/pending.ts +++ /dev/null @@ -1,814 +0,0 @@ -/* - This file is part of GNU Taler - (C) 2019 GNUnet e.V. - - GNU Taler is free software; you can redistribute it and/or modify it under the - terms of the GNU General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU General Public License for more details. - - You should have received a copy of the GNU General Public License along with - GNU Taler; see the file COPYING. If not, see - */ - -/** - * Derive pending tasks from the wallet database. - */ - -/** - * Imports. - */ -import { GlobalIDB } from "@gnu-taler/idb-bridge"; -import { - AbsoluteTime, - TalerPreciseTimestamp, - TransactionRecordFilter, -} from "@gnu-taler/taler-util"; -import { - BackupProviderStateTag, - DbPreciseTimestamp, - DepositElementStatus, - DepositGroupRecord, - ExchangeEntryDbUpdateStatus, - OPERATION_STATUS_ACTIVE_FIRST, - OPERATION_STATUS_ACTIVE_LAST, - PeerPullCreditRecord, - PeerPullDebitRecordStatus, - PeerPullPaymentIncomingRecord, - PeerPushCreditStatus, - PeerPushDebitRecord, - PeerPushPaymentIncomingRecord, - PurchaseRecord, - PurchaseStatus, - RefreshCoinStatus, - RefreshGroupRecord, - RefreshOperationStatus, - RefundGroupRecord, - RewardRecord, - WalletStoresV1, - WithdrawalGroupRecord, - timestampAbsoluteFromDb, - timestampOptionalAbsoluteFromDb, - timestampPreciseFromDb, - timestampPreciseToDb, -} from "../db.js"; -import { InternalWalletState } from "../internal-wallet-state.js"; -import { - PendingOperationsResponse, - PendingTaskType, - TaskId, -} from "../pending-types.js"; -import { GetReadOnlyAccess } from "../util/query.js"; -import { TaskIdentifiers } from "./common.js"; - -function getPendingCommon( - ws: InternalWalletState, - opTag: TaskId, - timestampDue: AbsoluteTime, -): { - id: TaskId; - isDue: boolean; - timestampDue: AbsoluteTime; - isLongpolling: boolean; -} { - const isDue = - AbsoluteTime.isExpired(timestampDue) && !ws.activeLongpoll[opTag]; - return { - id: opTag, - isDue, - timestampDue, - isLongpolling: !!ws.activeLongpoll[opTag], - }; -} - -async function gatherExchangePending( - ws: InternalWalletState, - tx: GetReadOnlyAccess<{ - exchanges: typeof WalletStoresV1.exchanges; - operationRetries: typeof WalletStoresV1.operationRetries; - }>, - now: AbsoluteTime, - resp: PendingOperationsResponse, -): Promise { - let timestampDue: DbPreciseTimestamp | undefined = undefined; - await tx.exchanges.iter().forEachAsync(async (exch) => { - switch (exch.updateStatus) { - case ExchangeEntryDbUpdateStatus.Initial: - case ExchangeEntryDbUpdateStatus.Suspended: - return; - } - const opUpdateExchangeTag = TaskIdentifiers.forExchangeUpdate(exch); - let opr = await tx.operationRetries.get(opUpdateExchangeTag); - - switch (exch.updateStatus) { - case ExchangeEntryDbUpdateStatus.Ready: - timestampDue = opr?.retryInfo.nextRetry ?? exch.nextRefreshCheckStamp; - break; - case ExchangeEntryDbUpdateStatus.ReadyUpdate: - case ExchangeEntryDbUpdateStatus.InitialUpdate: - case ExchangeEntryDbUpdateStatus.UnavailableUpdate: - timestampDue = - opr?.retryInfo.nextRetry ?? - timestampPreciseToDb(TalerPreciseTimestamp.now()); - break; - } - - resp.pendingOperations.push({ - type: PendingTaskType.ExchangeUpdate, - ...getPendingCommon( - ws, - opUpdateExchangeTag, - AbsoluteTime.fromPreciseTimestamp(timestampPreciseFromDb(timestampDue)), - ), - givesLifeness: false, - exchangeBaseUrl: exch.baseUrl, - lastError: opr?.lastError, - }); - - // We only schedule a check for auto-refresh if the exchange update - // was successful. - if (!opr?.lastError) { - const opCheckRefreshTag = TaskIdentifiers.forExchangeCheckRefresh(exch); - resp.pendingOperations.push({ - type: PendingTaskType.ExchangeCheckRefresh, - ...getPendingCommon( - ws, - opCheckRefreshTag, - AbsoluteTime.fromPreciseTimestamp( - timestampPreciseFromDb(timestampDue), - ), - ), - timestampDue: AbsoluteTime.fromPreciseTimestamp( - timestampPreciseFromDb(exch.nextRefreshCheckStamp), - ), - givesLifeness: false, - exchangeBaseUrl: exch.baseUrl, - }); - } - }); -} - -/** - * Iterate refresh records based on a filter. - */ -export async function iterRecordsForRefresh( - tx: GetReadOnlyAccess<{ - refreshGroups: typeof WalletStoresV1.refreshGroups; - }>, - filter: TransactionRecordFilter, - f: (r: RefreshGroupRecord) => Promise, -): Promise { - let refreshGroups: RefreshGroupRecord[]; - if (filter.onlyState === "nonfinal") { - const keyRange = GlobalIDB.KeyRange.bound( - RefreshOperationStatus.Pending, - RefreshOperationStatus.Suspended, - ); - refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll(keyRange); - } else { - refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll(); - } - - for (const r of refreshGroups) { - await f(r); - } -} - -async function gatherRefreshPending( - ws: InternalWalletState, - tx: GetReadOnlyAccess<{ - refreshGroups: typeof WalletStoresV1.refreshGroups; - operationRetries: typeof WalletStoresV1.operationRetries; - }>, - now: AbsoluteTime, - resp: PendingOperationsResponse, -): Promise { - await iterRecordsForRefresh(tx, { onlyState: "nonfinal" }, async (r) => { - if (r.timestampFinished) { - return; - } - const opId = TaskIdentifiers.forRefresh(r); - const retryRecord = await tx.operationRetries.get(opId); - const timestampDue = - timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ?? - AbsoluteTime.now(); - resp.pendingOperations.push({ - type: PendingTaskType.Refresh, - ...getPendingCommon(ws, opId, timestampDue), - givesLifeness: true, - refreshGroupId: r.refreshGroupId, - finishedPerCoin: r.statusPerCoin.map( - (x) => x === RefreshCoinStatus.Finished, - ), - retryInfo: retryRecord?.retryInfo, - }); - }); -} - -export async function iterRecordsForWithdrawal( - tx: GetReadOnlyAccess<{ - withdrawalGroups: typeof WalletStoresV1.withdrawalGroups; - }>, - filter: TransactionRecordFilter, - f: (r: WithdrawalGroupRecord) => Promise, -): Promise { - let withdrawalGroupRecords: WithdrawalGroupRecord[]; - if (filter.onlyState === "nonfinal") { - const keyRange = GlobalIDB.KeyRange.bound( - OPERATION_STATUS_ACTIVE_FIRST, - OPERATION_STATUS_ACTIVE_LAST, - ); - withdrawalGroupRecords = - await tx.withdrawalGroups.indexes.byStatus.getAll(keyRange); - } else { - withdrawalGroupRecords = - await tx.withdrawalGroups.indexes.byStatus.getAll(); - } - for (const wgr of withdrawalGroupRecords) { - await f(wgr); - } -} - -async function gatherWithdrawalPending( - ws: InternalWalletState, - tx: GetReadOnlyAccess<{ - withdrawalGroups: typeof WalletStoresV1.withdrawalGroups; - planchets: typeof WalletStoresV1.planchets; - operationRetries: typeof WalletStoresV1.operationRetries; - }>, - now: AbsoluteTime, - resp: PendingOperationsResponse, -): Promise { - await iterRecordsForWithdrawal(tx, { onlyState: "nonfinal" }, async (wsr) => { - const opTag = TaskIdentifiers.forWithdrawal(wsr); - let opr = await tx.operationRetries.get(opTag); - /** - * kyc pending operation don't give lifeness - * since the user need to complete kyc procedure - */ - const userNeedToCompleteKYC = wsr.kycUrl !== undefined; - const now = AbsoluteTime.now(); - if (!opr) { - opr = { - id: opTag, - retryInfo: { - firstTry: timestampPreciseToDb(AbsoluteTime.toPreciseTimestamp(now)), - nextRetry: timestampPreciseToDb(AbsoluteTime.toPreciseTimestamp(now)), - retryCounter: 0, - }, - }; - } - resp.pendingOperations.push({ - type: PendingTaskType.Withdraw, - ...getPendingCommon( - ws, - opTag, - timestampOptionalAbsoluteFromDb(opr.retryInfo?.nextRetry) ?? - AbsoluteTime.now(), - ), - givesLifeness: !userNeedToCompleteKYC, - withdrawalGroupId: wsr.withdrawalGroupId, - lastError: opr.lastError, - retryInfo: opr.retryInfo, - }); - }); -} - -export async function iterRecordsForDeposit( - tx: GetReadOnlyAccess<{ - depositGroups: typeof WalletStoresV1.depositGroups; - }>, - filter: TransactionRecordFilter, - f: (r: DepositGroupRecord) => Promise, -): Promise { - let dgs: DepositGroupRecord[]; - if (filter.onlyState === "nonfinal") { - const keyRange = GlobalIDB.KeyRange.bound( - OPERATION_STATUS_ACTIVE_FIRST, - OPERATION_STATUS_ACTIVE_LAST, - ); - dgs = await tx.depositGroups.indexes.byStatus.getAll(keyRange); - } else { - dgs = await tx.depositGroups.indexes.byStatus.getAll(); - } - - for (const dg of dgs) { - await f(dg); - } -} - -async function gatherDepositPending( - ws: InternalWalletState, - tx: GetReadOnlyAccess<{ - depositGroups: typeof WalletStoresV1.depositGroups; - operationRetries: typeof WalletStoresV1.operationRetries; - }>, - now: AbsoluteTime, - resp: PendingOperationsResponse, -): Promise { - await iterRecordsForDeposit(tx, { onlyState: "nonfinal" }, async (dg) => { - let deposited = true; - for (const d of dg.statusPerCoin) { - if (d === DepositElementStatus.DepositPending) { - deposited = false; - } - } - /** - * kyc pending operation don't give lifeness - * since the user need to complete kyc procedure - */ - const userNeedToCompleteKYC = dg.kycInfo !== undefined; - const opId = TaskIdentifiers.forDeposit(dg); - const retryRecord = await tx.operationRetries.get(opId); - const timestampDue = - timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ?? - AbsoluteTime.now(); - resp.pendingOperations.push({ - type: PendingTaskType.Deposit, - ...getPendingCommon(ws, opId, timestampDue), - // Fully deposited operations don't give lifeness, - // because there is no reason to wait on the - // deposit tracking status. - givesLifeness: !deposited && !userNeedToCompleteKYC, - depositGroupId: dg.depositGroupId, - lastError: retryRecord?.lastError, - retryInfo: retryRecord?.retryInfo, - }); - }); -} - -export async function iterRecordsForReward( - tx: GetReadOnlyAccess<{ - rewards: typeof WalletStoresV1.rewards; - }>, - filter: TransactionRecordFilter, - f: (r: RewardRecord) => Promise, -): Promise { - if (filter.onlyState === "nonfinal") { - const keyRange = GlobalIDB.KeyRange.bound( - OPERATION_STATUS_ACTIVE_FIRST, - OPERATION_STATUS_ACTIVE_LAST, - ); - await tx.rewards.indexes.byStatus.iter(keyRange).forEachAsync(f); - } else { - await tx.rewards.indexes.byStatus.iter().forEachAsync(f); - } -} - -async function gatherRewardPending( - ws: InternalWalletState, - tx: GetReadOnlyAccess<{ - rewards: typeof WalletStoresV1.rewards; - operationRetries: typeof WalletStoresV1.operationRetries; - }>, - now: AbsoluteTime, - resp: PendingOperationsResponse, -): Promise { - await iterRecordsForReward(tx, { onlyState: "nonfinal" }, async (tip) => { - const opId = TaskIdentifiers.forTipPickup(tip); - const retryRecord = await tx.operationRetries.get(opId); - const timestampDue = - timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ?? - AbsoluteTime.now(); - - /** - * kyc pending operation don't give lifeness - * since the user need to complete kyc procedure - */ - // const userNeedToCompleteKYC = tip. - - if (tip.acceptedTimestamp) { - resp.pendingOperations.push({ - type: PendingTaskType.RewardPickup, - ...getPendingCommon(ws, opId, timestampDue), - givesLifeness: true, - timestampDue, - merchantBaseUrl: tip.merchantBaseUrl, - tipId: tip.walletRewardId, - merchantTipId: tip.merchantRewardId, - }); - } - }); -} - -export async function iterRecordsForRefund( - tx: GetReadOnlyAccess<{ - refundGroups: typeof WalletStoresV1.refundGroups; - }>, - filter: TransactionRecordFilter, - f: (r: RefundGroupRecord) => Promise, -): Promise { - if (filter.onlyState === "nonfinal") { - const keyRange = GlobalIDB.KeyRange.bound( - OPERATION_STATUS_ACTIVE_FIRST, - OPERATION_STATUS_ACTIVE_LAST, - ); - await tx.refundGroups.indexes.byStatus.iter(keyRange).forEachAsync(f); - } else { - await tx.refundGroups.iter().forEachAsync(f); - } -} - -export async function iterRecordsForPurchase( - tx: GetReadOnlyAccess<{ - purchases: typeof WalletStoresV1.purchases; - }>, - filter: TransactionRecordFilter, - f: (r: PurchaseRecord) => Promise, -): Promise { - if (filter.onlyState === "nonfinal") { - const keyRange = GlobalIDB.KeyRange.bound( - OPERATION_STATUS_ACTIVE_FIRST, - OPERATION_STATUS_ACTIVE_LAST, - ); - await tx.purchases.indexes.byStatus.iter(keyRange).forEachAsync(f); - } else { - await tx.purchases.indexes.byStatus.iter().forEachAsync(f); - } -} - -async function gatherPurchasePending( - ws: InternalWalletState, - tx: GetReadOnlyAccess<{ - purchases: typeof WalletStoresV1.purchases; - operationRetries: typeof WalletStoresV1.operationRetries; - }>, - now: AbsoluteTime, - resp: PendingOperationsResponse, -): Promise { - await iterRecordsForPurchase(tx, { onlyState: "nonfinal" }, async (pr) => { - switch (pr.purchaseStatus) { - // These states are nonfinal but don't need any processing - case PurchaseStatus.DialogProposed: - case PurchaseStatus.DialogShared: - return; - } - const opId = TaskIdentifiers.forPay(pr); - const retryRecord = await tx.operationRetries.get(opId); - const timestampDue = - timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ?? - AbsoluteTime.now(); - resp.pendingOperations.push({ - type: PendingTaskType.Purchase, - ...getPendingCommon(ws, opId, timestampDue), - givesLifeness: true, - statusStr: PurchaseStatus[pr.purchaseStatus], - proposalId: pr.proposalId, - retryInfo: retryRecord?.retryInfo, - lastError: retryRecord?.lastError, - }); - }); -} - -async function gatherRecoupPending( - ws: InternalWalletState, - tx: GetReadOnlyAccess<{ - recoupGroups: typeof WalletStoresV1.recoupGroups; - operationRetries: typeof WalletStoresV1.operationRetries; - }>, - now: AbsoluteTime, - resp: PendingOperationsResponse, -): Promise { - // FIXME: Have a status field! - await tx.recoupGroups.iter().forEachAsync(async (rg) => { - if (rg.timestampFinished) { - return; - } - const opId = TaskIdentifiers.forRecoup(rg); - const retryRecord = await tx.operationRetries.get(opId); - const timestampDue = - timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ?? - AbsoluteTime.now(); - resp.pendingOperations.push({ - type: PendingTaskType.Recoup, - ...getPendingCommon(ws, opId, timestampDue), - givesLifeness: true, - recoupGroupId: rg.recoupGroupId, - retryInfo: retryRecord?.retryInfo, - lastError: retryRecord?.lastError, - }); - }); -} - -async function gatherBackupPending( - ws: InternalWalletState, - tx: GetReadOnlyAccess<{ - backupProviders: typeof WalletStoresV1.backupProviders; - operationRetries: typeof WalletStoresV1.operationRetries; - }>, - now: AbsoluteTime, - resp: PendingOperationsResponse, -): Promise { - await tx.backupProviders.iter().forEachAsync(async (bp) => { - const opId = TaskIdentifiers.forBackup(bp); - const retryRecord = await tx.operationRetries.get(opId); - if (bp.state.tag === BackupProviderStateTag.Ready) { - const timestampDue = timestampAbsoluteFromDb( - bp.state.nextBackupTimestamp, - ); - resp.pendingOperations.push({ - type: PendingTaskType.Backup, - ...getPendingCommon(ws, opId, timestampDue), - givesLifeness: false, - backupProviderBaseUrl: bp.baseUrl, - lastError: undefined, - }); - } else if (bp.state.tag === BackupProviderStateTag.Retrying) { - const timestampDue = - timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo?.nextRetry) ?? - AbsoluteTime.now(); - resp.pendingOperations.push({ - type: PendingTaskType.Backup, - ...getPendingCommon(ws, opId, timestampDue), - givesLifeness: false, - backupProviderBaseUrl: bp.baseUrl, - retryInfo: retryRecord?.retryInfo, - lastError: retryRecord?.lastError, - }); - } - }); -} - -export async function iterRecordsForPeerPullInitiation( - tx: GetReadOnlyAccess<{ - peerPullCredit: typeof WalletStoresV1.peerPullCredit; - }>, - filter: TransactionRecordFilter, - f: (r: PeerPullCreditRecord) => Promise, -): Promise { - if (filter.onlyState === "nonfinal") { - const keyRange = GlobalIDB.KeyRange.bound( - OPERATION_STATUS_ACTIVE_FIRST, - OPERATION_STATUS_ACTIVE_LAST, - ); - await tx.peerPullCredit.indexes.byStatus.iter(keyRange).forEachAsync(f); - } else { - await tx.peerPullCredit.indexes.byStatus.iter().forEachAsync(f); - } -} - -async function gatherPeerPullInitiationPending( - ws: InternalWalletState, - tx: GetReadOnlyAccess<{ - peerPullCredit: typeof WalletStoresV1.peerPullCredit; - operationRetries: typeof WalletStoresV1.operationRetries; - }>, - now: AbsoluteTime, - resp: PendingOperationsResponse, -): Promise { - await iterRecordsForPeerPullInitiation( - tx, - { onlyState: "nonfinal" }, - async (pi) => { - const opId = TaskIdentifiers.forPeerPullPaymentInitiation(pi); - const retryRecord = await tx.operationRetries.get(opId); - const timestampDue = - timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ?? - AbsoluteTime.now(); - - /** - * kyc pending operation don't give lifeness - * since the user need to complete kyc procedure - */ - const userNeedToCompleteKYC = pi.kycUrl !== undefined; - - resp.pendingOperations.push({ - type: PendingTaskType.PeerPullCredit, - ...getPendingCommon(ws, opId, timestampDue), - givesLifeness: !userNeedToCompleteKYC, - retryInfo: retryRecord?.retryInfo, - pursePub: pi.pursePub, - internalOperationStatus: `0x${pi.status.toString(16)}`, - }); - }, - ); -} - -export async function iterRecordsForPeerPullDebit( - tx: GetReadOnlyAccess<{ - peerPullDebit: typeof WalletStoresV1.peerPullDebit; - }>, - filter: TransactionRecordFilter, - f: (r: PeerPullPaymentIncomingRecord) => Promise, -): Promise { - if (filter.onlyState === "nonfinal") { - const keyRange = GlobalIDB.KeyRange.bound( - OPERATION_STATUS_ACTIVE_FIRST, - OPERATION_STATUS_ACTIVE_LAST, - ); - await tx.peerPullDebit.indexes.byStatus.iter(keyRange).forEachAsync(f); - } else { - await tx.peerPullDebit.indexes.byStatus.iter().forEachAsync(f); - } -} - -async function gatherPeerPullDebitPending( - ws: InternalWalletState, - tx: GetReadOnlyAccess<{ - peerPullDebit: typeof WalletStoresV1.peerPullDebit; - operationRetries: typeof WalletStoresV1.operationRetries; - }>, - now: AbsoluteTime, - resp: PendingOperationsResponse, -): Promise { - await iterRecordsForPeerPullDebit( - tx, - { onlyState: "nonfinal" }, - async (pi) => { - const opId = TaskIdentifiers.forPeerPullPaymentDebit(pi); - const retryRecord = await tx.operationRetries.get(opId); - const timestampDue = - timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ?? - AbsoluteTime.now(); - switch (pi.status) { - case PeerPullDebitRecordStatus.DialogProposed: - return; - } - resp.pendingOperations.push({ - type: PendingTaskType.PeerPullDebit, - ...getPendingCommon(ws, opId, timestampDue), - givesLifeness: true, - retryInfo: retryRecord?.retryInfo, - peerPullDebitId: pi.peerPullDebitId, - internalOperationStatus: `0x${pi.status.toString(16)}`, - }); - }, - ); -} - -export async function iterRecordsForPeerPushInitiation( - tx: GetReadOnlyAccess<{ - peerPushDebit: typeof WalletStoresV1.peerPushDebit; - }>, - filter: TransactionRecordFilter, - f: (r: PeerPushDebitRecord) => Promise, -): Promise { - if (filter.onlyState === "nonfinal") { - const keyRange = GlobalIDB.KeyRange.bound( - OPERATION_STATUS_ACTIVE_FIRST, - OPERATION_STATUS_ACTIVE_LAST, - ); - await tx.peerPushDebit.indexes.byStatus.iter(keyRange).forEachAsync(f); - } else { - await tx.peerPushDebit.indexes.byStatus.iter().forEachAsync(f); - } -} - -async function gatherPeerPushInitiationPending( - ws: InternalWalletState, - tx: GetReadOnlyAccess<{ - peerPushDebit: typeof WalletStoresV1.peerPushDebit; - operationRetries: typeof WalletStoresV1.operationRetries; - }>, - now: AbsoluteTime, - resp: PendingOperationsResponse, -): Promise { - await iterRecordsForPeerPushInitiation( - tx, - { onlyState: "nonfinal" }, - async (pi) => { - const opId = TaskIdentifiers.forPeerPushPaymentInitiation(pi); - const retryRecord = await tx.operationRetries.get(opId); - const timestampDue = - timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ?? - AbsoluteTime.now(); - resp.pendingOperations.push({ - type: PendingTaskType.PeerPushDebit, - ...getPendingCommon(ws, opId, timestampDue), - givesLifeness: true, - retryInfo: retryRecord?.retryInfo, - pursePub: pi.pursePub, - }); - }, - ); -} - -export async function iterRecordsForPeerPushCredit( - tx: GetReadOnlyAccess<{ - peerPushCredit: typeof WalletStoresV1.peerPushCredit; - }>, - filter: TransactionRecordFilter, - f: (r: PeerPushPaymentIncomingRecord) => Promise, -): Promise { - if (filter.onlyState === "nonfinal") { - const keyRange = GlobalIDB.KeyRange.bound( - OPERATION_STATUS_ACTIVE_FIRST, - OPERATION_STATUS_ACTIVE_LAST, - ); - await tx.peerPushCredit.indexes.byStatus.iter(keyRange).forEachAsync(f); - } else { - await tx.peerPushCredit.indexes.byStatus.iter().forEachAsync(f); - } -} - -async function gatherPeerPushCreditPending( - ws: InternalWalletState, - tx: GetReadOnlyAccess<{ - peerPushCredit: typeof WalletStoresV1.peerPushCredit; - operationRetries: typeof WalletStoresV1.operationRetries; - }>, - now: AbsoluteTime, - resp: PendingOperationsResponse, -): Promise { - await iterRecordsForPeerPushCredit( - tx, - { onlyState: "nonfinal" }, - async (pi) => { - const opId = TaskIdentifiers.forPeerPushCredit(pi); - const retryRecord = await tx.operationRetries.get(opId); - const timestampDue = - timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ?? - AbsoluteTime.now(); - - /** - * kyc pending operation don't give lifeness - * since the user need to complete kyc procedure - */ - const userNeedToCompleteKYC = pi.kycUrl !== undefined; - - switch (pi.status) { - // Status is nonfinal but no processing needs to be done - case PeerPushCreditStatus.DialogProposed: - return; - default: - resp.pendingOperations.push({ - type: PendingTaskType.PeerPushCredit, - ...getPendingCommon(ws, opId, timestampDue), - givesLifeness: !userNeedToCompleteKYC, - retryInfo: retryRecord?.retryInfo, - peerPushCreditId: pi.peerPushCreditId, - }); - } - }, - ); -} - -const taskPrio: { [X in PendingTaskType]: number } = { - [PendingTaskType.Deposit]: 2, - [PendingTaskType.ExchangeUpdate]: 1, - [PendingTaskType.PeerPullCredit]: 2, - [PendingTaskType.PeerPullDebit]: 2, - [PendingTaskType.PeerPushCredit]: 2, - [PendingTaskType.Purchase]: 2, - [PendingTaskType.Recoup]: 3, - [PendingTaskType.RewardPickup]: 2, - [PendingTaskType.Refresh]: 3, - [PendingTaskType.Withdraw]: 3, - [PendingTaskType.ExchangeCheckRefresh]: 3, - [PendingTaskType.PeerPushDebit]: 2, - [PendingTaskType.Backup]: 4, -}; - -export async function getPendingOperations( - ws: InternalWalletState, -): Promise { - const now = AbsoluteTime.now(); - const resp = await ws.db - .mktx((x) => [ - x.backupProviders, - x.exchanges, - x.exchangeDetails, - x.refreshGroups, - x.coins, - x.withdrawalGroups, - x.rewards, - x.purchases, - x.planchets, - x.depositGroups, - x.recoupGroups, - x.operationRetries, - x.peerPullCredit, - x.peerPushDebit, - x.peerPullDebit, - x.peerPushCredit, - ]) - .runReadWrite(async (tx) => { - const resp: PendingOperationsResponse = { - pendingOperations: [], - }; - await gatherExchangePending(ws, tx, now, resp); - await gatherRefreshPending(ws, tx, now, resp); - await gatherWithdrawalPending(ws, tx, now, resp); - await gatherDepositPending(ws, tx, now, resp); - await gatherRewardPending(ws, tx, now, resp); - await gatherPurchasePending(ws, tx, now, resp); - await gatherRecoupPending(ws, tx, now, resp); - await gatherBackupPending(ws, tx, now, resp); - await gatherPeerPushInitiationPending(ws, tx, now, resp); - await gatherPeerPullInitiationPending(ws, tx, now, resp); - await gatherPeerPullDebitPending(ws, tx, now, resp); - await gatherPeerPushCreditPending(ws, tx, now, resp); - return resp; - }); - - resp.pendingOperations.sort((a, b) => { - let prioA = taskPrio[a.type]; - let prioB = taskPrio[b.type]; - return Math.sign(prioA - prioB); - }); - - return resp; -} diff --git a/packages/taler-wallet-core/src/operations/refresh.ts b/packages/taler-wallet-core/src/operations/refresh.ts index 5f7169dbd..b9ac12518 100644 --- a/packages/taler-wallet-core/src/operations/refresh.ts +++ b/packages/taler-wallet-core/src/operations/refresh.ts @@ -15,12 +15,12 @@ */ import { - AbsoluteTime, AgeCommitment, AgeRestriction, AmountJson, Amounts, amountToPretty, + CancellationToken, codecForExchangeMeltResponse, codecForExchangeRevealResponse, CoinPublicKeyString, @@ -29,8 +29,6 @@ import { DenominationInfo, DenomKeyType, Duration, - durationFromSpec, - durationMul, encodeCrock, ExchangeMeltRequest, ExchangeProtocolVersion, @@ -51,7 +49,6 @@ import { TalerErrorCode, TalerErrorDetail, TalerPreciseTimestamp, - TalerProtocolTimestamp, TransactionAction, TransactionMajorState, TransactionState, @@ -79,10 +76,11 @@ import { } from "../db.js"; import { getCandidateWithdrawalDenomsTx, + PendingTaskType, RefreshGroupPerExchangeInfo, RefreshSessionRecord, + TaskId, timestampPreciseToDb, - timestampProtocolFromDb, WalletDbReadWriteTransactionArr, } from "../index.js"; import { @@ -94,6 +92,7 @@ import { selectWithdrawalDenominations } from "../util/coinSelection.js"; import { checkDbInvariant } from "../util/invariants.js"; import { GetReadOnlyAccess, GetReadWriteAccess } from "../util/query.js"; import { + constructTaskIdentifier, makeCoinAvailable, makeCoinsVisible, TaskRunResult, @@ -111,6 +110,7 @@ const logger = new Logger("refresh.ts"); export class RefreshTransactionContext implements TransactionContext { public transactionId: string; + readonly taskId: TaskId; constructor( public ws: InternalWalletState, @@ -120,6 +120,10 @@ export class RefreshTransactionContext implements TransactionContext { tag: TransactionType.Refresh, refreshGroupId, }); + this.taskId = constructTaskIdentifier({ + tag: PendingTaskType.Refresh, + refreshGroupId, + }); } async deleteTransaction(): Promise { @@ -211,8 +215,8 @@ export class RefreshTransactionContext implements TransactionContext { } return undefined; }); - ws.workAvailable.trigger(); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(this.taskId); } async failTransaction(): Promise { @@ -250,8 +254,9 @@ export class RefreshTransactionContext implements TransactionContext { newTxState: computeRefreshTransactionState(dg), }; }); - ws.workAvailable.trigger(); + ws.taskScheduler.stopShepherdTask(this.taskId); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(this.taskId); } } @@ -1003,7 +1008,7 @@ async function refreshReveal( export async function processRefreshGroup( ws: InternalWalletState, refreshGroupId: string, - options: Record = {}, + cancellationToken: CancellationToken, ): Promise { logger.trace(`processing refresh group ${refreshGroupId}`); @@ -1053,7 +1058,7 @@ export async function processRefreshGroup( logger.warn(`exception: ${e}`); } if (inShutdown) { - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } if (errors.length > 0) { return { @@ -1068,7 +1073,7 @@ export async function processRefreshGroup( }; } - return TaskRunResult.pending(); + return TaskRunResult.backoff(); } async function processRefreshSession( @@ -1311,134 +1316,18 @@ export async function createRefreshGroup( logger.trace(`created refresh group ${refreshGroupId}`); + const ctx = new RefreshTransactionContext(ws, refreshGroupId); + + // Shepherd the task. + // If the current transaction fails to commit the refresh + // group to the DB, the shepherd will give up. + ws.taskScheduler.startShepherdTask(ctx.taskId); + return { refreshGroupId, }; } -/** - * Timestamp after which the wallet would do the next check for an auto-refresh. - */ -function getAutoRefreshCheckThreshold(d: DenominationRecord): AbsoluteTime { - const expireWithdraw = AbsoluteTime.fromProtocolTimestamp( - timestampProtocolFromDb(d.stampExpireWithdraw), - ); - const expireDeposit = AbsoluteTime.fromProtocolTimestamp( - timestampProtocolFromDb(d.stampExpireDeposit), - ); - const delta = AbsoluteTime.difference(expireWithdraw, expireDeposit); - const deltaDiv = durationMul(delta, 0.75); - return AbsoluteTime.addDuration(expireWithdraw, deltaDiv); -} - -/** - * Timestamp after which the wallet would do an auto-refresh. - */ -export function getAutoRefreshExecuteThreshold(d: { - stampExpireWithdraw: TalerProtocolTimestamp; - stampExpireDeposit: TalerProtocolTimestamp; -}): AbsoluteTime { - const expireWithdraw = AbsoluteTime.fromProtocolTimestamp( - d.stampExpireWithdraw, - ); - const expireDeposit = AbsoluteTime.fromProtocolTimestamp( - d.stampExpireDeposit, - ); - const delta = AbsoluteTime.difference(expireWithdraw, expireDeposit); - const deltaDiv = durationMul(delta, 0.5); - return AbsoluteTime.addDuration(expireWithdraw, deltaDiv); -} - -function getAutoRefreshExecuteThresholdForDenom( - d: DenominationRecord, -): AbsoluteTime { - return getAutoRefreshExecuteThreshold({ - stampExpireWithdraw: timestampProtocolFromDb(d.stampExpireWithdraw), - stampExpireDeposit: timestampProtocolFromDb(d.stampExpireDeposit), - }); -} - -export async function autoRefresh( - ws: InternalWalletState, - exchangeBaseUrl: string, -): Promise { - logger.trace(`doing auto-refresh check for '${exchangeBaseUrl}'`); - - // We must make sure that the exchange is up-to-date so that - // can refresh into new denominations. - await fetchFreshExchange(ws, exchangeBaseUrl); - - let minCheckThreshold = AbsoluteTime.addDuration( - AbsoluteTime.now(), - durationFromSpec({ days: 1 }), - ); - await ws.db - .mktx((x) => [ - x.coins, - x.denominations, - x.coinAvailability, - x.refreshGroups, - x.exchanges, - ]) - .runReadWrite(async (tx) => { - const exchange = await tx.exchanges.get(exchangeBaseUrl); - if (!exchange || !exchange.detailsPointer) { - return; - } - const coins = await tx.coins.indexes.byBaseUrl - .iter(exchangeBaseUrl) - .toArray(); - const refreshCoins: CoinRefreshRequest[] = []; - for (const coin of coins) { - if (coin.status !== CoinStatus.Fresh) { - continue; - } - const denom = await tx.denominations.get([ - exchangeBaseUrl, - coin.denomPubHash, - ]); - if (!denom) { - logger.warn("denomination not in database"); - continue; - } - const executeThreshold = getAutoRefreshExecuteThresholdForDenom(denom); - if (AbsoluteTime.isExpired(executeThreshold)) { - refreshCoins.push({ - coinPub: coin.coinPub, - amount: denom.value, - }); - } else { - const checkThreshold = getAutoRefreshCheckThreshold(denom); - minCheckThreshold = AbsoluteTime.min( - minCheckThreshold, - checkThreshold, - ); - } - } - if (refreshCoins.length > 0) { - const res = await createRefreshGroup( - ws, - tx, - exchange.detailsPointer?.currency, - refreshCoins, - RefreshReason.Scheduled, - undefined, - ); - logger.trace( - `created refresh group for auto-refresh (${res.refreshGroupId})`, - ); - } - logger.trace( - `next refresh check at ${AbsoluteTime.toIsoString(minCheckThreshold)}`, - ); - exchange.nextRefreshCheckStamp = timestampPreciseToDb( - AbsoluteTime.toPreciseTimestamp(minCheckThreshold), - ); - await tx.exchanges.put(exchange); - }); - return TaskRunResult.finished(); -} - export function computeRefreshTransactionState( rg: RefreshGroupRecord, ): TransactionState { diff --git a/packages/taler-wallet-core/src/operations/reward.ts b/packages/taler-wallet-core/src/operations/reward.ts index 6dcd48019..4d8653a9d 100644 --- a/packages/taler-wallet-core/src/operations/reward.ts +++ b/packages/taler-wallet-core/src/operations/reward.ts @@ -83,7 +83,6 @@ import { constructTransactionIdentifier, notifyTransition, parseTransactionIdentifier, - stopLongpolling, } from "./transactions.js"; import { PendingTaskType } from "../pending-types.js"; import { assertUnreachable } from "../util/assertUnreachable.js"; @@ -125,7 +124,6 @@ export class RewardTransactionContext implements TransactionContext { async suspendTransaction(): Promise { const { ws, walletRewardId, transactionId, retryTag } = this; - stopLongpolling(ws, retryTag); const transitionInfo = await ws.db .mktx((x) => [x.rewards]) .runReadWrite(async (tx) => { @@ -161,13 +159,11 @@ export class RewardTransactionContext implements TransactionContext { } return undefined; }); - ws.workAvailable.trigger(); notifyTransition(ws, transactionId, transitionInfo); } async abortTransaction(): Promise { const { ws, walletRewardId, transactionId, retryTag } = this; - stopLongpolling(ws, retryTag); const transitionInfo = await ws.db .mktx((x) => [x.rewards]) .runReadWrite(async (tx) => { @@ -207,7 +203,6 @@ export class RewardTransactionContext implements TransactionContext { async resumeTransaction(): Promise { const { ws, walletRewardId, transactionId, retryTag } = this; - stopLongpolling(ws, retryTag); const transitionInfo = await ws.db .mktx((x) => [x.rewards]) .runReadWrite(async (tx) => { @@ -247,7 +242,6 @@ export class RewardTransactionContext implements TransactionContext { async failTransaction(): Promise { const { ws, walletRewardId, transactionId, retryTag } = this; - stopLongpolling(ws, retryTag); const transitionInfo = await ws.db .mktx((x) => [x.rewards]) .runReadWrite(async (tx) => { diff --git a/packages/taler-wallet-core/src/operations/testing.ts b/packages/taler-wallet-core/src/operations/testing.ts index 4c2cfae2c..5902e8362 100644 --- a/packages/taler-wallet-core/src/operations/testing.ts +++ b/packages/taler-wallet-core/src/operations/testing.ts @@ -62,6 +62,7 @@ import { import { InternalWalletState } from "../internal-wallet-state.js"; import { checkLogicInvariant } from "../util/invariants.js"; import { getBalances } from "./balance.js"; +import { createDepositGroup } from "./deposits.js"; import { fetchFreshExchange } from "./exchanges.js"; import { confirmPay, @@ -78,10 +79,8 @@ import { preparePeerPushCredit, } from "./pay-peer-push-credit.js"; import { initiatePeerPushDebit } from "./pay-peer-push-debit.js"; -import { getPendingOperations } from "./pending.js"; import { getTransactionById, getTransactions } from "./transactions.js"; import { acceptWithdrawalFromUri } from "./withdraw.js"; -import { createDepositGroup } from "./deposits.js"; const logger = new Logger("operations/testing.ts"); @@ -521,44 +520,6 @@ export async function waitUntilGivenTransactionsFinal( logger.info("done waiting until given transactions are in a final state"); } -/** - * Wait until pending work is processed. - */ -export async function waitUntilTasksProcessed( - ws: InternalWalletState, -): Promise { - logger.info("waiting until pending work is processed"); - ws.ensureTaskLoopRunning(); - let p: OpenedPromise | undefined = undefined; - const cancelNotifs = ws.addNotificationListener((notif) => { - if (!p) { - return; - } - if (notif.type === NotificationType.PendingOperationProcessed) { - p.resolve(); - } - }); - while (1) { - p = openPromise(); - const pendingTasksResp = await getPendingOperations(ws); - logger.info(`waiting on pending ops: ${j2s(pendingTasksResp)}`); - let finished = true; - for (const task of pendingTasksResp.pendingOperations) { - if (task.isDue) { - finished = false; - } - logger.info(`continuing waiting for task ${task.id}`); - } - if (finished) { - break; - } - // Wait until task is done - await p.promise; - } - logger.info("done waiting until pending work is processed"); - cancelNotifs(); -} - export async function waitUntilRefreshesDone( ws: InternalWalletState, ): Promise { diff --git a/packages/taler-wallet-core/src/operations/transactions.ts b/packages/taler-wallet-core/src/operations/transactions.ts index 3b4e75427..10e018d23 100644 --- a/packages/taler-wallet-core/src/operations/transactions.ts +++ b/packages/taler-wallet-core/src/operations/transactions.ts @@ -17,6 +17,7 @@ /** * Imports. */ +import { GlobalIDB } from "@gnu-taler/idb-bridge"; import { AbsoluteTime, Amounts, @@ -69,18 +70,19 @@ import { } from "../db.js"; import { GetReadOnlyAccess, + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, PeerPushDebitStatus, timestampPreciseFromDb, timestampProtocolFromDb, WalletStoresV1, } from "../index.js"; import { InternalWalletState } from "../internal-wallet-state.js"; -import { PendingTaskType } from "../pending-types.js"; +import { PendingTaskType, TaskId } from "../pending-types.js"; import { assertUnreachable } from "../util/assertUnreachable.js"; import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js"; import { constructTaskIdentifier, - resetPendingTaskTimeout, TaskIdentifiers, TransactionContext, } from "./common.js"; @@ -122,18 +124,6 @@ import { computePeerPushDebitTransactionState, PeerPushDebitTransactionContext, } from "./pay-peer-push-debit.js"; -import { - iterRecordsForDeposit, - iterRecordsForPeerPullInitiation as iterRecordsForPeerPullCredit, - iterRecordsForPeerPullDebit, - iterRecordsForPeerPushCredit, - iterRecordsForPeerPushInitiation as iterRecordsForPeerPushDebit, - iterRecordsForPurchase, - iterRecordsForRefresh, - iterRecordsForRefund, - iterRecordsForReward, - iterRecordsForWithdrawal, -} from "./pending.js"; import { computeRefreshTransactionActions, computeRefreshTransactionState, @@ -159,25 +149,32 @@ function shouldSkipCurrency( exchangesInTransaction: string[], ): boolean { if (transactionsRequest?.scopeInfo) { - const sameCurrency = transactionsRequest.scopeInfo.currency.toLowerCase() === currency.toLowerCase() + const sameCurrency = + transactionsRequest.scopeInfo.currency.toLowerCase() === + currency.toLowerCase(); switch (transactionsRequest.scopeInfo.type) { case ScopeType.Global: { - return !sameCurrency + return !sameCurrency; } case ScopeType.Exchange: { - const exchangeInvolveInTransaction = exchangesInTransaction.indexOf(transactionsRequest.scopeInfo.url) !== -1 - return !sameCurrency || !exchangeInvolveInTransaction + const exchangeInvolveInTransaction = + exchangesInTransaction.indexOf(transactionsRequest.scopeInfo.url) !== + -1; + return !sameCurrency || !exchangeInvolveInTransaction; } case ScopeType.Auditor: { // same currency and same auditor - throw Error("filering balance in auditor scope is not implemented") + throw Error("filering balance in auditor scope is not implemented"); } - default: assertUnreachable(transactionsRequest.scopeInfo) + default: + assertUnreachable(transactionsRequest.scopeInfo); } } // FIXME: remove next release if (transactionsRequest?.currency) { - return transactionsRequest.currency.toLowerCase() !== currency.toLowerCase(); + return ( + transactionsRequest.currency.toLowerCase() !== currency.toLowerCase() + ); } return false; } @@ -565,7 +562,7 @@ function buildTransactionForPeerPullCredit( const silentWithdrawalErrorForInvoice = wsrOrt?.lastError && wsrOrt.lastError.code === - TalerErrorCode.WALLET_WITHDRAWAL_GROUP_INCOMPLETE && + TalerErrorCode.WALLET_WITHDRAWAL_GROUP_INCOMPLETE && Object.values(wsrOrt.lastError.errorsPerCoin ?? {}).every((e) => { return ( e.code === TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR && @@ -598,10 +595,10 @@ function buildTransactionForPeerPullCredit( kycUrl: pullCredit.kycUrl, ...(wsrOrt?.lastError ? { - error: silentWithdrawalErrorForInvoice - ? undefined - : wsrOrt.lastError, - } + error: silentWithdrawalErrorForInvoice + ? undefined + : wsrOrt.lastError, + } : {}), }; } @@ -1118,8 +1115,14 @@ export async function getTransactions( .runReadOnly(async (tx) => { await iterRecordsForPeerPushDebit(tx, filter, async (pi) => { const amount = Amounts.parseOrThrow(pi.amount); - const exchangesInTx = [pi.exchangeBaseUrl] - if (shouldSkipCurrency(transactionsRequest, amount.currency, exchangesInTx)) { + const exchangesInTx = [pi.exchangeBaseUrl]; + if ( + shouldSkipCurrency( + transactionsRequest, + amount.currency, + exchangesInTx, + ) + ) { return; } if (shouldSkipSearch(transactionsRequest, [])) { @@ -1134,8 +1137,14 @@ export async function getTransactions( await iterRecordsForPeerPullDebit(tx, filter, async (pi) => { const amount = Amounts.parseOrThrow(pi.amount); - const exchangesInTx = [pi.exchangeBaseUrl] - if (shouldSkipCurrency(transactionsRequest, amount.currency, exchangesInTx)) { + const exchangesInTx = [pi.exchangeBaseUrl]; + if ( + shouldSkipCurrency( + transactionsRequest, + amount.currency, + exchangesInTx, + ) + ) { return; } if (shouldSkipSearch(transactionsRequest, [])) { @@ -1169,8 +1178,10 @@ export async function getTransactions( // Legacy transaction return; } - const exchangesInTx = [pi.exchangeBaseUrl] - if (shouldSkipCurrency(transactionsRequest, pi.currency, exchangesInTx)) { + const exchangesInTx = [pi.exchangeBaseUrl]; + if ( + shouldSkipCurrency(transactionsRequest, pi.currency, exchangesInTx) + ) { return; } if (shouldSkipSearch(transactionsRequest, [])) { @@ -1208,7 +1219,7 @@ export async function getTransactions( await iterRecordsForPeerPullCredit(tx, filter, async (pi) => { const currency = Amounts.currencyOf(pi.amount); - const exchangesInTx = [pi.exchangeBaseUrl] + const exchangesInTx = [pi.exchangeBaseUrl]; if (shouldSkipCurrency(transactionsRequest, currency, exchangesInTx)) { return; } @@ -1243,16 +1254,16 @@ export async function getTransactions( await iterRecordsForRefund(tx, filter, async (refundGroup) => { const currency = Amounts.currencyOf(refundGroup.amountRaw); - const exchangesInTx: string[] = [] - const p = await tx.purchases.get(refundGroup.proposalId) + const exchangesInTx: string[] = []; + const p = await tx.purchases.get(refundGroup.proposalId); if (!p || !p.payInfo) return; //refund with no payment p.payInfo.payCoinSelection.coinPubs.forEach(async (cp) => { - const c = await tx.coins.get(cp) + const c = await tx.coins.get(cp); if (c?.exchangeBaseUrl) { - exchangesInTx.push(c.exchangeBaseUrl) + exchangesInTx.push(c.exchangeBaseUrl); } - }) + }); if (shouldSkipCurrency(transactionsRequest, currency, exchangesInTx)) { return; @@ -1265,8 +1276,12 @@ export async function getTransactions( }); await iterRecordsForRefresh(tx, filter, async (rg) => { - const exchangesInTx = rg.infoPerExchange ? Object.keys(rg.infoPerExchange) : [] - if (shouldSkipCurrency(transactionsRequest, rg.currency, exchangesInTx)) { + const exchangesInTx = rg.infoPerExchange + ? Object.keys(rg.infoPerExchange) + : []; + if ( + shouldSkipCurrency(transactionsRequest, rg.currency, exchangesInTx) + ) { return; } let required = false; @@ -1286,7 +1301,7 @@ export async function getTransactions( }); await iterRecordsForWithdrawal(tx, filter, async (wsr) => { - const exchangesInTx = [wsr.exchangeBaseUrl] + const exchangesInTx = [wsr.exchangeBaseUrl]; if ( shouldSkipCurrency( transactionsRequest, @@ -1343,8 +1358,16 @@ export async function getTransactions( await iterRecordsForDeposit(tx, filter, async (dg) => { const amount = Amounts.parseOrThrow(dg.amount); - const exchangesInTx = dg.infoPerExchange ? Object.keys(dg.infoPerExchange) : [] - if (shouldSkipCurrency(transactionsRequest, amount.currency, exchangesInTx)) { + const exchangesInTx = dg.infoPerExchange + ? Object.keys(dg.infoPerExchange) + : []; + if ( + shouldSkipCurrency( + transactionsRequest, + amount.currency, + exchangesInTx, + ) + ) { return; } const opId = TaskIdentifiers.forDeposit(dg); @@ -1362,15 +1385,21 @@ export async function getTransactions( return; } - const exchangesInTx: string[] = [] + const exchangesInTx: string[] = []; purchase.payInfo.payCoinSelection.coinPubs.forEach(async (cp) => { - const c = await tx.coins.get(cp) + const c = await tx.coins.get(cp); if (c?.exchangeBaseUrl) { - exchangesInTx.push(c.exchangeBaseUrl) + exchangesInTx.push(c.exchangeBaseUrl); } - }) + }); - if (shouldSkipCurrency(transactionsRequest, download.currency, exchangesInTx)) { + if ( + shouldSkipCurrency( + transactionsRequest, + download.currency, + exchangesInTx, + ) + ) { return; } const contractTermsRecord = await tx.contractTerms.get( @@ -1429,7 +1458,6 @@ export async function getTransactions( transactions.push(buildTransactionForTip(tipRecord, retryRecord)); }); //ends REMOVE REWARDS - }); // One-off checks, because of a bug where the wallet previously @@ -1587,25 +1615,7 @@ export function parseTransactionIdentifier( } } -export function stopLongpolling(ws: InternalWalletState, taskId: string) { - const longpoll = ws.activeLongpoll[taskId]; - if (longpoll) { - logger.info(`cancelling long-polling for ${taskId}`); - longpoll.cancel(); - delete ws.activeLongpoll[taskId]; - } -} - -/** - * Immediately retry the underlying operation - * of a transaction. - */ -export async function retryTransaction( - ws: InternalWalletState, - transactionId: string, -): Promise { - logger.info(`resetting retry timeout for ${transactionId}`); - +function maybeTaskFromTransaction(transactionId: string): TaskId | undefined { const parsedTx = parseTransactionIdentifier(transactionId); if (!parsedTx) { @@ -1615,100 +1625,80 @@ export async function retryTransaction( // FIXME: We currently don't cancel active long-polling tasks here. switch (parsedTx.tag) { - case TransactionType.PeerPullCredit: { - const taskId = constructTaskIdentifier({ + case TransactionType.PeerPullCredit: + return constructTaskIdentifier({ tag: PendingTaskType.PeerPullCredit, pursePub: parsedTx.pursePub, }); - await resetPendingTaskTimeout(ws, taskId); - stopLongpolling(ws, taskId); - break; - } - case TransactionType.Deposit: { - const taskId = constructTaskIdentifier({ + case TransactionType.Deposit: + return constructTaskIdentifier({ tag: PendingTaskType.Deposit, depositGroupId: parsedTx.depositGroupId, }); - await resetPendingTaskTimeout(ws, taskId); - stopLongpolling(ws, taskId); - break; - } case TransactionType.InternalWithdrawal: - case TransactionType.Withdrawal: { - // FIXME: Abort current long-poller! - const taskId = constructTaskIdentifier({ + case TransactionType.Withdrawal: + return constructTaskIdentifier({ tag: PendingTaskType.Withdraw, withdrawalGroupId: parsedTx.withdrawalGroupId, }); - await resetPendingTaskTimeout(ws, taskId); - stopLongpolling(ws, taskId); - break; - } - case TransactionType.Payment: { - const taskId = constructTaskIdentifier({ + case TransactionType.Payment: + return constructTaskIdentifier({ tag: PendingTaskType.Purchase, proposalId: parsedTx.proposalId, }); - await resetPendingTaskTimeout(ws, taskId); - stopLongpolling(ws, taskId); - break; - } - case TransactionType.Reward: { - const taskId = constructTaskIdentifier({ + case TransactionType.Reward: + return constructTaskIdentifier({ tag: PendingTaskType.RewardPickup, walletRewardId: parsedTx.walletRewardId, }); - await resetPendingTaskTimeout(ws, taskId); - stopLongpolling(ws, taskId); - break; - } - case TransactionType.Refresh: { - const taskId = constructTaskIdentifier({ + case TransactionType.Refresh: + return constructTaskIdentifier({ tag: PendingTaskType.Refresh, refreshGroupId: parsedTx.refreshGroupId, }); - await resetPendingTaskTimeout(ws, taskId); - stopLongpolling(ws, taskId); - break; - } - case TransactionType.PeerPullDebit: { - const taskId = constructTaskIdentifier({ + case TransactionType.PeerPullDebit: + return constructTaskIdentifier({ tag: PendingTaskType.PeerPullDebit, peerPullDebitId: parsedTx.peerPullDebitId, }); - await resetPendingTaskTimeout(ws, taskId); - stopLongpolling(ws, taskId); - break; - } - case TransactionType.PeerPushCredit: { - const taskId = constructTaskIdentifier({ + case TransactionType.PeerPushCredit: + return constructTaskIdentifier({ tag: PendingTaskType.PeerPushCredit, peerPushCreditId: parsedTx.peerPushCreditId, }); - await resetPendingTaskTimeout(ws, taskId); - stopLongpolling(ws, taskId); - break; - } - case TransactionType.PeerPushDebit: { - const taskId = constructTaskIdentifier({ + case TransactionType.PeerPushDebit: + return constructTaskIdentifier({ tag: PendingTaskType.PeerPushDebit, pursePub: parsedTx.pursePub, }); - await resetPendingTaskTimeout(ws, taskId); - stopLongpolling(ws, taskId); - break; - } case TransactionType.Refund: // Nothing to do for a refund transaction. - break; + return undefined; case TransactionType.Recoup: - // FIXME! - throw Error("not implemented"); + return constructTaskIdentifier({ + tag: PendingTaskType.Recoup, + recoupGroupId: parsedTx.recoupGroupId, + }); default: assertUnreachable(parsedTx); } } +/** + * Immediately retry the underlying operation + * of a transaction. + */ +export async function retryTransaction( + ws: InternalWalletState, + transactionId: string, +): Promise { + logger.info(`resetting retry timeout for ${transactionId}`); + const taskId = maybeTaskFromTransaction(transactionId); + if (taskId) { + ws.taskScheduler.resetTaskRetries(taskId); + } +} + async function getContextForTransaction( ws: InternalWalletState, transactionId: string, @@ -1828,5 +1818,203 @@ export function notifyTransition( experimentalUserData, }); } - ws.workAvailable.trigger(); +} + +/** + * Iterate refresh records based on a filter. + */ +async function iterRecordsForRefresh( + tx: GetReadOnlyAccess<{ + refreshGroups: typeof WalletStoresV1.refreshGroups; + }>, + filter: TransactionRecordFilter, + f: (r: RefreshGroupRecord) => Promise, +): Promise { + let refreshGroups: RefreshGroupRecord[]; + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + RefreshOperationStatus.Pending, + RefreshOperationStatus.Suspended, + ); + refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll(keyRange); + } else { + refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll(); + } + + for (const r of refreshGroups) { + await f(r); + } +} + +async function iterRecordsForWithdrawal( + tx: GetReadOnlyAccess<{ + withdrawalGroups: typeof WalletStoresV1.withdrawalGroups; + }>, + filter: TransactionRecordFilter, + f: (r: WithdrawalGroupRecord) => Promise, +): Promise { + let withdrawalGroupRecords: WithdrawalGroupRecord[]; + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, + ); + withdrawalGroupRecords = + await tx.withdrawalGroups.indexes.byStatus.getAll(keyRange); + } else { + withdrawalGroupRecords = + await tx.withdrawalGroups.indexes.byStatus.getAll(); + } + for (const wgr of withdrawalGroupRecords) { + await f(wgr); + } +} + +async function iterRecordsForDeposit( + tx: GetReadOnlyAccess<{ + depositGroups: typeof WalletStoresV1.depositGroups; + }>, + filter: TransactionRecordFilter, + f: (r: DepositGroupRecord) => Promise, +): Promise { + let dgs: DepositGroupRecord[]; + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, + ); + dgs = await tx.depositGroups.indexes.byStatus.getAll(keyRange); + } else { + dgs = await tx.depositGroups.indexes.byStatus.getAll(); + } + + for (const dg of dgs) { + await f(dg); + } +} + +async function iterRecordsForReward( + tx: GetReadOnlyAccess<{ + rewards: typeof WalletStoresV1.rewards; + }>, + filter: TransactionRecordFilter, + f: (r: RewardRecord) => Promise, +): Promise { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, + ); + await tx.rewards.indexes.byStatus.iter(keyRange).forEachAsync(f); + } else { + await tx.rewards.indexes.byStatus.iter().forEachAsync(f); + } +} + +async function iterRecordsForRefund( + tx: GetReadOnlyAccess<{ + refundGroups: typeof WalletStoresV1.refundGroups; + }>, + filter: TransactionRecordFilter, + f: (r: RefundGroupRecord) => Promise, +): Promise { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, + ); + await tx.refundGroups.indexes.byStatus.iter(keyRange).forEachAsync(f); + } else { + await tx.refundGroups.iter().forEachAsync(f); + } +} + +async function iterRecordsForPurchase( + tx: GetReadOnlyAccess<{ + purchases: typeof WalletStoresV1.purchases; + }>, + filter: TransactionRecordFilter, + f: (r: PurchaseRecord) => Promise, +): Promise { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, + ); + await tx.purchases.indexes.byStatus.iter(keyRange).forEachAsync(f); + } else { + await tx.purchases.indexes.byStatus.iter().forEachAsync(f); + } +} + +async function iterRecordsForPeerPullCredit( + tx: GetReadOnlyAccess<{ + peerPullCredit: typeof WalletStoresV1.peerPullCredit; + }>, + filter: TransactionRecordFilter, + f: (r: PeerPullCreditRecord) => Promise, +): Promise { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, + ); + await tx.peerPullCredit.indexes.byStatus.iter(keyRange).forEachAsync(f); + } else { + await tx.peerPullCredit.indexes.byStatus.iter().forEachAsync(f); + } +} + +async function iterRecordsForPeerPullDebit( + tx: GetReadOnlyAccess<{ + peerPullDebit: typeof WalletStoresV1.peerPullDebit; + }>, + filter: TransactionRecordFilter, + f: (r: PeerPullPaymentIncomingRecord) => Promise, +): Promise { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, + ); + await tx.peerPullDebit.indexes.byStatus.iter(keyRange).forEachAsync(f); + } else { + await tx.peerPullDebit.indexes.byStatus.iter().forEachAsync(f); + } +} + +async function iterRecordsForPeerPushDebit( + tx: GetReadOnlyAccess<{ + peerPushDebit: typeof WalletStoresV1.peerPushDebit; + }>, + filter: TransactionRecordFilter, + f: (r: PeerPushDebitRecord) => Promise, +): Promise { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, + ); + await tx.peerPushDebit.indexes.byStatus.iter(keyRange).forEachAsync(f); + } else { + await tx.peerPushDebit.indexes.byStatus.iter().forEachAsync(f); + } +} + +async function iterRecordsForPeerPushCredit( + tx: GetReadOnlyAccess<{ + peerPushCredit: typeof WalletStoresV1.peerPushCredit; + }>, + filter: TransactionRecordFilter, + f: (r: PeerPushPaymentIncomingRecord) => Promise, +): Promise { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, + ); + await tx.peerPushCredit.indexes.byStatus.iter(keyRange).forEachAsync(f); + } else { + await tx.peerPushCredit.indexes.byStatus.iter().forEachAsync(f); + } } diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts index 86f05478a..4a50e0775 100644 --- a/packages/taler-wallet-core/src/operations/withdraw.ts +++ b/packages/taler-wallet-core/src/operations/withdraw.ts @@ -1,6 +1,6 @@ /* This file is part of GNU Taler - (C) 2019-2021 Taler Systems SA + (C) 2019-2024 Taler Systems SA GNU Taler is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -52,6 +52,7 @@ import { TalerPreciseTimestamp, TalerProtocolTimestamp, TransactionAction, + TransactionIdStr, TransactionMajorState, TransactionMinorState, TransactionState, @@ -67,7 +68,6 @@ import { codecForCashinConversionResponse, codecForConversionBankConfig, codecForExchangeWithdrawBatchResponse, - codecForIntegrationBankConfig, codecForReserveStatus, codecForWalletKycUuid, codecForWithdrawOperationStatusResponse, @@ -103,7 +103,6 @@ import { import { isWithdrawableDenom, timestampPreciseToDb } from "../index.js"; import { InternalWalletState } from "../internal-wallet-state.js"; import { - TaskIdentifiers, TaskRunResult, TaskRunResultType, TombstoneTag, @@ -111,9 +110,8 @@ import { constructTaskIdentifier, makeCoinAvailable, makeCoinsVisible, - runLongpollAsync, } from "../operations/common.js"; -import { PendingTaskType } from "../pending-types.js"; +import { PendingTaskType, TaskId } from "../pending-types.js"; import { assertUnreachable } from "../util/assertUnreachable.js"; import { selectForcedWithdrawalDenominations, @@ -141,7 +139,6 @@ import { TransitionInfo, constructTransactionIdentifier, notifyTransition, - stopLongpolling, } from "./transactions.js"; /** @@ -150,8 +147,8 @@ import { const logger = new Logger("operations/withdraw.ts"); export class WithdrawTransactionContext implements TransactionContext { - public transactionId: string; - public retryTag: string; + readonly transactionId: TransactionIdStr; + readonly taskId: TaskId; constructor( public ws: InternalWalletState, @@ -161,7 +158,7 @@ export class WithdrawTransactionContext implements TransactionContext { tag: TransactionType.Withdrawal, withdrawalGroupId, }); - this.retryTag = constructTaskIdentifier({ + this.taskId = constructTaskIdentifier({ tag: PendingTaskType.Withdraw, withdrawalGroupId, }); @@ -185,8 +182,7 @@ export class WithdrawTransactionContext implements TransactionContext { } async suspendTransaction(): Promise { - const { ws, withdrawalGroupId, transactionId, retryTag } = this; - stopLongpolling(ws, retryTag); + const { ws, withdrawalGroupId, transactionId, taskId } = this; const transitionInfo = await ws.db .mktx((x) => [x.withdrawalGroups]) .runReadWrite(async (tx) => { @@ -235,13 +231,12 @@ export class WithdrawTransactionContext implements TransactionContext { } return undefined; }); - + ws.taskScheduler.stopShepherdTask(taskId); notifyTransition(ws, transactionId, transitionInfo); } async abortTransaction(): Promise { - const { ws, withdrawalGroupId, transactionId } = this; - stopLongpolling(ws, this.retryTag); + const { ws, withdrawalGroupId, transactionId, taskId } = this; const transitionInfo = await ws.db .mktx((x) => [x.withdrawalGroups]) .runReadWrite(async (tx) => { @@ -297,12 +292,13 @@ export class WithdrawTransactionContext implements TransactionContext { } return undefined; }); - ws.workAvailable.trigger(); + ws.taskScheduler.stopShepherdTask(taskId); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(taskId); } async resumeTransaction(): Promise { - const { ws, withdrawalGroupId, transactionId } = this; + const { ws, withdrawalGroupId, transactionId, taskId: retryTag } = this; const transitionInfo = await ws.db .mktx((x) => [x.withdrawalGroups]) .runReadWrite(async (tx) => { @@ -351,13 +347,12 @@ export class WithdrawTransactionContext implements TransactionContext { } return undefined; }); - ws.workAvailable.trigger(); notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(retryTag); } async failTransaction(): Promise { - const { ws, withdrawalGroupId, transactionId, retryTag } = this; - stopLongpolling(ws, retryTag); + const { ws, withdrawalGroupId, transactionId, taskId: retryTag } = this; const stateUpdate = await ws.db .mktx((x) => [x.withdrawalGroups]) .runReadWrite(async (tx) => { @@ -387,7 +382,9 @@ export class WithdrawTransactionContext implements TransactionContext { } return undefined; }); + ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, stateUpdate); + ws.taskScheduler.startShepherdTask(retryTag); } } @@ -744,10 +741,8 @@ async function transitionKycUrlUpdate( kycUrl: string, ): Promise { let notificationKycUrl: string | undefined = undefined; - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Withdrawal, - withdrawalGroupId, - }); + const ctx = new WithdrawTransactionContext(ws, withdrawalGroupId); + const transactionId = ctx.transactionId; const transitionInfo = await ws.db .mktx((x) => [x.planchets, x.withdrawalGroups]) @@ -782,7 +777,7 @@ async function transitionKycUrlUpdate( experimentalUserData: notificationKycUrl, }); } - ws.workAvailable.trigger(); + ws.taskScheduler.startShepherdTask(ctx.taskId); } async function handleKycRequired( @@ -1273,7 +1268,7 @@ async function queryReserve( ws: InternalWalletState, withdrawalGroupId: string, cancellationToken: CancellationToken, -): Promise<{ ready: boolean }> { +): Promise { const transactionId = constructTransactionIdentifier({ tag: TransactionType.Withdrawal, withdrawalGroupId, @@ -1283,7 +1278,7 @@ async function queryReserve( }); checkDbInvariant(!!withdrawalGroup); if (withdrawalGroup.status !== WithdrawalGroupStatus.PendingQueryingStatus) { - return { ready: true }; + return TaskRunResult.backoff(); } const reservePub = withdrawalGroup.reservePub; @@ -1312,7 +1307,7 @@ async function queryReserve( `got reserve status error, EC=${result.talerErrorResponse.code}`, ); if (resp.status === HttpStatusCode.NotFound) { - return { ready: false }; + return TaskRunResult.backoff(); } else { throwUnexpectedRequestError(resp, result.talerErrorResponse); } @@ -1341,13 +1336,7 @@ async function queryReserve( notifyTransition(ws, transactionId, transitionResult); - return { ready: true }; -} - -enum BankStatusResultCode { - Done = "done", - Waiting = "waiting", - Aborted = "aborted", + return TaskRunResult.backoff(); } /** @@ -1452,6 +1441,7 @@ async function transitionKycSatisfied( async function processWithdrawalGroupPendingKyc( ws: InternalWalletState, withdrawalGroup: WithdrawalGroupRecord, + cancellationToken: CancellationToken, ): Promise { const userType = "individual"; const kycInfo = withdrawalGroup.kycPending; @@ -1467,45 +1457,35 @@ async function processWithdrawalGroupPendingKyc( const withdrawalGroupId = withdrawalGroup.withdrawalGroupId; - const retryTag = TaskIdentifiers.forWithdrawal(withdrawalGroup); - runLongpollAsync(ws, retryTag, async (cancellationToken) => { - logger.info(`long-polling for withdrawal KYC status via ${url.href}`); - const kycStatusRes = await ws.http.fetch(url.href, { - method: "GET", - cancellationToken, - }); - logger.info( - `kyc long-polling response status: HTTP ${kycStatusRes.status}`, - ); - if ( - kycStatusRes.status === HttpStatusCode.Ok || - //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge - // remove after the exchange is fixed or clarified - kycStatusRes.status === HttpStatusCode.NoContent - ) { - await transitionKycSatisfied(ws, withdrawalGroup); - return { ready: true }; - } else if (kycStatusRes.status === HttpStatusCode.Accepted) { - const kycStatus = await kycStatusRes.json(); - logger.info(`kyc status: ${j2s(kycStatus)}`); - const kycUrl = kycStatus.kyc_url; - if (typeof kycUrl === "string") { - await transitionKycUrlUpdate(ws, withdrawalGroupId, kycUrl); - } - return { ready: false }; - } else if ( - kycStatusRes.status === HttpStatusCode.UnavailableForLegalReasons - ) { - const kycStatus = await kycStatusRes.json(); - logger.info(`aml status: ${j2s(kycStatus)}`); - return { ready: false }; - } else { - throw Error( - `unexpected response from kyc-check (${kycStatusRes.status})`, - ); - } + logger.info(`long-polling for withdrawal KYC status via ${url.href}`); + const kycStatusRes = await ws.http.fetch(url.href, { + method: "GET", + cancellationToken, }); - return TaskRunResult.longpoll(); + logger.info(`kyc long-polling response status: HTTP ${kycStatusRes.status}`); + if ( + kycStatusRes.status === HttpStatusCode.Ok || + //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge + // remove after the exchange is fixed or clarified + kycStatusRes.status === HttpStatusCode.NoContent + ) { + await transitionKycSatisfied(ws, withdrawalGroup); + } else if (kycStatusRes.status === HttpStatusCode.Accepted) { + const kycStatus = await kycStatusRes.json(); + logger.info(`kyc status: ${j2s(kycStatus)}`); + const kycUrl = kycStatus.kyc_url; + if (typeof kycUrl === "string") { + await transitionKycUrlUpdate(ws, withdrawalGroupId, kycUrl); + } + } else if ( + kycStatusRes.status === HttpStatusCode.UnavailableForLegalReasons + ) { + const kycStatus = await kycStatusRes.json(); + logger.info(`aml status: ${j2s(kycStatus)}`); + } else { + throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`); + } + return TaskRunResult.backoff(); } async function processWithdrawalGroupPendingReady( @@ -1666,12 +1646,13 @@ async function processWithdrawalGroupPendingReady( }; } - return TaskRunResult.finished(); + return TaskRunResult.backoff(); } export async function processWithdrawalGroup( ws: InternalWalletState, withdrawalGroupId: string, + cancellationToken: CancellationToken, ): Promise { logger.trace("processing withdrawal group", withdrawalGroupId); const withdrawalGroup = await ws.db @@ -1684,54 +1665,30 @@ export async function processWithdrawalGroup( throw Error(`withdrawal group ${withdrawalGroupId} not found`); } - const retryTag = TaskIdentifiers.forWithdrawal(withdrawalGroup); - - // We're already running! - if (ws.activeLongpoll[retryTag]) { - logger.info("withdrawal group already in long-polling, returning!"); - return { - type: TaskRunResultType.Longpoll, - }; - } - switch (withdrawalGroup.status) { case WithdrawalGroupStatus.PendingRegisteringBank: await processReserveBankStatus(ws, withdrawalGroupId); // FIXME: This will get called by the main task loop, why call it here?! - return await processWithdrawalGroup(ws, withdrawalGroupId); - case WithdrawalGroupStatus.PendingQueryingStatus: { - runLongpollAsync(ws, retryTag, (ct) => { - return queryReserve(ws, withdrawalGroupId, ct); - }); - logger.trace( - "returning early from withdrawal for long-polling in background", + return await processWithdrawalGroup( + ws, + withdrawalGroupId, + cancellationToken, ); - return { - type: TaskRunResultType.Longpoll, - }; + case WithdrawalGroupStatus.PendingQueryingStatus: { + return queryReserve(ws, withdrawalGroupId, cancellationToken); } case WithdrawalGroupStatus.PendingWaitConfirmBank: { - const res = await processReserveBankStatus(ws, withdrawalGroupId); - switch (res.status) { - case BankStatusResultCode.Aborted: - case BankStatusResultCode.Done: - return TaskRunResult.finished(); - case BankStatusResultCode.Waiting: { - return TaskRunResult.pending(); - } - } - break; - } - case WithdrawalGroupStatus.Done: - case WithdrawalGroupStatus.FailedBankAborted: { - // FIXME - return TaskRunResult.pending(); + return await processReserveBankStatus(ws, withdrawalGroupId); } case WithdrawalGroupStatus.PendingAml: // FIXME: Handle this case, withdrawal doesn't support AML yet. - return TaskRunResult.pending(); + return TaskRunResult.backoff(); case WithdrawalGroupStatus.PendingKyc: - return processWithdrawalGroupPendingKyc(ws, withdrawalGroup); + return processWithdrawalGroupPendingKyc( + ws, + withdrawalGroup, + cancellationToken, + ); case WithdrawalGroupStatus.PendingReady: // Continue with the actual withdrawal! return await processWithdrawalGroupPendingReady(ws, withdrawalGroup); @@ -1747,6 +1704,8 @@ export async function processWithdrawalGroup( case WithdrawalGroupStatus.SuspendedReady: case WithdrawalGroupStatus.SuspendedRegisteringBank: case WithdrawalGroupStatus.SuspendedWaitConfirmBank: + case WithdrawalGroupStatus.Done: + case WithdrawalGroupStatus.FailedBankAborted: // Nothing to do. return TaskRunResult.finished(); default: @@ -2168,14 +2127,10 @@ async function registerReserveWithBank( notifyTransition(ws, transactionId, transitionInfo); } -interface BankStatusResult { - status: BankStatusResultCode; -} - async function processReserveBankStatus( ws: InternalWalletState, withdrawalGroupId: string, -): Promise { +): Promise { const withdrawalGroup = await getWithdrawalGroupRecordTx(ws.db, { withdrawalGroupId, }); @@ -2188,9 +2143,7 @@ async function processReserveBankStatus( case WithdrawalGroupStatus.PendingRegisteringBank: break; default: - return { - status: BankStatusResultCode.Done, - }; + return TaskRunResult.backoff(); } if ( @@ -2200,9 +2153,7 @@ async function processReserveBankStatus( } const bankInfo = withdrawalGroup.wgInfo.bankInfo; if (!bankInfo) { - return { - status: BankStatusResultCode.Done, - }; + return TaskRunResult.backoff(); } const bankStatusUrl = getBankStatusUrl(bankInfo.talerWithdrawUri); @@ -2246,9 +2197,7 @@ async function processReserveBankStatus( }; }); notifyTransition(ws, transactionId, transitionInfo); - return { - status: BankStatusResultCode.Aborted, - }; + return TaskRunResult.finished(); } // Bank still needs to know our reserve info @@ -2302,15 +2251,7 @@ async function processReserveBankStatus( notifyTransition(ws, transactionId, transitionInfo); - if (status.transfer_done) { - return { - status: BankStatusResultCode.Done, - }; - } else { - return { - status: BankStatusResultCode.Waiting, - }; - } + return TaskRunResult.backoff(); } export interface PrepareCreateWithdrawalGroupResult { @@ -2492,6 +2433,13 @@ export async function internalPerformCreateWithdrawalGroup( prep.withdrawalGroup.exchangeBaseUrl, ); + const ctx = new WithdrawTransactionContext( + ws, + withdrawalGroup.withdrawalGroupId, + ); + + ws.taskScheduler.startShepherdTask(ctx.taskId); + return { withdrawalGroup, transitionInfo, @@ -2619,10 +2567,10 @@ export async function acceptWithdrawalFromUri( }); const withdrawalGroupId = withdrawalGroup.withdrawalGroupId; - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Withdrawal, - withdrawalGroupId, - }); + + const ctx = new WithdrawTransactionContext(ws, withdrawalGroupId); + + const transactionId = ctx.transactionId; // We do this here, as the reserve should be registered before we return, // so that we can redirect the user to the bank's status page. @@ -2639,7 +2587,7 @@ export async function acceptWithdrawalFromUri( ); } - ws.workAvailable.trigger(); + ws.taskScheduler.startShepherdTask(ctx.taskId); return { reservePub: withdrawalGroup.reservePub, @@ -2795,10 +2743,9 @@ export async function createManualWithdrawal( }); const withdrawalGroupId = withdrawalGroup.withdrawalGroupId; - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.Withdrawal, - withdrawalGroupId, - }); + const ctx = new WithdrawTransactionContext(ws, withdrawalGroupId); + + const transactionId = ctx.transactionId; const exchangePaytoUris = await ws.db .mktx((x) => [x.withdrawalGroups, x.exchanges, x.exchangeDetails]) @@ -2806,7 +2753,7 @@ export async function createManualWithdrawal( return await getFundingPaytoUris(tx, withdrawalGroup.withdrawalGroupId); }); - ws.workAvailable.trigger(); + ws.taskScheduler.startShepherdTask(ctx.taskId); return { reservePub: withdrawalGroup.reservePub, diff --git a/packages/taler-wallet-core/src/pending-types.ts b/packages/taler-wallet-core/src/pending-types.ts index f8406033a..4dfad9389 100644 --- a/packages/taler-wallet-core/src/pending-types.ts +++ b/packages/taler-wallet-core/src/pending-types.ts @@ -29,7 +29,6 @@ import { DbRetryInfo } from "./operations/common.js"; export enum PendingTaskType { ExchangeUpdate = "exchange-update", - ExchangeCheckRefresh = "exchange-check-refresh", Purchase = "purchase", Refresh = "refresh", Recoup = "recoup", @@ -49,7 +48,6 @@ export enum PendingTaskType { export type PendingTaskInfo = PendingTaskInfoCommon & ( | PendingExchangeUpdateTask - | PendingExchangeCheckRefreshTask | PendingPurchaseTask | PendingRefreshTask | PendingTipPickupTask @@ -109,14 +107,6 @@ export interface PendingPeerPushCreditTask { peerPushCreditId: string; } -/** - * The wallet should check whether coins from this exchange - * need to be auto-refreshed. - */ -export interface PendingExchangeCheckRefreshTask { - type: PendingTaskType.ExchangeCheckRefresh; - exchangeBaseUrl: string; -} export enum ReserveType { /** diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts new file mode 100644 index 000000000..d1648acc7 --- /dev/null +++ b/packages/taler-wallet-core/src/shepherd.ts @@ -0,0 +1,851 @@ +/* + This file is part of GNU Taler + (C) 2024 Taler Systems SA + + GNU Taler is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + GNU Taler; see the file COPYING. If not, see + */ + +/** + * Imports. + */ +import { GlobalIDB } from "@gnu-taler/idb-bridge"; +import { + AbsoluteTime, + CancellationToken, + Duration, + Logger, + NotificationType, + RetryLoopOpts, + TalerError, + TalerErrorCode, + TalerErrorDetail, + TaskThrottler, + TransactionIdStr, + TransactionType, + WalletNotification, + assertUnreachable, + j2s, + makeErrorDetail, +} from "@gnu-taler/taler-util"; +import { CryptoApiStoppedError } from "./crypto/workers/crypto-dispatcher.js"; +import { + GetReadOnlyAccess, + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, + WalletStoresV1, + timestampAbsoluteFromDb, +} from "./index.js"; +import { InternalWalletState } from "./internal-wallet-state.js"; +import { processBackupForProvider } from "./operations/backup/index.js"; +import { + DbRetryInfo, + TaskRunResult, + TaskRunResultType, + constructTaskIdentifier, + getExchangeState, + parseTaskIdentifier, +} from "./operations/common.js"; +import { processDepositGroup } from "./operations/deposits.js"; +import { updateExchangeFromUrlHandler } from "./operations/exchanges.js"; +import { processPurchase } from "./operations/pay-merchant.js"; +import { processPeerPullCredit } from "./operations/pay-peer-pull-credit.js"; +import { processPeerPullDebit } from "./operations/pay-peer-pull-debit.js"; +import { processPeerPushCredit } from "./operations/pay-peer-push-credit.js"; +import { processPeerPushDebit } from "./operations/pay-peer-push-debit.js"; +import { processRecoupGroup } from "./operations/recoup.js"; +import { processRefreshGroup } from "./operations/refresh.js"; +import { constructTransactionIdentifier } from "./operations/transactions.js"; +import { processWithdrawalGroup } from "./operations/withdraw.js"; +import { PendingTaskType, TaskId } from "./pending-types.js"; +import { AsyncCondition } from "./util/promiseUtils.js"; + +const logger = new Logger("shepherd.ts"); + +/** + * Info about one task being shepherded. + */ +interface ShepherdInfo { + cts: CancellationToken.Source; +} + +export class TaskScheduler { + private sheps: Map = new Map(); + + private iterCond = new AsyncCondition(); + + private throttler = new TaskThrottler(); + + constructor(private ws: InternalWalletState) {} + + async loadTasksFromDb(): Promise { + const activeTasks = await getActiveTaskIds(this.ws); + + for (const tid of activeTasks.taskIds) { + this.startShepherdTask(tid); + } + } + + async run(opts: RetryLoopOpts = {}): Promise { + logger.info("Running task loop."); + this.ws.isTaskLoopRunning = true; + await this.loadTasksFromDb(); + while (true) { + if (opts.stopWhenDone && this.sheps.size === 0) { + logger.info("Breaking out of task loop (no more work)."); + break; + } + if (this.ws.stopped) { + logger.info("Breaking out of task loop (wallet stopped)."); + break; + } + await this.iterCond.wait(); + } + this.ws.isTaskLoopRunning = false; + logger.info("Done with task loop."); + } + + startShepherdTask(taskId: TaskId): void { + // Run in the background, no await! + this.internalStartShepherdTask(taskId); + } + + /** + * Stop and re-load all existing tasks. + * + * Mostly useful to interrupt all waits when time-travelling. + */ + reload() { + const tasksIds = [...this.sheps.keys()]; + logger.info(`reloading sheperd with ${tasksIds.length} tasks`); + for (const taskId of tasksIds) { + this.stopShepherdTask(taskId); + } + for (const taskId of tasksIds) { + this.startShepherdTask(taskId); + } + } + + private async internalStartShepherdTask(taskId: TaskId): Promise { + logger.trace(`Starting to shepherd task ${taskId}`); + const oldShep = this.sheps.get(taskId); + if (oldShep) { + logger.trace(`Already have a shepherd for ${taskId}`); + return; + } + logger.trace(`Creating new shepherd for ${taskId}`); + const newShep: ShepherdInfo = { + cts: CancellationToken.create(), + }; + this.sheps.set(taskId, newShep); + try { + await this.internalShepherdTask(taskId, newShep); + } finally { + logger.trace(`Done shepherding ${taskId}`); + this.sheps.delete(taskId); + this.iterCond.trigger(); + } + } + + stopShepherdTask(taskId: TaskId): void { + logger.trace(`Stopping shepherding of ${taskId}`); + const oldShep = this.sheps.get(taskId); + if (oldShep) { + logger.trace(`Cancelling old shepherd for ${taskId}`); + oldShep.cts.cancel(); + this.sheps.delete(taskId); + this.iterCond.trigger(); + } + } + + restartShepherdTask(taskId: TaskId): void { + this.stopShepherdTask(taskId); + this.startShepherdTask(taskId); + } + + async resetTaskRetries(taskId: TaskId): Promise { + const maybeNotification = await this.ws.db + .mktxAll() + .runReadWrite(async (tx) => { + await tx.operationRetries.delete(taskId); + return taskToRetryNotification(this.ws, tx, taskId, undefined); + }); + this.stopShepherdTask(taskId); + if (maybeNotification) { + this.ws.notify(maybeNotification); + } + this.startShepherdTask(taskId); + } + + private async internalShepherdTask( + taskId: TaskId, + info: ShepherdInfo, + ): Promise { + while (true) { + if (this.ws.stopped) { + logger.trace(`Shepherd for ${taskId} stopping as wallet is stopped`); + return; + } + if (info.cts.token.isCancelled) { + logger.trace(`Shepherd for ${taskId} got cancelled`); + return; + } + const isThrottled = this.throttler.applyThrottle(taskId); + if (isThrottled) { + logger.warn( + `task ${taskId} throttled, this is very likely a bug in wallet-core, please report`, + ); + logger.warn("waiting for 60 seconds"); + await this.ws.timerGroup.resolveAfter( + Duration.fromSpec({ seconds: 60 }), + ); + } + logger.trace(`Shepherd for ${taskId} will call handler`); + // FIXME: This should already return the retry record. + const res = await runTaskWithErrorReporting(this.ws, taskId, async () => { + return await callOperationHandlerForTaskId( + this.ws, + taskId, + info.cts.token, + ); + }); + const retryRecord = await this.ws.db.runReadOnlyTx( + ["operationRetries"], + async (tx) => { + return tx.operationRetries.get(taskId); + }, + ); + switch (res.type) { + case TaskRunResultType.Error: { + logger.trace(`Shepherd for ${taskId} got error result.`); + if (retryRecord) { + let delay: Duration; + const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry); + delay = AbsoluteTime.remaining(t); + logger.trace(`Waiting for ${delay.d_ms} ms`); + try { + await info.cts.token.racePromise( + this.ws.timerGroup.resolveAfter(delay), + ); + } catch (e) { + logger.info(`waiting for ${taskId} interrupted`); + } + } else { + logger.trace("Retrying immediately."); + } + break; + } + case TaskRunResultType.Backoff: { + logger.trace(`Shepherd for ${taskId} got backoff result.`); + if (retryRecord) { + let delay: Duration; + const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry); + delay = AbsoluteTime.remaining(t); + logger.trace(`Waiting for ${delay.d_ms} ms`); + try { + await info.cts.token.racePromise( + this.ws.timerGroup.resolveAfter(delay), + ); + } catch (e) { + logger.info(`waiting for ${taskId} interrupted`); + } + } else { + logger.trace("Retrying immediately."); + } + break; + } + case TaskRunResultType.Progress: { + logger.trace( + `Shepherd for ${taskId} got progress result, re-running immediately.`, + ); + break; + } + case TaskRunResultType.ScheduleLater: + logger.trace(`Shepherd for ${taskId} got schedule-later result.`); + const delay = AbsoluteTime.remaining(res.runAt); + logger.trace(`Waiting for ${delay.d_ms} ms`); + try { + await info.cts.token.racePromise( + this.ws.timerGroup.resolveAfter(delay), + ); + } catch (e) { + logger.info(`waiting for ${taskId} interrupted`); + } + break; + case TaskRunResultType.Finished: + logger.trace(`Shepherd for ${taskId} got finished result.`); + return; + default: + assertUnreachable(res); + } + } + } +} + +async function storePendingTaskError( + ws: InternalWalletState, + pendingTaskId: string, + e: TalerErrorDetail, +): Promise { + logger.info(`storing pending task error for ${pendingTaskId}`); + const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => { + let retryRecord = await tx.operationRetries.get(pendingTaskId); + if (!retryRecord) { + retryRecord = { + id: pendingTaskId, + lastError: e, + retryInfo: DbRetryInfo.reset(), + }; + } else { + retryRecord.lastError = e; + retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo); + } + await tx.operationRetries.put(retryRecord); + return taskToRetryNotification(ws, tx, pendingTaskId, e); + }); + if (maybeNotification) { + ws.notify(maybeNotification); + } +} + +/** + * Task made progress, clear error. + */ +async function storeTaskProgress( + ws: InternalWalletState, + pendingTaskId: string, +): Promise { + await ws.db.mktxAll().runReadWrite(async (tx) => { + await tx.operationRetries.delete(pendingTaskId); + }); +} + +async function storePendingTaskPending( + ws: InternalWalletState, + pendingTaskId: string, +): Promise { + const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => { + let retryRecord = await tx.operationRetries.get(pendingTaskId); + let hadError = false; + if (!retryRecord) { + retryRecord = { + id: pendingTaskId, + retryInfo: DbRetryInfo.reset(), + }; + } else { + if (retryRecord.lastError) { + hadError = true; + } + delete retryRecord.lastError; + retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo); + } + await tx.operationRetries.put(retryRecord); + if (hadError) { + return taskToRetryNotification(ws, tx, pendingTaskId, undefined); + } else { + return undefined; + } + }); + if (maybeNotification) { + ws.notify(maybeNotification); + } +} + +async function storePendingTaskFinished( + ws: InternalWalletState, + pendingTaskId: string, +): Promise { + await ws.db + .mktx((x) => [x.operationRetries]) + .runReadWrite(async (tx) => { + await tx.operationRetries.delete(pendingTaskId); + }); +} + +async function runTaskWithErrorReporting( + ws: InternalWalletState, + opId: TaskId, + f: () => Promise, +): Promise { + let maybeError: TalerErrorDetail | undefined; + try { + const resp = await f(); + switch (resp.type) { + case TaskRunResultType.Error: + await storePendingTaskError(ws, opId, resp.errorDetail); + return resp; + case TaskRunResultType.Finished: + await storePendingTaskFinished(ws, opId); + return resp; + case TaskRunResultType.Backoff: + await storePendingTaskPending(ws, opId); + return resp; + case TaskRunResultType.ScheduleLater: + // Task succeeded but wants to be run again. + await storeTaskProgress(ws, opId); + return resp; + case TaskRunResultType.Progress: + await storeTaskProgress(ws, opId); + return resp; + } + } catch (e) { + if (e instanceof CryptoApiStoppedError) { + if (ws.stopped) { + logger.warn("crypto API stopped during shutdown, ignoring error"); + return { + type: TaskRunResultType.Error, + errorDetail: makeErrorDetail( + TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, + {}, + "Crypto API stopped during shutdown", + ), + }; + } + } + if (e instanceof TalerError) { + logger.warn("operation processed resulted in error"); + logger.warn(`error was: ${j2s(e.errorDetail)}`); + maybeError = e.errorDetail; + await storePendingTaskError(ws, opId, maybeError!); + return { + type: TaskRunResultType.Error, + errorDetail: e.errorDetail, + }; + } else if (e instanceof Error) { + // This is a bug, as we expect pending operations to always + // do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED + // or return something. + logger.error(`Uncaught exception: ${e.message}`); + logger.error(`Stack: ${e.stack}`); + maybeError = makeErrorDetail( + TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, + { + stack: e.stack, + }, + `unexpected exception (message: ${e.message})`, + ); + await storePendingTaskError(ws, opId, maybeError); + return { + type: TaskRunResultType.Error, + errorDetail: maybeError, + }; + } else { + logger.error("Uncaught exception, value is not even an error."); + maybeError = makeErrorDetail( + TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, + {}, + `unexpected exception (not even an error)`, + ); + await storePendingTaskError(ws, opId, maybeError); + return { + type: TaskRunResultType.Error, + errorDetail: maybeError, + }; + } + } +} + +async function callOperationHandlerForTaskId( + ws: InternalWalletState, + taskId: TaskId, + cancellationToken: CancellationToken, +): Promise { + const pending = parseTaskIdentifier(taskId); + switch (pending.tag) { + case PendingTaskType.ExchangeUpdate: + return await updateExchangeFromUrlHandler( + ws, + pending.exchangeBaseUrl, + cancellationToken, + ); + case PendingTaskType.Refresh: + return await processRefreshGroup( + ws, + pending.refreshGroupId, + cancellationToken, + ); + case PendingTaskType.Withdraw: + return await processWithdrawalGroup( + ws, + pending.withdrawalGroupId, + cancellationToken, + ); + case PendingTaskType.Purchase: + return await processPurchase(ws, pending.proposalId); + case PendingTaskType.Recoup: + return await processRecoupGroup(ws, pending.recoupGroupId); + case PendingTaskType.Deposit: + return await processDepositGroup( + ws, + pending.depositGroupId, + cancellationToken, + ); + case PendingTaskType.Backup: + return await processBackupForProvider(ws, pending.backupProviderBaseUrl); + case PendingTaskType.PeerPushDebit: + return await processPeerPushDebit( + ws, + pending.pursePub, + cancellationToken, + ); + case PendingTaskType.PeerPullCredit: + return await processPeerPullCredit( + ws, + pending.pursePub, + cancellationToken, + ); + case PendingTaskType.PeerPullDebit: + return await processPeerPullDebit(ws, pending.peerPullDebitId); + case PendingTaskType.PeerPushCredit: + return await processPeerPushCredit( + ws, + pending.peerPushCreditId, + cancellationToken, + ); + case PendingTaskType.RewardPickup: + throw Error("not supported anymore"); + default: + return assertUnreachable(pending); + } + throw Error(`not reached ${pending.tag}`); +} + +/** + * Generate an appropriate error transition notification + * for applicable tasks. + * + * Namely, transition notifications are generated for: + * - exchange update errors + * - transactions + */ +async function taskToRetryNotification( + ws: InternalWalletState, + tx: GetReadOnlyAccess, + pendingTaskId: string, + e: TalerErrorDetail | undefined, +): Promise { + const parsedTaskId = parseTaskIdentifier(pendingTaskId); + + switch (parsedTaskId.tag) { + case PendingTaskType.ExchangeUpdate: + return makeExchangeRetryNotification(ws, tx, pendingTaskId, e); + case PendingTaskType.PeerPullCredit: + case PendingTaskType.PeerPullDebit: + case PendingTaskType.Withdraw: + case PendingTaskType.PeerPushCredit: + case PendingTaskType.Deposit: + case PendingTaskType.Refresh: + case PendingTaskType.RewardPickup: + case PendingTaskType.PeerPushDebit: + case PendingTaskType.Purchase: + return makeTransactionRetryNotification(ws, tx, pendingTaskId, e); + case PendingTaskType.Backup: + case PendingTaskType.Recoup: + return undefined; + } +} + +async function makeTransactionRetryNotification( + ws: InternalWalletState, + tx: GetReadOnlyAccess, + pendingTaskId: string, + e: TalerErrorDetail | undefined, +): Promise { + const txId = convertTaskToTransactionId(pendingTaskId); + if (!txId) { + return undefined; + } + const txState = await ws.getTransactionState(ws, tx, txId); + if (!txState) { + return undefined; + } + const notif: WalletNotification = { + type: NotificationType.TransactionStateTransition, + transactionId: txId, + oldTxState: txState, + newTxState: txState, + }; + if (e) { + notif.errorInfo = { + code: e.code as number, + hint: e.hint, + }; + } + return notif; +} + +async function makeExchangeRetryNotification( + ws: InternalWalletState, + tx: GetReadOnlyAccess, + pendingTaskId: string, + e: TalerErrorDetail | undefined, +): Promise { + logger.info("making exchange retry notification"); + const parsedTaskId = parseTaskIdentifier(pendingTaskId); + if (parsedTaskId.tag !== PendingTaskType.ExchangeUpdate) { + throw Error("invalid task identifier"); + } + const rec = await tx.exchanges.get(parsedTaskId.exchangeBaseUrl); + + if (!rec) { + logger.info(`exchange ${parsedTaskId.exchangeBaseUrl} not found`); + return undefined; + } + + const notif: WalletNotification = { + type: NotificationType.ExchangeStateTransition, + exchangeBaseUrl: parsedTaskId.exchangeBaseUrl, + oldExchangeState: getExchangeState(rec), + newExchangeState: getExchangeState(rec), + }; + if (e) { + notif.errorInfo = { + code: e.code as number, + hint: e.hint, + }; + } + return notif; +} + +/** + * Convert the task ID for a task that processes a transaction int + * the ID for the transaction. + */ +function convertTaskToTransactionId( + taskId: string, +): TransactionIdStr | undefined { + const parsedTaskId = parseTaskIdentifier(taskId); + switch (parsedTaskId.tag) { + case PendingTaskType.PeerPullCredit: + return constructTransactionIdentifier({ + tag: TransactionType.PeerPullCredit, + pursePub: parsedTaskId.pursePub, + }); + case PendingTaskType.PeerPullDebit: + return constructTransactionIdentifier({ + tag: TransactionType.PeerPullDebit, + peerPullDebitId: parsedTaskId.peerPullDebitId, + }); + // FIXME: This doesn't distinguish internal-withdrawal. + // Maybe we should have a different task type for that as well? + // Or maybe transaction IDs should be valid task identifiers? + case PendingTaskType.Withdraw: + return constructTransactionIdentifier({ + tag: TransactionType.Withdrawal, + withdrawalGroupId: parsedTaskId.withdrawalGroupId, + }); + case PendingTaskType.PeerPushCredit: + return constructTransactionIdentifier({ + tag: TransactionType.PeerPushCredit, + peerPushCreditId: parsedTaskId.peerPushCreditId, + }); + case PendingTaskType.Deposit: + return constructTransactionIdentifier({ + tag: TransactionType.Deposit, + depositGroupId: parsedTaskId.depositGroupId, + }); + case PendingTaskType.Refresh: + return constructTransactionIdentifier({ + tag: TransactionType.Refresh, + refreshGroupId: parsedTaskId.refreshGroupId, + }); + case PendingTaskType.RewardPickup: + return constructTransactionIdentifier({ + tag: TransactionType.Reward, + walletRewardId: parsedTaskId.walletRewardId, + }); + case PendingTaskType.PeerPushDebit: + return constructTransactionIdentifier({ + tag: TransactionType.PeerPushDebit, + pursePub: parsedTaskId.pursePub, + }); + case PendingTaskType.Purchase: + return constructTransactionIdentifier({ + tag: TransactionType.Payment, + proposalId: parsedTaskId.proposalId, + }); + default: + return undefined; + } +} + +export interface ActiveTaskIdsResult { + taskIds: TaskId[]; +} + +export async function getActiveTaskIds( + ws: InternalWalletState, +): Promise { + const res: ActiveTaskIdsResult = { + taskIds: [], + }; + await ws.db + .mktx((x) => [ + x.exchanges, + x.refreshGroups, + x.withdrawalGroups, + x.purchases, + x.depositGroups, + x.recoupGroups, + x.peerPullCredit, + x.peerPushDebit, + x.peerPullDebit, + x.peerPushCredit, + ]) + .runReadWrite(async (tx) => { + const active = GlobalIDB.KeyRange.bound( + OPERATION_STATUS_ACTIVE_FIRST, + OPERATION_STATUS_ACTIVE_LAST, + ); + + // Withdrawals + + { + const activeRecs = + await tx.withdrawalGroups.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Withdraw, + withdrawalGroupId: rec.withdrawalGroupId, + }); + res.taskIds.push(taskId); + } + } + + // Deposits + + { + const activeRecs = + await tx.depositGroups.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Deposit, + depositGroupId: rec.depositGroupId, + }); + res.taskIds.push(taskId); + } + } + + // Refreshes + + { + const activeRecs = + await tx.refreshGroups.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Refresh, + refreshGroupId: rec.refreshGroupId, + }); + res.taskIds.push(taskId); + } + } + + // Purchases + + { + const activeRecs = await tx.purchases.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Purchase, + proposalId: rec.proposalId, + }); + res.taskIds.push(taskId); + } + } + + // peer-push-debit + + { + const activeRecs = + await tx.peerPushDebit.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.PeerPushDebit, + pursePub: rec.pursePub, + }); + res.taskIds.push(taskId); + } + } + + // peer-push-credit + + { + const activeRecs = + await tx.peerPushCredit.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.PeerPushCredit, + peerPushCreditId: rec.peerPushCreditId, + }); + res.taskIds.push(taskId); + } + } + + // peer-pull-debit + + { + const activeRecs = + await tx.peerPullDebit.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.PeerPullDebit, + peerPullDebitId: rec.peerPullDebitId, + }); + res.taskIds.push(taskId); + } + } + + // peer-pull-credit + + { + const activeRecs = + await tx.peerPullCredit.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.PeerPullCredit, + pursePub: rec.pursePub, + }); + res.taskIds.push(taskId); + } + } + + // recoup + + { + const activeRecs = + await tx.recoupGroups.indexes.byStatus.getAll(active); + for (const rec of activeRecs) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Recoup, + recoupGroupId: rec.recoupGroupId, + }); + res.taskIds.push(taskId); + } + } + + // exchange update + + { + const exchanges = await tx.exchanges.getAll(); + for (const rec of exchanges) { + const taskIdUpdate = constructTaskIdentifier({ + tag: PendingTaskType.ExchangeUpdate, + exchangeBaseUrl: rec.baseUrl, + }); + res.taskIds.push(taskIdUpdate); + } + } + + // FIXME: Recoup! + }); + + return res; +} diff --git a/packages/taler-wallet-core/src/util/coinSelection.ts b/packages/taler-wallet-core/src/util/coinSelection.ts index e06d7454b..be868867d 100644 --- a/packages/taler-wallet-core/src/util/coinSelection.ts +++ b/packages/taler-wallet-core/src/util/coinSelection.ts @@ -56,10 +56,8 @@ import { } from "@gnu-taler/taler-util"; import { DenominationRecord } from "../db.js"; import { - getAutoRefreshExecuteThreshold, getExchangeWireDetailsInTx, isWithdrawableDenom, - WalletDbReadOnlyTransaction, WalletDbReadOnlyTransactionArr, } from "../index.js"; import { InternalWalletState } from "../internal-wallet-state.js"; @@ -67,6 +65,7 @@ import { getMerchantPaymentBalanceDetails, getPeerPaymentBalanceDetailsInTx, } from "../operations/balance.js"; +import { getAutoRefreshExecuteThreshold } from "../operations/common.js"; import { checkDbInvariant, checkLogicInvariant } from "./invariants.js"; const logger = new Logger("coinSelection.ts"); diff --git a/packages/taler-wallet-core/src/util/promiseUtils.ts b/packages/taler-wallet-core/src/util/promiseUtils.ts index d152a12f4..bc1e40260 100644 --- a/packages/taler-wallet-core/src/util/promiseUtils.ts +++ b/packages/taler-wallet-core/src/util/promiseUtils.ts @@ -70,3 +70,43 @@ export class AsyncCondition { this.promCap = undefined; } } + +/** + * Flag that can be raised to notify asynchronous waiters. + * + * You can think of it as a promise that can + * be un-resolved. + */ +export class AsyncFlag { + private promCap?: OpenedPromise = undefined; + private internalFlagRaised: boolean = false; + + constructor() {} + + /** + * Wait until the flag is raised. + * + * Reset if before returning. + */ + wait(): Promise { + if (this.internalFlagRaised) { + return Promise.resolve(); + } + if (!this.promCap) { + this.promCap = openPromise(); + } + return this.promCap.promise; + } + + raise(): void { + this.internalFlagRaised = true; + if (this.promCap) { + this.promCap.resolve(); + } + } + + reset(): void { + this.internalFlagRaised = false; + this.promCap = undefined; + } +} diff --git a/packages/taler-wallet-core/src/util/query.ts b/packages/taler-wallet-core/src/util/query.ts index d96a03c61..5fba61f11 100644 --- a/packages/taler-wallet-core/src/util/query.ts +++ b/packages/taler-wallet-core/src/util/query.ts @@ -919,7 +919,7 @@ export class DbAccess { strStoreNames.push(swi.storeName); accessibleStores[swi.storeName] = swi; } - const tx = this.db.transaction(strStoreNames, "readwrite"); + const tx = this.db.transaction(strStoreNames, "readonly"); const readContext = makeReadContext(tx, accessibleStores); return runTx(tx, readContext, txf); } diff --git a/packages/taler-wallet-core/src/wallet-api-types.ts b/packages/taler-wallet-core/src/wallet-api-types.ts index cdde2ee62..190558e14 100644 --- a/packages/taler-wallet-core/src/wallet-api-types.ts +++ b/packages/taler-wallet-core/src/wallet-api-types.ts @@ -29,8 +29,6 @@ import { AcceptExchangeTosRequest, AcceptManualWithdrawalRequest, AcceptManualWithdrawalResult, - AcceptRewardRequest, - AcceptTipResponse, AcceptWithdrawalResponse, AddExchangeRequest, AddGlobalCurrencyAuditorRequest, @@ -102,8 +100,6 @@ import { PreparePeerPushCreditRequest, PreparePeerPushCreditResponse, PrepareRefundRequest, - PrepareRewardRequest, - PrepareTipResult as PrepareRewardResult, PrepareWithdrawExchangeRequest, PrepareWithdrawExchangeResponse, RecoverStoredBackupRequest, @@ -242,7 +238,6 @@ export enum WalletApiOperation { DeleteStoredBackup = "deleteStoredBackup", RecoverStoredBackup = "recoverStoredBackup", UpdateExchangeEntry = "updateExchangeEntry", - TestingWaitTasksProcessed = "testingWaitTasksProcessed", ListExchangesForScopedCurrency = "listExchangesForScopedCurrency", PrepareWithdrawExchange = "prepareWithdrawExchange", TestingInfiniteTransactionLoop = "testingInfiniteTransactionLoop", @@ -1124,15 +1119,6 @@ export type TestingWaitRefreshesFinalOp = { response: EmptyObject; }; -/** - * Wait until all tasks have been processed and the wallet is idle. - */ -export type TestingWaitTasksProcessedOp = { - op: WalletApiOperation.TestingWaitTasksProcessed; - request: EmptyObject; - response: EmptyObject; -}; - /** * Wait until a transaction is in a particular state. */ @@ -1245,7 +1231,6 @@ export type WalletOperations = { [WalletApiOperation.ValidateIban]: ValidateIbanOp; [WalletApiOperation.TestingWaitTransactionsFinal]: TestingWaitTransactionsFinalOp; [WalletApiOperation.TestingWaitRefreshesFinal]: TestingWaitRefreshesFinalOp; - [WalletApiOperation.TestingWaitTasksProcessed]: TestingWaitTasksProcessedOp; [WalletApiOperation.TestingSetTimetravel]: TestingSetTimetravelOp; [WalletApiOperation.TestingWaitTransactionState]: TestingWaitTransactionStateOp; [WalletApiOperation.GetCurrencySpecification]: GetCurrencySpecificationOp; diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index 42aa8cdfc..0246597be 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -24,7 +24,6 @@ */ import { IDBFactory } from "@gnu-taler/idb-bridge"; import { - AbsoluteTime, AmountString, Amounts, CoinDumpJson, @@ -33,7 +32,6 @@ import { CreateStoredBackupResponse, DeleteStoredBackupRequest, DenominationInfo, - Duration, ExchangesShortListResponse, GetCurrencySpecificationResponse, InitResponse, @@ -42,15 +40,14 @@ import { ListGlobalCurrencyAuditorsResponse, ListGlobalCurrencyExchangesResponse, Logger, - NotificationType, PrepareWithdrawExchangeRequest, PrepareWithdrawExchangeResponse, RecoverStoredBackupRequest, + RetryLoopOpts, StoredBackupList, TalerError, TalerErrorCode, TalerUriAction, - TaskThrottler, TestingWaitTransactionRequest, TransactionState, TransactionType, @@ -123,8 +120,6 @@ import { codecForUserAttentionsRequest, codecForValidateIbanRequest, codecForWithdrawTestBalance, - durationFromSpec, - durationMin, getErrorDetailFromException, j2s, parsePaytoUri, @@ -152,7 +147,6 @@ import { } from "./db.js"; import { DevExperimentHttpLib, applyDevExperiment } from "./dev-experiments.js"; import { - ActiveLongpollInfo, CancelFn, InternalWalletState, MerchantInfo, @@ -172,23 +166,16 @@ import { getBackupInfo, getBackupRecovery, loadBackupRecovery, - processBackupForProvider, removeBackupProvider, runBackupCycle, setWalletDeviceId, } from "./operations/backup/index.js"; import { getBalanceDetail, getBalances } from "./operations/balance.js"; -import { - TaskRunResult, - TaskRunResultType, - runTaskWithErrorReporting, -} from "./operations/common.js"; import { computeDepositTransactionStatus, createDepositGroup, generateDepositGroupTxId, prepareDepositGroup, - processDepositGroup, } from "./operations/deposits.js"; import { acceptExchangeTermsOfService, @@ -200,7 +187,6 @@ import { getExchangeTos, listExchanges, lookupExchangeByUri, - updateExchangeFromUrlHandler, } from "./operations/exchanges.js"; import { computePayMerchantTransactionState, @@ -209,7 +195,6 @@ import { getContractTermsDetails, preparePayForTemplate, preparePayForUri, - processPurchase, sharePayment, startQueryRefund, startRefundQueryForUri, @@ -218,38 +203,28 @@ import { checkPeerPullPaymentInitiation, computePeerPullCreditTransactionState, initiatePeerPullPayment, - processPeerPullCredit, } from "./operations/pay-peer-pull-credit.js"; import { computePeerPullDebitTransactionState, confirmPeerPullDebit, preparePeerPullDebit, - processPeerPullDebit, } from "./operations/pay-peer-pull-debit.js"; import { computePeerPushCreditTransactionState, confirmPeerPushCredit, preparePeerPushCredit, - processPeerPushCredit, } from "./operations/pay-peer-push-credit.js"; import { checkPeerPushDebit, computePeerPushDebitTransactionState, initiatePeerPushDebit, - processPeerPushDebit, } from "./operations/pay-peer-push-debit.js"; -import { getPendingOperations } from "./operations/pending.js"; -import { createRecoupGroup, processRecoupGroup } from "./operations/recoup.js"; +import { createRecoupGroup } from "./operations/recoup.js"; import { - autoRefresh, computeRefreshTransactionState, forceRefresh, - processRefreshGroup, } from "./operations/refresh.js"; -import { - computeRewardTransactionStatus, - processTip, -} from "./operations/reward.js"; +import { computeRewardTransactionStatus } from "./operations/reward.js"; import { runIntegrationTest, runIntegrationTest2, @@ -257,7 +232,6 @@ import { waitTransactionState, waitUntilAllTransactionsFinal, waitUntilRefreshesDone, - waitUntilTasksProcessed, withdrawTestBalance, } from "./operations/testing.js"; import { @@ -279,9 +253,9 @@ import { createManualWithdrawal, getExchangeWithdrawalInfo, getWithdrawalDetailsForUri, - processWithdrawalGroup, } from "./operations/withdraw.js"; -import { PendingTaskInfo, PendingTaskType } from "./pending-types.js"; +import { PendingOperationsResponse } from "./pending-types.js"; +import { TaskScheduler } from "./shepherd.js"; import { assertUnreachable } from "./util/assertUnreachable.js"; import { convertDepositAmount, @@ -320,184 +294,11 @@ import { const logger = new Logger("wallet.ts"); -/** - * Call the right handler for a pending operation without doing - * any special error handling. - */ -async function callOperationHandler( - ws: InternalWalletState, - pending: PendingTaskInfo, -): Promise { - switch (pending.type) { - case PendingTaskType.ExchangeUpdate: - return await updateExchangeFromUrlHandler(ws, pending.exchangeBaseUrl); - case PendingTaskType.Refresh: - return await processRefreshGroup(ws, pending.refreshGroupId); - case PendingTaskType.Withdraw: - return await processWithdrawalGroup(ws, pending.withdrawalGroupId); - case PendingTaskType.RewardPickup: - return await processTip(ws, pending.tipId); - case PendingTaskType.Purchase: - return await processPurchase(ws, pending.proposalId); - case PendingTaskType.Recoup: - return await processRecoupGroup(ws, pending.recoupGroupId); - case PendingTaskType.ExchangeCheckRefresh: - return await autoRefresh(ws, pending.exchangeBaseUrl); - case PendingTaskType.Deposit: - return await processDepositGroup(ws, pending.depositGroupId); - case PendingTaskType.Backup: - return await processBackupForProvider(ws, pending.backupProviderBaseUrl); - case PendingTaskType.PeerPushDebit: - return await processPeerPushDebit(ws, pending.pursePub); - case PendingTaskType.PeerPullCredit: - return await processPeerPullCredit(ws, pending.pursePub); - case PendingTaskType.PeerPullDebit: - return await processPeerPullDebit(ws, pending.peerPullDebitId); - case PendingTaskType.PeerPushCredit: - return await processPeerPushCredit(ws, pending.peerPushCreditId); - default: - return assertUnreachable(pending); - } - throw Error(`not reached ${pending.type}`); -} - -/** - * Process pending operations. - */ -export async function runPending(ws: InternalWalletState): Promise { - const pendingOpsResponse = await getPendingOperations(ws); - for (const p of pendingOpsResponse.pendingOperations) { - if (!AbsoluteTime.isExpired(p.timestampDue)) { - continue; - } - await runTaskWithErrorReporting(ws, p.id, async () => { - logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`); - return await callOperationHandler(ws, p); - }); - } -} - -export interface RetryLoopOpts { - /** - * Stop the retry loop when all lifeness-giving pending operations - * are done. - * - * Defaults to false. - */ - stopWhenDone?: boolean; -} - -/** - * Main retry loop of the wallet. - * - * Looks up pending operations from the wallet, runs them, repeat. - */ async function runTaskLoop( ws: InternalWalletState, opts: RetryLoopOpts = {}, ): Promise { - logger.trace(`running task loop opts=${j2s(opts)}`); - if (ws.isTaskLoopRunning) { - logger.warn( - "task loop already running, nesting the wallet-core task loop is deprecated and should be avoided", - ); - } - const throttler = new TaskThrottler(); - ws.isTaskLoopRunning = true; - for (let iteration = 0; !ws.stopped; iteration++) { - const pending = await getPendingOperations(ws); - logger.trace(`pending operations: ${j2s(pending)}`); - let numGivingLiveness = 0; - let numDue = 0; - let numThrottled = 0; - let minDue: AbsoluteTime = AbsoluteTime.never(); - - for (const p of pending.pendingOperations) { - if (p.givesLifeness) { - numGivingLiveness++; - } - if (!p.isDue) { - continue; - } - numDue++; - - const isThrottled = throttler.applyThrottle(p.id); - - if (isThrottled) { - logger.warn( - `task ${p.id} throttled, this is very likely a bug in wallet-core, please report`, - ); - numDue--; - numThrottled++; - } else { - minDue = AbsoluteTime.min(minDue, p.timestampDue); - } - } - - logger.trace( - `running task loop, iter=${iteration}, #tasks=${pending.pendingOperations.length} #lifeness=${numGivingLiveness}, #due=${numDue} #throttled=${numThrottled}`, - ); - - if (opts.stopWhenDone && numGivingLiveness === 0 && iteration !== 0) { - logger.warn(`stopping, as no pending operations have lifeness`); - ws.isTaskLoopRunning = false; - return; - } - - if (ws.stopped) { - ws.isTaskLoopRunning = false; - return; - } - - // Make sure that we run tasks that don't give lifeness at least - // one time. - if (iteration !== 0 && numDue === 0) { - // We've executed pending, due operations at least one. - // Now we don't have any more operations available, - // and need to wait. - - // Wait for at most 5 seconds to the next check. - const dt = durationMin( - durationFromSpec({ - seconds: 5, - }), - Duration.getRemaining(minDue), - ); - logger.trace(`waiting for at most ${dt.d_ms} ms`); - const timeout = ws.timerGroup.resolveAfter(dt); - // Wait until either the timeout, or we are notified (via the latch) - // that more work might be available. - await Promise.race([timeout, ws.workAvailable.wait()]); - logger.trace(`done waiting for available work`); - } else { - logger.trace( - `running ${pending.pendingOperations.length} pending operations`, - ); - for (const p of pending.pendingOperations) { - if (!AbsoluteTime.isExpired(p.timestampDue)) { - continue; - } - logger.trace(`running task ${p.id}`); - const res = await runTaskWithErrorReporting(ws, p.id, async () => { - return await callOperationHandler(ws, p); - }); - if (!(ws.stopped && res.type === TaskRunResultType.Error)) { - ws.notify({ - type: NotificationType.PendingOperationProcessed, - id: p.id, - taskResultType: res.type, - }); - } - if (ws.stopped) { - ws.isTaskLoopRunning = false; - return; - } - } - } - } - logger.trace("exiting wallet task loop"); - ws.isTaskLoopRunning = false; - return; + await ws.taskScheduler.run(opts); } /** @@ -1035,7 +836,10 @@ async function dispatchRequestInternal( return await getUserAttentionsUnreadCount(ws, req); } case WalletApiOperation.GetPendingOperations: { - return await getPendingOperations(ws); + // FIXME: Eventually remove the handler after deprecation period. + return { + pendingOperations: [], + } satisfies PendingOperationsResponse; } case WalletApiOperation.SetExchangeTosAccepted: { const req = codecForAcceptExchangeTosRequest().decode(payload); @@ -1066,8 +870,7 @@ async function dispatchRequestInternal( return getContractTermsDetails(ws, req.proposalId); } case WalletApiOperation.RetryPendingNow: { - // FIXME: Should we reset all operation retries here? - await runPending(ws); + logger.error("retryPendingNow currently not implemented"); return {}; } case WalletApiOperation.SharePayment: { @@ -1175,10 +978,6 @@ async function dispatchRequestInternal( await waitTransactionState(ws, req.transactionId, req.txState); return {}; } - case WalletApiOperation.TestingWaitTasksProcessed: { - await waitUntilTasksProcessed(ws); - return {}; - } case WalletApiOperation.GetCurrencySpecification: { // Ignore result, just validate in this mock implementation const req = codecForGetCurrencyInfoRequest().decode(payload); @@ -1451,7 +1250,7 @@ async function dispatchRequestInternal( case WalletApiOperation.TestingSetTimetravel: { const req = codecForTestingSetTimetravelRequest().decode(payload); setDangerousTimetravel(req.offsetMs); - ws.workAvailable.trigger(); + ws.taskScheduler.reload(); return {}; } case WalletApiOperation.DeleteExchange: { @@ -1634,11 +1433,6 @@ export class Wallet { this.ws.stop(); } - async runPending(): Promise { - await this.ws.ensureWalletDbOpen(); - return runPending(this.ws); - } - async runTaskLoop(opts?: RetryLoopOpts): Promise { await this.ws.ensureWalletDbOpen(); return runTaskLoop(this.ws, opts); @@ -1660,11 +1454,6 @@ export class Wallet { * This ties together all the operation implementations. */ class InternalWalletStateImpl implements InternalWalletState { - /** - * @see {@link InternalWalletState.activeLongpoll} - */ - activeLongpoll: ActiveLongpollInfo = {}; - cryptoApi: TalerCryptoInterface; cryptoDispatcher: CryptoDispatcher; @@ -1697,6 +1486,8 @@ class InternalWalletStateImpl implements InternalWalletState { isTaskLoopRunning: boolean = false; + taskScheduler: TaskScheduler = new TaskScheduler(this); + config: Readonly; private _db: DbAccess | undefined = undefined; @@ -1843,7 +1634,7 @@ class InternalWalletStateImpl implements InternalWalletState { } notify(n: WalletNotification): void { - logger.trace("Notification", j2s(n)); + logger.trace(`Notification: ${j2s(n)}`); for (const l of this.listeners) { const nc = JSON.parse(JSON.stringify(n)); setTimeout(() => { @@ -1870,11 +1661,6 @@ class InternalWalletStateImpl implements InternalWalletState { this.stopped = true; this.timerGroup.stopCurrentAndFutureTimers(); this.cryptoDispatcher.stop(); - for (const key of Object.keys(this.activeLongpoll)) { - logger.trace(`cancelling active longpoll ${key}`); - this.activeLongpoll[key].cancel(); - delete this.activeLongpoll[key]; - } } /** -- cgit v1.2.3