diff options
Diffstat (limited to 'packages/taler-wallet-core/src/wallet.ts')
-rw-r--r-- | packages/taler-wallet-core/src/wallet.ts | 1862 |
1 files changed, 956 insertions, 906 deletions
diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index 2d422e59c..ea47ffad7 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -22,13 +22,16 @@ /** * Imports. */ -import { IDBFactory } from "@gnu-taler/idb-bridge"; +import { IDBDatabase, IDBFactory } from "@gnu-taler/idb-bridge"; import { AbsoluteTime, + ActiveTask, + AmountJson, AmountString, Amounts, + AsyncCondition, + CancellationToken, CoinDumpJson, - CoinRefreshRequest, CoinStatus, CoreApiResponse, CreateStoredBackupResponse, @@ -40,43 +43,55 @@ import { InitResponse, KnownBankAccounts, KnownBankAccountsInfo, + ListGlobalCurrencyAuditorsResponse, + ListGlobalCurrencyExchangesResponse, Logger, - WithdrawalDetailsForAmount, - MerchantUsingTemplateDetails, NotificationType, + ObservabilityContext, + ObservabilityEventType, + ObservableHttpClientLibrary, + OpenedPromise, + PartialWalletRunConfig, PrepareWithdrawExchangeRequest, PrepareWithdrawExchangeResponse, RecoverStoredBackupRequest, - RefreshReason, - ScopeType, StoredBackupList, TalerError, TalerErrorCode, + TalerProtocolTimestamp, TalerUriAction, - TaskThrottler, + TestingGetDenomStatsResponse, + TestingListTasksForTransactionsResponse, TestingWaitTransactionRequest, - TransactionState, + TimerAPI, + TimerGroup, TransactionType, - URL, ValidateIbanResponse, WalletCoreVersion, WalletNotification, + WalletRunConfig, + canonicalizeBaseUrl, + checkDbInvariant, codecForAbortTransaction, codecForAcceptBankIntegratedWithdrawalRequest, codecForAcceptExchangeTosRequest, - codecForAcceptManualWithdrawalRequet, + codecForAcceptManualWithdrawalRequest, codecForAcceptPeerPullPaymentRequest, - codecForAcceptTipRequest, codecForAddExchangeRequest, + codecForAddGlobalCurrencyAuditorRequest, + codecForAddGlobalCurrencyExchangeRequest, codecForAddKnownBankAccounts, codecForAny, codecForApplyDevExperiment, + codecForCanonicalizeBaseUrlRequest, codecForCheckPeerPullPaymentRequest, codecForCheckPeerPushDebitRequest, codecForConfirmPayRequest, codecForConfirmPeerPushPaymentRequest, + codecForConfirmWithdrawalRequestRequest, codecForConvertAmountRequest, codecForCreateDepositGroupRequest, + codecForDeleteExchangeRequest, codecForDeleteStoredBackupRequest, codecForDeleteTransactionRequest, codecForFailTransactionRequest, @@ -87,26 +102,29 @@ import { codecForGetContractTermsDetails, codecForGetCurrencyInfoRequest, codecForGetExchangeEntryByUrlRequest, + codecForGetExchangeResourcesRequest, codecForGetExchangeTosRequest, codecForGetWithdrawalDetailsForAmountRequest, codecForGetWithdrawalDetailsForUri, codecForImportDbRequest, + codecForInitRequest, codecForInitiatePeerPullPaymentRequest, codecForInitiatePeerPushDebitRequest, codecForIntegrationTestArgs, codecForIntegrationTestV2Args, codecForListExchangesForScopedCurrencyRequest, codecForListKnownBankAccounts, - codecForMerchantPostOrderResponse, + codecForPrepareBankIntegratedWithdrawalRequest, codecForPrepareDepositRequest, codecForPreparePayRequest, codecForPreparePayTemplateRequest, codecForPreparePeerPullPaymentRequest, codecForPreparePeerPushCreditRequest, codecForPrepareRefundRequest, - codecForPrepareRewardRequest, codecForPrepareWithdrawExchangeRequest, codecForRecoverStoredBackupRequest, + codecForRemoveGlobalCurrencyAuditorRequest, + codecForRemoveGlobalCurrencyExchangeRequest, codecForResumeTransaction, codecForRetryTransactionRequest, codecForSetCoinSuspendedRequest, @@ -115,6 +133,8 @@ import { codecForStartRefundQueryRequest, codecForSuspendTransaction, codecForTestPayArgs, + codecForTestingGetDenomStatsRequest, + codecForTestingListTasksForTransactionRequest, codecForTestingSetTimetravelRequest, codecForTransactionByIdRequest, codecForTransactionsRequest, @@ -123,53 +143,23 @@ import { codecForUserAttentionsRequest, codecForValidateIbanRequest, codecForWithdrawTestBalance, - constructPayUri, - durationFromSpec, - durationMin, getErrorDetailFromException, j2s, - parsePayTemplateUri, + openPromise, parsePaytoUri, parseTalerUri, + performanceNow, + safeStringifyException, sampleWalletCoreTransactions, setDangerousTimetravel, validateIban, } from "@gnu-taler/taler-util"; import type { HttpRequestLibrary } from "@gnu-taler/taler-util/http"; -import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http"; -import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js"; -import { - CryptoDispatcher, - CryptoWorkerFactory, -} from "./crypto/workers/crypto-dispatcher.js"; -import { - CoinSourceType, - ConfigRecordKey, - DenominationRecord, - WalletStoresV1, - clearDatabase, - exportDb, - importDb, - openStoredBackupsDatabase, - openTalerDatabase, -} from "./db.js"; -import { DevExperimentHttpLib, applyDevExperiment } from "./dev-experiments.js"; -import { - ActiveLongpollInfo, - CancelFn, - ExchangeOperations, - InternalWalletState, - MerchantInfo, - MerchantOperations, - NotificationListener, - RecoupOperations, - RefreshOperations, -} from "./internal-wallet-state.js"; import { getUserAttentions, getUserAttentionsUnreadCount, markAttentionRequestAsRead, -} from "./operations/attention.js"; +} from "./attention.js"; import { addBackupProvider, codecForAddBackupProviderRequest, @@ -178,382 +168,186 @@ import { getBackupInfo, getBackupRecovery, loadBackupRecovery, - processBackupForProvider, removeBackupProvider, runBackupCycle, setWalletDeviceId, -} from "./operations/backup/index.js"; -import { getBalanceDetail, getBalances } from "./operations/balance.js"; +} from "./backup/index.js"; +import { getBalanceDetail, getBalances } from "./balance.js"; +import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js"; import { - TaskIdentifiers, - TaskRunResult, - TaskRunResultType, - makeExchangeListItem, - runTaskWithErrorReporting, -} from "./operations/common.js"; + CryptoDispatcher, + CryptoWorkerFactory, +} from "./crypto/workers/crypto-dispatcher.js"; +import { + CoinSourceType, + ConfigRecordKey, + DenominationRecord, + WalletDbReadOnlyTransaction, + WalletStoresV1, + clearDatabase, + exportDb, + importDb, + openStoredBackupsDatabase, + openTalerDatabase, + timestampAbsoluteFromDb, + timestampProtocolToDb, +} from "./db.js"; import { - computeDepositTransactionStatus, + checkDepositGroup, createDepositGroup, generateDepositGroupTxId, - prepareDepositGroup, - processDepositGroup, -} from "./operations/deposits.js"; +} from "./deposits.js"; +import { DevExperimentHttpLib, applyDevExperiment } from "./dev-experiments.js"; import { + ReadyExchangeSummary, acceptExchangeTermsOfService, addPresetExchangeEntry, + deleteExchange, fetchFreshExchange, + forgetExchangeTermsOfService, getExchangeDetailedInfo, - getExchangeDetails, + getExchangeResources, getExchangeTos, - getExchanges, - updateExchangeFromUrlHandler, -} from "./operations/exchanges.js"; -import { getMerchantInfo } from "./operations/merchants.js"; + listExchanges, + lookupExchangeByUri, +} from "./exchanges.js"; +import { + convertDepositAmount, + convertPeerPushAmount, + convertWithdrawalAmount, + getMaxDepositAmount, + getMaxPeerPushAmount, +} from "./instructedAmountConversion.js"; +import { + ObservableDbAccess, + ObservableTaskScheduler, + observeTalerCrypto, +} from "./observable-wrappers.js"; import { - computePayMerchantTransactionState, - computeRefundTransactionState, confirmPay, getContractTermsDetails, + preparePayForTemplate, preparePayForUri, - processPurchase, sharePayment, startQueryRefund, startRefundQueryForUri, -} from "./operations/pay-merchant.js"; +} from "./pay-merchant.js"; import { checkPeerPullPaymentInitiation, - computePeerPullCreditTransactionState, initiatePeerPullPayment, - processPeerPullCredit, -} from "./operations/pay-peer-pull-credit.js"; +} from "./pay-peer-pull-credit.js"; import { - computePeerPullDebitTransactionState, confirmPeerPullDebit, preparePeerPullDebit, - processPeerPullDebit, -} from "./operations/pay-peer-pull-debit.js"; +} from "./pay-peer-pull-debit.js"; import { - computePeerPushCreditTransactionState, confirmPeerPushCredit, preparePeerPushCredit, - processPeerPushCredit, -} from "./operations/pay-peer-push-credit.js"; +} from "./pay-peer-push-credit.js"; import { checkPeerPushDebit, - computePeerPushDebitTransactionState, initiatePeerPushDebit, - processPeerPushDebit, -} from "./operations/pay-peer-push-debit.js"; -import { getPendingOperations } from "./operations/pending.js"; -import { createRecoupGroup, processRecoupGroup } from "./operations/recoup.js"; +} from "./pay-peer-push-debit.js"; import { - autoRefresh, - computeRefreshTransactionState, - createRefreshGroup, - processRefreshGroup, -} from "./operations/refresh.js"; + AfterCommitInfo, + DbAccess, + DbAccessImpl, + TriggerSpec, +} from "./query.js"; +import { forceRefresh } from "./refresh.js"; import { - acceptTip, - computeRewardTransactionStatus, - prepareTip, - processTip, -} from "./operations/reward.js"; + TaskScheduler, + TaskSchedulerImpl, + convertTaskToTransactionId, + listTaskForTransactionId, +} from "./shepherd.js"; import { runIntegrationTest, runIntegrationTest2, testPay, + waitTasksDone, waitTransactionState, + waitUntilAllTransactionsFinal, waitUntilRefreshesDone, - waitUntilTasksProcessed, - waitUntilTransactionsFinal, withdrawTestBalance, -} from "./operations/testing.js"; +} from "./testing.js"; import { abortTransaction, + constructTransactionIdentifier, deleteTransaction, failTransaction, getTransactionById, getTransactions, + getWithdrawalTransactionByUri, parseTransactionIdentifier, resumeTransaction, retryTransaction, suspendTransaction, -} from "./operations/transactions.js"; -import { - acceptWithdrawalFromUri, - computeWithdrawalTransactionStatus, - createManualWithdrawal, - getExchangeWithdrawalInfo, - getWithdrawalDetailsForUri, - processWithdrawalGroup, -} from "./operations/withdraw.js"; -import { PendingTaskInfo, PendingTaskType } from "./pending-types.js"; -import { assertUnreachable } from "./util/assertUnreachable.js"; -import { - convertDepositAmount, - convertPeerPushAmount, - convertWithdrawalAmount, - getMaxDepositAmount, - getMaxPeerPushAmount, -} from "./util/instructedAmountConversion.js"; -import { checkDbInvariant } from "./util/invariants.js"; -import { - AsyncCondition, - OpenedPromise, - openPromise, -} from "./util/promiseUtils.js"; -import { - DbAccess, - GetReadOnlyAccess, - GetReadWriteAccess, -} from "./util/query.js"; -import { TimerAPI, TimerGroup } from "./util/timer.js"; +} from "./transactions.js"; import { WALLET_BANK_CONVERSION_API_PROTOCOL_VERSION, WALLET_BANK_INTEGRATION_PROTOCOL_VERSION, WALLET_COREBANK_API_PROTOCOL_VERSION, - WALLET_CORE_API_IMPLEMENTATION_VERSION, + WALLET_CORE_API_PROTOCOL_VERSION, WALLET_EXCHANGE_PROTOCOL_VERSION, WALLET_MERCHANT_PROTOCOL_VERSION, } from "./versions.js"; import { WalletApiOperation, - WalletConfig, - WalletConfigParameter, WalletCoreApiClient, WalletCoreResponseType, } from "./wallet-api-types.js"; +import { + acceptWithdrawalFromUri, + confirmWithdrawal, + createManualWithdrawal, + getWithdrawalDetailsForAmount, + getWithdrawalDetailsForUri, + prepareBankIntegratedWithdrawal, +} from "./withdraw.js"; const logger = new Logger("wallet.ts"); /** - * Call the right handler for a pending operation without doing - * any special error handling. - */ -async function callOperationHandler( - ws: InternalWalletState, - pending: PendingTaskInfo, -): Promise<TaskRunResult> { - switch (pending.type) { - case PendingTaskType.ExchangeUpdate: - return await updateExchangeFromUrlHandler(ws, pending.exchangeBaseUrl); - case PendingTaskType.Refresh: - return await processRefreshGroup(ws, pending.refreshGroupId); - case PendingTaskType.Withdraw: - return await processWithdrawalGroup(ws, pending.withdrawalGroupId); - case PendingTaskType.RewardPickup: - return await processTip(ws, pending.tipId); - case PendingTaskType.Purchase: - return await processPurchase(ws, pending.proposalId); - case PendingTaskType.Recoup: - return await processRecoupGroup(ws, pending.recoupGroupId); - case PendingTaskType.ExchangeCheckRefresh: - return await autoRefresh(ws, pending.exchangeBaseUrl); - case PendingTaskType.Deposit: - return await processDepositGroup(ws, pending.depositGroupId); - case PendingTaskType.Backup: - return await processBackupForProvider(ws, pending.backupProviderBaseUrl); - case PendingTaskType.PeerPushDebit: - return await processPeerPushDebit(ws, pending.pursePub); - case PendingTaskType.PeerPullCredit: - return await processPeerPullCredit(ws, pending.pursePub); - case PendingTaskType.PeerPullDebit: - return await processPeerPullDebit(ws, pending.peerPullDebitId); - case PendingTaskType.PeerPushCredit: - return await processPeerPushCredit(ws, pending.peerPushCreditId); - default: - return assertUnreachable(pending); - } - throw Error(`not reached ${pending.type}`); -} - -/** - * Process pending operations. - */ -export async function runPending(ws: InternalWalletState): Promise<void> { - const pendingOpsResponse = await getPendingOperations(ws); - for (const p of pendingOpsResponse.pendingOperations) { - if (!AbsoluteTime.isExpired(p.timestampDue)) { - continue; - } - await runTaskWithErrorReporting(ws, p.id, async () => { - logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`); - return await callOperationHandler(ws, p); - }); - } -} - -export interface RetryLoopOpts { - /** - * Stop when the number of retries is exceeded for any pending - * operation. - */ - maxRetries?: number; - - /** - * Stop the retry loop when all lifeness-giving pending operations - * are done. - * - * Defaults to false. - */ - stopWhenDone?: boolean; -} - -export interface TaskLoopResult { - /** - * Was the maximum number of retries exceeded in a task? - */ - retriesExceeded: boolean; -} - -/** - * Main retry loop of the wallet. + * Execution context for code that is run in the wallet. * - * Looks up pending operations from the wallet, runs them, repeat. + * Typically the execution context is either for a wallet-core + * request handler or for a shepherded task. */ -async function runTaskLoop( - ws: InternalWalletState, - opts: RetryLoopOpts = {}, -): Promise<TaskLoopResult> { - logger.trace(`running task loop opts=${j2s(opts)}`); - if (ws.isTaskLoopRunning) { - logger.warn( - "task loop already running, nesting the wallet-core task loop is deprecated and should be avoided", - ); - } - const throttler = new TaskThrottler(); - ws.isTaskLoopRunning = true; - let retriesExceeded = false; - for (let iteration = 0; !ws.stopped; iteration++) { - const pending = await getPendingOperations(ws); - logger.trace(`pending operations: ${j2s(pending)}`); - let numGivingLiveness = 0; - let numDue = 0; - let numThrottled = 0; - let minDue: AbsoluteTime = AbsoluteTime.never(); - - for (const p of pending.pendingOperations) { - const maxRetries = opts.maxRetries; - - if (maxRetries && p.retryInfo && p.retryInfo.retryCounter > maxRetries) { - retriesExceeded = true; - logger.warn( - `skipping, as ${maxRetries} retries are exceeded in an operation of type ${p.type}`, - ); - continue; - } - if (p.givesLifeness) { - numGivingLiveness++; - } - if (!p.isDue) { - continue; - } - numDue++; - - const isThrottled = throttler.applyThrottle(p.id); - - if (isThrottled) { - logger.warn( - `task ${p.id} throttled, this is very likely a bug in wallet-core, please report`, - ); - numDue--; - numThrottled++; - } else { - minDue = AbsoluteTime.min(minDue, p.timestampDue); - } - } - - logger.trace( - `running task loop, iter=${iteration}, #tasks=${pending.pendingOperations.length} #lifeness=${numGivingLiveness}, #due=${numDue} #trottled=${numThrottled}`, - ); - - if (opts.stopWhenDone && numGivingLiveness === 0 && iteration !== 0) { - logger.warn(`stopping, as no pending operations have lifeness`); - ws.isTaskLoopRunning = false; - return { - retriesExceeded, - }; - } +export interface WalletExecutionContext { + readonly ws: InternalWalletState; + readonly cryptoApi: TalerCryptoInterface; + readonly cancellationToken: CancellationToken; + readonly http: HttpRequestLibrary; + readonly db: DbAccess<typeof WalletStoresV1>; + readonly oc: ObservabilityContext; + readonly taskScheduler: TaskScheduler; +} - if (ws.stopped) { - ws.isTaskLoopRunning = false; - return { - retriesExceeded, - }; - } +export const EXCHANGE_COINS_LOCK = "exchange-coins-lock"; +export const EXCHANGE_RESERVES_LOCK = "exchange-reserves-lock"; - // Make sure that we run tasks that don't give lifeness at least - // one time. - if (iteration !== 0 && numDue === 0) { - // We've executed pending, due operations at least one. - // Now we don't have any more operations available, - // and need to wait. +export type NotificationListener = (n: WalletNotification) => void; - // Wait for at most 5 seconds to the next check. - const dt = durationMin( - durationFromSpec({ - seconds: 5, - }), - Duration.getRemaining(minDue), - ); - logger.trace(`waiting for at most ${dt.d_ms} ms`); - const timeout = ws.timerGroup.resolveAfter(dt); - // Wait until either the timeout, or we are notified (via the latch) - // that more work might be available. - await Promise.race([timeout, ws.workAvailable.wait()]); - logger.trace(`done waiting for available work`); - } else { - logger.trace( - `running ${pending.pendingOperations.length} pending operations`, - ); - for (const p of pending.pendingOperations) { - if (!AbsoluteTime.isExpired(p.timestampDue)) { - continue; - } - logger.trace(`running task ${p.id}`); - const res = await runTaskWithErrorReporting(ws, p.id, async () => { - return await callOperationHandler(ws, p); - }); - if (!(ws.stopped && res.type === TaskRunResultType.Error)) { - ws.notify({ - type: NotificationType.PendingOperationProcessed, - id: p.id, - taskResultType: res.type, - }); - } - if (ws.stopped) { - ws.isTaskLoopRunning = false; - return { - retriesExceeded, - }; - } - } - } - } - logger.trace("exiting wallet task loop"); - ws.isTaskLoopRunning = false; - return { - retriesExceeded, - }; -} +type CancelFn = () => void; /** * Insert the hard-coded defaults for exchanges, coins and * auditors into the database, unless these defaults have * already been applied. */ -async function fillDefaults(ws: InternalWalletState): Promise<void> { +async function fillDefaults(wex: WalletExecutionContext): Promise<void> { const notifications: WalletNotification[] = []; - await ws.db - .mktx((x) => [x.config, x.exchanges, x.exchangeDetails]) - .runReadWrite(async (tx) => { + await wex.db.runReadWriteTx( + { storeNames: ["config", "exchanges"] }, + async (tx) => { const appliedRec = await tx.config.get("currencyDefaultsApplied"); let alreadyApplied = appliedRec ? !!appliedRec.value : false; if (alreadyApplied) { logger.trace("defaults already applied"); return; } - for (const exch of ws.config.builtin.exchanges) { + for (const exch of wex.ws.config.builtin.exchanges) { const resp = await addPresetExchangeEntry( tx, exch.exchangeBaseUrl, @@ -567,10 +361,31 @@ async function fillDefaults(ws: InternalWalletState): Promise<void> { key: ConfigRecordKey.CurrencyDefaultsApplied, value: true, }); - }); + }, + ); for (const notif of notifications) { - ws.notify(notif); + wex.ws.notify(notif); + } +} + +export async function getDenomInfo( + wex: WalletExecutionContext, + tx: WalletDbReadOnlyTransaction<["denominations"]>, + exchangeBaseUrl: string, + denomPubHash: string, +): Promise<DenominationInfo | undefined> { + const cacheKey = `${exchangeBaseUrl}:${denomPubHash}`; + const cached = wex.ws.denomInfoCache.get(cacheKey); + if (cached) { + return cached; } + const d = await tx.denominations.get([exchangeBaseUrl, denomPubHash]); + if (d) { + const denomInfo = DenominationRecord.toDenomInfo(d); + wex.ws.denomInfoCache.put(cacheKey, denomInfo); + return denomInfo; + } + return undefined; } /** @@ -578,79 +393,73 @@ async function fillDefaults(ws: InternalWalletState): Promise<void> { * previous withdrawals. */ async function listKnownBankAccounts( - ws: InternalWalletState, + wex: WalletExecutionContext, currency?: string, ): Promise<KnownBankAccounts> { const accounts: KnownBankAccountsInfo[] = []; - await ws.db - .mktx((x) => [x.bankAccounts]) - .runReadOnly(async (tx) => { - const knownAccounts = await tx.bankAccounts.iter().toArray(); - for (const r of knownAccounts) { - if (currency && currency !== r.currency) { - continue; - } - const payto = parsePaytoUri(r.uri); - if (payto) { - accounts.push({ - uri: payto, - alias: r.alias, - kyc_completed: r.kycCompleted, - currency: r.currency, - }); - } + await wex.db.runReadOnlyTx({ storeNames: ["bankAccounts"] }, async (tx) => { + const knownAccounts = await tx.bankAccounts.iter().toArray(); + for (const r of knownAccounts) { + if (currency && currency !== r.currency) { + continue; } - }); + const payto = parsePaytoUri(r.uri); + if (payto) { + accounts.push({ + uri: payto, + alias: r.alias, + kyc_completed: r.kycCompleted, + currency: r.currency, + }); + } + } + }); return { accounts }; } /** */ async function addKnownBankAccounts( - ws: InternalWalletState, + wex: WalletExecutionContext, payto: string, alias: string, currency: string, ): Promise<void> { - await ws.db - .mktx((x) => [x.bankAccounts]) - .runReadWrite(async (tx) => { - tx.bankAccounts.put({ - uri: payto, - alias: alias, - currency: currency, - kycCompleted: false, - }); + await wex.db.runReadWriteTx({ storeNames: ["bankAccounts"] }, async (tx) => { + tx.bankAccounts.put({ + uri: payto, + alias: alias, + currency: currency, + kycCompleted: false, }); + }); return; } /** */ async function forgetKnownBankAccounts( - ws: InternalWalletState, + wex: WalletExecutionContext, payto: string, ): Promise<void> { - await ws.db - .mktx((x) => [x.bankAccounts]) - .runReadWrite(async (tx) => { - const account = await tx.bankAccounts.get(payto); - if (!account) { - throw Error(`account not found: ${payto}`); - } - tx.bankAccounts.delete(account.uri); - }); + await wex.db.runReadWriteTx({ storeNames: ["bankAccounts"] }, async (tx) => { + const account = await tx.bankAccounts.get(payto); + if (!account) { + throw Error(`account not found: ${payto}`); + } + tx.bankAccounts.delete(account.uri); + }); return; } async function setCoinSuspended( - ws: InternalWalletState, + wex: WalletExecutionContext, coinPub: string, suspended: boolean, ): Promise<void> { - await ws.db - .mktx((x) => [x.coins, x.coinAvailability]) - .runReadWrite(async (tx) => { + await wex.db.runReadWriteTx( + { storeNames: ["coins", "coinAvailability"] }, + async (tx) => { const c = await tx.coins.get(coinPub); if (!c) { logger.warn(`coin ${coinPub} not found, won't suspend`); @@ -682,18 +491,19 @@ async function setCoinSuspended( } await tx.coins.put(c); await tx.coinAvailability.put(coinAvailability); - }); + }, + ); } /** * Dump the public information of coins we have in an easy-to-process format. */ -async function dumpCoins(ws: InternalWalletState): Promise<CoinDumpJson> { +async function dumpCoins(wex: WalletExecutionContext): Promise<CoinDumpJson> { const coinsJson: CoinDumpJson = { coins: [] }; logger.info("dumping coins"); - await ws.db - .mktx((x) => [x.coins, x.denominations, x.withdrawalGroups]) - .runReadOnly(async (tx) => { + await wex.db.runReadOnlyTx( + { storeNames: ["coins", "denominations"] }, + async (tx) => { const coins = await tx.coins.iter().toArray(); for (const c of coins) { const denom = await tx.denominations.get([ @@ -713,8 +523,8 @@ async function dumpCoins(ws: InternalWalletState): Promise<CoinDumpJson> { if (cs.type == CoinSourceType.Withdraw) { withdrawalReservePub = cs.reservePub; } - const denomInfo = await ws.getDenomInfo( - ws, + const denomInfo = await getDenomInfo( + wex, tx, c.exchangeBaseUrl, c.denomPubHash, @@ -741,20 +551,22 @@ async function dumpCoins(ws: InternalWalletState): Promise<CoinDumpJson> { : undefined, }); } - }); + }, + ); return coinsJson; } /** * Get an API client from an internal wallet state object. */ -export async function getClientFromWalletState( +let id = 0; +async function getClientFromWalletState( ws: InternalWalletState, ): Promise<WalletCoreApiClient> { - let id = 0; const client: WalletCoreApiClient = { async call(op, payload): Promise<any> { - const res = await handleCoreApiRequest(ws, op, `${id++}`, payload); + id = (id + 1) % (Number.MAX_SAFE_INTEGER - 100); + const res = await handleCoreApiRequest(ws, op, String(id), payload); switch (res.type) { case "error": throw TalerError.fromUncheckedDetail(res.error); @@ -767,12 +579,12 @@ export async function getClientFromWalletState( } async function createStoredBackup( - ws: InternalWalletState, + wex: WalletExecutionContext, ): Promise<CreateStoredBackupResponse> { - const backup = await exportDb(ws.idb); - const backupsDb = await openStoredBackupsDatabase(ws.idb); + const backup = await exportDb(wex.ws.idb); + const backupsDb = await openStoredBackupsDatabase(wex.ws.idb); const name = `backup-${new Date().getTime()}`; - await backupsDb.mktxAll().runReadWrite(async (tx) => { + await backupsDb.runAllStoresReadWriteTx({}, async (tx) => { await tx.backupMeta.add({ name, }); @@ -784,13 +596,13 @@ async function createStoredBackup( } async function listStoredBackups( - ws: InternalWalletState, + wex: WalletExecutionContext, ): Promise<StoredBackupList> { const storedBackups: StoredBackupList = { storedBackups: [], }; - const backupsDb = await openStoredBackupsDatabase(ws.idb); - await backupsDb.mktxAll().runReadWrite(async (tx) => { + const backupsDb = await openStoredBackupsDatabase(wex.ws.idb); + await backupsDb.runAllStoresReadWriteTx({}, async (tx) => { await tx.backupMeta.iter().forEach((x) => { storedBackups.storedBackups.push({ name: x.name, @@ -801,24 +613,24 @@ async function listStoredBackups( } async function deleteStoredBackup( - ws: InternalWalletState, + wex: WalletExecutionContext, req: DeleteStoredBackupRequest, ): Promise<void> { - const backupsDb = await openStoredBackupsDatabase(ws.idb); - await backupsDb.mktxAll().runReadWrite(async (tx) => { + const backupsDb = await openStoredBackupsDatabase(wex.ws.idb); + await backupsDb.runAllStoresReadWriteTx({}, async (tx) => { await tx.backupData.delete(req.name); await tx.backupMeta.delete(req.name); }); } async function recoverStoredBackup( - ws: InternalWalletState, + wex: WalletExecutionContext, req: RecoverStoredBackupRequest, ): Promise<void> { logger.info(`Recovering stored backup ${req.name}`); const { name } = req; - const backupsDb = await openStoredBackupsDatabase(ws.idb); - const bd = await backupsDb.mktxAll().runReadWrite(async (tx) => { + const backupsDb = await openStoredBackupsDatabase(wex.ws.idb); + const bd = await backupsDb.runAllStoresReadWriteTx({}, async (tx) => { const backupMeta = tx.backupMeta.get(name); if (!backupMeta) { throw Error("backup not found"); @@ -830,12 +642,12 @@ async function recoverStoredBackup( return backupData; }); logger.info(`backup found, now importing`); - await importDb(ws.db.idbHandle(), bd); + await importDb(wex.db.idbHandle(), bd); logger.info(`import done`); } async function handlePrepareWithdrawExchange( - ws: InternalWalletState, + wex: WalletExecutionContext, req: PrepareWithdrawExchangeRequest, ): Promise<PrepareWithdrawExchangeResponse> { const parsedUri = parseTalerUri(req.talerUri); @@ -843,8 +655,8 @@ async function handlePrepareWithdrawExchange( throw Error("expected a taler://withdraw-exchange URI"); } const exchangeBaseUrl = parsedUri.exchangeBaseUrl; - const exchange = await fetchFreshExchange(ws, exchangeBaseUrl); - if (exchange.masterPub != parsedUri.exchangePub) { + const exchange = await fetchFreshExchange(wex, exchangeBaseUrl); + if (parsedUri.exchangePub && exchange.masterPub != parsedUri.exchangePub) { throw Error("mismatch of exchange master public key (URI vs actual)"); } if (parsedUri.amount) { @@ -860,14 +672,27 @@ async function handlePrepareWithdrawExchange( } /** + * Response returned from the pending operations API. + * + * @deprecated this is a placeholder for the response type of a deprecated wallet-core request. + */ +export interface PendingOperationsResponse { + /** + * List of pending operations. + */ + pendingOperations: any[]; +} + +/** * Implementation of the "wallet-core" API. */ -async function dispatchRequestInternal<Op extends WalletApiOperation>( - ws: InternalWalletState, +async function dispatchRequestInternal( + wex: WalletExecutionContext, + cts: CancellationToken.Source, operation: WalletApiOperation, payload: unknown, ): Promise<WalletCoreResponseType<typeof operation>> { - if (!ws.initCalled && operation !== WalletApiOperation.InitWallet) { + if (!wex.ws.initCalled && operation !== WalletApiOperation.InitWallet) { throw Error( `wallet must be initialized before running operation ${operation}`, ); @@ -876,56 +701,95 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>( // definitions we already have? switch (operation) { case WalletApiOperation.CreateStoredBackup: - return createStoredBackup(ws); + return createStoredBackup(wex); case WalletApiOperation.DeleteStoredBackup: { const req = codecForDeleteStoredBackupRequest().decode(payload); - await deleteStoredBackup(ws, req); + await deleteStoredBackup(wex, req); return {}; } case WalletApiOperation.ListStoredBackups: - return listStoredBackups(ws); + return listStoredBackups(wex); case WalletApiOperation.RecoverStoredBackup: { const req = codecForRecoverStoredBackupRequest().decode(payload); - await recoverStoredBackup(ws, req); + await recoverStoredBackup(wex, req); return {}; } + case WalletApiOperation.SetWalletRunConfig: case WalletApiOperation.InitWallet: { - logger.trace("initializing wallet"); - ws.initCalled = true; - if (ws.config.testing.skipDefaults) { + const req = codecForInitRequest().decode(payload); + + logger.info(`init request: ${j2s(req)}`); + + if (wex.ws.initCalled) { + logger.info("initializing wallet (repeat initialization)"); + } else { + logger.info("initializing wallet (first initialization)"); + } + + // Write to the DB to make sure that we're failing early in + // case the DB is not writeable. + try { + await wex.db.runReadWriteTx({ storeNames: ["config"] }, async (tx) => { + tx.config.put({ + key: ConfigRecordKey.LastInitInfo, + value: timestampProtocolToDb(TalerProtocolTimestamp.now()), + }); + }); + } catch (e) { + logger.error("error writing to database during initialization"); + throw TalerError.fromDetail(TalerErrorCode.WALLET_DB_UNAVAILABLE, { + innerError: getErrorDetailFromException(e), + }); + } + + wex.ws.initWithConfig(applyRunConfigDefaults(req.config)); + + if (wex.ws.config.testing.skipDefaults) { logger.trace("skipping defaults"); } else { logger.trace("filling defaults"); - await fillDefaults(ws); + await fillDefaults(wex); } const resp: InitResponse = { - versionInfo: getVersion(ws), + versionInfo: getVersion(wex), }; + + // After initialization, task loop should run. + await wex.taskScheduler.ensureRunning(); + + wex.ws.initCalled = true; return resp; } case WalletApiOperation.WithdrawTestkudos: { - await withdrawTestBalance(ws, { + await withdrawTestBalance(wex, { amount: "TESTKUDOS:10" as AmountString, corebankApiBaseUrl: "https://bank.test.taler.net/", exchangeBaseUrl: "https://exchange.test.taler.net/", }); return { - versionInfo: getVersion(ws), + versionInfo: getVersion(wex), }; } case WalletApiOperation.WithdrawTestBalance: { const req = codecForWithdrawTestBalance().decode(payload); - await withdrawTestBalance(ws, req); + await withdrawTestBalance(wex, req); return {}; } + case WalletApiOperation.TestingListTaskForTransaction: { + const req = + codecForTestingListTasksForTransactionRequest().decode(payload); + return { + taskIdList: listTaskForTransactionId(req.transactionId), + } satisfies TestingListTasksForTransactionsResponse; + } case WalletApiOperation.RunIntegrationTest: { const req = codecForIntegrationTestArgs().decode(payload); - await runIntegrationTest(ws, req); + await runIntegrationTest(wex, req); return {}; } case WalletApiOperation.RunIntegrationTestV2: { const req = codecForIntegrationTestV2Args().decode(payload); - await runIntegrationTest2(ws, req); + await runIntegrationTest2(wex, req); return {}; } case WalletApiOperation.ValidateIban: { @@ -938,66 +802,75 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>( } case WalletApiOperation.TestPay: { const req = codecForTestPayArgs().decode(payload); - return await testPay(ws, req); + return await testPay(wex, req); } case WalletApiOperation.GetTransactions: { const req = codecForTransactionsRequest().decode(payload); - return await getTransactions(ws, req); + return await getTransactions(wex, req); } case WalletApiOperation.GetTransactionById: { const req = codecForTransactionByIdRequest().decode(payload); - return await getTransactionById(ws, req); + return await getTransactionById(wex, req); + } + case WalletApiOperation.GetWithdrawalTransactionByUri: { + const req = codecForGetWithdrawalDetailsForUri().decode(payload); + return await getWithdrawalTransactionByUri(wex, req); } case WalletApiOperation.AddExchange: { const req = codecForAddExchangeRequest().decode(payload); - await fetchFreshExchange(ws, req.exchangeBaseUrl, { + await fetchFreshExchange(wex, req.exchangeBaseUrl, { expectedMasterPub: req.masterPub, }); return {}; } + case WalletApiOperation.TestingPing: { + return {}; + } case WalletApiOperation.UpdateExchangeEntry: { const req = codecForUpdateExchangeEntryRequest().decode(payload); - await fetchFreshExchange(ws, req.exchangeBaseUrl, { - forceUpdate: true, + await fetchFreshExchange(wex, req.exchangeBaseUrl, { + forceUpdate: !!req.force, }); return {}; } + case WalletApiOperation.TestingGetDenomStats: { + const req = codecForTestingGetDenomStatsRequest().decode(payload); + const denomStats: TestingGetDenomStatsResponse = { + numKnown: 0, + numLost: 0, + numOffered: 0, + }; + await wex.db.runReadOnlyTx( + { storeNames: ["denominations"] }, + async (tx) => { + const denoms = + await tx.denominations.indexes.byExchangeBaseUrl.getAll( + req.exchangeBaseUrl, + ); + for (const d of denoms) { + denomStats.numKnown++; + if (d.isOffered) { + denomStats.numOffered++; + } + if (d.isLost) { + denomStats.numLost++; + } + } + }, + ); + return denomStats; + } case WalletApiOperation.ListExchanges: { - return await getExchanges(ws); + return await listExchanges(wex); } case WalletApiOperation.GetExchangeEntryByUrl: { const req = codecForGetExchangeEntryByUrlRequest().decode(payload); - const exchangeEntry = await ws.db - .mktx((x) => [ - x.exchanges, - x.exchangeDetails, - x.denominations, - x.operationRetries, - ]) - .runReadOnly(async (tx) => { - const exchangeRec = await tx.exchanges.get(req.exchangeBaseUrl); - if (!exchangeRec) { - throw Error("exchange not found"); - } - const exchangeDetails = await getExchangeDetails( - tx, - exchangeRec.baseUrl, - ); - const opRetryRecord = await tx.operationRetries.get( - TaskIdentifiers.forExchangeUpdate(exchangeRec), - ); - return makeExchangeListItem( - exchangeRec, - exchangeDetails, - opRetryRecord?.lastError, - ); - }); - return exchangeEntry; + return lookupExchangeByUri(wex, req); } case WalletApiOperation.ListExchangesForScopedCurrency: { const req = codecForListExchangesForScopedCurrencyRequest().decode(payload); - const exchangesResp = await getExchanges(ws); + const exchangesResp = await listExchanges(wex); const result: ExchangesShortListResponse = { exchanges: [], }; @@ -1014,97 +887,97 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>( } case WalletApiOperation.GetExchangeDetailedInfo: { const req = codecForAddExchangeRequest().decode(payload); - return await getExchangeDetailedInfo(ws, req.exchangeBaseUrl); + return await getExchangeDetailedInfo(wex, req.exchangeBaseUrl); } case WalletApiOperation.ListKnownBankAccounts: { const req = codecForListKnownBankAccounts().decode(payload); - return await listKnownBankAccounts(ws, req.currency); + return await listKnownBankAccounts(wex, req.currency); } case WalletApiOperation.AddKnownBankAccounts: { const req = codecForAddKnownBankAccounts().decode(payload); - await addKnownBankAccounts(ws, req.payto, req.alias, req.currency); + await addKnownBankAccounts(wex, req.payto, req.alias, req.currency); return {}; } case WalletApiOperation.ForgetKnownBankAccounts: { const req = codecForForgetKnownBankAccounts().decode(payload); - await forgetKnownBankAccounts(ws, req.payto); + await forgetKnownBankAccounts(wex, req.payto); return {}; } case WalletApiOperation.GetWithdrawalDetailsForUri: { const req = codecForGetWithdrawalDetailsForUri().decode(payload); - return await getWithdrawalDetailsForUri(ws, req.talerWithdrawUri); + return await getWithdrawalDetailsForUri(wex, req.talerWithdrawUri, { + restrictAge: req.restrictAge, + }); } case WalletApiOperation.AcceptManualWithdrawal: { - const req = codecForAcceptManualWithdrawalRequet().decode(payload); - const res = await createManualWithdrawal(ws, { + const req = codecForAcceptManualWithdrawalRequest().decode(payload); + const res = await createManualWithdrawal(wex, { amount: Amounts.parseOrThrow(req.amount), exchangeBaseUrl: req.exchangeBaseUrl, restrictAge: req.restrictAge, + forceReservePriv: req.forceReservePriv, }); return res; } case WalletApiOperation.GetWithdrawalDetailsForAmount: { const req = codecForGetWithdrawalDetailsForAmountRequest().decode(payload); - const wi = await getExchangeWithdrawalInfo( - ws, - req.exchangeBaseUrl, - Amounts.parseOrThrow(req.amount), - req.restrictAge, - ); - let numCoins = 0; - for (const x of wi.selectedDenoms.selectedDenoms) { - numCoins += x.count; - } - const amt = Amounts.parseOrThrow(req.amount); - const resp: WithdrawalDetailsForAmount = { - amountRaw: req.amount, - amountEffective: Amounts.stringify(wi.selectedDenoms.totalCoinValue), - paytoUris: wi.exchangePaytoUris, - tosAccepted: wi.termsOfServiceAccepted, - ageRestrictionOptions: wi.ageRestrictionOptions, - withdrawalAccountsList: wi.exchangeCreditAccountDetails, - numCoins, - // FIXME: Once we have proper scope info support, return correct info here. - scopeInfo: { - type: ScopeType.Exchange, - currency: amt.currency, - url: req.exchangeBaseUrl, - }, - }; + const resp = await getWithdrawalDetailsForAmount(wex, cts, req); return resp; } case WalletApiOperation.GetBalances: { - return await getBalances(ws); + return await getBalances(wex); } case WalletApiOperation.GetBalanceDetail: { const req = codecForGetBalanceDetailRequest().decode(payload); - return await getBalanceDetail(ws, req); + return await getBalanceDetail(wex, req); } case WalletApiOperation.GetUserAttentionRequests: { const req = codecForUserAttentionsRequest().decode(payload); - return await getUserAttentions(ws, req); + return await getUserAttentions(wex, req); } case WalletApiOperation.MarkAttentionRequestAsRead: { const req = codecForUserAttentionByIdRequest().decode(payload); - return await markAttentionRequestAsRead(ws, req); + return await markAttentionRequestAsRead(wex, req); } case WalletApiOperation.GetUserAttentionUnreadCount: { const req = codecForUserAttentionsRequest().decode(payload); - return await getUserAttentionsUnreadCount(ws, req); + return await getUserAttentionsUnreadCount(wex, req); } case WalletApiOperation.GetPendingOperations: { - return await getPendingOperations(ws); + // FIXME: Eventually remove the handler after deprecation period. + return { + pendingOperations: [], + } satisfies PendingOperationsResponse; } case WalletApiOperation.SetExchangeTosAccepted: { const req = codecForAcceptExchangeTosRequest().decode(payload); - await acceptExchangeTermsOfService(ws, req.exchangeBaseUrl, req.etag); + await acceptExchangeTermsOfService(wex, req.exchangeBaseUrl); + return {}; + } + case WalletApiOperation.SetExchangeTosForgotten: { + const req = codecForAcceptExchangeTosRequest().decode(payload); + await forgetExchangeTermsOfService(wex, req.exchangeBaseUrl); return {}; } case WalletApiOperation.AcceptBankIntegratedWithdrawal: { const req = codecForAcceptBankIntegratedWithdrawalRequest().decode(payload); - return await acceptWithdrawalFromUri(ws, { + return await acceptWithdrawalFromUri(wex, { + selectedExchange: req.exchangeBaseUrl, + talerWithdrawUri: req.talerWithdrawUri, + forcedDenomSel: req.forcedDenomSel, + restrictAge: req.restrictAge, + }); + } + case WalletApiOperation.ConfirmWithdrawal: { + const req = codecForConfirmWithdrawalRequestRequest().decode(payload); + return confirmWithdrawal(wex, req.transactionId); + } + case WalletApiOperation.PrepareBankIntegratedWithdrawal: { + const req = + codecForPrepareBankIntegratedWithdrawalRequest().decode(payload); + return prepareBankIntegratedWithdrawal(wex, { selectedExchange: req.exchangeBaseUrl, talerWithdrawUri: req.talerWithdrawUri, forcedDenomSel: req.forcedDenomSel, @@ -1114,7 +987,7 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>( case WalletApiOperation.GetExchangeTos: { const req = codecForGetExchangeTosRequest().decode(payload); return getExchangeTos( - ws, + wex, req.exchangeBaseUrl, req.acceptedFormat, req.acceptLanguage, @@ -1122,168 +995,130 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>( } case WalletApiOperation.GetContractTermsDetails: { const req = codecForGetContractTermsDetails().decode(payload); - return getContractTermsDetails(ws, req.proposalId); + if (req.proposalId) { + // FIXME: deprecated path + return getContractTermsDetails(wex, req.proposalId); + } + if (req.transactionId) { + const parsedTx = parseTransactionIdentifier(req.transactionId); + if (parsedTx?.tag === TransactionType.Payment) { + return getContractTermsDetails(wex, parsedTx.proposalId); + } + throw Error("transactionId is not a payment transaction"); + } + throw Error("transactionId missing"); } case WalletApiOperation.RetryPendingNow: { - // FIXME: Should we reset all operation retries here? - await runPending(ws); + logger.error("retryPendingNow currently not implemented"); return {}; } case WalletApiOperation.SharePayment: { const req = codecForSharePaymentRequest().decode(payload); - return await sharePayment(ws, req.merchantBaseUrl, req.orderId); + return await sharePayment(wex, req.merchantBaseUrl, req.orderId); } case WalletApiOperation.PrepareWithdrawExchange: { const req = codecForPrepareWithdrawExchangeRequest().decode(payload); - return handlePrepareWithdrawExchange(ws, req); + return handlePrepareWithdrawExchange(wex, req); } case WalletApiOperation.PreparePayForUri: { const req = codecForPreparePayRequest().decode(payload); - return await preparePayForUri(ws, req.talerPayUri); + return await preparePayForUri(wex, req.talerPayUri); } case WalletApiOperation.PreparePayForTemplate: { const req = codecForPreparePayTemplateRequest().decode(payload); - const url = parsePayTemplateUri(req.talerPayTemplateUri); - const templateDetails: MerchantUsingTemplateDetails = {}; - if (!url) { - throw Error("invalid taler-template URI"); - } - if ( - url.templateParams.amount !== undefined && - typeof url.templateParams.amount === "string" - ) { - templateDetails.amount = (req.templateParams.amount ?? - url.templateParams.amount) as AmountString | undefined; - } - if ( - url.templateParams.summary !== undefined && - typeof url.templateParams.summary === "string" - ) { - templateDetails.summary = - req.templateParams.summary ?? url.templateParams.summary; - } - const reqUrl = new URL( - `templates/${url.templateId}`, - url.merchantBaseUrl, - ); - const httpReq = await ws.http.fetch(reqUrl.href, { - method: "POST", - body: templateDetails, - }); - const resp = await readSuccessResponseJsonOrThrow( - httpReq, - codecForMerchantPostOrderResponse(), - ); - - const payUri = constructPayUri( - url.merchantBaseUrl, - resp.order_id, - "", - resp.token, - ); - - return await preparePayForUri(ws, payUri); + return preparePayForTemplate(wex, req); } case WalletApiOperation.ConfirmPay: { const req = codecForConfirmPayRequest().decode(payload); - let proposalId; + let transactionId; if (req.proposalId) { // legacy client support - proposalId = req.proposalId; + transactionId = constructTransactionIdentifier({ + tag: TransactionType.Payment, + proposalId: req.proposalId, + }); } else if (req.transactionId) { - const txIdParsed = parseTransactionIdentifier(req.transactionId); - if (txIdParsed?.tag != TransactionType.Payment) { - throw Error("payment transaction ID required"); - } - proposalId = txIdParsed.proposalId; + transactionId = req.transactionId; } else { throw Error("transactionId or (deprecated) proposalId required"); } - return await confirmPay(ws, proposalId, req.sessionId); + return await confirmPay(wex, transactionId, req.sessionId); } case WalletApiOperation.AbortTransaction: { const req = codecForAbortTransaction().decode(payload); - await abortTransaction(ws, req.transactionId); + await abortTransaction(wex, req.transactionId); return {}; } case WalletApiOperation.SuspendTransaction: { const req = codecForSuspendTransaction().decode(payload); - await suspendTransaction(ws, req.transactionId); + await suspendTransaction(wex, req.transactionId); return {}; } + case WalletApiOperation.GetActiveTasks: { + const allTasksId = wex.taskScheduler.getActiveTasks(); + + const tasksInfo = await Promise.all( + allTasksId.map(async (id) => { + return await wex.db.runReadOnlyTx( + { storeNames: ["operationRetries"] }, + async (tx) => { + return tx.operationRetries.get(id); + }, + ); + }), + ); + + const tasks = allTasksId.map((taskId, i): ActiveTask => { + const transaction = convertTaskToTransactionId(taskId); + const d = tasksInfo[i]; + + const firstTry = !d + ? undefined + : timestampAbsoluteFromDb(d.retryInfo.firstTry); + const nextTry = !d + ? undefined + : timestampAbsoluteFromDb(d.retryInfo.nextRetry); + const counter = d?.retryInfo.retryCounter; + const lastError = d?.lastError; + + return { + taskId: taskId, + retryCounter: counter, + firstTry, + nextTry, + lastError, + transaction, + }; + }); + return { tasks }; + } case WalletApiOperation.FailTransaction: { const req = codecForFailTransactionRequest().decode(payload); - await failTransaction(ws, req.transactionId); + await failTransaction(wex, req.transactionId); return {}; } case WalletApiOperation.ResumeTransaction: { const req = codecForResumeTransaction().decode(payload); - await resumeTransaction(ws, req.transactionId); + await resumeTransaction(wex, req.transactionId); return {}; } case WalletApiOperation.DumpCoins: { - return await dumpCoins(ws); + return await dumpCoins(wex); } case WalletApiOperation.SetCoinSuspended: { const req = codecForSetCoinSuspendedRequest().decode(payload); - await setCoinSuspended(ws, req.coinPub, req.suspended); + await setCoinSuspended(wex, req.coinPub, req.suspended); return {}; } case WalletApiOperation.TestingGetSampleTransactions: return { transactions: sampleWalletCoreTransactions }; case WalletApiOperation.ForceRefresh: { const req = codecForForceRefreshRequest().decode(payload); - if (req.coinPubList.length == 0) { - throw Error("refusing to create empty refresh group"); - } - const refreshGroupId = await ws.db - .mktx((x) => [ - x.refreshGroups, - x.coinAvailability, - x.denominations, - x.coins, - ]) - .runReadWrite(async (tx) => { - let coinPubs: CoinRefreshRequest[] = []; - for (const c of req.coinPubList) { - const coin = await tx.coins.get(c); - if (!coin) { - throw Error(`coin (pubkey ${c}) not found`); - } - const denom = await ws.getDenomInfo( - ws, - tx, - coin.exchangeBaseUrl, - coin.denomPubHash, - ); - checkDbInvariant(!!denom); - coinPubs.push({ - coinPub: c, - amount: denom?.value, - }); - } - return await createRefreshGroup( - ws, - tx, - Amounts.currencyOf(coinPubs[0].amount), - coinPubs, - RefreshReason.Manual, - ); - }); - processRefreshGroup(ws, refreshGroupId.refreshGroupId).catch((x) => { - logger.error(x); - }); - return { - refreshGroupId, - }; - } - case WalletApiOperation.PrepareReward: { - const req = codecForPrepareRewardRequest().decode(payload); - return await prepareTip(ws, req.talerRewardUri); + return await forceRefresh(wex, req); } case WalletApiOperation.StartRefundQueryForUri: { const req = codecForPrepareRefundRequest().decode(payload); - return await startRefundQueryForUri(ws, req.talerRefundUri); + return await startRefundQueryForUri(wex, req.talerRefundUri); } case WalletApiOperation.StartRefundQuery: { const req = codecForStartRefundQueryRequest().decode(payload); @@ -1294,38 +1129,30 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>( if (txIdParsed.tag !== TransactionType.Payment) { throw Error("expected payment transaction ID"); } - await startQueryRefund(ws, txIdParsed.proposalId); + await startQueryRefund(wex, txIdParsed.proposalId); return {}; } - case WalletApiOperation.AcceptReward: { - const req = codecForAcceptTipRequest().decode(payload); - return await acceptTip(ws, req.walletRewardId); - } case WalletApiOperation.AddBackupProvider: { const req = codecForAddBackupProviderRequest().decode(payload); - return await addBackupProvider(ws, req); + return await addBackupProvider(wex, req); } case WalletApiOperation.RunBackupCycle: { const req = codecForRunBackupCycle().decode(payload); - await runBackupCycle(ws, req); + await runBackupCycle(wex, req); return {}; } case WalletApiOperation.RemoveBackupProvider: { const req = codecForRemoveBackupProvider().decode(payload); - await removeBackupProvider(ws, req); + await removeBackupProvider(wex, req); return {}; } case WalletApiOperation.ExportBackupRecovery: { - const resp = await getBackupRecovery(ws); + const resp = await getBackupRecovery(wex); return resp; } case WalletApiOperation.TestingWaitTransactionState: { const req = payload as TestingWaitTransactionRequest; - await waitTransactionState(ws, req.transactionId, req.txState); - return {}; - } - case WalletApiOperation.TestingWaitTasksProcessed: { - await waitUntilTasksProcessed(ws); + await waitTransactionState(wex, req.transactionId, req.txState); return {}; } case WalletApiOperation.GetCurrencySpecification: { @@ -1361,12 +1188,12 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>( } const defaultResp: GetCurrencySpecificationResponse = { currencySpecification: { - name: "Unknown", + name: req.scope.currency, num_fractional_input_digits: 2, num_fractional_normal_digits: 2, num_fractional_trailing_zero_digits: 2, alt_unit_names: { - "0": "??", + "0": req.scope.currency, }, }, }; @@ -1374,7 +1201,7 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>( } case WalletApiOperation.ImportBackupRecovery: { const req = codecForAny().decode(payload); - await loadBackupRecovery(ws, req); + await loadBackupRecovery(wex, req); return {}; } // case WalletApiOperation.GetPlanForOperation: { @@ -1383,31 +1210,31 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>( // } case WalletApiOperation.ConvertDepositAmount: { const req = codecForConvertAmountRequest.decode(payload); - return await convertDepositAmount(ws, req); + return await convertDepositAmount(wex, req); } case WalletApiOperation.GetMaxDepositAmount: { const req = codecForGetAmountRequest.decode(payload); - return await getMaxDepositAmount(ws, req); + return await getMaxDepositAmount(wex, req); } case WalletApiOperation.ConvertPeerPushAmount: { const req = codecForConvertAmountRequest.decode(payload); - return await convertPeerPushAmount(ws, req); + return await convertPeerPushAmount(wex, req); } case WalletApiOperation.GetMaxPeerPushAmount: { const req = codecForGetAmountRequest.decode(payload); - return await getMaxPeerPushAmount(ws, req); + return await getMaxPeerPushAmount(wex, req); } case WalletApiOperation.ConvertWithdrawalAmount: { const req = codecForConvertAmountRequest.decode(payload); - return await convertWithdrawalAmount(ws, req); + return await convertWithdrawalAmount(wex, req); } case WalletApiOperation.GetBackupInfo: { - const resp = await getBackupInfo(ws); + const resp = await getBackupInfo(wex); return resp; } case WalletApiOperation.PrepareDeposit: { const req = codecForPrepareDepositRequest().decode(payload); - return await prepareDepositGroup(ws, req); + return await checkDepositGroup(wex, req); } case WalletApiOperation.GenerateDepositGroupTxId: return { @@ -1415,99 +1242,282 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>( }; case WalletApiOperation.CreateDepositGroup: { const req = codecForCreateDepositGroupRequest().decode(payload); - return await createDepositGroup(ws, req); + return await createDepositGroup(wex, req); } case WalletApiOperation.DeleteTransaction: { const req = codecForDeleteTransactionRequest().decode(payload); - await deleteTransaction(ws, req.transactionId); + await deleteTransaction(wex, req.transactionId); return {}; } case WalletApiOperation.RetryTransaction: { const req = codecForRetryTransactionRequest().decode(payload); - await retryTransaction(ws, req.transactionId); + await retryTransaction(wex, req.transactionId); return {}; } case WalletApiOperation.SetWalletDeviceId: { const req = codecForSetWalletDeviceIdRequest().decode(payload); - await setWalletDeviceId(ws, req.walletDeviceId); + await setWalletDeviceId(wex, req.walletDeviceId); return {}; } - case WalletApiOperation.ListCurrencies: { - // FIXME: Remove / change to scoped currency approach. - return { - trustedAuditors: [], - trustedExchanges: [], - }; - } case WalletApiOperation.TestCrypto: { - return await ws.cryptoApi.hashString({ str: "hello world" }); + return await wex.cryptoApi.hashString({ str: "hello world" }); } - case WalletApiOperation.ClearDb: - await clearDatabase(ws.db.idbHandle()); + case WalletApiOperation.ClearDb: { + wex.ws.clearAllCaches(); + await clearDatabase(wex.db.idbHandle()); return {}; + } case WalletApiOperation.Recycle: { throw Error("not implemented"); return {}; } case WalletApiOperation.ExportDb: { - const dbDump = await exportDb(ws.idb); + const dbDump = await exportDb(wex.ws.idb); return dbDump; } + case WalletApiOperation.ListGlobalCurrencyExchanges: { + const resp: ListGlobalCurrencyExchangesResponse = { + exchanges: [], + }; + await wex.db.runReadOnlyTx( + { storeNames: ["globalCurrencyExchanges"] }, + async (tx) => { + const gceList = await tx.globalCurrencyExchanges.iter().toArray(); + for (const gce of gceList) { + resp.exchanges.push({ + currency: gce.currency, + exchangeBaseUrl: gce.exchangeBaseUrl, + exchangeMasterPub: gce.exchangeMasterPub, + }); + } + }, + ); + return resp; + } + case WalletApiOperation.ListGlobalCurrencyAuditors: { + const resp: ListGlobalCurrencyAuditorsResponse = { + auditors: [], + }; + await wex.db.runReadOnlyTx( + { storeNames: ["globalCurrencyAuditors"] }, + async (tx) => { + const gcaList = await tx.globalCurrencyAuditors.iter().toArray(); + for (const gca of gcaList) { + resp.auditors.push({ + currency: gca.currency, + auditorBaseUrl: gca.auditorBaseUrl, + auditorPub: gca.auditorPub, + }); + } + }, + ); + return resp; + } + case WalletApiOperation.AddGlobalCurrencyExchange: { + const req = codecForAddGlobalCurrencyExchangeRequest().decode(payload); + await wex.db.runReadWriteTx( + { storeNames: ["globalCurrencyExchanges"] }, + async (tx) => { + const key = [ + req.currency, + req.exchangeBaseUrl, + req.exchangeMasterPub, + ]; + const existingRec = + await tx.globalCurrencyExchanges.indexes.byCurrencyAndUrlAndPub.get( + key, + ); + if (existingRec) { + return; + } + wex.ws.exchangeCache.clear(); + await tx.globalCurrencyExchanges.add({ + currency: req.currency, + exchangeBaseUrl: req.exchangeBaseUrl, + exchangeMasterPub: req.exchangeMasterPub, + }); + }, + ); + return {}; + } + case WalletApiOperation.RemoveGlobalCurrencyExchange: { + const req = codecForRemoveGlobalCurrencyExchangeRequest().decode(payload); + await wex.db.runReadWriteTx( + { storeNames: ["globalCurrencyExchanges"] }, + async (tx) => { + const key = [ + req.currency, + req.exchangeBaseUrl, + req.exchangeMasterPub, + ]; + const existingRec = + await tx.globalCurrencyExchanges.indexes.byCurrencyAndUrlAndPub.get( + key, + ); + if (!existingRec) { + return; + } + wex.ws.exchangeCache.clear(); + checkDbInvariant(!!existingRec.id); + await tx.globalCurrencyExchanges.delete(existingRec.id); + }, + ); + return {}; + } + case WalletApiOperation.AddGlobalCurrencyAuditor: { + const req = codecForAddGlobalCurrencyAuditorRequest().decode(payload); + await wex.db.runReadWriteTx( + { storeNames: ["globalCurrencyAuditors"] }, + async (tx) => { + const key = [req.currency, req.auditorBaseUrl, req.auditorPub]; + const existingRec = + await tx.globalCurrencyAuditors.indexes.byCurrencyAndUrlAndPub.get( + key, + ); + if (existingRec) { + return; + } + await tx.globalCurrencyAuditors.add({ + currency: req.currency, + auditorBaseUrl: req.auditorBaseUrl, + auditorPub: req.auditorPub, + }); + wex.ws.exchangeCache.clear(); + }, + ); + return {}; + } + case WalletApiOperation.TestingWaitTasksDone: { + await waitTasksDone(wex); + return {}; + } + case WalletApiOperation.RemoveGlobalCurrencyAuditor: { + const req = codecForRemoveGlobalCurrencyAuditorRequest().decode(payload); + await wex.db.runReadWriteTx( + { storeNames: ["globalCurrencyAuditors"] }, + async (tx) => { + const key = [req.currency, req.auditorBaseUrl, req.auditorPub]; + const existingRec = + await tx.globalCurrencyAuditors.indexes.byCurrencyAndUrlAndPub.get( + key, + ); + if (!existingRec) { + return; + } + checkDbInvariant(!!existingRec.id); + await tx.globalCurrencyAuditors.delete(existingRec.id); + wex.ws.exchangeCache.clear(); + }, + ); + return {}; + } case WalletApiOperation.ImportDb: { const req = codecForImportDbRequest().decode(payload); - await importDb(ws.db.idbHandle(), req.dump); + await importDb(wex.db.idbHandle(), req.dump); return []; } case WalletApiOperation.CheckPeerPushDebit: { const req = codecForCheckPeerPushDebitRequest().decode(payload); - return await checkPeerPushDebit(ws, req); + return await checkPeerPushDebit(wex, req); } case WalletApiOperation.InitiatePeerPushDebit: { const req = codecForInitiatePeerPushDebitRequest().decode(payload); - return await initiatePeerPushDebit(ws, req); + return await initiatePeerPushDebit(wex, req); } case WalletApiOperation.PreparePeerPushCredit: { const req = codecForPreparePeerPushCreditRequest().decode(payload); - return await preparePeerPushCredit(ws, req); + return await preparePeerPushCredit(wex, req); } case WalletApiOperation.ConfirmPeerPushCredit: { const req = codecForConfirmPeerPushPaymentRequest().decode(payload); - return await confirmPeerPushCredit(ws, req); + return await confirmPeerPushCredit(wex, req); } case WalletApiOperation.CheckPeerPullCredit: { const req = codecForPreparePeerPullPaymentRequest().decode(payload); - return await checkPeerPullPaymentInitiation(ws, req); + return await checkPeerPullPaymentInitiation(wex, req); } case WalletApiOperation.InitiatePeerPullCredit: { const req = codecForInitiatePeerPullPaymentRequest().decode(payload); - return await initiatePeerPullPayment(ws, req); + return await initiatePeerPullPayment(wex, req); } case WalletApiOperation.PreparePeerPullDebit: { const req = codecForCheckPeerPullPaymentRequest().decode(payload); - return await preparePeerPullDebit(ws, req); + return await preparePeerPullDebit(wex, req); } case WalletApiOperation.ConfirmPeerPullDebit: { const req = codecForAcceptPeerPullPaymentRequest().decode(payload); - return await confirmPeerPullDebit(ws, req); + return await confirmPeerPullDebit(wex, req); } case WalletApiOperation.ApplyDevExperiment: { const req = codecForApplyDevExperiment().decode(payload); - await applyDevExperiment(ws, req.devExperimentUri); + await applyDevExperiment(wex, req.devExperimentUri); + return {}; + } + case WalletApiOperation.Shutdown: { + wex.ws.stop(); return {}; } case WalletApiOperation.GetVersion: { - return getVersion(ws); + return getVersion(wex); } case WalletApiOperation.TestingWaitTransactionsFinal: - return await waitUntilTransactionsFinal(ws); + return await waitUntilAllTransactionsFinal(wex); case WalletApiOperation.TestingWaitRefreshesFinal: - return await waitUntilRefreshesDone(ws); + return await waitUntilRefreshesDone(wex); case WalletApiOperation.TestingSetTimetravel: { const req = codecForTestingSetTimetravelRequest().decode(payload); setDangerousTimetravel(req.offsetMs); - ws.workAvailable.trigger(); + await wex.taskScheduler.reload(); + return {}; + } + case WalletApiOperation.DeleteExchange: { + const req = codecForDeleteExchangeRequest().decode(payload); + await deleteExchange(wex, req); return {}; } + case WalletApiOperation.GetExchangeResources: { + const req = codecForGetExchangeResourcesRequest().decode(payload); + return await getExchangeResources(wex, req.exchangeBaseUrl); + } + case WalletApiOperation.CanonicalizeBaseUrl: { + const req = codecForCanonicalizeBaseUrlRequest().decode(payload); + return { + url: canonicalizeBaseUrl(req.url), + }; + } + case WalletApiOperation.TestingInfiniteTransactionLoop: { + const myDelayMs = (payload as any).delayMs ?? 5; + const shouldFetch = !!(payload as any).shouldFetch; + const doFetch = async () => { + while (1) { + const url = + "https://exchange.demo.taler.net/reserves/01PMMB9PJN0QBWAFBXV6R0KNJJMAKXCV4D6FDG0GJFDJQXGYP32G?timeout_ms=30000"; + logger.info(`fetching ${url}`); + const res = await wex.http.fetch(url); + logger.info(`fetch result ${res.status}`); + } + }; + if (shouldFetch) { + // In the background! + doFetch(); + } + let loopCount = 0; + while (true) { + logger.info(`looping test write tx, iteration ${loopCount}`); + await wex.db.runReadWriteTx({ storeNames: ["config"] }, async (tx) => { + await tx.config.put({ + key: ConfigRecordKey.TestLoopTx, + value: loopCount, + }); + }); + if (myDelayMs != 0) { + await new Promise<void>((resolve, reject) => { + setTimeout(() => resolve(), myDelayMs); + }); + } + loopCount = (loopCount + 1) % (Number.MAX_SAFE_INTEGER - 1); + } + } // default: // assertUnreachable(operation); } @@ -1520,21 +1530,62 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>( ); } -export function getVersion(ws: InternalWalletState): WalletCoreVersion { +export function getVersion(wex: WalletExecutionContext): WalletCoreVersion { const result: WalletCoreVersion = { + implementationSemver: walletCoreBuildInfo.implementationSemver, + implementationGitHash: walletCoreBuildInfo.implementationGitHash, hash: undefined, - version: WALLET_CORE_API_IMPLEMENTATION_VERSION, + version: WALLET_CORE_API_PROTOCOL_VERSION, exchange: WALLET_EXCHANGE_PROTOCOL_VERSION, merchant: WALLET_MERCHANT_PROTOCOL_VERSION, bankConversionApiRange: WALLET_BANK_CONVERSION_API_PROTOCOL_VERSION, bankIntegrationApiRange: WALLET_BANK_INTEGRATION_PROTOCOL_VERSION, corebankApiRange: WALLET_COREBANK_API_PROTOCOL_VERSION, bank: WALLET_BANK_INTEGRATION_PROTOCOL_VERSION, - devMode: false, + devMode: wex.ws.config.testing.devModeActive, }; return result; } +export function getObservedWalletExecutionContext( + ws: InternalWalletState, + cancellationToken: CancellationToken, + oc: ObservabilityContext, +): WalletExecutionContext { + const wex: WalletExecutionContext = { + ws, + cancellationToken, + cryptoApi: observeTalerCrypto(ws.cryptoApi, oc), + db: new ObservableDbAccess(ws.db, oc), + http: new ObservableHttpClientLibrary(ws.http, oc), + taskScheduler: new ObservableTaskScheduler(ws.taskScheduler, oc), + oc, + }; + return wex; +} + +export function getNormalWalletExecutionContext( + ws: InternalWalletState, + cancellationToken: CancellationToken, + oc: ObservabilityContext, +): WalletExecutionContext { + const wex: WalletExecutionContext = { + ws, + cancellationToken, + cryptoApi: ws.cryptoApi, + db: ws.db, + get http() { + if (ws.initCalled) { + return ws.http; + } + throw Error("wallet not initialized"); + }, + taskScheduler: ws.taskScheduler, + oc, + }; + return wex; +} + /** * Handle a request to the wallet-core API. */ @@ -1544,8 +1595,48 @@ async function handleCoreApiRequest( id: string, payload: unknown, ): Promise<CoreApiResponse> { + let wex: WalletExecutionContext; + let oc: ObservabilityContext; + + const cts = CancellationToken.create(); + + if (ws.initCalled && ws.config.testing.emitObservabilityEvents) { + oc = { + observe(evt) { + ws.notify({ + type: NotificationType.RequestObservabilityEvent, + operation, + requestId: id, + event: evt, + }); + }, + }; + + wex = getObservedWalletExecutionContext(ws, cts.token, oc); + } else { + oc = { + observe(evt) {}, + }; + wex = getNormalWalletExecutionContext(ws, cts.token, oc); + } + try { - const result = await dispatchRequestInternal(ws, operation as any, payload); + const start = performanceNow(); + await ws.ensureWalletDbOpen(); + oc.observe({ + type: ObservabilityEventType.RequestStart, + }); + const result = await dispatchRequestInternal( + wex, + cts, + operation as any, + payload, + ); + const end = performanceNow(); + oc.observe({ + type: ObservabilityEventType.RequestFinishSuccess, + durationMs: Number((end - start) / 1000n / 1000n), + }); return { type: "response", operation, @@ -1557,6 +1648,9 @@ async function handleCoreApiRequest( logger.info( `finished wallet core request ${operation} with error: ${j2s(err)}`, ); + oc.observe({ + type: ObservabilityEventType.RequestFinishError, + }); return { type: "error", operation, @@ -1566,6 +1660,34 @@ async function handleCoreApiRequest( } } +export function applyRunConfigDefaults( + wcp?: PartialWalletRunConfig, +): WalletRunConfig { + return { + builtin: { + exchanges: wcp?.builtin?.exchanges ?? [ + { + exchangeBaseUrl: "https://exchange.demo.taler.net/", + currencyHint: "KUDOS", + }, + ], + }, + features: { + allowHttp: wcp?.features?.allowHttp ?? false, + }, + testing: { + denomselAllowLate: wcp?.testing?.denomselAllowLate ?? false, + devModeActive: wcp?.testing?.devModeActive ?? false, + insecureTrustExchange: wcp?.testing?.insecureTrustExchange ?? false, + preventThrottling: wcp?.testing?.preventThrottling ?? false, + skipDefaults: wcp?.testing?.skipDefaults ?? false, + emitObservabilityEvents: wcp?.testing?.emitObservabilityEvents ?? false, + }, + }; +} + +export type HttpFactory = (config: WalletRunConfig) => HttpRequestLibrary; + /** * Public handle to a running wallet. */ @@ -1575,17 +1697,15 @@ export class Wallet { private constructor( idb: IDBFactory, - http: HttpRequestLibrary, + httpFactory: HttpFactory, timer: TimerAPI, cryptoWorkerFactory: CryptoWorkerFactory, - config?: WalletConfigParameter, ) { - this.ws = new InternalWalletStateImpl( + this.ws = new InternalWalletState( idb, - http, + httpFactory, timer, cryptoWorkerFactory, - Wallet.getEffectiveConfig(config), ); } @@ -1598,61 +1718,19 @@ export class Wallet { static async create( idb: IDBFactory, - http: HttpRequestLibrary, + httpFactory: HttpFactory, timer: TimerAPI, cryptoWorkerFactory: CryptoWorkerFactory, - config?: WalletConfigParameter, ): Promise<Wallet> { - const w = new Wallet(idb, http, timer, cryptoWorkerFactory, config); + const w = new Wallet(idb, httpFactory, timer, cryptoWorkerFactory); w._client = await getClientFromWalletState(w.ws); return w; } - public static defaultConfig: Readonly<WalletConfig> = { - builtin: { - exchanges: [ - { - exchangeBaseUrl: "https://exchange.demo.taler.net/", - currencyHint: "KUDOS", - }, - ], - }, - features: { - allowHttp: false, - }, - testing: { - preventThrottling: false, - devModeActive: false, - insecureTrustExchange: false, - denomselAllowLate: false, - skipDefaults: false, - }, - }; - - static getEffectiveConfig( - param?: WalletConfigParameter, - ): Readonly<WalletConfig> { - return deepMerge(Wallet.defaultConfig, param ?? {}); - } - addNotificationListener(f: (n: WalletNotification) => void): CancelFn { return this.ws.addNotificationListener(f); } - stop(): void { - this.ws.stop(); - } - - async runPending(): Promise<void> { - await this.ws.ensureWalletDbOpen(); - return runPending(this.ws); - } - - async runTaskLoop(opts?: RetryLoopOpts): Promise<TaskLoopResult> { - await this.ws.ensureWalletDbOpen(); - return runTaskLoop(this.ws, opts); - } - async handleCoreApiRequest( operation: string, id: string, @@ -1663,49 +1741,107 @@ export class Wallet { } } +export interface DevExperimentState { + blockRefreshes?: boolean; +} + +export class Cache<T> { + private map: Map<string, [AbsoluteTime, T]> = new Map(); + + constructor( + private maxCapacity: number, + private cacheDuration: Duration, + ) {} + + get(key: string): T | undefined { + const r = this.map.get(key); + if (!r) { + return undefined; + } + + if (AbsoluteTime.isExpired(r[0])) { + this.map.delete(key); + return undefined; + } + + return r[1]; + } + + clear(): void { + this.map.clear(); + } + + put(key: string, value: T): void { + if (this.map.size > this.maxCapacity) { + this.map.clear(); + } + const expiry = AbsoluteTime.addDuration( + AbsoluteTime.now(), + this.cacheDuration, + ); + this.map.set(key, [expiry, value]); + } +} + +/** + * Implementation of triggers for the wallet DB. + */ +class WalletDbTriggerSpec implements TriggerSpec { + constructor(public ws: InternalWalletState) {} + + afterCommit(info: AfterCommitInfo): void { + if (info.mode !== "readwrite") { + return; + } + logger.info( + `in after commit callback for readwrite, modified ${j2s([ + ...info.modifiedStores, + ])}`, + ); + const modified = info.accessedStores; + if ( + modified.has(WalletStoresV1.exchanges.storeName) || + modified.has(WalletStoresV1.exchangeDetails.storeName) || + modified.has(WalletStoresV1.denominations.storeName) || + modified.has(WalletStoresV1.globalCurrencyAuditors.storeName) || + modified.has(WalletStoresV1.globalCurrencyExchanges.storeName) + ) { + this.ws.clearAllCaches(); + } + } +} + /** * Internal state of the wallet. * * This ties together all the operation implementations. */ -class InternalWalletStateImpl implements InternalWalletState { - /** - * @see {@link InternalWalletState.activeLongpoll} - */ - activeLongpoll: ActiveLongpollInfo = {}; - +export class InternalWalletState { cryptoApi: TalerCryptoInterface; cryptoDispatcher: CryptoDispatcher; - merchantInfoCache: Record<string, MerchantInfo> = {}; - readonly timerGroup: TimerGroup; workAvailable = new AsyncCondition(); stopped = false; - listeners: NotificationListener[] = []; + private listeners: NotificationListener[] = []; initCalled = false; - exchangeOps: ExchangeOperations = { - getExchangeDetails, - fetchFreshExchange, - }; - - recoupOps: RecoupOperations = { - createRecoupGroup, - }; - - merchantOps: MerchantOperations = { - getMerchantInfo, - }; + refreshCostCache: Cache<AmountJson> = new Cache( + 1000, + Duration.fromSpec({ minutes: 1 }), + ); - refreshOps: RefreshOperations = { - createRefreshGroup, - }; + denomInfoCache: Cache<DenominationInfo> = new Cache( + 1000, + Duration.fromSpec({ minutes: 1 }), + ); - // FIXME: Use an LRU cache here. - private denomCache: Record<string, DenominationInfo> = {}; + exchangeCache: Cache<ReadyExchangeSummary> = new Cache( + 1000, + Duration.fromSpec({ minutes: 1 }), + ); /** * Promises that are waiting for a particular resource. @@ -1717,153 +1853,106 @@ class InternalWalletStateImpl implements InternalWalletState { */ private resourceLocks: Set<string> = new Set(); - isTaskLoopRunning: boolean = false; + taskScheduler: TaskScheduler = new TaskSchedulerImpl(this); + + private _config: Readonly<WalletRunConfig> | undefined; - config: Readonly<WalletConfig>; + private _indexedDbHandle: IDBDatabase | undefined = undefined; - private _db: DbAccess<typeof WalletStoresV1> | undefined = undefined; + private _dbAccessHandle: DbAccess<typeof WalletStoresV1> | undefined; + + private _http: HttpRequestLibrary | undefined = undefined; get db(): DbAccess<typeof WalletStoresV1> { - if (!this._db) { + if (!this._dbAccessHandle) { + this._dbAccessHandle = this.createDbAccessHandle( + CancellationToken.CONTINUE, + ); + } + return this._dbAccessHandle; + } + + devExperimentState: DevExperimentState = {}; + + clientCancellationMap: Map<string, CancellationToken.Source> = new Map(); + + clearAllCaches(): void { + this.exchangeCache.clear(); + this.denomInfoCache.clear(); + this.refreshCostCache.clear(); + } + + initWithConfig(newConfig: WalletRunConfig): void { + this._config = newConfig; + + logger.info(`setting new config to ${j2s(newConfig)}`); + + this._http = this.httpFactory(newConfig); + + if (this.config.testing.devModeActive) { + this._http = new DevExperimentHttpLib(this.http); + } + } + + createDbAccessHandle( + cancellationToken: CancellationToken, + ): DbAccess<typeof WalletStoresV1> { + if (!this._indexedDbHandle) { throw Error("db not initialized"); } - return this._db; + return new DbAccessImpl( + this._indexedDbHandle, + WalletStoresV1, + new WalletDbTriggerSpec(this), + cancellationToken, + ); + } + + get config(): WalletRunConfig { + if (!this._config) { + throw Error("config not initialized"); + } + return this._config; + } + + get http(): HttpRequestLibrary { + if (!this._http) { + throw Error("wallet not initialized"); + } + return this._http; } constructor( public idb: IDBFactory, - public http: HttpRequestLibrary, + private httpFactory: HttpFactory, public timer: TimerAPI, cryptoWorkerFactory: CryptoWorkerFactory, - configParam: WalletConfig, ) { this.cryptoDispatcher = new CryptoDispatcher(cryptoWorkerFactory); this.cryptoApi = this.cryptoDispatcher.cryptoApi; this.timerGroup = new TimerGroup(timer); - this.config = configParam; - if (this.config.testing.devModeActive) { - this.http = new DevExperimentHttpLib(this.http); - } } async ensureWalletDbOpen(): Promise<void> { - if (this._db) { + if (this._indexedDbHandle) { return; } const myVersionChange = async (): Promise<void> => { logger.info("version change requested for Taler DB"); }; - const myDb = await openTalerDatabase(this.idb, myVersionChange); - this._db = myDb; - } - - async getTransactionState( - ws: InternalWalletState, - tx: GetReadOnlyAccess<typeof WalletStoresV1>, - transactionId: string, - ): Promise<TransactionState | undefined> { - const parsedTxId = parseTransactionIdentifier(transactionId); - if (!parsedTxId) { - throw Error("invalid tx identifier"); - } - switch (parsedTxId.tag) { - case TransactionType.Deposit: { - const rec = await tx.depositGroups.get(parsedTxId.depositGroupId); - if (!rec) { - return undefined; - } - return computeDepositTransactionStatus(rec); - } - case TransactionType.InternalWithdrawal: - case TransactionType.Withdrawal: { - const rec = await tx.withdrawalGroups.get(parsedTxId.withdrawalGroupId); - if (!rec) { - return undefined; - } - return computeWithdrawalTransactionStatus(rec); - } - case TransactionType.Payment: { - const rec = await tx.purchases.get(parsedTxId.proposalId); - if (!rec) { - return; - } - return computePayMerchantTransactionState(rec); - } - case TransactionType.Refund: { - const rec = await tx.refundGroups.get(parsedTxId.refundGroupId); - if (!rec) { - return undefined; - } - return computeRefundTransactionState(rec); - } - case TransactionType.PeerPullCredit: - const rec = await tx.peerPullCredit.get(parsedTxId.pursePub); - if (!rec) { - return undefined; - } - return computePeerPullCreditTransactionState(rec); - case TransactionType.PeerPullDebit: { - const rec = await tx.peerPullDebit.get(parsedTxId.peerPullDebitId); - if (!rec) { - return undefined; - } - return computePeerPullDebitTransactionState(rec); - } - case TransactionType.PeerPushCredit: { - const rec = await tx.peerPushCredit.get(parsedTxId.peerPushCreditId); - if (!rec) { - return undefined; - } - return computePeerPushCreditTransactionState(rec); - } - case TransactionType.PeerPushDebit: { - const rec = await tx.peerPushDebit.get(parsedTxId.pursePub); - if (!rec) { - return undefined; - } - return computePeerPushDebitTransactionState(rec); - } - case TransactionType.Refresh: { - const rec = await tx.refreshGroups.get(parsedTxId.refreshGroupId); - if (!rec) { - return undefined; - } - return computeRefreshTransactionState(rec); - } - case TransactionType.Reward: { - const rec = await tx.rewards.get(parsedTxId.walletRewardId); - if (!rec) { - return undefined; - } - return computeRewardTransactionStatus(rec); - } - default: - assertUnreachable(parsedTxId); + try { + const myDb = await openTalerDatabase(this.idb, myVersionChange); + this._indexedDbHandle = myDb; + } catch (e) { + logger.error("error writing to database during initialization"); + throw TalerError.fromDetail(TalerErrorCode.WALLET_DB_UNAVAILABLE, { + innerError: getErrorDetailFromException(e), + }); } } - async getDenomInfo( - ws: InternalWalletState, - tx: GetReadWriteAccess<{ - denominations: typeof WalletStoresV1.denominations; - }>, - exchangeBaseUrl: string, - denomPubHash: string, - ): Promise<DenominationInfo | undefined> { - const key = `${exchangeBaseUrl}:${denomPubHash}`; - const cached = this.denomCache[key]; - if (cached) { - return cached; - } - const d = await tx.denominations.get([exchangeBaseUrl, denomPubHash]); - if (d) { - return DenominationRecord.toDenomInfo(d); - } - return undefined; - } - notify(n: WalletNotification): void { - logger.trace("Notification", j2s(n)); + logger.trace(`Notification: ${j2s(n)}`); for (const l of this.listeners) { const nc = JSON.parse(JSON.stringify(n)); setTimeout(() => { @@ -1890,11 +1979,9 @@ class InternalWalletStateImpl implements InternalWalletState { this.stopped = true; this.timerGroup.stopCurrentAndFutureTimers(); this.cryptoDispatcher.stop(); - for (const key of Object.keys(this.activeLongpoll)) { - logger.trace(`cancelling active longpoll ${key}`); - this.activeLongpoll[key].cancel(); - delete this.activeLongpoll[key]; - } + this.taskScheduler.shutdown().catch((e) => { + logger.warn(`shutdown failed: ${safeStringifyException(e)}`); + }); } /** @@ -1929,48 +2016,11 @@ class InternalWalletStateImpl implements InternalWalletState { } finally { for (const token of tokens) { this.resourceLocks.delete(token); - let waiter = (this.resourceWaiters[token] ?? []).shift(); + const waiter = (this.resourceWaiters[token] ?? []).shift(); if (waiter) { waiter.resolve(); } } } } - - ensureTaskLoopRunning(): void { - if (this.isTaskLoopRunning) { - return; - } - runTaskLoop(this) - .catch((e) => { - logger.error("error running task loop"); - logger.error(`err: ${e}`); - }) - .then(() => { - logger.info("done running task loop"); - }); - } -} - -/** - * Take the full object as template, create a new result with all the values. - * Use the override object to change the values in the result - * return result - * @param full - * @param override - * @returns - */ -function deepMerge<T extends object>(full: T, override: object): T { - const keys = Object.keys(full); - const result = { ...full }; - for (const k of keys) { - // @ts-ignore - const newVal = override[k]; - if (newVal === undefined) continue; - // @ts-ignore - result[k] = - // @ts-ignore - typeof newVal === "object" ? deepMerge(full[k], newVal) : newVal; - } - return result; } |