12. 高性能I/O框架库Libevent

第12章 高性能I/O框架库Libevent

前面三章的篇幅较为细致地讨论了Linux服务器程序必须处理的三类事件:I/O事件、信号和定时事件。在处理这三类事件时我们通常需要考虑如下三个问题:

❑统一事件源。很明显,统一处理这三类事件既能使代码简单易懂,又能避免一些潜在的逻辑错误。前面我们已经讨论了实现统一事件源的一般方法——利用I/O复用系统调用来管理所有事件。

❑可移植性。不同的操作系统具有不同的I/O复用方式,比如Solaris的dev/poll文件,FreeBSD的kqueue机制,Linux的epoll系列系统调用。

❑对并发编程的支持。在多进程和多线程环境下,我们需要考虑各执行实体如何协同处理客户连接、信号和定时器,以避免竞态条件。

所幸的是,开源社区提供了诸多优秀的I/O框架库。它们不仅解决了上述问题,让开发者可以将精力完全放在程序的逻辑上,而且稳定性、性能等各方面都相当出色。比如ACE、ASIO和Libevent。本章将介绍其中相对轻量级的Libevent框架库。

1. I/O框架库概述

I/O框架库以库函数的形式,封装了较为底层的系统调用,给应用程序提供了一组更便于使用的接口。这些库函数往往比程序员自己实现的同样功能的函数更合理、更高效,且更健壮。因为它们经受住了真实网络环境下的高压测试,以及时间的考验。

各种I/O框架库的实现原理基本相似,要么以Reactor模式实现,要么以Proactor模式实现,要么同时以这两种模式实现。

  • 举例来说,基于Reactor模式的I/O框架库包含如下几个组件:句柄(Handle)、事件多路分发器(EventDemultiplexer)、事件处理器(EventHandler)和具体的事件处理器(ConcreteEventHandler)、Reactor。

1.句柄

  • I/O框架库要处理的对象,即I/O事件、信号和定时事件,统一称为事件源。
  • 一个事件源通常和一个句柄绑定在一起。句柄的作用是,当内核检测到就绪事件时,它将通过句柄来通知应用程序这一事件。在Linux环境下,I/O事件对应的句柄是文件描述符,信号事件对应的句柄就是信号值。

2.事件多路分发器

  • 事件的到来是随机的、异步的。
  • 我们无法预知程序何时收到一个客户连接请求,又亦或收到一个暂停信号。所以程序需要循环地等待并处理事件,这就是事件循环。
  • 在事件循环中,等待事件一般使用I/O复用技术来实现。
  • I/O框架库一般将系统支持的各种I/O复用系统调用封装成统一的接口,称为事件多路分发器。
  • 事件多路分发器的demultiplex方法是等待事件的核心函数,其内部调用的是select、poll、epoll_wait等函数。
  • 此外,事件多路分发器还需要实现register_event和remove_event方法,以供调用者往事件多路分发器中添加事件和从事件多路分发器中删除事件。

3.事件处理器和具体事件处理器

  • 事件处理器执行事件对应的业务逻辑。
  • 它通常包含一个或多个handle_event回调函数,这些回调函数在事件循环中被执行。I/O框架库提供的事件处理器通常是一个接口,用户需要继承它来实现自己的事件处理器,即具体事件处理器。因此,事件处理器中的回调函数一般被声明为虚函数,以支持用户的扩展。
  • 此外,事件处理器一般还提供一个get_handle方法,它返回与该事件处理器关联的句柄。那么,事件处理器和句柄有什么关系?当事件多路分发器检测到有事件发生时,它是通过句柄来通知应用程序的。因此,我们必须将事件处理器和句柄绑定,才能在事件发生时获取到正确的事件处理器。

4.Reactor

Reactor是I/O框架库的核心。它提供的几个主要方法是:

❑handle_events。该方法执行事件循环。它重复如下过程:等待事件,然后依次处理所有就绪事件对应的事件处理器。

❑register_handler。该方法调用事件多路分发器的register_event方法来往事件多路分发器中注册一个事件。

❑remove_handler。该方法调用事件多路分发器的remove_event方法来删除事件多路分发器中的一个事件。

2. Libevent源码分析

Libevent是开源社区的一款高性能的I/O框架库,其学习者和使用者众多。使用Libevent的著名案例有:高性能的分布式内存对象缓存软件memcached,Google浏览器Chromium的Linux版本。作为一个I/O框架库,Libevent具有如下特点:

❑跨平台支持。Libevent支持Linux、UNIX和Windows。

❑统一事件源。Libevent对I/O事件、信号和定时事件提供统一的处理。

❑线程安全。Libevent使用libevent_pthreads库来提供线程安全支持。

❑基于Reactor模式的实现。

这一节中我们将简单地研究一下Libevent源代码的主要部分。分析它除了可以更好地学习网络编程外,还有如下好处:

❑学习编写一个产品级的函数库要考虑哪些细节。

❑提高C语言功底。Libevent源码中使用了大量的函数指针,用C语言实现了多态机制,并提供了一些基础数据结构的高效实现,比如双向链表、最小堆等。

Libevent的官方网站是http://libevent.org/,其中提供Libevent源代码的下载,以及学习Libevent框架库的第一手文档,并且源码和文档的更新也较为频繁。Libevent版本是2.0.19。

2.1 一个实例

分析一款软件的源代码,最简单有效的方式是从使用入手,这样才能真正从整体上把握该软件的逻辑结构。代码清单12-1是使用Libevent库实现的一个“Hello World”程序。

代码清单12-1 Libevent实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <sys/signal.h>
#include <event.h>

void signal_cb(int fd, short event, void* argc) {
struct event_base* base = (struct event_base*)argc;
struct timeval delay = { 2, 0 };
printf( "Caught an interrupt signal; exiting cleanly in two seconds...\n" );
event_base_loopexit(base, &delay);
}

void timeout_cb(int fd, short event, void* argc) {
printf("timeout\n");
}

int main()
{
// 调用event_init函数创建event_base对象
struct event_base* base = event_init();
// 创建具体的事件处理器,并设置它们所从属的Reactor实例。
// evsignal_new和evtimer_new分别用于创建信号事件处理器和定时事件处理器
struct event* signal_event = evsignal_new(base, SIGINT, signal_cb, base);
event_add(signal_event, NULL);
struct timeval tv = {1, 0};
struct event* timeout_event = evtimer_new(base, timeout_cb, NULL);
event_add(timeout_event, &tv);
event_base_dispatch(base);
event_free(timeout_event);
event_free(signal_event);
event_base_free(base);
}

代码清单12-1虽然简单,但却基本上描述了Libevent库的主要逻辑:

1)调用event_init函数创建event_base对象。一个event_base相当于一个Reactor实例。

2)创建具体的事件处理器,并设置它们所从属的Reactor实例。evsignal_new和evtimer_new分别用于创建信号事件处理器和定时事件处理器,它们是定义在include/event2/event.h文件中的宏:

1
2
#define evsignal_new(b,x,cb,arg)\ event_new((b),(x),EV_SIGNAL|EV_PERSIST,(cb),(arg))
#define evtimer_new(b,cb,arg)event_new((b),-1,0,(cb),(arg))

可见,它们的统一入口是event_new函数,即用于创建通用事件处理器(图12-1中的EventHandler)的函数。其定义是:

1
2
struct event*event_new(struct event_base*base,evutil_socket_t
fd,short events,void(*cb)(evutil_socket_t,short,void*),void*arg)
  • base参数指定新创建的事件处理器从属的Reactor。
  • fd参数指定与该事件处理器关联的句柄。创建I/O事件处理器时,应该给fd参数传递文件描述符值;创建信号事件处理器时,应该给fd参数传递信号值,比如代码清单12-1中的SIGINT;
  • 创建定时事件处理器时,则应该给fd参数传递-1。
  • events参数指定事件类型,其可选值都定义在include/event2/event.h文件中,如代码清单12-2所示。

代码清单12-2 Libevent支持的事件类型

1
2
3
4
5
6
7
#define EV_TIMEOUT 0x01/*定时事件*/
#define EV_READ 0x02/*可读事件*/
#define EV_WRITE 0x04/*可写事件*/
#define EV_SIGNAL 0x08/*信号事件*/
#define EV_PERSIST 0x10/*永久事件*/
/*边沿触发事件,需要I/O复用系统调用支持,比如epoll*/
#define EV_ET 0x20

代码清单12-2中,EV_PERSIST的作用是:事件被触发后,自动重新对这个event调用event_add函数(见后文)。

cb参数指定目标事件对应的回调函数,相当于图12-1中事件处理器的handle_event方法。arg参数则是Reactor传递给回调函数的参数。

event_new函数成功时返回一个event类型的对象,也就是Libevent的事件处理器。Libevent用单词“event”来描述事件处理器,而不是事件,会使读者觉得有些混乱,故而我们约定如下:

❑事件指的是一个句柄上绑定的事件,比如文件描述符0上的可读事件。

❑事件处理器,也就是event结构体类型的对象,除了包含事件必须具备的两个要素(句柄和事件类型)外,还有很多其他成员,比如回调函数。

❑事件由事件多路分发器管理,事件处理器则由事件队列管理。事件队列包括多种,比如event_base中的注册事件队列、活动事件队列和通用定时器队列,以及evmap中的I/O事件队列、信号事件队列。关于这些事件队列,我们将在后文依次讨论。

❑事件循环对一个被激活事件(就绪事件)的处理,指的是执行该事件对应的事件处理器中的回调函数。

3)调用event_add函数,将事件处理器添加到注册事件队列中,并将该事件处理器对应的事件添加到事件多路分发器中。event_add函数相当于Reactor中的register_handler方法。

4)调用event_base_dispatch函数来执行事件循环。

5)事件循环结束后,使用*_free系列函数来释放系统资源。

由此可见,代码清单12-1给我们提供了一条分析Libevent源代码的主线。不过在此之前,我们先简单介绍一下Libevent源代码的组织结构。

2.2 源代码组织结构

Libevent源代码中的目录和文件按照功能可划分为如下部分:

❑头文件目录include/event2。该目录是自Libevent主版本升级到2.0之后引入的,在1.4及更老的版本中并无此目录。该目录中的头文件是Libevent提供给应用程序使用的,比如,event.h头文件提供核心函数,http.h头文件提供HTTP协议相关服务,rpc.h头文件提供远程过程调用支持。

❑源码根目录下的头文件。这些头文件分为两类:一类是对include/event2目录下的部分头文件的包装,另外一类是供Libevent内部使用的辅助性头文件,它们的文件名都具有*-internal.h的形式。

❑通用数据结构目录compat/sys。该目录下仅有一个文件——queue.h。它封装了跨平台的基础数据结构,包括单向链表、双向链表、队列、尾队列和循环队列。

❑sample目录。它提供一些示例程序。

❑test目录。它提供一些测试代码。

❑WIN32-Code目录。它提供Windows平台上的一些专用代码。

❑event.c文件。该文件实现Libevent的整体框架,主要是event和event_base两个结构体的相关操作。

❑devpoll.c、kqueue.c、evport.c、select.c、win32select.c、poll.c和epoll.c文件。它们分别封装了如下I/O复用机制:/dev/poll、kqueue、event ports、POSIX select、Windows select、poll和epoll。这些文件的主要内容相似,都是针对结构体eventop(见后文)所定义的接口函数的具体实现。

❑minheap-internal.h文件。该文件实现了一个时间堆,以提供对定时事件的支持。

❑signal.c文件。它提供对信号的支持。其内容也是针对结构体eventop所定义的接口函数的具体实现。

❑evmap.c文件。它维护句柄(文件描述符或信号)与事件处理器的映射关系。

❑event_tagging.c文件。它提供往缓冲区中添加标记数据(比如一个整数),以及从缓冲区中读取标记数据的函数。

❑event_iocp.c文件。它提供对Windows IOCP(Input/Output Completion Port,输入输出完成端口)的支持。

❑buffer*.c文件。它提供对网络I/O缓冲的控制,包括:输入输出数据过滤,传输速率限制,使用SSL(Secure Sockets Layer)协议对应用数据进行保护,以及零拷贝文件传输等。

❑evthread*.c文件。它提供对多线程的支持。

❑listener.c文件。它封装了对监听socket的操作,包括监听连接和接受连接。

❑logs.c文件。它是Libevent的日志系统。

❑evutil.c、evutil_rand.c、strlcpy.c和arc4random.c文件。它们提供一些基本操作,比如生成随机数、获取socket地址信息、读取文件、设置socket属性等。

❑evdns.c、http.c和evrpc.c文件。它们分别提供了对DNS协议、HTTP协议和RPC(Remote Procedure Call,远程过程调用)协议的支持。

❑epoll_sub.c文件。该文件未见使用。

在整个源码中,event-internal.h、include/event2/event_struct.h、event.c和evmap.c等4个文件最为重要。它们定义了event和event_base结构体,并实现了这两个结构体的相关操作。下面的讨论也主要是围绕这几个文件展开的。

2.3 event结构体

前文提到,Libevent中的事件处理器是event结构类型。event结构体封装了句柄、事件类型、回调函数,以及其他必要的标志和数据。该结构体在include/event2/event_struct.h文件中定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
struct event { 
TAILQ_ENTRY(event)ev_active_next;
TAILQ_ENTRY(event)ev_next;
union{
TAILQ_ENTRY(event)ev_next_with_common_timeout;
int min_heap_idx;
}ev_timeout_pos;

evutil_socket_t ev_fd;
struct event_base*ev_base;
union{
struct{
TAILQ_ENTRY(event)ev_io_next;
struct timeval ev_timeout;
}ev_io;
struct{
TAILQ_ENTRY(event)ev_signal_next;
short ev_ncalls; short*ev_pncalls;
}ev_signal;
}_ev;
short ev_events;
short ev_res;
short ev_flags;
ev_uint8_t ev_pri;
ev_uint8_t ev_closure;
struct timeval ev_timeout;
void(*ev_callback)(evutil_socket_t,short,void*arg);
void*ev_arg;
};

下面我们详细介绍event结构体中的每个成员:

❑ev_events。它代表事件类型。其取值可以是代码清单12-2所示的标志的按位或(互斥的事件类型除外,比如读写事件和信号事件就不能同时被设置)。

❑ev_next。所有已经注册的事件处理器(包括I/O事件处理器和信号事件处理器)通过该成员串联成一个尾队列,我们称之为注册事件队列。宏TAILQ_ENTRY是尾队列中的节点类型,它定义在compat/sys/queue.h文件中:

1
#define TAILQ_ENTRY(type)\ struct{\ struct type*tqe_next;\/*下一个元素*/ struct type**tqe_prev;\/*前一个元素的地址*/ }

❑ev_active_next。所有被激活的事件处理器通过该成员串联成一个尾队列,我们称之为活动事件队列。活动事件队列不止一个,不同优先级的事件处理器被激活后将被插入不同的活动事件队列中。在事件循环中,Reactor将按优先级从高到低遍历所有活动事件队列,并依次处理其中的事件处理器。

❑ev_timeout_pos。这是一个联合体,它仅用于定时事件处理器。为了讨论的方便,后面我们称定时事件处理器为定时器。老版本的Libevent中,定时器都是由时间堆来管理的。但开发者认为有时候使用简单的链表来管理定时器将具有更高的效率。因此,新版本的Libevent就引入了所谓“通用定时器”的概念。这些定时器不是存储在时间堆中,而是存储在尾队列中,我们称之为通用定时器队列。对于通用定时器而言,ev_timeout_pos联合体的ev_next_with_common_timeout成员指出了该定时器在通用定时器队列中的位置。对于其他定时器而言,ev_timeout_pos联合体的min_heap_idx成员指出了该定时器在时间堆中的位置。一个定时器是否是通用定时器取决于其超时值大小,具体判断原则请读者自己参考event.c文件中的is_common_timeout函数。

❑_ev。这是一个联合体。所有具有相同文件描述符值的I/O事件处理器通过ev.ev_io.ev_io_next成员串联成一个尾队列,我们称之为I/O事件队列;所有具有相同信号值的信号事件处理器通过ev.ev_signal.ev_signal_next成员串联成一个尾队列,我们称之为信号事件队列。ev.ev_signal.ev_ncalls成员指定信号事件发生时,Reactor需要执行多少次该事件对应的事件处理器中的回调函数。ev.ev_signal.ev_pncalls指针成员要么是NULL,要么指向ev.ev_signal.ev_ncalls。

在程序中,我们可能针对同一个socket文件描述符上的可读/可写事件创建多个事件处理器(它们拥有不同的回调函数)。当该文件描述符上有可读/可写事件发生时,所有这些事件处理器都应该被处理。所以,Libevent使用I/O事件队列将具有相同文件描述符值的事件处理器组织在一起。这样,当一个文件描述符上有事件发生时,事件多路分发器就能很快地把所有相关的事件处理器添加到活动事件队列中。信号事件队列的存在也是由于相同的原因。可见,I/O事件队列和信号事件队列并不是注册事件队列的细致分类,而是另有用处。

❑ev_fd。对于I/O事件处理器,它是文件描述符值;对于信号事件处理器,它是信号值。

❑ev_base。该事件处理器从属的event_base实例。

❑ev_res。它记录当前激活事件的类型。

❑ev_flags。它是一些事件标志。其可选值定义在include/event2/event_struct.h文件中:

1
2
3
4
5
6
7
#define EVLIST_TIMEOUT 0x01/*事件处理器从属于通用定时器队列或时间堆*/
#define EVLIST_INSERTED 0x02/*事件处理器从属于注册事件队列*/
#define EVLIST_SIGNAL 0x04/*没有使用*/
#define EVLIST_ACTIVE 0x08/*事件处理器从属于活动事件队列*/
#define EVLIST_INTERNAL 0x10/*内部使用*/
#define EVLIST_INIT 0x80/*事件处理器已经被初始化*/
#define EVLIST_ALL (0xf000|0x9f)/*定义所有标志*/

❑ev_pri。它指定事件处理器优先级,值越小则优先级越高。

❑ev_closure。它指定event_base执行事件处理器的回调函数时的行为。其可选值定义于event-internal.h文件中:

1
2
3
4
5
/*默认行为*/ #define EV_CLOSURE_NONE 0
/*执行信号事件处理器的回调函数时,调用ev.ev_signal.ev_ncalls次该回调函数*/
#define EV_CLOSURE_SIGNAL 1
/*执行完回调函数后,再次将事件处理器加入注册事件队列中*/
#define EV_CLOSURE_PERSIST 2

❑ev_timeout。它仅对定时器有效,指定定时器的超时值。

❑ev_callback。它是事件处理器的回调函数,由event_base调用。回调函数被调用时,它的3个参数分别被传入事件处理器的如下3个成员:ev_fd、ev_res和ev_arg。

❑ev_arg。回调函数的参数。

2.4 往注册事件队列中添加事件处理器

前面提到,创建一个event对象的函数是event_new(及其变体),它在event.c文件中实现。该函数的实现相当简单,主要是给event对象分配内存并初始化它的部分成员,因此我们不讨论它。event对象创建好后,应用程序需要调用event_add函数将其添加到注册事件队列中,并将对应的事件注册到事件多路分发器上。event_add函数在event.c文件中实现,主要是调用另外一个内部函数event_add_internal,如代码清单12-3所示。

代码清单12-3 event_add_internal函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
static inline int event_add_internal(struct event*ev,const
struct timeval*tv,int tv_is_absolute)
{
struct event_base*base=ev->ev_base;
int res=0;
int notify=0;
EVENT_BASE_ASSERT_LOCKED(base);
_event_debug_assert_is_setup(ev);
event_debug(( "event_add:event:%p(fd%d),%s%s%scall%p", ev, (int)ev->ev_fd, ev->ev_events&EV_READ?"EV_READ":"", ev->ev_events&EV_WRITE?"EV_WRITE":"", tv?"EV_TIMEOUT":"", ev->ev_callback));
EVUTIL_ASSERT(!(ev->ev_flags&~EVLIST_ALL));
/*如果新添加的事件处理器是定时器,且它尚未被添加到通用定时器队列或时间堆中,
则为该定时器在时间堆上预留一个位置*/
if(tv!=NULL && !(ev->ev_flags&EVLIST_TIMEOUT)){
if(min_heap_reserve(&base->timeheap, 1+min_heap_size(&base->timeheap))==-1) return(-1);
}
/*如果当前调用者不是主线程(执行事件循环的线程),并且被添加的事件处理器是信号事件处理器,而且主线程正在执行该信号事件处理器的回调函数,则当前调用者必须等待主线程完成调用,否则将引起竞态条件(考虑event结构体的ev_ncalls和ev_pncalls成员)*/
#ifndef_EVENT_DISABLE_THREAD_SUPPORT
if(base->current_event==ev && (ev->ev_events&EV_SIGNAL) &&!EVBASE_IN_THREAD(base)){
++base->current_event_waiters;
EVTHREAD_COND_WAIT(base->current_event_cond,base->th_base_lock);
}
#endif
if((ev->ev_events&(EV_READ|EV_WRITE|EV_SIGNAL)) && !(ev->ev_flags&(EVLIST_INSERTED|EVLIST_ACTIVE))){
if(ev->ev_events&(EV_READ|EV_WRITE)) /*添加I/O事件和I/O事件处理器的映射关系*/
res=evmap_io_add(base,ev->ev_fd,ev);
else if(ev->ev_events&EV_SIGNAL) /*添加信号事件和信号事件处理器的映射关系*/
res=evmap_signal_add(base,(int)ev->ev_fd,ev);
if(res!=-1) /*将事件处理器插入注册事件队列*/
event_queue_insert(base,ev,EVLIST_INSERTED);
if(res==1){
/*事件多路分发器中添加了新的事件,所以要通知主线程*/
notify=1;
res=0;
}
}
/*下面将事件处理器添加至通用定时器队列或时间堆中。对于信号事件处理器和I/O事件处理器,根据evmap_*_add函数的结果决定是否添加(这是为了给事件设置超时);而对于定时器,则始终应该添加之*/
if(res!=-1 && tv!=NULL){
struct timeval now;
int common_timeout;
/*对于永久性事件处理器,如果其超时时间不是绝对时间,则将该事件处理器的超时时间记录在变量ev->ev_io_timeout中。ev_io_timeout是定义在event-internal.h 文件中的宏:
#define ev_io_timeout_ev.ev_io.ev_timeout*/
if(ev->ev_closure==EV_CLOSURE_PERSIST && !tv_is_absolute) ev->ev_io_timeout=*tv;
/*如果该事件处理器已经被插入通用定时器队列或时间堆中,则先删除它*/
if(ev->ev_flags&EVLIST_TIMEOUT){
if(min_heap_elt_is_top(ev)) notify=1;
event_queue_remove(base,ev,EVLIST_TIMEOUT);
}
/*如果待添加的事件处理器已经被激活,且原因是超时,则从活动事件队列中删除它,
以避免其回调函数被执行。对于信号事件处理器,必要时还需将其ncalls成员设置为0(注意,ev_pncalls如果不为NULL,它指向ncalls)。前面提到,信号事件被触发时,ncalls指定其回调函数被执行的次数。将ncalls设置为0,可以干净地终止信号事件的处理*/
if((ev->ev_flags&EVLIST_ACTIVE) && (ev->ev_res&EV_TIMEOUT)){
if(ev->ev_events&EV_SIGNAL){
if(ev->ev_ncalls && ev->ev_pncalls){
*ev->ev_pncalls=0;
}
}
event_queue_remove(base,ev,EVLIST_ACTIVE);
}
gettime(base,&now);
common_timeout=is_common_timeout(tv,base);
if(tv_is_absolute){
ev->ev_timeout=*tv;
/*判断应该将定时器插入通用定时器队列,还是插入时间堆*/
}else if(common_timeout){
struct timeval tmp=*tv;
tmp.tv_usec&=MICROSECONDS_MASK;
evutil_timeradd(&now,&tmp,&ev->ev_timeout);
ev->ev_timeout.tv_usec|= (tv->tv_usec&~MICROSECONDS_MASK);
}else{
/*加上当前系统时间,以取得定时器超时的绝对时间*/
evutil_timeradd(&now,tv,&ev->ev_timeout);
}
event_debug(( "event_add:timeout in%d seconds,call%p", (int)tv->tv_sec,ev->ev_callback));
event_queue_insert(base,ev,EVLIST_TIMEOUT);/*最后,插入定时器*/
/*如果被插入的事件处理器是通用定时器队列中的第一个元素,则通过调用
common_timeout_schedule函数将其转移到时间堆中。这样,通用定时器链表和时间堆中的定时器就得到了统一的处理*/
if(common_timeout){
struct common_timeout_list*ctl= get_common_timeout_list(base,&ev->ev_timeout);
if(ev==TAILQ_FIRST(&ctl->events)){
common_timeout_schedule(ctl,&now,ev);
}
}else{
if(min_heap_elt_is_top(ev)) notify=1;
}
}
/*如果必要,唤醒主线程*/
if(res!=-1 && notify && EVBASE_NEED_NOTIFY(base)) evthread_notify_base(base);
_event_debug_note_add(ev);
return(res);
}

从代码清单12-3可见,event_add_internal函数内部调用了几个重要的函数:

❑evmap_io_add。该函数将I/O事件添加到事件多路分发器中,并将对应的事件处理器添加到I/O事件队列中,同时建立I/O事件和I/O事件处理器之间的映射关系。我们将在下一节详细讨论该函数。

❑evmap_signal_add。该函数将信号事件添加到事件多路分发器中,并将对应的事件处理器添加到信号事件队列中,同时建立信号事件和信号事件处理器之间的映射关系。

❑event_queue_insert。该函数将事件处理器添加到各种事件队列中:将I/O事件处理器和信号事件处理器插入注册事件队列;将定时器插入通用定时器队列或时间堆;将被激活的事件处理器添加到活动事件队列中。其实现如代码清单12-4所示。

代码清单12-4 event_queue_insert函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
static void event_queue_insert(struct event_base*base,struct
event*ev,int queue)
{
EVENT_BASE_ASSERT_LOCKED(base);
/*避免重复插入*/
if(ev->ev_flags&queue){
/*Double insertion is possible for active events*/
if(queue&EVLIST_ACTIVE) return;
event_errx(1,"%s:%p(fd%d)already on queue%x",__func__,ev,ev->
ev_fd,queue);
return;
}
if(~ev->ev_flags&EVLIST_INTERNAL) base->event_count++;
/*将event_base拥有的事件处理器总数加1*/
ev->ev_flags|=queue;/*标记此事件已被添加过*/
switch(queue){
/*将I/O事件处理器或信号事件处理器插入注册事件队列*/
case EVLIST_INSERTED:
TAILQ_INSERT_TAIL(&base->eventqueue,ev,ev_next);
break;
/*将就绪事件处理器插入活动事件队列*/
case EVLIST_ACTIVE:
base->event_count_active++;
TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri], ev,ev_active_next);
break;
/*将定时器插入通用定时器队列或时间堆*/
case EVLIST_TIMEOUT:{
if(is_common_timeout(&ev->ev_timeout,base)){

struct common_timeout_list*ctl= get_common_timeout_list(base,&ev->ev_timeout);
insert_common_timeout_inorder(ctl,ev);
}else
min_heap_push(&base->timeheap,ev);
break;
}
default:
event_errx(1,"%s:unknown queue%x",__func__,queue);
}
}

2.5 往事件多路分发器中注册事件

event_queue_insert函数所做的仅仅是将一个事件处理器加入event_base的某个事件队列中。对于新添加的I/O事件处理器和信号事件处理器,我们还需要让事件多路分发器来监听其对应的事件,同时建立文件描述符、信号值与事件处理器之间的映射关系。这就要通过调用evmap_io_add和evmap_signal_add两个函数来完成。这两个函数相当于事件多路分发器中的register_event方法,它们由evmap.c文件实现。不过在讨论它们之前,我们先介绍一下它们将用到的一些重要数据结构,如代码清单12-5所示。

代码清单12-5 evmap_io、event_io_map和evmap_signal、evmap_signal_map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#ifdef EVMAP_USE_HT 
#include"ht-internal.h"
struct event_map_entry;
/*如果定义了EVMAP_USE_HT,则将event_io_map定义为哈希表。该哈希表存储
event_map_entry对象和I/O事件队列(见前文,具有同样文件描述符值的I/O事件处理器
构成 事件队列 之间的映射关系 实 也就 存储 文件描述符和 事件处 之间

构成I/O事件队列)之间的映射关系,实际上也就是存储了文件描述符和I/O事件处理器之间 的映射关系*/
HT_HEAD(event_io_map,event_map_entry);
#else
#define event_io_map event_signal_map
#endif

/*下面这个结构体中的entries数组成员存储信号值和信号事件处理器之间的映射关系
(用信号值索引数组entries即得到对应的信号事件处理器)*/
struct event_signal_map{
void**entries;/*用于存放evmap_io或evmap_signal的数组*/
int nentries;/*entries数组的大小*/
};

/*如果定义了EVMAP_USE_HT,则哈希表event_io_map中的成员具有如下类型*/
struct event_map_entry{
HT_ENTRY(event_map_entry)map_node;
evutil_socket_t fd;
union{
struct evmap_io evmap_io;
}ent;
};

/*event_list是由event组成的尾队列,前面讨论的所有事件队列都是这种类型*/
TAILQ_HEAD(event_list,event);

/*I/O事件队列(确切地说,evmap_io.events才是I/O事件队列)*/
struct evmap_io{
struct event_list events;
ev_uint16_t nread;
ev_uint16_t nwrite;
};

/*信号事件队列(确切地说,evmap_signal.events才是信号事件队列)*/
struct evmap_signal{
struct event_list events;
};

由于evmap_io_add和evmap_signal_add两个函数的逻辑基本相同,因此我们仅讨论evmap_io_add函数,如代码清单12-6所示。

代码清单12-6 evmap_io_add函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
int evmap_io_add(struct event_base*base,evutil_socket_t
fd,struct event*ev)
{
/*获得event_base的后端I/O复用机制实例*/
const struct eventop*evsel=base->evsel;
/*获得event_base中文件描述符与I/O事件队列的映射表(哈希表或数组)*/
struct event_io_map*io=&base->io;
/*fd参数对应的I/O事件队列*/
struct evmap_io*ctx=NULL;
int nread,nwrite,retval=0;
short res=0,old=0;
struct event*old_ev;
EVUTIL_ASSERT(fd==ev->ev_fd);
if(fd<0) return 0;
#ifndef EVMAP_USE_HT
/*I/O事件队列数组io.entries中,每个文件描述符占用一项。如果fd大于当前数组
的大小,则增加数组的大小(扩大后的数组的容量要大于fd)*/
if(fd>=io->nentries){
if(evmap_make_space(io,fd,sizeof(struct evmap_io*))==-1) return(-1);
}
#endif
/*下面这个宏根据EVMAP_USE_HT是否被定义而有不同的实现,但目的都是创建ctx,
在映射表io中为fd和ctx添加映射关系*/
GET_IO_SLOT_AND_CTOR(ctx,io,fd,evmap_io,evmap_io_init,evsel->fdinfo_len);
nread=ctx->nread; nwrite=ctx->nwrite;
if(nread) old|=EV_READ;
if(nwrite) old|=EV_WRITE;
if(ev->ev_events&EV_READ){
if(++nread==1) res|=EV_READ;
}
if(ev->ev_events&EV_WRITE){
if(++nwrite==1) res|=EV_WRITE;
}
if(EVUTIL_UNLIKELY(nread>0xffff||nwrite>0xffff)){
event_warnx("Too many events reading or writing on fd%d",

(int)fd);
return-1;
}
if(EVENT_DEBUG_MODE_IS_ON() && (old_ev=TAILQ_FIRST(&ctx->events)) && (old_ev->ev_events&EV_ET)!=(ev->ev_events&EV_ET)){
event_warnx("Tried to mix edge-triggered and non-edge-triggered" "events on fd%d",(int)fd);
return-1;
}
if(res){
void*extra=((char*)ctx)+sizeof(struct evmap_io);
/*往事件多路分发器中注册事件。add是事件多路分发器的接口函数之一。对不同的后
端I/O复用机制,这些接口函数有不同的实现。我们将在后面讨论事件多路分发器的接口函数*/
if(evsel->add(base,ev->ev_fd, old,(ev->ev_events&EV_ET)|res,extra)==-1) return(-1);
retval=1;
}
ctx->nread=(ev_uint16_t)nread;
ctx->nwrite=(ev_uint16_t)nwrite;
/*将ev插到I/O事件队列ctx的尾部。ev_io_next是定义在event-internal.h文件
中的宏:#define ev_io_next_ev.ev_io.ev_io_next*/
TAILQ_INSERT_TAIL(&ctx->events,ev,ev_io_next);
return(retval);
}

2.6 eventop结构体

eventop结构体封装了I/O复用机制必要的一些操作,比如注册事件、等待事件等。它为event_base支持的所有后端I/O复用机制提供了一个统一的接口。该结构体定义在event-internal.h文件中,如代码清单12-7所示。

代码清单12-7 eventop结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
struct eventop{ 
/*后端I/O复用技术的名称*/
const char*name;
/*初始化函数*/
void*(*init)(struct event_base*);
/*注册事件*/
int(*add)(struct event_base*,evutil_socket_t fd,short old,short
events,void*fdinfo);
/*删除事件*/
int(*del)(struct event_base*,evutil_socket_t fd,short old,short
events,void*fdinfo);
/*等待事件*/
int(*dispatch)(struct event_base*,struct timeval*);
/*释放I/O复用机制使用的资源*/
void(*dealloc)(struct event_base*);
/*程序调用fork之后是否需要重新初始化event_base*/
int need_reinit;
/*I/O复用技术支持的一些特性,可选如下3个值的按位或:EV_FEATURE_ET(支持边
缘触发事件EV_ET)、EV_FEATURE_O1(事件检测算法的复杂度是O(1))和 EV_FEATURE_FDS(不仅能监听socket上的事件,还能监听其他类型的文件描述符上的事情)*/
enum event_method_feature features;
/*有的I/O复用机制需要为每个I/O事件队列和信号事件队列分配额外的内存,以避免
同一个文件描述符被重复插入I/O复用机制的事件表中。evmap_io_add(或 evmap_io_del)函数在调用eventop的add(或del)方法时,将这段内存的起始地址作 为第5个参数传递给add(或del)方法。下面这个成员则指定了这段内存的长度*/
size_t fdinfo_len;
};

前文提到,devpoll.c、kqueue.c、evport.c、select.c、win32select.c、poll.c和epoll.c文件分别针对不同的I/O复用技术实现了eventop定义的这套接口。那么,在支持多种I/O复用技术的系统上,Libevent将选择使用哪个呢?这取决于这些I/O复用技术的优先级。

Libevent支持的后端I/O复用技术及它们的优先级在event.c文件中定义,如代码清单12-8所示。

代码清单12-8 Libevent支持的后端I/O复用技术及它们的优先级

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#ifdef_EVENT_HAVE_EVENT_PORTS extern const struct eventop evportops; #endif 
#ifdef_EVENT_HAVE_SELECT extern const struct eventop selectops; #endif
#ifdef_EVENT_HAVE_POLL extern const struct eventop pollops; #endif
#ifdef_EVENT_HAVE_EPOLL extern const struct eventop epollops; #endif
#ifdef_EVENT_HAVE_WORKING_KQUEUE extern const struct eventop kqops; #endif
#ifdef_EVENT_HAVE_DEVPOLL extern const struct eventop devpollops; #endif
#ifdef WIN32 extern const struct eventop win32ops; #endif
static const struct eventop*eventops[]={
#ifdef_EVENT_HAVE_EVENT_PORTS &evportops, #endif
#ifdef_EVENT_HAVE_WORKING_KQUEUE &kqops, #endif
#ifdef_EVENT_HAVE_EPOLL &epollops, #endif
#ifdef_EVENT_HAVE_DEVPOLL &devpollops, #endif
#ifdef_EVENT_HAVE_POLL &pollops, #endif
#ifdef_EVENT_HAVE_SELECT &selectops, #endif
#ifdef WIN32 &win32ops, #endif
NULL
};

Libevent通过遍历eventops数组来选择其后端I/O复用技术。遍历的顺序是从数组的第一个元素开始,到最后一个元素结束。所以在Linux下,Libevent默认选择的后端I/O复用技术是epoll。但很显然,用户可以修改代码清单12-8中定义的一系列宏来选择使用不同的后端I/O复用技术。

2.7 event_base结构体

结构体event_base是Libevent的Reactor。它定义在event-internal.h文件中,如代码清单12-9所示。

代码清单12-9 event_base结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
struct event_base{ 
/*初始化Reactor的时候选择一种后端I/O复用机制,并记录在如下字段中*/
const struct eventop*evsel;
/*指向I/O复用机制真正存储的数据,它通过evsel成员的init函数来初始化*/
void*evbase;
/*事件变化队列。其用途是:如果一个文件描述符上注册的事件被多次修改,则可以使
用缓冲来避免重复的系统调用(比如epoll_ctl)。它仅能用于时间复杂度为O(1)的I/O复用技术*/
struct event_changelist changelist;
/*指向信号的后端处理机制,目前仅在singal.h文件中定义了一种处理方法*/
const struct eventop*evsigsel;
/*信号事件处理器使用的数据结构,其中封装了一个由socketpair创建的管道。它用
于信号处理函数和事件多路分发器之间的通信,这和我们在10.4节讨论的统一事件源的思路是一样的*/
struct evsig_info sig;
/*添加到该event_base的虚拟事件、所有事件和激活事件的数量*/
int virtual_event_count;
int event_count;
int event_count_active;
/*是否执行完活动事件队列上剩余的任务之后就退出事件循环*/
int event_gotterm;
/*是否立即退出事件循环,而不管是否还有任务需要处理*/
int event_break;
/*是否应该启动一个新的事件循环*/
int event_continue;
/*目前正在处理的活动事件队列的优先级*/
int event_running_priority;
/*事件循环是否已经启动*/
int running_loop;
/*活动事件队列数组。索引值越小的队列,优先级越高。高优先级的活动事件队列中的
事件处理器将被优先处理*/
struct event_list*activequeues;
/*活动事件队列数组的大小,即该event_base一共有nactivequeues个不同优先级
的活动事件队列*/
int nactivequeues;
/*下面3个成员用于管理通用定时器队列*/
struct common_timeout_list**common_timeout_queues;
int n_common_timeouts;
int n_common_timeouts_allocated;
/*存放延迟回调函数的链表。事件循环每次成功处理完一个活动事件队列中的所有事件
之后,就调用一次延迟回调函数*/
struct deferred_cb_queue defer_queue;
/*文件描述符和I/O事件之间的映射关系表*/
struct event_io_map io;
/*信号值和信号事件之间的映射关系表*/
struct event_signal_map sigmap;
/*注册事件队列,存放I/O事件处理器和信号事件处理器*/
struct event_list eventqueue;
/*时间堆*/
struct min_heap timeheap;
/*管理系统时间的一些成员*/
struct timeval event_tv;
struct timeval tv_cache;
#if defined(_EVENT_HAVE_CLOCK_GETTIME) &&
defined(CLOCK_MONOTONIC)
struct timeval tv_clock_diff;
time_t last_updated_clock_diff;
#endif
/*多线程支持*/
#ifndef_EVENT_DISABLE_THREAD_SUPPORT
unsigned long th_owner_id;/*当前运行该event_base的事件循环的线程*/
void*th_base_lock;/*对event_base的独占锁*/
/*当前事件循环正在执行哪个事件处理器的回调函数*/
struct event*current_event;
/*条件变量(见第14章),用于唤醒正在等待某个事件处理完毕的线程*/
void*current_event_cond;
int current_event_waiters;/*等待current_event_cond的线程数*/
#endif
#ifdef WIN32

struct event_iocp_port*iocp;
#endif
/*该event_base的一些配置参数*/
enum event_base_config_flag flags;
/*下面这组成员变量给工作线程唤醒主线程提供了方法(使用socketpair创建的管
道)*/
int is_notify_pending;
evutil_socket_t th_notify_fd[2];
struct event th_notify;
int(*th_notify_fn)(struct event_base*base);
};

2.8 事件循环

最后,我们讨论一下Libevent的“动力”,即事件循环。Libevent中实现事件循环的函数是event_base_loop。该函数首先调用I/O事件多路分发器的事件监听函数,以等待事件;当有事件发生时,就依次处理之。event_base_loop函数的实现如代码清单12-10所示。

代码清单12-10 event_base_loop函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
int event_base_loop(struct event_base*base,int flags) { 
const struct eventop*evsel=base->evsel;
struct timeval tv;
struct timeval*tv_p;
int res,done,retval=0;
EVBASE_ACQUIRE_LOCK(base,th_base_lock);
/*一个event_base仅允许运行一个事件循环*/
if(base->running_loop){
event_warnx("%s:reentrant invocation.Only one event_base_loop" "can run on each event_base at once.",__func__);
EVBASE_RELEASE_LOCK(base,th_base_lock);
return-1;
}
base->running_loop=1;/*标记该event_base已经开始运行*/
clear_time_cache(base);/*清除event_base的系统时间缓存*/

/*设置信号事件的event_base实例*/
if(base->sig.ev_signal_added && base->sig.ev_n_signals_added)
evsig_set_base(base);
done=0;
#ifndef_EVENT_DISABLE_THREAD_SUPPORT
base->th_owner_id=EVTHREAD_GET_ID();
#endif
base->event_gotterm=base->event_break=0;
while(!done){
base->event_continue=0;
if(base->event_gotterm){
break;
}
if(base->event_break){
break;
}
timeout_correct(base,&tv);/*校准系统时间*/
tv_p=&tv;
if(!N_ACTIVE_CALLBACKS(base) && !(flags&EVLOOP_NONBLOCK)){
/*获取时间堆上堆顶元素的超时值,即I/O复用系统调用本次应该设置的超时值*/
timeout_next(base,&tv_p);
}else{
/*如果有就绪事件尚未处理,则将I/O复用系统调用的超时时间“置0”。这样I/O复用系
统调用直接返回,程序也就可以立即处理就绪事件了*/
evutil_timerclear(&tv);
}
/*如果event_base中没有注册任何事件,则直接退出事件循环*/
if(!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)){
event_debug(("%s:no events registered.",__func__));
retval=1;
goto done;
}
/*更新系统时间,并清空时间缓存*/
gettime(base,&base->event_tv);
clear_time_cache(base);
/*调用事件多路分发器的dispatch方法等待事件,将就绪事件插入活动事件队列*/
res=evsel->dispatch(base,tv_p);
if(res==-1){
event_debug(("%s:dispatch returned unsuccessfully.",__func__));
retval=-1;
goto done;
}
update_time_cache(base);/*将时间缓存更新为当前系统时间*/
/*检查时间堆上的到期事件并依次执行之*/
timeout_process(base);
if(N_ACTIVE_CALLBACKS(base)){
/*调用event_process_active函数依次处理就绪的信号事件和I/O事件*/

int n=event_process_active(base);
if((flags&EVLOOP_ONCE) && N_ACTIVE_CALLBACKS(base)==0 && n!=0) done=1;
}else if(flags&EVLOOP_NONBLOCK) done=1;
}
event_debug(("%s:asked to terminate loop.",__func__));
done:
/*事件循环结束,清空时间缓存,并设置停止循环标志*/
clear_time_cache(base);
base->running_loop=0;
EVBASE_RELEASE_LOCK(base,th_base_lock);
return(retval);
}

12. 高性能I/O框架库Libevent
http://binbo-zappy.github.io/2024/12/16/Linux高性能服务器编程-游双/12-高性能IO框架库Libevent/
作者
Binbo
发布于
2024年12月16日
许可协议