【从零实现Json-Rpc框架】- 项目实现 - muduo网络通信类实现篇
📢博客主页:https://blog.csdn.net/2301_779549673
📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson
📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 JohnKi 原创,首发于 CSDN🙉
📢未来很长,值得我们全力奔赴更美好的生活✨
文章目录
- 📢前言
- 🏳️🌈一、muduo网络的意义和作用
- 1.1 Muduo的核心价值
- 1.2 对比传统的socket
- 🏳️🌈二、Muduo通信基类实现
- 2.1 BaseBuffer 缓冲区基类实现
- 2.2 BaseConnection 连接基类实现
- 2.3 BaseProtocol 协议基类实现
- 2.4 回调函数定义
- 2.5 BaseServer 服务端基类实现
- 2.6 BaseClient 客户端基类实现
- 🏳️🌈 三、Muduo 封装实现
- 3.1 MuduoBuffer 缓冲区封装
- 3.2 MuduoConnection 连接封装
- 3.3 MuduoServer 服务端封装
- 3.4 MuduoClient 客户端封装
- 3.5 LVProtocol 协议封装
- 3.6 服务端处理流程图
- 🏳️🌈 四、整体代码
- 4.1 abstract.hpp
- 4.2 Message.hpp
- 4.3 net.hpp
- 👥总结
📢前言
前几篇文章中,笔者介绍了rpc
的原理和目的,也介绍了需要使用的部分第三方库
和我们所需实现的功能
现在我们着手项目实现篇章
,目前零碎接口 和 项目消息字段类型 都已经完成了
接下来我们将着手 抽象层的实现,在完成了通信抽象实现
中的消息抽象实现
后,我们将实现
这里的代码,比如说 MType
就是用我们之前封装的 fields.hpp
代码中的消息类型定义,所以如果有需要的话,可以去前面的文章中取,或者笔者的仓库中linux_test/lesson/CSDN_code/project1_RPC/source/common
中也有
🏳️🌈一、muduo网络的意义和作用
其实在前面的文章笔者中笔者就已经提到过 muduo网络的意义和作用 ,但是为了加深印象,笔者还是想再说一下(其实是我自己有点忘了,再回顾一遍)。
1.1 Muduo的核心价值
通俗的来说,muduo
库是一大佬封装好的 TCP底层网络通信 的一个库,就像我们在学习期间制作的那个socket
库一样。是一个基于Reactor
模式的C++网络库,主要用于处理高并发的网络通信
因为它的高性能和事件驱动模型,适用于需要处理大量并发连接的场景。此外,muduo提供了简洁的API,方便开发者处理TCP连接、缓冲区和事件循环,而无需直接处理底层套接字编程的复杂性
// MuduoServer 初始化
MuduoServer(int port) : _server(&_baseloop, muduo::net::InetAddress(...), ...) {}
// 事件循环启动
virtual void start() {
_server.start();
_baseloop.loop(); // muduo 事件循环驱动
}
1.2 对比传统的socket
性能优势场景
- 高并发连接:单机万级并发连接处理能力
- 低延迟要求:微秒级事件响应(适合RPC场景)
- 资源高效利用:通过 EventLoop 减少线程切换开销
🏳️🌈二、Muduo通信基类实现
在上一篇文章中,笔者针对通信中的各个 消息 做了基类的封装,并细分了。
当然,muduo通信是不仅仅只有消息的封装的,还需要有
- 缓冲区
- 连接
- 服务端
- 客户端
- 协议
我们上一篇中实现的仅仅只是下图中字段的那么工作,呈现 顺时针 方向工作
整个流程就是
- 客户端 设置 请求字段
- 客户端 发送 请求连接和字段
- 服务端 收到 请求连接和字段
- 服务端 调用 请求方法
- 服务端 设置 响应字段
- 服务端 发送 响应连接和字段
- 客户端 收到 响应连接和字段
- 客户端 调用 响应方法
中间每次收到连接和字段时,都会根据协议去判定一下是否合法
2.1 BaseBuffer 缓冲区基类实现
// 基类缓冲区类
class BaseBuffer{
// 方法名中的 32 特指 操作的数据类型为 32 位整数
public:
using ptr = std::shared_ptr<BaseBuffer>;
// 获取可读字节数
virtual size_t readableSize() = 0;
// 获取可写字节数
virtual int32_t peekInt32() = 0;
// 接收字节
virtual void retrieveInt32() = 0;
// 读取字节
virtual int32_t readInt32() = 0;
// 接收字节
virtual std::string retrieveAsString(size_t len) = 0;
};
2.2 BaseConnection 连接基类实现
// 基类连接类
class BaseConnection{
public:
using ptr = std::shared_ptr<BaseConnection>;
// 发送消息
virtual void send(const BaseMessage::ptr& msg) = 0;
// 关闭连接
virtual void shutdown() = 0;
// 是否连接成功
virtual bool connected() = 0;
};
2.3 BaseProtocol 协议基类实现
// 基类协议类
class BaseProtocol{
public:
using ptr = std::shared_ptr<BaseProtocol>;
// 是否能够处理该消息
virtual bool canProcessed(const BaseBuffer::ptr& buf) = 0;
// 处理消息
virtual bool onMessage(const BaseBuffer::ptr& buf, BaseMessage::ptr& msg) = 0;
// 序列化消息
virtual std::string serialize(const BaseMessage::ptr& msg) = 0;
};
2.4 回调函数定义
// 连接回调函数
using ConnectionCallback = std::function<void(const BaseConnection::ptr&)>;
// 连接关闭回调函数
using CloseCallback = std::function<void(const BaseConnection::ptr&)>;
// 消息回调函数
using MessageCallBack = std::function<void(const BaseConnection::ptr&, BaseMessage::ptr&)>;
2.5 BaseServer 服务端基类实现
// 基类服务类
class BaseServer{
public:
using ptr = std::shared_ptr<BaseServer>;
// 设置连接回调函数
virtual void setConnectionCallback(const ConnectionCallback& cb){
_cb_connection = cb;
}
// 设置连接关闭回调函数
virtual void setCloseCallback(const CloseCallback& cb){
_cb_close = cb;
}
// 设置消息回调函数
virtual void setMessageCallback(const MessageCallBack& cb){
_cb_message = cb;
}
protected:
ConnectionCallback _cb_connection;
CloseCallback _cb_close;
MessageCallBack _cb_message;
};
2.6 BaseClient 客户端基类实现
// 基类客户端
class BaseClient{
public:
using ptr = std::shared_ptr<BaseClient>;
// 设置连接回调函数
virtual void setConnectionCallback(const ConnectionCallback& cb){
_cb_connection = cb;
}
// 设置连接关闭回调函数
virtual void setCloseCallback(const CloseCallback& cb){
_cb_close = cb;
}
// 设置消息回调函数
virtual void setMessageCallback(const MessageCallBack& cb){
_cb_message = cb;
}
// 开始连接
virtual void connect() = 0;
// 关闭连接
virtual void shutdown() = 0;
// 发送消息
virtual bool send(const BaseMessage::ptr& msg) = 0;
// 获取连接
virtual BaseConnection::ptr connection() = 0;
// 是否连接成功
virtual bool connected() = 0;
protected:
ConnectionCallback _cb_connection;
CloseCallback _cb_close;
MessageCallBack _cb_message;
};
🏳️🌈 三、Muduo 封装实现
├── Network Abstraction (基于muduo)
│ ├── MuduoBuffer : 封装muduo::net::Buffer
│ ├── MuduoConnection : 封装muduo::net::TcpConnection
│ ├── MuduoServer : 基于muduo::net::TcpServer的服务端实现
│ └── MuduoClient : 基于muduo::net::TcpClient的客户端实现
│
└── Protocol Layer (LV协议)
├── LVProtocol : 处理消息的序列化/反序列化
└── Factories : Buffer/Protocol/Connection的工厂类
在进行 Muduo 封装的时候,我们可以利用工厂化思想,提前封装好工厂,便于后续的使用和管理
3.1 MuduoBuffer 缓冲区封装
class MuduoBuffer : public BaseBuffer {
public:
using ptr = std::shared_ptr<BaseBuffer>;
MuduoBuffer(muduo::net::Buffer* buf)
: _buf(buf)
{}
// 获取可读字节数
virtual size_t readableSize() override{
return _buf->readableBytes();
}
// 获取可写字节数
virtual int32_t peekInt32() override{
// muduo 是一个网络库,从缓冲区取出一个4字节整形,会进行网络字节序的转换
return _buf->peekInt32();
}
// 接收字节
virtual void retrieveInt32() override{
_buf->retrieveInt32();
}
// 读取字节
virtual int32_t readInt32() override{
return _buf->readInt32();
}
// 接收字节
virtual std::string retrieveAsString(size_t len) override{
return _buf->retrieveAsString(len);
}
private:
muduo::net::Buffer* _buf;
};
class BufferFactory{
public:
template<typename ...Args>
static BaseBuffer::ptr create(Args&& ...args){
return std::make_shared<MuduoBuffer>(std::forward<Args>(args)...);
}
};
3.2 MuduoConnection 连接封装
class MuduoConnection : public BaseConnection {
public:
using ptr = std::shared_ptr<MuduoConnection>;
MuduoConnection(const BaseProtocol::ptr& protocol, // [!code focus:3]
const muduo::net::TcpConnectionPtr& conn)
: _protocol(protocol),
_conn(conn)
{}
// 发送消息
virtual void send(const BaseMessage::ptr& msg) override{
std::string body = _protocol->serialize(msg);
_conn->send(body);
}
// 关闭连接
virtual void shutdown() override{
_conn->shutdown();
}
// 是否连接成功
virtual bool connected() override{
return _conn->connected();
}
private:
muduo::net::TcpConnectionPtr _conn;
BaseProtocol::ptr _protocol;
};
class ConnectionFactory{
public:
template<typename ...Args>
static BaseConnection::ptr create(Args&& ...args){
return std::make_shared<MuduoConnection>(std::forward<Args>(args)...);
}
};
3.3 MuduoServer 服务端封装
class MuduoServer : public BaseServer{
public:
using ptr = std::shared_ptr<MuduoServer>;
MuduoServer(int port)
: _server(
&_baseloop,
muduo::net::InetAddress("0.0.0.0", port),
"DictServer",
muduo::net::TcpServer::kReusePort
),
_protocol(ProtocolFactory::create())
{}
virtual void start(){
_server.setConnectionCallback(std::bind(&MuduoServer::onConnection, this, std::placeholders::_1));
_server.setMessageCallback(std::bind(&MuduoServer::onMessage, this,
std::placeholders::_1, // TcpConnectionPtr
std::placeholders::_2, // Buffer*
std::placeholders::_3 // Timestamp
));
_server.start(); // 先开始监听
_baseloop.loop(); // 开始循环事件监控
}
private:
// 方法: 连接
void onConnection(const muduo::net::TcpConnectionPtr& conn){
if(conn->connected()){
std::cout << "连接成功!\n";
auto muduo_conn = ConnectionFactory::create(_protocol, conn);
{
std::unique_lock<std::mutex> lock(_mutex);
_conns.insert(std::make_pair(conn, muduo_conn));
}
// 调用连接回调函数
// 通过 if(_cb_xxx) 检查回调是否有效,避免程序崩溃。
if(_cb_connection) _cb_connection(muduo_conn);
}
else{
std::cout << "连接断开!\n";
BaseConnection::ptr muduo_conn;
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _conns.find(conn);
if(it == _conns.end())
return;
muduo_conn = it->second;
_conns.erase(it);
}
// 调用关闭回调函数
if(_cb_close) _cb_close(muduo_conn);
}
}
// 方法: 处理客户端请求
void onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buf, muduo::Timestamp){
DLOG("连接有数据到来,开始处理!");
auto base_buf = BufferFactory::create(buf);
while(1){
if(_protocol->canProcessed(base_buf) == false){
// 无法正确处理缓冲区消息
// 防备一种情况:
// 缓冲区数据过大,导致无法正确处理,导致程序崩溃。
if(base_buf->readableSize() > maxDataSize){
conn->shutdown();
ELOG("缓冲区数据过大,导致无法正确处理!");
return;
}
ELOG("数据量不足!");
break;
}
DLOG("缓冲区中数据可处理!");
BaseMessage::ptr msg;
bool ret = _protocol->onMessage(base_buf, msg);
if(ret == false){
conn->shutdown();
ELOG("缓冲区数据读取错误!")
return;
}
DLOG("反序列化成功!");
DLOG("开始寻找对应的连接对象!");
BaseConnection::ptr base_conn;
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _conns.find(conn);
if(it == _conns.end()){
ELOG("连接不存在!");
conn->shutdown();
return;
}
base_conn = it->second;
}
DLOG("调用回调函数进行消息处理!");
if(_cb_message) _cb_message(base_conn, msg);
}
}
private:
const size_t maxDataSize = (1 << 16);
muduo::net::EventLoop _baseloop;
muduo::net::TcpServer _server;
BaseProtocol::ptr _protocol;
std::mutex _mutex;
// muduo::net::TcpConnectionPtr是指客户端的连接对象
// BaseConnection::ptr是指服务端的连接对象
std::unordered_map<muduo::net::TcpConnectionPtr, BaseConnection::ptr> _conns;
};
class ServerFactory{
public:
template<typename ...Args>
static MuduoServer::ptr create(Args&& ...args){
return std::make_shared<MuduoServer>(std::forward<Args>(args)...);
}
};
3.4 MuduoClient 客户端封装
class MuduoClient : public BaseClient{
public:
using ptr = std::shared_ptr<MuduoClient>;
MuduoClient(const std::string& ip, int port)
: _protocol(ProtocolFactory::create()),
_baseloop(_loopthread.startLoop()),
_downlatch(1),
_client(_baseloop, muduo::net::InetAddress(ip, port), "MuduoClient")
{}
// 开始连接
virtual void connect() override{
DLOG("设置回调函数,连接服务器!");
// 设置连接事件(连接建立/管理)的回调
_client.setConnectionCallback(std::bind(&MuduoClient::onConnection, this, std::placeholders::_1));
// 设置连接消息的回调
_client.setMessageCallback(std::bind(&MuduoClient::onMessage, this,
std::placeholders::_1, // TcpConnectionPtr
std::placeholders::_2, // Buffer*
std::placeholders::_3 // Timestamp
));
// 连接服务器
_client.connect();
_downlatch.wait(); // 等待连接成功
DLOG("连接服务器成功!");
}
// 关闭连接
virtual void shutdown() override{
_client.disconnect();
}
// 发送消息
virtual bool send(const BaseMessage::ptr& msg) override{
if(connected() == false){
ELOG("连接失败,不能发送消息!");
return false;
}
_conn->send(msg);
return true;
}
// 获取连接
virtual BaseConnection::ptr connection() override{
return _conn;
}
// 是否连接成功
virtual bool connected() override{
return (_conn && _conn->connected());
}
private:
// 方法: 连接
void onConnection(const muduo::net::TcpConnectionPtr& conn){
if(conn->connected()){
std::cout << "连接建立!\n";
_conn = ConnectionFactory::create(_protocol, conn);
//在运行一下试试,重新编译一下,
_downlatch.countDown(); // 计数--,为0时,唤醒阻塞,表示连接成功
}
else{
std::cout << "连接失败!\n";
_conn.reset(); // 清空连接
}
}
// 方法: 处理服务端请求
void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp){
DLOG("连接有数据到来,开始处理!");
auto base_buf = BufferFactory::create(buf);
while(1) {
if (_protocol->canProcessed(base_buf) == false) {
if (base_buf->readableSize() > maxDataSize) {
conn->shutdown();
ELOG("缓冲区中数据过大!");
return ;
}
DLOG("数据量不足!");
break;
}
DLOG("缓冲区中数据可处理!");
BaseMessage::ptr msg;
bool ret = _protocol->onMessage(base_buf, msg);
if (ret == false) {
conn->shutdown();
ELOG("缓冲区中数据错误!");
return;
}
DLOG("缓冲区中数据解析完毕,调用回调函数进行处理!");
if(_cb_message) _cb_message(_conn, msg);
}
}
private:
const size_t maxDataSize = (1 << 16);
BaseProtocol::ptr _protocol; // 协议对象
BaseConnection::ptr _conn; // 连接对象
muduo::CountDownLatch _downlatch; // 计数器
muduo::net::EventLoopThread _loopthread; // 事件循环线程
muduo::net::EventLoop* _baseloop; // 事件循环对象
muduo::net::TcpClient _client; // 客户端对象
};
class ClientFactory{
public:
template<typename ...Args>
static MuduoClient::ptr create(Args&& ...args){
return std::make_shared<MuduoClient>(std::forward<Args>(args)...);
}
};
客户端这里有一点需要格外注意下
_conn = ConnectionFactory::create(_protocol, conn);
//在运行一下试试,重新编译一下,
_downlatch.countDown(); // 计数--,为0时,唤醒阻塞,表示连接成功
我们采用的是计数器的方式实现,阻塞唤醒的功能,所以我们需要先建立连接,再计数器减减
这里其实两个线程一个线程是你执行代码的主线程,还有另一个线程是muduo库
内部的线程
这个动作是你的主线程要等待muduo库
内部的线程,在连接完成之后,将这个连接成功的connection赋值到你的这个connection上,之后在唤醒主线程就不会出现问题了
3.5 LVProtocol 协议封装
LV协议格式
+------------+------------+------------+-----------+-----------+
| 4字节总长度 | 4字节消息类型 | 4字节ID长度 | 变长ID | 变长消息体 |
+------------+------------+------------+-----------+-----------+
// LV代表Length-Value,也就是长度-值结构
class LVProtocol : public BaseProtocol {
public:
// | --len-- | --value-- |
// 一个消息包含的内容
// 4 4 4 idlen bodylen
// | --len-- | --mtype-- | --idlen-- | --id-- | --body-- |
using ptr = std::shared_ptr<BaseProtocol>;
// 是否能够处理该消息
virtual bool canProcessed(const BaseBuffer::ptr& buf) override{
if (buf->readableSize() < lenFieldsLength){
ELOG("消息长度不足,无法处理!");
return false; // 先检查最小长度
}
int32_t total_len = buf->peekInt32();
DLOG("total_len:%d", total_len);
if (buf->readableSize() < (total_len + lenFieldsLength)) {
ELOG("数据过大,无法处理!");
return false;
}
return true;
}
// 处理消息
virtual bool onMessage(const BaseBuffer::ptr& buf, BaseMessage::ptr& msg) override{
// 当调用 onMessage 时,默认任务缓冲区中的数据足够一条完整的消息
int32_t total_len = buf->readInt32();
MType mtype = (MType)buf->readInt32();
int32_t idlen = buf->readInt32();
std::string id = buf->retrieveAsString(idlen);
int32_t body_len = total_len - idlen - idlenFieldsLength - mtypeFieldsLength;
std::string body = buf->retrieveAsString(body_len);
msg = MessageFactory::create(mtype);
if(msg.get() == nullptr) {
ELOG("消息类型错误,构造消息对象失败!");
return false;
}
bool ret = msg->unserialize(body);
if(ret == false) {
ELOG("消息正文反序列化失败!");
return false;
}
msg->setId(id);
msg->setMType(mtype);
return true;
}
// 序列化消息
virtual std::string serialize(const BaseMessage::ptr& msg) override{
// | --Len-- | --mtype-- | --idlen-- | --id-- | --body-- |
std::string body = msg->serialize();
std::string id = msg->rid();
auto mtype = htonl((int32_t)msg->mtype());
int32_t idlen = htonl(id.size());
int32_t h_total_len = mtypeFieldsLength + idlenFieldsLength + id.size() + body.size();
int32_t n_total_len = htonl(h_total_len);
DLOG("h_total_len:%d", h_total_len);
DLOG("n_total_len:%d", n_total_len);
std::string result;
result.reserve(h_total_len);
result.append((char*)&n_total_len, lenFieldsLength);
result.append((char*)&mtype, mtypeFieldsLength);
result.append((char*)&idlen, idlenFieldsLength);
result.append(id);
result.append(body);
return result;
}
private:
const size_t lenFieldsLength = 4;
const size_t idlenFieldsLength = 4;
const size_t mtypeFieldsLength = 4;
};
class ProtocolFactory{
public:
template<typename ...Args>
static BaseProtocol::ptr create(Args&& ...args){
return std::make_shared<LVProtocol>(std::forward<Args>(args)...);
}
};
3.6 服务端处理流程图
在封装好上述所有的 muduo类
和 LV协议
后,为了便于理解,我们以server服务端模拟一下整个流程
sequenceDiagram
participant Client
participant MuduoServer
participant LVProtocol
participant Dispatcher
Client->>MuduoServer: 发送二进制数据流
MuduoServer->>LVProtocol: canProcessed()检查数据完整性
LVProtocol->>LVProtocol: onMessage()解析消息头
LVProtocol->>MessageFactory: 创建具体消息对象
MessageFactory-->>LVProtocol: 返回消息对象
LVProtocol->>Dispatcher: 传递消息对象
Dispatcher->>Handler: 调用注册的回调函数
Handler->>MuduoServer: 生成响应消息
MuduoServer->>Client: 发送响应
🏳️🌈 四、整体代码
为了和上一篇文章串联起来,这里笔者将这些都放在一起便于观看,也可以去笔者的仓库中获取
4.1 abstract.hpp
#pragma once
#include "fields.hpp"
#include "detail.hpp"
#include <memory>
#include <functional>
using namespace fields;
// rpc 基类
namespace rpc{
// 基类消息类
class BaseMessage{
public:
using ptr = std::shared_ptr<BaseMessage>;
// 析构函数
virtual ~BaseMessage(){}
// 设置消息id
virtual void setId(const std::string& id){ _rid = id; }
// 获取消息id
virtual std::string rid() const{ return _rid; }
// 设置消息类型
virtual void setMType(MType type){
DLOG("设置消息类型: %d", static_cast<int>(type));
_mtype = type;
}
// 获取消息类型
virtual MType mtype() const{ return _mtype; }
// 序列化消息
virtual std::string serialize() = 0;
// 反序列化消息
virtual bool unserialize(const std::string& msg) = 0;
// 检查消息是否合法
virtual bool check() = 0;
private:
MType _mtype;
std::string _rid;
};
// 基类缓冲区类
class BaseBuffer{
// 方法名中的 32 特指 操作的数据类型为 32 位整数
public:
using ptr = std::shared_ptr<BaseBuffer>;
// 获取可读字节数
virtual size_t readableSize() = 0;
// 获取可写字节数
virtual int32_t peekInt32() = 0;
// 接收字节
virtual void retrieveInt32() = 0;
// 读取字节
virtual int32_t readInt32() = 0;
// 接收字节
virtual std::string retrieveAsString(size_t len) = 0;
};
// 基类协议类
class BaseProtocol{
public:
using ptr = std::shared_ptr<BaseProtocol>;
// 是否能够处理该消息
virtual bool canProcessed(const BaseBuffer::ptr& buf) = 0;
// 处理消息
virtual bool onMessage(const BaseBuffer::ptr& buf, BaseMessage::ptr& msg) = 0;
// 序列化消息
virtual std::string serialize(const BaseMessage::ptr& msg) = 0;
};
// 基类连接类
class BaseConnection{
public:
using ptr = std::shared_ptr<BaseConnection>;
// 发送消息
virtual void send(const BaseMessage::ptr& msg) = 0;
// 关闭连接
virtual void shutdown() = 0;
// 是否连接成功
virtual bool connected() = 0;
};
// 连接回调函数
using ConnectionCallback = std::function<void(const BaseConnection::ptr&)>;
// 连接关闭回调函数
using CloseCallback = std::function<void(const BaseConnection::ptr&)>;
// 消息回调函数
using MessageCallBack = std::function<void(const BaseConnection::ptr&, BaseMessage::ptr&)>;
// 基类服务类
class BaseServer{
public:
using ptr = std::shared_ptr<BaseServer>;
// 设置连接回调函数
virtual void setConnectionCallback(const ConnectionCallback& cb){
_cb_connection = cb;
}
// 设置连接关闭回调函数
virtual void setCloseCallback(const CloseCallback& cb){
_cb_close = cb;
}
// 设置消息回调函数
virtual void setMessageCallback(const MessageCallBack& cb){
_cb_message = cb;
}
protected:
ConnectionCallback _cb_connection;
CloseCallback _cb_close;
MessageCallBack _cb_message;
};
// 基类客户端
class BaseClient{
public:
using ptr = std::shared_ptr<BaseClient>;
// 设置连接回调函数
virtual void setConnectionCallback(const ConnectionCallback& cb){
_cb_connection = cb;
}
// 设置连接关闭回调函数
virtual void setCloseCallback(const CloseCallback& cb){
_cb_close = cb;
}
// 设置消息回调函数
virtual void setMessageCallback(const MessageCallBack& cb){
_cb_message = cb;
}
// 开始连接
virtual void connect() = 0;
// 关闭连接
virtual void shutdown() = 0;
// 发送消息
virtual bool send(const BaseMessage::ptr& msg) = 0;
// 获取连接
virtual BaseConnection::ptr connection() = 0;
// 是否连接成功
virtual bool connected() = 0;
protected:
ConnectionCallback _cb_connection;
CloseCallback _cb_close;
MessageCallBack _cb_message;
};
}
4.2 Message.hpp
#pragma once
#include <memory>
#include "detail.hpp"
#include "fields.hpp"
#include "abstract.hpp"
namespace rpc{
typedef std::pair<std::string, int> Address;
// Json消息类
class JsonMessage : public BaseMessage{
public:
using ptr = std::shared_ptr<BaseMessage>;
// 序列化
virtual std::string serialize() override {
std::string body;
bool ret = JSON::serialize(_body, body);
if(ret == false){
return body;
}
return body;
}
// 反序列化
virtual bool unserialize(const std::string& msg) override{
return JSON::unserialize(msg, _body);
}
// 检查
virtual bool check() = 0;
protected:
Json::Value _body;
};
// Json消息请求类
class JsonRequest : public JsonMessage{
public:
using ptr = std::shared_ptr<JsonRequest>;
};
// Json消息响应类
class JsonResponse : public JsonMessage{
public:
using ptr = std::shared_ptr<JsonResponse>;
virtual bool check() override{
// 在响应中没大部分的相应都只有响应状态码
// 因此只需要判断响应状态码字段是否存在,类型是否正确即可
if(_body[KEY_RCODE].isNull() == true){
ELOG("响应中没有相应状态码");
return false;
}
if(_body[KEY_RCODE].isIntegral() == false){
ELOG("响应状态码字段类型错误!");
return false;
}
return true;
}
// 获取响应状态码
virtual RCode rcode(){
return (RCode)_body[KEY_RCODE].asInt();
}
// 设置响应状态码
virtual void setRcode(RCode rcode){
_body[KEY_RCODE] = (int)rcode;
}
};
// RPC请求类
class RpcRequest : public JsonRequest{
public:
using ptr = std::shared_ptr<RpcRequest>;
// 检查
virtual bool check() override{
// rpc 请求中,包含请求方法名称-字符串,参数字段-对象
if(_body[KEY_METHOD].isNull() == true ||
_body[KEY_METHOD].isString() == false){
ELOG("RPC请求中没有方法名称或方法名称类型错误!");
return false;
}
if(_body[KEY_PARAMS].isNull() == true ||
_body[KEY_PARAMS].isObject() == false){
ELOG("RPC请求中没有参数信息或参数信息类型错误");
return false;
}
return true;
}
// 获取方法名称
std::string method(){
return _body[KEY_METHOD].asString();
}
// 设置方法名称
void setMethod(const std::string& method_name){
_body[KEY_METHOD] = method_name;
}
// 获取参数
Json::Value params(){
return _body[KEY_PARAMS];
}
// 设置参数
void setParams(const Json::Value& params){
_body[KEY_PARAMS] = params;
}
};
// 主题请求类
class TopicRequest : public JsonRequest{
public:
using ptr = std::shared_ptr<TopicRequest>;
// 检查
virtual bool check() override{
if(_body[KEY_TOPIC_KEY].isNull() == true ||
_body[KEY_TOPIC_KEY].isString() == false){
ELOG("主题请求中没有主题名称或主题名称类型类型错误!");
return false;
}
if(_body[KEY_TOPIC_MSG].isNull() == true ||
_body[KEY_TOPIC_MSG].isString() == false){
ELOG("主题请求中没有主题消息或主题消息类型错误");
return false;
}
if(_body[KEY_OPTYPE].isNull() == true ||
_body[KEY_OPTYPE].isIntegral() == false){
ELOG("主题请求中没有操作类型或操作类型类型错误!");
return false;
}
return true;
}
// 获取主题名称
std::string topicKey(){
return _body[KEY_TOPIC_KEY].asString();
}
// 设置主题名称
void setTopicKey(const std::string& topic_key){
_body[KEY_TOPIC_KEY] = topic_key;
}
// 获取主题消息
std::string topicMsg(){
return _body[KEY_TOPIC_MSG].asString();
}
// 设置主题消息
void setTopicMsg(const std::string& topic_msg){
_body[KEY_TOPIC_MSG] = topic_msg;
}
// 获取主题请求操作类型
TopicOptype optype(){
return (TopicOptype)_body[KEY_OPTYPE].asInt();
}
// 设置主题请求操作类型
void setOptype(TopicOptype optype){
_body[KEY_OPTYPE] = (int)optype;
}
};
// 服务请求类
class ServiceRequest : public JsonResponse{
public:
using ptr = std::shared_ptr<ServiceRequest>;
virtual bool check() override{
if(_body[KEY_METHOD].isNull() == true ||
_body[KEY_METHOD].isString() == false){
ELOG("服务请求中没有方法名称或方法名称类型错误!");
return false;
}
if(_body[KEY_OPTYPE].isNull() == true ||
_body[KEY_OPTYPE].isIntegral() == false){
ELOG("服务请求中没有操作类型或操作类型类型错误");
return false;
}
if(_body[KEY_OPTYPE].asInt() != (int)(ServiceOptype::SERVICE_DISCOVERY) &&
(_body[KEY_HOST].isNull() == true ||
_body[KEY_HOST].isObject() == false ||
_body[KEY_HOST][KEY_HOST_IP].isNull() == true ||
_body[KEY_HOST][KEY_HOST_IP].isString() == false ||
_body[KEY_HOST][KEY_HOST_PORT].isNull() == true ||
_body[KEY_HOST][KEY_HOST_PORT].isIntegral() == false
)){
ELOG("服务请求中主机地址信息错误!");
return false;
}
return true;
}
// 获取方法名称
std::string method(){
return _body[KEY_METHOD].asString();
}
// 设置方法名称
void setMethod(const std::string& method_name){
_body[KEY_METHOD] = method_name;
}
// 获取操作类型
ServiceOptype Serviceoptype(){
return (ServiceOptype)_body[KEY_OPTYPE].asInt();
}
// 设置操作类型
void setServiceOptype(ServiceOptype optype){
_body[KEY_OPTYPE] = (int)optype;
}
// 获取主机地址
Address host(){
Address addr;
addr.first = _body[KEY_HOST][KEY_HOST_IP].asString();
addr.second = _body[KEY_HOST][KEY_HOST_PORT].asInt();
return addr;
}
void setHost(const Address& host){
Json::Value val;
val[KEY_HOST_IP] = host.first;
val[KEY_HOST_PORT] = host.second;
_body[KEY_HOST] = val;
}
};
// RPC响应类
class RpcResponse : public JsonResponse{
public:
using ptr = std::shared_ptr<RpcResponse>;
virtual bool check() override{
// rpc 请求中,包含请求方法名称-字符串,参数字段-对象
if(_body[KEY_RCODE].isNull() == true ||
_body[KEY_RCODE].isIntegral() == false){
ELOG("RPC响应中没有响应状态码或响应状态码类型错误!");
return false;
}
// 因为返回类型多种多样,就不需要进行判断了
if(_body[KEY_RESULT].isNull() == true){
ELOG("RPC响应中没有响应结果或响应结果类型错误!");
return false;
}
return true;
}
// 获取响应结果
Json::Value result(){
return _body[KEY_RESULT];
}
// 设置响应结果
void setResult(const Json::Value& result){
_body[KEY_RESULT] = result;
}
};
// 主题响应类
class TopicResponse : public JsonResponse{
public:
using ptr = std::shared_ptr<TopicResponse>;
};
// 服务响应类
class ServiceResponse : public JsonResponse{
public:
using ptr = std::shared_ptr<ServiceResponse>;
virtual bool check() override{
if(_body[KEY_RCODE].isNull() == true ||
_body[KEY_RCODE].isIntegral() == false){
ELOG("服务响应中没有响应状态码或响应状态码类型错误!");
return false;
}
if(_body[KEY_OPTYPE].isNull() == true ||
_body[KEY_OPTYPE].isIntegral() == false){
ELOG("服务响应中没有操作类型或操作类型的类型错误!");
return false;
}
if(_body[KEY_OPTYPE].asInt() != (int)(ServiceOptype::SERVICE_DISCOVERY) && (
_body[KEY_METHOD].isNull() == true ||
_body[KEY_METHOD].isString() == false ||
_body[KEY_HOST].isNull() == true ||
_body[KEY_HOST].isArray() == false
)){
ELOG("服务发现响应中响应信息错误!");
return false;
}
return true;
}
// 获取响应操作类型
ServiceOptype optype(){
return (ServiceOptype)_body[KEY_OPTYPE].asInt();
}
// 设置响应操作类型
void setOptype(ServiceOptype optype){
_body[KEY_OPTYPE] = (int)optype;
}
// 设置响应方法名称
std::string method(){
return _body[KEY_METHOD].asString();
}
// 设置响应方法
void setMethod(const std::string& method_name){
_body[KEY_METHOD] = method_name;
}
// 获取响应主机地址
std::vector<Address> Hosts(){
std::vector<Address> addrs;
int sz = _body[KEY_HOST].size();
for(int i = 0; i < sz; ++i){
Address addr;
addr.first = _body[KEY_HOST][i][KEY_HOST_IP].asString();
addr.second = _body[KEY_HOST][i][KEY_HOST_PORT].asInt();
addrs.push_back(addr);
}
return addrs;
}
// 设置响应主机地址
void setHost(std::vector<Address> addrs){
for(auto& addr : addrs){
Json::Value val;
val[KEY_HOST_IP] = addr.first;
val[KEY_HOST_PORT] = addr.second;
_body[KEY_HOST].append(val);
}
}
};
// 实现一个消息对象的生产工厂
class MessageFactory{
public:
static BaseMessage::ptr create(MType mtype){
switch(mtype){
case MType::REQ_RPC : return std::make_shared<RpcRequest>();
case MType::RSP_RPC : return std::make_shared<RpcResponse>();
case MType::REQ_TOPIC : return std::make_shared<TopicRequest>();
case MType::RSP_TOPIC : return std::make_shared<TopicResponse>();
case MType::REQ_SERVICE : return std::make_shared<ServiceRequest>();
case MType::RSP_SERVICE : return std::make_shared<ServiceResponse>();
default: return BaseMessage::ptr(); // 类型错误,返回一个空的指针对象
}
}
template<typename T, typename... Args>
static std::shared_ptr<T> create(Args&& ...args){
return std::make_shared<T>(std::forward<Args>(args)...);
}
};
}
4.3 net.hpp
#pragma once
#include <memory>
#include "detail.hpp"
#include "fields.hpp"
#include "abstract.hpp"
namespace rpc{
typedef std::pair<std::string, int> Address;
// Json消息类
class JsonMessage : public BaseMessage{
public:
using ptr = std::shared_ptr<BaseMessage>;
// 序列化
virtual std::string serialize() override {
std::string body;
bool ret = JSON::serialize(_body, body);
if(ret == false){
return body;
}
return body;
}
// 反序列化
virtual bool unserialize(const std::string& msg) override{
return JSON::unserialize(msg, _body);
}
// 检查
virtual bool check() = 0;
protected:
Json::Value _body;
};
// Json消息请求类
class JsonRequest : public JsonMessage{
public:
using ptr = std::shared_ptr<JsonRequest>;
};
// Json消息响应类
class JsonResponse : public JsonMessage{
public:
using ptr = std::shared_ptr<JsonResponse>;
virtual bool check() override{
// 在响应中没大部分的相应都只有响应状态码
// 因此只需要判断响应状态码字段是否存在,类型是否正确即可
if(_body[KEY_RCODE].isNull() == true){
ELOG("响应中没有相应状态码");
return false;
}
if(_body[KEY_RCODE].isIntegral() == false){
ELOG("响应状态码字段类型错误!");
return false;
}
return true;
}
// 获取响应状态码
virtual RCode rcode(){
return (RCode)_body[KEY_RCODE].asInt();
}
// 设置响应状态码
virtual void setRcode(RCode rcode){
_body[KEY_RCODE] = (int)rcode;
}
};
// RPC请求类
class RpcRequest : public JsonRequest{
public:
using ptr = std::shared_ptr<RpcRequest>;
// 检查
virtual bool check() override{
// rpc 请求中,包含请求方法名称-字符串,参数字段-对象
if(_body[KEY_METHOD].isNull() == true ||
_body[KEY_METHOD].isString() == false){
ELOG("RPC请求中没有方法名称或方法名称类型错误!");
return false;
}
if(_body[KEY_PARAMS].isNull() == true ||
_body[KEY_PARAMS].isObject() == false){
ELOG("RPC请求中没有参数信息或参数信息类型错误");
return false;
}
return true;
}
// 获取方法名称
std::string method(){
return _body[KEY_METHOD].asString();
}
// 设置方法名称
void setMethod(const std::string& method_name){
_body[KEY_METHOD] = method_name;
}
// 获取参数
Json::Value params(){
return _body[KEY_PARAMS];
}
// 设置参数
void setParams(const Json::Value& params){
_body[KEY_PARAMS] = params;
}
};
// 主题请求类
class TopicRequest : public JsonRequest{
public:
using ptr = std::shared_ptr<TopicRequest>;
// 检查
virtual bool check() override{
if(_body[KEY_TOPIC_KEY].isNull() == true ||
_body[KEY_TOPIC_KEY].isString() == false){
ELOG("主题请求中没有主题名称或主题名称类型类型错误!");
return false;
}
if(_body[KEY_TOPIC_MSG].isNull() == true ||
_body[KEY_TOPIC_MSG].isString() == false){
ELOG("主题请求中没有主题消息或主题消息类型错误");
return false;
}
if(_body[KEY_OPTYPE].isNull() == true ||
_body[KEY_OPTYPE].isIntegral() == false){
ELOG("主题请求中没有操作类型或操作类型类型错误!");
return false;
}
return true;
}
// 获取主题名称
std::string topicKey(){
return _body[KEY_TOPIC_KEY].asString();
}
// 设置主题名称
void setTopicKey(const std::string& topic_key){
_body[KEY_TOPIC_KEY] = topic_key;
}
// 获取主题消息
std::string topicMsg(){
return _body[KEY_TOPIC_MSG].asString();
}
// 设置主题消息
void setTopicMsg(const std::string& topic_msg){
_body[KEY_TOPIC_MSG] = topic_msg;
}
// 获取主题请求操作类型
TopicOptype optype(){
return (TopicOptype)_body[KEY_OPTYPE].asInt();
}
// 设置主题请求操作类型
void setOptype(TopicOptype optype){
_body[KEY_OPTYPE] = (int)optype;
}
};
// 服务请求类
class ServiceRequest : public JsonResponse{
public:
using ptr = std::shared_ptr<ServiceRequest>;
virtual bool check() override{
if(_body[KEY_METHOD].isNull() == true ||
_body[KEY_METHOD].isString() == false){
ELOG("服务请求中没有方法名称或方法名称类型错误!");
return false;
}
if(_body[KEY_OPTYPE].isNull() == true ||
_body[KEY_OPTYPE].isIntegral() == false){
ELOG("服务请求中没有操作类型或操作类型类型错误");
return false;
}
if(_body[KEY_OPTYPE].asInt() != (int)(ServiceOptype::SERVICE_DISCOVERY) &&
(_body[KEY_HOST].isNull() == true ||
_body[KEY_HOST].isObject() == false ||
_body[KEY_HOST][KEY_HOST_IP].isNull() == true ||
_body[KEY_HOST][KEY_HOST_IP].isString() == false ||
_body[KEY_HOST][KEY_HOST_PORT].isNull() == true ||
_body[KEY_HOST][KEY_HOST_PORT].isIntegral() == false
)){
ELOG("服务请求中主机地址信息错误!");
return false;
}
return true;
}
// 获取方法名称
std::string method(){
return _body[KEY_METHOD].asString();
}
// 设置方法名称
void setMethod(const std::string& method_name){
_body[KEY_METHOD] = method_name;
}
// 获取操作类型
ServiceOptype Serviceoptype(){
return (ServiceOptype)_body[KEY_OPTYPE].asInt();
}
// 设置操作类型
void setServiceOptype(ServiceOptype optype){
_body[KEY_OPTYPE] = (int)optype;
}
// 获取主机地址
Address host(){
Address addr;
addr.first = _body[KEY_HOST][KEY_HOST_IP].asString();
addr.second = _body[KEY_HOST][KEY_HOST_PORT].asInt();
return addr;
}
void setHost(const Address& host){
Json::Value val;
val[KEY_HOST_IP] = host.first;
val[KEY_HOST_PORT] = host.second;
_body[KEY_HOST] = val;
}
};
// RPC响应类
class RpcResponse : public JsonResponse{
public:
using ptr = std::shared_ptr<RpcResponse>;
virtual bool check() override{
// rpc 请求中,包含请求方法名称-字符串,参数字段-对象
if(_body[KEY_RCODE].isNull() == true ||
_body[KEY_RCODE].isIntegral() == false){
ELOG("RPC响应中没有响应状态码或响应状态码类型错误!");
return false;
}
// 因为返回类型多种多样,就不需要进行判断了
if(_body[KEY_RESULT].isNull() == true){
ELOG("RPC响应中没有响应结果或响应结果类型错误!");
return false;
}
return true;
}
// 获取响应结果
Json::Value result(){
return _body[KEY_RESULT];
}
// 设置响应结果
void setResult(const Json::Value& result){
_body[KEY_RESULT] = result;
}
};
// 主题响应类
class TopicResponse : public JsonResponse{
public:
using ptr = std::shared_ptr<TopicResponse>;
};
// 服务响应类
class ServiceResponse : public JsonResponse{
public:
using ptr = std::shared_ptr<ServiceResponse>;
virtual bool check() override{
if(_body[KEY_RCODE].isNull() == true ||
_body[KEY_RCODE].isIntegral() == false){
ELOG("服务响应中没有响应状态码或响应状态码类型错误!");
return false;
}
if(_body[KEY_OPTYPE].isNull() == true ||
_body[KEY_OPTYPE].isIntegral() == false){
ELOG("服务响应中没有操作类型或操作类型的类型错误!");
return false;
}
if(_body[KEY_OPTYPE].asInt() != (int)(ServiceOptype::SERVICE_DISCOVERY) && (
_body[KEY_METHOD].isNull() == true ||
_body[KEY_METHOD].isString() == false ||
_body[KEY_HOST].isNull() == true ||
_body[KEY_HOST].isArray() == false
)){
ELOG("服务发现响应中响应信息错误!");
return false;
}
return true;
}
// 获取响应操作类型
ServiceOptype optype(){
return (ServiceOptype)_body[KEY_OPTYPE].asInt();
}
// 设置响应操作类型
void setOptype(ServiceOptype optype){
_body[KEY_OPTYPE] = (int)optype;
}
// 设置响应方法名称
std::string method(){
return _body[KEY_METHOD].asString();
}
// 设置响应方法
void setMethod(const std::string& method_name){
_body[KEY_METHOD] = method_name;
}
// 获取响应主机地址
std::vector<Address> Hosts(){
std::vector<Address> addrs;
int sz = _body[KEY_HOST].size();
for(int i = 0; i < sz; ++i){
Address addr;
addr.first = _body[KEY_HOST][i][KEY_HOST_IP].asString();
addr.second = _body[KEY_HOST][i][KEY_HOST_PORT].asInt();
addrs.push_back(addr);
}
return addrs;
}
// 设置响应主机地址
void setHost(std::vector<Address> addrs){
for(auto& addr : addrs){
Json::Value val;
val[KEY_HOST_IP] = addr.first;
val[KEY_HOST_PORT] = addr.second;
_body[KEY_HOST].append(val);
}
}
};
// 实现一个消息对象的生产工厂
class MessageFactory{
public:
static BaseMessage::ptr create(MType mtype){
switch(mtype){
case MType::REQ_RPC : return std::make_shared<RpcRequest>();
case MType::RSP_RPC : return std::make_shared<RpcResponse>();
case MType::REQ_TOPIC : return std::make_shared<TopicRequest>();
case MType::RSP_TOPIC : return std::make_shared<TopicResponse>();
case MType::REQ_SERVICE : return std::make_shared<ServiceRequest>();
case MType::RSP_SERVICE : return std::make_shared<ServiceResponse>();
default: return BaseMessage::ptr(); // 类型错误,返回一个空的指针对象
}
}
template<typename T, typename... Args>
static std::shared_ptr<T> create(Args&& ...args){
return std::make_shared<T>(std::forward<Args>(args)...);
}
};
}
👥总结
本篇博文对 【从零实现Json-Rpc框架】- 项目实现 - muduo网络通信类实现篇 做了一个较为详细的介绍,不知道对你有没有帮助呢
觉得博主写得还不错的三连支持下吧!会继续努力的~