Chapter 7: muduo Programming Examples (4)
7.10 Kicking Idle Connections with a Timing Wheel
This section shows how to use a timing wheel to kick out idle connections. A connection that has received no data for a number of seconds is considered idle. The code for this section is in examples/idleconnection.
In any serious network program, an application-level heartbeat protocol is indispensable: heartbeat messages should be used to judge whether the peer process is working properly, and "kicking idle connections" is only a stopgap. I also want to take this opportunity to discuss the use of shared_ptr and weak_ptr.
If a connection receives no data for several consecutive seconds (8s in what follows), disconnect it. There are two simple, brute-force ways to do this:
● Keep a lastReceiveTime ("time the data last arrived") in each connection and run a single repeating timer that sweeps all connections once a second, disconnecting every connection with (now - connection.lastReceiveTime) > 8s. This needs only one repeated timer globally, but each timeout has to examine every connection, and with many connections (thousands or tens of thousands) the sweep can get expensive.
● Give each connection a one-shot timer with an 8-second timeout and disconnect the connection when it fires. The timer must of course be refreshed whenever data arrives. This approach needs one one-shot timer per connection and updates timers frequently; with many connections it can put pressure on the EventLoop's TimerQueue.
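For concreteness, a minimal sketch of the first approach. It assumes a connections_ map on the server and a Timestamp stored in each connection's context; these names are illustrative, not from the example code:

void onTimer()  // registered with loop->runEvery(1.0, ...)
{
  Timestamp now = Timestamp::now();
  for (const auto& item : connections_)  // O(N) on every tick
  {
    const TcpConnectionPtr& conn = item.second;
    Timestamp last = boost::any_cast<Timestamp>(conn->getContext());
    if (timeDifference(now, last) > 8.0)
    {
      conn->shutdown();
    }
  }
}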
A timing wheel avoids the drawbacks of both approaches. ("Timing wheel" is sometimes rendered in Chinese as 时间轮盘 or 刻度盘; this text keeps the English term.)
Connection timeouts need no precise timing: disconnecting after roughly 8 seconds is good enough, and a second more or less hardly matters. Timeouts can therefore be handled with a very simple data structure: a circular queue of 8 buckets. Bucket 1 holds the connections that will time out in 1 second, bucket 2 those that will time out in 2 seconds, and so on. Whenever a connection receives data it puts itself into the 8th bucket; then, in a once-per-second timer, disconnect the connections in the first bucket and move that now-empty bucket to the back of the queue. This roughly disconnects a connection after 8 seconds without data. More importantly, each tick examines only the first bucket instead of all connections, spreading the work out. This method is indeed more efficient than the two above.
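The bucket rotation can be sketched in a few lines. This is conceptual only: ConnId, bucketOf_, and disconnect() are placeholders, and the real implementation below replaces the explicit "move" with shared_ptr reference counting:

std::unordered_set<ConnId> buckets[8];      // ring of 8 one-second buckets
std::unordered_map<ConnId, int> bucketOf_;  // which bucket each connection is in
int tail = 0;                               // index of the newest bucket

void onMessageFrom(ConnId c)  // refresh: move the connection to the newest bucket
{
  buckets[bucketOf_[c]].erase(c);  // drop the stale copy
  buckets[tail].insert(c);
  bucketOf_[c] = tail;
}

void onTimerTick()  // once per second
{
  tail = (tail + 1) % 8;            // step onto the oldest bucket
  for (ConnId c : buckets[tail])    // everything left here has been idle ~8s
  {
    disconnect(c);
    bucketOf_.erase(c);
  }
  buckets[tail].clear();            // reuse it as the new, empty tail bucket
}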
7.10.1 How the timing wheel works
The paper "Hashed and hierarchical timing wheels: efficient data structures for implementing a timer facility" [footnote 36: http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf] compares the data structures for implementing timers in detail and proposes new ones such as the hierarchical timing wheel and the hashed timing wheel. For the problem in this section we do not need a general-purpose timer; a simple timing wheel suffices.
The basic structure of a simple timing wheel is a circular queue plus a pointer to its back (tail). The pointer advances one slot per second, like the hand of a clock, which is how the timing wheel got its name.
Figure 7-42 (left) shows the state of the timing wheel at some moment. The number in each slot is a countdown (the opposite of a conventional timing wheel): the remaining lifetime of the connections in that slot (bucket).

Figure 7-42
One second later (Figure 7-42, right), the tail pointer has moved one slot; the slot that was at the four o'clock position has been emptied, and the connections in it have been disconnected.
How a timed-out connection gets kicked
Suppose conn 1 arrives at some moment and is put into the current slot with a remaining lifetime of 7 seconds (Figure 7-43, left). No more data arrives on conn 1 after that. One second later (Figure 7-43, right), tail points at the next slot and conn 1's remaining lifetime is 6 seconds.

Figure 7-43
A few seconds later, tail points at the slot just before conn 1, and conn 1 is about to be disconnected (Figure 7-44, left). One second after that (Figure 7-44, right), tail is back at the slot conn 1 occupies; the slot is emptied and conn 1 is disconnected.

Figure 7-44
Refreshing a connection
If data arrives on conn 1 before it is disconnected, conn 1 is moved to the current slot. Its remaining lifetime is 3 seconds (Figure 7-45, left) when data arrives, and its lifetime is restored to 7 seconds (Figure 7-45, right).

Figure 7-45
Time moves on and conn 1's lifetime counts down again, but it has already outlived the first scenario (Figure 7-46).

Figure 7-46
Multiple connections
Each slot of the timing wheel is a hash set that can hold more than one connection. Say conn 1 arrives first and conn 2 arrives a little later (Figure 7-47), before tail has moved: the two connections sit in the same slot and have the same remaining lifetime. (Figure 7-47 draws them as a linked list; the code uses a hash set.)

Figure 7-47
A few seconds later, conn 1 receives data while conn 2 receives none, so conn 1 is moved to the current slot. conn 1's expected lifetime is now longer than conn 2's (Figure 7-48).

Figure 7-48
7.10.2 Implementation and refinements
We use the EchoServer that has appeared several times before to show how to implement the timing wheel. The code is in examples/idleconnection.
In the implementation, a slot holds not connections but special Entry structs, each containing a weak_ptr to a TcpConnection. Entry's destructor checks (through the weak_ptr) whether the connection still exists and, if so, shuts it down.
The data structures:
struct Entry : public muduo::copyable
{
  explicit Entry(const WeakTcpConnectionPtr& weakConn)
    : weakConn_(weakConn)
  {
  }

  ~Entry()
  {
    muduo::net::TcpConnectionPtr conn = weakConn_.lock();
    if (conn)
    {
      conn->shutdown();
    }
  }

  WeakTcpConnectionPtr weakConn_;
};
typedef std::shared_ptr<Entry> EntryPtr;
typedef std::weak_ptr<Entry> WeakEntryPtr;
typedef std::unordered_set<EntryPtr> Bucket;
typedef boost::circular_buffer<Bucket> WeakConnectionList;
For simplicity, the implementation never actually moves a connection from one slot to another. It relies on reference counting instead, managing Entry with shared_ptr: whenever data arrives on a connection, the corresponding EntryPtr is inserted into the current slot, incrementing its reference count. When the reference count drops to zero, the Entry appears in no slot at all, which means the connection has timed out, and Entry's destructor disconnects it.
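The bookkeeping can be traced by hand. A schematic walk-through (the comments describe the count after each step; this mirrors the code shown below):

EntryPtr entry(new Entry(conn));          // use_count == 1
connectionBuckets_.back().insert(entry);  // use_count == 2 (tail bucket holds a copy)
// the local entry goes out of scope        -> use_count == 1
// a message arrives: the weak_ptr in the context is locked and the
// EntryPtr is inserted into the new tail bucket -> use_count == 2
// 8 ticks later the older bucket is destructed  -> use_count == 1
// 8 more ticks and the last bucket goes         -> use_count == 0,
// ~Entry() runs and shutdown()s the connection if it is still alive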
Note that the header defines its own hash function for shared_ptr<T>. The reason: before Boost 1.47.0, unordered_set<shared_ptr<T>> compiled fine, but its hash_value was the result of shared_ptr's implicit conversion to bool. In other words, without a custom hash function, unordered_{set,map} degenerates into a linked list.
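The Boost-era workaround looks roughly like the following (a sketch from memory; the C++11 code in the notes below does not need it, because std::hash<std::shared_ptr<T>> already hashes the stored raw pointer):

namespace boost
{
// Hash a shared_ptr by its raw pointer instead of its bool conversion.
template<typename T>
inline size_t hash_value(const boost::shared_ptr<T>& x)
{
  return boost::hash_value(x.get());
}
}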
The timing wheel is a boost::circular_buffer; each Bucket element in it is a hash set of EntryPtr.
The constructor registers the once-per-second callback (EventLoop::runEvery() registering EchoServer::onTimer()) and sets the timing wheel to the appropriate size.
EchoServer::EchoServer(EventLoop* loop,
                       const InetAddress& listenAddr,
                       int idleSeconds)
  : server_(loop, listenAddr, "EchoServer"),
    connectionBuckets_(idleSeconds)
{
  server_.setConnectionCallback(
      std::bind(&EchoServer::onConnection, this, _1));
  server_.setMessageCallback(
      std::bind(&EchoServer::onMessage, this, _1, _2, _3));
  loop->runEvery(1.0, std::bind(&EchoServer::onTimer, this));
  connectionBuckets_.resize(idleSeconds);  // fixed size
}
EchoServer::onTimer() is a one-liner: append an empty Bucket at the back, and circular_buffer automatically pops the Bucket at the front and destructs it. Destructing a Bucket in turn destructs the EntryPtr objects inside it, so we never manage Entry's reference count ourselves; C++ value semantics take care of everything.
void EchoServer::onTimer()
{
  connectionBuckets_.push_back(Bucket());
}
When a connection is established, create an Entry object and put it at the tail of the timing wheel. We also save a weak reference to the Entry in the TcpConnection's context, because the Entry is needed again when data arrives. (Question: what would happen if TcpConnection::setContext stored a strong reference, an EntryPtr?)
void EchoServer::onConnection(const TcpConnectionPtr& conn)
{
  LOG_INFO << "EchoServer - " << conn->peerAddress().toIpPort() << " -> "
           << conn->localAddress().toIpPort() << " is "
           << (conn->connected() ? "UP" : "DOWN");
  if (conn->connected())
  {
    EntryPtr entry(new Entry(conn));
    connectionBuckets_.back().insert(entry);
    dumpConnectionBuckets();
    WeakEntryPtr weakEntry(entry);
    conn->setContext(weakEntry);
  }
  else
  {
    assert(!conn->getContext().empty());
    WeakEntryPtr weakEntry(boost::any_cast<WeakEntryPtr>(conn->getContext()));
    LOG_DEBUG << "Entry use_count = " << weakEntry.use_count();
  }
}
When a message arrives, take the weak Entry reference out of the TcpConnection's context, promote it to a strong EntryPtr, and put it at the current tail of the timing wheel. (Question: why keep the Entry in the TcpConnection's context at all? What would go wrong if a brand-new Entry were created here instead?)
void EchoServer::onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp time)
{
  string msg(buf->retrieveAllAsString());
  LOG_INFO << conn->name() << " echo " << msg.size()
           << " bytes at " << time.toString();
  conn->send(msg);
  assert(!conn->getContext().empty());
  WeakEntryPtr weakEntry(boost::any_cast<WeakEntryPtr>(conn->getContext()));
  EntryPtr entry(weakEntry.lock());
  if (entry)
  {
    connectionBuckets_.back().insert(entry);
    dumpConnectionBuckets();
  }
}
And then? Nothing more; the program already does what we set out to do. (The complete code calls dumpConnectionBuckets() to print how the circular_buffer evolves; run it and watch.)
I hope this section helps you understand the reference counting behind shared_ptr and weak_ptr.
-------divider----------------------------------start---------
Notes:
// echoserver.h
#ifndef MUDUO_EXAMPLES_IDLECONNECTION_ECHO_H
#define MUDUO_EXAMPLES_IDLECONNECTION_ECHO_H

#include "muduo/net/TcpServer.h"

#include <unordered_set>
#include <boost/circular_buffer.hpp>

// RFC 862
class EchoServer
{
 public:
  EchoServer(muduo::net::EventLoop* loop,
             const muduo::net::InetAddress& listenAddr,
             int idleSeconds);

  void start();

 private:
  void onConnection(const muduo::net::TcpConnectionPtr& conn);
  void onMessage(const muduo::net::TcpConnectionPtr& conn,
                 muduo::net::Buffer* buf,
                 muduo::Timestamp time);
  void onTimer();
  void dumpConnectionBuckets() const;

  typedef std::weak_ptr<muduo::net::TcpConnection> WeakTcpConnectionPtr;

  struct Entry : public muduo::copyable
  {
    explicit Entry(const WeakTcpConnectionPtr& weakConn)
      : weakConn_(weakConn)
    {
    }

    ~Entry()
    {
      muduo::net::TcpConnectionPtr conn = weakConn_.lock();
      if (conn)
      {
        conn->shutdown();
      }
    }

    WeakTcpConnectionPtr weakConn_;
  };
  typedef std::shared_ptr<Entry> EntryPtr;
  typedef std::weak_ptr<Entry> WeakEntryPtr;
  typedef std::unordered_set<EntryPtr> Bucket;
  typedef boost::circular_buffer<Bucket> WeakConnectionList;

  muduo::net::TcpServer server_;
  WeakConnectionList connectionBuckets_;
};

#endif  // MUDUO_EXAMPLES_IDLECONNECTION_ECHO_H
echoserver.cpp:
// echoserver.cpp
#include "echoserver.h"

#include "muduo/base/Logging.h"
#include "muduo/net/EventLoop.h"

#include <assert.h>
#include <stdio.h>

using namespace muduo;
using namespace muduo::net;

EchoServer::EchoServer(EventLoop* loop,
                       const InetAddress& listenAddr,
                       int idleSeconds)
  : server_(loop, listenAddr, "EchoServer"),
    connectionBuckets_(idleSeconds)
{
  server_.setConnectionCallback(
      std::bind(&EchoServer::onConnection, this, _1));
  server_.setMessageCallback(
      std::bind(&EchoServer::onMessage, this, _1, _2, _3));
  loop->runEvery(1.0, std::bind(&EchoServer::onTimer, this));
  connectionBuckets_.resize(idleSeconds);  // fixed size
  dumpConnectionBuckets();
}

void EchoServer::start()
{
  server_.start();
}

void EchoServer::onConnection(const TcpConnectionPtr& conn)
{
  LOG_INFO << "EchoServer - " << conn->peerAddress().toIpPort() << " -> "
           << conn->localAddress().toIpPort() << " is "
           << (conn->connected() ? "UP" : "DOWN");
  if (conn->connected())
  {
    EntryPtr entry(new Entry(conn));
    connectionBuckets_.back().insert(entry);
    dumpConnectionBuckets();
    WeakEntryPtr weakEntry(entry);
    conn->setContext(weakEntry);
  }
  else
  {
    assert(!conn->getContext().empty());
    WeakEntryPtr weakEntry(boost::any_cast<WeakEntryPtr>(conn->getContext()));
    LOG_DEBUG << "Entry use_count = " << weakEntry.use_count();
  }
}

void EchoServer::onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp time)
{
  string msg(buf->retrieveAllAsString());
  LOG_INFO << conn->name() << " echo " << msg.size()
           << " bytes at " << time.toString();
  conn->send(msg);
  assert(!conn->getContext().empty());
  WeakEntryPtr weakEntry(boost::any_cast<WeakEntryPtr>(conn->getContext()));
  EntryPtr entry(weakEntry.lock());
  if (entry)
  {
    connectionBuckets_.back().insert(entry);
    dumpConnectionBuckets();
  }
}

void EchoServer::onTimer()
{
  printf("onTimer========== connectionBuckets_.size = %zd\n", connectionBuckets_.size());
  // boost::circular_buffer is a fixed-capacity ring buffer: when it is full,
  // push_back() appends the new element and drops the element at the front.
  connectionBuckets_.push_back(Bucket());  // size stays constant: resize(idleSeconds) fixed it
  dumpConnectionBuckets();
}

void EchoServer::dumpConnectionBuckets() const
{
  LOG_INFO << "size = " << connectionBuckets_.size();
  int idx = 0;
  for (WeakConnectionList::const_iterator bucketI = connectionBuckets_.begin();
       bucketI != connectionBuckets_.end();
       ++bucketI, ++idx)
  {
    const Bucket& bucket = *bucketI;
    printf("[%d] len = %zd : ", idx, bucket.size());
    for (const auto& it : bucket)
    {
      bool connectionDead = it->weakConn_.expired();
      printf(":%p(%ld)%s, ", get_pointer(it), it.use_count(),
             connectionDead ? "DEAD" : "");
    }
    puts("");
  }
}
main.cpp:
// main.cpp
#include "echoserver.h"

#include "muduo/base/Logging.h"
#include "muduo/net/EventLoop.h"

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

void testHash()
{
  std::hash<std::shared_ptr<int> > h;
  std::shared_ptr<int> x1(new int(10));
  std::shared_ptr<int> x2(new int(10));
  h(x1);
  printf("testHash=====1========x1.use_count = %ld x2.use_count = %ld\n", x1.use_count(), x2.use_count());
  assert(h(x1) != h(x2));
  printf("testHash=====2========x1.use_count = %ld x2.use_count = %ld\n", x1.use_count(), x2.use_count());
  x1 = x2;
  printf("testHash=====3========x1.use_count = %ld x2.use_count = %ld\n", x1.use_count(), x2.use_count());
  assert(h(x1) == h(x2));
  printf("testHash=====4========x1.use_count = %ld x2.use_count = %ld\n", x1.use_count(), x2.use_count());
  x1.reset();
  printf("testHash=====5========x1.use_count = %ld x2.use_count = %ld\n", x1.use_count(), x2.use_count());
  assert(h(x1) != h(x2));
  printf("testHash=====6========x1.use_count = %ld x2.use_count = %ld\n", x1.use_count(), x2.use_count());
  x2.reset();
  printf("testHash=====7========x1.use_count = %ld x2.use_count = %ld\n", x1.use_count(), x2.use_count());
  assert(h(x1) == h(x2));
}

int main(int argc, char* argv[])
{
  testHash();
  EventLoop loop;
  InetAddress listenAddr(2007);
  int idleSeconds = 10;
  if (argc > 1)
  {
    idleSeconds = atoi(argv[1]);
  }
  LOG_INFO << "pid = " << getpid() << ", idle seconds = " << idleSeconds;
  EchoServer server(&loop, listenAddr, idleSeconds);
  server.start();
  loop.loop();
  return 0;
}
(Screenshots omitted: setting the run arguments, building and starting the server, and connecting with a telnet client.)
Server log output:
testHash=====1========x1.use_count = 1 x2.use_count = 1
testHash=====2========x1.use_count = 1 x2.use_count = 1
testHash=====3========x1.use_count = 2 x2.use_count = 2
testHash=====4========x1.use_count = 2 x2.use_count = 2
testHash=====5========x1.use_count = 0 x2.use_count = 1
testHash=====6========x1.use_count = 0 x2.use_count = 1
testHash=====7========x1.use_count = 0 x2.use_count = 0
20251024 02:21:28.445280Z 15119 INFO pid = 15119, idle seconds = 8 - main.cpp:44
20251024 02:21:28.445325Z 15119 INFO size = 8 - echoserver.cpp:81
[0] len = 0 :
[1] len = 0 :
[2] len = 0 :
[3] len = 0 :
[4] len = 0 :
[5] len = 0 :
[6] len = 0 :
[7] len = 0 :
onTimer========== connectionBuckets_.size = 8
20251024 02:21:29.446462Z 15119 INFO size = 8 - echoserver.cpp:81
[0] len = 0 :
[1] len = 0 :
[2] len = 0 :
[3] len = 0 :
[4] len = 0 :
[5] len = 0 :
[6] len = 0 :
[7] len = 0 :
20251024 02:21:30.257769Z 15119 INFO TcpServer::newConnection [EchoServer] - new connection [EchoServer-0.0.0.0:2007#1] from 127.0.0.1:40256 - TcpServer.cc:80
20251024 02:21:30.257800Z 15119 INFO EchoServer - 127.0.0.1:40256 -> 127.0.0.1:2007 is UP - echoserver.cpp:34
20251024 02:21:30.257809Z 15119 INFO size = 8 - echoserver.cpp:81
[0] len = 0 :
[1] len = 0 :
[2] len = 0 :
[3] len = 0 :
[4] len = 0 :
[5] len = 0 :
[6] len = 0 :
[7] len = 1 : :0x5555557a9840(2),
onTimer========== connectionBuckets_.size = 8
20251024 02:21:30.457989Z 15119 INFO size = 8 - echoserver.cpp:81
[0] len = 0 :
[1] len = 0 :
[2] len = 0 :
[3] len = 0 :
[4] len = 0 :
[5] len = 0 :
[6] len = 1 : :0x5555557a9840(1),
[7] len = 0 :
onTimer========== connectionBuckets_.size = 8
20251024 02:21:31.460583Z 15119 INFO size = 8 - echoserver.cpp:81
[0] len = 0 :
[1] len = 0 :
[2] len = 0 :
[3] len = 0 :
[4] len = 0 :
[5] len = 1 : :0x5555557a9840(1),
[6] len = 0 :
[7] len = 0 :
onTimer========== connectionBuckets_.size = 8
20251024 02:21:32.494507Z 15119 INFO size = 8 - echoserver.cpp:81
[0] len = 0 :
[1] len = 0 :
[2] len = 0 :
[3] len = 0 :
[4] len = 1 : :0x5555557a9840(1),
[5] len = 0 :
[6] len = 0 :
[7] len = 0 :
onTimer========== connectionBuckets_.size = 8
20251024 02:21:33.502792Z 15119 INFO size = 8 - echoserver.cpp:81
[0] len = 0 :
[1] len = 0 :
[2] len = 0 :
[3] len = 1 : :0x5555557a9840(1),
[4] len = 0 :
[5] len = 0 :
[6] len = 0 :
[7] len = 0 :
onTimer========== connectionBuckets_.size = 8
20251024 02:21:34.503382Z 15119 INFO size = 8 - echoserver.cpp:81
[0] len = 0 :
[1] len = 0 :
[2] len = 1 : :0x5555557a9840(1),
[3] len = 0 :
[4] len = 0 :
[5] len = 0 :
[6] len = 0 :
[7] len = 0 :
20251024 02:21:34.857894Z 15119 INFO TcpServer::newConnection [EchoServer] - new connection [EchoServer-0.0.0.0:2007#2] from 127.0.0.1:40258 - TcpServer.cc:80
20251024 02:21:34.857914Z 15119 INFO EchoServer - 127.0.0.1:40258 -> 127.0.0.1:2007 is UP - echoserver.cpp:34
20251024 02:21:34.857924Z 15119 INFO size = 8 - echoserver.cpp:81
[0] len = 0 :
[1] len = 0 :
[2] len = 1 : :0x5555557a9840(1),
[3] len = 0 :
[4] len = 0 :
[5] len = 0 :
[6] len = 0 :
[7] len = 1 : :0x5555557aa570(2),
onTimer========== connectionBuckets_.size = 8
20251024 02:21:35.509719Z 15119 INFO size = 8 - echoserver.cpp:81
[0] len = 0 :
[1] len = 1 : :0x5555557a9840(1),
[2] len = 0 :
[3] len = 0 :
[4] len = 0 :
[5] len = 0 :
[6] len = 1 : :0x5555557aa570(1),
[7] len = 0 :
onTimer========== connectionBuckets_.size = 8
20251024 02:21:36.522664Z 15119 INFO size = 8 - echoserver.cpp:81
[0] len = 1 : :0x5555557a9840(1),
[1] len = 0 :
[2] len = 0 :
[3] len = 0 :
[4] len = 0 :
[5] len = 1 : :0x5555557aa570(1),
[6] len = 0 :
[7] len = 0 :
onTimer========== connectionBuckets_.size = 8
20251024 02:21:37.542962Z 15119 INFO size = 8 - echoserver.cpp:81
[0] len = 0 :
[1] len = 0 :
[2] len = 0 :
[3] len = 0 :
[4] len = 1 : :0x5555557aa570(1),
[5] len = 0 :
[6] len = 0 :
[7] len = 0 :
20251024 02:21:37.543102Z 15119 INFO EchoServer - 127.0.0.1:40256 -> 127.0.0.1:2007 is DOWN - echoserver.cpp:34
20251024 02:21:37.543112Z 15119 INFO TcpServer::removeConnectionInLoop [EchoServer] - connection EchoServer-0.0.0.0:2007#1 - TcpServer.cc:109
onTimer========== connectionBuckets_.size = 8
20251024 02:21:38.543157Z 15119 INFO size = 8 - echoserver.cpp:81
[0] len = 0 :
[1] len = 0 :
[2] len = 0 :
[3] len = 1 : :0x5555557aa570(1),
[4] len = 0 :
[5] len = 0 :
[6] len = 0 :
[7] len = 0 :
onTimer========== connectionBuckets_.size = 8
20251024 02:21:39.544932Z 15119 INFO size = 8 - echoserver.cpp:81
[0] len = 0 :
[1] len = 0 :
[2] len = 1 : :0x5555557aa570(1),
[3] len = 0 :
[4] len = 0 :
[5] len = 0 :
[6] len = 0 :
[7] len = 0 :
onTimer========== connectionBuckets_.size = 8
20251024 02:21:40.551486Z 15119 INFO size = 8 - echoserver.cpp:81
[0] len = 0 :
[1] len = 1 : :0x5555557aa570(1),
[2] len = 0 :
[3] len = 0 :
[4] len = 0 :
[5] len = 0 :
[6] len = 0 :
[7] len = 0 :
onTimer========== connectionBuckets_.size = 8
20251024 02:21:41.567855Z 15119 INFO size = 8 - echoserver.cpp:81
[0] len = 1 : :0x5555557aa570(1),
[1] len = 0 :
[2] len = 0 :
[3] len = 0 :
[4] len = 0 :
[5] len = 0 :
[6] len = 0 :
[7] len = 0 :
onTimer========== connectionBuckets_.size = 8
20251024 02:21:42.597365Z 15119 INFO size = 8 - echoserver.cpp:81
[0] len = 0 :
[1] len = 0 :
[2] len = 0 :
[3] len = 0 :
[4] len = 0 :
[5] len = 0 :
[6] len = 0 :
[7] len = 0 :
20251024 02:21:42.597614Z 15119 INFO EchoServer - 127.0.0.1:40258 -> 127.0.0.1:2007 is DOWN - echoserver.cpp:34
20251024 02:21:42.597623Z 15119 INFO TcpServer::removeConnectionInLoop [EchoServer] - connection EchoServer-0.0.0.0:2007#2 - TcpServer.cc:109
onTimer========== connectionBuckets_.size = 8
20251024 02:21:43.598530Z 15119 INFO size = 8 - echoserver.cpp:81
[0] len = 0 :
[1] len = 0 :
[2] len = 0 :
[3] len = 0 :
[4] len = 0 :
[5] len = 0 :
[6] len = 0 :
[7] len = 0 :
onTimer========== connectionBuckets_.size = 8
20251024 02:21:44.598588Z 15119 INFO size = 8 - echoserver.cpp:81
[0] len = 0 :
[1] len = 0 :
[2] len = 0 :
[3] len = 0 :
[4] len = 0 :
[5] len = 0 :
[6] len = 0 :
[7] len = 0 :
onTimer========== connectionBuckets_.size = 8
20251024 02:21:45.600316Z 15119 INFO size = 8 - echoserver.cpp:81
[0] len = 0 :
[1] len = 0 :
[2] len = 0 :
[3] len = 0 :
[4] len = 0 :
[5] len = 0 :
[6] len = 0 :
[7] len = 0 :
-------divider----------------------------------end---------
Improvements
In the current implementation, every incoming message appends an EntryPtr to the tail bucket (the hash set deduplicates for us). A simple improvement: store in each TcpConnection the tail position at which its Entry was last inserted, and on each message first check whether the tail has moved. If it hasn't, skip the duplicate insertion; if it has, move the EntryPtr from the old Bucket to the current tail Bucket. This may improve both space and time efficiency. The improvement is left as an exercise; one possible shape of it is sketched below.
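A sketch of the bookkeeping, assuming a hypothetical tickCount_ member incremented in onTimer() and a small context struct replacing the bare WeakEntryPtr (one way to do the exercise, not the only one):

struct ConnContext          // stored in the TcpConnection's context
{
  WeakEntryPtr weakEntry;
  int64_t lastTick;         // value of tickCount_ at the last insertion
};

// In onMessage(): touch the buckets only when the tail has actually moved.
ConnContext* ctx = boost::any_cast<ConnContext>(conn->getMutableContext());
if (ctx->lastTick != tickCount_)
{
  EntryPtr entry(ctx->weakEntry.lock());
  if (entry)
  {
    connectionBuckets_.back().insert(entry);
    ctx->lastTick = tickCount_;
    // (the full exercise would also erase the stale copy from the old bucket)
  }
}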
The other approach is a sorted list: string the TcpConnections on a linked list, and have each connection move itself to the back of the list whenever it receives a message, so the list stays ordered by receive time. A repeating timer then scans from the front of the list and kicks the connections that have timed out. The example code is in the same directory.
-------divider----------------------------------start---------
Notes:
// sortedlist.cpp
#include "muduo/base/Logging.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"

#include <list>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

// RFC 862
class EchoServer
{
 public:
  EchoServer(EventLoop* loop,
             const InetAddress& listenAddr,
             int idleSeconds);

  void start()
  {
    server_.start();
  }

 private:
  void onConnection(const TcpConnectionPtr& conn);
  void onMessage(const TcpConnectionPtr& conn,
                 Buffer* buf,
                 Timestamp time);
  void onTimer();
  void dumpConnectionList() const;

  typedef std::weak_ptr<TcpConnection> WeakTcpConnectionPtr;
  typedef std::list<WeakTcpConnectionPtr> WeakConnectionList;

  struct Node : public muduo::copyable
  {
    Timestamp lastReceiveTime;
    WeakConnectionList::iterator position;
  };

  TcpServer server_;
  int idleSeconds_;
  WeakConnectionList connectionList_;
};

EchoServer::EchoServer(EventLoop* loop,
                       const InetAddress& listenAddr,
                       int idleSeconds)
  : server_(loop, listenAddr, "EchoServer"),
    idleSeconds_(idleSeconds)
{
  server_.setConnectionCallback(
      std::bind(&EchoServer::onConnection, this, _1));
  server_.setMessageCallback(
      std::bind(&EchoServer::onMessage, this, _1, _2, _3));
  loop->runEvery(1.0, std::bind(&EchoServer::onTimer, this));
  dumpConnectionList();
}

void EchoServer::onConnection(const TcpConnectionPtr& conn)
{
  LOG_INFO << "EchoServer - " << conn->peerAddress().toIpPort() << " -> "
           << conn->localAddress().toIpPort() << " is "
           << (conn->connected() ? "UP" : "DOWN");
  if (conn->connected())
  {
    Node node;
    node.lastReceiveTime = Timestamp::now();
    connectionList_.push_back(conn);
    node.position = --connectionList_.end();
    conn->setContext(node);
  }
  else
  {
    assert(!conn->getContext().empty());
    const Node& node = boost::any_cast<const Node&>(conn->getContext());
    connectionList_.erase(node.position);
  }
  dumpConnectionList();
}

void EchoServer::onMessage(const TcpConnectionPtr& conn,
                           Buffer* buf,
                           Timestamp time)
{
  string msg(buf->retrieveAllAsString());
  LOG_INFO << conn->name() << " echo " << msg.size()
           << " bytes at " << time.toString();
  conn->send(msg);
  assert(!conn->getContext().empty());
  Node* node = boost::any_cast<Node>(conn->getMutableContext());
  node->lastReceiveTime = time;
  // Move this connection to the back; the list stays sorted by receive time.
  connectionList_.splice(connectionList_.end(), connectionList_, node->position);
  assert(node->position == --connectionList_.end());
  dumpConnectionList();
}

void EchoServer::onTimer()
{
  dumpConnectionList();
  Timestamp now = Timestamp::now();
  for (WeakConnectionList::iterator it = connectionList_.begin();
       it != connectionList_.end();)
  {
    TcpConnectionPtr conn = it->lock();
    if (conn)
    {
      Node* n = boost::any_cast<Node>(conn->getMutableContext());
      double age = timeDifference(now, n->lastReceiveTime);
      if (age > idleSeconds_)
      {
        if (conn->connected())
        {
          conn->shutdown();
          LOG_INFO << "shutting down " << conn->name();
          conn->forceCloseWithDelay(3.5);  // > round trip of the whole Internet
        }
      }
      else if (age < 0)
      {
        LOG_WARN << "Time jump";
        n->lastReceiveTime = now;
      }
      else
      {
        break;  // the rest of the list is younger; stop scanning
      }
      ++it;
    }
    else
    {
      LOG_WARN << "Expired";
      it = connectionList_.erase(it);
    }
  }
}

void EchoServer::dumpConnectionList() const
{
  LOG_INFO << "size = " << connectionList_.size();
  for (WeakConnectionList::const_iterator it = connectionList_.begin();
       it != connectionList_.end(); ++it)
  {
    TcpConnectionPtr conn = it->lock();
    if (conn)
    {
      printf("conn %p\n", get_pointer(conn));
      const Node& n = boost::any_cast<const Node&>(conn->getContext());
      printf(" time %s\n", n.lastReceiveTime.toString().c_str());
    }
    else
    {
      printf("expired\n");
    }
  }
}

int main(int argc, char* argv[])
{
  EventLoop loop;
  InetAddress listenAddr(2007);
  int idleSeconds = 10;
  if (argc > 1)
  {
    idleSeconds = atoi(argv[1]);
  }
  LOG_INFO << "pid = " << getpid() << ", idle seconds = " << idleSeconds;
  EchoServer server(&loop, listenAddr, idleSeconds);
  server.start();
  loop.loop();
  return 0;
}
(Screenshots omitted: building and running; a client connects and is closed automatically after 8 seconds.)
The server checks the list once per second:
scott@ubuntu1804:~$ 20251024 03:15:43.196787Z 27261 INFO pid = 27261, idle seconds = 8 - sortedlist.cpp:173
20251024 03:15:43.196820Z 27261 INFO size = 0 - sortedlist.cpp:146
20251024 03:15:44.203325Z 27261 INFO size = 0 - sortedlist.cpp:146
20251024 03:15:45.203389Z 27261 INFO size = 0 - sortedlist.cpp:146
20251024 03:15:46.203558Z 27261 INFO size = 0 - sortedlist.cpp:146
20251024 03:15:47.211458Z 27261 INFO size = 0 - sortedlist.cpp:146
20251024 03:15:48.214835Z 27261 INFO size = 0 - sortedlist.cpp:146
20251024 03:15:49.228495Z 27261 INFO size = 0 - sortedlist.cpp:146
20251024 03:15:50.229265Z 27261 INFO size = 0 - sortedlist.cpp:146
20251024 03:15:51.230211Z 27261 INFO size = 0 - sortedlist.cpp:146
20251024 03:15:52.230619Z 27261 INFO size = 0 - sortedlist.cpp:146
20251024 03:15:53.231671Z 27261 INFO size = 0 - sortedlist.cpp:146
20251024 03:15:54.250130Z 27261 INFO size = 0 - sortedlist.cpp:146
20251024 03:15:55.253240Z 27261 INFO size = 0 - sortedlist.cpp:146
20251024 03:15:56.151559Z 27261 INFO TcpServer::newConnection [EchoServer] - new connection [EchoServer-0.0.0.0:2007#1] from 127.0.0.1:54680 - TcpServer.cc:80
20251024 03:15:56.151580Z 27261 INFO EchoServer - 127.0.0.1:54680 -> 127.0.0.1:2007 is UP - sortedlist.cpp:66
20251024 03:15:56.151586Z 27261 INFO size = 1 - sortedlist.cpp:146
conn 0x55555579fa60
 time 1761275756.151584
20251024 03:15:56.256738Z 27261 INFO size = 1 - sortedlist.cpp:146
conn 0x55555579fa60
 time 1761275756.151584
20251024 03:15:57.262092Z 27261 INFO size = 1 - sortedlist.cpp:146
conn 0x55555579fa60
 time 1761275756.151584
20251024 03:15:58.262748Z 27261 INFO size = 1 - sortedlist.cpp:146
conn 0x55555579fa60
 time 1761275756.151584
20251024 03:15:59.273358Z 27261 INFO size = 1 - sortedlist.cpp:146
conn 0x55555579fa60
 time 1761275756.151584
20251024 03:16:00.273776Z 27261 INFO size = 1 - sortedlist.cpp:146
conn 0x55555579fa60
 time 1761275756.151584
20251024 03:16:01.275951Z 27261 INFO size = 1 - sortedlist.cpp:146
conn 0x55555579fa60
 time 1761275756.151584
20251024 03:16:02.279767Z 27261 INFO size = 1 - sortedlist.cpp:146
conn 0x55555579fa60
 time 1761275756.151584
20251024 03:16:03.280744Z 27261 INFO size = 1 - sortedlist.cpp:146
conn 0x55555579fa60
 time 1761275756.151584
20251024 03:16:04.281837Z 27261 INFO size = 1 - sortedlist.cpp:146
conn 0x55555579fa60
 time 1761275756.151584
20251024 03:16:04.281915Z 27261 INFO shutting down EchoServer-0.0.0.0:2007#1 - sortedlist.cpp:121
20251024 03:16:04.282021Z 27261 INFO EchoServer - 127.0.0.1:54680 -> 127.0.0.1:2007 is DOWN - sortedlist.cpp:66
20251024 03:16:04.282028Z 27261 INFO size = 0 - sortedlist.cpp:146
20251024 03:16:04.282032Z 27261 INFO TcpServer::removeConnectionInLoop [EchoServer] - connection EchoServer-0.0.0.0:2007#1 - TcpServer.cc:109
20251024 03:16:05.282953Z 27261 INFO size = 0 - sortedlist.cpp:146
20251024 03:16:06.283008Z 27261 INFO size = 0 - sortedlist.cpp:146
20251024 03:16:07.283278Z 27261 INFO size = 0 - sortedlist.cpp:146
20251024 03:16:08.290867Z 27261 INFO size = 0 - sortedlist.cpp:146
-------divider----------------------------------end---------
7.11 A Simple Message Broadcast Service
This section uses muduo to implement a simple topic-based message broadcast service. It is really a small extension of a "chat room", except the chatters are programs in a distributed system rather than people. The code for this section is in examples/hub.
In distributed systems, besides the usual end-to-end communication there is also one-to-many broadcast. "Broadcast" may call IP multicast to mind, but that is not the subject here; this section is about application-level broadcast on top of TCP. Figure 7-49 shows the idea.

Figure 7-49
The rounded rectangles in Figure 7-49 are programs. "Hub" is a service program, not a network hub, though it plays a similar role, hence the name. Publishers and Subscribers talk to the Hub over TCP. A Publisher sends a message to a topic; Subscribers subscribe to the topic and then receive the messages. That is, the Publisher uses the Hub to broadcast a message to one or more Subscribers. The benefit of this pub/sub structure is that Subscribers can be added without modifying the Publisher, achieving a degree of decoupling (it can also be viewed as a distributed Observer pattern). Because everything runs over TCP, the broadcast is basically reliable, where "reliable" means "more reliable than UDP", not "fully reliable". [Footnote 37: "Reliable broadcast" and "atomic broadcast" are of major significance in distributed systems; they are the basis for building reliable distributed services as replicated state machines. Reliable broadcast involves consensus algorithms and is beyond the scope of this book.] (Think about it: how would you prevent the Hub from becoming a single point of failure?)
To avoid cross-talk, each topic should have only one Publisher at a time; the Hub provides no compare-and-swap operation.
Application-level broadcast is very useful in distributed systems; a few examples follow.
Sports score broadcasting. Eight courts are hosting badminton matches, and each court's scoring program publishes the current score to its own topic (court 1 to court1, court 2 to court2, and so on). Programs that need the scores (the venue's big screens, online score feeds, and the like) subscribe to the topics they care about and receive the latest scores promptly. Since what we implement here is not a 100% reliable broadcast, messages should be snapshots, not deltas. (That is, a message says "the score is now X:Y", not "so-and-so just scored".)
Load monitoring. Every machine runs a monitoring program that periodically publishes the host's current load (CPU, network, disk, temperature) to a topic named after the hostname; programs that need the data just subscribe to the appropriate topics at the Hub, with no need to deal with each machine directly. (For robustness, the monitoring messages should carry timestamps, which guards against stale data and even serves, to a degree, as a heartbeat.) Along the same lines, the service programs of a distributed system can publish their own current load to the Hub for load balancers and monitors to consume.
The protocol
For simplicity, muduo's Hub example uses a text protocol with "\r\n" as the delimiter, so the Hub can be tested with telnet. The protocol has only three commands:
● sub <topic>\r\n
Subscribe to <topic>; any later update to the topic is sent down this TCP connection. At sub time, the Hub also sends this Subscriber the most recent message on <topic>.
● unsub <topic>\r\n
Unsubscribe from <topic>.
● pub <topic>\r\n<content>\r\n
Send a message to <topic> with body <content>. Every Subscriber of <topic> receives the identical message "pub <topic>\r\n<content>\r\n".
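For example, a complete exchange between a subscriber S, a publisher P, and the Hub looks like this on the wire (\r\n shown explicitly):

S -> Hub:  sub court\r\n
P -> Hub:  pub court\r\n13:11\r\n
Hub -> S:  pub court\r\n13:11\r\n     (sent to every subscriber of court)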
The code
The Hub example in muduo consists of several parts:
● The Hub service program, which performs the one-to-many distribution. It remembers which topics each client has subscribed to and delivers a message only to its subscribers. See examples/hub/hub.cc.
● The pubsub library. To make writing programs that use the Hub easier, I wrote a small client library for talking to it; it can subscribe to a topic, unsubscribe from a topic, and publish messages to a given topic. See examples/hub/pubsub.{h,cc}.
● The sub example program, a command-line tool that subscribes to one or more topics and waits for data from the Hub. See examples/hub/sub.cc.
● The pub example program, a command-line tool that publishes one message, given as a command-line argument, to a topic. See examples/hub/pub.cc.
A program can be a Publisher and a Subscriber at the same time, and the pubsub library uses only one TCP connection (which keeps failover simple). A usage example:
1. Open four terminal windows.
2. In the first window, run $ hub 9999.
3. In the second window, run $ sub 127.0.0.1:9999 mytopic.
4. In the third window, run $ sub 127.0.0.1:9999 mytopic court.
5. In the fourth window, run $ pub 127.0.0.1:9999 mytopic "Hello world.". Windows two and three both print "mytopic: Hello world.", showing that the message on topic mytopic was received.
6. In the fourth window, run $ pub 127.0.0.1:9999 court "13:11". Window three prints "court: 13:11", showing that the message on topic court was received. Window two did not subscribe to it, so it prints nothing.
With this simple pub/sub mechanism one can do many interesting things. For example, part of the end-to-end communication between programs in a distributed system can be rerouted through pub/sub (say, instead of A sending a SOAP request to B and B returning the response over the same TCP connection, where their exchange can only be inspected by reading logs or capturing it with tcpdump, A publishes the request to topic_a_to_b and B publishes the response to topic_b_to_a). Then attaching one extra monitoring subscriber makes it trivial to watch both sides of the conversation, which makes status monitoring and troubleshooting easy.
-------divider----------------------------------start---------
Notes:
// ch7_codec.h
#ifndef MUDUO_EXAMPLES_HUB_CODEC_H
#define MUDUO_EXAMPLES_HUB_CODEC_H

// internal header
#include "muduo/base/Types.h"
#include "muduo/net/Buffer.h"

namespace pubsub
{
using muduo::string;

enum ParseResult
{
  kError,
  kSuccess,
  kContinue,
};

ParseResult ParseMessage(muduo::net::Buffer* buf,
                         string* cmd,
                         string* topic,
                         string* content);

}  // namespace pubsub

#endif  // MUDUO_EXAMPLES_HUB_CODEC_H
ch7_codec.cpp:
// ch7_codec.cpp
#include "ch7_codec.h"

#include <algorithm>  // std::find

using namespace muduo;
using namespace muduo::net;
using namespace pubsub;

ParseResult pubsub::ParseMessage(Buffer* buf,
                                 string* cmd,
                                 string* topic,
                                 string* content)
{
  ParseResult result = kError;
  const char* crlf = buf->findCRLF();
  if (crlf)
  {
    const char* space = std::find(buf->peek(), crlf, ' ');
    if (space != crlf)
    {
      cmd->assign(buf->peek(), space);
      topic->assign(space + 1, crlf);
      if (*cmd == "pub")
      {
        const char* start = crlf + 2;
        crlf = buf->findCRLF(start);
        if (crlf)
        {
          content->assign(start, crlf);
          buf->retrieveUntil(crlf + 2);
          result = kSuccess;
        }
        else
        {
          result = kContinue;  // the content line has not fully arrived yet
        }
      }
      else
      {
        buf->retrieveUntil(crlf + 2);
        result = kSuccess;
      }
    }
    else
    {
      result = kError;
    }
  }
  else
  {
    result = kContinue;
  }
  return result;
}
ch7_pubsub.h:
// ch7_pubsub.h
#ifndef MUDUO_EXAMPLES_HUB_PUBSUB_H
#define MUDUO_EXAMPLES_HUB_PUBSUB_H

#include "muduo/net/TcpClient.h"

namespace pubsub
{
using muduo::string;

// FIXME: dtor is not thread safe
class PubSubClient : muduo::noncopyable
{
 public:
  typedef std::function<void (PubSubClient*)> ConnectionCallback;
  typedef std::function<void (const string& topic,
                              const string& content,
                              muduo::Timestamp)> SubscribeCallback;

  PubSubClient(muduo::net::EventLoop* loop,
               const muduo::net::InetAddress& hubAddr,
               const string& name);
  void start();
  void stop();
  bool connected() const;

  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }

  bool subscribe(const string& topic, const SubscribeCallback& cb);  // subscribe
  void unsubscribe(const string& topic);                             // unsubscribe
  bool publish(const string& topic, const string& content);          // publish

 private:
  void onConnection(const muduo::net::TcpConnectionPtr& conn);
  void onMessage(const muduo::net::TcpConnectionPtr& conn,
                 muduo::net::Buffer* buf,
                 muduo::Timestamp);
  bool send(const string& message);

  muduo::net::TcpClient client_;
  muduo::net::TcpConnectionPtr conn_;
  ConnectionCallback connectionCallback_;
  SubscribeCallback subscribeCallback_;
};

}  // namespace pubsub

#endif  // MUDUO_EXAMPLES_HUB_PUBSUB_H
ch7_pubsub.cpp:
// ch7_pubsub.cpp — client library for talking to the hub (subscribe/unsubscribe/publish).
#include "ch7_pubsub.h"
#include "ch7_codec.h"

using namespace muduo;
using namespace muduo::net;
using namespace pubsub;

PubSubClient::PubSubClient(EventLoop* loop,
                           const InetAddress& hubAddr,
                           const string& name)
  : client_(loop, hubAddr, name)
{
  // FIXME: the destructor is not thread safe
  client_.setConnectionCallback(
      std::bind(&PubSubClient::onConnection, this, _1));
  client_.setMessageCallback(
      std::bind(&PubSubClient::onMessage, this, _1, _2, _3));
}

void PubSubClient::start()
{
  client_.connect();
}

void PubSubClient::stop()
{
  client_.disconnect();
}

bool PubSubClient::connected() const
{
  return conn_ && conn_->connected();
}

// subscribe
bool PubSubClient::subscribe(const string& topic, const SubscribeCallback& cb)
{
  string message = "sub " + topic + "\r\n";
  subscribeCallback_ = cb;
  return send(message);
}

// unsubscribe
void PubSubClient::unsubscribe(const string& topic)
{
  string message = "unsub " + topic + "\r\n";
  send(message);
}

// publish
bool PubSubClient::publish(const string& topic, const string& content)
{
  string message = "pub " + topic + "\r\n" + content + "\r\n";
  return send(message);
}

void PubSubClient::onConnection(const TcpConnectionPtr& conn)
{
  if (conn->connected())
  {
    conn_ = conn;
    // FIXME: re-sub
  }
  else
  {
    conn_.reset();
  }
  if (connectionCallback_)
  {
    connectionCallback_(this);
  }
}

void PubSubClient::onMessage(const TcpConnectionPtr& conn,
                             Buffer* buf,
                             Timestamp receiveTime)
{
  ParseResult result = kSuccess;
  while (result == kSuccess)
  {
    string cmd;
    string topic;
    string content;
    result = ParseMessage(buf, &cmd, &topic, &content);
    if (result == kSuccess)
    {
      if (cmd == "pub" && subscribeCallback_)
      {
        subscribeCallback_(topic, content, receiveTime);
      }
    }
    else if (result == kError)
    {
      conn->shutdown();
    }
  }
}

bool PubSubClient::send(const string& message)
{
  bool succeed = false;
  if (conn_ && conn_->connected())
  {
    conn_->send(message);
    succeed = true;
  }
  return succeed;
}
ch7_hub.cpp (builds the hub binary):
// ch7_hub.cpp — the hub server, responsible for one-to-many message distribution.
#include "ch7_codec.h"

#include "muduo/base/Logging.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"

#include <map>
#include <set>
#include <stdio.h>
#include <stdlib.h>

using namespace muduo;
using namespace muduo::net;

namespace pubsub
{

typedef std::set<string> ConnectionSubscription;

class Topic : public muduo::copyable
{
 public:
  Topic(const string& topic)
    : topic_(topic)
  {
  }

  void add(const TcpConnectionPtr& conn)
  {
    audiences_.insert(conn);
    if (lastPubTime_.valid())
    {
      conn->send(makeMessage());  // replay the latest message to the new subscriber
    }
  }

  void remove(const TcpConnectionPtr& conn)
  {
    audiences_.erase(conn);
  }

  void publish(const string& content, Timestamp time)
  {
    content_ = content;
    lastPubTime_ = time;
    string message = makeMessage();
    for (std::set<TcpConnectionPtr>::iterator it = audiences_.begin();
         it != audiences_.end();
         ++it)
    {
      (*it)->send(message);
    }
  }

 private:
  string makeMessage()
  {
    return "pub " + topic_ + "\r\n" + content_ + "\r\n";
  }

  string topic_;
  string content_;
  Timestamp lastPubTime_;
  std::set<TcpConnectionPtr> audiences_;
};

class PubSubServer : noncopyable
{
 public:
  PubSubServer(muduo::net::EventLoop* loop,
               const muduo::net::InetAddress& listenAddr)
    : loop_(loop),
      server_(loop, listenAddr, "PubSubServer")
  {
    server_.setConnectionCallback(
        std::bind(&PubSubServer::onConnection, this, _1));
    server_.setMessageCallback(
        std::bind(&PubSubServer::onMessage, this, _1, _2, _3));
    loop_->runEvery(1.0, std::bind(&PubSubServer::timePublish, this));
  }

  void start()
  {
    server_.start();
  }

 private:
  void onConnection(const TcpConnectionPtr& conn)
  {
    if (conn->connected())
    {
      conn->setContext(ConnectionSubscription());
    }
    else
    {
      const ConnectionSubscription& connSub
        = boost::any_cast<const ConnectionSubscription&>(conn->getContext());
      // Subtle: doUnsubscribe erases *it, so increment the iterator before the call.
      for (ConnectionSubscription::const_iterator it = connSub.begin();
           it != connSub.end();)
      {
        doUnsubscribe(conn, *it++);
      }
    }
  }

  void onMessage(const TcpConnectionPtr& conn,
                 Buffer* buf,
                 Timestamp receiveTime)
  {
    ParseResult result = kSuccess;
    while (result == kSuccess)
    {
      string cmd;
      string topic;
      string content;
      result = ParseMessage(buf, &cmd, &topic, &content);
      if (result == kSuccess)
      {
        if (cmd == "pub")
        {
          doPublish(conn->name(), topic, content, receiveTime);
        }
        else if (cmd == "sub")
        {
          LOG_INFO << conn->name() << " subscribes " << topic;
          doSubscribe(conn, topic);
        }
        else if (cmd == "unsub")
        {
          doUnsubscribe(conn, topic);
        }
        else
        {
          conn->shutdown();
          result = kError;
        }
      }
      else if (result == kError)
      {
        conn->shutdown();
      }
    }
  }

  void timePublish()
  {
    Timestamp now = Timestamp::now();
    doPublish("internal", "utc_time", now.toFormattedString(), now);
  }

  void doSubscribe(const TcpConnectionPtr& conn,
                   const string& topic)
  {
    ConnectionSubscription* connSub
      = boost::any_cast<ConnectionSubscription>(conn->getMutableContext());
    connSub->insert(topic);
    getTopic(topic).add(conn);
  }

  void doUnsubscribe(const TcpConnectionPtr& conn,
                     const string& topic)
  {
    LOG_INFO << conn->name() << " unsubscribes " << topic;
    getTopic(topic).remove(conn);
    // The Topic object may be about to be destroyed, so don't touch it after erase.
    ConnectionSubscription* connSub
      = boost::any_cast<ConnectionSubscription>(conn->getMutableContext());
    connSub->erase(topic);
  }

  void doPublish(const string& source,
                 const string& topic,
                 const string& content,
                 Timestamp time)
  {
    getTopic(topic).publish(content, time);
  }

  Topic& getTopic(const string& topic)
  {
    std::map<string, Topic>::iterator it = topics_.find(topic);
    if (it == topics_.end())
    {
      it = topics_.insert(make_pair(topic, Topic(topic))).first;
    }
    return it->second;
  }

  EventLoop* loop_;
  TcpServer server_;
  std::map<string, Topic> topics_;
};

}  // namespace pubsub

int main(int argc, char* argv[])
{
  if (argc > 1)
  {
    uint16_t port = static_cast<uint16_t>(atoi(argv[1]));
    EventLoop loop;
    if (argc > 2)
    {
      int inspectPort = atoi(argv[2]);
      printf("main=========inspectPort=%d\n", inspectPort);
    }
    pubsub::PubSubServer server(&loop, InetAddress(port));
    server.start();
    loop.loop();
  }
  else
  {
    printf("Usage: %s pubsub_port [inspect_port]\n", argv[0]);
  }
  return 0;
}
ch7_pub.cpp (builds the pub binary):
// ch7_pub.cpp — the pub example program: publishes one message to a topic,
// with the content given on the command line.
#include "ch7_pubsub.h"

#include "muduo/base/ProcessInfo.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/EventLoopThread.h"

#include <iostream>
#include <stdio.h>
#include <stdlib.h>

using namespace muduo;
using namespace muduo::net;
using namespace pubsub;

EventLoop* g_loop = nullptr;
string g_topic;
string g_content;

void connection(PubSubClient* client)
{
  if (client->connected())
  {
    client->publish(g_topic, g_content);
    client->stop();
  }
  else
  {
    g_loop->quit();
  }
}

int main(int argc, char* argv[])
{
  if (argc == 4)
  {
    string hostport = argv[1];
    size_t colon = hostport.find(':');
    if (colon != string::npos)
    {
      string hostip = hostport.substr(0, colon);
      uint16_t port = static_cast<uint16_t>(atoi(hostport.c_str() + colon + 1));
      g_topic = argv[2];
      g_content = argv[3];

      string name = ProcessInfo::username() + "@" + ProcessInfo::hostname();
      name += ":" + ProcessInfo::pidString();

      if (g_content == "-")
      {
        // Read lines from stdin and publish each one.
        EventLoopThread loopThread;
        g_loop = loopThread.startLoop();
        PubSubClient client(g_loop, InetAddress(hostip, port), name);
        client.start();

        string line;
        while (getline(std::cin, line))
        {
          client.publish(g_topic, line);
        }
        client.stop();
        CurrentThread::sleepUsec(1000 * 1000);
      }
      else
      {
        EventLoop loop;
        g_loop = &loop;
        PubSubClient client(g_loop, InetAddress(hostip, port), name);
        client.setConnectionCallback(connection);
        client.start();
        loop.loop();
      }
    }
    else
    {
      printf("Usage: %s hub_ip:port topic content\n", argv[0]);
    }
  }
  else
  {
    printf("Usage: %s hub_ip:port topic content\n"
           "Read contents from stdin:\n"
           "  %s hub_ip:port topic -\n", argv[0], argv[0]);
  }
  return 0;
}
ch7_sub.cpp (builds the sub binary):
// ch7_sub.cpp — the sub example program: subscribes to one or more topics
// and waits for data from the hub.
#include "ch7_pubsub.h"

#include "muduo/base/ProcessInfo.h"
#include "muduo/net/EventLoop.h"

#include <vector>
#include <stdio.h>
#include <stdlib.h>

using namespace muduo;
using namespace muduo::net;
using namespace pubsub;

EventLoop* g_loop = nullptr;
std::vector<string> g_topics;

void subscription(const string& topic, const string& content, Timestamp)
{
  printf("%s: %s\n", topic.c_str(), content.c_str());
}

void connection(PubSubClient* client)
{
  if (client->connected())
  {
    for (std::vector<string>::iterator it = g_topics.begin();
         it != g_topics.end(); ++it)
    {
      client->subscribe(*it, subscription);
    }
  }
  else
  {
    g_loop->quit();
  }
}

int main(int argc, char* argv[])
{
  if (argc > 2)
  {
    string hostport = argv[1];
    size_t colon = hostport.find(':');
    if (colon != string::npos)
    {
      string hostip = hostport.substr(0, colon);
      uint16_t port = static_cast<uint16_t>(atoi(hostport.c_str() + colon + 1));
      for (int i = 2; i < argc; ++i)
      {
        g_topics.push_back(argv[i]);
      }

      EventLoop loop;
      g_loop = &loop;
      string name = ProcessInfo::username() + "@" + ProcessInfo::hostname();
      name += ":" + ProcessInfo::pidString();
      PubSubClient client(&loop, InetAddress(hostip, port), name);
      client.setConnectionCallback(connection);
      client.start();
      loop.loop();
    }
    else
    {
      printf("Usage: %s hub_ip:port topic [topic ...]\n", argv[0]);
    }
  }
  else
  {
    printf("Usage: %s hub_ip:port topic [topic ...]\n", argv[0]);
  }
  return 0;
}
Testing:
Terminal 1: ./hub 9999
Terminal 2: ./sub 127.0.0.1:9999 mytopic
Terminal 3: ./sub 127.0.0.1:9999 mytopic court
Terminal 4: ./pub 127.0.0.1:9999 mytopic "Hello, world"
(Screenshots omitted.)
-------divider----------------------------------end---------
Efficient multithreaded broadcasting
In this section's example the Hub is single-threaded. If a message has to be broadcast to 1000 subscribers, it can only be sent to them one by one, so the gap between the 1st and the 1000th subscriber receiving it can stretch to several milliseconds. Can we raise the speed and lower the latency? Multithreading is the obvious idea, but the simple-minded version does not necessarily work: a single global lock degrades a multithreaded program back to serial execution. To get a real speedup I used thread-local data: divide the 1000 subscribers among, say, 4 threads, keep each thread's operations essentially lock-free, and the sends proceed in parallel. See examples/asio/chat/server_threaded_highperformance.cc for the example code.
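The core of the trick can be sketched as follows, assuming each subscriber connection is pinned to one of several EventLoop threads when it connects (loops_ and t_localConnections are illustrative names; the real code is in server_threaded_highperformance.cc):

// Each IO thread owns a thread-local set of its connections, so iterating
// it needs no lock; only the fan-out below crosses threads.
thread_local std::set<TcpConnectionPtr> t_localConnections;

void distributeMessage(const string& message)
{
  for (EventLoop* loop : loops_)          // one queued task per IO thread
  {
    loop->queueInLoop([message] {
      for (const TcpConnectionPtr& conn : t_localConnections)
      {
        conn->send(message);              // all threads send in parallel
      }
    });
  }
}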
7.12 A "Serial-to-Parallel" Connection Server and Its Automated Tests
This section shows how to use a test harness to test a network service program with internal logic, a program that acts as a server and a client at the same time. The code is in examples/multiplexer.
云风 described the functional requirements of a game connection server on his blog [footnote 38: search for "练手项目" at http://blog.codingnow.com/2010/11/go_prime.html]. I implemented a first cut of those requirements in C++ and wrote a matching automated test harness, as an example for the muduo network library.
Note: the code in this section implements only the basic functional requirements; it does not consider security and has not been specially optimized, so it is not suitable as a real game connection server running on the public Internet.
Functional requirements
The connection server funnels multiple client connections into one internal TCP connection, performing a "serial-parallel conversion" of the data, so that the backend logic server can concentrate on business logic without worrying about concurrent connections. Figure 7-50 shows the system.

Figure 7-50
The connection server does roughly what a multiplexer (data selector) does in digital circuits (Figure 7-51), so I named it multiplexer. (IO multiplexing takes its name from the same idea: letting one thread of control selectively handle multiple IO file descriptors.)

Figure 7-51
(The image is taken from Wikipedia and is in the public domain.)
Implementation
The multiplexer's requirements are not complicated: it shuffles data between the backend connection and the client connections. Each new client connection is assigned a fresh integer id; if the ids run out, the new connection is disconnected (so capping the number of ids caps the maximum number of connections). In addition, to keep ids from being reused too quickly (which could cause cross-talk at the backend), the multiplexer manages the free ids with a queue, taking an id from the head and returning it to the tail when it is released (see the sketch after the list below). Concretely, four kinds of events are handled:
● When a client connection arrives or goes down, notify the backend. See onClientConnection().
● When data arrives on a client connection, send the data, together with the connection id, to the backend. See onClientMessage().
● When data arrives on the backend connection, figure out which client connection it is addressed to and forward it accordingly. See onBackendMessage().
● If the backend connection goes down, disconnect all client connections (on the assumption that clients retry by themselves). See onBackendConnection().
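A sketch of the id management just described (illustrative only; the real code is in examples/multiplexer):

std::queue<int> availIds_;  // pre-filled with 1..kMaxConns at startup

void onClientConnection(const TcpConnectionPtr& conn)
{
  if (conn->connected())
  {
    if (availIds_.empty())
    {
      conn->shutdown();            // no id left: refuse the connection
    }
    else
    {
      int id = availIds_.front();  // take from the head of the queue...
      availIds_.pop();
      conn->setContext(id);
      // ... and notify the backend that client <id> is up
    }
  }
  else if (!conn->getContext().empty())
  {
    int id = boost::any_cast<int>(conn->getContext());
    availIds_.push(id);            // ...return to the tail, delaying reuse
    // notify the backend that client <id> is down
  }
}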
As the list shows, the multiplexer behaves much like a proxy. multiplexer_simple.cc is a single-threaded implementation; thanks to muduo's IO multiplexing it handles many concurrent connections with ease. The multithreaded implementation is in multiplexer.cc.
Two points in the implementation deserve attention.
Where is a TcpConnection's id stored? When data arrives from the backend, how do we find the client connection for a given id? When data arrives on a client connection, how do we learn its id?
The first question is easy: a std::map<int, TcpConnectionPtr> clientConns_ maps each id to its client connection.
The second question could certainly be solved the same way, but I'd rather use it to introduce the context facility of muduo::net::TcpConnection. Every TcpConnection has a boost::any member that client code can use freely (get/set), as shown below. This boost::any is the TcpConnection's context; it can hold arbitrary data bound to the connection (say, the connection id, the time the connection's data last arrived, the name of the user the connection represents, and so on). Client code can thereby attach its own state without subclassing TcpConnection, and no TcpConnectionFactory is needed either (if subclassing were allowed, such a factory would inevitably have to be injected into TcpServer).
class TcpConnection : boost::noncopyable,
                      public boost::enable_shared_from_this<TcpConnection>
{
 public:
  void setContext(const boost::any& context)
  { context_ = context; }

  const boost::any& getContext() const
  { return context_; }

  boost::any* getMutableContext()
  { return &context_; }

  // ...

 private:
  // ...
  boost::any context_;
};
typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr;
In the multiplexer, onClientConnection() calls conn->setContext(id) to store the id in the TcpConnection object. onClientMessage() then takes the id back out of the TcpConnection and sends it to the backend along with the data. The complete implementation:
void onClientMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)
{
  if (!conn->getContext().empty())
  {
    int id = boost::any_cast<int>(conn->getContext());
    sendBackendBuffer(id, buf);
  }
  else
  {
    buf->retrieveAll();
    // FIXME: error handling
  }
}
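In the opposite direction, onBackendMessage() only has to look the id up in clientConns_. The following sketch assumes, purely for illustration, a frame layout of 2-byte id, 2-byte length, then payload; this is not the multiplexer's actual wire format, which is defined in examples/multiplexer:

void onBackendMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)
{
  while (buf->readableBytes() >= 4)        // assumed header: id + length
  {
    int16_t id = 0, len = 0;
    memcpy(&id, buf->peek(), 2);           // byte order ignored in this sketch
    memcpy(&len, buf->peek() + 2, 2);
    if (buf->readableBytes() < 4 + static_cast<size_t>(len))
      break;                               // incomplete frame: wait for more data
    buf->retrieve(4);
    string payload(buf->peek(), len);
    buf->retrieve(len);
    std::map<int, TcpConnectionPtr>::iterator it = clientConns_.find(id);
    if (it != clientConns_.end())
    {
      it->second->send(payload);           // forward to the right client
    }
    // else: that client is already gone; drop the data
  }
}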
How is a TcpConnection's lifetime managed? Client connections are created and destroyed dynamically, entirely at the clients' whim; how do we guarantee that when the backend wants to send data to one, the TcpConnection object is still alive? The answer is reference counting, and there is no need to write it ourselves: boost::shared_ptr does the job. TcpConnection is the only object in muduo that is managed by shared_ptr by default, precisely because its lifetime is inherently dynamic. See Chapter 1 for more on this.
The multiplexer speaks a binary protocol. How do we test it?
Automated testing
The multiplexer is the first network program among the muduo examples with non-trivial business logic. Following the approach of §9.7, "Automated regression testing of distributed programs", I wrote a test harness for it. See examples/multiplexer/harness/.
The harness is written in Java with the Netty network library. It plays the clients and the backend at the same time, so it must both initiate connections and accept them. Moreover, the harness and the multiplexer can be started in either order; read the code to see how that is achieved. Figure 7-52 shows the structure.

Figure 7-52
The harness funnels all events into one blocking queue, which makes test cases easy to write. A test case drives the harness: it opens connections, sends data, and checks the data received. For instance, one of the test cases is testcase/TestOneClientSend.java.
The test cases here are written directly in Java. If necessary they could be written in Groovy instead, which would allow modifying and adding test cases without restarting the harness; for the specifics see my blog post 《"过家家"版的移动离线计费系统实现》 [footnote 39: http://www.cnblogs.com/Solstice/archive/2011/04/22/2024791.html].
Future improvements
With this automated harness we can modify (or even redesign) the multiplexer conveniently and safely. For example:
● Add a "backend orders a client connection closed" command. With automated testing, the new feature can be tested by itself (developer testing), without a real backend taking part.
● Rewrite the multiplexer with multiple threads. With automated regression tests we need not worry about breaking existing functionality and can rewrite boldly. And because the harness tests from the outside rather than as unit tests, the test cases need not change when the multiplexer is rewritten, which keeps the tests stable. With small extensions the harness can also run stress tests, useful both for validating the multithreaded multiplexer's correctness and for measuring its efficiency gain over the single-threaded version.
