refactor(lsp): remove circular dependency between LanguageServer and DiagnosticsServer (#13577)

This commit is contained in:
David Sherret 2022-02-02 18:02:59 -05:00 committed by GitHub
parent de5a4a1757
commit ed3086e4b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 151 additions and 100 deletions

View File

@ -36,6 +36,19 @@ fn incremental_change_wait(bench: &mut Bencher) {
}),
)
.unwrap();
let (id, method, _): (u64, String, Option<Value>) =
client.read_request().unwrap();
assert_eq!(method, "workspace/configuration");
client
.write_response(
id,
json!({
"enable": true
}),
)
.unwrap();
let (method, _maybe_diag): (String, Option<Value>) =
client.read_notification().unwrap();
assert_eq!(method, "textDocument/publishDiagnostics");

View File

@ -8,6 +8,7 @@ use super::documents;
use super::documents::Document;
use super::documents::Documents;
use super::language_server;
use super::language_server::StateSnapshot;
use super::performance::Performance;
use super::tsc;
use super::tsc::TsServer;
@ -37,6 +38,8 @@ use tokio::time::Duration;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
pub(crate) type SnapshotForDiagnostics =
(Arc<StateSnapshot>, Arc<ConfigSnapshot>, Option<LintConfig>);
pub type DiagnosticRecord =
(ModuleSpecifier, Option<i32>, Vec<lsp::Diagnostic>);
pub type DiagnosticVec = Vec<DiagnosticRecord>;
@ -92,10 +95,53 @@ impl DiagnosticsPublisher {
}
}
#[derive(Clone, Default, Debug)]
struct TsDiagnosticsStore(Arc<deno_core::parking_lot::Mutex<DiagnosticMap>>);
impl TsDiagnosticsStore {
pub fn get(
&self,
specifier: &ModuleSpecifier,
document_version: Option<i32>,
) -> Vec<lsp::Diagnostic> {
let ts_diagnostics = self.0.lock();
if let Some((diagnostics_doc_version, diagnostics)) =
ts_diagnostics.get(specifier)
{
// only get the diagnostics if they're up to date
if document_version == *diagnostics_doc_version {
return diagnostics.clone();
}
}
Vec::new()
}
pub fn invalidate(&self, specifiers: &[ModuleSpecifier]) {
let mut ts_diagnostics = self.0.lock();
for specifier in specifiers {
ts_diagnostics.remove(specifier);
}
}
pub fn invalidate_all(&self) {
self.0.lock().clear();
}
fn update(&self, diagnostics: &DiagnosticVec) {
let mut stored_ts_diagnostics = self.0.lock();
*stored_ts_diagnostics = diagnostics
.iter()
.map(|(specifier, version, diagnostics)| {
(specifier.clone(), (*version, diagnostics.clone()))
})
.collect();
}
}
#[derive(Debug)]
pub(crate) struct DiagnosticsServer {
channel: Option<mpsc::UnboundedSender<()>>,
ts_diagnostics: Arc<Mutex<DiagnosticMap>>,
channel: Option<mpsc::UnboundedSender<SnapshotForDiagnostics>>,
ts_diagnostics: TsDiagnosticsStore,
client: Client,
performance: Arc<Performance>,
ts_server: Arc<TsServer>,
@ -116,44 +162,28 @@ impl DiagnosticsServer {
}
}
pub(crate) async fn get_ts_diagnostics(
pub(crate) fn get_ts_diagnostics(
&self,
specifier: &ModuleSpecifier,
document_version: Option<i32>,
) -> Vec<lsp::Diagnostic> {
let ts_diagnostics = self.ts_diagnostics.lock().await;
if let Some((diagnostics_doc_version, diagnostics)) =
ts_diagnostics.get(specifier)
{
// only get the diagnostics if they're up to date
if document_version == *diagnostics_doc_version {
return diagnostics.clone();
}
}
Vec::new()
self.ts_diagnostics.get(specifier, document_version)
}
pub(crate) async fn invalidate(&self, specifiers: Vec<ModuleSpecifier>) {
let mut ts_diagnostics = self.ts_diagnostics.lock().await;
for specifier in &specifiers {
ts_diagnostics.remove(specifier);
}
pub(crate) fn invalidate(&self, specifiers: &[ModuleSpecifier]) {
self.ts_diagnostics.invalidate(specifiers);
}
pub(crate) async fn invalidate_all(&self) {
let mut ts_diagnostics = self.ts_diagnostics.lock().await;
ts_diagnostics.clear();
pub(crate) fn invalidate_all(&self) {
self.ts_diagnostics.invalidate_all();
}
pub(crate) fn start(
&mut self,
language_server: Arc<Mutex<language_server::Inner>>,
) {
let (tx, mut rx) = mpsc::unbounded_channel::<()>();
pub(crate) fn start(&mut self) {
let (tx, mut rx) = mpsc::unbounded_channel::<SnapshotForDiagnostics>();
self.channel = Some(tx);
let client = self.client.clone();
let performance = self.performance.clone();
let stored_ts_diagnostics = self.ts_diagnostics.clone();
let ts_diagnostics_store = self.ts_diagnostics.clone();
let ts_server = self.ts_server.clone();
let _join_handle = thread::spawn(move || {
@ -170,28 +200,19 @@ impl DiagnosticsServer {
match rx.recv().await {
// channel has closed
None => break,
Some(()) => {
Some((snapshot, config, maybe_lint_config)) => {
// cancel the previous run
token.cancel();
token = CancellationToken::new();
diagnostics_publisher.clear().await;
let (snapshot, config, maybe_lint_config) = {
let language_server = language_server.lock().await;
(
language_server.snapshot(),
language_server.config.snapshot(),
language_server.maybe_lint_config.clone(),
)
};
let previous_ts_handle = ts_handle.take();
ts_handle = Some(tokio::spawn({
let performance = performance.clone();
let diagnostics_publisher = diagnostics_publisher.clone();
let ts_server = ts_server.clone();
let token = token.clone();
let stored_ts_diagnostics = stored_ts_diagnostics.clone();
let ts_diagnostics_store = ts_diagnostics_store.clone();
let snapshot = snapshot.clone();
let config = config.clone();
async move {
@ -227,17 +248,7 @@ impl DiagnosticsServer {
.unwrap_or_default();
if !token.is_cancelled() {
{
let mut stored_ts_diagnostics =
stored_ts_diagnostics.lock().await;
*stored_ts_diagnostics = diagnostics
.iter()
.map(|(specifier, version, diagnostics)| {
(specifier.clone(), (*version, diagnostics.clone()))
})
.collect();
}
ts_diagnostics_store.update(&diagnostics);
diagnostics_publisher.publish(diagnostics, &token).await;
if !token.is_cancelled() {
@ -310,9 +321,15 @@ impl DiagnosticsServer {
});
}
pub(crate) fn update(&self) -> Result<(), AnyError> {
pub(crate) fn update(
&self,
message: SnapshotForDiagnostics,
) -> Result<(), AnyError> {
// todo(dsherret): instead of queuing up messages, it would be better to
// instead only store the latest message (ex. maybe using a
// tokio::sync::watch::channel)
if let Some(tx) = &self.channel {
tx.send(()).map_err(|err| err.into())
tx.send(message).map_err(|err| err.into())
} else {
Err(anyhow!("diagnostics server not started"))
}

View File

@ -41,6 +41,7 @@ use super::diagnostics::DiagnosticsServer;
use super::documents::to_hover_text;
use super::documents::to_lsp_range;
use super::documents::AssetOrDocument;
use super::documents::Document;
use super::documents::Documents;
use super::documents::LanguageId;
use super::logging::lsp_log;
@ -755,15 +756,8 @@ impl Inner {
&mut self,
specifier: &ModuleSpecifier,
params: DidOpenTextDocumentParams,
) {
) -> Document {
let mark = self.performance.mark("did_open", Some(&params));
if params.text_document.uri.scheme() == "deno" {
// we can ignore virtual text documents opening, as they don't need to
// be tracked in memory, as they are static assets that won't change
// already managed by the language service
return;
}
let language_id =
params
.text_document
@ -787,17 +781,8 @@ impl Inner {
content,
);
if document.is_diagnosable() {
self
.diagnostics_server
.invalidate(self.documents.dependents(specifier))
.await;
if let Err(err) = self.diagnostics_server.update() {
error!("{}", err);
}
}
self.performance.measure(mark);
document
}
async fn did_change(&mut self, params: DidChangeTextDocumentParams) {
@ -812,11 +797,8 @@ impl Inner {
if document.is_diagnosable() {
self
.diagnostics_server
.invalidate(self.documents.dependents(&specifier))
.await;
if let Err(err) = self.diagnostics_server.update() {
error!("{}", err);
}
.invalidate(&self.documents.dependents(&specifier));
self.send_diagnostics_update();
}
}
Err(err) => error!("{}", err),
@ -840,10 +822,8 @@ impl Inner {
if self.is_diagnosable(&specifier) {
let mut specifiers = self.documents.dependents(&specifier);
specifiers.push(specifier.clone());
self.diagnostics_server.invalidate(specifiers).await;
if let Err(err) = self.diagnostics_server.update() {
error!("{}", err);
}
self.diagnostics_server.invalidate(&specifiers);
self.send_diagnostics_update();
}
self.performance.measure(mark);
}
@ -887,13 +867,13 @@ impl Inner {
if let Err(err) = self.update_tsconfig().await {
self.client.show_message(MessageType::WARNING, err).await;
}
if let Err(err) = self.diagnostics_server.update() {
error!("{}", err);
}
self.documents.update_config(
self.maybe_import_map.clone(),
self.maybe_config_file.as_ref(),
);
self.send_diagnostics_update();
}
async fn did_change_watched_files(
@ -936,10 +916,8 @@ impl Inner {
self.maybe_import_map.clone(),
self.maybe_config_file.as_ref(),
);
self.diagnostics_server.invalidate_all().await;
if let Err(err) = self.diagnostics_server.update() {
error!("Cannot update diagnostics: {}", err);
}
self.diagnostics_server.invalidate_all();
self.send_diagnostics_update();
}
self.performance.measure(mark);
}
@ -1185,8 +1163,7 @@ impl Inner {
let mut code_actions = CodeActionCollection::default();
let file_diagnostics = self
.diagnostics_server
.get_ts_diagnostics(&specifier, asset_or_doc.document_lsp_version())
.await;
.get_ts_diagnostics(&specifier, asset_or_doc.document_lsp_version());
for diagnostic in &fixable_diagnostics {
match diagnostic.source.as_deref() {
Some("deno-ts") => {
@ -2342,6 +2319,17 @@ impl Inner {
self.performance.measure(mark);
Ok(maybe_symbol_information)
}
fn send_diagnostics_update(&self) {
let snapshot = (
self.snapshot(),
self.config.snapshot(),
self.maybe_lint_config.clone(),
);
if let Err(err) = self.diagnostics_server.update(snapshot) {
error!("Cannot update diagnostics: {}", err);
}
}
}
#[lspower::async_trait]
@ -2351,7 +2339,7 @@ impl lspower::LanguageServer for LanguageServer {
params: InitializeParams,
) -> LspResult<InitializeResult> {
let mut language_server = self.0.lock().await;
language_server.diagnostics_server.start(self.0.clone());
language_server.diagnostics_server.start();
language_server.initialize(params).await
}
@ -2364,14 +2352,29 @@ impl lspower::LanguageServer for LanguageServer {
}
async fn did_open(&self, params: DidOpenTextDocumentParams) {
if params.text_document.uri.scheme() == "deno" {
// we can ignore virtual text documents opening, as they don't need to
// be tracked in memory, as they are static assets that won't change
// already managed by the language service
return;
}
let (client, uri, specifier, had_specifier_settings) = {
let mut inner = self.0.lock().await;
let client = inner.client.clone();
let uri = params.text_document.uri.clone();
let specifier = inner.url_map.normalize_url(&uri);
inner.did_open(&specifier, params).await;
let document = inner.did_open(&specifier, params).await;
let has_specifier_settings =
inner.config.has_specifier_settings(&specifier);
if document.is_diagnosable() {
let specifiers = inner.documents.dependents(&specifier);
inner.diagnostics_server.invalidate(&specifiers);
// don't send diagnotics yet if we don't have the specifier settings
if has_specifier_settings {
inner.send_diagnostics_update();
}
}
(client, uri, specifier, has_specifier_settings)
};
@ -2381,12 +2384,12 @@ impl lspower::LanguageServer for LanguageServer {
let language_server = self.clone();
tokio::spawn(async move {
let response = client.specifier_configuration(&uri).await;
let mut inner = language_server.0.lock().await;
match response {
Ok(specifier_settings) => {
// now update the config
let mut inner = language_server.0.lock().await;
// now update the config and send a diagnostics update
inner.config.set_specifier_settings(
specifier,
specifier.clone(),
uri,
specifier_settings,
);
@ -2395,6 +2398,14 @@ impl lspower::LanguageServer for LanguageServer {
error!("{}", err);
}
}
if inner
.documents
.get(&specifier)
.map(|d| d.is_diagnosable())
.unwrap_or(false)
{
inner.send_diagnostics_update();
}
});
}
}
@ -2722,12 +2733,9 @@ impl Inner {
// now that we have dependencies loaded, we need to re-analyze them and
// invalidate some diagnostics
self.diagnostics_server.invalidate(vec![referrer]).await;
self.diagnostics_server.invalidate(&[referrer]);
self.send_diagnostics_update();
self.diagnostics_server.update().map_err(|err| {
error!("{}", err);
LspError::internal_error()
})?;
self.performance.measure(mark);
Ok(Some(json!(true)))
}

View File

@ -57,6 +57,11 @@ where
.write_notification("textDocument/didOpen", params)
.unwrap();
handle_configuration_request(client);
read_diagnostics(client).0
}
fn handle_configuration_request(client: &mut LspClient) {
let (id, method, _) = client.read_request::<Value>().unwrap();
assert_eq!(method, "workspace/configuration");
client
@ -70,8 +75,6 @@ where
}]),
)
.unwrap();
read_diagnostics(client).0
}
fn read_diagnostics(client: &mut LspClient) -> CollectedDiagnostics {
@ -490,6 +493,7 @@ fn lsp_import_assertions() {
}),
)
.unwrap();
handle_configuration_request(&mut client);
let diagnostics = CollectedDiagnostics(did_open(
&mut client,

View File

@ -122,7 +122,16 @@ where
R: de::DeserializeOwned,
{
let maybe_params = match maybe_params {
Some(params) => Some(serde_json::from_value(params)?),
Some(params) => {
Some(serde_json::from_value(params.clone()).map_err(|err| {
anyhow::anyhow!(
"Could not deserialize message '{}': {}\n\n{:?}",
method,
err,
params
)
})?)
}
None => None,
};
Ok((method, maybe_params))