This commit is contained in:
David Sherret 2024-11-21 00:12:16 -05:00
parent a715757d9b
commit d7948d0ce9
3 changed files with 128 additions and 74 deletions

View File

@ -12,6 +12,7 @@ use deno_config::workspace::FolderConfigs;
use deno_config::workspace::TaskDefinition;
use deno_config::workspace::TaskOrScript;
use deno_config::workspace::WorkspaceDirectory;
use deno_config::workspace::WorkspaceMemberTasksConfig;
use deno_config::workspace::WorkspaceTasksConfig;
use deno_core::anyhow::anyhow;
use deno_core::anyhow::bail;
@ -270,11 +271,7 @@ impl<'a> TaskRunner<'a> {
pkg_tasks_config: &PackageTaskInfo,
) -> Result<i32, deno_core::anyhow::Error> {
match sort_tasks_topo(pkg_tasks_config) {
Ok(sorted) => {
self
.run_tasks_in_parallel(&pkg_tasks_config.tasks_config, sorted)
.await
}
Ok(sorted) => self.run_tasks_in_parallel(sorted).await,
Err(err) => match err {
TaskError::NotFound(name) => {
if self.task_flags.is_run {
@ -308,64 +305,62 @@ impl<'a> TaskRunner<'a> {
async fn run_tasks_in_parallel(
&self,
tasks_config: &WorkspaceTasksConfig,
task_names: Vec<String>,
tasks: Vec<ResolvedTask<'a>>,
) -> Result<i32, deno_core::anyhow::Error> {
struct PendingTasksContext {
completed: HashSet<String>,
running: HashSet<String>,
task_names: Vec<String>,
struct PendingTasksContext<'a> {
completed: HashSet<usize>,
running: HashSet<usize>,
tasks: &'a [ResolvedTask<'a>],
}
impl PendingTasksContext {
impl<'a> PendingTasksContext<'a> {
fn has_remaining_tasks(&self) -> bool {
self.completed.len() < self.task_names.len()
self.completed.len() < self.tasks.len()
}
fn mark_complete(&mut self, task_name: String) {
self.running.remove(&task_name);
self.completed.insert(task_name);
fn mark_complete(&mut self, task: &ResolvedTask) {
self.running.remove(&task.id);
self.completed.insert(task.id);
}
fn get_next_task<'a>(
fn get_next_task<'b>(
&mut self,
runner: &'a TaskRunner<'a>,
tasks_config: &'a WorkspaceTasksConfig,
) -> Option<LocalBoxFuture<'a, Result<(i32, String), AnyError>>> {
for name in &self.task_names {
if self.completed.contains(name) || self.running.contains(name) {
runner: &'b TaskRunner<'b>,
) -> Option<
LocalBoxFuture<'b, Result<(i32, &'a ResolvedTask<'a>), AnyError>>,
>
where
'a: 'b,
{
for task in self.tasks.iter() {
if self.completed.contains(&task.id)
|| self.running.contains(&task.id)
{
continue;
}
let Some((folder_url, task_or_script)) = tasks_config.task(name)
else {
continue;
};
let should_run = match task_or_script {
TaskOrScript::Task(_, def) => def
.dependencies
.iter()
.all(|dep| self.completed.contains(dep)),
TaskOrScript::Script(_, _) => true,
};
let should_run = task
.dependencies
.iter()
.all(|dep_id| self.completed.contains(dep_id));
if !should_run {
continue;
}
self.running.insert(name.clone());
let name = name.clone();
self.running.insert(task.id);
return Some(
async move {
match task_or_script {
match task.task_or_script {
TaskOrScript::Task(_, def) => {
runner.run_deno_task(folder_url, &name, def).await
runner.run_deno_task(task.folder_url, task.name, def).await
}
TaskOrScript::Script(scripts, _) => {
runner.run_npm_script(folder_url, &name, scripts).await
runner
.run_npm_script(task.folder_url, task.name, scripts)
.await
}
}
.map(|exit_code| (exit_code, name))
.map(|exit_code| (exit_code, task))
}
.boxed_local(),
);
@ -375,16 +370,16 @@ impl<'a> TaskRunner<'a> {
}
let mut context = PendingTasksContext {
completed: HashSet::with_capacity(task_names.len()),
completed: HashSet::with_capacity(tasks.len()),
running: HashSet::with_capacity(self.concurrency),
task_names,
tasks: &tasks,
};
let mut queue = futures_unordered::FuturesUnordered::new();
while context.has_remaining_tasks() {
while queue.len() < self.concurrency {
if let Some(task) = context.get_next_task(self, tasks_config) {
if let Some(task) = context.get_next_task(self) {
queue.push(task);
} else {
break;
@ -393,7 +388,7 @@ impl<'a> TaskRunner<'a> {
// If queue is empty at this point, then there are no more tasks in the queue.
let Some(result) = queue.next().await else {
debug_assert_eq!(context.task_names.len(), 0);
debug_assert_eq!(context.tasks.len(), 0);
break;
};
@ -521,46 +516,105 @@ enum TaskError {
TaskDepCycle { path: Vec<String> },
}
fn sort_tasks_topo(
pkg_task_config: &PackageTaskInfo,
) -> Result<Vec<String>, TaskError> {
struct ResolvedTask<'a> {
id: usize,
name: &'a str,
folder_url: &'a Url,
task_or_script: TaskOrScript<'a>,
dependencies: Vec<usize>,
}
fn sort_tasks_topo<'a>(
pkg_task_config: &'a PackageTaskInfo,
) -> Result<Vec<ResolvedTask<'a>>, TaskError> {
trait TasksConfig {
fn task(
&self,
name: &str,
) -> Option<(&Url, TaskOrScript, &dyn TasksConfig)>;
}
impl TasksConfig for WorkspaceTasksConfig {
fn task(
&self,
name: &str,
) -> Option<(&Url, TaskOrScript, &dyn TasksConfig)> {
if let Some(member) = &self.member {
if let Some((dir_url, task_or_script)) = member.task(name) {
return Some((dir_url, task_or_script, self as &dyn TasksConfig));
}
}
if let Some(root) = &self.root {
if let Some((dir_url, task_or_script)) = root.task(name) {
// switch to only using the root tasks for the dependencies
return Some((dir_url, task_or_script, root as &dyn TasksConfig));
}
}
None
}
}
impl TasksConfig for WorkspaceMemberTasksConfig {
fn task(
&self,
name: &str,
) -> Option<(&Url, TaskOrScript, &dyn TasksConfig)> {
self.task(name).map(|(dir_url, task_or_script)| {
(dir_url, task_or_script, self as &dyn TasksConfig)
})
}
}
fn sort_visit<'a>(
name: &'a str,
sorted: &mut Vec<String>,
mut path: Vec<&'a str>,
tasks_config: &'a WorkspaceTasksConfig,
) -> Result<(), TaskError> {
// Already sorted
if sorted.iter().any(|sorted_name| sorted_name == name) {
return Ok(());
}
// Graph has a cycle
if path.contains(&name) {
path.push(name);
return Err(TaskError::TaskDepCycle {
path: path.iter().map(|s| s.to_string()).collect(),
});
}
let Some((_, task_or_script)) = tasks_config.task(name) else {
sorted: &mut Vec<ResolvedTask<'a>>,
mut path: Vec<(&'a Url, &'a str)>,
tasks_config: &'a dyn TasksConfig,
) -> Result<usize, TaskError> {
let Some((folder_url, task_or_script, tasks_config)) =
tasks_config.task(name)
else {
return Err(TaskError::NotFound(name.to_string()));
};
if let Some(existing_task) = sorted
.iter()
.find(|task| task.name == name && task.folder_url == folder_url)
{
// already exists
return Ok(existing_task.id);
}
if path.contains(&(folder_url, name)) {
path.push((folder_url, name));
return Err(TaskError::TaskDepCycle {
path: path.iter().map(|(_, s)| s.to_string()).collect(),
});
}
let mut dependencies: Vec<usize> = Vec::new();
if let TaskOrScript::Task(_, task) = task_or_script {
dependencies.reserve(task.dependencies.len());
for dep in &task.dependencies {
let mut path = path.clone();
path.push(name);
sort_visit(dep, sorted, path, tasks_config)?
path.push((folder_url, name));
dependencies.push(sort_visit(dep, sorted, path, tasks_config)?);
}
}
sorted.push(name.to_string());
let id = sorted.len();
sorted.push(ResolvedTask {
id,
name,
folder_url,
task_or_script,
dependencies,
});
Ok(())
Ok(id)
}
let mut sorted: Vec<String> = vec![];
let mut sorted: Vec<ResolvedTask<'a>> = vec![];
for name in &pkg_task_config.matched_tasks {
sort_visit(name, &mut sorted, Vec::new(), &pkg_task_config.tasks_config)?;

View File

@ -1,5 +1,5 @@
Task a echo root-a
a
root-a
Task b echo b
b
Task a echo a

View File

@ -1,10 +1,10 @@
[UNORDERED_START]
Task build echo member
member
Task build echo root
root
Task root-depending-root echo test
test
Task root echo root
root
[UNORDERED_END]
Task member-depending-root-and-member echo member-test
Task member-dependending-root-and-member echo member-test
member-test