commit 3305c83762e324a008f1b6b09796fe61d7fe0e9a
parent ee15e3d6192eca5dbe9c957dc0fc421efc77abd4
Author: Antoine A <>
Date: Thu, 17 Apr 2025 10:32:35 +0200
wallet-core: allow 5 concurrent long poll per hostname
Diffstat:
9 files changed, 111 insertions(+), 88 deletions(-)
diff --git a/packages/taler-harness/src/integrationtests/test-peer-pull.ts b/packages/taler-harness/src/integrationtests/test-peer-pull.ts
@@ -1,6 +1,6 @@
/*
This file is part of GNU Taler
- (C) 2020 Taler Systems S.A.
+ (C) 2020-2025 Taler Systems S.A.
GNU Taler is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
@@ -143,7 +143,7 @@ export async function runPeerPullTest(t: GlobalTestState) {
WalletApiOperation.PreparePeerPullDebit,
{ talerUri: tx.talerUri! }
));
- t.assertTrue(e.errorDetail.code === TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE)
+ t.assertTrue(e.errorDetail.code === TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE);
}
t.logStep("P2P pull confirm");
@@ -211,7 +211,7 @@ export async function runPeerPullTest(t: GlobalTestState) {
]);
}
- /*t.logStep("P2P pull self"); TODO timeout
+ t.logStep("P2P pull self");
{
const tx = await init_peer_pull_credit(wallet1, "self");
const prepare = await wallet1.call(
@@ -225,17 +225,17 @@ export async function runPeerPullTest(t: GlobalTestState) {
wallet1.call(WalletApiOperation.TestingWaitTransactionState, {
transactionId: tx.transactionId,
txState: {
- major: TransactionMajorState.Aborted,
+ major: TransactionMajorState.Done,
},
}),
wallet1.call(WalletApiOperation.TestingWaitTransactionState, {
transactionId: prepare.transactionId,
txState: {
- major: TransactionMajorState.Aborted,
+ major: TransactionMajorState.Done,
},
}),
]);
- }*/
+ }
t.logStep("P2P pull conflict");
{
diff --git a/packages/taler-wallet-core/src/deposits.ts b/packages/taler-wallet-core/src/deposits.ts
@@ -1,6 +1,6 @@
/*
This file is part of GNU Taler
- (C) 2021-2023 Taler Systems S.A.
+ (C) 2021-2025 Taler Systems S.A.
GNU Taler is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
@@ -1022,7 +1022,7 @@ async function processDepositGroupPendingKyc(
);
const kycStatusRes = await wex.ws.runLongpollQueueing(
- wex,
+ wex.cancellationToken,
url.hostname,
async (timeoutMs) => {
url.searchParams.set("timeout_ms", `${timeoutMs}`);
@@ -1114,7 +1114,7 @@ async function processDepositGroupPendingKycAuth(
url.searchParams.set("lpt", "1");
const kycStatusRes = await wex.ws.runLongpollQueueing(
- wex,
+ wex.cancellationToken,
url.hostname,
async (timeoutMs) => {
url.searchParams.set("timeout_ms", `${timeoutMs}`);
@@ -1841,7 +1841,7 @@ async function trackDeposit(
});
url.searchParams.set("merchant_sig", sigResp.sig);
const httpResp = await wex.ws.runLongpollQueueing(
- wex,
+ wex.cancellationToken,
url.hostname,
async (timeoutMs) => {
url.searchParams.set("timeout_ms", `${timeoutMs}`);
diff --git a/packages/taler-wallet-core/src/exchanges.ts b/packages/taler-wallet-core/src/exchanges.ts
@@ -1,6 +1,7 @@
/*
This file is part of GNU Taler
(C) 2019 GNUnet e.V.
+ (C) 2025 Taler Systems S.A.
GNU Taler is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
@@ -3669,7 +3670,7 @@ async function handleExchangeKycPendingLegitimization(
const paytoHash = encodeCrock(hashFullPaytoUri(reservePayto));
const resp = await wex.ws.runLongpollQueueing(
- wex,
+ wex.cancellationToken,
exchange.baseUrl,
async (timeoutMs) => {
const reqUrl = new URL(`kyc-check/${paytoHash}`, exchange.baseUrl);
@@ -3677,6 +3678,7 @@ async function handleExchangeKycPendingLegitimization(
logger.info(`long-polling wallet KYC status at ${reqUrl.href}`);
return await wex.http.fetch(reqUrl.href, {
method: "GET",
+ cancellationToken: wex.cancellationToken,
headers: {
["Account-Owner-Signature"]: sigResp.sig,
},
diff --git a/packages/taler-wallet-core/src/pay-merchant.ts b/packages/taler-wallet-core/src/pay-merchant.ts
@@ -1,6 +1,6 @@
/*
This file is part of GNU Taler
- (C) 2019-2023 Taler Systems S.A.
+ (C) 2019-2025 Taler Systems S.A.
GNU Taler is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
@@ -3958,7 +3958,7 @@ async function checkIfOrderIsAlreadyPaid(
if (doLongPolling) {
resp = await wex.ws.runLongpollQueueing(
- wex,
+ wex.cancellationToken,
requestUrl.hostname,
async (timeoutMs) => {
requestUrl.searchParams.set("timeout_ms", `${timeoutMs}`);
@@ -4136,7 +4136,7 @@ async function processPurchaseAutoRefund(
requestUrl.searchParams.set("refund", Amounts.stringify(totalKnownRefund));
const resp = await wex.ws.runLongpollQueueing(
- wex,
+ wex.cancellationToken,
requestUrl.hostname,
async (timeoutMs) => {
requestUrl.searchParams.set("timeout_ms", `${timeoutMs}`);
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
@@ -596,12 +596,12 @@ async function queryPurseForPeerPullCredit(
`purses/${pullIni.pursePub}/deposit`,
pullIni.exchangeBaseUrl,
);
- purseDepositUrl.searchParams.set("timeout_ms", "30000");
logger.info(`querying purse status via ${purseDepositUrl.href}`);
const resp = await wex.ws.runLongpollQueueing(
- wex,
+ wex.cancellationToken,
purseDepositUrl.hostname,
- async () => {
+ async (timeoutMs) => {
+ purseDepositUrl.searchParams.set("timeout_ms", `${timeoutMs}`);
return await wex.http.fetch(purseDepositUrl.href, {
timeout: { d_ms: 60000 },
cancellationToken: wex.cancellationToken,
@@ -685,7 +685,7 @@ async function longpollKycStatus(
const ctx = new PeerPullCreditTransactionContext(wex, pursePub);
const url = new URL(`kyc-check/${kycPaytoHash}`, exchangeUrl);
const kycStatusRes = await wex.ws.runLongpollQueueing(
- wex,
+ wex.cancellationToken,
url.hostname,
async (timeoutMs) => {
url.searchParams.set("timeout_ms", `${timeoutMs}`);
diff --git a/packages/taler-wallet-core/src/pay-peer-push-credit.ts b/packages/taler-wallet-core/src/pay-peer-push-credit.ts
@@ -734,7 +734,7 @@ async function longpollKycStatus(
const url = new URL(`kyc-check/${kycPaytoHash}`, exchangeUrl);
logger.info(`kyc url ${url.href}`);
const kycStatusRes = await wex.ws.runLongpollQueueing(
- wex,
+ wex.cancellationToken,
url.hostname,
async (timeoutMs) => {
url.searchParams.set("timeout_ms", `${timeoutMs}`);
diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/pay-peer-push-debit.ts
@@ -957,13 +957,12 @@ async function processPeerPushDebitReady(
peerPushInitiation.exchangeBaseUrl,
);
const resp = await wex.ws.runLongpollQueueing(
- wex,
+ wex.cancellationToken,
mergeUrl.hostname,
async (timeoutMs) => {
mergeUrl.searchParams.set("timeout_ms", `${timeoutMs}`);
logger.info(`long-polling on purse status at ${mergeUrl.href}`);
return await wex.http.fetch(mergeUrl.href, {
- // timeout: getReserveRequestTimeout(withdrawalGroup),
cancellationToken: wex.cancellationToken,
});
},
diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts
@@ -2448,7 +2448,7 @@ async function dispatchWalletCoreApiRequest(
wex = getObservedWalletExecutionContext(ws, cts.token, cts, oc);
} else {
oc = {
- observe(evt) {},
+ observe(evt) { },
};
wex = getNormalWalletExecutionContext(ws, cts.token, cts, oc);
}
@@ -2593,7 +2593,7 @@ export class Cache<T> {
constructor(
private maxCapacity: number,
private cacheDuration: Duration,
- ) {}
+ ) { }
get(key: string): T | undefined {
const r = this.map.get(key);
@@ -2629,7 +2629,7 @@ export class Cache<T> {
* Implementation of triggers for the wallet DB.
*/
class WalletDbTriggerSpec implements TriggerSpec {
- constructor(public ws: InternalWalletState) {}
+ constructor(public ws: InternalWalletState) { }
afterCommit(info: AfterCommitInfo): void {
if (info.mode !== "readwrite") {
@@ -2654,13 +2654,85 @@ class WalletDbTriggerSpec implements TriggerSpec {
}
type LongpollRunFn<T> = (timeoutMs: number) => Promise<T>;
-type ResolveFn = () => void;
-/**
- * Per-hostname state for longpolling.
- */
-interface LongpollState {
- queue: Array<ResolveFn>;
+const PER_HOSTNAME_PERMITS: number = 5;
+
+class LongpollQueue {
+ private idCounter: number = 1
+ private hostNameQueues: Map<string, { queue: (() => void)[], permit: number }> = new Map();
+ // FIXME add an additional global semaphore
+
+ constructor() { }
+
+ async queue<T>(
+ cancellationToken: CancellationToken,
+ hostname: string,
+ f: LongpollRunFn<T>
+ ): Promise<T> {
+ const rid = this.idCounter++;
+
+ const triggerNextLongpoll = () => {
+ logger.trace(`cleaning up after long-poll ${rid} to ${hostname}`);
+ const state = this.hostNameQueues.get(hostname);
+ if (state == null) {
+ return;
+ }
+ // Run pending task
+ const next = state.queue.shift();
+ if (next != null) {
+ next();
+ } else {
+ // Else release permit
+ state.permit++;
+ if (state.permit == PER_HOSTNAME_PERMITS) {
+ this.hostNameQueues.delete(hostname);
+ }
+ }
+ };
+
+ const doRunLongpoll: () => Promise<T> = async () => {
+ const st = this.hostNameQueues.get(hostname);
+ const numWaiting = st?.queue.length ?? 0;
+ logger.info(
+ `running long-poll ${rid} to ${hostname} with ${numWaiting} waiting`,
+ );
+ try {
+ const timeoutMs = Math.round(Math.max(10000, 30000 / (numWaiting + 1)));
+ return await f(timeoutMs);
+ } finally {
+ triggerNextLongpoll();
+ }
+ };
+
+ // Get or set per hostname queue
+ let state = this.hostNameQueues.get(hostname);
+ if (state == null) {
+ state = {
+ permit: PER_HOSTNAME_PERMITS,
+ queue: []
+ }
+ this.hostNameQueues.set(hostname, state)
+ }
+
+ if (state.permit > 0) {
+ state.permit--;
+ this.hostNameQueues.set(hostname, state)
+ return doRunLongpoll();
+ } else {
+ logger.info(`long-poll request ${rid} to ${hostname} queued`);
+ const promcap = openPromise<void>();
+ state.queue.push(promcap.resolve);
+ try {
+ await cancellationToken.racePromise(promcap.promise);
+ } finally {
+ logger.info(
+ `long-poll request ${rid} to ${hostname} cancelled while queued`,
+ );
+ triggerNextLongpoll();
+ }
+ return doRunLongpoll();
+ }
+ }
}
/**
@@ -2718,9 +2790,7 @@ export class InternalWalletState {
clientCancellationMap: Map<string, CancellationToken.Source> = new Map();
- private longpollStatePerHostname: Map<string, LongpollState> = new Map();
-
- private longpollRequestIdCounter = 1;
+ private longpollQueue = new LongpollQueue();
public get idbFactory(): BridgeIDBFactory {
return this.dbImplementation.idbFactory;
@@ -2753,63 +2823,16 @@ export class InternalWalletState {
}
/**
- * Run a long-polling request, potentially queueing the request
+ * Run a long-polling request, potentially queuing the request
* if too many other long-polling requests against the same hostname
* (or too many overall) are active.
*/
async runLongpollQueueing<T>(
- wex: WalletExecutionContext,
+ cancellationToken: CancellationToken,
hostname: string,
f: LongpollRunFn<T>,
): Promise<T> {
- let rid = this.longpollRequestIdCounter++;
- const triggerNextLongpoll = () => {
- logger.trace(`cleaning up after long-poll ${rid} request to ${hostname}`);
- const st = this.longpollStatePerHostname.get(hostname);
- if (!st) {
- return;
- }
- const next = st.queue.shift();
- if (next) {
- next();
- } else {
- this.longpollStatePerHostname.delete(hostname);
- }
- };
- const doRunLongpoll: () => Promise<T> = async () => {
- const st = this.longpollStatePerHostname.get(hostname);
- const numWaiting = st?.queue.length ?? 0;
- logger.info(
- `running long-poll ${rid} to ${hostname} with ${numWaiting} waiting`,
- );
- try {
- const timeoutMs = Math.round(Math.max(10000, 30000 / (numWaiting + 1)));
- return await f(timeoutMs);
- } finally {
- triggerNextLongpoll();
- }
- };
- const state = this.longpollStatePerHostname.get(hostname);
- if (state) {
- logger.info(`long-poll request ${rid} to ${hostname} queued`);
- const promcap = openPromise<void>();
- state.queue.push(promcap.resolve);
- try {
- await wex.cancellationToken.racePromise(promcap.promise);
- } catch (e) {
- logger.info(
- `long-poll request ${rid} to ${hostname} cancelled while queued`,
- );
- triggerNextLongpoll();
- throw e;
- }
- return doRunLongpoll();
- } else {
- this.longpollStatePerHostname.set(hostname, {
- queue: [],
- });
- return Promise.resolve().then(doRunLongpoll);
- }
+ return this.longpollQueue.queue(cancellationToken, hostname, f)
}
clearAllCaches(): void {
diff --git a/packages/taler-wallet-core/src/withdraw.ts b/packages/taler-wallet-core/src/withdraw.ts
@@ -1,6 +1,6 @@
/*
This file is part of GNU Taler
- (C) 2019-2024 Taler Systems SA
+ (C) 2019-2025 Taler Systems SA
GNU Taler is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
@@ -1148,11 +1148,10 @@ async function processWithdrawalGroupDialogProposed(
url.searchParams.set("old_state", "pending");
const resp = await ctx.wex.ws.runLongpollQueueing(
- ctx.wex,
+ ctx.wex.cancellationToken,
url.hostname,
async (timeoutMs) => {
url.searchParams.set("long_poll_ms", `${timeoutMs}`);
-
return await ctx.wex.http.fetch(url.href, {
method: "GET",
cancellationToken: ctx.wex.cancellationToken,
@@ -1999,7 +1998,7 @@ async function processQueryReserve(
);
const resp = await wex.ws.runLongpollQueueing(
- wex,
+ wex.cancellationToken,
reserveUrl.hostname,
async (timeoutMs) => {
reserveUrl.searchParams.set("timeout_ms", `${timeoutMs}`);
@@ -2165,7 +2164,7 @@ async function processWithdrawalGroupPendingKyc(
accountPub: withdrawalGroup.reservePub,
});
const kycStatusRes = await wex.ws.runLongpollQueueing(
- wex,
+ wex.cancellationToken,
url.hostname,
async (timeoutMs) => {
url.searchParams.set("timeout_ms", `${timeoutMs}`);