7. muduo编程示例

第7章 muduo编程示例

本章将介绍如何用muduo网络库完成常见的TCP网络编程任务。内容包括:

1. 五个简单TCP示例

本节将介绍五个简单TCP网络服务程序,包括echo(RFC862)、discard(RFC863)、chargen(RFC864)、daytime(RFC867)、time(RFC868)这五个协议,以及time协议的客户端。各程序的协议简介如下。

  • discard:丢弃所有收到的数据。
  • daytime:服务端accept连接之后,以字符串形式发送当前时间,然后主动断开连接。
  • time:服务端accept连接之后,以二进制形式发送当前时间(从Epoch到现在的秒数),然后主动断开连接:我们需要一个客户程序来把收到的时间转换为字符串。
  • echo:回显服务,把收到的数据发回客户端。
  • chargen:服务端accept连接之后,不停地发送测试数据。

以上五个协议使用不同的端口,可以放到同一个进程中实现,且不必使用多线程。完整的代码见muduo/examples/simple。

1.1 discard

discard恐怕算是最简单的长连接TCP应用层协议,它只需要关注“三个半事件”中的“消息/数据到达”事件,事件处理函数如下:

1
2
3
4
5
6
7
8
void DiscardServer::onMessage(const TcpConnectionPtr& conn,
Buffer* buf,
Timestamp time)
{
string msg(buf->retrieveAllAsString());
LOG_INFO << conn->name() << "discards " << msg.size()
<< " bytes received at " << time.toString());
}

与前面p.139的echo服务相比,除了省略namespace外,关键的区别在于少了将收到的数据发回客户端。

1.2 daytime

daytime是短连接协议,在发送完当前时间后,由服务端主动断开连接。它只需要关注“三个半事件”中的“连接已建立”事件,事件处理函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
void DaytimeServer::onConnection(const TcpConnectionPtr& conn)
{
LOG_INFO << "DaytimeServer-" << conn->peerAddress().toIpPort()
<< "->" << conn->localAddress().toIpPort()
<< " is " << (conn->connected() ? "UP" : "DOWN");
if (conn->connected())
{
// 发送时间字符串
conn->send(Timestamp::now().toFormattedString() + "\n");
// 主动断开连接
conn->shutdown();
}
}

1.3 time

time协议与daytime极为类似,只不过它返回的不是日期时间字符串,而是一个32-bit整数,表示从1970-01-01 00:00:00Z到现在的秒数。当然,这个协议有“2038年问题”。服务端只需要关注“三个半事件”中的“连接已建立”事件,事件处理函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
examples/simple/time/time.cc
void TimeServer::onConnection(const muduo::net::TcpConnectionPtr& conn)
{
LOG_INFO <<"TimeServer-"<< conn->peerAddress().toIpPortO<<"->
<<conn->localAddress().toIpPort()<<"is"
<<(conn->connectedc)?"UP":"DOwN");
if (conn->connected())
{
time_t now=::time(NULL);
int32_t be32=sockets::hostToNetwork32(static_cast<int32_t>(now));
conn->send(&be32,sizeofbe32);
conn->shutdown();
}
}

L9、L10取当前时间并转换为网络字节序(BigEndian),L11发送32-bit整数,L12主动断开连接。

1.4 time客户端

time服务端发送的是二进制数据,不便直接阅读,我们编写一个客户端来解析并打印收到的4个字节数据。这个程序只需要关注“三个半事件”中的“消息/数据到达”事件,事件处理函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
examples/simple/timeclient/timeclient.cc
void onMessage(const TcpConnectionPtr& conn,Buffer* buf, Timestamp receiveTime)
{
if (buf->readableBytes() >=sizeof(int32_t))
{
const void* data =buf->peek();
int32_t be32 =*static_cast<const int32_t*>(data);
buf->retrieve(sizeof(int32_t));
time_t time =sockets::networkToHost32(be32);
Timestamp ts(time, Timestamp::kMicroSecondsPerSecond);
LOG_INFO <<"Server time-"<<time <<","<< ts.toFormattedString();
}
else
{
LOG_INFo<< conn->name()<<"no enough data"<< buf->readableBytes()
<<"at"<<receiveTime.toFormattedString();
}
}

注意其中考虑到了如果数据没有一次性收全,已经收到的数据会累积在Buffer里(在else分支里没有调用Buffer::retrieve*系列函数),以等待后续数据到达,程序也不会阻塞。这样即便服务器一个字节一个字节地发送数据,代码还是能正常工作,这也是非阻塞网络编程必须在用户态使用接收缓冲的主要原因。

这是我们第一次用到TcpClientClass,完整的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
examples/simple/timeclient/timeclient.cc
class Timeclient :boost::noncopyable
{
public:
Timeclient(EventLoop* loop,const InetAddress& serverAddr)
:loop_(loop),
client_(loop,serverAddr,"Timeclient")
{
client_.setConnectionCallback(
boost::bind(&Timeclient::onConnection, this,_1));
client_.setMessageCallback(
boost::bind(&Timeclient::onMessage,this,_1,_2,_3));
client_.enableRetry();
}

void connect()
{
client_.connect();
}

private:
EventLoop* loop_;
TcpClient client_;

void onConnection(const TcpConnectionPtr& conn)
{
LOG_INFO<<conn->localAddress().toIpPort()<<"->
<<conn->peerAddress().toIpPort()<<"is"
<<(conn->connected()?"Up":"DowN");
if(!conn->connected())
{
loop_->quit();
}
}
};

int main(int argc,char argv[J)
{
LOG_INFO <<"pid="<<getpid();
if (argc>1)
{
EventLoop loop;
InetAddress serverAddr(argv[1],2037);
TimeClient timeclient(&loop,serverAddr);
timeclient.connect();
loop.loop();
}
else
{
printf("Usage:%s host_ip)n",argv[o]);
}
}

以上L32表示如果连接断开,就退出事件循环(L46),程序也就终止了。

注意TcpConnection对象表示“一次”TCP连接,连接断开之后不能重建。TcpClient重试之后新建的连接会是另一个TcpConnection对象。

程序的运行结果如下(有折行),假设timeserver运行在本机:

1
2
3
4
5
6
7
8
9
10
$./simple_timeclient 127.@.0.1
2011-02-0204:10:35.1817174296 INF0 pid =4296-timeclient.cc:71
2011-02-0204:10:35.1836684296 INFO TcpClient::connect[Timeclient]-
connecting to 127.0,0.1:2037-Tcpclient.cc:60
2011-02-0204:10:35.1851784296INF0 127.0.0.1:40960 ->127.0.0.1:2037
is Up -timeclient.cc:39
2011-02-0204:10:35.1852794296INF0 Server time= 1296619835,
2011-02-02 04:10:35.000000-timeclient.cc:56
2011-02-02 04:10:35.1853544296 INF0 127.0.0.1:40960->127.0.0.1:2037
is DowN-timeclient.cc:39

1.5 echo

前面几个协议都是单向接收或发送数据,echo是我们遇到的第一个双向的协议:服务端把客户端发过来的数据原封不动地传回去。它只需要关注“三个半事件”中的“消息/数据到达”事件,事件处理函数已在P.139列出,这里复制一遍。

1
2
3
4
5
6
7
8
9
10
examples/simple/echo/echo.cc
void EchoServer::onMessage(const muduo::net::TcpConnectionPtr& conn,
muduo::net::Buffer* buf,
muduo::Timestamp time)
{
muduo::string msg(buf->retrieveAl1AsStringO);
LOG_INFO<< conn->name(<<"echo"<<msg.size()<<"bytes,"
<<"data receivedat"<< time.tostring();
conn->send(msg);
}

这段代码实现的不是行回显(lineecho)服务,而是有一点数据就发送一点数据。这样可以避免客户端恶意地不发送换行学符,而服务端又必须缓存已经收到的数据,导致服务器内存暴涨。但这个程序还是有一个安全漏洞,即如果客户端故意不断发送数据,但从不接收,那么服务端的发送缓冲区会一直堆积,导致内存暴涨。解决办法可以参考下面的chargen协议,或者在发送缓冲区累积到一定大小时主动断开连接。一般来说,非阻塞网络编程中正确处理数据发送比接收数据要困难,因为要应对对方接收缓慢的情况。

1.6 chargen

Chargen协议很特殊,它只发送数据,不接收数据。而且,它发送数据的速度不能快过客户端接收的速度,因此需要关注“三个半事件”中的半个“消息/数据发送完毕”事件(onwriteComplete),事件处理函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
examples/simple/chargen/chargen.cc
void ChargenServer::onConnection(const TcpConnectionPtr& conn)
{
LOG_INFo<<"ChargenServer-"<< conn->peerAddress().toIpPort() <->"
<<conn->localAddress.toIpPortO)<<"is"
<<(conn->connected()?"UP":"DowN");
if(conn->connected())
{
conn->setTcpNoDelay(true);
conn->send(message_);
}
}

void ChargenServer::onMessage(const TcpConnectionPtr& conn,
Buffer* buf,
Timestamp time)
{
string msg(buf->retrieveAllAsString());
LOG_INFo<<conn->name()<<"discards"<<msg.size()
<<"bytes received at " << time.tostring;
}

void ChargenServer::onWriteComplete(constTcpConnectionPtr& conn)
{
transferred_+=message_.size;
conn->send(message_);
}

L10在连接建立时发生第一次数据;L26继续发送数据。

完整的chargen服务端还带流量统计功能,用到了定时器,我们会在S7.8介绍定时器的使用,到时候再回头来看相关代码。

用netcat扮演客户端,运行结果如下:

1
2
3
4
5
6
7
$nc localhost 2019 | head
!"#$%&*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ1J*_"abcdefgh
"#$%&()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[1J*_abcdefgh
#$%&(*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[17-"abcdefghi
$%&()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[1J*_abcdefghijk
%&*+,"./0123456789:<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZL\1-abcdefgh1jk1
&')*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[1J*-abcdefghijk1m

1.6 五合一

前面五个程序都用到了EventLoop。这其实是个Reactor,用于注册和分发IO事件。muduo遵循one loop per thread模型,多个服务端(TcpServer)和客户端(Tcpclient)可以共享同一个EventLoop,也可以分配到多个EventLoop上以发挥多核多线程的好处。这里我们把五个服务端用同一个EventLoop跑起来,程序还是单线程的,功能却强大了很多:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
examples/simple/allinone/allinone.cc
int main()
{
LOG_INFO <<"pid="<< getpid();
EventLoop loop; // one loop shared by multiple servers

ChargenServer chargenServer(&loop,InetAddress(2019));
chargenServer.start();

DaytimeServer daytimeServer(&loop,InetAddress(2013));
daytimeServer.start();

DiscardServerdiscardServer(&loop,InetAddress(2ee9));
discardServer.start();

EchoServer echoServer(&loop,InetAddress(2007));
echoServer.start();

TimeServertimeServer(&loop,InetAddress(2037));
timeServer.start();

loop.loop();
}

这个例子充分展示了Reactor模式复用线程的能力,让一个单线程程序同时具备多个网络服务功能。二个容易想到的例子是httpd同时侦听80端口和443端口,另一个例子是程序中有多个Tcpclient,分别和数据库、Redis、SudokuSolver等后台服务打交道。对于初次接触这种编程模型的读者,值得跟踪代码运行的详细过程,弄清除每个事件每个回调发生的时机与条件。

2. 文件传输

本节用发送文件的例子来说明TcpConnection::send()的使用。到目前为止,我们用到了TcpConnection::send()的两个重载,分别是send(const string&)和send(const void* message, size_t len)。

TcpConnection目前提供了三个send重载函数,原型如下。

1
2
3
4
5
6
7
8
9
10
11
muduo/net/TcpConnection.h
class TcpConnection:boost::noncopyable,
public boost::enable_shared_from_this<TcpConnection>
{
public:
void send(const void message,size_t len);
void send(const StringPiece& message);
void send(Buffer* message); // this one might swap data without copying
//void send(Buffer&&message);//C++11
//void send(string&&message);// C++17
}

在非阻塞网络编程中,发送消息通常是由网络库完成的,用户代码不会直接调用write(2)或send(2)等系统调用。原因见p.205“TcpConnection必须要有output buffer”。在使用TcpConnection::send()时值得注意的有几点:

  • send()的返回类型是void,意味着用户不必关心调用send时成功发送了多少数据,muduo库会保证把数据发送给对方。
  • send()是非阻塞的。意味着客户代码只管把一条消息准备好,调用send()来发送,即便TCP的发送窗口满了,也绝对不会阻塞当前调用线程。
  • send()是线程安全、原子的。多个线程可以同时调用send(),消息之间不会混合或交织。但是多个线程同时发送的消息的先后顺序是不确定的,muduo只能保证每个消息本身的完整性。另外,send()在多线程下仍然是非阻塞的。
  • send(const void* message, size_t len)这个重载最平淡无奇,可以发送任意字节序列。
  • send(const StringPiece& message)这个重载可以发送std::string和const char*,其中StringPiece是Google发明的专门用于传递字符串参数的class,这样程序里就不必为const char*和const std::string&提供两份重载了。
  • send(Buffer*)有点特殊,它以指针为参数,而不是常见的const引用,因为函数中可能用Buffer::swap()来高效地交换数据,避免内存拷贝,起到类似C++右值引用的效果。
  • 如果将来支持C++11,那么可以增加对右值引用的重载,这样可以用move语义来避免内存拷贝。

下面我们来实现一个发送文件的命令行小工具,这个工具的协议很简单,在启动时通过命令行参数指定要发送的文件,然后在2021端口侦听,每当有新连接进来,就把文件内容完整地发送给对方。

如果不考虑并发,那么这个功能用netcat加重定向就能实现。这里展示的版本更加健壮,比方说发送100MB的文件,支持上方个并发客户连接:内存消耗只与并发连接数有关,跟文件大小无关;任何连接可以在任何时候断开,程序不会有内存泄漏或崩溃。

我一共写了三个版本,代码位于examples/filetransfer。

  1. 一次性把文件读入内存,一次性调用send(const string&)发送完毕。这个版本满足除了“内存消耗只与并发连接数有关,跟文件大小无关”之外的健壮性要求。
  2. 一块一块地发送文件,减少内存使用,用到了writeCompleteCallback。这个版本满足了上述全部健壮性要求。
  3. 同2,但是采用shared_ptr来管理FILE*,避免手动调用fclose(3)。

2.1 版本一

在建立好连接之后,把文件的全部内容读入一个string,一次性调用TcpConnection::send()发送。不用担心文件发送不完整。也不用担心send之后立刻shutdown会有什么同题,见下一节的说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
examples/filetransfer/download.cc
const char*g_file = NULL;
string readFile(const char* filename); // read file content to string
void onConnection(const TcpConnectionPtr& conn)
{
LOG_INFO<<"FileServer-"<<conn->peerAddress.toIpPortO<<"->"
<< conn->localAddressO.toIpPort()<<is"
<<(conn->connected()?"Up":"DOwN"):
if(conn->connected())
{
LOG_INFO <<"FileServer-Sending file "<< g_file
<"to"<< conn->peerAddress().toIpPort();
string fileContent =readFile(g_file);
conn->send(fileContent);
conn->shutdownO;
LOG_INFO <<"FileServer-done";
}
}

int main(int argc, char* argv[J)
{
LOG_INFO<<"pid="<<getpid();
if (argc>1)
{
g_file =argv[1];
EventLoop loop;
InetAddress1istenAddr(2021):
TcpServer server(&loop,listenAddr,"FileServer");
server.setConnectionCallback(onConnection);
server.start();
loop.loop:
}
else
{
fprintf(stderr,"Usage:%s file_for_downloadingin",argv[e]);
}
}

注意每次建立连接的时候我们都去重新读一遍文件,这是考虑到文件有可能被其他程序修改。如果文件是immutable的,整个程序就可以共享同一个filecontent对象。

这个版本有一个明显的缺陷,即内存消耗与(并发连接数×文件大小)成正比,文件越大内存消耗越多,如果文件大小上GB,那几乎就是灾难了。只需要建立少量并发连接就能把服务器的内存耗尽,因此我们有了版本二。

2.2 版本二

为了解决版本一占用内存过多的问题,我们采用流水线的思路,当新建连接时,先发送文件的前64KiB数据,等这块数据发送完毕时再继续发送下64KiB数据,如此往复直到文件内容全部发送完毕。代码中使用了TcpConnection::setContext()和getContext()来保存TcpConnection的用户上下文(这里是FILE*),因此不必使用额外的std::map<TcpConnectionPtr,FILE*>来记住每个连接的当前文件位置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
examples/filetransfer/download2.cc
const int kBufsize = 64*1024;
const chark g_file = NULL;
void onConnection(const TcpConnectionPtr& conn)
{
LOG_INFo <<"FileServer-"< conn->peerAddress.toIpPort<<"->"
<<conn->localAddress().toIpPort()<<"is"
<<(conn->connected()?"Up""DoWN");
if(conn->connected())
{
LOG_INFO<<"FileServer-Sending file"<<g_file
<<"to"<< conn->peerAddress().toIpPort();
conn->setHighWaterMarkCallback(onHighWaterMark,kBufSize+1);
FILE* fp=::fopen(g_file,"rb");
if(fp)
{
conn->setContext(fp);
char buf[kBufSize];
size_t nread = ::fread(buf, 1, sizeof buf,fp):
conn->send(buf,nread);
}
else
{
conn->shutdown();
LOG_INFO << "FileServer - no such file";
}
}
else {
if(!conn->getContext().empty()) {
FILE* fp = boost::any_cast<FILE*>(conn->getContext());
if(fp) {
::fclose(fp);
}
}
}
}

void onWriteComplete(const TcpConnectionPtr& conn)
{
FILE*fp =boost::any_cast<FILE*>(conn->getContext());
char buf[kBufsize];
size_t nread = ::fread(buf, 1, sizeof buf,fp);
if (nread>0)
{
conn->send(buf, nread);
}
else
{
::fclose(fp);
fp=NULL;
conn->setContext(fp);
conn->shutdown();
LOG_INFO <<"FileServer- done";
}
}

在onWriteComplete()回调函数中读取下一块文件数据,继续发送。

注意每次建立连接的时候我们都去重新打开那个文件,使得程序中文件描述符的数量翻倍(每个连接占一个socketfd和一个filefa),这是考虑到文件有可能被其他程序修改。如果文件是immutable的,一种改进措施是:整个程序可以共享同一个文件描述符,然后每个连接记住自已当前的偏移量,在onwriteComplete()回调函数里用pread(2)来读取数据。

这个版本也存在一个问题,如果客户端故意只发起连接,不接收数据,那么要么把服务器进程的文件描述符耗尽,要么占用很多服务端内存(因为每个连接有64KiB的发送缓冲区)。解决办法可参考后文\(\S7.7\)“限制服务器的最大并发连接数”和\(\S7.10\)“用timingwheel踢掉空闲连接”。

2.3 版本三

用shared_ptr的custom deleter来减轻资源管理负担,使得FILE*的生命期和TcpConnection一样长,代码也更简单了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
examples/filetransfer/download3.cc
$diff download2.cc download3.cc -U3
const int kBufsize =64*1024;
const char* g-file =NULL;
+typedef boost::shared_ptr<FILE> FilePtr;

void onConnection(const TcpConnectionPtr& conn)
{
FILE*fp=::fopen(g_file,"rb");
if (fp)
{
- conn->setContext(fp);
+ FilePtr ctx(fp, ::fclose);
+ conn->setContext(ctx);
char buf[kBufsize];
size_t nread =::fread(buf,1, sizeof buf,fp);
conn->send(buf,nread);
LOG_INFO << "FileServer - no such file";
}
}

void onwriteComplete(const TcpConnectionPtr& conn)
{
- FILE*fp=boost::any_cast<FILE*>(conn->getContext());
+ const FilePtr&fp = boost::any_cast<const FilePtr&>(conn->getContext());
char buf[kBufsize];
- size_t nread=::fread(buf, 1, sizeof buf,fp):
+ size_t nread=::fread(buf, 1, sizeof buf,get_pointer(fp));
if (nread > 0)
{
conn->send(buf,nread);
}
else
{
- ::fclose(fp);
- fp=NULL;
- conn->setContext(fp);
conn->shutdown();
LOG_INFO <<"FileServer- done";
}
}

以上代码体现了现代C++的资源管理思路,即无须手动释放资源,而是通过将资源与对象生命期绑定,在对象析构的时候自动释放资源,从而把资源管理转换为对象生命期管理,而后者是早已解决了的问题。这正是C++最重要的编程技法:RAII。

2.4 为什么TcpConnection::shutdown()没有直接关闭TCP连接

我曾经收到一位网友的来信:“在simple的daytime示例中,服务端主动关闭时调用的是如下函数序列,这不是只是关闭了连接上的写操作吗,态么是关闭了整个连接?”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void DaytimeServer::onConnection(const nuduo::net::TcpConnectionPtr& conn)
{
if(conn->connectedO)
{
conn->send(Timestamp::now().toFormattedstring()+"In");
conn->shutdown();//调用 TcpConnection::shutdown()
}
}

void TcpConnection::shutdown()
{
if (state_= kConnected)
{
setState(kDisconnecting);
// 调用TcpConnection::shutdownInLoop()
loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop,this));
}
}

void TcpConnection::shutdownInLoop)
{
loop_->assertInLoopThread();
if(!channel_->iswriting()/如果当前没有发送数据
Il we are not writing
socket_->shutdownwrite;
//调用Socket::shutdownwrite()
}
}

void Socket::shutdownwrite()
{
sockets::shutdownwrite(sockfd_);
}

void sockets::shutdownwrite(int sockfd)
{
int ret =::shutdown(sockfd, SHUT_wR);
}

笔者答复如下:

muduo TcpConnection没有提供close(),而只提供shutdown(),这么做是为了收发数据的完整性。

TCP是一个全双工协议,同一个文件描述符既可读又可写,shutdownWrite()关闭了“写”方向的连接,保留了“读”方向,这称为TCP half-close。如果直接close(socket_fd),那么socket_fd就不能读或写了。

用shutdown而不用close的效果是,如果对方已经发送了数据,这些数据还“在路上”,那么muduo不会漏收这些数据。换句话说,muduo在TCP这一层面解决了“当你打算关闭网络连接的时候,如何得知对方是否发了一些数据而你还没有收到?”这一问题。当然,这个问题也可以在上面的协议层解决,双方商量好不再互发数据,就可以直接断开连接。

也就是说muduo把“主动关闭连接”这件事情分成两步来做,如果要主动关闭连接,它会先关本地“写”端,等对方关闭之后,再关本地“读”端。

练习:阅读代码,回答“如果被动关闭连接,muduo的行为如何?”

另外,如果当前outputbuffer里还有数据尚未发出的话,muduo也不会立刻调用shutdownWrite,而是等到数据发送完毕再shutdown,可以避免对方漏收数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
muduo/net/TcpConnection.cc
void TcpConnection::handlewrite()
{
loop_->assertInLoopThread();
if (channel_->isWriting())
{
ssize_t n = sockets::write(channel_->fd(),
outputBuffer_,peek(),
outputBuffer_.readableBytes();
if(n>0)
{
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes()==0)
{
channel_->disablewriting();
if(writeCompleteCallback_)
{
loop_->queueInLoop(boost::bind(writeCompleteCallback_,
shared_from_this()));
}
if (state_=kDisconnecting)
{
shutdownInLoop();
}
}
}
}
}

muduo这种关闭连接的方式对对方也有要求,那就是对方read()到0字节之后会主动关闭连接(无论shutdownWrite()还是close()),一般的网络程序都会这样,不是什么问题。当然,这么做有一个潜在的安全漏洞,万一对方故意不关闭连接,那么muduo的连接就一直半开着,消耗系统资源。必要时可以调用TcpConnection::handleclose()来强行关闭连接,这需要将handleclose()改为public成员函数。

完整的流程见图7-1。我们发完了数据,于是shutdownwrite,发送TCPFIN分节,对方会读到0字节,然后对方通常会关闭连接。这样muduo会读到0字节,然后muduo关闭连接。(思考题:在shutdown之后,muduo回调connectioncallback的时间间隔大约是一个round-trip time,为什么?)

如果有必要,对方可以在read()返回0之后继续发送数据,这是直接利用了half-close TCP连接。muduo不会漏收这些数据。

那么muduo什么时候真正close socket呢?在TcpConnection对象析构的时候。TcpConnection持有一个Socket对象,Socket是一个RAII handler,它的析构函数会close(sockfd_)。这样,如果发生TcpConnection对象泄漏,那么我们从/proc/pid/fd/就能找到没有关闭的文件描述符,便于查错。

muduo在read()返回0的时候会回调connectioncallback,TcpServer或TcpClient把TcpConnection的引用计数减一。如果引用计数降到零,则表明用户代码也不持有TcpConnection,它就会析构了。

参考:《TCP/IP详解》[TCPv1]18.5节“TCP Half-Close”和《UNIX网络编程(第3版)》[UNP]6.6节“shutdown()函数"。

在网络编程中,应用程序发送数据往往比接收数据简单(实现非阻塞网络库正相反,发送比接收难),下一节我们再谈接收并解析消息的要领。

3. Boost.Asio的聊天服务器

本节将介绍一个与Boost.Asio的示例代码中的聊天服务器功能类似的网络服务程序,包括客户端与服务端的muduo实现。这个例子的主要目的是介绍如何处理分包,并初步涉及muduo的多线程功能。本文的代码位于examples/asio/chat/。

3.1 TCP分包

\(\S7.1\)“五个简单TCP示例”中处理的协议没有涉及分包,在TCP这种字节流协议上做应用层分包是网络编程的基本需求。分包指的是在发生一个消息(message)或一顿(frame)数据时,通过一定的处理,让接收方能从字节流中识别并截取(还原)出一个个消息。“粘包问题”是个伪题。

对于短连接的TCP服务,分包不是一个问题,只要发送方主动关闭连接,就表示一条消息发送完毕,接收方read()返回0,从而知道消息的结尾。例如\(\S7.1\)里的daytime和time协议。

对于长连接的TCP服务,分包有四种方法:

  1. 消息长度固定,比如muduo的roundtrip示例就采用了固定的16字节消息。
  2. 使用特殊的字符或字符串作为消息的边界,例如HTTP协议的headers以“”为字段的分隔符。
  3. 在每条消息的头部加一个长度字段,这恐怕是最常见的做法,本文的聊天协议也采用这一办法。
  4. 利用消息本身的格式来分包,例如XML格式的消息中<root>..</root>的配对,或者JSON格式中的(··的配对。解析这种消息格式通常会用到状态机(state machine)。

聊天服务

本节实现的聊天服务非常简单,由服务端程序和客户端程序组成,协议如下:

  • 服务端程序在某个端口侦听(listen)新的连接。
  • 客户端向服务端发起连接。
  • 连接建立之后,客户端随时准备接收服务端的消息并在屏幕上显示出来。
  • 客户端接受键盘输入,以回车为界,把消息发送给服务端。
  • 服务端接收到消息之后,依次发送给每个连接到它的客户端;原来发送消息的客户端进程也会收到这条消息。
  • 一个服务端进程可以同时服务多个客户端进程。当有消息到达服务端后,每个客户端进程都会收到同一条消息,服务端广播发送消息的顺序是任意的,不一定哪个客户端会先收到这条消息。
  • (可选)如果消息A先于消息B到达服务端,那么每个客户端都会先收到A再收到B。

这实际上是一个简单的基于TCP的应用层广播协议,由服务端负责把消息发送给每个连接到它的客户端。参与“聊天”的既可以是人,也可以是程序。在后文S7.11中,我将介绍一个稍微复杂一点的例子hub,它有“聊天室”的功能,客户端可以注册特定的topic(s),并往某个topic发送消息,这样代码更有意思。

3.2 消息格式

本聊天服务的消息格式非常简单,“消息”本身是一个字符串,每条消息有一个4字节的头部,以网络序存放字符串的长度。消息之间没有间隙,字符串也不要求以"\0"结尾。比方说有两条消息“hello”和“chenshuo”,那么打包后的字节流共有21字节:

1
2
0x00,0x00,0x00,0x05,'h','e','l','l','o',
0x00,0x00,0x00,0x08,'c','h','e','n','s','h','u','o'

打包的代码

这段代码把string message打包为muduo::net::Buffer,并通过conn发送。由于这个codec的代码位于头文件中,因此反复出现了muduo::net namespace。

文件路径examples/asio/chat/codec.h

1
2
3
4
5
6
7
8
void send(muduo::net::TcpConnection* conn, const muduo::StringPiece& message) {
muduo::net::Buffer buf;
buf.append(message.data(), message.size());
int32_t len = static_cast<int32_t>(message.size());
int32_t be32 = muduo::net::sockets::hostToNetwork32(len);
buf.prepend(&be32, sizeof be32);
conn->send(&buf);
}

muduoBuffer有一个很好的功能,它在头部预留了8个字节的空间,这样L6prepend()操作就不需要移动已有的数据,效率较高。

分包的代码

解析数据往往比生成数据更复杂,分包、打包也不例外。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buf, muduo::Timestamp receiveTime) {
while (buf->readableBytes() >= kHeaderLen) { // kHeaderLen == 4
const void* data = buf->peek();
int32_t be32 = *static_cast<const int32_t*>(data); // SIGBUS
const int32_t len = muduo::net::sockets::networkToHost32(be32);
if (len > 65536 || len < 0) {
LOG_ERROR << "Invalid length " << len;
conn->shutdown();
break;
} else if (buf->readableBytes() >= len + kHeaderLen) {
buf->retrieve(kHeaderLen);
muduo::string message(buf->peek(), len);
messageCallback_(conn, message, receiveTime);
buf->retrieve(len);
} else {
break;
}
}
}

onMessage()

L12构造完整的消息,L13通过messageCallback_回调用户代码。L4有潜在的问题,在某些不支持非对齐内存访问的体系结构上会造成SIGBUS coredump,读取消息长度应该改用Buffer::peekInt32()。上面这段代码的L2用了while循环来反复读取数据,直到Buffer中的数据不够一条完整的消息。请读者思考,如果换成if (buf->readableBytes() >= kHeaderLen)会有什么后果。


Tips:

潜在问题

  1. 非对齐内存访问(SIGBUS):在不支持非对齐内存访问的体系结构上,直接将const void*转换为const int32_t*并解引用可能会导致SIGBUS信号,因为这违反了对齐要求。为了解决这个问题,可以使用Buffer::peekInt32()方法,它能够安全地处理对齐问题。
  2. 使用if代替while的后果:如果将while循环替换为if语句,可能会导致在某些情况下无法正确处理多条消息。如果Buffer中的数据不足以构成一条完整的消息,if语句只会检查一次,而不会在新数据到达后再次检查。这意味着如果Buffer中的数据量在第一次检查时不足,但随后又有新数据到达,使得总数据量足够构成一条消息,这种情况下使用if会导致消息被遗漏。而while循环会持续检查,直到Buffer中的数据不足以构成一条完整的消息为止。

以前面提到的两条消息的字节流为例:

1
2
0x00,0xe0,0x00,0x05,"h''e'"1'"1','o',
0x00,0xe0,0xe0,0x08,'c','h','e','n','s','h','u','o

假设数据最终都全部到达,onMessage()至少要能正确处理以下各种数据到达的次序,每种情况下messageCallback_都应该被调用两次:

  1. 每次收到一个字节的数据,onMessage()被调用21次;
  2. 数据分两次到达,第一次收到2个字节,不足消息的长度字段;
  3. 数据分两次到达,第一次收到4个字节,刚好够长度字段,但是没有body;
  4. 数据分两次到达,第一次收到8个字节,长度完整,但body不完整;
  5. 数据分两次到达,第一次收到9个字节,长度完整,body也完整;
  6. 数据分两次到送,第一次收到10个字节,第一条消息的长度完整、body也完整,第二条消息长度不完整;
  7. 请自行移动和增加分割点,验证各种情况;一共有超过100万种可能(2^21-1)。
  8. 数据一次就全部到达,这时必须用while循环来读出两条消息,否则消息会堆积在Buffer中。

请读者验证onMessage()是否做到了以上几点。这个例子充分说明了non-blocking read必须和input buffer一起使用。而且在写decoder的时候一定要在收到完整的消息(L10)之后再retrieve整条消息(L11L14),除非接收方使用复杂的状态机来解码。

3.3 编解码器LengthHeaderCodec

有人评论muduo的接收缓冲区不能设置回调函数的触发条件,确实如此。每当socket可读时,muduo的TcpConnection会读取数据并存入input buffer,然后回调用户的函数。不过,一个简单的间接层就能解决问题,让用户代码只关心“消息到达”而不是“数据到达”,如本例中的LengthHeaderCodec所展示的那样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
examples/asio/chat/codec.h
class LengthHeaderCodec : boost::noncopyable
{
public:
typedef boost::function<void (const muduo::net::TcpConnectionPtr&,
const muduo::string& message,
muduo::Timestamp)> StringMessageCallback;

explicit LengthHeaderCodec(const StringMessageCallback& cb)
: messageCallback_(cb)
{
}

// onMessage()和send同前。
};

const static size_t kHeaderLen = sizeof(int32_t);

这段代码把以Buffer*为参数的MessageCallback转换成了以const string&为参数的StringMessageCallback,让用户代码不必关心分包操作。如果编程语言相同,客户端和服务端可以(应该)共享同一个codec,这样既节省工作量,又避免因对协议理解不一致而导致的错误。

3.4 服务端的实现

聊天服务器的服务端代码小于100行,不到asio的一半。

除了经常见到的EventLoop和TcpServer,ChatServer还定义了codec_和connections_作为成员,后者存放目前已建立的客户连接。在收到消息之后,服务器会遍历整个容器,把消息广播给其中的每一个TCP连接(onStringMessage())。

首先,在构造函数里注册回调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
examples/asio/chat/server.cc
class ChatServer : boost::noncopyable
{
public:
ChatServer(EventLoop* loop,
const InetAddress& listenAddr)
: loop_(loop),
server_(loop, listenAddr, "ChatServer"),
codec_(boost::bind(&ChatServer::onStringMessage, this, _1, _2, _3))
{
server_.setConnectionCallback(
boost::bind(&ChatServer::onConnection, this, _1));
server_.setMessageCallback(
boost::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
}

void start()
{
server_.start();
}

private:
void onConnection(const TcpConnectionPtr& conn)
{
LOG_INFO << conn->localAddress().toIpPort() << "->
<< conn->peerAddress().toIpPort() << "is"
<< (conn->connected() ? "Up" : "Down");
if (conn->connected())
{
connections_.insert(conn);
}
else
{
connections_.erase(conn);
}
}

void onStringMessage(const TcpConnectionPtr& conn,
const string& message,
Timestamp)
{
for (ConnectionList::iterator it = connections_.begin();
it != connections_.end(); ++it)
{
codec_.send(get_pointer(*it), message);
}
}

typedef std::set<TcpConnectionPtr> ConnectionList;
EventLoop* loop_;
TcpServer server_;
LengthHeaderCodec codec_;
ConnectionList connections_;
};

int main(int argc, char* argv[])
{
LOG_INFO << "pid=" << getpid();
if (argc > 1)
{
EventLoop loop;
uint16_t port = static_cast<uint16_t>(atoi(argv[1]));
InetAddress serverAddr(port);
ChatServer server(&loop, serverAddr);
server.start();
loop.loop();
}
else
{
printf("Usage: %sport\n", argv[0]);
}
}

3.5 客户端的实现

我有时觉得服务端的程序常常比客户端的更容易写,聊天服务器再次验证了我的看法。客户端的复杂性来自于它要读取键盘输入,而EventLoop是独占线程的,所以我用了两个线程:main()函数所在的线程负责读键盘,另外用一个EventLoopThread来处理网络IO。

现在来看代码,首先,在构造函数里注册回调,并使用了跟前面一样的LengthHeaderCodec作为中间层,负责打包、分包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
examples/asio/chat/client.cc
class ChatClient : boost::noncopyable
{
public:
ChatClient(EventLoop* loop, const InetAddress& serverAddr)
: loop_(loop),
client_(loop, serverAddr, "ChatClient"),
codec_(boost::bind(&ChatClient::onStringMessage, this, _1, _2, _3))
{
client_.setConnectionCallback(
boost::bind(&ChatClient::onConnection, this, _1));
client_.setMessageCallback(
boost::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
client_.enableRetry();
}

void connect()
{
client_.connect();
}
// disconnect()目前为空,客户端的连接由操作系统在进程终止时关闭。
void disconnect()
{
// client_.disconnect();
}
// write()会由main线程调用,所以要加锁,这个锁不是为了保护TcpConnection,而是为了保护shared_ptr。
void write(const StringPiece& message)
{
MutexLockGuard lock(mutex_);
if (connection_)
{
codec_.send(get_pointer(connection_), message);
}
}

private:
// onConnection()会由EventLoop线程调用,所以要加锁以保护shared_ptr。
void onConnection(const TcpConnectionPtr& conn)
{
LOG_INFO << conn->localAddress().toIpPort() << "->"
<< conn->peerAddress().toIpPort() << "is"
<< (conn->connected() ? "Up" : "Down");
MutexLockGuard lock(mutex_);
if (conn->connected())
{
connection_ = conn;
}
else
{
connection_.reset();
}
}
//把收到的消息打印到屏幕,这个函数由EventLoop线程调用,但是不用加锁,因为printf()是线程安全的。注意这里不能用std:cout<,它不是线程安全的。
void onStringMessage(const TcpConnectionPtr&,
const string& message,
Timestamp)
{
printf("<<<%s\n", message.c_str());
}

EventLoop* loop_;
TcpClient client_;
LengthHeaderCodec codec_;
MutexLock mutex_;
TcpConnectionPtr connection_;
};

int main(int argc, char* argv[])
{
LOG_INFO << "pid=" << getpid();
if (argc > 2)
{
EventLoopThread loopThread;
uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
InetAddress serverAddr(argv[1], port);

ChatClient client(loopThread.startLoop(), serverAddr);
client.connect();

std::string line;
while (std::getline(std::cin, line))
{
client.write(line);
}
client.disconnect();
}
else
{
printf("Usage: %s host_ip port\n", argv[0]);
}
}

简单测试

打开三个命令行窗口,在第一个窗口运行:

1
$ ./asio_chat_server 3000

在第二个窗口运行:

1
$ ./asio_chat_client 127.0.0.1 3000

在第三个窗口运行同样的命令:

1
$ ./asio_chat_client 127.0.0.1 3000

这样就有两个客户端进程参与聊天。在第二个窗口里输入一些字符并回车,字符会出现在本窗口和第三个窗口中。

代码示例中还有另外三个server程序,都是多线程的,详细介绍在p.260。

  • server_threaded.cc使用多线程TcpServer,并用mutex来保护共享数据。
  • server_threaded_eficient.cc对共享数据以\(\S2.8\)“借shared_ptr实现copy-on-write"的手法来降低锁竞争。
  • server_threaded_highperformance.cc采用thread local变量,实现多线程高效转发,这个例子值得仔细阅读理解。

\(\S7.8\)会介绍muduo中的定时器,并实现Boost.Asio教程中的timer2~5示例,以及带流量统计功能的discard和echo服务器(来自JavaNetty)。流量等于单位时间内发送或接收的字节数,这要用到定时器功能。

4. muduoBuffer类的设计与使用

本节介绍muduo中输入输出缓冲区的设计与实现。文中buffer指一般的应用层缓冲区、缓冲技术,Buffer特指muduo::net::Buffer class。

4.1 muduo的IO模型

[UNP]6.2节总结了Unix/Linux上的五种IO模型:阻塞(blocking)、非阻塞(non-blocking)、IO复用(IO multiplexing)、信号驱动(signal-driven)、异步(asynchronous)。这些都是单线程下的IO模型。

C10k问题页面介绍了五种IO策略,把线程也纳入考量。在这个多核时代,线程是不可避免的。那么服务端网络编程该如何选择线程模型呢?我赞同libev作者的观点:one loop per thread is usually a good model。之前我也不止一次表述过这个观点,参见\(\S3.3\)“多线程服务器的常用编程模型”和\(\S6.6\)“详解muduo多线程模型”。

如果采用one looper thread的模型,多线程服务端编程的问题就简化为如何设计一个高效且易于使用的event loop,然后每个线程run一个event loop就行了(当然,同步和互斥是不可或缺的)。在“高效”这方面已经有了很多成熟的范例(libev、libevent、memcached,redis、lighttpd、nginx),在“易于使用”方面我希望muduo能有所作为。(muduo可算是用现代C++实现了Reactor模式,比起原始的Reactor来说要好用得多。)

event loop是non-blocking网络编程的核心,在现实生活中,non-blocking几乎总是和IO multiplexing一起使用,原因有两点:

  • 没有人真的会用轮询(busy-pooling)来检查某个non-blocking IO操作是否完成,这样太浪费CPU cycles。
  • IO multiplexing一般不能和blocking IO用在一起,因为blocking IO中read/write()/accept()/connect()都有可能阻塞当前线程,这样线程就没办法处理其他socket上的IO事件了。见[UNP]16.6节“nonblocking accept”的例子。

所以,当我提到non-blocking的时候,实际上指的是non-blocking+IO multiplexing,单用其中任何一个是不现实的。另外,本书所有的“连接”均指TCP连接,socket和connection在文中可互换使用。

当然,non-blocking编程比blocking难得多,见\(\S6.4.1\)“TCP网络编程本质论”列举的难点。基于event loop的网络编程跟直接用C/C++编写单线程Windows程序颇为相像:程序不能阻塞,否则窗口就失去响应了;在event handler中,程序要尽快交出控制权,返回窗口的事件循环。

4.2 为什么non-blocking网络编程中应用层buffer是必需的

non-blocking IO的核心思想是避免阻塞在read()或write()或其他IO系统调用上,这样可以最大限度地复用thread-of-control,让一个线程能服务于多个socket连接。IO线程只能阻塞在IO multiplexing函数上,如select/poll/epoll_wait。这样一来,应用层的缓冲是必需的,每个TCP socket都要有stateful的input buffer和output buffer。

Tcp connection必须要有output buffer

考虑一个常见场景:程序想通过TCP连接发送100kB的数据,但是在write调用中,操作系统只接受了80kB(受TCP advertised window的控制,细节见[TCPv1]),你肯定不想在原地等待,因为不知道会等多久(取决于对方什么时候接收数据,然后滑动TCP窗口)。程序应该尽快交出控制权,返回event loop。在这种情况下,剩余的20kB数据怎么办?

对于应用程序而言,它只管生成数据,它不应该关心到底数据是一次性发送还是分成几次发送,这些应该由网络库来操心,程序只要调用TcpConnection::send()就行了,网络库会负责到底。网络库应该接管这剩余的20kB数据,把它保存在该TCP connection的output buffer里,然后注册POLLOUT事件,一旦socket变得可写就立刻发送数据。当然,这第二次write()也不一定能完全写入20kB,如果还有剩余,网络库应该继续关注POLLOUT事件;如果写完了20kB,网络库应该停止关注POLLOUT,以免造成busy loop。(muduo EventLoop采用的是epoll level trigger,原因见下页。)

如果程序又写入了50kB,而这时候output buffer量还有待发送的20kB数据,那么网络库不应该直接调用write(),而应该把这50kB数据append在那20kB数据之后,等socket变得可写的时候再一并写入。

如果output buffer里还有待发送的数据,而程序又想关闭连接(对程序而言,调用TcpConnection::send()之后他就认为数据早会发出去),那么这时候网络库不能立刻关闭连接,而要等数据发送完毕,见P.191“为什么TcpConnection::shutdown()没有直接关闭TCP连接”中的讲解。

综上,要让程序在write操作上不阻塞,网络库必须要给每个TCP connection配置output buffer.

Tcp connection必须要有input buffer TCP是一个无边界的字节流协议,接收方必须要处理“收到的数据尚不构成一条完整的消息”和“一次收到两条消息的数据”等情况。一个常见的场景是,发送方send()了两条1kB的消息(共2kB),接收方收到数据的情况可能是:

  • 一次性收到2kB数据;
  • 分两次收到,第一次600B,第二次1400B;
  • 分两次收到,第一次1400B,第二次600B;
  • 分两次收到,第一次1kB,第二次1kB;
  • 分三次收到,第一次600B,第二次800B,第三次600B;
  • 其他任何可能。一般而言,长度为n字节的消息分块到达的可能性有2^n-1种。

网络库在处理“socket可读”事件的时候,必须一次性把socket里的数据读完(从操作系统buffer搬到应用层buffer),否则会反复触发POLLIN事件,造成busy-loop。那么网络库必然要应对“数据不完整”的情况,收到的数据先放到input buffer里,等构成一条完整的消息再通知程序的业务逻辑。这通常是codec的职责,见\(\S7.3\)“Boost.Asio的聊天服务器”中的“TCP分包”的论述与代码。所以,在TCP网络编程中,网络库必须要给每个TCP connection配置input buffer。

muduo EventLoop采用的是epoll(4) level trigger,而不是edge trigger。一是为了与传统的poll(2)兼容,因为在文件描述符数目较少,活动文件描述符比例较高时,epoll(4)不见得比poll(2)更高效,必要时可以在进程启动时切换Poller。二是level trigger编程更容易,以往select(2)/poll(2)的经验都可以继续用,不可能发生漏掉事件的bug。三是读写的时候不必等候出现EAGAIN,可以节省系统调用次数,降低延迟。

所有muduo中的IO都是带缓冲的IO(buffered IO),你不会自己去read()或write()某个socket,只会操作TcpConnection的input buffer和output buffer。更确切地说,是在onMessage()回调里读取input buffer;调用TcpConnection::send()来间接操作output buffer,一般不会直接操作output buffer。

另外,muduo的onMessage()的原型如下,它既可以是free function,也可以是member function,反正muduo TcpConnection只认boost::function<>。

1
void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp receiveTime);

对于网络程序来说,一个简单的验收测试是:输入数据每次收到一个字节(200字节的输入数据会分200次收到,每次间隔10ms),程序的功能不受影响。对于muduo程序,通常可以用codec来分离“消息接收”与“消息处理”,见\(\S7.6\)“在muduo中实现Protobuf编解码器与消息分发器”对“编解码器codec”的介绍。

如果某个网络库只提供相当于char buf[8192]的缓冲,或者根本不提供缓冲区,而仅仅通知程序“某socket可读/某socket可写”,要程序自己操心IO buffering,这样的网络库用起来就很不方便了。

4.3 Buffer的功能需求

muduo Buffer的设计考虑了常见的网络编程需求,我试图在易用性和性能之间找一个平衡点,目前这个平衡点更偏向于易用性。

muduo Buffer的设计要点:

  • 对外表现为一块连续的内存(char* p,int len),以方便客户代码的编写。
  • 其size()可以自动增长,以适应不同大小的消息。它不是一个fixed size array(例如char buf[8192])。
  • 内部以std::vector<char>来保存数据,并提供相应的访问函数。

Buffer其实像是一个queue,从末尾写入数据,从头部读出数据。

谁会用Buffer?谁写谁读?根据前文分析,TcpConnection会有两个Buffer成员,input buffer与output buffer。

  • input buffer,TcpConnection会从socket读取数据,然后写入input buffer(其实这一步是用Buffer::readFd()完成的);客户代码从input buffer读取数据。
  • output buffer,客户代码会把数据写入output buffer(其实这一步是用TcpConnection::send()完成的);TcpConnection从output buffer读取数据并写入socket。

其实,input和output是针对客户代码而言的,客户代码从input读,往output写。TcpConnection的读写正好相反。

图7-2是muduo::net::Buffer的类图。请注意,为了后面画图方便,这个类图跟实际代码略有出入,但不影响我要表达的观点。代码位于muduo/net/Buffer.h,cc。

本节不介绍每个成员函数的使用,而会详细讲解readindex和writeindex的作用。

Buffer::readFd()我在p.138写道:

在非阻塞网络编程中,如何设计并使用缓冲区?一方面我们希望减少系统调用,一次读的数据越多越划算,那么似乎应该准备一个大的缓冲区。另一方面希望减少内存占用。如果有10000个并发连接,每个连接一建立就分配各50KB的读写缓冲区的话,将占用1GB内存,而大多数时候这些缓冲区的利用率很低。muduo用readv(2)结合栈上空间巧妙地解决了这个问题。

具体做法是,在栈上准备一个65536字节的extrabuf,然后利用readv()来读取数据,iovec有两块,第一块指向muduoBuffer中的writable区域,另一块指向栈上的extrabuf。这样如果读入的数据不多,那么全部都读到Buffer中去了;如果长度超过Buffer的writable字节数,就会读到栈上的extrabuf里,然后程序再把extrabuf里的数据append()到Buffer中,代码见\(\S8.7.2\)

这么做利用了临时栈空间,避免每个连接的初始Buffer过大造成的内存浪费,也避免反复调用read()的系统开销(由于缓冲区足够大,通常一次readv()系统调用就能读完全部数据)。由于muduo的事件触发来用leveltrigger,因此这个函数并不会反复调用read()直到其返回EAGAIN,从而可以降低消息处理的延退。

这算是一个小小的创新吧。

readFd()是最内层函数,其在每个IO线程的最大stack空间开销是固定的64KiB,与连接数目无关。

如果stack空间紧张,也可以改用threadlocal的extrabuf,但是不能全局共享一个extrabuf。(为什么?)

线程安全?muduo::net::Buffer不是线程安全的(其安全性跟std::vector相同),这么设计的理由如下:

  • 对于inputbuffer,onMessage()回调始终发生在该TcpConnection所属的那个IO线程,应用程序应该在onMessage()完成对inputbuffer的操作,并且不要把inputbuffer暴露给其他线程。这样所有对inputbuffer的操作都在同一个线程,Buffer class不必是线程安全的。
  • 对于outputbuffer,应用程序不会直接操作它,而是调用TcpConnection::send()来发送数据,后者是线程安全的。

代码中用EventLoop::assertInLoopThread()保证以上假设成立。

如果TcpConnection::send()调用发生在该TcpConnection所属的那个IO线程,那么它会转而调用TcpConnection::sendInLoop(), sendInLoop()会在当前线程(也就是IO线程)操作outputbuffer;如果TcpConnection::send()调用发生在别的线程,它不会在当前线程调用sendInLoop(),而是通过EventLoop::runInLoop()把sendInLoop()函数调用转移到IO线程(听上去颇为神奇?),这样sendInLoop()还是在IO线程操作outputbuffer,不会有线程安全问题。当然,跨线程的函数转移调用涉及函数参数的跨线程传递,一种简单的做法是把数据拷贝一份,绝对安全。

另一种更为高效的做法是用swap()。这就是为什么Tcpconnection::send的某个重载以Buffer*为参数,而不是const Buffer&,这样可以避免拷贝,而用Buffer::swap()实现高效的线程间数据转移。(最后这点,仅为设想,暂未实现。目前仍然以数据拷贝方式在线程间传递,略微有些性能损失。)

4.4 Buffer的数据结构

Buffer的内部是个std::vector<char>,它是一块连续的内存。此外,Buffer有两个data member,指向该vector中的元素。这两个index的类型是int,不是char*,目的是应对指针失效。muduo Buffer的设计参考了Netty的ChannelBuffer和libevent1.4.x的evbuffer。不过,其prependable可算是一点“微创新”。

在介绍Buffer的数据结构之前,先简单说一下后面示意图中表示指针或下标的箭头所指位置的具体含义。对于长度为10的字符串"ChenShuo",如果p0指向第0个字符(白色区域的开始),p1指向第5个字符(灰色区域的开始),p2指向""之后的那个位置(通常是end()指针所指的位置),那么精确的画法如图7-3的左图所示,简略的画法如图7-3的右图所示,后文都采用这种简略画法。

muduo Buffer的数据结构如图7-4所示。

两个index把vector的内容分为三块:prependable、readable、writable,各块的大小见式7-1。灰色部分是Buffer的有效载荷(payload),prependable的作用留到后面讨论。

1
2
3
prependable = readIndex
readable = writeIndex - readIndex
writable = size - writeIndex

readIndex和writeIndex满足以下不变式(invariant):

1
0 ≤ readIndex ≤ writeIndex ≤ size()

muduo Buffer里有两个常数kCheapPrepend和kInitialSize,定义了prependable的初始大小和writable的初始大小,readable的初始大小为0。在初始化之后,Buffer的数据结构如图7-5所示,其中括号里的数字是该变量或常量的值。

根据以上公式(见式7-1)可算出各块的大小,刚刚初始化的Buffer里没有payload数据,所以readable==0。

4.5 Buffer的操作

基本的read-write cycle

Buffer初始化后的情况见图7-4。如果向Buffer写入了200字节,那么其布局如图7-6所示。

图7-6中writeIndex向后移动了200字节,readIndex保持不变,readable和writable的值也有变化。

如果从Buffer read() & retrieve()(下称“读入”)了50字节,结果如图7-7所示。与图7-6相比,readIndex向后移动50字节,writeIndex保持不变,readable和writable的值也有变化(这句话往后从略)。

然后又写入了200字节,writeIndex向后移动了200字节,readIndex保持不变,如图7-8所示。

接下来,一次性读入350字节,请注意,由于全部数据读完了,readIndex和writeIndex返回原位以备新一轮使用(见图7-9),这和图7-5是一样的。

以上过程可以看作是发送方发送了两条消息,长度分别为50字节和350字节,接收方分两次收到数据,每次200字节,然后进行分包,再分两次回调客户代码。

自动增长

muduo Buffer不是固定长度的,它可以自动增长,这是使用vector的直接好处。假设当前的状态如图7-10所示。(这和前面的图7-8是一样的。)

客户代码一次性写入1000字节,而当前可写的字节数只有624,那么buffer会自動增长以容纳全部数据,得到的结果如图7-11所示。注意readIndex返回到了前面,以保持prependable等于kCheapPrependable。由于vector重新分配了内存,原来指向其元素的指针会失效,这就是为什么readIndex和writeIndex是整数下标而不是指针。(注意:在目前的实现中prependable会保持58字节,留待将来修正。)

然后读入350字节,readIndex前移,如图7-12所示。

最后,读完剩下的1000字节,readIndex和writeIndex返回kCheapPrependable,如图7-13所示:

注意buffer并没有缩小大小,下次写入1350字节就不会重新分配内存了。换句话說,muduo Buffer的size()是自适应的,它一开始的初始值是1kB多,如果程序中经常收发10kB的数据,那么用几次之后它的size()会自动增长到10kB,然后就保持不变。这样一方面避免浪费内存(Buffer的初始大小直接决定了高并发连接时的内存消耗),另一方面避免反复分配内存。当然,客户代码可以手动shrink() buffer size。

size()与capacity()

使用vector的另一个好处是它的capacity()机制减少了内存分配的次数。比如说程序反复写入1字节,muduo Buffer不会每次都分配内存,vector的capacity()以指数方式增长,让push_back()的平均复杂度是常数。比如说经过第一次增长,size()刚好满足写入的需求,如图7-14所示。但这个时候vector的capacity()已经大于size(),在接下来写入capacity()一size()字节的数据时,都不会重新分配内存,如图7-15所示。

思考题:为什么我们不需要调用reserve()来预先分配空间?因为Buffer在构造函数里把初始size()设为1KiB,这样当size()超过1KiB的时候vector会把capacity()加倍,等于说resize()替我们做了reserve()的事。用一段简单的代码验证一下:

1
2
3
4
5
6
vector<char> vec;
printf("%zd %zd in", vec.size(), vec.capacity());
vec.resize(1024);
printf("%zd %zd in", vec.size(), vec.capacity());
vec.resize(1300);
printf("%zd %zd in", vec.size(), vec.capacity());

运行结果:

1
2
3
4
5
6
0 0
#一开始size()和capacity()都是0
1024 1024
#resize(1024)之后size和capacity()都是1024
1300 2048
#resize(1300)之后capacity()翻倍,相当于reserve(2048)

细心的读者可能会发现用capacity()也不是完美的,它有优化的余地。具体来说,vector::resize()会初始化(memset()/bzero())内存,而我们不需要它初始化,因为反正立刻就要填入数据。比如,在图7-15的基础上写入200字节,由于capacity()足够大,不会重新分配内存,这是好事;但是vector::resize()会先把那200字节memset()为0(见图7-16),然后muduo Buffer再填入数据(见图7-17)。这么做稍微有点浪费,不过我不打算优化它,除非它确实造成了性能瓶颈。(精通STL的读者可能会说用vec.insert(vec.end(), ...)以避免浪费,但是writeIndex和size()不一定是对齐的,会有别的麻烦。)

Google Protobuf中有一个STLStringResizeUninitialized函数,干的就是这个事情。

内部腾挪

有时候,经过若干次读写,readIndex移到了比较靠后的位置,留下了巨大的prependable空间,如图7-18所示。

这时候,如果我们想写入300字节,而writable只有200字节,怎么办?muduo Buffer在这种情况下不会重新分配内存,而是先把已有的数据移到前面去,腾出writable空间,如图7-19所示。

然后,就可以写入300字节了,如图7-20所示。

这么做的原因是,如果重新分配内存,反正也是要把数据拷贝到新分配的内存区域,代价只会更大。

前方添加(prepend)

前面说muduo Buffer有个小小的创新(或许不是创新,我记得在哪儿看到过类似的做法,忘了出处),即提供prependable空间,让程序能以很低的代价在数据前面添加几个字节。

比如说,程序以固定的4个字节表示消息的长度(见\(\S7.3\)“Boost.Asio的聊天服务器”中的LengthHeaderCodec),我要序列化一个消息,但是不知道它有多长,那么我可以一直append()直到序列化完成(图7-21,写入了200字节),然后再在序列化数据的前面添加消息的长度(图7-22,把200这个数prepend到首部)。

通过预留kCheapPrependable空间,可以简化客户代码,以空间换时间。

以上各种use case的单元测试见muduo/net/tests/Buffer_unittest.cc。

4.6 其他设计方案

这里简单谈谈其他可能的应用层buffer设计方案。

不用vector<char>

如果有STL洁癖,那么可以自已管理内存,以4个指针为buffer的成员,数据结构如图7-23所示。

说实话我不觉得这种方案比std::vector好。代码变复杂了,性能也未见得有能察觉得到(noticeable)的改观。如果放弃“连续性”要求,可以用circular buffer,这样可以减少一点内存拷贝(没有“内部腾挪”)。

zero copy

如果对性能有极高的要求,受不了copy()与resize(),那么可以考虑实现分段连续的zero copy buffer再配合gather scatter IO,数据结构如图7-24所示,这是Linux多线程服务端编程:使用muduo C++网络库中libevent2.0.x的设计方案。TCPv2介绍的BSD TCP/IP实现中的mbuf也是类似的方案,Linux的sk_buff估计也差不多。细节有出入,但基本思路都是不要求数据在内存中连续,而是用链表把数据块链接到一起。

当然,高性能的代价是代码变得晦涩难读,buffer不再是连续的,parse消息会稍微麻烦一些。如果你的程序只处理Protobuf Message,这不是问题,因为Protobuf有ZeroCopyInputStream接口,只要实现这个接口,parsing的事情就交给Protobuf Message去操心了。

4.7 性能是不是问题

看到这里,有的读者可能会嘀咕:muduo Buffer有那么多可以优化的地方,其性能会不会太低?对此,我的回应是“可以优化,不一定值得优化。”

muduo的设计目标是用于开发公司内部的分布式程序。换句话说,它是用来写专用的Sudoku server或者游戏服务器,不是用来写通用的httpd或ftpd或Web proxy。前者通常有业务逻辑,后者更强调高并发与高吞吐量。

以Sudoku为例,假设求解一个Sudoku问题需要0.2ms,服务器有8个核,那么理想情况下每秒最多能求解40000个问题。每次Sudoku请求的数据大小低于100字节(一个9×9的数独只要81字节,加上header也可以控制在100字节以下),也就是说100×40000=4MB/s的吞吐量就足以让服务器的CPU饱和。在这种情况下,去优化Buffer的内存拷贝次数似乎没有意义。

再举一个例子,现在最常用的千兆以太网的裸吞吐量是125MB/s,扣除以太网header、IP header、TCP header之后,应用层的吞吐率大约在117MB/s上下。而现在服务器上最常用的DDR2/DDR3内存的带宽至少是4GB/s,比千兆以太网高40倍以上。也就是说,对于几KB或几十kB大小的数据,在内存中复制几次根本不是问题,因为受限于兆以太网延迟和带宽的限制,跟这个程序通信的其他机器上的程序不会觉察到性能差异。

最后举一个例子,如果你实现的服务程序要跟数据库打交道,那么瓶颈通常在DB上,优化服务程序本身不见得能提高性能(从DB读一次数据往往就抵消了你做的全部low-level优化),这时不如把精力投在DB调优上。

专用服务程序与通用服务程序的另一点区别是benchmark的对象不同。如果你打算写一个httpd,自然有人会拿来和目前最好的Nginx对比,立马就能比出性能高低。然而,如果你写一个实现公司内部业务的服务程序(比如分布式存储、搜索、微博、短网址),由于市面上没有同等功能的开源实现,你不需要在优化上投入全部精力,只要一版做得比一版好就行。先正确实现所需的功能,投入生产应用,然后再根据真实的负载情况来做优化,这恐怕比在编码阶段就盲目调优要更effective一些。

muduo的设计目标之一是吞吐量能让千兆以太网饱和,也就是每秒收发120MB数据。这个很容易就达到,不用任何特别的努力。

如果确实在内存带宽方面遇到问题,说明你做的应用实在太critical,或许应该考虑放到Linux kernel里边去,而不是在用户态尝试各种优化。毕竟只有把程序做到kernel里才能真正实现zero copy;否则,核心态和用户态之间始终是有一次内存拷贝的。如果放到kernel里还不能满足需求,那么要么自己写新的kernel,或者直接用FPGA或ASIC操作network adapter来实现你的“高性能服务器”。

5. 一种自动反射消息类型的Protobuf网络传输方案

本节假定读者了解Google Protocol Buffers是什么,这不是一篇Protobuf入门教程。本节的示例代码位于examples/protobuf/codec。


Tips:

Google Protocol Buffers(简称Protobuf)是一种由Google开发的、用于定义结构化数据并在不同的系统或编程语言之间高效地传输和存储数据的方法。它是一种语言中立、平台中立、可扩展的序列化结构数据的机制。Protobuf类似于JSON,但它更小、更快,并且可以生成特定语言的绑定。你只需定义一次数据结构,然后就可以利用特殊生成的源代码轻松地将结构化数据写入和读取各种数据流,使用多种语言。

Protobuf的主要特点包括: 1. 紧凑的数据存储:Protobuf生成的数据比JSON更小。 2. 快速解析:Protobuf的解析速度比JSON更快。 3. 多语言支持:Protobuf支持多种编程语言,如C++、Java、Python等。 4. 自动生成类:通过proto编译器从.proto文件生成代码,简化了数据操作。 5. 向后兼容性:Protobuf允许无缝支持更改,包括添加新字段和删除现有字段,而不会破坏现有服务。

Protobuf通过.proto文件定义数据结构,然后使用proto编译器生成对应语言的代码。例如,定义一个Person消息类型,然后可以生成C++、Java或Python代码,并在这些语言中使用这些数据结构。

Protobuf在Google内部被广泛使用,几乎用于所有的内部RPC协议和文件格式。它也被用于跨服务器通信以及磁盘上数据的存档存储。

最新更新显示,Protobuf v30版本(预计于2025年第一季度发布)中计划进行重大更改。这些更改包括对C++的breaking changes,以及对Bazel构建、Ruby版本支持的变更等。

总的来说,Protobuf是一种高效、灵活且广泛使用的数据序列化工具,特别适用于需要跨语言和平台进行数据传输和存储的场景。


本节要解决的问题是:通信双方在编译时就共享proto文件的情况下,接收方在收到Protobuf二进制数据流之后,如何自动创建具体类型的Protobuf Message对象,并用收到的数据填充该Message对象(即反序列化)。“自动”的意思是:当程序中新增一个Protobuf Message类型时,这部分代码不需要修改,不需要自己再去注册消息类型。其实,Google Protobuf本身具有很强的反射(reflection)功能,可以根据typename创建具体类型的Message对象,我们直接利用即可。

5.1 网络编程中使用Protobuf的两个先决条件

Google Protocol Buffers(简称Protobuf)是一款非常优秀的库,它定义了一种紧凑(compact,相对XML和JSON而言)的可扩展二进制消息格式,特别适合网络数据传输。它为多种语言提供binding,大大方便了分布式程序的开发,让系统不再局限于用某一种语言来编写。

在网络编程中使用Protobuf需要解决以下两个问题:

  1. 长度,Protobuf打包的数据没有自带长度信息或终结符,需要由应用程序自己在发送和接收的时候做正确的切分。
  2. 类型,Protobuf打包的数据没有自带类型信息,需要由发送方把类型信息传给接收方,接收方创建具体的Protobuf Message对象,再做反序列化。

Protobuf这么设计的原因见下一节。这里第一个问题很好解决,通常的做法是在每个消息前面加个固定长度的length header,例如\(\S7.3\)中实现的LengthHeaderCodec。第二个问题其实也很好解决,Protobuf对此有内建的支持。但是奇怪的是,从网上简单搜索的情况看,我发现了很多“山寨”的做法。

“山寨”做法

以下均为在Protobuf data之前加上header,header中包含消息长度和类型信息。类型信息的“山寨”做法主要有两种:

  • 在header中放int typeId,接收方用switch-case来选择对应的消息类型和处理函数;
  • 在header中放string typeName,接收方用look-up table来选择对应的消息类型和处理函数。

这两种做法都有问题。

第一种做法要求保持typeId的唯一性,它和Protobuf message type一一对应。如果Protobuf message的使用范围不广,比如接收方和发送方都是自己维护的程序,那么typeId的唯一性不难保证,用版本管理工具即可。如果Protobuf message的使用范围很大,比如全公司都在用,而且不同部门开发的分布式程序可能相互通信,那么就需要一个公司内部的全局机构来分配typeId,每次增加新message type都要去注册一下,比较麻烦。

第二种做法稍好一点。typeName的唯一性比较好办,因为可以加上package name(也就是用message的fully qualified typename),各个部门事先分好namespace不会冲突与重复。但是每次新增消息类型的时候都要去手工修改look-up table的初始化代码,也比较麻烦。

其实,不需要自己重新发明轮子,Protobuf本身已经自带了解决方案。

5.2 根据typename反射自动创建Message对象

Google Protobuf本身具有很强的反射(reflection)功能,可以根据typename创建具体类型的Message对象。但是奇怪的是,其官方教程从来没有明确提及这个用法,我估计还有很多人不知道这个用法,所以觉得值得谈一谈。

以下是笔者绘制的Protobuf class diagram(见图7-25)。我估计大家通常关心和使用的是这个类图的左半部分:MessageLite、Message、GeneratedMessage Types(Person、AddressBook)等,而较少注意到图7-25的右半部分:Descriptor、DescriptorPool, MessageFactory。

在图7-25中,起关键作用的是Descriptor.class,每个具体Message type对应一个Descriptor对象。尽管我们没有直接调用它的函数,但是Descriptor在“根据typename创建具体类型的Message对象”中扮演了重要的角色,起了桥梁作用。

图7-25中的一箭头描述了根据typename创建具体Message对象的过程,后文会详细介绍。

原理简述

Protobuf Message class采用了Prototype pattern,Message class定义了New()函数,用以返回本对象的一份新实体,类型与本对象的真实类型相同。也就是说,拿到Message*指针,不用知道它的具体类型,就能创建和其类型一样的具体Message type的对象。

每个具体Message type都有一个default instance,可以通过Concrete Message::default_instance()获得,也可以通过Message Factory::GetPrototype(const Descriptor*)来获得。所以,现在问题转变为:1.如何拿到Message Factory;2.如何拿到Descriptor*。

当然,Concrete Message::descriptor()返回了我们想要的Descriptor*,但是在不知道Concrete Message的时候,如何调用它的静态成员函数呢?这似乎是个鸡与蛋的问题。

我们的英雄是Descriptor Pool,它可以根据typename查到Descriptor*,只要找到合适的Descriptor Pool,再调用Descriptor Pool::FindMessageTypeByName(const string& type_name)即可。看到图7-25是不是眼前一亮?

在最终解决问题之前,先简单测试一下,看看我上面说得对不对。

验证思路

本文用于举例的proto文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
examples/protobuf/codec/query.proto
package muduo;
message Query {
required int64 id = 1;
required string questioner = 2;
repeated string question = 3;
message Answer {
required int64 id = 1;
required string questioner = 2;
required string answerer = 3;
repeated string solution = 4;
}
message Empty {
optional int32 id = 1;
}
}

其中的Query.questioner和Answer.answerer是S9.4提到的“分布式系统中的进程标识”。

以下代码验证Concrete Message::default_instance()、Concrete Message::descriptor()、Message Factory::GetPrototype()、Descriptor Pool::FindMessageTypeByName()之间的不变式(invariant),注意其中的assert:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
typedef muduo::Query T;
std::string type_name = T::descriptor()->full_name();
cout << type_name << endl;
const Descriptor* descriptor = DescriptorPool::generated_pool()-> FindMessageTypeByName(type_name);
assert(descriptor == T::descriptor());
cout << "FindMessageTypeByName()=" << descriptor << endl;
cout << "T::descriptor()=" << T::descriptor() << endl;
cout << endl;
const Message* prototype = MessageFactory::generated_factory()->GetPrototype(descriptor);
assert(prototype == &T::default_instance());
cout << "GetPrototype()=" << prototype << endl;
cout << "T::default_instance()=" << &T::default_instance() << endl;
cout << endl;
T* new_obj = dynamic_cast<T*>(prototype->New());
assert(new_obj != NULL);
assert(new_obj == prototype);
assert(typeid(*new_obj) == typeid(T::default_instance()));
cout << "prototype->New()=" << new_obj << endl;
cout << endl;
delete new_obj;

程序运行结果如下:

1
2
3
4
5
6
muduo.Query
FindMessageTypeByName()=@xd4e720
T::descriptor()=@xd4e720
GetPrototype()=@xd47710
T::default_instance()=@xd47710
prototype->New()=@xd459ee

根据typename自动创建Message的关键代码

好了,万事俱备,开始行动:

  1. 用Descriptor Pool::generated_pool()找到一个Descriptor Pool对象,它包含了程序编译的时候所链接的全部Protobuf Message types。
  2. 根据typename用Descriptor Pool::FindMessageTypeByName()查找Descriptor。
  3. 再用Message Factory::generated_factory()找到Message Factory对象,它能创建程序编译的时候所链接的全部Protobuf Message types。
  4. 然后,用Message Factory::GetPrototype()找到具体Message type的default instance。
  5. 最后,用prototype->New()创建对象。

示例代码如下。

1
2
3
4
5
6
7
8
9
10
11
Message* createMessage(const std::string& typeName) {
Message* message = NULL;
const Descriptor* descriptor = DescriptorPool::generated_pool()->FindMessageTypeByName(typeName);
if (descriptor) {
const Message* prototype = MessageFactory::generated_factory()->GetPrototype(descriptor);
if (prototype) {
message = prototype->New();
}
}
return message;
}

调用方式:

1
2
3
4
Message* newQuery = createMessage("muduo.Query");
assert(newQuery != NULL);
assert(typeid(*newQuery) == typeid(muduo::Query::default_instance()));
cout << "createMessage(\"muduo.Query\")=" << newQuery << endl;

确实能从消息名称创建消息对象,古人不余欺也:-)

注意,createMessage()返回的是动态创建的对象的指针,调用方有责任释放它,不然就会使内存泄漏。在muduo里,我用shared_ptr<Message>来自动管理Message对象的生命期。

拿到Message*之后怎么办呢?怎么调用这个具体消息类型的处理函数?这就需要消息分发器(dispatcher)出马了,且听下回分解。

线程安全性

Google的文档说,我们用到的那几个Message Factory和Descriptor Pool都是线程安全的,Message::New()也是线程安全的。并且它们都是const member function。

关键问题解决了,那么剩下的工作就是设计一种包含长度和消息类型的Protobuf传输格式。

5.3 Protobuf传输格式

笔者设计了一个简单的格式,包含Protobuf data和其对应的长度与类型信息,消息的末尾还有一个checksum。格式如图7-26所示,图中方块的宽度是32-bit。

用C struct伪代码描述:

1
2
3
4
5
6
7
8
struct ProtobufTransportFormat --attribute--((__PACKED__))
{
int32_t len;
int32_t nameLen;
char typeName[nameLen];
char protobufData[len - nameLen - 8];
int32_t checkSum; // adler32 of nameLen, typeName and protobufData
};

注意,这个格式不要求32-bit对齐,我们的decoder会自动处理非对齐的消息。

例子

用这个格式打包一个muduo.Query对象的结果如图7-27所示。

设计决策

以下是我在设计这个传输格式时的考虑:

  • signed int。消息中的长度字段只使用了signed 32-bit int,而没有使用unsigned int,这是为了跨语言移植性,因为Java语言没有unsigned类型。另外,Protobuf一般用于打包小于1MB的数据,unsigned int也没用。
  • checksum。虽然TCP是可靠传输协议,虽然Ethernet有CRC-32校验,但是网络传输必须要考虑数据损坏的情况,对于关键的网络应用,checksum是必不可少的。见\(\S A.1.13\)“TCP的可靠性有多高”。对于Protobuf这种紧凑的二进制格式而言,肉眼看不出数据有没有问题,需要用checksum。
  • adler32算法。我没有选用常见的CRC-32,而是选用了adler32,因为它的计算量小、速度比较快,强度和CRC-32差不多。另外,zlib和java.util.zip都直接支持这个算法,不用我们自己实现。
  • typename以“\0”结束。这是为了方便troubleshooting,比如通过tcpdump抓下来的包可以用肉眼很容易看出typename,而不用根据nameLen去一个个数字节。同时,为了方便接收方处理,加入了nameLen,节省了strlen(),这是以空间换时间的做法。
  • 没有版本号。ProtobufMessage的一个突出优点是用optional fields来避免协议的版本号(凡是在ProtobufMessage里放版本号的人都没有理解Protobuf的设计,甚至可能没有仔细阅读Protobuf的文档),让通信双方的程序能各自升级,便于系统演化。如果我设计的这个传输格式又把版本号加进去,那就画蛇添足了。

Protobuf可谓是网络协议格式的典范,值得我单独花一节篇幅讲述其思想,见9.6.1“可扩展的消息格式”。

6. 在muduo中实现Protobuf编解码器与消息分发器

本节是前一节的自然延续,介绍如何将前文介绍的打包方案与muduo::net::Buffer结合,实现Protobuf codec和dispatcher。

在介绍codec和dispatcher之前,先讲讲前文的一个未决问题。

为什么Protobuf的默认序列化格式没有包含消息的长度与类型

Protobuf是经过深思熟虑的消息打包方案,它的默认序列化格式没有包含消息的长度与类型,自然有其道理。哪些情况下不需要在Protobuf序列化得到的数据流中包含消息的长度和(或)类型?我能想到的方案有:

  • 如果把消息写入文件,一个文件存一个消息,那么序列化结果中不需要包含长度和类型,因为从文件名和文件长度中可以得知消息的类型与长度。

  • 如果把消息写入文件,一个文件存多个消息,那么序列化结果中不需要包含类型,因为文件名就代表了消息的类型。

  • 如果把消息存入数据库(或者NoSQL),以VARBINARY字段保存,那么序列化结果中不需要包含长度和类型,因为从字段名和字段长度中可以得知消息的类型与长度。

  • 如果把消息以UDP方式发送给对方,而且对方一个UDP port只接收一种消息类型,那么序列化结果中不需要包含长度和类型,因为从port和UDP packet长度中可以得知消息的类型与长度。

  • 如果把消息以TCP短连接方式发给对方,而且对方一个TCP port只接收一种消息类型,那么序列化结果中不需要包含长度和类型,因为从port和TCP数据流长度中可以得知消息的类型与长度。

  • 如果把消息以TCP长连接方式发给对方,但是对方一个TCP port只接收一种消息类型,那么序列化结果中不需要包含类型,因为port代表了消息的类型。

  • 如果采用 RPC 方式通信,那么只需要告诉对方 method name,对方自然能推断出 Request 和 Response 的消息类型,这些可以由 protoc 生成的 RPC stubs 自动搞定。

对于以上最后一点,比方说 sudoku.proto 的定义是:

1
2
3
service SudokuService {
rpc Solve(SudokuRequest) returns (SudokuResponse);
}
那么 RPC method SudokuService.Solve 对应的请求和响应分别是 SudokuRequest 和 SudokuResponse。在发送 RPC 请求的时候,不需要包含 SudokuRequest 的类型,只需要发送 method name SudokuService.Solve,对方自然知道应该按照 SudokuRequest 来解析(parse)请求。

对于上述这些情况,如果 Protobuf 无条件地把长度和类型放到序列化的字节串中,只会浪费网络带宽和存储。可见 Protobuf 默认不发送长度和类型是正确的决定。Protobuf 为消息格式的设计树立了典范,哪些该自己搞定,哪些留给外部系统去解决,这些都考虑得很清楚。

只有在使用 TCP 长连接,且在一个连接上传递不止一种消息的情况下(比方同时发 Heartbeat 和 Request/Response),才需要我前文提到的那种打包方案。这时候我们需要一个分发器 dispatcher,把不同类型的消息分给各个消息处理函数,这正是本节的主题之一。

以下均只考虑 TCP 长连接这一应用场景。先谈谈编解码器。

6.1 什么是编解码器(codec)

编解码器(codec)是encoder和decoder的缩写,这是一个软硬件领域都在使用的术语,这里我借指“把网络数据和业务消息之间互相转换”的代码。

在最简单的网络编程中,没有消息(message),只有字节流数据,这时候是不需要用到codec的。比如我们前面讲过的echo server,它只需要把收到的数据原封不动地发送回去,而不必关心消息的边界(也没有“消息”的概念),收多少就发多少,这种情况下它干脆直接使用muduo::net::Buffer,取到数据再交给TcpConnection发送回去。

non-trivial的网络服务程序通常会以消息为单位来通信,每条消息有明确的长度与界限。程序每次收到一个完整的消息的时候才开始处理,发送的时候也是把一个完整的消息交给网络库。比如我们前面讲过的asiochat服务,它的一条聊天记录就是一条消息。为此我们设计了一个简单的消息格式,即在聊天记录前面加上4字节的长度header。LengthHeaderCodec代码及解说见\(\S7.3\)

codec的基本功能之一是做TCP分包:确定每条消息的长度,为消息划分界限。在non-blocking网络编程中,codec几乎是必不可少的。如果只收到了半条消息,那么不会触发消息事件回调,数据会停留在Buffer里(数据已经读到Buffer中了),等待收到一个完整的消息再通知处理函数。既然这个任务太常见,我们干脆做一个utility class,避免服务端和客户端程序都要自己处理分包,这就有了LengthHeaderCodec。这个codec的使用有点奇怪,不需要继承,它也没有基类,只要把它当成普通data member来用,把TcpConnection的数据“喂”给它,然后向它注册onxxxMessage()回调,代码见asio chat示例。muduo里的codec都是这样的风格:通过boost::function黏合到一起。

codec是一层间接性,它位于TcpConnection和ChatServer之间,拦截处理收到的数据(Buffer),在收到完整的消息之后,解出消息对象(std::string),再调用ChatServer对应的处理函数。注意ChatServer::onStringMessage()的参数是std::string,不再是muduo::net::Buffer,也就是说LengthHeaderCodec把Buffer解码成了string。另外,在发送消息的时候,ChatServer通过LengthHeaderCodec::send()来发送string,LengthHeaderCodec负责把它编码成Buffer。这正是“编解码器”名字的由来。

Protobuf codec与此非常类似,只不过消息类型从std::string变成了protobuf::Message。对于只接收处理Query消息的QueryServer来说,用ProtobufCodec非常方便,收到protobuf::Message之后向下转型成Query来用就行。

如果要接收处理不止一种消息,ProtobufCodec恐怕还不能单独完成工作,请继续阅读下文。

6.2 实现ProtobufCodec

Protobuf的打包方案我已经在前一节中讲过。编码算法很直截了当,按照前文定义的消息格式一路打包下来,最后更新一下首部的长度即可。代码位于examples/protobuf/codec/codec.cc中的ProtobufCodec::fillEmptyBuffer()。

解码算法有几个要点:

  • protobuf::Message是new出来的对象,它的生命期如何管理?muduo采用shared_ptr来自动管理对象生命期、与整体风格保持一致。
  • 出错如何处理?比方说长度超出范围、checksum不正确、message typename不能识别、message parse出错等等。ProtobufCodec定义了ErrorCallback,用户代码可以注册这个回调。如果不注册,默认的处理是断开连接,让客户重连重试。codec的单元测试里模拟了各种出错情况。
  • 如何处理一次收到半条消息、一条消息、一条半消息、两条消息等等情况?这是每个non-blocking网络程序中的codec都要面对的问题。在p.196的示例代码中我们已经解决了这个问题。

Protobufcodec在实际使用中有明显的不足:它只负责把Buffer转换为具体类型的ProtobufMessage,每个应用程序拿到Message对象之后还要再根据其具体类型做一次分发。我们可以考虑做一个简单通用的分发器dispatcher,以简化客户代码。

此外,目前ProtobufCodec的实现非常初级,它没有充分利用ZeroCopyInputStream和ZeroCopyOutputStream,而是把收到的数据作为bytearray交给Protobuf Message去解析,这给性能优化留下了空间。ProtobufMessage不要求数据连续(像vector那样),只要求数据分段连续(像deque那样),这给buffer管理带来了性能上的好处(避免重新分配内存,减少内存碎片),当然也使得代码变得更为复杂。

muduo::net::Buffer非常简单,它内部是vector,我目前不想让Protobuf影响muduo本身的设计,毕竟muduo是个通用的网络库,不是为实现ProtobufRPC而特制的。

6.3 消息分发器(dispatcher)有什么用

前面提到,在使用TCP长连接,且在一个连接上传递不止一种Protobuf消息的情况下,客户代码需要对收到的消息按类型做分发。比方说,收到Logon消息就交给QueryServer::onLogon()去处理,收到Query消息就交给QueryServer::onQuery()去处理。这个消息分派机制可以做得稍微有点通用性,让所有muduo+Protobuf程序受益,而且不增加复杂性。

换句话说,又是一层间接性,ProtobufCodec拦截了TcpConnection的数据,把它转换为Message,ProtobufDispatcher拦截了ProtobufCodec的callback,按消息具体类型把它分派给多个callbacks。

6.4 ProtobufCodec与ProtobufDispatcher的综合运用

我写了两个示例代码,client和server,把ProtobufCodec和ProtobufDispatcher串联起来使用。server响应Query消息,发送回Answer消息,如果收到未知消息类型,则断开连接。client可以选择发送Query或Empty消息,由命令行控制。这样可以测试unknownmessagecallback。

为节省篇幅,这里就不列出代码了,见examples/protobuf/codec/(client,server).cc。

在构造函数中,通过注册回调函数把四方(TcpConnection、codec、dispatcher、QueryServer)结合起来。

6.5 ProtobufDispatcher的两种实现

要完成消息分发,其实就是对消息做type-switch,这似乎是一个bad smell,但是ProtobufMessage的Descriptor没有留下定制点(比如暴露一个boost::any成员),我们只好硬来了。

先定义ProtobufMessageCallback回调:

1
typedef boost::function<void (Message*)> ProtobufMessageCallback;

ProtobufDispatcherLite的结构非常简单,它有一个map<Descriptor*, ProtobufMessageCallback>成员,客户代码可以以Descriptor*为key注册回调(回想:每个具体消息类型都有一个全局的Descriptor对象,其地址是不变的,可以用来当key。在收到ProtobufMessage之后,在map中找到对应的ProtobufMessageCallback,然后调用之。如果找不到,就调用defaultCallback。

不过,它的设计也有小小的缺陷,那就是ProtobufMessageCallback限制了客户代码只能接受基类Message,客户代码需要自己做向下转型(downcast)。

如果我希望QueryServer这么设计:不想每个消息处理函数自已做downcast,而是交给dispatcher去处理,客户代码拿到的就已经是想要的具体类型。接口如图7-34所示。

那么该如何实现ProtobufDispatcher呢?它如何与多个未知的消息类型合作?做downcast需要知道目标类型,难道我们要用一长串模板类型参数吗?

有一个办法,把多态与模板结合,利用templated derived class来提供类型上的灵活性。设计如图7-35所示。

ProtobufDispatcher有一个模板成员函数,可以接受注册任意消息类型T的回调,然后它创建一个模板化的派生类CallbackT<T>,这样消息的类型信息就保存在了CallbackT<T>中,做downcast就简单了。

比方说,我们有两个具体消息类型Query和Answer。

然后我们这样注册回调:

1
2
3
4
dispatcher_.registerMessageCallback<muduo::Query>(
boost::bind(&QueryServer::onQuery, this, _1, _2, _3));
dispatcher_.registerMessageCallback<muduo::Answer>(
boost::bind(&QueryServer::onAnswer, this, _1, _2, _3));

这样会具现化(instantiation)出两个CallbackT实体。

6.6 ProtobufCodec和ProtobufDispatcher有何意义

ProtobufCodec和ProtobufDispatcher把每个直接收发ProtobufMessage的网络程序都会用到的功能提炼出来做成了公用的utility,这样以后新写Protobuf网络程序就不必为打包分包和消息分发劳神了。它俩以库的形式存在,是两个可以拿来就用的class。它们没有基类,也没有用到虚函数或者别的什么面向对象特征,不侵人muduo::net或者你的代码。如果不这么做,那将来每个Protobuf网络程序都要自己重新实现类似的功能,徒增负担。

\(\S9.7\)“分布式程序的自动化回归测试”会介绍利用Protobuf的跨语言特性,采用Java为C++服务程序编写test harness。

这种编码方案的JavaNetty示例代码见http/github.com/chenshuo/muduo-protorpc中的com.chenshuo.muduo.codec package。

7. 限制服务器的最大并发连接数

本节以大家都熟悉的EchoServer为例,介绍如何限制TCP服务器的并发连接数。代码见examples/maxconnection/。

7.1 为什么要限制并发连接数

一方面,我们不希望服务程序超载;另一方面,更因为file descriptor是稀缺资源,如果出现file descriptor耗尽,很棘手,跟“malloc()失败/new抛出std::bad_alloc”差不多同样棘手。

我2010年10月在《分布式系统的工程化开发方法》演讲中曾谈到libev的作者Marc Lehmann建议的一种应对“accept()时file descriptor耗尽”的办法。

在服务端网络编程中,我们通常用Reactor模式来处理并发连接。listening socket是一种特殊的IO对象,当有新连接到达时,此listening文件描述符变得可读(POLLIN),epoll_wait返回这一事件。然后我们用accept系统调用获得新连接的socket文件描述符。

假如accept()返回EMFILE该如何应对?这意味着本进程的文件描述符已经达到上限,无法为新连接创建socket文件描述符。但是,既然没有socket文件描述符来表示这个连接,我们就无法close()它。程序继续运行,回到再一次调用epoll_wait。这时候epoll_wait会立刻返回,因为新连接还等待处理,listening fd还是可读的。这样程序立刻就陷入了busy loop,CPU占用率接近100%。这既影响同一event loop上的连接,也影响同一机器上的其他服务。

该怎么办呢?Marc Lehmann提到了几种做法:

  1. 调高进程的文件描述符数目。治标不治本,因为只要有足够多的客户端,就一定能把一个服务进程的文件描述符用完。
  2. 死等。驼鸟算法。
  3. 退出程序。似乎小题大做,为了这种暂时的错误而中断现有的服务似乎不值得。
  4. 关闭listening fd。那么什么时候重新打开呢?
  5. 改用edge trigger。如果漏掉了一次accept(), 程序再也不会收到新连接。
  6. 准备一个空闲的文件描述符。遇到这种情况,先关闭这个空闲文件,获得一个文件描述符的名额;再accept()拿到新socket连接的描述符:随后立刻close()它,这样就优雅地断开了客户端连接:最后重新打开一个空闲文件,把“坑”占住,以备再次出现这种情况时使用。

第2、5两种做法会导致客户端认为连接已建立,但无法获得服务,因为服务端程序没有拿到连接的文件描述符。

muduo的Acceptor正是用第6种方案实现的,见muduo/net/Acceptor.cc。但是,这个做法在多线程下不能保证正确,会有race condition。

其实有另外一种比较简单的办法:file descriptor是hard limit,我们可以自己设个稍低一点的soft limit,如果超过soft limit就主动拒绝新连接,这样就可避免触及“file descriptor耗尽”这种边界条件。比方说当前进程的max file descriptor是1024,那么我们可以在连接数达到1000的时候进入“拒绝新连接”状态,这样就可以留给我们足够的腾挪空间。

7.2 在muduo中限制并发连接数

在muduo中限制并发连接数的做法简单得出奇。以在\(\S6.4.2\)的EchoServer为例,只需要为它增加一个int成员,表示当前的活动连接数。(如果是多线程程序,应该用muduo::AtomicInt32。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
$ diff examples/simple/echo/echo.h examples/maxconnection/echo.h -u
--- examples/simple/echo/echo.h
+++ examples/maxconnection/echo.h
@@ -8,9 +8,10 @@
public:
EchoServer(muduo::net::EventLoop* loop,
const muduo::net::InetAddress& listenAddr,
int maxConnections); // kMaxConnections_ = maxConnections
void start();
private:
void onConnection(const muduo::net::TcpConnectionPtr& conn);
@@ -21,6 +22,8 @@
muduo::net::EventLoop loop_;
muduo::net::TcpServer server_;
+ int numConnected_; // should be atomic_int
+ const int kMaxConnections_;

然后,在EchoServer::onConnection()中判断当前活动连接数。如果超过最大允许数,则踢掉连接。

1
2
3
4
5
6
7
8
9
10
11
12
void EchoServer::onConnection(const TcpConnectionPtr& conn) {
LOG_INFO << "EchoServer-" << conn->peerAddress().toIpPort() << "->"
<< conn->localAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
if (conn->connected())
++numConnected_;
if (numConnected_ > kMaxConnections_) // 如果超过最大允许数,则踢掉连接
conn->shutdown();
else
--numConnected_;
LOG_INFO << "numConnected=" << numConnected_;
}

这种做法可以积极地防止耗尽file descriptor。

另外,如果是有业务逻辑的服务,则可以在shutdown()之前发送一个简单的响应,表明本服务程序的负载能力已经饱和,提示客户端尝试下一个可用的server(当然,下一个可用的server地址不一定要在这个响应里给出,客户端可以自已去name service查询),这样方便客户端快速failover。

\(\S7.10\)将介绍如何处理空闲连接的超时:如果一个连接长时间(若干秒)没有输入数据,则踢掉此连接。办法有很多种,我用timing wheel解决。

8. 定时器

从本节开始的三节内容都与非阻塞网络编程中的定时任务有关。

8.1 程序中的时间

在一般的服务端程序设计中,与时间有关的常见任务有:

  1. 获取当前时间,计算时间间隔。
  2. 时区转换与日期计算:把纽约当地时间转换为上海当地时间:2011-02-05之后第100天是几月几号星期几;等等。
  3. 定时操作,比如在预定的时间执行任务,或者在一段延时之后执行任务。

其中第2项看起来比较复杂,但其实最简单。日期计算用Julian Day Number,时区转换用tz database。这些操作都是纯函数,很容易用一套单元测试来验证代码的正确性。需要特别注意的是,用tzset/localtime_r来做时区转换在多线程环境下可能会有问题;对此,我的解决办法是写一个TimeZone class,以避免影响全局,日后在日期与时间专题文章中会讲到,本书不再细述。下文不考虑时区,均为UTC时间。

真正麻烦的是第1项和第3项。一方面,Linux有一天把令人眼花缭乱的与时间相关的函数和结构体,在程序中该如何选用?另一方面,计算机中的时钟不是理想的计时器,它可能会漂移或跳变。最后,民用的UTC时间与闰秒的关系也让定时任务变得复杂和微妙。当然,与系统当前时间有关的操作也让单元测试变得困难。

8.2 Linux时间函数

Linux的计时函数,用于获得当前时间:

  • time(2)/time_t(秒)
  • ftime(3)/struct timeb(毫秒)
  • gettimeofday(2)/struct timeval(微秒)
  • clock_gettime(2)/struct timespec(纳秒)

还有gmtime/localtime/timegm/mktime/strftime/struct tm等与当前时间无关的时间格式转换函数。

定时函数,用于让程序等待一段时间或安排计划任务:

  • sleep(3)
  • alarm(2)
  • usleep(3)
  • nanosleep(2)
  • clock_nanosleep(2)
  • getitimer(2)/setitimer(2)
  • timer_create(2)/timer_settime(2)/timer_gettime(2)/timer_delete(2)
  • timerfd_create(2)/timerfd_gettime(2)/timerfd_settime(2)

我的取舍如下:

  • (计时)只使用gettimeofday(2)来获取当前时间。
  • (定时)只使用timerfd_*系列函数来处理定时任务。

gettimeofday(2)入选原因(这也是muduo::Timestamp class的主要设计考虑):

  1. time(2)的精度太低,ftime(3)已被废弃;clock_gettime(2)精度最高,但是其系统调用的开销比gettimeofday(2)大。
  2. 在x86-64平台上,gettimeofday(2)不是系统调用,而是在用户态实现的,没有上下文切换和陷入内核的开销。
  3. gettimeofday(2)的分辨率(resolution)是1微秒,现在的实现确实能达到这个计时精度,足以满足日常计时的需要。muduo::Timestamp用一个int64_t来表示从Unix Epoch到现在的微秒数,其范围可达上下30万年。

timerfd_*人选的原因:

  1. sleep(3)/alarm(2)/usleep(3)在实现时有可能用了SIGALRM信号,在多线程程序中处理信号是个相当麻烦的事情,应当尽量避免,见\(\S4.10\)。再说,如果主程序和程序库都使用SIGALRM,就糟糕了。(为什么?)
  2. nanosleep(2)和clock_nanosleep(2)是线程安全的,但是在非阻塞网络编程中,绝对不能用让线程挂起的方式来等待一段时间,这样一来程序会失去响应。正确的做法是注册一个时间回调函数。
  3. getitimer(2)和timer_create(2)也是用信号来deliver超时,在多线程程序中也会有麻烦。timer_create(2)可以指定信号的接收方是进程还是线程,算一个进步,不过信号处理函数(signal handler)能做的事情实在很受限。
  4. timerfd_create(2)把时间变成了一个文件描述符,该“文件”在定时器超时的那一刻变得可读,这样就能很方便地融入select(2)/poll(2)框架中,用统一的方式来处理IO事件和超时事件,这也正是Reactor模式的长处。我在以前发表的Linux新增系统调用的启示中也谈到了这个想法,现在我把这个想法在muduo网络库中实现了。
  5. 传统的Reactor利用select(2)/poll(2)/epoll(4)的timeout来实现定时功能,但poll(2)和epoll_wait(2)的定时精度只有毫秒,远低于timerfd_settime(2)的定时精度。

必须要说明,在Linux这种非实时多任务操作系统中,在用户态实现完全精确可控的计时和定时是做不到的,因为当前任务可能会被随时切换出去,这在CPU负载高的时候尤为明显。但是,我们的程序可以尽量提高时间精度,必要的时候通过控制CPU负载来提高时间操作的可靠性,让程序在99.99%的时候都是按预期执行的。这或许比换用实时操作系统并重新编写及测试代码要经济一些。

关于时间的精度(accuracy)问题我留到日期与时间专题文章中讨论,本书不再细述,它与分辨率(resolution)不完全是一回事儿。

8.3 muduo的定时器接口

muduo EventLoop有三个定时器函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
muduo/net/EventLoop.h
typedef boost::function<void()> TimerCallback;
class EventLoop : boost::noncopyable
public:
// Timers
/// Runs callback at time
TimerId runAt(const Timestamp& time, const TimerCallback& cb);
/// Runs callback after @c delay seconds.
TimerId runAfter(double delay, const TimerCallback& cb);
/// Runs callback every @c interval seconds.
TimerId runEvery(double interval, const TimerCallback& cb);
/// Cancels the timer.
void cancel(TimerId timerId);
//...

函数名称很好地反映了其用途:

  • runAt在指定的时间调用TimerCallback;
  • runAfter等一段时间调用TimerCallback;
  • runEvery以固定的间隔反复调用TimerCallback;
  • cancel取消timer。

回调函数在EventLoop对象所属的线程发生,与onMessage、onConnection()等网络事件函数在同一个线程。muduo的TimerQueue采用了平衡二叉树来管理未到期的timers,因此这些操作的时间复杂度是O(logN)。

8.4 Boost.Asio Timer示例

Boost.Asio教程里以Timer和Daytime为例介绍Asio的基本使用,daytime已经在S7.1中介绍过,这里着重谈谈Timer。Asio有5个Timer示例,muduo把其中四个重新实现了一下,并扩充了第5个示例。

  1. 阻塞式的定时,muduo不支持这种用法,无代码。
  2. 非阻塞定时,见examples/asio/tutorial/timer2。
  3. 在TimerCallback里传递参数,见examples/asio/tutorial/timer3。
  4. 以成员函数为TimerCallback,见examples/asio/tutorial/timer4。
  5. 在多线程中回调,用mutex保护共享变量,见examples/asio/tutorial/timer5。
  6. 在多线程中回调,缩小临界区,把不需要互斥执行的代码移出来,见examples/asio/tutorial/timer6。

为节省篇幅,这里只列出timer4。这个程序的功能是以1秒为间隔打印5个整数,乍看起来代码有点小题大做,但是值得注意的是定时器事件与IO事件是在同一线程发生的,程序就像处理IO事件一样处理超时事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
examples/asio/tutorial/timer4/timer.cc
class Printer : boost::noncopyable
{
public:
Printer(muduo::net::EventLoop* loop)
: loop_(loop),
count_(0)
{
loop_->runAfter(1, boost::bind(&Printer::print, this));
}

~Printer()
{
std::cout << "Final count is " << count_ << "\n";
}

void print()
{
if (count_ < 5)
{
std::cout << count_ << "\n";
++count_;
loop_->runAfter(1, boost::bind(&Printer::print, this));
}
else
{
loop_->quit();
}
}

private:
muduo::net::EventLoop* loop_;
int count_;
};

int main()
{
muduo::net::EventLoop loop;
Printer printer(&loop);
loop.loop();
}

最后我再强调一遍,在非阻塞服务端编程中,绝对不能用sleep()或类似的办法来让程序原地停留等待,这会让程序失去响应,因为主事件循环被挂起了,无法处理IO事件。这就像在Windows编程中绝对不能在消息循环里执行耗时的代码是一个道理,这会让程序界面失去响应。Reactor模式的网络编程确实有些类似传统的消息驱动的Windows编程。对于“定时”任务,把它变成一个特定的消息,到时候触发相应的消息处理函数就行了。

Boost.Asio的timer示例只用到了EventLoop::runAfter,我再举一个EventLoop::runEvery的例子。

8.5 Java Netty示例

Netty是一个非常好的Java NIO网络库,它附带的示例程序有echo和discard两个简单网络协议。与S7.1不同,Netty版的echo和discard服务端有流量统计功能,这需要用到固定间隔的定时器(EventLoop::runEvery)。

其client的代码类似前文的chargen,为节省篇幅,请阅读源码examples/netty/discard/client.cc。

这里列出discard server的完整代码。代码整体结构上与S6.4.2的EchoServer差别不大,这算是简单网络服务器的典型模式了。

DiscardServer可以配置成多线程服务器,muduo TcpServer有一个内置的one looper thread多线程IO模型,可以通过setThreadNum()来开启。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
muduo/examples/netty/discard/server.cc
int numThreads = 0;

class DiscardServer
{
public:
DiscardServer(EventLoop* loop, const InetAddress& listenAddr)
: loop_(loop),
server_(loop, listenAddr, "DiscardServer"),
oldCounter_(0),
startTime_(Timestamp::now())
{
server_->setConnectionCallback(boost::bind(&DiscardServer::onConnection, this, _1));
server_->setMessageCallback(boost::bind(&DiscardServer::onMessage, this, _1, _2, _3));
server_->setThreadNum(numThreads);
loop->runEvery(3.0, boost::bind(&DiscardServer::printThroughput, this));
}

void onConnection(const TcpConnectionPtr& conn)
{
LOG_INFO << "DiscardServer - " << conn->peerAddress().toIpPort() << " -> "
<< conn->localAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
}

void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)
{
size_t len = buf->readableBytes();
transferred_.add(len);
receivedMessages_.incrementAndGet();
buf->retrieveAll();
}

void printThroughput()
{
Timestamp endTime = Timestamp::now();
int64_t newCounter = transferred_.get();
int64_t bytes = newCounter - oldCounter_;
int64_t msgs = receivedMessages_.getAndSet(0);
double time = timeDifference(endTime, startTime_);
printf("%4.3fMiB/s %4.3f KiMsgs/s %6.2f bytes per msg\n",
static_cast<double>(bytes) / time / 1024 / 1024,
static_cast<double>(msgs) / time / 1024,
static_cast<double>(bytes) / static_cast<double>(msgs));
oldCounter_ = newCounter;
startTime_ = endTime;
}

private:
EventLoop* loop_;
TcpServer server_;
AtomicInt64 transferred_;
AtomicInt64 receivedMessages_;
int64_t oldCounter_;
Timestamp startTime_;
};

int main(int argc, char* argv[])
{
LOG_INFO << "pid=" << getpid() << ",tid=" << CurrentThread::tid();
if (argc > 1)
{
numThreads = atoi(argv[1]);
}

EventLoop loop;
InetAddress listenAddr(2009);
DiscardServer server(&loop, listenAddr);
server.start();
loop.loop();
}

运行方法,在同一台机器的两个命令行窗口分别运行:

9. 测量两台机器的网络延迟和时间差

本节介绍一个简单的网络程序roundtrip,用于测量两台机器之间的网络延迟,即“往返时间(roundtrip time,RTT)。其主要考察定长TCP消息的分包与TCP_NODELAY的作用。本节的代码见examples/roundtrip/roundtrip.CC。

测量roundtriptime的办法很简单: - hostA发一条消息给hostB,其中包含hostA发送消息的本地时间。 - hostB收到之后立刻把消息echo回hostA。 - hostA收到消息之后,用当前时间减去消息中的时间就得到了roundtriptime。

NTP协议的工作原理与之类似,不过,除了测量roundtriptime,NTP还需要知道两台机器之间的时间差(clock offset),这样才能校准时间。

图7-38是NTP协议收发消息的协议,roundtriptime=(Ta-T4)-(T3-T2),clockoffset=((T1+T4)-(T2+T3))/2。NTP的要求是往返路径上的单程延迟要尽量相等,这样才能减少系统误差。偶然误差由单程延迟的不确定性决定。

在我设计的roundtrip示例程序中,协议有所简化,如图7-39所示。

计算公式如下: - round trip time=T3-T1 - clockoffset=T2 - (T1+T3)/2

简化之后的协议少取一次时间,因为server收到消息之后立刻发送回client,耗时很少(若干微秒),基本不影响最终结果。

我设计的消息格式是16字节定长消息,如图7-40所示。

T1和T2都是muduo::Timestamp,成员是一个int64_t,表示从Unix Epoch到现在的微秒数。为了让消息的单程往返时间接近,server和client发送的消息都是16 bytes,这样做到对称。由于是定长消息,可以不必使用codec,在messagecallback中直接用while(buffer->readableBytes()>= frameLen)就能decode。

client程序以200ms为间隔发送消息,在收到消息之后打印roundtriptime和clockoffset。一次运作实例如图7-41所示。

在这个例子中,client和server各自的本地时钟不是完全对准的,server的时间快了850us,用roundtrip程序能测量出这个时间差。有了这个时间差,就能校正分布式系统中测量得到的消息延迟。

比方说以图7-41为例,server在它本地1.235000s时刻发送了一条消息,client在它本地1.234300s收到这条消息,若直接计算的话延迟是-700us。这个结果肯定是错的,因为server和client不在一个时钟域(clock domain,这是数字电路中的概念),它们的时间直接相减无意义。如果我们已经测量得到server比client快850us,那么用这个数据做一次校正:-700+850=150us,这个结果就比较符合实际了。当然,在实际应用中,clockoffset要经过一个低通滤波才能使用,不然偶然性太大。

请读者思考:为什么不能直接以RTT/2作为两台机器之间收发消息的单程延迟?这个数学是偏大还是偏小?

这个程序在局域网中使用没有问题;如果在广域网上使用,而且RTT大于200ms,那么受Nagle算法影响,测量结果是错误的。因为应用程序记录的发包时间与操作系统真正发出数据包的时间之差不再是一个可以忽略的小间隔。具体分析留作练习,这能测试读者对Nagle的理解。这时候我们需要设置TCP_NODELAY参数,让程序在广域网上也能正常工作。

10. 用timingwheel踢掉空闲连接

本节介绍如何使用timingwheel来踢掉空闲的连接。一个连接如果若干秒没有收到数据,就被认为是空闲连接。本文的代码见examples/idleconnection。

在严肃的网络程序中,应用层的心跳协议是必不可少的。应该用心跳消息来判断对方进程是否能正常工作,“踢掉空闲连接只是一时的权宜之计。我这里想顺便讲讲shared_ptr和weak_ptr的用法。

如果一个连接连续几秒(后文以8s为例)内没有收到数据,就把它断开,为此有两种简单、粗暴的做法:

  • 每个连接保存“最后收到数据的时间lastReceiveTime”,然后用一个定时器,每秒遍历一遍所有连接,断开那些now-connection.lastReceiveTime)>8s的connection。这种做法全局只有一个repeated timer,不过每次timeout都要检查全部连接,如果连接数比较多(几千上方),这一步可能会比较费时。
  • 每个连接设置一个one-shot timer,超时定为8s,在超时的时候就断开本连接。当然,每次收到数据要去更新timer。这种做法需要很多个one-shot timer,会频繁地更新timers。如果连接数目比较大,可能对EventLoop的TimerQueue造成压力。

使用timingwheel能避免上述两种做法的缺点。连接超时不需要精确定时,只要大致8秒超时断开就行,多一秒、少一秒关系不大。处理连接超时可用一个简单的数据结构:8个桶组成的循环队列。第1个桶放1秒之后将要超时的连接,第2个桶放2秒之后将要超时的连接。每个连接一收到数据就把自已放到第8个桶,然后在每秒的timer里把第一个桶里的连接断开,把这个空桶挪到队尾。这样大致可以做到8秒没有数据就超时断开连接。更重要的是,每次不用检查全部的连接,只要检查第一个桶里的连接,相当于把任务分散了。

7.10.1 timingwheel原理

《Hashed and hierarchical timing wheels: efficient data structures for implementing a timer facility》这篇论文详细比较了实现定时器的各种数据结构,并提出了层次化的timingwheel与hash timingwheel等新结构。针对本节要解决的问题的特点,我们不需要实现一个通用的定时器,只用实现simple timingwheel即可。

simple timingwheel的基本结构是一个循环队列,还有一个指向队尾的指针(tail),这个指针每秒移动一格,就像钟表上的时针,timingwheel由此得名。

以下是某一时刻timingwheel的状态(见图7-42的左图),格子里的数字是倒计时(与通常的timingwheel相反),表示这个格子(桶子)中连接的剩余寿命。

1秒以后(见图7-42的右图),tail指针移动一格,原来四点钟方向的格子被清空,其中的连接已被断开。

连接超时被踢掉的过程:

假设在某个时刻,conn1到达,把它放到当前格子中,它的剩余寿命是7秒(见图7-43的左图)。此后conn1上没有收到数据。1秒之后(见图7-43的右图),tail指向下一个格子,conn1的剩余寿命是6秒。

又过了几秒,tail指向conn1之前的那个格子,conn1即将被断开(见图7-44的左图)。下一秒(见图7-44的右图),tail重新指向conn1原来所在的格子,清空其中的数据,断开conn1连接。

连接刷新:

如果在断开conn1之前收到数据,就把它移到当前的格子中。conn1的剩余寿命是3秒(见图7-45的左图),此时conn1收到数据,它的寿命恢复为7秒(见图7-45的右图)。

时间继续前进,conn1寿命递减,不过它已经比第一种情况长寿了(见图7-46)。

多个连接:

timingwheel中的每个格子是个hash set,可以容纳不止一个连接。

比如一开始,conn1到达。随后,conn2到达(见图7-47),这时候tail还没有移动,两个连接位于同一个格子中,具有相同的剩余寿命。(在图7-47中画成链表,代码中是哈希表。)

几秒之后,conn1收到数据,而conn2一直没有收到数据,那么conn1被移到当前的格子中。这时conn1的预期寿命比conn2长(见图7-48)。

10.2 代码实现与改进

我们用以前多次出现的EchoServer来说明具体如何实现timingwheel。代码见examples/idleconnection。

在具体实现中,格子里放的不是连接,而是一个特制的Entry struct,每个Entry包含TcpConnection的weak_ptr。Entry的析构函数会判断连接是否存在(用weak_ptr),如果还存在则断开连接。

数据结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
struct Entry : public muduo::copyable {
explicit Entry(const WeakTcpConnectionPtr& weakConn)
: weakConn_(weakConn) {}
~Entry() {
TcpConnectionPtr conn = weakConn_.lock();
if (conn) {
conn->shutdown();
}
}
WeakTcpConnectionPtr weakConn_;
};
typedef boost::shared_ptr<Entry> EntryPtr;
typedef boost::weak_ptr<Entry> weakEntryPtr;
typedef boost::unordered_set<EntryPtr> Bucket;
typedef boost::circular_buffer<Bucket> WeakConnectionList;

在实现中,为了简单起见,我们不会真的把一个连接从一个格子移到另一个格子,而是采用引用计数的办法,用shared_ptr来管理Entry。如果从连接收到数据就把对应的EntryPtr放到这个格子里,这样它的引用计数就递增了。当Entry的引用计数递减到零时,说明它没有在任何一个格子里出现,那么连接超时,Entry的析构函数会断开连接。

注意在头文件中我们自已定义了shared_ptr<T>的hash函数,原因是直到Boost 1.47.0之前,unordered_set<shared_ptr<T>>虽然可以编译通过,但是其hash_value是shared_ptr隐式转换为bool的结果。也就是说,如果不自定义hash函数,那么unordered_set/map会退化为链表。

timingwheel用boost::circular_buffer实现,其中每个Bucket元素是个hash set of EntryPtr。

在构造函数中,注册每秒的回调(EventLoop::runEvery注册EchoServer::onTimer(),然后把timingwheel设为适当的大小。

1
2
3
4
5
6
7
8
9
10
11
12
13
EchoServer::EchoServer(EventLoop* loop,
const InetAddress& listenAddr,
int idleSeconds)
: loop_(loop),
server_(loop, listenAddr, "EchoServer"),
connectionBuckets_(idleSeconds) {
server_.setConnectionCallback(
boost::bind(&EchoServer::onConnection, this, _1));
server_.setMessageCallback(
boost::bind(&EchoServer::onMessage, this, _1, _2, _3));
loop->runEvery(1.0, boost::bind(&EchoServer::onTimer, this));
connectionBuckets_.resize(idleSeconds);
}

其中,EchoServer::onTimer()的实现只有一行:往队尾添加一个空的Bucket,这样circular_buffer会自动弹出队首的Bucket,并析构之。在析构Bucket的时候,会依次析构其中的EntryPtr对象,这样Entry的引用计数就不用我们去操心,C++的值语义会帮我们搞定一切。

1
2
3
void EchoServer::onTimer() {
connectionBuckets_.push_back(Bucket());
}

在连接建立时,创建一个Entry对象,把它放到timingwheel的队尾。另外,我们还需要把Entry的弱引用保存到TcpConnection的context里,因为在收到数据的时候还要用到Entry。(思考题:如果TcpConnection::setContext保存的是强引用EntryPtr,会出现什么情况?)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void EchoServer::onConnection(const TcpConnectionPtr& conn) {
LOG_INFO << "EchoServer-" << conn->peerAddress().toIpPort() << "->"
<< conn->localAddress().toIpPort() << " is "
<< (conn->connected() ? "Up" : "Down");
if (conn->connected()) {
EntryPtr entry(new Entry(conn));
connectionBuckets_.back().insert(entry);
WeakEntryPtr weakEntry(entry);
conn->setContext(weakEntry);
} else {
assert(!conn->getContext().empty());
WeakEntryPtr weakEntry(boost::any_cast<WeakEntryPtr>(conn->getContext()));
LOG_DEBUG << "Entry use_count = " << weakEntry.use_count();
}
}

在收到消息时,从TcpConnection的context中取出Entry的弱引用,把它提升为强引用EntryPtr,然后放到当前的timingwheel队尾。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void EchoServer::onMessage(const TcpConnectionPtr& conn,
Buffer* buf,
Timestamp time) {
string msg(buf->retrieveAsString());
LOG_INFO << conn->name() << "echo " << msg.size()
<< " bytes at " << time.toString();
conn->send(msg);
assert(!conn->getContext().empty());
WeakEntryPtr weakEntry(boost::any_cast<WeakEntryPtr>(conn->getContext()));
EntryPtr entry(weakEntry.lock());
if (entry) {
connectionBuckets_.back().insert(entry);
}
}

然后呢?没有然后了,程序已经完成了我们想要的功能。(完整的代码会调用dumpConnectionBuckets()来打印circular_buffer变化的情况,运行一下即可理解。)

希望本节内容有助于你理解shared_ptr和weak_ptr的引用计数。

改进:

在现在的实现中,每次收到消息都会往队尾添加EntryPtr(当然,hashset会帮我们去重)。一个简单的改进措施是,在TcpConnection里保存“最后一次往队尾添加引用时的tail位置”,收到消息时先检查tail是否变化,若无变化则不重复添加EntryPtr,若有变化则把EntryPtr从旧的Bucket移到当前队尾Bucket。这样或许能提高空间和时间效率。以上改进留作练习。

另外一个思路是“选择排序”:使用链表将TcpConnection串起来,TcpConnection每次收到消息就把自己移到链表末尾,这样链表是按接收时间先后排序的。再用一个定时器定期从链表前端查找并踢掉超时的连接。代码示例位于同一目录。

11. 简单的消息广播服务

本节介绍用muduo实现一个简单的topic-based消息广播服务,这其实是“聊天室”的一个简单扩展,不过聊天的不是人,而是分布式系统中的程序。本节的代码见examples/hub。

在分布式系统中,除了常用的end-to-end通信,还有一对多的广播通信。一提到“广播”,或许会让人联想到P2P多播或P组播,这不是本节的主题。本节将要谈的是基于TCP协议的应用层广播。示意图如图7-49所示。

图7-49中的圆角矩形代表程序,“Hub”是一个服务程序,不是网络集线器,它起到类似集线器的作用,故而得名。

Publisher和Subscriber通过TCP协议与Hub程序通信。Publisher把消息发到某个topic上,Subscriber订阅该topic,然后就能收到消息。即Publisher借助Hub把消息广播给了一个或多个Subscriber。

这种pub/sub结构的好处在于可以增加多个Subscriber而不用修改Publisher,一定程度上实现了“解耦”(也可以看成分布式的Observer pattern)。由于走的是TCP协议,广播是基本可靠的,这里的“可靠”指的是“比UDP可靠”,不是“完全可靠”。

为了避免串扰(cross-talk),每个topic在同一时间只应该有一个Publisher,Hub不提供compare-and-swap操作。

应用层广播在分布式系统中用处很大,这里略举几例。

体育比分转播,有8片比赛场地正在进行羽毛球比赛,每个场地的计分程序把当前比分发送到各自的topic上(第1号场地发送到court1,第2号场地发送到court2,依此类推)。需要用到比分的程序(赛场的大屏幕显示、网上比分转播等)自已订阅感兴趣的topic,就能及时收到最新比分数据。由于本节实现的不是100%可靠广播,那么消息应该是snapshot,而不是delta。(换句话说,消息的内容是“现在是儿比儿”,而不是“刚才谁得分”。)

负载监控每台机器上运行一个监控程序,周期性地把本机当前负载(CPU、网络、磁盘、温度)publish到以hostname命名的topic上,这样需要用到这些数据的程序只要在Hub订阅相应的topic就能获得数据,无须与多台机器直接打交道。(为了可靠起见,监控程序发送的消息中应该包含时间截,这样能防止过期(stale)数据,甚至一定程度上起到心跳的作用。)沿着这个思路,分布式系统中的服务程序也可以把自己的当前负载发布到Hub上,供load balancer和monitor取用。

11.1 协议

为了简单起见,muduo的Hub示例采用以“”分界的文本协议,这样用telnet就能测试Hub。协议只有以下三个命令:

1
sub<topic>\r\n

该命令表示订阅<topic>,以后该topic有任何更新都会发给这个TCP连接。在sub的时候,Hub会把该<topic>上最近的消息发给此Subscriber。

1
unsub <topic>

该命令表示退订<topic>。

1
pub<topic>\r\n<content>\r\n

往<topic>发送消息,内容为<content>。所有订阅了此<topic>的Subscriber会收到同样的消息“pub<topic><content>”。

11.2 代码

muduo示例中的Hub分为几个部分:

  • Hub服务程序,负责一对多的消息分发。它会记住每个client订阅了哪些topic,只把消息发给特定的订阅者。代码参见examples/hub/hub.cC。
  • pubsub库,为了方便编写使用Hub服务的应用程序,我写了一个简单的client library,用来和Hub打交道。这个library可以订阅topic、退订topic、往指定的topic发布消息。代码参见examples/hub/pubsub.(h,cc]。
  • sub示例程序,这个命令行程序订阅一个或多个topic,然后等待Hub的数据。代码参见examples/hub/sub.cC。
  • pub示例程序,这个命令行程序往某个topic发布一条消息,消息内容由命令行

参数指定。代码参见examples/hub/pub.c。

一个程序可以既是Publisher又是Subscriber,而且pubsub库只用一个TCP连接(这样failover比较简便)。使用范例如下所示。

  1. 开启4个命令行窗口。
  2. 在第一个窗口运行$hub 9999。
  3. 在第二个窗口运行$sub127.0.0.1:9999mytopic。
  4. 在第三个窗口运行$sub127.0.0.1:9999mytopic court。
  5. 在第四个窗口运行$pub127.0.0.1:9999mytopic "Hello world",这时第二、三号窗口都会打印“mytopic:Hello world.”,表明收到了mytopic这个主题上的消息。
  6. 在第四个窗口运行$pub127.0.0.1:9999court "13:11",这时第三号窗口会打印“court:13:11”,表明收到了court这个主题上的消息。第二号窗口没有订阅此消息,故无输出。

借助这个简单的pub/sub机制,还可以做很多有意思的事情。比如把分布式系统中的程序的一部分end-to-end通信改为通过pub/sub来做(例如,原来是A向B发一个SOAP request,B通过同一个TCP连接发回response(分析二者的通信只能通过查看log或用tcpdump截获):现在是A往topic_a_to_b上发布request,B在topic_b_to_a上发response),这样多挂一个monitoring Subscriber就能轻易地查看通信双方的沟通情况,很容易做状态监控与troubleshooting。

11.3 多线程的高效广播

在本节这个例子中,Hub是个单线程程序。假如有一条消息要广播给1000个订阅者,那么只能一个一个地发,第1个订阅者收到消息和第1000个订阅者收到消息的时差可以长达若干秒。那么,有没有办法提高速度、降低延迟呢?我们当然会想到用多线程。但是简单的办法并不一定能奏效,因为一个全局锁就把多线程程序退化为单线程执行。为了真正提速,我想到了用thread local的办法,比如把1000个订阅者分给4个线程,每个线程的操作基本都是无锁的,这样可以做到并行地发送消息。示例代码见examples/asio/chat/server_threaded_highperformance.cc。

12. “串并转换”连接服务器及其自动化测试

本节介绍如何使用testharness来测试一个具有内部逻辑的网络服务程序。这是一个既扮演服务端,又扮演客户端的网络程序。代码见examples/multiplexer。

云风在他的博客中提到了网游连接服务器的功能需求,我用C++初步实现了这些需求,并为之编写了配套的自动化testharness,作为muduo网络库的示例。

注意:本节呈现的代码仅仅实现了基本的功能需求,没有考虑安全性,也没有特别优化性能,不适合用作真正的放在公网上运行的网游连接服务器。

12.1 功能需求

这个连接服务器把多个客户连接汇聚为一个内部TCP连接,起到“数据审并转换”的作用,让backend的逻辑服务器专心处理业务,而无须顾及多连接的并发性。系统的框图如图7-50所示。

这个连接服务器的作用与数字电路中的数据选择器(multiplexer)类似(见图7-51),所以我把它命名为multiplexer。

12.2 实现

multiplexer的功能需求不复杂,无非是在backend connection和client connections之间倒腾数据。对每个新client connection分配一个新的整数id,如果id用完了,则断开新连接(这样通过控制d的数目就能控制最大连接数)。另外,为了避免id过快地被复用(有可能造成backend串话),multiplexer采用queue来管理free id,每次从队列的头部取id,用完之后放回queue的尾部。具体来说,主要是处理四种事件:

  • 当client connection到达或断开时,向backend发出通知。代码见onclientConnection()。
  • 当从client connection收到数据时,把数据连同connection id一同发给backend。代码见onclientMessage()。
  • 当从backend connection收到数据时,辨别数据是发给哪个client connection,并执行相应的转发操作。代码见onBackendMessage()。
  • 如果backend connection断开连接,则断开所有client connections(假设client会自动重试)。代码见onBackendConnection()。

由上可见,multiplexer的功能与proxy类似。multiplexer_simple.cc是一个单线程版的实现,借助muduo的IO multiplexing特性,可以方便地处理多个并发连接。多线程版的实现见multiplexer.cC。

在实现的时候有以下两点值得注意。

TcpConnection的id如何存放?当从backend收到数据,如何根据id找到对应的client connection?当从client connection收到数据,如何得知其id?

第一个问题比较好解决,用std::map<int, TcpConnectionPtr> clientConns_保存从id到client connection的映射就行。

第二个问题固然可以用类似的办法解决,但是我想借此介绍一下muduo::net::TcpConnection的context功能。每个TcpConnection都有一个boost::any成员,可由客户代码自由支配(get/set),代码如下。这个boost::any是TcpConnection的context,可以用于保存与connection绑定的任意数据(比方说connection id、connection的最后数据到达时间、connection所代表的用户的名字等等)。这样客户代码不必继承TcpConnection就能attach自己的状态,而且也用不着TcpConnectionFactory了(如果允许继承,那么必然要向TcpServer注人此factory)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
muduo/net/TcpConnection.h
class TcpConnection : boost::noncopyable,
public boost::enable_shared_from_this<TcpConnection> {
public:
void setContext(const boost::any& context) {
context_ = context;
}
const boost::any& getContext() const {
return context_;
}
boost::any* getMutableContext() {
return &context_;
}
// ...
private:
// ...
boost::any context_;
typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr;
};

对于multiplexer,在onclientConnection()里调用conn->setContext(id),把id存到TcpConnection对象中。onclientMessage()从TcpConnection对象中取得id,连同数据一起发送给backend,完整实现如下:

1
2
3
4
5
6
7
8
9
10
examples/multiplexer/multiplexer_simple.cc
void onClientMessage(const TcpConnectionPtr& conn, Buffer buf, Timestamp) {
if (!conn->getContext().empty()) {
int id = boost::any_cast<int>(conn->getContext());
sendBackendBuffer(id, buf);
} else {
buf.retrieveAll();
// error handling
}
}

TcpConnection的生命期如何管理?由于client connection是动态创建并销毁的,其生与灭完全由客户决定,如何保证backend想向它发送数据的时候,这个TcpConnection对象还活着?解决思路是用reference counting。当然,不用自己写,用boost::shared_ptr即可。TcpConnection是muduo中唯一默认采用shared_ptr来管理生命期的对象,盖由其动态生命期的本质决定。更多内容请参考第1章。

multiplexer采用二进制协议,如何测试呢?

12.3 自动化测试

multiplexer是muduo网络编程示例中第一个具有non-trivial业务逻辑的网络程序,根据\(\S9.7\)“分布式程序的自动化回归测试”的思路,我为它编写了测试夹具(testharness)。代码见examples/multiplexer/harness/。

这个testharness采用Java编写,用的是Netty网络库。这个testharness要同时扮演clients和backend,也就是既要主动发起连接,也要被动接受连接。而且,testharness与multiplexer的启动顺序是任意的,如何做到这一点请阅读代码。结构如图7-52所示。

testharness会把各种event汇聚到一个blocking queue里边,方便编写test case。test case则操纵testharness,发起连接、发送数据、检查收到的数据,例如以下是一个test case:testcase/TestOneClientSend.java。

这里的几个test cases都是用Java直接写的,如果有必要,也可以采用Groovy来缩写,这样可以在不重启testharness的情况下随时修改、添加test cases。具体做法见笔者的博客《“过家家”版的移动离线计费系统实现》。

12.4 将来的改进

有了这个自动化的testharness,我们可以比较方便且安全地修改(甚至重新设计)multiplexer了。例如:

  • 增加“backend发送指令断开client connection”的功能。有了自动化测试,这个新功能可以被单独测试(开发者测试),而不需要真正的backend参与进来。
  • 将multiplexer改用多线程重写。有了自动化回归测试,我们不用担心破坏原有的功能,可以放心大胆地重写。而且由于testharness是从外部测试,不是单元测试,重写multiplexer的时候不用动test cases,这样保证了测试的稳定性。另外,这个test harness稍加改进还可以进行stress testing,既可用于验证多线程multiplexer的正确性,亦可对比其相对单线程版的效率提升。

13. socks4a代理服务器

本节介绍用muduo实现一个简单的socks4a代理服务器(examples/socks4a/)。

13.1 TCP中继器

在实现socks4a proxy之前,我们先写一个功能更简单的网络程序TCP中继器(TCP relay),或者叫做穷人的tcpdump(poorman's tcpdump)。

一般情况下,客户端程序直接连接服务端,如图7-53所示。

有时候,我们想在client和server之间放一个中继器(relay),把client与server之间的通信内容记录下来。这时用tcpdump是最方便省事的,但是tcpdump需要root权限,万一拿不到权限呢?穷人有穷人的办法,自已写一个TcpRelay,让client连接TcpRelay,再让TcpRelay连接server,如图7-54中的T型结构,TcpRelay扮演了类似proxy的角色。

TcpRelay是我们自已写的,可以动动手脚。除了记录通信内容外,还可以制造延迟,或者故意翻转1bit数据以模拟router硬件故障。

TcpRelay的功能(业务逻辑)看上去很简单,无非是把连接C上收到的数据发给连接S,同时把连接S上收到的数据发给连接C。但存细考虑起来,细节其实不那么简单:

  1. 建立连接。为了真实模拟client,TcpRelay在accept连接C之后才向server发起连接S,那么在S建立起来之前,从C收到数据怎么办?要不要暂存起来?
  2. 并发连接的管理。图7-54中只面出了一个client,实际上TcpRelay可以服务多个client,左右两边这些并发连接如何管理,如何防止串话(crosstalk)?
  3. 连接断开。client和server都可能主动断开连接。当client主动断开连接C时,TcpRelay应该立刻断开S。当server主动断开连接S时,TcpRelay应立刻断开C。这样才能比较精确地模拟client和server的行为。在关闭连接的一刹那,又有新的client连接进来,复用了刚刚close的fd号码,会不会造成串话?万一client和server几乎同时主动断开连接,TcpRelay如何应对?
  4. 速度不匹配。如果连接C的带宽是100kB/s,而连接S的带宽是10MB/s,不巧server是个chargen服务,会全速发送数据,那么会不会撑爆TcpRelay的buffer?如何限速?特别是在使用non-blocking IO和level-trigger polling的时候如何限制读取数据的速度?

在看muduo的实现之前,请读者思考:如果用Sockets API来实现TcpRelay,如何解决以上这些问题。

如果用传统多线程阻塞IO的方式来实现TcpRelay一点也不难,好处是自动解决了速度不匹配的问题,Python代码如下。这个实现功能上没有问题,但是并发度就高不到哪儿去了。注意以下代码会一个字节一个字节地转发数据,每两个字节之间间隔1ms,可以用于测试网络程序的消息解码功能(codec)是否完善。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
recipes/python/tcprelay.py
#!/usr/bin/python

import socket, thread, time

listen_port = 3007
connect_addr = ('localhost', 207)
sleep_per_byte = 0.0001

def forward(source, destination):
source_addr = source.getpeername()
while True:
data = source.recv(4096)
if data:
for i in data:
destination.sendall(i)
time.sleep(sleep_per_byte)
else:
print('disconnect', source_addr)
destination.shutdown(socket.SHUT_WR)
break

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('', listen_port))
serversocket.listen(5)

while True:
clientsocket, address = serversocket.accept()
print('accepted', address)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(connect_addr)
print('connected', sock.getpeername())
thread.start_new_thread(forward, (clientsocket, sock))
thread.start_new_thread(forward, (sock, clientsocket))

TcpRelay的实现很简单,只有几十行代码(examples/socks4a/tcprelay.cc),主要逻辑都在Tunnel class里(examples/socks4a/tunnel.h)。这个实现很好地解决了前三个问题,第四个问题的解法比较粗暴,用的是HighWaterMarkCallback,如果发送缓冲区堆积的数据大于10MiB就断开连接(更好的办法见\(\S8.9.3\))。TcpRelay既是服务端,又是客户端,在阅读代码的时候要注意onclientMessage()处理的是从server发来的消息,表示它作为客户端(client)收到的消息,这与前面的multiplexer正好相反。

13.2 socks4a代理服务器

socks4a的功能与TcpRelay非常相似,也是把连接C上收到的数据发给连接S,同时把连接S上收到的数据发给连接C。它与TcpRelay的区别在于,TcpRelay固定连到某个server地址,而socks4a允许client指定要连哪个server。在accept连接C之后,socks4a server会读几个字节,以了解server的地址,再发起连接S。socks4a的协议非常简单,请参考维基百科。

muduo的socks4a代理服务器的实现在examples/socks4a/socks4a.cc,它也使用了Tunnel class。与TcpRelay相比,只多了解析server地址这一步骤。目前DNS地址解析这一步用的是阻塞的gethostbyname()函数,在真正的系统中,应该换成非阻塞的DNS解析,可参考\(\S7.15\)

muduo的这个socks4a是个标准的网络服务,可以供Web浏览器使用(我正是这么测试它的)。

13.3 N:1与1:N连接转发

云风在《写了一个proxy用途你懂的》中写了一个TCP隧道tunnel,程序由三部分组成:N:1连接转发服务,1:N连接转发服务,socks代理服务。

我仿照他的思路,用muduo实现了这三个程序。不同的是,我没有做数据混淆,所以功能上有所减弱。

  • N:1连接转发服务就是\(\S7.12\)中的multiplexer(数据选择器)。
  • 1:N连接转发服务是云风文中提到的backend,一个数据分配器(demultiplexer),代码在examples/multiplexer/demux.cC。
  • socks代理服务正是\(\S7.13.2\)实现的socks4a。

14. 短址服务

muduo内置了一个简陋的HTTP服务器,可以处理简单的HTTP请求。这个HTTP服务器是面向内网的暴露进程状态的监控端口,不是面向公网的功能完善且健壮的httpd,其接口与J2EE的HttpServlet有几分类似。我们可以拿它来实现一个简单的短URL转发服务,以简要说明其用法。代码位于examples/shorturl/shorturl.cC。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
std::map<string, string> redirections; // URL 转发表
void onRequest(const HttpRequest& req, HttpResponse* resp) {
LOG_INFO << "Headers " << req.methodString() << " " << req.path();
// TODO: support PUT and DELETE to create new redirections on-the-fly.
std::map<string, string>::const_iterator it = redirections.find(req.path());
if (it != redirections.end()) { // 如果找到了短址
resp->setStatusCode(HttpResponse::k301MovedPermanently);
resp->setStatusMessage("Moved Permanently");
resp->addHeader("Location", it->second); // 转发到it->second 地址
// resp->setCloseConnection(true);
} else {
// 未找到短址,返回404
resp->setStatusCode(HttpResponse::k404NotFound);
resp->setStatusMessage("Not Found");
}
}

int main() {
redirections["/1"] = "http://chenshuo.com";
redirections["/2"] = "http://blog.csdn.net/Solstice";
EventLoop loop;
HttpServer server(&loop, InetAddress(8000), "shorturl");
server.setHttpCallback(onRequest);
server.start();
loop.loop(); // 开始事件循环
}

muduo并没有为短连接TCP服务优化,无法发挥多核优势。一种真正高效的优化手段是修改Linux内核,例如Google的SO_REUSEPORT内核补丁。

读者可以试试建立一个loop转发,例如“/1”→“/2”→“/3”→“/1”,看看浏览器反应如何。

15. 与其他库集成

前面介绍的网络应用例子都是直接用muduo库收发网络消息,也就是主要介绍TcpConnection、TcpServer、TcpClient、Buffer等class的使用。本节将稍微深入其内部,介绍Channel class的用法,通过它可以把其他一些现成的网络库融入muduo的event loop中。

Channel class是IO事件回调的分发器(dispatcher),它在handleEvent()中根据事件的具体类型分别回调ReadCallback、WriteCallback等,代码见\(\S8.1.1\)。每个Channel对象服务于一个文件描述符,但并不拥有fd,在析构函数中也不会close(fd)。Channel也使用muduo一贯的boost::function来表示函数回调,它不是基类。这样用户代码不必继承Channel,也无须override虚函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
muduo/net/Channel.h
class Channel : boost::noncopyable {
public:
typedef boost::function<void()> EventCallback;
typedef boost::function<void(Timestamp)> ReadEventCallback;

Channel(EventLoop* loop, int fd);
~Channel();

void setReadCallback(const ReadEventCallback& cb);
void setWriteCallback(const EventCallback& cb);
void setCloseCallback(const EventCallback& cb);
void setErrorCallback(const EventCallback& cb);

void enableReading();
// void disableReading(); // 暂时没有用到
void enableWriting();
void disableWriting();
void disableAll();

void handleEvent(Timestamp receiveTime); // 由 EventLoop::loop()调用

/// Tie this channel to the owner object managed by shared_ptr,
/// to prevent the owner object being destroyed in handleEvent.
void tie(const boost::shared_ptr<void>&); // tie()的例子见7.15.3节

int fd() const; // obvious
void remove(); // loop_->removeChannel(this);

//...
}

Channel与EventLoop的内部交互有两个函数EventLoop::updateChannel(Channel*)和EventLoop::removeChannel(Channel*)。客户需要在Channel析构前自己调用Channel::remove()。

后面我们将通过一些实例来介绍Channel class的使用。

15.1 UDNS

UDNS是一个stub DNS解析器,它能够异步地发起DNS查询,再通过回调函数通知结果。UDNS在设计的时候就考虑到了配合(融入)主程序现有的基于select/poll/epoll的event loop模型,因此它与muduo的配接相对较为容易。由于License限制,本节的代码位于单独的项目中:muduo-udns

muduo-udns由三部分组成,一是udns-0.2源码;二是UDNS与muduo的配接器(adapter),即Resolver class,位于Resolver.h和Resolver.cc;三是简单的测试dns.cc,展示Resolver的使用。前两部分构成muduo-udns程序库。

先看Resolver class的接口(Resolver.h):

1
2
3
4
5
6
7
8
9
10
class Resolver : boost::noncopyable {
public:
typedef boost::function<void(const InetAddress&)> Callback;
Resolver(EventLoop* loop);
Resolver(EventLoop* loop, const InetAddress& nameServer);
~Resolver();
void start();
bool resolve(const StringPiece& hostname, const Callback& cb);
// ...
};

其中第一个构造函数会使用系统默认的DNS服务器地址,第二个构造函数由用户指明DNS服务器的IP地址(见后面的练习1)。用户最关心的是resolve()函数,它会回调用户的Callback。

在介绍Resolver的实现之前,先来看它的用法(dns.cc),下面这段代码同时解析三个域名,并在stdout输出结果。注意回调函数只提供解析后的地址,因此resolve callback需要自己设法记住域名,这里我用的是boost::bind。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void resolveCallback(const string& host, const InetAddress& addr) {
LOG_INFO << "resolved " << host << "->" << addr.toIp();
}

void resolve(Resolver* res, const string& host) {
res->resolve(host, boost::bind(&resolveCallback, host, _1));
}

int main(int argc, char* argv[]) {
EventLoop loop;
Resolver resolver(&loop);
resolver.start();
resolve(&resolver, "chenshuo.com");
resolve(&resolver, "www.example.com");
resolve(&resolver, "www.google.com");
loop.loop(); // 开始事件循环
}

由于是异步解析,因此输出结果的顺序和提交请求的顺序不一定一致,例如:

1
2
3
2012-08-22 04:46:39.945033 15726 INFO resolved www.google.com->74.125.71.104
2012-08-22 04:46:41.944464 15726 INFO resolved chenshuo.com->173.212.209.144
2012-08-22 04:46:42.068084 15726 INFO resolved www.example.com->192.0.43.10

UDNS与muduo Resolver的交互过程如下:

  1. 初始化dns_ctx*之后,Resolver::start()调用dns_open()获得UDNS使用的文件描述符,并通过muduo Channel观察其可读事件。由于UDNS始终只用一个socket fd,只观察一个事件,因此特别容易和现有的event loop集成。
  2. 在解析域名时(Resolver::resolve()),调用dns_submit_a4()发起解析,并通过网络库注册单次定时器回调。
  3. 在fd可读时(Resolver::onRead()),调用dns_ioevent()。如果DNS解析成功,会回调Resolver::dns_query_a4()通知解析的结果,继而调用Resolver::onQueryResult(),后者会回调用户Callback。
  4. 在超时后(Resolver::onTimer()),调用dns_timeouts(),必要时继续注册下一次定时器回调。

可见UDNS是一个设计良好的库,可与现有的event loop很好地结合。UDNS使用定时器的原因是UDP可能丢包,因此程序必须自己处理超时重传。

Resolver class不是线程安全的,客户代码只能在EventLoop所属的线程调用它的Resolver::resolve()成员函数,解析结果也是由这个线程回调客户代码。这个函数通过loop->assertInLoopThread()来确保不被误用。

C++程序与C语言函数库交互的一个难点在于资源管理,muduo-udns不得已使用了手工new/delete的做法,每次解析会在堆上创建QueryData对象,这样在UDNS回调Resolver::dns_query_a4()时才知道该回调哪个用户Callback。

练习1:补充构造函数Resolver(EventLoop* loop, const InetAddress& nameServer)的实现。可利用文档介绍的dns_add_serv_s()函数。

练习2:用muduo-udns改进S7.13的socks4a服务器,替换其中阻塞的gethostbyname()函数调用,实现完全的无阻塞服务。

15.2 c-ares DNS

C-ares DNS是一款常用的异步DNS解析库,\(\S6.2\)介绍了它的安装方法,本节将简要介绍其与muduo的集成。示例代码位于examples/c-ares,代码结构与\(\S7.15.1\)的UDNS非常相似。Resolver.h和Resolver.cc是c-ares DNS与muduo的配接器(adapter);dns.cc是简单的测试,展示Resolver的使用。c-ares DNS的选项非常多,本节只是展示其与muduo EventLoop集成的基本做法,c-ares Resolver并没有暴露其全部功能。

c-ares Resolver的接口和用法与前面UDNS Resolver相同,只是少了start()函数,此处不再重复举例。

c-ares Resolver的实现与前面UDNS Resolver很相似:

  1. Resolver::resolve()调用ares_gethostbyname()发起解析,并通过ares_timeout获得超时的秒数,注册定时器。
  2. 在fd可读时(Resolver::onRead()),调用ares_process_fd().如果DNS解析成功,会回调Resolver::ares_host_callback()通知解析的结果,继而调用Resolver::onQueryResult(),后者会回调用户Callback。
  3. 在超时后(Resolver::onTimer()),调用ares_process_fd()处理这一事件,并再次调用dns_timeouts获得下一次超时的间隔,必要时继续注册下一次定时器回调。

c-ares Resolver的线程安全性与UDNS Resolver相同。

与UDNS不同,c-ares DNS会用到不止一个socket文件描述符,而且既会用到fd可读事件,又会用到fd可写事件,因此c-ares Resolver的代码比UDNS要复杂一些。Resolver::ares_sock_create_callback()是新建socket fd的回调函数,其中会调用Resolver::onSockCreate()来创建Channel对象,这正是Resolver没有start()成员函数的原因。Resolver::ares_sock_state_callback()是变更socket fd状态的回调函数,会通知该观察哪些事件(可读and/or可写)。

15.3 curl

libcurl是一个常用的HTTP客户端库,可以方便地下载HTTP和HTTPS数据。libcurl有两套接口,easy和multi,本节介绍的是使用其multi接口以达到单线程并发访问多个URL的效果。muduo与libcurl搭配的例子见examples/curl,其中包含单线程多连接并发下载同一文件的示例,即单线程实现的“多线程下载器”。

libcurl融入muduo EventLoop的复杂度比前面两个DNS库都更高,一方面因为它本身的功能丰富,另一方面也因为它的接口设计更偏重传统阻塞IO(它原本是从curl(1)这个命令行工具剥离出来的),在事件驱动方面的调用、回调、传参都比较烦琐。这里不去详细解释每一个函数的作用,想必读者在读过前两节之后已经对Channel的用法有了基本的了解,对照libcurl文档和muduo代码就能搞明白。

15.4 更多

除了前面举的几个例子,muduo当然还可以将其他涉及网络IO的库融入其EventLoop/Channel框架,我能想到的有:

  • libmicrohttpd:可嵌入的HTTP服务器。
  • libpq:PostgreSQL的官方客户端库。
  • libdrizzle:MySQL的非官方客户端库。
  • QuickFIX:常用的FIX消息库。

在有具体应用场景的时候,我多半会为之提供muduo adapter,也欢迎用户贡献有关补丁。

另外一个扩展思路是,对每个TCP连接创建一个lua state,用muduo为lua提供通信机制。然后用lua来编写业务逻辑,这也可以做到在线更改逻辑而不重启进程。就像OpenResty和云风的skynet那样。这种做法还可以利用coroutine来简化业务逻辑的实现。


7. muduo编程示例
http://binbo-zappy.github.io/2024/12/25/muduo多线程/7-muduo编程示例/
作者
Binbo
发布于
2024年12月25日
许可协议