'use strict'; const { FSReqCallback, writeBuffers } = internalBinding('fs'); const { ERR_INVALID_ARG_TYPE, ERR_OUT_OF_RANGE } = require('internal/errors').codes; const { validateNumber } = require('internal/validators'); const fs = require('fs'); const { Buffer } = require('buffer'); const { copyObject, getOptions, } = require('internal/fs/utils'); const { Readable, Writable } = require('stream'); const { toPathIfFileURL } = require('internal/url'); const kMinPoolSpace = 128; let pool; // It can happen that we expect to read a large chunk of data, and reserve // a large chunk of the pool accordingly, but the read() call only filled // a portion of it. If a concurrently executing read() then uses the same pool, // the "reserved" portion cannot be used, so we allow it to be re-used as a // new pool later. const poolFragments = []; function allocNewPool(poolSize) { if (poolFragments.length > 0) pool = poolFragments.pop(); else pool = Buffer.allocUnsafe(poolSize); pool.used = 0; } // Check the `this.start` and `this.end` of stream. function checkPosition(pos, name) { if (!Number.isSafeInteger(pos)) { validateNumber(pos, name); if (!Number.isInteger(pos)) throw new ERR_OUT_OF_RANGE(name, 'an integer', pos); throw new ERR_OUT_OF_RANGE(name, '>= 0 and <= 2 ** 53 - 1', pos); } if (pos < 0) { throw new ERR_OUT_OF_RANGE(name, '>= 0 and <= 2 ** 53 - 1', pos); } } function ReadStream(path, options) { if (!(this instanceof ReadStream)) return new ReadStream(path, options); // A little bit bigger buffer and water marks by default options = copyObject(getOptions(options, {})); if (options.highWaterMark === undefined) options.highWaterMark = 64 * 1024; // For backwards compat do not emit close on destroy. options.emitClose = false; Readable.call(this, options); // Path will be ignored when fd is specified, so it can be falsy this.path = toPathIfFileURL(path); this.fd = options.fd === undefined ? null : options.fd; this.flags = options.flags === undefined ? 'r' : options.flags; this.mode = options.mode === undefined ? 0o666 : options.mode; this.start = options.start; this.end = options.end; this.autoClose = options.autoClose === undefined ? true : options.autoClose; this.pos = undefined; this.bytesRead = 0; this.closed = false; if (this.start !== undefined) { checkPosition(this.start, 'start'); this.pos = this.start; } if (this.end === undefined) { this.end = Infinity; } else if (this.end !== Infinity) { checkPosition(this.end, 'end'); if (this.start !== undefined && this.start > this.end) { throw new ERR_OUT_OF_RANGE( 'start', `<= "end" (here: ${this.end})`, this.start ); } } if (typeof this.fd !== 'number') this.open(); this.on('end', function() { if (this.autoClose) { this.destroy(); } }); } Object.setPrototypeOf(ReadStream.prototype, Readable.prototype); Object.setPrototypeOf(ReadStream, Readable); ReadStream.prototype.open = function() { fs.open(this.path, this.flags, this.mode, (er, fd) => { if (er) { if (this.autoClose) { this.destroy(); } this.emit('error', er); return; } this.fd = fd; this.emit('open', fd); this.emit('ready'); // start the flow of data. this.read(); }); }; ReadStream.prototype._read = function(n) { if (typeof this.fd !== 'number') { return this.once('open', function() { this._read(n); }); } if (this.destroyed) return; if (!pool || pool.length - pool.used < kMinPoolSpace) { // discard the old pool. allocNewPool(this.readableHighWaterMark); } // Grab another reference to the pool in the case that while we're // in the thread pool another read() finishes up the pool, and // allocates a new one. const thisPool = pool; let toRead = Math.min(pool.length - pool.used, n); const start = pool.used; if (this.pos !== undefined) toRead = Math.min(this.end - this.pos + 1, toRead); else toRead = Math.min(this.end - this.bytesRead + 1, toRead); // already read everything we were supposed to read! // treat as EOF. if (toRead <= 0) return this.push(null); // the actual read. fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => { if (er) { if (this.autoClose) { this.destroy(); } this.emit('error', er); } else { let b = null; // Now that we know how much data we have actually read, re-wind the // 'used' field if we can, and otherwise allow the remainder of our // reservation to be used as a new pool later. if (start + toRead === thisPool.used && thisPool === pool) thisPool.used += bytesRead - toRead; else if (toRead - bytesRead > kMinPoolSpace) poolFragments.push(thisPool.slice(start + bytesRead, start + toRead)); if (bytesRead > 0) { this.bytesRead += bytesRead; b = thisPool.slice(start, start + bytesRead); } this.push(b); } }); // Move the pool positions, and internal position for reading. if (this.pos !== undefined) this.pos += toRead; pool.used += toRead; }; ReadStream.prototype._destroy = function(err, cb) { if (typeof this.fd !== 'number') { this.once('open', closeFsStream.bind(null, this, cb, err)); return; } closeFsStream(this, cb, err); this.fd = null; }; function closeFsStream(stream, cb, err) { fs.close(stream.fd, (er) => { er = er || err; cb(er); stream.closed = true; if (!er) stream.emit('close'); }); } ReadStream.prototype.close = function(cb) { this.destroy(null, cb); }; Object.defineProperty(ReadStream.prototype, 'pending', { get() { return this.fd === null; }, configurable: true }); function WriteStream(path, options) { if (!(this instanceof WriteStream)) return new WriteStream(path, options); options = copyObject(getOptions(options, {})); // For backwards compat do not emit close on destroy. options.emitClose = false; Writable.call(this, options); // Path will be ignored when fd is specified, so it can be falsy this.path = toPathIfFileURL(path); this.fd = options.fd === undefined ? null : options.fd; this.flags = options.flags === undefined ? 'w' : options.flags; this.mode = options.mode === undefined ? 0o666 : options.mode; this.start = options.start; this.autoClose = options.autoClose === undefined ? true : !!options.autoClose; this.pos = undefined; this.bytesWritten = 0; this.closed = false; if (this.start !== undefined) { if (typeof this.start !== 'number') { throw new ERR_INVALID_ARG_TYPE('start', 'number', this.start); } if (this.start < 0) { const errVal = `{start: ${this.start}}`; throw new ERR_OUT_OF_RANGE('start', '>= 0', errVal); } this.pos = this.start; } if (options.encoding) this.setDefaultEncoding(options.encoding); if (typeof this.fd !== 'number') this.open(); } Object.setPrototypeOf(WriteStream.prototype, Writable.prototype); Object.setPrototypeOf(WriteStream, Writable); WriteStream.prototype._final = function(callback) { if (this.autoClose) { this.destroy(); } callback(); }; WriteStream.prototype.open = function() { fs.open(this.path, this.flags, this.mode, (er, fd) => { if (er) { if (this.autoClose) { this.destroy(); } this.emit('error', er); return; } this.fd = fd; this.emit('open', fd); this.emit('ready'); }); }; WriteStream.prototype._write = function(data, encoding, cb) { if (!(data instanceof Buffer)) { const err = new ERR_INVALID_ARG_TYPE('data', 'Buffer', data); return this.emit('error', err); } if (typeof this.fd !== 'number') { return this.once('open', function() { this._write(data, encoding, cb); }); } fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => { if (er) { if (this.autoClose) { this.destroy(); } return cb(er); } this.bytesWritten += bytes; cb(); }); if (this.pos !== undefined) this.pos += data.length; }; function writev(fd, chunks, position, callback) { function wrapper(err, written) { // Retain a reference to chunks so that they can't be GC'ed too soon. callback(err, written || 0, chunks); } const req = new FSReqCallback(); req.oncomplete = wrapper; writeBuffers(fd, chunks, position, req); } WriteStream.prototype._writev = function(data, cb) { if (typeof this.fd !== 'number') { return this.once('open', function() { this._writev(data, cb); }); } const self = this; const len = data.length; const chunks = new Array(len); let size = 0; for (var i = 0; i < len; i++) { const chunk = data[i].chunk; chunks[i] = chunk; size += chunk.length; } writev(this.fd, chunks, this.pos, function(er, bytes) { if (er) { self.destroy(); return cb(er); } self.bytesWritten += bytes; cb(); }); if (this.pos !== undefined) this.pos += size; }; WriteStream.prototype._destroy = ReadStream.prototype._destroy; WriteStream.prototype.close = function(cb) { if (cb) { if (this.closed) { process.nextTick(cb); return; } else { this.on('close', cb); } } // If we are not autoClosing, we should call // destroy on 'finish'. if (!this.autoClose) { this.on('finish', this.destroy.bind(this)); } // we use end() instead of destroy() because of // https://github.com/nodejs/node/issues/2006 this.end(); }; // There is no shutdown() for files. WriteStream.prototype.destroySoon = WriteStream.prototype.end; Object.defineProperty(WriteStream.prototype, 'pending', { get() { return this.fd === null; }, configurable: true }); module.exports = { ReadStream, WriteStream };