summaryrefslogtreecommitdiff
path: root/deps/npm/node_modules/parallel-transform/index.js
diff options
context:
space:
mode:
Diffstat (limited to 'deps/npm/node_modules/parallel-transform/index.js')
-rw-r--r--deps/npm/node_modules/parallel-transform/index.js105
1 files changed, 105 insertions, 0 deletions
diff --git a/deps/npm/node_modules/parallel-transform/index.js b/deps/npm/node_modules/parallel-transform/index.js
new file mode 100644
index 0000000000..77329e4ccf
--- /dev/null
+++ b/deps/npm/node_modules/parallel-transform/index.js
@@ -0,0 +1,105 @@
+var Transform = require('readable-stream').Transform;
+var inherits = require('inherits');
+var cyclist = require('cyclist');
+var util = require('util');
+
+var ParallelTransform = function(maxParallel, opts, ontransform) {
+ if (!(this instanceof ParallelTransform)) return new ParallelTransform(maxParallel, opts, ontransform);
+
+ if (typeof maxParallel === 'function') {
+ ontransform = maxParallel;
+ opts = null;
+ maxParallel = 1;
+ }
+ if (typeof opts === 'function') {
+ ontransform = opts;
+ opts = null;
+ }
+
+ if (!opts) opts = {};
+ if (!opts.highWaterMark) opts.highWaterMark = Math.max(maxParallel, 16);
+ if (opts.objectMode !== false) opts.objectMode = true;
+
+ Transform.call(this, opts);
+
+ this._maxParallel = maxParallel;
+ this._ontransform = ontransform;
+ this._destroyed = false;
+ this._flushed = false;
+ this._ordered = opts.ordered !== false;
+ this._buffer = this._ordered ? cyclist(maxParallel) : [];
+ this._top = 0;
+ this._bottom = 0;
+ this._ondrain = null;
+};
+
+inherits(ParallelTransform, Transform);
+
+ParallelTransform.prototype.destroy = function() {
+ if (this._destroyed) return;
+ this._destroyed = true;
+ this.emit('close');
+};
+
+ParallelTransform.prototype._transform = function(chunk, enc, callback) {
+ var self = this;
+ var pos = this._top++;
+
+ this._ontransform(chunk, function(err, data) {
+ if (self._destroyed) return;
+ if (err) {
+ self.emit('error', err);
+ self.push(null);
+ self.destroy();
+ return;
+ }
+ if (self._ordered) {
+ self._buffer.put(pos, (data === undefined || data === null) ? null : data);
+ }
+ else {
+ self._buffer.push(data);
+ }
+ self._drain();
+ });
+
+ if (this._top - this._bottom < this._maxParallel) return callback();
+ this._ondrain = callback;
+};
+
+ParallelTransform.prototype._flush = function(callback) {
+ this._flushed = true;
+ this._ondrain = callback;
+ this._drain();
+};
+
+ParallelTransform.prototype._drain = function() {
+ if (this._ordered) {
+ while (this._buffer.get(this._bottom) !== undefined) {
+ var data = this._buffer.del(this._bottom++);
+ if (data === null) continue;
+ this.push(data);
+ }
+ }
+ else {
+ while (this._buffer.length > 0) {
+ var data = this._buffer.pop();
+ this._bottom++;
+ if (data === null) continue;
+ this.push(data);
+ }
+ }
+
+
+ if (!this._drained() || !this._ondrain) return;
+
+ var ondrain = this._ondrain;
+ this._ondrain = null;
+ ondrain();
+};
+
+ParallelTransform.prototype._drained = function() {
+ var diff = this._top - this._bottom;
+ return this._flushed ? !diff : diff < this._maxParallel;
+};
+
+module.exports = ParallelTransform;