11. 定时器

第11章 定时器

网络程序需要处理的第三类事件是定时事件。服务器程序通常管理着众多定时事件,因此有效地组织这些定时事件,使之能在预期的时间点被触发且不影响服务器的主要逻辑,对于服务器的性能有着至关重要的影响。

为此,我们要将每个定时事件分别封装成定时器,并使用某种容器类数据结构,比如链表、排序链表和时间轮,将所有定时器串联起来,以实现对定时事件的统一管理。

本章讨论的就是两种高效的管理定时器的容器:时间轮和时间堆。

先介绍定时的方法。定时是指在一段时间之后触发某段代码的机制,我们可以在这段代码中依次处理所有到期的定时器。换言之,定时机制是定时器得以被处理的原动力。

Linux提供了三种定时方法,它们是:

❑socket选项SO_RCVTIMEO和SO_SNDTIMEO。

❑SIGALRM信号。

❑I/O复用系统调用的超时参数。

1. socket选项SO_RCVTIMEO和SO_SNDTIMEO

socket选项SO_RCVTIMEO和SO_SNDTIMEO,它们分别用来设置socket接收数据超时时间和发送数据超时时间。

因此,这两个选项仅对与数据接收和发送相关的socket专用系统调用(socket专用的系统调用指的是5.2~5.11节介绍的那些socket API)有效,这些系统调用包括send、sendmsg、recv、recvmsg、accept和connect。

由表11-1可见,在程序中,我们可以根据系统调用(send、sendmsg、recv、recvmsg、accept和connect)的返回值以及errno来判断超时时间是否已到,进而决定是否开始处理定时任务。

代码清单11-1以connect为例,说明程序中如何使用SO_SNDTIMEO选项来定时。

代码清单11-1 设置connect超时时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>

int timeout_connect( const char* ip, int port, int time ) {
int ret = 0;
struct sockaddr_in address;
bzero( &address, sizeof( address ) );
address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &address.sin_addr );
address.sin_port = htons( port );

int sockfd = socket( PF_INET, SOCK_STREAM, 0 );
assert( sockfd >= 0 );

struct timeval timeout;
timeout.tv_sec = time;
timeout.tv_usec = 0;
socklen_t len = sizeof( timeout );
ret = setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, &timeout, len);
assert( ret != -1 );
ret = connect(sockfd, (struct sockaddr*)&address, sizeof(address));
if (ret == -1) {
if(errno == EINPROGRESS) {
printf("connecting timeout\n");
return -1;
}
printf("error occur when connecting to server\n");
return -1;
}

return sockfd;

}
int main( int argc, char* argv[] )
{
if( argc <= 2 )
{
printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
return 1;
}
const char* ip = argv[1];
int port = atoi( argv[2] );

int sockfd = timeout_connect( ip, port, 10 );
if ( sockfd < 0 )
{
return 1;
}
return 0;
}

2. SIGALRM信号

  • 第10章提到,由alarm和setitimer函数设置的实时闹钟一旦超时,将触发SIGALRM信号。
  • 因此,我们可以利用该信号的信号处理函数来处理定时任务。
  • 但是,如果要处理多个定时任务,我们就需要不断地触发SIGALRM信号,并在其信号处理函数中执行到期的任务。
  • 一般来说,SIGALRM信号按照固定的频率生成,即由alarm或setitimer函数设置的定时周期T保持不变。
  • 如果某个定时任务的超时时间不是T的整数倍,那么它实际被执行的时间和预期的时间将略有偏差。
  • 因此定时周期T反映了定时的精度。

2.1 基于升序链表的定时器

  • 定时器通常至少要包含两个成员:一个超时时间(相对时间或者绝对时间)和一个任务回调函数。
  • 有的时候还可能包含回调函数被执行时需要传入的参数,以及是否重启定时器等信息。
  • 如果使用链表作为容器来串联所有的定时器,则每个定时器还要包含指向下一个定时器的指针成员。
  • 进一步,如果链表是双向的,则每个定时器还需要包含指向前一个定时器的指针成员。

代码清单11-2实现了一个简单的升序定时器链表。升序定时器链表将其中的定时器按照超时时间做升序排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
#ifndef LST_TIMER
#define LST_TIMER
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <time.h>

#define BUFFER_SIZE 64
class util_timer;

// 用户数据结构:客户端socket地址、socket文件描述符、读缓存和定时器
struct client_data {
sockaddr_in address;
int sockfd;
char buf[BUFFER_SIZE];
util_timer* timer;
};

// 定时器类
class util_timer {
public:
util_timer() : prev(NULL), next(NULL) {}

public:
// 任务的超时时间,这里使用绝对时间
time_t expire;
// 任务回调函数
void(*cb_func)(client_data*);
// 回调函数处理的客户数据,由定时器的执行者传递给回调函数
client_data* user_data;
// 指向前一个定时器
util_timer* prev;
// 指向下一个定时器
util_timer* next;
};

// 定时器链表。它是一个升序、双向链表,且带有头结点和尾节点
class sort_timer_lst {

public:
sort_timer_lst() : head(NULL), tail(NULL) {}

// 链表被销毁时,删除其中所有的定时器
~sort_timer_lst() {
util_timer* tmp = head;
while(tmp) {
head = tmp->next;
delete tmp;
tmp = head;
}
}
// 将目标定时器timer添加到链表中
void add_timer(util_timer* timer) {
if(!timer) {
return;
}
if(!head) {
head = tail = timer;
return;
}
// 如果目标定时器的超时时间小于当前链表中所有定时器的超时时间,
// 则把该定时器插入链表头部,作为链表新的头节点。
// 否则就需要调用重载函数
// add_timer(util_timer*timer,util_timer*lst_head),
// 把它插入链表中合适的位置,以保证链表的升序特性
if(timer->expire < head->expire) {
timer->next = head;
head->prev = timer;
head = timer;
return;
}
add_timer(timer, head);
}
// 当某个定时任务发生变化时,调整对应的定时器在链表中的位置。
// 这个函数只考虑被调整的定时器的超时时间延长的情况,
// 即该定时器需要往链表的尾部移动
void adjust_timer(util_timer* timer) {
if(!timer) {
return;
}
util_timer* tmp = timer->next;
// 如果被调整的目标定时器处在链表尾部,
// 或者该定时器新的超时值仍然小于其下一个定时器的超时值,则不用调整
if(!tmp || (timer->expire < tmp->expire)) {
return;
}
// 如果目标定时器是链表的头节点,则将该定时器从链表中取出并重新插入链表
if(timer == head) {
head = head->next;
head->prev = NULL;
timer->next = NULL;
add_timer(timer, head);
} else {
// 如果目标定时器不是链表的头节点,则将该定时器从链表中取出,
// 然后插入其原来所在位置之后的部分链表中
timer->prev->next = timer->next;
timer->next->prev = timer->prev;
add_timer(timer, timer->next);
}
}
// 将目标定时器timer从链表中删除
void del_timer(util_timer* timer) {
if(!timer) {
return;
}
// 下面这个条件成立表示链表中只有一个定时器,即目标定时器
if((timer == head) && (timer == tail)) {
delete timer;
head = NULL;
tail = NULL;
return;
}
// 如果链表中至少有两个定时器,且目标定时器是链表的头结点,
// 则将链表的头结点重置为原头节点的下一个节点,然后删除目标定时器
if(timer == head) {
head = head->next;
head->prev = NULL;
delete timer;
return;
}
if(timer == tail ) {
tail = tail->prev;
tail->next = NULL;
delete timer;
return;
}
// 如果目标定时器位于链表的中间,则把它前后的定时器串联起来,然后删除目标定时器
timer->prev->next = timer->next;
timer->next->prev = timer->prev;
delete timer;
}

// SIGALRM信号每次被触发就在其信号处理函数(如果使用统一事件源,则是主函数)
// 中执行一次tick函数,以处理链表上到期的任务
void tick() {
if(!head) {
return;
}
printf("timer tick\n");
// 获得系统当前的时间
time_t cur = time(NULL);
util_timer* tmp = head;
// 从头结点开始依次处理每个定时器,直到遇到一个尚未到期的定时器,
// 这就是定时器的核心逻辑
while(tmp) {
// 因为每个定时器都使用绝对时间作为超时值,
// 所以我们可以把定时器的超时值和系统当前时间,比较以判断定时器是否到期
if(cur < tmp->expire) {
break;
}
// 调用定时器的回调函数,以执行定时任务
tmp->cb_func(tmp->user_data);
head = tmp->next;
if(head) {
head->prev = NULL;
}
delete tmp;
tmp = head;
}
}
private:
util_timer* head;
util_timer* tail;

private:
// 一个重载的辅助函数,它被公有的add_timer函数和adjust_timer函数调用。
// 该函数表示将目标定时器timer添加到节点lst_head之后的部分链表中
void add_timer(util_timer* timer, util_timer* lst_head) {
util_timer* prev = lst_head;
util_timer* tmp = prev->next;
// 遍历lst_head节点之后的部分链表,直到找到一个超时时间大于目标定时器的超时
// 时间的节点,并将目标定时器插入该节点之前
while(tmp) {
if(timer->expire < tmp->expire) {
prev->next = timer;
timer->next = tmp;
tmp->prev = timer;
timer->prev = prev;
break;
}
prev = tmp;
tmp = tmp->next;
}
// 如果遍历完lst_head节点之后的部分链表,仍未找到超时时间大于目标定时器的超
// 时时间的节点,则将目标定时器插入链表尾部,并把它设置为链表新的尾节点
if(!tmp) {
prev->next = timer;
timer->prev = prev;
timer->next = NULL;
tail = timer;
}
}
};


#endif
  • sort_timer_lst是一个升序链表。

  • 其核心函数tick相当于一个心搏函数,它每隔一段固定的时间就执行一次,以检测并处理到期的任务。

  • 判断定时任务到期的依据是定时器的expire值小于当前的系统时间。

  • 从执行效率来看,添加定时器的时间复杂度是O(n),删除定时器的时间复杂度是O(1),执行定时任务的时间复杂度是O(1)。

2.2 处理非活动连接

升序定时器链表的实际应用——处理非活动连接。

服务器程序通常要定期处理非活动连接:给客户端发一个重连请求,或者关闭该连接,或者其他。

Linux在内核中提供了对连接是否处于活动状态的定期检查机制,我们可以通过socket选项KEEPALIVE来激活它。不过使用这种方式将使得应用程序对连接的管理变得复杂。

因此,我们可以考虑在应用层实现类似于KEEPALIVE的机制,以管理所有长时间处于非活动状态的连接。

比如,代码清单11-3利用alarm函数周期性地触发SIGALRM信号,该信号的信号处理函数利用管道通知主循环执行定时器链表上的定时任务——关闭非活动的连接。

代码清单11-3 关闭非活动连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
#include "lst_timer.h"

#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define TIMESLOT 5

static int pipefd[2];
static sort_timer_lst timer_lst;
static int epollfd = 0;

int setnonblocking( int fd )
{
int old_option = fcntl( fd, F_GETFL );
int new_option = old_option | O_NONBLOCK;
fcntl( fd, F_SETFL, new_option );
return old_option;
}


void addfd( int epollfd, int fd )
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
setnonblocking( fd );
}

void sig_handler( int sig )
{
int save_errno = errno;
int msg = sig;
send( pipefd[1], ( char* )&msg, 1, 0 );
errno = save_errno;
}
void addsig( int sig )
{
struct sigaction sa;
memset( &sa, '\0', sizeof( sa ) );
sa.sa_handler = sig_handler;
sa.sa_flags |= SA_RESTART;
sigfillset( &sa.sa_mask );
assert( sigaction( sig, &sa, NULL ) != -1 );
}

void timer_handler()
{
timer_lst.tick();
alarm( TIMESLOT );
}

// 定时器回调函数,它删除非活动连接socket上的注册事件,并关闭之
void cb_func( client_data* user_data )
{
epoll_ctl( epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0 );
assert( user_data );
close( user_data->sockfd );
printf( "close fd %d\n", user_data->sockfd );
}

int main( int argc, char* argv[] )
{
if( argc <= 2 )
{
printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
return 1;
}
const char* ip = argv[1];
int port = atoi( argv[2] );

int ret = 0;
struct sockaddr_in address;
bzero( &address, sizeof( address ) );
address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &address.sin_addr );
address.sin_port = htons( port );

int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
assert( listenfd >= 0 );

ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
assert( ret != -1 );

ret = listen( listenfd, 5 );
assert( ret != -1 );

epoll_event events[ MAX_EVENT_NUMBER ];
int epollfd = epoll_create( 5 );
assert( epollfd != -1 );
addfd( epollfd, listenfd );

ret = socketpair( PF_UNIX, SOCK_STREAM, 0, pipefd );
assert( ret != -1 );
setnonblocking( pipefd[1] );
addfd( epollfd, pipefd[0] );

addsig(SIGALRM);
addsig(SIGTERM);
bool stop_server = false;

client_data* users = new client_data[FD_LIMIT];
bool timeout = false;
alarm(TIMESLOT);

while(!stop_server) {
int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
if ( ( number < 0 ) && ( errno != EINTR ) )
{
printf( "epoll failure\n" );
break;
}

for(int i = 0; i < number; i++) {
int sockfd = events[i].data.fd;
if(sockfd == listenfd) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof( client_address );
int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
addfd( epollfd, connfd );
users[connfd].address = client_address;
users[connfd].sockfd = connfd;
util_timer* timer = new util_timer;
timer->user_data = &users[connfd];
timer->cb_func = cb_func;
time_t cur = time(NULL);
timer->expire = cur + 3 * TIMESLOT;
users[connfd].timer = timer;
timer_lst.add_timer(timer);
} else if((sockfd == pipefd[0]) && (events[i].events & EPOLLIN)) {
int sig;
char signals[1024];
ret = recv(pipefd[0], signals, sizeof(signals), 0);
if(ret == -1) {
continue;
} else if(ret == 0) {
continue;
} else {
for(int i = 0; i < ret; ++i) {
switch(signals[i]) {
case SIGALRM: {
timeout = true;
break;
}
case SIGTERM: {
stop_server = true;
break;
}
}
}
}
} else if(events[i].events & EPOLLIN) {
memset(users[sockfd].buf, '\0', BUFFER_SIZE);
ret = recv(sockfd, users[sockfd].buf, BUFFER_SIZE - 1, 0);
printf("get %d bytes of content: %s\n", ret, users[sockfd].buf);
util_timer* timer = users[sockfd].timer;
if(ret < 0) {
if(errno != EAGAIN) {
cb_func(&users[sockfd]);
if(timer) {
timer_lst.del_timer(timer);
}
}
} else if(ret == 0) {
cb_func(&users[sockfd]);
if(timer) {
timer_lst.del_timer(timer);
}
}
} else {

}

}
}

close(listenfd);
close(pipefd[0]);
close(pipefd[1]);
delete[] users;
return 0;
}

3. I/O复用系统调用的超时参数

Linux下的3组I/O复用系统调用都带有超时参数,因此它们不仅能统一处理信号和I/O事件,也能统一处理定时事件。

但是由于I/O复用系统调用可能在超时时间到期之前就返回(有I/O事件发生),所以如果我们要利用它们来定时,就需要不断更新定时参数以反映剩余的时间,如代码清单11-4所示。

代码清单11-4 利用I/O复用系统调用定时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#define TIMEOUT 5000
int timeout=TIMEOUT;
time_t start=time(NULL);
time_t end=time(NULL);
while(1) {
printf("the timeout is now%d mil-seconds\n",timeout);
start=time(NULL);
int number=epoll_wait(epollfd,events,MAX_EVENT_NUMBER,timeout);
if((number<0)&&(errno!=EINTR)) {
printf("epoll failure\n");
break;
}
/*如果epoll_wait成功返回0,则说明超时时间到,此时便可处理定时任务,并重置
定时时间*/
if(number==0) {
timeout=TIMEOUT;
continue;
}
end=time(NULL);
/*如果epoll_wait的返回值大于0,则本次epoll_wait调用持续的时间是(end
start)*1000 ms,我们需要将定时时间timeout减去这段时间,以获得下次epoll_wait 调用的超时参数*/

timeout-=(end-start)*1000;
/*重新计算之后的timeout值有可能等于0,说明本次epoll_wait调用返回时,不仅
有文件描述符就绪,而且其超时时间也刚好到达,此时我们也要处理定时任务,并重置定时 时间*/
if(timeout<=0) {
timeout=TIMEOUT;
}
//handle connections
}

4. 高性能定时器

4.1 时间轮

基于排序链表的定时器存在一个问题:添加定时器的效率偏低。时间轮解决了这个问题。一种简单的时间轮如图11-1所示。

  • 实线)指针指向轮子上的一个槽(slot)。
  • 它以恒定的速度顺时针转动,每转动一步就指向下一个槽(虚线指针指向的槽),每次转动称为一个滴答(tick)。
  • 一个滴答的时间称为时间轮的槽间隔si(slot interval),它实际上就是心搏时间。
  • 该时间轮共有N个槽,因此它运转一周的时间是N*si。每个槽指向一条定时器链表,每条链表上的定时器具有相同的特征:它们的定时时间相差N*si的整数倍。
  • 时间轮正是利用这个关系将定时器散列到不同的链表中。假如现在指针指向槽cs,我们要添加一个定时时间为ti的定时器,则该定时器将被插入槽ts(timer slot)对应的链表中:

\(ts=(cs+(ti/si))\%N\)

  • 基于排序链表的定时器使用唯一的一条链表来管理所有定时器,所以插入操作的效率随着定时器数目的增多而降低。
  • 而时间轮使用哈希表的思想,将定时器散列到不同的链表上。
  • 这样每条链表上的定时器数目都将明显少于原来的排序链表上的定时器数目,插入操作的效率基本不受定时器数目的影响。

很显然,对时间轮而言,要提高定时精度,就要使si值足够小;要提高执行效率,则要求N值足够大。

图11-1描述的是一种简单的时间轮,因为它只有一个轮子。而复杂的时间里可能有多个轮子,不同的轮子拥有不同的粒度。相邻的两个轮子,精度高的转一圈,精度低的仅往前移动一槽,就像水表一样。

下面将按照图11-1来编写一个较为简单的时间轮实现代码,如代码清单11-5所示。

代码清单11-5 时间轮

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#ifndef TIME_WHEEL_TIMER
#define TIME_WHEEL_TIMER

#include <time.h>
#include <netinet/in.h>
#include <stdio.h>

#define BUFFER_SIZE 64
class tw_timer;
// 绑定socket和定时器
struct client_data {
sockaddr_in address;
int sockfd;
char buf[BUFFER_SIZE];
tw_timer* timer;
};

// 定时器类
class tw_timer {
public:
tw_timer(int rot, int ts)
: next(NULL), prev(NULL), rotation(rot), time_slot(ts) {}

public:
// 记录定时器在时间轮转多少圈后生效
int rotation;
// 记录定时器属于时间轮上哪个槽(对应的链表,下同)
int time_slot;
// 定时器回调函数
void(*cb_func)(client_data*);
// 客户数据
client_data* user_data;
// 指向下一个定时器
tw_timer* next;
// 指向前一个定时器
tw_timer* prev;
};

class time_wheel {
public:
time_wheel() : cur_slot(0) {
for(int i = 0; i < N; ++i) {
slots[i] = NULL;
}
}
~time_wheel(){
// 遍历每个槽,并销毁其中的定时器
for(int i = 0; i < N; ++i) {
tw_timer* tmp = slots[i];
while(tmp) {
slots[i] = tmp->next;
delete tmp;
tmp = slots[i];
}
}
}
// 根据定时值timeout创建一个定时器,并把它插入合适的槽中
tw_timer* add_timer(int timeout) {
if(timeout < 0) {
return NULL;
}
int ticks = 0;
// 下面根据待插入定时器的超时值计算它将在时间轮转动多少个滴答后被触发,
// 并将该滴答数存储于变量ticks中。
// 如果待插入定时器的超时值小于时间轮的槽间隔SI,
// 则将ticks向上折合为1,否则就将ticks向下折合为timeout/SI
if(timeout < TI) {
ticks = 1;
} else {
ticks = timeout / TI;
}
// 计算待插入的定时器在时间轮转动多少圈后被触发
int rotation = ticks / N;
// 计算待插入的定时器应该被插入哪个槽中
int ts = (cur_slot + (ticks % N)) % N;
// 创建新的定时器,它在时间轮转动rotation圈之后被触发,且位于第ts个槽上
tw_timer* timer = new tw_timer(rotation, ts);
// 如果第ts个槽中尚无任何定时器,则把新建的定时器插入其中,
// 并将该定时器设置为该槽的头结点
if(!slots[ts]) {
printf("add timer, rotation is %d, ts is %d, cur_slot is %d\n",
rotation, ts, cur_slot);
slots[ts] = timer;
} else {
// 否则,将定时器插入第ts个槽中
timer->next = slots[ts];
slots[ts]->prev = timer;
slots[ts] = timer;
}
return timer;
}

// 删除目标定时器timer
void del_timer(tw_timer* timer) {
if(!timer) {
return;
}
int ts = timer->time_slot;
// slots[ts]是目标定时器所在槽的头结点。
// 如果目标定时器就是该头结点,则需要重置第ts个槽的头结点
if(timer == slots[ts]) {
slots[ts] = slots[ts]->next;
if(slots[ts]) {
slots[ts]->prev = NULL;
}
delete timer;
} else {
timer->prev->next = timer->next;
if(timer->next) {
timer->next->prev = timer->prev;
}
delete timer;
}
}
// SI时间到后,调用该函数,时间轮向前滚动一个槽的间隔
void tick() {
// 取得时间轮上当前槽的头结点
tw_timer* tmp = slots[cur_slot];
printf("current slot is %d\n", cur_slot);
while(tmp) {
printf("tick the timer once\n");
// 如果定时器的rotation值大于0,则它在这一轮不起作用
if(tmp->rotation > 0) {
tmp->rotation--;
tmp = tmp->next;
} else {
// 否则,说明定时器已经到期,于是执行定时任务,然后删除该定时器
tmp->cb_func(tmp->user_data);
if(tmp == slots[cur_slot]) {
printf("delete header in cur_slot\n");
slots[cur_slot] = tmp->next;
delete tmp;
if(slots[cur_slot]) {
slots[cur_slot]->prev = NULL;
}
tmp = slots[cur_slot];
} else {
tmp->prev->next = tmp->next;
if(tmp->next) {
tmp->next->prev = tmp->prev;
}
tw_timer* tmp2 = tmp->next;
delete tmp;
tmp = tmp2;
}
}
}
// 更新时间轮的当前槽,以反映时间轮的转动
cur_slot = ++cur_slot % N;
}

private:
// 时间轮上槽的数目
static const int N = 60;
// 每1 s时间轮转动一次,即槽间隔为1 s
static const int TI = 1;
// 时间轮的槽,其中每个元素指向一个定时器链表,链表无序
tw_timer* slots[N];
// 时间轮的当前槽
int cur_slot;
};

#endif
  • 可见,对时间轮而言,添加一个定时器的时间复杂度是O(1),删除一个定时器的时间复杂度也是O(1),执行一个定时器的时间复杂度是O(n)。
  • 但实际上执行一个定时器任务的效率要比O(n)好得多,因为时间轮将所有的定时器散列到了不同的链表上。时间轮的槽越多,等价于散列表的入口(entry)越多,从而每条链表上的定时器数量越少。此外,我们的代码仅使用了一个时间轮。
  • 当使用多个轮子来实现时间轮时,执行一个定时器任务的时间复杂度将接近O(1)。

4.2 时间堆

  • 前面定时方案都是以固定的频率调用心搏函数tick,并在其中依次检测到期的定时器,然后执行到期定时器上的回调函数。
  • 设计定时器的另外一种思路是:将所有定时器中超时时间最小的一个定时器的超时值作为心搏间隔。
  • 这样,一旦心搏函数tick被调用,超时时间最小的定时器必然到期,我们就可以在tick函数中处理该定时器。
  • 然后,再次从剩余的定时器中找出超时时间最小的一个,并将这段最短时间设置为下一次心搏间隔。
  • 如此反复,就实现了较为精确的定时。最小堆很适合处理这种定时方案。
  • 最小堆是指每个节点的值都小于或等于其子节点的值的完全二叉树。图11-2给出了一个具有6个元素的最小堆。

  • 树的基本操作是插入节点和删除节点。
  • 对最小堆而言,它们都很简单。为了将一个元素X插入最小堆,我们可以在树的下一个空闲位置创建一个空穴。
  • 如果X可以放在空穴中而不破坏堆序,则插入完成。否则就执行上滤操作,即交换空穴和它的父节点上的元素。
  • 不断执行上述过程,直到X可以被放入空穴,则插入操作完成。比如,我们要往图11-2所示的最小堆中插入值为14的元素,则可以按照图11-3所示的步骤来操作。

  • 最小堆的删除操作指的是删除其根节点上的元素,并且不破坏堆序性质。
  • 执行删除操作时,我们需要先在根节点处创建一个空穴。
  • 由于堆现在少了一个元素,因此我们可以把堆的最后一个元素X移动到该堆的某个地方。
  • 如果X可以被放入空穴,则删除操作完成。否则就执行下滤操作,即交换空穴和它的两个儿子节点中的较小者。
  • 不断进行上述过程,直到X可以被放入空穴,则删除操作完成。

由于最小堆是一种完全二叉树,所以我们可以用数组来组织其中的元素。比如,图11-2所示的最小堆可以用图11-5所示的数组来表示。对于数组中的任意一个位置i上的元素,其左儿子节点在位置2i+1上,其右儿子节点在位置2i+2上,其父节点则在位置[(i-1)/2](i>0)上。与用链表来表示堆相比,用数组表示堆不仅节省空间,而且更容易实现堆的插入、删除等操作[5]。

  • 假设我们已经有一个包含N个元素的数组,现在要把它初始化为一个最小堆。那么最简单的方法是:初始化一个空堆,然后将数组中的每个元素插入该堆中。不过这样做的效率偏低。
  • 实际上,我们只需要对数组中的第[(N-1)/2]~0个元素执行下滤操作,即可确保该数组构成一个最小堆。
  • 这是因为对包含N个元素的完全二叉树而言,它具有[(N-1)/2]个非叶子节点,这些非叶子节点正是该完全二叉树的第0~[(N-1)/2]个节点。我们只要确保这些非叶子节点构成的子树都具有堆序性质,整个树就具有堆序性质。

我们称用最小堆实现的定时器为时间堆。代码清单11-6给出了一种时间堆的实现,其中,最小堆使用数组来表示。

代码清单11-6 时间堆

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
#ifndef intIME_HEAP
#define intIME_HEAP

#include <iostream>
#include <netinet/in.h>
#include <time.h>
using std::exception;

#define BUFFER_SIZE 64

class heap_timer;

// 绑定socket和定时器
struct client_data {
sockaddr_in address;
int sockfd;
char buf[BUFFER_SIZE];
heap_timer* timer;
};
// 定时器类
class heap_timer {
public:
heap_timer(int delay) {
expire = time(NULL) + delay;
}
public:
// 定时器生效的绝对时间
time_t expire;
void (*cb_func)(client_data*);
client_data* user_data;
};

class time_heap {
public:
// 构造函数之一,初始化一个大小为cap的空堆
time_heap(int cap) throw(std::exception)
: capacity( cap ), cur_size( 0 )
{
array = new heap_timer* [capacity];
if ( !array )
{
throw std::exception();
}
for( int i = 0; i < capacity; ++i )
{
array[i] = NULL;
}
}
// 构造函数之二,用已有数组来初始化堆
time_heap( heap_timer** init_array, int size, int capacity ) throw ( std::exception )
: cur_size( size ), capacity( capacity )
{
if ( capacity < size )
{
throw std::exception();
}
array = new heap_timer* [capacity];
if ( ! array )
{
throw std::exception();
}
for( int i = 0; i < capacity; ++i )
{
array[i] = NULL;
}
if ( size != 0 )
{
for ( int i = 0; i < size; ++i )
{
array[ i ] = init_array[ i ];
}
for ( int i = (cur_size-1)/2; i >=0; --i )
{
percolate_down( i );
}
}
}
~time_heap()
{
for ( int i = 0; i < cur_size; ++i )
{
delete array[i];
}
delete [] array;
}

public:
void add_timer( heap_timer* timer ) throw ( std::exception )
{
if( !timer )
{
return;
}
// 如果当前堆数组容量不够,则将其扩大1倍
if( cur_size >= capacity )
{
resize();
}
int hole = cur_size++;
int parent = 0;
// 对从空穴到根节点的路径上的所有节点执行上虑操作
for( ; hole > 0; hole=parent )
{
parent = (hole-1)/2;
if ( array[parent]->expire <= timer->expire )
{
break;
}
array[hole] = array[parent];
}
array[hole] = timer;
}
void del_timer( heap_timer* timer )
{
if( !timer )
{
return;
}
// lazy delelte
timer->cb_func = NULL;
}
heap_timer* top() const
{
if ( empty() )
{
return NULL;
}
return array[0];
}
void pop_timer()
{
if( empty() )
{
return;
}
if( array[0] )
{
delete array[0];
array[0] = array[--cur_size];
percolate_down( 0 );
}
}
void tick()
{
heap_timer* tmp = array[0];
time_t cur = time( NULL );
while( !empty() )
{
if( !tmp )
{
break;
}
if( tmp->expire > cur )
{
break;
}
if( array[0]->cb_func )
{
array[0]->cb_func( array[0]->user_data );
}
pop_timer();
tmp = array[0];
}
}
bool empty() const { return cur_size == 0; }

private:
void percolate_down( int hole )
{
heap_timer* temp = array[hole];
int child = 0;
for ( ; ((hole*2+1) <= (cur_size-1)); hole=child )
{
child = hole*2+1;
if ( (child < (cur_size-1)) && (array[child+1]->expire < array[child]->expire ) )
{
++child;
}
if ( array[child]->expire < temp->expire )
{
array[hole] = array[child];
}
else
{
break;
}
}
array[hole] = temp;
}
void resize() throw ( std::exception )
{
heap_timer** temp = new heap_timer* [2*capacity];
for( int i = 0; i < 2*capacity; ++i )
{
temp[i] = NULL;
}
if ( ! temp )
{
throw std::exception();
}
capacity = 2*capacity;
for ( int i = 0; i < cur_size; ++i )
{
temp[i] = array[i];
}
delete [] array;
array = temp;
}

private:
heap_timer** array;
int capacity;
int cur_size;
};


#endif

对时间堆而言,添加一个定时器的时间复杂度是O(logn),删除一个定时器的时间复杂度是O(1),执行一个定时器的时间复杂度是O(1)。因此,时间堆的效率是很高的。


11. 定时器
http://binbo-zappy.github.io/2024/12/16/Linux高性能服务器编程-游双/11-定时器/
作者
Binbo
发布于
2024年12月16日
许可协议