diff options
Diffstat (limited to 'packages/taler-wallet-core/src/common.ts')
-rw-r--r-- | packages/taler-wallet-core/src/common.ts | 897 |
1 files changed, 782 insertions, 115 deletions
diff --git a/packages/taler-wallet-core/src/common.ts b/packages/taler-wallet-core/src/common.ts index dd8542def..edaba5ba4 100644 --- a/packages/taler-wallet-core/src/common.ts +++ b/packages/taler-wallet-core/src/common.ts @@ -1,6 +1,6 @@ /* This file is part of GNU Taler - (C) 2019 GNUnet e.V. + (C) 2022 GNUnet e.V. GNU Taler is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -15,142 +15,809 @@ */ /** - * Common interface of the internal wallet state. This object is passed - * to the various operations (exchange management, withdrawal, refresh, reserve - * management, etc.). - * - * Some operations can be accessed via this state object. This allows mutual - * recursion between operations, without having cycling dependencies between - * the respective TypeScript files. - * - * (You can think of this as a "header file" for the wallet implementation.) - */ - -/** * Imports. */ -import { WalletNotification, BalancesResponse } from "@gnu-taler/taler-util"; -import { CryptoApi } from "./crypto/workers/cryptoApi.js"; -import { ExchangeDetailsRecord, ExchangeRecord, WalletStoresV1 } from "./db.js"; -import { PendingOperationsResponse } from "./pending-types.js"; -import { AsyncOpMemoMap, AsyncOpMemoSingle } from "./util/asyncMemo.js"; -import { HttpRequestLibrary } from "./util/http.js"; -import { AsyncCondition } from "./util/promiseUtils.js"; import { - DbAccess, - GetReadOnlyAccess, - GetReadWriteAccess, -} from "./util/query.js"; -import { TimerGroup } from "./util/timer.js"; + AbsoluteTime, + AmountJson, + Amounts, + AsyncFlag, + CoinRefreshRequest, + CoinStatus, + Duration, + ExchangeEntryState, + ExchangeEntryStatus, + ExchangeTosStatus, + ExchangeUpdateStatus, + Logger, + RefreshReason, + TalerErrorDetail, + TalerPreciseTimestamp, + TalerProtocolTimestamp, + TombstoneIdStr, + TransactionIdStr, + WalletNotification, + assertUnreachable, + checkDbInvariant, + checkLogicInvariant, + durationMul, +} from "@gnu-taler/taler-util"; +import { + BackupProviderRecord, + CoinRecord, + DbPreciseTimestamp, + DepositGroupRecord, + ExchangeEntryDbRecordStatus, + ExchangeEntryDbUpdateStatus, + ExchangeEntryRecord, + PeerPullCreditRecord, + PeerPullPaymentIncomingRecord, + PeerPushDebitRecord, + PeerPushPaymentIncomingRecord, + PurchaseRecord, + RecoupGroupRecord, + RefreshGroupRecord, + RewardRecord, + WalletDbReadWriteTransaction, + WithdrawalGroupRecord, + timestampPreciseToDb, +} from "./db.js"; +import { createRefreshGroup } from "./refresh.js"; +import { WalletExecutionContext, getDenomInfo } from "./wallet.js"; + +const logger = new Logger("operations/common.ts"); + +export interface CoinsSpendInfo { + coinPubs: string[]; + contributions: AmountJson[]; + refreshReason: RefreshReason; + /** + * Identifier for what the coin has been spent for. + */ + allocationId: TransactionIdStr; +} + +export async function makeCoinsVisible( + wex: WalletExecutionContext, + tx: WalletDbReadWriteTransaction<["coins", "coinAvailability"]>, + transactionId: string, +): Promise<void> { + const coins = + await tx.coins.indexes.bySourceTransactionId.getAll(transactionId); + for (const coinRecord of coins) { + if (!coinRecord.visible) { + coinRecord.visible = 1; + await tx.coins.put(coinRecord); + const ageRestriction = coinRecord.maxAge; + const car = await tx.coinAvailability.get([ + coinRecord.exchangeBaseUrl, + coinRecord.denomPubHash, + ageRestriction, + ]); + if (!car) { + logger.error("missing coin availability record"); + continue; + } + const visCount = car.visibleCoinCount ?? 0; + car.visibleCoinCount = visCount + 1; + await tx.coinAvailability.put(car); + } + } +} -export const EXCHANGE_COINS_LOCK = "exchange-coins-lock"; -export const EXCHANGE_RESERVES_LOCK = "exchange-reserves-lock"; +export async function makeCoinAvailable( + wex: WalletExecutionContext, + tx: WalletDbReadWriteTransaction< + ["coins", "coinAvailability", "denominations"] + >, + coinRecord: CoinRecord, +): Promise<void> { + checkLogicInvariant(coinRecord.status === CoinStatus.Fresh); + const existingCoin = await tx.coins.get(coinRecord.coinPub); + if (existingCoin) { + return; + } + const denom = await tx.denominations.get([ + coinRecord.exchangeBaseUrl, + coinRecord.denomPubHash, + ]); + checkDbInvariant(!!denom); + const ageRestriction = coinRecord.maxAge; + let car = await tx.coinAvailability.get([ + coinRecord.exchangeBaseUrl, + coinRecord.denomPubHash, + ageRestriction, + ]); + if (!car) { + car = { + maxAge: ageRestriction, + value: denom.value, + currency: denom.currency, + denomPubHash: denom.denomPubHash, + exchangeBaseUrl: denom.exchangeBaseUrl, + freshCoinCount: 0, + visibleCoinCount: 0, + }; + } + car.freshCoinCount++; + await tx.coins.put(coinRecord); + await tx.coinAvailability.put(car); +} + +export async function spendCoins( + wex: WalletExecutionContext, + tx: WalletDbReadWriteTransaction< + [ + "coins", + "coinAvailability", + "refreshGroups", + "refreshSessions", + "denominations", + ] + >, + csi: CoinsSpendInfo, +): Promise<void> { + if (csi.coinPubs.length != csi.contributions.length) { + throw Error("assertion failed"); + } + if (csi.coinPubs.length === 0) { + return; + } + let refreshCoinPubs: CoinRefreshRequest[] = []; + for (let i = 0; i < csi.coinPubs.length; i++) { + const coin = await tx.coins.get(csi.coinPubs[i]); + if (!coin) { + throw Error("coin allocated for payment doesn't exist anymore"); + } + const denom = await getDenomInfo( + wex, + tx, + coin.exchangeBaseUrl, + coin.denomPubHash, + ); + checkDbInvariant(!!denom); + const coinAvailability = await tx.coinAvailability.get([ + coin.exchangeBaseUrl, + coin.denomPubHash, + coin.maxAge, + ]); + checkDbInvariant(!!coinAvailability); + const contrib = csi.contributions[i]; + if (coin.status !== CoinStatus.Fresh) { + const alloc = coin.spendAllocation; + if (!alloc) { + continue; + } + if (alloc.id !== csi.allocationId) { + // FIXME: assign error code + logger.info("conflicting coin allocation ID"); + logger.info(`old ID: ${alloc.id}, new ID: ${csi.allocationId}`); + throw Error("conflicting coin allocation (id)"); + } + if (0 !== Amounts.cmp(alloc.amount, contrib)) { + // FIXME: assign error code + throw Error("conflicting coin allocation (contrib)"); + } + continue; + } + coin.status = CoinStatus.Dormant; + coin.spendAllocation = { + id: csi.allocationId, + amount: Amounts.stringify(contrib), + }; + const remaining = Amounts.sub(denom.value, contrib); + if (remaining.saturated) { + throw Error("not enough remaining balance on coin for payment"); + } + refreshCoinPubs.push({ + amount: Amounts.stringify(remaining.amount), + coinPub: coin.coinPub, + }); + checkDbInvariant(!!coinAvailability); + if (coinAvailability.freshCoinCount === 0) { + throw Error( + `invalid coin count ${coinAvailability.freshCoinCount} in DB`, + ); + } + coinAvailability.freshCoinCount--; + if (coin.visible) { + if (!coinAvailability.visibleCoinCount) { + logger.error("coin availability inconsistent"); + } else { + coinAvailability.visibleCoinCount--; + } + } + await tx.coins.put(coin); + await tx.coinAvailability.put(coinAvailability); + } + + await createRefreshGroup( + wex, + tx, + Amounts.currencyOf(csi.contributions[0]), + refreshCoinPubs, + csi.refreshReason, + csi.allocationId, + ); +} -export interface TrustInfo { - isTrusted: boolean; - isAudited: boolean; +export enum TombstoneTag { + DeleteWithdrawalGroup = "delete-withdrawal-group", + DeleteReserve = "delete-reserve", + DeletePayment = "delete-payment", + DeleteReward = "delete-reward", + DeleteRefreshGroup = "delete-refresh-group", + DeleteDepositGroup = "delete-deposit-group", + DeleteRefund = "delete-refund", + DeletePeerPullDebit = "delete-peer-pull-debit", + DeletePeerPushDebit = "delete-peer-push-debit", + DeletePeerPullCredit = "delete-peer-pull-credit", + DeletePeerPushCredit = "delete-peer-push-credit", +} + +export function getExchangeTosStatusFromRecord( + exchange: ExchangeEntryRecord, +): ExchangeTosStatus { + if (!exchange.tosAcceptedEtag) { + return ExchangeTosStatus.Proposed; + } + if (exchange.tosAcceptedEtag == exchange.tosCurrentEtag) { + return ExchangeTosStatus.Accepted; + } + return ExchangeTosStatus.Proposed; +} + +export function getExchangeUpdateStatusFromRecord( + r: ExchangeEntryRecord, +): ExchangeUpdateStatus { + switch (r.updateStatus) { + case ExchangeEntryDbUpdateStatus.UnavailableUpdate: + return ExchangeUpdateStatus.UnavailableUpdate; + case ExchangeEntryDbUpdateStatus.Initial: + return ExchangeUpdateStatus.Initial; + case ExchangeEntryDbUpdateStatus.InitialUpdate: + return ExchangeUpdateStatus.InitialUpdate; + case ExchangeEntryDbUpdateStatus.Ready: + return ExchangeUpdateStatus.Ready; + case ExchangeEntryDbUpdateStatus.ReadyUpdate: + return ExchangeUpdateStatus.ReadyUpdate; + case ExchangeEntryDbUpdateStatus.Suspended: + return ExchangeUpdateStatus.Suspended; + default: + assertUnreachable(r.updateStatus); + } +} + +export function getExchangeEntryStatusFromRecord( + r: ExchangeEntryRecord, +): ExchangeEntryStatus { + switch (r.entryStatus) { + case ExchangeEntryDbRecordStatus.Ephemeral: + return ExchangeEntryStatus.Ephemeral; + case ExchangeEntryDbRecordStatus.Preset: + return ExchangeEntryStatus.Preset; + case ExchangeEntryDbRecordStatus.Used: + return ExchangeEntryStatus.Used; + default: + assertUnreachable(r.entryStatus); + } } /** - * Interface for exchange-related operations. + * Compute the state of an exchange entry from the DB + * record. */ -export interface ExchangeOperations { - // FIXME: Should other operations maybe always use - // updateExchangeFromUrl? - getExchangeDetails( - tx: GetReadOnlyAccess<{ - exchanges: typeof WalletStoresV1.exchanges; - exchangeDetails: typeof WalletStoresV1.exchangeDetails; - }>, - exchangeBaseUrl: string, - ): Promise<ExchangeDetailsRecord | undefined>; - getExchangeTrust( - ws: InternalWalletState, - exchangeInfo: ExchangeRecord, - ): Promise<TrustInfo>; - updateExchangeFromUrl( - ws: InternalWalletState, - baseUrl: string, - acceptedFormat?: string[], - forceNow?: boolean, - ): Promise<{ - exchange: ExchangeRecord; - exchangeDetails: ExchangeDetailsRecord; - }>; -} - -export interface RecoupOperations { - createRecoupGroup( - ws: InternalWalletState, - tx: GetReadWriteAccess<{ - recoupGroups: typeof WalletStoresV1.recoupGroups; - denominations: typeof WalletStoresV1.denominations; - refreshGroups: typeof WalletStoresV1.refreshGroups; - coins: typeof WalletStoresV1.coins; - }>, - coinPubs: string[], - ): Promise<string>; - processRecoupGroup( - ws: InternalWalletState, - recoupGroupId: string, - forceNow?: boolean, - ): Promise<void>; -} - -export type NotificationListener = (n: WalletNotification) => void; +export function getExchangeState(r: ExchangeEntryRecord): ExchangeEntryState { + return { + exchangeEntryStatus: getExchangeEntryStatusFromRecord(r), + exchangeUpdateStatus: getExchangeUpdateStatusFromRecord(r), + tosStatus: getExchangeTosStatusFromRecord(r), + }; +} + +export type ParsedTombstone = + | { + tag: TombstoneTag.DeleteWithdrawalGroup; + withdrawalGroupId: string; + } + | { tag: TombstoneTag.DeleteRefund; refundGroupId: string } + | { tag: TombstoneTag.DeleteReserve; reservePub: string } + | { tag: TombstoneTag.DeleteRefreshGroup; refreshGroupId: string } + | { tag: TombstoneTag.DeleteReward; walletTipId: string } + | { tag: TombstoneTag.DeletePayment; proposalId: string }; + +export function constructTombstone(p: ParsedTombstone): TombstoneIdStr { + switch (p.tag) { + case TombstoneTag.DeleteWithdrawalGroup: + return `tmb:${p.tag}:${p.withdrawalGroupId}` as TombstoneIdStr; + case TombstoneTag.DeleteRefund: + return `tmb:${p.tag}:${p.refundGroupId}` as TombstoneIdStr; + case TombstoneTag.DeleteReserve: + return `tmb:${p.tag}:${p.reservePub}` as TombstoneIdStr; + case TombstoneTag.DeletePayment: + return `tmb:${p.tag}:${p.proposalId}` as TombstoneIdStr; + case TombstoneTag.DeleteRefreshGroup: + return `tmb:${p.tag}:${p.refreshGroupId}` as TombstoneIdStr; + case TombstoneTag.DeleteReward: + return `tmb:${p.tag}:${p.walletTipId}` as TombstoneIdStr; + default: + assertUnreachable(p); + } +} /** - * Internal, shard wallet state that is used by the implementation - * of wallet operations. - * - * FIXME: This should not be exported anywhere from the taler-wallet-core package, - * as it's an opaque implementation detail. + * Uniform interface for a particular wallet transaction. */ -export interface InternalWalletState { - memoProcessReserve: AsyncOpMemoMap<void>; - memoMakePlanchet: AsyncOpMemoMap<void>; - memoGetPending: AsyncOpMemoSingle<PendingOperationsResponse>; - memoGetBalance: AsyncOpMemoSingle<BalancesResponse>; - memoProcessRefresh: AsyncOpMemoMap<void>; - memoProcessRecoup: AsyncOpMemoMap<void>; - memoProcessDeposit: AsyncOpMemoMap<void>; - cryptoApi: CryptoApi; - - timerGroup: TimerGroup; - stopped: boolean; +export interface TransactionManager { + get taskId(): TaskIdStr; + get transactionId(): TransactionIdStr; + fail(): Promise<void>; + abort(): Promise<void>; + suspend(): Promise<void>; + resume(): Promise<void>; + process(): Promise<TaskRunResult>; +} +export enum TaskRunResultType { + Finished = "finished", + Backoff = "backoff", + Progress = "progress", + Error = "error", + LongpollReturnedPending = "longpoll-returned-pending", + ScheduleLater = "schedule-later", +} + +export type TaskRunResult = + | TaskRunFinishedResult + | TaskRunErrorResult + | TaskRunBackoffResult + | TaskRunProgressResult + | TaskRunLongpollReturnedPendingResult + | TaskRunScheduleLaterResult; + +export namespace TaskRunResult { /** - * Asynchronous condition to interrupt the sleep of the - * retry loop. - * - * Used to allow processing of new work faster. + * Task is finished and does not need to be processed again. */ - latch: AsyncCondition; + export function finished(): TaskRunResult { + return { + type: TaskRunResultType.Finished, + }; + } + /** + * Task is waiting for something, should be invoked + * again with exponentiall back-off until some other + * result is returned. + */ + export function backoff(): TaskRunResult { + return { + type: TaskRunResultType.Backoff, + }; + } + /** + * Task made progress and should be processed again. + */ + export function progress(): TaskRunResult { + return { + type: TaskRunResultType.Progress, + }; + } + /** + * Run the task again at a fixed time in the future. + */ + export function runAgainAt(runAt: AbsoluteTime): TaskRunResult { + return { + type: TaskRunResultType.ScheduleLater, + runAt, + }; + } + /** + * Longpolling returned, but what we're waiting for + * is still pending on the other side. + */ + export function longpollReturnedPending(): TaskRunLongpollReturnedPendingResult { + return { + type: TaskRunResultType.LongpollReturnedPending, + }; + } +} - listeners: NotificationListener[]; +export interface TaskRunFinishedResult { + type: TaskRunResultType.Finished; +} - initCalled: boolean; +export interface TaskRunBackoffResult { + type: TaskRunResultType.Backoff; +} - exchangeOps: ExchangeOperations; - recoupOps: RecoupOperations; +export interface TaskRunProgressResult { + type: TaskRunResultType.Progress; +} - db: DbAccess<typeof WalletStoresV1>; - http: HttpRequestLibrary; +export interface TaskRunScheduleLaterResult { + type: TaskRunResultType.ScheduleLater; + runAt: AbsoluteTime; +} - notify(n: WalletNotification): void; +export interface TaskRunLongpollReturnedPendingResult { + type: TaskRunResultType.LongpollReturnedPending; +} - addNotificationListener(f: (n: WalletNotification) => void): void; +export interface TaskRunErrorResult { + type: TaskRunResultType.Error; + errorDetail: TalerErrorDetail; +} - /** - * Stop ongoing processing. - */ - stop(): void; +export interface DbRetryInfo { + firstTry: DbPreciseTimestamp; + nextRetry: DbPreciseTimestamp; + retryCounter: number; +} - /** - * Run an async function after acquiring a list of locks, identified - * by string tokens. - */ - runSequentialized<T>(tokens: string[], f: () => Promise<T>): Promise<T>; +export interface RetryPolicy { + readonly backoffDelta: Duration; + readonly backoffBase: number; + readonly maxTimeout: Duration; +} + +const defaultRetryPolicy: RetryPolicy = { + backoffBase: 1.5, + backoffDelta: Duration.fromSpec({ seconds: 1 }), + maxTimeout: Duration.fromSpec({ minutes: 2 }), +}; + +function updateTimeout( + r: DbRetryInfo, + p: RetryPolicy = defaultRetryPolicy, +): void { + const now = AbsoluteTime.now(); + if (now.t_ms === "never") { + throw Error("assertion failed"); + } + if (p.backoffDelta.d_ms === "forever") { + r.nextRetry = timestampPreciseToDb( + AbsoluteTime.toPreciseTimestamp(AbsoluteTime.never()), + ); + return; + } + + const nextIncrement = + p.backoffDelta.d_ms * Math.pow(p.backoffBase, r.retryCounter); + + const t = + now.t_ms + + (p.maxTimeout.d_ms === "forever" + ? nextIncrement + : Math.min(p.maxTimeout.d_ms, nextIncrement)); + r.nextRetry = timestampPreciseToDb(TalerPreciseTimestamp.fromMilliseconds(t)); +} + +export function computeDbBackoff(retryCounter: number): DbPreciseTimestamp { + const now = AbsoluteTime.now(); + if (now.t_ms === "never") { + throw Error("assertion failed"); + } + const p = defaultRetryPolicy; + if (p.backoffDelta.d_ms === "forever") { + throw Error("assertion failed"); + } + + const nextIncrement = + p.backoffDelta.d_ms * Math.pow(p.backoffBase, retryCounter); + + const t = + now.t_ms + + (p.maxTimeout.d_ms === "forever" + ? nextIncrement + : Math.min(p.maxTimeout.d_ms, nextIncrement)); + return timestampPreciseToDb(TalerPreciseTimestamp.fromMilliseconds(t)); +} + +export namespace DbRetryInfo { + export function reset(p: RetryPolicy = defaultRetryPolicy): DbRetryInfo { + const now = TalerPreciseTimestamp.now(); + const info: DbRetryInfo = { + firstTry: timestampPreciseToDb(now), + nextRetry: timestampPreciseToDb(now), + retryCounter: 0, + }; + updateTimeout(info, p); + return info; + } + + export function increment( + r: DbRetryInfo | undefined, + p: RetryPolicy = defaultRetryPolicy, + ): DbRetryInfo { + if (!r) { + return reset(p); + } + const r2 = { ...r }; + r2.retryCounter++; + updateTimeout(r2, p); + return r2; + } +} + +/** + * Timestamp after which the wallet would do an auto-refresh. + */ +export function getAutoRefreshExecuteThreshold(d: { + stampExpireWithdraw: TalerProtocolTimestamp; + stampExpireDeposit: TalerProtocolTimestamp; +}): AbsoluteTime { + const expireWithdraw = AbsoluteTime.fromProtocolTimestamp( + d.stampExpireWithdraw, + ); + const expireDeposit = AbsoluteTime.fromProtocolTimestamp( + d.stampExpireDeposit, + ); + const delta = AbsoluteTime.difference(expireWithdraw, expireDeposit); + const deltaDiv = durationMul(delta, 0.5); + return AbsoluteTime.addDuration(expireWithdraw, deltaDiv); +} + +/** + * Parsed representation of task identifiers. + */ +export type ParsedTaskIdentifier = + | { + tag: PendingTaskType.Withdraw; + withdrawalGroupId: string; + } + | { tag: PendingTaskType.ExchangeUpdate; exchangeBaseUrl: string } + | { tag: PendingTaskType.Backup; backupProviderBaseUrl: string } + | { tag: PendingTaskType.Deposit; depositGroupId: string } + | { tag: PendingTaskType.PeerPullDebit; peerPullDebitId: string } + | { tag: PendingTaskType.PeerPullCredit; pursePub: string } + | { tag: PendingTaskType.PeerPushCredit; peerPushCreditId: string } + | { tag: PendingTaskType.PeerPushDebit; pursePub: string } + | { tag: PendingTaskType.Purchase; proposalId: string } + | { tag: PendingTaskType.Recoup; recoupGroupId: string } + | { tag: PendingTaskType.RewardPickup; walletRewardId: string } + | { tag: PendingTaskType.Refresh; refreshGroupId: string }; + +export function parseTaskIdentifier(x: string): ParsedTaskIdentifier { + const task = x.split(":"); + + if (task.length < 2) { + throw Error("task id should have al least 2 parts separated by ':'"); + } + + const [type, ...rest] = task; + switch (type) { + case PendingTaskType.Backup: + return { tag: type, backupProviderBaseUrl: decodeURIComponent(rest[0]) }; + case PendingTaskType.Deposit: + return { tag: type, depositGroupId: rest[0] }; + case PendingTaskType.ExchangeUpdate: + return { tag: type, exchangeBaseUrl: decodeURIComponent(rest[0]) }; + case PendingTaskType.PeerPullCredit: + return { tag: type, pursePub: rest[0] }; + case PendingTaskType.PeerPullDebit: + return { tag: type, peerPullDebitId: rest[0] }; + case PendingTaskType.PeerPushCredit: + return { tag: type, peerPushCreditId: rest[0] }; + case PendingTaskType.PeerPushDebit: + return { tag: type, pursePub: rest[0] }; + case PendingTaskType.Purchase: + return { tag: type, proposalId: rest[0] }; + case PendingTaskType.Recoup: + return { tag: type, recoupGroupId: rest[0] }; + case PendingTaskType.Refresh: + return { tag: type, refreshGroupId: rest[0] }; + case PendingTaskType.RewardPickup: + return { tag: type, walletRewardId: rest[0] }; + case PendingTaskType.Withdraw: + return { tag: type, withdrawalGroupId: rest[0] }; + default: + throw Error("invalid task identifier"); + } +} + +export function constructTaskIdentifier(p: ParsedTaskIdentifier): TaskIdStr { + switch (p.tag) { + case PendingTaskType.Backup: + return `${p.tag}:${p.backupProviderBaseUrl}` as TaskIdStr; + case PendingTaskType.Deposit: + return `${p.tag}:${p.depositGroupId}` as TaskIdStr; + case PendingTaskType.ExchangeUpdate: + return `${p.tag}:${encodeURIComponent(p.exchangeBaseUrl)}` as TaskIdStr; + case PendingTaskType.PeerPullDebit: + return `${p.tag}:${p.peerPullDebitId}` as TaskIdStr; + case PendingTaskType.PeerPushCredit: + return `${p.tag}:${p.peerPushCreditId}` as TaskIdStr; + case PendingTaskType.PeerPullCredit: + return `${p.tag}:${p.pursePub}` as TaskIdStr; + case PendingTaskType.PeerPushDebit: + return `${p.tag}:${p.pursePub}` as TaskIdStr; + case PendingTaskType.Purchase: + return `${p.tag}:${p.proposalId}` as TaskIdStr; + case PendingTaskType.Recoup: + return `${p.tag}:${p.recoupGroupId}` as TaskIdStr; + case PendingTaskType.Refresh: + return `${p.tag}:${p.refreshGroupId}` as TaskIdStr; + case PendingTaskType.RewardPickup: + return `${p.tag}:${p.walletRewardId}` as TaskIdStr; + case PendingTaskType.Withdraw: + return `${p.tag}:${p.withdrawalGroupId}` as TaskIdStr; + default: + assertUnreachable(p); + } +} + +export namespace TaskIdentifiers { + export function forWithdrawal(wg: WithdrawalGroupRecord): TaskIdStr { + return `${PendingTaskType.Withdraw}:${wg.withdrawalGroupId}` as TaskIdStr; + } + export function forExchangeUpdate(exch: ExchangeEntryRecord): TaskIdStr { + return `${PendingTaskType.ExchangeUpdate}:${encodeURIComponent( + exch.baseUrl, + )}` as TaskIdStr; + } + export function forExchangeUpdateFromUrl(exchBaseUrl: string): TaskIdStr { + return `${PendingTaskType.ExchangeUpdate}:${encodeURIComponent( + exchBaseUrl, + )}` as TaskIdStr; + } + export function forTipPickup(tipRecord: RewardRecord): TaskIdStr { + return `${PendingTaskType.RewardPickup}:${tipRecord.walletRewardId}` as TaskIdStr; + } + export function forRefresh( + refreshGroupRecord: RefreshGroupRecord, + ): TaskIdStr { + return `${PendingTaskType.Refresh}:${refreshGroupRecord.refreshGroupId}` as TaskIdStr; + } + export function forPay(purchaseRecord: PurchaseRecord): TaskIdStr { + return `${PendingTaskType.Purchase}:${purchaseRecord.proposalId}` as TaskIdStr; + } + export function forRecoup(recoupRecord: RecoupGroupRecord): TaskIdStr { + return `${PendingTaskType.Recoup}:${recoupRecord.recoupGroupId}` as TaskIdStr; + } + export function forDeposit(depositRecord: DepositGroupRecord): TaskIdStr { + return `${PendingTaskType.Deposit}:${depositRecord.depositGroupId}` as TaskIdStr; + } + export function forBackup(backupRecord: BackupProviderRecord): TaskIdStr { + return `${PendingTaskType.Backup}:${encodeURIComponent( + backupRecord.baseUrl, + )}` as TaskIdStr; + } + export function forPeerPushPaymentInitiation( + ppi: PeerPushDebitRecord, + ): TaskIdStr { + return `${PendingTaskType.PeerPushDebit}:${ppi.pursePub}` as TaskIdStr; + } + export function forPeerPullPaymentInitiation( + ppi: PeerPullCreditRecord, + ): TaskIdStr { + return `${PendingTaskType.PeerPullCredit}:${ppi.pursePub}` as TaskIdStr; + } + export function forPeerPullPaymentDebit( + ppi: PeerPullPaymentIncomingRecord, + ): TaskIdStr { + return `${PendingTaskType.PeerPullDebit}:${ppi.peerPullDebitId}` as TaskIdStr; + } + export function forPeerPushCredit( + ppi: PeerPushPaymentIncomingRecord, + ): TaskIdStr { + return `${PendingTaskType.PeerPushCredit}:${ppi.peerPushCreditId}` as TaskIdStr; + } +} + +/** + * Result of a transaction transition. + */ +export enum TransitionResultType { + Transition = 1, + Stay = 2, + Delete = 3, +} + +export type TransitionResult<R> = + | { type: TransitionResultType.Stay } + | { type: TransitionResultType.Transition; rec: R } + | { type: TransitionResultType.Delete }; + +export const TransitionResult = { + stay<T>(): TransitionResult<T> { + return { type: TransitionResultType.Stay }; + }, + delete<T>(): TransitionResult<T> { + return { type: TransitionResultType.Delete }; + }, + transition<T>(rec: T): TransitionResult<T> { + return { + type: TransitionResultType.Transition, + rec, + }; + }, +}; + +/** + * Transaction context. + * Uniform interface to all transactions. + */ +export interface TransactionContext { + get taskId(): TaskIdStr | undefined; + get transactionId(): TransactionIdStr; + abortTransaction(): Promise<void>; + suspendTransaction(): Promise<void>; + resumeTransaction(): Promise<void>; + failTransaction(): Promise<void>; + deleteTransaction(): Promise<void>; +} + +/** + * Type and schema definitions for pending tasks in the wallet. + * + * These are only used internally, and are not part of the stable public + * interface to the wallet. + */ + +export enum PendingTaskType { + ExchangeUpdate = "exchange-update", + Purchase = "purchase", + Refresh = "refresh", + Recoup = "recoup", + RewardPickup = "reward-pickup", + Withdraw = "withdraw", + Deposit = "deposit", + Backup = "backup", + PeerPushDebit = "peer-push-debit", + PeerPullCredit = "peer-pull-credit", + PeerPushCredit = "peer-push-credit", + PeerPullDebit = "peer-pull-debit", +} + +declare const __taskIdStr: unique symbol; +export type TaskIdStr = string & { [__taskIdStr]: true }; + +/** + * Wait until the wallet is in a particular state. + * + * Two functions must be provided: + * 1. checkState, which checks if the wallet is in the + * desired state. + * 2. filterNotification, which checks whether a notification + * might have lead to a state change. + */ +export async function genericWaitForState( + wex: WalletExecutionContext, + args: { + checkState: () => Promise<boolean>; + filterNotification: (notif: WalletNotification) => boolean; + }, +): Promise<void> { + await wex.taskScheduler.ensureRunning(); + + // FIXME: Clean up using the new JS "using" / Symbol.dispose syntax. + const flag = new AsyncFlag(); + // Raise purchaseNotifFlag whenever we get a notification + // about our refresh. + const cancelNotif = wex.ws.addNotificationListener((notif) => { + if (args.filterNotification(notif)) { + flag.raise(); + } + }); + const unregisterOnCancelled = wex.cancellationToken.onCancelled(() => { + cancelNotif(); + flag.raise(); + }); - runUntilDone(req?: { maxRetries?: number }): Promise<void>; + try { + while (true) { + if (wex.cancellationToken.isCancelled) { + throw Error("cancelled"); + } + if (await args.checkState()) { + return; + } + // Wait for the next transition + await flag.wait(); + flag.reset(); + } + } catch (e) { + unregisterOnCancelled(); + cancelNotif(); + } } |