summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/crypto/workers
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/crypto/workers')
-rw-r--r--packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.test.ts128
-rw-r--r--packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.ts (renamed from packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts)68
-rw-r--r--packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts8
-rw-r--r--packages/taler-wallet-core/src/crypto/workers/rpcClient.ts92
-rw-r--r--packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryNode.ts36
-rw-r--r--packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryPlain.ts2
-rw-r--r--packages/taler-wallet-core/src/crypto/workers/synchronousWorkerNode.ts174
-rw-r--r--packages/taler-wallet-core/src/crypto/workers/synchronousWorkerPlain.ts4
-rw-r--r--packages/taler-wallet-core/src/crypto/workers/worker-common.ts14
9 files changed, 182 insertions, 344 deletions
diff --git a/packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.test.ts b/packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.test.ts
new file mode 100644
index 000000000..96e2ee735
--- /dev/null
+++ b/packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.test.ts
@@ -0,0 +1,128 @@
+/*
+ This file is part of GNU Taler
+ (C) 2023 Taler Systems S.A.
+
+ GNU Taler is free software; you can redistribute it and/or modify it under the
+ terms of the GNU General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along with
+ GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+ */
+
+import { AbsoluteTime, TalerErrorCode } from "@gnu-taler/taler-util";
+import test from "ava";
+import { CryptoDispatcher, CryptoWorkerFactory } from "./crypto-dispatcher.js";
+import {
+ CryptoWorker,
+ CryptoWorkerResponseMessage,
+} from "./cryptoWorkerInterface.js";
+
+export class MyCryptoWorker implements CryptoWorker {
+ /**
+ * Function to be called when we receive a message from the worker thread.
+ */
+ onmessage: undefined | ((m: any) => void) = undefined;
+
+ /**
+ * Function to be called when we receive an error from the worker thread.
+ */
+ onerror: undefined | ((m: any) => void) = undefined;
+
+ /**
+ * Add an event listener for either an "error" or "message" event.
+ */
+ addEventListener(event: "message" | "error", fn: (x: any) => void): void {
+ switch (event) {
+ case "message":
+ this.onmessage = fn;
+ break;
+ case "error":
+ this.onerror = fn;
+ break;
+ }
+ }
+
+ private dispatchMessage(msg: any): void {
+ if (this.onmessage) {
+ this.onmessage(msg);
+ }
+ }
+
+ /**
+ * Send a message to the worker thread.
+ */
+ postMessage(msg: any): void {
+ const handleRequest = async () => {
+ let responseMsg: CryptoWorkerResponseMessage;
+ if (msg.operation === "testSuccess") {
+ responseMsg = {
+ id: msg.id,
+ type: "success",
+ result: {
+ testResult: 42,
+ },
+ };
+ } else if (msg.operation === "testError") {
+ responseMsg = {
+ id: msg.id,
+ type: "error",
+ error: {
+ code: TalerErrorCode.ANASTASIS_EMAIL_INVALID,
+ when: AbsoluteTime.now(),
+ hint: "bla",
+ },
+ };
+ } else if (msg.operation === "testTimeout") {
+ // Don't respond
+ return;
+ }
+ try {
+ setTimeout(() => this.dispatchMessage(responseMsg), 0);
+ } catch (e) {
+ console.error("got error during dispatch", e);
+ }
+ };
+ handleRequest().catch((e) => {
+ console.error("Error while handling crypto request:", e);
+ });
+ }
+
+ /**
+ * Forcibly terminate the worker thread.
+ */
+ terminate(): void {
+ // This is a no-op.
+ }
+}
+
+export class MyCryptoWorkerFactory implements CryptoWorkerFactory {
+ startWorker(): CryptoWorker {
+ return new MyCryptoWorker();
+ }
+
+ getConcurrency(): number {
+ return 1;
+ }
+}
+
+test("continues after error", async (t) => {
+ const cryptoDisp = new CryptoDispatcher(new MyCryptoWorkerFactory());
+ const resp1 = await cryptoDisp.doRpc("testSuccess", 0, {});
+ t.assert((resp1 as any).testResult === 42);
+ const exc = await t.throwsAsync(async () => {
+ const resp2 = await cryptoDisp.doRpc("testError", 0, {});
+ });
+
+ // Check that it still works after one error.
+ const resp2 = await cryptoDisp.doRpc("testSuccess", 0, {});
+ t.assert((resp2 as any).testResult === 42);
+
+ // Check that it still works after timeout.
+ const resp3 = await cryptoDisp.doRpc("testSuccess", 0, {});
+ t.assert((resp3 as any).testResult === 42);
+});
diff --git a/packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts b/packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.ts
index 1c0d509e6..f86163723 100644
--- a/packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts
+++ b/packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.ts
@@ -23,10 +23,16 @@
/**
* Imports.
*/
-import { j2s, Logger, TalerErrorCode } from "@gnu-taler/taler-util";
-import { TalerError } from "../../errors.js";
-import { openPromise } from "../../util/promiseUtils.js";
-import { timer, performanceNow, TimerHandle } from "../../util/timer.js";
+import {
+ j2s,
+ Logger,
+ openPromise,
+ performanceNow,
+ TalerError,
+ TalerErrorCode,
+ timer,
+ TimerHandle,
+} from "@gnu-taler/taler-util";
import { nullCrypto, TalerCryptoInterface } from "../cryptoImplementation.js";
import { CryptoWorker } from "./cryptoWorkerInterface.js";
@@ -203,13 +209,7 @@ export class CryptoDispatcher {
ws.idleTimeoutHandle.unref();
}
- handleWorkerError(ws: WorkerInfo, e: any): void {
- if (ws.currentWorkItem) {
- logger.error(`error in worker during ${ws.currentWorkItem.operation}`, e);
- } else {
- logger.error("error in worker", e);
- }
- logger.error(e.message);
+ private resetWorker(ws: WorkerInfo, e: any): void {
try {
if (ws.w) {
ws.w.terminate();
@@ -227,6 +227,16 @@ export class CryptoDispatcher {
this.findWork(ws);
}
+ handleWorkerError(ws: WorkerInfo, e: any): void {
+ if (ws.currentWorkItem) {
+ logger.error(`error in worker during ${ws.currentWorkItem.operation}`, e);
+ } else {
+ logger.error("error in worker", e);
+ }
+ logger.error(e.message);
+ this.resetWorker(ws, e);
+ }
+
private findWork(ws: WorkerInfo): void {
// try to find more work for this worker
for (let i = 0; i < NUM_PRIO; i++) {
@@ -304,11 +314,7 @@ export class CryptoDispatcher {
}
}
- private doRpc<T>(
- operation: string,
- priority: number,
- req: unknown,
- ): Promise<T> {
+ doRpc<T>(operation: string, priority: number, req: unknown): Promise<T> {
if (this.stopped) {
throw new CryptoApiStoppedError();
}
@@ -355,30 +361,24 @@ export class CryptoDispatcher {
// (The worker child process won't keep us alive either, because we un-ref
// it to make sure it doesn't keep us alive if there is no work.)
return new Promise<T>((resolve, reject) => {
- let timedOut = false;
- const timeout = timer.after(5000, () => {
- logger.warn(`crypto RPC call ('${operation}') timed out`);
- timedOut = true;
- reject(new Error(`crypto RPC call ('${operation}') timed out`));
- if (workItem.state === WorkItemState.Running) {
- workItem.state = WorkItemState.Finished;
- this.numBusy--;
- }
- });
+ let timeoutHandle: TimerHandle | undefined = undefined;
+ const timeoutMs = 5000;
+ const onTimeout = () => {
+ // FIXME: Maybe destroy and re-init worker if request is in processing
+ // state and really taking too long?
+ logger.warn(
+ `crypto RPC call ('${operation}') has been queued for a long time`,
+ );
+ timeoutHandle = timer.after(timeoutMs, onTimeout);
+ };
myProm.promise
.then((x) => {
- if (timedOut) {
- return;
- }
- timeout.clear();
+ timeoutHandle?.clear();
resolve(x);
})
.catch((x) => {
logger.info(`crypto RPC call ${operation} threw`);
- if (timedOut) {
- return;
- }
- timeout.clear();
+ timeoutHandle?.clear();
reject(x);
});
});
diff --git a/packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts b/packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts
index f255e3cfd..eaa0108bb 100644
--- a/packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts
+++ b/packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts
@@ -21,13 +21,15 @@ import { Logger } from "@gnu-taler/taler-util";
import os from "os";
import url from "url";
import { nativeCryptoR } from "../cryptoImplementation.js";
-import { CryptoWorkerFactory } from "./cryptoDispatcher.js";
+import { CryptoWorkerFactory } from "./crypto-dispatcher.js";
import { CryptoWorker } from "./cryptoWorkerInterface.js";
import { processRequestWithImpl } from "./worker-common.js";
const logger = new Logger("nodeThreadWorker.ts");
-const f = url.fileURLToPath(import.meta.url);
+const f = import.meta.url
+ ? url.fileURLToPath(import.meta.url)
+ : "__not_available__";
const workerCode = `
// Try loading the glue library for embedded
@@ -149,7 +151,7 @@ class NodeThreadCryptoWorker implements CryptoWorker {
this.onmessage(v);
}
});
- this.nodeWorker.unref();
+ //this.nodeWorker.unref();
}
/**
diff --git a/packages/taler-wallet-core/src/crypto/workers/rpcClient.ts b/packages/taler-wallet-core/src/crypto/workers/rpcClient.ts
deleted file mode 100644
index 21d88fffa..000000000
--- a/packages/taler-wallet-core/src/crypto/workers/rpcClient.ts
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- This file is part of GNU Taler
- (C) 2022 Taler Systems S.A.
-
- GNU Taler is free software; you can redistribute it and/or modify it under the
- terms of the GNU General Public License as published by the Free Software
- Foundation; either version 3, or (at your option) any later version.
-
- GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE. See the GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License along with
- GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
- */
-
-/**
- * Imports.
- */
-import { Logger } from "@gnu-taler/taler-util";
-import child_process from "child_process";
-import type internal from "stream";
-import { OpenedPromise, openPromise } from "../../util/promiseUtils.js";
-
-const logger = new Logger("synchronousWorkerFactory.ts");
-
-/**
- * Client for the crypto helper process (taler-crypto-worker from exchange.git).
- */
-export class CryptoRpcClient {
- proc: child_process.ChildProcessByStdio<
- internal.Writable,
- internal.Readable,
- null
- >;
- requests: Array<{
- p: OpenedPromise<any>;
- req: any;
- }> = [];
-
- constructor() {
- const stdoutChunks: Buffer[] = [];
- this.proc = child_process.spawn("taler-crypto-worker", {
- //stdio: ["pipe", "pipe", "inherit"],
- stdio: ["pipe", "pipe", "inherit"],
- detached: true,
- });
- this.proc.on("close", (): void => {
- logger.error("child process exited");
- });
- (this.proc.stdout as any).unref();
- (this.proc.stdin as any).unref();
- this.proc.unref();
-
- this.proc.stdout.on("data", (x) => {
- if (x instanceof Buffer) {
- const nlIndex = x.indexOf("\n");
- if (nlIndex >= 0) {
- const before = x.slice(0, nlIndex);
- const after = x.slice(nlIndex + 1);
- stdoutChunks.push(after);
- const str = Buffer.concat([...stdoutChunks, before]).toString(
- "utf-8",
- );
- const req = this.requests.shift();
- if (!req) {
- throw Error("request was undefined");
- }
- if (this.requests.length === 0) {
- this.proc.unref();
- }
- //logger.info(`got response: ${str}`);
- req.p.resolve(JSON.parse(str));
- } else {
- stdoutChunks.push(x);
- }
- } else {
- throw Error(`unexpected data chunk type (${typeof x})`);
- }
- });
- }
-
- async queueRequest(req: any): Promise<any> {
- const p = openPromise<any>();
- if (this.requests.length === 0) {
- this.proc.ref();
- }
- this.requests.push({ req, p });
- this.proc.stdin.write(`${JSON.stringify(req)}\n`);
- return p.promise;
- }
-}
diff --git a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryNode.ts b/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryNode.ts
deleted file mode 100644
index 46cf12915..000000000
--- a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryNode.ts
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- This file is part of GNU Taler
- (C) 2019 GNUnet e.V.
-
- GNU Taler is free software; you can redistribute it and/or modify it under the
- terms of the GNU General Public License as published by the Free Software
- Foundation; either version 3, or (at your option) any later version.
-
- GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE. See the GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License along with
- GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
- */
-
-/**
- * Imports.
- */
-import { CryptoWorkerFactory } from "./cryptoDispatcher.js";
-import { CryptoWorker } from "./cryptoWorkerInterface.js";
-import { SynchronousCryptoWorkerNode } from "./synchronousWorkerNode.js";
-
-/**
- * The synchronous crypto worker produced by this factory doesn't run in the
- * background, but actually blocks the caller until the operation is done.
- */
-export class SynchronousCryptoWorkerFactoryNode implements CryptoWorkerFactory {
- startWorker(): CryptoWorker {
- return new SynchronousCryptoWorkerNode();
- }
-
- getConcurrency(): number {
- return 1;
- }
-}
diff --git a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryPlain.ts b/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryPlain.ts
index d0c8e4b3a..66381bc0e 100644
--- a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryPlain.ts
+++ b/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryPlain.ts
@@ -17,7 +17,7 @@
/**
* Imports.
*/
-import { CryptoWorkerFactory } from "./cryptoDispatcher.js";
+import { CryptoWorkerFactory } from "./crypto-dispatcher.js";
import { CryptoWorker } from "./cryptoWorkerInterface.js";
import { SynchronousCryptoWorkerPlain } from "./synchronousWorkerPlain.js";
diff --git a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerNode.ts b/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerNode.ts
deleted file mode 100644
index b2653158c..000000000
--- a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerNode.ts
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- This file is part of GNU Taler
- (C) 2019 GNUnet e.V.
-
- GNU Taler is free software; you can redistribute it and/or modify it under the
- terms of the GNU General Public License as published by the Free Software
- Foundation; either version 3, or (at your option) any later version.
-
- GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE. See the GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License along with
- GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
- */
-
-import { j2s, Logger } from "@gnu-taler/taler-util";
-import {
- nativeCryptoR,
- TalerCryptoInterfaceR,
-} from "../cryptoImplementation.js";
-import { CryptoWorker } from "./cryptoWorkerInterface.js";
-import { CryptoRpcClient } from "./rpcClient.js";
-import { processRequestWithImpl } from "./worker-common.js";
-
-const logger = new Logger("synchronousWorker.ts");
-
-/**
- * Worker implementation that uses node subprocesses.
- *
- * The node crypto worker can also use IPC to offload cryptographic
- * operations to a helper process (usually written in C / part of taler-exchange).
- */
-export class SynchronousCryptoWorkerNode implements CryptoWorker {
- /**
- * Function to be called when we receive a message from the worker thread.
- */
- onmessage: undefined | ((m: any) => void);
-
- /**
- * Function to be called when we receive an error from the worker thread.
- */
- onerror: undefined | ((m: any) => void);
-
- cryptoImplR: TalerCryptoInterfaceR;
-
- rpcClient: CryptoRpcClient | undefined;
-
- constructor() {
- this.onerror = undefined;
- this.onmessage = undefined;
-
- this.cryptoImplR = { ...nativeCryptoR };
-
- if (process.env["TALER_WALLET_PRIMITIVE_WORKER"]) {
- logger.info("using RPC for some crypto operations");
- const rpc = (this.rpcClient = new CryptoRpcClient());
- this.cryptoImplR.eddsaSign = async (_, req) => {
- return await rpc.queueRequest({
- op: "eddsa_sign",
- args: {
- msg: req.msg,
- priv: req.priv,
- },
- });
- };
- this.cryptoImplR.setupRefreshPlanchet = async (_, req) => {
- const res = await rpc.queueRequest({
- op: "setup_refresh_planchet",
- args: {
- coin_index: req.coinNumber,
- transfer_secret: req.transferSecret,
- },
- });
- return {
- bks: res.blinding_key,
- coinPriv: res.coin_priv,
- coinPub: res.coin_pub,
- };
- };
- this.cryptoImplR.rsaBlind = async (_, req) => {
- const res = await rpc.queueRequest({
- op: "rsa_blind",
- args: {
- bks: req.bks,
- hm: req.hm,
- pub: req.pub,
- },
- });
- return {
- blinded: res.blinded,
- };
- };
- this.cryptoImplR.keyExchangeEcdheEddsa = async (_, req) => {
- const res = await rpc.queueRequest({
- op: "kx_ecdhe_eddsa",
- args: {
- ecdhe_priv: req.ecdhePriv,
- eddsa_pub: req.eddsaPub,
- },
- });
- return {
- h: res.h,
- };
- };
- this.cryptoImplR.eddsaGetPublic = async (_, req) => {
- const res = await rpc.queueRequest({
- op: "eddsa_get_public",
- args: {
- eddsa_priv: req.priv,
- },
- });
- return {
- pub: res.eddsa_pub,
- };
- };
- this.cryptoImplR.ecdheGetPublic = async (_, req) => {
- const res = await rpc.queueRequest({
- op: "ecdhe_get_public",
- args: {
- ecdhe_priv: req.priv,
- },
- });
- return {
- pub: res.ecdhe_pub,
- };
- };
- }
- }
-
- /**
- * Add an event listener for either an "error" or "message" event.
- */
- addEventListener(event: "message" | "error", fn: (x: any) => void): void {
- switch (event) {
- case "message":
- this.onmessage = fn;
- break;
- case "error":
- this.onerror = fn;
- break;
- }
- }
-
- private dispatchMessage(msg: any): void {
- if (this.onmessage) {
- this.onmessage(msg);
- }
- }
-
- /**
- * Send a message to the worker thread.
- */
- postMessage(msg: any): void {
- const handleRequest = async () => {
- const responseMsg = await processRequestWithImpl(msg, this.cryptoImplR);
- try {
- setTimeout(() => this.dispatchMessage(responseMsg), 0);
- } catch (e) {
- logger.error("got error during dispatch", e);
- }
- };
- handleRequest().catch((e) => {
- logger.error("Error while handling crypto request:", e);
- });
- }
-
- /**
- * Forcibly terminate the worker thread.
- */
- terminate(): void {
- // This is a no-op.
- }
-}
diff --git a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerPlain.ts b/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerPlain.ts
index 058896828..c80f2f58f 100644
--- a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerPlain.ts
+++ b/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerPlain.ts
@@ -17,7 +17,7 @@
/**
* Imports.
*/
-import { Logger } from "@gnu-taler/taler-util";
+import { j2s, Logger } from "@gnu-taler/taler-util";
import {
nativeCryptoR,
TalerCryptoInterfaceR,
@@ -84,6 +84,8 @@ export class SynchronousCryptoWorkerPlain implements CryptoWorker {
};
handleRequest().catch((e) => {
logger.error("Error while handling crypto request:", e);
+ logger.error("Stack:", e.stack);
+ logger.error(`request was ${j2s(msg)}`);
});
}
diff --git a/packages/taler-wallet-core/src/crypto/workers/worker-common.ts b/packages/taler-wallet-core/src/crypto/workers/worker-common.ts
index 459033526..63147ce92 100644
--- a/packages/taler-wallet-core/src/crypto/workers/worker-common.ts
+++ b/packages/taler-wallet-core/src/crypto/workers/worker-common.ts
@@ -17,8 +17,16 @@
/**
* Imports.
*/
-import { j2s, Logger, TalerErrorCode } from "@gnu-taler/taler-util";
-import { getErrorDetailFromException, makeErrorDetail } from "../../errors.js";
+import {
+ j2s,
+ Logger,
+ stringifyError as safeStringifyError,
+ TalerErrorCode,
+} from "@gnu-taler/taler-util";
+import {
+ getErrorDetailFromException,
+ makeErrorDetail,
+} from "@gnu-taler/taler-util";
import { TalerCryptoInterfaceR } from "../cryptoImplementation.js";
import {
CryptoWorkerRequestMessage,
@@ -88,7 +96,7 @@ export async function processRequestWithImpl(
const result = await (impl as any)[operation](impl, reqMsg.req);
responseMsg = { type: "success", result, id };
} catch (e: any) {
- logger.error(`error during operation: ${e.stack ?? e.toString()}`);
+ logger.error(`error during operation: ${safeStringifyError(e)}`);
responseMsg = {
type: "error",
error: getErrorDetailFromException(e),