当前位置: 首页 > news >正文

Linux平台搭建MQTT测试环境

Paho MQTT

Paho MQTT‌ 是 Eclipse 基金会下的一个开源项目,旨在为多种编程语言提供 ‌MQTT 协议‌的客户端实现。MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅(Pub/Sub)消息传输协议,专为物联网(IoT)等低带宽、高延迟或不稳定网络环境设计。

关键信息:

  1. 定义‌:

    • Paho‌ 是 Eclipse IoT 技术栈的一部分,提供跨平台的 MQTT 客户端库,支持设备与服务器之间的双向通信。
    • paho.mqtt‌ 通常指特定语言的客户端库,例如 paho-mqtt 是 Python 的客户端库名称。
  2. 支持的语言‌:

    • Python‌: paho-mqtt(通过 pip install paho-mqtt 安装)
    • Java‌: Eclipse Paho Java Client
    • C/C++‌: Eclipse Paho C/C++
    • JavaScript‌: Paho MQTT over WebSocket(适用于浏览器)
    • 其他语言(如 Go、.NET 等)也有社区支持的实现。
  3. 核心功能‌:

    • 发布消息到 MQTT Broker(如 Mosquitto、EMQX、AWS IoT Core)。
    • 订阅主题并接收消息。
    • 支持 MQTT 3.1.1 和 5.0 协议版本。
    • 提供 QoS(服务质量)级别(0/1/2),确保消息可靠性。
    • 支持 TLS/SSL 加密、持久会话、遗嘱消息(Last Will)等特性。
  4. 典型应用场景‌:

    • 物联网设备与云平台通信(如传感器上报数据、远程控制设备)。
    • 移动端与服务器实时消息推送。
    • 跨系统异步通信(如微服务架构中的事件驱动)。

测试Demo


1. 安装MQTT Broker(Mosquitto)

# 安装Mosquitto Broker及客户端工具
sudo apt-get update
sudo apt-get install -y mosquitto mosquitto-clients

# 启动Mosquitto服务(部分系统需手动启动)
sudo systemctl start mosquitto
sudo systemctl enable mosquitto

# 验证Broker是否运行
sudo systemctl status mosquitto

2. 安装C语言MQTT库(推荐Paho MQTT C库)

# 安装依赖项
sudo apt-get install build-essential cmake openssl libssl-dev

# 下载并编译Paho MQTT C库
git clone https://github.com/eclipse/paho.mqtt.c.git
cd paho.mqtt.c
mkdir build
cd build
cmake ..
make
sudo make install
sudo ldconfig  # 更新动态库缓存

3. 编写MQTT测试程序

‌发布者(publisher.c)
------------------------------------------------------------------
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"

#define ADDRESS     "tcp://localhost:1883"
#define CLIENTID    "Publisher"
#define TOPIC       "test/topic"
#define QOS         1
#define TIMEOUT     10000L

int main() {
    MQTTClient client;
    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    MQTTClient_message pubmsg = MQTTClient_message_initializer;
    MQTTClient_deliveryToken token;

    int rc;
    // 创建客户端实例
    rc = MQTTClient_create(&client, ADDRESS, CLIENTID, 
        MQTTCLIENT_PERSISTENCE_NONE, NULL);
    if (rc != MQTTCLIENT_SUCCESS) {
        fprintf(stderr, "Failed to create client: %d\n", rc);
        exit(EXIT_FAILURE);
    }

    // 连接Broker
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    rc = MQTTClient_connect(client, &conn_opts);
    if (rc != MQTTCLIENT_SUCCESS) {
        fprintf(stderr, "Failed to connect: %d\n", rc);
        MQTTClient_destroy(&client);
        exit(EXIT_FAILURE);
    }

    // 发送消息
    pubmsg.payload = (void*)"Hello from C MQTT!";
    pubmsg.payloadlen = strlen(pubmsg.payload);
    pubmsg.qos = QOS;
    pubmsg.retained = 0;
    rc = MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
    if (rc != MQTTCLIENT_SUCCESS) {
        fprintf(stderr, "Failed to publish: %d\n", rc);
        exit(EXIT_FAILURE);
    }

    printf("Message published. Token: %d\n", token);

    // 断开连接并清理
    MQTTClient_disconnect(client, 10000);
    MQTTClient_destroy(&client);
    return 0;
}
‌订阅者(subscriber.c)
----------------------------------------------------------------------------
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"

#define ADDRESS     "tcp://localhost:1883"
#define CLIENTID    "Subscriber"
#define TOPIC       "test/topic"
#define QOS         1
#define TIMEOUT     10000L

volatile MQTTClient_deliveryToken deliveredtoken;

void delivered(void *context, MQTTClient_deliveryToken dt) {
    deliveredtoken = dt;
}

int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) {
    printf("Received message: %.*s\n", message->payloadlen, (char*)message->payload);
    MQTTClient_freeMessage(&message);
    MQTTClient_free(topicName);
    return 1;
}

void connlost(void *context, char *cause) {
    fprintf(stderr, "Connection lost. Cause: %s\n", cause);
}

int main() {
    MQTTClient client;
    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    int rc;

    rc = MQTTClient_create(&client, ADDRESS, CLIENTID, 
        MQTTCLIENT_PERSISTENCE_NONE, NULL);
    if (rc != MQTTCLIENT_SUCCESS) {
        fprintf(stderr, "Failed to create client: %d\n", rc);
        exit(EXIT_FAILURE);
    }

    // 设置回调函数
    MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);

    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    rc = MQTTClient_connect(client, &conn_opts);
    if (rc != MQTTCLIENT_SUCCESS) {
        fprintf(stderr, "Failed to connect: %d\n", rc);
        MQTTClient_destroy(&client);
        exit(EXIT_FAILURE);
    }

    // 订阅主题
    rc = MQTTClient_subscribe(client, TOPIC, QOS);
    if (rc != MQTTCLIENT_SUCCESS) {
        fprintf(stderr, "Failed to subscribe: %d\n", rc);
        exit(EXIT_FAILURE);
    }

    printf("Subscribed to topic: %s\n", TOPIC);
    printf("Press Q<Enter> to quit\n");
    while(getchar() != 'Q');

    // 清理
    MQTTClient_unsubscribe(client, TOPIC);
    MQTTClient_disconnect(client, TIMEOUT);
    MQTTClient_destroy(&client);
    return 0;
}

4. 编译程序

# 编译发布者
gcc publisher.c -o publisher -lpaho-mqtt3c

# 编译订阅者
gcc subscriber.c -o subscriber -lpaho-mqtt3c

5. 运行测试

相关文章:

  • 使用scoop一键下载jdk和实现版本切换
  • Python数据分析-NumPy模块-矩阵的运算
  • Vue3+Ts封装ToolTip组件(2.0版本)
  • Vue.js 中 v-if 的使用及其原理
  • Nginx漏洞复现
  • andorid 查找没有使用的资源
  • Navicat和PLSQL在oracle 使用语句报ORA-00911: 无效字符
  • Mysql专题篇章
  • SQL Server 数据库邮件配置失败:SMTP 连接与权限问题
  • zookeeper平滑扩缩容
  • 蓝桥杯 C/C++ 组历届真题合集速刷(二)
  • 数字IC后端项目典型问题之后端实战项目问题记录
  • Linux驱动开发:SPI驱动开发原理
  • sql-labs靶场 less-1
  • fabric.js基础使用
  • CrystalDiskInfo电脑硬盘监控工具 v9.6.0中文绿色便携版
  • 平台算法暗战:ebay欧洲站搜索词长度同比缩短2.3字符的应对策略
  • Java 泛型的逆变与协变:深入理解类型安全与灵活性
  • Windows系统中Miniforge安装后的环境变量配置与conda命令不可用解决方案
  • Redis主从复制:告别单身Redis!
  • SIFF动画单元公布首批片单:《燃比娃》《凡尔赛玫瑰》等
  • 沙县小吃中东首店在沙特首都利雅得开业,首天营业额超5万元
  • 广东省人大教科文卫委原主任委员梁万里被开除党籍:退休后受贿仍不知止
  • 均价19.5万元/平米!上海徐汇滨江地王项目“日光”,销售额近70亿元
  • 协会:坚决支持司法机关依法打击涉象棋行业的违法行为
  • 2024年上市公司合计实现营业收入71.98万亿元