diff options
Diffstat (limited to 'packages/taler-util/src/twrpc-impl.node.ts')
-rw-r--r-- | packages/taler-util/src/twrpc-impl.node.ts | 216 |
1 files changed, 216 insertions, 0 deletions
diff --git a/packages/taler-util/src/twrpc-impl.node.ts b/packages/taler-util/src/twrpc-impl.node.ts new file mode 100644 index 000000000..30e362e5b --- /dev/null +++ b/packages/taler-util/src/twrpc-impl.node.ts @@ -0,0 +1,216 @@ +/* + This file is part of GNU Taler + (C) 2023 Taler Systems S.A. + + GNU Taler is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + */ + +import * as net from "node:net"; +import * as fs from "node:fs"; +import { Logger } from "./logging.js"; +import { bytesToString, typedArrayConcat } from "./taler-crypto.js"; +import type { RpcConnectArgs, RpcServerArgs } from "./twrpc.js"; + +interface ReadLinewiseArgs { + onLine(lineData: Uint8Array): void; + sock: net.Socket; +} + +const logger = new Logger("twrpc-impl.node.ts"); + +function readStreamLinewise(args: ReadLinewiseArgs): void { + let chunks: Uint8Array[] = []; + args.sock.on("data", (buf: Uint8Array) => { + // Process all newlines in the newly received buffer + while (1) { + const newlineIdx = buf.indexOf("\n".charCodeAt(0)); + if (newlineIdx >= 0) { + let left = buf.subarray(0, newlineIdx + 1); + let right = buf.subarray(newlineIdx + 1); + chunks.push(left); + const line = typedArrayConcat(chunks); + args.onLine(line); + chunks = []; + buf = right; + } else { + chunks.push(buf); + break; + } + } + }); +} + +export async function connectRpc<T>(args: RpcConnectArgs<T>): Promise<T> { + let sockFilename = args.socketFilename; + return new Promise((resolve, reject) => { + const client = net.createConnection(sockFilename); + client.on("error", (e) => { + reject(e); + }); + client.on("connect", () => { + let parsingBody: string | undefined = undefined; + let bodyChunks: string[] = []; + + logger.info("connected!"); + client.write("%hello-from-client\n"); + const res = args.onEstablished({ + sendMessage(m) { + client.write("%request\n"); + client.write(JSON.stringify(m)); + client.write("\n"); + client.write("%end\n"); + }, + close() { + client.destroy(); + }, + }); + readStreamLinewise({ + sock: client, + onLine(line) { + const lineStr = bytesToString(line); + // Are we currently parsing the body of a request? + if (!parsingBody) { + const strippedLine = lineStr.trim(); + if (strippedLine == "%message") { + parsingBody = "message"; + } else if (strippedLine == "%hello-from-server") { + } else if (strippedLine.startsWith("%error:")) { + client.end(); + res.onDisconnect(); + } else { + logger.warn("got unknown request"); + client.write("%error: invalid message\n"); + client.end(); + } + } else if (parsingBody == "message") { + const strippedLine = lineStr.trim(); + if (strippedLine == "%end") { + let req = bodyChunks.join(""); + let reqJson: any = undefined; + try { + reqJson = JSON.parse(req); + } catch (e) { + logger.warn("JSON message from server was invalid"); + logger.info(`message was: ${req}`); + } + if (reqJson !== undefined) { + res.onMessage(reqJson); + } else { + client.write("%error: invalid JSON"); + client.end(); + } + bodyChunks = []; + parsingBody = undefined; + } else { + bodyChunks.push(lineStr); + } + } else { + logger.info("invalid parser state"); + client.write("%error: internal error\n"); + client.end(); + } + }, + }); + client.on("close", () => { + res.onDisconnect(); + }); + client.on("data", () => {}); + resolve(res.result); + }); + }); +} + +export async function runRpcServer(args: RpcServerArgs): Promise<void> { + let sockFilename = args.socketFilename; + try { + fs.unlinkSync(sockFilename); + } catch (e) { + // Do nothing! + } + return new Promise((resolve, reject) => { + const server = net.createServer((sock) => { + // Are we currently parsing the body of a request? + let parsingBody: string | undefined = undefined; + let bodyChunks: string[] = []; + + sock.write("%hello-from-server\n"); + const handlers = args.onConnect({ + sendResponse(message) { + sock.write("%message\n"); + sock.write(JSON.stringify(message)); + sock.write("\n"); + sock.write("%end\n"); + }, + }); + + sock.on("error", (err) => { + logger.error(`connection error: ${err}`); + }); + + function processLine(line: Uint8Array) { + const lineStr = bytesToString(line); + if (!parsingBody) { + const strippedLine = lineStr.trim(); + if (strippedLine == "%request") { + parsingBody = "request"; + } else if (strippedLine === "%hello-from-client") { + // Nothing to do, ignore hello + } else if (strippedLine.startsWith("%error:")) { + logger.warn("got error from client"); + sock.end(); + handlers.onDisconnect(); + } else { + logger.info("got unknown request"); + sock.write("%error: invalid request\n"); + sock.end(); + } + } else if (parsingBody == "request") { + const strippedLine = lineStr.trim(); + if (strippedLine == "%end") { + let req = bodyChunks.join(""); + let reqJson: any = undefined; + try { + reqJson = JSON.parse(req); + } catch (e) { + logger.warn("JSON request from client was invalid"); + } + if (reqJson !== undefined) { + handlers.onMessage(reqJson); + } else { + sock.write("%error: invalid JSON"); + sock.end(); + } + bodyChunks = []; + parsingBody = undefined; + } else { + bodyChunks.push(lineStr); + } + } else { + logger.error("invalid parser state"); + sock.write("%error: internal error\n"); + sock.end(); + } + } + + readStreamLinewise({ + sock, + onLine: processLine, + }); + + sock.on("close", (hadError: boolean) => { + logger.trace(`connection closed, hadError=${hadError}`); + handlers.onDisconnect(); + }); + }); + server.listen(args.socketFilename); + }); +} |