【Mosquitto的数据流程架构】
分析了 Mosquitto 的源代码后,绘制出其核心的数据流程架构和模块拓扑图。
Mosquitto 数据流程架构总览
1. 核心架构拓扑图
┌──────────────────────────────────────────────────────────────────────────────┐
│ MOSQUITTO BROKER │
├──────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────────────┐ │
│ │ Network I/O │ │ Protocol │ │ Security & │ │
│ │ Module │◄──►│ Processing │◄──►│ Authentication │ │
│ │ (net.c) │ │ (read_handle.c)│ │ (mosquitto_pw.c) │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────────────┘ │
│ ▲ │ │ │
│ │ ▼ │ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────────────┐ │
│ │ Socket │ │ Message │ │ Access Control │ │
│ │ Management │ │ Routing │ │ (acl.c) │ │
│ │ (socket.c) │ │ (handle_*.c) │ └─────────────────────────┘ │
│ └─────────────────┘ └─────────────────┘ │ │
│ │ │ │ │
│ │ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────────────────────────────────┐ │
│ │ Event Loop │ │ In-Memory Database │ │
│ │ (loop.c) │ │ (database.c) │ │
│ └─────────────────┘ │ ┌─────────────────────────────────────────┐ │ │
│ │ │ │ • Client Sessions │ │ │
│ │ │ │ • Message Store (QoS 1/2) │ │ │
│ ▼ │ │ • Subscription Trees │ │ │
│ ┌─────────────────┐ │ │ • Retained Messages │ │ │
│ │ Thread Pool │ │ └─────────────────────────────────────────┘ │ │
│ │ (thread.c) │ └─────────────────────────────────────────────┘ │
│ └─────────────────┘ │ │
│ │ │ │
│ │ ▼ │
│ ┌─────────────────┐ ┌─────────────────────────────────────────────┐ │
│ │ Persistence │ │ Plugins & │ │
│ │ (persist.c) │◄──►│ Extensions │ │
│ └─────────────────┘ │ (plugin.c, dynamic_security.c) │ │
│ └─────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────────────────┘│▼
┌──────────────────────────────────────────────────────────────────────────────┐
│ External Dependencies │
├──────────────────────────────────────────────────────────────────────────────┤
│ • libevent/libev (Event Loop) │
│ • OpenSSL (TLS Encryption) │
│ • cJSON (JSON Processing) │
│ • libwebsockets (WebSocket Support) │
└──────────────────────────────────────────────────────────────────────────────┘
2. 详细数据流程图
┌─────────────┐ TCP/UDP/WebSocket ┌─────────────┐
│ Client │ ──────────────────────► │ Listener │
│ (Publisher/ │ │ (socket.c) │
│ Subscriber) │ ◄────────────────────── │ │
└─────────────┘ MQTT Packets └─────────────┘│▼
┌─────────────────────────────────────────────────────┐
│ Network I/O Layer (net.c) │
│ • Socket accept/read/write │
│ • SSL/TLS handshake │
│ • Protocol detection (MQTT/WebSocket) │
└─────────────────────────────────────────────────────┘│▼
┌─────────────────────────────────────────────────────┐
│ Protocol Processing (read_handle.c) │
│ • MQTT packet parsing │
│ • Protocol validation │
│ • CONNECT/CONNACK handshake │
└─────────────────────────────────────────────────────┘│▼
┌─────────────────────────────────────────────────────┐
│ Security Layer (mosquitto_pw.c) │
│ • Username/password authentication │
│ • Client certificate validation │
│ • ACL verification (acl.c) │
└─────────────────────────────────────────────────────┘│▼
┌─────────────────────────────────────────────────────┐
│ Message Processing (handle_*.c) │
│ • handle_publish() - Process publications │
│ • handle_subscribe() - Process subscriptions │
│ • handle_unsubscribe()- Remove subscriptions │
│ • handle_ping() - Keep-alive │
└─────────────────────────────────────────────────────┘│▼
┌─────────────────────────────────────────────────────┐
│ In-Memory Database (database.c) │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Client Sessions│ │ Message Store │ │
│ │ • Clean session│ │ • QoS 1/2 msgs │ │
│ │ • Will message │ │ • Message IDs │ │
│ └─────────────────┘ └─────────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────┐ ┌───────────────── ┐ │
│ │ Subscriptions │ │ Retained Messages│ │
│ │ • Topic filters│ │ • Last message │ │
│ │ • QoS levels │ │ per topic │ │
│ └─────────────────┘ └───────────────── ┘ │
└─────────────────────────────────────────────────────┘│ │ ││ │ │▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Session Restore │ │ Message Delivery│ │ Retained Message│
│ on Reconnect │ │ to Subscribers │ │ Delivery │
└─────────────────┘ └─────────────────┘ └─────────────────┘│▼
┌─────────────────────────────────────────────────────┐
│ Persistence Layer (persist.c) │
│ • Session data persistence │
│ • Message store backup │
│ • Configuration persistence │
└─────────────────────────────────────────────────────┘
3. 核心模块详细说明
网络层模块 (Network Layer)
// net.c - 核心网络I/O
struct mosquitto_net {int sock; // 套接字描述符SSL *ssl; // SSL上下文struct mosquitto *context; // 客户端上下文time_t last_msg_in; // 最后消息时间time_t last_msg_out; // 最后发送时间
};// 主要功能:
// - mosquitto_net_init() 网络初始化
// - net__socket_accept() 接受新连接
// - net__read() 读取数据
// - net__write() 写入数据
协议处理层 (Protocol Layer)
// read_handle.c - 数据包处理
int mqtt3_packet_handle(struct mosquitto *context)
{switch(packet->command){case CMD_CONNECT: return handle__connect(context);case CMD_PUBLISH: return handle__publish(context);case CMD_SUBSCRIBE: return handle__subscribe(context);case CMD_UNSUBSCRIBE:return handle__unsubscribe(context);case CMD_PINGREQ: return handle__pingreq(context);case CMD_DISCONNECT: return handle__disconnect(context);}
}// handle_publish.c - 发布消息处理
int handle__publish(struct mosquitto *context)
{// 1. 验证消息有效性// 2. 检查ACL权限// 3. 存储QoS消息// 4. 查找匹配的订阅者// 5. 分发消息
}
内存数据库模块 (In-Memory Database)
// database.c - 核心数据结构
struct mosquitto_db {// 客户端管理struct mosquitto **contexts; // 客户端数组int context_count; // 客户端数量// 消息存储struct mosquitto_msg_store *msg_store; // QoS消息链表// 订阅管理struct sub__node *subscriptions; // 订阅链表// 保留消息struct retain__node *retains; // 保留消息链表// 桥接配置struct bridge *bridges; // 桥接连接
};// 订阅树结构(用于高效主题匹配)
struct sub__node {char *topic; // 主题过滤器struct mosquitto *context; // 客户端上下文uint8_t qos; // QoS等级struct sub__node *next; // 下一个订阅struct sub__node *children; // 子节点(通配符处理)
};
消息路由流程
┌──────────────┐│ Publish ││ Message │└──────────────┘│▼┌──────────────┐│ Topic-based ││ Matching │└──────────────┘│▼┌─────────────────┐│ Find Subscribers││ for Topic │└─────────────────┘│▼┌─────────────────┐│ Apply ACL Rules ││ (acl.c) │└─────────────────┘│▼┌─────────────────┐│ Quality of ││ Service Handling│└─────────────────┘┌──────────────┼──────────────┐│ │ │▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ QoS 0 │ │ QoS 1 │ │ QoS 2 │
│ Immediate │ │ Store & │ │ Complex │
│ Delivery │ │ Forward │ │ Handshake │
└─────────────────┘ └─────────────────┘ └─────────────────┘
4. 线程和事件循环架构
┌─────────────────────────────────────────────────────┐
│ Main Event Loop (loop.c) │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Listen Sockets│ │ Client │ │
│ │ • Accept new │ │ Sockets │ │
│ │ connections │ │ • Read/write │ │
│ │ • Port reuse │ │ • Timeout │ │
│ └─────────────────┘ │ handling │ │
│ └─────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Thread Pool (thread.c) │ │
│ │ • Background tasks │ │
│ │ • Persistence operations │ │
│ │ • Plugin execution │ │
│ └─────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
5. 插件系统架构
┌─────────────────────────────────────────────────────┐
│ Plugin System (plugin.c) │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Auth Plugins │ │ Message │ │
│ │ • Authentication│ │ Plugins │ │
│ │ • ACL checks │ │ • Transform │ │
│ └─────────────────┘ │ messages │ │
│ └─────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Dynamic Security │ │
│ │ • Runtime configuration │ │
│ │ • Security policies │ │
│ └─────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
6. 数据持久化流程
┌─────────────────┐ Periodic ┌─────────────────┐
│ In-Memory │ ◄───────────── │ Persistence │
│ Database │ Sync │ Timer │
│ (database.c) │ ─────────────► │ (persist.c) │
└─────────────────┘ Save └─────────────────┘│ │ ││ │ ▼│ │ ┌─────────────────┐│ │ │ Disk Storage ││ │ │ • Session data││ │ │ • Messages ││ │ └─────────────────┘│ ││ └──────────────────────────┐▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ Client │ │ Broker │
│ Reconnect │ │ Restart │
│ Recovery │ │ Recovery │
└─────────────────┘ └─────────────────┘
关键设计特点
单线程事件循环:主逻辑运行在单线程中,通过libevent/libev处理高并发
零拷贝设计:消息在内存中传递时避免不必要的复制
分层架构:清晰的网络-协议-业务逻辑分离
插件化扩展:通过插件系统支持自定义功能
内存数据库:所有状态保存在内存中,确保高性能
异步持久化:后台线程处理磁盘IO,不影响主线程性能
这种架构使 Mosquitto 能够高效处理大量并发连接,同时保持低延迟和高吞吐量。