当前位置: 首页 > news >正文

esp32 挂载mpu6050实现加速度计

使用Nordic uart service传输数据给app。

mpu6050.py

import _thread
import time
import uasyncio as asyncio
from machine import Pin, I2C, PWM
import math# 定义I2C接口 - 尝试使用不同的引脚组合
i2c = I2C(0, scl=Pin(0), sda=Pin(1), freq=400000)  # 常用的I2C引脚组合# MPU6050的I2C地址
MPU6050_ADDR = 0x68
# MPU6050寄存器地址
PWR_MGMT_1 = 0x6B
ACCEL_XOUT_H = 0x3B
GYRO_XOUT_H = 0x43
WHO_AM_I = 0x75# 添加I2C设备扫描函数
def scan_i2c_devices():print("scan I2C bus devices...")devices = i2c.scan()if len(devices) == 0:print("not find I2C device!")#print("请检查:")#print("1. MPU6050传感器是否正确连接")#print("2. I2C引脚配置是否正确")#print("3. 传感器电源是否正常")return Falseelse:print(f"find {len(devices)} I2C device")for device in devices:print(f"device addr: 0x{device:02X}")if MPU6050_ADDR in devices:print(f"find one MPU6050, it's addr:0x{MPU6050_ADDR:02X}")return Trueelse:print(f"can not find MPU6050, addr:0x{MPU6050_ADDR:02X}")return Falsedef mpu6050_init():try:# 先扫描I2C总线,确认设备存在if not scan_i2c_devices():return False# 唤醒MPU6050(默认处于睡眠模式)print("尝试唤醒MPU6050...")i2c.writeto_mem(MPU6050_ADDR, PWR_MGMT_1, b'\x00')time.sleep_ms(100)# 读取并验证设备IDdevice_id = i2c.readfrom_mem(MPU6050_ADDR, WHO_AM_I, 1)[0]if device_id == 0x68:print(f"MPU6050设备ID验证通过: 0x{device_id:02X}")else:print(f"设备ID不匹配!预期0x68,实际返回0x{device_id:02X}")print("这可能表示:")print("1. 传感器不是MPU6050")print("2. 传感器损坏")print("3. I2C通信受到干扰")return False# 配置采样率和滤波器i2c.writeto_mem(MPU6050_ADDR, 0x19, b'\x07')  # SMPLRT_DIV寄存器i2c.writeto_mem(MPU6050_ADDR, 0x1A, b'\x00')  # CONFIG寄存器i2c.writeto_mem(MPU6050_ADDR, 0x1B, b'\x00')  # GYRO_CONFIG寄存器i2c.writeto_mem(MPU6050_ADDR, 0x1C, b'\x00')  # ACCEL_CONFIG寄存器 (±2g量程)print("MPU6050初始化成功!")return Trueexcept OSError as e:print(f"初始化MPU6050时出错: {e}")print("这通常表示:")print("1. I2C通信失败")print("2. 传感器未正确连接")print("3. 传感器地址不正确")return Falsedef read_word(adr):high = i2c.readfrom_mem(MPU6050_ADDR, adr, 1)[0]low = i2c.readfrom_mem(MPU6050_ADDR, adr + 1, 1)[0]val = (high << 8) + lowreturn valdef read_word_2c(adr):val = read_word(adr)if (val >= 0x8000):return -((65535 - val) + 1)else:return valdef read_accel_x():return read_word_2c(ACCEL_XOUT_H)def read_accel_y():return read_word_2c(ACCEL_XOUT_H + 2)def read_accel_z():return read_word_2c(ACCEL_XOUT_H + 4)def read_gyro_x():return read_word_2c(GYRO_XOUT_H)def read_gyro_y():return read_word_2c(GYRO_XOUT_H + 2)def read_gyro_z():return read_word_2c(GYRO_XOUT_H + 4)def calculate_angles():while True:ax = read_accel_x()ay = read_accel_y()az = read_accel_z()# 计算俯仰角(Pitch)pitch = math.atan2(ax, math.sqrt(ay * ay + az * az)) * 180 / math.pi# 计算翻滚角(Roll)roll = math.atan2(ay, math.sqrt(ax * ax + az * az)) * 180 / math.piprint(f"Pitch: {pitch:.2f}°, Roll: {roll:.2f}°")time.sleep(0.1)def pwm_thread():# 创建LED控制对象led = PWM(Pin(12), freq=1000)while True:# 渐亮for i in range(0, 1024):led.duty(i)time.sleep_ms(1)# 渐暗for i in range(1023, 0, -1):led.duty(i)time.sleep_ms(1)# 新增:将加速度值打包为字节数组(大端模式)
def pack_accel_data(ax, ay, az):"""将加速度值(short类型)打包为6字节大端格式字节数组"""data = bytearray(6)data[0] = (ax >> 8) & 0xFF  # X轴高字节data[1] = ax & 0xFF          # X轴低字节data[2] = (ay >> 8) & 0xFF  # Y轴高字节data[3] = ay & 0xFF          # Y轴低字节data[4] = (az >> 8) & 0xFF  # Z轴高字节data[5] = az & 0xFF          # Z轴低字节return datadef register_speed_ble_callback(callback):global _ble_callback_ble_callback = callbackasync def acc_read():# 初始化MPU6050if not mpu6050_init():print("MPU6050 init fail, exit process")returnelse:while True:try:ax = read_accel_x()ay = read_accel_y()az = read_accel_z()# 计算俯仰角(Pitch)pitch = math.atan2(ax, math.sqrt(ay * ay + az * az)) * 180 / math.pi# 计算翻滚角(Roll)roll = math.atan2(ay, math.sqrt(ax * ax + az * az)) * 180 / math.piprint(f"Pitch: {pitch:.2f}, Roll: {roll:.2f}")# 打包并发送加速度数据if _ble_callback:accel_data = pack_accel_data(ax, ay, az)_ble_callback(accel_data)#print(f"发送加速度: X={ax}, Y={ay}, Z={az}")else:print("BLE callback not registered, data not sent")await asyncio.sleep(0.1)except Exception as e:print(f"读取传感器数据失败: {e}")await asyncio.sleep(1)if __name__ == "__main__":print('mpu6050 main')asyncio.run(acc_read())

main.py

import uasyncio as asyncio
import machine
import ubinascii
import time
from ble_advertising import advertising_payload
import mpu6050
# import at24c02
# import CD4067BMT
from micropython import const
import bluetooth
import _thread# 定义LED引脚
led = machine.PWM(machine.Pin(12))
led.freq(1000)# 蓝牙相关常量
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)# 服务和特征的UUID
_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX = (bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"),bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY,
)
_UART_RX = (bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"),bluetooth.FLAG_WRITE,
)
_UART_SERVICE = (_UART_UUID,(_UART_TX, _UART_RX),
)# 设备名称服务UUID和特征
_SPEED_AND_CADENCE_SERVICE_UUID = bluetooth.UUID("00001814-0000-1000-8000-00805f9b34fb")  # speedAndCadence Access Service
_SPEED_AND_CADENCE_CHAR_UUID = bluetooth.UUID("00002a67-0000-1000-8000-00805f9b34fb")   # speedAndCadence Characteristic_SPEED_AND_CADENCE_CHAR = (_SPEED_AND_CADENCE_CHAR_UUID,bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY,  # 允许通知
)_SPEED_AND_CADENCE_SERVICE = (_SPEED_AND_CADENCE_SERVICE_UUID,(_SPEED_AND_CADENCE_CHAR,),
)# 全局标志
pwm_running = False
program_running = True# 呼吸灯效果函数
def breathing_led():global pwm_runningwhile pwm_running:# 亮度从0到1023逐渐增加print('亮度从 0 到 1023 逐渐增加')for i in range(0, 1024):if not pwm_running:breakled.duty(i)time.sleep_ms(1)# 亮度从1023到0逐渐减小print('亮度从 1023 到 0 逐渐减小')for i in range(1023, -1, -1):if not pwm_running:breakled.duty(i)time.sleep_ms(1)# 蓝牙类
class BLEUART:def __init__(self, ble, name="mpy-uart"):self._ble = bleself._ble.active(True)self._ble.irq(self._irq)# 处理 MAC 地址mac_tuple = ble.config("mac")  # 获取 (addr_type, addr_bytes)print(mac_tuple)mac_bytes = mac_tuple[1]       # 提取字节部分(6字节)mac_str = ubinascii.hexlify(mac_bytes).decode().upper()  # 转换为十六进制字符串short_mac = mac_str[-6:]       # 取后6位(如 "FFDC22")self.current_device_name = f"F-{short_mac}"  # 生成唯一名称:"left-FFDC22"#self.current_device_name = name# ble.gap_set_name(f"right-{short_mac}")  # 示例名称:"right-63E1D5"# 休眠/唤醒self._last_connection_time = time.ticks_ms()  # 记录最后连接时间self._wakeup_pin = machine.Pin(6, mode=machine.Pin.IN, pull=machine.Pin.PULL_UP)  # 配置唤醒引脚self._wakeup_pin.irq(trigger=machine.Pin.IRQ_FALLING, handler=self._wakeup_handler)  # 下降沿触发唤醒self._is_sleeping = False# 注册UART服务和加速度服务((self._handle_tx, self._handle_rx), (self._handle_accel,)) = self._ble.gatts_register_services((_UART_SERVICE, _SPEED_AND_CADENCE_SERVICE))self._is_connected = Falseself._connections = set()self._write_callback = Noneself._payload = advertising_payload(name=self.current_device_name, services=[_UART_UUID,])print(f"Advertising data length: {len(self._payload)} bytes")  # 应 ≤ 31字节self._start_advertising()def _wakeup_handler(self, pin):"""唤醒处理函数"""print("Wakeup key(gpio0) triggered")if self._is_sleeping:print("Wakeup detected from GPIO5")self._is_sleeping = False# 停止呼吸灯线程(需全局标志控制)global ledglobal pwm_runningpwm_running = False# 重新初始化LEDled.deinit()  # 先释放旧资源led = machine.PWM(machine.Pin(12))led.freq(1000)# 恢复广播self._start_advertising()self._last_connection_time = time.ticks_ms()  # 重置连接时间戳def _enter_sleep(self):"""进入休眠模式"""print("Entering sleep mode...")self._stop_advertising()  # 停止广播self._is_sleeping = True# 关闭非必要外设(示例:关闭LED)led.deinit()# 进入深度睡眠,等待唤醒machine.deepsleep()def _check_sleep_condition(self):"""检查是否满足休眠条件:10分钟无连接"""if self._is_connected:return  # 连接中不休眠current_time = time.ticks_ms()elapsed_time = time.ticks_diff(current_time, self._last_connection_time)if elapsed_time >= 600000:  # 10分钟(600000毫秒)self._enter_sleep()def _irq(self, event, data):if event == _IRQ_CENTRAL_CONNECT:conn_handle, _, _ = dataprint("New connection", conn_handle)self._connections.add(conn_handle)self._is_connected = Trueself._stop_advertising()self._last_connection_time = time.ticks_ms()  # 更新最后连接时间elif event == _IRQ_CENTRAL_DISCONNECT:conn_handle, _, _ = dataprint("Disconnected", conn_handle)self._connections.remove(conn_handle)self._is_connected = Falseself._start_advertising()self._last_connection_time = time.ticks_ms()  # 断开时刷新时间戳elif event == _IRQ_GATTS_WRITE:conn_handle, value_handle = dataif value_handle == self._handle_rx:data = self._ble.gatts_read(self._handle_rx)if self._write_callback:self._write_callback(data)def send(self, data):# 创建连接句柄的副本,避免在迭代过程中修改原始集合connections = list(self._connections)for conn_handle in connections:try:self._ble.gatts_notify(conn_handle, self._handle_tx, data)except Exception as e:print(f"Notify failed for conn_handle {conn_handle}: {e}")# 从连接集合中移除无效的句柄if conn_handle in self._connections:self._connections.remove(conn_handle)# 如果没有有效连接了,更新连接状态if not self._connections:self._is_connected = Falseself._start_advertising()# 新增:发送加速度数据的方法def send_acceleration(self, data):"""通过加速度特征值发送数据"""connections = list(self._connections)for conn_handle in connections:try:self._ble.gatts_notify(conn_handle, self._handle_accel, data)except Exception as e:print(f"加速度数据发送失败: {e}")if conn_handle in self._connections:self._connections.remove(conn_handle)if not self._connections:self._is_connected = Falseself._start_advertising()def is_connected(self):return len(self._connections) > 0def _start_advertising(self, interval_us=500000):"""开始广播"""if not self._is_sleeping:print(f"Starting advertising with name: {self.current_device_name}")self._payload = advertising_payload(name=self.current_device_name, services=[_UART_UUID,])self._ble.gap_advertise(interval_us, adv_data=self._payload)else:print("Still sleeping, skip advertising")def _stop_advertising(self):"""停止广播"""print("Stopping advertising")self._ble.gap_advertise(None)def on_write(self, callback):self._write_callback = callbackdef _change_device_name(self, new_name):# 限制名称长度if len(new_name) > 20:new_name = new_name[:20]self.current_device_name = new_name# 如果没有连接,重新开始广播使用新名称if not self.is_connected():self._start_advertising()print(f"Device name changed to: {new_name}")# 定义数据发送函数
def send_acceleration_data_via_ble(data):if uart.is_connected():uart.send_acceleration(data)else:print("BLE not connected, data not sent")# 新增全局共享标志(控制扫描线程启停)
flag_scan = True   # 默认开启扫描
scan_lock = _thread.allocate_lock()  # 添加锁保证线程安全# 蓝牙控制回调函数
def ble_callback(data):global pwm_running, program_running, flag_scancommand = data.decode().strip().lower()print(f"Received command: {command}")if command == 'on':with scan_lock:flag_scan = Trueif not pwm_running:pwm_running = True#_thread.start_new_thread(breathing_led, ())elif command == 'off':with scan_lock:flag_scan = Falseif pwm_running:pwm_running = Falseled.duty(0)elif command == 'stop':program_running = False# 初始化蓝牙
ble = bluetooth.BLE()
uart = BLEUART(ble)
uart.on_write(ble_callback)# 主循环
#while program_running:
#    # if not uart.is_connected() and not uart._is_sleeping:
#    #    uart._check_sleep_condition()  # 主循环中定期检查休眠条件
#    time.sleep(1)async def peripheral_task():"""蓝牙外设任务,处理广告和连接"""while True:print("Peripheral task running...")await asyncio.sleep(1)async def central_task():"""蓝牙中心任务,处理扫描和连接"""while True:print("Central task running...")await asyncio.sleep(1)async def main():"""主函数,负责协调所有异步任务"""try:# 创建任务peripheral = asyncio.create_task(peripheral_task())central = asyncio.create_task(central_task())acc_task = asyncio.create_task(mpu6050.acc_read())# 等待任务完成(这里不会完成,因为是无限循环)await asyncio.gather(peripheral, central, acc_task)except KeyboardInterrupt:print("Interrupted by user")finally:# 清理工作print("Cleaning up...")# 注册回调函数到matrixScan模块
mpu6050.register_speed_ble_callback(send_acceleration_data_via_ble)
if __name__ == "__main__":print('main')# 启动主事件循环asyncio.run(main())
http://www.dtcms.com/a/295433.html

相关文章:

  • MiniCPM 学习部署实战 vlm
  • OSPF开放式最短路径优先
  • 【Git知识】Git 常用知识集合之基础--分支系统与 Tag 标签机制
  • YOLO算法演进综述:从YOLOv1到YOLOv13的技术突破与应用实践,一文掌握YOLO家族全部算法!
  • 2025最新MySQL面试题实战记录,互联网公司常问题目
  • 如何在 Ubuntu 24.04 服务器或桌面版上安装和使用 gedit
  • Spring AI Alibaba 快速入门指南(适合初学者)
  • 【C++】简单学——list类
  • 磁性材料如何破解服务器电源高频损耗难题?
  • Unity UI的未来之路:从UGUI到UI Toolkit的架构演进与特性剖析(3)
  • Element-UI 解决省市级数据
  • Map接口-实现类HashMap
  • Hive常用函数
  • C++STL系列之set和map系列
  • Spring AI - 函数调用演示:AI算术运算助手
  • 计算机网络知识点总结 (2)
  • Vue3 面试题及详细答案120道(91-105 )
  • 02.面向对象的特性
  • 斐波那契数列策略
  • 深入UniApp X:掌握复杂列表与全局状态管理的艺术
  • 光伏电站巡检清扫飞行机器人设计cad【6张】三维图+设计说明书
  • go项目实战二
  • 支持OCR和AI解释的Web PDF阅读器:解决大文档阅读难题
  • 飞腾D3000麒麟信安系统下配置intel I210 MAC
  • 最新免费使用Claude Code指南(Windows macOS/Linux)
  • 使用ffmpeg转码h265后mac默认播放器不支持问题
  • 快速启用 JMeter(macOS Automator 创建 JMeter 脚本)
  • 【MAC电脑系统变量管理】
  • Mac电脑开发Python(基于vs code)
  • 闲庭信步使用图像验证平台加速FPGA的开发:第三十三课——车牌识别的FPGA实现(5)车牌字符的识别