diff options
Diffstat (limited to 'packages/taler-wallet-core/src/operations/state.ts')
-rw-r--r-- | packages/taler-wallet-core/src/operations/state.ts | 53 |
1 files changed, 52 insertions, 1 deletions
diff --git a/packages/taler-wallet-core/src/operations/state.ts b/packages/taler-wallet-core/src/operations/state.ts index cfec85d0f..582dd92d3 100644 --- a/packages/taler-wallet-core/src/operations/state.ts +++ b/packages/taler-wallet-core/src/operations/state.ts @@ -22,11 +22,15 @@ import { Logger } from "../util/logging"; import { PendingOperationsResponse } from "../types/pending"; import { WalletNotification } from "../types/notifications"; import { Database } from "../util/query"; +import { openPromise, OpenedPromise } from "../util/promiseUtils"; type NotificationListener = (n: WalletNotification) => void; const logger = new Logger("state.ts"); +export const EXCHANGE_COINS_LOCK = "exchange-coins-lock"; +export const EXCHANGE_RESERVES_LOCK = "exchange-reserves-lock"; + export class InternalWalletState { cachedNextUrl: { [fulfillmentUrl: string]: NextUrlResult } = {}; memoProcessReserve: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); @@ -41,6 +45,16 @@ export class InternalWalletState { listeners: NotificationListener[] = []; + /** + * Promises that are waiting for a particular resource. + */ + private resourceWaiters: Record<string, OpenedPromise<void>[]> = {}; + + /** + * Resources that are currently locked. + */ + private resourceLocks: Set<string> = new Set(); + constructor( public db: Database, public http: HttpRequestLibrary, @@ -49,7 +63,7 @@ export class InternalWalletState { this.cryptoApi = new CryptoApi(cryptoWorkerFactory); } - public notify(n: WalletNotification): void { + notify(n: WalletNotification): void { logger.trace("Notification", n); for (const l of this.listeners) { const nc = JSON.parse(JSON.stringify(n)); @@ -62,4 +76,41 @@ export class InternalWalletState { addNotificationListener(f: (n: WalletNotification) => void): void { this.listeners.push(f); } + + /** + * Run an async function after acquiring a list of locks, identified + * by string tokens. + */ + async runSequentialized<T>(tokens: string[], f: () => Promise<T>) { + // Make sure locks are always acquired in the same order + tokens = [... tokens].sort(); + + for (const token of tokens) { + if (this.resourceLocks.has(token)) { + const p = openPromise<void>(); + let waitList = this.resourceWaiters[token]; + if (!waitList) { + waitList = this.resourceWaiters[token] = []; + } + waitList.push(p); + await p.promise; + } + this.resourceLocks.add(token); + } + + try { + logger.trace(`begin exclusive execution on ${JSON.stringify(tokens)}`); + const result = await f(); + logger.trace(`end exclusive execution on ${JSON.stringify(tokens)}`); + return result; + } finally { + for (const token of tokens) { + this.resourceLocks.delete(token); + let waiter = (this.resourceWaiters[token] ?? []).shift(); + if (waiter) { + waiter.resolve(); + } + } + } + } } |