Fast DDS 默认传输机制详解:共享内存与 UDP 的智能选择
Fast DDS 默认传输机制详解:共享内存与 UDP 的智能选择
🧩 一、Fast DDS 默认传输层机制概览
Fast DDS 的通信底层是由 RTPS(Real-Time Publish-Subscribe)协议驱动的,而 RTPS 的数据实际传输由 Transport 层 决定。
Fast DDS 支持多种传输方式(Transport Layer Plugins):
传输层类型 | 类名 | 说明 |
---|---|---|
共享内存传输(SHM) | SharedMemTransportDescriptor | 用于同一主机不同进程间通信(零拷贝) |
UDPv4 传输 | UDPv4TransportDescriptor | 用于跨主机网络通信 |
UDPv6 传输 | UDPv6TransportDescriptor | IPv6 网络通信 |
TCP 传输(可选) | TCPv4TransportDescriptor | 可选插件,需手动启用 |
内存内传输(Intraprocess) | 内置 | 同一进程内通信 |
⚙️ 二、默认配置
你当前的代码中:
DomainParticipantQos participant_qos = PARTICIPANT_QOS_DEFAULT;
participant_ = DomainParticipantFactory::get_instance()->create_participant(0, participant_qos);
这就意味着使用了 Fast DDS 默认 QoS + 默认 Transport 配置。
根据官方文档(2.3 之后的版本):
✅ 默认启用 UDPv4 和 Shared Memory (SHM) 传输层,且共享内存优先用于 同主机进程间通信,UDP 仅在检测到远程主机时使用。
🔍 三、默认传输选择逻辑(自动判断)
当 Publisher 和 Subscriber 匹配成功后,Fast DDS 内部会自动判断:
场景 | 实际使用的传输层 | 说明 |
---|---|---|
同一进程 | Intraprocess 通信(内存直传) | 完全无网络、无拷贝 |
同一主机(不同进程) | Shared Memory (SHM) | 使用共享内存环形缓冲区 |
不同主机 | UDPv4 | 标准网络通信 |
💡 优先级顺序:Intraprocess > Shared Memory > UDP
🛠️ 四、自定义 QoS 配置传输层
1. 完全自定义传输配置
#include <fastdds/rtps/transport/UDPv4TransportDescriptor.h>
#include <fastdds/rtps/transport/SharedMemTransportDescriptor.h>
#include <fastdds/rtps/transport/TCPv4TransportDescriptor.h>// 创建自定义 QoS
DomainParticipantQos participant_qos;// 禁用内置传输,使用自定义配置
participant_qos.transport().use_builtin_transports = false;// 方案1:只使用 UDP(禁用共享内存)
auto udp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv4TransportDescriptor>();
udp_transport->sendBufferSize = 65536; // 自定义缓冲区大小
udp_transport->receiveBufferSize = 65536;
participant_qos.transport().user_transports.push_back(udp_transport);// 方案2:只使用共享内存(禁用网络)
auto shm_transport = std::make_shared<eprosima::fastdds::rtps::SharedMemTransportDescriptor>();
shm_transport->segment_size(16 * 1024 * 1024); // 16MB 共享内存段
participant_qos.transport().user_transports.push_back(shm_transport);// 方案3:同时使用 UDP 和共享内存,但调整优先级
participant_qos.transport().user_transports.push_back(shm_transport);
participant_qos.transport().user_transports.push_back(udp_transport);// 创建参与者
participant_ = factory->create_participant(0, participant_qos);
2. 基于内置传输的微调
// 使用内置传输,但调整参数
DomainParticipantQos participant_qos = PARTICIPANT_QOS_DEFAULT;// 获取内置 UDP 传输描述符并调整
auto udp_transport = std::static_pointer_cast<eprosima::fastdds::rtps::UDPv4TransportDescriptor>(participant_qos.transport().user_transports.front());if (udp_transport) {udp_transport->maxMessageSize = 65536; // 最大消息大小udp_transport->sendBufferSize = 131072; // 发送缓冲区udp_transport->receiveBufferSize = 131072; // 接收缓冲区
}
3. 发布者/订阅者级别的传输配置
// 发布者 QoS 配置
DataWriterQos writer_qos = DATAWRITER_QOS_DEFAULT;
writer_qos.publish_mode().kind = eprosima::fastdds::dds::ASYNCHRONOUS_PUBLISH_MODE;
writer_qos.reliability().kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS;// 订阅者 QoS 配置
DataReaderQos reader_qos = DATAREADER_QOS_DEFAULT;
reader_qos.reliability().kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS;
reader_qos.history().kind = eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS;
reader_qos.history().depth = 50;
📄 五、传输层验证与监控
1. 环境变量启用详细日志
# Windows
set FASTDDS_LOG_VERBOSITY=info
set FASTDDS_LOG_FILTER=RTPS_TRANSPORT# Linux
export FASTDDS_LOG_VERBOSITY=info
export FASTDDS_LOG_FILTER=RTPS_TRANSPORT
2. 编程方式启用传输日志
#include <fastdds/dds/log/Log.hpp>// 在程序开始时设置日志
eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Info);
eprosima::fastdds::dds::Log::SetCategoryFilter(std::regex("RTPS_TRANSPORT"));
3. 完整的传输监控示例代码
#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/domain/DomainParticipantListener.hpp>
#include <fastdds/dds/log/Log.hpp>
#include <iostream>class TransportMonitoringListener : public eprosima::fastdds::dds::DomainParticipantListener
{
public:virtual void on_participant_discovery(eprosima::fastdds::dds::DomainParticipant* participant,eprosima::fastrtps::rtps::ParticipantDiscoveryInfo&& info) override {if (info.status == eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT) {std::cout << "发现新的参与者: " << info.info.m_participantName << std::endl;print_transport_info(participant);}}void print_transport_info(eprosima::fastdds::dds::DomainParticipant* participant) {auto rtps_participant = participant->get_rtps_participant();if (!rtps_participant) return;std::cout << "当前传输统计:" << std::endl;// 获取所有已注册的传输auto& transports = rtps_participant->get_network_factory().get_all_transport_descriptors();for (const auto& transport : transports) {std::cout << " - 传输类型: " << transport->get_type_name() << std::endl;}// 获取发送资源限制auto send_resource_limits = rtps_participant->get_attributes().allocation.send_ports;std::cout << " - 发送端口限制: " << send_resource_limits.initial << "/" << send_resource_limits.maximum << std::endl;}
};// 使用监控监听器
void create_monitored_participant()
{auto factory = eprosima::fastdds::dds::DomainParticipantFactory::get_instance();eprosima::fastdds::dds::DomainParticipantQos qos = eprosima::fastdds::dds::PARTICIPANT_QOS_DEFAULT;auto listener = std::make_shared<TransportMonitoringListener>();auto participant = factory->create_participant(0, qos, listener.get());if (participant) {std::cout << "参与者创建成功,开始监控传输层..." << std::endl;}
}
4. 实时传输类型检测
#include <fastdds/rtps/transport/TransportInterface.h>class TransportDetector {
public:static void detect_current_transport(eprosima::fastdds::dds::DataReader* reader) {auto rtps_reader = reader->get_rtps_reader();if (!rtps_reader) return;// 获取当前匹配的写入器eprosima::fastrtps::rtps::GUID_t writer_guid;// 这里需要根据实际匹配情况获取 GUIDstd::cout << "检测数据传输路径..." << std::endl;// 通过检查本地定位器来判断传输类型auto& att = rtps_reader->getAttributes();for (const auto& locator : att.endpoint.unicastLocatorList) {std::string transport_type = "未知";if (locator.kind == LOCATOR_KIND_UDPv4) {transport_type = "UDPv4";} else if (locator.kind == LOCATOR_KIND_SHM) {transport_type = "共享内存(SHM)";}std::cout << " - 定位器: " << locator << " -> 传输: " << transport_type << std::endl;}}
};
🧪 六、实际日志输出示例
启用传输日志后,你会看到类似输出:
[RTPS_TRANSPORT] SHM Transport registered.
[RTPS_TRANSPORT] UDPv4 Transport registered.
[RTPS_TRANSPORT] Using SHM transport for locator: shm://0.0.0.0
[RTPS_TRANSPORT] Using UDPv4 transport for locator: udp://192.168.1.5
[RTPS_MSG_OUT] Sending message via SHM to participant [0.0.1.c1]
[RTPS_MSG_IN] Receiving message via UDPv4 from 192.168.1.6:7412
日志分析:
SHM Transport registered
→ 共享内存传输已注册UDPv4 Transport registered
→ UDPv4 传输已注册Using SHM transport
→ 实际使用共享内存传输(本地通信)Using UDPv4 transport
→ 实际使用 UDP 传输(跨机通信)
🚫 七、常见问题排查
1. 强制使用特定传输
// 强制使用 UDP,即使在同一主机上
DomainParticipantQos qos = PARTICIPANT_QOS_DEFAULT;
qos.transport().use_builtin_transports = false;auto udp_transport = std::make_shared<UDPv4TransportDescriptor>();
// 禁用回环,强制走网络
udp_transport->interfaceWhiteList.push_back("192.168.1.0");
qos.transport().user_transports.push_back(udp_transport);
2. 共享内存问题诊断
// 检查共享内存配置
auto shm_transport = std::make_shared<SharedMemTransportDescriptor>();
shm_transport->segment_size(32 * 1024 * 1024); // 32MB
shm_transport->max_message_size(64 * 1024); // 64KB 最大消息// 在 Linux 上检查共享内存段
std::cout << "检查共享内存段..." << std::endl;
system("ipcs -m");
🧩 八、总结
项目 | 默认行为 | 自定义配置能力 |
---|---|---|
同进程通信 | 内部队列(零拷贝) | ✅ 可调整缓冲区大小 |
同主机进程间通信 | ✅ 使用共享内存 | ✅ 可禁用或参数调优 |
跨主机通信 | ✅ 使用 UDPv4 | ✅ 可配置网络参数 |
传输优先级 | SHM > UDP | ✅ 可调整传输顺序 |
监控能力 | 基础日志 | ✅ 编程式详细监控 |
最佳实践建议:
- 生产环境:保持默认配置,让 Fast DDS 自动选择最优传输
- 调试环境:启用传输日志验证实际使用的传输层
- 特定场景:根据需要自定义传输配置(如容器环境、特定网络需求)
- 性能优化:根据数据大小调整共享内存段和网络缓冲区
你的代码使用默认配置就能获得最佳的性能和灵活性,Fast DDS 会自动为你选择最高效的通信方式!