summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/common.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/common.ts')
-rw-r--r--packages/taler-wallet-core/src/common.ts897
1 files changed, 782 insertions, 115 deletions
diff --git a/packages/taler-wallet-core/src/common.ts b/packages/taler-wallet-core/src/common.ts
index dd8542def..edaba5ba4 100644
--- a/packages/taler-wallet-core/src/common.ts
+++ b/packages/taler-wallet-core/src/common.ts
@@ -1,6 +1,6 @@
/*
This file is part of GNU Taler
- (C) 2019 GNUnet e.V.
+ (C) 2022 GNUnet e.V.
GNU Taler is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
@@ -15,142 +15,809 @@
*/
/**
- * Common interface of the internal wallet state. This object is passed
- * to the various operations (exchange management, withdrawal, refresh, reserve
- * management, etc.).
- *
- * Some operations can be accessed via this state object. This allows mutual
- * recursion between operations, without having cycling dependencies between
- * the respective TypeScript files.
- *
- * (You can think of this as a "header file" for the wallet implementation.)
- */
-
-/**
* Imports.
*/
-import { WalletNotification, BalancesResponse } from "@gnu-taler/taler-util";
-import { CryptoApi } from "./crypto/workers/cryptoApi.js";
-import { ExchangeDetailsRecord, ExchangeRecord, WalletStoresV1 } from "./db.js";
-import { PendingOperationsResponse } from "./pending-types.js";
-import { AsyncOpMemoMap, AsyncOpMemoSingle } from "./util/asyncMemo.js";
-import { HttpRequestLibrary } from "./util/http.js";
-import { AsyncCondition } from "./util/promiseUtils.js";
import {
- DbAccess,
- GetReadOnlyAccess,
- GetReadWriteAccess,
-} from "./util/query.js";
-import { TimerGroup } from "./util/timer.js";
+ AbsoluteTime,
+ AmountJson,
+ Amounts,
+ AsyncFlag,
+ CoinRefreshRequest,
+ CoinStatus,
+ Duration,
+ ExchangeEntryState,
+ ExchangeEntryStatus,
+ ExchangeTosStatus,
+ ExchangeUpdateStatus,
+ Logger,
+ RefreshReason,
+ TalerErrorDetail,
+ TalerPreciseTimestamp,
+ TalerProtocolTimestamp,
+ TombstoneIdStr,
+ TransactionIdStr,
+ WalletNotification,
+ assertUnreachable,
+ checkDbInvariant,
+ checkLogicInvariant,
+ durationMul,
+} from "@gnu-taler/taler-util";
+import {
+ BackupProviderRecord,
+ CoinRecord,
+ DbPreciseTimestamp,
+ DepositGroupRecord,
+ ExchangeEntryDbRecordStatus,
+ ExchangeEntryDbUpdateStatus,
+ ExchangeEntryRecord,
+ PeerPullCreditRecord,
+ PeerPullPaymentIncomingRecord,
+ PeerPushDebitRecord,
+ PeerPushPaymentIncomingRecord,
+ PurchaseRecord,
+ RecoupGroupRecord,
+ RefreshGroupRecord,
+ RewardRecord,
+ WalletDbReadWriteTransaction,
+ WithdrawalGroupRecord,
+ timestampPreciseToDb,
+} from "./db.js";
+import { createRefreshGroup } from "./refresh.js";
+import { WalletExecutionContext, getDenomInfo } from "./wallet.js";
+
+const logger = new Logger("operations/common.ts");
+
+export interface CoinsSpendInfo {
+ coinPubs: string[];
+ contributions: AmountJson[];
+ refreshReason: RefreshReason;
+ /**
+ * Identifier for what the coin has been spent for.
+ */
+ allocationId: TransactionIdStr;
+}
+
+export async function makeCoinsVisible(
+ wex: WalletExecutionContext,
+ tx: WalletDbReadWriteTransaction<["coins", "coinAvailability"]>,
+ transactionId: string,
+): Promise<void> {
+ const coins =
+ await tx.coins.indexes.bySourceTransactionId.getAll(transactionId);
+ for (const coinRecord of coins) {
+ if (!coinRecord.visible) {
+ coinRecord.visible = 1;
+ await tx.coins.put(coinRecord);
+ const ageRestriction = coinRecord.maxAge;
+ const car = await tx.coinAvailability.get([
+ coinRecord.exchangeBaseUrl,
+ coinRecord.denomPubHash,
+ ageRestriction,
+ ]);
+ if (!car) {
+ logger.error("missing coin availability record");
+ continue;
+ }
+ const visCount = car.visibleCoinCount ?? 0;
+ car.visibleCoinCount = visCount + 1;
+ await tx.coinAvailability.put(car);
+ }
+ }
+}
-export const EXCHANGE_COINS_LOCK = "exchange-coins-lock";
-export const EXCHANGE_RESERVES_LOCK = "exchange-reserves-lock";
+export async function makeCoinAvailable(
+ wex: WalletExecutionContext,
+ tx: WalletDbReadWriteTransaction<
+ ["coins", "coinAvailability", "denominations"]
+ >,
+ coinRecord: CoinRecord,
+): Promise<void> {
+ checkLogicInvariant(coinRecord.status === CoinStatus.Fresh);
+ const existingCoin = await tx.coins.get(coinRecord.coinPub);
+ if (existingCoin) {
+ return;
+ }
+ const denom = await tx.denominations.get([
+ coinRecord.exchangeBaseUrl,
+ coinRecord.denomPubHash,
+ ]);
+ checkDbInvariant(!!denom);
+ const ageRestriction = coinRecord.maxAge;
+ let car = await tx.coinAvailability.get([
+ coinRecord.exchangeBaseUrl,
+ coinRecord.denomPubHash,
+ ageRestriction,
+ ]);
+ if (!car) {
+ car = {
+ maxAge: ageRestriction,
+ value: denom.value,
+ currency: denom.currency,
+ denomPubHash: denom.denomPubHash,
+ exchangeBaseUrl: denom.exchangeBaseUrl,
+ freshCoinCount: 0,
+ visibleCoinCount: 0,
+ };
+ }
+ car.freshCoinCount++;
+ await tx.coins.put(coinRecord);
+ await tx.coinAvailability.put(car);
+}
+
+export async function spendCoins(
+ wex: WalletExecutionContext,
+ tx: WalletDbReadWriteTransaction<
+ [
+ "coins",
+ "coinAvailability",
+ "refreshGroups",
+ "refreshSessions",
+ "denominations",
+ ]
+ >,
+ csi: CoinsSpendInfo,
+): Promise<void> {
+ if (csi.coinPubs.length != csi.contributions.length) {
+ throw Error("assertion failed");
+ }
+ if (csi.coinPubs.length === 0) {
+ return;
+ }
+ let refreshCoinPubs: CoinRefreshRequest[] = [];
+ for (let i = 0; i < csi.coinPubs.length; i++) {
+ const coin = await tx.coins.get(csi.coinPubs[i]);
+ if (!coin) {
+ throw Error("coin allocated for payment doesn't exist anymore");
+ }
+ const denom = await getDenomInfo(
+ wex,
+ tx,
+ coin.exchangeBaseUrl,
+ coin.denomPubHash,
+ );
+ checkDbInvariant(!!denom);
+ const coinAvailability = await tx.coinAvailability.get([
+ coin.exchangeBaseUrl,
+ coin.denomPubHash,
+ coin.maxAge,
+ ]);
+ checkDbInvariant(!!coinAvailability);
+ const contrib = csi.contributions[i];
+ if (coin.status !== CoinStatus.Fresh) {
+ const alloc = coin.spendAllocation;
+ if (!alloc) {
+ continue;
+ }
+ if (alloc.id !== csi.allocationId) {
+ // FIXME: assign error code
+ logger.info("conflicting coin allocation ID");
+ logger.info(`old ID: ${alloc.id}, new ID: ${csi.allocationId}`);
+ throw Error("conflicting coin allocation (id)");
+ }
+ if (0 !== Amounts.cmp(alloc.amount, contrib)) {
+ // FIXME: assign error code
+ throw Error("conflicting coin allocation (contrib)");
+ }
+ continue;
+ }
+ coin.status = CoinStatus.Dormant;
+ coin.spendAllocation = {
+ id: csi.allocationId,
+ amount: Amounts.stringify(contrib),
+ };
+ const remaining = Amounts.sub(denom.value, contrib);
+ if (remaining.saturated) {
+ throw Error("not enough remaining balance on coin for payment");
+ }
+ refreshCoinPubs.push({
+ amount: Amounts.stringify(remaining.amount),
+ coinPub: coin.coinPub,
+ });
+ checkDbInvariant(!!coinAvailability);
+ if (coinAvailability.freshCoinCount === 0) {
+ throw Error(
+ `invalid coin count ${coinAvailability.freshCoinCount} in DB`,
+ );
+ }
+ coinAvailability.freshCoinCount--;
+ if (coin.visible) {
+ if (!coinAvailability.visibleCoinCount) {
+ logger.error("coin availability inconsistent");
+ } else {
+ coinAvailability.visibleCoinCount--;
+ }
+ }
+ await tx.coins.put(coin);
+ await tx.coinAvailability.put(coinAvailability);
+ }
+
+ await createRefreshGroup(
+ wex,
+ tx,
+ Amounts.currencyOf(csi.contributions[0]),
+ refreshCoinPubs,
+ csi.refreshReason,
+ csi.allocationId,
+ );
+}
-export interface TrustInfo {
- isTrusted: boolean;
- isAudited: boolean;
+export enum TombstoneTag {
+ DeleteWithdrawalGroup = "delete-withdrawal-group",
+ DeleteReserve = "delete-reserve",
+ DeletePayment = "delete-payment",
+ DeleteReward = "delete-reward",
+ DeleteRefreshGroup = "delete-refresh-group",
+ DeleteDepositGroup = "delete-deposit-group",
+ DeleteRefund = "delete-refund",
+ DeletePeerPullDebit = "delete-peer-pull-debit",
+ DeletePeerPushDebit = "delete-peer-push-debit",
+ DeletePeerPullCredit = "delete-peer-pull-credit",
+ DeletePeerPushCredit = "delete-peer-push-credit",
+}
+
+export function getExchangeTosStatusFromRecord(
+ exchange: ExchangeEntryRecord,
+): ExchangeTosStatus {
+ if (!exchange.tosAcceptedEtag) {
+ return ExchangeTosStatus.Proposed;
+ }
+ if (exchange.tosAcceptedEtag == exchange.tosCurrentEtag) {
+ return ExchangeTosStatus.Accepted;
+ }
+ return ExchangeTosStatus.Proposed;
+}
+
+export function getExchangeUpdateStatusFromRecord(
+ r: ExchangeEntryRecord,
+): ExchangeUpdateStatus {
+ switch (r.updateStatus) {
+ case ExchangeEntryDbUpdateStatus.UnavailableUpdate:
+ return ExchangeUpdateStatus.UnavailableUpdate;
+ case ExchangeEntryDbUpdateStatus.Initial:
+ return ExchangeUpdateStatus.Initial;
+ case ExchangeEntryDbUpdateStatus.InitialUpdate:
+ return ExchangeUpdateStatus.InitialUpdate;
+ case ExchangeEntryDbUpdateStatus.Ready:
+ return ExchangeUpdateStatus.Ready;
+ case ExchangeEntryDbUpdateStatus.ReadyUpdate:
+ return ExchangeUpdateStatus.ReadyUpdate;
+ case ExchangeEntryDbUpdateStatus.Suspended:
+ return ExchangeUpdateStatus.Suspended;
+ default:
+ assertUnreachable(r.updateStatus);
+ }
+}
+
+export function getExchangeEntryStatusFromRecord(
+ r: ExchangeEntryRecord,
+): ExchangeEntryStatus {
+ switch (r.entryStatus) {
+ case ExchangeEntryDbRecordStatus.Ephemeral:
+ return ExchangeEntryStatus.Ephemeral;
+ case ExchangeEntryDbRecordStatus.Preset:
+ return ExchangeEntryStatus.Preset;
+ case ExchangeEntryDbRecordStatus.Used:
+ return ExchangeEntryStatus.Used;
+ default:
+ assertUnreachable(r.entryStatus);
+ }
}
/**
- * Interface for exchange-related operations.
+ * Compute the state of an exchange entry from the DB
+ * record.
*/
-export interface ExchangeOperations {
- // FIXME: Should other operations maybe always use
- // updateExchangeFromUrl?
- getExchangeDetails(
- tx: GetReadOnlyAccess<{
- exchanges: typeof WalletStoresV1.exchanges;
- exchangeDetails: typeof WalletStoresV1.exchangeDetails;
- }>,
- exchangeBaseUrl: string,
- ): Promise<ExchangeDetailsRecord | undefined>;
- getExchangeTrust(
- ws: InternalWalletState,
- exchangeInfo: ExchangeRecord,
- ): Promise<TrustInfo>;
- updateExchangeFromUrl(
- ws: InternalWalletState,
- baseUrl: string,
- acceptedFormat?: string[],
- forceNow?: boolean,
- ): Promise<{
- exchange: ExchangeRecord;
- exchangeDetails: ExchangeDetailsRecord;
- }>;
-}
-
-export interface RecoupOperations {
- createRecoupGroup(
- ws: InternalWalletState,
- tx: GetReadWriteAccess<{
- recoupGroups: typeof WalletStoresV1.recoupGroups;
- denominations: typeof WalletStoresV1.denominations;
- refreshGroups: typeof WalletStoresV1.refreshGroups;
- coins: typeof WalletStoresV1.coins;
- }>,
- coinPubs: string[],
- ): Promise<string>;
- processRecoupGroup(
- ws: InternalWalletState,
- recoupGroupId: string,
- forceNow?: boolean,
- ): Promise<void>;
-}
-
-export type NotificationListener = (n: WalletNotification) => void;
+export function getExchangeState(r: ExchangeEntryRecord): ExchangeEntryState {
+ return {
+ exchangeEntryStatus: getExchangeEntryStatusFromRecord(r),
+ exchangeUpdateStatus: getExchangeUpdateStatusFromRecord(r),
+ tosStatus: getExchangeTosStatusFromRecord(r),
+ };
+}
+
+export type ParsedTombstone =
+ | {
+ tag: TombstoneTag.DeleteWithdrawalGroup;
+ withdrawalGroupId: string;
+ }
+ | { tag: TombstoneTag.DeleteRefund; refundGroupId: string }
+ | { tag: TombstoneTag.DeleteReserve; reservePub: string }
+ | { tag: TombstoneTag.DeleteRefreshGroup; refreshGroupId: string }
+ | { tag: TombstoneTag.DeleteReward; walletTipId: string }
+ | { tag: TombstoneTag.DeletePayment; proposalId: string };
+
+export function constructTombstone(p: ParsedTombstone): TombstoneIdStr {
+ switch (p.tag) {
+ case TombstoneTag.DeleteWithdrawalGroup:
+ return `tmb:${p.tag}:${p.withdrawalGroupId}` as TombstoneIdStr;
+ case TombstoneTag.DeleteRefund:
+ return `tmb:${p.tag}:${p.refundGroupId}` as TombstoneIdStr;
+ case TombstoneTag.DeleteReserve:
+ return `tmb:${p.tag}:${p.reservePub}` as TombstoneIdStr;
+ case TombstoneTag.DeletePayment:
+ return `tmb:${p.tag}:${p.proposalId}` as TombstoneIdStr;
+ case TombstoneTag.DeleteRefreshGroup:
+ return `tmb:${p.tag}:${p.refreshGroupId}` as TombstoneIdStr;
+ case TombstoneTag.DeleteReward:
+ return `tmb:${p.tag}:${p.walletTipId}` as TombstoneIdStr;
+ default:
+ assertUnreachable(p);
+ }
+}
/**
- * Internal, shard wallet state that is used by the implementation
- * of wallet operations.
- *
- * FIXME: This should not be exported anywhere from the taler-wallet-core package,
- * as it's an opaque implementation detail.
+ * Uniform interface for a particular wallet transaction.
*/
-export interface InternalWalletState {
- memoProcessReserve: AsyncOpMemoMap<void>;
- memoMakePlanchet: AsyncOpMemoMap<void>;
- memoGetPending: AsyncOpMemoSingle<PendingOperationsResponse>;
- memoGetBalance: AsyncOpMemoSingle<BalancesResponse>;
- memoProcessRefresh: AsyncOpMemoMap<void>;
- memoProcessRecoup: AsyncOpMemoMap<void>;
- memoProcessDeposit: AsyncOpMemoMap<void>;
- cryptoApi: CryptoApi;
-
- timerGroup: TimerGroup;
- stopped: boolean;
+export interface TransactionManager {
+ get taskId(): TaskIdStr;
+ get transactionId(): TransactionIdStr;
+ fail(): Promise<void>;
+ abort(): Promise<void>;
+ suspend(): Promise<void>;
+ resume(): Promise<void>;
+ process(): Promise<TaskRunResult>;
+}
+export enum TaskRunResultType {
+ Finished = "finished",
+ Backoff = "backoff",
+ Progress = "progress",
+ Error = "error",
+ LongpollReturnedPending = "longpoll-returned-pending",
+ ScheduleLater = "schedule-later",
+}
+
+export type TaskRunResult =
+ | TaskRunFinishedResult
+ | TaskRunErrorResult
+ | TaskRunBackoffResult
+ | TaskRunProgressResult
+ | TaskRunLongpollReturnedPendingResult
+ | TaskRunScheduleLaterResult;
+
+export namespace TaskRunResult {
/**
- * Asynchronous condition to interrupt the sleep of the
- * retry loop.
- *
- * Used to allow processing of new work faster.
+ * Task is finished and does not need to be processed again.
*/
- latch: AsyncCondition;
+ export function finished(): TaskRunResult {
+ return {
+ type: TaskRunResultType.Finished,
+ };
+ }
+ /**
+ * Task is waiting for something, should be invoked
+ * again with exponentiall back-off until some other
+ * result is returned.
+ */
+ export function backoff(): TaskRunResult {
+ return {
+ type: TaskRunResultType.Backoff,
+ };
+ }
+ /**
+ * Task made progress and should be processed again.
+ */
+ export function progress(): TaskRunResult {
+ return {
+ type: TaskRunResultType.Progress,
+ };
+ }
+ /**
+ * Run the task again at a fixed time in the future.
+ */
+ export function runAgainAt(runAt: AbsoluteTime): TaskRunResult {
+ return {
+ type: TaskRunResultType.ScheduleLater,
+ runAt,
+ };
+ }
+ /**
+ * Longpolling returned, but what we're waiting for
+ * is still pending on the other side.
+ */
+ export function longpollReturnedPending(): TaskRunLongpollReturnedPendingResult {
+ return {
+ type: TaskRunResultType.LongpollReturnedPending,
+ };
+ }
+}
- listeners: NotificationListener[];
+export interface TaskRunFinishedResult {
+ type: TaskRunResultType.Finished;
+}
- initCalled: boolean;
+export interface TaskRunBackoffResult {
+ type: TaskRunResultType.Backoff;
+}
- exchangeOps: ExchangeOperations;
- recoupOps: RecoupOperations;
+export interface TaskRunProgressResult {
+ type: TaskRunResultType.Progress;
+}
- db: DbAccess<typeof WalletStoresV1>;
- http: HttpRequestLibrary;
+export interface TaskRunScheduleLaterResult {
+ type: TaskRunResultType.ScheduleLater;
+ runAt: AbsoluteTime;
+}
- notify(n: WalletNotification): void;
+export interface TaskRunLongpollReturnedPendingResult {
+ type: TaskRunResultType.LongpollReturnedPending;
+}
- addNotificationListener(f: (n: WalletNotification) => void): void;
+export interface TaskRunErrorResult {
+ type: TaskRunResultType.Error;
+ errorDetail: TalerErrorDetail;
+}
- /**
- * Stop ongoing processing.
- */
- stop(): void;
+export interface DbRetryInfo {
+ firstTry: DbPreciseTimestamp;
+ nextRetry: DbPreciseTimestamp;
+ retryCounter: number;
+}
- /**
- * Run an async function after acquiring a list of locks, identified
- * by string tokens.
- */
- runSequentialized<T>(tokens: string[], f: () => Promise<T>): Promise<T>;
+export interface RetryPolicy {
+ readonly backoffDelta: Duration;
+ readonly backoffBase: number;
+ readonly maxTimeout: Duration;
+}
+
+const defaultRetryPolicy: RetryPolicy = {
+ backoffBase: 1.5,
+ backoffDelta: Duration.fromSpec({ seconds: 1 }),
+ maxTimeout: Duration.fromSpec({ minutes: 2 }),
+};
+
+function updateTimeout(
+ r: DbRetryInfo,
+ p: RetryPolicy = defaultRetryPolicy,
+): void {
+ const now = AbsoluteTime.now();
+ if (now.t_ms === "never") {
+ throw Error("assertion failed");
+ }
+ if (p.backoffDelta.d_ms === "forever") {
+ r.nextRetry = timestampPreciseToDb(
+ AbsoluteTime.toPreciseTimestamp(AbsoluteTime.never()),
+ );
+ return;
+ }
+
+ const nextIncrement =
+ p.backoffDelta.d_ms * Math.pow(p.backoffBase, r.retryCounter);
+
+ const t =
+ now.t_ms +
+ (p.maxTimeout.d_ms === "forever"
+ ? nextIncrement
+ : Math.min(p.maxTimeout.d_ms, nextIncrement));
+ r.nextRetry = timestampPreciseToDb(TalerPreciseTimestamp.fromMilliseconds(t));
+}
+
+export function computeDbBackoff(retryCounter: number): DbPreciseTimestamp {
+ const now = AbsoluteTime.now();
+ if (now.t_ms === "never") {
+ throw Error("assertion failed");
+ }
+ const p = defaultRetryPolicy;
+ if (p.backoffDelta.d_ms === "forever") {
+ throw Error("assertion failed");
+ }
+
+ const nextIncrement =
+ p.backoffDelta.d_ms * Math.pow(p.backoffBase, retryCounter);
+
+ const t =
+ now.t_ms +
+ (p.maxTimeout.d_ms === "forever"
+ ? nextIncrement
+ : Math.min(p.maxTimeout.d_ms, nextIncrement));
+ return timestampPreciseToDb(TalerPreciseTimestamp.fromMilliseconds(t));
+}
+
+export namespace DbRetryInfo {
+ export function reset(p: RetryPolicy = defaultRetryPolicy): DbRetryInfo {
+ const now = TalerPreciseTimestamp.now();
+ const info: DbRetryInfo = {
+ firstTry: timestampPreciseToDb(now),
+ nextRetry: timestampPreciseToDb(now),
+ retryCounter: 0,
+ };
+ updateTimeout(info, p);
+ return info;
+ }
+
+ export function increment(
+ r: DbRetryInfo | undefined,
+ p: RetryPolicy = defaultRetryPolicy,
+ ): DbRetryInfo {
+ if (!r) {
+ return reset(p);
+ }
+ const r2 = { ...r };
+ r2.retryCounter++;
+ updateTimeout(r2, p);
+ return r2;
+ }
+}
+
+/**
+ * Timestamp after which the wallet would do an auto-refresh.
+ */
+export function getAutoRefreshExecuteThreshold(d: {
+ stampExpireWithdraw: TalerProtocolTimestamp;
+ stampExpireDeposit: TalerProtocolTimestamp;
+}): AbsoluteTime {
+ const expireWithdraw = AbsoluteTime.fromProtocolTimestamp(
+ d.stampExpireWithdraw,
+ );
+ const expireDeposit = AbsoluteTime.fromProtocolTimestamp(
+ d.stampExpireDeposit,
+ );
+ const delta = AbsoluteTime.difference(expireWithdraw, expireDeposit);
+ const deltaDiv = durationMul(delta, 0.5);
+ return AbsoluteTime.addDuration(expireWithdraw, deltaDiv);
+}
+
+/**
+ * Parsed representation of task identifiers.
+ */
+export type ParsedTaskIdentifier =
+ | {
+ tag: PendingTaskType.Withdraw;
+ withdrawalGroupId: string;
+ }
+ | { tag: PendingTaskType.ExchangeUpdate; exchangeBaseUrl: string }
+ | { tag: PendingTaskType.Backup; backupProviderBaseUrl: string }
+ | { tag: PendingTaskType.Deposit; depositGroupId: string }
+ | { tag: PendingTaskType.PeerPullDebit; peerPullDebitId: string }
+ | { tag: PendingTaskType.PeerPullCredit; pursePub: string }
+ | { tag: PendingTaskType.PeerPushCredit; peerPushCreditId: string }
+ | { tag: PendingTaskType.PeerPushDebit; pursePub: string }
+ | { tag: PendingTaskType.Purchase; proposalId: string }
+ | { tag: PendingTaskType.Recoup; recoupGroupId: string }
+ | { tag: PendingTaskType.RewardPickup; walletRewardId: string }
+ | { tag: PendingTaskType.Refresh; refreshGroupId: string };
+
+export function parseTaskIdentifier(x: string): ParsedTaskIdentifier {
+ const task = x.split(":");
+
+ if (task.length < 2) {
+ throw Error("task id should have al least 2 parts separated by ':'");
+ }
+
+ const [type, ...rest] = task;
+ switch (type) {
+ case PendingTaskType.Backup:
+ return { tag: type, backupProviderBaseUrl: decodeURIComponent(rest[0]) };
+ case PendingTaskType.Deposit:
+ return { tag: type, depositGroupId: rest[0] };
+ case PendingTaskType.ExchangeUpdate:
+ return { tag: type, exchangeBaseUrl: decodeURIComponent(rest[0]) };
+ case PendingTaskType.PeerPullCredit:
+ return { tag: type, pursePub: rest[0] };
+ case PendingTaskType.PeerPullDebit:
+ return { tag: type, peerPullDebitId: rest[0] };
+ case PendingTaskType.PeerPushCredit:
+ return { tag: type, peerPushCreditId: rest[0] };
+ case PendingTaskType.PeerPushDebit:
+ return { tag: type, pursePub: rest[0] };
+ case PendingTaskType.Purchase:
+ return { tag: type, proposalId: rest[0] };
+ case PendingTaskType.Recoup:
+ return { tag: type, recoupGroupId: rest[0] };
+ case PendingTaskType.Refresh:
+ return { tag: type, refreshGroupId: rest[0] };
+ case PendingTaskType.RewardPickup:
+ return { tag: type, walletRewardId: rest[0] };
+ case PendingTaskType.Withdraw:
+ return { tag: type, withdrawalGroupId: rest[0] };
+ default:
+ throw Error("invalid task identifier");
+ }
+}
+
+export function constructTaskIdentifier(p: ParsedTaskIdentifier): TaskIdStr {
+ switch (p.tag) {
+ case PendingTaskType.Backup:
+ return `${p.tag}:${p.backupProviderBaseUrl}` as TaskIdStr;
+ case PendingTaskType.Deposit:
+ return `${p.tag}:${p.depositGroupId}` as TaskIdStr;
+ case PendingTaskType.ExchangeUpdate:
+ return `${p.tag}:${encodeURIComponent(p.exchangeBaseUrl)}` as TaskIdStr;
+ case PendingTaskType.PeerPullDebit:
+ return `${p.tag}:${p.peerPullDebitId}` as TaskIdStr;
+ case PendingTaskType.PeerPushCredit:
+ return `${p.tag}:${p.peerPushCreditId}` as TaskIdStr;
+ case PendingTaskType.PeerPullCredit:
+ return `${p.tag}:${p.pursePub}` as TaskIdStr;
+ case PendingTaskType.PeerPushDebit:
+ return `${p.tag}:${p.pursePub}` as TaskIdStr;
+ case PendingTaskType.Purchase:
+ return `${p.tag}:${p.proposalId}` as TaskIdStr;
+ case PendingTaskType.Recoup:
+ return `${p.tag}:${p.recoupGroupId}` as TaskIdStr;
+ case PendingTaskType.Refresh:
+ return `${p.tag}:${p.refreshGroupId}` as TaskIdStr;
+ case PendingTaskType.RewardPickup:
+ return `${p.tag}:${p.walletRewardId}` as TaskIdStr;
+ case PendingTaskType.Withdraw:
+ return `${p.tag}:${p.withdrawalGroupId}` as TaskIdStr;
+ default:
+ assertUnreachable(p);
+ }
+}
+
+export namespace TaskIdentifiers {
+ export function forWithdrawal(wg: WithdrawalGroupRecord): TaskIdStr {
+ return `${PendingTaskType.Withdraw}:${wg.withdrawalGroupId}` as TaskIdStr;
+ }
+ export function forExchangeUpdate(exch: ExchangeEntryRecord): TaskIdStr {
+ return `${PendingTaskType.ExchangeUpdate}:${encodeURIComponent(
+ exch.baseUrl,
+ )}` as TaskIdStr;
+ }
+ export function forExchangeUpdateFromUrl(exchBaseUrl: string): TaskIdStr {
+ return `${PendingTaskType.ExchangeUpdate}:${encodeURIComponent(
+ exchBaseUrl,
+ )}` as TaskIdStr;
+ }
+ export function forTipPickup(tipRecord: RewardRecord): TaskIdStr {
+ return `${PendingTaskType.RewardPickup}:${tipRecord.walletRewardId}` as TaskIdStr;
+ }
+ export function forRefresh(
+ refreshGroupRecord: RefreshGroupRecord,
+ ): TaskIdStr {
+ return `${PendingTaskType.Refresh}:${refreshGroupRecord.refreshGroupId}` as TaskIdStr;
+ }
+ export function forPay(purchaseRecord: PurchaseRecord): TaskIdStr {
+ return `${PendingTaskType.Purchase}:${purchaseRecord.proposalId}` as TaskIdStr;
+ }
+ export function forRecoup(recoupRecord: RecoupGroupRecord): TaskIdStr {
+ return `${PendingTaskType.Recoup}:${recoupRecord.recoupGroupId}` as TaskIdStr;
+ }
+ export function forDeposit(depositRecord: DepositGroupRecord): TaskIdStr {
+ return `${PendingTaskType.Deposit}:${depositRecord.depositGroupId}` as TaskIdStr;
+ }
+ export function forBackup(backupRecord: BackupProviderRecord): TaskIdStr {
+ return `${PendingTaskType.Backup}:${encodeURIComponent(
+ backupRecord.baseUrl,
+ )}` as TaskIdStr;
+ }
+ export function forPeerPushPaymentInitiation(
+ ppi: PeerPushDebitRecord,
+ ): TaskIdStr {
+ return `${PendingTaskType.PeerPushDebit}:${ppi.pursePub}` as TaskIdStr;
+ }
+ export function forPeerPullPaymentInitiation(
+ ppi: PeerPullCreditRecord,
+ ): TaskIdStr {
+ return `${PendingTaskType.PeerPullCredit}:${ppi.pursePub}` as TaskIdStr;
+ }
+ export function forPeerPullPaymentDebit(
+ ppi: PeerPullPaymentIncomingRecord,
+ ): TaskIdStr {
+ return `${PendingTaskType.PeerPullDebit}:${ppi.peerPullDebitId}` as TaskIdStr;
+ }
+ export function forPeerPushCredit(
+ ppi: PeerPushPaymentIncomingRecord,
+ ): TaskIdStr {
+ return `${PendingTaskType.PeerPushCredit}:${ppi.peerPushCreditId}` as TaskIdStr;
+ }
+}
+
+/**
+ * Result of a transaction transition.
+ */
+export enum TransitionResultType {
+ Transition = 1,
+ Stay = 2,
+ Delete = 3,
+}
+
+export type TransitionResult<R> =
+ | { type: TransitionResultType.Stay }
+ | { type: TransitionResultType.Transition; rec: R }
+ | { type: TransitionResultType.Delete };
+
+export const TransitionResult = {
+ stay<T>(): TransitionResult<T> {
+ return { type: TransitionResultType.Stay };
+ },
+ delete<T>(): TransitionResult<T> {
+ return { type: TransitionResultType.Delete };
+ },
+ transition<T>(rec: T): TransitionResult<T> {
+ return {
+ type: TransitionResultType.Transition,
+ rec,
+ };
+ },
+};
+
+/**
+ * Transaction context.
+ * Uniform interface to all transactions.
+ */
+export interface TransactionContext {
+ get taskId(): TaskIdStr | undefined;
+ get transactionId(): TransactionIdStr;
+ abortTransaction(): Promise<void>;
+ suspendTransaction(): Promise<void>;
+ resumeTransaction(): Promise<void>;
+ failTransaction(): Promise<void>;
+ deleteTransaction(): Promise<void>;
+}
+
+/**
+ * Type and schema definitions for pending tasks in the wallet.
+ *
+ * These are only used internally, and are not part of the stable public
+ * interface to the wallet.
+ */
+
+export enum PendingTaskType {
+ ExchangeUpdate = "exchange-update",
+ Purchase = "purchase",
+ Refresh = "refresh",
+ Recoup = "recoup",
+ RewardPickup = "reward-pickup",
+ Withdraw = "withdraw",
+ Deposit = "deposit",
+ Backup = "backup",
+ PeerPushDebit = "peer-push-debit",
+ PeerPullCredit = "peer-pull-credit",
+ PeerPushCredit = "peer-push-credit",
+ PeerPullDebit = "peer-pull-debit",
+}
+
+declare const __taskIdStr: unique symbol;
+export type TaskIdStr = string & { [__taskIdStr]: true };
+
+/**
+ * Wait until the wallet is in a particular state.
+ *
+ * Two functions must be provided:
+ * 1. checkState, which checks if the wallet is in the
+ * desired state.
+ * 2. filterNotification, which checks whether a notification
+ * might have lead to a state change.
+ */
+export async function genericWaitForState(
+ wex: WalletExecutionContext,
+ args: {
+ checkState: () => Promise<boolean>;
+ filterNotification: (notif: WalletNotification) => boolean;
+ },
+): Promise<void> {
+ await wex.taskScheduler.ensureRunning();
+
+ // FIXME: Clean up using the new JS "using" / Symbol.dispose syntax.
+ const flag = new AsyncFlag();
+ // Raise purchaseNotifFlag whenever we get a notification
+ // about our refresh.
+ const cancelNotif = wex.ws.addNotificationListener((notif) => {
+ if (args.filterNotification(notif)) {
+ flag.raise();
+ }
+ });
+ const unregisterOnCancelled = wex.cancellationToken.onCancelled(() => {
+ cancelNotif();
+ flag.raise();
+ });
- runUntilDone(req?: { maxRetries?: number }): Promise<void>;
+ try {
+ while (true) {
+ if (wex.cancellationToken.isCancelled) {
+ throw Error("cancelled");
+ }
+ if (await args.checkState()) {
+ return;
+ }
+ // Wait for the next transition
+ await flag.wait();
+ flag.reset();
+ }
+ } catch (e) {
+ unregisterOnCancelled();
+ cancelNotif();
+ }
}