用ai来写一个CO2传感器检测
1、目标
打算做一个二氧化碳检测,买的传感器是非分散红外吸收法(NDIR)。
硬件,esp-32S v1.1,是一个nodemcu。软件系统,micropython
- wifi
- webserver: microdot框架
- mqtt: GLM 4.6 socket实现非阻塞
MicroPython v1.26.1 on 2025-09-11; Generic ESP32 module with ESP32 Type "help()" for more information.

MPY: soft reboot
自动连接失败,启动配置模式
AP模式已启动,SSID: ESP3230aea41c6d19, IP: 192.168.4.1
[MQTT] Attempting to connect...
[MQTT] Resolving hostname: 172.29.168.230...
[MQTT] Connecting to 172.29.168.230:1883...
启动Microdot服务,访问: http://192.168.4.1
Starting async server on 0.0.0.0:80...
main: MQTT loop
[MQTT DEBUG] Sending CONNECT packet (hex): 00044d5154540402003c0006657370313233
response:b' \x02\x00\x00'
[MQTT] Connected to broker.
main: MQTT loop
[MQTT] Successfully subscribed to topic: demo2/led
订阅成功
[Main] Publishing: 'Hello from ESP32: 0'
[MQTT Callback] Received message '{"msg": "ON"
}' on topic 'demo2/led'
Command received: Turn ON LED
main: MQTT loop
[Main] Publishing: 'Hello from ESP32: 1'
main: MQTT loop
2、wifi连接
import network
import socket
import json
import os
import uasyncio# WiFi配置存储文件
CONFIG_FILE = 'wifi_config.json'class WiFiConfigEnhanced:def __init__(self):self.ap_ssid = "ESP32"self.ap_password = "12345678"self.sta_if = network.WLAN(network.STA_IF)self.ap_if = network.WLAN(network.AP_IF)self.ip = ""def start_ap(self):"""启动AP模式"""self.ap_if.active(True)self.ap_ssid=self.ap_ssid+''.join([f'{i:02x}' for i in self.ap_if.config('mac')])self.ap_if.config(essid=self.ap_ssid, password=self.ap_password, authmode=network.AUTH_WPA_WPA2_PSK)print(f"AP模式已启动,SSID: {self.ap_ssid}, IP: {self.ap_if.ifconfig()[0]}")return self.ap_if.ifconfig()[0]async def connect_sta(self, ssid, password):"""连接STA模式"""self.sta_if.active(True)self.sta_if.connect(ssid, password)# 等待连接for i in range(20):if self.sta_if.isconnected():print(f"已连接到WiFi: {ssid}")print(f"IP地址: {self.sta_if.ifconfig()[0]}")#self.save_config(ssid, password)self.ip=self.sta_if.ifconfig()[0]return Trueawait uasyncio.sleep(1)return Falsedef save_config(self, ssid, password):"""保存WiFi配置到文件"""config = {"ssid": ssid, "password": password}with open(CONFIG_FILE, 'w') as f:json.dump(config, f)print("WiFi配置已保存")def load_config(self):"""从文件加载WiFi配置"""try:with open(CONFIG_FILE, 'r') as f:config = json.load(f)return config.get("ssid"), config.get("password")except:return None, Nonedef try_auto_connect(self):"""尝试自动连接已保存的WiFi"""ssid, password = self.load_config()if ssid and password:print(f"尝试自动连接: {ssid}")return self.connect_sta(ssid, password)return Falsedef scan_networks(self):"""扫描可用WiFi网络"""self.sta_if.active(True)networks = self.sta_if.scan()result = []for net in networks:result.append({'ssid': net[0].decode('utf-8'),'signal': net[3],'security': net[4]})return resultdef get_ip(self):"""获得的ip"""return self.ip
3、web服务
使用7zip压缩css,js文件,文件尺寸缩小明显。
注意send_filed选项 compressed,microdot会自动生成正确的header.
from microdot import Microdot, send_file
from websocket import with_websocket
import network
import json
import uasyncioclass WebServerMicrodot:def __init__(self, wifi_config,mqtt_client):self.wifi_config = wifi_configself.mqtt_client=mqtt_clientself.app = Microdot()self.setup_routes()def setup_routes(self):"""设置路由"""@self.app.route('/')async def index(request):return send_file('index.html')@self.app.route('/milligram.min.css')async def style_css(request):return send_file('milligram.min.css.gz', compressed=True)@self.app.route('/zepto.min.js')async def script_js(request):return send_file('zepto.min.js.gz', compressed=True )@self.app.route('/config', methods=['POST'])async def handle_config(request):try:config = request.jsonssid = config.get('ssid', '')password = config.get('password', '')success = await self.wifi_config.connect_sta(ssid, password)if(success):sta_if = network.WLAN(network.STA_IF)return {'success': True, 'ip': sta_if.ifconfig()}, 200else:return {'success': False}, 200except Exception as e:return {'success': False, 'error': str(e)}, 400@self.app.route('/status')async def get_status(request):"""获取连接状态"""sta_if = network.WLAN(network.STA_IF)status = {'sta_connected': sta_if.isconnected(),'sta_config': sta_if.ifconfig() if sta_if.isconnected() else None,'ap_active': self.wifi_config.ap_if.active(),'ap_config': self.wifi_config.ap_if.ifconfig() if self.wifi_config.ap_if.active() else None}return status@self.app.errorhandler(404)async def not_found(request):return '页面未找到', 404@self.app.route('/connect_mqtt', methods=['POST'])async def connect_mqtt(request):"""手动连接mqtt broker"""self.mqtt_client.connect()return '', 200@self.app.route('/ws')@with_websocketasync def read_data(request, ws):""" websocket IO """while True:ip = self.wifi_config.get_ip()# sendawait ws.send(ip)await uasyncio.sleep_ms(1000)async def start_server(self):"""启动Microdot服务"""print(f"启动Microdot服务,访问: http://{self.wifi_config.ap_if.ifconfig()[0]}")await self.app.start_server(port=80, debug=True)
4、MQTTClient类
wifi未能成功取得IP时,使用umqtt类连接,会阻塞webserver进程。
用llama.cpp+jinx-gpt-oss-20b-mxfp4.gguf,代码进入循环,始终报错。问智谱的GLM 4.6代码,得到能执行的代码。
# mqtt_async.py
import uasyncio as asyncio
import usocket as socket
import ustruct as struct
import utime as time
import urandom as random# MQTT Packet Types
CONNECT = 0x10
CONNACK = 0x20
PUBLISH = 0x30
PUBACK = 0x40
SUBSCRIBE = 0x82
SUBACK = 0x90
PINGREQ = 0xC0
PINGRESP = 0xD0
DISCONNECT = 0xE0class MQTTClient:def __init__(self, client_id, server, port=1883, user=None, password=None, keepalive=60, ssl=False):self.client_id = client_idself.server = serverself.port = portself.user = userself.password = passwordself.keepalive = keepaliveself.ssl = sslself.reader = Noneself.writer = Noneself._lock = asyncio.Lock()self._pid = 0self._message_task = Noneself.is_connected = Falseself._callback = None# Events for QoS 1 publish and subscribeself._puback_events = {}self._suback_events = {}def _next_pid(self):self._pid = (self._pid + 1) & 0xFFFFif self._pid == 0:self._pid = 1return self._piddef _pack_remaining_length(self, length):s = b''while True:byte = length & 0x7Flength >>= 7if length > 0:byte |= 0x80s += struct.pack('B', byte)if length == 0:breakreturn s# --- 新增的健壮读取函数 ---async def _readexactly(self, n):"""Read exactly n bytes from the stream or raise an error."""data = b''while len(data) < n:packet = await self.reader.read(n - len(data))if not packet:# Connection closed before all data was receivedraise OSError("Connection closed unexpectedly")data += packetreturn dataasync def _send_packet(self, cmd, data=b''):await self._lock.acquire()try:pkt = cmd.to_bytes(1, 'big') + self._pack_remaining_length(len(data)) + dataself.writer.write(pkt)await self.writer.drain()finally:self._lock.release()def set_callback(self, f):self._callback = fasync def connect(self):if self.is_connected:returntry:# DNS 查询 (阻塞操作)print(f"[MQTT] Resolving hostname: {self.server}...")addr_info = socket.getaddrinfo(self.server, self.port)if not addr_info:raise OSError("DNS resolution failed")addr = addr_info[0][-1]print(f"[MQTT] Connecting to {addr[0]}:{addr[1]}...")self.reader, self.writer = await asyncio.open_connection(addr[0], addr[1])# Construct CONNECT packetdata = bytearray()data += struct.pack('>H', 4) + b'MQTT' # Protocol Name Length + "MQTT"data += struct.pack('B', 4) # Protocol Level 4flags = 0x02 # Clean Sessionif self.user:flags |= 0x80if self.password:flags |= 0x40data += struct.pack('B', flags)data += struct.pack('>H', self.keepalive) # Keep Alivedata += struct.pack('>H', len(self.client_id)) + self.client_id.encode()if self.user:data += struct.pack('>H', len(self.user)) + self.user.encode()if self.password:data += struct.pack('>H', len(self.password)) + self.password.encode()print(f"[MQTT DEBUG] Sending CONNECT packet (hex): {data.hex()}")await self._send_packet(CONNECT, data)# --- 修改点: 使用 _readexactly ---# Wait for CONNACKresp = await self._readexactly(4)resp_str=str(resp)print(f'response:{resp_str}')if resp[0] != CONNACK:raise OSError(f"Expected CONNACK, got {resp[0]:#x}")if resp[3] != 0:raise OSError(f"Connection failed with code {resp[3]}")self.is_connected = Trueprint("[MQTT] Connected to broker.")# Start the message handling loopif self._message_task is None or self._message_task.done():self._message_task = asyncio.create_task(self._message_loop())except OSError as e:print(f"[MQTT] Connection failed: {e}")self.is_connected = Falseif self.writer:self.writer.close()await self.writer.wait_closed()self.reader = Noneself.writer = Noneraiseasync def disconnect(self):if not self.is_connected:returntry:await self._send_packet(DISCONNECT)finally:self.is_connected = Falseif self.writer:self.writer.close()await self.writer.wait_closed()self.reader = Noneself.writer = Noneif self._message_task:self._message_task.cancel()self._message_task = Noneprint("[MQTT] Disconnected.")# 在 mqtt_async.py 文件中,找到 publish 方法并替换为以下版本:async def publish(self, topic, msg, qos=0, retain=False):if not self.is_connected:raise OSError("Not connected")pid = 0if qos > 0:pid = self._next_pid()self._puback_events[pid] = asyncio.Event()pkt = bytearray()# --- 修改点 1 ---pkt += struct.pack('>H', len(topic)) + topic.encode()if qos > 0:# --- 修改点 2 ---pkt += struct.pack('>H', pid)pkt += msg.encode()cmd = PUBLISH | (qos << 1) | (retain << 0)await self._send_packet(cmd, pkt)if qos > 0:try:await asyncio.wait_for(self._puback_events[pid].wait(), timeout=5)except asyncio.TimeoutError:print(f"[MQTT] PUBACK timeout for PID {pid}")del self._puback_events[pid]raise OSError("Publish QoS 1 failed (timeout)")finally:if pid in self._puback_events:del self._puback_events[pid]# 在 mqtt_async.py 文件中,找到 subscribe 方法并替换为以下版本:async def subscribe(self, topic, qos=1):"""订阅一个主题。Args:topic (str): 要订阅的主题。qos (int): 服务质量等级 (0 或 1)。Returns:bool: 订阅成功返回 True,否则返回 False。"""if not self.is_connected:print("[MQTT] Cannot subscribe, not connected.")return Falsepid = self._next_pid()self._suback_events[pid] = asyncio.Event()data = bytearray()data += struct.pack('>H', pid)data += struct.pack('>H', len(topic)) + topic.encode()data += struct.pack('B', qos)await self._send_packet(SUBSCRIBE, data)try:# 等待服务器的 SUBACK 确认await asyncio.wait_for(self._suback_events[pid].wait(), timeout=5)print(f"[MQTT] Successfully subscribed to topic: {topic}")return Trueexcept asyncio.TimeoutError:print(f"[MQTT] SUBACK timeout for PID {pid}. Subscription failed.")return Falsefinally:# 无论成功还是失败,都要清理事件if pid in self._suback_events:del self._suback_events[pid]async def _message_loop(self):while self.is_connected:try:# --- 修改点: 在消息循环中也使用 _readexactly ---fixed_header = await asyncio.wait_for(self._readexactly(1), timeout=self.keepalive)packet_type = fixed_header[0] & 0xF0remaining_len = 0multiplier = 1while True:b = await self._readexactly(1)remaining_len += (b[0] & 0x7F) * multiplierif b[0] & 0x80 == 0:breakmultiplier <<= 7payload = await self._readexactly(remaining_len)if packet_type == PUBLISH:self._handle_publish(payload)elif packet_type == PUBACK:self._handle_puback(payload)elif packet_type == SUBACK:self._handle_suback(payload)elif packet_type == PINGRESP:passelse:print(f"[MQTT] Received unhandled packet type: {packet_type:#x}")except asyncio.TimeoutError:await self._send_packet(PINGREQ)print("[MQTT] Sent PINGREQ")except OSError as e:print(f"[MQTT] Message loop error: {e}. Reconnecting...")self.is_connected = Falsebreakif self.writer:self.writer.close()await self.writer.wait_closed()self.reader = Noneself.writer = Nonedef _handle_publish(self, payload):i = 0topic_len = struct.unpack_from('>H', payload, i)[0]i += 2topic = payload[i:i+topic_len].decode()i += topic_lenpid = 0if (payload[0] & 0x06) != 0: # QoS > 0pid = struct.unpack_from('>H', payload, i)[0]i += 2msg = payload[i:].decode()if self._callback:asyncio.create_task(self._callback(topic, msg))if (payload[0] & 0x06) == 0x02: # QoS 1, send PUBACKasyncio.create_task(self._send_packet(PUBACK, struct.pack('>H', pid)))def _handle_puback(self, payload):pid = struct.unpack('>H', payload)[0]if pid in self._puback_events:self._puback_events[pid].set()def _handle_suback(self, payload):pid = struct.unpack('>H', payload)[0]if pid in self._suback_events:self._suback_events[pid].set()async def run_forever(self):"""Main loop with auto-reconnect."""while True:try:if not self.is_connected:print("[MQTT] Attempting to connect...")await self.connect()await asyncio.sleep(1)except (OSError, asyncio.TimeoutError) as e:print(f"[MQTT] Connection attempt failed: {e}. Retrying in 10 seconds...")await asyncio.sleep(10)
完善onconnect和ondisconnect后的代码,会导致内存不够。
import uasyncio as asyncio
import usocket as socket
import ustruct as struct
import utime as time
import urandom as random# MQTT Packet Types
CONNECT = 0x10
CONNACK = 0x20
PUBLISH = 0x30
PUBACK = 0x40
SUBSCRIBE = 0x82
SUBACK = 0x90
PINGREQ = 0xC0
PINGRESP = 0xD0
DISCONNECT = 0xE0class MQTTClient:def __init__(self, client_id, server, port=1883, user=None, password=None, keepalive=60, ssl=False):self.client_id = client_idself.server = serverself.port = portself.user = userself.password = passwordself.keepalive = keepaliveself.ssl = sslself.reader = Noneself.writer = Noneself._lock = asyncio.Lock()self._pid = 0self._message_task = Noneself.is_connected = False# --- 新增:回调函数 ---self._callback = Noneself.on_connect = Noneself.on_disconnect = None# Events for QoS 1 publish and subscribeself._puback_events = {}self._suback_events = {}def _next_pid(self):self._pid = (self._pid + 1) & 0xFFFFif self._pid == 0:self._pid = 1return self._piddef _pack_remaining_length(self, length):s = b''while True:byte = length & 0x7Flength >>= 7if length > 0:byte |= 0x80s += struct.pack('B', byte)if length == 0:breakreturn sasync def _readexactly(self, n):"""Read exactly n bytes from the stream or raise an error."""data = b''while len(data) < n:packet = await self.reader.read(n - len(data))if not packet:raise OSError("Connection closed unexpectedly")data += packetreturn dataasync def _send_packet(self, cmd, data=b''):await self._lock.acquire()try:pkt = cmd.to_bytes(1, 'big') + self._pack_remaining_length(len(data)) + dataself.writer.write(pkt)await self.writer.drain()finally:self._lock.release()# --- 修改:新增设置回调的方法 ---def set_callback(self, f):"""设置接收 PUBLISH 消息的回调函数"""self._callback = fdef set_on_connect(self, f):"""设置连接成功后的回调函数"""self.on_connect = fdef set_on_disconnect(self, f):"""设置断开连接后的回调函数"""self.on_disconnect = fasync def connect(self):if self.is_connected:returntry:print(f"[MQTT] Resolving hostname: {self.server}...")addr_info = socket.getaddrinfo(self.server, self.port)if not addr_info:raise OSError("DNS resolution failed")addr = addr_info[0][-1]print(f"[MQTT] Connecting to {addr[0]}:{addr[1]}...")self.reader, self.writer = await asyncio.open_connection(addr[0], addr[1])# Construct CONNECT packetdata = bytearray()data += struct.pack('>H', 4) + b'MQTT' # Protocol Namedata += struct.pack('B', 4) # Protocol Level 4flags = 0x02 # Clean Sessionif self.user:flags |= 0x80if self.password:flags |= 0x40data += struct.pack('B', flags)data += struct.pack('>H', self.keepalive) # Keep Alivedata += struct.pack('>H', len(self.client_id)) + self.client_id.encode()if self.user:data += struct.pack('>H', len(self.user)) + self.user.encode()if self.password:data += struct.pack('>H', len(self.password)) + self.password.encode()await self._send_packet(CONNECT, data)resp = await self._readexactly(4)if resp[0] != CONNACK:raise OSError(f"Expected CONNACK, got {resp[0]:#x}")if resp[3] != 0:raise OSError(f"Connection failed with code {resp[3]}")self.is_connected = Trueprint("[MQTT] Connected to broker.")# --- 新增:触发 on_connect 回调 ---if self.on_connect:asyncio.create_task(self.on_connect())# Start the message handling loopif self._message_task is None or self._message_task.done():self._message_task = asyncio.create_task(self._message_loop())except OSError as e:print(f"[MQTT] Connection failed: {e}")self.is_connected = Falseif self.writer:self.writer.close()await self.writer.wait_closed()self.reader = Noneself.writer = Noneraiseasync def disconnect(self):if not self.is_connected:returntry:await self._send_packet(DISCONNECT)finally:self.is_connected = Falseif self.writer:self.writer.close()await self.writer.wait_closed()self.reader = Noneself.writer = Noneif self._message_task:self._message_task.cancel()self._message_task = Noneprint("[MQTT] Disconnected.")# --- 新增:触发 on_disconnect 回调 ---if self.on_disconnect:asyncio.create_task(self.on_disconnect(reason="clean"))async def publish(self, topic, msg, qos=0, retain=False):if not self.is_connected:raise OSError("Not connected")pid = 0if qos > 0:pid = self._next_pid()self._puback_events[pid] = asyncio.Event()pkt = bytearray()pkt += struct.pack('>H', len(topic)) + topic.encode()if qos > 0:pkt += struct.pack('>H', pid)pkt += msg.encode()cmd = PUBLISH | (qos << 1) | (retain << 0)await self._send_packet(cmd, pkt)if qos > 0:try:await asyncio.wait_for(self._puback_events[pid].wait(), timeout=5)except asyncio.TimeoutError:print(f"[MQTT] PUBACK timeout for PID {pid}")del self._puback_events[pid]raise OSError("Publish QoS 1 failed (timeout)")finally:if pid in self._puback_events:del self._puback_events[pid]async def subscribe(self, topic, qos=1):if not self.is_connected:print("[MQTT] Cannot subscribe, not connected.")return Falsepid = self._next_pid()self._suback_events[pid] = asyncio.Event()data = bytearray()data += struct.pack('>H', pid)data += struct.pack('>H', len(topic)) + topic.encode()data += struct.pack('B', qos)await self._send_packet(SUBSCRIBE, data)try:await asyncio.wait_for(self._suback_events[pid].wait(), timeout=5)print(f"[MQTT] Successfully subscribed to topic: {topic}")return Trueexcept asyncio.TimeoutError:print(f"[MQTT] SUBACK timeout for PID {pid}. Subscription failed.")return Falsefinally:if pid in self._suback_events:del self._suback_events[pid]async def _message_loop(self):while self.is_connected:try:fixed_header = await asyncio.wait_for(self._readexactly(1), timeout=self.keepalive)packet_type = fixed_header[0] & 0xF0remaining_len = 0multiplier = 1while True:b = await self._readexactly(1)remaining_len += (b[0] & 0x7F) * multiplierif b[0] & 0x80 == 0:breakmultiplier <<= 7payload = await self._readexactly(remaining_len)if packet_type == PUBLISH:self._handle_publish(payload)elif packet_type == PUBACK:self._handle_puback(payload)elif packet_type == SUBACK:self._handle_suback(payload)elif packet_type == PINGRESP:passelse:print(f"[MQTT] Received unhandled packet type: {packet_type:#x}")except asyncio.TimeoutError:await self._send_packet(PINGREQ)print("[MQTT] Sent PINGREQ")except OSError as e:print(f"[MQTT] Message loop error: {e}. Reconnecting...")self.is_connected = False# --- 新增:触发 on_disconnect 回调 ---if self.on_disconnect:asyncio.create_task(self.on_disconnect(reason="unexpected"))breakif self.writer:self.writer.close()await self.writer.wait_closed()self.reader = Noneself.writer = Nonedef _handle_publish(self, payload):i = 0topic_len = struct.unpack_from('>H', payload, i)[0]i += 2topic = payload[i:i+topic_len].decode()i += topic_lenpid = 0if (payload[0] & 0x06) != 0: # QoS > 0pid = struct.unpack_from('>H', payload, i)[0]i += 2msg = payload[i:].decode()if self._callback:asyncio.create_task(self._callback(topic, msg))if (payload[0] & 0x06) == 0x02: # QoS 1, send PUBACKasyncio.create_task(self._send_packet(PUBACK, struct.pack('>H', pid)))def _handle_puback(self, payload):pid = struct.unpack('>H', payload)[0]if pid in self._puback_events:self._puback_events[pid].set()def _handle_suback(self, payload):pid = struct.unpack('>H', payload)[0]if pid in self._suback_events:self._suback_events[pid].set()async def run_forever(self):"""Main loop with auto-reconnect."""while True:try:if not self.is_connected:print("[MQTT] Attempting to connect...")await self.connect()await asyncio.sleep(1)except (OSError, asyncio.TimeoutError) as e:print(f"[MQTT] Connection attempt failed: {e}. Retrying in 10 seconds...")await asyncio.sleep(10)
5、Main程序执行
from config_wifi import WiFiConfigEnhanced
from web_server import WebServerMicrodot
from MqttClient import MQTTClientimport uasyncio
import json# --- Callback function for received messages ---
async def mqtt_callback(topic, msg):print(f"[MQTT Callback] Received message '{msg}' on topic '{topic}'")# You can add logic here to react to received commandsdata=json.loads(msg)if data['msg'] == "ON":print("Command received: Turn ON LED")# Add your code to turn on an LED, e.g., led.on()elif ['msg'] == "OFF":print("Command received: Turn OFF LED")# Add your code to turn off an LED, e.g., led.off()# publish topic
async def mqtt_loop(client: MQTTClient): counter = 0subscribed=False while True:print('main: MQTT loop')if client.is_connected:if not subscribed:subscribed=await client.subscribe('demo2/led')if subscribed:print(f'订阅成功')try:message = f"Hello from ESP32: {counter}"print(f"[Main] Publishing: '{message}'")await client.publish('demo/counter', message, qos=1)counter += 1except OSError as e:print(f"[Main] Failed to publish: {e}")# Publish every 10 secondsawait uasyncio.sleep(10)async def main():# wifiwifi = WiFiConfigEnhanced()if not wifi.try_auto_connect():print("自动连接失败,启动配置模式")wifi.start_ap()else:print("已成功连接到WiFi")print(f"IP地址: {wifi.sta_if.ifconfig()[0]}")# mqttmqtt_client = MQTTClient(client_id='esp123',server='172.29.168.230')mqtt_client.set_callback(mqtt_callback)task_mqtt=uasyncio.create_task(mqtt_client.run_forever())# webserver = WebServerMicrodot(wifi,mqtt_client)task_web=uasyncio.create_task(server.start_server())# publishtask_put=uasyncio.create_task(mqtt_loop(mqtt_client))tasks = [task_web,task_mqtt,task_put]await uasyncio.gather(*tasks)if __name__ == "__main__":uasyncio.run(main())
6、index.html
用websocket更新页面。
helpers.py,websocket.py从microdot-2.3.5\src\microdot上传到根目录。
<!DOCTYPE html>
<html>
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>ESP32 WiFi配置</title><link rel="stylesheet" href="/milligram.min.css"><style>.message {margin-top: 20px;padding: 15px;border-radius: 5px;}.success {background-color: #d4edda;color: #155724;}.error {background-color: #f8d7da;color: #721c24;}</style>
</head>
<body>
<div class="container"> <div class="row"><div class="column"><h6>ESP32 WiFi配置</h6></div> </div><div class="row"><div class="column"><form id="wifiForm"><fieldset><div class="form-group"><label for="ssid">WiFi名称 (SSID)</label><input class="input" type="text" id="ssid" name="ssid" required></div><div class="form-group"><label for="password">WiFi密码</label><input class="input" type="text" id="password" name="password"></div><button class="button button-primary" type="submit">连接</button></fieldset></form></div></div><div class="row"><div class="column"><form id="mqttForm"><label>Mqtt</label><button class="button button-primary" type="submit">连接broker</button></form></div></div><div class="row"><div class="column message" id="wifi_satus"></div></div> <div class="row"><div class="column message" id="message" style="display: none;"></div></div> <script src="/zepto.min.js"></script><script>var websocket=new WebSocket(`ws://${location.host}/ws`);websocket.onmessage = onMessage;function onMessage(event) {$('#wifi_satus').text(event.data);}$(function() {$('#mqttForm').on('submit', function(e) {e.preventDefault();$.ajax({url: '/connect_mqtt',type: 'POST'});});$('#wifiForm').on('submit', function(e) {e.preventDefault();const ssid = $('#ssid').val();const password = $('#password').val();// Disable form during submissionconst submitButton = $('button[type="submit"]');submitButton.prop('disabled', true).addClass('button-disabled');// Show loading message$('#message').removeClass('error').addClass('success').text('正在连接...').show();$.ajax({url: '/config',type: 'POST',contentType: 'application/json',data: JSON.stringify({ ssid: ssid, password: password }),timeout: 10000, // 10 second timeoutsuccess: function(response) {if (response.success) {$('#message').removeClass('error').addClass('success').text('连接成功!IP:'+response.ip);} else {$('#message').removeClass('success').addClass('error').text('连接失败。');}},error: function(xhr, status, error) {let errorMsg = '连接失败';if (xhr.status === 0) {errorMsg = '无法连接到服务器,请检查网络';} else {try {const response = JSON.parse(xhr.responseText);errorMsg = response.error || errorMsg;} catch (e) {}}$('#message').removeClass('success').addClass('error').text(errorMsg);},complete: function() {// Re-enable formsubmitButton.prop('disabled', false).removeClass('button-disabled');}});});});</script>
</div>
</body>
</html>