summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2024-02-21 14:23:01 +0100
committerFlorian Dold <florian@dold.me>2024-02-21 14:23:01 +0100
commit52a1f63e0a8cc2ca78910e8b56326376eb1d75d0 (patch)
treee59e898731a9eb76a9af3cec75256b5a07adf893 /packages/taler-wallet-core/src
parent612b85c18fc17af412d08e075e1fddaa67aa7bf0 (diff)
downloadwallet-core-52a1f63e0a8cc2ca78910e8b56326376eb1d75d0.tar.gz
wallet-core-52a1f63e0a8cc2ca78910e8b56326376eb1d75d0.tar.bz2
wallet-core-52a1f63e0a8cc2ca78910e8b56326376eb1d75d0.zip
wallet-core: use cancellation tokens when possible
Diffstat (limited to 'packages/taler-wallet-core/src')
-rw-r--r--packages/taler-wallet-core/src/common.ts88
-rw-r--r--packages/taler-wallet-core/src/denominations.ts3
-rw-r--r--packages/taler-wallet-core/src/deposits.ts148
-rw-r--r--packages/taler-wallet-core/src/exchanges.ts7
-rw-r--r--packages/taler-wallet-core/src/pay-merchant.ts73
-rw-r--r--packages/taler-wallet-core/src/pay-peer-pull-credit.ts37
-rw-r--r--packages/taler-wallet-core/src/pay-peer-pull-debit.ts4
-rw-r--r--packages/taler-wallet-core/src/pay-peer-push-credit.ts31
-rw-r--r--packages/taler-wallet-core/src/pay-peer-push-debit.ts32
-rw-r--r--packages/taler-wallet-core/src/recoup.ts8
-rw-r--r--packages/taler-wallet-core/src/refresh.ts59
-rw-r--r--packages/taler-wallet-core/src/reward.ts5
-rw-r--r--packages/taler-wallet-core/src/shepherd.ts24
-rw-r--r--packages/taler-wallet-core/src/transactions.ts9
-rw-r--r--packages/taler-wallet-core/src/withdraw.ts78
15 files changed, 374 insertions, 232 deletions
diff --git a/packages/taler-wallet-core/src/common.ts b/packages/taler-wallet-core/src/common.ts
index 9b69ad6c4..8c6650f4a 100644
--- a/packages/taler-wallet-core/src/common.ts
+++ b/packages/taler-wallet-core/src/common.ts
@@ -337,7 +337,7 @@ export function constructTombstone(p: ParsedTombstone): TombstoneIdStr {
* Uniform interface for a particular wallet transaction.
*/
export interface TransactionManager {
- get taskId(): TaskId;
+ get taskId(): TaskIdStr;
get transactionId(): TransactionIdStr;
fail(): Promise<void>;
abort(): Promise<void>;
@@ -600,90 +600,92 @@ export function parseTaskIdentifier(x: string): ParsedTaskIdentifier {
}
}
-export function constructTaskIdentifier(p: ParsedTaskIdentifier): TaskId {
+export function constructTaskIdentifier(p: ParsedTaskIdentifier): TaskIdStr {
switch (p.tag) {
case PendingTaskType.Backup:
- return `${p.tag}:${p.backupProviderBaseUrl}` as TaskId;
+ return `${p.tag}:${p.backupProviderBaseUrl}` as TaskIdStr;
case PendingTaskType.Deposit:
- return `${p.tag}:${p.depositGroupId}` as TaskId;
+ return `${p.tag}:${p.depositGroupId}` as TaskIdStr;
case PendingTaskType.ExchangeUpdate:
- return `${p.tag}:${encodeURIComponent(p.exchangeBaseUrl)}` as TaskId;
+ return `${p.tag}:${encodeURIComponent(p.exchangeBaseUrl)}` as TaskIdStr;
case PendingTaskType.PeerPullDebit:
- return `${p.tag}:${p.peerPullDebitId}` as TaskId;
+ return `${p.tag}:${p.peerPullDebitId}` as TaskIdStr;
case PendingTaskType.PeerPushCredit:
- return `${p.tag}:${p.peerPushCreditId}` as TaskId;
+ return `${p.tag}:${p.peerPushCreditId}` as TaskIdStr;
case PendingTaskType.PeerPullCredit:
- return `${p.tag}:${p.pursePub}` as TaskId;
+ return `${p.tag}:${p.pursePub}` as TaskIdStr;
case PendingTaskType.PeerPushDebit:
- return `${p.tag}:${p.pursePub}` as TaskId;
+ return `${p.tag}:${p.pursePub}` as TaskIdStr;
case PendingTaskType.Purchase:
- return `${p.tag}:${p.proposalId}` as TaskId;
+ return `${p.tag}:${p.proposalId}` as TaskIdStr;
case PendingTaskType.Recoup:
- return `${p.tag}:${p.recoupGroupId}` as TaskId;
+ return `${p.tag}:${p.recoupGroupId}` as TaskIdStr;
case PendingTaskType.Refresh:
- return `${p.tag}:${p.refreshGroupId}` as TaskId;
+ return `${p.tag}:${p.refreshGroupId}` as TaskIdStr;
case PendingTaskType.RewardPickup:
- return `${p.tag}:${p.walletRewardId}` as TaskId;
+ return `${p.tag}:${p.walletRewardId}` as TaskIdStr;
case PendingTaskType.Withdraw:
- return `${p.tag}:${p.withdrawalGroupId}` as TaskId;
+ return `${p.tag}:${p.withdrawalGroupId}` as TaskIdStr;
default:
assertUnreachable(p);
}
}
export namespace TaskIdentifiers {
- export function forWithdrawal(wg: WithdrawalGroupRecord): TaskId {
- return `${PendingTaskType.Withdraw}:${wg.withdrawalGroupId}` as TaskId;
+ export function forWithdrawal(wg: WithdrawalGroupRecord): TaskIdStr {
+ return `${PendingTaskType.Withdraw}:${wg.withdrawalGroupId}` as TaskIdStr;
}
- export function forExchangeUpdate(exch: ExchangeEntryRecord): TaskId {
+ export function forExchangeUpdate(exch: ExchangeEntryRecord): TaskIdStr {
return `${PendingTaskType.ExchangeUpdate}:${encodeURIComponent(
exch.baseUrl,
- )}` as TaskId;
+ )}` as TaskIdStr;
}
- export function forExchangeUpdateFromUrl(exchBaseUrl: string): TaskId {
+ export function forExchangeUpdateFromUrl(exchBaseUrl: string): TaskIdStr {
return `${PendingTaskType.ExchangeUpdate}:${encodeURIComponent(
exchBaseUrl,
- )}` as TaskId;
+ )}` as TaskIdStr;
}
- export function forTipPickup(tipRecord: RewardRecord): TaskId {
- return `${PendingTaskType.RewardPickup}:${tipRecord.walletRewardId}` as TaskId;
+ export function forTipPickup(tipRecord: RewardRecord): TaskIdStr {
+ return `${PendingTaskType.RewardPickup}:${tipRecord.walletRewardId}` as TaskIdStr;
}
- export function forRefresh(refreshGroupRecord: RefreshGroupRecord): TaskId {
- return `${PendingTaskType.Refresh}:${refreshGroupRecord.refreshGroupId}` as TaskId;
+ export function forRefresh(
+ refreshGroupRecord: RefreshGroupRecord,
+ ): TaskIdStr {
+ return `${PendingTaskType.Refresh}:${refreshGroupRecord.refreshGroupId}` as TaskIdStr;
}
- export function forPay(purchaseRecord: PurchaseRecord): TaskId {
- return `${PendingTaskType.Purchase}:${purchaseRecord.proposalId}` as TaskId;
+ export function forPay(purchaseRecord: PurchaseRecord): TaskIdStr {
+ return `${PendingTaskType.Purchase}:${purchaseRecord.proposalId}` as TaskIdStr;
}
- export function forRecoup(recoupRecord: RecoupGroupRecord): TaskId {
- return `${PendingTaskType.Recoup}:${recoupRecord.recoupGroupId}` as TaskId;
+ export function forRecoup(recoupRecord: RecoupGroupRecord): TaskIdStr {
+ return `${PendingTaskType.Recoup}:${recoupRecord.recoupGroupId}` as TaskIdStr;
}
- export function forDeposit(depositRecord: DepositGroupRecord): TaskId {
- return `${PendingTaskType.Deposit}:${depositRecord.depositGroupId}` as TaskId;
+ export function forDeposit(depositRecord: DepositGroupRecord): TaskIdStr {
+ return `${PendingTaskType.Deposit}:${depositRecord.depositGroupId}` as TaskIdStr;
}
- export function forBackup(backupRecord: BackupProviderRecord): TaskId {
+ export function forBackup(backupRecord: BackupProviderRecord): TaskIdStr {
return `${PendingTaskType.Backup}:${encodeURIComponent(
backupRecord.baseUrl,
- )}` as TaskId;
+ )}` as TaskIdStr;
}
export function forPeerPushPaymentInitiation(
ppi: PeerPushDebitRecord,
- ): TaskId {
- return `${PendingTaskType.PeerPushDebit}:${ppi.pursePub}` as TaskId;
+ ): TaskIdStr {
+ return `${PendingTaskType.PeerPushDebit}:${ppi.pursePub}` as TaskIdStr;
}
export function forPeerPullPaymentInitiation(
ppi: PeerPullCreditRecord,
- ): TaskId {
- return `${PendingTaskType.PeerPullCredit}:${ppi.pursePub}` as TaskId;
+ ): TaskIdStr {
+ return `${PendingTaskType.PeerPullCredit}:${ppi.pursePub}` as TaskIdStr;
}
export function forPeerPullPaymentDebit(
ppi: PeerPullPaymentIncomingRecord,
- ): TaskId {
- return `${PendingTaskType.PeerPullDebit}:${ppi.peerPullDebitId}` as TaskId;
+ ): TaskIdStr {
+ return `${PendingTaskType.PeerPullDebit}:${ppi.peerPullDebitId}` as TaskIdStr;
}
export function forPeerPushCredit(
ppi: PeerPushPaymentIncomingRecord,
- ): TaskId {
- return `${PendingTaskType.PeerPushCredit}:${ppi.peerPushCreditId}` as TaskId;
+ ): TaskIdStr {
+ return `${PendingTaskType.PeerPushCredit}:${ppi.peerPushCreditId}` as TaskIdStr;
}
}
@@ -700,6 +702,8 @@ export enum TransitionResult {
* Uniform interface to all transactions.
*/
export interface TransactionContext {
+ get taskId(): TaskIdStr | undefined;
+ get transactionId(): TransactionIdStr;
abortTransaction(): Promise<void>;
suspendTransaction(): Promise<void>;
resumeTransaction(): Promise<void>;
@@ -729,5 +733,5 @@ export enum PendingTaskType {
PeerPullDebit = "peer-pull-debit",
}
-declare const __taskId: unique symbol;
-export type TaskId = string & { [__taskId]: true };
+declare const __taskIdStr: unique symbol;
+export type TaskIdStr = string & { [__taskIdStr]: true };
diff --git a/packages/taler-wallet-core/src/denominations.ts b/packages/taler-wallet-core/src/denominations.ts
index 177070622..a539918de 100644
--- a/packages/taler-wallet-core/src/denominations.ts
+++ b/packages/taler-wallet-core/src/denominations.ts
@@ -14,6 +14,9 @@
GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
+/**
+ * Imports.
+ */
import {
AbsoluteTime,
AmountJson,
diff --git a/packages/taler-wallet-core/src/deposits.ts b/packages/taler-wallet-core/src/deposits.ts
index 617f32887..ed8778368 100644
--- a/packages/taler-wallet-core/src/deposits.ts
+++ b/packages/taler-wallet-core/src/deposits.ts
@@ -75,7 +75,7 @@ import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http";
import { selectPayCoinsNew } from "./coinSelection.js";
import {
PendingTaskType,
- TaskId,
+ TaskIdStr,
TaskRunResult,
TombstoneTag,
TransactionContext,
@@ -119,7 +119,8 @@ const logger = new Logger("deposits.ts");
export class DepositTransactionContext implements TransactionContext {
readonly transactionId: TransactionIdStr;
- readonly taskId: TaskId;
+ readonly taskId: TaskIdStr;
+
constructor(
public ws: InternalWalletState,
public depositGroupId: string,
@@ -210,7 +211,8 @@ export class DepositTransactionContext implements TransactionContext {
switch (dg.operationStatus) {
case DepositOperationStatus.Finished:
return undefined;
- case DepositOperationStatus.PendingDeposit: {
+ case DepositOperationStatus.PendingDeposit:
+ case DepositOperationStatus.SuspendedDeposit: {
dg.operationStatus = DepositOperationStatus.Aborting;
await tx.depositGroups.put(dg);
return {
@@ -218,9 +220,6 @@ export class DepositTransactionContext implements TransactionContext {
newTxState: computeDepositTransactionStatus(dg),
};
}
- case DepositOperationStatus.SuspendedDeposit:
- // FIXME: Can we abort a suspended transaction?!
- return undefined;
}
return undefined;
},
@@ -410,74 +409,10 @@ export function computeDepositTransactionActions(
}
}
-/**
- * Check whether the refresh associated with the
- * aborting deposit group is done.
- *
- * If done, mark the deposit transaction as aborted.
- *
- * Otherwise continue waiting.
- *
- * FIXME: Wait for the refresh group notifications instead of periodically
- * checking the refresh group status.
- * FIXME: This is just one transaction, can't we do this in the initial
- * transaction of processDepositGroup?
- */
-async function waitForRefreshOnDepositGroup(
- ws: InternalWalletState,
- depositGroup: DepositGroupRecord,
-): Promise<TaskRunResult> {
- const abortRefreshGroupId = depositGroup.abortRefreshGroupId;
- checkLogicInvariant(!!abortRefreshGroupId);
- const transactionId = constructTransactionIdentifier({
- tag: TransactionType.Deposit,
- depositGroupId: depositGroup.depositGroupId,
- });
- const transitionInfo = await ws.db.runReadWriteTx(
- ["depositGroups", "refreshGroups"],
- async (tx) => {
- const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId);
- let newOpState: DepositOperationStatus | undefined;
- if (!refreshGroup) {
- // Maybe it got manually deleted? Means that we should
- // just go into aborted.
- logger.warn("no aborting refresh group found for deposit group");
- newOpState = DepositOperationStatus.Aborted;
- } else {
- if (refreshGroup.operationStatus === RefreshOperationStatus.Finished) {
- newOpState = DepositOperationStatus.Aborted;
- } else if (
- refreshGroup.operationStatus === RefreshOperationStatus.Failed
- ) {
- newOpState = DepositOperationStatus.Aborted;
- }
- }
- if (newOpState) {
- const newDg = await tx.depositGroups.get(depositGroup.depositGroupId);
- if (!newDg) {
- return;
- }
- const oldTxState = computeDepositTransactionStatus(newDg);
- newDg.operationStatus = newOpState;
- const newTxState = computeDepositTransactionStatus(newDg);
- await tx.depositGroups.put(newDg);
- return { oldTxState, newTxState };
- }
- return undefined;
- },
- );
-
- notifyTransition(ws, transactionId, transitionInfo);
- ws.notify({
- type: NotificationType.BalanceChange,
- hintTransactionId: transactionId,
- });
- return TaskRunResult.backoff();
-}
-
async function refundDepositGroup(
ws: InternalWalletState,
depositGroup: DepositGroupRecord,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const newTxPerCoin = [...depositGroup.statusPerCoin];
logger.info(`status per coin: ${j2s(depositGroup.statusPerCoin)}`);
@@ -520,6 +455,7 @@ async function refundDepositGroup(
const httpResp = await ws.http.fetch(refundUrl.href, {
method: "POST",
body: refundReq,
+ cancellationToken,
});
logger.info(
`coin ${i} refund HTTP status for coin: ${httpResp.status}`,
@@ -600,15 +536,81 @@ async function refundDepositGroup(
return TaskRunResult.backoff();
}
+/**
+ * Check whether the refresh associated with the
+ * aborting deposit group is done.
+ *
+ * If done, mark the deposit transaction as aborted.
+ *
+ * Otherwise continue waiting.
+ *
+ * FIXME: Wait for the refresh group notifications instead of periodically
+ * checking the refresh group status.
+ * FIXME: This is just one transaction, can't we do this in the initial
+ * transaction of processDepositGroup?
+ */
+async function waitForRefreshOnDepositGroup(
+ ws: InternalWalletState,
+ depositGroup: DepositGroupRecord,
+): Promise<TaskRunResult> {
+ const abortRefreshGroupId = depositGroup.abortRefreshGroupId;
+ checkLogicInvariant(!!abortRefreshGroupId);
+ const transactionId = constructTransactionIdentifier({
+ tag: TransactionType.Deposit,
+ depositGroupId: depositGroup.depositGroupId,
+ });
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["depositGroups", "refreshGroups"],
+ async (tx) => {
+ const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId);
+ let newOpState: DepositOperationStatus | undefined;
+ if (!refreshGroup) {
+ // Maybe it got manually deleted? Means that we should
+ // just go into aborted.
+ logger.warn("no aborting refresh group found for deposit group");
+ newOpState = DepositOperationStatus.Aborted;
+ } else {
+ if (refreshGroup.operationStatus === RefreshOperationStatus.Finished) {
+ newOpState = DepositOperationStatus.Aborted;
+ } else if (
+ refreshGroup.operationStatus === RefreshOperationStatus.Failed
+ ) {
+ newOpState = DepositOperationStatus.Aborted;
+ }
+ }
+ if (newOpState) {
+ const newDg = await tx.depositGroups.get(depositGroup.depositGroupId);
+ if (!newDg) {
+ return;
+ }
+ const oldTxState = computeDepositTransactionStatus(newDg);
+ newDg.operationStatus = newOpState;
+ const newTxState = computeDepositTransactionStatus(newDg);
+ await tx.depositGroups.put(newDg);
+ return { oldTxState, newTxState };
+ }
+ return undefined;
+ },
+ );
+
+ notifyTransition(ws, transactionId, transitionInfo);
+ ws.notify({
+ type: NotificationType.BalanceChange,
+ hintTransactionId: transactionId,
+ });
+ return TaskRunResult.backoff();
+}
+
async function processDepositGroupAborting(
ws: InternalWalletState,
depositGroup: DepositGroupRecord,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
logger.info("processing deposit tx in 'aborting'");
const abortRefreshGroupId = depositGroup.abortRefreshGroupId;
if (!abortRefreshGroupId) {
logger.info("refunding deposit group");
- return refundDepositGroup(ws, depositGroup);
+ return refundDepositGroup(ws, depositGroup, cancellationToken);
}
logger.info("waiting for refresh");
return waitForRefreshOnDepositGroup(ws, depositGroup);
@@ -1059,7 +1061,7 @@ export async function processDepositGroup(
cancellationToken,
);
case DepositOperationStatus.Aborting:
- return processDepositGroupAborting(ws, depositGroup);
+ return processDepositGroupAborting(ws, depositGroup, cancellationToken);
}
return TaskRunResult.finished();
diff --git a/packages/taler-wallet-core/src/exchanges.ts b/packages/taler-wallet-core/src/exchanges.ts
index a4732e474..4792c3c20 100644
--- a/packages/taler-wallet-core/src/exchanges.ts
+++ b/packages/taler-wallet-core/src/exchanges.ts
@@ -150,6 +150,7 @@ async function downloadExchangeWithTermsOfService(
exchangeBaseUrl: string,
http: HttpRequestLibrary,
timeout: Duration,
+ cancellationToken: CancellationToken,
acceptFormat: string,
acceptLanguage: string | undefined,
): Promise<ExchangeTosDownloadResult> {
@@ -169,6 +170,7 @@ async function downloadExchangeWithTermsOfService(
const resp = await http.fetch(reqUrl.href, {
headers,
timeout,
+ cancellationToken,
});
const tosText = await readSuccessResponseTextOrThrow(resp);
const tosEtag = resp.headers.get("etag") || "unknown";
@@ -789,6 +791,7 @@ async function downloadTosFromAcceptedFormat(
ws: InternalWalletState,
baseUrl: string,
timeout: Duration,
+ cancellationToken: CancellationToken,
acceptedFormat?: string[],
acceptLanguage?: string,
): Promise<ExchangeTosDownloadResult> {
@@ -800,6 +803,7 @@ async function downloadTosFromAcceptedFormat(
baseUrl,
ws.http,
timeout,
+ cancellationToken,
format,
acceptLanguage,
);
@@ -816,6 +820,7 @@ async function downloadTosFromAcceptedFormat(
baseUrl,
ws.http,
timeout,
+ cancellationToken,
"text/plain",
acceptLanguage,
);
@@ -1256,6 +1261,7 @@ export async function updateExchangeFromUrlHandler(
ws,
exchangeBaseUrl,
timeout,
+ cancellationToken,
["text/plain"],
);
@@ -1632,6 +1638,7 @@ export async function getExchangeTos(
ws,
exchangeBaseUrl,
getExchangeRequestTimeout(),
+ CancellationToken.CONTINUE,
acceptedFormat,
acceptLanguage,
);
diff --git a/packages/taler-wallet-core/src/pay-merchant.ts b/packages/taler-wallet-core/src/pay-merchant.ts
index 5e01ae716..09e9e1fb3 100644
--- a/packages/taler-wallet-core/src/pay-merchant.ts
+++ b/packages/taler-wallet-core/src/pay-merchant.ts
@@ -103,8 +103,8 @@ import {
DbRetryInfo,
PendingTaskType,
spendCoins,
- TaskId,
TaskIdentifiers,
+ TaskIdStr,
TaskRunResult,
TaskRunResultType,
TombstoneTag,
@@ -150,7 +150,7 @@ const logger = new Logger("pay-merchant.ts");
export class PayMerchantTransactionContext implements TransactionContext {
readonly transactionId: TransactionIdStr;
- readonly taskId: TaskId;
+ readonly taskId: TaskIdStr;
constructor(
public ws: InternalWalletState,
@@ -391,7 +391,8 @@ export class PayMerchantTransactionContext implements TransactionContext {
}
export class RefundTransactionContext implements TransactionContext {
- public transactionId: string;
+ public transactionId: TransactionIdStr;
+ public taskId: TaskIdStr | undefined = undefined;
constructor(
public ws: InternalWalletState,
public refundGroupId: string,
@@ -612,6 +613,7 @@ export function extractContractData(
async function processDownloadProposal(
ws: InternalWalletState,
proposalId: string,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const proposal = await ws.db.runReadOnlyTx(["purchases"], async (tx) => {
return await tx.purchases.get(proposalId);
@@ -662,6 +664,7 @@ async function processDownloadProposal(
method: "POST",
body: requestBody,
timeout: getProposalRequestTimeout(retryRecord?.retryInfo),
+ cancellationToken,
});
const r = await readSuccessResponseJsonOrErrorCode(
httpResponse,
@@ -861,6 +864,7 @@ async function createOrReusePurchase(
sessionId: string | undefined,
claimToken: string | undefined,
noncePriv: string | undefined,
+ cancellationToken: CancellationToken,
): Promise<string> {
const oldProposals = await ws.db.runReadOnlyTx(["purchases"], async (tx) => {
return tx.purchases.indexes.byUrlAndOrderId.getAll([
@@ -891,7 +895,11 @@ async function createOrReusePurchase(
);
if (oldProposal.purchaseStatus === PurchaseStatus.DialogShared) {
const download = await expectProposalDownload(ws, oldProposal);
- const paid = await checkIfOrderIsAlreadyPaid(ws, download.contractData);
+ const paid = await checkIfOrderIsAlreadyPaid(
+ ws,
+ download.contractData,
+ cancellationToken,
+ );
logger.info(`old proposal paid: ${paid}`);
if (paid) {
// if this transaction was shared and the order is paid then it
@@ -1466,6 +1474,7 @@ export async function preparePayForUri(
uriResult.sessionId,
uriResult.claimToken,
uriResult.noncePriv,
+ CancellationToken.CONTINUE,
);
await waitProposalDownloaded(ws, proposalId);
@@ -1951,20 +1960,20 @@ export async function processPurchase(
switch (purchase.purchaseStatus) {
case PurchaseStatus.PendingDownloadingProposal:
- return processDownloadProposal(ws, proposalId);
+ return processDownloadProposal(ws, proposalId, cancellationToken);
case PurchaseStatus.PendingPaying:
case PurchaseStatus.PendingPayingReplay:
- return processPurchasePay(ws, proposalId);
+ return processPurchasePay(ws, proposalId, cancellationToken);
case PurchaseStatus.PendingQueryingRefund:
- return processPurchaseQueryRefund(ws, purchase);
+ return processPurchaseQueryRefund(ws, purchase, cancellationToken);
case PurchaseStatus.PendingQueryingAutoRefund:
- return processPurchaseAutoRefund(ws, purchase);
+ return processPurchaseAutoRefund(ws, purchase, cancellationToken);
case PurchaseStatus.AbortingWithRefund:
- return processPurchaseAbortingRefund(ws, purchase);
+ return processPurchaseAbortingRefund(ws, purchase, cancellationToken);
case PurchaseStatus.PendingAcceptRefund:
- return processPurchaseAcceptRefund(ws, purchase);
+ return processPurchaseAcceptRefund(ws, purchase, cancellationToken);
case PurchaseStatus.DialogShared:
- return processPurchaseDialogShared(ws, purchase);
+ return processPurchaseDialogShared(ws, purchase, cancellationToken);
case PurchaseStatus.FailedClaim:
case PurchaseStatus.Done:
case PurchaseStatus.DoneRepurchaseDetected:
@@ -1990,6 +1999,7 @@ export async function processPurchase(
async function processPurchasePay(
ws: InternalWalletState,
proposalId: string,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const purchase = await ws.db.runReadOnlyTx(["purchases"], async (tx) => {
return tx.purchases.get(proposalId);
@@ -2024,7 +2034,11 @@ async function processPurchasePay(
const download = await expectProposalDownload(ws, purchase);
if (purchase.shared) {
- const paid = await checkIfOrderIsAlreadyPaid(ws, download.contractData);
+ const paid = await checkIfOrderIsAlreadyPaid(
+ ws,
+ download.contractData,
+ cancellationToken,
+ );
if (paid) {
const transitionInfo = await ws.db.runReadWriteTx(
@@ -2088,6 +2102,7 @@ async function processPurchasePay(
method: "POST",
body: reqBody,
timeout: getPayRequestTimeout(purchase),
+ cancellationToken,
}),
);
@@ -2163,7 +2178,11 @@ async function processPurchasePay(
};
logger.trace(`/paid request body: ${j2s(reqBody)}`);
const resp = await ws.runSequentialized([EXCHANGE_COINS_LOCK], () =>
- ws.http.fetch(payAgainUrl, { method: "POST", body: reqBody }),
+ ws.http.fetch(payAgainUrl, {
+ method: "POST",
+ body: reqBody,
+ cancellationToken,
+ }),
);
logger.trace(`/paid response status: ${resp.status}`);
if (
@@ -2499,6 +2518,7 @@ export async function sharePayment(
async function checkIfOrderIsAlreadyPaid(
ws: InternalWalletState,
contract: WalletContractData,
+ cancellationToken: CancellationToken,
) {
const requestUrl = new URL(
`orders/${contract.orderId}`,
@@ -2508,7 +2528,9 @@ async function checkIfOrderIsAlreadyPaid(
requestUrl.searchParams.set("timeout_ms", "1000");
- const resp = await ws.http.fetch(requestUrl.href);
+ const resp = await ws.http.fetch(requestUrl.href, {
+ cancellationToken,
+ });
if (
resp.status === HttpStatusCode.Ok ||
resp.status === HttpStatusCode.Accepted ||
@@ -2518,13 +2540,14 @@ async function checkIfOrderIsAlreadyPaid(
} else if (resp.status === HttpStatusCode.PaymentRequired) {
return false;
}
- //forbidden, not found, not acceptable
+ // forbidden, not found, not acceptable
throw Error(`this order cant be paid: ${resp.status}`);
}
async function processPurchaseDialogShared(
ws: InternalWalletState,
purchase: PurchaseRecord,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const proposalId = purchase.proposalId;
logger.trace(`processing dialog-shared for proposal ${proposalId}`);
@@ -2534,7 +2557,11 @@ async function processPurchaseDialogShared(
return TaskRunResult.finished();
}
- const paid = await checkIfOrderIsAlreadyPaid(ws, download.contractData);
+ const paid = await checkIfOrderIsAlreadyPaid(
+ ws,
+ download.contractData,
+ cancellationToken,
+ );
if (paid) {
const transitionInfo = await ws.db.runReadWriteTx(
["purchases"],
@@ -2565,6 +2592,7 @@ async function processPurchaseDialogShared(
async function processPurchaseAutoRefund(
ws: InternalWalletState,
purchase: PurchaseRecord,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const proposalId = purchase.proposalId;
logger.trace(`processing auto-refund for proposal ${proposalId}`);
@@ -2624,7 +2652,9 @@ async function processPurchaseAutoRefund(
requestUrl.searchParams.set("timeout_ms", "1000");
requestUrl.searchParams.set("await_refund_obtained", "yes");
- const resp = await ws.http.fetch(requestUrl.href);
+ const resp = await ws.http.fetch(requestUrl.href, {
+ cancellationToken,
+ });
// FIXME: Check other status codes!
@@ -2661,6 +2691,7 @@ async function processPurchaseAutoRefund(
async function processPurchaseAbortingRefund(
ws: InternalWalletState,
purchase: PurchaseRecord,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const proposalId = purchase.proposalId;
const download = await expectProposalDownload(ws, purchase);
@@ -2701,6 +2732,7 @@ async function processPurchaseAbortingRefund(
const abortHttpResp = await ws.http.fetch(requestUrl.href, {
method: "POST",
body: abortReq,
+ cancellationToken,
});
if (abortHttpResp.status === HttpStatusCode.NotFound) {
@@ -2753,6 +2785,7 @@ async function processPurchaseAbortingRefund(
async function processPurchaseQueryRefund(
ws: InternalWalletState,
purchase: PurchaseRecord,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const proposalId = purchase.proposalId;
logger.trace(`processing query-refund for proposal ${proposalId}`);
@@ -2768,7 +2801,9 @@ async function processPurchaseQueryRefund(
download.contractData.contractTermsHash,
);
- const resp = await ws.http.fetch(requestUrl.href);
+ const resp = await ws.http.fetch(requestUrl.href, {
+ cancellationToken,
+ });
const orderStatus = await readSuccessResponseJsonOrThrow(
resp,
codecForMerchantOrderStatusPaid(),
@@ -2834,6 +2869,7 @@ async function processPurchaseQueryRefund(
async function processPurchaseAcceptRefund(
ws: InternalWalletState,
purchase: PurchaseRecord,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const download = await expectProposalDownload(ws, purchase);
@@ -2849,6 +2885,7 @@ async function processPurchaseAcceptRefund(
body: {
h_contract: download.contractData.contractTermsHash,
},
+ cancellationToken,
});
const refundResponse = await readSuccessResponseJsonOrThrow(
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
index e764d2169..7774dfd5f 100644
--- a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
@@ -54,7 +54,7 @@ import {
import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http";
import {
PendingTaskType,
- TaskId,
+ TaskIdStr,
TaskRunResult,
TaskRunResultType,
TombstoneTag,
@@ -91,13 +91,13 @@ const logger = new Logger("pay-peer-pull-credit.ts");
export class PeerPullCreditTransactionContext implements TransactionContext {
readonly transactionId: TransactionIdStr;
- readonly retryTag: TaskId;
+ readonly taskId: TaskIdStr;
constructor(
public ws: InternalWalletState,
public pursePub: string,
) {
- this.retryTag = constructTaskIdentifier({
+ this.taskId = constructTaskIdentifier({
tag: PendingTaskType.PeerPullCredit,
pursePub,
});
@@ -138,7 +138,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
async suspendTransaction(): Promise<void> {
- const { ws, pursePub, retryTag, transactionId } = this;
+ const { ws, pursePub, taskId: retryTag, transactionId } = this;
const transitionInfo = await ws.db.runReadWriteTx(
["peerPullCredit"],
async (tx) => {
@@ -198,7 +198,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
async failTransaction(): Promise<void> {
- const { ws, pursePub, retryTag, transactionId } = this;
+ const { ws, pursePub, taskId: retryTag, transactionId } = this;
const transitionInfo = await ws.db.runReadWriteTx(
["peerPullCredit"],
async (tx) => {
@@ -249,7 +249,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
async resumeTransaction(): Promise<void> {
- const { ws, pursePub, retryTag, transactionId } = this;
+ const { ws, pursePub, taskId: retryTag, transactionId } = this;
const transitionInfo = await ws.db.runReadWriteTx(
["peerPullCredit"],
async (tx) => {
@@ -308,7 +308,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
async abortTransaction(): Promise<void> {
- const { ws, pursePub, retryTag, transactionId } = this;
+ const { ws, pursePub, taskId: retryTag, transactionId } = this;
const transitionInfo = await ws.db.runReadWriteTx(
["peerPullCredit"],
async (tx) => {
@@ -529,6 +529,7 @@ async function longpollKycStatus(
async function processPeerPullCreditAbortingDeletePurse(
ws: InternalWalletState,
peerPullIni: PeerPullCreditRecord,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const { pursePub, pursePriv } = peerPullIni;
const transactionId = constructTransactionIdentifier({
@@ -545,6 +546,7 @@ async function processPeerPullCreditAbortingDeletePurse(
headers: {
"taler-purse-signature": sigResp.sig,
},
+ cancellationToken,
});
logger.info(`deleted purse with response status ${resp.status}`);
@@ -637,6 +639,7 @@ async function handlePeerPullCreditWithdrawing(
async function handlePeerPullCreditCreatePurse(
ws: InternalWalletState,
pullIni: PeerPullCreditRecord,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const purseFee = Amounts.stringify(Amounts.zeroOfAmount(pullIni.amount));
const pursePub = pullIni.pursePub;
@@ -717,13 +720,19 @@ async function handlePeerPullCreditCreatePurse(
const httpResp = await ws.http.fetch(reservePurseMergeUrl.href, {
method: "POST",
body: reservePurseReqBody,
+ cancellationToken,
});
if (httpResp.status === HttpStatusCode.UnavailableForLegalReasons) {
const respJson = await httpResp.json();
const kycPending = codecForWalletKycUuid().decode(respJson);
logger.info(`kyc uuid response: ${j2s(kycPending)}`);
- return processPeerPullCreditKycRequired(ws, pullIni, kycPending);
+ return processPeerPullCreditKycRequired(
+ ws,
+ pullIni,
+ kycPending,
+ cancellationToken,
+ );
}
const resp = await readSuccessResponseJsonOrThrow(httpResp, codecForAny());
@@ -792,9 +801,13 @@ export async function processPeerPullCredit(
);
}
case PeerPullPaymentCreditStatus.PendingCreatePurse:
- return handlePeerPullCreditCreatePurse(ws, pullIni);
+ return handlePeerPullCreditCreatePurse(ws, pullIni, cancellationToken);
case PeerPullPaymentCreditStatus.AbortingDeletePurse:
- return await processPeerPullCreditAbortingDeletePurse(ws, pullIni);
+ return await processPeerPullCreditAbortingDeletePurse(
+ ws,
+ pullIni,
+ cancellationToken,
+ );
case PeerPullPaymentCreditStatus.PendingWithdrawing:
return handlePeerPullCreditWithdrawing(ws, pullIni);
case PeerPullPaymentCreditStatus.Aborted:
@@ -817,6 +830,7 @@ async function processPeerPullCreditKycRequired(
ws: InternalWalletState,
peerIni: PeerPullCreditRecord,
kycPending: WalletKycUuid,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const transactionId = constructTransactionIdentifier({
tag: TransactionType.PeerPullCredit,
@@ -833,6 +847,7 @@ async function processPeerPullCreditKycRequired(
logger.info(`kyc url ${url.href}`);
const kycStatusRes = await ws.http.fetch(url.href, {
method: "GET",
+ cancellationToken,
});
if (
@@ -1080,7 +1095,7 @@ export async function initiatePeerPullPayment(
});
notifyTransition(ws, ctx.transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(ctx.retryTag);
+ ws.taskScheduler.startShepherdTask(ctx.taskId);
return {
talerUri: stringifyTalerUri({
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
index 9fa7eb575..30bd1a2c8 100644
--- a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
@@ -68,7 +68,7 @@ import {
import { PeerCoinRepair, selectPeerCoins } from "./coinSelection.js";
import {
PendingTaskType,
- TaskId,
+ TaskIdStr,
TaskRunResult,
TaskRunResultType,
TransactionContext,
@@ -105,7 +105,7 @@ const logger = new Logger("pay-peer-pull-debit.ts");
export class PeerPullDebitTransactionContext implements TransactionContext {
ws: InternalWalletState;
readonly transactionId: TransactionIdStr;
- readonly taskId: TaskId;
+ readonly taskId: TaskIdStr;
peerPullDebitId: string;
constructor(ws: InternalWalletState, peerPullDebitId: string) {
diff --git a/packages/taler-wallet-core/src/pay-peer-push-credit.ts b/packages/taler-wallet-core/src/pay-peer-push-credit.ts
index 42a5b19df..e629bffe4 100644
--- a/packages/taler-wallet-core/src/pay-peer-push-credit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-push-credit.ts
@@ -31,6 +31,7 @@ import {
TalerPreciseTimestamp,
TalerProtocolTimestamp,
TransactionAction,
+ TransactionIdStr,
TransactionMajorState,
TransactionMinorState,
TransactionState,
@@ -55,7 +56,7 @@ import {
import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http";
import {
PendingTaskType,
- TaskId,
+ TaskIdStr,
TaskRunResult,
TaskRunResultType,
TombstoneTag,
@@ -93,8 +94,8 @@ import {
const logger = new Logger("pay-peer-push-credit.ts");
export class PeerPushCreditTransactionContext implements TransactionContext {
- readonly transactionId: string;
- readonly retryTag: TaskId;
+ readonly transactionId: TransactionIdStr;
+ readonly taskId: TaskIdStr;
constructor(
public ws: InternalWalletState,
@@ -104,7 +105,7 @@ export class PeerPushCreditTransactionContext implements TransactionContext {
tag: TransactionType.PeerPushCredit,
peerPushCreditId,
});
- this.retryTag = constructTaskIdentifier({
+ this.taskId = constructTaskIdentifier({
tag: PendingTaskType.PeerPushCredit,
peerPushCreditId,
});
@@ -140,7 +141,7 @@ export class PeerPushCreditTransactionContext implements TransactionContext {
}
async suspendTransaction(): Promise<void> {
- const { ws, peerPushCreditId, retryTag, transactionId } = this;
+ const { ws, peerPushCreditId, taskId: retryTag, transactionId } = this;
const transitionInfo = await ws.db.runReadWriteTx(
["peerPushCredit"],
async (tx) => {
@@ -194,7 +195,7 @@ export class PeerPushCreditTransactionContext implements TransactionContext {
}
async abortTransaction(): Promise<void> {
- const { ws, peerPushCreditId, retryTag, transactionId } = this;
+ const { ws, peerPushCreditId, taskId: retryTag, transactionId } = this;
const transitionInfo = await ws.db.runReadWriteTx(
["peerPushCredit"],
async (tx) => {
@@ -251,7 +252,7 @@ export class PeerPushCreditTransactionContext implements TransactionContext {
}
async resumeTransaction(): Promise<void> {
- const { ws, peerPushCreditId, retryTag, transactionId } = this;
+ const { ws, peerPushCreditId, taskId: retryTag, transactionId } = this;
const transitionInfo = await ws.db.runReadWriteTx(
["peerPushCredit"],
async (tx) => {
@@ -304,7 +305,7 @@ export class PeerPushCreditTransactionContext implements TransactionContext {
}
async failTransaction(): Promise<void> {
- const { ws, peerPushCreditId, retryTag, transactionId } = this;
+ const { ws, peerPushCreditId, taskId: retryTag, transactionId } = this;
const transitionInfo = await ws.db.runReadWriteTx(
["peerPushCredit"],
async (tx) => {
@@ -575,6 +576,7 @@ async function processPeerPushCreditKycRequired(
ws: InternalWalletState,
peerInc: PeerPushPaymentIncomingRecord,
kycPending: WalletKycUuid,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const transactionId = constructTransactionIdentifier({
tag: TransactionType.PeerPushCredit,
@@ -591,6 +593,7 @@ async function processPeerPushCreditKycRequired(
logger.info(`kyc url ${url.href}`);
const kycStatusRes = await ws.http.fetch(url.href, {
method: "GET",
+ cancellationToken,
});
if (
@@ -651,6 +654,7 @@ async function handlePendingMerge(
ws: InternalWalletState,
peerInc: PeerPushPaymentIncomingRecord,
contractTerms: PeerContractTerms,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const { peerPushCreditId } = peerInc;
const transactionId = constructTransactionIdentifier({
@@ -705,7 +709,12 @@ async function handlePendingMerge(
const respJson = await mergeHttpResp.json();
const kycPending = codecForWalletKycUuid().decode(respJson);
logger.info(`kyc uuid response: ${j2s(kycPending)}`);
- return processPeerPushCreditKycRequired(ws, peerInc, kycPending);
+ return processPeerPushCreditKycRequired(
+ ws,
+ peerInc,
+ kycPending,
+ cancellationToken,
+ );
}
logger.trace(`merge request: ${j2s(mergeReq)}`);
@@ -887,7 +896,7 @@ export async function processPeerPushCredit(
}
case PeerPushCreditStatus.PendingMerge:
- return handlePendingMerge(ws, peerInc, contractTerms);
+ return handlePendingMerge(ws, peerInc, contractTerms, cancellationToken);
case PeerPushCreditStatus.PendingWithdrawing:
return handlePendingWithdrawing(ws, peerInc);
@@ -940,7 +949,7 @@ export async function confirmPeerPushCredit(
const ctx = new PeerPushCreditTransactionContext(ws, peerPushCreditId);
- ws.taskScheduler.startShepherdTask(ctx.retryTag);
+ ws.taskScheduler.startShepherdTask(ctx.taskId);
const transactionId = constructTransactionIdentifier({
tag: TransactionType.PeerPushCredit,
diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/pay-peer-push-debit.ts
index 1bb3b8772..40a5d97a4 100644
--- a/packages/taler-wallet-core/src/pay-peer-push-debit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-push-debit.ts
@@ -52,7 +52,7 @@ import {
import { PeerCoinRepair, selectPeerCoins } from "./coinSelection.js";
import {
PendingTaskType,
- TaskId,
+ TaskIdStr,
TaskRunResult,
TaskRunResultType,
TransactionContext,
@@ -84,7 +84,7 @@ const logger = new Logger("pay-peer-push-debit.ts");
export class PeerPushDebitTransactionContext implements TransactionContext {
readonly transactionId: TransactionIdStr;
- readonly retryTag: TaskId;
+ readonly taskId: TaskIdStr;
constructor(
public ws: InternalWalletState,
@@ -94,7 +94,7 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
tag: TransactionType.PeerPushDebit,
pursePub,
});
- this.retryTag = constructTaskIdentifier({
+ this.taskId = constructTaskIdentifier({
tag: PendingTaskType.PeerPushDebit,
pursePub,
});
@@ -112,7 +112,7 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
}
async suspendTransaction(): Promise<void> {
- const { ws, pursePub, transactionId, retryTag } = this;
+ const { ws, pursePub, transactionId, taskId: retryTag } = this;
const transitionInfo = await ws.db.runReadWriteTx(
["peerPushDebit"],
async (tx) => {
@@ -170,7 +170,7 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
}
async abortTransaction(): Promise<void> {
- const { ws, pursePub, transactionId, retryTag } = this;
+ const { ws, pursePub, transactionId, taskId: retryTag } = this;
const transitionInfo = await ws.db.runReadWriteTx(
["peerPushDebit"],
async (tx) => {
@@ -224,7 +224,7 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
}
async resumeTransaction(): Promise<void> {
- const { ws, pursePub, transactionId, retryTag } = this;
+ const { ws, pursePub, transactionId, taskId: retryTag } = this;
const transitionInfo = await ws.db.runReadWriteTx(
["peerPushDebit"],
async (tx) => {
@@ -282,7 +282,7 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
}
async failTransaction(): Promise<void> {
- const { ws, pursePub, transactionId, retryTag } = this;
+ const { ws, pursePub, transactionId, taskId: retryTag } = this;
const transitionInfo = await ws.db.runReadWriteTx(
["peerPushDebit"],
async (tx) => {
@@ -440,6 +440,7 @@ async function handlePurseCreationConflict(
async function processPeerPushDebitCreateReserve(
ws: InternalWalletState,
peerPushInitiation: PeerPushDebitRecord,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const pursePub = peerPushInitiation.pursePub;
const purseExpiration = peerPushInitiation.purseExpiration;
@@ -519,6 +520,7 @@ async function processPeerPushDebitCreateReserve(
const httpResp = await ws.http.fetch(createPurseUrl.href, {
method: "POST",
body: reqBody,
+ cancellationToken,
});
{
@@ -563,6 +565,7 @@ async function processPeerPushDebitCreateReserve(
async function processPeerPushDebitAbortingDeletePurse(
ws: InternalWalletState,
peerPushInitiation: PeerPushDebitRecord,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const { pursePub, pursePriv } = peerPushInitiation;
const transactionId = constructTransactionIdentifier({
@@ -582,6 +585,7 @@ async function processPeerPushDebitAbortingDeletePurse(
headers: {
"taler-purse-signature": sigResp.sig,
},
+ cancellationToken,
});
logger.info(`deleted purse with response status ${resp.status}`);
@@ -886,7 +890,11 @@ export async function processPeerPushDebit(
switch (peerPushInitiation.status) {
case PeerPushDebitStatus.PendingCreatePurse:
- return processPeerPushDebitCreateReserve(ws, peerPushInitiation);
+ return processPeerPushDebitCreateReserve(
+ ws,
+ peerPushInitiation,
+ cancellationToken,
+ );
case PeerPushDebitStatus.PendingReady:
return processPeerPushDebitReady(
ws,
@@ -894,7 +902,11 @@ export async function processPeerPushDebit(
cancellationToken,
);
case PeerPushDebitStatus.AbortingDeletePurse:
- return processPeerPushDebitAbortingDeletePurse(ws, peerPushInitiation);
+ return processPeerPushDebitAbortingDeletePurse(
+ ws,
+ peerPushInitiation,
+ cancellationToken,
+ );
case PeerPushDebitStatus.AbortingRefreshDeleted:
return processPeerPushDebitAbortingRefreshDeleted(ws, peerPushInitiation);
case PeerPushDebitStatus.AbortingRefreshExpired:
@@ -1028,7 +1040,7 @@ export async function initiatePeerPushDebit(
hintTransactionId: transactionId,
});
- ws.taskScheduler.startShepherdTask(ctx.retryTag);
+ ws.taskScheduler.startShepherdTask(ctx.taskId);
return {
contractPriv: contractKeyPair.priv,
diff --git a/packages/taler-wallet-core/src/recoup.ts b/packages/taler-wallet-core/src/recoup.ts
index 6f1546d57..0ec71f4e7 100644
--- a/packages/taler-wallet-core/src/recoup.ts
+++ b/packages/taler-wallet-core/src/recoup.ts
@@ -31,6 +31,7 @@ import {
Logger,
RefreshReason,
TalerPreciseTimestamp,
+ TransactionIdStr,
TransactionType,
URL,
checkDbInvariant,
@@ -43,6 +44,7 @@ import {
import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http";
import {
PendingTaskType,
+ TaskIdStr,
TaskRunResult,
TransactionContext,
constructTaskIdentifier,
@@ -432,8 +434,8 @@ export class RewardTransactionContext implements TransactionContext {
deleteTransaction(): Promise<void> {
throw new Error("Method not implemented.");
}
- public transactionId: string;
- public retryTag: string;
+ public transactionId: TransactionIdStr;
+ public taskId: TaskIdStr;
constructor(
public ws: InternalWalletState,
@@ -443,7 +445,7 @@ export class RewardTransactionContext implements TransactionContext {
tag: TransactionType.Recoup,
recoupGroupId,
});
- this.retryTag = constructTaskIdentifier({
+ this.taskId = constructTaskIdentifier({
tag: PendingTaskType.Recoup,
recoupGroupId,
});
diff --git a/packages/taler-wallet-core/src/refresh.ts b/packages/taler-wallet-core/src/refresh.ts
index 3b75ae2f3..f1ee84f3e 100644
--- a/packages/taler-wallet-core/src/refresh.ts
+++ b/packages/taler-wallet-core/src/refresh.ts
@@ -52,6 +52,7 @@ import {
TalerErrorDetail,
TalerPreciseTimestamp,
TransactionAction,
+ TransactionIdStr,
TransactionMajorState,
TransactionState,
TransactionType,
@@ -68,7 +69,7 @@ import {
makeCoinAvailable,
makeCoinsVisible,
PendingTaskType,
- TaskId,
+ TaskIdStr,
TaskRunResult,
TaskRunResultType,
TombstoneTag,
@@ -104,8 +105,8 @@ import { getCandidateWithdrawalDenomsTx } from "./withdraw.js";
const logger = new Logger("refresh.ts");
export class RefreshTransactionContext implements TransactionContext {
- public transactionId: string;
- readonly taskId: TaskId;
+ public transactionId: TransactionIdStr;
+ readonly taskId: TaskIdStr;
constructor(
public ws: InternalWalletState,
@@ -493,6 +494,7 @@ async function refreshMelt(
ws: InternalWalletState,
refreshGroupId: string,
coinIndex: number,
+ cancellationToken: CancellationToken,
): Promise<void> {
const d = await ws.db.runReadWriteTx(
["refreshGroups", "refreshSessions", "coins", "denominations"],
@@ -606,6 +608,7 @@ async function refreshMelt(
method: "POST",
body: meltReqBody,
timeout: getRefreshRequestTimeout(refreshGroup),
+ cancellationToken,
});
});
@@ -687,6 +690,7 @@ async function refreshMelt(
headers: {
"Taler-Coin-History-Signature": historySig.sig,
},
+ cancellationToken,
});
const historyJson = await historyResp.json();
@@ -789,6 +793,7 @@ async function refreshReveal(
ws: InternalWalletState,
refreshGroupId: string,
coinIndex: number,
+ cancellationToken: CancellationToken,
): Promise<void> {
logger.trace(
`doing refresh reveal for ${refreshGroupId} (old coin ${coinIndex})`,
@@ -913,6 +918,7 @@ async function refreshReveal(
body: req,
method: "POST",
timeout: getRefreshRequestTimeout(refreshGroup),
+ cancellationToken,
});
});
@@ -1026,26 +1032,28 @@ export async function processRefreshGroup(
let errors: TalerErrorDetail[] = [];
let inShutdown = false;
const ps = refreshGroup.oldCoinPubs.map((x, i) =>
- processRefreshSession(ws, refreshGroupId, i).catch((x) => {
- if (x instanceof CryptoApiStoppedError) {
- inShutdown = true;
- logger.info(
- "crypto API stopped while processing refresh group, probably the wallet is currently shutting down.",
- );
- return;
- }
- if (x instanceof TalerError) {
- logger.warn("process refresh session got exception (TalerError)");
- logger.warn(`exc ${x}`);
- logger.warn(`exc stack ${x.stack}`);
- logger.warn(`error detail: ${j2s(x.errorDetail)}`);
- } else {
- logger.warn("process refresh session got exception");
- logger.warn(`exc ${x}`);
- logger.warn(`exc stack ${x.stack}`);
- }
- errors.push(getErrorDetailFromException(x));
- }),
+ processRefreshSession(ws, refreshGroupId, i, cancellationToken).catch(
+ (x) => {
+ if (x instanceof CryptoApiStoppedError) {
+ inShutdown = true;
+ logger.info(
+ "crypto API stopped while processing refresh group, probably the wallet is currently shutting down.",
+ );
+ return;
+ }
+ if (x instanceof TalerError) {
+ logger.warn("process refresh session got exception (TalerError)");
+ logger.warn(`exc ${x}`);
+ logger.warn(`exc stack ${x.stack}`);
+ logger.warn(`error detail: ${j2s(x.errorDetail)}`);
+ } else {
+ logger.warn("process refresh session got exception");
+ logger.warn(`exc ${x}`);
+ logger.warn(`exc stack ${x.stack}`);
+ }
+ errors.push(getErrorDetailFromException(x));
+ },
+ ),
);
try {
logger.info("waiting for refreshes");
@@ -1078,6 +1086,7 @@ async function processRefreshSession(
ws: InternalWalletState,
refreshGroupId: string,
coinIndex: number,
+ cancellationToken: CancellationToken,
): Promise<void> {
logger.trace(
`processing refresh session for coin ${coinIndex} of group ${refreshGroupId}`,
@@ -1109,9 +1118,9 @@ async function processRefreshSession(
return;
}
if (refreshSession.norevealIndex === undefined) {
- await refreshMelt(ws, refreshGroupId, coinIndex);
+ await refreshMelt(ws, refreshGroupId, coinIndex, cancellationToken);
}
- await refreshReveal(ws, refreshGroupId, coinIndex);
+ await refreshReveal(ws, refreshGroupId, coinIndex, cancellationToken);
}
export interface RefreshOutputInfo {
diff --git a/packages/taler-wallet-core/src/reward.ts b/packages/taler-wallet-core/src/reward.ts
index 6bfd3b324..51eb0f5bd 100644
--- a/packages/taler-wallet-core/src/reward.ts
+++ b/packages/taler-wallet-core/src/reward.ts
@@ -31,6 +31,7 @@ import {
} from "@gnu-taler/taler-util";
import {
PendingTaskType,
+ TaskIdStr,
TaskRunResult,
TombstoneTag,
TransactionContext,
@@ -46,8 +47,8 @@ import { InternalWalletState } from "./wallet.js";
const logger = new Logger("operations/tip.ts");
export class RewardTransactionContext implements TransactionContext {
- public transactionId: string;
- public taskId: string;
+ public transactionId: TransactionIdStr;
+ public taskId: TaskIdStr;
constructor(
public ws: InternalWalletState,
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts
index e8fddfc73..2fd260b11 100644
--- a/packages/taler-wallet-core/src/shepherd.ts
+++ b/packages/taler-wallet-core/src/shepherd.ts
@@ -41,7 +41,7 @@ import { processBackupForProvider } from "./backup/index.js";
import {
DbRetryInfo,
PendingTaskType,
- TaskId,
+ TaskIdStr,
TaskRunResult,
TaskRunResultType,
constructTaskIdentifier,
@@ -104,7 +104,7 @@ function taskGivesLiveness(taskId: string): boolean {
}
export class TaskScheduler {
- private sheps: Map<TaskId, ShepherdInfo> = new Map();
+ private sheps: Map<TaskIdStr, ShepherdInfo> = new Map();
private iterCond = new AsyncCondition();
@@ -155,7 +155,7 @@ export class TaskScheduler {
logger.info("Done with task loop.");
}
- startShepherdTask(taskId: TaskId): void {
+ startShepherdTask(taskId: TaskIdStr): void {
// Run in the background, no await!
this.internalStartShepherdTask(taskId);
}
@@ -176,7 +176,7 @@ export class TaskScheduler {
}
}
- private async internalStartShepherdTask(taskId: TaskId): Promise<void> {
+ private async internalStartShepherdTask(taskId: TaskIdStr): Promise<void> {
logger.trace(`Starting to shepherd task ${taskId}`);
const oldShep = this.sheps.get(taskId);
if (oldShep) {
@@ -197,7 +197,7 @@ export class TaskScheduler {
}
}
- stopShepherdTask(taskId: TaskId): void {
+ stopShepherdTask(taskId: TaskIdStr): void {
logger.trace(`Stopping shepherding of ${taskId}`);
const oldShep = this.sheps.get(taskId);
if (oldShep) {
@@ -208,12 +208,12 @@ export class TaskScheduler {
}
}
- restartShepherdTask(taskId: TaskId): void {
+ restartShepherdTask(taskId: TaskIdStr): void {
this.stopShepherdTask(taskId);
this.startShepherdTask(taskId);
}
- async resetTaskRetries(taskId: TaskId): Promise<void> {
+ async resetTaskRetries(taskId: TaskIdStr): Promise<void> {
const maybeNotification = await this.ws.db.runAllStoresReadWriteTx(
async (tx) => {
await tx.operationRetries.delete(taskId);
@@ -228,7 +228,7 @@ export class TaskScheduler {
}
private async wait(
- taskId: TaskId,
+ taskId: TaskIdStr,
info: ShepherdInfo,
delay: Duration,
): Promise<void> {
@@ -240,7 +240,7 @@ export class TaskScheduler {
}
private async internalShepherdTask(
- taskId: TaskId,
+ taskId: TaskIdStr,
info: ShepherdInfo,
): Promise<void> {
while (true) {
@@ -423,7 +423,7 @@ async function storePendingTaskFinished(
async function runTaskWithErrorReporting(
ws: InternalWalletState,
- opId: TaskId,
+ opId: TaskIdStr,
f: () => Promise<TaskRunResult>,
): Promise<TaskRunResult> {
let maybeError: TalerErrorDetail | undefined;
@@ -510,7 +510,7 @@ async function runTaskWithErrorReporting(
async function callOperationHandlerForTaskId(
ws: InternalWalletState,
- taskId: TaskId,
+ taskId: TaskIdStr,
cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const pending = parseTaskIdentifier(taskId);
@@ -741,7 +741,7 @@ function convertTaskToTransactionId(
}
export interface ActiveTaskIdsResult {
- taskIds: TaskId[];
+ taskIds: TaskIdStr[];
}
export async function getActiveTaskIds(
diff --git a/packages/taler-wallet-core/src/transactions.ts b/packages/taler-wallet-core/src/transactions.ts
index 7d54ca980..d7f0c0d18 100644
--- a/packages/taler-wallet-core/src/transactions.ts
+++ b/packages/taler-wallet-core/src/transactions.ts
@@ -54,8 +54,8 @@ import {
import {
constructTaskIdentifier,
PendingTaskType,
- TaskId,
TaskIdentifiers,
+ TaskIdStr,
TransactionContext,
} from "./common.js";
import {
@@ -1622,7 +1622,9 @@ export function parseTransactionIdentifier(
}
}
-function maybeTaskFromTransaction(transactionId: string): TaskId | undefined {
+function maybeTaskFromTransaction(
+ transactionId: string,
+): TaskIdStr | undefined {
const parsedTx = parseTransactionIdentifier(transactionId);
if (!parsedTx) {
@@ -1786,6 +1788,9 @@ export async function deleteTransaction(
): Promise<void> {
const ctx = await getContextForTransaction(ws, transactionId);
await ctx.deleteTransaction();
+ if (ctx.taskId) {
+ ws.taskScheduler.stopShepherdTask(ctx.taskId);
+ }
}
export async function abortTransaction(
diff --git a/packages/taler-wallet-core/src/withdraw.ts b/packages/taler-wallet-core/src/withdraw.ts
index 2d9f5c35c..a54295613 100644
--- a/packages/taler-wallet-core/src/withdraw.ts
+++ b/packages/taler-wallet-core/src/withdraw.ts
@@ -100,7 +100,7 @@ import {
} from "./coinSelection.js";
import {
PendingTaskType,
- TaskId,
+ TaskIdStr,
TaskRunResult,
TaskRunResultType,
TombstoneTag,
@@ -155,7 +155,7 @@ const logger = new Logger("operations/withdraw.ts");
export class WithdrawTransactionContext implements TransactionContext {
readonly transactionId: TransactionIdStr;
- readonly taskId: TaskId;
+ readonly taskId: TaskIdStr;
constructor(
public ws: InternalWalletState,
@@ -799,6 +799,7 @@ async function handleKycRequired(
resp: HttpResponse,
startIdx: number,
requestCoinIdxs: number[],
+ cancellationToken: CancellationToken,
): Promise<void> {
logger.info("withdrawal requires KYC");
const respJson = await resp.json();
@@ -822,6 +823,7 @@ async function handleKycRequired(
logger.info(`kyc url ${url.href}`);
const kycStatusRes = await ws.http.fetch(url.href, {
method: "GET",
+ cancellationToken,
});
let kycUrl: string;
let amlStatus: ExchangeAmlStatus | undefined;
@@ -1003,7 +1005,14 @@ async function processPlanchetExchangeBatchRequest(
timeout: Duration.fromSpec({ seconds: 40 }),
});
if (resp.status === HttpStatusCode.UnavailableForLegalReasons) {
- await handleKycRequired(ws, withdrawalGroup, resp, 0, requestCoinIdxs);
+ await handleKycRequired(
+ ws,
+ withdrawalGroup,
+ resp,
+ 0,
+ requestCoinIdxs,
+ cancellationToken,
+ );
return {
batchResp: { ev_sigs: [] },
coinIdxs: [],
@@ -1359,6 +1368,7 @@ interface WithdrawalGroupContext {
async function processWithdrawalGroupAbortingBank(
ws: InternalWalletState,
withdrawalGroup: WithdrawalGroupRecord,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const { withdrawalGroupId } = withdrawalGroup;
const transactionId = constructTransactionIdentifier({
@@ -1375,6 +1385,7 @@ async function processWithdrawalGroupAbortingBank(
const abortResp = await ws.http.fetch(abortUrl, {
method: "POST",
body: {},
+ cancellationToken,
});
logger.info(`abort response status: ${abortResp.status}`);
@@ -1708,7 +1719,11 @@ export async function processWithdrawalGroup(
cancellationToken,
);
case WithdrawalGroupStatus.AbortingBank:
- return await processWithdrawalGroupAbortingBank(ws, withdrawalGroup);
+ return await processWithdrawalGroupAbortingBank(
+ ws,
+ withdrawalGroup,
+ cancellationToken,
+ );
case WithdrawalGroupStatus.AbortedBank:
case WithdrawalGroupStatus.AbortedExchange:
case WithdrawalGroupStatus.FailedAbortingBank:
@@ -1749,10 +1764,14 @@ export async function getExchangeWithdrawalInfo(
);
}
- const withdrawalAccountsList = await fetchWithdrawalAccountInfo(ws, {
- exchange,
- instructedAmount,
- });
+ const withdrawalAccountsList = await fetchWithdrawalAccountInfo(
+ ws,
+ {
+ exchange,
+ instructedAmount,
+ },
+ CancellationToken.CONTINUE,
+ );
logger.trace("updating withdrawal denoms");
await updateWithdrawalDenoms(ws, exchangeBaseUrl);
@@ -2069,6 +2088,7 @@ export function getBankAbortUrl(talerWithdrawUri: string): string {
async function registerReserveWithBank(
ws: InternalWalletState,
withdrawalGroupId: string,
+ cancellationToken: CancellationToken,
): Promise<void> {
const withdrawalGroup = await ws.db.runReadOnlyTx(
["withdrawalGroups"],
@@ -2106,6 +2126,7 @@ async function registerReserveWithBank(
method: "POST",
body: reqBody,
timeout: getReserveRequestTimeout(withdrawalGroup),
+ cancellationToken,
});
const status = await readSuccessResponseJsonOrThrow(
httpResp,
@@ -2231,7 +2252,7 @@ async function processBankRegisterReserve(
// FIXME: Put confirm transfer URL in the DB!
- await registerReserveWithBank(ws, withdrawalGroupId);
+ await registerReserveWithBank(ws, withdrawalGroupId, cancellationToken);
return TaskRunResult.progress();
}
@@ -2631,10 +2652,14 @@ export async function acceptWithdrawalFromUri(
const exchange = await fetchFreshExchange(ws, selectedExchange);
- const withdrawalAccountList = await fetchWithdrawalAccountInfo(ws, {
- exchange,
- instructedAmount: withdrawInfo.amount,
- });
+ const withdrawalAccountList = await fetchWithdrawalAccountInfo(
+ ws,
+ {
+ exchange,
+ instructedAmount: withdrawInfo.amount,
+ },
+ CancellationToken.CONTINUE,
+ );
const withdrawalGroup = await internalCreateWithdrawalGroup(ws, {
amount: withdrawInfo.amount,
@@ -2762,7 +2787,8 @@ async function fetchAccount(
ws: InternalWalletState,
instructedAmount: AmountJson,
acct: ExchangeWireAccount,
- reservePub?: string,
+ reservePub: string | undefined,
+ cancellationToken: CancellationToken,
): Promise<WithdrawalExchangeAccountDetails> {
let paytoUri: string;
let transferAmount: AmountString | undefined = undefined;
@@ -2773,7 +2799,9 @@ async function fetchAccount(
"amount_credit",
Amounts.stringify(instructedAmount),
);
- const httpResp = await ws.http.fetch(reqUrl.href);
+ const httpResp = await ws.http.fetch(reqUrl.href, {
+ cancellationToken,
+ });
const respOrErr = await readSuccessResponseJsonOrErrorCode(
httpResp,
codecForCashinConversionResponse(),
@@ -2789,7 +2817,9 @@ async function fetchAccount(
paytoUri = acct.payto_uri;
transferAmount = resp.amount_debit;
const configUrl = new URL("config", acct.conversion_url);
- const configResp = await ws.http.fetch(configUrl.href);
+ const configResp = await ws.http.fetch(configUrl.href, {
+ cancellationToken,
+ });
const configRespOrError = await readSuccessResponseJsonOrErrorCode(
configResp,
codecForConversionBankConfig(),
@@ -2840,6 +2870,7 @@ async function fetchWithdrawalAccountInfo(
instructedAmount: AmountJson;
reservePub?: string;
},
+ cancellationToken: CancellationToken,
): Promise<WithdrawalExchangeAccountDetails[]> {
const { exchange } = req;
const withdrawalAccounts: WithdrawalExchangeAccountDetails[] = [];
@@ -2849,6 +2880,7 @@ async function fetchWithdrawalAccountInfo(
req.instructedAmount,
acct,
req.reservePub,
+ cancellationToken,
);
withdrawalAccounts.push(acctInfo);
}
@@ -2885,11 +2917,15 @@ export async function createManualWithdrawal(
{},
);
- const withdrawalAccountsList = await fetchWithdrawalAccountInfo(ws, {
- exchange,
- instructedAmount: amount,
- reservePub: reserveKeyPair.pub,
- });
+ const withdrawalAccountsList = await fetchWithdrawalAccountInfo(
+ ws,
+ {
+ exchange,
+ instructedAmount: amount,
+ reservePub: reserveKeyPair.pub,
+ },
+ CancellationToken.CONTINUE,
+ );
const withdrawalGroup = await internalCreateWithdrawalGroup(ws, {
amount: Amounts.jsonifyAmount(req.amount),