物联网实战:多语言(Java、Go、Rust、C++、C#、Rust)设备接入与数据处理
Spring Boot 物联网设备接入与数据处理实例
物联网(IoT)设备接入与数据处理是Spring Boot的常见应用场景之一。以下是一个完整的实例,涵盖设备接入、数据传输、数据处理和存储等关键环节。
设备接入
物联网设备通常通过MQTT、HTTP或WebSocket等协议接入系统。MQTT是物联网领域最常用的轻量级协议。
// MQTT配置类
@Configuration
public class MqttConfig {@Value("${mqtt.broker}")private String broker;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[] {broker});factory.setConnectionOptions(options);return factory;}@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}@Beanpublic MessageProducer inbound() {MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter("serverIn", mqttClientFactory(), "topic1");adapter.setOutputChannel(mqttInputChannel());return adapter;}
}
数据处理
设备数据通常以JSON格式传输,需要反序列化为Java对象进行处理。
@Service
public class DeviceDataService {@Autowiredprivate DeviceDataRepository repository;@ServiceActivator(inputChannel = "mqttInputChannel")public void handleMessage(Message<?> message) {String payload = message.getPayload().toString();DeviceData data = parseData(payload);processData(data);repository.save(data);}private DeviceData parseData(String json) {ObjectMapper mapper = new ObjectMapper();try {return mapper.readValue(json, DeviceData.class);} catch (IOException e) {throw new RuntimeException("数据解析失败", e);}}private void processData(DeviceData data) {// 数据校验、业务逻辑处理if(data.getTemperature() > 100) {alertService.sendAlert("高温警报", data.getDeviceId());}}
}
数据存储
使用Spring Data JPA持久化设备数据。
@Entity
public class DeviceData {@Id@GeneratedValue(strategy = GenerationType.IDENTITY)private Long id;private String deviceId;private double temperature;private double humidity;private LocalDateTime timestamp;// getters and setters
}public interface DeviceDataRepository extends JpaRepository<DeviceData, Long> {List<DeviceData> findByDeviceId(String deviceId);
}
API接口
提供REST API供前端或其他系统查询设备数据。
@RestController
@RequestMapping("/api/devices")
public class DeviceController {@Autowiredprivate DeviceDataService dataService;@GetMapping("/{deviceId}/data")public ResponseEntity<List<DeviceData>> getDeviceData(@PathVariable String deviceId,@RequestParam(required = false) LocalDateTime start,@RequestParam(required = false) LocalDateTime end) {List<DeviceData> data = dataService.getDataByDeviceAndTime(deviceId, start, end);return ResponseEntity.ok(data);}
}
实时监控
使用WebSocket实现设备数据的实时监控。
@Controller
public class WebSocketController {@Autowiredprivate SimpMessagingTemplate template;@Scheduled(fixedRate = 5000)public void sendDeviceUpdates() {List<DeviceStatus> statuses = deviceService.getAllDeviceStatuses();template.convertAndSend("/topic/status", statuses);}
}
安全配置
物联网系统需要严格的安全控制。
@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {@Overrideprotected void configure(HttpSecurity http) throws Exception {http.authorizeRequests().antMatchers("/api/public/**").permitAll().antMatchers("/api/devices/**").hasRole("ADMIN").anyRequest().authenticated().and().httpBasic();}
}
应用配置
# application.properties
mqtt.broker=tcp://iot.eclipse.org:1883
spring.datasource.url=jdbc:mysql://localhost:3306/iot_db
spring.datasource.username=root
spring.datasource.password=password
spring.jpa.hibernate.ddl-auto=update
以上实例展示了Spring Boot在物联网设备接入与数据处理中的典型应用,包括设备连接、数据处理、存储和展示等关键环节。实际项目中可能需要根据具体需求进行调整和扩展。
物联网设备接入与数据处理实例(Go语言实现)
以下是一个基于Go语言的物联网设备接入与数据处理的完整实例,包含设备接入、数据解析、存储及简单分析的核心流程。
设备接入(MQTT协议)
使用Eclipse Paho
库实现MQTT客户端,接收设备上报的数据:
package mainimport ("fmt"mqtt "github.com/eclipse/paho.mqtt.golang""os""os/signal""syscall"
)var messageHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {fmt.Printf("Received: %s from topic: %s\n", msg.Payload(), msg.Topic())// 调用数据处理函数processDeviceData(msg.Payload())
}func main() {opts := mqtt.NewClientOptions().AddBroker("tcp://broker.emqx.io:1883")opts.SetClientID("go_iot_server")opts.SetDefaultPublishHandler(messageHandler)client := mqtt.NewClient(opts)if token := client.Connect(); token.Wait() && token.Error() != nil {panic(token.Error())}if token := client.Subscribe("iot/device/data", 1, nil); token.Wait() && token.Error() != nil {fmt.Println(token.Error())os.Exit(1)}// 保持连接sigChan := make(chan os.Signal, 1)signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)<-sigChanclient.Unsubscribe("iot/device/data")client.Disconnect(250)
}
数据处理(JSON解析)
假设设备上报的数据为JSON格式:
type DeviceData struct {DeviceID string `json:"device_id"`Timestamp int64 `json:"timestamp"`Temp float32 `json:"temperature"`Humidity float32 `json:"humidity"`
}func processDeviceData(raw []byte) {var data DeviceDataif err := json.Unmarshal(raw, &data); err != nil {fmt.Println("JSON decode error:", err)return}// 数据校验if data.Temp > 100 || data.Humidity > 100 {fmt.Println("Invalid sensor data")return}// 存入数据库saveToDB(data)
}
数据存储(SQLite)
使用go-sqlite3
进行本地存储:
import _ "github.com/mattn/go-sqlite3"func saveToDB(data DeviceData) {db, err := sql.Open("sqlite3", "./iot.db")if err != nil {fmt.Println("DB open error:", err)return}defer db.Close()stmt, _ := db.Prepare(`INSERT INTO device_data (device_id, timestamp, temperature, humidity) VALUES (?, ?, ?, ?)`)_, err = stmt.Exec(data.DeviceID, data.Timestamp, data.Temp, data.Humidity)if err != nil {fmt.Println("DB insert error:", err)}
}
数据分析(简单聚合)
计算指定设备的平均温度:
func getAvgTemperature(deviceID string) float32 {db, _ := sql.Open("sqlite3", "./iot.db")defer db.Close()var avg float32row := db.QueryRow(`SELECT AVG(temperature) FROM device_data WHERE device_id = ?`, deviceID)row.Scan(&avg)return avg
}
扩展建议
- 协议扩展:支持CoAP、HTTP等协议接入
- 数据管道:结合Kafka或RabbitMQ实现高吞吐量处理
- 边缘计算:在网关层使用Go实现数据预处理
- 可视化:集成Grafana通过API展示数据
关键依赖
github.com/eclipse/paho.mqtt.golang # MQTT客户端
github.com/mattn/go-sqlite3 # SQLite驱动
encoding/json # JSON处理
该实例展示了从设备接入到数据处理的完整链路,实际部署时需要根据具体场景调整错误处理、安全认证和性能优化策略。
物联网设备接入与数据处理实例(Python实现)
物联网设备接入与数据处理通常涉及设备连接、数据采集、传输、存储和分析等环节。以下是一个基于Python的完整实现示例,涵盖MQTT协议通信、数据存储和简单分析。
MQTT协议实现设备接入
安装MQTT客户端库:
pip install paho-mqtt
模拟设备发布数据:
import paho.mqtt.client as mqtt
import json
import timebroker = "mqtt.eclipseprojects.io" # 公共测试服务器
topic = "iot/sensor/temperature"def on_connect(client, userdata, flags, rc):print(f"Connected with result code {rc}")client = mqtt.Client()
client.on_connect = on_connect
client.connect(broker, 1883, 60)while True:payload = {"device_id": "sensor_001","value": round(25 + (5 * (0.5 - random.random())), 2),"timestamp": int(time.time())}client.publish(topic, json.dumps(payload))time.sleep(5)
服务器端订阅数据:
def on_message(client, userdata, msg):data = json.loads(msg.payload.decode())print(f"Received: {data}")client = mqtt.Client()
client.connect(broker, 1883, 60)
client.subscribe("iot/sensor/#")
client.on_message = on_message
client.loop_forever()
数据存储方案
使用SQLite进行本地存储:
import sqlite3def init_db():conn = sqlite3.connect('iot_data.db')c = conn.cursor()c.execute('''CREATE TABLE IF NOT EXISTS sensor_data(id INTEGER PRIMARY KEY AUTOINCREMENT,device_id TEXT,value REAL,timestamp INTEGER)''')conn.commit()conn.close()def save_data(data):conn = sqlite3.connect('iot_data.db')c = conn.cursor()c.execute("INSERT INTO sensor_data (device_id, value, timestamp) VALUES (?, ?, ?)",(data['device_id'], data['value'], data['timestamp']))conn.commit()conn.close()
数据分析处理
使用Pandas进行数据分析:
import pandas as pd
from datetime import datetimedef analyze_data():conn = sqlite3.connect('iot_data.db')df = pd.read_sql_query("SELECT * FROM sensor_data", conn)conn.close()df['datetime'] = pd.to_datetime(df['timestamp'], unit='s')df.set_index('datetime', inplace=True)# 按小时聚合平均值hourly_avg = df.resample('H')['value'].mean()print(hourly_avg)# 异常值检测mean = df['value'].mean()std = df['value'].std()anomalies = df[(df['value'] > mean + 2*std) | (df['value'] < mean - 2*std)]print(f"Detected {len(anomalies)} anomalies")
可视化展示
使用Matplotlib进行数据可视化:
import matplotlib.pyplot as pltdef plot_data():conn = sqlite3.connect('iot_data.db')df = pd.read_sql_query("SELECT * FROM sensor_data", conn)conn.close()plt.figure(figsize=(12, 6))plt.plot(pd.to_datetime(df['timestamp'], unit='s'), df['value'])plt.title("Sensor Data Trend")plt.xlabel("Time")plt.ylabel("Temperature")plt.grid()plt.show()
完整数据处理流程整合
class Io