2017-10-12 05:17:48 +00:00
|
|
|
'use strict';
|
|
|
|
|
|
|
|
const assert = require('assert');
|
|
|
|
const util = require('util');
|
|
|
|
const { Socket } = require('net');
|
2018-09-04 14:13:25 +00:00
|
|
|
const { JSStream } = internalBinding('js_stream');
|
2018-08-06 21:40:30 +00:00
|
|
|
const uv = internalBinding('uv');
|
2017-10-12 05:17:48 +00:00
|
|
|
const debug = util.debuglog('stream_wrap');
|
2018-07-27 12:35:39 +00:00
|
|
|
const { owner_symbol } = require('internal/async_hooks').symbols;
|
2018-03-04 21:16:24 +00:00
|
|
|
const { ERR_STREAM_WRAP } = require('internal/errors').codes;
|
2017-10-12 05:17:48 +00:00
|
|
|
|
2017-12-29 15:17:12 +00:00
|
|
|
const kCurrentWriteRequest = Symbol('kCurrentWriteRequest');
|
|
|
|
const kCurrentShutdownRequest = Symbol('kCurrentShutdownRequest');
|
|
|
|
|
2018-07-27 12:35:39 +00:00
|
|
|
function isClosing() { return this[owner_symbol].isClosing(); }
|
|
|
|
function onreadstart() { return this[owner_symbol].readStart(); }
|
|
|
|
function onreadstop() { return this[owner_symbol].readStop(); }
|
|
|
|
function onshutdown(req) { return this[owner_symbol].doShutdown(req); }
|
|
|
|
function onwrite(req, bufs) { return this[owner_symbol].doWrite(req, bufs); }
|
2017-12-29 15:17:12 +00:00
|
|
|
|
2017-10-12 05:57:45 +00:00
|
|
|
/* This class serves as a wrapper for when the C++ side of Node wants access
|
|
|
|
* to a standard JS stream. For example, TLS or HTTP do not operate on network
|
|
|
|
* resources conceptually, although that is the common case and what we are
|
|
|
|
* optimizing for; in theory, they are completely composable and can work with
|
|
|
|
* any stream resource they see.
|
|
|
|
*
|
|
|
|
* For the common case, i.e. a TLS socket wrapping around a net.Socket, we
|
|
|
|
* can skip going through the JS layer and let TLS access the raw C++ handle
|
|
|
|
* of a net.Socket. The flipside of this is that, to maintain composability,
|
|
|
|
* we need a way to create "fake" net.Socket instances that call back into a
|
|
|
|
* "real" JavaScript stream. JSStreamWrap is exactly this.
|
|
|
|
*/
|
|
|
|
class JSStreamWrap extends Socket {
|
|
|
|
constructor(stream) {
|
|
|
|
const handle = new JSStream();
|
|
|
|
handle.close = (cb) => {
|
|
|
|
debug('close');
|
|
|
|
this.doClose(cb);
|
|
|
|
};
|
2017-12-29 15:17:12 +00:00
|
|
|
// Inside of the following functions, `this` refers to the handle
|
2018-07-27 12:35:39 +00:00
|
|
|
// and `this[owner_symbol]` refers to this JSStreamWrap instance.
|
2017-12-29 15:17:12 +00:00
|
|
|
handle.isClosing = isClosing;
|
|
|
|
handle.onreadstart = onreadstart;
|
|
|
|
handle.onreadstop = onreadstop;
|
|
|
|
handle.onshutdown = onshutdown;
|
|
|
|
handle.onwrite = onwrite;
|
2017-10-12 05:57:45 +00:00
|
|
|
|
|
|
|
stream.pause();
|
|
|
|
stream.on('error', (err) => this.emit('error', err));
|
|
|
|
const ondata = (chunk) => {
|
|
|
|
if (typeof chunk === 'string' ||
|
|
|
|
stream._readableState.objectMode === true) {
|
|
|
|
// Make sure that no further `data` events will happen.
|
|
|
|
stream.pause();
|
|
|
|
stream.removeListener('data', ondata);
|
|
|
|
|
2018-03-04 21:16:24 +00:00
|
|
|
this.emit('error', new ERR_STREAM_WRAP());
|
2017-10-12 05:17:48 +00:00
|
|
|
return;
|
2017-10-12 05:57:45 +00:00
|
|
|
}
|
2017-10-12 05:17:48 +00:00
|
|
|
|
2017-10-12 05:57:45 +00:00
|
|
|
debug('data', chunk.length);
|
|
|
|
if (this._handle)
|
|
|
|
this._handle.readBuffer(chunk);
|
|
|
|
};
|
|
|
|
stream.on('data', ondata);
|
|
|
|
stream.once('end', () => {
|
|
|
|
debug('end');
|
|
|
|
if (this._handle)
|
|
|
|
this._handle.emitEOF();
|
2017-10-12 05:17:48 +00:00
|
|
|
});
|
|
|
|
|
2017-10-12 05:57:45 +00:00
|
|
|
super({ handle, manualStart: true });
|
|
|
|
this.stream = stream;
|
2017-12-29 15:17:12 +00:00
|
|
|
this[kCurrentWriteRequest] = null;
|
|
|
|
this[kCurrentShutdownRequest] = null;
|
2018-04-12 09:15:31 +00:00
|
|
|
this.readable = stream.readable;
|
|
|
|
this.writable = stream.writable;
|
2017-12-29 15:17:12 +00:00
|
|
|
|
|
|
|
// Start reading.
|
2017-10-12 05:57:45 +00:00
|
|
|
this.read(0);
|
|
|
|
}
|
2017-10-12 05:17:48 +00:00
|
|
|
|
2017-10-12 05:57:45 +00:00
|
|
|
// Legacy
|
|
|
|
static get StreamWrap() {
|
|
|
|
return JSStreamWrap;
|
|
|
|
}
|
2017-10-12 05:17:48 +00:00
|
|
|
|
2017-10-12 05:57:45 +00:00
|
|
|
isClosing() {
|
|
|
|
return !this.readable || !this.writable;
|
|
|
|
}
|
2017-10-12 05:17:48 +00:00
|
|
|
|
2017-10-12 05:57:45 +00:00
|
|
|
readStart() {
|
|
|
|
this.stream.resume();
|
|
|
|
return 0;
|
|
|
|
}
|
2017-10-12 05:17:48 +00:00
|
|
|
|
2017-10-12 05:57:45 +00:00
|
|
|
readStop() {
|
|
|
|
this.stream.pause();
|
|
|
|
return 0;
|
|
|
|
}
|
2017-10-12 05:17:48 +00:00
|
|
|
|
2017-10-12 05:57:45 +00:00
|
|
|
doShutdown(req) {
|
2017-12-29 15:17:12 +00:00
|
|
|
assert.strictEqual(this[kCurrentShutdownRequest], null);
|
|
|
|
this[kCurrentShutdownRequest] = req;
|
|
|
|
|
|
|
|
// TODO(addaleax): It might be nice if we could get into a state where
|
|
|
|
// DoShutdown() is not called on streams while a write is still pending.
|
|
|
|
//
|
|
|
|
// Currently, the only part of the code base where that happens is the
|
|
|
|
// TLS implementation, which calls both DoWrite() and DoShutdown() on the
|
|
|
|
// underlying network stream inside of its own DoShutdown() method.
|
|
|
|
// Working around that on the native side is not quite trivial (yet?),
|
|
|
|
// so for now that is supported here.
|
|
|
|
|
|
|
|
if (this[kCurrentWriteRequest] !== null)
|
|
|
|
return this.on('drain', () => this.doShutdown(req));
|
|
|
|
assert.strictEqual(this[kCurrentWriteRequest], null);
|
|
|
|
|
2017-10-12 05:57:45 +00:00
|
|
|
const handle = this._handle;
|
2017-10-12 05:17:48 +00:00
|
|
|
|
2018-02-08 03:59:10 +00:00
|
|
|
setImmediate(() => {
|
|
|
|
// Ensure that write is dispatched asynchronously.
|
|
|
|
this.stream.end(() => {
|
2017-12-29 15:17:12 +00:00
|
|
|
this.finishShutdown(handle, 0);
|
2017-10-12 05:57:45 +00:00
|
|
|
});
|
2017-10-12 05:17:48 +00:00
|
|
|
});
|
2017-10-12 05:57:45 +00:00
|
|
|
return 0;
|
2017-10-12 05:17:48 +00:00
|
|
|
}
|
|
|
|
|
2017-12-29 15:17:12 +00:00
|
|
|
// handle === this._handle except when called from doClose().
|
|
|
|
finishShutdown(handle, errCode) {
|
|
|
|
// The shutdown request might already have been cancelled.
|
|
|
|
if (this[kCurrentShutdownRequest] === null)
|
|
|
|
return;
|
|
|
|
const req = this[kCurrentShutdownRequest];
|
|
|
|
this[kCurrentShutdownRequest] = null;
|
|
|
|
handle.finishShutdown(req, errCode);
|
|
|
|
}
|
|
|
|
|
2017-10-12 05:57:45 +00:00
|
|
|
doWrite(req, bufs) {
|
2017-12-29 15:17:12 +00:00
|
|
|
assert.strictEqual(this[kCurrentWriteRequest], null);
|
|
|
|
assert.strictEqual(this[kCurrentShutdownRequest], null);
|
2017-10-12 05:17:48 +00:00
|
|
|
|
2017-12-29 15:17:12 +00:00
|
|
|
const handle = this._handle;
|
|
|
|
const self = this;
|
2017-10-12 05:17:48 +00:00
|
|
|
|
2017-12-29 15:17:12 +00:00
|
|
|
let pending = bufs.length;
|
2017-10-12 05:57:45 +00:00
|
|
|
|
|
|
|
this.stream.cork();
|
2017-12-29 15:17:12 +00:00
|
|
|
for (var i = 0; i < bufs.length; ++i)
|
|
|
|
this.stream.write(bufs[i], done);
|
2017-10-12 05:57:45 +00:00
|
|
|
this.stream.uncork();
|
|
|
|
|
2018-02-08 17:32:32 +00:00
|
|
|
// Only set the request here, because the `write()` calls could throw.
|
|
|
|
this[kCurrentWriteRequest] = req;
|
|
|
|
|
2017-10-12 05:57:45 +00:00
|
|
|
function done(err) {
|
|
|
|
if (!err && --pending !== 0)
|
|
|
|
return;
|
|
|
|
|
|
|
|
// Ensure that this is called once in case of error
|
|
|
|
pending = 0;
|
2017-10-12 05:17:48 +00:00
|
|
|
|
2017-10-12 05:57:45 +00:00
|
|
|
let errCode = 0;
|
|
|
|
if (err) {
|
2017-12-29 15:17:12 +00:00
|
|
|
errCode = uv[`UV_${err.code}`] || uv.UV_EPIPE;
|
2017-10-12 05:57:45 +00:00
|
|
|
}
|
2017-10-12 05:17:48 +00:00
|
|
|
|
2017-10-12 05:57:45 +00:00
|
|
|
// Ensure that write was dispatched
|
2017-12-29 15:17:12 +00:00
|
|
|
setImmediate(() => {
|
|
|
|
self.finishWrite(handle, errCode);
|
2017-10-12 05:57:45 +00:00
|
|
|
});
|
|
|
|
}
|
2017-10-12 05:17:48 +00:00
|
|
|
|
2017-10-12 05:57:45 +00:00
|
|
|
return 0;
|
|
|
|
}
|
2017-10-12 05:17:48 +00:00
|
|
|
|
2017-12-29 15:17:12 +00:00
|
|
|
// handle === this._handle except when called from doClose().
|
|
|
|
finishWrite(handle, errCode) {
|
|
|
|
// The write request might already have been cancelled.
|
|
|
|
if (this[kCurrentWriteRequest] === null)
|
|
|
|
return;
|
|
|
|
const req = this[kCurrentWriteRequest];
|
|
|
|
this[kCurrentWriteRequest] = null;
|
2017-10-12 05:57:45 +00:00
|
|
|
|
2017-12-29 15:17:12 +00:00
|
|
|
handle.finishWrite(req, errCode);
|
2017-10-12 05:57:45 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
doClose(cb) {
|
|
|
|
const handle = this._handle;
|
|
|
|
|
|
|
|
setImmediate(() => {
|
|
|
|
// Should be already set by net.js
|
|
|
|
assert.strictEqual(this._handle, null);
|
2017-12-29 15:17:12 +00:00
|
|
|
|
|
|
|
this.finishWrite(handle, uv.UV_ECANCELED);
|
|
|
|
this.finishShutdown(handle, uv.UV_ECANCELED);
|
|
|
|
|
2017-10-12 05:57:45 +00:00
|
|
|
cb();
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
module.exports = JSStreamWrap;
|