summaryrefslogtreecommitdiff
path: root/deps/npm/node_modules/worker-farm/lib/farm.js
diff options
context:
space:
mode:
Diffstat (limited to 'deps/npm/node_modules/worker-farm/lib/farm.js')
-rw-r--r--deps/npm/node_modules/worker-farm/lib/farm.js339
1 files changed, 339 insertions, 0 deletions
diff --git a/deps/npm/node_modules/worker-farm/lib/farm.js b/deps/npm/node_modules/worker-farm/lib/farm.js
new file mode 100644
index 0000000000..bdc70e84a1
--- /dev/null
+++ b/deps/npm/node_modules/worker-farm/lib/farm.js
@@ -0,0 +1,339 @@
+'use strict'
+
+const DEFAULT_OPTIONS = {
+ maxCallsPerWorker : Infinity
+ , maxConcurrentWorkers : require('os').cpus().length
+ , maxConcurrentCallsPerWorker : 10
+ , maxConcurrentCalls : Infinity
+ , maxCallTime : Infinity // exceed this and the whole worker is terminated
+ , maxRetries : Infinity
+ , forcedKillTime : 100
+ , autoStart : false
+ }
+
+const extend = require('xtend')
+ , fork = require('./fork')
+ , TimeoutError = require('errno').create('TimeoutError')
+ , ProcessTerminatedError = require('errno').create('ProcessTerminatedError')
+ , MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError')
+
+
+function Farm (options, path) {
+ this.options = extend(DEFAULT_OPTIONS, options)
+ this.path = path
+ this.activeCalls = 0
+}
+
+
+// make a handle to pass back in the form of an external API
+Farm.prototype.mkhandle = function (method) {
+ return function () {
+ let args = Array.prototype.slice.call(arguments)
+ if (this.activeCalls >= this.options.maxConcurrentCalls) {
+ let err = new MaxConcurrentCallsError('Too many concurrent calls (' + this.activeCalls + ')')
+ if (typeof args[args.length - 1] == 'function')
+ return process.nextTick(args[args.length - 1].bind(null, err))
+ throw err
+ }
+ this.addCall({
+ method : method
+ , callback : args.pop()
+ , args : args
+ , retries : 0
+ })
+ }.bind(this)
+}
+
+
+// a constructor of sorts
+Farm.prototype.setup = function (methods) {
+ let iface
+ if (!methods) { // single-function export
+ iface = this.mkhandle()
+ } else { // multiple functions on the export
+ iface = {}
+ methods.forEach(function (m) {
+ iface[m] = this.mkhandle(m)
+ }.bind(this))
+ }
+
+ this.searchStart = -1
+ this.childId = -1
+ this.children = {}
+ this.activeChildren = 0
+ this.callQueue = []
+
+ if (this.options.autoStart) {
+ while (this.activeChildren < this.options.maxConcurrentWorkers)
+ this.startChild()
+ }
+
+ return iface
+}
+
+
+// when a child exits, check if there are any outstanding jobs and requeue them
+Farm.prototype.onExit = function (childId) {
+ // delay this to give any sends a chance to finish
+ setTimeout(function () {
+ let doQueue = false
+ if (this.children[childId] && this.children[childId].activeCalls) {
+ this.children[childId].calls.forEach(function (call, i) {
+ if (!call) return
+ else if (call.retries >= this.options.maxRetries) {
+ this.receive({
+ idx : i
+ , child : childId
+ , args : [ new ProcessTerminatedError('cancel after ' + call.retries + ' retries!') ]
+ })
+ } else {
+ call.retries++
+ this.callQueue.unshift(call)
+ doQueue = true
+ }
+ }.bind(this))
+ }
+ this.stopChild(childId)
+ doQueue && this.processQueue()
+ }.bind(this), 10)
+}
+
+
+// start a new worker
+Farm.prototype.startChild = function () {
+ this.childId++
+
+ let forked = fork(this.path)
+ , id = this.childId
+ , c = {
+ send : forked.send
+ , child : forked.child
+ , calls : []
+ , activeCalls : 0
+ , exitCode : null
+ }
+
+ forked.child.on('message', this.receive.bind(this))
+ forked.child.once('exit', function (code) {
+ c.exitCode = code
+ this.onExit(id)
+ }.bind(this))
+
+ this.activeChildren++
+ this.children[id] = c
+}
+
+
+// stop a worker, identified by id
+Farm.prototype.stopChild = function (childId) {
+ let child = this.children[childId]
+ if (child) {
+ child.send('die')
+ setTimeout(function () {
+ if (child.exitCode === null)
+ child.child.kill('SIGKILL')
+ }, this.options.forcedKillTime)
+ ;delete this.children[childId]
+ this.activeChildren--
+ }
+}
+
+
+// called from a child process, the data contains information needed to
+// look up the child and the original call so we can invoke the callback
+Farm.prototype.receive = function (data) {
+ let idx = data.idx
+ , childId = data.child
+ , args = data.args
+ , child = this.children[childId]
+ , call
+
+ if (!child) {
+ return console.error(
+ 'Worker Farm: Received message for unknown child. '
+ + 'This is likely as a result of premature child death, '
+ + 'the operation will have been re-queued.'
+ )
+ }
+
+ call = child.calls[idx]
+ if (!call) {
+ return console.error(
+ 'Worker Farm: Received message for unknown index for existing child. '
+ + 'This should not happen!'
+ )
+ }
+
+ if (this.options.maxCallTime !== Infinity)
+ clearTimeout(call.timer)
+
+ if (args[0] && args[0].$error == '$error') {
+ let e = args[0]
+ switch (e.type) {
+ case 'TypeError': args[0] = new TypeError(e.message); break
+ case 'RangeError': args[0] = new RangeError(e.message); break
+ case 'EvalError': args[0] = new EvalError(e.message); break
+ case 'ReferenceError': args[0] = new ReferenceError(e.message); break
+ case 'SyntaxError': args[0] = new SyntaxError(e.message); break
+ case 'URIError': args[0] = new URIError(e.message); break
+ default: args[0] = new Error(e.message)
+ }
+ args[0].type = e.type
+ args[0].stack = e.stack
+
+ // Copy any custom properties to pass it on.
+ Object.keys(e).forEach(function(key) {
+ args[0][key] = e[key];
+ });
+ }
+
+ process.nextTick(function () {
+ call.callback.apply(null, args)
+ })
+
+ ;delete child.calls[idx]
+ child.activeCalls--
+ this.activeCalls--
+
+ if (child.calls.length >= this.options.maxCallsPerWorker
+ && !Object.keys(child.calls).length) {
+ // this child has finished its run, kill it
+ this.stopChild(childId)
+ }
+
+ // allow any outstanding calls to be processed
+ this.processQueue()
+}
+
+
+Farm.prototype.childTimeout = function (childId) {
+ let child = this.children[childId]
+ , i
+
+ if (!child)
+ return
+
+ for (i in child.calls) {
+ this.receive({
+ idx : i
+ , child : childId
+ , args : [ new TimeoutError('worker call timed out!') ]
+ })
+ }
+ this.stopChild(childId)
+}
+
+
+// send a call to a worker, identified by id
+Farm.prototype.send = function (childId, call) {
+ let child = this.children[childId]
+ , idx = child.calls.length
+
+ child.calls.push(call)
+ child.activeCalls++
+ this.activeCalls++
+
+ child.send({
+ idx : idx
+ , child : childId
+ , method : call.method
+ , args : call.args
+ })
+
+ if (this.options.maxCallTime !== Infinity) {
+ call.timer =
+ setTimeout(this.childTimeout.bind(this, childId), this.options.maxCallTime)
+ }
+}
+
+
+// a list of active worker ids, in order, but the starting offset is
+// shifted each time this method is called, so we work our way through
+// all workers when handing out jobs
+Farm.prototype.childKeys = function () {
+ let cka = Object.keys(this.children)
+ , cks
+
+ if (this.searchStart >= cka.length - 1)
+ this.searchStart = 0
+ else
+ this.searchStart++
+
+ cks = cka.splice(0, this.searchStart)
+
+ return cka.concat(cks)
+}
+
+
+// Calls are added to a queue, this processes the queue and is called
+// whenever there might be a chance to send more calls to the workers.
+// The various options all impact on when we're able to send calls,
+// they may need to be kept in a queue until a worker is ready.
+Farm.prototype.processQueue = function () {
+ let cka, i = 0, childId
+
+ if (!this.callQueue.length)
+ return this.ending && this.end()
+
+ if (this.activeChildren < this.options.maxConcurrentWorkers)
+ this.startChild()
+
+ for (cka = this.childKeys(); i < cka.length; i++) {
+ childId = +cka[i]
+ if (this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker
+ && this.children[childId].calls.length < this.options.maxCallsPerWorker) {
+
+ this.send(childId, this.callQueue.shift())
+ if (!this.callQueue.length)
+ return this.ending && this.end()
+ } /*else {
+ console.log(
+ , this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker
+ , this.children[childId].calls.length < this.options.maxCallsPerWorker
+ , this.children[childId].calls.length , this.options.maxCallsPerWorker)
+ }*/
+ }
+
+ if (this.ending)
+ this.end()
+}
+
+
+// add a new call to the call queue, then trigger a process of the queue
+Farm.prototype.addCall = function (call) {
+ if (this.ending)
+ return this.end() // don't add anything new to the queue
+ this.callQueue.push(call)
+ this.processQueue()
+}
+
+
+// kills child workers when they're all done
+Farm.prototype.end = function (callback) {
+ let complete = true
+ if (this.ending === false)
+ return
+ if (callback)
+ this.ending = callback
+ else if (this.ending == null)
+ this.ending = true
+ Object.keys(this.children).forEach(function (child) {
+ if (!this.children[child])
+ return
+ if (!this.children[child].activeCalls)
+ this.stopChild(child)
+ else
+ complete = false
+ }.bind(this))
+
+ if (complete && typeof this.ending == 'function') {
+ process.nextTick(function () {
+ this.ending()
+ this.ending = false
+ }.bind(this))
+ }
+}
+
+
+module.exports = Farm
+module.exports.TimeoutError = TimeoutError