summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/wallet.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/wallet.ts')
-rw-r--r--packages/taler-wallet-core/src/wallet.ts202
1 files changed, 197 insertions, 5 deletions
diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts
index a9c4c97e8..3a3b4f6fd 100644
--- a/packages/taler-wallet-core/src/wallet.ts
+++ b/packages/taler-wallet-core/src/wallet.ts
@@ -23,6 +23,7 @@
* Imports.
*/
import {
+ BalancesResponse,
codecForAny,
codecForDeleteTransactionRequest,
codecForRetryTransactionRequest,
@@ -32,9 +33,11 @@ import {
getDurationRemaining,
isTimestampExpired,
j2s,
+ PreparePayResultType,
TalerErrorCode,
Timestamp,
timestampMin,
+ WalletNotification,
} from "@gnu-taler/taler-util";
import {
addBackupProvider,
@@ -59,6 +62,7 @@ import {
import {
acceptExchangeTermsOfService,
getExchangeDetails,
+ getExchangeTrust,
updateExchangeFromUrl,
} from "./operations/exchanges.js";
import {
@@ -85,7 +89,11 @@ import {
getFundingPaytoUris,
processReserve,
} from "./operations/reserves.js";
-import { InternalWalletState } from "./common.js";
+import {
+ ExchangeOperations,
+ InternalWalletState,
+ NotificationListener,
+} from "./common.js";
import {
runIntegrationTest,
testPay,
@@ -106,16 +114,16 @@ import {
AuditorTrustRecord,
CoinSourceType,
ReserveRecordStatus,
+ WalletStoresV1,
} from "./db.js";
import { NotificationType } from "@gnu-taler/taler-util";
import {
PendingOperationInfo,
+ PendingOperationsResponse,
PendingOperationType,
} from "./pending-types.js";
import { CoinDumpJson } from "@gnu-taler/taler-util";
-import {
- codecForTransactionsRequest,
-} from "@gnu-taler/taler-util";
+import { codecForTransactionsRequest } from "@gnu-taler/taler-util";
import {
AcceptManualWithdrawalResult,
AcceptWithdrawalResponse,
@@ -151,6 +159,16 @@ import { assertUnreachable } from "./util/assertUnreachable.js";
import { Logger } from "@gnu-taler/taler-util";
import { setWalletDeviceId } from "./operations/backup/state.js";
import { WalletCoreApiClient } from "./wallet-api-types.js";
+import { AsyncOpMemoMap, AsyncOpMemoSingle } from "./util/asyncMemo.js";
+import { CryptoApi, CryptoWorkerFactory } from "./crypto/workers/cryptoApi.js";
+import { TimerGroup } from "./util/timer.js";
+import {
+ AsyncCondition,
+ OpenedPromise,
+ openPromise,
+} from "./util/promiseUtils.js";
+import { DbAccess } from "./util/query.js";
+import { HttpRequestLibrary } from "./util/http.js";
const builtinAuditors: AuditorTrustRecord[] = [
{
@@ -618,7 +636,6 @@ async function dumpCoins(ws: InternalWalletState): Promise<CoinDumpJson> {
return coinsJson;
}
-
/**
* Get an API client from an internal wallet state object.
*/
@@ -936,3 +953,178 @@ export async function handleCoreApiRequest(
}
}
}
+
+/**
+ * Public handle to a running wallet.
+ */
+export class Wallet {
+ private ws: InternalWalletState;
+ private _client: WalletCoreApiClient;
+
+ private constructor(
+ db: DbAccess<typeof WalletStoresV1>,
+ http: HttpRequestLibrary,
+ cryptoWorkerFactory: CryptoWorkerFactory,
+ ) {
+ this.ws = new InternalWalletStateImpl(db, http, cryptoWorkerFactory);
+ }
+
+ get client() {
+ return this._client;
+ }
+
+ static async create(
+ db: DbAccess<typeof WalletStoresV1>,
+ http: HttpRequestLibrary,
+ cryptoWorkerFactory: CryptoWorkerFactory,
+ ): Promise<Wallet> {
+ const w = new Wallet(db, http, cryptoWorkerFactory);
+ w._client = await getClientFromWalletState(w.ws);
+ return w;
+ }
+
+ addNotificationListener(f: (n: WalletNotification) => void): void {
+ return this.ws.addNotificationListener(f);
+ }
+
+ stop(): void {
+ this.ws.stop();
+ }
+
+ runRetryLoop(): Promise<void> {
+ return runRetryLoop(this.ws);
+ }
+
+ runPending(forceNow: boolean = false) {
+ return runPending(this.ws, forceNow);
+ }
+
+ runUntilDone(
+ req: {
+ maxRetries?: number;
+ } = {},
+ ) {
+ return runUntilDone(this.ws, req);
+ }
+
+ handleCoreApiRequest(
+ operation: string,
+ id: string,
+ payload: unknown,
+ ): Promise<CoreApiResponse> {
+ return handleCoreApiRequest(this.ws, operation, id, payload);
+ }
+}
+
+/**
+ * Internal state of the wallet.
+ *
+ * This ties together all the operation implementations.
+ */
+class InternalWalletStateImpl implements InternalWalletState {
+ memoProcessReserve: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
+ memoMakePlanchet: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
+ memoGetPending: AsyncOpMemoSingle<PendingOperationsResponse> = new AsyncOpMemoSingle();
+ memoGetBalance: AsyncOpMemoSingle<BalancesResponse> = new AsyncOpMemoSingle();
+ memoProcessRefresh: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
+ memoProcessRecoup: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
+ memoProcessDeposit: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
+ cryptoApi: CryptoApi;
+
+ timerGroup: TimerGroup = new TimerGroup();
+ latch = new AsyncCondition();
+ stopped = false;
+ memoRunRetryLoop = new AsyncOpMemoSingle<void>();
+
+ listeners: NotificationListener[] = [];
+
+ initCalled: boolean = false;
+
+ exchangeOps: ExchangeOperations = {
+ getExchangeDetails,
+ getExchangeTrust,
+ updateExchangeFromUrl,
+ };
+
+ /**
+ * Promises that are waiting for a particular resource.
+ */
+ private resourceWaiters: Record<string, OpenedPromise<void>[]> = {};
+
+ /**
+ * Resources that are currently locked.
+ */
+ private resourceLocks: Set<string> = new Set();
+
+ constructor(
+ // FIXME: Make this a getter and make
+ // the actual value nullable.
+ // Check if we are in a DB migration / garbage collection
+ // and throw an error in that case.
+ public db: DbAccess<typeof WalletStoresV1>,
+ public http: HttpRequestLibrary,
+ cryptoWorkerFactory: CryptoWorkerFactory,
+ ) {
+ this.cryptoApi = new CryptoApi(cryptoWorkerFactory);
+ }
+
+ notify(n: WalletNotification): void {
+ logger.trace("Notification", n);
+ for (const l of this.listeners) {
+ const nc = JSON.parse(JSON.stringify(n));
+ setTimeout(() => {
+ l(nc);
+ }, 0);
+ }
+ }
+
+ addNotificationListener(f: (n: WalletNotification) => void): void {
+ this.listeners.push(f);
+ }
+
+ /**
+ * Stop ongoing processing.
+ */
+ stop(): void {
+ this.stopped = true;
+ this.timerGroup.stopCurrentAndFutureTimers();
+ this.cryptoApi.stop();
+ }
+
+ /**
+ * Run an async function after acquiring a list of locks, identified
+ * by string tokens.
+ */
+ async runSequentialized<T>(tokens: string[], f: () => Promise<T>) {
+ // Make sure locks are always acquired in the same order
+ tokens = [...tokens].sort();
+
+ for (const token of tokens) {
+ if (this.resourceLocks.has(token)) {
+ const p = openPromise<void>();
+ let waitList = this.resourceWaiters[token];
+ if (!waitList) {
+ waitList = this.resourceWaiters[token] = [];
+ }
+ waitList.push(p);
+ await p.promise;
+ }
+ this.resourceLocks.add(token);
+ }
+
+ try {
+ logger.trace(`begin exclusive execution on ${JSON.stringify(tokens)}`);
+ const result = await f();
+ logger.trace(`end exclusive execution on ${JSON.stringify(tokens)}`);
+ return result;
+ } finally {
+ for (const token of tokens) {
+ this.resourceLocks.delete(token);
+ let waiter = (this.resourceWaiters[token] ?? []).shift();
+ if (waiter) {
+ waiter.resolve();
+ }
+ }
+ }
+ }
+}