当前位置: 首页 > news >正文

LCM中间件入门(2):LCM核心实现原理解析

文章目录

        • 一、`good()`函数:LCM实例状态检查的实现原理
          • 1. 实现逻辑
          • 2. 简化代码示例(C语言核心逻辑)
        • 二、`publish()`:向指定channel发送消息的原理
          • 1. 完整流程拆解
          • 2. 简化代码示例(C++核心逻辑)
        • 三、`subscribe()`:接收指定channel消息的原理
          • 1. 完整流程拆解
          • 2. 简化代码示例(C++核心逻辑)
      • 四、整体协同机制总结

LCM的核心功能基于C语言实现(C++接口为其封装),其底层通过 UDP网络通信消息序列化回调管理实现发布-订阅模式。以下从代码原理层面解析关键函数的工作机制。

一、good()函数:LCM实例状态检查的实现原理

good()函数用于判断LCM实例是否初始化成功,其核心是检查LCM内部关键资源的有效性。

1. 实现逻辑

LCM实例(lcm_t结构体)的核心成员包括:

  • 网络套接字(socket_fd):用于收发数据的UDP套接字;
  • 线程状态(thread_running):接收消息的后台线程是否启动;
  • 错误码(error):记录初始化或运行中的错误状态。

good()函数的本质是检查这些成员是否处于“可用状态”:

  • 套接字是否成功创建(socket_fd != -1);
  • 后台线程是否正常运行(针对需要异步接收的模式);
  • 无致命错误(error == 0)。
2. 简化代码示例(C语言核心逻辑)
// LCM实例结构体(简化)
typedef struct {int socket_fd;          // UDP套接字描述符int thread_running;     // 接收线程状态(1=运行,0=停止)int error;              // 错误码(0=无错误)// 其他成员:回调表、组播地址等
} lcm_t;// good()函数实现
int lcm_good(lcm_t *lcm) {return (lcm != NULL && lcm->socket_fd != -1 && lcm->thread_running && lcm->error == 0);
}

在C++接口中,lcm::LCM::good()是对上述C函数的封装,返回bool类型。

二、publish():向指定channel发送消息的原理

publish()的核心是将消息序列化为字节流,并通过UDP组播发送到与channel关联的网络地址,同时在数据包中嵌入channel标识。

1. 完整流程拆解
  1. 消息序列化
    根据.lcm文件生成的编解码函数(如example_temperature_t_pack()),将消息结构体转换为二进制字节流(解决跨平台数据格式差异)。

  2. channel与网络地址映射
    LCM默认将channel名称映射为UDP组播地址(239.255.x.y,其中x.y由channel名称的哈希值计算得出),确保同一channel的消息仅被订阅该channel的节点接收。

  3. 数据包封装
    构造LCM协议数据包,格式为:

    [4字节魔数] + [4字节消息长度] + [channel名称] + [序列化的消息数据]
    

    魔数(0x4C434D00,即"LCM\0")用于接收方识别LCM数据包。

  4. UDP发送
    通过LCM实例的套接字将数据包发送到channel对应的组播地址。

2. 简化代码示例(C++核心逻辑)
// C++ publish()接口
void LCM::publish(const std::string& channel, const void* data, size_t len) {if (!good()) return;  // 检查实例状态// 1. 计算channel对应的组播地址(基于哈希)struct sockaddr_in addr;lcm_channel_to_multicast(channel.c_str(), &addr);  // 内部哈希映射// 2. 封装LCM协议头uint8_t header[8];header[0] = 0x4C; header[1] = 0x43; header[2] = 0x4D; header[3] = 0x00;  // 魔数*(uint32_t*)(header + 4) = htonl(len);  // 消息长度(网络字节序)// 3. 拼接完整数据包:头 + channel + 消息数据std::vector<uint8_t> packet;packet.insert(packet.end(), header, header + 8);packet.insert(packet.end(), channel.begin(), channel.end());packet.push_back('\0');  // channel以空字符结尾packet.insert(packet.end(), (uint8_t*)data, (uint8_t*)data + len);// 4. 通过UDP发送到组播地址sendto(lcm->socket_fd, packet.data(), packet.size(), 0,(struct sockaddr*)&addr, sizeof(addr));
}
三、subscribe():接收指定channel消息的原理

subscribe()的核心是注册回调函数并与channel绑定,后台线程接收数据包后,根据channel查找对应的回调并触发执行。

1. 完整流程拆解
  1. 回调函数注册
    订阅时,LCM将channel名称消息类型回调函数存储在内部的回调表(哈希表,channel -> 回调列表)中。

  2. 后台接收线程
    LCM初始化时启动一个后台线程,循环从套接字读取UDP数据包:

    • 解析数据包,验证魔数和格式;
    • 提取channel名称和序列化的消息数据。
  3. 消息路由与反序列化
    根据解析出的channel名称,在回调表中查找对应的回调函数:

    • 若找到,调用自动生成的反序列化函数(如example_temperature_t_unpack()),将字节流转换为消息结构体;
    • 调用注册的回调函数,传入消息结构体。
  4. 线程安全处理
    回调函数的执行在后台线程中进行,若需在多线程环境中使用,需用户自行添加同步机制(如互斥锁)。

2. 简化代码示例(C++核心逻辑)
// 回调表结构(简化):channel -> 回调函数列表
typedef struct {std::unordered_map<std::string, std::vector<Callback>> callbacks;std::mutex mutex;  // 保护回调表的线程安全
} CallbackTable;// 订阅函数实现
void LCM::subscribe(const std::string& channel, void (*callback)(const ReceiveBuffer*, const std::string&, void*),void* userdata) {std::lock_guard<std::mutex> lock(callback_table.mutex);// 将回调函数注册到channel对应的列表中callback_table.callbacks[channel].emplace_back(callback, userdata);
}// 后台接收线程逻辑
void receive_thread(lcm_t* lcm) {while (lcm->thread_running) {uint8_t buffer[65536];  // UDP最大包长struct sockaddr_in sender;socklen_t sender_len = sizeof(sender);ssize_t n = recvfrom(lcm->socket_fd, buffer, sizeof(buffer), 0,(struct sockaddr*)&sender, &sender_len);if (n <= 0) continue;// 解析数据包:检查魔数、提取channel和消息数据if (buffer[0] != 0x4C || buffer[1] != 0x43 || buffer[2] != 0x4D || buffer[3] != 0x00)continue;  // 非LCM数据包,忽略uint32_t msg_len = ntohl(*(uint32_t*)(buffer + 4));std::string channel = (char*)(buffer + 8);  // channel以空字符结尾const uint8_t* msg_data = buffer + 8 + channel.size() + 1;// 查找回调并执行std::lock_guard<std::mutex> lock(callback_table.mutex);auto it = callback_table.callbacks.find(channel);if (it != callback_table.callbacks.end()) {for (auto& cb : it->second) {// 构造接收缓冲区,调用回调ReceiveBuffer rbuf{msg_data, msg_len};cb.function(&rbuf, channel, cb.userdata);}}}
}

四、整体协同机制总结

工作原理交互图如下:

PublisherLCM_Pub (发布者实例)Network (UDP组播)LCM_Sub (订阅者实例)Subscriber初始化LCM实例创建lcm_t结构体1. 创建UDP套接字2. 启动接收线程3. 初始化错误码为0调用good()返回状态 (socket有效+线程运行+无错误)初始化LCM实例创建lcm_t结构体1. 创建UDP套接字2. 启动接收线程3. 初始化回调表(空)调用good()返回状态 (socket有效+线程运行+无错误)订阅通道THERMOMETER调用subscribe("THERMOMETER", 回调函数)1. 加锁保护回调表2. 将回调函数注册到"THERMOMETER"条目下订阅完成发布消息到通道构造Temperature消息(温度值、时间戳等)调用publish("THERMOMETER", 消息)publish()内部处理1. 消息序列化(调用自动生成的pack函数)2. 计算channel哈希映射到组播地址(239.255.x.y)3. 封装LCM数据包(魔数+长度+channel+序列化数据)通过UDP发送数据包到组播地址接收UDP数据包(含"THERMOMETER"标识)接收线程处理1. 验证魔数和数据包格式2. 提取channel("THERMOMETER")和序列化数据3. 消息反序列化(调用自动生成的unpack函数)4. 查找回调表中"THERMOMETER"对应的回调函数触发回调函数(传入反序列化后的消息)执行回调逻辑(打印温度数据等)PublisherLCM_Pub (发布者实例)Network (UDP组播)LCM_Sub (订阅者实例)Subscriber
  1. 初始化阶段lcm_t实例创建套接字、启动接收线程,`good(
  2. )`验证这些资源是否就绪。
  3. 发布阶段publish()将消息序列化,通过channel映射的组播地址发送UDP包,嵌入channel标识。
  4. 订阅阶段subscribe()将回调注册到channel对应的哈希表;接收线程解析UDP包,根据channel查找回调,反序列化消息后触发执行。

这种设计实现了无中心节点的轻量化通信,通过UDP组播和哈希映射保证低延迟,适用于实时系统中基于channel的高效数据交互。

http://www.dtcms.com/a/307998.html

相关文章:

  • 牛客练习赛142 第四次忍界大战 并查集
  • 永磁同步电机无速度算法--直流误差抑制自适应二阶反推观测器
  • Gemini Fullstack LangGraph Quickstart(DeepSeek+Tavily版本)
  • 【React】diff 算法
  • Elasticsearch 索引及节点级别增删改查技术
  • 基于单片机胎压检测/锅炉蒸汽压力/气压检测系统
  • VBA代码解决方案第二十七讲:禁用EXCEL工作簿右上角的关闭按钮
  • 分布式ID方案(标记)
  • TDengine oss数据的导出和导入
  • 大数据岗位技能需求挖掘:Python爬虫与NLP技术结合
  • 学习Redis源码路径
  • 开发避坑短篇(12):达梦数据库TIMESTAMP字段日期区间查询实现方案
  • 打破数据质量瓶颈:用n8n实现30秒专业数据质量报告自动化
  • 【数据结构初阶】--二叉树选择题专辑
  • 《Spring Boot应用工程化提升:多模块、脚手架与DevTools》
  • leetcode 2683. 相邻值的按位异或 中等
  • Python实现调整矩阵维度: view
  • 今日矩阵系列
  • mac环境配置rust
  • 机器人系统对接线索平台好处
  • 前端工程化包管理器:从npm基础到nvm多版本管理实战
  • HCIP面试第一章内容总结
  • 老旧远程控制管理模块(物联网设备)渗透实战:SNMP泄露+内核提权攻击链深度解析
  • java web 通过 servlet 给前端设置编码格式
  • 2025年物联网新趋势:格行随身WiFi的模块化架构与低延迟优化
  • AI Agent 的 10 种应用场景:物联网、RAG 与灾难响应
  • 【前端知识】JS单线程模型深入解析
  • 第 10 章 文件和异常
  • 机器人学和自动化领域中的路径规划方法
  • 在幸狐RV1106板子上用gcc14.2本地编译安装samba-4.22.3服务器,并且支持XP系统访问共享文件夹