diff options
author | Jonathan Buchanan <jonathan.russ.buchanan@gmail.com> | 2021-07-16 22:54:02 -0400 |
---|---|---|
committer | Jonathan Buchanan <jonathan.russ.buchanan@gmail.com> | 2021-07-16 22:54:02 -0400 |
commit | 62545d0c3e98472d0f0d945e9a4c5cce5ea03baa (patch) | |
tree | 3b48174c3e8a60491d6db8e98b3969b4b88ae5d3 | |
parent | 7ca4230409a838d143a498b9ceeb5868de12e6f5 (diff) | |
download | iono-62545d0c3e98472d0f0d945e9a4c5cce5ea03baa.tar.gz iono-62545d0c3e98472d0f0d945e9a4c5cce5ea03baa.tar.bz2 iono-62545d0c3e98472d0f0d945e9a4c5cce5ea03baa.zip |
fix uv loop, redo threading code
-rw-r--r-- | iono/iono.cpp | 50 | ||||
-rw-r--r-- | iono/iono.swift | 148 |
2 files changed, 139 insertions, 59 deletions
diff --git a/iono/iono.cpp b/iono/iono.cpp index cec96f1..9a7d6e4 100644 --- a/iono/iono.cpp +++ b/iono/iono.cpp @@ -22,6 +22,7 @@ #include <uv.h> #define NODE_WANT_INTERNALS 1 +#include <env.h> #include <node_binding.h> #include <iostream> @@ -35,22 +36,23 @@ struct __IonoInstance std::unique_ptr<node::CommonEnvironmentSetup> setup; v8::Isolate *isolate; node::Environment *env; + uv_loop_t *event_loop; uv_async_t async_notify; - + bool break_requested; - + /* Notifications to swift */ __NotifyHandler notification_handler; void *notification_userdata; - + __IonoInstance(); - + char * evalJs(const char *js); - + void runNode(); - + void makeCallback(const char *callback); }; @@ -126,16 +128,16 @@ static void getModuleCode(const v8::FunctionCallbackInfo<v8::Value> &args); static const std::string main_code = "const publicRequire =" - " require('module').createRequire(process.cwd() + '/');" - " globalThis.require = publicRequire;" - " require('vm').runInThisContext(process.argv[1]);global.__node_run = (x) => {" - " 0 && console.log('running code', x);" - " global.eval(x);" - "};" - "" - "global.__native_onMessage = (x) => {" - " 0 && console.log('got __native_onMessage', x);" - "};"; + " require('module').createRequire(process.cwd() + '/');" + " globalThis.require = publicRequire;" + " require('vm').runInThisContext(process.argv[1]);" + "global.__node_run = (x) => {" + " 0 && console.log('running code', x);" + " global.eval(x);" + "};" + "global.__native_onMessage = (x) => {" + " 0 && console.log('got __native_onMessage', x);" + "};"; static void _register_iono(); @@ -144,16 +146,10 @@ __IonoInstance::__IonoInstance() : break_requested(false), notification_handler(nullptr) { - { - uv_loop_t *loop = uv_default_loop(); - uv_async_init(loop, &async_notify, ¬ifyCallback); - async_notify.data = this; - } - std::vector<std::string> args = { "node" }; std::vector<std::string> exec_args; std::vector<std::string> errors; - + if (nullptr == platform) { int exit_code = node::InitializeNodeWithArgs(&args, &exec_args, &errors); @@ -178,6 +174,10 @@ __IonoInstance::__IonoInstance() : isolate = setup->isolate(); env = setup->env(); + event_loop = setup->event_loop(); + + uv_async_init(event_loop, &async_notify, ¬ifyCallback); + async_notify.data = this; { v8::Locker locker(isolate); @@ -186,7 +186,7 @@ __IonoInstance::__IonoInstance() : v8::Context::Scope context_scope(setup->context()); node::LoadEnvironment(env, main_code.c_str()); - + v8::Local<v8::ObjectTemplate> data_template = v8::ObjectTemplate::New(isolate); data_template->SetInternalFieldCount(1); v8::Local<v8::Object> data_object = data_template->NewInstance(setup->context()).ToLocalChecked(); @@ -242,7 +242,7 @@ __IonoInstance::runNode() { v8::Context::Scope context_scope(setup->context()); break_requested = false; while (true) { - uv_run(uv_default_loop(), UV_RUN_ONCE); + uv_run(event_loop, UV_RUN_ONCE); platform->DrainTasks(isolate); if (break_requested) break; diff --git a/iono/iono.swift b/iono/iono.swift index 23e0104..1f07140 100644 --- a/iono/iono.swift +++ b/iono/iono.swift @@ -27,48 +27,127 @@ func notification_callback(payload: Optional<UnsafePointer<Int8>>, native.internalOnNotify(payload: string) } +struct Queue<T> { + var contents: [T] + + init() { + self.contents = [] + } + + mutating func push(_ element: T) { + contents.append(element) + } + + mutating func pop() -> T? { + if (contents.isEmpty) { + return nil + } else { + return contents.remove(at: 0) + } + } +} + +class NodeThread: Thread { + var iono: Iono! + var workQueue: Queue<() -> ()> + var initialized: Bool + var initCondition: NSCondition + + override init() { + self.workQueue = Queue<() -> ()>() + self.initialized = false + self.initCondition = NSCondition() + super.init() + } + + override func main() { + iono.instance = __initNative() + __setNotifyHandler(iono.instance, notification_callback, Unmanaged.passUnretained(iono).toOpaque()) + self.initialized = true + initCondition.broadcast() + while true { + __runNode(iono.instance) + while let workItem = workQueue.pop() { + workItem() + } + if iono.stopped { + break + } + } + } + + func waitUntilInitialized(block: @escaping () -> ()) { + if (self.initialized) { + block() + return + } + + initCondition.lock() + while (!self.initialized) { + initCondition.wait() + } + + block() + + initCondition.unlock() + } +} + public class Iono { + var stopped: Bool + var thread: NodeThread + var instance: OpaquePointer! - var work_queue: DispatchQueue - var initialization_group: DispatchGroup - var messageHandler: IonoMessageHandler? - - public init() { - work_queue = DispatchQueue(label: "NodeQueue", qos: .userInitiated) - initialization_group = DispatchGroup() - initialization_group.notify(queue: work_queue) { - self.instance = __initNative() - __setNotifyHandler(self.instance, notification_callback, Unmanaged.passUnretained(self).toOpaque()) - } + public var messageHandler: IonoMessageHandler? + + public init() { // We need to be calling runNode! + self.stopped = false + self.thread = NodeThread() + self.thread.iono = self + + self.thread.start() } - + deinit { __destroyNative(instance) } - - private func scheduleNodeThreadAsync(block: @escaping () -> Void) { - initialization_group.wait() - work_queue.async(execute: block) - notifyNative() + + private func scheduleNodeThreadAsync(block: @escaping () -> ()) { + thread.waitUntilInitialized { + self.thread.workQueue.push(block) + self.notifyNative() + } } - + private func scheduleNodeThreadSync(block: @escaping () -> Void) { - initialization_group.wait() - work_queue.sync(execute: block) - notifyNative() + var hasExecuted = false + let executeCondition = NSCondition() + + executeCondition.lock() + thread.waitUntilInitialized { + self.thread.workQueue.push { + block() + hasExecuted = true + executeCondition.broadcast() + } + self.notifyNative() + } + while (!hasExecuted) { + executeCondition.wait() + } + executeCondition.unlock() } - + public func internalOnNotify(payload: String) { if let handler = messageHandler { handler.handleMessage(message: payload) } } - + public func notifyNative() { - initialization_group.wait() __notifyNative(instance) } - + public func evalSimpleJs(source: String) -> String { var result: String? scheduleNodeThreadSync { @@ -80,13 +159,13 @@ public class Iono { } return result! } - + public func evalNodeCode(source: String) { scheduleNodeThreadAsync { __makeCallbackNative(self.instance, source.cString(using: .utf8)) } } - + public func sendMessage(message: String) { let encoded = message.data(using: .utf8)!.base64EncodedString() let source = """ @@ -99,15 +178,16 @@ public class Iono { """ evalNodeCode(source: source) } - + public func waitStopped() { - - } - - public func putModuleCode(modName: String, code: String) { scheduleNodeThreadSync { - __putModuleCodeNative(self.instance, modName.cString(using: .utf8), - code.cString(using: .utf8)) + self.stopped = true } + thread.cancel() + } + + public func putModuleCode(modName: String, code: String) { + __putModuleCodeNative(self.instance, modName.cString(using: .utf8), + code.cString(using: .utf8)) } } |