第12章
高性能I/O框架库Libevent
前面三章的篇幅较为细致地讨论了Linux服务器程序必须处理的三类事件:I/O事件、信号和定时事件。在处理这三类事件时我们通常需要考虑如下三个问题:
❑统一事件源。很明显,统一处理这三类事件既能使代码简单易懂,又能避免一些潜在的逻辑错误。前面我们已经讨论了实现统一事件源的一般方法——利用I/O复用系统调用来管理所有事件。
❑可移植性。不同的操作系统具有不同的I/O复用方式,比如Solaris的dev/poll文件,FreeBSD的kqueue机制,Linux的epoll系列系统调用。
❑对并发编程的支持。在多进程和多线程环境下,我们需要考虑各执行实体如何协同处理客户连接、信号和定时器,以避免竞态条件。
所幸的是,开源社区提供了诸多优秀的I/O框架库。它们不仅解决了上述问题,让开发者可以将精力完全放在程序的逻辑上,而且稳定性、性能等各方面都相当出色。比如ACE、ASIO和Libevent。本章将介绍其中相对轻量级的Libevent框架库。
1. I/O框架库概述
I/O框架库以库函数的形式,封装了较为底层的系统调用,给应用程序提供了一组更便于使用的接口。这些库函数往往比程序员自己实现的同样功能的函数更合理、更高效,且更健壮。因为它们经受住了真实网络环境下的高压测试,以及时间的考验。
各种I/O框架库的实现原理基本相似,要么以Reactor模式实现,要么以Proactor模式实现,要么同时以这两种模式实现。
举例来说,基于Reactor模式的I/O框架库包含如下几个组件:句柄(Handle)、事件多路分发器(EventDemultiplexer)、事件处理器(EventHandler)和具体的事件处理器(ConcreteEventHandler)、Reactor。
1.句柄
I/O框架库要处理的对象,即I/O事件、信号和定时事件,统一称为事件源。
一个事件源通常和一个句柄绑定在一起。句柄的作用是,当内核检测到就绪事件时,它将通过句柄来通知应用程序这一事件。在Linux环境下,I/O事件对应的句柄是文件描述符,信号事件对应的句柄就是信号值。
2.事件多路分发器
事件的到来是随机的、异步的。
我们无法预知程序何时收到一个客户连接请求,又亦或收到一个暂停信号。所以程序需要循环地等待并处理事件,这就是事件循环。
在事件循环中,等待事件一般使用I/O复用技术来实现。
I/O框架库一般将系统支持的各种I/O复用系统调用封装成统一的接口,称为事件多路分发器。
事件多路分发器的demultiplex方法是等待事件的核心函数,其内部调用的是select、poll、epoll_wait等函数。
此外,事件多路分发器还需要实现register_event和remove_event方法,以供调用者往事件多路分发器中添加事件和从事件多路分发器中删除事件。
3.事件处理器和具体事件处理器
事件处理器执行事件对应的业务逻辑。
它通常包含一个或多个handle_event回调函数,这些回调函数在事件循环中被执行。I/O框架库提供的事件处理器通常是一个接口,用户需要继承它来实现自己的事件处理器,即具体事件处理器。因此,事件处理器中的回调函数一般被声明为虚函数,以支持用户的扩展。
此外,事件处理器一般还提供一个get_handle方法,它返回与该事件处理器关联的句柄。那么,事件处理器和句柄有什么关系?当事件多路分发器检测到有事件发生时,它是通过句柄来通知应用程序的。因此,我们必须将事件处理器和句柄绑定,才能在事件发生时获取到正确的事件处理器。
4.Reactor
Reactor是I/O框架库的核心。它提供的几个主要方法是:
❑handle_events。该方法执行事件循环。它重复如下过程:等待事件,然后依次处理所有就绪事件对应的事件处理器。
❑register_handler。该方法调用事件多路分发器的register_event方法来往事件多路分发器中注册一个事件。
❑remove_handler。该方法调用事件多路分发器的remove_event方法来删除事件多路分发器中的一个事件。
2. Libevent源码分析
Libevent是开源社区的一款高性能的I/O框架库,其学习者和使用者众多。使用Libevent的著名案例有:高性能的分布式内存对象缓存软件memcached,Google浏览器Chromium的Linux版本。作为一个I/O框架库,Libevent具有如下特点:
❑跨平台支持。Libevent支持Linux、UNIX和Windows。
❑统一事件源。Libevent对I/O事件、信号和定时事件提供统一的处理。
❑线程安全。Libevent使用libevent_pthreads库来提供线程安全支持。
❑基于Reactor模式的实现。
这一节中我们将简单地研究一下Libevent源代码的主要部分。分析它除了可以更好地学习网络编程外,还有如下好处:
❑学习编写一个产品级的函数库要考虑哪些细节。
❑提高C语言功底。Libevent源码中使用了大量的函数指针,用C语言实现了多态机制,并提供了一些基础数据结构的高效实现,比如双向链表、最小堆等。
Libevent的官方网站是http://libevent.org/,其中提供Libevent源代码的下载,以及学习Libevent框架库的第一手文档,并且源码和文档的更新也较为频繁。Libevent版本是2.0.19。
2.1 一个实例
分析一款软件的源代码,最简单有效的方式是从使用入手,这样才能真正从整体上把握该软件的逻辑结构。代码清单12-1是使用Libevent库实现的一个“Hello
World”程序。
代码清单12-1 Libevent实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 #include <sys/signal.h> #include <event.h> void signal_cb (int fd, short event, void * argc) { struct event_base * base = (struct event_base*)argc; struct timeval delay = { 2 , 0 }; printf ( "Caught an interrupt signal; exiting cleanly in two seconds...\n" ); event_base_loopexit(base, &delay); }void timeout_cb (int fd, short event, void * argc) { printf ("timeout\n" ); }int main () { struct event_base * base = event_init(); struct event * signal_event = evsignal_new(base, SIGINT, signal_cb, base); event_add(signal_event, NULL ); struct timeval tv = {1 , 0 }; struct event * timeout_event = evtimer_new(base, timeout_cb, NULL ); event_add(timeout_event, &tv); event_base_dispatch(base); event_free(timeout_event); event_free(signal_event); event_base_free(base); }
代码清单12-1虽然简单,但却基本上描述了Libevent库的主要逻辑:
1)调用event_init函数创建event_base对象。一个event_base相当于一个Reactor实例。
2)创建具体的事件处理器,并设置它们所从属的Reactor实例。evsignal_new和evtimer_new分别用于创建信号事件处理器和定时事件处理器,它们是定义在include/event2/event.h文件中的宏:
1 2 #define evsignal_new(b,x,cb,arg)\ event_new((b),(x),EV_SIGNAL|EV_PERSIST,(cb),(arg)) #define evtimer_new(b,cb,arg)event_new((b),-1,0,(cb),(arg))
可见,它们的统一入口是event_new函数,即用于创建通用事件处理器(图12-1中的EventHandler)的函数。其定义是:
1 2 struct event*event_new (struct event_base*base,evutil_socket_t fd,short events,void (*cb)(evutil_socket_t ,short ,void *),void *arg)
base参数指定新创建的事件处理器从属的Reactor。
fd参数指定与该事件处理器关联的句柄。创建I/O事件处理器时,应该给fd参数传递文件描述符值;创建信号事件处理器时,应该给fd参数传递信号值,比如代码清单12-1中的SIGINT;
创建定时事件处理器时,则应该给fd参数传递-1。
events参数指定事件类型,其可选值都定义在include/event2/event.h文件中,如代码清单12-2所示。
代码清单12-2 Libevent支持的事件类型
1 2 3 4 5 6 7 #define EV_TIMEOUT 0x01 #define EV_READ 0x02 #define EV_WRITE 0x04 #define EV_SIGNAL 0x08 #define EV_PERSIST 0x10 #define EV_ET 0x20
代码清单12-2中,EV_PERSIST的作用是:事件被触发后,自动重新对这个event调用event_add函数(见后文)。
cb参数指定目标事件对应的回调函数,相当于图12-1中事件处理器的handle_event方法。arg参数则是Reactor传递给回调函数的参数。
event_new函数成功时返回一个event类型的对象,也就是Libevent的事件处理器。Libevent用单词“event”来描述事件处理器,而不是事件,会使读者觉得有些混乱,故而我们约定如下:
❑事件指的是一个句柄上绑定的事件,比如文件描述符0上的可读事件。
❑事件处理器,也就是event结构体类型的对象,除了包含事件必须具备的两个要素(句柄和事件类型)外,还有很多其他成员,比如回调函数。
❑事件由事件多路分发器管理,事件处理器则由事件队列管理。事件队列包括多种,比如event_base中的注册事件队列、活动事件队列和通用定时器队列,以及evmap中的I/O事件队列、信号事件队列。关于这些事件队列,我们将在后文依次讨论。
❑事件循环对一个被激活事件(就绪事件)的处理,指的是执行该事件对应的事件处理器中的回调函数。
3)调用event_add函数,将事件处理器添加到注册事件队列中,并将该事件处理器对应的事件添加到事件多路分发器中。event_add函数相当于Reactor中的register_handler方法。
4)调用event_base_dispatch函数来执行事件循环。
5)事件循环结束后,使用*_free系列函数来释放系统资源。
由此可见,代码清单12-1给我们提供了一条分析Libevent源代码的主线。不过在此之前,我们先简单介绍一下Libevent源代码的组织结构。
2.2 源代码组织结构
Libevent源代码中的目录和文件按照功能可划分为如下部分:
❑头文件目录include/event2。该目录是自Libevent主版本升级到2.0之后引入的,在1.4及更老的版本中并无此目录。该目录中的头文件是Libevent提供给应用程序使用的,比如,event.h头文件提供核心函数,http.h头文件提供HTTP协议相关服务,rpc.h头文件提供远程过程调用支持。
❑源码根目录下的头文件。这些头文件分为两类:一类是对include/event2目录下的部分头文件的包装,另外一类是供Libevent内部使用的辅助性头文件,它们的文件名都具有*-internal.h的形式。
❑通用数据结构目录compat/sys。该目录下仅有一个文件——queue.h。它封装了跨平台的基础数据结构,包括单向链表、双向链表、队列、尾队列和循环队列。
❑sample目录。它提供一些示例程序。
❑test目录。它提供一些测试代码。
❑WIN32-Code目录。它提供Windows平台上的一些专用代码。
❑event.c文件。该文件实现Libevent的整体框架,主要是event和event_base两个结构体的相关操作。
❑devpoll.c、kqueue.c、evport.c、select.c、win32select.c、poll.c和epoll.c文件。它们分别封装了如下I/O复用机制:/dev/poll、kqueue、event
ports、POSIX select、Windows
select、poll和epoll。这些文件的主要内容相似,都是针对结构体eventop(见后文)所定义的接口函数的具体实现。
❑minheap-internal.h文件。该文件实现了一个时间堆,以提供对定时事件的支持。
❑signal.c文件。它提供对信号的支持。其内容也是针对结构体eventop所定义的接口函数的具体实现。
❑evmap.c文件。它维护句柄(文件描述符或信号)与事件处理器的映射关系。
❑event_tagging.c文件。它提供往缓冲区中添加标记数据(比如一个整数),以及从缓冲区中读取标记数据的函数。
❑event_iocp.c文件。它提供对Windows IOCP(Input/Output Completion
Port,输入输出完成端口)的支持。
❑buffer*.c文件。它提供对网络I/O缓冲的控制,包括:输入输出数据过滤,传输速率限制,使用SSL(Secure
Sockets Layer)协议对应用数据进行保护,以及零拷贝文件传输等。
❑evthread*.c文件。它提供对多线程的支持。
❑listener.c文件。它封装了对监听socket的操作,包括监听连接和接受连接。
❑logs.c文件。它是Libevent的日志系统。
❑evutil.c、evutil_rand.c、strlcpy.c和arc4random.c文件。它们提供一些基本操作,比如生成随机数、获取socket地址信息、读取文件、设置socket属性等。
❑evdns.c、http.c和evrpc.c文件。它们分别提供了对DNS协议、HTTP协议和RPC(Remote
Procedure Call,远程过程调用)协议的支持。
❑epoll_sub.c文件。该文件未见使用。
在整个源码中,event-internal.h、include/event2/event_struct.h、event.c和evmap.c等4个文件最为重要。它们定义了event和event_base结构体,并实现了这两个结构体的相关操作。下面的讨论也主要是围绕这几个文件展开的。
2.3 event结构体
前文提到,Libevent中的事件处理器是event结构类型。event结构体封装了句柄、事件类型、回调函数,以及其他必要的标志和数据。该结构体在include/event2/event_struct.h文件中定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 struct event { TAILQ_ENTRY(event)ev_active_next; TAILQ_ENTRY(event)ev_next; union { TAILQ_ENTRY(event)ev_next_with_common_timeout; int min_heap_idx; }ev_timeout_pos; evutil_socket_t ev_fd; struct event_base *ev_base ; union { struct { TAILQ_ENTRY(event)ev_io_next; struct timeval ev_timeout ; }ev_io; struct { TAILQ_ENTRY(event)ev_signal_next; short ev_ncalls; short *ev_pncalls; }ev_signal; }_ev; short ev_events; short ev_res; short ev_flags; ev_uint8_t ev_pri; ev_uint8_t ev_closure; struct timeval ev_timeout ; void (*ev_callback)(evutil_socket_t ,short ,void *arg); void *ev_arg; };
下面我们详细介绍event结构体中的每个成员:
❑ev_events。它代表事件类型。其取值可以是代码清单12-2所示的标志的按位或(互斥的事件类型除外,比如读写事件和信号事件就不能同时被设置)。
❑ev_next。所有已经注册的事件处理器(包括I/O事件处理器和信号事件处理器)通过该成员串联成一个尾队列,我们称之为注册事件队列。宏TAILQ_ENTRY是尾队列中的节点类型,它定义在compat/sys/queue.h文件中:
1 #define TAILQ_ENTRY(type)\ struct{\ struct type*tqe_next;\ struct type**tqe_prev;\ }
❑ev_active_next。所有被激活的事件处理器通过该成员串联成一个尾队列,我们称之为活动事件队列。活动事件队列不止一个,不同优先级的事件处理器被激活后将被插入不同的活动事件队列中。在事件循环中,Reactor将按优先级从高到低遍历所有活动事件队列,并依次处理其中的事件处理器。
❑ev_timeout_pos。这是一个联合体,它仅用于定时事件处理器。为了讨论的方便,后面我们称定时事件处理器为定时器。老版本的Libevent中,定时器都是由时间堆来管理的。但开发者认为有时候使用简单的链表来管理定时器将具有更高的效率。因此,新版本的Libevent就引入了所谓“通用定时器”的概念。这些定时器不是存储在时间堆中,而是存储在尾队列中,我们称之为通用定时器队列。对于通用定时器而言,ev_timeout_pos联合体的ev_next_with_common_timeout成员指出了该定时器在通用定时器队列中的位置。对于其他定时器而言,ev_timeout_pos联合体的min_heap_idx成员指出了该定时器在时间堆中的位置。一个定时器是否是通用定时器取决于其超时值大小,具体判断原则请读者自己参考event.c文件中的is_common_timeout函数。
❑_ev。这是一个联合体。所有具有相同文件描述符值的I/O事件处理器通过ev.ev_io.ev_io_next成员串联成一个尾队列,我们称之为I/O事件队列;所有具有相同信号值的信号事件处理器通过ev.ev_signal.ev_signal_next成员串联成一个尾队列,我们称之为信号事件队列。ev.ev_signal.ev_ncalls成员指定信号事件发生时,Reactor需要执行多少次该事件对应的事件处理器中的回调函数。ev.ev_signal.ev_pncalls指针成员要么是NULL,要么指向ev.ev_signal.ev_ncalls。
在程序中,我们可能针对同一个socket文件描述符上的可读/可写事件创建多个事件处理器(它们拥有不同的回调函数)。当该文件描述符上有可读/可写事件发生时,所有这些事件处理器都应该被处理。所以,Libevent使用I/O事件队列将具有相同文件描述符值的事件处理器组织在一起。这样,当一个文件描述符上有事件发生时,事件多路分发器就能很快地把所有相关的事件处理器添加到活动事件队列中。信号事件队列的存在也是由于相同的原因。可见,I/O事件队列和信号事件队列并不是注册事件队列的细致分类,而是另有用处。
❑ev_fd。对于I/O事件处理器,它是文件描述符值;对于信号事件处理器,它是信号值。
❑ev_base。该事件处理器从属的event_base实例。
❑ev_res。它记录当前激活事件的类型。
❑ev_flags。它是一些事件标志。其可选值定义在include/event2/event_struct.h文件中:
1 2 3 4 5 6 7 #define EVLIST_TIMEOUT 0x01 #define EVLIST_INSERTED 0x02 #define EVLIST_SIGNAL 0x04 #define EVLIST_ACTIVE 0x08 #define EVLIST_INTERNAL 0x10 #define EVLIST_INIT 0x80 #define EVLIST_ALL (0xf000|0x9f)
❑ev_pri。它指定事件处理器优先级,值越小则优先级越高。
❑ev_closure。它指定event_base执行事件处理器的回调函数时的行为。其可选值定义于event-internal.h文件中:
1 2 3 4 5 #define EV_CLOSURE_NONE 0 #define EV_CLOSURE_SIGNAL 1 #define EV_CLOSURE_PERSIST 2
❑ev_timeout。它仅对定时器有效,指定定时器的超时值。
❑ev_callback。它是事件处理器的回调函数,由event_base调用。回调函数被调用时,它的3个参数分别被传入事件处理器的如下3个成员:ev_fd、ev_res和ev_arg。
❑ev_arg。回调函数的参数。
2.4
往注册事件队列中添加事件处理器
前面提到,创建一个event对象的函数是event_new(及其变体),它在event.c文件中实现。该函数的实现相当简单,主要是给event对象分配内存并初始化它的部分成员,因此我们不讨论它。event对象创建好后,应用程序需要调用event_add函数将其添加到注册事件队列中,并将对应的事件注册到事件多路分发器上。event_add函数在event.c文件中实现,主要是调用另外一个内部函数event_add_internal,如代码清单12-3所示。
代码清单12-3 event_add_internal函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 static inline int event_add_internal (struct event*ev,const struct timeval*tv,int tv_is_absolute) { struct event_base *base = ev->ev_base; int res=0 ; int notify=0 ; EVENT_BASE_ASSERT_LOCKED(base); _event_debug_assert_is_setup(ev); event_debug(( "event_add:event:%p(fd%d),%s%s%scall%p" , ev, (int )ev->ev_fd, ev->ev_events&EV_READ?"EV_READ" :"" , ev->ev_events&EV_WRITE?"EV_WRITE" :"" , tv?"EV_TIMEOUT" :"" , ev->ev_callback)); EVUTIL_ASSERT(!(ev->ev_flags&~EVLIST_ALL)); if (tv!=NULL && !(ev->ev_flags&EVLIST_TIMEOUT)){ if (min_heap_reserve(&base->timeheap, 1 +min_heap_size(&base->timeheap))==-1 ) return (-1 ); } #ifndef_EVENT_DISABLE_THREAD_SUPPORT if (base->current_event==ev && (ev->ev_events&EV_SIGNAL) &&!EVBASE_IN_THREAD(base)) { ++base->current_event_waiters; EVTHREAD_COND_WAIT(base->current_event_cond,base->th_base_lock); } #endif if ((ev->ev_events&(EV_READ|EV_WRITE|EV_SIGNAL)) && !(ev->ev_flags&(EVLIST_INSERTED|EVLIST_ACTIVE))){ if (ev->ev_events&(EV_READ|EV_WRITE)) res=evmap_io_add(base,ev->ev_fd,ev); else if (ev->ev_events&EV_SIGNAL) res=evmap_signal_add(base,(int )ev->ev_fd,ev); if (res!=-1 ) event_queue_insert(base,ev,EVLIST_INSERTED); if (res==1 ){ notify=1 ; res=0 ; } } if (res!=-1 && tv!=NULL ){ struct timeval now ; int common_timeout; if (ev->ev_closure==EV_CLOSURE_PERSIST && !tv_is_absolute) ev->ev_io_timeout=*tv; if (ev->ev_flags&EVLIST_TIMEOUT){ if (min_heap_elt_is_top(ev)) notify=1 ; event_queue_remove(base,ev,EVLIST_TIMEOUT); } if ((ev->ev_flags&EVLIST_ACTIVE) && (ev->ev_res&EV_TIMEOUT)){ if (ev->ev_events&EV_SIGNAL){ if (ev->ev_ncalls && ev->ev_pncalls){ *ev->ev_pncalls=0 ; } } event_queue_remove(base,ev,EVLIST_ACTIVE); } gettime(base,&now); common_timeout=is_common_timeout(tv,base); if (tv_is_absolute){ ev->ev_timeout=*tv; }else if (common_timeout){ struct timeval tmp=*tv; tmp.tv_usec&=MICROSECONDS_MASK; evutil_timeradd(&now,&tmp,&ev->ev_timeout); ev->ev_timeout.tv_usec|= (tv->tv_usec&~MICROSECONDS_MASK); }else { evutil_timeradd(&now,tv,&ev->ev_timeout); } event_debug(( "event_add:timeout in%d seconds,call%p" , (int )tv->tv_sec,ev->ev_callback)); event_queue_insert(base,ev,EVLIST_TIMEOUT); if (common_timeout){ struct common_timeout_list *ctl = get_common_timeout_list(base,&ev->ev_timeout); if (ev==TAILQ_FIRST(&ctl->events)){ common_timeout_schedule(ctl,&now,ev); } }else { if (min_heap_elt_is_top(ev)) notify=1 ; } } if (res!=-1 && notify && EVBASE_NEED_NOTIFY(base)) evthread_notify_base(base); _event_debug_note_add(ev); return (res); }
从代码清单12-3可见,event_add_internal函数内部调用了几个重要的函数:
❑evmap_io_add。该函数将I/O事件添加到事件多路分发器中,并将对应的事件处理器添加到I/O事件队列中,同时建立I/O事件和I/O事件处理器之间的映射关系。我们将在下一节详细讨论该函数。
❑evmap_signal_add。该函数将信号事件添加到事件多路分发器中,并将对应的事件处理器添加到信号事件队列中,同时建立信号事件和信号事件处理器之间的映射关系。
❑event_queue_insert。该函数将事件处理器添加到各种事件队列中:将I/O事件处理器和信号事件处理器插入注册事件队列;将定时器插入通用定时器队列或时间堆;将被激活的事件处理器添加到活动事件队列中。其实现如代码清单12-4所示。
代码清单12-4 event_queue_insert函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 static void event_queue_insert (struct event_base*base,struct event*ev,int queue ) { EVENT_BASE_ASSERT_LOCKED(base); if (ev->ev_flags&queue ){ if (queue &EVLIST_ACTIVE) return ; event_errx(1 ,"%s:%p(fd%d)already on queue%x" ,__func__,ev,ev-> ev_fd,queue ); return ; } if (~ev->ev_flags&EVLIST_INTERNAL) base->event_count++; ev->ev_flags|=queue ; switch (queue ){ case EVLIST_INSERTED: TAILQ_INSERT_TAIL(&base->eventqueue,ev,ev_next); break ; case EVLIST_ACTIVE: base->event_count_active++; TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri], ev,ev_active_next); break ; case EVLIST_TIMEOUT:{ if (is_common_timeout(&ev->ev_timeout,base)){ struct common_timeout_list *ctl = get_common_timeout_list(base,&ev->ev_timeout); insert_common_timeout_inorder(ctl,ev); }else min_heap_push(&base->timeheap,ev); break ; } default : event_errx(1 ,"%s:unknown queue%x" ,__func__,queue ); } }
2.5 往事件多路分发器中注册事件
event_queue_insert函数所做的仅仅是将一个事件处理器加入event_base的某个事件队列中。对于新添加的I/O事件处理器和信号事件处理器,我们还需要让事件多路分发器来监听其对应的事件,同时建立文件描述符、信号值与事件处理器之间的映射关系。这就要通过调用evmap_io_add和evmap_signal_add两个函数来完成。这两个函数相当于事件多路分发器中的register_event方法,它们由evmap.c文件实现。不过在讨论它们之前,我们先介绍一下它们将用到的一些重要数据结构,如代码清单12-5所示。
代码清单12-5
evmap_io、event_io_map和evmap_signal、evmap_signal_map
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 #ifdef EVMAP_USE_HT #include "ht-internal.h" struct event_map_entry ; HT_HEAD(event_io_map,event_map_entry); #else #define event_io_map event_signal_map #endif struct event_signal_map { void **entries; int nentries; }; struct event_map_entry { HT_ENTRY(event_map_entry)map_node; evutil_socket_t fd; union { struct evmap_io evmap_io ; }ent; }; TAILQ_HEAD(event_list,event); struct evmap_io { struct event_list events ; ev_uint16_t nread; ev_uint16_t nwrite; }; struct evmap_signal { struct event_list events ; };
由于evmap_io_add和evmap_signal_add两个函数的逻辑基本相同,因此我们仅讨论evmap_io_add函数,如代码清单12-6所示。
代码清单12-6 evmap_io_add函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 int evmap_io_add (struct event_base*base,evutil_socket_t fd,struct event*ev) { const struct eventop *evsel = base->evsel; struct event_io_map *io = &base->io; struct evmap_io *ctx =NULL ; int nread,nwrite,retval=0 ; short res=0 ,old=0 ; struct event *old_ev ; EVUTIL_ASSERT(fd==ev->ev_fd); if (fd<0 ) return 0 ; #ifndef EVMAP_USE_HT if (fd>=io->nentries){ if (evmap_make_space(io,fd,sizeof (struct evmap_io*))==-1 ) return (-1 ); } #endif GET_IO_SLOT_AND_CTOR(ctx,io,fd,evmap_io,evmap_io_init,evsel->fdinfo_len); nread=ctx->nread; nwrite=ctx->nwrite; if (nread) old|=EV_READ; if (nwrite) old|=EV_WRITE; if (ev->ev_events&EV_READ){ if (++nread==1 ) res|=EV_READ; } if (ev->ev_events&EV_WRITE){ if (++nwrite==1 ) res|=EV_WRITE; } if (EVUTIL_UNLIKELY(nread>0xffff ||nwrite>0xffff )){ event_warnx("Too many events reading or writing on fd%d" , (int )fd); return -1 ; } if (EVENT_DEBUG_MODE_IS_ON() && (old_ev=TAILQ_FIRST(&ctx->events)) && (old_ev->ev_events&EV_ET)!=(ev->ev_events&EV_ET)){ event_warnx("Tried to mix edge-triggered and non-edge-triggered" "events on fd%d" ,(int )fd); return -1 ; } if (res){ void *extra=((char *)ctx)+sizeof (struct evmap_io); if (evsel->add(base,ev->ev_fd, old,(ev->ev_events&EV_ET)|res,extra)==-1 ) return (-1 ); retval=1 ; } ctx->nread=(ev_uint16_t )nread; ctx->nwrite=(ev_uint16_t )nwrite; TAILQ_INSERT_TAIL(&ctx->events,ev,ev_io_next); return (retval); }
2.6 eventop结构体
eventop结构体封装了I/O复用机制必要的一些操作,比如注册事件、等待事件等。它为event_base支持的所有后端I/O复用机制提供了一个统一的接口。该结构体定义在event-internal.h文件中,如代码清单12-7所示。
代码清单12-7 eventop结构体
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 struct eventop { const char *name; void *(*init)(struct event_base*); int (*add)(struct event_base*,evutil_socket_t fd,short old,short events,void *fdinfo); int (*del)(struct event_base*,evutil_socket_t fd,short old,short events,void *fdinfo); int (*dispatch)(struct event_base*,struct timeval*); void (*dealloc)(struct event_base*); int need_reinit; enum event_method_feature features ; size_t fdinfo_len; };
前文提到,devpoll.c、kqueue.c、evport.c、select.c、win32select.c、poll.c和epoll.c文件分别针对不同的I/O复用技术实现了eventop定义的这套接口。那么,在支持多种I/O复用技术的系统上,Libevent将选择使用哪个呢?这取决于这些I/O复用技术的优先级。
Libevent支持的后端I/O复用技术及它们的优先级在event.c文件中定义,如代码清单12-8所示。
代码清单12-8 Libevent支持的后端I/O复用技术及它们的优先级
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 #ifdef_EVENT_HAVE_EVENT_PORTS extern const struct eventop evportops ; #endif #ifdef_EVENT_HAVE_SELECT extern const struct eventop selectops ; #endif #ifdef_EVENT_HAVE_POLL extern const struct eventop pollops ; #endif #ifdef_EVENT_HAVE_EPOLL extern const struct eventop epollops ; #endif #ifdef_EVENT_HAVE_WORKING_KQUEUE extern const struct eventop kqops ; #endif #ifdef_EVENT_HAVE_DEVPOLL extern const struct eventop devpollops ; #endif #ifdef WIN32 extern const struct eventop win32ops; #endif static const struct eventop *eventops []= { #ifdef_EVENT_HAVE_EVENT_PORTS &evportops, #endif #ifdef_EVENT_HAVE_WORKING_KQUEUE &kqops, #endif #ifdef_EVENT_HAVE_EPOLL &epollops, #endif #ifdef_EVENT_HAVE_DEVPOLL &devpollops, #endif #ifdef_EVENT_HAVE_POLL &pollops, #endif #ifdef_EVENT_HAVE_SELECT &selectops, #endif #ifdef WIN32 &win32ops, #endif NULL };
Libevent通过遍历eventops数组来选择其后端I/O复用技术。遍历的顺序是从数组的第一个元素开始,到最后一个元素结束。所以在Linux下,Libevent默认选择的后端I/O复用技术是epoll。但很显然,用户可以修改代码清单12-8中定义的一系列宏来选择使用不同的后端I/O复用技术。
2.7 event_base结构体
结构体event_base是Libevent的Reactor。它定义在event-internal.h文件中,如代码清单12-9所示。
代码清单12-9 event_base结构体
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 struct event_base { const struct eventop *evsel ; void *evbase; struct event_changelist changelist ; const struct eventop *evsigsel ; struct evsig_info sig ; int virtual_event_count; int event_count; int event_count_active; int event_gotterm; int event_break; int event_continue; int event_running_priority; int running_loop; struct event_list *activequeues ; int nactivequeues; struct common_timeout_list **common_timeout_queues ; int n_common_timeouts; int n_common_timeouts_allocated; struct deferred_cb_queue defer_queue ; struct event_io_map io ; struct event_signal_map sigmap ; struct event_list eventqueue ; struct min_heap timeheap ; struct timeval event_tv ; struct timeval tv_cache ; #if defined(_EVENT_HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) struct timeval tv_clock_diff ; time_t last_updated_clock_diff; #endif #ifndef_EVENT_DISABLE_THREAD_SUPPORT unsigned long th_owner_id; void *th_base_lock; struct event *current_event ; void *current_event_cond; int current_event_waiters; #endif #ifdef WIN32 struct event_iocp_port *iocp ; #endif enum event_base_config_flag flags ; int is_notify_pending; evutil_socket_t th_notify_fd[2 ]; struct event th_notify ; int (*th_notify_fn)(struct event_base*base); };
2.8 事件循环
最后,我们讨论一下Libevent的“动力”,即事件循环。Libevent中实现事件循环的函数是event_base_loop。该函数首先调用I/O事件多路分发器的事件监听函数,以等待事件;当有事件发生时,就依次处理之。event_base_loop函数的实现如代码清单12-10所示。
代码清单12-10 event_base_loop函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 int event_base_loop (struct event_base*base,int flags) { const struct eventop *evsel = base->evsel; struct timeval tv ; struct timeval *tv_p ; int res,done,retval=0 ; EVBASE_ACQUIRE_LOCK(base,th_base_lock); if (base->running_loop){ event_warnx("%s:reentrant invocation.Only one event_base_loop" "can run on each event_base at once." ,__func__); EVBASE_RELEASE_LOCK(base,th_base_lock); return -1 ; } base->running_loop=1 ; clear_time_cache(base); if (base->sig.ev_signal_added && base->sig.ev_n_signals_added) evsig_set_base(base); done=0 ; #ifndef_EVENT_DISABLE_THREAD_SUPPORT base->th_owner_id=EVTHREAD_GET_ID(); #endif base->event_gotterm=base->event_break=0 ; while (!done){ base->event_continue=0 ; if (base->event_gotterm){ break ; } if (base->event_break){ break ; } timeout_correct(base,&tv); tv_p=&tv; if (!N_ACTIVE_CALLBACKS(base) && !(flags&EVLOOP_NONBLOCK)){ timeout_next(base,&tv_p); }else { evutil_timerclear(&tv); } if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)){ event_debug(("%s:no events registered." ,__func__)); retval=1 ; goto done; } gettime(base,&base->event_tv); clear_time_cache(base); res=evsel->dispatch(base,tv_p); if (res==-1 ){ event_debug(("%s:dispatch returned unsuccessfully." ,__func__)); retval=-1 ; goto done; } update_time_cache(base); timeout_process(base); if (N_ACTIVE_CALLBACKS(base)){ int n=event_process_active(base); if ((flags&EVLOOP_ONCE) && N_ACTIVE_CALLBACKS(base)==0 && n!=0 ) done=1 ; }else if (flags&EVLOOP_NONBLOCK) done=1 ; } event_debug(("%s:asked to terminate loop." ,__func__)); done: clear_time_cache(base); base->running_loop=0 ; EVBASE_RELEASE_LOCK(base,th_base_lock); return (retval); }