summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/shepherd.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/shepherd.ts')
-rw-r--r--packages/taler-wallet-core/src/shepherd.ts184
1 files changed, 106 insertions, 78 deletions
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts
index f04bcd2c2..d662bd7ae 100644
--- a/packages/taler-wallet-core/src/shepherd.ts
+++ b/packages/taler-wallet-core/src/shepherd.ts
@@ -27,7 +27,6 @@ import {
NotificationType,
ObservabilityContext,
ObservabilityEventType,
- RetryLoopOpts,
TalerErrorDetail,
TaskThrottler,
TransactionIdStr,
@@ -37,6 +36,7 @@ import {
assertUnreachable,
getErrorDetailFromException,
j2s,
+ safeStringifyException,
} from "@gnu-taler/taler-util";
import { processBackupForProvider } from "./backup/index.js";
import {
@@ -61,7 +61,10 @@ import {
computeDepositTransactionStatus,
processDepositGroup,
} from "./deposits.js";
-import { updateExchangeFromUrlHandler } from "./exchanges.js";
+import {
+ computeDenomLossTransactionStatus,
+ updateExchangeFromUrlHandler,
+} from "./exchanges.js";
import {
computePayMerchantTransactionState,
computeRefundTransactionState,
@@ -88,7 +91,6 @@ import {
computeRefreshTransactionState,
processRefreshGroup,
} from "./refresh.js";
-import { computeRewardTransactionStatus } from "./reward.js";
import {
constructTransactionIdentifier,
parseTransactionIdentifier,
@@ -140,13 +142,14 @@ function taskGivesLiveness(taskId: string): boolean {
}
export interface TaskScheduler {
- ensureRunning(): void;
- run(opts?: RetryLoopOpts): Promise<void>;
+ ensureRunning(): Promise<void>;
startShepherdTask(taskId: TaskIdStr): void;
stopShepherdTask(taskId: TaskIdStr): void;
resetTaskRetries(taskId: TaskIdStr): Promise<void>;
- reload(): void;
+ reload(): Promise<void>;
getActiveTasks(): TaskIdStr[];
+ isIdle(): boolean;
+ shutdown(): Promise<void>;
}
export class TaskSchedulerImpl implements TaskScheduler {
@@ -174,58 +177,73 @@ export class TaskSchedulerImpl implements TaskScheduler {
return [...this.sheps.keys()];
}
- ensureRunning(): void {
+ async shutdown(): Promise<void> {
+ const tasksIds = [...this.sheps.keys()];
+ logger.info(`Stopping task shepherd.`);
+ for (const taskId of tasksIds) {
+ this.stopShepherdTask(taskId);
+ }
+ }
+
+ async ensureRunning(): Promise<void> {
if (this.isRunning) {
return;
}
+ this.isRunning = true;
+ try {
+ await this.loadTasksFromDb();
+ } catch (e) {
+ this.isRunning = false;
+ throw e;
+ }
this.run()
.catch((e) => {
logger.error("error running task loop");
logger.error(`err: ${e}`);
})
.then(() => {
- logger.info("done running task loop");
+ logger.trace("done running task loop");
+ this.isRunning = false;
});
}
- async run(opts: RetryLoopOpts = {}): Promise<void> {
- if (this.isRunning) {
- throw Error("task loop already running");
+ isIdle(): boolean {
+ let alive = false;
+ const taskIds = [...this.sheps.keys()];
+ for (const taskId of taskIds) {
+ if (taskGivesLiveness(taskId)) {
+ alive = true;
+ break;
+ }
}
- logger.info("Running task loop.");
- this.isRunning = true;
- await this.loadTasksFromDb();
- logger.info("loaded!");
- logger.info(`sheps: ${this.sheps.size}`);
+ // We're idle if no task is alive anymore.
+ return !alive;
+ }
+
+ private async run(): Promise<void> {
+ logger.trace("Running task loop.");
+ logger.trace(`sheps: ${this.sheps.size}`);
while (true) {
- if (opts.stopWhenDone) {
- let alive = false;
- const taskIds = [...this.sheps.keys()];
- logger.info(`current task IDs: ${j2s(taskIds)}`);
- logger.info(`sheps: ${this.sheps.size}`);
- for (const taskId of taskIds) {
- if (taskGivesLiveness(taskId)) {
- alive = true;
- break;
- }
- }
- if (!alive) {
- logger.info("Breaking out of task loop (no more work).");
- break;
- }
- }
if (this.ws.stopped) {
- logger.info("Breaking out of task loop (wallet stopped).");
+ logger.trace("Breaking out of task loop (wallet stopped).");
break;
}
+
+ if (this.isIdle()) {
+ this.ws.notify({
+ type: NotificationType.Idle,
+ });
+ }
+
await this.iterCond.wait();
}
- this.isRunning = false;
- logger.info("Done with task loop.");
+ logger.trace("Done with task loop.");
}
startShepherdTask(taskId: TaskIdStr): void {
- this.ensureRunning();
+ this.ensureRunning().catch((e) => {
+ logger.error(`error running scheduler: ${safeStringifyException(e)}`);
+ });
// Run in the background, no await!
this.internalStartShepherdTask(taskId);
}
@@ -235,10 +253,10 @@ export class TaskSchedulerImpl implements TaskScheduler {
*
* Mostly useful to interrupt all waits when time-travelling.
*/
- reload() {
- this.ensureRunning();
+ async reload(): Promise<void> {
+ await this.ensureRunning();
const tasksIds = [...this.sheps.keys()];
- logger.info(`reloading sheperd with ${tasksIds.length} tasks`);
+ logger.info(`reloading shepherd with ${tasksIds.length} tasks`);
for (const taskId of tasksIds) {
this.stopShepherdTask(taskId);
}
@@ -351,11 +369,11 @@ export class TaskSchedulerImpl implements TaskScheduler {
};
}
if (info.cts.token.isCancelled) {
- logger.info("task cancelled, not processing result");
+ logger.trace("task cancelled, not processing result");
return;
}
if (this.ws.stopped) {
- logger.info("wallet stopped, not processing result");
+ logger.trace("wallet stopped, not processing result");
return;
}
wex.oc.observe({
@@ -364,15 +382,20 @@ export class TaskSchedulerImpl implements TaskScheduler {
});
switch (res.type) {
case TaskRunResultType.Error: {
- logger.trace(`Shepherd for ${taskId} got error result.`);
+ if (logger.shouldLogTrace()) {
+ logger.trace(
+ `Shepherd for ${taskId} got error result: ${j2s(
+ res.errorDetail,
+ )}`,
+ );
+ }
const retryRecord = await storePendingTaskError(
this.ws,
taskId,
res.errorDetail,
);
- let delay: Duration;
const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
- delay = AbsoluteTime.remaining(t);
+ const delay = AbsoluteTime.remaining(t);
logger.trace(`Waiting for ${delay.d_ms} ms`);
await this.wait(taskId, info, delay);
break;
@@ -380,9 +403,8 @@ export class TaskSchedulerImpl implements TaskScheduler {
case TaskRunResultType.Backoff: {
logger.trace(`Shepherd for ${taskId} got backoff result.`);
const retryRecord = await storePendingTaskPending(this.ws, taskId);
- let delay: Duration;
const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
- delay = AbsoluteTime.remaining(t);
+ const delay = AbsoluteTime.remaining(t);
logger.trace(`Waiting for ${delay.d_ms} ms`);
await this.wait(taskId, info, delay);
break;
@@ -394,13 +416,14 @@ export class TaskSchedulerImpl implements TaskScheduler {
await storeTaskProgress(this.ws, taskId);
break;
}
- case TaskRunResultType.ScheduleLater:
+ case TaskRunResultType.ScheduleLater: {
logger.trace(`Shepherd for ${taskId} got schedule-later result.`);
await storeTaskProgress(this.ws, taskId);
const delay = AbsoluteTime.remaining(res.runAt);
logger.trace(`Waiting for ${delay.d_ms} ms`);
await this.wait(taskId, info, delay);
break;
+ }
case TaskRunResultType.Finished:
logger.trace(`Shepherd for ${taskId} got finished result.`);
await storePendingTaskFinished(this.ws, taskId);
@@ -466,9 +489,12 @@ async function storeTaskProgress(
ws: InternalWalletState,
pendingTaskId: string,
): Promise<void> {
- await ws.db.runReadWriteTx(["operationRetries"], async (tx) => {
- await tx.operationRetries.delete(pendingTaskId);
- });
+ await ws.db.runReadWriteTx(
+ { storeNames: ["operationRetries"] },
+ async (tx) => {
+ await tx.operationRetries.delete(pendingTaskId);
+ },
+ );
}
async function storePendingTaskPending(
@@ -515,9 +541,12 @@ async function storePendingTaskFinished(
ws: InternalWalletState,
pendingTaskId: string,
): Promise<void> {
- await ws.db.runReadWriteTx(["operationRetries"], async (tx) => {
- await tx.operationRetries.delete(pendingTaskId);
- });
+ await ws.db.runReadWriteTx(
+ { storeNames: ["operationRetries"] },
+ async (tx) => {
+ await tx.operationRetries.delete(pendingTaskId);
+ },
+ );
}
function getWalletExecutionContextForTask(
@@ -636,6 +665,7 @@ async function getTransactionState(
"peerPushCredit",
"rewards",
"refreshGroups",
+ "denomLossEvents",
]
>,
transactionId: string,
@@ -674,12 +704,13 @@ async function getTransactionState(
}
return computeRefundTransactionState(rec);
}
- case TransactionType.PeerPullCredit:
+ case TransactionType.PeerPullCredit: {
const rec = await tx.peerPullCredit.get(parsedTxId.pursePub);
if (!rec) {
return undefined;
}
return computePeerPullCreditTransactionState(rec);
+ }
case TransactionType.PeerPullDebit: {
const rec = await tx.peerPullDebit.get(parsedTxId.peerPullDebitId);
if (!rec) {
@@ -708,15 +739,15 @@ async function getTransactionState(
}
return computeRefreshTransactionState(rec);
}
- case TransactionType.Reward: {
- const rec = await tx.rewards.get(parsedTxId.walletRewardId);
+ case TransactionType.Recoup:
+ throw Error("not yet supported");
+ case TransactionType.DenomLoss: {
+ const rec = await tx.denomLossEvents.get(parsedTxId.denomLossEventId);
if (!rec) {
return undefined;
}
- return computeRewardTransactionStatus(rec);
+ return computeDenomLossTransactionStatus(rec);
}
- case TransactionType.Recoup:
- throw Error("not yet supported");
default:
assertUnreachable(parsedTxId);
}
@@ -855,8 +886,6 @@ export function listTaskForTransactionId(transactionId: string): TaskIdStr[] {
];
case TransactionType.Refund:
return [];
- case TransactionType.Reward:
- return [];
case TransactionType.Withdrawal:
return [
constructTaskIdentifier({
@@ -864,6 +893,8 @@ export function listTaskForTransactionId(transactionId: string): TaskIdStr[] {
withdrawalGroupId: tid.withdrawalGroupId,
}),
];
+ case TransactionType.DenomLoss:
+ return [];
default:
assertUnreachable(tid);
}
@@ -911,11 +942,6 @@ export function convertTaskToTransactionId(
tag: TransactionType.Refresh,
refreshGroupId: parsedTaskId.refreshGroupId,
});
- case PendingTaskType.RewardPickup:
- return constructTransactionIdentifier({
- tag: TransactionType.Reward,
- walletRewardId: parsedTaskId.walletRewardId,
- });
case PendingTaskType.PeerPushDebit:
return constructTransactionIdentifier({
tag: TransactionType.PeerPushDebit,
@@ -942,18 +968,20 @@ export async function getActiveTaskIds(
taskIds: [],
};
await ws.db.runReadWriteTx(
- [
- "exchanges",
- "refreshGroups",
- "withdrawalGroups",
- "purchases",
- "depositGroups",
- "recoupGroups",
- "peerPullCredit",
- "peerPushDebit",
- "peerPullDebit",
- "peerPushCredit",
- ],
+ {
+ storeNames: [
+ "exchanges",
+ "refreshGroups",
+ "withdrawalGroups",
+ "purchases",
+ "depositGroups",
+ "recoupGroups",
+ "peerPullCredit",
+ "peerPushDebit",
+ "peerPullDebit",
+ "peerPushCredit",
+ ],
+ },
async (tx) => {
const active = GlobalIDB.KeyRange.bound(
OPERATION_STATUS_ACTIVE_FIRST,