9. I/O复用

第9章 I/O复用

I/O复用使得程序能同时监听多个文件描述符。网络程序在下列情况下需要使用I/O复用技术:

❑ 客户端程序要同时处理多个socket。比如本章将要讨论的非阻塞connect技术。

❑ 客户端程序要同时处理用户输入和网络连接。比如本章将要讨论的聊天室程序。

❑ TCP服务器要同时处理监听socket和连接socket。这是I/O复用使用最多的场合。

❑ 服务器要同时处理TCP请求和UDP请求。比如本章将要讨论的回射服务器。

❑ 服务器要同时监听多个端口,或者处理多种服务。比如本章将要讨论的xinetd服务器。

需要指出的是,I/O复用虽然能同时监听多个文件描述符,但它本身是阻塞的。

  • 并且当多个文件描述符同时就绪时,
  • 如果不采取额外的措施,程序就只能按顺序依次处理其中的每一个文件描述符,这使得服务器程序看起来像是串行工作的。
  • 如果要实现并发,只能使用多进程或多线程等编程手段。

1. select系统调用

select系统调用的用途是:在一段指定时间内,监听用户感兴趣的文件描述符上的可读、可写和异常等事件。

1.1 select API

select系统调用的原型如下:

1
2
#include<sys/select.h>
int select(int nfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* timeout);

  • nfds参数指定被监听的文件描述符的总数。它通常被设置为select监听的所有文件描述符中的最大值加1,因为文件描述符是从0开始计数的。

  • readfds、writefds和exceptfds参数分别指向可读、可写和异常等事件对应的文件描述符集合。

    • 应用程序调用select函数时,通过这3个参数传入自己感兴趣的文件描述符。
    • select调用返回时,内核将修改它们来通知应用程序哪些文件描述符已经就绪。
    • 这3个参数是fd_set结构指针类型。fd_set结构体的定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include<typesizes.h>
#define __FD_SETSIZE 1024
#include<sys/select.h>
#define FD_SETSIZE __FD_SETSIZE
typedef long int __fd_mask;
#undef __NFDBITS
#define __NFDBITS (8*(int)sizeof(__fd_mask))
typedef struct {
#ifdef __USE_XOPEN
__fd_mask fds_bits[__FD_SETSIZE/__NFDBITS];
#define __FDS_BITS(set) ((set)->fds_bits)
#else
__fd_mask __fds_bits[__FD_SETSIZE/__NFDBITS];
#define __FDS_BITS(set) ((set)->__fds_bits)
#endif
} fd_set;

fd_set结构体仅包含一个整型数组,该数组的每个元素的每一位(bit)标记一个文件描述符。

fd_set能容纳的文件描述符数量由FD_SETSIZE指定,这就限制了select能同时处理的文件描述符的总量。

由于位操作过于繁琐,我们应该使用下面的一系列宏来访问fd_set结构体中的位:

1
2
3
4
5
#include<sys/select.h>
FD_ZERO(fd_set* fdset); /*清除fdset的所有位*/
FD_SET(int fd, fd_set* fdset); /*设置fdset的位fd*/
FD_CLR(int fd, fd_set* fdset); /*清除fdset的位fd*/
int FD_ISSET(int fd, fd_set* fdset); /*测试fdset的位fd是否被设置*/

  • timeout参数用来设置select函数的超时时间。
    • 它是一个timeval结构类型的指针,采用指针参数是因为内核将修改它以告诉应用程序select等待了多久。
    • 不过我们不能完全信任select调用返回后的timeout值,比如调用失败时timeout值是不确定的。timeval结构体的定义如下:
1
2
3
4
struct timeval {
long tv_sec; /*秒数*/
long tv_usec; /*微秒数*/
};

由以上定义可见,select给我们提供了一个微秒级的定时方式。

  • 如果给timeout变量的tv_sec成员和tv_usec成员都传递0,则select将立即返回。

  • 如果给timeout传递NULL,则select将一直阻塞,直到某个文件描述符就绪。

  • select成功时返回就绪(可读、可写和异常)文件描述符的总数。

    • 如果在超时时间内没有任何文件描述符就绪,select将返回0。
    • select失败时返回-1并设置errno。
    • 如果在select等待期间,程序接收到信号,则select立即返回-1,并设置errno为EINTR。

1.2 文件描述符就绪条件

哪些情况下文件描述符可以被认为是可读、可写或者出现异常,对于select的使用非常关键。在网络编程中,下列情况下socket可读:

❑ socket内核接收缓存区中的字节数大于或等于其低水位标记SO_RCVLOWAT。此时我们可以无阻塞地读该socket,并且读操作返回的字节数大于0。

❑ socket通信的对方关闭连接。此时对该socket的读操作将返回0。

❑ 监听socket上有新的连接请求。

❑ socket上有未处理的错误。此时我们可以使用getsockopt来读取和清除该错误。

下列情况下socket可写:

❑ socket内核发送缓存区中的可用字节数大于或等于其低水位标记SO_SNDLOWAT。此时我们可以无阻塞地写该socket,并且写操作返回的字节数大于0。

❑ socket的写操作被关闭。对写操作被关闭的socket执行写操作将触发一个SIGPIPE信号。

❑ socket使用非阻塞connect连接成功或者失败(超时)之后。

❑ socket上有未处理的错误。此时我们可以使用getsockopt来读取和清除该错误。

网络程序中,select能处理的异常情况只有一种:socket上接收到带外数据。下面我们详细讨论之。

1.3 处理带外数据

上一小节提到,socket上接收到普通数据和带外数据都将使select返回,但socket处于不同的就绪状态:前者处于可读状态,后者处于异常状态。代码清单9-1描述了select是如何同时处理二者的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>

int main(int argc, char *argv[])
{
if(argc <= 2) {
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char* ip = argv[1];
int port = atoi(argv[2]);
printf("ip is %s and port is %d\n", ip, port);

int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);

int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);

ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
assert(ret != -1);

ret = listen(listenfd, 5);
assert(ret != -1);

struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);

int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
if(connfd < 0) {
printf("errno is: %d\n", errno);
close(listenfd);
}

char remote_addr[INET_ADDRSTRLEN];
printf("connected with ip: %s and port: %d\n",
inet_ntop(AF_INET, &client_address.sin_addr, remote_addr, INET_ADDRSTRLEN),
ntohs(client_address.sin_port));

char buf[1024];
fd_set read_fds;
fd_set exception_fds;

FD_ZERO(&read_fds);
FD_ZERO(&exception_fds);

int nReuseAddr = 1;
setsockopt(connfd, SOL_SOCKET, SO_OOBINLINE,
&nReuseAddr, sizeof(nReuseAddr));
while(1) {
memset(buf, '\0', sizeof(buf));
FD_SET(connfd, &read_fds);
FD_SET(connfd, &exception_fds);

ret = select(connfd + 1, &read_fds, NULL, &exception_fds, NULL);
printf("select one\n");
if(ret < 0) {
printf("selection failure\n");
break;
}
if(FD_ISSET(connfd, &read_fds)) {
ret = recv(connfd, buf, sizeof(buf) - 1, 0);
if(ret <= 0) {
break;
}
printf("get %d bytes of normal data: %s\n", ret, buf);
} else if(FD_ISSET(connfd, &exception_fds)) {
ret = recv(connfd, buf, sizeof(buf) - 1, MSG_OOB);
if(ret <= 0) {
break;
}
printf("get %d bytes of oob data: %s\n", ret, buf);
}
}
close(connfd);
close(listenfd);
return 0;
}

2. poll系统调用

poll系统调用和select类似,也是在指定时间内轮询一定数量的文件描述符,以测试其中是否有就绪者。poll的原型如下:

1
2
#include<poll.h>
int poll(struct pollfd* fds, nfds_t nfds, int timeout);

  1. fds参数是一个pollfd结构类型的数组,它指定所有我们感兴趣的文件描述符上发生的可读、可写和异常等事件。pollfd结构体的定义如下:
1
2
3
4
5
struct pollfd {
int fd; /*文件描述符*/
short events; /*注册的事件*/
short revents; /*实际发生的事件,由内核填充*/
};
  • fd成员指定文件描述符;
  • events成员告诉poll监听fd上的哪些事件,它是一系列事件的按位或;
  • revents成员则由内核修改,以通知应用程序fd上实际发生了哪些事件。
  • poll支持的事件类型如表9-1所示。

表9-1中,POLLRDNORM、POLLRDBAND、POLLWRNORM、POLLWRBAND由XOPEN规范定义。它们实际上是将POLLIN事件和POLLOUT事件分得更细致,以区别对待普通数据和优先数据。但Linux并不完全支持它们。

通常,应用程序需要根据recv调用的返回值来区分socket上接收到的是有效数据还是对方关闭连接的请求,并做相应的处理。

  • 不过,自Linux内核2.6.17开始,GNU为poll系统调用增加了一个POLLRDHUP事件,它在socket上接收到对方关闭连接的请求之后触发。
  • 这为我们区分上述两种情况提供了一种更简单的方式。
  • 但使用POLLRDHUP事件时,我们需要在代码最开始处定义_GNU_SOURCE。
  1. nfds参数指定被监听事件集合fds的大小。其类型nfds_t的定义如下:
1
typedef unsigned long int nfds_t;
  1. timeout参数指定poll的超时值,单位是毫秒。
  • 当timeout为-1时,poll调用将永远阻塞,直到某个事件发生;
  • 当timeout为0时,poll调用将立即返回。
  1. poll系统调用的返回值的含义与select相同。

3. epoll系列系统调用

3.1 内核事件表

  • epoll是Linux特有的I/O复用函数。
  • 它在实现和使用上与select、poll有很大差异。
    • 首先,epoll使用一组函数来完成任务,而不是单个函数。
    • 其次,epoll把用户关心的文件描述符上的事件放在内核里的一个事件表中,从而无须像select和poll那样每次调用都要重复传入文件描述符集或事件集。
    • 但epoll需要使用一个额外的文件描述符,来唯一标识内核中的这个事件表。这个文件描述符使用如下epoll_create函数来创建:
1
2
#include<sys/epoll.h>
int epoll_create(int size)
  • size参数现在并不起作用,只是给内核一个提示,告诉它事件表需要多大。
  • 该函数返回的文件描述符将用作其他所有epoll系统调用的第一个参数,以指定要访问的内核事件表。

下面的函数用来操作epoll的内核事件表:

1
2
#include<sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event)

  • fd参数是要操作的文件描述符,op参数则指定操作类型。操作类型有如下3种:

❑ EPOLL_CTL_ADD,往事件表中注册fd上的事件。

❑ EPOLL_CTL_MOD,修改fd上的注册事件。

❑ EPOLL_CTL_DEL,删除fd上的注册事件。

  • event参数指定事件,它是epoll_event结构指针类型。epoll_event的定义如下:
1
2
3
4
struct epoll_event {
__uint32_t events; /*epoll事件*/
epoll_data_t data; /*用户数据*/
};
  • 其中events成员描述事件类型。
  • epoll支持的事件类型和poll基本相同。
  • 表示epoll事件类型的宏是在poll对应的宏前加上“E”,比如epoll的数据可读事件是EPOLLIN。
  • 但epoll有两个额外的事件类型——EPOLLET和EPOLLONESHOT。
  • 它们对于epoll的高效运作非常关键
  • data成员用于存储用户数据,其类型epoll_data_t的定义如下:
1
2
3
4
5
6
typedef union epoll_data {
void* ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
  • epoll_data_t是一个联合体,其4个成员中使用最多的是fd,它指定事件所从属的目标文件描述符。

  • ptr成员可用来指定与fd相关的用户数据。

  • 但由于epoll_data_t是一个联合体,我们不能同时使用其ptr成员和fd成员,因此,如果要将文件描述符和用户数据关联起来(正如8.5.2小节讨论的将句柄和事件处理器绑定一样),以实现快速的数据访问,只能使用其他手段,比如放弃使用epoll_data_t的fd成员,而在ptr指向的用户数据中包含fd。

  • epoll_ctl成功时返回0,失败则返回-1并设置errno。

3.2 epoll_wait函数

epoll系列系统调用的主要接口是epoll_wait函数。它在一段超时时间内等待一组文件描述符上的事件,其原型如下:

1
2
#include<sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
该函数成功时返回就绪的文件描述符的个数,失败时返回-1并设置errno。

  • timeout参数的含义与poll接口的timeout参数相同。
  • maxevents参数指定最多监听多少个事件,它必须大于0。

epoll_wait函数如果检测到事件,就将所有就绪的事件从内核事件表(由epfd参数指定)中复制到它的第二个参数events指向的数组中。

这个数组只用于输出epoll_wait检测到的就绪事件,而不像select和poll的数组参数那样既用于传入用户注册的事件,又用于输出内核检测到的就绪事件。

这就极大地提高了应用程序索引就绪文件描述符的效率。代码清单9-2体现了这个差别。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/*如何索引poll返回的就绪文件描述符*/
int ret=poll(fds,MAX_EVENT_NUMBER,-1);
/*必须遍历所有已注册文件描述符并找到其中的就绪者(当然,可以利用ret来稍做优化)*/
for(int i=0; i<MAX_EVENT_NUMBER; ++i) {
if(fds[i].revents&POLLIN) /*判断第i个文件描述符是否就绪*/ {
int sockfd=fds[i].fd;
/*处理sockfd*/
}
}
/*如何索引epoll返回的就绪文件描述符*/
int ret=epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);
/*仅遍历就绪的ret个文件描述符*/
for(int i=0; i<ret; i++) {
int sockfd=events[i].data.fd;
/*sockfd肯定就绪,直接处理*/
}

3.3 LT和ET模式

epoll对文件描述符的操作有两种模式:LT(Level Trigger,电平触发)模式和ET(Edge Trigger,边沿触发)模式。

  • LT模式是默认的工作模式,这种模式下epoll相当于一个效率较高的poll。
  • 当往epoll内核事件表中注册一个文件描述符上的EPOLLET事件时,epoll将以ET模式来操作该文件描述符。
  • ET模式是epoll的高效工作模式。

对于采用LT工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件。

  • 这样,当应用程序下一次调用epoll_wait时,epoll_wait还会再次向应用程序通告此事件,直到该事件被处理。
  • 而对于采用ET工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序必须立即处理该事件,因为后续的epoll_wait调用将不再向应用程序通知这一事件。

可见,ET模式在很大程度上降低了同一个epoll事件被重复触发的次数,因此效率要比LT模式高。代码清单9-3体现了LT和ET在工作方式上的差异。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <pthread.h>

// 定义最大事件数
#define MAX_EVENT_NUMBER 1024
// BUFFER大小
#define BUFFER_SIZE 10

void addfd( int epollfd, int fd, bool enable_et );
void lt(epoll_event* events, int number, int epollfd, int listenfd);
void et(epoll_event* events, int number, int epollfd, int listenfd);
int main(int argc, char* argv[])
{
if(argc <= 2) {
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
// 第一个参数为ip地址
const char* ip = argv[1];
// 第二个参数为端口号
int port = atoi(argv[2]);

int ret = 0;
// TCP/IP协议族 ipv4专用socket地址结构体
struct sockaddr_in address;
// 初始化为0
bzero(&address, sizeof(address));
address.sin_family = AF_INET; // TCP/IPv4协议族,地址族
inet_pton(AF_INET, ip, &address.sin_addr); //字符串表示的IP地址IP地址转换成用网络字节序整数表示的IP地址
address.sin_port = htons(port); // 端口号,用网络字节序表示,主机字节序数据转化为网络字节序数据

// 创建socket, 流服务, 0表示默认协议,成功返回文件描述符
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);

//命名socket 将一个socket与socket地址绑定称为给socket命名
ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
assert(ret != -1);

//创建一个监听队列以存放待处理的客户连接, backlog参数提示内核监听队列的最大长度。
ret = listen(listenfd, 5);
assert(ret != -1);

epoll_event events[MAX_EVENT_NUMBER];
// epoll需要使用一个额外的文件描述符,来唯一标识内核中的这个事件表,
// size参数现在并不起作用,只是给内核一个提示,告诉它事件表需要多大
int epollfd = epoll_create(5);
assert(epollfd != -1);
// 注册事件
addfd(epollfd, listenfd, true);

while(1) {
// 在一段超时时间内等待一组文件描述符上的事件,
// 函数成功时返回就绪的文件描述符的个数,
int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
if(ret < 0) {
printf("epoll failure\n");
break;
}

// lt(events, ret, epollfd, listenfd);
et(events, ret, epollfd, listenfd);
}
close(listenfd);
return 0;
}

// 将文件描述符设置成非阻塞的
int setnonblocking(int fd) {
// 获取fd的标志状态
int old_option = fcntl(fd, F_GETFL);
// 设置非阻塞标志
int new_option = old_option | O_NONBLOCK;
// 设置fd的状态标志
fcntl(fd, F_SETFL, new_option);
// 返回旧标志,以便日后恢复该状态标志
return old_option;
}

// 将文件描述符fd上的EPOLLIN注册到epollfd指示的epoll内核事件表中,
// 参数enable_et指定是否对fd启用ET模式
void addfd( int epollfd, int fd, bool enable_et ) {
epoll_event event;
event.data.fd = fd;
// epoll的数据可读事件是EPOLLIN
event.events = EPOLLIN;
if(enable_et) {
event.events |= EPOLLET;
}
// 往事件表中注册fd上的事件
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}

// LT模式的工作流程
void lt( epoll_event* events, int number, int epollfd, int listenfd ){
char buf[BUFFER_SIZE];
for (int i = 0; i < number; i++) {
int sockfd = events[i].data.fd;
printf("i: %d, sockfd: %d, listenfd:%d\n", i, sockfd, listenfd);
if (sockfd == listenfd) {
printf("i: %d, sockfd: %d, listenfd:%d\n", i, sockfd, listenfd);
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
//从listen监听队列中接受一个连接
int connfd = accept(listenfd, (struct sockaddr*)&client_address,
&client_addrlength);
// 对connfd禁用ET模式
addfd(epollfd, connfd, false);
} else if (events[i].events & EPOLLIN) {
// 只要socket读缓存中还有未读出的数据,这段代码就被触发
printf("i: %d, sockfd: %d, listenfd:%d\n", i, sockfd, listenfd);
printf("event trigger once\n");
memset(buf, '\0', BUFFER_SIZE);
int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
if(ret <= 0) {
close(sockfd);
continue;
}
printf("get %d bytes of content: %s\n", ret, buf);
} else {
printf("something else happened \n");
}

}

}

void et(epoll_event* events, int number, int epollfd, int listenfd) {
char buf[BUFFER_SIZE];
for (int i = 0; i < number; i++) {
int sockfd = events[i].data.fd;
if(sockfd == listenfd) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
// 对connfd开启ET模式
addfd(epollfd, connfd, true);
} else if (events[i].events & EPOLLIN) {
printf("event trigger once\n");
// 这段代码不会被重复触发,所以我们循环读取数据,以确保把socket读缓存中的所有数据读出
while(1) {
memset(buf, '\0', BUFFER_SIZE);
int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
if(ret < 0) {
// 对于非阻塞IO,下面的条件成立表示数据已经全部读取完毕。
// 此后,epoll就能再次触发sockfd上的EPOLLIN事件,以驱动下一次读操作
if((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
printf("read later\n");
break;
}
close(sockfd);
break;
} else if (ret == 0) {
close(sockfd);
} else {
printf("get %d bytes of content: %s\n", ret, buf);
}
}
} else {
printf("something else happened\n");
}
}
}

运行一下这段代码,然后telnet到这个服务器程序上并一次传输超过10字节(BUFFER_SIZE的大小)的数据,然后比较LT模式和ET模式的异同。ET模式下事件被触发的次数要比LT模式下少很多。

注意 每个使用ET模式的文件描述符都应该是非阻塞的。如果文件描述符是阻塞的,那么读或写操作将会因为没有后续的事件而一直处于阻塞状态(饥渴状态)。

3.4 EPOLLONESHOT事件

即使我们使用ET模式,一个socket上的某个事件还是可能被触发多次。这在并发程序中就会引起一个问题。比如一个线程(或进程,下同)在读取完某个socket上的数据后开始处理这些数据,而在数据的处理过程中该socket上又有新数据可读(EPOLLIN再次被触发),此时另外一个线程被唤醒来读取这些新的数据。于是就出现了两个线程同时操作一个socket的局面。这当然不是我们期望的。我们期望的是一个socket连接在任一时刻都只被一个线程处理。这一点可以使用epoll的EPOLLONESHOT事件实现。

对于注册了EPOLLONESHOT事件的文件描述符,操作系统最多触发其上注册的一个可读、可写或者异常事件,且只触发一次,除非我们使用epoll_ctl函数重置该文件描述符上注册的EPOLLONESHOT事件。这样,当一个线程在处理某个socket时,其他线程是不可能有机会操作该socket的。但反过来思考,注册了EPOLLONESHOT事件的socket一旦被某个线程处理完毕,该线程就应该立即重置这个socket上的EPOLLONESHOT事件,以确保这个socket下一次可读时,其EPOLLIN事件能被触发,进而让其他工作线程有机会继续处理这个socket。

代码清单9-4展示了EPOLLONESHOT事件的使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

// 定义最大事件数
#define MAX_EVENT_NUMBER 1024
// BUFFER大小
#define BUFFER_SIZE 10
void* worker(void* arg);
// 定义epoll事件结构体
struct fds {
int epollfd;
int sockfd;
};

// 将文件描述符设置成非阻塞的
int setnonblocking(int fd) {
// 获取fd的标志状态
int old_option = fcntl(fd, F_GETFL);
// 设置非阻塞标志
int new_option = old_option | O_NONBLOCK;
// 设置fd的状态标志
fcntl(fd, F_SETFL, new_option);
// 返回旧标志,以便日后恢复该状态标志
return old_option;
}

// 将文件描述符fd上的EPOLLIN注册到epollfd指示的epoll内核事件表中,
// 参数oneshot指定是否注册fd上的EPOLLONESHOT事件
void addfd( int epollfd, int fd, bool oneshot ) {
epoll_event event;
event.data.fd = fd;
// epoll的数据可读事件是EPOLLIN
event.events = EPOLLIN | EPOLLET;
if(oneshot) {
event.events |= EPOLLONESHOT;
}
// 往事件表中注册fd上的事件
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}

int main( int argc, char* argv[] )
{
if( argc <= 2 )
{
printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
return 1;
}
const char* ip = argv[1];
int port = atoi( argv[2] );

int ret = 0;
struct sockaddr_in address;
bzero( &address, sizeof( address ) );
address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &address.sin_addr );
address.sin_port = htons( port );

int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
assert( listenfd >= 0 );

ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
assert( ret != -1 );

ret = listen( listenfd, 5 );
assert( ret != -1 );

epoll_event events[ MAX_EVENT_NUMBER ];
int epollfd = epoll_create( 5 );
assert( epollfd != -1 );
// 注意,监听socket listenfd上是不能注册EPOLLONESHOT事件的,
// 否则应用程序只能处理一个客户连接!
// 因为后续的客户连接请求将不再触发listenfd上的EPOLLIN事件
addfd( epollfd, listenfd, false );

while( 1 )
{
int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
if (ret < 0 ) {
printf( "epoll failure\n" );
break;
}

for ( int i = 0; i < ret; i++ )
{
int sockfd = events[i].data.fd;
if ( sockfd == listenfd )
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof( client_address );
int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
// 对每个非监听文件描述符都注册EPOLLONESHOT事件
addfd( epollfd, connfd, true );
}
else if ( events[i].events & EPOLLIN )
{
pthread_t thread;
fds fds_for_new_worker;
fds_for_new_worker.epollfd = epollfd;
fds_for_new_worker.sockfd = sockfd;
// 新启动一个工作线程为sockfd服务
pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker );
}
else
{
printf( "something else happened \n" );
}
}
}

close( listenfd );
return 0;
}

void reset_oneshot(int epollfd, int fd) {
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
}
void* worker(void* arg) {
int sockfd = ((fds*)arg)->sockfd;
int epollfd = ((fds*)arg)->epollfd;
printf("start new thread to receive data on fd: %d\n", sockfd);
char buf[BUFFER_SIZE];
memset(buf, '\0', BUFFER_SIZE);
while(1) {
int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
if(ret == 0) {
close(sockfd);
printf("foreiner closed the connection\n");
break;
} else if (ret < 0) {
if(errno == EAGAIN) {
reset_oneshot(epollfd, sockfd);
printf("read later\n");
break;
}
} else {
printf("get content: %s\n", buf);
sleep(5);
}
}
printf("end thread receiving data on fd: %d\n", sockfd);
}
  • 从工作线程函数worker来看,如果一个工作线程处理完某个socket上的一次请求(我们用休眠5 s来模拟这个过程)之后,又接收到该socket上新的客户请求,则该线程将继续为这个socket服务。
  • 并且因为该socket上注册了EPOLLONESHOT事件,其他线程没有机会接触这个socket,如果工作线程等待5 s后仍然没收到该socket上的下一批客户数据,则它将放弃为该socket服务。
  • 同时,它调用reset_oneshot函数来重置该socket上的注册事件,这将使epoll有机会再次检测到该socket上的EPOLLIN事件,进而使得其他线程有机会为该socket服务。

由此看来,尽管一个socket在不同时间可能被不同的线程处理,但同一时刻肯定只有一个线程在为它服务。这就保证了连接的完整性,从而避免了很多可能的竞态条件。

4. 三组I/O复用函数的比较

前面讨论了select、poll和epoll三组I/O复用系统调用,

  • 这3组系统调用都能同时监听多个文件描述符。
  • 它们将等待由timeout参数指定的超时时间,直到一个或者多个文件描述符上有事件发生时返回,
  • 返回值是就绪的文件描述符的数量。返回0表示没有事件发生。

现在我们从事件集、最大支持文件描述符数、工作模式和具体实现等四个方面进一步比较它们的异同,以明确在实际应用中应该选择使用哪个(或哪些)。

这3组函数都通过某种结构体变量来告诉内核监听哪些文件描述符上的哪些事件,并使用该结构体类型的参数来获取内核处理的结果。

  • select的参数类型fd_set没有将文件描述符和事件绑定,它仅仅是一个文件描述符集合,因此select需要提供3个这种类型的参数来分别传入和输出可读、可写及异常等事件。
    • 这一方面使得select不能处理更多类型的事件,另一方面由于内核对fd_set集合的在线修改,应用程序下次调用select前不得不重置这3个fd_set集合。
  • poll的参数类型pollfd则多少“聪明”一些。它把文件描述符和事件都定义其中,任何事件都被统一处理,从而使得编程接口简洁得多。
    • 并且内核每次修改的是pollfd结构体的revents成员,而events成员保持不变,因此下次调用poll时应用程序无须重置pollfd类型的事件集参数。
    • 由于每次select和poll调用都返回整个用户注册的事件集合(其中包括就绪的和未就绪的),所以应用程序索引就绪文件描述符的时间复杂度为O(n)。
  • epoll则采用与select和poll完全不同的方式来管理用户注册的事件。它在内核中维护一个事件表,并提供了一个独立的系统调用epoll_ctl来控制往其中添加、删除、修改事件。
    • 这样,每次epoll_wait调用都直接从该内核事件表中取得用户注册的事件,而无须反复从用户空间读入这些事件。
    • epoll_wait系统调用的events参数仅用来返回就绪的事件,这使得应用程序索引就绪文件描述符的时间复杂度达到O(1)。
  1. poll和epoll_wait分别用nfds和maxevents参数指定最多监听多少个文件描述符和事件。
  2. 这两个数值都能达到系统允许打开的最大文件描述符数目,即65 535(cat/proc/sys/fs/file-max)。
  3. 而select允许监听的最大文件描述符数量通常有限制。虽然用户可以修改这个限制,但这可能导致不可预期的后果。
  • select和poll都只能工作在相对低效的LT模式,而epoll则可以工作在ET高效模式。
  • 并且epoll还支持EPOLLONESHOT事件。该事件能进一步减少可读、可写和异常等事件被触发的次数。
  1. 从实现原理上来说,select和poll采用的都是轮询的方式,即每次调用都要扫描整个注册文件描述符集合,并将其中就绪的文件描述符返回给用户程序,因此它们检测就绪事件的算法的时间复杂度是O(n)。
  2. epoll_wait则不同,它采用的是回调的方式。内核检测到就绪的文件描述符时,将触发回调函数,回调函数就将该文件描述符上对应的事件插入内核就绪事件队列。
  3. 内核最后在适当的时机将该就绪事件队列中的内容拷贝到用户空间。因此epoll_wait无须轮询整个文件描述符集合来检测哪些事件已经就绪,其算法时间复杂度是O(1)。
  4. 但是,当活动连接比较多的时候,epoll_wait的效率未必比select和poll高,因为此时回调函数被触发得过于频繁。
  5. 所以epoll_wait适用于连接数量多,但活动连接较少的情况。

最后,为了便于阅读,我们将这3组I/O复用系统调用的区别总结于表9-2中。

5. I/O复用的高级应用一:非阻塞connect

connect系统调用的man手册中有如下一段内容:

1
EINPROGRESS The socket is nonblocking and the connection cannot be completed immediately. It is possible to select(2) or poll(2) for completion by selecting the socket for writing. After select(2) indicates writability, use getsockopt(2) to read the SO_ERROR option at level SOL_SOCKET to determine whether connect() completed successfully (SO_ERROR is zero) or unsuccessfully (SO_ERROR is one of the usual error codes listed here, explaining the reason for the failure).
- 这段话描述了connect出错时的一种errno值:EINPROGRESS。 - 这种错误发生在对非阻塞的socket调用connect,而连接又没有立即建立时。 - 根据man文档的解释,在这种情况下,我们可以调用select、poll等函数来监听这个连接失败的socket上的可写事件。 - 当select、poll等函数返回后,再利用getsockopt来读取错误码并清除该socket上的错误。 - 如果错误码是0,表示连接成功建立,否则连接失败。

通过上面描述的非阻塞connect方式,我们就能同时发起多个连接并一起等待。下面看看非阻塞connect的一种实现,如代码清单9-5所示。

代码清单9-5 非阻塞connect

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
#include <time.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <string.h>

#define BUFFER_SIZE 1023

int setnonblocking( int fd )
{
int old_option = fcntl( fd, F_GETFL );
int new_option = old_option | O_NONBLOCK;
fcntl( fd, F_SETFL, new_option );
return old_option;
}
// 超时连接函数,参数分别是服务器IP地址、端口号和超时时间(毫秒)。函数成功时
// 返回已经处于连接状态的socket,失败则返回-1
int unblock_connect( const char* ip, int port, int time ){
int ret = 0;
struct sockaddr_in address;
bzero( &address, sizeof( address ) );
address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &address.sin_addr );
address.sin_port = htons( port );

int sockfd = socket(PF_INET, SOCK_STREAM, 0);
int fdopt = setnonblocking(sockfd);
ret = connect(sockfd, (struct sockaddr*)&address, sizeof(address));
if(ret == 0){
// 如果连接成功,则恢复sockfd的属性,并立即返回之
printf("connect with server immediately!\n");
fcntl(sockfd, F_SETFL, fdopt);
return sockfd;
} else if (errno != EINPROGRESS) {
// 如果连接没有立即建立,那么只有当errno是EINPROGRESS时才表示连接还在进
// 行,否则出错返回
printf("unblock connect not support!\n");
return -1;
}

fd_set readfds;
fd_set writefds;
struct timeval timeout;

FD_ZERO(&readfds);
FD_SET(sockfd, &writefds);

timeout.tv_sec = time;
timeout.tv_usec = 0;

ret = select(sockfd + 1, NULL, &writefds, NULL, &timeout);
if(ret <= 0) {
printf("connection timeout\n");
close(sockfd);
return -1;
}

if(!FD_ISSET(sockfd, &writefds)) {
printf("no events on sockfd found\n");
close(sockfd);
return -1;
}

int error = 0;
socklen_t length = sizeof(error);
if(getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &length) < 0) {
printf("get socket option failed\n");
close(sockfd);
return -1;
}

if(error != 0) {
printf( "connection failed after select with the error: %d \n", error );
close(sockfd);
return -1;
}
printf( "connection ready after select with the socket: %d \n", sockfd );
fcntl(sockfd, F_SETFL, fdopt);
return sockfd;
}

int main( int argc, char* argv[] )
{
if( argc <= 2 )
{
printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
return 1;
}
const char* ip = argv[1];
int port = atoi( argv[2] );
int sockfd = unblock_connect(ip, port, 10);
if(sockfd < 0) {
return 1;
}
shutdown(sockfd, SHUT_WR);
sleep(200);
printf("send data out\n");
send(sockfd, "abc", 3, 0);
return 0;
}
- 但遗憾的是,这种方法存在几处移植性问题。首先,非阻塞的socket可能导致connect始终失败。 - 其次,select对处于EINPROGRESS状态下的socket可能不起作用。 - 最后,对于出错的socket,getsockopt在有些系统(比如Linux)上返回-1(正如代码清单9-5所期望的),而在有些系统(比如源自伯克利的UNIX)上则返回0。这些问题没有统一的解决方法

6. I/O复用的高级应用二:聊天室程序

像ssh这样的登录服务通常要同时处理网络连接和用户输入,这也可以可以使用I/O复用来实现。本节我们以poll为例实现一个简单的聊天室程序,以阐述如何使用I/O复用技术来同时处理网络连接和用户输入。该聊天室程序能让所有用户同时在线群聊,它分为客户端和服务器两个部分。其中客户端程序有两个功能:一是从标准输入终端读入用户数据,并将用户数据发送至服务器;二是往标准输出终端打印服务器发送给它的数据。服务器的功能是接收客户数据,并把客户数据发送给每一个登录到该服务器上的客户端(数据发送者除外)。下面我们依次给出客户端程序和服务器程序的代码。

6.1 客户端

客户端程序使用poll同时监听用户输入和网络连接,并利用splice函数将用户输入内容直接定向到网络连接上以发送之,从而实现数据零拷贝,提高了程序执行效率。客户端程序如代码清单9-6所示。

代码清单9-6 聊天室客户端程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#define _GNU_SOURCE 1
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <poll.h>
#include <fcntl.h>

#define BUFFER_SIZE 64

int main( int argc, char* argv[] )
{
if( argc <= 2 )
{
printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
return 1;
}
const char* ip = argv[1];
int port = atoi( argv[2] );

struct sockaddr_in server_address;
bzero( &server_address, sizeof( server_address ) );
server_address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &server_address.sin_addr );
server_address.sin_port = htons( port );

int sockfd = socket( PF_INET, SOCK_STREAM, 0 );
assert( sockfd >= 0 );
if ( connect( sockfd, ( struct sockaddr* )&server_address, sizeof( server_address ) ) < 0 )
{
printf( "connection failed\n" );
close( sockfd );
return 1;
}

pollfd fds[2];
fds[0].fd = 0;
fds[0].events = POLLIN;
fds[0].revents = 0;
fds[1].fd = sockfd;
fds[1].events = POLLIN | POLLRDHUP; //在socket上接收到对方关闭连接的请求之后触发
fds[1].revents = 0;
char read_buf[BUFFER_SIZE];
int pipefd[2];
int ret = pipe(pipefd);
assert(ret != -1);
while(1) {
ret = poll(fds, 2, -1);
if( ret < 0 )
{
printf( "poll failure\n" );
break;
}

if( fds[1].revents & POLLRDHUP )
{
printf( "server close the connection\n" );
break;
}
else if( fds[1].revents & POLLIN )
{
memset( read_buf, '\0', BUFFER_SIZE );
recv( fds[1].fd, read_buf, BUFFER_SIZE-1, 0 );
printf( "%s\n", read_buf );
}

if(fds[0].revents & POLLIN) {
ret = splice(0, NULL, pipefd[1], NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE);
ret = splice(pipefd[0], NULL, sockfd, NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE);
}
}
close(sockfd);
return 0;
}

6.2 服务器

服务器程序使用poll同时管理监听socket和连接socket,并且使用牺牲空间换取时间的策略来提高服务器性能,如代码清单9-7所示。

代码清单9-7 聊天室服务器程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#define _GNU_SOURCE 1
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <poll.h>

#define USER_LIMIT 5
#define BUFFER_SIZE 64
#define FD_LIMIT 65535

struct client_data {
sockaddr_in address;
char* write_buf;
char buf[BUFFER_SIZE];
};

int setnonblocking( int fd )
{
int old_option = fcntl( fd, F_GETFL );
int new_option = old_option | O_NONBLOCK;
fcntl( fd, F_SETFL, new_option );
return old_option;
}

int main( int argc, char* argv[] )
{
if( argc <= 2 )
{
printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
return 1;
}
const char* ip = argv[1];
int port = atoi( argv[2] );

int ret = 0;
struct sockaddr_in address;
bzero( &address, sizeof( address ) );
address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &address.sin_addr );
address.sin_port = htons( port );

int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
assert( listenfd >= 0 );

ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
assert( ret != -1 );

ret = listen( listenfd, 5 );
assert( ret != -1 );

// 创建users数组,分配FD_LIMIT个client_data对象。
// 可以预期:每个可能的socket连接都可以获得一个这样的对象,
// 并且socket的值可以直接用来索引(作为数组的下标)
// socket连接对应的client_data对象,
// 这是将socket和客户数据关联的简单而高效的方式
client_data* users = new client_data[FD_LIMIT];
// 尽管我们分配了足够多的client_data对象,但为了提高poll的性能,
// 仍然有必要限制用户的数量
pollfd fds[USER_LIMIT+1];
int user_counter = 0;
for(int i = 1; i <= USER_LIMIT; ++i) {
fds[i].fd = -1;
fds[i].events = 0;
}
fds[0].fd = listenfd;
fds[0].events = POLLIN | POLLERR;
fds[0].revents = 0;

while(1) {
ret = poll(fds, user_counter + 1, -1);
if( ret < 0 ) {
printf("poll failure\n");
break;
}

for(int i = 0; i < user_counter + 1; ++i) {
if((fds[i].fd == listenfd) && (fds[i].revents & POLLIN)) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
if(connfd < 0) {
printf("errno is %d\n", errno);
continue;
}
if(user_counter >= USER_LIMIT) {
const char* info = "too many users\n";
printf("%s", info);
send(connfd, info, strlen(info), 0);
close(connfd);
continue;
}
user_counter++;
users[connfd].address = client_address;
setnonblocking(connfd);
fds[user_counter].fd = connfd;
fds[user_counter].events = POLLIN | POLLRDHUP | POLLERR;
fds[user_counter].revents = 0;
printf("comes a new user, now have %d users\n", user_counter);

} else if (fds[i].revents & POLLERR) {
printf("get an error from %d\n", fds[i].fd);
char errors[100];
memset(errors, '\0', 100);
socklen_t length = sizeof(errors);
if( getsockopt( fds[i].fd, SOL_SOCKET, SO_ERROR, &errors, &length ) < 0 )
{
printf( "get socket option failed\n" );
}
continue;
} else if(fds[i].revents & POLLRDHUP) {
users[fds[i].fd] = users[fds[user_counter].fd];
close(fds[i].fd);
fds[i] = fds[user_counter];
i--;
user_counter--;
printf("a client left\n");
} else if(fds[i].revents & POLLIN) {
int connfd = fds[i].fd;
memset(users[connfd].buf, '\0', BUFFER_SIZE);
ret = recv(connfd, users[connfd].buf, BUFFER_SIZE - 1, 0);
if(ret < 0) {
if(errno != EAGAIN) {
close(connfd);
users[fds[i].fd] = users[fds[user_counter].fd];
fds[i] = fds[user_counter];
i--;
user_counter--;
}
} else if(ret == 0) {

printf( "code should not come to here\n" );
} else {
for(int j = 1; j <= user_counter; ++j) {
if(fds[j].fd == connfd) {
continue;
}
fds[j].events |= ~POLLIN;
fds[j].events |= POLLOUT;
users[fds[j].fd].write_buf = users[connfd].buf;
}

}
} else if(fds[i].revents & POLLOUT) {
int connfd = fds[i].fd;
if(!users[connfd].write_buf) {
continue;
}
ret = send(connfd, users[connfd].write_buf,
strlen(users[connfd].write_buf), 0);
users[connfd].write_buf = NULL;
fds[i].events |= ~POLLOUT;
fds[i].events |= POLLIN;

}
}
}
delete [] users;
close(listenfd);
return 0;
}

7. I/O复用的高级应用三:同时处理TCP和UDP服务

至此,我们讨论过的服务器程序都只监听一个端口。在实际应用中,有不少服务器程序能同时监听多个端口,比如超级服务inetd和android的调试服务adbd。

  • 从bind系统调用的参数来看,一个socket只能与一个socket地址绑定,即一个socket只能用来监听一个端口。
    • 因此,服务器如果要同时监听多个端口,就必须创建多个socket,并将它们分别绑定到各个端口上。
    • 这样一来,服务器程序就需要同时管理多个监听socket,I/O复用技术就有了用武之地。
  • 另外,即使是同一个端口,如果服务器要同时处理该端口上的TCP和UDP请求,则也需要创建两个不同的socket:一个是流socket,另一个是数据报socket,并将它们都绑定到该端口上。

比如代码清单9-8所示的回射服务器就能同时处理一个端口上的TCP和UDP请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define TCP_BUFFER_SIZE 512
#define UDP_BUFFER_SIZE 1024

int setnonblocking( int fd )
{
int old_option = fcntl( fd, F_GETFL );
int new_option = old_option | O_NONBLOCK;
fcntl( fd, F_SETFL, new_option );
return old_option;
}

void addfd( int epollfd, int fd )
{
epoll_event event;
event.data.fd = fd;
//event.events = EPOLLIN | EPOLLET;
event.events = EPOLLIN;
epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
setnonblocking( fd );
}

int main( int argc, char* argv[] )
{
if( argc <= 2 )
{
printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
return 1;
}
const char* ip = argv[1];
int port = atoi( argv[2] );

int ret = 0;
struct sockaddr_in address;
bzero( &address, sizeof( address ) );
address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &address.sin_addr );
address.sin_port = htons( port );

int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
assert( listenfd >= 0 );

ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
assert( ret != -1 );

ret = listen( listenfd, 5 );
assert( ret != -1 );

bzero( &address, sizeof( address ) );
address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &address.sin_addr );
address.sin_port = htons( port );
int udpfd = socket( PF_INET, SOCK_DGRAM, 0 );
assert( udpfd >= 0 );

ret = bind( udpfd, ( struct sockaddr* )&address, sizeof( address ) );
assert( ret != -1 );

epoll_event events[MAX_EVENT_NUMBER];
int epollfd = epoll_create( 5 );
assert(epollfd != -1);
addfd(epollfd, listenfd);
addfd(epollfd, udpfd);

while(1) {
int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
if( number < 0 ) {
printf("epoll failure\n");
break;
}

for(int i = 0; i < number; i++) {
int sockfd = events[i].data.fd;
if(sockfd == listenfd) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
addfd(epollfd, connfd);
} else if (sockfd == udpfd) {
char buf[UDP_BUFFER_SIZE];
memset(buf, '\0', UDP_BUFFER_SIZE);
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int ret = recvfrom(udpfd, buf, UDP_BUFFER_SIZE - 1, 0, (struct sockaddr*)&client_address, &client_addrlength);
if(ret > 0) {
sendto(udpfd, buf, UDP_BUFFER_SIZE - 1, 0, (struct sockaddr*)&client_address, client_addrlength);
}
} else if(events[i].events & EPOLLIN) {
char buf[TCP_BUFFER_SIZE];
while(1) {
memset(buf, '\0', TCP_BUFFER_SIZE);
ret = recv(sockfd, buf, TCP_BUFFER_SIZE - 1, 0);
if(ret < 0) {
if((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
break;
}
close(sockfd);
break;
} else if(ret == 0) {
close(sockfd);
} else {
send(sockfd, buf, ret, 0);
}
}
} else {
printf("something else happened \n");
}
}
}
close(listenfd);
return 0;
}

8. 超级服务xinetd

  • Linux因特网服务inetd是超级服务。它同时管理着多个子服务,即监听多个端口。
  • 现在Linux系统上使用的inetd服务程序通常是其升级版本xinetd。
  • xinetd程序的原理与inetd相同,但增加了一些控制选项,并提高了安全性。

下面我们从配置文件和工作流程两个方面对xinetd进行介绍。

8.1 xinetd配置文件

xinetd采用/etc/xinetd.conf主配置文件和/etc/xinetd.d目录下的子配置文件来管理所有服务。

主配置文件包含的是通用选项,这些选项将被所有子配置文件继承。

不过子配置文件可以覆盖这些选项。每一个子配置文件用于设置一个子服务的参数。

比如,telnet子服务的配置文件/etc/xinetd.d/telnet的典型内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
#default:on
#description:The telnet server serves telnet sessions;it uses
#unencrypted username/password pairs for authentication.
service telnet {
flags=REUSE
socket_type=stream
wait=no
user=root
server=/usr/sbin/in.telnetd
log_on_failure+=USERID
disable=no
}

/etc/xinetd.d/telnet文件中的每一项的含义如表9-3所示。

xinetd配置文件的内容相当丰富,远不止上面这些。读者可参考其man文档来获得更多信息。

8.2 xinetd工作流程

xinetd管理的子服务中有的是标准服务,比如时间日期服务daytime、回射服务echo和丢弃服务discard。xinetd服务器在内部直接处理这些服务。还有的子服务则需要调用外部的服务器程序来处理。xinetd通过调用fork和exec函数来加载运行这些服务器程序。比如telnet、ftp服务都是这种类型的子服务。我们仍以telnet服务为例来探讨xinetd的工作流程。

首先,查看xinetd守护进程的PID(下面的操作都在测试机器Kongming20上执行):

1
2
$ cat /var/run/xinetd.pid
9543
然后开启两个终端并分别使用如下命令telnet到本机:
1
$ telnet 192.168.1.109
接下来使用ps命令查看与进程9543相关的进程:
1
2
3
4
5
$ ps -eo pid,ppid,pgid,sid,comm | grep 9543
PID PPID PGID SESS COMMAND
9543 1 9543 9543 xinetd
9810 9543 9810 9810 in.telnetd
10355 9543 10355 10355 in.telnetd
由此可见,我们每次使用telnet登录到xinetd服务,它都创建一个子进程来为该telnet客户服务。

子进程运行in.telnetd程序,这是在/etc/xinetd.d/telnet配置文件中定义的。

每个子进程都处于自己独立的进程组和会话中。

我们可以使用lsof命令(见第17章)进一步查看子进程都打开了哪些文件描述符:

1
2
3
4
$ sudo lsof -p 9810 # 以子进程9810为例
in.telnet 9810 root 0u IPv4 48189 0t0 TCP Kongming20:telnet->Kongming20:38763 (ESTABLISHED)
in.telnet 9810 root 1u IPv4 48189 0t0 TCP Kongming20:telnet->Kongming20:38763 (ESTABLISHED)
in.telnet 9810 root 2u IPv4 48189 0t0 TCP Kongming20:telnet->Kongming20:38763 (ESTABLISHED)

这里省略了一些无关的输出。通过lsof的输出我们知道,子进程9810关闭了其标准输入、标准输出和标准错误,而将socket文件描述符dup到它们上面。因此,telnet服务器程序将网络连接上的输入当作标准输入,并把标准输出定向到同一个网络连接上。

再进一步,对xinetd进程使用lsof命令:

1
2
$ sudo lsof -p 9543
xinetd 9543 root 5u IPv6 47265 0t0 TCP *:telnet (LISTEN)
这一条输出说明xinetd将一直监听telnet连接请求,因此in.telnetd子进程只处理连接socket,而不处理监听socket。这是子配置文件中的wait参数所定义的行为。

对于内部标准服务,xinetd的处理流程也可以用上述方法来分析,这里不再赘述。

综合上面讨论的,我们将xinetd的工作流程(wait选项的值是no的情况)绘制为图9-1所示的形式。


9. I/O复用
http://binbo-zappy.github.io/2024/12/16/Linux高性能服务器编程-游双/9-IO复用/
作者
Binbo
发布于
2024年12月16日
许可协议