summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/operations/transactions.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/operations/transactions.ts')
-rw-r--r--packages/taler-wallet-core/src/operations/transactions.ts190
1 files changed, 159 insertions, 31 deletions
diff --git a/packages/taler-wallet-core/src/operations/transactions.ts b/packages/taler-wallet-core/src/operations/transactions.ts
index c03d2aa3d..1c2ce34bb 100644
--- a/packages/taler-wallet-core/src/operations/transactions.ts
+++ b/packages/taler-wallet-core/src/operations/transactions.ts
@@ -63,12 +63,15 @@ import {
PeerPullPaymentInitiationRecord,
} from "../db.js";
import { InternalWalletState } from "../internal-wallet-state.js";
+import { PendingTaskType } from "../pending-types.js";
import { checkDbInvariant } from "../util/invariants.js";
-import { RetryTags } from "../util/retries.js";
+import { constructTaskIdentifier, TaskIdentifiers } from "../util/retries.js";
import {
makeTombstoneId,
makeTransactionId,
parseId,
+ resetOperationTimeout,
+ runOperationWithErrorReporting,
TombstoneTag,
} from "./common.js";
import { processDepositGroup } from "./deposits.js";
@@ -79,6 +82,7 @@ import {
extractContractData,
processPurchasePay,
} from "./pay-merchant.js";
+import { processPeerPullCredit } from "./pay-peer.js";
import { processRefreshGroup } from "./refresh.js";
import { processTip } from "./tip.js";
import {
@@ -152,7 +156,7 @@ export async function getTransactionById(
if (!withdrawalGroupRecord) throw Error("not found");
- const opId = RetryTags.forWithdrawal(withdrawalGroupRecord);
+ const opId = TaskIdentifiers.forWithdrawal(withdrawalGroupRecord);
const ort = await tx.operationRetries.get(opId);
if (
@@ -215,7 +219,7 @@ export async function getTransactionById(
Amounts.zeroOfAmount(contractData.amount),
);
- const payOpId = RetryTags.forPay(purchase);
+ const payOpId = TaskIdentifiers.forPay(purchase);
const payRetryRecord = await tx.operationRetries.get(payOpId);
return buildTransactionForPurchase(
@@ -237,7 +241,7 @@ export async function getTransactionById(
if (!tipRecord) throw Error("not found");
const retries = await tx.operationRetries.get(
- RetryTags.forTipPickup(tipRecord),
+ TaskIdentifiers.forTipPickup(tipRecord),
);
return buildTransactionForTip(tipRecord, retries);
});
@@ -250,7 +254,7 @@ export async function getTransactionById(
if (!depositRecord) throw Error("not found");
const retries = await tx.operationRetries.get(
- RetryTags.forDeposit(depositRecord),
+ TaskIdentifiers.forDeposit(depositRecord),
);
return buildTransactionForDeposit(depositRecord, retries);
});
@@ -359,11 +363,11 @@ export async function getTransactionById(
if (pushInc.withdrawalGroupId) {
wg = await tx.withdrawalGroups.get(pushInc.withdrawalGroupId);
if (wg) {
- const withdrawalOpId = RetryTags.forWithdrawal(wg);
+ const withdrawalOpId = TaskIdentifiers.forWithdrawal(wg);
wgOrt = await tx.operationRetries.get(withdrawalOpId);
}
}
- const pushIncOpId = RetryTags.forPeerPushCredit(pushInc);
+ const pushIncOpId = TaskIdentifiers.forPeerPushCredit(pushInc);
let pushIncOrt = await tx.operationRetries.get(pushIncOpId);
return buildTransactionForPeerPushCredit(
@@ -394,11 +398,12 @@ export async function getTransactionById(
if (pushInc.withdrawalGroupId) {
wg = await tx.withdrawalGroups.get(pushInc.withdrawalGroupId);
if (wg) {
- const withdrawalOpId = RetryTags.forWithdrawal(wg);
+ const withdrawalOpId = TaskIdentifiers.forWithdrawal(wg);
wgOrt = await tx.operationRetries.get(withdrawalOpId);
}
}
- const pushIncOpId = RetryTags.forPeerPullPaymentInitiation(pushInc);
+ const pushIncOpId =
+ TaskIdentifiers.forPeerPullPaymentInitiation(pushInc);
let pushIncOrt = await tx.operationRetries.get(pushIncOpId);
return buildTransactionForPeerPullCredit(
@@ -1109,11 +1114,11 @@ export async function getTransactions(
if (pi.withdrawalGroupId) {
wg = await tx.withdrawalGroups.get(pi.withdrawalGroupId);
if (wg) {
- const withdrawalOpId = RetryTags.forWithdrawal(wg);
+ const withdrawalOpId = TaskIdentifiers.forWithdrawal(wg);
wgOrt = await tx.operationRetries.get(withdrawalOpId);
}
}
- const pushIncOpId = RetryTags.forPeerPushCredit(pi);
+ const pushIncOpId = TaskIdentifiers.forPeerPushCredit(pi);
let pushIncOrt = await tx.operationRetries.get(pushIncOpId);
checkDbInvariant(!!ct);
@@ -1142,11 +1147,11 @@ export async function getTransactions(
if (pi.withdrawalGroupId) {
wg = await tx.withdrawalGroups.get(pi.withdrawalGroupId);
if (wg) {
- const withdrawalOpId = RetryTags.forWithdrawal(wg);
+ const withdrawalOpId = TaskIdentifiers.forWithdrawal(wg);
wgOrt = await tx.operationRetries.get(withdrawalOpId);
}
}
- const pushIncOpId = RetryTags.forPeerPullPaymentInitiation(pi);
+ const pushIncOpId = TaskIdentifiers.forPeerPullPaymentInitiation(pi);
let pushIncOrt = await tx.operationRetries.get(pushIncOpId);
checkDbInvariant(!!ct);
@@ -1166,7 +1171,7 @@ export async function getTransactions(
return;
}
let required = false;
- const opId = RetryTags.forRefresh(rg);
+ const opId = TaskIdentifiers.forRefresh(rg);
if (transactionsRequest?.includeRefreshes) {
required = true;
} else if (rg.operationStatus !== RefreshOperationStatus.Finished) {
@@ -1195,7 +1200,7 @@ export async function getTransactions(
return;
}
- const opId = RetryTags.forWithdrawal(wsr);
+ const opId = TaskIdentifiers.forWithdrawal(wsr);
const ort = await tx.operationRetries.get(opId);
switch (wsr.wgInfo.withdrawalType) {
@@ -1238,7 +1243,7 @@ export async function getTransactions(
if (shouldSkipCurrency(transactionsRequest, amount.currency)) {
return;
}
- const opId = RetryTags.forDeposit(dg);
+ const opId = TaskIdentifiers.forDeposit(dg);
const retryRecord = await tx.operationRetries.get(opId);
transactions.push(buildTransactionForDeposit(dg, retryRecord));
@@ -1309,7 +1314,7 @@ export async function getTransactions(
);
});
- const payOpId = RetryTags.forPay(purchase);
+ const payOpId = TaskIdentifiers.forPay(purchase);
const payRetryRecord = await tx.operationRetries.get(payOpId);
transactions.push(
await buildTransactionForPurchase(
@@ -1333,7 +1338,7 @@ export async function getTransactions(
if (!tipRecord.acceptedTimestamp) {
return;
}
- const opId = RetryTags.forTipPickup(tipRecord);
+ const opId = TaskIdentifiers.forTipPickup(tipRecord);
const retryRecord = await tx.operationRetries.get(opId);
transactions.push(buildTransactionForTip(tipRecord, retryRecord));
});
@@ -1359,6 +1364,77 @@ export async function getTransactions(
return { transactions: [...txNotPending, ...txPending] };
}
+export type ParsedTransactionIdentifier =
+ | { tag: TransactionType.Deposit; depositGroupId: string }
+ | { tag: TransactionType.Payment; proposalId: string }
+ | { tag: TransactionType.PeerPullDebit; peerPullPaymentIncomingId: string }
+ | { tag: TransactionType.PeerPullCredit; pursePub: string }
+ | { tag: TransactionType.PeerPushCredit; peerPushPaymentIncomingId: string }
+ | { tag: TransactionType.PeerPushDebit; pursePub: string }
+ | { tag: TransactionType.Refresh; refreshGroupId: string }
+ | { tag: TransactionType.Refund; proposalId: string; executionTime: string }
+ | { tag: TransactionType.Tip; walletTipId: string }
+ | { tag: TransactionType.Withdrawal; withdrawalGroupId: string };
+
+/**
+ * Parse a transaction identifier string into a typed, structured representation.
+ */
+export function parseTransactionIdentifier(
+ transactionId: string,
+): ParsedTransactionIdentifier | undefined {
+ const { type, args: rest } = parseId("any", transactionId);
+
+ switch (type) {
+ case TransactionType.Deposit:
+ return { tag: TransactionType.Deposit, depositGroupId: rest[0] };
+ case TransactionType.Payment:
+ return { tag: TransactionType.Payment, proposalId: rest[0] };
+ case TransactionType.PeerPullCredit:
+ return { tag: TransactionType.PeerPullCredit, pursePub: rest[0] };
+ case TransactionType.PeerPullDebit:
+ return {
+ tag: TransactionType.PeerPullDebit,
+ peerPullPaymentIncomingId: rest[0],
+ };
+ case TransactionType.PeerPushCredit:
+ return {
+ tag: TransactionType.PeerPushCredit,
+ peerPushPaymentIncomingId: rest[0],
+ };
+ case TransactionType.PeerPushDebit:
+ return { tag: TransactionType.PeerPushDebit, pursePub: rest[0] };
+ case TransactionType.Refresh:
+ return { tag: TransactionType.Refresh, refreshGroupId: rest[0] };
+ case TransactionType.Refund:
+ return {
+ tag: TransactionType.Refund,
+ proposalId: rest[0],
+ executionTime: rest[1],
+ };
+ case TransactionType.Tip:
+ return {
+ tag: TransactionType.Tip,
+ walletTipId: rest[0],
+ };
+ case TransactionType.Withdrawal:
+ return {
+ tag: TransactionType.Withdrawal,
+ withdrawalGroupId: rest[0],
+ };
+ default:
+ return undefined;
+ }
+}
+
+export function stopLongpolling(ws: InternalWalletState, taskId: string) {
+ const longpoll = ws.activeLongpoll[taskId];
+ if (longpoll) {
+ logger.info(`cancelling long-polling for ${taskId}`);
+ longpoll.cancel();
+ delete ws.activeLongpoll[taskId];
+ }
+}
+
/**
* Immediately retry the underlying operation
* of a transaction.
@@ -1369,34 +1445,86 @@ export async function retryTransaction(
): Promise<void> {
logger.info(`retrying transaction ${transactionId}`);
- const { type, args: rest } = parseId("any", transactionId);
+ const parsedTx = parseTransactionIdentifier(transactionId);
- switch (type) {
+ if (!parsedTx) {
+ throw Error("invalid transaction identifier");
+ }
+
+ // FIXME: We currently don't cancel active long-polling tasks here.
+
+ switch (parsedTx.tag) {
+ case TransactionType.PeerPullCredit: {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.PeerPullInitiation,
+ pursePub: parsedTx.pursePub,
+ });
+ await resetOperationTimeout(ws, taskId);
+ stopLongpolling(ws, taskId);
+ await runOperationWithErrorReporting(ws, taskId, () =>
+ processPeerPullCredit(ws, parsedTx.pursePub),
+ );
+ break;
+ }
case TransactionType.Deposit: {
- const depositGroupId = rest[0];
- processDepositGroup(ws, depositGroupId, {
- forceNow: true,
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Deposit,
+ depositGroupId: parsedTx.depositGroupId,
});
+ await resetOperationTimeout(ws, taskId);
+ stopLongpolling(ws, taskId);
+ await runOperationWithErrorReporting(ws, taskId, () =>
+ processDepositGroup(ws, parsedTx.depositGroupId),
+ );
break;
}
case TransactionType.Withdrawal: {
- const withdrawalGroupId = rest[0];
- await processWithdrawalGroup(ws, withdrawalGroupId, { forceNow: true });
+ // FIXME: Abort current long-poller!
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Withdraw,
+ withdrawalGroupId: parsedTx.withdrawalGroupId,
+ });
+ await resetOperationTimeout(ws, taskId);
+ stopLongpolling(ws, taskId);
+ await runOperationWithErrorReporting(ws, taskId, () =>
+ processWithdrawalGroup(ws, parsedTx.withdrawalGroupId),
+ );
break;
}
case TransactionType.Payment: {
- const proposalId = rest[0];
- await processPurchasePay(ws, proposalId, { forceNow: true });
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Purchase,
+ proposalId: parsedTx.proposalId,
+ });
+ await resetOperationTimeout(ws, taskId);
+ stopLongpolling(ws, taskId);
+ await runOperationWithErrorReporting(ws, taskId, () =>
+ processPurchasePay(ws, parsedTx.proposalId),
+ );
break;
}
case TransactionType.Tip: {
- const walletTipId = rest[0];
- await processTip(ws, walletTipId, { forceNow: true });
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.TipPickup,
+ walletTipId: parsedTx.walletTipId,
+ });
+ await resetOperationTimeout(ws, taskId);
+ stopLongpolling(ws, taskId);
+ await runOperationWithErrorReporting(ws, taskId, () =>
+ processTip(ws, parsedTx.walletTipId),
+ );
break;
}
case TransactionType.Refresh: {
- const refreshGroupId = rest[0];
- await processRefreshGroup(ws, refreshGroupId, { forceNow: true });
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Refresh,
+ refreshGroupId: parsedTx.refreshGroupId,
+ });
+ await resetOperationTimeout(ws, taskId);
+ stopLongpolling(ws, taskId);
+ await runOperationWithErrorReporting(ws, taskId, () =>
+ processRefreshGroup(ws, parsedTx.refreshGroupId),
+ );
break;
}
default: