Python串口通信与MQTT物联网网关:连接STM32与物联网平台
下面这篇博客,将带你一步步构建一个连接STM32硬件与MQTT物联网世界的Python桥接程序。无论是智能家居传感器数据上传,还是反向控制开关,这里都有你需要的技术方案。
在物联网项目中,STM32微控制器负责采集传感器数据和执行设备动作,而Python则扮演着连接硬件与物联网平台的桥梁角色。本文将详细介绍如何构建一个稳定可靠的Python网关程序,实现STM32与MQTT服务器之间的双向通信。
系统架构概述
在典型的物联网应用场景中,STM32作为下位机负责硬件层面的操作,包括传感器数据采集、GPIO控制等;而运行Python程序的设备(如树莓派、工控机或普通电脑)则作为网关,负责串口通信与网络通信之间的协议转换。
系统数据流如下:
- 上行数据:STM32传感器数据 → 串口 → Python程序 → MQTT服务器 → 云端/监控系统
- 下行数据:云端/监控系统 → MQTT服务器 → Python程序 → 串口 → STM32控制指令
这种架构的优势在于分工明确,STM32专注于实时性要求高的硬件操作,Python网关则处理网络通信和协议转换等复杂任务。
Python串口通信实现
串口配置与初始化
Python中操作串口主要依赖pyserial
库,它提供了统一的串口操作接口。
import serial
import time# 串口配置参数
SERIAL_PORT = "COM10" # Windows系统,Linux/Mac通常为/dev/ttyUSB0或/dev/ttyACM0
BAUD_RATE = 115200 # 与STM32程序设置的波特率一致
TIMEOUT = 1 # 读取超时时间(秒)# 初始化串口连接
def init_serial():try:ser = serial.Serial(port=SERIAL_PORT,baudrate=BAUD_RATE,timeout=TIMEOUT,parity=serial.PARITY_NONE,stopbits=serial.STOPBITS_ONE,bytesize=serial.EIGHTBITS)# 重置串口缓冲区ser.setDTR(False)ser.setRTS(False)time.sleep(2) # 等待串口稳定ser.reset_input_buffer()ser.reset_output_buffer()print(f"串口 {SERIAL_PORT} 初始化成功")return serexcept serial.SerialException as e:print(f"串口初始化失败: {e}")return None
串口数据读取与解码
STM32发送的数据可能包含非UTF-8字符,需要稳健的解码策略:
def read_serial_data(ser):"""读取并解码串口数据"""if ser and ser.in_waiting > 0:try:# 读取原始字节数据raw_data = ser.readline()# 尝试UTF-8解码,忽略错误字符decoded_data = raw_data.decode('utf-8', errors='ignore').strip()# 如果解码后为空,尝试其他编码if not decoded_data:decoded_data = raw_data.decode('latin-1').strip()return decoded_data if decoded_data else Noneexcept UnicodeDecodeError as e:print(f"串口数据解码错误: {e}")return Noneexcept Exception as e:print(f"读取串口数据时发生错误: {e}")return Nonereturn None
向STM32发送命令
向STM32发送命令时,需要确保格式正确且包含适当的结束符:
def send_serial_command(ser, command):"""向串口发送命令"""if not ser or not ser.is_open:print("错误:串口未打开")return Falsetry:# 添加结束符(STM32常用\r\n作为行结束符)full_command = command + "\r\n"encoded_command = full_command.encode()# 发送命令bytes_written = ser.write(encoded_command)ser.flush() # 确保数据立即发送# 验证发送完整性if bytes_written == len(encoded_command):print(f"成功发送命令: {repr(full_command)}")return Trueelse:print(f"发送不完整: 预期{len(encoded_command)}字节,实际发送{bytes_written}字节")return Falseexcept serial.SerialException as e:print(f"串口发送错误: {e}")return False
MQTT客户端实现
MQTT客户端配置
使用paho-mqtt
库创建MQTT客户端,注意使用持久化连接以提高效率:
import paho.mqtt.client as mqtt
import json
import time# MQTT配置
MQTT_BROKER = "localhost" # MQTT代理服务器地址
MQTT_PORT = 1883 # MQTT端口
KEEPALIVE = 60 # 心跳间隔# MQTT主题定义
TOPIC_LIGHT_SET = "xw103/home/light/set"
TOPIC_MOTOR_SET = "xw103/home/motor/set"
TOPIC_SENSOR_DATA = "xw103/home/sensor/data"class MQTTGateway:def __init__(self):self.client = mqtt.Client(protocol=mqtt.MQTTv311)self.client.on_connect = self._on_connectself.client.on_message = self._on_messageself.client.on_disconnect = self._on_disconnect# 连接状态标记self.connected = Falsedef _on_connect(self, client, userdata, flags, rc):"""MQTT连接回调"""if rc == 0:print("MQTT连接成功")self.connected = True# 订阅相关主题client.subscribe(TOPIC_LIGHT_SET)client.subscribe(TOPIC_MOTOR_SET)print(f"已订阅主题: {TOPIC_LIGHT_SET}, {TOPIC_MOTOR_SET}")else:print(f"MQTT连接失败,错误代码: {rc}")self.connected = Falsedef _on_message(self, client, userdata, msg):"""MQTT消息接收回调"""print(f"收到MQTT消息 - 主题: {msg.topic}, 内容: {msg.payload.decode()}")# 将消息转发给STM32处理self._process_mqtt_message(msg.topic, msg.payload.decode())def _on_disconnect(self, client, userdata, rc):"""MQTT断开连接回调"""print("MQTT连接断开")self.connected = False# 自动重连逻辑if rc != 0:print("意外断开,尝试重连...")time.sleep(5)self.connect()def connect(self):"""连接到MQTT代理"""try:self.client.connect(MQTT_BROKER, MQTT_PORT, KEEPALIVE)# 启动网络循环(非阻塞)self.client.loop_start()except Exception as e:print(f"MQTT连接异常: {e}")def disconnect(self):"""断开MQTT连接"""self.client.loop_stop()self.client.disconnect()def publish_sensor_data(self, data):"""发布传感器数据到MQTT"""if self.connected:try:# 将数据转换为JSON格式payload = json.dumps(data, ensure_ascii=False)result = self.client.publish(TOPIC_SENSOR_DATA, payload, qos=1)if result.rc == mqtt.MQTT_ERR_SUCCESS:print(f"传感器数据发布成功: {payload}")else:print(f"传感器数据发布失败: {result.rc}")except Exception as e:print(f"发布传感器数据时发生错误: {e}")
MQTT消息处理
处理从MQTT接收到的消息并转换为STM32可理解的命令:
def _process_mqtt_message(self, topic, payload):"""处理MQTT消息并转换为串口命令"""cmd = ""try:# 解析JSON格式的payloaddata = json.loads(payload)if topic == TOPIC_LIGHT_SET:# 处理灯光控制消息state = data.get("state", "").upper()if state == "ON":cmd = "LIGHT_ON"elif state == "OFF":cmd = "LIGHT_OFF"elif topic == TOPIC_MOTOR_SET:# 处理电机控制消息if "speed" in data:speed = data["speed"]cmd = "MOTOR_ON" if speed > 0 else "MOTOR_OFF"elif "direction" in data:direction = data["direction"]cmd = f"MOTOR_DIR_{direction.upper()}"except json.JSONDecodeError:# 如果不是JSON格式,使用简单匹配print("Payload不是JSON格式,使用简单匹配")if topic == TOPIC_LIGHT_SET:if "ON" in payload.upper():cmd = "LIGHT_ON"elif "OFF" in payload.upper():cmd = "LIGHT_OFF"# 发送命令到STM32if cmd:# 这里需要访问全局的ser对象,或者通过其他方式传递global sersend_serial_command(ser, cmd)
STM32端串口通信处理
串口中断配置
STM32端需要配置串口中断来接收Python网关发送的命令:
// 串口接收缓冲区定义
#define USART1_REC_LEN 256
uint8_t USART1_RX_BUF[USART1_REC_LEN];
uint16_t USART1_RX_STA = 0;// 串口中断服务函数
void USART1_IRQHandler(void)
{uint8_t r;if(USART_GetITStatus(USART1, USART_IT_RXNE) != RESET){r = USART_ReceiveData(USART1);if((USART1_RX_STA & 0x8000) == 0) // 接收未完成{if(USART1_RX_STA & 0x4000) // 接收到了0x0d(回车){if(r != 0x0a) // 不是0x0a(换行),接收错误USART1_RX_STA = 0;else // 接收到0x0a,接收完成USART1_RX_STA |= 0x8000;}else // 还没收到0x0d{if(r == 0x0d)USART1_RX_STA |= 0x4000;else{USART1_RX_BUF[USART1_RX_STA & 0X3FFF] = r;USART1_RX_STA++;if(USART1_RX_STA > (USART1_REC_LEN-1))USART1_RX_STA = 0; // 缓冲区溢出,重新开始}}}}
}
命令解析与处理
STM32主循环中处理接收到的命令:
// 命令处理函数
void process_uart_command(char* cmd)
{printf("收到命令: %s\r\n", cmd);if(strcmp(cmd, "LIGHT_ON") == 0){LED1 = 0; // 开灯printf("灯光已打开\r\n");}else if(strcmp(cmd, "LIGHT_OFF") == 0){LED1 = 1; // 关灯printf("灯光已关闭\r\n");}else if(strcmp(cmd, "MOTOR_ON") == 0){// 启动电机motor_start();printf("电机已启动\r\n");}else if(strcmp(cmd, "MOTOR_OFF") == 0){// 停止电机motor_stop();printf("电机已停止\r\n");}else{printf("未知命令: %s\r\n", cmd);}
}// 串口命令处理函数
void USART1_ProcessCommand(void)
{if(USART1_RX_STA & 0x8000) // 检查接收完成标志{uint16_t len = USART1_RX_STA & 0x3FFF; // 获取数据长度USART1_RX_BUF[len] = '\0'; // 添加字符串结束符// 处理命令process_uart_command((char*)USART1_RX_BUF);USART1_RX_STA = 0; // 清除接收状态,准备下一次接收}
}// 主循环
int main(void)
{// 系统初始化// ...while(1){// 处理串口命令USART1_ProcessCommand();// 其他任务...delay_ms(10);}
}
数据格式与协议设计
传感器数据上报格式
STM32向Python网关上报传感器数据时,建议使用结构化数据格式:
// STM32端数据上报
void report_sensor_data(float temperature, float humidity)
{// JSON格式数据printf("{\"type\":\"sensor\",\"temp\":%.1f,\"humi\":%.1f,\"ts\":%lu}\r\n", temperature, humidity, get_timestamp());
}
命令协议设计
MQTT到STM32的命令使用一致的JSON格式:
{"device": "light","action": "set","params": {"state": "ON"},"timestamp": 1696789012
}
错误处理与稳定性优化
串口重连机制
def auto_reconnect_serial():"""自动重连串口"""global sermax_retries = 5retry_count = 0while retry_count < max_retries:print(f"尝试重新连接串口,第{retry_count+1}次")ser = init_serial()if ser and ser.is_open:print("串口重新连接成功")return Trueretry_count += 1time.sleep(2) # 等待2秒后重试print("串口重连失败,请检查硬件连接")return False
MQTT消息质量保证
def publish_with_retry(client, topic, payload, qos=1, retry_count=3):"""带重试的消息发布"""for attempt in range(retry_count):try:result = client.publish(topic, payload, qos=qos)if result.rc == mqtt.MQTT_ERR_SUCCESS:return Trueelse:print(f"发布失败,第{attempt+1}次尝试,错误码: {result.rc}")except Exception as e:print(f"发布异常,第{attempt+1}次尝试: {e}")time.sleep(1) # 等待1秒后重试return False
完整应用示例
Python网关主程序
def main():"""主函数"""# 初始化串口ser = init_serial()if not ser:print("无法初始化串口,程序退出")return# 初始化MQTT网关gateway = MQTTGateway()gateway.connect()# 传感器数据上报间隔last_report_time = time.time()report_interval = 30 # 30秒上报一次print("物联网网关启动成功,开始处理数据...")try:while True:# 读取串口数据serial_data = read_serial_data(ser)if serial_data:print(f"收到串口数据: {serial_data}")# 尝试解析并转发传感器数据try:sensor_data = json.loads(serial_data)if "type" in sensor_data and sensor_data["type"] == "sensor":gateway.publish_sensor_data(sensor_data)except json.JSONDecodeError:# 如果不是JSON数据,可以选择其他处理方式pass# 处理MQTT消息(通过回调函数自动处理)# 主循环中可以处理其他任务time.sleep(0.1) # 避免CPU占用过高except KeyboardInterrupt:print("程序被用户中断")except Exception as e:print(f"程序运行错误: {e}")finally:# 清理资源if ser and ser.is_open:ser.close()gateway.disconnect()print("程序已退出")if __name__ == "__main__":main()
调试技巧与常见问题
1. 串口通信调试
- 检查端口权限:Linux/Mac系统需要读写权限
- 验证波特率:确保Python端与STM32端波特率一致
- 监听串口数据:使用串口调试助手验证数据收发
2. MQTT连接问题
- 检查网络连接:确保能访问MQTT代理服务器
- 验证认证信息:检查用户名、密码、客户端ID
- 测试网络防火墙:确保MQTT端口(通常为1883)未被阻塞
3. 数据解析错误
- 添加日志输出:在关键位置打印收发数据
- 验证数据格式:使用JSON验证工具检查格式正确性
- 处理异常情况:添加充分的异常处理代码
总结
通过Python构建STM32与MQTT物联网平台之间的网关,能够充分发挥两者的优势:STM32负责实时硬件操作,Python处理复杂的网络通信和协议转换。这种架构在物联网项目中具有广泛的适用性,可以根据具体需求灵活调整和扩展。
本文提供的代码示例涵盖了从基础通信到错误处理的完整流程,可以作为实际项目的起点。在实际应用中,还需要根据具体硬件特性和业务需求进行适当的调整和优化。