Move create_channels into worker constructor (#3889)

This commit is contained in:
Ryan Dahl 2020-02-05 02:40:38 -05:00 committed by GitHub
parent 7d115a2a65
commit 55ea854671
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 79 additions and 79 deletions

View File

@ -2,7 +2,6 @@
use crate::ops;
use crate::state::ThreadSafeState;
use crate::worker::Worker;
use crate::worker::WorkerChannels;
use deno_core;
use deno_core::StartupData;
use std::ops::Deref;
@ -28,10 +27,9 @@ impl CompilerWorker {
name: String,
startup_data: StartupData,
state: ThreadSafeState,
external_channels: WorkerChannels,
) -> Self {
let state_ = state.clone();
let mut worker = Worker::new(name, startup_data, state_, external_channels);
let mut worker = Worker::new(name, startup_data, state_);
{
let isolate = &mut worker.isolate;
ops::runtime::init(isolate, &state);

View File

@ -240,11 +240,10 @@ impl TsCompiler {
/// Create a new V8 worker with snapshot of TS compiler and setup compiler's
/// runtime.
fn setup_worker(global_state: ThreadSafeGlobalState) -> CompilerWorker {
let (int, ext) = ThreadSafeState::create_channels();
let entry_point =
ModuleSpecifier::resolve_url_or_path("./__$deno$ts_compiler.ts").unwrap();
let worker_state =
ThreadSafeState::new(global_state.clone(), None, entry_point, int)
ThreadSafeState::new(global_state.clone(), None, entry_point)
.expect("Unable to create worker state");
// Count how many times we start the compiler worker.
@ -257,7 +256,6 @@ impl TsCompiler {
"TS".to_string(),
startup_data::compiler_isolate_init(),
worker_state,
ext,
);
worker.execute("bootstrapTsCompilerRuntime()").unwrap();
worker

View File

@ -45,12 +45,11 @@ pub struct WasmCompiler {
impl WasmCompiler {
/// Create a new V8 worker with snapshot of WASM compiler and setup compiler's runtime.
fn setup_worker(global_state: ThreadSafeGlobalState) -> CompilerWorker {
let (int, ext) = ThreadSafeState::create_channels();
let entry_point =
ModuleSpecifier::resolve_url_or_path("./__$deno$wasm_compiler.ts")
.unwrap();
let worker_state =
ThreadSafeState::new(global_state.clone(), None, entry_point, int)
ThreadSafeState::new(global_state.clone(), None, entry_point)
.expect("Unable to create worker state");
// Count how many times we start the compiler worker.
@ -63,7 +62,6 @@ impl WasmCompiler {
"WASM".to_string(),
startup_data::compiler_isolate_init(),
worker_state,
ext,
);
worker.execute("bootstrapWasmCompilerRuntime()").unwrap();
worker

View File

@ -122,8 +122,7 @@ fn create_main_worker(
global_state: ThreadSafeGlobalState,
main_module: ModuleSpecifier,
) -> MainWorker {
let (int, ext) = ThreadSafeState::create_channels();
let state = ThreadSafeState::new(global_state, None, main_module, int)
let state = ThreadSafeState::new(global_state, None, main_module)
.map_err(deno_error::print_err_and_exit)
.unwrap();
@ -136,12 +135,7 @@ fn create_main_worker(
resource_table.add("stderr", Box::new(stderr));
}
MainWorker::new(
"main".to_string(),
startup_data::deno_isolate_init(),
state,
ext,
)
MainWorker::new("main".to_string(), startup_data::deno_isolate_init(), state)
}
fn types_command() {

View File

@ -7,8 +7,6 @@ use crate::state::ThreadSafeState;
use deno_core::*;
use futures;
use futures::future::FutureExt;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use std;
use std::convert::From;
@ -31,8 +29,8 @@ fn op_worker_get_message(
) -> Result<JsonOp, ErrBox> {
let state_ = state.clone();
let op = async move {
let mut receiver = state_.worker_channels.receiver.lock().await;
let maybe_buf = receiver.next().await;
let c = state_.worker_channels_internal.lock().unwrap();
let maybe_buf = c.as_ref().unwrap().get_message().await;
debug!("op_worker_get_message");
Ok(json!({ "data": maybe_buf }))
};
@ -47,8 +45,9 @@ fn op_worker_post_message(
data: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> {
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
let mut sender = state.worker_channels.sender.clone();
futures::executor::block_on(sender.send(d))
let c = state.worker_channels_internal.lock().unwrap();
let fut = c.as_ref().unwrap().post_message(d);
futures::executor::block_on(fut)
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
Ok(JsonOp::Sync(json!({})))

View File

@ -99,12 +99,10 @@ fn op_create_worker(
result.unwrap()
};
let (int, ext) = ThreadSafeState::create_channels();
let result = ThreadSafeState::new_for_worker(
parent_state.global_state.clone(),
Some(parent_state.permissions.clone()), // by default share with parent
module_specifier.clone(),
int,
);
if let Err(err) = result {
load_sender.send(Err(err)).unwrap();
@ -122,7 +120,6 @@ fn op_create_worker(
worker_name.to_string(),
startup_data::deno_isolate_init(),
child_state,
ext,
);
let script = format!("bootstrapWorkerRuntime(\"{}\")", worker_name);
js_check(worker.execute(&script));

View File

@ -9,7 +9,8 @@ use crate::ops::JsonOp;
use crate::ops::MinimalOp;
use crate::permissions::DenoPermissions;
use crate::web_worker::WebWorker;
use crate::worker::WorkerChannels;
use crate::worker::WorkerChannelsExternal;
use crate::worker::WorkerChannelsInternal;
use deno_core::Buf;
use deno_core::CoreOp;
use deno_core::ErrBox;
@ -36,7 +37,6 @@ use std::sync::Arc;
use std::sync::Mutex;
use std::sync::MutexGuard;
use std::time::Instant;
use tokio::sync::Mutex as AsyncMutex;
/// Isolate cannot be passed between threads but ThreadSafeState can.
/// ThreadSafeState satisfies Send and Sync. So any state that needs to be
@ -48,14 +48,13 @@ pub struct State {
pub global_state: ThreadSafeGlobalState,
pub permissions: Arc<Mutex<DenoPermissions>>,
pub main_module: ModuleSpecifier,
// TODO(ry) rename to worker_channels_internal
pub worker_channels: WorkerChannels,
/// When flags contains a `.import_map_path` option, the content of the
/// import map file will be resolved and set.
pub import_map: Option<ImportMap>,
pub metrics: Metrics,
pub global_timer: Mutex<GlobalTimer>,
pub workers: Mutex<HashMap<u32, WorkerChannels>>,
pub workers: Mutex<HashMap<u32, WorkerChannelsExternal>>,
pub worker_channels_internal: Mutex<Option<WorkerChannelsInternal>>,
pub loading_workers: Mutex<HashMap<u32, mpsc::Receiver<Result<(), ErrBox>>>>,
pub next_worker_id: AtomicUsize,
pub start_time: Instant,
@ -222,26 +221,11 @@ impl Loader for ThreadSafeState {
}
impl ThreadSafeState {
pub fn create_channels() -> (WorkerChannels, WorkerChannels) {
let (in_tx, in_rx) = mpsc::channel::<Buf>(1);
let (out_tx, out_rx) = mpsc::channel::<Buf>(1);
let internal_channels = WorkerChannels {
sender: out_tx,
receiver: Arc::new(AsyncMutex::new(in_rx)),
};
let external_channels = WorkerChannels {
sender: in_tx,
receiver: Arc::new(AsyncMutex::new(out_rx)),
};
(internal_channels, external_channels)
}
/// If `shared_permission` is None then permissions from globa state are used.
pub fn new(
global_state: ThreadSafeGlobalState,
shared_permissions: Option<Arc<Mutex<DenoPermissions>>>,
main_module: ModuleSpecifier,
internal_channels: WorkerChannels,
) -> Result<Self, ErrBox> {
let import_map: Option<ImportMap> =
match global_state.flags.import_map_path.as_ref() {
@ -265,9 +249,9 @@ impl ThreadSafeState {
main_module,
permissions,
import_map,
worker_channels: internal_channels,
metrics: Metrics::default(),
global_timer: Mutex::new(GlobalTimer::new()),
worker_channels_internal: Mutex::new(None),
workers: Mutex::new(HashMap::new()),
loading_workers: Mutex::new(HashMap::new()),
next_worker_id: AtomicUsize::new(0),
@ -286,7 +270,6 @@ impl ThreadSafeState {
global_state: ThreadSafeGlobalState,
shared_permissions: Option<Arc<Mutex<DenoPermissions>>>,
main_module: ModuleSpecifier,
internal_channels: WorkerChannels,
) -> Result<Self, ErrBox> {
let seeded_rng = match global_state.flags.seed {
Some(seed) => Some(Mutex::new(StdRng::seed_from_u64(seed))),
@ -304,9 +287,9 @@ impl ThreadSafeState {
main_module,
permissions,
import_map: None,
worker_channels: internal_channels,
metrics: Metrics::default(),
global_timer: Mutex::new(GlobalTimer::new()),
worker_channels_internal: Mutex::new(None),
workers: Mutex::new(HashMap::new()),
loading_workers: Mutex::new(HashMap::new()),
next_worker_id: AtomicUsize::new(0),
@ -388,17 +371,13 @@ impl ThreadSafeState {
}
#[cfg(test)]
pub fn mock(
main_module: &str,
internal_channels: WorkerChannels,
) -> ThreadSafeState {
pub fn mock(main_module: &str) -> ThreadSafeState {
let module_specifier = ModuleSpecifier::resolve_url_or_path(main_module)
.expect("Invalid entry module");
ThreadSafeState::new(
ThreadSafeGlobalState::mock(vec!["deno".to_string()]),
None,
module_specifier,
internal_channels,
)
.unwrap()
}
@ -431,6 +410,5 @@ impl ThreadSafeState {
#[test]
fn thread_safe() {
fn f<S: Send + Sync>(_: S) {}
let (int, _) = ThreadSafeState::create_channels();
f(ThreadSafeState::mock("./hello.js", int));
f(ThreadSafeState::mock("./hello.js"));
}

View File

@ -2,7 +2,6 @@
use crate::ops;
use crate::state::ThreadSafeState;
use crate::worker::Worker;
use crate::worker::WorkerChannels;
use deno_core;
use deno_core::ErrBox;
use deno_core::StartupData;
@ -28,10 +27,9 @@ impl WebWorker {
name: String,
startup_data: StartupData,
state: ThreadSafeState,
external_channels: WorkerChannels,
) -> Self {
let state_ = state.clone();
let mut worker = Worker::new(name, startup_data, state_, external_channels);
let mut worker = Worker::new(name, startup_data, state_);
{
let isolate = &mut worker.isolate;
ops::runtime::init(isolate, &state);
@ -76,13 +74,11 @@ mod tests {
use crate::tokio_util;
fn create_test_worker() -> WebWorker {
let (int, ext) = ThreadSafeState::create_channels();
let state = ThreadSafeState::mock("./hello.js", int);
let state = ThreadSafeState::mock("./hello.js");
let mut worker = WebWorker::new(
"TEST".to_string(),
startup_data::deno_isolate_init(),
state,
ext,
);
worker.execute("bootstrapWorkerRuntime(\"TEST\")").unwrap();
worker.execute("runWorkerMessageLoop()").unwrap();

View File

@ -52,6 +52,51 @@ impl WorkerChannels {
}
}
pub struct WorkerChannelsInternal(WorkerChannels);
impl Deref for WorkerChannelsInternal {
type Target = WorkerChannels;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for WorkerChannelsInternal {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
#[derive(Clone)]
pub struct WorkerChannelsExternal(WorkerChannels);
impl Deref for WorkerChannelsExternal {
type Target = WorkerChannels;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for WorkerChannelsExternal {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
fn create_channels() -> (WorkerChannelsInternal, WorkerChannelsExternal) {
let (in_tx, in_rx) = mpsc::channel::<Buf>(1);
let (out_tx, out_rx) = mpsc::channel::<Buf>(1);
let internal_channels = WorkerChannelsInternal(WorkerChannels {
sender: out_tx,
receiver: Arc::new(AsyncMutex::new(in_rx)),
});
let external_channels = WorkerChannelsExternal(WorkerChannels {
sender: in_tx,
receiver: Arc::new(AsyncMutex::new(out_rx)),
});
(internal_channels, external_channels)
}
/// Worker is a CLI wrapper for `deno_core::Isolate`.
///
/// It provides infrastructure to communicate with a worker and
@ -68,7 +113,7 @@ pub struct Worker {
pub name: String,
pub isolate: Box<deno_core::EsIsolate>,
pub state: ThreadSafeState,
external_channels: WorkerChannels,
external_channels: WorkerChannelsExternal,
}
impl Worker {
@ -76,7 +121,6 @@ impl Worker {
name: String,
startup_data: StartupData,
state: ThreadSafeState,
external_channels: WorkerChannels,
) -> Self {
let mut isolate =
deno_core::EsIsolate::new(Box::new(state.clone()), startup_data, false);
@ -86,6 +130,12 @@ impl Worker {
JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler)
});
let (internal_channels, external_channels) = create_channels();
{
let mut c = state.worker_channels_internal.lock().unwrap();
*c = Some(internal_channels);
}
Self {
name,
isolate,
@ -128,7 +178,7 @@ impl Worker {
}
/// Returns a way to communicate with the Worker from other threads.
pub fn thread_safe_handle(&self) -> WorkerChannels {
pub fn thread_safe_handle(&self) -> WorkerChannelsExternal {
self.external_channels.clone()
}
}
@ -157,10 +207,9 @@ impl MainWorker {
name: String,
startup_data: StartupData,
state: ThreadSafeState,
external_channels: WorkerChannels,
) -> Self {
let state_ = state.clone();
let mut worker = Worker::new(name, startup_data, state_, external_channels);
let mut worker = Worker::new(name, startup_data, state_);
{
let op_registry = worker.isolate.op_registry.clone();
let isolate = &mut worker.isolate;
@ -233,14 +282,13 @@ mod tests {
let global_state =
ThreadSafeGlobalState::new(flags::DenoFlags::default(), Progress::new())
.unwrap();
let (int, ext) = ThreadSafeState::create_channels();
let state =
ThreadSafeState::new(global_state, None, module_specifier.clone(), int)
ThreadSafeState::new(global_state, None, module_specifier.clone())
.unwrap();
let state_ = state.clone();
tokio_util::run_basic(async move {
let mut worker =
MainWorker::new("TEST".to_string(), StartupData::None, state, ext);
MainWorker::new("TEST".to_string(), StartupData::None, state);
let result = worker
.execute_mod_async(&module_specifier, None, false)
.await;
@ -269,14 +317,13 @@ mod tests {
let global_state =
ThreadSafeGlobalState::new(flags::DenoFlags::default(), Progress::new())
.unwrap();
let (int, ext) = ThreadSafeState::create_channels();
let state =
ThreadSafeState::new(global_state, None, module_specifier.clone(), int)
ThreadSafeState::new(global_state, None, module_specifier.clone())
.unwrap();
let state_ = state.clone();
tokio_util::run_basic(async move {
let mut worker =
MainWorker::new("TEST".to_string(), StartupData::None, state, ext);
MainWorker::new("TEST".to_string(), StartupData::None, state);
let result = worker
.execute_mod_async(&module_specifier, None, false)
.await;
@ -312,19 +359,16 @@ mod tests {
};
let global_state =
ThreadSafeGlobalState::new(flags, Progress::new()).unwrap();
let (int, ext) = ThreadSafeState::create_channels();
let state = ThreadSafeState::new(
global_state.clone(),
None,
module_specifier.clone(),
int,
)
.unwrap();
let mut worker = MainWorker::new(
"TEST".to_string(),
startup_data::deno_isolate_init(),
state.clone(),
ext,
);
worker.execute("bootstrapMainRuntime()").unwrap();
let result = worker
@ -346,13 +390,11 @@ mod tests {
}
fn create_test_worker() -> MainWorker {
let (int, ext) = ThreadSafeState::create_channels();
let state = ThreadSafeState::mock("./hello.js", int);
let state = ThreadSafeState::mock("./hello.js");
let mut worker = MainWorker::new(
"TEST".to_string(),
startup_data::deno_isolate_init(),
state,
ext,
);
worker.execute("bootstrapMainRuntime()").unwrap();
worker