当前位置: 首页 > news >正文

MQTT数据集成

数据集成概述

思考问题:如何将一个物联网设备产生的数据传输到业务系统中?

MQTT

上述方案的弊端:较为麻烦

数据集成:为 EMQX 引入了与外部数据系统的连接,从而以实现设备与其他业务系统的无缝集成。
MQTT数据集成

EMQX的数据集成功能不单单可以快速的将物联网设备产生的数据传递到业务系统中,也可以和其他的外部数据系统进行集成,实现数据的快速传输。比如:从Kafka某一个主题中获取数据,然后将数据写入到Redis中。

工作原理介绍

sink和source组件

数据集成使用 SinkSource 组件与外部数据系统对接。

  1. Sink 用于将消息发送到外部数据系统,例如 MySQL、Kafka 或 HTTP 服务等。
  2. Source 则用于从外部数据系统接收消息,例如 MQTT、Kafka 或 GCP PubSub。

连接器

连接器负责与外部数据系统的连接,用户可以为不同的外部数据系统创建不同的连接器,一个连接器可以为多个 Sink/Source 提供连接。
连接器

规则引擎

规则引擎是 EMQX 内置基于 SQL 的数据处理组件,搭配数据集成无需编写代码即可实现一站式的 IoT 数据提取、过滤、转换、存储与处理,以加速应用集成和业务创新。
规则引擎

规则的组成:规则描述了 数据来源数据处理过程处理结果去向 三个方面:

规则引擎

  1. 数据来源:规则的数据源可以是消息或事件,也可以是外部的数据系统 (source)。规则通过 SQL 的 FROM 子句指定数据的来源;
  2. 数据处理过程:规则通过 SQL 语句和函数来描述数据的处理过程。SQL 的 WHERE 子句用于过滤数据,SELECT 子句以及 SQL 函数用于提取和转换数据;
  3. 处理结果去向:规则可以定义一个或多个动作来处理 SQL 的输出结果。如果 SQL 执行通过,规则将按顺序执行相应的动作,比如将处理结果存储到数据库、或者重新发布到另一个 MQTT 主题等。支持的动作如下:
    • 消息重发布:将结果发布到指定 MQTT 主题
    • 控制台输出:将结果输出到控制台或日志中
    • 发送到各类 Sink:将结果发送到外部数据系统中,如 MQTT 服务,Kafka,PostgreSQL 等

数据集成入门

需求:将客户端发往 t/a 主题中的消息输出到EMQX的控制台

具体步骤:

  1. 进入到Dashboard中,依次点击"集成" ----> “规则” ----> “创建” 进入到创建规则的表单页面
    数据集成规则

  2. 配置好规则source和sink组件以后,可以点击对规则进行调试
    配置source和sink组件

  3. 启动MQTTX客户端和服务端建立连接,并且向t/a主题发布消息,查看EMQX控制台的日志输出

    docker logs -f ${emqx-dockerName}
    

    日志输出

连接器使用(需要在EMQX企业版试验)

案例一

需求:将客户端发往 t/b 主题中的消息输出到EMQX的控制台和Redis中
前提:在Docker容器中部署Redis服务

具体步骤:

  1. 进入到Dashboard中,依次点击"集成" ----> “规则” ----> “创建” 进入到创建规则的表单页面,点击 动作输出 添加一个sink
    添加sink
    备注:
    Redis的命令模板:

    HSET emqx_messages:${clientid} username ${username} payload ${payload} timestamp ${timestamp}
    
  2. 点击"+"号添加连接器
    添加连接器

  3. 添加完毕以后效果如下图所示
    添加完效果+
    也可以通过flow设计器查看对应的拓扑图
    flow设计器

  4. 使用MQTTX向’t/b’主题中发送消息进行测试,观察Redis中的数据状态

案例二

需求:将发往Kafka中的 test_mqtt_topic 主题中的消息输出到EMQX的控制台和Redis中
前提:通过docker容器部署 kafka服务 + eagle服务

具体步骤:

  1. 在Kafka中创建 test_mqtt_topic 主题
    kafka中建立主题

  2. 进入到Dashboard中,依次点击"集成" ----> “规则” ----> “创建” 进入到创建规则的表单页面,点击 数据输入 添加一个source
    添加source

  3. 在添加source动作的页面,添加连接器"+"号,添加kafka的连接器
    添加kafka连接器

  4. 在创建规则的表单页面,点击动作输出添加控制台输出和Redis输出的sink
    Redis的命令模板如下所示:

    HSET kafka_mqtt:${topic} offset ${offset} value ${value}
    

    命令参数可以通过控制台输出进行确定。

  5. 向Kafka中的 test_mqtt_topic 发送消息

    具体步骤:

    1. 创建一个基于spring boot 3.0.5构建的web项目,加入如下依赖
    <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version>
    </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
    
    1. 在application.yml文件中添加如下配置
    spring:kafka:producer:bootstrap-servers: xxx:9092
    
    1. 编写启动类
    @SpringBootApplication
    public class MqttKafkaApplication {public static void main(String[] args) {SpringApplication.run(MqttKafkaApplication.class , args) ;}}
    
    1. 编写测试类,发送消息
    @SpringBootTest(classes = MqttKafkaApplication.class)
    public class MqttKafkaProducerTest {@Autowiredprivate KafkaTemplate<String , String> kafkaTemplate;@Testpublic void sendMsg() {kafkaTemplate.send("test_mqtt_topic" , "mqtt kafka producer msg....") ;}}
    
  6. 发送消息进行测试,观察Redis中的数据状态:
    试验结果

SQL语法介绍

SQL 处理结果将以 JSON 形式 呈现在输出结果部分。SQL 处理结果中的所有字段都可以通过后续操作(内置操作或 Sink)以 ${key}的形式进行引用。

FROM、SELECT 和 WHERE 子句

规则的 SQL 语句基本格式为:

SELECT <字段名> FROM <主题> [WHERE <条件>]

举例:

## SELECT 语句用于决定最终的输出结果里的字段。比如:
## 下面 SQL 的输出结果中将只有两个字段 "a" 和 "b":
SELECT a, b FROM "t/#"# 选取 username 为 'abc' 的终端发来的消息,输出结果为所有可用字段:
SELECT * FROM "#" WHERE username = 'abc'## 选取 clientid 为 'abc' 的终端发来的消息,输出结果将只有 cid 一个字段。
## 注意 cid 变量是在 SELECT 语句中定义的,故可在 WHERE 语句中使用:
SELECT clientid as cid FROM "#" WHERE cid = 'abc'## 选取 username 为 'abc' 的终端发来的消息,输出结果将只有 cid 一个字段。
## 注意虽然 SELECT 语句中只选取了 cid 一个字段,所有消息发布事件中的可用字段 (比如 clientid、username 等) 仍然可以在 WHERE 语句中使用:
SELECT clientid as cid FROM "#" WHERE username = 'abc'## 但下面这个 SQL 语句就不能工作了,因为变量 xyz 既不是消息发布事件中的可用字段,又没有在 SELECT 语句中定义:
SELECT clientid as cid FROM "#" WHERE xyz = 'abc'

FROM 语句用于选择事件来源。如果是消息发布则填写消息的主题,如果是事件则填写对应的事件主题。

FOREACH、DO 和 INCASE 子句

语法介绍

如果对于一个数组数据,想针对数组中的每个元素分别执行一些操作并执行 Actions,需要使用 FOREACH-DO-INCASE 语法。其基本格式为:

FOREACH <字段名> [DO <条件>] [INCASE <条件>] FROM <主题> [WHERE <条件>]FOREACH 子句用于选择需要做 foreach 操作的字段,注意选择出的字段必须为数组类型
DO 子句用于对 FOREACH 选择出来的数组中的每个元素进行变换,并选择出感兴趣的字段
INCASE 子句用于对 DO 选择出来的某个字段施加条件过滤

举例:

FOREACHpayload.sensors as e ## 选择出的字段必须为数组类型
DO                       ## DO 相当于针对当前循环中对象的 SELECT 子句,决定最终的输出结果里的字段clientid,e.name as name,e.idx as idx
INCASE                  ## INCASE 相当于针对当前循环中对象的 WHERE 语句e.idx >= 1          ## 对DO选择出来的某个字段施加条件过滤
FROM "t/#"              ## 子句将规则挂载到某个主题上

案例演示

假设有 主题为 t/a 的消息,消息体为 JSON 格式,其中 sensors 字段为包含多个 Object 的数组:

{"date": "2025-09-30","sensors": [{"name": "a", "idx":0},{"name": "b", "idx":1},{"name": "c", "idx":2}]
}

示例 1:要求将 sensors 里的各个对象,分别作为数据输入重新发布消息到 sensors/${idx} 主题,内容为 ${name}。即最终规则将会发出 3 条消息:

  1. 主题:sensors/0 内容:a
  2. 主题:sensors/1 内容:b
  3. 主题:sensors/2 内容:c

第一种方式:

  1. 动作类型:消息重新发布 (republish)

  2. 目的主题:sensors/${item.idx}

  3. 目的 QoS:0

  4. 消息内容模板:${item.name}

  5. 以及如下 SQL 语句:

    FOREACHpayload.sensors
    FROM "t/#"
    

    这个 SQL 中,FOREACH 子句指定需要进行遍历的数组 sensors,则选取结果为(json):

    [{"name": "a","idx": 0},{"name": "b","idx": 1},{"name": "c","idx": 2}
    ]
    

    FOREACH 语句将会对于结果数组里的每个对象分别执行 消息重新发布 动作,所以将会执行重新发布动作 3 次。

    配置如下:
    SQL配置
    结果:往 t/a 主题发送消息后,以下主题收到消息:
    订阅收到消息

第二种方式:使用DO语句

  1. 动作类型:消息重新发布 (republish)

  2. 目的主题:sensors/${idx}

  3. 目的 QoS:0

  4. 消息内容模板:${name}

  5. 以及如下 SQL 语句:

    FOREACH
    payload.sensors as e
    DO 
    e.name as name , 
    e.idx as idx
    FROM "t/#"
    

    配置如下:
    SQL语句

示例 2:要求将 sensors 里的 idx 值大于或等于 1 的对象,分别作为数据输入重新发布消息到 sensors/${idx} 主题,内容为
clientid=${clientid},name=${name},date=${date}。即最终规则将会发出 2 条消息:

  1. 主题:sensors/1 内容:clientid=c_steve,name=b,date=2020-04-24
  2. 主题:sensors/2 内容:clientid=c_steve,name=c,date=2020-04-24

要完成这个规则,我们需要配置如下动作:

  1. 动作类型:消息重新发布 (republish)
  2. 目的主题:sensors/${idx}
  3. 目的 QoS:0
  4. 消息内容模板:clientid=${clientid},name=${name},date=${date}

以及如下 SQL 语句:

FOREACHpayload.sensors as e
DOclientid , payload.date as date,e.idx as idx ,e.name as name
INCASEe.idx >= 1  
FROM "t/#"

配置如下:
SQL语法
最终结果:
订阅结果

CASE-WHEN 语法示例

CASE-WHEN语法和MySQL中的很类似,当满足某一个条件的时候,取指定的数据值,如下所示:

示例:将消息中 x 字段的值范围限定在 0~7 之间。

SELECTCASE WHEN payload.x < 0 THEN 0WHEN payload.x > 7 THEN 7ELSE payload.xEND as x
FROM "t/#"

假设消息为:

{"x": 8}

则上面的 SQL 输出为:

{"x": 7}

内置SQL函数

规则引擎提供了各种内置函数,您可以在 SQL 中使用这些函数实现基本的数据处理,包括 数学运算、数据类型判断、数据类型转换、字符串操作、映射操作、数组操作、哈希、压缩与解压缩、位操作、位序列操作、编解码 以及 日期与时间转换。

官网地址

举例说明:

FOREACHpayload.sensors as e
DO abs(-1) as abs,concat(e.name , 'xian') as address ,clientid ,e.name as name , e.idx as idx
INCASEe.idx >= 1
FROM "t/1"

向主题 t/1 发送如下消息:

{"date": "2024-07-05","sensors": [{"name": "a", "idx":0},{"name": "b", "idx":1},{"name": "c", "idx":2}]
}

观察控制台日志输出:
控制台输出

Webhook

Webhook简介

Webhook 提供了一种将 EMQX 客户端消息和事件集成到外部 HTTP 服务器的方法。

Webhook 是 EMQX 中开箱即用的功能。当客户端向特定主题发布消息,或执行特定操作时就会触发 Webhook,将事件数据和消息数据转发到预设的 HTTP 服务器中。
Webhook

Webhook演示

具体步骤:

  1. 定义http的请求接口

    @RestController
    @RequestMapping(value = "/webHook")
    public class WebHookController {@PostMapping(value = "/notify")public void notify(@RequestBody Map<Object , Object> body) {System.out.println(body);}
    }
    
  2. 在Dashboard中创建Webhook
    Webhook

  3. 通过MQTTX向a/1主题发布消息,观察http服务控制台输出


自此,本文分享到此结束!!!

http://www.dtcms.com/a/427204.html

相关文章:

  • 网站的会员系统怎么做电商小程序价格
  • Redis 进阶:跳出缓存局限!7 大核心场景的原理与工程化实践
  • 数据结构——LinkedList和链表
  • 一学一做专题网站哈尔滨黑大主题邮局
  • 基于类的四种设计模式
  • 用ChatGPT修改论文,如何在提升质量的同时降低AI检测风险?
  • 实验指导-基于阿里云Serverless应用引l擎SAE的服务部署迀移
  • 黔西县住房和城乡建设局网站个人网页制作方法
  • 长沙网站推广系统动态wordpress动态主题
  • 基于Matlab实现路径规划
  • 给定数据规模的ACM风格笔试题-子矩阵的最大累加和问题
  • 一站式服务图片wordpress博客整站源码
  • 明星粉丝网站怎么做建设银行手机银行官方网站下载安装
  • Spring boot中 限制 Mybatis SQL日志的大字段输出
  • SQL Server数据库事务日志问题的诊断与解法(从膨胀到瘦身)
  • Postgresql CLOG文件及其从库同步解析
  • wordpress 授权一个空间两个网站对seo
  • 正规的招聘网站永州市网站建设
  • 加强教育信息网站建设昆山建设工程安监站网站
  • EndoChat:面向内镜手术的基于事实依据的多模态大型语言模型|文献速递-文献分享
  • 零基础学AI大模型之ChatModel聊天模型与ChatPromptTemplate实战
  • 产生式规则对自然语言处理深层语义分析的影响与启示研究
  • web渗透之Python反序列化漏洞
  • 做办公用品网站工作计划黄页网站是什么
  • 论文阅读 (1) :Control Flow Management in Modern GPUs
  • 吉林省软环境建设网站网络营销属于哪个专业
  • iOS 26 系统流畅度检测 从视觉特效到帧率稳定的实战策略
  • 2025云栖大会,机器人商业时代降临
  • C++面向对象编程三大特性之一:多态
  • TapTalk | 圆桌实录:澳门综合度假村敏捷转型之旅,MongoDB + TapData 赋能酒店业卓越实践