当前位置: 首页 > news >正文

MySQL存储STM32F407上的HX711数据

注意:在串口调试助手和脚本之间切换时记得拔下单片机再重新插上,否则串口会被堵塞

1.监听脚本

# -*- coding: utf-8 -*-
import serial
import mysql.connector
import re
import time
import signal# ======================
# 配置区域(根据实际情况修改)
# ======================
SERIAL_PORT = 'COM3'
BAUD_RATE = 9600
ENCODING = 'latin-1'  # 适配特殊字符DB_CONFIG = {'host': 'localhost','user': 'root','password': '','database': 'weight_data','raise_on_warnings': True
}# ======================
# 核心功能函数
# ======================def test_db_connection():"""测试数据库连接是否正常"""try:conn = mysql.connector.connect(**DB_CONFIG)conn.close()print("[数据库] ✅ 连接测试成功")except mysql.connector.Error as e:print(f"[数据库] ❌ 连接失败: {e}")exit(1)def create_table(cursor):"""创建数据表(如果不存在)"""try:create_sql = """CREATE TABLE IF NOT EXISTS weight_records (id INT AUTO_INCREMENT PRIMARY KEY,weight FLOAT NOT NULL,record_time DATETIME DEFAULT CURRENT_TIMESTAMP)"""cursor.execute(create_sql)# 检查表中现有记录数cursor.execute("SELECT COUNT(*) FROM weight_records")count = cursor.fetchone()[0]print(f"[数据库] 📝 表结构正常 (现有记录数: {count})")except mysql.connector.Error as e:if e.errno == 1050:  # 表已存在的错误码print("[数据库] 📝 表已存在,继续使用")else:raise  # 其他错误则继续抛出def parse_serial_data(line):"""解析串口数据"""try:# 清理数据,移除空白字符line = line.strip()print(f"[调试] 清理后的数据: '{line}'")# 直接尝试将整行转换为数字if line:weight = float(line)print(f"[解析] ✅ 成功解析重量值: {weight}g")return weightprint(f"[解析] ⚠️ 数据为空")return Noneexcept Exception as e:print(f"[解析] ❌ 解析错误: {e}")print(f"[解析] ⚠️ 问题数据: {repr(line)}")return Nonedef save_to_db(weight):"""无条件保存数据到数据库"""try:conn = mysql.connector.connect(**DB_CONFIG)cursor = conn.cursor()cursor.execute("INSERT INTO weight_records (weight) VALUES (%s)", (weight,))conn.commit()print(f"[数据库] 📊 数据已保存: {weight}g")except mysql.connector.Error as e:print(f"[数据库] ❌ 保存失败: {e}")finally:if 'conn' in locals():cursor.close()conn.close()def view_stored_data():"""查看数据库中存储的数据"""try:conn = mysql.connector.connect(**DB_CONFIG)cursor = conn.cursor()cursor.execute("SELECT id, weight, record_time FROM weight_records ORDER BY record_time DESC LIMIT 10")records = cursor.fetchall()if not records:print("[数据库] 📊 暂无存储数据")else:print("\n[数据库] 📊 最近10条记录:")print("ID\t重量(g)\t\t记录时间")print("-" * 50)for record in records:print(f"{record[0]}\t{record[1]:.2f}\t\t{record[2]}")except mysql.connector.Error as e:print(f"[数据库] ❌ 查询失败: {e}")finally:if 'conn' in locals():cursor.close()conn.close()def signal_handler(signum, frame):"""处理中止信号"""print("\n[系统] ⏹️ 正在优雅退出...")print("\n当前数据库中的记录:")view_stored_data()exit(0)def main():"""主程序"""signal.signal(signal.SIGINT, signal_handler)try:ser = serial.Serial(port=SERIAL_PORT,baudrate=BAUD_RATE,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE,stopbits=serial.STOPBITS_ONE,timeout=1  # 设置更短的超时时间,方便调试)except serial.SerialException as e:print(f"[串口] ❌ 设备未找到: {e}")exit(1)print(f"[系统] 🟢 正在监听 {SERIAL_PORT}...")print("[提示] 按 Ctrl+C 可以查看已存储数据并退出程序")test_db_connection()try:conn = mysql.connector.connect(**DB_CONFIG)cursor = conn.cursor()create_table(cursor)conn.commit()print("[数据库] ✅ 数据表已就绪")except mysql.connector.Error as e:if e.errno != 1050:  # 忽略"表已存在"的错误print(f"[数据库] ❌ 数据库操作失败: {e}")exit(1)finally:if 'conn' in locals():cursor.close()conn.close()data_count = 0buffer = ""try:with ser:print("[串口] 🔄 正在等待数据...")print("[调试] 提示:请确保STM32已连接并运行")while True:if ser.in_waiting > 0:# 读取字节char = ser.read().decode(ENCODING, errors='ignore')print(".", end="", flush=True)  # 显示接收进度if char == '\n':  # 收到换行符时处理数据if buffer:print(f"\n[串口] 📡 收到完整数据: {repr(buffer)}")weight = parse_serial_data(buffer)if weight is not None:if 0 <= weight <= 100000:  # 数据范围检查save_to_db(weight)data_count += 1print(f"[统计] ✅ 已成功存储 {data_count} 条数据")else:print(f"[警告] ⚠️ 数据超出合理范围: {weight}g")buffer = ""  # 清空缓冲区else:buffer += char  # 累积数据time.sleep(0.01)  # 短暂延时,避免CPU占用过高except Exception as e:print(f"\n[系统] ❌ 发生错误: {e}")finally:if ser.is_open:ser.close()print("[系统] 🔌 串口已关闭")if __name__ == "__main__":main()

2.查询脚本 

import mysql.connector
import signal
import sys# ======================
# 配置区域(根据实际情况修改)
# ======================DB_CONFIG = {'host': 'localhost','user': 'root','password': '12345wjh','database': 'weight_data','raise_on_warnings': True
}def ensure_table_exists():"""确保数据表存在并返回当前记录数"""conn = Nonetry:conn = mysql.connector.connect(**DB_CONFIG)cursor = conn.cursor()# 检查表是否存在cursor.execute("SHOW TABLES LIKE 'weight_records'")table_exists = cursor.fetchone() is not Noneif not table_exists:# 创建表create_table_sql = """CREATE TABLE weight_records (id INT AUTO_INCREMENT PRIMARY KEY,weight FLOAT NOT NULL,record_time DATETIME DEFAULT CURRENT_TIMESTAMP)"""cursor.execute(create_table_sql)conn.commit()print("[数据库] ✅ 数据表创建成功")# 检查表中现有记录数cursor.execute("SELECT COUNT(*) FROM weight_records")count = cursor.fetchone()[0]print(f"[数据库] ✅ 表结构正常 (现有记录数: {count})")return countexcept mysql.connector.Error as e:error_msg = str(e)if "Access denied" in error_msg:print(f"[数据库] ❌ 数据库访问被拒绝,请检查用户名和密码")elif "Connection refused" in error_msg:print(f"[数据库] ❌ 无法连接到数据库服务器,请确保MySQL服务已启动")elif "Unknown database" in error_msg:print(f"[数据库] ❌ 数据库'{DB_CONFIG['database']}'不存在")else:print(f"[数据库] ❌ 数据库操作失败: {error_msg}")sys.exit(1)finally:if conn:try:conn.cursor().close()conn.close()except:passdef view_stored_data():"""查看数据库中存储的数据"""try:conn = mysql.connector.connect(**DB_CONFIG)cursor = conn.cursor()cursor.execute("SELECT id, weight, record_time FROM weight_records ORDER BY record_time DESC LIMIT 10")records = cursor.fetchall()if not records:print("\n[数据库] 📊 暂无存储数据")else:print("\n[数据库] 📊 最近10条记录:")print("ID\t重量(g)\t\t记录时间")print("-" * 50)for record in records:print(f"{record[0]}\t{record[1]:.2f}\t\t{record[2]}")except mysql.connector.Error as e:print(f"[数据库] ❌ 查询失败: {e}")finally:if 'conn' in locals():cursor.close()conn.close()def signal_handler(signum, frame):"""处理中止信号"""print("\n[系统] ⏹️ 正在退出程序...")sys.exit(0)def main():# 设置信号处理器signal.signal(signal.SIGINT, signal_handler)print("[系统] 🟢 数据库监控程序已启动")record_count = ensure_table_exists()print("[提示] 按 Ctrl+C 可以退出程序")while True:print("\n请选择操作:")print("1. 查看最近10条记录")print("2. 退出程序")choice = input("请输入选项数字: ")if choice == "1":view_stored_data()elif choice == "2":print("[系统] ⏹️ 正在退出程序...")breakelse:print("[错误] ❌ 无效的选项,请重试")if __name__ == "__main__":main()

相关文章:

  • 高光谱相机在生物医学中的应用:病理分析、智慧中医与成分分析
  • 【C++】模版初阶:函数模板、类模板
  • 1.1 java开发的准备工作(入门)
  • 2025/4/23 心得
  • 使用logrotate实现日志轮转
  • 专题二十:路由策略与策略路由
  • 详解 synchronized 关键字【通俗易懂】
  • GPLT-2025年第十届团体程序设计天梯赛总决赛题解(2025天梯赛题解,266分)
  • nginx部署前端项目时,正常访问前端页面成功后,浏览器刷新报404解决访问
  • Android开发常用外部组件及使用指南(下)
  • 【自我介绍前端界面分享】附源码
  • java后端开发day35--集合进阶(四)--双列集合:MapHashMapTreeMap
  • 深入剖析PHP反弹Shell:OSCP场景下的实现、原理与优化
  • sql 根据时间范围获取每日,每月,年月的模版数据
  • MOS管驱动电路以及阻值选取
  • rl中,GRPO损失函数详解。
  • VulnHub-DarkHole_2靶机渗透教程
  • DCAN,ECAN和MCAN的区别
  • 基于SpringBoot的校园二手商品在线交易系统+含项目运行说明文档
  • UniGoal 具身导航 | 通用零样本目标导航 CVPR 2025
  • 澎湃读报丨央媒头版集中刊发社论,庆祝“五一”国际劳动节
  • 乌美签署矿产协议
  • 鲁迅先生儿媳、周海婴先生夫人马新云女士逝世,享年94岁
  • 赵乐际主持十四届全国人大常委会第十五次会议闭幕会并作讲话
  • 美参议院通过新任美国驻华大使任命,外交部回应
  • A股三大股指涨跌互现:3343股收涨,两市成交超1.1万亿元