C++ - 仿 RabbitMQ 实现消息队列--muduo快速上手
目录
Muduo 库是什么
Muduo 库常见接口介绍
muduo::net::TcpServer 类基础介绍
muduo::net::EventLoop 类基础介绍
muduo::net::TcpConnection 类基础介绍
muduo::net::TcpClient 类基础介绍
muduo::net::Buffer 类基础介绍
Muduo 库快速上手
echo 服务器
echo客户端
编译命令
基于 muduo 库函数实现 protobuf 协议的通信
大佬代码
服务器
客户端
模仿实现服务器客户端
proto结构的定义
服务器
客户端
编译命令
Muduo 库是什么
Muduo 由陈硕大佬开发,是一个基于非阻塞 IO 和事件驱动的 C++高并发 TCP 网络编程库。 它是一款基于主从 Reactor 模型的网络库,其使用的线程模型是 one loop per thread, 所谓 one loop per thread 指的是:
- 一个线程只能有一个事件循环(EventLoop), 用于响应计时器和 IO 事件。
- 一个文件描述符只能由一个线程进行读写,换句话说就是一个 TCP 连接必须归属于某个 EventLoop 管理。
Muduo 库常见接口介绍
muduo::net::TcpServer 类基础介绍
typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;
typedef std::function<void (const TcpConnectionPtr&)> ConnectionCallback;
typedef std::function<void (const TcpConnectionPtr&, Buffer*, Timestamp)> MessageCallback;
/// Excerpt of the address type handed to TcpServer / TcpClient.
class InetAddress : public muduo::copyable
{
 public:
  /// Builds an address from an IP string and a port; @c ipv6 defaults to false.
  InetAddress(StringArg ip, uint16_t port, bool ipv6 = false);
};
class TcpServer : noncopyable
{
public:enum Option{ kNoReusePort,kReusePort,};TcpServer(EventLoop* loop, const InetAddress& listenAddr, const string& nameArg, Option option = kNoReusePort); void setThreadNum(int numThreads); void start();/// 当一个新连接建立成功的时候被调用void setConnectionCallback(const ConnectionCallback& cb){ connectionCallback_ = cb; }/// 消息的业务处理回调函数---这是收到新连接消息的时候被调用的函数 void setMessageCallback(const MessageCallback& cb) { messageCallback_ = cb; }
};
说明:
- 构造函数:
- loop:进行事件监控的对象。
- listenAddr:绑定的ip,port。
- nameArg:服务器的名称。
- option:是否启用端口复用,默认不启用。
- setThreadNum:设置线程数目。
- start:启动服务器。
- setConnectionCallback:设置收到新链接的时候要调用的回调函数。
- setMessageCallback:设置收到客户端发来的信息时要调用的回调函数,也就是处理数据的函数。
muduo::net::EventLoop 类基础介绍
class EventLoop : noncopyable
{
public:/// Loops forever./// Must be called in the same thread as creation of the object. void loop();/// Quits loop./// This is not 100% thread safe, if you call through a raw pointer, /// better to call through shared_ptr<EventLoop> for 100% safety. void quit();TimerId runAt(Timestamp time, TimerCallback cb);/// Runs callback after @c delay seconds./// Safe to call from other threads.TimerId runAfter(double delay, TimerCallback cb);/// Runs callback every @c interval seconds./// Safe to call from other threads.TimerId runEvery(double interval, TimerCallback cb);/// Cancels the timer./// Safe to call from other threads. void cancel(TimerId timerId);private: std::atomic<bool> quit_; std::unique_ptr<Poller> poller_; mutable MutexLock mutex_; std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);
};
说明:在此项目中我们只需要知道 loop() 用于启动事件监控即可,它和服务器一起启动。
muduo::net::TcpConnection 类基础介绍
class TcpConnection : noncopyable, public std::enable_shared_from_this<TcpConnection>
{
public:/// Constructs a TcpConnection with a connected sockfd////// User should not create this object. TcpConnection(EventLoop* loop, const string& name, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr); bool connected() const { return state_ == kConnected; } bool disconnected() const { return state_ == kDisconnected; }void send(string&& message); // C++11 void send(const void* message, int len); void send(const StringPiece& message); // void send(Buffer&& message); // C++11 void send(Buffer* message); // this one will swap data void shutdown(); // NOT thread safe, no simultaneous callingvoid setContext(const boost::any& context){ context_ = context; }const boost::any& getContext() const{ return context_; }boost::any* getMutableContext(){ return &context_; }void setConnectionCallback(const ConnectionCallback& cb){ connectionCallback_ = cb; }void setMessageCallback(const MessageCallback& cb){ messageCallback_ = cb; }
private:enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };EventLoop* loop_;ConnectionCallback connectionCallback_;MessageCallback messageCallback_;WriteCompleteCallback writeCompleteCallback_;boost::any context_;
};
在此项目中,我们只需要了解以下接口:
- connected()/disconnected():判断链接的状态,即是否是链接状态。
- void send(string&& message):向对端发送数据。
muduo::net::TcpClient 类基础介绍
class TcpClient : noncopyable
{
public:// TcpClient(EventLoop* loop);// TcpClient(EventLoop* loop, const string& host, uint16_t port);TcpClient(EventLoop* loop, const InetAddress& serverAddr, const string& nameArg);~TcpClient(); // force out-line dtor, for std::unique_ptr members.void connect();//连接服务器void disconnect();//关闭连接 void stop();//获取客户端对应的通信连接 Connection 对象的接口,发起 connect 后,有
可能还没有连接建立成功TcpConnectionPtr connection() const{ MutexLockGuard lock(mutex_); return connection_;} /// 连接服务器成功时的回调函数void setConnectionCallback(ConnectionCallback cb){ connectionCallback_ = std::move(cb); }/// 收到服务器发送的消息时的回调函数void setMessageCallback(MessageCallback cb){ messageCallback_ = std::move(cb); }
private:EventLoop* loop_;ConnectionCallback connectionCallback_; MessageCallback messageCallback_;WriteCompleteCallback writeCompleteCallback_; TcpConnectionPtr connection_ GUARDED_BY(mutex_);
};
/*
需要注意的是,因为 muduo 库不管是服务端还是客户端都是异步操作,
对于客户端来说如果我们在连接还没有完全建立成功的时候发送数据,这是不被允许的。
因此我们可以使用内置的 CountDownLatch 类进行同步控制*/
/// Counter-based latch: wait() blocks while the count is positive and
/// countDown() wakes every waiter once the count reaches zero.
class CountDownLatch : noncopyable
{
 public:
  explicit CountDownLatch(int count);

  /// Blocks the caller until the count is no longer positive.
  void wait()
  {
    MutexLockGuard guard(mutex_);
    while (count_ > 0)
    {
      condition_.wait();
    }
  }

  /// Decrements the count and notifies all waiters when it hits zero.
  void countDown()
  {
    MutexLockGuard guard(mutex_);
    if (--count_ == 0)
    {
      condition_.notifyAll();
    }
  }

  int getCount() const;

 private:
  mutable MutexLock mutex_;
  Condition condition_ GUARDED_BY(mutex_);
  int count_ GUARDED_BY(mutex_);
};
说明:
- 构造函数:
- loop:事件监控对象。
- serverAddr:服务器地址。
- nameArg:客户端名称。
- connect/disconnect:连接服务器/关闭连接。
- 两种回调函数与服务器一致。
- 注释中也给出,我们需要通过CountDownLatch类控制正确获取到链接,后续demo中会演示,用法和条件变量类似。
muduo::net::Buffer 类基础介绍
class Buffer : public muduo::copyable
{
public:static const size_t kCheapPrepend = 8;static const size_t kInitialSize = 1024;explicit Buffer(size_t initialSize = kInitialSize) : buffer_(kCheapPrepend + initialSize), readerIndex_(kCheapPrepend), writerIndex_(kCheapPrepend);void swap(Buffer& rhs)size_t readableBytes() constsize_t writableBytes() constconst char* peek() constconst char* findEOL() constconst char* findEOL(const char* start) constvoid retrieve(size_t len)void retrieveInt64()void retrieveInt32()void retrieveInt16()void retrieveInt8()string retrieveAllAsString()string retrieveAsString(size_t len)void append(const StringPiece& str)void append(const char* /*restrict*/ data, size_t len)void append(const void* /*restrict*/ data, size_t len)char* beginWrite()const char* beginWrite() constvoid hasWritten(size_t len)void appendInt64(int64_t x)void appendInt32(int32_t x)void appendInt16(int16_t x)void appendInt8(int8_t x)int64_t readInt64()int32_t readInt32()int16_t readInt16()int8_t readInt8()int64_t peekInt64() constint32_t peekInt32() constint16_t peekInt16() constint8_t peekInt8() constvoid prependInt64(int64_t x)void prependInt32(int32_t x)void prependInt16(int16_t x)void prependInt8(int8_t x)void prepend(const void* /*restrict*/ data, size_t len)
private:std::vector<char> buffer_;size_t readerIndex_;size_t writerIndex_;static const char kCRLF[];
};
主要了解retrieveAllAsString()接口,就是将数据转换为字符串。
Muduo 库快速上手
我们使用 Muduo 网络库来实现一个简单echo服务器和客户端 快速上手 Muduo 库。
echo 服务器
#include "../include/muduo/net/TcpServer.h"
#include "../include/muduo/net/EventLoop.h"
#include "../include/muduo/net/TcpConnection.h"
#include <iostream>
#include <functional>
#include <string>class EchoServer
{
public:EchoServer(int port):_server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port), "EchoServer", muduo::net::TcpServer::kReusePort){_server.setConnectionCallback(std::bind(&EchoServer::onConnection, this, std::placeholders::_1));_server.setMessageCallback(std::bind(&EchoServer::onMessage, this, std::placeholders::_1,std::placeholders::_2, std::placeholders::_3));}void start(){_server.start(); // 开始事件监听_baseloop.loop();// 开始事件监控,这是一个死循环阻塞接口}
private:// 在连接建立成功的时候和关闭连接的时候调用void onConnection(const muduo::net::TcpConnectionPtr &conn){// 新连接建立成功是的回调函数if(conn->connected())std::cout << "新连接建立成功: " << conn->peerAddress().toIpPort() << std::endl;elsestd::cout << "新连接关闭: " << conn->peerAddress().toIpPort() << std::endl;}void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buff, muduo::Timestamp){std::string msg(buff->retrieveAllAsString());conn->send(msg);}
private:// 用来进行事件监控的对象muduo::net::EventLoop _baseloop;// 用来设置回调函数的对象muduo::net::TcpServer _server;
};int main()
{EchoServer server(8080);server.start();return 0;
}
说明:
- 服务器只需要包含两个成员属性,注意:_baseloop必须在_server之前构造,因为_server的构造需要用到_baseloop。
- start():启动服务器和事件监控。
- onConnection/onMessage:收到新链接或者客户端数据时的回调函数,要在构造函数中设置到_server中。
echo客户端
#include "muduo/net/TcpClient.h"
#include "muduo/net/EventLoopThread.h"
#include "muduo/net/TcpConnection.h"
#include "muduo/base/CountDownLatch.h"
#include <iostream>
#include <functional>
#include <string>class EchoClient
{
public:EchoClient(const std::string &ip, int port):_latch(1), _client(_loopthread.startLoop(), muduo::net::InetAddress(ip, port), "EchoClient"){_client.setConnectionCallback(std::bind(&EchoClient::onConnection, this, std::placeholders::_1));_client.setMessageCallback(std::bind(&EchoClient::onMessage, this, std::placeholders::_1,std::placeholders::_2, std::placeholders::_3));}void connect(){_client.connect();_latch.wait(); // 阻塞等待}bool send(const std::string &msg){if(_conn && _conn->connected()){_conn->send(msg);return true;}else{std::cout << "未建立连接" << std::endl;return false;}}
private:void onConnection(const muduo::net::TcpConnectionPtr &conn){if(conn->connected()){_conn = conn;_latch.countDown(); //唤醒主线程中的阻塞std::cout << "建立连接" << std::endl;} else{_conn.reset();std::cout << "关闭连接" << std::endl;}}void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buff, muduo::Timestamp time){std::cout << "server echo: " << buff->retrieveAllAsString() << std::endl;}private:// 这里_loopthread必须在_client之前构造, 要不然无法获得相应的Loopmuduo::CountDownLatch _latch;muduo::net::EventLoopThread _loopthread;muduo::net::TcpClient _client;muduo::net::TcpConnectionPtr _conn;
};int main()
{EchoClient client("127.0.0.1", 8080);client.connect();while(1){std::string buff;std::cout << "请输入: ";std::cin >> buff;client.send(buff);if(!client.send(buff)){std::cout << "连接已断开,无法发送消息" << std::endl;break;}}return 0;
}
说明:
- 成员属性:
- _latch:封装了条件变量的对象,用来确保正确获得连接,必须初始化为1。
- _loopthread:客户端不能像服务器一样一直进行事件监控的循环,EventLoopThread对象就可以重新创建一个线程来进行事件监控,必须在_client之前初始化。
- _client:客户端对象。
- _conn:建立好的连接,用来发送数据。
- connect():关键要等待连接真正建立好,将_conn初始化。
- send():向服务器发送数据。
- onConnection():关键在于建立连接后初始化_conn,并唤醒客户端。
- onMessage():收到服务器应答后调用的回调函数。
编译命令
# Builds the echo server and the echo client.
.PHONY: all
all:server client
server:server.cc
	g++ -o $@ $^ -I../include -L../lib -lmuduo_net -lmuduo_base -pthread
client:client.cc
	g++ -o $@ $^ -I../include -L../lib -lmuduo_net -lmuduo_base -pthread
这里要链接的库的路径根据自己设置的路径来调整。
基于 muduo 库函数实现 protobuf 协议的通信
上面的 echo 服务器和客户端无法解决数据粘包的问题,并且当需要处理多种业务时也无能为力。所以我们模仿陈硕大佬的示例代码,完成一个基于 protobuf 协议的服务器和客户端,实现翻译和加法计算两种业务(简单实现)。
大佬代码
服务器
#include "examples/protobuf/codec/codec.h"
#include "examples/protobuf/codec/dispatcher.h"
#include "examples/protobuf/codec/query.pb.h"

#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"

#include <stdio.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

typedef std::shared_ptr<muduo::Query> QueryPtr;
typedef std::shared_ptr<muduo::Answer> AnswerPtr;

/// Protobuf query server: raw bytes go to codec_, decoded messages go to
/// dispatcher_, which calls the handler registered for the message type.
class QueryServer : noncopyable
{
 public:
  QueryServer(EventLoop* loop,
              const InetAddress& listenAddr)
  : server_(loop, listenAddr, "QueryServer"),
    dispatcher_(std::bind(&QueryServer::onUnknownMessage, this, _1, _2, _3)),
    codec_(std::bind(&ProtobufDispatcher::onProtobufMessage, &dispatcher_, _1, _2, _3))
  {
    // Per-type handlers; anything else falls back to onUnknownMessage.
    dispatcher_.registerMessageCallback<muduo::Query>(
        std::bind(&QueryServer::onQuery, this, _1, _2, _3));
    dispatcher_.registerMessageCallback<muduo::Answer>(
        std::bind(&QueryServer::onAnswer, this, _1, _2, _3));
    server_.setConnectionCallback(
        std::bind(&QueryServer::onConnection, this, _1));
    // The codec, not this class, receives the server's raw message callback.
    server_.setMessageCallback(
        std::bind(&ProtobufCodec::onMessage, &codec_, _1, _2, _3));
  }

  void start()
  {
    server_.start();
  }

 private:
  /// Logs both endpoints and whether the connection went UP or DOWN.
  void onConnection(const TcpConnectionPtr& conn)
  {
    LOG_INFO << conn->peerAddress().toIpPort() << " -> "
             << conn->localAddress().toIpPort() << " is "
             << (conn->connected() ? "UP" : "DOWN");
  }

  /// Fallback handler: logs the type name and shuts the connection down.
  void onUnknownMessage(const TcpConnectionPtr& conn,
                        const MessagePtr& message,
                        Timestamp)
  {
    LOG_INFO << "onUnknownMessage: " << message->GetTypeName();
    conn->shutdown();
  }

  /// Replies to a Query with a fixed Answer, then shuts the connection down.
  void onQuery(const muduo::net::TcpConnectionPtr& conn,
               const QueryPtr& message,
               muduo::Timestamp)
  {
    LOG_INFO << "onQuery:\n" << message->GetTypeName() << message->DebugString();
    Answer answer;
    answer.set_id(1);
    answer.set_questioner("Chen Shuo");
    answer.set_answerer("blog.csdn.net/Solstice");
    answer.add_solution("Jump!");
    answer.add_solution("Win!");
    codec_.send(conn, answer);

    conn->shutdown();
  }

  /// Logs the received Answer's type name and shuts the connection down.
  void onAnswer(const muduo::net::TcpConnectionPtr& conn,
                const AnswerPtr& message,
                muduo::Timestamp)
  {
    LOG_INFO << "onAnswer: " << message->GetTypeName();
    conn->shutdown();
  }

  TcpServer server_;
  ProtobufDispatcher dispatcher_;
  ProtobufCodec codec_;
};

int main(int argc, char* argv[])
{
  LOG_INFO << "pid = " << getpid();
  if (argc > 1)
  {
    EventLoop loop;
    uint16_t port = static_cast<uint16_t>(atoi(argv[1]));
    InetAddress serverAddr(port);
    QueryServer server(&loop, serverAddr);
    server.start();
    loop.loop();
  }
  else
  {
    printf("Usage: %s port\n", argv[0]);
  }
}
关键点说明:
- 先看成员属性:
- dispatcher_:任务的分发器,也就是基于不同的任务进行不同的处理。
- codec_:解析protobuf数据,将数据交给dispatcher_处理。还可以根据连接向对端发送protobuf结构。
- 设计的思路:先向dispatcher_中注册针对不同任务的处理函数,再将server_对数据的处理绑定到codec_对数据处理的函数上,即让codec_代替server_处理数据,codec_解析完数据后交给dispatcher_进行真正的任务处理。
- 关于代码中的QueryPtr/AnswerPtr:它们是指向 proto 消息对象(Query/Answer)的智能指针,对应的 proto 结构体需要自己定义。
客户端
#include "examples/protobuf/codec/dispatcher.h"
#include "examples/protobuf/codec/codec.h"
#include "examples/protobuf/codec/query.pb.h"

#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpClient.h"

#include <stdio.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

typedef std::shared_ptr<muduo::Empty> EmptyPtr;
typedef std::shared_ptr<muduo::Answer> AnswerPtr;

// Message sent as soon as the connection is up; set by main().
google::protobuf::Message* messageToSend;

/// Protobuf query client: sends one message once connected and logs the
/// reply that the dispatcher routes to the matching handler.
class QueryClient : noncopyable
{
 public:
  QueryClient(EventLoop* loop,
              const InetAddress& serverAddr)
  : loop_(loop),
    client_(loop, serverAddr, "QueryClient"),
    dispatcher_(std::bind(&QueryClient::onUnknownMessage, this, _1, _2, _3)),
    codec_(std::bind(&ProtobufDispatcher::onProtobufMessage, &dispatcher_, _1, _2, _3))
  {
    dispatcher_.registerMessageCallback<muduo::Answer>(
        std::bind(&QueryClient::onAnswer, this, _1, _2, _3));
    dispatcher_.registerMessageCallback<muduo::Empty>(
        std::bind(&QueryClient::onEmpty, this, _1, _2, _3));
    client_.setConnectionCallback(
        std::bind(&QueryClient::onConnection, this, _1));
    // The codec, not this class, receives the client's raw message callback.
    client_.setMessageCallback(
        std::bind(&ProtobufCodec::onMessage, &codec_, _1, _2, _3));
  }

  void connect()
  {
    client_.connect();
  }

 private:
  /// Sends *messageToSend once connected; quits the loop on disconnect.
  void onConnection(const TcpConnectionPtr& conn)
  {
    LOG_INFO << conn->localAddress().toIpPort() << " -> "
             << conn->peerAddress().toIpPort() << " is "
             << (conn->connected() ? "UP" : "DOWN");

    if (conn->connected())
    {
      codec_.send(conn, *messageToSend);
    }
    else
    {
      loop_->quit();
    }
  }

  /// Fallback handler for message types without a registered callback.
  void onUnknownMessage(const TcpConnectionPtr&,
                        const MessagePtr& message,
                        Timestamp)
  {
    LOG_INFO << "onUnknownMessage: " << message->GetTypeName();
  }

  void onAnswer(const muduo::net::TcpConnectionPtr&,
                const AnswerPtr& message,
                muduo::Timestamp)
  {
    LOG_INFO << "onAnswer:\n" << message->GetTypeName() << message->DebugString();
  }

  void onEmpty(const muduo::net::TcpConnectionPtr&,
               const EmptyPtr& message,
               muduo::Timestamp)
  {
    LOG_INFO << "onEmpty: " << message->GetTypeName();
  }

  EventLoop* loop_;
  TcpClient client_;
  ProtobufDispatcher dispatcher_;
  ProtobufCodec codec_;
};

int main(int argc, char* argv[])
{
  LOG_INFO << "pid = " << getpid();
  if (argc > 2)
  {
    EventLoop loop;
    uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
    InetAddress serverAddr(argv[1], port);

    muduo::Query query;
    query.set_id(1);
    query.set_questioner("Chen Shuo");
    query.add_question("Running?");
    muduo::Empty empty;
    messageToSend = &query;

    // A third argument starting with 'e' selects the Empty message instead.
    if (argc > 3 && argv[3][0] == 'e')
    {
      messageToSend = &empty;
    }

    QueryClient client(&loop, serverAddr);
    client.connect();
    loop.loop();
  }
  else
  {
    printf("Usage: %s host_ip port [q|e]\n", argv[0]);
  }
}
与服务器非常相似。
模仿实现服务器客户端
现在我们模仿一下大佬来实现一下自己的服务器客户端。
proto结构的定义
syntax = "proto3";

package jiuqi;

// Request carrying the text to translate.
message TranslateRequest {
    string msg = 1;
}

// Response carrying the translated text.
message TranslateResponse {
    string msg = 1;
}

// Request carrying the two addends.
message AddRequest {
    int32 num1 = 1;
    int32 num2 = 2;
}

// Response carrying the sum.
message AddResponse {
    int32 res = 1;
}
服务器
#include "muduo/proto/codec.h"
#include "muduo/proto/dispatcher.h"
#include "request.pb.h"

#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"

#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <unistd.h>
#include <unordered_map>

/// Protobuf server offering two services: word translation and addition.
/// Raw bytes go to _codec, decoded messages go to _dispatcher, which calls
/// the handler registered for the message type.
class Server
{
public:
    using MessagePtr = std::shared_ptr<google::protobuf::Message>;
    using TranslateRequestPtr = std::shared_ptr<jiuqi::TranslateRequest>;
    using TranslateResponsePtr = std::shared_ptr<jiuqi::TranslateResponse>;
    using AddRequestPtr = std::shared_ptr<jiuqi::AddRequest>;
    using AddResponsePtr = std::shared_ptr<jiuqi::AddResponse>;

    /// @param port TCP port to listen on; the server binds to 0.0.0.0.
    Server(int port)
        : _server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port),
                  "server", muduo::net::TcpServer::kReusePort),
          _dispatcher(std::bind(&Server::onUnknownMessage, this,
                                std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),
          _codec(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,
                           std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))
    {
        // Register the request handlers.
        _dispatcher.registerMessageCallback<jiuqi::TranslateRequest>(
            std::bind(&Server::onTranslate, this,
                      std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
        _dispatcher.registerMessageCallback<jiuqi::AddRequest>(
            std::bind(&Server::onAdd, this,
                      std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

        // The codec, not this class, receives the server's raw message callback.
        _server.setMessageCallback(
            std::bind(&ProtobufCodec::onMessage, &_codec,
                      std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
        _server.setConnectionCallback(
            std::bind(&Server::onConnection, this, std::placeholders::_1));
    }

    /// Starts the server, then runs the event loop on the calling thread.
    void Start()
    {
        _server.start();
        _baseloop.loop();
    }

private:
    /// Looks @p str up in a small built-in dictionary.
    /// @return the translation, or "没听懂" when the word is not in the dictionary.
    std::string translate(const std::string &str)
    {
        // Built once instead of on every call; it is never modified.
        static const std::unordered_map<std::string, std::string> dict = {
            {"hello", "你好"},
            {"你好", "hello"},
        };
        const auto it = dict.find(str);
        if (it == dict.end())
            return "没听懂";
        return it->second;
    }

    /// Handles a TranslateRequest: translates the text and sends the result back.
    void onTranslate(const muduo::net::TcpConnectionPtr& conn,
                     const TranslateRequestPtr& message,
                     muduo::Timestamp)
    {
        const std::string translated = translate(message->msg());
        jiuqi::TranslateResponse resp;
        resp.set_msg(translated);
        _codec.send(conn, resp);
    }

    /// Handles an AddRequest: adds the two numbers and sends the sum back.
    void onAdd(const muduo::net::TcpConnectionPtr& conn,
               const AddRequestPtr& message,
               muduo::Timestamp)
    {
        int num1 = message->num1();
        int num2 = message->num2();
        int res = num1 + num2;
        jiuqi::AddResponse resp;
        resp.set_res(res);
        _codec.send(conn, resp);
    }

    /// Fallback handler: logs the type name and shuts the connection down.
    void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn,
                          const MessagePtr& message,
                          muduo::Timestamp)
    {
        LOG_INFO << "onUnknownMessage: " << message->GetTypeName();
        conn->shutdown();
    }

    /// Logs whether the connection was established or closed.
    void onConnection(const muduo::net::TcpConnectionPtr& conn)
    {
        if (conn->connected())
        {
            LOG_INFO << "连接建立成功";
        }
        else
        {
            LOG_INFO << "连接关闭";
        }
    }

private:
    muduo::net::EventLoop _baseloop;  // must be declared before _server, which takes its address
    muduo::net::TcpServer _server;    // server object
    ProtobufDispatcher _dispatcher;   // request dispatcher; request handlers are registered on it
    ProtobufCodec _codec;             // protobuf codec; decodes the received request data
};

int main()
{
    Server server(8080);
    server.Start();
    return 0;
}
客户端
#include "muduo/proto/dispatcher.h"
#include "muduo/proto/codec.h"
#include "request.pb.h"

#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/TcpClient.h"
#include "muduo/net/EventLoopThread.h"
#include "muduo/base/CountDownLatch.h"

#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <unistd.h>

/// Protobuf client for the translate / add services. The event loop runs on
/// a separate EventLoopThread so the calling thread is free to read input.
class Client
{
public:
    using MessagePtr = std::shared_ptr<google::protobuf::Message>;
    using TranslateRequestPtr = std::shared_ptr<jiuqi::TranslateRequest>;
    using TranslateResponsePtr = std::shared_ptr<jiuqi::TranslateResponse>;
    using AddRequestPtr = std::shared_ptr<jiuqi::AddRequest>;
    using AddResponsePtr = std::shared_ptr<jiuqi::AddResponse>;

    /// @param ip   server IP address
    /// @param port server TCP port
    Client(const std::string &ip, int port)
        : _latch(1),
          _client(_loopthread.startLoop(), muduo::net::InetAddress(ip, port), "client"),
          _dispatcher(std::bind(&Client::onUnknownMessage, this,
                                std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),
          _codec(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,
                           std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))
    {
        // Register the handlers for the response message types.
        _dispatcher.registerMessageCallback<jiuqi::TranslateResponse>(
            std::bind(&Client::onTranslate, this,
                      std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
        _dispatcher.registerMessageCallback<jiuqi::AddResponse>(
            std::bind(&Client::onAdd, this,
                      std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

        // The codec, not this class, receives the client's raw message callback.
        _client.setMessageCallback(
            std::bind(&ProtobufCodec::onMessage, &_codec,
                      std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
        _client.setConnectionCallback(
            std::bind(&Client::onConnection, this, std::placeholders::_1));
    }

    /// Issues the connect request and blocks until onConnection() reports
    /// an established connection.
    void connect()
    {
        _client.connect();
        _latch.wait(); // block until the latch is counted down
    }

    /// Sends a TranslateRequest carrying @p msg.
    void translate(const std::string &msg)
    {
        jiuqi::TranslateRequest request;
        request.set_msg(msg);
        send(request);
    }

    /// Sends an AddRequest carrying the two addends.
    void add(int num1, int num2)
    {
        jiuqi::AddRequest request;
        request.set_num1(num1);
        request.set_num2(num2);
        send(request);
    }

private:
    /// Sends @p msg through the codec over the established connection.
    /// @return true if the message was handed to the codec, false if there
    ///         is no connected connection.
    bool send(const google::protobuf::Message &msg)
    {
        if (_conn && _conn->connected())
        {
            _codec.send(_conn, msg);
            return true;
        }
        else
        {
            std::cout << "未建立连接" << std::endl;
            return false;
        }
    }

    /// Logs the translation result returned by the server.
    void onTranslate(const muduo::net::TcpConnectionPtr&,
                     const TranslateResponsePtr& message,
                     muduo::Timestamp)
    {
        LOG_INFO << "翻译结果: " << message->msg();
    }

    /// Logs the sum returned by the server.
    void onAdd(const muduo::net::TcpConnectionPtr&,
               const AddResponsePtr& message,
               muduo::Timestamp)
    {
        LOG_INFO << "加法结果: " << message->res();
    }

    /// Fallback handler for message types without a registered callback.
    void onUnknownMessage(const muduo::net::TcpConnectionPtr&,
                          const MessagePtr& message,
                          muduo::Timestamp)
    {
        LOG_INFO << "onUnknownMessage: " << message->GetTypeName();
    }

    /// Stores the connection and releases the waiter in connect() once the
    /// connection is established.
    void onConnection(const muduo::net::TcpConnectionPtr& conn)
    {
        if (conn->connected())
        {
            _conn = conn;
            _latch.countDown();
            std::cout << "连接建立成功" << std::endl;
        }
        else
        {
            std::cout << "连接关闭成功" << std::endl;
        }
    }

private:
    muduo::CountDownLatch _latch;            // used for synchronization in connect()
    muduo::net::EventLoopThread _loopthread; // thread running the event loop; must precede _client
    muduo::net::TcpClient _client;
    ProtobufDispatcher _dispatcher;          // message dispatcher
    ProtobufCodec _codec;                    // protocol codec
    muduo::net::TcpConnectionPtr _conn;      // connection belonging to this client
};

int main()
{
    Client client("127.0.0.1", 8080);
    client.connect();

    int op = 0;
    while (true)
    {
        std::cout << "输入要进行的操作(1.翻译, 2.加法): ";
        if (!(std::cin >> op))
        {
            // stdin closed or non-numeric input: stop instead of spinning
            // forever on a failed stream.
            break;
        }

        if (op == 1)
        {
            std::string str;
            std::cout << "输入要翻译的单词: ";
            if (!(std::cin >> str))
                break;
            client.translate(str);
        }
        else if (op == 2)
        {
            int num1 = 0;
            int num2 = 0;
            std::cout << "输入两个加数: ";
            if (!(std::cin >> num1 >> num2))
                break;
            client.add(num1, num2);
        }
        else
        {
            std::cout << "无效的操作" << std::endl;
            continue;
        }
        sleep(1);
    }
    return 0;
}
编译命令
# Builds the protobuf server and client; both link the generated
# request.pb.cc and the codec implementation.
.PHONY: all
all:server client
server:protobufserver.cc request.pb.cc ../include/muduo/proto/codec.cc
	g++ -o $@ $^ -I../include -I. -L../lib -lmuduo_net -lmuduo_base -lprotobuf -lz -pthread
client:protobufclient.cc request.pb.cc ../include/muduo/proto/codec.cc
	g++ -o $@ $^ -I../include -I. -L../lib -lmuduo_net -lmuduo_base -lprotobuf -lz -pthread