fix: poll connection after writing response chunk in Deno.serveHttp() (#10961)

This commit changes "op_http_response_write" to first send response chunk
and then poll the underlying HTTP connection.

Previously after writing a chunk of response HTTP connection wasn't polled
and thus data wasn't written to the socket until after next op interacting 
with the connection.
This commit is contained in:
Bartek Iwańczuk 2021-06-14 22:10:55 +02:00 committed by GitHub
parent f48d66b2b0
commit 1246a433f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 95 additions and 1 deletions

View File

@ -7,6 +7,7 @@ import {
assert,
assertEquals,
assertThrowsAsync,
deferred,
unitTest,
} from "./test_util.ts";
@ -450,3 +451,93 @@ unitTest(
}
},
);
unitTest(
{ perms: { net: true } },
// Issue: https://github.com/denoland/deno/issues/10930
async function httpServerStreamingResponse() {
// This test enqueues a single chunk for readable
// stream and waits for client to read that chunk and signal
// it before enqueueing subsequent chunk. Issue linked above
// presented a situation where enqueued chunks were not
// written to the HTTP connection until the next chunk was enqueued.
let counter = 0;
const deferreds = [
deferred(),
deferred(),
deferred(),
];
async function writeRequest(conn: Deno.Conn) {
const encoder = new TextEncoder();
const decoder = new TextDecoder();
const w = new BufWriter(conn);
const r = new BufReader(conn);
const body = `GET / HTTP/1.1\r\nHost: 127.0.0.1:4501\r\n\r\n`;
const writeResult = await w.write(encoder.encode(body));
assertEquals(body.length, writeResult);
await w.flush();
const tpr = new TextProtoReader(r);
const statusLine = await tpr.readLine();
assert(statusLine !== null);
const headers = await tpr.readMIMEHeader();
assert(headers !== null);
const chunkedReader = chunkedBodyReader(headers, r);
const buf = new Uint8Array(5);
const dest = new Buffer();
let result: number | null;
while ((result = await chunkedReader.read(buf)) !== null) {
const len = Math.min(buf.byteLength, result);
await dest.write(buf.subarray(0, len));
// Resolve a deferred - this will make response stream to
// enqueue next chunk.
deferreds[counter - 1].resolve();
}
return decoder.decode(dest.bytes());
}
function periodicStream() {
return new ReadableStream({
start(controller) {
controller.enqueue(`${counter}\n`);
counter++;
},
async pull(controller) {
if (counter >= 3) {
return controller.close();
}
await deferreds[counter - 1];
controller.enqueue(`${counter}\n`);
counter++;
},
}).pipeThrough(new TextEncoderStream());
}
const listener = Deno.listen({ port: 4501 });
const finished = (async () => {
const conn = await listener.accept();
const httpConn = Deno.serveHttp(conn);
const requestEvent = await httpConn.nextRequest();
const { respondWith } = requestEvent!;
await respondWith(new Response(periodicStream()));
httpConn.close();
})();
// start a client
const clientConn = await Deno.connect({ port: 4501 });
const r1 = await writeRequest(clientConn);
assertEquals(r1, "0\n1\n2\n");
await finished;
clientConn.close();
listener.close();
},
);

View File

@ -502,6 +502,9 @@ async fn op_http_response_write(
let mut send_data_fut = body.send_data(Vec::from(&*buf).into()).boxed_local();
poll_fn(|cx| {
let r = send_data_fut.poll_unpin(cx).map_err(AnyError::from);
// Poll connection so the data is flushed
if let Poll::Ready(Err(e)) = conn_resource.poll(cx) {
// close ConnResource
// close RequestResource associated with connection
@ -509,7 +512,7 @@ async fn op_http_response_write(
return Poll::Ready(Err(e));
}
send_data_fut.poll_unpin(cx).map_err(AnyError::from)
r
})
.await?;