MQTT数据集成
数据集成概述
思考问题:如何将一个物联网设备产生的数据传输到业务系统中?
上述方案的弊端:较为麻烦
数据集成:为 EMQX 引入了与外部数据系统的连接,从而以实现设备与其他业务系统的无缝集成。
EMQX的数据集成功能不单单可以快速的将物联网设备产生的数据传递到业务系统中,也可以和其他的外部数据系统进行集成,实现数据的快速传输。比如:从Kafka某一个主题中获取数据,然后将数据写入到Redis中。
工作原理介绍
sink和source组件
数据集成使用 Sink
与 Source
组件与外部数据系统对接。
- Sink 用于将消息发送到外部数据系统,例如 MySQL、Kafka 或 HTTP 服务等。
- Source 则用于从外部数据系统接收消息,例如 MQTT、Kafka 或 GCP PubSub。
连接器
连接器负责与外部数据系统的连接,用户可以为不同的外部数据系统创建不同的连接器,一个连接器可以为多个 Sink/Source 提供连接。
规则引擎
规则引擎是 EMQX 内置基于 SQL 的数据处理组件,搭配数据集成无需编写代码即可实现一站式的 IoT 数据提取、过滤、转换、存储与处理,以加速应用集成和业务创新。
规则的组成:规则描述了 数据来源、数据处理过程、处理结果去向 三个方面:
- 数据来源:规则的数据源可以是消息或事件,也可以是外部的数据系统 (source)。规则通过 SQL 的 FROM 子句指定数据的来源;
- 数据处理过程:规则通过
SQL
语句和函数
来描述数据的处理过程。SQL 的 WHERE 子句用于过滤数据,SELECT 子句以及 SQL 函数用于提取和转换数据; - 处理结果去向:规则可以定义一个或多个动作来处理 SQL 的输出结果。如果 SQL 执行通过,规则将按顺序执行相应的动作,比如将处理结果存储到数据库、或者重新发布到另一个 MQTT 主题等。支持的动作如下:
- 消息重发布:将结果发布到指定 MQTT 主题
- 控制台输出:将结果输出到控制台或日志中
- 发送到各类 Sink:将结果发送到外部数据系统中,如 MQTT 服务,Kafka,PostgreSQL 等
数据集成入门
需求:将客户端发往 t/a
主题中的消息输出到EMQX的控制台
具体步骤:
-
进入到Dashboard中,依次点击"集成" ----> “规则” ----> “创建” 进入到创建规则的表单页面
-
配置好规则source和sink组件以后,可以点击对规则进行调试
-
启动MQTTX客户端和服务端建立连接,并且向
t/a
主题发布消息,查看EMQX控制台的日志输出docker logs -f ${emqx-dockerName}
连接器使用(需要在EMQX企业版试验)
案例一
需求:将客户端发往 t/b
主题中的消息输出到EMQX的控制台和Redis中
前提:在Docker容器中部署Redis服务
具体步骤:
-
进入到Dashboard中,依次点击"集成" ----> “规则” ----> “创建” 进入到创建规则的表单页面,点击 动作输出 添加一个sink
备注:
Redis的命令模板:HSET emqx_messages:${clientid} username ${username} payload ${payload} timestamp ${timestamp}
-
点击"+"号添加连接器
-
添加完毕以后效果如下图所示
也可以通过flow设计器查看对应的拓扑图
-
使用MQTTX向’t/b’主题中发送消息进行测试,观察Redis中的数据状态
案例二
需求:将发往Kafka中的 test_mqtt_topic
主题中的消息输出到EMQX的控制台和Redis中
前提:通过docker容器部署 kafka服务 + eagle服务
具体步骤:
-
在Kafka中创建
test_mqtt_topic
主题
-
进入到Dashboard中,依次点击"集成" ----> “规则” ----> “创建” 进入到创建规则的表单页面,点击 数据输入 添加一个source
-
在添加source动作的页面,添加连接器"+"号,添加kafka的连接器
-
在创建规则的表单页面,点击
动作输出
添加控制台输出和Redis输出的sink
Redis的命令模板如下所示:HSET kafka_mqtt:${topic} offset ${offset} value ${value}
命令参数可以通过控制台输出进行确定。
-
向Kafka中的
test_mqtt_topic
发送消息具体步骤:
- 创建一个基于spring boot 3.0.5构建的web项目,加入如下依赖
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version> </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
- 在application.yml文件中添加如下配置
spring:kafka:producer:bootstrap-servers: xxx:9092
- 编写启动类
@SpringBootApplication public class MqttKafkaApplication {public static void main(String[] args) {SpringApplication.run(MqttKafkaApplication.class , args) ;}}
- 编写测试类,发送消息
@SpringBootTest(classes = MqttKafkaApplication.class) public class MqttKafkaProducerTest {@Autowiredprivate KafkaTemplate<String , String> kafkaTemplate;@Testpublic void sendMsg() {kafkaTemplate.send("test_mqtt_topic" , "mqtt kafka producer msg....") ;}}
-
发送消息进行测试,观察Redis中的数据状态:
SQL语法介绍
SQL 处理结果将以 JSON 形式 呈现在输出结果部分。SQL 处理结果中的所有字段都可以通过后续操作(内置操作或 Sink)以 ${key}
的形式进行引用。
FROM、SELECT 和 WHERE 子句
规则的 SQL 语句基本格式为:
SELECT <字段名> FROM <主题> [WHERE <条件>]
举例:
## SELECT 语句用于决定最终的输出结果里的字段。比如:
## 下面 SQL 的输出结果中将只有两个字段 "a" 和 "b":
SELECT a, b FROM "t/#"# 选取 username 为 'abc' 的终端发来的消息,输出结果为所有可用字段:
SELECT * FROM "#" WHERE username = 'abc'## 选取 clientid 为 'abc' 的终端发来的消息,输出结果将只有 cid 一个字段。
## 注意 cid 变量是在 SELECT 语句中定义的,故可在 WHERE 语句中使用:
SELECT clientid as cid FROM "#" WHERE cid = 'abc'## 选取 username 为 'abc' 的终端发来的消息,输出结果将只有 cid 一个字段。
## 注意虽然 SELECT 语句中只选取了 cid 一个字段,所有消息发布事件中的可用字段 (比如 clientid、username 等) 仍然可以在 WHERE 语句中使用:
SELECT clientid as cid FROM "#" WHERE username = 'abc'## 但下面这个 SQL 语句就不能工作了,因为变量 xyz 既不是消息发布事件中的可用字段,又没有在 SELECT 语句中定义:
SELECT clientid as cid FROM "#" WHERE xyz = 'abc'
FROM 语句用于选择事件来源。如果是消息发布则填写消息的主题,如果是事件则填写对应的事件主题。
FOREACH、DO 和 INCASE 子句
语法介绍
如果对于一个数组数据,想针对数组中的每个元素分别执行一些操作并执行 Actions,需要使用 FOREACH-DO-INCASE
语法。其基本格式为:
FOREACH <字段名> [DO <条件>] [INCASE <条件>] FROM <主题> [WHERE <条件>]FOREACH 子句用于选择需要做 foreach 操作的字段,注意选择出的字段必须为数组类型
DO 子句用于对 FOREACH 选择出来的数组中的每个元素进行变换,并选择出感兴趣的字段
INCASE 子句用于对 DO 选择出来的某个字段施加条件过滤
举例:
FOREACHpayload.sensors as e ## 选择出的字段必须为数组类型
DO ## DO 相当于针对当前循环中对象的 SELECT 子句,决定最终的输出结果里的字段clientid,e.name as name,e.idx as idx
INCASE ## INCASE 相当于针对当前循环中对象的 WHERE 语句e.idx >= 1 ## 对DO选择出来的某个字段施加条件过滤
FROM "t/#" ## 子句将规则挂载到某个主题上
案例演示
假设有 主题为 t/a
的消息,消息体为 JSON 格式,其中 sensors 字段为包含多个 Object 的数组:
{"date": "2025-09-30","sensors": [{"name": "a", "idx":0},{"name": "b", "idx":1},{"name": "c", "idx":2}]
}
示例 1:要求将 sensors 里的各个对象,分别作为数据输入重新发布消息到 sensors/${idx}
主题,内容为 ${name}
。即最终规则将会发出 3 条消息:
- 主题:sensors/0 内容:a
- 主题:sensors/1 内容:b
- 主题:sensors/2 内容:c
第一种方式:
-
动作类型:消息重新发布 (republish)
-
目的主题:sensors/${item.idx}
-
目的 QoS:0
-
消息内容模板:${item.name}
-
以及如下 SQL 语句:
FOREACHpayload.sensors FROM "t/#"
这个 SQL 中,FOREACH 子句指定需要进行遍历的数组 sensors,则选取结果为(json):
[{"name": "a","idx": 0},{"name": "b","idx": 1},{"name": "c","idx": 2} ]
FOREACH 语句将会对于结果数组里的每个对象分别执行 消息重新发布 动作,所以将会执行重新发布动作 3 次。
配置如下:
结果:往t/a
主题发送消息后,以下主题收到消息:
第二种方式:使用DO语句
-
动作类型:消息重新发布 (republish)
-
目的主题:sensors/${idx}
-
目的 QoS:0
-
消息内容模板:${name}
-
以及如下 SQL 语句:
FOREACH payload.sensors as e DO e.name as name , e.idx as idx FROM "t/#"
配置如下:
示例 2:要求将 sensors 里的 idx
值大于或等于 1 的对象,分别作为数据输入重新发布消息到 sensors/${idx}
主题,内容为
clientid=${clientid},name=${name},date=${date}
。即最终规则将会发出 2 条消息:
- 主题:sensors/1 内容:clientid=c_steve,name=b,date=2020-04-24
- 主题:sensors/2 内容:clientid=c_steve,name=c,date=2020-04-24
要完成这个规则,我们需要配置如下动作:
- 动作类型:消息重新发布 (republish)
- 目的主题:sensors/${idx}
- 目的 QoS:0
- 消息内容模板:
clientid=${clientid},name=${name},date=${date}
以及如下 SQL 语句:
FOREACHpayload.sensors as e
DOclientid , payload.date as date,e.idx as idx ,e.name as name
INCASEe.idx >= 1
FROM "t/#"
配置如下:
最终结果:
CASE-WHEN 语法示例
CASE-WHEN语法和MySQL中的很类似,当满足某一个条件的时候,取指定的数据值,如下所示:
示例:将消息中 x 字段的值范围限定在 0~7 之间。
SELECTCASE WHEN payload.x < 0 THEN 0WHEN payload.x > 7 THEN 7ELSE payload.xEND as x
FROM "t/#"
假设消息为:
{"x": 8}
则上面的 SQL 输出为:
{"x": 7}
内置SQL函数
规则引擎提供了各种内置函数,您可以在 SQL 中使用这些函数实现基本的数据处理,包括 数学运算、数据类型判断、数据类型转换、字符串操作、映射操作、数组操作、哈希、压缩与解压缩、位操作、位序列操作、编解码 以及 日期与时间转换。
官网地址
举例说明:
FOREACHpayload.sensors as e
DO abs(-1) as abs,concat(e.name , 'xian') as address ,clientid ,e.name as name , e.idx as idx
INCASEe.idx >= 1
FROM "t/1"
向主题 t/1
发送如下消息:
{"date": "2024-07-05","sensors": [{"name": "a", "idx":0},{"name": "b", "idx":1},{"name": "c", "idx":2}]
}
观察控制台日志输出:
Webhook
Webhook简介
Webhook 提供了一种将 EMQX 客户端消息和事件集成到外部 HTTP 服务器的方法。
Webhook 是 EMQX 中开箱即用的功能。当客户端向特定主题发布消息,或执行特定操作时就会触发 Webhook,将事件数据和消息数据转发到预设的 HTTP 服务器中。
Webhook演示
具体步骤:
-
定义http的请求接口
@RestController @RequestMapping(value = "/webHook") public class WebHookController {@PostMapping(value = "/notify")public void notify(@RequestBody Map<Object , Object> body) {System.out.println(body);} }
-
在Dashboard中创建Webhook
-
通过MQTTX向
a/1
主题发布消息,观察http服务控制台输出
自此,本文分享到此结束!!!