第7章 muduo编程示例(5)
7.13 socks4a代理服务器
本节介绍用muduo实现一个简单的socks4a代理服务器(examples/socks4a/)。
7.13.1 TCP中继器
在实现socks4a proxy之前,我们先写一个功能更简单的网络程序——TCP中继器(TCP relay),或者叫做穷人的tcpdump(poor man’s tcpdump)。
一般情况下,客户端程序直接连接服务端,如图7-53所示。

图7-53
有时候,我们想在client和server之间放一个中继器(relay),把client与server之间的通信内容记录下来。这时用tcpdump是最方便省事的,但是tcpdump需要root权限,万一拿不到权限呢?穷人有穷人的办法,自己写一个TcpRelay,让client连接TcpRelay,再让TcpRelay连接server,如图7-54中的T型结构,TcpRelay扮演了类似proxy的角色。
有时候,我们想在client和server之间放一个中继器(relay),把client与server之间的通信内容记录下来。这时用tcpdump是最方便省事的,但是tcpdump需要root权限,万一拿不到权限呢?穷人有穷人的办法,自己写一个TcpRelay,让client连接TcpRelay,再让TcpRelay连接server,如图7-54中的T型结构,TcpRelay扮演了类似proxy的角色。

图7-54
TcpRelay是我们自己写的,可以动动手脚。除了记录通信内容外,还可以制造延时,或者故意翻转1 bit数据以模拟router硬件故障。
TcpRelay的功能(业务逻辑)看上去很简单,无非是把连接C上收到的数据发给连接S,同时把连接S上收到的数据发给连接C。但仔细考虑起来,细节其实不那么简单:
1.建立连接。为了真实模拟client,TcpRelay在accept连接C之后才向server发起连接S,那么在S建立起来之前,从C收到数据怎么办?要不要暂存起来?
2.并发连接的管理。图7-54中只画出了一个client,实际上TcpRelay可以服务多个client,左右两边这些并发连接如何管理,如何防止串话(cross talk)?
3.连接断开。client和server都可能主动断开连接。当client主动断开连接C时,TcpRelay应该立刻断开S。当server主动断开连接S时,TcpRelay应立刻断开C。这样才能比较精确地模拟client和server的行为。在关闭连接的一刹那,又有新的client连接进来,复用了刚刚close的fd号码,会不会造成串话?万一client和server几乎同时主动断开连接,TcpRelay如何应对?
4.速度不匹配。如果连接C的带宽是100kB/s,而连接S的带宽是10MB/s,不巧server是个chargen服务,会全速发送数据,那么会不会撑爆TcpRelay的buffer?如何限速?特别是在使用non-blocking IO和level-trigger polling的时候如何限制读取数据的速度?
在看muduo的实现之前,请读者思考:如果用Sockets API来实现TcpRelay,如何解决以上这些问题。(如果真要实现这么一个功能,可以试试splice(2)系统调用。)
如果用传统多线程阻塞IO的方式来实现TcpRelay一点也不难,好处是自动解决了速度不匹配的问题,Python代码如下。这个实现功能上没有问题,但是并发度就高不到哪儿去了。注意以下代码会一个字节一个字节地转发数据,每两个字节之间间隔1ms,可以用于测试网络程序的消息解码功能(codec)是否完善。
#!/usr/bin/pythonimport socket, thread, timelisten_port = 3007
connect_addr = ('localhost', 2007)
sleep_per_byte = 0.0001def forward(source, destination):source_addr = source.getpeername()while True:data = source.recv(4096)if data:for i in data:destination.sendall(i)time.sleep(sleep_per_byte)else:print 'disconnect', source_addrdestination.shutdown(socket.SHUT_WR)breakserversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('', listen_port))
serversocket.listen(5)while True:(clientsocket, address) = serversocket.accept()print 'accepted', addresssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.connect(connect_addr)print 'connected', sock.getpeername()thread.start_new_thread(forward, (clientsocket, sock))thread.start_new_thread(forward, (sock, clientsocket))
TcpRelay的实现很简单,只有几十行代码(examples/socks4a/tcprelay.cc),主要逻辑都在Tunnel class里(examples/socks4a/tunnel.h)。这个实现很好地解决了前三个问题,第四个问题的解法比较粗暴,用的是HighWaterMarkCallback,如果发送缓冲区堆积的数据大于10MiB就断开连接(更好的办法见§ 8.9.3)。TcpRelay既是服务端,又是客户端,在阅读代码的时候要注意onClientMessage()处理的是从server发来的消息,表示它作为客户端(client)收到的消息,这与前面的multiplexer正好相反。
-------分隔线----------------------------------start---------
笔记:
// ch7_tunnel.h#ifndef MUDUO_EXAMPLES_SOCKS4A_TUNNEL_H
#define MUDUO_EXAMPLES_SOCKS4A_TUNNEL_H#include "muduo/base/Logging.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/InetAddress.h"
#include "muduo/net/TcpClient.h"
#include "muduo/net/TcpServer.h"class Tunnel: public std::enable_shared_from_this<Tunnel>,muduo::noncopyable
{
public:Tunnel(muduo::net::EventLoop* loop,const muduo::net::InetAddress& serverAddr,const muduo::net::TcpConnectionPtr& serverConn):client_(loop, serverAddr, serverConn->name()),serverConn_(serverConn){LOG_INFO << "Tunnel " << serverConn->peerAddress().toIpPort()<< " <-> " << serverAddr.toIpPort();}~Tunnel(){LOG_INFO << "~Tunnel";}void setup(){using std::placeholders::_1;using std::placeholders::_2;using std::placeholders::_3;client_.setConnectionCallback(std::bind(&Tunnel::onClientConnection, shared_from_this(), _1));client_.setMessageCallback(std::bind(&Tunnel::onClientMessage, shared_from_this(), _1, _2, _3));serverConn_->setHighWaterMarkCallback(std::bind(&Tunnel::onHighWaterMarkWeak,std::weak_ptr<Tunnel>(shared_from_this()), kServer, _1, _2),1024*1024);}void connect(){client_.connect();}void disconnect(){client_.disconnect();//serverConn_.reset();}private:void teardown(){client_.setConnectionCallback(muduo::net::defaultConnectionCallback);client_.setMessageCallback(muduo::net::defaultMessageCallback);if(serverConn_){serverConn_->setContext(boost::any());serverConn_->shutdown();}clientConn_.reset();}void onClientConnection(const muduo::net::TcpConnectionPtr& conn){using std::placeholders::_1;using std::placeholders::_2;LOG_DEBUG << (conn->connected() ? "UP" : "DOWN");if(conn->connected()){conn->setTcpNoDelay(true);conn->setHighWaterMarkCallback(std::bind(&Tunnel::onHighWaterMarkWeak,std::weak_ptr<Tunnel>(shared_from_this()), kClient, _1, _2),1024*1024);serverConn_->setContext(conn);serverConn_->startRead();clientConn_ = conn;if(serverConn_->inputBuffer()->readableBytes() > 0){conn->send(serverConn_->inputBuffer());}}else{teardown();}}void onClientMessage(const muduo::net::TcpConnectionPtr& conn,muduo::net::Buffer* buf,muduo::Timestamp){LOG_DEBUG << conn->name() << " " << buf->readableBytes();if(serverConn_){serverConn_->send(buf);}else{buf->retrieveAll();abort();}}enum ServerClient{kServer, kClient};void onHighWaterMark(ServerClient which,const muduo::net::TcpConnectionPtr& conn,size_t bytesToSent){using std::placeholders::_1;LOG_INFO << (which == kServer ? "server":"client")<< " onHighWaterMark " << conn->name()<< " bytes " << bytesToSent;if(which == kServer){if(serverConn_->outputBuffer()->readableBytes() > 0){clientConn_->stopRead();serverConn_->setWriteCompleteCallback(std::bind(&Tunnel::onWriteCompleteWeak,std::weak_ptr<Tunnel>(shared_from_this()), kServer, _1));}}else{if(clientConn_->outputBuffer()->readableBytes() > 0){serverConn_->stopRead();clientConn_->setWriteCompleteCallback(std::bind(&Tunnel::onWriteCompleteWeak,std::weak_ptr<Tunnel>(shared_from_this()), kClient, _1));}}}static void onHighWaterMarkWeak(const std::weak_ptr<Tunnel> & wkTunnel,ServerClient which,const muduo::net::TcpConnectionPtr& conn,size_t bytesToSent){std::shared_ptr<Tunnel> tunnel = wkTunnel.lock();if(tunnel){tunnel->onHighWaterMark(which, conn, bytesToSent);}}void onWriteComplete(ServerClient which, const muduo::net::TcpConnectionPtr& conn){LOG_INFO << (which == kServer ? "server":"client")<< " onWriteComplete " << conn->name();if(which == kServer){clientConn_->startRead();serverConn_->setWriteCompleteCallback(muduo::net::WriteCompleteCallback());}else{serverConn_->startRead();clientConn_->setWriteCompleteCallback(muduo::net::WriteCompleteCallback());}}static void onWriteCompleteWeak(const std::weak_ptr<Tunnel>& wkTunnel,ServerClient which,const muduo::net::TcpConnectionPtr& conn){std::shared_ptr<Tunnel> tunnel = wkTunnel.lock();if(tunnel){tunnel->onWriteComplete(which, conn);}}private:muduo::net::TcpClient client_;muduo::net::TcpConnectionPtr serverConn_;muduo::net::TcpConnectionPtr clientConn_;
};
typedef std::shared_ptr<Tunnel> TunnelPtr;#endif // MUDUO_EXAMPLES_SOCKS4A_TUNNEL_H
ch7_balancer.cpp 文件
//ch7_balancer.cpp 简易的负载均衡器。它的核心功能是:
//作为中间代理,将来自多个客户端的连接请求,分发到后端的多个服务器上。
//运行示例: # 语法:./balancer <监听端口> <后端服务器1> <后端服务器2> ...
// . / balancer 8888 127.0.0.1:2000 127.0.0.1 : 2001#include "ch7_tunnel.h"
#include "muduo/base/ThreadLocal.h"
#include <stdio.h>using namespace muduo;
using namespace muduo::net;std::vector<InetAddress> g_backends;
ThreadLocal<std::map<string, TunnelPtr>> t_tunnels;
MutexLock g_mutex;
size_t g_current = 0;void onServerConnection(const TcpConnectionPtr& conn)
{LOG_DEBUG << (conn->connected() ? "UP": "DOWN");std::map<string, TunnelPtr>& tunnels = t_tunnels.value();if(conn->connected()){conn->setTcpNoDelay(true);conn->stopRead();size_t current = 0;{MutexLockGuard guard(g_mutex);current = g_current;g_current = (g_current + 1) % g_backends.size();}InetAddress backend = g_backends[current];TunnelPtr tunnel(new Tunnel(conn->getLoop(), backend, conn));tunnel->setup();tunnel->connect();tunnels[conn->name()] = tunnel;}else{assert(tunnels.find(conn->name()) != tunnels.end());tunnels[conn->name()]->disconnect();tunnels.erase(conn->name());}
}void onServerMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)
{if(!conn->getContext().empty()){const TcpConnectionPtr& clientConn= boost::any_cast<const TcpConnectionPtr&>(conn->getContext());clientConn->send(buf);}
}int main(int argc, char * argv[])
{printf("balancer.cpp=====main====argc = %d\n", argc);if(argc < 3){fprintf(stderr, "Usage: %s listen_port backend_ip:port [backend_ip:port]\n", argv[0]);}else{for(int i = 2; i < argc; ++i){string hostport = argv[i];printf("balancer.cpp=====hostport====argv[%d] = %s\n", i, hostport.c_str());size_t colon = hostport.find(':');if(colon != string::npos){string ip = hostport.substr(0, colon);uint16_t port = static_cast<uint16_t>(atoi(hostport.c_str()+colon+1));g_backends.push_back(InetAddress(ip, port));}else{fprintf(stderr, "invalid backend address %s\n", argv[i]);return 1;}}uint16_t port = static_cast<uint16_t>(atoi(argv[1]));InetAddress listenAddr(port);EventLoop loop;TcpServer server(&loop, listenAddr, "TcpBalancer");server.setConnectionCallback(onServerConnection);server.setMessageCallback(onServerMessage);server.setThreadNum(4);server.start();loop.loop();}return 0;
}
设置启动参数,编译运行:

客户端连接测试:
启动两个后端 chargen 服务器
打开两个终端窗口,分别启动 chargen 服务器,监听不同的端口。
# 终端1:启动第一个 chargen 服务器,监听 2000 端口 ./chargen 2000

# 终端2:启动第二个 chargen 服务器,监听 2001 端口 ./chargen 2001

使用客户端进行连接测试
# 终端3:第一个客户端连接 telnet 127.0.0.1 8888

# 终端4:第一个客户端连接 telnet 127.0.0.1 8888

运行后,就数据传输出 两个chargen转发的数据差不多

Balancer的数据一直在转发

=====================================================
ch7_tcprelay.cpp文件:
// ch7_tcprelay.cpp
#include "ch7_tunnel.h"#include <malloc.h>
#include <stdio.h>
#include <sys/resource.h>
#include <unistd.h>using namespace muduo;
using namespace muduo::net;EventLoop* g_eventLoop;
InetAddress* g_serverAddr;
std::map<string, TunnelPtr> g_tunnels;void onServerConnection(const TcpConnectionPtr& conn)
{LOG_DEBUG << (conn->connected() ? "UP" : "DOWN");if (conn->connected()){conn->setTcpNoDelay(true);conn->stopRead();TunnelPtr tunnel(new Tunnel(g_eventLoop, *g_serverAddr, conn));tunnel->setup();tunnel->connect();g_tunnels[conn->name()] = tunnel;}else{assert(g_tunnels.find(conn->name()) != g_tunnels.end());g_tunnels[conn->name()]->disconnect();g_tunnels.erase(conn->name());}
}void onServerMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)
{LOG_DEBUG << buf->readableBytes();if (!conn->getContext().empty()){const TcpConnectionPtr& clientConn= boost::any_cast<const TcpConnectionPtr&>(conn->getContext());clientConn->send(buf);}
}void memstat()
{malloc_stats();
}int main(int argc, char* argv[])
{if (argc < 4){// 主机IP 转发的端口(服务器监听的端口例如echo服务程序2007) 监听的端口2003(客户端连接)fprintf(stderr, "Usage: %s <host_ip> <port> <listen_port>\n", argv[0]);}else{LOG_INFO << "pid = " << getpid() << ", tid = " << CurrentThread::tid();{// set max virtual memory to 256MB.size_t kOneMB = 1024 * 1024;rlimit rl = { 256 * kOneMB, 256 * kOneMB };setrlimit(RLIMIT_AS, &rl);}const char* ip = argv[1];uint16_t port = static_cast<uint16_t>(atoi(argv[2]));InetAddress serverAddr(ip, port);g_serverAddr = &serverAddr;uint16_t acceptPort = static_cast<uint16_t>(atoi(argv[3]));InetAddress listenAddr(acceptPort);EventLoop loop;g_eventLoop = &loop;loop.runEvery(3, memstat);TcpServer server(&loop, listenAddr, "TcpRelay");server.setConnectionCallback(onServerConnection);server.setMessageCallback(onServerMessage);server.start();loop.loop();}return 0;
}
设置参数

第一个参数是主机IP, 第2个参数是远程服务端的端口,这里的服务采用echo服务,端口是2007,
第3个参数是监听客户端的端口2003,当前客户端连接时,连接的端口是2003,连接成功能,发消息时,由tcprelay把消息转发到echo服务程序。
./tcprelay 127.0.0.1 2007 2003
启动中继器

启动服务端程序echo, 服务端默认的端口是2007

打开客户端服务,连接中继器,连接端口为2003

发送消息

-------分隔线----------------------------------start---------
7.13.2 socks4a代理服务器
socks4a的功能与TcpRelay非常相似,也是把连接C上收到的数据发给连接S,同时把连接S上收到的数据发给连接C。它与TcpRelay的区别在于,TcpRelay固定连到某个server地址,而socks4a允许client指定要连哪个server。在accept连接C之后,socks4a server会读几个字节,以了解server的地址,再发起连接S。socks4a的协议非常简单,请参考维基百科[注40:http://en.wikipedia.org/wiki/SOCKS#SOCKS_4a]。
muduo的socks4a代理服务器的实现在examples/socks4a/socks4a.cc,它也使用了Tunnel class。与TcpRelay相比,只多了解析server地址这一步骤。目前DNS地址解析这一步用的是阻塞的gethostbyname()函数,在真正的系统中,应该换成非阻塞的DNS解析,可参考§ 7.15。muduo的这个socks4a是个标准的网络服务,可以供Web浏览器使用(我正是这么测试它的)。
7.13.3 N:1与1:N连接转发
云风在《写了一个proxy用途你懂的》[注41:http://blog.codingnow.com/2011/05/xtunnel.html]中写了一个TCP隧道tunnel,程序由三部分组成:N:1连接转发服务,1:N连接转发服务,socks代理服务。
我仿照他的思路,用muduo实现了这三个程序。不同的是,我没有做数据混淆,所以功能上有所减弱。
● N:1连接转发服务就是§ 7.12中的multiplexer(数据选择器)。
● 1:N连接转发服务是云风文中提到的backend,一个数据分配器(demulti-plexer),代码在examples/multiplexer/demux.cc。
● socks代理服务正是§ 7.13.2实现的socks4a。
有兴趣的读者可以把这三个程序级联起来试一试。
7.14 短址服务
muduo内置了一个简陋的HTTP服务器,可以处理简单的HTTP请求。这个HTTP服务器是面向内网的暴露进程状态的监控端口,不是面向公网的功能完善且健壮的httpd,其接口与J2EE的HttpServlet有几分类似。我们可以拿它来实现一个简单的短URL转发服务,以简要说明其用法。代码位于examples/shorturl/shorturl.cc。
std::map<string, string> redirections; //URL 转发表void onRequest(const HttpRequest& req, HttpResponse* resp)
{LOG_INFO << "Headers " << req.methodString() << " " << req.path();// TODO: support PUT and DELETE to create new redirections on-the-fly.std::map<string, string>::const_iterator it = redirections.find(req.path());if(it != redirections.end()) //如果找到了短址 {resp->setStatusCode(HttpResponse::k301MovedPermanently);resp->setStatusMessage("Moved Permanently");resp->addHeader("Location", it->second);// resp->setCloseConnection(true);}// ...
}int main(int argc, char* argv[])
{redirections["/1"] = "http://chenshuo.com";redirections["/2"] = "http://blog.csdn.net/Solstice";EventLoop loop;HttpServer server(&loop, InetAddress(8000), "shorturl");server.setHttpCallback(onRequest);server.setThreadNum(numThreads);server.start();loop.loop();
}
muduo并没有为短连接TCP服务优化,无法发挥多核优势。一种真正高效的优化手段是修改Linux内核,例如Google的SO_REUSEPORT内核补丁[注42:http://linux.dell.com/files/presentations/Linux_Plumbers_Conf_2010/Scaling_techniques_for_servers_with_high_connection%20rates.pdf]。
读者可以试试建立一个loop转发,例如“/1”→“/2”→“/3”→“/1”,看看浏览器反应如何。
-------分隔线----------------------------------start---------
笔记:
//ch7_shorturl.cpp#include "muduo/net/http/HttpServer.h"
#include "muduo/net/http/HttpRequest.h"
#include "muduo/net/http/HttpResponse.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/EventLoopThreadPool.h"
#include "muduo/base/Logging.h"#include <map>#include <sys/socket.h> //SO_REUSEPORATusing namespace muduo;
using namespace muduo::net;extern char favicon[555];
bool benchmark = false;std::map<string, string> redirections;void onRequest(const HttpRequest& req, HttpResponse* resp)
{LOG_INFO << "Headers " << req.methodString() << " " << req.path();if(!benchmark){const std::map<string, string>& headers = req.headers();for(std::map<string, string>::const_iterator it = headers.begin();it != headers.end();++it){LOG_DEBUG << it->first << ": " << it->second;}}// TODO: support PUT and DELETE to create new redirections on-the-fly.std::map<string, string>::const_iterator it = redirections.find(req.path());if(it != redirections.end()){resp->setStatusCode(HttpResponse::k301MovedPermanently);resp->setStatusMessage("Moved Permanently");resp->addHeader("Location", it->second);// resp->setCloseConnection(true);}else if(req.path() == "/"){resp->setStatusCode(HttpResponse::k200Ok);resp->setStatusMessage("OK");resp->setContentType("text/html");string now = Timestamp::now().toFormattedString();std::map<string, string>::const_iterator i = redirections.begin();string text;for(; i != redirections.end(); ++i){text.append("<ul>" + i->first + " => " + i->second + "</ul>");}resp->setBody("<html><head><title>My tiny short url service</title></head>""<body><h1>Known redirections</h1>"+ text +"Now is " + now +"</body></html>");}else if(req.path() == "/favicon.ico"){resp->setStatusCode(HttpResponse::k200Ok);resp->setStatusMessage("OK");resp->setContentType("image/png");resp->setBody(string(favicon, sizeof favicon));}else{resp->setStatusCode(HttpResponse::k404NotFound);resp->setStatusMessage("Not Found");resp->setCloseConnection(true);}
}int main(int argc, char* argv[])
{redirections["/1"] = "http://chenshuo.com";redirections["/2"] = "http://blog.csdn.net/Solstice";int numThreads = 0;if(argc > 1){benchmark = true;Logger::setLogLevel(Logger::WARN);numThreads = atoi(argv[1]);}
#ifdef SO_REUSEPORTLOG_WARN << "SO_REUSEPORT";EventLoop loop;EventLoopThreadPool threadPool(&loop, "shorturl");if (numThreads > 1){threadPool.setThreadNum(numThreads);}else{numThreads = 1;}threadPool.start();std::vector<std::unique_ptr<HttpServer>> servers;for (int i = 0; i < numThreads; ++i){servers.emplace_back(new HttpServer(threadPool.getNextLoop(),InetAddress(8000),"shorturl",TcpServer::kReusePort));servers.back()->setHttpCallback(onRequest);servers.back()->getLoop()->runInLoop(std::bind(&HttpServer::start, servers.back().get()));}loop.loop();
#elseLOG_WARN << "Normal";EventLoop loop;HttpServer server(&loop, InetAddress(8000), "shorturl");server.setHttpCallback(onRequest);server.setThreadNum(numThreads);server.start();loop.loop();
#endifreturn 0;
}char favicon[555] = {'\x89', 'P', 'N', 'G', '\xD', '\xA', '\x1A', '\xA','\x0', '\x0', '\x0', '\xD', 'I', 'H', 'D', 'R','\x0', '\x0', '\x0', '\x10', '\x0', '\x0', '\x0', '\x10','\x8', '\x6', '\x0', '\x0', '\x0', '\x1F', '\xF3', '\xFF','a', '\x0', '\x0', '\x0', '\x19', 't', 'E', 'X','t', 'S', 'o', 'f', 't', 'w', 'a', 'r','e', '\x0', 'A', 'd', 'o', 'b', 'e', '\x20','I', 'm', 'a', 'g', 'e', 'R', 'e', 'a','d', 'y', 'q', '\xC9', 'e', '\x3C', '\x0', '\x0','\x1', '\xCD', 'I', 'D', 'A', 'T', 'x', '\xDA','\x94', '\x93', '9', 'H', '\x3', 'A', '\x14', '\x86','\xFF', '\x5D', 'b', '\xA7', '\x4', 'R', '\xC4', 'm','\x22', '\x1E', '\xA0', 'F', '\x24', '\x8', '\x16', '\x16','v', '\xA', '6', '\xBA', 'J', '\x9A', '\x80', '\x8','A', '\xB4', 'q', '\x85', 'X', '\x89', 'G', '\xB0','I', '\xA9', 'Q', '\x24', '\xCD', '\xA6', '\x8', '\xA4','H', 'c', '\x91', 'B', '\xB', '\xAF', 'V', '\xC1','F', '\xB4', '\x15', '\xCF', '\x22', 'X', '\x98', '\xB','T', 'H', '\x8A', 'd', '\x93', '\x8D', '\xFB', 'F','g', '\xC9', '\x1A', '\x14', '\x7D', '\xF0', 'f', 'v','f', '\xDF', '\x7C', '\xEF', '\xE7', 'g', 'F', '\xA8','\xD5', 'j', 'H', '\x24', '\x12', '\x2A', '\x0', '\x5','\xBF', 'G', '\xD4', '\xEF', '\xF7', '\x2F', '6', '\xEC','\x12', '\x20', '\x1E', '\x8F', '\xD7', '\xAA', '\xD5', '\xEA','\xAF', 'I', '5', 'F', '\xAA', 'T', '\x5F', '\x9F','\x22', 'A', '\x2A', '\x95', '\xA', '\x83', '\xE5', 'r','9', 'd', '\xB3', 'Y', '\x96', '\x99', 'L', '\x6','\xE9', 't', '\x9A', '\x25', '\x85', '\x2C', '\xCB', 'T','\xA7', '\xC4', 'b', '1', '\xB5', '\x5E', '\x0', '\x3','h', '\x9A', '\xC6', '\x16', '\x82', '\x20', 'X', 'R','\x14', 'E', '6', 'S', '\x94', '\xCB', 'e', 'x','\xBD', '\x5E', '\xAA', 'U', 'T', '\x23', 'L', '\xC0','\xE0', '\xE2', '\xC1', '\x8F', '\x0', '\x9E', '\xBC', '\x9','A', '\x7C', '\x3E', '\x1F', '\x83', 'D', '\x22', '\x11','\xD5', 'T', '\x40', '\x3F', '8', '\x80', 'w', '\xE5','3', '\x7', '\xB8', '\x5C', '\x2E', 'H', '\x92', '\x4','\x87', '\xC3', '\x81', '\x40', '\x20', '\x40', 'g', '\x98','\xE9', '6', '\x1A', '\xA6', 'g', '\x15', '\x4', '\xE3','\xD7', '\xC8', '\xBD', '\x15', '\xE1', 'i', '\xB7', 'C','\xAB', '\xEA', 'x', '\x2F', 'j', 'X', '\x92', '\xBB','\x18', '\x20', '\x9F', '\xCF', '3', '\xC3', '\xB8', '\xE9','N', '\xA7', '\xD3', 'l', 'J', '\x0', 'i', '6','\x7C', '\x8E', '\xE1', '\xFE', 'V', '\x84', '\xE7', '\x3C','\x9F', 'r', '\x2B', '\x3A', 'B', '\x7B', '7', 'f','w', '\xAE', '\x8E', '\xE', '\xF3', '\xBD', 'R', '\xA9','d', '\x2', 'B', '\xAF', '\x85', '2', 'f', 'F','\xBA', '\xC', '\xD9', '\x9F', '\x1D', '\x9A', 'l', '\x22','\xE6', '\xC7', '\x3A', '\x2C', '\x80', '\xEF', '\xC1', '\x15','\x90', '\x7', '\x93', '\xA2', '\x28', '\xA0', 'S', 'j','\xB1', '\xB8', '\xDF', '\x29', '5', 'C', '\xE', '\x3F','X', '\xFC', '\x98', '\xDA', 'y', 'j', 'P', '\x40','\x0', '\x87', '\xAE', '\x1B', '\x17', 'B', '\xB4', '\x3A','\x3F', '\xBE', 'y', '\xC7', '\xA', '\x26', '\xB6', '\xEE','\xD9', '\x9A', '\x60', '\x14', '\x93', '\xDB', '\x8F', '\xD','\xA', '\x2E', '\xE9', '\x23', '\x95', '\x29', 'X', '\x0','\x27', '\xEB', 'n', 'V', 'p', '\xBC', '\xD6', '\xCB','\xD6', 'G', '\xAB', '\x3D', 'l', '\x7D', '\xB8', '\xD2','\xDD', '\xA0', '\x60', '\x83', '\xBA', '\xEF', '\x5F', '\xA4','\xEA', '\xCC', '\x2', 'N', '\xAE', '\x5E', 'p', '\x1A','\xEC', '\xB3', '\x40', '9', '\xAC', '\xFE', '\xF2', '\x91','\x89', 'g', '\x91', '\x85', '\x21', '\xA8', '\x87', '\xB7','X', '\x7E', '\x7E', '\x85', '\xBB', '\xCD', 'N', 'N','b', 't', '\x40', '\xFA', '\x93', '\x89', '\xEC', '\x1E','\xEC', '\x86', '\x2', 'H', '\x26', '\x93', '\xD0', 'u','\x1D', '\x7F', '\x9', '2', '\x95', '\xBF', '\x1F', '\xDB','\xD7', 'c', '\x8A', '\x1A', '\xF7', '\x5C', '\xC1', '\xFF','\x22', 'J', '\xC3', '\x87', '\x0', '\x3', '\x0', 'K','\xBB', '\xF8', '\xD6', '\x2A', 'v', '\x98', 'I', '\x0','\x0', '\x0', '\x0', 'I', 'E', 'N', 'D', '\xAE','B', '\x60', '\x82',
};
Qt的工程文件配置

编译运行:

打开浏览器,输入 http://192.168.31.61:8000/

输入 http://192.168.31.61:8000/1 跳转到以下网页

输入 http://192.168.31.61:8000/2 跳转到以下网页

-------分隔线--------------------------------------end-----
7.15 与其他库集成
前面介绍的网络应用例子都是直接用muduo库收发网络消息,也就是主要介绍TcpConnection、TcpServer、TcpClient、Buffer等class的使用。本节将稍微深入其内部,介绍Channel class的用法,通过它可以把其他一些现成的网络库融入muduo的event loop中。
Channel class是IO事件回调的分发器(dispatcher),它在handleEvent()中根据事件的具体类型分别回调ReadCallback、WriteCallback等,代码见§ 8.1.1。每个Channel对象服务于一个文件描述符,但并不拥有fd,在析构函数中也不会close(fd)。Channel也使用muduo一贯的boost::function来表示函数回调,它不是基类[注43:相关讨论见http://www.cppblog.com/Solstice/archive/2012/07/01/181058.aspx后面的评论。]。这样用户代码不必继承Channel,也无须override虚函数。
class Channel : noncopyable
{public:typedef std::function<void()> EventCallback;typedef std::function<void(Timestamp)> ReadEventCallback;Channel(EventLoop* loop, int fd);~Channel();void setReadCallback(ReadEventCallback cb);void setWriteCallback(EventCallback cb);void setCloseCallback(EventCallback cb);void setErrorCallback(EventCallback cb);void enableReading();//void disableReading(); //暂时没有用到void enableWriting();void disableWriting();void disableAll();void handleEvent(Timestamp receiveTime); //由EventLoop::loop()调用 /// Tie this channel to the owner object managed by shared_ptr,/// prevent the owner object being destroyed in handleEvent.void tie(const std::shared_ptr<void>&); //tie()的例子见7.15.3节int fd() const; // obviousvoid remove(); //loop_->removeChannel(this);// ......
}
Channel与EventLoop的内部交互有两个函数EventLoop::updateChannel(Channel*)和EventLoop::removeChannel(Channel*)。客户需要在Channel析构前自己调用Chan-nel::remove()。
后面我们将通过一些实例来介绍Channel class的使用。
7.15.1 UDNS
UDNS [注44:http://www.corpit.ru/mjt/udns.html]是一个stub [注45:stub的意思是只会查询一个DNS服务器,而不会递归地(recursive)查询多个DNS服务器,因此适合在公司内网使用(http://en.wikipedia.org/wiki/Domain_Name_System#DNS_resolvers ).]DNS解析器,它能够异步地发起DNS查询,再通过回调函数通知结果。UDNS在设计的时候就考虑到了配合(融入)主程序现有的基于select/poll/epoll的event loop模型,因此它与muduo的配接相对较为容易。由于License限制,本节的代码位于单独的项目中:https://github.com/chenshuo/muduo-udns。
muduo-udns由三部分组成,一是udns-0.2源码[注46:Ubuntu和Debian都不包含UDNS 0.2软件包,因此必须连同上游源码一起发布。];二是UDNS与muduo的配接器(adapter),即Resolver class,位于Resolver.{h,cc};三是简单的测试dns.cc,展示Resolver的使用。前两部分构成了muduo-udns程序库。
先看Resolver class的接口(Resolver.h):
class Resolver : muduo::noncopyable
{public:typedef std::function<void(const InetAddress&)> Callback;Resolver(EventLoop* loop);Resolver(EventLoop* loop, const InetAddress& nameServer);~Resolver();void start();bool resolve(const StringPiece& hostname, const Callback& cb);// ...
};
其中第一个构造函数会使用系统默认的DNS服务器地址,第二个构造函数由用户指明DNS服务器的IP地址(见后面的练习1)。用户最关心的是resolve()函数,它会回调用户的Callback。
在介绍Resolver的实现之前,先来看它的用法(dns.cc),下面这段代码同时解析三个域名,并在stdout输出结果。注意回调函数只提供解析后的地址,因此resolveCallback需要自己设法记住域名,这里我用的是boost::bind。
void resolveCallback(const string& host, const InetAddress& addr)
{LOG_INFO << "resolved " << host << " -> " << addr.toIp();
}
由于是异步解析,因此输出结果的顺序和提交请求的顺序不一定一致,例如:

UDNS与muduo Resolver的交互过程如下:
1.初始化dns_ctx*之后,Resolver::start()调用dns_open()获得UDNS使用的文件描述符,并通过muduo Channel观察其可读事件。由于UDNS始终只用一个socket fd,只观察一个事件,因此特别容易和现有的event loop集成。
2.在解析域名时(Resolver::resolve()),调用dns_submit_a4()发起解析,并通过dns_timeouts()获得超时的秒数,使用EventLoop::runAfter()注册单次定时器回调。
3.在fd可读时(Resolver::onRead()),调用dns_ioevent()。如果DNS解析成功,会回调Resolver::dns_query_a4()通知解析的结果,继而调用Resolver::onQueryResult(),后者会回调用户Callback。
4.在超时后(Resolver::onTimer()),调用dns_timeouts(),必要时继续注册下一次定时器回调。
可见UDNS是一个设计良好的库,可与现有的event loop很好地结合。UDNS使用定时器的原因是UDP可能丢包,因此程序必须自己处理超时重传。
Resolve class不是线程安全的,客户代码只能在EventLoop所属的线程调用它的Resolver::resolve()成员函数,解析结果也是由这个线程回调客户代码。这个函数通过loop_->assertInLoopThread();来确保不被误用。
C++程序与C语言函数库交互的一个难点在于资源管理,muduo-udns不得已使用了手工new/delete的做法,每次解析会在堆上创建QueryData对象,这样在UDNS回调Resolver::dns_query_a4()时才知道该回调哪个用户Callback。
练习1:补充构造函数Resolver(EventLoop*loop,const InetAddress&nameServer)的实现。可利用文档[注47:http://www.corpit.ru/mjt/udns/udns.3.html]介绍的dns_add_serv_s()函数。
udns - 用于stub DNS解析器的库
练习2:用muduo-udns改进§ 7.13的socks4a服务器,替换其中阻塞的get-hostbyname()函数调用,实现完全的无阻塞服务。
7.15.2 c-ares DNS
c-ares DNS [注48:http://c-ares.haxx.se/]是一款常用的异步DNS解析库,§ 6.2介绍了它的安装方法,本节将简要介绍其与muduo的集成。示例代码位于examples/cdns,代码结构与§ 7.15.1的UDNS非常相似。Resolver.{h,cc}是c-ares DNS与muduo的配接器(adapter),即udns::Resolver class;dns.cc是简单的测试,展示Resolver的使用。c-ares DNS的选项非常多[注49:功能也比UDNS强大,例如可以读取/etc/hosts。udns::Resolver的构造函数有选项可禁用此功能。],本节只是展示其与muduo EventLoop集成的基本做法,cdns::Resolver并没有暴露其全部功能。
cdns::Resolver的接口和用法与前面UDNS Resolver相同,只是少了start()函数,此处不再重复举例。cdns::Resolver的实现与前面UDNS Resolver很相似:
1.Resolver::resolve()调用ares_gethostbyname()发起解析,并通过ares_timeout()获得超时的秒数,注册定时器。
2.在fd可读时(Resolver::onRead()),调用ares_process_fd()。如果DNS解析成功,会回调Resolver::ares_host_callback()[注50:ares_host_callback()相当于前面UDNS的dns_query_ a40)回调。]通知解析的结果,继而调用Resolver::onQueryResult(),后者会回调用户Callback。
3.在超时后(Resolver::onTimer()),调用ares_process_fd()处理这一事件,并再次调用dns_timeouts()获得下一次超时的间隔,必要时继续注册下一次定时器回调。
cdns::Resolver的线程安全性与UDNS Resolver相同。
与UDNS不同,c-ares DNS会用到不止一个socket文件描述符[注51:因为DNS解析时,如果UDP响应发生消息截断,会改用TCP重发请求。],而且既会用到fd可读事件,又会用到fd可写事件,因此cdns::Resolver的代码比UDNS要复杂一些。Resolver::ares_sock_create_callback()是新建socket fd的回调函数,其中会调用Resolver::onSockCreate()来创建Channel对象,这正是Resolver没有start()成员函数的原因。Resolver::ares_sock_state_callback()是变更socket fd状态的回调函数,会通知该观察哪些IO事件(可读and/or可写)。
练习3:阅读源码并测试c-ares DNS什么时候需要观察“fd可写”事件,然后补充完整Resolver::onSockStateChange()。
练习4:修改Callback的原型,让Resolver能返回地址列表(std::vector<Inet-Address>),这个练习同样适用于§ 7.15.1的UDNS。
练习5:为libunbound [注52: http://www.unbound.net/documentation/libunbound.html 编写类似的muduo adapter。注意它似乎没有使用timeout,很奇怪。
7.15.3 curl
libcurl是一个常用的HTTP客户端库[注53:也可以访问FTP服务器。],可以方便地下载HTTP和HTTPS数据。libcurl有两套接口,easy和multi,本节介绍的是使用其multi接口[注54:http://curl.haxx.se/libcurl/c/libcurl-multi.html]以达到单线程并发访问多个URL的效果。muduo与libcurl搭配的例子见examples/curl,其中包含单线程多连接并发下载同一文件的示例,即单线程实现的“多线程下载器”。
libcurl融入muduo EventLoop的复杂度比前面两个DNS库都更高,一方面因为它本身的功能丰富,另一方面也因为它的接口设计更偏重传统阻塞IO(它原本是从curl(1)这个命令行工具剥离出来的),在事件驱动方面的调用、回调、传参都比较烦琐。这里不去详细解释每一个函数的作用,想必读者在读过前两节之后已经对Channel的用法有了基本的了解,对照libcurl文档和muduo代码就能搞明白。
练习6:修改curl::Request::DataCallback的原型,改为以muduo::net::Buffer*为参数,方便用户使用。这需要在curl::Request中增加Buffer成员。
第1章我们探讨了多线程程序中的对象生命期管理技术。在单线程事件驱动的程序中,对象的生命期管理有时也不简单。比方说图7-55展示的例子,对方断开TCP连接,这个IO事件会触发Channel::handleEvent()调用,后者会回调用户提供的CloseCallback,而用户代码在onClose()中有可能析构Channel对象,这就造成了灾难。等于说Channel::handleEvent()执行到一半的时候,其所属的Channel对象本身被销毁了。这时程序立刻core dump就是最好的结果了。

图7-55
muduo的解决办法是提供Channel::tie(const boost::shared_ptr<void>&)这个函数,用于延长某些对象[注55:可以是Channel对象,也可以是其owner对象。]的生命期,使之长过Channel::handleEvent()函数。这也是muduo TcpConnection采用shared_ptr管理对象生命期的原因之一;否则的话,Channel::handleEvent()有可能引发TcpConnection析构,继而把当前Channel对象也析构了,引起程序崩溃。
第三方库与muduo集成的另外一个问题是对IO事件变化的理解可能不一致。拿libcurl来说,它会在某个文件描述符需要关注的IO事件发生变化的时候通知外围的event loop库,比方说原来关注POLLIN,现在关注(POLLIN|POLLOUT),muduo在Curl::socketCallback回调函数中会相应地调用Channel::enableWriting(),能正确处理这种变化。
不幸的是,libcurl在与c-ares DNS配合[注56:这两个库是同一个作者,Iibcur默认会用gethostbyname()执行同步DNS解析,在有c-ares DNS的时候用它执行异步DNS解析。]的时候会出现与muduo不兼容的现象。libcurl在访问URL的时候先要解析其中的域名,然后再对那个Web服务器发起TCP连接。在与c-ares DNS搭配时会出现一种情况:c-ares DNS解析域名用到的与DNS服务器通信的socket fd1和libcurl对Web服务器发起TCP连接的fd2恰好相等,即fd1==fd2。原因是POSIX操作系统总是选用当前最小可用的文件描述符,当DNS解析完成后,libcurl内部使用的c-ares DNS会关闭fd1,libcurl随后再立刻新建一个TCP socket fd2,它有可能恰好复用了fd1的值。
但这时libcurl不会认为文件描述符或其关注的IO事件发生了变化,也就不会通知muduo去销毁并新建Channel对象。这种做法与传统的基于select(2)和poll(2)的event loop配合不会有问题,因为select(2)和poll(2)是上下文无关的,每次都从输入重建要关注的文件描述符列表。但是在与epoll(4)配合的时候就有问题了,关闭fd1会使得epoll从关注列表(watch list)中移除fd1的条目,新建的同名fd2却没有机会加入IO事件watch list,也就不会收到任何IO事件通知。这个问题无法在muduo内部修复,只能修改上游的程序库。
另外一个问题是libcurl在通知muduo取消关注某个fd的时候已经事先关闭了它,这将造成muduo调用::epoll_ctl(epollfd_,EPOLL_CTL_DEL,fd,NULL)时会返回错误,因为关闭文件描述符已经就把它从epoll watch list中除掉了。为了应对这种情况,我不得已更改了EPollPoller::update()的错误处理,放宽检查。
7.15.4 更多
除了前面举的几个例子,muduo当然还可以将其他涉及网络IO的库融入其EventLoop/Channel框架,我能想到的有:
● libmicrohttpd——可嵌入的HTTP服务器。
● libpg——PostgreSQL的官方客户端库。
● libdrizzle——MySQL的非官方客户端库。
● QuickFIX——常用的FIX消息库。
在有具体应用场景的时候,我多半会为之提供muduo adapter,也欢迎用户贡献有关补丁。
另外一个扩展思路是,对每个TCP连接创建一个lua state,用muduo为lua提供通信机制。然后用lua来编写业务逻辑,这也可以做到在线更改逻辑而不重启进程。就像OpenResty [注57:http://openresty.org/]和云风的skynet [注58:https://github.com/cloudwu/skynet]那样。这种做法还可以利用coroutine来简化业务逻辑的实现。
