summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts')
-rw-r--r--packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts167
1 files changed, 68 insertions, 99 deletions
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
index e655eba4b..cc41abde9 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
@@ -33,6 +33,7 @@ import {
TalerProtocolTimestamp,
TalerUriAction,
TransactionAction,
+ TransactionIdStr,
TransactionMajorState,
TransactionMinorState,
TransactionState,
@@ -62,17 +63,15 @@ import {
timestampPreciseToDb,
} from "../index.js";
import { InternalWalletState } from "../internal-wallet-state.js";
-import { PendingTaskType } from "../pending-types.js";
+import { PendingTaskType, TaskId } from "../pending-types.js";
import { assertUnreachable } from "../util/assertUnreachable.js";
import { checkDbInvariant } from "../util/invariants.js";
import {
- LongpollResult,
TaskRunResult,
TaskRunResultType,
TombstoneTag,
TransactionContext,
constructTaskIdentifier,
- runLongpollAsync,
} from "./common.js";
import {
codecForExchangePurseStatus,
@@ -81,7 +80,6 @@ import {
import {
constructTransactionIdentifier,
notifyTransition,
- stopLongpolling,
} from "./transactions.js";
import {
getExchangeWithdrawalInfo,
@@ -91,8 +89,8 @@ import {
const logger = new Logger("pay-peer-pull-credit.ts");
export class PeerPullCreditTransactionContext implements TransactionContext {
- private transactionId: string;
- private retryTag: string;
+ readonly transactionId: TransactionIdStr;
+ readonly retryTag: TaskId;
constructor(
public ws: InternalWalletState,
@@ -139,7 +137,6 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
async suspendTransaction(): Promise<void> {
const { ws, pursePub, retryTag, transactionId } = this;
- stopLongpolling(ws, retryTag);
const transitionInfo = await ws.db
.mktx((x) => [x.peerPullCredit])
.runReadWrite(async (tx) => {
@@ -193,12 +190,12 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
return undefined;
});
+ ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
}
async failTransaction(): Promise<void> {
const { ws, pursePub, retryTag, transactionId } = this;
- stopLongpolling(ws, retryTag);
const transitionInfo = await ws.db
.mktx((x) => [x.peerPullCredit])
.runReadWrite(async (tx) => {
@@ -244,11 +241,11 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
return undefined;
});
notifyTransition(ws, transactionId, transitionInfo);
+ ws.taskScheduler.stopShepherdTask(retryTag);
}
async resumeTransaction(): Promise<void> {
const { ws, pursePub, retryTag, transactionId } = this;
- stopLongpolling(ws, retryTag);
const transitionInfo = await ws.db
.mktx((x) => [x.peerPullCredit])
.runReadWrite(async (tx) => {
@@ -301,13 +298,12 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
return undefined;
});
- ws.workAvailable.trigger();
notifyTransition(ws, transactionId, transitionInfo);
+ ws.taskScheduler.startShepherdTask(retryTag);
}
async abortTransaction(): Promise<void> {
const { ws, pursePub, retryTag, transactionId } = this;
- stopLongpolling(ws, retryTag);
const transitionInfo = await ws.db
.mktx((x) => [x.peerPullCredit])
.runReadWrite(async (tx) => {
@@ -355,7 +351,9 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
return undefined;
});
+ ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
+ ws.taskScheduler.startShepherdTask(retryTag);
}
}
@@ -363,7 +361,7 @@ async function queryPurseForPeerPullCredit(
ws: InternalWalletState,
pullIni: PeerPullCreditRecord,
cancellationToken: CancellationToken,
-): Promise<LongpollResult> {
+): Promise<TaskRunResult> {
const purseDepositUrl = new URL(
`purses/${pullIni.pursePub}/deposit`,
pullIni.exchangeBaseUrl,
@@ -401,10 +399,10 @@ async function queryPurseForPeerPullCredit(
return { oldTxState, newTxState };
});
notifyTransition(ws, transactionId, transitionInfo);
- return { ready: true };
+ return TaskRunResult.backoff();
}
case HttpStatusCode.NotFound:
- return { ready: false };
+ return TaskRunResult.backoff();
}
const result = await readSuccessResponseJsonOrThrow(
@@ -418,7 +416,7 @@ async function queryPurseForPeerPullCredit(
if (!depositTimestamp || TalerProtocolTimestamp.isNever(depositTimestamp)) {
logger.info("purse not ready yet (no deposit)");
- return { ready: false };
+ return TaskRunResult.backoff();
}
const reserve = await ws.db
@@ -462,9 +460,7 @@ async function queryPurseForPeerPullCredit(
return { oldTxState, newTxState };
});
notifyTransition(ws, transactionId, transitionInfo);
- return {
- ready: true,
- };
+ return TaskRunResult.backoff();
}
async function longpollKycStatus(
@@ -473,6 +469,7 @@ async function longpollKycStatus(
exchangeUrl: string,
kycInfo: KycPendingInfo,
userType: KycUserType,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const transactionId = constructTransactionIdentifier({
tag: TransactionType.PeerPullCredit,
@@ -483,56 +480,47 @@ async function longpollKycStatus(
pursePub,
});
- runLongpollAsync(ws, retryTag, async (ct) => {
- const url = new URL(
- `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`,
- exchangeUrl,
- );
- url.searchParams.set("timeout_ms", "10000");
- logger.info(`kyc url ${url.href}`);
- const kycStatusRes = await ws.http.fetch(url.href, {
- method: "GET",
- cancellationToken: ct,
- });
- if (
- kycStatusRes.status === HttpStatusCode.Ok ||
- //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
- // remove after the exchange is fixed or clarified
- kycStatusRes.status === HttpStatusCode.NoContent
- ) {
- const transitionInfo = await ws.db
- .mktx((x) => [x.peerPullCredit])
- .runReadWrite(async (tx) => {
- const peerIni = await tx.peerPullCredit.get(pursePub);
- if (!peerIni) {
- return;
- }
- if (
- peerIni.status !==
- PeerPullPaymentCreditStatus.PendingMergeKycRequired
- ) {
- return;
- }
- const oldTxState = computePeerPullCreditTransactionState(peerIni);
- peerIni.status = PeerPullPaymentCreditStatus.PendingCreatePurse;
- const newTxState = computePeerPullCreditTransactionState(peerIni);
- await tx.peerPullCredit.put(peerIni);
- return { oldTxState, newTxState };
- });
- notifyTransition(ws, transactionId, transitionInfo);
- return { ready: true };
- } else if (kycStatusRes.status === HttpStatusCode.Accepted) {
- // FIXME: Do we have to update the URL here?
- return { ready: false };
- } else {
- throw Error(
- `unexpected response from kyc-check (${kycStatusRes.status})`,
- );
- }
+ const url = new URL(
+ `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`,
+ exchangeUrl,
+ );
+ url.searchParams.set("timeout_ms", "10000");
+ logger.info(`kyc url ${url.href}`);
+ const kycStatusRes = await ws.http.fetch(url.href, {
+ method: "GET",
+ cancellationToken,
});
- return {
- type: TaskRunResultType.Longpoll,
- };
+ if (
+ kycStatusRes.status === HttpStatusCode.Ok ||
+ //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
+ // remove after the exchange is fixed or clarified
+ kycStatusRes.status === HttpStatusCode.NoContent
+ ) {
+ const transitionInfo = await ws.db
+ .mktx((x) => [x.peerPullCredit])
+ .runReadWrite(async (tx) => {
+ const peerIni = await tx.peerPullCredit.get(pursePub);
+ if (!peerIni) {
+ return;
+ }
+ if (
+ peerIni.status !== PeerPullPaymentCreditStatus.PendingMergeKycRequired
+ ) {
+ return;
+ }
+ const oldTxState = computePeerPullCreditTransactionState(peerIni);
+ peerIni.status = PeerPullPaymentCreditStatus.PendingCreatePurse;
+ const newTxState = computePeerPullCreditTransactionState(peerIni);
+ await tx.peerPullCredit.put(peerIni);
+ return { oldTxState, newTxState };
+ });
+ notifyTransition(ws, transactionId, transitionInfo);
+ } else if (kycStatusRes.status === HttpStatusCode.Accepted) {
+ // FIXME: Do we have to update the URL here?
+ } else {
+ throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`);
+ }
+ return TaskRunResult.backoff();
}
async function processPeerPullCreditAbortingDeletePurse(
@@ -584,7 +572,7 @@ async function processPeerPullCreditAbortingDeletePurse(
});
notifyTransition(ws, transactionId, transitionInfo);
- return TaskRunResult.pending();
+ return TaskRunResult.backoff();
}
async function handlePeerPullCreditWithdrawing(
@@ -637,7 +625,7 @@ async function handlePeerPullCreditWithdrawing(
return TaskRunResult.finished();
} else {
// FIXME: Return indicator that we depend on the other operation!
- return TaskRunResult.pending();
+ return TaskRunResult.backoff();
}
}
@@ -757,13 +745,13 @@ async function handlePeerPullCreditCreatePurse(
return { oldTxState, newTxState };
});
notifyTransition(ws, transactionId, transitionInfo);
-
- return TaskRunResult.finished();
+ return TaskRunResult.backoff();
}
export async function processPeerPullCredit(
ws: InternalWalletState,
pursePub: string,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const pullIni = await ws.db
.mktx((x) => [x.peerPullCredit])
@@ -779,14 +767,6 @@ export async function processPeerPullCredit(
pursePub,
});
- // We're already running!
- if (ws.activeLongpoll[retryTag]) {
- logger.info("peer-pull-credit already in long-polling, returning!");
- return {
- type: TaskRunResultType.Longpoll,
- };
- }
-
logger.trace(`processing ${retryTag}, status=${pullIni.status}`);
switch (pullIni.status) {
@@ -794,15 +774,7 @@ export async function processPeerPullCredit(
return TaskRunResult.finished();
}
case PeerPullPaymentCreditStatus.PendingReady:
- runLongpollAsync(ws, retryTag, async (cancellationToken) =>
- queryPurseForPeerPullCredit(ws, pullIni, cancellationToken),
- );
- logger.trace(
- "returning early from processPeerPullCredit for long-polling in background",
- );
- return {
- type: TaskRunResultType.Longpoll,
- };
+ return queryPurseForPeerPullCredit(ws, pullIni, cancellationToken);
case PeerPullPaymentCreditStatus.PendingMergeKycRequired: {
if (!pullIni.kycInfo) {
throw Error("invalid state, kycInfo required");
@@ -813,6 +785,7 @@ export async function processPeerPullCredit(
pullIni.exchangeBaseUrl,
pullIni.kycInfo,
"individual",
+ cancellationToken,
);
}
case PeerPullPaymentCreditStatus.PendingCreatePurse:
@@ -866,7 +839,7 @@ async function processPeerPullCreditKycRequired(
kycStatusRes.status === HttpStatusCode.NoContent
) {
logger.warn("kyc requested, but already fulfilled");
- return TaskRunResult.finished();
+ return TaskRunResult.backoff();
} else if (kycStatusRes.status === HttpStatusCode.Accepted) {
const kycStatus = await kycStatusRes.json();
logger.info(`kyc status: ${j2s(kycStatus)}`);
@@ -906,7 +879,7 @@ async function processPeerPullCreditKycRequired(
};
});
notifyTransition(ws, transactionId, transitionInfo);
- return TaskRunResult.pending();
+ return TaskRunResult.backoff();
} else {
throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`);
}
@@ -1095,20 +1068,16 @@ export async function initiatePeerPullPayment(
return { oldTxState, newTxState };
});
- const transactionId = constructTransactionIdentifier({
- tag: TransactionType.PeerPullCredit,
- pursePub: pursePair.pub,
- });
+ const ctx = new PeerPullCreditTransactionContext(ws, pursePair.pub);
// The pending-incoming balance has changed.
ws.notify({
type: NotificationType.BalanceChange,
- hintTransactionId: transactionId,
+ hintTransactionId: ctx.transactionId,
});
- notifyTransition(ws, transactionId, transitionInfo);
-
- ws.workAvailable.trigger();
+ notifyTransition(ws, ctx.transactionId, transitionInfo);
+ ws.taskScheduler.startShepherdTask(ctx.retryTag);
return {
talerUri: stringifyTalerUri({
@@ -1116,7 +1085,7 @@ export async function initiatePeerPullPayment(
exchangeBaseUrl: exchangeBaseUrl,
contractPriv: contractKeyPair.priv,
}),
- transactionId,
+ transactionId: ctx.transactionId,
};
}