commit bb88585c7cc80907acd4c811c049698b7c60be5f parent 3c04a0703f3d3a3b05c68d470539aa8f0b0b5dc7 Author: Antoine A <> Date: Thu, 24 Apr 2025 13:00:12 +0200 wallet-core: improve P2P logic and improve Node HTTP logging Diffstat:
11 files changed, 320 insertions(+), 238 deletions(-)
diff --git a/packages/taler-harness/src/integrationtests/test-peer-pull.ts b/packages/taler-harness/src/integrationtests/test-peer-pull.ts @@ -214,16 +214,12 @@ export async function runPeerPullTest(t: GlobalTestState) { }), ]); - const prepare1 = await wallet1.call( + const completed_purse = await t.assertThrowsTalerErrorAsync(wallet1.call( WalletApiOperation.PreparePeerPullDebit, { talerUri: tx.talerUri! } - ); - await wallet1.call(WalletApiOperation.TestingWaitTransactionState, { - transactionId: prepare1.transactionId, - txState: { - major: TransactionMajorState.Aborted, - }, - }); + )); + // FIXME this should fail with a proper error code + t.assertTrue(completed_purse.errorDetail.code === TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION); } t.logStep("P2P pull self"); @@ -281,17 +277,44 @@ export async function runPeerPullTest(t: GlobalTestState) { ]); await exchange.start() - - await Promise.all([wallet2, wallet3].map(w => - w.call(WalletApiOperation.TestingWaitTransactionsFinal, {})) - ); - const [tx2, tx3] = await Promise.all([wallet2.getTx(prepare2.transactionId), wallet3.getTx(prepare3.transactionId)]); - if (tx2.txState.major === TransactionMajorState.Done) { - t.assertTrue(tx3.txState.major === TransactionMajorState.Aborted) - } else { - t.assertTrue(tx3.txState.major === TransactionMajorState.Done) - t.assertTrue(tx2.txState.major === TransactionMajorState.Aborted) - } + await Promise.all([ + wallet1.call(WalletApiOperation.TestingWaitTransactionState, { + transactionId: tx.transactionId, + txState: { + major: TransactionMajorState.Done, + }, + }), + Promise.race([ + Promise.all([ + wallet2.call(WalletApiOperation.TestingWaitTransactionState, { + transactionId: prepare2.transactionId, + txState: { + major: TransactionMajorState.Done, + }, + }), + wallet3.call(WalletApiOperation.TestingWaitTransactionState, { + transactionId: prepare3.transactionId, + txState: { + major: TransactionMajorState.Aborted, + }, + }), + ]), + Promise.all([ + wallet2.call(WalletApiOperation.TestingWaitTransactionState, { + transactionId: prepare2.transactionId, + txState: { + major: TransactionMajorState.Aborted, + }, + }), + wallet3.call(WalletApiOperation.TestingWaitTransactionState, { + transactionId: prepare3.transactionId, + txState: { + major: TransactionMajorState.Done, + }, + }), + ]) + ]) + ]); } t.logStep("P2P pull abort"); @@ -334,12 +357,12 @@ export async function runPeerPullTest(t: GlobalTestState) { }), ]); - const unknown_contract = await t.assertThrowsTalerErrorAsync(wallet1.call( + const aborted_contract = await t.assertThrowsTalerErrorAsync(wallet1.call( WalletApiOperation.PreparePeerPullDebit, { talerUri: tx.talerUri! } )); // FIXME this should fail with a proper error code - t.assertTrue(unknown_contract.errorDetail.code === TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR); + t.assertTrue(aborted_contract.errorDetail.code === TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR); } t.logStep("P2P pull expire"); @@ -394,11 +417,11 @@ export async function runPeerPullTest(t: GlobalTestState) { }), ]); - const gone = await t.assertThrowsTalerErrorAsync(wallet1.call( + const expired_purse = await t.assertThrowsTalerErrorAsync(wallet1.call( WalletApiOperation.PreparePeerPullDebit, { talerUri: tx.talerUri! } )); - t.assertTrue(gone.errorDetail.code === TalerErrorCode.WALLET_PEER_PULL_DEBIT_PURSE_GONE); + t.assertTrue(expired_purse.errorDetail.code === TalerErrorCode.WALLET_PEER_PULL_DEBIT_PURSE_GONE); } } diff --git a/packages/taler-harness/src/integrationtests/test-peer-push.ts b/packages/taler-harness/src/integrationtests/test-peer-push.ts @@ -139,6 +139,13 @@ export async function runPeerPushTest(t: GlobalTestState) { ); // FIXME propagate the error correctly // t.assertTrue(ex1.errorDetail.code === TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE); + + const unknown_purse = await t.assertThrowsTalerErrorAsync(wallet1.call( + WalletApiOperation.PreparePeerPushCredit, + { talerUri: "taler+http://pay-push/localhost:8081/MQP1DP1J94ZZWNQS7TRDF1KJZ7V8H74CZF41V90FKXBPN5GNRN6G" } + )); + // FIXME this should fail with a proper error code + t.assertTrue(unknown_purse.errorDetail.code === TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR); } t.logStep("P2P push confirm"); diff --git a/packages/taler-util/src/http-client/exchange.ts b/packages/taler-util/src/http-client/exchange.ts @@ -55,6 +55,7 @@ import { ExchangeMergeConflictResponse, ExchangeMergeSuccessResponse, ExchangePurseMergeRequest, + ExchangePurseStatus, ExchangeReservePurseRequest, ExchangeVersionResponse, KycRequirementInformationId, @@ -70,6 +71,7 @@ import { codecForExchangeKeysResponse, codecForExchangeMergeConflictResponse, codecForExchangeMergeSuccessResponse, + codecForExchangePurseStatus, codecForExchangeTransferList, codecForKycProcessClientInformation, codecForKycProcessStartInformation, @@ -544,16 +546,62 @@ export class TalerExchangeHttpClient { * https://docs.taler.net/core/api-exchange.html#get--purses-$PURSE_PUB-merge * */ - async getPurseInfoAtMerge(): Promise<never> { - throw Error("not yet implemented"); + async getPurseStatusAtMerge( + pursePub: string, + cancellationToken: CancellationToken, + params: { + timeout?: number; + } = {}, + ): Promise< + OperationOk<ExchangePurseStatus> + | OperationFail<HttpStatusCode.NotFound> + | OperationFail<HttpStatusCode.Gone> + > { + const url = new URL(`purses/${pursePub}/merge`, this.baseUrl); + if (params.timeout !== undefined) { + url.searchParams.set("timeout_ms", String(params.timeout)); + } + const resp = await this.httpLib.fetch(url.href, { cancellationToken, logId: "purse status" }); + switch (resp.status) { + case HttpStatusCode.Ok: + return opSuccessFromHttp(resp, codecForExchangePurseStatus()); + case HttpStatusCode.Gone: + case HttpStatusCode.NotFound: + return opKnownHttpFailure(resp.status, resp); + default: + return opUnknownHttpFailure(resp); + } } /** * https://docs.taler.net/core/api-exchange.html#get--purses-$PURSE_PUB-deposit * */ - async getPurseInfoAtDeposit(): Promise<never> { - throw Error("not yet implemented"); + async getPurseStatusAtDeposit( + pursePub: string, + cancellationToken: CancellationToken, + params: { + timeout?: number; + } = {}, + ): Promise< + OperationOk<ExchangePurseStatus> + | OperationFail<HttpStatusCode.NotFound> + | OperationFail<HttpStatusCode.Gone> + > { + const url = new URL(`purses/${pursePub}/deposit`, this.baseUrl); + if (params.timeout !== undefined) { + url.searchParams.set("timeout_ms", String(params.timeout)); + } + const resp = await this.httpLib.fetch(url.href, { cancellationToken, logId: "purse status" }); + switch (resp.status) { + case HttpStatusCode.Ok: + return opSuccessFromHttp(resp, codecForExchangePurseStatus()); + case HttpStatusCode.Gone: + case HttpStatusCode.NotFound: + return opKnownHttpFailure(resp.status, resp); + default: + return opUnknownHttpFailure(resp); + } } /** diff --git a/packages/taler-util/src/http-common.ts b/packages/taler-util/src/http-common.ts @@ -1,6 +1,6 @@ /* This file is part of GNU Taler - (C) 2023 Taler Systems S.A. + (C) 2023-2025 Taler Systems S.A. GNU Taler is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -76,6 +76,9 @@ export interface HttpRequestOptions { * How to compress the payload */ compress?: "gzip" | "deflate"; + + /** Which id to use when logging */ + logId?: string } /** diff --git a/packages/taler-util/src/http-impl.node.ts b/packages/taler-util/src/http-impl.node.ts @@ -38,6 +38,7 @@ import { RequestThrottler, TalerErrorCode, URL, + j2s, typedArrayConcat, } from "./index.js"; @@ -72,6 +73,7 @@ export class HttpLibImpl implements HttpRequestLibrary { private throttle = new RequestThrottler(); private throttlingEnabled = true; private requireTls = false; + private idCounter: number = 1; constructor(args?: HttpLibArgs) { this.throttlingEnabled = args?.enableThrottling ?? true; @@ -87,8 +89,10 @@ export class HttpLibImpl implements HttpRequestLibrary { async fetch(url: string, opt?: HttpRequestOptions): Promise<HttpResponse> { const method = opt?.method?.toUpperCase() ?? "GET"; + const rid = this.idCounter++; + const logHeader = opt?.logId != null ? `request ${rid} ${opt.logId}` : `request ${rid}` - logger.trace(`Requesting ${method} ${url}`); + logger.trace(`${logHeader} ${method} ${url}`); const parsedUrl = new URL(url); if (this.throttlingEnabled && this.throttle.applyThrottle(url)) { @@ -99,7 +103,7 @@ export class HttpLibImpl implements HttpRequestLibrary { requestUrl: url, throttleStats: this.throttle.getThrottleStats(url), }, - `request to origin ${parsedUrl.origin} was throttled`, + `request ${rid} to ${parsedUrl.origin} was throttled`, ); } if (this.requireTls && parsedUrl.protocol !== "https:") { @@ -109,7 +113,7 @@ export class HttpLibImpl implements HttpRequestLibrary { requestMethod: method, requestUrl: url, }, - `request to ${parsedUrl.origin} is not possible with protocol ${parsedUrl.protocol}`, + `request ${rid} to ${parsedUrl.origin} is not possible with protocol ${parsedUrl.protocol}`, ); } let timeoutMs: number | undefined; @@ -126,7 +130,7 @@ export class HttpLibImpl implements HttpRequestLibrary { requestHeadersMap[key] = value; }); } - logger.trace(`request timeout ${timeoutMs} ms`); + logger.trace(`request ${rid} timeout ${timeoutMs} ms`); let reqBody: ArrayBuffer | undefined; @@ -232,13 +236,21 @@ export class HttpLibImpl implements HttpRequestLibrary { }, json() { const text = textDecoder.decode(data); - return JSON.parse(text); + const json = JSON.parse(text) + if (logger.shouldLogTrace()) { + logger.trace(`${logHeader} JSON: ${j2s(json)}`); + } + return json; }, async text() { const text = textDecoder.decode(data); + if (logger.shouldLogTrace()) { + logger.trace(`${logHeader} TEXT: ${text}`); + } return text; }, }; + logger.trace(`${logHeader} status code ${resp.status}`); doCleanup(); if (SHOW_CURL_HTTP_REQUEST) { console.log(`TALER_API_DEBUG: ${res.statusCode} ${textDecoder.decode(data)}`) diff --git a/packages/taler-util/src/types-taler-exchange.ts b/packages/taler-util/src/types-taler-exchange.ts @@ -1,6 +1,6 @@ /* This file is part of GNU Taler - (C) 2024 Taler Systems S.A. + (C) 2024-2025 Taler Systems S.A. GNU Taler is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -1487,6 +1487,33 @@ export const codecForBatchDepositSuccess = (): Codec<BatchDepositSuccess> => .property("transaction_base_url", codecOptional(codecForString())) .build("BatchDepositSuccess"); +export interface ExchangePurseStatus { + // Total amount deposited into the purse so far. + // If 'total_deposit_amount' minus 'deposit_fees' + // exceeds 'merge_value_after_fees', and a + // 'merge_request' exists for the purse, then the + // purse will (have been) merged with the account. + balance: AmountString; + + // Time of the merge, missing if "never". + deposit_timestamp?: TalerProtocolTimestamp; + + // Time of the deposits being complete, missing if "never". + // Note that this time may not be "stable": once sufficient + // deposits have been made, is "now" before the purse + // expiration, and otherwise set to the purse expiration. + // However, this should also not be relied upon. The key + // property is that it is either "never" or in the past. + merge_timestamp?: TalerProtocolTimestamp; +} + +export const codecForExchangePurseStatus = (): Codec<ExchangePurseStatus> => + buildCodecForObject<ExchangePurseStatus>() + .property("balance", codecForAmountString()) + .property("deposit_timestamp", codecOptional(codecForTimestamp)) + .property("merge_timestamp", codecOptional(codecForTimestamp)) + .build("ExchangePurseStatus"); + export interface TrackTransactionWired { // Raw wire transfer identifier of the deposit. wtid: Base32String; diff --git a/packages/taler-wallet-core/src/pay-peer-common.ts b/packages/taler-wallet-core/src/pay-peer-common.ts @@ -1,6 +1,7 @@ /* This file is part of GNU Taler (C) 2022 GNUnet e.V. + (C) 2025 Taler Systems S.A. GNU Taler is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -14,21 +15,13 @@ GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -/** - * Imports. - */ import { AmountJson, - AmountString, Amounts, - Codec, + ExchangePurseStatus, SelectedProspectiveCoin, TalerProtocolTimestamp, - buildCodecForObject, checkDbInvariant, - codecForAmountString, - codecForTimestamp, - codecOptional, } from "@gnu-taler/taler-util"; import { SpendCoinDetails } from "./crypto/cryptoImplementation.js"; import { @@ -123,19 +116,6 @@ export async function getTotalPeerPaymentCost( ); } -interface ExchangePurseStatus { - balance: AmountString; - deposit_timestamp?: TalerProtocolTimestamp; - merge_timestamp?: TalerProtocolTimestamp; -} - -export const codecForExchangePurseStatus = (): Codec<ExchangePurseStatus> => - buildCodecForObject<ExchangePurseStatus>() - .property("balance", codecForAmountString()) - .property("deposit_timestamp", codecOptional(codecForTimestamp)) - .property("merge_timestamp", codecOptional(codecForTimestamp)) - .build("ExchangePurseStatus"); - export async function getMergeReserveInfo( wex: WalletExecutionContext, req: { @@ -177,3 +157,17 @@ export async function getMergeReserveInfo( return mergeReserveRecord; } + +/** Check if a purse is merged */ +export function isPurseMerged(purse: ExchangePurseStatus): boolean { + const mergeTimestamp = purse.merge_timestamp; + return mergeTimestamp != null && + !TalerProtocolTimestamp.isNever(mergeTimestamp) +} + +/** Check if a purse is deposited */ +export function isPurseDeposited(purse: ExchangePurseStatus): boolean { + const depositTimestamp = purse.deposit_timestamp; + return depositTimestamp != null && + !TalerProtocolTimestamp.isNever(depositTimestamp) +} diff --git a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts @@ -33,7 +33,6 @@ import { TalerErrorDetail, TalerExchangeHttpClient, TalerPreciseTimestamp, - TalerProtocolTimestamp, TalerUriAction, Transaction, TransactionAction, @@ -46,7 +45,6 @@ import { WalletNotification, assertUnreachable, checkDbInvariant, - codecForAccountKycStatus, encodeCrock, getRandomBytes, j2s, @@ -55,10 +53,6 @@ import { talerPaytoFromExchangeReserve, } from "@gnu-taler/taler-util"; import { - readResponseJsonOrThrow, - readSuccessResponseJsonOrThrow, -} from "@gnu-taler/taler-util/http"; -import { PendingTaskType, TaskIdStr, TaskIdentifiers, @@ -95,8 +89,8 @@ import { } from "./exchanges.js"; import { checkPeerCreditHardLimitExceeded } from "./kyc.js"; import { - codecForExchangePurseStatus, getMergeReserveInfo, + isPurseDeposited, } from "./pay-peer-common.js"; import { BalanceEffect, @@ -587,39 +581,28 @@ async function queryPurseForPeerPullCredit( wex: WalletExecutionContext, pullIni: PeerPullCreditRecord, ): Promise<TaskRunResult> { - const purseDepositUrl = new URL( - `purses/${pullIni.pursePub}/deposit`, - pullIni.exchangeBaseUrl, - ); - logger.info(`querying purse status via ${purseDepositUrl.href}`); - const resp = await cancelableLongPoll(wex, purseDepositUrl, { - timeout: { d_ms: 60000 }, - }); const ctx = new PeerPullCreditTransactionContext(wex, pullIni.pursePub); + const exchangeClient = new TalerExchangeHttpClient(pullIni.exchangeBaseUrl, wex.http) + // FIXME: long poll queue again ? + const resp = await exchangeClient.getPurseStatusAtDeposit(pullIni.pursePub, wex.cancellationToken, { + timeout: 30000 + }); - logger.info(`purse status code: HTTP ${resp.status}`); - - switch (resp.status) { - case HttpStatusCode.Gone: { + switch (resp.case) { + case "ok": + break; + case HttpStatusCode.Gone: // Exchange says that purse doesn't exist anymore => expired! await ctx.transitionStatus(PeerPullPaymentCreditStatus.PendingReady, PeerPullPaymentCreditStatus.Expired); return TaskRunResult.finished(); - } case HttpStatusCode.NotFound: // FIXME: Maybe check error code? 404 could also mean something else. return TaskRunResult.longpollReturnedPending(); + default: + assertUnreachable(resp) } - const result = await readSuccessResponseJsonOrThrow( - resp, - codecForExchangePurseStatus(), - ); - - logger.trace(`purse status: ${j2s(result)}`); - - const depositTimestamp = result.deposit_timestamp; - - if (!depositTimestamp || TalerProtocolTimestamp.isNever(depositTimestamp)) { + if (!isPurseDeposited(resp.body)) { logger.info("purse not ready yet (no deposit)"); return TaskRunResult.longpollReturnedPending(); } diff --git a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts @@ -39,8 +39,8 @@ import { TalerError, TalerErrorCode, TalerErrorDetail, + TalerExchangeHttpClient, TalerPreciseTimestamp, - TalerProtocolTimestamp, TalerProtocolViolationError, Transaction, TransactionAction, @@ -78,7 +78,6 @@ import { TransactionContext, TransitionResultType, cancelableFetch, - cancelableLongPoll, constructTaskIdentifier, spendCoins, } from "./common.js"; @@ -94,8 +93,8 @@ import { } from "./db.js"; import { getExchangeScopeInfo, getScopeForAllExchanges } from "./exchanges.js"; import { - codecForExchangePurseStatus, getTotalPeerPaymentCost, + isPurseDeposited, queryCoinInfosForSelection, } from "./pay-peer-common.js"; import { createRefreshGroup } from "./refresh.js"; @@ -519,37 +518,28 @@ async function processPeerPullDebitDialogProposed( wex: WalletExecutionContext, pullIni: PeerPullPaymentIncomingRecord, ): Promise<TaskRunResult> { - const purseDepositUrl = new URL( - `purses/${pullIni.pursePub}/deposit`, - pullIni.exchangeBaseUrl, - ); - logger.info(`querying purse status via ${purseDepositUrl.href}`); - const resp = await cancelableLongPoll(wex, purseDepositUrl); const ctx = new PeerPullDebitTransactionContext(wex, pullIni.peerPullDebitId); + const exchangeClient = new TalerExchangeHttpClient(pullIni.exchangeBaseUrl, wex.http) + // FIXME: long poll queue again ? + const resp = await exchangeClient.getPurseStatusAtDeposit(pullIni.pursePub, wex.cancellationToken, { + timeout: 30000 + }); - logger.info(`purse status code: HTTP ${resp.status}`); - - switch (resp.status) { - case HttpStatusCode.Gone: { + switch (resp.case) { + case "ok": + break; + case HttpStatusCode.Gone: // Exchange says that purse doesn't exist anymore => expired! await ctx.transitionStatus(PeerPullDebitRecordStatus.DialogProposed, PeerPullDebitRecordStatus.Aborted); return TaskRunResult.finished(); - } case HttpStatusCode.NotFound: // FIXME: Maybe check error code? 404 could also mean something else. return TaskRunResult.longpollReturnedPending(); + default: + assertUnreachable(resp) } - const result = await readSuccessResponseJsonOrThrow( - resp, - codecForExchangePurseStatus(), - ); - - logger.trace(`purse status: ${j2s(result)}`); - - const depositTimestamp = result.deposit_timestamp; - - if (depositTimestamp != null && !TalerProtocolTimestamp.isNever(depositTimestamp)) { + if (isPurseDeposited(resp.body)) { logger.info("purse completed by another wallet"); await ctx.transitionStatus(PeerPullDebitRecordStatus.DialogProposed, PeerPullDebitRecordStatus.Aborted); return TaskRunResult.finished(); @@ -966,23 +956,28 @@ export async function preparePeerPullDebit( pursePub: pursePub, }); - const getPurseUrl = new URL(`purses/${pursePub}/merge`, exchangeBaseUrl); - - const purseHttpResp = await cancelableFetch(wex, getPurseUrl); - - if (purseHttpResp.status == HttpStatusCode.Gone) { - throw TalerError.fromDetail( - TalerErrorCode.WALLET_PEER_PULL_DEBIT_PURSE_GONE, - {}, - ); + const exchangeClient = new TalerExchangeHttpClient(exchangeBaseUrl, wex.http) + const resp = await exchangeClient.getPurseStatusAtMerge(pursePub, wex.cancellationToken); + switch (resp.case) { + case "ok": + break; + case HttpStatusCode.Gone: + throw TalerError.fromDetail( + TalerErrorCode.WALLET_PEER_PULL_DEBIT_PURSE_GONE, + {}, + ); + case HttpStatusCode.NotFound: + // FIXME: appropriated error code + throw Error("unknown peer pull debit") + default: + assertUnreachable(resp) } - const purseStatus = await readSuccessResponseJsonOrThrow( - purseHttpResp, - codecForExchangePurseStatus(), - ); - - // FIXME: throw if already completed + if (isPurseDeposited(resp.body)) { + logger.info("purse completed by another wallet"); + // FIXME: appropriated error code + throw Error("peer pull debit already completed") + } const peerPullDebitId = encodeCrock(getRandomBytes(32)); diff --git a/packages/taler-wallet-core/src/pay-peer-push-credit.ts b/packages/taler-wallet-core/src/pay-peer-push-credit.ts @@ -32,7 +32,6 @@ import { TalerErrorDetail, TalerExchangeHttpClient, TalerPreciseTimestamp, - TalerProtocolTimestamp, Transaction, TransactionAction, TransactionIdStr, @@ -98,8 +97,8 @@ import { getPeerCreditLimitInfo, } from "./kyc.js"; import { - codecForExchangePurseStatus, getMergeReserveInfo, + isPurseMerged, } from "./pay-peer-common.js"; import { BalanceEffect, @@ -606,14 +605,22 @@ export async function preparePeerPushCredit( const contractTerms = codecForPeerContractTerms().decode(dec.contractTerms); - const getPurseUrl = new URL(`purses/${pursePub}/deposit`, exchangeBaseUrl); + const exchangeClient = new TalerExchangeHttpClient(exchangeBaseUrl, wex.http) + const resp = await exchangeClient.getPurseStatusAtDeposit(pursePub, wex.cancellationToken); + switch (resp.case) { + case "ok": + break; + case HttpStatusCode.Gone: + // FIXME: appropriated error code + throw Error("aborted peer push credit") + case HttpStatusCode.NotFound: + // FIXME: appropriated error code + throw Error("unknown peer push credit") + default: + assertUnreachable(resp) + } - const purseHttpResp = await cancelableFetch(wex, getPurseUrl); - - const purseStatus = await readSuccessResponseJsonOrThrow( - purseHttpResp, - codecForExchangePurseStatus(), - ); + const purseStatus = resp.body; logger.info( `peer push credit, purse balance ${purseStatus.balance}, contract amount ${contractTerms.amount}`, @@ -1069,46 +1076,34 @@ async function processPeerPushDebitDialogProposed( wex: WalletExecutionContext, pullIni: PeerPushPaymentIncomingRecord, ): Promise<TaskRunResult> { - const purseDepositUrl = new URL( - `purses/${pullIni.pursePub}/merge`, - pullIni.exchangeBaseUrl, - ); - logger.info(`querying purse status via ${purseDepositUrl.href}`); - const resp = await cancelableLongPoll(wex, purseDepositUrl); const ctx = new PeerPushCreditTransactionContext( wex, pullIni.peerPushCreditId, ); + const exchangeClient = new TalerExchangeHttpClient(pullIni.exchangeBaseUrl, wex.http) + // FIXME: long poll queue again ? + const resp = await exchangeClient.getPurseStatusAtMerge(pullIni.pursePub, wex.cancellationToken, { + timeout: 30000 + }); - logger.info(`purse status code: HTTP ${resp.status}`); - - switch (resp.status) { - case HttpStatusCode.Gone: { + switch (resp.case) { + case "ok": + break; + case HttpStatusCode.Gone: // Exchange says that purse doesn't exist anymore => expired! await ctx.transitionStatus( PeerPushCreditStatus.DialogProposed, PeerPushCreditStatus.Aborted, ); return TaskRunResult.finished(); - } case HttpStatusCode.NotFound: // FIXME: Maybe check error code? 404 could also mean something else. return TaskRunResult.longpollReturnedPending(); + default: + assertUnreachable(resp) } - const result = await readSuccessResponseJsonOrThrow( - resp, - codecForExchangePurseStatus(), - ); - - logger.trace(`purse status: ${j2s(result)}`); - - const mergeTimestamp = result.merge_timestamp; - - if ( - mergeTimestamp != null && - !TalerProtocolTimestamp.isNever(mergeTimestamp) - ) { + if (isPurseMerged(resp.body)) { logger.info("purse completed by another wallet"); await ctx.transitionStatus( PeerPushCreditStatus.DialogProposed, diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/pay-peer-push-debit.ts @@ -34,6 +34,7 @@ import { TalerError, TalerErrorCode, TalerErrorDetail, + TalerExchangeHttpClient, TalerPreciseTimestamp, TalerProtocolTimestamp, TalerProtocolViolationError, @@ -54,7 +55,6 @@ import { } from "@gnu-taler/taler-util"; import { HttpResponse, - readSuccessResponseJsonOrThrow, readTalerErrorResponse, } from "@gnu-taler/taler-util/http"; import { @@ -70,7 +70,6 @@ import { TransactionContext, TransitionResultType, cancelableFetch, - cancelableLongPoll, constructTaskIdentifier, runWithClientCancellation, spendCoins, @@ -92,9 +91,9 @@ import { getScopeForAllExchanges, } from "./exchanges.js"; import { - codecForExchangePurseStatus, getTotalPeerPaymentCost, getTotalPeerPaymentCostInTx, + isPurseMerged, queryCoinInfosForSelection, } from "./pay-peer-common.js"; import { createRefreshGroup } from "./refresh.js"; @@ -854,26 +853,24 @@ async function processPeerPushDebitCreateReserve( } // All batches done! + logger.info("ALL DONE") - const getPurseUrl = new URL( - `purses/${pursePub}/deposit`, - peerPushInitiation.exchangeBaseUrl, - ); - - const purseHttpResp = await cancelableFetch(wex, getPurseUrl); - - const purseStatus = await readSuccessResponseJsonOrThrow( - purseHttpResp, - codecForExchangePurseStatus(), - ); - - if (logger.shouldLogTrace()) { - logger.trace(`purse status: ${j2s(purseStatus)}`); + const exchangeClient = new TalerExchangeHttpClient(peerPushInitiation.exchangeBaseUrl, wex.http) + const resp = await exchangeClient.getPurseStatusAtDeposit(pursePub, wex.cancellationToken); + logger.info(j2s(resp)); + switch (resp.case) { + case "ok": + await ctx.transitionStatus(PeerPushDebitStatus.PendingCreatePurse, PeerPushDebitStatus.PendingReady) + return TaskRunResult.progress() + case HttpStatusCode.Gone: + // FIXME we need PeerPushDebitStatus.ExpiredDeletePurse + await ctx.transitionStatus(PeerPushDebitStatus.PendingCreatePurse, PeerPushDebitStatus.AbortingDeletePurse) + return TaskRunResult.progress() + case HttpStatusCode.NotFound: + throw Error("peer push credit disappeared"); + default: + assertUnreachable(resp) } - - await ctx.transitionStatus(PeerPushDebitStatus.PendingCreatePurse, PeerPushDebitStatus.PendingReady) - - return TaskRunResult.backoff(); } async function processPeerPushDebitAbortingDeletePurse( @@ -951,69 +948,67 @@ async function processPeerPushDebitReady( logger.trace("processing peer-push-debit pending(ready)"); const pursePub = peerPushInitiation.pursePub; const ctx = new PeerPushDebitTransactionContext(wex, pursePub); - const mergeUrl = new URL( - `purses/${pursePub}/merge`, - peerPushInitiation.exchangeBaseUrl, - ); - logger.info(`long-polling on purse status at ${mergeUrl.href}`); - const resp = await cancelableLongPoll(wex, mergeUrl); - if (resp.status === HttpStatusCode.Ok) { - const purseStatus = await readSuccessResponseJsonOrThrow( - resp, - codecForExchangePurseStatus(), - ); - const mergeTimestamp = purseStatus.merge_timestamp; - logger.info(`got purse status ${j2s(purseStatus)}`); - if (!mergeTimestamp || TalerProtocolTimestamp.isNever(mergeTimestamp)) { - return TaskRunResult.backoff(); - } else { - await ctx.transitionStatus(PeerPushDebitStatus.PendingReady, PeerPushDebitStatus.Done); - return TaskRunResult.progress(); - } - } else if (resp.status === HttpStatusCode.Gone) { - logger.info(`purse ${pursePub} is gone, aborting peer-push-debit`); - await ctx.transition({ - extraStores: [ - "coinAvailability", - "coinHistory", - "coins", - "denominations", - "refreshGroups", - "refreshSessions", - ] - }, async (rec, tx) => { - if (rec.status !== PeerPushDebitStatus.PendingReady) { - return TransitionResultType.Stay; + const exchangeClient = new TalerExchangeHttpClient(peerPushInitiation.exchangeBaseUrl, wex.http) + // FIXME: long poll queue again ? + const resp = await exchangeClient.getPurseStatusAtMerge(pursePub, wex.cancellationToken, { + timeout: 30000 + }); + console.log(j2s("RESERVE")) + + switch (resp.case) { + case "ok": { + if (!isPurseMerged(resp.body)) { + return TaskRunResult.longpollReturnedPending(); + } else { + await ctx.transitionStatus(PeerPushDebitStatus.PendingReady, PeerPushDebitStatus.Done); + return TaskRunResult.progress(); } - const currency = Amounts.currencyOf(rec.amount); - const coinPubs: CoinRefreshRequest[] = []; - - if (rec.coinSel) { - for (let i = 0; i < rec.coinSel.coinPubs.length; i++) { - coinPubs.push({ - amount: rec.coinSel.contributions[i], - coinPub: rec.coinSel.coinPubs[i], - }); + } + case HttpStatusCode.Gone: + logger.info(`purse ${pursePub} is gone, aborting peer-push-debit`); + await ctx.transition({ + extraStores: [ + "coinAvailability", + "coinHistory", + "coins", + "denominations", + "refreshGroups", + "refreshSessions", + ] + }, async (rec, tx) => { + if (rec.status !== PeerPushDebitStatus.PendingReady) { + return TransitionResultType.Stay; } + const currency = Amounts.currencyOf(rec.amount); + const coinPubs: CoinRefreshRequest[] = []; + + if (rec.coinSel) { + for (let i = 0; i < rec.coinSel.coinPubs.length; i++) { + coinPubs.push({ + amount: rec.coinSel.contributions[i], + coinPub: rec.coinSel.coinPubs[i], + }); + } - const refresh = await createRefreshGroup( - wex, - tx, - currency, - coinPubs, - RefreshReason.AbortPeerPushDebit, - ctx.transactionId, - ); + const refresh = await createRefreshGroup( + wex, + tx, + currency, + coinPubs, + RefreshReason.AbortPeerPushDebit, + ctx.transactionId, + ); - rec.abortRefreshGroupId = refresh.refreshGroupId; - } - rec.status = PeerPushDebitStatus.Aborted; - return TransitionResultType.Transition - }) - return TaskRunResult.backoff(); - } else { - logger.warn(`unexpected HTTP status for purse: ${resp.status}`); - return TaskRunResult.longpollReturnedPending(); + rec.abortRefreshGroupId = refresh.refreshGroupId; + } + rec.status = PeerPushDebitStatus.Aborted; + return TransitionResultType.Transition + }) + return TaskRunResult.backoff(); + case HttpStatusCode.NotFound: + throw Error("peer push credit disappeared"); + default: + assertUnreachable(resp) } }