summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/deposits.ts
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2024-02-21 14:23:01 +0100
committerFlorian Dold <florian@dold.me>2024-02-21 14:23:01 +0100
commit52a1f63e0a8cc2ca78910e8b56326376eb1d75d0 (patch)
treee59e898731a9eb76a9af3cec75256b5a07adf893 /packages/taler-wallet-core/src/deposits.ts
parent612b85c18fc17af412d08e075e1fddaa67aa7bf0 (diff)
downloadwallet-core-52a1f63e0a8cc2ca78910e8b56326376eb1d75d0.tar.gz
wallet-core-52a1f63e0a8cc2ca78910e8b56326376eb1d75d0.tar.bz2
wallet-core-52a1f63e0a8cc2ca78910e8b56326376eb1d75d0.zip
wallet-core: use cancellation tokens when possible
Diffstat (limited to 'packages/taler-wallet-core/src/deposits.ts')
-rw-r--r--packages/taler-wallet-core/src/deposits.ts148
1 files changed, 75 insertions, 73 deletions
diff --git a/packages/taler-wallet-core/src/deposits.ts b/packages/taler-wallet-core/src/deposits.ts
index 617f32887..ed8778368 100644
--- a/packages/taler-wallet-core/src/deposits.ts
+++ b/packages/taler-wallet-core/src/deposits.ts
@@ -75,7 +75,7 @@ import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http";
import { selectPayCoinsNew } from "./coinSelection.js";
import {
PendingTaskType,
- TaskId,
+ TaskIdStr,
TaskRunResult,
TombstoneTag,
TransactionContext,
@@ -119,7 +119,8 @@ const logger = new Logger("deposits.ts");
export class DepositTransactionContext implements TransactionContext {
readonly transactionId: TransactionIdStr;
- readonly taskId: TaskId;
+ readonly taskId: TaskIdStr;
+
constructor(
public ws: InternalWalletState,
public depositGroupId: string,
@@ -210,7 +211,8 @@ export class DepositTransactionContext implements TransactionContext {
switch (dg.operationStatus) {
case DepositOperationStatus.Finished:
return undefined;
- case DepositOperationStatus.PendingDeposit: {
+ case DepositOperationStatus.PendingDeposit:
+ case DepositOperationStatus.SuspendedDeposit: {
dg.operationStatus = DepositOperationStatus.Aborting;
await tx.depositGroups.put(dg);
return {
@@ -218,9 +220,6 @@ export class DepositTransactionContext implements TransactionContext {
newTxState: computeDepositTransactionStatus(dg),
};
}
- case DepositOperationStatus.SuspendedDeposit:
- // FIXME: Can we abort a suspended transaction?!
- return undefined;
}
return undefined;
},
@@ -410,74 +409,10 @@ export function computeDepositTransactionActions(
}
}
-/**
- * Check whether the refresh associated with the
- * aborting deposit group is done.
- *
- * If done, mark the deposit transaction as aborted.
- *
- * Otherwise continue waiting.
- *
- * FIXME: Wait for the refresh group notifications instead of periodically
- * checking the refresh group status.
- * FIXME: This is just one transaction, can't we do this in the initial
- * transaction of processDepositGroup?
- */
-async function waitForRefreshOnDepositGroup(
- ws: InternalWalletState,
- depositGroup: DepositGroupRecord,
-): Promise<TaskRunResult> {
- const abortRefreshGroupId = depositGroup.abortRefreshGroupId;
- checkLogicInvariant(!!abortRefreshGroupId);
- const transactionId = constructTransactionIdentifier({
- tag: TransactionType.Deposit,
- depositGroupId: depositGroup.depositGroupId,
- });
- const transitionInfo = await ws.db.runReadWriteTx(
- ["depositGroups", "refreshGroups"],
- async (tx) => {
- const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId);
- let newOpState: DepositOperationStatus | undefined;
- if (!refreshGroup) {
- // Maybe it got manually deleted? Means that we should
- // just go into aborted.
- logger.warn("no aborting refresh group found for deposit group");
- newOpState = DepositOperationStatus.Aborted;
- } else {
- if (refreshGroup.operationStatus === RefreshOperationStatus.Finished) {
- newOpState = DepositOperationStatus.Aborted;
- } else if (
- refreshGroup.operationStatus === RefreshOperationStatus.Failed
- ) {
- newOpState = DepositOperationStatus.Aborted;
- }
- }
- if (newOpState) {
- const newDg = await tx.depositGroups.get(depositGroup.depositGroupId);
- if (!newDg) {
- return;
- }
- const oldTxState = computeDepositTransactionStatus(newDg);
- newDg.operationStatus = newOpState;
- const newTxState = computeDepositTransactionStatus(newDg);
- await tx.depositGroups.put(newDg);
- return { oldTxState, newTxState };
- }
- return undefined;
- },
- );
-
- notifyTransition(ws, transactionId, transitionInfo);
- ws.notify({
- type: NotificationType.BalanceChange,
- hintTransactionId: transactionId,
- });
- return TaskRunResult.backoff();
-}
-
async function refundDepositGroup(
ws: InternalWalletState,
depositGroup: DepositGroupRecord,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const newTxPerCoin = [...depositGroup.statusPerCoin];
logger.info(`status per coin: ${j2s(depositGroup.statusPerCoin)}`);
@@ -520,6 +455,7 @@ async function refundDepositGroup(
const httpResp = await ws.http.fetch(refundUrl.href, {
method: "POST",
body: refundReq,
+ cancellationToken,
});
logger.info(
`coin ${i} refund HTTP status for coin: ${httpResp.status}`,
@@ -600,15 +536,81 @@ async function refundDepositGroup(
return TaskRunResult.backoff();
}
+/**
+ * Check whether the refresh associated with the
+ * aborting deposit group is done.
+ *
+ * If done, mark the deposit transaction as aborted.
+ *
+ * Otherwise continue waiting.
+ *
+ * FIXME: Wait for the refresh group notifications instead of periodically
+ * checking the refresh group status.
+ * FIXME: This is just one transaction, can't we do this in the initial
+ * transaction of processDepositGroup?
+ */
+async function waitForRefreshOnDepositGroup(
+ ws: InternalWalletState,
+ depositGroup: DepositGroupRecord,
+): Promise<TaskRunResult> {
+ const abortRefreshGroupId = depositGroup.abortRefreshGroupId;
+ checkLogicInvariant(!!abortRefreshGroupId);
+ const transactionId = constructTransactionIdentifier({
+ tag: TransactionType.Deposit,
+ depositGroupId: depositGroup.depositGroupId,
+ });
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["depositGroups", "refreshGroups"],
+ async (tx) => {
+ const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId);
+ let newOpState: DepositOperationStatus | undefined;
+ if (!refreshGroup) {
+ // Maybe it got manually deleted? Means that we should
+ // just go into aborted.
+ logger.warn("no aborting refresh group found for deposit group");
+ newOpState = DepositOperationStatus.Aborted;
+ } else {
+ if (refreshGroup.operationStatus === RefreshOperationStatus.Finished) {
+ newOpState = DepositOperationStatus.Aborted;
+ } else if (
+ refreshGroup.operationStatus === RefreshOperationStatus.Failed
+ ) {
+ newOpState = DepositOperationStatus.Aborted;
+ }
+ }
+ if (newOpState) {
+ const newDg = await tx.depositGroups.get(depositGroup.depositGroupId);
+ if (!newDg) {
+ return;
+ }
+ const oldTxState = computeDepositTransactionStatus(newDg);
+ newDg.operationStatus = newOpState;
+ const newTxState = computeDepositTransactionStatus(newDg);
+ await tx.depositGroups.put(newDg);
+ return { oldTxState, newTxState };
+ }
+ return undefined;
+ },
+ );
+
+ notifyTransition(ws, transactionId, transitionInfo);
+ ws.notify({
+ type: NotificationType.BalanceChange,
+ hintTransactionId: transactionId,
+ });
+ return TaskRunResult.backoff();
+}
+
async function processDepositGroupAborting(
ws: InternalWalletState,
depositGroup: DepositGroupRecord,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
logger.info("processing deposit tx in 'aborting'");
const abortRefreshGroupId = depositGroup.abortRefreshGroupId;
if (!abortRefreshGroupId) {
logger.info("refunding deposit group");
- return refundDepositGroup(ws, depositGroup);
+ return refundDepositGroup(ws, depositGroup, cancellationToken);
}
logger.info("waiting for refresh");
return waitForRefreshOnDepositGroup(ws, depositGroup);
@@ -1059,7 +1061,7 @@ export async function processDepositGroup(
cancellationToken,
);
case DepositOperationStatus.Aborting:
- return processDepositGroupAborting(ws, depositGroup);
+ return processDepositGroupAborting(ws, depositGroup, cancellationToken);
}
return TaskRunResult.finished();