8. muduo网络库设计与实现

第8章 muduo网络库设计与实现

0. 什么都不做的EventLoop

首先定义EventLoop class的基本接口:构造函数、析构函数、loop()成员函数。注意EventLoop是不可拷贝的,因此它继承了boost::noncopyable。muduo中的大多数class都是不可拷贝的,因此以后只会强调某个class是可拷贝的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class EventLoop : boost::noncopyable {
public:
EventLoop();
~EventLoop();
void loop();
void assertInLoopThread() {
if(!isInLoopThread()) {
abortNotInLoopThread();
}
}
bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }

private:
void abortNotInLoopThread();
bool looping; /*atomic */
const pid_t threadId_;
};

One looper thread顾名思义每个线程只能有一个EventLoop对象,因此EventLoop的构造函数会检查当前线程是否已经创建了其他EventLoop对象,遇到错误就终止程序(LOG_FATAL)。EventLoop的构造函数会记住本对象所属的线程(threadId_)。

创建了EventLoop对象的线程是IO线程,其主要功能是运行事件循环EventLoop:loop()。EventLoop对象的生命期通常和其所属的线程一样长,它不必是heap对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
__thread EventLoop* t_loopInThisThread = 0;

EventLoop::EventLoop()
: looping_(false),
threadId_(CurrentThread::tid()) {
LOG_TRACE << "EventLoop created" << this << " in thread " << threadId_;
if (t_loopInThisThread) {
LOG_FATAL << "Another EventLoop " << t_loopInThisThread
<< " exists in this thread " << threadId_;
} else {
t_loopInThisThread = this;
}
}

EventLoop::~EventLoop() {
assert(!looping_);
t_loopInThisThread = NULL;
}

既然每个线程至多有一个EventLoop对象,那么我们让EventLoop的static成员函数getEventLoopOfCurrentThread()返回这个对象。返回值可能为NULL,如果当前线程不是IO线程的话。(这个函数是muduo后来新加的,因此前面头文件中没有它的原型。)

1
2
3
EventLoop* EventLoop::getEventLoopOfCurrentThread() {
return t_loopInThisThread;
}

muduo的接口设计会明确哪些成员函数是线程安全的,可以跨线程调用:哪些成员函数只能在某个特定线程调用(主要是IO线程)。为了能在运行时检查这些pre-condition,EventLoop提供了isInLoopThread()和assertInLoopThread()等函数。

事件循环必须在IO线程执行,因此EventLoop::loop()会检查这一pre-condition。本节的loop()什么事都不做,等5秒就退出。

1
2
3
4
5
6
7
8
void EventLoop::loop() {
assert(!looping_);
assertInLoopThread();
looping_ = true;
::poll(NULL, 0, 5*1000);
LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}

为了验证现有的功能,我编写了s00/test1.cc和s00/test2.cc。其中test1.cc会在主线程和子线程分别创建一个EventLoop,程序正常运行退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void threadFunc() {
printf("threadFunc(): pid=%d, tid=%d\n", getpid(), muduo::CurrentThread::tid());
muduo::EventLoop loop;
loop.loop();
}

int main() {
printf("main(): pid=%d, tid=%d\n", getpid(), muduo::CurrentThread::tid());
muduo::EventLoop loop;
muduo::Thread thread(threadFunc);
thread.start();
loop.loop();
pthread_exit(NULL);
}

test2.cc是个负面测试,它在主线程创建了EventLoop对象,却试图在另一个线程调用其EventLoop::loop(),程序会因断言失效而异常终止。练习:写一个负面测试,在主线程创建两个EventLoop对象,验证程序会异常终止。

1
2
3
4
5
6
7
8
9
10
11
12
muduo::EventLoop* g_loop;
void threadFunc() {
g_loop->loop();
}

int main() {
muduo::EventLoop loop;
g_loop = &loop;
muduo::Thread t(threadFunc);
t.start();
t.join();
}

1. Reactor的关键结构

本节讲Reactor最核心的事件分发机制,即将IO multiplexing拿到的IO事件分发给各个文件描述符(fd)的事件处理函数。

1.1 Channel class

  • Channel class的功能有一点类似Java NIO的SelectableChannel和SelectionKey的组合。
  • 每个Channel对象自始至终只属于一个EventLoop,因此每个Channel对象都只属于某一个IO线程
  • 每个Channel对象自始至终只负责一个文件描述符(fd)的IO事件分发,但它并不拥有这个fd,也不会在析构的时候关闭这个fd。
  • Channel会把不同的IO事件分发为不同的回调,例如ReadCallback、WriteCallback等,而且“回调”用boost::function表示,用户无须继承Channel,Channel不是基类。

muduo用户一般不直接使用Channel,而会使用更上层的封装,如TcpConnection。Channel的生命期由其owner class负责管理,它一般是其他class的直接或间接成员。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class Channel : boost::noncopyable {
public:
typedef boost::function<void()> EventCallback;

Channel(EventLoop* loop, int fd);

void handleEvent();
void setReadCallback(const EventCallback& cb) { readCallback_ = cb; }
void setWriteCallback(const EventCallback& cb) { writeCallback_ = cb; }
void setErrorCallback(const EventCallback& cb) { errorCallback_ = cb; }

int fd() const { return fd_; }
int events() const { return events_; }
void set_revents(int revt) { revents_ = revt; }
bool isNoneEvent() const { return events_ == kNoneEvent; }

void enableReading() { events_ = kReadEvent; update(); }
void enableWriting() { events_ = kWriteEvent; update(); }
// void disableWriting() { events_ &= ~kWriteEvent; update(); }
// void disableAll() { events_ = kNoneEvent; update(); }

// for Poller
int index() { return index_; }
void set_index(int idx) { index_ = idx; }

EventLoop* ownerLoop() { return loop_; }

private:
void update();

static const int kNoneEvent;
static const int kReadEvent;
static const int kWriteEvent;

EventLoop* loop_;
const int fd_;
int events_;
int revents_;
int index_; // used by Poller.

EventCallback readCallback_;
EventCallback writeCallback_;
EventCallback errorCallback_;
};

以下是Channel class的数据成员。其中events_是它关心的IO事件,由用户设置;revents_是目前活动的事件,由EventLoop/Poller设置:这两个字段都是bit pattern,它们的名字来自poll(2)的struct pollfd。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = POLLIN | POLLPRI;
const int Channel::kWriteEvent = POLLOUT;

Channel::Channel(EventLoop* loop, int fdArg)
: loop(loop),
fd_(fdArg),
events_(0),
revents_(0),
index_(-1) {}

void Channel::update() {
loop_->updateChannel(this);
}

Channel::update() 会调用 EventLoop::updateChannel(),后者会转而调用 Poller::updateChannel()。由于 Channel 没有包含 EventLoop.h,因此 Channel::update() 必须定义在 Channel.cc 中。

Channel::handleEvent是Channel的核心,它由EventLoop::loop()调用,它的功能是根据revents_的值分别调用不同的用户回调。这个函数以后还会扩充。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void Channel::handleEvent() {
if (revents_ & POLLNVAL) {
LOG_WARN << "Channel::handle_event() POLLNVAL";
}
if (revents_ & (POLLERR | POLLNVAL)) {
if (errorCallback_) errorCallback_();
}
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)) {
if (readCallback_) readCallback_();
}
if (revents_ & POLLOUT) {
if (writeCallback_) writeCallback_();
}
}

1.2 Poller class

Poller class是IO multiplexing的封装。它现在是个具体类,而在muduo中是个抽象基类,因为muduo同时支持poll(2)和epoll(4)两种IO multiplexing机制。Poller是EventLoop的间接成员,只供其owner EventLoop在IO线程调用,因此无须加锁。其生命期与EventLoop相等。Poller并不拥有Channel,Channel在析构之前必须自已unregister(EventLoop::removeChannel()),避免空悬指针。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Poller : boost::noncopyable {
public:
typedef std::vector<Channel*> ChannelList;

Poller(EventLoop* loop);
~Poller();

Timestamp poll(int timeoutMs, ChannelList* activeChannels);

void updateChannel(Channel* channel);
void assertInLoopThread() { ownerLoop_->assertInLoopThread(); }

private:
void fillActiveChannels(int numEvents, ChannelList* activeChannels) const;

typedef std::vector<struct pollfd> PollFdList;
typedef std::map<int, Channel*> ChannelMap;

EventLoop* ownerLoop_;
PollFdList pollfds_;
ChannelMap channels_;
};

Poller供EventLoop调用的函数目前有两个,poll()和updateChannel(),Poller暂时没有定义removeChannel()成员函数,因为前几节还用不到它。

private 是Poller class的数据成员。其中ChannelMap是从fd到Channel*的映射。Poller::poll()不会在每次调用poll(2)之前临时构造pollfd数组,而是把它缓存起来(pollfds_)

Poller的构造函数和析构函数都很简单,因其成员都是标准库容器。

1
2
3
4
5
6
Poller::Poller(EventLoop* loop)
: ownerLoop_(loop) {
}

Poller::~Poller() {
}

Poller::poll()是Poller的核心功能,它调用poll(2)获得当前活动的IO事件,然后填充调用方传入的activeChannels,并返回poll(2) return的时刻。这里我们直接把vector<struct pollfd> pollfds_作为参数传给poll(2),因为C++标准保证std::vector的元素排列跟数组一样。&*pollfds_.begin()是获得元素的首地址,这个表达式的类型为pollfds_*,符合poll(2)的要求。(在C++11中可写为pollfds_.data),g++4.4的STL也支持这种写法。

1
2
3
4
5
6
7
8
9
10
11
12
13
Timestamp Poller::poll(int timeoutMs, ChannelList* activeChannels) {
int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);
Timestamp now(Timestamp::now());
if (numEvents > 0) {
LOG_TRACE << numEvents << " events happened";
fillActiveChannels(numEvents, activeChannels);
} else if (numEvents == 0) {
LOG_TRACE << "nothing happened";
} else {
LOG_SYSERR << "Poller::poll()";
}
return now;
}

fillActiveChannels()遍历pollfds_,找出有活动事件的fd,把它对应的Channel填入activeChannels。这个函数的复杂度是O(N),其中N是pollfds_的长度即文件描述符数目。为了提前结束循环,每找到一个活动fd就递减numEvents,这样当numEvents减为0时表示活动fd都找完了,不必做无用功。当前活动事件revents会保存在Channel中,供Channel::handleEvent()使用。

注意这里我们不能一边遍历pollfds_,一边调用channel::handleEvent(),因为后者会添加或删除Channel,从而造成pollfds_在遍历期间改变大小,这是非常危险的。另外一个原因是简化Poller的职责,它只负责IO multiplexing,不负责事件分发(dispatching)。这样将来可以方便地替换为其他更高效的IO multiplexing机制,如epoll(4)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void Poller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const {
for (PollFdList::const_iterator pfd = pollfds_.begin();
pfd != pollfds_.end() && numEvents > 0; ++pfd) {
if (pfd->revents > 0) {
--numEvents;
ChannelMap::const_iterator ch = channels_.find(pfd->fd);
assert(ch != channels_.end());
Channel* channel = ch->second;
assert(channel->fd() == pfd->fd);
channel->set_revents(pfd->revents);
activeChannels->push_back(channel);
}
}
}

Poller::updateChannel()的主要功能是负责维护和更新pollfds_数组。添加新Channel的复杂度是O(logN),更新已有的Channel的复杂度是O(1),因为Channel记住了自已在pollfds_数组中的下标,因此可以快速定位。removeChannel()的复杂度也将会是O(logN)。这里用了大量的assert来检查invariant。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void Poller::updateChannel(Channel* channel) {
assertInLoopThread();
LOG_TRACE << "fd=" << channel->fd() << " events=" << channel->events();
if (channel->index() == -1) {
// a new one, add to pollfds.
assert(channels_.find(channel->fd()) == channels_.end());
struct pollfd pfd;
pfd.fd = channel->fd();
pfd.events = static_cast<short>(channel->events());
pfd.revents = 0;
pollfds_.push_back(pfd);
int idx = static_cast<int>(pollfds_.size()) - 1;
channel->set_index(idx);
channels_[pfd.fd] = channel;
} else {
// update existing one
assert(channels_.find(channel->fd()) != channels_.end());
assert(channels_[channel->fd()] == channel);
int idx = channel->index();
assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
struct pollfd& pfd = pollfds_[idx];
assert(pfd.fd == channel->fd() || pfd.fd == -1);
pfd.events = static_cast<short>(channel->events());
pfd.revents = 0;
if (channel->isNoneEvent()) {
// ignore this pollfd
pfd.fd = -1;
}
}
}

另外,如果某个Channel暂时不关心任何事件,就把pollfd.fd设为-1,让poll(2)忽略此项。这里不能改为把pollfd.events设为0,这样无法屏蔽POLLERR事件。改进的做法(p.312)是把pollfd.fd设为channel->fd()的相反数减一,这样可以进一步检查invariant。(思考:为什么要减一?)


Tips:

关于为什么要将pollfd.fd设置为channel->fd()的相反数减一,而不是直接设置为0或-1,这里有几点考虑:

  • 区分有效和无效的文件描述符:通过将pollfd.fd设置为-channel->fd() - 1,可以确保这个值与任何有效的文件描述符都不同。因为文件描述符是非负整数,所以这个负值可以明确表示该pollfd是无效的。
  • 避免与0冲突:在某些情况下,文件描述符0可能被使用,如果简单地将pollfd.fd设置为0,可能会与实际的文件描述符0混淆。
  • 检查invariant:通过减一,可以确保在检查invariant时,能够识别出pollfd.fd是否被正确地设置为无效值。如果pollfd.fd的值在-channel->fd() - 1channel->fd()之间,那么它应该是一个有效的文件描述符。如果等于-channel->fd() - 1,则表示该pollfd是无效的。

1.3 EventLoop的改动

EventLoop class新增了quit成员函数,还加了几个数据成员,并在构造函数里初始化它们。注意EventLoop通过scoped_ptr来间接持有Poller,因此EventLoop.h不必包含Poller.h,只需前向声明Poller class。为此,EventLoop的析构函数必须在EventLoop.cc中显式定义。

1
2
3
4
5
6
7
8
9
void EventLoop::abortNotInLoopThread();

+typedef std::vector<Channel*> ChannelList;

bool looping; /*atomic*/
+ bool quit; /*atomic */
const pid_t threadId_;
+ boost::scoped_ptr<Poller> poller_;
+ ChannelList activeChannels_;

EventLoop::loop()有了真正的工作内容,它调用Poller::poll()获得当前活动事件的Channel列表,然后依次调用每个Channel的handleEvent()函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void EventLoop::loop() {
assert(!looping_);
assertInLoopThread();
looping_ = true;
+ quit_ = false;
while (!quit_) {
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it) {
(*it)->handleEvent();
}
}
LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}

以上几个class尽管简陋,却构成了Reactor模式的核心内容。时序图见图8-1。


Tips:

这段代码和时序图展示了一个基于Reactor模式的事件循环(Event Loop)的基本工作流程。下面是对代码和时序图的解释:

  1. 初始化
    • looping_ 标志被设置为 true,表示事件循环已经开始。
    • assertInLoopThread() 确保当前线程是事件循环所在的线程。
  2. 事件循环
    • while (!quit_) 循环是事件循环的核心,它将持续运行直到 quit_ 被设置为 true
  3. 轮询事件
    • activeChannels_.clear(); 清空上一次循环中活跃的通道列表。
    • poller_->poll(kPollTimeMs, &activeChannels_); 调用 Pollerpoll 方法来检查在 kPollTimeMs 毫秒内哪些通道上有事件发生。这个方法会填充 activeChannels_ 列表,包含所有有事件发生的通道。
  4. 处理事件
    • 循环遍历 activeChannels_ 列表,对每个通道调用 handleEvent() 方法。这是Reactor模式的关键部分,即事件分发器(Event Loop)将事件分发给相应的处理器(Channel)。
  5. 日志记录
    • 当事件循环结束时,记录一条日志信息。
  6. 时序图解释
    • EventLoop:代表事件循环,它调用 loop() 方法开始事件循环。
    • Poller:负责轮询事件,poll() 方法被调用以获取活跃的通道列表。
    • ChannelA 和 ChannelB:代表具体的通道,它们可能代表网络连接或其他类型的事件源。
    • 用户回调(user callbacks):当通道上有事件发生时,handleEvent() 方法被调用,这通常涉及到执行用户定义的回调函数来处理事件。

时序图中的箭头表示方法调用和事件的流动。例如,当 EventLoop 调用 Pollerpoll() 方法时,Poller 会填充活跃通道列表并返回给 EventLoop。然后,EventLoop 会遍历这些通道,并对每个通道调用 handleEvent() 方法,这可能会触发用户定义的回调函数。

这个模式允许系统以非阻塞的方式处理多个事件源,是网络编程中常见的一种设计模式。


我们现在可以终止事件循环,只要将quit_设为true即可,但是quit()不是立刻发生的,它会在EventLoop::loop()下一次检查while(!quit_)的时候起效。如果在非当前IO线程调用quit(),延迟可以长达数秒,将来我们可以唤醒EventLoop以缩小延时。但是quit()不是中断或signal,而是设标志,如果EventLoop::loop()正阻塞在某个调用中,quit不会立刻生效。

1
2
3
4
void EventLoop::quit() {
quit_ = true;
+ wakeup();
}

EventLoop::updateChannel()在检查断言之后调用Polle::updateChannel(),EventLoop不关心Poller是如何管理Channel列表的。

1
2
3
4
5
void EventLoop::updateChannel(Channel* channel) {
assert(channel->ownerLoop() == this);
assertInLoopThread();
poller_->updateChannel(channel);
}

有了以上的EventLoop、Poller、Channel,我们写个小程序简单地测试一下功能。s01/test3.cc用timerfd实现了一个单次触发的定时器,为S8.2的内容打下基础。这个程序利用Channel将timerfd的readable事件转发给timeout函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <sys/timerfd.h>

muduo::EventLoop* g_loop;
void timeout() {
printf("Timeout!\n");
g_loop->quit();
}

int main() {
muduo::EventLoop loop;
g_loop = &loop;
int timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
muduo::Channel channel(&loop, timerfd);
channel.setReadCallback(timeout);
channel.enableReading();
struct itimerspec howlong;
bzero(&howlong, sizeof howlong);
howlong.it_value.tv_sec = 5;
::timerfd_settime(timerfd, 0, &howlong, NULL);
loop.loop();
::close(timerfd);
}

由于poll(2)是level trigger,在timeout()中应该read() timefd,否则下次会立刻触发。在现阶段采用level trigger的好处之一是可以通过strace命令直观地看到每次poll(2)的参数列表,容易检查程序的行为。

2. TimerQueue定时器

有了前面的Reactor基础,我们可以给EventLoop加上定时器功能。传统的Reactor通过控制select(2)和poll(2)的等待时间来实现定时,而现在在Linux中有了timerfd,我们可以用和处理IO事件相同的方式来处理定时,代码的一致性更好。muduo中的backport.diff展示了传统方案。

2.1 TimerQueue class

muduo的定时器功能由三个class实现,TimerId、Timer、TimerQueue,用户只能够看到第一个class,另外两个都是内部实现细节。TimerId和Timer的实现很简单,这里就不展示源码了。

TimerQueue的接口很简单,只有两个函数addTimer()和cancel()。本节我们只实现addTimer(),cancel()的实现见后文。addTimer()是供EventLoop使用的,EventLoop会把它封装为更好用的runAt()、runAfter()、runEvery()等函数。

1
2
3
4
5
6
7
class TimerQueue : boost::noncopyable {
public:
TimerQueue(EventLoop* loop);
~TimerQueue();
TimerId addTimer(const TimerCallback& cb, Timestamp when, double interval);
// void cancel(TimerId timerId);
};

值得一提的是TimerQueue的数据结构的选择,TimerQueue需要高效地组织目前尚未到期的Timer,能快速地根据当前时间找到已经到期的Timer,也要能高效地添加和删除Timer。最简单的TimerQueue以按到期时间排好序的线性表为数据结构,muduo最早也是用这种结构。这种结构的常用操作都是线性查找,复杂度是O(N)。

另一种常用做法是二叉堆组织优先队列(libev用的是更高效的4-heap),这种做法的复杂度降为O(logN),但是C++标准库的make_heap()等函数不能高效地删除heap中间的某个元素,需要我们自己实现(令Timer记住自已在heap中的位置)。

还有一种做法是使用二叉搜索树(例如std::set/std::map),把Timer按到期时间先后排好序。操作的复杂度仍然是O(logN),不过memory locality比heap要差一些,实际速度可能略慢。但是我们不能直接用map<Timestamp, Timer*>,因为这样无法处理两个Timer到期时间相同的情况。有两个解决方案,一是用multimap或multiset,二是设法区分key。muduo现在采用的是第二种做法,这样可以避免使用不常见的multimap class。具体来说,以**pair<Timestamp, Timer*>为key**,这样即便两个Timer的到期时间相同,它们的地址也必定不同。

以下是TimerQueue的数据成员,这个结构利用了现成的容器库,实现简单,容易验证其正确性,并且性能也不错。TimerList是set而非map,因为只有key没有value。TimerQueue使用了一个Channel来观察timerfd_上的readable事件。注意TimerQueue的成员函数只能在其所属的IO线程调用,因此不必加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
typedef std::pair<Timestamp, Timer*> Entry;
typedef std::set<Entry> TimerList;

class TimerQueue : boost::noncopyable {
public:
TimerQueue(EventLoop* loop);
~TimerQueue();
void handleRead(); // called when timerfd alarms
std::vector<Entry> getExpired(Timestamp now);
void reset(const std::vector<Entry>& expired, Timestamp now);
bool insert(Timer* timer);

private:
EventLoop* loop_;
const int timerfd_;
Channel timerfdChannel_;
TimerList timers_;
};

TimerQueue的实现目前有一个不理想的地方,Timer是用裸指针管理的,需要手动delete。这里用shared_ptr似乎有点小题大做了。在C++11中,或许可以改进为unique_ptr,避免手动管理资源。

来看关键的getExpired()函数的实现,这个函数会从timers_中移除已到期的Timer,并通过vector返回它们。编译器会实施RVO优化,不必太担心性能,必要时可以像EventLoop::activeChannels_那样复用vector。注意其中哨兵值(sentry)的选取,sentry让set::lower_bound()返回的是第一个未到期的Timer的迭代器,因此断言中是<而非≤。

1
2
3
4
5
6
7
8
9
std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now) {
std::vector<Entry> expired;
Entry sentry = std::make_pair(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
TimerList::iterator it = timers_.lower_bound(sentry);
assert(it == timers_.end() || now < it->first);
std::copy(timers_.begin(), it, back_inserter(expired));
timers_.erase(timers_.begin(), it);
return expired;
}

图8-2是TimerQueue回调用户代码onTimer的时序图。

2.2 EventLoop的改动

EventLoop新增了几个方便用户使用的定时器接口,这些函数都转而调用TimerQueue::addTimer()。注意这几个EventLoop成员函数应该允许跨线程使用,比如说我想在某个IO线程中执行超时回调。这就带来线程安全性方面的问题,muduo的解决办法不是加锁,而是把对TimerQueue的操作转移到IO线程来进行,这会用到\(\S8.3\)介绍的EventLoop::runInLoop()函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) {
return timerQueue_->addTimer(cb, time, 0.0);
}

TimerId EventLoop::runAfter(double delay, const TimerCallback& cb) {
Timestamp time(addTime(Timestamp::now(), delay));
return runAt(time, cb);
}

TimerId EventLoop::runEvery(double interval, const TimerCallback& cb) {
Timestamp time(addTime(Timestamp::now(), interval));
return timerQueue_->addTimer(cb, time, interval);
}

测试代码见s02/test4.cc,这与muduo正式的用法完全一样。

3. EventLoop::runInLoop()函数

EventLoop有一个非常有用的功能:在它的IO线程内执行某个用户任务回调,即EventLoop::runInLoop(const Functor& cb),其中Functor是boost::function<void()>。如果用户在当前IO线程调用这个函数,回调会同步进行;如果用户在其他线程调用runInLoop(),cb会被加入队列,IO线程会被唤醒来调用这个Functor。

1
2
3
4
5
6
7
void EventLoop::runInLoop(const Functor& cb) {
if (isInLoopThread()) {
cb();
} else {
queueInLoop(cb);
}
}

有了这个功能,我们就能轻易地在线程间调配任务,比如说把TimerQueue的成员函数调用移到其IO线程,这样可以在不用锁的情况下保证线程安全性。

由于IO线程平时阻塞在事件循环EventLoop::loop()的poll()调用中,为了让IO线程能立刻执行用户回调,我们需要设法唤醒它。传统的办法是用pipe(2),IO线程始终监视此管道的可读事件,在需要唤醒的时候,其他线程往管道里写一个字节,这样IO线程就从IO multiplexing阻塞调用中返回。(原理类似HTTP long polling。)现在Linux有了eventfd(2),可以更高效地唤醒,因为它不必管理缓冲区。以下是EventLoop新增的成员。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private:
void abortNotInLoopThread();
+ void handleRead(); // waked up
+ void doPendingFunctors();

typedef std::vector<Channel*> ChannelList;

bool looping_; /* atomic */
bool quit_; /* atomic */
+ bool callingPendingFunctors_; /* atomic */
const pid_t threadId_;
Timestamp pollReturnTime_;
boost::scoped_ptr<Poller> poller_;

+ int wakeupFd_;
+ boost::scoped_ptr<Channel> wakeupChannel_;
ChannelList activeChannels_;
+ MutexLock mutex_;
+ std::vector<Functor> pendingFunctors_; // @BuardedBy mutex

wakeupChannel_用于处理wakeupFd_上的可读事件,将事件分发至handleRead()函数。其中只有pendingFunctors_暴露给了其他线程,因此用mutex保护。

queueInLoop()的实现很简单,将cb放入队列,并在必要时唤醒IO线程。

1
2
3
4
5
6
7
void EventLoop::queueInLoop(const Functor& cb) {
MutexLockGuard lock(mutex_);
pendingFunctors_.push_back(cb);
if (!isInLoopThread() || callingPendingFunctors_) {
wakeup();
}
}

“必要时”有两种情况,

  • 如果调用queueInLoop()的线程不是IO线程,那么唤醒是必需的;
  • 如果在IO线程调用queueInLoop,而此时正在调用pending functor,那也必须唤醒。

换句话说,只有在IO线程的事件回调中调用queueInLoop()才无须wakeup()。

看了下面doPendingFunctors()的调用时间点,想必读者就能明白为什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void EventLoop::loop() {
assert(!quit_);
assertInLoopThread();
looping_ = true;
while (!quit_) {
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it) {
(*it)->handleEvent();
}
doPendingFunctors();
}
LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}

Tips:

在解释为什么“只有在IO线程的事件回调中调用queueInLoop()才无须wakeup()”之前,我们先回顾一下queueInLoop()wakeup()函数的作用和它们在EventLoop机制中的位置。

queueInLoop()的作用

queueInLoop()函数用于将一个任务(callback)排队到EventLoop的待执行队列中。如果这个函数是在IO线程(即EventLoop所在的线程)调用的,那么它将直接将任务添加到队列中;如果从其他线程调用,它还会唤醒IO线程,以便IO线程能够处理这个新加入的任务。

wakeup()的作用

wakeup()函数用于唤醒IO线程,使其从阻塞状态(如poll()调用)返回。这通常是通过向一个特殊的文件描述符(如eventfd)写入数据来实现的,这样IO线程在检查这个文件描述符时会发现它可读,从而退出阻塞状态。

为什么在IO线程的事件回调中调用queueInLoop()无须wakeup()

  1. 避免不必要的唤醒:当queueInLoop()在IO线程内部的事件回调中被调用时,意味着IO线程已经处于运行状态,不需要额外的唤醒操作。如果此时调用wakeup(),实际上是多余的,因为IO线程已经在处理事件回调,它自然会在回调结束后检查待执行队列。

  2. 减少上下文切换:如果queueInLoop()在IO线程内部调用wakeup(),可能会导致不必要的上下文切换。因为wakeup()操作会将IO线程从阻塞状态唤醒,但如果queueInLoop()已经在IO线程中执行,这个唤醒操作就是不必要的,反而增加了上下文切换的开销。

  3. 避免竞态条件:在IO线程内部调用queueInLoop()时,不需要担心其他线程同时修改待执行队列,因为所有操作都在同一个线程(IO线程)中顺序执行,不存在竞态条件。而wakeup()操作通常是在其他线程中调用的,需要确保在唤醒IO线程时,待执行队列的状态是一致的。

  4. 提高效率:在IO线程内部调用queueInLoop()而不唤醒,可以减少唤醒操作的开销,提高整体的效率。因为唤醒操作涉及到系统调用和可能的上下文切换,这些都是相对昂贵的操作。

结论

综上所述,当queueInLoop()在IO线程的事件回调中被调用时,由于IO线程已经处于活跃状态,不需要额外的唤醒操作,因此无须调用wakeup()。这样可以避免不必要的上下文切换,减少竞态条件的风险,并提高整体的效率。


EventLoop::doPendingFunctors()不是简单地在临界区内依次调用Functor,而是把回调列表swap()到局部变量functors中,这样一方面减小了临界区的长度(意味着不会阻塞其他线程调用queueInLoop()),另一方面也避免了死锁(因为Functor可能再调用queueInLoop())。

1
2
3
4
5
6
7
8
9
10
11
12
void EventLoop::doPendingFunctors() {
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}
for (size_t i = 0; i < functors.size(); ++i) {
functors[i]();
}
callingPendingFunctors_ = false;
}

由于doPendingFunctors()调用的Functor可能再调用queueInLoop(),这时queueInLoop()就必须wakeup(),否则这些新加的cb就不能被及时调用了。muduo这里没有反复执行doPendingFunctors()直到pendingFunctors_为空,这是有意的,否则IO线程有可能陷入死循环,无法处理IO事件。

EventLoop::quit()中增加几行代码,在必要时唤醒IO线程,让它及时终止循环。思考:为什么在IO线程调用quit就不必wakeup()?

1
2
3
4
5
6
void EventLoop::quit() {
quit_ = true;
if (!isInLoopThread()) {
wakeup();
}
}

EventLoop::wakeup()和EventLoop::handleRead()分别对wakeupFd_写入数据和读出数据,代码从略。注意muduo不是在EventLoop::handleRead()中执行doPendingFunctors(),理由见相关文档。

s03/test5.cc是单线程程序,测试了runInLoop()和queueInLoop()等新函数。

3.1 提高TimerQueue的线程安全性

前面提到TimerQueue::addTimer()只能在IO线程调用,因此EventLoop::runAfter()系列函数不是线程安全的。下面这段代码在\(\S8.2\)中会crash,因为它在非IO线程调用了EventLoop::runAfter()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
muduo::EventLoop* g_loop;

void print() {} // 空函数

void threadFunc() {
g_loop->runAfter(1.0, print);
}

int main() {
muduo::EventLoop loop;
g_loop = &loop;
muduo::Thread t(threadFunc);
t.start();
loop.loop();
}

运行结果:

1
2
3
4
FATAL EventLoop::abortNotInLoopThread
EventLoop 0x7fff892d1070 was created in threadId_=17896,
current thread id = 17897 - EventLoop.cc:102
Aborted (core dumped)

借助EventLoop::runInLoop(),我们可以很容易地将TimerQueue::addTimer()做成线程安全的,而且无须用锁。办法是让addTimer()调用runInLoop(),把实际工作转移到IO线程来做。先新增一个addTimerInLoop成员函数:

1
2
3
4
5
6
7
8
9
10
typedef std::pair<Timestamp, Timer*> Entry;
typedef std::set<Entry> TimerList;

class TimerQueue : boost::noncopyable {
public:
TimerQueue(EventLoop* loop);
~TimerQueue();
void addTimerInLoop(Timer* timer);
// ...
};

然后把addTimer拆成两部分,拆分后的addTimer()只负责转发,addTimerInLoop()完成修改定时器列表的工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
TimerId TimerQueue::addTimer(const TimerCallback& cb, Timestamp when, double interval) {
Timer* timer = new Timer(cb, when, interval);
loop_->runInLoop(boost::bind(&TimerQueue::addTimerInLoop, this, timer));
return TimerId(timer);
}

void TimerQueue::addTimerInLoop(Timer* timer) {
loop_->assertInLoopThread();
bool earliestChanged = insert(timer);
if (earliestChanged) {
resetTimerfd(timerfd_, timer->expiration());
}
}

这样无论在哪个线程调用addTimer()都是安全的了,上一页的代码也能正常运行。

3.2 EventLoopThread class

  • IO线程不一定是主线程,我们可以在任何一个线程创建并运行EventLoop。
  • 一个程序也可以有不止一个IO线程,我们可以按优先级将不同的socket分给不同的IO线程,避免优先级反转。

为了方便将来使用,我们定义EventLoopThread class,这正是one looper thread的本意。

EventLoopThread会启动自己的线程,并在其中运行EventLoop::loop()。其中关键的startLoop()函数定义如下,这个函数会返回新线程中EventLoop对象的地址,因此用条件变量来等待线程的创建与运行。

1
2
3
4
5
6
7
8
9
EventLoop* EventLoopThread::startLoop() {
assert(!thread_.started());
thread_.start();
MutexLockGuard lock(mutex_);
while (loop_ == NULL) {
cond_.wait();
}
return loop_;
}

线程主函数在stack上定义EventLoop对象,然后将其地址赋值给loop_成员变量,最后notify()条件变量,唤醒startLoop()。

1
2
3
4
5
6
7
8
9
void EventLoopThread::threadFunc() {
EventLoop loop;
{
MutexLockGuard lock(mutex_);
loop_ = &loop;
cond_.notify();
}
loop.loop();
}

由于EventLoop的生命期与线程主函数的作用域相同,因此在threadFunc()退出之后这个指针就失效了。好在服务程序一般不要求能安全地退出,这应该不是什么大问题。

s03/test6.cc测试了EventLoopThread的功能,也测试了跨线程调用EventLoop::runInLoop()和EventLoop::runAfter()。

4. 实现TCP网络库

到目前为止,Reactor事件处理框架已初具规模,从本节开始我们用它逐步实现一个非阻塞TCP网络编程库。从poll(2)返回到再次调用poll(2)阻塞称为一次事件循环。图8-3值得印在脑中,它有助于理解一次循环中各种回调发生的顺序。

传统的Reactor实现一般会把timers做成循环中单独的一步,而muduo把它和IO handlers等同视之,这是使用timerfd的附带效应。将来有必要时也可以在调用IO handlers之前或之后处理timers。

后面几节的内容安排如下:

  • \(\S8.4\)介绍Acceptor class,用于accept(2)新连接。
  • \(\S8.5\)介绍TcpServer,处理新建TcpConnection。
  • \(\S8.6\)处理TcpConnection断开连接。
  • \(\S8.7\)介绍Buffer class并用它读取数据。
  • \(\S8.8\)介绍如何无阻塞发送数据。
  • \(\S8.9\)完善TcpConnection,处理SIGPIPE、TCP keepalive等。

至此,单线程TCP服务端网络编程已经基本成型,大部分muduo示例都可以运行。

4.1 Acceptor class

先定义Acceptor class,用于accept(2)新TCP连接,并通过回调通知使用者。它是内部class,供TcpServer使用,生命期由后者控制。Acceptor的接口如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Acceptor : boost::noncopyable {
public:
typedef boost::function<void (int sockfd, const InetAddress&)> NewConnectionCallback;
Acceptor(EventLoop* loop, const InetAddress& listenAddr);
void setNewConnectionCallback(const NewConnectionCallback& cb) {
newConnectionCallback_ = cb;
}
bool listening() const { return listening_; }
void listen();
private:
void handleRead();
EventLoop* loop_;
Socket acceptSocket_;
Channel acceptChannel_;
NewConnectionCallback newConnectionCallback_;
bool listening_;
};

Acceptor的数据成员包括Socket、Channel等。其中Socket是一个RAII handle封装了socket文件描述符的生命期。

  • Acceptor的socket是listening socket,即server socket
  • Channel用于观察此socket上的readable事件,并回调Acceptor::handleRead(),后者会调用accept(2)来接受新连接,并回调用户callback。

Acceptor的构造函数和Acceptor::listen()成员函数执行创建TCP服务端的传统步骤,即调用socket(2)、bind(2)、listen(2)等Sockets API,其中任何一个步骤出错都会造成程序终止,因此这里看不到错误处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr)
: loop_(loop),
acceptSocket_(sockets::createNonblockingOrDie()),
acceptChannel_(loop, acceptSocket_.fd()),
listening_(false),
acceptSocket_.setReuseAddr(true),
acceptSocket_.bindAddress(listenAddr),
acceptChannel_.setReadCallback(boost::bind(&Acceptor::handleRead, this)) {
}

void Acceptor::listen() {
loop_->assertInLoopThread();
listening_ = true;
acceptSocket_.listen();
acceptChannel_.enableReading();
}

Acceptor的接口中用到了InetAddress class,这是对struct sockaddr_in的简单封装,能自动转换字节序,代码从略。InetAddress具备值语义,是可以拷贝的。

Acceptor的构造函数用到createNonblockingOrDie()来创建非阻塞的socket,现在的Linux可以一步完成,代码如下。

1
2
3
4
5
6
7
8
int sockets::createNonblockingOrDie() {
int sockfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_TCP);
if (sockfd < 0) {
LOG_SYSFATAL << "sockets::createNonblockingOrDie";
return -1;
}
return sockfd;
}

Acceptor::listen()的最后一步让acceptChannel_在socket可读的时候调用Acceptor::handleRead(),后者会接受(accept(2))并回调newConnectionCallback_。这里直接把socket fd传给callback,这种传递int句柄的做法不够理想,在C++11中可以先创建Socket对象,再用移动语义把Socket对象std::move()给回调函数,确保资源的安全释放。

1
2
3
4
5
6
7
8
9
10
11
12
void Acceptor::handleRead() {
loop_->assertInLoopThread();
InetAddress peerAddr;
int connfd = acceptSocket_.accept(&peerAddr);
if (connfd >= 0) {
if (newConnectionCallback_) {
newConnectionCallback_(connfd, peerAddr);
} else {
sockets::close(connfd);
}
}
}

注意这里的实现没有考虑文件描述符耗尽的情况,muduo的处理办法见\(\S7.7\)。还有一个改进措施,在拿到大于或等于0的connfd之后,非阻塞地poll(2)一下看看fd是否可读写。正常情况下poll(2)会返回writable,表明connfd可用。如果poll(2)返回错误,表明connfd有问题,应该立刻关闭连接。

利用Linux新增的系统调用可以直接accept(2)一步得到非阻塞的socket。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int sockets::accept(int sockfd, struct sockaddr_in* addr) {
socklen_t addrlen = sizeof *addr;
#if VALGRIND
int connfd = ::accept(sockfd, sockaddr_cast(addr), &addrlen);
setNonBlockAndCloseOnExec(connfd);
#else
int connfd = ::accept4(sockfd, sockaddr_cast(addr), &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
#endif

if (connfd < 0) {
int savedErrno = errno;
LOG_SYSERR << "Socket::accept";
switch (savedErrno) {
// ... handle different errors ...
}
}
return connfd;
}

这里区分致命错误和暂时错误,并区别对待。对于暂时错误,例如EAGAIN、EINTR、EMFILE、ECONNABORTED等等,处理办法是忽略这次错误。对于致命错误,例如ENFILE、ENOMEM等等,处理办法是终止程序,对于未知错误也照此办理。

下面写个小程序来试验Acceptor的功能,它在9981端口侦听新连接,连接到达后向它发送一个字符串,随即断开连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void newConnection(int sockfd, const muduo::InetAddress& peerAddr) {
printf("newConnection(): accepted a new connection from %s\n", peerAddr.toHostPort().c_str());
write(sockfd, "How are you?\n", 13);
sockets::close(sockfd);
}

int main() {
printf("main(): pid=%d\n", getpid());
muduo::InetAddress listenAddr(9981);
muduo::EventLoop loop;
muduo::Acceptor acceptor(&loop, listenAddr);
acceptor.setNewConnectionCallback(newConnection);
acceptor.listen();
loop.loop();
}

5. TcpServer接受新连接

本节会介绍TcpServer并初步实现TcpConnection,本节只处理连接的建立,下一节处理连接的断开,再往后依次处理读取数据和发送数据。

TcpServer新建连接的相关函数调用顺序见图8-4(有的函数名是简写,省略了poll(2)调用)。其中Channel::handleEvent()的触发条件是listening socket可读,表明有新连接到达。TcpServer会为新连接创建对应的TcpConnection对象。

5.1 TcpServer class

TcpServer class的功能是管理accept(2)获得的TcpConnection。TcpServer是供用户直接使用的,生命期由用户控制。TcpServer的接口如下,用户只需要设置好callback,再调用start()即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class TcpServer : boost::noncopyable {
public:
TcpServer(EventLoop* loop, const InetAddress& listenAddr);
~TcpServer(); // force out-line dtor, for scoped_ptr members.
void start(); // starts the server if it's not listening.
// Starts the server if it's not listening.
// It's harmless to call it multiple times
// Thread safe.
void start();

// setconnection callback
// Not thread safe.
void setConnectionCallback(const ConnectionCallback& cb) {
connectionCallback_ = cb;
}

// Set message callback.
// Not thread safe.
void setMessageCallback(const MessageCallback& cb) {
messageCallback_ = cb;
}

private:
// Not thread safe, but in loop
void newConnection(int sockfd, const InetAddress& peerAddr);

typedef std::map<std::string, TcpConnectionPtr> ConnectionMap;

EventLoop* loop_; // the acceptor loop
const std::string name_;
boost::scoped_ptr<Acceptor> acceptor_; // avoid revealing Acceptor
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
bool started_;
int nextConnId_; // always in loop thread
ConnectionMap connections_;
};

TcpServer内部使用Acceptor来获得新连接的fd。它保存用户提供的Connection-Callback和MessageCallback,在新建TcpConnection的时候会原样传给后者。TcpServer持有目前存活的TcpConnection的shared_ptr(定义为TcpConnectionPtr),因为TcpConnection对象的生命期是模糊的,用户也可以持有TcpConnectionPtr.

每个TcpConnection对象有一个名字,这个名字是由其所属的TcpServer在创建TcpConnection对象时生成,名字是ConnectionMap的key。

  • 在新连接到达时,Acceptor会回调newConnection(),
  • 后者会创建TcpConnection对象conn,把它加入ConnectionMap,设置好callback,
  • 再调用conn->connectEstablished(),其中会回调用户提供的ConnectionCallback。代码如下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {
loop_->assertInLoopThread();
char buf[32];
snprintf(buf, sizeof buf, "#%d", nextConnId_);
++nextConnId_;
std::string connName = name_ + buf;
LOG_INFO << "TcpServer::newConnection[" << name_ << "] - new connection [" << connName << "] from " << peerAddr.toHostPort();
InetAddress localAddr(sockets::getLocalAddr(sockfd));
TcpConnectionPtr conn(new TcpConnection(loop_, connName, sockfd, localAddr, peerAddr));
connections_[connName] = conn;
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->connectEstablished();
}

注意muduo尽量让依赖是单向的,TcpServer会用到Acceptor,但Acceptor并不知道TcpServer的存在。TcpServer会创建TcpConnection,但TcpConnection并不知道TcpServer的存在。另外可以考虑改用make_shared()以节约一次new。

5.2 TcpConnection class

TcpConnection class可谓是muduo最核心也是最复杂的class,它的头文件和源文件一共有450多行,是muduo最大的class。本章会用5节的篇幅来逐渐完善它。

TcpConnection是muduo里唯一默认使用shared_ptr来管理的class,也是唯一继承enable_shared_from_this的class,这源于其模糊的生命期,原因见\(\S4.7\)

本节的TcpConnection没有可供用户使用的函数,因此接口从略,以下是其数据成员。

  • 目前TcpConnection的状态只有两个,kConnecting和kConnected,
  • TcpConnection使用Channel来获得socket上的IO事件,它会自己处理writable事件,而把readable事件通过MessageCallback传达给客户。
  • TcpConnection拥有TCP socket,它的析构函数会close(fd)(在Socket的析构函数中发生)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class TcpConnection : boost::noncopyable, 
public boost::enable_shared_from_this<TcpConnection> {
public:
// ...
private:
enum StateE { kConnecting, kConnected, };

void setState(StateE s) { state_ = s; }
void handleRead();

EventLoop* loop_;
std::string name_;
StateE state_; // FIXME: use atomic variable

// we don't expose those classes to client.
boost::scoped_ptr<Socket> socket_;
boost::scoped_ptr<Channel> channel_;
InetAddress localAddr_;
InetAddress peerAddr_;
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
};

注意TcpConnection表示的是“一次TCP连接”,它是不可再生的,一旦连接断开,这个TcpConnection对象就没啥用了。另外TcpConnection没有发起连接的功能,其构造函数的参数是已经建立好连接的socketfd(无论是TcpServer被动接受还是TcpClient主动发起),因此其初始状态是kConnecting。

本节的MessageCallback定义很原始,没有使用Buffer class,而只是把(const char* buf,int len)传给用户,这种接口用起来无疑是很不方便的。

1
2
3
4
5
6
void TcpConnection::handleRead() {
char buf[65536];
ssize_t n = ::read(channel_->fd(), buf, sizeof buf);
messageCallback_(shared_from_this(), buf, n);
// FIXME: close connection if n == 0
}

本节的TcpConnection只处理了建立连接,没有处理断开连接(例如handleRead()中的read(2)返回0),接收数据的功能很简陋,也不支持发送数据,这些都将逐步得到完善。

s05/test8.cc试验了目前实现的功能,它实际上是个discard服务。但目前它永远不会关闭socket,即永远不会走到else分支,在遇到对方断开连接的时候会陷入busy loop。\(\S8.6\)会处理连接的断开。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void onConnection(const muduo::TcpConnectionPtr& conn) {
if (conn->connected()) {
printf("onConnection(): new connection [%s] from %s\n",
conn->name().c_str(),
conn->peerAddress().toHostPort().c_str());
} else {
printf("onConnection(): connection [%s] is down\n",
conn->name().c_str());
}
}

void onMessage(const muduo::TcpConnectionPtr& conn,
const char* data, ssize_t len) {
printf("onMessage(): received %zd bytes from connection [%s]\n",
len, conn->name().c_str());
}

int main() {
printf("main(): pid=%d\n", getpid());
muduo::InetAddress listenAddr(9981);
muduo::EventLoop loop;
muduo::TcpServer server(&loop, listenAddr);
server.setConnectionCallback(onConnection);
server.setMessageCallback(onMessage);
server.start();
loop.loop();
}

6. TcpConnection断开连接

muduo只有一种关闭连接的方式:被动关闭(p.191)。即对方先关闭连接,本地read(2)返回0,触发关闭逻辑。将来如果有必要也可以给TcpConnection新增forceclose()成员函数,用于主动关闭连接,实现很简单,调用handleclose()即可。处理调用的流程见图8-5,其中的“x”表示TcpConnection通常会在此时析构。

一般来讲数据的删除比新建要复杂,TCP连接也不例外。关闭连接的流程看上去有点“绕”,根本原因是p.274讲的对象生命期管理的需要。

6.1 Channel的改动

Channel class新增了closeCallback事件回调,并且断言(assert())在事件处理期间本Channel对象不会析构,即不会发生p.274讲的出错情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class Channel : boost::noncopyable {
public:
typedef boost::function<void()> EventCallback;
typedef boost::function<void(Timestamp)> ReadEventCallback;
Channel(EventLoop* loop, int fd);
~Channel();
void handleEvent(Timestamp receiveTime);
void setReadCallback(const ReadEventCallback& cb) { readCallback_ = cb; }
void setWriteCallback(const EventCallback& cb) { writeCallback_ = cb; }
void setCloseCallback(const EventCallback& cb) { closeCallback_ = cb; }
void setErrorCallback(const EventCallback& cb) { errorCallback_ = cb; }
int fd() const { return fd_; }
void enableReading() { events_ |= kReadEvent; update(); }
void enableWriting() { events_ |= kWriteEvent; update(); }
void disableWriting() { events_ &= ~kWriteEvent; update(); }
void disableAll() { events_ = kNoneEvent; update(); }
bool isWriting() const { return events_ & kWriteEvent; }
private:
void update();
static const int kNoneEvent;
static const int kReadEvent;
static const int kWriteEvent;
EventLoop* loop_;
const int fd_;
int events_;
int revents_;
int index_; // used by Poller.
ReadEventCallback readCallback_;
EventCallback writeCallback_;
EventCallback closeCallback_;
EventCallback errorCallback_;
};

TcpConnection的改动

TcpConnection class也新增了closeCallback事件回调,但是这个回调是给TcpServer和TcpClient用的,用于通知它们移除所持有的TcpConnectionPtr,这不是给普通用户用的,普通用户继续使用ConnectionCallback。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class TcpConnection : boost::noncopyable,
public boost::enable_shared_from_this<TcpConnection> {
public:
// ...
private:
enum StateE { kConnecting, kConnected, kDisconnecting, kDisconnected };
void setState(StateE s) { state_ = s; }
void handleRead(Timestamp receiveTime);
void handleWrite();
void handleClose();
void handleError();
void sendInLoop(const std::string& message);
void shutdownInLoop();
EventLoop* loop_;
std::string name_;
StateE state_; // FIXME: use atomic variable
boost::scoped_ptr<Socket> socket_;
boost::scoped_ptr<Channel> channel_;
InetAddress localAddr_;
InetAddress peerAddr_;
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
closeCallback closeCallback_;
Buffer inputBuffer_;
Buffer outputBuffer_;
};

TcpConnection把另外几个handle*()事件处理函数也补上了,handleWrite()暂时为空。Channel的closeCallback会调用TcpConnection::handleClose(),依此类推。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void TcpConnection::handleRead(Timestamp receiveTime) {
char buf[65536];
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0) {
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
} else if (n == 0) {
handleClose();
} else {
errno = savedErrno;
LOG_SYSERR << "TcpConnection::handleRead";
handleError();
}
}

// TcpConnection::handleClose()的主要功能是调用closeCallback,这个回调绑定到TcpServer::removeconnection()
void TcpConnection::handleClose() {
loop_->assertInLoopThread();
LOG_TRACE << "TcpConnection::handleClose state=" << state_;
assert(state_ == kConnected);
channel_->disableAll();
closeCallback_(shared_from_this());
}

void TcpConnection::handleError() {
int err = sockets::getSocketError(channel_->fd());
LOG_ERROR << "TcpConnection::handleError [" << name_ << "] - SO_ERROR = " << err << " " << strerror(err);
}

TcpConnection::connectDestroyed()是TcpConnection析构前最后调用的一个成员函数,它通知用户连接已断开。其中的channel_->disableAll();与上面的channel_->disableAll();重复,这是因为在某些情况下可以不经由handleClose()面直接调用connectDestroyed。

1
2
3
4
5
6
7
8
void TcpConnection::connectDestroyed() {
loop_->assertInLoopThread();
assert(state_ == kConnected);
setState(kDisconnected);
channel_->disableAll();
connectionCallback_(shared_from_this());
loop_->removeChannel(get_pointer(channel_));
}

6.2 TcpServer的改动

TcpServer向TcpConnection注册closeCallback,用于接收连接断开的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {
loop_->assertInLoopThread();
char buf[32];
snprintf(buf, sizeof buf, "#%d", nextConnId_);
++nextConnId_;
std::string connName = name_ + buf;
LOG_INFO << "TcpServer::newConnection [" << name_ << "] - new connection [" << connName << "] from " << peerAddr.toHostPort();
InetAddress localAddr(sockets::getLocalAddr(sockfd));
TcpConnectionPtr conn(new TcpConnection(loop_, connName, sockfd, localAddr, peerAddr));
connections_[connName] = conn;
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setCloseCallback(boost::bind(&TcpServer::removeConnection, this, _1));
conn->connectEstablished();
}

void TcpServer::removeConnection(const TcpConnectionPtr& conn) {
loop_->assertInLoopThread();
LOG_INFO << "TcpServer::removeConnection [" << name_ << "] - connection " << conn->name();
size_t n = connections_.erase(conn->name());
assert(n == 1); (void)n;
loop_->queueInLoop(boost::bind(&TcpConnection::connectDestroyed, conn));
}

通常TcpServer的生命期长于它建立的TcpConnection,因此不用担心TcpServer对象失效。在muduo中,TcpServer的析构函数会关闭连接,因此也是安全的。

TcpServer:removeConnection()把conn从ConnectionMap中移除。这时Tcp-Connection已经是命悬一线:如果用户不持有TcpConnectionPtr的话,conn的引用计数已降到1。注意这里一定要用EventLoop:queueInLoop(),否则就会出现p.274讲的对象生命期管理问题。另外注意这里用boost::bind让TcpConnection的生命期长到调用connectDestroyed()的时刻

有兴趣的读者可以单步跟踪连接断开的流程,s06/test8.cc不会陷入busyloop。目前的做法不是最简洁的,但是可以几乎原封不动地用到多线程TcpServer中(\(\S8.10\))。

6.3 EventLoop和Poller的改动

本节TcpConnection不再是只生不灭,因此要求EventLoop也提供unregister功能。EventLoop新增了removeChannel()成员函数,它会调用Poller::removeChannel,后者定义如下,复杂度为O(logN)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void Poller::removeChannel(Channel* channel) {
assertInLoopThread();
LOG_TRACE << "fd=" << channel->fd();
assert(channels_.find(channel->fd()) != channels_.end());
assert(channels_[channel->fd()] == channel);
assert(channel->isNoneEvent());
int idx = channel->index();
assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
const struct pollfd& pfd = pollfds_[idx]; (void)pfd;
assert(pfd.fd == -channel->fd() - 1 && pfd.events == channel->events());
size_t n = channels_.erase(channel->fd());
assert(n == 1); (void)n;
if (implicit_cast<size_t>(idx) == pollfds_.size() - 1) {
pollfds_.pop_back();
} else {
int channelAtEnd = pollfds_.back().fd;
iter_swap(pollfds_.begin() + idx, pollfds_.end() - 1);
if (channelAtEnd >= 0) {
channelAtEnd = -channelAtEnd - 1;
}
channels_[channelAtEnd]->set_index(idx);
pollfds_.pop_back();
}
}

注意其中从数组pollfds_中删除元素是O(1)复杂度,办法是将待删除的元素与最后一个元素交换,再pollfds_.pop_back()。这需要相应地修改p.286的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void Poller::updateChannel(Channel* channel) {
assertInLoopThread();
LOG_TRACE << "fd=" << channel->fd();
assert(channels_.find(channel->fd()) != channels_.end());
assert(channels_[channel->fd()] == channel);
int idx = channel->index();
assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
struct pollfd& pfd = pollfds_[idx];
assert(pfd.fd == -channel->fd() - 1 && pfd.events == channel->events());
pfd.events = static_cast<short>(channel->events());
pfd.revents = 0;
if (channel->isNoneEvent()) {
// ignore this pollfd
pfdfd = -channel->fd() - 1;
}assert(channels_[channel->fd()] == channel);
int idx = channel->index();
assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
struct pollfd& pfd = pollfds_[idx];
assert(pfd.fd == -channel->fd() - 1 && pfd.events == channel->events());
pfd.events = static_cast<short>(channel->events());
pfd.revents = 0;
if (channel->isNoneEvent()) {
// ignore this pollfd
pfdfd = -channel->fd() - 1;
}
}

7. Buffer读取数据

Buffer是非阻塞TCP网络编程必不可少的东西\(\S7.4\)),本节介绍用Buffer来处理数据输入,下一节介绍数据输出。Buffer是另一个具有值语义的对象。

首先修改s07/Callbacks.h中MessageCallback的定义,现在的参数和muduo一样,是Buffer*和Timestamp,不再是原始的(const char* buf,int len)。

1
2
3
typedef boost::function<void (const TcpConnectionPtr&, 
Buffer* buf,
Timestamp)> MessageCallback;

其中Timestamp是poll(2)返回的时刻,即消息到达的时刻,这个时刻早于读到数据的时刻(read(2)调用或返回)。因此如果要比较准确地测量程序处理消息的内部延迟,应该以此时刻为起点,否则测出来的结果偏小,特别是处理并发连接时效果更明显。(为什么?)为此我们需要修改Channel中ReadEventCallback的原型,改动如下。EventLoop::loop()也需要有相应的改动,此处从略。

1
2
3
4
5
6
7
8
9
class Channel : boost::noncopyable {
public:
typedef boost::function<void()> EventCallback;
+typedef boost::function<void(Timestamp)> ReadEventCallback;
Channel(EventLoop* loop, int fd);
~Channel();
void handleEvent(Timestamp receiveTime);
void setReadCallback(const ReadEventCallback& cb) { readCallback_ = cb; }
};

s07/test3.cc试验了以上改动:

1
2
3
4
5
6
7
8
9
10
11
void timeout(muduo::Timestamp receiveTime) {
printf("%s Timeout!\n", receiveTime.toFormattedString().c_str());
g_loop->quit();
}

int main() {
printf("%s started\n", muduo::Timestamp::now().toFormattedString().c_str());
muduo::EventLoop loop;
g_loop = &loop;
// ... 省略部分代码 ...
}

7.1 TcpConnection使用Buffer作为输入缓冲

先给TcpConnection添加inputBuffer_成员变量。

1
2
3
4
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
closeCallback closeCallback_;
+Buffer inputBuffer_;

然后修改TcpConnection::handleRead()成员函数,使用Buffer来读取数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
void TcpConnection::handleRead(Timestamp receiveTime) {
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd, &savedErrno);
if (n > 0) {
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
} else if (n == 0) {
handleClose();
} else {
errno = savedErrno;
LOG_SYSERR << "TcpConnection::handleRead";
handleError();
}
}

修改s07/test8.cc以试验本次改动后的新功能。

1
2
3
4
5
6
7
void onMessage(const muduo::TcpConnectionPtr& conn, muduo::Buffer* buf, muduo::Timestamp receiveTime) {
printf("onMessage(): received %zd bytes from connection [%s] at %s\n",
buf->readableBytes,
conn->name().c_str(),
receiveTime.toFormattedString().c_str());
printf("onMessage():[%s]\n", buf->retrieveAsString().c_str());
}

这个测试程序看上去和muduo的正式用法没有区别。

7.2 Buffer::readFd()

我在p.208提到Buffer读取数据时兼顾了内存使用量和效率,其实现如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ssize_t Buffer::readFd(int fd, int* savedErrno) {
char extrabuf[65536];
struct iovec vec[2];
const size_t writable = writableBytes();
vec[0].iov_base = begin() + writerIndex_;
vec[0].iov_len = writable;
vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof extrabuf;
const ssize_t n = readv(fd, vec, 2);
if (n < 0) {
*savedErrno = errno;
} else if (implicit_cast<size_t>(n) <= writable) {
writerIndex_ += n;
} else {
writerIndex_ = buffer_.size;
append(extrabuf, n - writable);
}
return n;
}

这个实现有几点值得一提。

  • 一是使用了scatter/gather IO,并且一部分缓冲区取自stack,这样输入缓冲区足够大,通常一次readv(2)调用就能取完全部数据。由于输入缓冲区足够大,也节省了一次ioctl(socketFd,FIONREAD,&length)系统调用,不必事先知道有多少数据可读而提前预留(reserve())Buffer的capacity(),可以在一次读取之后将extrabuf中的数据append()给Buffer。
  • 二是Buffer::readFd()只调用一次read(2),而没有反复调用read(2)直到其返回EAGAIN。
    • 首先,这么做是正确的,因为muduo采用level trigger,这么做不会丢失数据或消息。
    • 其次,对追求低延迟的程序来说,这么做是高效的,因为每次读数据只需要一次系统调用。
    • 再次,这样做照顾了多个连接的公平性,不会因为某个连接上数据量过大而影响其他连接处理消息。

假如muduo采用edge trigger,那么每次handleRead()至少调用两次read(2),平均起来比level trigger多一次系统调用,edge trigger不见得更高效。

将来的一个改进措施是:如果n = writable + sizeof extrabuf,就再读一次。

在一个不繁忙(没有出现消息堆积)的系统上,程序一般等待在poll(2)上,一有数据到达就会立刻唤醒应用程序来读取,那么每次read的数据不会超过几KiB(一两个以太网frame),这里64KiB缓冲足够容纳千兆网在500us内全速收到的数据,在一定意义下可视为延迟带宽积(bandwidth-delay product)。

8. TcpConnection发送数据

发送数据比接收数据更难,因为发送数据是主动的,接收读取数据是被动的。这也是本章先介绍TcpServer后介绍TcpClient的原因。到目前为止,我们只用到了Channel的ReadCallback:

  • TimerQueue用它来读timerfd(2);
  • EventLoop用它来读eventfd(2);
  • TcpServer/Acceptor用它来读listening socket;
  • TcpConnection用它来读普通TCP socket。

本节会动用其writeCallback,由于muduo采用level trigger,因此我们只在需要时才关注writable事件,否则就会造成busy loop。s08/Channel.h的改动如下:

1
2
3
4
5
void enableReading() { events_ |= kReadEvent; update(); }
void enableWriting() { events_ |= kWriteEvent; update(); }
void disableWriting() { events_ &= ~kWriteEvent; update(); }
void disableAll() { events_ = kNoneEvent; update(); }
+bool isWriting() const { return events_ & kWriteEvent; }

TcpConnection的接口中增加了send()和shutdown两个函数,这两个函数都可以跨线程调用。为了简单起见,本章只提供一种send()重载。

1
2
3
4
5
+//void send(const void* message, size_t len);
+// Thread safe.
+void send(const std::string& message);
+// Thread safe.
+void shutdown();

TcpConnection的状态增加到了4个,和目前muduo的实现一致。

1
enum StateE { kConnecting, kConnected, kDisconnecting, kDisconnected, };

其内部实现增加了两个InLoop成员函数,对应前面的两个新接口函数,并使用Buffer作为输出缓冲区。

1
2
3
4
5
6
void handleClose();
void handleError();
+void sendInLoop(const std::string& message);
+void shutdownInLoop();
Buffer inputBuffer_;
+Buffer outputBuffer_;

TcpConnection有一个非常简单的状态图(见图8-6)。

TcpConnection在关闭连接的过程中与其他操作(读写事件)的交互比较复杂,尚需完备的单元测试来验证各种时序下的正确性。必要时可能要新增状态。

shutdown()是线程安全的,它会把实际工作放到shutdownInLoop()中来做,后者保证在IO线程调用。如果当前没有正在写入,则关闭写入端(P.191)。代码注释给出了两个值得改进的地方。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void TcpConnection::shutdown() {
// FIXME: use compare and swap
if (state_ == kConnected) {
setState(kDisconnecting);
// FIXME: shared_from_this()?
loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
}
}

void TcpConnection::shutdownInLoop() {
loop_->assertInLoopThread();
if (!channel_->isWriting())
socket_->shutdownWrite();
}

由于新增了kDisconnecting状态,TcpConnection::connectDestroyed()和TcpConnection::handleClose()中的assert也需要相应的修改,代码从略。

send()也是一样的,如果在非IO线程调用,它会把message复制一份,传给IO线程中的sendInLoop()来发送。这么做或许有轻微的效率损失,但是线程安全性很容易验证,我认为还是利大于弊。如果真的在乎这点性能,不如让程序只在IO线程调用send()。另外在C++11中可以使用移动语义,避免内存拷贝的开销。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
void TcpConnection::send(const std::string& message) {
if (state_ == kConnected) {
if (loop_->isInLoopThread())
sendInLoop(message);
else
loop_->runInLoop(
boost::bind(&TcpConnection::sendInLoop, this, message));
}
}

void TcpConnection::sendInLoop(const std::string& message) {
loop_->assertInLoopThread();
ssize_t nwrote = 0;
// if nothing in output queue, try writing directly
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
nwrote = ::write(channel_->fd(), message.data(), message.size());
if (nwrote >= 0) {
if (implicit_cast<size_t>(nwrote) < message.size()) {
LOG_TRACE << "I am going to write more data";
}
} else {
nwrote = 0;
if (errno != EWOULDBLOCK) {}
LOG_SYSERR << "TcpConnection::sendInLoop";
}
}
}
assert(nwrote >= 0);
if (implicit_cast<size_t>(nwrote) < message.size()) {
outputBuffer_.append(message.data() + nwrote, message.size() - nwrote);
if (!channel_->isWriting()) {
channel_->enableWriting();
}
}
}

sendInLoop()会先尝试直接发送数据,如果一次发送完毕就不会启用WriteCallback;如果只发送了部分数据,则把剩余的数据放入outputBuffer_,并开始关注writable事件,以后在handlerWrite()中发送剩余的数据。如果当前outputBuffer_已经有待发送的数据,那么就不能先尝试发送了,因为这会造成数据乱序。

当socket变得可写时,Channel会调用TcpConnection::handleWrite(),这里我们继续发送outputBuffer_中的数据。一旦发送完毕,立刻停止观察writable事件,避免busy loop。另外如果这时连接正在关闭,则调用shutdownInLoop,继续执行关闭过程。这里不需要处理错误,因为一旦发生错误,handleRead()会读到0字节,继而关闭连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void TcpConnection::handleWrite() {
loop_->assertInLoopThread();
if (channel_->isWriting()) {
ssize_t n = ::write(channel_->fd(), outputBuffer_.peek(),
outputBuffer_.readableBytes());
if (n > 0) {
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes() == 0) {
channel_->disableWriting();
} else {
LOG_TRACE << "I am going to write more data";
}
} else {
LOG_SYSERR << "TcpConnection::handleWrite";
}
} else {
LOG_TRACE << "Connection is down, no more writing";
}
}

注意sendInLoop()和handleWrite()都只调用了一次write(2)而不会反复调用直至它返回EAGAIN,原因是如果第一次write(2)没有能够发送完全部数据的话,第二次调用write(2)几乎肯定会返回EAGAIN。读者可以很容易用下面的Python代码来验证这一点。因此muduo决定节省一次系统调用,这么做不影响程序的正确性,却能降低延迟。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#!/usr/bin/python
import socket, sys
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((remote_hostname, 9876)) # 这里最好连接到网络上的一台机器
sock.setblocking(0)
a = 'a' * int(sys.argv[1])
# 两条消息的长度由命令行给出,a应该足够大
b = b'*' * int(sys.argv[2])
n1 = sock.send(a) # 第一次发送
n2 = 0
try:
n2 = sock.send(b) # 第二次发送,遇到EAGAIN会抛socket.error异常
except socket.error as ex:
print(ex) # socket.error: [Errno 11] Resource temporarily unavailable
print(n1)
print(n2)
sock.close()

一个改进措施:TcpConnection的输出缓冲区不必是连续的(outputBuffer_改成ptr_vector<Buffer>),handleWrite()可以用writev(2)来发送多块数据,这样或许能减小内存拷贝的次数,略微提高性能(但这种性能提高不一定能被外界感知)。

在level trigger模式中,数据的发送比较麻烦,因为不能一直关注writable事件,不过数据的读取很简单。我认为理想的做法是对readable事件采用level trigger,对writable事件采用edge trigger,但是目前Linux不支持这种设定。

s08/test9.cc是echo server(S6.4.2),代码从略。s08/test10.cc试验TcpConnection::send()的功能,它和前面的Python示例相近,都是通过命令行指定两条消息的大小,然后连续发送两条消息。通过选择不同的消息长度,可以试验不同的code path。

1
2
3
4
5
6
7
8
9
10
11
12
13
void onConnection(const muduo::TcpConnectionPtr& conn) {
if (conn->connected()) {
printf("onConnection(): new connection [%s] from %s\n",
conn->name().c_str(),
conn->peerAddress().toHostPort().c_str());
conn->send(message1);
conn->shutdown();
conn->send(message2);
} else {
printf("onConnection(): connection [%s] is down\n",
conn->name().c_str());
}
}

9. 完善TcpConnection

至此TcpConnection的主体功能接近完备,可以应付大部分muduo示例的需求了。本节补充几个小功能,让它成为可以实用的单线程非阻塞TCP网络库。

9.1 SIGPIPE

SIGPIPE的默认行为是终止进程,在命令行程序中这是合理的,在网络编程中,这意味着如果对方断开连接而本地继续写入的话,会造成服务进程意外退出。假如服务进程繁忙,没有及时处理对方断开连接的事件,就有可能出现在连接断开之后继续发送数据的情况。下面这个例子模拟了这种情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
void onConnection(const muduo::TcpConnectionPtr& conn) {
if (conn->connected()) {
printf("onConnection(): new connection [%s] from %s\n",
conn->name().c_str(),
conn->peerAddress().toHostPort().c_str());
if (sleepSeconds > 0) {
::sleep(sleepSeconds);
}
conn->send(message1);
conn->send(message2);
conn->shutdown();
}
}

假设sleepSeconds是5秒,用nc localhost 9981创建连接之后立刻Ctrl-C断开客户端,服务进程过几秒就会退出。解决办法很简单,在程序开始的时候就忽略SIGPIPE,可以用C++全局对象做到这一点。

1
2
3
4
5
6
7
8
class IgnoreSigPipe {
public:
IgnoreSigPipe() {
::signal(SIGPIPE, SIG_IGN);
}
};

IgnoreSigPipe initobj;

9.2 TCPNoDelay和TCPkeepalive

TCPNoDelay和TCPkeepalive都是常用的TCP选项,

  • 前者的作用是禁用Nagle算法,避免连续发包出现延退,这对编写低延迟网络服务很重要。
  • 后者的作用是定期探查TCP连接是否还存在。一般来说如果有应用层心跳的话,TCPkeepalive不是必需的,但是一个通用的网络库应该暴露其接口。

以下是TcpConnection::setTcpNoDelay()的实现,涉及3个文件。

1
void setTcpNoDelay(bool on);
1
2
3
void TcpConnection::setTcpNoDelay(bool on) {
socket_->setTcpNoDelay(on);
}
1
2
3
4
5
void Socket::setTcpNoDelay(bool on) {
int optval = on ? 1 : 0;
::setsockopt(sockfd_, IPPROTO_TCP,
TCP_NODELAY, &optval, sizeof optval);
}

TcpConnection::setKeepAlive()的实现与之类似,此处从略,可参考muduo源码。

9.3 WriteCompleteCallback和HighWaterMarkCallback

非阻塞网络编程的发送数据比读取数据要困难得多:

  • 一方面是\(\S8.8\)提到的“什么时候关注writable事件”的问题,这只带来编码方面的难度;
  • 另一方面是如果发送数据的速度高于对方接收数据的速度,会造成数据在本地内存中堆积,这带来设计及安全性方面的难度。

muduo对此的解决办法是提供两个回调,有的网络库把它们称为“高水位回调”和“低水位回调”,muduo使用HighWaterMarkCallback和WriteCompleteCallback这两个名字。WriteCompleteCallback很容易理解,如果发送缓冲区被清空,就调用它。TcpConnection有两处可能触发此回调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
void TcpConnection::sendInLoop(const std::string& message) {
loop_->assertInLoopThread();
ssize_t nwrote = 0;
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
nwrote = ::write(channel_->fd(), message.data(), message.size());
if (nwrote >= 0) {
if (implicit_cast<size_t>(nwrote) < message.size()) {
LOG_TRACE << "I am going to write more data";
} else if (writeCompleteCallback_) {
loop_->queueInLoop(
boost::bind(writeCompleteCallback_, shared_from_this()));
}
} else {
nwrote = 0;
}
}
}

void TcpConnection::handleWrite() {
loop_->assertInLoopThread();
if (channel_->isWriting()) {
ssize_t n = ::write(channel_->fd(),
outputBuffer_.peek(),
outputBuffer_.readableBytes());
if (n > 0) {
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes() == 0) {
channel_->disableWriting();
if (writeCompleteCallback_) {
loop_->queueInLoop(
boost::bind(writeCompleteCallback_, shared_from_this()));
}
}
if (state_ == kDisconnecting) {
shutdownInLoop();
}
} else {
LOG_SYSERR << "TcpConnection::handleWrite";
} else if (callingExpiredTimers_) {
cancelingTimers_.insert(timer);
}
}
}

TcpConnection和TcpServer也需要相应地暴露WriteCompleteCallback的接口,代码从略。

s09/test11.cc是chargen服务(\(\S7.1\)),用到了writeCompleteCallback,代码从略。

另外一个有用的callback是HighWaterMarkCallback,如果输出缓冲的长度超过用户指定的大小,就会触发回调(只在上升沿触发一次)。代码见muduo,此处从略。

如果用非阻塞的方式写一个proxy,proxy有C和S两个连接(\(\S7.13\))。只考虑server发给client的数据流(反过来也是一样),为了防止server发过来的数据撑爆C的输出缓冲区,一种做法是在C的HighWaterMarkCallback中停止读取S的数据,而在C的WriteCompleteCallback中恢复读取S的数据。这就跟用粗水管往水桶里灌水,用细水管从水桶中取水一个道理,上下两个水龙头要轮流开合,类似PWM。

10. 多线程TcpServer

本章的最后几节介绍三个主题:多线程TcpServer、TcpClient、epoll(4),主题之间相互独立。

本节介绍多线程TcpServer,用到了EventLoopThreadPool class。

10.1 EventLoopThreadPool

用one loop per thread的思想实现多线程TcpServer的关键步骤是在新建TcpConnection时从event loop pool里挑选一个loop给TcpConnection用。也就是说多线程TcpServer自己的EventLoop只用来接受新连接,而新连接会用其他EventLoop来执行IO。(单线程TcpServer的EventLoop是与TcpConnection共享的。)muduo的event loop pool由EventLoopThreadPool class表示,接口如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class EventLoopThreadPool : boost::noncopyable {
public:
EventLoopThreadPool(EventLoop* baseLoop);
~EventLoopThreadPool();
void setThreadNum(int numThreads) { numThreads_ = numThreads; }
void start();
EventLoop* getNextLoop();

private:
EventLoop* baseLoop_;
bool started_;
int numThreads_;
int next_; // always in loop thread
boost::ptr_vector<EventLoopThread> threads_;
std::vector<EventLoop*> loops_;
};

TcpServer每次新建一个TcpConnection就会调用getNextLoop()来取得EventLoop,如果是单线程服务,每次返回的都是baseLoop,即TcpServer自己用的那个loop。其中setThreadNum()的参数的意义见TcpServer代码注释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class TcpServer : boost::noncopyable {
public:
TcpServer(EventLoop* loop, const InetAddress& listenAddr);
-TcpServer(); // force out-line dtor, for scoped_ptr members.
void setThreadNum(int numThreads);
private:
void newConnection(int sockfd, const InetAddress& peerAddr);
void removeConnection(const TcpConnectionPtr& conn);
void removeConnectionInLoop(const TcpConnectionPtr& conn);

typedef std::map<std::string, TcpConnectionPtr> ConnectionMap;
EventLoop* loop_; // the acceptor loop
const std::string name_;
boost::scoped_ptr<Acceptor> acceptor_; // avoid revealing Acceptor
boost::scoped_ptr<EventLoopThreadPool> threadPool_;
};

多线程TcpServer的改动很简单,新建连接只改了3行代码。原来是把TcpServer自用的loop_传给TcpConnection,现在是每次从EventLoopThreadPool取得ioLoop。

1
2
3
4
5
6
7
8
9
10
11
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {
InetAddress localAddr(sockets::getLocalAddr(sockfd));
EventLoop* ioLoop = threadPool_->getNextLoop();
TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));
connections_[connName] = conn;
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(boost::bind(&TcpServer::removeConnection, this, -1)); // FIXME: unsafe
ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
}

连接的销毁也不复杂,把原来的removeConnection()拆为两个函数,因为TcpConnection会在自己的ioLoop线程调用removeConnection(),所以需要把它移到TcpServer的loop_线程(因为TcpServer是无锁的)。再次把connectDestroyed()移到TcpConnection的ioLoop线程进行,是为了保证TcpConnection的ConnectionCallback始终在其ioLoop回调,方便客户端代码的编写。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void TcpServer::removeConnection(const TcpConnectionPtr& conn) {
loop_->runInLoop(
boost::bind(&TcpServer::removeConnectionInLoop, this, conn));
}

void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn) {
loop_->assertInLoopThread();
LOG_INFO << "TcpServer::removeConnectionInLoop [" << name
<< "] - connection " << conn->name();
size_t n = connections_.erase(conn->name());
assert(n == 1); (void)n;
EventLoop* ioLoop = conn->getLoop();
ioLoop->queueInLoop(boost::bind(&TcpConnection::connectDestroyed, conn));
}

总而言之,TcpServer和TcpConnection的代码都只处理单线程的情况(甚至都没有mutex成员),而我们借助EventLoop::runInLoop并引入EventLoopThreadPool让多线程TcpServer的实现易如反掌。注意ioLoop和loop_间的线程切换都发生在连接建立和断开的时刻,不影响正常业务的性能。

muduo目前采用最简单的round-robin算法来选取pool中的EventLoop,不允许TcpConnection在运行中更换EventLoop,这对长连接和短连接服务都是适用的,不易造成偏载。muduo目前的设计是每个TcpServer有自己的EventLoopThreadPool,多个TcpServer之间不共享EventLoopThreadPool。将来如果有必要,也可以多个TcpServer共享EventLoopThreadPool,比方说一个服务有多个等价的TCP端口,每个TcpServer负责一个端口,而来自这些端口的连接共享一个EventLoopThreadPool。另外一种可能的用法是一个EventLoop供两个TcpServer使用(a和b)。其中a是单线程服务,aLoop既要accept(2)连接也要执行IO;而b是多线程服务,有自己的EventLoopThreadPool,只用aLoop来accept(2)连接。aLoop上还可以运行几个TcpClient。这些搭配都是可行的,这也正是EventLoop的灵活性所在,可以根据需要在多个线程间调配负载。

本节更新了test8~test11,均支持多线程。

11. Connector

主动发起连接比被动接受连接要复杂一些,一方面是错误处理麻烦,另一方面是要考虑重试。在非阻塞网络编程中,发起连接的基本方式是调用connect(2),当socket变得可写时表明连接建立完毕。当然这其中要处理各种类型的错误,因此我们把它封装为Connector class。接口如下:

1
2
3
4
5
6
7
8
9
10
11
12
class Connector : boost::noncopyable {
public:
typedef boost::function<void (int sockfd)> NewConnectionCallback;
Connector(EventLoop* loop, const InetAddress& serverAddr);
-Connector();
void setNewConnectionCallback(const NewConnectionCallback& cb) {
newConnectionCallback_ = cb;
}
void start();
void restart(); // must be called in loop thread
void stop(); // can be called in any thread
};

Connector只负责建立socket连接,不负责创建TcpConnection,它的NewConnectionCallback回调的参数是socket文件描述符。以下是一个简单的测试(s11/test12.cc),它会反复尝试直至成功建立连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
muduo::EventLoop* g_loop;
void connectCallback(int sockfd) {
printf("connected.\n");
g_loop->quit();
}

int main(int argc, char* argv[]) {
muduo::EventLoop loop;
g_loop = &loop;
muduo::InetAddress addr("127.0.0.1", 9981);
muduo::ConnectorPtr connector(new muduo::Connector(&loop, addr));
connector->setNewConnectionCallback(connectCallback);
connector->start();

loop.loop();
}

Connector的实现有几个难点:

  • socket是一次性的,一旦出错(比如对方拒绝连接),就无法恢复,只能关闭重来。但Connector是可以反复使用的,因此每次尝试连接都要使用新的socket文件描述符和新的Channel对象。要留意Channel对象的生命期管理,并防止socket文件描述符泄漏。
  • 错误代码与accept(2)不同,EAGAIN是真的错误,表明本机ephemeral port暂时用完,要关闭socket再延期重试。“正在连接”的返回码是EINPROGRESS。
  • 另外,即便出现socket可写,也不一定意味着连接已成功建立,还需要用getsockopt(sockfd, SOL_SOCKET, SO_ERROR, :.)再次确认一下。
  • 重试的间隔应该逐渐延长,例如0.5s、1s、2s、4s,直至30s,即back-off。这会造成对象生命期管理方面的困难,如果使用EventLoop::runAfter()定时而Connector在定时器到期之前析构了怎么办?本节的做法是在Connector的析构函数中注销定时器。
  • 要处理自连接(self-connection)。出现这种状况的原因如下。在发起连接的时候,TCP/IP协议栈会先选择source IP和source port,在没有显式调用bind(2)的情况下,source IP由路由表确定,source port由TCP/IP协议栈从local port range°中选取尚未使用的port(即ephemeral port)。如果destination IP正好是本机,而destination port位于local port range,且没有服务程序监听的话,ephemeral port可能正好选中了destination port,这就出现(source IP, source port) = (destination IP, destination port)的情况,即发生了自连接。处理办法是断开连接再重试,否则原本侦听destination port的服务进程也无法启动了。

11.1 TimerQueue::cancel

\(\S8.2\)实现的TimerQueue不能注销定时器,本节补充这一功能。TimerQueue::cancel()的一种简单实现是用shared_ptr来管理Timer对象,再将TimerId定义为weak_ptr,这样几乎不用我们做什么事情。在C++11中应该也足够高效,因为shared_ptr具备移动语义,可以做到引用计数值始终不变,没有原子操作的开销。但用shared_ptr来管理Timer对象似乎显得有点小题大做,而且这种做法也有一个小小的缺点,如果用户一直持有TimerId,会造成引用计数所占的内存无法释放,而本节展示的做法不会有这个问题。

本节采用更传统的方式,保持现有的设计,让TimerId包含Timer*。但这是不够的,因为无法区分地址相同的先后两个Timer对象。**因此每个Timer对象有一个全局递增的序列号int64_t sequence_(用原子计数器AtomicInt64生成),TimerId同时保存Timer*和sequence_,这样TimerQueue::cancel()就能根据TimerId找到需要注销的Timer对象。**

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Timer : boost::noncopyable {
public:
Timer(const TimerCallback& cb, Timestamp when, double interval)
: callback_(cb),
expiration_(when),
interval_(interval),
repeat_(interval > 0.0),
sequence_(s_numCreated_.incrementAndGet()) {}
private:
const TimerCallback callback_;
Timestamp expiration_;
const double interval_;
const bool repeat_;
const int64_t sequence_;
static AtomicInt64 s_numCreated_;
};

TimerQueue新增了cancel接口函数,这个函数是线程安全的。

cancel()有对应的cancelInLoop()函数,因此TimerQueue不必用锁。TimerQueue新增了几个数据成员,activeTimers_.保存的是目前有效的Timer的指针,并满足invariant:timers_.size()=activeTimers_.size(),因为这两个容器保存的是相同的数据,只不过timers_是按到期时间排序,activeTimers_是按对象地址排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class TimerQueue : boost::noncopyable {
public:
void cancel(TimerId timerId);
private:
typedef std::pair<Timestamp, Timer*> Entry;
typedef std::set<Entry> TimerList;
typedef std::pair<Timer*, int64_t> ActiveTimer;
typedef std::set<ActiveTimer> ActiveTimerSet;
void addTimerInLoop(Timer* timer);
void cancelInLoop(TimerId timerId);
bool callingExpiredTimers_; /* atomic */
ActiveTimerSet activeTimers_;
ActiveTimerSet cancelingTimers_;
TimerList timers_;
};

由于TimerId不负责Timer的生命期,其中保存的Timer*可能失效,因此不能直接dereference,只有在activeTimers_中找到了Timer时才能提领。注销定时器的流程如下,照例用EventLoop::runInLoop()将调用转发到IO线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void TimerQueue::cancel(TimerId timerId) {
loop_->runInLoop(boost::bind(&TimerQueue::cancelInLoop, this, timerId));
}

void TimerQueue::cancelInLoop(TimerId timerId) {
loop_->assertInLoopThread();
assert(timers_.size() == activeTimers_.size());
ActiveTimer timer(timerId.timer_, timerId.sequence_);
ActiveTimerSet::iterator it = activeTimers_.find(timer);
if (it != activeTimers_.end()) {
size_t n = timers_.erase(Entry(it->first->expiration(), it->first));
assert(n == 1); (void)n;
delete it->first; // FIXME: no delete please
activeTimers_.erase(it);
} else if (callingExpiredTimers_) {
cancelingTimers_.insert(timer);
}
assert(timers_.size() == activeTimers_.size());
}

上面这段代码中的cancelingTimers_和callingExpiredTimers_是为了应对“自注销”这种情况,即在定时器回调中注销当前定时器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
muduo::EventLoop* g_loop;
muduo::TimerId toCancel;

void cancelSelf() {
printf("cancelSelf\n");
g_loop->cancel(toCancel);
}

int main() {
muduo::EventLoop loop;
g_loop = &loop;
toCancel = loop.runEvery(5, cancelSelf);
loop.loop();
}

当运行到cancelSelf()的时候,toCancel代表的Timer已经不在timers_和activeTimers_这两个容器中,而是位于expired中。为了应对这种情况,TimerQueue会记住在本次调用到期Timer期间有哪些cancel()请求,并且不再把已cancel()的Timer添加回timers_和activeTimers_当中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void TimerQueue::handleRead() {
loop_->assertInLoopThread();
Timestamp now(Timestamp::now());
readTimerfd(timerfd_, now);

std::vector<Entry> expired = getExpired(now);

callingExpiredTimers_ = true;
cancelingTimers_.clear();
for (std::vector<Entry>::iterator it = expired.begin();
it != expired.end(); ++it) {
it->second->run();
}
callingExpiredTimers_ = false;
reset(expired, now);
}

// 为了应对这种情况,TimerQueue会记住在本次调用到期Timer期间有哪些cancel()请求,并且不再把已cancel()的Timer添加回timers_和activeTimers_当中。
void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now) {
Timestamp nextExpire;
for (std::vector<Entry>::const_iterator it = expired.begin();
it != expired.end(); ++it) {
ActiveTimer timer(it->second, it->second->sequence());
if (it->second->repeat() &&
cancelingTimers_.find(timer) == cancelingTimers_.end()) {
it->second->restart(now);
insert(it->second);
} else {
// FIXME move to a free list
delete it->second;
}
}
}

注意TimerQueue在执行run()时没有检查Timer是否已撤销,这是因为TimerQueue::cancel()并不提供strong guarantee。TimerQueue::getExpired()和TimerQueue::insert()均增加了与activeTimers_有关的处理,此处从略。

12. TcpClient

有了Connector,TcpClient就不难实现了,它的代码与TcpServer甚至有几分相似(都有newConnection和removeConnection这两个成员函数),只不过每个TcpClient只管理一个TcpConnection。代码从略,此处谈几个要点:

  • TcpClient具备TcpConnection断开之后重新连接的功能,加上Connector具备反复尝试连接的功能,因此客户端和服务端的启动顺序无关紧要。可以先启动客户端,一旦服务端启动,半分钟之内即可恢复连接(由Connector::kMaxRetryDelayMs常数控制);在客户端运行期间服务端可以重启,客户端也会自动重连。
  • 连接断开后初次重试的延迟应该有随机性,比方说服务端崩溃,它所有的客户端连接同时断开,然后0.5s之后同时再次发起连接,这样既可能造成SYN丢包,也可能给服务端带来短期大负载,影响其服务质量。因此每个TcpClient应该等待一段随机的时间(0.5~2s),再重试,避免拥塞。
  • 发起连接的时候如果发生TCP SYN丢包,那么系统默认的重试间隔是3s,这期间不会返回错误码,而且这个间隔似乎不容易修改。如果需要缩短间隔,可以再用一个定时器,在0.5s或1s之后发起另一次连接。如果有需求的话,这个功能可以做到Connector中。
  • 目前本节实现的TcpClient没有充分测试动态增减的情况,也就是说没有充分测试TcpClient的生命期比EventLoop短的情况,特别是没有充分测试TcpClient在连接建立期间析构的情况。编写这方面的单元测试多半要用到S12.4介绍的技术。
  • 目前muduo 0.8.0采用shared_ptr来管理Connector,因为在编写这部分代码的时候TimerQueue尚不支持cancel()操作。将来muduo 1.0会在充分测试的前提下改用这里展示的简洁的实现。

13. epoll

epoll(4)是Linux独有的高效的IO multiplexing机制,它与poll(2)的不同之处主要在于poll(2)每次返回整个文件描述符数组,用户代码需要遍历数组以找到哪些文件描述符上有IO事件,而epoll_wait(2)返回的是活动fd的列表,需要遍历的数组通常会小得多。在并发连接数较大而活动连接比例不高时,epoll(4)比poll(2)更高效。

本节我们把epoll(4)封装为EPoller class,它与\(\S8.1.2\)的Poller class具有完全相同的接口。muduo实际的做法是定义Poller基类并提供两份实现PollPoller和EPoller。这里为了简单起见,我们直接修改EventLoop,只需把代码中的Poller替换为EPoller。

EPoller的关键数据结构如下,其中events_不是保存所有关注的fd列表,而是一次epoll_wait(2)调用返回的活动fd列表,它的大小是自适应的。

1
2
typedef std::vector<struct epoll_event> EventList;
typedef std::map<int, Channel*> ChannelMap;
1
2
3
4
5
6
7
8
9
10
11
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data;
};

typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;

为了减少转换,muduo Channel没有自已定义IO事件的常量,而是直接使用poll(2)的定义(POLLIN、POLLOUT等等),在Linux中它们和epoll(4)的常量相等。

1
2
3
4
5
6
BOOST_STATIC_ASSERT(EPOLLIN == POLLIN);
BOOST_STATIC_ASSERT(EPOLLPRI == POLLPRI);
BOOST_STATIC_ASSERT(EPOLLOUT == POLLOUT);
BOOST_STATIC_ASSERT(EPOLLRDHUP == POLLRDHUP);
BOOST_STATIC_ASSERT(EPOLLERR == POLLERR);
BOOST_STATIC_ASSERT(EPOLLHUP == POLLHUP);

EPoller::poll()的关键代码如下。&*events_.begin(),在C++11中可写为events_.data()。events_.resize(events_.size() * 2);表示如果当前活动fd的数目填满了events_,那么下次就尝试接收更多的活动fd。events_的初始长度是16(kInitEventListSize),其会根据程序的IO繁忙程度自动增长,但目前不会自动收缩。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Timestamp EPoller::poll(int timeoutMs, ChannelList* activeChannels) {
int numEvents = ::epoll_wait(epollfd_,
&*events_.begin(),
static_cast<int>(events_.size()), timeoutMs);
Timestamp now(Timestamp::now());
if (numEvents > 0) {
LOG_TRACE << numEvents << " events happened";
fillActiveChannels(numEvents, activeChannels);
if (implicit_cast<size_t>(numEvents) == events_.size()) {
events_.resize(events_.size() * 2);
}
}
return now;
}

EPoller::fillActiveChannels的功能是将events_中的活动fd填入activeChannels,其中L7~L10是在检查invariant。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void EPoller::fillActiveChannels(int numEvents, ChannelList activeChannels) const {
assert(implicit_cast<size_t>(numEvents) <= events_.size());
for (int i = 0; i < numEvents; ++i) {
Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
#ifndef NDEBUG
int fd = channel->fd();
ChannelMap::const_iterator it = channels_.find(fd);
assert(it != channels_.end());
assert(it->second == channel);
#endif
channel->set_revents(events_[i].events);
activeChannels.push_back(channel);
}
}

updateChannel()和removeChannel()的代码从略。因为epoll是有状态的,因此这两个函数要时刻维护内核中的fd状态与应用程序的状态相符,Channel::index()和Channel::set_index()被挪用为标记此Channel是否位于epoll的关注列表之中。这两个函数的复杂度是O(logN),因为Linux内核用红黑树来管理epoll关注的文件描述符清单。

测试程序无须修改,全都已经自动用上了epoll(4)。

至此,一个基于事件的非阻塞TCP网络库已经初具规模。

14. 测试程序一览

本章简要介绍了muduo的实现过程,是一个具有教学示范意义的项目,希望有助于读者理解one looper thread这一编程模型背后的实现,在运用时更加得心应手。如果对本章代码有疑问,应该以最新版的muduo源码为准。

当然可以,以下是转换后的文本:

本节没有配套代码,以下列出前面各节出现的测试代码的功能:

  • \(\S8.0\) s00/test1.cc在两个线程里各自运行一个EventLoop。
  • \(\S8.0\) s00/test2.cc试图在非IO线程调用EventLoop::loop(),程序崩溃。
  • \(\S8.1\) s01/test3.cc用channel关注timerfd的可读事件。
  • \(\S8.2\) s02/test4.cc TimerQueue示例。
  • \(\S8.3\) s03/test5.cc IO线程调用EventLoop::runInLoop()和EventLoop::runAfter()。
  • \(\S8.3\) s03/test6.cc 跨线程调用EventLoop::runInLoop()和EventLoop::runAfter()。
  • \(\S8.4\) s04/test7.cc Acceptor示例。
  • \(\S8.5\) s05/test8.cc discard服务。
  • \(\S8.8\) s08/test9.cc echo服务。
  • \(\S8.8\) s08/test10.cc 发送两次

数据,测试TcpConnection::send()。

  • \(\S8.9\) s09/test11.cc chargen服务,使用writeCompleteCallback。
  • \(\S8.11\) s11/test12.cc Connector示例。
  • \(\S8.12\) s12/test13.cc TcpClient示例。

本章Acceptor、Connector、Reactor等术语是Douglas Schmidt发明的,他的原始论文出处是:


8. muduo网络库设计与实现
http://binbo-zappy.github.io/2024/12/27/muduo多线程/8-muduo网络库设计与实现/
作者
Binbo
发布于
2024年12月27日
许可协议