Linux操作系统:智能工业电表开发
智能工业电表开发文档(基于Linux操作系统)
1. 项目概述
智能工业电表是一款用于实时监测工业设备电流和电压的系统。它通过嵌入式Linux设备采集数据,并将数据上传到云端服务器。用户可以通过微信小程序查询设备的实时数据和历史数据,并进行充值操作。
2. 系统架构
2.1 系统架构图
+-------------------+ +------------------+ +-------------------+
| | | | | |
| 智能工业电表 |----->| 云服务器 |----->| 微信小程序 |
| (嵌入式Linux设备)| | (Linux服务器) | | (用户交互界面) |
| | | (数据存储与处理)| | |
+-------------------+ +------------------+ +-------------------+
2.2 技术栈
-
硬件:嵌入式Linux设备(如树莓派、BeagleBone等)、电流电压传感器模块
-
通信协议:MQTT协议(设备与服务器通信)
-
后端:Python + Flask(运行在Linux服务器上)
-
数据库:MySQL(运行在Linux服务器上)
-
前端:微信小程序(基于微信开发者工具)
3. 功能模块
3.1 功能模块表
模块名称 | 功能描述 |
---|---|
数据采集 | 通过电流电压传感器实时采集工业设备的电流和电压数据 |
数据上传 | 将采集到的数据通过MQTT协议上传到云服务器 |
数据存储 | 将上传的数据存储到MySQL数据库 |
数据查询 | 用户通过微信小程序查询设备的实时数据和历史数据 |
充值功能 | 用户通过微信小程序进行电表充值,并更新余额 |
用户管理 | 管理用户账号、设备绑定和权限控制 |
4. 数据库设计
4.1 数据库表结构
4.1.1 用户表(users
)
字段名 | 数据类型 | 描述 |
---|---|---|
id | INT (Primary Key) | 用户唯一标识 |
username | VARCHAR(50) | 用户名 |
password | VARCHAR(100) | 密码(加密存储) |
devices | JSON | 绑定的设备列表 |
4.1.2 设备表(devices
)
字段名 | 数据类型 | 描述 |
---|---|---|
id | INT (Primary Key) | 设备唯一标识 |
device_id | VARCHAR(50) | 设备编号 |
owner_id | INT | 所属用户ID |
balance | FLOAT | 当前余额 |
data | JSON | 历史数据记录 |
4.1.3 数据记录表(data_records
)
字段名 | 数据类型 | 描述 |
---|---|---|
id | INT (Primary Key) | 记录唯一标识 |
device_id | VARCHAR(50) | 设备编号 |
timestamp | DATETIME | 数据采集时间 |
voltage | FLOAT | 电压值 |
current | FLOAT | 电流值 |
5. 接口设计
5.1 后端接口
5.1.1 用户登录
-
URL:
/api/login
-
方法:
POST
-
请求参数:
{ "username": "user123", "password": "password123" }
-
返回值:
{ "status": "success", "token": "JWT_TOKEN" }
5.1.2 设备数据上传
-
URL:
/api/device/data
-
方法:
POST
-
请求参数:
{ "device_id": "device123", "voltage": 220.5, "current": 10.2 }
-
返回值:
{ "status": "success" }
5.1.3 查询设备数据
-
URL:
/api/device/data/:device_id
-
方法:
GET
-
返回值:
[ { "timestamp": "2024-01-01T12:00:00Z", "voltage": 220.5, "current": 10.2 }, ... ]
5.1.4 充值接口
-
URL:
/api/device/recharge/:device_id
-
方法:
POST
-
请求参数:
{ "amount": 100 }
-
返回值:
{ "status": "success", "new_balance": 200 }
6. 示例代码
6.1 嵌入式Linux设备端代码(Python + MQTT)
import paho.mqtt.client as mqtt
import time
import random
MQTT_BROKER = "mqtt.example.com"
MQTT_PORT = 1883
DEVICE_ID = "device123"
def on_connect(client, userdata, flags, rc):
print("Connected to MQTT Broker")
def read_voltage():
return random.uniform(220.0, 230.0) # 模拟电压读数
def read_current():
return random.uniform(10.0, 15.0) # 模拟电流读数
client = mqtt.Client()
client.on_connect = on_connect
client.connect(MQTT_BROKER, MQTT_PORT)
while True:
voltage = read_voltage()
current = read_current()
data = f"{{\"device_id\": \"{DEVICE_ID}\", \"voltage\": {voltage}, \"current\": {current}}}"
client.publish("device/data", data)
print(f"Published: {data}")
time.sleep(10) # 每10秒发送一次数据
6.2 后端代码(Python + Flask + MySQL)
from flask import Flask, request, jsonify
from flask_mysqldb import MySQL
import jwt
import datetime
app = Flask(__name__)
app.config["MYSQL_HOST"] = "localhost"
app.config["MYSQL_USER"] = "root"
app.config["MYSQL_PASSWORD"] = "password"
app.config["MYSQL_DB"] = "smartmeter"
mysql = MySQL(app)
@app.route("/api/login", methods=["POST"])
def login():
data = request.json
username = data["username"]
password = data["password"]
cursor = mysql.connection.cursor()
cursor.execute("SELECT id, password FROM users WHERE username = %s", (username,))
user = cursor.fetchone()
if user and user[1] == password:
token = jwt.encode({"user_id": user[0], "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1)}, "secret_key")
return jsonify({"status": "success", "token": token})
return jsonify({"status": "error", "message": "Invalid credentials"}), 401
@app.route("/api/device/data", methods=["POST"])
def upload_data():
data = request.json
device_id = data["device_id"]
voltage = data["voltage"]
current = data["current"]
cursor = mysql.connection.cursor()
cursor.execute("INSERT INTO data_records (device_id, timestamp, voltage, current) VALUES (%s, NOW(), %s, %s)", (device_id, voltage, current))
mysql.connection.commit()
return jsonify({"status": "success"})
@app.route("/api/device/data/<device_id>", methods=["GET"])
def get_data(device_id):
cursor = mysql.connection.cursor()
cursor.execute("SELECT timestamp, voltage, current FROM data_records WHERE device_id = %s ORDER BY timestamp DESC", (device_id,))
data = cursor.fetchall()
result = [{"timestamp": row[0].isoformat(), "voltage": row[1], "current": row[2]} for row in data]
return jsonify(result)
@app.route("/api/device/recharge/<device_id>", methods=["POST"])
def recharge(device_id):
amount = request.json["amount"]
cursor = mysql.connection.cursor()
cursor.execute("UPDATE devices SET balance = balance + %s WHERE device_id = %s", (amount, device_id))
mysql.connection.commit()
cursor.execute("SELECT balance FROM devices WHERE device_id = %s", (device_id,))
new_balance = cursor.fetchone()[0]
return jsonify({"status": "success", "new_balance": new_balance})
if __name__ == "__main__":
app.run(debug=True)
6.3 微信小程序代码
// 查询设备数据
wx.request({
url: "https://api.example.com/api/device/data/device123",
method: "GET",
success(res) {
console.log(res.data); // 数据数组
}
});
// 充值
wx.request({
url: "https://api.example.com/api/device/recharge/device123",
method: "POST",
data: { amount: 100 },
success(res) {
console.log(res.data); // 新余额
}
});