项目---网络通信组件JsonRpc
C++网络编程项目实践 --- JsonRpc
- 一、项目简介
- (一)项目介绍
- (二)项目模块
- (三)项目技术栈
- 二、项目构建基础
- (一)JsonCpp库
- 1、定义
- 2、存储方式
- 3、库的接口
- 4、样例实现
- (二)Muduo库
- 1、介绍
- 2、接口实现
- 3、样例实践
- (三)C++11异步操作
- 1、使⽤ std::async关联异步任务
- 2、使用std::packaged_task
- 3、使用std::promise
- (四)随机数的生成和uuid
- 三、框架设计
- (一)抽象层设计
- 1. BaseMessage
- 2、BaseBuffer
- 3、BaseProtocol
- 4、BaseConnection
- 5、BaseServer/BaseClient
- (二)具象层设计
- 1、消息类
- 2、MuduoBuffer
- 3、LvProtocol
- 4、MuduoConnection
- 5、MuduoServer/MuduoClient
- (三)业务层设计
- 1、Rpc调用端
- 2、RPC 服务端
- 3、服务注册中心
- 4、发布订阅系统
- 5、消息总线与异步调度
- 四、项目中遇到的一些问题
- (一)bind
- (二)muduo库在收发信息有网络序和主机序的转换
- (三)使用可变参数列表来设置工厂
- (四)可变参数编写日志宏
- (五)通过给参数传入对象形成类型
- (六)Json数组的遍历
一、项目简介
(一)项目介绍
本项目是一个基于 muduo 网络库 开发的 高性能 RPC 分布式框架,采用 C++17 编写,支持 服务注册与发现、异步调用、发布订阅、负载均衡 等功能,适用于构建高并发、可扩展的分布式系统。
- RPC 调用:客户端调用远程服务端已经注册的方法。
- 服务注册:服务端启动时向注册中心注册自己提供的方法。
- 服务发现:客户端根据方法名从注册中心获取服务提供者地址。
- 发布订阅:客户端订阅主题,服务端发布消息,客户端实时接收。
(二)项目模块
层级 | 模块名称 | 文件 | 功能简介 |
---|---|---|---|
抽象层 | BaseMessage | abstract.hpp | 定义所有消息的抽象接口 |
抽象层 | BaseConnection | abstract.hpp | 抽象 TCP 连接,屏蔽底层实现 |
抽象层 | BaseProtocol | abstract.hpp | 抽象协议处理,支持序列化与反序列化 |
抽象层 | BaseServer / BaseClient | abstract.hpp | 抽象服务端与客户端接口 |
具象层 | MuduoConnection | net.hpp | 基于 muduo 的 TCP 连接实现 |
具象层 | LvProtocol | net.hpp | 自定义 LV 协议实现 |
具象层 | MuduoServer / MuduoClient | net.hpp | 基于 muduo 的服务端与客户端实现 |
具象层 | JsonMessage | message.hpp | JSON 消息基类,支持序列化/反序列化 |
具象层 | RpcRequest / RpcResponse | message.hpp | RPC 请求与响应消息 |
具象层 | ServiceRequest / ServiceResponse | message.hpp | 服务注册与发现消息 |
具象层 | TopicRequest / TopicResponse | message.hpp | 发布订阅消息 |
业务层 | RpcCaller | rpc_caller.hpp | 客户端调用器,支持同步/异步/回调 |
业务层 | RpcClient | rpc_client.hpp | 客户端封装,支持服务发现与连接池 |
业务层 | RpcServer | rpc_server.hpp | 服务端封装,支持方法注册与监听 |
业务层 | RegisterServer / RegisterClient | rpc_register.hpp | 服务注册中心与客户端 |
业务层 | DiscoverClient | rpc_client.hpp | 服务发现客户端 |
业务层 | TopicServer / TopicClient | rpc_topic.hpp | 发布订阅系统服务端与客户端 |
业务层 | RpcRouter | rpc_router.hpp | 服务端方法路由与参数校验 |
业务层 | Dispatcher | dispatcher.hpp | 消息分发器,按类型分发消息 |
业务层 | Requestor | rpc_requestor.hpp | 请求管理器,管理异步请求生命周期 |
(三)项目技术栈
语言 | C++17 |
网络库 | muduo |
序列化 | jsoncpp |
构建工具 | Makefile |
平台 | Linux |
二、项目构建基础
(一)JsonCpp库
1、定义
JsonCpp
是采用完全独立于编程语言的格式来存储数据。
// C 代码表⽰
char *name = "xx";
int age = 18;
float score[3] = {88.5, 99, 58};//Json 表⽰
{"姓名" : "xx","年龄" : 18,"成绩" : [88.5, 99, 58],"爱好" :{"书籍" : "西游记","运动" : "打篮球"}
}
2、存储方式
Json 的数据类型包括对象,数组,字符串,数字等。
- 对象:使⽤花括号 {} 括起来的表⽰⼀个对象
- 数组:使⽤中括号 [] 括起来的表⽰⼀个数组
- 字符串:使⽤常规双引号 “” 括起来的表⽰⼀个字符串
- 数字:包括整形和浮点型,直接使⽤
3、库的接口
Json数据类型类的接口实现:
class Json::Value{Value &operator=(const Value &other); //Value重载了[]和=,因此所有的赋值和获取
数据都可以通过Value& operator[](const std::string& key);//简单的⽅式完成 val["name"] =
"xx";Value& operator[](const char* key);Value removeMember(const char* key);//移除元素const Value& operator[](ArrayIndex index) const; //val["score"][0]Value& append(const Value& value);//添加数组元素val["score"].append(88); ArrayIndex size() const;//获取数组元素个数 val["score"].size();std::string asString() const;//转string string name =
val["name"].asString();const char* asCString() const;//转char* char *name =
val["name"].asCString();Int asInt() const;//转int int age = val["age"].asInt();float asFloat() const;//转float float weight = val["weight"].asFloat();bool asBool() const;//转 bool bool ok = val["ok"].asBool();
};
Json序列化接口实现:
class JSON_API StreamWriter {virtual int write(Value const& root, std::ostream* sout) = 0;
}
class JSON_API StreamWriterBuilder : public StreamWriter::Factory {virtual StreamWriter* newStreamWriter() const;
}
Json反序列化接口实现:
class JSON_API CharReader {virtual bool parse(char const* beginDoc, char const* endDoc, Value* root, std::string* errs) = 0;
}
class JSON_API CharReaderBuilder : public CharReader::Factory {virtual CharReader* newCharReader() const;}
4、样例实现
#include <iostream>
#include <sstream>
#include <string>
#include <memory>
#include <jsoncpp/json/json.h>bool serialize(Json::Value &root, std::string &jsonStr)
{std::stringstream ss;Json::StreamWriterBuilder builter;std::unique_ptr<Json::StreamWriter> writer(builter.newStreamWriter());int ret = writer->write(root, &ss);if(ret){std::cout << "serialize failed" << std::endl;return false;}jsonStr = ss.str();return true;
}bool deserialize(std::string &jsonStr, Json::Value &root)
{Json::CharReaderBuilder builter;std::unique_ptr<Json::CharReader> reader(builter.newCharReader());std::string errs;int ret = reader->parse(jsonStr.c_str(), jsonStr.c_str() + jsonStr.size(), &root, &errs);if(ret){std::cout << "deserialize failed" << std::endl;return false;}return true;
}int main()
{Json::Value root;root["name"] = "jsoncpp";root["age"] = 18;root["sex"] = "male";std::string jsonStr;serialize(root, jsonStr);std::cout << jsonStr << std::endl;Json::Value root2; deserialize(jsonStr, root2);std::cout << root2["name"] << std::endl;std::cout << root2["age"] << std::endl;std::cout << root2["sex"] << std::endl;for(auto it = root2.begin(); it != root2.end(); ++it){std::cout << it.key() << std::endl;}return 0;
}
(二)Muduo库
1、介绍
Muduo网络库简介
MuduoC++高并发TCP网络编程库,采用非阻塞IO和事件驱动架构,专为高性能网络应用设计。
核心架构
Muduo基于主从Reactor
模型实现,采用"one loop per thread"线程模型。该模型的特点是每个线程只能运行一个事件循环(EventLoop),用于处理计时器和IO事件。
关键特性
每个文件描述符严格绑定到单个线程进行读写操作,确保TCP连接的线程安全性。这意味着每个TCP连接必须由特定的EventLoop
实例管理,这种设计避免了多线程竞争问题。
性能优势
这种架构设计提供了高效的并发处理能力,同时保持了良好的线程隔离性,特别适合需要处理大量并发连接的网络应用场景。
2、接口实现
TcpServer实现:
typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;
typedef std::function<void(const TcpConnectionPtr &)> ConnectionCallback;
typedef std::function<void(const TcpConnectionPtr &, Buffer *, Timestamp)> MessageCallback;
class InetAddress : public muduo::copyable
{
public:InetAddress(StringArg ip, uint16_t port, bool ipv6 = false);
};
class TcpServer : noncopyable
{
public:enum Option{kNoReusePort,kReusePort,};TcpServer(EventLoop *loop,const InetAddress &listenAddr,const string &nameArg,Option option = kNoReusePort);void setThreadNum(int numThreads);void start();/// 当⼀个新连接建⽴成功的时候被调⽤void setConnectionCallback(const ConnectionCallback &cb){connectionCallback_ = cb;}/// 消息的业务处理回调函数---这是收到新连接消息的时候被调⽤的函数void setMessageCallback(const MessageCallback &cb){messageCallback_ = cb;}
}
事件循环类实现:
class EventLoop : noncopyable
{
public:/// Loops forever./// Must be called in the same thread as creation of the object.void loop();/// Quits loop./// This is not 100% thread safe, if you call through a raw pointer,/// better to call through shared_ptr<EventLoop> for 100% safety.void quit();TimerId runAt(Timestamp time, TimerCallback cb);/// Runs callback after @c delay seconds./// Safe to call from other threads.TimerId runAfter(double delay, TimerCallback cb);/// Runs callback every @c interval seconds./// Safe to call from other threads.TimerId runEvery(double interval, TimerCallback cb);/// Cancels the timer./// Safe to call from other threads.void cancel(TimerId timerId);private:std::atomic<bool> quit_;std::unique_ptr<Poller> poller_;mutable MutexLock mutex_;std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);
};
Tcp连接类实现:
class TcpConnection : noncopyable,public std::enable_shared_from_this<TcpConnection>
{
public:/// Constructs a TcpConnection with a connected sockfd////// User should not create this object.TcpConnection(EventLoop *loop,const string &name,int sockfd,const InetAddress &localAddr,const InetAddress &peerAddr);bool connected() const { return state_ == kConnected; }bool disconnected() const { return state_ == kDisconnected; }void send(string &&message); // C++11void send(const void *message, int len);void send(const StringPiece &message);// void send(Buffer&& message); // C++11void send(Buffer *message); // this one will swap datavoid shutdown(); // NOT thread safe, no simultaneous callingvoid setContext(const boost::any &context){context_ = context;}const boost::any &getContext() const{return context_;}boost::any *getMutableContext(){return &context_;}void setConnectionCallback(const ConnectionCallback &cb){connectionCallback_ = cb;}void setMessageCallback(const MessageCallback &cb){messageCallback_ = cb;}private:enum StateE{kDisconnected,kConnecting,kConnected,kDisconnecting};EventLoop *loop_;ConnectionCallback connectionCallback_;MessageCallback messageCallback_;WriteCompleteCallback writeCompleteCallback_;boost::any context_;
};
TcpClient实现:
class TcpClient : noncopyable
{
public:// TcpClient(EventLoop* loop);// TcpClient(EventLoop* loop, const string& host, uint16_t port);TcpClient(EventLoop *loop,const InetAddress &serverAddr,const string &nameArg);~TcpClient(); // force out-line dtor, for std::unique_ptr members.void connect(); // 连接服务器void disconnect(); // 关闭连接void stop();// 获取客⼾端对应的通信连接Connection对象的接⼝,发起connect后,有可能还没有连接建⽴成功TcpConnectionPtrconnection() const{MutexLockGuard lock(mutex_);return connection_;}/// 连接服务器成功时的回调函数void setConnectionCallback(ConnectionCallback cb){connectionCallback_ = std::move(cb);}/// 收到服务器发送的消息时的回调函数void setMessageCallback(MessageCallback cb){messageCallback_ = std::move(cb);}private:EventLoop *loop_;ConnectionCallback connectionCallback_;MessageCallback messageCallback_;WriteCompleteCallback writeCompleteCallback_;TcpConnectionPtr connection_ GUARDED_BY(mutex_);
};
/*
需要注意的是,因为muduo库不管是服务端还是客⼾端都是异步操作,
对于客⼾端来说如果我们在连接还没有完全建⽴成功的时候发送数据,这是不被允许的。
因此我们可以使⽤内置的CountDownLatch类进⾏同步控制
*/
class CountDownLatch : noncopyable
{
public:explicit CountDownLatch(int count);void wait(){MutexLockGuard lock(mutex_);while (count_ > 0){condition_.wait();}}void countDown(){MutexLockGuard lock(mutex_);--count_;if (count_ == 0){condition_.notifyAll();}}int getCount() const;private:mutable MutexLock mutex_;Condition condition_ GUARDED_BY(mutex_);int count_ GUARDED_BY(mutex_);
};
Buffer类实现:
class Buffer : public muduo::copyable
{
public:static const size_t kCheapPrepend = 8;static const size_t kInitialSize = 1024;explicit Buffer(size_t initialSize = kInitialSize): buffer_(kCheapPrepend + initialSize),readerIndex_(kCheapPrepend),writerIndex_(kCheapPrepend);
void swap(Buffer &rhs)size_t readableBytes() constsize_t writableBytes() constconst char *peek() constconst char *findEOL() constconst char *findEOL(const char *start) constvoid retrieve(size_t len) void retrieveInt64() void retrieveInt32() void retrieveInt16() void retrieveInt8()string retrieveAllAsString()string retrieveAsString(size_t len) void append(const StringPiece &str) void append(const char * /*restrict*/ data, size_t len) void append(const void * /*restrict*/ data, size_t len) char *beginWrite()const char *beginWrite() constvoid hasWritten(size_t len) void appendInt64(int64_t x) void appendInt32(int32_t x) void appendInt16(int16_t x) void appendInt8(int8_t x)int64_t readInt64()int32_t readInt32()int16_t readInt16()int8_t readInt8()int64_t peekInt64() constint32_t peekInt32() constint16_t peekInt16() constint8_t peekInt8() constvoid prependInt64(int64_t x) void prependInt32(int32_t x) void prependInt16(int16_t x) void prependInt8(int8_t x) void prepend(const void * /*restrict*/ data, size_t len) private : std::vector<char> buffer_;size_t readerIndex_;size_t writerIndex_;static const char kCRLF[];
};
3、样例实践
实现一个英汉互译的小程序来使用一下Muduo库。
/*实现一个翻译服务器,客户端发送过来一个英语单词,返回一个汉语词语
*/
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/Buffer.h>
#include <iostream>
#include <string>
#include <unordered_map>class DictServer {public:DictServer(int port): _server(&_baseloop,muduo::net::InetAddress("0.0.0.0", port),"DictServer", muduo::net::TcpServer::kReusePort){//设置连接事件(连接建立/管理)的回调_server.setConnectionCallback(std::bind(&DictServer::onConnection, this, std::placeholders::_1));//设置连接消息的回调_server.setMessageCallback(std::bind(&DictServer::onMessage, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));}void start() {_server.start();//先开始监听_baseloop.loop();//开始死循环事件监控}private:void onConnection(const muduo::net::TcpConnectionPtr &conn) {if (conn->connected()) {std::cout << "连接建立!\n";}else {std::cout << "连接断开!\n";}}void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp){static std::unordered_map<std::string, std::string> dict_map = {{"hello", "你好"},{"world", "世界"},};std::string msg = buf->retrieveAllAsString();std::string res;auto it = dict_map.find(msg);if (it != dict_map.end()) {res = it->second;}else {res = "未知单词!";}conn->send(res);}private:muduo::net::EventLoop _baseloop;muduo::net::TcpServer _server;
};int main()
{DictServer server(8080);server.start();return 0;
}
#include <muduo/net/Buffer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/TcpClient.h>
#include <muduo/base/CountDownLatch.h>
#include <muduo/net/EventLoopThread.h>
#include <string>
#include <iostream>class DictClient
{
public:DictClient(const std::string sip, u_int16_t port): _downlatch(1), _baseloop(_loopthread.startLoop()), _dictclient(_baseloop, muduo::net::InetAddress(sip, port), "DictClient"){//设置连接事件(连接建立/管理)的回调_dictclient.setConnectionCallback(std::bind(&DictClient::_ConnectionCallback, this, std::placeholders::_1));//设置连接消息的回调_dictclient.setMessageCallback(std::bind(&DictClient::_MessageCallback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));//连接服务器_dictclient.connect();_downlatch.wait();}void _ConnectionCallback(const muduo::net::TcpConnectionPtr &conn){if (conn->connected()){std::cout << "连接成功!" << std::endl;_conn = conn;_downlatch.countDown();}else{std::cout << "取消连接!" << std::endl;}}bool Send(const std::string &msg){if (_conn->connected() == false){std::cout << "连接已经断开,发送数据失败!\n";return false;}_conn->send(msg);}void _MessageCallback(const muduo::net::TcpConnectionPtr &conn,muduo::net::Buffer *buffer, muduo::Timestamp){std::string res = buffer->retrieveAllAsString();std::cout << res << std::endl;}private:muduo::CountDownLatch _downlatch;muduo::net::EventLoopThread _loopthread;muduo::net::TcpConnectionPtr _conn; //? 引用还是复制 赋值muduo::net::EventLoop* _baseloop;muduo::net::TcpClient _dictclient;
};int main()
{DictClient client("127.0.0.1", 8080);while(1) {std::string msg;std::cin >> msg;client.Send(msg);}return 0;
}
(三)C++11异步操作
std::future
是C++11标准库中的⼀个模板类,它表⽰⼀个异步操作的结果。当我们在多线程编程中使⽤异步任务时,std::future
可以帮助我们在需要的时候获取任务的执⾏结果。std::future
的⼀个重要特性是能够阻塞当前线程,直到异步操作完成,从⽽确保我们在获取结果时不会遇到未完成的操作。
1、使⽤ std::async关联异步任务
std::async 的基本概述
std::async 是 C++ 标准库中用于异步执行任务的工具,它关联一个任务与 std::future 对象,允许在需要时获取任务的结果。
参数与启动策略
std::async 的行为由其第一个参数(std::launch 类型)决定:
- std::launch::deferred:任务延迟执行,直到调用 future 的 get() 或 wait() 时才同步运行。
- std::launch::async:任务立即在独立线程中异步执行。
- std::launch::deferred | std::launch::async(默认):由系统自动选择策略(可能异步或延迟)。
#include <iostream>
#include <future>
#include <chrono>
int aysnc_task()
{std::this_thread::sleep_for(std::chrono::seconds(3));return 2;
}
int main()
{// 关联异步任务aysnc_task 和 futruestd::future<int> result_future = std::async(std::launch::async,aysnc_task);// 此处可执⾏其他操作, ⽆需等待std::cout << "hello bit!" << std::endl;// 获取异步任务结果int result = result_future.get();std::cout << "Result: " << result << std::endl;return 0;
}
2、使用std::packaged_task
std::packaged_task
就是将任务和std::feature
绑定在⼀起的模板,是⼀种对任务的封装。我们可以通过std::packaged_task
对象获取任务相关联的std::feature
对象,通过调⽤get_future()
⽅法获得。
std::packaged_task
的模板参数是函数签名。
#include <iostream>
#include <future>
#include <chrono>
int add(int num1, int num2)
{return num1 + num2;
}
int main()
{// 封装任务std::packaged_task<int(int, int)> task(add);// 此处可执⾏其他操作, ⽆需等待std::cout << "hello bit!" << std::endl;std::future<int> result_future = task.get_future();// 这⾥必须要让任务执⾏, 否则在get()获取future的值时会⼀直阻塞task(1, 2);// 获取异步任务结果int result = result_future.get();std::cout << "Result: " << result << std::endl;return 0;
}
#include <iostream>
#include <future>
#include <chrono>
#include <memory>
int add(int num1, int num2)
{std::this_thread::sleep_for(std::chrono::seconds(3));return num1 + num2;
}
int main()
{// 封装任务// std::packaged_task<int(int, int)> task(add);// 此处可执⾏其他操作, ⽆需等待// std::cout << "hello bit!" << std::endl;// std::future<int> result_future = task.get_future();// 需要注意的是,task虽然重载了()运算符,但task并不是⼀个函数,// std::async(std::launch::async, task, 1, 2); //--错误⽤法// 所以导致它作为线程的⼊⼝函数时,语法上看没有问题,但是实际编译的时候会报错// std::thread(task, 1, 2); //---错误⽤法// ⽽packaged_task禁⽌了拷⻉构造,// 且因为每个packaged_task所封装的函数签名都有可能不同,因此也⽆法当作参数⼀样传递// 传引⽤不可取,毕竟任务在多线程下执⾏存在局部变量声明周期的问题,因此不能传引⽤// 因此想要将⼀个packaged_task进⾏异步调⽤,// 简单⽅法就只能是new packaged_task,封装函数传地址进⾏解引⽤调⽤了// ⽽类型不同的问题,在使⽤的时候可以使⽤类型推导来解决auto task = std::make_shared<std::packaged_task<int(int, int)>>(add);std::future<int> result_future = task->get_future();std::thread thr([task](){ (*task)(1, 2); });thr.detach();// 获取异步任务结果int result = result_future.get();std::cout << "Result: " << result << std::endl;return 0;
}
3、使用std::promise
std::promise
提供了⼀种设置值的⽅式,它可以在设置之后通过相关联的std::future
对象进⾏读取。std::future
可以读取⼀个异步函数的返回值了, 但是要等待就绪, ⽽std::promise
就提供⼀种 ⽅式手动std::future
就绪
#include <iostream>
#include <future>
#include <chrono>
void task(std::promise<int> result_promise)
{int result = 2;std::cout << "task result:" << result << std::endl;std::this_thread::sleep_for(std::chrono::seconds(3));result_promise.set_value(result);
}
int main()
{// 创建promisestd::promise<int> result_promise;std::future<int> result_future = result_promise.get_future();// 创建⼀个新线程, 执⾏⻓时间运⾏的任务std::thread task_thread(task, std::move(result_promise));// 此处可执⾏其他操作, ⽆需等待std::cout << "hello bit!" << std::endl;// 获取异步任务结果int result = result_future.get();std::cout << "Result: " << result << std::endl;task_thread.join();return 0;
}
(四)随机数的生成和uuid
- 1、使用机器随机数产生一个随机数种子。(不能直接以机器随机数持续生成,否则效率低下)
- 2、使用mt19937并且以机器随机数种子生成随机数对象。(使用mt19937生成随机数效率高效)
- 3、生成控制随机数生成范围的对象。
std::setw()
:设置下一个输出项的字段宽度。
std::setfill()
:指定当字段宽度不足时用来填充的字符。
std::hex()
:把输出流切换为十六进制格式(不是函数,是 manipulator)。
std::atomic<>()
:把变量包装成原子类型,线程访问无需额外锁。
fetch_add()
:原子地读取并加上指定值,返回原来的数值。
class UUID{public:static std::string uuid(){std::stringstream ss;// 1、创建机器随机数对象std::random_device rd;// 2、利用机器随机数创建随机生成随机数对象std::mt19937 rand_generator(rd());// 3、生成控制随机数生成范围对象。std::uniform_int_distribution<int> distribution(0, 255);for (int i = 0; i < 8; i++){if (i == 4 || i == 6)ss << "-";ss << std::setw(2) << std::setfill('0') << std::hex << distribution(rand_generator);}DEBUG_LOG("%s", ss.str().c_str());// DEBUG_LOG("%s", "hello");static std::atomic<size_t> count(0);count.fetch_add(1);ss << "-";for (int i = 7; i >= 0; i--){if (i == 5)ss << "-";ss << std::setw(2) << std::setfill('0') << std::hex << ((count >> 8 * i) & 0xff);}DEBUG_LOG("%s", ss.str().c_str());return ss.str();}};
三、框架设计
(一)抽象层设计
1. BaseMessage
作用:所有消息的基类,定义消息 ID、类型、序列化接口
- 提供两个成员的接口
setId() / id()
setType() / type()- 纯虚函数(虚基类不能实例化但是可以作为指向子类的指针)
serialize() / deserialize() / check()
2、BaseBuffer
作用:抽象 TCP 连接,屏蔽底层网络库差异
- 纯虚函数
send():发送消息
shutdown():关闭连接
connected():判断连接状态
3、BaseProtocol
作用:抽象协议处理,定义序列化与反序列化接口
纯虚函数:
serialize():消息转字符串
onMessage():字符串转消息
canProcessd():判断缓冲区是否足够解析一条消息
4、BaseConnection
作用:抽象 TCP 连接,屏蔽底层网络库差异
纯虚函数
send():发送消息
shutdown():关闭连接
connected():判断连接状态
5、BaseServer/BaseClient
作用:抽象服务端与客户端,定义生命周期与回调接口
纯虚函数
服务器:start()
客服端: connect() / shutdown() /connected()/conn()/send()
共同成员函数 :setMessageCallback() / setConnectionCallback() / setCloseCallback()
(二)具象层设计
1、消息类
类名 | 基类 | 关键数据 | 核心接口(具象实现) |
---|---|---|---|
JsonMessage | BaseMessage | Json::Value _body | string serialize() bool deserialize(const string &) |
RpcRequest | JsonMessage | — | setMethod(m) / method() setParam(v) / Param() bool check() |
RpcResponse | JsonMessage | — | setResult(v) / Result() setRCode(c) / rCode() |
ServiceRequest | JsonMessage | — | setMethod(m) / method() setOpType(t) / opType() setHost(addr) / host() bool check() |
ServiceResponse | JsonMessage | — | setMethod(m) / Method() setOpType(t) / Optype() setHost(vec) / host() |
TopicRequest | JsonMessage | — | setTopicKey(k) / topicKey() setOpType(t) / opType() setMessage(msg) / message() bool check() |
TopicResponse | JsonMessage | — | setRCode(c) / rCode() |
2、MuduoBuffer
类名 | 基类 | 关键数据 | 核心接口(具象实现) |
---|---|---|---|
MuduoBuffer | BaseBuffer | muduo::net::Buffer *_buf | size_t readAbleSize() int32_t peekInt32_t() void retrieveInt32_t() int32_t readInt32_t() string retrieveAsString(len) |
3、LvProtocol
类名 | 基类 | 关键数据 | 核心接口(具象实现) |
---|---|---|---|
LvProtocol | BaseProtocol | size_t lenFieldsLength = 4 typeFieldsLength = 4 idFieldsLength = 4 | bool canProcessd(buffer) bool onMessage(buffer, message) string serialize(message) |
4、MuduoConnection
类名 | 基类 | 关键数据 | 核心接口(具象实现) |
---|---|---|---|
MuduoConnection | BaseConnection | muduo::net::TcpConnectionPtr _conn BaseProtocol::Ptr _protocol | void send(msg) void shutdown() bool connected() |
5、MuduoServer/MuduoClient
类名 | 基类 | 关键数据 | 核心接口(具象实现) |
---|---|---|---|
MuduoServer | BaseServer | muduo::net::TcpServer _server muduo::net::EventLoop _loop BaseProtocol::Ptr _protocol | void start() |
MuduoClient | BaseClient | muduo::net::TcpClient _client muduo::net::EventLoopThread _loopThread BaseProtocol::Ptr _protocol | void connect() void shutdown() void send(msg) bool connected() BaseConnection::Ptr conn() |
(三)业务层设计
1、Rpc调用端
类名 | 作用 | 关键数据 | 核心接口(业务级) |
---|---|---|---|
RpcCaller | 客户端调用器:同步/异步/回调三种方式 | Requestor::Ptr _requestor | bool call(conn,method,params,result) 同步bool call(conn,method,params,JsonAsyncResponse&) 异步bool call(conn,method,params,JsonCallback) 回调 |
RpcClient | 客户端门面:服务发现+连接池管理 | DiscoverClient::Ptr _discover_client unordered_map<Address,BaseClient::Ptr> _clients | bool send(method,params,result/future/cb) BaseClient::Ptr getClient(method) |
2、RPC 服务端
类名 | 作用 | 关键数据 | 核心接口(业务级) |
---|---|---|---|
RpcServer | 服务端门面:注册方法+监听 | RpcRouter::Ptr _router RegisterClient::Ptr _register_client | void registerMethod(service) void start() |
RpcRouter | 方法路由:参数校验+业务回调 | ServiceManager _service_manager | void onRpcRequest(conn,request) void registerRequest(service) |
3、服务注册中心
类名 | 作用 | 关键数据 | 核心接口(业务级) |
---|---|---|---|
RegisterServer | 注册中心服务器:处理注册/发现/上下线通知 | PDManager::Ptr _pdmanager | void start() |
RegisterClient | 服务注册客户端:向中心注册方法 | Provider::Ptr _provider | bool registorMethod(conn,host,method) |
DiscoverClient | 服务发现客户端:获取地址+本地缓存 | Discover::Ptr _discover | bool serviceDiscover(method,host) |
PDManager | 注册中心核心业务:Provider/Discover 生命周期管理 | ProviderManager _providermanager DiscoverManager _discovermanager | void onServerRequest(conn,msg) void onConnShutdown(conn) |
4、发布订阅系统
类名 | 作用 | 关键数据 | 核心接口(业务级) |
---|---|---|---|
Dispatcher | 消息总线:按类型分发到业务回调 | unordered_map<MType,CallBack::Ptr> _callbacks | template<typename T> registerCallback(type,handler) void onMessage(conn,msg) |
Requestor | 异步请求生命周期管理:future/callback | unordered_map<string,RequestDescribe::Ptr> _request_desc | bool send(conn,req,future/callback) void onResponse(conn,msg) |
5、消息总线与异步调度
类名 | 作用 | 关键数据 | 核心接口(业务级) |
---|---|---|---|
Dispatcher | 消息总线:按类型分发到业务回调 | unordered_map<MType,CallBack::Ptr> _callbacks | template<typename T> registerCallback(type,handler) void onMessage(conn,msg) |
Requestor | 异步请求生命周期管理:future/callback | unordered_map<string,RequestDescribe::Ptr> _request_desc | bool send(conn,req,future/callback) void onResponse(conn,msg) |
四、项目中遇到的一些问题
(一)bind
std::bind 按值拷贝/拷贝构造函数把 Dispatcher
拷了一份,而 Dispatcher
的拷贝构造函数被删除或不可访问(典型的 std::mutex
、std::atomic
等成员会导致编译器隐式删除拷贝构造),于是
std::bind(&RPC::Dispatcher::onMessage, dispatcher, ...)
编译失败;改成传指针(地址)就没有拷贝问题,所以能过。
auto onMessage = std::bind(&RPC::Dispatcher::onMessage, &dispatcher, std::placeholders::_1, std::placeholders::_2);server->setMessageCallback(onMessage);
private:std::mutex _mutex;std::unordered_map<MType, CallBack::Ptr> _callbacks;
这里只需要把传值改成传指针就没有拷贝了,或者自己显示定义拷贝构造。
(二)muduo库在收发信息有网络序和主机序的转换
virtual std::string serialize(const BaseMessage::Ptr &message){int type = (int)message->type();std::string id = message->id();std::string body = message->serialize();// 转成网络字节序。int type_n = htonl(type);int idlen_n = htonl(id.size());int totallen_h = typeFieldsLength + idFieldsLength + id.size() + body.size();int totallen_n = htonl(totallen_h);// 序列化形成字符串std::string ret;ret.reserve(totallen_h + lenFieldsLength);ret.append((char *)&totallen_n, lenFieldsLength);ret.append((char *)&type_n, typeFieldsLength);ret.append((char *)&idlen_n, idFieldsLength);ret.append(id);ret.append(body);// std::cout << "type: " << type << std::endl;// std::cout << "id.size: " << id.size() << std::endl;// std::cout << "body.size(): " << body.size() << std::endl;// std::cout << "totallen_h" << totallen_h << std::endl;// ERROR_LOG("%s", id.c_str());// ERROR_LOG("%s", body.c_str());return ret;}
- 这里的
htonl
函数会可以将主机序转化为网络序,不要用成htons
这个只能转换16字节。- 直接使用
append
而不是用string
构造可以减少一次构造函数的消耗。
(三)使用可变参数列表来设置工厂
使用可变参数列表设置工厂模式,这个是因为构造函数的参数数量是不确定的,使用可变参数以便于可以适合各种类型的参数。
class MessageFactory{public:static BaseMessage::Ptr create(MType type){switch (type){case MType::REQ_RPC:return std::make_shared<RpcRequest>();case MType::RES_RPC:return std::make_shared<RpcResponse>();case MType::REQ_TOPIC:return std::make_shared<TopicRequest>();case MType::RES_TOPIC:return std::make_shared<TopicResponse>();case MType::REQ_SERVICE:return std::make_shared<ServiceRequest>();case MType::RES_SERVICE:return std::make_shared<ServiceResponse>();}return BaseMessage::Ptr();}template <typename T, typename... Args>static std::shared_ptr<T> create(Args &&...args){return std::make_shared<T>(std::forward<Args>(args)...);}};
(四)可变参数编写日志宏
可变参数编写日志宏可以在写日志时控制参数的个数等,当然也可以像之前一样使用仿函数重载,简便的化可以使用下面的方法,但是可能导致线程安全的问题。
template <typename T>LogMessage &operator<<(const T &info){std::stringstream ssbuffer;ssbuffer << info;_loginfo += ssbuffer.str();return *this;}
#define LOG(levelstr, level, format, ...) \{ \if (level >= DEFAULT_LEVEL) \{ \time_t timestamp = time(0); \tm *cur_time = localtime(×tamp); \printf("[%d_%d_%d %d:%d:%d|%s:%d]" \"[" \"%s" \"]" \": " format "\n", \cur_time->tm_year + 1900, cur_time->tm_mon + 1, \cur_time->tm_mday, cur_time->tm_hour, cur_time->tm_min, cur_time->tm_sec, \__FILE__, __LINE__, levelstr, ##__VA_ARGS__); \} \}#define INFO_LOG(format, ...) LOG("INFO", _INFO, format, ##__VA_ARGS__)
#define DEBUG_LOG(format, ...) LOG("DEBUG", _DEBUG, format, ##__VA_ARGS__)
#define ERROR_LOG(format, ...) LOG("ERROR", _ERROR, format, ##__VA_ARGS__)
(五)通过给参数传入对象形成类型
我们在对每一种类型设置回调函数时,因为要设计到调用子类对象成员,我们这里不能使用基类。(当然也可以使用将智能指针指向的对象强转)。
为了使代码更加优雅,可扩展性。我们使用下面的设计思路,传入子类消息对象类型,生成一种函数类型的对象使用类进行管理,由于unordered_map
不支持传入不同类型的对象,因此我们再给管理函数的类加一个基类,由于多态的原理, 我们插入unordered_map
时只需要传子类对象,就可以利用切片原理同时插入。
#pragma once
#include "field.hpp"
#include "tool.hpp"
#include "message.hpp"
#include "net.hpp"
#include <unordered_map>
#include <mutex>// using MessageCallback = std::function<void(const BaseConnection::Ptr &, BaseMessage::Ptr &)>;namespace RPC
{class CallBack{public:using Ptr = std::shared_ptr<CallBack>;virtual void onMessage(const BaseConnection::Ptr &conn, BaseMessage::Ptr &msg) = 0;};template <class T>class CallBackT : public CallBack{public:using Ptr = std::shared_ptr<CallBackT<T>>;using MsgCallback =std::function<void(const BaseConnection::Ptr &, std::shared_ptr<T>& )>;CallBackT(MsgCallback handler) : _handler(handler) {}void onMessage(const BaseConnection::Ptr &conn, BaseMessage::Ptr &msg){auto type_msg = std::dynamic_pointer_cast<T>(msg);_handler(conn, type_msg);}private:MsgCallback _handler;};class Dispatcher{public:using Ptr = std::shared_ptr<Dispatcher>;template <class T>void registerCallback(const MType &type, const typename CallBackT<T>::MsgCallback &handler){std::unique_lock<std::mutex> lock(_mutex);if (_callbacks.count(type)){ERROR_LOG("该回调函数已经注册过了!");return;}auto cb = std::make_shared<CallBackT<T>>(handler);_callbacks.insert({type, cb});DEBUG_LOG("回调函数组测成功!");}void onMessage(const BaseConnection::Ptr &conn, BaseMessage::Ptr &msg){std::unique_lock<std::mutex> lock(_mutex);MType type = msg->type();if (_callbacks.count(type)){_callbacks[type]->onMessage(conn, msg);return;}ERROR_LOG("当前没有注册这个回调函数!");}private:std::mutex _mutex;std::unordered_map<MType, CallBack::Ptr> _callbacks;};
}
(六)Json数组的遍历
在进行服务应答时,服务器要返回发现客服端能提供服务的服务器,这时候返回的是一个Address
类型的数组,因此要解读json
的key_val