diff options
Diffstat (limited to 'packages/taler-wallet-core/src/operations/common.ts')
-rw-r--r-- | packages/taler-wallet-core/src/operations/common.ts | 466 |
1 files changed, 63 insertions, 403 deletions
diff --git a/packages/taler-wallet-core/src/operations/common.ts b/packages/taler-wallet-core/src/operations/common.ts index 4c7c55212..92950b35b 100644 --- a/packages/taler-wallet-core/src/operations/common.ts +++ b/packages/taler-wallet-core/src/operations/common.ts @@ -21,7 +21,6 @@ import { AbsoluteTime, AmountJson, Amounts, - CancellationToken, CoinRefreshRequest, CoinStatus, Duration, @@ -29,22 +28,15 @@ import { ExchangeEntryStatus, ExchangeTosStatus, ExchangeUpdateStatus, - getErrorDetailFromException, - j2s, Logger, - makeErrorDetail, - NotificationType, RefreshReason, - TalerError, - TalerErrorCode, TalerErrorDetail, TalerPreciseTimestamp, + TalerProtocolTimestamp, TombstoneIdStr, TransactionIdStr, - TransactionType, - WalletNotification, + durationMul, } from "@gnu-taler/taler-util"; -import { CryptoApiStoppedError } from "../crypto/workers/crypto-dispatcher.js"; import { BackupProviderRecord, CoinRecord, @@ -61,17 +53,16 @@ import { RecoupGroupRecord, RefreshGroupRecord, RewardRecord, - timestampPreciseToDb, WalletStoresV1, WithdrawalGroupRecord, + timestampPreciseToDb, } from "../db.js"; import { InternalWalletState } from "../internal-wallet-state.js"; import { PendingTaskType, TaskId } from "../pending-types.js"; import { assertUnreachable } from "../util/assertUnreachable.js"; import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js"; -import { GetReadOnlyAccess, GetReadWriteAccess } from "../util/query.js"; +import { GetReadWriteAccess } from "../util/query.js"; import { createRefreshGroup } from "./refresh.js"; -import { constructTransactionIdentifier } from "./transactions.js"; const logger = new Logger("operations/common.ts"); @@ -251,331 +242,6 @@ export async function spendCoins( ); } -/** - * Convert the task ID for a task that processes a transaction int - * the ID for the transaction. - */ -function convertTaskToTransactionId( - taskId: string, -): TransactionIdStr | undefined { - const parsedTaskId = parseTaskIdentifier(taskId); - switch (parsedTaskId.tag) { - case PendingTaskType.PeerPullCredit: - return constructTransactionIdentifier({ - tag: TransactionType.PeerPullCredit, - pursePub: parsedTaskId.pursePub, - }); - case PendingTaskType.PeerPullDebit: - return constructTransactionIdentifier({ - tag: TransactionType.PeerPullDebit, - peerPullDebitId: parsedTaskId.peerPullDebitId, - }); - // FIXME: This doesn't distinguish internal-withdrawal. - // Maybe we should have a different task type for that as well? - // Or maybe transaction IDs should be valid task identifiers? - case PendingTaskType.Withdraw: - return constructTransactionIdentifier({ - tag: TransactionType.Withdrawal, - withdrawalGroupId: parsedTaskId.withdrawalGroupId, - }); - case PendingTaskType.PeerPushCredit: - return constructTransactionIdentifier({ - tag: TransactionType.PeerPushCredit, - peerPushCreditId: parsedTaskId.peerPushCreditId, - }); - case PendingTaskType.Deposit: - return constructTransactionIdentifier({ - tag: TransactionType.Deposit, - depositGroupId: parsedTaskId.depositGroupId, - }); - case PendingTaskType.Refresh: - return constructTransactionIdentifier({ - tag: TransactionType.Refresh, - refreshGroupId: parsedTaskId.refreshGroupId, - }); - case PendingTaskType.RewardPickup: - return constructTransactionIdentifier({ - tag: TransactionType.Reward, - walletRewardId: parsedTaskId.walletRewardId, - }); - case PendingTaskType.PeerPushDebit: - return constructTransactionIdentifier({ - tag: TransactionType.PeerPushDebit, - pursePub: parsedTaskId.pursePub, - }); - case PendingTaskType.Purchase: - return constructTransactionIdentifier({ - tag: TransactionType.Payment, - proposalId: parsedTaskId.proposalId, - }); - default: - return undefined; - } -} - -async function makeTransactionRetryNotification( - ws: InternalWalletState, - tx: GetReadOnlyAccess<typeof WalletStoresV1>, - pendingTaskId: string, - e: TalerErrorDetail | undefined, -): Promise<WalletNotification | undefined> { - const txId = convertTaskToTransactionId(pendingTaskId); - if (!txId) { - return undefined; - } - const txState = await ws.getTransactionState(ws, tx, txId); - if (!txState) { - return undefined; - } - const notif: WalletNotification = { - type: NotificationType.TransactionStateTransition, - transactionId: txId, - oldTxState: txState, - newTxState: txState, - }; - if (e) { - notif.errorInfo = { - code: e.code as number, - hint: e.hint, - }; - } - return notif; -} - -async function makeExchangeRetryNotification( - ws: InternalWalletState, - tx: GetReadOnlyAccess<typeof WalletStoresV1>, - pendingTaskId: string, - e: TalerErrorDetail | undefined, -): Promise<WalletNotification | undefined> { - logger.info("making exchange retry notification"); - const parsedTaskId = parseTaskIdentifier(pendingTaskId); - if (parsedTaskId.tag !== PendingTaskType.ExchangeUpdate) { - throw Error("invalid task identifier"); - } - const rec = await tx.exchanges.get(parsedTaskId.exchangeBaseUrl); - - if (!rec) { - logger.info(`exchange ${parsedTaskId.exchangeBaseUrl} not found`); - return undefined; - } - - const notif: WalletNotification = { - type: NotificationType.ExchangeStateTransition, - exchangeBaseUrl: parsedTaskId.exchangeBaseUrl, - oldExchangeState: getExchangeState(rec), - newExchangeState: getExchangeState(rec), - }; - if (e) { - notif.errorInfo = { - code: e.code as number, - hint: e.hint, - }; - } - return notif; -} - -/** - * Generate an appropriate error transition notification - * for applicable tasks. - * - * Namely, transition notifications are generated for: - * - exchange update errors - * - transactions - */ -async function taskToRetryNotification( - ws: InternalWalletState, - tx: GetReadOnlyAccess<typeof WalletStoresV1>, - pendingTaskId: string, - e: TalerErrorDetail | undefined, -): Promise<WalletNotification | undefined> { - const parsedTaskId = parseTaskIdentifier(pendingTaskId); - - switch (parsedTaskId.tag) { - case PendingTaskType.ExchangeUpdate: - return makeExchangeRetryNotification(ws, tx, pendingTaskId, e); - case PendingTaskType.PeerPullCredit: - case PendingTaskType.PeerPullDebit: - case PendingTaskType.Withdraw: - case PendingTaskType.PeerPushCredit: - case PendingTaskType.Deposit: - case PendingTaskType.Refresh: - case PendingTaskType.RewardPickup: - case PendingTaskType.PeerPushDebit: - case PendingTaskType.Purchase: - return makeTransactionRetryNotification(ws, tx, pendingTaskId, e); - case PendingTaskType.Backup: - case PendingTaskType.ExchangeCheckRefresh: - case PendingTaskType.Recoup: - return undefined; - } -} - -async function storePendingTaskError( - ws: InternalWalletState, - pendingTaskId: string, - e: TalerErrorDetail, -): Promise<void> { - logger.info(`storing pending task error for ${pendingTaskId}`); - const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => { - let retryRecord = await tx.operationRetries.get(pendingTaskId); - if (!retryRecord) { - retryRecord = { - id: pendingTaskId, - lastError: e, - retryInfo: DbRetryInfo.reset(), - }; - } else { - retryRecord.lastError = e; - retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo); - } - await tx.operationRetries.put(retryRecord); - return taskToRetryNotification(ws, tx, pendingTaskId, e); - }); - if (maybeNotification) { - ws.notify(maybeNotification); - } -} - -export async function resetPendingTaskTimeout( - ws: InternalWalletState, - pendingTaskId: string, -): Promise<void> { - const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => { - let retryRecord = await tx.operationRetries.get(pendingTaskId); - if (retryRecord) { - // Note that we don't reset the lastError, it should still be visible - // while the retry runs. - retryRecord.retryInfo = DbRetryInfo.reset(); - await tx.operationRetries.put(retryRecord); - } - return taskToRetryNotification(ws, tx, pendingTaskId, undefined); - }); - if (maybeNotification) { - ws.notify(maybeNotification); - } -} - -async function storePendingTaskPending( - ws: InternalWalletState, - pendingTaskId: string, -): Promise<void> { - const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => { - let retryRecord = await tx.operationRetries.get(pendingTaskId); - let hadError = false; - if (!retryRecord) { - retryRecord = { - id: pendingTaskId, - retryInfo: DbRetryInfo.reset(), - }; - } else { - if (retryRecord.lastError) { - hadError = true; - } - delete retryRecord.lastError; - retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo); - } - await tx.operationRetries.put(retryRecord); - if (hadError) { - return taskToRetryNotification(ws, tx, pendingTaskId, undefined); - } else { - return undefined; - } - }); - if (maybeNotification) { - ws.notify(maybeNotification); - } -} - -async function storePendingTaskFinished( - ws: InternalWalletState, - pendingTaskId: string, -): Promise<void> { - await ws.db - .mktx((x) => [x.operationRetries]) - .runReadWrite(async (tx) => { - await tx.operationRetries.delete(pendingTaskId); - }); -} - -export async function runTaskWithErrorReporting( - ws: InternalWalletState, - opId: TaskId, - f: () => Promise<TaskRunResult>, -): Promise<TaskRunResult> { - let maybeError: TalerErrorDetail | undefined; - try { - const resp = await f(); - switch (resp.type) { - case TaskRunResultType.Error: - await storePendingTaskError(ws, opId, resp.errorDetail); - return resp; - case TaskRunResultType.Finished: - await storePendingTaskFinished(ws, opId); - return resp; - case TaskRunResultType.Pending: - await storePendingTaskPending(ws, opId); - return resp; - case TaskRunResultType.Longpoll: - return resp; - } - } catch (e) { - if (e instanceof CryptoApiStoppedError) { - if (ws.stopped) { - logger.warn("crypto API stopped during shutdown, ignoring error"); - return { - type: TaskRunResultType.Error, - errorDetail: makeErrorDetail( - TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, - {}, - "Crypto API stopped during shutdown", - ), - }; - } - } - if (e instanceof TalerError) { - logger.warn("operation processed resulted in error"); - logger.warn(`error was: ${j2s(e.errorDetail)}`); - maybeError = e.errorDetail; - await storePendingTaskError(ws, opId, maybeError!); - return { - type: TaskRunResultType.Error, - errorDetail: e.errorDetail, - }; - } else if (e instanceof Error) { - // This is a bug, as we expect pending operations to always - // do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED - // or return something. - logger.error(`Uncaught exception: ${e.message}`); - logger.error(`Stack: ${e.stack}`); - maybeError = makeErrorDetail( - TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, - { - stack: e.stack, - }, - `unexpected exception (message: ${e.message})`, - ); - await storePendingTaskError(ws, opId, maybeError); - return { - type: TaskRunResultType.Error, - errorDetail: maybeError, - }; - } else { - logger.error("Uncaught exception, value is not even an error."); - maybeError = makeErrorDetail( - TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, - {}, - `unexpected exception (not even an error)`, - ); - await storePendingTaskError(ws, opId, maybeError); - return { - type: TaskRunResultType.Error, - errorDetail: maybeError, - }; - } - } -} - export enum TombstoneTag { DeleteWithdrawalGroup = "delete-withdrawal-group", DeleteReserve = "delete-reserve", @@ -646,47 +312,6 @@ export function getExchangeState(r: ExchangeEntryRecord): ExchangeEntryState { }; } -export interface LongpollResult { - ready: boolean; -} - -export function runLongpollAsync( - ws: InternalWalletState, - retryTag: string, - reqFn: (ct: CancellationToken) => Promise<LongpollResult>, -): void { - const asyncFn = async () => { - if (ws.stopped) { - logger.trace("not long-polling reserve, wallet already stopped"); - await storePendingTaskPending(ws, retryTag); - return; - } - const cts = CancellationToken.create(); - let res: { ready: boolean } | undefined = undefined; - try { - ws.activeLongpoll[retryTag] = { - cancel: () => { - logger.trace("cancel of reserve longpoll requested"); - cts.cancel(); - }, - }; - res = await reqFn(cts.token); - } catch (e) { - const errDetail = getErrorDetailFromException(e); - logger.warn(`got error during long-polling: ${j2s(errDetail)}`); - await storePendingTaskError(ws, retryTag, errDetail); - return; - } finally { - delete ws.activeLongpoll[retryTag]; - } - if (!res.ready) { - await storePendingTaskPending(ws, retryTag); - } - ws.workAvailable.trigger(); - }; - asyncFn(); -} - export type ParsedTombstone = | { tag: TombstoneTag.DeleteWithdrawalGroup; @@ -732,31 +357,53 @@ export interface TransactionManager { export enum TaskRunResultType { Finished = "finished", - Pending = "pending", + Backoff = "backoff", + Progress = "progress", Error = "error", - Longpoll = "longpoll", + ScheduleLater = "schedule-later", } export type TaskRunResult = | TaskRunFinishedResult | TaskRunErrorResult - | TaskRunLongpollResult - | TaskRunPendingResult; + | TaskRunBackoffResult + | TaskRunProgressResult + | TaskRunScheduleLaterResult; export namespace TaskRunResult { + /** + * Task is finished and does not need to be processed again. + */ export function finished(): TaskRunResult { return { type: TaskRunResultType.Finished, }; } - export function pending(): TaskRunResult { + /** + * Task is waiting for something, should be invoked + * again with exponentiall back-off until some other + * result is returned. + */ + export function backoff(): TaskRunResult { + return { + type: TaskRunResultType.Backoff, + }; + } + /** + * Task made progress and should be processed again. + */ + export function progress(): TaskRunResult { return { - type: TaskRunResultType.Pending, + type: TaskRunResultType.Progress, }; } - export function longpoll(): TaskRunResult { + /** + * Run the task again at a fixed time in the future. + */ + export function runAgainAt(runAt: AbsoluteTime): TaskRunResult { return { - type: TaskRunResultType.Longpoll, + type: TaskRunResultType.ScheduleLater, + runAt, }; } } @@ -765,8 +412,17 @@ export interface TaskRunFinishedResult { type: TaskRunResultType.Finished; } -export interface TaskRunPendingResult { - type: TaskRunResultType.Pending; +export interface TaskRunBackoffResult { + type: TaskRunResultType.Backoff; +} + +export interface TaskRunProgressResult { + type: TaskRunResultType.Progress; +} + +export interface TaskRunScheduleLaterResult { + type: TaskRunResultType.ScheduleLater; + runAt: AbsoluteTime; } export interface TaskRunErrorResult { @@ -774,10 +430,6 @@ export interface TaskRunErrorResult { errorDetail: TalerErrorDetail; } -export interface TaskRunLongpollResult { - type: TaskRunResultType.Longpoll; -} - export interface DbRetryInfo { firstTry: DbPreciseTimestamp; nextRetry: DbPreciseTimestamp; @@ -867,6 +519,24 @@ export namespace DbRetryInfo { } /** + * Timestamp after which the wallet would do an auto-refresh. + */ +export function getAutoRefreshExecuteThreshold(d: { + stampExpireWithdraw: TalerProtocolTimestamp; + stampExpireDeposit: TalerProtocolTimestamp; +}): AbsoluteTime { + const expireWithdraw = AbsoluteTime.fromProtocolTimestamp( + d.stampExpireWithdraw, + ); + const expireDeposit = AbsoluteTime.fromProtocolTimestamp( + d.stampExpireDeposit, + ); + const delta = AbsoluteTime.difference(expireWithdraw, expireDeposit); + const deltaDiv = durationMul(delta, 0.5); + return AbsoluteTime.addDuration(expireWithdraw, deltaDiv); +} + +/** * Parsed representation of task identifiers. */ export type ParsedTaskIdentifier = @@ -877,7 +547,6 @@ export type ParsedTaskIdentifier = | { tag: PendingTaskType.ExchangeUpdate; exchangeBaseUrl: string } | { tag: PendingTaskType.Backup; backupProviderBaseUrl: string } | { tag: PendingTaskType.Deposit; depositGroupId: string } - | { tag: PendingTaskType.ExchangeCheckRefresh; exchangeBaseUrl: string } | { tag: PendingTaskType.PeerPullDebit; peerPullDebitId: string } | { tag: PendingTaskType.PeerPullCredit; pursePub: string } | { tag: PendingTaskType.PeerPushCredit; peerPushCreditId: string } @@ -900,8 +569,6 @@ export function parseTaskIdentifier(x: string): ParsedTaskIdentifier { return { tag: type, backupProviderBaseUrl: decodeURIComponent(rest[0]) }; case PendingTaskType.Deposit: return { tag: type, depositGroupId: rest[0] }; - case PendingTaskType.ExchangeCheckRefresh: - return { tag: type, exchangeBaseUrl: decodeURIComponent(rest[0]) }; case PendingTaskType.ExchangeUpdate: return { tag: type, exchangeBaseUrl: decodeURIComponent(rest[0]) }; case PendingTaskType.PeerPullCredit: @@ -933,8 +600,6 @@ export function constructTaskIdentifier(p: ParsedTaskIdentifier): TaskId { return `${p.tag}:${p.backupProviderBaseUrl}` as TaskId; case PendingTaskType.Deposit: return `${p.tag}:${p.depositGroupId}` as TaskId; - case PendingTaskType.ExchangeCheckRefresh: - return `${p.tag}:${encodeURIComponent(p.exchangeBaseUrl)}` as TaskId; case PendingTaskType.ExchangeUpdate: return `${p.tag}:${encodeURIComponent(p.exchangeBaseUrl)}` as TaskId; case PendingTaskType.PeerPullDebit: @@ -974,11 +639,6 @@ export namespace TaskIdentifiers { exchBaseUrl, )}` as TaskId; } - export function forExchangeCheckRefresh(exch: ExchangeEntryRecord): TaskId { - return `${PendingTaskType.ExchangeCheckRefresh}:${encodeURIComponent( - exch.baseUrl, - )}` as TaskId; - } export function forTipPickup(tipRecord: RewardRecord): TaskId { return `${PendingTaskType.RewardPickup}:${tipRecord.walletRewardId}` as TaskId; } |