php 对接mqtt 完整版本,订阅消息,发送消息
首先打开链接如何在 PHP 项目中使用 MQTT
根据文章让所用依赖安装一下: composer require php-mqtt/client
安装之后弄一个部署
之后在工具里边可以相应链接上
接下来是代码:
/***
* 订阅消息
* @return void
* @throws \PhpMqtt\Client\Exceptions\ConfigurationInvalidException
* @throws \PhpMqtt\Client\Exceptions\ConnectingToBrokerFailedException
* @throws \PhpMqtt\Client\Exceptions\DataTransferException
* @throws \PhpMqtt\Client\Exceptions\InvalidMessageException
* @throws \PhpMqtt\Client\Exceptions\MqttClientException
* @throws \PhpMqtt\Client\Exceptions\ProtocolNotSupportedException
* @throws \PhpMqtt\Client\Exceptions\ProtocolViolationException
* @throws \PhpMqtt\Client\Exceptions\RepositoryException
*/
public function dingyue()
{
$server = '*********.aliyun.emqxcloud.cn';//连接地址
$port = 1883;
$clientId = rand(5, 15);
$username = '用户名';
$password = '密码';
$clean_session = false;
$mqtt_version = MqttClient::MQTT_3_1_1;
$connectionSettings = (new ConnectionSettings)
->setUsername($username)
->setPassword($password)
->setKeepAliveInterval(60)
->setLastWillTopic('emqx/test/last-will')
->setLastWillMessage('client disconnect')
->setLastWillQualityOfService(1);
$mqtt = new MqttClient($server, $port, $clientId, $mqtt_version);
$mqtt->connect($connectionSettings, $clean_session);
//订阅消息
// 下边是轮询订阅,订阅多个设备
// 查数据库中设备列表
$list = DeviceBall::where(['topic'=>['<>','']])->field('topic')->select();
// 获取topic
$list = array_column($list, 'topic');
//去重
$list = array_unique($list);
// dump($list);exit;
foreach ($list as $i) {
$mqtt->subscribe('CIOT/2000/DEVICE/' . $i, function ($topic,
$message) use ($mqtt, $i) {
printf("Received message on topic [%s]: %s\n", $topic, $message);
$message = json_decode($message, true);
if (isset($message['Body']['Info']['Msg']['Pass'])) {
if ($message['Body']['Info']['Msg']['Pass'] == 1) {
// 关闭订单
$ball_id = DeviceBall::where(['topic' => $i])->find();
$order_no = BallOrder::where(['ball_id' => $ball_id['id']])->order('id DESC')->where('status', 1)->value('order_no');
$this->confirmorder($order_no);
//语音指令,提前关闭,电源关闭
$payload = array(
'Header' => [
"Code" => '1004',
"Time" => time() . '000',
"Sign" => "lvzaina"
],
'Body' => [
"MsgId" => time(),
'Cmd' => "Voice",
'Index' => 41
]
);
$mqtt->publish(
// topic
'CIOT/2000/HOST/' . $i,
// payload
json_encode($payload),
// qos
0,
// retain
true
);
//可以关闭锁球器
//发送关闭指令
$payload = array(
'Header' => [
"Code" => '1004',
"Time" => time() . '000',
"Sign" => "lvzaina"
],
'Body' => [
"Mode" => "0",
"MsgId" => time(),
'Cmd' => "Order",
'ID' => 1,
'Delay' => 0,
'Type' => 0,
"DCL" => 0,
]
);
$mqtt->publish(
// topic
'CIOT/2000/HOST/' . $i,
// payload
json_encode($payload),
// qos
0,
// retain
true
);
} else {
//异常,播报语音,不关闭锁球器,关锁失败请检车
$payload = array(
'Header' => [
"Code" => '1004',
"Time" => time() . '000',
"Sign" => "lvzaina"
],
'Body' => [
"MsgId" => time(),
'Cmd' => "Voice",
'Index' => 45
]
);
$mqtt->publish(
// topic
'CIOT/2000/HOST/' . $i,
// payload
json_encode($payload),
// qos
0,
// retain
true
);
}
}
}, 0);
}
$mqtt->loop(true);
}
发送指令
static function publish()
{
$server = '********ted.aliyun.emqxcloud.cn';
$port = 1883;
$clientId = rand(5, 15);
$username = '用户名';
$password = '密码';
$clean_session = false;
$mqtt_version = MqttClient::MQTT_3_1_1;
$connectionSettings = (new ConnectionSettings)
->setUsername($username)
->setPassword($password)
->setKeepAliveInterval(60)
->setLastWillTopic('emqx/test/last-will')
->setLastWillMessage('client disconnect')
->setLastWillQualityOfService(1);
$mqtt = new MqttClient($server, $port, $clientId, $mqtt_version);
$mqtt->connect($connectionSettings, $clean_session);
$payload = array(
'Header' => [
"Code" => '1004',
"Time" => time() . '000',
"Sign" => "lvzaina"
],
'Body' => [
"Mode" => "0",
"MsgId" => time(),
'Cmd' => "Check",
]
);
$mqtt->publish(
// topic
'CIOT/2000/HOST/RE2BBEEC78',//推送的地址
// payload
json_encode($payload),
// qos
0,
// retain
true
);
}
上边是代码,下边事最重要的
1首先如果在本地测试的话,新建一个.bat文件,放在根目录
.bat文件内容,然后双击文件执行,这里执行的是上边订阅消息的方法,执行之后,可以在小黑窗里边看到打印的信息
chcp 65001
@echo off
:loop
E:\phpstudy_pro\Extensions\php\php7.4.3nts\php.exe public/index.php api/Billiards/dingyue
ping -n 3 127.1 >nul
goto loop
pause
2,如果是在linux中执行
新建一个.sh文件
.sh文件内容
#!/bin/bash
while true
do
# 这里是要执行的代码
/www/server/php/72/bin/php public/index.php api/Billiards/dingyue
# 这里是要执行的代码
endDate=`date +"%Y-%m-%d %H:%M:%S"`
echo "At [$endDate] Successful"
echo "----------------------------------------------------------------------------"
sleep 3
done
开启列队:运行队列的代码
nohup bash run-api.sh >> run-api.out &
然后敲回车敲两下
查看执行文件返回内容
tail -f run-api.out
执行前,要看下是否已经在运行:
ps aux | grep bash
如果已经在执行
结束命令:
kill -9 进程号(256300)
然后再执行ps aux | grep bash 看是否已经停止,重新启动即可。
这样一个.sh文件就启动了