fix(ext/node): refactor http.ServerResponse into function class (#26210)

While testing, I found out that light-my-request relies on
`ServerResponse.connection`, which is deprecated, so I added that and
`socket`, the non deprecated property.

It also relies on an undocumented `_header` property, apparently for
[raw header
processing](https://github.com/fastify/light-my-request/blob/v6.1.0/lib/response.js#L180-L186).
I added it as an empty string, feel free to provide other approaches.

Fixes #19901

Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
This commit is contained in:
Nicola Bovolato 2024-10-25 00:02:26 +02:00 committed by GitHub
parent fd8bf08271
commit 8dd6177c62
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 407 additions and 207 deletions

View File

@ -34,6 +34,7 @@ import {
finished,
Readable as NodeReadable,
Writable as NodeWritable,
WritableOptions as NodeWritableOptions,
} from "node:stream";
import {
kUniqueHeaders,
@ -70,6 +71,7 @@ import { resourceForReadableStream } from "ext:deno_web/06_streams.js";
import { UpgradedConn } from "ext:deno_net/01_net.js";
import { STATUS_CODES } from "node:_http_server";
import { methods as METHODS } from "node:_http_common";
import { deprecate } from "node:util";
const { internalRidSymbol } = core;
const { ArrayIsArray, StringPrototypeToLowerCase } = primordials;
@ -1184,49 +1186,95 @@ function onError(self, error, cb) {
}
}
export class ServerResponse extends NodeWritable {
statusCode = 200;
statusMessage?: string = undefined;
#headers: Record<string, string | string[]> = { __proto__: null };
#hasNonStringHeaders: boolean = false;
#readable: ReadableStream;
override writable = true;
// used by `npm:on-finished`
finished = false;
headersSent = false;
#resolve: (value: Response | PromiseLike<Response>) => void;
export type ServerResponse = {
statusCode: number;
statusMessage?: string;
_headers: Record<string, string | string[]>;
_hasNonStringHeaders: boolean;
_readable: ReadableStream;
finished: boolean;
headersSent: boolean;
_resolve: (value: Response | PromiseLike<Response>) => void;
// deno-lint-ignore no-explicit-any
#socketOverride: any | null = null;
_socketOverride: any | null;
// deno-lint-ignore no-explicit-any
socket: any | null;
static #enqueue(controller: ReadableStreamDefaultController, chunk: Chunk) {
try {
if (typeof chunk === "string") {
controller.enqueue(ENCODER.encode(chunk));
} else {
controller.enqueue(chunk);
}
} catch (_) {
// The stream might have been closed. Ignore the error.
}
}
setHeader(name: string, value: string | string[]): void;
appendHeader(name: string, value: string | string[]): void;
getHeader(name: string): string | string[];
removeHeader(name: string): void;
getHeaderNames(): string[];
getHeaders(): Record<string, string | number | string[]>;
hasHeader(name: string): boolean;
/** Returns true if the response body should be null with the given
* http status code */
static #bodyShouldBeNull(status: number) {
return status === 101 || status === 204 || status === 205 || status === 304;
}
writeHead(
status: number,
statusMessage?: string,
headers?:
| Record<string, string | number | string[]>
| Array<[string, string]>,
): void;
writeHead(
status: number,
headers?:
| Record<string, string | number | string[]>
| Array<[string, string]>,
): void;
constructor(
_ensureHeaders(singleChunk?: Chunk): void;
respond(final: boolean, singleChunk?: Chunk): void;
// deno-lint-ignore no-explicit-any
end(chunk?: any, encoding?: any, cb?: any): void;
flushHeaders(): void;
_implicitHeader(): void;
// Undocumented field used by `npm:light-my-request`.
_header: string;
assignSocket(socket): void;
detachSocket(socket): void;
} & { -readonly [K in keyof NodeWritable]: NodeWritable[K] };
type ServerResponseStatic = {
new (
resolve: (value: Response | PromiseLike<Response>) => void,
socket: FakeSocket,
) {
let controller: ReadableByteStreamController;
const readable = new ReadableStream({
start(c) {
controller = c as ReadableByteStreamController;
},
});
super({
): ServerResponse;
_enqueue(controller: ReadableStreamDefaultController, chunk: Chunk): void;
_bodyShouldBeNull(statusCode: number): boolean;
};
export const ServerResponse = function (
this: ServerResponse,
resolve: (value: Response | PromiseLike<Response>) => void,
socket: FakeSocket,
) {
this.statusCode = 200;
this.statusMessage = undefined;
this._headers = { __proto__: null };
this._hasNonStringHeaders = false;
this.writable = true;
// used by `npm:on-finished`
this.finished = false;
this.headersSent = false;
this._socketOverride = null;
let controller: ReadableByteStreamController;
const readable = new ReadableStream({
start(c) {
controller = c as ReadableByteStreamController;
},
});
NodeWritable.call(
this,
{
autoDestroy: true,
defaultEncoding: "utf-8",
emitClose: true,
@ -1235,16 +1283,16 @@ export class ServerResponse extends NodeWritable {
write: (chunk, encoding, cb) => {
// Writes chunks are directly written to the socket if
// one is assigned via assignSocket()
if (this.#socketOverride && this.#socketOverride.writable) {
this.#socketOverride.write(chunk, encoding);
if (this._socketOverride && this._socketOverride.writable) {
this._socketOverride.write(chunk, encoding);
return cb();
}
if (!this.headersSent) {
ServerResponse.#enqueue(controller, chunk);
ServerResponse._enqueue(controller, chunk);
this.respond(false);
return cb();
}
ServerResponse.#enqueue(controller, chunk);
ServerResponse._enqueue(controller, chunk);
return cb();
},
final: (cb) => {
@ -1260,193 +1308,269 @@ export class ServerResponse extends NodeWritable {
}
return cb(null);
},
});
this.#readable = readable;
this.#resolve = resolve;
this.socket = socket;
}
} satisfies NodeWritableOptions,
);
setHeader(name: string, value: string | string[]) {
if (Array.isArray(value)) {
this.#hasNonStringHeaders = true;
}
this.#headers[StringPrototypeToLowerCase(name)] = value;
return this;
}
this._readable = readable;
this._resolve = resolve;
this.socket = socket;
appendHeader(name: string, value: string | string[]) {
const key = StringPrototypeToLowerCase(name);
if (this.#headers[key] === undefined) {
if (Array.isArray(value)) this.#hasNonStringHeaders = true;
this.#headers[key] = value;
this._header = "";
} as unknown as ServerResponseStatic;
Object.setPrototypeOf(ServerResponse.prototype, NodeWritable.prototype);
Object.setPrototypeOf(ServerResponse, NodeWritable);
ServerResponse._enqueue = function (
this: ServerResponse,
controller: ReadableStreamDefaultController,
chunk: Chunk,
) {
try {
if (typeof chunk === "string") {
controller.enqueue(ENCODER.encode(chunk));
} else {
this.#hasNonStringHeaders = true;
if (!Array.isArray(this.#headers[key])) {
this.#headers[key] = [this.#headers[key]];
controller.enqueue(chunk);
}
} catch (_) {
// The stream might have been closed. Ignore the error.
}
};
/** Returns true if the response body should be null with the given
* http status code */
ServerResponse._bodyShouldBeNull = function (
this: ServerResponse,
status: number,
) {
return status === 101 || status === 204 || status === 205 || status === 304;
};
ServerResponse.prototype.setHeader = function (
this: ServerResponse,
name: string,
value: string | string[],
) {
if (Array.isArray(value)) {
this._hasNonStringHeaders = true;
}
this._headers[StringPrototypeToLowerCase(name)] = value;
return this;
};
ServerResponse.prototype.appendHeader = function (
this: ServerResponse,
name: string,
value: string | string[],
) {
const key = StringPrototypeToLowerCase(name);
if (this._headers[key] === undefined) {
if (Array.isArray(value)) this._hasNonStringHeaders = true;
this._headers[key] = value;
} else {
this._hasNonStringHeaders = true;
if (!Array.isArray(this._headers[key])) {
this._headers[key] = [this._headers[key]];
}
const header = this._headers[key];
if (Array.isArray(value)) {
header.push(...value);
} else {
header.push(value);
}
}
return this;
};
ServerResponse.prototype.getHeader = function (
this: ServerResponse,
name: string,
) {
return this._headers[StringPrototypeToLowerCase(name)];
};
ServerResponse.prototype.removeHeader = function (
this: ServerResponse,
name: string,
) {
delete this._headers[StringPrototypeToLowerCase(name)];
};
ServerResponse.prototype.getHeaderNames = function (this: ServerResponse) {
return Object.keys(this._headers);
};
ServerResponse.prototype.getHeaders = function (
this: ServerResponse,
): Record<string, string | number | string[]> {
return { __proto__: null, ...this._headers };
};
ServerResponse.prototype.hasHeader = function (
this: ServerResponse,
name: string,
) {
return Object.hasOwn(this._headers, name);
};
ServerResponse.prototype.writeHead = function (
this: ServerResponse,
status: number,
statusMessageOrHeaders?:
| string
| Record<string, string | number | string[]>
| Array<[string, string]>,
maybeHeaders?:
| Record<string, string | number | string[]>
| Array<[string, string]>,
) {
this.statusCode = status;
let headers = null;
if (typeof statusMessageOrHeaders === "string") {
this.statusMessage = statusMessageOrHeaders;
if (maybeHeaders !== undefined) {
headers = maybeHeaders;
}
} else if (statusMessageOrHeaders !== undefined) {
headers = statusMessageOrHeaders;
}
if (headers !== null) {
if (ArrayIsArray(headers)) {
headers = headers as Array<[string, string]>;
for (let i = 0; i < headers.length; i++) {
this.appendHeader(headers[i][0], headers[i][1]);
}
const header = this.#headers[key];
if (Array.isArray(value)) {
header.push(...value);
} else {
header.push(value);
} else {
headers = headers as Record<string, string>;
for (const k in headers) {
if (Object.hasOwn(headers, k)) {
this.setHeader(k, headers[k]);
}
}
}
return this;
}
getHeader(name: string) {
return this.#headers[StringPrototypeToLowerCase(name)];
}
removeHeader(name: string) {
delete this.#headers[StringPrototypeToLowerCase(name)];
}
getHeaderNames() {
return Object.keys(this.#headers);
}
getHeaders(): Record<string, string | number | string[]> {
// @ts-ignore Ignore null __proto__
return { __proto__: null, ...this.#headers };
}
hasHeader(name: string) {
return Object.hasOwn(this.#headers, name);
}
return this;
};
writeHead(
status: number,
statusMessage?: string,
headers?:
| Record<string, string | number | string[]>
| Array<[string, string]>,
): this;
writeHead(
status: number,
headers?:
| Record<string, string | number | string[]>
| Array<[string, string]>,
): this;
writeHead(
status: number,
statusMessageOrHeaders?:
| string
| Record<string, string | number | string[]>
| Array<[string, string]>,
maybeHeaders?:
| Record<string, string | number | string[]>
| Array<[string, string]>,
): this {
this.statusCode = status;
ServerResponse.prototype._ensureHeaders = function (
this: ServerResponse,
singleChunk?: Chunk,
) {
if (this.statusCode === 200 && this.statusMessage === undefined) {
this.statusMessage = "OK";
}
if (typeof singleChunk === "string" && !this.hasHeader("content-type")) {
this.setHeader("content-type", "text/plain;charset=UTF-8");
}
};
let headers = null;
if (typeof statusMessageOrHeaders === "string") {
this.statusMessage = statusMessageOrHeaders;
if (maybeHeaders !== undefined) {
headers = maybeHeaders;
}
} else if (statusMessageOrHeaders !== undefined) {
headers = statusMessageOrHeaders;
}
if (headers !== null) {
if (ArrayIsArray(headers)) {
headers = headers as Array<[string, string]>;
for (let i = 0; i < headers.length; i++) {
this.appendHeader(headers[i][0], headers[i][1]);
ServerResponse.prototype.respond = function (
this: ServerResponse,
final: boolean,
singleChunk?: Chunk,
) {
this.headersSent = true;
this._ensureHeaders(singleChunk);
let body = singleChunk ?? (final ? null : this._readable);
if (ServerResponse._bodyShouldBeNull(this.statusCode)) {
body = null;
}
let headers: Record<string, string> | [string, string][] = this
._headers as Record<string, string>;
if (this._hasNonStringHeaders) {
headers = [];
// Guard is not needed as this is a null prototype object.
// deno-lint-ignore guard-for-in
for (const key in this._headers) {
const entry = this._headers[key];
if (Array.isArray(entry)) {
for (const value of entry) {
headers.push([key, value]);
}
} else {
headers = headers as Record<string, string>;
for (const k in headers) {
if (Object.hasOwn(headers, k)) {
this.setHeader(k, headers[k]);
}
}
headers.push([key, entry]);
}
}
return this;
}
#ensureHeaders(singleChunk?: Chunk) {
if (this.statusCode === 200 && this.statusMessage === undefined) {
this.statusMessage = "OK";
}
if (
typeof singleChunk === "string" &&
!this.hasHeader("content-type")
) {
this.setHeader("content-type", "text/plain;charset=UTF-8");
}
}
respond(final: boolean, singleChunk?: Chunk) {
this.headersSent = true;
this.#ensureHeaders(singleChunk);
let body = singleChunk ?? (final ? null : this.#readable);
if (ServerResponse.#bodyShouldBeNull(this.statusCode)) {
body = null;
}
let headers: Record<string, string> | [string, string][] = this
.#headers as Record<string, string>;
if (this.#hasNonStringHeaders) {
headers = [];
// Guard is not needed as this is a null prototype object.
// deno-lint-ignore guard-for-in
for (const key in this.#headers) {
const entry = this.#headers[key];
if (Array.isArray(entry)) {
for (const value of entry) {
headers.push([key, value]);
}
} else {
headers.push([key, entry]);
}
}
}
this.#resolve(
new Response(body, {
headers,
status: this.statusCode,
statusText: this.statusMessage,
}),
);
}
this._resolve(
new Response(body, {
headers,
status: this.statusCode,
statusText: this.statusMessage,
}),
);
};
ServerResponse.prototype.end = function (
this: ServerResponse,
// deno-lint-ignore no-explicit-any
override end(chunk?: any, encoding?: any, cb?: any): this {
this.finished = true;
if (!chunk && "transfer-encoding" in this.#headers) {
// FIXME(bnoordhuis) Node sends a zero length chunked body instead, i.e.,
// the trailing "0\r\n", but respondWith() just hangs when I try that.
this.#headers["content-length"] = "0";
delete this.#headers["transfer-encoding"];
}
// @ts-expect-error The signature for cb is stricter than the one implemented here
return super.end(chunk, encoding, cb);
chunk?: any,
// deno-lint-ignore no-explicit-any
encoding?: any,
// deno-lint-ignore no-explicit-any
cb?: any,
) {
this.finished = true;
if (!chunk && "transfer-encoding" in this._headers) {
// FIXME(bnoordhuis) Node sends a zero length chunked body instead, i.e.,
// the trailing "0\r\n", but respondWith() just hangs when I try that.
this._headers["content-length"] = "0";
delete this._headers["transfer-encoding"];
}
flushHeaders() {
// no-op
}
// @ts-expect-error The signature for cb is stricter than the one implemented here
NodeWritable.prototype.end.call(this, chunk, encoding, cb);
};
// Undocumented API used by `npm:compression`.
_implicitHeader() {
this.writeHead(this.statusCode);
}
ServerResponse.prototype.flushHeaders = function (this: ServerResponse) {
// no-op
};
assignSocket(socket) {
if (socket._httpMessage) {
throw new ERR_HTTP_SOCKET_ASSIGNED();
}
socket._httpMessage = this;
this.#socketOverride = socket;
}
// Undocumented API used by `npm:compression`.
ServerResponse.prototype._implicitHeader = function (this: ServerResponse) {
this.writeHead(this.statusCode);
};
detachSocket(socket) {
assert(socket._httpMessage === this);
socket._httpMessage = null;
this.#socketOverride = null;
ServerResponse.prototype.assignSocket = function (
this: ServerResponse,
socket,
) {
if (socket._httpMessage) {
throw new ERR_HTTP_SOCKET_ASSIGNED();
}
}
socket._httpMessage = this;
this._socketOverride = socket;
};
ServerResponse.prototype.detachSocket = function (
this: ServerResponse,
socket,
) {
assert(socket._httpMessage === this);
socket._httpMessage = null;
this._socketOverride = null;
};
Object.defineProperty(ServerResponse.prototype, "connection", {
get: deprecate(
function (this: ServerResponse) {
return this._socketOverride;
},
"ServerResponse.prototype.connection is deprecated",
"DEP0066",
),
set: deprecate(
// deno-lint-ignore no-explicit-any
function (this: ServerResponse, socket: any) {
this._socketOverride = socket;
},
"ServerResponse.prototype.connection is deprecated",
"DEP0066",
),
});
// TODO(@AaronO): optimize
export class IncomingMessageForServer extends NodeReadable {

View File

@ -3,10 +3,14 @@
// deno-lint-ignore-file no-console
import EventEmitter from "node:events";
import http, { type RequestOptions, type ServerResponse } from "node:http";
import http, {
IncomingMessage,
type RequestOptions,
ServerResponse,
} from "node:http";
import url from "node:url";
import https from "node:https";
import net from "node:net";
import net, { Socket } from "node:net";
import fs from "node:fs";
import { text } from "node:stream/consumers";
@ -1704,3 +1708,75 @@ Deno.test("[node/http] upgraded socket closes when the server closed without clo
await clientSocketClosed.promise;
await serverProcessClosed.promise;
});
// deno-lint-ignore require-await
Deno.test("[node/http] ServerResponse.call()", async () => {
function Wrapper(this: unknown, req: IncomingMessage) {
ServerResponse.call(this, req);
}
Object.setPrototypeOf(Wrapper.prototype, ServerResponse.prototype);
// deno-lint-ignore no-explicit-any
const wrapper = new (Wrapper as any)(new IncomingMessage(new Socket()));
assert(wrapper instanceof ServerResponse);
});
Deno.test("[node/http] ServerResponse _header", async () => {
const { promise, resolve } = Promise.withResolvers<void>();
const server = http.createServer((_req, res) => {
assert(Object.hasOwn(res, "_header"));
res.end();
});
server.listen(async () => {
const { port } = server.address() as { port: number };
const res = await fetch(`http://localhost:${port}`);
await res.body?.cancel();
server.close(() => {
resolve();
});
});
await promise;
});
Deno.test("[node/http] ServerResponse connection", async () => {
const { promise, resolve } = Promise.withResolvers<void>();
const server = http.createServer((_req, res) => {
assert(Object.hasOwn(res, "connection"));
assert(res.connection instanceof Socket);
res.end();
});
server.listen(async () => {
const { port } = server.address() as { port: number };
const res = await fetch(`http://localhost:${port}`);
await res.body?.cancel();
server.close(() => {
resolve();
});
});
await promise;
});
Deno.test("[node/http] ServerResponse socket", async () => {
const { promise, resolve } = Promise.withResolvers<void>();
const server = http.createServer((_req, res) => {
assert(Object.hasOwn(res, "socket"));
assert(res.socket instanceof Socket);
res.end();
});
server.listen(async () => {
const { port } = server.address() as { port: number };
const res = await fetch(`http://localhost:${port}`);
await res.body?.cancel();
server.close(() => {
resolve();
});
});
await promise;
});