summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--packages/taler-harness/src/integrationtests/test-payment-share.ts36
-rw-r--r--packages/taler-wallet-core/src/pay-merchant.ts38
-rw-r--r--packages/taler-wallet-core/src/shepherd.ts171
3 files changed, 123 insertions, 122 deletions
diff --git a/packages/taler-harness/src/integrationtests/test-payment-share.ts b/packages/taler-harness/src/integrationtests/test-payment-share.ts
index ec7391586..4a7f853da 100644
--- a/packages/taler-harness/src/integrationtests/test-payment-share.ts
+++ b/packages/taler-harness/src/integrationtests/test-payment-share.ts
@@ -100,10 +100,11 @@ export async function runPaymentShareTest(t: GlobalTestState) {
t.logStep("orders-created");
/**
- * FIRST CASE, create in first wallet and pay in the second wallet
- * first wallet should not be able to continue
+ * Case 1:
+ * - Claim with first wallet and pay in the second wallet.
+ * - First wallet should be notified.
*/
- {
+ if (0) {
const order = await createOrder("TESTKUDOS:5");
// Claim the order with the first wallet
const claimFirstWallet = await firstWallet.call(
@@ -168,10 +169,10 @@ export async function runPaymentShareTest(t: GlobalTestState) {
}
t.logStep("wait-for-payment");
- firstWallet.waitForNotificationCond(n =>
- n.type === NotificationType.TransactionStateTransition &&
- n.transactionId === claimFirstWallet.transactionId
- )
+ // firstWallet.waitForNotificationCond(n =>
+ // n.type === NotificationType.TransactionStateTransition &&
+ // n.transactionId === claimFirstWallet.transactionId
+ // )
// Claim the order with the first wallet
const claimFirstWalletAgain = await firstWallet.call(
WalletApiOperation.PreparePayForUri,
@@ -210,8 +211,9 @@ export async function runPaymentShareTest(t: GlobalTestState) {
t.logStep("first-case-done");
/**
- * SECOND CASE, create in first wallet and share to the second wallet
- * pay with the first wallet, second wallet should not be able to continue
+ * Case 2:
+ * - Claim with first wallet and share with the second wallet
+ * - Pay with the first wallet, second wallet should be notified
*/
{
const order = await createOrder("TESTKUDOS:3");
@@ -225,6 +227,8 @@ export async function runPaymentShareTest(t: GlobalTestState) {
claimFirstWallet.status === PreparePayResultType.PaymentPossible,
);
+ t.logStep("case2-w1-claimed");
+
// share order from the first wallet
const { privatePayUri } = await firstWallet.call(
WalletApiOperation.SharePayment,
@@ -234,17 +238,21 @@ export async function runPaymentShareTest(t: GlobalTestState) {
},
);
+ t.logStep("case2-w1-shared");
+
// claim from the second wallet
const claimSecondWallet = await secondWallet.call(
WalletApiOperation.PreparePayForUri,
{ talerPayUri: privatePayUri },
);
+ t.logStep("case2-w2-prepared");
+
t.assertTrue(
claimSecondWallet.status === PreparePayResultType.PaymentPossible,
);
- // pay from the second wallet
+ // pay from the first wallet
const r2 = await firstWallet.call(WalletApiOperation.ConfirmPay, {
transactionId: claimFirstWallet.transactionId,
});
@@ -263,10 +271,10 @@ export async function runPaymentShareTest(t: GlobalTestState) {
t.assertAmountEquals(bal2.balances[0].available, "TESTKUDOS:14.23");
t.logStep("wait-for-payment");
- secondWallet.waitForNotificationCond(n =>
- n.type === NotificationType.TransactionStateTransition &&
- n.transactionId === claimSecondWallet.transactionId
- )
+ // secondWallet.waitForNotificationCond(n =>
+ // n.type === NotificationType.TransactionStateTransition &&
+ // n.transactionId === claimSecondWallet.transactionId
+ // )
// Claim the order with the first wallet
const claimSecondWalletAgain = await secondWallet.call(
diff --git a/packages/taler-wallet-core/src/pay-merchant.ts b/packages/taler-wallet-core/src/pay-merchant.ts
index 8eff7e17b..be3f7f106 100644
--- a/packages/taler-wallet-core/src/pay-merchant.ts
+++ b/packages/taler-wallet-core/src/pay-merchant.ts
@@ -880,7 +880,11 @@ async function createOrReusePurchase(
);
if (oldProposal.purchaseStatus === PurchaseStatus.DialogShared) {
const download = await expectProposalDownload(wex, oldProposal);
- const paid = await checkIfOrderIsAlreadyPaid(wex, download.contractData, false);
+ const paid = await checkIfOrderIsAlreadyPaid(
+ wex,
+ download.contractData,
+ false,
+ );
logger.info(`old proposal paid: ${paid}`);
if (paid) {
// if this transaction was shared and the order is paid then it
@@ -1912,6 +1916,11 @@ export async function confirmPay(
hintTransactionId: transactionId,
});
+ const ctx = new PayMerchantTransactionContext(wex, proposalId);
+
+ // In case we're sharing the payment and we're long-polling
+ wex.taskScheduler.stopShepherdTask(ctx.taskId);
+
// Wait until we have completed the first attempt to pay.
return waitPaymentResult(wex, proposalId);
}
@@ -2011,7 +2020,11 @@ async function processPurchasePay(
const download = await expectProposalDownload(wex, purchase);
if (purchase.shared) {
- const paid = await checkIfOrderIsAlreadyPaid(wex, download.contractData, false);
+ const paid = await checkIfOrderIsAlreadyPaid(
+ wex,
+ download.contractData,
+ false,
+ );
if (paid) {
const transitionInfo = await wex.db.runReadWriteTx(
@@ -2463,17 +2476,24 @@ export async function sharePayment(
// FIXME: purchase can be shared before being paid
return undefined;
}
+ const oldTxState = computePayMerchantTransactionState(p);
if (p.purchaseStatus === PurchaseStatus.DialogProposed) {
p.purchaseStatus = PurchaseStatus.DialogShared;
p.shared = true;
tx.purchases.put(p);
}
+ const newTxState = computePayMerchantTransactionState(p);
+
return {
proposalId: p.proposalId,
nonce: p.noncePriv,
session: p.lastSessionId ?? p.downloadSessionId,
token: p.claimToken,
+ transitionInfo: {
+ oldTxState,
+ newTxState,
+ },
};
});
@@ -2481,8 +2501,11 @@ export async function sharePayment(
throw Error("This purchase can't be shared");
}
- // schedule a task to watch for the status
const ctx = new PayMerchantTransactionContext(wex, result.proposalId);
+
+ notifyTransition(wex, ctx.transactionId, result.transitionInfo);
+
+ // schedule a task to watch for the status
wex.taskScheduler.startShepherdTask(ctx.taskId);
const privatePayUri = stringifyPayUri({
@@ -2514,6 +2537,7 @@ async function checkIfOrderIsAlreadyPaid(
const resp = await wex.http.fetch(requestUrl.href, {
cancellationToken: wex.cancellationToken,
});
+
if (
resp.status === HttpStatusCode.Ok ||
resp.status === HttpStatusCode.Accepted ||
@@ -2539,7 +2563,11 @@ async function processPurchaseDialogShared(
return TaskRunResult.finished();
}
- const paid = await checkIfOrderIsAlreadyPaid(wex, download.contractData, true);
+ const paid = await checkIfOrderIsAlreadyPaid(
+ wex,
+ download.contractData,
+ true,
+ );
if (paid) {
const transitionInfo = await wex.db.runReadWriteTx(
["purchases"],
@@ -2551,7 +2579,7 @@ async function processPurchaseDialogShared(
}
const oldTxState = computePayMerchantTransactionState(p);
p.purchaseStatus = PurchaseStatus.FailedClaim;
- p.paidByOther = true
+ p.paidByOther = true;
const newTxState = computePayMerchantTransactionState(p);
await tx.purchases.put(p);
return { oldTxState, newTxState };
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts
index db090c352..0544288ba 100644
--- a/packages/taler-wallet-core/src/shepherd.ts
+++ b/packages/taler-wallet-core/src/shepherd.ts
@@ -28,7 +28,6 @@ import {
ObservabilityContext,
ObservabilityEventType,
RetryLoopOpts,
- TalerError,
TalerErrorCode,
TalerErrorDetail,
TaskThrottler,
@@ -37,6 +36,7 @@ import {
TransactionType,
WalletNotification,
assertUnreachable,
+ getErrorDetailFromException,
j2s,
makeErrorDetail,
} from "@gnu-taler/taler-util";
@@ -55,6 +55,7 @@ import { CryptoApiStoppedError } from "./crypto/workers/crypto-dispatcher.js";
import {
OPERATION_STATUS_ACTIVE_FIRST,
OPERATION_STATUS_ACTIVE_LAST,
+ OperationRetryRecord,
WalletDbAllStoresReadOnlyTransaction,
WalletDbReadOnlyTransaction,
timestampAbsoluteFromDb,
@@ -173,7 +174,7 @@ export class TaskSchedulerImpl implements TaskScheduler {
}
getActiveTasks(): TaskIdStr[] {
- return [...this.sheps.keys()]
+ return [...this.sheps.keys()];
}
ensureRunning(): void {
@@ -343,15 +344,21 @@ export class TaskSchedulerImpl implements TaskScheduler {
const startTime = AbsoluteTime.now();
logger.trace(`Shepherd for ${taskId} will call handler`);
// FIXME: This should already return the retry record.
- const res = await runTaskWithErrorReporting(this.ws, taskId, async () => {
- return await callOperationHandlerForTaskId(wex, taskId);
- });
- const retryRecord = await this.ws.db.runReadOnlyTx(
- ["operationRetries"],
- async (tx) => {
- return tx.operationRetries.get(taskId);
+ const res = await runTaskWithErrorReporting(
+ this.ws,
+ taskId,
+ info,
+ async () => {
+ return await callOperationHandlerForTaskId(wex, taskId);
},
);
+ if (info.cts.token.isCancelled) {
+ logger.info("task cancelled, not processing result");
+ return;
+ }
+ if (this.ws.stopped) {
+ logger.info("wallet stopped, not processing result");
+ }
wex.oc.observe({
type: ObservabilityEventType.ShepherdTaskResult,
resultType: res.type,
@@ -359,46 +366,48 @@ export class TaskSchedulerImpl implements TaskScheduler {
switch (res.type) {
case TaskRunResultType.Error: {
logger.trace(`Shepherd for ${taskId} got error result.`);
- if (retryRecord) {
- let delay: Duration;
- const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
- delay = AbsoluteTime.remaining(t);
- logger.trace(`Waiting for ${delay.d_ms} ms`);
- await this.wait(taskId, info, delay);
- } else {
- logger.trace("Retrying immediately.");
- }
+ const retryRecord = await storePendingTaskError(
+ this.ws,
+ taskId,
+ res.errorDetail,
+ );
+ let delay: Duration;
+ const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
+ delay = AbsoluteTime.remaining(t);
+ logger.trace(`Waiting for ${delay.d_ms} ms`);
+ await this.wait(taskId, info, delay);
break;
}
case TaskRunResultType.Backoff: {
logger.trace(`Shepherd for ${taskId} got backoff result.`);
- if (retryRecord) {
- let delay: Duration;
- const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
- delay = AbsoluteTime.remaining(t);
- logger.trace(`Waiting for ${delay.d_ms} ms`);
- await this.wait(taskId, info, delay);
- } else {
- logger.trace("Retrying immediately.");
- }
+ const retryRecord = await storePendingTaskPending(this.ws, taskId);
+ let delay: Duration;
+ const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
+ delay = AbsoluteTime.remaining(t);
+ logger.trace(`Waiting for ${delay.d_ms} ms`);
+ await this.wait(taskId, info, delay);
break;
}
case TaskRunResultType.Progress: {
logger.trace(
`Shepherd for ${taskId} got progress result, re-running immediately.`,
);
+ await storeTaskProgress(this.ws, taskId);
break;
}
case TaskRunResultType.ScheduleLater:
logger.trace(`Shepherd for ${taskId} got schedule-later result.`);
+ await storeTaskProgress(this.ws, taskId);
const delay = AbsoluteTime.remaining(res.runAt);
logger.trace(`Waiting for ${delay.d_ms} ms`);
await this.wait(taskId, info, delay);
break;
case TaskRunResultType.Finished:
logger.trace(`Shepherd for ${taskId} got finished result.`);
+ await storePendingTaskFinished(this.ws, taskId);
return;
case TaskRunResultType.LongpollReturnedPending: {
+ await storeTaskProgress(this.ws, taskId);
// Make sure that we are waiting a bit if long-polling returned too early.
const endTime = AbsoluteTime.now();
const taskDuration = AbsoluteTime.difference(endTime, startTime);
@@ -425,9 +434,9 @@ async function storePendingTaskError(
ws: InternalWalletState,
pendingTaskId: string,
e: TalerErrorDetail,
-): Promise<void> {
+): Promise<OperationRetryRecord> {
logger.info(`storing pending task error for ${pendingTaskId}`);
- const maybeNotification = await ws.db.runAllStoresReadWriteTx(async (tx) => {
+ const res = await ws.db.runAllStoresReadWriteTx(async (tx) => {
let retryRecord = await tx.operationRetries.get(pendingTaskId);
if (!retryRecord) {
retryRecord = {
@@ -440,11 +449,15 @@ async function storePendingTaskError(
retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
}
await tx.operationRetries.put(retryRecord);
- return taskToRetryNotification(ws, tx, pendingTaskId, e);
+ return {
+ notification: await taskToRetryNotification(ws, tx, pendingTaskId, e),
+ retryRecord,
+ };
});
- if (maybeNotification) {
- ws.notify(maybeNotification);
+ if (res?.notification) {
+ ws.notify(res.notification);
}
+ return res.retryRecord;
}
/**
@@ -462,8 +475,8 @@ async function storeTaskProgress(
async function storePendingTaskPending(
ws: InternalWalletState,
pendingTaskId: string,
-): Promise<void> {
- const maybeNotification = await ws.db.runAllStoresReadWriteTx(async (tx) => {
+): Promise<OperationRetryRecord> {
+ const res = await ws.db.runAllStoresReadWriteTx(async (tx) => {
let retryRecord = await tx.operationRetries.get(pendingTaskId);
let hadError = false;
if (!retryRecord) {
@@ -479,15 +492,24 @@ async function storePendingTaskPending(
retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
}
await tx.operationRetries.put(retryRecord);
+ let notification: WalletNotification | undefined = undefined;
if (hadError) {
- return taskToRetryNotification(ws, tx, pendingTaskId, undefined);
- } else {
- return undefined;
+ notification = await taskToRetryNotification(
+ ws,
+ tx,
+ pendingTaskId,
+ undefined,
+ );
}
+ return {
+ notification,
+ retryRecord,
+ };
});
- if (maybeNotification) {
- ws.notify(maybeNotification);
+ if (res.notification) {
+ ws.notify(res.notification);
}
+ return res.retryRecord;
}
async function storePendingTaskFinished(
@@ -502,33 +524,11 @@ async function storePendingTaskFinished(
async function runTaskWithErrorReporting(
ws: InternalWalletState,
opId: TaskIdStr,
+ info: ShepherdInfo,
f: () => Promise<TaskRunResult>,
): Promise<TaskRunResult> {
- let maybeError: TalerErrorDetail | undefined;
try {
- const resp = await f();
- switch (resp.type) {
- case TaskRunResultType.Error:
- await storePendingTaskError(ws, opId, resp.errorDetail);
- return resp;
- case TaskRunResultType.Finished:
- await storePendingTaskFinished(ws, opId);
- return resp;
- case TaskRunResultType.Backoff:
- await storePendingTaskPending(ws, opId);
- return resp;
- case TaskRunResultType.ScheduleLater:
- // Task succeeded but wants to be run again.
- await storeTaskProgress(ws, opId);
- return resp;
- case TaskRunResultType.Progress:
- await storeTaskProgress(ws, opId);
- return resp;
- case TaskRunResultType.LongpollReturnedPending:
- // Longpoll should be run again immediately.
- await storeTaskProgress(ws, opId);
- return resp;
- }
+ return await f();
} catch (e) {
if (e instanceof CryptoApiStoppedError) {
if (ws.stopped) {
@@ -543,46 +543,11 @@ async function runTaskWithErrorReporting(
};
}
}
- if (e instanceof TalerError) {
- logger.warn("operation processed resulted in error");
- logger.warn(`error was: ${j2s(e.errorDetail)}`);
- maybeError = e.errorDetail;
- await storePendingTaskError(ws, opId, maybeError!);
- return {
- type: TaskRunResultType.Error,
- errorDetail: e.errorDetail,
- };
- } else if (e instanceof Error) {
- // This is a bug, as we expect pending operations to always
- // do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED
- // or return something.
- logger.error(`Uncaught exception: ${e.message}`);
- logger.error(`Stack: ${e.stack}`);
- maybeError = makeErrorDetail(
- TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
- {
- stack: e.stack,
- },
- `unexpected exception (message: ${e.message})`,
- );
- await storePendingTaskError(ws, opId, maybeError);
- return {
- type: TaskRunResultType.Error,
- errorDetail: maybeError,
- };
- } else {
- logger.error("Uncaught exception, value is not even an error.");
- maybeError = makeErrorDetail(
- TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
- {},
- `unexpected exception (not even an error)`,
- );
- await storePendingTaskError(ws, opId, maybeError);
- return {
- type: TaskRunResultType.Error,
- errorDetail: maybeError,
- };
- }
+ const errorDetail = getErrorDetailFromException(e);
+ return {
+ type: TaskRunResultType.Error,
+ errorDetail,
+ };
}
}