summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/crypto/workers/rpcClient.ts
blob: a8df8b4c68f8239f218ffd0d8f53ed5f695a4447 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/*
 This file is part of GNU Taler
 (C) 2022 Taler Systems S.A.

 GNU Taler is free software; you can redistribute it and/or modify it under the
 terms of the GNU General Public License as published by the Free Software
 Foundation; either version 3, or (at your option) any later version.

 GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
 A PARTICULAR PURPOSE.  See the GNU General Public License for more details.

 You should have received a copy of the GNU General Public License along with
 GNU Taler; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
 */

/**
 * Imports.
 */
import { Logger } from "@gnu-taler/taler-util";
import child_process from "child_process";
import type internal from "stream";
import { OpenedPromise, openPromise } from "../../util/promiseUtils.js";

// Module-level logger; the tag names this file so that log output can be
// traced back to the crypto RPC client.
const logger = new Logger("rpcClient.ts");

/**
 * Client for the external `taler-crypto-worker` helper process.
 *
 * Requests are written to the child's stdin as one JSON document per line.
 * Responses are read from the child's stdout, one JSON document per line,
 * and are matched to the queued requests strictly in FIFO order.
 */
export class CryptoRpcClient {
  /** Handle of the spawned worker process (stdin/stdout piped, stderr inherited). */
  proc: child_process.ChildProcessByStdio<
    internal.Writable,
    internal.Readable,
    null
  >;

  /** Requests already written to the worker that still await a response, oldest first. */
  requests: Array<{
    p: OpenedPromise<any>;
    req: any;
  }> = [];

  /** Bytes of the response line currently being received (no newline seen yet). */
  private partialLine: Buffer[] = [];

  /**
   * Spawns the worker process and starts listening for its responses.
   */
  constructor() {
    this.proc = child_process.spawn("taler-crypto-worker", {
      stdio: ["pipe", "pipe", "inherit"],
      detached: true,
    });
    this.proc.on("close", (): void => {
      // NOTE(review): requests still queued at this point are never settled;
      // their promises stay pending.
      logger.error("child process exited");
    });
    // Start unreferenced so that an idle worker does not keep the event loop
    // alive; queueRequest() re-references the process while work is pending.
    (this.proc.stdout as any).unref();
    (this.proc.stdin as any).unref();
    this.proc.unref();

    this.proc.stdout.on("data", (x) => this.handleStdoutData(x));
  }

  /**
   * Consumes one chunk of worker stdout and resolves a queued request for
   * every complete (newline-terminated) line it finishes.
   *
   * A chunk may contain no newline (partial response), exactly one, or
   * several (multiple responses delivered together); all cases are handled.
   *
   * @param chunk data received from the stdout stream, expected to be a Buffer
   * @throws Error if the chunk is not a Buffer, if a response arrives while
   *   no request is queued, or if a line is not valid JSON
   */
  private handleStdoutData(chunk: unknown): void {
    if (!(chunk instanceof Buffer)) {
      throw Error(`unexpected data chunk type (${typeof chunk})`);
    }
    const lines: string[] = [];
    let rest: Buffer = chunk;
    for (let nl = rest.indexOf("\n"); nl >= 0; nl = rest.indexOf("\n")) {
      // Decode only after joining the raw bytes, so a multi-byte UTF-8
      // sequence split across chunks is reassembled correctly.
      const line = Buffer.concat([
        ...this.partialLine,
        rest.subarray(0, nl),
      ]).toString("utf-8");
      lines.push(line);
      // The buffered bytes belonged to the line just completed.
      this.partialLine = [];
      rest = rest.subarray(nl + 1);
    }
    if (rest.length > 0) {
      // Trailing bytes are the beginning of the next response.
      this.partialLine.push(rest);
    }
    // Dispatch only after the framing state is consistent, so an exception
    // below cannot corrupt the parsing of later chunks.
    for (const line of lines) {
      this.resolveNextRequest(line);
    }
  }

  /**
   * Resolves the oldest queued request with the parsed response line.
   *
   * @param line one complete response line, without the trailing newline
   */
  private resolveNextRequest(line: string): void {
    const req = this.requests.shift();
    if (!req) {
      throw Error("request was undefined");
    }
    if (this.requests.length === 0) {
      // Nothing pending anymore: let the event loop exit despite the worker.
      this.proc.unref();
    }
    req.p.resolve(JSON.parse(line));
  }

  /**
   * Sends a request to the worker and returns a promise for its response.
   *
   * @param req JSON-serializable request object
   * @returns promise that resolves with the parsed JSON response line
   */
  async queueRequest(req: any): Promise<any> {
    const p = openPromise<any>();
    if (this.requests.length === 0) {
      // First pending request: keep the event loop alive until it is answered.
      this.proc.ref();
    }
    this.requests.push({ req, p });
    this.proc.stdin.write(`${JSON.stringify(req)}\n`);
    return p.promise;
  }
}