FastDDS:第三节(3.3小节)
3.3.发布者(Publisher)
3.3.1.发布者(Publisher)
3.3.1.1.发布者服务质量(PublisherQos)
3.3.1.1.1. 默认 PublisherQos(Default PublisherQos)
3.3.2.发布监听器(PublisherListener)
3.3.3.创建发布者(Creating a Publisher)
3.3.3.1.基于配置文件创建发布者(Profile based creation of a Publisher)
3.3.3.2.删除发布者(Deleting a Publisher)
3.3.4.数据写入器(DataWriter)
3.3.4.1.数据写入器服务质量(DataWriterQos)
3.3.4.1.1.默认 DataWriterQos(Default DataWriterQos)
3.3.5.数据写入监听器(DataWriterListener)
3.3.5.1. on_unacknowledged_sample_removed
3.3.6.创建 DataWriter(Creating a DataWriter)
3.3.6.1.基于配置文件创建 DataWriter(Profile based creation of a DataWriter)
3.3.6.2.使用自定义 PayloadPool 创建 DataWriter
3.3.6.3.删除 DataWriter
3.3.7.发布数据(Publishing data)
3.3.7.1.阻止写入操作(Blocking of the write operation)
3.3.7.2.借用数据缓冲区(Borrowing a data buffer)
3.3.7.3.预过滤 DataReader(Prefiltering out DataReaders)
3.3.发布者(Publisher)
发布由DataWriter和Publisher的关联定义。要开始发布数据实例的值,应用程序需要在 Publisher 中创建一个新的 DataWriter。此 DataWriter 将绑定到描述要传输数据类型的主题 (Topic)。与此主题匹配的远程订阅将能够从 DataWriter 接收数据值更新。
3.3.1.发布者(Publisher)
Publisher代表其所属的一个或多个DataWriter对象。它充当一个容器,允许根据Publisher 的PublisherQos提供的通用配置对不同的 DataWriter 对象进行分组。属于同一个 Publisher 的 DataWriter 对象彼此之间除了 PublisherQos 之外没有任何关联,并且在其他方面都独立运行。具体来说,一个 Publisher 可以托管不同Topic和数据类型的 DataWriter 对象。
3.3.1.1.发布者服务质量(PublisherQos)
PublisherQos控制Publisher的行为。它内部包含以下QosPolicy对象:
QosPolicy class | Accessor/Mutator | Mutable |
---|---|---|
PresentationQosPolicy | presentation() | Yes |
PartitionQosPolicy | partition() | Yes |
GroupDataQosPolicy | group_data() | Yes |
EntityFactoryQosPolicy | entity_factory() | Yes |
可以使用Publisher::set_qos()成员函数修改先前创建的发布者的 QoS 值。
// Create a DomainParticipant in the desired domain
DomainParticipant* participant =DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
if (nullptr == participant)
{// Errorreturn;
}// Create a Publisher with default PublisherQos
Publisher* publisher =participant->create_publisher(PUBLISHER_QOS_DEFAULT);
if (nullptr == publisher)
{// Errorreturn;
}// Get the current QoS or create a new one from scratch
PublisherQos qos = publisher->get_qos();// Modify QoS attributes
// (...)// Assign the new Qos to the object
publisher->set_qos(qos);
3.3.1.1.1. 默认 PublisherQos(Default PublisherQos)
默认的PublisherQos指的是DomainParticipant 实例上的成员函数get_default_publisher_qos()返回的值。该特殊值PUBLISHER_QOS_DEFAULT可以用作create_publisher()或Publisher::set_qos()成员函数的 QoS 参数,以指示应使用当前默认的 PublisherQos。
系统启动时,默认的 PublisherQos 相当于默认的构造值PublisherQos()。可以随时使用DomainParticipant实例上的成员函数set_default_publisher_qos()修改默认的 PublisherQos 。修改默认的 PublisherQos 不会影响已经存在的Publisher实例。
// Create a DomainParticipant in the desired domain
DomainParticipant* participant =DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
if (nullptr == participant)
{// Errorreturn;
}// Get the current QoS or create a new one from scratch
PublisherQos qos_type1 = participant->get_default_publisher_qos();// Modify QoS attributes
// (...)// Set as the new default PublisherQos
if (participant->set_default_publisher_qos(qos_type1) != RETCODE_OK)
{// Errorreturn;
}// Create a Publisher with the new default PublisherQos.
Publisher* publisher_with_qos_type1 =participant->create_publisher(PUBLISHER_QOS_DEFAULT);
if (nullptr == publisher_with_qos_type1)
{// Errorreturn;
}// Get the current QoS or create a new one from scratch
PublisherQos qos_type2;// Modify QoS attributes
// (...)// Set as the new default PublisherQos
if (participant->set_default_publisher_qos(qos_type2) != RETCODE_OK)
{// Errorreturn;
}// Create a Publisher with the new default PublisherQos.
Publisher* publisher_with_qos_type2 =participant->create_publisher(PUBLISHER_QOS_DEFAULT);
if (nullptr == publisher_with_qos_type2)
{// Errorreturn;
}// Resetting the default PublisherQos to the original default constructed values
if (participant->set_default_publisher_qos(PUBLISHER_QOS_DEFAULT)!= RETCODE_OK)
{// Errorreturn;
}// The previous instruction is equivalent to the following
if (participant->set_default_publisher_qos(PublisherQos())!= RETCODE_OK)
{// Errorreturn;
}
set_default_publisher_qos()成员函数也接受特殊值PUBLISHER_QOS_DEFAULT作为输入参数。这会将当前默认的 PublisherQos 重置为默认的构造值PublisherQos()。
// Create a DomainParticipant in the desired domain
DomainParticipant* participant =DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
if (nullptr == participant)
{// Errorreturn;
}// Create a custom PublisherQos
PublisherQos custom_qos;// Modify QoS attributes
// (...)// Create a publisher with a custom PublisherQos
Publisher* publisher = participant->create_publisher(custom_qos);
if (nullptr == publisher)
{// Errorreturn;
}// Set the QoS on the publisher to the default
if (publisher->set_qos(PUBLISHER_QOS_DEFAULT) != RETCODE_OK)
{// Errorreturn;
}// The previous instruction is equivalent to the following:
if (publisher->set_qos(participant->get_default_publisher_qos())!= RETCODE_OK)
{// Errorreturn;
}
PUBLISHER_QOS_DEFAULT根据使用位置的不同,该值具有不同的含义:
- 在create_publisher(),Publisher::set_qos()上调用,它指的是默认的PublisherQos。由 返回get_default_publisher_qos()。
- 在set_default_publisher_qos()上调用,指的是默认构造的PublisherQos()。
3.3.2.发布监听器(PublisherListener)
PublisherListener是一个抽象类,定义了在Publisher状态变化时触发的回调函数。默认情况下,所有这些回调函数都是空的,不执行任何操作。用户应该实现此类的特化版本,以覆盖应用程序所需的回调函数。未被覆盖的回调函数将保持其空的实现。
PublisherListener继承自DataWriterListener。因此,它能够对报告给DataWriter 的所有事件做出反应。由于事件始终会通知给能够处理该事件的最具体的实体监听器 (Entity Listener),因此,PublisherListener继承自 DataWriterListener 的回调函数只有在触发事件的 DataWriter 未附加监听器,或者回调函数被DataWriter 的StatusMask禁用时才会被调用。
PublisherListener未添加任何新的回调。请参阅DataWriterListener中继承的回调列表和覆盖示例。
3.3.3.创建发布者(Creating a Publisher)
一个发布者 (Publisher)始终属于一个DomainParticipant。发布者的创建是通过DomainParticipant 实例上的成员函数create_publisher()完成的,该函数充当发布者的工厂。
强制性参数包括:
- 描述发布者行为的PublisherQos 。如果提供的值为PUBLISHER_QOS_DEFAULT,则使用默认的PublisherQos。
可选参数包括:
- 继承自PublisherListener的 Listener ,用于实现在 Publisher 上发生事件和状态变化时触发的回调函数。默认使用空回调函数。
- StatusMask用于激活或停用 PublisherListener 上各个回调的触发。默认情况下,所有事件均启用。
create_publisher()如果操作过程中出现错误(例如,提供的QoS不兼容或不受支持),将返回空指针。建议检查返回值是否为有效指针。
// Create a DomainParticipant in the desired domain
DomainParticipant* participant =DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
if (nullptr == participant)
{// Errorreturn;
}// Create a Publisher with default PublisherQos and no Listener
// The value PUBLISHER_QOS_DEFAULT is used to denote the default QoS.
Publisher* publisher_with_default_qos =participant->create_publisher(PUBLISHER_QOS_DEFAULT);
if (nullptr == publisher_with_default_qos)
{// Errorreturn;
}// A custom PublisherQos can be provided to the creation method
PublisherQos custom_qos;// Modify QoS attributes
// (...)Publisher* publisher_with_custom_qos =participant->create_publisher(custom_qos);
if (nullptr == publisher_with_custom_qos)
{// Errorreturn;
}// Create a Publisher with default QoS and a custom Listener.
// CustomPublisherListener inherits from PublisherListener.
// The value PUBLISHER_QOS_DEFAULT is used to denote the default QoS.
CustomPublisherListener custom_listener;
Publisher* publisher_with_default_qos_and_custom_listener =participant->create_publisher(PUBLISHER_QOS_DEFAULT, &custom_listener);
if (nullptr == publisher_with_default_qos_and_custom_listener)
{// Errorreturn;
}
3.3.3.1.基于配置文件创建发布者(Profile based creation of a Publisher)
可以不使用 PublisherQos,而是通过调用 DomainParticipant 实例上的 create_publisher_with_profile() 成员函数,并传入一个表示配置文件名称的字符串来创建一个 Publisher。
必填参数为:
- 一个字符串,用于标识该 Publisher 的名称。
可选参数包括:
- 一个派生自 PublisherListener 的监听器对象,用于实现回调函数,这些回调函数将在 Publisher 发生事件或状态变化时被触发。默认情况下使用空的回调函数。
- 一个 StatusMask 类型的参数,用于启用或禁用 PublisherListener 中特定回调函数的触发。默认情况下,所有事件都被启用。
create_publisher_with_profile() 函数在操作过程中如果发生错误(例如提供的 QoS 不兼容或不受支持),将返回一个空指针。因此,建议检查返回值是否为有效的指针。
// First load the XML with the profiles
DomainParticipantFactory::get_instance()->load_XML_profiles_file("profiles.xml");// Create a DomainParticipant in the desired domain
DomainParticipant* participant =DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
if (nullptr == participant)
{// Errorreturn;
}// Create a Publisher using a profile and no Listener
Publisher* publisher_with_profile =participant->create_publisher_with_profile("publisher_profile");
if (nullptr == publisher_with_profile)
{// Errorreturn;
}// Create a Publisher using a profile and a custom Listener.
// CustomPublisherListener inherits from PublisherListener.
CustomPublisherListener custom_listener;
Publisher* publisher_with_profile_and_custom_listener =participant->create_publisher_with_profile("publisher_profile", &custom_listener);
if (nullptr == publisher_with_profile_and_custom_listener)
{// Errorreturn;
}
3.3.3.2.删除发布者(Deleting a Publisher)
可以使用创建发布者的 DomainParticipant 实例上的成员函数delete_publisher()删除发布者。
仅当所有属于发布者的实体 (DataWriter) 都已删除时,才能删除发布者。否则,该函数将发出错误,并且不会删除发布者。这可以通过使用Publisher的成员函数delete_contained_entities()来执行。
// Create a DomainParticipant in the desired domain
DomainParticipant* participant =DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
if (nullptr == participant)
{// Errorreturn;
}// Create a Publisher
Publisher* publisher =participant->create_publisher(PUBLISHER_QOS_DEFAULT);
if (nullptr == publisher)
{// Errorreturn;
}// Use the Publisher to communicate
// (...)// Delete the entities the Publisher created.
if (publisher->delete_contained_entities() != RETCODE_OK)
{// Publisher failed to delete the entities it created.return;
}// Delete the Publisher
if (participant->delete_publisher(publisher) != RETCODE_OK)
{// Errorreturn;
}
3.3.4.数据写入器(DataWriter)
一个 DataWriter(数据写入器)恰好依附于一个 Publisher(发布者),该发布者作为其工厂(factory)。此外,每个 DataWriter 在创建时都绑定到一个单独的 Topic(主题)。这个 Topic 必须在 DataWriter 创建之前就已经存在,并且必须绑定到 DataWriter 所要发布的数据类型。
在一个 Publisher 中为特定 Topic 创建新的 DataWriter 的效果是:启动一个具有该 Topic 所描述的名称和数据类型的新的发布(publication)。
一旦 DataWriter 被创建,应用程序就可以通过在其 DataWriter 上调用 write() 成员函数,通知数据值的变化。这些变化将被传输到所有与该发布匹配的订阅(Subscription)上。
3.3.4.1.数据写入器服务质量(DataWriterQos)
DataWriterQos控制 DataWriter 的行为。它内部包含以下QosPolicy对象:
QosPolicy class | Accessor/Mutator | Mutable |
---|---|---|
DurabilityQosPolicy | durability() | No |
DurabilityServiceQosPolicy | durability_service() | No |
DeadlineQosPolicy | deadline() | Yes |
LatencyBudgetQosPolicy | latency_budget() | Yes |
LivelinessQosPolicy | liveliness() | No |
ReliabilityQosPolicy | reliability() | No (*) |
DestinationOrderQosPolicy | destination_order() | No |
HistoryQosPolicy | history() | No |
ResourceLimitsQosPolicy | resource_limits() | No |
TransportPriorityQosPolicy | transport_priority() | Yes |
LifespanQosPolicy | lifespan() | Yes |
UserDataQosPolicy | user_data() | Yes |
OwnershipQosPolicy | ownership() | No |
OwnershipStrengthQosPolicy | ownership_strength() | Yes |
WriterDataLifecycleQosPolicy | writer_data_lifecycle() | Yes |
PublishModeQosPolicy | publish_mode() | No |
DataRepresentationQosPolicy | representation() | No |
PropertyPolicyQos | properties() | No |
RTPSReliableWriterQos | reliable_writer_qos() | Yes (*) |
RTPSEndpointQos | endpoint() | No |
WriterResourceLimitsQos | writer_resource_limits() | No |
DataSharingQosPolicy | data_sharing() | No |
以下未整合的、按属性分配的 QoS 适用于 DataWriters。
Property name | Non-consolidated QoS |
---|---|
fastdds.push_mode | DataWriter operating mode QoS Policy |
partitions | Endpoint Partitions |
可以使用成员函数修改先前创建的 DataWriter 的 QoS 值DataWriter::set_qos()。
// Create a DataWriter with default DataWriterQos
DataWriter* data_writer =publisher->create_datawriter(topic, DATAWRITER_QOS_DEFAULT);
if (nullptr == data_writer)
{// Errorreturn;
}// Get the current QoS or create a new one from scratch
DataWriterQos qos = data_writer->get_qos();// Modify QoS attributes
// (...)// Assign the new Qos to the object
data_writer->set_qos(qos);
3.3.4.1.1.默认 DataWriterQos(Default DataWriterQos)
默认的 DataWriterQos 是指调用 Publisher 实例的 get_default_datawriter_qos() 成员函数所返回的值。特殊值 DATAWRITER_QOS_DEFAULT 可以作为 create_datawriter() 或 DataWriter::set_qos() 成员函数的 QoS 参数使用,表示应采用当前默认的 DataWriterQos 设置。
在系统启动时,默认的 DataWriterQos 等同于通过默认构造函数生成的 DataWriterQos() 实例。可以通过调用 Publisher 实例的 set_default_datawriter_qos() 成员函数,在任意时刻修改默认的 DataWriterQos。需要注意的是,修改默认的 DataWriterQos 不会影响已经存在的 DataWriter 实例。
// Get the current QoS or create a new one from scratch
DataWriterQos qos_type1 = publisher->get_default_datawriter_qos();// Modify QoS attributes
// (...)// Set as the new default DataWriterQos
if (publisher->set_default_datawriter_qos(qos_type1) != RETCODE_OK)
{// Errorreturn;
}// Create a DataWriter with the new default DataWriterQos.
DataWriter* data_writer_with_qos_type1 =publisher->create_datawriter(topic, DATAWRITER_QOS_DEFAULT);
if (nullptr == data_writer_with_qos_type1)
{// Errorreturn;
}// Get the current QoS or create a new one from scratch
DataWriterQos qos_type2;// Modify QoS attributes
// (...)// Set as the new default DataWriterQos
if (publisher->set_default_datawriter_qos(qos_type2) != RETCODE_OK)
{// Errorreturn;
}// Create a DataWriter with the new default DataWriterQos.
DataWriter* data_writer_with_qos_type2 =publisher->create_datawriter(topic, DATAWRITER_QOS_DEFAULT);
if (nullptr == data_writer_with_qos_type2)
{// Errorreturn;
}// Resetting the default DataWriterQos to the original default constructed values
if (publisher->set_default_datawriter_qos(DATAWRITER_QOS_DEFAULT)!= RETCODE_OK)
{// Errorreturn;
}// The previous instruction is equivalent to the following
if (publisher->set_default_datawriter_qos(DataWriterQos())!= RETCODE_OK)
{// Errorreturn;
}
set_default_datawriter_qos()成员函数也接受特殊值DATAWRITER_QOS_DEFAULT作为输入参数。这会将当前默认的 DataWriterQos 重置为默认的构造值DataWriterQos()。
// Create a custom DataWriterQos
DataWriterQos custom_qos;// Modify QoS attributes
// (...)// Create a DataWriter with a custom DataWriterQos
DataWriter* data_writer = publisher->create_datawriter(topic, custom_qos);
if (nullptr == data_writer)
{// Errorreturn;
}// Set the QoS on the DataWriter to the default
if (data_writer->set_qos(DATAWRITER_QOS_DEFAULT) != RETCODE_OK)
{// Errorreturn;
}// The previous instruction is equivalent to the following:
if (data_writer->set_qos(publisher->get_default_datawriter_qos())!= RETCODE_OK)
{// Errorreturn;
}
DATAWRITER_QOS_DEFAULT 这一数值的含义会因使用场景不同而变化:
- 在 create_datawriter() 以及 DataWriter::set_qos() 中,它指的是由 get_default_datawriter_qos() 返回的默认 DataWriterQos。
- 在 set_default_datawriter_qos() 中,它指的是默认构造的 DataWriterQos()。
3.3.5.数据写入监听器(DataWriterListener)
DataWriterListener是一个抽象类,定义了在DataWriter状态变化时触发的回调函数。默认情况下,所有这些回调函数都是空的,不执行任何操作。用户应该实现此类的特化版本,以覆盖应用程序所需的回调函数。未被覆盖的回调函数将保持其空的实现。
DataWriterListener定义以下回调:
- on_publication_matched():DataWriter 找到了与Topic匹配且具有共同分区和兼容 QoS 的DataReader,或者已停止与之前认为匹配的 DataReader 匹配。
- on_offered_deadline_missed():DataWriter 未能在其DataWriterQos上配置的截止期限内提供数据。对于 DataWriter 未能提供数据的每个截止期限和数据实例,都会调用此方法。
- on_offered_incompatible_qos():DataWriter 找到了与 Topic 匹配且具有公共分区的 DataReader,但其请求的 QoS 与 DataWriter 上定义的 QoS 不兼容。
- on_liveliness_lost():DataWriter 没有遵守其 DataWriterQos 上的活跃度配置,因此,DataReader 实体将认为 DataWriter 不再处于活动状态。
- on_unacknowledged_sample_removed():Datawriter 删除了未被每个匹配的 DataReader 确认的样本。
3.3.5.1. on_unacknowledged_sample_removed
on_unacknowledged_sample_removed() 是一个非标准的回调函数,用于在样本(sample)未被匹配的 DataReader 发送/接收的情况下被移除时通知用户。这种情况可能发生在网络资源受限或发布数据的吞吐量过高的场景中。通过该回调机制,发布端应用可以检测到此类情况,并采取相应的解决措施,例如降低发布速率,以缓解问题。
判断样本是否因未被确认(acknowledged)而被移除的依据,取决于 ReliabilityQosPolicy 的设置:
- 对于 BEST_EFFORT_RELIABILITY_QOS 设置的 DataWriter:如果样本尚未通过传输层发送出去,就会被标记为“未确认即被移除”,并触发回调通知。
- 对于 REILABLE_RELIABILITY_QOS 设置的 DataWriter:只有当所有匹配的 DataReader 都未确认接收该样本(即未发送相应的元流量 ACK 消息)时,该样本才会被视为“未确认即被移除”。这意味着,虽然某些 DataReader 可能已经接收到该样本,但由于至少有一个匹配的 DataReader 未确认接收,或者其 ACK 消息尚未到达发送端,系统就会触发此回调。这种情况下存在不可避免的竞争条件(race condition):在样本被移除时,虽然某个 DataReader 的 ACK 消息尚未收到,但该消息可能仍在传输途中,或在网络中丢失。因此,这种判断标准可能会导致一些误报(false positives)。但从用户的角度来看,知道样本是否被所有匹配 DataReader 成功确认,是更有意义的。
对于启用了 DisablePositiveACKsQosPolicy 的可靠 DataWriter:该策略会禁用正向确认(ACK)消息的发送,除非 DataReader 检测到样本丢失,此时它将通过负向确认(NACK)消息通知发送端。在没有收到 NACK 的情况下,系统会在指定的 QoS 时间窗口内认为样本已被成功接收。然而,这种机制同样存在竞争条件,因为 NACK 消息可能尚未到达或已在网络中丢失。在该特定情况下,由于不会收到 ACK 消息,可靠的 DataWriter 将采用与 BEST_EFFORT 模式相同的判断标准,即将未发送成功的样本视为“未确认即被移除”。
class CustomDataWriterListener : public DataWriterListener
{public:CustomDataWriterListener(): DataWriterListener(){}virtual ~CustomDataWriterListener(){}void on_publication_matched(DataWriter* writer,const PublicationMatchedStatus& info) override{static_cast<void>(writer);if (info.current_count_change == 1){std::cout << "Matched a remote Subscriber for one of our Topics" << std::endl;}else if (info.current_count_change == -1){std::cout << "Unmatched a remote Subscriber" << std::endl;}}void on_offered_deadline_missed(DataWriter* writer,const OfferedDeadlineMissedStatus& status) override{static_cast<void>(writer);static_cast<void>(status);std::cout << "Some data could not be delivered on time" << std::endl;}void on_offered_incompatible_qos(DataWriter* writer,const OfferedIncompatibleQosStatus& status) override{std::cout << "Found a remote Topic with incompatible QoS (QoS ID: " << status.last_policy_id <<")" << std::endl;}void on_liveliness_lost(DataWriter* writer,const LivelinessLostStatus& status) override{static_cast<void>(writer);static_cast<void>(status);std::cout << "Liveliness lost. Matched Subscribers will consider us offline" << std::endl;}void on_unacknowledged_sample_removed(DataWriter* writer,const InstanceHandle_t& instance) override{static_cast<void>(writer);static_cast<void>(instance);std::cout << "Sample removed unacknowledged" << std::endl;}};
3.3.6.创建 DataWriter(Creating a DataWriter)
一个 DataWriter 始终隶属于一个 Publisher。创建 DataWriter 是通过调用 Publisher 实例上的成员函数 create_datawriter() 来完成的,该函数充当 DataWriter 的工厂。
必填参数包括:
- 一个绑定到将要传输的数据类型的 Topic。
- 描述 DataWriter 行为的 DataWriterQos。如果提供的值为 DATAWRITER_QOS_DEFAULT,则使用默认的 DataWriterQos 值。如果提供的值为 DATAWRITER_QOS_USE_TOPIC_QOS,则使用默认 QoS 和所提供的 TopicQoS 的值,其中 TopicQoS 中设定的任何策略会覆盖默认 QoS 中对应的策略。
可选参数包括:
- 一个继承自 DataWriterListener 的 Listener,用于实现在响应 DataWriter 上的事件和状态变化时触发的回调函数。默认情况下使用空回调。
- 一个 StatusMask,用于启用或禁用 DataWriterListener 上各个回调的触发。默认情况下,所有事件都是启用状态。
函数返回值说明:
create_datawriter() 函数在操作过程中如果发生错误(例如所提供的 QoS 不兼容或不被支持),将返回一个空指针。建议检查返回值是否为有效的指针,以确保创建操作成功。
// Create a DataWriter with default DataWriterQos and no Listener
// The value DATAWRITER_QOS_DEFAULT is used to denote the default QoS.
DataWriter* data_writer_with_default_qos =publisher->create_datawriter(topic, DATAWRITER_QOS_DEFAULT);
if (nullptr == data_writer_with_default_qos)
{// Errorreturn;
}// A custom DataWriterQos can be provided to the creation method
DataWriterQos custom_qos;// Modify QoS attributes
// (...)DataWriter* data_writer_with_custom_qos =publisher->create_datawriter(topic, custom_qos);
if (nullptr == data_writer_with_custom_qos)
{// Errorreturn;
}// Create a DataWriter with default QoS and a custom Listener.
// CustomDataWriterListener inherits from DataWriterListener.
// The value DATAWRITER_QOS_DEFAULT is used to denote the default QoS.
CustomDataWriterListener custom_listener;
DataWriter* data_writer_with_default_qos_and_custom_listener =publisher->create_datawriter(topic, DATAWRITER_QOS_DEFAULT, &custom_listener);
if (nullptr == data_writer_with_default_qos_and_custom_listener)
{// Errorreturn;
}// Create a DataWriter with default QoS and a custom TopicQos.
// The value DATAWRITER_QOS_USE_TOPIC_QOS is used to denote the default QoS
// and to override the TopicQos.
Topic* topic;
DataWriter* data_writer_with_default_qos_and_custom_topic_qos =publisher->create_datawriter(topic, DATAWRITER_QOS_USE_TOPIC_QOS);
if (nullptr == data_writer_with_default_qos_and_custom_topic_qos)
{// Errorreturn;
}
3.3.6.1.基于配置文件创建 DataWriter(Profile based creation of a DataWriter)
可以不使用 DataWriterQos,而是通过调用 Publisher 实例上的 create_datawriter_with_profile() 成员函数,并传入一个配置文件(profile)的名称来创建一个 DataWriter。
必填参数包括:
- 一个绑定到待传输数据类型的 Topic。
- 一个用于标识该 DataWriter 的字符串名称。
可选参数包括:
- 一个派生自 DataWriterListener 的监听器,用于实现当 DataWriter 上发生事件或状态变化时触发的回调函数。默认情况下使用空回调。
- 一个 StatusMask,用于启用或禁用 DataWriterListener 中各个回调的触发。默认情况下所有事件都是启用状态。
create_datawriter_with_profile() 在操作过程中如果发生错误(例如提供的 QoS 不兼容或不受支持),将返回一个空指针。建议检查返回值是否为一个有效的指针。
必须先加载 XML 配置文件。请参阅从 XML 文件加载配置文件。
// First load the XML with the profiles
DomainParticipantFactory::get_instance()->load_XML_profiles_file("profiles.xml");// Create a DataWriter using a profile and no Listener
DataWriter* data_writer_with_profile =publisher->create_datawriter_with_profile(topic, "data_writer_profile");
if (nullptr == data_writer_with_profile)
{// Errorreturn;
}// Create a DataWriter using a profile and a custom Listener.
// CustomDataWriterListener inherits from DataWriterListener.
CustomDataWriterListener custom_listener;
DataWriter* data_writer_with_profile_and_custom_listener =publisher->create_datawriter_with_profile(topic, "data_writer_profile", &custom_listener);
if (nullptr == data_writer_with_profile_and_custom_listener)
{// Errorreturn;
}
3.3.6.2.使用自定义 PayloadPool 创建 DataWriter
在创建DataWriter时,可以将自定义PayloadPool作为参数传递。这允许自定义管理 DataWriters 和 DataReaders 之间交换的信息。可以在另一端设置相同的配置。
// A DataWriterQos must be provided to the creation method
DataWriterQos qos;// Create PayloadPool
std::shared_ptr<eprosima::fastdds::rtps::IPayloadPool> payload_pool =std::dynamic_pointer_cast<eprosima::fastdds::rtps::IPayloadPool>(std::make_shared<CustomPayloadPool>());DataWriter* data_writer = publisher->create_datawriter(topic, qos, nullptr, StatusMask::all(), payload_pool);
if (nullptr == data_writer)
{// Errorreturn;
}
此配置也可在RTPS 层执行。自定义示例同时适用于这两个层。
3.3.6.3.删除 DataWriter
可以使用创建 DataWriter 的Publisher实例上的成员函数delete_datawriter()删除 DataWriter 。
// Create a DataWriter
DataWriter* data_writer =publisher->create_datawriter(topic, DATAWRITER_QOS_DEFAULT);
if (nullptr == data_writer)
{// Errorreturn;
}// Use the DataWriter to communicate
// (...)// Delete the DataWriter
if (publisher->delete_datawriter(data_writer) != RETCODE_OK)
{// Errorreturn;
}
3.3.7.发布数据(Publishing data)
当调用 DataWriter 上的 write() 成员函数时,表示某个数据实例的值发生了变化。这一变更随后会被传播到所有与该 DataWriter 匹配的 DataReader。此外,该操作还会产生一个副作用,即它会声明 DataWriter 本身、其所属的 Publisher 以及 DomainParticipant 的活跃状态(liveliness)。
该函数接受两个参数:
- 一个指向数据实例的指针,该实例包含了新的值。
- 一个实例的句柄(InstanceHandle_t)。
如果将句柄参数传入一个空句柄(即默认构造的 InstanceHandle_t),则表示数据实例的身份(identity)应从实例数据的关键字段(key)中自动推导得出。此外,write() 成员函数也支持重载形式,仅接收指向数据实例的指针,这种形式也总是会从数据的关键字段中自动推导实例的身份。
如果传入的句柄非空,则该句柄必须与通过 TypeSupport 实例的 getKey() 方法获取的句柄值一致。否则,write() 函数将返回错误码 RETCODE_PRECONDITION_NOT_MET,表示前提条件未满足,写入操作失败。
// Register the data type in the DomainParticipant.
TypeSupport custom_type_support(new CustomDataType());
custom_type_support.register_type(participant, custom_type_support.get_type_name());// Create a Topic with the registered type.
Topic* custom_topic =participant->create_topic("topic_name", custom_type_support.get_type_name(), TOPIC_QOS_DEFAULT);
if (nullptr == custom_topic)
{// Errorreturn;
}// Create a DataWriter
DataWriter* data_writer =publisher->create_datawriter(custom_topic, DATAWRITER_QOS_DEFAULT);
if (nullptr == data_writer)
{// Errorreturn;
}// Get a data instance
void* data = custom_type_support->create_data();// Fill the data values
// (...)// Publish the new value, deduce the instance handle
if (data_writer->write(data, eprosima::fastdds::rtps::InstanceHandle_t()) != RETCODE_OK)
{// Errorreturn;
}// The data instance can be reused to publish new values,
// but delete it at the end to avoid leaks
custom_type_support->delete_data(data);
3.3.7.1.阻止写入操作(Blocking of the write operation)
如果DataWriterQos中的可靠性类型被设置为RELIABLE(可靠),则write()操作可能会被阻塞。具体来说,如果已达到所配置的资源限制中指定的上限,write()操作将会阻塞,等待可用空间释放。在这种情况下,reliability中的max_blocking_time配置项用于指定write()操作在阻塞等待时的最长时间。如果在max_blocking_time规定的时间内,DataWriter仍无法在不超出限制的情况下存储修改数据,则write()操作将失败,并返回TIMEOUT(超时)错误。
3.3.7.2.借用数据缓冲区(Borrowing a data buffer)
当用户调用 write() 函数并传入一个新的数据样本时,数据会被从该样本复制到 DataWriter 的内存中。对于较大的数据类型来说,这样的复制操作可能会消耗较多的时间和内存资源。为了避免这种情况,DataWriter 可以将其内存中的样本“借出”给用户,用户随后可以直接在这个借出的样本上填充所需的数据。当调用 write() 函数并传入这种借出的样本时,DataWriter 不会再次复制其内容,因为该缓冲区本来就是 DataWriter 所拥有的。
要在发布中使用借出的数据样本,可以按照以下步骤进行操作:
- 调用 loan_sample() 函数获取一个借出样本的引用;
- 利用该引用来构建数据样本,即填充数据;
- 调用 write() 函数将该样本写入。
一旦使用借出的样本调用了 write() 函数,则认为该样本的借用已经被归还,此时对样本内容的任何修改都将不再安全,也不被推荐。
如果调用了 loan_sample() 函数获取了一个样本,但最终并没有调用 write() 函数将其写入,则必须通过调用 discard_loan() 函数将该样本手动归还给 DataWriter。否则,DataWriter 可能会因未被归还的借出样本而最终无法提供新的样本,从而导致资源耗尽的问题。
// Borrow a data instancevoid* data = nullptr;if (RETCODE_OK == data_writer->loan_sample(data)){bool error = false;// Fill the data values// (...)if (error){// Return the loan without publishingdata_writer->discard_loan(data);return;}// Publish the new valueif (data_writer->write(data, eprosima::fastdds::rtps::InstanceHandle_t()) != RETCODE_OK){// Errorreturn;}}
3.3.7.3.预过滤 DataReader(Prefiltering out DataReaders)
用户可以使用一个重载的 write() 方法,该方法允许传入一个 WriteParams 结构。该结构的一个成员是 UserWriteData,用户可以在其中存储额外的信息,供预过滤机制使用,该信息可以通过 user_write_data() 方法进行设置。
通过实现 IContentFilter 接口,并将其传递给数据写入器(DataWriter)的 DataWriter::set_sample_prefilter() 方法,用户可以基于样本的内容(SerializedPayload_t)以及 FilteredSampleInfo 中的 user_write_data,阻止数据写入器将样本发送给匹配的数据读取器(DataReader)。如果 evaluate() 方法返回值为 false,则表示该样本不会被发送到其输入参数中由 Guid_t 标识的 DataReader。
强烈建议 evaluate() 方法的实现不要执行任何阻塞操作,也不应使用 DataWriter,因为这可能导致死锁或出现不可预期的行为。
当前,预过滤机制与 DataSharingQosPolicy 是不兼容的。因此,请确保将 kind() 设置为 OFF;或者如果使用 AUTO,需确保不满足相关约束条件。
接下来是一个使用预过滤机制的示例:
struct CustomUserWriteData : public eprosima::fastdds::rtps::WriteParams::UserWriteData
{CustomUserWriteData(const eprosima::fastdds::rtps::GuidPrefix_t& prefix): filtered_out_prefix(prefix){}eprosima::fastdds::rtps::GuidPrefix_t filtered_out_prefix;
};struct CustomPreFilter : public eprosima::fastdds::dds::IContentFilter
{// Custom evaluationbool evaluate(const SerializedPayload& payload,const FilterSampleInfo& filter_sample_info,const eprosima::fastdds::rtps::GUID_t& reader_guid) const override{static_cast<void>(payload);static_cast<void>(filter_sample_info);bool sample_should_be_sent = true;auto custom_write_data =std::static_pointer_cast<CustomUserWriteData>(filter_sample_info.user_write_data);// If the reader is the one to filter out, do not send the sampleif (custom_write_data->filtered_out_prefix == reader_guid.guidPrefix){sample_should_be_sent = false;}return sample_should_be_sent;}};// Create a DataWriter
DataWriter* data_writer =publisher->create_datawriter(topic, DATAWRITER_QOS_DEFAULT);
if (nullptr == data_writer)
{// Errorreturn;
}// Set a prefilter on the filtered reader
eprosima::fastdds::dds::ReturnCode_t ret = data_writer->set_sample_prefilter(std::make_shared<CustomPreFilter>());if (ret != eprosima::fastdds::dds::RETCODE_OK)
{// Errorreturn;
}// Create a custom user write data
eprosima::fastdds::rtps::GuidPrefix_t filtered_out_prefix;
std::istringstream("44.53.00.5f.45.60.52.4f.53.49.7c.41") >> filtered_out_prefix;std::shared_ptr<CustomUserWriteData> custom_write_data =std::make_shared<CustomUserWriteData>(filtered_out_prefix);// Set the custom user write data
eprosima::fastdds::rtps::WriteParams write_params;
write_params.user_write_data(custom_write_data);HelloWorld sample; //Auto-generated container class for topic data from Fast DDS-Gen
sample.msg("Hello there!"); // Add contents to the message// Write a sample with the custom user write data
data_writer->write(&sample, write_params);