refactor: update minimal ops & rename to buffer ops (#9719)

This commit rewrites "dispatch_minimal" into "dispatch_buffer".

It's part of an effort to unify JS interface for ops for both json
and minimal (buffer) ops.

Before this commit "minimal ops" could be either sync or async
depending on the return type from the op, but this commit changes
it to have separate signatures for sync and async ops (just like 
in case of json ops).
This commit is contained in:
Inteon 2021-03-18 14:10:27 +01:00 committed by GitHub
parent 0e70d9e59b
commit 20627c9136
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 696 additions and 506 deletions

View File

@ -0,0 +1,76 @@
import {
assert,
assertEquals,
assertMatch,
unitTest,
unreachable,
} from "./test_util.ts";
const readErrorStackPattern = new RegExp(
`^.*
at handleError \\(.*10_dispatch_buffer\\.js:.*\\)
at bufferOpParseResult \\(.*10_dispatch_buffer\\.js:.*\\)
at Array.<anonymous> \\(.*10_dispatch_buffer\\.js:.*\\).*$`,
"ms",
);
unitTest(async function sendAsyncStackTrace(): Promise<void> {
const buf = new Uint8Array(10);
const rid = 10;
try {
await Deno.read(rid, buf);
unreachable();
} catch (error) {
assertMatch(error.stack, readErrorStackPattern);
}
});
declare global {
// deno-lint-ignore no-namespace
namespace Deno {
// deno-lint-ignore no-explicit-any
var core: any; // eslint-disable-line no-var
}
}
unitTest(function bufferOpsHeaderTooShort(): void {
for (const op of ["op_read_sync", "op_read_async"]) {
const readOpId = Deno.core.ops()[op];
const res = Deno.core.send(
readOpId,
new Uint8Array([
1,
2,
3,
4,
5,
6,
7,
8,
9,
10,
11,
]),
);
const headerByteLength = 4 * 4;
assert(res.byteLength > headerByteLength);
const view = new DataView(
res.buffer,
res.byteOffset + res.byteLength - headerByteLength,
headerByteLength,
);
const requestId = Number(view.getBigUint64(0, true));
const status = view.getUint32(8, true);
const result = view.getUint32(12, true);
assert(requestId === 0);
assert(status !== 0);
assertEquals(new TextDecoder().decode(res.slice(0, result)), "TypeError");
assertEquals(
new TextDecoder().decode(res.slice(result, -headerByteLength)).trim(),
"Unparsable control buffer",
);
}
});

View File

@ -1,49 +0,0 @@
import {
assert,
assertEquals,
assertMatch,
unitTest,
unreachable,
} from "./test_util.ts";
const readErrorStackPattern = new RegExp(
`^.*
at unwrapResponse \\(.*dispatch_minimal\\.js:.*\\)
at sendAsync \\(.*dispatch_minimal\\.js:.*\\)
at async Object\\.read \\(.*io\\.js:.*\\).*$`,
"ms",
);
unitTest(async function sendAsyncStackTrace(): Promise<void> {
const buf = new Uint8Array(10);
const rid = 10;
try {
await Deno.read(rid, buf);
unreachable();
} catch (error) {
assertMatch(error.stack, readErrorStackPattern);
}
});
declare global {
// deno-lint-ignore no-namespace
namespace Deno {
// deno-lint-ignore no-explicit-any
var core: any; // eslint-disable-line no-var
}
}
unitTest(function malformedMinimalControlBuffer(): void {
const readOpId = Deno.core.ops()["op_read"];
const res = Deno.core.send(readOpId, new Uint8Array([1, 2, 3, 4, 5]));
const header = res.slice(0, 12);
const buf32 = new Int32Array(
header.buffer,
header.byteOffset,
header.byteLength / 4,
);
const arg = buf32[1];
const codeAndMessage = new TextDecoder().decode(res.slice(12)).trim();
assert(arg < 0);
assertEquals(codeAndMessage, "TypeErrorUnparsable control buffer");
});

View File

@ -13,7 +13,7 @@ unitTest(async function metrics(): Promise<void> {
assert(m1.bytesSentControl > 0);
assert(m1.bytesSentData >= 0);
assert(m1.bytesReceived > 0);
const m1OpWrite = m1.ops["op_write"];
const m1OpWrite = m1.ops["op_write_async"];
assert(m1OpWrite.opsDispatchedAsync > 0);
assert(m1OpWrite.opsCompletedAsync > 0);
assert(m1OpWrite.bytesSentControl > 0);
@ -28,7 +28,7 @@ unitTest(async function metrics(): Promise<void> {
assert(m2.bytesSentControl > m1.bytesSentControl);
assert(m2.bytesSentData >= m1.bytesSentData + dataMsg.byteLength);
assert(m2.bytesReceived > m1.bytesReceived);
const m2OpWrite = m2.ops["op_write"];
const m2OpWrite = m2.ops["op_write_async"];
assert(m2OpWrite.opsDispatchedAsync > m1OpWrite.opsDispatchedAsync);
assert(m2OpWrite.opsCompletedAsync > m1OpWrite.opsCompletedAsync);
assert(m2OpWrite.bytesSentControl > m1OpWrite.bytesSentControl);

View File

@ -15,7 +15,7 @@ import "./console_test.ts";
import "./copy_file_test.ts";
import "./custom_event_test.ts";
import "./dir_test.ts";
import "./dispatch_minimal_test.ts";
import "./dispatch_buffer_test.ts";
import "./dispatch_json_test.ts";
import "./error_stack_test.ts";
import "./event_test.ts";

View File

@ -155,6 +155,10 @@ SharedQueue Binary Layout
asyncHandlers[opId] = cb;
}
function setAsyncHandlerByName(opName, cb) {
setAsyncHandler(opsCache[opName], cb);
}
function handleAsyncMsgFromRust() {
while (true) {
const opIdBuf = shift();
@ -256,6 +260,7 @@ SharedQueue Binary Layout
jsonOpAsync,
jsonOpSync,
setAsyncHandler,
setAsyncHandlerByName,
dispatch: send,
dispatchByName: dispatch,
ops,

View File

@ -0,0 +1,150 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
"use strict";
((window) => {
const core = window.Deno.core;
function assert(cond) {
if (!cond) {
throw Error("assert");
}
}
////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////// General async handling //////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////
// General Async response handling
let nextRequestId = 1;
const promiseTable = {};
function opAsync(opName, opRequestBuilder, opResultParser) {
// Make sure requests of this type are handled by the asyncHandler
// The asyncHandler's role is to call the "promiseTable[requestId]" function
core.setAsyncHandlerByName(opName, (bufUi8, _) => {
const [requestId, result, error] = opResultParser(bufUi8, true);
if (error !== null) {
promiseTable[requestId][1](error);
} else {
promiseTable[requestId][0](result);
}
delete promiseTable[requestId];
});
const requestId = nextRequestId++;
// Create and store promise
const promise = new Promise((resolve, reject) => {
promiseTable[requestId] = [resolve, reject];
});
// Synchronously dispatch async request
core.dispatchByName(opName, ...opRequestBuilder(requestId));
// Wait for async response
return promise;
}
function opSync(opName, opRequestBuilder, opResultParser) {
const rawResult = core.dispatchByName(opName, ...opRequestBuilder());
const [_, result, error] = opResultParser(rawResult, false);
if (error !== null) throw error;
return result;
}
////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////// Error handling /////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////
function handleError(className, message) {
const [ErrorClass, args] = core.getErrorClassAndArgs(className);
if (!ErrorClass) {
return new Error(
`Unregistered error class: "${className}"\n` +
` ${message}\n` +
` Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`,
);
}
return new ErrorClass(message, ...args);
}
////////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////// Buffer ops handling //////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////
const scratchBytes = new ArrayBuffer(3 * 4);
const scratchView = new DataView(
scratchBytes,
scratchBytes.byteOffset,
scratchBytes.byteLength,
);
function bufferOpBuildRequest(requestId, argument, zeroCopy) {
scratchView.setBigUint64(0, BigInt(requestId), true);
scratchView.setUint32(8, argument, true);
return [scratchView, ...zeroCopy];
}
function bufferOpParseResult(bufUi8, isCopyNeeded) {
// Decode header value from ui8 buffer
const headerByteLength = 4 * 4;
assert(bufUi8.byteLength >= headerByteLength);
assert(bufUi8.byteLength % 4 == 0);
const view = new DataView(
bufUi8.buffer,
bufUi8.byteOffset + bufUi8.byteLength - headerByteLength,
headerByteLength,
);
const requestId = Number(view.getBigUint64(0, true));
const status = view.getUint32(8, true);
const result = view.getUint32(12, true);
// Error handling
if (status !== 0) {
const className = core.decode(bufUi8.subarray(0, result));
const message = core.decode(bufUi8.subarray(result, -headerByteLength))
.trim();
return [requestId, null, handleError(className, message)];
}
if (bufUi8.byteLength === headerByteLength) {
return [requestId, result, null];
}
// Rest of response buffer is passed as reference or as a copy
let respBuffer = null;
if (isCopyNeeded) {
// Copy part of the response array (if sent through shared array buf)
respBuffer = bufUi8.slice(0, result);
} else {
// Create view on existing array (if sent through overflow)
respBuffer = bufUi8.subarray(0, result);
}
return [requestId, respBuffer, null];
}
function bufferOpAsync(opName, argument = 0, ...zeroCopy) {
return opAsync(
opName,
(requestId) => bufferOpBuildRequest(requestId, argument, zeroCopy),
bufferOpParseResult,
);
}
function bufferOpSync(opName, argument = 0, ...zeroCopy) {
return opSync(
opName,
() => bufferOpBuildRequest(0, argument, zeroCopy),
bufferOpParseResult,
);
}
window.__bootstrap.dispatchBuffer = {
bufferOpSync,
bufferOpAsync,
};
})(this);

View File

@ -1,115 +0,0 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
"use strict";
((window) => {
const core = window.Deno.core;
const util = window.__bootstrap.util;
// Using an object without a prototype because `Map` was causing GC problems.
const promiseTableMin = Object.create(null);
const decoder = new TextDecoder();
// Note it's important that promiseId starts at 1 instead of 0, because sync
// messages are indicated with promiseId 0. If we ever add wrap around logic for
// overflows, this should be taken into account.
let _nextPromiseId = 1;
function nextPromiseId() {
return _nextPromiseId++;
}
function recordFromBufMinimal(ui8) {
const headerLen = 12;
const header = ui8.subarray(0, headerLen);
const buf32 = new Int32Array(
header.buffer,
header.byteOffset,
header.byteLength / 4,
);
const promiseId = buf32[0];
const arg = buf32[1];
const result = buf32[2];
let err;
if (arg < 0) {
err = {
className: decoder.decode(ui8.subarray(headerLen, headerLen + result)),
message: decoder.decode(ui8.subarray(headerLen + result)),
};
} else if (ui8.length != 12) {
throw new TypeError("Malformed response message");
}
return {
promiseId,
arg,
result,
err,
};
}
function unwrapResponse(res) {
if (res.err != null) {
const [ErrorClass, args] = core.getErrorClassAndArgs(res.err.className);
if (!ErrorClass) {
throw new Error(
`Unregistered error class: "${res.err.className}"\n ${res.err.message}\n Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`,
);
}
throw new ErrorClass(res.err.message, ...args);
}
return res.result;
}
const scratch32 = new Int32Array(3);
const scratchBytes = new Uint8Array(
scratch32.buffer,
scratch32.byteOffset,
scratch32.byteLength,
);
util.assert(scratchBytes.byteLength === scratch32.length * 4);
function asyncMsgFromRust(ui8) {
const record = recordFromBufMinimal(ui8);
const { promiseId } = record;
const promise = promiseTableMin[promiseId];
delete promiseTableMin[promiseId];
util.assert(promise);
promise.resolve(record);
}
async function sendAsync(opName, arg, zeroCopy) {
const promiseId = nextPromiseId(); // AKA cmdId
scratch32[0] = promiseId;
scratch32[1] = arg;
scratch32[2] = 0; // result
const promise = util.createResolvable();
const buf = core.dispatchByName(opName, scratchBytes, zeroCopy);
if (buf != null) {
const record = recordFromBufMinimal(buf);
// Sync result.
promise.resolve(record);
} else {
// Async result.
promiseTableMin[promiseId] = promise;
}
const res = await promise;
return unwrapResponse(res);
}
function sendSync(opName, arg, zeroCopy) {
scratch32[0] = 0; // promiseId 0 indicates sync
scratch32[1] = arg;
const res = core.dispatchByName(opName, scratchBytes, zeroCopy);
const resRecord = recordFromBufMinimal(res);
return unwrapResponse(resRecord);
}
window.__bootstrap.dispatchMinimal = {
asyncMsgFromRust,
sendSync,
sendAsync,
};
})(this);

View File

@ -4,7 +4,7 @@
((window) => {
const assert = window.__bootstrap.util.assert;
const core = window.Deno.core;
const { sendSync } = window.__bootstrap.dispatchMinimal;
const { bufferOpSync } = window.__bootstrap.dispatchBuffer;
function opStopGlobalTimer() {
core.jsonOpSync("op_global_timer_stop");
@ -20,7 +20,7 @@
const nowBytes = new Uint8Array(8);
function opNow() {
sendSync("op_now", 0, nowBytes);
bufferOpSync("op_now", 0, nowBytes);
return new DataView(nowBytes.buffer).getFloat64();
}

View File

@ -7,7 +7,7 @@
((window) => {
const DEFAULT_BUFFER_SIZE = 32 * 1024;
const { sendSync, sendAsync } = window.__bootstrap.dispatchMinimal;
const { bufferOpSync, bufferOpAsync } = window.__bootstrap.dispatchBuffer;
// Seek whence values.
// https://golang.org/pkg/io/#pkg-constants
const SeekMode = {
@ -81,7 +81,7 @@
return 0;
}
const nread = sendSync("op_read", rid, buffer);
const nread = bufferOpSync("op_read_sync", rid, buffer);
if (nread < 0) {
throw new Error("read error");
}
@ -97,7 +97,7 @@
return 0;
}
const nread = await sendAsync("op_read", rid, buffer);
const nread = await bufferOpAsync("op_read_async", rid, buffer);
if (nread < 0) {
throw new Error("read error");
}
@ -106,7 +106,7 @@
}
function writeSync(rid, data) {
const result = sendSync("op_write", rid, data);
const result = bufferOpSync("op_write_sync", rid, data);
if (result < 0) {
throw new Error("write error");
}
@ -115,7 +115,7 @@
}
async function write(rid, data) {
const result = await sendAsync("op_write", rid, data);
const result = await bufferOpAsync("op_write_async", rid, data);
if (result < 0) {
throw new Error("write error");
}

View File

@ -11,7 +11,6 @@ delete Object.prototype.__proto__;
const eventTarget = window.__bootstrap.eventTarget;
const globalInterfaces = window.__bootstrap.globalInterfaces;
const location = window.__bootstrap.location;
const dispatchMinimal = window.__bootstrap.dispatchMinimal;
const build = window.__bootstrap.build;
const version = window.__bootstrap.version;
const errorStack = window.__bootstrap.errorStack;
@ -142,12 +141,7 @@ delete Object.prototype.__proto__;
}
function runtimeStart(runtimeOptions, source) {
const opsMap = core.ops();
for (const [name, opId] of Object.entries(opsMap)) {
if (name === "op_write" || name === "op_read") {
core.setAsyncHandler(opId, dispatchMinimal.asyncMsgFromRust);
}
}
core.ops();
core.setMacrotaskCallback(timers.handleTimerMacrotask);
version.setVersions(

View File

@ -1,210 +0,0 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
use deno_core::error::AnyError;
use deno_core::futures::future::FutureExt;
use deno_core::BufVec;
use deno_core::Op;
use deno_core::OpFn;
use deno_core::OpState;
use std::cell::RefCell;
use std::future::Future;
use std::iter::repeat;
use std::mem::size_of_val;
use std::pin::Pin;
use std::rc::Rc;
use std::slice;
pub enum MinimalOp {
Sync(Result<i32, AnyError>),
Async(Pin<Box<dyn Future<Output = Result<i32, AnyError>>>>),
}
#[derive(Copy, Clone, Debug, PartialEq)]
// This corresponds to RecordMinimal on the TS side.
pub struct Record {
pub promise_id: i32,
pub arg: i32,
pub result: i32,
}
impl Into<Box<[u8]>> for Record {
fn into(self) -> Box<[u8]> {
let vec = vec![self.promise_id, self.arg, self.result];
let buf32 = vec.into_boxed_slice();
let ptr = Box::into_raw(buf32) as *mut [u8; 3 * 4];
unsafe { Box::from_raw(ptr) }
}
}
pub struct ErrorRecord {
pub promise_id: i32,
pub arg: i32,
pub error_len: i32,
pub error_class: &'static [u8],
pub error_message: Vec<u8>,
}
impl Into<Box<[u8]>> for ErrorRecord {
fn into(self) -> Box<[u8]> {
let Self {
promise_id,
arg,
error_len,
error_class,
error_message,
..
} = self;
let header_i32 = [promise_id, arg, error_len];
let header_u8 = unsafe {
slice::from_raw_parts(
&header_i32 as *const _ as *const u8,
size_of_val(&header_i32),
)
};
let padded_len =
(header_u8.len() + error_class.len() + error_message.len() + 3usize)
& !3usize;
header_u8
.iter()
.cloned()
.chain(error_class.iter().cloned())
.chain(error_message.into_iter())
.chain(repeat(b' '))
.take(padded_len)
.collect()
}
}
pub fn parse_min_record(bytes: &[u8]) -> Option<Record> {
if bytes.len() % std::mem::size_of::<i32>() != 0 {
return None;
}
let p = bytes.as_ptr();
#[allow(clippy::cast_ptr_alignment)]
let p32 = p as *const i32;
let s = unsafe { std::slice::from_raw_parts(p32, bytes.len() / 4) };
if s.len() != 3 {
return None;
}
let ptr = s.as_ptr();
let ints = unsafe { std::slice::from_raw_parts(ptr, 3) };
Some(Record {
promise_id: ints[0],
arg: ints[1],
result: ints[2],
})
}
pub fn minimal_op<F>(op_fn: F) -> Box<OpFn>
where
F: Fn(Rc<RefCell<OpState>>, bool, i32, BufVec) -> MinimalOp + 'static,
{
Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| {
let mut bufs_iter = bufs.into_iter();
let record_buf = bufs_iter.next().expect("Expected record at position 0");
let zero_copy = bufs_iter.collect::<BufVec>();
let mut record = match parse_min_record(&record_buf) {
Some(r) => r,
None => {
let error_class = b"TypeError";
let error_message = b"Unparsable control buffer";
let error_record = ErrorRecord {
promise_id: 0,
arg: -1,
error_len: error_class.len() as i32,
error_class,
error_message: error_message[..].to_owned(),
};
return Op::Sync(error_record.into());
}
};
let is_sync = record.promise_id == 0;
let rid = record.arg;
let min_op = op_fn(state.clone(), is_sync, rid, zero_copy);
match min_op {
MinimalOp::Sync(sync_result) => Op::Sync(match sync_result {
Ok(r) => {
record.result = r;
record.into()
}
Err(err) => {
let error_class = (state.borrow().get_error_class_fn)(&err);
let error_record = ErrorRecord {
promise_id: record.promise_id,
arg: -1,
error_len: error_class.len() as i32,
error_class: error_class.as_bytes(),
error_message: err.to_string().as_bytes().to_owned(),
};
error_record.into()
}
}),
MinimalOp::Async(min_fut) => {
let fut = async move {
match min_fut.await {
Ok(r) => {
record.result = r;
record.into()
}
Err(err) => {
let error_class = (state.borrow().get_error_class_fn)(&err);
let error_record = ErrorRecord {
promise_id: record.promise_id,
arg: -1,
error_len: error_class.len() as i32,
error_class: error_class.as_bytes(),
error_message: err.to_string().as_bytes().to_owned(),
};
error_record.into()
}
}
};
Op::Async(fut.boxed_local())
}
}
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_error_record() {
let expected = vec![
1, 0, 0, 0, 255, 255, 255, 255, 11, 0, 0, 0, 66, 97, 100, 82, 101, 115,
111, 117, 114, 99, 101, 69, 114, 114, 111, 114,
];
let err_record = ErrorRecord {
promise_id: 1,
arg: -1,
error_len: 11,
error_class: b"BadResource",
error_message: b"Error".to_vec(),
};
let buf: Box<[u8]> = err_record.into();
assert_eq!(buf, expected.into_boxed_slice());
}
#[test]
fn test_parse_min_record() {
let buf = vec![1, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0];
assert_eq!(
parse_min_record(&buf),
Some(Record {
promise_id: 1,
arg: 3,
result: 4
})
);
let buf = vec![];
assert_eq!(parse_min_record(&buf), None);
let buf = vec![5];
assert_eq!(parse_min_record(&buf), None);
}
}

View File

@ -1,13 +1,8 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
use super::dispatch_minimal::minimal_op;
use super::dispatch_minimal::MinimalOp;
use crate::metrics::metrics_op;
use deno_core::error::resource_unavailable;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::error::{bad_resource_id, not_supported};
use deno_core::futures::future::FutureExt;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
@ -24,7 +19,6 @@ use deno_core::ZeroCopyBuf;
use serde::Deserialize;
use std::borrow::Cow;
use std::cell::RefCell;
use std::convert::TryInto;
use std::io::Read;
use std::io::Write;
use std::rc::Rc;
@ -105,8 +99,12 @@ lazy_static! {
}
pub fn init(rt: &mut JsRuntime) {
rt.register_op("op_read", metrics_op("op_read", minimal_op(op_read)));
rt.register_op("op_write", metrics_op("op_write", minimal_op(op_write)));
super::reg_buffer_async(rt, "op_read_async", op_read_async);
super::reg_buffer_async(rt, "op_write_async", op_write_async);
super::reg_buffer_sync(rt, "op_read_sync", op_read_sync);
super::reg_buffer_sync(rt, "op_write_sync", op_write_sync);
super::reg_json_async(rt, "op_shutdown", op_shutdown);
}
@ -138,10 +136,6 @@ fn get_stdio_stream(
}
}
fn no_buffer_specified() -> AnyError {
type_error("no buffer specified")
}
#[cfg(unix)]
use nix::sys::termios;
@ -526,36 +520,15 @@ impl Resource for StdFileResource {
}
}
pub fn op_read(
state: Rc<RefCell<OpState>>,
is_sync: bool,
rid: i32,
bufs: BufVec,
) -> MinimalOp {
match bufs.len() {
0 => return MinimalOp::Sync(Err(no_buffer_specified())),
1 => {}
_ => panic!("Invalid number of arguments"),
};
let buf = bufs.into_iter().next().unwrap();
if is_sync {
MinimalOp::Sync(op_read_sync(state, rid, buf))
} else {
MinimalOp::Async(op_read_async(state, rid, buf).boxed_local())
}
}
fn op_read_sync(
state: Rc<RefCell<OpState>>,
rid: i32,
mut buf: ZeroCopyBuf,
) -> Result<i32, AnyError> {
let rid = rid.try_into().map_err(|_| bad_resource_id())?;
StdFileResource::with(&mut state.borrow_mut(), rid, move |r| match r {
state: &mut OpState,
rid: u32,
bufs: &mut [ZeroCopyBuf],
) -> Result<u32, AnyError> {
StdFileResource::with(state, rid, move |r| match r {
Ok(std_file) => std_file
.read(&mut buf)
.map(|n: usize| n as i32)
.read(&mut bufs[0])
.map(|n: usize| n as u32)
.map_err(AnyError::from),
Err(_) => Err(not_supported()),
})
@ -563,65 +536,44 @@ fn op_read_sync(
async fn op_read_async(
state: Rc<RefCell<OpState>>,
rid: i32,
mut buf: ZeroCopyBuf,
) -> Result<i32, AnyError> {
let rid = rid.try_into().map_err(|_| bad_resource_id())?;
rid: u32,
mut bufs: BufVec,
) -> Result<u32, AnyError> {
let buf = &mut bufs[0];
let resource = state
.borrow()
.resource_table
.get_any(rid)
.ok_or_else(bad_resource_id)?;
let nread = if let Some(s) = resource.downcast_rc::<ChildStdoutResource>() {
s.read(&mut buf).await?
s.read(buf).await?
} else if let Some(s) = resource.downcast_rc::<ChildStderrResource>() {
s.read(&mut buf).await?
s.read(buf).await?
} else if let Some(s) = resource.downcast_rc::<TcpStreamResource>() {
s.read(&mut buf).await?
s.read(buf).await?
} else if let Some(s) = resource.downcast_rc::<TlsClientStreamResource>() {
s.read(&mut buf).await?
s.read(buf).await?
} else if let Some(s) = resource.downcast_rc::<TlsServerStreamResource>() {
s.read(&mut buf).await?
s.read(buf).await?
} else if let Some(s) = resource.downcast_rc::<UnixStreamResource>() {
s.read(&mut buf).await?
s.read(buf).await?
} else if let Some(s) = resource.downcast_rc::<StdFileResource>() {
s.read(&mut buf).await?
s.read(buf).await?
} else {
return Err(not_supported());
};
Ok(nread as i32)
}
pub fn op_write(
state: Rc<RefCell<OpState>>,
is_sync: bool,
rid: i32,
bufs: BufVec,
) -> MinimalOp {
match bufs.len() {
0 => return MinimalOp::Sync(Err(no_buffer_specified())),
1 => {}
_ => panic!("Invalid number of arguments"),
};
let buf = bufs.into_iter().next().unwrap();
if is_sync {
MinimalOp::Sync(op_write_sync(state, rid, buf))
} else {
MinimalOp::Async(op_write_async(state, rid, buf).boxed_local())
}
Ok(nread as u32)
}
fn op_write_sync(
state: Rc<RefCell<OpState>>,
rid: i32,
buf: ZeroCopyBuf,
) -> Result<i32, AnyError> {
let rid = rid.try_into().map_err(|_| bad_resource_id())?;
StdFileResource::with(&mut state.borrow_mut(), rid, move |r| match r {
state: &mut OpState,
rid: u32,
bufs: &mut [ZeroCopyBuf],
) -> Result<u32, AnyError> {
StdFileResource::with(state, rid, move |r| match r {
Ok(std_file) => std_file
.write(&buf)
.map(|nwritten: usize| nwritten as i32)
.write(&bufs[0])
.map(|nwritten: usize| nwritten as u32)
.map_err(AnyError::from),
Err(_) => Err(not_supported()),
})
@ -629,36 +581,36 @@ fn op_write_sync(
async fn op_write_async(
state: Rc<RefCell<OpState>>,
rid: i32,
buf: ZeroCopyBuf,
) -> Result<i32, AnyError> {
let rid = rid.try_into().map_err(|_| bad_resource_id())?;
rid: u32,
bufs: BufVec,
) -> Result<u32, AnyError> {
let buf = &bufs[0];
let resource = state
.borrow()
.resource_table
.get_any(rid)
.ok_or_else(bad_resource_id)?;
let nwritten = if let Some(s) = resource.downcast_rc::<ChildStdinResource>() {
s.write(&buf).await?
s.write(buf).await?
} else if let Some(s) = resource.downcast_rc::<TcpStreamResource>() {
s.write(&buf).await?
s.write(buf).await?
} else if let Some(s) = resource.downcast_rc::<TlsClientStreamResource>() {
s.write(&buf).await?
s.write(buf).await?
} else if let Some(s) = resource.downcast_rc::<TlsServerStreamResource>() {
s.write(&buf).await?
s.write(buf).await?
} else if let Some(s) = resource.downcast_rc::<UnixStreamResource>() {
s.write(&buf).await?
s.write(buf).await?
} else if let Some(s) = resource.downcast_rc::<StdFileResource>() {
s.write(&buf).await?
s.write(buf).await?
} else {
return Err(not_supported());
};
Ok(nwritten as i32)
Ok(nwritten as u32)
}
#[derive(Deserialize)]
struct ShutdownArgs {
rid: i32,
rid: u32,
}
async fn op_shutdown(
@ -666,10 +618,7 @@ async fn op_shutdown(
args: Value,
_zero_copy: BufVec,
) -> Result<Value, AnyError> {
let rid = serde_json::from_value::<ShutdownArgs>(args)?
.rid
.try_into()
.map_err(|_| bad_resource_id())?;
let rid = serde_json::from_value::<ShutdownArgs>(args)?.rid;
let resource = state
.borrow()
.resource_table

View File

@ -1,8 +1,5 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
mod dispatch_minimal;
pub use dispatch_minimal::MinimalOp;
pub mod crypto;
pub mod fetch;
pub mod fs;
@ -11,6 +8,7 @@ pub mod io;
pub mod net;
#[cfg(unix)]
mod net_unix;
mod ops_buffer;
pub mod os;
pub mod permissions;
pub mod plugin;
@ -36,6 +34,9 @@ use deno_core::BufVec;
use deno_core::JsRuntime;
use deno_core::OpState;
use deno_core::ZeroCopyBuf;
use ops_buffer::buffer_op_async;
use ops_buffer::buffer_op_sync;
use ops_buffer::ValueOrVector;
use std::cell::RefCell;
use std::future::Future;
use std::rc::Rc;
@ -62,6 +63,26 @@ where
rt.register_op(name, metrics_op(name, json_op_sync(op_fn)));
}
pub fn reg_buffer_async<F, R, RV>(
rt: &mut JsRuntime,
name: &'static str,
op_fn: F,
) where
F: Fn(Rc<RefCell<OpState>>, u32, BufVec) -> R + 'static,
R: Future<Output = Result<RV, AnyError>> + 'static,
RV: ValueOrVector,
{
rt.register_op(name, metrics_op(name, buffer_op_async(op_fn)));
}
pub fn reg_buffer_sync<F, R>(rt: &mut JsRuntime, name: &'static str, op_fn: F)
where
F: Fn(&mut OpState, u32, &mut [ZeroCopyBuf]) -> Result<R, AnyError> + 'static,
R: ValueOrVector,
{
rt.register_op(name, metrics_op(name, buffer_op_sync(op_fn)));
}
/// `UnstableChecker` is a struct so it can be placed inside `GothamState`;
/// using type alias for a bool could work, but there's a high chance
/// that there might be another type alias pointing to a bool, which

377
runtime/ops/ops_buffer.rs Normal file
View File

@ -0,0 +1,377 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
use deno_core::error::AnyError;
use deno_core::futures::future::FutureExt;
use deno_core::BufVec;
use deno_core::Op;
use deno_core::OpFn;
use deno_core::OpState;
use deno_core::ZeroCopyBuf;
use std::boxed::Box;
use std::cell::RefCell;
use std::convert::TryInto;
use std::future::Future;
use std::rc::Rc;
#[derive(Copy, Clone, Debug, PartialEq)]
pub struct RequestHeader {
pub request_id: u64,
pub argument: u32,
}
impl RequestHeader {
pub fn from_raw(bytes: &[u8]) -> Option<Self> {
if bytes.len() < 3 * 4 {
return None;
}
Some(Self {
request_id: u64::from_le_bytes(bytes[0..8].try_into().unwrap()),
argument: u32::from_le_bytes(bytes[8..12].try_into().unwrap()),
})
}
}
#[derive(Copy, Clone, Debug, PartialEq)]
pub struct ResponseHeader {
pub request_id: u64,
pub status: u32,
pub result: u32,
}
impl Into<[u8; 16]> for ResponseHeader {
fn into(self) -> [u8; 16] {
let mut resp_header = [0u8; 16];
resp_header[0..8].copy_from_slice(&self.request_id.to_le_bytes());
resp_header[8..12].copy_from_slice(&self.status.to_le_bytes());
resp_header[12..16].copy_from_slice(&self.result.to_le_bytes());
resp_header
}
}
pub trait ValueOrVector {
fn value(&self) -> u32;
fn vector(self) -> Option<Vec<u8>>;
}
impl ValueOrVector for Vec<u8> {
fn value(&self) -> u32 {
self.len() as u32
}
fn vector(self) -> Option<Vec<u8>> {
Some(self)
}
}
impl ValueOrVector for u32 {
fn value(&self) -> u32 {
*self
}
fn vector(self) -> Option<Vec<u8>> {
None
}
}
fn gen_padding_32bit(len: usize) -> &'static [u8] {
&[b' ', b' ', b' '][0..(4 - (len & 3)) & 3]
}
/// Creates an op that passes data synchronously using raw ui8 buffer.
///
/// The provided function `op_fn` has the following parameters:
/// * `&mut OpState`: the op state, can be used to read/write resources in the runtime from an op.
/// * `argument`: the i32 value that is passed to the Rust function.
/// * `&mut [ZeroCopyBuf]`: raw bytes passed along.
///
/// `op_fn` returns an array buffer value, which is directly returned to JavaScript.
///
/// When registering an op like this...
/// ```ignore
/// let mut runtime = JsRuntime::new(...);
/// runtime.register_op("hello", deno_core::buffer_op_sync(Self::hello_op));
/// ```
///
/// ...it can be invoked from JS using the provided name, for example:
/// ```js
/// Deno.core.ops();
/// let result = Deno.core.bufferOpSync("function_name", args);
/// ```
///
/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization.
/// A more complete example is available in the examples directory.
pub fn buffer_op_sync<F, R>(op_fn: F) -> Box<OpFn>
where
F: Fn(&mut OpState, u32, &mut [ZeroCopyBuf]) -> Result<R, AnyError> + 'static,
R: ValueOrVector,
{
Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op {
let mut bufs_iter = bufs.into_iter();
let record_buf = bufs_iter.next().expect("Expected record at position 0");
let mut zero_copy = bufs_iter.collect::<BufVec>();
let req_header = match RequestHeader::from_raw(&record_buf) {
Some(r) => r,
None => {
let error_class = b"TypeError";
let error_message = b"Unparsable control buffer";
let len = error_class.len() + error_message.len();
let padding = gen_padding_32bit(len);
let resp_header = ResponseHeader {
request_id: 0,
status: 1,
result: error_class.len() as u32,
};
return Op::Sync(
error_class
.iter()
.chain(error_message.iter())
.chain(padding)
.chain(&Into::<[u8; 16]>::into(resp_header))
.cloned()
.collect(),
);
}
};
match op_fn(&mut state.borrow_mut(), req_header.argument, &mut zero_copy) {
Ok(possibly_vector) => {
let resp_header = ResponseHeader {
request_id: req_header.request_id,
status: 0,
result: possibly_vector.value(),
};
let resp_encoded_header = Into::<[u8; 16]>::into(resp_header);
let resp_vector = match possibly_vector.vector() {
Some(mut vector) => {
let padding = gen_padding_32bit(vector.len());
vector.extend(padding);
vector.extend(&resp_encoded_header);
vector
}
None => resp_encoded_header.to_vec(),
};
Op::Sync(resp_vector.into_boxed_slice())
}
Err(error) => {
let error_class =
(state.borrow().get_error_class_fn)(&error).as_bytes();
let error_message = error.to_string().as_bytes().to_owned();
let len = error_class.len() + error_message.len();
let padding = gen_padding_32bit(len);
let resp_header = ResponseHeader {
request_id: req_header.request_id,
status: 1,
result: error_class.len() as u32,
};
return Op::Sync(
error_class
.iter()
.chain(error_message.iter())
.chain(padding)
.chain(&Into::<[u8; 16]>::into(resp_header))
.cloned()
.collect(),
);
}
}
})
}
/// Creates an op that passes data asynchronously using raw ui8 buffer.
///
/// The provided function `op_fn` has the following parameters:
/// * `Rc<RefCell<OpState>>`: the op state, can be used to read/write resources in the runtime from an op.
/// * `argument`: the i32 value that is passed to the Rust function.
/// * `BufVec`: raw bytes passed along, usually not needed if the JSON value is used.
///
/// `op_fn` returns a future, whose output is a JSON value. This value will be asynchronously
/// returned to JavaScript.
///
/// When registering an op like this...
/// ```ignore
/// let mut runtime = JsRuntime::new(...);
/// runtime.register_op("hello", deno_core::json_op_async(Self::hello_op));
/// ```
///
/// ...it can be invoked from JS using the provided name, for example:
/// ```js
/// Deno.core.ops();
/// let future = Deno.core.jsonOpAsync("function_name", args);
/// ```
///
/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization.
/// A more complete example is available in the examples directory.
pub fn buffer_op_async<F, R, RV>(op_fn: F) -> Box<OpFn>
where
F: Fn(Rc<RefCell<OpState>>, u32, BufVec) -> R + 'static,
R: Future<Output = Result<RV, AnyError>> + 'static,
RV: ValueOrVector,
{
Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op {
let mut bufs_iter = bufs.into_iter();
let record_buf = bufs_iter.next().expect("Expected record at position 0");
let zero_copy = bufs_iter.collect::<BufVec>();
let req_header = match RequestHeader::from_raw(&record_buf) {
Some(r) => r,
None => {
let error_class = b"TypeError";
let error_message = b"Unparsable control buffer";
let len = error_class.len() + error_message.len();
let padding = gen_padding_32bit(len);
let resp_header = ResponseHeader {
request_id: 0,
status: 1,
result: error_class.len() as u32,
};
return Op::Sync(
error_class
.iter()
.chain(error_message.iter())
.chain(padding)
.chain(&Into::<[u8; 16]>::into(resp_header))
.cloned()
.collect(),
);
}
};
let fut =
op_fn(state.clone(), req_header.argument, zero_copy).map(move |result| {
match result {
Ok(possibly_vector) => {
let resp_header = ResponseHeader {
request_id: req_header.request_id,
status: 0,
result: possibly_vector.value(),
};
let resp_encoded_header = Into::<[u8; 16]>::into(resp_header);
let resp_vector = match possibly_vector.vector() {
Some(mut vector) => {
let padding = gen_padding_32bit(vector.len());
vector.extend(padding);
vector.extend(&resp_encoded_header);
vector
}
None => resp_encoded_header.to_vec(),
};
resp_vector.into_boxed_slice()
}
Err(error) => {
let error_class =
(state.borrow().get_error_class_fn)(&error).as_bytes();
let error_message = error.to_string().as_bytes().to_owned();
let len = error_class.len() + error_message.len();
let padding = gen_padding_32bit(len);
let resp_header = ResponseHeader {
request_id: req_header.request_id,
status: 1,
result: error_class.len() as u32,
};
error_class
.iter()
.chain(error_message.iter())
.chain(padding)
.chain(&Into::<[u8; 16]>::into(resp_header))
.cloned()
.collect()
}
}
});
let temp = Box::pin(fut);
Op::Async(temp)
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn padding() {
assert_eq!(gen_padding_32bit(0), &[] as &[u8]);
assert_eq!(gen_padding_32bit(1), &[b' ', b' ', b' ']);
assert_eq!(gen_padding_32bit(2), &[b' ', b' ']);
assert_eq!(gen_padding_32bit(3), &[b' ']);
assert_eq!(gen_padding_32bit(4), &[] as &[u8]);
assert_eq!(gen_padding_32bit(5), &[b' ', b' ', b' ']);
}
#[test]
fn response_header_to_bytes() {
// Max size of an js Number is 1^53 - 1, so use this value as max for 64bit ´request_id´
let resp_header = ResponseHeader {
request_id: 0x0102030405060708u64,
status: 0x090A0B0Cu32,
result: 0x0D0E0F10u32,
};
// All numbers are always little-endian encoded, as the js side also wants this to be fixed
assert_eq!(
&Into::<[u8; 16]>::into(resp_header),
&[8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10, 9, 16, 15, 14, 13]
);
}
#[test]
fn response_header_to_bytes_max_value() {
// Max size of an js Number is 1^53 - 1, so use this value as max for 64bit ´request_id´
let resp_header = ResponseHeader {
request_id: (1u64 << 53u64) - 1u64,
status: 0xFFFFFFFFu32,
result: 0xFFFFFFFFu32,
};
// All numbers are always little-endian encoded, as the js side also wants this to be fixed
assert_eq!(
&Into::<[u8; 16]>::into(resp_header),
&[
255, 255, 255, 255, 255, 255, 31, 0, 255, 255, 255, 255, 255, 255, 255,
255
]
);
}
#[test]
fn request_header_from_bytes() {
let req_header =
RequestHeader::from_raw(&[8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10, 9])
.unwrap();
assert_eq!(req_header.request_id, 0x0102030405060708u64);
assert_eq!(req_header.argument, 0x090A0B0Cu32);
}
#[test]
fn request_header_from_bytes_max_value() {
let req_header = RequestHeader::from_raw(&[
255, 255, 255, 255, 255, 255, 31, 0, 255, 255, 255, 255,
])
.unwrap();
assert_eq!(req_header.request_id, (1u64 << 53u64) - 1u64);
assert_eq!(req_header.argument, 0xFFFFFFFFu32);
}
#[test]
fn request_header_from_bytes_too_short() {
let req_header =
RequestHeader::from_raw(&[8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10]);
assert_eq!(req_header, None);
}
#[test]
fn request_header_from_bytes_long() {
let req_header = RequestHeader::from_raw(&[
8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10, 9, 13, 14, 15, 16, 17, 18, 19, 20, 21,
])
.unwrap();
assert_eq!(req_header.request_id, 0x0102030405060708u64);
assert_eq!(req_header.argument, 0x090A0B0Cu32);
}
}

View File

@ -8,9 +8,6 @@
//! only need to be able to start, cancel and await a single timer (or Delay, as Tokio
//! calls it) for an entire Isolate. This is what is implemented here.
use super::dispatch_minimal::minimal_op;
use super::dispatch_minimal::MinimalOp;
use crate::metrics::metrics_op;
use crate::permissions::Permissions;
use deno_core::error::type_error;
use deno_core::error::AnyError;
@ -81,7 +78,7 @@ pub fn init(rt: &mut deno_core::JsRuntime) {
super::reg_json_sync(rt, "op_global_timer_stop", op_global_timer_stop);
super::reg_json_sync(rt, "op_global_timer_start", op_global_timer_start);
super::reg_json_async(rt, "op_global_timer", op_global_timer);
rt.register_op("op_now", metrics_op("op_now", minimal_op(op_now)));
super::reg_buffer_sync(rt, "op_now", op_now);
super::reg_json_sync(rt, "op_sleep_sync", op_sleep_sync);
}
@ -143,21 +140,16 @@ async fn op_global_timer(
// If the High precision flag is not set, the
// nanoseconds are rounded on 2ms.
fn op_now(
state: Rc<RefCell<OpState>>,
// Arguments are discarded
_sync: bool,
_x: i32,
mut zero_copy: BufVec,
) -> MinimalOp {
op_state: &mut OpState,
_argument: u32,
zero_copy: &mut [ZeroCopyBuf],
) -> Result<u32, AnyError> {
match zero_copy.len() {
0 => return MinimalOp::Sync(Err(type_error("no buffer specified"))),
0 => return Err(type_error("no buffer specified")),
1 => {}
_ => {
return MinimalOp::Sync(Err(type_error("Invalid number of arguments")))
}
_ => return Err(type_error("Invalid number of arguments")),
}
let op_state = state.borrow();
let start_time = op_state.borrow::<StartTime>();
let seconds = start_time.elapsed().as_secs();
let mut subsec_nanos = start_time.elapsed().subsec_nanos() as f64;
@ -174,7 +166,7 @@ fn op_now(
(&mut zero_copy[0]).copy_from_slice(&result.to_be_bytes());
MinimalOp::Sync(Ok(0))
Ok(0)
}
#[derive(Deserialize)]