fs: refactor fs module

PR-URL: https://github.com/nodejs/node/pull/20764
Reviewed-By: Michaël Zasso <targos@protonmail.com>
Reviewed-By: Joyee Cheung <joyeec9h3@gmail.com>
This commit is contained in:
James M Snell 2018-05-15 12:34:49 -07:00
parent a14a0fa8dc
commit 7f0f978aff
12 changed files with 1174 additions and 1012 deletions

1402
lib/fs.js

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,108 @@
'use strict';
const { Buffer } = require('buffer');
const { FSReqWrap, close, read } = process.binding('fs');
const kReadFileBufferLength = 8 * 1024;
function readFileAfterRead(err, bytesRead) {
const context = this.context;
if (err)
return context.close(err);
if (bytesRead === 0)
return context.close();
context.pos += bytesRead;
if (context.size !== 0) {
if (context.pos === context.size)
context.close();
else
context.read();
} else {
// unknown size, just read until we don't get bytes.
context.buffers.push(context.buffer.slice(0, bytesRead));
context.read();
}
}
function readFileAfterClose(err) {
const context = this.context;
const callback = context.callback;
let buffer = null;
if (context.err || err)
return callback(context.err || err);
try {
if (context.size === 0)
buffer = Buffer.concat(context.buffers, context.pos);
else if (context.pos < context.size)
buffer = context.buffer.slice(0, context.pos);
else
buffer = context.buffer;
if (context.encoding)
buffer = buffer.toString(context.encoding);
} catch (err) {
return callback(err);
}
callback(null, buffer);
}
class ReadFileContext {
constructor(callback, encoding) {
this.fd = undefined;
this.isUserFd = undefined;
this.size = undefined;
this.callback = callback;
this.buffers = null;
this.buffer = null;
this.pos = 0;
this.encoding = encoding;
this.err = null;
}
read() {
let buffer;
let offset;
let length;
if (this.size === 0) {
buffer = this.buffer = Buffer.allocUnsafeSlow(kReadFileBufferLength);
offset = 0;
length = kReadFileBufferLength;
} else {
buffer = this.buffer;
offset = this.pos;
length = Math.min(kReadFileBufferLength, this.size - this.pos);
}
const req = new FSReqWrap();
req.oncomplete = readFileAfterRead;
req.context = this;
read(this.fd, buffer, offset, length, -1, req);
}
close(err) {
const req = new FSReqWrap();
req.oncomplete = readFileAfterClose;
req.context = this;
this.err = err;
if (this.isUserFd) {
process.nextTick(function tick() {
req.oncomplete(null);
});
return;
}
close(this.fd, req);
}
}
module.exports = ReadFileContext;

383
lib/internal/fs/streams.js Normal file
View File

@ -0,0 +1,383 @@
'use strict';
const {
FSReqWrap,
writeBuffers
} = process.binding('fs');
const {
ERR_INVALID_ARG_TYPE,
ERR_OUT_OF_RANGE
} = require('internal/errors').codes;
const fs = require('fs');
const { Buffer } = require('buffer');
const {
copyObject,
getOptions,
} = require('internal/fs/utils');
const { Readable, Writable } = require('stream');
const { getPathFromURL } = require('internal/url');
const util = require('util');
const kMinPoolSpace = 128;
let pool;
function allocNewPool(poolSize) {
pool = Buffer.allocUnsafe(poolSize);
pool.used = 0;
}
function ReadStream(path, options) {
if (!(this instanceof ReadStream))
return new ReadStream(path, options);
// a little bit bigger buffer and water marks by default
options = copyObject(getOptions(options, {}));
if (options.highWaterMark === undefined)
options.highWaterMark = 64 * 1024;
// for backwards compat do not emit close on destroy.
options.emitClose = false;
Readable.call(this, options);
// path will be ignored when fd is specified, so it can be falsy
this.path = getPathFromURL(path);
this.fd = options.fd === undefined ? null : options.fd;
this.flags = options.flags === undefined ? 'r' : options.flags;
this.mode = options.mode === undefined ? 0o666 : options.mode;
this.start = options.start;
this.end = options.end;
this.autoClose = options.autoClose === undefined ? true : options.autoClose;
this.pos = undefined;
this.bytesRead = 0;
this.closed = false;
if (this.start !== undefined) {
if (!Number.isSafeInteger(this.start)) {
if (typeof this.start !== 'number')
throw new ERR_INVALID_ARG_TYPE('start', 'number', this.start);
if (!Number.isInteger(this.start))
throw new ERR_OUT_OF_RANGE('start', 'an integer', this.start);
throw new ERR_OUT_OF_RANGE(
'start',
'>= 0 and <= 2 ** 53 - 1',
this.start
);
}
if (this.start < 0) {
throw new ERR_OUT_OF_RANGE(
'start',
'>= 0 and <= 2 ** 53 - 1',
this.start
);
}
this.pos = this.start;
}
if (this.end === undefined) {
this.end = Infinity;
} else if (this.end !== Infinity) {
if (!Number.isSafeInteger(this.end)) {
if (typeof this.end !== 'number')
throw new ERR_INVALID_ARG_TYPE('end', 'number', this.end);
if (!Number.isInteger(this.end))
throw new ERR_OUT_OF_RANGE('end', 'an integer', this.end);
throw new ERR_OUT_OF_RANGE('end', '>= 0 and <= 2 ** 53 - 1', this.end);
}
if (this.end < 0) {
throw new ERR_OUT_OF_RANGE('end', '>= 0 and <= 2 ** 53 - 1', this.end);
}
if (this.start !== undefined && this.start > this.end) {
throw new ERR_OUT_OF_RANGE(
'start',
`<= "end" (here: ${this.end})`,
this.start
);
}
}
if (typeof this.fd !== 'number')
this.open();
this.on('end', function() {
if (this.autoClose) {
this.destroy();
}
});
}
util.inherits(ReadStream, Readable);
ReadStream.prototype.open = function() {
fs.open(this.path, this.flags, this.mode, (er, fd) => {
if (er) {
if (this.autoClose) {
this.destroy();
}
this.emit('error', er);
return;
}
this.fd = fd;
this.emit('open', fd);
this.emit('ready');
// start the flow of data.
this.read();
});
};
ReadStream.prototype._read = function(n) {
if (typeof this.fd !== 'number') {
return this.once('open', function() {
this._read(n);
});
}
if (this.destroyed)
return;
if (!pool || pool.length - pool.used < kMinPoolSpace) {
// discard the old pool.
allocNewPool(this.readableHighWaterMark);
}
// Grab another reference to the pool in the case that while we're
// in the thread pool another read() finishes up the pool, and
// allocates a new one.
const thisPool = pool;
let toRead = Math.min(pool.length - pool.used, n);
const start = pool.used;
if (this.pos !== undefined)
toRead = Math.min(this.end - this.pos + 1, toRead);
else
toRead = Math.min(this.end - this.bytesRead + 1, toRead);
// already read everything we were supposed to read!
// treat as EOF.
if (toRead <= 0)
return this.push(null);
// the actual read.
fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
if (er) {
if (this.autoClose) {
this.destroy();
}
this.emit('error', er);
} else {
let b = null;
if (bytesRead > 0) {
this.bytesRead += bytesRead;
b = thisPool.slice(start, start + bytesRead);
}
this.push(b);
}
});
// move the pool positions, and internal position for reading.
if (this.pos !== undefined)
this.pos += toRead;
pool.used += toRead;
};
ReadStream.prototype._destroy = function(err, cb) {
const isOpen = typeof this.fd !== 'number';
if (isOpen) {
this.once('open', closeFsStream.bind(null, this, cb, err));
return;
}
closeFsStream(this, cb, err);
this.fd = null;
};
function closeFsStream(stream, cb, err) {
fs.close(stream.fd, (er) => {
er = er || err;
cb(er);
stream.closed = true;
if (!er)
stream.emit('close');
});
}
ReadStream.prototype.close = function(cb) {
this.destroy(null, cb);
};
function WriteStream(path, options) {
if (!(this instanceof WriteStream))
return new WriteStream(path, options);
options = copyObject(getOptions(options, {}));
// for backwards compat do not emit close on destroy.
options.emitClose = false;
Writable.call(this, options);
// path will be ignored when fd is specified, so it can be falsy
this.path = getPathFromURL(path);
this.fd = options.fd === undefined ? null : options.fd;
this.flags = options.flags === undefined ? 'w' : options.flags;
this.mode = options.mode === undefined ? 0o666 : options.mode;
this.start = options.start;
this.autoClose = options.autoClose === undefined ? true : !!options.autoClose;
this.pos = undefined;
this.bytesWritten = 0;
this.closed = false;
if (this.start !== undefined) {
if (typeof this.start !== 'number') {
throw new ERR_INVALID_ARG_TYPE('start', 'number', this.start);
}
if (this.start < 0) {
const errVal = `{start: ${this.start}}`;
throw new ERR_OUT_OF_RANGE('start', '>= 0', errVal);
}
this.pos = this.start;
}
if (options.encoding)
this.setDefaultEncoding(options.encoding);
if (typeof this.fd !== 'number')
this.open();
}
util.inherits(WriteStream, Writable);
WriteStream.prototype._final = function(callback) {
if (this.autoClose) {
this.destroy();
}
callback();
};
WriteStream.prototype.open = function() {
fs.open(this.path, this.flags, this.mode, (er, fd) => {
if (er) {
if (this.autoClose) {
this.destroy();
}
this.emit('error', er);
return;
}
this.fd = fd;
this.emit('open', fd);
this.emit('ready');
});
};
WriteStream.prototype._write = function(data, encoding, cb) {
if (!(data instanceof Buffer)) {
const err = new ERR_INVALID_ARG_TYPE('data', 'Buffer', data);
return this.emit('error', err);
}
if (typeof this.fd !== 'number') {
return this.once('open', function() {
this._write(data, encoding, cb);
});
}
fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
if (er) {
if (this.autoClose) {
this.destroy();
}
return cb(er);
}
this.bytesWritten += bytes;
cb();
});
if (this.pos !== undefined)
this.pos += data.length;
};
function writev(fd, chunks, position, callback) {
function wrapper(err, written) {
// Retain a reference to chunks so that they can't be GC'ed too soon.
callback(err, written || 0, chunks);
}
const req = new FSReqWrap();
req.oncomplete = wrapper;
writeBuffers(fd, chunks, position, req);
}
WriteStream.prototype._writev = function(data, cb) {
if (typeof this.fd !== 'number') {
return this.once('open', function() {
this._writev(data, cb);
});
}
const self = this;
const len = data.length;
const chunks = new Array(len);
let size = 0;
for (var i = 0; i < len; i++) {
const chunk = data[i].chunk;
chunks[i] = chunk;
size += chunk.length;
}
writev(this.fd, chunks, this.pos, function(er, bytes) {
if (er) {
self.destroy();
return cb(er);
}
self.bytesWritten += bytes;
cb();
});
if (this.pos !== undefined)
this.pos += size;
};
WriteStream.prototype._destroy = ReadStream.prototype._destroy;
WriteStream.prototype.close = function(cb) {
if (cb) {
if (this.closed) {
process.nextTick(cb);
return;
} else {
this.on('close', cb);
}
}
// If we are not autoClosing, we should call
// destroy on 'finish'.
if (!this.autoClose) {
this.on('finish', this.destroy.bind(this));
}
// we use end() instead of destroy() because of
// https://github.com/nodejs/node/issues/2006
this.end();
};
// There is no shutdown() for files.
WriteStream.prototype.destroySoon = WriteStream.prototype.end;
module.exports = {
ReadStream,
WriteStream
};

View File

@ -0,0 +1,45 @@
'use strict';
const { Writable } = require('stream');
const { inherits } = require('util');
const { closeSync, writeSync } = require('fs');
function SyncWriteStream(fd, options) {
Writable.call(this);
options = options || {};
this.fd = fd;
this.readable = false;
this.autoClose = options.autoClose === undefined ? true : options.autoClose;
this.on('end', () => this._destroy());
}
inherits(SyncWriteStream, Writable);
SyncWriteStream.prototype._write = function(chunk, encoding, cb) {
writeSync(this.fd, chunk, 0, chunk.length);
cb();
return true;
};
SyncWriteStream.prototype._destroy = function() {
if (this.fd === null) // already destroy()ed
return;
if (this.autoClose)
closeSync(this.fd);
this.fd = null;
return true;
};
SyncWriteStream.prototype.destroySoon =
SyncWriteStream.prototype.destroy = function() {
this._destroy();
this.emit('close');
return true;
};
module.exports = SyncWriteStream;

View File

@ -1,7 +1,6 @@
'use strict';
const { Buffer, kMaxLength } = require('buffer');
const { Writable } = require('stream');
const {
ERR_FS_INVALID_SYMLINK_TYPE,
ERR_INVALID_ARG_TYPE,
@ -11,7 +10,6 @@ const {
ERR_OUT_OF_RANGE
} = require('internal/errors').codes;
const { isUint8Array } = require('internal/util/types');
const fs = require('fs');
const pathModule = require('path');
const util = require('util');
@ -256,45 +254,6 @@ function stringToSymlinkType(type) {
return flags;
}
// Temporary hack for process.stdout and process.stderr when piped to files.
function SyncWriteStream(fd, options) {
Writable.call(this);
options = options || {};
this.fd = fd;
this.readable = false;
this.autoClose = options.autoClose === undefined ? true : options.autoClose;
this.on('end', () => this._destroy());
}
util.inherits(SyncWriteStream, Writable);
SyncWriteStream.prototype._write = function(chunk, encoding, cb) {
fs.writeSync(this.fd, chunk, 0, chunk.length);
cb();
return true;
};
SyncWriteStream.prototype._destroy = function() {
if (this.fd === null) // already destroy()ed
return;
if (this.autoClose)
fs.closeSync(this.fd);
this.fd = null;
return true;
};
SyncWriteStream.prototype.destroySoon =
SyncWriteStream.prototype.destroy = function() {
this._destroy();
this.emit('close');
return true;
};
// converts Date or number to a fractional UNIX timestamp
function toUnixTimestamp(time, name = 'time') {
// eslint-disable-next-line eqeqeq
@ -383,7 +342,6 @@ module.exports = {
stringToFlags,
stringToSymlinkType,
Stats,
SyncWriteStream,
toUnixTimestamp,
validateBuffer,
validateOffsetLengthRead,

168
lib/internal/fs/watchers.js Normal file
View File

@ -0,0 +1,168 @@
'use strict';
const errors = require('internal/errors');
const {
kFsStatsFieldsLength,
StatWatcher: _StatWatcher
} = process.binding('fs');
const { FSEvent } = process.binding('fs_event_wrap');
const { EventEmitter } = require('events');
const {
getStatsFromBinding,
validatePath
} = require('internal/fs/utils');
const { toNamespacedPath } = require('path');
const { validateUint32 } = require('internal/validators');
const { getPathFromURL } = require('internal/url');
const util = require('util');
const assert = require('assert');
function emitStop(self) {
self.emit('stop');
}
function StatWatcher() {
EventEmitter.call(this);
this._handle = new _StatWatcher();
// uv_fs_poll is a little more powerful than ev_stat but we curb it for
// the sake of backwards compatibility
let oldStatus = -1;
this._handle.onchange = (newStatus, stats) => {
if (oldStatus === -1 &&
newStatus === -1 &&
stats[2/* new nlink */] === stats[16/* old nlink */]) return;
oldStatus = newStatus;
this.emit('change', getStatsFromBinding(stats),
getStatsFromBinding(stats, kFsStatsFieldsLength));
};
this._handle.onstop = () => {
process.nextTick(emitStop, this);
};
}
util.inherits(StatWatcher, EventEmitter);
// FIXME(joyeecheung): this method is not documented.
// At the moment if filename is undefined, we
// 1. Throw an Error if it's the first time .start() is called
// 2. Return silently if .start() has already been called
// on a valid filename and the wrap has been initialized
// This method is a noop if the watcher has already been started.
StatWatcher.prototype.start = function(filename, persistent, interval) {
assert(this._handle instanceof _StatWatcher, 'handle must be a StatWatcher');
if (this._handle.isActive) {
return;
}
filename = getPathFromURL(filename);
validatePath(filename, 'filename');
validateUint32(interval, 'interval');
const err = this._handle.start(toNamespacedPath(filename),
persistent, interval);
if (err) {
const error = errors.uvException({
errno: err,
syscall: 'watch',
path: filename
});
error.filename = filename;
throw error;
}
};
// FIXME(joyeecheung): this method is not documented while there is
// another documented fs.unwatchFile(). The counterpart in
// FSWatcher is .close()
// This method is a noop if the watcher has not been started.
StatWatcher.prototype.stop = function() {
assert(this._handle instanceof _StatWatcher, 'handle must be a StatWatcher');
if (!this._handle.isActive) {
return;
}
this._handle.stop();
};
function FSWatcher() {
EventEmitter.call(this);
this._handle = new FSEvent();
this._handle.owner = this;
this._handle.onchange = (status, eventType, filename) => {
// TODO(joyeecheung): we may check self._handle.initialized here
// and return if that is false. This allows us to avoid firing the event
// after the handle is closed, and to fire both UV_RENAME and UV_CHANGE
// if they are set by libuv at the same time.
if (status < 0) {
this._handle.close();
const error = errors.uvException({
errno: status,
syscall: 'watch',
path: filename
});
error.filename = filename;
this.emit('error', error);
} else {
this.emit('change', eventType, filename);
}
};
}
util.inherits(FSWatcher, EventEmitter);
// FIXME(joyeecheung): this method is not documented.
// At the moment if filename is undefined, we
// 1. Throw an Error if it's the first time .start() is called
// 2. Return silently if .start() has already been called
// on a valid filename and the wrap has been initialized
// This method is a noop if the watcher has already been started.
FSWatcher.prototype.start = function(filename,
persistent,
recursive,
encoding) {
assert(this._handle instanceof FSEvent, 'handle must be a FSEvent');
if (this._handle.initialized) {
return;
}
filename = getPathFromURL(filename);
validatePath(filename, 'filename');
const err = this._handle.start(toNamespacedPath(filename),
persistent,
recursive,
encoding);
if (err) {
const error = errors.uvException({
errno: err,
syscall: 'watch',
path: filename
});
error.filename = filename;
throw error;
}
};
// This method is a noop if the watcher has not been started.
FSWatcher.prototype.close = function() {
assert(this._handle instanceof FSEvent, 'handle must be a FSEvent');
if (!this._handle.initialized) {
return;
}
this._handle.close();
process.nextTick(emitCloseNT, this);
};
function emitCloseNT(self) {
self.emit('close');
}
module.exports = {
FSWatcher,
StatWatcher
};

View File

@ -167,8 +167,8 @@ function createWritableStdioStream(fd) {
break;
case 'FILE':
var fs = require('internal/fs/utils');
stream = new fs.SyncWriteStream(fd, { autoClose: false });
const SyncWriteStream = require('internal/fs/sync_write_stream');
stream = new SyncWriteStream(fd, { autoClose: false });
stream._type = 'fs';
break;

View File

@ -104,7 +104,11 @@
'lib/internal/fixed_queue.js',
'lib/internal/freelist.js',
'lib/internal/fs/promises.js',
'lib/internal/fs/read_file_context.js',
'lib/internal/fs/streams.js',
'lib/internal/fs/sync_write_stream.js',
'lib/internal/fs/utils.js',
'lib/internal/fs/watchers.js',
'lib/internal/http.js',
'lib/internal/inspector_async_hook.js',
'lib/internal/linkedlist.js',

View File

@ -1162,6 +1162,27 @@ void DefineSystemConstants(Local<Object> target) {
#ifdef X_OK
NODE_DEFINE_CONSTANT(target, X_OK);
#endif
#ifdef UV_FS_COPYFILE_EXCL
# define COPYFILE_EXCL UV_FS_COPYFILE_EXCL
NODE_DEFINE_CONSTANT(target, UV_FS_COPYFILE_EXCL);
NODE_DEFINE_CONSTANT(target, COPYFILE_EXCL);
# undef COPYFILE_EXCL
#endif
#ifdef UV_FS_COPYFILE_FICLONE
# define COPYFILE_FICLONE UV_FS_COPYFILE_FICLONE
NODE_DEFINE_CONSTANT(target, UV_FS_COPYFILE_FICLONE);
NODE_DEFINE_CONSTANT(target, COPYFILE_FICLONE);
# undef COPYFILE_FICLONE
#endif
#ifdef UV_FS_COPYFILE_FICLONE_FORCE
# define COPYFILE_FICLONE_FORCE UV_FS_COPYFILE_FICLONE_FORCE
NODE_DEFINE_CONSTANT(target, UV_FS_COPYFILE_FICLONE_FORCE);
NODE_DEFINE_CONSTANT(target, COPYFILE_FICLONE_FORCE);
# undef COPYFILE_FICLONE_FORCE
#endif
}
void DefineCryptoConstants(Local<Object> target) {
@ -1305,9 +1326,6 @@ void DefineConstants(v8::Isolate* isolate, Local<Object> target) {
// Define libuv constants.
NODE_DEFINE_CONSTANT(os_constants, UV_UDP_REUSEADDR);
NODE_DEFINE_CONSTANT(fs_constants, UV_FS_COPYFILE_EXCL);
NODE_DEFINE_CONSTANT(fs_constants, UV_FS_COPYFILE_FICLONE);
NODE_DEFINE_CONSTANT(fs_constants, UV_FS_COPYFILE_FICLONE_FORCE);
os_constants->Set(OneByteString(isolate, "dlopen"), dlopen_constants);
os_constants->Set(OneByteString(isolate, "errno"), err_constants);

View File

@ -5,7 +5,7 @@ const common = require('../common');
const assert = require('assert');
const fs = require('fs');
const path = require('path');
const SyncWriteStream = require('internal/fs/utils').SyncWriteStream;
const SyncWriteStream = require('internal/fs/sync_write_stream');
const tmpdir = require('../common/tmpdir');
tmpdir.refresh();

View File

@ -178,7 +178,7 @@ function testError() {
// The sync error, with individual property echoes
/Error: ENOENT: no such file or directory, scandir '.*nonexistent.*'/,
/fs\.readdirSync/,
/Object\.readdirSync/,
"'ENOENT'",
"'scandir'",

View File

@ -20,7 +20,7 @@ if (process.argv[2] === 'child') {
execFile(process.execPath, args, function(err, stdout, stderr) {
assert.strictEqual(err, null);
assert.strictEqual(stdout, '');
if (/WARNING[\s\S]*fs\.readFileSync/.test(stderr))
if (/WARNING[\s\S]*readFileSync/.test(stderr))
cntr++;
if (args[0] === '--trace-sync-io') {
assert.strictEqual(cntr, 1);