当前位置: 首页 > news >正文

opendds初入门之qos策略初了解(有遗留)

从opendds中自带的相关测试样例中可以看出,对qos的控制,一般是首先获取各默认的qos,然后创建对应实例时以参数传参的方式进行设置生效。

这里主要练习:

​ 1:相关qos的生效测试。

​ 2:通过qos配置文件对qos模块的生效测试。

在这里插入图片描述

0. 思考

这里的前置条件与业务设计时的topic和transport方案有关,是设计一个topic,还是多个topic进行业务设计;使用的传输方式如何设计,使用一个链路支持所有传输,还是设置多个链路进行支持;是否支持事务处理等。

DomainParticipant、Publisher、Subscriber、Topic、DataWriter、DataReader中相关的qos控制也是从idl中配置进行设置的。

2. 首先了解QOS相关代码(DdsDcpsCore.idl)

1.传输层Transport

首先是传输层Transport,其实是不涉及qos的,但是涉及底层的传输方式设置:

=====》这里有必要了解几种传输方式,如何选择?以及如果设计的初始化。(核心关注TransportRegistry单例)。

=====》有必要对传输设置进行简单的思考:(猜测,初始化的时候,从配置文件或者参数设置,加上系统支持的默认传输方式,在启动的时候会设置传输方式;多种传输方式的控制(单例进行关联),通过字符串命名找到对应的传输实例,传输实例真正进行传输)。

//这里只是梳理了Transport所支持的传输方式,实现统一,后续create_inst都是继承自TransportType返回对应真正传输实例(继承的方式实现工厂)。
//提供统一的接口,用于支持初始化时对不同传输方式类型的实例构建。
bool TransportRegistry::register_type(const TransportType_rch& type)
{DBG_ENTRY_LVL("TransportRegistry", "register_type", 6);const OPENDDS_STRING name = type->name();GuardType guard(this->lock_);if (type_map_.count(name)) {return false;}type_map_[name] = type;if (name == "rtps_udp") {type_map_["rtps_discovery"] = type;}return true;
}
//如下是查看到的支持的接口: 支持inforepo,multicast,rtpsudp,shmem,tcp,udp
C:\Users\LEGION\Desktop\opendds_study\OpenDDS\dds\DCPS\InfoRepoDiscovery\InfoRepoDiscovery.cpp:1094  {1095    TransportRegistry* registry = TheTransportRegistry;1096:   if (!registry->register_type(make_rch<InfoRepoType>())) {1097      return;1098    }
C:\Users\LEGION\Desktop\opendds_study\OpenDDS\dds\DCPS\transport\multicast\MulticastLoader.cpp:39  40    TransportRegistry* registry = TheTransportRegistry;41:   if (!registry->register_type(make_rch<MulticastType>())) {42      return 0;43    }C:\Users\LEGION\Desktop\opendds_study\OpenDDS\dds\DCPS\transport\rtps_udp\RtpsUdpLoader.cpp:43    TransportRegistry* registry = TheTransportRegistry;44    TransportType_rch type = make_rch<RtpsUdpType>();45:   if (!registry->register_type(type)) {46      return;47    }C:\Users\LEGION\Desktop\opendds_study\OpenDDS\dds\DCPS\transport\shmem\ShmemLoader.cpp:39  40    TransportRegistry* registry = TheTransportRegistry;41:   if (!registry->register_type(make_rch<ShmemType>())) {42      return 0;43    }C:\Users\LEGION\Desktop\opendds_study\OpenDDS\dds\DCPS\transport\tcp\TcpLoader.cpp:56  57    TransportRegistry* registry = TheTransportRegistry;58:   if (!registry->register_type(make_rch<TcpType>())) {59      return 0;60    }C:\Users\LEGION\Desktop\opendds_study\OpenDDS\dds\DCPS\transport\udp\UdpLoader.cpp:43  44    TransportRegistry* registry = TheTransportRegistry;45:   if (!registry->register_type(make_rch<UdpType>())) {46      return 0;47    }
//随便查看ishapes中main函数的入口,可以看到相关注册传输方式的入口:TransportConfig_rch config = TransportRegistry::instance()->create_config("rtps_interop_demo");TransportInst_rch inst = TransportRegistry::instance()->create_inst("the_rtps_transport","rtps_udp");RtpsUdpInst_rch rui = static_rchandle_cast<RtpsUdpInst>(inst);config->instances_.push_back(inst);TransportRegistry::instance()->global_config(config);
//相关逻辑梳理:TransportRegistry::instance(): 获取 TransportRegistry 单例,管理传输类型和配置。create_config: 创建 TransportConfig 对象,存储多个 TransportInst。create_inst: 创建 TransportInst(此处为 RtpsUdpInst),指定传输类型 "rtps_udp"。static_rchandle_cast<RtpsUdpInst>: 将通用 TransportInst 转换为具体类型 RtpsUdpInst。config->instances_: 存储 TransportInst 列表。global_config: 设置全局传输配置,影响所有 DDS 实体。

2. DomainParticipant相关qos(DomainParticipant)。

这里主要针对的是管理层的交互,transport的一些设置吧。

  struct DomainParticipantFactoryQos {EntityFactoryQosPolicy entity_factory; //是否自动启用新创建的 DomainParticipant,默认为true};struct DomainParticipantQos { //dds域的入口,管理Publisher, Subscriber, 和 Topic。UserDataQosPolicy user_data;  //附加用户数据 存储元数据(如平台标识、仿真场景信息),在发现过程中传递。EntityFactoryQosPolicy entity_factory; //是否自动启动Publisher, Subscriber, 和 Topic,默认为true。PropertyQosPolicy property;  //一些键值对设置  配置自定义属性,如发现协议参数、传输配置等。};//这个键值对的设计也是有内部规则的: 这个没有验证,规则是这样,必须按严格的字符串设置。
DDS::DomainParticipantQos dp_qos;
participant_factory->get_default_participant_qos(dp_qos);
dp_qos.property.value.length(1);
dp_qos.property.value[0].name = "dds.transport.RTPS_UDP.port";
dp_qos.property.value[0].value = "7400";
dp_qos.property.value[0].propagate = true;//这里的内部规则主要是针对传输设置(端口,级别,方式,心跳等),还有传输序列化方式。 

3. Topic相关qos(topic):

这里只是在使用层的角度首先做初步了解,详细内部原理需要根据相关代码细节进行分析。

  struct TopicQos {TopicDataQosPolicy topic_data;     //附加到 Topic 的用户数据//可存储元信息,自动广播给匹配的 DataReader?  DurabilityQosPolicy durability;    //数据持久性(VOLATILE, TRANSIENT_LOCAL, TRANSIENT, PERSISTENT)//不保留,本地保留,跨进程保留(需外部服务支持),永久存储(需持久性服务)DurabilityServiceQosPolicy durability_service;//配置持久性服务(仅 PERSISTENT 时有效)。//PERSISTENT (跨进程)持久性服务的数据保留策略    控制跨进程存储时的保存时间,保存深度等策略。DeadlineQosPolicy deadline;         	//数据更新期限//DataWriter必须在这个设置期限内发布数据,否则会触发DEADLINE_MISSED_STATUS。LatencyBudgetQosPolicy latency_budget;  //延迟预算//指定数据从发布到接收的最大延迟预算。 影响传输调度,适合高实时场景。//DataReader设置的值肯定大于DataWriter,这个值影响了传输优先级,内部会做判断。(可能需要其他配合)LivelinessQosPolicy liveliness;     	//活跃性检测(AUTOMATIC, MANUAL_BY_PARTICIPANT, MANUAL_BY_TOPIC)。//DataWriter是否活跃, 自动发送心跳/同一DomainParticipant的任何写入触发活跃/特定Topic的写入触发ReliabilityQosPolicy reliability;   	//可靠性(BEST_EFFORT, RELIABLE)。//尽力而为,适合UDP     确保数据到达,适合TCP/RTPS UDPDestinationOrderQosPolicy destination_order; //数据接收顺序(BY_RECEPTION_TIMESTAMP, BY_SOURCE_TIMESTAMP)。//按接收时间排序   按发送时间排序HistoryQosPolicy history;           		//历史数据保存(KEEP_LAST, KEEP_ALL)。//保留最近depth个样本    保留所有样本(受resource_limits限制)ResourceLimitsQosPolicy resource_limits;  	//资源限制(样本、实例等)//控制 DataReader或DataWriter内部缓存资源的QoS策略,限制可以存储的样本数量、实例数量,以及每个实例的样本数量。//所有实例总共最多能存储多少个样本   最多能同时跟踪多少个实例(即不同 key)   每个实例最多能存储多少个样本TransportPriorityQosPolicy transport_priority;  //OpenDDS 扩展,设置传输优先级//传输优先级(影响 TCP/UDP 调度)。   越高优先级越高LifespanQosPolicy lifespan;						// 数据生命周期//数据从发布后开始计时,最多存活多久,超过这个时间的数据将被视为“过期”,不会被订阅者接收。//默认是永不过期, 控制数据在 DataWriter → DataReader 之间的有效时间。//订阅端进行判断,接收的消息时间戳减去发送时间戳,过期则丢掉。//发布端也进行判断,如果过期,直接丢弃,不生效。OwnershipQosPolicy ownership;                   // 数据所有权//多个 DataWriter 是否可以写入同一个实例(key),以及在冲突时谁拥有写权限。//所有 DataWriter 都可以写入同一个实例,数据按时间顺序到达。//只有“strength 最大”的 DataWriter 可以写入该实例,其他被忽略。  主备场景下吧,必须设置成EXCLUSIVE 模式。//所有发布者都可以发布数据,设置策略,订阅者根据订阅端设置的策略,只接收最大权限的主的Datawriter的数据,其他丢弃。DataRepresentationQosPolicy representation;		// 数据表示格式(XCDR1, XCDR2)//决定 DataWriter 如何编码数据、DataReader 如何解码数据。//XCDR2 是 CDR 的扩展版本,支持:@appendable:允许结构体添加字段而不破坏兼容性  @mutable:允许字段重排或重命名 @final:禁止扩展};

4. 发布端相关qos(DataWriter和Publisher):

  //数据存储与持久性   传输调度与实时性   可靠性与活跃性   实例控制与所有权   序列化与扩展性struct DataWriterQos {DurabilityQosPolicy durability; //数据持久性(VOLATILE, TRANSIENT_LOCAL, TRANSIENT, PERSISTENT)//不保留,本地保留,跨进程保留(需外部服务支持),永久存储(需持久性服务)DurabilityServiceQosPolicy durability_service;//配置持久性服务(仅 PERSISTENT 时有效)。//PERSISTENT (跨进程)持久性服务的数据保留策略    控制跨进程存储时的保存时间,保存深度等策略。DeadlineQosPolicy deadline;//指定数据发布的最大时间间隔。//DataWriter必须在这个设置期限内发布数据,否则会触发DEADLINE_MISSED_STATUS。LatencyBudgetQosPolicy latency_budget;//指定从发布到接收的最大延迟预算。//提示性 QoS,影响传输调度,默认为0,适合高实时性场景LivelinessQosPolicy liveliness;   	 //检测 DataWriter 是否活跃(AUTOMATIC, MANUAL_BY_PARTICIPANT, MANUAL_BY_TOPIC)//DataWriter是否活跃,自动发送心跳/同一DomainParticipant的任何写入触发活跃/特定Topic的写入触发ReliabilityQosPolicy reliability;//可靠性(BEST_EFFORT, RELIABLE)。//尽力而为,适合UDP     确保数据到达,适合TCP/RTPS UDPDestinationOrderQosPolicy destination_order;//数据接收顺序(BY_RECEPTION_TIMESTAMP, BY_SOURCE_TIMESTAMP)。//按接收时间排序   按发送时间排序 HistoryQosPolicy history;    //历史数据保存(KEEP_LAST, KEEP_ALL)。//保留最近depth个样本    保留所有样本(受resource_limits限制)//默认:kind = KEEP_LAST, depth = 1.  必须与resource_limits配合ResourceLimitsQosPolicy resource_limits;//资源限制(样本、实例等)//控制 DataReader或DataWriter内部缓存资源的QoS策略,限制可以存储的样本数量、实例数量,以及每个实例的样本数量。//所有实例总共最多能存储多少个样本   最多能同时跟踪多少个实例(即不同 key)   每个实例最多能存储多少个样本TransportPriorityQosPolicy transport_priority;//OpenDDS 扩展,设置传输优先级//传输优先级(影响 TCP/UDP 调度)。   越高优先级越高LifespanQosPolicy lifespan;  //指定数据有效期。    适合时间敏感数据UserDataQosPolicy user_data; //附加用户数据,广播给订阅者。OwnershipQosPolicy ownership;                      //所有权(SHARED, EXCLUSIVE)。//SHARED: 多个DataWriter可写同一实例。     EXCLUSIVE: 仅最高strength的DataWriter可写。OwnershipStrengthQosPolicy ownership_strength;     //独占所有权的强度//设置 EXCLUSIVE 所有权的优先级。  ownership.kind = EXCLUSIVE 时有效。WriterDataLifecycleQosPolicy writer_data_lifecycle;//数据生命周期管理(如 autopurge)。//注销实例时是否自动广播“我不再负责这个实例”的信号DataRepresentationQosPolicy representation; 	//指定序列化格式(如 XCDR2)。};struct PublisherQos {PresentationQosPolicy presentation;   //数据分发的顺序和一致性(INSTANCE, TOPIC, GROUP)。//控制一致性范围:按实例、按Topic、按Publisher所有Topic    是否启用“原子性”分发(多个样本一起投递)   是否按顺序分发样本(按时间戳或接收顺序)//普通数据流    Topic级事务   跨Topic事务、原子性投递//这里是提供特定的开始写事务的接口,统一进行写数据,订阅端按照相同的策略进行取数据。PartitionQosPolicy partition;         //逻辑分区,控制发布-订阅匹配。//发布者和订阅者只有在 分区名匹配 时才能建立连接    用于多租户系统、区域隔离、动态分区切换(分区隔离)GroupDataQosPolicy group_data;        //附加到 Publisher 的用户数据。//标识发布者属性(如版本、角色、部署信息)  用于动态发现、QoS调试、发布者识别。EntityFactoryQosPolicy entity_factory;//控制 DataWriter 是否自动启用。//不是自动启动时就需要手动调用enable()才能开始发送数据。    用于需要延迟启用、QoS预配置、权限控制的场景。};

有意思,可以用专门的接口对相关的策略进行设置,统一进行梳理。

5. 订阅端相关qos:

struct DataReaderQos {DurabilityQosPolicy durability;//控制是否接收历史数据(如在订阅前发布的)(VOLATILE, TRANSIENT_LOCAL, TRANSIENT, PERSISTENT)//不接收历史数据/接收 DataWriter 本地缓存的历史数据,适合“迟到订阅者”/接收外部服务缓存的数据/接收永久存储的数据。//适用于“迟到订阅者”场景,如启动后接收最近状态。DeadlineQosPolicy deadline;		//指定数据发布的最大时间间隔。//要求 DataWriter 在指定周期内发布数据  否则触发 DEADLINE_MISSED_STATUS。//用于监控数据源是否“掉线”或“卡顿”。LatencyBudgetQosPolicy latency_budget; //提示性 QoS,期望数据在多久内送达  可影响调度优先级//适用于高实时性订阅,如雷达、控制系统。    会影响发送数据的优先级LivelinessQosPolicy liveliness;        //检测 DataWriter 是否仍“活着”(AUTOMATIC, MANUAL_BY_PARTICIPANT, MANUAL_BY_TOPIC)//用于判断数据源是否断连,适用于主备切换。   这是要和订阅端对应,配合?ReliabilityQosPolicy reliability;  //控制是否要求可靠传输(RELIABLE)或尽力而为(BEST_EFFORT)//需与DataWriter匹配, 用于关键数据(RELIABLE)或高频流(BEST_EFFORT)DestinationOrderQosPolicy destination_order; //控制样本排序方式: 按接收时间排序   按发送时间排序//用于需要按发送顺序处理数据的场景。HistoryQosPolicy history;   //控制保留多少历史样本://KEEP_LAST(保留最近N个) KEEP_ALL(保留所有)    配合resource_limits使用,影响数据缓存深度ResourceLimitsQosPolicy resource_limits;  //限制缓存资源(总样本数/实例数(key 数)/每实例样本数)//防止内存爆炸,适用于多实例场景。UserDataQosPolicy user_data;//附加用户元数据(如订阅者身份、版本)//用于权限控制、调试OwnershipQosPolicy ownership;//控制是否允许多个写者写同一实例//SHARED(默认)  EXCLUSIVE(只接收 strength 最大者)//用于主备发布者架构。TimeBasedFilterQosPolicy time_based_filter;         //控制最小接收间隔,按时间过滤样本。//用于高频数据流的“降采样”。  默认不过滤,只接收大于这个时间间隔的数据,高频数据降采样。ReaderDataLifecycleQosPolicy reader_data_lifecycle; //读取数据生命周期管理。//控制实例在 NOT_ALIVE 状态下是否自动清理   清理无DataWriter的样本。 清理已处置的样本//用于自动释放资源、避免过期数据堆积。DataRepresentationQosPolicy representation;  //控制可接受的数据序列化格式(如 XCDR2)//需要和DataWriter中设置的对应,可以是其中的一个。TypeConsistencyEnforcementQosPolicy type_consistency;//控制类型一致性检查。   控制类型兼容策略(如强类型、可扩展类型)//@appendable标识的topic的扩展?   用于订阅者使用旧版本结构体的场景。};struct SubscriberQos {PresentationQosPolicy presentation; //订阅者端的数据分发的一致性和顺序语义//控制一致性范围(INSTANCE/TOPIC/GROUP)    是否启用“原子性”分发(多个样本一起投递)   是否按顺序分发样本(按时间戳或接收顺序)PartitionQosPolicy partition;	//控制订阅者所属的逻辑分区,影响与发布者的匹配关系//发布者和订阅者只有在分区名匹配时才能建立连接。   用于构建逻辑隔离(如按区域、业务线划分)GroupDataQosPolicy group_data; 	//附加用户元数据到 Subscriber,发布者可读取但不影响数据流//专门的接口获取  用于权限控制、调试、订阅者识别。EntityFactoryQosPolicy entity_factory;//控制 Subscriber 创建的 DataReader 是否自动启用//不是自动启动的话,手动调用 enable() 才能开始接收数据。 用于需要延迟启用、QoS预配置、权限控制的场景。};

6. 简单总结

有意思的是,了解了qos实际上对整个发布订阅就有了一定的认识,也发现这里的qos之间是有一定的包含关系的。

DomainParticipantQos
├── PublisherQos
│   ├── DataWriterQos
│   │   └── TopicQos
│
├── SubscriberQos
│   ├── DataReaderQos
│   │   └── TopicQos#每一层都可以设置 QoS,下层实体会默认继承上层的 QoS 配置,但也可以显式覆盖。

3. 梳理测试各模块支持的qos

在理论的角度上,根据结构定义,已经对各模块所支持的qos进行梳理和说明,这里对其进行汇总梳理。

核心内容主要在DdsDcpsCore.idl和dds_qos.hpp,以及参考一些demo样例中(比如ishapes中的设置qos源码设置方案),梳理和设置相关qos策略的支持。

各模块qos的生效,除了本地生效参数设置外,需要交互的参数都是通过内置主题实现的传递交互。(那么就关注一下内置主题的获取和处理,BitDataReader是构造模拟数据,从内置主题中读取数据的demo;BuiltInTopicTest是构造内置主题交互,使用专门的monitor进程对内置主题进行监控的demo;monitor也涉及一些内置主题)

1. DomainParticipant

1.DomainParticipantFactoryQos

首先对DomainParticipantFactoryQos进行练习,可以参考:OpenDDS\tests\DCPS\DPFactoryQos中的测试代码。

====》主要测试在设置了该策略为false时,需要手动依次按依赖顺序调用enable()接口才能正常交互。

====》如果各模块没有调用enable(),则默认没有启动,无法交互。

====》调用enable()接口有先后依赖顺序,如DomainParticipant ==》Topic,DomainParticipant 》Subscriber=》DataReader,DomainParticipant 》publisher=》datawriter,如果没有按照顺序启动,enable ()接口调用返回是失败的。

====》简单使用我们的原测试代码中给发布端增加这个策略设置,会发现真的不会发布数据了。

再细节:设置该属性后,下面基于该工厂创建实例时,设置的默认属性实际上并不生效,可以对比查看。

//该策略的核心代码,获取默认策略并进行设置。   设置为false时,需要手动依次按顺序调用enable()接口才能正确使用。DDS::DomainParticipantFactory_var dpf = TheParticipantFactoryWithArgs(argc, argv);// Default DomainParticipantFactory qos is to auto enable.::DDS::DomainParticipantFactoryQos fqos;if (dpf->get_qos (fqos) != ::DDS::RETCODE_OK){cerr << "DomainParticipantFactory get_qos failed." << endl;return 1;}if (!fqos.entity_factory.autoenable_created_entities){cerr << "The DomainParticipantFactory defaults to autoenable upon entities creation." << endl;return 1;}fqos.entity_factory.autoenable_created_entities = false;if (dpf->set_qos (fqos) != ::DDS::RETCODE_OK){cerr << "DomainParticipantFactory set_qos failed." << endl;return 1;}
2.DomainParticipantQos

该策略主要涉及三个参数:user_data(附加内容或者配合静态发现做链路),entity_factory和上面的DomainParticipantFactoryQos中参数功能一样吧,property键值(内部有已经支持的键值对对配置进行支持)

**user_data:**可以用户自定义内容,用于发现匹配校验。

\OpenDDS\tests\DCPS\RtpsDiscovery 通过user_data参数配合内置主题用于测试发现过程。

\OpenDDS\tests\DCPS\StaticDiscoveryReconnect 通过静态配置以及DomainParticipantQos 中的user_data实现按需要订阅到。

所以该user_data的练习有两个点:

(该属性是通过内置主题DCPSParticipant实现的业务交互,也就是说从内置主题中获取到相关信息。)

第一:验证设置该数据和获取该数据。

//在原先的测试代码中,基于内置主题DCPSParticipant 对该数据进行获取
//MytestPublisher.cpp 中进行相关qos设置,增加一些user_data的数据。DDS::DomainParticipantQos part_qos;dpf->get_default_participant_qos(part_qos);const char* str = "MyNode-A1";part_qos.user_data.value.length(strlen(str));memcpy(part_qos.user_data.value.get_buffer(), str, strlen(str));//这里自定义键值对是没意义的,DCPSParticipant内置主题对应的数据读取结构不支持这些键值对的获取。DDS::PropertySeq& props = part_qos.property.value;append(props, "OpenDDS.RtpsRelay.Groups", "Messenger", true);//这里的键值对是没有意义的,opendds不提供上层用户获取方式,所以只能用系统提供的键值对。set_property(part_qos.property.value, "test_key", "test_value");set_property(part_qos.property.value, "app_version", "1.0");DDS::DomainParticipant_var participant = dpf->create_participant(111,part_qos,DDS::DomainParticipantListener::_nil(),::OpenDDS::DCPS::DEFAULT_STATUS_MASK);//MytestSubscriber.cpp中对基于内置主题对qos进行读取,这里为了方便直接按已有方案读取。//新增用于监听内置主题的类,这里实际上用wait等其他方式更合适。#include <dds/DCPS/BuiltInTopicUtils.h>class ParticipantBitListener : public virtual DDS::DataReaderListener {public:explicit ParticipantBitListener(const OpenDDS::DCPS::GUID_t& expected_guid): expected_guid_(expected_guid){}void on_data_available(DDS::DataReader_ptr reader) override{try {DDS::ParticipantBuiltinTopicDataDataReader_var part_bit =DDS::ParticipantBuiltinTopicDataDataReader::_narrow(reader);if (CORBA::is_nil(part_bit.in())) {ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Failed to narrow ParticipantBuiltinTopicDataDataReader\n")));return;}DDS::ParticipantBuiltinTopicDataSeq data;DDS::SampleInfoSeq infos;DDS::ReturnCode_t ret = part_bit->take(data, infos, DDS::LENGTH_UNLIMITED,DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE,DDS::ALIVE_INSTANCE_STATE);if (ret != DDS::RETCODE_OK) {ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: take failed with code %d\n"), ret));return;}CORBA::ULong len = data.length();std::cout << "Got " << len << " participant samples" << std::endl;for (CORBA::ULong i = 0; i < len; ++i) {if (infos[i].valid_data) {const DDS::ParticipantBuiltinTopicData& d = data[i];std::cout << "-----------------------------------" << std::endl;std::cout << "Participant Key: "<< d.key.value[0] << "."<< d.key.value[1] << "."<< d.key.value[2] << std::endl;std::cout << "User Data: ";for (CORBA::ULong j = 0; j < d.user_data.value.length(); ++j) {std::cout << (char)d.user_data.value[j];}std::cout << std::endl;//从内置主题ParticipantBuiltinTopicData的数据结构可以看出,并不支持对该DomainParticipantQos 设置的qos中键值对从远端获取,只能进行本地打印}}ret = part_bit->return_loan(data, infos);if (ret != DDS::RETCODE_OK) {ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: return_loan failed with code %d\n"), ret));}}catch (const CORBA::Exception& e) {e._tao_print_exception("ERROR: Exception in on_data_available:");}catch (...) {ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Unknown exception in on_data_available\n")));}}// 其他回调(空实现)void on_requested_deadline_missed(DDS::DataReader_ptr, const DDS::RequestedDeadlineMissedStatus&) override {}void on_requested_incompatible_qos(DDS::DataReader_ptr, const DDS::RequestedIncompatibleQosStatus&) override {}void on_sample_rejected(DDS::DataReader_ptr, const DDS::SampleRejectedStatus&) override {}void on_liveliness_changed(DDS::DataReader_ptr, const DDS::LivelinessChangedStatus&) override {}void on_subscription_matched(DDS::DataReader_ptr, const DDS::SubscriptionMatchedStatus&) override {}void on_sample_lost(DDS::DataReader_ptr, const DDS::SampleLostStatus&) override {}private:OpenDDS::DCPS::GUID_t expected_guid_;};
//MytestSubscriber.cpp 中增加对内置主题的关联,基于已经创建的订阅端的DomainParticipant_var//实际上就是获取基于内置主题的sub,datareader,以及设置关联listener{OpenDDS::DCPS::DomainParticipantImpl* sub_impl =dynamic_cast<OpenDDS::DCPS::DomainParticipantImpl*>(participant.in());OpenDDS::DCPS::GUID_t sub_repo_id = sub_impl->get_id();// 获取内置订阅者DDS::Subscriber_var bit_sub = participant->get_builtin_subscriber();if (CORBA::is_nil(bit_sub.in())) {ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("ERROR: Failed to get builtin subscriber\n")), 1);}// 创建 DataReader 并设置 ListenerDDS::DataReader_var dr = bit_sub->lookup_datareader(OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC);if (CORBA::is_nil(dr.in())) {ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("ERROR: Failed to lookup BUILT_IN_PARTICIPANT_TOPIC DataReader\n")), 1);}//第一个参数无意义的,可以根据业务扩展,配合这里的user_data DDS::DataReaderListener_var listener = new ParticipantBitListener(sub_repo_id);DDS::DataReaderQos dr_qos;bit_sub->get_default_datareader_qos(dr_qos);dr_qos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;dr->set_qos(dr_qos);dr->set_listener(listener, OpenDDS::DCPS::DEFAULT_STATUS_MASK);}

第二:验证基于静态发现和该字段,实现区分匹配的功能(user_data还是只是一个标识,用于说明配置文件中设置发布端或者订阅端的唯一标识,由上层区分)。

//这里直接参考OpenDDS\tests\DCPS\StaticDiscoveryReconnect或者 StaticDiscovery中的代码进行理解,主要是DEFAULT_STATIC的设置。
//DEFAULT_STATIC是opendds的一个扩展,说明不是使用rtps或者inforepo进行相互发现,而是由自己控制。
//设置传输方式为DEFAULT_STATIC 时,实际上也通过配置文件强行设计了对应的发布端和订阅端的唯一标识,可以通过user_data进行关联识别对端。

entity_factory的练习,参考DomainParticipantQos 即可(运行测试代码,设置false不手动按顺序启动无法正常交互):

//首先设置该参数为false,用于设置手动启动。DDS::DomainParticipantQos part_qos;dpf->get_default_participant_qos(part_qos);//对比设置该参数为false和默认的true,发现如果设置为false,链路是不通的。//part_qos.entity_factory.autoenable_created_entities = false;DDS::DomainParticipant_var participant = dpf->create_participant(111,part_qos,DDS::DomainParticipantListener::_nil(),::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
//基于上面设置的false,需要按顺序手动启动才能正确使用,在测试代码中增加如下代码即可:if (participant->enable() != ::DDS::RETCODE_OK|| topic->enable() != ::DDS::RETCODE_OK|| pub->enable() != ::DDS::RETCODE_OK || dw->enable() != ::DDS::RETCODE_OK){cerr << "Failed to enable factory." << endl;return 1;}

property键值对自定义属性:。

这里有一些opendds本身设置的,用于功能的键值对,比如设置relay的分区转发,这里简单验证,如何读取到相关的参数值。

//测试发现,该“键值对”属于opendds内部的扩展方案,并不支持用户自己扩展,订阅端也无法获取到这个信息,只能从本地获取。
//属于opendds的内部扩展,外部用户无法操作。
DDS::PropertySeq& props = part_qos.property.value;
append(props, "OpenDDS.RtpsRelay.Groups", "Messenger", true);//貌似在没有使用relay程序实现中转的场景下是没有什么意义的,只有中转场景(relay中继)下才需要
相关内置主题

这些内置主题协助opendds实现底层交互架构,他们所支持的功能大部分在底层代码中有适配,比如由内置主题控制的qos策略的实现。

提供了可扩展思路,有的qos策略提供了动态用户设置信息,也是基于这些内置主题处理,需要上层业务层获取处理。

//opendds标准的四个主题    const char* const BUILT_IN_PARTICIPANT_TOPIC = "DCPSParticipant";const char* const BUILT_IN_PARTICIPANT_TOPIC_TYPE = "PARTICIPANT_BUILT_IN_TOPIC_TYPE";const char* const BUILT_IN_TOPIC_TOPIC = "DCPSTopic";const char* const BUILT_IN_TOPIC_TOPIC_TYPE = "TOPIC_BUILT_IN_TOPIC_TYPE";const char* const BUILT_IN_SUBSCRIPTION_TOPIC = "DCPSSubscription";const char* const BUILT_IN_SUBSCRIPTION_TOPIC_TYPE = "SUBSCRIPTION_BUILT_IN_TOPIC_TYPE";const char* const BUILT_IN_PUBLICATION_TOPIC = "DCPSPublication";const char* const BUILT_IN_PUBLICATION_TOPIC_TYPE = "PUBLICATION_BUILT_IN_TOPIC_TYPE";//opendds扩展的一些主题const char* const BUILT_IN_PARTICIPANT_LOCATION_TOPIC = "OpenDDSParticipantLocation";const char* const BUILT_IN_PARTICIPANT_LOCATION_TOPIC_TYPE = "PARTICIPANT_LOCATION_BUILT_IN_TOPIC_TYPE";const char* const BUILT_IN_CONNECTION_RECORD_TOPIC = "OpenDDSConnectionRecord";const char* const BUILT_IN_CONNECTION_RECORD_TOPIC_TYPE = "CONNECTION_RECORD_BUILT_IN_TOPIC_TYPE";const char* const BUILT_IN_INTERNAL_THREAD_TOPIC = "OpenDDSInternalThread";const char* const BUILT_IN_INTERNAL_THREAD_TOPIC_TYPE = "INTERNAL_THREAD_BUILT_IN_TOPIC_TYPE";
常量名含义数据内容作用
BUILT_IN_PARTICIPANT_TOPIC = "DCPSParticipant"参与者(DomainParticipant)主题包含每个参与者的 GUID、QoS、用户数据用于参与者间相互发现
BUILT_IN_TOPIC_TOPIC = "DCPSTopic"用户定义 Topic 信息包含 Topic 名称、类型名、QoS让系统知道有哪些 Topic 存在
BUILT_IN_SUBSCRIPTION_TOPIC = "DCPSSubscription"订阅者信息包含 DataReader 的 QoS、所属 Topic 等让 Publisher 发现并匹配订阅者
BUILT_IN_PUBLICATION_TOPIC = "DCPSPublication"发布者信息包含 DataWriter 的 QoS、所属 Topic 等让 Subscriber 发现并匹配发布者
BUILT_IN_PARTICIPANT_LOCATION_TOPIC = "OpenDDSParticipantLocation"参与者位置(OpenDDS 扩展)包含 Locator、传输通道信息支持 OpenDDS 的多传输发现机制
BUILT_IN_CONNECTION_RECORD_TOPIC = "OpenDDSConnectionRecord"连接记录(OpenDDS 扩展)包含连接状态、通道、错误等用于调试/监控传输连接状态
BUILT_IN_INTERNAL_THREAD_TOPIC = "OpenDDSInternalThread"内部线程监控(OpenDDS 扩展)包含线程名、CPU 核心绑定、状态用于运行时性能监

2. Topic

基于DataReaderQos和DataWriteQos策略了解练习后,topic其实和他们类似。

1.使用内置主题BUILT_IN_TOPIC_TOPIC打印设置的topicQos策略。

2.topic相关策略的设置,应该是要兼容DataReaderQos和DataWriteQos的策略的,比如topic设置相关持久化策略,要Datawriter也是这兼容这种策略才可以。

3.相关qos策略的真正验证(持久性,传输策略(尽力而为,可靠传输),优先级,保活策略等)。

3. Subscriber(注意订阅pub端内置主题获取到对方的qos)

首先,从初步的探索看,该模块对应的SubscriberQos策略是直接定义好的,功能模块是独立的,无法由用户进行操作,按需要设置即可。

=====>注意,这里有意思的是,在测试时才发先,SubscriberQos对应的策略,实际上是一个组策略,在底层真正Datareader时才用于交互的,所以用内置主题读取这个时,内置主题有很多的参数定义。

// 从struct SubscriberQos定义中进行分析,presentation控制发送顺序,partition实现分区控制,group_data用户数据,entity_factory同上文一致,控制是否自动启动。 struct SubscriberQos {PresentationQosPolicy presentation; //订阅者端的数据分发的一致性和顺序语义//INSTANCE_PRESENTATION_QOS 是一个topic的一个实例多次发布保持一致//TOPIC_PRESENTATION_QOS是一个topic的多个实例保持一致//GROUP_PRESENTATION_QOS是多个topic的多个实例保持一致PartitionQosPolicy partition;	//控制订阅者所属的逻辑分区,影响与发布者的匹配关系//发布者和订阅者只有在分区名匹配时才能建立连接。   用于构建逻辑隔离(如按区域、业务线划分)GroupDataQosPolicy group_data; 	//附加用户元数据到 Subscriber,发布者可读取但不影响数据流//专门的接口获取  用于权限控制、调试、订阅者识别。EntityFactoryQosPolicy entity_factory;//控制 Subscriber 创建的 DataReader 是否自动启用//不是自动启动的话,手动调用 enable() 才能开始接收数据。 用于需要延迟启用、QoS预配置、权限控制的场景。};

考虑测试方案:

1.简单验证事务的分发顺序一致性(presentation主要用于控制事务逻辑,topic的控制事务逻辑)。

=====》或许是这样的目的,期望实现以事务为目标的交互方案,但是测试时发现一直测试不符合现象,PresentationQosPolicy中设置的两个参数无法生效。

=====》未跟踪到根因,怀疑可能版本不支持这样的功能。

//需要发布端和订阅端策略一致,然后构造事务进行写数据和读数据验证。
//在发布端MytestPublisher.cpp增加qos策略及构造对应的发送。pub_qos.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS; // 按组一致性pub_qos.presentation.coherent_access = true; // 启用一致性访问pub_qos.presentation.ordered_access = true; // 可选:按序分发pub->set_qos(pub_qos);//同时构造对应的事务方式进行发布数据pub->begin_coherent_changes();while (message.counter < 9){message.counter++;memset(tMsg, 0, 50);sprintf(tMsg, "begin test ==>Msg Counter : %d", message.counter);message.text = ::TAO::String_Manager(tMsg);message_dw->write(message, handle);ACE_OS::sleep(3);cout << ".........." << "message.counter =" << message.counter << endl;}pub->end_coherent_changes();
//在订阅端MytestSubscriber.cpp增加qos策略及对应的接收:sub_qos.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS; // 按组一致性sub_qos.presentation.coherent_access = true; // 启用一致性访问sub_qos.presentation.ordered_access = true; // 可选:按序分发sub->set_qos(sub_qos);//在对应的数据回调处增加事务处理,这里不符合预期现象:void DataReaderListener::on_data_available(DDS::DataReader_ptr reader){++num_reads_;try {DemoTopic1DataReader_var message_dr = DemoTopic1DataReader::_narrow(reader);if (CORBA::is_nil(message_dr.in())) {cerr << "read: _narrow failed." << endl;exit(1);}// 获取订阅者DDS::Subscriber_var subscriber = reader->get_subscriber();subscriber->begin_access(); // 开始处理一致性集DemoTopic1Seq data;DDS::SampleInfoSeq infos;DDS::ReturnCode_t ret = message_dr->take(data, infos, DDS::LENGTH_UNLIMITED,DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE,DDS::ALIVE_INSTANCE_STATE);if (ret != DDS::RETCODE_OK) {ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: take failed: %d\n"), ret));return;}// 验证一致性集std::cout << "Received coherent set with " << data.length() << " samples:" << std::endl;for (CORBA::ULong i = 0; i < data.length(); ++i) {if (infos[i].valid_data) {std::cout << "  Sample " << i << ": counter = " << data[i].counter << std::endl;}}ret = message_dr->return_loan(data, infos);if (ret != DDS::RETCODE_OK) {ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: return_loan failed: %d\n"), ret));}subscriber->end_access(); // 结束一致性集处理}catch (CORBA::Exception& e) {cerr << "Exception caught in read:" << endl << e << endl;exit(1);}}//对应的内置主题监控中BUILT_IN_PUBLICATION_TOPIC 对应的listener中可以增加对更多qos策略的输出cout << "presentation.access_scope is:" << data[i].presentation.access_scope << "   coherent_access:" << data[i].presentation.coherent_access <<"  ordered_access:"<< data[i].presentation.ordered_access << endl;

结果说明:发现一直是不触发qos策略的,日志如下:presentation.access_scope is:1 coherent_access:0 ordered_access:0。

有关这个事务的支持,自己验证是没通过的,发布端需要qos策略支持,以及begin_coherent_changes()和end_coherent_changes()内部构造事务数据进行发布,订阅端需要通过begin_access()和end_access()实现对应取数据,理应一次取到这些数据。

====》这里有没有可能是对应的Datawrite或者Datareader设置qos时对pub/sub进行覆盖了。

2.验证分区匹配的的功能(partition定义后,opendds底层做过处理,通过现象说明)。
//分别在发布端和订阅端定义对应qos策略,查看匹配个数。  参考OpenDDS\tests\DCPS\BuiltInTopicTest中monitor.cpp代码
//简单验证  在MytestPublisher.cpp 中新增DDS::PublisherQos pub_qos; //获取默认的qos并进行设置 participant->get_default_publisher_qos(pub_qos);pub_qos.partition.name.length(1);pub_qos.partition.name[0] = "TestPartition";DDS::Publisher_var pub = participant->create_publisher(pub_qos,  DDS::PublisherListener::_nil(),::OpenDDS::DCPS::DEFAULT_STATUS_MASK);//.... 这里省去匹配过程和个数的校验。pub_qos.partition.name.length(1);pub_qos.partition.name[0] = "TestPartition_change";pub->set_qos(pub_qos);ACE_OS::sleep(2);//为了观察现象,DDS::PublicationMatchedStatus matches;dw->get_publication_matched_status(matches);cout << "has get count change: " << matches.current_count << endl;
//在MytestSubscriber.cpp中新增 DDS::SubscriberQos sub_qos;participant->get_default_subscriber_qos(sub_qos);sub_qos.partition.name.length(1);sub_qos.partition.name[0] = "TestPartition"; //这里底层做了处理,一致才能关联DDS::Subscriber_var sub = participant->create_subscriber(sub_qos,DDS::SubscriberListener::_nil(),::OpenDDS::DCPS::DEFAULT_STATUS_MASK);

观察现象:

====》可以看到pub端在第一次TestPartition对应时,等待连接可以连接上,连接数是1;

====》连接后修改该字段,发现连接断开,无法正常进行发布订阅。

3.验证用户携带数据(group_data由内置主题携带进行传播,这里参考设置的策略是交互的频率)。
//使用内置主题对相关数据字段进行提取即可。
//MytestPublisher.cpp 中新增:char GROUP_DATA[] = "Initial GroupData";char UPDATED_GROUP_DATA[] = "Updated GroupData";DDS::PublisherQos pub_qos;participant->get_default_publisher_qos(pub_qos);CORBA::ULong group_data_len = static_cast<CORBA::ULong>(ACE_OS::strlen(GROUP_DATA));pub_qos.group_data.value.length(group_data_len);pub_qos.group_data.value.replace(group_data_len, group_data_len, reinterpret_cast<CORBA::Octet*>(GROUP_DATA));DDS::Publisher_var pub = participant->create_publisher(pub_qos,DDS::PublisherListener::_nil(),::OpenDDS::DCPS::DEFAULT_STATUS_MASK);//... 这里中间的业务不关注,group_data_len = static_cast<CORBA::ULong>(ACE_OS::strlen(UPDATED_GROUP_DATA));pub_qos.group_data.value.length(group_data_len);pub_qos.group_data.value.replace(group_data_len,group_data_len,reinterpret_cast<CORBA::Octet*>(UPDATED_GROUP_DATA));pub->set_qos(pub_qos);//MytestSubscriber.cpp中新增使用内置主题进行读取数据。//在适当的位置增加监控对应的内置主题 {//这里处理sub对应的内置主题:DDS::Subscriber_var bit_sub = participant->get_builtin_subscriber();DDS::DataReader_var dr_sub = bit_sub->lookup_datareader(OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC);DDS::DataReaderListener_var sub_listener = new SubscriptionBitListener();dr_sub->set_listener(sub_listener, OpenDDS::DCPS::DEFAULT_STATUS_MASK); //该sub,datareader也是可以设置qos的通过set_qos}//这里注意使用专门的listener对内置主题进行监控的:class SubscriptionBitListener : public virtual DDS::DataReaderListener {
public:SubscriptionBitListener() {}void on_data_available(DDS::DataReader_ptr reader) override{try {//DDS::SubscriptionBuiltinTopicDataDataReader_var sub_bit = DDS::SubscriptionBuiltinTopicDataDataReader::_narrow(reader);//DDS::SubscriptionBuiltinTopicDataSeq data;DDS::PublicationBuiltinTopicDataDataReader_var  sub_bit = DDS::PublicationBuiltinTopicDataDataReader::_narrow(reader);DDS::PublicationBuiltinTopicDataSeq data;DDS::SampleInfoSeq infos;DDS::ReturnCode_t ret = sub_bit->take(data, infos, DDS::LENGTH_UNLIMITED, DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ALIVE_INSTANCE_STATE);if (ret != DDS::RETCODE_OK) {ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: take failed: %d\n"), ret));return;}for (CORBA::ULong i = 0; i < data.length(); ++i) {if (infos[i].valid_data) {//实际上这个内置主题PublicationBuiltinTopicDataSeq有很多参数,这里用于该模块下的DataReader管理std::string group_data(reinterpret_cast<char*>(data[i].group_data.value.get_buffer()), data[i].group_data.value.length());//std::string partition(data[i].partition.name[0].in());//这里和主线程是多线程的,处理数据要加锁的。   partition取数据要判断长度,小心越界//ACE_DEBUG((LM_INFO, ACE_TEXT("Received group_data=%C, partition=%C\n"),group_data.c_str(), partition.c_str()));cout << "group_data is:" << group_data  << endl;}}sub_bit->return_loan(data, infos);}catch (const CORBA::Exception& e) {e._tao_print_exception("ERROR:");}catch (...) {ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Unknown exception in on_data_available\n")));}}// 其他回调(空实现)void on_requested_deadline_missed(DDS::DataReader_ptr, const DDS::RequestedDeadlineMissedStatus&) override { cout << "11111" << endl; }void on_requested_incompatible_qos(DDS::DataReader_ptr, const DDS::RequestedIncompatibleQosStatus&) override { cout << "111110" << endl; }void on_sample_rejected(DDS::DataReader_ptr, const DDS::SampleRejectedStatus&) override { cout << "111112" << endl; }void on_liveliness_changed(DDS::DataReader_ptr, const DDS::LivelinessChangedStatus&) override { cout << "111113" << endl; }void on_subscription_matched(DDS::DataReader_ptr, const DDS::SubscriptionMatchedStatus&) override { cout << "111114" << endl; }void on_sample_lost(DDS::DataReader_ptr, const DDS::SampleLostStatus&) override { cout << "111115" << endl; }
};

====》好惨,练习时躺坑: 1:在订阅端关注内置主题时,实际上时对端内置主题,即在MytestSubscriber.cpp中应该增加的是对BUILT_IN_PUBLICATION_TOPIC的监控(listener取数据对应)。 2:回调中取数据要注意,一个数组越界的问题在运行时是不显示问题的,异步线程执行。

查看运行结果:

#可以看到业务层正确获取到用户数据。
E:\opendds_test_251010\idl\Release>subscriber.exe
group_data is:Initial GroupData
DataReaderListener::on_subscription_matched
group_data is:Updated GroupData
DataReaderListener::on_liveliness_changed
4.是否设置默认自启动(参考DomainParticipantQos 中的entity_factory即可)。

4. Datareader

在已经了解了相关qos的前提下,该模块的qos策略是蛮多的,主要包含如下:

====》1.有关持久性的策略及历史保存样本数设置(单个,所有)。

====》2.数据发布/接收间隔控制,数据的有效期(延迟)控制。

====》3.保活设置。

====》4.传输策略设置(尽力而为还是可靠传输)。

====》5.传输排序顺序设置(按顺序发送/接收)。

====》6.设置传输数据的优先级。(优先传输)

====》7.用户附加数据,所有权问题(主备),数据生命周期管理(注销时的通知消息~)。

1.验证获取对应的DataReaderQos(通过内置主题DCPSSubscription或者DCPSPublication)

这里用的内置主题和sub端获取对应pub端内置主题一样,使用BUILT_IN_PUBLICATION_TOPIC及对应的topic进行解析。

//这里是在订阅端获取发布端的qos,然后和sub订阅pubQos是一样的逻辑,
//在订阅端MytestSubscriber.cpp增加内置主题的关注:{DDS::Subscriber_var bit_pub = participant->get_builtin_subscriber();DDS::DataReader_var dr_pub = bit_pub->lookup_datareader(OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC);DDS::DataReaderListener_var sub_listener = new SubscriptionBitListener();dr_sub->set_listener(sub_listener, OpenDDS::DCPS::DEFAULT_STATUS_MASK); }//注意sub端关注的是对端的qos,这里datareader关注的是datawriter对应的qos,这里内置主题用"DCPSPublication"//增加对应的监听类及相关数据的打印类,这里就能直观的看到相关策略设置。
//在对应的内置主题listener回调中增加如下打印:void on_data_available(DDS::DataReader_ptr reader) override{try {DDS::PublicationBuiltinTopicDataDataReader_var  sub_bit = DDS::PublicationBuiltinTopicDataDataReader::_narrow(reader);DDS::PublicationBuiltinTopicDataSeq data;DDS::SampleInfoSeq infos;DDS::ReturnCode_t ret = sub_bit->take(data, infos, DDS::LENGTH_UNLIMITED, DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ALIVE_INSTANCE_STATE);if (ret != DDS::RETCODE_OK) {ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: take failed: %d\n"), ret));return;}for (CORBA::ULong i = 0; i < data.length(); ++i) {if (infos[i].valid_data) {const DDS::PublicationBuiltinTopicData& pub_data = data[i];print_publication_builtin_topic_data(pub_data);}}sub_bit->return_loan(data, infos);}catch (const CORBA::Exception& e) {e._tao_print_exception("ERROR:");}catch (...) {ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Unknown exception in on_data_available\n")));}}//增加该成员函数,打印对应的内置主题获取到的topic数据(PublicationBuiltinTopicData):std::string print_octet_array16(const DDS::OctetArray16& arr) {std::ostringstream oss;oss << std::hex << std::setfill('0');for (size_t i = 0; i < 16; ++i) {oss << std::setw(2) << static_cast<int>(arr[i]);if (i != 15) oss << " ";}return oss.str();}void print_publication_builtin_topic_data(const DDS::PublicationBuiltinTopicData& pub_data){std::cout << "PublicationBuiltinTopicData at " << std::time(nullptr) << ":\n";// 1. keystd::cout << "  Key: " << print_octet_array16(pub_data.key.value) << "\n";// 2. participant_keystd::cout << "  Participant Key: " << print_octet_array16(pub_data.participant_key.value) << "\n";// 3. topic_namestd::cout << "  Topic Name: " << (pub_data.topic_name.in() ? pub_data.topic_name.in() : "null") << "\n";// 4. type_namestd::cout << "  Type Name: " << (pub_data.type_name.in() ? pub_data.type_name.in() : "null") << "\n";// 5. durabilitystd::cout << "  Durability: kind=" << (pub_data.durability.kind == DDS::VOLATILE_DURABILITY_QOS ? "VOLATILE" : "TRANSIENT_LOCAL") << "\n";// 6. durability_servicestd::cout << "  Durability Service: service_cleanup_delay=" << pub_data.durability_service.service_cleanup_delay.sec << "s "<< pub_data.durability_service.service_cleanup_delay.nanosec << "ns, "<< "history_kind=" << (pub_data.durability_service.history_kind == DDS::KEEP_LAST_HISTORY_QOS ? "KEEP_LAST" : "KEEP_ALL") << ", "<< "history_depth=" << pub_data.durability_service.history_depth << ", "<< "max_samples=" << pub_data.durability_service.max_samples << ", "<< "max_instances=" << pub_data.durability_service.max_instances << ", "<< "max_samples_per_instance=" << pub_data.durability_service.max_samples_per_instance << "\n";// 7. deadlinestd::cout << "  Deadline: period=" << pub_data.deadline.period.sec << "s "<< pub_data.deadline.period.nanosec << "ns\n";// 8. latency_budgetstd::cout << "  Latency Budget: duration=" << pub_data.latency_budget.duration.sec << "s "<< pub_data.latency_budget.duration.nanosec << "ns\n";// 9. livelinessstd::cout << "  Liveliness: kind=" << (pub_data.liveliness.kind == DDS::AUTOMATIC_LIVELINESS_QOS ? "AUTOMATIC" :pub_data.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS ? "MANUAL_BY_PARTICIPANT" :"MANUAL_BY_TOPIC") << ", "<< "lease_duration=" << pub_data.liveliness.lease_duration.sec << "s "<< pub_data.liveliness.lease_duration.nanosec << "ns\n";// 10. reliabilitystd::cout << "  Reliability: kind=" << (pub_data.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS ? "RELIABLE" : "BEST_EFFORT") << ", "<< "max_blocking_time=" << pub_data.reliability.max_blocking_time.sec << "s "<< pub_data.reliability.max_blocking_time.nanosec << "ns\n";// 11. lifespanstd::cout << "  Lifespan: duration=" << pub_data.lifespan.duration.sec << "s "<< pub_data.lifespan.duration.nanosec << "ns\n";// 12. user_datastd::cout << "  User Data: ";for (CORBA::ULong j = 0; j < pub_data.user_data.value.length(); ++j) {std::cout << static_cast<char>(pub_data.user_data.value[j]);}std::cout << "\n";// 13. ownershipstd::cout << "  Ownership: kind=" << (pub_data.ownership.kind == DDS::SHARED_OWNERSHIP_QOS ? "SHARED" : "EXCLUSIVE") << "\n";// 14. ownership_strengthstd::cout << "  Ownership Strength: value=" << pub_data.ownership_strength.value << "\n";// 15. destination_orderstd::cout << "  Destination Order: kind=" << (pub_data.destination_order.kind == DDS::BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS ? "BY_RECEPTION_TIMESTAMP" : "BY_SOURCE_TIMESTAMP") << "\n";// 16. presentationstd::string access_scope;switch (pub_data.presentation.access_scope) {case DDS::INSTANCE_PRESENTATION_QOS: access_scope = "INSTANCE"; break;case DDS::TOPIC_PRESENTATION_QOS: access_scope = "TOPIC"; break;case DDS::GROUP_PRESENTATION_QOS: access_scope = "GROUP"; break;default: access_scope = "UNKNOWN"; break;}std::cout << "  Presentation: access_scope=" << access_scope << ", "<< "coherent_access=" << (pub_data.presentation.coherent_access ? "true" : "false") << ", "<< "ordered_access=" << (pub_data.presentation.ordered_access ? "true" : "false") << "\n";// 17. partitionstd::cout << "  Partition: ";for (CORBA::ULong j = 0; j < pub_data.partition.name.length(); ++j) {std::cout << (pub_data.partition.name[j].in() ? pub_data.partition.name[j].in() : "null");if (j < pub_data.partition.name.length() - 1) std::cout << ", ";}std::cout << "\n";// 18. topic_datastd::cout << "  Topic Data: ";for (CORBA::ULong j = 0; j < pub_data.topic_data.value.length(); ++j) {std::cout << static_cast<char>(pub_data.topic_data.value[j]);}std::cout << "\n";// 19. group_datastd::cout << "  Group Data: ";for (CORBA::ULong j = 0; j < pub_data.group_data.value.length(); ++j) {std::cout << static_cast<char>(pub_data.group_data.value[j]);}std::cout << "\n";// 20. representationstd::cout << "  Data Representation: ";for (CORBA::ULong j = 0; j < pub_data.representation.value.length(); ++j) {std::cout << pub_data.representation.value[j];if (j < pub_data.representation.value.length() - 1) std::cout << ", ";}std::cout << "\n";}
};

运行结果(这是上面pub测试发布pubqos对应的结果):

PublicationBuiltinTopicData at 1760690635:Key: 01 03 b8 1e a4 46 8d 01 37 54 f2 80 00 00 00 02Participant Key: 01 03 b8 1e a4 46 8d 01 37 54 f2 80 00 00 01 c1Topic Name: Movie Discussion ListType Name: DemoIdlModule::DemoTopic1Durability: kind=VOLATILEDurability Service: service_cleanup_delay=0s 0ns, history_kind=KEEP_LAST, history_depth=1, max_samples=-1, max_instances=-1, max_samples_per_instance=-1Deadline: period=2147483647s 2147483647nsLatency Budget: duration=0s 0nsLiveliness: kind=AUTOMATIC, lease_duration=2147483647s 2147483647nsReliability: kind=RELIABLE, max_blocking_time=0s 100000000nsLifespan: duration=2147483647s 2147483647nsUser Data:Ownership: kind=SHAREDOwnership Strength: value=0Destination Order: kind=BY_RECEPTION_TIMESTAMPPresentation: access_scope=TOPIC, coherent_access=false, ordered_access=falsePartition: TestPartitionTopic Data:Group Data: Initial GroupDataData Representation: 2

只是关注方法,在练习时可以通过这个打印关注到相关策略的设置。

发布端按需进行设置进行观察即可:

//设置datawriterQos:DDS::DataWriterQos dw_qos;pub->get_default_datawriter_qos(dw_qos);dw_qos.liveliness.kind = ::DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS;dw_qos.liveliness.lease_duration.sec = 1;dw_qos.liveliness.lease_duration.nanosec = 0;char MYTEST_USERDATA[] = "Mytest UserData";CORBA::ULong user_data_len = static_cast<CORBA::ULong>(ACE_OS::strlen(MYTEST_USERDATA));dw_qos.user_data.value.length(user_data_len);dw_qos.user_data.value.replace(user_data_len, user_data_len, reinterpret_cast<CORBA::Octet*>(MYTEST_USERDATA));DDS::DataWriter_var dw = pub->create_datawriter(topic.in(),dw_qos,DDS::DataWriterListener::_nil(),::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
2.验证对应的保活策略(liveness心跳控制)。
//可参考OpenDDS\tests\DCPS\WaitSetLivelinessLost
enum LivelinessQosPolicyKind {AUTOMATIC_LIVELINESS_QOS,   //由中间件自动发送 liveliness 信号(推荐)MANUAL_BY_PARTICIPANT_LIVELINESS_QOS,  //由参与者手动调用 assert_liveliness()MANUAL_BY_TOPIC_LIVELINESS_QOS   //由 DataWriter 手动调用 assert_liveliness()目前不支持
};//发布端datawrite设置qos:  并且在中间构建业务等待5s  ACE_OS::sleep(5);DDS::DataWriterQos dw_qos;pub->get_default_datawriter_qos(dw_qos);dw_qos.liveliness.kind = ::DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS;dw_qos.liveliness.lease_duration.sec = 1;dw_qos.liveliness.lease_duration.nanosec = 0;char MYTEST_USERDATA[] = "Mytest UserData";CORBA::ULong user_data_len = static_cast<CORBA::ULong>(ACE_OS::strlen(MYTEST_USERDATA));dw_qos.user_data.value.length(user_data_len);dw_qos.user_data.value.replace(user_data_len, user_data_len, reinterpret_cast<CORBA::Octet*>(MYTEST_USERDATA));DDS::DataWriter_var dw = pub->create_datawriter(topic.in(),dw_qos,DDS::DataWriterListener::_nil(),::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
//订阅端datareader设置qos:DDS::DataWriterQos dw_qos;pub->get_default_datawriter_qos(dw_qos);dr_qos.liveliness.kind = ::DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS;dr_qos.liveliness.lease_duration.sec = 3;//这是需要大于等于发布端的设置才兼容?dr_qos.liveliness.lease_duration.nanosec = 0;dr_qos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS; // 确保数据可靠传输DDS::DataReader_var dr = sub->create_datareader(topic.in(),dr_qos,listener.in(),::OpenDDS::DCPS::DEFAULT_STATUS_MASK);

简单查看结果:

DataReaderListener::on_subscription_matched
Liveliness changed: alive_count=1, not_alive_count=0
DataReaderListener::on_liveliness_changed
Received coherent set with 1 samples:Sample 0: counter = 7
Received coherent set with 1 samples:Sample 0: counter = 8
Received coherent set with 1 samples:Sample 0: counter = 9
ERROR: take failed: 11
Liveliness changed: alive_count=0, not_alive_count=1   #这里时超时断开,可以识别到
DataReaderListener::on_liveliness_changed
Liveliness changed: alive_count=1, not_alive_count=0
DataReaderListener::on_liveliness_changed
...

这里尴尬的是,如果设置成MANUAL_BY_PARTICIPANT_LIVELINESS_QOS,是可以识别到链路的断开状态,但是AUTOMATIC_LIVELINESS_QOS状态下,没有通过现象观察到结果,后面可以通过抓包分析。

3.验证持久化策略(有遗留)。

(这里就发现一个敏锐的问题,订阅端和发布端的目标是不一样的,发布端控制的是本地的存储,用于重发;订阅端控制的是本地内存的存储策略,用于决定自己能接收缓存到多少数据;两者的侧重点是不一样的。)

====》既然发布端和订阅端策略目标和结构设计都有差异,是否需要策略设置对应才能真正生效?

这里的持久化策略关键参数主要涉及,不保留,本地保留,跨进程保留(需要外部支持),永久保存。

//查看DataWriterQos属性定义,可以看出,相关qos策略中,与持久化策略有关的两个属性是:
{DurabilityQosPolicy durability; //数据持久性(VOLATILE, TRANSIENT_LOCAL, TRANSIENT, PERSISTENT)DurabilityServiceQosPolicy durability_service;
}
//其对应的结构如下:struct DurabilityQosPolicy {  //控制持久化的策略DurabilityQosPolicyKind kind;};//这里的策略,订阅端必须兼容发布端的策略。enum DurabilityQosPolicyKind {VOLATILE_DURABILITY_QOS,   //默认,数据样本不存储,订阅端只能接收发布后实时发送的样本TRANSIENT_LOCAL_DURABILITY_QOS, //数据样本在发布端 DataWriter 存活期间存储(通常在内存中)TRANSIENT_DURABILITY_QOS,	//数据样本在发布端之外的全局存储中保存(例如 DDS 中间件提供的持久性服务)PERSISTENT_DURABILITY_QOS   //数据样本持久存储(例如磁盘),即使 DDS 系统重启,数据仍可分发};struct DurabilityServiceQosPolicy { //对持久化策略的细节描述Duration_t service_cleanup_delay;    //持久性服务在 DataWriter 销毁后保留数据的时长HistoryQosPolicyKind history_kind;   //持久性服务如何管理历史数据long history_depth;				//每个实例保留的历史样本数long max_samples;				//持久性服务存储的总样本数上限long max_instances;             //持久性服务支持的最大实例数long max_samples_per_instance;  //每个实例的最大样本数(存储的)};enum HistoryQosPolicyKind {KEEP_LAST_HISTORY_QOS,   //保留每个实例的最新 history_depth 个样本KEEP_ALL_HISTORY_QOS     //保留所有样本,直到达到 max_samples 或 max_samples_per_instance 限制};//DataReaderQos相关qos设置,与持久化策略有关的几个为:
{DurabilityQosPolicy durability;//控制是否接收历史数据(如在订阅前发布的)(VOLATILE, TRANSIENT_LOCAL, TRANSIENT, PERSISTENT)HistoryQosPolicy history;   //控制保留多少历史样本://KEEP_LAST(保留最近N个) KEEP_ALL(保留所有)    配合resource_limits使用,影响数据缓存深度ResourceLimitsQosPolicy resource_limits;  //限制缓存资源(总样本数/实例数(key 数)/每实例样本数)//防止内存爆炸,适用于多实例场景。
}

那么关注持久化策略,就关注的是默认情况下的不存储,本地内存存储,外部服务存储,以及本地磁盘存储。

既然默认情况下是不存储,也就是我们发布订阅的默认模式,使用qos打印观察一下相关参数的值即可:

//这是在内置主题中打印的对应的参数:// 5. durabilitystd::cout << "  Durability: kind=" << (pub_data.durability.kind == DDS::VOLATILE_DURABILITY_QOS ? "VOLATILE" : "TRANSIENT_LOCAL") << "\n";// 6. durability_servicestd::cout << "  Durability Service: service_cleanup_delay=" << pub_data.durability_service.service_cleanup_delay.sec << "s "<< pub_data.durability_service.service_cleanup_delay.nanosec << "ns, "<< "history_kind=" << (pub_data.durability_service.history_kind == DDS::KEEP_LAST_HISTORY_QOS ? "KEEP_LAST" : "KEEP_ALL") << ", "<< "history_depth=" << pub_data.durability_service.history_depth << ", "<< "max_samples=" << pub_data.durability_service.max_samples << ", "<< "max_instances=" << pub_data.durability_service.max_instances << ", "<< "max_samples_per_instance=" << pub_data.durability_service.max_samples_per_instance << "\n";//直接运行发布端和测试端的程序,可以看到运行结果如下,即默认结果如下:  (理论上策略设置为VOLATILE,相关策略属性是无意义的,仅仅是设置为默认值,具体得经过验证或者查看源码。)Durability: kind=VOLATILEDurability Service: service_cleanup_delay=0s 0ns, history_kind=KEEP_LAST, history_depth=1, max_samples=-1, max_instances=-1, max_samples_per_instance=-1

关注本地进程存储方案,是存储在发布端,订阅端重启后仍能收到策略设置的历史部分数据。

//service_cleanup_delay 参数起到什么作用呢,是描述的datawriter没有对端datareader连接时,可以删除内部策略控制的数据吗?
//发布端设置存储策略,内存中存储,dw_qos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS; //设置为发布端本地内存存储//dw_qos.durability_service.service_cleanup_delay.sec = 4;    //设置为没有连接时4s后清理内存数据//dw_qos.durability_service.service_cleanup_delay.nanosec = 0;dw_qos.durability_service.history_kind = DDS::KEEP_LAST_HISTORY_QOS;  //保存历史数据dw_qos.durability_service.history_depth = 2;    //最多保存历史数据个数//dw_qos.durability_service.max_samples = DDS::LENGTH_UNLIMITED;   //设置总样本数不限制,控制内存占用//dw_qos.durability_service.max_instances = DDS::LENGTH_UNLIMITED;   //最大可同时缓存的实例数量,针对topic//dw_qos.durability_service.max_samples_per_instance = DDS::LENGTH_UNLIMITED;  //设置每个实例的最大样本数,与max_instances配合//订阅端需要设置对应的策略,dr_qos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;  //这里暂时不关注历史数据的相关设置。dr_qos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;dr_qos.history.depth = 3;//dr_qos.resource_limits.max_samples = DDS::LENGTH_UNLIMITED;//dr_qos.resource_limits.max_instances = DDS::LENGTH_UNLIMITED;//dr_qos.resource_limits.max_samples_per_instance = DDS::LENGTH_UNLIMITED;//理论上说,使用场景进行构建,发布端持续发送数据,订阅端启动,应该能收到多的history_depth的历史数据。

====》本地存储策略实际上需要配置文件中配置存储路径+对应的qos策略才能生效。

简单测试发现,配置文件中配置和期望也有一些差异,后续再排查吧。

4.验证数据发布间隔控制策略(有效期用于控制敏感数据)。

反思:这里的相关qos策略,需要订阅端匹配设置,并且订阅端兼容发布端的设置,比如数值大于等于发布端的设置才可以。

//这个策略貌似要对应设置才会有效
//软件端
dw_qos.deadline.period.sec = 2;
dw_qos.deadline.period.nanosec = 0;
//订阅端 
dr_qos.deadline.period.sec = 2;//如果订阅端这个值小于发布端 发现直接无法交互
dr_qos.deadline.period.nanosec = 0;void DataReaderListener::on_requested_deadline_missed(DDS::DataReader_ptr,const DDS::RequestedDeadlineMissedStatus& status)
{std::cout << "[Deadline Missed] total_count: " << status.total_count<< ", last_instance_handle: " << status.last_instance_handle << std::endl;cerr << "DataReaderListener::on_requested_deadline_missed" << endl;
}//查看运行结果,可以看到:
[Deadline Missed] total_count: 1, last_instance_handle: 6
DataReaderListener::on_requested_deadline_missed
[Deadline Missed] total_count: 2, last_instance_handle: 6
DataReaderListener::on_requested_deadline_missed
5.其他

其他策略:如用户附加数据,所有权,生命周期管理,优先级,排序相关策略暂不关注了。

尽力而为/可靠传输:就得构造网络场景进行验证了。 通过抓包/日志和现象(可靠传输在丢包场景下会重传,可以基于配置文件配置及配置特定传输方式和相关工具构造丢包测试)。

5. Publisher(同Subscriber中描述,注意pub端订阅sub端内置主题获取qos)

验证方式参考Subscriber中SubscriberQos的交互验证逻辑即可。

这里要注意:pub端要获取sub端qos中的用户数据,需要订阅的内置主题topic是sub端的“DCPSSubscription”。

//从 struct PublisherQos定义中进行分析,presentation控制发送顺序,partition实现分区控制,group_data用户数据,entity_factory同上文一致,控制是否自动启动。struct PublisherQos {PresentationQosPolicy presentation;   //数据分发的顺序和一致性(INSTANCE, TOPIC, GROUP)。//INSTANCE_PRESENTATION_QOS 是一个topic的一个实例多次发布保持一致//TOPIC_PRESENTATION_QOS是一个topic的多个实例保持一致//GROUP_PRESENTATION_QOS是多个topic的多个实例保持一致PartitionQosPolicy partition;         //逻辑分区,控制发布-订阅匹配。//直接观察匹配个数即可GroupDataQosPolicy group_data;        //附加到 Publisher 的用户数据。//使用内置主题查看这个数据即可,用户层自己处理EntityFactoryQosPolicy entity_factory;//控制 DataWriter 是否自动启用。//同上的是否需要自己enable()启动};

6. Datawriter(同Datareader中一致,注意内置主题是获取sub端的qos)

这里用的内置主题和pub端获取对应sub端内置主题一样,使用BUILT_IN_SUBSCRIPTION_TOPIC及对应的内置主题topic进行解析。

4. opendds中提供xml配置qos

已经以demo对qos进行设置进行分析测试。

opendds中提供了以xml对qos进行处理的支持:

1.首先是源码C:\Users\LEGION\Desktop\opendds_study\OpenDDS\dds\DCPS\QOS_XML_Handler下提供了对qos支持的xml的相关的解析,这里主要有两个项目,分别是xsd和xml对应解析加载qos的策略。

2.测试代码\OpenDDS\tests\DCPS\QoS_XML中提供了对xsd和xml文件解析加载的代码。

这里的疑问:

第一:opendds中xml相关的设置是直接修改xml文件?没有提供其他工具的支持?

===》所以需要自己构建/直接修改qos对应的xml进行加载。

第二:opendds中对xml支持qos配置的解析支持程度到哪一步?

=====》直接和DomainParticipant,subscriber,publisher,topic,datareader,datawriter进行关联,直接可以通过解析后获取到对应的qos策略。

5. 获取相关qos并进行打印

要对qos进行生效设置,首先需要确认的是qos中相关设置的参数都是正确。

读取qos后,对相关参数进行打印。

http://www.dtcms.com/a/503479.html

相关文章:

  • 多视图几何--立体匹配--Gipuma
  • C++智能指针全面解析:原理、使用场景与最佳实践
  • C++指针使用
  • 内江规划建设教育网站国家企业信用公示信息网官网
  • 深入理解 lscpu 命令:如何准确查看 CPU 信息
  • 网站建设需要什么人希腊网站后缀
  • DSync for Mac 文件对比同步工具
  • 「日拱一码」123 内嵌神经网络ENNs
  • C++与易语言开发的基础要求分享
  • 上海市住宅建设发展中心网站建设网站有何要求
  • 广州企业网站建设公司哪家好wordpress改html5
  • ARM 架构核心知识笔记(整理与补充版)
  • 《i.MX6ULL LED 裸机开发实战:从寄存器到点亮》
  • 迈向零信任存储:基于RustFS构建内生安全的数据架构
  • 网站开发公司找哪家帮卖货平台
  • C++ Vector:动态数组的高效使用指南
  • html5微网站漂亮网站
  • C++ 分配内存 new/malloc 区别
  • Respective英文单词学习
  • 网络排错全流程:从DNS解析到防火墙,逐层拆解常见问题
  • 移动端开发工具集锦
  • 使用Nvidia Video Codec(三) NvDecoder
  • 周口规划建设局网站wordpress模板中添加短代码
  • Linux小课堂: 命令手册系统深度解析之掌握 man 与 apropos 的核心技术机制
  • 阿里云做网站官网网站改版的seo注意事项
  • 每日算法刷题Day76:10.19:leetcode 二叉树12道题,用时3h
  • 【OS笔记11】:进程和线程9-死锁及其概念
  • 贪心算法1
  • 服务器搭建vllm框架并部署模型+cursor使用经验
  • Arduino采集温湿度、光照数据