当前位置: 首页 > news >正文

thinkPHP6.1使用PhpMqtt进行MQTT消息订阅和发布,并将订阅的消息入库保存,可控制超时退出订阅

#安装PhpMqtt

composer require php-mqtt/client

#上代码

<?php
namespace app\command;use app\common\model\MqttConfig as MqttConfigModel;
use app\common\model\MqttMessage as MqttMessageModel;
use PhpMqtt\Client\ConnectionSettings;
use PhpMqtt\Client\MqttClient;
use think\Exception;
use think\facade\Log;class Mqtt
{protected ConnectionSettings $connectionSettings;protected array $mqttConfig = [];/*** @throws Exception*/public function __construct(){$configModel = new MqttConfigModel();if (!$configModel->getMqttConfig(['id' => 1])) {throw new Exception("MQTT配置错误,请联系管理员");}$mqttConfig = $configModel->data;$this->mqttConfig = $mqttConfig;$connectionSettings = (new ConnectionSettings)/*** 心跳间隔是客户端在向代理发送保活信号(ping)之前等待而不发送消息的秒数。* 该值不能小于1秒,也不能高于65535秒。一个合理的值是10秒(默认值)。*/->setKeepAliveInterval($mqttConfig['keep_alive'])/*** 连接超时是客户端尝试与代理建立套接字连接的最大秒数。* 该值不能小于1秒。*/->setConnectTimeout($mqttConfig['connect_timeout']);if (trim($mqttConfig['username']) != '' && trim($mqttConfig['password']) != '') {$connectionSettings = $connectionSettings/*** 连接到代理时用于身份验证的用户名。*/->setUsername($mqttConfig['username'])/*** 连接到代理时用于身份验证的密码。*/->setPassword($mqttConfig['password']);}if (empty($mqttConfig['clean_session'])) {$connectionSettings = $connectionSettings/*** 如果客户端在发送数据时发现断开连接,此标志决定客户端是否会尝试自动重新连接。* 该设置不能与干净会话标志一起使用。*/->setReconnectAutomatically(true)/*** 定义重新连接尝试之间的延迟(毫秒)。* 只有当setReconnectAutomatic()设置为true时,此设置才相关。*/->setDelayBetweenReconnectAttempts(1000)/*** 定义客户端放弃之前重新连接的最大尝试次数。* 只有当setReconnectAutomatic()设置为true时,此设置才相关。*/->setMaxReconnectAttempts(60);}$this->connectionSettings = $connectionSettings;}/*** 订阅消息* @return string*/public function subscribe(): string{try {if (empty($this->mqttConfig)) {throw new Exception("MQTT配置错误,请联系管理员");}$clientId = $this->mqttConfig['client_id'] . '-sub-'.rand(1000, 9999);$mqttService = new MqttClient($this->mqttConfig['server'], $this->mqttConfig['port'], $clientId, $this->mqttConfig['version']);/*** 超时限制:秒*/$maxWaitTime = $this->mqttConfig['max_wait_time'] ?? 10;/*** 超时状态*/$timeOutStatus = false;/*** 验证等待超时* 注册一个循环事件处理程序,每次循环迭代时都会调用该处理程序。例如,此事件处理程序可用于在某些条件下中断循环。* 循环事件处理程序作为第一个参数传递给MQTT客户端实例,并作为第二个参数传递循环已经运行的时间。经过的时间是一个包含秒的浮点数。*//*$mqttService->registerLoopEventHandler(function (MqttClient $client, float $elapsedTime) use ($maxWaitTime, &$timeOutStatus) {if ($elapsedTime >= $maxWaitTime) {$client->interrupt();$timeOutStatus = true;return;}});*/$mqttService->connect($this->connectionSettings, !empty($this->mqttConfig['clean_session']));$messageModel = new MqttMessageModel();$mqttService->subscribe($this->mqttConfig['sub_topic'], function ($topic, $message) use($messageModel) {$log = $topic . PHP_EOL . $message;Log::channel('mqtt')->write($log, 'subscribe');$data = json_decode($message, true);$messageModel->doBatchInsert($data);//$mqttService->interrupt();//printf("Received message on topic [%s]: %s\n", $topic, $message);}, $this->mqttConfig['qos']);$mqttService->loop(true);$mqttService->disconnect();}catch (\Exception $e) {trace($e->getMessage(), __METHOD__);}$message = implode(PHP_EOL, $msg);       return $message;}/*** 发布消息* @return string*/public function publish(): string{       try {if (empty($this->mqttConfig)) {throw new Exception("MQTT配置错误,请联系管理员");}$clientId = $this->mqttConfig['client_id'] . '-pub-'.rand(1000, 9999);$mqttService = new MqttClient($this->mqttConfig['server'], $this->mqttConfig['port'], $clientId, $this->mqttConfig['version']);$mqttService->connect($this->connectionSettings, !empty($this->mqttConfig['clean_session']));$mqttConfig = config('mqtt');$data = $mqttConfig['dev']['payload'];if ($mqttConfig['test_data_type'] == 'device') {$data = ['device_id' => $mqttConfig['test_device_id']];}/*** $qualityOfService* QoS 0:消息最多传送一次。如果当前客户端不可用,它将丢失这条消息。* QoS 1:消息至少传送一次。* QoS 2:消息只传送一次。*/$qualityOfService = 0;for ($i = 0; $i< 1000; $i++) {//$data['id'] = $i;//$data['ts'] = date('Y-m-d H:i:s');$mqttService->publish($this->mqttConfig['pub_topic'],json_encode($data, JSON_UNESCAPED_UNICODE),$this->mqttConfig['qos'],true);$log = $this->mqttConfig['pub_topic'] . PHP_EOL . json_encode($data, JSON_UNESCAPED_UNICODE);Log::channel('mqtt')->write($log, 'publish');sleep(1);}//$mqttService->loop(true);$mqttService->disconnect();           }catch (\Exception $e) {trace($e->getMessage(), __METHOD__);}$message = implode(PHP_EOL, $msg);return $message;}
}
http://www.dtcms.com/a/606183.html

相关文章:

  • 做网站作业什么主题商机网wordpress模板
  • Xcode的App Thinning Size Report分析包体积
  • 多机多卡训练指南
  • 深入浅出:进程和线程的区别与联系
  • 做一个静态网站需要多少钱关键词
  • 西安网站建设方案外包浏览器网页版打开网页
  • 【Redis】 SpringBoot集成Redis
  • 网易企业邮箱邮箱登录入口江西网站建设优化服务
  • 汕头吧 百度贴吧超级优化小说
  • 视觉学习篇——机器学习模型评价指标
  • Java Agent 和字节码注入技术原理和实现
  • Java后端常用技术选型 |(五)可视化工具篇
  • 【数据库】Apache IoTDB数据库在大数据场景下的时序数据模型与建模方案
  • 网站建设系统课程广东建设网 四川是什么网站
  • 不止于 API 调用:解锁 Java 工具类设计的三重境界 —— 可复用性、线程安全与性能优化
  • 数据结构与算法:树(Tree)精讲
  • AI入门系列之GraphRAG使用指南:从环境搭建到实战应用
  • 【SolidWorks】默认模板设置
  • 基于秩极小化的压缩感知图像重建的MATLAB实现
  • 无人机图传模块技术要点与难点
  • Spring Cloud Alibaba 2025.0.0 整合 ELK 实现日志
  • AI+虚拟仿真:开启无人机农林应用人才培养新路径
  • ELK 9.2.0 安装部署手册
  • 代码统计网站wordpress设置在新页面打开空白
  • 网站开发的流程 知乎设计培训网站建设
  • Qt 的字节序转换
  • QT Quick QML项目音乐播放器17----自定义Notification通知、请求错误提示、Loading加载中提示
  • 【Qt】AddressSanitizer 简介
  • Linux(麒麟)服务器离线安装单机Milvus向量库
  • Qt Widgets和Qt Qucik在开发工控触摸程序的选择