summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/crypto
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2022-09-26 14:40:06 +0200
committerFlorian Dold <florian@dold.me>2022-09-26 14:40:11 +0200
commit25eb7624b39f05d720c150b047c15c267e2a1a6d (patch)
tree239de3714d0f225b7d38be0c18e80ca246ed1ad4 /packages/taler-wallet-core/src/crypto
parent360cb806107b5b3ea356d479bbacc16f3744fd35 (diff)
downloadwallet-core-25eb7624b39f05d720c150b047c15c267e2a1a6d.tar.gz
wallet-core-25eb7624b39f05d720c150b047c15c267e2a1a6d.tar.bz2
wallet-core-25eb7624b39f05d720c150b047c15c267e2a1a6d.zip
wallet-core: improve crypto worker error handling
Diffstat (limited to 'packages/taler-wallet-core/src/crypto')
-rw-r--r--packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts136
-rw-r--r--packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts67
-rw-r--r--packages/taler-wallet-core/src/crypto/workers/synchronousWorkerNode.ts30
3 files changed, 146 insertions, 87 deletions
diff --git a/packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts b/packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts
index 2ef0d7c69..48c9c6060 100644
--- a/packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts
+++ b/packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts
@@ -22,7 +22,9 @@
/**
* Imports.
*/
-import { Logger } from "@gnu-taler/taler-util";
+import { Logger, TalerErrorCode } from "@gnu-taler/taler-util";
+import { TalerError } from "../../errors.js";
+import { openPromise } from "../../util/promiseUtils.js";
import { timer, performanceNow, TimerHandle } from "../../util/timer.js";
import { nullCrypto, TalerCryptoInterface } from "../cryptoImplementation.js";
import { CryptoWorker } from "./cryptoWorkerInterface.js";
@@ -32,7 +34,7 @@ const logger = new Logger("cryptoApi.ts");
/**
* State of a crypto worker.
*/
-interface WorkerState {
+interface WorkerInfo {
/**
* The actual worker thread.
*/
@@ -64,6 +66,8 @@ interface WorkItem {
* Time when the work was submitted to a (non-busy) worker thread.
*/
startTime: BigInt;
+
+ state: WorkItemState;
}
/**
@@ -92,12 +96,18 @@ export class CryptoApiStoppedError extends Error {
}
}
+export enum WorkItemState {
+ Pending = 1,
+ Running = 2,
+ Finished = 3,
+}
+
/**
* Dispatcher for cryptographic operations to underlying crypto workers.
*/
export class CryptoDispatcher {
private nextRpcId = 1;
- private workers: WorkerState[];
+ private workers: WorkerInfo[];
private workQueues: WorkItem[][];
private workerFactory: CryptoWorkerFactory;
@@ -141,7 +151,7 @@ export class CryptoDispatcher {
/**
* Start a worker (if not started) and set as busy.
*/
- wake(ws: WorkerState, work: WorkItem): void {
+ wake(ws: WorkerInfo, work: WorkItem): void {
if (this.stopped) {
return;
}
@@ -167,10 +177,11 @@ export class CryptoDispatcher {
};
this.resetWorkerTimeout(ws);
work.startTime = performanceNow();
+ work.state = WorkItemState.Running;
timer.after(0, () => worker.postMessage(msg));
}
- resetWorkerTimeout(ws: WorkerState): void {
+ resetWorkerTimeout(ws: WorkerInfo): void {
if (ws.idleTimeoutHandle !== null) {
ws.idleTimeoutHandle.clear();
ws.idleTimeoutHandle = null;
@@ -187,7 +198,7 @@ export class CryptoDispatcher {
ws.idleTimeoutHandle.unref();
}
- handleWorkerError(ws: WorkerState, e: any): void {
+ handleWorkerError(ws: WorkerInfo, e: any): void {
if (ws.currentWorkItem) {
logger.error(`error in worker during ${ws.currentWorkItem.operation}`, e);
} else {
@@ -203,6 +214,7 @@ export class CryptoDispatcher {
logger.error(e as string);
}
if (ws.currentWorkItem !== null) {
+ ws.currentWorkItem.state = WorkItemState.Finished;
ws.currentWorkItem.reject(e);
ws.currentWorkItem = null;
this.numBusy--;
@@ -210,7 +222,7 @@ export class CryptoDispatcher {
this.findWork(ws);
}
- private findWork(ws: WorkerState): void {
+ private findWork(ws: WorkerInfo): void {
// try to find more work for this worker
for (let i = 0; i < NUM_PRIO; i++) {
const q = this.workQueues[NUM_PRIO - i - 1];
@@ -225,26 +237,38 @@ export class CryptoDispatcher {
}
}
- handleWorkerMessage(ws: WorkerState, msg: any): void {
+ handleWorkerMessage(ws: WorkerInfo, msg: any): void {
const id = msg.data.id;
if (typeof id !== "number") {
- console.error("rpc id must be number");
+ logger.error("rpc id must be number");
return;
}
const currentWorkItem = ws.currentWorkItem;
ws.currentWorkItem = null;
- this.numBusy--;
- this.findWork(ws);
if (!currentWorkItem) {
- console.error("unsolicited response from worker");
+ logger.error("unsolicited response from worker");
return;
}
if (id !== currentWorkItem.rpcId) {
- console.error(`RPC with id ${id} has no registry entry`);
+ logger.error(`RPC with id ${id} has no registry entry`);
return;
}
-
- currentWorkItem.resolve(msg.data.result);
+ if (currentWorkItem.state === WorkItemState.Running) {
+ this.numBusy--;
+ currentWorkItem.state = WorkItemState.Finished;
+ if (msg.data.type === "success") {
+ currentWorkItem.resolve(msg.data.result);
+ } else if (msg.data.type === "error") {
+ currentWorkItem.reject(
+ TalerError.fromDetail(TalerErrorCode.WALLET_CRYPTO_WORKER_ERROR, {
+ innerError: msg.data.error,
+ }),
+ );
+ } else {
+ currentWorkItem.reject(new Error("bad message from crypto worker"));
+ }
+ }
+ this.findWork(ws);
}
cryptoApi: TalerCryptoInterface;
@@ -258,7 +282,7 @@ export class CryptoDispatcher {
this.cryptoApi = fns;
this.workerFactory = workerFactory;
- this.workers = new Array<WorkerState>(workerFactory.getConcurrency());
+ this.workers = new Array<WorkerInfo>(workerFactory.getConcurrency());
for (let i = 0; i < this.workers.length; i++) {
this.workers[i] = {
@@ -282,36 +306,42 @@ export class CryptoDispatcher {
if (this.stopped) {
throw new CryptoApiStoppedError();
}
- const p: Promise<T> = new Promise<T>((resolve, reject) => {
- const rpcId = this.nextRpcId++;
- const workItem: WorkItem = {
- operation,
- req,
- resolve,
- reject,
- rpcId,
- startTime: BigInt(0),
- };
-
- if (this.numBusy === this.workers.length) {
- const q = this.workQueues[priority];
- if (!q) {
- throw Error("assertion failed");
- }
- this.workQueues[priority].push(workItem);
- return;
+ const rpcId = this.nextRpcId++;
+ const myProm = openPromise<T>();
+ const workItem: WorkItem = {
+ operation,
+ req,
+ resolve: myProm.resolve,
+ reject: myProm.reject,
+ rpcId,
+ startTime: BigInt(0),
+ state: WorkItemState.Pending,
+ };
+ let scheduled = false;
+ if (this.numBusy === this.workers.length) {
+ // All workers are busy, queue work item
+ const q = this.workQueues[priority];
+ if (!q) {
+ throw Error("assertion failed");
}
-
+ this.workQueues[priority].push(workItem);
+ scheduled = true;
+ }
+ if (!scheduled) {
for (const ws of this.workers) {
if (ws.currentWorkItem !== null) {
continue;
}
this.wake(ws, workItem);
- return;
+ scheduled = true;
+ break;
}
+ }
+ if (!scheduled) {
+ // Could not schedule work.
throw Error("assertion failed");
- });
+ }
// Make sure that we wait for the result while a timer is active
// to prevent the event loop from dying, as just waiting for a promise
@@ -324,21 +354,27 @@ export class CryptoDispatcher {
logger.warn(`crypto RPC call ('${operation}') timed out`);
timedOut = true;
reject(new Error(`crypto RPC call ('${operation}') timed out`));
- });
- p.then((x) => {
- if (timedOut) {
- return;
- }
- timeout.clear();
- resolve(x);
- }).catch((x) => {
- logger.info(`crypto RPC call ${operation} threw`);
- if (timedOut) {
- return;
+ if (workItem.state === WorkItemState.Running) {
+ workItem.state = WorkItemState.Finished;
+ this.numBusy--;
}
- timeout.clear();
- reject(x);
});
+ myProm.promise
+ .then((x) => {
+ if (timedOut) {
+ return;
+ }
+ timeout.clear();
+ resolve(x);
+ })
+ .catch((x) => {
+ logger.info(`crypto RPC call ${operation} threw`);
+ if (timedOut) {
+ return;
+ }
+ timeout.clear();
+ reject(x);
+ });
});
}
}
diff --git a/packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts b/packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts
index 42370fc1b..71f137f29 100644
--- a/packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts
+++ b/packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts
@@ -22,6 +22,7 @@ import { CryptoWorker } from "./cryptoWorkerInterface.js";
import os from "os";
import { Logger } from "@gnu-taler/taler-util";
import { nativeCryptoR } from "../cryptoImplementation.js";
+import { getErrorDetailFromException } from "../../errors.js";
const logger = new Logger("nodeThreadWorker.ts");
@@ -69,58 +70,72 @@ const workerCode = `
* a message.
*/
export function handleWorkerMessage(msg: any): void {
- const req = msg.req;
- if (typeof req !== "object") {
- console.error("request must be an object");
- return;
- }
- const id = msg.id;
- if (typeof id !== "number") {
- console.error("RPC id must be number");
- return;
- }
- const operation = msg.operation;
- if (typeof operation !== "string") {
- console.error("RPC operation must be string");
- return;
- }
-
const handleRequest = async (): Promise<void> => {
+ const req = msg.req;
+ if (typeof req !== "object") {
+ logger.error("request must be an object");
+ return;
+ }
+ const id = msg.id;
+ if (typeof id !== "number") {
+ logger.error("RPC id must be number");
+ return;
+ }
+ const operation = msg.operation;
+ if (typeof operation !== "string") {
+ logger.error("RPC operation must be string");
+ return;
+ }
const impl = nativeCryptoR;
if (!(operation in impl)) {
- console.error(`crypto operation '${operation}' not found`);
+ logger.error(`crypto operation '${operation}' not found`);
return;
}
+ let responseMsg: any;
+
try {
const result = await (impl as any)[operation](impl, req);
+ responseMsg = { data: { type: "success", result, id } };
+ } catch (e: any) {
+ logger.error(`error during operation: ${e.stack ?? e.toString()}`);
+ responseMsg = {
+ data: {
+ type: "error",
+ error: getErrorDetailFromException(e),
+ id,
+ },
+ };
+ }
+
+ try {
// eslint-disable-next-line @typescript-eslint/no-var-requires
const _r = "require";
const worker_threads: typeof import("worker_threads") =
module[_r]("worker_threads");
// const worker_threads = require("worker_threads");
-
const p = worker_threads.parentPort;
- worker_threads.parentPort?.postMessage;
if (p) {
- p.postMessage({ data: { result, id } });
+ p.postMessage(responseMsg);
} else {
- console.error("parent port not available (not running in thread?");
+ logger.error("parent port not available (not running in thread?");
}
- } catch (e) {
- console.error("error during operation", e);
+ } catch (e: any) {
+ logger.error(
+ `error sending back operation result: ${e.stack ?? e.toString()}`,
+ );
return;
}
};
handleRequest().catch((e) => {
- console.error("error in node worker", e);
+ logger.error("error in node worker", e);
});
}
export function handleWorkerError(e: Error): void {
- console.log("got error from worker", e);
+ logger.error(`got error from worker: ${e.stack ?? e.toString()}`);
}
export class NodeThreadCryptoWorkerFactory implements CryptoWorkerFactory {
@@ -161,7 +176,7 @@ class NodeThreadCryptoWorker implements CryptoWorker {
this.nodeWorker = new worker_threads.Worker(workerCode, { eval: true });
this.nodeWorker.on("error", (err: Error) => {
- console.error("error in node worker:", err);
+ logger.error("error in node worker:", err);
if (this.onerror) {
this.onerror(err);
}
diff --git a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerNode.ts b/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerNode.ts
index 4dda9cd11..f3cfc5ef9 100644
--- a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerNode.ts
+++ b/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerNode.ts
@@ -15,6 +15,7 @@
*/
import { Logger } from "@gnu-taler/taler-util";
+import { getErrorDetailFromException } from "../../errors.js";
import {
nativeCryptoR,
TalerCryptoInterfaceR,
@@ -139,7 +140,7 @@ export class SynchronousCryptoWorker {
private dispatchMessage(msg: any): void {
if (this.onmessage) {
- this.onmessage({ data: msg });
+ this.onmessage(msg);
}
}
@@ -151,20 +152,27 @@ export class SynchronousCryptoWorker {
const impl = this.cryptoImplR;
if (!(operation in impl)) {
- console.error(`crypto operation '${operation}' not found`);
+ logger.error(`crypto operation '${operation}' not found`);
return;
}
- let result: any;
+ let responseMsg: any;
try {
- result = await (impl as any)[operation](impl, req);
+ const result = await (impl as any)[operation](impl, req);
+ responseMsg = { data: { type: "success", result, id } };
} catch (e: any) {
- logger.error(`error during operation '${operation}': ${e}`);
- return;
+ logger.error(`error during operation: ${e.stack ?? e.toString()}`);
+ responseMsg = {
+ data: {
+ type: "error",
+ id,
+ error: getErrorDetailFromException(e),
+ },
+ };
}
try {
- setTimeout(() => this.dispatchMessage({ result, id }), 0);
+ setTimeout(() => this.dispatchMessage(responseMsg), 0);
} catch (e) {
logger.error("got error during dispatch", e);
}
@@ -176,22 +184,22 @@ export class SynchronousCryptoWorker {
postMessage(msg: any): void {
const req = msg.req;
if (typeof req !== "object") {
- console.error("request must be an object");
+ logger.error("request must be an object");
return;
}
const id = msg.id;
if (typeof id !== "number") {
- console.error("RPC id must be number");
+ logger.error("RPC id must be number");
return;
}
const operation = msg.operation;
if (typeof operation !== "string") {
- console.error("RPC operation must be string");
+ logger.error("RPC operation must be string");
return;
}
this.handleRequest(operation, id, req).catch((e) => {
- console.error("Error while handling crypto request:", e);
+ logger.error("Error while handling crypto request:", e);
});
}