当前位置: 首页 > news >正文

Paho MQTT C 客户端源码深入解析

1. 引言

Eclipse Paho MQTT C 客户端是一个开源的 MQTT 客户端库,实现了 MQTT 3.1.1 和 MQTT 5.0 协议。本文档将深入分析该库的源码结构和关键实现机制,帮助开发者更好地理解其工作原理。

2. 项目结构概述

Paho MQTT C 客户端库的源码主要分布在 src 目录下,核心文件包括:

  • MQTTClient.c: 客户端 API 实现

  • MQTTProtocolClient.c: MQTT 协议处理

  • Socket.c: 网络 socket 操作

  • SocketBuffer.c: socket 缓冲区管理

  • Thread.c: 线程相关操作

  • LinkedList.c: 链表数据结构

  • MQTTPacket.c: MQTT 数据包处理

  • MQTTProtocolOut.c: 发送端协议实现

  • Log.c: 日志系统

  • MQTTPersistence.c: 持久化存储

  • MQTTProperties.c: MQTT 5.0 属性处理

  • MQTTPersistenceDefault.c: 默认持久化实现

  • utf-8.c: UTF-8 字符串验证

  • StackTrace.c: 调用栈跟踪

3. 核心数据结构

3.1 Clients 结构体

Clients 结构体是库中最重要的数据结构之一,代表一个 MQTT 客户端实例:

typedef struct Clients
{char* clientID;                 // 客户端标识符const char* username;           // MQTT v3.1 用户名int passwordlen;                // MQTT 密码长度const void* password;           // MQTT v3.1 二进制密码unsigned int cleansession : 1;  // MQTT V3 会话清理标志unsigned int cleanstart : 1;    // MQTT V5 会话开始标志unsigned int connected : 1;     // 是否已连接unsigned int good : 1;          // socket 是否有错误unsigned int ping_outstanding : 1;unsigned int ping_due : 1;      // 无法发送ping,应该在可以时发送signed int connect_state : 4;   // 连接状态START_TIME_TYPE ping_due_time;  // 应该发送ping的时间 (ping_due)networkHandles net;             // 网络信息int msgID;                      // MQTT 消息IDint keepAliveInterval;          // MQTT 心跳间隔int savedKeepAliveInterval;     // 保存的心跳间隔,以防服务器心跳重置int retryInterval;              // MQTT QoS > 0 重试间隔int maxInflightMessages;        // 允许的最大飞行中出站消息数willMessages* will;             // MQTT 遗嘱消息(如果有)List* inboundMsgs;              // 入站飞行中消息List* outboundMsgs;             // 出站飞行中消息int connect_count;              // 重连时的出站消息数 - 确保发送所有消息int connect_sent;               // 重连时已发送的出站消息数List* messageQueue;             // 入站已完成但未传递的消息List* outboundQueue;            // 出站排队消息unsigned int qentry_seqno;void* phandle;                  // 持久化句柄MQTTClient_persistence* persistence; // 持久化实现MQTTPersistence_beforeWrite* beforeWrite; // 持久化写回调MQTTPersistence_afterRead* afterRead; // 持久化读回调void* beforeWrite_context;      // 与持久化beforeWrite回调一起使用的上下文void* afterRead_context;        // 与持久化afterRead回调一起使用的上下文void* context;                  // 调用上下文 - 用于调用disconnect_internalint MQTTVersion;                // 使用的MQTT版本,3、4或5unsigned int sessionExpiry;     // MQTT 5 会话过期char* httpProxy;                // HTTP代理char* httpsProxy;               // HTTPS代理
#if defined(OPENSSL)MQTTClient_SSLOptions *sslopts; // SSL/TLS连接选项SSL_SESSION* session;           // SSL会话指针,用于快速握手
#endif
} Clients;

3.2 Messages 结构体

Messages 结构体用于跟踪 QoS1 和 QoS2 消息:

typedef struct
{int len;                     // 消息长度volatile int retain;         // 保留标志volatile int qos;            // QoS 等级volatile int dup;            // 重复标志volatile int msgid;          // 消息IDMQTTProperties properties;    // MQTT 5.0 属性time_t lastTouch;            // 最后操作时间int nextMessageType;         // 下一个消息类型Publish* publish;            // 发布消息内容int ref_qos;                 // 引用的QoSint MQTTVersion;             // MQTT版本
} Messages;

3.3 Sockets 结构体

Sockets 结构体用于管理 socket 集合:

typedef struct
{List* connect_pending;       // 正在连接的socket列表List* write_pending;         // 有待写操作的socket列表#if defined(USE_SELECT)fd_set rset, rset_saved;     // select读集合int maxfdp1;                 // 最大文件描述符+1List* clientsds;             // 客户端socket描述符列表ListElement* cur_clientsds;  // 当前客户端socket迭代器fd_set pending_wset;         // 待写socket集合
#elseunsigned int nfds;           // poll文件描述符数量struct pollfd* fds_read;     // poll读文件描述符数组struct pollfd* fds_write;    // poll写文件描述符数组struct {int cur_fd;              // 当前处理的文件描述符索引unsigned int nfds;       // 文件描述符数量struct pollfd* fds_write; // 保存的写文件描述符数组struct pollfd* fds_read;  // 保存的读文件描述符数组} saved;
#endif
} Sockets;

4. 网络层实现

4.1 Socket 管理

网络层的核心是 mod_s 全局变量,它维护了所有 socket 的状态信息。

4.1.1 Socket 初始化

在 Socket_outInitialize() 函数中初始化 socket 管理结构:

void Socket_outInitialize(void)
{SocketBuffer_initialize();mod_s.connect_pending = ListInitialize();mod_s.write_pending = ListInitialize();#if defined(USE_SELECT)mod_s.clientsds = ListInitialize();mod_s.cur_clientsds = NULL;FD_ZERO(&(mod_s.rset));FD_ZERO(&(mod_s.pending_wset));mod_s.maxfdp1 = 0;memcpy((void*)&(mod_s.rset_saved), (void*)&(mod_s.rset), sizeof(mod_s.rset_saved));
#elsemod_s.nfds = 0;mod_s.fds_read = NULL;mod_s.fds_write = NULL;mod_s.saved.cur_fd = -1;mod_s.saved.fds_write = NULL;mod_s.saved.fds_read = NULL;mod_s.saved.nfds = 0;
#endif
}
4.1.2 Socket 添加

Socket_addSocket() 函数用于将新 socket 添加到监控列表:

int Socket_addSocket(SOCKET newSd)
{int rc = 0;FUNC_ENTRY;
#if defined(USE_SELECT)if (ListFindItem(mod_s.clientsds, &newSd, intcompare) == NULL) /* make sure we don't add the same socket twice */{SOCKET* pnewSd = (SOCKET*)malloc(sizeof(newSd));*pnewSd = newSd;ListAppend(mod_s.clientsds, pnewSd, sizeof(newSd));FD_SET(newSd, &(mod_s.rset_saved));mod_s.maxfdp1 = max(mod_s.maxfdp1, (int)newSd + 1);rc = Socket_setnonblocking(newSd);}
#elsemod_s.nfds++;if (mod_s.nfds > mod_s.maxfds){mod_s.maxfds = mod_s.nfds;if (mod_s.fds_read){mod_s.fds_read = realloc(mod_s.fds_read, mod_s.nfds * sizeof(mod_s.fds_read[0]));mod_s.fds_write = realloc(mod_s.fds_write, mod_s.nfds * sizeof(mod_s.fds_write[0]));}else{mod_s.fds_read = malloc(mod_s.nfds * sizeof(mod_s.fds_read[0]));mod_s.fds_write = malloc(mod_s.nfds * sizeof(mod_s.fds_write[0]));}}mod_s.fds_read[mod_s.nfds - 1].fd = newSd;mod_s.fds_write[mod_s.nfds - 1].fd = newSd;mod_s.fds_read[mod_s.nfds - 1].events = POLLIN;mod_s.fds_write[mod_s.nfds - 1].events = POLLOUT;qsort(mod_s.fds_read, (size_t)mod_s.nfds, sizeof(mod_s.fds_read[0]), cmpfds);qsort(mod_s.fds_write, (size_t)mod_s.nfds, sizeof(mod_s.fds_write[0]), cmpfds);rc = Socket_setnonblocking(newSd);
#endifFUNC_EXIT_RC(rc);return rc;
}
4.1.3 Socket 监控

Socket_getReadySocket() 函数是 socket 监控的核心,根据平台选择使用 select 或 poll:

SOCKET Socket_getReadySocket(int more_work, int timeout, mutex_type mutex, int* rc)
{SOCKET sock = 0;*rc = 0;int timeout_ms = 1000;FUNC_ENTRY;Paho_thread_lock_mutex(mutex);if (mod_s.nfds == 0 && mod_s.saved.nfds == 0)goto exit;if (more_work)timeout_ms = 0;else if (timeout >= 0)timeout_ms = timeout;// 首先检查缓存的就绪socketwhile (mod_s.saved.cur_fd != -1){if (isReady(mod_s.saved.cur_fd))break;mod_s.saved.cur_fd = (mod_s.saved.cur_fd == mod_s.saved.nfds - 1) ? -1 : mod_s.saved.cur_fd + 1;}// 如果没有缓存的就绪socket,则调用poll等待if (mod_s.saved.cur_fd == -1){int rc1 = 0;// 同步当前socket列表到保存的数组if (mod_s.nfds != mod_s.saved.nfds){mod_s.saved.nfds = mod_s.nfds;// 内存管理代码...}if (mod_s.fds_read == NULL)mod_s.saved.fds_read = NULL;elsememcpy(mod_s.saved.fds_read, mod_s.fds_read, mod_s.nfds * sizeof(struct pollfd));if (mod_s.fds_write == NULL)mod_s.saved.fds_write = NULL;elsememcpy(mod_s.saved.fds_write, mod_s.fds_write, mod_s.nfds * sizeof(struct pollfd));if (mod_s.saved.nfds == 0){sock = 0;goto exit; /* no work to do */}// 检查有待写操作的socketrc1 = poll(mod_s.saved.fds_write, mod_s.saved.nfds, 0);if (rc1 > 0 && Socket_continueWrites(&sock, mutex) == SOCKET_ERROR){*rc = SOCKET_ERROR;goto exit;}// 调用poll等待事件Paho_thread_unlock_mutex(mutex);*rc = poll(mod_s.saved.fds_read, mod_s.saved.nfds, timeout_ms);Paho_thread_lock_mutex(mutex);if (*rc == SOCKET_ERROR){Socket_error("poll", 0);goto exit;}Log(TRACE_MAX, -1, "Return code %d from poll", *rc);if (rc1 == 0 && *rc == 0){sock = 0;goto exit; /* no work to do */}// 查找就绪的socketmod_s.saved.cur_fd = 0;while (mod_s.saved.cur_fd != -1){if (isReady(mod_s.saved.cur_fd))break;mod_s.saved.cur_fd = (mod_s.saved.cur_fd == mod_s.saved.nfds - 1) ? -1 : mod_s.saved.cur_fd + 1;}}*rc = 0;if (mod_s.saved.cur_fd == -1)sock = 0;else{sock = mod_s.saved.fds_read[mod_s.saved.cur_fd].fd;mod_s.saved.cur_fd = (mod_s.saved.cur_fd == mod_s.saved.nfds - 1) ? -1 : mod_s.saved.cur_fd + 1;}
exit:Paho_thread_unlock_mutex(mutex);FUNC_EXIT_RC(sock);return sock;
}

4.2 Socket 缓冲区管理

SocketBuffer 模块负责管理 socket 的读写缓冲区。

4.2.1 写操作缓冲

当 socket 无法一次性完成写操作时,数据会被缓存到 pending_writes 结构中:

typedef struct
{SOCKET socket;int count;iobuf iovecs[5];int frees[5];size_t total;size_t bytes;
#if defined(OPENSSL)int ssl;SSL* ssl_struc;
#endif
} pending_writes;

Socket_continueWrites() 函数负责处理这些挂起的写操作。

5. MQTT 协议实现

5.1 连接管理

MQTT 连接过程由 MQTTClient_connect() 函数发起,主要流程包括:

  1. 参数验证和初始化

  2. 网络连接建立(TCP/SSL/WebSocket)

  3. 发送 CONNECT 报文

  4. 等待 CONNACK 响应

  5. 连接状态处理

5.1.1 网络连接建立

MQTTProtocol_connect() 函数负责建立网络连接:

int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int MQTTVersion,MQTTProperties* connectProperties, MQTTProperties* willProperties, int websocket)
{int rc = SOCKET_ERROR;char* p0;int port;char* addr;size_t addr_len;
#if defined(OPENSSL)int ssl = 0;
#endifFUNC_ENTRY;aClient->good = 1;// 解析地址和端口if ((p0 = strrchr(ip_address, ':')) == NULL){free(aClient->net.http_proxy);free(aClient->net.http_proxy_auth);goto exit;}port = atoi(p0+1);addr_len = p0 - ip_address;addr = malloc(addr_len + 1);memcpy(addr, ip_address, addr_len);addr[addr_len] = '\0';// 根据协议类型处理连接if (strncmp(URI_TCP, ip_address, strlen(URI_TCP)) == 0){addr_len = strlen(ip_address) - strlen(URI_TCP);addr = malloc(addr_len + 1);memcpy(addr, ip_address + strlen(URI_TCP), addr_len);addr[addr_len] = '\0';}
#if defined(OPENSSL)else if (strncmp(URI_SSL, ip_address, strlen(URI_SSL)) == 0){ssl = 1;addr_len = strlen(ip_address) - strlen(URI_SSL);addr = malloc(addr_len + 1);memcpy(addr, ip_address + strlen(URI_SSL), addr_len);addr[addr_len] = '\0';}
#endif// 建立实际的网络连接
#if defined(OPENSSL)if (ssl){if (SSLSocket_setSocketForSSL(&aClient->net, aClient->sslopts, addr, addr_len) == 1){
#if defined(__GNUC__) && defined(__linux__)rc = Socket_new(addr, addr_len, port, &(aClient->net.socket), aClient->connect_timeout);
#elserc = Socket_new(addr, addr_len, port, &(aClient->net.socket));
#endifif (rc == EINPROGRESS || rc == EWOULDBLOCK){aClient->connect_state = SSL_IN_PROGRESS;rc = SSLSocket_connect(aClient->net.ssl, aClient->net.socket, addr,aClient->sslopts->verify, aClient->sslopts->ssl_error_cb, aClient->sslopts->ssl_error_context);if (rc == TCPSOCKET_INTERRUPTED)aClient->connect_state = SSL_IN_PROGRESS;}else if (rc == 0){rc = SSLSocket_connect(aClient->net.ssl, aClient->net.socket, addr,aClient->sslopts->verify, aClient->sslopts->ssl_error_cb, aClient->sslopts->ssl_error_context);if (rc == TCPSOCKET_INTERRUPTED)aClient->connect_state = SSL_IN_PROGRESS;}}}else
#endif{
#if defined(__GNUC__) && defined(__linux__)rc = Socket_new(addr, addr_len, port, &(aClient->net.socket), aClient->connect_timeout);
#elserc = Socket_new(addr, addr_len, port, &(aClient->net.socket));
#endifif (rc == EINPROGRESS || rc == EWOULDBLOCK)aClient->connect_state = TCP_IN_PROGRESS;}// WebSocket 处理if (websocket){aClient->connect_state = WEBSOCKET_IN_PROGRESS;rc = WebSocket_connect(&aClient->net, ssl, ip_address);}free(addr);
exit:FUNC_EXIT_RC(rc);return rc;
}
5.1.2 CONNECT 报文发送

连接建立后,客户端发送 CONNECT 报文:

m->c->connect_state = WAIT_FOR_CONNACK;
if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
{rc = SOCKET_ERROR;goto exit;
}

5.2 消息发布

消息发布由 MQTTClient_publish5() 函数处理:

MQTTResponse MQTTClient_publish5(MQTTClient handle, const char* topicName, int payloadlen, const void* payload,int qos, int retained, MQTTProperties* properties, MQTTClient_deliveryToken* deliveryToken)
{int rc = MQTTCLIENT_SUCCESS;MQTTClients* m = handle;Messages* msg = NULL;Publish* p = NULL;int blocked = 0;int msgid = 0;MQTTResponse resp = MQTTResponse_initializer;FUNC_ENTRY;Paho_thread_lock_mutex(mqttclient_mutex);if (m == NULL || m->c == NULL)rc = MQTTCLIENT_FAILURE;else if (m->c->connected == 0)rc = MQTTCLIENT_DISCONNECTED;else if (!UTF8_validateString(topicName))rc = MQTTCLIENT_BAD_UTF8_STRING;if (rc != MQTTCLIENT_SUCCESS)goto exit;// 如果出站队列已满,则阻塞等待while (m->c->outboundMsgs->count >= m->c->maxInflightMessages ||Socket_noPendingWrites(m->c->net.socket) == 0){if (blocked == 0){blocked = 1;Log(TRACE_MIN, -1, "Blocking publish on queue full for client %s", m->c->clientID);}Paho_thread_unlock_mutex(mqttclient_mutex);MQTTClient_yield();Paho_thread_lock_mutex(mqttclient_mutex);if (m->c->connected == 0){rc = MQTTCLIENT_FAILURE;goto exit;}}if (blocked == 1)Log(TRACE_MIN, -1, "Resuming publish now queue not full for client %s", m->c->clientID);if (qos > 0 && (msgid = MQTTProtocol_assignMsgId(m->c)) == 0){	// 这里应该永远不会发生,因为我们已经等待队列空间了rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;goto exit;}// 创建Publish对象if ((p = malloc(sizeof(Publish))) == NULL){rc = PAHO_MEMORY_ERROR;goto exit_and_free;}memset(p->mask, '\0', sizeof(p->mask));p->payload = NULL;p->payloadlen = payloadlen;if (payloadlen > 0){if ((p->payload = malloc(payloadlen)) == NULL){rc = PAHO_MEMORY_ERROR;goto exit_and_free;}memcpy(p->payload, payload, payloadlen);}if ((p->topic = MQTTStrdup(topicName)) == NULL){rc = PAHO_MEMORY_ERROR;goto exit_and_free;}p->msgId = msgid;p->MQTTVersion = m->c->MQTTVersion;if (m->c->MQTTVersion >= MQTTVERSION_5){if (properties)p->properties = *properties;else{MQTTProperties props = MQTTProperties_initializer;p->properties = props;}}// 开始发布消息rc = MQTTProtocol_startPublish(m->c, p, qos, retained, &msg);// 如果数据包只是部分写入socket,则等待完成if (rc == TCPSOCKET_INTERRUPTED){while (m->c->connected == 1){pending_writes* writing = NULL;Paho_thread_lock_mutex(socket_mutex);writing = SocketBuffer_getWrite(m->c->net.socket);Paho_thread_unlock_mutex(socket_mutex);if (writing == NULL)break;Paho_thread_unlock_mutex(mqttclient_mutex);MQTTClient_yield();Paho_thread_lock_mutex(mqttclient_mutex);}rc = (qos > 0 || m->c->connected == 1) ? MQTTCLIENT_SUCCESS : MQTTCLIENT_FAILURE;}if (deliveryToken && qos > 0)*deliveryToken = msg->msgid;exit_and_free:if (p){if (p->topic)free(p->topic);if (p->payload)free(p->payload);free(p);}if (rc == SOCKET_ERROR){MQTTClient_disconnect_internal(handle, 0);// 对于qos > 0,返回成功,因为发送将自动重试rc = (qos > 0) ? MQTTCLIENT_SUCCESS : MQTTCLIENT_FAILURE;}exit:Paho_thread_unlock_mutex(mqttclient_mutex);resp.reasonCode = rc;FUNC_EXIT_RC(resp.reasonCode);return resp;
}

5.3 QoS 处理机制

5.3.1 QoS 1 消息处理

对于 QoS 1 消息,发送端需要等待 PUBACK 确认:

  1. 发送 PUBLISH 报文

  2. 将消息添加到 outboundMsgs 列表中进行跟踪

  3. 接收 PUBACK 后从列表中移除消息

int MQTTProtocol_handlePubacks(void* pack, SOCKET sock, Publications** pubToRemove)
{Puback* puback = (Puback*)pack;Clients* client = NULL;int rc = TCPSOCKET_COMPLETE;FUNC_ENTRY;client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);Log(LOG_PROTOCOL, 14, NULL, sock, client->clientID, puback->msgId);// 在出站消息记录中按消息ID查找消息if (ListFindItem(client->outboundMsgs, &(puback->msgId), messageIDCompare) == NULL)Log(TRACE_MIN, 3, NULL, "PUBACK", client->clientID, puback->msgId);else{Messages* m = (Messages*)(client->outboundMsgs->current->content);if (m->qos != 1)Log(TRACE_MIN, 4, NULL, "PUBACK", client->clientID, puback->msgId, m->qos);else{Log(TRACE_MIN, 6, NULL, "PUBACK", client->clientID, puback->msgId);#if !defined(NO_PERSISTENCE)rc = MQTTPersistence_remove(client,(m->MQTTVersion >= MQTTVERSION_5) ? PERSISTENCE_V5_PUBLISH_SENT : PERSISTENCE_PUBLISH_SENT,m->qos, puback->msgId);#endifif (pubToRemove != NULL)*pubToRemove = m->publish;elseMQTTProtocol_removePublication(m->publish);if (m->MQTTVersion >= MQTTVERSION_5)MQTTProperties_free(&m->properties);ListRemove(client->outboundMsgs, m);}}if (puback->MQTTVersion >= MQTTVERSION_5)MQTTProperties_free(&puback->properties);free(pack);FUNC_EXIT_RC(rc);return rc;
}
5.3.2 QoS 2 消息处理

对于 QoS 2 消息,采用四步握手过程:

  1. 发送 PUBLISH 报文

  2. 接收 PUBREC 确认

  3. 发送 PUBREL 报文

  4. 接收 PUBCOMP 确认

int MQTTProtocol_handlePubrecs(void* pack, SOCKET sock, Publications** pubToRemove)
{Pubrec* pubrec = (Pubrec*)pack;Clients* client = NULL;int rc = TCPSOCKET_COMPLETE;int send_pubrel = 1; // 是否发送PUBREL的布尔值FUNC_ENTRY;client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);Log(LOG_PROTOCOL, 15, NULL, sock, client->clientID, pubrec->msgId);// 在出站消息记录中按消息ID查找消息client->outboundMsgs->current = NULL;if (ListFindItem(client->outboundMsgs, &(pubrec->msgId), messageIDCompare) == NULL){if (pubrec->header.bits.dup == 0)Log(TRACE_MIN, 3, NULL, "PUBREC", client->clientID, pubrec->msgId);}else{Messages* m = (Messages*)(client->outboundMsgs->current->content);if (m->qos != 2){if (pubrec->header.bits.dup == 0)Log(TRACE_MIN, 4, NULL, "PUBREC", client->clientID, pubrec->msgId, m->qos);}else if (m->nextMessageType != PUBREC){if (pubrec->header.bits.dup == 0)Log(TRACE_MIN, 5, NULL, "PUBREC", client->clientID, pubrec->msgId);}else{if (pubrec->MQTTVersion >= MQTTVERSION_5 && pubrec->rc >= MQTTREASONCODE_UNSPECIFIED_ERROR){Log(TRACE_MIN, -1, "Pubrec error %d received for client %s msgid %d, not sending PUBREL",pubrec->rc, client->clientID, pubrec->msgId);#if !defined(NO_PERSISTENCE)rc = MQTTPersistence_remove(client,(pubrec->MQTTVersion >= MQTTVERSION_5) ? PERSISTENCE_V5_PUBLISH_SENT : PERSISTENCE_PUBLISH_SENT,m->qos, pubrec->msgId);#endifif (pubToRemove != NULL)*pubToRemove = m->publish;elseMQTTProtocol_removePublication(m->publish);if (m->MQTTVersion >= MQTTVERSION_5)MQTTProperties_free(&m->properties);ListRemove(client->outboundMsgs, m);}else{m->nextMessageType = PUBREL;#if !defined(NO_PERSISTENCE)rc = MQTTPersistence_update(client,(m->MQTTVersion >= MQTTVERSION_5) ? PERSISTENCE_V5_PUBLISH_SENT : PERSISTENCE_PUBLISH_SENT,m->qos, pubrec->msgId);#endifif (!Socket_noPendingWrites(sock))rc = MQTTProtocol_queueAck(client, PUBREL, pubrec->msgId);elserc = MQTTPacket_send_pubrel(m->MQTTVersion, m->msgid, 0, &client->net, client->clientID);}}}if (pubrec->MQTTVersion >= MQTTVERSION_5)MQTTProperties_free(&pubrec->properties);free(pack);FUNC_EXIT_RC(rc);return rc;
}

5.4 重试机制

当 QoS 1 或 QoS 2 消息未收到确认时,客户端会进行重试:

void MQTTProtocol_retries(START_TIME_TYPE now, Clients* client, int regardless)
{ListElement* outcurrent = NULL;FUNC_ENTRY;if (client->connected == 0)goto exit;while (client && ListNextElement(client->outboundMsgs, &outcurrent) &&client->connected && client->good &&        /* client is connected and has no errors */Socket_noPendingWrites(client->net.socket)) /* there aren't any previous packets still stacked up on the socket */{Messages* m = (Messages*)(outcurrent->content);if (regardless || MQTTTime_difftime(now, m->lastTouch) > (DIFF_TIME_TYPE)(max(client->retryInterval, 10) * 1000)){if (m->qos == 1 || (m->qos == 2 && m->nextMessageType == PUBREC)){Publish publish;int rc;Log(TRACE_MIN, 7, NULL, "PUBLISH", client->clientID, client->net.socket, m->msgid);publish.msgId = m->msgid;publish.topic = m->publish->topic;publish.payload = m->publish->payload;publish.payloadlen = m->publish->payloadlen;publish.properties = m->properties;publish.MQTTVersion = m->MQTTVersion;memcpy(publish.mask, m->publish->mask, sizeof(publish.mask));rc = MQTTPacket_send_publish(&publish, 1, m->qos, m->retain, &client->net, client->clientID);memcpy(m->publish->mask, publish.mask, sizeof(m->publish->mask)); /* store websocket mask used in send */if (rc == SOCKET_ERROR){client->good = 0;Log(TRACE_PROTOCOL, 29, NULL, client->clientID, client->net.socket,Socket_getpeer(client->net.socket));MQTTProtocol_closeSession(client, 1);client = NULL;}else{if (m->qos == 0 && rc == TCPSOCKET_INTERRUPTED)MQTTProtocol_storeQoS0(client, &publish);m->lastTouch = MQTTTime_now();}}else if (m->qos == 2 && m->nextMessageType == PUBREL){Log(TRACE_MIN, 7, NULL, "PUBREL", client->clientID, client->net.socket, m->msgid);if (MQTTPacket_send_pubrel(client->MQTTVersion, m->msgid, 0, &client->net, client->clientID) == SOCKET_ERROR){client->good = 0;Log(TRACE_PROTOCOL, 29, NULL, client->clientID, client->net.socket,Socket_getpeer(client->net.socket));MQTTProtocol_closeSession(client, 1);client = NULL;}elsem->lastTouch = MQTTTime_now();}if (client){ListNextElement(client->outboundMsgs, &outcurrent);ListRemove(client->outboundMsgs, m);if (m->MQTTVersion >= MQTTVERSION_5)MQTTProperties_free(&m->properties);MQTTProtocol_freeMessage(m);}}}
exit:FUNC_EXIT;
}

6. 线程模型

Paho MQTT C 客户端采用多线程模型处理网络 I/O 和用户操作:

6.1 主线程

主线程负责处理用户 API 调用,如连接、发布、订阅等操作。

6.2 网络线程

网络线程负责处理网络事件,通过 MQTTClient_yield() 和 MQTTClient_receive() 函数实现:

int MQTTClient_yield(void)
{int rc = 0;START_TIME_TYPE start = MQTTTime_start_clock();unsigned long elapsed = 0L;int wait_time = 100;FUNC_ENTRY;if (yieldThread_state != STOPPED){rc = MQTTCLIENT_FAILURE;goto exit;}elapsed = MQTTTime_elapsed(start);while (elapsed < wait_time){int rc1 = 0;unsigned long least = wait_time - elapsed;if (MQTTClient_receive(NULL, least, &rc1) == NULL){if (rc1 == SOCKET_ERROR)break;}elapsed = MQTTTime_elapsed(start);}
exit:FUNC_EXIT_RC(rc);return rc;
}

7. 持久化存储

为了确保 QoS 1 和 QoS 2 消息在客户端重启后仍能正确传递,Paho MQTT C 客户端实现了持久化存储机制:

int MQTTPersistence_putPacket(int type, void* data, int len, char** key, int* keyLen)
{int rc = 0;FUNC_ENTRY;if (state.persistence || type == PERSISTENCE_COMMAND){if (type == PERSISTENCE_COMMAND){rc = MQTTPersistence_sendCommand(data, len, key, keyLen);}else{char* buffer = NULL;int buflen = 0;if ((buffer = malloc(len)) == NULL){rc = PAHO_MEMORY_ERROR;goto exit;}memcpy(buffer, data, len);buflen = len;if (state.persistence->popen){if ((rc = state.persistence->pclose(state.persistence)) != 0)Log(LOG_ERROR, -1, "Error closing persistence store %d", rc);}if ((rc = state.persistence->popen(state.persistence, state.clientID, state.serverURI)) != 0){Log(LOG_ERROR, -1, "Error opening persistence store %d", rc);goto exit;}if ((rc = state.persistence->pput(state.persistence, *key, *keyLen, buffer, buflen)) != 0)Log(LOG_ERROR, -1, "Error putting packet into persistence store %d", rc);}}
exit:FUNC_EXIT_RC(rc);return rc;
}

8. 总结

Paho MQTT C 客户端库是一个功能完整、设计精良的 MQTT 实现,具有以下特点:

  1. 协议完整实现:支持 MQTT 3.1.1 和 MQTT 5.0 协议的所有特性

  2. 多平台支持:可在多种操作系统和硬件平台上运行

  3. 高性能设计:采用 I/O 多路复用和缓冲区管理提高性能

  4. 可靠性保证:通过 QoS 机制和重试机制确保消息可靠传递

  5. 灵活配置:支持多种连接方式(TCP、SSL、WebSocket)和配置选项

  6. 线程安全:通过互斥锁保证多线程环境下的安全性

  7. 可扩展性:模块化设计便于扩展和维护

通过深入理解其源码实现,开发者可以更好地使用该库,也可以基于其设计思想开发自己的 MQTT 应用。

http://www.dtcms.com/a/568795.html

相关文章:

  • 零基础学AI大模型之RAG系统链路构建:文档切割转换全解析
  • Vue 核心语法详解:模板语法中的绑定表达式与过滤器(附 Vue3 替代方案)
  • CentOS7.6 部署 k3s 单机版
  • 【算法训练营 · 专项练习篇】Stream流与函数式编程
  • 泰州企业做网站百度地图怎么导航环线
  • int8_to_float(output_tensor->data.int8, output_float, load_class_num);
  • 使用Nmap扫描某个服务器所有开放端口
  • 如何看网站是用什么程序做的如何把qq音乐导入到wordpress
  • SpringCloud网关实战:路由与鉴权全解析
  • 基于ResNet50和PyTorch的猫狗图像分类系统设计与实现
  • 自回归模型例题(AR)与ACF/PACF图绘制
  • ESP32-WROOM-32E LED点灯系列
  • 《红色脉络:一部PLMN在中国的演进史诗 (1G-6G)》 第15篇 | 结语:无尽的前沿——PLMN的未来与中国的全球角色
  • 付网站开发费计入什么科目seo外包杭州
  • 外贸网站域名被封免费网络游戏大全
  • PySide6 Win10记事本从零到一——第七章 格式菜单界面与功能实现
  • PDF文件损坏打不开怎么修复?2025年最新修复工具测评与对比
  • 谈谈MYSQL索引失效场景
  • Qwen-Image-Edit本地到底如何部署使用?怎么还有comfyui
  • 佳能LBP6018L打印浅淡问题的尝试性解决方法
  • 微算法科技(NASDAQ MLGO):以隐私计算区块链筑牢多方安全计算(MPC)安全防线
  • SpringCache :让缓存开发更高效
  • 电路分析 | Phasor Analysis(篇 1)
  • 网站备案取消长春网站建设模板样式
  • get_ccopt系列命令介绍(二)
  • 成都工业学院文献检索在哪个网站做破解wordpress密码
  • 做网站用什么系统好网站登录验证码是怎么做的
  • SQL语法基础教程
  • 算法25.0
  • 无穿戴动捕技术:解锁动作捕捉新维度,拓展多元应用边界