commit 29faccd67d87d66533405bcbe4ebc68127ac678a parent a93a3c92ca580f79ac3ec27ca03d06109320035f Author: Antoine A <> Date: Thu, 24 Apr 2025 15:09:07 +0200 wallet: add long polling and cancelation to exchange client Diffstat:
23 files changed, 327 insertions(+), 377 deletions(-)
diff --git a/packages/aml-backoffice-ui/src/pages/CaseDetails.tsx b/packages/aml-backoffice-ui/src/pages/CaseDetails.tsx @@ -423,7 +423,7 @@ function SubmitNewDecision({ case HttpStatusCode.Conflict: return i18n.str`Officer disabled or more recent decision was already submitted.`; default: - assertUnreachable(fail); + assertUnreachable(fail.case); } }, ); diff --git a/packages/aml-backoffice-ui/src/pages/CaseUpdate.tsx b/packages/aml-backoffice-ui/src/pages/CaseUpdate.tsx @@ -127,7 +127,7 @@ export function CaseUpdate({ case HttpStatusCode.Conflict: return i18n.str`Officer disabled or more recent decision was already submitted.`; default: - assertUnreachable(fail); + assertUnreachable(fail.case); } }, ); diff --git a/packages/aml-backoffice-ui/src/pages/Dashboard.tsx b/packages/aml-backoffice-ui/src/pages/Dashboard.tsx @@ -163,7 +163,7 @@ function EventMetrics({ </Fragment> ); default: - assertUnreachable(error); + assertUnreachable(error.case); } } const result = resp as OperationOk< diff --git a/packages/aml-backoffice-ui/src/pages/decision/Summary.tsx b/packages/aml-backoffice-ui/src/pages/decision/Summary.tsx @@ -156,7 +156,7 @@ export function Summary({ case HttpStatusCode.Conflict: return i18n.str`Officer disabled or more recent decision was already submitted.`; default: - assertUnreachable(fail); + assertUnreachable(fail.case); } }, ); diff --git a/packages/kyc-ui/src/hooks/kyc.ts b/packages/kyc-ui/src/hooks/kyc.ts @@ -36,7 +36,7 @@ export function useKycInfo(token: AccessToken) { } = useExchangeApiContext(); async function fetcher([ac]: [AccessToken]) { - return await api.checkKycInfo(ac, [], { timeout: 1000 }); + return await api.checkKycInfo(ac, [], true); } const { data, error } = useSWR< TalerExchangeResultByMethod<"checkKycInfo">, diff --git a/packages/kyc-ui/src/pages/FillForm.tsx b/packages/kyc-ui/src/pages/FillForm.tsx @@ -149,7 +149,7 @@ function ShowForm({ case HttpStatusCode.Conflict: return i18n.str`Officer disabled or more recent decision was already submitted.`; default: - assertUnreachable(fail); + assertUnreachable(fail.case); } }, ); diff --git a/packages/kyc-ui/src/pages/TriggerKyc.tsx b/packages/kyc-ui/src/pages/TriggerKyc.tsx @@ -13,7 +13,6 @@ You should have received a copy of the GNU General Public License along with GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -import { CancellationToken } from "@gnu-taler/taler-util"; import { AbsoluteTime, AccessToken, @@ -91,7 +90,7 @@ export function TriggerKyc({ onKycStarted }: Props): VNode { const paytoHash = kycAccount; async function check() { const { signingKey } = await accountPromise; - const result = await lib.exchange.checkKycStatus(signingKey, paytoHash, CancellationToken.CONTINUE); + const result = await lib.exchange.checkKycStatus(signingKey, paytoHash); switch (result.case) { case "ok": console.log("empty body"); diff --git a/packages/taler-harness/src/harness/environments.ts b/packages/taler-harness/src/harness/environments.ts @@ -1266,7 +1266,7 @@ export async function createKycTestkudosEnvironment( const exchangeApi = new TalerExchangeHttpClient( exchange.baseUrl, - harnessHttpLib, + { httpClient: harnessHttpLib } ); return { diff --git a/packages/taler-harness/src/harness/harness.ts b/packages/taler-harness/src/harness/harness.ts @@ -1728,7 +1728,7 @@ export class ExchangeService implements ExchangeServiceInterface { await this.pingUntilAvailable(); { - const exchangeClient = new TalerExchangeHttpClient(this.baseUrl); + const exchangeClient = new TalerExchangeHttpClient(this.baseUrl, {}); // Would throw on incompatible version. const configResp = await exchangeClient.getConfig(); this.globalState.assertTrue(configResp.type === "ok"); diff --git a/packages/taler-harness/src/index.ts b/packages/taler-harness/src/index.ts @@ -1739,15 +1739,13 @@ merchantCli const { merchant_priv } = await db.getInstancePrivateKey(instanceId); const allAccounts = await db.getInstanceKycStatus(instanceId); - - const httpLib = createPlatformHttpLib(); + const info = await Promise.all( allAccounts.map(async ({ exchange_url, payto_uri }) => { - const exchangeApi = new TalerExchangeHttpClient(exchange_url, httpLib); + const exchangeApi = new TalerExchangeHttpClient(exchange_url, {}); const kyc_status = await exchangeApi.checkKycStatus( merchant_priv, - encodeCrock(hashNormalizedPaytoUri(parsePaytoUri(payto_uri)!)), - CancellationToken.CONTINUE + encodeCrock(hashNormalizedPaytoUri(parsePaytoUri(payto_uri)!)) ); return { payto_uri, diff --git a/packages/taler-harness/src/lint.ts b/packages/taler-harness/src/lint.ts @@ -580,7 +580,7 @@ export async function lintExchangeUrl(exchangeBaseUrl: string): Promise<void> { console.error(`invalid or non-canonical base url: ${exchangeBaseUrl}`); process.exit(1); } - const exchangeClient = new TalerExchangeHttpClient(canonUrl); + const exchangeClient = new TalerExchangeHttpClient(canonUrl, {}); const configResp = succeedOrThrow(await exchangeClient.getConfig()); const currency = configResp.currency; const keysResp = succeedOrThrow(await exchangeClient.getKeys()); diff --git a/packages/taler-util/src/http-client/exchange.ts b/packages/taler-util/src/http-client/exchange.ts @@ -17,6 +17,8 @@ import { Codec, codecForAny } from "../codec.js"; import { HttpRequestLibrary, + HttpRequestOptions, + HttpResponse, readSuccessResponseJsonOrThrow, readTalerErrorResponse, } from "../http-common.js"; @@ -54,6 +56,7 @@ import { ExchangeLegacyBatchWithdrawRequest, ExchangeMergeConflictResponse, ExchangeMergeSuccessResponse, + ExchangePurseDeposits, ExchangePurseMergeRequest, ExchangePurseStatus, ExchangeReservePurseRequest, @@ -84,6 +87,7 @@ import { AmountJson, Amounts, CancellationToken, + LongpollQueue, signAmlDecision, signAmlQuery, signKycAuth, @@ -91,7 +95,7 @@ import { } from "../index.js"; import { TalerErrorCode } from "../taler-error-codes.js"; import { AbsoluteTime } from "../time.js"; -import { codecForEmptyObject } from "../types-taler-wallet.js"; +import { codecForEmptyObject, TalerErrorDetail } from "../types-taler-wallet.js"; export type TalerExchangeResultByMethod< prop extends keyof TalerExchangeHttpClient, @@ -110,20 +114,28 @@ export type ReservePub = string & { [__pubId]: true }; /** */ export class TalerExchangeHttpClient { - httpLib: HttpRequestLibrary; public readonly PROTOCOL_VERSION = "21:0:0"; - cacheEvictor: CacheEvictor<TalerExchangeCacheEviction>; - preventCompression: boolean; + private httpLib: HttpRequestLibrary; + private cacheEvictor: CacheEvictor<TalerExchangeCacheEviction>; + private preventCompression: boolean; + private cancelationToken: CancellationToken; + private longPollQueue: LongpollQueue constructor( readonly baseUrl: string, - httpClient?: HttpRequestLibrary, - cacheEvictor?: CacheEvictor<TalerExchangeCacheEviction>, - preventCompression?: boolean, + params: { + httpClient?: HttpRequestLibrary, + cacheEvictor?: CacheEvictor<TalerExchangeCacheEviction>, + preventCompression?: boolean, + cancelationToken?: CancellationToken, + longPollQueue?: LongpollQueue + } ) { - this.httpLib = httpClient ?? createPlatformHttpLib(); - this.cacheEvictor = cacheEvictor ?? nullEvictor; - this.preventCompression = !!preventCompression; + this.httpLib = params.httpClient ?? createPlatformHttpLib(); + this.cacheEvictor = params.cacheEvictor ?? nullEvictor; + this.preventCompression = !!params.preventCompression; + this.cancelationToken = params.cancelationToken ?? CancellationToken.CONTINUE; + this.longPollQueue = params.longPollQueue ?? new LongpollQueue() } isCompatible(version: string): boolean { @@ -131,6 +143,24 @@ export class TalerExchangeHttpClient { return compare?.compatible ?? false; } + private async fetch(url_or_path: URL | string, opts: HttpRequestOptions = {}, longpoll: boolean = false): Promise<HttpResponse> { + const url = typeof url_or_path == 'string' ? new URL(url_or_path, this.baseUrl) : url_or_path; + if (longpoll || url.searchParams.has("timeout_ms")) { + return this.longPollQueue.run(url, this.cancelationToken, async (timeoutMs) => { + url.searchParams.set("timeout_ms", String(timeoutMs)) + return this.httpLib.fetch(url.href, { + cancellationToken: this.cancelationToken, + ...opts + }) + }) + } else { + return this.httpLib.fetch(url.href, { + cancellationToken: this.cancelationToken, + ...opts + }) + } + } + // TERMS /** @@ -152,10 +182,7 @@ export class TalerExchangeHttpClient { | OperationOk<Uint8Array<ArrayBuffer>> | OperationFail<HttpStatusCode.NotFound> > { - const url = new URL(`seed`, this.baseUrl); - const resp = await this.httpLib.fetch(url.href, { - method: "GET", - }); + const resp = await this.fetch('seed'); switch (resp.status) { case HttpStatusCode.Ok: const buffer = await resp.bytes(); @@ -175,10 +202,7 @@ export class TalerExchangeHttpClient { | OperationFail<HttpStatusCode.NotFound> | OperationOk<ExchangeVersionResponse> > { - const url = new URL(`config`, this.baseUrl); - const resp = await this.httpLib.fetch(url.href, { - method: "GET", - }); + const resp = await this.fetch('config'); switch (resp.status) { case HttpStatusCode.Ok: { const minBody = await readSuccessResponseJsonOrThrow( @@ -226,10 +250,7 @@ export class TalerExchangeHttpClient { * PARTIALLY IMPLEMENTED!! */ async getKeys() { - const url = new URL(`keys`, this.baseUrl); - const resp = await this.httpLib.fetch(url.href, { - method: "GET", - }); + const resp = await this.fetch('keys'); switch (resp.status) { case HttpStatusCode.Ok: return opSuccessFromHttp(resp, codecForExchangeKeysResponse()); @@ -382,9 +403,7 @@ export class TalerExchangeHttpClient { * */ async withdraw(rid: ReservePub, body: ExchangeLegacyBatchWithdrawRequest) { - const url = new URL(`reserves/${rid}/batch-withdraw`, this.baseUrl); - - const resp = await this.httpLib.fetch(url.href, { + const resp = await this.fetch(`reserves/${rid}/batch-withdraw`, { method: "POST", body, }); @@ -548,20 +567,13 @@ export class TalerExchangeHttpClient { */ async getPurseStatusAtMerge( pursePub: string, - cancellationToken: CancellationToken, - params: { - timeout?: number; - } = {}, + longpoll: boolean = false ): Promise< OperationOk<ExchangePurseStatus> | OperationFail<HttpStatusCode.NotFound> | OperationFail<HttpStatusCode.Gone> > { - const url = new URL(`purses/${pursePub}/merge`, this.baseUrl); - if (params.timeout !== undefined) { - url.searchParams.set("timeout_ms", String(params.timeout)); - } - const resp = await this.httpLib.fetch(url.href, { cancellationToken, logId: "purse status" }); + const resp = await this.fetch(`purses/${pursePub}/merge`, {}, longpoll); switch (resp.status) { case HttpStatusCode.Ok: return opSuccessFromHttp(resp, codecForExchangePurseStatus()); @@ -579,20 +591,13 @@ export class TalerExchangeHttpClient { */ async getPurseStatusAtDeposit( pursePub: string, - cancellationToken: CancellationToken, - params: { - timeout?: number; - } = {}, + longpoll: boolean = false ): Promise< OperationOk<ExchangePurseStatus> | OperationFail<HttpStatusCode.NotFound> | OperationFail<HttpStatusCode.Gone> > { - const url = new URL(`purses/${pursePub}/deposit`, this.baseUrl); - if (params.timeout !== undefined) { - url.searchParams.set("timeout_ms", String(params.timeout)); - } - const resp = await this.httpLib.fetch(url.href, { cancellationToken, logId: "purse status" }); + const resp = await this.fetch(`purses/${pursePub}/deposit`, {}, longpoll); switch (resp.status) { case HttpStatusCode.Ok: return opSuccessFromHttp(resp, codecForExchangePurseStatus()); @@ -616,8 +621,31 @@ export class TalerExchangeHttpClient { * https://docs.taler.net/core/api-exchange.html#delete--purses-$PURSE_PUB * */ - async deletePurse(): Promise<never> { - throw Error("not yet implemented"); + async deletePurse( + pursePub: string, + purseSig: string + ): Promise< + OperationOk<void> + | OperationFail<HttpStatusCode.Forbidden> + | OperationFail<HttpStatusCode.NotFound> + | OperationFail<HttpStatusCode.Conflict> + > { + const resp = await this.fetch(`purses/${pursePub}`, { + method: "DELETE", + headers: { + "taler-purse-signature": purseSig, + } + }); + switch (resp.status) { + case HttpStatusCode.NoContent: + return opEmptySuccess(resp); + case HttpStatusCode.Forbidden: + case HttpStatusCode.NotFound: + case HttpStatusCode.Conflict: + return opUnknownHttpFailure(resp); + default: + return opUnknownHttpFailure(resp); + } } /** @@ -627,10 +655,9 @@ export class TalerExchangeHttpClient { */ async postPurseMerge( pursePub: string, - body: ExchangePurseMergeRequest, - cancellationToken: CancellationToken + body: ExchangePurseMergeRequest ): Promise< - | OperationOk<ExchangeMergeSuccessResponse> + OperationOk<ExchangeMergeSuccessResponse> | OperationAlternative< HttpStatusCode.UnavailableForLegalReasons, LegitimizationNeededResponse @@ -642,14 +669,9 @@ export class TalerExchangeHttpClient { | OperationFail<HttpStatusCode.NotFound> | OperationFail<HttpStatusCode.Gone> > { - const url = new URL( - `purses/${pursePub}/merge`, - this.baseUrl, - ); - const resp = await this.httpLib.fetch(url.href, { + const resp = await this.fetch(`purses/${pursePub}/merge`, { method: "POST", - body, - cancellationToken + body }); switch (resp.status) { case HttpStatusCode.Ok: @@ -680,8 +702,7 @@ export class TalerExchangeHttpClient { */ async createPurseFromReserve( pursePub: string, - body: ExchangeReservePurseRequest, - cancellationToken: CancellationToken, + body: ExchangeReservePurseRequest ): Promise< | OperationOk<void> | OperationAlternative< @@ -689,14 +710,9 @@ export class TalerExchangeHttpClient { LegitimizationNeededResponse > > { - const url = new URL( - `reserves/${pursePub}/purse`, - this.baseUrl, - ); - const resp = await this.httpLib.fetch(url.href, { + const resp = await this.fetch(`reserves/${pursePub}/purse`, { method: "POST", - body, - cancellationToken + body }); switch (resp.status) { case HttpStatusCode.Ok: @@ -717,8 +733,37 @@ export class TalerExchangeHttpClient { * https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-deposit * */ - async depositIntoPurse(): Promise<never> { - throw Error("not yet implemented"); + async depositIntoPurse( + pursePub: string, + body: ExchangePurseDeposits, + ): Promise< + OperationOk<void> + | OperationAlternative<HttpStatusCode.Conflict, TalerErrorDetail> + | OperationFail<HttpStatusCode.Forbidden> + | OperationFail<HttpStatusCode.NotFound> + | OperationFail<HttpStatusCode.Gone> + > { + const resp = await this.fetch(`purses/${pursePub}/deposit`, { + method: "POST", + body, + }); + switch (resp.status) { + case HttpStatusCode.Ok: + // FIXME: parse PurseDepositSuccessResponse + return opSuccessFromHttp(resp, codecForAny()); + case HttpStatusCode.Conflict: + // FIXME: parse PurseConflict + return opKnownAlternativeFailure(resp, resp.status, codecForAny()); + case HttpStatusCode.Forbidden: + case HttpStatusCode.NotFound: + case HttpStatusCode.Gone: + return opKnownHttpFailure( + resp.status, + resp + ); + default: + return opUnknownHttpFailure(resp); + } } // WADS @@ -740,8 +785,6 @@ export class TalerExchangeHttpClient { * */ async notifyKycBalanceLimit(account: ReserveAccount, balance: AmountString) { - const url = new URL(`kyc-wallet`, this.baseUrl); - const body: WalletKycRequest = { balance, reserve_pub: account.id, @@ -749,8 +792,7 @@ export class TalerExchangeHttpClient { signWalletAccountSetup(account.signingKey, balance), ), }; - - const resp = await this.httpLib.fetch(url.href, { + const resp = await this.fetch(`kyc-wallet`, { method: "POST", body, }); @@ -780,9 +822,8 @@ export class TalerExchangeHttpClient { async checkKycStatus( signingKey: EddsaPrivP | string, paytoHash: string, - cancellationToken: CancellationToken, + longpoll: boolean = false, params: { - timeout?: number; awaitAuth?: boolean; } = {}, ): Promise< @@ -794,29 +835,24 @@ export class TalerExchangeHttpClient { | OperationFail<HttpStatusCode.Conflict> > { const url = new URL(`kyc-check/${paytoHash}`, this.baseUrl); - - if (params.timeout !== undefined) { - url.searchParams.set("timeout_ms", String(params.timeout)); - } if (params.awaitAuth !== undefined) { url.searchParams.set("await_auth", params.awaitAuth ? "YES" : "NO"); } const signature = typeof signingKey === 'string' ? signingKey : encodeCrock(signKycAuth(signingKey)); - const resp = await this.httpLib.fetch(url.href, { + const resp = await this.fetch(url, { headers: { "Account-Owner-Signature": signature, - }, - cancellationToken - }); + } + }, longpoll); switch (resp.status) { case HttpStatusCode.Ok: case HttpStatusCode.Accepted: return opKnownAlternativeFailure(resp, resp.status, codecForAccountKycStatus()); case HttpStatusCode.NoContent: - return opFixedSuccess(undefined); + return opEmptySuccess(resp); case HttpStatusCode.Forbidden: case HttpStatusCode.NotFound: case HttpStatusCode.Conflict: @@ -833,36 +869,22 @@ export class TalerExchangeHttpClient { async checkKycInfo( token: AccessToken, known: KycRequirementInformationId[], - params: { - timeout?: number; - } = {}, + longpoll: boolean = false ) { - const url = new URL(`kyc-info/${token}`, this.baseUrl); - - if (params.timeout !== undefined) { - url.searchParams.set("timeout_ms", String(params.timeout)); - } - - const resp = await this.httpLib.fetch(url.href, { + const resp = await this.fetch(`kyc-info/${token}`, { method: "GET", headers: { "If-None-Match": known.length ? known.join(",") : undefined, }, - }); - + }, longpoll); switch (resp.status) { case HttpStatusCode.Ok: return opSuccessFromHttp(resp, codecForKycProcessClientInformation()); case HttpStatusCode.Accepted: - return opKnownAlternativeFailure( - resp, - HttpStatusCode.Accepted, - codecForEmptyObject(), - ); case HttpStatusCode.NoContent: return opKnownAlternativeFailure( resp, - HttpStatusCode.NoContent, + resp.status, codecForEmptyObject(), ); case HttpStatusCode.NotModified: @@ -880,14 +902,11 @@ export class TalerExchangeHttpClient { requirement: KycRequirementInformationId, body: T, ) { - const url = new URL(`kyc-upload/${requirement}`, this.baseUrl); - - const resp = await this.httpLib.fetch(url.href, { + const resp = await this.fetch(`kyc-upload/${requirement}`, { method: "POST", body, compress: this.preventCompression ? undefined : "deflate", }); - switch (resp.status) { case HttpStatusCode.NoContent: { this.cacheEvictor.notifySuccess( @@ -896,13 +915,11 @@ export class TalerExchangeHttpClient { return opEmptySuccess(resp); } case HttpStatusCode.NotFound: - return opKnownHttpFailure(resp.status, resp); case HttpStatusCode.Conflict: - return opKnownHttpFailure(resp.status, resp); case HttpStatusCode.PayloadTooLarge: return opKnownHttpFailure(resp.status, resp); default: - return opUnknownFailure(resp, await readTalerErrorResponse(resp)); + return opUnknownHttpFailure(resp) } } @@ -914,24 +931,19 @@ export class TalerExchangeHttpClient { requirement: KycRequirementInformationId, body: object = {}, ) { - const url = new URL(`kyc-start/${requirement}`, this.baseUrl); - - const resp = await this.httpLib.fetch(url.href, { + const resp = await this.fetch(`kyc-start/${requirement}`, { method: "POST", - body, + body }); - switch (resp.status) { case HttpStatusCode.Ok: return opSuccessFromHttp(resp, codecForKycProcessStartInformation()); case HttpStatusCode.NotFound: - return opKnownHttpFailure(resp.status, resp); case HttpStatusCode.Conflict: - return opKnownHttpFailure(resp.status, resp); case HttpStatusCode.PayloadTooLarge: return opKnownHttpFailure(resp.status, resp); default: - return opUnknownFailure(resp, await readTalerErrorResponse(resp)); + return opUnknownHttpFailure(resp) } } @@ -944,12 +956,7 @@ export class TalerExchangeHttpClient { state: string, code: string, ) { - const url = new URL( - `kyc-proof/${provider}?state=${state}&code=${code}`, - this.baseUrl, - ); - - const resp = await this.httpLib.fetch(url.href, { + const resp = await this.fetch(`kyc-proof/${provider}?state=${state}&code=${code}`, { method: "GET", redirect: "manual", }); @@ -960,7 +967,7 @@ export class TalerExchangeHttpClient { case HttpStatusCode.NotFound: return opKnownHttpFailure(resp.status, resp); default: - return opUnknownFailure(resp, await readTalerErrorResponse(resp)); + return opUnknownHttpFailure(resp) } } @@ -973,9 +980,7 @@ export class TalerExchangeHttpClient { * */ async getAmlMesasures(auth: OfficerAccount) { - const url = new URL(`aml/${auth.id}/measures`, this.baseUrl); - - const resp = await this.httpLib.fetch(url.href, { + const resp = await this.fetch(`aml/${auth.id}/measures`, { method: "GET", headers: { "Taler-AML-Officer-Signature": encodeCrock( @@ -988,13 +993,11 @@ export class TalerExchangeHttpClient { case HttpStatusCode.Ok: return opSuccessFromHttp(resp, codecForAvailableMeasureSummary()); case HttpStatusCode.Conflict: - return opKnownHttpFailure(resp.status, resp); case HttpStatusCode.NotFound: - return opKnownHttpFailure(resp.status, resp); case HttpStatusCode.Forbidden: return opKnownHttpFailure(resp.status, resp); default: - return opUnknownFailure(resp, await readTalerErrorResponse(resp)); + return opUnknownHttpFailure(resp) } } @@ -1019,7 +1022,7 @@ export class TalerExchangeHttpClient { url.searchParams.set("end_date", String(filter.until.t_ms)); } - const resp = await this.httpLib.fetch(url.href, { + const resp = await this.fetch(url, { method: "GET", headers: { "Taler-AML-Officer-Signature": encodeCrock( @@ -1031,13 +1034,11 @@ export class TalerExchangeHttpClient { case HttpStatusCode.Ok: return opSuccessFromHttp(resp, codecForEventCounter()); case HttpStatusCode.Conflict: - return opKnownHttpFailure(resp.status, resp); case HttpStatusCode.NotFound: - return opKnownHttpFailure(resp.status, resp); case HttpStatusCode.Forbidden: return opKnownHttpFailure(resp.status, resp); default: - return opUnknownFailure(resp, await readTalerErrorResponse(resp)); + return opUnknownHttpFailure(resp) } } @@ -1069,8 +1070,7 @@ export class TalerExchangeHttpClient { ); } - const resp = await this.httpLib.fetch(url.href, { - method: "GET", + const resp = await this.fetch(url, { headers: { "Taler-AML-Officer-Signature": encodeCrock( signAmlQuery(auth.signingKey), @@ -1084,13 +1084,11 @@ export class TalerExchangeHttpClient { case HttpStatusCode.NoContent: return opFixedSuccess({ records: [] }); case HttpStatusCode.Forbidden: - return opKnownHttpFailure(resp.status, resp); case HttpStatusCode.NotFound: - return opKnownHttpFailure(resp.status, resp); case HttpStatusCode.Conflict: return opKnownHttpFailure(resp.status, resp); default: - return opUnknownFailure(resp, await readTalerErrorResponse(resp)); + return opUnknownHttpFailure(resp) } } @@ -1106,8 +1104,7 @@ export class TalerExchangeHttpClient { const url = new URL(`aml/${auth.id}/attributes/${account}`, this.baseUrl); addPaginationParams(url, params); - const resp = await this.httpLib.fetch(url.href, { - method: "GET", + const resp = await this.fetch(url, { headers: { "Taler-AML-Officer-Signature": encodeCrock( signAmlQuery(auth.signingKey), @@ -1121,13 +1118,11 @@ export class TalerExchangeHttpClient { case HttpStatusCode.NoContent: return opFixedSuccess({ details: [] }); case HttpStatusCode.Forbidden: - return opKnownHttpFailure(resp.status, resp); case HttpStatusCode.NotFound: - return opKnownHttpFailure(resp.status, resp); case HttpStatusCode.Conflict: return opKnownHttpFailure(resp.status, resp); default: - return opUnknownFailure(resp, await readTalerErrorResponse(resp)); + return opUnknownHttpFailure(resp) } } @@ -1139,13 +1134,11 @@ export class TalerExchangeHttpClient { auth: OfficerAccount, decision: Omit<AmlDecisionRequest, "officer_sig">, ) { - const url = new URL(`aml/${auth.id}/decision`, this.baseUrl); - const body: AmlDecisionRequest = { officer_sig: encodeCrock(signAmlDecision(auth.signingKey, decision)), ...decision, }; - const resp = await this.httpLib.fetch(url.href, { + const resp = await this.fetch(`aml/${auth.id}/decision`, { method: "POST", headers: { "Taler-AML-Officer-Signature": encodeCrock( @@ -1164,13 +1157,11 @@ export class TalerExchangeHttpClient { return opEmptySuccess(resp); } case HttpStatusCode.Forbidden: - return opKnownHttpFailure(resp.status, resp); case HttpStatusCode.NotFound: - return opKnownHttpFailure(resp.status, resp); case HttpStatusCode.Conflict: return opKnownHttpFailure(resp.status, resp); default: - return opUnknownFailure(resp, await readTalerErrorResponse(resp)); + return opUnknownHttpFailure(resp) } } @@ -1190,8 +1181,7 @@ export class TalerExchangeHttpClient { url.searchParams.set("threshold", Amounts.stringify(params.threshold)); } - const resp = await this.httpLib.fetch(url.href, { - method: "GET", + const resp = await this.fetch(url, { headers: { "Taler-AML-Officer-Signature": encodeCrock( signAmlQuery(auth.signingKey), @@ -1205,13 +1195,11 @@ export class TalerExchangeHttpClient { case HttpStatusCode.NoContent: return opFixedSuccess({ transfers: [] }); case HttpStatusCode.Forbidden: - return opKnownHttpFailure(resp.status, resp); case HttpStatusCode.NotFound: - return opKnownHttpFailure(resp.status, resp); case HttpStatusCode.Conflict: return opKnownHttpFailure(resp.status, resp); default: - return opUnknownFailure(resp, await readTalerErrorResponse(resp)); + return opUnknownHttpFailure(resp) } } @@ -1231,8 +1219,7 @@ export class TalerExchangeHttpClient { url.searchParams.set("threshold", Amounts.stringify(params.threshold)); } - const resp = await this.httpLib.fetch(url.href, { - method: "GET", + const resp = await this.fetch(url, { headers: { "Taler-AML-Officer-Signature": encodeCrock( signAmlQuery(auth.signingKey), @@ -1246,13 +1233,11 @@ export class TalerExchangeHttpClient { case HttpStatusCode.NoContent: return opFixedSuccess({ transfers: [] }); case HttpStatusCode.Forbidden: - return opKnownHttpFailure(resp.status, resp); case HttpStatusCode.NotFound: - return opKnownHttpFailure(resp.status, resp); case HttpStatusCode.Conflict: return opKnownHttpFailure(resp.status, resp); default: - return opUnknownFailure(resp, await readTalerErrorResponse(resp)); + return opUnknownHttpFailure(resp) } } diff --git a/packages/taler-util/src/http-common.ts b/packages/taler-util/src/http-common.ts @@ -76,9 +76,6 @@ export interface HttpRequestOptions { * How to compress the payload */ compress?: "gzip" | "deflate"; - - /** Which id to use when logging */ - logId?: string } /** diff --git a/packages/taler-util/src/http-impl.node.ts b/packages/taler-util/src/http-impl.node.ts @@ -90,7 +90,7 @@ export class HttpLibImpl implements HttpRequestLibrary { async fetch(url: string, opt?: HttpRequestOptions): Promise<HttpResponse> { const method = opt?.method?.toUpperCase() ?? "GET"; const rid = this.idCounter++; - const logHeader = opt?.logId != null ? `request ${rid} ${opt.logId}` : `request ${rid}` + const logHeader = `request ${rid}` logger.trace(`${logHeader} ${method} ${url}`); diff --git a/packages/taler-util/src/index.ts b/packages/taler-util/src/index.ts @@ -57,6 +57,7 @@ export * from "./time.js"; export * from "./timer.js"; export * from "./transaction-test-data.js"; export * from "./url.js"; +export * from "./longpool-queue.js" // FIXME: remove all this, needs refactor export * from "./types-taler-bank-conversion.js"; diff --git a/packages/taler-util/src/longpool-queue.ts b/packages/taler-util/src/longpool-queue.ts @@ -0,0 +1,86 @@ +/* + This file is part of GNU Taler + (C) 2025 Taler Systems S.A. + + GNU Taler is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + + SPDX-License-Identifier: AGPL3.0-or-later +*/ + +import { openPromise, CancellationToken } from "./index.js"; +import { Logger } from "./logging.js"; + +const logger = new Logger("longpoll-queue.ts"); + +const PERMITS: number = 20; +type LongpollRunFn<T> = (timeoutMs: number) => Promise<T>; + +export class LongpollQueue { + private idCounter: number = 0; + private queue: (() => void)[] = []; + private permits: number = PERMITS + + constructor() { } + + async run<T>( + url: URL, + cancellationToken: CancellationToken, + f: LongpollRunFn<T>, + ): Promise<T> { + const hostname = url.hostname; + const rid = this.idCounter++; + + const triggerNextLongpoll = () => { + logger.trace(`cleaning up after long-poll ${rid} to ${hostname}`); + // Run pending task + const next = this.queue.shift(); + if (next != null) { + next(); + } else { + // Else release permit + this.permits++; + } + }; + + const doRunLongpoll: () => Promise<T> = async () => { + const numWaiting = this.queue.length; + const numConcurrent = PERMITS - this.permits; + logger.info( + `running long-poll ${rid} to ${hostname} with ${numWaiting} waiting and ${numConcurrent} running` + ); + try { + const timeoutMs = Math.round(Math.max(10000, 30000 / (numWaiting + 1))); + return await f(timeoutMs); + } finally { + triggerNextLongpoll(); + } + }; + + if (this.permits > 0) { + this.permits--; + return doRunLongpoll(); + } else { + logger.info(`long-poll ${rid} to ${hostname} queued`); + const promcap = openPromise<void>(); + this.queue.push(promcap.resolve); + try { + await cancellationToken.racePromise(promcap.promise); + } finally { + logger.info( + `long-poll ${rid} to ${hostname} cancelled while queued`, + ); + triggerNextLongpoll(); + } + return doRunLongpoll(); + } + } +} +\ No newline at end of file diff --git a/packages/taler-wallet-core/src/common.ts b/packages/taler-wallet-core/src/common.ts @@ -483,6 +483,16 @@ export namespace TaskRunResult { type: TaskRunResultType.NetworkRequired, }; } + /** + * Network connection is required to complete the task. + * When network is back, the transaction will be retried. + */ + export function error(detail: TalerErrorDetail): TaskRunResult { + return { + type: TaskRunResultType.Error, + errorDetail: detail + }; + } } export interface TaskRunFinishedResult { @@ -1041,7 +1051,7 @@ export async function cancelableLongPoll( url.searchParams.set("timeout_ms", `${timeoutMs}`); return cancelableFetch(wex, url, opt); }; - return wex.ws.longpollQueue.queue(wex.cancellationToken, url, longPoll); + return wex.ws.longpollQueue.run(url, wex.cancellationToken, longPoll); } /** diff --git a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts @@ -31,7 +31,6 @@ import { ScopeType, TalerErrorCode, TalerErrorDetail, - TalerExchangeHttpClient, TalerPreciseTimestamp, TalerUriAction, Transaction, @@ -59,7 +58,6 @@ import { TaskRunResult, TransactionContext, TransitionResultType, - cancelableFetch, cancelableLongPoll, constructTaskIdentifier, genericWaitForStateVal, @@ -100,7 +98,7 @@ import { constructTransactionIdentifier, isUnsuccessfulTransaction, } from "./transactions.js"; -import { WalletExecutionContext } from "./wallet.js"; +import { walletExchangeClient, WalletExecutionContext } from "./wallet.js"; import { WithdrawTransactionContext, getExchangeWithdrawalInfo, @@ -470,11 +468,8 @@ async function queryPurseForPeerPullCredit( pullIni: PeerPullCreditRecord, ): Promise<TaskRunResult> { const ctx = new PeerPullCreditTransactionContext(wex, pullIni.pursePub); - const exchangeClient = new TalerExchangeHttpClient(pullIni.exchangeBaseUrl, wex.http) - // FIXME: long poll queue again ? - const resp = await exchangeClient.getPurseStatusAtDeposit(pullIni.pursePub, wex.cancellationToken, { - timeout: 30000 - }); + const exchangeClient = walletExchangeClient(pullIni.exchangeBaseUrl, wex) + const resp = await exchangeClient.getPurseStatusAtDeposit(pullIni.pursePub, true); switch (resp.case) { case "ok": @@ -567,22 +562,25 @@ async function processPeerPullCreditAbortingDeletePurse( ): Promise<TaskRunResult> { const { pursePub, pursePriv } = peerPullIni; const ctx = new PeerPullCreditTransactionContext(wex, peerPullIni.pursePub); - + const exchangeClient = walletExchangeClient(peerPullIni.exchangeBaseUrl, wex) const sigResp = await wex.cryptoApi.signDeletePurse({ pursePriv, }); - const purseUrl = new URL(`purses/${pursePub}`, peerPullIni.exchangeBaseUrl); - const resp = await cancelableFetch(wex, purseUrl, { - method: "DELETE", - headers: { - "taler-purse-signature": sigResp.sig, - } - }); - logger.info(`deleted purse with response status ${resp.status}`); - - await recordTransitionStatus(ctx, PeerPullPaymentCreditStatus.AbortingDeletePurse, PeerPullPaymentCreditStatus.Aborted); - - return TaskRunResult.backoff(); + const resp = await exchangeClient.deletePurse(pursePub, sigResp.sig); + switch (resp.case) { + case "ok": + case HttpStatusCode.NotFound: + await recordTransitionStatus(ctx, PeerPullPaymentCreditStatus.AbortingDeletePurse, PeerPullPaymentCreditStatus.Aborted); + return TaskRunResult.finished(); + case HttpStatusCode.Forbidden: + // FIXME appropriate error + throw Error(`base signature`); + case HttpStatusCode.Conflict: + // FIXME check if done ? + throw Error(`cannot be deleted`); + default: + assertUnreachable(resp); + } } async function handlePeerPullCreditWithdrawing( @@ -710,17 +708,13 @@ async function handlePeerPullCreditCreatePurse( econtract: econtractResp.econtract, }; - const exchangeClient = new TalerExchangeHttpClient( - pullIni.exchangeBaseUrl, - wex.http, - ); + const exchangeClient = walletExchangeClient(pullIni.exchangeBaseUrl, wex) logger.info(`reserve purse request: ${j2s(reservePurseReqBody)}`); const httpResp = await exchangeClient.createPurseFromReserve( mergeReserve.reservePub, - reservePurseReqBody, - wex.cancellationToken + reservePurseReqBody ) switch (httpResp.case) { @@ -879,11 +873,8 @@ async function processPeerPullCreditKycRequired( accountPub: mergeReserveInfo.reservePub, }); - const exchangeClient = new TalerExchangeHttpClient( - peerIni.exchangeBaseUrl, - wex.http - ) - const res = await exchangeClient.checkKycStatus(sigResp.sig, kycPayoHash, wex.cancellationToken); + const exchangeClient = walletExchangeClient(peerIni.exchangeBaseUrl, wex) + const res = await exchangeClient.checkKycStatus(sigResp.sig, kycPayoHash); switch (res.case) { case "ok": diff --git a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts @@ -29,7 +29,6 @@ import { ExchangePurseDeposits, HttpStatusCode, Logger, - NotificationType, ObservabilityEventType, PeerContractTerms, PreparePeerPullDebitRequest, @@ -39,7 +38,6 @@ import { TalerError, TalerErrorCode, TalerErrorDetail, - TalerExchangeHttpClient, TalerPreciseTimestamp, TalerProtocolViolationError, Transaction, @@ -53,7 +51,6 @@ import { assertUnreachable, checkDbInvariant, checkLogicInvariant, - codecForAny, codecForExchangeGetContractResponse, codecForPeerContractTerms, decodeCrock, @@ -65,16 +62,13 @@ import { parsePayPullUri, } from "@gnu-taler/taler-util"; import { - HttpResponse, readSuccessResponseJsonOrThrow, - readTalerErrorResponse, } from "@gnu-taler/taler-util/http"; import { PreviousPayCoins, selectPeerCoins } from "./coinSelection.js"; import { PendingTaskType, TaskIdStr, TaskRunResult, - TaskRunResultType, TransactionContext, TransitionResultType, cancelableFetch, @@ -107,7 +101,7 @@ import { isUnsuccessfulTransaction, parseTransactionIdentifier, } from "./transactions.js"; -import { WalletExecutionContext } from "./wallet.js"; +import { walletExchangeClient, WalletExecutionContext } from "./wallet.js"; const logger = new Logger("pay-peer-pull-debit.ts"); @@ -323,17 +317,16 @@ export class PeerPullDebitTransactionContext implements TransactionContext { async function handlePurseCreationConflict( ctx: PeerPullDebitTransactionContext, peerPullInc: PeerPullPaymentIncomingRecord, - resp: HttpResponse, + detail: TalerErrorDetail, ): Promise<TaskRunResult> { const ws = ctx.wex; - const errResp = await readTalerErrorResponse(resp); - if (errResp.code !== TalerErrorCode.EXCHANGE_GENERIC_INSUFFICIENT_FUNDS) { + if (detail.code !== TalerErrorCode.EXCHANGE_GENERIC_INSUFFICIENT_FUNDS) { await ctx.failTransaction(); return TaskRunResult.finished(); } // FIXME: Properly parse! - const brokenCoinPub = (errResp as any).coin_pub; + const brokenCoinPub = (detail as any).coin_pub; logger.trace(`excluded broken coin pub=${brokenCoinPub}`); if (!brokenCoinPub) { @@ -409,11 +402,8 @@ async function processPeerPullDebitDialogProposed( pullIni: PeerPullPaymentIncomingRecord, ): Promise<TaskRunResult> { const ctx = new PeerPullDebitTransactionContext(wex, pullIni.peerPullDebitId); - const exchangeClient = new TalerExchangeHttpClient(pullIni.exchangeBaseUrl, wex.http) - // FIXME: long poll queue again ? - const resp = await exchangeClient.getPurseStatusAtDeposit(pullIni.pursePub, wex.cancellationToken, { - timeout: 30000 - }); + const exchangeClient = walletExchangeClient(pullIni.exchangeBaseUrl, wex) + const resp = await exchangeClient.getPurseStatusAtDeposit(pullIni.pursePub, true); switch (resp.case) { case "ok": @@ -518,11 +508,7 @@ async function processPeerPullDebitPendingDeposit( return TaskRunResult.backoff(); } } - - const purseDepositUrl = new URL( - `purses/${pursePub}/deposit`, - peerPullInc.exchangeBaseUrl, - ); + const exchangeClient = walletExchangeClient(peerPullInc.exchangeBaseUrl, wex) // FIXME: We could skip batches that we've already submitted. @@ -552,21 +538,10 @@ async function processPeerPullDebitPendingDeposit( if (logger.shouldLogTrace()) { logger.trace(`purse deposit payload: ${j2s(depositPayload)}`); } - - const httpResp = await cancelableFetch(wex, purseDepositUrl, { - method: "POST", - body: depositPayload - }); - - switch (httpResp.status) { - case HttpStatusCode.Ok: { - const resp = await readSuccessResponseJsonOrThrow( - httpResp, - codecForAny(), - ); - logger.trace(`purse deposit response: ${j2s(resp)}`); + const resp = await exchangeClient.depositIntoPurse(pursePub, depositPayload); + switch (resp.case) { + case "ok": continue; - } case HttpStatusCode.Gone: { await ctx.abortTransaction( makeTalerErrorDetail( @@ -576,19 +551,18 @@ async function processPeerPullDebitPendingDeposit( ); return TaskRunResult.backoff(); } - case HttpStatusCode.Conflict: { - return handlePurseCreationConflict(ctx, peerPullInc, httpResp); - } - default: { - const errResp = await readTalerErrorResponse(httpResp); - return { - type: TaskRunResultType.Error, - errorDetail: errResp, - }; - } + case HttpStatusCode.Conflict: + return handlePurseCreationConflict(ctx, peerPullInc, resp.body); + case HttpStatusCode.Forbidden: + // FIXME: appropriated error code + throw Error("bad signature") + case HttpStatusCode.NotFound: + // FIXME: appropriated error code + throw Error("unknown peer pull debit") + default: + assertUnreachable(resp) } } - // All batches succeeded, we can transition! await recordTransitionStatus(ctx, PeerPullDebitRecordStatus.PendingDeposit, PeerPullDebitRecordStatus.Done); return TaskRunResult.finished(); @@ -846,8 +820,8 @@ export async function preparePeerPullDebit( pursePub: pursePub, }); - const exchangeClient = new TalerExchangeHttpClient(exchangeBaseUrl, wex.http) - const resp = await exchangeClient.getPurseStatusAtMerge(pursePub, wex.cancellationToken); + const exchangeClient = walletExchangeClient(exchangeBaseUrl, wex) + const resp = await exchangeClient.getPurseStatusAtMerge(pursePub); switch (resp.case) { case "ok": break; diff --git a/packages/taler-wallet-core/src/pay-peer-push-credit.ts b/packages/taler-wallet-core/src/pay-peer-push-credit.ts @@ -30,7 +30,6 @@ import { PreparePeerPushCreditRequest, PreparePeerPushCreditResponse, TalerErrorDetail, - TalerExchangeHttpClient, TalerPreciseTimestamp, Transaction, TransactionAction, @@ -111,7 +110,7 @@ import { notifyTransition, parseTransactionIdentifier, } from "./transactions.js"; -import { WalletExecutionContext } from "./wallet.js"; +import { walletExchangeClient, WalletExecutionContext } from "./wallet.js"; import { PerformCreateWithdrawalGroupResult, WithdrawTransactionContext, @@ -510,9 +509,8 @@ export async function preparePeerPushCredit( }); const contractTerms = codecForPeerContractTerms().decode(dec.contractTerms); - - const exchangeClient = new TalerExchangeHttpClient(exchangeBaseUrl, wex.http) - const resp = await exchangeClient.getPurseStatusAtDeposit(pursePub, wex.cancellationToken); + const exchangeClient = walletExchangeClient(exchangeBaseUrl, wex) + const resp = await exchangeClient.getPurseStatusAtDeposit(pursePub); switch (resp.case) { case "ok": break; @@ -772,10 +770,7 @@ async function handlePendingMerge( reservePriv: mergeReserveInfo.reservePriv, }); - const exchangeClient = new TalerExchangeHttpClient( - peerInc.exchangeBaseUrl, - wex.http, - ); + const exchangeClient = walletExchangeClient(peerInc.exchangeBaseUrl, wex) const mergeReq: ExchangePurseMergeRequest = { payto_uri: reservePayto, @@ -786,8 +781,7 @@ async function handlePendingMerge( const mergeResp = await exchangeClient.postPurseMerge( peerInc.pursePub, - mergeReq, - wex.cancellationToken, + mergeReq ); logger.trace(`merge request: ${j2s(mergeReq)}`); @@ -965,11 +959,8 @@ async function processPeerPushDebitDialogProposed( wex, pullIni.peerPushCreditId, ); - const exchangeClient = new TalerExchangeHttpClient(pullIni.exchangeBaseUrl, wex.http) - // FIXME: long poll queue again ? - const resp = await exchangeClient.getPurseStatusAtMerge(pullIni.pursePub, wex.cancellationToken, { - timeout: 30000 - }); + const exchangeClient = walletExchangeClient(pullIni.exchangeBaseUrl, wex); + const resp = await exchangeClient.getPurseStatusAtMerge(pullIni.pursePub, true); switch (resp.case) { case "ok": diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/pay-peer-push-debit.ts @@ -33,7 +33,6 @@ import { TalerError, TalerErrorCode, TalerErrorDetail, - TalerExchangeHttpClient, TalerPreciseTimestamp, TalerProtocolTimestamp, TalerProtocolViolationError, @@ -104,7 +103,7 @@ import { constructTransactionIdentifier, isUnsuccessfulTransaction, } from "./transactions.js"; -import { WalletExecutionContext } from "./wallet.js"; +import { walletExchangeClient, WalletExecutionContext } from "./wallet.js"; import { updateWithdrawalDenomsForCurrency } from "./withdraw.js"; const logger = new Logger("pay-peer-push-debit.ts"); @@ -739,11 +738,8 @@ async function processPeerPushDebitCreateReserve( } // All batches done! - logger.info("ALL DONE") - - const exchangeClient = new TalerExchangeHttpClient(peerPushInitiation.exchangeBaseUrl, wex.http) - const resp = await exchangeClient.getPurseStatusAtDeposit(pursePub, wex.cancellationToken); - logger.info(j2s(resp)); + const exchangeClient = walletExchangeClient(peerPushInitiation.exchangeBaseUrl, wex) + const resp = await exchangeClient.getPurseStatusAtDeposit(pursePub); switch (resp.case) { case "ok": await recordTransitionStatus(ctx, PeerPushDebitStatus.PendingCreatePurse, PeerPushDebitStatus.PendingReady) @@ -835,11 +831,8 @@ async function processPeerPushDebitReady( logger.trace("processing peer-push-debit pending(ready)"); const pursePub = peerPushInitiation.pursePub; const ctx = new PeerPushDebitTransactionContext(wex, pursePub); - const exchangeClient = new TalerExchangeHttpClient(peerPushInitiation.exchangeBaseUrl, wex.http) - // FIXME: long poll queue again ? - const resp = await exchangeClient.getPurseStatusAtMerge(pursePub, wex.cancellationToken, { - timeout: 30000 - }); + const exchangeClient = walletExchangeClient(peerPushInitiation.exchangeBaseUrl, wex) + const resp = await exchangeClient.getPurseStatusAtMerge(pursePub, true); switch (resp.case) { case "ok": { diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts @@ -92,6 +92,7 @@ import { ListGlobalCurrencyAuditorsResponse, ListGlobalCurrencyExchangesResponse, Logger, + LongpollQueue, NotificationType, ObservabilityContext, ObservabilityEventType, @@ -112,6 +113,7 @@ import { TalerBankIntegrationHttpClient, TalerError, TalerErrorCode, + TalerExchangeHttpClient, TalerProtocolTimestamp, TalerUriAction, TestingGetDenomStatsRequest, @@ -419,6 +421,14 @@ export interface WalletExecutionContext { readonly taskScheduler: TaskScheduler; } +export function walletExchangeClient(baseUrl: string, wex: WalletExecutionContext): TalerExchangeHttpClient { + return new TalerExchangeHttpClient(baseUrl, { + httpClient: wex.http, + cancelationToken: wex.cancellationToken, + longPollQueue: wex.ws.longpollQueue + }) +} + export const EXCHANGE_COINS_LOCK = "exchange-coins-lock"; export const EXCHANGE_RESERVES_LOCK = "exchange-reserves-lock"; @@ -2451,7 +2461,7 @@ async function dispatchWalletCoreApiRequest( wex = getObservedWalletExecutionContext(ws, cts.token, cts, oc); } else { oc = { - observe(evt) {}, + observe(evt) { }, }; wex = getNormalWalletExecutionContext(ws, cts.token, cts, oc); } @@ -2596,7 +2606,7 @@ export class Cache<T> { constructor( private maxCapacity: number, private cacheDuration: Duration, - ) {} + ) { } get(key: string): T | undefined { const r = this.map.get(key); @@ -2632,7 +2642,7 @@ export class Cache<T> { * Implementation of triggers for the wallet DB. */ class WalletDbTriggerSpec implements TriggerSpec { - constructor(public ws: InternalWalletState) {} + constructor(public ws: InternalWalletState) { } afterCommit(info: AfterCommitInfo): void { if (info.mode !== "readwrite") { @@ -2656,92 +2666,6 @@ class WalletDbTriggerSpec implements TriggerSpec { } } -type LongpollRunFn<T> = (timeoutMs: number) => Promise<T>; - -const PER_HOSTNAME_PERMITS: number = 5; - -class LongpollQueue { - private idCounter: number = 1; - private hostNameQueues: Map< - string, - { queue: (() => void)[]; permit: number } - > = new Map(); - // FIXME add an additional global semaphore - - constructor() {} - - async queue<T>( - cancellationToken: CancellationToken, - url: URL, - f: LongpollRunFn<T>, - ): Promise<T> { - const hostname = url.hostname; - const rid = this.idCounter++; - - const triggerNextLongpoll = () => { - logger.trace(`cleaning up after long-poll ${rid} to ${hostname}`); - const state = this.hostNameQueues.get(hostname); - if (state == null) { - return; - } - // Run pending task - const next = state.queue.shift(); - if (next != null) { - next(); - } else { - // Else release permit - state.permit++; - if (state.permit == PER_HOSTNAME_PERMITS) { - this.hostNameQueues.delete(url.hostname); - } - } - }; - - const doRunLongpoll: () => Promise<T> = async () => { - const st = this.hostNameQueues.get(hostname); - const numWaiting = st?.queue.length ?? 0; - logger.info( - `running long-poll ${rid} to ${hostname} with ${numWaiting} waiting`, - ); - try { - const timeoutMs = Math.round(Math.max(10000, 30000 / (numWaiting + 1))); - return await f(timeoutMs); - } finally { - triggerNextLongpoll(); - } - }; - - // Get or set per hostname queue - let state = this.hostNameQueues.get(hostname); - if (state == null) { - state = { - permit: PER_HOSTNAME_PERMITS, - queue: [], - }; - this.hostNameQueues.set(hostname, state); - } - - if (state.permit > 0) { - state.permit--; - this.hostNameQueues.set(hostname, state); - return doRunLongpoll(); - } else { - logger.info(`long-poll request ${rid} to ${hostname} queued`); - const promcap = openPromise<void>(); - state.queue.push(promcap.resolve); - try { - await cancellationToken.racePromise(promcap.promise); - } finally { - logger.info( - `long-poll request ${rid} to ${hostname} cancelled while queued`, - ); - triggerNextLongpoll(); - } - return doRunLongpoll(); - } - } -} - /** * Internal state of the wallet. * diff --git a/packages/web-util/src/context/exchange-api.ts b/packages/web-util/src/context/exchange-api.ts @@ -171,7 +171,7 @@ export const ExchangeApiProvider = ({ }); } - const {lib: unthrottledApi} = buildExchangeApiClient(baseUrl, evictors, !!preventCompression, true) + const { lib: unthrottledApi } = buildExchangeApiClient(baseUrl, evictors, !!preventCompression, true) const value: ExchangeContextType = { url: baseUrl, @@ -206,7 +206,7 @@ function buildExchangeApiClient( }, }); - const ex = new TalerExchangeHttpClient(url.href, httpLib, evictors.exchange, preventCompression); + const ex = new TalerExchangeHttpClient(url.href, { httpClient: httpLib, cacheEvictor: evictors.exchange, preventCompression }); async function getRemoteConfig(): Promise<KeysAndConfigType> { const configResp = await ex.getConfig();