mirror of
https://github.com/denoland/deno.git
synced 2024-11-22 04:51:22 +00:00
Revert various PRs related to "ext/http" (#14339)
* Revert "feat(ext/http): stream auto resp body compression (#14325)" * Revert "core: introduce `resource.read_return` (#14331)" * Revert "perf(http): optimize `ReadableStream`s backed by a resource (#14284)"
This commit is contained in:
parent
aaaa877d91
commit
03019e7781
6
.github/workflows/ci.yml
vendored
6
.github/workflows/ci.yml
vendored
@ -236,7 +236,7 @@ jobs:
|
||||
~/.cargo/registry/index
|
||||
~/.cargo/registry/cache
|
||||
~/.cargo/git/db
|
||||
key: 9-cargo-home-${{ matrix.os }}-${{ hashFiles('Cargo.lock') }}
|
||||
key: 7-cargo-home-${{ matrix.os }}-${{ hashFiles('Cargo.lock') }}
|
||||
|
||||
# In main branch, always creates fresh cache
|
||||
- name: Cache build output (main)
|
||||
@ -252,7 +252,7 @@ jobs:
|
||||
!./target/*/*.zip
|
||||
!./target/*/*.tar.gz
|
||||
key: |
|
||||
9-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-${{ github.sha }}
|
||||
7-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-${{ github.sha }}
|
||||
|
||||
# Restore cache from the latest 'main' branch build.
|
||||
- name: Cache build output (PR)
|
||||
@ -268,7 +268,7 @@ jobs:
|
||||
!./target/*/*.tar.gz
|
||||
key: never_saved
|
||||
restore-keys: |
|
||||
9-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-
|
||||
7-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-
|
||||
|
||||
# Don't save cache after building PRs or branches other than 'main'.
|
||||
- name: Skip save cache (PR)
|
||||
|
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -964,7 +964,6 @@ dependencies = [
|
||||
name = "deno_http"
|
||||
version = "0.41.0"
|
||||
dependencies = [
|
||||
"async-compression",
|
||||
"base64 0.13.0",
|
||||
"brotli",
|
||||
"bytes",
|
||||
|
@ -854,45 +854,6 @@ Deno.test({ permissions: { net: true } }, async function httpServerPanic() {
|
||||
listener.close();
|
||||
});
|
||||
|
||||
Deno.test(
|
||||
{ permissions: { net: true, write: true, read: true } },
|
||||
async function httpServerClosedStream() {
|
||||
const listener = Deno.listen({ port: 4502 });
|
||||
|
||||
const client = await Deno.connect({ port: 4502 });
|
||||
await client.write(new TextEncoder().encode(
|
||||
`GET / HTTP/1.0\r\n\r\n`,
|
||||
));
|
||||
|
||||
const conn = await listener.accept();
|
||||
const httpConn = Deno.serveHttp(conn);
|
||||
const ev = await httpConn.nextRequest();
|
||||
const { respondWith } = ev!;
|
||||
|
||||
const tmpFile = await Deno.makeTempFile();
|
||||
const file = await Deno.open(tmpFile, { write: true, read: true });
|
||||
await file.write(new TextEncoder().encode("hello"));
|
||||
|
||||
const reader = await file.readable.getReader();
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
assert(value);
|
||||
}
|
||||
|
||||
try {
|
||||
await respondWith(new Response(file.readable));
|
||||
fail("The stream should've been locked");
|
||||
} catch {
|
||||
// pass
|
||||
}
|
||||
|
||||
httpConn.close();
|
||||
listener.close();
|
||||
client.close();
|
||||
},
|
||||
);
|
||||
|
||||
// https://github.com/denoland/deno/issues/11595
|
||||
Deno.test(
|
||||
{ permissions: { net: true } },
|
||||
@ -1224,25 +1185,26 @@ Deno.test(
|
||||
const decoder = new TextDecoder();
|
||||
|
||||
Deno.test({
|
||||
name: "http server compresses body - check headers",
|
||||
name: "http server compresses body",
|
||||
permissions: { net: true, run: true },
|
||||
async fn() {
|
||||
const hostname = "localhost";
|
||||
const port = 4501;
|
||||
const listener = Deno.listen({ hostname, port });
|
||||
|
||||
const data = { hello: "deno", now: "with", compressed: "body" };
|
||||
|
||||
async function server() {
|
||||
const listener = Deno.listen({ hostname, port });
|
||||
const tcpConn = await listener.accept();
|
||||
const httpConn = Deno.serveHttp(tcpConn);
|
||||
const e = await httpConn.nextRequest();
|
||||
assert(e);
|
||||
const { request, respondWith } = e;
|
||||
assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
|
||||
const response = new Response(JSON.stringify(data), {
|
||||
headers: { "content-type": "application/json" },
|
||||
});
|
||||
const response = new Response(
|
||||
JSON.stringify({ hello: "deno", now: "with", compressed: "body" }),
|
||||
{
|
||||
headers: { "content-type": "application/json" },
|
||||
},
|
||||
);
|
||||
await respondWith(response);
|
||||
httpConn.close();
|
||||
listener.close();
|
||||
@ -1273,60 +1235,6 @@ Deno.test({
|
||||
},
|
||||
});
|
||||
|
||||
Deno.test({
|
||||
name: "http server compresses body - check body",
|
||||
permissions: { net: true, run: true },
|
||||
async fn() {
|
||||
const hostname = "localhost";
|
||||
const port = 4501;
|
||||
const listener = Deno.listen({ hostname, port });
|
||||
|
||||
const data = { hello: "deno", now: "with", compressed: "body" };
|
||||
|
||||
async function server() {
|
||||
const tcpConn = await listener.accept();
|
||||
const httpConn = Deno.serveHttp(tcpConn);
|
||||
const e = await httpConn.nextRequest();
|
||||
assert(e);
|
||||
const { request, respondWith } = e;
|
||||
assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
|
||||
const response = new Response(JSON.stringify(data), {
|
||||
headers: { "content-type": "application/json" },
|
||||
});
|
||||
await respondWith(response);
|
||||
httpConn.close();
|
||||
listener.close();
|
||||
}
|
||||
|
||||
async function client() {
|
||||
const url = `http://${hostname}:${port}/`;
|
||||
const cmd = [
|
||||
"curl",
|
||||
"--request",
|
||||
"GET",
|
||||
"--url",
|
||||
url,
|
||||
"--header",
|
||||
"Accept-Encoding: gzip, deflate, br",
|
||||
];
|
||||
const proc = Deno.run({ cmd, stdout: "piped", stderr: "null" });
|
||||
const status = await proc.status();
|
||||
assert(status.success);
|
||||
const stdout = proc.stdout!.readable
|
||||
.pipeThrough(new DecompressionStream("gzip"))
|
||||
.pipeThrough(new TextDecoderStream());
|
||||
let body = "";
|
||||
for await (const chunk of stdout) {
|
||||
body += chunk;
|
||||
}
|
||||
assertEquals(JSON.parse(body), data);
|
||||
proc.close();
|
||||
}
|
||||
|
||||
await Promise.all([server(), client()]);
|
||||
},
|
||||
});
|
||||
|
||||
Deno.test({
|
||||
name: "http server doesn't compress small body",
|
||||
permissions: { net: true, run: true },
|
||||
@ -1706,18 +1614,15 @@ Deno.test({
|
||||
});
|
||||
|
||||
Deno.test({
|
||||
name: "http server compresses streamed bodies - check headers",
|
||||
name: "http server doesn't compress streamed bodies",
|
||||
permissions: { net: true, run: true },
|
||||
async fn() {
|
||||
const hostname = "localhost";
|
||||
const port = 4501;
|
||||
|
||||
const encoder = new TextEncoder();
|
||||
const listener = Deno.listen({ hostname, port });
|
||||
|
||||
const data = { hello: "deno", now: "with", compressed: "body" };
|
||||
|
||||
async function server() {
|
||||
const encoder = new TextEncoder();
|
||||
const listener = Deno.listen({ hostname, port });
|
||||
const tcpConn = await listener.accept();
|
||||
const httpConn = Deno.serveHttp(tcpConn);
|
||||
const e = await httpConn.nextRequest();
|
||||
@ -1726,13 +1631,23 @@ Deno.test({
|
||||
assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
|
||||
const bodyInit = new ReadableStream({
|
||||
start(controller) {
|
||||
controller.enqueue(encoder.encode(JSON.stringify(data)));
|
||||
controller.enqueue(
|
||||
encoder.encode(
|
||||
JSON.stringify({
|
||||
hello: "deno",
|
||||
now: "with",
|
||||
compressed: "body",
|
||||
}),
|
||||
),
|
||||
);
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
const response = new Response(
|
||||
bodyInit,
|
||||
{ headers: { "content-type": "application/json" } },
|
||||
{
|
||||
headers: { "content-type": "application/json", vary: "Accept" },
|
||||
},
|
||||
);
|
||||
await respondWith(response);
|
||||
httpConn.close();
|
||||
@ -1755,71 +1670,8 @@ Deno.test({
|
||||
const status = await proc.status();
|
||||
assert(status.success);
|
||||
const output = decoder.decode(await proc.output());
|
||||
assert(output.includes("vary: Accept-Encoding\r\n"));
|
||||
assert(output.includes("content-encoding: gzip\r\n"));
|
||||
proc.close();
|
||||
}
|
||||
|
||||
await Promise.all([server(), client()]);
|
||||
},
|
||||
});
|
||||
|
||||
Deno.test({
|
||||
name: "http server compresses streamed bodies - check body",
|
||||
permissions: { net: true, run: true },
|
||||
async fn() {
|
||||
const hostname = "localhost";
|
||||
const port = 4501;
|
||||
|
||||
const encoder = new TextEncoder();
|
||||
const listener = Deno.listen({ hostname, port });
|
||||
|
||||
const data = { hello: "deno", now: "with", compressed: "body" };
|
||||
|
||||
async function server() {
|
||||
const tcpConn = await listener.accept();
|
||||
const httpConn = Deno.serveHttp(tcpConn);
|
||||
const e = await httpConn.nextRequest();
|
||||
assert(e);
|
||||
const { request, respondWith } = e;
|
||||
assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
|
||||
const bodyInit = new ReadableStream({
|
||||
start(controller) {
|
||||
controller.enqueue(encoder.encode(JSON.stringify(data)));
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
const response = new Response(
|
||||
bodyInit,
|
||||
{ headers: { "content-type": "application/json" } },
|
||||
);
|
||||
await respondWith(response);
|
||||
httpConn.close();
|
||||
listener.close();
|
||||
}
|
||||
|
||||
async function client() {
|
||||
const url = `http://${hostname}:${port}/`;
|
||||
const cmd = [
|
||||
"curl",
|
||||
"--request",
|
||||
"GET",
|
||||
"--url",
|
||||
url,
|
||||
"--header",
|
||||
"Accept-Encoding: gzip, deflate, br",
|
||||
];
|
||||
const proc = Deno.run({ cmd, stdout: "piped", stderr: "null" });
|
||||
const status = await proc.status();
|
||||
assert(status.success);
|
||||
const stdout = proc.stdout.readable
|
||||
.pipeThrough(new DecompressionStream("gzip"))
|
||||
.pipeThrough(new TextDecoderStream());
|
||||
let body = "";
|
||||
for await (const chunk of stdout) {
|
||||
body += chunk;
|
||||
}
|
||||
assertEquals(JSON.parse(body), data);
|
||||
assert(output.includes("vary: Accept\r\n"));
|
||||
assert(!output.includes("content-encoding: "));
|
||||
proc.close();
|
||||
}
|
||||
|
||||
@ -1884,6 +1736,8 @@ Deno.test({
|
||||
// Ensure the content-length header is updated.
|
||||
assert(!output.includes(`content-length: ${contentLength}\r\n`));
|
||||
assert(output.includes("content-length: 72\r\n"));
|
||||
console.log(output);
|
||||
|
||||
proc.close();
|
||||
}
|
||||
|
||||
|
@ -83,18 +83,13 @@ struct TcpStream {
|
||||
}
|
||||
|
||||
impl TcpStream {
|
||||
async fn read(
|
||||
self: Rc<Self>,
|
||||
mut buf: ZeroCopyBuf,
|
||||
) -> Result<(usize, ZeroCopyBuf), Error> {
|
||||
async fn read(self: Rc<Self>, mut buf: ZeroCopyBuf) -> Result<usize, Error> {
|
||||
let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
|
||||
let cancel = RcRef::map(self, |r| &r.cancel);
|
||||
let nread = rd
|
||||
.read(&mut buf)
|
||||
rd.read(&mut buf)
|
||||
.try_or_cancel(cancel)
|
||||
.await
|
||||
.map_err(Error::from)?;
|
||||
Ok((nread, buf))
|
||||
.map_err(Error::from)
|
||||
}
|
||||
|
||||
async fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> Result<usize, Error> {
|
||||
@ -104,10 +99,7 @@ impl TcpStream {
|
||||
}
|
||||
|
||||
impl Resource for TcpStream {
|
||||
fn read_return(
|
||||
self: Rc<Self>,
|
||||
buf: ZeroCopyBuf,
|
||||
) -> AsyncResult<(usize, ZeroCopyBuf)> {
|
||||
fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
|
||||
Box::pin(self.read(buf))
|
||||
}
|
||||
|
||||
|
@ -36,17 +36,7 @@ pub trait Resource: Any + 'static {
|
||||
}
|
||||
|
||||
/// Resources may implement `read()` to be a readable stream
|
||||
fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
|
||||
Box::pin(async move {
|
||||
let (nread, _) = self.read_return(buf).await?;
|
||||
Ok(nread)
|
||||
})
|
||||
}
|
||||
|
||||
fn read_return(
|
||||
self: Rc<Self>,
|
||||
_buf: ZeroCopyBuf,
|
||||
) -> AsyncResult<(usize, ZeroCopyBuf)> {
|
||||
fn read(self: Rc<Self>, _buf: ZeroCopyBuf) -> AsyncResult<usize> {
|
||||
Box::pin(futures::future::err(not_supported()))
|
||||
}
|
||||
|
||||
|
@ -485,15 +485,12 @@ impl Resource for FetchResponseBodyResource {
|
||||
"fetchResponseBody".into()
|
||||
}
|
||||
|
||||
fn read_return(
|
||||
self: Rc<Self>,
|
||||
mut buf: ZeroCopyBuf,
|
||||
) -> AsyncResult<(usize, ZeroCopyBuf)> {
|
||||
fn read(self: Rc<Self>, mut buf: ZeroCopyBuf) -> AsyncResult<usize> {
|
||||
Box::pin(async move {
|
||||
let mut reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await;
|
||||
let cancel = RcRef::map(self, |r| &r.cancel);
|
||||
let read = reader.read(&mut buf).try_or_cancel(cancel).await?;
|
||||
Ok((read, buf))
|
||||
Ok(read)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -32,8 +32,7 @@
|
||||
} = window.__bootstrap.webSocket;
|
||||
const { TcpConn, UnixConn } = window.__bootstrap.net;
|
||||
const { TlsConn } = window.__bootstrap.tls;
|
||||
const { Deferred, getReadableStreamRid, readableStreamClose } =
|
||||
window.__bootstrap.streams;
|
||||
const { Deferred } = window.__bootstrap.streams;
|
||||
const {
|
||||
ArrayPrototypeIncludes,
|
||||
ArrayPrototypePush,
|
||||
@ -236,6 +235,7 @@
|
||||
typeof respBody === "string" ||
|
||||
ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, respBody)
|
||||
);
|
||||
|
||||
try {
|
||||
await core.opAsync(
|
||||
"op_http_write_headers",
|
||||
@ -269,50 +269,35 @@
|
||||
) {
|
||||
throw new TypeError("Unreachable");
|
||||
}
|
||||
const resourceRid = getReadableStreamRid(respBody);
|
||||
if (resourceRid) {
|
||||
if (respBody.locked) {
|
||||
throw new TypeError("ReadableStream is locked.");
|
||||
const reader = respBody.getReader();
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) {
|
||||
await reader.cancel(new TypeError("Value not a Uint8Array"));
|
||||
break;
|
||||
}
|
||||
const _reader = respBody.getReader(); // Aquire JS lock.
|
||||
await core.opAsync(
|
||||
"op_http_write_resource",
|
||||
streamRid,
|
||||
resourceRid,
|
||||
);
|
||||
readableStreamClose(respBody); // Release JS lock.
|
||||
} else {
|
||||
const reader = respBody.getReader();
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) {
|
||||
await reader.cancel(new TypeError("Value not a Uint8Array"));
|
||||
break;
|
||||
}
|
||||
try {
|
||||
await core.opAsync("op_http_write", streamRid, value);
|
||||
} catch (error) {
|
||||
const connError = httpConn[connErrorSymbol];
|
||||
if (
|
||||
ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) &&
|
||||
connError != null
|
||||
) {
|
||||
// deno-lint-ignore no-ex-assign
|
||||
error = new connError.constructor(connError.message);
|
||||
}
|
||||
await reader.cancel(error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
await core.opAsync("op_http_shutdown", streamRid);
|
||||
await core.opAsync("op_http_write", streamRid, value);
|
||||
} catch (error) {
|
||||
const connError = httpConn[connErrorSymbol];
|
||||
if (
|
||||
ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) &&
|
||||
connError != null
|
||||
) {
|
||||
// deno-lint-ignore no-ex-assign
|
||||
error = new connError.constructor(connError.message);
|
||||
}
|
||||
await reader.cancel(error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
try {
|
||||
await core.opAsync("op_http_shutdown", streamRid);
|
||||
} catch (error) {
|
||||
await reader.cancel(error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
const deferred = request[_deferred];
|
||||
|
@ -14,7 +14,6 @@ description = "HTTP server implementation for Deno"
|
||||
path = "lib.rs"
|
||||
|
||||
[dependencies]
|
||||
async-compression = { version = "0.3.1", features = ["tokio", "brotli", "gzip"] }
|
||||
base64 = "0.13.0"
|
||||
brotli = "3.3.3"
|
||||
bytes = "1"
|
||||
|
227
ext/http/lib.rs
227
ext/http/lib.rs
@ -1,7 +1,6 @@
|
||||
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
|
||||
|
||||
use async_compression::tokio::write::BrotliEncoder;
|
||||
use async_compression::tokio::write::GzipEncoder;
|
||||
use bytes::Bytes;
|
||||
use cache_control::CacheControl;
|
||||
use deno_core::error::custom_error;
|
||||
use deno_core::error::AnyError;
|
||||
@ -22,6 +21,7 @@ use deno_core::futures::StreamExt;
|
||||
use deno_core::futures::TryFutureExt;
|
||||
use deno_core::include_js_files;
|
||||
use deno_core::op;
|
||||
|
||||
use deno_core::AsyncRefCell;
|
||||
use deno_core::ByteString;
|
||||
use deno_core::CancelFuture;
|
||||
@ -60,9 +60,7 @@ use std::task::Context;
|
||||
use std::task::Poll;
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio::io::AsyncWrite;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::task::spawn_local;
|
||||
use tokio_util::io::ReaderStream;
|
||||
|
||||
mod compressible;
|
||||
|
||||
@ -77,7 +75,6 @@ pub fn init() -> Extension {
|
||||
op_http_read::decl(),
|
||||
op_http_write_headers::decl(),
|
||||
op_http_write::decl(),
|
||||
op_http_write_resource::decl(),
|
||||
op_http_shutdown::decl(),
|
||||
op_http_websocket_accept_header::decl(),
|
||||
op_http_upgrade_websocket::decl(),
|
||||
@ -341,7 +338,7 @@ impl Default for HttpRequestReader {
|
||||
/// The write half of an HTTP stream.
|
||||
enum HttpResponseWriter {
|
||||
Headers(oneshot::Sender<Response<Body>>),
|
||||
Body(Pin<Box<dyn tokio::io::AsyncWrite>>),
|
||||
Body(hyper::body::Sender),
|
||||
Closed,
|
||||
}
|
||||
|
||||
@ -548,60 +545,55 @@ async fn op_http_write_headers(
|
||||
let body: Response<Body>;
|
||||
let new_wr: HttpResponseWriter;
|
||||
|
||||
// Set Vary: Accept-Encoding header for direct body response.
|
||||
// Note: we set the header irrespective of whether or not we compress the data
|
||||
// to make sure cache services do not serve uncompressed data to clients that
|
||||
// support compression.
|
||||
let vary_value = if let Some(value) = vary_header {
|
||||
if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
|
||||
if !value_str.to_lowercase().contains("accept-encoding") {
|
||||
format!("Accept-Encoding, {}", value_str)
|
||||
} else {
|
||||
value_str.to_string()
|
||||
}
|
||||
} else {
|
||||
// the header value wasn't valid UTF8, so it would have been a
|
||||
// problem anyways, so sending a default header.
|
||||
"Accept-Encoding".to_string()
|
||||
}
|
||||
} else {
|
||||
"Accept-Encoding".to_string()
|
||||
};
|
||||
builder = builder.header("vary", &vary_value);
|
||||
|
||||
let accepts_compression = matches!(
|
||||
*stream.accept_encoding.borrow(),
|
||||
Encoding::Brotli | Encoding::Gzip
|
||||
);
|
||||
let should_compress = body_compressible
|
||||
&& (matches!(data, Some(ref data) if data.len() > 20) || data.is_none())
|
||||
&& accepts_compression;
|
||||
|
||||
if should_compress {
|
||||
// If user provided a ETag header for uncompressed data, we need to
|
||||
// ensure it is a Weak Etag header ("W/").
|
||||
if let Some(value) = etag_header {
|
||||
if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
|
||||
if !value_str.starts_with("W/") {
|
||||
builder = builder.header("etag", format!("W/{}", value_str));
|
||||
} else {
|
||||
builder = builder.header("etag", value.as_slice());
|
||||
}
|
||||
} else {
|
||||
builder = builder.header("etag", value.as_slice());
|
||||
}
|
||||
}
|
||||
} else if let Some(value) = etag_header {
|
||||
builder = builder.header("etag", value.as_slice());
|
||||
}
|
||||
|
||||
match data {
|
||||
Some(data) => {
|
||||
// Set Vary: Accept-Encoding header for direct body response.
|
||||
// Note: we set the header irrespective of whether or not we compress the
|
||||
// data to make sure cache services do not serve uncompressed data to
|
||||
// clients that support compression.
|
||||
let vary_value = if let Some(value) = vary_header {
|
||||
if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
|
||||
if !value_str.to_lowercase().contains("accept-encoding") {
|
||||
format!("Accept-Encoding, {}", value_str)
|
||||
} else {
|
||||
value_str.to_string()
|
||||
}
|
||||
} else {
|
||||
// the header value wasn't valid UTF8, so it would have been a
|
||||
// problem anyways, so sending a default header.
|
||||
"Accept-Encoding".to_string()
|
||||
}
|
||||
} else {
|
||||
"Accept-Encoding".to_string()
|
||||
};
|
||||
builder = builder.header("vary", &vary_value);
|
||||
|
||||
let accepts_compression = matches!(
|
||||
*stream.accept_encoding.borrow(),
|
||||
Encoding::Brotli | Encoding::Gzip
|
||||
);
|
||||
|
||||
let should_compress =
|
||||
body_compressible && data.len() > 20 && accepts_compression;
|
||||
|
||||
if should_compress {
|
||||
// Drop 'content-length' header. Hyper will update it using compressed body.
|
||||
if let Some(headers) = builder.headers_mut() {
|
||||
headers.remove("content-length");
|
||||
}
|
||||
// If user provided a ETag header for uncompressed data, we need to
|
||||
// ensure it is a Weak Etag header ("W/").
|
||||
if let Some(value) = etag_header {
|
||||
if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
|
||||
if !value_str.starts_with("W/") {
|
||||
builder = builder.header("etag", format!("W/{}", value_str));
|
||||
} else {
|
||||
builder = builder.header("etag", value.as_slice());
|
||||
}
|
||||
} else {
|
||||
builder = builder.header("etag", value.as_slice());
|
||||
}
|
||||
}
|
||||
|
||||
match *stream.accept_encoding.borrow() {
|
||||
Encoding::Brotli => {
|
||||
@ -629,6 +621,9 @@ async fn op_http_write_headers(
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if let Some(value) = etag_header {
|
||||
builder = builder.header("etag", value.as_slice());
|
||||
}
|
||||
// If a buffer was passed, but isn't compressible, we use it to
|
||||
// construct a response body.
|
||||
body = builder.body(data.into_bytes().into())?;
|
||||
@ -638,35 +633,19 @@ async fn op_http_write_headers(
|
||||
None => {
|
||||
// If no buffer was passed, the caller will stream the response body.
|
||||
|
||||
// Create a one way pipe that implements tokio's async io traits. To do
|
||||
// this we create a [tokio::io::DuplexStream], but then throw away one
|
||||
// of the directions to create a one way pipe.
|
||||
let (a, b) = tokio::io::duplex(64 * 1024);
|
||||
let (reader, _) = tokio::io::split(a);
|
||||
let (_, writer) = tokio::io::split(b);
|
||||
// TODO(@kitsonk) had compression for streamed bodies.
|
||||
|
||||
let writer_body: Pin<Box<dyn tokio::io::AsyncWrite>>;
|
||||
|
||||
if should_compress {
|
||||
match *stream.accept_encoding.borrow() {
|
||||
Encoding::Brotli => {
|
||||
let writer = BrotliEncoder::new(writer);
|
||||
writer_body = Box::pin(writer);
|
||||
builder = builder.header("content-encoding", "br");
|
||||
}
|
||||
_ => {
|
||||
assert_eq!(*stream.accept_encoding.borrow(), Encoding::Gzip);
|
||||
let writer = GzipEncoder::new(writer);
|
||||
writer_body = Box::pin(writer);
|
||||
builder = builder.header("content-encoding", "gzip");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
writer_body = Box::pin(writer);
|
||||
// Set the user provided ETag & Vary headers for a streaming response
|
||||
if let Some(value) = etag_header {
|
||||
builder = builder.header("etag", value.as_slice());
|
||||
}
|
||||
if let Some(value) = vary_header {
|
||||
builder = builder.header("vary", value.as_slice());
|
||||
}
|
||||
|
||||
body = builder.body(Body::wrap_stream(ReaderStream::new(reader)))?;
|
||||
new_wr = HttpResponseWriter::Body(writer_body);
|
||||
let (body_tx, body_rx) = Body::channel();
|
||||
body = builder.body(body_rx)?;
|
||||
new_wr = HttpResponseWriter::Body(body_tx);
|
||||
}
|
||||
}
|
||||
|
||||
@ -685,69 +664,6 @@ async fn op_http_write_headers(
|
||||
}
|
||||
}
|
||||
|
||||
#[op]
|
||||
async fn op_http_write_resource(
|
||||
state: Rc<RefCell<OpState>>,
|
||||
rid: ResourceId,
|
||||
stream: ResourceId,
|
||||
) -> Result<(), AnyError> {
|
||||
let http_stream = state
|
||||
.borrow()
|
||||
.resource_table
|
||||
.get::<HttpStreamResource>(rid)?;
|
||||
let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await;
|
||||
let resource = state.borrow().resource_table.get_any(stream)?;
|
||||
loop {
|
||||
let body_writer = match &mut *wr {
|
||||
HttpResponseWriter::Body(body_writer) => body_writer,
|
||||
HttpResponseWriter::Headers(_) => {
|
||||
return Err(http_error("no response headers"))
|
||||
}
|
||||
HttpResponseWriter::Closed => {
|
||||
return Err(http_error("response already completed"))
|
||||
}
|
||||
};
|
||||
|
||||
let vec = vec![0u8; 64 * 1024]; // 64KB
|
||||
let buf = ZeroCopyBuf::new_temp(vec);
|
||||
let (nread, buf) = resource.clone().read_return(buf).await?;
|
||||
if nread == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
let mut res = body_writer.write_all(&buf).await;
|
||||
if res.is_ok() {
|
||||
res = body_writer.flush().await;
|
||||
}
|
||||
match res {
|
||||
Ok(_) => {}
|
||||
Err(err) => {
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
|
||||
// Don't return "broken pipe", that's an implementation detail.
|
||||
// Pull up the failure associated with the transport connection instead.
|
||||
http_stream.conn.closed().await?;
|
||||
// If there was no connection error, drop body_tx.
|
||||
*wr = HttpResponseWriter::Closed;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let wr = take(&mut *wr);
|
||||
if let HttpResponseWriter::Body(mut body_writer) = wr {
|
||||
match body_writer.shutdown().await {
|
||||
Ok(_) => {}
|
||||
Err(err) => {
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
|
||||
// Don't return "broken pipe", that's an implementation detail.
|
||||
// Pull up the failure associated with the transport connection instead.
|
||||
http_stream.conn.closed().await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[op]
|
||||
async fn op_http_write(
|
||||
state: Rc<RefCell<OpState>>,
|
||||
@ -761,7 +677,7 @@ async fn op_http_write(
|
||||
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
|
||||
|
||||
loop {
|
||||
let body_writer = match &mut *wr {
|
||||
let body_tx = match &mut *wr {
|
||||
HttpResponseWriter::Body(body_tx) => body_tx,
|
||||
HttpResponseWriter::Headers(_) => {
|
||||
break Err(http_error("no response headers"))
|
||||
@ -771,17 +687,13 @@ async fn op_http_write(
|
||||
}
|
||||
};
|
||||
|
||||
let mut res = body_writer.write_all(&buf).await;
|
||||
if res.is_ok() {
|
||||
res = body_writer.flush().await;
|
||||
}
|
||||
|
||||
match res {
|
||||
let bytes = Bytes::copy_from_slice(&buf[..]);
|
||||
match body_tx.send_data(bytes).await {
|
||||
Ok(_) => break Ok(()),
|
||||
Err(err) => {
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
|
||||
// Don't return "broken pipe", that's an implementation detail.
|
||||
// Don't return "channel closed", that's an implementation detail.
|
||||
// Pull up the failure associated with the transport connection instead.
|
||||
assert!(err.is_closed());
|
||||
stream.conn.closed().await?;
|
||||
// If there was no connection error, drop body_tx.
|
||||
*wr = HttpResponseWriter::Closed;
|
||||
@ -803,18 +715,7 @@ async fn op_http_shutdown(
|
||||
.resource_table
|
||||
.get::<HttpStreamResource>(rid)?;
|
||||
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
|
||||
let wr = take(&mut *wr);
|
||||
if let HttpResponseWriter::Body(mut body_writer) = wr {
|
||||
match body_writer.shutdown().await {
|
||||
Ok(_) => {}
|
||||
Err(err) => {
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
|
||||
// Don't return "broken pipe", that's an implementation detail.
|
||||
// Pull up the failure associated with the transport connection instead.
|
||||
stream.conn.closed().await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
take(&mut *wr);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -4,7 +4,7 @@
|
||||
((window) => {
|
||||
const core = window.Deno.core;
|
||||
const { BadResourcePrototype, InterruptedPrototype } = core;
|
||||
const { WritableStream, readableStreamForRid } = window.__bootstrap.streams;
|
||||
const { ReadableStream, WritableStream } = window.__bootstrap.streams;
|
||||
const {
|
||||
Error,
|
||||
ObjectPrototypeIsPrototypeOf,
|
||||
@ -65,6 +65,8 @@
|
||||
return core.opAsync("op_dns_resolve", { query, recordType, options });
|
||||
}
|
||||
|
||||
const DEFAULT_CHUNK_SIZE = 64 * 1024;
|
||||
|
||||
function tryClose(rid) {
|
||||
try {
|
||||
core.close(rid);
|
||||
@ -73,6 +75,32 @@
|
||||
}
|
||||
}
|
||||
|
||||
function readableStreamForRid(rid) {
|
||||
return new ReadableStream({
|
||||
type: "bytes",
|
||||
async pull(controller) {
|
||||
const v = controller.byobRequest.view;
|
||||
try {
|
||||
const bytesRead = await read(rid, v);
|
||||
if (bytesRead === null) {
|
||||
tryClose(rid);
|
||||
controller.close();
|
||||
controller.byobRequest.respond(0);
|
||||
} else {
|
||||
controller.byobRequest.respond(bytesRead);
|
||||
}
|
||||
} catch (e) {
|
||||
controller.error(e);
|
||||
tryClose(rid);
|
||||
}
|
||||
},
|
||||
cancel() {
|
||||
tryClose(rid);
|
||||
},
|
||||
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
|
||||
});
|
||||
}
|
||||
|
||||
function writableStreamForRid(rid) {
|
||||
return new WritableStream({
|
||||
async write(chunk, controller) {
|
||||
|
@ -70,13 +70,13 @@ where
|
||||
pub async fn read(
|
||||
self: Rc<Self>,
|
||||
mut buf: ZeroCopyBuf,
|
||||
) -> Result<(usize, ZeroCopyBuf), AnyError> {
|
||||
) -> Result<usize, AnyError> {
|
||||
let mut rd = self.rd_borrow_mut().await;
|
||||
let nread = rd
|
||||
.read(&mut buf)
|
||||
.try_or_cancel(self.cancel_handle())
|
||||
.await?;
|
||||
Ok((nread, buf))
|
||||
Ok(nread)
|
||||
}
|
||||
|
||||
pub async fn write(
|
||||
@ -103,10 +103,7 @@ impl Resource for TcpStreamResource {
|
||||
"tcpStream".into()
|
||||
}
|
||||
|
||||
fn read_return(
|
||||
self: Rc<Self>,
|
||||
buf: ZeroCopyBuf,
|
||||
) -> AsyncResult<(usize, ZeroCopyBuf)> {
|
||||
fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
|
||||
Box::pin(self.read(buf))
|
||||
}
|
||||
|
||||
@ -163,7 +160,7 @@ impl UnixStreamResource {
|
||||
pub async fn read(
|
||||
self: Rc<Self>,
|
||||
_buf: ZeroCopyBuf,
|
||||
) -> Result<(usize, ZeroCopyBuf), AnyError> {
|
||||
) -> Result<usize, AnyError> {
|
||||
unreachable!()
|
||||
}
|
||||
pub async fn write(
|
||||
@ -185,10 +182,7 @@ impl Resource for UnixStreamResource {
|
||||
"unixStream".into()
|
||||
}
|
||||
|
||||
fn read_return(
|
||||
self: Rc<Self>,
|
||||
buf: ZeroCopyBuf,
|
||||
) -> AsyncResult<(usize, ZeroCopyBuf)> {
|
||||
fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
|
||||
Box::pin(self.read(buf))
|
||||
}
|
||||
|
||||
|
@ -674,11 +674,11 @@ impl TlsStreamResource {
|
||||
pub async fn read(
|
||||
self: Rc<Self>,
|
||||
mut buf: ZeroCopyBuf,
|
||||
) -> Result<(usize, ZeroCopyBuf), AnyError> {
|
||||
) -> Result<usize, AnyError> {
|
||||
let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
|
||||
let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle);
|
||||
let nread = rd.read(&mut buf).try_or_cancel(cancel_handle).await?;
|
||||
Ok((nread, buf))
|
||||
Ok(nread)
|
||||
}
|
||||
|
||||
pub async fn write(
|
||||
@ -722,10 +722,7 @@ impl Resource for TlsStreamResource {
|
||||
"tlsStream".into()
|
||||
}
|
||||
|
||||
fn read_return(
|
||||
self: Rc<Self>,
|
||||
buf: ZeroCopyBuf,
|
||||
) -> AsyncResult<(usize, ZeroCopyBuf)> {
|
||||
fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
|
||||
Box::pin(self.read(buf))
|
||||
}
|
||||
|
||||
|
@ -8,7 +8,6 @@
|
||||
"use strict";
|
||||
|
||||
((window) => {
|
||||
const core = window.Deno.core;
|
||||
const webidl = window.__bootstrap.webidl;
|
||||
const { add, remove, signalAbort, newSignal, AbortSignalPrototype } =
|
||||
window.__bootstrap.abortSignal;
|
||||
@ -641,41 +640,6 @@
|
||||
return stream[_disturbed];
|
||||
}
|
||||
|
||||
const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB
|
||||
|
||||
function readableStreamForRid(rid) {
|
||||
const stream = new ReadableStream({
|
||||
type: "bytes",
|
||||
async pull(controller) {
|
||||
const v = controller.byobRequest.view;
|
||||
try {
|
||||
const bytesRead = await core.read(rid, v);
|
||||
if (bytesRead === 0) {
|
||||
core.tryClose(rid);
|
||||
controller.close();
|
||||
controller.byobRequest.respond(0);
|
||||
} else {
|
||||
controller.byobRequest.respond(bytesRead);
|
||||
}
|
||||
} catch (e) {
|
||||
controller.error(e);
|
||||
core.tryClose(rid);
|
||||
}
|
||||
},
|
||||
cancel() {
|
||||
core.tryClose(rid);
|
||||
},
|
||||
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
|
||||
});
|
||||
|
||||
stream[_maybeRid] = rid;
|
||||
return stream;
|
||||
}
|
||||
|
||||
function getReadableStreamRid(stream) {
|
||||
return stream[_maybeRid];
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {unknown} value
|
||||
* @returns {value is WritableStream}
|
||||
@ -4324,7 +4288,6 @@
|
||||
WeakMapPrototypeSet(countSizeFunctionWeakMap, globalObject, size);
|
||||
}
|
||||
|
||||
const _maybeRid = Symbol("[[maybeRid]]");
|
||||
/** @template R */
|
||||
class ReadableStream {
|
||||
/** @type {ReadableStreamDefaultController | ReadableByteStreamController} */
|
||||
@ -4339,8 +4302,6 @@
|
||||
[_state];
|
||||
/** @type {any} */
|
||||
[_storedError];
|
||||
/** @type {number | null} */
|
||||
[_maybeRid] = null;
|
||||
|
||||
/**
|
||||
* @param {UnderlyingSource<R>=} underlyingSource
|
||||
@ -5879,9 +5840,6 @@
|
||||
errorReadableStream,
|
||||
createProxy,
|
||||
writableStreamClose,
|
||||
readableStreamClose,
|
||||
readableStreamForRid,
|
||||
getReadableStreamRid,
|
||||
Deferred,
|
||||
// Exposed in global runtime scope
|
||||
ByteLengthQueuingStrategy,
|
||||
|
@ -6,8 +6,8 @@
|
||||
const { read, readSync, write, writeSync } = window.__bootstrap.io;
|
||||
const { ftruncate, ftruncateSync, fstat, fstatSync } = window.__bootstrap.fs;
|
||||
const { pathFromURL } = window.__bootstrap.util;
|
||||
const { writableStreamForRid } = window.__bootstrap.streamUtils;
|
||||
const { readableStreamForRid } = window.__bootstrap.streams;
|
||||
const { readableStreamForRid, writableStreamForRid } =
|
||||
window.__bootstrap.streamUtils;
|
||||
const {
|
||||
ArrayPrototypeFilter,
|
||||
Error,
|
||||
|
@ -174,13 +174,13 @@ where
|
||||
async fn read(
|
||||
self: Rc<Self>,
|
||||
mut buf: ZeroCopyBuf,
|
||||
) -> Result<(usize, ZeroCopyBuf), AnyError> {
|
||||
) -> Result<usize, AnyError> {
|
||||
let mut rd = self.borrow_mut().await;
|
||||
let nread = rd
|
||||
.read(&mut buf)
|
||||
.try_or_cancel(self.cancel_handle())
|
||||
.await?;
|
||||
Ok((nread, buf))
|
||||
Ok(nread)
|
||||
}
|
||||
|
||||
pub fn into_inner(self) -> S {
|
||||
@ -211,10 +211,7 @@ impl Resource for ChildStdoutResource {
|
||||
"childStdout".into()
|
||||
}
|
||||
|
||||
fn read_return(
|
||||
self: Rc<Self>,
|
||||
buf: ZeroCopyBuf,
|
||||
) -> AsyncResult<(usize, ZeroCopyBuf)> {
|
||||
fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
|
||||
Box::pin(self.read(buf))
|
||||
}
|
||||
|
||||
@ -230,10 +227,7 @@ impl Resource for ChildStderrResource {
|
||||
"childStderr".into()
|
||||
}
|
||||
|
||||
fn read_return(
|
||||
self: Rc<Self>,
|
||||
buf: ZeroCopyBuf,
|
||||
) -> AsyncResult<(usize, ZeroCopyBuf)> {
|
||||
fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
|
||||
Box::pin(self.read(buf))
|
||||
}
|
||||
|
||||
@ -277,17 +271,16 @@ impl StdFileResource {
|
||||
async fn read(
|
||||
self: Rc<Self>,
|
||||
mut buf: ZeroCopyBuf,
|
||||
) -> Result<(usize, ZeroCopyBuf), AnyError> {
|
||||
) -> Result<usize, AnyError> {
|
||||
if self.fs_file.is_some() {
|
||||
let fs_file = self.fs_file.as_ref().unwrap();
|
||||
let std_file = fs_file.0.as_ref().unwrap().clone();
|
||||
tokio::task::spawn_blocking(
|
||||
move || -> Result<(usize, ZeroCopyBuf), AnyError> {
|
||||
let mut std_file = std_file.lock().unwrap();
|
||||
Ok((std_file.read(&mut buf)?, buf))
|
||||
},
|
||||
)
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let mut std_file = std_file.lock().unwrap();
|
||||
std_file.read(&mut buf)
|
||||
})
|
||||
.await?
|
||||
.map_err(AnyError::from)
|
||||
} else {
|
||||
Err(resource_unavailable())
|
||||
}
|
||||
@ -337,10 +330,7 @@ impl Resource for StdFileResource {
|
||||
self.name.as_str().into()
|
||||
}
|
||||
|
||||
fn read_return(
|
||||
self: Rc<Self>,
|
||||
buf: ZeroCopyBuf,
|
||||
) -> AsyncResult<(usize, ZeroCopyBuf)> {
|
||||
fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
|
||||
Box::pin(self.read(buf))
|
||||
}
|
||||
|
||||
|
@ -14,28 +14,19 @@ use crate::magic::transl8::impl_magic;
|
||||
pub enum MagicBuffer {
|
||||
FromV8(ZeroCopyBuf),
|
||||
ToV8(Mutex<Option<Box<[u8]>>>),
|
||||
// Variant of the MagicBuffer than is never exposed to the JS.
|
||||
// Generally used to pass Vec<u8> backed buffers to resource methods.
|
||||
Temp(Vec<u8>),
|
||||
}
|
||||
|
||||
impl_magic!(MagicBuffer);
|
||||
|
||||
impl MagicBuffer {
|
||||
pub fn empty() -> Self {
|
||||
MagicBuffer::ToV8(Mutex::new(Some(vec![0_u8; 0].into_boxed_slice())))
|
||||
}
|
||||
|
||||
pub fn new_temp(vec: Vec<u8>) -> Self {
|
||||
MagicBuffer::Temp(vec)
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for MagicBuffer {
|
||||
fn clone(&self) -> Self {
|
||||
match self {
|
||||
Self::FromV8(zbuf) => Self::FromV8(zbuf.clone()),
|
||||
Self::Temp(vec) => Self::Temp(vec.clone()),
|
||||
Self::ToV8(_) => panic!("Don't Clone a MagicBuffer sent to v8"),
|
||||
}
|
||||
}
|
||||
@ -58,7 +49,6 @@ impl Deref for MagicBuffer {
|
||||
fn deref(&self) -> &[u8] {
|
||||
match self {
|
||||
Self::FromV8(buf) => &*buf,
|
||||
Self::Temp(vec) => &*vec,
|
||||
Self::ToV8(_) => panic!("Don't Deref a MagicBuffer sent to v8"),
|
||||
}
|
||||
}
|
||||
@ -68,7 +58,6 @@ impl DerefMut for MagicBuffer {
|
||||
fn deref_mut(&mut self) -> &mut [u8] {
|
||||
match self {
|
||||
Self::FromV8(buf) => &mut *buf,
|
||||
Self::Temp(vec) => &mut *vec,
|
||||
Self::ToV8(_) => panic!("Don't Deref a MagicBuffer sent to v8"),
|
||||
}
|
||||
}
|
||||
@ -96,7 +85,6 @@ impl ToV8 for MagicBuffer {
|
||||
let value: &[u8] = buf;
|
||||
value.into()
|
||||
}
|
||||
Self::Temp(_) => unreachable!(),
|
||||
Self::ToV8(x) => x.lock().unwrap().take().expect("MagicBuffer was empty"),
|
||||
};
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user