13. 多进程编程

第13章 多进程编程

进程是Linux操作系统环境的基础,它控制着系统上几乎所有的活动。本章从系统程序员的角度来讨论Linux多进程编程,包括如下内容:

❑复制进程映像的fork系统调用和替换进程映像的exec系列系统调用。

❑僵尸进程以及如何避免僵尸进程。

❑进程间通信(Inter-Process Communication,IPC)最简单的方式:管道。

❑3种System V进程间通信方式:信号量、消息队列和共享内存。它们都是由AT&T System V2版本的UNIX引入的,所以统称为System V IPC。

❑在进程间传递文件描述符的通用方法:通过UNIX本地域socket传递特殊的辅助数据(关于辅助数据,参考5.8.3小节)。

1. fork系统调用

Linux下创建新进程的系统调用是fork。其定义如下:

1
2
3
#include<sys/types.h>
#include<unistd.h>
pid_t fork(void);
  • 该函数的每次调用都返回两次,在父进程中返回的是子进程的PID,在子进程中则返回0。
  • 该返回值是后续代码判断当前进程是父进程还是子进程的依据。fork调用失败时返回-1,并设置errno。

fork函数复制当前进程,在内核进程表中创建一个新的进程表项。

  • 新的进程表项有很多属性和原进程相同,比如堆指针、栈指针和标志寄存器的值。
  • 但也有许多属性被赋予了新的值,比如该进程的PPID被设置成原进程的PID,信号位图被清除(原进程设置的信号处理函数不再对新进程起作用)。

子进程的代码与父进程完全相同,同时它还会复制父进程的数据(堆数据、栈数据和静态数据)。

  • 数据的复制采用的是所谓的写时复制(copy on write),即只有在任一进程(父进程或子进程)对数据执行了写操作时,复制才会发生(先是缺页中断,然后操作系统给子进程分配内存并复制父进程的数据)。
  • 即便如此,如果我们在程序中分配了大量内存,那么使用fork时也应当十分谨慎,尽量避免没必要的内存分配和数据复制。

此外,创建子进程后,父进程中打开的文件描述符默认在子进程中也是打开的,且文件描述符的引用计数加1。不仅如此,父进程的用户根目录、当前工作目录等变量的引用计数均会加1。

2. exec系列系统调用

有时我们需要在子进程中执行其他程序,即替换当前进程映像,这就需要使用如下exec系列函数之一:

1
2
3
4
5
6
7
8
#include<unistd.h>
extern char**environ;
int execl(const char*path,const char*arg,...);
int execlp(const char*file,const char*arg,...);
int execle(const char*path,const char*arg,...,char*const envp[]);
int execv(const char*path,char*const argv[]);
int execvp(const char*file,char*const argv[]);
int execve(const char*path,char*const argv[],char*const envp[]);
  • path参数指定可执行文件的完整路径,
  • file参数可以接受文件名,该文件的具体位置则在环境变量PATH中搜寻。
  • arg接受可变参数,argv则接受参数数组,它们都会被传递给新程序(path或file指定的程序)的main函数。
  • envp参数用于设置新程序的环境变量。如果未设置它,则新程序将使用由全局变量environ指定的环境变量。

一般情况下,exec函数是不返回的,除非出错。它出错时返回-1,并设置errno。

  • 如果没出错,则原程序中exec调用之后的代码都不会执行,因为此时原程序已经被exec的参数指定的程序完全替换(包括代码和数据)。

  • exec函数不会关闭原程序打开的文件描述符,除非该文件描述符被设置了类似SOCK_CLOEXEC的属性(见5.2节)。

3. 处理僵尸进程

对于多进程程序而言,父进程一般需要跟踪子进程的退出状态

因此,当子进程结束运行时,内核不会立即释放该进程的进程表项,以满足父进程后续对该子进程退出信息的查询(如果父进程还在运行)。

在子进程结束运行之后,父进程读取其退出状态之前,我们称该子进程处于僵尸态。另外一种使子进程进入僵尸态的情况是:父进程结束或者异常终止,而子进程继续运行。此时子进程的PPID将被操作系统设置为1,即init进程。init进程接管了该子进程,并等待它结束。在父进程退出之后,子进程退出之前,该子进程处于僵尸态。

由此可见,无论哪种情况,如果父进程没有正确地处理子进程的返回信息,子进程都将停留在僵尸态,并占据着内核资源。这是绝对不能容许的,毕竟内核资源有限。

下面这对函数在父进程中调用,以等待子进程的结束,并获取子进程的返回信息,从而避免了僵尸进程的产生,或者使子进程的僵尸态立即结束:

1
2
3
4
#include<sys/types.h>
#include<sys/wait.h>
pid_t wait(int*stat_loc);
pid_t waitpid(pid_t pid,int*stat_loc,int options);
  • wait函数将阻塞进程,直到该进程的某个子进程结束运行为止。
  • 它返回结束运行的子进程的PID,并将该子进程的退出状态信息存储于stat_loc参数指向的内存中。sys/wait.h头文件中定义了几个宏来帮助解释子进程的退出状态信息,如表13-1所示。

wait函数的阻塞特性显然不是服务器程序期望的,而waitpid函数解决了这个问题。

  • waitpid只等待由pid参数指定的子进程。
  • 如果pid取值为-1,那么它就和wait函数相同,即等待任意一个子进程结束。
  • stat_loc参数的含义和wait函数的stat_loc参数相同。
  • options参数可以控制waitpid函数的行为。
    • 该参数最常用的取值是WNOHANG。
    • 当options的取值是WNOHANG时,waitpid调用将是非阻塞的:
    • 如果pid指定的目标子进程还没有结束或意外终止,则waitpid立即返回0;
    • 如果目标子进程确实正常退出了,则waitpid返回该子进程的PID。
    • waitpid调用失败时返回-1并设置errno。

8.3节曾提到,要在事件已经发生的情况下执行非阻塞调用才能提高程序的效率。

对waitpid函数而言,我们最好在某个子进程退出之后再调用它。那么父进程从何得知某个子进程已经退出了呢?这正是SIGCHLD信号的用途。

当一个进程结束时,它将给其父进程发送一个SIGCHLD信号。因此,我们可以在父进程中捕获SIGCHLD信号,并在信号处理函数中调用waitpid函数以“彻底结束”一个子进程,如代码清单13-1所示。

代码清单13-1 SIGCHLD信号的典型处理函数

1
2
3
4
5
6
7
static void handle_child(int sig) {
pid_t pid;
int stat;
while((pid=waitpid(-1,&stat,WNOHANG))>0) {
/*对结束的子进程进行善后处理*/
}
}

4. 管道

管道能在父、子进程间传递数据,利用的是fork调用之后两个管道文件描述符(fd[0]和fd[1])都保持打开。一对这样的文件描述符只能保证父、子进程间一个方向的数据传输,父进程和子进程必须有一个关闭fd[0],另一个关闭fd[1]。比如,我们要使用管道实现从父进程向子进程写数据,就应该按照图13-1所示来操作。

  • 显然,如果要实现父、子进程之间的双向数据传输,就必须使用两个管道。
  • 第6章中我们还介绍过,socket编程接口提供了一个创建全双工管道的系统调用:socketpair。
  • squid服务器程序(见第4章)就是利用socketpair创建管道,以实现在父进程和日志服务子进程之间传递日志信息,下面我们简单地分析之。在测试机器Kongming20上有如下环境:
1
2
3
4
5
6
7
8
9
10
11
$ps -ef|grep squid
root 12489 1 0 20:37? 00:00:00 squid
squid 12491 12489 0 20:37? 00:00:02(squid-1) squid
12492 12491 0 20:37? 00:00:00(logfile daemon)/var/log/squid/access.log
squid 12493 12491 0 20:37? 00:00:00(unlinkd) $sudo lsof -p 12491
squid 12491 squid 9u unix 0xeaf2b440 0t0 40603 socket
$sudo lsof -p 12492
log_file_12492 squid 0u unix 0xeaf2b680 0t0 40604 socket
log_file_12492 squid 1u unix 0xeaf2b680 0t0 40604 socket
log_file_12492 squid 2u CHR 1,3 0t0 4449/dev/null
log_file_12492 squid 3w REG 8,3 202271412/var/log/squid/access.log

这些输出说明Kongming20上开启了squid服务。

  • 该服务创建了几个子进程,其中子进程12492专门用于输出日志到/var/log/squid/access.log文件。
  • 父进程12491使用socketpair创建了一对UNIX域socket,然后关闭了其中的一个,剩下的那个socket的值是9。
  • 子进程12492则从父进程12491继承了这一对UNIX域socket,并关闭了其中的另外一个,剩下的那个socket则被dup到标准输入和标准输出上。
  • 下面我们telnet到squid服务上,并向它发送部分数据。
  • 同时开启另外两个终端,分别运行strace命令以查看进程12491和12492在这个过程中交换的数据。具体操作如代码清单13-2所示。

代码清单13-2 用strace命令查看管道通信

1
2
3
4
5
6
7
8
9
10
$telnet 192.168.1.109 squid
Trying 192.168.1.109...
Connected to 192.168.1.109.
Escape character is '^]'.
a(回车)
$sudo strace -p 12491
write(9,"L1338385956.213 40 192.168.1"...,104)=104
$sudo strace -p 12492
read(0,"L1338385956.213 40 192.168.1"...,4096)=104
write(3,"1338385956.213 40 192.168.1."...,101)=101

由此可见,进程12491接收到客户数据后将日志信息输出至管道(写文件描述符9)。日志服务子进程使用阻塞读操作等待管道上有数据可读(读文件描述符0),然后将读取到的日志信息写入/var/log/squid/access.log文件(写文件描述符3)。

不过,管道只能用于有关联的两个进程(比如父、子进程)间的通信。

而下面要讨论的3种System V IPC能用于无关联的多个进程之间的通信,因为它们都使用一个全局唯一的键值来标识一条信道。

不过,有一种特殊的管道称为FIFO[1](First In First Out,先进先出),也叫命名管道。它也能用于无关联进程之间的通信。因为FIFO管道在网络编程中使用不多,所以本书不讨论它。

[1]这里要注意一下,虽然这种特殊的管道被专门命名为FIFO,但并不是只有这种管道才遵循先进先出的原则,其实所有的管道都遵循先进先出的原则。

5. 信号量

5.1 信号量原语

当多个进程同时访问系统上的某个资源的时候,比如同时写一个数据库的某条记录,或者同时修改某个文件,就需要考虑进程的同步问题,以确保任一时刻只有一个进程可以拥有对资源的独占式访问

通常,程序对共享资源的访问的代码只是很短的一段,但就是这一段代码引发了进程之间的竞态条件。我们称这段代码为关键代码段,或者临界区。对进程同步,也就是确保任一时刻只有一个进程能进入关键代码段。

要编写具有通用目的的代码,以确保关键代码段的独占式访问是非常困难的。有两个名为Dekker算法和Peterson算法的解决方案,它们试图从语言本身(不需要内核支持)解决并发问题。但它们依赖于忙等待,即进程要持续不断地等待某个内存位置状态的改变。这种方式下CPU利用率太低,显然是不可取的。

Dijkstra提出的信号量(Semaphore)概念是并发编程领域迈出的重要一步。

  • 信号量是一种特殊的变量,它只能取自然数值并且只支持两种操作:等待(wait)和信号(signal)。
  • 不过在Linux/UNIX中,“等待”和“信号”都已经具有特殊的含义,所以对信号量的这两种操作更常用的称呼是P、V操作。
  • 这两个字母来自于荷兰语单词passeren(传递,就好像进入临界区)和vrijgeven(释放,就好像退出临界区)。
  • 假设有信号量SV,则对它的P、V操作含义如下:

❑P(SV),如果SV的值大于0,就将它减1;如果SV的值为0,则挂起进程的执行。

❑V(SV),如果有其他进程因为等待SV而挂起,则唤醒之;如果没有,则将SV加1。

信号量的取值可以是任何自然数。但最常用的、最简单的信号量是二进制信号量,它只能取0和1这两个值。使用二进制信号量同步两个进程,以确保关键代码段的独占式访问的一个典型例子如图13-2所示。

图13-2 使用信号量保护关键代码段
  • 当关键代码段可用时,二进制信号量SV的值为1,进程A和B都有机会进入关键代码段。
  • 如果此时进程A执行了P(SV)操作将SV减1,则进程B若再执行P(SV)操作就会被挂起。直到进程A离开关键代码段,并执行V(SV)操作将SV加1,关键代码段才重新变得可用。
  • 如果此时进程B因为等待SV而处于挂起状态,则它将被唤醒,并进入关键代码段。同样,这时进程A如果再执行P(SV)操作,则也只能被操作系统挂起以等待进程B退出关键代码段。

注意 使用一个普通变量来模拟二进制信号量是行不通的,因为所有高级语言都没有一个原子操作可以同时完成如下两步操作:检测变量是否为true/false,如果是,则再将它设置为false/true。

Linux信号量的API都定义在sys/sem.h头文件中,主要包含3个系统调用:semget、semop和semctl。它们都被设计为操作一组信号量,即信号量集,而不是单个信号量,因此这些接口看上去多少比我们期望的要复杂一点。

5.2 semget系统调用

semget系统调用创建一个新的信号量集,或者获取一个已经存在的信号量集。其定义如下:

1
2
#include<sys/sem.h>
int semget(key_t key,int num_sems,int sem_flags);
  • key参数是一个键值,用来标识一个全局唯一的信号量集,就像文件名全局唯一地标识一个文件一样。要通过信号量通信的进程需要使用相同的键值来创建/获取该信号量。

  • num_sems参数指定要创建/获取的信号量集中信号量的数目。如果是创建信号量,则该值必须被指定;如果是获取已经存在的信号量,则可以把它设置为0。

  • sem_flags参数指定一组标志。

    • 它低端的9个比特是该信号量的权限,其格式和含义都与系统调用open的mode参数相同。
    • 此外,它还可以和IPC_CREAT标志做按位“或”运算以创建新的信号量集。
    • 此时即使信号量已经存在,semget也不会产生错误。
    • 我们还可以联合使用IPC_CREAT和IPC_EXCL标志来确保创建一组新的、唯一的信号量集。
    • 在这种情况下,如果信号量集已经存在,则semget返回错误并设置errno为EEXIST。
    • 这种创建信号量的行为与用O_CREAT和O_EXCL标志调用open来排他式地打开一个文件相似。
  • semget成功时返回一个正整数值,它是信号量集的标识符;semget失败时返回-1,并设置errno。

如果semget用于创建信号量集,则与之关联的内核数据结构体semid_ds将被创建并初始化。

semid_ds结构体的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include<sys/sem.h>
/*该结构体用于描述IPC对象(信号量、共享内存和消息队列)的权限*/
struct ipc_perm {
key_t key;/*键值*/
uid_t uid;/*所有者的有效用户ID*/
gid_t gid;/*所有者的有效组ID*/
uid_t cuid;/*创建者的有效用户ID*/
gid_t cgid;/*创建者的有效组ID*/
mode_t mode;/*访问权限*/
/*省略其他填充字段*/
}

struct semid_ds {
struct ipc_perm sem_perm;/*信号量的操作权限*/
unsigned long int sem_nsems;/*该信号量集中的信号量数目*/
time_t sem_otime;/*最后一次调用semop的时间*/
time_t sem_ctime;/*最后一次调用semctl的时间*/
/*省略其他填充字段*/
};

semget对semid_ds结构体的初始化包括:

❑将sem_perm.cuid和sem_perm.uid设置为调用进程的有效用户ID。

❑将sem_perm.cgid和sem_perm.gid设置为调用进程的有效组ID。

❑将sem_perm.mode的最低9位设置为sem_flags参数的最低9位。

❑将sem_nsems设置为num_sems。

❑将sem_otime设置为0。

❑将sem_ctime设置为当前的系统时间。

5.3 semop系统调用

semop系统调用改变信号量的值,即执行P、V操作。在讨论semop之前,我们需要先介绍与每个信号量关联的一些重要的内核变量:

1
2
3
4
unsigned short semval; /*信号量的值*/
unsigned short semzcnt; /*等待信号量值变为0的进程数量*/
unsigned short semncnt; /*等待信号量值增加的进程数量*/
pid_t sempid; /*最后一次执行semop操作的进程ID*/

semop对信号量的操作实际上就是对这些内核变量的操作。semop的定义如下:

1
2
#include<sys/sem.h>
int semop(int sem_id,struct sembuf*sem_ops,size_t num_sem_ops);
  • sem_id参数是由semget调用返回的信号量集标识符,用以指定被操作的目标信号量集。
  • sem_ops参数指向一个sembuf结构体类型的数组,sembuf结构体的定义如下:
1
2
3
4
5
struct sembuf{
unsigned short int sem_num; /*信号量集中信号量的编号,0表示信号量集中的第一个信号量*/
short int sem_op; /*指定操作类型,其可选值为正整数、0和负整数。每种类型的操作的行为又受到sem_flg成员的影响*/
short int sem_flg; /*可选值是IPC_NOWAIT和SEM_UNDO。IPC_NOWAIT的含义是,无论信号量操作是否成功,semop调用都将立即返回,这类似于非阻塞I/O操作。SEM_UNDO的含义是,当进程退出时取消正在进行的semop操作*/
}
  • 如果sem_op大于0,则semop将被操作的信号量的值semval增加sem_op。
    • 该操作要求调用进程对被操作信号量集拥有写权限。
    • 此时若设置了SEM_UNDO标志,则系统将更新进程的semadj变量(用以跟踪进程对信号量的修改情况)。
  • 如果sem_op等于0,则表示这是一个“等待0”(wait-for-zero)操作。
    • 该操作要求调用进程对被操作信号量集拥有读权限。
    • 如果此时信号量的值是0,则调用立即成功返回。
    • 如果信号量的值不是0,则semop失败返回或者阻塞进程以等待信号量变为0。
    • 在这种情况下,当IPC_NOWAIT标志被指定时,semop立即返回一个错误,并设置errno为EAGAIN。
    • 如果未指定IPC_NOWAIT标志,则信号量的semzcnt值加1,进程被投入睡眠直到下列3个条件之一发生:
      • 信号量的值semval变为0,此时系统将该信号量的semzcnt值减1;
      • 被操作信号量所在的信号量集被进程移除,此时semop调用失败返回,errno被设置为EIDRM;
      • 调用被信号中断,此时semop调用失败返回,errno被设置为EINTR,同时系统将该信号量的semzcnt值减1。
  • 如果sem_op小于0,则表示对信号量值进行减操作,即期望获得信号量。
    • 该操作要求调用进程对被操作信号量集拥有写权限。
    • 如果信号量的值semval大于或等于sem_op的绝对值,则semop操作成功,调用进程立即获得信号量,并且系统将该信号量的semval值减去sem_op的绝对值。
    • 此时如果设置了SEM_UNDO标志,则系统将更新进程的semadj变量。
    • 如果信号量的值semval小于sem_op的绝对值,则semop失败返回或者阻塞进程以等待信号量可用。
    • 在这种情况下,当IPC_NOWAIT标志被指定时,semop立即返回一个错误,并设置errno为EAGAIN。
    • 如果未指定IPC_NOWAIT标志,则信号量的semncnt值加1,进程被投入睡眠直到下列3个条件之一发生:
      • 信号量的值semval变得大于或等于sem_op的绝对值,此时系统将该信号量的semncnt值减1,并semval减去sem_op的绝对值,同时如果SEM_UNDO标志被设置,则系统更新semadj变量;
      • 被操作信号量所在的信号量集被进程移除,此时semop调用失败返回,errno被设置为EIDRM;
      • 调用被信号中断,此时semop调用失败返回,errno被设置为EINTR,同时系统将该信号量的semncnt值减1。
  • semop系统调用的第3个参数num_sem_ops指定要执行的操作个数,即sem_ops数组中元素的个数。
    • semop对数组sem_ops中的每个成员按照数组顺序依次执行操作,并且该过程是原子操作,以避免别的进程在同一时刻按照不同的顺序对该信号集中的信号量执行semop操作导致的竞态条件。
  • semop成功时返回0,失败则返回-1并设置errno。失败的时候,sem_ops数组中指定的所有操作都不被执行。

5.4 semctl系统调用

semctl系统调用允许调用者对信号量进行直接控制。其定义如下:

1
2
#include<sys/sem.h>
int semctl(int sem_id,int sem_num,int command,...);
  • sem_id参数是由semget调用返回的信号量集标识符,用以指定被操作的信号量集。
  • sem_num参数指定被操作的信号量在信号量集中的编号。
  • command参数指定要执行的命令。
  • 有的命令需要调用者传递第4个参数。第4个参数的类型由用户自己定义,但sys/sem.h头文件给出了它的推荐格式,具体如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
union semun {
int val; /*用于SETVAL命令*/
struct semid_ds*buf; /*用于IPC_STAT和IPC_SET命令*/
unsigned short*array; /*用于GETALL和SETALL命令*/
struct seminfo*__buf; /*用于IPC_INFO命令*/
};

struct seminfo {
内核没有使用
int semmap; /*Linux内核没有使用*/
int semmni; /*系统最多可以拥有的信号量集数目*/
int semmns; /*系统最多可以拥有的信号量数目*/
int semmnu; /*Linux内核没有使用*/
int semmsl; /*一个信号量集最多允许包含的信号量数目*/
int semopm; /*semop一次最多能执行的sem_op操作数目*/
int semume; /*Linux内核没有使用*/
int semusz; /*sem_undo结构体的大小*/
int semvmx; /*最大允许的信号量值*/
/*最多允许的UNDO次数(带SEM_UNDO标志的semop操作的次数)*/
int semaem;
};

semctl支持的所有命令如表13-2所示。

注意 这些操作中,GETNCNT、GETPID、GETVAL、GETZCNT和SETVAL操作的是单个信号量,它是由标识符sem_id指定的信号量集中的第sem_num个信号量;而其他操作针对的是整个信号量集,此时semctl的参数sem_num被忽略。

semctl成功时的返回值取决于command参数,如表13-2所示。semctl失败时返回-1,并设置errno。

5.5 特殊键值IPC_PRIVATE

  • semget的调用者可以给其key参数传递一个特殊的键值IPC_PRIVATE(其值为0),这样无论该信号量是否已经存在,semget都将创建一个新的信号量。
  • 使用该键值创建的信号量并非像它的名字声称的那样是进程私有的。
  • 其他进程,尤其是子进程,也有方法来访问这个信号量。所以semget的man手册的BUGS部分上说,使用名字IPC_PRIVATE有些误导(历史原因),应该称为IPC_NEW。

比如下面的代码清单13-3就在父、子进程间使用一个IPC_PRIVATE信号量来同步。

代码清单13-3 使用IPC_PRIVATE信号量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <sys/sem.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>

union semun {
int val;
struct semid_ds *buf;
unsigned short int* array;
struct seminfo* __buf;
};

// op为-1时执行P操作,op为1时执行V操作
void pv(int sem_id, int op) {
struct sembuf sem_b;
sem_b.sem_num = 0;
sem_b.sem_op = op;
sem_b.sem_flg = SEM_UNDO;
semop(sem_id, &sem_b, 1);
}

int main(int argc, char* argv[]) {
int sem_id = semget(IPC_PRIVATE, 1, 0666);
union semun sem_un;
sem_un.val = 1;
semctl(sem_id, 0, SETVAL, sem_un);

pid_t id = fork();
if(id < 0) {
return 1;
} else if(id == 0) {
printf("child try to get binary sem\n");
pv(sem_id, -1);
printf("child get the sem and would release \n");
sleep(5);
pv(sem_id, 1);
exit(0);
} else {
printf("parent try to get binary sem\n");
pv(sem_id, -1);
printf("parent get the sem and would release \n");
sleep(5);
pv(sem_id, 1);
}
waitpid(id, NULL, 0);
semctl(sem_id, 0, IPC_RMID, sem_un);
return 0;
}

另外一个例子是:工作在prefork模式下的httpd网页服务器程序使用1个IPC_PRIVATE信号量来同步各子进程对epoll_wait的调用权。下面我们简单分析一下这个例子。在测试机器Kongming20上,使用strace命令依次查看httpd的各子进程是如何协调工作的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
$ps -ef|grep httpd
root 1701 1 0 09:17? 00:00:00/usr/sbin/httpd-k start
apache 1703 1701 0 09:17? 00:00:00/usr/sbin/httpd-k start
apache 1704 1701 0 09:17? 00:00:00/usr/sbin/httpd-k start
apache 1705 1701 0 09:17? 00:00:00/usr/sbin/httpd-k start
apache 1706 1701 0 09:17? 00:00:00/usr/sbin/httpd-k start
apache 1707 1701 0 09:17? 00:00:00/usr/sbin/httpd-k start
apache 1708 1701 0 09:17? 00:00:00/usr/sbin/httpd-k start
apache 1709 1701 0 09:17? 00:00:00/usr/sbin/httpd-k start
apache 1710 1701 0 09:17? 00:00:00/usr/sbin/httpd-k start
$sudo strace -p 1703
semop(393222,{{0,-1,SEM_UNDO}},1)
$sudo strace -p 1704
semop(393222,{{0,-1,SEM_UNDO}},1)
……
$sudo strace -p 1709
epoll_wait(14,{},2,10000)=0
$sudo strace -p 1710
semop(393222,{{0,-1,SEM_UNDO}},1)

由此可见,httpd的子进程1703~1708和1710都在等待信号量393222(这是一个标识符)。

只有进程1709暂时拥有该信号量,因为进程1709调用epoll_wait以等待新的客户连接。

当有新连接到来时,进程1709将接受之,并对信号量393222执行V操作,此时将有另外一个子进程获得该信号量并调用epoll_wait来等待新的客户连接。

那么我们如何知道信号量393222是使用键值IPC_PRIVATE创建的呢?答案将在13.8节揭晓。

下面要讨论另外两种IPC——共享内存和消息队列。这两种IPC在创建资源的时候也支持IPC_PRIVATE键值,其含义与信号量的IPC_PRIVATE键值完全相同。

6. 共享内存

共享内存是最高效的IPC机制,因为它不涉及进程之间的任何数据传输。这种高效率带来的问题是,我们必须用其他辅助手段来同步进程对共享内存的访问,否则会产生竞态条件。因此,共享内存通常和其他进程间通信方式一起使用。

Linux共享内存的API都定义在sys/shm.h头文件中,包括4个系统调用:shmget、shmat、shmdt和shmctl。我们将依次讨论之。

6.1 shmget系统调用

shmget系统调用创建一段新的共享内存,或者获取一段已经存在的共享内存。其定义如下:

1
2
#include<sys/shm.h>
int shmget(key_t key,size_t size,int shmflg);
  • 和semget系统调用一样,key参数是一个键值,用来标识一段全局唯一的共享内存。
  • size参数指定共享内存的大小,单位是字节。
  • 如果是创建新的共享内存,则size值必须被指定。
  • 如果是获取已经存在的共享内存,则可以把size设置为0。

shmflg参数的使用和含义与semget系统调用的sem_flags参数相同。不过shmget支持两个额外的标志——SHM_HUGETLB和SHM_NORESERVE。它们的含义如下:

❑SHM_HUGETLB,类似于mmap的MAP_HUGETLB标志,系统将使用“大页面”来为共享内存分配空间。

❑SHM_NORESERVE,类似于mmap的MAP_NORESERVE标志,不为共享内存保留交换分区(swap空间)。这样,当物理内存不足的时候,对该共享内存执行写操作将触发SIGSEGV信号。

shmget成功时返回一个正整数值,它是共享内存的标识符。shmget失败时返回-1,并设置errno。

如果shmget用于创建共享内存,则这段共享内存的所有字节都被初始化为0,与之关联的内核数据结构shmid_ds将被创建并初始化。shmid_ds结构体的定义如下:

1
2
3
4
5
6
7
8
9
10
11
struct shmid_ds {
struct ipc_perm shm_perm;/*共享内存的操作权限*/
size_t shm_segsz;/*共享内存大小,单位是字节*/
__time_t shm_atime;/*对这段内存最后一次调用shmat的时间*/
__time_t shm_dtime;/*对这段内存最后一次调用shmdt的时间*/
__time_t shm_ctime;/*对这段内存最后一次调用shmctl的时间*/
__pid_t shm_cpid;/*创建者的PID*/
__pid_t shm_lpid;/*最后一次执行shmat或shmdt操作的进程的PID*/
shmatt_t shm_nattach;/*目前关联到此共享内存的进程数量*/
/*省略一些填充字段*/
};

shmget对shmid_ds结构体的初始化包括:

❑将shm_perm.cuid和shm_perm.uid设置为调用进程的有效用户ID。

❑将shm_perm.cgid和shm_perm.gid设置为调用进程的有效组ID。

❑将shm_perm.mode的最低9位设置为shmflg参数的最低9位。

❑将shm_segsz设置为size。

❑将shm_lpid、shm_nattach、shm_atime、shm_dtime设置为0。

❑将shm_ctime设置为当前的时间。

6.2 shmat和shmdt系统调用

共享内存被创建/获取之后,我们不能立即访问它,而是需要先将它关联到进程的地址空间中。使用完共享内存之后,我们也需要将它从进程地址空间中分离。这两项任务分别由如下两个系统调用实现:

1
2
3
#include<sys/shm.h>
void*shmat(int shm_id,const void*shm_addr,int shmflg);
int shmdt(const void*shm_addr);
  • 其中,shm_id参数是由shmget调用返回的共享内存标识符。
  • shm_addr参数指定将共享内存关联到进程的哪块地址空间,最终的效果还受到shmflg参数的可选标志SHM_RND的影响:

❑如果shm_addr为NULL,则被关联的地址由操作系统选择。这是推荐的做法,以确保代码的可移植性。

❑如果shm_addr非空,并且SHM_RND标志未被设置,则共享内存被关联到addr指定的地址处。

❑如果shm_addr非空,并且设置了SHM_RND标志,

  • 则被关联的地址是[shm_addr-(shm_addr%SHMLBA)]。
  • SHMLBA的含义是“段低端边界地址倍数”(Segment Low Boundary Address Multiple),它必须是内存页面大小(PAGE_SIZE)的整数倍。
  • 现在的Linux内核中,它等于一个内存页大小。SHM_RND的含义是圆整(round),即将共享内存被关联的地址向下圆整到离shm_addr最近的SHMLBA的整数倍地址处。

除了SHM_RND标志外,shmflg参数还支持如下标志:

❑SHM_RDONLY。进程仅能读取共享内存中的内容。若没有指定该标志,则进程可同时对共享内存进行读写操作(当然,这需要在创建共享内存的时候指定其读写权限)。

❑SHM_REMAP。如果地址shmaddr已经被关联到一段共享内存上,则重新关联。

❑SHM_EXEC。它指定对共享内存段的执行权限。对共享内存而言,执行权限实际上和读权限是一样的。

shmat成功时返回共享内存被关联到的地址,失败则返回(void*)-1并设置errno。shmat成功时,将修改内核数据结构shmid_ds的部分字段,如下:

❑将shm_nattach加1。

❑将shm_lpid设置为调用进程的PID。

❑将shm_atime设置为当前的时间。

shmdt函数将关联到shm_addr处的共享内存从进程中分离。它成功时返回0,失败则返回-1并设置errno。shmdt在成功调用时将修改内核数据结构shmid_ds的部分字段,如下:

❑将shm_nattach减1。

❑将shm_lpid设置为调用进程的PID。

❑将shm_dtime设置为当前的时间。

6.3 shmctl系统调用

shmctl系统调用控制共享内存的某些属性。其定义如下:

1
2
#include<sys/shm.h>
int shmctl(int shm_id,int command,struct shmid_ds*buf);

其中,shm_id参数是由shmget调用返回的共享内存标识符。command参数指定要执行的命令。

shmctl成功时的返回值取决于command参数,如表13-3所示。shmctl失败时返回-1,并设置errno。

6.4 共享内存的POSIX方法

  • 6.5节中我们介绍过mmap函数。利用它的MAP_ANONYMOUS标志我们可以实现父、子进程之间的匿名内存共享。
  • 通过打开同一个文件,mmap也可以实现无关进程之间的内存共享。
  • Linux提供了另外一种利用mmap在无关进程之间共享内存的方式。这种方式无须任何文件的支持,但它需要先使用如下函数来创建或打开一个POSIX共享内存对象:
1
2
3
4
#include<sys/mman.h>
#include<sys/stat.h>
#include<fcntl.h>
int shm_open(const char*name,int oflag,mode_t mode);

shm_open的使用方法与open系统调用完全相同。

  • name参数指定要创建/打开的共享内存对象。
  • 从可移植性的角度考虑,该参数应该使用“/somename”的格式:以“/”开始,后接多个字符,且这些字符都不是“/”;
  • 以“\0”结尾,长度不超过NAME_MAX(通常是255)。

oflag参数指定创建方式。它可以是下列标志中的一个或者多个的按位或:

❑O_RDONLY。以只读方式打开共享内存对象。

❑O_RDWR。以可读、可写方式打开共享内存对象。

❑O_CREAT。如果共享内存对象不存在,则创建之。此时mode参数的最低9位将指定该共享内存对象的访问权限。共享内存对象被创建的时候,其初始长度为0。

❑O_EXCL。和O_CREAT一起使用,如果由name指定的共享内存对象已经存在,则shm_open调用返回错误,否则就创建一个新的共享内存对象。

❑O_TRUNC。如果共享内存对象已经存在,则把它截断,使其长度为0。

shm_open调用成功时返回一个文件描述符。该文件描述符可用于后续的mmap调用,从而将共享内存关联到调用进程。shm_open失败时返回-1,并设置errno。

和打开的文件最后需要关闭一样,由shm_open创建的共享内存对象使用完之后也需要被删除。这个过程是通过如下函数实现的:

1
2
3
4
#include<sys/mman.h>
#include<sys/stat.h>
#include<fcntl.h>
int shm_unlink(const char*name);

该函数将name参数指定的共享内存对象标记为等待删除。当所有使用该共享内存对象的进程都使用ummap将它从进程中分离之后,系统将销毁这个共享内存对象所占据的资源。

如果代码中使用了上述POSIX共享内存函数,则编译的时候需要指定链接选项-lrt。

6.5 共享内存实例

在9.6.2小节中,我们介绍过一个聊天室服务器程序。下面我们将它修改为一个多进程服务器:一个子进程处理一个客户连接。同时,我们将所有客户socket连接的读缓冲设计为一块共享内存,如代码清单13-4所示。

代码清单13-4 使用共享内存的聊天室服务器程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>

#define USER_LIMIT 5
#define BUFFER_SIZE 1024
#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define PROCESS_LIMIT 65536
// 处理一个客户连接必要的数据
struct client_data
{
// 客户端的socket地址
sockaddr_in address;
// socket文件描述符
int connfd;
// 处理这个连接的子进程的PID
pid_t pid;
// 和父进程通信用的管道
int pipefd[2];
};
// 定义共享内存的名称
static const char* shm_name = "/my_shm";
// 信号管道文件描述符
int sig_pipefd[2];
// epoll文件描述符
int epollfd;
// 监听socket文件描述符
int listenfd;
// 共享内存文件描述符
int shmfd;
// 客户端连接数组,用于存储每个客户端的连接数据
char* share_mem = 0;
// 客户连接数组。进程用客户连接的编号来索引这个数组,即可取得相关的客户连接数据
client_data* users = 0;
// 子进程和客户连接的映射关系表。用进程的PID来索引这个数组,
// 即可取得该进程所处理的客户连接的编号
int* sub_process = 0;
// 当前客户数量
int user_count = 0;
// 控制子进程是否停止的标志
bool stop_child = false;

// 设置非阻塞模式的函数
int setnonblocking( int fd )
{
// 获取当前文件描述符的选项
int old_option = fcntl( fd, F_GETFL );
// 设置新的选项,添加非阻塞标志
int new_option = old_option | O_NONBLOCK;
// 应用新的选项
fcntl( fd, F_SETFL, new_option );
// 返回旧的选项
return old_option;
}
// 向epoll实例添加文件描述符的函数
void addfd( int epollfd, int fd )
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
setnonblocking( fd );
}

void sig_handler( int sig )
{
int save_errno = errno;
int msg = sig;
send( sig_pipefd[1], ( char* )&msg, 1, 0 );
errno = save_errno;
}

void addsig( int sig, void(*handler)(int), bool restart = true )
{
struct sigaction sa;
memset( &sa, '\0', sizeof( sa ) );
sa.sa_handler = handler;
if( restart )
{
sa.sa_flags |= SA_RESTART;
}
sigfillset( &sa.sa_mask );
assert( sigaction( sig, &sa, NULL ) != -1 );
}

void del_resource()
{
close( sig_pipefd[0] );
close( sig_pipefd[1] );
close( listenfd );
close( epollfd );
shm_unlink( shm_name );
delete [] users;
delete [] sub_process;
}

// 停止一个子进程
void child_term_handler( int sig )
{
stop_child = true;
}

// 子进程运行的函数。参数idx指出该子进程处理的客户连接的编号,users是保存所有
// 客户连接数据的数组,参数share_mem指出共享内存的起始地址
int run_child( int idx, client_data* users, char* share_mem )
{
epoll_event events[ MAX_EVENT_NUMBER ];
// 子进程使用I/O复用技术来同时监听两个文件描述符:客户连接socket、
// 与父进程通信的管道文件描述符
int child_epollfd = epoll_create( 5 );
assert( child_epollfd != -1 );
int connfd = users[idx].connfd;
addfd( child_epollfd, connfd );
int pipefd = users[idx].pipefd[1];
addfd( child_epollfd, pipefd );
int ret;
addsig( SIGTERM, child_term_handler, false );

while( !stop_child )
{
int number = epoll_wait( child_epollfd, events, MAX_EVENT_NUMBER, -1 );
if ( ( number < 0 ) && ( errno != EINTR ) )
{
printf( "epoll failure\n" );
break;
}

for ( int i = 0; i < number; i++ )
{
int sockfd = events[i].data.fd;
if( ( sockfd == connfd ) && ( events[i].events & EPOLLIN ) )
{
memset( share_mem + idx*BUFFER_SIZE, '\0', BUFFER_SIZE );
ret = recv( connfd, share_mem + idx*BUFFER_SIZE, BUFFER_SIZE-1, 0 );
if( ret < 0 )
{
if( errno != EAGAIN )
{
stop_child = true;
}
}
else if( ret == 0 )
{
stop_child = true;
}
else
{
send( pipefd, ( char* )&idx, sizeof( idx ), 0 );
}
}
else if( ( sockfd == pipefd ) && ( events[i].events & EPOLLIN ) )
{
int client = 0;
ret = recv( sockfd, ( char* )&client, sizeof( client ), 0 );
if( ret < 0 )
{
if( errno != EAGAIN )
{
stop_child = true;
}
}
else if( ret == 0 )
{
stop_child = true;
}
else
{
send( connfd, share_mem + client * BUFFER_SIZE, BUFFER_SIZE, 0 );
}
}
else
{
continue;
}
}
}

close( connfd );
close( pipefd );
close( child_epollfd );
return 0;
}

int main( int argc, char* argv[] )
{
if( argc <= 2 )
{
printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
return 1;
}
const char* ip = argv[1];
int port = atoi( argv[2] );

int ret = 0;
struct sockaddr_in address;
bzero( &address, sizeof( address ) );
address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &address.sin_addr );
address.sin_port = htons( port );

listenfd = socket( PF_INET, SOCK_STREAM, 0 );
assert( listenfd >= 0 );

ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
assert( ret != -1 );

ret = listen( listenfd, 5 );
assert( ret != -1 );
// 初始化用户计数器
user_count = 0;
// 分配用户数据数组,多分配一个空间用于哨兵值
users = new client_data [ USER_LIMIT+1 ];
// 分配子进程映射表,初始化所有元素为-1
sub_process = new int [ PROCESS_LIMIT ];
for( int i = 0; i < PROCESS_LIMIT; ++i )
{
sub_process[i] = -1;
}
// 创建epoll事件数组
epoll_event events[ MAX_EVENT_NUMBER ];
epollfd = epoll_create( 5 );
assert( epollfd != -1 );
addfd( epollfd, listenfd );

ret = socketpair( PF_UNIX, SOCK_STREAM, 0, sig_pipefd );
assert( ret != -1 );
setnonblocking( sig_pipefd[1] );
addfd( epollfd, sig_pipefd[0] );

addsig( SIGCHLD, sig_handler );
addsig( SIGTERM, sig_handler );
addsig( SIGINT, sig_handler );
addsig( SIGPIPE, SIG_IGN );
bool stop_server = false;
bool terminate = false;

shmfd = shm_open( shm_name, O_CREAT | O_RDWR, 0666 );
assert( shmfd != -1 );
ret = ftruncate( shmfd, USER_LIMIT * BUFFER_SIZE );
assert( ret != -1 );

share_mem = (char*)mmap( NULL, USER_LIMIT * BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0 );
assert( share_mem != MAP_FAILED );
close( shmfd );

while( !stop_server )
{
int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
if ( ( number < 0 ) && ( errno != EINTR ) )
{
printf( "epoll failure\n" );
break;
}

for ( int i = 0; i < number; i++ )
{
int sockfd = events[i].data.fd;
if( sockfd == listenfd )
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof( client_address );
int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
if ( connfd < 0 )
{
printf( "errno is: %d\n", errno );
continue;
}
if( user_count >= USER_LIMIT )
{
const char* info = "too many users\n";
printf( "%s", info );
send( connfd, info, strlen( info ), 0 );
close( connfd );
continue;
}
users[user_count].address = client_address;
users[user_count].connfd = connfd;
ret = socketpair( PF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd );
assert( ret != -1 );
pid_t pid = fork();
if( pid < 0 )
{
close( connfd );
continue;
}
else if( pid == 0 )
{
close( epollfd );
close( listenfd );
close( users[user_count].pipefd[0] );
close( sig_pipefd[0] );
close( sig_pipefd[1] );
run_child( user_count, users, share_mem );
munmap( (void*)share_mem, USER_LIMIT * BUFFER_SIZE );
exit( 0 );
}
else
{
close( connfd );
close( users[user_count].pipefd[1] );
addfd( epollfd, users[user_count].pipefd[0] );
users[user_count].pid = pid;
sub_process[pid] = user_count;
user_count++;
}
}
else if( ( sockfd == sig_pipefd[0] ) && ( events[i].events & EPOLLIN ) )
{
int sig;
char signals[1024];
ret = recv( sig_pipefd[0], signals, sizeof( signals ), 0 );
if( ret == -1 )
{
continue;
}
else if( ret == 0 )
{
continue;
}
else
{
for( int i = 0; i < ret; ++i )
{
switch( signals[i] )
{
case SIGCHLD:
{
pid_t pid;
int stat;
while ( ( pid = waitpid( -1, &stat, WNOHANG ) ) > 0 )
{
int del_user = sub_process[pid];
sub_process[pid] = -1;
if( ( del_user < 0 ) || ( del_user > USER_LIMIT ) )
{
printf( "the deleted user was not change\n" );
continue;
}
epoll_ctl( epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0 );
close( users[del_user].pipefd[0] );
users[del_user] = users[--user_count];
sub_process[users[del_user].pid] = del_user;
printf( "child %d exit, now we have %d users\n", del_user, user_count );
}
if( terminate && user_count == 0 )
{
stop_server = true;
}
break;
}
case SIGTERM:
case SIGINT:
{
printf( "kill all the clild now\n" );
//addsig( SIGTERM, SIG_IGN );
//addsig( SIGINT, SIG_IGN );
if( user_count == 0 )
{
stop_server = true;
break;
}
for( int i = 0; i < user_count; ++i )
{
int pid = users[i].pid;
kill( pid, SIGTERM );
}
terminate = true;
break;
}
default:
{
break;
}
}
}
}
}
else if( events[i].events & EPOLLIN )
{
int child = 0;
ret = recv( sockfd, ( char* )&child, sizeof( child ), 0 );
printf( "read data from child accross pipe\n" );
if( ret == -1 )
{
continue;
}
else if( ret == 0 )
{
continue;
}
else
{
for( int j = 0; j < user_count; ++j )
{
if( users[j].pipefd[0] != sockfd )
{
printf( "send data to child accross pipe\n" );
send( users[j].pipefd[0], ( char* )&child, sizeof( child ), 0 );
}
}
}
}
}
}

del_resource();
return 0;
}

上面的代码有两点需要注意:

❑虽然我们使用了共享内存,但每个子进程都只会往自己所处理的客户连接所对应的那一部分读缓存中写入数据,所以我们使用共享内存的目的只是为了“共享读”。因此,每个子进程在使用共享内存的时候都无须加锁。这样做符合“聊天室服务器”的应用场景,同时提高了程序性能。

❑我们的服务器程序在启动的时候给数组users分配了足够多的空间,使得它可以存储所有可能的客户连接的相关数据。同样,我们一次性给数组sub_process分配的空间也足以存储所有可能的子进程的相关数据。这是牺牲空间换取时间的又一例子。

7. 消息队列

消息队列是在两个进程之间传递二进制块数据的一种简单有效的方式。每个数据块都有一个特定的类型,接收方可以根据类型来有选择地接收数据,而不一定像管道和命名管道那样必须以先进先出的方式接收数据。

Linux消息队列的API都定义在sys/msg.h头文件中,包括4个系统调用:msgget、msgsnd、msgrcv和msgctl。我们将依次讨论之。

7.1 msgget系统调用

msgget系统调用创建一个消息队列,或者获取一个已有的消息队列。其定义如下:

1
2
#include<sys/msg.h>
int msgget(key_t key,int msgflg);
  • 和semget系统调用一样,key参数是一个键值,用来标识一个全局唯一的消息队列。

  • msgflg参数的使用和含义与semget系统调用的sem_flags参数相同。

  • msgget成功时返回一个正整数值,它是消息队列的标识符。msgget失败时返回-1,并设置errno。

  • 如果msgget用于创建消息队列,则与之关联的内核数据结构msqid_ds将被创建并初始化。

  • msqid_ds结构体的定义如下:

1
2
3
4
5
6
7
8
9
10
11
struct msqid_ds {
struct ipc_perm msg_perm;/*消息队列的操作权限*/
time_t msg_stime;/*最后一次调用msgsnd的时间*/
time_t msg_rtime;/*最后一次调用msgrcv的时间*/
time_t msg_ctime;/*最后一次被修改的时间*/
unsigned long __msg_cbytes;/*消息队列中已有的字节数*/
msgqnum_t msg_qnum;/*消息队列中已有的消息数*/
msglen_t msg_qbytes;/*消息队列允许的最大字节数*/
pid_t msg_lspid;/*最后执行msgsnd的进程的PID*/
pid_t msg_lrpid;/*最后执行msgrcv的进程的PID*/
};

7.2 msgsnd系统调用

msgsnd系统调用把一条消息添加到消息队列中。其定义如下:

1
2
#include<sys/msg.h>
int msgsnd(int msqid,const void*msg_ptr,size_t msg_sz,int msgflg);

msqid参数是由msgget调用返回的消息队列标识符。

msg_ptr参数指向一个准备发送的消息,消息必须被定义为如下类型:

1
2
3
4
struct msgbuf {
long mtype;/*消息类型*/
char mtext[512];/*消息数据*/
};
  • 其中,mtype成员指定消息的类型,它必须是一个正整数。
  • mtext是消息数据。
  • msg_sz参数是消息的数据部分(mtext)的长度。这个长度可以为0,表示没有消息数据。

msgflg参数控制msgsnd的行为。

  • 它通常仅支持IPC_NOWAIT标志,即以非阻塞的方式发送消息。
  • 默认情况下,发送消息时如果消息队列满了,则msgsnd将阻塞。
  • 若IPC_NOWAIT标志被指定,则msgsnd将立即返回并设置errno为EAGAIN。

处于阻塞状态的msgsnd调用可能被如下两种异常情况所中断:

❑消息队列被移除。此时msgsnd调用将立即返回并设置errno为EIDRM。

❑程序接收到信号。此时msgsnd调用将立即返回并设置errno为EINTR。

msgsnd成功时返回0,失败则返回-1并设置errno。

  • msgsnd成功时将修改内核数据结构msqid_ds的部分字段,如下所示:

❑将msg_qnum加1。

❑将msg_lspid设置为调用进程的PID。

❑将msg_stime设置为当前的时间。

7.3 msgrcv系统调用

msgrcv系统调用从消息队列中获取消息。其定义如下:

1
2
#include<sys/msg.h>
int msgrcv(int msqid,void*msg_ptr,size_t msg_sz,long int msgtype,int msgflg);
  • msqid参数是由msgget调用返回的消息队列标识符。

  • msg_ptr参数用于存储接收的消息,msg_sz参数指的是消息数据部分的长度。

  • msgtype参数指定接收何种类型的消息。我们可以使用如下几种方式来指定消息类型:

    • msgtype等于0。读取消息队列中的第一个消息。
    • msgtype大于0。读取消息队列中第一个类型为msgtype的消息(除非指定了标志MSG_EXCEPT,见后文)。
    • msgtype小于0。读取消息队列中第一个类型值比msgtype的绝对值小的消息。
  • 参数msgflg控制msgrcv函数的行为。它可以是如下一些标志的按位或:

    • IPC_NOWAIT。如果消息队列中没有消息,则msgrcv调用立即返回并设置errno为ENOMSG。
    • MSG_EXCEPT。如果msgtype大于0,则接收消息队列中第一个非msgtype类型的消息。
    • MSG_NOERROR。如果消息数据部分的长度超过了msg_sz,就将它截断。
  • 处于阻塞状态的msgrcv调用还可能被如下两种异常情况所中断:

    • 消息队列被移除。此时msgrcv调用将立即返回并设置errno为EIDRM。
    • 程序接收到信号。此时msgrcv调用将立即返回并设置errno为EINTR。
  • msgrcv成功时返回0,失败则返回-1并设置errno。msgrcv成功时将修改内核数据结构msqid_ds的部分字段,如下所示:

    • 将msg_qnum减1。
    • 将msg_lrpid设置为调用进程的PID。
    • 将msg_rtime设置为当前的时间。

7.4 msgctl系统调用

msgctl系统调用控制消息队列的某些属性。其定义如下:

1
2
#include<sys/msg.h>
int msgctl(int msqid,int command,struct msqid_ds*buf);

msqid参数是由msgget调用返回的共享内存标识符。command参数指定要执行的命令。msgctl支持的所有命令如表13-4所示。

msgctl成功时的返回值取决于command参数,如表13-4所示。msgctl函数失败时返回-1并设置errno。

8. IPC命令

上述3种System V IPC进程间通信方式都使用一个全局唯一的键值(key)来描述一个共享资源。当程序调用semget、shmget或者msgget时,就创建了这些共享资源的一个实例。

Linux提供了ipcs命令,以观察当前系统上拥有哪些共享资源实例。比如在测试机器Kongming20上执行ipcs命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$sudo ipcs
------Shared Memory Segments-------
key shmid owner perms bytes nattch status
------Semaphore Arrays-------
key semid owner perms nsems
0x00000000 196608 apache 600 1
0x00000000 229377 apache 600 1
0x00000000 262146 apache 600 1
0x00000000 294915 apache 600 1
0x00000000 327684 apache 600 1
0x00000000 360453 apache 600 1
0x00000000 393222 apache 600 1
------Message Queues-------
key msqid owner perms used-bytes messages

输出结果分段显示了系统拥有的共享内存、信号量和消息队列资源。

可见,该系统目前尚未使用任何共享内存和消息队列,却分配了一组键值为0(IPC_PRIVATE)的信号量。这些信号量的所有者是apache,因此它们是由httpd服务器程序创建的。其中标识符为393222的信号量正是我们在13.5.5小节讨论的那个用于在httpd各个子进程之间同步epoll_wait使用权的信号量。

此外,我们可以使用ipcrm命令来删除遗留在系统中的共享资源

9. 在进程间传递文件描述符

由于fork调用之后,父进程中打开的文件描述符在子进程中仍然保持打开,所以文件描述符可以很方便地从父进程传递到子进程

需要注意的是,传递一个文件描述符并不是传递一个文件描述符的值,而是要在接收进程中创建一个新的文件描述符,并且该文件描述符和发送进程中被传递的文件描述符指向内核中相同的文件表项。

那么如何把子进程中打开的文件描述符传递给父进程呢?或者更通俗地说,如何在两个不相干的进程之间传递文件描述符呢?

在Linux下,我们可以利用UNIX域socket在进程间传递特殊的辅助数据,以实现文件描述符的传递[2]。

代码清单13-5给出了一个实例,它在子进程中打开一个文件描述符,然后将它传递给父进程,父进程则通过读取该文件描述符来获得文件的内容。

代码清单13-5 在进程间传递文件描述符

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#include<sys/socket.h>
#include<fcntl.h>
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<assert.h>
#include<string.h>

static const int CONTROL_LEN=CMSG_LEN(sizeof(int));

/*发送文件描述符,fd参数是用来传递信息的UNIX域socket,fd_to_send参数是待发送的文件描述符*/
void send_fd(int fd,int fd_to_send) {
struct iovec iov[1];
struct msghdr msg;
char buf[0];
iov[0].iov_base=buf;
iov[0].iov_len=1;
msg.msg_name=NULL;
msg.msg_namelen=0;
msg.msg_iov=iov;
msg.msg_iovlen=1;
cmsghdr cm;
cm.cmsg_len=CONTROL_LEN;
cm.cmsg_level=SOL_SOCKET;
cm.cmsg_type=SCM_RIGHTS;
*(int*)CMSG_DATA(&cm)=fd_to_send;
msg.msg_control=&cm;/*设置辅助数据*/
msg.msg_controllen=CONTROL_LEN;
sendmsg(fd,&msg,0);
}

/*接收目标文件描述符*/
int recv_fd(int fd) {
struct iovec iov[1];
struct msghdr msg;
char buf[0];
iov[0].iov_base=buf;
iov[0].iov_len=1;
msg.msg_name=NULL;
msg.msg_namelen=0;
msg.msg_iov=iov;
msg.msg_iovlen=1;
cmsghdr cm;
msg.msg_control=&cm;
msg.msg_controllen=CONTROL_LEN;
recvmsg(fd,&msg,0);
int fd_to_read=*(int*)CMSG_DATA(&cm);
return fd_to_read;
}

int main() {
int pipefd[2];
int fd_to_pass=0;
/*创建父、子进程间的管道,文件描述符pipefd[0]和pipefd[1]都是UNIX域socket*/
int ret=socketpair(PF_UNIX,SOCK_DGRAM,0,pipefd);
assert(ret!=-1);
pid_t pid=fork();
assert(pid>=0);
if(pid==0) {
close(pipefd[0]);
fd_to_pass=open("test.txt",O_RDWR,0666);
/*子进程通过管道将文件描述符发送到父进程。如果文件test.txt打开失败,则子进程将标准输入文件描述符发送到父进程*/
send_fd(pipefd[1],(fd_to_pass>0)?fd_to_pass:0);
close(fd_to_pass);
exit(0);
}
close(pipefd[1]);
fd_to_pass=recv_fd(pipefd[0]);/*父进程从管道接收目标文件描述符*/
char buf[1024];
memset(buf,'\0',1024);
read(fd_to_pass,buf,1024);/*读目标文件描述符,以验证其有效性*/
printf("I got fd%d and data%s\n",fd_to_pass,buf);
close(fd_to_pass);
}

[2]在Linux下,UNIX域socket除了可以传递文件描述符之外,还可以传递其他类型的辅助数据。


13. 多进程编程
http://binbo-zappy.github.io/2024/12/16/Linux高性能服务器编程-游双/13-多进程编程/
作者
Binbo
发布于
2024年12月16日
许可协议