RTPSParticipant构建流程
一. 概述:
RTPSParticipant的构建主要在RTPSParticipantImpl的构造函数中,主要包含下面几个流程:
-
注册默认UDPTransport
-
注册用于自定义Transport
-
确认元数据通信组播和单播地址
-
确认初始化对等端点(initialPeerList)
-
确认用户数据通信组播和单播地址
-
创建元数据通信和用户数据通信的ReceiverResource
-
创建BufferPool
-
创建FlowControl
-
创建和初始化BuiltinProtocol
下面的章节主要针对代码展开梳理这些流程。
二. 注册默认UDPTransport
如果创建DomainParticipant的时候传入的DomainParticipantQos中设置了如下属性,,就会默认注册UDPTransport到NetworkFactory中:
eprosima::fastdds::dds::DomainParticipantQos pqos = eprosima::fastdds::dds::PARTICIPANT_QOS_DEFAULT;
pqos.transport().use_builtin_transports = true;
// Builtin transports by default
if (PParam.useBuiltinTransports)
{UDPv4TransportDescriptor descriptor;... // 设置UDPTransport的一些属性m_network_Factory.RegisterTransport(&descriptor, &m_att.properties); // 注册UDPTransportif (!is_intraprocess_only()){SharedMemTransportDescriptor shm_transport;... // 设置SHMTransport的一些属性has_shm_transport_ |= m_network_Factory.RegisterTransport(&shm_transport);}
}
可以看到,如果使用了builtTransport,除了UDPTransport以外,SHMTransport也会被注册到NetworkFactory中。
DomainParticipantQos的transport成员(TransportConfigQos类型)中,use_builtin_transports默认就是true
RTPS_DllAPI TransportConfigQos(): QosPolicy(false), use_builtin_transports(true) // 默认为TRUE, send_socket_buffer_size(0), listen_socket_buffer_size(0)
{
}
向NetworkFactory注册Transport的用途是什么?
三. 注册用户自定义Transport
自定义的Transport来源于用户创建DDS DomainParticipant时传入DomainParticipantQos的transport成员中添加的TransportDescriptor,例如:
eprosima::fastdds::dds::DomainParticipantQos pqos = eprosima::fastdds::dds::PARTICIPANT_QOS_DEFAULT;
std::shared_ptr<eprosima::fastdds::rtps::TCPv4TransportDescriptor> descriptor = std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();
descriptor->add_listener_port(tcp_port_);
pqos.transport().user_transports.push_back(descriptor);
上面这段代码向ParticipantQos中添加了TCPTransportDescriptor,这样,创建RTPSParticipant的时候就将TCPTransportDescriptor注册到NetworkFactory中。
NetworkFactory::RegisterTransport的作用就是根据TransportDescriptor的配置创建对应的Transport对象,例如,同样是TCPTransport,可能我们这次创建的Participant充当的是TCP Server的角色,那么对应的Descriptor需要额外设置listen端口:
descriptor->add_listener_port(tcp_port_);
每个Transport的构造都需要传入对应的TransportDescriptor,例如:
// TCPv4Transport.cpp
TransportInterface* TCPv4TransportDescriptor::create_transport() const
{return new TCPv4Transport(*this);
}TCPv4Transport::TCPv4Transport(const TCPv4TransportDescriptor& descriptor): TCPTransportInterface(LOCATOR_KIND_TCPv4), configuration_(descriptor) // 传入的TCPv4TransportDescriptor作为配置
{...for (uint16_t port : configuration_.listening_ports) // 若是TCP Server类型,则初始化listen socket{Locator locator(LOCATOR_KIND_TCPv4, port);create_acceptor_socket(locator);}...
}
Transport的作用是什么?
Transport和TransportDescriptor的关系是什么?
三. 确认元数据通信组播和单播地址
这个步骤是最重要的,因为元数据地址的设置决定了Participant的Discovery功能,设置的不正确会导致Participant无法域内其他的Participant,或者被其他的Participant发现。
用户在创建DomainParticipant的时候,可以选择手动设置元数据通信地址(MetaData),包括元数据单播地址集合和元数据组播地址集合,可以选择设置初始化对等端点集合(initialPeerList),或者什么都不设置。
这里来看两种情况:
3.1 用户设置了元数据通信地址
首先来看元数据通信组播地址的设置流程
第一步是用已经注册了的传输协议来完善组播地址,如果用户没有给出端口,则根据domainId分配一个组播端口
组播端口的计算: PortBase(7400) + PortIncrement(250) * domainId + d0(0),例如domainid=83,则元数据组播端口为
7400 + 250* 83 + 0 = 28150,这个使用netstat来确认:
外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
其中,239.255.0.1就是用户设置的元数据组播通信地址,分配的端口就是28150。
第二步是将完善好端口的组播地址交给已经注册到NetworkFactory中的各个传输协议来判断该协议是否支持这个组播地址,如果支持该地址,则对该地址进行Normalize动作:
void NetworkFactory::NormalizeLocators(LocatorList_t& locators)
{LocatorList_t normalizedLocators;std::for_each(locators.begin(), locators.end(), [&](Locator_t& loc){bool normalized = false;for (auto& transport : mRegisteredTransports){// Check if the locator is supported and filter unicast locators.if (transport->IsLocatorSupported(loc) &&(IPLocator::isMulticast(loc) ||transport->is_locator_allowed(loc))) // 判断当前纯属协议是否支持对应的地址{// First found transport that supports it, this will normalize the locator.normalizedLocators.push_back(transport->NormalizeLocator(loc)); // 对该地址做Normalize操作,返回地址集合normalized = true;}}if (!normalized){// 至少被一个传输协议支持的地址才会被加入到最终的地址集合中并且返回normalizedLocators.push_back(loc);}});
}
也就是说如果NetworkFactory中只有TCPTransport,那么传入的组播地址是会被从元数据组播地址集合中剔除的,因为传输协议不支持。
此外,如果协议支持该组播地址,还需要对该地址做Normalize操作,对于元数据组播地址,只有UDPTransport能够对组播地址做操作,来看下UDPTransport的Normalize函数做了什么事情:
LocatorList UDPv4Transport::NormalizeLocator(const Locator& locator) // 传入的地址
{LocatorList list; // 返回给NetworkFactory的地址集合if (IPLocator::isAny(locator)) // 传入的地址是0.0.0.0{std::vector<IPFinder::info_IP> locNames;get_ipv4s(locNames); // 轮循所有RUNNING状态的网卡的信息for (const auto& infoIP : locNames){auto ip = asio::ip::address_v4::from_string(infoIP.name); // 获取网卡的IPif (is_interface_allowed(ip)) // 判断该IP的网卡是否在白名单中,如果白名单没有设置,allow一直为true{Locator newloc(locator);IPLocator::setIPv4(newloc, infoIP.locator);list.push_back(newloc); // 将该网卡IP放入返回的IP列表中}}}else {list.push_back(locator); // 如果不是0.0.0.0,就直接返回}
也就是说,如果用户给出了具体的元数据组播地址(例如239.255.0.1和239.192.255.251),并且向NetworkFactory注册了UDPTransport,那么这两个地址会被直接作为元数据的组播通信地址,而如果用户给出的是0.0.0.0这样的地址作为元数据组播地址,那么最终会产生UDP:198.18.2.18:28150,UDP:10.117.21.20:28150这样一组元数据组播地址。
总结下来就是在用户提供了元数据组播地址,同时也注册了UDPTransport到NetworkFactory的情况下,如果提供的元数据组播地址不是any(0.0…0.0),那么就使用用户提供的元数据组播地址,否则,将使用网卡的IP作为元数据组播地址。
用户如果给出的Locator中指定了端口,那么使用用户提供的端口,否则根据domainId计算一个端口出来。
对于用户提供的元数据单播地址的处理流程和上面元数据组播地址的处理流程一致。
3.2 用户没有设置元数据通信地址
在用户没有设置元数据通信地址(这种情况指的是用户既没有提供元数据单播地址集合,也没内有提供元数据组播地址集合)时,NetworkFactory会让已注册的Transport返回默认的元数据通信地址(包括单播和组播地址):
if (m_att.builtin.metatrafficMulticastLocatorList.empty() && m_att.builtin.metatrafficUnicastLocatorList.empty())
{get_default_metatraffic_locators(); // 获取传输协议默认的元数据通信地址...
}void RTPSParticipantImpl::get_default_metatraffic_locators()
{// 计算元数据单播地址和组播地址配对的端口uint32_t metatraffic_multicast_port = m_att.port.getMulticastPort(domain_id_);uint32_t metatraffic_unicast_port = m_att.port.getUnicastPort(domain_id_,static_cast<uint32_t>(m_att.participantID));// 从注册的Transport中获取默认元数据组播地址m_network_Factory.getDefaultMetatrafficMulticastLocators(m_att.builtin.metatrafficMulticastLocatorList,metatraffic_multicast_port);// Normalize操作m_network_Factory.NormalizeLocators(m_att.builtin.metatrafficMulticastLocatorList);// 从注册的Transport中获取默认元数据组播地址m_network_Factory.getDefaultMetatrafficUnicastLocators(m_att.builtin.metatrafficUnicastLocatorList,metatraffic_unicast_port);// Normalize操作m_network_Factory.NormalizeLocators(m_att.builtin.metatrafficUnicastLocatorList);
}
以UDPTransport为例,来看下默认元数据单播和组播地址是什么:
const char* const DEFAULT_METATRAFFIC_MULTICAST_ADDRESS = "239.255.0.1";#define LOCATOR_ADDRESS_INVALID(a) {std::memset(a, 0x00, 16 * sizeof(octet));}bool UDPv4Transport::getDefaultMetatrafficMulticastLocators(LocatorList& locators,uint32_t metatraffic_multicast_port) const
{Locator locator;...IPLocator::setIPv4(locator, DEFAULT_METATRAFFIC_MULTICAST_ADDRESS); // 239.255.0.1...
}void set_Invalid_Address()
{LOCATOR_ADDRESS_INVALID(address);
}bool UDPv4Transport::getDefaultMetatrafficUnicastLocators(LocatorList& locators,uint32_t metatraffic_unicast_port) const
{Locator locator;...locator.set_Invalid_Address(); // address any(0.0.0.0)...
}
对于UDP传输协议来说,默认的元数据组播地址是239.255.0.1,默认的元数据单播地址是0.0.0.0
获取到默认的元数据组播和单播地址后,还要对这个地址做Normalize操作,也就是将0.0.0.0地址展开成白名单内的网卡的IP。
不同的传输协议的Normalize操作有什么不一样?
四. 确认初始化对等端点(initialPeerList)
一般来说,元数据通信地址和初始化对等端点集合这两个配置只设置一个就可以了,因为初始化对等端点相当于用户已经知道要通信的对端Particiant的具体地址了,也就不需要通过Discovery去发现对方了。
Fast DDS allows for the configuration of an initial peers list which contains one or more such IP-port address pairs corresponding to remote DomainParticipants PDP discovery listening resources, so that the local DomainParticipant will only send its PDP traffic to the IP-port address pairs specified in the initial peers list.
https://fast-dds.docs.eprosima.com/en/latest/fastdds/discovery/simple.html#initial-peers
初始化对等端点一般用于TCP的模式下,遍历整个FastDDS的demo代码目录,只有HelloWorldExampleTCP和Benchmark两个工程用到了initial peers list,并且这两个工程中创建Participant时ParticipantQos中,user_transport中都添加了TCPv4TransportDescriptor。
如果用户没有提供初始化对等端点,那么会使用元数据组播地址作为初始化对等端点,否则,对用户传入的每一个初始化对等端点,调用已注册Transport的configureInitialPeerLocator函数,Transport的configureInitialPeerLocator主要是对用户传入的initialPeerList中的Locator做端口上的完善,如果没有分配端口的,则根据domainId计算端口并且分配,对于TCPTransport来说,除了物理端口以外,还有一个逻辑端口需要分配。
initialPeerList的主要用在PDP的Writer中:
bool PDPSimple::createPDPEndpoints()
{...WriterAttributes watt; // 创建PDPWriterwatt.endpoint.remoteLocatorList = m_discovery.initialPeersList; // ...RTPSWriter* wout = nullptr;if (mp_RTPSParticipant->createWriter(&wout, watt, endpoints->writer.payload_pool_, endpoints->writer.history_.get(),...
}
看到PDPSimple创建builtin Writer的时候,传入的WriterAtrribute中,Endpoint属性中的remoteLocatorList就是initialPeersList,Writer需要通过SenderResource才可以借助其中的outputChannel向外发送数据:
PDPSimple::createPDPEndpointsRTPSParticipantImpl::createWriterRTPSParticipantImpl::create_writerRTPSParticipantImpl::createSendResources
bool RTPSParticipantImpl::createSendResources(Endpoint* pend) {if (pend->m_att.remoteLocatorList.empty()){// Adds the default locators of every registered transport.// 如果没有指定remoteLocatorList,则让注册的Transport提供默认通信,TCP没有提供,UDPv4提供的是239.255.0.1,UDPv6也提供了一个地址m_network_Factory.GetDefaultOutputLocators(pend->m_att.remoteLocatorList); }// PDPSimple创建Writer时提供的initialPeerList在这里就排上用处了,会被用于创建outputchannel,其中的socket的发送目标就是initialPeerfor (auto it = pend->m_att.remoteLocatorList.begin(); it != pend->m_att.remoteLocatorList.end(); ++it){if (!m_network_Factory.build_send_resources(send_resource_list_, (*it)))...
}
RTPSParticipantImpl会创建哪些SenderResource?分别在什么时候?
五. 确认用户数据通信组播和单播地址
这部分的工作和第三章接种确认元数据通信的组播和单播地址的操作基本类型,就是如果用户没有提供用户数据通信的单播和组播地址,则由注册的Transport来提供默认的,否则使用用户提供的,同时确认通信端口是否设置,没有设置的情况下通过domainid和participantid计算出端口并且赋值给loctor:
if (m_att.defaultUnicastLocatorList.empty() && m_att.defaultMulticastLocatorList.empty())
{//Default Unicast Locators in case they have not been provided/* INSERT DEFAULT UNICAST LOCATORS FOR THE PARTICIPANT */get_default_unicast_locators(); // 由NetworkFactory中已注册的Transport提供默认地址和端口,端口通过calculate_well_known_port计算出来
}
else
{// 由NetworkFactory中已注册的Transport分析传入的defaultUnicastLocatorList和defaultMulticastLocatorList中端口是否需要赋值...
}
六. 创建元数据通信和用户数据通信的ReceiverResource
步骤三和上面的步骤五已经将当前RTPSParticipant的元数据和用户数据的通信地址和端口都确定了,那么至少在这些地址和端口上可以创建ReceiverResource了:
createReceiverResources(m_att.builtin.metatrafficMulticastLocatorList, true, false); // 元数据组播
createReceiverResources(m_att.builtin.metatrafficUnicastLocatorList, true, false); // 元数据单播
createReceiverResources(m_att.defaultUnicastLocatorList, true, false); // 用户数据组播
createReceiverResources(m_att.defaultMulticastLocatorList, true, false); // 用户数据单播
ReceiverResource调用了Transport创建了用于接收消息(CDRMessage)的ChannelResource,同时ReceiverResource又实现了TransportReceiverInterface接口,是ChannelResource的消费者,不同的Transport在ChannelResource为父类的基础上派生出TCPChannelResource,UDPChannelResource和SharedMemoryResource,前两者中都包含用于网络通信的socket:
class UDPChannelResource : public ChannelResource
{
public:...
private:...eProsimaUDPSocket socket_;...
};class TCPAcceptorBasic : public TCPAcceptor
{
public:...
private:asio::ip::tcp::socket socket_; // server端accept的socket,用于和建立连接的client端socket进行通信
}class TCPChannelResourceBasic : public TCPChannelResource {...std::shared_ptr<asio::ip::tcp::socket> socket_; // 对于Server端,该socket用于listen,对于Client端,该socket用于connect和后面的收发...
}
ReceiverResource从ChannelResouce收到的是CDRMessage,但是并不知道如何处理,每个CDRMessage的Header中,都有指定接收和处理该消息的Endpoint,也就是RTPSReader(StatelessReader/StatefulReader),因此,ReceiverResource还需要借助MessageReceiver的帮助,才能准确地将消息投递到目标RTPSReader上:
bool RTPSParticipantImpl::createReceiverResources(...) {std::vector<std::shared_ptr<ReceiverResource>> newItemsBuffer;// 给每个地址创建ReceiverResourcefor (auto it_loc = Locator_list.begin(); it_loc != Locator_list.end(); ++it_loc){// 创建ReceiverResource,这里可能会创建出多个ReceiverResource,取决于NetworkFactory注册了多少Transportbool ret = m_network_Factory.BuildReceiverResources(*it_loc, newItemsBuffer, max_receiver_buffer_size);...for (auto it_buffer = newItemsBuffer.begin(); it_buffer != newItemsBuffer.end(); ++it_buffer){m_receiverResourcelist.emplace_back(*it_buffer); //将创建出的每一个ReceiverResource添加到Participant的成员容器中auto mr = new MessageReceiver(this, (*it_buffer)->max_message_size());m_receiverResourcelist.back().mp_receiver = mr; // 绑定ReceiverResource和MessageReceiver‘’...}
}void RTPSParticipantImpl::enable()
{mp_builtinProtocols->enable();//Start receptionfor (auto& receiver : m_receiverResourcelist){receiver.Receiver->RegisterReceiver(receiver.mp_receiver); // 将MessageReceiver注册给ReceiverResource}
}bool MessageReceiver::proc_Submsg_Data(...) {...EntityId_t readerID;valid &= CDRMessage::readEntityId(msg, &readerID); // 读取CDRMessage中该消息的目标Reader的EntityID...//Look for the correct reader to add the changeprocess_data_message_function_(readerID, ch); // 绑定的process_data_message_without_security函数...
}void MessageReceiver::process_data_message_without_security(const EntityId_t& reader_id,CacheChange_t& change)
{auto process_message = [&change](RTPSReader* reader){reader->processDataMsg(&change);};findAllReaders(reader_id, process_message); // 找到合适的RTPSReader处理该消息
}
七. 创建BufferPool
这里的BufferPool指的是SendBuffer,具体是指SendBufferManager,该类负责维护buffer pool,由RTPSParticipantImpl负责创建并且维护:
// Create buffer pool
send_buffers_.reset(new SendBuffersManager(num_send_buffers, allow_growing_buffers));
send_buffers_->init(this);
SendBuffersManager内部维护了一个RTPSMessageGroup_t的数组,提供给外部的接口中包括获取和归还RTPSMessageGroup_t对象。
class SendBuffersManager
{
public:...std::unique_ptr<RTPSMessageGroup_t> get_buffer(const RTPSParticipantImpl* participant);void return_buffer(std::unique_ptr <RTPSMessageGroup_t>&& buffer);...
}
RTPSPartcipantImpl创建的SendBufferManager并没有被使用到:
RTPSMessageGroup::RTPSMessageGroup(RTPSParticipantImpl* participant,bool internal_buffer) // 该参数决定了是否使用RTPSParticipantImpl内部的SendBuffer: participant_(participant), send_buffer_(!internal_buffer ? participant->get_send_buffer() : nullptr), internal_buffer_(internal_buffer)
{...
}class FlowControllerImpl : public FlowController
{...add_new_sample_impl(...) {...// 未使用internal sendbufferfastrtps::rtps::RTPSMessageGroup group(participant_, writer, &locator_selector);...}
}
无论StatelessWriter还是StatefulWriter,发送CacheChange的时候都依赖FlowControllerImpl的add_new_sample:
void StatelessWriter::unsent_change_added_to_history(...) {...flow_controller_->add_new_sample(this, change, max_blocking_time);...
}
八. 创建FlowControl
这里创建FlowControl并不是只创建一个FlowControl,而是通过FlowControlFactory创建多个不同类型的FlowControl:
flow_controller_factory_.init(this);
FlowControlFactory默认会创建三种类型的FlowControl:
const char* const pure_sync_flow_controller_name = "PureSyncFlowController";
const char* const sync_flow_controller_name = "SyncFlowController";
const char* const async_flow_controller_name = "AsyncFlowController";void FlowControllerFactory::init(...) {...flow_controllers_.insert(new FlowControllerImpl<FlowControllerPureSyncPublishMode,FlowControllerFifoSchedule>(...))...flow_controllers_.insert(new FlowControllerImpl<FlowControllerSyncPublishMode,FlowControllerFifoSchedule>(...))...flow_controllers_.insert(new FlowControllerImpl<FlowControllerAsyncPublishMode,FlowControllerFifoSchedule>(...))
}
创建出来的FlowControl会被StatelessWriter/StatefulWriter使用到:
bool RTPSParticipantImpl::create_writer(...) {...// 获取默认FlowControl(可以在WriterAttributes中设置flow_controller_name来决定使用哪个类型的FlowController)if (nullptr == flow_controller &&(fastdds::rtps::FASTDDS_FLOW_CONTROLLER_DEFAULT == flow_controller_name ||ASYNCHRONOUS_WRITER == param.mode)){// 从FlowControllerFactory中提取flow_controller = flow_controller_factory_.retrieve_flow_controller(flow_controller_name, param);}...RTPSWriter* SWriter = nullptr;SWriter = callback(guid, param, flow_controller, persistence, param.endpoint.reliabilityKind == RELIABLE);...
}
FlowControl会用于拼接完整的RTPSMessage消息并且基于同步/异步的流程将消息发送出去。
九. 创建和初始化BuiltinProtocol
mp_builtinProtocols = new BuiltinProtocols();// Initialize builtin protocols
if (!mp_builtinProtocols->initBuiltinProtocols(this, m_att.builtin))
{EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "The builtin protocols were not correctly initialized");return;
}
BuiltinProtocol根据BuiltinAttributes属性来决定创建PDP和EDP成员的类型(Simple,Static等)
不同的传输协议的Normalize操作有什么不一样?
这里有一个新的概念,就是逻辑端口,这个是TCPTransport特有的,在Locator结构中,包含地址和端口
class Locator_t
{int32_t kind; // TCP4, TCP6, UDP4, UDP6, SHM/// Network portuint32_t port;/// IP addressoctet address[16];...
}
这里的端口port是32位的,而Socket的端口一般都是16位的,因此,Locator_t的端口是包含了两部分的:物理端口和逻辑端口,两个端口各占用2字节(16bit)。
只有TCPTransport用得到逻辑端口,因为TCP是有连接的,为了最大化利用已连接的channel通路,因此在已经建立好的TCP Connection上面在对每个ChannelResource再分配一个逻辑端口,这样,多个ChannelResource就可以复用同一个TCP连接了。
TCPTransport的逻辑端口的运行逻辑是怎样的?
TCPTransportInterface::configureInitialPeerLocator函数的作用就是将用户传入的initialPeerList中的地址列表做一个处理,没有配置端口的配置端口(物理和逻辑端口),按照TCPTransportInterface::configureInitialPeerLocator的做法,每个物理端口会对应四个逻辑端口,每个TCP的InputChannel会绑定一个逻辑端口。