From f7556d8962b2106e82c9f0eb90f406eec0b60dd0 Mon Sep 17 00:00:00 2001 From: snek Date: Wed, 28 Aug 2024 19:25:38 -0700 Subject: [PATCH] fix: reland async context (#25140) This reverts commit 71ca61e189cca9215982ce4598b7a4da8430c584. Now uses a shared implementation from deno_core. --- ext/node/lib.rs | 10 - ext/node/polyfills/_next_tick.ts | 16 +- ext/node/polyfills/async_hooks.ts | 340 +++++++--------------------- ext/web/02_timers.js | 52 +++-- tests/unit_node/async_hooks_test.ts | 27 ++- 5 files changed, 153 insertions(+), 292 deletions(-) diff --git a/ext/node/lib.rs b/ext/node/lib.rs index 3ec2d26bfa..17fd7ab5a6 100644 --- a/ext/node/lib.rs +++ b/ext/node/lib.rs @@ -148,15 +148,6 @@ fn op_node_build_os() -> String { env!("TARGET").split('-').nth(2).unwrap().to_string() } -#[op2(fast)] -fn op_node_is_promise_rejected(value: v8::Local) -> bool { - let Ok(promise) = v8::Local::::try_from(value) else { - return false; - }; - - promise.state() == v8::PromiseState::Rejected -} - #[op2] #[string] fn op_npm_process_state(state: &mut OpState) -> Result { @@ -347,7 +338,6 @@ deno_core::extension!(deno_node, ops::os::op_cpus

, ops::os::op_homedir

, op_node_build_os, - op_node_is_promise_rejected, op_npm_process_state, ops::require::op_require_init_paths, ops::require::op_require_node_module_paths

, diff --git a/ext/node/polyfills/_next_tick.ts b/ext/node/polyfills/_next_tick.ts index 5915c750ee..5ee27728d0 100644 --- a/ext/node/polyfills/_next_tick.ts +++ b/ext/node/polyfills/_next_tick.ts @@ -10,9 +10,15 @@ import { validateFunction } from "ext:deno_node/internal/validators.mjs"; import { _exiting } from "ext:deno_node/_process/exiting.ts"; import { FixedQueue } from "ext:deno_node/internal/fixed_queue.ts"; +const { + getAsyncContext, + setAsyncContext, +} = core; + interface Tock { callback: (...args: Array) => void; args: Array; + snapshot: unknown; } let nextTickEnabled = false; @@ -23,7 +29,7 @@ export function enableNextTick() { const queue = new FixedQueue(); export function processTicksAndRejections() { - let tock; + let tock: Tock; do { // deno-lint-ignore no-cond-assign while (tock = queue.shift()) { @@ -31,9 +37,11 @@ export function processTicksAndRejections() { // const asyncId = tock[async_id_symbol]; // emitBefore(asyncId, tock[trigger_async_id_symbol], tock); + const oldContext = getAsyncContext(); try { - const callback = (tock as Tock).callback; - if ((tock as Tock).args === undefined) { + setAsyncContext(tock.snapshot); + const callback = tock.callback; + if (tock.args === undefined) { callback(); } else { const args = (tock as Tock).args; @@ -58,6 +66,7 @@ export function processTicksAndRejections() { // FIXME(bartlomieju): Deno currently doesn't support async hooks // if (destroyHooksExist()) // emitDestroy(asyncId); + setAsyncContext(oldContext); } // FIXME(bartlomieju): Deno currently doesn't support async hooks @@ -143,6 +152,7 @@ export function nextTick>( // FIXME(bartlomieju): Deno currently doesn't support async hooks // [async_id_symbol]: asyncId, // [trigger_async_id_symbol]: triggerAsyncId, + snapshot: getAsyncContext(), callback, args: args_, }; diff --git a/ext/node/polyfills/async_hooks.ts b/ext/node/polyfills/async_hooks.ts index f94b8d2c64..017e9e9bc7 100644 --- a/ext/node/polyfills/async_hooks.ts +++ b/ext/node/polyfills/async_hooks.ts @@ -1,191 +1,35 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. // Copyright Joyent and Node contributors. All rights reserved. MIT license. -// This implementation is inspired by "workerd" AsyncLocalStorage implementation: -// https://github.com/cloudflare/workerd/blob/77fd0ed6ddba184414f0216508fc62b06e716cab/src/workerd/api/node/async-hooks.c++#L9 - // TODO(petamoriken): enable prefer-primordials for node polyfills // deno-lint-ignore-file prefer-primordials -import { core } from "ext:core/mod.js"; -import { op_node_is_promise_rejected } from "ext:core/ops"; +import { core, primordials } from "ext:core/mod.js"; import { validateFunction } from "ext:deno_node/internal/validators.mjs"; import { newAsyncId } from "ext:deno_node/internal/async_hooks.ts"; -function assert(cond: boolean) { - if (!cond) throw new Error("Assertion failed"); -} -const asyncContextStack: AsyncContextFrame[] = []; +const { + ObjectDefineProperties, + ReflectApply, + FunctionPrototypeBind, + ArrayPrototypeUnshift, + ObjectFreeze, +} = primordials; -function pushAsyncFrame(frame: AsyncContextFrame) { - asyncContextStack.push(frame); -} - -function popAsyncFrame() { - if (asyncContextStack.length > 0) { - asyncContextStack.pop(); - } -} - -let rootAsyncFrame: AsyncContextFrame | undefined = undefined; -let promiseHooksSet = false; - -const asyncContext = Symbol("asyncContext"); - -function setPromiseHooks() { - if (promiseHooksSet) { - return; - } - promiseHooksSet = true; - - const init = (promise: Promise) => { - const currentFrame = AsyncContextFrame.current(); - if (!currentFrame.isRoot()) { - if (typeof promise[asyncContext] !== "undefined") { - throw new Error("Promise already has async context"); - } - AsyncContextFrame.attachContext(promise); - } - }; - const before = (promise: Promise) => { - const maybeFrame = promise[asyncContext]; - if (maybeFrame) { - pushAsyncFrame(maybeFrame); - } else { - pushAsyncFrame(AsyncContextFrame.getRootAsyncContext()); - } - }; - const after = (promise: Promise) => { - popAsyncFrame(); - if (!op_node_is_promise_rejected(promise)) { - // @ts-ignore promise async context - promise[asyncContext] = undefined; - } - }; - const resolve = (promise: Promise) => { - const currentFrame = AsyncContextFrame.current(); - if ( - !currentFrame.isRoot() && op_node_is_promise_rejected(promise) && - typeof promise[asyncContext] === "undefined" - ) { - AsyncContextFrame.attachContext(promise); - } - }; - - core.setPromiseHooks(init, before, after, resolve); -} - -class AsyncContextFrame { - storage: StorageEntry[]; - constructor( - maybeParent?: AsyncContextFrame | null, - maybeStorageEntry?: StorageEntry | null, - isRoot = false, - ) { - this.storage = []; - - setPromiseHooks(); - - const propagate = (parent: AsyncContextFrame) => { - parent.storage = parent.storage.filter((entry) => !entry.key.isDead()); - parent.storage.forEach((entry) => this.storage.push(entry.clone())); - - if (maybeStorageEntry) { - const existingEntry = this.storage.find((entry) => - entry.key === maybeStorageEntry.key - ); - if (existingEntry) { - existingEntry.value = maybeStorageEntry.value; - } else { - this.storage.push(maybeStorageEntry); - } - } - }; - - if (!isRoot) { - if (maybeParent) { - propagate(maybeParent); - } else { - propagate(AsyncContextFrame.current()); - } - } - } - - static tryGetContext(promise: Promise) { - // @ts-ignore promise async context - return promise[asyncContext]; - } - - static attachContext(promise: Promise) { - // @ts-ignore promise async context - promise[asyncContext] = AsyncContextFrame.current(); - } - - static getRootAsyncContext() { - if (typeof rootAsyncFrame !== "undefined") { - return rootAsyncFrame; - } - - rootAsyncFrame = new AsyncContextFrame(null, null, true); - return rootAsyncFrame; - } - - static current() { - if (asyncContextStack.length === 0) { - return AsyncContextFrame.getRootAsyncContext(); - } - - return asyncContextStack[asyncContextStack.length - 1]; - } - - static create( - maybeParent?: AsyncContextFrame | null, - maybeStorageEntry?: StorageEntry | null, - ) { - return new AsyncContextFrame(maybeParent, maybeStorageEntry); - } - - static wrap( - fn: () => unknown, - maybeFrame: AsyncContextFrame | undefined, - // deno-lint-ignore no-explicit-any - thisArg: any, - ) { - // deno-lint-ignore no-explicit-any - return (...args: any) => { - const frame = maybeFrame || AsyncContextFrame.current(); - Scope.enter(frame); - try { - return fn.apply(thisArg, args); - } finally { - Scope.exit(); - } - }; - } - - get(key: StorageKey) { - assert(!key.isDead()); - this.storage = this.storage.filter((entry) => !entry.key.isDead()); - const entry = this.storage.find((entry) => entry.key === key); - if (entry) { - return entry.value; - } - return undefined; - } - - isRoot() { - return AsyncContextFrame.getRootAsyncContext() == this; - } -} +const { + AsyncVariable, + getAsyncContext, + setAsyncContext, +} = core; export class AsyncResource { - frame: AsyncContextFrame; type: string; + #snapshot: unknown; #asyncId: number; constructor(type: string) { this.type = type; - this.frame = AsyncContextFrame.current(); + this.#snapshot = getAsyncContext(); this.#asyncId = newAsyncId(); } @@ -198,35 +42,38 @@ export class AsyncResource { thisArg: unknown, ...args: unknown[] ) { - Scope.enter(this.frame); - + const previousContext = getAsyncContext(); try { - return fn.apply(thisArg, args); + setAsyncContext(this.#snapshot); + return ReflectApply(fn, thisArg, args); } finally { - Scope.exit(); + setAsyncContext(previousContext); } } emitDestroy() {} - bind(fn: (...args: unknown[]) => unknown, thisArg = this) { + bind(fn: (...args: unknown[]) => unknown, thisArg) { validateFunction(fn, "fn"); - const frame = AsyncContextFrame.current(); - const bound = AsyncContextFrame.wrap(fn, frame, thisArg); - - Object.defineProperties(bound, { + let bound; + if (thisArg === undefined) { + // deno-lint-ignore no-this-alias + const resource = this; + bound = function (...args) { + ArrayPrototypeUnshift(args, fn, this); + return ReflectApply(resource.runInAsyncScope, resource, args); + }; + } else { + bound = FunctionPrototypeBind(this.runInAsyncScope, this, fn, thisArg); + } + ObjectDefineProperties(bound, { "length": { + __proto__: null, configurable: true, enumerable: false, value: fn.length, writable: false, }, - "asyncResource": { - configurable: true, - enumerable: true, - value: this, - writable: true, - }, }); return bound; } @@ -236,95 +83,54 @@ export class AsyncResource { type?: string, thisArg?: AsyncResource, ) { - type = type || fn.name; - return (new AsyncResource(type || "AsyncResource")).bind(fn, thisArg); + type = type || fn.name || "bound-anonymous-fn"; + return (new AsyncResource(type)).bind(fn, thisArg); } } -class Scope { - static enter(maybeFrame?: AsyncContextFrame) { - if (maybeFrame) { - pushAsyncFrame(maybeFrame); - } else { - pushAsyncFrame(AsyncContextFrame.getRootAsyncContext()); - } - } - - static exit() { - popAsyncFrame(); - } -} - -class StorageEntry { - key: StorageKey; - value: unknown; - constructor(key: StorageKey, value: unknown) { - this.key = key; - this.value = value; - } - - clone() { - return new StorageEntry(this.key, this.value); - } -} - -class StorageKey { - #dead = false; - - reset() { - this.#dead = true; - } - - isDead() { - return this.#dead; - } -} - -const fnReg = new FinalizationRegistry((key: StorageKey) => { - key.reset(); -}); - export class AsyncLocalStorage { - #key; - - constructor() { - this.#key = new StorageKey(); - fnReg.register(this, this.#key); - } + #variable = new AsyncVariable(); + enabled = false; // deno-lint-ignore no-explicit-any run(store: any, callback: any, ...args: any[]): any { - const frame = AsyncContextFrame.create( - null, - new StorageEntry(this.#key, store), - ); - Scope.enter(frame); - let res; + this.enabled = true; + const previous = this.#variable.enter(store); try { - res = callback(...args); + return ReflectApply(callback, null, args); } finally { - Scope.exit(); + setAsyncContext(previous); } - return res; } // deno-lint-ignore no-explicit-any exit(callback: (...args: unknown[]) => any, ...args: any[]): any { - return this.run(undefined, callback, args); + if (!this.enabled) { + return ReflectApply(callback, null, args); + } + this.enabled = false; + try { + return ReflectApply(callback, null, args); + } finally { + this.enabled = true; + } } // deno-lint-ignore no-explicit-any getStore(): any { - const currentFrame = AsyncContextFrame.current(); - return currentFrame.get(this.#key); + if (!this.enabled) { + return undefined; + } + return this.#variable.get(); } enterWith(store: unknown) { - const frame = AsyncContextFrame.create( - null, - new StorageEntry(this.#key, store), - ); - Scope.enter(frame); + this.enabled = true; + this.#variable.enter(store); + } + + disable() { + this.enabled = false; } static bind(fn: (...args: unknown[]) => unknown) { @@ -335,14 +141,24 @@ export class AsyncLocalStorage { return AsyncLocalStorage.bind(( cb: (...args: unknown[]) => unknown, ...args: unknown[] - ) => cb(...args)); + ) => ReflectApply(cb, null, args)); } } export function executionAsyncId() { - return 1; + return 0; } +export function triggerAsyncId() { + return 0; +} + +export function executionAsyncResource() { + return {}; +} + +export const asyncWrapProviders = ObjectFreeze({ __proto__: null }); + class AsyncHook { enable() { } @@ -355,12 +171,12 @@ export function createHook() { return new AsyncHook(); } -// Placing all exports down here because the exported classes won't export -// otherwise. export default { - // Embedder API - AsyncResource, - executionAsyncId, - createHook, AsyncLocalStorage, + createHook, + executionAsyncId, + triggerAsyncId, + executionAsyncResource, + asyncWrapProviders, + AsyncResource, }; diff --git a/ext/web/02_timers.js b/ext/web/02_timers.js index 5591478619..89acaca42b 100644 --- a/ext/web/02_timers.js +++ b/ext/web/02_timers.js @@ -11,6 +11,10 @@ const { indirectEval, ReflectApply, } = primordials; +const { + getAsyncContext, + setAsyncContext, +} = core; import * as webidl from "ext:deno_webidl/00_webidl.js"; @@ -33,14 +37,16 @@ function checkThis(thisArg) { * Call a callback function immediately. */ function setImmediate(callback, ...args) { - if (args.length > 0) { - const unboundCallback = callback; - callback = () => ReflectApply(unboundCallback, globalThis, args); - } - - return core.queueImmediate( - callback, - ); + const asyncContext = getAsyncContext(); + return core.queueImmediate(() => { + const oldContext = getAsyncContext(); + try { + setAsyncContext(asyncContext); + return ReflectApply(callback, globalThis, args); + } finally { + setAsyncContext(oldContext); + } + }); } /** @@ -53,10 +59,17 @@ function setTimeout(callback, timeout = 0, ...args) { const unboundCallback = webidl.converters.DOMString(callback); callback = () => indirectEval(unboundCallback); } - if (args.length > 0) { - const unboundCallback = callback; - callback = () => ReflectApply(unboundCallback, globalThis, args); - } + const unboundCallback = callback; + const asyncContext = getAsyncContext(); + callback = () => { + const oldContext = getAsyncContext(); + try { + setAsyncContext(asyncContext); + ReflectApply(unboundCallback, globalThis, args); + } finally { + setAsyncContext(oldContext); + } + }; timeout = webidl.converters.long(timeout); return core.queueUserTimer( core.getTimerDepth() + 1, @@ -75,10 +88,17 @@ function setInterval(callback, timeout = 0, ...args) { const unboundCallback = webidl.converters.DOMString(callback); callback = () => indirectEval(unboundCallback); } - if (args.length > 0) { - const unboundCallback = callback; - callback = () => ReflectApply(unboundCallback, globalThis, args); - } + const unboundCallback = callback; + const asyncContext = getAsyncContext(); + callback = () => { + const oldContext = getAsyncContext(asyncContext); + try { + setAsyncContext(asyncContext); + ReflectApply(unboundCallback, globalThis, args); + } finally { + setAsyncContext(oldContext); + } + }; timeout = webidl.converters.long(timeout); return core.queueUserTimer( core.getTimerDepth() + 1, diff --git a/tests/unit_node/async_hooks_test.ts b/tests/unit_node/async_hooks_test.ts index f153f67532..91130972c5 100644 --- a/tests/unit_node/async_hooks_test.ts +++ b/tests/unit_node/async_hooks_test.ts @@ -1,5 +1,7 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. import { AsyncLocalStorage, AsyncResource } from "node:async_hooks"; +import process from "node:process"; +import { setImmediate } from "node:timers"; import { assert, assertEquals } from "@std/assert"; Deno.test(async function foo() { @@ -92,7 +94,7 @@ Deno.test(async function enterWith() { }); assertEquals(await deferred.promise, { x: 2 }); - assertEquals(await deferred1.promise, { x: 1 }); + assertEquals(await deferred1.promise, null); }); Deno.test(async function snapshot() { @@ -135,3 +137,26 @@ Deno.test(function emitDestroyStub() { const resource = new AsyncResource("foo"); assert(typeof resource.emitDestroy === "function"); }); + +Deno.test(async function worksWithAsyncAPIs() { + const store = new AsyncLocalStorage(); + const test = () => assertEquals(store.getStore(), "data"); + await store.run("data", async () => { + test(); + queueMicrotask(() => test()); + process.nextTick(() => test()); + setImmediate(() => test()); + setTimeout(() => test(), 0); + const intervalId = setInterval(() => { + test(); + clearInterval(intervalId); + }, 0); + + store.run("data2", () => { + assertEquals(store.getStore(), "data2"); + }); + + await new Promise((r) => setTimeout(r, 50)); + test(); + }); +});