Laravel 12 基于 EMQX 实现 MQTT 消息发送与接收
Laravel 12 基于 EMQX 实现 MQTT 消息发送与接收
要在 Laravel 12 中实现基于 EMQX 的 MQTT 消息发送与接收,你可以按照以下步骤操作:
1. 安装必要的依赖包
首先安装 MQTT 客户端库:
composer require php-mqtt/client
2. 配置 EMQX 连接
在 .env
文件中添加 EMQX 连接配置:
MQTT_HOST=your_emqx_server_address
MQTT_PORT=1883
MQTT_CLIENT_ID=laravel_mqtt_client
MQTT_USERNAME=your_username
MQTT_PASSWORD=your_password
MQTT_KEEPALIVE=60
MQTT_CLEAN_SESSION=true
3. 创建 MQTT 服务类
创建 app/Services/MqttService.php
:
<?phpnamespace App\Services;use PhpMqtt\Client\MqttClient;
use PhpMqtt\Client\ConnectionSettings;class MqttService
{protected $client;public function __construct(){$host = env('MQTT_HOST');$port = env('MQTT_PORT');$clientId = env('MQTT_CLIENT_ID');$this->client = new MqttClient($host, $port, $clientId);}public function connect(){$connectionSettings = (new ConnectionSettings)->setUsername(env('MQTT_USERNAME'))->setPassword(env('MQTT_PASSWORD'))->setKeepAliveInterval(env('MQTT_KEEPALIVE'))->setCleanSession(env('MQTT_CLEAN_SESSION'));$this->client->connect($connectionSettings, true);return $this;}public function publish($topic, $message){$this->client->publish($topic, $message);return $this;}public function subscribe($topic, callable $callback){$this->client->subscribe($topic, $callback);return $this;}public function disconnect(){$this->client->disconnect();}public function loop(){$this->client->loop(true);}
}
4. 创建 MQTT 命令
创建订阅消息的命令:
php artisan make:command SubscribeMqtt
编辑 app/Console/Commands/SubscribeMqtt.php
:
<?phpnamespace App\Console\Commands;use Illuminate\Console\Command;
use App\Services\MqttService;class SubscribeMqtt extends Command
{protected $signature = 'mqtt:subscribe {topic}';protected $description = 'Subscribe to MQTT topic';public function handle(){$topic = $this->argument('topic');$mqtt = new MqttService();$mqtt->connect();$mqtt->subscribe($topic, function ($topic, $message) {$this->info("Received message on topic [{$topic}]: {$message}");// 在这里处理接收到的消息});$mqtt->loop();}
}
5. 使用 MQTT 服务
发送消息
在控制器或其他地方发送消息:
use App\Services\MqttService;// 发送消息
$mqtt = new MqttService();
$mqtt->connect()->publish('test/topic', 'Hello from Laravel')->disconnect();
接收消息
启动订阅命令:
php artisan mqtt:subscribe test/topic
6. 高级用法
使用队列处理接收的消息
修改订阅命令的回调:
$mqtt->subscribe($topic, function ($topic, $message) {\App\Jobs\ProcessMqttMessage::dispatch($topic, $message);
});
然后创建对应的队列任务:
php artisan make:job ProcessMqttMessage
使用事件系统
创建 MQTT 消息事件:
php artisan make:event MqttMessageReceived
然后在订阅回调中触发事件:
$mqtt->subscribe($topic, function ($topic, $message) {event(new \App\Events\MqttMessageReceived($topic, $message));
});
7. 安全性考虑
- 使用 TLS/SSL 加密连接(EMQX 默认支持)
- 在 EMQX 中设置 ACL 规则限制客户端权限
- 使用认证插件(如 JWT、MySQL、Redis 认证)
8. 部署建议
- 使用 Supervisor 管理订阅进程
- 考虑使用 EMQX 集群提高可用性
- 监控 EMQX 和 Laravel 应用的资源使用情况