当前位置: 首页 > news >正文

CycloneDDS:跨主机多进程通信全解析

分布式通信DDS及CycloneDDS详解

1. DDS基本概念

DDS(Data Distribution Service,数据分发服务)是一种面向实时系统的分布式通信 middleware,基于发布-订阅模式,专注于高效、可靠的数据共享。其核心特点包括:

  • 数据为中心:通信围绕“主题(Topic)”展开,发布者和订阅者通过相同主题交互,无需知道对方位置。
  • 灵活的QoS(服务质量):支持可靠性、实时性、持久性等配置(如可靠传输/尽力传输、历史数据保留等)。
  • 自动发现:分布式节点(进程/主机)可自动发现彼此,无需手动配置通信地址。
2. CycloneDDS简介

CycloneDDS是Eclipse基金会的开源DDS实现,轻量、高性能,支持多平台(Linux/Windows/macOS)和多语言(C/C++/Python等),广泛用于工业控制、自动驾驶等场景。

一、服务发现(Service Discovery)

服务发现是DDS的核心能力,确保分布式节点(跨进程/跨主机)自动识别彼此并建立通信。CycloneDDS遵循DDS规范的SPDP/SEDP协议实现发现:

1.1 核心协议
  • SPDP(Simple Participant Discovery Protocol):用于发现“参与者(Participant)”。参与者是DDS的核心实体(每个进程通常创建一个),启动时通过组播(Multicast) 向同一“域(Domain)”的其他参与者广播自身信息(如域ID、UUID、网络地址)。
  • SEDP(Simple Endpoint Discovery Protocol):在参与者发现后,交换“端点(Endpoint)”信息。端点包括发布者(Publisher)、订阅者(Subscriber)、数据写入者(DataWriter)、数据读取者(DataReader),以及它们关联的主题和QoS。
1.2 多主机发现机制
  • 跨主机时,CycloneDDS默认使用组播地址(如239.255.0.1)和端口(默认7400)广播SPDP/SEDP消息,确保同一域内的所有主机都能收到。
  • 若组播不可用(如网络限制),可配置静态发现列表(在cyclonedds.xml中指定其他主机的IP和端口)。
1.3 域(Domain)的作用

域是DDS的逻辑隔离单元,通过域ID(Domain ID) 区分。只有同一域ID的参与者才能互相发现和通信,避免不同系统间的干扰(如域ID=0和域ID=1的节点完全隔离)。

二、通信机制

DDS通信基于“主题(Topic)”,发布者通过DataWriter向主题写入数据,订阅者通过DataReader从主题读取数据。CycloneDDS自动处理底层网络传输(序列化、路由、反序列化)。

2.1 核心流程
  1. 定义数据类型:用IDL(接口定义语言)描述主题的数据结构(如传感器数据、控制指令)。
  2. 创建参与者:每个进程初始化一个DDS参与者(指定域ID)。
  3. 创建主题:关联数据类型和主题名称(如“Temperature”)。
  4. 发布/订阅:发布者创建DataWriter写入数据;订阅者创建DataReader监听数据(可通过回调或轮询获取)。
2.2 多进程/多主机通信
  • 多进程:同一主机的多个进程可创建不同参与者(同一域),通过相同主题通信。
  • 多主机:不同主机的进程(同一域)通过服务发现识别后,直接建立点对点通信(UDP/TCP)传输数据,无需中心节点转发。

三、多进程管理

在多主机、多进程场景中,需关注以下管理要点:

3.1 进程生命周期
  • 启动:进程初始化时创建参与者、主题、发布者/订阅者、DataWriter/DataReader。
  • 运行:通过DataWriter持续发布数据;通过DataReader接收数据。
  • 退出:显式销毁DDS对象(避免资源泄漏),CycloneDDS提供dds_delete系列API。
3.2 资源隔离与冲突避免
  • 同一主机的多进程需使用独立的参与者(避免共享资源冲突)。
  • 不同进程的主题名称需一致(否则无法通信),但可通过QoS(如Partition)实现逻辑分组(如“ProcessA/Temp”和“ProcessB/Temp”隔离)。
3.3 配置优化

通过cyclonedds.xml配置网络参数(如指定网卡、组播地址、缓冲区大小),确保多主机通信稳定性。示例配置:

<?xml version="1.0" encoding="UTF-8"?>
<CycloneDDS xmlns="https://cdds.io/config" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><Domain id="0"><General><NetworkInterfaceAddress>eth0</NetworkInterfaceAddress> <!-- 指定网卡 --></General><Discovery><MulticastGroup>239.255.1.1</MulticastGroup> <!-- 自定义组播地址 --></Discovery></Domain>
</CycloneDDS>

四、示例代码(C语言)

以下示例实现跨主机/多进程的字符串消息通信:发布者发送消息,订阅者接收消息。

步骤1:定义IDL数据类型

创建Chat.idl定义消息结构:

module Chat {struct Message {string content; // 消息内容long sender_id; // 发送者ID};
};
步骤2:生成DDS代码

使用CycloneDDS的idlc工具编译IDL(生成序列化/反序列化代码):

idlc Chat.idl  # 生成Chat.c和Chat.h
步骤3:发布者代码(publisher.c)
#include "Chat.h"
#include <dds/dds.h>
#include <stdio.h>
#include <unistd.h>int main(int argc, char**argv) {dds_entity_t participant, topic, publisher, writer;Chat_Message msg = {.sender_id = 1}; // 发送者ID=1dds_return_t ret;// 1. 创建参与者(域ID=0)participant = dds_create_participant(DDS_DOMAIN_DEFAULT, NULL, NULL);if (participant < 0) dds_fatal("创建参与者失败");// 2. 创建主题(名称="ChatTopic",类型=Chat_Message)topic = dds_create_topic(participant, &Chat_Message_desc, "ChatTopic", NULL, NULL);if (topic < 0) dds_fatal("创建主题失败");// 3. 创建发布者publisher = dds_create_publisher(participant, NULL, NULL);if (publisher < 0) dds_fatal("创建发布者失败");// 4. 创建数据写入者writer = dds_create_writer(publisher, topic, NULL, NULL);if (writer < 0) dds_fatal("创建DataWriter失败");// 5. 循环发送消息printf("发布者启动,发送消息(按Ctrl+C退出)...\n");for (int i = 0; ; i++) {snprintf(msg.content, sizeof(msg.content), "Hello from Process 1, count=%d", i);ret = dds_write(writer, &msg);if (ret != DDS_RETCODE_OK) printf("发送失败: %d\n", ret);sleep(1); // 每秒发送一次}// 清理资源(实际中通过信号处理触发)dds_delete(writer);dds_delete(publisher);dds_delete(topic);dds_delete(participant);return 0;
}
步骤4:订阅者代码(subscriber.c)
#include "Chat.h"
#include <dds/dds.h>
#include <stdio.h>// 数据接收回调函数
void on_data_available(dds_entity_t reader, void *arg) {Chat_Message msg;dds_sample_info_t info;dds_return_t ret;// 读取数据ret = dds_take(reader, &msg, &info, 1, 1);if (ret > 0 && info.valid_data) {printf("收到消息 [发送者%d]: %s\n", msg.sender_id, msg.content);}
}int main(int argc, char**argv) {dds_entity_t participant, topic, subscriber, reader;dds_listener_t *listener;// 1. 创建参与者(域ID=0,与发布者一致)participant = dds_create_participant(DDS_DOMAIN_DEFAULT, NULL, NULL);if (participant < 0) dds_fatal("创建参与者失败");// 2. 创建主题(与发布者同名)topic = dds_create_topic(participant, &Chat_Message_desc, "ChatTopic", NULL, NULL);if (topic < 0) dds_fatal("创建主题失败");// 3. 创建订阅者subscriber = dds_create_subscriber(participant, NULL, NULL);if (subscriber < 0) dds_fatal("创建订阅者失败");// 4. 创建数据读取者(注册回调)listener = dds_listener_create(NULL);listener->on_data_available = on_data_available; // 数据到达时触发reader = dds_create_reader(subscriber, topic, NULL, listener);if (reader < 0) dds_fatal("创建DataReader失败");// 5. 等待消息(阻塞)printf("订阅者启动,等待消息(按Ctrl+C退出)...\n");dds_sleepfor(DDS_SECS(3600)); // 休眠1小时// 清理资源dds_listener_delete(listener);dds_delete(reader);dds_delete(subscriber);dds_delete(topic);dds_delete(participant);return 0;
}
步骤5:编译与运行
  1. 安装CycloneDDS(参考官方文档)。
  2. 编译代码(链接CycloneDDS库):
    gcc publisher.c Chat.c -o publisher -lcyclonedds
    gcc subscriber.c Chat.c -o subscriber -lcyclonedds
    
  3. 多主机运行:
    • 在主机A运行发布者:./publisher
    • 在主机B运行订阅者:./subscriber(确保同一网络,组播可用,域ID一致)

总结

CycloneDDS通过SPDP/SEDP实现跨主机/多进程的自动服务发现,基于发布-订阅模式简化分布式通信。核心要点:

  • 同一域ID是节点互发现的前提;
  • 主题是通信的核心标识,数据类型需一致;
  • 多进程管理需关注资源创建/销毁和QoS配置。

通过上述示例,可快速实现分布式系统的数据共享。

http://www.dtcms.com/a/503440.html

相关文章:

  • Java基础语法—类型转换、表达式的自动类型提升
  • CentOS8无法使用sudo提权
  • 软件工程师招聘信息网站数据库对网站开发的作用
  • Python核心数据结构与函数编程
  • Spring Boot 3零基础教程,WEB 开发 内容协商 接口返回 YAML 格式的数据 笔记35
  • 网站编程学北京上海网站建设公司
  • 查询土耳其公司商业登记册(工商报告),可以获取什么信息?
  • ip反查域名
  • 把AI“撒”进农田:基于极值量化与状态机的1KB边缘灌溉决策树
  • 代码随想录 404.左叶子之和
  • 《3D可交互道具开发痛点解决:轻量化建模与解耦式逻辑实践》
  • 中铁雄安建设有限公司网站简述商务网站建设的步骤
  • 《3D开放世界地形开发:动态LOD与智能融合的轻量化实战路径》
  • 兽装定制网站wordpress商店安装
  • Redis(70)分布式锁的超时机制如何实现?
  • 自学网站免费晋中seo排名
  • WPF 联合 Web 开发调试流程梳理(基于 Microsoft.Web.WebView2)
  • 最簡實時性操作系統之任務鏈表
  • LeetCode:886. 可能的二分法
  • 【Linux】应用层协议http
  • cms网站后台上传图片提示图片类型错误但是类型是正确的服装网站建设效果
  • C程序中的数组与指针共生关系
  • 网站的建立步骤网站ip地址大全
  • 每日Reddit AI信息汇总 10.19
  • k8s(九)安全机制
  • 中国建设银行陕西分行官方网站微网站建设使用程序
  • 福州网站优化手机网站设计案例
  • Orleans 序列化、Actor Placement 和 Actor 调用详细分析
  • Java Object类及包装类
  • 《算法通关指南---C++编程篇(4)》