使用 Go 语言实现完整且轻量级高性能的 MQTT Broker
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息传输协议。但是目前虽然mqtt的客户端很多,但是服务端着实不多,常见的服务端如mosquitto或emqx。但是golang语言的实现几乎找不到。golang的轻量级部署和高并发高性能,很合适做mqtt Broker。本文将详细介绍如何使用 Go 语言实现一个简单轻量级且高性能的 MQTT Broker,并涵盖MQTT3.1.1协议的核心特性和完整功能。
1. 需求分析
本文选择golang语言实现一个完整的 MQTT 3.1.1 Broker,不涉及集群支持和协议版本检测。简单且轻量级,不但可以替代mosquitto,后续还可以灵活的做扩展,如增加webUI的管理界面。且部署也很简单,一个exe可执行文件。
完整项目开源地址:https://github.com/yangyongzhen/goang-mqtt-broker
gitee: https://gitee.com/yyz116/goang-mqtt-broker
可执行文件在release目录下。
1.1 实现效果截图
服务启动:
客户端发布:
客户端订阅:
使用mosquitto客户端测试效果:
优化增加基于redis的持久化存储:
可在etc/config.yaml文件中配置是否启用redis的持久化。默认基于内存。
windows下的可执行文件仅有7M左右大小,简单小巧。且代码开源方便定制。可以作为替代mosquitto的另外一种选择。
1.2 功能特性
1.2.1核心功能
- ✅ 完整的 MQTT 3.1.1 协议支持
- ✅ QoS 0, 1, 2 消息传递保证
- ✅ 会话管理(持久会话和清理会话)
- ✅ 保留消息(Retained Messages)
- ✅ 遗嘱消息(Last Will and Testament)
- ✅ 主题通配符(+ 和 # 通配符支持)
- ✅ 客户端认证(用户名/密码)
- ✅ 保活机制(Keep Alive)
- ✅ 并发安全
1.2.2 架构特性
- 🏗️ 模块化设计,易于扩展
- 🔌 可插拔存储接口
- 🔒 线程安全的并发处理
- 📊 内置监控指标
- 🐳 Docker 支持
2. 项目架构设计
架构设计
数据流
客户端连接 → TCP Server 接受连接
协议解析 → Client 解析 MQTT 数据包
认证验证 → Auth 模块验证用户凭据
会话管理 → Storage 加载/保存会话信息
消息路由 → Broker 根据订阅关系路由消息
主题匹配 → Topic Manager 处理通配符匹配
2.1 目录结构
mqtt-broker/
├── README.md
├── Makefile
├── Dockerfile
├── go.mod
├── go.sum
├── cmd/
│ ├── broker/
│ │ └── main.go
│ └── test-client/
│ └── main.go
├── internal/
│ ├── auth/
│ │ └── auth.go
│ ├── broker/
│ │ ├── broker.go
│ │ ├── client.go
│ │ └── topic.go
│ ├── protocol/
│ │ ├── common/
│ │ │ └── types.go
│ │ └── mqtt311/
│ │ └── packet.go
│ └── storage/
│ ├── interface.go
│ └── memory/
│ └── store.go
└── pkg/└── mqtt/└── packet.go
2.2 主要模块
- cmd/broker/main.go:程序入口。
- internal/broker/:Broker 核心逻辑,包括连接管理、消息路由等。
- internal/storage/:存储接口和内存实现。
- pkg/mqtt/packet.go:MQTT 数据包编码和解码。
3. 核心实现
3.1 存储接口
在 internal/storage/interface.go
文件中定义存储接口:
package storageimport ("github.com/yangyongzhen/mqtt-broker/internal/protocol/common"
)type Store interface {SaveSession(clientID string, session *Session) errorLoadSession(clientID string) (*Session, error)DeleteSession(clientID string) errorSaveMessage(clientID string, message *common.Message) errorLoadMessages(clientID string) ([]*common.Message, error)DeleteMessage(clientID string, packetID uint16) errorSaveRetainedMessage(topic string, message *common.Message) errorLoadRetainedMessage(topic string) (*common.Message, error)DeleteRetainedMessage(topic string) errorSaveSubscription(clientID string, subscription *common.Subscription) errorLoadSubscriptions(clientID string) ([]*common.Subscription, error)DeleteSubscription(clientID string, topic string) error
}type Session struct {ClientID stringCleanSession boolSubscriptions map[string]*common.SubscriptionPendingAcks map[uint16]*common.MessageLastSeen time.Time
}
3.2 内存存储实现
在 internal/storage/memory/store.go
文件中实现内存存储:
package memoryimport ("sync""github.com/yangyongzhen/mqtt-broker/internal/storage""github.com/yangyongzhen/mqtt-broker/internal/protocol/common"
)type MemoryStore struct {sessions map[string]*storage.SessionretainedMsgs map[string]*common.MessageclientMessages map[string][]*common.Messagemu sync.RWMutex
}func NewMemoryStore() *MemoryStore {return &MemoryStore{sessions: make(map[string]*storage.Session),retainedMsgs: make(map[string]*common.Message),clientMessages: make(map[string][]*common.Message),}
}func (m *MemoryStore) SaveSession(clientID string, session *storage.Session) error {m.mu.Lock()defer m.mu.Unlock()m.sessions[clientID] = sessionreturn nil
}func (m *MemoryStore) LoadSession(clientID string) (*storage.Session, error) {m.mu.RLock()defer m.mu.RUnlock()session, exists := m.sessions[clientID]if !exists {return nil, nil}return session, nil
}// 其他方法省略...
3.3 客户端连接管理
在 internal/broker/client.go
文件中实现客户端连接管理:
package brokerimport ("bufio""fmt""net""sync""time""github.com/yangyongzhen/mqtt-broker/internal/protocol/common""github.com/yangyongzhen/mqtt-broker/internal/protocol/mqtt311""github.com/yangyongzhen/mqtt-broker/internal/storage""github.com/yangyongzhen/mqtt-broker/pkg/mqtt"
)type Client struct {conn net.ConnclientID stringinfo *common.ClientInfosession *storage.Sessionbroker *BrokerpacketReader *mqtt.PacketReaderwriteChan chan []bytecloseChan chan struct{}keepAliveTimer *time.Timermu sync.RWMutexconnected boolnextPacketID uint16pendingAcks map[uint16]*PendingMessage
}type PendingMessage struct {Message *common.MessageTimestamp time.TimeRetries int
}func NewClient(conn net.Conn, broker *Broker) *Client {return &Client{conn: conn,broker: broker,packetReader: mqtt.NewPacketReader(conn),writeChan: make(chan []byte, 1000),closeChan: make(chan struct{}),pendingAcks: make(map[uint16]*PendingMessage),nextPacketID: 1,}
}func (c *Client) Start() {go c.readLoop()go c.writeLoop()go c.retryLoop()
}func (c *Client) readLoop() {defer c.Close()for {select {case <-c.closeChan:returndefault:packet, err := c.packetReader.ReadPacket()if err != nil {fmt.Printf("Read packet error: %v\n", err)return}c.handlePacket(packet)}}
}func (c *Client) writeLoop() {defer c.Close()for {select {case data := <-c.writeChan:if _, err := c.conn.Write(data); err != nil {fmt.Printf("Write error: %v\n", err)return}case <-c.closeChan:return}}
}func (c *Client) retryLoop() {ticker := time.NewTicker(5 * time.Second)defer ticker.Stop()for {select {case <-ticker.C:c.retryPendingMessages()case <-c.closeChan:return}}
}func (c *Client) handlePacket(packet common.Packet) {switch p := packet.(type) {case *mqtt311.ConnectPacket:c.handleConnect(p)case *mqtt311.PublishPacket:c.handlePublish(p)case *mqtt311.SubscribePacket:c.handleSubscribe(p)case *mqtt311.UnsubscribePacket:c.handleUnsubscribe(p)case *mqtt311.PingreqPacket:c.handlePingReq()case *mqtt311.DisconnectPacket:c.handleDisconnect()}
}// handleConnect, handlePublish 等其他方法省略...
3.4 主 Broker 实现
在 internal/broker/broker.go
文件中实现主 Broker 的逻辑:
package brokerimport ("fmt""net""sync""time""github.com/yangyongzhen/mqtt-broker/internal/auth""github.com/yangyongzhen/mqtt-broker/internal/protocol/common""github.com/yangyongzhen/mqtt-broker/internal/storage"
)type Broker struct {listener net.Listenerclients map[string]*ClienttopicManager *TopicManagerstore storage.Storeauth auth.Authenticatormu sync.RWMutexrunning boolconfig *Config
}type Config struct {MaxConnections intMaxMessageSize intRetainedMsgLimit intSessionExpiry time.DurationMessageExpiry time.Duration
}func NewBroker(store storage.Store, authenticator auth.Authenticator) *Broker {return &Broker{clients: make(map[string]*Client),topicManager: NewTopicManager(),store: store,auth: authenticator,config: &Config{MaxConnections: 10000,MaxMessageSize: 1024 * 1024,RetainedMsgLimit: 10000,SessionExpiry: 24 * time.Hour,MessageExpiry: 24 * time.Hour,},}
}func (b *Broker) Start(address string) error {listener, err := net.Listen("tcp", address)if err != nil {return err}b.listener = listenerb.running = truefmt.Printf("MQTT Broker started on %s\n", address)for b.running {conn, err := listener.Accept()if err != nil {if b.running {fmt.Printf("Accept error: %v\n", err)}continue}client := NewClient(conn, b)go client.Start()}return nil
}func (b *Broker) Stop() {b.running = falseif b.listener != nil {b.listener.Close()}b.mu.Lock()defer b.mu.Unlock()for _, client := range b.clients {client.Close()}
}// AddClient, RemoveClient, PublishMessage 等其他方法省略...
3.5 主程序入口
在 cmd/broker/main.go
文件中定义主程序入口:
package mainimport ("flag""fmt""log""os""os/signal""syscall""github.com/yangyongzhen/mqtt-broker/internal/auth""github.com/yangyongzhen/mqtt-broker/internal/broker""github.com/yangyongzhen/mqtt-broker/internal/storage/memory"
)func main() {addr := flag.String("addr", ":1883", "MQTT broker address")flag.Parse()authenticator := auth.NewSimpleAuthenticator() // 示例认证器,需要自行实现store := memory.NewMemoryStore()b := broker.NewBroker(store, authenticator)go func() {if err := b.Start(*addr); err != nil {log.Fatalf("Failed to start MQTT broker: %v", err)}}()sigChan := make(chan os.Signal, 1)signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)<-sigChanb.Stop()fmt.Println("MQTT broker stopped")
}
以上代码是实现一个简单的 MQTT Broker 的基础框架,更多详细功能和性能优化可以根据实际需求进行扩展和改进。
安装和运行
- 克隆项目
git clone <your-repo-url>
cd mqtt-brokergo mod tidy
安装依赖
go mod tidy
构建项目
make build
或者
go build -o bin/mqtt-broker cmd/broker/main.go
运行 Broker
make run
或者
./bin/mqtt-broker -addr=:1883 -debug
使用 Docker
构建镜像
docker build -t mqtt-broker .
#### 运行容器
docker run -p 1883:1883 mqtt-broker
#### 使用示例**启动 Broker**
#### 默认端口 1883
go run cmd/broker/main.go
##### 自定义端口和调试模式
go run cmd/broker/main.go -addr=:1883 -debug
测试客户端
项目包含一个简单的测试客户端,可以用来测试 broker 功能:
订阅消息:
go run cmd/test-client/main.go -mode=sub -topic=test/hello -client=subscriber1
发布消息:
go run cmd/test-client/main.go -mode=pub -topic=test/hello -msg="Hello MQTT!" -client=publisher1
使用第三方客户端
你也可以使用任何标准的 MQTT 客户端连接到 broker:
使用 mosquitto 客户端:
订阅
mosquitto_sub -h localhost -p 1883 -t "test/topic"
发布
mosquitto_pub -h localhost -p 1883 -t "test/topic" -m "Hello World"
使用认证:
默认用户:
admin/password, test/test123
mosquitto_pub -h localhost -p 1883 -u admin -P password -t “test/topic” -m “Authenticated message”
配置说明
命令行参数
参数 默认值 说明
-addr :1883 Broker 监听地址
-debug false 启用调试日志
内置用户
Broker 默认创建了以下测试用户:
用户名 密码
admin password
test test123
项目开源地址:
https://github.com/yangyongzhen/goang-mqtt-broker
gitee: https://gitee.com/yyz116/goang-mqtt-broker
作者
作者csdn猫哥,转载请注明出处: https://blog.csdn.net/yyz_1987