当前位置: 首页 > news >正文

【C++实战(76)】从0到1:C++在MQTT物联网开发中的实战之旅

目录

  • 一、MQTT 协议的基础概念
    • 1.1 MQTT 的特点
    • 1.2 MQTT 的核心组件
    • 1.3 MQTT 的常见应用场景
  • 二、MQTT 客户端库(Paho MQTT C++)实战
    • 2.1 Paho MQTT C++ 库的安装与配置(Windows/Linux)
    • 2.2 MQTT 客户端的创建与连接(连接 Broker、设置连接参数)
    • 2.3 主题订阅与消息发布(subscribe、publish、消息回调处理)
  • 三、物联网设备数据交互
    • 3.1 设备端数据采集(模拟传感器数据、定时发布)
    • 3.2 服务端数据接收与处理(订阅主题、数据存储到数据库)
    • 3.3 设备远程控制(服务端发布控制指令、设备端接收并执行)
  • 四、实战项目:物联网温湿度监控系统
    • 4.1 项目需求(模拟温湿度传感器、MQTT 发布数据、服务端接收并展示)
    • 4.2 Paho MQTT C++ 实现客户端与服务端交互
    • 4.3 数据可视化(结合 Qt 展示实时数据)与异常报警


一、MQTT 协议的基础概念

1.1 MQTT 的特点

MQTT(Message Queuing Telemetry Transport)即消息队列遥测传输协议,是一种基于发布 / 订阅模式的轻量级通信协议,专为低带宽、高延迟或不稳定网络环境设计,尤其适用于物联网(IoT)场景。

MQTT 最突出的特点就是轻量级 ,其协议头开销小,代码实现也相对简单。这对于资源受限的物联网设备来说至关重要,因为它们通常内存较小、处理能力有限,小的协议头开销意味着可以更高效地利用设备资源,降低对设备硬件的要求,从而降低设备成本。在一个简单的物联网设备向服务器发送数据的场景中,使用 MQTT 协议传输一条包含少量有效载荷的数据消息,其协议头可能仅占几个字节,而一些较为复杂的协议,其协议头可能会占用数十甚至上百字节。

MQTT 采用发布 / 订阅模式 ,该模式下,发布者(如传感器设备)和订阅者(如数据处理服务器、用户终端等)之间不需要直接建立连接,也不需要知道对方的具体位置和状态。传感器只需要将数据发布到主题(Topic)上,而订阅者从自己订阅的主题中获取数据,这种解耦的方式大大提高了系统的灵活性和可扩展性。并且一个发布者可以将消息发布到多个主题,而多个订阅者也可以订阅同一个主题。这种多对多的通信模式非常适合物联网场景。

由于具备以上这些特点,MQTT 非常适用于物联网领域 。物联网中的设备种类繁多,资源和网络条件各异,MQTT 的轻量级特性使其能够在资源受限的设备上运行,发布 / 订阅模式则满足了设备之间复杂的通信需求。无论是智能家居中的各种传感器和智能家电,还是工业物联网中的大量工业设备,都可以借助 MQTT 实现高效的数据传输和通信。

1.2 MQTT 的核心组件

  • 客户端(Client):任何运行 MQTT 客户端库的应用或设备都是 MQTT 客户端。客户端可以扮演发布者(Publisher)和订阅者(Subscriber)的角色。发布者负责向特定主题发送消息,订阅者则订阅特定主题以接收消息,并且一个客户端可以同时是发布者和订阅者。在智能家居系统中,温度传感器作为发布者,定期向 “home/livingroom/temperature” 主题发布温度数据;手机 APP 作为订阅者,订阅该主题以显示实时温度;空调控制器也作为订阅者,根据温度数据自动调节工作状态。
  • Broker:Broker 是实现了 MQTT 通讯协议的代理软件,是 MQTT 协议的核心组件,相当于消息的 “中转站” 或 “邮局”。其主要职责包括处理客户端的连接、断开请求,维护会话状态;接收发布者的消息,根据主题将消息转发给对应的订阅者;以及为离线客户端暂存消息(当启用持久会话时),同时实施认证和权限控制策略 。常见的 MQTT 代理软件包括 EMQX、Mosquitto、HiveMQ 等,其中 EMQX 以高性能和企业级特性著称,是大规模物联网应用的理想选择。
  • 主题 Topic:Topic 是存在于 MQTT Broker 中的一个普通字符串,是 MQTT 中消息的分类方式,采用层次化的结构设计,非常类似文件系统的路径 ,使用 “/” 分隔层级,每一级代表一个分类维度 。主题无需预先创建,在发布时即创建,并且大小写敏感 ,如 “Home” 和 “home” 是两个不同的主题。同时,主题还支持通配符,“+” 为单层通配符,匹配一个层级,如 home/+/temperature 匹配任何房间的温度;“#” 为多层通配符,匹配多个层级,如 home/# 匹配家中所有数据。
  • QoS 等级:MQTT 支持三种服务质量(QoS)等级,以确保消息传输的可靠性 。QoS 0 为至多一次传递,即最多发送一次,不保证到达,适合环境监测等可容忍丢失数据的场景;QoS 1 为至少一次传递,保证到达,但可能重复,适合设备控制指令等场景;QoS 2 为恰好一次传递,保证到达且不重复,适合计费、支付等对消息准确性要求极高的关键操作场景 。

1.3 MQTT 的常见应用场景

  • 设备数据采集:在物联网环境中,存在大量的传感器设备,如温湿度传感器、压力传感器、光照传感器等。这些传感器需要将采集到的数据发送到服务器进行处理和分析。以智能农业为例,分布在农田各处的土壤湿度传感器、光照强度传感器等,可通过 MQTT 协议将实时数据传输至中央监控平台。传感器作为发布者,定期向特定主题(如 “farm/sensor/soil”、“farm/sensor/light” )发布数据,服务器作为订阅者,订阅这些主题来接收数据,从而实现对农田环境数据的实时采集和监控 。
  • 远程控制:通过 MQTT 可以实现对设备的远程控制。比如在智能家居系统中,用户可以通过手机 APP 远程控制家中的智能家电,如智能灯泡、智能插座、智能空调等。手机 APP 作为发布者,向对应的设备主题发布控制指令(如 “home/light/control”、“home/airconditioner/control” ),智能家电作为订阅者,接收这些指令并执行相应的操作,从而实现远程控制的功能 。在工业领域,也可以通过 MQTT 对工厂中的设备进行远程监控和控制,提高生产效率和管理水平。

二、MQTT 客户端库(Paho MQTT C++)实战

2.1 Paho MQTT C++ 库的安装与配置(Windows/Linux)

  • Windows 下的安装与配置
    • 安装依赖:确保已安装 Visual Studio 或 MinGW 环境,为了支持安全连接,还需要安装 OpenSSL 库。
    • 获取源码:从 Eclipse Paho MQTT C++ 的官方 GitHub 仓库克隆最新版本的源代码。如果要构建静态链接库,需选择适合项目的分支或标签版本。
    • 配置构建环境:使用 CMake 工具简化跨平台项目设置过程。打开命令提示符,切换到下载的 Paho MQTT C++ 源码目录,创建一个新的 build 文件夹并进入该文件夹,然后运行cmake -G “Visual Studio 16 2019” -A x64 …(这里以 Visual Studio 2019 64 位为例,如果是其他版本的 VS,需相应调整)。如果之前安装的 OpenSSL 库不在系统默认路径,还需要指定OPENSSL_ROOT_DIR等相关变量,例如cmake -G “Visual Studio 16 2019” -A x64 -DOPENSSL_ROOT_DIR=C:/OpenSSL-Win64 …。
    • 编译项目:在 build 文件夹中,找到生成的.sln 解决方案文件,使用 Visual Studio 打开并进行编译。也可以在命令行中运行cmake --build . --config Release进行编译。编译完成后,会在 build 文件夹下生成相应的库文件和可执行文件。
  • Linux 下的安装与配置
    • 安装依赖:使用包管理器安装编译相关依赖,如sudo apt-get install build-essential gcc make cmake,还要安装 SSL 开发库sudo apt-get install libssl-dev。
    • 获取源码:在终端中运行git clone https://github.com/eclipse/paho.mqtt.cpp,将代码克隆到本地。
    • 编译安装:进入克隆的代码目录cd paho.mqtt.cpp,然后创建一个构建目录mkdir build && cd build。在构建目录中运行cmake -DPAHO_WITH_MQTT_C=ON -DPAHO_BUILD_EXAMPLES=ON …进行项目配置。配置完成后,使用sudo make install进行编译和安装,这样 Paho MQTT C++ 库就会被安装到系统的默认路径(如/usr/local/lib和/usr/local/include)。

2.2 MQTT 客户端的创建与连接(连接 Broker、设置连接参数)

在 C++ 中使用 Paho MQTT C++ 库创建并连接 MQTT 客户端的示例代码如下:

#include <iostream>
#include <paho-mqttpp3.hpp>// 定义回调类,继承自mqtt::callback
class MQTTCallback : public mqtt::callback {
public:void connectionLost(const std::string& cause) override {std::cout << "Connection lost: " << cause << std::endl;}void messageArrived(const std::string& topic, mqtt::message& message) override {std::cout << "Message arrived on topic: " << topic<< ", Message: " << message.payload.data() << std::endl;}void deliveryComplete(mqtt::delivery_token_ptr token) override {std::cout << "Delivery complete for token: " << token->get_message_id() << std::endl;}
};int main() {// 连接地址,这里以本地MQTT Broker为例std::string broker = "tcp://localhost:1883";// 客户端ID,需要唯一std::string clientId = "myClient123";try {// 创建MQTT客户端对象mqtt::async_client client(broker, clientId);// 创建回调对象MQTTCallback cb;// 设置回调client.set_callback(cb);// 创建连接选项对象mqtt::connect_options connOpts;// 设置用户名和密码(如果需要认证)// connOpts.set_user_name("username");// connOpts.set_password("password");// 设置是否清除会话,false表示创建持久会话connOpts.set_clean_session(false);std::cout << "Connecting to broker: " << broker << std::endl;// 异步连接到Brokermqtt::token_ptr conntok = client.connect(connOpts);// 等待连接完成conntok->wait();std::cout << "Connected" << std::endl;// 后续操作,如订阅主题、发布消息等// 断开连接client.disconnect()->wait();std::cout << "Disconnected" << std::endl;} catch (const mqtt::exception& exc) {std::cerr << "Error: " << exc.what() << std::endl;return 1;}return 0;
}

在上述代码中,首先定义了一个MQTTCallback类,用于处理连接丢失、消息到达和消息投递完成等事件。然后在main函数中,创建了 MQTT 客户端对象,设置了连接选项,包括用户名、密码(这里注释掉了,实际使用时根据情况配置)和是否清除会话等参数 ,接着进行异步连接并等待连接完成,最后进行断开连接操作。

2.3 主题订阅与消息发布(subscribe、publish、消息回调处理)

在连接到 MQTT Broker 后,可以进行主题订阅和消息发布操作。以下是在前面代码基础上添加主题订阅和消息发布的示例:

#include <iostream>
#include <paho-mqttpp3.hpp>class MQTTCallback : public mqtt::callback {
public:void connectionLost(const std::string& cause) override {std::cout << "Connection lost: " << cause << std::endl;}void messageArrived(const std::string& topic, mqtt::message& message) override {std::cout << "Message arrived on topic: " << topic<< ", Message: " << message.payload.data() << std::endl;}void deliveryComplete(mqtt::delivery_token_ptr token) override {std::cout << "Delivery complete for token: " << token->get_message_id() << std::endl;}
};int main() {std::string broker = "tcp://localhost:1883";std::string clientId = "myClient123";try {mqtt::async_client client(broker, clientId);MQTTCallback cb;client.set_callback(cb);mqtt::connect_options connOpts;// connOpts.set_user_name("username");// connOpts.set_password("password");connOpts.set_clean_session(false);std::cout << "Connecting to broker: " << broker << std::endl;mqtt::token_ptr conntok = client.connect(connOpts);conntok->wait();std::cout << "Connected" << std::endl;// 订阅主题std::string topic = "sensor/data";// QoS等级设置为1int qos = 1;mqtt::token_ptr subTok = client.subscribe(topic, qos);subTok->wait();std::cout << "Subscribed to topic: " << topic << std::endl;// 发布消息std::string message = "Hello, MQTT!";mqtt::token_ptr pubTok = client.publish(topic, message, qos, false);pubTok->wait();std::cout << "Message published: " << message << std::endl;client.disconnect()->wait();std::cout << "Disconnected" << std::endl;} catch (const mqtt::exception& exc) {std::cerr << "Error: " << exc.what() << std::endl;return 1;}return 0;
}

在上述代码中,添加了主题订阅和消息发布的功能。通过client.subscribe方法订阅了名为sensor/data的主题,并设置 QoS 等级为 1;通过client.publish方法向该主题发布了一条消息Hello, MQTT!,同样设置 QoS 等级为 1 。在MQTTCallback类中,messageArrived方法用于处理接收到的消息,当有消息到达订阅的主题时,会在控制台输出消息的主题和内容 。

三、物联网设备数据交互

3.1 设备端数据采集(模拟传感器数据、定时发布)

在物联网应用中,设备端需要采集各种数据并发送到服务器。这里以模拟温湿度传感器数据为例,使用 C++ 和 Paho MQTT C++ 库实现定时发布数据的功能。

#include <iostream>
#include <paho-mqttpp3.hpp>
#include <thread>
#include <chrono>
#include <random>class MQTTCallback : public mqtt::callback {
public:void connectionLost(const std::string& cause) override {std::cout << "Connection lost: " << cause << std::endl;}void messageArrived(const std::string& topic, mqtt::message& message) override {std::cout << "Message arrived on topic: " << topic<< ", Message: " << message.payload.data() << std::endl;}void deliveryComplete(mqtt::delivery_token_ptr token) override {std::cout << "Delivery complete for token: " << token->get_message_id() << std::endl;}
};int main() {std::string broker = "tcp://localhost:1883";std::string clientId = "deviceClient";std::string topic = "sensor/temperature_humidity";try {mqtt::async_client client(broker, clientId);MQTTCallback cb;client.set_callback(cb);mqtt::connect_options connOpts;connOpts.set_clean_session(false);std::cout << "Connecting to broker: " << broker << std::endl;mqtt::token_ptr conntok = client.connect(connOpts);conntok->wait();std::cout << "Connected" << std::endl;// 随机数生成器,用于模拟传感器数据std::random_device rd;std::mt19937 gen(rd());std::uniform_real_distribution<> dis_temperature(20.0, 30.0);std::uniform_real_distribution<> dis_humidity(40.0, 60.0);while (true) {// 生成模拟的温湿度数据double temperature = dis_temperature(gen);double humidity = dis_humidity(gen);// 构建消息内容std::string message = "Temperature: " + std::to_string(temperature) + " C, Humidity: " + std::to_string(humidity) + " %";mqtt::token_ptr pubTok = client.publish(topic, message, 1, false);pubTok->wait();std::cout << "Published: " << message << std::endl;// 每隔5秒发布一次数据std::this_thread::sleep_for(std::chrono::seconds(5));}client.disconnect()->wait();std::cout << "Disconnected" << std::endl;} catch (const mqtt::exception& exc) {std::cerr << "Error: " << exc.what() << std::endl;return 1;}return 0;
}

在上述代码中,使用了 C++ 的随机数生成库来模拟温湿度传感器的数据,然后通过 Paho MQTT C++ 库将数据发布到名为sensor/temperature_humidity的主题上,并且每隔 5 秒发布一次 。

3.2 服务端数据接收与处理(订阅主题、数据存储到数据库)

服务端需要订阅设备端发布数据的主题,并对接收到的数据进行处理,这里以将数据存储到 SQLite 数据库为例。首先需要安装 SQLite 的 C++ 开发库,在 Ubuntu 上可以通过sudo apt-get install libsqlite3-dev命令安装 。

#include <iostream>
#include <paho-mqttpp3.hpp>
#include <sqlite3.h>class MQTTCallback : public mqtt::callback {
public:MQTTCallback() {// 初始化SQLite数据库int rc = sqlite3_open("sensor_data.db", &db);if (rc) {std::cerr << "Can't open database: " << sqlite3_errmsg(db) << std::endl;} else {std::cout << "Opened database successfully" << std::endl;// 创建表std::string createTable = "CREATE TABLE IF NOT EXISTS sensor_readings (""id INTEGER PRIMARY KEY AUTOINCREMENT,""timestamp TEXT,""data TEXT);";char *zErrMsg = 0;rc = sqlite3_exec(db, createTable.c_str(), 0, 0, &zErrMsg);if (rc != SQLITE_OK) {std::cerr << "SQL error: " << zErrMsg << std::endl;sqlite3_free(zErrMsg);}}}~MQTTCallback() {sqlite3_close(db);}void connectionLost(const std::string& cause) override {std::cout << "Connection lost: " << cause << std::endl;}void messageArrived(const std::string& topic, mqtt::message& message) override {std::string data(message.payload.data());auto now = std::chrono::system_clock::now();auto in_time_t = std::chrono::system_clock::to_time_t(now);std::string timestamp(ctime(&in_time_t));timestamp.pop_back();// 去掉换行符// 插入数据到数据库std::string insert = "INSERT INTO sensor_readings (timestamp, data) VALUES ('" + timestamp + "', '" + data + "');";char *zErrMsg = 0;int rc = sqlite3_exec(db, insert.c_str(), 0, 0, &zErrMsg);if (rc != SQLITE_OK) {std::cerr << "SQL error: " << zErrMsg << std::endl;sqlite3_free(zErrMsg);} else {std::cout << "Stored data: " << data << " at " << timestamp << std::endl;}}void deliveryComplete(mqtt::delivery_token_ptr token) override {std::cout << "Delivery complete for token: " << token->get_message_id() << std::endl;}private:sqlite3 *db;
};int main() {std::string broker = "tcp://localhost:1883";std::string clientId = "serverClient";std::string topic = "sensor/temperature_humidity";try {mqtt::async_client client(broker, clientId);MQTTCallback cb;client.set_callback(cb);mqtt::connect_options connOpts;connOpts.set_clean_session(false);std::cout << "Connecting to broker: " << broker << std::endl;mqtt::token_ptr conntok = client.connect(connOpts);conntok->wait();std::cout << "Connected" << std::endl;mqtt::token_ptr subTok = client.subscribe(topic, 1);subTok->wait();std::cout << "Subscribed to topic: " << topic << std::endl;// 保持程序运行,持续接收消息while (true) {std::this_thread::sleep_for(std::chrono::seconds(1));}client.disconnect()->wait();std::cout << "Disconnected" << std::endl;} catch (const mqtt::exception& exc) {std::cerr << "Error: " << exc.what() << std::endl;return 1;}return 0;
}

在上述代码中,MQTTCallback类的构造函数中初始化了 SQLite 数据库并创建了用于存储传感器数据的表 。messageArrived方法在接收到消息时,将消息内容和当前时间插入到数据库中 。

3.3 设备远程控制(服务端发布控制指令、设备端接收并执行)

实现设备远程控制功能,服务端可以发布控制指令到特定主题,设备端订阅该主题并在接收到指令时执行相应操作。这里以控制设备的开关为例,假设设备端有一个executeCommand函数来执行具体的控制操作。

// 设备端代码,添加控制指令处理部分
#include <iostream>
#include <paho-mqttpp3.hpp>
#include <thread>
#include <chrono>
#include <random>class MQTTCallback : public mqtt::callback {
public:void connectionLost(const std::string& cause) override {std::cout << "Connection lost: " << cause << std::endl;}void messageArrived(const std::string& topic, mqtt::message& message) override {std::string command(message.payload.data());std::cout << "Received command: " << command << std::endl;executeCommand(command);}void deliveryComplete(mqtt::delivery_token_ptr token) override {std::cout << "Delivery complete for token: " << token->get_message_id() << std::endl;}void executeCommand(const std::string& command) {if (command == "turn_on") {std::cout << "Device turned on" << std::endl;// 这里添加实际打开设备的代码,比如控制硬件的GPIO等} else if (command == "turn_off") {std::cout << "Device turned off" << std::endl;// 这里添加实际关闭设备的代码}}
};int main() {std::string broker = "tcp://localhost:1883";std::string clientId = "deviceClient";std::string dataTopic = "sensor/temperature_humidity";std::string controlTopic = "device/control";try {mqtt::async_client client(broker, clientId);MQTTCallback cb;client.set_callback(cb);mqtt::connect_options connOpts;connOpts.set_clean_session(false);std::cout << "Connecting to broker: " << broker << std::endl;mqtt::token_ptr conntok = client.connect(connOpts);conntok->wait();std::cout << "Connected" << std::endl;// 订阅控制指令主题mqtt::token_ptr subTok = client.subscribe(controlTopic, 1);subTok->wait();std::cout << "Subscribed to control topic: " << controlTopic << std::endl;// 随机数生成器,用于模拟传感器数据std::random_device rd;std::mt19937 gen(rd());std::uniform_real_distribution<> dis_temperature(20.0, 30.0);std::uniform_real_distribution<> dis_humidity(40.0, 60.0);while (true) {// 生成模拟的温湿度数据double temperature = dis_temperature(gen);double humidity = dis_humidity(gen);// 构建消息内容std::string message = "Temperature: " + std::to_string(temperature) + " C, Humidity: " + std::to_string(humidity) + " %";mqtt::token_ptr pubTok = client.publish(dataTopic, message, 1, false);pubTok->wait();std::cout << "Published: " << message << std::endl;// 每隔5秒发布一次数据std::this_thread::sleep_for(std::chrono::seconds(5));}client.disconnect()->wait();std::cout << "Disconnected" << std::endl;} catch (const mqtt::exception& exc) {std::cerr << "Error: " << exc.what() << std::endl;return 1;}return 0;
}
// 服务端代码,添加发布控制指令部分
#include <iostream>
#include <paho-mqttpp3.hpp>
#include <sqlite3.h>
#include <thread>class MQTTCallback : public mqtt::callback {
public:MQTTCallback() {// 初始化SQLite数据库int rc = sqlite3_open("sensor_data.db", &db);if (rc) {std::cerr << "Can't open database: " << sqlite3_errmsg(db) << std::endl;} else {std::cout << "Opened database successfully" << std::endl;// 创建表std::string createTable = "CREATE TABLE IF NOT EXISTS sensor_readings (""id INTEGER PRIMARY KEY AUTOINCREMENT,""timestamp TEXT,""data TEXT);";char *zErrMsg = 0;rc = sqlite3_exec(db, createTable.c_str(), 0, 0, &zErrMsg);if (rc != SQLITE_OK) {std::cerr << "SQL error: " << zErrMsg << std::endl;sqlite3_free(zErrMsg);}}}~MQTTCallback() {sqlite3_close(db);}void connectionLost(const std::string& cause) override {std::cout << "Connection lost: " << cause << std::endl;}void messageArrived(const std::string& topic, mqtt::message& message) override {std::string data(message.payload.data());auto now = std::chrono::system_clock::now();auto in_time_t = std::chrono::system_clock::to_time_t(now);std::string timestamp(ctime(&in_time_t));timestamp.pop_back();// 去掉换行符// 插入数据到数据库std::string insert = "INSERT INTO sensor_readings (timestamp, data) VALUES ('" + timestamp + "', '" + data + "');";char *zErrMsg = 0;int rc = sqlite3_exec(db, insert.c_str(), 0, 0, &zErrMsg);if (rc != SQLITE_OK) {std::cerr << "SQL error: " << zErrMsg << std::endl;sqlite3_free(zErrMsg);} else {std::cout << "Stored data: " << data << " at " << timestamp << std::endl;}}void deliveryComplete(mqtt::delivery_token_ptr token) override {std::cout << "Delivery complete for token: " << token->get_message_id() << std::endl;}private:sqlite3 *db;
};int main() {std::string broker = "tcp://localhost:1883";std::string clientId = "serverClient";std::string dataTopic = "sensor/temperature_humidity";std::string controlTopic = "device/control";try {mqtt::async_client client(broker, clientId);MQTTCallback cb;client.set_callback(cb);mqtt::connect_options connOpts;connOpts.set_clean_session(false);std::cout << "Connecting to broker: " << broker << std::endl;mqtt::token_ptr conntok = client.connect(connOpts);conntok->wait();std::cout << "Connected" << std::endl;mqtt::token_ptr subTok = client.subscribe(dataTopic, 1);subTok->wait();std::cout << "Subscribed to data topic: " << dataTopic << std::endl;// 模拟发布控制指令std::thread([&client, controlTopic]() {while (true) {std::string command;std::cout << "Enter control command (turn_on/turn_off): ";std::cin >> command;mqtt::token_ptr pubTok = client.publish(controlTopic, command, 1, false);pubTok->wait();std::cout << "Published command: " << command << std::endl;}}).detach();// 保持程序运行,持续接收消息while (true) {std::this_thread::sleep_for(std::chrono::seconds(1));}client.disconnect()->wait();std::cout << "Disconnected" << std::endl;} catch (const mqtt::exception& exc) {std::cerr << "Error: " << exc.what() << std::endl;return 1;}return 0;
}

在设备端代码中,MQTTCallback类新增了executeCommand方法,用于处理接收到的控制指令 。在服务端代码中,通过一个线程模拟用户输入控制指令并发布到控制主题上 。

四、实战项目:物联网温湿度监控系统

4.1 项目需求(模拟温湿度传感器、MQTT 发布数据、服务端接收并展示)

本实战项目旨在构建一个物联网温湿度监控系统,模拟真实环境中的温湿度传感器,利用 MQTT 协议进行数据传输,实现数据从设备端到服务端的高效交互,并在服务端接收和展示数据 。

系统的设备端通过代码模拟温湿度传感器,生成随机的温湿度数据,模拟真实传感器在不同环境下采集数据的过程 。这些数据将通过 MQTT 协议发布到指定的主题上,利用 MQTT 的轻量级和发布 / 订阅模式的优势,确保数据能够快速、可靠地传输 。

服务端则负责订阅设备端发布数据的主题,接收并处理这些数据 。处理过程包括将数据存储到数据库中,以便后续进行数据分析和查询,同时将数据展示出来,让用户能够直观地了解温湿度的实时状态 。展示方式可以是简单的控制台输出,也可以通过 Web 界面或其他可视化工具进行展示 。通过这样的设计,系统能够实现对温湿度数据的实时监控,为相关决策提供数据支持 。

4.2 Paho MQTT C++ 实现客户端与服务端交互

在本项目中,使用 Paho MQTT C++ 库来实现客户端与服务端的交互 。设备端作为客户端,代码如下:

#include <iostream>
#include <paho-mqttpp3.hpp>
#include <thread>
#include <chrono>
#include <random>class MQTTCallback : public mqtt::callback {
public:void connectionLost(const std::string& cause) override {std::cout << "Connection lost: " << cause << std::endl;}void messageArrived(const std::string& topic, mqtt::message& message) override {std::cout << "Message arrived on topic: " << topic<< ", Message: " << message.payload.data() << std::endl;}void deliveryComplete(mqtt::delivery_token_ptr token) override {std::cout << "Delivery complete for token: " << token->get_message_id() << std::endl;}
};int main() {std::string broker = "tcp://localhost:1883";std::string clientId = "deviceClient";std::string topic = "sensor/temperature_humidity";try {mqtt::async_client client(broker, clientId);MQTTCallback cb;client.set_callback(cb);mqtt::connect_options connOpts;connOpts.set_clean_session(false);std::cout << "Connecting to broker: " << broker << std::endl;mqtt::token_ptr conntok = client.connect(connOpts);conntok->wait();std::cout << "Connected" << std::endl;// 随机数生成器,用于模拟传感器数据std::random_device rd;std::mt19937 gen(rd());std::uniform_real_distribution<> dis_temperature(20.0, 30.0);std::uniform_real_distribution<> dis_humidity(40.0, 60.0);while (true) {// 生成模拟的温湿度数据double temperature = dis_temperature(gen);double humidity = dis_humidity(gen);// 构建消息内容std::string message = "Temperature: " + std::to_string(temperature) + " C, Humidity: " + std::to_string(humidity) + " %";mqtt::token_ptr pubTok = client.publish(topic, message, 1, false);pubTok->wait();std::cout << "Published: " << message << std::endl;// 每隔5秒发布一次数据std::this_thread::sleep_for(std::chrono::seconds(5));}client.disconnect()->wait();std::cout << "Disconnected" << std::endl;} catch (const mqtt::exception& exc) {std::cerr << "Error: " << exc.what() << std::endl;return 1;}return 0;
}

上述代码中,设备端客户端首先连接到 MQTT Broker,然后通过随机数生成器模拟温湿度传感器生成数据,并每隔 5 秒将数据发布到名为sensor/temperature_humidity的主题上 。

服务端代码如下:

#include <iostream>
#include <paho-mqttpp3.hpp>
#include <sqlite3.h>class MQTTCallback : public mqtt::callback {
public:MQTTCallback() {// 初始化SQLite数据库int rc = sqlite3_open("sensor_data.db", &db);if (rc) {std::cerr << "Can't open database: " << sqlite3_errmsg(db) << std::endl;} else {std::cout << "Opened database successfully" << std::endl;// 创建表std::string createTable = "CREATE TABLE IF NOT EXISTS sensor_readings (""id INTEGER PRIMARY KEY AUTOINCREMENT,""timestamp TEXT,""data TEXT);";char *zErrMsg = 0;rc = sqlite3_exec(db, createTable.c_str(), 0, 0, &zErrMsg);if (rc != SQLITE_OK) {std::cerr << "SQL error: " << zErrMsg << std::endl;sqlite3_free(zErrMsg);}}}~MQTTCallback() {sqlite3_close(db);}void connectionLost(const std::string& cause) override {std::cout << "Connection lost: " << cause << std::endl;}void messageArrived(const std::string& topic, mqtt::message& message) override {std::string data(message.payload.data());auto now = std::chrono::system_clock::now();auto in_time_t = std::chrono::system_clock::to_time_t(now);std::string timestamp(ctime(&in_time_t));timestamp.pop_back();// 去掉换行符// 插入数据到数据库std::string insert = "INSERT INTO sensor_readings (timestamp, data) VALUES ('" + timestamp + "', '" + data + "');";char *zErrMsg = 0;int rc = sqlite3_exec(db, insert.c_str(), 0, 0, &zErrMsg);if (rc != SQLITE_OK) {std::cerr << "SQL error: " << zErrMsg << std::endl;sqlite3_free(zErrMsg);} else {std::cout << "Stored data: " << data << " at " << timestamp << std::endl;}}void deliveryComplete(mqtt::delivery_token_ptr token) override {std::cout << "Delivery complete for token: " << token->get_message_id() << std::endl;}private:sqlite3 *db;
};int main() {std::string broker = "tcp://localhost:1883";std::string clientId = "serverClient";std::string topic = "sensor/temperature_humidity";try {mqtt::async_client client(broker, clientId);MQTTCallback cb;client.set_callback(cb);mqtt::connect_options connOpts;connOpts.set_clean_session(false);std::cout << "Connecting to broker: " << broker << std::endl;mqtt::token_ptr conntok = client.connect(connOpts);conntok->wait();std::cout << "Connected" << std::endl;mqtt::token_ptr subTok = client.subscribe(topic, 1);subTok->wait();std::cout << "Subscribed to topic: " << topic << std::endl;// 保持程序运行,持续接收消息while (true) {std::this_thread::sleep_for(std::chrono::seconds(1));}client.disconnect()->wait();std::cout << "Disconnected" << std::endl;} catch (const mqtt::exception& exc) {std::cerr << "Error: " << exc.what() << std::endl;return 1;}return 0;
}

服务端客户端连接到 MQTT Broker 并订阅sensor/temperature_humidity主题,在接收到消息时,将消息内容和当前时间插入到 SQLite 数据库中 。

4.3 数据可视化(结合 Qt 展示实时数据)与异常报警

为了实现数据可视化,结合 Qt 框架来展示实时的温湿度数据 。首先需要安装 Qt 开发环境,在 Ubuntu 上可以通过sudo apt-get install qtbase5-dev qtdeclarative5-dev命令安装 。

创建一个 Qt 项目,在项目中添加一个 QChartView 用于显示折线图,展示温湿度随时间的变化 。以下是一个简单的示例代码,展示如何在 Qt 中创建一个实时更新的折线图来显示温湿度数据:

#include <QtWidgets/QApplication>
#include <QtCharts/QChartView>
#include <QtCharts/QLineSeries>
#include <QtCharts/QValueAxis>
#include <thread>
#include <chrono>
#include <paho-mqttpp3.hpp>QT_CHARTS_USE_NAMESPACEclass MQTTCallback : public mqtt::callback {
public:MQTTCallback(QLineSeries *tempSeries, QLineSeries *humidSeries): temperatureSeries(tempSeries), humiditySeries(humidSeries) {}void connectionLost(const std::string& cause) override {std::cout << "Connection lost: " << cause << std::endl;}void messageArrived(const std::string& topic, mqtt::message& message) override {std::string data(message.payload.data());size_t pos1 = data.find("Temperature: ");size_t pos2 = data.find(" C, Humidity: ");if (pos1 != std::string::npos && pos2 != std::string::npos) {double temperature = std::stod(data.substr(pos1 + 12, pos2 - pos1 - 12));double humidity = std::stod(data.substr(pos2 + 12, data.length() - pos2 - 13));QMetaObject::invokeMethod(temperatureSeries, [this, temperature]() {static int count = 0;temperatureSeries->append(count++, temperature);if (count > 100) {temperatureSeries->remove(0);}}, Qt::QueuedConnection);QMetaObject::invokeMethod(humiditySeries, [this, humidity]() {static int count = 0;humiditySeries->append(count++, humidity);if (count > 100) {humiditySeries->remove(0);}}, Qt::QueuedConnection);}}void deliveryComplete(mqtt::delivery_token_ptr token) override {std::cout << "Delivery complete for token: " << token->get_message_id() << std::endl;}private:QLineSeries *temperatureSeries;QLineSeries *humiditySeries;
};int main(int argc, char *argv[]) {QApplication a(argc, argv);QChart *chart = new QChart();chart->legend()->hide();QValueAxis *axisX = new QValueAxis;axisX->setRange(0, 100);axisX->setTitleText("Time");QValueAxis *axisY = new QValueAxis;axisY->setRange(15, 35);axisY->setTitleText("Temperature (C)");QLineSeries *temperatureSeries = new QLineSeries;QLineSeries *humiditySeries = new QLineSeries;chart->addAxis(axisX, Qt::AlignBottom);chart->addAxis(axisY, Qt::AlignLeft);temperatureSeries->attachAxis(axisX);temperatureSeries->attachAxis(axisY);humiditySeries->attachAxis(axisX);QValueAxis *axisY2 = new QValueAxis;axisY2->setRange(30, 70);axisY2->setTitleText("Humidity (%)");chart->addAxis(axisY2, Qt::AlignRight);humiditySeries->attachAxis(axisY2);chart->addSeries(temperatureSeries);chart->addSeries(humiditySeries);QChartView *chartView = new QChartView(chart);chartView->setRenderHint(QPainter::Antialiasing);chartView->resize(800, 600);chartView->show();std::string broker = "tcp://localhost:1883";std::string clientId = "qtClient";std::string topic = "sensor/temperature_humidity";try {mqtt::async_client client(broker, clientId);MQTTCallback cb(temperatureSeries, humiditySeries);client.set_callback(cb);mqtt::connect_options connOpts;connOpts.set_clean_session(false);std::cout << "Connecting to broker: " << broker << std::endl;mqtt::token_ptr conntok = client.connect(connOpts);conntok->wait();std::cout << "Connected" << std::endl;mqtt::token_ptr subTok = client.subscribe(topic, 1);subTok->wait();std::cout << "Subscribed to topic: " << topic << std::endl;// 保持程序运行,持续接收消息while (true) {std::this_thread::sleep_for(std::chrono::seconds(1));}client.disconnect()->wait();std::cout << "Disconnected" << std::endl;} catch (const mqtt::exception& exc) {std::cerr << "Error: " << exc.what() << std::endl;return 1;}return a.exec();
}

在上述代码中,创建了两个 QLineSeries 分别用于显示温度和湿度数据 。MQTTCallback类在接收到消息时,解析出温湿度数据,并通过QMetaObject::invokeMethod将数据添加到对应的折线图中 。

为了实现异常报警功能,在接收到数据时,判断温湿度是否超出正常范围,如果超出则进行报警 。可以通过控制台输出报警信息,也可以通过发送邮件、短信等方式进行报警 。以下是在MQTTCallback类中添加异常报警的示例代码:

class MQTTCallback : public mqtt::callback {
public:MQTTCallback(QLineSeries *tempSeries, QLineSeries *humidSeries): temperatureSeries(tempSeries), humiditySeries(humidSeries) {}void connectionLost(const std::string& cause) override {std::cout << "Connection lost: " << cause << std::endl;}void messageArrived(const std::string& topic, mqtt::message& message) override {std::string data(message.payload.data());size_t pos1 = data.find("Temperature: ");size_t pos2 = data.find(" C, Humidity: ");if (pos1 != std::string::npos && pos2 != std::string::npos) {double temperature = std::stod(data.substr(pos1 + 12, pos2 - pos1 - 12));double humidity = std::stod(data.substr(pos2 + 12, data.length() - pos2 - 13));QMetaObject::invokeMethod(temperatureSeries, [this, temperature]() {static int count = 0;temperatureSeries->append(count++, temperature);if (count > 100) {temperatureSeries->remove(0);}if (temperature < 20 || temperature > 30) {std::cout << "Temperature alarm: " << temperature << " C" << std::endl;}}, Qt::QueuedConnection);QMetaObject::invokeMethod(humiditySeries, [this, humidity]() {static int count = 0;humiditySeries->append(count++, humidity);if (count > 100) {humiditySeries->remove(0);}if (humidity < 40 || humidity > 60) {std::cout << "Humidity alarm: " << humidity << " %" << std::endl;}}, Qt::QueuedConnection);}}void deliveryComplete(mqtt::delivery_token_ptr token) override {std::cout << "Delivery complete for token: " << token->get_message_id() << std::endl;}private:QLineSeries *temperatureSeries;QLineSeries *humiditySeries;
};

在上述代码中,当温度超出 20 - 30°C 范围或湿度超出 40 - 60% 范围时,在控制台输出报警信息 。这样就完成了物联网温湿度监控系统的数据可视化和异常报警功能 。

http://www.dtcms.com/a/450341.html

相关文章:

  • VGG改进(13):基于FFT的Frequency Attention模块应用
  • 商业网站教程阿里云有主体新增网站
  • 《数据密集型应用系统设计2》--数据系统对比:OLAP/OLTP,数仓/数据湖/数据湖屋
  • Oracle OCP认证考试题目详解082系列第55题
  • 学做宝宝衣服网站好h5和小程序有什么区别
  • day5
  • 2025-10-06 Python不基础13——mro
  • 那片海dede织梦源码企业网络公司工作室网站模板源码模板php网页游戏维京传奇
  • 【深度学习03】神经网络基本骨架、卷积、池化、非线性激活、线性层、搭建网络
  • 新媒体营销seo个人优化方案案例
  • Redis项目应用总结(苍穹外卖/黑马头条/乐尚代驾)
  • 做网站js还是jq2021年世界500强榜单
  • 建设旅游网站的费用预算杭州抖音代运营
  • 【LaTeX】 13 LaTeX 长文档结构管理
  • Python入门:Python3基础练习题详解
  • 高端网站建设加盟帮人做彩票网站
  • 哪个网站做的ppt模板好查查企业网
  • 为什么做的网站别的浏览器打不开怎么回事做网站规划
  • 做影视后期应该关注哪些网站做神马网站优化快速
  • 测试题——1
  • 力扣3634. 使数组平衡的最少移除数目
  • 网站服务器不稳定樟木头网站建设
  • 建设网站都需投入哪些资源wordpress没有图片
  • 网站主栏目投资网站排行
  • 国内永久crmseo刷关键词排名免费
  • 爬虫的道德与法律边界:Robots 协议、版权与个人信息保护
  • @[TOC](文件操作和IO)
  • 打开网站不要出现 index.html携程网站联盟
  • 律师行业协会网站建设做简历的网站叫什么
  • c++ enum和enum class