diff options
Diffstat (limited to 'deps/npm/node_modules/libcipm/node_modules/worker-farm/lib')
4 files changed, 457 insertions, 0 deletions
diff --git a/deps/npm/node_modules/libcipm/node_modules/worker-farm/lib/child/index.js b/deps/npm/node_modules/libcipm/node_modules/worker-farm/lib/child/index.js new file mode 100644 index 0000000000..f91e08433a --- /dev/null +++ b/deps/npm/node_modules/libcipm/node_modules/worker-farm/lib/child/index.js @@ -0,0 +1,52 @@ +'use strict' + +let $module + +/* + let contextProto = this.context; + while (contextProto = Object.getPrototypeOf(contextProto)) { + completionGroups.push(Object.getOwnPropertyNames(contextProto)); + } +*/ + + +function handle (data) { + let idx = data.idx + , child = data.child + , method = data.method + , args = data.args + , callback = function () { + let _args = Array.prototype.slice.call(arguments) + if (_args[0] instanceof Error) { + let e = _args[0] + _args[0] = { + '$error' : '$error' + , 'type' : e.constructor.name + , 'message' : e.message + , 'stack' : e.stack + } + Object.keys(e).forEach(function(key) { + _args[0][key] = e[key] + }) + } + process.send({ idx: idx, child: child, args: _args }) + } + , exec + + if (method == null && typeof $module == 'function') + exec = $module + else if (typeof $module[method] == 'function') + exec = $module[method] + + if (!exec) + return console.error('NO SUCH METHOD:', method) + + exec.apply(null, args.concat([ callback ])) +} + + +process.on('message', function (data) { + if (!$module) return $module = require(data.module) + if (data == 'die') return process.exit(0) + handle(data) +}) diff --git a/deps/npm/node_modules/libcipm/node_modules/worker-farm/lib/farm.js b/deps/npm/node_modules/libcipm/node_modules/worker-farm/lib/farm.js new file mode 100644 index 0000000000..c77c95d2c7 --- /dev/null +++ b/deps/npm/node_modules/libcipm/node_modules/worker-farm/lib/farm.js @@ -0,0 +1,339 @@ +'use strict' + +const DEFAULT_OPTIONS = { + maxCallsPerWorker : Infinity + , maxConcurrentWorkers : (require('os').cpus() || { length: 1 }).length + , maxConcurrentCallsPerWorker : 10 + , maxConcurrentCalls : Infinity + , maxCallTime : Infinity // exceed this and the whole worker is terminated + , maxRetries : Infinity + , forcedKillTime : 100 + , autoStart : false + } + +const extend = require('xtend') + , fork = require('./fork') + , TimeoutError = require('errno').create('TimeoutError') + , ProcessTerminatedError = require('errno').create('ProcessTerminatedError') + , MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError') + + +function Farm (options, path) { + this.options = extend(DEFAULT_OPTIONS, options) + this.path = path + this.activeCalls = 0 +} + + +// make a handle to pass back in the form of an external API +Farm.prototype.mkhandle = function (method) { + return function () { + let args = Array.prototype.slice.call(arguments) + if (this.activeCalls >= this.options.maxConcurrentCalls) { + let err = new MaxConcurrentCallsError('Too many concurrent calls (' + this.activeCalls + ')') + if (typeof args[args.length - 1] == 'function') + return process.nextTick(args[args.length - 1].bind(null, err)) + throw err + } + this.addCall({ + method : method + , callback : args.pop() + , args : args + , retries : 0 + }) + }.bind(this) +} + + +// a constructor of sorts +Farm.prototype.setup = function (methods) { + let iface + if (!methods) { // single-function export + iface = this.mkhandle() + } else { // multiple functions on the export + iface = {} + methods.forEach(function (m) { + iface[m] = this.mkhandle(m) + }.bind(this)) + } + + this.searchStart = -1 + this.childId = -1 + this.children = {} + this.activeChildren = 0 + this.callQueue = [] + + if (this.options.autoStart) { + while (this.activeChildren < this.options.maxConcurrentWorkers) + this.startChild() + } + + return iface +} + + +// when a child exits, check if there are any outstanding jobs and requeue them +Farm.prototype.onExit = function (childId) { + // delay this to give any sends a chance to finish + setTimeout(function () { + let doQueue = false + if (this.children[childId] && this.children[childId].activeCalls) { + this.children[childId].calls.forEach(function (call, i) { + if (!call) return + else if (call.retries >= this.options.maxRetries) { + this.receive({ + idx : i + , child : childId + , args : [ new ProcessTerminatedError('cancel after ' + call.retries + ' retries!') ] + }) + } else { + call.retries++ + this.callQueue.unshift(call) + doQueue = true + } + }.bind(this)) + } + this.stopChild(childId) + doQueue && this.processQueue() + }.bind(this), 10) +} + + +// start a new worker +Farm.prototype.startChild = function () { + this.childId++ + + let forked = fork(this.path) + , id = this.childId + , c = { + send : forked.send + , child : forked.child + , calls : [] + , activeCalls : 0 + , exitCode : null + } + + forked.child.on('message', this.receive.bind(this)) + forked.child.once('exit', function (code) { + c.exitCode = code + this.onExit(id) + }.bind(this)) + + this.activeChildren++ + this.children[id] = c +} + + +// stop a worker, identified by id +Farm.prototype.stopChild = function (childId) { + let child = this.children[childId] + if (child) { + child.send('die') + setTimeout(function () { + if (child.exitCode === null) + child.child.kill('SIGKILL') + }, this.options.forcedKillTime).unref() + ;delete this.children[childId] + this.activeChildren-- + } +} + + +// called from a child process, the data contains information needed to +// look up the child and the original call so we can invoke the callback +Farm.prototype.receive = function (data) { + let idx = data.idx + , childId = data.child + , args = data.args + , child = this.children[childId] + , call + + if (!child) { + return console.error( + 'Worker Farm: Received message for unknown child. ' + + 'This is likely as a result of premature child death, ' + + 'the operation will have been re-queued.' + ) + } + + call = child.calls[idx] + if (!call) { + return console.error( + 'Worker Farm: Received message for unknown index for existing child. ' + + 'This should not happen!' + ) + } + + if (this.options.maxCallTime !== Infinity) + clearTimeout(call.timer) + + if (args[0] && args[0].$error == '$error') { + let e = args[0] + switch (e.type) { + case 'TypeError': args[0] = new TypeError(e.message); break + case 'RangeError': args[0] = new RangeError(e.message); break + case 'EvalError': args[0] = new EvalError(e.message); break + case 'ReferenceError': args[0] = new ReferenceError(e.message); break + case 'SyntaxError': args[0] = new SyntaxError(e.message); break + case 'URIError': args[0] = new URIError(e.message); break + default: args[0] = new Error(e.message) + } + args[0].type = e.type + args[0].stack = e.stack + + // Copy any custom properties to pass it on. + Object.keys(e).forEach(function(key) { + args[0][key] = e[key]; + }); + } + + process.nextTick(function () { + call.callback.apply(null, args) + }) + + ;delete child.calls[idx] + child.activeCalls-- + this.activeCalls-- + + if (child.calls.length >= this.options.maxCallsPerWorker + && !Object.keys(child.calls).length) { + // this child has finished its run, kill it + this.stopChild(childId) + } + + // allow any outstanding calls to be processed + this.processQueue() +} + + +Farm.prototype.childTimeout = function (childId) { + let child = this.children[childId] + , i + + if (!child) + return + + for (i in child.calls) { + this.receive({ + idx : i + , child : childId + , args : [ new TimeoutError('worker call timed out!') ] + }) + } + this.stopChild(childId) +} + + +// send a call to a worker, identified by id +Farm.prototype.send = function (childId, call) { + let child = this.children[childId] + , idx = child.calls.length + + child.calls.push(call) + child.activeCalls++ + this.activeCalls++ + + child.send({ + idx : idx + , child : childId + , method : call.method + , args : call.args + }) + + if (this.options.maxCallTime !== Infinity) { + call.timer = + setTimeout(this.childTimeout.bind(this, childId), this.options.maxCallTime) + } +} + + +// a list of active worker ids, in order, but the starting offset is +// shifted each time this method is called, so we work our way through +// all workers when handing out jobs +Farm.prototype.childKeys = function () { + let cka = Object.keys(this.children) + , cks + + if (this.searchStart >= cka.length - 1) + this.searchStart = 0 + else + this.searchStart++ + + cks = cka.splice(0, this.searchStart) + + return cka.concat(cks) +} + + +// Calls are added to a queue, this processes the queue and is called +// whenever there might be a chance to send more calls to the workers. +// The various options all impact on when we're able to send calls, +// they may need to be kept in a queue until a worker is ready. +Farm.prototype.processQueue = function () { + let cka, i = 0, childId + + if (!this.callQueue.length) + return this.ending && this.end() + + if (this.activeChildren < this.options.maxConcurrentWorkers) + this.startChild() + + for (cka = this.childKeys(); i < cka.length; i++) { + childId = +cka[i] + if (this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker + && this.children[childId].calls.length < this.options.maxCallsPerWorker) { + + this.send(childId, this.callQueue.shift()) + if (!this.callQueue.length) + return this.ending && this.end() + } /*else { + console.log( + , this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker + , this.children[childId].calls.length < this.options.maxCallsPerWorker + , this.children[childId].calls.length , this.options.maxCallsPerWorker) + }*/ + } + + if (this.ending) + this.end() +} + + +// add a new call to the call queue, then trigger a process of the queue +Farm.prototype.addCall = function (call) { + if (this.ending) + return this.end() // don't add anything new to the queue + this.callQueue.push(call) + this.processQueue() +} + + +// kills child workers when they're all done +Farm.prototype.end = function (callback) { + let complete = true + if (this.ending === false) + return + if (callback) + this.ending = callback + else if (this.ending == null) + this.ending = true + Object.keys(this.children).forEach(function (child) { + if (!this.children[child]) + return + if (!this.children[child].activeCalls) + this.stopChild(child) + else + complete = false + }.bind(this)) + + if (complete && typeof this.ending == 'function') { + process.nextTick(function () { + this.ending() + this.ending = false + }.bind(this)) + } +} + + +module.exports = Farm +module.exports.TimeoutError = TimeoutError diff --git a/deps/npm/node_modules/libcipm/node_modules/worker-farm/lib/fork.js b/deps/npm/node_modules/libcipm/node_modules/worker-farm/lib/fork.js new file mode 100644 index 0000000000..46cf79b73e --- /dev/null +++ b/deps/npm/node_modules/libcipm/node_modules/worker-farm/lib/fork.js @@ -0,0 +1,32 @@ +'use strict' + +const childProcess = require('child_process') + , childModule = require.resolve('./child/index') + + +function fork (forkModule) { + // suppress --debug / --inspect flags while preserving others (like --harmony) + let filteredArgs = process.execArgv.filter(function (v) { + return !(/^--(debug|inspect)/).test(v) + }) + , child = childProcess.fork(childModule, process.argv, { + execArgv: filteredArgs + , env: process.env + , cwd: process.cwd() + }) + + child.on('error', function() { + // this *should* be picked up by onExit and the operation requeued + }) + + child.send({ module: forkModule }) + + // return a send() function for this child + return { + send : child.send.bind(child) + , child : child + } +} + + +module.exports = fork diff --git a/deps/npm/node_modules/libcipm/node_modules/worker-farm/lib/index.js b/deps/npm/node_modules/libcipm/node_modules/worker-farm/lib/index.js new file mode 100644 index 0000000000..4df0902f84 --- /dev/null +++ b/deps/npm/node_modules/libcipm/node_modules/worker-farm/lib/index.js @@ -0,0 +1,34 @@ +'use strict' + +const Farm = require('./farm') + +let farms = [] // keep record of farms so we can end() them if required + + +function farm (options, path, methods) { + if (typeof options == 'string') { + methods = path + path = options + options = {} + } + + let f = new Farm(options, path) + , api = f.setup(methods) + + farms.push({ farm: f, api: api }) + + // return the public API + return api +} + + +function end (api, callback) { + for (let i = 0; i < farms.length; i++) + if (farms[i] && farms[i].api === api) + return farms[i].farm.end(callback) + process.nextTick(callback.bind(null, 'Worker farm not found!')) +} + + +module.exports = farm +module.exports.end = end
\ No newline at end of file |