summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/wallet.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/wallet.ts')
-rw-r--r--packages/taler-wallet-core/src/wallet.ts1862
1 files changed, 956 insertions, 906 deletions
diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts
index 2d422e59c..ea47ffad7 100644
--- a/packages/taler-wallet-core/src/wallet.ts
+++ b/packages/taler-wallet-core/src/wallet.ts
@@ -22,13 +22,16 @@
/**
* Imports.
*/
-import { IDBFactory } from "@gnu-taler/idb-bridge";
+import { IDBDatabase, IDBFactory } from "@gnu-taler/idb-bridge";
import {
AbsoluteTime,
+ ActiveTask,
+ AmountJson,
AmountString,
Amounts,
+ AsyncCondition,
+ CancellationToken,
CoinDumpJson,
- CoinRefreshRequest,
CoinStatus,
CoreApiResponse,
CreateStoredBackupResponse,
@@ -40,43 +43,55 @@ import {
InitResponse,
KnownBankAccounts,
KnownBankAccountsInfo,
+ ListGlobalCurrencyAuditorsResponse,
+ ListGlobalCurrencyExchangesResponse,
Logger,
- WithdrawalDetailsForAmount,
- MerchantUsingTemplateDetails,
NotificationType,
+ ObservabilityContext,
+ ObservabilityEventType,
+ ObservableHttpClientLibrary,
+ OpenedPromise,
+ PartialWalletRunConfig,
PrepareWithdrawExchangeRequest,
PrepareWithdrawExchangeResponse,
RecoverStoredBackupRequest,
- RefreshReason,
- ScopeType,
StoredBackupList,
TalerError,
TalerErrorCode,
+ TalerProtocolTimestamp,
TalerUriAction,
- TaskThrottler,
+ TestingGetDenomStatsResponse,
+ TestingListTasksForTransactionsResponse,
TestingWaitTransactionRequest,
- TransactionState,
+ TimerAPI,
+ TimerGroup,
TransactionType,
- URL,
ValidateIbanResponse,
WalletCoreVersion,
WalletNotification,
+ WalletRunConfig,
+ canonicalizeBaseUrl,
+ checkDbInvariant,
codecForAbortTransaction,
codecForAcceptBankIntegratedWithdrawalRequest,
codecForAcceptExchangeTosRequest,
- codecForAcceptManualWithdrawalRequet,
+ codecForAcceptManualWithdrawalRequest,
codecForAcceptPeerPullPaymentRequest,
- codecForAcceptTipRequest,
codecForAddExchangeRequest,
+ codecForAddGlobalCurrencyAuditorRequest,
+ codecForAddGlobalCurrencyExchangeRequest,
codecForAddKnownBankAccounts,
codecForAny,
codecForApplyDevExperiment,
+ codecForCanonicalizeBaseUrlRequest,
codecForCheckPeerPullPaymentRequest,
codecForCheckPeerPushDebitRequest,
codecForConfirmPayRequest,
codecForConfirmPeerPushPaymentRequest,
+ codecForConfirmWithdrawalRequestRequest,
codecForConvertAmountRequest,
codecForCreateDepositGroupRequest,
+ codecForDeleteExchangeRequest,
codecForDeleteStoredBackupRequest,
codecForDeleteTransactionRequest,
codecForFailTransactionRequest,
@@ -87,26 +102,29 @@ import {
codecForGetContractTermsDetails,
codecForGetCurrencyInfoRequest,
codecForGetExchangeEntryByUrlRequest,
+ codecForGetExchangeResourcesRequest,
codecForGetExchangeTosRequest,
codecForGetWithdrawalDetailsForAmountRequest,
codecForGetWithdrawalDetailsForUri,
codecForImportDbRequest,
+ codecForInitRequest,
codecForInitiatePeerPullPaymentRequest,
codecForInitiatePeerPushDebitRequest,
codecForIntegrationTestArgs,
codecForIntegrationTestV2Args,
codecForListExchangesForScopedCurrencyRequest,
codecForListKnownBankAccounts,
- codecForMerchantPostOrderResponse,
+ codecForPrepareBankIntegratedWithdrawalRequest,
codecForPrepareDepositRequest,
codecForPreparePayRequest,
codecForPreparePayTemplateRequest,
codecForPreparePeerPullPaymentRequest,
codecForPreparePeerPushCreditRequest,
codecForPrepareRefundRequest,
- codecForPrepareRewardRequest,
codecForPrepareWithdrawExchangeRequest,
codecForRecoverStoredBackupRequest,
+ codecForRemoveGlobalCurrencyAuditorRequest,
+ codecForRemoveGlobalCurrencyExchangeRequest,
codecForResumeTransaction,
codecForRetryTransactionRequest,
codecForSetCoinSuspendedRequest,
@@ -115,6 +133,8 @@ import {
codecForStartRefundQueryRequest,
codecForSuspendTransaction,
codecForTestPayArgs,
+ codecForTestingGetDenomStatsRequest,
+ codecForTestingListTasksForTransactionRequest,
codecForTestingSetTimetravelRequest,
codecForTransactionByIdRequest,
codecForTransactionsRequest,
@@ -123,53 +143,23 @@ import {
codecForUserAttentionsRequest,
codecForValidateIbanRequest,
codecForWithdrawTestBalance,
- constructPayUri,
- durationFromSpec,
- durationMin,
getErrorDetailFromException,
j2s,
- parsePayTemplateUri,
+ openPromise,
parsePaytoUri,
parseTalerUri,
+ performanceNow,
+ safeStringifyException,
sampleWalletCoreTransactions,
setDangerousTimetravel,
validateIban,
} from "@gnu-taler/taler-util";
import type { HttpRequestLibrary } from "@gnu-taler/taler-util/http";
-import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http";
-import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js";
-import {
- CryptoDispatcher,
- CryptoWorkerFactory,
-} from "./crypto/workers/crypto-dispatcher.js";
-import {
- CoinSourceType,
- ConfigRecordKey,
- DenominationRecord,
- WalletStoresV1,
- clearDatabase,
- exportDb,
- importDb,
- openStoredBackupsDatabase,
- openTalerDatabase,
-} from "./db.js";
-import { DevExperimentHttpLib, applyDevExperiment } from "./dev-experiments.js";
-import {
- ActiveLongpollInfo,
- CancelFn,
- ExchangeOperations,
- InternalWalletState,
- MerchantInfo,
- MerchantOperations,
- NotificationListener,
- RecoupOperations,
- RefreshOperations,
-} from "./internal-wallet-state.js";
import {
getUserAttentions,
getUserAttentionsUnreadCount,
markAttentionRequestAsRead,
-} from "./operations/attention.js";
+} from "./attention.js";
import {
addBackupProvider,
codecForAddBackupProviderRequest,
@@ -178,382 +168,186 @@ import {
getBackupInfo,
getBackupRecovery,
loadBackupRecovery,
- processBackupForProvider,
removeBackupProvider,
runBackupCycle,
setWalletDeviceId,
-} from "./operations/backup/index.js";
-import { getBalanceDetail, getBalances } from "./operations/balance.js";
+} from "./backup/index.js";
+import { getBalanceDetail, getBalances } from "./balance.js";
+import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js";
import {
- TaskIdentifiers,
- TaskRunResult,
- TaskRunResultType,
- makeExchangeListItem,
- runTaskWithErrorReporting,
-} from "./operations/common.js";
+ CryptoDispatcher,
+ CryptoWorkerFactory,
+} from "./crypto/workers/crypto-dispatcher.js";
+import {
+ CoinSourceType,
+ ConfigRecordKey,
+ DenominationRecord,
+ WalletDbReadOnlyTransaction,
+ WalletStoresV1,
+ clearDatabase,
+ exportDb,
+ importDb,
+ openStoredBackupsDatabase,
+ openTalerDatabase,
+ timestampAbsoluteFromDb,
+ timestampProtocolToDb,
+} from "./db.js";
import {
- computeDepositTransactionStatus,
+ checkDepositGroup,
createDepositGroup,
generateDepositGroupTxId,
- prepareDepositGroup,
- processDepositGroup,
-} from "./operations/deposits.js";
+} from "./deposits.js";
+import { DevExperimentHttpLib, applyDevExperiment } from "./dev-experiments.js";
import {
+ ReadyExchangeSummary,
acceptExchangeTermsOfService,
addPresetExchangeEntry,
+ deleteExchange,
fetchFreshExchange,
+ forgetExchangeTermsOfService,
getExchangeDetailedInfo,
- getExchangeDetails,
+ getExchangeResources,
getExchangeTos,
- getExchanges,
- updateExchangeFromUrlHandler,
-} from "./operations/exchanges.js";
-import { getMerchantInfo } from "./operations/merchants.js";
+ listExchanges,
+ lookupExchangeByUri,
+} from "./exchanges.js";
+import {
+ convertDepositAmount,
+ convertPeerPushAmount,
+ convertWithdrawalAmount,
+ getMaxDepositAmount,
+ getMaxPeerPushAmount,
+} from "./instructedAmountConversion.js";
+import {
+ ObservableDbAccess,
+ ObservableTaskScheduler,
+ observeTalerCrypto,
+} from "./observable-wrappers.js";
import {
- computePayMerchantTransactionState,
- computeRefundTransactionState,
confirmPay,
getContractTermsDetails,
+ preparePayForTemplate,
preparePayForUri,
- processPurchase,
sharePayment,
startQueryRefund,
startRefundQueryForUri,
-} from "./operations/pay-merchant.js";
+} from "./pay-merchant.js";
import {
checkPeerPullPaymentInitiation,
- computePeerPullCreditTransactionState,
initiatePeerPullPayment,
- processPeerPullCredit,
-} from "./operations/pay-peer-pull-credit.js";
+} from "./pay-peer-pull-credit.js";
import {
- computePeerPullDebitTransactionState,
confirmPeerPullDebit,
preparePeerPullDebit,
- processPeerPullDebit,
-} from "./operations/pay-peer-pull-debit.js";
+} from "./pay-peer-pull-debit.js";
import {
- computePeerPushCreditTransactionState,
confirmPeerPushCredit,
preparePeerPushCredit,
- processPeerPushCredit,
-} from "./operations/pay-peer-push-credit.js";
+} from "./pay-peer-push-credit.js";
import {
checkPeerPushDebit,
- computePeerPushDebitTransactionState,
initiatePeerPushDebit,
- processPeerPushDebit,
-} from "./operations/pay-peer-push-debit.js";
-import { getPendingOperations } from "./operations/pending.js";
-import { createRecoupGroup, processRecoupGroup } from "./operations/recoup.js";
+} from "./pay-peer-push-debit.js";
import {
- autoRefresh,
- computeRefreshTransactionState,
- createRefreshGroup,
- processRefreshGroup,
-} from "./operations/refresh.js";
+ AfterCommitInfo,
+ DbAccess,
+ DbAccessImpl,
+ TriggerSpec,
+} from "./query.js";
+import { forceRefresh } from "./refresh.js";
import {
- acceptTip,
- computeRewardTransactionStatus,
- prepareTip,
- processTip,
-} from "./operations/reward.js";
+ TaskScheduler,
+ TaskSchedulerImpl,
+ convertTaskToTransactionId,
+ listTaskForTransactionId,
+} from "./shepherd.js";
import {
runIntegrationTest,
runIntegrationTest2,
testPay,
+ waitTasksDone,
waitTransactionState,
+ waitUntilAllTransactionsFinal,
waitUntilRefreshesDone,
- waitUntilTasksProcessed,
- waitUntilTransactionsFinal,
withdrawTestBalance,
-} from "./operations/testing.js";
+} from "./testing.js";
import {
abortTransaction,
+ constructTransactionIdentifier,
deleteTransaction,
failTransaction,
getTransactionById,
getTransactions,
+ getWithdrawalTransactionByUri,
parseTransactionIdentifier,
resumeTransaction,
retryTransaction,
suspendTransaction,
-} from "./operations/transactions.js";
-import {
- acceptWithdrawalFromUri,
- computeWithdrawalTransactionStatus,
- createManualWithdrawal,
- getExchangeWithdrawalInfo,
- getWithdrawalDetailsForUri,
- processWithdrawalGroup,
-} from "./operations/withdraw.js";
-import { PendingTaskInfo, PendingTaskType } from "./pending-types.js";
-import { assertUnreachable } from "./util/assertUnreachable.js";
-import {
- convertDepositAmount,
- convertPeerPushAmount,
- convertWithdrawalAmount,
- getMaxDepositAmount,
- getMaxPeerPushAmount,
-} from "./util/instructedAmountConversion.js";
-import { checkDbInvariant } from "./util/invariants.js";
-import {
- AsyncCondition,
- OpenedPromise,
- openPromise,
-} from "./util/promiseUtils.js";
-import {
- DbAccess,
- GetReadOnlyAccess,
- GetReadWriteAccess,
-} from "./util/query.js";
-import { TimerAPI, TimerGroup } from "./util/timer.js";
+} from "./transactions.js";
import {
WALLET_BANK_CONVERSION_API_PROTOCOL_VERSION,
WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,
WALLET_COREBANK_API_PROTOCOL_VERSION,
- WALLET_CORE_API_IMPLEMENTATION_VERSION,
+ WALLET_CORE_API_PROTOCOL_VERSION,
WALLET_EXCHANGE_PROTOCOL_VERSION,
WALLET_MERCHANT_PROTOCOL_VERSION,
} from "./versions.js";
import {
WalletApiOperation,
- WalletConfig,
- WalletConfigParameter,
WalletCoreApiClient,
WalletCoreResponseType,
} from "./wallet-api-types.js";
+import {
+ acceptWithdrawalFromUri,
+ confirmWithdrawal,
+ createManualWithdrawal,
+ getWithdrawalDetailsForAmount,
+ getWithdrawalDetailsForUri,
+ prepareBankIntegratedWithdrawal,
+} from "./withdraw.js";
const logger = new Logger("wallet.ts");
/**
- * Call the right handler for a pending operation without doing
- * any special error handling.
- */
-async function callOperationHandler(
- ws: InternalWalletState,
- pending: PendingTaskInfo,
-): Promise<TaskRunResult> {
- switch (pending.type) {
- case PendingTaskType.ExchangeUpdate:
- return await updateExchangeFromUrlHandler(ws, pending.exchangeBaseUrl);
- case PendingTaskType.Refresh:
- return await processRefreshGroup(ws, pending.refreshGroupId);
- case PendingTaskType.Withdraw:
- return await processWithdrawalGroup(ws, pending.withdrawalGroupId);
- case PendingTaskType.RewardPickup:
- return await processTip(ws, pending.tipId);
- case PendingTaskType.Purchase:
- return await processPurchase(ws, pending.proposalId);
- case PendingTaskType.Recoup:
- return await processRecoupGroup(ws, pending.recoupGroupId);
- case PendingTaskType.ExchangeCheckRefresh:
- return await autoRefresh(ws, pending.exchangeBaseUrl);
- case PendingTaskType.Deposit:
- return await processDepositGroup(ws, pending.depositGroupId);
- case PendingTaskType.Backup:
- return await processBackupForProvider(ws, pending.backupProviderBaseUrl);
- case PendingTaskType.PeerPushDebit:
- return await processPeerPushDebit(ws, pending.pursePub);
- case PendingTaskType.PeerPullCredit:
- return await processPeerPullCredit(ws, pending.pursePub);
- case PendingTaskType.PeerPullDebit:
- return await processPeerPullDebit(ws, pending.peerPullDebitId);
- case PendingTaskType.PeerPushCredit:
- return await processPeerPushCredit(ws, pending.peerPushCreditId);
- default:
- return assertUnreachable(pending);
- }
- throw Error(`not reached ${pending.type}`);
-}
-
-/**
- * Process pending operations.
- */
-export async function runPending(ws: InternalWalletState): Promise<void> {
- const pendingOpsResponse = await getPendingOperations(ws);
- for (const p of pendingOpsResponse.pendingOperations) {
- if (!AbsoluteTime.isExpired(p.timestampDue)) {
- continue;
- }
- await runTaskWithErrorReporting(ws, p.id, async () => {
- logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`);
- return await callOperationHandler(ws, p);
- });
- }
-}
-
-export interface RetryLoopOpts {
- /**
- * Stop when the number of retries is exceeded for any pending
- * operation.
- */
- maxRetries?: number;
-
- /**
- * Stop the retry loop when all lifeness-giving pending operations
- * are done.
- *
- * Defaults to false.
- */
- stopWhenDone?: boolean;
-}
-
-export interface TaskLoopResult {
- /**
- * Was the maximum number of retries exceeded in a task?
- */
- retriesExceeded: boolean;
-}
-
-/**
- * Main retry loop of the wallet.
+ * Execution context for code that is run in the wallet.
*
- * Looks up pending operations from the wallet, runs them, repeat.
+ * Typically the execution context is either for a wallet-core
+ * request handler or for a shepherded task.
*/
-async function runTaskLoop(
- ws: InternalWalletState,
- opts: RetryLoopOpts = {},
-): Promise<TaskLoopResult> {
- logger.trace(`running task loop opts=${j2s(opts)}`);
- if (ws.isTaskLoopRunning) {
- logger.warn(
- "task loop already running, nesting the wallet-core task loop is deprecated and should be avoided",
- );
- }
- const throttler = new TaskThrottler();
- ws.isTaskLoopRunning = true;
- let retriesExceeded = false;
- for (let iteration = 0; !ws.stopped; iteration++) {
- const pending = await getPendingOperations(ws);
- logger.trace(`pending operations: ${j2s(pending)}`);
- let numGivingLiveness = 0;
- let numDue = 0;
- let numThrottled = 0;
- let minDue: AbsoluteTime = AbsoluteTime.never();
-
- for (const p of pending.pendingOperations) {
- const maxRetries = opts.maxRetries;
-
- if (maxRetries && p.retryInfo && p.retryInfo.retryCounter > maxRetries) {
- retriesExceeded = true;
- logger.warn(
- `skipping, as ${maxRetries} retries are exceeded in an operation of type ${p.type}`,
- );
- continue;
- }
- if (p.givesLifeness) {
- numGivingLiveness++;
- }
- if (!p.isDue) {
- continue;
- }
- numDue++;
-
- const isThrottled = throttler.applyThrottle(p.id);
-
- if (isThrottled) {
- logger.warn(
- `task ${p.id} throttled, this is very likely a bug in wallet-core, please report`,
- );
- numDue--;
- numThrottled++;
- } else {
- minDue = AbsoluteTime.min(minDue, p.timestampDue);
- }
- }
-
- logger.trace(
- `running task loop, iter=${iteration}, #tasks=${pending.pendingOperations.length} #lifeness=${numGivingLiveness}, #due=${numDue} #trottled=${numThrottled}`,
- );
-
- if (opts.stopWhenDone && numGivingLiveness === 0 && iteration !== 0) {
- logger.warn(`stopping, as no pending operations have lifeness`);
- ws.isTaskLoopRunning = false;
- return {
- retriesExceeded,
- };
- }
+export interface WalletExecutionContext {
+ readonly ws: InternalWalletState;
+ readonly cryptoApi: TalerCryptoInterface;
+ readonly cancellationToken: CancellationToken;
+ readonly http: HttpRequestLibrary;
+ readonly db: DbAccess<typeof WalletStoresV1>;
+ readonly oc: ObservabilityContext;
+ readonly taskScheduler: TaskScheduler;
+}
- if (ws.stopped) {
- ws.isTaskLoopRunning = false;
- return {
- retriesExceeded,
- };
- }
+export const EXCHANGE_COINS_LOCK = "exchange-coins-lock";
+export const EXCHANGE_RESERVES_LOCK = "exchange-reserves-lock";
- // Make sure that we run tasks that don't give lifeness at least
- // one time.
- if (iteration !== 0 && numDue === 0) {
- // We've executed pending, due operations at least one.
- // Now we don't have any more operations available,
- // and need to wait.
+export type NotificationListener = (n: WalletNotification) => void;
- // Wait for at most 5 seconds to the next check.
- const dt = durationMin(
- durationFromSpec({
- seconds: 5,
- }),
- Duration.getRemaining(minDue),
- );
- logger.trace(`waiting for at most ${dt.d_ms} ms`);
- const timeout = ws.timerGroup.resolveAfter(dt);
- // Wait until either the timeout, or we are notified (via the latch)
- // that more work might be available.
- await Promise.race([timeout, ws.workAvailable.wait()]);
- logger.trace(`done waiting for available work`);
- } else {
- logger.trace(
- `running ${pending.pendingOperations.length} pending operations`,
- );
- for (const p of pending.pendingOperations) {
- if (!AbsoluteTime.isExpired(p.timestampDue)) {
- continue;
- }
- logger.trace(`running task ${p.id}`);
- const res = await runTaskWithErrorReporting(ws, p.id, async () => {
- return await callOperationHandler(ws, p);
- });
- if (!(ws.stopped && res.type === TaskRunResultType.Error)) {
- ws.notify({
- type: NotificationType.PendingOperationProcessed,
- id: p.id,
- taskResultType: res.type,
- });
- }
- if (ws.stopped) {
- ws.isTaskLoopRunning = false;
- return {
- retriesExceeded,
- };
- }
- }
- }
- }
- logger.trace("exiting wallet task loop");
- ws.isTaskLoopRunning = false;
- return {
- retriesExceeded,
- };
-}
+type CancelFn = () => void;
/**
* Insert the hard-coded defaults for exchanges, coins and
* auditors into the database, unless these defaults have
* already been applied.
*/
-async function fillDefaults(ws: InternalWalletState): Promise<void> {
+async function fillDefaults(wex: WalletExecutionContext): Promise<void> {
const notifications: WalletNotification[] = [];
- await ws.db
- .mktx((x) => [x.config, x.exchanges, x.exchangeDetails])
- .runReadWrite(async (tx) => {
+ await wex.db.runReadWriteTx(
+ { storeNames: ["config", "exchanges"] },
+ async (tx) => {
const appliedRec = await tx.config.get("currencyDefaultsApplied");
let alreadyApplied = appliedRec ? !!appliedRec.value : false;
if (alreadyApplied) {
logger.trace("defaults already applied");
return;
}
- for (const exch of ws.config.builtin.exchanges) {
+ for (const exch of wex.ws.config.builtin.exchanges) {
const resp = await addPresetExchangeEntry(
tx,
exch.exchangeBaseUrl,
@@ -567,10 +361,31 @@ async function fillDefaults(ws: InternalWalletState): Promise<void> {
key: ConfigRecordKey.CurrencyDefaultsApplied,
value: true,
});
- });
+ },
+ );
for (const notif of notifications) {
- ws.notify(notif);
+ wex.ws.notify(notif);
+ }
+}
+
+export async function getDenomInfo(
+ wex: WalletExecutionContext,
+ tx: WalletDbReadOnlyTransaction<["denominations"]>,
+ exchangeBaseUrl: string,
+ denomPubHash: string,
+): Promise<DenominationInfo | undefined> {
+ const cacheKey = `${exchangeBaseUrl}:${denomPubHash}`;
+ const cached = wex.ws.denomInfoCache.get(cacheKey);
+ if (cached) {
+ return cached;
}
+ const d = await tx.denominations.get([exchangeBaseUrl, denomPubHash]);
+ if (d) {
+ const denomInfo = DenominationRecord.toDenomInfo(d);
+ wex.ws.denomInfoCache.put(cacheKey, denomInfo);
+ return denomInfo;
+ }
+ return undefined;
}
/**
@@ -578,79 +393,73 @@ async function fillDefaults(ws: InternalWalletState): Promise<void> {
* previous withdrawals.
*/
async function listKnownBankAccounts(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
currency?: string,
): Promise<KnownBankAccounts> {
const accounts: KnownBankAccountsInfo[] = [];
- await ws.db
- .mktx((x) => [x.bankAccounts])
- .runReadOnly(async (tx) => {
- const knownAccounts = await tx.bankAccounts.iter().toArray();
- for (const r of knownAccounts) {
- if (currency && currency !== r.currency) {
- continue;
- }
- const payto = parsePaytoUri(r.uri);
- if (payto) {
- accounts.push({
- uri: payto,
- alias: r.alias,
- kyc_completed: r.kycCompleted,
- currency: r.currency,
- });
- }
+ await wex.db.runReadOnlyTx({ storeNames: ["bankAccounts"] }, async (tx) => {
+ const knownAccounts = await tx.bankAccounts.iter().toArray();
+ for (const r of knownAccounts) {
+ if (currency && currency !== r.currency) {
+ continue;
}
- });
+ const payto = parsePaytoUri(r.uri);
+ if (payto) {
+ accounts.push({
+ uri: payto,
+ alias: r.alias,
+ kyc_completed: r.kycCompleted,
+ currency: r.currency,
+ });
+ }
+ }
+ });
return { accounts };
}
/**
*/
async function addKnownBankAccounts(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
payto: string,
alias: string,
currency: string,
): Promise<void> {
- await ws.db
- .mktx((x) => [x.bankAccounts])
- .runReadWrite(async (tx) => {
- tx.bankAccounts.put({
- uri: payto,
- alias: alias,
- currency: currency,
- kycCompleted: false,
- });
+ await wex.db.runReadWriteTx({ storeNames: ["bankAccounts"] }, async (tx) => {
+ tx.bankAccounts.put({
+ uri: payto,
+ alias: alias,
+ currency: currency,
+ kycCompleted: false,
});
+ });
return;
}
/**
*/
async function forgetKnownBankAccounts(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
payto: string,
): Promise<void> {
- await ws.db
- .mktx((x) => [x.bankAccounts])
- .runReadWrite(async (tx) => {
- const account = await tx.bankAccounts.get(payto);
- if (!account) {
- throw Error(`account not found: ${payto}`);
- }
- tx.bankAccounts.delete(account.uri);
- });
+ await wex.db.runReadWriteTx({ storeNames: ["bankAccounts"] }, async (tx) => {
+ const account = await tx.bankAccounts.get(payto);
+ if (!account) {
+ throw Error(`account not found: ${payto}`);
+ }
+ tx.bankAccounts.delete(account.uri);
+ });
return;
}
async function setCoinSuspended(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
coinPub: string,
suspended: boolean,
): Promise<void> {
- await ws.db
- .mktx((x) => [x.coins, x.coinAvailability])
- .runReadWrite(async (tx) => {
+ await wex.db.runReadWriteTx(
+ { storeNames: ["coins", "coinAvailability"] },
+ async (tx) => {
const c = await tx.coins.get(coinPub);
if (!c) {
logger.warn(`coin ${coinPub} not found, won't suspend`);
@@ -682,18 +491,19 @@ async function setCoinSuspended(
}
await tx.coins.put(c);
await tx.coinAvailability.put(coinAvailability);
- });
+ },
+ );
}
/**
* Dump the public information of coins we have in an easy-to-process format.
*/
-async function dumpCoins(ws: InternalWalletState): Promise<CoinDumpJson> {
+async function dumpCoins(wex: WalletExecutionContext): Promise<CoinDumpJson> {
const coinsJson: CoinDumpJson = { coins: [] };
logger.info("dumping coins");
- await ws.db
- .mktx((x) => [x.coins, x.denominations, x.withdrawalGroups])
- .runReadOnly(async (tx) => {
+ await wex.db.runReadOnlyTx(
+ { storeNames: ["coins", "denominations"] },
+ async (tx) => {
const coins = await tx.coins.iter().toArray();
for (const c of coins) {
const denom = await tx.denominations.get([
@@ -713,8 +523,8 @@ async function dumpCoins(ws: InternalWalletState): Promise<CoinDumpJson> {
if (cs.type == CoinSourceType.Withdraw) {
withdrawalReservePub = cs.reservePub;
}
- const denomInfo = await ws.getDenomInfo(
- ws,
+ const denomInfo = await getDenomInfo(
+ wex,
tx,
c.exchangeBaseUrl,
c.denomPubHash,
@@ -741,20 +551,22 @@ async function dumpCoins(ws: InternalWalletState): Promise<CoinDumpJson> {
: undefined,
});
}
- });
+ },
+ );
return coinsJson;
}
/**
* Get an API client from an internal wallet state object.
*/
-export async function getClientFromWalletState(
+let id = 0;
+async function getClientFromWalletState(
ws: InternalWalletState,
): Promise<WalletCoreApiClient> {
- let id = 0;
const client: WalletCoreApiClient = {
async call(op, payload): Promise<any> {
- const res = await handleCoreApiRequest(ws, op, `${id++}`, payload);
+ id = (id + 1) % (Number.MAX_SAFE_INTEGER - 100);
+ const res = await handleCoreApiRequest(ws, op, String(id), payload);
switch (res.type) {
case "error":
throw TalerError.fromUncheckedDetail(res.error);
@@ -767,12 +579,12 @@ export async function getClientFromWalletState(
}
async function createStoredBackup(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
): Promise<CreateStoredBackupResponse> {
- const backup = await exportDb(ws.idb);
- const backupsDb = await openStoredBackupsDatabase(ws.idb);
+ const backup = await exportDb(wex.ws.idb);
+ const backupsDb = await openStoredBackupsDatabase(wex.ws.idb);
const name = `backup-${new Date().getTime()}`;
- await backupsDb.mktxAll().runReadWrite(async (tx) => {
+ await backupsDb.runAllStoresReadWriteTx({}, async (tx) => {
await tx.backupMeta.add({
name,
});
@@ -784,13 +596,13 @@ async function createStoredBackup(
}
async function listStoredBackups(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
): Promise<StoredBackupList> {
const storedBackups: StoredBackupList = {
storedBackups: [],
};
- const backupsDb = await openStoredBackupsDatabase(ws.idb);
- await backupsDb.mktxAll().runReadWrite(async (tx) => {
+ const backupsDb = await openStoredBackupsDatabase(wex.ws.idb);
+ await backupsDb.runAllStoresReadWriteTx({}, async (tx) => {
await tx.backupMeta.iter().forEach((x) => {
storedBackups.storedBackups.push({
name: x.name,
@@ -801,24 +613,24 @@ async function listStoredBackups(
}
async function deleteStoredBackup(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
req: DeleteStoredBackupRequest,
): Promise<void> {
- const backupsDb = await openStoredBackupsDatabase(ws.idb);
- await backupsDb.mktxAll().runReadWrite(async (tx) => {
+ const backupsDb = await openStoredBackupsDatabase(wex.ws.idb);
+ await backupsDb.runAllStoresReadWriteTx({}, async (tx) => {
await tx.backupData.delete(req.name);
await tx.backupMeta.delete(req.name);
});
}
async function recoverStoredBackup(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
req: RecoverStoredBackupRequest,
): Promise<void> {
logger.info(`Recovering stored backup ${req.name}`);
const { name } = req;
- const backupsDb = await openStoredBackupsDatabase(ws.idb);
- const bd = await backupsDb.mktxAll().runReadWrite(async (tx) => {
+ const backupsDb = await openStoredBackupsDatabase(wex.ws.idb);
+ const bd = await backupsDb.runAllStoresReadWriteTx({}, async (tx) => {
const backupMeta = tx.backupMeta.get(name);
if (!backupMeta) {
throw Error("backup not found");
@@ -830,12 +642,12 @@ async function recoverStoredBackup(
return backupData;
});
logger.info(`backup found, now importing`);
- await importDb(ws.db.idbHandle(), bd);
+ await importDb(wex.db.idbHandle(), bd);
logger.info(`import done`);
}
async function handlePrepareWithdrawExchange(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
req: PrepareWithdrawExchangeRequest,
): Promise<PrepareWithdrawExchangeResponse> {
const parsedUri = parseTalerUri(req.talerUri);
@@ -843,8 +655,8 @@ async function handlePrepareWithdrawExchange(
throw Error("expected a taler://withdraw-exchange URI");
}
const exchangeBaseUrl = parsedUri.exchangeBaseUrl;
- const exchange = await fetchFreshExchange(ws, exchangeBaseUrl);
- if (exchange.masterPub != parsedUri.exchangePub) {
+ const exchange = await fetchFreshExchange(wex, exchangeBaseUrl);
+ if (parsedUri.exchangePub && exchange.masterPub != parsedUri.exchangePub) {
throw Error("mismatch of exchange master public key (URI vs actual)");
}
if (parsedUri.amount) {
@@ -860,14 +672,27 @@ async function handlePrepareWithdrawExchange(
}
/**
+ * Response returned from the pending operations API.
+ *
+ * @deprecated this is a placeholder for the response type of a deprecated wallet-core request.
+ */
+export interface PendingOperationsResponse {
+ /**
+ * List of pending operations.
+ */
+ pendingOperations: any[];
+}
+
+/**
* Implementation of the "wallet-core" API.
*/
-async function dispatchRequestInternal<Op extends WalletApiOperation>(
- ws: InternalWalletState,
+async function dispatchRequestInternal(
+ wex: WalletExecutionContext,
+ cts: CancellationToken.Source,
operation: WalletApiOperation,
payload: unknown,
): Promise<WalletCoreResponseType<typeof operation>> {
- if (!ws.initCalled && operation !== WalletApiOperation.InitWallet) {
+ if (!wex.ws.initCalled && operation !== WalletApiOperation.InitWallet) {
throw Error(
`wallet must be initialized before running operation ${operation}`,
);
@@ -876,56 +701,95 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
// definitions we already have?
switch (operation) {
case WalletApiOperation.CreateStoredBackup:
- return createStoredBackup(ws);
+ return createStoredBackup(wex);
case WalletApiOperation.DeleteStoredBackup: {
const req = codecForDeleteStoredBackupRequest().decode(payload);
- await deleteStoredBackup(ws, req);
+ await deleteStoredBackup(wex, req);
return {};
}
case WalletApiOperation.ListStoredBackups:
- return listStoredBackups(ws);
+ return listStoredBackups(wex);
case WalletApiOperation.RecoverStoredBackup: {
const req = codecForRecoverStoredBackupRequest().decode(payload);
- await recoverStoredBackup(ws, req);
+ await recoverStoredBackup(wex, req);
return {};
}
+ case WalletApiOperation.SetWalletRunConfig:
case WalletApiOperation.InitWallet: {
- logger.trace("initializing wallet");
- ws.initCalled = true;
- if (ws.config.testing.skipDefaults) {
+ const req = codecForInitRequest().decode(payload);
+
+ logger.info(`init request: ${j2s(req)}`);
+
+ if (wex.ws.initCalled) {
+ logger.info("initializing wallet (repeat initialization)");
+ } else {
+ logger.info("initializing wallet (first initialization)");
+ }
+
+ // Write to the DB to make sure that we're failing early in
+ // case the DB is not writeable.
+ try {
+ await wex.db.runReadWriteTx({ storeNames: ["config"] }, async (tx) => {
+ tx.config.put({
+ key: ConfigRecordKey.LastInitInfo,
+ value: timestampProtocolToDb(TalerProtocolTimestamp.now()),
+ });
+ });
+ } catch (e) {
+ logger.error("error writing to database during initialization");
+ throw TalerError.fromDetail(TalerErrorCode.WALLET_DB_UNAVAILABLE, {
+ innerError: getErrorDetailFromException(e),
+ });
+ }
+
+ wex.ws.initWithConfig(applyRunConfigDefaults(req.config));
+
+ if (wex.ws.config.testing.skipDefaults) {
logger.trace("skipping defaults");
} else {
logger.trace("filling defaults");
- await fillDefaults(ws);
+ await fillDefaults(wex);
}
const resp: InitResponse = {
- versionInfo: getVersion(ws),
+ versionInfo: getVersion(wex),
};
+
+ // After initialization, task loop should run.
+ await wex.taskScheduler.ensureRunning();
+
+ wex.ws.initCalled = true;
return resp;
}
case WalletApiOperation.WithdrawTestkudos: {
- await withdrawTestBalance(ws, {
+ await withdrawTestBalance(wex, {
amount: "TESTKUDOS:10" as AmountString,
corebankApiBaseUrl: "https://bank.test.taler.net/",
exchangeBaseUrl: "https://exchange.test.taler.net/",
});
return {
- versionInfo: getVersion(ws),
+ versionInfo: getVersion(wex),
};
}
case WalletApiOperation.WithdrawTestBalance: {
const req = codecForWithdrawTestBalance().decode(payload);
- await withdrawTestBalance(ws, req);
+ await withdrawTestBalance(wex, req);
return {};
}
+ case WalletApiOperation.TestingListTaskForTransaction: {
+ const req =
+ codecForTestingListTasksForTransactionRequest().decode(payload);
+ return {
+ taskIdList: listTaskForTransactionId(req.transactionId),
+ } satisfies TestingListTasksForTransactionsResponse;
+ }
case WalletApiOperation.RunIntegrationTest: {
const req = codecForIntegrationTestArgs().decode(payload);
- await runIntegrationTest(ws, req);
+ await runIntegrationTest(wex, req);
return {};
}
case WalletApiOperation.RunIntegrationTestV2: {
const req = codecForIntegrationTestV2Args().decode(payload);
- await runIntegrationTest2(ws, req);
+ await runIntegrationTest2(wex, req);
return {};
}
case WalletApiOperation.ValidateIban: {
@@ -938,66 +802,75 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
}
case WalletApiOperation.TestPay: {
const req = codecForTestPayArgs().decode(payload);
- return await testPay(ws, req);
+ return await testPay(wex, req);
}
case WalletApiOperation.GetTransactions: {
const req = codecForTransactionsRequest().decode(payload);
- return await getTransactions(ws, req);
+ return await getTransactions(wex, req);
}
case WalletApiOperation.GetTransactionById: {
const req = codecForTransactionByIdRequest().decode(payload);
- return await getTransactionById(ws, req);
+ return await getTransactionById(wex, req);
+ }
+ case WalletApiOperation.GetWithdrawalTransactionByUri: {
+ const req = codecForGetWithdrawalDetailsForUri().decode(payload);
+ return await getWithdrawalTransactionByUri(wex, req);
}
case WalletApiOperation.AddExchange: {
const req = codecForAddExchangeRequest().decode(payload);
- await fetchFreshExchange(ws, req.exchangeBaseUrl, {
+ await fetchFreshExchange(wex, req.exchangeBaseUrl, {
expectedMasterPub: req.masterPub,
});
return {};
}
+ case WalletApiOperation.TestingPing: {
+ return {};
+ }
case WalletApiOperation.UpdateExchangeEntry: {
const req = codecForUpdateExchangeEntryRequest().decode(payload);
- await fetchFreshExchange(ws, req.exchangeBaseUrl, {
- forceUpdate: true,
+ await fetchFreshExchange(wex, req.exchangeBaseUrl, {
+ forceUpdate: !!req.force,
});
return {};
}
+ case WalletApiOperation.TestingGetDenomStats: {
+ const req = codecForTestingGetDenomStatsRequest().decode(payload);
+ const denomStats: TestingGetDenomStatsResponse = {
+ numKnown: 0,
+ numLost: 0,
+ numOffered: 0,
+ };
+ await wex.db.runReadOnlyTx(
+ { storeNames: ["denominations"] },
+ async (tx) => {
+ const denoms =
+ await tx.denominations.indexes.byExchangeBaseUrl.getAll(
+ req.exchangeBaseUrl,
+ );
+ for (const d of denoms) {
+ denomStats.numKnown++;
+ if (d.isOffered) {
+ denomStats.numOffered++;
+ }
+ if (d.isLost) {
+ denomStats.numLost++;
+ }
+ }
+ },
+ );
+ return denomStats;
+ }
case WalletApiOperation.ListExchanges: {
- return await getExchanges(ws);
+ return await listExchanges(wex);
}
case WalletApiOperation.GetExchangeEntryByUrl: {
const req = codecForGetExchangeEntryByUrlRequest().decode(payload);
- const exchangeEntry = await ws.db
- .mktx((x) => [
- x.exchanges,
- x.exchangeDetails,
- x.denominations,
- x.operationRetries,
- ])
- .runReadOnly(async (tx) => {
- const exchangeRec = await tx.exchanges.get(req.exchangeBaseUrl);
- if (!exchangeRec) {
- throw Error("exchange not found");
- }
- const exchangeDetails = await getExchangeDetails(
- tx,
- exchangeRec.baseUrl,
- );
- const opRetryRecord = await tx.operationRetries.get(
- TaskIdentifiers.forExchangeUpdate(exchangeRec),
- );
- return makeExchangeListItem(
- exchangeRec,
- exchangeDetails,
- opRetryRecord?.lastError,
- );
- });
- return exchangeEntry;
+ return lookupExchangeByUri(wex, req);
}
case WalletApiOperation.ListExchangesForScopedCurrency: {
const req =
codecForListExchangesForScopedCurrencyRequest().decode(payload);
- const exchangesResp = await getExchanges(ws);
+ const exchangesResp = await listExchanges(wex);
const result: ExchangesShortListResponse = {
exchanges: [],
};
@@ -1014,97 +887,97 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
}
case WalletApiOperation.GetExchangeDetailedInfo: {
const req = codecForAddExchangeRequest().decode(payload);
- return await getExchangeDetailedInfo(ws, req.exchangeBaseUrl);
+ return await getExchangeDetailedInfo(wex, req.exchangeBaseUrl);
}
case WalletApiOperation.ListKnownBankAccounts: {
const req = codecForListKnownBankAccounts().decode(payload);
- return await listKnownBankAccounts(ws, req.currency);
+ return await listKnownBankAccounts(wex, req.currency);
}
case WalletApiOperation.AddKnownBankAccounts: {
const req = codecForAddKnownBankAccounts().decode(payload);
- await addKnownBankAccounts(ws, req.payto, req.alias, req.currency);
+ await addKnownBankAccounts(wex, req.payto, req.alias, req.currency);
return {};
}
case WalletApiOperation.ForgetKnownBankAccounts: {
const req = codecForForgetKnownBankAccounts().decode(payload);
- await forgetKnownBankAccounts(ws, req.payto);
+ await forgetKnownBankAccounts(wex, req.payto);
return {};
}
case WalletApiOperation.GetWithdrawalDetailsForUri: {
const req = codecForGetWithdrawalDetailsForUri().decode(payload);
- return await getWithdrawalDetailsForUri(ws, req.talerWithdrawUri);
+ return await getWithdrawalDetailsForUri(wex, req.talerWithdrawUri, {
+ restrictAge: req.restrictAge,
+ });
}
case WalletApiOperation.AcceptManualWithdrawal: {
- const req = codecForAcceptManualWithdrawalRequet().decode(payload);
- const res = await createManualWithdrawal(ws, {
+ const req = codecForAcceptManualWithdrawalRequest().decode(payload);
+ const res = await createManualWithdrawal(wex, {
amount: Amounts.parseOrThrow(req.amount),
exchangeBaseUrl: req.exchangeBaseUrl,
restrictAge: req.restrictAge,
+ forceReservePriv: req.forceReservePriv,
});
return res;
}
case WalletApiOperation.GetWithdrawalDetailsForAmount: {
const req =
codecForGetWithdrawalDetailsForAmountRequest().decode(payload);
- const wi = await getExchangeWithdrawalInfo(
- ws,
- req.exchangeBaseUrl,
- Amounts.parseOrThrow(req.amount),
- req.restrictAge,
- );
- let numCoins = 0;
- for (const x of wi.selectedDenoms.selectedDenoms) {
- numCoins += x.count;
- }
- const amt = Amounts.parseOrThrow(req.amount);
- const resp: WithdrawalDetailsForAmount = {
- amountRaw: req.amount,
- amountEffective: Amounts.stringify(wi.selectedDenoms.totalCoinValue),
- paytoUris: wi.exchangePaytoUris,
- tosAccepted: wi.termsOfServiceAccepted,
- ageRestrictionOptions: wi.ageRestrictionOptions,
- withdrawalAccountsList: wi.exchangeCreditAccountDetails,
- numCoins,
- // FIXME: Once we have proper scope info support, return correct info here.
- scopeInfo: {
- type: ScopeType.Exchange,
- currency: amt.currency,
- url: req.exchangeBaseUrl,
- },
- };
+ const resp = await getWithdrawalDetailsForAmount(wex, cts, req);
return resp;
}
case WalletApiOperation.GetBalances: {
- return await getBalances(ws);
+ return await getBalances(wex);
}
case WalletApiOperation.GetBalanceDetail: {
const req = codecForGetBalanceDetailRequest().decode(payload);
- return await getBalanceDetail(ws, req);
+ return await getBalanceDetail(wex, req);
}
case WalletApiOperation.GetUserAttentionRequests: {
const req = codecForUserAttentionsRequest().decode(payload);
- return await getUserAttentions(ws, req);
+ return await getUserAttentions(wex, req);
}
case WalletApiOperation.MarkAttentionRequestAsRead: {
const req = codecForUserAttentionByIdRequest().decode(payload);
- return await markAttentionRequestAsRead(ws, req);
+ return await markAttentionRequestAsRead(wex, req);
}
case WalletApiOperation.GetUserAttentionUnreadCount: {
const req = codecForUserAttentionsRequest().decode(payload);
- return await getUserAttentionsUnreadCount(ws, req);
+ return await getUserAttentionsUnreadCount(wex, req);
}
case WalletApiOperation.GetPendingOperations: {
- return await getPendingOperations(ws);
+ // FIXME: Eventually remove the handler after deprecation period.
+ return {
+ pendingOperations: [],
+ } satisfies PendingOperationsResponse;
}
case WalletApiOperation.SetExchangeTosAccepted: {
const req = codecForAcceptExchangeTosRequest().decode(payload);
- await acceptExchangeTermsOfService(ws, req.exchangeBaseUrl, req.etag);
+ await acceptExchangeTermsOfService(wex, req.exchangeBaseUrl);
+ return {};
+ }
+ case WalletApiOperation.SetExchangeTosForgotten: {
+ const req = codecForAcceptExchangeTosRequest().decode(payload);
+ await forgetExchangeTermsOfService(wex, req.exchangeBaseUrl);
return {};
}
case WalletApiOperation.AcceptBankIntegratedWithdrawal: {
const req =
codecForAcceptBankIntegratedWithdrawalRequest().decode(payload);
- return await acceptWithdrawalFromUri(ws, {
+ return await acceptWithdrawalFromUri(wex, {
+ selectedExchange: req.exchangeBaseUrl,
+ talerWithdrawUri: req.talerWithdrawUri,
+ forcedDenomSel: req.forcedDenomSel,
+ restrictAge: req.restrictAge,
+ });
+ }
+ case WalletApiOperation.ConfirmWithdrawal: {
+ const req = codecForConfirmWithdrawalRequestRequest().decode(payload);
+ return confirmWithdrawal(wex, req.transactionId);
+ }
+ case WalletApiOperation.PrepareBankIntegratedWithdrawal: {
+ const req =
+ codecForPrepareBankIntegratedWithdrawalRequest().decode(payload);
+ return prepareBankIntegratedWithdrawal(wex, {
selectedExchange: req.exchangeBaseUrl,
talerWithdrawUri: req.talerWithdrawUri,
forcedDenomSel: req.forcedDenomSel,
@@ -1114,7 +987,7 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
case WalletApiOperation.GetExchangeTos: {
const req = codecForGetExchangeTosRequest().decode(payload);
return getExchangeTos(
- ws,
+ wex,
req.exchangeBaseUrl,
req.acceptedFormat,
req.acceptLanguage,
@@ -1122,168 +995,130 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
}
case WalletApiOperation.GetContractTermsDetails: {
const req = codecForGetContractTermsDetails().decode(payload);
- return getContractTermsDetails(ws, req.proposalId);
+ if (req.proposalId) {
+ // FIXME: deprecated path
+ return getContractTermsDetails(wex, req.proposalId);
+ }
+ if (req.transactionId) {
+ const parsedTx = parseTransactionIdentifier(req.transactionId);
+ if (parsedTx?.tag === TransactionType.Payment) {
+ return getContractTermsDetails(wex, parsedTx.proposalId);
+ }
+ throw Error("transactionId is not a payment transaction");
+ }
+ throw Error("transactionId missing");
}
case WalletApiOperation.RetryPendingNow: {
- // FIXME: Should we reset all operation retries here?
- await runPending(ws);
+ logger.error("retryPendingNow currently not implemented");
return {};
}
case WalletApiOperation.SharePayment: {
const req = codecForSharePaymentRequest().decode(payload);
- return await sharePayment(ws, req.merchantBaseUrl, req.orderId);
+ return await sharePayment(wex, req.merchantBaseUrl, req.orderId);
}
case WalletApiOperation.PrepareWithdrawExchange: {
const req = codecForPrepareWithdrawExchangeRequest().decode(payload);
- return handlePrepareWithdrawExchange(ws, req);
+ return handlePrepareWithdrawExchange(wex, req);
}
case WalletApiOperation.PreparePayForUri: {
const req = codecForPreparePayRequest().decode(payload);
- return await preparePayForUri(ws, req.talerPayUri);
+ return await preparePayForUri(wex, req.talerPayUri);
}
case WalletApiOperation.PreparePayForTemplate: {
const req = codecForPreparePayTemplateRequest().decode(payload);
- const url = parsePayTemplateUri(req.talerPayTemplateUri);
- const templateDetails: MerchantUsingTemplateDetails = {};
- if (!url) {
- throw Error("invalid taler-template URI");
- }
- if (
- url.templateParams.amount !== undefined &&
- typeof url.templateParams.amount === "string"
- ) {
- templateDetails.amount = (req.templateParams.amount ??
- url.templateParams.amount) as AmountString | undefined;
- }
- if (
- url.templateParams.summary !== undefined &&
- typeof url.templateParams.summary === "string"
- ) {
- templateDetails.summary =
- req.templateParams.summary ?? url.templateParams.summary;
- }
- const reqUrl = new URL(
- `templates/${url.templateId}`,
- url.merchantBaseUrl,
- );
- const httpReq = await ws.http.fetch(reqUrl.href, {
- method: "POST",
- body: templateDetails,
- });
- const resp = await readSuccessResponseJsonOrThrow(
- httpReq,
- codecForMerchantPostOrderResponse(),
- );
-
- const payUri = constructPayUri(
- url.merchantBaseUrl,
- resp.order_id,
- "",
- resp.token,
- );
-
- return await preparePayForUri(ws, payUri);
+ return preparePayForTemplate(wex, req);
}
case WalletApiOperation.ConfirmPay: {
const req = codecForConfirmPayRequest().decode(payload);
- let proposalId;
+ let transactionId;
if (req.proposalId) {
// legacy client support
- proposalId = req.proposalId;
+ transactionId = constructTransactionIdentifier({
+ tag: TransactionType.Payment,
+ proposalId: req.proposalId,
+ });
} else if (req.transactionId) {
- const txIdParsed = parseTransactionIdentifier(req.transactionId);
- if (txIdParsed?.tag != TransactionType.Payment) {
- throw Error("payment transaction ID required");
- }
- proposalId = txIdParsed.proposalId;
+ transactionId = req.transactionId;
} else {
throw Error("transactionId or (deprecated) proposalId required");
}
- return await confirmPay(ws, proposalId, req.sessionId);
+ return await confirmPay(wex, transactionId, req.sessionId);
}
case WalletApiOperation.AbortTransaction: {
const req = codecForAbortTransaction().decode(payload);
- await abortTransaction(ws, req.transactionId);
+ await abortTransaction(wex, req.transactionId);
return {};
}
case WalletApiOperation.SuspendTransaction: {
const req = codecForSuspendTransaction().decode(payload);
- await suspendTransaction(ws, req.transactionId);
+ await suspendTransaction(wex, req.transactionId);
return {};
}
+ case WalletApiOperation.GetActiveTasks: {
+ const allTasksId = wex.taskScheduler.getActiveTasks();
+
+ const tasksInfo = await Promise.all(
+ allTasksId.map(async (id) => {
+ return await wex.db.runReadOnlyTx(
+ { storeNames: ["operationRetries"] },
+ async (tx) => {
+ return tx.operationRetries.get(id);
+ },
+ );
+ }),
+ );
+
+ const tasks = allTasksId.map((taskId, i): ActiveTask => {
+ const transaction = convertTaskToTransactionId(taskId);
+ const d = tasksInfo[i];
+
+ const firstTry = !d
+ ? undefined
+ : timestampAbsoluteFromDb(d.retryInfo.firstTry);
+ const nextTry = !d
+ ? undefined
+ : timestampAbsoluteFromDb(d.retryInfo.nextRetry);
+ const counter = d?.retryInfo.retryCounter;
+ const lastError = d?.lastError;
+
+ return {
+ taskId: taskId,
+ retryCounter: counter,
+ firstTry,
+ nextTry,
+ lastError,
+ transaction,
+ };
+ });
+ return { tasks };
+ }
case WalletApiOperation.FailTransaction: {
const req = codecForFailTransactionRequest().decode(payload);
- await failTransaction(ws, req.transactionId);
+ await failTransaction(wex, req.transactionId);
return {};
}
case WalletApiOperation.ResumeTransaction: {
const req = codecForResumeTransaction().decode(payload);
- await resumeTransaction(ws, req.transactionId);
+ await resumeTransaction(wex, req.transactionId);
return {};
}
case WalletApiOperation.DumpCoins: {
- return await dumpCoins(ws);
+ return await dumpCoins(wex);
}
case WalletApiOperation.SetCoinSuspended: {
const req = codecForSetCoinSuspendedRequest().decode(payload);
- await setCoinSuspended(ws, req.coinPub, req.suspended);
+ await setCoinSuspended(wex, req.coinPub, req.suspended);
return {};
}
case WalletApiOperation.TestingGetSampleTransactions:
return { transactions: sampleWalletCoreTransactions };
case WalletApiOperation.ForceRefresh: {
const req = codecForForceRefreshRequest().decode(payload);
- if (req.coinPubList.length == 0) {
- throw Error("refusing to create empty refresh group");
- }
- const refreshGroupId = await ws.db
- .mktx((x) => [
- x.refreshGroups,
- x.coinAvailability,
- x.denominations,
- x.coins,
- ])
- .runReadWrite(async (tx) => {
- let coinPubs: CoinRefreshRequest[] = [];
- for (const c of req.coinPubList) {
- const coin = await tx.coins.get(c);
- if (!coin) {
- throw Error(`coin (pubkey ${c}) not found`);
- }
- const denom = await ws.getDenomInfo(
- ws,
- tx,
- coin.exchangeBaseUrl,
- coin.denomPubHash,
- );
- checkDbInvariant(!!denom);
- coinPubs.push({
- coinPub: c,
- amount: denom?.value,
- });
- }
- return await createRefreshGroup(
- ws,
- tx,
- Amounts.currencyOf(coinPubs[0].amount),
- coinPubs,
- RefreshReason.Manual,
- );
- });
- processRefreshGroup(ws, refreshGroupId.refreshGroupId).catch((x) => {
- logger.error(x);
- });
- return {
- refreshGroupId,
- };
- }
- case WalletApiOperation.PrepareReward: {
- const req = codecForPrepareRewardRequest().decode(payload);
- return await prepareTip(ws, req.talerRewardUri);
+ return await forceRefresh(wex, req);
}
case WalletApiOperation.StartRefundQueryForUri: {
const req = codecForPrepareRefundRequest().decode(payload);
- return await startRefundQueryForUri(ws, req.talerRefundUri);
+ return await startRefundQueryForUri(wex, req.talerRefundUri);
}
case WalletApiOperation.StartRefundQuery: {
const req = codecForStartRefundQueryRequest().decode(payload);
@@ -1294,38 +1129,30 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
if (txIdParsed.tag !== TransactionType.Payment) {
throw Error("expected payment transaction ID");
}
- await startQueryRefund(ws, txIdParsed.proposalId);
+ await startQueryRefund(wex, txIdParsed.proposalId);
return {};
}
- case WalletApiOperation.AcceptReward: {
- const req = codecForAcceptTipRequest().decode(payload);
- return await acceptTip(ws, req.walletRewardId);
- }
case WalletApiOperation.AddBackupProvider: {
const req = codecForAddBackupProviderRequest().decode(payload);
- return await addBackupProvider(ws, req);
+ return await addBackupProvider(wex, req);
}
case WalletApiOperation.RunBackupCycle: {
const req = codecForRunBackupCycle().decode(payload);
- await runBackupCycle(ws, req);
+ await runBackupCycle(wex, req);
return {};
}
case WalletApiOperation.RemoveBackupProvider: {
const req = codecForRemoveBackupProvider().decode(payload);
- await removeBackupProvider(ws, req);
+ await removeBackupProvider(wex, req);
return {};
}
case WalletApiOperation.ExportBackupRecovery: {
- const resp = await getBackupRecovery(ws);
+ const resp = await getBackupRecovery(wex);
return resp;
}
case WalletApiOperation.TestingWaitTransactionState: {
const req = payload as TestingWaitTransactionRequest;
- await waitTransactionState(ws, req.transactionId, req.txState);
- return {};
- }
- case WalletApiOperation.TestingWaitTasksProcessed: {
- await waitUntilTasksProcessed(ws);
+ await waitTransactionState(wex, req.transactionId, req.txState);
return {};
}
case WalletApiOperation.GetCurrencySpecification: {
@@ -1361,12 +1188,12 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
}
const defaultResp: GetCurrencySpecificationResponse = {
currencySpecification: {
- name: "Unknown",
+ name: req.scope.currency,
num_fractional_input_digits: 2,
num_fractional_normal_digits: 2,
num_fractional_trailing_zero_digits: 2,
alt_unit_names: {
- "0": "??",
+ "0": req.scope.currency,
},
},
};
@@ -1374,7 +1201,7 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
}
case WalletApiOperation.ImportBackupRecovery: {
const req = codecForAny().decode(payload);
- await loadBackupRecovery(ws, req);
+ await loadBackupRecovery(wex, req);
return {};
}
// case WalletApiOperation.GetPlanForOperation: {
@@ -1383,31 +1210,31 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
// }
case WalletApiOperation.ConvertDepositAmount: {
const req = codecForConvertAmountRequest.decode(payload);
- return await convertDepositAmount(ws, req);
+ return await convertDepositAmount(wex, req);
}
case WalletApiOperation.GetMaxDepositAmount: {
const req = codecForGetAmountRequest.decode(payload);
- return await getMaxDepositAmount(ws, req);
+ return await getMaxDepositAmount(wex, req);
}
case WalletApiOperation.ConvertPeerPushAmount: {
const req = codecForConvertAmountRequest.decode(payload);
- return await convertPeerPushAmount(ws, req);
+ return await convertPeerPushAmount(wex, req);
}
case WalletApiOperation.GetMaxPeerPushAmount: {
const req = codecForGetAmountRequest.decode(payload);
- return await getMaxPeerPushAmount(ws, req);
+ return await getMaxPeerPushAmount(wex, req);
}
case WalletApiOperation.ConvertWithdrawalAmount: {
const req = codecForConvertAmountRequest.decode(payload);
- return await convertWithdrawalAmount(ws, req);
+ return await convertWithdrawalAmount(wex, req);
}
case WalletApiOperation.GetBackupInfo: {
- const resp = await getBackupInfo(ws);
+ const resp = await getBackupInfo(wex);
return resp;
}
case WalletApiOperation.PrepareDeposit: {
const req = codecForPrepareDepositRequest().decode(payload);
- return await prepareDepositGroup(ws, req);
+ return await checkDepositGroup(wex, req);
}
case WalletApiOperation.GenerateDepositGroupTxId:
return {
@@ -1415,99 +1242,282 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
};
case WalletApiOperation.CreateDepositGroup: {
const req = codecForCreateDepositGroupRequest().decode(payload);
- return await createDepositGroup(ws, req);
+ return await createDepositGroup(wex, req);
}
case WalletApiOperation.DeleteTransaction: {
const req = codecForDeleteTransactionRequest().decode(payload);
- await deleteTransaction(ws, req.transactionId);
+ await deleteTransaction(wex, req.transactionId);
return {};
}
case WalletApiOperation.RetryTransaction: {
const req = codecForRetryTransactionRequest().decode(payload);
- await retryTransaction(ws, req.transactionId);
+ await retryTransaction(wex, req.transactionId);
return {};
}
case WalletApiOperation.SetWalletDeviceId: {
const req = codecForSetWalletDeviceIdRequest().decode(payload);
- await setWalletDeviceId(ws, req.walletDeviceId);
+ await setWalletDeviceId(wex, req.walletDeviceId);
return {};
}
- case WalletApiOperation.ListCurrencies: {
- // FIXME: Remove / change to scoped currency approach.
- return {
- trustedAuditors: [],
- trustedExchanges: [],
- };
- }
case WalletApiOperation.TestCrypto: {
- return await ws.cryptoApi.hashString({ str: "hello world" });
+ return await wex.cryptoApi.hashString({ str: "hello world" });
}
- case WalletApiOperation.ClearDb:
- await clearDatabase(ws.db.idbHandle());
+ case WalletApiOperation.ClearDb: {
+ wex.ws.clearAllCaches();
+ await clearDatabase(wex.db.idbHandle());
return {};
+ }
case WalletApiOperation.Recycle: {
throw Error("not implemented");
return {};
}
case WalletApiOperation.ExportDb: {
- const dbDump = await exportDb(ws.idb);
+ const dbDump = await exportDb(wex.ws.idb);
return dbDump;
}
+ case WalletApiOperation.ListGlobalCurrencyExchanges: {
+ const resp: ListGlobalCurrencyExchangesResponse = {
+ exchanges: [],
+ };
+ await wex.db.runReadOnlyTx(
+ { storeNames: ["globalCurrencyExchanges"] },
+ async (tx) => {
+ const gceList = await tx.globalCurrencyExchanges.iter().toArray();
+ for (const gce of gceList) {
+ resp.exchanges.push({
+ currency: gce.currency,
+ exchangeBaseUrl: gce.exchangeBaseUrl,
+ exchangeMasterPub: gce.exchangeMasterPub,
+ });
+ }
+ },
+ );
+ return resp;
+ }
+ case WalletApiOperation.ListGlobalCurrencyAuditors: {
+ const resp: ListGlobalCurrencyAuditorsResponse = {
+ auditors: [],
+ };
+ await wex.db.runReadOnlyTx(
+ { storeNames: ["globalCurrencyAuditors"] },
+ async (tx) => {
+ const gcaList = await tx.globalCurrencyAuditors.iter().toArray();
+ for (const gca of gcaList) {
+ resp.auditors.push({
+ currency: gca.currency,
+ auditorBaseUrl: gca.auditorBaseUrl,
+ auditorPub: gca.auditorPub,
+ });
+ }
+ },
+ );
+ return resp;
+ }
+ case WalletApiOperation.AddGlobalCurrencyExchange: {
+ const req = codecForAddGlobalCurrencyExchangeRequest().decode(payload);
+ await wex.db.runReadWriteTx(
+ { storeNames: ["globalCurrencyExchanges"] },
+ async (tx) => {
+ const key = [
+ req.currency,
+ req.exchangeBaseUrl,
+ req.exchangeMasterPub,
+ ];
+ const existingRec =
+ await tx.globalCurrencyExchanges.indexes.byCurrencyAndUrlAndPub.get(
+ key,
+ );
+ if (existingRec) {
+ return;
+ }
+ wex.ws.exchangeCache.clear();
+ await tx.globalCurrencyExchanges.add({
+ currency: req.currency,
+ exchangeBaseUrl: req.exchangeBaseUrl,
+ exchangeMasterPub: req.exchangeMasterPub,
+ });
+ },
+ );
+ return {};
+ }
+ case WalletApiOperation.RemoveGlobalCurrencyExchange: {
+ const req = codecForRemoveGlobalCurrencyExchangeRequest().decode(payload);
+ await wex.db.runReadWriteTx(
+ { storeNames: ["globalCurrencyExchanges"] },
+ async (tx) => {
+ const key = [
+ req.currency,
+ req.exchangeBaseUrl,
+ req.exchangeMasterPub,
+ ];
+ const existingRec =
+ await tx.globalCurrencyExchanges.indexes.byCurrencyAndUrlAndPub.get(
+ key,
+ );
+ if (!existingRec) {
+ return;
+ }
+ wex.ws.exchangeCache.clear();
+ checkDbInvariant(!!existingRec.id);
+ await tx.globalCurrencyExchanges.delete(existingRec.id);
+ },
+ );
+ return {};
+ }
+ case WalletApiOperation.AddGlobalCurrencyAuditor: {
+ const req = codecForAddGlobalCurrencyAuditorRequest().decode(payload);
+ await wex.db.runReadWriteTx(
+ { storeNames: ["globalCurrencyAuditors"] },
+ async (tx) => {
+ const key = [req.currency, req.auditorBaseUrl, req.auditorPub];
+ const existingRec =
+ await tx.globalCurrencyAuditors.indexes.byCurrencyAndUrlAndPub.get(
+ key,
+ );
+ if (existingRec) {
+ return;
+ }
+ await tx.globalCurrencyAuditors.add({
+ currency: req.currency,
+ auditorBaseUrl: req.auditorBaseUrl,
+ auditorPub: req.auditorPub,
+ });
+ wex.ws.exchangeCache.clear();
+ },
+ );
+ return {};
+ }
+ case WalletApiOperation.TestingWaitTasksDone: {
+ await waitTasksDone(wex);
+ return {};
+ }
+ case WalletApiOperation.RemoveGlobalCurrencyAuditor: {
+ const req = codecForRemoveGlobalCurrencyAuditorRequest().decode(payload);
+ await wex.db.runReadWriteTx(
+ { storeNames: ["globalCurrencyAuditors"] },
+ async (tx) => {
+ const key = [req.currency, req.auditorBaseUrl, req.auditorPub];
+ const existingRec =
+ await tx.globalCurrencyAuditors.indexes.byCurrencyAndUrlAndPub.get(
+ key,
+ );
+ if (!existingRec) {
+ return;
+ }
+ checkDbInvariant(!!existingRec.id);
+ await tx.globalCurrencyAuditors.delete(existingRec.id);
+ wex.ws.exchangeCache.clear();
+ },
+ );
+ return {};
+ }
case WalletApiOperation.ImportDb: {
const req = codecForImportDbRequest().decode(payload);
- await importDb(ws.db.idbHandle(), req.dump);
+ await importDb(wex.db.idbHandle(), req.dump);
return [];
}
case WalletApiOperation.CheckPeerPushDebit: {
const req = codecForCheckPeerPushDebitRequest().decode(payload);
- return await checkPeerPushDebit(ws, req);
+ return await checkPeerPushDebit(wex, req);
}
case WalletApiOperation.InitiatePeerPushDebit: {
const req = codecForInitiatePeerPushDebitRequest().decode(payload);
- return await initiatePeerPushDebit(ws, req);
+ return await initiatePeerPushDebit(wex, req);
}
case WalletApiOperation.PreparePeerPushCredit: {
const req = codecForPreparePeerPushCreditRequest().decode(payload);
- return await preparePeerPushCredit(ws, req);
+ return await preparePeerPushCredit(wex, req);
}
case WalletApiOperation.ConfirmPeerPushCredit: {
const req = codecForConfirmPeerPushPaymentRequest().decode(payload);
- return await confirmPeerPushCredit(ws, req);
+ return await confirmPeerPushCredit(wex, req);
}
case WalletApiOperation.CheckPeerPullCredit: {
const req = codecForPreparePeerPullPaymentRequest().decode(payload);
- return await checkPeerPullPaymentInitiation(ws, req);
+ return await checkPeerPullPaymentInitiation(wex, req);
}
case WalletApiOperation.InitiatePeerPullCredit: {
const req = codecForInitiatePeerPullPaymentRequest().decode(payload);
- return await initiatePeerPullPayment(ws, req);
+ return await initiatePeerPullPayment(wex, req);
}
case WalletApiOperation.PreparePeerPullDebit: {
const req = codecForCheckPeerPullPaymentRequest().decode(payload);
- return await preparePeerPullDebit(ws, req);
+ return await preparePeerPullDebit(wex, req);
}
case WalletApiOperation.ConfirmPeerPullDebit: {
const req = codecForAcceptPeerPullPaymentRequest().decode(payload);
- return await confirmPeerPullDebit(ws, req);
+ return await confirmPeerPullDebit(wex, req);
}
case WalletApiOperation.ApplyDevExperiment: {
const req = codecForApplyDevExperiment().decode(payload);
- await applyDevExperiment(ws, req.devExperimentUri);
+ await applyDevExperiment(wex, req.devExperimentUri);
+ return {};
+ }
+ case WalletApiOperation.Shutdown: {
+ wex.ws.stop();
return {};
}
case WalletApiOperation.GetVersion: {
- return getVersion(ws);
+ return getVersion(wex);
}
case WalletApiOperation.TestingWaitTransactionsFinal:
- return await waitUntilTransactionsFinal(ws);
+ return await waitUntilAllTransactionsFinal(wex);
case WalletApiOperation.TestingWaitRefreshesFinal:
- return await waitUntilRefreshesDone(ws);
+ return await waitUntilRefreshesDone(wex);
case WalletApiOperation.TestingSetTimetravel: {
const req = codecForTestingSetTimetravelRequest().decode(payload);
setDangerousTimetravel(req.offsetMs);
- ws.workAvailable.trigger();
+ await wex.taskScheduler.reload();
+ return {};
+ }
+ case WalletApiOperation.DeleteExchange: {
+ const req = codecForDeleteExchangeRequest().decode(payload);
+ await deleteExchange(wex, req);
return {};
}
+ case WalletApiOperation.GetExchangeResources: {
+ const req = codecForGetExchangeResourcesRequest().decode(payload);
+ return await getExchangeResources(wex, req.exchangeBaseUrl);
+ }
+ case WalletApiOperation.CanonicalizeBaseUrl: {
+ const req = codecForCanonicalizeBaseUrlRequest().decode(payload);
+ return {
+ url: canonicalizeBaseUrl(req.url),
+ };
+ }
+ case WalletApiOperation.TestingInfiniteTransactionLoop: {
+ const myDelayMs = (payload as any).delayMs ?? 5;
+ const shouldFetch = !!(payload as any).shouldFetch;
+ const doFetch = async () => {
+ while (1) {
+ const url =
+ "https://exchange.demo.taler.net/reserves/01PMMB9PJN0QBWAFBXV6R0KNJJMAKXCV4D6FDG0GJFDJQXGYP32G?timeout_ms=30000";
+ logger.info(`fetching ${url}`);
+ const res = await wex.http.fetch(url);
+ logger.info(`fetch result ${res.status}`);
+ }
+ };
+ if (shouldFetch) {
+ // In the background!
+ doFetch();
+ }
+ let loopCount = 0;
+ while (true) {
+ logger.info(`looping test write tx, iteration ${loopCount}`);
+ await wex.db.runReadWriteTx({ storeNames: ["config"] }, async (tx) => {
+ await tx.config.put({
+ key: ConfigRecordKey.TestLoopTx,
+ value: loopCount,
+ });
+ });
+ if (myDelayMs != 0) {
+ await new Promise<void>((resolve, reject) => {
+ setTimeout(() => resolve(), myDelayMs);
+ });
+ }
+ loopCount = (loopCount + 1) % (Number.MAX_SAFE_INTEGER - 1);
+ }
+ }
// default:
// assertUnreachable(operation);
}
@@ -1520,21 +1530,62 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
);
}
-export function getVersion(ws: InternalWalletState): WalletCoreVersion {
+export function getVersion(wex: WalletExecutionContext): WalletCoreVersion {
const result: WalletCoreVersion = {
+ implementationSemver: walletCoreBuildInfo.implementationSemver,
+ implementationGitHash: walletCoreBuildInfo.implementationGitHash,
hash: undefined,
- version: WALLET_CORE_API_IMPLEMENTATION_VERSION,
+ version: WALLET_CORE_API_PROTOCOL_VERSION,
exchange: WALLET_EXCHANGE_PROTOCOL_VERSION,
merchant: WALLET_MERCHANT_PROTOCOL_VERSION,
bankConversionApiRange: WALLET_BANK_CONVERSION_API_PROTOCOL_VERSION,
bankIntegrationApiRange: WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,
corebankApiRange: WALLET_COREBANK_API_PROTOCOL_VERSION,
bank: WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,
- devMode: false,
+ devMode: wex.ws.config.testing.devModeActive,
};
return result;
}
+export function getObservedWalletExecutionContext(
+ ws: InternalWalletState,
+ cancellationToken: CancellationToken,
+ oc: ObservabilityContext,
+): WalletExecutionContext {
+ const wex: WalletExecutionContext = {
+ ws,
+ cancellationToken,
+ cryptoApi: observeTalerCrypto(ws.cryptoApi, oc),
+ db: new ObservableDbAccess(ws.db, oc),
+ http: new ObservableHttpClientLibrary(ws.http, oc),
+ taskScheduler: new ObservableTaskScheduler(ws.taskScheduler, oc),
+ oc,
+ };
+ return wex;
+}
+
+export function getNormalWalletExecutionContext(
+ ws: InternalWalletState,
+ cancellationToken: CancellationToken,
+ oc: ObservabilityContext,
+): WalletExecutionContext {
+ const wex: WalletExecutionContext = {
+ ws,
+ cancellationToken,
+ cryptoApi: ws.cryptoApi,
+ db: ws.db,
+ get http() {
+ if (ws.initCalled) {
+ return ws.http;
+ }
+ throw Error("wallet not initialized");
+ },
+ taskScheduler: ws.taskScheduler,
+ oc,
+ };
+ return wex;
+}
+
/**
* Handle a request to the wallet-core API.
*/
@@ -1544,8 +1595,48 @@ async function handleCoreApiRequest(
id: string,
payload: unknown,
): Promise<CoreApiResponse> {
+ let wex: WalletExecutionContext;
+ let oc: ObservabilityContext;
+
+ const cts = CancellationToken.create();
+
+ if (ws.initCalled && ws.config.testing.emitObservabilityEvents) {
+ oc = {
+ observe(evt) {
+ ws.notify({
+ type: NotificationType.RequestObservabilityEvent,
+ operation,
+ requestId: id,
+ event: evt,
+ });
+ },
+ };
+
+ wex = getObservedWalletExecutionContext(ws, cts.token, oc);
+ } else {
+ oc = {
+ observe(evt) {},
+ };
+ wex = getNormalWalletExecutionContext(ws, cts.token, oc);
+ }
+
try {
- const result = await dispatchRequestInternal(ws, operation as any, payload);
+ const start = performanceNow();
+ await ws.ensureWalletDbOpen();
+ oc.observe({
+ type: ObservabilityEventType.RequestStart,
+ });
+ const result = await dispatchRequestInternal(
+ wex,
+ cts,
+ operation as any,
+ payload,
+ );
+ const end = performanceNow();
+ oc.observe({
+ type: ObservabilityEventType.RequestFinishSuccess,
+ durationMs: Number((end - start) / 1000n / 1000n),
+ });
return {
type: "response",
operation,
@@ -1557,6 +1648,9 @@ async function handleCoreApiRequest(
logger.info(
`finished wallet core request ${operation} with error: ${j2s(err)}`,
);
+ oc.observe({
+ type: ObservabilityEventType.RequestFinishError,
+ });
return {
type: "error",
operation,
@@ -1566,6 +1660,34 @@ async function handleCoreApiRequest(
}
}
+export function applyRunConfigDefaults(
+ wcp?: PartialWalletRunConfig,
+): WalletRunConfig {
+ return {
+ builtin: {
+ exchanges: wcp?.builtin?.exchanges ?? [
+ {
+ exchangeBaseUrl: "https://exchange.demo.taler.net/",
+ currencyHint: "KUDOS",
+ },
+ ],
+ },
+ features: {
+ allowHttp: wcp?.features?.allowHttp ?? false,
+ },
+ testing: {
+ denomselAllowLate: wcp?.testing?.denomselAllowLate ?? false,
+ devModeActive: wcp?.testing?.devModeActive ?? false,
+ insecureTrustExchange: wcp?.testing?.insecureTrustExchange ?? false,
+ preventThrottling: wcp?.testing?.preventThrottling ?? false,
+ skipDefaults: wcp?.testing?.skipDefaults ?? false,
+ emitObservabilityEvents: wcp?.testing?.emitObservabilityEvents ?? false,
+ },
+ };
+}
+
+export type HttpFactory = (config: WalletRunConfig) => HttpRequestLibrary;
+
/**
* Public handle to a running wallet.
*/
@@ -1575,17 +1697,15 @@ export class Wallet {
private constructor(
idb: IDBFactory,
- http: HttpRequestLibrary,
+ httpFactory: HttpFactory,
timer: TimerAPI,
cryptoWorkerFactory: CryptoWorkerFactory,
- config?: WalletConfigParameter,
) {
- this.ws = new InternalWalletStateImpl(
+ this.ws = new InternalWalletState(
idb,
- http,
+ httpFactory,
timer,
cryptoWorkerFactory,
- Wallet.getEffectiveConfig(config),
);
}
@@ -1598,61 +1718,19 @@ export class Wallet {
static async create(
idb: IDBFactory,
- http: HttpRequestLibrary,
+ httpFactory: HttpFactory,
timer: TimerAPI,
cryptoWorkerFactory: CryptoWorkerFactory,
- config?: WalletConfigParameter,
): Promise<Wallet> {
- const w = new Wallet(idb, http, timer, cryptoWorkerFactory, config);
+ const w = new Wallet(idb, httpFactory, timer, cryptoWorkerFactory);
w._client = await getClientFromWalletState(w.ws);
return w;
}
- public static defaultConfig: Readonly<WalletConfig> = {
- builtin: {
- exchanges: [
- {
- exchangeBaseUrl: "https://exchange.demo.taler.net/",
- currencyHint: "KUDOS",
- },
- ],
- },
- features: {
- allowHttp: false,
- },
- testing: {
- preventThrottling: false,
- devModeActive: false,
- insecureTrustExchange: false,
- denomselAllowLate: false,
- skipDefaults: false,
- },
- };
-
- static getEffectiveConfig(
- param?: WalletConfigParameter,
- ): Readonly<WalletConfig> {
- return deepMerge(Wallet.defaultConfig, param ?? {});
- }
-
addNotificationListener(f: (n: WalletNotification) => void): CancelFn {
return this.ws.addNotificationListener(f);
}
- stop(): void {
- this.ws.stop();
- }
-
- async runPending(): Promise<void> {
- await this.ws.ensureWalletDbOpen();
- return runPending(this.ws);
- }
-
- async runTaskLoop(opts?: RetryLoopOpts): Promise<TaskLoopResult> {
- await this.ws.ensureWalletDbOpen();
- return runTaskLoop(this.ws, opts);
- }
-
async handleCoreApiRequest(
operation: string,
id: string,
@@ -1663,49 +1741,107 @@ export class Wallet {
}
}
+export interface DevExperimentState {
+ blockRefreshes?: boolean;
+}
+
+export class Cache<T> {
+ private map: Map<string, [AbsoluteTime, T]> = new Map();
+
+ constructor(
+ private maxCapacity: number,
+ private cacheDuration: Duration,
+ ) {}
+
+ get(key: string): T | undefined {
+ const r = this.map.get(key);
+ if (!r) {
+ return undefined;
+ }
+
+ if (AbsoluteTime.isExpired(r[0])) {
+ this.map.delete(key);
+ return undefined;
+ }
+
+ return r[1];
+ }
+
+ clear(): void {
+ this.map.clear();
+ }
+
+ put(key: string, value: T): void {
+ if (this.map.size > this.maxCapacity) {
+ this.map.clear();
+ }
+ const expiry = AbsoluteTime.addDuration(
+ AbsoluteTime.now(),
+ this.cacheDuration,
+ );
+ this.map.set(key, [expiry, value]);
+ }
+}
+
+/**
+ * Implementation of triggers for the wallet DB.
+ */
+class WalletDbTriggerSpec implements TriggerSpec {
+ constructor(public ws: InternalWalletState) {}
+
+ afterCommit(info: AfterCommitInfo): void {
+ if (info.mode !== "readwrite") {
+ return;
+ }
+ logger.info(
+ `in after commit callback for readwrite, modified ${j2s([
+ ...info.modifiedStores,
+ ])}`,
+ );
+ const modified = info.accessedStores;
+ if (
+ modified.has(WalletStoresV1.exchanges.storeName) ||
+ modified.has(WalletStoresV1.exchangeDetails.storeName) ||
+ modified.has(WalletStoresV1.denominations.storeName) ||
+ modified.has(WalletStoresV1.globalCurrencyAuditors.storeName) ||
+ modified.has(WalletStoresV1.globalCurrencyExchanges.storeName)
+ ) {
+ this.ws.clearAllCaches();
+ }
+ }
+}
+
/**
* Internal state of the wallet.
*
* This ties together all the operation implementations.
*/
-class InternalWalletStateImpl implements InternalWalletState {
- /**
- * @see {@link InternalWalletState.activeLongpoll}
- */
- activeLongpoll: ActiveLongpollInfo = {};
-
+export class InternalWalletState {
cryptoApi: TalerCryptoInterface;
cryptoDispatcher: CryptoDispatcher;
- merchantInfoCache: Record<string, MerchantInfo> = {};
-
readonly timerGroup: TimerGroup;
workAvailable = new AsyncCondition();
stopped = false;
- listeners: NotificationListener[] = [];
+ private listeners: NotificationListener[] = [];
initCalled = false;
- exchangeOps: ExchangeOperations = {
- getExchangeDetails,
- fetchFreshExchange,
- };
-
- recoupOps: RecoupOperations = {
- createRecoupGroup,
- };
-
- merchantOps: MerchantOperations = {
- getMerchantInfo,
- };
+ refreshCostCache: Cache<AmountJson> = new Cache(
+ 1000,
+ Duration.fromSpec({ minutes: 1 }),
+ );
- refreshOps: RefreshOperations = {
- createRefreshGroup,
- };
+ denomInfoCache: Cache<DenominationInfo> = new Cache(
+ 1000,
+ Duration.fromSpec({ minutes: 1 }),
+ );
- // FIXME: Use an LRU cache here.
- private denomCache: Record<string, DenominationInfo> = {};
+ exchangeCache: Cache<ReadyExchangeSummary> = new Cache(
+ 1000,
+ Duration.fromSpec({ minutes: 1 }),
+ );
/**
* Promises that are waiting for a particular resource.
@@ -1717,153 +1853,106 @@ class InternalWalletStateImpl implements InternalWalletState {
*/
private resourceLocks: Set<string> = new Set();
- isTaskLoopRunning: boolean = false;
+ taskScheduler: TaskScheduler = new TaskSchedulerImpl(this);
+
+ private _config: Readonly<WalletRunConfig> | undefined;
- config: Readonly<WalletConfig>;
+ private _indexedDbHandle: IDBDatabase | undefined = undefined;
- private _db: DbAccess<typeof WalletStoresV1> | undefined = undefined;
+ private _dbAccessHandle: DbAccess<typeof WalletStoresV1> | undefined;
+
+ private _http: HttpRequestLibrary | undefined = undefined;
get db(): DbAccess<typeof WalletStoresV1> {
- if (!this._db) {
+ if (!this._dbAccessHandle) {
+ this._dbAccessHandle = this.createDbAccessHandle(
+ CancellationToken.CONTINUE,
+ );
+ }
+ return this._dbAccessHandle;
+ }
+
+ devExperimentState: DevExperimentState = {};
+
+ clientCancellationMap: Map<string, CancellationToken.Source> = new Map();
+
+ clearAllCaches(): void {
+ this.exchangeCache.clear();
+ this.denomInfoCache.clear();
+ this.refreshCostCache.clear();
+ }
+
+ initWithConfig(newConfig: WalletRunConfig): void {
+ this._config = newConfig;
+
+ logger.info(`setting new config to ${j2s(newConfig)}`);
+
+ this._http = this.httpFactory(newConfig);
+
+ if (this.config.testing.devModeActive) {
+ this._http = new DevExperimentHttpLib(this.http);
+ }
+ }
+
+ createDbAccessHandle(
+ cancellationToken: CancellationToken,
+ ): DbAccess<typeof WalletStoresV1> {
+ if (!this._indexedDbHandle) {
throw Error("db not initialized");
}
- return this._db;
+ return new DbAccessImpl(
+ this._indexedDbHandle,
+ WalletStoresV1,
+ new WalletDbTriggerSpec(this),
+ cancellationToken,
+ );
+ }
+
+ get config(): WalletRunConfig {
+ if (!this._config) {
+ throw Error("config not initialized");
+ }
+ return this._config;
+ }
+
+ get http(): HttpRequestLibrary {
+ if (!this._http) {
+ throw Error("wallet not initialized");
+ }
+ return this._http;
}
constructor(
public idb: IDBFactory,
- public http: HttpRequestLibrary,
+ private httpFactory: HttpFactory,
public timer: TimerAPI,
cryptoWorkerFactory: CryptoWorkerFactory,
- configParam: WalletConfig,
) {
this.cryptoDispatcher = new CryptoDispatcher(cryptoWorkerFactory);
this.cryptoApi = this.cryptoDispatcher.cryptoApi;
this.timerGroup = new TimerGroup(timer);
- this.config = configParam;
- if (this.config.testing.devModeActive) {
- this.http = new DevExperimentHttpLib(this.http);
- }
}
async ensureWalletDbOpen(): Promise<void> {
- if (this._db) {
+ if (this._indexedDbHandle) {
return;
}
const myVersionChange = async (): Promise<void> => {
logger.info("version change requested for Taler DB");
};
- const myDb = await openTalerDatabase(this.idb, myVersionChange);
- this._db = myDb;
- }
-
- async getTransactionState(
- ws: InternalWalletState,
- tx: GetReadOnlyAccess<typeof WalletStoresV1>,
- transactionId: string,
- ): Promise<TransactionState | undefined> {
- const parsedTxId = parseTransactionIdentifier(transactionId);
- if (!parsedTxId) {
- throw Error("invalid tx identifier");
- }
- switch (parsedTxId.tag) {
- case TransactionType.Deposit: {
- const rec = await tx.depositGroups.get(parsedTxId.depositGroupId);
- if (!rec) {
- return undefined;
- }
- return computeDepositTransactionStatus(rec);
- }
- case TransactionType.InternalWithdrawal:
- case TransactionType.Withdrawal: {
- const rec = await tx.withdrawalGroups.get(parsedTxId.withdrawalGroupId);
- if (!rec) {
- return undefined;
- }
- return computeWithdrawalTransactionStatus(rec);
- }
- case TransactionType.Payment: {
- const rec = await tx.purchases.get(parsedTxId.proposalId);
- if (!rec) {
- return;
- }
- return computePayMerchantTransactionState(rec);
- }
- case TransactionType.Refund: {
- const rec = await tx.refundGroups.get(parsedTxId.refundGroupId);
- if (!rec) {
- return undefined;
- }
- return computeRefundTransactionState(rec);
- }
- case TransactionType.PeerPullCredit:
- const rec = await tx.peerPullCredit.get(parsedTxId.pursePub);
- if (!rec) {
- return undefined;
- }
- return computePeerPullCreditTransactionState(rec);
- case TransactionType.PeerPullDebit: {
- const rec = await tx.peerPullDebit.get(parsedTxId.peerPullDebitId);
- if (!rec) {
- return undefined;
- }
- return computePeerPullDebitTransactionState(rec);
- }
- case TransactionType.PeerPushCredit: {
- const rec = await tx.peerPushCredit.get(parsedTxId.peerPushCreditId);
- if (!rec) {
- return undefined;
- }
- return computePeerPushCreditTransactionState(rec);
- }
- case TransactionType.PeerPushDebit: {
- const rec = await tx.peerPushDebit.get(parsedTxId.pursePub);
- if (!rec) {
- return undefined;
- }
- return computePeerPushDebitTransactionState(rec);
- }
- case TransactionType.Refresh: {
- const rec = await tx.refreshGroups.get(parsedTxId.refreshGroupId);
- if (!rec) {
- return undefined;
- }
- return computeRefreshTransactionState(rec);
- }
- case TransactionType.Reward: {
- const rec = await tx.rewards.get(parsedTxId.walletRewardId);
- if (!rec) {
- return undefined;
- }
- return computeRewardTransactionStatus(rec);
- }
- default:
- assertUnreachable(parsedTxId);
+ try {
+ const myDb = await openTalerDatabase(this.idb, myVersionChange);
+ this._indexedDbHandle = myDb;
+ } catch (e) {
+ logger.error("error writing to database during initialization");
+ throw TalerError.fromDetail(TalerErrorCode.WALLET_DB_UNAVAILABLE, {
+ innerError: getErrorDetailFromException(e),
+ });
}
}
- async getDenomInfo(
- ws: InternalWalletState,
- tx: GetReadWriteAccess<{
- denominations: typeof WalletStoresV1.denominations;
- }>,
- exchangeBaseUrl: string,
- denomPubHash: string,
- ): Promise<DenominationInfo | undefined> {
- const key = `${exchangeBaseUrl}:${denomPubHash}`;
- const cached = this.denomCache[key];
- if (cached) {
- return cached;
- }
- const d = await tx.denominations.get([exchangeBaseUrl, denomPubHash]);
- if (d) {
- return DenominationRecord.toDenomInfo(d);
- }
- return undefined;
- }
-
notify(n: WalletNotification): void {
- logger.trace("Notification", j2s(n));
+ logger.trace(`Notification: ${j2s(n)}`);
for (const l of this.listeners) {
const nc = JSON.parse(JSON.stringify(n));
setTimeout(() => {
@@ -1890,11 +1979,9 @@ class InternalWalletStateImpl implements InternalWalletState {
this.stopped = true;
this.timerGroup.stopCurrentAndFutureTimers();
this.cryptoDispatcher.stop();
- for (const key of Object.keys(this.activeLongpoll)) {
- logger.trace(`cancelling active longpoll ${key}`);
- this.activeLongpoll[key].cancel();
- delete this.activeLongpoll[key];
- }
+ this.taskScheduler.shutdown().catch((e) => {
+ logger.warn(`shutdown failed: ${safeStringifyException(e)}`);
+ });
}
/**
@@ -1929,48 +2016,11 @@ class InternalWalletStateImpl implements InternalWalletState {
} finally {
for (const token of tokens) {
this.resourceLocks.delete(token);
- let waiter = (this.resourceWaiters[token] ?? []).shift();
+ const waiter = (this.resourceWaiters[token] ?? []).shift();
if (waiter) {
waiter.resolve();
}
}
}
}
-
- ensureTaskLoopRunning(): void {
- if (this.isTaskLoopRunning) {
- return;
- }
- runTaskLoop(this)
- .catch((e) => {
- logger.error("error running task loop");
- logger.error(`err: ${e}`);
- })
- .then(() => {
- logger.info("done running task loop");
- });
- }
-}
-
-/**
- * Take the full object as template, create a new result with all the values.
- * Use the override object to change the values in the result
- * return result
- * @param full
- * @param override
- * @returns
- */
-function deepMerge<T extends object>(full: T, override: object): T {
- const keys = Object.keys(full);
- const result = { ...full };
- for (const k of keys) {
- // @ts-ignore
- const newVal = override[k];
- if (newVal === undefined) continue;
- // @ts-ignore
- result[k] =
- // @ts-ignore
- typeof newVal === "object" ? deepMerge(full[k], newVal) : newVal;
- }
- return result;
}