CyberRT学习(一)
一、listener和talker整体代码实现研究(自上而下)
1、cyber/expamples目录下listener.cc、talker.cc
(1)talker(发布者) 整体流程,init cyber,然后创建node节点,再通过节点里面的CreateWriter方法构造发布的话题句柄,再通过句柄里面的Write方法进行数据发布。

(2)listener(订阅者)整体流程,init cyber,然后创建node节点,再通过节点里面的CreateReader方法构造订阅的话题,再通过MessageCallback回调,接收话题发布的数据。

2、node节点里面的构造cyber/node/node.cc
构造里面创建两个方法
new NodeChannelImpl(node_name)
new NodeServiceImpl(node_name)
实际上node中调用CreateWriter和CreateReader方法均是在NodeChannelImpl对象里面实现


其核心方法主要是Writer对象里面的Init()方法

Init()方法详解
该函数是Writer<MessageT>类的初始化方法,负责完成写入器的核心资源创建与拓扑注册,采用线程安全、延迟初始化的设计模式。
(1)线程安全初始化检查
(2)传输器动态创建
(3)角色属性更新,将传输器生成的唯一ID(通过HashValue()计算)设置到role_attr_中,用于后续服务发现时标识当前写入器实例,确保拓扑中节点标识的唯一性。
(4)服务发现拓扑注册
频道管理器获取:通过全局单例TopologyManager获取频道管理器channel_manager_,用于后续节点注册与拓扑维护。
拓扑加入:调用JoinTheTopology()方法将当前写入器注册到服务发现系统。

JoinTheTopology()方法详解
(1)注册拓扑变更监听器
事件驱动机制:通过 channel_manager_ 注册一个拓扑变更监听器,当服务发现系统检测到Reader加入/离开通道时,触发 OnChannelChange 回调。
回调绑定:使用 std::bind 将当前对象的成员函数 OnChannelChange 绑定为回调,std::placeholders::_1 表示事件参数(如变更类型、读者属性)会传递给回调。
连接管理:返回的 change_conn_ 是一个监听连接句柄,用于后续在 LeaveTheTopology() 中取消监听(避免内存泄漏)。
设计意图:实现动态拓扑适应,无需重启写入器即可响应读者变化,提升系统弹性。
(2)获取通道reader的列表
从 role_attr_ 中获取当前写入器的通道名称(如 sensor_topic),作为服务发现的标识符。
查询活跃读者:调用 channel_manager_ 的 GetReadersOfChannel 方法,查询当前已订阅该通道的所有读者(Reader)的元数据(如节点ID、消息类型、QoS配置等),结果存储在 readers 向量中。
设计意图:在写入器加入拓扑前,主动发现已存在的读者,建立初始连接,避免读者后加入时的延迟。
(3)启用与Reader的传输连接
(4)正式注册写入器到服务拓扑

Enable方法详解(这个tramsmitter_因不同的mode格式会有不同的实现,默认是HYBRID,所以进到HYBRID的实现)
(1)关系识别与过滤
关系分类器:GetRelation() 根据对端属性(如节点ID、进程ID、主机地址)判断双方关系类型(如进程内、同主机、跨主机),后面根据这个类型关系选择相应的连接方式。
无效过滤:若返回 NO_RELATION(如协议不兼容、安全策略阻止),直接退出,避免无效连接尝试。
设计意图:通过关系分类实现传输协议的智能选择(如进程内用INTRA,同主机用SHM,跨主机用RTPS)。
2. 连接管理与路由映射
线程安全连接管理:通过互斥锁 mutex_ 保护共享状态,确保多线程环境下连接注册的原子
路由表映射:
mapping_table_[relation]:将关系类型映射到传输器索引(如 INTRA=0, SHM=1, RTPS=2)。receivers_:按关系类型分组的读者ID集合,记录当前传输器负责的对端节点。
transmitters_:存储具体传输器实例的数组,根据关系类型选择对应传输器并启用连接。
设计意图:实现连接的多路复用与动态路由,支持单一传输器管理多种协议的连接。
3. 历史消息同步
TransmitHistoryMsg(opposite_attr);
新连接初始化:当新读者加入时,自动触发历史消息回放(如订阅前的关键数据),确保新对端快速同步状态。
设计意图:提升系统的状态一致性与即插即用(Plug-and-Play)能力。

Writer Init()中的 transport::Transport::Instance()->CreateTransmitter<MessageT>(role_attr_);详解
不传入参数默认mode是HYBRID
3、cyber/transport目录详解
Writer Init()中的 transport::Transport::Instance()->CreateTransmitter<MessageT>(role_attr_);详解
不传入参数默认mode是HYBRID

CreateTransmitter()方法实际实现,根据消息类型和传输模式动态创建特定类型的传输器实例(默认类型是不执行Enable)


3.1各种通信模式详解
集中模式详解
- HybridTransmitter:
集成 INTRA(进程内)、SHM(共享内存)、RTPS(跨主机) 三种传输模式,根据通信节点关系(如同一进程、同主机不同进程、跨主机)自动选择最优协议。例如:- 同一进程内通信 → 调用
IntraTransmitter(直接函数调用,零拷贝开销)。 - 同主机不同进程 → 调用
ShmTransmitter(通过共享内存减少序列化/反序列化开销)。 - 跨主机通信 → 调用
RtpsTransmitter(基于 DDS 协议实现可靠传输)。
- 同一进程内通信 → 调用
- 其他模式:
- IntraTransmitter:仅支持进程内通信,通过函数调用直接传递消息,无中间层开销。
- ShmTransmitter:仅支持同主机进程间通信,依赖共享内存实现高效数据交换。
- RtpsTransmitter:仅支持跨主机通信,基于 RTPS 协议(DDS 标准子集)实现可靠传输,但需处理网络层复杂性(如 QoS 协商、心跳检测)。
(1)IntraTransmitter
1. 进程内通信的本质优势
零拷贝传递:同一进程内线程/组件间通信可直接通过函数调用或共享内存块传递消息指针,无需序列化/反序列化(避免CPU开销)。
无网络层开销:无需建立TCP连接、处理网络协议栈(如TCP/IP)、协商QoS参数或管理网络连接状态。
线程安全天然性:通过进程内调度器(如IntraDispatcher)集中管理消息分发,避免多线程竞争。
2、代码拆解
幂等设计:通过enabled_标志确保多次调用不会重复初始化,符合“首次使用初始化
单例调度器:IntraDispatcher::Instance()返回进程内全局唯一的消息调度器,负责:注册消息处理回调函数(如std::bind绑定订阅者函数)。
管理消息队列的线程安全分发(如通过std::mutex保护队列操作)。
触发消息传递的实时性保证(如直接内存访问,延迟纳秒级)。
无资源分配:无需创建共享内存、通知器或网络套接字,避免资源泄漏风险。

(2)ShmTransmitter
动态计数器选择:
Arena模式:根据消息类型(如RawMessage或PyMessageWrap)选择序列化计数器或Arena内存池计数器,适配不同消息处理模式。
非Arena模式:直接使用序列化计数器,简化处理逻辑。
原子操作:fetch_add(1)确保多线程环境下计数器增量的原子性,避免竞争条件。
设计意图:通过接收器计数跟踪活跃连接数量,为后续资源分配和传输器激活提供依据。

接收器计数校验:
双计数器(serialized_receiver_count_和arena_receiver_count_)均需大于零,否则报错退出,确保存在有效接收器。
Arena管理器初始化:
通过单例ProtobufArenaManager启用Arena内存池,并绑定到当前通道ID,实现内存池的复用和管理。错误处理:若Arena启用或段分配失败,记录错误并终止流程,避免无效状态。
共享内存与通知器:
SegmentFactory:根据通道ID创建共享内存段,负责数据存储和跨进程访问。
NotifierFactory:创建跨进程通知器(如信号量、事件),实现数据就绪的同步通知。
状态标记:enabled_ = true表示传输器已准备好发送/接收消息。

(3)RtpsTransmitter
1. 启用状态幂等校验
通过enabled_标志确保多次调用Enable()不会重复初始化,符合“单次激活”原则
2. 依赖组件有效性检查
关键依赖校验:participant_是RTPS协议的核心实体(类似DDS的DomainParticipant),负责管理发布者/订阅者的生命周期。
3. RTPS发布者创建
发布者实例化:通过participant_->CreatePublisher()创建RTPS发布者,绑定到指定通道并应用QoS策略。
QoS配置作用:
可靠性:决定消息是否可靠传输(如RELIABLE模式重传丢失数据,BEST_EFFORT模式忽略丢失)。
历史策略:控制发布者 保留的历史消息数量(如KEEP_LAST_10保留最近10条)。
生命周期:管理发布者的自动激活/销毁条件。
错误处理:若发布者创建失败(如资源不足、参数错误),立即终止初始化并标记未启用。
4. 传输器激活

4、cyber/node/reader.h详解
listener.cc里面的CreanteReader最终走到这个函数体

4.1 reader Init()详解
1、初始化状态检查
使用std::atomic的exchange方法实现线程安全的初始化标记。若已初始化则直接返回true,避免重复初始化。
2、统计模块注册
statistics_center->RegisterChanVar()向全局统计中心注册当前Reader的统计变量(如消息延迟、处理时间等)。role_attr_包含节点名、通道名等元信息。
3、消息处理函数绑定
延迟统计:通过Time::Now()获取当前时间戳,与消息接收时间戳计算处理延迟(proc_done_time - proc_start_time)
网络延迟:通过GetProcStatus获取消息发送时间,与接收时间计算网络传输延迟

续
4、任务调度与绑定
创建DataVisitor作为消息访问器,管理消息队列(容量由pending_queue_size_控制)
通过CreateRoutineFactory将lambda函数包装为协程任务
调度器sched创建任务,任务名由节点名+通道名组成(如node_channel)
5、拓扑管理与接收器绑定
从接收器管理器获取对应通道的接收器实例
设置Reader的唯一标识(基于接收器ID的哈希值)
获取全局通道管理器实例
调用JoinTheTopology()将Reader注册到拓扑图,实现服务发现

4.2 reader 中JoinTheTopology函数详解
1、添加拓扑变更监听器
功能:注册通道变更回调函数OnChannelChange,当拓扑中写入器(Writer)增删时触发
实现:通过std::bind绑定成员函数到当前对象,std::placeholders::_1表示回调时传入变更事件参数
作用:实现动态拓扑感知,确保Reader能及时响应写入器变化(如新Writer上线/下线)
2、获取并连接对端写入器
流程 :
- 从
role_attr_提取通道名称 - 通过
channel_manager_查询该通道当前所有写入器(Writer) - 遍历写入器列表,调用
receiver_->Enable(writer)建立连接
设计意图:主动连接已存在的写入器,实现即插即用的服务发现
关键点:Enable方法可能完成TCP连接建立、QoS协商、序列化协议匹配等底层操作
3、注册自身到全局拓扑
参数解析:
role_attr_:包含节点名、通道名、ID等元信息
ROLE_READER:声明当前组件为Reader角色
HasSerializer<MessageT>::value:指示消息类型是否需要序列化(影响传输协议选择)
作用:将Reader注册到全局拓扑管理器,使其他组件(如Writer)能通过服务发现找到该Reader

