14. 多线程编程

第14章 多线程编程

自内核2.6开始,Linux才真正提供内核级的线程支持,并有两个组织致力于编写新的线程库:NGPT(Next Generation POSIX Threads)和NPTL(Native POSIX Thread Library)。不过前者在2003年就放弃了,因此新的线程库就称为NPTL。NPTL比LinuxThreads效率高,且更符合POSIX规范,所以它已经成为glibc的一部分。本书所有线程相关的例程使用的线程库都是NPTL。

本章要讨论的线程相关的内容都属于POSIX线程(简称pthread)标准,而不局限于NPTL实现,具体包括:

❑创建线程和结束线程。

❑读取和设置线程属性。

❑POSIX线程同步方式:POSIX信号量、互斥锁和条件变量。

1. Linux线程概述

1.1 线程模型

线程是程序中完成一个独立任务的完整执行序列,即一个可调度的实体。根据运行环境和调度者的身份,线程可分为内核线程和用户线程。

  • 内核线程,在有的系统上也称为LWP(Light Weight Process,轻量级进程),运行在内核空间,由内核来调度;
  • 用户线程运行在用户空间,由线程库来调度。
  • 当进程的一个内核线程获得CPU的使用权时,它就加载并运行一个用户线程。

可见,内核线程相当于用户线程运行的“容器”。

  • 一个进程可以拥有M个内核线程和N个用户线程,其中M≤N。

  • 并且在一个系统的所有进程中,M和N的比值都是固定的。

  • 按照M:N的取值,线程的实现方式可分为三种模式:完全在用户空间实现、完全由内核调度和双层调度(two level scheduler)。

完全在用户空间实现的线程无须内核的支持,内核甚至根本不知道这些线程的存在。

  • 线程库负责管理所有执行线程,比如线程的优先级、时间片等。
  • 线程库利用longjmp来切换线程的执行,使它们看起来像是“并发”执行的。
  • 但实际上内核仍然是把整个进程作为最小单位来调度的。
  • 换句话说,一个进程的所有执行线程共享该进程的时间片,它们对外表现出相同的优先级。
  • 因此,对这种实现方式而言,N=1,即M个用户空间线程对应1个内核线程,而该内核线程实际上就是进程本身。
  • 完全在用户空间实现的线程的优点是:
    • 创建和调度线程都无须内核的干预,因此速度相当快。
    • 并且由于它不占用额外的内核资源,所以即使一个进程创建了很多线程,也不会对系统性能造成明显的影响。
  • 其缺点是:
    • 对于多处理器系统,一个进程的多个线程无法运行在不同的CPU上,因为内核是按照其最小调度单位来分配CPU的。
    • 此外,线程的优先级只对同一个进程中的线程有效,比较不同进程中的线程的优先级没有意义。早期的伯克利UNIX线程就是采用这种方式实现的。

完全由内核调度的模式将创建、调度线程的任务都交给了内核,运行在用户空间的线程库无须执行管理任务,这与完全在用户空间实现的线程恰恰相反。

  • 二者的优缺点也正好互换。
  • 较早的Linux内核对内核线程的控制能力有限,线程库通常还要提供额外的控制能力,尤其是线程同步机制,不过现代Linux内核已经大大增强了对线程的支持。
  • 完全由内核调度的这种线程实现方式满足M:N=1:1,即1个用户空间线程被映射为1个内核线程。

双层调度模式是前两种实现模式的混合体:

  • 内核调度M个内核线程,线程库调度N个用户线程。
  • 这种线程实现方式结合了前两种方式的优点:不但不会消耗过多的内核资源,而且线程切换速度也较快,同时它可以充分利用多处理器的优势。

1.2 Linux线程库

Linux上两个最有名的线程库是LinuxThreads和NPTL,它们都是采用1:1的方式实现的。由于LinuxThreads在开发的时候,Linux内核对线程的支持还非常有限,所以其可用性、稳定性以及POSIX兼容性都远远不及NPTL。现代Linux上默认使用的线程库是NPTL。

用户可以使用如下命令来查看当前系统上所使用的线程库:

1
2
$getconf GNU_LIBPTHREAD_VERSION
NPTL 2.14.90
  • LinuxThreads线程库的内核线程是用clone系统调用创建的进程模拟的。
  • clone系统调用和fork系统调用的作用类似:创建调用进程的子进程。
  • 不过我们可以为clone系统调用指定CLONE_THREAD标志,这种情况下它创建的子进程与调用进程共享相同的虚拟地址空间、文件描述符和信号处理函数,这些都是线程的特点。
  • 不过,用进程来模拟内核线程会导致很多语义问题,比如:
    • 每个线程拥有不同的PID,因此不符合POSIX规范。
    • Linux信号处理本来是基于进程的,但现在一个进程内部的所有线程都能而且必须处理信号。
    • 用户ID、组ID对一个进程中的不同线程来说可能是不一样的。
    • 程序产生的核心转储文件不会包含所有线程的信息,而只包含产生该核心转储文件的线程的信息。
    • 由于每个线程都是一个进程,因此系统允许的最大进程数也就是最大线程数。

LinuxThreads线程库一个有名的特性是所谓的管理线程。它是进程中专门用于管理其他工作线程的线程。其作用包括:

  • 系统发送给进程的终止信号先由管理线程接收,管理线程再给其他工作线程发送同样的信号以终止它们。
  • 当终止工作线程或者工作线程主动退出时,管理线程必须等待它们结束,以避免僵尸进程。
  • 如果主线程先于其他工作线程退出,则管理线程将阻塞它,直到所有其他工作线程都结束之后才唤醒它。
  • 回收每个线程堆栈使用的内存。

管理线程的引入,增加了额外的系统开销。并且由于它只能运行在一个CPU上,所以LinuxThreads线程库也不能充分利用多处理器系统的优势。

要解决LinuxThreads线程库的一系列问题,不仅需要改进线程库,最主要的是需要内核提供更完善的线程支持。因此,Linux内核从2.6版本开始,提供了真正的内核线程。新的NPTL线程库也应运而生。相比LinuxThreads,NPTL的主要优势在于:

  • 内核线程不再是一个进程,因此避免了很多用进程模拟内核线程导致的语义问题。
  • 摒弃了管理线程,终止线程、回收线程堆栈等工作都可以由内核来完成。
  • 由于不存在管理线程,所以一个进程的线程可以运行在不同的CPU上,从而充分利用了多处理器系统的优势。
  • 线程的同步由内核来完成。隶属于不同进程的线程之间也能共享互斥锁,因此可实现跨进程的线程同步。

2. 创建线程和结束线程

下面我们讨论创建和结束线程的基础API。Linux系统上,它们都定义在pthread.h头文件中。

  1. pthread_create

创建一个线程的函数是pthread_create。其定义如下:

1
2
#include<pthread.h>
int pthread_create(pthread_t*thread,const pthread_attr_t*attr,void*(*start_routine)(void*),void*arg);

thread参数是新线程的标识符,后续pthread_*函数通过它来引用新线程。其类型pthread_t的定义如下:

1
2
#include<bits/pthreadtypes.h>
typedef unsigned long int pthread_t;

可见,pthread_t是一个整型类型。实际上,Linux上几乎所有的资源标识符都是一个整型数,比如socket、各种System V IPC标识符等。

  • attr参数用于设置新线程的属性。给它传递NULL表示使用默认线程属性。线程拥有众多属性,我们将在后面详细讨论之。

  • start_routine和arg参数分别指定新线程将运行的函数及其参数。

  • pthread_create成功时返回0,失败时返回错误码。

    • 一个用户可以打开的线程数量不能超过RLIMIT_NPROC软资源限制(见表7-1)。此外,系统上所有用户能创建的线程总数也不得超过/proc/sys/kernel/threads-max内核参数所定义的值。
  1. pthread_exit

线程一旦被创建好,内核就可以调度内核线程来执行start_routine函数指针所指向的函数了。线程函数在结束时最好调用如下函数,以确保安全、干净地退出:

1
2
#include<pthread.h>
void pthread_exit(void*retval);

pthread_exit函数通过retval参数向线程的回收者传递其退出信息。它执行完之后不会返回到调用者,而且永远不会失败。

  1. pthread_join

一个进程中的所有线程都可以调用pthread_join函数来回收其他线程(前提是目标线程是可回收的,见后文),即等待其他线程结束,这类似于回收进程的wait和waitpid系统调用。pthread_join的定义如下:

1
2
#include<pthread.h>
int pthread_join(pthread_t thread,void**retval);

thread参数是目标线程的标识符,retval参数则是目标线程返回的退出信息。该函数会一直阻塞,直到被回收的线程结束为止。该函数成功时返回0,失败则返回错误码。可能的错误码如表14-1所示。

4.pthread_cancel

有时候我们希望异常终止一个线程,即取消线程,它是通过如下函数实现的:

1
2
#include<pthread.h>
int pthread_cancel(pthread_t thread);

thread参数是目标线程的标识符。该函数成功时返回0,失败则返回错误码。不过,接收到取消请求的目标线程可以决定是否允许被取消以及如何取消,这分别由如下两个函数完成:

1
2
3
#include<pthread.h>
int pthread_setcancelstate(int state,int*oldstate);
int pthread_setcanceltype(int type,int*oldtype);
  • 这两个函数的第一个参数分别用于设置线程的取消状态(是否允许取消)和取消类型(如何取消),
  • 第二个参数则分别记录线程原来的取消状态和取消类型。
  • state参数有两个可选值:
    • PTHREAD_CANCEL_ENABLE,允许线程被取消。它是线程被创建时的默认取消状态。
    • PTHREAD_CANCEL_DISABLE,禁止线程被取消。这种情况下,如果一个线程收到取消请求,则它会将请求挂起,直到该线程允许被取消。
  • type参数也有两个可选值:
    • PTHREAD_CANCEL_ASYNCHRONOUS,线程随时都可以被取消。它将使得接收到取消请求的目标线程立即采取行动。
    • PTHREAD_CANCEL_DEFERRED,允许目标线程推迟行动,直到它调用了下面几个所谓的取消点函数中的一个:pthread_join、pthread_testcancel、pthread_cond_wait、pthread_cond_timedwait、sem_wait和sigwait。根据POSIX标准,其他可能阻塞的系统调用,比如read、wait,也可以成为取消点。不过为了安全起见,我们最好在可能被取消的代码中调用pthread_testcancel函数以设置取消点。
  • pthread_setcancelstate和pthread_setcanceltype成功时返回0,失败则返回错误码。

3. 线程属性

pthread_attr_t结构体定义了一套完整的线程属性,如下所示:

1
2
3
4
5
6
#include<bits/pthreadtypes.h>
#define__SIZEOF_PTHREAD_ATTR_T 36
typedef union {
char _size[__SIZEOF_PTHREAD_ATTR_T];
long int _align;
} pthread_attr_t;

各种线程属性全部包含在一个字符数组中。线程库定义了一系列函数来操作pthread_attr_t类型的变量,以方便我们获取和设置线程属性。这些函数包括:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include<pthread.h>
/*初始化线程属性对象*/
int pthread_attr_init(pthread_attr_t*attr);
/*销毁线程属性对象。被销毁的线程属性对象只有再次初始化之后才能继续使用*/
int pthread_attr_destroy(pthread_attr_t*attr);
/*下面这些函数用于获取和设置线程属性对象的某个属性*/
int pthread_attr_getdetachstate(const pthread_attr_t*attr,int*detachstate);
int pthread_attr_setdetachstate(pthread_attr_t*attr,int detachstate);
int pthread_attr_getstackaddr(const pthread_attr_t*attr,void**stackaddr);
int pthread_attr_setstackaddr(pthread_attr_t*attr,void*stackaddr);
int pthread_attr_getstacksize(const pthread_attr_t*attr,size_t*stacksize);
int pthread_attr_setstacksize(pthread_attr_t*attr,size_t stacksize);
int pthread_attr_getstack(const pthread_attr_t*attr,void**stackaddr,size_t*stacksize);
int pthread_attr_setstack(pthread_attr_t*attr,void*stackaddr,size_t stacksize);
int pthread_attr_getguardsize(const pthread_attr_t*__attr,size_t*guardsize);
int pthread_attr_setguardsize(pthread_attr_t*attr,size_t guardsize);
int pthread_attr_getschedparam(const pthread_attr_t*attr,struct sched_param*param);
int pthread_attr_setschedparam(pthread_attr_t*attr,const struct sched_param*param);
int pthread_attr_getschedpolicy(const pthread_attr_t*attr,int*policy);
int pthread_attr_setschedpolicy(pthread_attr_t*attr,int policy);
int pthread_attr_getinheritsched(const pthread_attr_t*attr,int*inherit);
int pthread_attr_setinheritsched(pthread_attr_t*attr,int inherit);
int pthread_attr_getscope(const pthread_attr_t*attr,int*scope);
int pthread_attr_setscope(pthread_attr_t*attr,int scope);

下面我们详细讨论每个线程属性的含义:

❑detachstate,线程的脱离状态。

  • 它有PTHREAD_CREATE_JOINABLE和PTHREAD_CREATE_DETACHED两个可选值。
  • 前者指定线程是可以被回收的,后者使调用线程脱离与进程中其他线程的同步。
  • 脱离了与其他线程同步的线程称为“脱离线程”。脱离线程在退出时将自行释放其占用的系统资源。
  • 线程创建时该属性的默认值是PTHREAD_CREATE_JOINABLE。
  • 此外,我们也可以使用pthread_detach函数直接将线程设置为脱离线程。

❑stackaddr和stacksize,线程堆栈的起始地址和大小。

  • 一般来说,我们不需要自己来管理线程堆栈,因为Linux默认为每个线程分配了足够的堆栈空间(一般是8 MB)。
  • 我们可以使用ulimt-s命令来查看或修改这个默认值。

❑guardsize,保护区域大小。

  • 如果guardsize大于0,则系统创建线程的时候会在其堆栈的尾部额外分配guardsize字节的空间,作为保护堆栈不被错误地覆盖的区域。
  • 如果guardsize等于0,则系统不为新创建的线程设置堆栈保护区。
  • 如果使用者通过pthread_attr_setstackaddr或pthread_attr_setstack函数手动设置线程的堆栈,则guardsize属性将被忽略。

❑schedparam,线程调度参数。

  • 其类型是sched_param结构体。
  • 该结构体目前还只有一个整型类型的成员——sched_priority,该成员表示线程的运行优先级。

❑schedpolicy,线程调度策略。

  • 该属性有SCHED_FIFO、SCHED_RR和SCHED_OTHER三个可选值,
  • 其中SCHED_OTHER是默认值。
  • SCHED_RR表示采用轮转算法(round-robin)调度,SCHED_FIFO表示使用先进先出的方法调度,这两种调度方法都具备实时调度功能,但只能用于以超级用户身份运行的进程。

❑inheritsched,是否继承调用线程的调度属性。

  • 该属性有PTHREAD_INHERIT_SCHED和PTHREAD_EXPLICIT_SCHED两个可选值。
  • 前者表示新线程沿用其创建者的线程调度参数,这种情况下再设置新线程的调度参数属性将没有任何效果。
  • 后者表示调用者要明确地指定新线程的调度参数。

❑scope,线程间竞争CPU的范围,即线程优先级的有效范围。

  • POSIX标准定义了该属性的PTHREAD_SCOPE_SYSTEM和PTHREAD_SCOPE_PROCESS两个可选值,
  • 前者表示目标线程与系统中所有线程一起竞争CPU的使用,
  • 后者表示目标线程仅与其他隶属于同一进程的线程竞争CPU的使用。
  • 目前Linux只支持PTHREAD_SCOPE_SYSTEM这一种取值。

4. POSIX信号量

和多进程程序一样,多线程程序也必须考虑同步问题。

pthread_join可以看作一种简单的线程同步方式,不过很显然,它无法高效地实现复杂的同步需求,比如控制对共享资源的独占式访问,抑或是在某个条件满足之后唤醒一个线程。

接下来我们讨论3种专门用于线程同步的机制:POSIX信号量、互斥量和条件变量。

在Linux上,信号量API有两组。一组是第13章讨论过的System V IPC信号量,另外一组是我们现在要讨论的POSIX信号量。这两组接口很相似,但不保证能互换。由于这两种信号量的语义完全相同,因此我们不再赘述信号量的原理。

POSIX信号量函数的名字都以sem_开头,并不像大多数线程函数那样以pthread_开头。常用的POSIX信号量函数是下面5个:

1
2
3
4
5
6
#include<semaphore.h>
int sem_init(sem_t*sem,int pshared,unsigned int value);
int sem_destroy(sem_t*sem);
int sem_wait(sem_t*sem);
int sem_trywait(sem_t*sem);
int sem_post(sem_t*sem);

这些函数的第一个参数sem指向被操作的信号量。

sem_init函数用于初始化一个未命名的信号量。

  • pshared参数指定信号量的类型。如果其值为0,就表示这个信号量是当前进程的局部信号量,否则该信号量就可以在多个进程之间共享。
  • value参数指定信号量的初始值。此外,初始化一个已经被初始化的信号量将导致不可预期的结果。

sem_destroy函数用于销毁信号量,以释放其占用的内核资源。

  • 如果销毁一个正被其他线程等待的信号量,则将导致不可预期的结果。

sem_wait函数以原子操作的方式将信号量的值减1。

  • 如果信号量的值为0,则sem_wait将被阻塞,直到这个信号量具有非0值。

sem_trywait与sem_wait函数相似

  • 不过它始终立即返回,而不论被操作的信号量是否具有非0值,相当于sem_wait的非阻塞版本。
  • 当信号量的值非0时,sem_trywait对信号量执行减1操作。
  • 当信号量的值为0时,它将返回-1并设置errno为EAGAIN。

sem_post函数以原子操作的方式将信号量的值加1。

  • 当信号量的值大于0时,其他正在调用sem_wait等待信号量的线程将被唤醒。

上面这些函数成功时返回0,失败则返回-1并设置errno。

5. 互斥锁

互斥锁(也称互斥量)可以用于保护关键代码段,以确保其独占式的访问,这有点像一个二进制信号量(见13.5.1小节)。当进入关键代码段时,我们需要获得互斥锁并将其加锁,这等价于二进制信号量的P操作;当离开关键代码段时,我们需要对互斥锁解锁,以唤醒其他等待该互斥锁的线程,这等价于二进制信号量的V操作。

5.1 互斥锁基础API

POSIX互斥锁的相关函数主要有如下5个:

1
2
3
4
5
6
#include<pthread.h>
int pthread_mutex_init(pthread_mutex_t*mutex,const thread_mutexattr_t*mutexattr);
int pthread_mutex_destroy(pthread_mutex_t*mutex);
int pthread_mutex_lock(pthread_mutex_t*mutex);
int pthread_mutex_trylock(pthread_mutex_t*mutex);
int pthread_mutex_unlock(pthread_mutex_t*mutex);

这些函数的第一个参数mutex指向要操作的目标互斥锁,互斥锁的类型是pthread_mutex_t结构体。

pthread_mutex_init函数用于初始化互斥锁。

  • mutexattr参数指定互斥锁的属性。
  • 如果将它设置为NULL,则表示使用默认属性。
  • ,我们还可以使用如下方式来初始化一个互斥锁:
1
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;

宏PTHREAD_MUTEX_INITIALIZER实际上只是把互斥锁的各个字段都初始化为0

pthread_mutex_destroy函数用于销毁互斥锁,以释放其占用的内核资源。

  • 销毁一个已经加锁的互斥锁将导致不可预期的后果。

pthread_mutex_lock函数以原子操作的方式给一个互斥锁加锁。

  • 如果目标互斥锁已经被锁上,则pthread_mutex_lock调用将阻塞,直到该互斥锁的占有者将其解锁。

pthread_mutex_trylock与pthread_mutex_lock函数类似,

  • 不过它始终立即返回,而不论被操作的互斥锁是否已经被加锁,相当于pthread_mutex_lock的非阻塞版本。
  • 当目标互斥锁未被加锁时,pthread_mutex_trylock对互斥锁执行加锁操作。
  • 当互斥锁已经被加锁时,pthread_mutex_trylock将返回错误码EBUSY。
  • 需要注意的是,这里讨论的pthread_mutex_lock和pthread_mutex_trylock的行为是针对普通锁而言的。后面我们将看到,对于其他类型的锁而言,这两个加锁函数会有不同的行为。

pthread_mutex_unlock函数以原子操作的方式给一个互斥锁解锁。

  • 如果此时有其他线程正在等待这个互斥锁,则这些线程中的某一个将获得它。

上面这些函数成功时返回0,失败则返回错误码。

5.2 互斥锁属性

pthread_mutexattr_t结构体定义了一套完整的互斥锁属性。线程库提供了一系列函数来操作pthread_mutexattr_t类型的变量,以方便我们获取和设置互斥锁属性。这里我们列出其中一些主要的函数:

1
2
3
4
5
6
7
8
9
10
11
#include<pthread.h>
/*初始化互斥锁属性对象*/
int pthread_mutexattr_init(pthread_mutexattr_t*attr);
/*销毁互斥锁属性对象*/
int pthread_mutexattr_destroy(pthread_mutexattr_t*attr);
/*获取和设置互斥锁的pshared属性*/
int pthread_mutexattr_getpshared(const pthread_mutexattr_t*attr,int*pshared);
int pthread_mutexattr_setpshared(pthread_mutexattr_t*attr,int pshared);
/*获取和设置互斥锁的type属性*/
int pthread_mutexattr_gettype(const pthread_mutexattr_t*attr,int*type);
int pthread_mutexattr_settype(pthread_mutexattr_t*attr,int type);

本书只讨论互斥锁的两种常用属性:pshared和type。

互斥锁属性pshared指定是否允许跨进程共享互斥锁,其可选值有两个:

  • PTHREAD_PROCESS_SHARED。互斥锁可以被跨进程共享。
  • PTHREAD_PROCESS_PRIVATE。互斥锁只能被和锁的初始化线程隶属于同一个进程的线程共享。

互斥锁属性type指定互斥锁的类型。Linux支持如下4种类型的互斥锁:

  • PTHREAD_MUTEX_NORMAL,普通锁。
    • 这是互斥锁默认的类型。
    • 当一个线程对一个普通锁加锁以后,其余请求该锁的线程将形成一个等待队列,并在该锁解锁后按优先级获得它。
    • 这种锁类型保证了资源分配的公平性。
    • 但这种锁也很容易引发问题:一个线程如果对一个已经加锁的普通锁再次加锁,将引发死锁;对一个已经被其他线程加锁的普通锁解锁,或者对一个已经解锁的普通锁再次解锁,将导致不可预期的后果。
  • PTHREAD_MUTEX_ERRORCHECK,检错锁。
    • 一个线程如果对一个已经加锁的检错锁再次加锁,则加锁操作返回EDEADLK。
    • 对一个已经被其他线程加锁的检错锁解锁,或者对一个已经解锁的检错锁再次解锁,则解锁操作返回EPERM。
  • PTHREAD_MUTEX_RECURSIVE,嵌套锁。
    • 这种锁允许一个线程在释放锁之前多次对它加锁而不发生死锁。
    • 不过其他线程如果要获得这个锁,则当前锁的拥有者必须执行相应次数的解锁操作。
    • 对一个已经被其他线程加锁的嵌套锁解锁,或者对一个已经解锁的嵌套锁再次解锁,则解锁操作返回EPERM。
  • PTHREAD_MUTEX_DEFAULT,默认锁。
    • 一个线程如果对一个已经加锁的默认锁再次加锁,
    • 或者对一个已经被其他线程加锁的默认锁解锁,
    • 或者对一个已经解锁的默认锁再次解锁,将导致不可预期的后果。这种锁在实现的时候可能被映射为上面三种锁之一。

5.3 死锁举例

使用互斥锁的一个噩耗是死锁。

死锁使得一个或多个线程被挂起而无法继续执行,而且这种情况还不容易被发现。

前文提到,在一个线程中对一个已经加锁的普通锁再次加锁,将导致死锁。这种情况可能出现在设计得不够仔细的递归函数中。另外,如果两个线程按照不同的顺序来申请两个互斥锁,也容易产生死锁,如代码清单14-1所示。

代码清单14-1 按不同顺序访问互斥锁导致死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include<pthread.h>
#include<unistd.h>
#include<stdio.h>
int a=0;
int b=0;
pthread_mutex_t mutex_a;
pthread_mutex_t mutex_b;
void*another(void*arg) {
pthread_mutex_lock(&mutex_b);
printf("in child thread,got mutex b,waiting for mutex a\n");
sleep(5);
++b;
pthread_mutex_lock(&mutex_a);
b+=a++;
pthread_mutex_unlock(&mutex_a);
pthread_mutex_unlock(&mutex_b);
pthread_exit(NULL);
}
int main() {
pthread_t id;
pthread_mutex_init(&mutex_a,NULL);
pthread_mutex_init(&mutex_b,NULL);
pthread_create(&id,NULL,another,NULL);
pthread_mutex_lock(&mutex_a);
printf("in parent thread,got mutex a,waiting for mutex b\n");
sleep(5);
++a;
pthread_mutex_lock(&mutex_b);
a+=b++;
pthread_mutex_unlock(&mutex_b);
pthread_mutex_unlock(&mutex_a);
pthread_join(id,NULL);
pthread_mutex_destroy(&mutex_a);
pthread_mutex_destroy(&mutex_b);
return 0;
}

代码清单14-1中,

  • 主线程试图先占有互斥锁mutex_a,然后操作被该锁保护的变量a,
  • 但操作完毕之后,主线程并没有立即释放互斥锁mutex_a,而是又申请互斥锁mutex_b,
  • 并在两个互斥锁的保护下,操作变量a和b,最后才一起释放这两个互斥锁;
  • 与此同时,子线程则按照相反的顺序来申请互斥锁mutex_a和mutex_b,
  • 并在两个锁的保护下操作变量a和b。
  • sleep函数来模拟连续两次调用pthread_mutex_lock之间的时间差,以确保代码中的两个线程各自先占有一个互斥锁(主线程占有mutex_a,子线程占有mutex_b),然后等待另外一个互斥锁(主线程等待mutex_b,子线程等待mutex_a)。

这样,两个线程就僵持住了,谁都不能继续往下执行,从而形成死锁。如果代码中不加入sleep函数,则这段代码或许总能成功地运行,从而为程序留下了一个潜在的BUG。

6. 条件变量

如果说互斥锁是用于同步线程对共享数据的访问的话,那么条件变量提供了一种线程间的通知机制:当某个共享数据达到某个值的时候,唤醒等待这个共享数据的线程。

条件变量的相关函数主要有如下5个:

1
2
3
4
5
6
#include<pthread.h>
int pthread_cond_init(pthread_cond_t*cond,const pthread_condattr_t*cond_attr);
int pthread_cond_destroy(pthread_cond_t*cond);
int pthread_cond_broadcast(pthread_cond_t*cond);
int pthread_cond_signal(pthread_cond_t*cond);
int pthread_cond_wait(pthread_cond_t*cond,pthread_mutex_t*mutex);

这些函数的第一个参数cond指向要操作的目标条件变量,条件变量的类型是pthread_cond_t结构体。

pthread_cond_init函数用于初始化条件变量。

  • cond_attr参数指定条件变量的属性。如果将它设置为NULL,则表示使用默认属性。
  • 条件变量的属性不多,而且和互斥锁的属性类型相似,所以我们不再赘述。
  • 除了pthread_cond_init函数外,我们还可以使用如下方式来初始化一个条件变量:
1
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;

宏PTHREAD_COND_INITIALIZER实际上只是把条件变量的各个字段都初始化为0。

pthread_cond_destroy函数用于销毁条件变量,以释放其占用的内核资源。销毁一个正在被等待的条件变量将失败并返回EBUSY。

pthread_cond_broadcast函数以广播的方式唤醒所有等待目标条件变量的线程。

pthread_cond_signal函数用于唤醒一个等待目标条件变量的线程。

  • 至于哪个线程将被唤醒,则取决于线程的优先级和调度策略。
  • 有时候我们可能想唤醒一个指定的线程,但pthread没有对该需求提供解决方法。
  • 不过我们可以间接地实现该需求:定义一个能够唯一表示目标线程的全局变量,在唤醒等待条件变量的线程前先设置该变量为目标线程,然后采用广播方式唤醒所有等待条件变量的线程,这些线程被唤醒后都检查该变量以判断被唤醒的是否是自己,如果是就开始执行后续代码,如果不是则返回继续等待。

pthread_cond_wait函数用于等待目标条件变量。

  • mutex参数是用于保护条件变量的互斥锁,以确保pthread_cond_wait操作的原子性。
  • 在调用pthread_cond_wait前,必须确保互斥锁mutex已经加锁,否则将导致不可预期的结果。
  • pthread_cond_wait函数执行时,首先把调用线程放入条件变量的等待队列中,然后将互斥锁mutex解锁。
  • 可见,从pthread_cond_wait开始执行到其调用线程被放入条件变量的等待队列之间的这段时间内,pthread_cond_signal和pthread_cond_broadcast等函数不会修改条件变量。
  • 换言之,pthread_cond_wait函数不会错过目标条件变量的任何变化[7]。当pthread_cond_wait函数成功返回时,互斥锁mutex将再次被锁上。

上面这些函数成功时返回0,失败则返回错误码。

7. 线程同步机制包装类

为了充分复用代码,同时由于后文的需要,我们将前面讨论的3种线程同步机制分别封装成3个类,实现在locker.h文件中,如代码清单14-2所示。

代码清单14-2 locker.h文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#ifndef LOCKER_H
#define LOCKER_H
#include<exception>
#include<pthread.h>
#include<semaphore.h>

/*封装信号量的类*/
class sem {
public:
/*创建并初始化信号量*/
sem() {
if(sem_init(&m_sem,0,0)!=0) {
/*构造函数没有返回值,可以通过抛出异常来报告错误*/
throw std::exception();
}
}
/*销毁信号量*/
~sem() {
sem_destroy(&m_sem);
}
/*等待信号量*/
bool wait() {
return sem_wait(&m_sem)==0;
}
/*增加信号量*/
bool post() {
return sem_post(&m_sem)==0;
}
private:
sem_t m_sem;
};

/*封装互斥锁的类*/
class locker {
public:
/*创建并初始化互斥锁*/
locker() {
if(pthread_mutex_init(&m_mutex,NULL)!=0) {
throw std::exception();
}
}
/*销毁互斥锁*/
~locker() {
pthread_mutex_destroy(&m_mutex);
}
/*获取互斥锁*/
bool lock() {
return pthread_mutex_lock(&m_mutex)==0;
}
/*释放互斥锁*/
bool unlock() {
return pthread_mutex_unlock(&m_mutex)==0;
}
private:
pthread_mutex_t m_mutex;
};

/*封装条件变量的类*/
class cond {
public:
/*创建并初始化条件变量*/
cond() {
if(pthread_mutex_init(&m_mutex,NULL)!=0) {
throw std::exception();
}
if(pthread_cond_init(&m_cond,NULL)!=0) {
/*构造函数中一旦出现问题,就应该立即释放已经成功分配了的资源*/
pthread_mutex_destroy(&m_mutex);
throw std::exception();
}
}
/*销毁条件变量*/
~cond() {
pthread_mutex_destroy(&m_mutex);
pthread_cond_destroy(&m_cond);
}
/*等待条件变量*/
bool wait() {
int ret=0;
pthread_mutex_lock(&m_mutex);
ret=pthread_cond_wait(&m_cond,&m_mutex);
pthread_mutex_unlock(&m_mutex);
return ret==0;
}
/*唤醒等待条件变量的线程*/
bool signal() {
return pthread_cond_signal(&m_cond)==0;
}
private:
pthread_mutex_t m_mutex;
pthread_cond_t m_cond;
};
#endif

8. 多线程环境

8.1 可重入函数

如果一个函数能被多个线程同时调用且不发生竞态条件,则我们称它是线程安全的(thread safe),或者说它是可重入函数

Linux库函数只有一小部分是不可重入的,比如5.1.4小节讨论的inet_ntoa函数,以及5.12.2小节讨论的getservbyname和getservbyport函数。这些库函数之所以不可重入,主要是因为其内部使用了静态变量。

不过Linux对很多不可重入的库函数提供了对应的可重入版本,这些可重入版本的函数名是在原函数名尾部加上_r。比如,函数localtime对应的可重入函数是localtime_r。在多线程程序中调用库函数,一定要使用其可重入版本,否则可能导致预想不到的结果。

8.2 线程和进程

思考这样一个问题:如果一个多线程程序的某个线程调用了fork函数,那么新创建的子进程是否将自动创建和父进程相同数量的线程呢?

答案是“否”,正如我们期望的那样。子进程只拥有一个执行线程,它是调用fork的那个线程的完整复制

并且子进程将自动继承父进程中互斥锁(条件变量与之类似)的状态。

也就是说,父进程中已经被加锁的互斥锁在子进程中也是被锁住的。这就引起了一个问题:子进程可能不清楚从父进程继承而来的互斥锁的具体状态(是加锁状态还是解锁状态)。

这个互斥锁可能被加锁了,但并不是由调用fork函数的那个线程锁住的,而是由其他线程锁住的。

如果是这种情况,则子进程若再次对该互斥锁执行加锁操作将会导致死锁,如代码清单14-3所示。

代码清单14-3 在多线程程序中调用fork函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include<pthread.h>
#include<unistd.h>
#include<stdio.h>
#include<stdlib.h>
#include<wait.h>
pthread_mutex_t mutex;

/*子线程运行的函数。它首先获得互斥锁mutex,然后暂停5 s,再释放该互斥锁*/
void*another(void*arg) {
printf("in child thread,lock the mutex\n");
pthread_mutex_lock(&mutex);
sleep(5);
pthread_mutex_unlock(&mutex);
return NULL;
}

int main() {
pthread_t id;
pthread_mutex_init(&mutex,NULL);
pthread_create(&id,NULL,another,NULL);
/*父进程中的主线程暂停1 s,以确保在执行fork操作之前,子线程已经开始运行并获得了互斥变量mutex*/
sleep(1);
int pid=fork();
if(pid<0) {
pthread_join(id,NULL);
pthread_mutex_destroy(&mutex);
return 1;
} else if(pid==0) {
printf("I am in the child,want to get the lock\n");
/*子进程从父进程继承了互斥锁mutex的状态,该互斥锁处于锁住的状态,这是由父进程中的子线程执行pthread_mutex_lock引起的,因此,下面这句加锁操作会一直阻塞,尽管从逻辑上来说它是不应该阻塞的*/
pthread_mutex_lock(&mutex);
printf("I can not run to here,oop...\n");
pthread_mutex_unlock(&mutex);
exit(0);
} else {
wait(NULL);
}
pthread_join(id,NULL);
pthread_mutex_destroy(&mutex);
return 0;
}

不过,pthread提供了一个专门的函数pthread_atfork,以确保fork调用后父进程和子进程都拥有一个清楚的锁状态。该函数的定义如下:

1
2
#include<pthread.h>
int pthread_atfork(void(*prepare)(void),void(*parent)(void),void(*child)(void));

该函数将建立3个fork句柄来帮助我们清理互斥锁的状态。

  • prepare句柄将在fork调用创建出子进程之前被执行。它可以用来锁住所有父进程中的互斥锁。
  • parent句柄则是fork调用创建出子进程之后,而fork返回之前,在父进程中被执行。它的作用是释放所有在prepare句柄中被锁住的互斥锁。
  • child句柄是fork返回之前,在子进程中被执行。和parent句柄一样,child句柄也是用于释放所有在prepare句柄中被锁住的互斥锁。该函数成功时返回0,失败则返回错误码。

因此,如果要让代码清单14-3正常工作,就应该在其中的fork调用前加入代码清单14-4所示的代码。

1
2
3
4
5
6
7
void prepare() {
pthread_mutex_lock(&mutex);
}
void infork() {
pthread_mutex_unlock(&mutex);
}
pthread_atfork(prepare,infork,infork);

8.3 线程和信号

每个线程都可以独立地设置信号掩码。我们在10.3.2小节讨论过设置进程信号掩码的函数sigprocmask,但在多线程环境下我们应该使用如下所示的pthread版本的sigprocmask函数来设置线程信号掩码:

1
2
3
#include<pthread.h>
#include<signal.h>
int pthread_sigmask(int how,const sigset_t*newmask,sigset_t*oldmask);

该函数的参数的含义与sigprocmask的参数完全相同,因此不再赘述。pthread_sigmask成功时返回0,失败则返回错误码。

  • 由于进程中的所有线程共享该进程的信号,所以线程库将根据线程掩码决定把信号发送给哪个具体的线程
  • 因此,如果我们在每个子线程中都单独设置信号掩码,就很容易导致逻辑错误。
  • 此外,所有线程共享信号处理函数。
  • 也就是说,当我们在一个线程中设置了某个信号的信号处理函数后,它将覆盖其他线程为同一个信号设置的信号处理函数。
  • 这两点都说明,我们应该定义一个专门的线程来处理所有的信号。这可以通过如下两个步骤来实现:

1)在主线程创建出其他子线程之前就调用pthread_sigmask来设置好信号掩码,所有新创建的子线程都将自动继承这个信号掩码。这样做之后,实际上所有线程都不会响应被屏蔽的信号了。

2)在某个线程中调用如下函数来等待信号并处理之:

1
2
#include<signal.h>
int sigwait(const sigset_t*set,int*sig);
  • set参数指定需要等待的信号的集合。
    • 我们可以简单地将其指定为在第1步中创建的信号掩码,表示在该线程中等待所有被屏蔽的信号。
  • 参数sig指向的整数用于存储该函数返回的信号值。
  • sigwait成功时返回0,失败则返回错误码。
  • 一旦sigwait正确返回,我们就可以对接收到的信号做处理了。很显然,如果我们使用了sigwait,就不应该再为信号设置信号处理函数了。这是因为当程序接收到信号时,二者中只能有一个起作用。

代码清单14-5取自pthread_sigmask函数的man手册。它展示了如何通过上述两个步骤实现在一个线程中统一处理所有信号。

代码清单14-5 用一个线程处理所有信号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include<pthread.h>
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<signal.h>
#include<errno.h>

#define handle_error_en(en,msg)\ do{errno=en;perror(msg);exit(EXIT_FAILURE);}while(0)

static void*sig_thread(void*arg) {
sigset_t*set=(sigset_t*)arg;
int s,sig;
for(;;) {
/*第二个步骤,调用sigwait等待信号*/
s=sigwait(set,&sig);
if(s!=0) handle_error_en(s,"sigwait");
printf("Signal handling thread got signal%d\n",sig);
}
}

int main(int argc,char*argv[]) {
pthread_t thread;
sigset_t set;
int s;

/*第一个步骤,在主线程中设置信号掩码*/
sigemptyset(&set);
sigaddset(&set,SIGQUIT);
sigaddset(&set,SIGUSR1);

s=pthread_sigmask(SIG_BLOCK,&set,NULL);
if(s!=0) handle_error_en(s,"pthread_sigmask");
s=pthread_create(&thread,NULL,&sig_thread,(void*)&set);
if(s!=0) handle_error_en(s,"pthread_create");
pause();
}

最后,pthread还提供了下面的方法,使得我们可以明确地将一个信号发送给指定的线程:

1
2
#include<signal.h>
int pthread_kill(pthread_t thread,int sig);

其中,thread参数指定目标线程,sig参数指定待发送的信号。如果sig为0,则pthread_kill不发送信号,但它仍然会执行错误检查。我们可以利用这种方式来检测目标线程是否存在。pthread_kill成功时返回0,失败则返回错误码。


14. 多线程编程
http://binbo-zappy.github.io/2024/12/16/Linux高性能服务器编程-游双/14-多线程编程/
作者
Binbo
发布于
2024年12月16日
许可协议