commit 2d93886a7506296470f7328b5c3742133b17c33f
parent 3305c83762e324a008f1b6b09796fe61d7fe0e9a
Author: Antoine A <>
Date: Thu, 17 Apr 2025 11:52:57 +0200
wallet-core: improve pay-peer-pull
Diffstat:
3 files changed, 133 insertions(+), 25 deletions(-)
diff --git a/packages/taler-harness/src/integrationtests/test-peer-pull.ts b/packages/taler-harness/src/integrationtests/test-peer-pull.ts
@@ -138,12 +138,18 @@ export async function runPeerPullTest(t: GlobalTestState) {
t.logStep("P2P pull errors");
{
const tx = await init_peer_pull_credit(wallet1, "confirm", "TESTKUDOS:1000");
-
- const e = await t.assertThrowsTalerErrorAsync(wallet1.call(
+ const insufficient_balance = await t.assertThrowsTalerErrorAsync(wallet1.call(
WalletApiOperation.PreparePeerPullDebit,
{ talerUri: tx.talerUri! }
));
- t.assertTrue(e.errorDetail.code === TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE);
+ t.assertTrue(insufficient_balance.errorDetail.code === TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE);
+
+ const unknown_purse = await t.assertThrowsTalerErrorAsync(wallet1.call(
+ WalletApiOperation.PreparePeerPullDebit,
+ { talerUri: "taler+http://pay-pull/localhost:8081/MQP1DP1J94ZZWNQS7TRDF1KJZ7V8H74CZF41V90FKXBPN5GNRN6G" }
+ ));
+ // FIXME this should fail with a proper error code
+ t.assertTrue(unknown_purse.errorDetail.code === TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR);
}
t.logStep("P2P pull confirm");
@@ -200,15 +206,24 @@ export async function runPeerPullTest(t: GlobalTestState) {
major: TransactionMajorState.Aborted,
},
}),
- // FIXME should be aborted
wallet4.call(WalletApiOperation.TestingWaitTransactionState, {
transactionId: prepare4.transactionId,
txState: {
- major: TransactionMajorState.Dialog,
- minor: TransactionMinorState.Proposed
+ major: TransactionMajorState.Aborted,
},
}),
]);
+
+ const prepare1 = await wallet1.call(
+ WalletApiOperation.PreparePeerPullDebit,
+ { talerUri: tx.talerUri! }
+ );
+ await wallet1.call(WalletApiOperation.TestingWaitTransactionState, {
+ transactionId: prepare1.transactionId,
+ txState: {
+ major: TransactionMajorState.Aborted,
+ },
+ });
}
t.logStep("P2P pull self");
@@ -235,6 +250,12 @@ export async function runPeerPullTest(t: GlobalTestState) {
},
}),
]);
+
+ // Check scan after completion
+ const idempotent = await wallet1.call(WalletApiOperation.PreparePeerPullDebit,
+ { talerUri: tx.talerUri! }
+ );
+ t.assertTrue(prepare.transactionId === idempotent.transactionId);
}
t.logStep("P2P pull conflict");
@@ -305,18 +326,23 @@ export async function runPeerPullTest(t: GlobalTestState) {
major: TransactionMajorState.Aborted,
},
}),
- // FIXME should be aborted
wallet3.call(WalletApiOperation.TestingWaitTransactionState, {
transactionId: prepare3.transactionId,
txState: {
- major: TransactionMajorState.Dialog,
- minor: TransactionMinorState.Proposed
+ major: TransactionMajorState.Aborted
},
}),
]);
+
+ const unknown_contract = await t.assertThrowsTalerErrorAsync(wallet1.call(
+ WalletApiOperation.PreparePeerPullDebit,
+ { talerUri: tx.talerUri! }
+ ));
+ // FIXME this should fail with a proper error code
+ t.assertTrue(unknown_contract.errorDetail.code === TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR);
}
- t.logStep("P2P push expire");
+ t.logStep("P2P pull expire");
{
const tx = await init_peer_pull_credit(wallet1, "expire");
@@ -360,15 +386,19 @@ export async function runPeerPullTest(t: GlobalTestState) {
major: TransactionMajorState.Aborted,
},
}),
- // FIXME should be aborted
wallet3.call(WalletApiOperation.TestingWaitTransactionState, {
transactionId: prepare3.transactionId,
txState: {
- major: TransactionMajorState.Dialog,
- minor: TransactionMinorState.Proposed
+ major: TransactionMajorState.Aborted,
},
}),
]);
+
+ const gone = await t.assertThrowsTalerErrorAsync(wallet1.call(
+ WalletApiOperation.PreparePeerPullDebit,
+ { talerUri: tx.talerUri! }
+ ));
+ t.assertTrue(gone.errorDetail.code === TalerErrorCode.WALLET_PEER_PULL_DEBIT_PURSE_GONE);
}
}
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
@@ -616,7 +616,7 @@ async function queryPurseForPeerPullCredit(
case HttpStatusCode.Gone: {
// Exchange says that purse doesn't exist anymore => expired!
await ctx.transitionStatus(PeerPullPaymentCreditStatus.PendingReady, PeerPullPaymentCreditStatus.Expired);
- return TaskRunResult.backoff();
+ return TaskRunResult.finished();
}
case HttpStatusCode.NotFound:
// FIXME: Maybe check error code? 404 could also mean something else.
@@ -634,7 +634,7 @@ async function queryPurseForPeerPullCredit(
if (!depositTimestamp || TalerProtocolTimestamp.isNever(depositTimestamp)) {
logger.info("purse not ready yet (no deposit)");
- return TaskRunResult.backoff();
+ return TaskRunResult.longpollReturnedPending();
}
const reserve = await wex.db.runReadOnlyTx(
@@ -663,7 +663,7 @@ async function queryPurseForPeerPullCredit(
},
});
await ctx.transitionStatus(PeerPullPaymentCreditStatus.PendingReady, PeerPullPaymentCreditStatus.PendingWithdrawing);
- return TaskRunResult.backoff();
+ return TaskRunResult.progress();
}
async function longpollKycStatus(
@@ -956,12 +956,10 @@ export async function processPeerPullCredit(
case PeerPullPaymentCreditStatus.SuspendedReady:
case PeerPullPaymentCreditStatus.SuspendedWithdrawing:
case PeerPullPaymentCreditStatus.SuspendedBalanceKycInit:
- break;
+ return TaskRunResult.finished();
default:
assertUnreachable(pullIni.status);
}
-
- return TaskRunResult.finished();
}
async function processPeerPullCreditBalanceKyc(
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
@@ -40,6 +40,7 @@ import {
TalerErrorCode,
TalerErrorDetail,
TalerPreciseTimestamp,
+ TalerProtocolTimestamp,
TalerProtocolViolationError,
Transaction,
TransactionAction,
@@ -515,6 +516,58 @@ async function handlePurseCreationConflict(
return TaskRunResult.backoff();
}
+async function processPeerPullDebitDialogProposed(
+ wex: WalletExecutionContext,
+ pullIni: PeerPullPaymentIncomingRecord,
+): Promise<TaskRunResult> {
+ const purseDepositUrl = new URL(
+ `purses/${pullIni.pursePub}/deposit`,
+ pullIni.exchangeBaseUrl,
+ );
+ logger.info(`querying purse status via ${purseDepositUrl.href}`);
+ const resp = await wex.ws.runLongpollQueueing(
+ wex.cancellationToken,
+ purseDepositUrl.hostname,
+ async (timeoutMs) => {
+ purseDepositUrl.searchParams.set("timeout_ms", `${timeoutMs}`);
+ return await wex.http.fetch(purseDepositUrl.href, {
+ cancellationToken: wex.cancellationToken,
+ });
+ },
+ );
+ const ctx = new PeerPullDebitTransactionContext(wex, pullIni.peerPullDebitId);
+
+ logger.info(`purse status code: HTTP ${resp.status}`);
+
+ switch (resp.status) {
+ case HttpStatusCode.Gone: {
+ // Exchange says that purse doesn't exist anymore => expired!
+ await ctx.transitionStatus(PeerPullDebitRecordStatus.DialogProposed, PeerPullDebitRecordStatus.Aborted);
+ return TaskRunResult.finished();
+ }
+ case HttpStatusCode.NotFound:
+ // FIXME: Maybe check error code? 404 could also mean something else.
+ return TaskRunResult.longpollReturnedPending();
+ }
+
+ const result = await readSuccessResponseJsonOrThrow(
+ resp,
+ codecForExchangePurseStatus(),
+ );
+
+ logger.trace(`purse status: ${j2s(result)}`);
+
+ const depositTimestamp = result.deposit_timestamp;
+
+ if (depositTimestamp != null && !TalerProtocolTimestamp.isNever(depositTimestamp)) {
+ logger.info("purse completed by another wallet");
+ await ctx.transitionStatus(PeerPullDebitRecordStatus.DialogProposed, PeerPullDebitRecordStatus.Aborted);
+ return TaskRunResult.finished();
+ }
+
+ return TaskRunResult.longpollReturnedPending();
+}
+
async function processPeerPullDebitPendingDeposit(
wex: WalletExecutionContext,
peerPullInc: PeerPullPaymentIncomingRecord,
@@ -723,12 +776,21 @@ export async function processPeerPullDebit(
}
switch (peerPullInc.status) {
+ case PeerPullDebitRecordStatus.DialogProposed:
+ return processPeerPullDebitDialogProposed(wex, peerPullInc)
case PeerPullDebitRecordStatus.PendingDeposit:
- return await processPeerPullDebitPendingDeposit(wex, peerPullInc);
+ return processPeerPullDebitPendingDeposit(wex, peerPullInc);
case PeerPullDebitRecordStatus.AbortingRefresh:
- return await processPeerPullDebitAbortingRefresh(wex, peerPullInc);
+ return processPeerPullDebitAbortingRefresh(wex, peerPullInc);
+ case PeerPullDebitRecordStatus.Done:
+ case PeerPullDebitRecordStatus.Aborted:
+ case PeerPullDebitRecordStatus.Failed:
+ case PeerPullDebitRecordStatus.SuspendedAbortingRefresh:
+ case PeerPullDebitRecordStatus.SuspendedDeposit:
+ return TaskRunResult.finished();
+ default:
+ assertUnreachable(peerPullInc.status);
}
- return TaskRunResult.finished();
}
export async function confirmPeerPullDebit(
@@ -816,6 +878,7 @@ export async function confirmPeerPullDebit(
rec.status = PeerPullDebitRecordStatus.PendingDeposit;
return TransitionResultType.Transition
})
+ wex.taskScheduler.stopShepherdTask(ctx.taskId);
wex.taskScheduler.startShepherdTask(ctx.taskId);
return {
@@ -918,11 +981,20 @@ export async function preparePeerPullDebit(
const purseHttpResp = await wex.http.fetch(getPurseUrl.href);
+ if (purseHttpResp.status == HttpStatusCode.Gone) {
+ throw TalerError.fromDetail(
+ TalerErrorCode.WALLET_PEER_PULL_DEBIT_PURSE_GONE,
+ {},
+ );
+ }
+
const purseStatus = await readSuccessResponseJsonOrThrow(
purseHttpResp,
codecForExchangePurseStatus(),
);
+ // FIXME: throw if already completed
+
const peerPullDebitId = encodeCrock(getRandomBytes(32));
let contractTerms: PeerContractTerms;
@@ -973,14 +1045,14 @@ export async function preparePeerPullDebit(
const currency = Amounts.currencyOf(totalAmount);
const ctx = new PeerPullDebitTransactionContext(wex, peerPullDebitId);
- await wex.db.runReadWriteTx(
+ const transitionInfo = await wex.db.runReadWriteTx(
{ storeNames: ["peerPullDebit", "contractTerms", "transactionsMeta"] },
async (tx) => {
await tx.contractTerms.put({
h: contractTermsHash,
contractTermsRaw: contractTerms,
});
- await tx.peerPullDebit.add({
+ const record = {
peerPullDebitId,
contractPriv: contractPriv,
exchangeBaseUrl: exchangeBaseUrl,
@@ -990,10 +1062,18 @@ export async function preparePeerPullDebit(
amount: contractTerms.amount,
status: PeerPullDebitRecordStatus.DialogProposed,
totalCostEstimated: Amounts.stringify(totalAmount),
- });
+ }
+ await tx.peerPullDebit.add(record);
await ctx.updateTransactionMeta(tx);
+ const oldTxState: TransactionState = {
+ major: TransactionMajorState.None,
+ };
+ const newTxState = computePeerPullDebitTransactionState(record);
+ return { oldTxState, newTxState, balanceEffect: BalanceEffect.Any };
},
);
+ notifyTransition(wex, ctx.transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(ctx.taskId)
const scopeInfo = await wex.db.runAllStoresReadOnlyTx({}, (tx) => {
return getExchangeScopeInfo(tx, exchangeBaseUrl, currency);