thinkPHP6.1使用PhpMqtt进行MQTT消息订阅和发布,并将订阅的消息入库保存,可控制超时退出订阅
#安装PhpMqtt
composer require php-mqtt/client
#上代码
<?php
namespace app\command;use app\common\model\MqttConfig as MqttConfigModel;
use app\common\model\MqttMessage as MqttMessageModel;
use PhpMqtt\Client\ConnectionSettings;
use PhpMqtt\Client\MqttClient;
use think\Exception;
use think\facade\Log;class Mqtt
{protected ConnectionSettings $connectionSettings;protected array $mqttConfig = [];/*** @throws Exception*/public function __construct(){$configModel = new MqttConfigModel();if (!$configModel->getMqttConfig(['id' => 1])) {throw new Exception("MQTT配置错误,请联系管理员");}$mqttConfig = $configModel->data;$this->mqttConfig = $mqttConfig;$connectionSettings = (new ConnectionSettings)/*** 心跳间隔是客户端在向代理发送保活信号(ping)之前等待而不发送消息的秒数。* 该值不能小于1秒,也不能高于65535秒。一个合理的值是10秒(默认值)。*/->setKeepAliveInterval($mqttConfig['keep_alive'])/*** 连接超时是客户端尝试与代理建立套接字连接的最大秒数。* 该值不能小于1秒。*/->setConnectTimeout($mqttConfig['connect_timeout']);if (trim($mqttConfig['username']) != '' && trim($mqttConfig['password']) != '') {$connectionSettings = $connectionSettings/*** 连接到代理时用于身份验证的用户名。*/->setUsername($mqttConfig['username'])/*** 连接到代理时用于身份验证的密码。*/->setPassword($mqttConfig['password']);}if (empty($mqttConfig['clean_session'])) {$connectionSettings = $connectionSettings/*** 如果客户端在发送数据时发现断开连接,此标志决定客户端是否会尝试自动重新连接。* 该设置不能与干净会话标志一起使用。*/->setReconnectAutomatically(true)/*** 定义重新连接尝试之间的延迟(毫秒)。* 只有当setReconnectAutomatic()设置为true时,此设置才相关。*/->setDelayBetweenReconnectAttempts(1000)/*** 定义客户端放弃之前重新连接的最大尝试次数。* 只有当setReconnectAutomatic()设置为true时,此设置才相关。*/->setMaxReconnectAttempts(60);}$this->connectionSettings = $connectionSettings;}/*** 订阅消息* @return string*/public function subscribe(): string{try {if (empty($this->mqttConfig)) {throw new Exception("MQTT配置错误,请联系管理员");}$clientId = $this->mqttConfig['client_id'] . '-sub-'.rand(1000, 9999);$mqttService = new MqttClient($this->mqttConfig['server'], $this->mqttConfig['port'], $clientId, $this->mqttConfig['version']);/*** 超时限制:秒*/$maxWaitTime = $this->mqttConfig['max_wait_time'] ?? 10;/*** 超时状态*/$timeOutStatus = false;/*** 验证等待超时* 注册一个循环事件处理程序,每次循环迭代时都会调用该处理程序。例如,此事件处理程序可用于在某些条件下中断循环。* 循环事件处理程序作为第一个参数传递给MQTT客户端实例,并作为第二个参数传递循环已经运行的时间。经过的时间是一个包含秒的浮点数。*//*$mqttService->registerLoopEventHandler(function (MqttClient $client, float $elapsedTime) use ($maxWaitTime, &$timeOutStatus) {if ($elapsedTime >= $maxWaitTime) {$client->interrupt();$timeOutStatus = true;return;}});*/$mqttService->connect($this->connectionSettings, !empty($this->mqttConfig['clean_session']));$messageModel = new MqttMessageModel();$mqttService->subscribe($this->mqttConfig['sub_topic'], function ($topic, $message) use($messageModel) {$log = $topic . PHP_EOL . $message;Log::channel('mqtt')->write($log, 'subscribe');$data = json_decode($message, true);$messageModel->doBatchInsert($data);//$mqttService->interrupt();//printf("Received message on topic [%s]: %s\n", $topic, $message);}, $this->mqttConfig['qos']);$mqttService->loop(true);$mqttService->disconnect();}catch (\Exception $e) {trace($e->getMessage(), __METHOD__);}$message = implode(PHP_EOL, $msg); return $message;}/*** 发布消息* @return string*/public function publish(): string{ try {if (empty($this->mqttConfig)) {throw new Exception("MQTT配置错误,请联系管理员");}$clientId = $this->mqttConfig['client_id'] . '-pub-'.rand(1000, 9999);$mqttService = new MqttClient($this->mqttConfig['server'], $this->mqttConfig['port'], $clientId, $this->mqttConfig['version']);$mqttService->connect($this->connectionSettings, !empty($this->mqttConfig['clean_session']));$mqttConfig = config('mqtt');$data = $mqttConfig['dev']['payload'];if ($mqttConfig['test_data_type'] == 'device') {$data = ['device_id' => $mqttConfig['test_device_id']];}/*** $qualityOfService* QoS 0:消息最多传送一次。如果当前客户端不可用,它将丢失这条消息。* QoS 1:消息至少传送一次。* QoS 2:消息只传送一次。*/$qualityOfService = 0;for ($i = 0; $i< 1000; $i++) {//$data['id'] = $i;//$data['ts'] = date('Y-m-d H:i:s');$mqttService->publish($this->mqttConfig['pub_topic'],json_encode($data, JSON_UNESCAPED_UNICODE),$this->mqttConfig['qos'],true);$log = $this->mqttConfig['pub_topic'] . PHP_EOL . json_encode($data, JSON_UNESCAPED_UNICODE);Log::channel('mqtt')->write($log, 'publish');sleep(1);}//$mqttService->loop(true);$mqttService->disconnect(); }catch (\Exception $e) {trace($e->getMessage(), __METHOD__);}$message = implode(PHP_EOL, $msg);return $message;}
}
