summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/operations/refresh.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/operations/refresh.ts')
-rw-r--r--packages/taler-wallet-core/src/operations/refresh.ts155
1 files changed, 22 insertions, 133 deletions
diff --git a/packages/taler-wallet-core/src/operations/refresh.ts b/packages/taler-wallet-core/src/operations/refresh.ts
index 5f7169dbd..b9ac12518 100644
--- a/packages/taler-wallet-core/src/operations/refresh.ts
+++ b/packages/taler-wallet-core/src/operations/refresh.ts
@@ -15,12 +15,12 @@
*/
import {
- AbsoluteTime,
AgeCommitment,
AgeRestriction,
AmountJson,
Amounts,
amountToPretty,
+ CancellationToken,
codecForExchangeMeltResponse,
codecForExchangeRevealResponse,
CoinPublicKeyString,
@@ -29,8 +29,6 @@ import {
DenominationInfo,
DenomKeyType,
Duration,
- durationFromSpec,
- durationMul,
encodeCrock,
ExchangeMeltRequest,
ExchangeProtocolVersion,
@@ -51,7 +49,6 @@ import {
TalerErrorCode,
TalerErrorDetail,
TalerPreciseTimestamp,
- TalerProtocolTimestamp,
TransactionAction,
TransactionMajorState,
TransactionState,
@@ -79,10 +76,11 @@ import {
} from "../db.js";
import {
getCandidateWithdrawalDenomsTx,
+ PendingTaskType,
RefreshGroupPerExchangeInfo,
RefreshSessionRecord,
+ TaskId,
timestampPreciseToDb,
- timestampProtocolFromDb,
WalletDbReadWriteTransactionArr,
} from "../index.js";
import {
@@ -94,6 +92,7 @@ import { selectWithdrawalDenominations } from "../util/coinSelection.js";
import { checkDbInvariant } from "../util/invariants.js";
import { GetReadOnlyAccess, GetReadWriteAccess } from "../util/query.js";
import {
+ constructTaskIdentifier,
makeCoinAvailable,
makeCoinsVisible,
TaskRunResult,
@@ -111,6 +110,7 @@ const logger = new Logger("refresh.ts");
export class RefreshTransactionContext implements TransactionContext {
public transactionId: string;
+ readonly taskId: TaskId;
constructor(
public ws: InternalWalletState,
@@ -120,6 +120,10 @@ export class RefreshTransactionContext implements TransactionContext {
tag: TransactionType.Refresh,
refreshGroupId,
});
+ this.taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Refresh,
+ refreshGroupId,
+ });
}
async deleteTransaction(): Promise<void> {
@@ -211,8 +215,8 @@ export class RefreshTransactionContext implements TransactionContext {
}
return undefined;
});
- ws.workAvailable.trigger();
notifyTransition(ws, transactionId, transitionInfo);
+ ws.taskScheduler.startShepherdTask(this.taskId);
}
async failTransaction(): Promise<void> {
@@ -250,8 +254,9 @@ export class RefreshTransactionContext implements TransactionContext {
newTxState: computeRefreshTransactionState(dg),
};
});
- ws.workAvailable.trigger();
+ ws.taskScheduler.stopShepherdTask(this.taskId);
notifyTransition(ws, transactionId, transitionInfo);
+ ws.taskScheduler.startShepherdTask(this.taskId);
}
}
@@ -1003,7 +1008,7 @@ async function refreshReveal(
export async function processRefreshGroup(
ws: InternalWalletState,
refreshGroupId: string,
- options: Record<string, never> = {},
+ cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
logger.trace(`processing refresh group ${refreshGroupId}`);
@@ -1053,7 +1058,7 @@ export async function processRefreshGroup(
logger.warn(`exception: ${e}`);
}
if (inShutdown) {
- return TaskRunResult.pending();
+ return TaskRunResult.backoff();
}
if (errors.length > 0) {
return {
@@ -1068,7 +1073,7 @@ export async function processRefreshGroup(
};
}
- return TaskRunResult.pending();
+ return TaskRunResult.backoff();
}
async function processRefreshSession(
@@ -1311,134 +1316,18 @@ export async function createRefreshGroup(
logger.trace(`created refresh group ${refreshGroupId}`);
+ const ctx = new RefreshTransactionContext(ws, refreshGroupId);
+
+ // Shepherd the task.
+ // If the current transaction fails to commit the refresh
+ // group to the DB, the shepherd will give up.
+ ws.taskScheduler.startShepherdTask(ctx.taskId);
+
return {
refreshGroupId,
};
}
-/**
- * Timestamp after which the wallet would do the next check for an auto-refresh.
- */
-function getAutoRefreshCheckThreshold(d: DenominationRecord): AbsoluteTime {
- const expireWithdraw = AbsoluteTime.fromProtocolTimestamp(
- timestampProtocolFromDb(d.stampExpireWithdraw),
- );
- const expireDeposit = AbsoluteTime.fromProtocolTimestamp(
- timestampProtocolFromDb(d.stampExpireDeposit),
- );
- const delta = AbsoluteTime.difference(expireWithdraw, expireDeposit);
- const deltaDiv = durationMul(delta, 0.75);
- return AbsoluteTime.addDuration(expireWithdraw, deltaDiv);
-}
-
-/**
- * Timestamp after which the wallet would do an auto-refresh.
- */
-export function getAutoRefreshExecuteThreshold(d: {
- stampExpireWithdraw: TalerProtocolTimestamp;
- stampExpireDeposit: TalerProtocolTimestamp;
-}): AbsoluteTime {
- const expireWithdraw = AbsoluteTime.fromProtocolTimestamp(
- d.stampExpireWithdraw,
- );
- const expireDeposit = AbsoluteTime.fromProtocolTimestamp(
- d.stampExpireDeposit,
- );
- const delta = AbsoluteTime.difference(expireWithdraw, expireDeposit);
- const deltaDiv = durationMul(delta, 0.5);
- return AbsoluteTime.addDuration(expireWithdraw, deltaDiv);
-}
-
-function getAutoRefreshExecuteThresholdForDenom(
- d: DenominationRecord,
-): AbsoluteTime {
- return getAutoRefreshExecuteThreshold({
- stampExpireWithdraw: timestampProtocolFromDb(d.stampExpireWithdraw),
- stampExpireDeposit: timestampProtocolFromDb(d.stampExpireDeposit),
- });
-}
-
-export async function autoRefresh(
- ws: InternalWalletState,
- exchangeBaseUrl: string,
-): Promise<TaskRunResult> {
- logger.trace(`doing auto-refresh check for '${exchangeBaseUrl}'`);
-
- // We must make sure that the exchange is up-to-date so that
- // can refresh into new denominations.
- await fetchFreshExchange(ws, exchangeBaseUrl);
-
- let minCheckThreshold = AbsoluteTime.addDuration(
- AbsoluteTime.now(),
- durationFromSpec({ days: 1 }),
- );
- await ws.db
- .mktx((x) => [
- x.coins,
- x.denominations,
- x.coinAvailability,
- x.refreshGroups,
- x.exchanges,
- ])
- .runReadWrite(async (tx) => {
- const exchange = await tx.exchanges.get(exchangeBaseUrl);
- if (!exchange || !exchange.detailsPointer) {
- return;
- }
- const coins = await tx.coins.indexes.byBaseUrl
- .iter(exchangeBaseUrl)
- .toArray();
- const refreshCoins: CoinRefreshRequest[] = [];
- for (const coin of coins) {
- if (coin.status !== CoinStatus.Fresh) {
- continue;
- }
- const denom = await tx.denominations.get([
- exchangeBaseUrl,
- coin.denomPubHash,
- ]);
- if (!denom) {
- logger.warn("denomination not in database");
- continue;
- }
- const executeThreshold = getAutoRefreshExecuteThresholdForDenom(denom);
- if (AbsoluteTime.isExpired(executeThreshold)) {
- refreshCoins.push({
- coinPub: coin.coinPub,
- amount: denom.value,
- });
- } else {
- const checkThreshold = getAutoRefreshCheckThreshold(denom);
- minCheckThreshold = AbsoluteTime.min(
- minCheckThreshold,
- checkThreshold,
- );
- }
- }
- if (refreshCoins.length > 0) {
- const res = await createRefreshGroup(
- ws,
- tx,
- exchange.detailsPointer?.currency,
- refreshCoins,
- RefreshReason.Scheduled,
- undefined,
- );
- logger.trace(
- `created refresh group for auto-refresh (${res.refreshGroupId})`,
- );
- }
- logger.trace(
- `next refresh check at ${AbsoluteTime.toIsoString(minCheckThreshold)}`,
- );
- exchange.nextRefreshCheckStamp = timestampPreciseToDb(
- AbsoluteTime.toPreciseTimestamp(minCheckThreshold),
- );
- await tx.exchanges.put(exchange);
- });
- return TaskRunResult.finished();
-}
-
export function computeRefreshTransactionState(
rg: RefreshGroupRecord,
): TransactionState {