summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2024-02-19 21:13:00 +0100
committerFlorian Dold <florian@dold.me>2024-02-19 21:13:00 +0100
commit862ac9a16aa891c26355f9ad5858283c3aa029d6 (patch)
treef8b4244463a8bba73e277a6113fe306b3a3658a7
parent20397e3fba3fe4b274354047f76e3a8f3a92d6b8 (diff)
downloadwallet-core-862ac9a16aa891c26355f9ad5858283c3aa029d6.tar.gz
wallet-core-862ac9a16aa891c26355f9ad5858283c3aa029d6.tar.bz2
wallet-core-862ac9a16aa891c26355f9ad5858283c3aa029d6.zip
wallet-core: safer long-polling
We now wait for some time if long-polling returns too early
-rw-r--r--packages/taler-wallet-core/src/common.ts15
-rw-r--r--packages/taler-wallet-core/src/shepherd.ts57
-rw-r--r--packages/taler-wallet-core/src/withdraw.ts28
3 files changed, 71 insertions, 29 deletions
diff --git a/packages/taler-wallet-core/src/common.ts b/packages/taler-wallet-core/src/common.ts
index 9d7f2e763..45351f680 100644
--- a/packages/taler-wallet-core/src/common.ts
+++ b/packages/taler-wallet-core/src/common.ts
@@ -350,6 +350,7 @@ export enum TaskRunResultType {
Backoff = "backoff",
Progress = "progress",
Error = "error",
+ LongpollReturnedPending = "longpoll-returned-pending",
ScheduleLater = "schedule-later",
}
@@ -358,6 +359,7 @@ export type TaskRunResult =
| TaskRunErrorResult
| TaskRunBackoffResult
| TaskRunProgressResult
+ | TaskRunLongpollReturnedPendingResult
| TaskRunScheduleLaterResult;
export namespace TaskRunResult {
@@ -396,6 +398,15 @@ export namespace TaskRunResult {
runAt,
};
}
+ /**
+ * Longpolling returned, but what we're waiting for
+ * is still pending on the other side.
+ */
+ export function longpollReturnedPending(): TaskRunLongpollReturnedPendingResult {
+ return {
+ type: TaskRunResultType.LongpollReturnedPending,
+ };
+ }
}
export interface TaskRunFinishedResult {
@@ -415,6 +426,10 @@ export interface TaskRunScheduleLaterResult {
runAt: AbsoluteTime;
}
+export interface TaskRunLongpollReturnedPendingResult {
+ type: TaskRunResultType.LongpollReturnedPending;
+}
+
export interface TaskRunErrorResult {
type: TaskRunResultType.Error;
errorDetail: TalerErrorDetail;
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts
index d6fc604e8..0639b7976 100644
--- a/packages/taler-wallet-core/src/shepherd.ts
+++ b/packages/taler-wallet-core/src/shepherd.ts
@@ -227,6 +227,18 @@ export class TaskScheduler {
this.startShepherdTask(taskId);
}
+ private async wait(
+ taskId: TaskId,
+ info: ShepherdInfo,
+ delay: Duration,
+ ): Promise<void> {
+ try {
+ await info.cts.token.racePromise(this.ws.timerGroup.resolveAfter(delay));
+ } catch (e) {
+ logger.info(`waiting for ${taskId} interrupted`);
+ }
+ }
+
private async internalShepherdTask(
taskId: TaskId,
info: ShepherdInfo,
@@ -250,6 +262,7 @@ export class TaskScheduler {
Duration.fromSpec({ seconds: 60 }),
);
}
+ const startTime = AbsoluteTime.now();
logger.trace(`Shepherd for ${taskId} will call handler`);
// FIXME: This should already return the retry record.
const res = await runTaskWithErrorReporting(this.ws, taskId, async () => {
@@ -273,13 +286,7 @@ export class TaskScheduler {
const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
delay = AbsoluteTime.remaining(t);
logger.trace(`Waiting for ${delay.d_ms} ms`);
- try {
- await info.cts.token.racePromise(
- this.ws.timerGroup.resolveAfter(delay),
- );
- } catch (e) {
- logger.info(`waiting for ${taskId} interrupted`);
- }
+ await this.wait(taskId, info, delay);
} else {
logger.trace("Retrying immediately.");
}
@@ -292,13 +299,7 @@ export class TaskScheduler {
const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
delay = AbsoluteTime.remaining(t);
logger.trace(`Waiting for ${delay.d_ms} ms`);
- try {
- await info.cts.token.racePromise(
- this.ws.timerGroup.resolveAfter(delay),
- );
- } catch (e) {
- logger.info(`waiting for ${taskId} interrupted`);
- }
+ await this.wait(taskId, info, delay);
} else {
logger.trace("Retrying immediately.");
}
@@ -314,17 +315,27 @@ export class TaskScheduler {
logger.trace(`Shepherd for ${taskId} got schedule-later result.`);
const delay = AbsoluteTime.remaining(res.runAt);
logger.trace(`Waiting for ${delay.d_ms} ms`);
- try {
- await info.cts.token.racePromise(
- this.ws.timerGroup.resolveAfter(delay),
- );
- } catch (e) {
- logger.info(`waiting for ${taskId} interrupted`);
- }
+ await this.wait(taskId, info, delay);
break;
case TaskRunResultType.Finished:
logger.trace(`Shepherd for ${taskId} got finished result.`);
return;
+ case TaskRunResultType.LongpollReturnedPending: {
+ // Make sure that we are waiting a bit if long-polling returned too early.
+ const endTime = AbsoluteTime.now();
+ const taskDuration = AbsoluteTime.difference(endTime, startTime);
+ if (
+ Duration.cmp(taskDuration, Duration.fromSpec({ seconds: 20 })) < 0
+ ) {
+ logger.info(
+ `long-poller for ${taskId} returned unexpectedly early (${taskDuration.d_ms} ms), waiting 10 seconds`,
+ );
+ await this.wait(taskId, info, Duration.fromSpec({ seconds: 10 }));
+ } else {
+ logger.info(`task ${taskId} will long-poll again`);
+ }
+ break;
+ }
default:
assertUnreachable(res);
}
@@ -435,6 +446,10 @@ async function runTaskWithErrorReporting(
case TaskRunResultType.Progress:
await storeTaskProgress(ws, opId);
return resp;
+ case TaskRunResultType.LongpollReturnedPending:
+ // Longpoll should be run again immediately.
+ await storeTaskProgress(ws, opId);
+ return resp;
}
} catch (e) {
if (e instanceof CryptoApiStoppedError) {
diff --git a/packages/taler-wallet-core/src/withdraw.ts b/packages/taler-wallet-core/src/withdraw.ts
index 9cf1ad36d..bfcf23588 100644
--- a/packages/taler-wallet-core/src/withdraw.ts
+++ b/packages/taler-wallet-core/src/withdraw.ts
@@ -911,6 +911,7 @@ async function processPlanchetExchangeBatchRequest(
ws: InternalWalletState,
wgContext: WithdrawalGroupContext,
args: WithdrawalRequestBatchArgs,
+ cancellationToken: CancellationToken,
): Promise<WithdrawalBatchResult> {
const withdrawalGroup: WithdrawalGroupRecord = wgContext.wgRecord;
logger.info(
@@ -997,6 +998,8 @@ async function processPlanchetExchangeBatchRequest(
const resp = await ws.http.fetch(reqUrl, {
method: "POST",
body: batchReq,
+ cancellationToken,
+ timeout: Duration.fromSpec({ seconds: 40 }),
});
if (resp.status === HttpStatusCode.UnavailableForLegalReasons) {
await handleKycRequired(ws, withdrawalGroup, resp, 0, requestCoinIdxs);
@@ -1300,7 +1303,7 @@ async function processQueryReserve(
`got reserve status error, EC=${result.talerErrorResponse.code}`,
);
if (resp.status === HttpStatusCode.NotFound) {
- return TaskRunResult.backoff();
+ return TaskRunResult.longpollReturnedPending();
} else {
throwUnexpectedRequestError(resp, result.talerErrorResponse);
}
@@ -1491,6 +1494,7 @@ async function processWithdrawalGroupPendingKyc(
async function processWithdrawalGroupPendingReady(
ws: InternalWalletState,
withdrawalGroup: WithdrawalGroupRecord,
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const { withdrawalGroupId } = withdrawalGroup;
const transactionId = constructTransactionIdentifier({
@@ -1553,10 +1557,15 @@ async function processWithdrawalGroupPendingReady(
const maxBatchSize = 100;
for (let i = 0; i < numTotalCoins; i += maxBatchSize) {
- const resp = await processPlanchetExchangeBatchRequest(ws, wgContext, {
- batchSize: maxBatchSize,
- coinStartIndex: i,
- });
+ const resp = await processPlanchetExchangeBatchRequest(
+ ws,
+ wgContext,
+ {
+ batchSize: maxBatchSize,
+ coinStartIndex: i,
+ },
+ cancellationToken,
+ );
let work: Promise<void>[] = [];
work = [];
for (let j = 0; j < resp.coinIdxs.length; j++) {
@@ -1688,7 +1697,11 @@ export async function processWithdrawalGroup(
);
case WithdrawalGroupStatus.PendingReady:
// Continue with the actual withdrawal!
- return await processWithdrawalGroupPendingReady(ws, withdrawalGroup);
+ return await processWithdrawalGroupPendingReady(
+ ws,
+ withdrawalGroup,
+ cancellationToken,
+ );
case WithdrawalGroupStatus.AbortingBank:
return await processWithdrawalGroupAbortingBank(ws, withdrawalGroup);
case WithdrawalGroupStatus.AbortedBank:
@@ -2265,8 +2278,7 @@ async function processReserveBankStatus(
}
if (!status.transfer_done) {
- // FIXME: This is a long-poll result
- return TaskRunResult.backoff();
+ return TaskRunResult.longpollReturnedPending();
}
const transitionInfo = await ws.db.runReadWriteTx(