summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/operations/common.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/operations/common.ts')
-rw-r--r--packages/taler-wallet-core/src/operations/common.ts466
1 files changed, 63 insertions, 403 deletions
diff --git a/packages/taler-wallet-core/src/operations/common.ts b/packages/taler-wallet-core/src/operations/common.ts
index 4c7c55212..92950b35b 100644
--- a/packages/taler-wallet-core/src/operations/common.ts
+++ b/packages/taler-wallet-core/src/operations/common.ts
@@ -21,7 +21,6 @@ import {
AbsoluteTime,
AmountJson,
Amounts,
- CancellationToken,
CoinRefreshRequest,
CoinStatus,
Duration,
@@ -29,22 +28,15 @@ import {
ExchangeEntryStatus,
ExchangeTosStatus,
ExchangeUpdateStatus,
- getErrorDetailFromException,
- j2s,
Logger,
- makeErrorDetail,
- NotificationType,
RefreshReason,
- TalerError,
- TalerErrorCode,
TalerErrorDetail,
TalerPreciseTimestamp,
+ TalerProtocolTimestamp,
TombstoneIdStr,
TransactionIdStr,
- TransactionType,
- WalletNotification,
+ durationMul,
} from "@gnu-taler/taler-util";
-import { CryptoApiStoppedError } from "../crypto/workers/crypto-dispatcher.js";
import {
BackupProviderRecord,
CoinRecord,
@@ -61,17 +53,16 @@ import {
RecoupGroupRecord,
RefreshGroupRecord,
RewardRecord,
- timestampPreciseToDb,
WalletStoresV1,
WithdrawalGroupRecord,
+ timestampPreciseToDb,
} from "../db.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import { PendingTaskType, TaskId } from "../pending-types.js";
import { assertUnreachable } from "../util/assertUnreachable.js";
import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js";
-import { GetReadOnlyAccess, GetReadWriteAccess } from "../util/query.js";
+import { GetReadWriteAccess } from "../util/query.js";
import { createRefreshGroup } from "./refresh.js";
-import { constructTransactionIdentifier } from "./transactions.js";
const logger = new Logger("operations/common.ts");
@@ -251,331 +242,6 @@ export async function spendCoins(
);
}
-/**
- * Convert the task ID for a task that processes a transaction int
- * the ID for the transaction.
- */
-function convertTaskToTransactionId(
- taskId: string,
-): TransactionIdStr | undefined {
- const parsedTaskId = parseTaskIdentifier(taskId);
- switch (parsedTaskId.tag) {
- case PendingTaskType.PeerPullCredit:
- return constructTransactionIdentifier({
- tag: TransactionType.PeerPullCredit,
- pursePub: parsedTaskId.pursePub,
- });
- case PendingTaskType.PeerPullDebit:
- return constructTransactionIdentifier({
- tag: TransactionType.PeerPullDebit,
- peerPullDebitId: parsedTaskId.peerPullDebitId,
- });
- // FIXME: This doesn't distinguish internal-withdrawal.
- // Maybe we should have a different task type for that as well?
- // Or maybe transaction IDs should be valid task identifiers?
- case PendingTaskType.Withdraw:
- return constructTransactionIdentifier({
- tag: TransactionType.Withdrawal,
- withdrawalGroupId: parsedTaskId.withdrawalGroupId,
- });
- case PendingTaskType.PeerPushCredit:
- return constructTransactionIdentifier({
- tag: TransactionType.PeerPushCredit,
- peerPushCreditId: parsedTaskId.peerPushCreditId,
- });
- case PendingTaskType.Deposit:
- return constructTransactionIdentifier({
- tag: TransactionType.Deposit,
- depositGroupId: parsedTaskId.depositGroupId,
- });
- case PendingTaskType.Refresh:
- return constructTransactionIdentifier({
- tag: TransactionType.Refresh,
- refreshGroupId: parsedTaskId.refreshGroupId,
- });
- case PendingTaskType.RewardPickup:
- return constructTransactionIdentifier({
- tag: TransactionType.Reward,
- walletRewardId: parsedTaskId.walletRewardId,
- });
- case PendingTaskType.PeerPushDebit:
- return constructTransactionIdentifier({
- tag: TransactionType.PeerPushDebit,
- pursePub: parsedTaskId.pursePub,
- });
- case PendingTaskType.Purchase:
- return constructTransactionIdentifier({
- tag: TransactionType.Payment,
- proposalId: parsedTaskId.proposalId,
- });
- default:
- return undefined;
- }
-}
-
-async function makeTransactionRetryNotification(
- ws: InternalWalletState,
- tx: GetReadOnlyAccess<typeof WalletStoresV1>,
- pendingTaskId: string,
- e: TalerErrorDetail | undefined,
-): Promise<WalletNotification | undefined> {
- const txId = convertTaskToTransactionId(pendingTaskId);
- if (!txId) {
- return undefined;
- }
- const txState = await ws.getTransactionState(ws, tx, txId);
- if (!txState) {
- return undefined;
- }
- const notif: WalletNotification = {
- type: NotificationType.TransactionStateTransition,
- transactionId: txId,
- oldTxState: txState,
- newTxState: txState,
- };
- if (e) {
- notif.errorInfo = {
- code: e.code as number,
- hint: e.hint,
- };
- }
- return notif;
-}
-
-async function makeExchangeRetryNotification(
- ws: InternalWalletState,
- tx: GetReadOnlyAccess<typeof WalletStoresV1>,
- pendingTaskId: string,
- e: TalerErrorDetail | undefined,
-): Promise<WalletNotification | undefined> {
- logger.info("making exchange retry notification");
- const parsedTaskId = parseTaskIdentifier(pendingTaskId);
- if (parsedTaskId.tag !== PendingTaskType.ExchangeUpdate) {
- throw Error("invalid task identifier");
- }
- const rec = await tx.exchanges.get(parsedTaskId.exchangeBaseUrl);
-
- if (!rec) {
- logger.info(`exchange ${parsedTaskId.exchangeBaseUrl} not found`);
- return undefined;
- }
-
- const notif: WalletNotification = {
- type: NotificationType.ExchangeStateTransition,
- exchangeBaseUrl: parsedTaskId.exchangeBaseUrl,
- oldExchangeState: getExchangeState(rec),
- newExchangeState: getExchangeState(rec),
- };
- if (e) {
- notif.errorInfo = {
- code: e.code as number,
- hint: e.hint,
- };
- }
- return notif;
-}
-
-/**
- * Generate an appropriate error transition notification
- * for applicable tasks.
- *
- * Namely, transition notifications are generated for:
- * - exchange update errors
- * - transactions
- */
-async function taskToRetryNotification(
- ws: InternalWalletState,
- tx: GetReadOnlyAccess<typeof WalletStoresV1>,
- pendingTaskId: string,
- e: TalerErrorDetail | undefined,
-): Promise<WalletNotification | undefined> {
- const parsedTaskId = parseTaskIdentifier(pendingTaskId);
-
- switch (parsedTaskId.tag) {
- case PendingTaskType.ExchangeUpdate:
- return makeExchangeRetryNotification(ws, tx, pendingTaskId, e);
- case PendingTaskType.PeerPullCredit:
- case PendingTaskType.PeerPullDebit:
- case PendingTaskType.Withdraw:
- case PendingTaskType.PeerPushCredit:
- case PendingTaskType.Deposit:
- case PendingTaskType.Refresh:
- case PendingTaskType.RewardPickup:
- case PendingTaskType.PeerPushDebit:
- case PendingTaskType.Purchase:
- return makeTransactionRetryNotification(ws, tx, pendingTaskId, e);
- case PendingTaskType.Backup:
- case PendingTaskType.ExchangeCheckRefresh:
- case PendingTaskType.Recoup:
- return undefined;
- }
-}
-
-async function storePendingTaskError(
- ws: InternalWalletState,
- pendingTaskId: string,
- e: TalerErrorDetail,
-): Promise<void> {
- logger.info(`storing pending task error for ${pendingTaskId}`);
- const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => {
- let retryRecord = await tx.operationRetries.get(pendingTaskId);
- if (!retryRecord) {
- retryRecord = {
- id: pendingTaskId,
- lastError: e,
- retryInfo: DbRetryInfo.reset(),
- };
- } else {
- retryRecord.lastError = e;
- retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
- }
- await tx.operationRetries.put(retryRecord);
- return taskToRetryNotification(ws, tx, pendingTaskId, e);
- });
- if (maybeNotification) {
- ws.notify(maybeNotification);
- }
-}
-
-export async function resetPendingTaskTimeout(
- ws: InternalWalletState,
- pendingTaskId: string,
-): Promise<void> {
- const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => {
- let retryRecord = await tx.operationRetries.get(pendingTaskId);
- if (retryRecord) {
- // Note that we don't reset the lastError, it should still be visible
- // while the retry runs.
- retryRecord.retryInfo = DbRetryInfo.reset();
- await tx.operationRetries.put(retryRecord);
- }
- return taskToRetryNotification(ws, tx, pendingTaskId, undefined);
- });
- if (maybeNotification) {
- ws.notify(maybeNotification);
- }
-}
-
-async function storePendingTaskPending(
- ws: InternalWalletState,
- pendingTaskId: string,
-): Promise<void> {
- const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => {
- let retryRecord = await tx.operationRetries.get(pendingTaskId);
- let hadError = false;
- if (!retryRecord) {
- retryRecord = {
- id: pendingTaskId,
- retryInfo: DbRetryInfo.reset(),
- };
- } else {
- if (retryRecord.lastError) {
- hadError = true;
- }
- delete retryRecord.lastError;
- retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
- }
- await tx.operationRetries.put(retryRecord);
- if (hadError) {
- return taskToRetryNotification(ws, tx, pendingTaskId, undefined);
- } else {
- return undefined;
- }
- });
- if (maybeNotification) {
- ws.notify(maybeNotification);
- }
-}
-
-async function storePendingTaskFinished(
- ws: InternalWalletState,
- pendingTaskId: string,
-): Promise<void> {
- await ws.db
- .mktx((x) => [x.operationRetries])
- .runReadWrite(async (tx) => {
- await tx.operationRetries.delete(pendingTaskId);
- });
-}
-
-export async function runTaskWithErrorReporting(
- ws: InternalWalletState,
- opId: TaskId,
- f: () => Promise<TaskRunResult>,
-): Promise<TaskRunResult> {
- let maybeError: TalerErrorDetail | undefined;
- try {
- const resp = await f();
- switch (resp.type) {
- case TaskRunResultType.Error:
- await storePendingTaskError(ws, opId, resp.errorDetail);
- return resp;
- case TaskRunResultType.Finished:
- await storePendingTaskFinished(ws, opId);
- return resp;
- case TaskRunResultType.Pending:
- await storePendingTaskPending(ws, opId);
- return resp;
- case TaskRunResultType.Longpoll:
- return resp;
- }
- } catch (e) {
- if (e instanceof CryptoApiStoppedError) {
- if (ws.stopped) {
- logger.warn("crypto API stopped during shutdown, ignoring error");
- return {
- type: TaskRunResultType.Error,
- errorDetail: makeErrorDetail(
- TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
- {},
- "Crypto API stopped during shutdown",
- ),
- };
- }
- }
- if (e instanceof TalerError) {
- logger.warn("operation processed resulted in error");
- logger.warn(`error was: ${j2s(e.errorDetail)}`);
- maybeError = e.errorDetail;
- await storePendingTaskError(ws, opId, maybeError!);
- return {
- type: TaskRunResultType.Error,
- errorDetail: e.errorDetail,
- };
- } else if (e instanceof Error) {
- // This is a bug, as we expect pending operations to always
- // do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED
- // or return something.
- logger.error(`Uncaught exception: ${e.message}`);
- logger.error(`Stack: ${e.stack}`);
- maybeError = makeErrorDetail(
- TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
- {
- stack: e.stack,
- },
- `unexpected exception (message: ${e.message})`,
- );
- await storePendingTaskError(ws, opId, maybeError);
- return {
- type: TaskRunResultType.Error,
- errorDetail: maybeError,
- };
- } else {
- logger.error("Uncaught exception, value is not even an error.");
- maybeError = makeErrorDetail(
- TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
- {},
- `unexpected exception (not even an error)`,
- );
- await storePendingTaskError(ws, opId, maybeError);
- return {
- type: TaskRunResultType.Error,
- errorDetail: maybeError,
- };
- }
- }
-}
-
export enum TombstoneTag {
DeleteWithdrawalGroup = "delete-withdrawal-group",
DeleteReserve = "delete-reserve",
@@ -646,47 +312,6 @@ export function getExchangeState(r: ExchangeEntryRecord): ExchangeEntryState {
};
}
-export interface LongpollResult {
- ready: boolean;
-}
-
-export function runLongpollAsync(
- ws: InternalWalletState,
- retryTag: string,
- reqFn: (ct: CancellationToken) => Promise<LongpollResult>,
-): void {
- const asyncFn = async () => {
- if (ws.stopped) {
- logger.trace("not long-polling reserve, wallet already stopped");
- await storePendingTaskPending(ws, retryTag);
- return;
- }
- const cts = CancellationToken.create();
- let res: { ready: boolean } | undefined = undefined;
- try {
- ws.activeLongpoll[retryTag] = {
- cancel: () => {
- logger.trace("cancel of reserve longpoll requested");
- cts.cancel();
- },
- };
- res = await reqFn(cts.token);
- } catch (e) {
- const errDetail = getErrorDetailFromException(e);
- logger.warn(`got error during long-polling: ${j2s(errDetail)}`);
- await storePendingTaskError(ws, retryTag, errDetail);
- return;
- } finally {
- delete ws.activeLongpoll[retryTag];
- }
- if (!res.ready) {
- await storePendingTaskPending(ws, retryTag);
- }
- ws.workAvailable.trigger();
- };
- asyncFn();
-}
-
export type ParsedTombstone =
| {
tag: TombstoneTag.DeleteWithdrawalGroup;
@@ -732,31 +357,53 @@ export interface TransactionManager {
export enum TaskRunResultType {
Finished = "finished",
- Pending = "pending",
+ Backoff = "backoff",
+ Progress = "progress",
Error = "error",
- Longpoll = "longpoll",
+ ScheduleLater = "schedule-later",
}
export type TaskRunResult =
| TaskRunFinishedResult
| TaskRunErrorResult
- | TaskRunLongpollResult
- | TaskRunPendingResult;
+ | TaskRunBackoffResult
+ | TaskRunProgressResult
+ | TaskRunScheduleLaterResult;
export namespace TaskRunResult {
+ /**
+ * Task is finished and does not need to be processed again.
+ */
export function finished(): TaskRunResult {
return {
type: TaskRunResultType.Finished,
};
}
- export function pending(): TaskRunResult {
+ /**
+ * Task is waiting for something, should be invoked
+ * again with exponentiall back-off until some other
+ * result is returned.
+ */
+ export function backoff(): TaskRunResult {
+ return {
+ type: TaskRunResultType.Backoff,
+ };
+ }
+ /**
+ * Task made progress and should be processed again.
+ */
+ export function progress(): TaskRunResult {
return {
- type: TaskRunResultType.Pending,
+ type: TaskRunResultType.Progress,
};
}
- export function longpoll(): TaskRunResult {
+ /**
+ * Run the task again at a fixed time in the future.
+ */
+ export function runAgainAt(runAt: AbsoluteTime): TaskRunResult {
return {
- type: TaskRunResultType.Longpoll,
+ type: TaskRunResultType.ScheduleLater,
+ runAt,
};
}
}
@@ -765,8 +412,17 @@ export interface TaskRunFinishedResult {
type: TaskRunResultType.Finished;
}
-export interface TaskRunPendingResult {
- type: TaskRunResultType.Pending;
+export interface TaskRunBackoffResult {
+ type: TaskRunResultType.Backoff;
+}
+
+export interface TaskRunProgressResult {
+ type: TaskRunResultType.Progress;
+}
+
+export interface TaskRunScheduleLaterResult {
+ type: TaskRunResultType.ScheduleLater;
+ runAt: AbsoluteTime;
}
export interface TaskRunErrorResult {
@@ -774,10 +430,6 @@ export interface TaskRunErrorResult {
errorDetail: TalerErrorDetail;
}
-export interface TaskRunLongpollResult {
- type: TaskRunResultType.Longpoll;
-}
-
export interface DbRetryInfo {
firstTry: DbPreciseTimestamp;
nextRetry: DbPreciseTimestamp;
@@ -867,6 +519,24 @@ export namespace DbRetryInfo {
}
/**
+ * Timestamp after which the wallet would do an auto-refresh.
+ */
+export function getAutoRefreshExecuteThreshold(d: {
+ stampExpireWithdraw: TalerProtocolTimestamp;
+ stampExpireDeposit: TalerProtocolTimestamp;
+}): AbsoluteTime {
+ const expireWithdraw = AbsoluteTime.fromProtocolTimestamp(
+ d.stampExpireWithdraw,
+ );
+ const expireDeposit = AbsoluteTime.fromProtocolTimestamp(
+ d.stampExpireDeposit,
+ );
+ const delta = AbsoluteTime.difference(expireWithdraw, expireDeposit);
+ const deltaDiv = durationMul(delta, 0.5);
+ return AbsoluteTime.addDuration(expireWithdraw, deltaDiv);
+}
+
+/**
* Parsed representation of task identifiers.
*/
export type ParsedTaskIdentifier =
@@ -877,7 +547,6 @@ export type ParsedTaskIdentifier =
| { tag: PendingTaskType.ExchangeUpdate; exchangeBaseUrl: string }
| { tag: PendingTaskType.Backup; backupProviderBaseUrl: string }
| { tag: PendingTaskType.Deposit; depositGroupId: string }
- | { tag: PendingTaskType.ExchangeCheckRefresh; exchangeBaseUrl: string }
| { tag: PendingTaskType.PeerPullDebit; peerPullDebitId: string }
| { tag: PendingTaskType.PeerPullCredit; pursePub: string }
| { tag: PendingTaskType.PeerPushCredit; peerPushCreditId: string }
@@ -900,8 +569,6 @@ export function parseTaskIdentifier(x: string): ParsedTaskIdentifier {
return { tag: type, backupProviderBaseUrl: decodeURIComponent(rest[0]) };
case PendingTaskType.Deposit:
return { tag: type, depositGroupId: rest[0] };
- case PendingTaskType.ExchangeCheckRefresh:
- return { tag: type, exchangeBaseUrl: decodeURIComponent(rest[0]) };
case PendingTaskType.ExchangeUpdate:
return { tag: type, exchangeBaseUrl: decodeURIComponent(rest[0]) };
case PendingTaskType.PeerPullCredit:
@@ -933,8 +600,6 @@ export function constructTaskIdentifier(p: ParsedTaskIdentifier): TaskId {
return `${p.tag}:${p.backupProviderBaseUrl}` as TaskId;
case PendingTaskType.Deposit:
return `${p.tag}:${p.depositGroupId}` as TaskId;
- case PendingTaskType.ExchangeCheckRefresh:
- return `${p.tag}:${encodeURIComponent(p.exchangeBaseUrl)}` as TaskId;
case PendingTaskType.ExchangeUpdate:
return `${p.tag}:${encodeURIComponent(p.exchangeBaseUrl)}` as TaskId;
case PendingTaskType.PeerPullDebit:
@@ -974,11 +639,6 @@ export namespace TaskIdentifiers {
exchBaseUrl,
)}` as TaskId;
}
- export function forExchangeCheckRefresh(exch: ExchangeEntryRecord): TaskId {
- return `${PendingTaskType.ExchangeCheckRefresh}:${encodeURIComponent(
- exch.baseUrl,
- )}` as TaskId;
- }
export function forTipPickup(tipRecord: RewardRecord): TaskId {
return `${PendingTaskType.RewardPickup}:${tipRecord.walletRewardId}` as TaskId;
}