当前位置: 首页 > news >正文

Mosquitto:MQTT Broker入门与分布式部署最佳实践

1 Mosquitto 简介与核心特性

Mosquitto 是 Eclipse 基金会下的开源 MQTT 代理服务器,采用 C 语言编写,轻量高效,支持 MQTT 3.1、3.1.1 和 5.0 协议。

核心特性

· 轻量级,资源占用少
· 支持 QoS 0/1/2 消息等级
· 持久化会话支持
· TLS/SSL 加密通信
· 灵活的认证授权机制
· 桥接模式支持分布式部署

2 安装与使用

项目连接:https://github.com/eclipse-mosquitto/mosquitto
下载一份代码 git clone https://github.com/eclipse-mosquitto/mosquitto,直接make编译,我的环境是Centos7,缺少依赖包就先安装依赖的包。
编译完成,直接运行程序:
在这里插入图片描述
mosquitto已经运行在监听1883端口,在另外的终端分别运行订阅与发布命令测试下,编译完成后在项目的client目录下已经有这两个命令。

订阅:
在这里插入图片描述
发布:
在这里插入图片描述

mosquitto已经可以正常运行,客户端正常订阅与发布消息,接下来就可以项目化使用。

3. 最佳实践:分布式实践方案

3.1 架构设计
MQTT 分布式基本架构图
3.2 多节点部署配置

Mosquitto作为MQTT的一个Broker,可以分布式部署,设备端通过一个API接口获取Broker列表,然后根据连接策略选择一个Broker进行连接与登陆。

主节点配置文件 (mosquitto_primary.conf)

# 基础配置
listener 1883
protocol mqtt# 持久化
persistence true
persistence_location /var/lib/mosquitto/# 日志配置
log_dest file /var/log/mosquitto/mosquitto.log
log_type all# 安全配置
allow_anonymous false
password_file /etc/mosquitto/passwd# 桥接配置 - 连接到其他节点
connection bridge_secondary
address 192.168.1.2:1883
topic # both 2 "" ""# 集群发现
listener 1884
protocol websockets

从节点配置文件 (mosquitto_secondary.conf)

listener 1883
protocol mqtt
persistence true
persistence_location /var/lib/mosquitto_secondary/# 连接到主节点
connection bridge_primary
address 192.168.1.1:1883
topic # both 2 "" ""

3.3 使用 Docker 部署集群

docker-compose.yml

version: '3.8'
services:mosquitto-primary:image: eclipse-mosquitto:latestports:- "1883:1883"- "9001:9001"volumes:- ./primary.conf:/mosquitto/config/mosquitto.conf- ./primary-data:/mosquitto/data- ./primary-log:/mosquitto/logmosquitto-secondary:image: eclipse-mosquitto:latestports:- "1884:1883"volumes:- ./secondary.conf:/mosquitto/config/mosquitto.conf- ./secondary-data:/mosquitto/data- ./secondary-log:/mosquitto/logdepends_on:- mosquitto-primary

4. 自定义认证插件开发

mosquitto本身没有实现对接数据库进行访问,不过可以开发一个简单插件实现访问数据库对每个设备进行认证鉴权。

按照项目提供的插件例子,实现相应的接口,把插件编译为一个so文件,然后在配置文件中配置插件文件名、数据库用户名与密码,就可以完成一个可访问数据库进行认证的Broker。

4.1 插件基础结构

mosquitto_auth_db.h

#ifndef MOSQUITTO_AUTH_DB_H
#define MOSQUITTO_AUTH_DB_H#include "mosquitto_broker.h"
#include "mosquitto_plugin.h"
#include "mosquitto.h"// 插件配置结构
struct auth_db_config {char *db_host;int db_port;char *db_name;char *db_user;char *db_password;int cache_timeout;
};// 数据库连接结构
struct db_connection {// 数据库连接句柄void *conn;// 连接状态int connected;
};#endif

4.2 主要认证函数实现

mosquitto_auth_db.c

#include "mosquitto_auth_db.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>// 插件版本
int mosquitto_auth_plugin_version(void) {return MOSQ_AUTH_PLUGIN_VERSION;
}// 插件初始化
int mosquitto_auth_plugin_init(void **user_data, struct mosquitto_opt *opts, int opt_count) {struct auth_db_config *config = malloc(sizeof(struct auth_db_config));// 解析配置参数for (int i = 0; i < opt_count; i++) {if (strcmp(opts[i].key, "db_host") == 0) {config->db_host = strdup(opts[i].value);} else if (strcmp(opts[i].key, "db_port") == 0) {config->db_port = atoi(opts[i].value);}// 其他参数解析...}*user_data = config;// 初始化数据库连接if (db_connect(config) != 0) {mosquitto_log_printf(MOSQ_LOG_ERR, "Failed to connect to database");return MOSQ_ERR_UNKNOWN;}return MOSQ_ERR_SUCCESS;
}// 用户名/密码认证
int mosquitto_auth_plugin_authenticate_user(void *user_data, struct mosquitto *client,const char *username, const char *password
) {struct auth_db_config *config = (struct auth_db_config *)user_data;if (!username || !password) {return MOSQ_ERR_AUTH;}// 查询数据库验证用户int auth_result = db_authenticate_user(config, username, password);if (auth_result == 0) {mosquitto_log_printf(MOSQ_LOG_INFO, "User %s authenticated successfully", username);return MOSQ_ERR_SUCCESS;} else {mosquitto_log_printf(MOSQ_LOG_WARNING, "Authentication failed for user %s", username);return MOSQ_ERR_AUTH;}
}// ACL 检查
int mosquitto_auth_plugin_acl_check(void *user_data,int access,struct mosquitto *client,const struct mosquitto_acl_msg *msg
) {// 实现主题访问控制逻辑const char *username = mosquitto_client_username(client);if (username && check_topic_permission(username, msg->topic, access)) {return MOSQ_ERR_SUCCESS;}return MOSQ_ERR_ACL_DENIED;
}// 数据库认证函数
int db_authenticate_user(struct auth_db_config *config, const char *username, const char *password) {// 这里实现具体的数据库查询逻辑// 示例使用 PostgreSQLPGconn *conn = PQconnectdb(fmt::format("host={} port={} dbname={} user={} password={}",config->db_host, config->db_port, config->db_name, config->db_user, config->db_password));if (PQstatus(conn) != CONNECTION_OK) {mosquitto_log_printf(MOSQ_LOG_ERR, "Database connection failed: %s", PQerrorMessage(conn));return -1;}// 执行认证查询const char *query = "SELECT password_hash FROM devices WHERE device_id = $1 AND status = 'active'";PGresult *res = PQexecParams(conn, query, 1, NULL, &username, NULL, NULL, 0);if (PQresultStatus(res) != PGRES_TUPLES_OK) {PQclear(res);PQfinish(conn);return -1;}if (PQntuples(res) == 1) {char *stored_hash = PQgetvalue(res, 0, 0);int result = verify_password(password, stored_hash);PQclear(res);PQfinish(conn);return result;}PQclear(res);PQfinish(conn);return -1;
}

4.3 插件编译与配置

CMakeLists.txt

cmake_minimum_required(VERSION 3.10)
project(mosquitto_auth_db)find_library(MOSQUITTO_LIB mosquitto)
find_path(MOSQUITTO_INCLUDE_DIR mosquitto_broker.h)add_library(auth_db SHARED mosquitto_auth_db.c)
target_include_directories(auth_db PRIVATE ${MOSQUITTO_INCLUDE_DIR})
target_link_libraries(auth_db ${MOSQUITTO_LIB})

插件配置文件

# mosquitto.conf 中添加
allow_anonymous false
auth_plugin /path/to/auth_db.so
auth_opt_db_host localhost
auth_opt_db_port 5432
auth_opt_db_name iot_db
auth_opt_db_user mosquitto
auth_opt_db_password secret
auth_opt_cache_timeout 300

5. 后端服务集成

5.1 go 后端服务示例
我们项目的后端使用go语言开发,go语言有很好的mqtt开发包,可以直接引入使用。下面是一个简单的范例,只展示MQTT的初始化连接部分。

MQTT的相关接口在mqtt.go实现

package servicesimport ("encoding/json""fmt""itcps/database""itcps/logger""itcps/models""math""math/rand""strings""sync""time"MQTT "github.com/eclipse/paho.mqtt.golang"
)// connectBroker 尝试连接单个broker
func (m *MQTTService) connectBroker(broker *models.MQTTBroker) error {opts := MQTT.NewClientOptions()opts.AddBroker(fmt.Sprintf("tcp://%s", broker.IP))clientID := fmt.Sprintf("itcps-service-%s-%d", broker.IP, time.Now().UnixNano())opts.SetClientID(clientID)opts.SetUsername(broker.Username)opts.SetPassword(broker.Password)opts.SetDefaultPublishHandler(m.messageHandler)opts.SetAutoReconnect(true)opts.SetConnectRetry(true)opts.SetOnConnectHandler(m.onConnect)opts.SetConnectionLostHandler(m.onConnectionLost)opts.SetConnectTimeout(5 * time.Second)client := MQTT.NewClient(opts)// 创建一个带超时的通道done := make(chan error, 1)go func() {token := client.Connect()token.Wait()done <- token.Error()}()// 设置5秒超时select {case err := <-done:if err != nil {return fmt.Errorf("连接broker失败: %v", err)}// 从IP:Port中提取IP地址brokerIP := strings.Split(broker.IP, ":")[0]m.mutex.Lock()m.clients[brokerIP] = clientm.mutex.Unlock()logger.Logs("info",map[string]interface{}{"MQTT":      "connectBroker","full_addr": broker.IP,"ip":        brokerIP,},fmt.Sprintf("成功添加broker client, IP: %s", brokerIP))return nilcase <-time.After(5 * time.Second):// 尝试断开连接client.Disconnect(250)return fmt.Errorf("连接broker超时: %s", broker.IP)}
}// Connect 连接到所有可用的broker
func (m *MQTTService) Connect() error {// 获取所有broker列表,使用较大的pageSize确保获取所有brokerbrokers, _, err := models.GetAllBrokers(1, 100)if err != nil {logger.Logs("error",map[string]interface{}{"MQTT": "Connect"},fmt.Sprintf("获取MQTT代理列表失败: %v", err))return err}logger.Logs("info",map[string]interface{}{"MQTT": "Connect"},fmt.Sprintf("获取到broker列表,总数: %d", len(brokers)))successCount := 0var lastErr error// 遍历所有状态为0的broker,尝试连接for i := range brokers {logger.Logs("info",map[string]interface{}{"MQTT":          "Connect","broker_ipport": brokers[i].IP,"broker_status": brokers[i].Status,},fmt.Sprintf("正在处理broker: %s, 状态: %s", brokers[i].IP, brokers[i].Status))if brokers[i].Status != "0" {logger.Logs("info",map[string]interface{}{"MQTT": "Connect"},fmt.Sprintf("跳过非活动broker: %s, 状态: %s", brokers[i].IP, brokers[i].Status))continue}err := m.connectBroker(&brokers[i])if err != nil {lastErr = errlogger.Logs("warn",map[string]interface{}{"MQTT":  "Connect","error": err.Error(),},fmt.Sprintf("连接broker %s 失败: %v", brokers[i].IP, err))continue}successCount++logger.Logs("info",map[string]interface{}{"MQTT": "Connect"},fmt.Sprintf("成功连接到broker %s", brokers[i].IP))}logger.Logs("info",map[string]interface{}{"MQTT":          "Connect","success_count": successCount,"total_brokers": len(brokers),},fmt.Sprintf("broker连接总结: 成功 %d 个, 总数 %d 个", successCount, len(brokers)))if successCount == 0 {m.resultChan <- lastErrreturn fmt.Errorf("所有broker连接均失败,最后错误: %v", lastErr)}m.resultChan <- nilreturn nil
}// Publish 发布消息到指定的broker
func (m *MQTTService) Publish(topic string, message interface{}, brokerHost string) error {m.mutex.RLock()client, exists := m.clients[brokerHost]m.mutex.RUnlock()if !exists {logger.Logs("error",map[string]interface{}{"MQTT":        "Publish","broker_host": brokerHost,},fmt.Sprintf("broker %s 的client不存在", brokerHost))return fmt.Errorf("broker %s 的client不存在", brokerHost)}payload, err := json.Marshal(message)if err != nil {logger.Logs("error",map[string]interface{}{"MQTT":        "Publish","broker_host": brokerHost,"topic":       topic,"error":       err.Error(),},"消息序列化失败")return err}logger.Logs("info",map[string]interface{}{"MQTT":        "Publish","broker_host": brokerHost,"topic":       topic,"payload":     string(payload),},"准备发布MQTT消息")token := client.Publish(topic, 1, false, payload)if token.Wait() && token.Error() != nil {logger.Logs("error",map[string]interface{}{"MQTT":        "Publish","broker_host": brokerHost,"topic":       topic,"error":       token.Error(),},"MQTT消息发布失败")return token.Error()}logger.Logs("info",map[string]interface{}{"MQTT":        "Publish","broker_host": brokerHost,"topic":       topic,},"MQTT消息发布成功")return nil
}// GetRandomClient 随机获取一个可用的client
func (m *MQTTService) GetRandomClient() (MQTT.Client, string, error) {m.mutex.RLock()defer m.mutex.RUnlock()if len(m.clients) == 0 {return nil, "", fmt.Errorf("没有可用的MQTT client")}// 将所有client转换为切片hosts := make([]string, 0, len(m.clients))for host := range m.clients {hosts = append(hosts, host)}// 随机选择一个hostrand.Seed(time.Now().UnixNano())selectedHost := hosts[rand.Intn(len(hosts))]return m.clients[selectedHost], selectedHost, nil
}// GetMQTTInstance 获取MQTT服务的单例
func GetMQTTInstance() *MQTTService {mqttOnce.Do(func() {mqttInstance = &MQTTService{topics: []string{"REP_INFO",              // 设备启动"HEART_INFO",            // 心跳"REP_LOCATE",            // 定位上报"START_UPDATE_RESPONSE", // 版本升级"REP_CALLEVENT",         // 呼叫事件"LASTWILL",              // 最后遗言"REP_CALLDETAIL",        // 呼叫详情},resultChan:   make(chan error, 1),clients:      make(map[string]MQTT.Client),isConnecting: false,connected:    false,}// 异步连接MQTTgo mqttInstance.connectInBackground()})return mqttInstance
}// 异步连接方法
func (m *MQTTService) connectInBackground() {m.mu.Lock()if m.isConnecting {m.mu.Unlock()return}m.isConnecting = truem.mu.Unlock()logger.Logs("info",map[string]interface{}{"MQTT": "Connect"},"开始后台连接MQTT服务")// 获取所有broker并尝试连接brokers, _, err := models.GetAllBrokers(1, 100)if err != nil {logger.Logs("error",map[string]interface{}{"MQTT": "Connect"},fmt.Sprintf("获取MQTT代理列表失败: %v", err))return}for i := range brokers {if brokers[i].Status != "0" {continue}go func(broker models.MQTTBroker) {err := m.connectBroker(&broker)if err != nil {logger.Logs("warn",map[string]interface{}{"MQTT": "Connect"},fmt.Sprintf("连接broker %s 失败: %v", broker.IP, err))return}m.mu.Lock()m.connected = truem.mu.Unlock()logger.Logs("info",map[string]interface{}{"MQTT": "Connect"},fmt.Sprintf("成功连接到broker %s", broker.IP))}(brokers[i])}
}// 新增检查连接状态的方法
func (m *MQTTService) IsConnected() bool {m.mu.RLock()defer m.mu.RUnlock()return m.connected
}
// 消息处理函数
func (m *MQTTService) messageHandler(client MQTT.Client, msg MQTT.Message) {......
}# 使用示例
# 在main.go中调用GetMQTTInstance即可// 2. 然后初始化MQTT服务mqttService := services.GetMQTTInstance()defer mqttService.Disconnect()logger.Logs("info",map[string]interface{}{"Main": "main"},"开启MQTT异步连接")

5.2 设备端连接示例

device_client.py

import paho.mqtt.client as mqtt
import json
import time
import randomclass DeviceClient:def __init__(self, device_id, username, password, broker_host="localhost", broker_port=1883):self.device_id = device_idself.client = mqtt.Client(client_id=device_id, protocol=mqtt.MQTTv311)self.client.username_pw_set(username, password)self.client.on_connect = self.on_connectself.client.on_message = self.on_messageself.broker_host = broker_hostself.broker_port = broker_portself.connected = Falsedef on_connect(self, client, userdata, flags, rc):if rc == 0:print(f"Device {self.device_id} connected successfully")self.connected = True# 订阅控制主题control_topic = f"devices/{self.device_id}/control"client.subscribe(control_topic)else:print(f"Connection failed with code {rc}")def on_message(self, client, userdata, msg):print(f"Received control message: {msg.payload.decode()}")# 处理控制命令self.handle_control_message(json.loads(msg.payload.decode()))def handle_control_message(self, message):command = message.get('command')if command == 'reboot':print("Rebooting device...")# 执行重启逻辑self.publish_status("rebooting")elif command == 'update':print("Updating device...")# 执行更新逻辑def connect(self):self.client.connect(self.broker_host, self.broker_port, 60)self.client.loop_start()def disconnect(self):self.client.loop_stop()self.client.disconnect()def publish_telemetry(self):"""发布遥测数据"""telemetry = {"timestamp": int(time.time()),"temperature": random.uniform(20, 30),"humidity": random.uniform(40, 60),"battery": random.uniform(80, 100)}topic = f"devices/{self.device_id}/telemetry"self.client.publish(topic, json.dumps(telemetry), qos=1)def publish_status(self, status):"""发布设备状态"""message = {"status": status,"timestamp": int(time.time())}topic = f"devices/{self.device_id}/status"self.client.publish(topic, json.dumps(message), qos=1)# 使用示例
if __name__ == "__main__":device = DeviceClient(device_id="device_001",username="device_001",password="device_password")device.connect()device.publish_status("online")try:while True:if device.connected:device.publish_telemetry()time.sleep(30)  # 每30秒发送一次遥测数据except KeyboardInterrupt:device.publish_status("offline")device.disconnect()

6. 优化建议

6.1 安全最佳实践

  1. TLS/SSL 加密: 始终在生产环境使用 TLS 加密
  2. 认证强化: 使用强密码策略和定期轮换
  3. ACL 精细化: 基于最小权限原则配置主题访问控制
  4. 网络隔离: 将 MQTT Broker 部署在 DMZ 区域

6.2 性能优化

  1. 连接池管理: 后端服务使用连接池避免频繁连接
  2. QoS 选择: 根据业务需求选择合适的 QoS 等级
  3. 消息大小控制: 限制单个消息大小,避免大消息阻塞
  4. 持久化优化: 合理配置持久化策略平衡性能和数据安全

6.3 监控与运维

  1. 健康检查: 实现 Broker 健康检查机制
  2. 指标监控: 监控连接数、消息吞吐量等关键指标
  3. 日志管理: 集中管理日志并设置合理的日志级别
  4. 备份策略: 定期备份配置和持久化数据

7 总结

本文详细介绍了 Mosquitto MQTT Broker 的分布式部署方案、自定义认证插件开发以及前后端集成的最佳实践。通过这种架构,可以实现高可用、可扩展的物联网平台,满足企业级应用的需求。在我的一个物联网项目中,已经稳定运行一年多,没有出现一起Broker的故障事件,还是很稳定。对于一些物联网项目的简单应用,比如设备状态上报,指令下发,消息广播,Mosquitto完全满足生产需要。

http://www.dtcms.com/a/418167.html

相关文章:

  • 无人机姿态控制技术详解
  • 做网站所需要的公司细责及条款微信小程序推广
  • keepalived部署
  • 前端实现抽烟识别:从算法到可视化
  • j2ee网站开发免费教程甘肃金顶建设公司网站
  • Linux ssh/scp/sftp命令使用及免密登录配置
  • CBB21B-MPB电子元器件 RC容钏电子 高性能金属化聚丙烯薄膜直流电容器 技术解析
  • CustomKD论文阅读
  • 腾讯面试题总结(1)
  • 【服务器知识】HTTP 请求头信息及其用途详细说明
  • AbMole| ABDP 493/503( M9850;中性脂滴荧光探针)
  • QML核心概念:用户输入与布局管理
  • 在原备案号下增加新网站微信公众平台是什么
  • AI智能体实战开发教程(从0到企业级项目落地):62节全完结,助力金九银十升职加薪!
  • 【网络编程】套接字入门:网络字节序与套接字种类剖析
  • 【Linux】Linux下的静态链接的底层逻辑
  • 2、Lombok核心注解详解:@Getter、@Setter、@Data 等基础注解全面解析
  • 兴力网站建设wordpress文章类型模板
  • springboot高校教务管理系统设计与实现(代码+数据库+LW)
  • Vala 编程语言高级特性-具有语法支持的方法
  • JavaEE初阶4.0
  • 医疗编程AI技能树与培训技能树报告(国内外一流大学医疗AI相关专业分析2025版,上)
  • 【IEEE出版 | 高录用、稳定检索】第七届信息与计算机前沿技术国际学术会议(ICFTIC 2025)
  • 我爱学算法之—— 模拟(上)
  • 白云做网站网店怎么注册开网店
  • 有了域名和主机怎么做网站erp软件是什么软件
  • 大数据毕业设计选题推荐-基于大数据的青光眼数据可视化分析系统-大数据-Spark-Hadoop-Bigdata
  • 数据可视化 | 热力图Heatmap绘制Python代码 相关性矩阵学术可视化
  • C#对称加密(AES)的简单代码
  • AR眼镜在安防领域人脸识别技术方案|阿法龙XR云平台