当前位置: 首页 > news >正文

Python+ZeroMQ实战:智能车辆状态监控与模拟模式自动切换

目录

关键点

技术实现1

技术实现2


摘要: 本文将介绍如何利用Python和ZeroMQ消息队列构建一个智能车辆状态监控系统。系统能够根据时间策略自动切换驾驶模式(自动驾驶、人工驾驶、远程驾驶、主动安全),并通过实时消息推送更新车辆状态信息。文章包含完整代码实现和ZeroMQ应用技巧。

关键点

# 深拷贝避免数据污染 避免直接修改原始配置文件
led_info = json.loads(json.dumps(original_data))# 同步更新两个数据源
led_info['data']['driveMode'] = drive_control
vehicle_status['data']['driveMode'] = drive_control

技术实现1

#!/usr/bin/env python
import sys
import time
import zmq
import json
import random
from zmq import Pollerdef main() -> None:  #> None 表示此函数不返回任何值global drive_control   #声明 drive_control 为全局变量,这样可以在函数内部和外部访问它drive_control = 1if len(sys.argv) != 2:print('Usage: publisher <bind-to>')sys.exit(1)
#tcp://192.168.1.136:5582  发布bind_to = sys.argv[1]# 加载初始数据with open('data.json', 'r', encoding='utf-8') as f:data = json.load(f)['filter_auto_drive_info']#使用 json.load() 方法读取 JSON 数据,将其中的 filter_auto_drive_info 部分加载到 data 变量中# 分离LED和车辆状态数据 从 data 中提取 id 为 'led_info' 的项led_info = next(item for item in data if item['id'] == 'led_info')vehicle_status = next(item for item in data if item['id'] == 'vehicle_status_upload')# ZeroMQ上下文ctx = zmq.Context()# 发布套接字pub_socket = ctx.socket(zmq.PUB)pub_socket.connect(bind_to)# 订阅套接字sub_socket = ctx.socket(zmq.SUB)sub_socket.connect("tcp://192.168.1.144:5581")  # 根据实际情况调整 订阅sub_socket.setsockopt_string(zmq.SUBSCRIBE, 'loudspeaker_end')#设置订阅过滤器,只订阅以 'loudspeaker_end' 为主题的消息#创建一个 Poller 对象,用于检测多个套接字的可用性#将订阅套接字注册到 poller,表示要监听 sub_socket 是否有消息可读poller = Poller()poller.register(sub_socket, zmq.POLLIN)start_time = time.time()received_end = Falsetry:while True:current_time = time.time()elapsed = current_time - start_time# 确定当前状态if not received_end:if elapsed < 5:   #5s内  自动驾驶,亮绿灯status = 1elif elapsed < 10:  # 全灭 人工驾驶status = 2drive_control = 0elif elapsed < 15:  # 远程驾驶,亮黄灯status = 0drive_control = 2else:drive_control = 4  # 主动安全,亮红灯else:status = random.randint(1, 7)# 更新LED状态led_info['data']['loudspeakerStatus'] = statusled_info['data']['driveMode'] = drive_control# 发送LED信息并打印#使用 json.dumps() 将 led_info 转换为 JSON 字符串led_data = json.dumps(led_info)send_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(current_time))print(f"Sending LED data at {send_time}: {led_data}")pub_socket.send_multipart([b'filter_auto_drive_info',led_data.encode('utf-8')])time.sleep(1)  #0.1s,也即每100ms发送1次# 发送车辆状态并打印#将 vehicle_status 转换为 JSON 字符串并发送,每 100 毫秒发送一次vehicle_data = json.dumps(vehicle_status)print(f"Sending Vehicle data at {send_time}: {vehicle_data}")pub_socket.send_multipart([b'filter_auto_drive_info',vehicle_data.encode('utf-8')])time.sleep(1)# 检查订阅消息 使用 poller.poll(0) 检查是否有新消息socks = dict(poller.poll(0))if sub_socket in socks:#如果有消息,读取并处理它,检查是否是 'loudspeaker_end' 主题的消息。如果是,则打印终止信号并设置 received_end = Truewhile True:try:msg = sub_socket.recv_multipart(zmq.NOBLOCK)message = json.loads(msg[1].decode('utf-8'))# 打印订阅到的消息print(f"Received message on topic '{msg[0].decode()}': {json.dumps(message, indent=2)}")# 处理loudspeaker_end的消息if message.get('id') == 'loudspeaker_end':speaker_id = message.get('data', {}).get('speakerId')print(f"Received termination signal, speakerId: {speaker_id} (This indicates that the sound has finished playing).")received_end = Trueexcept zmq.Again:break#捕获 KeyboardInterrupt(例如用户按 Ctrl+C),打印消息并退出程序except KeyboardInterrupt:print("\nShutting down...")finally:pub_socket.close()sub_socket.close()ctx.term()if __name__ == "__main__":main()"""
try 块用来包裹可能抛出异常的代码。
except 可以用来捕获并处理特定的异常。
finally 块中的代码无论如何都会被执行,通常用于清理操作。
finally 语句确保了无论 try 代码块是否有异常抛出,都会执行其中的代码,因此它非常适合资源管理(如文件、数据库连接的关闭等)。
"""

技术实现2

#!/usr/bin/env python
import sys
import time
import zmq
import json
import random
from zmq import Pollerdef main() -> None:global drive_controldrive_control = 1if len(sys.argv) != 2:print('Usage: publisher <bind-to>')sys.exit(1)bind_to = sys.argv[1]# 加载初始数据with open('data.json', 'r', encoding='utf-8') as f:full_data = json.load(f)# 提取原始数据结构data = full_data['filter_auto_drive_info']# 创建深拷贝避免修改原始数据led_info = json.loads(json.dumps(next(item for item in data if item['id'] == 'led_info')))vehicle_status = json.loads(json.dumps(next(item for item in data if item['id'] == 'vehicle_status_upload')))# ZeroMQ上下文ctx = zmq.Context()# 发布套接字pub_socket = ctx.socket(zmq.PUB)pub_socket.connect(bind_to)# 订阅套接字sub_socket = ctx.socket(zmq.SUB)sub_socket.connect("tcp://192.168.1.144:5581")sub_socket.setsockopt_string(zmq.SUBSCRIBE, 'loudspeaker_end')poller = Poller()poller.register(sub_socket, zmq.POLLIN)start_time = time.time()received_end = Falsemode_change_time = time.time()  # 记录模式切换时间try:while True:current_time = time.time()elapsed = current_time - start_timemode_elapsed = current_time - mode_change_time# 状态机逻辑if not received_end:if mode_elapsed < 5:   # 自动驾驶(绿灯)status = 1drive_control = 1elif mode_elapsed < 10:  # 人工驾驶(全灭)status = 2drive_control = 0elif mode_elapsed < 15:  # 远程驾驶(黄灯)status = 0drive_control = 2else:  # 主动安全(红灯)drive_control = 4status = 3# 重置计时器进入循环mode_change_time = time.time()else:# 接收到结束信号后保持主动安全模式drive_control = 4status = random.randint(1, 7)  # 随机状态演示# 更新数据结构led_info['data']['driveMode'] = drive_controlled_info['data']['loudspeakerStatus'] = status#vehicle_status['data']['driveMode'] = drive_control# 发送LED信息led_data = json.dumps(led_info)send_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(current_time))print(f"[{send_time}] LED状态 | 模式: {drive_control} | 状态: {status}")pub_socket.send_multipart([b'filter_auto_drive_info',led_data.encode('utf-8')])# 发送车辆状态vehicle_data = json.dumps(vehicle_status)print(f"[{send_time}] 车辆状态 | 模式: {drive_control}")pub_socket.send_multipart([b'filter_auto_drive_info',vehicle_data.encode('utf-8')])time.sleep(1)  # 1000ms发送间隔# 检查订阅消息socks = dict(poller.poll(0))if sub_socket in socks:while True:try:msg = sub_socket.recv_multipart(zmq.NOBLOCK)message = json.loads(msg[1].decode('utf-8'))if message.get('id') == 'loudspeaker_end':speaker_id = message.get('data', {}).get('speakerId')print(f"\n[!] 收到终止信号 | 设备ID: {speaker_id}")print("[系统] 切换到主动安全模式(4)")received_end = Truedrive_control = 4  # 强制切换到主动安全模式except zmq.Again:breakexcept KeyboardInterrupt:print("\n[系统] 程序关闭")finally:pub_socket.close()sub_socket.close()ctx.term()if __name__ == "__main__":main()

其中,所需的data.json文件如下

{
"filter_auto_drive_info": [
{
"id": "led_info",
"data":{
"driveMode":0,
"workStatus": 4,
"vehicleStatus": 3,
"arrowStatus": 0,
"vehicleSpeed": 100,
"loudspeakerStatus": 1,
"faultStatus": 0
}
},
{
"id": "vehicle_status_upload",
"data":{
"ch4Concentration": 444,
"coConcentration": 123,
"powerOnStatus": 1,
"soc": 99,
"mileageAccrual": 234
}
}]
}

通过运行python ./test.py tcp://192.168.1.137:5582 即可运行 (zmq发布端连接)

结语: 本系统展示了如何利用ZeroMQ的发布-订阅模式构建实时车辆监控系统。通过状态机设计和深度数据同步,实现了驾驶模式的智能切换。

相关文章:

  • 2025年登高架设作业考试题库精选
  • uniapp 实现腾讯云IM群文件上传下载功能
  • 智能门锁申请 EN 18031 欧盟网络安全认证指南​
  • AI编程--插件对比分析:CodeRider、GitHub Copilot及其他
  • 边缘计算设备全解析:边缘盒子在各大行业的落地应用场景
  • 云原生安全实战:API网关Envoy的鉴权与限流详解
  • 详解快排的四种方式
  • 1.6 http模块nodejs 对比 go
  • CocosCreator 之 JavaScript/TypeScript和Java的相互交互
  • 篇章十 数据结构——排序
  • “冒个泡泡”,排个序呗~:C语言版冒泡排序全解
  • Linux命令cat /proc/net/snmp查看网络协议层面统计信息
  • 【春秋云镜】CVE-2023-2130漏洞复现exp
  • 如何把工业通信协议转换成http websocket
  • UFW防火墙安全指南
  • 《C++初阶之入门基础》【普通引用 + 常量引用 + 内联函数 + nullptr】
  • Hive 存储格式深度解析:从 TextFile 到 ORC,如何选对数据存储方案?
  • 在Zenodo下载文件 用到googlecolab googledrive
  • Excel 怎么让透视表以正常Excel表格形式显示
  • WebRTC(一):整体架构
  • b站怎么付费推广自己的视频/微营销平台
  • 国外自助建站/收录之家
  • 长春做网站多少钱/网站建站流程
  • 舟山网站建设推荐/整合营销传播的定义
  • 泉州专业建站品牌/衡水seo营销
  • 扬州网站开发/企业网页设计与推广