Thinkphp6实现websocket
项目需要对接一台自动售货机,售货机要求双方通过 WebSocket 通信,服务端监听 9997 端口。本文基于 ThinkPHP6 的自定义命令行指令实现了一个 PHP WebSocket 服务器,用于与自动售货机通信,并支持 start/stop/restart 三种操作。
1.新建文件
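新建文件 app/command/TestSocket.php(原文写作 /command/socket.php;由于类名为 TestSocket、命名空间为 app\command,文件名需与类名一致才能被 PSR-4 自动加载),并在 config/console.php 的 commands 数组中注册该指令:'test:socket' => 'app\command\TestSocket'。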
<?php

declare(strict_types=1);

namespace app\command;

use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\Output;

/**
 * Console command `test:socket`: a single-process WebSocket echo server on TCP port 9997.
 *
 * Usage: `php think test:socket start|stop|restart`.
 * `start` runs the server loop in the calling process and records its PID in the PID file;
 * `stop` signals that PID; `restart` does both in sequence.
 *
 * Known limitations: fragmented messages (continuation frames) are not reassembled, each
 * frame is handled on its own; socket writes are blocking.
 */
class TestSocket extends Command
{
    /** WebSocket opcodes handled explicitly by the server (RFC 6455, section 5.2). */
    private const OPCODE_TEXT = 0x1;
    private const OPCODE_CLOSE = 0x8;
    private const OPCODE_PING = 0x9;
    private const OPCODE_PONG = 0xA;

    /** Maximum number of bytes requested from a client per fread() call. */
    private const READ_CHUNK_BYTES = 8192;

    /** Upper bound for bytes buffered per client; a client exceeding it is disconnected. */
    private const MAX_BUFFER_BYTES = 8388608;

    /** @var resource|false|null Listening server socket. */
    public $server;

    /** @var string File holding the PID of the running server process. */
    protected static $pidFile = '/tmp/test_socket.pid';

    /** @var bool Cleared by the SIGTERM/SIGINT handlers to end the server loop. */
    protected static $running = true;

    /** @var array<int, resource> Connected client streams, keyed by resource id. */
    private $clients = [];

    /** @var array<int, string> Bytes received but not yet consumed, keyed by resource id. */
    private $buffers = [];

    /** @var array<int, bool> Whether the opening handshake has completed, keyed by resource id. */
    private $upgraded = [];

    /**
     * Registers the command name, description and the optional `action` argument.
     */
    protected function configure()
    {
        $this->setName('test:socket')
            ->setDescription('WebSocket server for JieLe')
            ->addArgument('action', Argument::OPTIONAL, 'start|stop|restart', 'start');
    }

    /**
     * Dispatches the requested action (start, stop or restart).
     *
     * @param Input  $input  Console input carrying the `action` argument.
     * @param Output $output Console output used for status messages.
     */
    protected function execute(Input $input, Output $output)
    {
        $action = $input->getArgument('action');

        switch ($action) {
            case 'start':
                $this->startServer($output);
                break;
            case 'stop':
                $this->stopServer($output);
                break;
            case 'restart':
                $this->stopServer($output);
                sleep(1); // wait one second so the old server has fully stopped
                $this->startServer($output);
                break;
            default:
                $output->writeln("Invalid action. Use start|stop|restart");
                break;
        }
    }

    /**
     * Starts the server and blocks in the event loop until SIGTERM or SIGINT is received.
     *
     * Refuses to start when the PID file points at a live process. On exit (including an
     * exception escaping the loop) all sockets are closed and the PID file is removed.
     *
     * @param Output $output Console output used for status messages.
     */
    protected function startServer(Output $output)
    {
        // Refuse to start twice.
        $existingPid = $this->readPid();
        if ($existingPid !== null && posix_getpgid($existingPid) !== false) {
            $output->writeln("Server is already running (PID: {$existingPid})");
            return;
        }

        // Create the listening TCP socket.
        $this->server = stream_socket_server("tcp://0.0.0.0:9997", $errno, $errstr);
        if (!$this->server) {
            $output->error("Failed to start server: $errstr ($errno)");
            return;
        }

        // Record our PID so that `stop` can find this process.
        file_put_contents(self::$pidFile, (string) getmypid());

        // Both signals only flip the flag; the loop below notices it and shuts down cleanly.
        pcntl_signal(SIGTERM, static function () {
            self::$running = false;
        });
        pcntl_signal(SIGINT, static function () {
            self::$running = false;
        });

        $output->info("WebSocket server running on ws://0.0.0.0:9997");

        try {
            while (self::$running) {
                pcntl_signal_dispatch();

                $read = array_values($this->clients);
                $read[] = $this->server;
                $write = $except = null;

                // A signal arriving while blocked here makes stream_select() return false and
                // raise an "Interrupted system call" warning. That is the normal shutdown path,
                // so the warning is suppressed and the result is checked explicitly instead.
                $ready = @stream_select($read, $write, $except, 5);

                // Dispatch again so a signal that interrupted the select takes effect now
                // rather than after one more 5-second wait.
                pcntl_signal_dispatch();

                if ($ready === false || $ready === 0) {
                    continue;
                }

                foreach ($read as $socket) {
                    if ($socket === $this->server) {
                        $this->acceptClient();
                    } else {
                        $this->handleClientData($socket);
                    }
                }
            }
        } finally {
            // Cleanup runs even if an exception escapes the loop.
            foreach ($this->clients as $client) {
                if (is_resource($client)) {
                    fclose($client);
                }
            }
            $this->clients = [];
            $this->buffers = [];
            $this->upgraded = [];

            fclose($this->server);

            if (file_exists(self::$pidFile)) {
                unlink(self::$pidFile);
            }
        }

        $output->writeln("Server stopped");
    }

    /**
     * Stops the server recorded in the PID file.
     *
     * Sends SIGTERM, waits up to 10 seconds for the process to exit, then falls back to
     * SIGKILL. A stale or unreadable PID file is simply removed.
     *
     * @param Output $output Console output used for status messages.
     */
    protected function stopServer(Output $output)
    {
        if (!file_exists(self::$pidFile)) {
            $output->writeln("Server is not running");
            return;
        }

        $pid = $this->readPid();

        if ($pid !== null && posix_getpgid($pid) !== false) {
            posix_kill($pid, SIGTERM);
            $output->writeln("Stopping server (PID: {$pid})...");

            // Wait for the process to exit, at most 10 seconds.
            $timeout = 10;
            while ($timeout-- > 0 && posix_getpgid($pid) !== false) {
                sleep(1);
            }

            if (posix_getpgid($pid) !== false) {
                posix_kill($pid, SIGKILL); // still alive: force kill
            }
        }

        if (file_exists(self::$pidFile)) {
            unlink(self::$pidFile);
        }

        $output->writeln("Server stopped");
    }

    /************************************************** WebSocket framing helpers *******************************************************/

    /**
     * Decodes the first WebSocket frame contained in $data and returns its payload.
     *
     * Honours the mask bit and the declared payload length, so bytes belonging to a
     * following frame are not mixed into the result. If fewer payload bytes are present
     * than declared, the available part is decoded.
     *
     * @param string $data Raw bytes starting at a frame boundary.
     *
     * @return string Unmasked payload, or '' when $data does not contain a complete frame header.
     */
    public function decodeFrame($data)
    {
        $frame = $this->parseFrame((string) $data);

        return $frame === null ? '' : $frame['payload'];
    }

    /**
     * Encodes a payload as a single unmasked, unfragmented (FIN set) WebSocket frame.
     *
     * @param string $data   Payload bytes.
     * @param int    $opcode Frame opcode; defaults to 0x1 (text frame).
     *
     * @return string The encoded frame.
     */
    public function encodeFrame($data, $opcode = 0x1)
    {
        $data = (string) $data;
        $length = strlen($data);

        $header = chr(0x80 | ((int) $opcode & 0x0F)); // FIN bit + opcode

        if ($length <= 125) {
            $header .= chr($length);
        } elseif ($length <= 65535) {
            $header .= chr(126) . chr(($length >> 8) & 255) . chr($length & 255);
        } else {
            // 64-bit big-endian length.
            $header .= chr(127);
            for ($shift = 56; $shift >= 0; $shift -= 8) {
                $header .= chr(($length >> $shift) & 255);
            }
        }

        return $header . $data;
    }

    /**
     * Answers a WebSocket opening handshake.
     *
     * Looks up the Sec-WebSocket-Key request header (case-insensitively) and, when present,
     * writes the "101 Switching Protocols" response with the matching Sec-WebSocket-Accept.
     *
     * @param resource $socket  Client stream to answer on.
     * @param string   $headers Raw HTTP request head.
     *
     * @return bool True when the response was written completely; false when the key header
     *              is missing or the write failed.
     */
    public function handshake($socket, $headers)
    {
        if (!preg_match('/^Sec-WebSocket-Key:[ \t]*(\S+)[ \t]*\r?$/mi', (string) $headers, $match)) {
            return false;
        }

        $key = base64_encode(sha1($match[1] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', true));

        $response = "HTTP/1.1 101 Switching Protocols\r\n";
        $response .= "Upgrade: websocket\r\n";
        $response .= "Connection: Upgrade\r\n";
        $response .= "Sec-WebSocket-Accept: $key\r\n\r\n";

        // The peer may already be gone; treat a failed or short write as a failed handshake
        // instead of letting the write notice escape.
        return @fwrite($socket, $response) === strlen($response);
    }

    /**
     * Reads the PID stored in the PID file.
     *
     * @return int|null A positive PID, or null when the file is missing, unreadable or
     *                  does not contain a positive integer.
     */
    private function readPid()
    {
        if (!is_file(self::$pidFile)) {
            return null;
        }

        $pid = (int) trim((string) file_get_contents(self::$pidFile));

        // PID 0 (or a negative value) would address a whole process group in posix_kill().
        return $pid > 0 ? $pid : null;
    }

    /**
     * Accepts one pending connection on the listening socket and registers its state.
     */
    private function acceptClient()
    {
        // The pending connection can vanish between select() and accept(); a failed accept
        // must not put `false` into the client list.
        $client = @stream_socket_accept($this->server, 0.0);
        if ($client === false) {
            return;
        }

        $id = (int) $client;
        $this->clients[$id] = $client;
        $this->buffers[$id] = '';
        $this->upgraded[$id] = false;
    }

    /**
     * Reads available bytes from a client and processes the handshake and every complete
     * frame that is now buffered.
     *
     * @param resource $socket Client stream reported readable by stream_select().
     */
    private function handleClientData($socket)
    {
        $id = (int) $socket;

        $chunk = fread($socket, self::READ_CHUNK_BYTES);
        if ($chunk === false || $chunk === '') {
            // Client disconnected.
            $this->dropClient($socket);
            return;
        }

        $this->buffers[$id] = ($this->buffers[$id] ?? '') . $chunk;
        if (strlen($this->buffers[$id]) > self::MAX_BUFFER_BYTES) {
            $this->dropClient($socket);
            return;
        }

        if (empty($this->upgraded[$id])) {
            // Wait until the whole HTTP request head has arrived.
            $headEnd = strpos($this->buffers[$id], "\r\n\r\n");
            if ($headEnd === false) {
                return;
            }

            $request = substr($this->buffers[$id], 0, $headEnd + 4);
            $this->buffers[$id] = (string) substr($this->buffers[$id], $headEnd + 4);

            if (!$this->handshake($socket, $request)) {
                $this->dropClient($socket);
                return;
            }
            $this->upgraded[$id] = true;
        }

        // One read may contain several frames, or only part of one.
        while (isset($this->clients[$id])) {
            $frame = $this->parseFrame($this->buffers[$id]);
            if ($frame === null || !$frame['complete']) {
                return; // need more bytes
            }

            $this->buffers[$id] = (string) substr($this->buffers[$id], $frame['length']);
            $this->handleFrame($socket, $frame['opcode'], $frame['payload']);
        }
    }

    /**
     * Reacts to one decoded frame: answers control frames, echoes everything else.
     *
     * @param resource $socket  Client stream the frame came from.
     * @param int      $opcode  Frame opcode.
     * @param string   $payload Unmasked payload.
     */
    private function handleFrame($socket, $opcode, $payload)
    {
        switch ($opcode) {
            case self::OPCODE_CLOSE:
                // Echo the status code (first two payload bytes) and close the connection.
                $this->sendBytes($socket, $this->encodeFrame((string) substr($payload, 0, 2), self::OPCODE_CLOSE));
                $this->dropClient($socket);
                break;
            case self::OPCODE_PING:
                $this->sendBytes($socket, $this->encodeFrame($payload, self::OPCODE_PONG));
                break;
            case self::OPCODE_PONG:
                // Unsolicited pong: nothing to do.
                break;
            default:
                // Business logic hook: transform $payload here (e.g. via a main() handler).
                // For now the message is echoed back unchanged as a text frame.
                $response = $payload;
                $this->sendBytes($socket, $this->encodeFrame($response, self::OPCODE_TEXT));
                break;
        }
    }

    /**
     * Writes raw bytes to a client and disconnects it when the write fails.
     *
     * @param resource $socket Client stream.
     * @param string   $bytes  Bytes to send.
     *
     * @return bool True when all bytes were written.
     */
    private function sendBytes($socket, $bytes)
    {
        // A vanished peer makes fwrite() raise a notice; one broken client must not take
        // the whole server down, so the result is checked instead.
        if (@fwrite($socket, $bytes) !== strlen($bytes)) {
            $this->dropClient($socket);
            return false;
        }

        return true;
    }

    /**
     * Forgets all state of a client and closes its stream.
     *
     * @param resource $socket Client stream.
     */
    private function dropClient($socket)
    {
        $id = (int) $socket;
        unset($this->clients[$id], $this->buffers[$id], $this->upgraded[$id]);

        if (is_resource($socket)) {
            fclose($socket);
        }
    }

    /**
     * Parses the frame at the start of $data.
     *
     * @param string $data Raw bytes starting at a frame boundary.
     *
     * @return array{opcode:int,payload:string,length:int,complete:bool}|null
     *         `length` is the total frame size in bytes (header + mask + payload) and
     *         `complete` tells whether $data holds all of them; `payload` is the unmasked
     *         part that is available. Null when the header itself is incomplete or invalid.
     */
    private function parseFrame($data)
    {
        $available = strlen($data);
        if ($available < 2) {
            return null;
        }

        $opcode = ord($data[0]) & 0x0F;
        $second = ord($data[1]);
        $masked = ($second & 0x80) !== 0;
        $length = $second & 0x7F;
        $offset = 2;

        if ($length === 126) {
            // 16-bit big-endian extended length.
            if ($available < 4) {
                return null;
            }
            $length = (ord($data[2]) << 8) | ord($data[3]);
            $offset = 4;
        } elseif ($length === 127) {
            // 64-bit big-endian extended length.
            if ($available < 10) {
                return null;
            }
            $length = 0;
            for ($i = 2; $i < 10; $i++) {
                $length = ($length << 8) | ord($data[$i]);
            }
            if ($length < 0) {
                return null; // most significant bit set: invalid length
            }
            $offset = 10;
        }

        $mask = '';
        if ($masked) {
            if ($available < $offset + 4) {
                return null;
            }
            $mask = substr($data, $offset, 4);
            $offset += 4;
        }

        $payload = (string) substr($data, $offset, $length);
        if ($masked && $payload !== '') {
            // String XOR works byte-wise and stops at the shorter operand, so repeating the
            // 4-byte mask past the payload length unmasks the payload in one operation.
            $payload = $payload ^ str_repeat($mask, intdiv(strlen($payload), 4) + 1);
        }

        return [
            'opcode' => $opcode,
            'payload' => $payload,
            'length' => $offset + $length,
            'complete' => $available >= $offset + $length,
        ];
    }
}
2.开启服务
# Start the server inside the php7.3 container; the command stays in the foreground while the server loop runs.
docker exec php7.3 php /lnmp/nginx/data/thinkphp6/think test:socket start
# Send SIGTERM to the PID recorded in /tmp/test_socket.pid and wait up to 10 seconds for it to exit (then SIGKILL).
docker exec php7.3 php /lnmp/nginx/data/thinkphp6/think test:socket stop
# Stop the running server, wait one second, then start it again.
docker exec php7.3 php /lnmp/nginx/data/thinkphp6/think test:socket restart
3.在nginx配置目录中添加如下反向代理配置,即可通过浏览器访问socket业务
server {
    listen 80;
    root 省略;

    # 172.18.0.3 is the IP address that provides the PHP service (the WebSocket server on port 9997).
    location /rtsp {
        proxy_pass http://172.18.0.3:9997;
        # HTTP/1.1 plus the Upgrade/Connection headers are required to proxy the WebSocket handshake.
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        # Keep idle WebSocket connections open for up to 10 minutes.
        proxy_read_timeout 600s;
    }

    location ~ \.php$ {
        # omitted
    }
}
4.测试