当前位置: 首页 > news >正文

物联网实战:多语言(Java、Go、Rust、C++、C#、Rust)设备接入与数据处理

Spring Boot 物联网设备接入与数据处理实例

物联网(IoT)设备接入与数据处理是Spring Boot的常见应用场景之一。以下是一个完整的实例,涵盖设备接入、数据传输、数据处理和存储等关键环节。

设备接入

物联网设备通常通过MQTT、HTTP或WebSocket等协议接入系统。MQTT是物联网领域最常用的轻量级协议。

// MQTT配置类
@Configuration
public class MqttConfig {@Value("${mqtt.broker}")private String broker;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[] {broker});factory.setConnectionOptions(options);return factory;}@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}@Beanpublic MessageProducer inbound() {MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter("serverIn", mqttClientFactory(), "topic1");adapter.setOutputChannel(mqttInputChannel());return adapter;}
}
数据处理

设备数据通常以JSON格式传输,需要反序列化为Java对象进行处理。

@Service
public class DeviceDataService {@Autowiredprivate DeviceDataRepository repository;@ServiceActivator(inputChannel = "mqttInputChannel")public void handleMessage(Message<?> message) {String payload = message.getPayload().toString();DeviceData data = parseData(payload);processData(data);repository.save(data);}private DeviceData parseData(String json) {ObjectMapper mapper = new ObjectMapper();try {return mapper.readValue(json, DeviceData.class);} catch (IOException e) {throw new RuntimeException("数据解析失败", e);}}private void processData(DeviceData data) {// 数据校验、业务逻辑处理if(data.getTemperature() > 100) {alertService.sendAlert("高温警报", data.getDeviceId());}}
}
数据存储

使用Spring Data JPA持久化设备数据。

@Entity
public class DeviceData {@Id@GeneratedValue(strategy = GenerationType.IDENTITY)private Long id;private String deviceId;private double temperature;private double humidity;private LocalDateTime timestamp;// getters and setters
}public interface DeviceDataRepository extends JpaRepository<DeviceData, Long> {List<DeviceData> findByDeviceId(String deviceId);
}
API接口

提供REST API供前端或其他系统查询设备数据。

@RestController
@RequestMapping("/api/devices")
public class DeviceController {@Autowiredprivate DeviceDataService dataService;@GetMapping("/{deviceId}/data")public ResponseEntity<List<DeviceData>> getDeviceData(@PathVariable String deviceId,@RequestParam(required = false) LocalDateTime start,@RequestParam(required = false) LocalDateTime end) {List<DeviceData> data = dataService.getDataByDeviceAndTime(deviceId, start, end);return ResponseEntity.ok(data);}
}
实时监控

使用WebSocket实现设备数据的实时监控。

@Controller
public class WebSocketController {@Autowiredprivate SimpMessagingTemplate template;@Scheduled(fixedRate = 5000)public void sendDeviceUpdates() {List<DeviceStatus> statuses = deviceService.getAllDeviceStatuses();template.convertAndSend("/topic/status", statuses);}
}
安全配置

物联网系统需要严格的安全控制。

@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {@Overrideprotected void configure(HttpSecurity http) throws Exception {http.authorizeRequests().antMatchers("/api/public/**").permitAll().antMatchers("/api/devices/**").hasRole("ADMIN").anyRequest().authenticated().and().httpBasic();}
}
应用配置
# application.properties
mqtt.broker=tcp://iot.eclipse.org:1883
spring.datasource.url=jdbc:mysql://localhost:3306/iot_db
spring.datasource.username=root
spring.datasource.password=password
spring.jpa.hibernate.ddl-auto=update

以上实例展示了Spring Boot在物联网设备接入与数据处理中的典型应用,包括设备连接、数据处理、存储和展示等关键环节。实际项目中可能需要根据具体需求进行调整和扩展。

物联网设备接入与数据处理实例(Go语言实现)

以下是一个基于Go语言的物联网设备接入与数据处理的完整实例,包含设备接入、数据解析、存储及简单分析的核心流程。

设备接入(MQTT协议)

使用Eclipse Paho库实现MQTT客户端,接收设备上报的数据:

package mainimport ("fmt"mqtt "github.com/eclipse/paho.mqtt.golang""os""os/signal""syscall"
)var messageHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {fmt.Printf("Received: %s from topic: %s\n", msg.Payload(), msg.Topic())// 调用数据处理函数processDeviceData(msg.Payload())
}func main() {opts := mqtt.NewClientOptions().AddBroker("tcp://broker.emqx.io:1883")opts.SetClientID("go_iot_server")opts.SetDefaultPublishHandler(messageHandler)client := mqtt.NewClient(opts)if token := client.Connect(); token.Wait() && token.Error() != nil {panic(token.Error())}if token := client.Subscribe("iot/device/data", 1, nil); token.Wait() && token.Error() != nil {fmt.Println(token.Error())os.Exit(1)}// 保持连接sigChan := make(chan os.Signal, 1)signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)<-sigChanclient.Unsubscribe("iot/device/data")client.Disconnect(250)
}

数据处理(JSON解析)

假设设备上报的数据为JSON格式:

type DeviceData struct {DeviceID  string  `json:"device_id"`Timestamp int64   `json:"timestamp"`Temp      float32 `json:"temperature"`Humidity  float32 `json:"humidity"`
}func processDeviceData(raw []byte) {var data DeviceDataif err := json.Unmarshal(raw, &data); err != nil {fmt.Println("JSON decode error:", err)return}// 数据校验if data.Temp > 100 || data.Humidity > 100 {fmt.Println("Invalid sensor data")return}// 存入数据库saveToDB(data)
}
数据存储(SQLite)

使用go-sqlite3进行本地存储:

import _ "github.com/mattn/go-sqlite3"func saveToDB(data DeviceData) {db, err := sql.Open("sqlite3", "./iot.db")if err != nil {fmt.Println("DB open error:", err)return}defer db.Close()stmt, _ := db.Prepare(`INSERT INTO device_data (device_id, timestamp, temperature, humidity) VALUES (?, ?, ?, ?)`)_, err = stmt.Exec(data.DeviceID, data.Timestamp, data.Temp, data.Humidity)if err != nil {fmt.Println("DB insert error:", err)}
}


数据分析(简单聚合)

计算指定设备的平均温度:

func getAvgTemperature(deviceID string) float32 {db, _ := sql.Open("sqlite3", "./iot.db")defer db.Close()var avg float32row := db.QueryRow(`SELECT AVG(temperature) FROM device_data WHERE device_id = ?`, deviceID)row.Scan(&avg)return avg
}


扩展建议
  1. 协议扩展:支持CoAP、HTTP等协议接入
  2. 数据管道:结合Kafka或RabbitMQ实现高吞吐量处理
  3. 边缘计算:在网关层使用Go实现数据预处理
  4. 可视化:集成Grafana通过API展示数据

关键依赖
github.com/eclipse/paho.mqtt.golang  # MQTT客户端
github.com/mattn/go-sqlite3          # SQLite驱动
encoding/json                        # JSON处理

该实例展示了从设备接入到数据处理的完整链路,实际部署时需要根据具体场景调整错误处理、安全认证和性能优化策略。

物联网设备接入与数据处理实例(Python实现)

物联网设备接入与数据处理通常涉及设备连接、数据采集、传输、存储和分析等环节。以下是一个基于Python的完整实现示例,涵盖MQTT协议通信、数据存储和简单分析。

MQTT协议实现设备接入

安装MQTT客户端库:

pip install paho-mqtt

模拟设备发布数据:

import paho.mqtt.client as mqtt
import json
import timebroker = "mqtt.eclipseprojects.io"  # 公共测试服务器
topic = "iot/sensor/temperature"def on_connect(client, userdata, flags, rc):print(f"Connected with result code {rc}")client = mqtt.Client()
client.on_connect = on_connect
client.connect(broker, 1883, 60)while True:payload = {"device_id": "sensor_001","value": round(25 + (5 * (0.5 - random.random())), 2),"timestamp": int(time.time())}client.publish(topic, json.dumps(payload))time.sleep(5)

服务器端订阅数据:

def on_message(client, userdata, msg):data = json.loads(msg.payload.decode())print(f"Received: {data}")client = mqtt.Client()
client.connect(broker, 1883, 60)
client.subscribe("iot/sensor/#")
client.on_message = on_message
client.loop_forever()

数据存储方案

使用SQLite进行本地存储:

import sqlite3def init_db():conn = sqlite3.connect('iot_data.db')c = conn.cursor()c.execute('''CREATE TABLE IF NOT EXISTS sensor_data(id INTEGER PRIMARY KEY AUTOINCREMENT,device_id TEXT,value REAL,timestamp INTEGER)''')conn.commit()conn.close()def save_data(data):conn = sqlite3.connect('iot_data.db')c = conn.cursor()c.execute("INSERT INTO sensor_data (device_id, value, timestamp) VALUES (?, ?, ?)",(data['device_id'], data['value'], data['timestamp']))conn.commit()conn.close()

数据分析处理

使用Pandas进行数据分析:

import pandas as pd
from datetime import datetimedef analyze_data():conn = sqlite3.connect('iot_data.db')df = pd.read_sql_query("SELECT * FROM sensor_data", conn)conn.close()df['datetime'] = pd.to_datetime(df['timestamp'], unit='s')df.set_index('datetime', inplace=True)# 按小时聚合平均值hourly_avg = df.resample('H')['value'].mean()print(hourly_avg)# 异常值检测mean = df['value'].mean()std = df['value'].std()anomalies = df[(df['value'] > mean + 2*std) | (df['value'] < mean - 2*std)]print(f"Detected {len(anomalies)} anomalies")

可视化展示

使用Matplotlib进行数据可视化:

import matplotlib.pyplot as pltdef plot_data():conn = sqlite3.connect('iot_data.db')df = pd.read_sql_query("SELECT * FROM sensor_data", conn)conn.close()plt.figure(figsize=(12, 6))plt.plot(pd.to_datetime(df['timestamp'], unit='s'), df['value'])plt.title("Sensor Data Trend")plt.xlabel("Time")plt.ylabel("Temperature")plt.grid()plt.show()

完整数据处理流程整合
class Io
http://www.dtcms.com/a/264843.html

相关文章:

  • 嵌入式系统内核镜像相关(十)
  • 「日拱一码」015 机器学习常用库——scikit-learn
  • Deep semi-supervised learning for medical image segmentation: A review
  • 《解码人工智能:从理解到应用的变革之旅》
  • 当AR遇上深度学习:实时超声肾脏分割与测量技术全解析
  • Linux操作系统之文件(一):重识IO
  • FastAPI 安装使用教程
  • 通用编码器芯片 L1527产品介绍,低重码率,高安全性433解码芯片
  • 激活向量是什么
  • LCS4110R安全芯片防抄板原理
  • HTML初学者第二天
  • Spring-解决IDEA中无法创建JDK17一下的SpringBoot项目
  • 【计算机网络】补充
  • 04.Vue自定义组件制作详细指南
  • 【数据结构】排序算法:冒泡与快速
  • docker-compose编排saleor
  • 基于Apache POI实现百度POI分类快速导入PostgreSQL数据库实战
  • 1-RuoYi框架配置与启动
  • BlenderFBXExporter 导出fbx被修改问题
  • R Studio开发中记录
  • [IMX][UBoot] 08.启动流程 (4) - 平台后期初始化阶段 - board_init_r
  • 深入解析外观模式(Facade Pattern):简化复杂系统的优雅设计
  • 如何系统性评估运维自动化覆盖率:方法与关注重点
  • 拐点的可导性的图像区别
  • 回顾JAVA中的锁机制
  • 解决在Pom文件中写入依赖坐标后, 刷新Maven但是多次尝试都下载不下来
  • Maven工具学习使用(十三)——Maven Wrapper命令解析与使用
  • 告别 ifconfig:openEuler 网络配置的现代化之路
  • Linux 启动过程流程图--ARM版
  • 高速公路闲置土地资源化利用:广西浦北互通3MW分布式光伏监控实践