当前位置: 首页 > news >正文

CyberRT学习(一)

一、listener和talker整体代码实现研究(自上而下)

1、cyber/expamples目录下listener.cc、talker.cc

(1)talker(发布者) 整体流程,init cyber,然后创建node节点,再通过节点里面的CreateWriter方法构造发布的话题句柄,再通过句柄里面的Write方法进行数据发布。

(2)listener(订阅者)整体流程,init cyber,然后创建node节点,再通过节点里面的CreateReader方法构造订阅的话题,再通过MessageCallback回调,接收话题发布的数据。

2、node节点里面的构造cyber/node/node.cc

构造里面创建两个方法

new NodeChannelImpl(node_name)

new NodeServiceImpl(node_name)

实际上node中调用CreateWriter和CreateReader方法均是在NodeChannelImpl对象里面实现

其核心方法主要是Writer对象里面的Init()方法

Init()方法详解

该函数是Writer<MessageT>类的初始化方法,负责完成写入器的核心资源创建与拓扑注册,采用线程安全、延迟初始化的设计模式。

(1)线程安全初始化检查

(2)传输器动态创建

(3)角色属性更新,将传输器生成的唯一ID(通过HashValue()计算)设置到role_attr_中,用于后续服务发现时标识当前写入器实例,确保拓扑中节点标识的唯一性。

(4)服务发现拓扑注册

频道管理器获取:通过全局单例TopologyManager获取频道管理器channel_manager_,用于后续节点注册与拓扑维护。

拓扑加入:调用JoinTheTopology()方法将当前写入器注册到服务发现系统。

JoinTheTopology()方法详解

(1)注册拓扑变更监听器

事件驱动机制:通过 channel_manager_ 注册一个拓扑变更监听器,当服务发现系统检测到Reader加入/离开通道时,触发 OnChannelChange 回调。

回调绑定:使用 std::bind 将当前对象的成员函数 OnChannelChange 绑定为回调,std::placeholders::_1 表示事件参数(如变更类型、读者属性)会传递给回调。

连接管理:返回的 change_conn_ 是一个监听连接句柄,用于后续在 LeaveTheTopology() 中取消监听(避免内存泄漏)。

设计意图:实现动态拓扑适应,无需重启写入器即可响应读者变化,提升系统弹性。

(2)获取通道reader的列表

从 role_attr_ 中获取当前写入器的通道名称(如 sensor_topic),作为服务发现的标识符。

查询活跃读者:调用 channel_manager_ 的 GetReadersOfChannel 方法,查询当前已订阅该通道的所有读者(Reader)的元数据(如节点ID、消息类型、QoS配置等),结果存储在 readers 向量中。

设计意图:在写入器加入拓扑前,主动发现已存在的读者,建立初始连接,避免读者后加入时的延迟。

(3)启用与Reader的传输连接

(4)正式注册写入器到服务拓扑

Enable方法详解(这个tramsmitter_因不同的mode格式会有不同的实现,默认是HYBRID,所以进到HYBRID的实现)

(1)关系识别与过滤

关系分类器:GetRelation() 根据对端属性(如节点ID、进程ID、主机地址)判断双方关系类型(如进程内、同主机、跨主机),后面根据这个类型关系选择相应的连接方式。

无效过滤:若返回 NO_RELATION(如协议不兼容、安全策略阻止),直接退出,避免无效连接尝试。

设计意图:通过关系分类实现传输协议的智能选择(如进程内用INTRA,同主机用SHM,跨主机用RTPS)。

 2. 连接管理与路由映射

线程安全连接管理:通过互斥锁 mutex_ 保护共享状态,确保多线程环境下连接注册的原子

路由表映射:

mapping_table_[relation]:将关系类型映射到传输器索引(如 INTRA=0, SHM=1, RTPS=2)。receivers_:按关系类型分组的读者ID集合,记录当前传输器负责的对端节点。

transmitters_:存储具体传输器实例的数组,根据关系类型选择对应传输器并启用连接。

设计意图:实现连接的多路复用与动态路由,支持单一传输器管理多种协议的连接。

 3. 历史消息同步

TransmitHistoryMsg(opposite_attr);

新连接初始化:当新读者加入时,自动触发历史消息回放(如订阅前的关键数据),确保新对端快速同步状态。

设计意图:提升系统的状态一致性即插即用(Plug-and-Play)能力

Writer Init()中的 transport::Transport::Instance()->CreateTransmitter<MessageT>(role_attr_);详解

不传入参数默认mode是HYBRID

3、cyber/transport目录详解

Writer Init()中的 transport::Transport::Instance()->CreateTransmitter<MessageT>(role_attr_);详解

不传入参数默认mode是HYBRID

CreateTransmitter()方法实际实现,根据消息类型和传输模式动态创建特定类型的传输器实例(默认类型是不执行Enable)

3.1各种通信模式详解

集中模式详解

  • HybridTransmitter
    集成 INTRA(进程内)、SHM(共享内存)、RTPS(跨主机) 三种传输模式,根据通信节点关系(如同一进程、同主机不同进程、跨主机)自动选择最优协议。例如:
    • 同一进程内通信 → 调用 IntraTransmitter(直接函数调用,零拷贝开销)。
    • 同主机不同进程 → 调用 ShmTransmitter(通过共享内存减少序列化/反序列化开销)。
    • 跨主机通信 → 调用 RtpsTransmitter(基于 DDS 协议实现可靠传输)。
  • 其他模式
    • IntraTransmitter:仅支持进程内通信,通过函数调用直接传递消息,无中间层开销。
    • ShmTransmitter:仅支持同主机进程间通信,依赖共享内存实现高效数据交换。
    • RtpsTransmitter:仅支持跨主机通信,基于 RTPS 协议(DDS 标准子集)实现可靠传输,但需处理网络层复杂性(如 QoS 协商、心跳检测)。

(1)IntraTransmitter

1. 进程内通信的本质优势

零拷贝传递:同一进程内线程/组件间通信可直接通过函数调用或共享内存块传递消息指针,无需序列化/反序列化(避免CPU开销)。

无网络层开销:无需建立TCP连接、处理网络协议栈(如TCP/IP)、协商QoS参数或管理网络连接状态。

线程安全天然性:通过进程内调度器(如IntraDispatcher)集中管理消息分发,避免多线程竞争。

2、代码拆解

幂等设计:通过enabled_标志确保多次调用不会重复初始化,符合“首次使用初始化

单例调度器:IntraDispatcher::Instance()返回进程内全局唯一的消息调度器,负责:注册消息处理回调函数(如std::bind绑定订阅者函数)。

管理消息队列的线程安全分发(如通过std::mutex保护队列操作)。

触发消息传递的实时性保证(如直接内存访问,延迟纳秒级)。

无资源分配:无需创建共享内存、通知器或网络套接字,避免资源泄漏风险。

(2)ShmTransmitter

动态计数器选择:

     Arena模式:根据消息类型(如RawMessagePyMessageWrap)选择序列化计数器或Arena内存池计数器,适配不同消息处理模式。

    非Arena模式:直接使用序列化计数器,简化处理逻辑。

原子操作:fetch_add(1)确保多线程环境下计数器增量的原子性,避免竞争条件。

设计意图:通过接收器计数跟踪活跃连接数量,为后续资源分配和传输器激活提供依据。

接收器计数校验:

      双计数器(serialized_receiver_count_arena_receiver_count_)均需大于零,否则报错退出,确保存在有效接收器。

Arena管理器初始化:

     通过单例ProtobufArenaManager启用Arena内存池,并绑定到当前通道ID,实现内存池的复用和管理。错误处理:若Arena启用或段分配失败,记录错误并终止流程,避免无效状态。

 共享内存与通知器:

    SegmentFactory:根据通道ID创建共享内存段,负责数据存储和跨进程访问。

    NotifierFactory:创建跨进程通知器(如信号量、事件),实现数据就绪的同步通知。

状态标记:enabled_ = true表示传输器已准备好发送/接收消息。

(3)RtpsTransmitter

1. 启用状态幂等校验

通过enabled_标志确保多次调用Enable()不会重复初始化,符合“单次激活”原则

2. 依赖组件有效性检查

关键依赖校验:participant_是RTPS协议的核心实体(类似DDS的DomainParticipant),负责管理发布者/订阅者的生命周期。

3. RTPS发布者创建

发布者实例化:通过participant_->CreatePublisher()创建RTPS发布者,绑定到指定通道并应用QoS策略。

QoS配置作用:

    可靠性:决定消息是否可靠传输(如RELIABLE模式重传丢失数据,BEST_EFFORT模式忽略丢失)。

   历史策略:控制发布者 保留的历史消息数量(如KEEP_LAST_10保留最近10条)。

   生命周期:管理发布者的自动激活/销毁条件。

错误处理:若发布者创建失败(如资源不足、参数错误),立即终止初始化并标记未启用。

4. 传输器激活

4、cyber/node/reader.h详解

listener.cc里面的CreanteReader最终走到这个函数体

4.1 reader Init()详解

1、初始化状态检查

  使用std::atomicexchange方法实现线程安全的初始化标记。若已初始化则直接返回true,避免重复初始化。

2、统计模块注册
statistics_center->RegisterChanVar()向全局统计中心注册当前Reader的统计变量(如消息延迟、处理时间等)。role_attr_包含节点名、通道名等元信息。

3、消息处理函数绑定

延迟统计:通过Time::Now()获取当前时间戳,与消息接收时间戳计算处理延迟(proc_done_time - proc_start_time

网络延迟:通过GetProcStatus获取消息发送时间,与接收时间计算网络传输延迟

4、任务调度与绑定

创建DataVisitor作为消息访问器,管理消息队列(容量由pending_queue_size_控制)

通过CreateRoutineFactory将lambda函数包装为协程任务

调度器sched创建任务,任务名由节点名+通道名组成(如node_channel

5、拓扑管理与接收器绑定

     从接收器管理器获取对应通道的接收器实例

    设置Reader的唯一标识(基于接收器ID的哈希值)

    获取全局通道管理器实例

    调用JoinTheTopology()将Reader注册到拓扑图,实现服务发现

4.2 reader 中JoinTheTopology函数详解

1、添加拓扑变更监听器

   功能:注册通道变更回调函数OnChannelChange,当拓扑中写入器(Writer)增删时触发

   实现:通过std::bind绑定成员函数到当前对象,std::placeholders::_1表示回调时传入变更事件参数

   作用:实现动态拓扑感知,确保Reader能及时响应写入器变化(如新Writer上线/下线)

2、获取并连接对端写入器

流程 :

  1. role_attr_提取通道名称
  2. 通过channel_manager_查询该通道当前所有写入器(Writer)
  3. 遍历写入器列表,调用receiver_->Enable(writer)建立连接

设计意图:主动连接已存在的写入器,实现即插即用的服务发现

关键点:Enable方法可能完成TCP连接建立、QoS协商、序列化协议匹配等底层操作

3、注册自身到全局拓扑

参数解析:

role_attr_:包含节点名、通道名、ID等元信息

ROLE_READER:声明当前组件为Reader角色

HasSerializer<MessageT>::value:指示消息类型是否需要序列化(影响传输协议选择)

作用:将Reader注册到全局拓扑管理器,使其他组件(如Writer)能通过服务发现找到该Reader

http://www.dtcms.com/a/594921.html

相关文章:

  • 扁平化网站登录界面西安网站seo价格
  • 南京做网站是什么基于阿里云的电商网站建设
  • 网站建设模板双人招聘seo网站推广
  • 2017免费网站空间简单制作html静态网页
  • 长沙h5建站创建公司需要什么条件
  • 公司网站运营方案策划西安网站建设外包
  • 花都网站建设 骏域网站京东商城网站建设目的
  • 网站的建设外链优化wordpress 邀请码插件
  • 做家乡特产的网站少儿编程价格表
  • 短路计算点选择
  • 厂房建设招标网站小公司做网站多少钱
  • WebForms Controls:深入解析与最佳实践
  • 建营销型网站前端低代码开发
  • 虚拟主机安装网站9款好评不断的网页设计工具
  • 怎么制作外贸网站模板开发工程师是程序员吗
  • 写的网站怎么做接口wordpress无觅
  • lol做框网站vs做网站创建项目时选哪个
  • 简述营销型企业网站建设的内容做网站哪里最便宜
  • 唯品会口红数据采集:双请求策略与JSON数据爬取实战
  • 修改文件权限--- chmod ,vi/vim,yum-软件包管理器,systemctl管理系统服务
  • 网站动效是代码做的吗全国信息企业公示系统
  • 个人网站制作体会长沙seo推广营销
  • thinkphp网站开发房产官方网站
  • 网站建设的预算费用番禺人才网最新招聘市场在哪里?
  • 谷歌收录网站wordpress 代码编写
  • 建立什么样的网站好梅州网站优化公司
  • 中国网站优化哪家好北京响应式网站建设费用
  • 东莞厚街网站建设百度网站怎么做的赚钱吗
  • wordpress网站白屏手表网站功能设计
  • 内蒙古网站设计360安全浏览器