summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/wallet.ts
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2024-02-13 10:53:43 +0100
committerFlorian Dold <florian@dold.me>2024-02-15 21:56:54 +0100
commit70a803038f1cbe05dc4779bdd87376fd073421be (patch)
tree6607d69f6906ada9f912e31d9a9e3b65560a7326 /packages/taler-wallet-core/src/wallet.ts
parent2c17e98c336d96f955ec82ad0a1b164e3da90103 (diff)
downloadwallet-core-70a803038f1cbe05dc4779bdd87376fd073421be.tar.gz
wallet-core-70a803038f1cbe05dc4779bdd87376fd073421be.tar.bz2
wallet-core-70a803038f1cbe05dc4779bdd87376fd073421be.zip
implement task shepherd, many small fixes and tweaksdev/dold/task-shepherd
Diffstat (limited to 'packages/taler-wallet-core/src/wallet.ts')
-rw-r--r--packages/taler-wallet-core/src/wallet.ts244
1 files changed, 15 insertions, 229 deletions
diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts
index 42aa8cdfc..0246597be 100644
--- a/packages/taler-wallet-core/src/wallet.ts
+++ b/packages/taler-wallet-core/src/wallet.ts
@@ -24,7 +24,6 @@
*/
import { IDBFactory } from "@gnu-taler/idb-bridge";
import {
- AbsoluteTime,
AmountString,
Amounts,
CoinDumpJson,
@@ -33,7 +32,6 @@ import {
CreateStoredBackupResponse,
DeleteStoredBackupRequest,
DenominationInfo,
- Duration,
ExchangesShortListResponse,
GetCurrencySpecificationResponse,
InitResponse,
@@ -42,15 +40,14 @@ import {
ListGlobalCurrencyAuditorsResponse,
ListGlobalCurrencyExchangesResponse,
Logger,
- NotificationType,
PrepareWithdrawExchangeRequest,
PrepareWithdrawExchangeResponse,
RecoverStoredBackupRequest,
+ RetryLoopOpts,
StoredBackupList,
TalerError,
TalerErrorCode,
TalerUriAction,
- TaskThrottler,
TestingWaitTransactionRequest,
TransactionState,
TransactionType,
@@ -123,8 +120,6 @@ import {
codecForUserAttentionsRequest,
codecForValidateIbanRequest,
codecForWithdrawTestBalance,
- durationFromSpec,
- durationMin,
getErrorDetailFromException,
j2s,
parsePaytoUri,
@@ -152,7 +147,6 @@ import {
} from "./db.js";
import { DevExperimentHttpLib, applyDevExperiment } from "./dev-experiments.js";
import {
- ActiveLongpollInfo,
CancelFn,
InternalWalletState,
MerchantInfo,
@@ -172,23 +166,16 @@ import {
getBackupInfo,
getBackupRecovery,
loadBackupRecovery,
- processBackupForProvider,
removeBackupProvider,
runBackupCycle,
setWalletDeviceId,
} from "./operations/backup/index.js";
import { getBalanceDetail, getBalances } from "./operations/balance.js";
import {
- TaskRunResult,
- TaskRunResultType,
- runTaskWithErrorReporting,
-} from "./operations/common.js";
-import {
computeDepositTransactionStatus,
createDepositGroup,
generateDepositGroupTxId,
prepareDepositGroup,
- processDepositGroup,
} from "./operations/deposits.js";
import {
acceptExchangeTermsOfService,
@@ -200,7 +187,6 @@ import {
getExchangeTos,
listExchanges,
lookupExchangeByUri,
- updateExchangeFromUrlHandler,
} from "./operations/exchanges.js";
import {
computePayMerchantTransactionState,
@@ -209,7 +195,6 @@ import {
getContractTermsDetails,
preparePayForTemplate,
preparePayForUri,
- processPurchase,
sharePayment,
startQueryRefund,
startRefundQueryForUri,
@@ -218,38 +203,28 @@ import {
checkPeerPullPaymentInitiation,
computePeerPullCreditTransactionState,
initiatePeerPullPayment,
- processPeerPullCredit,
} from "./operations/pay-peer-pull-credit.js";
import {
computePeerPullDebitTransactionState,
confirmPeerPullDebit,
preparePeerPullDebit,
- processPeerPullDebit,
} from "./operations/pay-peer-pull-debit.js";
import {
computePeerPushCreditTransactionState,
confirmPeerPushCredit,
preparePeerPushCredit,
- processPeerPushCredit,
} from "./operations/pay-peer-push-credit.js";
import {
checkPeerPushDebit,
computePeerPushDebitTransactionState,
initiatePeerPushDebit,
- processPeerPushDebit,
} from "./operations/pay-peer-push-debit.js";
-import { getPendingOperations } from "./operations/pending.js";
-import { createRecoupGroup, processRecoupGroup } from "./operations/recoup.js";
+import { createRecoupGroup } from "./operations/recoup.js";
import {
- autoRefresh,
computeRefreshTransactionState,
forceRefresh,
- processRefreshGroup,
} from "./operations/refresh.js";
-import {
- computeRewardTransactionStatus,
- processTip,
-} from "./operations/reward.js";
+import { computeRewardTransactionStatus } from "./operations/reward.js";
import {
runIntegrationTest,
runIntegrationTest2,
@@ -257,7 +232,6 @@ import {
waitTransactionState,
waitUntilAllTransactionsFinal,
waitUntilRefreshesDone,
- waitUntilTasksProcessed,
withdrawTestBalance,
} from "./operations/testing.js";
import {
@@ -279,9 +253,9 @@ import {
createManualWithdrawal,
getExchangeWithdrawalInfo,
getWithdrawalDetailsForUri,
- processWithdrawalGroup,
} from "./operations/withdraw.js";
-import { PendingTaskInfo, PendingTaskType } from "./pending-types.js";
+import { PendingOperationsResponse } from "./pending-types.js";
+import { TaskScheduler } from "./shepherd.js";
import { assertUnreachable } from "./util/assertUnreachable.js";
import {
convertDepositAmount,
@@ -320,184 +294,11 @@ import {
const logger = new Logger("wallet.ts");
-/**
- * Call the right handler for a pending operation without doing
- * any special error handling.
- */
-async function callOperationHandler(
- ws: InternalWalletState,
- pending: PendingTaskInfo,
-): Promise<TaskRunResult> {
- switch (pending.type) {
- case PendingTaskType.ExchangeUpdate:
- return await updateExchangeFromUrlHandler(ws, pending.exchangeBaseUrl);
- case PendingTaskType.Refresh:
- return await processRefreshGroup(ws, pending.refreshGroupId);
- case PendingTaskType.Withdraw:
- return await processWithdrawalGroup(ws, pending.withdrawalGroupId);
- case PendingTaskType.RewardPickup:
- return await processTip(ws, pending.tipId);
- case PendingTaskType.Purchase:
- return await processPurchase(ws, pending.proposalId);
- case PendingTaskType.Recoup:
- return await processRecoupGroup(ws, pending.recoupGroupId);
- case PendingTaskType.ExchangeCheckRefresh:
- return await autoRefresh(ws, pending.exchangeBaseUrl);
- case PendingTaskType.Deposit:
- return await processDepositGroup(ws, pending.depositGroupId);
- case PendingTaskType.Backup:
- return await processBackupForProvider(ws, pending.backupProviderBaseUrl);
- case PendingTaskType.PeerPushDebit:
- return await processPeerPushDebit(ws, pending.pursePub);
- case PendingTaskType.PeerPullCredit:
- return await processPeerPullCredit(ws, pending.pursePub);
- case PendingTaskType.PeerPullDebit:
- return await processPeerPullDebit(ws, pending.peerPullDebitId);
- case PendingTaskType.PeerPushCredit:
- return await processPeerPushCredit(ws, pending.peerPushCreditId);
- default:
- return assertUnreachable(pending);
- }
- throw Error(`not reached ${pending.type}`);
-}
-
-/**
- * Process pending operations.
- */
-export async function runPending(ws: InternalWalletState): Promise<void> {
- const pendingOpsResponse = await getPendingOperations(ws);
- for (const p of pendingOpsResponse.pendingOperations) {
- if (!AbsoluteTime.isExpired(p.timestampDue)) {
- continue;
- }
- await runTaskWithErrorReporting(ws, p.id, async () => {
- logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`);
- return await callOperationHandler(ws, p);
- });
- }
-}
-
-export interface RetryLoopOpts {
- /**
- * Stop the retry loop when all lifeness-giving pending operations
- * are done.
- *
- * Defaults to false.
- */
- stopWhenDone?: boolean;
-}
-
-/**
- * Main retry loop of the wallet.
- *
- * Looks up pending operations from the wallet, runs them, repeat.
- */
async function runTaskLoop(
ws: InternalWalletState,
opts: RetryLoopOpts = {},
): Promise<void> {
- logger.trace(`running task loop opts=${j2s(opts)}`);
- if (ws.isTaskLoopRunning) {
- logger.warn(
- "task loop already running, nesting the wallet-core task loop is deprecated and should be avoided",
- );
- }
- const throttler = new TaskThrottler();
- ws.isTaskLoopRunning = true;
- for (let iteration = 0; !ws.stopped; iteration++) {
- const pending = await getPendingOperations(ws);
- logger.trace(`pending operations: ${j2s(pending)}`);
- let numGivingLiveness = 0;
- let numDue = 0;
- let numThrottled = 0;
- let minDue: AbsoluteTime = AbsoluteTime.never();
-
- for (const p of pending.pendingOperations) {
- if (p.givesLifeness) {
- numGivingLiveness++;
- }
- if (!p.isDue) {
- continue;
- }
- numDue++;
-
- const isThrottled = throttler.applyThrottle(p.id);
-
- if (isThrottled) {
- logger.warn(
- `task ${p.id} throttled, this is very likely a bug in wallet-core, please report`,
- );
- numDue--;
- numThrottled++;
- } else {
- minDue = AbsoluteTime.min(minDue, p.timestampDue);
- }
- }
-
- logger.trace(
- `running task loop, iter=${iteration}, #tasks=${pending.pendingOperations.length} #lifeness=${numGivingLiveness}, #due=${numDue} #throttled=${numThrottled}`,
- );
-
- if (opts.stopWhenDone && numGivingLiveness === 0 && iteration !== 0) {
- logger.warn(`stopping, as no pending operations have lifeness`);
- ws.isTaskLoopRunning = false;
- return;
- }
-
- if (ws.stopped) {
- ws.isTaskLoopRunning = false;
- return;
- }
-
- // Make sure that we run tasks that don't give lifeness at least
- // one time.
- if (iteration !== 0 && numDue === 0) {
- // We've executed pending, due operations at least one.
- // Now we don't have any more operations available,
- // and need to wait.
-
- // Wait for at most 5 seconds to the next check.
- const dt = durationMin(
- durationFromSpec({
- seconds: 5,
- }),
- Duration.getRemaining(minDue),
- );
- logger.trace(`waiting for at most ${dt.d_ms} ms`);
- const timeout = ws.timerGroup.resolveAfter(dt);
- // Wait until either the timeout, or we are notified (via the latch)
- // that more work might be available.
- await Promise.race([timeout, ws.workAvailable.wait()]);
- logger.trace(`done waiting for available work`);
- } else {
- logger.trace(
- `running ${pending.pendingOperations.length} pending operations`,
- );
- for (const p of pending.pendingOperations) {
- if (!AbsoluteTime.isExpired(p.timestampDue)) {
- continue;
- }
- logger.trace(`running task ${p.id}`);
- const res = await runTaskWithErrorReporting(ws, p.id, async () => {
- return await callOperationHandler(ws, p);
- });
- if (!(ws.stopped && res.type === TaskRunResultType.Error)) {
- ws.notify({
- type: NotificationType.PendingOperationProcessed,
- id: p.id,
- taskResultType: res.type,
- });
- }
- if (ws.stopped) {
- ws.isTaskLoopRunning = false;
- return;
- }
- }
- }
- }
- logger.trace("exiting wallet task loop");
- ws.isTaskLoopRunning = false;
- return;
+ await ws.taskScheduler.run(opts);
}
/**
@@ -1035,7 +836,10 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
return await getUserAttentionsUnreadCount(ws, req);
}
case WalletApiOperation.GetPendingOperations: {
- return await getPendingOperations(ws);
+ // FIXME: Eventually remove the handler after deprecation period.
+ return {
+ pendingOperations: [],
+ } satisfies PendingOperationsResponse;
}
case WalletApiOperation.SetExchangeTosAccepted: {
const req = codecForAcceptExchangeTosRequest().decode(payload);
@@ -1066,8 +870,7 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
return getContractTermsDetails(ws, req.proposalId);
}
case WalletApiOperation.RetryPendingNow: {
- // FIXME: Should we reset all operation retries here?
- await runPending(ws);
+ logger.error("retryPendingNow currently not implemented");
return {};
}
case WalletApiOperation.SharePayment: {
@@ -1175,10 +978,6 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
await waitTransactionState(ws, req.transactionId, req.txState);
return {};
}
- case WalletApiOperation.TestingWaitTasksProcessed: {
- await waitUntilTasksProcessed(ws);
- return {};
- }
case WalletApiOperation.GetCurrencySpecification: {
// Ignore result, just validate in this mock implementation
const req = codecForGetCurrencyInfoRequest().decode(payload);
@@ -1451,7 +1250,7 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
case WalletApiOperation.TestingSetTimetravel: {
const req = codecForTestingSetTimetravelRequest().decode(payload);
setDangerousTimetravel(req.offsetMs);
- ws.workAvailable.trigger();
+ ws.taskScheduler.reload();
return {};
}
case WalletApiOperation.DeleteExchange: {
@@ -1634,11 +1433,6 @@ export class Wallet {
this.ws.stop();
}
- async runPending(): Promise<void> {
- await this.ws.ensureWalletDbOpen();
- return runPending(this.ws);
- }
-
async runTaskLoop(opts?: RetryLoopOpts): Promise<void> {
await this.ws.ensureWalletDbOpen();
return runTaskLoop(this.ws, opts);
@@ -1660,11 +1454,6 @@ export class Wallet {
* This ties together all the operation implementations.
*/
class InternalWalletStateImpl implements InternalWalletState {
- /**
- * @see {@link InternalWalletState.activeLongpoll}
- */
- activeLongpoll: ActiveLongpollInfo = {};
-
cryptoApi: TalerCryptoInterface;
cryptoDispatcher: CryptoDispatcher;
@@ -1697,6 +1486,8 @@ class InternalWalletStateImpl implements InternalWalletState {
isTaskLoopRunning: boolean = false;
+ taskScheduler: TaskScheduler = new TaskScheduler(this);
+
config: Readonly<WalletConfig>;
private _db: DbAccess<typeof WalletStoresV1> | undefined = undefined;
@@ -1843,7 +1634,7 @@ class InternalWalletStateImpl implements InternalWalletState {
}
notify(n: WalletNotification): void {
- logger.trace("Notification", j2s(n));
+ logger.trace(`Notification: ${j2s(n)}`);
for (const l of this.listeners) {
const nc = JSON.parse(JSON.stringify(n));
setTimeout(() => {
@@ -1870,11 +1661,6 @@ class InternalWalletStateImpl implements InternalWalletState {
this.stopped = true;
this.timerGroup.stopCurrentAndFutureTimers();
this.cryptoDispatcher.stop();
- for (const key of Object.keys(this.activeLongpoll)) {
- logger.trace(`cancelling active longpoll ${key}`);
- this.activeLongpoll[key].cancel();
- delete this.activeLongpoll[key];
- }
}
/**