当前位置: 首页 > news >正文

实现自己的AI视频监控系统-第三章-信息的推送与共享2

文章目录

  • 前言
  • 一、Http方法:GET与POST
    • 1. GET方法
    • 2. POST方法
    • 3.案例代码
  • 二、socket自定义通信
    • 1.设计思路
    • 2.完整案例代码
  • 三、modbus通信
    • 1.Modbus协议简介
    • 2.Modbus工业设备通信模拟示例
    • 3.模拟Modbus通信案例
      • 服务器端代码 (modbus_server.py)
      • 客户端代码 (modbus_client.py)
      • 测试方法及说明
    • 注意事项
  • 总结
  • 下期预告


前言

上一小节,我们简要介绍了基本的通信手段以实现数据推送与共享的方式,本小节将基于使用场景和使用环境,实现稳定、可靠的数据传输。


一、Http方法:GET与POST

基于HTTP的GET和POST方法是推送报警数据的常用手段之一,通常用于系统与服务器之间、或不同服务之间的轻量级数据交互。二者在语义、使用方式和适用场景上具有明显区别,合理选择方法对保证数据传输的可靠性和系统的性能具有重要意义。

1. GET方法

GET方法主要用于从服务器获取资源,其设计初衷是幂等且安全的操作,即多次重复调用不应产生副作用。在报警数据推送中,GET方法可适用于以下场景:

  • 查询报警状态:定期向服务器请求当前报警状态,如 GET /alarms?status=active。

  • 获取历史报警记录:通过参数筛选历史数据,如 GET /alarms?start=2023-10-01&end=2123-10-01。

  • 轻量级触发操作:某些简单报警可通过GET请求触发(需注意安全性)。

特点:

  • 参数通过URL传递,可见性强但安全性低(需配合HTTPS加密)。

  • 长度受浏览器和服务器限制(通常不超过2048字符)。

  • 可缓存,适合重复获取相同数据的场景。

2. POST方法

POST方法主要用于向服务器提交数据,是非幂等的操作(多次调用可能产生不同结果)。在报警推送中,POST更适用于:

  • 主动推送报警信息:将报警详情(如时间、级别、描述)通过请求体发送至服务器,如 POST /alarms。

  • 批量上报数据:一次性传输多条报警记录(JSON/XML格式)。

  • 执行敏感操作:如确认报警、下发控制指令等。

特点:

  • 参数通过请求体传输,支持更大数据量(如JSON、二进制数据)。

  • 安全性更高(结合HTTPS可加密内容)。

  • 不可缓存,适合需要实时处理的场景。

3.案例代码

假设现在服务器端需要向客户端请求报警信息查询,同时客户端需要向服务器端推送相应的报警数据,我们就需要客户端与服务器端的双向通信。以下是使用Python Flask框架实现的服务器端和客户端参考代码:

  • 服务器端代码 (使用Flask)
from flask import Flask, request, jsonify
from datetime import datetime
import threading
import timeapp = Flask(__name__)# 存储接收到的报警数据
alarms = []@app.route('/alarms', methods=['GET'])
def get_alarms():"""处理GET请求:查询报警信息可选参数:- status: 过滤特定状态的报警(active/inactive)- start_time: 查询起始时间- end_time: 查询结束时间- limit: 返回结果数量限制"""# 获取查询参数status_filter = request.args.get('status')start_time = request.args.get('start_time')end_time = request.args.get('end_time')limit = int(request.args.get('limit', 10))# 过滤报警数据filtered_alarms = alarmsif status_filter:filtered_alarms = [a for a in filtered_alarms if a['status'] == status_filter]if start_time:start_dt = datetime.fromisoformat(start_time)filtered_alarms = [a for a in filtered_alarms if datetime.fromisoformat(a['timestamp']) >= start_dt]if end_time:end_dt = datetime.fromisoformat(end_time)filtered_alarms = [a for a in filtered_alarms if datetime.fromisoformat(a['timestamp']) <= end_dt]# 限制返回数量result = filtered_alarms[:limit]return jsonify({"count": len(result),"alarms": result})@app.route('/alarms', methods=['POST'])
def receive_alarm():"""处理POST请求:接收报警数据推送期望JSON格式:{"device_id": "设备标识","alarm_type": "报警类型","severity": "紧急程度","description": "报警描述","timestamp": "发生时间(ISO格式)"}"""data = request.json# 数据验证required_fields = ['device_id', 'alarm_type', 'severity', 'description', 'timestamp']if not all(field in data for field in required_fields):return jsonify({"error": "Missing required fields"}), 400# 添加状态字段和接收时间alarm_data = {**data,"status": "active",  # 默认状态为活跃"received_at": datetime.now().isoformat(),"acknowledged": False  # 是否已被确认}# 存储报警数据alarms.append(alarm_data)# 这里可以添加额外的处理逻辑,如发送邮件、短信通知等print(f"Received alarm: {alarm_data}")return jsonify({"message": "Alarm received successfully", "id": len(alarms)-1}), 201@app.route('/alarms/<int:alarm_id>', methods=['PUT'])
def update_alarm(alarm_id):"""更新报警信息(如确认报警、修改状态等)"""if alarm_id < 0 or alarm_id >= len(alarms):return jsonify({"error": "Invalid alarm ID"}), 404data = request.json# 更新允许修改的字段if 'status' in data:alarms[alarm_id]['status'] = data['status']if 'acknowledged' in data:alarms[alarm_id]['acknowledged'] = data['acknowledged']if 'notes' in data:alarms[alarm_id]['notes'] = data['notes']return jsonify({"message": "Alarm updated successfully"})def run_server():"""启动服务器"""app.run(host='0.0.0.0', port=5000, debug=True)if __name__ == '__main__':run_server()
  • 客户端代码
import requests
import json
from datetime import datetime, timedelta
import timeclass AlarmClient:def __init__(self, base_url):self.base_url = base_urldef push_alarm(self, device_id, alarm_type, severity, description):"""向服务器推送报警信息"""alarm_data = {"device_id": device_id,"alarm_type": alarm_type,"severity": severity,"description": description,"timestamp": datetime.now().isoformat()}try:response = requests.post(f"{self.base_url}/alarms",json=alarm_data,headers={'Content-Type': 'application/json'},timeout=5)if response.status_code == 201:print("Alarm pushed successfully")return response.json()else:print(f"Failed to push alarm: {response.status_code} - {response.text}")return Noneexcept requests.exceptions.RequestException as e:print(f"Error pushing alarm: {e}")# 这里可以添加重试逻辑return Nonedef query_alarms(self, status=None, start_time=None, end_time=None, limit=10):"""查询报警信息"""params = {}if status:params['status'] = statusif start_time:params['start_time'] = start_timeif end_time:params['end_time'] = end_timeif limit:params['limit'] = limittry:response = requests.get(f"{self.base_url}/alarms",params=params,timeout=5)if response.status_code == 200:return response.json()else:print(f"Failed to query alarms: {response.status_code} - {response.text}")return Noneexcept requests.exceptions.RequestException as e:print(f"Error querying alarms: {e}")return Nonedef acknowledge_alarm(self, alarm_id, notes=None):"""确认报警"""update_data = {"acknowledged": True}if notes:update_data["notes"] = notestry:response = requests.put(f"{self.base_url}/alarms/{alarm_id}",json=update_data,headers={'Content-Type': 'application/json'},timeout=5)if response.status_code == 200:print("Alarm acknowledged successfully")return response.json()else:print(f"Failed to acknowledge alarm: {response.status_code} - {response.text}")return Noneexcept requests.exceptions.RequestException as e:print(f"Error acknowledging alarm: {e}")return None# 使用示例
if __name__ == "__main__":# 初始化客户端client = AlarmClient("http://localhost:5000")# 推送报警示例client.push_alarm(device_id="sensor_001",alarm_type="overheat",severity="high",description="Temperature exceeded threshold: 85°C")# 等待片刻让服务器处理time.sleep(1)# 查询活跃报警示例end_time = datetime.now().isoformat()start_time = (datetime.now() - timedelta(hours=1)).isoformat()result = client.query_alarms(status="active",start_time=start_time,end_time=end_time,limit=5)if result:print(f"Found {result['count']} active alarms:")for alarm in result['alarms']:print(f" - {alarm['device_id']}: {alarm['description']}")# 确认第一个报警if result['count'] > 0:client.acknowledge_alarm(0, "Handled by operator")
  • 代码说明
    • 服务器端功能:

      1. GET /alarms: 查询报警信息,支持状态、时间范围和数量过滤

      2. POST /alarms: 接收报警数据推送

      3. PUT /alarms/: 更新报警状态(如确认报警)

    • 客户端功能:

      1. push_alarm(): 推送报警数据到服务器

      2. query_alarms(): 从服务器查询报警信息

      3. acknowledge_alarm(): 确认特定报警

二、socket自定义通信

基于Socket的自定义通信提供了更灵活、实时的数据传输方式,特别适合需要低延迟高并发的报警数据处理场景。

1.设计思路

我们将设计一个简单的Socket服务器和客户端,使用TCP协议实现可靠的数据传输。通信协议设计如下:

  • 数据格式:JSON格式的消息体

  • 消息结构:

    • 类型(type): 标识消息类型(alarm_push, alarm_query, alarm_ack等)

    • 数据(data): 实际的消息内容

    • 时间戳(timestamp): 消息创建时间

  • 连接管理:保持长连接,支持心跳检测

2.完整案例代码

  • 服务器端代码 (socket_server.py)
import socket
import json
import threading
import time
from datetime import datetime
from typing import Dict, Listclass AlarmSocketServer:def __init__(self, host='localhost', port=9999):self.host = hostself.port = portself.socket = Noneself.clients = {}  # 存储客户端连接self.alarms = []   # 存储报警数据self.running = Falseself.next_alarm_id = 1def start(self):"""启动服务器"""self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)self.socket.bind((self.host, self.port))self.socket.listen(5)self.running = Trueprint(f"Alarm socket server started on {self.host}:{self.port}")# 启动客户端接受线程accept_thread = threading.Thread(target=self.accept_clients)accept_thread.daemon = Trueaccept_thread.start()# 启动心跳检测线程heartbeat_thread = threading.Thread(target=self.heartbeat_check)heartbeat_thread.daemon = Trueheartbeat_thread.start()try:# 主线程保持运行while self.running:time.sleep(1)except KeyboardInterrupt:print("Shutting down server...")self.stop()def stop(self):"""停止服务器"""self.running = Falseif self.socket:self.socket.close()for client_id, client_info in self.clients.items():client_info['socket'].close()print("Server stopped")def accept_clients(self):"""接受客户端连接"""while self.running:try:client_socket, client_address = self.socket.accept()client_id = f"{client_address[0]}:{client_address[1]}"print(f"New client connected: {client_id}")# 存储客户端信息self.clients[client_id] = {'socket': client_socket,'address': client_address,'last_heartbeat': time.time()}# 为每个客户端启动处理线程client_thread = threading.Thread(target=self.handle_client, args=(client_id, client_socket))client_thread.daemon = Trueclient_thread.start()except Exception as e:if self.running:print(f"Error accepting client: {e}")def handle_client(self, client_id, client_socket):"""处理客户端消息"""buffer = ""while self.running:try:data = client_socket.recv(1024).decode('utf-8')if not data:breakbuffer += data# 处理可能的多条消息while '\n' in buffer:message, buffer = buffer.split('\n', 1)if message.strip():self.process_message(client_id, message.strip())except ConnectionResetError:breakexcept Exception as e:print(f"Error handling client {client_id}: {e}")break# 客户端断开连接if client_id in self.clients:del self.clients[client_id]client_socket.close()print(f"Client disconnected: {client_id}")def process_message(self, client_id, message_str):"""处理接收到的消息"""try:message = json.loads(message_str)msg_type = message.get('type')data = message.get('data', {})# 更新心跳时间if msg_type == 'heartbeat':self.clients[client_id]['last_heartbeat'] = time.time()returnprint(f"Received {msg_type} from {client_id}")# 处理不同类型的消息if msg_type == 'alarm_push':self.handle_alarm_push(data)elif msg_type == 'alarm_query':self.handle_alarm_query(client_id, data)elif msg_type == 'alarm_ack':self.handle_alarm_ack(client_id, data)else:print(f"Unknown message type: {msg_type}")except json.JSONDecodeError:print(f"Invalid JSON message from {client_id}: {message_str}")except Exception as e:print(f"Error processing message: {e}")def handle_alarm_push(self, data):"""处理报警推送"""# 验证必要字段required_fields = ['device_id', 'alarm_type', 'severity', 'description']if not all(field in data for field in required_fields):return# 创建报警记录alarm = {'id': self.next_alarm_id,'device_id': data['device_id'],'alarm_type': data['alarm_type'],'severity': data['severity'],'description': data['description'],'timestamp': datetime.now().isoformat(),'status': 'active','acknowledged': False}self.next_alarm_id += 1self.alarms.append(alarm)print(f"New alarm received: {alarm['device_id']} - {alarm['description']}")# 广播给所有客户端self.broadcast_message({'type': 'alarm_update','data': alarm,'timestamp': datetime.now().isoformat()})def handle_alarm_query(self, client_id, data):"""处理报警查询"""# 过滤条件status_filter = data.get('status')device_filter = data.get('device_id')severity_filter = data.get('severity')limit = data.get('limit', 10)# 过滤报警filtered_alarms = self.alarmsif status_filter:filtered_alarms = [a for a in filtered_alarms if a['status'] == status_filter]if device_filter:filtered_alarms = [a for a in filtered_alarms if a['device_id'] == device_filter]if severity_filter:filtered_alarms = [a for a in filtered_alarms if a['severity'] == severity_filter]# 按时间倒序filtered_alarms.sort(key=lambda x: x['timestamp'], reverse=True)# 限制数量result = filtered_alarms[:limit]# 发送响应self.send_message(client_id, {'type': 'alarm_query_response','data': {'count': len(result),'total': len(filtered_alarms),'alarms': result},'timestamp': datetime.now().isoformat()})def handle_alarm_ack(self, client_id, data):"""处理报警确认"""alarm_id = data.get('alarm_id')notes = data.get('notes')# 查找报警alarm = next((a for a in self.alarms if a['id'] == alarm_id), None)if not alarm:self.send_message(client_id, {'type': 'error','data': {'message': f'Alarm {alarm_id} not found'},'timestamp': datetime.now().isoformat()})return# 更新报警状态alarm['acknowledged'] = Truealarm['status'] = 'inactive'if notes:alarm['notes'] = notesprint(f"Alarm {alarm_id} acknowledged by {client_id}")# 广播更新self.broadcast_message({'type': 'alarm_update','data': alarm,'timestamp': datetime.now().isoformat()})# 发送确认响应self.send_message(client_id, {'type': 'alarm_ack_response','data': {'success': True, 'alarm_id': alarm_id},'timestamp': datetime.now().isoformat()})def send_message(self, client_id, message):"""向指定客户端发送消息"""if client_id not in self.clients:returntry:message_str = json.dumps(message) + '\n'self.clients[client_id]['socket'].sendall(message_str.encode('utf-8'))except Exception as e:print(f"Error sending message to {client_id}: {e}")def broadcast_message(self, message):"""向所有客户端广播消息"""message_str = json.dumps(message) + '\n'disconnected_clients = []for client_id, client_info in self.clients.items():try:client_info['socket'].sendall(message_str.encode('utf-8'))except Exception as e:print(f"Error broadcasting to {client_id}: {e}")disconnected_clients.append(client_id)# 移除断开连接的客户端for client_id in disconnected_clients:if client_id in self.clients:del self.clients[client_id]def heartbeat_check(self):"""心跳检测,移除不活跃的客户端"""while self.running:time.sleep(30)  # 每30秒检查一次current_time = time.time()disconnected_clients = []for client_id, client_info in self.clients.items():if current_time - client_info['last_heartbeat'] > 60:  # 60秒无心跳print(f"Client {client_id} timeout, disconnecting")disconnected_clients.append(client_id)# 移除不活跃的客户端for client_id in disconnected_clients:if client_id in self.clients:self.clients[client_id]['socket'].close()del self.clients[client_id]if __name__ == '__main__':server = AlarmSocketServer('localhost', 9999)server.start()
  • 客户端代码 (socket_client.py)
import socket
import json
import threading
import time
from datetime import datetimeclass AlarmSocketClient:def __init__(self, host='localhost', port=9999):self.host = hostself.port = portself.socket = Noneself.connected = Falseself.callbacks = {}def connect(self):"""连接到服务器"""try:self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.socket.connect((self.host, self.port))self.connected = Trueprint(f"Connected to server {self.host}:{self.port}")# 启动接收线程receive_thread = threading.Thread(target=self.receive_messages)receive_thread.daemon = Truereceive_thread.start()# 启动心跳线程heartbeat_thread = threading.Thread(target=self.send_heartbeat)heartbeat_thread.daemon = Trueheartbeat_thread.start()return Trueexcept Exception as e:print(f"Failed to connect: {e}")return Falsedef disconnect(self):"""断开连接"""self.connected = Falseif self.socket:self.socket.close()print("Disconnected from server")def send_message(self, message):"""发送消息到服务器"""if not self.connected:print("Not connected to server")return Falsetry:message_str = json.dumps(message) + '\n'self.socket.sendall(message_str.encode('utf-8'))return Trueexcept Exception as e:print(f"Error sending message: {e}")self.disconnect()return Falsedef receive_messages(self):"""接收服务器消息"""buffer = ""while self.connected:try:data = self.socket.recv(1024).decode('utf-8')if not data:breakbuffer += data# 处理可能的多条消息while '\n' in buffer:message, buffer = buffer.split('\n', 1)if message.strip():self.process_message(message.strip())except ConnectionResetError:breakexcept Exception as e:print(f"Error receiving message: {e}")breakself.disconnect()def process_message(self, message_str):"""处理接收到的消息"""try:message = json.loads(message_str)msg_type = message.get('type')data = message.get('data', {})print(f"Received message: {msg_type}")# 调用注册的回调函数if msg_type in self.callbacks:self.callbacks[msg_type](data)else:print(f"No handler for message type: {msg_type}")except json.JSONDecodeError:print(f"Invalid JSON message: {message_str}")def register_callback(self, msg_type, callback):"""注册消息回调函数"""self.callbacks[msg_type] = callbackdef send_heartbeat(self):"""发送心跳包"""while self.connected:time.sleep(15)  # 每15秒发送一次self.send_message({'type': 'heartbeat','timestamp': datetime.now().isoformat()})def push_alarm(self, device_id, alarm_type, severity, description):"""推送报警信息"""return self.send_message({'type': 'alarm_push','data': {'device_id': device_id,'alarm_type': alarm_type,'severity': severity,'description': description},'timestamp': datetime.now().isoformat()})def query_alarms(self, status=None, device_id=None, severity=None, limit=10):"""查询报警信息"""query_data = {'limit': limit}if status:query_data['status'] = statusif device_id:query_data['device_id'] = device_idif severity:query_data['severity'] = severityreturn self.send_message({'type': 'alarm_query','data': query_data,'timestamp': datetime.now().isoformat()})def acknowledge_alarm(self, alarm_id, notes=None):"""确认报警"""ack_data = {'alarm_id': alarm_id}if notes:ack_data['notes'] = notesreturn self.send_message({'type': 'alarm_ack','data': ack_data,'timestamp': datetime.now().isoformat()})# 使用示例
if __name__ == '__main__':client = AlarmSocketClient('localhost', 9999)# 注册消息处理回调def handle_alarm_update(data):print(f"Alarm update: {data['device_id']} - {data['description']}")def handle_alarm_query_response(data):print(f"Query response: {data['count']} alarms found")for alarm in data['alarms']:print(f"  - {alarm['device_id']}: {alarm['description']}")def handle_error(data):print(f"Error: {data['message']}")client.register_callback('alarm_update', handle_alarm_update)client.register_callback('alarm_query_response', handle_alarm_query_response)client.register_callback('error', handle_error)# 连接到服务器if client.connect():try:# 推送测试报警client.push_alarm(device_id="sensor_001",alarm_type="temperature",severity="high",description="Temperature exceeded threshold: 85°C")# 等待片刻time.sleep(1)# 查询报警client.query_alarms(status="active", limit=5)# 保持连接,等待服务器推送print("Waiting for messages (press Ctrl+C to exit)...")while True:time.sleep(1)except KeyboardInterrupt:print("Exiting...")finally:client.disconnect()

三、modbus通信

不论是基于HTTP的POST、GET请求还是基于Socket的实时通信,在工业设备端使用时都存在很大的弊端

  1. 协议开销大:HTTP头部信息冗余,不适合资源受限的嵌入式设备

  2. 实时性不足:HTTP请求-响应模式无法满足工业控制对实时性的要求

  3. 兼容性问题:许多传统工业设备只支持Modbus等专用工业协议

  4. 电力消耗:复杂的协议栈会增加设备功耗

  5. 可靠性问题:工业环境需要更强的抗干扰能力和错误检测机制

针对以上面临的弊端,工业设备有一套属于自己的专属通信方式modbus

1.Modbus协议简介

Modbus是一种串行通信协议,由Modicon公司(现施耐德电气)于1979年开发,用于PLC(可编程逻辑控制器)之间的通信。它已成为工业领域最常用的通信协议之一,具有简单、开放、易于实施的特点。

  • Modbus通信模式
    • Modbus RTU - 二进制格式,使用串行通信(RS-232/RS-485)

    • Modbus ASCII - ASCII字符格式,使用串行通信

    • Modbus TCP - 基于以太网TCP/IP协议

2.Modbus工业设备通信模拟示例

由于大部分学习者对这块不是特别了解,这里就给了一个模拟界面的示例,以下是网页端模拟的工业通信面板示例,复制代码创建一个.html结尾的文件,将代码复制进去,双击运行即可。

<!DOCTYPE html>
<html lang="zh-CN">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>Modbus通信模拟系统</title><style>* {margin: 0;padding: 0;box-sizing: border-box;font-family: 'Segoe UI', Arial, sans-serif;}body {background: linear-gradient(135deg, #1a2a6c, #2a4b8c);color: #333;min-height: 100vh;padding: 20px;}.container {max-width: 1200px;margin: 0 auto;}header {text-align: center;padding: 20px 0;margin-bottom: 30px;color: white;text-shadow: 0 2px 4px rgba(0, 0, 0, 0.3);}h1 {font-size: 2.5rem;margin-bottom: 10px;}.subtitle {font-size: 1.2rem;opacity: 0.9;}.dashboard {display: grid;grid-template-columns: 1fr 1fr;gap: 20px;margin-bottom: 30px;}.panel {background: white;border-radius: 10px;box-shadow: 0 5px 15px rgba(0, 0, 0, 0.2);padding: 20px;overflow: hidden;}.panel-title {font-size: 1.5rem;color: #2a4b8c;margin-bottom: 20px;padding-bottom: 10px;border-bottom: 2px solid #eaeaea;}.control-group {margin-bottom: 20px;}.control-label {display: block;margin-bottom: 8px;font-weight: 600;color: #555;}.slider-container {display: flex;align-items: center;}.slider {flex: 1;height: 10px;-webkit-appearance: none;appearance: none;background: #dde6f0;outline: none;border-radius: 5px;}.slider::-webkit-slider-thumb {-webkit-appearance: none;appearance: none;width: 20px;height: 20px;border-radius: 50%;background: #2a4b8c;cursor: pointer;}.value-display {width: 60px;text-align: center;font-weight: bold;color: #2a4b8c;margin-left: 15px;}.status-indicator {display: inline-block;width: 15px;height: 15px;border-radius: 50%;margin-right: 10px;}.status-on {background: #4CAF50;box-shadow: 0 0 8px #4CAF50;}.status-off {background: #f44336;}.button {padding: 10px 20px;border: none;border-radius: 5px;background: #2a4b8c;color: white;font-weight: bold;cursor: pointer;transition: background 0.3s;}.button:hover {background: #3a6bc8;}.button:active {transform: translateY(1px);}.data-table {width: 100%;border-collapse: collapse;margin-top: 15px;}.data-table th, .data-table td {padding: 12px 15px;text-align: left;border-bottom: 1px solid #eaeaea;}.data-table th {background: #f5f8fc;font-weight: 600;color: #2a4b8c;}.data-table tr:hover {background: #f9fbfd;}.communication-log {background: #1e1e1e;color: #00ff00;font-family: monospace;padding: 15px;border-radius: 5px;height: 200px;overflow-y: auto;margin-top: 20px;}.log-entry {margin-bottom: 8px;line-height: 1.4;}.log-time {color: #888;}.log-master {color: #4fa6ff;}.log-slave {color: #ffa64f;}.connection-status {display: flex;align-items: center;margin-bottom: 20px;}.connection-dot {width: 12px;height: 12px;border-radius: 50%;margin-right: 10px;}.connected {background: #4CAF50;box-shadow: 0 0 8px #4CAF50;}.disconnected {background: #f44336;}@media (max-width: 768px) {.dashboard {grid-template-columns: 1fr;}}</style>
</head>
<body><div class="container"><header><h1>Modbus通信模拟系统</h1><div class="subtitle">工业设备通信与控制模拟界面</div></header><div class="dashboard"><div class="panel"><h2 class="panel-title">设备控制面板</h2><div class="connection-status"><div class="connection-dot connected"></div><span>已连接到设备 (Modbus RTU - 从站地址: 1)</span></div><div class="control-group"><label class="control-label">温度设定值 (°C)</label><div class="slider-container"><input type="range" min="20" max="100" value="45" class="slider" id="tempSlider"><span class="value-display" id="tempValue">45</span></div></div><div class="control-group"><label class="control-label">压力设定值 (kPa)</label><div class="slider-container"><input type="range" min="0" max="1000" value="350" class="slider" id="pressureSlider"><span class="value-display" id="pressureValue">350</span></div></div><div class="control-group"><label class="control-label">流量设定值 (L/min)</label><div class="slider-container"><input type="range" min="0" max="500" value="120" class="slider" id="flowSlider"><span class="value-display" id="flowValue">120</span></div></div><div class="control-group"><label class="control-label">设备状态</label><div><button class="button" id="powerButton">启动设备</button><button class="button" id="resetButton">复位报警</button></div></div></div><div class="panel"><h2 class="panel-title">设备状态监测</h2><table class="data-table"><thead><tr><th>参数</th><th></th><th>状态</th></tr></thead><tbody><tr><td>实际温度</td><td id="actualTemp">45.2 °C</td><td><span class="status-indicator status-on"></span>正常</td></tr><tr><td>实际压力</td><td id="actualPressure">348.7 kPa</td><td><span class="status-indicator status-on"></span>正常</td></tr><tr><td>实际流量</td><td id="actualFlow">118.3 L/min</td><td><span class="status-indicator status-on"></span>正常</td></tr><tr><td>设备运行状态</td><td id="deviceStatus">运行中</td><td><span class="status-indicator status-on"></span>正常</td></tr><tr><td>报警状态</td><td id="alarmStatus">无报警</td><td><span class="status-indicator status-on"></span>正常</td></tr></tbody></table><h3 class="panel-title" style="margin-top: 25px;">Modbus通信日志</h3><div class="communication-log" id="commLog"><div class="log-entry"><span class="log-time">10:23:45</span> <span class="log-master">主站</span>: 读取保持寄存器(4x) 40001-40005</div><div class="log-entry"><span class="log-time">10:23:45</span> <span class="log-slave">从站1</span>: 响应 5个寄存器值 [45.2, 348.7, 118.3, 1, 0]</div><div class="log-entry"><span class="log-time">10:23:47</span> <span class="log-master">主站</span>: 写入单个寄存器(4x) 40001 值: 45.0</div><div class="log-entry"><span class="log-time">10:23:47</span> <span class="log-slave">从站1</span>: 响应 写入成功</div></div></div></div></div><script>// 更新滑块显示值document.getElementById('tempSlider').addEventListener('input', function() {document.getElementById('tempValue').textContent = this.value;addLogEntry("主站", `写入单个寄存器(4x) 40001 值: ${this.value}.0`);simulateResponse(`从站1`, "响应 写入成功");});document.getElementById('pressureSlider').addEventListener('input', function() {document.getElementById('pressureValue').textContent = this.value;addLogEntry("主站", `写入单个寄存器(4x) 40002 值: ${this.value}.0`);simulateResponse(`从站1`, "响应 写入成功");});document.getElementById('flowSlider').addEventListener('input', function() {document.getElementById('flowValue').textContent = this.value;addLogEntry("主站", `写入单个寄存器(4x) 40003 值: ${this.value}.0`);simulateResponse(`从站1`, "响应 写入成功");});// 设备控制按钮let devicePoweredOn = true;document.getElementById('powerButton').addEventListener('click', function() {devicePoweredOn = !devicePoweredOn;if (devicePoweredOn) {this.textContent = "停止设备";document.getElementById('deviceStatus').textContent = "运行中";addLogEntry("主站", "写线圈(0x) 00000 值: 1 (启动设备)");simulateResponse(`从站1`, "响应 写入成功");} else {this.textContent = "启动设备";document.getElementById('deviceStatus').textContent = "已停止";addLogEntry("主站", "写线圈(0x) 00000 值: 0 (停止设备)");simulateResponse(`从站1`, "响应 写入成功");}});document.getElementById('resetButton').addEventListener('click', function() {document.getElementById('alarmStatus').textContent = "无报警";addLogEntry("主站", "写线圈(0x) 00001 值: 1 (复位报警)");simulateResponse(`从站1`, "响应 写入成功");});// 添加日志条目function addLogEntry(source, message) {const logElement = document.getElementById('commLog');const time = new Date().toLocaleTimeString();const entry = document.createElement('div');entry.className = 'log-entry';entry.innerHTML = `<span class="log-time">${time}</span> <span class="log-${source === '主站' ? 'master' : 'slave'}">${source}</span>: ${message}`;logElement.appendChild(entry);logElement.scrollTop = logElement.scrollHeight;}// 模拟从站响应function simulateResponse(source, message) {setTimeout(() => {addLogEntry(source, message);}, 300);}// 模拟定期数据读取setInterval(() => {if (devicePoweredOn) {// 更新实际值,在设定值附近随机波动const temp = (parseFloat(document.getElementById('tempSlider').value) + (Math.random() * 2 - 1)).toFixed(1);const pressure = (parseFloat(document.getElementById('pressureSlider').value) + (Math.random() * 5 - 2.5)).toFixed(1);const flow = (parseFloat(document.getElementById('flowSlider').value) + (Math.random() * 4 - 2)).toFixed(1);document.getElementById('actualTemp').textContent = temp + " °C";document.getElementById('actualPressure').textContent = pressure + " kPa";document.getElementById('actualFlow').textContent = flow + " L/min";addLogEntry("主站", "读取保持寄存器(4x) 40001-40005");simulateResponse("从站1", `响应 5个寄存器值 [${temp}, ${pressure}, ${flow}, 1, 0]`);}}, 5000);</script>
</body>
</html>
  • 示例界面
    在这里插入图片描述

3.模拟Modbus通信案例

由于实际Modbus设备不常见且设备需要接线并配合专属模块,所以我们将模拟一个完整的Modbus TCP通信场景,包括服务器(从设备)和客户端(主设备)。

模拟场景描述

我们将模拟一个工业环境中的温度监控系统:

  • 1个Modbus服务器模拟温度控制器

  • 3个模拟温度传感器(地址0x0001-0x0003)

  • 2个模拟继电器输出(地址0x0004-0x0005)

  • 1个模拟报警状态寄存器(地址0x0006)

开始之前请安装指定版本的pymodbus

pip install pymodbus==3.6.5

服务器端代码 (modbus_server.py)

from pymodbus.server import StartTcpServer
from pymodbus.datastore import ModbusSequentialDataBlock
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
from pymodbus.device import ModbusDeviceIdentification
import random
import timedef run_server():# 初始化数据存储# 线圈: 2个继电器 (地址 0x0004-0x0005)coils = ModbusSequentialDataBlock(0, [False] * 6)  # 创建6个线圈,从地址0开始# 保持寄存器: 3个温度传感器 (地址 0x0001-0x0003) 和1个报警状态 (地址 0x0006)hr_data = [0] * 7  # 创建7个寄存器,从地址0开始# 初始化温度值 (20-30度)hr_data[0] = 220  # 地址0x0001: 22.0°Chr_data[1] = 245  # 地址0x0002: 24.5°Chr_data[2] = 190  # 地址0x0003: 19.0°Cholding_registers = ModbusSequentialDataBlock(0, hr_data)# 创建从设备上下文slave_context = ModbusSlaveContext(co=coils,hr=holding_registers)# 创建服务器上下文context = ModbusServerContext(slaves=slave_context, single=True)# 设置设备标识信息identity = ModbusDeviceIdentification()identity.VendorName = '模拟温度控制器'identity.ProductCode = 'TC-100'identity.VendorUrl = 'http://example.com'identity.ProductName = '温度监控系统'identity.ModelName = 'Modbus TCP模拟器'identity.MajorMinorRevision = '1.0.0'# 启动TCP服务器print("启动Modbus TCP服务器在 localhost:5020")StartTcpServer(context=context, identity=identity, address=("localhost", 5020))if __name__ == "__main__":run_server()

客户端代码 (modbus_client.py)

from pymodbus.client import ModbusTcpClient
import timedef read_temperature_sensors():"""读取温度传感器数据"""client = ModbusTcpClient('localhost', port=5020)try:client.connect()# 读取保持寄存器 (温度传感器和报警状态)# 地址 0x0001-0x0003: 温度传感器 (对应寄存器地址 0-2)# 地址 0x0006: 报警状态 (对应寄存器地址 5)result = client.read_holding_registers(address=0, count=6, slave=1)if not result.isError():print("=== 温度传感器读数 ===")temperatures = [value / 10.0 for value in result.registers[:3]]alarm_status = result.registers[5]for i, temp in enumerate(temperatures, 1):print(f"传感器 {i}: {temp}°C")print(f"报警状态: {'正常' if alarm_status == 0 else '警告'}")else:print("读取寄存器错误:", result)except Exception as e:print("连接错误:", e)finally:client.close()def read_relay_status():"""读取继电器状态"""client = ModbusTcpClient('localhost', port=5020)try:client.connect()# 读取线圈 (继电器状态)# 地址 0x0004-0x0005: 继电器 (对应线圈地址 3-4)result = client.read_coils(address=3, count=2, slave=1)if not result.isError():print("\n=== 继电器状态 ===")for i, status in enumerate(result.bits, 1):print(f"继电器 {i}: {'开启' if status else '关闭'}")else:print("读取线圈错误:", result)except Exception as e:print("连接错误:", e)finally:client.close()def toggle_relay(relay_index, state):"""切换继电器状态"""client = ModbusTcpClient('localhost', port=5020)try:client.connect()# 写入线圈 (控制继电器)# 继电器1: 地址 3, 继电器2: 地址 4address = 3 + (relay_index - 1)result = client.write_coil(address=address, value=state, slave=1)if not result.isError():print(f"\n继电器 {relay_index}{'开启' if state else '关闭'}")else:print("写入线圈错误:", result)except Exception as e:print("连接错误:", e)finally:client.close()if __name__ == "__main__":# 读取当前状态read_temperature_sensors()read_relay_status()# 切换第一个继电器状态toggle_relay(1, True)# 再次读取状态以确认更改time.sleep(0.5)read_relay_status()

测试方法及说明

  1. 使用方法
    • 首先启动Modbus服务器:

      python modbus_server.py
      
    • 然后运行客户端:

      python modbus_client.py
      
    • 客户端将连接到服务器并执行一系列Modbus操作:

      • 读取温度传感器值

      • 读取和控制继电器状态

  • 运行示例
=== 温度传感器读数 ===
传感器 1: 24.5°C
传感器 2: 19.0°C
传感器 3: 0.0°C
报警状态: 正常=== 继电器状态 ===
继电器 1: 关闭
继电器 2: 关闭
继电器 3: 关闭
继电器 4: 关闭
继电器 5: 关闭
继电器 6: 关闭
继电器 7: 关闭
继电器 8: 关闭继电器 1 已开启=== 继电器状态 ===
继电器 1: 开启
继电器 2: 关闭
继电器 3: 关闭
继电器 4: 关闭
继电器 5: 关闭
继电器 6: 关闭
继电器 7: 关闭
继电器 8: 关闭
  1. Modbus通信优势

    • 简单高效:协议简单,开销小,适合资源受限的设备

    • 实时性强:支持快速响应,适合工业控制场景

    • 广泛兼容:几乎所有工业设备都支持Modbus协议

    • 可靠性高:内置错误检测机制,CRC校验保证数据完整性

    • 灵活部署:支持多种物理层(RS-485, TCP/IP等)

注意事项

  • 对于专门的modbus设备,python等语言有pymodbus的高级接口,本案例为模拟案例,仅供参考和学习,对于深入的串口通信(如modbus RTU),可以采用pyserial等库实现实现基本的通信传输。

总结

本小节以代码的形式展示常见的信息推送和共享机制,涉及服务器端、系统内部、工业设备间的通信机制和基本实现,是AI视频监控系统接入生态圈必不可少的一部分。

下期预告

  • RTSP推流与展示
http://www.dtcms.com/a/361691.html

相关文章:

  • 刘洋洋《一笔相思绘红妆》上线,献给当代痴心人的一封情书
  • 互斥量(Mutex,全称 Mutual Exclusion)用于保证同一时间只有一个线程(或进程)访问共享资源,从而避免并发操作导致的数据不一致问题
  • RAG-文本到SQL
  • SOME/IP-SD中IPv4端点选项与IPv4 SD端点选项
  • 突破超强回归模型,高斯过程回归!
  • 使用 BayesFlow 神经网络简化贝叶斯推断的案例分享(二)
  • 无重复字符的最长子串,leetCode热题100,C++实现
  • 【FireCrawl】:本地部署AI爬虫+DIFY集成+V2新特性
  • FFmpeg 不同编码的压缩命令详解
  • 速卖通自养号测评系统开发指南:环境隔离与行为模拟实战
  • 测试-用例篇
  • FFMPEG AAC
  • 【LeetCode每日一题】19. 删除链表的倒数第 N 个结点 24. 两两交换链表中的节点
  • Java内存模型下的高性能锁优化与无锁编程实践指南
  • 几种特殊的数字滤波器---原理及设计
  • 【零碎小知识点 】(四) Java多线程编程深入与实践
  • MongoDB主从切换实战:如何让指定从库“精准”升级为主库?保姆级教程!
  • 36. Ansible变量+管理机密
  • 【Android】使用Handler做多个线程之间的通信
  • Java面试宝典:Redis高并发高可用(集群)
  • 函数,数组与正则表达式
  • Kafka 架构原理
  • 销售事业十年规划,并附上一套能帮助销售成长的「软件工具组合」
  • 【git 基础】detached HEAD state的出现和解决
  • C++11模板优化大揭秘:让你的代码更简洁、更安全、更高效
  • javaScript变量命名规则
  • 【汇客项目】:在启动过程中报错 本来安装的是node-sass 被卸载后安装的sass ,代码中一部分出现问题
  • 【深度学习基础】深度学习中的数据增强技术:从理论到实践的解析
  • 【ARMv7】开篇:掌握ARMv7架构Soc开发技能
  • Deepoc具身智能运动控制板:赋能机器感知与决策