diff options
Diffstat (limited to 'packages/taler-wallet-core/src/crypto/workers')
-rw-r--r-- | packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.test.ts | 128 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.ts (renamed from packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts) | 68 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts | 8 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/crypto/workers/rpcClient.ts | 92 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryNode.ts | 36 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryPlain.ts | 2 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/crypto/workers/synchronousWorkerNode.ts | 174 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/crypto/workers/synchronousWorkerPlain.ts | 4 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/crypto/workers/worker-common.ts | 14 |
9 files changed, 182 insertions, 344 deletions
diff --git a/packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.test.ts b/packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.test.ts new file mode 100644 index 000000000..96e2ee735 --- /dev/null +++ b/packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.test.ts @@ -0,0 +1,128 @@ +/* + This file is part of GNU Taler + (C) 2023 Taler Systems S.A. + + GNU Taler is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + */ + +import { AbsoluteTime, TalerErrorCode } from "@gnu-taler/taler-util"; +import test from "ava"; +import { CryptoDispatcher, CryptoWorkerFactory } from "./crypto-dispatcher.js"; +import { + CryptoWorker, + CryptoWorkerResponseMessage, +} from "./cryptoWorkerInterface.js"; + +export class MyCryptoWorker implements CryptoWorker { + /** + * Function to be called when we receive a message from the worker thread. + */ + onmessage: undefined | ((m: any) => void) = undefined; + + /** + * Function to be called when we receive an error from the worker thread. + */ + onerror: undefined | ((m: any) => void) = undefined; + + /** + * Add an event listener for either an "error" or "message" event. + */ + addEventListener(event: "message" | "error", fn: (x: any) => void): void { + switch (event) { + case "message": + this.onmessage = fn; + break; + case "error": + this.onerror = fn; + break; + } + } + + private dispatchMessage(msg: any): void { + if (this.onmessage) { + this.onmessage(msg); + } + } + + /** + * Send a message to the worker thread. + */ + postMessage(msg: any): void { + const handleRequest = async () => { + let responseMsg: CryptoWorkerResponseMessage; + if (msg.operation === "testSuccess") { + responseMsg = { + id: msg.id, + type: "success", + result: { + testResult: 42, + }, + }; + } else if (msg.operation === "testError") { + responseMsg = { + id: msg.id, + type: "error", + error: { + code: TalerErrorCode.ANASTASIS_EMAIL_INVALID, + when: AbsoluteTime.now(), + hint: "bla", + }, + }; + } else if (msg.operation === "testTimeout") { + // Don't respond + return; + } + try { + setTimeout(() => this.dispatchMessage(responseMsg), 0); + } catch (e) { + console.error("got error during dispatch", e); + } + }; + handleRequest().catch((e) => { + console.error("Error while handling crypto request:", e); + }); + } + + /** + * Forcibly terminate the worker thread. + */ + terminate(): void { + // This is a no-op. + } +} + +export class MyCryptoWorkerFactory implements CryptoWorkerFactory { + startWorker(): CryptoWorker { + return new MyCryptoWorker(); + } + + getConcurrency(): number { + return 1; + } +} + +test("continues after error", async (t) => { + const cryptoDisp = new CryptoDispatcher(new MyCryptoWorkerFactory()); + const resp1 = await cryptoDisp.doRpc("testSuccess", 0, {}); + t.assert((resp1 as any).testResult === 42); + const exc = await t.throwsAsync(async () => { + const resp2 = await cryptoDisp.doRpc("testError", 0, {}); + }); + + // Check that it still works after one error. + const resp2 = await cryptoDisp.doRpc("testSuccess", 0, {}); + t.assert((resp2 as any).testResult === 42); + + // Check that it still works after timeout. + const resp3 = await cryptoDisp.doRpc("testSuccess", 0, {}); + t.assert((resp3 as any).testResult === 42); +}); diff --git a/packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts b/packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.ts index 1c0d509e6..f86163723 100644 --- a/packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts +++ b/packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.ts @@ -23,10 +23,16 @@ /** * Imports. */ -import { j2s, Logger, TalerErrorCode } from "@gnu-taler/taler-util"; -import { TalerError } from "../../errors.js"; -import { openPromise } from "../../util/promiseUtils.js"; -import { timer, performanceNow, TimerHandle } from "../../util/timer.js"; +import { + j2s, + Logger, + openPromise, + performanceNow, + TalerError, + TalerErrorCode, + timer, + TimerHandle, +} from "@gnu-taler/taler-util"; import { nullCrypto, TalerCryptoInterface } from "../cryptoImplementation.js"; import { CryptoWorker } from "./cryptoWorkerInterface.js"; @@ -203,13 +209,7 @@ export class CryptoDispatcher { ws.idleTimeoutHandle.unref(); } - handleWorkerError(ws: WorkerInfo, e: any): void { - if (ws.currentWorkItem) { - logger.error(`error in worker during ${ws.currentWorkItem.operation}`, e); - } else { - logger.error("error in worker", e); - } - logger.error(e.message); + private resetWorker(ws: WorkerInfo, e: any): void { try { if (ws.w) { ws.w.terminate(); @@ -227,6 +227,16 @@ export class CryptoDispatcher { this.findWork(ws); } + handleWorkerError(ws: WorkerInfo, e: any): void { + if (ws.currentWorkItem) { + logger.error(`error in worker during ${ws.currentWorkItem.operation}`, e); + } else { + logger.error("error in worker", e); + } + logger.error(e.message); + this.resetWorker(ws, e); + } + private findWork(ws: WorkerInfo): void { // try to find more work for this worker for (let i = 0; i < NUM_PRIO; i++) { @@ -304,11 +314,7 @@ export class CryptoDispatcher { } } - private doRpc<T>( - operation: string, - priority: number, - req: unknown, - ): Promise<T> { + doRpc<T>(operation: string, priority: number, req: unknown): Promise<T> { if (this.stopped) { throw new CryptoApiStoppedError(); } @@ -355,30 +361,24 @@ export class CryptoDispatcher { // (The worker child process won't keep us alive either, because we un-ref // it to make sure it doesn't keep us alive if there is no work.) return new Promise<T>((resolve, reject) => { - let timedOut = false; - const timeout = timer.after(5000, () => { - logger.warn(`crypto RPC call ('${operation}') timed out`); - timedOut = true; - reject(new Error(`crypto RPC call ('${operation}') timed out`)); - if (workItem.state === WorkItemState.Running) { - workItem.state = WorkItemState.Finished; - this.numBusy--; - } - }); + let timeoutHandle: TimerHandle | undefined = undefined; + const timeoutMs = 5000; + const onTimeout = () => { + // FIXME: Maybe destroy and re-init worker if request is in processing + // state and really taking too long? + logger.warn( + `crypto RPC call ('${operation}') has been queued for a long time`, + ); + timeoutHandle = timer.after(timeoutMs, onTimeout); + }; myProm.promise .then((x) => { - if (timedOut) { - return; - } - timeout.clear(); + timeoutHandle?.clear(); resolve(x); }) .catch((x) => { logger.info(`crypto RPC call ${operation} threw`); - if (timedOut) { - return; - } - timeout.clear(); + timeoutHandle?.clear(); reject(x); }); }); diff --git a/packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts b/packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts index f255e3cfd..eaa0108bb 100644 --- a/packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts +++ b/packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts @@ -21,13 +21,15 @@ import { Logger } from "@gnu-taler/taler-util"; import os from "os"; import url from "url"; import { nativeCryptoR } from "../cryptoImplementation.js"; -import { CryptoWorkerFactory } from "./cryptoDispatcher.js"; +import { CryptoWorkerFactory } from "./crypto-dispatcher.js"; import { CryptoWorker } from "./cryptoWorkerInterface.js"; import { processRequestWithImpl } from "./worker-common.js"; const logger = new Logger("nodeThreadWorker.ts"); -const f = url.fileURLToPath(import.meta.url); +const f = import.meta.url + ? url.fileURLToPath(import.meta.url) + : "__not_available__"; const workerCode = ` // Try loading the glue library for embedded @@ -149,7 +151,7 @@ class NodeThreadCryptoWorker implements CryptoWorker { this.onmessage(v); } }); - this.nodeWorker.unref(); + //this.nodeWorker.unref(); } /** diff --git a/packages/taler-wallet-core/src/crypto/workers/rpcClient.ts b/packages/taler-wallet-core/src/crypto/workers/rpcClient.ts deleted file mode 100644 index 21d88fffa..000000000 --- a/packages/taler-wallet-core/src/crypto/workers/rpcClient.ts +++ /dev/null @@ -1,92 +0,0 @@ -/* - This file is part of GNU Taler - (C) 2022 Taler Systems S.A. - - GNU Taler is free software; you can redistribute it and/or modify it under the - terms of the GNU General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU General Public License for more details. - - You should have received a copy of the GNU General Public License along with - GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/> - */ - -/** - * Imports. - */ -import { Logger } from "@gnu-taler/taler-util"; -import child_process from "child_process"; -import type internal from "stream"; -import { OpenedPromise, openPromise } from "../../util/promiseUtils.js"; - -const logger = new Logger("synchronousWorkerFactory.ts"); - -/** - * Client for the crypto helper process (taler-crypto-worker from exchange.git). - */ -export class CryptoRpcClient { - proc: child_process.ChildProcessByStdio< - internal.Writable, - internal.Readable, - null - >; - requests: Array<{ - p: OpenedPromise<any>; - req: any; - }> = []; - - constructor() { - const stdoutChunks: Buffer[] = []; - this.proc = child_process.spawn("taler-crypto-worker", { - //stdio: ["pipe", "pipe", "inherit"], - stdio: ["pipe", "pipe", "inherit"], - detached: true, - }); - this.proc.on("close", (): void => { - logger.error("child process exited"); - }); - (this.proc.stdout as any).unref(); - (this.proc.stdin as any).unref(); - this.proc.unref(); - - this.proc.stdout.on("data", (x) => { - if (x instanceof Buffer) { - const nlIndex = x.indexOf("\n"); - if (nlIndex >= 0) { - const before = x.slice(0, nlIndex); - const after = x.slice(nlIndex + 1); - stdoutChunks.push(after); - const str = Buffer.concat([...stdoutChunks, before]).toString( - "utf-8", - ); - const req = this.requests.shift(); - if (!req) { - throw Error("request was undefined"); - } - if (this.requests.length === 0) { - this.proc.unref(); - } - //logger.info(`got response: ${str}`); - req.p.resolve(JSON.parse(str)); - } else { - stdoutChunks.push(x); - } - } else { - throw Error(`unexpected data chunk type (${typeof x})`); - } - }); - } - - async queueRequest(req: any): Promise<any> { - const p = openPromise<any>(); - if (this.requests.length === 0) { - this.proc.ref(); - } - this.requests.push({ req, p }); - this.proc.stdin.write(`${JSON.stringify(req)}\n`); - return p.promise; - } -} diff --git a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryNode.ts b/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryNode.ts deleted file mode 100644 index 46cf12915..000000000 --- a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryNode.ts +++ /dev/null @@ -1,36 +0,0 @@ -/* - This file is part of GNU Taler - (C) 2019 GNUnet e.V. - - GNU Taler is free software; you can redistribute it and/or modify it under the - terms of the GNU General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU General Public License for more details. - - You should have received a copy of the GNU General Public License along with - GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/> - */ - -/** - * Imports. - */ -import { CryptoWorkerFactory } from "./cryptoDispatcher.js"; -import { CryptoWorker } from "./cryptoWorkerInterface.js"; -import { SynchronousCryptoWorkerNode } from "./synchronousWorkerNode.js"; - -/** - * The synchronous crypto worker produced by this factory doesn't run in the - * background, but actually blocks the caller until the operation is done. - */ -export class SynchronousCryptoWorkerFactoryNode implements CryptoWorkerFactory { - startWorker(): CryptoWorker { - return new SynchronousCryptoWorkerNode(); - } - - getConcurrency(): number { - return 1; - } -} diff --git a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryPlain.ts b/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryPlain.ts index d0c8e4b3a..66381bc0e 100644 --- a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryPlain.ts +++ b/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryPlain.ts @@ -17,7 +17,7 @@ /** * Imports. */ -import { CryptoWorkerFactory } from "./cryptoDispatcher.js"; +import { CryptoWorkerFactory } from "./crypto-dispatcher.js"; import { CryptoWorker } from "./cryptoWorkerInterface.js"; import { SynchronousCryptoWorkerPlain } from "./synchronousWorkerPlain.js"; diff --git a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerNode.ts b/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerNode.ts deleted file mode 100644 index b2653158c..000000000 --- a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerNode.ts +++ /dev/null @@ -1,174 +0,0 @@ -/* - This file is part of GNU Taler - (C) 2019 GNUnet e.V. - - GNU Taler is free software; you can redistribute it and/or modify it under the - terms of the GNU General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU General Public License for more details. - - You should have received a copy of the GNU General Public License along with - GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/> - */ - -import { j2s, Logger } from "@gnu-taler/taler-util"; -import { - nativeCryptoR, - TalerCryptoInterfaceR, -} from "../cryptoImplementation.js"; -import { CryptoWorker } from "./cryptoWorkerInterface.js"; -import { CryptoRpcClient } from "./rpcClient.js"; -import { processRequestWithImpl } from "./worker-common.js"; - -const logger = new Logger("synchronousWorker.ts"); - -/** - * Worker implementation that uses node subprocesses. - * - * The node crypto worker can also use IPC to offload cryptographic - * operations to a helper process (usually written in C / part of taler-exchange). - */ -export class SynchronousCryptoWorkerNode implements CryptoWorker { - /** - * Function to be called when we receive a message from the worker thread. - */ - onmessage: undefined | ((m: any) => void); - - /** - * Function to be called when we receive an error from the worker thread. - */ - onerror: undefined | ((m: any) => void); - - cryptoImplR: TalerCryptoInterfaceR; - - rpcClient: CryptoRpcClient | undefined; - - constructor() { - this.onerror = undefined; - this.onmessage = undefined; - - this.cryptoImplR = { ...nativeCryptoR }; - - if (process.env["TALER_WALLET_PRIMITIVE_WORKER"]) { - logger.info("using RPC for some crypto operations"); - const rpc = (this.rpcClient = new CryptoRpcClient()); - this.cryptoImplR.eddsaSign = async (_, req) => { - return await rpc.queueRequest({ - op: "eddsa_sign", - args: { - msg: req.msg, - priv: req.priv, - }, - }); - }; - this.cryptoImplR.setupRefreshPlanchet = async (_, req) => { - const res = await rpc.queueRequest({ - op: "setup_refresh_planchet", - args: { - coin_index: req.coinNumber, - transfer_secret: req.transferSecret, - }, - }); - return { - bks: res.blinding_key, - coinPriv: res.coin_priv, - coinPub: res.coin_pub, - }; - }; - this.cryptoImplR.rsaBlind = async (_, req) => { - const res = await rpc.queueRequest({ - op: "rsa_blind", - args: { - bks: req.bks, - hm: req.hm, - pub: req.pub, - }, - }); - return { - blinded: res.blinded, - }; - }; - this.cryptoImplR.keyExchangeEcdheEddsa = async (_, req) => { - const res = await rpc.queueRequest({ - op: "kx_ecdhe_eddsa", - args: { - ecdhe_priv: req.ecdhePriv, - eddsa_pub: req.eddsaPub, - }, - }); - return { - h: res.h, - }; - }; - this.cryptoImplR.eddsaGetPublic = async (_, req) => { - const res = await rpc.queueRequest({ - op: "eddsa_get_public", - args: { - eddsa_priv: req.priv, - }, - }); - return { - pub: res.eddsa_pub, - }; - }; - this.cryptoImplR.ecdheGetPublic = async (_, req) => { - const res = await rpc.queueRequest({ - op: "ecdhe_get_public", - args: { - ecdhe_priv: req.priv, - }, - }); - return { - pub: res.ecdhe_pub, - }; - }; - } - } - - /** - * Add an event listener for either an "error" or "message" event. - */ - addEventListener(event: "message" | "error", fn: (x: any) => void): void { - switch (event) { - case "message": - this.onmessage = fn; - break; - case "error": - this.onerror = fn; - break; - } - } - - private dispatchMessage(msg: any): void { - if (this.onmessage) { - this.onmessage(msg); - } - } - - /** - * Send a message to the worker thread. - */ - postMessage(msg: any): void { - const handleRequest = async () => { - const responseMsg = await processRequestWithImpl(msg, this.cryptoImplR); - try { - setTimeout(() => this.dispatchMessage(responseMsg), 0); - } catch (e) { - logger.error("got error during dispatch", e); - } - }; - handleRequest().catch((e) => { - logger.error("Error while handling crypto request:", e); - }); - } - - /** - * Forcibly terminate the worker thread. - */ - terminate(): void { - // This is a no-op. - } -} diff --git a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerPlain.ts b/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerPlain.ts index 058896828..c80f2f58f 100644 --- a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerPlain.ts +++ b/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerPlain.ts @@ -17,7 +17,7 @@ /** * Imports. */ -import { Logger } from "@gnu-taler/taler-util"; +import { j2s, Logger } from "@gnu-taler/taler-util"; import { nativeCryptoR, TalerCryptoInterfaceR, @@ -84,6 +84,8 @@ export class SynchronousCryptoWorkerPlain implements CryptoWorker { }; handleRequest().catch((e) => { logger.error("Error while handling crypto request:", e); + logger.error("Stack:", e.stack); + logger.error(`request was ${j2s(msg)}`); }); } diff --git a/packages/taler-wallet-core/src/crypto/workers/worker-common.ts b/packages/taler-wallet-core/src/crypto/workers/worker-common.ts index 459033526..63147ce92 100644 --- a/packages/taler-wallet-core/src/crypto/workers/worker-common.ts +++ b/packages/taler-wallet-core/src/crypto/workers/worker-common.ts @@ -17,8 +17,16 @@ /** * Imports. */ -import { j2s, Logger, TalerErrorCode } from "@gnu-taler/taler-util"; -import { getErrorDetailFromException, makeErrorDetail } from "../../errors.js"; +import { + j2s, + Logger, + stringifyError as safeStringifyError, + TalerErrorCode, +} from "@gnu-taler/taler-util"; +import { + getErrorDetailFromException, + makeErrorDetail, +} from "@gnu-taler/taler-util"; import { TalerCryptoInterfaceR } from "../cryptoImplementation.js"; import { CryptoWorkerRequestMessage, @@ -88,7 +96,7 @@ export async function processRequestWithImpl( const result = await (impl as any)[operation](impl, reqMsg.req); responseMsg = { type: "success", result, id }; } catch (e: any) { - logger.error(`error during operation: ${e.stack ?? e.toString()}`); + logger.error(`error during operation: ${safeStringifyError(e)}`); responseMsg = { type: "error", error: getErrorDetailFromException(e), |