PHP MQTT 订阅服务:实时消息接收与数据库存储解决方案
下面是一个完整的 PHP MQTT 订阅服务实现,能够连接 EMQX 消息服务器,订阅指定主题,并将接收到的消息持久化存储到 MySQL 数据库。
完整的 PHP 实现代码
php
<?php /*** MQTT 消息订阅与存储服务* * 功能说明:* - 连接到 EMQX MQTT 服务器并订阅指定主题* - 实时接收并处理 MQTT 消息* - 将消息内容安全存储到 MySQL 数据库* - 支持服务质量等级 QOS 1,确保消息可靠传输* * 运行方式:php mqtt_subscriber.php*/require_once dirname(__DIR__) . '/vendor/autoload.php';use PhpMqtt\Client\MqttClient; use PhpMqtt\Client\ConnectionSettings;/*** 服务配置参数*/ // 数据库连接配置 $dbConfig = ['host' => 'localhost', // 数据库主机地址'user' => 'your_username', // 数据库用户名'pass' => 'your_password', // 数据库密码'dbname' => 'your_database' // 数据库名称 ];// EMQX MQTT 服务器配置 $server = 'your_emqx_server.com'; // MQTT 服务器地址 $port = 1883; // MQTT 服务端口 $clientId = 'php_mqtt_subscriber_' . uniqid(); // 客户端唯一标识 $username = 'your_mqtt_username'; // MQTT 认证用户名 $password = 'your_mqtt_password'; // MQTT 认证密码 $topic = 'UADX001/123456/Pub/'; // 订阅的主题 $qos = 1; // 服务质量等级 (0-最多一次, 1-至少一次, 2-恰好一次)/*** 获取数据库连接(单例模式)* * @param array $config 数据库配置数组* @return mysqli MySQLi 连接对象* @throws Exception 数据库连接失败时抛出异常*/ function getDbConnection($config) {static $conn = null;if ($conn === null) {$conn = new mysqli($config['host'], $config['user'], $config['pass'], $config['dbname']);if ($conn->connect_error) {throw new Exception("数据库连接失败: " . $conn->connect_error);}// 设置字符集$conn->set_charset("utf8mb4");}return $conn; }/*** 安全存储消息到数据库* * @param string $topic 消息主题* @param string $message 消息内容* @param array $dbConfig 数据库配置*/ function saveMessage($topic, $message, $dbConfig) {try {$conn = getDbConnection($dbConfig);// 使用预处理语句防止 SQL 注入攻击$stmt = $conn->prepare("INSERT INTO cmf_portal_device_log (topic, message, created_at) VALUES (?, ?, NOW())");if ($stmt === false) {throw new Exception("预处理语句创建失败: " . $conn->error);}$stmt->bind_param("ss", $topic, $message);if ($stmt->execute()) {echo "[" . date('Y-m-d H:i:s') . "] 消息已保存,记录ID: " . $conn->insert_id . "\n";} else {echo "[" . date('Y-m-d H:i:s') . "] 消息保存失败: " . $stmt->error . "\n";}$stmt->close();} catch (Exception $e) {echo "[" . date('Y-m-d H:i:s') . "] 保存消息时出错: " . $e->getMessage() . "\n";} }/*** 消息接收回调函数* * @param string $topic 接收消息的主题* @param string $message 消息内容*/ $messageHandler = function ($topic, $message) use ($dbConfig) {echo "[" . date('Y-m-d H:i:s') . "] 收到新消息\n";echo " 主题: {$topic}\n";echo " 内容: {$message}\n";echo " " . str_repeat("-", 50) . "\n";// 存储消息到数据库saveMessage($topic, $message, $dbConfig); };/*** 创建 MQTT 连接配置*/ $connectionSettings = (new ConnectionSettings)->setUsername($username)->setPassword($password)->setKeepAliveInterval(60) // 保活间隔(秒)->setLastWillTopic('php/subscriber/status')->setLastWillMessage('offline')->setRetainLastWill(true);/*** 主程序执行流程*/ try {// 初始化 MQTT 客户端$client = new MqttClient($server, $port, $clientId);echo "[" . date('Y-m-d H:i:s') . "] 正在连接到 EMQX 服务器...\n";// 建立 MQTT 连接$client->connect($connectionSettings, true);echo "[" . date('Y-m-d H:i:s') . "] ✓ 成功连接到 EMQX 服务器\n";// 订阅主题并设置消息处理回调$client->subscribe($topic, $messageHandler, $qos);echo "[" . date('Y-m-d H:i:s') . "] ✓ 已订阅主题: {$topic} (QOS: {$qos})\n";echo "[" . date('Y-m-d H:i:s') . "] 🚀 服务已启动,等待接收消息...\n";echo " 按 Ctrl + C 可退出程序\n";echo str_repeat("=", 60) . "\n";// 进入消息监听循环$client->loop(true);} catch (Exception $e) {echo "[" . date('Y-m-d H:i:s') . "] ✗ 发生错误: {$e->getMessage()}\n";// 确保连接被正确关闭if (isset($client)) {$client->disconnect();}exit(1); // 非正常退出 }// 正常退出时的清理工作 if (isset($client)) {$client->disconnect();echo "[" . date('Y-m-d H:i:s') . "] 连接已关闭,服务停止\n"; } ?>
服务部署与运行
1. 环境要求
PHP 7.4 或更高版本
MySQL 5.7+ 数据库
php-mqtt/client
扩展包EMQX 或其他 MQTT 消息服务器
2. 安装依赖
bash
composer require php-mqtt/client
3. 创建数据库表
sql
CREATE TABLE cmf_portal_device_log (id BIGINT AUTO_INCREMENT PRIMARY KEY,topic VARCHAR(255) NOT NULL,message TEXT NOT NULL,created_at DATETIME NOT NULL,INDEX idx_topic (topic),INDEX idx_created_at (created_at) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
4. 运行服务
bash
php mqtt_subscriber.php
功能特性
可靠连接:支持自动重连和连接状态监控
数据安全:使用预处理语句防止 SQL 注入
资源优化:数据库连接复用,避免频繁创建连接
完整日志:详细的运行状态和错误日志输出
优雅退出:支持信号中断和安全资源释放
应用场景
IoT 设备数据采集与存储
实时消息处理流水线
设备状态监控系统
分布式系统间数据同步
这个解决方案提供了一个稳定可靠的 MQTT 消息处理基础框架,可以根据具体业务需求进行扩展和定制