summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2024-02-13 10:53:43 +0100
committerFlorian Dold <florian@dold.me>2024-02-15 21:56:54 +0100
commit70a803038f1cbe05dc4779bdd87376fd073421be (patch)
tree6607d69f6906ada9f912e31d9a9e3b65560a7326
parent2c17e98c336d96f955ec82ad0a1b164e3da90103 (diff)
downloadwallet-core-dev/dold/task-shepherd.tar.gz
wallet-core-dev/dold/task-shepherd.tar.bz2
wallet-core-dev/dold/task-shepherd.zip
implement task shepherd, many small fixes and tweaksdev/dold/task-shepherd
-rw-r--r--packages/taler-harness/src/harness/harness.ts11
-rw-r--r--packages/taler-harness/src/integrationtests/test-payment-fault.ts82
-rw-r--r--packages/taler-harness/src/integrationtests/test-payment-share.ts33
-rw-r--r--packages/taler-harness/src/integrationtests/test-peer-repair.ts4
-rw-r--r--packages/taler-harness/src/integrationtests/test-peer-to-peer-pull.ts1
-rw-r--r--packages/taler-harness/src/integrationtests/test-timetravel-autorefresh.ts19
-rw-r--r--packages/taler-util/src/time.ts23
-rw-r--r--packages/taler-util/src/wallet-types.ts12
-rw-r--r--packages/taler-wallet-cli/src/index.ts11
-rw-r--r--packages/taler-wallet-core/src/internal-wallet-state.ts23
-rw-r--r--packages/taler-wallet-core/src/operations/README.md7
-rw-r--r--packages/taler-wallet-core/src/operations/backup/index.ts165
-rw-r--r--packages/taler-wallet-core/src/operations/common.ts466
-rw-r--r--packages/taler-wallet-core/src/operations/deposits.ts140
-rw-r--r--packages/taler-wallet-core/src/operations/exchanges.ts468
-rw-r--r--packages/taler-wallet-core/src/operations/pay-merchant.ts552
-rw-r--r--packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts167
-rw-r--r--packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts33
-rw-r--r--packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts123
-rw-r--r--packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts231
-rw-r--r--packages/taler-wallet-core/src/operations/pending.ts814
-rw-r--r--packages/taler-wallet-core/src/operations/refresh.ts155
-rw-r--r--packages/taler-wallet-core/src/operations/reward.ts6
-rw-r--r--packages/taler-wallet-core/src/operations/testing.ts41
-rw-r--r--packages/taler-wallet-core/src/operations/transactions.ts442
-rw-r--r--packages/taler-wallet-core/src/operations/withdraw.ts233
-rw-r--r--packages/taler-wallet-core/src/pending-types.ts10
-rw-r--r--packages/taler-wallet-core/src/shepherd.ts851
-rw-r--r--packages/taler-wallet-core/src/util/coinSelection.ts3
-rw-r--r--packages/taler-wallet-core/src/util/promiseUtils.ts40
-rw-r--r--packages/taler-wallet-core/src/util/query.ts2
-rw-r--r--packages/taler-wallet-core/src/wallet-api-types.ts15
-rw-r--r--packages/taler-wallet-core/src/wallet.ts244
33 files changed, 2589 insertions, 2838 deletions
diff --git a/packages/taler-harness/src/harness/harness.ts b/packages/taler-harness/src/harness/harness.ts
index 48f8450fd..975d73cf8 100644
--- a/packages/taler-harness/src/harness/harness.ts
+++ b/packages/taler-harness/src/harness/harness.ts
@@ -412,6 +412,17 @@ export class GlobalTestState {
}
}
}
+
+ /**
+ * Log that the test arrived a certain step.
+ *
+ * The step name should be unique across the whole
+ */
+ logStep(stepName: string): void {
+ // Now we just log, later we may report the steps that were done
+ // to easily see where the test hangs.
+ console.info(`STEP: ${stepName}`);
+ }
}
export function shouldLingerInTest(): boolean {
diff --git a/packages/taler-harness/src/integrationtests/test-payment-fault.ts b/packages/taler-harness/src/integrationtests/test-payment-fault.ts
index af6751ef4..cadcc9056 100644
--- a/packages/taler-harness/src/integrationtests/test-payment-fault.ts
+++ b/packages/taler-harness/src/integrationtests/test-payment-fault.ts
@@ -21,11 +21,7 @@
/**
* Imports.
*/
-import {
- TalerCorebankApiClient,
- CoreApiResponse,
- MerchantApiClient,
-} from "@gnu-taler/taler-util";
+import { ConfirmPayResultType, MerchantApiClient } from "@gnu-taler/taler-util";
import { WalletApiOperation } from "@gnu-taler/taler-wallet-core";
import { defaultCoinConfig } from "../harness/denomStructures.js";
import {
@@ -38,10 +34,13 @@ import {
ExchangeService,
GlobalTestState,
MerchantService,
- WalletCli,
generateRandomPayto,
setupDb,
} from "../harness/harness.js";
+import {
+ createWalletDaemonWithClient,
+ withdrawViaBankV2,
+} from "../harness/helpers.js";
/**
* Run test for basic, bank-integrated withdrawal.
@@ -123,45 +122,20 @@ export async function runPaymentFaultTest(t: GlobalTestState) {
console.log("setup done!");
- const wallet = new WalletCli(t);
-
- // Create withdrawal operation
-
- const bankClient = new TalerCorebankApiClient(bank.corebankApiBaseUrl);
-
- const user = await bankClient.createRandomBankUser();
- const wop = await bankClient.createWithdrawalOperation(
- user.username,
- "TESTKUDOS:20",
- );
-
- // Hand it to the wallet
-
- await wallet.client.call(WalletApiOperation.GetWithdrawalDetailsForUri, {
- talerWithdrawUri: wop.taler_withdraw_uri,
- });
-
- await wallet.runPending();
-
- // Withdraw
-
- await wallet.client.call(WalletApiOperation.AcceptBankIntegratedWithdrawal, {
- exchangeBaseUrl: faultyExchange.baseUrl,
- talerWithdrawUri: wop.taler_withdraw_uri,
+ const { walletClient } = await createWalletDaemonWithClient(t, {
+ name: "default",
});
- await wallet.runPending();
- // Confirm it
+ await walletClient.call(WalletApiOperation.GetBalances, {});
- await bankClient.confirmWithdrawalOperation(user.username, {
- withdrawalOperationId: wop.withdrawal_id,
+ const wres = await withdrawViaBankV2(t, {
+ walletClient,
+ bank,
+ exchange: faultyExchange,
+ amount: "TESTKUDOS:20",
});
- await wallet.runUntilDone();
-
- // Check balance
-
- await wallet.client.call(WalletApiOperation.GetBalances, {});
+ await wres.withdrawalFinishedCond;
// Set up order.
@@ -181,24 +155,22 @@ export async function runPaymentFaultTest(t: GlobalTestState) {
// Make wallet pay for the order
- let apiResp: CoreApiResponse;
-
- const prepResp = await wallet.client.call(
+ const prepResp = await walletClient.call(
WalletApiOperation.PreparePayForUri,
{
talerPayUri: orderStatus.taler_pay_uri,
},
);
- const proposalId = prepResp.proposalId;
-
- await wallet.runPending();
-
// Drop 3 responses from the exchange.
let faultCount = 0;
faultyExchange.faultProxy.addFault({
async modifyResponse(ctx: FaultInjectionResponseContext) {
- if (!ctx.request.requestUrl.endsWith("/deposit")) {
+ console.log(`in modifyResponse for ${ctx.request.requestUrl}`);
+ if (
+ !ctx.request.requestUrl.endsWith("/deposit") &&
+ !ctx.request.requestUrl.endsWith("/batch-deposit")
+ ) {
return;
}
if (faultCount < 3) {
@@ -213,12 +185,16 @@ export async function runPaymentFaultTest(t: GlobalTestState) {
// confirmPay won't work, as the exchange is unreachable
- await wallet.client.call(WalletApiOperation.ConfirmPay, {
- // FIXME: should be validated, don't cast!
- proposalId: proposalId,
- });
+ const confirmPayResp = await walletClient.call(
+ WalletApiOperation.ConfirmPay,
+ {
+ transactionId: prepResp.transactionId,
+ },
+ );
+
+ t.assertDeepEqual(confirmPayResp.type, ConfirmPayResultType.Pending);
- await wallet.runUntilDone();
+ await walletClient.call(WalletApiOperation.TestingWaitTransactionsFinal, {});
// Check if payment was successful.
diff --git a/packages/taler-harness/src/integrationtests/test-payment-share.ts b/packages/taler-harness/src/integrationtests/test-payment-share.ts
index ef4f8adeb..034bbc98d 100644
--- a/packages/taler-harness/src/integrationtests/test-payment-share.ts
+++ b/packages/taler-harness/src/integrationtests/test-payment-share.ts
@@ -65,6 +65,8 @@ export async function runPaymentShareTest(t: GlobalTestState) {
});
await secondWallet.call(WalletApiOperation.TestingWaitTransactionsFinal, {});
+ t.logStep("setup-done");
+
// create two orders to pay
async function createOrder(amount: string) {
const order = {
@@ -74,7 +76,6 @@ export async function runPaymentShareTest(t: GlobalTestState) {
};
const args = { order };
- const auth = {};
const orderResp = await merchantClient.createOrder({
order: args.order,
@@ -88,6 +89,8 @@ export async function runPaymentShareTest(t: GlobalTestState) {
return { id: orderResp.order_id, uri: orderStatus.taler_pay_uri };
}
+ t.logStep("orders-created");
+
/**
* FIRST CASE, create in first wallet and pay in the second wallet
* first wallet should not be able to continue
@@ -104,6 +107,8 @@ export async function runPaymentShareTest(t: GlobalTestState) {
claimFirstWallet.status === PreparePayResultType.PaymentPossible,
);
+ t.logStep("w1-payment-possible");
+
// share order from the first wallet
const { privatePayUri } = await firstWallet.call(
WalletApiOperation.SharePayment,
@@ -113,6 +118,8 @@ export async function runPaymentShareTest(t: GlobalTestState) {
},
);
+ t.logStep("w1-payment-shared");
+
// claim from the second wallet
const claimSecondWallet = await secondWallet.call(
WalletApiOperation.PreparePayForUri,
@@ -123,18 +130,25 @@ export async function runPaymentShareTest(t: GlobalTestState) {
claimSecondWallet.status === PreparePayResultType.PaymentPossible,
);
+ t.logStep("w2-claimed");
+
// pay from the second wallet
const r2 = await secondWallet.call(WalletApiOperation.ConfirmPay, {
- proposalId: claimSecondWallet.proposalId,
+ transactionId: claimSecondWallet.transactionId,
});
+ t.assertTrue(r2.type === ConfirmPayResultType.Done);
+
+ t.logStep("w2-confirmed");
+
// Wait for refresh to settle before we do checks
await secondWallet.call(
WalletApiOperation.TestingWaitTransactionsFinal,
{},
);
- t.assertTrue(r2.type === ConfirmPayResultType.Done);
+ t.logStep("w2-refresh-settled");
+
{
const first = await firstWallet.call(WalletApiOperation.GetBalances, {});
const second = await secondWallet.call(
@@ -155,11 +169,16 @@ export async function runPaymentShareTest(t: GlobalTestState) {
claimFirstWalletAgain.status === PreparePayResultType.AlreadyConfirmed,
);
+ t.logStep("w1-prepared-again");
+
const r1 = await firstWallet.call(WalletApiOperation.ConfirmPay, {
- proposalId: claimFirstWallet.proposalId,
+ transactionId: claimFirstWallet.transactionId,
});
t.assertTrue(r1.type === ConfirmPayResultType.Done);
+
+ t.logStep("w1-confirmed-shared");
+
{
const first = await firstWallet.call(WalletApiOperation.GetBalances, {});
const second = await secondWallet.call(
@@ -171,6 +190,8 @@ export async function runPaymentShareTest(t: GlobalTestState) {
}
}
+ t.logStep("first-case-done");
+
/**
* SECOND CASE, create in first wallet and share to the second wallet
* pay with the first wallet, second wallet should not be able to continue
@@ -208,7 +229,7 @@ export async function runPaymentShareTest(t: GlobalTestState) {
// pay from the second wallet
const r2 = await firstWallet.call(WalletApiOperation.ConfirmPay, {
- proposalId: claimFirstWallet.proposalId,
+ transactionId: claimFirstWallet.transactionId,
});
t.assertTrue(r2.type === ConfirmPayResultType.Done);
@@ -232,6 +253,8 @@ export async function runPaymentShareTest(t: GlobalTestState) {
claimSecondWalletAgain.status === PreparePayResultType.AlreadyConfirmed,
);
}
+
+ t.logStep("second-case-done");
}
runPaymentShareTest.suites = ["wallet"];
diff --git a/packages/taler-harness/src/integrationtests/test-peer-repair.ts b/packages/taler-harness/src/integrationtests/test-peer-repair.ts
index a225a2057..22664bcc1 100644
--- a/packages/taler-harness/src/integrationtests/test-peer-repair.ts
+++ b/packages/taler-harness/src/integrationtests/test-peer-repair.ts
@@ -22,21 +22,19 @@ import {
AmountString,
Duration,
NotificationType,
- TalerUriAction,
TransactionMajorState,
TransactionMinorState,
TransactionType,
WalletNotification,
- stringifyTalerUri,
} from "@gnu-taler/taler-util";
import { WalletApiOperation } from "@gnu-taler/taler-wallet-core";
+import * as fs from "node:fs";
import { GlobalTestState } from "../harness/harness.js";
import {
createSimpleTestkudosEnvironmentV2,
createWalletDaemonWithClient,
withdrawViaBankV2,
} from "../harness/helpers.js";
-import * as fs from "node:fs";
export async function runPeerRepairTest(t: GlobalTestState) {
// Set up test environment
diff --git a/packages/taler-harness/src/integrationtests/test-peer-to-peer-pull.ts b/packages/taler-harness/src/integrationtests/test-peer-to-peer-pull.ts
index e8d34e288..b61a3941b 100644
--- a/packages/taler-harness/src/integrationtests/test-peer-to-peer-pull.ts
+++ b/packages/taler-harness/src/integrationtests/test-peer-to-peer-pull.ts
@@ -33,7 +33,6 @@ import {
BankServiceHandle,
ExchangeService,
GlobalTestState,
- WalletCli,
WalletClient,
} from "../harness/harness.js";
import {
diff --git a/packages/taler-harness/src/integrationtests/test-timetravel-autorefresh.ts b/packages/taler-harness/src/integrationtests/test-timetravel-autorefresh.ts
index def2462e0..3c47f30db 100644
--- a/packages/taler-harness/src/integrationtests/test-timetravel-autorefresh.ts
+++ b/packages/taler-harness/src/integrationtests/test-timetravel-autorefresh.ts
@@ -22,6 +22,7 @@ import {
Duration,
durationFromSpec,
MerchantApiClient,
+ NotificationType,
PreparePayResultType,
} from "@gnu-taler/taler-util";
import {
@@ -124,6 +125,12 @@ export async function runTimetravelAutorefreshTest(t: GlobalTestState) {
});
await wres.withdrawalFinishedCond;
+ const exchangeUpdated1Cond = walletClient.waitForNotificationCond(
+ (x) =>
+ x.type === NotificationType.ExchangeStateTransition &&
+ x.exchangeBaseUrl === exchange.baseUrl,
+ );
+
// Travel into the future, the deposit expiration is two years
// into the future.
console.log("applying first time travel");
@@ -142,7 +149,8 @@ export async function runTimetravelAutorefreshTest(t: GlobalTestState) {
console.log("pending operations after first time travel");
console.log(JSON.stringify(p, undefined, 2));
- await walletClient.call(WalletApiOperation.TestingWaitTasksProcessed, {});
+ // The time travel should cause exchanges to update.
+ await exchangeUpdated1Cond;
await walletClient.call(WalletApiOperation.TestingWaitTransactionsFinal, {});
const wres2 = await withdrawViaBankV2(t, {
@@ -155,6 +163,12 @@ export async function runTimetravelAutorefreshTest(t: GlobalTestState) {
await walletClient.call(WalletApiOperation.TestingWaitTransactionsFinal, {});
+ const exchangeUpdated2Cond = walletClient.waitForNotificationCond(
+ (x) =>
+ x.type === NotificationType.ExchangeStateTransition &&
+ x.exchangeBaseUrl === exchange.baseUrl,
+ );
+
// Travel into the future, the deposit expiration is two years
// into the future.
console.log("applying second time travel");
@@ -167,7 +181,8 @@ export async function runTimetravelAutorefreshTest(t: GlobalTestState) {
},
);
- await walletClient.call(WalletApiOperation.TestingWaitTasksProcessed, {});
+ // The time travel should cause exchanges to update.
+ await exchangeUpdated2Cond;
await walletClient.call(WalletApiOperation.TestingWaitTransactionsFinal, {});
// At this point, the original coins should've been refreshed.
diff --git a/packages/taler-util/src/time.ts b/packages/taler-util/src/time.ts
index c677d52ae..5702b2947 100644
--- a/packages/taler-util/src/time.ts
+++ b/packages/taler-util/src/time.ts
@@ -21,7 +21,7 @@
/**
* Imports.
*/
-import { Codec, renderContext, Context } from "./codec.js";
+import { Codec, Context, renderContext } from "./codec.js";
declare const flavor_AbsoluteTime: unique symbol;
declare const flavor_TalerProtocolTimestamp: unique symbol;
@@ -412,6 +412,10 @@ export namespace AbsoluteTime {
return cmp(t, now()) <= 0;
}
+ export function isNever(t: AbsoluteTime): boolean {
+ return t.t_ms === "never";
+ }
+
export function fromProtocolTimestamp(
t: TalerProtocolTimestamp,
): AbsoluteTime {
@@ -503,6 +507,23 @@ export namespace AbsoluteTime {
return { t_ms: t1.t_ms + d.d_ms, [opaque_AbsoluteTime]: true };
}
+ /**
+ * Get the remaining duration until {@param t1}.
+ *
+ * If {@param t1} already happened, the remaining duration
+ * is zero.
+ */
+ export function remaining(t1: AbsoluteTime): Duration {
+ if (t1.t_ms === "never") {
+ return Duration.getForever();
+ }
+ const stampNow = now();
+ if (stampNow.t_ms === "never") {
+ throw Error("invariant violated");
+ }
+ return Duration.fromMilliseconds(Math.max(0, t1.t_ms - stampNow.t_ms));
+ }
+
export function subtractDuraction(
t1: AbsoluteTime,
d: Duration,
diff --git a/packages/taler-util/src/wallet-types.ts b/packages/taler-util/src/wallet-types.ts
index 0749df9f9..b79bfe4fe 100644
--- a/packages/taler-util/src/wallet-types.ts
+++ b/packages/taler-util/src/wallet-types.ts
@@ -71,12 +71,10 @@ import {
} from "./taler-types.js";
import {
AbsoluteTime,
- Duration,
TalerPreciseTimestamp,
TalerProtocolDuration,
TalerProtocolTimestamp,
codecForAbsoluteTime,
- codecForDuration,
codecForTimestamp,
} from "./time.js";
import {
@@ -3062,3 +3060,13 @@ export const codecForRemoveGlobalCurrencyAuditorRequest =
.property("auditorBaseUrl", codecForString())
.property("auditorPub", codecForString())
.build("RemoveGlobalCurrencyAuditorRequest");
+
+export interface RetryLoopOpts {
+ /**
+ * Stop the retry loop when all lifeness-giving pending operations
+ * are done.
+ *
+ * Defaults to false.
+ */
+ stopWhenDone?: boolean;
+}
diff --git a/packages/taler-wallet-cli/src/index.ts b/packages/taler-wallet-cli/src/index.ts
index 91dcd2702..f81236cd4 100644
--- a/packages/taler-wallet-cli/src/index.ts
+++ b/packages/taler-wallet-cli/src/index.ts
@@ -284,9 +284,6 @@ async function createLocalWallet(
console.error("Operation failed: " + summarizeTalerErrorDetail(ed));
console.error("Error details:", JSON.stringify(ed, undefined, 2));
processExit(1);
- } finally {
- logger.trace("operation with wallet finished, stopping");
- logger.trace("stopped wallet");
}
}
@@ -343,6 +340,7 @@ async function withLocalWallet<T>(
const wh = await createLocalWallet(walletCliArgs);
const w = wh.wallet;
const res = await f({ client: w.client, ws: w });
+ logger.info("Work done, stopping wallet.");
w.stop();
return res;
}
@@ -956,7 +954,6 @@ depositCli
},
);
console.log(`Created deposit ${resp.depositGroupId}`);
- await wallet.ws.runPending();
});
});
@@ -1231,9 +1228,9 @@ advancedCli
help: "Run pending operations.",
})
.action(async (args) => {
- await withLocalWallet(args, async (wallet) => {
- await wallet.ws.runPending();
- });
+ logger.error(
+ "Subcommand run-pending not supported anymore. Please use run-until-done or the client/server wallet.",
+ );
});
advancedCli
diff --git a/packages/taler-wallet-core/src/internal-wallet-state.ts b/packages/taler-wallet-core/src/internal-wallet-state.ts
index fdf04a65f..4379f20b5 100644
--- a/packages/taler-wallet-core/src/internal-wallet-state.ts
+++ b/packages/taler-wallet-core/src/internal-wallet-state.ts
@@ -31,16 +31,14 @@
*/
import { IDBFactory } from "@gnu-taler/idb-bridge";
import {
- CoinRefreshRequest,
DenominationInfo,
- RefreshGroupId,
- RefreshReason,
TransactionState,
WalletNotification,
} from "@gnu-taler/taler-util";
import { HttpRequestLibrary } from "@gnu-taler/taler-util/http";
import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js";
import { WalletStoresV1 } from "./db.js";
+import { TaskScheduler } from "./shepherd.js";
import { AsyncCondition } from "./util/promiseUtils.js";
import {
DbAccess,
@@ -78,12 +76,6 @@ export interface RecoupOperations {
export type NotificationListener = (n: WalletNotification) => void;
-export interface ActiveLongpollInfo {
- [opId: string]: {
- cancel: () => void;
- };
-}
-
export type CancelFn = () => void;
/**
@@ -94,11 +86,6 @@ export type CancelFn = () => void;
* as it's an opaque implementation detail.
*/
export interface InternalWalletState {
- /**
- * Active longpoll operations.
- */
- activeLongpoll: ActiveLongpollInfo;
-
cryptoApi: TalerCryptoInterface;
timerGroup: TimerGroup;
@@ -106,13 +93,7 @@ export interface InternalWalletState {
config: Readonly<WalletConfig>;
- /**
- * Asynchronous condition to interrupt the sleep of the
- * retry loop.
- *
- * Used to allow processing of new work faster.
- */
- workAvailable: AsyncCondition;
+ taskScheduler: TaskScheduler;
listeners: NotificationListener[];
diff --git a/packages/taler-wallet-core/src/operations/README.md b/packages/taler-wallet-core/src/operations/README.md
deleted file mode 100644
index a40349d37..000000000
--- a/packages/taler-wallet-core/src/operations/README.md
+++ /dev/null
@@ -1,7 +0,0 @@
-# Wallet Operations
-
-This folder contains the implementations for all wallet operations that operate on the wallet state.
-
-To avoid cyclic dependencies, these files must **not** reference each other. Instead, other operations should only be accessed via injected dependencies.
-
-Avoiding cyclic dependencies is important for module bundlers.
diff --git a/packages/taler-wallet-core/src/operations/backup/index.ts b/packages/taler-wallet-core/src/operations/backup/index.ts
index 7a2771c57..e4e4e43f6 100644
--- a/packages/taler-wallet-core/src/operations/backup/index.ts
+++ b/packages/taler-wallet-core/src/operations/backup/index.ts
@@ -30,12 +30,10 @@ import {
AttentionType,
BackupRecovery,
Codec,
- DenomKeyType,
EddsaKeyPair,
HttpStatusCode,
Logger,
PreparePayResult,
- PreparePayResultType,
RecoveryLoadRequest,
RecoveryMergeStrategy,
TalerError,
@@ -61,11 +59,9 @@ import {
encodeCrock,
getRandomBytes,
hash,
- hashDenomPub,
j2s,
kdf,
notEmpty,
- rsaBlind,
secretbox,
secretbox_open,
stringToBytes,
@@ -75,7 +71,6 @@ import {
readTalerErrorResponse,
} from "@gnu-taler/taler-util/http";
import { gunzipSync, gzipSync } from "fflate";
-import { TalerCryptoInterface } from "../../crypto/cryptoImplementation.js";
import {
BackupProviderRecord,
BackupProviderState,
@@ -84,25 +79,23 @@ import {
ConfigRecord,
ConfigRecordKey,
WalletBackupConfState,
+ WalletStoresV1,
timestampOptionalPreciseFromDb,
- timestampPreciseFromDb,
timestampPreciseToDb,
} from "../../db.js";
import { InternalWalletState } from "../../internal-wallet-state.js";
-import { assertUnreachable } from "../../util/assertUnreachable.js";
import {
checkDbInvariant,
checkLogicInvariant,
} from "../../util/invariants.js";
+import { GetReadOnlyAccess } from "../../util/query.js";
import { addAttentionRequest, removeAttentionRequest } from "../attention.js";
import {
+ TaskIdentifiers,
TaskRunResult,
TaskRunResultType,
- TaskIdentifiers,
} from "../common.js";
-import { checkPaymentByProposalId, preparePayForUri } from "../pay-merchant.js";
-import { WalletStoresV1 } from "../../db.js";
-import { GetReadOnlyAccess } from "../../util/query.js";
+import { preparePayForUri } from "../pay-merchant.js";
const logger = new Logger("operations/backup.ts");
@@ -318,9 +311,10 @@ async function runBackupCycleForProvider(
await tx.backupProviders.put(prov);
});
- return {
- type: TaskRunResultType.Pending,
- };
+ throw Error("not implemented");
+ // return {
+ // type: TaskRunResultType.Pending,
+ // };
}
const result = res;
@@ -352,9 +346,10 @@ async function runBackupCycleForProvider(
provider.baseUrl,
);
- return {
- type: TaskRunResultType.Pending,
- };
+ throw Error("not implemented");
+ // return {
+ // type: TaskRunResultType.Pending,
+ // };
}
if (resp.status === HttpStatusCode.NoContent) {
@@ -658,30 +653,27 @@ async function runFirstBackupCycleForProvider(
ws: InternalWalletState,
args: BackupForProviderArgs,
): Promise<AddBackupProviderResponse> {
- const resp = await runBackupCycleForProvider(ws, args);
- switch (resp.type) {
- case TaskRunResultType.Error:
- throw TalerError.fromDetail(
- TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
- resp.errorDetail as any, //FIXME create an error for backup problems
- );
- case TaskRunResultType.Finished:
- return {
- status: "ok",
- };
- case TaskRunResultType.Longpoll:
- throw Error(
- "unexpected runFirstBackupCycleForProvider result (longpoll)",
- );
- case TaskRunResultType.Pending:
- return {
- status: "payment-required",
- talerUri: "FIXME",
- //talerUri: resp.result.talerUri,
- };
- default:
- assertUnreachable(resp);
- }
+ throw Error("not implemented");
+ // const resp = await runBackupCycleForProvider(ws, args);
+ // switch (resp.type) {
+ // case TaskRunResultType.Error:
+ // throw TalerError.fromDetail(
+ // TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
+ // resp.errorDetail as any, //FIXME create an error for backup problems
+ // );
+ // case TaskRunResultType.Finished:
+ // return {
+ // status: "ok",
+ // };
+ // case TaskRunResultType.Pending:
+ // return {
+ // status: "payment-required",
+ // talerUri: "FIXME",
+ // //talerUri: resp.result.talerUri,
+ // };
+ // default:
+ // assertUnreachable(resp);
+ // }
}
export async function restoreFromRecoverySecret(): Promise<void> {
@@ -780,51 +772,52 @@ async function getProviderPaymentInfo(
ws: InternalWalletState,
provider: BackupProviderRecord,
): Promise<ProviderPaymentStatus> {
- if (!provider.currentPaymentProposalId) {
- return {
- type: ProviderPaymentType.Unpaid,
- };
- }
- const status = await checkPaymentByProposalId(
- ws,
- provider.currentPaymentProposalId,
- ).catch(() => undefined);
-
- if (!status) {
- return {
- type: ProviderPaymentType.Unpaid,
- };
- }
-
- switch (status.status) {
- case PreparePayResultType.InsufficientBalance:
- return {
- type: ProviderPaymentType.InsufficientBalance,
- amount: status.amountRaw,
- };
- case PreparePayResultType.PaymentPossible:
- return {
- type: ProviderPaymentType.Pending,
- talerUri: status.talerUri,
- };
- case PreparePayResultType.AlreadyConfirmed:
- if (status.paid) {
- return {
- type: ProviderPaymentType.Paid,
- paidUntil: AbsoluteTime.addDuration(
- AbsoluteTime.fromProtocolTimestamp(status.contractTerms.timestamp),
- durationFromSpec({ years: 1 }), //FIXME: take this from the contract term
- ),
- };
- } else {
- return {
- type: ProviderPaymentType.Pending,
- talerUri: status.talerUri,
- };
- }
- default:
- assertUnreachable(status);
- }
+ throw Error("not implemented");
+ // if (!provider.currentPaymentProposalId) {
+ // return {
+ // type: ProviderPaymentType.Unpaid,
+ // };
+ // }
+ // const status = await checkPaymentByProposalId(
+ // ws,
+ // provider.currentPaymentProposalId,
+ // ).catch(() => undefined);
+
+ // if (!status) {
+ // return {
+ // type: ProviderPaymentType.Unpaid,
+ // };
+ // }
+
+ // switch (status.status) {
+ // case PreparePayResultType.InsufficientBalance:
+ // return {
+ // type: ProviderPaymentType.InsufficientBalance,
+ // amount: status.amountRaw,
+ // };
+ // case PreparePayResultType.PaymentPossible:
+ // return {
+ // type: ProviderPaymentType.Pending,
+ // talerUri: status.talerUri,
+ // };
+ // case PreparePayResultType.AlreadyConfirmed:
+ // if (status.paid) {
+ // return {
+ // type: ProviderPaymentType.Paid,
+ // paidUntil: AbsoluteTime.addDuration(
+ // AbsoluteTime.fromProtocolTimestamp(status.contractTerms.timestamp),
+ // durationFromSpec({ years: 1 }), //FIXME: take this from the contract term
+ // ),
+ // };
+ // } else {
+ // return {
+ // type: ProviderPaymentType.Pending,
+ // talerUri: status.talerUri,
+ // };
+ // }
+ // default:
+ // assertUnreachable(status);
+ // }
}
/**
diff --git a/packages/taler-wallet-core/src/operations/common.ts b/packages/taler-wallet-core/src/operations/common.ts
index 4c7c55212..92950b35b 100644
--- a/packages/taler-wallet-core/src/operations/common.ts
+++ b/packages/taler-wallet-core/src/operations/common.ts
@@ -21,7 +21,6 @@ import {
AbsoluteTime,
AmountJson,
Amounts,
- CancellationToken,
CoinRefreshRequest,
CoinStatus,
Duration,
@@ -29,22 +28,15 @@ import {
ExchangeEntryStatus,
ExchangeTosStatus,
ExchangeUpdateStatus,
- getErrorDetailFromException,
- j2s,
Logger,
- makeErrorDetail,
- NotificationType,
RefreshReason,
- TalerError,
- TalerErrorCode,
TalerErrorDetail,
TalerPreciseTimestamp,
+ TalerProtocolTimestamp,
TombstoneIdStr,
TransactionIdStr,
- TransactionType,
- WalletNotification,
+ durationMul,
} from "@gnu-taler/taler-util";
-import { CryptoApiStoppedError } from "../crypto/workers/crypto-dispatcher.js";
import {
BackupProviderRecord,
CoinRecord,
@@ -61,17 +53,16 @@ import {
RecoupGroupRecord,
RefreshGroupRecord,
RewardRecord,
- timestampPreciseToDb,
WalletStoresV1,
WithdrawalGroupRecord,
+ timestampPreciseToDb,
} from "../db.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import { PendingTaskType, TaskId } from "../pending-types.js";
import { assertUnreachable } from "../util/assertUnreachable.js";
import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js";
-import { GetReadOnlyAccess, GetReadWriteAccess } from "../util/query.js";
+import { GetReadWriteAccess } from "../util/query.js";
import { createRefreshGroup } from "./refresh.js";
-import { constructTransactionIdentifier } from "./transactions.js";
const logger = new Logger("operations/common.ts");
@@ -251,331 +242,6 @@ export async function spendCoins(
);
}
-/**
- * Convert the task ID for a task that processes a transaction int
- * the ID for the transaction.
- */
-function convertTaskToTransactionId(
- taskId: string,
-): TransactionIdStr | undefined {
- const parsedTaskId = parseTaskIdentifier(taskId);
- switch (parsedTaskId.tag) {
- case PendingTaskType.PeerPullCredit:
- return constructTransactionIdentifier({
- tag: TransactionType.PeerPullCredit,
- pursePub: parsedTaskId.pursePub,
- });
- case PendingTaskType.PeerPullDebit:
- return constructTransactionIdentifier({
- tag: TransactionType.PeerPullDebit,
- peerPullDebitId: parsedTaskId.peerPullDebitId,
- });
- // FIXME: This doesn't distinguish internal-withdrawal.
- // Maybe we should have a different task type for that as well?
- // Or maybe transaction IDs should be valid task identifiers?
- case PendingTaskType.Withdraw:
- return constructTransactionIdentifier({
- tag: TransactionType.Withdrawal,
- withdrawalGroupId: parsedTaskId.withdrawalGroupId,
- });
- case PendingTaskType.PeerPushCredit:
- return constructTransactionIdentifier({
- tag: TransactionType.PeerPushCredit,
- peerPushCreditId: parsedTaskId.peerPushCreditId,
- });
- case PendingTaskType.Deposit:
- return constructTransactionIdentifier({
- tag: TransactionType.Deposit,
- depositGroupId: parsedTaskId.depositGroupId,
- });
- case PendingTaskType.Refresh:
- return constructTransactionIdentifier({
- tag: TransactionType.Refresh,
- refreshGroupId: parsedTaskId.refreshGroupId,
- });
- case PendingTaskType.RewardPickup:
- return constructTransactionIdentifier({
- tag: TransactionType.Reward,
- walletRewardId: parsedTaskId.walletRewardId,
- });
- case PendingTaskType.PeerPushDebit:
- return constructTransactionIdentifier({
- tag: TransactionType.PeerPushDebit,
- pursePub: parsedTaskId.pursePub,
- });
- case PendingTaskType.Purchase:
- return constructTransactionIdentifier({
- tag: TransactionType.Payment,
- proposalId: parsedTaskId.proposalId,
- });
- default:
- return undefined;
- }
-}
-
-async function makeTransactionRetryNotification(
- ws: InternalWalletState,
- tx: GetReadOnlyAccess<typeof WalletStoresV1>,
- pendingTaskId: string,
- e: TalerErrorDetail | undefined,
-): Promise<WalletNotification | undefined> {
- const txId = convertTaskToTransactionId(pendingTaskId);
- if (!txId) {
- return undefined;
- }
- const txState = await ws.getTransactionState(ws, tx, txId);
- if (!txState) {
- return undefined;
- }
- const notif: WalletNotification = {
- type: NotificationType.TransactionStateTransition,
- transactionId: txId,
- oldTxState: txState,
- newTxState: txState,
- };
- if (e) {
- notif.errorInfo = {
- code: e.code as number,
- hint: e.hint,
- };
- }
- return notif;
-}
-
-async function makeExchangeRetryNotification(
- ws: InternalWalletState,
- tx: GetReadOnlyAccess<typeof WalletStoresV1>,
- pendingTaskId: string,
- e: TalerErrorDetail | undefined,
-): Promise<WalletNotification | undefined> {
- logger.info("making exchange retry notification");
- const parsedTaskId = parseTaskIdentifier(pendingTaskId);
- if (parsedTaskId.tag !== PendingTaskType.ExchangeUpdate) {
- throw Error("invalid task identifier");
- }
- const rec = await tx.exchanges.get(parsedTaskId.exchangeBaseUrl);
-
- if (!rec) {
- logger.info(`exchange ${parsedTaskId.exchangeBaseUrl} not found`);
- return undefined;
- }
-
- const notif: WalletNotification = {
- type: NotificationType.ExchangeStateTransition,
- exchangeBaseUrl: parsedTaskId.exchangeBaseUrl,
- oldExchangeState: getExchangeState(rec),
- newExchangeState: getExchangeState(rec),
- };
- if (e) {
- notif.errorInfo = {
- code: e.code as number,
- hint: e.hint,
- };
- }
- return notif;
-}
-
-/**
- * Generate an appropriate error transition notification
- * for applicable tasks.
- *
- * Namely, transition notifications are generated for:
- * - exchange update errors
- * - transactions
- */
-async function taskToRetryNotification(
- ws: InternalWalletState,
- tx: GetReadOnlyAccess<typeof WalletStoresV1>,
- pendingTaskId: string,
- e: TalerErrorDetail | undefined,
-): Promise<WalletNotification | undefined> {
- const parsedTaskId = parseTaskIdentifier(pendingTaskId);
-
- switch (parsedTaskId.tag) {
- case PendingTaskType.ExchangeUpdate:
- return makeExchangeRetryNotification(ws, tx, pendingTaskId, e);
- case PendingTaskType.PeerPullCredit:
- case PendingTaskType.PeerPullDebit:
- case PendingTaskType.Withdraw:
- case PendingTaskType.PeerPushCredit:
- case PendingTaskType.Deposit:
- case PendingTaskType.Refresh:
- case PendingTaskType.RewardPickup:
- case PendingTaskType.PeerPushDebit:
- case PendingTaskType.Purchase:
- return makeTransactionRetryNotification(ws, tx, pendingTaskId, e);
- case PendingTaskType.Backup:
- case PendingTaskType.ExchangeCheckRefresh:
- case PendingTaskType.Recoup:
- return undefined;
- }
-}
-
-async function storePendingTaskError(
- ws: InternalWalletState,
- pendingTaskId: string,
- e: TalerErrorDetail,
-): Promise<void> {
- logger.info(`storing pending task error for ${pendingTaskId}`);
- const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => {
- let retryRecord = await tx.operationRetries.get(pendingTaskId);
- if (!retryRecord) {
- retryRecord = {
- id: pendingTaskId,
- lastError: e,
- retryInfo: DbRetryInfo.reset(),
- };
- } else {
- retryRecord.lastError = e;
- retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
- }
- await tx.operationRetries.put(retryRecord);
- return taskToRetryNotification(ws, tx, pendingTaskId, e);
- });
- if (maybeNotification) {
- ws.notify(maybeNotification);
- }
-}
-
-export async function resetPendingTaskTimeout(
- ws: InternalWalletState,
- pendingTaskId: string,
-): Promise<void> {
- const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => {
- let retryRecord = await tx.operationRetries.get(pendingTaskId);
- if (retryRecord) {
- // Note that we don't reset the lastError, it should still be visible
- // while the retry runs.
- retryRecord.retryInfo = DbRetryInfo.reset();
- await tx.operationRetries.put(retryRecord);
- }
- return taskToRetryNotification(ws, tx, pendingTaskId, undefined);
- });
- if (maybeNotification) {
- ws.notify(maybeNotification);
- }
-}
-
-async function storePendingTaskPending(
- ws: InternalWalletState,
- pendingTaskId: string,
-): Promise<void> {
- const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => {
- let retryRecord = await tx.operationRetries.get(pendingTaskId);
- let hadError = false;
- if (!retryRecord) {
- retryRecord = {
- id: pendingTaskId,
- retryInfo: DbRetryInfo.reset(),
- };
- } else {
- if (retryRecord.lastError) {
- hadError = true;
- }
- delete retryRecord.lastError;
- retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
- }
- await tx.operationRetries.put(retryRecord);
- if (hadError) {
- return taskToRetryNotification(ws, tx, pendingTaskId, undefined);
- } else {
- return undefined;
- }
- });
- if (maybeNotification) {
- ws.notify(maybeNotification);
- }
-}
-
-async function storePendingTaskFinished(
- ws: InternalWalletState,
- pendingTaskId: string,
-): Promise<void> {
- await ws.db
- .mktx((x) => [x.operationRetries])
- .runReadWrite(async (tx) => {
- await tx.operationRetries.delete(pendingTaskId);
- });
-}
-
-export async function runTaskWithErrorReporting(
- ws: InternalWalletState,
- opId: TaskId,
- f: () => Promise<TaskRunResult>,
-): Promise<TaskRunResult> {
- let maybeError: TalerErrorDetail | undefined;
- try {
- const resp = await f();
- switch (resp.type) {
- case TaskRunResultType.Error:
- await storePendingTaskError(ws, opId, resp.errorDetail);
- return resp;
- case TaskRunResultType.Finished:
- await storePendingTaskFinished(ws, opId);
- return resp;
- case TaskRunResultType.Pending:
- await storePendingTaskPending(ws, opId);
- return resp;
- case TaskRunResultType.Longpoll:
- return resp;
- }
- } catch (e) {
- if (e instanceof CryptoApiStoppedError) {
- if (ws.stopped) {
- logger.warn("crypto API stopped during shutdown, ignoring error");
- return {
- type: TaskRunResultType.Error,
- errorDetail: makeErrorDetail(
- TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
- {},
- "Crypto API stopped during shutdown",
- ),
- };
- }
- }
- if (e instanceof TalerError) {
- logger.warn("operation processed resulted in error");
- logger.warn(`error was: ${j2s(e.errorDetail)}`);
- maybeError = e.errorDetail;
- await storePendingTaskError(ws, opId, maybeError!);
- return {
- type: TaskRunResultType.Error,
- errorDetail: e.errorDetail,
- };
- } else if (e instanceof Error) {
- // This is a bug, as we expect pending operations to always
- // do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED
- // or return something.
- logger.error(`Uncaught exception: ${e.message}`);
- logger.error(`Stack: ${e.stack}`);
- maybeError = makeErrorDetail(
- TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
- {
- stack: e.stack,
- },
- `unexpected exception (message: ${e.message})`,
- );
- await storePendingTaskError(ws, opId, maybeError);
- return {
- type: TaskRunResultType.Error,
- errorDetail: maybeError,
- };
- } else {
- logger.error("Uncaught exception, value is not even an error.");
- maybeError = makeErrorDetail(
- TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
- {},
- `unexpected exception (not even an error)`,
- );
- await storePendingTaskError(ws, opId, maybeError);
- return {
- type: TaskRunResultType.Error,
- errorDetail: maybeError,
- };
- }
- }
-}
-
export enum TombstoneTag {
DeleteWithdrawalGroup = "delete-withdrawal-group",
DeleteReserve = "delete-reserve",
@@ -646,47 +312,6 @@ export function getExchangeState(r: ExchangeEntryRecord): ExchangeEntryState {
};
}
-export interface LongpollResult {
- ready: boolean;
-}
-
-export function runLongpollAsync(
- ws: InternalWalletState,
- retryTag: string,
- reqFn: (ct: CancellationToken) => Promise<LongpollResult>,
-): void {
- const asyncFn = async () => {
- if (ws.stopped) {
- logger.trace("not long-polling reserve, wallet already stopped");
- await storePendingTaskPending(ws, retryTag);
- return;
- }
- const cts = CancellationToken.create();
- let res: { ready: boolean } | undefined = undefined;
- try {
- ws.activeLongpoll[retryTag] = {
- cancel: () => {
- logger.trace("cancel of reserve longpoll requested");
- cts.cancel();
- },
- };
- res = await reqFn(cts.token);
- } catch (e) {
- const errDetail = getErrorDetailFromException(e);
- logger.warn(`got error during long-polling: ${j2s(errDetail)}`);
- await storePendingTaskError(ws, retryTag, errDetail);
- return;
- } finally {
- delete ws.activeLongpoll[retryTag];
- }
- if (!res.ready) {
- await storePendingTaskPending(ws, retryTag);
- }
- ws.workAvailable.trigger();
- };
- asyncFn();
-}
-
export type ParsedTombstone =
| {
tag: TombstoneTag.DeleteWithdrawalGroup;
@@ -732,31 +357,53 @@ export interface TransactionManager {
export enum TaskRunResultType {
Finished = "finished",
- Pending = "pending",
+ Backoff = "backoff",
+ Progress = "progress",
Error = "error",
- Longpoll = "longpoll",
+ ScheduleLater = "schedule-later",
}
export type TaskRunResult =
| TaskRunFinishedResult
| TaskRunErrorResult
- | TaskRunLongpollResult
- | TaskRunPendingResult;
+ | TaskRunBackoffResult
+ | TaskRunProgressResult
+ | TaskRunScheduleLaterResult;
export namespace TaskRunResult {
+ /**
+ * Task is finished and does not need to be processed again.
+ */
export function finished(): TaskRunResult {
return {
type: TaskRunResultType.Finished,
};
}
- export function pending(): TaskRunResult {
+ /**
+ * Task is waiting for something, should be invoked
+ * again with exponentiall back-off until some other
+ * result is returned.
+ */
+ export function backoff(): TaskRunResult {
+ return {
+ type: TaskRunResultType.Backoff,
+ };
+ }
+ /**
+ * Task made progress and should be processed again.
+ */
+ export function progress(): TaskRunResult {
return {
- type: TaskRunResultType.Pending,
+ type: TaskRunResultType.Progress,
};
}
- export function longpoll(): TaskRunResult {
+ /**
+ * Run the task again at a fixed time in the future.
+ */
+ export function runAgainAt(runAt: AbsoluteTime): TaskRunResult {
return {
- type: TaskRunResultType.Longpoll,
+ type: TaskRunResultType.ScheduleLater,
+ runAt,
};
}
}
@@ -765,8 +412,17 @@ export interface TaskRunFinishedResult {
type: TaskRunResultType.Finished;
}
-export interface TaskRunPendingResult {
- type: TaskRunResultType.Pending;
+export interface TaskRunBackoffResult {
+ type: TaskRunResultType.Backoff;
+}
+
+export interface TaskRunProgressResult {
+ type: TaskRunResultType.Progress;
+}
+
+export interface TaskRunScheduleLaterResult {
+ type: TaskRunResultType.ScheduleLater;
+ runAt: AbsoluteTime;
}
export interface TaskRunErrorResult {
@@ -774,10 +430,6 @@ export interface TaskRunErrorResult {
errorDetail: TalerErrorDetail;
}
-export interface TaskRunLongpollResult {
- type: TaskRunResultType.Longpoll;
-}
-
export interface DbRetryInfo {
firstTry: DbPreciseTimestamp;
nextRetry: DbPreciseTimestamp;
@@ -867,6 +519,24 @@ export namespace DbRetryInfo {
}
/**
+ * Timestamp after which the wallet would do an auto-refresh.
+ */
+export function getAutoRefreshExecuteThreshold(d: {
+ stampExpireWithdraw: TalerProtocolTimestamp;
+ stampExpireDeposit: TalerProtocolTimestamp;
+}): AbsoluteTime {
+ const expireWithdraw = AbsoluteTime.fromProtocolTimestamp(
+ d.stampExpireWithdraw,
+ );
+ const expireDeposit = AbsoluteTime.fromProtocolTimestamp(
+ d.stampExpireDeposit,
+ );
+ const delta = AbsoluteTime.difference(expireWithdraw, expireDeposit);
+ const deltaDiv = durationMul(delta, 0.5);
+ return AbsoluteTime.addDuration(expireWithdraw, deltaDiv);
+}
+
+/**
* Parsed representation of task identifiers.
*/
export type ParsedTaskIdentifier =
@@ -877,7 +547,6 @@ export type ParsedTaskIdentifier =
| { tag: PendingTaskType.ExchangeUpdate; exchangeBaseUrl: string }
| { tag: PendingTaskType.Backup; backupProviderBaseUrl: string }
| { tag: PendingTaskType.Deposit; depositGroupId: string }
- | { tag: PendingTaskType.ExchangeCheckRefresh; exchangeBaseUrl: string }
| { tag: PendingTaskType.PeerPullDebit; peerPullDebitId: string }
| { tag: PendingTaskType.PeerPullCredit; pursePub: string }
| { tag: PendingTaskType.PeerPushCredit; peerPushCreditId: string }
@@ -900,8 +569,6 @@ export function parseTaskIdentifier(x: string): ParsedTaskIdentifier {
return { tag: type, backupProviderBaseUrl: decodeURIComponent(rest[0]) };
case PendingTaskType.Deposit:
return { tag: type, depositGroupId: rest[0] };
- case PendingTaskType.ExchangeCheckRefresh:
- return { tag: type, exchangeBaseUrl: decodeURIComponent(rest[0]) };
case PendingTaskType.ExchangeUpdate:
return { tag: type, exchangeBaseUrl: decodeURIComponent(rest[0]) };
case PendingTaskType.PeerPullCredit:
@@ -933,8 +600,6 @@ export function constructTaskIdentifier(p: ParsedTaskIdentifier): TaskId {
return `${p.tag}:${p.backupProviderBaseUrl}` as TaskId;
case PendingTaskType.Deposit:
return `${p.tag}:${p.depositGroupId}` as TaskId;
- case PendingTaskType.ExchangeCheckRefresh:
- return `${p.tag}:${encodeURIComponent(p.exchangeBaseUrl)}` as TaskId;
case PendingTaskType.ExchangeUpdate:
return `${p.tag}:${encodeURIComponent(p.exchangeBaseUrl)}` as TaskId;
case PendingTaskType.PeerPullDebit:
@@ -974,11 +639,6 @@ export namespace TaskIdentifiers {
exchBaseUrl,
)}` as TaskId;
}
- export function forExchangeCheckRefresh(exch: ExchangeEntryRecord): TaskId {
- return `${PendingTaskType.ExchangeCheckRefresh}:${encodeURIComponent(
- exch.baseUrl,
- )}` as TaskId;
- }
export function forTipPickup(tipRecord: RewardRecord): TaskId {
return `${PendingTaskType.RewardPickup}:${tipRecord.walletRewardId}` as TaskId;
}
diff --git a/packages/taler-wallet-core/src/operations/deposits.ts b/packages/taler-wallet-core/src/operations/deposits.ts
index 3619ac4f4..38b5d43f0 100644
--- a/packages/taler-wallet-core/src/operations/deposits.ts
+++ b/packages/taler-wallet-core/src/operations/deposits.ts
@@ -48,6 +48,7 @@ import {
TalerProtocolTimestamp,
TrackTransaction,
TransactionAction,
+ TransactionIdStr,
TransactionMajorState,
TransactionMinorState,
TransactionState,
@@ -75,6 +76,7 @@ import {
KycPendingInfo,
PendingTaskType,
RefreshOperationStatus,
+ TaskId,
createRefreshGroup,
getCandidateWithdrawalDenomsTx,
getTotalRefreshCost,
@@ -90,7 +92,6 @@ import {
TombstoneTag,
TransactionContext,
constructTaskIdentifier,
- runLongpollAsync,
spendCoins,
} from "./common.js";
import { getExchangeWireDetailsInTx } from "./exchanges.js";
@@ -103,7 +104,6 @@ import {
constructTransactionIdentifier,
notifyTransition,
parseTransactionIdentifier,
- stopLongpolling,
} from "./transactions.js";
/**
@@ -112,8 +112,8 @@ import {
const logger = new Logger("deposits.ts");
export class DepositTransactionContext implements TransactionContext {
- private transactionId: string;
- private retryTag: string;
+ readonly transactionId: TransactionIdStr;
+ readonly taskId: TaskId;
constructor(
public ws: InternalWalletState,
public depositGroupId: string,
@@ -122,7 +122,7 @@ export class DepositTransactionContext implements TransactionContext {
tag: TransactionType.Deposit,
depositGroupId,
});
- this.retryTag = constructTaskIdentifier({
+ this.taskId = constructTaskIdentifier({
tag: PendingTaskType.Deposit,
depositGroupId,
});
@@ -148,7 +148,7 @@ export class DepositTransactionContext implements TransactionContext {
}
async suspendTransaction(): Promise<void> {
- const { ws, depositGroupId, transactionId, retryTag } = this;
+ const { ws, depositGroupId, transactionId, taskId: retryTag } = this;
const transitionInfo = await ws.db
.mktx((x) => [x.depositGroups])
.runReadWrite(async (tx) => {
@@ -185,12 +185,12 @@ export class DepositTransactionContext implements TransactionContext {
newTxState: computeDepositTransactionStatus(dg),
};
});
- stopLongpolling(ws, retryTag);
+ ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
}
async abortTransaction(): Promise<void> {
- const { ws, depositGroupId, transactionId, retryTag } = this;
+ const { ws, depositGroupId, transactionId, taskId: retryTag } = this;
const transitionInfo = await ws.db
.mktx((x) => [x.depositGroups])
.runReadWrite(async (tx) => {
@@ -219,14 +219,13 @@ export class DepositTransactionContext implements TransactionContext {
}
return undefined;
});
- stopLongpolling(ws, retryTag);
- // Need to process the operation again.
- ws.workAvailable.trigger();
+ ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
+ ws.taskScheduler.startShepherdTask(retryTag);
}
async resumeTransaction(): Promise<void> {
- const { ws, depositGroupId, transactionId, retryTag } = this;
+ const { ws, depositGroupId, transactionId, taskId: retryTag } = this;
const transitionInfo = await ws.db
.mktx((x) => [x.depositGroups])
.runReadWrite(async (tx) => {
@@ -263,12 +262,12 @@ export class DepositTransactionContext implements TransactionContext {
newTxState: computeDepositTransactionStatus(dg),
};
});
- ws.workAvailable.trigger();
notifyTransition(ws, transactionId, transitionInfo);
+ ws.taskScheduler.startShepherdTask(retryTag);
}
async failTransaction(): Promise<void> {
- const { ws, depositGroupId, transactionId, retryTag } = this;
+ const { ws, depositGroupId, transactionId, taskId: retryTag } = this;
const transitionInfo = await ws.db
.mktx((x) => [x.depositGroups])
.runReadWrite(async (tx) => {
@@ -294,7 +293,7 @@ export class DepositTransactionContext implements TransactionContext {
return undefined;
});
// FIXME: Also cancel ongoing work (via cancellation token, once implemented)
- stopLongpolling(ws, retryTag);
+ ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
}
}
@@ -453,7 +452,7 @@ async function waitForRefreshOnDepositGroup(
});
notifyTransition(ws, transactionId, transitionInfo);
- return TaskRunResult.pending();
+ return TaskRunResult.backoff();
}
async function refundDepositGroup(
@@ -568,7 +567,7 @@ async function refundDepositGroup(
await tx.depositGroups.put(newDg);
});
- return TaskRunResult.pending();
+ return TaskRunResult.backoff();
}
async function processDepositGroupAborting(
@@ -588,6 +587,7 @@ async function processDepositGroupAborting(
async function processDepositGroupPendingKyc(
ws: InternalWalletState,
depositGroup: DepositGroupRecord,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const { depositGroupId } = depositGroup;
const transactionId = constructTransactionIdentifier({
@@ -606,51 +606,45 @@ async function processDepositGroupPendingKyc(
throw Error("invalid DB state, in pending(kyc), but no kycInfo present");
}
- runLongpollAsync(ws, retryTag, async (ct) => {
- const url = new URL(
- `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`,
- kycInfo.exchangeBaseUrl,
- );
- url.searchParams.set("timeout_ms", "10000");
- logger.info(`kyc url ${url.href}`);
- const kycStatusRes = await ws.http.fetch(url.href, {
- method: "GET",
- cancellationToken: ct,
- });
- if (
- kycStatusRes.status === HttpStatusCode.Ok ||
- //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
- // remove after the exchange is fixed or clarified
- kycStatusRes.status === HttpStatusCode.NoContent
- ) {
- const transitionInfo = await ws.db
- .mktx((x) => [x.depositGroups])
- .runReadWrite(async (tx) => {
- const newDg = await tx.depositGroups.get(depositGroupId);
- if (!newDg) {
- return;
- }
- if (newDg.operationStatus !== DepositOperationStatus.PendingKyc) {
- return;
- }
- const oldTxState = computeDepositTransactionStatus(newDg);
- newDg.operationStatus = DepositOperationStatus.PendingTrack;
- const newTxState = computeDepositTransactionStatus(newDg);
- await tx.depositGroups.put(newDg);
- return { oldTxState, newTxState };
- });
- notifyTransition(ws, transactionId, transitionInfo);
- return { ready: true };
- } else if (kycStatusRes.status === HttpStatusCode.Accepted) {
- // FIXME: Do we have to update the URL here?
- return { ready: false };
- } else {
- throw Error(
- `unexpected response from kyc-check (${kycStatusRes.status})`,
- );
- }
+ const url = new URL(
+ `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`,
+ kycInfo.exchangeBaseUrl,
+ );
+ url.searchParams.set("timeout_ms", "10000");
+ logger.info(`kyc url ${url.href}`);
+ const kycStatusRes = await ws.http.fetch(url.href, {
+ method: "GET",
+ cancellationToken,
});
- return TaskRunResult.longpoll();
+ if (
+ kycStatusRes.status === HttpStatusCode.Ok ||
+ //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
+ // remove after the exchange is fixed or clarified
+ kycStatusRes.status === HttpStatusCode.NoContent
+ ) {
+ const transitionInfo = await ws.db
+ .mktx((x) => [x.depositGroups])
+ .runReadWrite(async (tx) => {
+ const newDg = await tx.depositGroups.get(depositGroupId);
+ if (!newDg) {
+ return;
+ }
+ if (newDg.operationStatus !== DepositOperationStatus.PendingKyc) {
+ return;
+ }
+ const oldTxState = computeDepositTransactionStatus(newDg);
+ newDg.operationStatus = DepositOperationStatus.PendingTrack;
+ const newTxState = computeDepositTransactionStatus(newDg);
+ await tx.depositGroups.put(newDg);
+ return { oldTxState, newTxState };
+ });
+ notifyTransition(ws, transactionId, transitionInfo);
+ } else if (kycStatusRes.status === HttpStatusCode.Accepted) {
+ // FIXME: Do we have to update the URL here?
+ } else {
+ throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`);
+ }
+ return TaskRunResult.backoff();
}
/**
@@ -682,7 +676,7 @@ async function transitionToKycRequired(
});
if (kycStatusReq.status === HttpStatusCode.Ok) {
logger.warn("kyc requested, but already fulfilled");
- return TaskRunResult.finished();
+ return TaskRunResult.backoff();
} else if (kycStatusReq.status === HttpStatusCode.Accepted) {
const kycStatus = await kycStatusReq.json();
logger.info(`kyc status: ${j2s(kycStatus)}`);
@@ -864,7 +858,7 @@ async function processDepositGroupPendingTrack(
return TaskRunResult.finished();
} else {
// FIXME: Use long-polling.
- return TaskRunResult.pending();
+ return TaskRunResult.backoff();
}
}
@@ -993,7 +987,7 @@ async function processDepositGroupPendingDeposit(
});
notifyTransition(ws, transactionId, transitionInfo);
- return TaskRunResult.finished();
+ return TaskRunResult.progress();
}
/**
@@ -1002,9 +996,7 @@ async function processDepositGroupPendingDeposit(
export async function processDepositGroup(
ws: InternalWalletState,
depositGroupId: string,
- options: {
- cancellationToken?: CancellationToken;
- } = {},
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const depositGroup = await ws.db
.mktx((x) => [x.depositGroups])
@@ -1021,15 +1013,15 @@ export async function processDepositGroup(
return processDepositGroupPendingTrack(
ws,
depositGroup,
- options.cancellationToken,
+ cancellationToken,
);
case DepositOperationStatus.PendingKyc:
- return processDepositGroupPendingKyc(ws, depositGroup);
+ return processDepositGroupPendingKyc(ws, depositGroup, cancellationToken);
case DepositOperationStatus.PendingDeposit:
return processDepositGroupPendingDeposit(
ws,
depositGroup,
- options.cancellationToken,
+ cancellationToken,
);
case DepositOperationStatus.Aborting:
return processDepositGroupAborting(ws, depositGroup);
@@ -1393,10 +1385,8 @@ export async function createDepositGroup(
operationStatus: DepositOperationStatus.PendingDeposit,
};
- const transactionId = constructTransactionIdentifier({
- tag: TransactionType.Deposit,
- depositGroupId,
- });
+ const ctx = new DepositTransactionContext(ws, depositGroupId);
+ const transactionId = ctx.transactionId;
const newTxState = await ws.db
.mktx((x) => [
@@ -1439,6 +1429,8 @@ export async function createDepositGroup(
hintTransactionId: transactionId,
});
+ ws.taskScheduler.startShepherdTask(ctx.taskId);
+
return {
depositGroupId,
transactionId,
diff --git a/packages/taler-wallet-core/src/operations/exchanges.ts b/packages/taler-wallet-core/src/operations/exchanges.ts
index b4d45db2c..22be4102a 100644
--- a/packages/taler-wallet-core/src/operations/exchanges.ts
+++ b/packages/taler-wallet-core/src/operations/exchanges.ts
@@ -28,6 +28,8 @@ import {
AgeRestriction,
Amounts,
CancellationToken,
+ CoinRefreshRequest,
+ CoinStatus,
DeleteExchangeRequest,
DenomKeyType,
DenomOperationMap,
@@ -53,6 +55,7 @@ import {
NotificationType,
OperationErrorInfo,
Recoup,
+ RefreshReason,
ScopeInfo,
ScopeType,
TalerError,
@@ -67,8 +70,11 @@ import {
WireFeeMap,
WireFeesJson,
WireInfo,
+ assertUnreachable,
canonicalizeBaseUrl,
codecForExchangeKeysJson,
+ durationFromSpec,
+ durationMul,
encodeCrock,
hashDenomPub,
j2s,
@@ -89,19 +95,22 @@ import {
WalletStoresV1,
} from "../db.js";
import {
+ AsyncFlag,
ExchangeEntryDbRecordStatus,
ExchangeEntryDbUpdateStatus,
PendingTaskType,
WalletDbReadOnlyTransactionArr,
WalletDbReadWriteTransactionArr,
+ createRefreshGroup,
createTimeline,
isWithdrawableDenom,
selectBestForOverlappingDenominations,
selectMinimumFee,
- timestampOptionalAbsoluteFromDb,
+ timestampAbsoluteFromDb,
timestampOptionalPreciseFromDb,
timestampPreciseFromDb,
timestampPreciseToDb,
+ timestampProtocolFromDb,
timestampProtocolToDb,
} from "../index.js";
import { InternalWalletState } from "../internal-wallet-state.js";
@@ -117,11 +126,11 @@ import {
TaskRunResult,
TaskRunResultType,
constructTaskIdentifier,
+ getAutoRefreshExecuteThreshold,
getExchangeEntryStatusFromRecord,
getExchangeState,
getExchangeTosStatusFromRecord,
getExchangeUpdateStatusFromRecord,
- runTaskWithErrorReporting,
} from "./common.js";
const logger = new Logger("exchanges.ts");
@@ -635,11 +644,13 @@ async function downloadExchangeKeysInfo(
baseUrl: string,
http: HttpRequestLibrary,
timeout: Duration,
+ cancellationToken: CancellationToken,
): Promise<ExchangeKeysDownloadResult> {
const keysUrl = new URL("keys", baseUrl);
const resp = await http.fetch(keysUrl.href, {
timeout,
+ cancellationToken,
});
// We must make sure to parse out the protocol version
@@ -828,13 +839,19 @@ async function downloadTosFromAcceptedFormat(
* If the exchange entry doesn't exist,
* a new ephemeral entry is created.
*/
-export async function startUpdateExchangeEntry(
+async function startUpdateExchangeEntry(
ws: InternalWalletState,
exchangeBaseUrl: string,
options: { forceUpdate?: boolean } = {},
): Promise<void> {
const canonBaseUrl = canonicalizeBaseUrl(exchangeBaseUrl);
+ logger.info(
+ `starting update of exchange entry ${canonBaseUrl}, forced=${
+ options.forceUpdate ?? false
+ }`,
+ );
+
const { notification } = await ws.db
.mktx((x) => [x.exchanges, x.exchangeDetails])
.runReadWrite(async (tx) => {
@@ -845,7 +862,7 @@ export async function startUpdateExchangeEntry(
ws.notify(notification);
}
- const { oldExchangeState, newExchangeState } = await ws.db
+ const { oldExchangeState, newExchangeState, taskId } = await ws.db
.mktx((x) => [x.exchanges, x.operationRetries])
.runReadWrite(async (tx) => {
const r = await tx.exchanges.get(canonBaseUrl);
@@ -882,7 +899,7 @@ export async function startUpdateExchangeEntry(
// Reset retries for updating the exchange entry.
const taskId = TaskIdentifiers.forExchangeUpdate(r);
await tx.operationRetries.delete(taskId);
- return { oldExchangeState, newExchangeState };
+ return { oldExchangeState, newExchangeState, taskId };
});
ws.notify({
type: NotificationType.ExchangeStateTransition,
@@ -890,7 +907,7 @@ export async function startUpdateExchangeEntry(
newExchangeState: newExchangeState,
oldExchangeState: oldExchangeState,
});
- ws.workAvailable.trigger();
+ ws.taskScheduler.restartShepherdTask(taskId);
}
/**
@@ -909,6 +926,119 @@ export interface ReadyExchangeSummary {
scopeInfo: ScopeInfo;
}
+async function internalWaitReadyExchange(
+ ws: InternalWalletState,
+ canonUrl: string,
+ exchangeNotifFlag: AsyncFlag,
+ options: {
+ cancellationToken?: CancellationToken;
+ forceUpdate?: boolean;
+ expectedMasterPub?: string;
+ } = {},
+): Promise<ReadyExchangeSummary> {
+ const operationId = constructTaskIdentifier({
+ tag: PendingTaskType.ExchangeUpdate,
+ exchangeBaseUrl: canonUrl,
+ });
+ while (true) {
+ logger.info(`waiting for ready exchange ${canonUrl}`);
+ const { exchange, exchangeDetails, retryInfo, scopeInfo } =
+ await ws.db.runReadOnlyTx(
+ [
+ "exchanges",
+ "exchangeDetails",
+ "operationRetries",
+ "globalCurrencyAuditors",
+ "globalCurrencyExchanges",
+ ],
+ async (tx) => {
+ const exchange = await tx.exchanges.get(canonUrl);
+ const exchangeDetails = await getExchangeRecordsInternal(
+ tx,
+ canonUrl,
+ );
+ const retryInfo = await tx.operationRetries.get(operationId);
+ let scopeInfo: ScopeInfo | undefined = undefined;
+ if (exchange && exchangeDetails) {
+ scopeInfo = await internalGetExchangeScopeInfo(tx, exchangeDetails);
+ }
+ return { exchange, exchangeDetails, retryInfo, scopeInfo };
+ },
+ );
+
+ if (!exchange) {
+ throw Error("exchange entry does not exist anymore");
+ }
+
+ let ready = false;
+
+ switch (exchange.updateStatus) {
+ case ExchangeEntryDbUpdateStatus.Ready:
+ ready = true;
+ break;
+ case ExchangeEntryDbUpdateStatus.ReadyUpdate:
+ // If the update is forced,
+ // we wait until we're in a full "ready" state,
+ // as we're not happy with the stale information.
+ if (!options.forceUpdate) {
+ ready = true;
+ }
+ break;
+ default: {
+ if (retryInfo) {
+ throw TalerError.fromDetail(
+ TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE,
+ {
+ exchangeBaseUrl: canonUrl,
+ innerError: retryInfo?.lastError,
+ },
+ );
+ }
+ }
+ }
+
+ if (!ready) {
+ logger.info("waiting for exchange update notification");
+ await exchangeNotifFlag.wait();
+ logger.info("done waiting for exchange update notification");
+ exchangeNotifFlag.reset();
+ continue;
+ }
+
+ if (!exchangeDetails) {
+ throw Error("invariant failed");
+ }
+
+ if (!scopeInfo) {
+ throw Error("invariant failed");
+ }
+
+ const res: ReadyExchangeSummary = {
+ currency: exchangeDetails.currency,
+ exchangeBaseUrl: canonUrl,
+ masterPub: exchangeDetails.masterPublicKey,
+ tosStatus: getExchangeTosStatusFromRecord(exchange),
+ tosAcceptedEtag: exchange.tosAcceptedEtag,
+ wireInfo: exchangeDetails.wireInfo,
+ protocolVersionRange: exchangeDetails.protocolVersionRange,
+ tosCurrentEtag: exchange.tosCurrentEtag,
+ tosAcceptedTimestamp: timestampOptionalPreciseFromDb(
+ exchange.tosAcceptedTimestamp,
+ ),
+ scopeInfo,
+ };
+
+ if (options.expectedMasterPub) {
+ if (res.masterPub !== options.expectedMasterPub) {
+ throw Error(
+ "public key of the exchange does not match expected public key",
+ );
+ }
+ }
+ return res;
+ }
+}
+
/**
* Ensure that a fresh exchange entry exists for the given
* exchange base URL.
@@ -933,127 +1063,149 @@ export async function fetchFreshExchange(
} = {},
): Promise<ReadyExchangeSummary> {
const canonUrl = canonicalizeBaseUrl(baseUrl);
- const operationId = constructTaskIdentifier({
- tag: PendingTaskType.ExchangeUpdate,
- exchangeBaseUrl: canonUrl,
+
+ ws.ensureTaskLoopRunning();
+
+ await startUpdateExchangeEntry(ws, canonUrl, {
+ forceUpdate: options.forceUpdate,
});
- const oldExchange = await ws.db
- .mktx((x) => [x.exchanges])
- .runReadOnly(async (tx) => {
- return tx.exchanges.get(canonUrl);
- });
+ return waitReadyExchange(ws, canonUrl, options);
+}
- let needsUpdate = false;
+async function waitReadyExchange(
+ ws: InternalWalletState,
+ canonUrl: string,
+ options: {
+ cancellationToken?: CancellationToken;
+ forceUpdate?: boolean;
+ expectedMasterPub?: string;
+ } = {},
+): Promise<ReadyExchangeSummary> {
+ // FIXME: We should use Symbol.dispose magic here for cleanup!
- if (!oldExchange || options.forceUpdate) {
- needsUpdate = true;
- await startUpdateExchangeEntry(ws, canonUrl, {
- forceUpdate: options.forceUpdate,
- });
- } else {
- const nextUpdate = timestampOptionalAbsoluteFromDb(
- oldExchange.nextUpdateStamp,
- );
+ const exchangeNotifFlag = new AsyncFlag();
+ // Raise exchangeNotifFlag whenever we get a notification
+ // about our exchange.
+ const cancelNotif = ws.addNotificationListener((notif) => {
if (
- nextUpdate == null ||
- AbsoluteTime.isExpired(nextUpdate) ||
- oldExchange.updateStatus !== ExchangeEntryDbUpdateStatus.Ready
+ notif.type === NotificationType.ExchangeStateTransition &&
+ notif.exchangeBaseUrl === canonUrl
) {
- needsUpdate = true;
+ logger.info(`raising update notification: ${j2s(notif)}`);
+ exchangeNotifFlag.raise();
}
- }
+ });
- if (needsUpdate) {
- await runTaskWithErrorReporting(ws, operationId, () =>
- updateExchangeFromUrlHandler(ws, canonUrl),
+ try {
+ const res = await internalWaitReadyExchange(
+ ws,
+ canonUrl,
+ exchangeNotifFlag,
+ options,
);
+ logger.info("done waiting for ready exchange");
+ return res;
+ } finally {
+ cancelNotif();
}
+}
- const { exchange, exchangeDetails, retryInfo, scopeInfo } =
- await ws.db.runReadOnlyTx(
- [
- "exchanges",
- "exchangeDetails",
- "operationRetries",
- "globalCurrencyAuditors",
- "globalCurrencyExchanges",
- ],
- async (tx) => {
- const exchange = await tx.exchanges.get(canonUrl);
- const exchangeDetails = await getExchangeRecordsInternal(tx, canonUrl);
- const retryInfo = await tx.operationRetries.get(operationId);
- let scopeInfo: ScopeInfo | undefined = undefined;
- if (exchange && exchangeDetails) {
- scopeInfo = await internalGetExchangeScopeInfo(tx, exchangeDetails);
- }
- return { exchange, exchangeDetails, retryInfo, scopeInfo };
- },
- );
+/**
+ * Update an exchange entry in the wallet's database
+ * by fetching the /keys and /wire information.
+ * Optionally link the reserve entry to the new or existing
+ * exchange entry in then DB.
+ */
+export async function updateExchangeFromUrlHandler(
+ ws: InternalWalletState,
+ exchangeBaseUrl: string,
+ cancellationToken: CancellationToken,
+): Promise<TaskRunResult> {
+ logger.trace(`updating exchange info for ${exchangeBaseUrl}`);
+ exchangeBaseUrl = canonicalizeBaseUrl(exchangeBaseUrl);
- if (!exchange) {
- throw Error("exchange entry does not exist anymore");
+ const oldExchangeRec = await ws.db.runReadOnlyTx(
+ ["exchanges"],
+ async (tx) => {
+ return tx.exchanges.get(exchangeBaseUrl);
+ },
+ );
+
+ if (!oldExchangeRec) {
+ logger.info(`not updating exchange ${exchangeBaseUrl}, no record in DB`);
+ return TaskRunResult.finished();
}
- switch (exchange.updateStatus) {
- case ExchangeEntryDbUpdateStatus.Ready:
+ let updateRequestedExplicitly = false;
+
+ switch (oldExchangeRec.updateStatus) {
+ case ExchangeEntryDbUpdateStatus.Suspended:
+ logger.info(`not updating exchange in status "suspended"`);
+ return TaskRunResult.finished();
+ case ExchangeEntryDbUpdateStatus.Initial:
+ logger.info(`not updating exchange in status "initial"`);
+ return TaskRunResult.finished();
+ case ExchangeEntryDbUpdateStatus.InitialUpdate:
case ExchangeEntryDbUpdateStatus.ReadyUpdate:
+ case ExchangeEntryDbUpdateStatus.UnavailableUpdate:
+ updateRequestedExplicitly = true;
+ break;
+ case ExchangeEntryDbUpdateStatus.Ready:
break;
default:
- throw TalerError.fromDetail(TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE, {
- exchangeBaseUrl: canonUrl,
- innerError: retryInfo?.lastError,
- });
+ assertUnreachable(oldExchangeRec.updateStatus);
}
- if (!exchangeDetails) {
- throw Error("invariant failed");
- }
+ let refreshCheckNecessary = true;
- if (!scopeInfo) {
- throw Error("invariant failed");
- }
+ if (!updateRequestedExplicitly) {
+ // If the update wasn't requested explicitly,
+ // check if we really need to update.
- const res: ReadyExchangeSummary = {
- currency: exchangeDetails.currency,
- exchangeBaseUrl: canonUrl,
- masterPub: exchangeDetails.masterPublicKey,
- tosStatus: getExchangeTosStatusFromRecord(exchange),
- tosAcceptedEtag: exchange.tosAcceptedEtag,
- wireInfo: exchangeDetails.wireInfo,
- protocolVersionRange: exchangeDetails.protocolVersionRange,
- tosCurrentEtag: exchange.tosCurrentEtag,
- tosAcceptedTimestamp: timestampOptionalPreciseFromDb(
- exchange.tosAcceptedTimestamp,
- ),
- scopeInfo,
- };
+ let nextUpdateStamp = timestampAbsoluteFromDb(
+ oldExchangeRec.nextUpdateStamp,
+ );
- if (options.expectedMasterPub) {
- if (res.masterPub !== options.expectedMasterPub) {
- throw Error(
- "public key of the exchange does not match expected public key",
+ let nextRefreshCheckStamp = timestampAbsoluteFromDb(
+ oldExchangeRec.nextRefreshCheckStamp,
+ );
+
+ let updateNecessary = true;
+
+ if (
+ !AbsoluteTime.isNever(nextUpdateStamp) &&
+ !AbsoluteTime.isExpired(nextUpdateStamp)
+ ) {
+ logger.info(
+ `exchange update for ${exchangeBaseUrl} not necessary, scheduled for ${AbsoluteTime.toIsoString(
+ nextUpdateStamp,
+ )}`,
+ );
+ updateNecessary = false;
+ }
+
+ if (
+ !AbsoluteTime.isNever(nextRefreshCheckStamp) &&
+ !AbsoluteTime.isExpired(nextRefreshCheckStamp)
+ ) {
+ logger.info(
+ `exchange refresh check for ${exchangeBaseUrl} not necessary, scheduled for ${AbsoluteTime.toIsoString(
+ nextRefreshCheckStamp,
+ )}`,
+ );
+ refreshCheckNecessary = false;
+ }
+
+ if (!(updateNecessary || refreshCheckNecessary)) {
+ return TaskRunResult.runAgainAt(
+ AbsoluteTime.min(nextUpdateStamp, nextRefreshCheckStamp),
);
}
}
- return res;
-}
-/**
- * Update an exchange entry in the wallet's database
- * by fetching the /keys and /wire information.
- * Optionally link the reserve entry to the new or existing
- * exchange entry in then DB.
- */
-export async function updateExchangeFromUrlHandler(
- ws: InternalWalletState,
- exchangeBaseUrl: string,
- options: {
- cancellationToken?: CancellationToken;
- } = {},
-): Promise<TaskRunResult> {
- logger.trace(`updating exchange info for ${exchangeBaseUrl}`);
- exchangeBaseUrl = canonicalizeBaseUrl(exchangeBaseUrl);
+ // When doing the auto-refresh check, we always update
+ // the key info before that.
logger.trace("updating exchange /keys info");
@@ -1063,6 +1215,7 @@ export async function updateExchangeFromUrlHandler(
exchangeBaseUrl,
ws.http,
timeout,
+ cancellationToken,
);
logger.trace("validating exchange wire info");
@@ -1302,9 +1455,13 @@ export async function updateExchangeFromUrlHandler(
});
if (recoupGroupId) {
+ const recoupTaskId = constructTaskIdentifier({
+ tag: PendingTaskType.Recoup,
+ recoupGroupId,
+ });
// Asynchronously start recoup. This doesn't need to finish
// for the exchange update to be considered finished.
- ws.workAvailable.trigger();
+ ws.taskScheduler.startShepherdTask(recoupTaskId);
}
if (!updated) {
@@ -1313,6 +1470,84 @@ export async function updateExchangeFromUrlHandler(
logger.trace("done updating exchange info in database");
+ logger.trace(`doing auto-refresh check for '${exchangeBaseUrl}'`);
+
+ let minCheckThreshold = AbsoluteTime.addDuration(
+ AbsoluteTime.now(),
+ durationFromSpec({ days: 1 }),
+ );
+
+ if (refreshCheckNecessary) {
+ // Do auto-refresh.
+ await ws.db
+ .mktx((x) => [
+ x.coins,
+ x.denominations,
+ x.coinAvailability,
+ x.refreshGroups,
+ x.exchanges,
+ ])
+ .runReadWrite(async (tx) => {
+ const exchange = await tx.exchanges.get(exchangeBaseUrl);
+ if (!exchange || !exchange.detailsPointer) {
+ return;
+ }
+ const coins = await tx.coins.indexes.byBaseUrl
+ .iter(exchangeBaseUrl)
+ .toArray();
+ const refreshCoins: CoinRefreshRequest[] = [];
+ for (const coin of coins) {
+ if (coin.status !== CoinStatus.Fresh) {
+ continue;
+ }
+ const denom = await tx.denominations.get([
+ exchangeBaseUrl,
+ coin.denomPubHash,
+ ]);
+ if (!denom) {
+ logger.warn("denomination not in database");
+ continue;
+ }
+ const executeThreshold =
+ getAutoRefreshExecuteThresholdForDenom(denom);
+ if (AbsoluteTime.isExpired(executeThreshold)) {
+ refreshCoins.push({
+ coinPub: coin.coinPub,
+ amount: denom.value,
+ });
+ } else {
+ const checkThreshold = getAutoRefreshCheckThreshold(denom);
+ minCheckThreshold = AbsoluteTime.min(
+ minCheckThreshold,
+ checkThreshold,
+ );
+ }
+ }
+ if (refreshCoins.length > 0) {
+ const res = await createRefreshGroup(
+ ws,
+ tx,
+ exchange.detailsPointer?.currency,
+ refreshCoins,
+ RefreshReason.Scheduled,
+ undefined,
+ );
+ logger.trace(
+ `created refresh group for auto-refresh (${res.refreshGroupId})`,
+ );
+ }
+ logger.trace(
+ `next refresh check at ${AbsoluteTime.toIsoString(
+ minCheckThreshold,
+ )}`,
+ );
+ exchange.nextRefreshCheckStamp = timestampPreciseToDb(
+ AbsoluteTime.toPreciseTimestamp(minCheckThreshold),
+ );
+ await tx.exchanges.put(exchange);
+ });
+ }
+
ws.notify({
type: NotificationType.ExchangeStateTransition,
exchangeBaseUrl,
@@ -1320,7 +1555,33 @@ export async function updateExchangeFromUrlHandler(
oldExchangeState: updated.oldExchangeState,
});
- return TaskRunResult.finished();
+ // Next invocation will cause the task to be run again
+ // at the necessary time.
+ return TaskRunResult.progress();
+}
+
+function getAutoRefreshExecuteThresholdForDenom(
+ d: DenominationRecord,
+): AbsoluteTime {
+ return getAutoRefreshExecuteThreshold({
+ stampExpireWithdraw: timestampProtocolFromDb(d.stampExpireWithdraw),
+ stampExpireDeposit: timestampProtocolFromDb(d.stampExpireDeposit),
+ });
+}
+
+/**
+ * Timestamp after which the wallet would do the next check for an auto-refresh.
+ */
+function getAutoRefreshCheckThreshold(d: DenominationRecord): AbsoluteTime {
+ const expireWithdraw = AbsoluteTime.fromProtocolTimestamp(
+ timestampProtocolFromDb(d.stampExpireWithdraw),
+ );
+ const expireDeposit = AbsoluteTime.fromProtocolTimestamp(
+ timestampProtocolFromDb(d.stampExpireDeposit),
+ );
+ const delta = AbsoluteTime.difference(expireWithdraw, expireDeposit);
+ const deltaDiv = durationMul(delta, 0.75);
+ return AbsoluteTime.addDuration(expireWithdraw, deltaDiv);
}
/**
@@ -1420,6 +1681,7 @@ export async function downloadExchangeInfo(
exchangeBaseUrl,
http,
Duration.getForever(),
+ CancellationToken.CONTINUE,
);
return {
keys: keysInfo,
diff --git a/packages/taler-wallet-core/src/operations/pay-merchant.ts b/packages/taler-wallet-core/src/operations/pay-merchant.ts
index 52f9c70b1..e00432bd0 100644
--- a/packages/taler-wallet-core/src/operations/pay-merchant.ts
+++ b/packages/taler-wallet-core/src/operations/pay-merchant.ts
@@ -77,6 +77,7 @@ import {
TalerProtocolViolationError,
TalerUriAction,
TransactionAction,
+ TransactionIdStr,
TransactionMajorState,
TransactionMinorState,
TransactionState,
@@ -102,12 +103,14 @@ import {
WalletStoresV1,
} from "../db.js";
import {
+ AsyncFlag,
getCandidateWithdrawalDenomsTx,
PendingTaskType,
RefundGroupRecord,
RefundGroupStatus,
RefundItemRecord,
RefundItemStatus,
+ TaskId,
timestampPreciseToDb,
timestampProtocolFromDb,
timestampProtocolToDb,
@@ -128,8 +131,6 @@ import {
import {
constructTaskIdentifier,
DbRetryInfo,
- runLongpollAsync,
- runTaskWithErrorReporting,
spendCoins,
TaskIdentifiers,
TaskRunResult,
@@ -147,7 +148,6 @@ import {
constructTransactionIdentifier,
notifyTransition,
parseTransactionIdentifier,
- stopLongpolling,
} from "./transactions.js";
/**
@@ -156,8 +156,8 @@ import {
const logger = new Logger("pay-merchant.ts");
export class PayMerchantTransactionContext implements TransactionContext {
- private transactionId: string;
- private retryTag: string;
+ readonly transactionId: TransactionIdStr;
+ readonly taskId: TaskId;
constructor(
public ws: InternalWalletState,
@@ -167,7 +167,7 @@ export class PayMerchantTransactionContext implements TransactionContext {
tag: TransactionType.Payment,
proposalId,
});
- this.retryTag = constructTaskIdentifier({
+ this.taskId = constructTaskIdentifier({
tag: PendingTaskType.Purchase,
proposalId,
});
@@ -252,7 +252,7 @@ export class PayMerchantTransactionContext implements TransactionContext {
async suspendTransaction(): Promise<void> {
const { ws, proposalId, transactionId } = this;
- stopLongpolling(ws, this.retryTag);
+ ws.taskScheduler.stopShepherdTask(this.taskId);
const transitionInfo = await ws.db
.mktx((x) => [x.purchases])
.runReadWrite(async (tx) => {
@@ -270,7 +270,6 @@ export class PayMerchantTransactionContext implements TransactionContext {
return { oldTxState, newTxState };
});
notifyTransition(ws, transactionId, transitionInfo);
- ws.workAvailable.trigger();
}
async abortTransaction(): Promise<void> {
@@ -330,18 +329,18 @@ export class PayMerchantTransactionContext implements TransactionContext {
break;
}
await tx.purchases.put(purchase);
- await tx.operationRetries.delete(this.retryTag);
+ await tx.operationRetries.delete(this.taskId);
const newTxState = computePayMerchantTransactionState(purchase);
return { oldTxState, newTxState };
},
);
+ ws.taskScheduler.stopShepherdTask(this.taskId);
notifyTransition(ws, transactionId, transitionInfo);
- ws.workAvailable.trigger();
+ ws.taskScheduler.startShepherdTask(this.taskId);
}
async resumeTransaction(): Promise<void> {
- const { ws, proposalId, transactionId, retryTag } = this;
- stopLongpolling(ws, retryTag);
+ const { ws, proposalId, transactionId, taskId: retryTag } = this;
const transitionInfo = await ws.db
.mktx((x) => [x.purchases])
.runReadWrite(async (tx) => {
@@ -358,9 +357,8 @@ export class PayMerchantTransactionContext implements TransactionContext {
const newTxState = computePayMerchantTransactionState(purchase);
return { oldTxState, newTxState };
});
- ws.workAvailable.trigger();
notifyTransition(ws, transactionId, transitionInfo);
- ws.workAvailable.trigger();
+ ws.taskScheduler.startShepherdTask(this.taskId);
}
async failTransaction(): Promise<void> {
@@ -394,7 +392,7 @@ export class PayMerchantTransactionContext implements TransactionContext {
return { oldTxState, newTxState };
});
notifyTransition(ws, transactionId, transitionInfo);
- ws.workAvailable.trigger();
+ ws.taskScheduler.stopShepherdTask(this.taskId);
}
}
@@ -638,14 +636,18 @@ async function processDownloadProposal(
return TaskRunResult.finished();
}
+ const ctx = new PayMerchantTransactionContext(ws, proposalId);
+
if (proposal.purchaseStatus != PurchaseStatus.PendingDownloadingProposal) {
+ logger.error(
+ `unexpected state ${proposal.purchaseStatus}/${
+ PurchaseStatus[proposal.purchaseStatus]
+ } for ${ctx.transactionId} in processDownloadProposal`,
+ );
return TaskRunResult.finished();
}
- const transactionId = constructTransactionIdentifier({
- tag: TransactionType.Payment,
- proposalId,
- });
+ const transactionId = ctx.transactionId;
const orderClaimUrl = new URL(
`orders/${proposal.orderId}/claim`,
@@ -857,7 +859,7 @@ async function processDownloadProposal(
notifyTransition(ws, transactionId, transitionInfo);
- return TaskRunResult.finished();
+ return TaskRunResult.progress();
}
/**
@@ -865,7 +867,7 @@ async function processDownloadProposal(
* record for the provided arguments already exists,
* return the old proposal ID.
*/
-async function createPurchase(
+async function createOrReusePurchase(
ws: InternalWalletState,
merchantBaseUrl: string,
orderId: string,
@@ -889,23 +891,26 @@ async function createPurchase(
p.claimToken === claimToken
);
});
- /* If we have already claimed this proposal with the same sessionId
- * nonce and claim token, reuse it. */
+ // If we have already claimed this proposal with the same sessionId
+ // nonce and claim token, reuse it. */
if (
oldProposal &&
oldProposal.downloadSessionId === sessionId &&
(!noncePriv || oldProposal.noncePriv === noncePriv) &&
oldProposal.claimToken === claimToken
) {
- // FIXME: This lacks proper error handling
- await processDownloadProposal(ws, oldProposal.proposalId);
-
+ logger.info(
+ `Found old proposal (status=${
+ PurchaseStatus[oldProposal.purchaseStatus]
+ }) for order ${orderId} at ${merchantBaseUrl}`,
+ );
if (oldProposal.purchaseStatus === PurchaseStatus.DialogShared) {
const download = await expectProposalDownload(ws, oldProposal);
const paid = await checkIfOrderIsAlreadyPaid(ws, download.contractData);
+ logger.info(`old proposal paid: ${paid}`);
if (paid) {
- //if this transaction was shared and the order is paid then it
- //means that another wallet already paid the proposal
+ // if this transaction was shared and the order is paid then it
+ // means that another wallet already paid the proposal
const transitionInfo = await ws.db
.mktx((x) => [x.purchases])
.runReadWrite(async (tx) => {
@@ -990,8 +995,6 @@ async function createPurchase(
proposalId,
});
notifyTransition(ws, transactionId, transitionInfo);
-
- await processDownloadProposal(ws, proposalId);
return proposalId;
}
@@ -1244,11 +1247,10 @@ async function handleInsufficientFunds(
});
}
-// FIXME: Should probably not be exported in its current state
// FIXME: Should take a transaction ID instead of a proposal ID
// FIXME: Does way more than checking the payment
// FIXME: Should return immediately.
-export async function checkPaymentByProposalId(
+async function checkPaymentByProposalId(
ws: InternalWalletState,
proposalId: string,
sessionId?: string,
@@ -1284,10 +1286,9 @@ export async function checkPaymentByProposalId(
proposalId = proposal.proposalId;
- const transactionId = constructTransactionIdentifier({
- tag: TransactionType.Payment,
- proposalId,
- });
+ const ctx = new PayMerchantTransactionContext(ws, proposalId);
+
+ const transactionId = ctx.transactionId;
const talerUri = stringifyTalerUri({
type: TalerUriAction.Pay,
@@ -1377,12 +1378,12 @@ export async function checkPaymentByProposalId(
return { oldTxState, newTxState };
});
notifyTransition(ws, transactionId, transitionInfo);
- // FIXME: What about error handling?! This doesn't properly store errors in the DB.
- const r = await processPurchasePay(ws, proposalId);
- if (r.type !== TaskRunResultType.Finished) {
- // FIXME: This does not surface the original error
- throw Error("submitting pay failed");
- }
+ ws.taskScheduler.startShepherdTask(ctx.taskId);
+
+ // FIXME: Consider changing the API here so that we don't have to
+ // wait inline for the repurchase.
+
+ await waitPaymentResult(ws, proposalId, sessionId);
const download = await expectProposalDownload(ws, purchase);
return {
status: PreparePayResultType.AlreadyConfirmed,
@@ -1476,7 +1477,7 @@ export async function preparePayForUri(
);
}
- const proposalId = await createPurchase(
+ const proposalId = await createOrReusePurchase(
ws,
uriResult.merchantBaseUrl,
uriResult.orderId,
@@ -1485,9 +1486,79 @@ export async function preparePayForUri(
uriResult.noncePriv,
);
+ await waitProposalDownloaded(ws, proposalId);
+
return checkPaymentByProposalId(ws, proposalId, uriResult.sessionId);
}
+/**
+ * Wait until a proposal is at least downloaded.
+ */
+async function waitProposalDownloaded(
+ ws: InternalWalletState,
+ proposalId: string,
+): Promise<void> {
+ const ctx = new PayMerchantTransactionContext(ws, proposalId);
+
+ logger.info(`waiting for ${ctx.transactionId} to be downloaded`);
+
+ ws.taskScheduler.startShepherdTask(ctx.taskId);
+
+ // FIXME: We should use Symbol.dispose magic here for cleanup!
+
+ const payNotifFlag = new AsyncFlag();
+ // Raise exchangeNotifFlag whenever we get a notification
+ // about our exchange.
+ const cancelNotif = ws.addNotificationListener((notif) => {
+ if (
+ notif.type === NotificationType.TransactionStateTransition &&
+ notif.transactionId === ctx.transactionId
+ ) {
+ logger.info(`raising update notification: ${j2s(notif)}`);
+ payNotifFlag.raise();
+ }
+ });
+
+ try {
+ await internalWaitProposalDownloaded(ctx, payNotifFlag);
+ logger.info(`done waiting for ${ctx.transactionId} to be downloaded`);
+ } finally {
+ cancelNotif();
+ }
+}
+
+async function internalWaitProposalDownloaded(
+ ctx: PayMerchantTransactionContext,
+ payNotifFlag: AsyncFlag,
+): Promise<void> {
+ while (true) {
+ const { purchase, retryInfo } = await ctx.ws.db.runReadOnlyTx(
+ ["purchases", "operationRetries"],
+ async (tx) => {
+ return {
+ purchase: await tx.purchases.get(ctx.proposalId),
+ retryInfo: await tx.operationRetries.get(ctx.taskId),
+ };
+ },
+ );
+ if (!purchase) {
+ throw Error("purchase does not exist anymore");
+ }
+ if (purchase.download) {
+ return;
+ }
+ if (retryInfo) {
+ if (retryInfo.lastError) {
+ throw TalerError.fromUncheckedDetail(retryInfo.lastError);
+ } else {
+ throw Error("transient error while waiting for proposal download");
+ }
+ }
+ await payNotifFlag.wait();
+ payNotifFlag.reset();
+ }
+}
+
export async function preparePayForTemplate(
ws: InternalWalletState,
req: PreparePayTemplateRequest,
@@ -1598,71 +1669,101 @@ export async function generateDepositPermissions(
return depositPermissions;
}
-/**
- * Run the operation handler for a payment
- * and return the result as a {@link ConfirmPayResult}.
- */
-async function runPayForConfirmPay(
- ws: InternalWalletState,
- proposalId: string,
+async function internalWaitPaymentResult(
+ ctx: PayMerchantTransactionContext,
+ purchaseNotifFlag: AsyncFlag,
+ waitSessionId?: string,
): Promise<ConfirmPayResult> {
- logger.trace("processing proposal for confirmPay");
- const taskId = constructTaskIdentifier({
- tag: PendingTaskType.Purchase,
- proposalId,
- });
- const res = await runTaskWithErrorReporting(ws, taskId, async () => {
- return await processPurchasePay(ws, proposalId);
- });
- logger.trace(`processPurchasePay response type ${res.type}`);
- switch (res.type) {
- case TaskRunResultType.Finished: {
- const purchase = await ws.db
- .mktx((x) => [x.purchases])
- .runReadOnly(async (tx) => {
- return tx.purchases.get(proposalId);
- });
- if (!purchase) {
- throw Error("purchase record not available anymore");
+ while (true) {
+ const txRes = await ctx.ws.db.runReadOnlyTx(
+ ["purchases", "operationRetries"],
+ async (tx) => {
+ const purchase = await tx.purchases.get(ctx.proposalId);
+ const retryRecord = await tx.operationRetries.get(ctx.taskId);
+ return { purchase, retryRecord };
+ },
+ );
+
+ if (!txRes.purchase) {
+ throw Error("purchase gone");
+ }
+
+ const purchase = txRes.purchase;
+
+ logger.info(
+ `purchase is in state ${PurchaseStatus[purchase.purchaseStatus]}`,
+ );
+
+ const d = await expectProposalDownload(ctx.ws, purchase);
+
+ if (txRes.purchase.timestampFirstSuccessfulPay) {
+ if (
+ waitSessionId == null ||
+ txRes.purchase.lastSessionId === waitSessionId
+ ) {
+ return {
+ type: ConfirmPayResultType.Done,
+ contractTerms: d.contractTermsRaw,
+ transactionId: ctx.transactionId,
+ };
}
- const d = await expectProposalDownload(ws, purchase);
- return {
- type: ConfirmPayResultType.Done,
- contractTerms: d.contractTermsRaw,
- transactionId: constructTransactionIdentifier({
- tag: TransactionType.Payment,
- proposalId,
- }),
- };
}
- case TaskRunResultType.Error: {
- // We hide transient errors from the caller.
- const opRetry = await ws.db
- .mktx((x) => [x.operationRetries])
- .runReadOnly(async (tx) => tx.operationRetries.get(taskId));
+
+ if (txRes.retryRecord) {
return {
type: ConfirmPayResultType.Pending,
- lastError: opRetry?.lastError,
- transactionId: constructTransactionIdentifier({
- tag: TransactionType.Payment,
- proposalId,
- }),
+ lastError: txRes.retryRecord.lastError,
+ transactionId: ctx.transactionId,
};
}
- case TaskRunResultType.Pending:
- logger.trace("reporting pending as confirmPay response");
- return {
- type: ConfirmPayResultType.Pending,
- transactionId: constructTransactionIdentifier({
- tag: TransactionType.Payment,
- proposalId,
- }),
- lastError: undefined,
- };
- case TaskRunResultType.Longpoll:
- throw Error("unexpected processPurchasePay result (longpoll)");
- default:
- assertUnreachable(res);
+
+ await purchaseNotifFlag.wait();
+ purchaseNotifFlag.reset();
+ }
+}
+
+/**
+ * Wait until either:
+ * a) the payment succeeded (if provided under the {@param waitSessionId}), or
+ * b) the attempt to pay failed (merchant unavailable, etc.)
+ */
+async function waitPaymentResult(
+ ws: InternalWalletState,
+ proposalId: string,
+ waitSessionId?: string,
+): Promise<ConfirmPayResult> {
+ const ctx = new PayMerchantTransactionContext(ws, proposalId);
+
+ ws.ensureTaskLoopRunning();
+
+ ws.taskScheduler.startShepherdTask(ctx.taskId);
+
+ // FIXME: Clean up using the new JS "using" / Symbol.dispose syntax.
+ const purchaseNotifFlag = new AsyncFlag();
+ // Raise purchaseNotifFlag whenever we get a notification
+ // about our purchase.
+ const cancelNotif = ws.addNotificationListener((notif) => {
+ if (
+ notif.type === NotificationType.TransactionStateTransition &&
+ notif.transactionId === ctx.transactionId
+ ) {
+ purchaseNotifFlag.raise();
+ }
+ });
+
+ try {
+ logger.info(`waiting for first payment success on ${ctx.transactionId}`);
+ const res = await internalWaitPaymentResult(
+ ctx,
+ purchaseNotifFlag,
+ waitSessionId,
+ );
+ logger.info(
+ `done waiting for first payment success on ${ctx.transactionId}, result ${res.type}`,
+ );
+ return res;
+ } finally {
+ cancelNotif();
}
}
@@ -1719,7 +1820,12 @@ export async function confirmPay(
if (existingPurchase && existingPurchase.payInfo) {
logger.trace("confirmPay: submitting payment for existing purchase");
- return runPayForConfirmPay(ws, proposalId);
+ const ctx = new PayMerchantTransactionContext(
+ ws,
+ existingPurchase.proposalId,
+ );
+ await ws.taskScheduler.resetTaskRetries(ctx.taskId);
+ return waitPaymentResult(ws, proposalId);
}
logger.trace("confirmPay: purchase record does not exist yet");
@@ -1817,9 +1923,8 @@ export async function confirmPay(
hintTransactionId: transactionId,
});
- // We directly make a first attempt to pay.
- // FIXME: In the future we should just wait for the right event
- return runPayForConfirmPay(ws, proposalId);
+ // Wait until we have completed the first attempt to pay.
+ return waitPaymentResult(ws, proposalId);
}
export async function processPurchase(
@@ -2017,7 +2122,7 @@ async function processPurchasePay(
// FIXME: Should we really consider this to be pending?
- return TaskRunResult.pending();
+ return TaskRunResult.backoff();
}
}
@@ -2076,7 +2181,7 @@ async function processPurchasePay(
await storePayReplaySuccess(ws, proposalId, sessionId);
}
- return TaskRunResult.finished();
+ return TaskRunResult.progress();
}
export async function refuseProposal(
@@ -2365,7 +2470,7 @@ export async function sharePayment(
p.purchaseStatus !== PurchaseStatus.DialogProposed &&
p.purchaseStatus !== PurchaseStatus.DialogShared
) {
- //FIXME: purchase can be shared before being paid
+ // FIXME: purchase can be shared before being paid
return undefined;
}
if (p.purchaseStatus === PurchaseStatus.DialogProposed) {
@@ -2426,57 +2531,37 @@ async function processPurchaseDialogShared(
): Promise<TaskRunResult> {
const proposalId = purchase.proposalId;
logger.trace(`processing dialog-shared for proposal ${proposalId}`);
-
- const taskId = constructTaskIdentifier({
- tag: PendingTaskType.Purchase,
- proposalId,
- });
-
- // FIXME: Put this logic into runLongpollAsync?
- if (ws.activeLongpoll[taskId]) {
- return TaskRunResult.longpoll();
- }
const download = await expectProposalDownload(ws, purchase);
if (purchase.purchaseStatus !== PurchaseStatus.DialogShared) {
return TaskRunResult.finished();
}
- runLongpollAsync(ws, taskId, async (ct) => {
- const paid = await checkIfOrderIsAlreadyPaid(ws, download.contractData);
- if (paid) {
- const transitionInfo = await ws.db
- .mktx((x) => [x.purchases])
- .runReadWrite(async (tx) => {
- const p = await tx.purchases.get(purchase.proposalId);
- if (!p) {
- logger.warn("purchase does not exist anymore");
- return;
- }
- const oldTxState = computePayMerchantTransactionState(p);
- p.purchaseStatus = PurchaseStatus.FailedClaim;
- const newTxState = computePayMerchantTransactionState(p);
- await tx.purchases.put(p);
- return { oldTxState, newTxState };
- });
- const transactionId = constructTransactionIdentifier({
- tag: TransactionType.Payment,
- proposalId,
+ const paid = await checkIfOrderIsAlreadyPaid(ws, download.contractData);
+ if (paid) {
+ const transitionInfo = await ws.db
+ .mktx((x) => [x.purchases])
+ .runReadWrite(async (tx) => {
+ const p = await tx.purchases.get(purchase.proposalId);
+ if (!p) {
+ logger.warn("purchase does not exist anymore");
+ return;
+ }
+ const oldTxState = computePayMerchantTransactionState(p);
+ p.purchaseStatus = PurchaseStatus.FailedClaim;
+ const newTxState = computePayMerchantTransactionState(p);
+ await tx.purchases.put(p);
+ return { oldTxState, newTxState };
});
+ const transactionId = constructTransactionIdentifier({
+ tag: TransactionType.Payment,
+ proposalId,
+ });
- notifyTransition(ws, transactionId, transitionInfo);
-
- return {
- ready: true,
- };
- }
-
- return {
- ready: false,
- };
- });
+ notifyTransition(ws, transactionId, transitionInfo);
+ }
- return TaskRunResult.longpoll();
+ return TaskRunResult.backoff();
}
async function processPurchaseAutoRefund(
@@ -2496,97 +2581,81 @@ async function processPurchaseAutoRefund(
proposalId,
});
- // FIXME: Put this logic into runLongpollAsync?
- if (ws.activeLongpoll[taskId]) {
- return TaskRunResult.longpoll();
- }
-
const download = await expectProposalDownload(ws, purchase);
- runLongpollAsync(ws, taskId, async (ct) => {
- if (
- !purchase.autoRefundDeadline ||
- AbsoluteTime.isExpired(
- AbsoluteTime.fromProtocolTimestamp(
- timestampProtocolFromDb(purchase.autoRefundDeadline),
- ),
- )
- ) {
- const transitionInfo = await ws.db
- .mktx((x) => [x.purchases])
- .runReadWrite(async (tx) => {
- const p = await tx.purchases.get(purchase.proposalId);
- if (!p) {
- logger.warn("purchase does not exist anymore");
- return;
- }
- if (p.purchaseStatus !== PurchaseStatus.PendingQueryingRefund) {
- return;
- }
- const oldTxState = computePayMerchantTransactionState(p);
- p.purchaseStatus = PurchaseStatus.Done;
- p.refundAmountAwaiting = undefined;
- const newTxState = computePayMerchantTransactionState(p);
- await tx.purchases.put(p);
- return { oldTxState, newTxState };
- });
- notifyTransition(ws, transactionId, transitionInfo);
- return {
- ready: true,
- };
- }
+ if (
+ !purchase.autoRefundDeadline ||
+ AbsoluteTime.isExpired(
+ AbsoluteTime.fromProtocolTimestamp(
+ timestampProtocolFromDb(purchase.autoRefundDeadline),
+ ),
+ )
+ ) {
+ const transitionInfo = await ws.db
+ .mktx((x) => [x.purchases])
+ .runReadWrite(async (tx) => {
+ const p = await tx.purchases.get(purchase.proposalId);
+ if (!p) {
+ logger.warn("purchase does not exist anymore");
+ return;
+ }
+ if (p.purchaseStatus !== PurchaseStatus.PendingQueryingRefund) {
+ return;
+ }
+ const oldTxState = computePayMerchantTransactionState(p);
+ p.purchaseStatus = PurchaseStatus.Done;
+ p.refundAmountAwaiting = undefined;
+ const newTxState = computePayMerchantTransactionState(p);
+ await tx.purchases.put(p);
+ return { oldTxState, newTxState };
+ });
+ notifyTransition(ws, transactionId, transitionInfo);
+ return TaskRunResult.finished();
+ }
- const requestUrl = new URL(
- `orders/${download.contractData.orderId}`,
- download.contractData.merchantBaseUrl,
- );
- requestUrl.searchParams.set(
- "h_contract",
- download.contractData.contractTermsHash,
- );
+ const requestUrl = new URL(
+ `orders/${download.contractData.orderId}`,
+ download.contractData.merchantBaseUrl,
+ );
+ requestUrl.searchParams.set(
+ "h_contract",
+ download.contractData.contractTermsHash,
+ );
- requestUrl.searchParams.set("timeout_ms", "1000");
- requestUrl.searchParams.set("await_refund_obtained", "yes");
+ requestUrl.searchParams.set("timeout_ms", "1000");
+ requestUrl.searchParams.set("await_refund_obtained", "yes");
- const resp = await ws.http.fetch(requestUrl.href);
+ const resp = await ws.http.fetch(requestUrl.href);
- // FIXME: Check other status codes!
+ // FIXME: Check other status codes!
- const orderStatus = await readSuccessResponseJsonOrThrow(
- resp,
- codecForMerchantOrderStatusPaid(),
- );
+ const orderStatus = await readSuccessResponseJsonOrThrow(
+ resp,
+ codecForMerchantOrderStatusPaid(),
+ );
- if (orderStatus.refund_pending) {
- const transitionInfo = await ws.db
- .mktx((x) => [x.purchases])
- .runReadWrite(async (tx) => {
- const p = await tx.purchases.get(purchase.proposalId);
- if (!p) {
- logger.warn("purchase does not exist anymore");
- return;
- }
- if (p.purchaseStatus !== PurchaseStatus.PendingQueryingAutoRefund) {
- return;
- }
- const oldTxState = computePayMerchantTransactionState(p);
- p.purchaseStatus = PurchaseStatus.PendingAcceptRefund;
- const newTxState = computePayMerchantTransactionState(p);
- await tx.purchases.put(p);
- return { oldTxState, newTxState };
- });
- notifyTransition(ws, transactionId, transitionInfo);
- return {
- ready: true,
- };
- } else {
- return {
- ready: false,
- };
- }
- });
+ if (orderStatus.refund_pending) {
+ const transitionInfo = await ws.db
+ .mktx((x) => [x.purchases])
+ .runReadWrite(async (tx) => {
+ const p = await tx.purchases.get(purchase.proposalId);
+ if (!p) {
+ logger.warn("purchase does not exist anymore");
+ return;
+ }
+ if (p.purchaseStatus !== PurchaseStatus.PendingQueryingAutoRefund) {
+ return;
+ }
+ const oldTxState = computePayMerchantTransactionState(p);
+ p.purchaseStatus = PurchaseStatus.PendingAcceptRefund;
+ const newTxState = computePayMerchantTransactionState(p);
+ await tx.purchases.put(p);
+ return { oldTxState, newTxState };
+ });
+ notifyTransition(ws, transactionId, transitionInfo);
+ }
- return TaskRunResult.longpoll();
+ return TaskRunResult.backoff();
}
async function processPurchaseAbortingRefund(
@@ -2734,7 +2803,7 @@ async function processPurchaseQueryRefund(
return { oldTxState, newTxState };
});
notifyTransition(ws, transactionId, transitionInfo);
- return TaskRunResult.finished();
+ return TaskRunResult.progress();
} else {
const refundAwaiting = Amounts.sub(
Amounts.parseOrThrow(orderStatus.refund_amount),
@@ -2760,7 +2829,7 @@ async function processPurchaseQueryRefund(
return { oldTxState, newTxState };
});
notifyTransition(ws, transactionId, transitionInfo);
- return TaskRunResult.finished();
+ return TaskRunResult.progress();
}
}
@@ -2836,10 +2905,7 @@ export async function startQueryRefund(
ws: InternalWalletState,
proposalId: string,
): Promise<void> {
- const transactionId = constructTransactionIdentifier({
- tag: TransactionType.Payment,
- proposalId,
- });
+ const ctx = new PayMerchantTransactionContext(ws, proposalId);
const transitionInfo = await ws.db
.mktx((x) => [x.purchases])
.runReadWrite(async (tx) => {
@@ -2857,8 +2923,8 @@ export async function startQueryRefund(
await tx.purchases.put(p);
return { oldTxState, newTxState };
});
- notifyTransition(ws, transactionId, transitionInfo);
- ws.workAvailable.trigger();
+ notifyTransition(ws, ctx.transactionId, transitionInfo);
+ ws.taskScheduler.startShepherdTask(ctx.taskId);
}
async function computeRefreshRequest(
@@ -3128,10 +3194,10 @@ async function storeRefunds(
notifyTransition(ws, transactionId, result.transitionInfo);
if (result.numPendingItemsTotal > 0) {
- return TaskRunResult.pending();
+ return TaskRunResult.backoff();
+ } else {
+ return TaskRunResult.progress();
}
-
- return TaskRunResult.finished();
}
export function computeRefundTransactionState(
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
index e655eba4b..cc41abde9 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
@@ -33,6 +33,7 @@ import {
TalerProtocolTimestamp,
TalerUriAction,
TransactionAction,
+ TransactionIdStr,
TransactionMajorState,
TransactionMinorState,
TransactionState,
@@ -62,17 +63,15 @@ import {
timestampPreciseToDb,
} from "../index.js";
import { InternalWalletState } from "../internal-wallet-state.js";
-import { PendingTaskType } from "../pending-types.js";
+import { PendingTaskType, TaskId } from "../pending-types.js";
import { assertUnreachable } from "../util/assertUnreachable.js";
import { checkDbInvariant } from "../util/invariants.js";
import {
- LongpollResult,
TaskRunResult,
TaskRunResultType,
TombstoneTag,
TransactionContext,
constructTaskIdentifier,
- runLongpollAsync,
} from "./common.js";
import {
codecForExchangePurseStatus,
@@ -81,7 +80,6 @@ import {
import {
constructTransactionIdentifier,
notifyTransition,
- stopLongpolling,
} from "./transactions.js";
import {
getExchangeWithdrawalInfo,
@@ -91,8 +89,8 @@ import {
const logger = new Logger("pay-peer-pull-credit.ts");
export class PeerPullCreditTransactionContext implements TransactionContext {
- private transactionId: string;
- private retryTag: string;
+ readonly transactionId: TransactionIdStr;
+ readonly retryTag: TaskId;
constructor(
public ws: InternalWalletState,
@@ -139,7 +137,6 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
async suspendTransaction(): Promise<void> {
const { ws, pursePub, retryTag, transactionId } = this;
- stopLongpolling(ws, retryTag);
const transitionInfo = await ws.db
.mktx((x) => [x.peerPullCredit])
.runReadWrite(async (tx) => {
@@ -193,12 +190,12 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
return undefined;
});
+ ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
}
async failTransaction(): Promise<void> {
const { ws, pursePub, retryTag, transactionId } = this;
- stopLongpolling(ws, retryTag);
const transitionInfo = await ws.db
.mktx((x) => [x.peerPullCredit])
.runReadWrite(async (tx) => {
@@ -244,11 +241,11 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
return undefined;
});
notifyTransition(ws, transactionId, transitionInfo);
+ ws.taskScheduler.stopShepherdTask(retryTag);
}
async resumeTransaction(): Promise<void> {
const { ws, pursePub, retryTag, transactionId } = this;
- stopLongpolling(ws, retryTag);
const transitionInfo = await ws.db
.mktx((x) => [x.peerPullCredit])
.runReadWrite(async (tx) => {
@@ -301,13 +298,12 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
return undefined;
});
- ws.workAvailable.trigger();
notifyTransition(ws, transactionId, transitionInfo);
+ ws.taskScheduler.startShepherdTask(retryTag);
}
async abortTransaction(): Promise<void> {
const { ws, pursePub, retryTag, transactionId } = this;
- stopLongpolling(ws, retryTag);
const transitionInfo = await ws.db
.mktx((x) => [x.peerPullCredit])
.runReadWrite(async (tx) => {
@@ -355,7 +351,9 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
return undefined;
});
+ ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
+ ws.taskScheduler.startShepherdTask(retryTag);
}
}
@@ -363,7 +361,7 @@ async function queryPurseForPeerPullCredit(
ws: InternalWalletState,
pullIni: PeerPullCreditRecord,
cancellationToken: CancellationToken,
-): Promise<LongpollResult> {
+): Promise<TaskRunResult> {
const purseDepositUrl = new URL(
`purses/${pullIni.pursePub}/deposit`,
pullIni.exchangeBaseUrl,
@@ -401,10 +399,10 @@ async function queryPurseForPeerPullCredit(
return { oldTxState, newTxState };
});
notifyTransition(ws, transactionId, transitionInfo);
- return { ready: true };
+ return TaskRunResult.backoff();
}
case HttpStatusCode.NotFound:
- return { ready: false };
+ return TaskRunResult.backoff();
}
const result = await readSuccessResponseJsonOrThrow(
@@ -418,7 +416,7 @@ async function queryPurseForPeerPullCredit(
if (!depositTimestamp || TalerProtocolTimestamp.isNever(depositTimestamp)) {
logger.info("purse not ready yet (no deposit)");
- return { ready: false };
+ return TaskRunResult.backoff();
}
const reserve = await ws.db
@@ -462,9 +460,7 @@ async function queryPurseForPeerPullCredit(
return { oldTxState, newTxState };
});
notifyTransition(ws, transactionId, transitionInfo);
- return {
- ready: true,
- };
+ return TaskRunResult.backoff();
}
async function longpollKycStatus(
@@ -473,6 +469,7 @@ async function longpollKycStatus(
exchangeUrl: string,
kycInfo: KycPendingInfo,
userType: KycUserType,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const transactionId = constructTransactionIdentifier({
tag: TransactionType.PeerPullCredit,
@@ -483,56 +480,47 @@ async function longpollKycStatus(
pursePub,
});
- runLongpollAsync(ws, retryTag, async (ct) => {
- const url = new URL(
- `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`,
- exchangeUrl,
- );
- url.searchParams.set("timeout_ms", "10000");
- logger.info(`kyc url ${url.href}`);
- const kycStatusRes = await ws.http.fetch(url.href, {
- method: "GET",
- cancellationToken: ct,
- });
- if (
- kycStatusRes.status === HttpStatusCode.Ok ||
- //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
- // remove after the exchange is fixed or clarified
- kycStatusRes.status === HttpStatusCode.NoContent
- ) {
- const transitionInfo = await ws.db
- .mktx((x) => [x.peerPullCredit])
- .runReadWrite(async (tx) => {
- const peerIni = await tx.peerPullCredit.get(pursePub);
- if (!peerIni) {
- return;
- }
- if (
- peerIni.status !==
- PeerPullPaymentCreditStatus.PendingMergeKycRequired
- ) {
- return;
- }
- const oldTxState = computePeerPullCreditTransactionState(peerIni);
- peerIni.status = PeerPullPaymentCreditStatus.PendingCreatePurse;
- const newTxState = computePeerPullCreditTransactionState(peerIni);
- await tx.peerPullCredit.put(peerIni);
- return { oldTxState, newTxState };
- });
- notifyTransition(ws, transactionId, transitionInfo);
- return { ready: true };
- } else if (kycStatusRes.status === HttpStatusCode.Accepted) {
- // FIXME: Do we have to update the URL here?
- return { ready: false };
- } else {
- throw Error(
- `unexpected response from kyc-check (${kycStatusRes.status})`,
- );
- }
+ const url = new URL(
+ `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`,
+ exchangeUrl,
+ );
+ url.searchParams.set("timeout_ms", "10000");
+ logger.info(`kyc url ${url.href}`);
+ const kycStatusRes = await ws.http.fetch(url.href, {
+ method: "GET",
+ cancellationToken,
});
- return {
- type: TaskRunResultType.Longpoll,
- };
+ if (
+ kycStatusRes.status === HttpStatusCode.Ok ||
+ //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
+ // remove after the exchange is fixed or clarified
+ kycStatusRes.status === HttpStatusCode.NoContent
+ ) {
+ const transitionInfo = await ws.db
+ .mktx((x) => [x.peerPullCredit])
+ .runReadWrite(async (tx) => {
+ const peerIni = await tx.peerPullCredit.get(pursePub);
+ if (!peerIni) {
+ return;
+ }
+ if (
+ peerIni.status !== PeerPullPaymentCreditStatus.PendingMergeKycRequired
+ ) {
+ return;
+ }
+ const oldTxState = computePeerPullCreditTransactionState(peerIni);
+ peerIni.status = PeerPullPaymentCreditStatus.PendingCreatePurse;
+ const newTxState = computePeerPullCreditTransactionState(peerIni);
+ await tx.peerPullCredit.put(peerIni);
+ return { oldTxState, newTxState };
+ });
+ notifyTransition(ws, transactionId, transitionInfo);
+ } else if (kycStatusRes.status === HttpStatusCode.Accepted) {
+ // FIXME: Do we have to update the URL here?
+ } else {
+ throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`);
+ }
+ return TaskRunResult.backoff();
}
async function processPeerPullCreditAbortingDeletePurse(
@@ -584,7 +572,7 @@ async function processPeerPullCreditAbortingDeletePurse(
});
notifyTransition(ws, transactionId, transitionInfo);
- return TaskRunResult.pending();
+ return TaskRunResult.backoff();
}
async function handlePeerPullCreditWithdrawing(
@@ -637,7 +625,7 @@ async function handlePeerPullCreditWithdrawing(
return TaskRunResult.finished();
} else {
// FIXME: Return indicator that we depend on the other operation!
- return TaskRunResult.pending();
+ return TaskRunResult.backoff();
}
}
@@ -757,13 +745,13 @@ async function handlePeerPullCreditCreatePurse(
return { oldTxState, newTxState };
});
notifyTransition(ws, transactionId, transitionInfo);
-
- return TaskRunResult.finished();
+ return TaskRunResult.backoff();
}
export async function processPeerPullCredit(
ws: InternalWalletState,
pursePub: string,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const pullIni = await ws.db
.mktx((x) => [x.peerPullCredit])
@@ -779,14 +767,6 @@ export async function processPeerPullCredit(
pursePub,
});
- // We're already running!
- if (ws.activeLongpoll[retryTag]) {
- logger.info("peer-pull-credit already in long-polling, returning!");
- return {
- type: TaskRunResultType.Longpoll,
- };
- }
-
logger.trace(`processing ${retryTag}, status=${pullIni.status}`);
switch (pullIni.status) {
@@ -794,15 +774,7 @@ export async function processPeerPullCredit(
return TaskRunResult.finished();
}
case PeerPullPaymentCreditStatus.PendingReady:
- runLongpollAsync(ws, retryTag, async (cancellationToken) =>
- queryPurseForPeerPullCredit(ws, pullIni, cancellationToken),
- );
- logger.trace(
- "returning early from processPeerPullCredit for long-polling in background",
- );
- return {
- type: TaskRunResultType.Longpoll,
- };
+ return queryPurseForPeerPullCredit(ws, pullIni, cancellationToken);
case PeerPullPaymentCreditStatus.PendingMergeKycRequired: {
if (!pullIni.kycInfo) {
throw Error("invalid state, kycInfo required");
@@ -813,6 +785,7 @@ export async function processPeerPullCredit(
pullIni.exchangeBaseUrl,
pullIni.kycInfo,
"individual",
+ cancellationToken,
);
}
case PeerPullPaymentCreditStatus.PendingCreatePurse:
@@ -866,7 +839,7 @@ async function processPeerPullCreditKycRequired(
kycStatusRes.status === HttpStatusCode.NoContent
) {
logger.warn("kyc requested, but already fulfilled");
- return TaskRunResult.finished();
+ return TaskRunResult.backoff();
} else if (kycStatusRes.status === HttpStatusCode.Accepted) {
const kycStatus = await kycStatusRes.json();
logger.info(`kyc status: ${j2s(kycStatus)}`);
@@ -906,7 +879,7 @@ async function processPeerPullCreditKycRequired(
};
});
notifyTransition(ws, transactionId, transitionInfo);
- return TaskRunResult.pending();
+ return TaskRunResult.backoff();
} else {
throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`);
}
@@ -1095,20 +1068,16 @@ export async function initiatePeerPullPayment(
return { oldTxState, newTxState };
});
- const transactionId = constructTransactionIdentifier({
- tag: TransactionType.PeerPullCredit,
- pursePub: pursePair.pub,
- });
+ const ctx = new PeerPullCreditTransactionContext(ws, pursePair.pub);
// The pending-incoming balance has changed.
ws.notify({
type: NotificationType.BalanceChange,
- hintTransactionId: transactionId,
+ hintTransactionId: ctx.transactionId,
});
- notifyTransition(ws, transactionId, transitionInfo);
-
- ws.workAvailable.trigger();
+ notifyTransition(ws, ctx.transactionId, transitionInfo);
+ ws.taskScheduler.startShepherdTask(ctx.retryTag);
return {
talerUri: stringifyTalerUri({
@@ -1116,7 +1085,7 @@ export async function initiatePeerPullPayment(
exchangeBaseUrl: exchangeBaseUrl,
contractPriv: contractKeyPair.priv,
}),
- transactionId,
+ transactionId: ctx.transactionId,
};
}
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts
index c7e447dab..e5ae6b73b 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts
@@ -42,6 +42,7 @@ import {
TalerPreciseTimestamp,
TalerProtocolViolationError,
TransactionAction,
+ TransactionIdStr,
TransactionMajorState,
TransactionMinorState,
TransactionState,
@@ -69,6 +70,7 @@ import {
PendingTaskType,
RefreshOperationStatus,
StoreNames,
+ TaskId,
WalletStoresV1,
createRefreshGroup,
timestampPreciseToDb,
@@ -93,7 +95,6 @@ import {
constructTransactionIdentifier,
notifyTransition,
parseTransactionIdentifier,
- stopLongpolling,
} from "./transactions.js";
const logger = new Logger("pay-peer-pull-debit.ts");
@@ -103,8 +104,8 @@ const logger = new Logger("pay-peer-pull-debit.ts");
*/
export class PeerPullDebitTransactionContext implements TransactionContext {
ws: InternalWalletState;
- transactionId: string;
- taskId: string;
+ readonly transactionId: TransactionIdStr;
+ readonly taskId: TaskId;
peerPullDebitId: string;
constructor(ws: InternalWalletState, peerPullDebitId: string) {
@@ -140,7 +141,6 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
const transactionId = this.transactionId;
const ws = this.ws;
const peerPullDebitId = this.peerPullDebitId;
- stopLongpolling(ws, taskId);
const transitionInfo = await ws.db
.mktx((x) => [x.peerPullDebit])
.runReadWrite(async (tx) => {
@@ -185,11 +185,11 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
return undefined;
});
notifyTransition(ws, transactionId, transitionInfo);
+ ws.taskScheduler.stopShepherdTask(taskId);
}
async resumeTransaction(): Promise<void> {
const ctx = this;
- stopLongpolling(ctx.ws, ctx.taskId);
await ctx.transition(async (pi) => {
switch (pi.status) {
case PeerPullDebitRecordStatus.SuspendedDeposit:
@@ -207,11 +207,11 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
return TransitionResult.Stay;
}
});
+ this.ws.taskScheduler.startShepherdTask(this.taskId);
}
async failTransaction(): Promise<void> {
const ctx = this;
- stopLongpolling(ctx.ws, ctx.taskId);
await ctx.transition(async (pi) => {
switch (pi.status) {
case PeerPullDebitRecordStatus.SuspendedDeposit:
@@ -225,6 +225,7 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
return TransitionResult.Stay;
}
});
+ this.ws.taskScheduler.stopShepherdTask(this.taskId);
}
async abortTransaction(): Promise<void> {
@@ -325,7 +326,9 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
}
},
);
+ ws.taskScheduler.stopShepherdTask(this.taskId);
notifyTransition(ws, this.transactionId, transitionInfo);
+ ws.taskScheduler.startShepherdTask(this.taskId);
}
}
@@ -405,7 +408,7 @@ async function handlePurseCreationConflict(
}
await tx.peerPullDebit.put(myPpi);
});
- return TaskRunResult.finished();
+ return TaskRunResult.backoff();
}
async function processPeerPullDebitPendingDeposit(
@@ -469,7 +472,7 @@ async function processPeerPullDebitPendingDeposit(
}
case HttpStatusCode.Gone: {
await ctx.abortTransaction();
- return TaskRunResult.finished();
+ return TaskRunResult.backoff();
}
case HttpStatusCode.Conflict: {
return handlePurseCreationConflict(ctx, peerPullInc, httpResp);
@@ -529,7 +532,7 @@ async function processPeerPullDebitAbortingRefresh(
});
notifyTransition(ws, transactionId, transitionInfo);
// FIXME: Shouldn't this be finished in some cases?!
- return TaskRunResult.pending();
+ return TaskRunResult.backoff();
}
export async function processPeerPullDebit(
@@ -607,7 +610,7 @@ export async function confirmPeerPullDebit(
coinSelRes.result.coins,
);
- const ppi = await ws.db
+ await ws.db
.mktx((x) => [
x.exchanges,
x.coins,
@@ -643,19 +646,19 @@ export async function confirmPeerPullDebit(
};
}
await tx.peerPullDebit.put(pi);
- return pi;
});
- const transactionId = constructTransactionIdentifier({
- tag: TransactionType.PeerPullDebit,
- peerPullDebitId,
- });
+ const ctx = new PeerPullDebitTransactionContext(ws, peerPullDebitId);
+
+ const transactionId = ctx.transactionId;
ws.notify({
type: NotificationType.BalanceChange,
hintTransactionId: transactionId,
});
+ ws.taskScheduler.startShepherdTask(ctx.taskId);
+
return {
transactionId,
};
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts b/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts
index 427961f44..23976f11b 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts
@@ -17,6 +17,7 @@
import {
AcceptPeerPushPaymentResponse,
Amounts,
+ CancellationToken,
ConfirmPeerPushCreditRequest,
ContractTermsUtil,
ExchangePurseMergeRequest,
@@ -54,9 +55,10 @@ import {
InternalWalletState,
KycPendingInfo,
KycUserType,
- PeerPushPaymentIncomingRecord,
PeerPushCreditStatus,
+ PeerPushPaymentIncomingRecord,
PendingTaskType,
+ TaskId,
WithdrawalGroupStatus,
WithdrawalRecordType,
timestampPreciseToDb,
@@ -69,9 +71,8 @@ import {
TombstoneTag,
TransactionContext,
constructTaskIdentifier,
- runLongpollAsync,
} from "./common.js";
-import { fetchFreshExchange, markExchangeUsed } from "./exchanges.js";
+import { fetchFreshExchange } from "./exchanges.js";
import {
codecForExchangePurseStatus,
getMergeReserveInfo,
@@ -81,7 +82,6 @@ import {
constructTransactionIdentifier,
notifyTransition,
parseTransactionIdentifier,
- stopLongpolling,
} from "./transactions.js";
import {
PerformCreateWithdrawalGroupResult,
@@ -93,8 +93,8 @@ import {
const logger = new Logger("pay-peer-push-credit.ts");
export class PeerPushCreditTransactionContext implements TransactionContext {
- private transactionId: string;
- private retryTag: string;
+ readonly transactionId: string;
+ readonly retryTag: TaskId;
constructor(
public ws: InternalWalletState,
@@ -141,7 +141,6 @@ export class PeerPushCreditTransactionContext implements TransactionContext {
async suspendTransaction(): Promise<void> {
const { ws, peerPushCreditId, retryTag, transactionId } = this;
- stopLongpolling(ws, retryTag);
const transitionInfo = await ws.db.runReadWriteTx(
["peerPushCredit"],
async (tx) => {
@@ -191,11 +190,11 @@ export class PeerPushCreditTransactionContext implements TransactionContext {
},
);
notifyTransition(ws, transactionId, transitionInfo);
+ ws.taskScheduler.stopShepherdTask(retryTag);
}
async abortTransaction(): Promise<void> {
const { ws, peerPushCreditId, retryTag, transactionId } = this;
- stopLongpolling(ws, retryTag);
const transitionInfo = await ws.db.runReadWriteTx(
["peerPushCredit"],
async (tx) => {
@@ -248,11 +247,11 @@ export class PeerPushCreditTransactionContext implements TransactionContext {
},
);
notifyTransition(ws, transactionId, transitionInfo);
+ ws.taskScheduler.startShepherdTask(retryTag);
}
async resumeTransaction(): Promise<void> {
const { ws, peerPushCreditId, retryTag, transactionId } = this;
- stopLongpolling(ws, retryTag);
const transitionInfo = await ws.db.runReadWriteTx(
["peerPushCredit"],
async (tx) => {
@@ -300,13 +299,12 @@ export class PeerPushCreditTransactionContext implements TransactionContext {
return undefined;
},
);
- ws.workAvailable.trigger();
notifyTransition(ws, transactionId, transitionInfo);
+ ws.taskScheduler.startShepherdTask(retryTag);
}
async failTransaction(): Promise<void> {
const { ws, peerPushCreditId, retryTag, transactionId } = this;
- stopLongpolling(ws, retryTag);
const transitionInfo = await ws.db.runReadWriteTx(
["peerPushCredit"],
async (tx) => {
@@ -349,8 +347,9 @@ export class PeerPushCreditTransactionContext implements TransactionContext {
return undefined;
},
);
- ws.workAvailable.trigger();
+ ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
+ ws.taskScheduler.startShepherdTask(retryTag);
}
}
@@ -521,63 +520,51 @@ async function longpollKycStatus(
exchangeUrl: string,
kycInfo: KycPendingInfo,
userType: KycUserType,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const transactionId = constructTransactionIdentifier({
tag: TransactionType.PeerPushCredit,
peerPushCreditId,
});
- const retryTag = constructTaskIdentifier({
- tag: PendingTaskType.PeerPushCredit,
- peerPushCreditId,
- });
-
- runLongpollAsync(ws, retryTag, async (ct) => {
- const url = new URL(
- `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`,
- exchangeUrl,
- );
- url.searchParams.set("timeout_ms", "10000");
- logger.info(`kyc url ${url.href}`);
- const kycStatusRes = await ws.http.fetch(url.href, {
- method: "GET",
- cancellationToken: ct,
- });
- if (
- kycStatusRes.status === HttpStatusCode.Ok ||
- //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
- // remove after the exchange is fixed or clarified
- kycStatusRes.status === HttpStatusCode.NoContent
- ) {
- const transitionInfo = await ws.db
- .mktx((x) => [x.peerPushCredit])
- .runReadWrite(async (tx) => {
- const peerInc = await tx.peerPushCredit.get(peerPushCreditId);
- if (!peerInc) {
- return;
- }
- if (peerInc.status !== PeerPushCreditStatus.PendingMergeKycRequired) {
- return;
- }
- const oldTxState = computePeerPushCreditTransactionState(peerInc);
- peerInc.status = PeerPushCreditStatus.PendingMerge;
- const newTxState = computePeerPushCreditTransactionState(peerInc);
- await tx.peerPushCredit.put(peerInc);
- return { oldTxState, newTxState };
- });
- notifyTransition(ws, transactionId, transitionInfo);
- return { ready: true };
- } else if (kycStatusRes.status === HttpStatusCode.Accepted) {
- // FIXME: Do we have to update the URL here?
- return { ready: false };
- } else {
- throw Error(
- `unexpected response from kyc-check (${kycStatusRes.status})`,
- );
- }
+ const url = new URL(
+ `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`,
+ exchangeUrl,
+ );
+ url.searchParams.set("timeout_ms", "10000");
+ logger.info(`kyc url ${url.href}`);
+ const kycStatusRes = await ws.http.fetch(url.href, {
+ method: "GET",
+ cancellationToken,
});
- return {
- type: TaskRunResultType.Longpoll,
- };
+ if (
+ kycStatusRes.status === HttpStatusCode.Ok ||
+ //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
+ // remove after the exchange is fixed or clarified
+ kycStatusRes.status === HttpStatusCode.NoContent
+ ) {
+ const transitionInfo = await ws.db
+ .mktx((x) => [x.peerPushCredit])
+ .runReadWrite(async (tx) => {
+ const peerInc = await tx.peerPushCredit.get(peerPushCreditId);
+ if (!peerInc) {
+ return;
+ }
+ if (peerInc.status !== PeerPushCreditStatus.PendingMergeKycRequired) {
+ return;
+ }
+ const oldTxState = computePeerPushCreditTransactionState(peerInc);
+ peerInc.status = PeerPushCreditStatus.PendingMerge;
+ const newTxState = computePeerPushCreditTransactionState(peerInc);
+ await tx.peerPushCredit.put(peerInc);
+ return { oldTxState, newTxState };
+ });
+ notifyTransition(ws, transactionId, transitionInfo);
+ } else if (kycStatusRes.status === HttpStatusCode.Accepted) {
+ // FIXME: Do we have to update the URL here?
+ } else {
+ throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`);
+ }
+ return TaskRunResult.backoff();
}
async function processPeerPushCreditKycRequired(
@@ -786,7 +773,7 @@ async function handlePendingMerge(
);
notifyTransition(ws, transactionId, txRes?.peerPushCreditTransition);
- return TaskRunResult.finished();
+ return TaskRunResult.backoff();
}
async function handlePendingWithdrawing(
@@ -839,13 +826,14 @@ async function handlePendingWithdrawing(
return TaskRunResult.finished();
} else {
// FIXME: Return indicator that we depend on the other operation!
- return TaskRunResult.pending();
+ return TaskRunResult.backoff();
}
}
export async function processPeerPushCredit(
ws: InternalWalletState,
peerPushCreditId: string,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
let peerInc: PeerPushPaymentIncomingRecord | undefined;
let contractTerms: PeerContractTerms | undefined;
@@ -886,6 +874,7 @@ export async function processPeerPushCredit(
peerInc.exchangeBaseUrl,
peerInc.kycInfo,
"individual",
+ cancellationToken,
);
}
@@ -940,7 +929,9 @@ export async function confirmPeerPushCredit(
);
}
- ws.workAvailable.trigger();
+ const ctx = new PeerPushCreditTransactionContext(ws, peerPushCreditId);
+
+ ws.taskScheduler.startShepherdTask(ctx.retryTag);
const transactionId = constructTransactionIdentifier({
tag: TransactionType.PeerPushCredit,
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts
index 2e5af4e78..165c8deee 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts
@@ -16,6 +16,7 @@
import {
Amounts,
+ CancellationToken,
CheckPeerPushDebitRequest,
CheckPeerPushDebitResponse,
CoinRefreshRequest,
@@ -32,6 +33,7 @@ import {
TalerProtocolTimestamp,
TalerProtocolViolationError,
TransactionAction,
+ TransactionIdStr,
TransactionMajorState,
TransactionMinorState,
TransactionState,
@@ -56,7 +58,7 @@ import {
timestampProtocolToDb,
} from "../index.js";
import { InternalWalletState } from "../internal-wallet-state.js";
-import { PendingTaskType } from "../pending-types.js";
+import { PendingTaskType, TaskId } from "../pending-types.js";
import { assertUnreachable } from "../util/assertUnreachable.js";
import { PeerCoinRepair, selectPeerCoins } from "../util/coinSelection.js";
import { checkLogicInvariant } from "../util/invariants.js";
@@ -65,7 +67,6 @@ import {
TaskRunResultType,
TransactionContext,
constructTaskIdentifier,
- runLongpollAsync,
spendCoins,
} from "./common.js";
import {
@@ -76,14 +77,13 @@ import {
import {
constructTransactionIdentifier,
notifyTransition,
- stopLongpolling,
} from "./transactions.js";
const logger = new Logger("pay-peer-push-debit.ts");
export class PeerPushDebitTransactionContext implements TransactionContext {
- public transactionId: string;
- public retryTag: string;
+ readonly transactionId: TransactionIdStr;
+ readonly retryTag: TaskId;
constructor(
public ws: InternalWalletState,
@@ -114,7 +114,6 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
async suspendTransaction(): Promise<void> {
const { ws, pursePub, transactionId, retryTag } = this;
- stopLongpolling(ws, retryTag);
const transitionInfo = await ws.db
.mktx((x) => [x.peerPushDebit])
.runReadWrite(async (tx) => {
@@ -166,12 +165,12 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
}
return undefined;
});
+ ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
}
async abortTransaction(): Promise<void> {
const { ws, pursePub, transactionId, retryTag } = this;
- stopLongpolling(ws, retryTag);
const transitionInfo = await ws.db
.mktx((x) => [x.peerPushDebit])
.runReadWrite(async (tx) => {
@@ -218,12 +217,13 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
}
return undefined;
});
+ ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
+ ws.taskScheduler.startShepherdTask(retryTag);
}
async resumeTransaction(): Promise<void> {
const { ws, pursePub, transactionId, retryTag } = this;
- stopLongpolling(ws, retryTag);
const transitionInfo = await ws.db
.mktx((x) => [x.peerPushDebit])
.runReadWrite(async (tx) => {
@@ -275,13 +275,12 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
}
return undefined;
});
- ws.workAvailable.trigger();
+ ws.taskScheduler.startShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
}
async failTransaction(): Promise<void> {
const { ws, pursePub, transactionId, retryTag } = this;
- stopLongpolling(ws, retryTag);
const transitionInfo = await ws.db
.mktx((x) => [x.peerPushDebit])
.runReadWrite(async (tx) => {
@@ -328,7 +327,9 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
}
return undefined;
});
+ ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
+ ws.taskScheduler.startShepherdTask(retryTag);
}
}
@@ -432,7 +433,7 @@ async function handlePurseCreationConflict(
}
await tx.peerPushDebit.put(myPpi);
});
- return TaskRunResult.finished();
+ return TaskRunResult.progress();
}
async function processPeerPushDebitCreateReserve(
@@ -554,7 +555,7 @@ async function processPeerPushDebitCreateReserve(
stTo: PeerPushDebitStatus.PendingReady,
});
- return TaskRunResult.finished();
+ return TaskRunResult.backoff();
}
async function processPeerPushDebitAbortingDeletePurse(
@@ -628,7 +629,7 @@ async function processPeerPushDebitAbortingDeletePurse(
});
notifyTransition(ws, transactionId, transitionInfo);
- return TaskRunResult.pending();
+ return TaskRunResult.backoff();
}
interface SimpleTransition {
@@ -712,7 +713,7 @@ async function processPeerPushDebitAbortingRefreshDeleted(
});
notifyTransition(ws, transactionId, transitionInfo);
// FIXME: Shouldn't this be finished in some cases?!
- return TaskRunResult.pending();
+ return TaskRunResult.backoff();
}
async function processPeerPushDebitAbortingRefreshExpired(
@@ -760,7 +761,7 @@ async function processPeerPushDebitAbortingRefreshExpired(
});
notifyTransition(ws, transactionId, transitionInfo);
// FIXME: Shouldn't this be finished in some cases?!
- return TaskRunResult.pending();
+ return TaskRunResult.backoff();
}
/**
@@ -769,118 +770,102 @@ async function processPeerPushDebitAbortingRefreshExpired(
async function processPeerPushDebitReady(
ws: InternalWalletState,
peerPushInitiation: PeerPushDebitRecord,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
logger.trace("processing peer-push-debit pending(ready)");
const pursePub = peerPushInitiation.pursePub;
- const retryTag = constructTaskIdentifier({
- tag: PendingTaskType.PeerPushDebit,
- pursePub,
- });
const transactionId = constructTaskIdentifier({
tag: PendingTaskType.PeerPushDebit,
pursePub,
});
- runLongpollAsync(ws, retryTag, async (ct) => {
- const mergeUrl = new URL(
- `purses/${pursePub}/merge`,
- peerPushInitiation.exchangeBaseUrl,
+ const mergeUrl = new URL(
+ `purses/${pursePub}/merge`,
+ peerPushInitiation.exchangeBaseUrl,
+ );
+ mergeUrl.searchParams.set("timeout_ms", "30000");
+ logger.info(`long-polling on purse status at ${mergeUrl.href}`);
+ const resp = await ws.http.fetch(mergeUrl.href, {
+ // timeout: getReserveRequestTimeout(withdrawalGroup),
+ cancellationToken,
+ });
+ if (resp.status === HttpStatusCode.Ok) {
+ const purseStatus = await readSuccessResponseJsonOrThrow(
+ resp,
+ codecForExchangePurseStatus(),
);
- mergeUrl.searchParams.set("timeout_ms", "30000");
- logger.info(`long-polling on purse status at ${mergeUrl.href}`);
- const resp = await ws.http.fetch(mergeUrl.href, {
- // timeout: getReserveRequestTimeout(withdrawalGroup),
- cancellationToken: ct,
- });
- if (resp.status === HttpStatusCode.Ok) {
- const purseStatus = await readSuccessResponseJsonOrThrow(
- resp,
- codecForExchangePurseStatus(),
+ const mergeTimestamp = purseStatus.merge_timestamp;
+ logger.info(`got purse status ${j2s(purseStatus)}`);
+ if (!mergeTimestamp || TalerProtocolTimestamp.isNever(mergeTimestamp)) {
+ return TaskRunResult.backoff();
+ } else {
+ await transitionPeerPushDebitTransaction(
+ ws,
+ peerPushInitiation.pursePub,
+ {
+ stFrom: PeerPushDebitStatus.PendingReady,
+ stTo: PeerPushDebitStatus.Done,
+ },
);
- const mergeTimestamp = purseStatus.merge_timestamp;
- logger.info(`got purse status ${j2s(purseStatus)}`);
- if (!mergeTimestamp || TalerProtocolTimestamp.isNever(mergeTimestamp)) {
- return { ready: false };
- } else {
- await transitionPeerPushDebitTransaction(
+ return TaskRunResult.finished();
+ }
+ } else if (resp.status === HttpStatusCode.Gone) {
+ logger.info(`purse ${pursePub} is gone, aborting peer-push-debit`);
+ const transitionInfo = await ws.db
+ .mktx((x) => [
+ x.peerPushDebit,
+ x.refreshGroups,
+ x.denominations,
+ x.coinAvailability,
+ x.coins,
+ ])
+ .runReadWrite(async (tx) => {
+ const ppiRec = await tx.peerPushDebit.get(pursePub);
+ if (!ppiRec) {
+ return undefined;
+ }
+ if (ppiRec.status !== PeerPushDebitStatus.PendingReady) {
+ return undefined;
+ }
+ const currency = Amounts.currencyOf(ppiRec.amount);
+ const oldTxState = computePeerPushDebitTransactionState(ppiRec);
+ const coinPubs: CoinRefreshRequest[] = [];
+
+ for (let i = 0; i < ppiRec.coinSel.coinPubs.length; i++) {
+ coinPubs.push({
+ amount: ppiRec.coinSel.contributions[i],
+ coinPub: ppiRec.coinSel.coinPubs[i],
+ });
+ }
+
+ const refresh = await createRefreshGroup(
ws,
- peerPushInitiation.pursePub,
- {
- stFrom: PeerPushDebitStatus.PendingReady,
- stTo: PeerPushDebitStatus.Done,
- },
+ tx,
+ currency,
+ coinPubs,
+ RefreshReason.AbortPeerPushDebit,
+ transactionId,
);
+ ppiRec.status = PeerPushDebitStatus.AbortingRefreshExpired;
+ ppiRec.abortRefreshGroupId = refresh.refreshGroupId;
+ await tx.peerPushDebit.put(ppiRec);
+ const newTxState = computePeerPushDebitTransactionState(ppiRec);
return {
- ready: true,
+ oldTxState,
+ newTxState,
};
- }
- } else if (resp.status === HttpStatusCode.Gone) {
- logger.info(`purse ${pursePub} is gone, aborting peer-push-debit`);
- const transitionInfo = await ws.db
- .mktx((x) => [
- x.peerPushDebit,
- x.refreshGroups,
- x.denominations,
- x.coinAvailability,
- x.coins,
- ])
- .runReadWrite(async (tx) => {
- const ppiRec = await tx.peerPushDebit.get(pursePub);
- if (!ppiRec) {
- return undefined;
- }
- if (ppiRec.status !== PeerPushDebitStatus.PendingReady) {
- return undefined;
- }
- const currency = Amounts.currencyOf(ppiRec.amount);
- const oldTxState = computePeerPushDebitTransactionState(ppiRec);
- const coinPubs: CoinRefreshRequest[] = [];
-
- for (let i = 0; i < ppiRec.coinSel.coinPubs.length; i++) {
- coinPubs.push({
- amount: ppiRec.coinSel.contributions[i],
- coinPub: ppiRec.coinSel.coinPubs[i],
- });
- }
-
- const refresh = await createRefreshGroup(
- ws,
- tx,
- currency,
- coinPubs,
- RefreshReason.AbortPeerPushDebit,
- transactionId,
- );
- ppiRec.status = PeerPushDebitStatus.AbortingRefreshExpired;
- ppiRec.abortRefreshGroupId = refresh.refreshGroupId;
- await tx.peerPushDebit.put(ppiRec);
- const newTxState = computePeerPushDebitTransactionState(ppiRec);
- return {
- oldTxState,
- newTxState,
- };
- });
- notifyTransition(ws, transactionId, transitionInfo);
- return {
- ready: true,
- };
- } else {
- logger.warn(`unexpected HTTP status for purse: ${resp.status}`);
- return {
- ready: false,
- };
- }
- });
- logger.trace(
- "returning early from peer-push-debit for long-polling in background",
- );
- return {
- type: TaskRunResultType.Longpoll,
- };
+ });
+ notifyTransition(ws, transactionId, transitionInfo);
+ return TaskRunResult.backoff();
+ } else {
+ logger.warn(`unexpected HTTP status for purse: ${resp.status}`);
+ return TaskRunResult.backoff();
+ }
}
export async function processPeerPushDebit(
ws: InternalWalletState,
pursePub: string,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const peerPushInitiation = await ws.db
.mktx((x) => [x.peerPushDebit])
@@ -891,24 +876,15 @@ export async function processPeerPushDebit(
throw Error("peer push payment not found");
}
- const retryTag = constructTaskIdentifier({
- tag: PendingTaskType.PeerPushDebit,
- pursePub,
- });
-
- // We're already running!
- if (ws.activeLongpoll[retryTag]) {
- logger.info("peer-push-debit task already in long-polling, returning!");
- return {
- type: TaskRunResultType.Longpoll,
- };
- }
-
switch (peerPushInitiation.status) {
case PeerPushDebitStatus.PendingCreatePurse:
return processPeerPushDebitCreateReserve(ws, peerPushInitiation);
case PeerPushDebitStatus.PendingReady:
- return processPeerPushDebitReady(ws, peerPushInitiation);
+ return processPeerPushDebitReady(
+ ws,
+ peerPushInitiation,
+ cancellationToken,
+ );
case PeerPushDebitStatus.AbortingDeletePurse:
return processPeerPushDebitAbortingDeletePurse(ws, peerPushInitiation);
case PeerPushDebitStatus.AbortingRefreshDeleted:
@@ -971,10 +947,9 @@ export async function initiatePeerPushDebit(
const pursePub = pursePair.pub;
- const transactionId = constructTaskIdentifier({
- tag: PendingTaskType.PeerPushDebit,
- pursePub,
- });
+ const ctx = new PeerPushDebitTransactionContext(ws, pursePub);
+
+ const transactionId = ctx.transactionId;
const contractEncNonce = encodeCrock(getRandomBytes(24));
@@ -1044,6 +1019,8 @@ export async function initiatePeerPushDebit(
hintTransactionId: transactionId,
});
+ ws.taskScheduler.startShepherdTask(ctx.retryTag);
+
return {
contractPriv: contractKeyPair.priv,
mergePriv: mergePair.priv,
diff --git a/packages/taler-wallet-core/src/operations/pending.ts b/packages/taler-wallet-core/src/operations/pending.ts
deleted file mode 100644
index 990d9a7b3..000000000
--- a/packages/taler-wallet-core/src/operations/pending.ts
+++ /dev/null
@@ -1,814 +0,0 @@
-/*
- This file is part of GNU Taler
- (C) 2019 GNUnet e.V.
-
- GNU Taler is free software; you can redistribute it and/or modify it under the
- terms of the GNU General Public License as published by the Free Software
- Foundation; either version 3, or (at your option) any later version.
-
- GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE. See the GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License along with
- GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
- */
-
-/**
- * Derive pending tasks from the wallet database.
- */
-
-/**
- * Imports.
- */
-import { GlobalIDB } from "@gnu-taler/idb-bridge";
-import {
- AbsoluteTime,
- TalerPreciseTimestamp,
- TransactionRecordFilter,
-} from "@gnu-taler/taler-util";
-import {
- BackupProviderStateTag,
- DbPreciseTimestamp,
- DepositElementStatus,
- DepositGroupRecord,
- ExchangeEntryDbUpdateStatus,
- OPERATION_STATUS_ACTIVE_FIRST,
- OPERATION_STATUS_ACTIVE_LAST,
- PeerPullCreditRecord,
- PeerPullDebitRecordStatus,
- PeerPullPaymentIncomingRecord,
- PeerPushCreditStatus,
- PeerPushDebitRecord,
- PeerPushPaymentIncomingRecord,
- PurchaseRecord,
- PurchaseStatus,
- RefreshCoinStatus,
- RefreshGroupRecord,
- RefreshOperationStatus,
- RefundGroupRecord,
- RewardRecord,
- WalletStoresV1,
- WithdrawalGroupRecord,
- timestampAbsoluteFromDb,
- timestampOptionalAbsoluteFromDb,
- timestampPreciseFromDb,
- timestampPreciseToDb,
-} from "../db.js";
-import { InternalWalletState } from "../internal-wallet-state.js";
-import {
- PendingOperationsResponse,
- PendingTaskType,
- TaskId,
-} from "../pending-types.js";
-import { GetReadOnlyAccess } from "../util/query.js";
-import { TaskIdentifiers } from "./common.js";
-
-function getPendingCommon(
- ws: InternalWalletState,
- opTag: TaskId,
- timestampDue: AbsoluteTime,
-): {
- id: TaskId;
- isDue: boolean;
- timestampDue: AbsoluteTime;
- isLongpolling: boolean;
-} {
- const isDue =
- AbsoluteTime.isExpired(timestampDue) && !ws.activeLongpoll[opTag];
- return {
- id: opTag,
- isDue,
- timestampDue,
- isLongpolling: !!ws.activeLongpoll[opTag],
- };
-}
-
-async function gatherExchangePending(
- ws: InternalWalletState,
- tx: GetReadOnlyAccess<{
- exchanges: typeof WalletStoresV1.exchanges;
- operationRetries: typeof WalletStoresV1.operationRetries;
- }>,
- now: AbsoluteTime,
- resp: PendingOperationsResponse,
-): Promise<void> {
- let timestampDue: DbPreciseTimestamp | undefined = undefined;
- await tx.exchanges.iter().forEachAsync(async (exch) => {
- switch (exch.updateStatus) {
- case ExchangeEntryDbUpdateStatus.Initial:
- case ExchangeEntryDbUpdateStatus.Suspended:
- return;
- }
- const opUpdateExchangeTag = TaskIdentifiers.forExchangeUpdate(exch);
- let opr = await tx.operationRetries.get(opUpdateExchangeTag);
-
- switch (exch.updateStatus) {
- case ExchangeEntryDbUpdateStatus.Ready:
- timestampDue = opr?.retryInfo.nextRetry ?? exch.nextRefreshCheckStamp;
- break;
- case ExchangeEntryDbUpdateStatus.ReadyUpdate:
- case ExchangeEntryDbUpdateStatus.InitialUpdate:
- case ExchangeEntryDbUpdateStatus.UnavailableUpdate:
- timestampDue =
- opr?.retryInfo.nextRetry ??
- timestampPreciseToDb(TalerPreciseTimestamp.now());
- break;
- }
-
- resp.pendingOperations.push({
- type: PendingTaskType.ExchangeUpdate,
- ...getPendingCommon(
- ws,
- opUpdateExchangeTag,
- AbsoluteTime.fromPreciseTimestamp(timestampPreciseFromDb(timestampDue)),
- ),
- givesLifeness: false,
- exchangeBaseUrl: exch.baseUrl,
- lastError: opr?.lastError,
- });
-
- // We only schedule a check for auto-refresh if the exchange update
- // was successful.
- if (!opr?.lastError) {
- const opCheckRefreshTag = TaskIdentifiers.forExchangeCheckRefresh(exch);
- resp.pendingOperations.push({
- type: PendingTaskType.ExchangeCheckRefresh,
- ...getPendingCommon(
- ws,
- opCheckRefreshTag,
- AbsoluteTime.fromPreciseTimestamp(
- timestampPreciseFromDb(timestampDue),
- ),
- ),
- timestampDue: AbsoluteTime.fromPreciseTimestamp(
- timestampPreciseFromDb(exch.nextRefreshCheckStamp),
- ),
- givesLifeness: false,
- exchangeBaseUrl: exch.baseUrl,
- });
- }
- });
-}
-
-/**
- * Iterate refresh records based on a filter.
- */
-export async function iterRecordsForRefresh(
- tx: GetReadOnlyAccess<{
- refreshGroups: typeof WalletStoresV1.refreshGroups;
- }>,
- filter: TransactionRecordFilter,
- f: (r: RefreshGroupRecord) => Promise<void>,
-): Promise<void> {
- let refreshGroups: RefreshGroupRecord[];
- if (filter.onlyState === "nonfinal") {
- const keyRange = GlobalIDB.KeyRange.bound(
- RefreshOperationStatus.Pending,
- RefreshOperationStatus.Suspended,
- );
- refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll(keyRange);
- } else {
- refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll();
- }
-
- for (const r of refreshGroups) {
- await f(r);
- }
-}
-
-async function gatherRefreshPending(
- ws: InternalWalletState,
- tx: GetReadOnlyAccess<{
- refreshGroups: typeof WalletStoresV1.refreshGroups;
- operationRetries: typeof WalletStoresV1.operationRetries;
- }>,
- now: AbsoluteTime,
- resp: PendingOperationsResponse,
-): Promise<void> {
- await iterRecordsForRefresh(tx, { onlyState: "nonfinal" }, async (r) => {
- if (r.timestampFinished) {
- return;
- }
- const opId = TaskIdentifiers.forRefresh(r);
- const retryRecord = await tx.operationRetries.get(opId);
- const timestampDue =
- timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
- AbsoluteTime.now();
- resp.pendingOperations.push({
- type: PendingTaskType.Refresh,
- ...getPendingCommon(ws, opId, timestampDue),
- givesLifeness: true,
- refreshGroupId: r.refreshGroupId,
- finishedPerCoin: r.statusPerCoin.map(
- (x) => x === RefreshCoinStatus.Finished,
- ),
- retryInfo: retryRecord?.retryInfo,
- });
- });
-}
-
-export async function iterRecordsForWithdrawal(
- tx: GetReadOnlyAccess<{
- withdrawalGroups: typeof WalletStoresV1.withdrawalGroups;
- }>,
- filter: TransactionRecordFilter,
- f: (r: WithdrawalGroupRecord) => Promise<void>,
-): Promise<void> {
- let withdrawalGroupRecords: WithdrawalGroupRecord[];
- if (filter.onlyState === "nonfinal") {
- const keyRange = GlobalIDB.KeyRange.bound(
- OPERATION_STATUS_ACTIVE_FIRST,
- OPERATION_STATUS_ACTIVE_LAST,
- );
- withdrawalGroupRecords =
- await tx.withdrawalGroups.indexes.byStatus.getAll(keyRange);
- } else {
- withdrawalGroupRecords =
- await tx.withdrawalGroups.indexes.byStatus.getAll();
- }
- for (const wgr of withdrawalGroupRecords) {
- await f(wgr);
- }
-}
-
-async function gatherWithdrawalPending(
- ws: InternalWalletState,
- tx: GetReadOnlyAccess<{
- withdrawalGroups: typeof WalletStoresV1.withdrawalGroups;
- planchets: typeof WalletStoresV1.planchets;
- operationRetries: typeof WalletStoresV1.operationRetries;
- }>,
- now: AbsoluteTime,
- resp: PendingOperationsResponse,
-): Promise<void> {
- await iterRecordsForWithdrawal(tx, { onlyState: "nonfinal" }, async (wsr) => {
- const opTag = TaskIdentifiers.forWithdrawal(wsr);
- let opr = await tx.operationRetries.get(opTag);
- /**
- * kyc pending operation don't give lifeness
- * since the user need to complete kyc procedure
- */
- const userNeedToCompleteKYC = wsr.kycUrl !== undefined;
- const now = AbsoluteTime.now();
- if (!opr) {
- opr = {
- id: opTag,
- retryInfo: {
- firstTry: timestampPreciseToDb(AbsoluteTime.toPreciseTimestamp(now)),
- nextRetry: timestampPreciseToDb(AbsoluteTime.toPreciseTimestamp(now)),
- retryCounter: 0,
- },
- };
- }
- resp.pendingOperations.push({
- type: PendingTaskType.Withdraw,
- ...getPendingCommon(
- ws,
- opTag,
- timestampOptionalAbsoluteFromDb(opr.retryInfo?.nextRetry) ??
- AbsoluteTime.now(),
- ),
- givesLifeness: !userNeedToCompleteKYC,
- withdrawalGroupId: wsr.withdrawalGroupId,
- lastError: opr.lastError,
- retryInfo: opr.retryInfo,
- });
- });
-}
-
-export async function iterRecordsForDeposit(
- tx: GetReadOnlyAccess<{
- depositGroups: typeof WalletStoresV1.depositGroups;
- }>,
- filter: TransactionRecordFilter,
- f: (r: DepositGroupRecord) => Promise<void>,
-): Promise<void> {
- let dgs: DepositGroupRecord[];
- if (filter.onlyState === "nonfinal") {
- const keyRange = GlobalIDB.KeyRange.bound(
- OPERATION_STATUS_ACTIVE_FIRST,
- OPERATION_STATUS_ACTIVE_LAST,
- );
- dgs = await tx.depositGroups.indexes.byStatus.getAll(keyRange);
- } else {
- dgs = await tx.depositGroups.indexes.byStatus.getAll();
- }
-
- for (const dg of dgs) {
- await f(dg);
- }
-}
-
-async function gatherDepositPending(
- ws: InternalWalletState,
- tx: GetReadOnlyAccess<{
- depositGroups: typeof WalletStoresV1.depositGroups;
- operationRetries: typeof WalletStoresV1.operationRetries;
- }>,
- now: AbsoluteTime,
- resp: PendingOperationsResponse,
-): Promise<void> {
- await iterRecordsForDeposit(tx, { onlyState: "nonfinal" }, async (dg) => {
- let deposited = true;
- for (const d of dg.statusPerCoin) {
- if (d === DepositElementStatus.DepositPending) {
- deposited = false;
- }
- }
- /**
- * kyc pending operation don't give lifeness
- * since the user need to complete kyc procedure
- */
- const userNeedToCompleteKYC = dg.kycInfo !== undefined;
- const opId = TaskIdentifiers.forDeposit(dg);
- const retryRecord = await tx.operationRetries.get(opId);
- const timestampDue =
- timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
- AbsoluteTime.now();
- resp.pendingOperations.push({
- type: PendingTaskType.Deposit,
- ...getPendingCommon(ws, opId, timestampDue),
- // Fully deposited operations don't give lifeness,
- // because there is no reason to wait on the
- // deposit tracking status.
- givesLifeness: !deposited && !userNeedToCompleteKYC,
- depositGroupId: dg.depositGroupId,
- lastError: retryRecord?.lastError,
- retryInfo: retryRecord?.retryInfo,
- });
- });
-}
-
-export async function iterRecordsForReward(
- tx: GetReadOnlyAccess<{
- rewards: typeof WalletStoresV1.rewards;
- }>,
- filter: TransactionRecordFilter,
- f: (r: RewardRecord) => Promise<void>,
-): Promise<void> {
- if (filter.onlyState === "nonfinal") {
- const keyRange = GlobalIDB.KeyRange.bound(
- OPERATION_STATUS_ACTIVE_FIRST,
- OPERATION_STATUS_ACTIVE_LAST,
- );
- await tx.rewards.indexes.byStatus.iter(keyRange).forEachAsync(f);
- } else {
- await tx.rewards.indexes.byStatus.iter().forEachAsync(f);
- }
-}
-
-async function gatherRewardPending(
- ws: InternalWalletState,
- tx: GetReadOnlyAccess<{
- rewards: typeof WalletStoresV1.rewards;
- operationRetries: typeof WalletStoresV1.operationRetries;
- }>,
- now: AbsoluteTime,
- resp: PendingOperationsResponse,
-): Promise<void> {
- await iterRecordsForReward(tx, { onlyState: "nonfinal" }, async (tip) => {
- const opId = TaskIdentifiers.forTipPickup(tip);
- const retryRecord = await tx.operationRetries.get(opId);
- const timestampDue =
- timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
- AbsoluteTime.now();
-
- /**
- * kyc pending operation don't give lifeness
- * since the user need to complete kyc procedure
- */
- // const userNeedToCompleteKYC = tip.
-
- if (tip.acceptedTimestamp) {
- resp.pendingOperations.push({
- type: PendingTaskType.RewardPickup,
- ...getPendingCommon(ws, opId, timestampDue),
- givesLifeness: true,
- timestampDue,
- merchantBaseUrl: tip.merchantBaseUrl,
- tipId: tip.walletRewardId,
- merchantTipId: tip.merchantRewardId,
- });
- }
- });
-}
-
-export async function iterRecordsForRefund(
- tx: GetReadOnlyAccess<{
- refundGroups: typeof WalletStoresV1.refundGroups;
- }>,
- filter: TransactionRecordFilter,
- f: (r: RefundGroupRecord) => Promise<void>,
-): Promise<void> {
- if (filter.onlyState === "nonfinal") {
- const keyRange = GlobalIDB.KeyRange.bound(
- OPERATION_STATUS_ACTIVE_FIRST,
- OPERATION_STATUS_ACTIVE_LAST,
- );
- await tx.refundGroups.indexes.byStatus.iter(keyRange).forEachAsync(f);
- } else {
- await tx.refundGroups.iter().forEachAsync(f);
- }
-}
-
-export async function iterRecordsForPurchase(
- tx: GetReadOnlyAccess<{
- purchases: typeof WalletStoresV1.purchases;
- }>,
- filter: TransactionRecordFilter,
- f: (r: PurchaseRecord) => Promise<void>,
-): Promise<void> {
- if (filter.onlyState === "nonfinal") {
- const keyRange = GlobalIDB.KeyRange.bound(
- OPERATION_STATUS_ACTIVE_FIRST,
- OPERATION_STATUS_ACTIVE_LAST,
- );
- await tx.purchases.indexes.byStatus.iter(keyRange).forEachAsync(f);
- } else {
- await tx.purchases.indexes.byStatus.iter().forEachAsync(f);
- }
-}
-
-async function gatherPurchasePending(
- ws: InternalWalletState,
- tx: GetReadOnlyAccess<{
- purchases: typeof WalletStoresV1.purchases;
- operationRetries: typeof WalletStoresV1.operationRetries;
- }>,
- now: AbsoluteTime,
- resp: PendingOperationsResponse,
-): Promise<void> {
- await iterRecordsForPurchase(tx, { onlyState: "nonfinal" }, async (pr) => {
- switch (pr.purchaseStatus) {
- // These states are nonfinal but don't need any processing
- case PurchaseStatus.DialogProposed:
- case PurchaseStatus.DialogShared:
- return;
- }
- const opId = TaskIdentifiers.forPay(pr);
- const retryRecord = await tx.operationRetries.get(opId);
- const timestampDue =
- timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
- AbsoluteTime.now();
- resp.pendingOperations.push({
- type: PendingTaskType.Purchase,
- ...getPendingCommon(ws, opId, timestampDue),
- givesLifeness: true,
- statusStr: PurchaseStatus[pr.purchaseStatus],
- proposalId: pr.proposalId,
- retryInfo: retryRecord?.retryInfo,
- lastError: retryRecord?.lastError,
- });
- });
-}
-
-async function gatherRecoupPending(
- ws: InternalWalletState,
- tx: GetReadOnlyAccess<{
- recoupGroups: typeof WalletStoresV1.recoupGroups;
- operationRetries: typeof WalletStoresV1.operationRetries;
- }>,
- now: AbsoluteTime,
- resp: PendingOperationsResponse,
-): Promise<void> {
- // FIXME: Have a status field!
- await tx.recoupGroups.iter().forEachAsync(async (rg) => {
- if (rg.timestampFinished) {
- return;
- }
- const opId = TaskIdentifiers.forRecoup(rg);
- const retryRecord = await tx.operationRetries.get(opId);
- const timestampDue =
- timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
- AbsoluteTime.now();
- resp.pendingOperations.push({
- type: PendingTaskType.Recoup,
- ...getPendingCommon(ws, opId, timestampDue),
- givesLifeness: true,
- recoupGroupId: rg.recoupGroupId,
- retryInfo: retryRecord?.retryInfo,
- lastError: retryRecord?.lastError,
- });
- });
-}
-
-async function gatherBackupPending(
- ws: InternalWalletState,
- tx: GetReadOnlyAccess<{
- backupProviders: typeof WalletStoresV1.backupProviders;
- operationRetries: typeof WalletStoresV1.operationRetries;
- }>,
- now: AbsoluteTime,
- resp: PendingOperationsResponse,
-): Promise<void> {
- await tx.backupProviders.iter().forEachAsync(async (bp) => {
- const opId = TaskIdentifiers.forBackup(bp);
- const retryRecord = await tx.operationRetries.get(opId);
- if (bp.state.tag === BackupProviderStateTag.Ready) {
- const timestampDue = timestampAbsoluteFromDb(
- bp.state.nextBackupTimestamp,
- );
- resp.pendingOperations.push({
- type: PendingTaskType.Backup,
- ...getPendingCommon(ws, opId, timestampDue),
- givesLifeness: false,
- backupProviderBaseUrl: bp.baseUrl,
- lastError: undefined,
- });
- } else if (bp.state.tag === BackupProviderStateTag.Retrying) {
- const timestampDue =
- timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo?.nextRetry) ??
- AbsoluteTime.now();
- resp.pendingOperations.push({
- type: PendingTaskType.Backup,
- ...getPendingCommon(ws, opId, timestampDue),
- givesLifeness: false,
- backupProviderBaseUrl: bp.baseUrl,
- retryInfo: retryRecord?.retryInfo,
- lastError: retryRecord?.lastError,
- });
- }
- });
-}
-
-export async function iterRecordsForPeerPullInitiation(
- tx: GetReadOnlyAccess<{
- peerPullCredit: typeof WalletStoresV1.peerPullCredit;
- }>,
- filter: TransactionRecordFilter,
- f: (r: PeerPullCreditRecord) => Promise<void>,
-): Promise<void> {
- if (filter.onlyState === "nonfinal") {
- const keyRange = GlobalIDB.KeyRange.bound(
- OPERATION_STATUS_ACTIVE_FIRST,
- OPERATION_STATUS_ACTIVE_LAST,
- );
- await tx.peerPullCredit.indexes.byStatus.iter(keyRange).forEachAsync(f);
- } else {
- await tx.peerPullCredit.indexes.byStatus.iter().forEachAsync(f);
- }
-}
-
-async function gatherPeerPullInitiationPending(
- ws: InternalWalletState,
- tx: GetReadOnlyAccess<{
- peerPullCredit: typeof WalletStoresV1.peerPullCredit;
- operationRetries: typeof WalletStoresV1.operationRetries;
- }>,
- now: AbsoluteTime,
- resp: PendingOperationsResponse,
-): Promise<void> {
- await iterRecordsForPeerPullInitiation(
- tx,
- { onlyState: "nonfinal" },
- async (pi) => {
- const opId = TaskIdentifiers.forPeerPullPaymentInitiation(pi);
- const retryRecord = await tx.operationRetries.get(opId);
- const timestampDue =
- timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
- AbsoluteTime.now();
-
- /**
- * kyc pending operation don't give lifeness
- * since the user need to complete kyc procedure
- */
- const userNeedToCompleteKYC = pi.kycUrl !== undefined;
-
- resp.pendingOperations.push({
- type: PendingTaskType.PeerPullCredit,
- ...getPendingCommon(ws, opId, timestampDue),
- givesLifeness: !userNeedToCompleteKYC,
- retryInfo: retryRecord?.retryInfo,
- pursePub: pi.pursePub,
- internalOperationStatus: `0x${pi.status.toString(16)}`,
- });
- },
- );
-}
-
-export async function iterRecordsForPeerPullDebit(
- tx: GetReadOnlyAccess<{
- peerPullDebit: typeof WalletStoresV1.peerPullDebit;
- }>,
- filter: TransactionRecordFilter,
- f: (r: PeerPullPaymentIncomingRecord) => Promise<void>,
-): Promise<void> {
- if (filter.onlyState === "nonfinal") {
- const keyRange = GlobalIDB.KeyRange.bound(
- OPERATION_STATUS_ACTIVE_FIRST,
- OPERATION_STATUS_ACTIVE_LAST,
- );
- await tx.peerPullDebit.indexes.byStatus.iter(keyRange).forEachAsync(f);
- } else {
- await tx.peerPullDebit.indexes.byStatus.iter().forEachAsync(f);
- }
-}
-
-async function gatherPeerPullDebitPending(
- ws: InternalWalletState,
- tx: GetReadOnlyAccess<{
- peerPullDebit: typeof WalletStoresV1.peerPullDebit;
- operationRetries: typeof WalletStoresV1.operationRetries;
- }>,
- now: AbsoluteTime,
- resp: PendingOperationsResponse,
-): Promise<void> {
- await iterRecordsForPeerPullDebit(
- tx,
- { onlyState: "nonfinal" },
- async (pi) => {
- const opId = TaskIdentifiers.forPeerPullPaymentDebit(pi);
- const retryRecord = await tx.operationRetries.get(opId);
- const timestampDue =
- timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
- AbsoluteTime.now();
- switch (pi.status) {
- case PeerPullDebitRecordStatus.DialogProposed:
- return;
- }
- resp.pendingOperations.push({
- type: PendingTaskType.PeerPullDebit,
- ...getPendingCommon(ws, opId, timestampDue),
- givesLifeness: true,
- retryInfo: retryRecord?.retryInfo,
- peerPullDebitId: pi.peerPullDebitId,
- internalOperationStatus: `0x${pi.status.toString(16)}`,
- });
- },
- );
-}
-
-export async function iterRecordsForPeerPushInitiation(
- tx: GetReadOnlyAccess<{
- peerPushDebit: typeof WalletStoresV1.peerPushDebit;
- }>,
- filter: TransactionRecordFilter,
- f: (r: PeerPushDebitRecord) => Promise<void>,
-): Promise<void> {
- if (filter.onlyState === "nonfinal") {
- const keyRange = GlobalIDB.KeyRange.bound(
- OPERATION_STATUS_ACTIVE_FIRST,
- OPERATION_STATUS_ACTIVE_LAST,
- );
- await tx.peerPushDebit.indexes.byStatus.iter(keyRange).forEachAsync(f);
- } else {
- await tx.peerPushDebit.indexes.byStatus.iter().forEachAsync(f);
- }
-}
-
-async function gatherPeerPushInitiationPending(
- ws: InternalWalletState,
- tx: GetReadOnlyAccess<{
- peerPushDebit: typeof WalletStoresV1.peerPushDebit;
- operationRetries: typeof WalletStoresV1.operationRetries;
- }>,
- now: AbsoluteTime,
- resp: PendingOperationsResponse,
-): Promise<void> {
- await iterRecordsForPeerPushInitiation(
- tx,
- { onlyState: "nonfinal" },
- async (pi) => {
- const opId = TaskIdentifiers.forPeerPushPaymentInitiation(pi);
- const retryRecord = await tx.operationRetries.get(opId);
- const timestampDue =
- timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
- AbsoluteTime.now();
- resp.pendingOperations.push({
- type: PendingTaskType.PeerPushDebit,
- ...getPendingCommon(ws, opId, timestampDue),
- givesLifeness: true,
- retryInfo: retryRecord?.retryInfo,
- pursePub: pi.pursePub,
- });
- },
- );
-}
-
-export async function iterRecordsForPeerPushCredit(
- tx: GetReadOnlyAccess<{
- peerPushCredit: typeof WalletStoresV1.peerPushCredit;
- }>,
- filter: TransactionRecordFilter,
- f: (r: PeerPushPaymentIncomingRecord) => Promise<void>,
-): Promise<void> {
- if (filter.onlyState === "nonfinal") {
- const keyRange = GlobalIDB.KeyRange.bound(
- OPERATION_STATUS_ACTIVE_FIRST,
- OPERATION_STATUS_ACTIVE_LAST,
- );
- await tx.peerPushCredit.indexes.byStatus.iter(keyRange).forEachAsync(f);
- } else {
- await tx.peerPushCredit.indexes.byStatus.iter().forEachAsync(f);
- }
-}
-
-async function gatherPeerPushCreditPending(
- ws: InternalWalletState,
- tx: GetReadOnlyAccess<{
- peerPushCredit: typeof WalletStoresV1.peerPushCredit;
- operationRetries: typeof WalletStoresV1.operationRetries;
- }>,
- now: AbsoluteTime,
- resp: PendingOperationsResponse,
-): Promise<void> {
- await iterRecordsForPeerPushCredit(
- tx,
- { onlyState: "nonfinal" },
- async (pi) => {
- const opId = TaskIdentifiers.forPeerPushCredit(pi);
- const retryRecord = await tx.operationRetries.get(opId);
- const timestampDue =
- timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
- AbsoluteTime.now();
-
- /**
- * kyc pending operation don't give lifeness
- * since the user need to complete kyc procedure
- */
- const userNeedToCompleteKYC = pi.kycUrl !== undefined;
-
- switch (pi.status) {
- // Status is nonfinal but no processing needs to be done
- case PeerPushCreditStatus.DialogProposed:
- return;
- default:
- resp.pendingOperations.push({
- type: PendingTaskType.PeerPushCredit,
- ...getPendingCommon(ws, opId, timestampDue),
- givesLifeness: !userNeedToCompleteKYC,
- retryInfo: retryRecord?.retryInfo,
- peerPushCreditId: pi.peerPushCreditId,
- });
- }
- },
- );
-}
-
-const taskPrio: { [X in PendingTaskType]: number } = {
- [PendingTaskType.Deposit]: 2,
- [PendingTaskType.ExchangeUpdate]: 1,
- [PendingTaskType.PeerPullCredit]: 2,
- [PendingTaskType.PeerPullDebit]: 2,
- [PendingTaskType.PeerPushCredit]: 2,
- [PendingTaskType.Purchase]: 2,
- [PendingTaskType.Recoup]: 3,
- [PendingTaskType.RewardPickup]: 2,
- [PendingTaskType.Refresh]: 3,
- [PendingTaskType.Withdraw]: 3,
- [PendingTaskType.ExchangeCheckRefresh]: 3,
- [PendingTaskType.PeerPushDebit]: 2,
- [PendingTaskType.Backup]: 4,
-};
-
-export async function getPendingOperations(
- ws: InternalWalletState,
-): Promise<PendingOperationsResponse> {
- const now = AbsoluteTime.now();
- const resp = await ws.db
- .mktx((x) => [
- x.backupProviders,
- x.exchanges,
- x.exchangeDetails,
- x.refreshGroups,
- x.coins,
- x.withdrawalGroups,
- x.rewards,
- x.purchases,
- x.planchets,
- x.depositGroups,
- x.recoupGroups,
- x.operationRetries,
- x.peerPullCredit,
- x.peerPushDebit,
- x.peerPullDebit,
- x.peerPushCredit,
- ])
- .runReadWrite(async (tx) => {
- const resp: PendingOperationsResponse = {
- pendingOperations: [],
- };
- await gatherExchangePending(ws, tx, now, resp);
- await gatherRefreshPending(ws, tx, now, resp);
- await gatherWithdrawalPending(ws, tx, now, resp);
- await gatherDepositPending(ws, tx, now, resp);
- await gatherRewardPending(ws, tx, now, resp);
- await gatherPurchasePending(ws, tx, now, resp);
- await gatherRecoupPending(ws, tx, now, resp);
- await gatherBackupPending(ws, tx, now, resp);
- await gatherPeerPushInitiationPending(ws, tx, now, resp);
- await gatherPeerPullInitiationPending(ws, tx, now, resp);
- await gatherPeerPullDebitPending(ws, tx, now, resp);
- await gatherPeerPushCreditPending(ws, tx, now, resp);
- return resp;
- });
-
- resp.pendingOperations.sort((a, b) => {
- let prioA = taskPrio[a.type];
- let prioB = taskPrio[b.type];
- return Math.sign(prioA - prioB);
- });
-
- return resp;
-}
diff --git a/packages/taler-wallet-core/src/operations/refresh.ts b/packages/taler-wallet-core/src/operations/refresh.ts
index 5f7169dbd..b9ac12518 100644
--- a/packages/taler-wallet-core/src/operations/refresh.ts
+++ b/packages/taler-wallet-core/src/operations/refresh.ts
@@ -15,12 +15,12 @@
*/
import {
- AbsoluteTime,
AgeCommitment,
AgeRestriction,
AmountJson,
Amounts,
amountToPretty,
+ CancellationToken,
codecForExchangeMeltResponse,
codecForExchangeRevealResponse,
CoinPublicKeyString,
@@ -29,8 +29,6 @@ import {
DenominationInfo,
DenomKeyType,
Duration,
- durationFromSpec,
- durationMul,
encodeCrock,
ExchangeMeltRequest,
ExchangeProtocolVersion,
@@ -51,7 +49,6 @@ import {
TalerErrorCode,
TalerErrorDetail,
TalerPreciseTimestamp,
- TalerProtocolTimestamp,
TransactionAction,
TransactionMajorState,
TransactionState,
@@ -79,10 +76,11 @@ import {
} from "../db.js";
import {
getCandidateWithdrawalDenomsTx,
+ PendingTaskType,
RefreshGroupPerExchangeInfo,
RefreshSessionRecord,
+ TaskId,
timestampPreciseToDb,
- timestampProtocolFromDb,
WalletDbReadWriteTransactionArr,
} from "../index.js";
import {
@@ -94,6 +92,7 @@ import { selectWithdrawalDenominations } from "../util/coinSelection.js";
import { checkDbInvariant } from "../util/invariants.js";
import { GetReadOnlyAccess, GetReadWriteAccess } from "../util/query.js";
import {
+ constructTaskIdentifier,
makeCoinAvailable,
makeCoinsVisible,
TaskRunResult,
@@ -111,6 +110,7 @@ const logger = new Logger("refresh.ts");
export class RefreshTransactionContext implements TransactionContext {
public transactionId: string;
+ readonly taskId: TaskId;
constructor(
public ws: InternalWalletState,
@@ -120,6 +120,10 @@ export class RefreshTransactionContext implements TransactionContext {
tag: TransactionType.Refresh,
refreshGroupId,
});
+ this.taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Refresh,
+ refreshGroupId,
+ });
}
async deleteTransaction(): Promise<void> {
@@ -211,8 +215,8 @@ export class RefreshTransactionContext implements TransactionContext {
}
return undefined;
});
- ws.workAvailable.trigger();
notifyTransition(ws, transactionId, transitionInfo);
+ ws.taskScheduler.startShepherdTask(this.taskId);
}
async failTransaction(): Promise<void> {
@@ -250,8 +254,9 @@ export class RefreshTransactionContext implements TransactionContext {
newTxState: computeRefreshTransactionState(dg),
};
});
- ws.workAvailable.trigger();
+ ws.taskScheduler.stopShepherdTask(this.taskId);
notifyTransition(ws, transactionId, transitionInfo);
+ ws.taskScheduler.startShepherdTask(this.taskId);
}
}
@@ -1003,7 +1008,7 @@ async function refreshReveal(
export async function processRefreshGroup(
ws: InternalWalletState,
refreshGroupId: string,
- options: Record<string, never> = {},
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
logger.trace(`processing refresh group ${refreshGroupId}`);
@@ -1053,7 +1058,7 @@ export async function processRefreshGroup(
logger.warn(`exception: ${e}`);
}
if (inShutdown) {
- return TaskRunResult.pending();
+ return TaskRunResult.backoff();
}
if (errors.length > 0) {
return {
@@ -1068,7 +1073,7 @@ export async function processRefreshGroup(
};
}
- return TaskRunResult.pending();
+ return TaskRunResult.backoff();
}
async function processRefreshSession(
@@ -1311,134 +1316,18 @@ export async function createRefreshGroup(
logger.trace(`created refresh group ${refreshGroupId}`);
+ const ctx = new RefreshTransactionContext(ws, refreshGroupId);
+
+ // Shepherd the task.
+ // If the current transaction fails to commit the refresh
+ // group to the DB, the shepherd will give up.
+ ws.taskScheduler.startShepherdTask(ctx.taskId);
+
return {
refreshGroupId,
};
}
-/**
- * Timestamp after which the wallet would do the next check for an auto-refresh.
- */
-function getAutoRefreshCheckThreshold(d: DenominationRecord): AbsoluteTime {
- const expireWithdraw = AbsoluteTime.fromProtocolTimestamp(
- timestampProtocolFromDb(d.stampExpireWithdraw),
- );
- const expireDeposit = AbsoluteTime.fromProtocolTimestamp(
- timestampProtocolFromDb(d.stampExpireDeposit),
- );
- const delta = AbsoluteTime.difference(expireWithdraw, expireDeposit);
- const deltaDiv = durationMul(delta, 0.75);
- return AbsoluteTime.addDuration(expireWithdraw, deltaDiv);
-}
-
-/**
- * Timestamp after which the wallet would do an auto-refresh.
- */
-export function getAutoRefreshExecuteThreshold(d: {
- stampExpireWithdraw: TalerProtocolTimestamp;
- stampExpireDeposit: TalerProtocolTimestamp;
-}): AbsoluteTime {
- const expireWithdraw = AbsoluteTime.fromProtocolTimestamp(
- d.stampExpireWithdraw,
- );
- const expireDeposit = AbsoluteTime.fromProtocolTimestamp(
- d.stampExpireDeposit,
- );
- const delta = AbsoluteTime.difference(expireWithdraw, expireDeposit);
- const deltaDiv = durationMul(delta, 0.5);
- return AbsoluteTime.addDuration(expireWithdraw, deltaDiv);
-}
-
-function getAutoRefreshExecuteThresholdForDenom(
- d: DenominationRecord,
-): AbsoluteTime {
- return getAutoRefreshExecuteThreshold({
- stampExpireWithdraw: timestampProtocolFromDb(d.stampExpireWithdraw),
- stampExpireDeposit: timestampProtocolFromDb(d.stampExpireDeposit),
- });
-}
-
-export async function autoRefresh(
- ws: InternalWalletState,
- exchangeBaseUrl: string,
-): Promise<TaskRunResult> {
- logger.trace(`doing auto-refresh check for '${exchangeBaseUrl}'`);
-
- // We must make sure that the exchange is up-to-date so that
- // can refresh into new denominations.
- await fetchFreshExchange(ws, exchangeBaseUrl);
-
- let minCheckThreshold = AbsoluteTime.addDuration(
- AbsoluteTime.now(),
- durationFromSpec({ days: 1 }),
- );
- await ws.db
- .mktx((x) => [
- x.coins,
- x.denominations,
- x.coinAvailability,
- x.refreshGroups,
- x.exchanges,
- ])
- .runReadWrite(async (tx) => {
- const exchange = await tx.exchanges.get(exchangeBaseUrl);
- if (!exchange || !exchange.detailsPointer) {
- return;
- }
- const coins = await tx.coins.indexes.byBaseUrl
- .iter(exchangeBaseUrl)
- .toArray();
- const refreshCoins: CoinRefreshRequest[] = [];
- for (const coin of coins) {
- if (coin.status !== CoinStatus.Fresh) {
- continue;
- }
- const denom = await tx.denominations.get([
- exchangeBaseUrl,
- coin.denomPubHash,
- ]);
- if (!denom) {
- logger.warn("denomination not in database");
- continue;
- }
- const executeThreshold = getAutoRefreshExecuteThresholdForDenom(denom);
- if (AbsoluteTime.isExpired(executeThreshold)) {
- refreshCoins.push({
- coinPub: coin.coinPub,
- amount: denom.value,
- });
- } else {
- const checkThreshold = getAutoRefreshCheckThreshold(denom);
- minCheckThreshold = AbsoluteTime.min(
- minCheckThreshold,
- checkThreshold,
- );
- }
- }
- if (refreshCoins.length > 0) {
- const res = await createRefreshGroup(
- ws,
- tx,
- exchange.detailsPointer?.currency,
- refreshCoins,
- RefreshReason.Scheduled,
- undefined,
- );
- logger.trace(
- `created refresh group for auto-refresh (${res.refreshGroupId})`,
- );
- }
- logger.trace(
- `next refresh check at ${AbsoluteTime.toIsoString(minCheckThreshold)}`,
- );
- exchange.nextRefreshCheckStamp = timestampPreciseToDb(
- AbsoluteTime.toPreciseTimestamp(minCheckThreshold),
- );
- await tx.exchanges.put(exchange);
- });
- return TaskRunResult.finished();
-}
-
export function computeRefreshTransactionState(
rg: RefreshGroupRecord,
): TransactionState {
diff --git a/packages/taler-wallet-core/src/operations/reward.ts b/packages/taler-wallet-core/src/operations/reward.ts
index 6dcd48019..4d8653a9d 100644
--- a/packages/taler-wallet-core/src/operations/reward.ts
+++ b/packages/taler-wallet-core/src/operations/reward.ts
@@ -83,7 +83,6 @@ import {
constructTransactionIdentifier,
notifyTransition,
parseTransactionIdentifier,
- stopLongpolling,
} from "./transactions.js";
import { PendingTaskType } from "../pending-types.js";
import { assertUnreachable } from "../util/assertUnreachable.js";
@@ -125,7 +124,6 @@ export class RewardTransactionContext implements TransactionContext {
async suspendTransaction(): Promise<void> {
const { ws, walletRewardId, transactionId, retryTag } = this;
- stopLongpolling(ws, retryTag);
const transitionInfo = await ws.db
.mktx((x) => [x.rewards])
.runReadWrite(async (tx) => {
@@ -161,13 +159,11 @@ export class RewardTransactionContext implements TransactionContext {
}
return undefined;
});
- ws.workAvailable.trigger();
notifyTransition(ws, transactionId, transitionInfo);
}
async abortTransaction(): Promise<void> {
const { ws, walletRewardId, transactionId, retryTag } = this;
- stopLongpolling(ws, retryTag);
const transitionInfo = await ws.db
.mktx((x) => [x.rewards])
.runReadWrite(async (tx) => {
@@ -207,7 +203,6 @@ export class RewardTransactionContext implements TransactionContext {
async resumeTransaction(): Promise<void> {
const { ws, walletRewardId, transactionId, retryTag } = this;
- stopLongpolling(ws, retryTag);
const transitionInfo = await ws.db
.mktx((x) => [x.rewards])
.runReadWrite(async (tx) => {
@@ -247,7 +242,6 @@ export class RewardTransactionContext implements TransactionContext {
async failTransaction(): Promise<void> {
const { ws, walletRewardId, transactionId, retryTag } = this;
- stopLongpolling(ws, retryTag);
const transitionInfo = await ws.db
.mktx((x) => [x.rewards])
.runReadWrite(async (tx) => {
diff --git a/packages/taler-wallet-core/src/operations/testing.ts b/packages/taler-wallet-core/src/operations/testing.ts
index 4c2cfae2c..5902e8362 100644
--- a/packages/taler-wallet-core/src/operations/testing.ts
+++ b/packages/taler-wallet-core/src/operations/testing.ts
@@ -62,6 +62,7 @@ import {
import { InternalWalletState } from "../internal-wallet-state.js";
import { checkLogicInvariant } from "../util/invariants.js";
import { getBalances } from "./balance.js";
+import { createDepositGroup } from "./deposits.js";
import { fetchFreshExchange } from "./exchanges.js";
import {
confirmPay,
@@ -78,10 +79,8 @@ import {
preparePeerPushCredit,
} from "./pay-peer-push-credit.js";
import { initiatePeerPushDebit } from "./pay-peer-push-debit.js";
-import { getPendingOperations } from "./pending.js";
import { getTransactionById, getTransactions } from "./transactions.js";
import { acceptWithdrawalFromUri } from "./withdraw.js";
-import { createDepositGroup } from "./deposits.js";
const logger = new Logger("operations/testing.ts");
@@ -521,44 +520,6 @@ export async function waitUntilGivenTransactionsFinal(
logger.info("done waiting until given transactions are in a final state");
}
-/**
- * Wait until pending work is processed.
- */
-export async function waitUntilTasksProcessed(
- ws: InternalWalletState,
-): Promise<void> {
- logger.info("waiting until pending work is processed");
- ws.ensureTaskLoopRunning();
- let p: OpenedPromise<void> | undefined = undefined;
- const cancelNotifs = ws.addNotificationListener((notif) => {
- if (!p) {
- return;
- }
- if (notif.type === NotificationType.PendingOperationProcessed) {
- p.resolve();
- }
- });
- while (1) {
- p = openPromise();
- const pendingTasksResp = await getPendingOperations(ws);
- logger.info(`waiting on pending ops: ${j2s(pendingTasksResp)}`);
- let finished = true;
- for (const task of pendingTasksResp.pendingOperations) {
- if (task.isDue) {
- finished = false;
- }
- logger.info(`continuing waiting for task ${task.id}`);
- }
- if (finished) {
- break;
- }
- // Wait until task is done
- await p.promise;
- }
- logger.info("done waiting until pending work is processed");
- cancelNotifs();
-}
-
export async function waitUntilRefreshesDone(
ws: InternalWalletState,
): Promise<void> {
diff --git a/packages/taler-wallet-core/src/operations/transactions.ts b/packages/taler-wallet-core/src/operations/transactions.ts
index 3b4e75427..10e018d23 100644
--- a/packages/taler-wallet-core/src/operations/transactions.ts
+++ b/packages/taler-wallet-core/src/operations/transactions.ts
@@ -17,6 +17,7 @@
/**
* Imports.
*/
+import { GlobalIDB } from "@gnu-taler/idb-bridge";
import {
AbsoluteTime,
Amounts,
@@ -69,18 +70,19 @@ import {
} from "../db.js";
import {
GetReadOnlyAccess,
+ OPERATION_STATUS_ACTIVE_FIRST,
+ OPERATION_STATUS_ACTIVE_LAST,
PeerPushDebitStatus,
timestampPreciseFromDb,
timestampProtocolFromDb,
WalletStoresV1,
} from "../index.js";
import { InternalWalletState } from "../internal-wallet-state.js";
-import { PendingTaskType } from "../pending-types.js";
+import { PendingTaskType, TaskId } from "../pending-types.js";
import { assertUnreachable } from "../util/assertUnreachable.js";
import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js";
import {
constructTaskIdentifier,
- resetPendingTaskTimeout,
TaskIdentifiers,
TransactionContext,
} from "./common.js";
@@ -123,18 +125,6 @@ import {
PeerPushDebitTransactionContext,
} from "./pay-peer-push-debit.js";
import {
- iterRecordsForDeposit,
- iterRecordsForPeerPullInitiation as iterRecordsForPeerPullCredit,
- iterRecordsForPeerPullDebit,
- iterRecordsForPeerPushCredit,
- iterRecordsForPeerPushInitiation as iterRecordsForPeerPushDebit,
- iterRecordsForPurchase,
- iterRecordsForRefresh,
- iterRecordsForRefund,
- iterRecordsForReward,
- iterRecordsForWithdrawal,
-} from "./pending.js";
-import {
computeRefreshTransactionActions,
computeRefreshTransactionState,
RefreshTransactionContext,
@@ -159,25 +149,32 @@ function shouldSkipCurrency(
exchangesInTransaction: string[],
): boolean {
if (transactionsRequest?.scopeInfo) {
- const sameCurrency = transactionsRequest.scopeInfo.currency.toLowerCase() === currency.toLowerCase()
+ const sameCurrency =
+ transactionsRequest.scopeInfo.currency.toLowerCase() ===
+ currency.toLowerCase();
switch (transactionsRequest.scopeInfo.type) {
case ScopeType.Global: {
- return !sameCurrency
+ return !sameCurrency;
}
case ScopeType.Exchange: {
- const exchangeInvolveInTransaction = exchangesInTransaction.indexOf(transactionsRequest.scopeInfo.url) !== -1
- return !sameCurrency || !exchangeInvolveInTransaction
+ const exchangeInvolveInTransaction =
+ exchangesInTransaction.indexOf(transactionsRequest.scopeInfo.url) !==
+ -1;
+ return !sameCurrency || !exchangeInvolveInTransaction;
}
case ScopeType.Auditor: {
// same currency and same auditor
- throw Error("filering balance in auditor scope is not implemented")
+ throw Error("filering balance in auditor scope is not implemented");
}
- default: assertUnreachable(transactionsRequest.scopeInfo)
+ default:
+ assertUnreachable(transactionsRequest.scopeInfo);
}
}
// FIXME: remove next release
if (transactionsRequest?.currency) {
- return transactionsRequest.currency.toLowerCase() !== currency.toLowerCase();
+ return (
+ transactionsRequest.currency.toLowerCase() !== currency.toLowerCase()
+ );
}
return false;
}
@@ -565,7 +562,7 @@ function buildTransactionForPeerPullCredit(
const silentWithdrawalErrorForInvoice =
wsrOrt?.lastError &&
wsrOrt.lastError.code ===
- TalerErrorCode.WALLET_WITHDRAWAL_GROUP_INCOMPLETE &&
+ TalerErrorCode.WALLET_WITHDRAWAL_GROUP_INCOMPLETE &&
Object.values(wsrOrt.lastError.errorsPerCoin ?? {}).every((e) => {
return (
e.code === TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR &&
@@ -598,10 +595,10 @@ function buildTransactionForPeerPullCredit(
kycUrl: pullCredit.kycUrl,
...(wsrOrt?.lastError
? {
- error: silentWithdrawalErrorForInvoice
- ? undefined
- : wsrOrt.lastError,
- }
+ error: silentWithdrawalErrorForInvoice
+ ? undefined
+ : wsrOrt.lastError,
+ }
: {}),
};
}
@@ -1118,8 +1115,14 @@ export async function getTransactions(
.runReadOnly(async (tx) => {
await iterRecordsForPeerPushDebit(tx, filter, async (pi) => {
const amount = Amounts.parseOrThrow(pi.amount);
- const exchangesInTx = [pi.exchangeBaseUrl]
- if (shouldSkipCurrency(transactionsRequest, amount.currency, exchangesInTx)) {
+ const exchangesInTx = [pi.exchangeBaseUrl];
+ if (
+ shouldSkipCurrency(
+ transactionsRequest,
+ amount.currency,
+ exchangesInTx,
+ )
+ ) {
return;
}
if (shouldSkipSearch(transactionsRequest, [])) {
@@ -1134,8 +1137,14 @@ export async function getTransactions(
await iterRecordsForPeerPullDebit(tx, filter, async (pi) => {
const amount = Amounts.parseOrThrow(pi.amount);
- const exchangesInTx = [pi.exchangeBaseUrl]
- if (shouldSkipCurrency(transactionsRequest, amount.currency, exchangesInTx)) {
+ const exchangesInTx = [pi.exchangeBaseUrl];
+ if (
+ shouldSkipCurrency(
+ transactionsRequest,
+ amount.currency,
+ exchangesInTx,
+ )
+ ) {
return;
}
if (shouldSkipSearch(transactionsRequest, [])) {
@@ -1169,8 +1178,10 @@ export async function getTransactions(
// Legacy transaction
return;
}
- const exchangesInTx = [pi.exchangeBaseUrl]
- if (shouldSkipCurrency(transactionsRequest, pi.currency, exchangesInTx)) {
+ const exchangesInTx = [pi.exchangeBaseUrl];
+ if (
+ shouldSkipCurrency(transactionsRequest, pi.currency, exchangesInTx)
+ ) {
return;
}
if (shouldSkipSearch(transactionsRequest, [])) {
@@ -1208,7 +1219,7 @@ export async function getTransactions(
await iterRecordsForPeerPullCredit(tx, filter, async (pi) => {
const currency = Amounts.currencyOf(pi.amount);
- const exchangesInTx = [pi.exchangeBaseUrl]
+ const exchangesInTx = [pi.exchangeBaseUrl];
if (shouldSkipCurrency(transactionsRequest, currency, exchangesInTx)) {
return;
}
@@ -1243,16 +1254,16 @@ export async function getTransactions(
await iterRecordsForRefund(tx, filter, async (refundGroup) => {
const currency = Amounts.currencyOf(refundGroup.amountRaw);
- const exchangesInTx: string[] = []
- const p = await tx.purchases.get(refundGroup.proposalId)
+ const exchangesInTx: string[] = [];
+ const p = await tx.purchases.get(refundGroup.proposalId);
if (!p || !p.payInfo) return; //refund with no payment
p.payInfo.payCoinSelection.coinPubs.forEach(async (cp) => {
- const c = await tx.coins.get(cp)
+ const c = await tx.coins.get(cp);
if (c?.exchangeBaseUrl) {
- exchangesInTx.push(c.exchangeBaseUrl)
+ exchangesInTx.push(c.exchangeBaseUrl);
}
- })
+ });
if (shouldSkipCurrency(transactionsRequest, currency, exchangesInTx)) {
return;
@@ -1265,8 +1276,12 @@ export async function getTransactions(
});
await iterRecordsForRefresh(tx, filter, async (rg) => {
- const exchangesInTx = rg.infoPerExchange ? Object.keys(rg.infoPerExchange) : []
- if (shouldSkipCurrency(transactionsRequest, rg.currency, exchangesInTx)) {
+ const exchangesInTx = rg.infoPerExchange
+ ? Object.keys(rg.infoPerExchange)
+ : [];
+ if (
+ shouldSkipCurrency(transactionsRequest, rg.currency, exchangesInTx)
+ ) {
return;
}
let required = false;
@@ -1286,7 +1301,7 @@ export async function getTransactions(
});
await iterRecordsForWithdrawal(tx, filter, async (wsr) => {
- const exchangesInTx = [wsr.exchangeBaseUrl]
+ const exchangesInTx = [wsr.exchangeBaseUrl];
if (
shouldSkipCurrency(
transactionsRequest,
@@ -1343,8 +1358,16 @@ export async function getTransactions(
await iterRecordsForDeposit(tx, filter, async (dg) => {
const amount = Amounts.parseOrThrow(dg.amount);
- const exchangesInTx = dg.infoPerExchange ? Object.keys(dg.infoPerExchange) : []
- if (shouldSkipCurrency(transactionsRequest, amount.currency, exchangesInTx)) {
+ const exchangesInTx = dg.infoPerExchange
+ ? Object.keys(dg.infoPerExchange)
+ : [];
+ if (
+ shouldSkipCurrency(
+ transactionsRequest,
+ amount.currency,
+ exchangesInTx,
+ )
+ ) {
return;
}
const opId = TaskIdentifiers.forDeposit(dg);
@@ -1362,15 +1385,21 @@ export async function getTransactions(
return;
}
- const exchangesInTx: string[] = []
+ const exchangesInTx: string[] = [];
purchase.payInfo.payCoinSelection.coinPubs.forEach(async (cp) => {
- const c = await tx.coins.get(cp)
+ const c = await tx.coins.get(cp);
if (c?.exchangeBaseUrl) {
- exchangesInTx.push(c.exchangeBaseUrl)
+ exchangesInTx.push(c.exchangeBaseUrl);
}
- })
+ });
- if (shouldSkipCurrency(transactionsRequest, download.currency, exchangesInTx)) {
+ if (
+ shouldSkipCurrency(
+ transactionsRequest,
+ download.currency,
+ exchangesInTx,
+ )
+ ) {
return;
}
const contractTermsRecord = await tx.contractTerms.get(
@@ -1429,7 +1458,6 @@ export async function getTransactions(
transactions.push(buildTransactionForTip(tipRecord, retryRecord));
});
//ends REMOVE REWARDS
-
});
// One-off checks, because of a bug where the wallet previously
@@ -1587,25 +1615,7 @@ export function parseTransactionIdentifier(
}
}
-export function stopLongpolling(ws: InternalWalletState, taskId: string) {
- const longpoll = ws.activeLongpoll[taskId];
- if (longpoll) {
- logger.info(`cancelling long-polling for ${taskId}`);
- longpoll.cancel();
- delete ws.activeLongpoll[taskId];
- }
-}
-
-/**
- * Immediately retry the underlying operation
- * of a transaction.
- */
-export async function retryTransaction(
- ws: InternalWalletState,
- transactionId: string,
-): Promise<void> {
- logger.info(`resetting retry timeout for ${transactionId}`);
-
+function maybeTaskFromTransaction(transactionId: string): TaskId | undefined {
const parsedTx = parseTransactionIdentifier(transactionId);
if (!parsedTx) {
@@ -1615,100 +1625,80 @@ export async function retryTransaction(
// FIXME: We currently don't cancel active long-polling tasks here.
switch (parsedTx.tag) {
- case TransactionType.PeerPullCredit: {
- const taskId = constructTaskIdentifier({
+ case TransactionType.PeerPullCredit:
+ return constructTaskIdentifier({
tag: PendingTaskType.PeerPullCredit,
pursePub: parsedTx.pursePub,
});
- await resetPendingTaskTimeout(ws, taskId);
- stopLongpolling(ws, taskId);
- break;
- }
- case TransactionType.Deposit: {
- const taskId = constructTaskIdentifier({
+ case TransactionType.Deposit:
+ return constructTaskIdentifier({
tag: PendingTaskType.Deposit,
depositGroupId: parsedTx.depositGroupId,
});
- await resetPendingTaskTimeout(ws, taskId);
- stopLongpolling(ws, taskId);
- break;
- }
case TransactionType.InternalWithdrawal:
- case TransactionType.Withdrawal: {
- // FIXME: Abort current long-poller!
- const taskId = constructTaskIdentifier({
+ case TransactionType.Withdrawal:
+ return constructTaskIdentifier({
tag: PendingTaskType.Withdraw,
withdrawalGroupId: parsedTx.withdrawalGroupId,
});
- await resetPendingTaskTimeout(ws, taskId);
- stopLongpolling(ws, taskId);
- break;
- }
- case TransactionType.Payment: {
- const taskId = constructTaskIdentifier({
+ case TransactionType.Payment:
+ return constructTaskIdentifier({
tag: PendingTaskType.Purchase,
proposalId: parsedTx.proposalId,
});
- await resetPendingTaskTimeout(ws, taskId);
- stopLongpolling(ws, taskId);
- break;
- }
- case TransactionType.Reward: {
- const taskId = constructTaskIdentifier({
+ case TransactionType.Reward:
+ return constructTaskIdentifier({
tag: PendingTaskType.RewardPickup,
walletRewardId: parsedTx.walletRewardId,
});
- await resetPendingTaskTimeout(ws, taskId);
- stopLongpolling(ws, taskId);
- break;
- }
- case TransactionType.Refresh: {
- const taskId = constructTaskIdentifier({
+ case TransactionType.Refresh:
+ return constructTaskIdentifier({
tag: PendingTaskType.Refresh,
refreshGroupId: parsedTx.refreshGroupId,
});
- await resetPendingTaskTimeout(ws, taskId);
- stopLongpolling(ws, taskId);
- break;
- }
- case TransactionType.PeerPullDebit: {
- const taskId = constructTaskIdentifier({
+ case TransactionType.PeerPullDebit:
+ return constructTaskIdentifier({
tag: PendingTaskType.PeerPullDebit,
peerPullDebitId: parsedTx.peerPullDebitId,
});
- await resetPendingTaskTimeout(ws, taskId);
- stopLongpolling(ws, taskId);
- break;
- }
- case TransactionType.PeerPushCredit: {
- const taskId = constructTaskIdentifier({
+ case TransactionType.PeerPushCredit:
+ return constructTaskIdentifier({
tag: PendingTaskType.PeerPushCredit,
peerPushCreditId: parsedTx.peerPushCreditId,
});
- await resetPendingTaskTimeout(ws, taskId);
- stopLongpolling(ws, taskId);
- break;
- }
- case TransactionType.PeerPushDebit: {
- const taskId = constructTaskIdentifier({
+ case TransactionType.PeerPushDebit:
+ return constructTaskIdentifier({
tag: PendingTaskType.PeerPushDebit,
pursePub: parsedTx.pursePub,
});
- await resetPendingTaskTimeout(ws, taskId);
- stopLongpolling(ws, taskId);
- break;
- }
case TransactionType.Refund:
// Nothing to do for a refund transaction.
- break;
+ return undefined;
case TransactionType.Recoup:
- // FIXME!
- throw Error("not implemented");
+ return constructTaskIdentifier({
+ tag: PendingTaskType.Recoup,
+ recoupGroupId: parsedTx.recoupGroupId,
+ });
default:
assertUnreachable(parsedTx);
}
}
+/**
+ * Immediately retry the underlying operation
+ * of a transaction.
+ */
+export async function retryTransaction(
+ ws: InternalWalletState,
+ transactionId: string,
+): Promise<void> {
+ logger.info(`resetting retry timeout for ${transactionId}`);
+ const taskId = maybeTaskFromTransaction(transactionId);
+ if (taskId) {
+ ws.taskScheduler.resetTaskRetries(taskId);
+ }
+}
+
async function getContextForTransaction(
ws: InternalWalletState,
transactionId: string,
@@ -1828,5 +1818,203 @@ export function notifyTransition(
experimentalUserData,
});
}
- ws.workAvailable.trigger();
+}
+
+/**
+ * Iterate refresh records based on a filter.
+ */
+async function iterRecordsForRefresh(
+ tx: GetReadOnlyAccess<{
+ refreshGroups: typeof WalletStoresV1.refreshGroups;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: RefreshGroupRecord) => Promise<void>,
+): Promise<void> {
+ let refreshGroups: RefreshGroupRecord[];
+ if (filter.onlyState === "nonfinal") {
+ const keyRange = GlobalIDB.KeyRange.bound(
+ RefreshOperationStatus.Pending,
+ RefreshOperationStatus.Suspended,
+ );
+ refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll(keyRange);
+ } else {
+ refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll();
+ }
+
+ for (const r of refreshGroups) {
+ await f(r);
+ }
+}
+
+async function iterRecordsForWithdrawal(
+ tx: GetReadOnlyAccess<{
+ withdrawalGroups: typeof WalletStoresV1.withdrawalGroups;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: WithdrawalGroupRecord) => Promise<void>,
+): Promise<void> {
+ let withdrawalGroupRecords: WithdrawalGroupRecord[];
+ if (filter.onlyState === "nonfinal") {
+ const keyRange = GlobalIDB.KeyRange.bound(
+ OPERATION_STATUS_ACTIVE_FIRST,
+ OPERATION_STATUS_ACTIVE_LAST,
+ );
+ withdrawalGroupRecords =
+ await tx.withdrawalGroups.indexes.byStatus.getAll(keyRange);
+ } else {
+ withdrawalGroupRecords =
+ await tx.withdrawalGroups.indexes.byStatus.getAll();
+ }
+ for (const wgr of withdrawalGroupRecords) {
+ await f(wgr);
+ }
+}
+
+async function iterRecordsForDeposit(
+ tx: GetReadOnlyAccess<{
+ depositGroups: typeof WalletStoresV1.depositGroups;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: DepositGroupRecord) => Promise<void>,
+): Promise<void> {
+ let dgs: DepositGroupRecord[];
+ if (filter.onlyState === "nonfinal") {
+ const keyRange = GlobalIDB.KeyRange.bound(
+ OPERATION_STATUS_ACTIVE_FIRST,
+ OPERATION_STATUS_ACTIVE_LAST,
+ );
+ dgs = await tx.depositGroups.indexes.byStatus.getAll(keyRange);
+ } else {
+ dgs = await tx.depositGroups.indexes.byStatus.getAll();
+ }
+
+ for (const dg of dgs) {
+ await f(dg);
+ }
+}
+
+async function iterRecordsForReward(
+ tx: GetReadOnlyAccess<{
+ rewards: typeof WalletStoresV1.rewards;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: RewardRecord) => Promise<void>,
+): Promise<void> {
+ if (filter.onlyState === "nonfinal") {
+ const keyRange = GlobalIDB.KeyRange.bound(
+ OPERATION_STATUS_ACTIVE_FIRST,
+ OPERATION_STATUS_ACTIVE_LAST,
+ );
+ await tx.rewards.indexes.byStatus.iter(keyRange).forEachAsync(f);
+ } else {
+ await tx.rewards.indexes.byStatus.iter().forEachAsync(f);
+ }
+}
+
+async function iterRecordsForRefund(
+ tx: GetReadOnlyAccess<{
+ refundGroups: typeof WalletStoresV1.refundGroups;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: RefundGroupRecord) => Promise<void>,
+): Promise<void> {
+ if (filter.onlyState === "nonfinal") {
+ const keyRange = GlobalIDB.KeyRange.bound(
+ OPERATION_STATUS_ACTIVE_FIRST,
+ OPERATION_STATUS_ACTIVE_LAST,
+ );
+ await tx.refundGroups.indexes.byStatus.iter(keyRange).forEachAsync(f);
+ } else {
+ await tx.refundGroups.iter().forEachAsync(f);
+ }
+}
+
+async function iterRecordsForPurchase(
+ tx: GetReadOnlyAccess<{
+ purchases: typeof WalletStoresV1.purchases;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: PurchaseRecord) => Promise<void>,
+): Promise<void> {
+ if (filter.onlyState === "nonfinal") {
+ const keyRange = GlobalIDB.KeyRange.bound(
+ OPERATION_STATUS_ACTIVE_FIRST,
+ OPERATION_STATUS_ACTIVE_LAST,
+ );
+ await tx.purchases.indexes.byStatus.iter(keyRange).forEachAsync(f);
+ } else {
+ await tx.purchases.indexes.byStatus.iter().forEachAsync(f);
+ }
+}
+
+async function iterRecordsForPeerPullCredit(
+ tx: GetReadOnlyAccess<{
+ peerPullCredit: typeof WalletStoresV1.peerPullCredit;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: PeerPullCreditRecord) => Promise<void>,
+): Promise<void> {
+ if (filter.onlyState === "nonfinal") {
+ const keyRange = GlobalIDB.KeyRange.bound(
+ OPERATION_STATUS_ACTIVE_FIRST,
+ OPERATION_STATUS_ACTIVE_LAST,
+ );
+ await tx.peerPullCredit.indexes.byStatus.iter(keyRange).forEachAsync(f);
+ } else {
+ await tx.peerPullCredit.indexes.byStatus.iter().forEachAsync(f);
+ }
+}
+
+async function iterRecordsForPeerPullDebit(
+ tx: GetReadOnlyAccess<{
+ peerPullDebit: typeof WalletStoresV1.peerPullDebit;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: PeerPullPaymentIncomingRecord) => Promise<void>,
+): Promise<void> {
+ if (filter.onlyState === "nonfinal") {
+ const keyRange = GlobalIDB.KeyRange.bound(
+ OPERATION_STATUS_ACTIVE_FIRST,
+ OPERATION_STATUS_ACTIVE_LAST,
+ );
+ await tx.peerPullDebit.indexes.byStatus.iter(keyRange).forEachAsync(f);
+ } else {
+ await tx.peerPullDebit.indexes.byStatus.iter().forEachAsync(f);
+ }
+}
+
+async function iterRecordsForPeerPushDebit(
+ tx: GetReadOnlyAccess<{
+ peerPushDebit: typeof WalletStoresV1.peerPushDebit;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: PeerPushDebitRecord) => Promise<void>,
+): Promise<void> {
+ if (filter.onlyState === "nonfinal") {
+ const keyRange = GlobalIDB.KeyRange.bound(
+ OPERATION_STATUS_ACTIVE_FIRST,
+ OPERATION_STATUS_ACTIVE_LAST,
+ );
+ await tx.peerPushDebit.indexes.byStatus.iter(keyRange).forEachAsync(f);
+ } else {
+ await tx.peerPushDebit.indexes.byStatus.iter().forEachAsync(f);
+ }
+}
+
+async function iterRecordsForPeerPushCredit(
+ tx: GetReadOnlyAccess<{
+ peerPushCredit: typeof WalletStoresV1.peerPushCredit;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: PeerPushPaymentIncomingRecord) => Promise<void>,
+): Promise<void> {
+ if (filter.onlyState === "nonfinal") {
+ const keyRange = GlobalIDB.KeyRange.bound(
+ OPERATION_STATUS_ACTIVE_FIRST,
+ OPERATION_STATUS_ACTIVE_LAST,
+ );
+ await tx.peerPushCredit.indexes.byStatus.iter(keyRange).forEachAsync(f);
+ } else {
+ await tx.peerPushCredit.indexes.byStatus.iter().forEachAsync(f);
+ }
}
diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts
index 86f05478a..4a50e0775 100644
--- a/packages/taler-wallet-core/src/operations/withdraw.ts
+++ b/packages/taler-wallet-core/src/operations/withdraw.ts
@@ -1,6 +1,6 @@
/*
This file is part of GNU Taler
- (C) 2019-2021 Taler Systems SA
+ (C) 2019-2024 Taler Systems SA
GNU Taler is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
@@ -52,6 +52,7 @@ import {
TalerPreciseTimestamp,
TalerProtocolTimestamp,
TransactionAction,
+ TransactionIdStr,
TransactionMajorState,
TransactionMinorState,
TransactionState,
@@ -67,7 +68,6 @@ import {
codecForCashinConversionResponse,
codecForConversionBankConfig,
codecForExchangeWithdrawBatchResponse,
- codecForIntegrationBankConfig,
codecForReserveStatus,
codecForWalletKycUuid,
codecForWithdrawOperationStatusResponse,
@@ -103,7 +103,6 @@ import {
import { isWithdrawableDenom, timestampPreciseToDb } from "../index.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import {
- TaskIdentifiers,
TaskRunResult,
TaskRunResultType,
TombstoneTag,
@@ -111,9 +110,8 @@ import {
constructTaskIdentifier,
makeCoinAvailable,
makeCoinsVisible,
- runLongpollAsync,
} from "../operations/common.js";
-import { PendingTaskType } from "../pending-types.js";
+import { PendingTaskType, TaskId } from "../pending-types.js";
import { assertUnreachable } from "../util/assertUnreachable.js";
import {
selectForcedWithdrawalDenominations,
@@ -141,7 +139,6 @@ import {
TransitionInfo,
constructTransactionIdentifier,
notifyTransition,
- stopLongpolling,
} from "./transactions.js";
/**
@@ -150,8 +147,8 @@ import {
const logger = new Logger("operations/withdraw.ts");
export class WithdrawTransactionContext implements TransactionContext {
- public transactionId: string;
- public retryTag: string;
+ readonly transactionId: TransactionIdStr;
+ readonly taskId: TaskId;
constructor(
public ws: InternalWalletState,
@@ -161,7 +158,7 @@ export class WithdrawTransactionContext implements TransactionContext {
tag: TransactionType.Withdrawal,
withdrawalGroupId,
});
- this.retryTag = constructTaskIdentifier({
+ this.taskId = constructTaskIdentifier({
tag: PendingTaskType.Withdraw,
withdrawalGroupId,
});
@@ -185,8 +182,7 @@ export class WithdrawTransactionContext implements TransactionContext {
}
async suspendTransaction(): Promise<void> {
- const { ws, withdrawalGroupId, transactionId, retryTag } = this;
- stopLongpolling(ws, retryTag);
+ const { ws, withdrawalGroupId, transactionId, taskId } = this;
const transitionInfo = await ws.db
.mktx((x) => [x.withdrawalGroups])
.runReadWrite(async (tx) => {
@@ -235,13 +231,12 @@ export class WithdrawTransactionContext implements TransactionContext {
}
return undefined;
});
-
+ ws.taskScheduler.stopShepherdTask(taskId);
notifyTransition(ws, transactionId, transitionInfo);
}
async abortTransaction(): Promise<void> {
- const { ws, withdrawalGroupId, transactionId } = this;
- stopLongpolling(ws, this.retryTag);
+ const { ws, withdrawalGroupId, transactionId, taskId } = this;
const transitionInfo = await ws.db
.mktx((x) => [x.withdrawalGroups])
.runReadWrite(async (tx) => {
@@ -297,12 +292,13 @@ export class WithdrawTransactionContext implements TransactionContext {
}
return undefined;
});
- ws.workAvailable.trigger();
+ ws.taskScheduler.stopShepherdTask(taskId);
notifyTransition(ws, transactionId, transitionInfo);
+ ws.taskScheduler.startShepherdTask(taskId);
}
async resumeTransaction(): Promise<void> {
- const { ws, withdrawalGroupId, transactionId } = this;
+ const { ws, withdrawalGroupId, transactionId, taskId: retryTag } = this;
const transitionInfo = await ws.db
.mktx((x) => [x.withdrawalGroups])
.runReadWrite(async (tx) => {
@@ -351,13 +347,12 @@ export class WithdrawTransactionContext implements TransactionContext {
}
return undefined;
});
- ws.workAvailable.trigger();
notifyTransition(ws, transactionId, transitionInfo);
+ ws.taskScheduler.startShepherdTask(retryTag);
}
async failTransaction(): Promise<void> {
- const { ws, withdrawalGroupId, transactionId, retryTag } = this;
- stopLongpolling(ws, retryTag);
+ const { ws, withdrawalGroupId, transactionId, taskId: retryTag } = this;
const stateUpdate = await ws.db
.mktx((x) => [x.withdrawalGroups])
.runReadWrite(async (tx) => {
@@ -387,7 +382,9 @@ export class WithdrawTransactionContext implements TransactionContext {
}
return undefined;
});
+ ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, stateUpdate);
+ ws.taskScheduler.startShepherdTask(retryTag);
}
}
@@ -744,10 +741,8 @@ async function transitionKycUrlUpdate(
kycUrl: string,
): Promise<void> {
let notificationKycUrl: string | undefined = undefined;
- const transactionId = constructTransactionIdentifier({
- tag: TransactionType.Withdrawal,
- withdrawalGroupId,
- });
+ const ctx = new WithdrawTransactionContext(ws, withdrawalGroupId);
+ const transactionId = ctx.transactionId;
const transitionInfo = await ws.db
.mktx((x) => [x.planchets, x.withdrawalGroups])
@@ -782,7 +777,7 @@ async function transitionKycUrlUpdate(
experimentalUserData: notificationKycUrl,
});
}
- ws.workAvailable.trigger();
+ ws.taskScheduler.startShepherdTask(ctx.taskId);
}
async function handleKycRequired(
@@ -1273,7 +1268,7 @@ async function queryReserve(
ws: InternalWalletState,
withdrawalGroupId: string,
cancellationToken: CancellationToken,
-): Promise<{ ready: boolean }> {
+): Promise<TaskRunResult> {
const transactionId = constructTransactionIdentifier({
tag: TransactionType.Withdrawal,
withdrawalGroupId,
@@ -1283,7 +1278,7 @@ async function queryReserve(
});
checkDbInvariant(!!withdrawalGroup);
if (withdrawalGroup.status !== WithdrawalGroupStatus.PendingQueryingStatus) {
- return { ready: true };
+ return TaskRunResult.backoff();
}
const reservePub = withdrawalGroup.reservePub;
@@ -1312,7 +1307,7 @@ async function queryReserve(
`got reserve status error, EC=${result.talerErrorResponse.code}`,
);
if (resp.status === HttpStatusCode.NotFound) {
- return { ready: false };
+ return TaskRunResult.backoff();
} else {
throwUnexpectedRequestError(resp, result.talerErrorResponse);
}
@@ -1341,13 +1336,7 @@ async function queryReserve(
notifyTransition(ws, transactionId, transitionResult);
- return { ready: true };
-}
-
-enum BankStatusResultCode {
- Done = "done",
- Waiting = "waiting",
- Aborted = "aborted",
+ return TaskRunResult.backoff();
}
/**
@@ -1452,6 +1441,7 @@ async function transitionKycSatisfied(
async function processWithdrawalGroupPendingKyc(
ws: InternalWalletState,
withdrawalGroup: WithdrawalGroupRecord,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const userType = "individual";
const kycInfo = withdrawalGroup.kycPending;
@@ -1467,45 +1457,35 @@ async function processWithdrawalGroupPendingKyc(
const withdrawalGroupId = withdrawalGroup.withdrawalGroupId;
- const retryTag = TaskIdentifiers.forWithdrawal(withdrawalGroup);
- runLongpollAsync(ws, retryTag, async (cancellationToken) => {
- logger.info(`long-polling for withdrawal KYC status via ${url.href}`);
- const kycStatusRes = await ws.http.fetch(url.href, {
- method: "GET",
- cancellationToken,
- });
- logger.info(
- `kyc long-polling response status: HTTP ${kycStatusRes.status}`,
- );
- if (
- kycStatusRes.status === HttpStatusCode.Ok ||
- //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
- // remove after the exchange is fixed or clarified
- kycStatusRes.status === HttpStatusCode.NoContent
- ) {
- await transitionKycSatisfied(ws, withdrawalGroup);
- return { ready: true };
- } else if (kycStatusRes.status === HttpStatusCode.Accepted) {
- const kycStatus = await kycStatusRes.json();
- logger.info(`kyc status: ${j2s(kycStatus)}`);
- const kycUrl = kycStatus.kyc_url;
- if (typeof kycUrl === "string") {
- await transitionKycUrlUpdate(ws, withdrawalGroupId, kycUrl);
- }
- return { ready: false };
- } else if (
- kycStatusRes.status === HttpStatusCode.UnavailableForLegalReasons
- ) {
- const kycStatus = await kycStatusRes.json();
- logger.info(`aml status: ${j2s(kycStatus)}`);
- return { ready: false };
- } else {
- throw Error(
- `unexpected response from kyc-check (${kycStatusRes.status})`,
- );
- }
+ logger.info(`long-polling for withdrawal KYC status via ${url.href}`);
+ const kycStatusRes = await ws.http.fetch(url.href, {
+ method: "GET",
+ cancellationToken,
});
- return TaskRunResult.longpoll();
+ logger.info(`kyc long-polling response status: HTTP ${kycStatusRes.status}`);
+ if (
+ kycStatusRes.status === HttpStatusCode.Ok ||
+ //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
+ // remove after the exchange is fixed or clarified
+ kycStatusRes.status === HttpStatusCode.NoContent
+ ) {
+ await transitionKycSatisfied(ws, withdrawalGroup);
+ } else if (kycStatusRes.status === HttpStatusCode.Accepted) {
+ const kycStatus = await kycStatusRes.json();
+ logger.info(`kyc status: ${j2s(kycStatus)}`);
+ const kycUrl = kycStatus.kyc_url;
+ if (typeof kycUrl === "string") {
+ await transitionKycUrlUpdate(ws, withdrawalGroupId, kycUrl);
+ }
+ } else if (
+ kycStatusRes.status === HttpStatusCode.UnavailableForLegalReasons
+ ) {
+ const kycStatus = await kycStatusRes.json();
+ logger.info(`aml status: ${j2s(kycStatus)}`);
+ } else {
+ throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`);
+ }
+ return TaskRunResult.backoff();
}
async function processWithdrawalGroupPendingReady(
@@ -1666,12 +1646,13 @@ async function processWithdrawalGroupPendingReady(
};
}
- return TaskRunResult.finished();
+ return TaskRunResult.backoff();
}
export async function processWithdrawalGroup(
ws: InternalWalletState,
withdrawalGroupId: string,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
logger.trace("processing withdrawal group", withdrawalGroupId);
const withdrawalGroup = await ws.db
@@ -1684,54 +1665,30 @@ export async function processWithdrawalGroup(
throw Error(`withdrawal group ${withdrawalGroupId} not found`);
}
- const retryTag = TaskIdentifiers.forWithdrawal(withdrawalGroup);
-
- // We're already running!
- if (ws.activeLongpoll[retryTag]) {
- logger.info("withdrawal group already in long-polling, returning!");
- return {
- type: TaskRunResultType.Longpoll,
- };
- }
-
switch (withdrawalGroup.status) {
case WithdrawalGroupStatus.PendingRegisteringBank:
await processReserveBankStatus(ws, withdrawalGroupId);
// FIXME: This will get called by the main task loop, why call it here?!
- return await processWithdrawalGroup(ws, withdrawalGroupId);
- case WithdrawalGroupStatus.PendingQueryingStatus: {
- runLongpollAsync(ws, retryTag, (ct) => {
- return queryReserve(ws, withdrawalGroupId, ct);
- });
- logger.trace(
- "returning early from withdrawal for long-polling in background",
+ return await processWithdrawalGroup(
+ ws,
+ withdrawalGroupId,
+ cancellationToken,
);
- return {
- type: TaskRunResultType.Longpoll,
- };
+ case WithdrawalGroupStatus.PendingQueryingStatus: {
+ return queryReserve(ws, withdrawalGroupId, cancellationToken);
}
case WithdrawalGroupStatus.PendingWaitConfirmBank: {
- const res = await processReserveBankStatus(ws, withdrawalGroupId);
- switch (res.status) {
- case BankStatusResultCode.Aborted:
- case BankStatusResultCode.Done:
- return TaskRunResult.finished();
- case BankStatusResultCode.Waiting: {
- return TaskRunResult.pending();
- }
- }
- break;
- }
- case WithdrawalGroupStatus.Done:
- case WithdrawalGroupStatus.FailedBankAborted: {
- // FIXME
- return TaskRunResult.pending();
+ return await processReserveBankStatus(ws, withdrawalGroupId);
}
case WithdrawalGroupStatus.PendingAml:
// FIXME: Handle this case, withdrawal doesn't support AML yet.
- return TaskRunResult.pending();
+ return TaskRunResult.backoff();
case WithdrawalGroupStatus.PendingKyc:
- return processWithdrawalGroupPendingKyc(ws, withdrawalGroup);
+ return processWithdrawalGroupPendingKyc(
+ ws,
+ withdrawalGroup,
+ cancellationToken,
+ );
case WithdrawalGroupStatus.PendingReady:
// Continue with the actual withdrawal!
return await processWithdrawalGroupPendingReady(ws, withdrawalGroup);
@@ -1747,6 +1704,8 @@ export async function processWithdrawalGroup(
case WithdrawalGroupStatus.SuspendedReady:
case WithdrawalGroupStatus.SuspendedRegisteringBank:
case WithdrawalGroupStatus.SuspendedWaitConfirmBank:
+ case WithdrawalGroupStatus.Done:
+ case WithdrawalGroupStatus.FailedBankAborted:
// Nothing to do.
return TaskRunResult.finished();
default:
@@ -2168,14 +2127,10 @@ async function registerReserveWithBank(
notifyTransition(ws, transactionId, transitionInfo);
}
-interface BankStatusResult {
- status: BankStatusResultCode;
-}
-
async function processReserveBankStatus(
ws: InternalWalletState,
withdrawalGroupId: string,
-): Promise<BankStatusResult> {
+): Promise<TaskRunResult> {
const withdrawalGroup = await getWithdrawalGroupRecordTx(ws.db, {
withdrawalGroupId,
});
@@ -2188,9 +2143,7 @@ async function processReserveBankStatus(
case WithdrawalGroupStatus.PendingRegisteringBank:
break;
default:
- return {
- status: BankStatusResultCode.Done,
- };
+ return TaskRunResult.backoff();
}
if (
@@ -2200,9 +2153,7 @@ async function processReserveBankStatus(
}
const bankInfo = withdrawalGroup.wgInfo.bankInfo;
if (!bankInfo) {
- return {
- status: BankStatusResultCode.Done,
- };
+ return TaskRunResult.backoff();
}
const bankStatusUrl = getBankStatusUrl(bankInfo.talerWithdrawUri);
@@ -2246,9 +2197,7 @@ async function processReserveBankStatus(
};
});
notifyTransition(ws, transactionId, transitionInfo);
- return {
- status: BankStatusResultCode.Aborted,
- };
+ return TaskRunResult.finished();
}
// Bank still needs to know our reserve info
@@ -2302,15 +2251,7 @@ async function processReserveBankStatus(
notifyTransition(ws, transactionId, transitionInfo);
- if (status.transfer_done) {
- return {
- status: BankStatusResultCode.Done,
- };
- } else {
- return {
- status: BankStatusResultCode.Waiting,
- };
- }
+ return TaskRunResult.backoff();
}
export interface PrepareCreateWithdrawalGroupResult {
@@ -2492,6 +2433,13 @@ export async function internalPerformCreateWithdrawalGroup(
prep.withdrawalGroup.exchangeBaseUrl,
);
+ const ctx = new WithdrawTransactionContext(
+ ws,
+ withdrawalGroup.withdrawalGroupId,
+ );
+
+ ws.taskScheduler.startShepherdTask(ctx.taskId);
+
return {
withdrawalGroup,
transitionInfo,
@@ -2619,10 +2567,10 @@ export async function acceptWithdrawalFromUri(
});
const withdrawalGroupId = withdrawalGroup.withdrawalGroupId;
- const transactionId = constructTransactionIdentifier({
- tag: TransactionType.Withdrawal,
- withdrawalGroupId,
- });
+
+ const ctx = new WithdrawTransactionContext(ws, withdrawalGroupId);
+
+ const transactionId = ctx.transactionId;
// We do this here, as the reserve should be registered before we return,
// so that we can redirect the user to the bank's status page.
@@ -2639,7 +2587,7 @@ export async function acceptWithdrawalFromUri(
);
}
- ws.workAvailable.trigger();
+ ws.taskScheduler.startShepherdTask(ctx.taskId);
return {
reservePub: withdrawalGroup.reservePub,
@@ -2795,10 +2743,9 @@ export async function createManualWithdrawal(
});
const withdrawalGroupId = withdrawalGroup.withdrawalGroupId;
- const transactionId = constructTransactionIdentifier({
- tag: TransactionType.Withdrawal,
- withdrawalGroupId,
- });
+ const ctx = new WithdrawTransactionContext(ws, withdrawalGroupId);
+
+ const transactionId = ctx.transactionId;
const exchangePaytoUris = await ws.db
.mktx((x) => [x.withdrawalGroups, x.exchanges, x.exchangeDetails])
@@ -2806,7 +2753,7 @@ export async function createManualWithdrawal(
return await getFundingPaytoUris(tx, withdrawalGroup.withdrawalGroupId);
});
- ws.workAvailable.trigger();
+ ws.taskScheduler.startShepherdTask(ctx.taskId);
return {
reservePub: withdrawalGroup.reservePub,
diff --git a/packages/taler-wallet-core/src/pending-types.ts b/packages/taler-wallet-core/src/pending-types.ts
index f8406033a..4dfad9389 100644
--- a/packages/taler-wallet-core/src/pending-types.ts
+++ b/packages/taler-wallet-core/src/pending-types.ts
@@ -29,7 +29,6 @@ import { DbRetryInfo } from "./operations/common.js";
export enum PendingTaskType {
ExchangeUpdate = "exchange-update",
- ExchangeCheckRefresh = "exchange-check-refresh",
Purchase = "purchase",
Refresh = "refresh",
Recoup = "recoup",
@@ -49,7 +48,6 @@ export enum PendingTaskType {
export type PendingTaskInfo = PendingTaskInfoCommon &
(
| PendingExchangeUpdateTask
- | PendingExchangeCheckRefreshTask
| PendingPurchaseTask
| PendingRefreshTask
| PendingTipPickupTask
@@ -109,14 +107,6 @@ export interface PendingPeerPushCreditTask {
peerPushCreditId: string;
}
-/**
- * The wallet should check whether coins from this exchange
- * need to be auto-refreshed.
- */
-export interface PendingExchangeCheckRefreshTask {
- type: PendingTaskType.ExchangeCheckRefresh;
- exchangeBaseUrl: string;
-}
export enum ReserveType {
/**
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts
new file mode 100644
index 000000000..d1648acc7
--- /dev/null
+++ b/packages/taler-wallet-core/src/shepherd.ts
@@ -0,0 +1,851 @@
+/*
+ This file is part of GNU Taler
+ (C) 2024 Taler Systems SA
+
+ GNU Taler is free software; you can redistribute it and/or modify it under the
+ terms of the GNU General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along with
+ GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+ */
+
+/**
+ * Imports.
+ */
+import { GlobalIDB } from "@gnu-taler/idb-bridge";
+import {
+ AbsoluteTime,
+ CancellationToken,
+ Duration,
+ Logger,
+ NotificationType,
+ RetryLoopOpts,
+ TalerError,
+ TalerErrorCode,
+ TalerErrorDetail,
+ TaskThrottler,
+ TransactionIdStr,
+ TransactionType,
+ WalletNotification,
+ assertUnreachable,
+ j2s,
+ makeErrorDetail,
+} from "@gnu-taler/taler-util";
+import { CryptoApiStoppedError } from "./crypto/workers/crypto-dispatcher.js";
+import {
+ GetReadOnlyAccess,
+ OPERATION_STATUS_ACTIVE_FIRST,
+ OPERATION_STATUS_ACTIVE_LAST,
+ WalletStoresV1,
+ timestampAbsoluteFromDb,
+} from "./index.js";
+import { InternalWalletState } from "./internal-wallet-state.js";
+import { processBackupForProvider } from "./operations/backup/index.js";
+import {
+ DbRetryInfo,
+ TaskRunResult,
+ TaskRunResultType,
+ constructTaskIdentifier,
+ getExchangeState,
+ parseTaskIdentifier,
+} from "./operations/common.js";
+import { processDepositGroup } from "./operations/deposits.js";
+import { updateExchangeFromUrlHandler } from "./operations/exchanges.js";
+import { processPurchase } from "./operations/pay-merchant.js";
+import { processPeerPullCredit } from "./operations/pay-peer-pull-credit.js";
+import { processPeerPullDebit } from "./operations/pay-peer-pull-debit.js";
+import { processPeerPushCredit } from "./operations/pay-peer-push-credit.js";
+import { processPeerPushDebit } from "./operations/pay-peer-push-debit.js";
+import { processRecoupGroup } from "./operations/recoup.js";
+import { processRefreshGroup } from "./operations/refresh.js";
+import { constructTransactionIdentifier } from "./operations/transactions.js";
+import { processWithdrawalGroup } from "./operations/withdraw.js";
+import { PendingTaskType, TaskId } from "./pending-types.js";
+import { AsyncCondition } from "./util/promiseUtils.js";
+
+const logger = new Logger("shepherd.ts");
+
+/**
+ * Info about one task being shepherded.
+ */
+interface ShepherdInfo {
+ cts: CancellationToken.Source;
+}
+
+export class TaskScheduler {
+ private sheps: Map<TaskId, ShepherdInfo> = new Map();
+
+ private iterCond = new AsyncCondition();
+
+ private throttler = new TaskThrottler();
+
+ constructor(private ws: InternalWalletState) {}
+
+ async loadTasksFromDb(): Promise<void> {
+ const activeTasks = await getActiveTaskIds(this.ws);
+
+ for (const tid of activeTasks.taskIds) {
+ this.startShepherdTask(tid);
+ }
+ }
+
+ async run(opts: RetryLoopOpts = {}): Promise<void> {
+ logger.info("Running task loop.");
+ this.ws.isTaskLoopRunning = true;
+ await this.loadTasksFromDb();
+ while (true) {
+ if (opts.stopWhenDone && this.sheps.size === 0) {
+ logger.info("Breaking out of task loop (no more work).");
+ break;
+ }
+ if (this.ws.stopped) {
+ logger.info("Breaking out of task loop (wallet stopped).");
+ break;
+ }
+ await this.iterCond.wait();
+ }
+ this.ws.isTaskLoopRunning = false;
+ logger.info("Done with task loop.");
+ }
+
+ startShepherdTask(taskId: TaskId): void {
+ // Run in the background, no await!
+ this.internalStartShepherdTask(taskId);
+ }
+
+ /**
+ * Stop and re-load all existing tasks.
+ *
+ * Mostly useful to interrupt all waits when time-travelling.
+ */
+ reload() {
+ const tasksIds = [...this.sheps.keys()];
+ logger.info(`reloading sheperd with ${tasksIds.length} tasks`);
+ for (const taskId of tasksIds) {
+ this.stopShepherdTask(taskId);
+ }
+ for (const taskId of tasksIds) {
+ this.startShepherdTask(taskId);
+ }
+ }
+
+ private async internalStartShepherdTask(taskId: TaskId): Promise<void> {
+ logger.trace(`Starting to shepherd task ${taskId}`);
+ const oldShep = this.sheps.get(taskId);
+ if (oldShep) {
+ logger.trace(`Already have a shepherd for ${taskId}`);
+ return;
+ }
+ logger.trace(`Creating new shepherd for ${taskId}`);
+ const newShep: ShepherdInfo = {
+ cts: CancellationToken.create(),
+ };
+ this.sheps.set(taskId, newShep);
+ try {
+ await this.internalShepherdTask(taskId, newShep);
+ } finally {
+ logger.trace(`Done shepherding ${taskId}`);
+ this.sheps.delete(taskId);
+ this.iterCond.trigger();
+ }
+ }
+
+ stopShepherdTask(taskId: TaskId): void {
+ logger.trace(`Stopping shepherding of ${taskId}`);
+ const oldShep = this.sheps.get(taskId);
+ if (oldShep) {
+ logger.trace(`Cancelling old shepherd for ${taskId}`);
+ oldShep.cts.cancel();
+ this.sheps.delete(taskId);
+ this.iterCond.trigger();
+ }
+ }
+
+ restartShepherdTask(taskId: TaskId): void {
+ this.stopShepherdTask(taskId);
+ this.startShepherdTask(taskId);
+ }
+
+ async resetTaskRetries(taskId: TaskId): Promise<void> {
+ const maybeNotification = await this.ws.db
+ .mktxAll()
+ .runReadWrite(async (tx) => {
+ await tx.operationRetries.delete(taskId);
+ return taskToRetryNotification(this.ws, tx, taskId, undefined);
+ });
+ this.stopShepherdTask(taskId);
+ if (maybeNotification) {
+ this.ws.notify(maybeNotification);
+ }
+ this.startShepherdTask(taskId);
+ }
+
+ private async internalShepherdTask(
+ taskId: TaskId,
+ info: ShepherdInfo,
+ ): Promise<void> {
+ while (true) {
+ if (this.ws.stopped) {
+ logger.trace(`Shepherd for ${taskId} stopping as wallet is stopped`);
+ return;
+ }
+ if (info.cts.token.isCancelled) {
+ logger.trace(`Shepherd for ${taskId} got cancelled`);
+ return;
+ }
+ const isThrottled = this.throttler.applyThrottle(taskId);
+ if (isThrottled) {
+ logger.warn(
+ `task ${taskId} throttled, this is very likely a bug in wallet-core, please report`,
+ );
+ logger.warn("waiting for 60 seconds");
+ await this.ws.timerGroup.resolveAfter(
+ Duration.fromSpec({ seconds: 60 }),
+ );
+ }
+ logger.trace(`Shepherd for ${taskId} will call handler`);
+ // FIXME: This should already return the retry record.
+ const res = await runTaskWithErrorReporting(this.ws, taskId, async () => {
+ return await callOperationHandlerForTaskId(
+ this.ws,
+ taskId,
+ info.cts.token,
+ );
+ });
+ const retryRecord = await this.ws.db.runReadOnlyTx(
+ ["operationRetries"],
+ async (tx) => {
+ return tx.operationRetries.get(taskId);
+ },
+ );
+ switch (res.type) {
+ case TaskRunResultType.Error: {
+ logger.trace(`Shepherd for ${taskId} got error result.`);
+ if (retryRecord) {
+ let delay: Duration;
+ const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
+ delay = AbsoluteTime.remaining(t);
+ logger.trace(`Waiting for ${delay.d_ms} ms`);
+ try {
+ await info.cts.token.racePromise(
+ this.ws.timerGroup.resolveAfter(delay),
+ );
+ } catch (e) {
+ logger.info(`waiting for ${taskId} interrupted`);
+ }
+ } else {
+ logger.trace("Retrying immediately.");
+ }
+ break;
+ }
+ case TaskRunResultType.Backoff: {
+ logger.trace(`Shepherd for ${taskId} got backoff result.`);
+ if (retryRecord) {
+ let delay: Duration;
+ const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
+ delay = AbsoluteTime.remaining(t);
+ logger.trace(`Waiting for ${delay.d_ms} ms`);
+ try {
+ await info.cts.token.racePromise(
+ this.ws.timerGroup.resolveAfter(delay),
+ );
+ } catch (e) {
+ logger.info(`waiting for ${taskId} interrupted`);
+ }
+ } else {
+ logger.trace("Retrying immediately.");
+ }
+ break;
+ }
+ case TaskRunResultType.Progress: {
+ logger.trace(
+ `Shepherd for ${taskId} got progress result, re-running immediately.`,
+ );
+ break;
+ }
+ case TaskRunResultType.ScheduleLater:
+ logger.trace(`Shepherd for ${taskId} got schedule-later result.`);
+ const delay = AbsoluteTime.remaining(res.runAt);
+ logger.trace(`Waiting for ${delay.d_ms} ms`);
+ try {
+ await info.cts.token.racePromise(
+ this.ws.timerGroup.resolveAfter(delay),
+ );
+ } catch (e) {
+ logger.info(`waiting for ${taskId} interrupted`);
+ }
+ break;
+ case TaskRunResultType.Finished:
+ logger.trace(`Shepherd for ${taskId} got finished result.`);
+ return;
+ default:
+ assertUnreachable(res);
+ }
+ }
+ }
+}
+
+async function storePendingTaskError(
+ ws: InternalWalletState,
+ pendingTaskId: string,
+ e: TalerErrorDetail,
+): Promise<void> {
+ logger.info(`storing pending task error for ${pendingTaskId}`);
+ const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => {
+ let retryRecord = await tx.operationRetries.get(pendingTaskId);
+ if (!retryRecord) {
+ retryRecord = {
+ id: pendingTaskId,
+ lastError: e,
+ retryInfo: DbRetryInfo.reset(),
+ };
+ } else {
+ retryRecord.lastError = e;
+ retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
+ }
+ await tx.operationRetries.put(retryRecord);
+ return taskToRetryNotification(ws, tx, pendingTaskId, e);
+ });
+ if (maybeNotification) {
+ ws.notify(maybeNotification);
+ }
+}
+
+/**
+ * Task made progress, clear error.
+ */
+async function storeTaskProgress(
+ ws: InternalWalletState,
+ pendingTaskId: string,
+): Promise<void> {
+ await ws.db.mktxAll().runReadWrite(async (tx) => {
+ await tx.operationRetries.delete(pendingTaskId);
+ });
+}
+
+async function storePendingTaskPending(
+ ws: InternalWalletState,
+ pendingTaskId: string,
+): Promise<void> {
+ const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => {
+ let retryRecord = await tx.operationRetries.get(pendingTaskId);
+ let hadError = false;
+ if (!retryRecord) {
+ retryRecord = {
+ id: pendingTaskId,
+ retryInfo: DbRetryInfo.reset(),
+ };
+ } else {
+ if (retryRecord.lastError) {
+ hadError = true;
+ }
+ delete retryRecord.lastError;
+ retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
+ }
+ await tx.operationRetries.put(retryRecord);
+ if (hadError) {
+ return taskToRetryNotification(ws, tx, pendingTaskId, undefined);
+ } else {
+ return undefined;
+ }
+ });
+ if (maybeNotification) {
+ ws.notify(maybeNotification);
+ }
+}
+
+async function storePendingTaskFinished(
+ ws: InternalWalletState,
+ pendingTaskId: string,
+): Promise<void> {
+ await ws.db
+ .mktx((x) => [x.operationRetries])
+ .runReadWrite(async (tx) => {
+ await tx.operationRetries.delete(pendingTaskId);
+ });
+}
+
+async function runTaskWithErrorReporting(
+ ws: InternalWalletState,
+ opId: TaskId,
+ f: () => Promise<TaskRunResult>,
+): Promise<TaskRunResult> {
+ let maybeError: TalerErrorDetail | undefined;
+ try {
+ const resp = await f();
+ switch (resp.type) {
+ case TaskRunResultType.Error:
+ await storePendingTaskError(ws, opId, resp.errorDetail);
+ return resp;
+ case TaskRunResultType.Finished:
+ await storePendingTaskFinished(ws, opId);
+ return resp;
+ case TaskRunResultType.Backoff:
+ await storePendingTaskPending(ws, opId);
+ return resp;
+ case TaskRunResultType.ScheduleLater:
+ // Task succeeded but wants to be run again.
+ await storeTaskProgress(ws, opId);
+ return resp;
+ case TaskRunResultType.Progress:
+ await storeTaskProgress(ws, opId);
+ return resp;
+ }
+ } catch (e) {
+ if (e instanceof CryptoApiStoppedError) {
+ if (ws.stopped) {
+ logger.warn("crypto API stopped during shutdown, ignoring error");
+ return {
+ type: TaskRunResultType.Error,
+ errorDetail: makeErrorDetail(
+ TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
+ {},
+ "Crypto API stopped during shutdown",
+ ),
+ };
+ }
+ }
+ if (e instanceof TalerError) {
+ logger.warn("operation processed resulted in error");
+ logger.warn(`error was: ${j2s(e.errorDetail)}`);
+ maybeError = e.errorDetail;
+ await storePendingTaskError(ws, opId, maybeError!);
+ return {
+ type: TaskRunResultType.Error,
+ errorDetail: e.errorDetail,
+ };
+ } else if (e instanceof Error) {
+ // This is a bug, as we expect pending operations to always
+ // do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED
+ // or return something.
+ logger.error(`Uncaught exception: ${e.message}`);
+ logger.error(`Stack: ${e.stack}`);
+ maybeError = makeErrorDetail(
+ TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
+ {
+ stack: e.stack,
+ },
+ `unexpected exception (message: ${e.message})`,
+ );
+ await storePendingTaskError(ws, opId, maybeError);
+ return {
+ type: TaskRunResultType.Error,
+ errorDetail: maybeError,
+ };
+ } else {
+ logger.error("Uncaught exception, value is not even an error.");
+ maybeError = makeErrorDetail(
+ TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
+ {},
+ `unexpected exception (not even an error)`,
+ );
+ await storePendingTaskError(ws, opId, maybeError);
+ return {
+ type: TaskRunResultType.Error,
+ errorDetail: maybeError,
+ };
+ }
+ }
+}
+
+async function callOperationHandlerForTaskId(
+ ws: InternalWalletState,
+ taskId: TaskId,
+ cancellationToken: CancellationToken,
+): Promise<TaskRunResult> {
+ const pending = parseTaskIdentifier(taskId);
+ switch (pending.tag) {
+ case PendingTaskType.ExchangeUpdate:
+ return await updateExchangeFromUrlHandler(
+ ws,
+ pending.exchangeBaseUrl,
+ cancellationToken,
+ );
+ case PendingTaskType.Refresh:
+ return await processRefreshGroup(
+ ws,
+ pending.refreshGroupId,
+ cancellationToken,
+ );
+ case PendingTaskType.Withdraw:
+ return await processWithdrawalGroup(
+ ws,
+ pending.withdrawalGroupId,
+ cancellationToken,
+ );
+ case PendingTaskType.Purchase:
+ return await processPurchase(ws, pending.proposalId);
+ case PendingTaskType.Recoup:
+ return await processRecoupGroup(ws, pending.recoupGroupId);
+ case PendingTaskType.Deposit:
+ return await processDepositGroup(
+ ws,
+ pending.depositGroupId,
+ cancellationToken,
+ );
+ case PendingTaskType.Backup:
+ return await processBackupForProvider(ws, pending.backupProviderBaseUrl);
+ case PendingTaskType.PeerPushDebit:
+ return await processPeerPushDebit(
+ ws,
+ pending.pursePub,
+ cancellationToken,
+ );
+ case PendingTaskType.PeerPullCredit:
+ return await processPeerPullCredit(
+ ws,
+ pending.pursePub,
+ cancellationToken,
+ );
+ case PendingTaskType.PeerPullDebit:
+ return await processPeerPullDebit(ws, pending.peerPullDebitId);
+ case PendingTaskType.PeerPushCredit:
+ return await processPeerPushCredit(
+ ws,
+ pending.peerPushCreditId,
+ cancellationToken,
+ );
+ case PendingTaskType.RewardPickup:
+ throw Error("not supported anymore");
+ default:
+ return assertUnreachable(pending);
+ }
+ throw Error(`not reached ${pending.tag}`);
+}
+
+/**
+ * Generate an appropriate error transition notification
+ * for applicable tasks.
+ *
+ * Namely, transition notifications are generated for:
+ * - exchange update errors
+ * - transactions
+ */
+async function taskToRetryNotification(
+ ws: InternalWalletState,
+ tx: GetReadOnlyAccess<typeof WalletStoresV1>,
+ pendingTaskId: string,
+ e: TalerErrorDetail | undefined,
+): Promise<WalletNotification | undefined> {
+ const parsedTaskId = parseTaskIdentifier(pendingTaskId);
+
+ switch (parsedTaskId.tag) {
+ case PendingTaskType.ExchangeUpdate:
+ return makeExchangeRetryNotification(ws, tx, pendingTaskId, e);
+ case PendingTaskType.PeerPullCredit:
+ case PendingTaskType.PeerPullDebit:
+ case PendingTaskType.Withdraw:
+ case PendingTaskType.PeerPushCredit:
+ case PendingTaskType.Deposit:
+ case PendingTaskType.Refresh:
+ case PendingTaskType.RewardPickup:
+ case PendingTaskType.PeerPushDebit:
+ case PendingTaskType.Purchase:
+ return makeTransactionRetryNotification(ws, tx, pendingTaskId, e);
+ case PendingTaskType.Backup:
+ case PendingTaskType.Recoup:
+ return undefined;
+ }
+}
+
+async function makeTransactionRetryNotification(
+ ws: InternalWalletState,
+ tx: GetReadOnlyAccess<typeof WalletStoresV1>,
+ pendingTaskId: string,
+ e: TalerErrorDetail | undefined,
+): Promise<WalletNotification | undefined> {
+ const txId = convertTaskToTransactionId(pendingTaskId);
+ if (!txId) {
+ return undefined;
+ }
+ const txState = await ws.getTransactionState(ws, tx, txId);
+ if (!txState) {
+ return undefined;
+ }
+ const notif: WalletNotification = {
+ type: NotificationType.TransactionStateTransition,
+ transactionId: txId,
+ oldTxState: txState,
+ newTxState: txState,
+ };
+ if (e) {
+ notif.errorInfo = {
+ code: e.code as number,
+ hint: e.hint,
+ };
+ }
+ return notif;
+}
+
+async function makeExchangeRetryNotification(
+ ws: InternalWalletState,
+ tx: GetReadOnlyAccess<typeof WalletStoresV1>,
+ pendingTaskId: string,
+ e: TalerErrorDetail | undefined,
+): Promise<WalletNotification | undefined> {
+ logger.info("making exchange retry notification");
+ const parsedTaskId = parseTaskIdentifier(pendingTaskId);
+ if (parsedTaskId.tag !== PendingTaskType.ExchangeUpdate) {
+ throw Error("invalid task identifier");
+ }
+ const rec = await tx.exchanges.get(parsedTaskId.exchangeBaseUrl);
+
+ if (!rec) {
+ logger.info(`exchange ${parsedTaskId.exchangeBaseUrl} not found`);
+ return undefined;
+ }
+
+ const notif: WalletNotification = {
+ type: NotificationType.ExchangeStateTransition,
+ exchangeBaseUrl: parsedTaskId.exchangeBaseUrl,
+ oldExchangeState: getExchangeState(rec),
+ newExchangeState: getExchangeState(rec),
+ };
+ if (e) {
+ notif.errorInfo = {
+ code: e.code as number,
+ hint: e.hint,
+ };
+ }
+ return notif;
+}
+
+/**
+ * Convert the task ID for a task that processes a transaction int
+ * the ID for the transaction.
+ */
+function convertTaskToTransactionId(
+ taskId: string,
+): TransactionIdStr | undefined {
+ const parsedTaskId = parseTaskIdentifier(taskId);
+ switch (parsedTaskId.tag) {
+ case PendingTaskType.PeerPullCredit:
+ return constructTransactionIdentifier({
+ tag: TransactionType.PeerPullCredit,
+ pursePub: parsedTaskId.pursePub,
+ });
+ case PendingTaskType.PeerPullDebit:
+ return constructTransactionIdentifier({
+ tag: TransactionType.PeerPullDebit,
+ peerPullDebitId: parsedTaskId.peerPullDebitId,
+ });
+ // FIXME: This doesn't distinguish internal-withdrawal.
+ // Maybe we should have a different task type for that as well?
+ // Or maybe transaction IDs should be valid task identifiers?
+ case PendingTaskType.Withdraw:
+ return constructTransactionIdentifier({
+ tag: TransactionType.Withdrawal,
+ withdrawalGroupId: parsedTaskId.withdrawalGroupId,
+ });
+ case PendingTaskType.PeerPushCredit:
+ return constructTransactionIdentifier({
+ tag: TransactionType.PeerPushCredit,
+ peerPushCreditId: parsedTaskId.peerPushCreditId,
+ });
+ case PendingTaskType.Deposit:
+ return constructTransactionIdentifier({
+ tag: TransactionType.Deposit,
+ depositGroupId: parsedTaskId.depositGroupId,
+ });
+ case PendingTaskType.Refresh:
+ return constructTransactionIdentifier({
+ tag: TransactionType.Refresh,
+ refreshGroupId: parsedTaskId.refreshGroupId,
+ });
+ case PendingTaskType.RewardPickup:
+ return constructTransactionIdentifier({
+ tag: TransactionType.Reward,
+ walletRewardId: parsedTaskId.walletRewardId,
+ });
+ case PendingTaskType.PeerPushDebit:
+ return constructTransactionIdentifier({
+ tag: TransactionType.PeerPushDebit,
+ pursePub: parsedTaskId.pursePub,
+ });
+ case PendingTaskType.Purchase:
+ return constructTransactionIdentifier({
+ tag: TransactionType.Payment,
+ proposalId: parsedTaskId.proposalId,
+ });
+ default:
+ return undefined;
+ }
+}
+
+export interface ActiveTaskIdsResult {
+ taskIds: TaskId[];
+}
+
+export async function getActiveTaskIds(
+ ws: InternalWalletState,
+): Promise<ActiveTaskIdsResult> {
+ const res: ActiveTaskIdsResult = {
+ taskIds: [],
+ };
+ await ws.db
+ .mktx((x) => [
+ x.exchanges,
+ x.refreshGroups,
+ x.withdrawalGroups,
+ x.purchases,
+ x.depositGroups,
+ x.recoupGroups,
+ x.peerPullCredit,
+ x.peerPushDebit,
+ x.peerPullDebit,
+ x.peerPushCredit,
+ ])
+ .runReadWrite(async (tx) => {
+ const active = GlobalIDB.KeyRange.bound(
+ OPERATION_STATUS_ACTIVE_FIRST,
+ OPERATION_STATUS_ACTIVE_LAST,
+ );
+
+ // Withdrawals
+
+ {
+ const activeRecs =
+ await tx.withdrawalGroups.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Withdraw,
+ withdrawalGroupId: rec.withdrawalGroupId,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // Deposits
+
+ {
+ const activeRecs =
+ await tx.depositGroups.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Deposit,
+ depositGroupId: rec.depositGroupId,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // Refreshes
+
+ {
+ const activeRecs =
+ await tx.refreshGroups.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Refresh,
+ refreshGroupId: rec.refreshGroupId,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // Purchases
+
+ {
+ const activeRecs = await tx.purchases.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Purchase,
+ proposalId: rec.proposalId,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // peer-push-debit
+
+ {
+ const activeRecs =
+ await tx.peerPushDebit.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.PeerPushDebit,
+ pursePub: rec.pursePub,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // peer-push-credit
+
+ {
+ const activeRecs =
+ await tx.peerPushCredit.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.PeerPushCredit,
+ peerPushCreditId: rec.peerPushCreditId,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // peer-pull-debit
+
+ {
+ const activeRecs =
+ await tx.peerPullDebit.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.PeerPullDebit,
+ peerPullDebitId: rec.peerPullDebitId,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // peer-pull-credit
+
+ {
+ const activeRecs =
+ await tx.peerPullCredit.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.PeerPullCredit,
+ pursePub: rec.pursePub,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // recoup
+
+ {
+ const activeRecs =
+ await tx.recoupGroups.indexes.byStatus.getAll(active);
+ for (const rec of activeRecs) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Recoup,
+ recoupGroupId: rec.recoupGroupId,
+ });
+ res.taskIds.push(taskId);
+ }
+ }
+
+ // exchange update
+
+ {
+ const exchanges = await tx.exchanges.getAll();
+ for (const rec of exchanges) {
+ const taskIdUpdate = constructTaskIdentifier({
+ tag: PendingTaskType.ExchangeUpdate,
+ exchangeBaseUrl: rec.baseUrl,
+ });
+ res.taskIds.push(taskIdUpdate);
+ }
+ }
+
+ // FIXME: Recoup!
+ });
+
+ return res;
+}
diff --git a/packages/taler-wallet-core/src/util/coinSelection.ts b/packages/taler-wallet-core/src/util/coinSelection.ts
index e06d7454b..be868867d 100644
--- a/packages/taler-wallet-core/src/util/coinSelection.ts
+++ b/packages/taler-wallet-core/src/util/coinSelection.ts
@@ -56,10 +56,8 @@ import {
} from "@gnu-taler/taler-util";
import { DenominationRecord } from "../db.js";
import {
- getAutoRefreshExecuteThreshold,
getExchangeWireDetailsInTx,
isWithdrawableDenom,
- WalletDbReadOnlyTransaction,
WalletDbReadOnlyTransactionArr,
} from "../index.js";
import { InternalWalletState } from "../internal-wallet-state.js";
@@ -67,6 +65,7 @@ import {
getMerchantPaymentBalanceDetails,
getPeerPaymentBalanceDetailsInTx,
} from "../operations/balance.js";
+import { getAutoRefreshExecuteThreshold } from "../operations/common.js";
import { checkDbInvariant, checkLogicInvariant } from "./invariants.js";
const logger = new Logger("coinSelection.ts");
diff --git a/packages/taler-wallet-core/src/util/promiseUtils.ts b/packages/taler-wallet-core/src/util/promiseUtils.ts
index d152a12f4..bc1e40260 100644
--- a/packages/taler-wallet-core/src/util/promiseUtils.ts
+++ b/packages/taler-wallet-core/src/util/promiseUtils.ts
@@ -70,3 +70,43 @@ export class AsyncCondition {
this.promCap = undefined;
}
}
+
+/**
+ * Flag that can be raised to notify asynchronous waiters.
+ *
+ * You can think of it as a promise that can
+ * be un-resolved.
+ */
+export class AsyncFlag {
+ private promCap?: OpenedPromise<void> = undefined;
+ private internalFlagRaised: boolean = false;
+
+ constructor() {}
+
+ /**
+ * Wait until the flag is raised.
+ *
+ * Reset if before returning.
+ */
+ wait(): Promise<void> {
+ if (this.internalFlagRaised) {
+ return Promise.resolve();
+ }
+ if (!this.promCap) {
+ this.promCap = openPromise<void>();
+ }
+ return this.promCap.promise;
+ }
+
+ raise(): void {
+ this.internalFlagRaised = true;
+ if (this.promCap) {
+ this.promCap.resolve();
+ }
+ }
+
+ reset(): void {
+ this.internalFlagRaised = false;
+ this.promCap = undefined;
+ }
+}
diff --git a/packages/taler-wallet-core/src/util/query.ts b/packages/taler-wallet-core/src/util/query.ts
index d96a03c61..5fba61f11 100644
--- a/packages/taler-wallet-core/src/util/query.ts
+++ b/packages/taler-wallet-core/src/util/query.ts
@@ -919,7 +919,7 @@ export class DbAccess<StoreMap> {
strStoreNames.push(swi.storeName);
accessibleStores[swi.storeName] = swi;
}
- const tx = this.db.transaction(strStoreNames, "readwrite");
+ const tx = this.db.transaction(strStoreNames, "readonly");
const readContext = makeReadContext(tx, accessibleStores);
return runTx(tx, readContext, txf);
}
diff --git a/packages/taler-wallet-core/src/wallet-api-types.ts b/packages/taler-wallet-core/src/wallet-api-types.ts
index cdde2ee62..190558e14 100644
--- a/packages/taler-wallet-core/src/wallet-api-types.ts
+++ b/packages/taler-wallet-core/src/wallet-api-types.ts
@@ -29,8 +29,6 @@ import {
AcceptExchangeTosRequest,
AcceptManualWithdrawalRequest,
AcceptManualWithdrawalResult,
- AcceptRewardRequest,
- AcceptTipResponse,
AcceptWithdrawalResponse,
AddExchangeRequest,
AddGlobalCurrencyAuditorRequest,
@@ -102,8 +100,6 @@ import {
PreparePeerPushCreditRequest,
PreparePeerPushCreditResponse,
PrepareRefundRequest,
- PrepareRewardRequest,
- PrepareTipResult as PrepareRewardResult,
PrepareWithdrawExchangeRequest,
PrepareWithdrawExchangeResponse,
RecoverStoredBackupRequest,
@@ -242,7 +238,6 @@ export enum WalletApiOperation {
DeleteStoredBackup = "deleteStoredBackup",
RecoverStoredBackup = "recoverStoredBackup",
UpdateExchangeEntry = "updateExchangeEntry",
- TestingWaitTasksProcessed = "testingWaitTasksProcessed",
ListExchangesForScopedCurrency = "listExchangesForScopedCurrency",
PrepareWithdrawExchange = "prepareWithdrawExchange",
TestingInfiniteTransactionLoop = "testingInfiniteTransactionLoop",
@@ -1125,15 +1120,6 @@ export type TestingWaitRefreshesFinalOp = {
};
/**
- * Wait until all tasks have been processed and the wallet is idle.
- */
-export type TestingWaitTasksProcessedOp = {
- op: WalletApiOperation.TestingWaitTasksProcessed;
- request: EmptyObject;
- response: EmptyObject;
-};
-
-/**
* Wait until a transaction is in a particular state.
*/
export type TestingWaitTransactionStateOp = {
@@ -1245,7 +1231,6 @@ export type WalletOperations = {
[WalletApiOperation.ValidateIban]: ValidateIbanOp;
[WalletApiOperation.TestingWaitTransactionsFinal]: TestingWaitTransactionsFinalOp;
[WalletApiOperation.TestingWaitRefreshesFinal]: TestingWaitRefreshesFinalOp;
- [WalletApiOperation.TestingWaitTasksProcessed]: TestingWaitTasksProcessedOp;
[WalletApiOperation.TestingSetTimetravel]: TestingSetTimetravelOp;
[WalletApiOperation.TestingWaitTransactionState]: TestingWaitTransactionStateOp;
[WalletApiOperation.GetCurrencySpecification]: GetCurrencySpecificationOp;
diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts
index 42aa8cdfc..0246597be 100644
--- a/packages/taler-wallet-core/src/wallet.ts
+++ b/packages/taler-wallet-core/src/wallet.ts
@@ -24,7 +24,6 @@
*/
import { IDBFactory } from "@gnu-taler/idb-bridge";
import {
- AbsoluteTime,
AmountString,
Amounts,
CoinDumpJson,
@@ -33,7 +32,6 @@ import {
CreateStoredBackupResponse,
DeleteStoredBackupRequest,
DenominationInfo,
- Duration,
ExchangesShortListResponse,
GetCurrencySpecificationResponse,
InitResponse,
@@ -42,15 +40,14 @@ import {
ListGlobalCurrencyAuditorsResponse,
ListGlobalCurrencyExchangesResponse,
Logger,
- NotificationType,
PrepareWithdrawExchangeRequest,
PrepareWithdrawExchangeResponse,
RecoverStoredBackupRequest,
+ RetryLoopOpts,
StoredBackupList,
TalerError,
TalerErrorCode,
TalerUriAction,
- TaskThrottler,
TestingWaitTransactionRequest,
TransactionState,
TransactionType,
@@ -123,8 +120,6 @@ import {
codecForUserAttentionsRequest,
codecForValidateIbanRequest,
codecForWithdrawTestBalance,
- durationFromSpec,
- durationMin,
getErrorDetailFromException,
j2s,
parsePaytoUri,
@@ -152,7 +147,6 @@ import {
} from "./db.js";
import { DevExperimentHttpLib, applyDevExperiment } from "./dev-experiments.js";
import {
- ActiveLongpollInfo,
CancelFn,
InternalWalletState,
MerchantInfo,
@@ -172,23 +166,16 @@ import {
getBackupInfo,
getBackupRecovery,
loadBackupRecovery,
- processBackupForProvider,
removeBackupProvider,
runBackupCycle,
setWalletDeviceId,
} from "./operations/backup/index.js";
import { getBalanceDetail, getBalances } from "./operations/balance.js";
import {
- TaskRunResult,
- TaskRunResultType,
- runTaskWithErrorReporting,
-} from "./operations/common.js";
-import {
computeDepositTransactionStatus,
createDepositGroup,
generateDepositGroupTxId,
prepareDepositGroup,
- processDepositGroup,
} from "./operations/deposits.js";
import {
acceptExchangeTermsOfService,
@@ -200,7 +187,6 @@ import {
getExchangeTos,
listExchanges,
lookupExchangeByUri,
- updateExchangeFromUrlHandler,
} from "./operations/exchanges.js";
import {
computePayMerchantTransactionState,
@@ -209,7 +195,6 @@ import {
getContractTermsDetails,
preparePayForTemplate,
preparePayForUri,
- processPurchase,
sharePayment,
startQueryRefund,
startRefundQueryForUri,
@@ -218,38 +203,28 @@ import {
checkPeerPullPaymentInitiation,
computePeerPullCreditTransactionState,
initiatePeerPullPayment,
- processPeerPullCredit,
} from "./operations/pay-peer-pull-credit.js";
import {
computePeerPullDebitTransactionState,
confirmPeerPullDebit,
preparePeerPullDebit,
- processPeerPullDebit,
} from "./operations/pay-peer-pull-debit.js";
import {
computePeerPushCreditTransactionState,
confirmPeerPushCredit,
preparePeerPushCredit,
- processPeerPushCredit,
} from "./operations/pay-peer-push-credit.js";
import {
checkPeerPushDebit,
computePeerPushDebitTransactionState,
initiatePeerPushDebit,
- processPeerPushDebit,
} from "./operations/pay-peer-push-debit.js";
-import { getPendingOperations } from "./operations/pending.js";
-import { createRecoupGroup, processRecoupGroup } from "./operations/recoup.js";
+import { createRecoupGroup } from "./operations/recoup.js";
import {
- autoRefresh,
computeRefreshTransactionState,
forceRefresh,
- processRefreshGroup,
} from "./operations/refresh.js";
-import {
- computeRewardTransactionStatus,
- processTip,
-} from "./operations/reward.js";
+import { computeRewardTransactionStatus } from "./operations/reward.js";
import {
runIntegrationTest,
runIntegrationTest2,
@@ -257,7 +232,6 @@ import {
waitTransactionState,
waitUntilAllTransactionsFinal,
waitUntilRefreshesDone,
- waitUntilTasksProcessed,
withdrawTestBalance,
} from "./operations/testing.js";
import {
@@ -279,9 +253,9 @@ import {
createManualWithdrawal,
getExchangeWithdrawalInfo,
getWithdrawalDetailsForUri,
- processWithdrawalGroup,
} from "./operations/withdraw.js";
-import { PendingTaskInfo, PendingTaskType } from "./pending-types.js";
+import { PendingOperationsResponse } from "./pending-types.js";
+import { TaskScheduler } from "./shepherd.js";
import { assertUnreachable } from "./util/assertUnreachable.js";
import {
convertDepositAmount,
@@ -320,184 +294,11 @@ import {
const logger = new Logger("wallet.ts");
-/**
- * Call the right handler for a pending operation without doing
- * any special error handling.
- */
-async function callOperationHandler(
- ws: InternalWalletState,
- pending: PendingTaskInfo,
-): Promise<TaskRunResult> {
- switch (pending.type) {
- case PendingTaskType.ExchangeUpdate:
- return await updateExchangeFromUrlHandler(ws, pending.exchangeBaseUrl);
- case PendingTaskType.Refresh:
- return await processRefreshGroup(ws, pending.refreshGroupId);
- case PendingTaskType.Withdraw:
- return await processWithdrawalGroup(ws, pending.withdrawalGroupId);
- case PendingTaskType.RewardPickup:
- return await processTip(ws, pending.tipId);
- case PendingTaskType.Purchase:
- return await processPurchase(ws, pending.proposalId);
- case PendingTaskType.Recoup:
- return await processRecoupGroup(ws, pending.recoupGroupId);
- case PendingTaskType.ExchangeCheckRefresh:
- return await autoRefresh(ws, pending.exchangeBaseUrl);
- case PendingTaskType.Deposit:
- return await processDepositGroup(ws, pending.depositGroupId);
- case PendingTaskType.Backup:
- return await processBackupForProvider(ws, pending.backupProviderBaseUrl);
- case PendingTaskType.PeerPushDebit:
- return await processPeerPushDebit(ws, pending.pursePub);
- case PendingTaskType.PeerPullCredit:
- return await processPeerPullCredit(ws, pending.pursePub);
- case PendingTaskType.PeerPullDebit:
- return await processPeerPullDebit(ws, pending.peerPullDebitId);
- case PendingTaskType.PeerPushCredit:
- return await processPeerPushCredit(ws, pending.peerPushCreditId);
- default:
- return assertUnreachable(pending);
- }
- throw Error(`not reached ${pending.type}`);
-}
-
-/**
- * Process pending operations.
- */
-export async function runPending(ws: InternalWalletState): Promise<void> {
- const pendingOpsResponse = await getPendingOperations(ws);
- for (const p of pendingOpsResponse.pendingOperations) {
- if (!AbsoluteTime.isExpired(p.timestampDue)) {
- continue;
- }
- await runTaskWithErrorReporting(ws, p.id, async () => {
- logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`);
- return await callOperationHandler(ws, p);
- });
- }
-}
-
-export interface RetryLoopOpts {
- /**
- * Stop the retry loop when all lifeness-giving pending operations
- * are done.
- *
- * Defaults to false.
- */
- stopWhenDone?: boolean;
-}
-
-/**
- * Main retry loop of the wallet.
- *
- * Looks up pending operations from the wallet, runs them, repeat.
- */
async function runTaskLoop(
ws: InternalWalletState,
opts: RetryLoopOpts = {},
): Promise<void> {
- logger.trace(`running task loop opts=${j2s(opts)}`);
- if (ws.isTaskLoopRunning) {
- logger.warn(
- "task loop already running, nesting the wallet-core task loop is deprecated and should be avoided",
- );
- }
- const throttler = new TaskThrottler();
- ws.isTaskLoopRunning = true;
- for (let iteration = 0; !ws.stopped; iteration++) {
- const pending = await getPendingOperations(ws);
- logger.trace(`pending operations: ${j2s(pending)}`);
- let numGivingLiveness = 0;
- let numDue = 0;
- let numThrottled = 0;
- let minDue: AbsoluteTime = AbsoluteTime.never();
-
- for (const p of pending.pendingOperations) {
- if (p.givesLifeness) {
- numGivingLiveness++;
- }
- if (!p.isDue) {
- continue;
- }
- numDue++;
-
- const isThrottled = throttler.applyThrottle(p.id);
-
- if (isThrottled) {
- logger.warn(
- `task ${p.id} throttled, this is very likely a bug in wallet-core, please report`,
- );
- numDue--;
- numThrottled++;
- } else {
- minDue = AbsoluteTime.min(minDue, p.timestampDue);
- }
- }
-
- logger.trace(
- `running task loop, iter=${iteration}, #tasks=${pending.pendingOperations.length} #lifeness=${numGivingLiveness}, #due=${numDue} #throttled=${numThrottled}`,
- );
-
- if (opts.stopWhenDone && numGivingLiveness === 0 && iteration !== 0) {
- logger.warn(`stopping, as no pending operations have lifeness`);
- ws.isTaskLoopRunning = false;
- return;
- }
-
- if (ws.stopped) {
- ws.isTaskLoopRunning = false;
- return;
- }
-
- // Make sure that we run tasks that don't give lifeness at least
- // one time.
- if (iteration !== 0 && numDue === 0) {
- // We've executed pending, due operations at least one.
- // Now we don't have any more operations available,
- // and need to wait.
-
- // Wait for at most 5 seconds to the next check.
- const dt = durationMin(
- durationFromSpec({
- seconds: 5,
- }),
- Duration.getRemaining(minDue),
- );
- logger.trace(`waiting for at most ${dt.d_ms} ms`);
- const timeout = ws.timerGroup.resolveAfter(dt);
- // Wait until either the timeout, or we are notified (via the latch)
- // that more work might be available.
- await Promise.race([timeout, ws.workAvailable.wait()]);
- logger.trace(`done waiting for available work`);
- } else {
- logger.trace(
- `running ${pending.pendingOperations.length} pending operations`,
- );
- for (const p of pending.pendingOperations) {
- if (!AbsoluteTime.isExpired(p.timestampDue)) {
- continue;
- }
- logger.trace(`running task ${p.id}`);
- const res = await runTaskWithErrorReporting(ws, p.id, async () => {
- return await callOperationHandler(ws, p);
- });
- if (!(ws.stopped && res.type === TaskRunResultType.Error)) {
- ws.notify({
- type: NotificationType.PendingOperationProcessed,
- id: p.id,
- taskResultType: res.type,
- });
- }
- if (ws.stopped) {
- ws.isTaskLoopRunning = false;
- return;
- }
- }
- }
- }
- logger.trace("exiting wallet task loop");
- ws.isTaskLoopRunning = false;
- return;
+ await ws.taskScheduler.run(opts);
}
/**
@@ -1035,7 +836,10 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
return await getUserAttentionsUnreadCount(ws, req);
}
case WalletApiOperation.GetPendingOperations: {
- return await getPendingOperations(ws);
+ // FIXME: Eventually remove the handler after deprecation period.
+ return {
+ pendingOperations: [],
+ } satisfies PendingOperationsResponse;
}
case WalletApiOperation.SetExchangeTosAccepted: {
const req = codecForAcceptExchangeTosRequest().decode(payload);
@@ -1066,8 +870,7 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
return getContractTermsDetails(ws, req.proposalId);
}
case WalletApiOperation.RetryPendingNow: {
- // FIXME: Should we reset all operation retries here?
- await runPending(ws);
+ logger.error("retryPendingNow currently not implemented");
return {};
}
case WalletApiOperation.SharePayment: {
@@ -1175,10 +978,6 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
await waitTransactionState(ws, req.transactionId, req.txState);
return {};
}
- case WalletApiOperation.TestingWaitTasksProcessed: {
- await waitUntilTasksProcessed(ws);
- return {};
- }
case WalletApiOperation.GetCurrencySpecification: {
// Ignore result, just validate in this mock implementation
const req = codecForGetCurrencyInfoRequest().decode(payload);
@@ -1451,7 +1250,7 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
case WalletApiOperation.TestingSetTimetravel: {
const req = codecForTestingSetTimetravelRequest().decode(payload);
setDangerousTimetravel(req.offsetMs);
- ws.workAvailable.trigger();
+ ws.taskScheduler.reload();
return {};
}
case WalletApiOperation.DeleteExchange: {
@@ -1634,11 +1433,6 @@ export class Wallet {
this.ws.stop();
}
- async runPending(): Promise<void> {
- await this.ws.ensureWalletDbOpen();
- return runPending(this.ws);
- }
-
async runTaskLoop(opts?: RetryLoopOpts): Promise<void> {
await this.ws.ensureWalletDbOpen();
return runTaskLoop(this.ws, opts);
@@ -1660,11 +1454,6 @@ export class Wallet {
* This ties together all the operation implementations.
*/
class InternalWalletStateImpl implements InternalWalletState {
- /**
- * @see {@link InternalWalletState.activeLongpoll}
- */
- activeLongpoll: ActiveLongpollInfo = {};
-
cryptoApi: TalerCryptoInterface;
cryptoDispatcher: CryptoDispatcher;
@@ -1697,6 +1486,8 @@ class InternalWalletStateImpl implements InternalWalletState {
isTaskLoopRunning: boolean = false;
+ taskScheduler: TaskScheduler = new TaskScheduler(this);
+
config: Readonly<WalletConfig>;
private _db: DbAccess<typeof WalletStoresV1> | undefined = undefined;
@@ -1843,7 +1634,7 @@ class InternalWalletStateImpl implements InternalWalletState {
}
notify(n: WalletNotification): void {
- logger.trace("Notification", j2s(n));
+ logger.trace(`Notification: ${j2s(n)}`);
for (const l of this.listeners) {
const nc = JSON.parse(JSON.stringify(n));
setTimeout(() => {
@@ -1870,11 +1661,6 @@ class InternalWalletStateImpl implements InternalWalletState {
this.stopped = true;
this.timerGroup.stopCurrentAndFutureTimers();
this.cryptoDispatcher.stop();
- for (const key of Object.keys(this.activeLongpoll)) {
- logger.trace(`cancelling active longpoll ${key}`);
- this.activeLongpoll[key].cancel();
- delete this.activeLongpoll[key];
- }
}
/**