http: align OutgoingMessage and ClientRequest destroy

Added .destroyed property to OutgoingMessage and ClientRequest
to align with streams.

Fixed ClientRequest.destroy to dump res and re-use socket in agent
pool aligning it with abort.

PR-URL: https://github.com/nodejs/node/pull/32148
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
Robert Nagy 2020-03-08 12:39:50 +01:00
parent de8fab95a8
commit 173d044d09
7 changed files with 155 additions and 38 deletions

View File

@ -29,6 +29,7 @@ const {
ObjectAssign,
ObjectKeys,
ObjectSetPrototypeOf,
Symbol
} = primordials;
const net = require('net');
@ -65,6 +66,7 @@ const {
} = require('internal/dtrace');
const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/;
const kError = Symbol('kError');
function validateHost(host, name) {
if (host !== null && host !== undefined && typeof host !== 'string') {
@ -337,10 +339,19 @@ ClientRequest.prototype._implicitHeader = function _implicitHeader() {
};
ClientRequest.prototype.abort = function abort() {
if (!this.aborted) {
process.nextTick(emitAbortNT, this);
if (this.aborted) {
return;
}
this.aborted = true;
process.nextTick(emitAbortNT, this);
this.destroy();
};
ClientRequest.prototype.destroy = function destroy(err) {
if (this.destroyed) {
return;
}
this.destroyed = true;
// If we're aborting, we don't care about any more response data.
if (this.res) {
@ -350,11 +361,29 @@ ClientRequest.prototype.abort = function abort() {
// In the event that we don't have a socket, we will pop out of
// the request queue through handling in onSocket.
if (this.socket) {
// in-progress
this.socket.destroy();
_destroy(this, this.socket, err);
} else if (err) {
this[kError] = err;
}
};
function _destroy(req, socket, err) {
// TODO (ronag): Check if socket was used at all (e.g. headersSent) and
// re-use it in that case. `req.socket` just checks whether the socket was
// assigned to the request and *might* have been used.
if (!req.agent || req.socket) {
socket.destroy(err);
} else {
socket.emit('free');
if (!req.aborted && !err) {
err = connResetException('socket hang up');
}
if (err) {
req.emit('error', err);
}
req.emit('close');
}
}
function emitAbortNT(req) {
req.emit('abort');
@ -750,14 +779,8 @@ ClientRequest.prototype.onSocket = function onSocket(socket) {
};
function onSocketNT(req, socket) {
if (req.aborted) {
// If we were aborted while waiting for a socket, skip the whole thing.
if (!req.agent) {
socket.destroy();
} else {
req.emit('close');
socket.emit('free');
}
if (req.destroyed) {
_destroy(req, socket, req[kError]);
} else {
tickOnSocket(req, socket);
}

View File

@ -93,6 +93,7 @@ function OutgoingMessage() {
this.outputSize = 0;
this.writable = true;
this.destroyed = false;
this._last = false;
this.chunkedEncoding = false;
@ -277,6 +278,11 @@ OutgoingMessage.prototype.setTimeout = function setTimeout(msecs, callback) {
// any messages, before ever calling this. In that case, just skip
// it, since something else is destroying this connection anyway.
OutgoingMessage.prototype.destroy = function destroy(error) {
if (this.destroyed) {
return;
}
this.destroyed = true;
if (this.socket) {
this.socket.destroy(error);
} else {

View File

@ -163,7 +163,6 @@ function isRequest(stream) {
// Normalize destroy for legacy.
function destroyer(stream, err) {
// request.destroy just do .end - .abort is what we want
if (isRequest(stream)) return stream.abort();
if (isRequest(stream.req)) return stream.req.abort();
if (typeof stream.destroy === 'function') return stream.destroy(err);

View File

@ -0,0 +1,71 @@
'use strict';
const common = require('../common');
const http = require('http');
const assert = require('assert');
{
// abort
const server = http.createServer(common.mustCall((req, res) => {
res.end('Hello');
}));
server.listen(0, common.mustCall(() => {
const options = { port: server.address().port };
const req = http.get(options, common.mustCall((res) => {
res.on('data', (data) => {
req.abort();
assert.strictEqual(req.aborted, true);
assert.strictEqual(req.destroyed, true);
server.close();
});
}));
req.on('error', common.mustNotCall());
assert.strictEqual(req.aborted, false);
assert.strictEqual(req.destroyed, false);
}));
}
{
// destroy + res
const server = http.createServer(common.mustCall((req, res) => {
res.end('Hello');
}));
server.listen(0, common.mustCall(() => {
const options = { port: server.address().port };
const req = http.get(options, common.mustCall((res) => {
res.on('data', (data) => {
req.destroy();
assert.strictEqual(req.aborted, false);
assert.strictEqual(req.destroyed, true);
server.close();
});
}));
req.on('error', common.mustNotCall());
assert.strictEqual(req.aborted, false);
assert.strictEqual(req.destroyed, false);
}));
}
{
// destroy
const server = http.createServer(common.mustNotCall((req, res) => {
}));
server.listen(0, common.mustCall(() => {
const options = { port: server.address().port };
const req = http.get(options, common.mustNotCall());
req.on('error', common.mustCall((err) => {
assert.strictEqual(err.code, 'ECONNRESET');
server.close();
}));
assert.strictEqual(req.aborted, false);
assert.strictEqual(req.destroyed, false);
req.destroy();
assert.strictEqual(req.aborted, false);
assert.strictEqual(req.destroyed, true);
}));
}

View File

@ -3,34 +3,45 @@ const common = require('../common');
const assert = require('assert');
const http = require('http');
let socketsCreated = 0;
class Agent extends http.Agent {
createConnection(options, oncreate) {
const socket = super.createConnection(options, oncreate);
socketsCreated++;
return socket;
for (const destroyer of ['destroy', 'abort']) {
let socketsCreated = 0;
class Agent extends http.Agent {
createConnection(options, oncreate) {
const socket = super.createConnection(options, oncreate);
socketsCreated++;
return socket;
}
}
}
const server = http.createServer((req, res) => res.end());
const server = http.createServer((req, res) => res.end());
server.listen(0, common.mustCall(() => {
const port = server.address().port;
const agent = new Agent({
keepAlive: true,
maxSockets: 1
});
server.listen(0, common.mustCall(() => {
const port = server.address().port;
const agent = new Agent({
keepAlive: true,
maxSockets: 1
});
http.get({ agent, port }, (res) => res.resume());
http.get({ agent, port }, (res) => res.resume());
const req = http.get({ agent, port }, common.mustNotCall());
req.abort();
const req = http.get({ agent, port }, common.mustNotCall());
req[destroyer]();
http.get({ agent, port }, common.mustCall((res) => {
res.resume();
assert.strictEqual(socketsCreated, 1);
agent.destroy();
server.close();
if (destroyer === 'destroy') {
req.on('error', common.mustCall((err) => {
assert.strictEqual(err.code, 'ECONNRESET');
}));
} else {
req.on('error', common.mustNotCall());
}
http.get({ agent, port }, common.mustCall((res) => {
res.resume();
assert.strictEqual(socketsCreated, 1);
agent.destroy();
server.close();
}));
}));
}));
}

View File

@ -14,12 +14,12 @@ server.listen(0, common.mustCall(() => {
const req = http.get({ port: server.address().port }, common.mustNotCall());
let errorEmitted = false;
req.on('error', (err) => {
req.on('error', common.mustCall((err) => {
errorEmitted = true;
assert.strictEqual(err.constructor, Error);
assert.strictEqual(err.message, 'socket hang up');
assert.strictEqual(err.code, 'ECONNRESET');
});
}));
req.on('close', common.mustCall(() => {
assert.strictEqual(errorEmitted, true);

View File

@ -122,3 +122,10 @@ assert.throws(() => {
name: 'TypeError',
message: 'Invalid character in trailer content ["404"]'
});
{
const outgoingMessage = new OutgoingMessage();
assert.strictEqual(outgoingMessage.destroyed, false);
outgoingMessage.destroy();
assert.strictEqual(outgoingMessage.destroyed, true);
}