购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

4.2 Envoy网络模型

4.2.1 Envoy事件调度模型

事件调度是一种基于异步事件触发来驱动业务实现和流转的机制,事件调度是Envoy的基石。Envoy的请求处理和状态流转均依赖事件调度机制来实现,Envoy事件调度相关实现在/source/common/event目录下。

Envoy事件调度模型以Libevent为基础,Libevent是一个开源的基于事件驱动的高性能网络库,当前支持文件、信号和定时器等多种事件类型,Envoy事件调度器由DispatcherImpl负责管理。


DispatcherImpl::DispatcherImpl(TimeSystem& time_system, Buffer::WatermarkFactoryPtr&& factory,  Api::Api& api)
    : api_(api), time_system_(time_system), buffer_factory_(std::move(factory)),
      base_(event_base_new()), scheduler_(time_system_.createScheduler(base_)),
      deferred_delete_timer_(createTimer([this]() -> void { clearDeferredDeleteList(); })),
      post_timer_(createTimer([this]() -> void { runPostCallbacks(); })),
      current_to_delete_(&to_delete_1_) {
}

Envoy当前使用Libevent的如下接口:


//以下几个接口分别用于初始化普通事件、定时器事件和信号事件
event_assign
evtimer_assign
evsignal_assign
//将初始化的event注册到libevent的事件链表上
event_add
//将事件加入到激活队列, event_add的事件只是已注册,加入到激活队列的事件会进入事件触发流程
event_active
//进入事件循环,等待事件触发
event_base_loop

Envoy事件调度模型分为事件创建和提交事件给调度器两部分。Envoy通过FileEvent管理使用最频繁的网络I/O事件,DispatcherImpl::createFileEvent负责创建相应的FileEvent。调用时会指定事件类型和事件触发回调函数,FileEventImpl创建时会分别调用event_assign和event_add创建事件,并将事件加入libevent,调用event_assign会传入事件触发回调函数,事件就绪时,会回调注册的事件回调函数。


FileEventPtr DispatcherImpl::createFileEvent(int fd, FileReadyCb cb, FileTriggerType trigger, uint32_t events) {
  return FileEventPtr{new FileEventImpl(*this, fd, cb, trigger, events)};
}

FileEventImpl::FileEventImpl(DispatcherImpl& dispatcher, int fd, FileReadyCb cb,
                             FileTriggerType trigger, uint32_t events)
    : cb_(cb), base_(&dispatcher.base()), fd_(fd), trigger_(trigger) {
  assignEvents(events);
  event_add(&raw_event_, nullptr);
}

void FileEventImpl::assignEvents(uint32_t events) {
  event_assign(&raw_event_, base_, fd_,
               EV_PERSIST | (trigger_ == FileTriggerType::Level ? 0 : EV_ET) |
                   (events & FileReadyType::Read ? EV_READ : 0) |
                   (events & FileReadyType::Write ? EV_WRITE : 0) |
                   (events & FileReadyType::Closed ? EV_CLOSED : 0),
               [](evutil_socket_t, short what, void* arg) -> void {
                 FileEventImpl* event = static_cast<FileEventImpl*>(arg);
                 uint32_t events = 0;
                 if (what & EV_READ) {
                   events |= FileReadyType::Read;
                 }
                 if (what & EV_WRITE) {
                   events |= FileReadyType::Write;
                 }
                 if (what & EV_CLOSED) {
                   events |= FileReadyType::Closed;
                 }
                 event->cb_(events);
               },
               this);
}

对于定时器事件来说,通过DispatcherImpl::createTimer创建定时器事件,通过TimerImpl::enableTimer完成定时器的提交。


TimerPtr DispatcherImpl::createTimer(TimerCb cb) {
  ASSERT(isThreadSafe());
  return scheduler_->createTimer(cb);
}

class RealScheduler : public Scheduler {
public:
  RealScheduler(Libevent::BasePtr& libevent) : libevent_(libevent) {}
  TimerPtr createTimer(const TimerCb& cb) override {
    return std::make_unique<TimerImpl>(libevent_, cb);
  };

private:
  Libevent::BasePtr& libevent_;
};

TimerImpl::TimerImpl(Libevent::BasePtr& libevent, TimerCb cb) : cb_(cb) {
  ASSERT(cb_);
  evtimer_assign(
      &raw_event_, libevent.get(),
      [](evutil_socket_t, short, void* arg) -> void 
{ static_cast<TimerImpl*>(arg)->cb_(); }, this);
}

void TimerImpl::disableTimer() { event_del(&raw_event_); }

void TimerImpl::enableTimer(const std::chrono::milliseconds& d) {
  if (d.count() == 0) {
    event_active(&raw_event_, EV_TIMEOUT, 0);
  } else {
    timeval tv;
    tv.tv_sec = us.count() / 1000000;
    tv.tv_usec = us.count() % 1000000;
    event_add(&raw_event_, &tv);
  }
}

除了支持常规的事件触发外,Envoy事件调度器也支持任务的异步触发,通过Dispat-cherImpl::post投递具体任务,DispatcherImpl通过post_timer_定时器触发异步任务的具体触发。

为了方便各个工作线程的事件触发和管理,减少不同线程处理时的锁开销,Envoy线程模型为每个线程创建一个事件调度器,各个模块直接传递指针对象到事件调度器,如果回调时对象已经被销毁,就会出现访问异常,因此必须对传递给事件调度器的指针对象的生命周期进行有效管理,Envoy使用延迟析构来解决这个问题。

DispatcherImpl在管理析构对象时,先调用DispatcherImpl::deferredDelete进行被析构对象的注册,并使用deferred_delete_timer_定时器完成被析构对象的实际处理。定时器的处理函数是DispatcherImpl::clearDeferredDeleteList,clearDeferredDeleteList函数调用每个被析构对象的reset函数进行实际的析构处理。具体处理过程中,DispatcherImpl通过to_delete_1_和to_delete_2_对被析构对象进行交替处理,由于对象析构比异步任务执行的优先级低一些,所以会将被析构对象拆分为两部分交替执行,避免对象析构耗时过长影响其他任务的执行。

4.2.2 Envoy线程模型

Envoy采用控制流和数据流分离的思想,将线程模型主要分为主线程和工作线程两类。

①Envoy的主线程负责控制流逻辑,主要是XDS配置更新、修改XDS相关的配置和数据。

②Envoy的工作线程负责数据流逻辑,主要是请求消息转发处理,转发处理时会查询XDS数据。统计信息也类似,工作线程负责修改统计信息,主线程负责对各个线程的统计信息进行汇总和统一上报。

4.2.3 线程本地存储机制

Envoy中多个线程并发操作同一个数据的场景很多,为了减少多线程并发访问共享数据时的锁开销,提升系统的整体性能,Envoy引入了TLS(线程本地存储)的概念。通过TLS,主线程上运行的代码可以分配到进程范围内的TLS槽,TLS槽是一个允许O(1)访问的向量索引,主线程可以将任意数据放在TLS槽,然后通过事件的方式将数据变更发布到每个工作线程中。每个工作线程都会基于主线程的TLS槽数据,复制一份完全相同的线程本地数据,工作线程只需要访问本线程的线程本地数据,同时每个线程的事件调度器响应主线程发布的变更事件,触发线程本地数据的更新。Envoy TLS的整体处理流程如图4-2所示。

图4-2 Envoy TLS处理流程图

Envoy的TLS数据通过下述的thread_local_data_来进行管理,thread_local_data_是基于C++11 thread_local的线程本地数据,每个线程均拥有各自的线程本地数据,同时通过本地的事件调度器dispatcher_和主线程进行通信。


struct ThreadLocalData {
    Event::Dispatcher* dispatcher_{};
    std::vector<ThreadLocalObjectSharedPtr> data_;
  };

static thread_local ThreadLocalData thread_local_data_;

为了在工作线程和主线程之间实现配置与数据交互,需要在主线程和工作线程之间建立一条信息传递的管道,各个工作线程的角色完全相同,相互之间不需要通信,它们均和主线程进行通信。由于主线程只有一个,各个工作线程通过全局变量main_thread_dispatcher_就可以和主线程进行通信。同时,Envoy工作线程启动后,向主线程注册自己的事件调度器,相当于表明自己的身份,当有事件发生时,主线程就可以通过注册的事件调度器进行事件通知。实现如下:


WorkerImpl::WorkerImpl(ThreadLocal::Instance& tls, ListenerHooks& hooks,
Event::DispatcherPtr&& dispatcher,
Network::ConnectionHandlerPtr handler,
OverloadManager& overload_manager, Api::Api& api)
: tls_(tls), hooks_(hooks), dispatcher_(std::move(dispatcher)), handler_(std::move(handler)), api_(api) {
    //注册当前线程的事件调度器,用于当前线程和主线程之间的事件通信
    tls_.registerThread(*dispatcher_, false);
    ...
}

在registerThread实现中,如果当前线程为主线程,将当前事件调度器设置到主线程调度器main_thread_dispatcher_;如果当前线程为工作线程,则将当前线程记录到线程列表registered_threads_中,后续主线程会基于registered_threads_对每一个工作线程进行循环处理。


void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_thread) {
  if (main_thread) {
    main_thread_dispatcher_ = &dispatcher;
    thread_local_data_.dispatcher_ = &dispatcher;
  } else {
    registered_threads_.push_back(dispatcher);
    dispatcher.post([&dispatcher] { thread_local_data_.dispatcher_ = &dispatcher; });
  }
}

为了对不同的TLS对象进行区分和管理,Envoy为每个TLS对象分配不同的槽位,因此TLS对象使用前需要通过allocateSlot申请当前对象对应的槽位信息。allocateSlot会遍历当前已有槽位信息,查找是否有空闲的槽位,如果有直接拿过来使用;如果没有,分配新的槽位信息。

槽位信息的维护本质是资源池的管理,当前实现采用O(N)的方式从头开始遍历,当已有槽位个数非常多时,这里遍历查找的性能会很低,可以通过记录空闲槽位位置等方式进行优化。


SlotPtr InstanceImpl::allocateSlot() {
  for (uint64_t i = 0; i < slots_.size(); i++) {
    if (slots_[i] == nullptr) {
      std::unique_ptr<SlotImpl> slot(new SlotImpl(*this, i));
      slots_[i] = slot.get();
      return slot;
    }
  }
  std::unique_ptr<SlotImpl> slot(new SlotImpl(*this, slots_.size()));
  slots_.push_back(slot.get());
  return slot;
}

创建TLS对象的槽位后,会返回当前分配的槽位的地址,作为当前槽位的唯一标识和入口,后续对该TLS对象的所有操作均通过该槽位标识进行。

创建槽位后就可以通过set接口进行TLS数据的设置,registered_threads_存放了各个工作线程的线程本地信息,之前工作线程启动时已经将本线程的事件调度器注册到registered_threads_,这里遍历registered_threads_将数据设置事件推送到各个工作线程的调度器中进行处理,当然对于主线程自身来说是直接处理,具体处理函数是setThreadLocal。


void InstanceImpl::SlotImpl::set(InitializeCb cb) {
  for (Event::Dispatcher& dispatcher : parent_.registered_threads_) {
    const uint32_t index = index_;
    dispatcher.post([index, cb, &dispatcher]() -> void { setThreadLocal(index, cb(dispatcher)); });
  }
  //主线程处理
  setThreadLocal(index_, cb(*parent_.main_thread_dispatcher_));
}

setThreadLocal的数据处理主要是利用了C++11的Thread Local特性,C++11的Thread Local对象内部为每个线程创建一个独立的副本信息,对Thread Local进行设置操作时,Thread Local内部会对这些线程副本数据进行更新。


void InstanceImpl::setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object) {
  if (thread_local_data_.data_.size() <= index) {
    thread_local_data_.data_.resize(index + 1);
  }
  thread_local_data_.data_[index] = object;
}

对TLS数据的查询和使用通过tls_->getTyped的方式,实际调用的是SlotImpl的get接口,get接口的实现非常简单,直接使用SlotImpl保存的槽位编号,返回线程本地数据对应槽位的数据即可。 SuKcl87a7NoxQhdkSMDMdGORz1JDyZRE3HQyhJTYO2FAQS65Uyg/erOHJMb1k0pd


ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() {
  return thread_local_data_.data_[index_];
}
点击中间区域
呼出菜单
上一章
目录
下一章
×