summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts')
-rw-r--r--packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts231
1 files changed, 104 insertions, 127 deletions
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts
index 2e5af4e78..165c8deee 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts
@@ -16,6 +16,7 @@
import {
Amounts,
+ CancellationToken,
CheckPeerPushDebitRequest,
CheckPeerPushDebitResponse,
CoinRefreshRequest,
@@ -32,6 +33,7 @@ import {
TalerProtocolTimestamp,
TalerProtocolViolationError,
TransactionAction,
+ TransactionIdStr,
TransactionMajorState,
TransactionMinorState,
TransactionState,
@@ -56,7 +58,7 @@ import {
timestampProtocolToDb,
} from "../index.js";
import { InternalWalletState } from "../internal-wallet-state.js";
-import { PendingTaskType } from "../pending-types.js";
+import { PendingTaskType, TaskId } from "../pending-types.js";
import { assertUnreachable } from "../util/assertUnreachable.js";
import { PeerCoinRepair, selectPeerCoins } from "../util/coinSelection.js";
import { checkLogicInvariant } from "../util/invariants.js";
@@ -65,7 +67,6 @@ import {
TaskRunResultType,
TransactionContext,
constructTaskIdentifier,
- runLongpollAsync,
spendCoins,
} from "./common.js";
import {
@@ -76,14 +77,13 @@ import {
import {
constructTransactionIdentifier,
notifyTransition,
- stopLongpolling,
} from "./transactions.js";
const logger = new Logger("pay-peer-push-debit.ts");
export class PeerPushDebitTransactionContext implements TransactionContext {
- public transactionId: string;
- public retryTag: string;
+ readonly transactionId: TransactionIdStr;
+ readonly retryTag: TaskId;
constructor(
public ws: InternalWalletState,
@@ -114,7 +114,6 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
async suspendTransaction(): Promise<void> {
const { ws, pursePub, transactionId, retryTag } = this;
- stopLongpolling(ws, retryTag);
const transitionInfo = await ws.db
.mktx((x) => [x.peerPushDebit])
.runReadWrite(async (tx) => {
@@ -166,12 +165,12 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
}
return undefined;
});
+ ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
}
async abortTransaction(): Promise<void> {
const { ws, pursePub, transactionId, retryTag } = this;
- stopLongpolling(ws, retryTag);
const transitionInfo = await ws.db
.mktx((x) => [x.peerPushDebit])
.runReadWrite(async (tx) => {
@@ -218,12 +217,13 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
}
return undefined;
});
+ ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
+ ws.taskScheduler.startShepherdTask(retryTag);
}
async resumeTransaction(): Promise<void> {
const { ws, pursePub, transactionId, retryTag } = this;
- stopLongpolling(ws, retryTag);
const transitionInfo = await ws.db
.mktx((x) => [x.peerPushDebit])
.runReadWrite(async (tx) => {
@@ -275,13 +275,12 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
}
return undefined;
});
- ws.workAvailable.trigger();
+ ws.taskScheduler.startShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
}
async failTransaction(): Promise<void> {
const { ws, pursePub, transactionId, retryTag } = this;
- stopLongpolling(ws, retryTag);
const transitionInfo = await ws.db
.mktx((x) => [x.peerPushDebit])
.runReadWrite(async (tx) => {
@@ -328,7 +327,9 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
}
return undefined;
});
+ ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
+ ws.taskScheduler.startShepherdTask(retryTag);
}
}
@@ -432,7 +433,7 @@ async function handlePurseCreationConflict(
}
await tx.peerPushDebit.put(myPpi);
});
- return TaskRunResult.finished();
+ return TaskRunResult.progress();
}
async function processPeerPushDebitCreateReserve(
@@ -554,7 +555,7 @@ async function processPeerPushDebitCreateReserve(
stTo: PeerPushDebitStatus.PendingReady,
});
- return TaskRunResult.finished();
+ return TaskRunResult.backoff();
}
async function processPeerPushDebitAbortingDeletePurse(
@@ -628,7 +629,7 @@ async function processPeerPushDebitAbortingDeletePurse(
});
notifyTransition(ws, transactionId, transitionInfo);
- return TaskRunResult.pending();
+ return TaskRunResult.backoff();
}
interface SimpleTransition {
@@ -712,7 +713,7 @@ async function processPeerPushDebitAbortingRefreshDeleted(
});
notifyTransition(ws, transactionId, transitionInfo);
// FIXME: Shouldn't this be finished in some cases?!
- return TaskRunResult.pending();
+ return TaskRunResult.backoff();
}
async function processPeerPushDebitAbortingRefreshExpired(
@@ -760,7 +761,7 @@ async function processPeerPushDebitAbortingRefreshExpired(
});
notifyTransition(ws, transactionId, transitionInfo);
// FIXME: Shouldn't this be finished in some cases?!
- return TaskRunResult.pending();
+ return TaskRunResult.backoff();
}
/**
@@ -769,118 +770,102 @@ async function processPeerPushDebitAbortingRefreshExpired(
async function processPeerPushDebitReady(
ws: InternalWalletState,
peerPushInitiation: PeerPushDebitRecord,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
logger.trace("processing peer-push-debit pending(ready)");
const pursePub = peerPushInitiation.pursePub;
- const retryTag = constructTaskIdentifier({
- tag: PendingTaskType.PeerPushDebit,
- pursePub,
- });
const transactionId = constructTaskIdentifier({
tag: PendingTaskType.PeerPushDebit,
pursePub,
});
- runLongpollAsync(ws, retryTag, async (ct) => {
- const mergeUrl = new URL(
- `purses/${pursePub}/merge`,
- peerPushInitiation.exchangeBaseUrl,
+ const mergeUrl = new URL(
+ `purses/${pursePub}/merge`,
+ peerPushInitiation.exchangeBaseUrl,
+ );
+ mergeUrl.searchParams.set("timeout_ms", "30000");
+ logger.info(`long-polling on purse status at ${mergeUrl.href}`);
+ const resp = await ws.http.fetch(mergeUrl.href, {
+ // timeout: getReserveRequestTimeout(withdrawalGroup),
+ cancellationToken,
+ });
+ if (resp.status === HttpStatusCode.Ok) {
+ const purseStatus = await readSuccessResponseJsonOrThrow(
+ resp,
+ codecForExchangePurseStatus(),
);
- mergeUrl.searchParams.set("timeout_ms", "30000");
- logger.info(`long-polling on purse status at ${mergeUrl.href}`);
- const resp = await ws.http.fetch(mergeUrl.href, {
- // timeout: getReserveRequestTimeout(withdrawalGroup),
- cancellationToken: ct,
- });
- if (resp.status === HttpStatusCode.Ok) {
- const purseStatus = await readSuccessResponseJsonOrThrow(
- resp,
- codecForExchangePurseStatus(),
+ const mergeTimestamp = purseStatus.merge_timestamp;
+ logger.info(`got purse status ${j2s(purseStatus)}`);
+ if (!mergeTimestamp || TalerProtocolTimestamp.isNever(mergeTimestamp)) {
+ return TaskRunResult.backoff();
+ } else {
+ await transitionPeerPushDebitTransaction(
+ ws,
+ peerPushInitiation.pursePub,
+ {
+ stFrom: PeerPushDebitStatus.PendingReady,
+ stTo: PeerPushDebitStatus.Done,
+ },
);
- const mergeTimestamp = purseStatus.merge_timestamp;
- logger.info(`got purse status ${j2s(purseStatus)}`);
- if (!mergeTimestamp || TalerProtocolTimestamp.isNever(mergeTimestamp)) {
- return { ready: false };
- } else {
- await transitionPeerPushDebitTransaction(
+ return TaskRunResult.finished();
+ }
+ } else if (resp.status === HttpStatusCode.Gone) {
+ logger.info(`purse ${pursePub} is gone, aborting peer-push-debit`);
+ const transitionInfo = await ws.db
+ .mktx((x) => [
+ x.peerPushDebit,
+ x.refreshGroups,
+ x.denominations,
+ x.coinAvailability,
+ x.coins,
+ ])
+ .runReadWrite(async (tx) => {
+ const ppiRec = await tx.peerPushDebit.get(pursePub);
+ if (!ppiRec) {
+ return undefined;
+ }
+ if (ppiRec.status !== PeerPushDebitStatus.PendingReady) {
+ return undefined;
+ }
+ const currency = Amounts.currencyOf(ppiRec.amount);
+ const oldTxState = computePeerPushDebitTransactionState(ppiRec);
+ const coinPubs: CoinRefreshRequest[] = [];
+
+ for (let i = 0; i < ppiRec.coinSel.coinPubs.length; i++) {
+ coinPubs.push({
+ amount: ppiRec.coinSel.contributions[i],
+ coinPub: ppiRec.coinSel.coinPubs[i],
+ });
+ }
+
+ const refresh = await createRefreshGroup(
ws,
- peerPushInitiation.pursePub,
- {
- stFrom: PeerPushDebitStatus.PendingReady,
- stTo: PeerPushDebitStatus.Done,
- },
+ tx,
+ currency,
+ coinPubs,
+ RefreshReason.AbortPeerPushDebit,
+ transactionId,
);
+ ppiRec.status = PeerPushDebitStatus.AbortingRefreshExpired;
+ ppiRec.abortRefreshGroupId = refresh.refreshGroupId;
+ await tx.peerPushDebit.put(ppiRec);
+ const newTxState = computePeerPushDebitTransactionState(ppiRec);
return {
- ready: true,
+ oldTxState,
+ newTxState,
};
- }
- } else if (resp.status === HttpStatusCode.Gone) {
- logger.info(`purse ${pursePub} is gone, aborting peer-push-debit`);
- const transitionInfo = await ws.db
- .mktx((x) => [
- x.peerPushDebit,
- x.refreshGroups,
- x.denominations,
- x.coinAvailability,
- x.coins,
- ])
- .runReadWrite(async (tx) => {
- const ppiRec = await tx.peerPushDebit.get(pursePub);
- if (!ppiRec) {
- return undefined;
- }
- if (ppiRec.status !== PeerPushDebitStatus.PendingReady) {
- return undefined;
- }
- const currency = Amounts.currencyOf(ppiRec.amount);
- const oldTxState = computePeerPushDebitTransactionState(ppiRec);
- const coinPubs: CoinRefreshRequest[] = [];
-
- for (let i = 0; i < ppiRec.coinSel.coinPubs.length; i++) {
- coinPubs.push({
- amount: ppiRec.coinSel.contributions[i],
- coinPub: ppiRec.coinSel.coinPubs[i],
- });
- }
-
- const refresh = await createRefreshGroup(
- ws,
- tx,
- currency,
- coinPubs,
- RefreshReason.AbortPeerPushDebit,
- transactionId,
- );
- ppiRec.status = PeerPushDebitStatus.AbortingRefreshExpired;
- ppiRec.abortRefreshGroupId = refresh.refreshGroupId;
- await tx.peerPushDebit.put(ppiRec);
- const newTxState = computePeerPushDebitTransactionState(ppiRec);
- return {
- oldTxState,
- newTxState,
- };
- });
- notifyTransition(ws, transactionId, transitionInfo);
- return {
- ready: true,
- };
- } else {
- logger.warn(`unexpected HTTP status for purse: ${resp.status}`);
- return {
- ready: false,
- };
- }
- });
- logger.trace(
- "returning early from peer-push-debit for long-polling in background",
- );
- return {
- type: TaskRunResultType.Longpoll,
- };
+ });
+ notifyTransition(ws, transactionId, transitionInfo);
+ return TaskRunResult.backoff();
+ } else {
+ logger.warn(`unexpected HTTP status for purse: ${resp.status}`);
+ return TaskRunResult.backoff();
+ }
}
export async function processPeerPushDebit(
ws: InternalWalletState,
pursePub: string,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const peerPushInitiation = await ws.db
.mktx((x) => [x.peerPushDebit])
@@ -891,24 +876,15 @@ export async function processPeerPushDebit(
throw Error("peer push payment not found");
}
- const retryTag = constructTaskIdentifier({
- tag: PendingTaskType.PeerPushDebit,
- pursePub,
- });
-
- // We're already running!
- if (ws.activeLongpoll[retryTag]) {
- logger.info("peer-push-debit task already in long-polling, returning!");
- return {
- type: TaskRunResultType.Longpoll,
- };
- }
-
switch (peerPushInitiation.status) {
case PeerPushDebitStatus.PendingCreatePurse:
return processPeerPushDebitCreateReserve(ws, peerPushInitiation);
case PeerPushDebitStatus.PendingReady:
- return processPeerPushDebitReady(ws, peerPushInitiation);
+ return processPeerPushDebitReady(
+ ws,
+ peerPushInitiation,
+ cancellationToken,
+ );
case PeerPushDebitStatus.AbortingDeletePurse:
return processPeerPushDebitAbortingDeletePurse(ws, peerPushInitiation);
case PeerPushDebitStatus.AbortingRefreshDeleted:
@@ -971,10 +947,9 @@ export async function initiatePeerPushDebit(
const pursePub = pursePair.pub;
- const transactionId = constructTaskIdentifier({
- tag: PendingTaskType.PeerPushDebit,
- pursePub,
- });
+ const ctx = new PeerPushDebitTransactionContext(ws, pursePub);
+
+ const transactionId = ctx.transactionId;
const contractEncNonce = encodeCrock(getRandomBytes(24));
@@ -1044,6 +1019,8 @@ export async function initiatePeerPushDebit(
hintTransactionId: transactionId,
});
+ ws.taskScheduler.startShepherdTask(ctx.retryTag);
+
return {
contractPriv: contractKeyPair.priv,
mergePriv: mergePair.priv,