commit 1fa0d85e36431dafcd1d42497bc7f32d2413026b
parent 2c8ff2dafc9cb2fc2a54aa538b5de37032bc29d9
Author: Florian Dold <florian@dold.me>
Date: Sat, 18 Apr 2026 01:33:52 +0200
wallet-core: when starting tasks, interrupt existing scheduled wait
Diffstat:
11 files changed, 96 insertions(+), 49 deletions(-)
diff --git a/packages/taler-wallet-core/src/deposits.ts b/packages/taler-wallet-core/src/deposits.ts
@@ -472,8 +472,7 @@ export class DepositTransactionContext implements TransactionContext {
}
return undefined;
});
- wex.taskScheduler.stopShepherdTask(retryTag);
- wex.taskScheduler.startShepherdTask(retryTag);
+ await wex.taskScheduler.resetTask(retryTag);
}
async userResumeTransaction(): Promise<void> {
@@ -540,7 +539,7 @@ export class DepositTransactionContext implements TransactionContext {
oldStId,
});
});
- wex.taskScheduler.startShepherdTask(retryTag);
+ await wex.taskScheduler.resetTask(retryTag);
}
async userFailTransaction(reason?: TalerErrorDetail): Promise<void> {
diff --git a/packages/taler-wallet-core/src/exchanges.ts b/packages/taler-wallet-core/src/exchanges.ts
@@ -1200,7 +1200,7 @@ export async function startUpdateExchangeEntry(
logger.info(`updating exchange in task ${taskId}`);
if (options.forceUpdate) {
- await wex.taskScheduler.resetTaskRetries(taskId);
+ await wex.taskScheduler.resetTask(taskId);
} else {
wex.taskScheduler.startShepherdTask(taskId);
}
@@ -2185,8 +2185,8 @@ export async function updateExchangeFromUrlHandler(
// Make sure an auto-refresh task is scheduled for this exchange.
const autoRefreshTaskId =
TaskIdentifiers.forExchangeAutoRefreshFromUrl(exchangeBaseUrl);
- await wex.taskScheduler.resetTaskRetries(autoRefreshTaskId);
- await wex.taskScheduler.resetTaskRetries(
+ await wex.taskScheduler.resetTask(autoRefreshTaskId);
+ await wex.taskScheduler.resetTask(
constructTaskIdentifier({ tag: PendingTaskType.ValidateDenoms }),
);
diff --git a/packages/taler-wallet-core/src/observable-wrappers.ts b/packages/taler-wallet-core/src/observable-wrappers.ts
@@ -97,7 +97,7 @@ export class ObservableTaskScheduler implements TaskScheduler {
return this.impl.stopShepherdTask(taskId);
}
- resetTaskRetries(taskId: TaskIdStr): Promise<void> {
+ resetTask(taskId: TaskIdStr): Promise<void> {
this.declareDep(taskId);
if (this.taskDepCache.size > 500) {
this.taskDepCache.clear();
@@ -106,7 +106,7 @@ export class ObservableTaskScheduler implements TaskScheduler {
type: ObservabilityEventType.TaskReset,
taskId,
});
- return this.impl.resetTaskRetries(taskId);
+ return this.impl.resetTask(taskId);
}
async reload(): Promise<void> {
diff --git a/packages/taler-wallet-core/src/pay-merchant.ts b/packages/taler-wallet-core/src/pay-merchant.ts
@@ -488,8 +488,7 @@ export class PayMerchantTransactionContext implements TransactionContext {
}
await h.update(purchase);
});
- wex.taskScheduler.stopShepherdTask(this.taskId);
- wex.taskScheduler.startShepherdTask(this.taskId);
+ await wex.taskScheduler.resetTask(this.taskId);
}
async userResumeTransaction(): Promise<void> {
@@ -506,7 +505,7 @@ export class PayMerchantTransactionContext implements TransactionContext {
purchase.purchaseStatus = newStatus.next;
await h.update(purchase, BalanceEffect.Any);
});
- wex.taskScheduler.startShepherdTask(this.taskId);
+ await wex.taskScheduler.resetTask(this.taskId);
}
async userFailTransaction(reason?: TalerErrorDetail): Promise<void> {
@@ -1938,7 +1937,7 @@ async function checkPaymentByProposalId(
await h.update(p);
});
- wex.taskScheduler.startShepherdTask(ctx.taskId);
+ await wex.taskScheduler.resetTask(ctx.taskId);
// FIXME: Consider changing the API here so that we don't have to
// wait inline for the repurchase.
@@ -2729,7 +2728,7 @@ export async function confirmPay(
wex,
existingPurchase.proposalId,
);
- await wex.taskScheduler.resetTaskRetries(ctx.taskId);
+ await wex.taskScheduler.resetTask(ctx.taskId);
// This will become the default behavior on the future.
if (wex.ws.devExperimentState.flagConfirmPayNoWait) {
return {
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
@@ -455,7 +455,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
await h.update(rec);
});
- this.wex.taskScheduler.startShepherdTask(this.taskId);
+ await this.wex.taskScheduler.resetTask(this.taskId);
}
async userAbortTransaction(reason?: TalerErrorDetail): Promise<void> {
@@ -496,8 +496,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
await h.update(rec);
});
- this.wex.taskScheduler.stopShepherdTask(this.taskId);
- this.wex.taskScheduler.startShepherdTask(this.taskId);
+ await this.wex.taskScheduler.resetTask(this.taskId);
}
}
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
@@ -281,7 +281,7 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
}
await h.update(rec);
});
- this.wex.taskScheduler.startShepherdTask(this.taskId);
+ await this.wex.taskScheduler.resetTask(this.taskId);
}
async userFailTransaction(reason?: TalerErrorDetail): Promise<void> {
@@ -358,8 +358,7 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
pi.abortReason = reason;
await h.update(pi);
});
- this.wex.taskScheduler.stopShepherdTask(this.taskId);
- this.wex.taskScheduler.startShepherdTask(this.taskId);
+ await this.wex.taskScheduler.resetTask(this.taskId);
}
}
@@ -818,8 +817,7 @@ export async function confirmPeerPullDebit(
await h.update(rec);
});
- wex.taskScheduler.stopShepherdTask(ctx.taskId);
- wex.taskScheduler.startShepherdTask(ctx.taskId);
+ await wex.taskScheduler.resetTask(ctx.taskId);
return {
transactionId: ctx.transactionId,
diff --git a/packages/taler-wallet-core/src/pay-peer-push-credit.ts b/packages/taler-wallet-core/src/pay-peer-push-credit.ts
@@ -411,7 +411,7 @@ export class PeerPushCreditTransactionContext implements TransactionContext {
}
await h.update(rec);
});
- this.wex.taskScheduler.startShepherdTask(this.taskId);
+ await this.wex.taskScheduler.resetTask(this.taskId);
}
async failTransaction(
@@ -1284,8 +1284,7 @@ export async function confirmPeerPushCredit(
}
});
- wex.taskScheduler.stopShepherdTask(ctx.taskId);
- wex.taskScheduler.startShepherdTask(ctx.taskId);
+ await wex.taskScheduler.resetTask(ctx.taskId);
return {
transactionId: ctx.transactionId,
diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/pay-peer-push-debit.ts
@@ -289,8 +289,7 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
}
await h.update(rec);
});
- this.wex.taskScheduler.stopShepherdTask(this.taskId);
- this.wex.taskScheduler.startShepherdTask(this.taskId);
+ await this.wex.taskScheduler.resetTask(this.taskId);
}
async userResumeTransaction(): Promise<void> {
@@ -323,7 +322,7 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
}
await h.update(rec);
});
- this.wex.taskScheduler.startShepherdTask(this.taskId);
+ await this.wex.taskScheduler.resetTask(this.taskId);
}
async failTransaction(
@@ -369,8 +368,7 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
}
await h.update(rec);
});
- this.wex.taskScheduler.stopShepherdTask(this.taskId);
- this.wex.taskScheduler.startShepherdTask(this.taskId);
+ await this.wex.taskScheduler.resetTask(this.taskId);
}
}
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts
@@ -37,6 +37,7 @@ import {
assertUnreachable,
getErrorDetailFromException,
j2s,
+ openPromise,
safeStringifyException,
} from "@gnu-taler/taler-util";
import {
@@ -114,13 +115,28 @@ import {
const logger = new Logger("shepherd.ts");
+enum TaskState {
+ StopRequested = 0,
+ Running = 1,
+ WaitingBackoff = 2,
+ WaitingScheduled = 3,
+ Done = 4,
+}
+
/**
* Info about one task being shepherded.
*/
interface ShepherdInfo {
cts: CancellationToken.Source;
+ /**
+ * Promise that resolves when the current
+ * shepherd loop for the task has terminated.
+ */
latch?: Promise<void>;
- stopped: boolean;
+
+ interruptWait: () => void;
+
+ taskState: TaskState;
}
/**
@@ -163,7 +179,10 @@ export interface TaskScheduler {
ensureRunning(): Promise<void>;
startShepherdTask(taskId: TaskIdStr): void;
stopShepherdTask(taskId: TaskIdStr): void;
- resetTaskRetries(taskId: TaskIdStr): Promise<void>;
+ /**
+ * Restart a task and reset retry counter.
+ */
+ resetTask(taskId: TaskIdStr): Promise<void>;
reload(): Promise<void>;
getActiveTasks(): TaskIdStr[];
isIdle(): boolean;
@@ -268,7 +287,9 @@ export class TaskSchedulerImpl implements TaskScheduler {
logger.error(`error running scheduler: ${safeStringifyException(e)}`);
});
// Run in the background, no await!
- this.internalStartShepherdTask(taskId);
+ this.internalStartShepherdTask(taskId).catch((e) => {
+ logger.error(`error running shepherd loop: ${safeStringifyException(e)}`);
+ });
}
/**
@@ -292,9 +313,22 @@ export class TaskSchedulerImpl implements TaskScheduler {
logger.trace(`Starting to shepherd task ${taskId}`);
const oldShep = this.sheps.get(taskId);
if (oldShep) {
- if (!oldShep.stopped) {
- logger.trace(`Already have a shepherd for ${taskId}`);
- return;
+ switch (oldShep.taskState) {
+ case TaskState.WaitingBackoff:
+ case TaskState.Running:
+ logger.trace(`Already have a shepherd for ${taskId}`);
+ return;
+ case TaskState.WaitingScheduled:
+ logger.trace(
+ `Already have a shepherd (waiting) for ${taskId}, kicking shepherd`,
+ );
+ oldShep.interruptWait();
+ return;
+ case TaskState.StopRequested:
+ case TaskState.Done:
+ break;
+ default:
+ assertUnreachable(oldShep.taskState);
}
logger.trace(
`Waiting for old task to complete the loop in cancel mode ${taskId}`,
@@ -305,7 +339,8 @@ export class TaskSchedulerImpl implements TaskScheduler {
logger.trace(`Creating new shepherd for ${taskId}`);
const newShep: ShepherdInfo = {
cts: CancellationToken.create(),
- stopped: false,
+ taskState: TaskState.Running,
+ interruptWait: () => undefined,
};
this.sheps.set(taskId, newShep);
try {
@@ -323,8 +358,19 @@ export class TaskSchedulerImpl implements TaskScheduler {
const oldShep = this.sheps.get(taskId);
if (oldShep) {
logger.trace(`Cancelling old shepherd for ${taskId}`);
- oldShep.cts.cancel(`stopping task ${taskId}`);
- oldShep.stopped = true;
+ switch (oldShep.taskState) {
+ case TaskState.Running:
+ case TaskState.WaitingBackoff:
+ case TaskState.WaitingScheduled:
+ oldShep.cts.cancel(`stopping task ${taskId}`);
+ oldShep.taskState = TaskState.StopRequested;
+ break;
+ case TaskState.Done:
+ case TaskState.StopRequested:
+ break;
+ default:
+ assertUnreachable(oldShep.taskState);
+ }
this.iterCond.trigger();
}
}
@@ -334,7 +380,7 @@ export class TaskSchedulerImpl implements TaskScheduler {
this.startShepherdTask(taskId);
}
- async resetTaskRetries(taskId: TaskIdStr): Promise<void> {
+ async resetTask(taskId: TaskIdStr): Promise<void> {
await this.ws.runStandaloneLegacyWalletDbTx(async (tx) => {
logger.trace(`storing task [reset] for ${taskId}`);
await tx.operationRetries.delete(taskId);
@@ -358,7 +404,11 @@ export class TaskSchedulerImpl implements TaskScheduler {
delay: Duration,
): Promise<void> {
try {
- await info.cts.token.racePromise(this.ws.timerGroup.resolveAfter(delay));
+ const op = openPromise<void>();
+ info.interruptWait = () => op.resolve();
+ await info.cts.token.racePromise(
+ Promise.race([this.ws.timerGroup.resolveAfter(delay), op.promise]),
+ );
} catch (e) {
if (e instanceof CancellationToken.CancellationError) {
logger.trace(
@@ -367,6 +417,8 @@ export class TaskSchedulerImpl implements TaskScheduler {
} else {
logger.trace(`waiting for ${taskId} interrupted: ${e}`);
}
+ } finally {
+ info.interruptWait = () => undefined;
}
}
@@ -439,6 +491,7 @@ export class TaskSchedulerImpl implements TaskScheduler {
logger.trace(
`Stored error for ${taskId}, waiting for ${delay.d_ms} ms`,
);
+ info.taskState = TaskState.WaitingBackoff;
await this.wait(taskId, info, delay);
break;
}
@@ -447,6 +500,7 @@ export class TaskSchedulerImpl implements TaskScheduler {
const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
const delay = AbsoluteTime.remaining(t);
logger.trace(`Waiting for ${delay.d_ms} ms`);
+ info.taskState = TaskState.WaitingBackoff;
await this.wait(taskId, info, delay);
break;
}
@@ -463,6 +517,7 @@ export class TaskSchedulerImpl implements TaskScheduler {
const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
const delay = AbsoluteTime.remaining(t);
logger.trace(`Waiting for ${delay.d_ms} ms`);
+ info.taskState = TaskState.WaitingScheduled;
await this.wait(taskId, info, delay);
break;
}
@@ -478,6 +533,7 @@ export class TaskSchedulerImpl implements TaskScheduler {
logger.info(
`long-poller for ${taskId} returned unexpectedly early (${taskDuration.d_ms} ms), waiting 10 seconds`,
);
+ info.taskState = TaskState.WaitingScheduled;
await this.wait(taskId, info, Duration.fromSpec({ seconds: 10 }));
} else {
logger.info(`task ${taskId} will long-poll again`);
@@ -488,6 +544,7 @@ export class TaskSchedulerImpl implements TaskScheduler {
await storePendingTaskPending(this.ws, taskId);
const delay = Duration.getForever();
logger.trace(`Not retrying task until network is restored.`);
+ info.taskState = TaskState.WaitingScheduled;
await this.wait(taskId, info, delay);
break;
}
@@ -568,23 +625,21 @@ async function storePendingTaskPending(
);
}
await tx.operationRetries.put(retryRecord);
- let notification: WalletNotification | undefined = undefined;
if (hadError) {
- notification = await taskToRetryNotification(
+ const notif = await taskToRetryNotification(
ws,
tx,
pendingTaskId,
undefined,
);
+ if (notif) {
+ tx.notify(notif);
+ }
}
return {
- notification,
retryRecord,
};
});
- if (res.notification) {
- ws.notify(res.notification);
- }
return res.retryRecord;
}
diff --git a/packages/taler-wallet-core/src/transactions.ts b/packages/taler-wallet-core/src/transactions.ts
@@ -810,7 +810,7 @@ export async function retryTransaction(
logger.info(`resetting retry timeout for ${transactionId}`);
const taskId = maybeTaskFromTransaction(transactionId);
if (taskId) {
- await wex.taskScheduler.resetTaskRetries(taskId);
+ await wex.taskScheduler.resetTask(taskId);
}
}
@@ -821,7 +821,7 @@ export async function retryAll(wex: WalletExecutionContext): Promise<void> {
await wex.taskScheduler.ensureRunning();
const tasks = wex.taskScheduler.getActiveTasks();
for (const task of tasks) {
- await wex.taskScheduler.resetTaskRetries(task);
+ await wex.taskScheduler.resetTask(task);
}
}
diff --git a/packages/taler-wallet-core/src/withdraw.ts b/packages/taler-wallet-core/src/withdraw.ts
@@ -4055,7 +4055,7 @@ export async function confirmWithdrawal(
}
});
- await wex.taskScheduler.resetTaskRetries(ctx.taskId);
+ await wex.taskScheduler.resetTask(ctx.taskId);
return {
transactionId: req.transactionId as TransactionIdStr,