当前位置: 首页 > news >正文

PHP MQTT 订阅服务:实时消息接收与数据库存储解决方案

下面是一个完整的 PHP MQTT 订阅服务实现,能够连接 EMQX 消息服务器,订阅指定主题,并将接收到的消息持久化存储到 MySQL 数据库。

完整的 PHP 实现代码

php

<?php
/*** MQTT 消息订阅与存储服务* * 功能说明:* - 连接到 EMQX MQTT 服务器并订阅指定主题* - 实时接收并处理 MQTT 消息* - 将消息内容安全存储到 MySQL 数据库* - 支持服务质量等级 QOS 1,确保消息可靠传输* * 运行方式:php mqtt_subscriber.php*/require_once dirname(__DIR__) . '/vendor/autoload.php';use PhpMqtt\Client\MqttClient;
use PhpMqtt\Client\ConnectionSettings;/*** 服务配置参数*/
// 数据库连接配置
$dbConfig = ['host'     => 'localhost',    // 数据库主机地址'user'     => 'your_username', // 数据库用户名'pass'     => 'your_password', // 数据库密码'dbname'   => 'your_database'  // 数据库名称
];// EMQX MQTT 服务器配置
$server   = 'your_emqx_server.com'; // MQTT 服务器地址
$port     = 1883;                   // MQTT 服务端口
$clientId = 'php_mqtt_subscriber_' . uniqid(); // 客户端唯一标识
$username = 'your_mqtt_username';   // MQTT 认证用户名
$password = 'your_mqtt_password';   // MQTT 认证密码
$topic    = 'UADX001/123456/Pub/';  // 订阅的主题
$qos      = 1;                      // 服务质量等级 (0-最多一次, 1-至少一次, 2-恰好一次)/*** 获取数据库连接(单例模式)* * @param array $config 数据库配置数组* @return mysqli MySQLi 连接对象* @throws Exception 数据库连接失败时抛出异常*/
function getDbConnection($config) {static $conn = null;if ($conn === null) {$conn = new mysqli($config['host'], $config['user'], $config['pass'], $config['dbname']);if ($conn->connect_error) {throw new Exception("数据库连接失败: " . $conn->connect_error);}// 设置字符集$conn->set_charset("utf8mb4");}return $conn;
}/*** 安全存储消息到数据库* * @param string $topic 消息主题* @param string $message 消息内容* @param array $dbConfig 数据库配置*/
function saveMessage($topic, $message, $dbConfig) {try {$conn = getDbConnection($dbConfig);// 使用预处理语句防止 SQL 注入攻击$stmt = $conn->prepare("INSERT INTO cmf_portal_device_log (topic, message, created_at) VALUES (?, ?, NOW())");if ($stmt === false) {throw new Exception("预处理语句创建失败: " . $conn->error);}$stmt->bind_param("ss", $topic, $message);if ($stmt->execute()) {echo "[" . date('Y-m-d H:i:s') . "] 消息已保存,记录ID: " . $conn->insert_id . "\n";} else {echo "[" . date('Y-m-d H:i:s') . "] 消息保存失败: " . $stmt->error . "\n";}$stmt->close();} catch (Exception $e) {echo "[" . date('Y-m-d H:i:s') . "] 保存消息时出错: " . $e->getMessage() . "\n";}
}/*** 消息接收回调函数* * @param string $topic 接收消息的主题* @param string $message 消息内容*/
$messageHandler = function ($topic, $message) use ($dbConfig) {echo "[" . date('Y-m-d H:i:s') . "] 收到新消息\n";echo "    主题: {$topic}\n";echo "    内容: {$message}\n";echo "    " . str_repeat("-", 50) . "\n";// 存储消息到数据库saveMessage($topic, $message, $dbConfig);
};/*** 创建 MQTT 连接配置*/
$connectionSettings = (new ConnectionSettings)->setUsername($username)->setPassword($password)->setKeepAliveInterval(60)     // 保活间隔(秒)->setLastWillTopic('php/subscriber/status')->setLastWillMessage('offline')->setRetainLastWill(true);/*** 主程序执行流程*/
try {// 初始化 MQTT 客户端$client = new MqttClient($server, $port, $clientId);echo "[" . date('Y-m-d H:i:s') . "] 正在连接到 EMQX 服务器...\n";// 建立 MQTT 连接$client->connect($connectionSettings, true);echo "[" . date('Y-m-d H:i:s') . "] ✓ 成功连接到 EMQX 服务器\n";// 订阅主题并设置消息处理回调$client->subscribe($topic, $messageHandler, $qos);echo "[" . date('Y-m-d H:i:s') . "] ✓ 已订阅主题: {$topic} (QOS: {$qos})\n";echo "[" . date('Y-m-d H:i:s') . "] 🚀 服务已启动,等待接收消息...\n";echo "    按 Ctrl + C 可退出程序\n";echo str_repeat("=", 60) . "\n";// 进入消息监听循环$client->loop(true);} catch (Exception $e) {echo "[" . date('Y-m-d H:i:s') . "] ✗ 发生错误: {$e->getMessage()}\n";// 确保连接被正确关闭if (isset($client)) {$client->disconnect();}exit(1); // 非正常退出
}// 正常退出时的清理工作
if (isset($client)) {$client->disconnect();echo "[" . date('Y-m-d H:i:s') . "] 连接已关闭,服务停止\n";
}
?>
服务部署与运行

1. 环境要求

  • PHP 7.4 或更高版本

  • MySQL 5.7+ 数据库

  • php-mqtt/client 扩展包

  • EMQX 或其他 MQTT 消息服务器

2. 安装依赖

bash

composer require php-mqtt/client

3. 创建数据库表

sql

CREATE TABLE cmf_portal_device_log (id BIGINT AUTO_INCREMENT PRIMARY KEY,topic VARCHAR(255) NOT NULL,message TEXT NOT NULL,created_at DATETIME NOT NULL,INDEX idx_topic (topic),INDEX idx_created_at (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

4. 运行服务

bash

php mqtt_subscriber.php
功能特性
  • 可靠连接:支持自动重连和连接状态监控

  • 数据安全:使用预处理语句防止 SQL 注入

  • 资源优化:数据库连接复用,避免频繁创建连接

  • 完整日志:详细的运行状态和错误日志输出

  • 优雅退出:支持信号中断和安全资源释放

应用场景
  • IoT 设备数据采集与存储

  • 实时消息处理流水线

  • 设备状态监控系统

  • 分布式系统间数据同步

这个解决方案提供了一个稳定可靠的 MQTT 消息处理基础框架,可以根据具体业务需求进行扩展和定制

http://www.dtcms.com/a/483189.html

相关文章:

  • 设计网站网站名称冒用公司名做网站
  • Android进阶之路 - 全方位监听视图加载完成
  • PPT-EA:PPT自动生成器
  • 企业微信-智能表格-视图类型
  • 产品网站建设PHP MYSQL网站开发全程实
  • 性能优化-Vue3 + Vite:图片上传/优化到 OSS 并统一使用vite 的 .env 全局配置,js 和 css 共用变量
  • 网站建设项目背景杭州一起做网站
  • 做最好的色书网站半透明wordpress主题源码
  • 17.UE-游戏逆向-查找Aactor(游戏中物品的名字和坐标)
  • 扫黄打非网站建设wordpress上传视频插件
  • 手机网站底部广告代码wordpress版本下载
  • 量子计算与深度学习:新时代材料模拟实战
  • SMTPman,smtp服务器高效邮件发送核心指南
  • UE C++ TMap容器的 创建和遍历
  • 域名与网站名称的关系企业发展历程网站
  • MyBatis-Flex 来了
  • 带权并查集
  • 建设网站多少钱 郑州浏览器什么网站都能打开的
  • 安卓13_ROM修改定制化-----常用几种去除系统签名类验证的操作步骤解析
  • 安卓导出谷歌包
  • 上海百度网络推广极限优化wordpress
  • 南京市建设监理协会网站dedecms 网站根目录
  • 创建Mybatis框架
  • 从化网站建设方案百度网站好评
  • 电商网站前台模块自己做的网站加载不出验证码
  • 数据结构 03 栈和队列
  • 微商城网站建设哪家好wordpress国内优化
  • 热释电传感器(PIR Sensor)技术深度解析:从物理原理到工程实践
  • 做餐厅网站的需求分析创造网站
  • docker项目打包演示项目(数字排序服务)