summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/operations/state.ts
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2020-08-18 18:23:06 +0530
committerFlorian Dold <florian.dold@gmail.com>2020-08-18 18:23:06 +0530
commite2f7bc79cd4326f769f370f230041101a099d98c (patch)
treecd14da6ca277d20cfbd48cb15850c32e9e1ffb83 /packages/taler-wallet-core/src/operations/state.ts
parent53cd347b1c75496e4f6feee7792cc81c2aeda961 (diff)
downloadwallet-core-e2f7bc79cd4326f769f370f230041101a099d98c.tar.gz
wallet-core-e2f7bc79cd4326f769f370f230041101a099d98c.tar.bz2
wallet-core-e2f7bc79cd4326f769f370f230041101a099d98c.zip
introduce locking to avoid certain simultaneous requests to the exchange
Diffstat (limited to 'packages/taler-wallet-core/src/operations/state.ts')
-rw-r--r--packages/taler-wallet-core/src/operations/state.ts53
1 files changed, 52 insertions, 1 deletions
diff --git a/packages/taler-wallet-core/src/operations/state.ts b/packages/taler-wallet-core/src/operations/state.ts
index cfec85d0f..582dd92d3 100644
--- a/packages/taler-wallet-core/src/operations/state.ts
+++ b/packages/taler-wallet-core/src/operations/state.ts
@@ -22,11 +22,15 @@ import { Logger } from "../util/logging";
import { PendingOperationsResponse } from "../types/pending";
import { WalletNotification } from "../types/notifications";
import { Database } from "../util/query";
+import { openPromise, OpenedPromise } from "../util/promiseUtils";
type NotificationListener = (n: WalletNotification) => void;
const logger = new Logger("state.ts");
+export const EXCHANGE_COINS_LOCK = "exchange-coins-lock";
+export const EXCHANGE_RESERVES_LOCK = "exchange-reserves-lock";
+
export class InternalWalletState {
cachedNextUrl: { [fulfillmentUrl: string]: NextUrlResult } = {};
memoProcessReserve: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
@@ -41,6 +45,16 @@ export class InternalWalletState {
listeners: NotificationListener[] = [];
+ /**
+ * Promises that are waiting for a particular resource.
+ */
+ private resourceWaiters: Record<string, OpenedPromise<void>[]> = {};
+
+ /**
+ * Resources that are currently locked.
+ */
+ private resourceLocks: Set<string> = new Set();
+
constructor(
public db: Database,
public http: HttpRequestLibrary,
@@ -49,7 +63,7 @@ export class InternalWalletState {
this.cryptoApi = new CryptoApi(cryptoWorkerFactory);
}
- public notify(n: WalletNotification): void {
+ notify(n: WalletNotification): void {
logger.trace("Notification", n);
for (const l of this.listeners) {
const nc = JSON.parse(JSON.stringify(n));
@@ -62,4 +76,41 @@ export class InternalWalletState {
addNotificationListener(f: (n: WalletNotification) => void): void {
this.listeners.push(f);
}
+
+ /**
+ * Run an async function after acquiring a list of locks, identified
+ * by string tokens.
+ */
+ async runSequentialized<T>(tokens: string[], f: () => Promise<T>) {
+ // Make sure locks are always acquired in the same order
+ tokens = [... tokens].sort();
+
+ for (const token of tokens) {
+ if (this.resourceLocks.has(token)) {
+ const p = openPromise<void>();
+ let waitList = this.resourceWaiters[token];
+ if (!waitList) {
+ waitList = this.resourceWaiters[token] = [];
+ }
+ waitList.push(p);
+ await p.promise;
+ }
+ this.resourceLocks.add(token);
+ }
+
+ try {
+ logger.trace(`begin exclusive execution on ${JSON.stringify(tokens)}`);
+ const result = await f();
+ logger.trace(`end exclusive execution on ${JSON.stringify(tokens)}`);
+ return result;
+ } finally {
+ for (const token of tokens) {
+ this.resourceLocks.delete(token);
+ let waiter = (this.resourceWaiters[token] ?? []).shift();
+ if (waiter) {
+ waiter.resolve();
+ }
+ }
+ }
+ }
}