C#-mqtt通讯,服务端和客户端,以及esp32-mqtt
C#-mqtt服务端
/// <summary>
/// Singleton wrapper around an MQTTnet <c>IMqttServer</c>: wires the server event
/// handlers, starts/stops the broker, validates client credentials and logs
/// client activity through <c>MqttLog</c> and <c>Debug</c>.
/// </summary>
internal class MqttServer
{
    // Ids of the clients that are currently connected.
    // NOTE(review): this list is mutated from the connect/disconnect handlers;
    // confirm whether MQTTnet can invoke them concurrently and add locking if so.
    private readonly List<string> clientId = new List<string>();

    private readonly IMqttServer mqttServer = new MqttFactory().CreateMqttServer();

    // NOTE(review): credentials are hard-coded in source; consider loading them
    // from configuration instead.
    private readonly string pwd = "123456";
    private readonly string username = "admin";

    // Lazy<T> guarantees the instance is created AND initialised exactly once,
    // even when several threads hit Instance at the same time.
    private static readonly Lazy<MqttServer> _Instance = new Lazy<MqttServer>(CreateInstance);

    /// <summary>
    /// Gets the shared server instance. Event handlers are wired only once, when the
    /// instance is first created (previously they were re-assigned on every access).
    /// </summary>
    public static MqttServer Instance
    {
        get { return _Instance.Value; }
    }

    /// <summary>Creates the singleton and wires its event handlers.</summary>
    private static MqttServer CreateInstance()
    {
        MqttServer server = new MqttServer();
        server.MqttServerInit();
        return server;
    }

    /// <summary>Assigns all server event handlers.</summary>
    void MqttServerInit()
    {
        try
        {
            // Server started
            mqttServer.StartedHandler = new MqttServerStartedHandlerDelegate(ServerStarted);
            // Server stopped
            mqttServer.StoppedHandler = new MqttServerStoppedHandlerDelegate(ServerStop);
            // Client connected
            mqttServer.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(GetClientId);
            // Client disconnected
            mqttServer.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(delClient);
            // Message received from a client
            mqttServer.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(GetMessage);
            // Client subscribed to a topic
            mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(ClientSubscribed);
            // Client unsubscribed from a topic
            mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(ClientUnsubscribed);
        }
        catch (Exception ex)
        {
            MqttLog.m.MqttOpen().Info("Mqtt启动失败:" + ex.Message);
        }
    }

    /// <summary>
    /// Logs the failure of a fire-and-forget task. The public methods of this class
    /// are synchronous, so without this continuation an exception raised inside the
    /// returned task would never reach their try/catch blocks and would be lost.
    /// </summary>
    /// <param name="task">The task returned by the asynchronous server call.</param>
    /// <param name="prefix">Text placed in front of the exception message in the log.</param>
    private static void LogIfFaulted(System.Threading.Tasks.Task task, string prefix)
    {
        task.ContinueWith(
            t => MqttLog.m.MqttOpen().Info(prefix + t.Exception.GetBaseException().Message),
            System.Threading.CancellationToken.None,
            System.Threading.Tasks.TaskContinuationOptions.OnlyOnFaulted,
            System.Threading.Tasks.TaskScheduler.Default);
    }

    /// <summary>
    /// Builds the server options (bound IP, port, credential validator) and starts
    /// the broker. Start-up failures, synchronous or asynchronous, are logged.
    /// </summary>
    [Obsolete]
    public void MqttOpen()
    {
        try
        {
            // Build the server configuration
            MqttServerOptionsBuilder mqttServerOptionsBuilder = new MqttServerOptionsBuilder();
            // Bind the IP address
            mqttServerOptionsBuilder.WithDefaultEndpointBoundIPAddress(Server.LocalIP);
            // Bind the port
            mqttServerOptionsBuilder.WithDefaultEndpointPort(Server.MqttPort);
            // Client validation (user name and password)
            mqttServerOptionsBuilder.WithConnectionValidator(ConnectionValidator);
            IMqttServerOptions options = mqttServerOptionsBuilder.Build();

            // Start the broker; the task is observed so that async failures get logged
            LogIfFaulted(mqttServer.StartAsync(options), "Mqtt启动失败2:");
        }
        catch (Exception ex)
        {
            MqttLog.m.MqttOpen().Info("Mqtt启动失败2:" + ex.Message);
        }
    }

    /// <summary>
    /// Timer callback that publishes a fixed message on the "system/admin" topic
    /// while the server is running.
    /// </summary>
    /// <param name="state">Timer state object; not used.</param>
    public void TimerCallback(object state)
    {
        try
        {
            if (mqttServer.IsStarted)
            {
                var mqttMessage = new MqttApplicationMessageBuilder()
                    .WithTopic("system/admin")
                    .WithPayload("这是系统主题,每5秒推送是一次")
                    .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                    .Build();
                LogIfFaulted(mqttServer.PublishAsync(mqttMessage), "推送失败");
            }
        }
        catch (Exception ex)
        {
            MqttLog.m.MqttOpen().Info("推送失败" + ex.Message);
        }
    }

    /// <summary>Stops the broker. (The method name is kept as-is for existing callers.)</summary>
    public void StopSetver()
    {
        LogIfFaulted(mqttServer.StopAsync(), "Mqtt停止失败:");
    }

    /// <summary>
    /// Validates a connecting client: accepts it only when both user name and
    /// password match the configured credentials.
    /// </summary>
    /// <param name="context">Connection context supplied by the server.</param>
    [Obsolete]
    void ConnectionValidator(MqttConnectionValidatorContext context)
    {
        if (context == null)
        {
            // Nothing to accept or reject; the previous code dereferenced the null
            // context in its else-branch and threw NullReferenceException.
            return;
        }

        if (context.Password == pwd && context.Username == username)
        {
            // Credentials match: accept the connection
            context.ReturnCode = MQTTnet.Protocol.MqttConnectReturnCode.ConnectionAccepted;
        }
        else
        {
            // Wrong user name or password: refuse the connection
            context.ReturnCode = MQTTnet.Protocol.MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
        }
    }

    /// <summary>Logs that the server has started.</summary>
    void ServerStarted(EventArgs e)
    {
        MqttLog.m.MqttOpen().Info("mqtt服务已开启,等待用户连接...");
        Debug.WriteLine("mqtt服务已开启,等待用户连接...");
    }

    /// <summary>Logs that the server has stopped.</summary>
    void ServerStop(EventArgs e)
    {
        MqttLog.m.MqttOpen().Info("mqtt服务停止...");
        Debug.WriteLine("mqtt服务停止...");
    }

    /// <summary>Records and logs a newly connected client.</summary>
    void GetClientId(MqttServerClientConnectedEventArgs e)
    {
        clientId.Add(e.ClientId);
        MqttLog.m.MqttOpen().Info($"客户端:{e.ClientId}已连接");
        Debug.WriteLine($"客户端:{e.ClientId}已连接");
    }

    /// <summary>Forgets and logs a disconnected client.</summary>
    void delClient(MqttServerClientDisconnectedEventArgs e)
    {
        clientId.Remove(e.ClientId);
        MqttLog.m.MqttOpen().Info($"客户端:{e.ClientId}断开连接");
        Debug.WriteLine($"客户端:{e.ClientId}断开连接");
    }

    /// <summary>Logs a message received from a client.</summary>
    void GetMessage(MqttApplicationMessageReceivedEventArgs message)
    {
        // MQTT text payloads are UTF-8 (the ESP32 client encodes with UTF-8), so
        // decode as UTF-8 rather than the machine-dependent Encoding.Default.
        // A message may carry no payload at all; treat that as an empty string.
        byte[] payload = message.ApplicationMessage?.Payload;
        string text = payload == null ? string.Empty : Encoding.UTF8.GetString(payload);

        // NOTE(review): "\\n" writes a literal backslash-n to the log; confirm
        // whether a real line break ("\n") was intended.
        MqttLog.m.MqttOpen().Info($"客户端:{message.ClientId}\\n\\n发送:{text}");
        Debug.WriteLine($"客户端:{message.ClientId}");
        Debug.WriteLine($"发送:{text}");
    }

    /// <summary>Logs a client subscription.</summary>
    void ClientSubscribed(MqttServerClientSubscribedTopicEventArgs e)
    {
        MqttLog.m.MqttOpen().Info($"客户端:{e.ClientId}---订阅{e.TopicFilter.Topic})");
        Debug.WriteLine($"客户端:{e.ClientId}---订阅{e.TopicFilter.Topic})");
    }

    /// <summary>Logs a client unsubscription.</summary>
    void ClientUnsubscribed(MqttServerClientUnsubscribedTopicEventArgs e)
    {
        MqttLog.m.MqttOpen().Info($"客户端:{e.ClientId}---取消订阅{e.TopicFilter})");
        Debug.WriteLine($"客户端:{e.ClientId}---取消订阅{e.TopicFilter})");
    }
}
esp32客户端
# Device-side client (MicroPython on ESP32): joins Wi-Fi, connects to the MQTT
# broker, subscribes to a command topic and publishes a counter message every
# 5 seconds.
import network
import time
from umqtt.simple import MQTTClient
import ubinascii
import machine

ssid = '24LOU'
password = '12356789'

# Connect to Wi-Fi
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
    print('connecting to network...')
    wlan.connect(ssid, password)
    while not wlan.isconnected():
        # Sleep briefly between polls instead of spinning in a tight loop
        time.sleep(0.1)
print('网络配置:', wlan.ifconfig())

# ===== Configuration =====
MQTT_BROKER_IP = "192.168.3.79"  # replace with the IP address of the PC running the broker
MQTT_PORT = 1883
MQTT_CLIENT_ID = ubinascii.hexlify(machine.unique_id())  # unique client id derived from the chip id
# The C# broker's connection validator only accepts this user name / password;
# connecting without them is refused with "bad username or password".
MQTT_USER = "admin"
MQTT_PASSWORD = "123456"
PUBLISH_TOPIC = b"esp32/data"  # topic this device publishes to
SUBSCRIBE_TOPIC = b"esp32/command"  # topic this device subscribes to


# ===== 2. MQTT message callback =====
def mqtt_callback(topic, msg):
    """Print every received message; drive GPIO2 when "ON" arrives on the command topic."""
    print(f"收到消息 [主题: {topic.decode()}]: {msg.decode()}")
    # Example: react to an "ON" message by driving the LED (requires hardware support)
    if topic == SUBSCRIBE_TOPIC and msg == b"ON":
        led = machine.Pin(2, machine.Pin.OUT)  # the ESP32 on-board LED is usually on GPIO2
        # NOTE(review): value(0) lights the LED only on active-low boards; confirm
        # the polarity for the target board.
        led.value(0)


# ===== 3. Connect to the MQTT broker and subscribe =====
def connect_mqtt():
    """Connect to the broker and subscribe to SUBSCRIBE_TOPIC.

    Returns the connected MQTTClient, or None when connecting or subscribing fails.
    """
    client = MQTTClient(
        client_id=MQTT_CLIENT_ID,
        server=MQTT_BROKER_IP,
        port=MQTT_PORT,
        user=MQTT_USER,
        password=MQTT_PASSWORD,
    )
    client.set_callback(mqtt_callback)
    try:
        client.connect()
        print(f"已连接MQTT Broker: {MQTT_BROKER_IP}:{MQTT_PORT}")
        client.subscribe(SUBSCRIBE_TOPIC)
        print(f"已订阅主题: {SUBSCRIBE_TOPIC.decode()}")
        return client
    except Exception as e:
        print("MQTT连接失败:", e)
        return None


# ===== 4. Main loop =====
def main():
    """Publish a counter message every 5 seconds and poll for incoming commands."""
    mqtt_client = connect_mqtt()
    if not mqtt_client:
        return
    publish_count = 0
    try:
        while True:
            # Publish one data message every 5 seconds
            publish_count += 1
            print('------------------5---------------------')
            message = f"Hello Broker! 计数: {publish_count}"
            mqtt_client.publish(PUBLISH_TOPIC, message.encode())
            print(f"已发布: {PUBLISH_TOPIC.decode()} -> {message}")
            # Check for subscribed messages (non-blocking)
            mqtt_client.check_msg()
            time.sleep(5)  # keep CPU usage low
    finally:
        mqtt_client.disconnect()
        print("MQTT连接已断开")


if __name__ == "__main__":
    main()
mqtt客户端调试通讯工具