mirror of
https://github.com/nodejs/node.git
synced 2024-11-21 10:59:27 +00:00
lib: implement async_hooks API in core
Implement async_hooks support in the following: * fatalException handler * process.nextTick * Timers * net/dgram/http PR-URL: https://github.com/nodejs/node/pull/12892 Ref: https://github.com/nodejs/node/pull/11883 Ref: https://github.com/nodejs/node/pull/8531 Reviewed-By: Andreas Madsen <amwebdk@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Sam Roberts <vieuxtech@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Refael Ackermann <refack@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Jeremiah Senkpiel <fishrock123@rocketmail.com>
This commit is contained in:
parent
7e3a3c962f
commit
4a7233c178
@ -25,6 +25,8 @@ const net = require('net');
|
||||
const util = require('util');
|
||||
const EventEmitter = require('events');
|
||||
const debug = util.debuglog('http');
|
||||
const async_id_symbol = process.binding('async_wrap').async_id_symbol;
|
||||
const nextTick = require('internal/process/next_tick').nextTick;
|
||||
|
||||
// New Agent code.
|
||||
|
||||
@ -93,6 +95,7 @@ function Agent(options) {
|
||||
self.freeSockets[name] = freeSockets;
|
||||
socket.setKeepAlive(true, self.keepAliveMsecs);
|
||||
socket.unref();
|
||||
socket[async_id_symbol] = -1;
|
||||
socket._httpMessage = null;
|
||||
self.removeSocket(socket, options);
|
||||
freeSockets.push(socket);
|
||||
@ -163,6 +166,8 @@ Agent.prototype.addRequest = function addRequest(req, options, port/*legacy*/,
|
||||
if (freeLen) {
|
||||
// we have a free socket, so use that.
|
||||
var socket = this.freeSockets[name].shift();
|
||||
// Assign the handle a new asyncId and run any init() hooks.
|
||||
socket._handle.asyncReset();
|
||||
debug('have free socket');
|
||||
|
||||
// don't leak
|
||||
@ -177,7 +182,7 @@ Agent.prototype.addRequest = function addRequest(req, options, port/*legacy*/,
|
||||
// If we are under maxSockets create a new one.
|
||||
this.createSocket(req, options, function(err, newSocket) {
|
||||
if (err) {
|
||||
process.nextTick(function() {
|
||||
nextTick(newSocket._handle.getAsyncId(), function() {
|
||||
req.emit('error', err);
|
||||
});
|
||||
return;
|
||||
@ -290,7 +295,7 @@ Agent.prototype.removeSocket = function removeSocket(s, options) {
|
||||
// If we have pending requests and a socket gets closed make a new one
|
||||
this.createSocket(req, options, function(err, newSocket) {
|
||||
if (err) {
|
||||
process.nextTick(function() {
|
||||
nextTick(newSocket._handle.getAsyncId(), function() {
|
||||
req.emit('error', err);
|
||||
});
|
||||
return;
|
||||
|
@ -36,6 +36,7 @@ const Agent = require('_http_agent');
|
||||
const Buffer = require('buffer').Buffer;
|
||||
const urlToOptions = require('internal/url').urlToOptions;
|
||||
const outHeadersKey = require('internal/http').outHeadersKey;
|
||||
const nextTick = require('internal/process/next_tick').nextTick;
|
||||
|
||||
// The actual list of disallowed characters in regexp form is more like:
|
||||
// /[^A-Za-z0-9\-._~!$&'()*+,;=/:@]/
|
||||
@ -587,9 +588,12 @@ function responseKeepAlive(res, req) {
|
||||
socket.removeListener('close', socketCloseListener);
|
||||
socket.removeListener('error', socketErrorListener);
|
||||
socket.once('error', freeSocketErrorListener);
|
||||
// There are cases where _handle === null. Avoid those. Passing null to
|
||||
// nextTick() will call initTriggerId() to retrieve the id.
|
||||
const asyncId = socket._handle ? socket._handle.getAsyncId() : null;
|
||||
// Mark this socket as available, AFTER user-added end
|
||||
// handlers have a chance to run.
|
||||
process.nextTick(emitFreeNT, socket);
|
||||
nextTick(asyncId, emitFreeNT, socket);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -28,6 +28,7 @@ const HTTPParser = binding.HTTPParser;
|
||||
const FreeList = require('internal/freelist');
|
||||
const ondrain = require('internal/http').ondrain;
|
||||
const incoming = require('_http_incoming');
|
||||
const emitDestroy = require('async_hooks').emitDestroy;
|
||||
const IncomingMessage = incoming.IncomingMessage;
|
||||
const readStart = incoming.readStart;
|
||||
const readStop = incoming.readStop;
|
||||
@ -211,8 +212,13 @@ function freeParser(parser, req, socket) {
|
||||
parser.incoming = null;
|
||||
parser.outgoing = null;
|
||||
parser[kOnExecute] = null;
|
||||
if (parsers.free(parser) === false)
|
||||
if (parsers.free(parser) === false) {
|
||||
parser.close();
|
||||
} else {
|
||||
// Since the Parser destructor isn't going to run the destroy() callbacks
|
||||
// it needs to be triggered manually.
|
||||
emitDestroy(parser.getAsyncId());
|
||||
}
|
||||
}
|
||||
if (req) {
|
||||
req.parser = null;
|
||||
|
@ -31,6 +31,8 @@ const common = require('_http_common');
|
||||
const checkIsHttpToken = common._checkIsHttpToken;
|
||||
const checkInvalidHeaderChar = common._checkInvalidHeaderChar;
|
||||
const outHeadersKey = require('internal/http').outHeadersKey;
|
||||
const async_id_symbol = process.binding('async_wrap').async_id_symbol;
|
||||
const nextTick = require('internal/process/next_tick').nextTick;
|
||||
|
||||
const CRLF = common.CRLF;
|
||||
const debug = common.debug;
|
||||
@ -264,8 +266,9 @@ function _writeRaw(data, encoding, callback) {
|
||||
if (this.output.length) {
|
||||
this._flushOutput(conn);
|
||||
} else if (!data.length) {
|
||||
if (typeof callback === 'function')
|
||||
process.nextTick(callback);
|
||||
if (typeof callback === 'function') {
|
||||
nextTick(this.socket[async_id_symbol], callback);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
// Directly write to socket.
|
||||
@ -623,7 +626,10 @@ const crlf_buf = Buffer.from('\r\n');
|
||||
OutgoingMessage.prototype.write = function write(chunk, encoding, callback) {
|
||||
if (this.finished) {
|
||||
var err = new Error('write after end');
|
||||
process.nextTick(writeAfterEndNT.bind(this), err, callback);
|
||||
nextTick(this.socket[async_id_symbol],
|
||||
writeAfterEndNT.bind(this),
|
||||
err,
|
||||
callback);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -32,7 +32,7 @@ var processing_hook = false;
|
||||
// Use to temporarily store and updated active_hooks_array if the user enables
|
||||
// or disables a hook while hooks are being processed.
|
||||
var tmp_active_hooks_array = null;
|
||||
// Keep track of the field counds held in tmp_active_hooks_array.
|
||||
// Keep track of the field counts held in tmp_active_hooks_array.
|
||||
var tmp_async_hook_fields = null;
|
||||
|
||||
// Each constant tracks how many callbacks there are for any given step of
|
||||
@ -41,9 +41,9 @@ var tmp_async_hook_fields = null;
|
||||
const { kInit, kBefore, kAfter, kDestroy, kCurrentAsyncId, kCurrentTriggerId,
|
||||
kAsyncUidCntr, kInitTriggerId } = async_wrap.constants;
|
||||
|
||||
const { async_id_symbol, trigger_id_symbol } = async_wrap;
|
||||
|
||||
// Used in AsyncHook and AsyncEvent.
|
||||
const async_id_symbol = Symbol('_asyncId');
|
||||
const trigger_id_symbol = Symbol('_triggerId');
|
||||
const init_symbol = Symbol('init');
|
||||
const before_symbol = Symbol('before');
|
||||
const after_symbol = Symbol('after');
|
||||
|
12
lib/dgram.js
12
lib/dgram.js
@ -25,7 +25,10 @@ const assert = require('assert');
|
||||
const Buffer = require('buffer').Buffer;
|
||||
const util = require('util');
|
||||
const EventEmitter = require('events');
|
||||
const setInitTriggerId = require('async_hooks').setInitTriggerId;
|
||||
const UV_UDP_REUSEADDR = process.binding('constants').os.UV_UDP_REUSEADDR;
|
||||
const async_id_symbol = process.binding('async_wrap').async_id_symbol;
|
||||
const nextTick = require('internal/process/next_tick').nextTick;
|
||||
|
||||
const UDP = process.binding('udp_wrap').UDP;
|
||||
const SendWrap = process.binding('udp_wrap').SendWrap;
|
||||
@ -111,6 +114,7 @@ function Socket(type, listener) {
|
||||
this._handle = handle;
|
||||
this._receiving = false;
|
||||
this._bindState = BIND_STATE_UNBOUND;
|
||||
this[async_id_symbol] = this._handle.getAsyncId();
|
||||
this.type = type;
|
||||
this.fd = null; // compatibility hack
|
||||
|
||||
@ -432,6 +436,10 @@ function doSend(ex, self, ip, list, address, port, callback) {
|
||||
req.callback = callback;
|
||||
req.oncomplete = afterSend;
|
||||
}
|
||||
// node::SendWrap isn't instantiated and attached to the JS instance of
|
||||
// SendWrap above until send() is called. So don't set the init trigger id
|
||||
// until now.
|
||||
setInitTriggerId(self[async_id_symbol]);
|
||||
var err = self._handle.send(req,
|
||||
list,
|
||||
list.length,
|
||||
@ -441,7 +449,7 @@ function doSend(ex, self, ip, list, address, port, callback) {
|
||||
if (err && callback) {
|
||||
// don't emit as error, dgram_legacy.js compatibility
|
||||
const ex = exceptionWithHostPort(err, 'send', address, port);
|
||||
process.nextTick(callback, ex);
|
||||
nextTick(self[async_id_symbol], callback, ex);
|
||||
}
|
||||
}
|
||||
|
||||
@ -468,7 +476,7 @@ Socket.prototype.close = function(callback) {
|
||||
this._stopReceiving();
|
||||
this._handle.close();
|
||||
this._handle = null;
|
||||
process.nextTick(socketCloseNT, this);
|
||||
nextTick(this[async_id_symbol], socketCloseNT, this);
|
||||
|
||||
return this;
|
||||
};
|
||||
|
24
lib/internal/bootstrap_node.js
vendored
24
lib/internal/bootstrap_node.js
vendored
@ -292,10 +292,20 @@
|
||||
}
|
||||
|
||||
function setupProcessFatal() {
|
||||
const async_wrap = process.binding('async_wrap');
|
||||
// Arrays containing hook flags and ids for async_hook calls.
|
||||
const { async_hook_fields, async_uid_fields } = async_wrap;
|
||||
// Internal functions needed to manipulate the stack.
|
||||
const { clearIdStack, popAsyncIds } = async_wrap;
|
||||
const { kAfter, kCurrentAsyncId, kInitTriggerId } = async_wrap.constants;
|
||||
|
||||
process._fatalException = function(er) {
|
||||
var caught;
|
||||
|
||||
// It's possible that kInitTriggerId was set for a constructor call that
|
||||
// threw and was never cleared. So clear it now.
|
||||
async_uid_fields[kInitTriggerId] = 0;
|
||||
|
||||
if (process.domain && process.domain._errorHandler)
|
||||
caught = process.domain._errorHandler(er);
|
||||
|
||||
@ -314,9 +324,21 @@
|
||||
// nothing to be done about it at this point.
|
||||
}
|
||||
|
||||
// if we handled an error, then make sure any ticks get processed
|
||||
} else {
|
||||
// If we handled an error, then make sure any ticks get processed
|
||||
NativeModule.require('timers').setImmediate(process._tickCallback);
|
||||
|
||||
// Emit the after() hooks now that the exception has been handled.
|
||||
if (async_hook_fields[kAfter] > 0) {
|
||||
do {
|
||||
NativeModule.require('async_hooks').emitAfter(
|
||||
async_uid_fields[kCurrentAsyncId]);
|
||||
// popAsyncIds() returns true if there are more ids on the stack.
|
||||
} while (popAsyncIds(async_uid_fields[kCurrentAsyncId]));
|
||||
// Or completely empty the id stack.
|
||||
} else {
|
||||
clearIdStack();
|
||||
}
|
||||
}
|
||||
|
||||
return caught;
|
||||
|
@ -7,11 +7,26 @@
|
||||
const kMaxCallbacksPerLoop = 1e4;
|
||||
|
||||
exports.setup = setupNextTick;
|
||||
// Will be overwritten when setupNextTick() is called.
|
||||
exports.nextTick = null;
|
||||
|
||||
function setupNextTick() {
|
||||
const async_wrap = process.binding('async_wrap');
|
||||
const async_hooks = require('async_hooks');
|
||||
const promises = require('internal/process/promises');
|
||||
const errors = require('internal/errors');
|
||||
const emitPendingUnhandledRejections = promises.setup(scheduleMicrotasks);
|
||||
const initTriggerId = async_hooks.initTriggerId;
|
||||
// Two arrays that share state between C++ and JS.
|
||||
const { async_hook_fields, async_uid_fields } = async_wrap;
|
||||
// Used to change the state of the async id stack.
|
||||
const { pushAsyncIds, popAsyncIds } = async_wrap;
|
||||
// The needed emit*() functions.
|
||||
const { emitInit, emitBefore, emitAfter, emitDestroy } = async_hooks;
|
||||
// Grab the constants necessary for working with internal arrays.
|
||||
const { kInit, kBefore, kAfter, kDestroy, kAsyncUidCntr, kInitTriggerId } =
|
||||
async_wrap.constants;
|
||||
const { async_id_symbol, trigger_id_symbol } = async_wrap;
|
||||
var nextTickQueue = [];
|
||||
var microtasksScheduled = false;
|
||||
|
||||
@ -27,6 +42,9 @@ function setupNextTick() {
|
||||
process._tickCallback = _tickCallback;
|
||||
process._tickDomainCallback = _tickDomainCallback;
|
||||
|
||||
// Set the nextTick() function for internal usage.
|
||||
exports.nextTick = internalNextTick;
|
||||
|
||||
// This tickInfo thing is used so that the C++ code in src/node.cc
|
||||
// can have easy access to our nextTick state, and avoid unnecessary
|
||||
// calls into JS land.
|
||||
@ -51,10 +69,13 @@ function setupNextTick() {
|
||||
if (microtasksScheduled)
|
||||
return;
|
||||
|
||||
nextTickQueue.push({
|
||||
callback: runMicrotasksCallback,
|
||||
domain: null
|
||||
});
|
||||
const tickObject =
|
||||
new TickObject(runMicrotasksCallback, undefined, null);
|
||||
// For the moment all microtasks come from the void until the PromiseHook
|
||||
// API is implemented.
|
||||
tickObject[async_id_symbol] = 0;
|
||||
tickObject[trigger_id_symbol] = 0;
|
||||
nextTickQueue.push(tickObject);
|
||||
|
||||
tickInfo[kLength]++;
|
||||
microtasksScheduled = true;
|
||||
@ -89,20 +110,58 @@ function setupNextTick() {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(trevnorris): Using std::stack of Environment::AsyncHooks::ids_stack_
|
||||
// is much slower here than was the Float64Array stack used in a previous
|
||||
// implementation. Problem is the Float64Array stack was a bit brittle.
|
||||
// Investigate how to harden that implementation and possibly reintroduce it.
|
||||
function nextTickEmitBefore(asyncId, triggerId) {
|
||||
if (async_hook_fields[kBefore] > 0)
|
||||
emitBefore(asyncId, triggerId);
|
||||
else
|
||||
pushAsyncIds(asyncId, triggerId);
|
||||
}
|
||||
|
||||
function nextTickEmitAfter(asyncId) {
|
||||
if (async_hook_fields[kAfter] > 0)
|
||||
emitAfter(asyncId);
|
||||
else
|
||||
popAsyncIds(asyncId);
|
||||
}
|
||||
|
||||
// Run callbacks that have no domain.
|
||||
// Using domains will cause this to be overridden.
|
||||
function _tickCallback() {
|
||||
var callback, args, tock;
|
||||
|
||||
do {
|
||||
while (tickInfo[kIndex] < tickInfo[kLength]) {
|
||||
tock = nextTickQueue[tickInfo[kIndex]++];
|
||||
callback = tock.callback;
|
||||
args = tock.args;
|
||||
const tock = nextTickQueue[tickInfo[kIndex]++];
|
||||
const callback = tock.callback;
|
||||
const args = tock.args;
|
||||
|
||||
// CHECK(Number.isSafeInteger(tock[async_id_symbol]))
|
||||
// CHECK(tock[async_id_symbol] > 0)
|
||||
// CHECK(Number.isSafeInteger(tock[trigger_id_symbol]))
|
||||
// CHECK(tock[trigger_id_symbol] > 0)
|
||||
|
||||
nextTickEmitBefore(tock[async_id_symbol], tock[trigger_id_symbol]);
|
||||
// emitDestroy() places the async_id_symbol into an asynchronous queue
|
||||
// that calls the destroy callback in the future. It's called before
|
||||
// calling tock.callback so destroy will be called even if the callback
|
||||
// throws an exception that is handles by 'uncaughtException' or a
|
||||
// domain.
|
||||
// TODO(trevnorris): This is a bit of a hack. It relies on the fact
|
||||
// that nextTick() doesn't allow the event loop to proceed, but if
|
||||
// any async hooks are enabled during the callback's execution then
|
||||
// this tock's after hook will be called, but not its destroy hook.
|
||||
if (async_hook_fields[kDestroy] > 0)
|
||||
emitDestroy(tock[async_id_symbol]);
|
||||
|
||||
// Using separate callback execution functions allows direct
|
||||
// callback invocation with small numbers of arguments to avoid the
|
||||
// performance hit associated with using `fn.apply()`
|
||||
_combinedTickCallback(args, callback);
|
||||
|
||||
nextTickEmitAfter(tock[async_id_symbol]);
|
||||
|
||||
if (kMaxCallbacksPerLoop < tickInfo[kIndex])
|
||||
tickDone();
|
||||
}
|
||||
@ -113,20 +172,33 @@ function setupNextTick() {
|
||||
}
|
||||
|
||||
function _tickDomainCallback() {
|
||||
var callback, domain, args, tock;
|
||||
|
||||
do {
|
||||
while (tickInfo[kIndex] < tickInfo[kLength]) {
|
||||
tock = nextTickQueue[tickInfo[kIndex]++];
|
||||
callback = tock.callback;
|
||||
domain = tock.domain;
|
||||
args = tock.args;
|
||||
const tock = nextTickQueue[tickInfo[kIndex]++];
|
||||
const callback = tock.callback;
|
||||
const domain = tock.domain;
|
||||
const args = tock.args;
|
||||
if (domain)
|
||||
domain.enter();
|
||||
|
||||
// CHECK(Number.isSafeInteger(tock[async_id_symbol]))
|
||||
// CHECK(tock[async_id_symbol] > 0)
|
||||
// CHECK(Number.isSafeInteger(tock[trigger_id_symbol]))
|
||||
// CHECK(tock[trigger_id_symbol] > 0)
|
||||
|
||||
nextTickEmitBefore(tock[async_id_symbol], tock[trigger_id_symbol]);
|
||||
// TODO(trevnorris): See comment in _tickCallback() as to why this
|
||||
// isn't a good solution.
|
||||
if (async_hook_fields[kDestroy] > 0)
|
||||
emitDestroy(tock[async_id_symbol]);
|
||||
|
||||
// Using separate callback execution functions allows direct
|
||||
// callback invocation with small numbers of arguments to avoid the
|
||||
// performance hit associated with using `fn.apply()`
|
||||
_combinedTickCallback(args, callback);
|
||||
|
||||
nextTickEmitAfter(tock[async_id_symbol]);
|
||||
|
||||
if (kMaxCallbacksPerLoop < tickInfo[kIndex])
|
||||
tickDone();
|
||||
if (domain)
|
||||
@ -138,6 +210,25 @@ function setupNextTick() {
|
||||
} while (tickInfo[kLength] !== 0);
|
||||
}
|
||||
|
||||
function TickObject(callback, args, domain) {
|
||||
this.callback = callback;
|
||||
this.domain = domain;
|
||||
this.args = args;
|
||||
this[async_id_symbol] = -1;
|
||||
this[trigger_id_symbol] = -1;
|
||||
}
|
||||
|
||||
function setupInit(tickObject, triggerId) {
|
||||
tickObject[async_id_symbol] = ++async_uid_fields[kAsyncUidCntr];
|
||||
tickObject[trigger_id_symbol] = triggerId || initTriggerId();
|
||||
if (async_hook_fields[kInit] > 0) {
|
||||
emitInit(tickObject[async_id_symbol],
|
||||
'TickObject',
|
||||
tickObject[trigger_id_symbol],
|
||||
tickObject);
|
||||
}
|
||||
}
|
||||
|
||||
function nextTick(callback) {
|
||||
if (typeof callback !== 'function')
|
||||
throw new errors.TypeError('ERR_INVALID_CALLBACK');
|
||||
@ -152,11 +243,33 @@ function setupNextTick() {
|
||||
args[i - 1] = arguments[i];
|
||||
}
|
||||
|
||||
nextTickQueue.push({
|
||||
callback,
|
||||
domain: process.domain || null,
|
||||
args
|
||||
});
|
||||
var obj = new TickObject(callback, args, process.domain || null);
|
||||
setupInit(obj, null);
|
||||
nextTickQueue.push(obj);
|
||||
tickInfo[kLength]++;
|
||||
}
|
||||
|
||||
function internalNextTick(triggerId, callback) {
|
||||
if (typeof callback !== 'function')
|
||||
throw new TypeError('callback is not a function');
|
||||
// CHECK(Number.isSafeInteger(triggerId) || triggerId === null)
|
||||
// CHECK(triggerId > 0 || triggerId === null)
|
||||
|
||||
if (process._exiting)
|
||||
return;
|
||||
|
||||
var args;
|
||||
if (arguments.length > 2) {
|
||||
args = new Array(arguments.length - 2);
|
||||
for (var i = 2; i < arguments.length; i++)
|
||||
args[i - 2] = arguments[i];
|
||||
}
|
||||
|
||||
var obj = new TickObject(callback, args, process.domain || null);
|
||||
setupInit(obj, triggerId);
|
||||
// The call to initTriggerId() was skipped, so clear kInitTriggerId.
|
||||
async_uid_fields[kInitTriggerId] = 0;
|
||||
nextTickQueue.push(obj);
|
||||
tickInfo[kLength]++;
|
||||
}
|
||||
}
|
||||
|
45
lib/net.js
45
lib/net.js
@ -39,6 +39,9 @@ const TCPConnectWrap = process.binding('tcp_wrap').TCPConnectWrap;
|
||||
const PipeConnectWrap = process.binding('pipe_wrap').PipeConnectWrap;
|
||||
const ShutdownWrap = process.binding('stream_wrap').ShutdownWrap;
|
||||
const WriteWrap = process.binding('stream_wrap').WriteWrap;
|
||||
const async_id_symbol = process.binding('async_wrap').async_id_symbol;
|
||||
const { newUid, setInitTriggerId } = require('async_hooks');
|
||||
const nextTick = require('internal/process/next_tick').nextTick;
|
||||
|
||||
var cluster;
|
||||
var dns;
|
||||
@ -57,6 +60,12 @@ function createHandle(fd) {
|
||||
}
|
||||
|
||||
|
||||
function getNewAsyncId(handle) {
|
||||
return (!handle || typeof handle.getAsyncId !== 'function') ?
|
||||
newUid() : handle.getAsyncId();
|
||||
}
|
||||
|
||||
|
||||
const debug = util.debuglog('net');
|
||||
|
||||
function isPipeName(s) {
|
||||
@ -147,6 +156,7 @@ function initSocketHandle(self) {
|
||||
if (self._handle) {
|
||||
self._handle.owner = self;
|
||||
self._handle.onread = onread;
|
||||
self[async_id_symbol] = getNewAsyncId(self._handle);
|
||||
|
||||
// If handle doesn't support writev - neither do we
|
||||
if (!self._handle.writev)
|
||||
@ -162,6 +172,10 @@ function Socket(options) {
|
||||
if (!(this instanceof Socket)) return new Socket(options);
|
||||
|
||||
this.connecting = false;
|
||||
// Problem with this is that users can supply their own handle, that may not
|
||||
// have _handle.getAsyncId(). In this case an[async_id_symbol] should
|
||||
// probably be supplied by async_hooks.
|
||||
this[async_id_symbol] = -1;
|
||||
this._hadError = false;
|
||||
this._handle = null;
|
||||
this._parent = null;
|
||||
@ -176,9 +190,11 @@ function Socket(options) {
|
||||
|
||||
if (options.handle) {
|
||||
this._handle = options.handle; // private
|
||||
this[async_id_symbol] = getNewAsyncId(this._handle);
|
||||
} else if (options.fd !== undefined) {
|
||||
this._handle = createHandle(options.fd);
|
||||
this._handle.open(options.fd);
|
||||
this[async_id_symbol] = this._handle.getAsyncId();
|
||||
// options.fd can be string (since it is user-defined),
|
||||
// so changing this to === would be semver-major
|
||||
// See: https://github.com/nodejs/node/pull/11513
|
||||
@ -264,6 +280,10 @@ function onSocketFinish() {
|
||||
var req = new ShutdownWrap();
|
||||
req.oncomplete = afterShutdown;
|
||||
req.handle = this._handle;
|
||||
// node::ShutdownWrap isn't instantiated and attached to the JS instance of
|
||||
// ShutdownWrap above until shutdown() is called. So don't set the init
|
||||
// trigger id until now.
|
||||
setInitTriggerId(this[async_id_symbol]);
|
||||
var err = this._handle.shutdown(req);
|
||||
|
||||
if (err)
|
||||
@ -329,7 +349,7 @@ function writeAfterFIN(chunk, encoding, cb) {
|
||||
// TODO: defer error events consistently everywhere, not just the cb
|
||||
this.emit('error', er);
|
||||
if (typeof cb === 'function') {
|
||||
process.nextTick(cb, er);
|
||||
nextTick(this[async_id_symbol], cb, er);
|
||||
}
|
||||
}
|
||||
|
||||
@ -887,6 +907,10 @@ function internalConnect(
|
||||
req.localAddress = localAddress;
|
||||
req.localPort = localPort;
|
||||
|
||||
// node::TCPConnectWrap isn't instantiated and attached to the JS instance
|
||||
// of TCPConnectWrap above until connect() is called. So don't set the init
|
||||
// trigger id until now.
|
||||
setInitTriggerId(self[async_id_symbol]);
|
||||
if (addressType === 4)
|
||||
err = self._handle.connect(req, address, port);
|
||||
else
|
||||
@ -896,6 +920,10 @@ function internalConnect(
|
||||
const req = new PipeConnectWrap();
|
||||
req.address = address;
|
||||
req.oncomplete = afterConnect;
|
||||
// node::PipeConnectWrap isn't instantiated and attached to the JS instance
|
||||
// of PipeConnectWrap above until connect() is called. So don't set the
|
||||
// init trigger id until now.
|
||||
setInitTriggerId(self[async_id_symbol]);
|
||||
err = self._handle.connect(req, address, afterConnect);
|
||||
}
|
||||
|
||||
@ -1020,6 +1048,7 @@ function lookupAndConnect(self, options) {
|
||||
debug('connect: dns options', dnsopts);
|
||||
self._host = host;
|
||||
var lookup = options.lookup || dns.lookup;
|
||||
setInitTriggerId(self[async_id_symbol]);
|
||||
lookup(host, dnsopts, function emitLookup(err, ip, addressType) {
|
||||
self.emit('lookup', err, ip, addressType, host);
|
||||
|
||||
@ -1167,6 +1196,7 @@ function Server(options, connectionListener) {
|
||||
configurable: true, enumerable: false
|
||||
});
|
||||
|
||||
this[async_id_symbol] = -1;
|
||||
this._handle = null;
|
||||
this._usingSlaves = false;
|
||||
this._slaves = [];
|
||||
@ -1274,6 +1304,7 @@ function setupListenHandle(address, port, addressType, backlog, fd) {
|
||||
this._handle = rval;
|
||||
}
|
||||
|
||||
this[async_id_symbol] = getNewAsyncId(this._handle);
|
||||
this._handle.onconnection = onconnection;
|
||||
this._handle.owner = this;
|
||||
|
||||
@ -1286,7 +1317,7 @@ function setupListenHandle(address, port, addressType, backlog, fd) {
|
||||
var ex = exceptionWithHostPort(err, 'listen', address, port);
|
||||
this._handle.close();
|
||||
this._handle = null;
|
||||
process.nextTick(emitErrorNT, this, ex);
|
||||
nextTick(this[async_id_symbol], emitErrorNT, this, ex);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -1297,7 +1328,7 @@ function setupListenHandle(address, port, addressType, backlog, fd) {
|
||||
if (this._unref)
|
||||
this.unref();
|
||||
|
||||
process.nextTick(emitListeningNT, this);
|
||||
nextTick(this[async_id_symbol], emitListeningNT, this);
|
||||
}
|
||||
|
||||
Server.prototype._listen2 = setupListenHandle; // legacy alias
|
||||
@ -1398,6 +1429,7 @@ Server.prototype.listen = function() {
|
||||
// (handle[, backlog][, cb]) where handle is an object with a handle
|
||||
if (options instanceof TCP) {
|
||||
this._handle = options;
|
||||
this[async_id_symbol] = this._handle.getAsyncId();
|
||||
listenInCluster(this, null, -1, -1, backlogFromArgs);
|
||||
return this;
|
||||
}
|
||||
@ -1521,8 +1553,10 @@ function onconnection(err, clientHandle) {
|
||||
|
||||
|
||||
Server.prototype.getConnections = function(cb) {
|
||||
const self = this;
|
||||
|
||||
function end(err, connections) {
|
||||
process.nextTick(cb, err, connections);
|
||||
nextTick(self[async_id_symbol], cb, err, connections);
|
||||
}
|
||||
|
||||
if (!this._usingSlaves) {
|
||||
@ -1597,7 +1631,8 @@ Server.prototype._emitCloseIfDrained = function() {
|
||||
return;
|
||||
}
|
||||
|
||||
process.nextTick(emitCloseNT, this);
|
||||
const asyncId = this._handle ? this[async_id_symbol] : null;
|
||||
nextTick(asyncId, emitCloseNT, this);
|
||||
};
|
||||
|
||||
|
||||
|
107
lib/timers.js
107
lib/timers.js
@ -21,14 +21,29 @@
|
||||
|
||||
'use strict';
|
||||
|
||||
const async_wrap = process.binding('async_wrap');
|
||||
const TimerWrap = process.binding('timer_wrap').Timer;
|
||||
const L = require('internal/linkedlist');
|
||||
const internalUtil = require('internal/util');
|
||||
const { createPromise, promiseResolve } = process.binding('util');
|
||||
const async_hooks = require('async_hooks');
|
||||
const assert = require('assert');
|
||||
const util = require('util');
|
||||
const debug = util.debuglog('timer');
|
||||
const kOnTimeout = TimerWrap.kOnTimeout | 0;
|
||||
const initTriggerId = async_hooks.initTriggerId;
|
||||
// Two arrays that share state between C++ and JS.
|
||||
const { async_hook_fields, async_uid_fields } = async_wrap;
|
||||
// Used to change the state of the async id stack.
|
||||
const { pushAsyncIds, popAsyncIds } = async_wrap;
|
||||
// The needed emit*() functions.
|
||||
const { emitInit, emitBefore, emitAfter, emitDestroy } = async_hooks;
|
||||
// Grab the constants necessary for working with internal arrays.
|
||||
const { kInit, kBefore, kAfter, kDestroy, kAsyncUidCntr } =
|
||||
async_wrap.constants;
|
||||
// Symbols for storing async id state.
|
||||
const async_id_symbol = Symbol('asyncId');
|
||||
const trigger_id_symbol = Symbol('triggerId');
|
||||
|
||||
// Timeout values > TIMEOUT_MAX are set to 1.
|
||||
const TIMEOUT_MAX = 2147483647; // 2^31-1
|
||||
@ -134,6 +149,22 @@ exports._unrefActive = function(item) {
|
||||
};
|
||||
|
||||
|
||||
function timerEmitBefore(asyncId, triggerId) {
|
||||
if (async_hook_fields[kBefore] > 0)
|
||||
emitBefore(asyncId, triggerId);
|
||||
else
|
||||
pushAsyncIds(asyncId, triggerId);
|
||||
}
|
||||
|
||||
|
||||
function timerEmitAfter(asyncId) {
|
||||
if (async_hook_fields[kAfter] > 0)
|
||||
emitAfter(asyncId);
|
||||
else
|
||||
popAsyncIds(asyncId);
|
||||
}
|
||||
|
||||
|
||||
// The underlying logic for scheduling or re-scheduling a timer.
|
||||
//
|
||||
// Appends a timer onto the end of an existing timers list, or creates a new
|
||||
@ -154,6 +185,14 @@ function insert(item, unrefed) {
|
||||
lists[msecs] = list = createTimersList(msecs, unrefed);
|
||||
}
|
||||
|
||||
if (!item[async_id_symbol] || item._destroyed) {
|
||||
item._destroyed = false;
|
||||
item[async_id_symbol] = ++async_uid_fields[kAsyncUidCntr];
|
||||
item[trigger_id_symbol] = initTriggerId();
|
||||
if (async_hook_fields[kInit] > 0)
|
||||
emitInit(item[async_id_symbol], 'Timeout', item[trigger_id_symbol], item);
|
||||
}
|
||||
|
||||
L.append(list, item);
|
||||
assert(!L.isEmpty(list)); // list is not empty
|
||||
}
|
||||
@ -218,7 +257,14 @@ function listOnTimeout() {
|
||||
L.remove(timer);
|
||||
assert(timer !== L.peek(list));
|
||||
|
||||
if (!timer._onTimeout) continue;
|
||||
if (!timer._onTimeout) {
|
||||
if (async_hook_fields[kDestroy] > 0 && !timer._destroyed &&
|
||||
typeof timer[async_id_symbol] === 'number') {
|
||||
emitDestroy(timer[async_id_symbol]);
|
||||
timer._destroyed = true;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
var domain = timer.domain;
|
||||
if (domain) {
|
||||
@ -268,11 +314,25 @@ function listOnTimeout() {
|
||||
// 4.7) what is in this smaller function.
|
||||
function tryOnTimeout(timer, list) {
|
||||
timer._called = true;
|
||||
const timerAsyncId = (typeof timer[async_id_symbol] === 'number') ?
|
||||
timer[async_id_symbol] : null;
|
||||
var threw = true;
|
||||
if (timerAsyncId !== null)
|
||||
timerEmitBefore(timerAsyncId, timer[trigger_id_symbol]);
|
||||
try {
|
||||
ontimeout(timer);
|
||||
threw = false;
|
||||
} finally {
|
||||
if (timerAsyncId !== null) {
|
||||
if (!threw)
|
||||
timerEmitAfter(timerAsyncId);
|
||||
if (!timer._repeat && async_hook_fields[kDestroy] > 0 &&
|
||||
!timer._destroyed) {
|
||||
emitDestroy(timerAsyncId);
|
||||
timer._destroyed = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!threw) return;
|
||||
|
||||
// Postpone all later list events to next tick. We need to do this
|
||||
@ -324,6 +384,15 @@ function reuse(item) {
|
||||
|
||||
// Remove a timer. Cancels the timeout and resets the relevant timer properties.
|
||||
const unenroll = exports.unenroll = function(item) {
|
||||
// Fewer checks may be possible, but these cover everything.
|
||||
if (async_hook_fields[kDestroy] > 0 &&
|
||||
item &&
|
||||
typeof item[async_id_symbol] === 'number' &&
|
||||
!item._destroyed) {
|
||||
emitDestroy(item[async_id_symbol]);
|
||||
item._destroyed = true;
|
||||
}
|
||||
|
||||
var handle = reuse(item);
|
||||
if (handle) {
|
||||
debug('unenroll: list empty');
|
||||
@ -516,6 +585,11 @@ function Timeout(after, callback, args) {
|
||||
this._onTimeout = callback;
|
||||
this._timerArgs = args;
|
||||
this._repeat = null;
|
||||
this._destroyed = false;
|
||||
this[async_id_symbol] = ++async_uid_fields[kAsyncUidCntr];
|
||||
this[trigger_id_symbol] = initTriggerId();
|
||||
if (async_hook_fields[kInit] > 0)
|
||||
emitInit(this[async_id_symbol], 'Timeout', this[trigger_id_symbol], this);
|
||||
}
|
||||
|
||||
|
||||
@ -570,6 +644,15 @@ Timeout.prototype.ref = function() {
|
||||
Timeout.prototype.close = function() {
|
||||
this._onTimeout = null;
|
||||
if (this._handle) {
|
||||
// Fewer checks may be possible, but these cover everything.
|
||||
if (async_hook_fields[kDestroy] > 0 &&
|
||||
this &&
|
||||
typeof this[async_id_symbol] === 'number' &&
|
||||
!this._destroyed) {
|
||||
emitDestroy(this[async_id_symbol]);
|
||||
this._destroyed = true;
|
||||
}
|
||||
|
||||
this._idleTimeout = -1;
|
||||
this._handle[kOnTimeout] = null;
|
||||
this._handle.close();
|
||||
@ -673,11 +756,21 @@ function processImmediate() {
|
||||
// 4.7) what is in this smaller function.
|
||||
function tryOnImmediate(immediate, oldTail) {
|
||||
var threw = true;
|
||||
timerEmitBefore(immediate[async_id_symbol], immediate[trigger_id_symbol]);
|
||||
try {
|
||||
// make the actual call outside the try/catch to allow it to be optimized
|
||||
runCallback(immediate);
|
||||
threw = false;
|
||||
} finally {
|
||||
// clearImmediate checks _callback === null for kDestroy hooks.
|
||||
immediate._callback = null;
|
||||
if (!threw)
|
||||
timerEmitAfter(immediate[async_id_symbol]);
|
||||
if (async_hook_fields[kDestroy] > 0 && !immediate._destroyed) {
|
||||
emitDestroy(immediate[async_id_symbol]);
|
||||
immediate._destroyed = true;
|
||||
}
|
||||
|
||||
if (threw && immediate._idleNext) {
|
||||
// Handle any remaining on next tick, assuming we're still alive to do so.
|
||||
const curHead = immediateQueue.head;
|
||||
@ -726,7 +819,12 @@ function Immediate() {
|
||||
this._callback = null;
|
||||
this._argv = null;
|
||||
this._onImmediate = null;
|
||||
this._destroyed = false;
|
||||
this.domain = process.domain;
|
||||
this[async_id_symbol] = ++async_uid_fields[kAsyncUidCntr];
|
||||
this[trigger_id_symbol] = initTriggerId();
|
||||
if (async_hook_fields[kInit] > 0)
|
||||
emitInit(this[async_id_symbol], 'Immediate', this[trigger_id_symbol], this);
|
||||
}
|
||||
|
||||
function setImmediate(callback, arg1, arg2, arg3) {
|
||||
@ -785,6 +883,13 @@ function createImmediate(args, callback) {
|
||||
exports.clearImmediate = function(immediate) {
|
||||
if (!immediate) return;
|
||||
|
||||
if (async_hook_fields[kDestroy] > 0 &&
|
||||
immediate._callback !== null &&
|
||||
!immediate._destroyed) {
|
||||
emitDestroy(immediate[async_id_symbol]);
|
||||
immediate._destroyed = true;
|
||||
}
|
||||
|
||||
immediate._onImmediate = null;
|
||||
|
||||
immediateQueue.remove(immediate);
|
||||
|
@ -45,6 +45,7 @@ using v8::MaybeLocal;
|
||||
using v8::Number;
|
||||
using v8::Object;
|
||||
using v8::RetainedObjectInfo;
|
||||
using v8::Symbol;
|
||||
using v8::TryCatch;
|
||||
using v8::Uint32Array;
|
||||
using v8::Value;
|
||||
@ -325,6 +326,17 @@ void AsyncWrap::Initialize(Local<Object> target,
|
||||
#undef V
|
||||
FORCE_SET_TARGET_FIELD(target, "Providers", async_providers);
|
||||
|
||||
// These Symbols are used throughout node so the stored values on each object
|
||||
// can be accessed easily across files.
|
||||
FORCE_SET_TARGET_FIELD(
|
||||
target,
|
||||
"async_id_symbol",
|
||||
Symbol::New(isolate, FIXED_ONE_BYTE_STRING(isolate, "asyncId")));
|
||||
FORCE_SET_TARGET_FIELD(
|
||||
target,
|
||||
"trigger_id_symbol",
|
||||
Symbol::New(isolate, FIXED_ONE_BYTE_STRING(isolate, "triggerId")));
|
||||
|
||||
#undef FORCE_SET_TARGET_FIELD
|
||||
|
||||
env->set_async_hooks_init_function(Local<Function>());
|
||||
|
@ -23,7 +23,7 @@
|
||||
at *
|
||||
at Promise.then *
|
||||
at Promise.catch *
|
||||
at Immediate.setImmediate (*test*message*unhandled_promise_trace_warnings.js:*)
|
||||
at Immediate.setImmediate [as _onImmediate] (*test*message*unhandled_promise_trace_warnings.js:*)
|
||||
at *
|
||||
at *
|
||||
at *
|
||||
|
43
test/parallel/test-async-wrap-uncaughtexception.js
Normal file
43
test/parallel/test-async-wrap-uncaughtexception.js
Normal file
@ -0,0 +1,43 @@
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
const assert = require('assert');
|
||||
const async_hooks = require('async_hooks');
|
||||
const call_log = [0, 0, 0, 0]; // [before, callback, exception, after];
|
||||
let call_id = null;
|
||||
let hooks = null;
|
||||
|
||||
|
||||
process.on('beforeExit', common.mustCall(() => {
|
||||
process.removeAllListeners('uncaughtException');
|
||||
hooks.disable();
|
||||
assert.strictEqual(typeof call_id, 'number');
|
||||
assert.deepStrictEqual(call_log, [1, 1, 1, 1]);
|
||||
}));
|
||||
|
||||
|
||||
hooks = async_hooks.createHook({
|
||||
init(id, type) {
|
||||
if (type === 'RANDOMBYTESREQUEST')
|
||||
call_id = id;
|
||||
},
|
||||
before(id) {
|
||||
if (id === call_id) call_log[0]++;
|
||||
},
|
||||
after(id) {
|
||||
if (id === call_id) call_log[3]++;
|
||||
},
|
||||
}).enable();
|
||||
|
||||
|
||||
process.on('uncaughtException', common.mustCall(() => {
|
||||
assert.strictEqual(call_id, async_hooks.currentId());
|
||||
call_log[2]++;
|
||||
}));
|
||||
|
||||
|
||||
require('crypto').randomBytes(1, common.mustCall(() => {
|
||||
assert.strictEqual(call_id, async_hooks.currentId());
|
||||
call_log[1]++;
|
||||
throw new Error('ah crap');
|
||||
}));
|
Loading…
Reference in New Issue
Block a user