summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2023-06-05 18:38:17 +0200
committerFlorian Dold <florian@dold.me>2023-06-05 18:38:17 +0200
commitda927b5e48453b5bddb56944f7073619f693f526 (patch)
tree1aa84cfad016bbe665715e1ef7171dd9a3d586b5
parentbdb67c83a9d0244ba58e22f4811736722bbcb659 (diff)
downloadwallet-core-da927b5e48453b5bddb56944f7073619f693f526.tar.gz
wallet-core-da927b5e48453b5bddb56944f7073619f693f526.tar.bz2
wallet-core-da927b5e48453b5bddb56944f7073619f693f526.zip
wallet-core: handle Gone in peer-pull-debit
-rw-r--r--packages/taler-wallet-core/src/db.ts2
-rw-r--r--packages/taler-wallet-core/src/internal-wallet-state.ts7
-rw-r--r--packages/taler-wallet-core/src/operations/exchanges.ts4
-rw-r--r--packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts6
-rw-r--r--packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts149
-rw-r--r--packages/taler-wallet-core/src/operations/recoup.ts17
-rw-r--r--packages/taler-wallet-core/src/operations/withdraw.ts6
-rw-r--r--packages/taler-wallet-core/src/wallet.ts30
8 files changed, 150 insertions, 71 deletions
diff --git a/packages/taler-wallet-core/src/db.ts b/packages/taler-wallet-core/src/db.ts
index d64d1fbc6..a5db49649 100644
--- a/packages/taler-wallet-core/src/db.ts
+++ b/packages/taler-wallet-core/src/db.ts
@@ -2052,6 +2052,8 @@ export interface PeerPullPaymentIncomingRecord {
*/
totalCostEstimated: AmountString;
+ abortRefreshGroupId?: string;
+
coinSel?: PeerPullPaymentCoinSelection;
}
diff --git a/packages/taler-wallet-core/src/internal-wallet-state.ts b/packages/taler-wallet-core/src/internal-wallet-state.ts
index 760f40d6c..8dc83c65a 100644
--- a/packages/taler-wallet-core/src/internal-wallet-state.ts
+++ b/packages/taler-wallet-core/src/internal-wallet-state.ts
@@ -134,13 +134,6 @@ export interface RecoupOperations {
exchangeBaseUrl: string,
coinPubs: string[],
): Promise<string>;
- processRecoupGroup(
- ws: InternalWalletState,
- recoupGroupId: string,
- options?: {
- forceNow?: boolean;
- },
- ): Promise<void>;
}
export type NotificationListener = (n: WalletNotification) => void;
diff --git a/packages/taler-wallet-core/src/operations/exchanges.ts b/packages/taler-wallet-core/src/operations/exchanges.ts
index 142e0cf03..29d2451e6 100644
--- a/packages/taler-wallet-core/src/operations/exchanges.ts
+++ b/packages/taler-wallet-core/src/operations/exchanges.ts
@@ -863,9 +863,7 @@ export async function updateExchangeFromUrlHandler(
if (recoupGroupId) {
// Asynchronously start recoup. This doesn't need to finish
// for the exchange update to be considered finished.
- ws.recoupOps.processRecoupGroup(ws, recoupGroupId).catch((e) => {
- logger.error("error while recouping coins:", e);
- });
+ ws.workAvailable.trigger();
}
if (!updated) {
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
index e9c34cf73..a85df66d2 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
@@ -28,6 +28,7 @@ import {
Logger,
TalerErrorCode,
TalerPreciseTimestamp,
+ TalerUriAction,
TransactionAction,
TransactionMajorState,
TransactionMinorState,
@@ -37,11 +38,11 @@ import {
WalletKycUuid,
codecForAny,
codecForWalletKycUuid,
- constructPayPullUri,
encodeCrock,
getRandomBytes,
j2s,
makeErrorDetail,
+ stringifyTalerUri,
} from "@gnu-taler/taler-util";
import {
readSuccessResponseJsonOrErrorCode,
@@ -741,7 +742,8 @@ export async function initiatePeerPullPayment(
});
return {
- talerUri: constructPayPullUri({
+ talerUri: stringifyTalerUri({
+ type: TalerUriAction.PayPull,
exchangeBaseUrl: exchangeBaseUrl,
contractPriv: contractKeyPair.priv,
}),
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts
index 212d69eea..2be21c68d 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts
@@ -17,8 +17,10 @@
import {
AcceptPeerPullPaymentResponse,
Amounts,
+ CoinRefreshRequest,
ConfirmPeerPullDebitRequest,
ExchangePurseDeposits,
+ HttpStatusCode,
Logger,
PeerContractTerms,
PreparePeerPullDebitRequest,
@@ -48,6 +50,8 @@ import {
PeerPullDebitRecordStatus,
PeerPullPaymentIncomingRecord,
PendingTaskType,
+ RefreshOperationStatus,
+ createRefreshGroup,
} from "../index.js";
import { assertUnreachable } from "../util/assertUnreachable.js";
import {
@@ -68,6 +72,7 @@ import {
notifyTransition,
stopLongpolling,
} from "./transactions.js";
+import { checkLogicInvariant } from "../util/invariants.js";
const logger = new Logger("pay-peer-pull-debit.ts");
@@ -104,24 +109,89 @@ async function processPeerPullDebitPendingDeposit(
logger.trace(`purse deposit payload: ${j2s(depositPayload)}`);
}
- const httpResp = await ws.http.postJson(purseDepositUrl.href, depositPayload);
- const resp = await readSuccessResponseJsonOrThrow(httpResp, codecForAny());
- logger.trace(`purse deposit response: ${j2s(resp)}`);
+ const transactionId = constructTransactionIdentifier({
+ tag: TransactionType.PeerPullDebit,
+ peerPullPaymentIncomingId,
+ });
- await ws.db
- .mktx((x) => [x.peerPullPaymentIncoming])
- .runReadWrite(async (tx) => {
- const pi = await tx.peerPullPaymentIncoming.get(
- peerPullPaymentIncomingId,
- );
- if (!pi) {
- throw Error("peer pull payment not found anymore");
- }
- if (pi.status === PeerPullDebitRecordStatus.PendingDeposit) {
+ const httpResp = await ws.http.fetch(purseDepositUrl.href, {
+ method: "POST",
+ body: depositPayload,
+ });
+ if (httpResp.status === HttpStatusCode.Gone) {
+ const transitionInfo = await ws.db
+ .mktx((x) => [
+ x.peerPullPaymentIncoming,
+ x.refreshGroups,
+ x.denominations,
+ x.coinAvailability,
+ x.coins,
+ ])
+ .runReadWrite(async (tx) => {
+ const pi = await tx.peerPullPaymentIncoming.get(
+ peerPullPaymentIncomingId,
+ );
+ if (!pi) {
+ throw Error("peer pull payment not found anymore");
+ }
+ if (pi.status !== PeerPullDebitRecordStatus.PendingDeposit) {
+ return;
+ }
+ const oldTxState = computePeerPullDebitTransactionState(pi);
+
+ const currency = Amounts.currencyOf(pi.totalCostEstimated);
+ const coinPubs: CoinRefreshRequest[] = [];
+
+ if (!pi.coinSel) {
+ throw Error("invalid db state");
+ }
+
+ for (let i = 0; i < pi.coinSel.coinPubs.length; i++) {
+ coinPubs.push({
+ amount: pi.coinSel.contributions[i],
+ coinPub: pi.coinSel.coinPubs[i],
+ });
+ }
+
+ const refresh = await createRefreshGroup(
+ ws,
+ tx,
+ currency,
+ coinPubs,
+ RefreshReason.AbortPeerPushDebit,
+ );
+
+ pi.status = PeerPullDebitRecordStatus.AbortingRefresh;
+ pi.abortRefreshGroupId = refresh.refreshGroupId;
+ const newTxState = computePeerPullDebitTransactionState(pi);
+ await tx.peerPullPaymentIncoming.put(pi);
+ return { oldTxState, newTxState };
+ });
+ notifyTransition(ws, transactionId, transitionInfo);
+ } else {
+ const resp = await readSuccessResponseJsonOrThrow(httpResp, codecForAny());
+ logger.trace(`purse deposit response: ${j2s(resp)}`);
+
+ const transitionInfo = await ws.db
+ .mktx((x) => [x.peerPullPaymentIncoming])
+ .runReadWrite(async (tx) => {
+ const pi = await tx.peerPullPaymentIncoming.get(
+ peerPullPaymentIncomingId,
+ );
+ if (!pi) {
+ throw Error("peer pull payment not found anymore");
+ }
+ if (pi.status !== PeerPullDebitRecordStatus.PendingDeposit) {
+ return;
+ }
+ const oldTxState = computePeerPullDebitTransactionState(pi);
pi.status = PeerPullDebitRecordStatus.DonePaid;
- }
- await tx.peerPullPaymentIncoming.put(pi);
- });
+ const newTxState = computePeerPullDebitTransactionState(pi);
+ await tx.peerPullPaymentIncoming.put(pi);
+ return { oldTxState, newTxState };
+ });
+ notifyTransition(ws, transactionId, transitionInfo);
+ }
return {
type: OperationAttemptResultType.Finished,
@@ -133,7 +203,50 @@ async function processPeerPullDebitAbortingRefresh(
ws: InternalWalletState,
peerPullInc: PeerPullPaymentIncomingRecord,
): Promise<OperationAttemptResult> {
- throw Error("not implemented");
+ const peerPullPaymentIncomingId = peerPullInc.peerPullPaymentIncomingId;
+ const abortRefreshGroupId = peerPullInc.abortRefreshGroupId;
+ checkLogicInvariant(!!abortRefreshGroupId);
+ const transactionId = constructTransactionIdentifier({
+ tag: TransactionType.PeerPullDebit,
+ peerPullPaymentIncomingId,
+ });
+ const transitionInfo = await ws.db
+ .mktx((x) => [x.refreshGroups, x.peerPullPaymentIncoming])
+ .runReadWrite(async (tx) => {
+ const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId);
+ let newOpState: PeerPullDebitRecordStatus | undefined;
+ if (!refreshGroup) {
+ // Maybe it got manually deleted? Means that we should
+ // just go into failed.
+ logger.warn("no aborting refresh group found for deposit group");
+ newOpState = PeerPullDebitRecordStatus.Failed;
+ } else {
+ if (refreshGroup.operationStatus === RefreshOperationStatus.Finished) {
+ newOpState = PeerPullDebitRecordStatus.Aborted;
+ } else if (
+ refreshGroup.operationStatus === RefreshOperationStatus.Failed
+ ) {
+ newOpState = PeerPullDebitRecordStatus.Failed;
+ }
+ }
+ if (newOpState) {
+ const newDg = await tx.peerPullPaymentIncoming.get(
+ peerPullPaymentIncomingId,
+ );
+ if (!newDg) {
+ return;
+ }
+ const oldTxState = computePeerPullDebitTransactionState(newDg);
+ newDg.status = newOpState;
+ const newTxState = computePeerPullDebitTransactionState(newDg);
+ await tx.peerPullPaymentIncoming.put(newDg);
+ return { oldTxState, newTxState };
+ }
+ return undefined;
+ });
+ notifyTransition(ws, transactionId, transitionInfo);
+ // FIXME: Shouldn't this be finished in some cases?!
+ return OperationAttemptResult.pendingEmpty();
}
export async function processPeerPullDebit(
@@ -158,7 +271,7 @@ export async function processPeerPullDebit(
return {
type: OperationAttemptResultType.Finished,
result: undefined,
- }
+ };
}
export async function confirmPeerPullDebit(
diff --git a/packages/taler-wallet-core/src/operations/recoup.ts b/packages/taler-wallet-core/src/operations/recoup.ts
index 1f36117ee..fcb7d6c98 100644
--- a/packages/taler-wallet-core/src/operations/recoup.ts
+++ b/packages/taler-wallet-core/src/operations/recoup.ts
@@ -304,24 +304,7 @@ async function recoupRefreshCoin(
export async function processRecoupGroup(
ws: InternalWalletState,
recoupGroupId: string,
- options: {
- forceNow?: boolean;
- } = {},
-): Promise<void> {
- await unwrapOperationHandlerResultOrThrow(
- await processRecoupGroupHandler(ws, recoupGroupId, options),
- );
- return;
-}
-
-export async function processRecoupGroupHandler(
- ws: InternalWalletState,
- recoupGroupId: string,
- options: {
- forceNow?: boolean;
- } = {},
): Promise<OperationAttemptResult> {
- const forceNow = options.forceNow ?? false;
let recoupGroup = await ws.db
.mktx((x) => [x.recoupGroups])
.runReadOnly(async (tx) => {
diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts
index 4801a67ee..7db6dcd2a 100644
--- a/packages/taler-wallet-core/src/operations/withdraw.ts
+++ b/packages/taler-wallet-core/src/operations/withdraw.ts
@@ -1273,7 +1273,6 @@ export interface WithdrawalGroupContext {
export async function processWithdrawalGroup(
ws: InternalWalletState,
withdrawalGroupId: string,
- options: {} = {},
): Promise<OperationAttemptResult> {
logger.trace("processing withdrawal group", withdrawalGroupId);
const withdrawalGroup = await ws.db
@@ -1303,9 +1302,8 @@ export async function processWithdrawalGroup(
switch (withdrawalGroup.status) {
case WithdrawalGroupStatus.PendingRegisteringBank:
await processReserveBankStatus(ws, withdrawalGroupId);
- return await processWithdrawalGroup(ws, withdrawalGroupId, {
- forceNow: true,
- });
+ // FIXME: This will get called by the main task loop, why call it here?!
+ return await processWithdrawalGroup(ws, withdrawalGroupId);
case WithdrawalGroupStatus.PendingQueryingStatus: {
runLongpollAsync(ws, retryTag, (ct) => {
return queryReserve(ws, withdrawalGroupId, ct);
diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts
index 89e1bf383..5277916de 100644
--- a/packages/taler-wallet-core/src/wallet.ts
+++ b/packages/taler-wallet-core/src/wallet.ts
@@ -219,9 +219,7 @@ import {
} from "./operations/pay-peer-push-debit.js";
import { getPendingOperations } from "./operations/pending.js";
import {
- createRecoupGroup,
- processRecoupGroup,
- processRecoupGroupHandler,
+ createRecoupGroup, processRecoupGroup,
} from "./operations/recoup.js";
import {
autoRefresh,
@@ -295,27 +293,20 @@ const logger = new Logger("wallet.ts");
async function callOperationHandler(
ws: InternalWalletState,
pending: PendingTaskInfo,
- forceNow = false,
): Promise<OperationAttemptResult> {
switch (pending.type) {
case PendingTaskType.ExchangeUpdate:
- return await updateExchangeFromUrlHandler(ws, pending.exchangeBaseUrl, {
- forceNow,
- });
+ return await updateExchangeFromUrlHandler(ws, pending.exchangeBaseUrl);
case PendingTaskType.Refresh:
return await processRefreshGroup(ws, pending.refreshGroupId);
case PendingTaskType.Withdraw:
- return await processWithdrawalGroup(ws, pending.withdrawalGroupId, {
- forceNow,
- });
+ return await processWithdrawalGroup(ws, pending.withdrawalGroupId);
case PendingTaskType.TipPickup:
return await processTip(ws, pending.tipId);
case PendingTaskType.Purchase:
return await processPurchase(ws, pending.proposalId);
case PendingTaskType.Recoup:
- return await processRecoupGroupHandler(ws, pending.recoupGroupId, {
- forceNow,
- });
+ return await processRecoupGroup(ws, pending.recoupGroupId);
case PendingTaskType.ExchangeCheckRefresh:
return await autoRefresh(ws, pending.exchangeBaseUrl);
case PendingTaskType.Deposit: {
@@ -342,16 +333,15 @@ async function callOperationHandler(
*/
export async function runPending(
ws: InternalWalletState,
- forceNow = false,
): Promise<void> {
const pendingOpsResponse = await getPendingOperations(ws);
for (const p of pendingOpsResponse.pendingOperations) {
- if (!forceNow && !AbsoluteTime.isExpired(p.timestampDue)) {
+ if (!AbsoluteTime.isExpired(p.timestampDue)) {
continue;
}
await runOperationWithErrorReporting(ws, p.id, async () => {
logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`);
- return await callOperationHandler(ws, p, forceNow);
+ return await callOperationHandler(ws, p);
});
}
}
@@ -1168,7 +1158,8 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
return getContractTermsDetails(ws, req.proposalId);
}
case WalletApiOperation.RetryPendingNow: {
- await runPending(ws, true);
+ // FIXME: Should we reset all operation retries here?
+ await runPending(ws);
return {};
}
case WalletApiOperation.PreparePayForUri: {
@@ -1624,8 +1615,8 @@ export class Wallet {
this.ws.stop();
}
- runPending(forceNow = false): Promise<void> {
- return runPending(this.ws, forceNow);
+ runPending(): Promise<void> {
+ return runPending(this.ws);
}
runTaskLoop(opts?: RetryLoopOpts): Promise<TaskLoopResult> {
@@ -1673,7 +1664,6 @@ class InternalWalletStateImpl implements InternalWalletState {
recoupOps: RecoupOperations = {
createRecoupGroup,
- processRecoupGroup,
};
merchantOps: MerchantOperations = {