施磊老师rpc(四)
文章目录
- rpc网络服务
- 简介
- RpcProvider 的设计目标
- Eventloop不使用智能指针-弃用
- RpcProvider类似于集群的服务器
- provider网络实现
- **src/include/rpcprovider.h**
- **src/include/mprpcapplication.h**
- **src/rpcprovider.cc**
- 错误1
- 错误2-重点
- **本项目的 mprpc 是动态库, muduo..是静态库**
- 重点错误!
- 详见知识补充
- 错误3
- 测试
- RpcProvider发布服务方法(一)
- RPC Provider的核心功能
- RPC Provider 网络模块实现
- RPC 服务方法的发布功能
- 框架提供的接口:
- 角色说明
- run函数的流程
- `NotifyService`的作用
- 框架帮我们做了什么?
- RpcProvider发布服务方法(二)
- 实现 `RpcProvider::NotifyService`
- 为什么这样做(目的):
- 框架中处理服务的通用做法
- 具体实现步骤(NotifyService 方法逻辑)
- 错误-1
- **解决方法**
- NotifyService实现
- 单个服务:**Service**
- 所有服务的集合:**服务注册表(Service Registry)** 或 **服务映射表(Service Map)**
- **src/include/rpcprovider.h**
- **src/rpcprovider.cc**
- 注意
- 示例
- 总结:为什么我们选择 Protocol Buffer 而不是 JSON
- 常见的选择:
- JSON 和 Protobuf 的对比:
- 为什么 Protobuf 更适合 RPC 场景:
- RpcProvider分发rpc服务(一)
- 本节课任务
- 回顾
- onMessage 中要做什么?
- 理解流程-不要混
- 理解错误
- **序列化后的字符串 和 传输的字节流**
- **以json为例**
- 英语单词
- **Customer(顾客)**
- **Consumer(消费者)**
- json vs proto反序列化
- 1. **Proto需要预定义数据结构**
- 2. **Proto生成代码**
- 3. **数据反序列化过程**
- 4. **数据格式**
- 数据头
- json也可以 数据头
- protobuf使用不太一样---具体看实现
- 粘包处理机制
- **例子**
- 为什么不用“加竖杠分隔字符串”的做法?
- 上不了 台面的 玩意儿!! 太勾八垃圾了
- proto反序列化
- **自定义协议设计(数据格式)**
- 示例
- Proto 实现
- 数据处理流程总结(onMessage 中)
- **这是整个流程, 本节课仅完成 前三个步骤**
- 对protobuf字节流的理解-不一定对
- **重点理解**
- onmessage实现-部分
- 总结
- 小技巧:读取定长二进制整数--暂时还没遇到问题
rpc网络服务
简介
从 配置文件 读取 ip和port 后, 就需要 进行连接 传输 了
也就是 rpc框架准备好了, 现在需要 网络, 服务方 才能发布 rpc, 然后 消费端 才能 连接 去调用
RpcProvider 的设计目标
- 让使用者只需要注册服务即可,不暴露
muduo
网络细节。 - 从配置文件中自动加载 IP 和端口,避免用户手动输入。
- 封装
muduo
的启动流程,让RpcProvider::Run()
启动网络服务。 - 使用基于 Reactor 模型的高性能网络服务。
Eventloop不使用智能指针-弃用
在 muduo 网络库中,
EventLoop
通常不使用智能指针管理,而是直接使用原始指针,主要原因如下:
- 生命周期明确
EventLoop
通常是长期存在的对象(如伴随整个线程或程序生命周期),其销毁时机由代码逻辑直接控制,无需智能指针自动管理。- 栈对象主导
muduo 的设计中,EventLoop
多作为栈对象(如EventLoop loop;
),依赖作用域自动析构,无需堆内存分配和智能指针介入。- 从属关系清晰
TcpServer
仅持有EventLoop
的指针(表示依赖关系),而非所有权。EventLoop
的实际生命周期由更高层(如main()
或线程函数)管理,避免所有权混乱。- 性能与简洁性
原始指针更轻量,符合 muduo 高性能网络库的设计目标,同时代码更直观(智能指针在此场景无显著优势)。总结:
EventLoop
的生命周期由应用逻辑显式控制,且多为栈对象,使用原始指针更符合 muduo 的设计哲学和实际使用模式。
RpcProvider类似于集群的服务器
类似, 并不完全相同
provider网络实现
[!TIP]
这个
run 函数
就相当于 集群聊天里 的main
调用 chatserver 差不多
src/include/rpcprovider.h
private: // // 组合TcpServer ----- 不写成成员了, 只有 run 会访问 // std::unique_ptr<muduo::net::TcpServer> m_pTcpServer; // 智能指针 // 组合 EventLoop muduo::net::EventLoop m_eventLoop; // 事件循环 void OnConnection(const muduo::net::TcpConnectionPtr &conn); // 连接回调函数void OnMessage(const muduo::net::TcpConnectionPtr &conn, // 消息回调函数muduo::net::Buffer *buffer,muduo::Timestamp time);
src/include/mprpcapplication.h
// 获取配置文件对象static MprpcConfig &GetConfig();
.cc
// 获取配置文件对象 MprpcConfig &MprpcApplication::GetConfig() {return m_config; // 返回配置文件对象 }
[!tip]
如果 有学过 集群项目
run 里面的
muduo部分
将是 融合的 集群服务器的main, chatserver
src/rpcprovider.cc
// 启动rpc服务发布 节点, 开始提供rpc远程网络调用服务 void RpcProvider::Run() {std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());muduo::net::InetAddress addr(ip, port); // 绑定ip和端口号// 创建TcpServer对象muduo::net::TcpServer server(&m_eventLoop, addr, "RpcProvider");// 设置线程数量server.setThreadNum(4);// 设置连接回调函数server.setConnectionCallback(std::bind(&RpcProvider::OnConnection, this, std::placeholders::_1));// 设置消息回调函数server.setMessageCallback(std::bind(&RpcProvider::OnMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));std::cout << "RpcProvider start service " << std::endl;// 启动服务server.start();// 事件循环m_eventLoop.loop(); }// 实现连接回调函数 void RpcProvider::OnConnection(const muduo::net::TcpConnectionPtr &conn) { }// 实现消息回调函数 void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn, // 消息回调函数muduo::net::Buffer *buffer,muduo::Timestamp time) {// 1. 解析rpc请求数据// 2. 生成响应数据// 3. 发送响应数据 }
由于使用了 muduo, 因此 cmke 要添加 编译选项
错误1
如果 cmake 加的编译选项是 muduo, 一般会 找不到库
find /usr -name "libmuduo*"
/usr/local/lib/libmuduo_base.a
/usr/local/lib/libmuduo_net.a
/usr/local/lib/libmuduo_http.a
/usr/local/lib/libmuduo_inspect.a
[!important]
muduo 变为 muduo_net muduo_base
顺序必须一样, net依赖base
一般是这些库, 没有 muduo.a
前两个最重要
库文件 | 功能说明 | 典型依赖关系 |
---|---|---|
libmuduo_base.a | 基础核心库,包含 EventLoop 、Timestamp 、Logging 等基础工具类 | 无依赖,是其他模块的基础 |
libmuduo_net.a | 网络通信库,提供 TcpServer 、TcpClient 、Buffer 等网络相关类 | 依赖 libmuduo_base.a |
libmuduo_http.a | HTTP 协议支持库,提供简单的 HTTP 服务器和客户端功能 | 依赖 libmuduo_net.a |
libmuduo_inspect.a | 调试监控库,支持通过 HTTP 接口查看服务器内部状态(如连接数、线程状态) | 依赖 libmuduo_http.a |
错误2-重点
[build] /usr/bin/ld: /usr/local/lib/libmuduo_net.a(EventLoop.cc.o): relocation R_X86_64_TPOFF32 against `_ZN12_GLOBAL__N_118t_loopInThisThreadE' can not be used when making a shared object; recompile with -fPIC
[build] /usr/bin/ld: failed to set dynamic section sizes: bad value
[build] collect2: error: ld returned 1 exit status
[build] gmake[2]: *** [src/CMakeFiles/mprpc.dir/build.make:129: ../lib/libmprpc.so] Error 1
[build] gmake[1]: *** [CMakeFiles/Makefile2:131: src/CMakeFiles/mprpc.dir/all] Error 2
本项目的 mprpc 是动态库, muduo…是静态库
错误的原因:你在编译动态库时,链接了一个静态库(
libmuduo_net.a
),但是这个静态库中的对象文件(EventLoop.cc.o
)没有使用-fPIC
编译。这导致链接器出现了 “relocation can not be used when making a shared object” 错误,因为静态库中的代码是位置相关的,不能直接与动态库链接。
[!important]
重点错误!
当你在构建动态库(
.so
)时,链接了一个静态库(.a
),但是静态库中的代码没有使用 位置无关代码(Position Independent Code,PIC) 编译选项(即-fPIC
),就会出现链接错误,通常表现为类似以下的错误信息:relocation R_X86_64_TPOFF32 against `symbol` can not be used when making a shared object; recompile with -fPIC
详见知识补充
静态库 一般不需要 -fPIC, 一般只有动态库需要
但是, 当 静态库 需要连接到 动态库时, 静态库必须加 -fPIC 选项
错误3
muduo_net muduo_base
顺序必须一样, net依赖base
测试
自行
RpcProvider发布服务方法(一)
[!tip]
建议看 原视频, 过程讲的很清晰
RPC Provider的核心功能
- 已实现功能:(run函数)
- 网络模块:基于Muduo库实现TCP服务器,处理网络数据收发(绿色部分)。
- 事件循环与回调:通过
onConnection
和onMessage
回调处理连接和消息。
- 待实现功能(黄色部分):
- 服务方法注册与映射:通过
NotifyService
记录服务对象及其方法,供远程调用时定位。
- 服务方法注册与映射:通过
RPC Provider 网络模块实现
- 利用
muduo
网络库,实现了 RPC Provider 的网络通信能力; - 涉及
TcpServer
和EventLoop
以及两个核心回调函数:onConnection
:新连接建立onMessage
:接收远程调用请求(字节流)
RPC 服务方法的发布功能
目的:让使用者能够将本地服务对象的方法注册为远程可调用的 RPC 方法。
框架提供的接口:
void notifyService(::google::protobuf::Service* service);
- 参数:是一个继承自
protobuf::Service
的指针; - 使用者将自定义的服务类(如
UserService
)传入,即可注册它的所有 RPC 方法。
角色说明
名称 | 说明 |
---|---|
caller | RPC 的调用者(客户端) |
callee | RPC 的提供者(服务端) |
mprpc 框架 | 提供 mprpcApplication 、RpcProvider 等功能模块 |
[!tip]
任何 分布式节点 都可能成为一个 rpc服务器—collee, 也可能 请求调用 其他rpc方法 — coller
run函数的流程
流程示例(以UserService::Login
为例):
- 接收请求:RPC Provider通过网络模块接收字节流。
- 反序列化:Protobuf将数据解析为
LoginRequest
对象。 - 方法调用:框架查表找到
UserService
的Login
方法并调用。 - 处理响应:用户代码填充
LoginResponse
,框架序列化后通过网络返回。
NotifyService
的作用
-
核心目标:建立服务对象与方法调用的映射表,使框架能根据请求调用正确的本地方法。
-
记录服务对象(如
UserServiceRpc
);记录该服务对象中有哪些方法(login、register 等);
使用 protobuf 提供的反射接口,获取:
- service 名称;
- 每个方法的名称;
- 方法的编号 / 反射调用方式。
框架帮我们做了什么?
步骤 | 谁来做? | 作用 |
---|---|---|
接收字节流 | 网络模块(muduo) | 收到 RPC 请求 |
反序列化 | protobuf | 还原请求对象 |
定位方法 | 框架内部映射表 | 找到服务和函数 |
反射调用 | protobuf + 注册表 | 调用我们自己重写的逻辑 |
回传结果 | 框架负责序列化 + 网络发送 | 客户端接收结果 |
RpcProvider发布服务方法(二)
实现 RpcProvider::NotifyService
为什么这样做(目的):
- 将用户定义的 service 对象(如
UserService
)注册到框架中。 - 框架后续通过 service 名和 method 名即可定位到具体的服务方法,实现远程调用。
- 抽象性设计原则:框架不依赖具体业务类,仅依赖于 protobuf 的
Service
基类。
框架中处理服务的通用做法
-
使用
google::protobuf::Service
作为服务对象的基类指针。 -
利用
protobuf
的 反射机制(通过 descriptor 描述服务和方法),实现对服务对象的动态管理。 -
// pb.h 里面 server的子类里 static const ::PROTOBUF_NAMESPACE_ID::ServiceDescriptor* descriptor();
-
至于 这个 类型里 有什么, 点进去看即可 , 挺多的, 下面这个不全
-
具体实现步骤(NotifyService 方法逻辑)
- 获取服务描述信息
使用service->GetDescriptor()
得到服务的元数据,包括服务名称、方法个数等。- 提取服务名
用descriptor->name()
获取服务的唯一标识名(如UserServiceRpc
)。- 提取每个方法的描述信息
通过循环descriptor->method(i)
获取每个方法的MethodDescriptor
,记录方法名等。- 定义数据结构用于存储注册信息
自定义一个ServiceInfo
结构体:
- 包含一个服务对象指针
Service*
.- 包含一个
unordered_map<string, const MethodDescriptor*>
存储方法名与其描述对象的映射。- 维护全局服务映射表
用unordered_map<string, ServiceInfo>
,键是服务名,值是该服务的所有信息(对象+方法表)。- 将服务及其方法插入映射表中
完成注册,后续框架收到请求时即可根据服务名和方法名快速查找、调用对应的业务逻辑。
错误-1
不允许使用指向不完整类型 “google::protobuf::ServiceDescriptor” 的指针或引用
这个错误表明编译器在处理
google::protobuf::ServiceDescriptor
时认为它是一个不完整类型(incomplete type),即编译器看到了它的声明(比如前向声明),但没有看到完整的定义。这通常是因为缺少对应的头文件包含。
解决方法
#include <google/protobuf/descriptor.h> // 这个头文件里有 ServiceDescriptor
NotifyService实现
[!important]
每个
ServiceInfo
记录一个服务及其所有方法.整个服务注册表维护多个这样的服务信息,支持多服务统一管理。
服务注册表(map<string, ServiceInfo>)
└── "UserServiceRpc" → ServiceInfo├── service指针(UserServiceRpc*)└── method_map(map<string, const MethodDescriptor*>)├── "Login" → MethodDescriptor*└── "Register" → MethodDescriptor*
└── "FriendServiceRpc" → ServiceInfo├── service_ptr: FriendServiceRpc*└── method_map├── "AddFriend" → AddFriend 方法的描述符└── "GetList" → GetList 方法的描述符
当你调用
NotifyService(UserServiceRpc*)
:
- 通过服务对象获取其描述信息(ServiceDescriptor)。
- 从描述信息中提取出:
- 服务名称(如 “UserServiceRpc”)
- 每一个方法的:
- 方法名(如 “Login”, “Register”)
- 输入参数类型(如
LoginRequest
)- 输出参数类型(如
LoginResponse
)
在 RPC 框架中,命名一般如下:
单个服务:Service
指的是一个具体的业务服务,比如
UserService
、OrderService
,通常是继承自google::protobuf::Service
的类实例。它包含多个可以远程调用的方法(RPC 方法)。所有服务的集合:服务注册表(Service Registry) 或 服务映射表(Service Map)
src/include/rpcprovider.h
// private 添加 // service 服务类型信息----服务名及方法名struct ServiceInfo{google::protobuf::Service *m_service; // 服务对象std::unordered_map<std::string, const google::protobuf::MethodDescriptor *> m_methodMap; // 方法名和方法描述对象的映射关系};// 存储服务对象的容器std::unordered_map<std::string, ServiceInfo> m_serviceMap; // 服务名和服务对象的映射关系
src/rpcprovider.cc
// 这里是框架提供给外部使用的, 可以发布rpc方法的函数接口 void RpcProvider::NotifyService(google::protobuf::Service *server) {ServiceInfo service_info; // 服务对象信息// 获取服务对象描述信息---- 去看看 pb.h 和 pb.cc 这个接口, 是const!!const google::protobuf::ServiceDescriptor *pserviceDesc = server->GetDescriptor(); // 获取服务名称std::string service_name = pserviceDesc->name(); // 获取服务名称std::cout << "service_name: " << service_name << std::endl;// 获取服务方法数量int method_count = pserviceDesc->method_count(); // 获取服务方法数量// 获取服务对象指定下标的方法描述信息for (int i = 0; i < method_count; ++i){const google::protobuf::MethodDescriptor *pMethodDesc = pserviceDesc->method(i); // 获取服务对象指定下标的方法描述信息// 获取方法名称std::string method_name = pMethodDesc->name(); // 获取方法名称service_info.m_methodMap.insert({method_name, pMethodDesc}); // 将方法名称和方法描述对象的映射关系存入容器std::cout << "method_name: " << method_name << std::endl;}service_info.m_service = server; // 将服务对象存入容器// 将服务名称和服务对象存入容器m_serviceMap.insert({service_name, service_info}); }
注意
[!warning]
NotifyService
函数 每次 只注册一个 服务想要注册多个 服务, 就要 多次进行调用
示例
// 用户代码:注册多个服务 UserServiceRpc user_service; OrderServiceRpc order_service; provider.NotifyService(&user_service); // 注册 UserService provider.NotifyService(&order_service); // 注册 OrderService
总结:为什么我们选择 Protocol Buffer 而不是 JSON
在网络通信中,必须选用一种数据传输协议来进行结构化数据的交换。不能直接传输原始的字节流或字符串,因为我们需要明确区分不同的字段、类型和结构,这就需要一种标准化的“数据格式”。
常见的选择:
- XML:过于冗长、效率低,已经很少使用。
- JSON:结构清晰,易于阅读,学习成本低,但效率偏低。
- Protocol Buffer(Protobuf):谷歌开发的高效二进制序列化协议。
JSON 和 Protobuf 的对比:
项目 JSON Protobuf 存储格式 文本(可读性好) 二进制(效率高) 序列化性能 较慢 非常快 传输体积 大(包含键名) 小(紧凑、无额外字段名) 类型系统 弱类型(依赖解析库) 强类型(.proto 明确定义) 支持 RPC 方法 不支持 支持 Service 定义和方法描述(gRPC) 为什么 Protobuf 更适合 RPC 场景:
- 它不仅提供数据结构的序列化与反序列化功能(像 JSON 一样),
- 还支持对服务(service)和方法(rpc method)的描述,可以用于自动生成代码、进行远程调用处理(这是 JSON 无法做到的)。
因此,在构建聊天服务器等高性能通信系统时,我们更倾向于使用 Protobuf 而不是 JSON,尤其是在服务之间通过 RPC 调用的场景下,它能极大提升效率与可维护性。
RpcProvider分发rpc服务(一)
本节课任务
完成proto的反序列化
完成 RPC 框架中 Provider 端的 onMessage
方法,实现:
- 从网络接收 RPC 请求数据;
- 解析出请求的目标服务、方法名、参数;
- 找到对应方法并调用;
- 返回结果给 Client。
回顾
notifyService()
中已将用户发布的服务和方法注册进 Map 中(类比于“服务表”)。run()
方法完成了网络监听和服务启动。onConnection()
:短连接模型下,当断开连接时关闭 socket(shutdown(fd, SHUT_RDWR)
)。
onMessage 中要做什么?
收到的数据本质是一个字符流(字节流),必须:
- 拆包:防止 TCP 粘包;
- 反序列化:根据协议提取出
serviceName
、methodName
和args
; - 查找目标方法并调用;
- 序列化响应并返回给 Client。
理解流程-不要混
【客户端】 【服务端】RPCStub::login() onMessage()↓ ↓
序列化:方法名+参数 <----- 收到字节流↓ ↓
通过 TCP 发出请求 -----> 提取数据(Buffer)↓RPC框架反序列化↓映射到本地的 login()↓执行并获取返回值↓序列化返回值,send()
[!important]
RPC 是目的,Muduo 是网络通信框架,onMessage 是 Muduo 中处理收到消息的“钩子函数”,它帮我们解包网络数据,并触发 RPC 调用。
理解错误
序列化后的字符串 和 传输的字节流
你发送的是字符串(字节数组),网络传输的是流(字节流),收到后要自己从流中“分段”,提取成完整字符串,再反序列化还原数据。
muduo 中的 conn->send() 使得 序列化后的 数据 以字节流 传输
因此,
onmessage
基本第一步 都是把 收到的字节流 转为 字符串, 然后 再反序列化----->以json为例
{"name": "Tom","age": 18 }
// 序列化 std::string jsonStr = "{\"name\":\"Tom\",\"age\":18}";
// 字节流 0x7b 0x22 0x6e 0x61 0x6d 0x65 0x22 0x3a 0x22 0x54 0x6f 0x6d 0x22 0x2c ...
// 再转字符串, 并 反序列化
英语单词
Customer(顾客)
Consumer(消费者)
json vs proto反序列化
1. Proto需要预定义数据结构
- JSON: 通常是动态的,不需要预定义数据结构,你可以直接使用字符串和数字等常见数据类型。
- Proto: 使用 proto 文件 定义数据结构,所有的消息格式需要事先声明和编译生成相应的代码(例如
.proto
文件生成 C++、Java 或 Python 代码)。这意味着你必须严格遵循定义好的消息结构才能正确地进行序列化和反序列化。2. Proto生成代码
- JSON: 不需要生成任何代码,直接通过标准库(如
nlohmann/json
、rapidjson
)进行处理。- Proto: 需要用 protoc 编译器将
.proto
文件转换为特定语言的类。这些类中会包含生成的 getter 和 setter 方法来访问字段。3. 数据反序列化过程
- JSON: 在反序列化时,直接通过 JSON 库将字符串转为 JSON 对象,通常代码简洁。----
json::parse(recvBuf);
- Proto: 反序列化时,需要将字节流(例如,网络传输的消息)解析为特定的 protobuf 对象。这个过程稍微复杂一些,通常涉及到对序列化数据流的解析,调用
ParseFromString
或类似的方法。4. 数据格式
- JSON: 是文本格式,便于阅读和调试,但占用空间相对较大。
- Proto: 是二进制格式,具有更高的性能和更小的消息体积,但不容易直接读取和调试。
数据头
json也可以 数据头
#include <iostream> #include <nlohmann/json.hpp>using json = nlohmann::json;struct Header {std::string message_type;size_t message_length;int checksum;std::string protocol_version; };struct Message {Header header;json body; };int main() {// 构建 JSON 数据体json body = {{"userId", 1},{"name", "Alice"},{"age", 30}};// 构建数据头Header header = {"Request", // 消息类型body.dump().size(), // 消息体的长度12345, // 校验和"1.0" // 协议版本};// 构建完整消息Message message = { header, body };// 输出完整消息(仅示意)std::cout << "Header:\n";std::cout << "Message Type: " << message.header.message_type << "\n";std::cout << "Message Length: " << message.header.message_length << "\n";std::cout << "Checksum: " << message.header.checksum << "\n";std::cout << "Protocol Version: " << message.header.protocol_version << "\n";std::cout << "\nBody:\n";std::cout << message.body.dump() << std::endl;return 0; }
protobuf使用不太一样—具体看实现
粘包处理机制
TCP 是流式协议,不能保证一次接收的数据就是一整包。为此我们必须:
- 用固定长度(4字节)作为 header 长度标志;
- 然后再根据这个长度去读取 header,再读取参数。
例子
假设客户端连续发送了三个消息,每个消息的长度为 5 字节:
Message 1: "Hello" Message 2: "World" Message 3: "Data"
然而,TCP 协议会将这些数据合并成一个连续的字节流进行传输,可能会变成如下形式(这仅是一个例子,实际数据可能更复杂):
HelloWorldData
接收方收到的就是一个连续的数据流:“HelloWorldData”,无法直接知道哪个部分属于哪个消息。
为什么不用“加竖杠分隔字符串”的做法?
上不了 台面的 玩意儿!! 太勾八垃圾了
因为:
- 不规范、性能差;
- 不支持嵌套结构;
- 不可扩展;
- 容易出错;
正确做法是采用 protobuf 作为结构化序列化协议,明确字段类型和结构。
proto反序列化
自定义协议设计(数据格式)
RPC Client 与 RPC Server 之间使用自定义协议通信。数据格式如下:
[4字节 header size][header 字符串][args 字符串]
- header size:4 字节整数(使用二进制形式存储,不能转字符串!)
- header 字符串:使用 protobuf 定义的
RpcHeader
,包含:
service_name
method_name
args_size
(参数部分的字符串长度)- args 字符串:参数 message 的序列化结果
示例
内容类型 十六进制内容 含义 头部长度 0x00 0x00 0x00 0x12
18 字节,表示后面数据头部分长度为 18 字节 数据头 {"method":"Chat"}
一个 JSON 字符串,18 字节,表示调用的 RPC 方法名是 Chat 消息体 "hello world"
实际发送的消息内容
Proto 实现
syntax = "proto3";
package mprpc;message RpcHeader {string service_name = 1;string method_name = 2;uint32 args_size = 3;
}
生成对应 .pb.h
、.pb.cc
后在 onMessage
中使用。
数据处理流程总结(onMessage 中)
一个 char 占用 1个字节(8bit)
string 每个字符 就是一个字节
std::string[index]
操作的是字节
这是整个流程, 本节课仅完成 前三个步骤
- 读取前 4 字节 ➜ 得到 header 的长度; ----- 这个长度 不能用字符串, 因为长度 就不固定了!!! “10” “10000”
- 读取 header 字符串 ➜
RpcHeader::ParseFromString()
得到 service/method/args_size;- 读取 args 部分 ➜ 根据 args_size 读取参数字符串;
- 从 map 中查找 service 对象;
- 通过 method_name 查找 MethodDescriptor;
- 调用
CallMethod()
执行目标服务逻辑;- 将响应结果 serialize 成字符串,通过 TCP 发回 client。
对protobuf字节流的理解-不一定对
重点理解
在写好 proto 文件后, 就有了 类似于键值对的 东西, 但是 protobuf 自动完成了封装, 也就是 message类, 使用标签 就可以知道 是哪个字段
使用时, 就相当于 是是用类, 定义一个 message 对象, 进行 set_字段 赋值
序列化后
SerializeToString
, 可能变为了标签:值
这样的, 1:name 2:pwd传输可能就是 这么传输
然后 反序列化
ParseFromString
后, 使用.字段()
就能直接拿到值, 这在内部 protobuf 本身进行了封装, 使得 调用 直接就拿到了 值
onmessage实现-部分
src/rpcprovider.cc
#include "rpcheader.pb.h" // 这个头文件里有 RpcHeader/*
在框架内部, rpcprovider 和 rpcconsumer 协商好 之间的协议, 约定好数据的格式
service_name method_name args
UserServiceLoginzhang san123456 传这样的 肯定不行,需要拆分
定义 proto 的 message 类型, 进行 数据头的 序列化和 反序列化\加上头部长度
16UserServiceLoginzhang san123456
16 4个字节的头部长度
从 4个子节后 取出 16
UserServiceLoginzhang san123456
*/// 实现消息回调函数
// 已建立连接用户的 读写事件回调, 如果远程有 rpc服务的 调用请求, 那么onMessage 就会被调用
void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn, // 消息回调函数muduo::net::Buffer *buffer,muduo::Timestamp time)
{// 网络上接收的远程rpc调用请求的字符流 std::string recv_buf = buffer->retrieveAllAsString(); // 获取接收的字符流uint32_t header_len = 0; // 定义头部长度recv_buf.copy((char *)&header_len, 4, 0); // 本项目 自定义 是 4个字节的头部长度// 根据头部长度, 获取到数据头的内容 // recv_buf 是一个字符串, 头部长度是4个字节, 所以从第4个字节开始, 取header_len长度的内容std::string rpc_header_str = recv_buf.substr(4, header_len);// 反序列化数据, 得到rpc请求的 详细信息mprpc::RpcHeader rpc_header; // 定义rpc请求头对象std::string service_name; // 定义服务名称std::string method_name; // 定义方法名称uint32_t args_size = 0; // 调用方法所要的参数大小if(rpc_header.ParseFromString(rpc_header_str)) // 反序列化数据, 得到rpc请求的 详细信息{rpc_header.service_name(); // 获取服务名称service_name = rpc_header.service_name(); // 获取服务名称method_name = rpc_header.method_name(); // 获取方法名称}else{std::cout << "rpc_header parse error" << std::endl;return;}// 获取 rpc的方法参数 的字符流数据std::string args_str = recv_buf.substr(4 + header_len, recv_buf.size() - 4 - header_len); // 获取方法参数的字符流数据// 打印调试信息std::cout<<"============================="<<std::endl;std::cout<<"header_len: "<<header_len<<std::endl;std::cout<<"rpc_header_str: "<<rpc_header_str<<std::endl;std::cout<<"service_name: "<<service_name<<std::endl;std::cout<<"method_name: "<<method_name<<std::endl;std::cout<<"args_size: "<<args_str.size()<<std::endl;std::cout<<"args_str: "<<args_str<<std::endl;std::cout<<"============================="<<std::endl;}
总结
本节完成 proto 和 onmessage 实现
小技巧:读取定长二进制整数–暂时还没遇到问题
C++ 里读取头部 4 字节为整数的方法(假设 buffer 是 std::string):
uint32_t header_size = 0;
memcpy(&header_size, recv_str.data(), sizeof(uint32_t));
注意字节序问题(本地是小端序)——视项目需求是否使用 ntohl()
。