FastDDS Transport功能模块初步整理
一. 总体结构
二. 主要类的功能
2.1 TransportDescriptor和TransportInterface
FastDDS中整个Transport类的设计遵循的是设计模式中的建造者模式,其中,TransportDescriptor就是建造者,而TransportInterface则是建造出来的产品。
TransportDescriptor是抽象类,申明了后续实现类必须实现的最终要的一个函数,也是建造者需要完成的主要工作,就是建造出实际的Transport对象:
struct RTPS_DllAPI TransportDescriptorInterface
{ .../*** Factory method pattern. It will create and return a TransportInterface* corresponding to this descriptor. This provides an interface to the NetworkFactory* to create the transports without the need to know about their type*/virtual TransportInterface* create_transport() const = 0;...
};
TransportInterface定义了Transort的接口,所有的Transport都必须实现该接口,根据传输方式的不同有TCPTransportInterface, UDPTransportInterface和SharedMemTransport,而根据TCP/UDP版本的不同,前两者分别有V4和V6两个派生类。
TransportInterface接口类的初衷是为了在FastRTPS中隔离传输层的实现,用户需要实现基于特定传输层协议(TCP, UDP, SharedMem)的通道和对应地址之间的代码逻辑。TransportInterface接口类中申明了实现类必须实现的一些接口(init,IsInputChannelOpen,IsLocatorSupported, is_locator_allowed, RemoteToMainLocal,transform_remote_locator等),其中最重要的两个接口是OpenOutputChannel和OpenInputChannel这两个接口函数,前者会创建SenderResource而后者创建ReceiverResource。
2.2 TCPTransportInterface和UDPTransportInterface
2.2.1 TCPTransportInterface:
TCPTransportInterface定义并且实现了基于TCP传输层的通信操作,主要接口都在TCPTransportInterface中,另外还有一个辅助类TCPAcceptor用于实现TCP独有的连接操作,接收客户端的连接。
TCPTransoprtInterface中的perform_listen_operation函数实现基于TCP传输层的数据接收操作,send函数实现基于TCP传输层的数据发送操作,在初始化TCPTransportInterface的时候,如果发现TCPTransportDescriptor中定义了listen_port,那么会额外创建TCPAcceptBasic对象:
TCPv4Transport::TCPv4Transport(const TCPv4TransportDescriptor& descriptor): TCPTransportInterface(LOCATOR_KIND_TCPv4), configuration_(descriptor)
{...// 如果TCPv4TransportDescriptor配置了listening_ports监听端口,则创建TCPAcceptorBasicfor (uint16_t port : configuration_.listening_ports){Locator locator(LOCATOR_KIND_TCPv4, port);create_acceptor_socket(locator);}...
}
TCPAcceptBasic可以理解为对于listen socket的封装,主要执行listen操作,返回建立连接了的client socket给到TCPTransoprtInterface:
void TCPAcceptorBasic::accept(TCPTransportInterface* parent)
{...acceptor_.async_accept(socket_,[parent, locator, this](const std::error_code& error){if (!error){auto socket = std::make_shared<tcp::socket>(std::move(socket_)); // 返回建立连接的客户端socketparent->SocketAccepted(socket, locator, error); //TCPTransoprtInterface根据该socket创建ChannelResource}});
}
2.2.2 UDPTransportInterface:
UDPTRansportInterface的接口结构比TCPTransportInterface要简单很多,并且接口函数也会少许多,因为没有TCP的连接操作,此外,UDPTransportInterface中没有实现perform_listen_operation接口(UDP的perform_listen_operation接口在UDPChannelResource中实现),send函数实现基于UDP传输层的数据发送操作。
2.2.3 OpenInputChannel和OpenOutputChannel
在2.1 TransportInterface中说过,其最重要的接口就是OpenInputChannel和OpenOutputChannel,下面以UDPTransportInterface来看下这两个接口实现了什么功能。
首先,这两个接口的名字中都带有Channel,但是不要被名字误导,认为这两个接口都会和下面的ChannelResource打交道,从代码看,只有InputChannel才会创建ChannelResource,而OutputChannel创建的是SenderResource。
OpenInputChannel:
bool UDPv4Transport::OpenInputChannel(const Locator& locator,TransportReceiverInterface* receiver,uint32_t maxMsgSize) {...if (!IsInputChannelOpen(locator)){success = OpenAndBindInputSockets(locator, receiver, IPLocator::isMulticast(locator), maxMsgSize);}... // 如果是组播地址,下面还会创建额外的ChannelResource,同时将网卡加入组播地址
}bool UDPTransportInterface::OpenAndBindInputSockets(...) {...std::vector<std::string> vInterfaces = get_binding_interfaces_list(); // 获取白名单网卡列表for (std::string sInterface : vInterfaces){ // 在每张网卡上创建InputChannelResource(一般来说用户会通过白名单限制用于DDS通信的网卡,不太会出现在多个网卡上创建的情况)UDPChannelResource* p_channel_resource;p_channel_resource = CreateInputChannelResource(sInterface, locator, is_multicast, maxMsgSize, receiver);mInputSockets[IPLocator::getPhysicalPort(locator)].push_back(p_channel_resource);}...
}UDPChannelResource* UDPTransportInterface::CreateInputChannelResource(...) {// 创建接收数据用的socketeProsimaUDPSocket unicastSocket = OpenAndBindInputSocket(sInterface,IPLocator::getPhysicalPort(locator), is_multicast);// 创建ChannelResource,ChannelResource包装传入的socket,并且对外提供接收数据的统一接口UDPChannelResource* p_channel_resource = new UDPChannelResource(this, unicastSocket, maxMsgSize, locator,sInterface, receiver);return p_channel_resource;
}
OpenOutputChannel:
bool UDPTransportInterface::OpenOutputChannel(SendResourceList& sender_resource_list, // 函数中创建的SenderResource放在这个list中返回给上层const Locator& locator)
{...if (is_interface_whitelist_empty()) { ... } // 没有配置网卡白名单,会在0.0.0.0地址上创建socket和SenderResourceelse {// 获取网卡列表(需要剔除已经在locator参数上创建SenderResource的网卡)get_unknown_network_interfaces(sender_resource_list, locNames, true);for (const auto& infoIP : locNames){ // 创建用于发送数据的socketeProsimaUDPSocket unicastSocket = OpenAndBindUnicastOutputSocket(generate_endpoint(infoIP.name, port), port);SetSocketOutboundInterface(unicastSocket, infoIP.name); // 设置多播数据的发送接口...sender_resource_list.emplace_back( // 创建SenderResource,用于包装上面创建的发送socket的操作接口static_cast<SenderResource*>(new UDPSenderResource(*this, unicastSocket, false, true)));}}...
}
2.3 ChannelResource, TCPChannelResource, UDPChannelResource
ChannelResource包装了用于接口数据的socket,并且对外提供相对统一的操作接口(针对不同的传输类型还是有所区别的)
ChannelResource是接口类,主要提供了用于接收数据使用的线程成员thread_以及保存接收到的消息的message_buffer_成员,后续可以看到transportinterface的perform_listen_operation函数就是在这个thread_成员上运行的。
class ChannelResource {...inline void thread(std::thread&& pThread){// 初始化接收数据的线程thread_...}...
};UDPChannelResource::UDPChannelResource(...): ChannelResource(maxMsgSize)...
{ //指定thread_线程运行perform_listen_operation函数thread(std::thread(&UDPChannelResource::perform_listen_operation, this, locator));
}
TCPChannelResource和UDPChannelResource两者都包含了用于接收数据使用的socket对象,除此以外,两者的接口差距还是挺大的(TCP和UDP的通信流程本来就有区别),TCPChannelResource的接口比较多,实现也很复杂,并且不属于DDS协议中约定的默认通信方式,因此不需要太关注TCP的视线。
这里主要来看UDPChannelResource的接口,UDPChannelResource的接口很简单,就是perform_listen_operation:
void UDPChannelResource::perform_listen_operation(Locator input_locator) {// set thread name for debugpthread_setname_np(pthread_self(), "UDPChannel"); // thread_的线程名称为UDPChannelLocator remote_locator;while (alive()){// Blocking receive.auto& msg = message_buffer();if (!Receive(msg.buffer, msg.max_size, msg.length, remote_locator)) // Receive函数调用socket的recv_from接收数据(阻塞式){continue;}// 将收到的消息交个MessageReceiver去处理(通过message_receiver接口主导到UDPChannelResource中)...}
}
2.4 SenderResource, UDPSenderResource
SenderResource从名字看就是用于发送数据的,SenderResource是接口类,其中最重要的一个接口就是send,send内部调用了send_lambda_这个std::function类型的成员,send_lambda_成员有SenderResource的派生类来实现:
对于UDPSenderResource来说,send_lambda_函数对象依赖UDPTransportInterface的接口来完成最终的数据发送:
send_lambda_ = [this, &transport](...) -> bool{ // 调用UDPTransportInterface::send接口, 并且将socket对象传入return transport.send(data, dataSize, socket_, destination_locators_begin,destination_locators_end, only_multicast_purpose_, whitelisted_,max_blocking_time_point);};
[!TIP]
读到这段代码的时候,不太理解为什么不在UDPSenderResource的send_lambda_中直接实现send的逻辑代码,反而要依赖UDPTransportInterface来实现。个人理解是不是因为SenderResource这个类主要功能还是体现在Resource上,说明其只是一个用来保管发送socket的资源类而已,个人猜想。
RTPSParticipantImpl是SenderResource的容器,保存了其创建的所有SenderResource,在发送数据的时候会调用这些SenderResource的接口:
bool sendSync(...) {...for (auto& send_resource : send_resource_list_) // send_resource_list_中保存了该Participant创建的所有SenderResource{...send_resource->send(msg->buffer, msg->length, &locators_begin, &locators_end, max_blocking_time_point);}...
}
第三章节中的3.2发送数据中会说明SenderResource的send接口是怎么被调用到的,调用栈是如何的。
2.5 ReceiverResource, MessageReciever
ReceiverResource实现TransportReceiverInterface接口,TransportReceiverInterface这个接口的用途从名字看就是用于接收Transport传输层收到数据的接口,其实现了TransoprtReceiverInterface接口中的OnDataReceived函数接口,OnDataReceived函数接口在UDPChannelResource的数据接收接口perform_listen_operation函数中被调用:
void UDPChannelResource::perform_listen_operation(Locator input_locator) {...// Processes the data through the CDR Message interface.if (message_receiver() != nullptr){message_receiver()->OnDataReceived(msg.buffer, msg.length, input_locator, remote_locator);}...
}
ReceiverResource的OnDataReceived接口实现非常简单,就是将Transoprt收到的CDRMessage交给MessageReceiver,MessageReceiver会对该CDRMessage进行进一步细化的处理,看过DDS协议文档的应该知道一个完整的DDS Message包括Header和Submessages两部分,其中Submessages中包含不止一种子消息(HeartBeat,GAP,Timestamp,Data, DataFrag等等)。
而MessageReceiver的作用就是解析ReceiverResource传递过来的CDRMessage,解析其中的Header以及各个子消息,可以看到MessageReceiver中定义了各种proess函数用来处理不同的子消息:
这些proc函数中最重要的一个函数是process_data_message_without_security(假设没有开启TLS功能,开启的话就是with_security),该函数将Data子消息中的payload交给RTPSReader进行处理:
void MessageReceiver::process_data_message_without_security(const EntityId_t& reader_id,CacheChange_t& change)
{auto process_message = [&change](RTPSReader* reader){reader->processDataMsg(&change);};findAllReaders(reader_id, process_message); // 找到对应EntityID的RTPSReader,调用其processDataMsg处理Data子消息
}
2.6 LocatorSelector, LocatorSelectorSender和LocatorSelectorEntry
LocatorSelectorSender: Class used by writers to inform a RTPSMessageGroup object which remote participants will be addressees of next RTPS submessages. (Writer使用LocatorSelectSender告知RTPSMessageGroup下一包要发送的RTPS消息要发给哪些远端Particiant)
LocatorSelector: A class used for the efficient selection of locators when sending data to multiple entities. (当发送数据给不同的远端Reader时,该类可以协助选择合适的Locator地址进行发送)
LocatorSelectorEntry: This class holds the locators of a remote endpoint along with data required for the locator selection algorithm. (该类报错了某个远端Endpiont的地址信息,地址信息回被用在locator选择上)
LocatorSelector内部对于每一个匹配的远端RTPSReader,都维护了其Locator信息在LocatorSelectorEntry中。然后LocatorSelector是LocatorSelectorSender的内部成员,而LocatorSelectorSender是每个StatelessWriter/StatefulWriter的内部成员,当Writer匹配到远端Reader的时候,会将远端Reader的Locator信息(以LocatorSelectorEntry的方式)更新到LocatorSelectorSender的LocatorSelector成员中:
bool StatelessWriter::matched_reader_add(...) {...filter_remote_locators(*new_reader->general_locator_selector_entry(),m_att.external_unicast_locators, m_att.ignore_non_matching_locators);locator_selector_.locator_selector.add_entry(new_reader->general_locator_selector_entry());...
}
而在RTPSWriter发送RTPSMessage的时候,则可以根据locator_selector_.locator_selector找到目前还存活的匹配的远端RTPSRerader的LocatorSelectorEntry,取出其中的Locator作为消息的发送目的地:
DeliveryRetCode StatelessWriter::deliver_sample_nts(...) {...locator_selector.locator_selector.reset(true); //重新选择目前还存活的RTPSReader的LocatorSelectorEntry作为目标地址集合size_t num_locators = locator_selector.locator_selector.selected_size() + fixed_locators_.size();if (0 < num_locators) {...if (group.add_data(*cache_change, is_inline_qos_expected_)) { // 向RTPSMessageGroup添加Data_Submessage...}}
}bool LocatorSelectorSender::send(CDRMessage_t* message,std::chrono::steady_clock::time_point max_blocking_time_point) const
{return writer_.send_nts(message, *this, max_blocking_time_point);
}bool RTPSWriter::send_nts(...) {RTPSParticipantImpl* participant = getRTPSParticipant();// 向LocatorSelectorSender.LocatorSelector中每一个有效的RTPSReader的Locator发送消息return locator_selector.locator_selector.selected_size() == 0 ||participant->sendSync(message, m_guid, locator_selector.locator_selector.begin(),locator_selector.locator_selector.end(), max_blocking_time_point);
}
三. 主要流程
3.1 初始化
初始化流程中,RTPSParticipantImpl和各种Endpoint会创建各个SenderResource/ReceiverResource,这里主要梳理一下会创建哪些Resource,在什么时候创建的:
对于RecevierResource创建的socket,其端口一般是有固定的计算规则的,根据domainid,participantid以及是地址是否为组播地址可以算出固定的端口。
对于SenderResource创建的socket,则是随机绑定未使用的端口,而且因为每个RTPSWriter知道匹配的Reader信息(保存在ReaderProxy/ReaderLocator中),因此,socket绑定的IP都是本地网卡的IP,然后发送的时候使用sendto发送到Reader的地址和端口上去就行了。
3.2 发送数据
从RTPS层看,发送数据时,我们一般向RTPSWriter索取一个CacheChange,然后将要发送的数据填充到CacheChange的payload中,最后将这个CacheChange加入到WriterHistory中。
从我们将数据填充到CacheChange到通过SenderResource关联的socket发送到对端的大致流程如下:
3.3 接收数据
接收数据的工作从UDPChannelResource的thread_线程开始的,前面说到过,UDPChannelResource的thread_线程被用来运行perform_listen_operation函数,该函数中调用关联的socket执行receive_from操作,从绑定的Locator上读取数据,读取到的数据通过MessageReceiver,RTPSParticipantImpl最终到达RTPSReader手上: