深入理解Node.js的Async hooks
创始人
2025-05-03 11:41:43
0

 [[417016]]

虽然Async hooks至此还是实验性API,但是他的确可以解决应用中的一些问题,比如日志和调用栈跟踪。本文从应用和原理方便介绍一下Node.js的Async hooks。

1 env中的AsyncHooks

在Node.js的env对象中有一个AsyncHooks对象,负责Node.js进程中async_hooks的管理。我们看一下定义。

1.1 类定义

  1. class AsyncHooks : public MemoryRetainer { 
  2.  public: 
  3.  
  4.   enum Fields { 
  5.     // 五种钩子 
  6.     kInit, 
  7.     kBefore, 
  8.     kAfter, 
  9.     kDestroy, 
  10.     kPromiseResolve, 
  11.     // 钩子总数 
  12.     kTotals, 
  13.     // async_hooks开启的个数 
  14.     kCheck, 
  15.     // 记录栈的top指针 
  16.     kStackLength, 
  17.     // 数组大小 
  18.     kFieldsCount, 
  19.   }; 
  20.  
  21.   enum UidFields { 
  22.     kExecutionAsyncId, 
  23.     kTriggerAsyncId, 
  24.     // 当前async id的值 
  25.     kAsyncIdCounter, 
  26.     kDefaultTriggerAsyncId, 
  27.     kUidFieldsCount, 
  28.   }; 
  29.  
  30.  private: 
  31.   inline AsyncHooks(); 
  32.   // 异步资源的类型 
  33.   std::array, AsyncWrap::PROVIDERS_LENGTH> providers_; 
  34.   // 栈 
  35.   AliasedFloat64Array async_ids_stack_; 
  36.   // 整形数组,每个元素值的意义和Fields对应 
  37.   AliasedUint32Array fields_; 
  38.   // 整形数组,每个元素值的意义和UidFields对应 
  39.   AliasedFloat64Array async_id_fields_; 
  40. }; 

结构图如下

接下来看一下env的AsyncHooks对象提供了哪些API,这些API是上层的基础。

1.2 读API

我们看一下env对象中获取AsyncHooks对象对应字段的API。

  1. // 获取对应的字段 
  2. inline AliasedUint32Array& AsyncHooks::fields() { 
  3.   return fields_; 
  4.  
  5. inline AliasedFloat64Array& AsyncHooks::async_id_fields() { 
  6.   return async_id_fields_; 
  7.  
  8. inline AliasedFloat64Array& AsyncHooks::async_ids_stack() { 
  9.   return async_ids_stack_; 
  10.  
  11. // 获取资源类型 
  12. inline v8::Local AsyncHooks::provider_string(int idx) { 
  13.   return providers_[idx].Get(env()->isolate()); 
  14.  
  15. // 新建资源的时候,获取新的async id 
  16. inline double Environment::new_async_id() { 
  17.   async_hooks()->async_id_fields()[AsyncHooks::kAsyncIdCounter] += 1; 
  18.   return async_hooks()->async_id_fields()[AsyncHooks::kAsyncIdCounter]; 
  19.  
  20. // 获取当前async id 
  21. inline double Environment::execution_async_id() { 
  22.   return async_hooks()->async_id_fields()[AsyncHooks::kExecutionAsyncId]; 
  23.  
  24. // 获取当前trigger async id 
  25. inline double Environment::trigger_async_id() { 
  26.   return async_hooks()->async_id_fields()[AsyncHooks::kTriggerAsyncId]; 
  27.  
  28. // 获取默认的trigger async id,如果没有设置,则获取当前的async id 
  29. inline double Environment::get_default_trigger_async_id() { 
  30.   double default_trigger_async_id = async_hooks()->async_id_fields()[AsyncHooks::kDefaultTriggerAsyncId]; 
  31.   // If defaultTriggerAsyncId isn't set, use the executionAsyncId 
  32.   if (default_trigger_async_id < 0) 
  33.     default_trigger_async_id = execution_async_id(); 
  34.   return default_trigger_async_id; 

1.3 写API

  1. inline void AsyncHooks::push_async_ids(double async_id, 
  2.                                        double trigger_async_id) { 
  3.   // 获取当前栈顶指针 
  4.   uint32_t offset = fields_[kStackLength]; 
  5.   // 不够则扩容 
  6.   if (offset * 2 >= async_ids_stack_.Length()) 
  7.     grow_async_ids_stack(); 
  8.   // 把旧的上下文压栈   
  9.   async_ids_stack_[2 * offset] = async_id_fields_[kExecutionAsyncId]; 
  10.   async_ids_stack_[2 * offset + 1] = async_id_fields_[kTriggerAsyncId]; 
  11.   // 栈指针加一 
  12.   fields_[kStackLength] += 1; 
  13.   // 记录当前上下文 
  14.   async_id_fields_[kExecutionAsyncId] = async_id; 
  15.   async_id_fields_[kTriggerAsyncId] = trigger_async_id; 
  16.  
  17. // 和上面的逻辑相反 
  18. inline bool AsyncHooks::pop_async_id(double async_id) { 
  19.  
  20.   if (fields_[kStackLength] == 0) return false; 
  21.   uint32_t offset = fields_[kStackLength] - 1; 
  22.   async_id_fields_[kExecutionAsyncId] = async_ids_stack_[2 * offset]; 
  23.   async_id_fields_[kTriggerAsyncId] = async_ids_stack_[2 * offset + 1]; 
  24.   fields_[kStackLength] = offset; 
  25.  
  26.   return fields_[kStackLength] > 0; 

2 底层资源封装类 - AsyncWrap

接着看一下异步资源的基类AsyncWrap。所有依赖于C、C++层实现的资源(比如TCP、UDP)都会继承AsyncWrap。看看该类的定义。

  1. class AsyncWrap : public BaseObject { 
  2.  private: 
  3.   ProviderType provider_type_ = PROVIDER_NONE; 
  4.   double async_id_ = kInvalidAsyncId; 
  5.   double trigger_async_id_; 
  6. }; 

我们看到每个AsyncWrap对象都有async_id_、trigger_async_id_和provider_type_属性,这正是在init回调里拿到的数据。我们看看AsyncWrap的构造函数。接下来看一下新建一个资源(AsyncWrap)时的逻辑。

2.1 资源初始化

  1. AsyncWrap::AsyncWrap(Environment* env, 
  2.                      Local object, 
  3.                      ProviderType provider, 
  4.                      double execution_async_id, 
  5.                      bool silent) 
  6.     : AsyncWrap(env, object) { 
  7.   // 资源类型 
  8.   provider_type_ = provider; 
  9.   AsyncReset(execution_async_id, silent); 
  10.  
  11. void AsyncWrap::AsyncReset(Local resource, double execution_async_id, 
  12.                            bool silent) { 
  13.   // 获取一个新的async id,execution_async_id默认是kInvalidAsyncId 
  14.   async_id_ = execution_async_id == kInvalidAsyncId ? env()->new_async_id() 
  15.                                                      : execution_async_id; 
  16.   // 获取trigger async id                                                    
  17.   trigger_async_id_ = env()->get_default_trigger_async_id(); 
  18.   // 执行init钩子 
  19.   EmitAsyncInit(env(), resource, 
  20.                 env()->async_hooks()->provider_string(provider_type()), 
  21.                 async_id_, trigger_async_id_); 
  22. 接着看EmitAsyncInit

    1. void AsyncWrap::EmitAsyncInit(Environment* env, 
    2.                               Local object, 
    3.                               Local type, 
    4.                               double async_id, 
    5.                               double trigger_async_id) { 
    6.   AsyncHooks* async_hooks = env->async_hooks(); 
    7.   HandleScope scope(env->isolate()); 
    8.   Local init_fn = env->async_hooks_init_function(); 
    9.  
    10.   Local argv[] = { 
    11.     Number::New(env->isolate(), async_id), 
    12.     type, 
    13.     Number::New(env->isolate(), trigger_async_id), 
    14.     object, 
    15.   }; 
    16.  
    17.   TryCatchScope try_catch(env, TryCatchScope::CatchMode::kFatal); 
    18.   // 执行init回调 
    19.   USE(init_fn->Call(env->context(), object, arraysize(argv), argv)); 
    20. 那么env->async_hooks_init_function()的值是什么呢?这是在Node.js初始化时设置的。

      1. const { nativeHooks } = require('internal/async_hooks'); 
      2. internalBinding('async_wrap').setupHooks(nativeHooks); 

      SetupHooks的实现如下

      1. static void SetupHooks(const FunctionCallbackInfo& args) { 
      2.  
      3.   Environment* env = Environment::GetCurrent(args); 
      4.   Local fn_obj = args[0].As();#define SET_HOOK_FN(name)                                                      \ 
      5.   do {                                                                         \ 
      6.     Local v =                                                           \ 
      7.         fn_obj->Get(env->context(),                                            \ 
      8.                     FIXED_ONE_BYTE_STRING(env->isolate(), #name))              \ 
      9.             .ToLocalChecked();                                                 \ 
      10.     CHECK(v->IsFunction());                                                    \ 
      11.     env->set_async_hooks_##name##_function(v.As());                  \ 
      12.   } while (0) 
      13.   // 保存到env中 
      14.   SET_HOOK_FN(init); 
      15.   SET_HOOK_FN(before); 
      16.   SET_HOOK_FN(after); 
      17.   SET_HOOK_FN(destroy); 
      18.   SET_HOOK_FN(promise_resolve);#undef SET_HOOK_FN 
      19. nativeHooks的实现如下

        1. nativeHooks: { 
        2.   init: emitInitNative, 
        3.   before: emitBeforeNative, 
        4.   after: emitAfterNative, 
        5.   destroy: emitDestroyNative, 
        6.   promise_resolve: emitPromiseResolveNative 

        这些Hooks会执行对应的回调,比如emitInitNative

        1. function emitInitNative(asyncId, type, triggerAsyncId, resource) { 
        2.   for (var i = 0; i < active_hooks.array.length; i++) { 
        3.       if (typeof active_hooks.array[i][init_symbol] === 'function') { 
        4.         active_hooks.array[i][init_symbol]( 
        5.           asyncId, type, triggerAsyncId, 
        6.           resource 
        7.         ); 
        8.       } 
        9.   } 

        active_hooks.array的值就是我们在业务代码里设置的钩子,每次调研createHooks的时候就对应数组的一个元素。

        2.2 执行资源回调

        当业务代码异步请求底层API,并且底层满足条件时,就会执行上层的回调,比如监听一个socket时,有连接到来。Node.js就会调用MakeCallback函数执行回调。

        1. MaybeLocal AsyncWrap::MakeCallback(const Local cb, 
        2.                                           int argc, 
        3.                                           Local* argv) { 
        4.   // 当前AsyncWrap对象对应的执行上下文                              
        5.   ProviderType provider = provider_type(); 
        6.   async_context context { get_async_id(), get_trigger_async_id() }; 
        7.   MaybeLocal ret = InternalMakeCallback(env(), object(), cb, argc, argv, context); 
        8.  
        9.   return ret; 

        MaybeLocal InternalMakeCallback(Environment* env,

        1. MaybeLocal InternalMakeCallback(Environment* env, 
        2.                                        Local recv, 
        3.                                        const Local callback, 
        4.                                        int argc, 
        5.                                        Local argv[], 
        6.                                        async_context asyncContext) { 
        7.   // 新建一个scope                                      
        8.   InternalCallbackScope scope(env, recv, asyncContext); 
        9.   // 执行回调 
        10.   callback->Call(env->context(), recv, argc, argv); 
        11.   // 关闭scope 
        12.   scope.Close(); 
        13. 我们看看新建和关闭scope都做了什么事情。

          1. InternalCallbackScope::InternalCallbackScope(Environment* env, 
          2.                                              Local object, 
          3.                                              const async_context& asyncContext, 
          4.                                              int flags) 
          5.   : env_(env), 
          6.     async_context_(asyncContext), 
          7.     object_(object), 
          8.     skip_hooks_(flags & kSkipAsyncHooks), 
          9.     skip_task_queues_(flags & kSkipTaskQueues) { 
          10.   // v14版本中,是先触发before再push上下文,顺序是不对的,v16已经改过来。 
          11.   // 当前执行上下文入栈 
          12.   env->async_hooks()->push_async_ids(async_context_.async_id, 
          13.                                async_context_.trigger_async_id); 
          14.   // 触发before钩子 
          15.   if (asyncContext.async_id != 0 && !skip_hooks_) { 
          16.     AsyncWrap::EmitBefore(env, asyncContext.async_id); 
          17.   } 
          18.  
          19.   pushed_ids_ = true; 
          20. 在scope里会把当前AsyncWrap对象的执行上下文作为当前执行上下文,并且触发before钩子,然后执行业务回调,所以我们在回调里获取当前执行上下文时就拿到了AsyncWrap对应的值( 调用executionAsyncId),接着看Close

            1. void InternalCallbackScope::Close() { 
            2.   // 执行 
            3.   if (pushed_ids_) 
            4.     env_->async_hooks()->pop_async_id(async_context_.async_id); 
            5.  
            6.   if (async_context_.async_id != 0 && !skip_hooks_) { 
            7.     AsyncWrap::EmitAfter(env_, async_context_.async_id); 
            8.   } 

            Close在执行回调后被调用,主要是恢复当前执行上下文并且触发after钩子。

            3 上层资源的封装 - Timeout、TickObjecd等

            并不是所有的异步资源都是底层实现的,比如定时器,tick也被定义为异步资源,因为他们都是和回调相关。这种异步资源是在JS层实现的,这里只分析Timeout。

            3.1 创建资源

            我们看一下执行setTimeout时的核心逻辑。

            1. function setTimeout(callback, after, arg1, arg2, arg3) { 
            2.   const timeout = new Timeout(callback, after, args, false, true); 
            3.   return timeout; 
            4.  
            5. function Timeout(callback, after, args, isRepeat, isRefed) { 
            6.   initAsyncResource(this, 'Timeout'); 
            7.  
            8. function initAsyncResource(resource, type) { 
            9.   // 获取新的async id 
            10.   const asyncId = resource[async_id_symbol] = newAsyncId(); 
            11.   const triggerAsyncId = resource[trigger_async_id_symbol] = getDefaultTriggerAsyncId(); 
            12.   // 是否设置了init钩子,是则触发回调 
            13.   if (initHooksExist()) 
            14.     emitInit(asyncId, type, triggerAsyncId, resource); 

            执行setTimeout时,Node.js会创建一个Timeout对象,设置async_hooks相关的上下文并记录到Timeout对象中。然后触发init钩子。

            1. function emitInitScript(asyncId, type, triggerAsyncId, resource) { 
            2.   emitInitNative(asyncId, type, triggerAsyncId, resource); 

            以上代码会执行每个async_hooks对象的init回调(通常我们只有一个async_hooks对象)。

            3.2执行回调

            当定时器到期时,会执行回调,我们看看相关的逻辑。

            1. // 触发before钩子 
            2. emitBefore(asyncId, timer[trigger_async_id_symbol]); 
            3. // 执行回调 
            4. timer._onTimeout(); 
            5. // 触发after回调emitAfter(asyncId); 

            我们看到执行超时回调的前后会触发对应的钩子。

            1. function emitBeforeScript(asyncId, triggerAsyncId) { 
            2.   // 和底层的push_async_ids逻辑一样 
            3.   pushAsyncIds(asyncId, triggerAsyncId); 
            4.   // 如果有回调则执行 
            5.   if (async_hook_fields[kBefore] > 0) 
            6.     emitBeforeNative(asyncId); 
            7.  
            8. function emitAfterScript(asyncId) { 
            9.   // 设置了after回调则emit 
            10.   if (async_hook_fields[kAfter] > 0) 
            11.     emitAfterNative(asyncId); 
            12.   // 和底层的pop_async_ids逻辑一样 
            13.   popAsyncIds(asyncId); 

            JS层的实现和底层是保持一致的。如果我们在setTimeout回调里新建一个资源,比如再次执行setTimeout,这时候trigger async id就是第一个setTimeout对应的async id,所以就连起来了,后面我们会看到具体的例子。

            4 DefaultTriggerAsyncIdScope

            Node.js为了避免过多通过参数传递的方式传递async id,就设计了DefaultTriggerAsyncIdScope。DefaultTriggerAsyncIdScope的作用类似在多个函数外维护一个变量,多个函数都可以通过DefaultTriggerAsyncIdScope获得trigger async id,而不需要通过层层传递的方式,他的实现非常简单。

            1. class DefaultTriggerAsyncIdScope { 
            2.    private: 
            3.     AsyncHooks* async_hooks_; 
            4.     double old_default_trigger_async_id_; 
            5. }; 
            6.  
            7. inline AsyncHooks::DefaultTriggerAsyncIdScope ::DefaultTriggerAsyncIdScope( 
            8.     Environment* env, double default_trigger_async_id) 
            9.     : async_hooks_(env->async_hooks()) { 
            10.  
            11.   // 记录旧的id,设置新的id 
            12.   old_default_trigger_async_id_ = 
            13.     async_hooks_->async_id_fields()[AsyncHooks::kDefaultTriggerAsyncId]; 
            14.   async_hooks_->async_id_fields()[AsyncHooks::kDefaultTriggerAsyncId] = 
            15.     default_trigger_async_id; 
            16.  
            17. // 恢复 
            18. inline AsyncHooks::DefaultTriggerAsyncIdScope ::~DefaultTriggerAsyncIdScope() { 
            19.   async_hooks_->async_id_fields()[AsyncHooks::kDefaultTriggerAsyncId] = 
            20.     old_default_trigger_async_id_; 

            DefaultTriggerAsyncIdScope主要是记录旧的id,然后把新的id设置到env中,当其他函数调用get_default_trigger_async_id时就可以获取设置的async id。同样JS层也实现了类似的API。

            1. function defaultTriggerAsyncIdScope(triggerAsyncId, block, ...args) { 
            2.   const oldDefaultTriggerAsyncId = async_id_fields[kDefaultTriggerAsyncId]; 
            3.   async_id_fields[kDefaultTriggerAsyncId] = triggerAsyncId; 
            4.  
            5.   try { 
            6.     return block(...args); 
            7.   } finally { 
            8.     async_id_fields[kDefaultTriggerAsyncId] = oldDefaultTriggerAsyncId; 
            9.   } 

            在执行block函数时,可以获取到设置的值,而不需要传递,执行完block后恢复。我们看看如何使用。下面摘自net模块的代码。

            1. // 获取handle里的async id 
            2. this[async_id_symbol] = getNewAsyncId(this._handle); 
            3. defaultTriggerAsyncIdScope(this[async_id_symbol], 
            4.                              process.nextTick, 
            5.                              emitListeningNT, 
            6.                              this); 

            我们看一下这里具体的情况。在defaultTriggerAsyncIdScope中会以emitListeningNT为入参执行process.nextTick。我们看看nextTick的实现。

            1. function nextTick(callback) { 
            2.   // 获取新的async id 
            3.   const asyncId = newAsyncId(); 
            4.   // 获取默认的trigger async id,即刚才设置的 
            5.   const triggerAsyncId = getDefaultTriggerAsyncId(); 
            6.   const tickObject = { 
            7.     [async_id_symbol]: asyncId, 
            8.     [trigger_async_id_symbol]: triggerAsyncId, 
            9.     callback, 
            10.     args 
            11.   }; 
            12.   if (initHooksExist()) 
            13.     // 创建了新的资源,触发init钩子 
            14.     emitInit(asyncId, 'TickObject', triggerAsyncId, tickObject); 
            15.   queue.push(tickObject); 

            我们看到在nextTick中通过getDefaultTriggerAsyncId拿到了trigger async id。

            1. function getDefaultTriggerAsyncId() { 
            2.   const defaultTriggerAsyncId = async_id_fields[kDefaultTriggerAsyncId]; 
            3.   if (defaultTriggerAsyncId < 0) 
            4.     return async_id_fields[kExecutionAsyncId]; 
            5.   return defaultTriggerAsyncId; 

            getDefaultTriggerAsyncId返回的就是刚才通过defaultTriggerAsyncIdScope设置的async id。所以在触发TickObject的init钩子时用户就可以拿到对应的id。不过更重要的时,在异步执行nextTick的任务时,还可以拿到原始的trigger async id。因为该id记录在tickObject中。我们看看执行tick任务时的逻辑。

            1. function processTicksAndRejections() { 
            2.   let tock; 
            3.   do { 
            4.     while (tock = queue.shift()) { 
            5.       // 拿到对应的async 上下文 
            6.       const asyncId = tock[async_id_symbol]; 
            7.       emitBefore(asyncId, tock[trigger_async_id_symbol]); 
            8.       try { 
            9.         const callback = tock.callback; 
            10.         callback(); 
            11.       } finally { 
            12.         if (destroyHooksExist()) 
            13.           emitDestroy(asyncId); 
            14.       } 
            15.       emitAfter(asyncId); 
            16.     } 
            17.   } while (!queue.isEmpty() || processPromiseRejections()); 

            5 资源销毁

            资源销毁的时候也会触发对应的钩子,不过不同的是这个钩子是异步触发的。无论是JS还是好C++层触发销毁钩子的时候,逻辑都是一致的。

            1. void AsyncWrap::EmitDestroy(Environment* env, double async_id) { 
            2.   // 之前为空则设置回调 
            3.   if (env->destroy_async_id_list()->empty()) { 
            4.     env->SetUnrefImmediate(&DestroyAsyncIdsCallback); 
            5.   } 
            6.   // async id入队 
            7.   env->destroy_async_id_list()->push_back(async_id); 
            8.  
            9. template void Environment::SetUnrefImmediate(Fn&& cb) { 
            10.   CreateImmediate(std::move(cb), false); 
            11.  
            12. template void Environment::CreateImmediate(Fn&& cb, bool ref) { 
            13.   auto callback = std::make_unique>( 
            14.       std::move(cb), ref); 
            15.   // 加入任务队列     
            16.   native_immediates_.Push(std::move(callback)); 

            在事件循环的check阶段就会执行里面的任务,从而执行回调DestroyAsyncIdsCallback。

            1. void AsyncWrap::DestroyAsyncIdsCallback(Environment* env) { 
            2.   Local fn = env->async_hooks_destroy_function(); 
            3.   do { 
            4.     std::vector destroy_async_id_list; 
            5.     destroy_async_id_list.swap(*env->destroy_async_id_list()); 
            6.     // 遍历销毁的async id 
            7.     for (auto async_id : destroy_async_id_list) { 
            8.       HandleScope scope(env->isolate()); 
            9.       Local async_id_value = Number::New(env->isolate(), async_id); 
            10.       // 执行JS层回调 
            11.       MaybeLocal ret = fn->Call(env->context(), Undefined(env->isolate()), 1, &async_id_value); 
            12.     } 
            13.   } while (!env->destroy_async_id_list()->empty()); 

            6 Async hooks的使用

            我们通常以以下方式使用Async hooks

            1. const async_hooks = require('async_hooks'); 
            2. async_hooks.createHook({ 
            3.   init(asyncId, type, triggerAsyncId) {}, 
            4.   before(asyncId) {}, 
            5.   after(asyncId) {}, 
            6.   destroy(asyncId) {}, 
            7.   promiseResolve(asyncId), 
            8. }).enable(); 

            async_hooks是对资源生命周期的抽象,资源就是操作对象和回调的抽象。async_hooks定义了五个生命周期钩子,当资源的状态到达某个周期节点时,async_hooks就会触发对应的钩子。下面我们看一下具体的实现。我们首先看一下createHook。

            1. function createHook(fns) { 
            2.   return new AsyncHook(fns); 

            createHook是对AsyncHook的封装

            1. class AsyncHook { 
            2.   constructor({ init, before, after, destroy, promiseResolve }) { 
            3.     // 记录回调 
            4.     this[init_symbol] = init; 
            5.     this[before_symbol] = before; 
            6.     this[after_symbol] = after; 
            7.     this[destroy_symbol] = destroy; 
            8.     this[promise_resolve_symbol] = promiseResolve; 
            9.   } 

            AsyncHook的初始化很简单,创建一个AsyncHook对象记录回调函数。创建了AsyncHook之后,我们需要调用AsyncHook的enable函数手动开启。

            1. class AsyncHook { 
            2.   enable() { 
            3.     // 获取一个AsyncHook对象数组和一个整形数组 
            4.     const [hooks_array, hook_fields] = getHookArrays(); 
            5.     // 执行过enable了则不需要再执行 
            6.     if (hooks_array.includes(this)) 
            7.       return this; 
            8.     // 做些统计 
            9.     const prev_kTotals = hook_fields[kTotals]; 
            10.     hook_fields[kTotals] = hook_fields[kInit] += +!!this[init_symbol]; 
            11.     hook_fields[kTotals] += hook_fields[kBefore] += +!!this[before_symbol]; 
            12.     hook_fields[kTotals] += hook_fields[kAfter] += +!!this[after_symbol]; 
            13.     hook_fields[kTotals] += hook_fields[kDestroy] += +!!this[destroy_symbol]; 
            14.     hook_fields[kTotals] += 
            15.         hook_fields[kPromiseResolve] += +!!this[promise_resolve_symbol]; 
            16.     // 当前对象插入数组中 
            17.     hooks_array.push(this); 
            18.     // 如果之前的数量是0,本次操作后大于0则开启底层的逻辑 
            19.     if (prev_kTotals === 0 && hook_fields[kTotals] > 0) { 
            20.       enableHooks(); 
            21.     } 
            22.  
            23.     return this; 
            24.   } 

            1 hooks_array:是一个AsyncHook对象数组,主要用于记录用户创建了哪些AsyncHook对象,然后哪些AsyncHook对象里都设置了哪些钩子,在回调的时候就会遍历这个对象数组,执行里面的回调。

            2 hook_fields:对应底层的async_hook_fields。

            3 enableHooks:

            1. function enableHooks() { 
            2.   // 记录async_hooks的开启个数 
            3.   async_hook_fields[kCheck] += 1; 

            至此,async_hooks的初始化就完成了,我们发现逻辑非常简单。下面我们看一下他是如何串起来的。下面我们以TCP模块为例。

            1. const { createHook, executionAsyncId } = require('async_hooks'); 
            2. const fs = require('fs'); 
            3. const net = require('net'); 
            4. createHook({ 
            5.   init(asyncId, type, triggerAsyncId) { 
            6.     fs.writeSync( 
            7.       1, 
            8.       `${type}(${asyncId}): trigger: ${triggerAsyncId} execution: ${executionAsyncId()}\n`); 
            9.   }}).enable(); 
            10.  
            11. net.createServer((conn) => {}).listen(8080); 

            以上代码输出

            1. init: type: TCPSERVERWRAP asyncId: 2 trigger id: 1 executionAsyncId(): 1 triggerAsyncId(): 0 
            2. init: type: TickObject asyncId: 3 trigger id: 2 executionAsyncId(): 1 triggerAsyncId(): 0 
            3. before: asyncId: 3 executionAsyncId(): 3 triggerAsyncId(): 2 
            4. after: asyncId: 3 executionAsyncId(): 3 triggerAsyncId(): 2 

            下面我们来分析具体过程。我们知道创建资源的时候会执行init回调,具体逻辑在listen函数中,在listen函数中,通过层层调用会执行new TCP新建一个对象,表示服务器。TCP是C++层导出的类,刚才我们说过,TCP会继承AsyncWrap,新建AsyncWrap对象的时候会触发init钩子,结构图如下。

            对应输出

            1. init: type: TCPSERVERWRAP asyncId: 2 trigger id: 1 executionAsyncId(): 1 triggerAsyncId(): 0 

            那TickObject是怎么来的呢?我们接着看listen里的另一段逻辑。

            1. this[async_id_symbol] = getNewAsyncId(this._handle); 
            2. defaultTriggerAsyncIdScope(this[async_id_symbol], 
            3.                            process.nextTick, 
            4.                            emitListeningNT, 
            5.                            this); 

            上面的代码我们刚才已经分析过,在执行process.nextTick的时候会创建一个TickObject对象封装执行上下文和回调。

            1. const asyncId = newAsyncId(); 
            2. const triggerAsyncId = getDefaultTriggerAsyncId(); 
            3. const tickObject = { 
            4.   [async_id_symbol]: asyncId, 
            5.   [trigger_async_id_symbol]: triggerAsyncId, 
            6.   callback, 
            7.   args 
            8. }; 
            9. emitInit(asyncId, 'TickObject', triggerAsyncId, tickObject); 

            emitInit(asyncId, 'TickObject', triggerAsyncId, tickObject);

            这次再次触发了init钩子,结构如下(nextTick通过getDefaultTriggerAsyncId获取的id是defaultTriggerAsyncIdScope设置的id)。

            对应输出

            1. init: type: TickObject asyncId: 3 trigger id: 2 executionAsyncId(): 1 triggerAsyncId(): 0 

            接着执行tick任务。

            1. const asyncId = tock[async_id_symbol]; 
            2. emitBefore(asyncId, tock[trigger_async_id_symbol]); 
            3. try { 
            4.   tock.callback(); 
            5. } finally { 
            6.   if (destroyHooksExist()) 
            7.     emitDestroy(asyncId); 
            8. emitAfter(asyncId); 

            emitBefore时,结构图如下。

            对应输出

            1. before: asyncId: 3 executionAsyncId(): 3 triggerAsyncId(): 2 
            2. after: asyncId: 3 executionAsyncId(): 3 triggerAsyncId(): 2 

            执行完我们的JS代码后,所有入栈的上下文都会被清空,结构图如下。

            如果这时候有一个连接建立会输出什么呢?当有连接建立时,会执行C++层的OnConnection。OnConnection会创建一个新的TCP对象表示和客户端通信的对象。

            1. MaybeLocal TCPWrap::Instantiate(Environment* env, 
            2.                                         AsyncWrap* parent, 
            3.                                         TCPWrap::SocketType type) { 
            4.   EscapableHandleScope handle_scope(env->isolate()); 
            5.   AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(parent);  
            6.   return handle_scope.EscapeMaybe( 
            7.       constructor->NewInstance(env->context(), 1, &type_value)); 
            8. 首先定义了一个AsyncHooks::DefaultTriggerAsyncIdScope。DefaultTriggerAsyncIdScope用于设置默认default_trigger_async_id为parent的async id(值是2),执行Instantiate时会执行析构函数恢复原来状态。接着NewInstance的时候就会新建一个TCPWrap对象,从而创建一个AsyncWrap对象。然后触发init钩子,结构图如下。

              对应输出

              1. init: type: TCPWRAP asyncId: 4 trigger id: 2 executionAsyncId(): 0 triggerAsyncId(): 0 

              创建完对象后,通过AsyncWrap::MakeCallback回调JS层,刚才我们已经分析过AsyncWrap::MakeCallback会触发before和after钩子,触发before钩子时,结构图如下。

              对应输出

              1. before: asyncId: 2 executionAsyncId(): 2 triggerAsyncId(): 1 

              同样,在回调函数里执行executionAsyncId和triggerAsyncId拿到的内容是一样的。触发after后再恢复上下文,所以输出也是一样的。

              1. after: asyncId: 2 executionAsyncId(): 2 triggerAsyncId(): 1 

              7 AsyncResource

              异步资源并不是Node.js内置的,Node.js只是提供了一套机制,业务层也可以使用。Node.js也提供了一个类给业务使用,核心代码如下。

              1. class AsyncResource { 
              2.   constructor(type, opts = {}) { 
              3.     let triggerAsyncId = opts; 
              4.     let requireManualDestroy = false; 
              5.     if (typeof opts !== 'number') { 
              6.       triggerAsyncId = opts.triggerAsyncId === undefined ? 
              7.         getDefaultTriggerAsyncId() : opts.triggerAsyncId; 
              8.       requireManualDestroy = !!opts.requireManualDestroy; 
              9.     } 
              10.     const asyncId = newAsyncId(); 
              11.     this[async_id_symbol] = asyncId; 
              12.     this[trigger_async_id_symbol] = triggerAsyncId; 
              13.  
              14.     if (initHooksExist()) { 
              15.       emitInit(asyncId, type, triggerAsyncId, this); 
              16.     } 
              17.   } 
              18.  
              19.   runInAsyncScope(fn, thisArg, ...args) { 
              20.     const asyncId = this[async_id_symbol]; 
              21.     emitBefore(asyncId, this[trigger_async_id_symbol]); 
              22.  
              23.     const ret = thisArg === undefined ? 
              24.       fn(...args) : 
              25.       ReflectApply(fn, thisArg, args); 
              26.  
              27.     emitAfter(asyncId); 
              28.     return ret; 
              29.   } 
              30.  
              31.   emitDestroy() { 
              32.     if (this[destroyedSymbol] !== undefined) { 
              33.       this[destroyedSymbol].destroyed = true; 
              34.     } 
              35.     emitDestroy(this[async_id_symbol]); 
              36.     return this; 
              37.   } 
              38.  
              39.   asyncId() { 
              40.     return this[async_id_symbol]; 
              41.   } 
              42.  
              43.   triggerAsyncId() { 
              44.     return this[trigger_async_id_symbol]; 
              45.   } 

              使用方式如下。

              1. const { AsyncResource, executionAsyncId,triggerAsyncId } = require('async_hooks'); 
              2. const asyncResource = new AsyncResource('Demo'); 
              3. asyncResource.runInAsyncScope(() => { 
              4.   console.log(executionAsyncId(), triggerAsyncId()) 
              5. }); 

              runInAsyncScope中会把asyncResource的执行上下文设置为当前执行上下文,async id是2,trigger async id是1,所以在回调里执行executionAsyncId输出的是2,triggerAsyncId输出的是1。

              8 AsyncLocalStorage

              AsyncLocalStorage是基于AsyncResource实现的一个维护异步逻辑中公共上下文的类。我们可以把他理解为Redis。我们看一下怎么使用。

              8.1 使用

              1. const { AsyncLocalStorage } = require('async_hooks'); 
              2. const asyncLocalStorage = new AsyncLocalStorage(); 
              3.  
              4. function logWithId(msg) { 
              5.   const id = asyncLocalStorage.getStore(); 
              6.   console.log(`${id !== undefined ? id : '-'}:`, msg); 
              7.  
              8. asyncLocalStorage.run(1, () => { 
              9.     logWithId('start'); 
              10.     setImmediate(() => { 
              11.       logWithId('finish'); 
              12.     }); 
              13.  }); 

              执行上面代码会输出

              1. 1: start 
              2. 1: finish 

              run的时候初始化公共的上下文,然后在run里执行的异步代码也可以拿得到这个公共上下文,这个在记录日志traceId时就会很有用,否则我们就需要把traceId传遍代码每个需要的地方。下面我们看一下实现。

              8.2 实现

              我们先看一下创建AsyncLocalStorage的逻辑

              1. class AsyncLocalStorage { 
              2.   constructor() { 
              3.     this.kResourceStore = Symbol('kResourceStore'); 
              4.     this.enabled = false; 
              5.   } 

              创建AsyncLocalStorage的时候很简单,主要是置状态为false,并且设置kResourceStore的值为Symbol('kResourceStore')。设置为Symbol('kResourceStore')而不是‘kResourceStore‘很重要,我们后面会看到。继续看一下执行AsyncLocalStorage.run的逻辑。

              1. run(store, callback, ...args) { 
              2.     // 新建一个AsyncResource 
              3.     const resource = new AsyncResource('AsyncLocalStorage', defaultAlsResourceOpts); 
              4.     // 通过runInAsyncScope把resource的执行上下文设置完当前的执行上下文 
              5.     return resource.emitDestroy().runInAsyncScope(() => { 
              6.       this.enterWith(store); 
              7.       return ReflectApply(callback, null, args); 
              8.     }); 
              9.   } 

              设置完上下文之后执行runInAsyncScope的回调,回调里首先执行里enterWith。

              1. enterWith(store) { 
              2.     // 修改AsyncLocalStorage状态 
              3.    this._enable(); 
              4.    // 获得当前执行上下文对于多资源,也就是run里创建的resource 
              5.    const resource = executionAsyncResource(); 
              6.    // 把公共上下文挂载到对象上 
              7.    resource[this.kResourceStore] = store;}_enable() { 
              8.    if (!this.enabled) { 
              9.      this.enabled = true; 
              10.      ArrayPrototypePush(storageList, this); 
              11.      storageHook.enable(); 
              12.    } 

              挂载完公共上下文后,就执行业务回调。回调里可以通过asyncLocalStorage.getStore()获得设置的公共上下文。

              1. getStore() { 
              2.   if(this.enabled) { 
              3.     const resource = executionAsyncResource(); 
              4.     return resource[this.kResourceStore]; 
              5.   }} 

              getStore的原理很简单,就是首先拿到当前执行上下文对应的资源,然后根据AsyncLocalStorage的kResourceStore的值从resource中拿到公共上下文。如果是同步执行getStore,那么executionAsyncResource返回的就是我们在run的时候创建的AsyncResource,但是如果是异步getStore那么怎么办呢?因为这时候executionAsyncResource返回的不再是我们创建的AsyncResource,也就拿不到他挂载的公共上下文。为了解决这个问题,Node.js对公共上下文进行了传递。

              1. const storageList = [];  
              2. // AsyncLocalStorage对象数组 
              3. const storageHook = createHook({ 
              4.   init(asyncId, type, triggerAsyncId, resource) { 
              5.     const currentResource = executionAsyncResource(); 
              6.     for (let i = 0; i < storageList.length; ++i) { 
              7.       storageList[i]._propagate(resource, currentResource); 
              8.     } 
              9.   } 
              10. }); 
              11.  
              12.  _propagate(resource, triggerResource) { 
              13.     const store = triggerResource[this.kResourceStore]; 
              14.     if (this.enabled) { 
              15.       resource[this.kResourceStore] = store; 
              16.     } 
              17.   } 

              我们看到Node.js内部创建了一个Hooks,在每次资源创建的时候,Node.js会把当前执行上下文对应的资源中的一个或多个key(根据storageList里对象的this.kResourceStore字段)对应的值挂载到新创建的资源中。所以在asyncLocalStorage.getStore()时即使不是我们在执行run时创建的资源对象,也可以获得具体asyncLocalStorage对象所设置的资源,我们再来看一个例子。

              1. const { AsyncLocalStorage } = require('async_hooks'); 
              2. const asyncLocalStorage = new AsyncLocalStorage(); 
              3. const asyncLocalStorage2 = new AsyncLocalStorage(); 
              4.  
              5. function logWithId(msg) { 
              6.   console.log(asyncLocalStorage2.getStore()); 
              7.   const id = asyncLocalStorage.getStore(); 
              8.   console.log(`${id !== undefined ? id : '-'}:`, msg); 
              9.  
              10. asyncLocalStorage.run(0, () => { 
              11.     asyncLocalStorage2.enterWith({hello: "world"}); 
              12.     logWithId('start'); 
              13.     setImmediate(() => { 
              14.        logWithId('finish'); 
              15.     }); 
              16. }); 

              除了通过asyncLocalStorage.run设置上下文,我们通过asyncLocalStorage2.enterWith也给对象上下文的资源对象挂载一个新属性,key是Symbol('kResourceStore'),值是{hello: "world"},然后在logWithId中输出asyncLocalStorage2.getStore()。从输出中可以看到成功从资源中获得挂载的所有上下文。

              1. { hello: 'world' }0: start 
              2. { hello: 'world' }0: finish 

              我们也可以修改源码验证

              1. Immediate { 
              2.   _idleNext: null, 
              3.   _idlePrev: null, 
              4.   _onImmediate: [Function (anonymous)], 
              5.   _argv: undefined, 
              6.   _destroyed: true, 
              7.   [Symbol(refed)]: null, 
              8.   [Symbol(asyncId)]: 6, 
              9.   [Symbol(triggerId)]: 2, 
              10.   [Symbol(kResourceStore)]: 0, 
              11.   [Symbol(kResourceStore)]: { hello: 'world' } 

              可以看到资源对象挂载里两个key为Symbol(kResourceStore)的属性。

              9 初始化时的Async hooks

              1. const async_hooks = require('async_hooks'); 
              2. const eid = async_hooks.executionAsyncId(); 
              3. const tid = async_hooks.triggerAsyncId(); 
              4. console.log(eid, tid); 

              以上代码中,输出1和0。对应的API实现如下。

              1. // 获取当前的async id 
              2. function executionAsyncId() { 
              3.   return async_id_fields[kExecutionAsyncId]; 
              4.  
              5. // 获取当前的trigger async id,即触发当前代码的async id 
              6. function triggerAsyncId() { 
              7.   return async_id_fields[kTriggerAsyncId]; 

              那么async_id_fields的初始化是什么呢?从env.h定义中可以看到async_id_fields_(async_id_fields是上层使用的名称,对应底层的async_id_fields_)是AliasedFloat64Array类型。

              1. AliasedFloat64Array async_id_fields_; 

              AliasedFloat64Array是个类型别名。

              1. typedef AliasedBufferBase AliasedFloat64Array; 

              AliasedBufferBase的构造函数如下

              1. AliasedBufferBase(v8::Isolate* isolate, const size_t count) 
              2.       : isolate_(isolate), count_(count), byte_offset_(0) { 
              3.  
              4.     const v8::HandleScope handle_scope(isolate_); 
              5.     const size_t size_in_bytes = MultiplyWithOverflowCheck(sizeof(NativeT), count); 
              6.     v8::Local ab = v8::ArrayBuffer::New(isolate_, size_in_bytes); 
              7.     // ... 
              8.   } 

              底层是一个ArrayBuffer。

              1. Local v8::ArrayBuffer::New(Isolate* isolate, size_t byte_length) { 
              2.   i::Isolate* i_isolate = reinterpret_cast(isolate); 
              3.   LOG_API(i_isolate, ArrayBuffer, New); 
              4.   ENTER_V8_NO_SCRIPT_NO_EXCEPTION(i_isolate); 
              5.   i::MaybeHandle result = 
              6.       i_isolate->factory()->NewJSArrayBufferAndBackingStore( 
              7.           byte_length, i::InitializedFlag::kZeroInitialized); 
              8.   // ... 

              ArrayBuffer::New在申请内存时传入了i::InitializedFlag::kZeroInitialized。从V8定义中可以看到会初始化内存的内容为0。

              1. // Whether the backing store memory is initialied to zero or not. 
              2. enum class InitializedFlag : uint8_t {  
              3.     kUninitialized,  
              4.     kZeroInitialized  
              5. }; 

              回到例子中,为什么输出会是1和0而不是0和0呢?答案在Node.js启动时的这段代码。

              1.       InternalCallbackScope callback_scope( 
              2.           env.get(), 
              3.           Local(), 
              4.           // async id和trigger async id 
              5.           { 1, 0 }, 
              6.           InternalCallbackScope::kAllowEmptyResource | 
              7.               InternalCallbackScope::kSkipAsyncHooks); 
              8.       // 执行我们的js         
              9.       LoadEnvironment(env.get()); 
              10. InternalCallbackScope刚才已经分析过,他会把1和0设置为当前的执行上下文。然后在LoadEnvironment里执行我的JS代码时获取到的值就是1和0。那么如果我们改成以下代码会输出什么呢?

                1. const async_hooks = require('async_hooks'); 
                2. Promise.resolve().then(() => { 
                3.   const eid = async_hooks.executionAsyncId(); 
                4.   const tid = async_hooks.triggerAsyncId(); 
                5.   console.log(eid, tid); 
                6. }) 

                以上代码会输出0和。因为执行完我们的JS代码后,InternalCallbackScope就被析构了,从而恢复为0和0。

                 

                相关内容

                热门资讯

                如何允许远程连接到MySQL数... [[277004]]【51CTO.com快译】默认情况下,MySQL服务器仅侦听来自localhos...
                如何利用交换机和端口设置来管理... 在网络管理中,总是有些人让管理员头疼。下面我们就将介绍一下一个网管员利用交换机以及端口设置等来进行D...
                施耐德电气数据中心整体解决方案... 近日,全球能效管理专家施耐德电气正式启动大型体验活动“能效中国行——2012卡车巡展”,作为该活动的...
                Windows恶意软件20年“... 在Windows的早期年代,病毒游走于系统之间,偶尔删除文件(但被删除的文件几乎都是可恢复的),并弹...
                20个非常棒的扁平设计免费资源 Apple设备的平面图标PSD免费平板UI 平板UI套件24平图标Freen平板UI套件PSD径向平...
                德国电信门户网站可实时显示全球... 德国电信周三推出一个门户网站,直观地实时提供其安装在全球各地的传感器网络检测到的网络攻击状况。该网站...
                着眼MAC地址,解救无法享受D... 在安装了DHCP服务器的局域网环境中,每一台工作站在上网之前,都要先从DHCP服务器那里享受到地址动...
                为啥国人偏爱 Mybatis,... 关于 SQL 和 ORM 的争论,永远都不会终止,我也一直在思考这个问题。昨天又跟群里的小伙伴进行...