summaryrefslogtreecommitdiff
path: root/packages/taler-util/src/twrpc-impl.node.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-util/src/twrpc-impl.node.ts')
-rw-r--r--packages/taler-util/src/twrpc-impl.node.ts216
1 files changed, 216 insertions, 0 deletions
diff --git a/packages/taler-util/src/twrpc-impl.node.ts b/packages/taler-util/src/twrpc-impl.node.ts
new file mode 100644
index 000000000..30e362e5b
--- /dev/null
+++ b/packages/taler-util/src/twrpc-impl.node.ts
@@ -0,0 +1,216 @@
+/*
+ This file is part of GNU Taler
+ (C) 2023 Taler Systems S.A.
+
+ GNU Taler is free software; you can redistribute it and/or modify it under the
+ terms of the GNU General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along with
+ GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+ */
+
+import * as net from "node:net";
+import * as fs from "node:fs";
+import { Logger } from "./logging.js";
+import { bytesToString, typedArrayConcat } from "./taler-crypto.js";
+import type { RpcConnectArgs, RpcServerArgs } from "./twrpc.js";
+
+interface ReadLinewiseArgs {
+ onLine(lineData: Uint8Array): void;
+ sock: net.Socket;
+}
+
+const logger = new Logger("twrpc-impl.node.ts");
+
+function readStreamLinewise(args: ReadLinewiseArgs): void {
+ let chunks: Uint8Array[] = [];
+ args.sock.on("data", (buf: Uint8Array) => {
+ // Process all newlines in the newly received buffer
+ while (1) {
+ const newlineIdx = buf.indexOf("\n".charCodeAt(0));
+ if (newlineIdx >= 0) {
+ let left = buf.subarray(0, newlineIdx + 1);
+ let right = buf.subarray(newlineIdx + 1);
+ chunks.push(left);
+ const line = typedArrayConcat(chunks);
+ args.onLine(line);
+ chunks = [];
+ buf = right;
+ } else {
+ chunks.push(buf);
+ break;
+ }
+ }
+ });
+}
+
+export async function connectRpc<T>(args: RpcConnectArgs<T>): Promise<T> {
+ let sockFilename = args.socketFilename;
+ return new Promise((resolve, reject) => {
+ const client = net.createConnection(sockFilename);
+ client.on("error", (e) => {
+ reject(e);
+ });
+ client.on("connect", () => {
+ let parsingBody: string | undefined = undefined;
+ let bodyChunks: string[] = [];
+
+ logger.info("connected!");
+ client.write("%hello-from-client\n");
+ const res = args.onEstablished({
+ sendMessage(m) {
+ client.write("%request\n");
+ client.write(JSON.stringify(m));
+ client.write("\n");
+ client.write("%end\n");
+ },
+ close() {
+ client.destroy();
+ },
+ });
+ readStreamLinewise({
+ sock: client,
+ onLine(line) {
+ const lineStr = bytesToString(line);
+ // Are we currently parsing the body of a request?
+ if (!parsingBody) {
+ const strippedLine = lineStr.trim();
+ if (strippedLine == "%message") {
+ parsingBody = "message";
+ } else if (strippedLine == "%hello-from-server") {
+ } else if (strippedLine.startsWith("%error:")) {
+ client.end();
+ res.onDisconnect();
+ } else {
+ logger.warn("got unknown request");
+ client.write("%error: invalid message\n");
+ client.end();
+ }
+ } else if (parsingBody == "message") {
+ const strippedLine = lineStr.trim();
+ if (strippedLine == "%end") {
+ let req = bodyChunks.join("");
+ let reqJson: any = undefined;
+ try {
+ reqJson = JSON.parse(req);
+ } catch (e) {
+ logger.warn("JSON message from server was invalid");
+ logger.info(`message was: ${req}`);
+ }
+ if (reqJson !== undefined) {
+ res.onMessage(reqJson);
+ } else {
+ client.write("%error: invalid JSON");
+ client.end();
+ }
+ bodyChunks = [];
+ parsingBody = undefined;
+ } else {
+ bodyChunks.push(lineStr);
+ }
+ } else {
+ logger.info("invalid parser state");
+ client.write("%error: internal error\n");
+ client.end();
+ }
+ },
+ });
+ client.on("close", () => {
+ res.onDisconnect();
+ });
+ client.on("data", () => {});
+ resolve(res.result);
+ });
+ });
+}
+
+export async function runRpcServer(args: RpcServerArgs): Promise<void> {
+ let sockFilename = args.socketFilename;
+ try {
+ fs.unlinkSync(sockFilename);
+ } catch (e) {
+ // Do nothing!
+ }
+ return new Promise((resolve, reject) => {
+ const server = net.createServer((sock) => {
+ // Are we currently parsing the body of a request?
+ let parsingBody: string | undefined = undefined;
+ let bodyChunks: string[] = [];
+
+ sock.write("%hello-from-server\n");
+ const handlers = args.onConnect({
+ sendResponse(message) {
+ sock.write("%message\n");
+ sock.write(JSON.stringify(message));
+ sock.write("\n");
+ sock.write("%end\n");
+ },
+ });
+
+ sock.on("error", (err) => {
+ logger.error(`connection error: ${err}`);
+ });
+
+ function processLine(line: Uint8Array) {
+ const lineStr = bytesToString(line);
+ if (!parsingBody) {
+ const strippedLine = lineStr.trim();
+ if (strippedLine == "%request") {
+ parsingBody = "request";
+ } else if (strippedLine === "%hello-from-client") {
+ // Nothing to do, ignore hello
+ } else if (strippedLine.startsWith("%error:")) {
+ logger.warn("got error from client");
+ sock.end();
+ handlers.onDisconnect();
+ } else {
+ logger.info("got unknown request");
+ sock.write("%error: invalid request\n");
+ sock.end();
+ }
+ } else if (parsingBody == "request") {
+ const strippedLine = lineStr.trim();
+ if (strippedLine == "%end") {
+ let req = bodyChunks.join("");
+ let reqJson: any = undefined;
+ try {
+ reqJson = JSON.parse(req);
+ } catch (e) {
+ logger.warn("JSON request from client was invalid");
+ }
+ if (reqJson !== undefined) {
+ handlers.onMessage(reqJson);
+ } else {
+ sock.write("%error: invalid JSON");
+ sock.end();
+ }
+ bodyChunks = [];
+ parsingBody = undefined;
+ } else {
+ bodyChunks.push(lineStr);
+ }
+ } else {
+ logger.error("invalid parser state");
+ sock.write("%error: internal error\n");
+ sock.end();
+ }
+ }
+
+ readStreamLinewise({
+ sock,
+ onLine: processLine,
+ });
+
+ sock.on("close", (hadError: boolean) => {
+ logger.trace(`connection closed, hadError=${hadError}`);
+ handlers.onDisconnect();
+ });
+ });
+ server.listen(args.socketFilename);
+ });
+}