条件变量入门

January 19, 2017 at 10:14 pm

好久没更了呃。最近终于闲下来瞎看了一些东西,但是目前能单独整理出来的并不多,关于条件变量,算是第2次接触了,但是见到的时候还是一脸蒙蔽,所以这次更详细地学了一下。本来打算再看一下用条件变量来解决生产者消费者问题,研究一下条件变量的实现细节的,但是相关内容比较多,而且条件变量目前用到的并不多,所以暂时不多做展开,就是简单入一下门好了。

本文包含一下几个小节:
1. 条件变量及其初衷
2. 条件变量的使用:以pthread库为例
3. c++11中条件变量的使用

条件变量及其初衷

条件变量(condition variable)是多线程编程同步问题中的一个典型设计,和信号量类似,是建立在底层锁结构之上的一种同步机制。通常来讲,条件变量和信号量可以用来实现相类似的功能,比如解决生产者消费者问题,某种意义上可以说,条件变量是简陋版的信号量。条件变量的设计初衷从这个名字本身就可以很明显地体现出来。在多线程环境中,我们经常会遇到这样的场景:某个线程需要等某个条件成立时才能继续进行,而这个条件的成立过程由其它线程来完成。典型的例子是父线程在创造子线程之后,希望等到“子线程运行结束”这个条件结束,再继续执行之后的代码。问题的代码描述如下:

void *child(void *arg) {
  printf("child\n");
  // XXX how to indicate we are done?
  return NULL;
}

int main(int argc, char *argv[]) {
  printf("parent: begin\n");
  pthread_t c;
  Pthread_create(&c, NULL, child, NULL); // create child
  // XXX how to wait for child?
  printf("parent: end\n");
  return 0;
}

一个最简单的实现方式是使用一个父子线程都可以访问到的全局变量done,子线程在完成之后将其置1,而父线程则循环判断这个变量是否为1,直到其为1时跳出循环:

volatile int done = 0;

void *child(void *arg) {
  printf("child\n");
  done = 1;
  return NULL;
}

int main(int argc, char *argv[]) {
  printf("parent: begin\n");
  pthread_t c;
  pthread_create(&c, NULL, child, NULL); // create child
  while (done == 0)
  ; // spin
  printf("parent: end\n");
  return 0;
}

这种做法的一个明显缺陷就是父线程在while中自旋时对CPU资源的浪费。当然,我们可以考虑在while循环中让父线程sleep一段时间,但是这个时间太长的话,又将导致父线程在子线程完成之后无法及时地响应。
条件变量将帮助我们智能地解决这个sleep时间的问题。条件变量一般对外提供wait和signal两个接口,本文不具体分析其实现远离,简单地来说,wait表示等待某个条件变量的成立,底层会将线程睡眠,并放入一个等待队列中,而signal则表明条件的成立,并唤醒等待队列中的一个(或多个)线程。

条件变量的使用:以pthread库为例

我们以pthread库为例,讨论条件变量的基本使用。下面列出的是pthread库提供的和条件变量相关的函数和宏:

#include <pthread.h>

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);

int pthread_cond_signal(pthread_cond_t *cond);

int pthread_cond_broadcast(pthread_cond_t *cond);

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);

int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);

int pthread_cond_destroy(pthread_cond_t *cond);

其中 pthread_cond_t 就是条件变量的类型,我们可以使用pthread_cond_init函数来对它进行初始化,关于其第2个参数cond_attr这里不做介绍,不需要时可以直接传NULL。除了init函数以外,也可以使用PTHREAD_COND_INITIALIZER宏(#define PTHREAD_COND_INITIALIZER { { 0, 0, 0, 0, 0, (void *) 0, 0, 0 } })来进行静态初始化。这里我们只使用wait和signal两个功能。使用条件变量的实现如下:

int done = 0;
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t c = PTHREAD_COND_INITIALIZER;
void thr_exit() {
  pthread_mutex_lock(&m);
  done = 1;
  pthread_cond_signal(&c);
  pthread_mutex_unlock(&m);
}

void *child(void *arg) {
  printf("child\n");
  thr_exit();
  return NULL;
}

void thr_join() {
  pthread_mutex_lock(&m);
  while (done == 0)
    pthread_cond_wait(&c, &m);
  pthread_mutex_unlock(&m);
}

int main(int argc, char *argv[]) {
  printf("parent: begin\n");
  pthread_t p;
  pthread_create(&p, NULL, child, NULL);
  thr_join();
  printf("parent: end\n");
  return 0;
 }

代码的逻辑还是比较清晰的。主要是如下几点:
1. 无论父线程(等待条件成立的线程)还是子线程(使条件成立的线程),都需要将代码用mutex包围
2. 仍然需要使用条件done来判断条件是否成立
3. 父线程调用wait,而子线程调用signal,需要注意的是这wait函数需要以使用的mutex为参数传入,因为wait会暂时把mutex释放以使其它线程执行

首先我们分析一下为什么需要第2点。既然我们已经使用了条件变量,为什么还需要对状态done进行判断呢?父线程首先wait,子线程在完成之后调用signal,似乎也没有什么问题,于是我们这样改写一下程序:

void thr_exit() {
  pthread_mutex_lock(&m);
  pthread_cond_signal(&c);
  pthread_mutex_unlock(&m);
}

void thr_join() {
  pthread_mutex_lock(&m);
  pthread_cond_wait(&c, &m);
  pthread_mutex_unlock(&m);
}

这个程序实际上存在缺陷。设想父线程在创建完子线程之后正好时间片到期进入睡眠,而子线程早早运行,执行了signal,但这时父线程由于还没有调用wait,所以并不在等待队列里,也就不会被唤醒。而当父线程真正调用wait将自己放入等待队列中后,将不会再有线程唤醒它。从这里我们可以看到变量done的价值。
我们再来分析mutex在这里的作用。如果我们移除mutex,那么代码就变成了这样:

void thr_exit() {
  done = 1;
  pthread_cond_signal(&c);
}

void thr_join() {
  if (done == 0)
    pthread_cond_wait(&c); // 注意这里仅是示意代码,由于我们移除了mutex,所以这里没有以它为参数。
}

这将导致race condition。如果父线程在if语句判断完done为0之后立刻由于时间片到期陷入睡眠,子线程执行完毕将done置为1,调用signal,同样没法唤起任何线程,而父线程醒来时直接执行wait,同样陷入了等待队列里,不会再被唤醒。这里的mutex实际上保证了父线程满足if条件之后而进入等待队列之前,signal不会被调用。

c++11中条件变量的使用

c++11标准实现了条件变量,包含在头文件中。这里我们简单介绍一下使用方法,本质上和pthread库的使用没有多大区别。
wait函数的原型如下:

void wait (unique_lock<mutex>& lck);
template< class Predicate >
void wait( std::unique_lock<std::mutex>& lock, Predicate pred );

其中第2个重载相当于

while (!pred()) wait(lck);

notify_one和notify_all即是对应了signal,前者只唤醒一个线程,而后者则唤醒所有线程。这里直接附上http://en.cppreference.com中的示例程序:

#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
 
std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;
 
void worker_thread()
{
    // Wait until main() sends data
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, []{return ready;});
 
    // after the wait, we own the lock.
    std::cout << "Worker thread is processing data\n";
    data += " after processing";
 
    // Send data back to main()
    processed = true;
    std::cout << "Worker thread signals data processing completed\n";
 
    // Manual unlocking is done before notifying, to avoid waking up
    // the waiting thread only to block again (see notify_one for details)
    lk.unlock();
    cv.notify_one();
}
 
int main()
{
    std::thread worker(worker_thread);
 
    data = "Example data";
    // send data to the worker thread
    {
        std::lock_guard<std::mutex> lk(m);
        ready = true;
        std::cout << "main() signals data ready for processing\n";
    }
    cv.notify_one();
 
    // wait for the worker
    {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, []{return processed;});
    }
    std::cout << "Back in main(), data = " << data << '\n';
 
    worker.join();
}