无锁编程(三):实现无锁的队列

September 19, 2016 at 4:20 pm

距离上一篇正篇已经隔了整整三个月了,之前学的知识也全都忘光了,无论如何,今天开始慢慢填坑吧。
上一篇我们实现了一个简单的无锁的栈,并且介绍了无锁编程中经常涉及的一些概念,如CAS原语。今天我们开始实现一个无锁的队列。在传统的编程中,队列的实现并不比栈来得困难,但是在无锁编程中,两者还是有显著的差别。在栈中,我们只需关心栈顶这样一个位置的情况,而在队列中,元素从队列的尾部进入队列,却是从头部出去,这样我们就需要兼顾两端在无锁的情况下线程安全。
事实上,在多线程编程中,队列的使用比栈要多得多,所以实现无锁的队列,才真正意味着无锁技术得到了实际应用。
本篇主要包含以下内容:
1. 入队操作的设计
2. 出队操作的设计

入队操作的设计

我们首先来讨论入队操作的设计。在上一节栈的设计中,我们的入栈操作只需保证栈顶得到及时的更新就可以了,但是在入队操作中,我们要进行两个操作,首先将队尾元素的next指针域指向新的元素,接着需要将tail指针也指向这个新的元素。这样我们就需要进行两次CAS操作,初步的想法如下(我们假设节点的结构和上一节的栈的节点结构一致):

void enqueue(const T &val) {
  Node *node = new Node;
  node->value_ = val;
  node->next = NULL;
  Node *old_tail;
  do {
    old_tail = tail_;
  } while(!CAS(&(old_tail->next), NULL, node));
  CAS(&tail_, old_tail, node);
}

需要注意的是,两次CAS虽然都分别是原子操作,但它们的整体并不是一个原子操作,这样就带来一个问题,当队列处于第一个CAS操作完成,而第二个CAS操作还未开始的中间状态时,容易发生意外。
lock_free_queue
如上图所示,此时原来的队尾已经指向了新的节点,但是tail指针还没有发生更新。如果此时这个线程挂起,那么对于其它的所有线程,第8行的CAS判断都会失败,实际上陷入了死循环。这显然不是无锁的设计,因为一个线程的挂起阻碍了其它所有线程的工作。

到这里我们的设计似乎陷入了僵局。但是反过来想,既然tail_不总是指向队尾,那么我们不把它当作队尾,而是当作一个离队尾比较近的节点不就可以了吗?事实上,由于tail_不指向队尾仅仅存在于上面描述的这种情况发生的时候,此时tail_实际上总是指向队尾的前一个元素。所以tail_在任何时刻,要么指向队尾,要么指向队尾的前一个元素。

有了这样一个不变式之后,我们只需在tail_不指向队尾时人为地把它向后挪一个单位就可以了,当然,为了保证挪动tail_时的安全,同样要使用CAS原语,这样我们就可以实现一个无锁的入队操作:

void enqueue(const T &val) {
  Node *node = new Node;
  node->value_ = val;
  node->next = NULL;
  Node *old_tail;
  bool succ;
  do {
    old_tail = tail_;
    succ = CAS(&(old_tail->next), NULL, node);
    if (!succ) {
      CAS(&tail_, old_tail, old_tail->next);
    }
  } while(!succ);
  CAS(&tail_, old_tail, node);
}

上述的设计方法来自于John D. Valois大神的论文《Implementing Lock-Free Queues》。除了上述方法之外,Valois在论文中还给出了一种不增加额外CAS的方式,在那种方式下,tail_的位置在p个并发的情况下最多可以偏离队尾2p-1个节点。两种设计方式的本质都是把tail_作为一种hint。本文暂时不讨论另一个方式(也许哪天会填上?)。
入队操作的可线性化点在最后一个CAS操作之前,不论最后一个操作是否已经执行,对于其它线程来说,新的元素已经插入到队列中了。由于我们不再假设tail_一定指向队尾,其它线程不会由于这个线程挂起而阻塞,第一个CAS如果返回false,说明有其它节点已经插入,它的最后一步可能完成,也可能还没有完成。第二个CAS则来检测这一点,如果它的最后一步还没有完成,帮它完成这一步,否则什么也不做。第二个CAS失败说明其它节点已经插入且最后一步已经完成。即使其它线程在完成最后一步之前挂起,第二个CAS会“帮助”它来完成这个工作,而不是“等待”其它线程完成。这种“帮助”,实际上就是无锁编程的一个核心概念。

出队操作的设计

相比于入队操作,出队操作要简单得多,我们只需将head_指向下一个节点就可以了(这里我们仍然持续上一篇的假设,节点的内存是自动GC的),和栈的pop操作没有差别。

T dequeue() {
  T ret;
  Node *old_head;
  do {
    old_head = head_;
    if (old_head == NULL) throw std::logic_error("queue is empty!");

    ret = head_->value;
  }while(!CAS(&head_, old_head, old_head->next));

  return ret;
}

这样的设计同样存在一个问题。为了使实现简单,我们的enqueue操作只对tail_进行操作,而dequeue只对head_进行操作,两者互相不知道对方,那么假设某个时刻队里只有一个元素A,head_和tail_都指向这个元素,但它们彼此并不知道。此时进行dequeue操作,head_变成了NULL,而tail_不发生改变。如果再进行enqueue操作插入B,此时队列中实际上存在一个元素B,但由于head_已经变成了NULL,我们实际上访问不到B。还可能存在其它的边界问题,主要原因即两个互不知道的变量指向了同一个元素,而我们只改变了其中一个。为了解决这个问题,我们设置一个dummy节点,head_和tail_在初始化时指向这个dummy节点,每当有enqueue操作时,tail_就会向后移动;类似地,每当有dequeue操作时,head_也向后移动,所以head_总是指向刚刚被出队的节点,而不是队首。这样,只要队列不空,head_和tail_永远不会重合(实际上仍有可能重合,因为如前所述tail_可能来不及更新,但这是没有关系的,因为我们实际的插入总是插在真正的队尾,这里实际上是head_不和实际意义上的队尾重合),也就不会存在一系列边界问题:

T dequeue() {
  T ret;
  Node *old_head;
  do {
    old_head = head_;
    if (old_head->next == NULL) throw std::logic_error("queue is empty!");

    ret = head_->next->value;
  }while(!CAS(&head_, old_head, old_head->next));

  return ret;
}

由于和栈的出栈操作基本类似,出队操作显然也是无锁的,一旦CAS返回false,说明有其它线程进行了dequeue操作且成功地改变了head_。由于入队和出队完全没有访问相同的变量,所以它们之间不会存在竞争。

实现无锁的队列要比实现无锁的栈复杂了一个层次,下一篇,我们讨论如何实现一个无锁的单向链表,单向链表要求在任何位置的插入和删除操作,要复杂得多。