当前位置: 首页 > news >正文

C#MQTT协议服务器与客户端通讯实现(客户端包含断开重连模块)

C#MQTT协议服务器与客户端通讯实现

  • 1 DLL版本
  • 2 服务器
  • 3 客户端

1 DLL版本

MQTTnet.DLL版本-2.7.5.0
基于比较老的项目中应用的DLL,其他更高版本变化可能较大,谨慎参考。

2 服务器

开启服务器
关闭服务器
绑定事件【客户端连接服务器事件】
绑定事件【客户端断开(服务器)连接事件】
绑定事件【客户端订阅主题事件】
绑定事件【客户端退订主题事件】
绑定事件【接收客户端(发送)消息事件】

using System;
using System.Net;
using MQTTnet;
using MQTTnet.Server;

namespace Demo_MQTT.Model
{
    public class ServerModel
    {
        private static MqttServer _mqttServer = null;


        private readonly Action<string> _callbackLog;

        public ServerModel(Action<string> callbackLog)
        {
            _callbackLog = callbackLog;
        }

        /// <summary>
        /// 绑定客户端连接服务器事件
        /// </summary>
        private void MqttServer_ClientConnected(object sender, MqttClientConnectedEventArgs e)
        {
            WriteLog($"客户端[{e.Client.ClientId}]已连接 {DateTime.Now:yyyy-MM-dd HH:mm:ss}{Environment.NewLine}");
        }

        /// <summary>
        /// 绑定客户端断开连接事件
        /// </summary>
        private void MqttServer_ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e)
        {
            WriteLog($"客户端[{e.Client.ClientId}]已断开连接 {DateTime.Now:yyyy-MM-dd HH:mm:ss}{Environment.NewLine}");
        }

        /// <summary>
        /// 绑定客户端订阅主题事件
        /// </summary>
        private void Server_ClientSubscribedTopic(object sensor, MqttClientSubscribedTopicEventArgs e)
        {
            WriteLog($">>> 客户端{e.ClientId}订阅主题{e.TopicFilter.Topic}");
        }

        /// <summary>
        /// 绑定客户端退订主题事件
        /// </summary>
        /// <param name="e"></param>
        private void Server_ClientUnsubscribedTopic(object sensor, MqttClientUnsubscribedTopicEventArgs e)
        {
            WriteLog($">>> 客户端{e.ClientId}退订主题{e.TopicFilter}");

        }

        /// <summary>
        /// 绑定接收客户端消息事件
        /// </summary>
        private void MqttServer_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
        {
            WriteLog($"接收到{e.ClientId}发送来的消息! {DateTime.Now:yyyy-MM-dd HH:mm:ss} {Environment.NewLine}");
        }

        private void WriteLog(string log)
        {
            _callbackLog?.Invoke(log);
        }

        /// <summary>
        /// 开启服务器
        /// </summary>
        /// <param name="ip">IP地址</param>
        /// <param name="port">端口号</param>
        public void StartServer(string ip, int port)
        {
            if (_mqttServer == null)
            {
                var optionsBuilder = new MqttServerOptionsBuilder()
                    .WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ip))
                    .WithConnectionBacklog(1000)
                    .WithDefaultEndpointPort(port);

                IMqttServerOptions options = optionsBuilder.Build();

                try
                {
                    _mqttServer = new MqttFactory().CreateMqttServer() as MqttServer;
                    _mqttServer.ClientConnected += MqttServer_ClientConnected;
                    _mqttServer.ClientDisconnected += MqttServer_ClientDisconnected;
                    _mqttServer.ApplicationMessageReceived += MqttServer_ApplicationMessageReceived;

                    _mqttServer.ClientSubscribedTopic += Server_ClientSubscribedTopic;
                    _mqttServer.ClientUnsubscribedTopic += Server_ClientUnsubscribedTopic;

                    _mqttServer.StartAsync(options);
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                    return;
                }

                WriteLog($"MQTT服务器启动成功 {DateTime.Now:yyyy-MM-dd HH:mm:ss}{Environment.NewLine}");
            }
        }

        /// <summary>
        /// 关闭服务器
        /// </summary>
        public void CloseServer()
        {
            _mqttServer?.StopAsync();
        }
    }
}

3 客户端

连接服务器
属性:客户端连接状态
客户端断开重连线程
获取所有订阅主题
订阅主题
退订主题
发送消息
绑定事件【客户端连接服务器事件】
绑定事件【客户端断开(服务器)连接事件】
绑定事件【客户端接收消息事件】

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;

namespace Demo_MQTT.Model
{
    public class ClientModel
    {
        /// <summary>
        /// 记录所有订阅主题,用于断开重连时重新订阅主题
        /// </summary>
        private readonly List<string> _subscribeTopics = new List<string>();
        private MqttClient _mqttClient = null;
        private string _serverIp;
        private int _nServerPort;
        private bool _isRunningReConnectThreadStart = false;
        private string _clienID;

        /// <summary>
        /// 接受消息回调函数,参数:主题,消息内容
        /// </summary>
        private readonly Action<string, byte[]> _callbackReceived;
        private readonly Action<string> _callbackLog;

        /// <summary>
        /// 构造函数
        /// </summary>
        /// <param name="callbackReceived">接受消息回调函数,参数:主题,消息内容</param>
        /// <param name="callbackLog"></param>
        public ClientModel(Action<string, byte[]> callbackReceived, Action<string> callbackLog)
        {
            _callbackReceived = callbackReceived;
            _callbackLog = callbackLog;
        }

        /// <summary>
        /// 连接服务器
        /// </summary>
        private async void ConnectServer()
        {
            try
            {
                if (_mqttClient == null)
                {
                    _mqttClient = new MqttFactory().CreateMqttClient() as MqttClient;
                    _mqttClient.Connected += (s, a) => WriteLog($"【{_clienID}】已连接到MQTT服务器!");
                    _mqttClient.Disconnected += (s, a) => WriteLog($"【{_clienID}】已断开MQTT连接!");
                    _mqttClient.ApplicationMessageReceived += (sender, args) =>
                    {
                        _callbackReceived?.Invoke(args.ApplicationMessage.Topic, args.ApplicationMessage.Payload);
                    };
                }
                if (_mqttClient.IsConnected) return;

                IMqttClientOptions options = new MqttClientOptions
                {
                    ChannelOptions = new MqttClientTcpOptions()
                    {
                        Server = _serverIp,
                        Port = _nServerPort
                    },
                    CleanSession = true
                };

                _clienID = options.ClientId;
                await _mqttClient.ConnectAsync(options);
                if (_mqttClient.IsConnected)
                {
                    ReConnectThreadStart();
                    SubscribeAsync();
                }
            }
            catch (Exception ex)
            {
                WriteLog("连接到MQTT服务器失败!");
            }
        }

        /// <summary>
        /// 客户端重连服务器线程-启动
        /// </summary>
        /// <returns></returns>
        private void ReConnectThreadStart()
        {
            if (_isRunningReConnectThreadStart) return;

            if (_mqttClient != null)
            {
                new Task(() =>
                {
                    _isRunningReConnectThreadStart = true;
                    Thread.Sleep(5000);
                    while (true)
                    {
                        Thread.Sleep(1000);
                        if (!IsConnect)
                        {
                            WriteLog($"客户端[{_clienID}]断开连接,尝试重新连接服务器中...");
                            int i;
                            for (i = 0; i < 60; i++)
                            {
                                if (IsConnect) break;
                                WriteLog($"尝试第{i + 1}次连接服务器");
                                ConnectServer();
                                Thread.Sleep(1000);
                                if (IsConnect) break;
                            }
                            _isRunningReConnectThreadStart = i < 60;
                        }

                        if (!_isRunningReConnectThreadStart) break;
                    }

                }).Start();
            }
        }

        private void WriteLog(string log)
        {
            _callbackLog?.Invoke(log);
        }


        /// <summary>
        /// 客户端连接状态
        /// </summary>
        public bool IsConnect => _mqttClient?.IsConnected == true;

        /// <summary>
        /// 连接服务器
        /// </summary>
        /// <param name="serverIp">服务器IP</param>
        /// <param name="serverPort">服务器端口</param>
        /// <param name="topic"></param>
        public async void ConnectServer(string serverIp, int serverPort)
        {
            _serverIp = serverIp;
            _nServerPort = serverPort;

            await Task.Run(() => { ConnectServer(); });
        }

        /// <summary>
        /// 关闭客户端,断开客户端和服务器的连接
        /// </summary>
        public void CloseClient()
        {
            _mqttClient.DisconnectAsync();
        }

        /// <summary>
        /// 发送消息
        /// </summary>
        /// <param name="topic">发送主题</param>
        /// <param name="cmd">发送内容</param>
        [Obsolete("Obsolete")]
        public void PublishAsync(string topic, string cmd)
        {
            var bytes = Encoding.UTF8.GetBytes(cmd);
            var mode = MqttQualityOfServiceLevel.AtMostOnce;
            var appMsg = new MqttApplicationMessage(topic, bytes, mode, false);
            _mqttClient.PublishAsync(appMsg);//发送消息
        }

        /// <summary>
        /// 订阅主题
        /// </summary>
        /// <param name="topics">主题标识</param>
        public void SubscribeAsync(params string[] topics)
        {
            foreach (var topic in topics)
            {
                if (!_subscribeTopics.Contains(topic))
                {
                    _subscribeTopics.Add(topic);
                }
            }
            var topicFilters = _subscribeTopics.Select(topic => new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)).ToList();
            _mqttClient?.SubscribeAsync(topicFilters);
        }

        /// <summary>
        /// 退订已订阅主题
        /// </summary>
        /// <param name="topics">主题标识</param>
        public void UnSubscribeAsync(params string[] topics)
        {
            if (topics == null || topics.Length == 0) return;
            var topicFilters = topics.Select(topic => new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)).ToList();
            _mqttClient.SubscribeAsync(topicFilters);
        }

        /// <summary>
        /// 获取所有订阅主题
        /// </summary>
        public string[] GetAllTopic => _subscribeTopics.ToArray();

    }
}

相关文章:

  • Day 8 上篇:深入理解 Linux 驱动模型中的平台驱动与总线驱动
  • JS实现文件点击或者拖拽上传
  • Sql with as 语句
  • 重读《人件》Peopleware -(6)Ⅰ管理人力资源Ⅴ-帕金森定律重探 Parkinson’s Law Revisited
  • [算法题:快排(一)]颜色分类
  • 【unity游戏开发介绍之UGUI篇】UGUI概述和基础使用
  • ThingsBoard3.9.1 MQTT Topic(1)
  • Apollo源码总结
  • 寻找峰值 --- 二分查找
  • 主流开源大模型评估数据集
  • 【工具】Fiddler抓包
  • 本地部署大模型(ollama模式)
  • 【Code】《代码整洁之道》笔记-Chapter13-并发编程
  • 机械臂只有位置信息是否可以进行手眼标定?
  • HDF5文件格式:数据类型与读写功能详解
  • asm汇编源代码之CPU型号检测
  • Axure中继器(Repeater): 列表多选和 列表查询
  • Python 数据分析01 环境搭建教程
  • SpringBoot项目如何用ServiceLocatorFactoryBean优雅切换支付渠道?
  • FreeRTOS使任务处于运行态的API ?
  • 旅游网站设计的目的/智能营销系统
  • 成都建设监理协会网站网址/百度百科入口
  • 青岛做网站大公司有哪些/互联网广告销售好做吗
  • 网站怎么样建设/百度店铺怎么入驻
  • b2c电子商务购物网站/杭州seo排名优化外包
  • 核桃少儿编程加盟/seo优化网站