2. 线程同步精要

第2章 线程同步精要

并发编程有两种基本模型,一种是message passing,另一种是shared memory。在分布式系统中,运行在多台机器上的多个进程的并行编程只有一种实用模型:message passing。

  • 在单机上,我们也可以照搬message passing作为多个进程的并发模型。这样整个分布式系统的架构的一致性很强,扩容(scale out)起来也较容易。
  • 在多线程编程中,message passing更容易保证程序的正确性,有的语言只提供这一种模型。
  • 不过在用C/C++编写多线程程序时,我们仍然需要了解底层的shared memory模型下的同步原语,以备不时之需。分享一些C++多线程编程的经验。本章多次引用《Real-World Concurrency》一文的观点,这篇文章的地址是http://queue.acm.org/detail.cfm?id=1454462,后文简称[RWC]。

线程同步的四项原则,按重要性排列:

  1. 首要原则是尽量最低限度地共享对象,减少需要同步的场合。一个对象能不暴露给别的线程就不要暴露:如果要暴露,优先考虑immutable对象:实在不行才暴露可修改的对象,并用同步措施来充分保护它。
  2. 其次是使用高级的并发编程构件,如TaskQueue、Producer-Consumer Queue、CountDownLatch等等。
  3. 最后不得已必须使用底层同步原语(primitives)时,只用非递归的互斥器和条件变量,慎用读写锁,不要用信号量。
  4. 除了使用atomic整数之外,不自己编写lock-free代码,也不要用“内核级同步原语”。不凭空猜测“哪种做法性能会更好”,比如spinlock vs. mutex。

前面两条很容易理解,这里着重讲一下第3条:底层同步原语的使用。

1. 互斥器(mutex)


Tips:

RAII的中文翻译是“资源获取即初始化”。

RAII(Resource Acquisition Is Initialization)是一种编程技术,主要用于C++中,用于管理资源的生命周期。RAII的核心思想是将资源的获取与对象的生命周期绑定,确保资源在对象生命周期结束时自动释放,从而避免内存泄漏和其他资源管理错误。

具体来说,RAII通过以下方式实现:

  1. 资源获取即初始化:当一个对象被创建时,立即获取所需的资源(如内存、文件句柄、网络连接等)。

  2. 资源释放即析构:当对象的生命周期结束时(例如对象离开作用域或被显式删除),其析构函数会被自动调用,释放在构造函数中获取的资源。

  3. 对象所有权:通过对象的所有权来管理资源的生命周期,确保资源不会被错误地释放或重复释放。

  4. 异常安全:RAII有助于实现异常安全,因为即使在抛出异常的情况下,对象的析构函数也会被调用,从而释放资源。

RAII的优点包括:

  • 自动资源管理:减少了手动管理资源的复杂性。
  • 减少内存泄漏:通过自动释放资源,减少了内存泄漏的风险。
  • 提高代码的可读性和可维护性:资源管理逻辑与业务逻辑分离,使得代码更加清晰。
  • 异常安全:提高了代码的健壮性。

RAII是C++中资源管理的最佳实践之一,它利用C++的构造函数和析构函数机制来实现资源的安全管理。


互斥器(mutex)恐怕是使用得最多的同步原语,粗略地说,它保护了临界区,任何一个时刻最多只能有一个线程在此mutex划出的临界区内活动。单独使用mutex时,我们主要为了保护共享数据。我个人的原则是:

  • 用RAII手法封装mutex的创建、销毁、加锁,解锁这四个操作。用RAII封装这几个操作是通行的做法,这几乎是C++的标准实践。Java里的synchronized语句和C#的using语句也有类似的效果,即保证锁的生效期间等于一个作用域(scope),不会因异常而忘记解锁。
  • 只用非递归的mutex(即不可重入的mutex)。
  • 不手工调用lock()和unlock()函数,一切交给栈上的Guard对象的构造和析构函数负责。Guard对象的生命期正好等于临界区(分析对象在什么时候析构是C++程序员的基本功)。这样我们保证始终在同一个函数同一个scope里对某个mutex加锁和解锁。避免在foo()里加锁,然后跑到bar()里解锁;也避免在不同的语句分支中分别加锁、解锁。这种做法被称为ScopedLocking。

次要原则有:

  • 不使用跨进程的mutex,进程间通信只用TCP sockets。
  • 加锁、解锁在同一个线程,线程a不能去unlock线程b已经锁住的mutex(RAII自动保证)
  • 别忘了解锁(RAII自动保证)。
  • 不重复解锁(RAII自动保证)。
  • 必要的时候可以考虑用PTHREAD_MUTEX_ERRORCHECK来排错。

mutex恐怕是最简单的同步原语,按照上面的几条原则,几乎不可能用错。

1.1 只使用非递归的mutex

mutex分为递归(recursive)和非递归(non-recursive)两种,这是POSIX的叫法,另外的名字是可重入(reentrant)与非可重入。这两种mutex作为线程间(inter-thread)的同步工具时没有区别,它们的唯一区别在于:同一个线程可以重复对recursive mutex加锁,但是不能重复对non-recursive mutex加锁。

首选非递归mutex,绝对不是为了性能,而是为了体现设计意图。non-recursive和recursive的性能差别其实不大,因为少用一个计数器,前者略快一点点而已。在同一个线程里多次对non-recursive mutex加锁会立刻导致死锁,我认为这是它的优点,能帮助我们思考代码对锁的期求,并且及早(在编码阶段)发现问题。

毫无疑问recursive mutex使用起来要方便一些,因为不用考虑一个线程会自己把自己给锁死了,我猜这也是Java和Windows默认提供recursive mutex的原因。(Java语言自带的intrinsic lock是可重入的,它的util.concurrent库里提供的ReentrantLock,Windows的CRITICAL_SECTION也是可重入的。似乎它们都不提供轻量级的non-recursive mutex。)

正因为它方便,recursive mutex可能会隐藏代码里的一些问题。典型情况是你以为拿到一个锁就能修改对象了,没想到外层代码已经拿到了锁,正在修改(或读取)同一个对象呢。来看一个具体的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
MutexLock mutex;
std::vector<Foo> foos;

void post(const Foo& f) {
MutexLockGuard lock(mutex);
foos.push_back(f);
}

void traverse(){
MutexLockGuard lock(mutex);
for (std::vector<Foo>::const_iterator it = foos.begin();
it != foos.end();++it) {
it->doit();
}

}

post()加锁,然后修改foos对象;traverse()加锁,然后遍历foos向量。这些都是正确的。

将来有一天,Foo::doit()间接调用了post(),那么会有戏剧性的结果:

  1. mutex是非递归的,于是死锁了。
  2. mutex是递归的,由于push_back()可能(但不总是)导致vector迭代器失效,程序偶尔会crash。

这时候就能体现non-recursive的优点:把程序的逻辑错误暴露出来。死锁比较容易debug,把各个线程的调用栈打出来,只要每个函数不是特别长,很容易看出来是怎么死的,见\(\S2.1.2\)的例子。或者可以用PTHREAD_MUTEX_ERRORCHECK一下子找到错误(前提是MutexLock带debug选项)。程序反正要死,不如死得有意义一点,留个“全尸”,让验尸(post-mortem)更容易些。

如果确实需要在遍历的时候修改vector,有两种做法,一是把修改推后,记住循环中试图添加或删除哪些元素,等循环结束了再依记录修改foos:二是用copy-on-write,见\(\S2.8\)的例子。

如果一个函数既可能在已加锁的情况下调用,又可能在未加锁的情况下调用,那么就拆成两个函数:

  1. 跟原来的函数同名,函数加锁,转而调用第2个函数。
  2. 给函数名加上后缀withLockHold,不加锁,把原来的函数体搬过来。

就像这样:

1
2
3
4
5
6
7
8
9
void post(const Foo& f) {
MutexLockGuard lock(mutex);
postwithLockHold(f); // 不用担心开销,编译器会自动内联的
}

void postwithLockHold(const Foo& f){
foos.push_back(f);
}

这有可能出现两个问题:

(a)误用了加锁版本,死锁了。 (b)误用了不加锁版本,数据损坏了。

对于(a),仿造\(\S2.1.2\)的办法能比较容易地排错。对于(b),如果Pthreads提供isLockedByThisThread()就好办,可以写成:

1
2
3
4
void postwithLockHold(const Foo& f)
{
assert(mutex.isLockedByThisThread()); // muduo::MutexLock提供了这个成员函数
}

另外,WithLockHold这个显眼的后缀也让程序中的误用容易暴露出来。

C++没有annotation,不能像Java那样给method或field标上@GuardedBy注释,需要程序员自己小心在意。虽然这里的办法不能一劳永逸地解决全部多线程错误,但能帮上一点是一点了。

我还没有遇到过需要使用recursive mutex的情况,我想将来遇到了都可以借助wrapper改用non-recursive mutex,代码只会更清晰。

Pthreads的权威专家,《Programming with POSIX Threads》的作者David Butenhof也排斥使用recursive mutex。他说:

First, implementation of efficient and reliable threaded code revolves around one simple and basic principle: follow your design. That implies, of course, that you have a design, and that you understand it. A correct and well understood design does not require recursive mutexes(后略)

回到正题。本文这里只谈了mutex本身的正确使用,在C++里多线程编程还会遇到其他一些race condition,请参看第1章。

性能注脚:Linux的Pthreads mutex采用futex实现,不必每次加锁、解锁都陷入系统调用,效率不错。Windows的CRITICAL_SECTION也是类似的,不过它可以嵌入一小段spinlock。在多CPU系统上,如果不能立刻拿到锁,它会先spin一小段时间,如果还不能拿到锁,才挂起当前线程。

1.2 死锁

前面说过,如果坚持只使用ScopedLocking,那么在出现死锁的时候很容易定位。考虑下面这个线程自己与自己死锁的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Request {
public:
void process() __attribute__((noinline))
{
muduo::MutexLockGuard lock(mutex_);
// ...
print(); // 原本没有这行,某人为了调试程序不小心添加了。
}

void print() const // __attribute__((noinline))
{
muduo::MutexLockGuard lock(mutex_);
// ...
}

private:
muduo::MutexLockGuard lock(mutex_);
};

int main() {
Reuqest req;
req.process();
}

在上面这个例子中,原本没有L8,在添加它之后,程序立刻出现了死锁。要调试定位这种死锁很容易,只要把函数调用栈打印出来,结合源码一看,我们立刻就会发现第6行Request::process()和第5行Request::print()先后对同一个mutex上锁,引发了死锁。(必要的时候可以加上__attribute__来防止函数inline展开。)

要修复这个错误也很容易,按前面的办法,从 Request::print() 抽取出 Request::printWithLockHold(),并让 Request::print()Request::process() 都调用它即可。

再来看一个更真实的两个线程死锁的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Inventory {
public:
void add(Request* req)
{
muduo::MutexLockGuard lock(mutex_);
requests_.insert(req);
}

void remove(Request* req) // __attribute__((noinline))
{
muduo::MutexLockGuard lock(mutex_);
requests.erase(req);
}

void printAll() const;
private:
mutable muduo::MutexLock mutex_;
std::set<Request*> requests_;
};

Inventory g_inventory; // 为了简单起见,这里使用了全局对象。

Request class与Inventory class的交互逻辑很简单,在处理(process)请求的时候,往g_inventory中添加自己。在析构的时候,从g_inventory中移除自己。目前看来,整个程序还是线程安全的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Request {
public:
void process() // __attribute__((noinline))
{
muduo::MutexLockGuard lock(mutex_);
g_inventory.add(this);
// ...
}

~Request() __attribute__((noinline))
{
muduo::MutexLockGuard lock(mutex_);
sleep(1); // 为了容易复现死锁,这里用了延时
g_inventory.remove(this);
}

void print() const __attribute__((noinline))
{
muduo::MutexLockGuard lock(mutex_);
// ...
}
private:
mutable muduo::MutexLock mutex_;
};

Inventory class还有一个功能是打印全部已知的Request对象。Inventory::printAll()里的逻辑单独看是没问题的,但是它有可能引发死锁。

1
2
3
4
5
6
7
8
9
10
void Inventory::printAll() const{
muduo::MutexLockGuard lock(mutex_);
sleep(1); // 为了容易复现死锁,这里用了延时
for (std::set<Request*>::const_iterator it = requests_.begin();
it != requests_.end();++it){
(*it)->print;
}
printf("Inventory::printAll() unlocked\n");
}

下面这个程序运行起来发生了死锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void threadFunc()
{
Request* req= new Request;
req->process();
delete req;
}

int main()
{
muduo::Thread thread(threadFunc);
thread.start();
usleep(50e*1000); // 为了让一个线程等在前面第14行的sleep()上。
g_inventory.printAll();
thread.join();
}

通过gdb查看两个线程的函数调用栈,我们发现两个线程都等在mutex上(__lll_lock_wait),估计是发生了死锁。因为一个程序中的线程一般只会等在condition variable上,或者等在epoll_wait上。

注意到 main() 线程是先调用 Inventory::printAll(#6) 再调用 Request::print(#5),而 threadFunc() 线程是先调用 Request::Request(#6) 再调用 Inventory::remove(#5)。这两个调用序列对两个 mutex 的加锁顺序正好相反,于是造成了经典的死锁。见图 2-1,Inventory class 的 mutex 的临界区由灰底表示,Request class 的 mutex 的临界区由斜纹表示。一旦 main() 线程中的 printAll() 在另一个线程的 ~Request() 和 remove() 之间开始执行,死锁已不可避免。

思考:如果printAll()晚于remove()执行,还会出现死锁吗?

练习:修改程序,让~Request()printAll()print()之间开始执行,复现另一种可能的死锁时序。

这里也出现了第1章所说的对象析构的race condition,即一个线程正在析构对象,另一个线程却在调用它的成员函数。

解决死锁的办法很简单,要么把print()移出printAll()的临界区,这可以用\(\S2.8\)介绍的办法;要么把remove()移出~Request()的临界区,比如交换p.37中L13和L15两行代码的位置。当然这没有解决对象析构的race condition,留给读者当做练习吧。

思考:Inventory::printAll()Request::print()有没有可能与Request::process()Inventory::add()发生死锁?

死锁会让程序行为失常,其他一些锁使用不当则会影响性能,例如潘爱民老师写的《Lock Convoys Explained》详细解释了一种性能衰退的现象。除此之外,编写高性能多线程程序至少还要知道false sharing和CPU cache效应。

2. 条件变量(condition variable)

互斥器(mutex)是加锁原语,用来排他性地访问共享数据,它不是等待原语。在便用mutex的时候,我们一般都会期待加锁不要阻塞,总是能立刻拿到锁。然后尽快访问数据,用完之后尽快解锁,这样才能不影响并发性和性能。

如果需要等待某个条件成立,我们应该使用条件变量(condition variable)。条件变量顾名思义是一个或多个线程等待某个布尔表达式为真,即等待别的线程“唤醒”它。条件变量的学名叫管程(monitor)。Java Object内置的wait()、notify()、notifyAll()是条件变量。

条件变量只有一种正确使用的方式,几乎不可能用错。对于wait端:

  1. 必须与mutex一起使用,该布尔表达式的读写需受此mutex保护。
  2. 在mutex已上锁的时候才能调用wait。
  3. 把判断布尔条件和wait()放到while循环中。

写成代码是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
muduo::MutexLock mutex;
muduo::Condition cond(mutex);
std::deque<int> queue;
int dequeue()
{
MutexLockGuard lock(mutex);
while(queue.empty()) // 必须用循环:必须在判断之后再wait()
{
cond.wait(); // 这一步会原子地unlock mutex并进入等待,不会与enqueue死锁
// wait() 执行完毕时会自动重新加锁
}
assert(queue.empty());
int top = queue.front();
queue.pop_front();
return top;
}

上面的代码中必须用while循环来等待条件变量,而不能用if语句,原因是spurious wakeup。这也是面试多线程编程的常见考点。


Tips:

在多线程编程中,条件变量是一种同步机制,用于阻塞一个或多个线程直到某个条件为真。在某些编程语言和操作系统中,条件变量的使用通常涉及到循环,而不是简单的if语句,这主要是为了防止所谓的“虚假唤醒”(spurious wakeup)问题。

虚假唤醒是指线程从等待状态被唤醒,但不是因为条件变量被通知(如通过notifynotifyAll方法),而是由于其他原因,比如操作系统的调度行为。在这种情况下,即使条件变量没有被正确地通知,线程也可能被唤醒。如果使用if语句来检查条件变量,那么在虚假唤醒的情况下,线程可能会错误地认为条件已经满足,并继续执行,这可能会导致程序出现逻辑错误。

为了避免这个问题,通常推荐的做法是在循环中检查条件变量。这样,即使线程被虚假唤醒,循环会继续执行,直到条件变量真正被通知并且条件满足。这种方式确保了线程只有在条件真正满足时才会继续执行,从而避免了虚假唤醒带来的问题。

以下是使用while循环等待条件变量的一个典型代码示例(以Java为例):

1
2
3
4
5
6
synchronized (lock) {
while (!condition) {
lock.wait(); // 等待条件变量
}
// 条件满足,执行后续操作
}

在这个例子中,即使线程被虚假唤醒,while循环会确保线程只有在condition为真时才会退出等待状态。这种方式提供了一种健壮的同步机制,确保了程序的正确性。


对于signal/broadcast端:

  1. 不一定要在mutex已上锁的情况下调用signal(理论上)。
  2. 在signal之前一般要修改布尔表达式。
  3. 修改布尔表达式通常要用mutex保护(至少用作full memory barrier)。
  4. 注意区分signal与broadcast:“broadcast通常用于表明状态变化,signal通常用于表示资源可用。”写成代码是:
1
2
3
4
5
6
void enqueue(int x)
{
MutexLockGuard lock(mutex);
queue.push_back(x);
cond.notify(); // 可以移出临界区之外
}

上面的dequeue()/enqueue()实际上实现了一个简单的容量无限的(unbounded)BlockingQueue。


Tips:

在多线程编程中,signalbroadcast是条件变量的两个操作,用于通知一个或多个等待的线程条件已经满足。以下是对您提到的几点的解释:

  1. 不一定要在mutex已上锁的情况下调用signal(理论上): 理论上,signalbroadcast可以在没有持有互斥锁的情况下调用,但在实践中,这通常不是一个好主意。因为条件变量通常与互斥锁一起使用,以保护共享资源。在没有互斥锁保护的情况下调用signal可能会导致竞态条件,因为其他线程可能会在条件变量被通知和实际检查条件之间修改共享资源。

  2. 在signal之前一般要修改布尔表达式: 在调用signal之前,通常需要修改控制线程等待的条件的布尔表达式。这是因为signal的目的是通知等待的线程条件可能已经满足,所以必须先确保条件确实满足。

  3. 修改布尔表达式通常要用mutex保护(至少用作full memory barrier): 修改控制条件的布尔表达式时,需要使用互斥锁来保护,以确保修改对所有线程都是可见的,并且防止内存访问的重排序。互斥锁在这里充当一个完整的内存屏障(full memory barrier),确保在释放锁之前的所有内存写入操作完成后,其他线程才能看到这些写入。

  4. 区分signal与broadcast

    • signal通常用于通知单个线程条件已经满足,而broadcast用于通知所有等待的线程。
    • signal常用于表示某个资源已经可用,比如在生产者-消费者模型中,生产者生产了一个项目后,会使用signal来通知消费者。
    • broadcast通常用于表明状态变化,比如在所有资源都已处理完毕或者有重大状态更新时,需要通知所有等待的线程。

您提供的代码示例是一个简单的enqueue操作,它将一个整数x添加到队列中,并在添加后通知条件变量cond。这里的MutexLockGuard lock(mutex);是一个RAII(资源获取即初始化)风格的互斥锁保护,它在构造时自动获取互斥锁,并在析构时自动释放互斥锁。这意味着在enqueue函数的作用域内,互斥锁是被持有的,保证了对队列queue的修改是线程安全的。

1
2
3
4
5
6
void enqueue(int x)
{
MutexLockGuard lock(mutex); // 自动获取互斥锁
queue.push_back(x); // 修改共享资源
cond.notify(); // 通知一个等待的线程
} // 函数结束时自动释放互斥锁

在这个例子中,cond.notify();可以移出临界区之外,因为它只是通知一个等待的线程条件可能已经满足,而不需要修改任何控制条件的布尔表达式。但是,通常建议在修改了控制条件之后立即通知,以减少虚假唤醒的可能性,并确保条件变量的及时通知。


思考:enqueue()中每次添加元素都会调用Condition::notify(),如果改成只在queue.size()从0变1的时候才调用condition.notify()),会出现什么后果?

条件变量是非常底层的同步原语,很少直接使用,一般都是用它来实现高层的同步措施,如BlockingQueue<T>或CountDownLatch。

倒计时(CountDownLatch)是一种常用且易用的同步手段。它主要有两种用途:

  • 主线程发起多个子线程,等这些子线程各自都完成一定的任务之后,主线程才继续执行。通常用于主线程等待多个子线程完成初始化。
  • 主线程发起多个子线程,子线程都等待主线程,主线程完成其他一些任务之后通知所有子线程开始执行。通常用于多个子线程等待主线程发出“起跑”命令。

当然我们可以直接用条件变量来实现以上两种同步。不过如果用CountDownLatch的话,程序的逻辑更清晰。CountDownLatch的接口很简单:

1
2
3
4
5
6
7
8
9
10
class CountDownLatch : boost::noncopyable {
public:
explicit CountDownLatch(int count); // 倒数几次
void wait(); // 等待计数值变为0
void countDown(); // 计数减一
private:
mutable MutexLock mutex_;
Condition condition_;
int count_;
};

CountDownLatch的实现同样简单,几乎就是条件变量的教科书式应用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void CountDownLatch::wait()
{
MutexLockGuard lock(mutex_);
while(count_ > 0)
condition_.wait();
}

void CountDownLatch::countDown()
{
MutexLockGuard lock(mutex_);
--count_;
if (count_ == 0)
condition_.notifyAll();
}

注意到CountDownLatch::countDown()使用的是Condition::notifyAll(),而前面P.41的enqueue()使用的是Condition::notify(),这都是有意为之。请读者思考,如果交换两种用法会出现什么情况?


Tips:

在讨论CountDownLatch::countDown()enqueue()Condition::notify()Condition::notifyAll()的使用差异之前,我们先回顾一下这两个函数的作用:

  • Condition::notify():唤醒在此条件变量上等待的单个线程。
  • Condition::notifyAll():唤醒在此条件变量上等待的所有线程。

现在,让我们分析如果交换这两种用法会出现什么情况:

  1. 如果CountDownLatch::countDown()使用Condition::notify()而不是Condition::notifyAll()
    • 在这种情况下,当count_变为0时,只有一个等待的线程会被唤醒。如果CountDownLatch被多个线程调用wait(),那么只有其中一个线程会得到执行,而其他线程将继续等待。这可能会导致死锁,因为没有任何机制可以保证其他等待的线程会被唤醒。这违背了CountDownLatch的设计初衷,即确保所有等待的线程在计数器达到0时都能继续执行。
  2. 如果enqueue()使用Condition::notifyAll()而不是Condition::notify()
    • enqueue()的上下文中,通常是为了将一个元素添加到队列中,并唤醒等待队列的消费者线程。如果使用Condition::notifyAll(),那么所有等待的消费者线程都会被唤醒。这可能会导致不必要的上下文切换和竞争条件,因为可能只有一个元素被添加到队列中,而所有被唤醒的线程都会尝试消费这个元素。这可能会导致性能问题,因为多个线程竞争同一个资源,而且实际上只有一个线程能够成功消费元素。

总结来说,CountDownLatch::countDown()使用Condition::notifyAll()是为了确保所有等待的线程都能被唤醒,因为CountDownLatch的目的是确保所有线程在某个条件满足后都能继续执行。而enqueue()使用Condition::notify()是为了只唤醒一个消费者线程,这样可以避免多个线程同时竞争同一个新添加的元素,从而减少不必要的竞争和上下文切换。


互斥器和条件变量构成了多线程编程的全部必备同步原语,用它们即可完成任何多线程同步任务,二者不能相互替代。我认为应该精通这两个同步原语的用法,先学会编写正确、安全的多线程程序,再在必要的时候考虑用其他“高技术”手段提高性能,如果确实能提高性能的话。千万不要连mutex都还没学会、用好,一上来就考虑lock-free设计。

3. 不要用读写锁和信号量

读写锁(Readers-Writer lock,简写为rwlock)是个看上去很美的抽象,它明确区分了read和write两种行为。

初学者常干的一件事情是,一见到某个共享数据结构频繁读而很少写,就把mutex替换为rwlock。甚至首选rwlock来保护共享状态,这不见得是正确的。

  • 从正确性方面来说,一种典型的易犯错误是在持有read lock的时候修改了共享数据。这通常发生在程序的维护阶段,为新增功能,程序员不小心在原来read lock保护的函数中调用了会修改状态的函数。这种错误的后果跟无保护并发读写共享数据是一样的。

  • 从性能方面来说,读写锁不见得比普通mutex更高效。无论如何reader lock加锁的开销不会比mutex lock小,因为它要更新当前reader的数目。如果临界区很小,锁竞争不激烈,那么mutex往往会更快。见\(\S1.9\)的例子。

  • reader lock可能允许提升为writer lock,也可能不允许提升。考虑\(\S2.1.1\)的post()和traverse()示例,如果用读写锁来保护foos对象,那么post()应该持有写锁,而traverse()应该持有读锁。如果允许把读锁提升为写锁,后果跟使用recursive mutex一样,会造成迭代器失效,程序崩溃。如果不允许提升,后果跟使用non-recursive mutex一样,会造成死锁。我宁愿程序死锁,留个“全尸”好验尸。

  • 通常reader lock是可重入的,writer lock是不可重入的。但是为了防止writer饥饿,writer lock通常会阻塞后来的reader lock,因此reader lock在重入的时候可能死锁。另外,在追求低延迟读取的场合也不适用读写锁。

muduo线程库有意不提供读写锁的封装,因为我还没有在工作中遇到过用rwlock替换普通mutex会显著提高性能的例子。相反,我们一般建议首选mutex。

遇到并发读写,如果条件合适,我通常会用\(\S2.8\)的办法,而不用读写锁,同时避免reader被writer阻塞。如果确实对并发读写有极高的性能要求,可以考虑read-copy-update。


Tips:

read-copy-update(RCU)是一种并发控制机制,主要用于多处理器环境中,以提高系统的可扩展性和性能,特别是在读操作远多于写操作的场景中。RCU的核心思想是在读取操作时不加锁,而是在更新数据结构时使用一种特殊的机制来确保一致性。

具体来说,当有线程想要修改某个数据结构时,RCU首先创建该数据结构的一个副本(Copy),然后在这个副本上进行更新操作。一旦更新完成,新的版本将被标记为最新版本,而旧版本仍然可供那些正在读取的线程使用,直到它们完成读取操作后都离开临界区后,指针指向最新版本的指针,并且删除旧版本。由于读取操作是在旧版本上进行的,因此不需要加锁,这极大地减少了读取操作的延迟。

RCU的优点包括: 1. 允许多个读者同时访问数据,而不会阻塞彼此,提高了读取密集型工作负载的性能。 2. 写操作可以在不影响读取操作的情况下进行,从而减少了延迟。 3. 通过延迟更新操作,RCU减少了锁的使用,从而提高了系统的并发性能。

总的来说,RCU是一种高效的并发控制机制,特别适用于读多写少的场景,它通过牺牲一定的数据新鲜度来换取高性能。


信号量(Semaphore):我没有遇到过需要使用信号量的情况,无从谈及个人经验。我认为信号量不是必备的同步原语,因为条件变量配合互斥器可以完全替代其功能,而且更不易用错。除了[RWC]指出的“semaphore has no notion of ownership”之外,信号量的另一个问题在于它有自己的计数值,而通常我们自己的数据结构也有长度值,这就造成了同样的信息存了两份,需要时刻保持一致,这增加了程序员的负担和出错的可能。如果要控制并发度,可以考虑用muduo::ThreadPool。

说一句不知天高地厚的话,如果程序里需要解决如“哲学家就餐”之类的复杂IPC问题,我认为应该首先检讨这个设计:为什么线程之间会有如此复杂的资源争抢(一个线程要同时抢到两个资源,一个资源可以被两个线程争夺)?如果在工作中遇到,我会把“想吃饭”这个事情专门交给一个为哲学家分派餐具的线程来做,然后每个哲学家等在一个简单的condition variable上,到时间了有人通知他去吃饭。从哲学上说,教科书上的解决方案是平权,每个哲学家有自己的线程,自己去拿筷子:我宁愿用集权的方式,用一个线程专门管餐具的分配,让其他哲学家线程拿个号等在食堂门口好了。这样不损失多少效率,却让程序简单很多。虽然Windows的WaitForMultipleObjects让这个问题trivial化,但在Linux下正确模拟WaitForMultipleObjects不是普通程序员该干的。

Pthreads还提供了barrier这个同步原语,我认为不如CountDownLatch实用。

4. 封装MutexLock、MutexLockGuard、Condition

本节把前面用到的MutexLock、MutexLockGuard、Condition等class的代码列出来,前面两个class没太多难度,后面那个有点意思。这几个class都不允许拷贝构造和赋值。完整代码可以在muduo/base找到。

MutexLock的附加值在于提供了isLockedByThisThread()函数,用于程序断言。它用到的CurrentThread::tid()函数将在\(\S4.3\)介绍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class MutexLock : boost::noncopyable {
public:
MutexLock() : holder_(0) {
pthread_mutex_init(&mutex_, NULL);
}

~MutexLock() {
assert(holder_ == 0);
pthread_mutex_destroy(&mutex_);
}

bool isLockedByThisThread() {
return holder_ == CurrentThread::tid();
}

void assertLocked() {
assert(isLockedByThisThread());
}

void lock() { // 仅供MutexLockGuard调用,严禁用户代码调用
pthread_mutex_lock(&mutex_); // 这两行顺序不能反
holder_ = CurrentThread::tid();
}

void unlock() { // 仅供MutexLockGuard调用,严禁用户代码调用
holder_ = 0; // 这两行顺序不能反
pthread_mutex_unlock(&mutex_);
}

pthread_mutex_t* getPthreadMutex() { // 仅供Condition调用,严禁用户代码调用
return &mutex_;
}
private:
pthread_mutex_t mutex_;
pid_t holder_;
};

class MutexLockGuard : boost::noncopyable {
public:
explicit MutexLockGuard(MutexLock& mutex) : mutex_(mutex) {
mutex_.lock();
}
~MutexLockGuard() {
mutex_.unlock();
}
private:
MutexLock& mutex_;
};
#define MutexLockGuard(x) static_assert(false, "missing mutex guard var name")

注意上面代码的最后一行定义了一个宏,这个宏的作用是防止程序单出现如下错误:

1
2
3
4
5
void doit() {
MutexLockGuard(mutex); // 遗漏变量名,产生一个临时对象又马上销毁了,结果没有锁住临界区。
// 正确写法是MutexLockGuard lock(mutex);
// 临界区
}

Tips:

这段代码是用C++编写的,它定义了两个类:MutexLockMutexLockGuard,以及一个宏定义 MutexLockGuard。这两个类共同实现了一个互斥锁(mutex)机制,用于多线程编程中,以确保同一时间只有一个线程可以访问共享资源。下面是对代码的逐行解释:

MutexLock

  • class MutexLock : boost::noncopyable:定义了一个名为 MutexLock 的类,它继承自 boost::noncopyable,这意味着 MutexLock 对象不能被复制或赋值,这是为了防止复制互斥锁时出现未定义行为。

  • MutexLock() : holder_(0) { pthread_mutex_init(&mutex_, NULL); }:构造函数初始化互斥锁 mutex_ 并设置持有者 holder_ 为0,表示没有线程持有这个锁。

  • ~MutexLock() { assert(holder_ == 0); pthread_mutex_destroy(&mutex_); }:析构函数断言 holder_ 为0,表示没有线程持有这个锁,然后销毁互斥锁。

  • bool isLockedByThisThread() { return holder_ == CurrentThread::tid(); }:检查当前线程是否持有这个锁。

  • void assertLocked() { assert(isLockedByThisThread()); }:断言当前线程是否持有这个锁。

  • void lock() { pthread_mutex_lock(&mutex_); holder_ = CurrentThread::tid(); }:锁定互斥锁,并设置持有者为当前线程的ID。

  • void unlock() { holder_ = 0; pthread_mutex_unlock(&mutex_); }:释放互斥锁,并清除持有者。

  • pthread_mutex_t* getPthreadMutex() { return &mutex_; }:返回内部的 pthread_mutex_t 对象,仅供 Condition 类使用。

MutexLockGuard

  • class MutexLockGuard : boost::noncopyable:定义了一个名为 MutexLockGuard 的类,它也继承自 boost::noncopyable,防止复制。

  • explicit MutexLockGuard(MutexLock& mutex) : mutex_(mutex) { mutex_.lock(); }:构造函数接受一个 MutexLock 对象的引用,并锁定它。

  • ~MutexLockGuard() { mutex_.unlock(); }:析构函数解锁 MutexLock 对象。

宏定义 MutexLockGuard

  • #define MutexLockGuard(x) static_assert(false, "missing mutex guard var name"):这个宏定义用来防止直接使用 MutexLockGuard 而忘记提供变量名,如果直接使用 MutexLockGuard 而不是 MutexLockGuard(mutex),编译时会触发 static_assert 错误。

总结

这段代码实现了一个基本的互斥锁机制,MutexLock 提供了互斥锁的基本操作,而 MutexLockGuard 通过RAII(资源获取即初始化)模式确保在作用域结束时自动释放互斥锁,即使发生异常也是如此。这样可以避免死锁和资源泄露。宏定义 MutexLockGuard 用于编译时检查,确保正确使用 MutexLockGuard


我见过有人把MutexLockGuard写成template,我没有这么做是因为它的模板类类型参数只有MutexLock一种可能,没有必要随意增加灵活性,于是我手工把模板具现化(instantiate)了。此外一种更激进的写法是,把lock/unlock放到private区,然后再把MutexLockGuard设为MutexLock的friend。我认为在注释里告知程序员即可,另外check-in之前的code review也很容易发现误用的情况(grep getPthreadMutex)。

这段代码没有达到工业强度:

  • mutex创建为PTHREAD_MUTEX_DEFAULT类型,而不是我们预想的PTHREAD_MUTEX_NORMAL类型(实际上这二者很可能是等同的),严格的做法是用mutexattr来显示指定mutex的类型。
  • 没有检查返回值。这里不能用assert检查返回值,因为assert()在release build里是空语句。我们检查返回值的意义在于防止ENOMEM之类的资源不足情况,这一般只可能在负载很重的产品程序中出现。一旦出现这种错误,程序必须立刻清理现场并主动退出,否则会莫名其妙地崩溃,给事后调查造成困难。这里我们需要non-debug的assert,或许google-glog的CHECK()宏是个不错的思路。

muduo库的一个特点是只提供最常用、最基本的功能,特别有意避免提供多种功能近似的选择。muduo不是“杂货铺”,不会不分青红皂白地把各种有用的、没用的功能全铺开摆出来。muduo删繁就简,举重若轻;减少选择余地,生活更简单。

MutexLock没有提供trylock()函数,因为我没有在生成代码中用过它。我想不出什么时候程序需要“试着去锁一锁”,或许我写过的代码太简单了。

Condition class的实现有点意思。Pthreads condition variable允许在wait()的时候指定mutex,但是我想不出有什么理由一个condition variable会和不同的mutex配合使用。Java的intrinsic condition和Condition class都不支持这么做,因此我觉得可以放弃这一灵活性,老老实实地一对一好了。

相反,boost::thread的condition_variable是在wait()的时候指定mutex,请参观其同步原语的庞杂设计:

  • Concept有四种:Lockable、TimedLockable、SharedLockable、UpgradeLockable。
  • Lock有六种:lock_guard、unique_lock、shared_lock、upgrade_lock、upgrade_to_unique_lock、scoped_try_lock。
  • Mutex有七种:mutex、try_mutex、timed_mutex、recursive_mutex、recursive_try_mutex、recursive_timed_mutex、shared_mutex。

恕我愚钝,见到boost::thread这样如Rube Goldberg Machine一样让人眼花缭乱的库,我只得三揖绕道而行。很不幸C++11的线程库也采纳了这套方案。这些class名学也很无厘头,为什么不老老实实用readers_writer_lock这样的通俗名字呢?非得增加精神负担,自己发明新名字。我不愿为这样的灵活性付出代价,宁愿自己做几个简简单单的一看就明白的class来用,这种简单的几行代码的“轮子”造造也无妨。提供灵活性固然是本事,然而在不需要灵活性的地方把代码写死,更需要大智慧

下面这个muduo::Condition class简单地封装了Pthreads condition variable,用起来也容易,见本节前面的例子。这里我用notify/notifyAll作为函数名,因为signal有别的含义,C++里的signal/slot、C里的signal handler等等。就别overload这个术语了。

1
2
3
4
5
6
7
8
9
10
11
12
13
class Condition : boost::noncopyable {
public:
explicit Condition(MutexLock& mutex) : mutex_(mutex) {
pthread_cond_init(&pcond_, NULL);
}
~Condition() { pthread_cond_destroy(&pcond_); }
void wait() { pthread_cond_wait(&pcond_, mutex_.getPthreadMutex()); }
void notify() { pthread_cond_signal(&pcond_); }
void notifyAll() { pthread_cond_broadcast(&pcond_); }
private:
MutexLock& mutex_;
pthread_cond_t pcond_;
};

这段代码定义了一个名为 Condition 的类,它用于多线程编程中,提供了条件变量的功能。条件变量允许线程在某些条件不满足时挂起(等待),并在条件满足时被唤醒。以下是对代码的逐行解释:

Condition

  • class Condition : boost::noncopyable:定义了一个名为 Condition 的类,它继承自 boost::noncopyable,这意味着 Condition 对象不能被复制或赋值,以防止复制条件变量时出现未定义行为。

  • explicit Condition(MutexLock& mutex) : mutex_(mutex) { pthread_cond_init(&pcond_, NULL); }:构造函数接受一个 MutexLock 对象的引用,并初始化条件变量 pcond_。这里 mutex_ 被用来关联条件变量和互斥锁,确保在等待条件变量时能够正确地锁定和解锁。

  • ~Condition() { pthread_cond_destroy(&pcond_); }:析构函数销毁条件变量 pcond_

  • void wait() { pthread_cond_wait(&pcond_, mutex_.getPthreadMutex()); }wait 方法使当前线程等待条件变量。调用 pthread_cond_wait 时,会释放关联的互斥锁,并使线程进入等待状态。当条件变量被唤醒时,线程会重新获取互斥锁。

  • void notify() { pthread_cond_signal(&pcond_); }notify 方法唤醒等待条件变量的一个线程。如果有线程正在等待这个条件变量,其中一个线程会被唤醒。

  • void notifyAll() { pthread_cond_broadcast(&pcond_); }notifyAll 方法唤醒所有等待条件变量的线程。

私有成员

  • MutexLock& mutex_:一个 MutexLock 对象的引用,用于与条件变量关联互斥锁。

  • pthread_cond_t pcond_:系统级别的条件变量,用于线程间的同步。

总结

Condition 类提供了条件变量的基本操作,包括等待(wait)、通知一个线程(notify)和通知所有线程(notifyAll)。这些操作通常与互斥锁一起使用,以实现线程间的同步。通过在等待条件变量时释放互斥锁,并在被唤醒时重新获取互斥锁,可以避免死锁,并确保线程安全地访问共享资源。


如果一个class要包含MutexLock和Condition,请注意它们的声明顺序和初始化顺序,mutex应先于condition_构造,并作为后者的构造参数:

1
2
3
4
5
6
7
8
class CountDownLatch {
public:
CountDownLatch(int count) : mutex_(), condition_(mutex_), count_(count) {}
private:
mutable MutexLock mutex_; // 顺序很重要,先mutex后condition
Condition condition_;
int count_;
};

请允许我再次强调,虽然本章花了大量篇幅介绍如何正确使用mutex和condition variable,但并不代表我鼓励到处使用它们。这两者都是非常底层的同步原语,主要用来实现更高级的并发编程工具。一个多线程程序里如果大量使用mutex和condition variable来同步,基本跟用铅笔刀锯大树(孟岩语)没啥区别。

在程序里使用Pthreads库有一个额外的好处:分析工具认得它们,懂得其语义。线程分析工具如Intel Thread Checker和Valgrind-Helgrind等能识别Pthreads调用,并依据happens-before关系分析程序有无data race。

5. 线程安全的Singleton实现

研究Singleton的线程安全实现的历史会发现很多有意思的事情,人们一度认为double-checked locking(缩写为DCL)是王道,兼顾了效率与正确性。后来有“神牛”指出由于乱序执行的影响,DCL是靠不住的。Java开发者还算幸运,可以借助内部静态类的装载来实现。C++就比较惨,要么次次锁,要么eager initialize,或者动用memory barrier这样的“大杀器”。接下来Java 5修订了内存模型,并给volatile赋予了acquire/release语义,这下DCL(with volatile)又是安全的了。然而C++的内存模型还在修订中,C++的volatile目前还不能(将来也难说)保证DCL的正确性(只在Visual C++ 2005及以上版本有效)。

其实没那么麻烦,在实践中用pthread_once就行:

muduo/base/Singleton.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
template<typename T>
class Singleton : boost::noncopyable {
public:
static T& instance() {
pthread_once(&ponce_, &Singleton::init);
return *value_;
}
private:
Singleton();
~Singleton();
static void init() { value_ = new T(); }
private:
static pthread_once_t ponce_;
static T* value_;
};
// 必须在头文件中定义static变量
template<typename T>
pthread_once_t Singleton<T>::ponce_ = PTHREAD_ONCE_INIT;
template<typename T>
T* Singleton<T>::value_ = NULL;

Tips:

这段代码是一个C++模板类,用于实现单例模式(Singleton Pattern)。单例模式是一种常用的设计模式,确保一个类只有一个实例,并提供一个全局访问点。下面是这段代码的逐行解释:

  1. template<typename T>:这是一个模板声明,意味着Singleton类可以用于任何类型的T

  2. class Singleton : boost::noncopyable:定义了一个名为Singleton的类,它继承自boost::noncopyableboost::noncopyable是一个来自Boost库的类,它通过私有化拷贝构造函数和赋值操作符来防止对象被复制。

  3. public::公共部分的开始,类的用户可以直接访问这部分的成员。

  4. static T& instance() {:这是一个静态成员函数,返回类型为T的引用。这个函数是获取单例对象的唯一接口。

  5. pthread_once(&ponce_, &Singleton::init);:调用pthread_once函数,确保init函数只被调用一次。pthread_once是一个线程安全的机制,用于初始化。

  6. return *value_;:返回单例对象的引用。

  7. private::私有部分的开始,这部分的成员不能被类的用户直接访问。

  8. Singleton();:私有的构造函数,防止外部直接创建类的实例。

  9. ~Singleton();:私有的析构函数。

  10. static void init() { value_ = new T(); }:一个静态私有成员函数,用于创建单例对象。它在pthread_once确保只被调用一次的情况下执行,并使用new操作符创建T类型的实例。

  11. static pthread_once_t ponce_;:一个静态私有成员变量,用于pthread_once机制。

  12. static T* value_;:一个静态私有成员变量,指向单例对象。

  13. template<typename T> pthread_once_t Singleton<T>::ponce_ = PTHREAD_ONCE_INIT;:这是在类外定义静态成员变量ponce_的地方。PTHREAD_ONCE_INIT是一个宏,用于初始化pthread_once_t类型的变量。

  14. template<typename T> T* Singleton<T>::value_ = NULL;:这是在类外定义静态成员变量value_的地方,初始值为NULL

总结来说,这段代码定义了一个线程安全的单例模板类,它确保了无论在多线程环境下还是在单线程环境下,类T的实例都只会被创建一次,并且提供了一个全局访问点instance()来获取这个唯一的实例。


这个Singleton没有任何花哨的技巧,它用pthread_once_t来保证lazy-initialization的线程安全。线程安全性由Pthreads库保证,如果系统的Pthreads库有bug,那就认命吧,多线程程序反正也不可能正确执行了。

使用方法也很简单:

1
Foo& foo = Singleton<Foo>::instance();

这个Singleton没有考虑对象的销毁。在长时间运行的服务器程序里,这不是一个问题,反正进程也不打算正常退出(\(\S9.2.2\))。在短期运行的程序中,程序退出的时候自然就释放所有资源了(前提是程序从来不使用不能由操作系统自动关闭的资源,比如跨进程的mutex)。在实际的muduo::Singleton class中,通过atexit()提供了销毁功能,聊胜于无。

另外,这个Singleton只能调用默认构造函数,如果用户想要指定T的构造方式,我们可以用模板特化技术来提供一个定制点,这需要引入另一层间接(another level of indirection)。

6. sleep(3)不是同步原语

我认为sleep/usleep/nanosleep只能出现在测试代码中,比如写单元测试的时候;或者用于有意延长临界区,加速复现死锁的情况,就像\(\S2.1.2\)示范的那样。sleep不具备memory barrier语义,它不能保证内存的可见性,见p.84的例子。

生产代码中线程的等待可分为两种:一种是等待资源可用(要么等在select/poll/epoll_wait上,要么等在条件变量上);一种是等着进入临界区(等在mutex上)以便读写共享数据。后一种等待通常极短,否则程序性能和伸缩性就会有问题。

在程序的正常执行中,如果需要等待一段已知的时间,应该往event loop里注册一个timer,然后在timer的回调函数里接着干活,因为线程是个珍贵的共享资源,不能轻易浪费(阻塞也是浪费)。如果等待某个事件发生,那么应该采用条件变量或IO事件回调,不能用sleep来轮询。不要使用下面这种业余做法:

1
2
3
4
5
6
7
while (true) {
if (!dataAvailable) {
sleep(some_time);
} else {
consumeData();
}
}

如果多线程的安全性和效率要靠代码主动调用sleep来保证,这显然是设计出了问题。等待某个事件发生,正确的做法是用select等价物或Condition,抑或(更理想地)高层同步工具:在用户态做轮询(polling)是低效的。

7. 归纳与总结

前面几节内容归纳如下:

  • 线程同步的四项原则,尽量用高层同步设施(线程池、队列、倒计时);
  • 使用普通互斥器和条件变量完成剩余的同步任务,采用RAII惯用手法(idiom)和ScopedLocking。

用好这两样东西,基本上就能应付多线程服务端开发的各种场合。

在现代的多核计算背景下,多线程是不可避免的。尽管在一定程度上可以通过framework来屏蔽,让你感觉像是在与单线程程序,比如Java Servlet。了解under the hood发生了什么对于编写这种程序也会有帮助。

多线程编程是一项重要的个人技能,不能因为它难就本能地排斥,现在的软件开发比起10年、20年前已经难了不知道多少倍。掌握多线程编程,才能更理智地选择用还是不用多线程,因为你能预估多线程实现的难度与收益,在一开始做出正确的选择。要知道把一个单线程程序改成多线程的,往往比从头实现一个多线程的程序更困难。要明白多线程编程中哪些是能做的,哪些是一般程序员应该避开的雷区。

掌握同步原语和它们的适用场合是多线程编程的基本功。以我的经验,熟练使用文中提到的同步原语,就能比较容易地编写线程安全的程序。本文没有考虑signal对多线程编程的影响,Unix的signal在多线程下的行为比较复杂,一般要靠底层的网络库(如Reactor)加以屏蔽,避免干扰上层应用程序的开发。

通篇来看,“效率”并不是我的主要考虑点,我提倡正确加锁而不是自己编写lock-free算法(使用原子整数除外),更不要想当然地自己发明同步设施。在没有实测数据支持的情况下,要谈哪种做法效率更高是靠不住的,不能听信传言或凭感觉“优化”。很多人误认为用锁会让程序变慢,其实真正影响性能的不是锁,而是锁争用。在程序的复杂度和性能之前取得平衡,并考虑未来两三年扩容的可能(无论是CPU变快、核数变多,还是机器数量增加、网络升级)。我认为在分布式系统中,多机伸缩性(scale out)比单机的性能优化更值得投入精力。


Tips:

多机伸缩性(scale out),也称为水平扩展或横向扩展,是指在分布式系统中通过增加更多的机器(节点)来提高系统的处理能力、存储容量和可靠性。这种扩展方式与垂直扩展(scale up)相对,后者是指通过提升单个机器的性能(如增加CPU核心数、内存或存储容量)来增强系统能力。

多机伸缩性的主要特点包括:

  1. 可扩展性:系统可以根据需求动态地增加或减少节点,以适应负载的变化。

  2. 高可用性:通过在多个节点上复制数据和任务,系统可以在某些节点失败时继续运行,提高了系统的可用性。

  3. 容错性:系统设计能够容忍节点故障,不会因单个节点的故障而导致整个系统崩溃。

  4. 负载均衡:通过将工作负载分散到多个节点,可以避免单个节点过载,提高整体性能。

  5. 成本效益:通常,增加更多的低成本机器比升级单个高性能机器更经济。

  6. 灵活性:多机伸缩性允许系统在不同的硬件和软件配置之间灵活选择,以满足特定的业务需求。

在分布式系统中,多机伸缩性尤其重要,因为:

  • 大规模数据处理:随着数据量的增长,单一机器可能无法处理所有的数据,需要多机协作。

  • 服务的高可用性:为了保证服务的连续性,需要在多个地点部署服务实例。

  • 应对不同的工作负载:不同的应用可能需要不同的资源配置,多机伸缩性允许系统根据需要动态调整资源。

  • 成本控制:相比于购买昂贵的高性能服务器,使用多台普通服务器可能更加经济。

因此,多机伸缩性是构建现代分布式系统时一个重要的考虑因素,它有助于构建更加强大、灵活和经济高效的系统。


8. 借shared_ptr实现copy-on-write

本节解决\(\S2.1\)的几个未决问题:

  • \(\S2.1.1\) post()和traverse()死锁。
  • \(\S2.1.2\)把Request::print()移出Inventory::printAll临界区。
  • \(\S2.1.2\)解决Request对象析构的race condition。

然后再示范用普通mutex替换读写锁。解决办法都基于同一个思路,那就是用shared_ptr来管理共享数据。原理如下:

  • shared_ptr是引用计数型智能指针,如果当前只有一个观察者,那么引用计数的值为1。
  • 对于write端,如果发现引用计数为1,这时可以安全地修改共享对象,不必担心有人正在读它。
  • 对于read端,在读之前把引用计数加1,读完之后减1,这样保证在读的期间其引用计数大于1,可以阻止并发写。
  • 比较难的是,对于write端,如果发现引用计数大于1,该如何处理?sleep一小段时间肯定是错的。

先来看一个简单的例子,解决\(\S2.1.1\)中的post()和traverse()死锁。数据结构改成:

1
2
3
4
typedef std::vector<Foo> FooList;
typedef boost::shared_ptr<FooList> FooListPtr;
MutexLock mutex;
FooListPtr g_foos;

在read端,用一个栈上局部FooListPtr变量当做“观察者”,它使得g_foos的引用计数增加。traverse()函数的临界区是L4~L8,临界区内只读了一次共享变量g_foos(这里多线程并发读写shared_ptr,因此必须用mutex保护),比原来的写法大为缩短。而且多个线程同时调用traverse也不会相互阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
void traverse() {
FooListPtr foos;
{
MutexLockGuard lock(mutex);
foos = g_foos;
assert(g_foos.unique());
}
for (std::vector<Foo>::const_iterator it = foos->begin();
it != foos->end();++it) {
it->doit();
}
}

关键看write端的post()该如何写。按照前面的描述,如果g_foos.unique()为true,我们可以放心地在原地(in-place)修改FooList。如果g_foos.unique()为false,说明这时别的线程正在读取FooList,我们不能原地修改,而是复制一份(L5),在副本上修改(L9)。这样就避免了死锁。

1
2
3
4
5
6
7
8
9
10
void post(const Foo& f) {
printf("post \n");
MutexLockGuard lock(mutex);
if (!g_foos.unique()) {
g_foos.reset(new FooList(*g_foos));
printf("copy the whole list \n");
}
assert(g_foos.unique());
g_foos->push_back(f);
}

注意这里临界区包括整个函数(L3~L9),其他写法都是错的。读者可以试着运行这个程序,看看什么时候会打印L6的消息。练习:找出以下几种写法的错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 错误一:直接修改g_foos 所指的FooList
void post(const Foo& f) {
MutexLockGuard lock(mutex);
g_foos->push_back(f);
}

// 错误二:试图缩小临界区,把copying移出临界区
void post(const Foo& f) {
FooListPtr newFoos(new FooList(*g_foos));
newFoos->push_back(f);
MutexLockGuard lock(mutex);
g_foos = newFoos; // 或者g_foos.swap(newFoos);
}

// 错误三:把临界区拆成两个小的,把copying放到临界区之外
void post(const Foo& f) {
FooListPtr oldFoos;
{
MutexLockGuard lock(mutex);
oldFoos = g_foos;
}
FooListPtr newFoos(new FooList(*oldFoos));
newFoos->push_back(f);
MutexLockGuard lock(mutex);
g_foos = newFoos; // 或者g_foos.swap(newFoos);
}

Tips:

这三种错误都涉及到多线程编程中的同步问题,特别是在使用互斥锁(mutex)来保护共享资源时。下面我将逐一解释这三种错误:

错误一:直接修改g_foos所指的FooList

1
2
3
4
void post(const Foo& f) {
MutexLockGuard lock(mutex);
g_foos->push_back(f);
}

这个错误在于,虽然在修改g_foos之前获取了互斥锁,但是没有考虑到push_back操作可能不是原子的。如果FooListpush_back方法涉及到内存分配,那么在多线程环境下,可能会有其他线程在push_back操作的中间时刻查看或修改g_foos,导致数据不一致或竞态条件。

错误二:试图缩小临界区,把copying移出临界区

1
2
3
4
5
6
void post(const Foo& f) {
FooListPtr newFoos(new FooList(*g_foos));
newFoos->push_back(f);
MutexLockGuard lock(mutex);
g_foos = newFoos; // 或者g_foos.swap(newFoos);
}

这个错误在于,虽然试图通过缩小临界区来提高效率,但是newFoos的构造和push_back操作是在互斥锁外部完成的。这意味着在构造newFoos和添加元素的过程中,其他线程可能已经修改了g_foos,导致newFoos是基于一个过时的g_foos副本。当lock获取互斥锁并尝试替换g_foos时,可能会覆盖其他线程的更新,造成数据丢失。

错误三:把临界区拆成两个小的,把copying放到临界区之外

1
2
3
4
5
6
7
8
9
void post(const Foo& f) {
FooListPtr oldFoos;
MutexLockGuard lock(mutex);
oldFoos = g_foos;
FooListPtr newFoos(new FooList(*oldFoos));
newFoos->push_back(f);
MutexLockGuard lock(mutex);
g_foos = newFoos; // 或者g_foos.swap(newFoos);
}

这个错误在于,虽然尝试通过拆分临界区来减少锁的持有时间,但是newFoos的构造和push_back操作仍然在互斥锁外部完成。这与错误二类似,oldFoos的复制可能不是最新的,而且在两次获取互斥锁之间,g_foos可能已经被其他线程修改。这可能导致newFoos是基于一个过时的g_foos副本,并且在第二次获取互斥锁时,可能会覆盖其他线程的更新。

正确的做法

为了解决这些问题,应该确保所有对共享资源的操作都在互斥锁的保护下完成,并且尽量减少锁的持有时间。一个可能的解决方案是:

1
2
3
4
5
6
void post(const Foo& f) {
MutexLockGuard lock(mutex);
FooListPtr newFoos(new FooList(*g_foos));
newFoos->push_back(f);
g_foos = newFoos; // 或者g_foos.swap(newFoos);
}

在这个修正的版本中,所有的操作都在互斥锁的保护下完成,确保了线程安全,并且减少了锁的持有时间,因为只有在修改g_foos时才持有锁。


希望读者先吃透上面举的这个例子,再来看如何用相同的思路解决剩下的问题。

解决\(\S2.1.2\)把Request::print()移出Inventory::printAll临界区有两个做法。其一很简单,把requests_复制一份,在临界区之外遍历这个副本。

1
2
3
4
5
6
7
8
void Inventory::printAll() const {
std::set<Request*> requests;
{
MutexLockGuard lock(mutex_);
requests = requests_;
}
// 遍历局部变量requests,调用Request::print();
}

这么做有一个明显的缺点,它复制了整个std::set中的每个元素,开销可能会比较大。如果保证遍历期间没有其他人修改requests_,那么我们可以减小开销,这就引出了第二种做法。

第二种做法的要点是用shared_ptr管理std::set,在遍历的时候先增加引用计数,阻止并发修改。当然Inventory::add()和Inventory::remove()也要相应修改,采用本节前面post()和traverse()的方案。完整的代码见recipes/thread/test/Request-Inventory_test.cc。

注意目前的方案仍然没有解决Request对象析构的race condition,这点还是留作练习吧。一种可能的答案见recipes/thread/test/RequestInventory_test2.cc。

8.1 用普通mutex替换读写锁的一个例子

场景:一个多线程的C++程序,24小时5.5天运行。有几个工作线程Thread-Worker[0,1,2,3],处理客户发过来的交易请求;另外有一个背景线程ThreadBackground,不定期更新程序内部的参考数据。这些线程都跟一个hash表打交道,工作线程只读,背景线程读写,必然要用到一些同步机制,防止数据损坏。这里的示例代码用std::map代替hash表,意思是一样的:

1
2
using namespace std;
typedef map<string, vector<pair<string, int>>> Map;

Map的key是用户名,value是一个vector,里边存的是不同stock的最小交易间隔,vector已经排好序,可以用二分查找。

我们的系统要求工作线程的延迟尽可能小,可以容忍背景线程的延迟略大。一天之内,背景线程对数据更新的次数屈指可数,最多一小时一次,更新的数据来自于网络,所以对更新的及时性不敏感。Map的数据量也不大,大约一千多条数据。

最简单的同步办法是用读写锁:工作线程加读锁,背景线程加写锁。但是读写锁的开销比普通mutex要大,而且写锁优先,会阻塞后面的读锁。如果工作线程能用最普通的非重入mutex实现同步,就不必用读写锁,这能降低工作线程延迟。我们借助shared_ptr做到了这一点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class CustomerData : boost::noncopyable {
public:
CustomerData() : data_(new Map) {}
int query(const string& customer, const string& stock) const;
private:
typedef std::pair<string, int> Entry;
typedef std::vector<Entry> EntryList;
typedef std::map<string, EntryList> Map;
typedef boost::shared_ptr<Map> MapPtr;
void update(const string& customer, const EntryList& entries);
// 用lower_bound在entries里找stock
static int findEntry(const EntryList& entries, const string& stock);
MapPtr getData() const {
MutexLockGuard lock(mutex_);
return data_;
}
mutable MutexLock mutex_;
MapPtr data_;
};

int CustomerData::query(const string& customer, const string& stock) const {
MapPtr data = getData();
// data一旦拿到,就不再需要锁了。
// 取数据的时候只有 getData()内部有锁,多线程并发读的性能很好。
Map::const_iterator entries = data->find(customer);
if (entries != data->end()) {
return findEntry(entries->second, stock);
} else {
return -1;
}
}

关键看CustomerData::update()怎么写。既然要更新数据,那肯定得加锁,如果这时候其他线程正在读,那么不能在原来的数据上修改,得创建一个副本,在副本上修改,修改完了再替换。如果没有用户在读,那么就能直接修改,节约一次Map拷贝。

1
2
3
4
5
6
7
8
9
10
void CustomerData::update(const string& customer, const EntryList& entries) {
MutexLockGuard lock(mutex_); // update必须全程持锁
if (!data_.unique()) {
MapPtr newData(new Map(*data_));
// 在这里打印日志,然后统计日志来判断worst case发生的次数
data_.swap(newData);
}
assert(data_.unique());
(*data_)[customer] = entries;
}

注意其中用了shared_ptr::unique()来判断是不是有人在读,如果有人在读,那么我们不能直接修改,因为query并没有全程加锁,只在getData()内部有锁。shared_ptr::swap把data_替换为新副本,而且我们还在锁里,不会有别的线程来读,可以放心地更新。如果别的reader线程已经刚刚通过getData()拿到了MapPtr,它会读到稍旧的数据。这不是问题,因为数据更新来自网络,如果网络稍有延迟,反正reader线程也会读到旧的数据。

如果每次都更新全部数据,而且始终是在同一个线程更新数据,临界区还可以进一步缩小。

1
2
3
4
5
6
7
8
9
MapPtr parseData(const string& message); // 解析收到的消息,返回新的MapPtr
void CustomerData::update(const string& message) {
// 解析新数据,在临界区之外
MapPtr newData = parseData(message);
if (newData) {
MutexLockGuard lock(mutex_);
data_.swap(newData); // 不要使用data_ = newData;
}
}

旧数据的析构也在临界区外,进一步缩短了临界区。

据我们测试,大多数情况下更新都是在原来数据上进行的,拷贝的比例还不到1%,很高效。更准确地说,这不是copy-on-write,而是copy-on-other-reading。

我们将来可能会采用无锁数据结构,不过目前这个实现已经非常好,可以满足我们的要求。

本节介绍的做法与read-copy-update有相似之处,但理解起来要容易得多。


Tips:

这段代码是一个多线程C++程序的一部分,它展示了如何使用std::map(在这个例子中被Map类型替代)和boost::shared_ptr来实现线程安全的读和写操作。程序的目标是最小化工作线程的延迟,同时允许背景线程进行数据更新。下面是代码的详细解释:

类定义和成员函数

  1. CustomerData类:这个类封装了对Map的操作,Map是一个std::map的别名,用于存储用户数据。Map的键是用户名,值是一个vector,存储的是不同股票的最小交易间隔。

  2. 构造函数CustomerData的构造函数初始化了一个MapPtr,这是一个boost::shared_ptr智能指针,指向一个Map对象。

  3. query函数:这个函数用于查询特定用户和股票的交易间隔。它首先通过getData()获取Map的副本,然后在这个副本上进行查找。getData()函数内部使用MutexLockGuard(一个互斥锁的RAII封装)来保护对data_的访问,确保线程安全。

  4. update函数:这个函数用于更新用户数据。它首先获取互斥锁,然后检查是否有其他线程正在读取数据(通过shared_ptr::unique())。如果有,它会创建一个新的Map副本,在副本上进行更新,然后通过swap替换原来的Map。如果没有其他线程正在读取,它可以直接更新原始Map

关键点

  • 线程安全:通过使用MutexLockGuardshared_ptr::unique(),代码确保了在更新数据时不会影响正在进行的读取操作。

  • 性能优化:通过只在getData()内部加锁,代码减少了工作线程的延迟,因为它们不需要在查询数据时持有锁。

  • 数据一致性:代码通过创建数据副本来更新数据,确保了数据的一致性,即使在更新过程中有新的读取请求。

  • shared_ptr::unique():这个函数检查智能指针是否是唯一的,即没有其他shared_ptr实例共享同一个对象。如果是唯一的,那么可以安全地直接修改原始数据,否则需要创建副本。

  • swap操作swap函数用于交换两个shared_ptr指向的对象,这在更新数据时非常有用,因为它可以原子地替换旧数据,避免了数据不一致的风险。

总结

这段代码展示了如何在多线程环境中安全地进行读和写操作,同时尽量减少工作线程的延迟。通过使用shared_ptr和互斥锁,它能够在保证数据一致性的同时,提高程序的性能。


Tips:

在提供的代码示例中,.swap() 方法的使用是为了在更新数据时实现线程安全和减少锁持有时间。.swap() 方法交换两个 shared_ptr 指向的内部数据指针,这样做有几个关键好处:

1. 原子性

.swap() 是一个原子操作,意味着它在执行时不会被其他线程中断。这确保了在更新数据时不会出现部分更新的情况,从而避免了数据不一致的问题。

2. 减少锁持有时间

通过在 update() 方法中使用 .swap(),代码将锁持有时间最小化。在 update() 方法中,互斥锁只在 .swap() 操作期间被持有,这意味着:

  • 锁外数据处理:在获取锁之前,可以解析新数据并创建一个新的 Map 对象(在 parseData() 函数中完成)。这样,大部分数据处理工作在没有锁的情况下完成,减少了锁的竞争和等待时间。

  • 快速替换:一旦获取了锁,.swap() 操作迅速替换旧的 Map 对象为新的 Map 对象,这个过程非常快,因为它只涉及指针的交换。

3. 避免数据复制

如果不使用 .swap(),更新数据时可能需要复制整个 Map 对象。这不仅会增加内存使用,还可能因为复制操作而增加延迟。使用 .swap() 避免了这种复制,因为它只是交换了指针,而不是数据本身。

4. 保持引用计数的一致性

由于 shared_ptr 使用引用计数来管理内存,.swap() 确保了在交换指针时引用计数的正确更新。这意味着在交换过程中不会出现引用计数错误,从而避免了内存泄漏或过早释放内存的问题。

5. 处理多个读者

在多线程环境中,可能有多个线程同时读取数据。使用 .swap(),可以确保所有正在进行的读操作在数据更新时看到的是一致的数据视图。在读操作完成之前,它们不会看到部分更新的数据,这保证了数据的一致性和线程安全。

总结来说,.swap() 在这个上下文中的使用是为了确保线程安全的数据更新,同时最小化锁持有时间,避免不必要的数据复制,并保持引用计数的一致性。这些都是在高性能多线程应用程序中非常重要的考虑因素。



2. 线程同步精要
http://binbo-zappy.github.io/2024/12/23/muduo多线程/2-线程同步精要/
作者
Binbo
发布于
2024年12月23日
许可协议