加入收藏 | 设为首页 | 会员中心 | 我要投稿 核心网 (https://www.hxwgxz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 建站 > 正文

硬核!Rust异步编程方式重大升级:新版Tokio如何提升10倍性能详解

发布时间:2019-10-24 04:19:00 所属栏目:建站 来源:高可用架构
导读:协程或者绿色线程是近年来经常讨论的话题。Tokio作为Rust上协程调度器实现的典型代表,其设计和实现都有其特色。本文是Tokio团队在新版本调度器发布后,对其设计和实现的经验做的总结,十分值得一读。 Tokio作为 Rust 语言的异步运行时,我们一直在为它的

由于增长本地队列的相关成本不低,因此值得研究是否需要增长队列。这个问题最终导致了调度程序的重写。新调度程序的策略是对每个队列使用固定大小。当队列已满时,任务将被推送到一个全局的、多使用者、多生产者队列中,而不是增长本地队列。处理器需要检查这个全局队列,但检查的频率要比本地队列低得多。

早期实验过用有界mpmc队列代替了Crossbeam队列。由于push和pop都执行了大量的同步,因此没有带来太大的提升。关于窃取任务,需要记住的一个关键点是,在有负载的时候队列几乎没有争用,因为每个处理器只访问自己的队列。

在这一点上,我仔细阅读go源代码,发现它使用了一个固定大小的单生产者、多消费者队列。这个队列令只需要很少的同步就可以正常工作。我对该算法进行了一些修改,使之适用于tokio调度程序。值得注意的是,go实现版本中其原子操作使用顺序一致性(基于我对go的有限知识)。作为tokio调度器的一部分,该版本还降低了较冷代码路径中的一些复制。

该队列实现是一个循环缓冲区,使用数组存储值。原子整数用于跟踪头部和尾部位置。

  1. struct Queue { /// Concurrently updated by many threads. head: AtomicU32, 
  2.  /// Only updated by producer thread but read by many threads. tail: AtomicU32, 
  3.  /// Masks the head / tail position value to obtain the index in the buffer. mask: usize, 
  4.  /// Stores the tasks. buffer: Box<[MaybeUninit<Task>]>,} 

入队由单独线程完成:

  1. loop { let head = self.head.load(Acquire); 
  2.  // safety: this is the **only** thread that updates this cell. let tail = self.tail.unsync_load; 
  3.  if tail.wrapping_sub(head) < self.buffer.len as u32 { // Map the position to a slot index. let idx = tail as usize & self.mask; 
  4.  // Don't drop the previous value in `buffer[idx]` because // it is uninitialized memory. self.buffer[idx].as_mut_ptr.write(task); 
  5.  // Make the task available self.tail.store(tail.wrapping_add(1), Release); 
  6.  return; } 
  7.  // The local buffer is full. Push a batch of work to the global // queue. match self.push_overflow(task, head, tail, global) { Ok(_) => return, // Lost the race, try again Err(v) => task = v, }} 

请注意,在此push函数中,唯一的原子操作是使用Acquire顺序的load和具有Release顺序的store。没有读-修改-写操作(compare_and_swap,fetch_and等)或顺序一致性。因为在x86芯片上,所有load/store 已经是“原子的”。因此,在CPU级别,此功能不是同步的。使用原子操作会影响编译器,因为它会阻止某些优化,但也仅此而已。第一个load很可能可以通过Relaxed顺序完成,但是切换成Relaxed顺序没有明显的收益。

队列已满时,将调用push_overflow。此功能将本地队列中的一半任务移到全局队列中。全局队列是由互斥锁保护的侵入链表。首先将要移动到全局队列中的任务链接在一起,然后获取互斥锁,并通过更新全局队列的尾指针来写入所有任务。

如果您熟悉原子内存操作的细节,您可能会注意到上图所示的push函数可能会产生“问题”。使用Acquire顺序的原子load同步语义非常弱。它可能会返回老值(并发窃取操作可能已经增加了self.head的值),但是执行入队的线程会读到线程中的老值。这对于算法的正确性不是问题。在入队的代码路径中,我们只关心本地运行队列是否已满。鉴于当前线程是可以执行入队操作的唯一线程,则老值将使队列比实际更满。它可能会错误地认为队列已满并进入push_overflow函数,但是此函数包括更强的原子操作。如果push_overflow确定队列实际上未满,则返回w / Err并再次尝试push操作。这是push_overflow将一半运行队列移至全局队列的另一个原因。通过移动一半的队列,“运行队列为空”的误报率就会降低。

本地出对消息也很轻量级:

  1. loop { let head = self.head.load(Acquire); 
  2.  // safety: this is the **only** thread that updates this cell. let tail = self.tail.unsync_load; 
  3.  if head == tail { // queue is empty return None; } 
  4.  // Map the head position to a slot index. let idx = head as usize & self.mask; 
  5.  let task = self.buffer[idx].as_ptr.read; 
  6.  // Attempt to claim the task read above. let actual = self .head .compare_and_swap(head, head.wrapping_add(1), Release); 
  7.  if actual == head { return Some(task.assume_init); }} 

在此函数中,存在单个原子load和一个带有release的compare_and_swap。主要开销来自compare_and_swap。

窃取功能类似于出队,但是self.tail的load必须是原子的。同样,类似于push_overflow,窃取操作将尝试窃取队列的一半,而不是单个任务。这这是不错的特性,我们将在后面介绍。

(编辑:核心网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读