Linux平台搭建MQTT测试环境
Paho MQTT
Paho MQTT 是 Eclipse 基金会下的一个开源项目,旨在为多种编程语言提供 MQTT 协议的客户端实现。MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅(Pub/Sub)消息传输协议,专为物联网(IoT)等低带宽、高延迟或不稳定网络环境设计。
关键信息:
-
定义:
- Paho 是 Eclipse IoT 技术栈的一部分,提供跨平台的 MQTT 客户端库,支持设备与服务器之间的双向通信。
-
paho.mqtt
通常指特定语言的客户端库,例如paho-mqtt
是 Python 的客户端库名称。
-
支持的语言:
- Python:
paho-mqtt
(通过pip install paho-mqtt
安装) - Java:
Eclipse Paho Java Client
- C/C++:
Eclipse Paho C/C++
- JavaScript:
Paho MQTT over WebSocket(适用于浏览器)
- 其他语言(如 Go、.NET 等)也有社区支持的实现。
- Python:
-
核心功能:
- 发布消息到 MQTT Broker(如 Mosquitto、EMQX、AWS IoT Core)。
- 订阅主题并接收消息。
- 支持 MQTT 3.1.1 和 5.0 协议版本。
- 提供 QoS(服务质量)级别(0/1/2),确保消息可靠性。
- 支持 TLS/SSL 加密、持久会话、遗嘱消息(Last Will)等特性。
-
典型应用场景:
- 物联网设备与云平台通信(如传感器上报数据、远程控制设备)。
- 移动端与服务器实时消息推送。
- 跨系统异步通信(如微服务架构中的事件驱动)。
测试Demo
1. 安装MQTT Broker(Mosquitto)
# 安装Mosquitto Broker及客户端工具
sudo apt-get update
sudo apt-get install -y mosquitto mosquitto-clients
# 启动Mosquitto服务(部分系统需手动启动)
sudo systemctl start mosquitto
sudo systemctl enable mosquitto
# 验证Broker是否运行
sudo systemctl status mosquitto
2. 安装C语言MQTT库(推荐Paho MQTT C库)
# 安装依赖项
sudo apt-get install build-essential cmake openssl libssl-dev
# 下载并编译Paho MQTT C库
git clone https://github.com/eclipse/paho.mqtt.c.git
cd paho.mqtt.c
mkdir build
cd build
cmake ..
make
sudo make install
sudo ldconfig # 更新动态库缓存
3. 编写MQTT测试程序
发布者(publisher.c)
------------------------------------------------------------------
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"
#define ADDRESS "tcp://localhost:1883"
#define CLIENTID "Publisher"
#define TOPIC "test/topic"
#define QOS 1
#define TIMEOUT 10000L
int main() {
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTClient_deliveryToken token;
int rc;
// 创建客户端实例
rc = MQTTClient_create(&client, ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
if (rc != MQTTCLIENT_SUCCESS) {
fprintf(stderr, "Failed to create client: %d\n", rc);
exit(EXIT_FAILURE);
}
// 连接Broker
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
rc = MQTTClient_connect(client, &conn_opts);
if (rc != MQTTCLIENT_SUCCESS) {
fprintf(stderr, "Failed to connect: %d\n", rc);
MQTTClient_destroy(&client);
exit(EXIT_FAILURE);
}
// 发送消息
pubmsg.payload = (void*)"Hello from C MQTT!";
pubmsg.payloadlen = strlen(pubmsg.payload);
pubmsg.qos = QOS;
pubmsg.retained = 0;
rc = MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
if (rc != MQTTCLIENT_SUCCESS) {
fprintf(stderr, "Failed to publish: %d\n", rc);
exit(EXIT_FAILURE);
}
printf("Message published. Token: %d\n", token);
// 断开连接并清理
MQTTClient_disconnect(client, 10000);
MQTTClient_destroy(&client);
return 0;
}
订阅者(subscriber.c)
----------------------------------------------------------------------------
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"
#define ADDRESS "tcp://localhost:1883"
#define CLIENTID "Subscriber"
#define TOPIC "test/topic"
#define QOS 1
#define TIMEOUT 10000L
volatile MQTTClient_deliveryToken deliveredtoken;
void delivered(void *context, MQTTClient_deliveryToken dt) {
deliveredtoken = dt;
}
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) {
printf("Received message: %.*s\n", message->payloadlen, (char*)message->payload);
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
void connlost(void *context, char *cause) {
fprintf(stderr, "Connection lost. Cause: %s\n", cause);
}
int main() {
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
int rc;
rc = MQTTClient_create(&client, ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
if (rc != MQTTCLIENT_SUCCESS) {
fprintf(stderr, "Failed to create client: %d\n", rc);
exit(EXIT_FAILURE);
}
// 设置回调函数
MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
rc = MQTTClient_connect(client, &conn_opts);
if (rc != MQTTCLIENT_SUCCESS) {
fprintf(stderr, "Failed to connect: %d\n", rc);
MQTTClient_destroy(&client);
exit(EXIT_FAILURE);
}
// 订阅主题
rc = MQTTClient_subscribe(client, TOPIC, QOS);
if (rc != MQTTCLIENT_SUCCESS) {
fprintf(stderr, "Failed to subscribe: %d\n", rc);
exit(EXIT_FAILURE);
}
printf("Subscribed to topic: %s\n", TOPIC);
printf("Press Q<Enter> to quit\n");
while(getchar() != 'Q');
// 清理
MQTTClient_unsubscribe(client, TOPIC);
MQTTClient_disconnect(client, TIMEOUT);
MQTTClient_destroy(&client);
return 0;
}
4. 编译程序
# 编译发布者
gcc publisher.c -o publisher -lpaho-mqtt3c
# 编译订阅者
gcc subscriber.c -o subscriber -lpaho-mqtt3c