Python 物联网(IoT)与边缘计算开发实战(1)
Python 物联网(IoT)与边缘计算开发实战
https://www.python.org/static/community_logos/python-logo-master-v3-TM.png
物联网基础与硬件交互
Raspberry Pi GPIO控制
python
import RPi.GPIO as GPIO
import time
# 设置GPIO模式
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
# 定义引脚
LED_PIN = 17
BUTTON_PIN = 18
# 初始化引脚
GPIO.setup(LED_PIN, GPIO.OUT)
GPIO.setup(BUTTON_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP)
def blink_led(times, speed=0.5):
"""LED闪烁效果"""
for _ in range(times):
GPIO.output(LED_PIN, GPIO.HIGH)
time.sleep(speed)
GPIO.output(LED_PIN, GPIO.LOW)
time.sleep(speed)
try:
print("按下按钮控制LED (Ctrl+C退出)")
while True:
if GPIO.input(BUTTON_PIN) == GPIO.LOW:
print("按钮按下 - LED闪烁")
blink_led(3, 0.3)
time.sleep(0.5) # 防抖延迟
finally:
GPIO.cleanup() # 清理GPIO设置
https://www.raspberrypi.com/documentation/computers/images/GPIO-Pinout-Diagram-2.png
传感器数据采集
python
import Adafruit_DHT
import time
# 设置传感器类型和引脚
DHT_SENSOR = Adafruit_DHT.DHT22
DHT_PIN = 4
def read_sensor():
"""读取温湿度传感器数据"""
humidity, temperature = Adafruit_DHT.read_retry(DHT_SENSOR, DHT_PIN)
if humidity is not None and temperature is not None:
return {
'temperature': round(temperature, 1),
'humidity': round(humidity, 1),
'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
}
else:
print("传感器读取失败!")
return None
# 每5秒读取一次数据
while True:
sensor_data = read_sensor()
if sensor_data:
print(f"温度: {sensor_data['temperature']}°C, 湿度: {sensor_data['humidity']}%")
time.sleep(5)
物联网通信协议
MQTT协议实现
python
import paho.mqtt.client as mqtt
import json
import time
# MQTT配置
MQTT_BROKER = "broker.example.com"
MQTT_PORT = 1883
MQTT_TOPIC_PUB = "sensor/data"
MQTT_TOPIC_SUB = "sensor/control"
CLIENT_ID = "python-iot-client"
# 传感器模拟数据
def get_sensor_data():
return {
"device_id": CLIENT_ID,
"temperature": 25.5 + (time.time() % 3),
"humidity": 50 + (time.time() % 10),
"timestamp": int(time.time())
}
# 连接回调
def on_connect(client, userdata, flags, rc):
print(f"连接MQTT服务器,返回码: {rc}")
client.subscribe(MQTT_TOPIC_SUB)
# 消息接收回调
def on_message(client, userdata, msg):
payload = msg.payload.decode()
print(f"收到消息 [{msg.topic}]: {payload}")
try:
command = json.loads(payload)
if command.get('action') == 'reboot':
print("执行重启指令...")
# 这里添加重启逻辑
except json.JSONDecodeError:
print("无效的JSON消息")
# 创建MQTT客户端
client = mqtt.Client(CLIENT_ID)
client.on_connect = on_connect
client.on_message = on_message
# 连接并启动循环
client.connect(MQTT_BROKER, MQTT_PORT, 60)
client.loop_start()
try:
while True:
# 发布传感器数据
sensor_data = get_sensor_data()
client.publish(MQTT_TOPIC_PUB, json.dumps(sensor_data))
print(f"发布数据: {sensor_data}")
time.sleep(10)
except KeyboardInterrupt:
print("断开连接...")
client.loop_stop()
client.disconnect()
https://mqtt.org/assets/img/mqtt-publish-subscribe.png
CoAP协议实现
python
from aiocoap import *
import asyncio
import time
async def coap_server():
"""CoAP服务器实现"""
protocol = await Context.create_server_context(CoAPServer())
# 持续运行
print("CoAP服务器启动...")
await asyncio.get_running_loop().create_future()
class CoAPServer(Resource):
"""CoAP资源处理"""
def __init__(self):
super().__init__()
self.sensor_data = {
"temperature": 25.0,
"humidity": 50.0
}
async def render_get(self, request):
"""处理GET请求"""
self.update_sensor_data()
payload = json.dumps(self.sensor_data).encode('utf-8')
return Message(payload=payload)
async def render_post(self, request):
"""处理POST请求"""
try:
payload = json.loads(request.payload.decode('utf-8'))
if 'set_temp' in payload:
self.sensor_data['temperature'] = payload['set_temp']
if 'set_humidity' in payload:
self.sensor_data['humidity'] = payload['set_humidity']
return Message(code=CHANGED,
payload=b"Settings updated")
except:
return Message(code=BAD_REQUEST,
payload=b"Invalid request")
def update_sensor_data(self):
"""更新传感器数据(模拟)"""
self.sensor_data = {
"temperature": 25.0 + (time.time() % 3),
"humidity": 50.0 + (time.time() % 10),
"timestamp": int(time.time())
}
async def coap_client():
"""CoAP客户端实现"""
protocol = await Context.create_client_context()
# 获取数据
request = Message(code=GET, uri='coap://localhost/sensor')
try:
response = await protocol.request(request).response
print(f"收到响应: {response.payload.decode()}")
except Exception as e:
print(f"请求失败: {e}")
# 设置数据
payload = {"set_temp": 26.5, "set_humidity": 55.0}
request = Message(code=POST,
payload=json.dumps(payload).encode(),
uri='coap://localhost/sensor')
try:
response = await protocol.request(request).response
print(f"设置响应: {response.payload.decode()}")
except Exception as e:
print(f"设置失败: {e}")
# 运行示例
async def main():
server_task = asyncio.create_task(coap_server())
await asyncio.sleep(1) # 等待服务器启动
await coap_client()
server_task.cancel()
asyncio.run(main())
边缘计算框架
使用MicroPython
python
# ESP32 MicroPython示例
import machine
import network
import urequests
import ujson
from time import sleep
# 配置WiFi
WIFI_SSID = "your_wifi"
WIFI_PASS = "your_password"
def connect_wifi():
sta_if = network.WLAN(network.STA_IF)
if not sta_if.isconnected():
print("连接WiFi...")
sta_if.active(True)
sta_if.connect(WIFI_SSID, WIFI_PASS)
while not sta_if.isconnected():
pass
print("网络配置:", sta_if.ifconfig())
# 读取传感器(模拟)
def read_sensor():
return {
"temp": 25 + machine.rng() % 5,
"humidity": 50 + machine.rng() % 10
}
# 边缘计算处理
def process_data(data):
# 简单异常检测
if data['temp'] > 30 or data['humidity'] > 80:
data['alert'] = True
else:
data['alert'] = False
return data
# 主循环
connect_wifi()
while True:
sensor_data = read_sensor()
processed_data = process_data(sensor_data)
if processed_data['alert']:
print("警报状态! 发送数据...")
response = urequests.post(
"api.example.com/alerts",
json=processed_data,
headers={'Content-Type': 'application/json'}
)
print("响应:", response.text)
response.close()
sleep(60) # 每分钟检查一次
使用EdgeX Foundry
python
import requests
import json
import time
# EdgeX配置
EDGEX_URL = "localhost:48080/api/v1"
DEVICE_NAME = "temperature-sensor"
def register_device():
"""注册设备到EdgeX"""
device = {
"name": DEVICE_NAME,
"description": "Python IoT温度传感器",
"adminState": "UNLOCKED",
"operatingState": "ENABLED",
"protocols": {
"other": {
"Address": "virtual01",
"Protocol": "300"
}
}
}
response = requests.post(
f"{EDGEX_URL}/device",
json=device,
headers={"Content-Type": "application/json"}
)
return response.json()
def send_reading(value):
"""发送传感器读数"""
reading = {
"device": DEVICE_NAME,
"readings": [
{
"name": "Temperature",
"value": str(value)
}
]
}
response = requests.post(
f"{EDGEX_URL}/reading",
json=reading,
headers={"Content-Type": "application/json"}
)
return response.status_code == 200
# 模拟设备运行
print("注册设备...")
register_device()
print("开始发送传感器数据...")
while True:
temp = 20 + (time.time() % 10) # 模拟温度波动
if send_reading(temp):
print(f"发送温度数据: {temp}°C")
else:
print("发送数据失败")
time.sleep(5)
https://docs.edgexfoundry.org/1.3/_images/EdgeX_arch.png
物联网数据处理
实时数据流处理
python
import pyarrow.flight as flight
import pandas as pd
import numpy as np
class FlightServer(flight.FlightServerBase):
"""Arrow Flight服务器实现"""
def __init__(self, location):
super().__init__(location)
self.data = pd.DataFrame({
'timestamp': pd.date_range('2023-01-01', periods=100, freq='s'),
'value': np.random.randn(100)
})
def do_get(self, context, ticket):
"""处理数据获取请求"""
df = self.data[self.data['timestamp'] > pd.Timestamp.now() - pd.Timedelta('1min')]
table = pa.Table.from_pandas(df)
return flight.RecordBatchStream(table)
class DataProcessor:
"""实时数据处理"""
def __init__(self, server_url):
self.client = flight.FlightClient(server_url)
def process_stream(self):
"""处理实时数据流"""
while True:
try:
# 获取最近1分钟数据
descriptor = flight.FlightDescriptor.for_command(b"latest_data")
flight_info = self.client.get_flight_info(descriptor)
for endpoint in flight_info.endpoints:
for location in endpoint.locations:
reader = self.client.do_get(location.ticket)
batch = reader.read_all()
df = batch.to_pandas()
# 实时分析
if not df.empty:
avg = df['value'].mean()
max_val = df['value'].max()
print(f"平均值: {avg:.2f}, 最大值: {max_val:.2f}")
except Exception as e:
print(f"处理错误: {e}")
time.sleep(10)
# 启动服务器
server = FlightServer("grpc://0.0.0.0:8815")
server_thread = threading.Thread(target=server.serve)
server_thread.start()
# 启动客户端处理
processor = DataProcessor("grpc://localhost:8815")
processor.process_stream()
时序数据库集成
python
import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS
class InfluxDBManager:
"""InfluxDB时序数据库管理"""
def __init__(self, url, token, org, bucket):
self.client = influxdb_client.InfluxDBClient(
url=url,
token=token,
org=org
)
self.bucket = bucket
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
self.query_api = self.client.query_api()
def write_data(self, measurement, tags, fields):
"""写入数据点"""
point = influxdb_client.Point(measurement)
# 添加标签
for tag_key, tag_value in tags.items():
point.tag(tag_key, tag_value)
# 添加字段
for field_key, field_value in fields.items():
point.field(field_key, field_value)
# 写入数据库
self.write_api.write(bucket=self.bucket, record=point)
def query_data(self, query):
"""查询数据"""
result = self.query_api.query(query)
records = []
for table in result:
for record in table.records:
records.append({
'time': record.get_time(),
'measurement': record.get_measurement(),
**record.values
})
return records
# 使用示例
influx_mgr = InfluxDBManager(
url="localhost:8086",
token="your-token",
org="your-org",
bucket="iot-data"
)
# 写入传感器数据
influx_mgr.write_data(
measurement="temperature",
tags={"location": "room1", "device": "sensor1"},
fields={"value": 25.3}
)
# 查询最近1小时数据
query = """
from(bucket: "iot-data")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "temperature")
"""
data = influx_mgr.query_data(query)
print("查询结果:", data)
物联网安全
设备认证与加密
python
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.backends import default_backend
import os
class IoTDeviceSecurity:
"""物联网设备安全类"""
def __init__(self):
# 生成ECDSA密钥对
self.private_key = ec.generate_private_key(
ec.SECP256R1(), default_backend()
)
self.public_key = self.private_key.public_key()
# 预共享密钥(实际应用中应从安全存储获取)
self.shared_secret = os.urandom(32)
def get_public_key_pem(self):
"""获取PEM格式的公钥"""
return self.public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
def sign_data(self, data):
"""签名数据"""
if isinstance(data, str):
data = data.encode('utf-8')
signature = self.private_key.sign(
data,
ec.ECDSA(hashes.SHA256())
)
return signature
def verify_signature(self, public_key_pem, data, signature):
"""验证签名"""
public_key = serialization.load_pem_public_key(
public_key_pem,
backend=default_backend()
)
if isinstance(data, str):
data = data.encode('utf-8')
try:
public_key.verify(
signature,
data,
ec.ECDSA(hashes.SHA256())
)
return True
except:
return False
def derive_session_key(self, peer_public_key_pem):
"""派生会话密钥"""
peer_public_key = serialization.load_pem_public_key(
peer_public_key_pem,
backend=default_backend()
)
shared_key = self.private_key.exchange(
ec.ECDH(), peer_public_key
)
# 使用HKDF派生密钥
derived_key = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=None,
info=b'session-key',
backend=default_backend()
).derive(shared_key)
return derived_key
# 使用示例
device1 = IoTDeviceSecurity()
device2 = IoTDeviceSecurity()
# 交换公钥
device1_pubkey = device1.get_public_key_pem()
device2_pubkey = device2.get_public_key_pem()
# 派生会话密钥
session_key1 = device1.derive_session_key(device2_pubkey)
session_key2 = device2.derive_session_key(device1_pubkey)
print("会话密钥匹配:", session_key1 == session_key2)
安全固件更新
python
import hashlib
import hmac
import requests
import tempfile
import subprocess
class SecureFirmwareUpdater:
"""安全固件更新"""
def __init__(self, device_id, secret_key, update_server):
self.device_id = device_id
self.secret_key = secret_key.encode('utf-8')
self.update_server = update_server
def check_update(self):
"""检查更新"""
# 创建认证签名
nonce = os.urandom(16).hex()
message = f"{self.device_id}:{nonce}".encode('utf-8')
signature = hmac.new(
self.secret_key,
message,
hashlib.sha256
).hexdigest()
# 发送认证请求
response = requests.get(
f"{self.update_server}/check-update",
headers={
"Device-ID": self.device_id,
"Nonce": nonce,
"Signature": signature
}
)
if response.status_code == 200:
return response.json()
else:
print(f"检查更新失败: {response.text}")
return None
def download_firmware(self, version, checksum):
"""下载固件"""
# 创建临时文件
temp_file = tempfile.NamedTemporaryFile(delete=False)
try:
# 流式下载固件
response = requests.get(
f"{self.update_server}/firmware/{version}",
stream=True
)
# 计算下载文件的哈希
sha256 = hashlib.sha256()
for chunk in response.iter_content(chunk_size=8192):
temp_file.write(chunk)
sha256.update(chunk)
temp_file.close()
# 验证校验和
if sha256.hexdigest() != checksum:
os.unlink(temp_file.name)
raise ValueError("固件校验和不匹配")
return temp_file.name
except:
os.unlink(temp_file.name)
raise
def apply_update(self, firmware_path):
"""应用更新"""
# 验证固件签名(示例)
if not self.verify_firmware(firmware_path):
raise ValueError("固件签名验证失败")
# 执行更新脚本(实际实现依平台而定)
result = subprocess.run(
["/bin/sh", firmware_path],
capture_output=True,
text=True
)
if result.returncode != 0:
print(f"更新失败: {result.stderr}")
return False
print("固件更新成功!")
return True
def verify_firmware(self, firmware_path):
"""验证固件签名"""
# 这里应实现实际的签名验证逻辑
# 示例中仅检查文件大小
return os.path.getsize(firmware_path) > 0
# 使用示例
updater = SecureFirmwareUpdater(
device_id="device-123",
secret_key="your-secret-key",
update_server="firmware.example.com"
)
update_info = updater.check_update()
if update_info and update_info['available']:
print(f"发现新版本: {update_info['version']}")
try:
firmware_path = updater.download_firmware(
update_info['version'],
update_info['checksum']
)
if updater.apply_update(firmware_path):
print("设备需要重启以完成更新")
except Exception as e:
print(f"更新失败: {str(e)}")
else:
print("设备固件已是最新")
物联网可视化
实时仪表盘
python
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import plotly.graph_objs as go
import pandas as pd
import random
from datetime import datetime
# 创建Dash应用
app = dash.Dash(__name__)
# 初始数据
initial_data = pd.DataFrame({
'timestamp': [datetime.now()],
'temperature': [25.0],
'humidity': [50.0]
})
# 应用布局
app.layout = html.Div([
html.H1("物联网设备监控仪表盘"),
dcc.Interval(
id='interval-component',
interval=5*1000, # 5秒
n_intervals=0
),
html.Div([
html.Div([
dcc.Graph(id='temp-gauge'),
html.H3("当前温度", style={'text-align': 'center'})
], className="six columns"),
html.Div([
dcc.Graph(id='humidity-gauge'),
html.H3("当前湿度", style={'text-align': 'center'})
], className="six columns")
], className="row"),
dcc.Graph(id='temp-trend'),
html.Div(id='alerts-container')
], className="container")
# 回调函数 - 更新数据
@app.callback(
Output('temp-gauge', 'figure'),
Output('humidity-gauge', 'figure'),
Output('temp-trend', 'figure'),
Output('alerts-container', 'children'),
Input('interval-component', 'n_intervals')
)
def update_metrics(n):
# 模拟新数据
new_data = {
'timestamp': datetime.now(),
'temperature': 25 + random.uniform(-2, 2),
'humidity': 50 + random.uniform(-5, 5)
}
# 更新数据集
global initial_data
initial_data = initial_data.append(new_data, ignore_index=True)
# 温度仪表
temp_gauge = go.Figure(go.Indicator(
mode="gauge+number",
value=new_data['temperature'],
domain={'x': [0, 1], 'y': [0, 1]},
gauge={
'axis': {'range': [None, 40]},
'steps': [
{'range': [0, 20], 'color': "lightgray"},
{'range': [20, 30], 'color': "gray"},
{'range': [30, 40], 'color': "red"}
],
'threshold': {
'line': {'color': "black", 'width': 4},
'thickness': 0.75,
'value': 30
}
}
))
temp_gauge.update_layout(height=300, margin=dict(t=0, b=0))
# 湿度仪表
humidity_gauge = go.Figure(go.Indicator(
mode="gauge+number",
value=new_data['humidity'],
domain={'x': [0, 1], 'y': [0, 1]},
gauge={
'axis': {'range': [0, 100]},
'steps': [
{'range': [0, 30], 'color': "red"},
{'range': [30, 70], 'color': "lightgray"},
{'range': [70, 100], 'color': "blue"}
],
'threshold': {
'line': {'color': "black", 'width': 4},
'thickness': 0.75,
'value': 70
}
}
))
humidity_gauge.update_layout(height=300, margin=dict(t=0, b=0))
# 温度趋势图
temp_trend = go.Figure()
temp_trend.add_trace(go.Scatter(
x=initial_data['timestamp'],
y=initial_data['temperature'],
name='温度',
line=dict(color='red', width=2)
))
temp_trend.add_trace(go.Scatter(
x=initial_data['timestamp'],
y=initial_data['humidity'],
name='湿度',
yaxis='y2',
line=dict(color='blue', width=2)
))
temp_trend.update_layout(
yaxis=dict(title='温度 (°C)'),
yaxis2=dict(
title='湿度 (%)',
overlaying='y',
side='right'
),
hovermode="x unified"
)
# 警报信息
alerts = []
if new_data['temperature'] > 28:
alerts.append(html.Div(
f"高温警报! 当前温度: {new_data['temperature']:.1f}°C",
style={
'color': 'white',
'background': 'red',
'padding': '10px',
'margin': '10px 0',
'border-radius': '5px'
}
))
if new_data['humidity'] > 70:
alerts.append(html.Div(
f"高湿度警报! 当前湿度: {new_data['humidity']:.1f}%",
style={
'color': 'white',
'background': 'blue',
'padding': '10px',
'margin': '10px 0',
'border-radius': '5px'
}
))
return temp_gauge, humidity_gauge, temp_trend, alerts
# 运行应用
if __name__ == '__main__':
app.run_server(debug=True, host='0.0.0.0')
https://plotly.com/python/static/images/dash-dashboards/iot-dashboard.png
结语与学习路径
https://www.python.org/static/community_logos/python-logo-master-v3-TM.png
通过这十一篇系列教程,你已经掌握了:
物联网硬件交互与传感器集成
物联网通信协议(MQTT/CoAP)
边缘计算框架与应用
物联网数据处理与分析
物联网安全实践
实时可视化仪表盘开发
进阶学习方向:
专业领域深入:
工业物联网(IIoT)平台开发
智能家居系统集成
智慧城市解决方案
技术栈扩展:
5G与物联网融合应用
AIoT(人工智能物联网)开发
数字孪生技术实现
认证体系:
AWS IoT认证
Cisco IoT认证
工业物联网专业认证
开源贡献:
参与主流IoT框架开发
贡献边缘计算项目
开发物联网安全工具
Python在物联网领域的应用前景广阔,持续探索和实践将助你成为这一变革性技术的引领者!