当前位置: 首页 > news >正文

使用 Go 语言实现完整且轻量级高性能的 MQTT Broker

MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息传输协议。但是目前虽然mqtt的客户端很多,但是服务端着实不多,常见的服务端如mosquitto或emqx。但是golang语言的实现几乎找不到。golang的轻量级部署和高并发高性能,很合适做mqtt Broker。本文将详细介绍如何使用 Go 语言实现一个简单轻量级且高性能的 MQTT Broker,并涵盖MQTT3.1.1协议的核心特性和完整功能。

1. 需求分析

本文选择golang语言实现一个完整的 MQTT 3.1.1 Broker,不涉及集群支持和协议版本检测。简单且轻量级,不但可以替代mosquitto,后续还可以灵活的做扩展,如增加webUI的管理界面。且部署也很简单,一个exe可执行文件。

完整项目开源地址:https://github.com/yangyongzhen/goang-mqtt-broker

gitee: https://gitee.com/yyz116/goang-mqtt-broker

可执行文件在release目录下。

1.1 实现效果截图

服务启动:
在这里插入图片描述
客户端发布:
在这里插入图片描述
客户端订阅:
在这里插入图片描述
使用mosquitto客户端测试效果:

在这里插入图片描述
在这里插入图片描述
优化增加基于redis的持久化存储:
可在etc/config.yaml文件中配置是否启用redis的持久化。默认基于内存。
在这里插入图片描述
windows下的可执行文件仅有7M左右大小,简单小巧。且代码开源方便定制。可以作为替代mosquitto的另外一种选择。
在这里插入图片描述

1.2 功能特性
1.2.1核心功能
  • 完整的 MQTT 3.1.1 协议支持
  • QoS 0, 1, 2 消息传递保证
  • 会话管理(持久会话和清理会话)
  • 保留消息(Retained Messages)
  • 遗嘱消息(Last Will and Testament)
  • 主题通配符(+ 和 # 通配符支持)
  • 客户端认证(用户名/密码)
  • 保活机制(Keep Alive)
  • 并发安全
1.2.2 架构特性
  • 🏗️ 模块化设计,易于扩展
  • 🔌 可插拔存储接口
  • 🔒 线程安全的并发处理
  • 📊 内置监控指标
  • 🐳 Docker 支持

2. 项目架构设计

架构设计

数据流

客户端连接 → TCP Server 接受连接
协议解析 → Client 解析 MQTT 数据包
认证验证 → Auth 模块验证用户凭据
会话管理 → Storage 加载/保存会话信息
消息路由 → Broker 根据订阅关系路由消息
主题匹配 → Topic Manager 处理通配符匹配

2.1 目录结构
mqtt-broker/
├── README.md
├── Makefile
├── Dockerfile
├── go.mod
├── go.sum
├── cmd/
│   ├── broker/
│   │   └── main.go
│   └── test-client/
│       └── main.go
├── internal/
│   ├── auth/
│   │   └── auth.go
│   ├── broker/
│   │   ├── broker.go
│   │   ├── client.go
│   │   └── topic.go
│   ├── protocol/
│   │   ├── common/
│   │   │   └── types.go
│   │   └── mqtt311/
│   │       └── packet.go
│   └── storage/
│       ├── interface.go
│       └── memory/
│           └── store.go
└── pkg/└── mqtt/└── packet.go
2.2 主要模块
  • cmd/broker/main.go:程序入口。
  • internal/broker/:Broker 核心逻辑,包括连接管理、消息路由等。
  • internal/storage/:存储接口和内存实现。
  • pkg/mqtt/packet.go:MQTT 数据包编码和解码。

3. 核心实现

3.1 存储接口

internal/storage/interface.go 文件中定义存储接口:

package storageimport ("github.com/yangyongzhen/mqtt-broker/internal/protocol/common"
)type Store interface {SaveSession(clientID string, session *Session) errorLoadSession(clientID string) (*Session, error)DeleteSession(clientID string) errorSaveMessage(clientID string, message *common.Message) errorLoadMessages(clientID string) ([]*common.Message, error)DeleteMessage(clientID string, packetID uint16) errorSaveRetainedMessage(topic string, message *common.Message) errorLoadRetainedMessage(topic string) (*common.Message, error)DeleteRetainedMessage(topic string) errorSaveSubscription(clientID string, subscription *common.Subscription) errorLoadSubscriptions(clientID string) ([]*common.Subscription, error)DeleteSubscription(clientID string, topic string) error
}type Session struct {ClientID      stringCleanSession  boolSubscriptions map[string]*common.SubscriptionPendingAcks   map[uint16]*common.MessageLastSeen      time.Time
}
3.2 内存存储实现

internal/storage/memory/store.go 文件中实现内存存储:

package memoryimport ("sync""github.com/yangyongzhen/mqtt-broker/internal/storage""github.com/yangyongzhen/mqtt-broker/internal/protocol/common"
)type MemoryStore struct {sessions        map[string]*storage.SessionretainedMsgs    map[string]*common.MessageclientMessages  map[string][]*common.Messagemu              sync.RWMutex
}func NewMemoryStore() *MemoryStore {return &MemoryStore{sessions:       make(map[string]*storage.Session),retainedMsgs:   make(map[string]*common.Message),clientMessages: make(map[string][]*common.Message),}
}func (m *MemoryStore) SaveSession(clientID string, session *storage.Session) error {m.mu.Lock()defer m.mu.Unlock()m.sessions[clientID] = sessionreturn nil
}func (m *MemoryStore) LoadSession(clientID string) (*storage.Session, error) {m.mu.RLock()defer m.mu.RUnlock()session, exists := m.sessions[clientID]if !exists {return nil, nil}return session, nil
}// 其他方法省略...
3.3 客户端连接管理

internal/broker/client.go 文件中实现客户端连接管理:

package brokerimport ("bufio""fmt""net""sync""time""github.com/yangyongzhen/mqtt-broker/internal/protocol/common""github.com/yangyongzhen/mqtt-broker/internal/protocol/mqtt311""github.com/yangyongzhen/mqtt-broker/internal/storage""github.com/yangyongzhen/mqtt-broker/pkg/mqtt"
)type Client struct {conn           net.ConnclientID       stringinfo           *common.ClientInfosession        *storage.Sessionbroker         *BrokerpacketReader   *mqtt.PacketReaderwriteChan      chan []bytecloseChan      chan struct{}keepAliveTimer *time.Timermu             sync.RWMutexconnected      boolnextPacketID   uint16pendingAcks    map[uint16]*PendingMessage
}type PendingMessage struct {Message   *common.MessageTimestamp time.TimeRetries   int
}func NewClient(conn net.Conn, broker *Broker) *Client {return &Client{conn:         conn,broker:       broker,packetReader: mqtt.NewPacketReader(conn),writeChan:    make(chan []byte, 1000),closeChan:    make(chan struct{}),pendingAcks:  make(map[uint16]*PendingMessage),nextPacketID: 1,}
}func (c *Client) Start() {go c.readLoop()go c.writeLoop()go c.retryLoop()
}func (c *Client) readLoop() {defer c.Close()for {select {case <-c.closeChan:returndefault:packet, err := c.packetReader.ReadPacket()if err != nil {fmt.Printf("Read packet error: %v\n", err)return}c.handlePacket(packet)}}
}func (c *Client) writeLoop() {defer c.Close()for {select {case data := <-c.writeChan:if _, err := c.conn.Write(data); err != nil {fmt.Printf("Write error: %v\n", err)return}case <-c.closeChan:return}}
}func (c *Client) retryLoop() {ticker := time.NewTicker(5 * time.Second)defer ticker.Stop()for {select {case <-ticker.C:c.retryPendingMessages()case <-c.closeChan:return}}
}func (c *Client) handlePacket(packet common.Packet) {switch p := packet.(type) {case *mqtt311.ConnectPacket:c.handleConnect(p)case *mqtt311.PublishPacket:c.handlePublish(p)case *mqtt311.SubscribePacket:c.handleSubscribe(p)case *mqtt311.UnsubscribePacket:c.handleUnsubscribe(p)case *mqtt311.PingreqPacket:c.handlePingReq()case *mqtt311.DisconnectPacket:c.handleDisconnect()}
}// handleConnect, handlePublish 等其他方法省略...
3.4 主 Broker 实现

internal/broker/broker.go 文件中实现主 Broker 的逻辑:

package brokerimport ("fmt""net""sync""time""github.com/yangyongzhen/mqtt-broker/internal/auth""github.com/yangyongzhen/mqtt-broker/internal/protocol/common""github.com/yangyongzhen/mqtt-broker/internal/storage"
)type Broker struct {listener      net.Listenerclients       map[string]*ClienttopicManager  *TopicManagerstore         storage.Storeauth          auth.Authenticatormu            sync.RWMutexrunning       boolconfig        *Config
}type Config struct {MaxConnections   intMaxMessageSize   intRetainedMsgLimit intSessionExpiry    time.DurationMessageExpiry    time.Duration
}func NewBroker(store storage.Store, authenticator auth.Authenticator) *Broker {return &Broker{clients:      make(map[string]*Client),topicManager: NewTopicManager(),store:        store,auth:         authenticator,config: &Config{MaxConnections:   10000,MaxMessageSize:   1024 * 1024,RetainedMsgLimit: 10000,SessionExpiry:    24 * time.Hour,MessageExpiry:    24 * time.Hour,},}
}func (b *Broker) Start(address string) error {listener, err := net.Listen("tcp", address)if err != nil {return err}b.listener = listenerb.running = truefmt.Printf("MQTT Broker started on %s\n", address)for b.running {conn, err := listener.Accept()if err != nil {if b.running {fmt.Printf("Accept error: %v\n", err)}continue}client := NewClient(conn, b)go client.Start()}return nil
}func (b *Broker) Stop() {b.running = falseif b.listener != nil {b.listener.Close()}b.mu.Lock()defer b.mu.Unlock()for _, client := range b.clients {client.Close()}
}// AddClient, RemoveClient, PublishMessage 等其他方法省略...
3.5 主程序入口

cmd/broker/main.go 文件中定义主程序入口:

package mainimport ("flag""fmt""log""os""os/signal""syscall""github.com/yangyongzhen/mqtt-broker/internal/auth""github.com/yangyongzhen/mqtt-broker/internal/broker""github.com/yangyongzhen/mqtt-broker/internal/storage/memory"
)func main() {addr := flag.String("addr", ":1883", "MQTT broker address")flag.Parse()authenticator := auth.NewSimpleAuthenticator() // 示例认证器,需要自行实现store := memory.NewMemoryStore()b := broker.NewBroker(store, authenticator)go func() {if err := b.Start(*addr); err != nil {log.Fatalf("Failed to start MQTT broker: %v", err)}}()sigChan := make(chan os.Signal, 1)signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)<-sigChanb.Stop()fmt.Println("MQTT broker stopped")
}

以上代码是实现一个简单的 MQTT Broker 的基础框架,更多详细功能和性能优化可以根据实际需求进行扩展和改进。

安装和运行

  1. 克隆项目
git clone <your-repo-url>
cd mqtt-brokergo mod tidy

安装依赖

go mod tidy

构建项目

make build
或者
go build -o bin/mqtt-broker cmd/broker/main.go
运行 Broker
make run
或者
./bin/mqtt-broker -addr=:1883 -debug

使用 Docker

构建镜像
docker build -t mqtt-broker .
#### 运行容器
docker run -p 1883:1883 mqtt-broker
#### 使用示例**启动 Broker**
#### 默认端口 1883
go run cmd/broker/main.go
##### 自定义端口和调试模式
go run cmd/broker/main.go -addr=:1883 -debug

测试客户端

项目包含一个简单的测试客户端,可以用来测试 broker 功能:

订阅消息:

go run cmd/test-client/main.go -mode=sub -topic=test/hello -client=subscriber1

发布消息:

go run cmd/test-client/main.go -mode=pub -topic=test/hello -msg="Hello MQTT!" -client=publisher1

使用第三方客户端

你也可以使用任何标准的 MQTT 客户端连接到 broker:

使用 mosquitto 客户端:

订阅
mosquitto_sub -h localhost -p 1883 -t "test/topic"
发布
mosquitto_pub -h localhost -p 1883 -t "test/topic" -m "Hello World"

使用认证:

默认用户:

admin/password, test/test123
mosquitto_pub -h localhost -p 1883 -u admin -P password -t “test/topic” -m “Authenticated message”

配置说明

命令行参数
参数 默认值 说明
-addr :1883 Broker 监听地址
-debug false 启用调试日志

内置用户

Broker 默认创建了以下测试用户:

用户名 密码
admin password
test test123

项目开源地址:

https://github.com/yangyongzhen/goang-mqtt-broker

gitee: https://gitee.com/yyz116/goang-mqtt-broker

作者

作者csdn猫哥,转载请注明出处: https://blog.csdn.net/yyz_1987

相关文章:

  • vue3使用七牛云上传文件
  • MySQL主从同步原理
  • 快速失败(fail-fast)和安全失败(fail-safe)的区别
  • 传统医疗系统文档集中标准化存储和AI智能化更新路径分析
  • 爬虫知识之IP代理
  • 用 AI 让学习更懂你:如何打造自动化个性化学习系统?
  • 嵌入式开发方向开发利器
  • OpenLayers 加载全屏显示控件
  • 【萤火工场GD32VW553-IOT开发板】ADC电压的LabVIEW采集
  • 【Code Agent Benchmark】论文分享No.15:TAU-Bench
  • 标准版v5.6.1, 优化了一些细节提升体验
  • RabbitMQ的详细使用
  • 超详细网络介绍(超全)
  • 5.24本日总结
  • CQF预备知识:Python相关库 -- NumPy 基础知识 - 数组创建
  • Ubuntu20.04 gr-gsm完整安装教程
  • SQL每日一题
  • SQL SERVER常用聚合函数整理及示例
  • 全面指南:使用Node.js和Python连接与操作MongoDB
  • 二十五、面向对象底层逻辑-SpringMVC九大组件之HandlerMapping接口设计
  • 专业设计自学网站/网站工具查询
  • 织梦手机网站建设/百度问答seo
  • 大连商城网站建设/抖音推广方案
  • 学校网站建设的优势和不足/北京seo网站优化培训
  • 永年网站建设/接外包项目的网站
  • 网站淘宝客 难做/中央新闻频道直播今天