当前位置: 首页 > news >正文

【物联网】BLE 系统架构全景图

BLE 开发完整指南:从Arduino到树莓派的完整解决方案

一、BLE 系统架构全景图

系统组成

Arduino Nano 33 IoT (Peripheral) ←BLE→ Raspberry Pi (Central) ←MQTT→ AWS Cloud↑                              ↑传感器数据采集                     数据聚合与转发执行器控制                        协议转换

二、Arduino BLE 代码详解

1. BLE_periodic_update.ino - 周期性数据更新

// 作用:定期更新特征值并通过通知推送
// 应用:实时传感器数据推送#include <ArduinoBLE.h>// 自定义UUID(重要:使用唯一UUID)
#define SERVICE_UUID        "19B10000-E8F2-537E-4F6C-D104768A1214"
#define CHARACTERISTIC_UUID "19B10001-E8F2-537E-4F6C-D104768A1214"BLEService customService(SERVICE_UUID);                
BLECharacteristic readOnlyCharacteristic(CHARACTERISTIC_UUID,BLERead | BLENotify, 20  // 可读+通知权限,最大20字节
);unsigned long lastUpdate = 0;
const unsigned long updateInterval = 1000; // 1秒更新间隔
int counter = 0;void setup() {Serial.begin(9600);// BLE初始化if (!BLE.begin()) {Serial.println("Failed to initialize BLE!");while (true);}// 设置设备标识(必须唯一)BLE.setDeviceName("YourSensorNode");BLE.setLocalName("YourSensorNode");// 配置服务与特征BLE.setAdvertisedService(customService);customService.addCharacteristic(readOnlyCharacteristic);BLE.addService(customService);// 设置初始值readOnlyCharacteristic.writeValue("Init");// 开始广播BLE.advertise();Serial.println("BLE advertising with periodic updates...");
}void loop() {BLE.poll();  // 必须调用,处理BLE事件// 定期更新数据if (millis() - lastUpdate > updateInterval) {lastUpdate = millis();counter++;// 生成新数据(中替换为传感器数据)String newValue = "Update #" + String(counter);readOnlyCharacteristic.writeValue(newValue.c_str());Serial.print("Updated: ");Serial.println(newValue);}
}

2. 改造应用

// 替换数据生成部分为传感器读取
void updateSensorData() {float temperature = dht.readTemperature();float humidity = dht.readHumidity();// JSON格式数据String sensorData = "{\"temp\":" + String(temperature) + ",\"humidity\":" + String(humidity) + "}";readOnlyCharacteristic.writeValue(sensorData.c_str());Serial.println("Sensor data updated: " + sensorData);
}

三、树莓派 Python BLE 客户端详解

1. bleak_scan.py - BLE设备扫描

# 作用:扫描周围BLE设备
# 应用:发现Arduino设备import asyncio
from bleak import BleakScannerasync def scan_ble_devices():print("Scanning for BLE devices...")devices = await BleakScanner.discover(timeout=5.0)if not devices:print("No BLE devices found.")else:print("Found devices:")for d in devices:print(f"{d.address} - {d.name}")# 运行扫描
asyncio.run(scan_ble_devices())

2. bleak_name_conn.py - 按名称连接

# 作用:通过设备名称连接并读取特征
# 应用:连接特定Arduino设备import asyncio
from bleak import BleakScanner, BleakClient# 目标设备名称片段
TARGET_NAME_FRAGMENT = "ArduinoSensor"  # 改为你的设备名# 特征UUID(必须与Arduino一致)
CHARACTERISTIC_UUID = "19B10001-E8F2-537E-4F6C-D104768A1214"async def scan_and_connect():print("Scanning for BLE devices...")devices = await BleakScanner.discover(timeout=5.0)# 查找目标设备target_device = Nonefor d in devices:if d.name and TARGET_NAME_FRAGMENT in d.name:target_device = dbreakif not target_device:print(f"No device found with name containing '{TARGET_NAME_FRAGMENT}'.")returnprint(f"Found target: {target_device.name} ({target_device.address})")# 连接并读取数据async with BleakClient(target_device.address) as client:if client.is_connected:print("Connected successfully.")try:# 读取特征值value = await client.read_gatt_char(CHARACTERISTIC_UUID)print(f"Raw value: {value}")# 尝试解码为字符串try:decoded_value = value.decode('utf-8')print("Decoded value:", decoded_value)except:print("Value is not UTF-8 string")except Exception as e:print("Failed to read characteristic:", e)else:print("Failed to connect.")asyncio.run(scan_and_connect())

3. bleak_rec_notify.py - 接收通知

# 作用:订阅特征通知并实时接收数据
# 应用:实时接收传感器数据import asyncio
from bleak import BleakScanner, BleakClient# 目标设备名称片段
TARGET_NAME_FRAGMENT = "ArduinoSensor"# 特征UUID
CHARACTERISTIC_UUID = "19B10001-E8F2-537E-4F6C-D104768A1214"# 通知处理函数
def handle_notification(sender, data):print(f"[Notification] From {sender}: {data}")try:# 解码JSON数据(需要)decoded_data = data.decode('utf-8')print("Decoded:", decoded_data)# 在这里添加MQTT发布逻辑# mqtt_client.publish("sensors/dht", decoded_data)except Exception as e:print("Decoding error:", e)async def scan_connect_and_subscribe():print("Scanning for devices...")devices = await BleakScanner.discover(timeout=5.0)# 查找设备target_device = Nonefor d in devices:if d.name and TARGET_NAME_FRAGMENT in d.name:target_device = dbreakif not target_device:print(f"No device found with name containing '{TARGET_NAME_FRAGMENT}'.")returnprint(f"Found device: {target_device.name} ({target_device.address})")# 连接并订阅通知async with BleakClient(target_device.address) as client:if client.is_connected:print("Connected successfully.")try:# 启动通知订阅await client.start_notify(CHARACTERISTIC_UUID, handle_notification)print("Subscribed to notifications. Press Ctrl+C to stop.")# 保持连接,持续接收数据while True:await asyncio.sleep(1)except Exception as e:print("Failed to subscribe:", e)else:print("Failed to connect.")try:asyncio.run(scan_connect_and_subscribe())
except KeyboardInterrupt:print("Program stopped by user.")

4. bleak_name_write.py - 写入特征值

# 作用:向特征写入数据
# 应用:发送控制命令到Arduinoimport asyncio
from bleak import BleakClient# 设备地址(通过扫描获取)
DEVICE_ADDRESS = "ABC6F8AB-50BE-AEA6-DD14-83B9340D3C96"# 特征UUID
CHARACTERISTIC_UUID = "19B10001-E8F2-537E-4F6C-D104768A1214"async def control_device():async with BleakClient(DEVICE_ADDRESS) as client:print(f"Connected: {client.is_connected}")try:# 读取当前值value = await client.read_gatt_char(CHARACTERISTIC_UUID)print(f"Current value: {value.decode('utf-8')}")# 写入控制命令command = "LED_ON"  # 改为你的命令await client.write_gatt_char(CHARACTERISTIC_UUID, command.encode('utf-8'))print(f"Sent command: {command}")# 验证写入结果new_value = await client.read_gatt_char(CHARACTERISTIC_UUID)print(f"New value: {new_value.decode('utf-8')}")except Exception as e:print(f"Error: {e}")asyncio.run(control_device())

5. bleak_multiconn_notify.py - 多设备连接

# 作用:同时连接多个设备并接收通知
# 应用:聚合多个传感器数据import asyncio
from bleak import BleakScanner, BleakClient# 目标设备名称关键词
TARGET_NAME_KEYWORDS = ["SensorNode1", "SensorNode2"]# 特征UUID
CHARACTERISTIC_UUID = "19B10001-E8F2-537E-4F6C-D104768A1214"# 设备客户端字典
clients = {}# 创建通知处理器
def make_notification_handler(name):def handler(sender, data):try:decoded_data = data.decode('utf-8')print(f"[{name}] {decoded_data}")# 在这里添加数据聚合逻辑except:print(f"[{name}] {data}")return handler# 连接并订阅单个设备
async def connect_and_subscribe(device, name):client = BleakClient(device.address)await client.connect()print(f"Connected to {name}")# 订阅通知await client.start_notify(CHARACTERISTIC_UUID, make_notification_handler(name))clients[name] = client# 扫描并连接所有匹配设备
async def scan_and_connect_all():print("Scanning for devices...")devices = await BleakScanner.discover(timeout=5.0)tasks = []for d in devices:if d.name:for keyword in TARGET_NAME_KEYWORDS:if keyword in d.name:tasks.append(connect_and_subscribe(d, d.name))breakif not tasks:print("No matching devices found.")returnprint(f"Connecting to {len(tasks)} devices...")await asyncio.gather(*tasks)print("All devices connected. Listening for notifications...")try:# 保持运行,持续接收数据while True:await asyncio.sleep(1)except KeyboardInterrupt:print("Disconnecting...")for client in clients.values():await client.disconnect()asyncio.run(scan_and_connect_all())

四、在完整应用方案

1. Arduino端改造(传感器数据+命令控制)

// 在BLE_periodic_update基础上改造
#define DHT_CHAR_UUID    "19B10001-E8F2-537E-4F6C-D104768A1214"  // 传感器数据
#define CMD_CHAR_UUID    "19B10002-E8F2-537E-4F6C-D104768A1214"  // 命令控制// 添加命令特征
BLEStringCharacteristic cmdCharacteristic(CMD_CHAR_UUID,BLEWrite,  // 只写权限20
);void setup() {// ...其他初始化代码// 添加命令特征customService.addCharacteristic(cmdCharacteristic);// 设置初始命令值cmdCharacteristic.writeValue("READY");
}void loop() {BLE.poll();// 处理命令if (cmdCharacteristic.written()) {String command = cmdCharacteristic.value();processCommand(command);}// 定期发送传感器数据if (millis() - lastUpdate > updateInterval) {updateSensorData();lastUpdate = millis();}
}void processCommand(String command) {if (command == "LED_ON") {digitalWrite(LED_PIN, HIGH);Serial.println("LED turned ON");} else if (command == "LED_OFF") {digitalWrite(LED_PIN, LOW);Serial.println("LED turned OFF");}
}

2. 树莓派端完整方案(BLE + MQTT)

# bleread_mqttpub.py - 整合BLE读取和MQTT发布
import asyncio
from bleak import BleakScanner, BleakClient
import paho.mqtt.client as mqtt
import json# BLE配置
TARGET_NAME = "ArduinoSensor"
SENSOR_CHAR_UUID = "19B10001-E8F2-537E-4F6C-D104768A1214"# MQTT配置
MQTT_HOST = "your-ec2-ip"
MQTT_TOPIC = "ifn649/sensors/dht"# MQTT客户端初始化
mqtt_client = mqtt.Client()
mqtt_client.connect(MQTT_HOST, 1883)def on_notification(sender, data):try:# 解析传感器数据sensor_data = data.decode('utf-8')print(f"Received: {sensor_data}")# 发布到MQTTmqtt_client.publish(MQTT_TOPIC, sensor_data)print(f"Published to MQTT: {sensor_data}")except Exception as e:print(f"Error processing data: {e}")async def main():# 扫描并连接设备devices = await BleakScanner.discover()target_device = Nonefor d in devices:if d.name and TARGET_NAME in d.name:target_device = dbreakif not target_device:print("Target device not found")return# 连接并订阅通知async with BleakClient(target_device.address) as client:await client.start_notify(SENSOR_CHAR_UUID, on_notification)print("Connected and subscribed. Press Ctrl+C to stop.")# 保持运行while True:await asyncio.sleep(1)# 运行主程序
try:asyncio.run(main())
except KeyboardInterrupt:print("Program stopped")mqtt_client.disconnect()

3. 命令控制端(MQTT订阅 + BLE写入)

# mqttsub_blewrite.py - MQTT订阅转BLE写入
import asyncio
from bleak import BleakScanner, BleakClient
import paho.mqtt.client as mqtt# BLE配置
TARGET_NAME = "ArduinoSensor"
CMD_CHAR_UUID = "19B10002-E8F2-537E-4F6C-D104768A1214"# MQTT配置
MQTT_HOST = "your-ec2-ip"
MQTT_TOPIC = "ifn649/commands/led"# BLE设备地址缓存
ble_device_address = Noneasync def ble_write_command(command):if not ble_device_address:print("BLE device not found")returntry:async with BleakClient(ble_device_address) as client:await client.write_gatt_char(CMD_CHAR_UUID, command.encode('utf-8'))print(f"Command sent: {command}")except Exception as e:print(f"BLE write error: {e}")def on_mqtt_message(client, userdata, msg):command = msg.payload.decode('utf-8')print(f"MQTT command received: {command}")# 异步执行BLE写入asyncio.create_task(ble_write_command(command))async def discover_ble_device():global ble_device_addressdevices = await BleakScanner.discover()for d in devices:if d.name and TARGET_NAME in d.name:ble_device_address = d.addressprint(f"Found BLE device: {d.name} ({d.address})")return Trueprint("BLE device not found")return Falseasync def main():# 发现BLE设备if not await discover_ble_device():return# 设置MQTT客户端mqtt_client = mqtt.Client()mqtt_client.on_message = on_mqtt_messagemqtt_client.connect(MQTT_HOST, 1883)mqtt_client.subscribe(MQTT_TOPIC)print("MQTT subscriber started. Waiting for commands...")mqtt_client.loop_forever()# 启动程序
asyncio.run(main())

五、调试与最佳实践

1. 调试技巧

# 启用详细日志
import logging
logging.basicConfig(level=logging.DEBUG)# 异常处理
try:await client.read_gatt_char(CHARACTERISTIC_UUID)
except Exception as e:print(f"Error: {e}")# 重连逻辑

2. 性能优化

# 使用连接池管理多个设备连接
# 设置合理的超时时间
# 使用异步编程避免阻塞

3. 错误恢复

# 自动重连机制
async def resilient_connect(address, max_retries=3):for attempt in range(max_retries):try:async with BleakClient(address) as client:return clientexcept Exception as e:print(f"Connection attempt {attempt+1} failed: {e}")await asyncio.sleep(2)return None
http://www.dtcms.com/a/358490.html

相关文章:

  • 常量指针与指针常量习题(一)
  • Swift 解法详解:LeetCode 367《有效的完全平方数》
  • Notepad++使用技巧1
  • 2025-08-18面试题(nginx,mysql,zabbix为主)
  • C#正则表达式与用法
  • unity学习——视觉小说开发(二)
  • JsMind 常用配置项
  • Qt中的锁(1)
  • AFSIM仿真工具介绍与源码编译
  • Isaac Lab Newton 人形机器人强化学习 Sim2Real 训练与部署
  • uniapp监听物理返回按钮事件
  • 软考 系统架构设计师系列知识点之杂项集萃(136)
  • 将 Logits 得分转换为概率,如何计算
  • SRE命令行兵器谱之三:grep - 日志海洋中的“精确制导”
  • JavaWeb前端06(ElementPlus快速构建网页)
  • IDM手机端,速度能提高6倍!
  • 消息队列核心技术解析与应用场景
  • JAVA EE初阶 4:文件操作和IO
  • 使用 SVM(支持向量机)进行图像分类:从读取图像到训练与分类的完整流程
  • Python API接口实战指南:从入门到精通
  • HarmonyOS三方库的使用
  • Java SpringAI应用开发面试全流程解析:RAG、流式推理与企业落地
  • 【Java工程师面试全攻略】Day13:云原生架构与Service Mesh深度解析
  • 防火墙技术(二):安全区域
  • 【Linux】系统部分——软硬链接动静态库的使用
  • Tomcat 企业级运维实战系列(四):Tomcat 企业级监控
  • 每日Java并发面试系列(5):基础篇(线程池的核心原理是什么、线程池大小设置为多少更合适、线程池哪几种类型?ThreadLocal为什么会导致内存泄漏?)
  • Tomcat 企业级运维实战系列(三):Tomcat 配置解析与集群化部署
  • Qt实战:如何打开摄像头并实现视频的实时预览
  • 生成式 AI 重构内容生产:效率提升背后的创作版权边界争议