php实现火山引擎 【双向流式websocket-V3-支持复刻2.0/混音mix】开箱即用,可用于各种PHP框架。
最近需要对接火山引擎的一款产品:双向流式websocket-V3-支持复刻2.0/混音mix。
要求是 编程语言:PHP。
官方文档地址:
https://www.volcengine.com/docs/6561/1329505
一般大厂的产品都会自带SDK代码,对于开发者来说,直接照着SDK写即可。但是现在有很多产品都不再提供PHP的SDK了,所以要实现需求,就只能我们自己去研究、编写。
下面的PHP代码是参考官网其他语言的示例编写的。
我这里直接分享的是两个php文件,大家自行修改到自己使用的框架里。
目前只支持中文,英文请自行扩展。
依赖安装:
安装WebSocket依赖
composer require textalk/websocket
依赖开源地址:
https://github.com/Textalk/websocket-php
代码1: Tts 类
<?php
namespace app\controller;

use app\service\VolcengineProtocol;

/**
 * Text-to-speech demo controller for the Volcengine bidirectional streaming
 * WebSocket endpoint configured in $endpoint.
 *
 * Flow implemented by tts(): open the socket, StartConnection, then for every
 * sentence StartSession -> TaskRequest(s) -> FinishSession while collecting
 * the audio frames, then FinishConnection and write the audio to ./audio/.
 */
class Tts
{
    /** Maximum number of attempts receiveMessage() makes before giving up. */
    private const RECEIVE_MAX_RETRIES = 5;

    /** Pause between two receive attempts, in microseconds (500 ms). */
    private const RECEIVE_RETRY_DELAY_US = 500000;

    /** Maximum number of attempts made for a single sentence. */
    private const SENTENCE_MAX_RETRIES = 3;

    // Volcengine API parameters. They are empty here and must be filled in.

    /** Sent as the X-Api-App-Key header. */
    private string $appid = '';

    /** Sent as the X-Api-Access-Key header. */
    private string $access_token = '';

    /** Speaker id; also selects the resource id and prefixes the output file name. */
    private string $voice_type = '';

    /** Audio format requested from the server; also used as the file extension. */
    private string $encoding = '';

    private string $endpoint = 'wss://openspeech.bytedance.com/api/v3/tts/bidirection';

    /** When true, progress messages are written to the output; set to false to silence them. */
    private bool $debug = true;

    /**
     * Write one progress line to the output when debugging is enabled.
     */
    private function log(string $message): void
    {
        if ($this->debug) {
            echo $message, PHP_EOL;
        }
    }

    /**
     * Map a voice id to the value sent in the X-Api-Resource-Id header.
     *
     * Voice ids starting with "S_" use 'volc.megatts.default'
     * (presumably cloned voices -- confirm against the vendor docs);
     * everything else uses 'volc.service_type.10029'.
     */
    private function voiceToResourceId(string $voice): string
    {
        // strncmp() is used because str_starts_with() needs PHP 8.
        return strncmp($voice, 'S_', 2) === 0
            ? 'volc.megatts.default'
            : 'volc.service_type.10029';
    }

    /**
     * Generate a random version-4 UUID in the canonical 8-4-4-4-12 hex form.
     *
     * @throws \Exception when no source of randomness is available
     */
    private function generateUuid(): string
    {
        $bytes = random_bytes(16);
        $bytes[6] = chr((ord($bytes[6]) & 0x0f) | 0x40); // version 4
        $bytes[8] = chr((ord($bytes[8]) & 0x3f) | 0x80); // RFC 4122 variant

        return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex($bytes), 4));
    }

    /**
     * Build the WebSocket client with the authentication headers.
     */
    private function createWebSocketClient(): \WebSocket\Client
    {
        $headers = [
            'X-Api-App-Key' => $this->appid,
            'X-Api-Access-Key' => $this->access_token,
            'X-Api-Resource-Id' => $this->voiceToResourceId($this->voice_type),
            'X-Api-Connect-Id' => $this->generateUuid(),
        ];

        $options = [
            'headers' => $headers,
            'timeout' => 180,                // seconds
            'filter' => ['text', 'binary'],  // only deliver text and binary frames
            'fragment_size' => 4096,
            'persistent' => true,
        ];

        return new \WebSocket\Client($this->endpoint, $options);
    }

    /**
     * Encode a protocol message and send it as one binary frame.
     */
    private function sendMessage(\WebSocket\Client $client, VolcengineProtocol $message): void
    {
        $this->log('发送消息: ' . $message->__toString());
        $client->send($message->marshal(), 'binary');
    }

    /**
     * Receive and decode the next protocol message.
     *
     * Empty frames and exceptions are retried up to RECEIVE_MAX_RETRIES times
     * with a short pause in between. This method never returns null: every
     * failure ends in an exception, so callers can use the result directly.
     *
     * @throws \Exception when no message could be received within the retry budget
     */
    private function receiveMessage(\WebSocket\Client $client): VolcengineProtocol
    {
        $lastError = null;

        for ($attempt = 1; $attempt <= self::RECEIVE_MAX_RETRIES; $attempt++) {
            try {
                $data = $client->receive();
                if ($data !== null && $data !== '') {
                    $message = VolcengineProtocol::newMessageFromBytes($data);
                    $this->log('接收消息: ' . $message->__toString());

                    return $message;
                }
                $this->log("接收到空消息,重试第 {$attempt} 次");
            } catch (\Exception $e) {
                // Covers \WebSocket\ConnectionException as well as decode errors.
                $lastError = $e;
                $this->log("接收消息异常,重试第 {$attempt} 次: " . $e->getMessage());
            }

            if ($attempt < self::RECEIVE_MAX_RETRIES) {
                usleep(self::RECEIVE_RETRY_DELAY_US);
            }
        }

        if ($lastError !== null) {
            // Report the real cause and keep it as the previous exception.
            throw new \Exception('接收消息失败: ' . $lastError->getMessage(), 0, $lastError);
        }

        throw new \Exception('接收消息超时');
    }

    /**
     * Throw when the message is an error frame or carries a failure event.
     *
     * @throws \Exception for MSG_TYPE_ERROR, ConnectionFailed and SessionFailed
     */
    private function throwIfFailed(VolcengineProtocol $message): void
    {
        if ($message->msgType == VolcengineProtocol::MSG_TYPE_ERROR) {
            throw new \Exception('服务器返回错误: ' . $message->payload);
        }

        $failed = $message->eventType == VolcengineProtocol::EVENT_TYPE_CONNECTION_FAILED
            || $message->eventType == VolcengineProtocol::EVENT_TYPE_SESSION_FAILED;
        if ($failed) {
            throw new \Exception(
                '服务器返回失败事件 ' . $message->getEventTypeName() . ': ' . $message->payload
            );
        }
    }

    /**
     * Read messages until one with the given message type and event type arrives.
     *
     * Other messages are skipped, except error frames and failure events,
     * which abort the wait instead of blocking until the socket times out.
     *
     * @throws \Exception on a server error, a failure event or a receive failure
     */
    private function waitForEvent(\WebSocket\Client $client, int $msgType, int $eventType): VolcengineProtocol
    {
        while (true) {
            $message = $this->receiveMessage($client);

            // Match first, so a caller may still wait for a failure event explicitly.
            if ($message->msgType == $msgType && $message->eventType == $eventType) {
                return $message;
            }

            $this->throwIfFailed($message);
        }
    }

    /**
     * Close the socket without letting a close failure mask the real result.
     */
    private function closeQuietly(\WebSocket\Client $client): void
    {
        try {
            $client->close();
        } catch (\Exception $e) {
            $this->log('关闭WebSocket连接失败: ' . $e->getMessage());
        }
    }

    /**
     * Write the audio bytes to $filePath, creating the directory when needed.
     *
     * @throws \Exception when the directory cannot be created or the write fails
     */
    private function saveAudio(string $filePath, string $audioData): void
    {
        $audioDir = dirname($filePath);
        // The second is_dir() covers another process creating the directory concurrently.
        if (!is_dir($audioDir) && !mkdir($audioDir, 0755, true) && !is_dir($audioDir)) {
            throw new \Exception('无法创建音频目录: ' . $audioDir);
        }

        if (file_put_contents($filePath, $audioData) === false) {
            throw new \Exception('音频文件写入失败: ' . $filePath);
        }
    }

    /**
     * TTS test endpoint.
     *
     * Reads the "text" request parameter, synthesises it and stores the audio
     * under ./audio/.
     *
     * @return string JSON: code 0 with file_name/file_size/download_url on
     *                success, code 1 with an error message otherwise
     */
    public function tts()
    {
        $client = null;

        try {
            // NOTE(review): `Request` is not imported in this file, so it resolves to
            // app\controller\Request. Add the `use` line for your framework's request
            // facade (presumably think\facade\Request -- confirm).
            $text = Request::param('text', '你好,我是Json,你知道万物OOP网站吗?如果你在学习编程,这个网站也许能帮助到你~网站地址:www.wwwoop.com! ');
            $this->log('开始TTS处理,文本: ' . $text);

            $client = $this->createWebSocketClient();
            $this->log('WebSocket连接已建立');

            // Open the protocol-level connection and wait for the acknowledgement.
            $this->sendMessage($client, VolcengineProtocol::createStartConnection());
            $this->waitForEvent(
                $client,
                VolcengineProtocol::MSG_TYPE_FULL_SERVER_RESPONSE,
                VolcengineProtocol::EVENT_TYPE_CONNECTION_STARTED
            );
            $this->log('连接已建立');

            $audioData = $this->processTTS($client, $text);

            // Close the protocol-level connection and wait for the acknowledgement.
            $this->sendMessage($client, VolcengineProtocol::createFinishConnection());
            $this->waitForEvent(
                $client,
                VolcengineProtocol::MSG_TYPE_FULL_SERVER_RESPONSE,
                VolcengineProtocol::EVENT_TYPE_CONNECTION_FINISHED
            );
            $this->log('连接已结束');

            if ($audioData === '') {
                return json_encode(['code' => 1, 'msg' => '未接收到音频数据']);
            }

            $fileName = $this->voice_type . '_' . date('YmdHis') . '.' . $this->encoding;
            $filePath = './audio/' . $fileName;
            $this->saveAudio($filePath, $audioData);
            $this->log('音频文件已保存: ' . $filePath . ', 大小: ' . strlen($audioData) . ' 字节');

            return json_encode([
                'code' => 0,
                'msg' => 'TTS处理成功',
                'data' => [
                    'file_name' => $fileName,
                    'file_size' => strlen($audioData),
                    'download_url' => '/audio/' . $fileName,
                ],
            ]);
        } catch (\Exception $e) {
            $this->log('TTS处理失败: ' . $e->getMessage());

            return json_encode(['code' => 1, 'msg' => 'TTS处理失败: ' . $e->getMessage()]);
        } finally {
            // Release the socket on the failure paths too.
            if ($client !== null) {
                $this->closeQuietly($client);
            }
        }
    }

    /**
     * Synthesise the text sentence by sentence and return the concatenated audio.
     *
     * The text is split on the ideographic full stop "。"; blank pieces are
     * skipped and every remaining sentence gets its own session.
     *
     * @param mixed $text text to synthesise (cast to string)
     *
     * @throws \Exception when a sentence keeps failing or no audio was received
     */
    private function processTTS(\WebSocket\Client $client, $text): string
    {
        $sentences = explode('。', (string) $text);

        // Request fields shared by the StartSession and TaskRequest payloads.
        $baseRequest = [
            'user' => ['uid' => $this->generateUuid()],
            'namespace' => 'BidirectionalTTS',
            'req_params' => [
                'speaker' => $this->voice_type,
                'audio_params' => [
                    'format' => $this->encoding,
                    'sample_rate' => 24000,
                    'enable_timestamp' => true,
                ],
                'additions' => json_encode(['disable_markdown_filter' => false]),
            ],
        ];

        $this->log('开始处理 ' . count($sentences) . ' 个句子');

        $allAudioData = '';
        foreach ($sentences as $index => $rawSentence) {
            $sentence = trim($rawSentence);
            // Compare with '' rather than empty(): a sentence of just "0" is valid text.
            if ($sentence === '') {
                continue;
            }

            $number = $index + 1;
            $this->log('处理第 ' . $number . ' 个句子: ' . mb_substr($sentence, 0, 20) . '...');
            $allAudioData .= $this->synthesizeSentenceWithRetry($client, $sentence, $baseRequest, $number);
        }

        if ($allAudioData === '') {
            throw new \Exception('未接收到任何音频数据');
        }

        $this->log('所有句子处理完成,总音频大小: ' . strlen($allAudioData) . ' 字节');

        return $allAudioData;
    }

    /**
     * Synthesise one sentence, retrying up to SENTENCE_MAX_RETRIES times
     * with a one-second pause between attempts.
     *
     * @param int $number 1-based sentence number, used in log and error messages
     *
     * @throws \Exception when the last attempt fails (the cause is chained)
     */
    private function synthesizeSentenceWithRetry(
        \WebSocket\Client $client,
        string $sentence,
        array $baseRequest,
        int $number
    ): string {
        for ($attempt = 1; ; $attempt++) {
            try {
                $audio = $this->synthesizeSentence($client, $sentence, $baseRequest, $number);
                if ($audio !== '') {
                    $this->log('句子 ' . $number . ' 音频数据接收完成,大小: ' . strlen($audio) . ' 字节');
                }

                return $audio;
            } catch (\Exception $e) {
                $this->log("句子 " . $number . " 处理失败,重试第 {$attempt} 次: " . $e->getMessage());
                if ($attempt >= self::SENTENCE_MAX_RETRIES) {
                    throw new \Exception("句子 " . $number . " 处理失败: " . $e->getMessage(), 0, $e);
                }
                sleep(1);
            }
        }
    }

    /**
     * Run one complete session for a sentence and return its audio bytes.
     *
     * @throws \Exception on any send, receive or server-side failure
     */
    private function synthesizeSentence(
        \WebSocket\Client $client,
        string $sentence,
        array $baseRequest,
        int $number
    ): string {
        // Every sentence gets its own session id.
        $sessionID = $this->generateUuid();

        $startReq = [
            'user' => $baseRequest['user'],
            'event' => VolcengineProtocol::EVENT_TYPE_START_SESSION,
            'namespace' => $baseRequest['namespace'],
            'req_params' => $baseRequest['req_params'],
        ];
        $this->sendMessage($client, VolcengineProtocol::createStartSession(json_encode($startReq), $sessionID));
        $this->waitForEvent(
            $client,
            VolcengineProtocol::MSG_TYPE_FULL_SERVER_RESPONSE,
            VolcengineProtocol::EVENT_TYPE_SESSION_STARTED
        );
        $this->log('句子 ' . $number . ' 会话已开始');

        $this->sendTextRequests($client, $sentence, $baseRequest, $sessionID);
        $this->sendMessage($client, VolcengineProtocol::createFinishSession($sessionID));

        return $this->receiveAudioData($client);
    }

    /**
     * Send the text as one TaskRequest per character.
     *
     * NOTE(review): whitespace characters are skipped, so spaces between
     * words are never sent; this looks like the reason the article says only
     * Chinese is supported -- confirm before using it for other languages.
     */
    private function sendTextRequests($client, $text, $request, $sessionID)
    {
        $chars = mb_str_split($text, 1, 'UTF-8');
        $this->log('开始发送 ' . count($chars) . ' 个字符');

        foreach ($chars as $char) {
            if (trim($char) === '') {
                continue;
            }

            $request['req_params']['text'] = $char;
            $ttsReq = [
                'user' => $request['user'],
                'event' => VolcengineProtocol::EVENT_TYPE_TASK_REQUEST,
                'namespace' => $request['namespace'],
                'req_params' => $request['req_params'],
            ];
            $this->sendMessage($client, VolcengineProtocol::createTaskRequest(json_encode($ttsReq), $sessionID));

            usleep(5000); // 5 ms pause between requests
        }

        $this->log('所有字符发送完成');
    }

    /**
     * Collect audio frames until the SessionFinished event arrives.
     *
     * @throws \Exception on a server error, a failure event or a receive failure
     */
    private function receiveAudioData(\WebSocket\Client $client): string
    {
        $audioData = '';

        while (true) {
            $message = $this->receiveMessage($client);

            // Abort on error frames and SessionFailed/ConnectionFailed instead of
            // waiting for a SessionFinished that will never come.
            $this->throwIfFailed($message);

            if ($message->msgType == VolcengineProtocol::MSG_TYPE_AUDIO_ONLY_SERVER) {
                $audioData .= $message->payload;
            } elseif ($message->msgType != VolcengineProtocol::MSG_TYPE_FULL_SERVER_RESPONSE) {
                $this->log('未知消息类型: ' . $message->getMsgTypeName());
            }

            if ($message->eventType == VolcengineProtocol::EVENT_TYPE_SESSION_FINISHED) {
                $this->log('会话已结束');
                break;
            }
        }

        return $audioData;
    }
}
代码2:VolcengineProtocol类
<?php
namespace app\service;

/**
 * Volcengine bidirectional TTS protocol message.
 * Implements encoding and decoding of the binary message frames.
 */
class VolcengineProtocol
{
    // Event types.
    const EVENT_TYPE_NONE = 0;
    const EVENT_TYPE_START_CONNECTION = 1;
    const EVENT_TYPE_FINISH_CONNECTION = 2;
    const EVENT_TYPE_CONNECTION_STARTED = 50;
    const EVENT_TYPE_CONNECTION_FAILED = 51;
    const EVENT_TYPE_CONNECTION_FINISHED = 52;
    const EVENT_TYPE_START_SESSION = 100;
    const EVENT_TYPE_CANCEL_SESSION = 101;
    const EVENT_TYPE_FINISH_SESSION = 102;
    const EVENT_TYPE_SESSION_STARTED = 150;
    const EVENT_TYPE_SESSION_CANCELED = 151;
    const EVENT_TYPE_SESSION_FINISHED = 152;
    const EVENT_TYPE_SESSION_FAILED = 153;
    const EVENT_TYPE_TASK_REQUEST = 200;
    const EVENT_TYPE_TTS_RESPONSE = 352;
    const EVENT_TYPE_TTS_ENDED = 359;

    // Message types (high nibble of header byte 1).
    const MSG_TYPE_INVALID = 0;
    const MSG_TYPE_FULL_CLIENT_REQUEST = 0b1;
    const MSG_TYPE_AUDIO_ONLY_CLIENT = 0b10;
    const MSG_TYPE_FULL_SERVER_RESPONSE = 0b1001;
    const MSG_TYPE_AUDIO_ONLY_SERVER = 0b1011;
    const MSG_TYPE_FRONT_END_RESULT_SERVER = 0b1100;
    const MSG_TYPE_ERROR = 0b1111;

    // Message type flags (low nibble of header byte 1).
    const MSG_TYPE_FLAG_NO_SEQ = 0;
    const MSG_TYPE_FLAG_POSITIVE_SEQ = 0b1;
    const MSG_TYPE_FLAG_LAST_NO_SEQ = 0b10;
    const MSG_TYPE_FLAG_NEGATIVE_SEQ = 0b11;
    const MSG_TYPE_FLAG_WITH_EVENT = 0b100;

    // Protocol version.
    const VERSION_1 = 1;

    // Header size, expressed in 4-byte units.
    const HEADER_SIZE_4 = 1;
    const HEADER_SIZE_8 = 2;
    const HEADER_SIZE_12 = 3;
    const HEADER_SIZE_16 = 4;

    // Payload serialization.
    const SERIALIZATION_RAW = 0;
    const SERIALIZATION_JSON = 0b1;
    const SERIALIZATION_THRIFT = 0b11;
    const SERIALIZATION_CUSTOM = 0b1111;

    // Payload compression.
    const COMPRESSION_NONE = 0;
    const COMPRESSION_GZIP = 0b1;
    const COMPRESSION_CUSTOM = 0b1111;

    /** @var int protocol version, high nibble of header byte 0 */
    public $version;

    /** @var int header size in 4-byte units, low nibble of header byte 0 */
    public $headerSize;

    /** @var int one of the MSG_TYPE_* constants */
    public $msgType;

    /** @var int one of the MSG_TYPE_FLAG_* constants */
    public $msgTypeFlag;

    /** @var int one of the SERIALIZATION_* constants */
    public $serialization;

    /** @var int one of the COMPRESSION_* constants */
    public $compression;

    /** @var int one of the EVENT_TYPE_* constants; only on the wire with MSG_TYPE_FLAG_WITH_EVENT */
    public $eventType;

    /** @var string session id; on the wire for every event except the connection events */
    public $sessionID;

    /** @var string connect id; read from ConnectionStarted/Failed/Finished messages */
    public $connectID;

    /** @var int sequence number; on the wire only with the positive/negative sequence flags */
    public $sequence;

    /** @var int error code; on the wire only for MSG_TYPE_ERROR */
    public $errorCode;

    /** @var string raw payload bytes */
    public $payload;

    /**
     * Create a message with default header fields and an empty body.
     *
     * Every field is always initialised, so marshal() and __toString() are
     * safe to call even on `new VolcengineProtocol()`.
     *
     * @param int|null $msgType MSG_TYPE_* constant; null means MSG_TYPE_INVALID
     * @param int|null $flag    MSG_TYPE_FLAG_* constant; null means MSG_TYPE_FLAG_NO_SEQ
     */
    public function __construct($msgType = null, $flag = null)
    {
        $this->msgType = $msgType ?? self::MSG_TYPE_INVALID;
        $this->msgTypeFlag = $flag ?? self::MSG_TYPE_FLAG_NO_SEQ;
        $this->version = self::VERSION_1;
        $this->headerSize = self::HEADER_SIZE_4;
        $this->serialization = self::SERIALIZATION_JSON;
        $this->compression = self::COMPRESSION_NONE;
        $this->eventType = self::EVENT_TYPE_NONE;
        $this->sessionID = '';
        $this->connectID = '';
        $this->sequence = 0;
        $this->errorCode = 0;
        $this->payload = '';
    }

    /**
     * Create a new, empty message of the given type and flag.
     *
     * @return self
     */
    public static function newMessage($msgType, $flag)
    {
        return new self($msgType, $flag);
    }

    /**
     * Decode a received binary frame into a message.
     *
     * @param string $data raw frame bytes
     *
     * @return self
     *
     * @throws \Exception when the frame is too short or malformed
     */
    public static function newMessageFromBytes($data)
    {
        $length = strlen($data);
        if ($length < 3) {
            throw new \Exception("数据太短: 期望至少3字节,得到" . $length);
        }

        $typeAndFlag = ord($data[1]);
        $msg = new self(($typeAndFlag >> 4) & 0x0F, $typeAndFlag & 0x0F);
        $msg->unmarshal($data);

        return $msg;
    }

    /**
     * Encode this message into its binary wire form.
     *
     * Layout written: header (padded to 4 * headerSize bytes), then with the
     * event flag the event type and, except for connection events, the
     * length-prefixed session id; then the sequence (sequence flags only),
     * the error code (error messages only) and the length-prefixed payload.
     * All integers are 32-bit big-endian.
     *
     * @return string
     */
    public function marshal()
    {
        $header = array_pad(
            [
                ($this->version << 4) | $this->headerSize,
                ($this->msgType << 4) | $this->msgTypeFlag,
                ($this->serialization << 4) | $this->compression,
            ],
            4 * $this->headerSize, // array_pad() leaves the array alone when this is smaller
            0
        );
        $data = pack('C*', ...$header);

        if ($this->hasEvent()) {
            $data .= pack('N', $this->eventType);
            if (!$this->isConnectionEvent()) {
                $data .= self::lengthPrefixed($this->sessionID);
            }
        }

        if ($this->hasSequence()) {
            $data .= pack('N', $this->sequence);
        }

        if ($this->isError()) {
            $data .= pack('N', $this->errorCode);
        }

        return $data . self::lengthPrefixed($this->payload);
    }

    /**
     * Decode a binary frame into this message.
     *
     * Fields are read in this order: header, sequence (sequence flags only),
     * error code (error messages only), then with the event flag the event
     * type, session id (non-connection events) and connect id
     * (ConnectionStarted/Failed/Finished), and finally the payload.
     *
     * NOTE(review): the event block is read after the sequence/error code
     * here but written before them in marshal(); this presumably mirrors the
     * vendor's reference implementation -- confirm before "fixing" it.
     *
     * A frame that ends exactly at a field boundary is accepted and the
     * missing fields keep their current values; a field that is only
     * partially present is rejected.
     *
     * @param string $data raw frame bytes
     *
     * @throws \Exception when the frame is too short, has an invalid header
     *                    size or contains a truncated field
     */
    public function unmarshal($data)
    {
        $data = (string) $data;
        $length = strlen($data);
        if ($length < 3) {
            throw new \Exception("数据太短: 期望至少3字节,得到" . $length);
        }

        $versionAndHeaderSize = ord($data[0]);
        $this->version = ($versionAndHeaderSize >> 4) & 0x0F;
        $this->headerSize = $versionAndHeaderSize & 0x0F;

        $typeAndFlag = ord($data[1]);
        $this->msgType = ($typeAndFlag >> 4) & 0x0F;
        $this->msgTypeFlag = $typeAndFlag & 0x0F;

        $serializationCompression = ord($data[2]);
        $this->serialization = ($serializationCompression >> 4) & 0x0F;
        $this->compression = $serializationCompression & 0x0F;

        // Skip the header padding; a zero or oversized header cannot be valid.
        $pos = 4 * $this->headerSize;
        if ($this->headerSize < 1 || $pos > $length) {
            throw new \Exception("头部大小无效: 声明{$pos}字节,数据共{$length}字节");
        }

        if ($this->hasSequence()) {
            $sequence = self::readUint32($data, $pos);
            if ($sequence !== null) {
                // Sequences may be negative (MSG_TYPE_FLAG_NEGATIVE_SEQ).
                $this->sequence = self::toInt32($sequence);
            }
        }

        if ($this->isError()) {
            $this->errorCode = self::readUint32($data, $pos) ?? $this->errorCode;
        }

        if ($this->hasEvent()) {
            $this->eventType = self::readUint32($data, $pos) ?? $this->eventType;

            if (!$this->isConnectionEvent()) {
                $this->sessionID = self::readLengthPrefixed($data, $pos) ?? $this->sessionID;
            }

            if ($this->carriesConnectId()) {
                $this->connectID = self::readLengthPrefixed($data, $pos) ?? $this->connectID;
            }
        }

        $this->payload = self::readLengthPrefixed($data, $pos) ?? $this->payload;
    }

    /**
     * Create the StartConnection message.
     *
     * @return self
     */
    public static function createStartConnection()
    {
        return self::newEventMessage(self::EVENT_TYPE_START_CONNECTION, '', '{}');
    }

    /**
     * Create the FinishConnection message.
     *
     * @return self
     */
    public static function createFinishConnection()
    {
        return self::newEventMessage(self::EVENT_TYPE_FINISH_CONNECTION, '', '{}');
    }

    /**
     * Create the StartSession message.
     *
     * @param string $payload   JSON request body
     * @param string $sessionID session identifier
     *
     * @return self
     */
    public static function createStartSession($payload, $sessionID)
    {
        return self::newEventMessage(self::EVENT_TYPE_START_SESSION, $sessionID, $payload);
    }

    /**
     * Create the FinishSession message.
     *
     * @param string $sessionID session identifier
     *
     * @return self
     */
    public static function createFinishSession($sessionID)
    {
        return self::newEventMessage(self::EVENT_TYPE_FINISH_SESSION, $sessionID, '{}');
    }

    /**
     * Create a TaskRequest message.
     *
     * @param string $payload   JSON request body
     * @param string $sessionID session identifier
     *
     * @return self
     */
    public static function createTaskRequest($payload, $sessionID)
    {
        return self::newEventMessage(self::EVENT_TYPE_TASK_REQUEST, $sessionID, $payload);
    }

    /**
     * Human-readable name of the event type.
     *
     * @return string
     */
    public function getEventTypeName()
    {
        $eventNames = [
            self::EVENT_TYPE_NONE => 'None',
            self::EVENT_TYPE_START_CONNECTION => 'StartConnection',
            self::EVENT_TYPE_FINISH_CONNECTION => 'FinishConnection',
            self::EVENT_TYPE_CONNECTION_STARTED => 'ConnectionStarted',
            self::EVENT_TYPE_CONNECTION_FAILED => 'ConnectionFailed',
            self::EVENT_TYPE_CONNECTION_FINISHED => 'ConnectionFinished',
            self::EVENT_TYPE_START_SESSION => 'StartSession',
            self::EVENT_TYPE_CANCEL_SESSION => 'CancelSession',
            self::EVENT_TYPE_FINISH_SESSION => 'FinishSession',
            self::EVENT_TYPE_SESSION_STARTED => 'SessionStarted',
            self::EVENT_TYPE_SESSION_CANCELED => 'SessionCanceled',
            self::EVENT_TYPE_SESSION_FINISHED => 'SessionFinished',
            self::EVENT_TYPE_SESSION_FAILED => 'SessionFailed',
            self::EVENT_TYPE_TASK_REQUEST => 'TaskRequest',
            self::EVENT_TYPE_TTS_RESPONSE => 'TTSResponse',
            self::EVENT_TYPE_TTS_ENDED => 'TTSEnded',
        ];

        return $eventNames[$this->eventType] ?? 'Unknown(' . $this->eventType . ')';
    }

    /**
     * Human-readable name of the message type.
     *
     * @return string
     */
    public function getMsgTypeName()
    {
        $msgNames = [
            self::MSG_TYPE_INVALID => 'Invalid',
            self::MSG_TYPE_FULL_CLIENT_REQUEST => 'FullClientRequest',
            self::MSG_TYPE_AUDIO_ONLY_CLIENT => 'AudioOnlyClient',
            self::MSG_TYPE_FULL_SERVER_RESPONSE => 'FullServerResponse',
            self::MSG_TYPE_AUDIO_ONLY_SERVER => 'AudioOnlyServer',
            self::MSG_TYPE_FRONT_END_RESULT_SERVER => 'FrontEndResultServer',
            self::MSG_TYPE_ERROR => 'Error',
        ];

        return $msgNames[$this->msgType] ?? 'Unknown(' . $this->msgType . ')';
    }

    /**
     * One-line summary for logs: type, event, then sequence and error code
     * when they apply. Audio messages show only the payload size; all other
     * messages include the payload itself.
     *
     * @return string
     */
    public function __toString()
    {
        $result = $this->getMsgTypeName() . ', ' . $this->getEventTypeName();

        $flag = (int) $this->msgTypeFlag;
        if ($flag === self::MSG_TYPE_FLAG_POSITIVE_SEQ || $flag === self::MSG_TYPE_FLAG_NEGATIVE_SEQ) {
            $result .= ', Sequence: ' . $this->sequence;
        }

        if ($this->isError()) {
            $result .= ', ErrorCode: ' . $this->errorCode;
        }

        $isAudio = in_array(
            (int) $this->msgType,
            [self::MSG_TYPE_AUDIO_ONLY_SERVER, self::MSG_TYPE_AUDIO_ONLY_CLIENT],
            true
        );

        return $result . ($isAudio
            ? ', PayloadSize: ' . strlen($this->payload)
            : ', Payload: ' . $this->payload);
    }

    /**
     * Build a full-client-request message carrying an event.
     */
    private static function newEventMessage(int $eventType, $sessionID, $payload): self
    {
        $msg = self::newMessage(self::MSG_TYPE_FULL_CLIENT_REQUEST, self::MSG_TYPE_FLAG_WITH_EVENT);
        $msg->eventType = $eventType;
        $msg->sessionID = $sessionID;
        $msg->payload = $payload;

        return $msg;
    }

    /** True when the flag says the message carries an event type. */
    private function hasEvent(): bool
    {
        return (int) $this->msgTypeFlag === self::MSG_TYPE_FLAG_WITH_EVENT;
    }

    /** True for MSG_TYPE_ERROR messages, which carry an error code. */
    private function isError(): bool
    {
        return (int) $this->msgType === self::MSG_TYPE_ERROR;
    }

    /** True when type and flag together say the message carries a sequence number. */
    private function hasSequence(): bool
    {
        $sequencedTypes = [
            self::MSG_TYPE_FULL_CLIENT_REQUEST,
            self::MSG_TYPE_FULL_SERVER_RESPONSE,
            self::MSG_TYPE_FRONT_END_RESULT_SERVER,
            self::MSG_TYPE_AUDIO_ONLY_CLIENT,
            self::MSG_TYPE_AUDIO_ONLY_SERVER,
        ];
        $sequenceFlags = [self::MSG_TYPE_FLAG_POSITIVE_SEQ, self::MSG_TYPE_FLAG_NEGATIVE_SEQ];

        return in_array((int) $this->msgType, $sequencedTypes, true)
            && in_array((int) $this->msgTypeFlag, $sequenceFlags, true);
    }

    /** True for the connection-level events, which carry no session id. */
    private function isConnectionEvent(): bool
    {
        return in_array((int) $this->eventType, [
            self::EVENT_TYPE_START_CONNECTION,
            self::EVENT_TYPE_FINISH_CONNECTION,
            self::EVENT_TYPE_CONNECTION_STARTED,
            self::EVENT_TYPE_CONNECTION_FAILED,
            self::EVENT_TYPE_CONNECTION_FINISHED,
        ], true);
    }

    /** True for the server-side connection events, which carry a connect id. */
    private function carriesConnectId(): bool
    {
        return in_array((int) $this->eventType, [
            self::EVENT_TYPE_CONNECTION_STARTED,
            self::EVENT_TYPE_CONNECTION_FAILED,
            self::EVENT_TYPE_CONNECTION_FINISHED,
        ], true);
    }

    /** Encode bytes as a 32-bit big-endian length followed by the bytes. */
    private static function lengthPrefixed($bytes): string
    {
        $bytes = (string) $bytes;

        return pack('N', strlen($bytes)) . $bytes;
    }

    /**
     * Read a 32-bit big-endian unsigned integer at $pos and advance $pos.
     *
     * @param int $pos read cursor, advanced by 4 on success
     *
     * @return int|null null when the data ends exactly at $pos
     *
     * @throws \Exception when only 1-3 bytes remain
     */
    private static function readUint32(string $data, int &$pos)
    {
        $remaining = strlen($data) - $pos;
        if ($remaining <= 0) {
            return null;
        }
        if ($remaining < 4) {
            throw new \Exception("数据不完整: 偏移{$pos}处需要4字节,剩余{$remaining}字节");
        }

        $value = unpack('N', substr($data, $pos, 4))[1];
        $pos += 4;

        return $value;
    }

    /**
     * Read a length-prefixed byte string at $pos and advance $pos.
     *
     * @param int $pos read cursor, advanced past the length and the bytes
     *
     * @return string|null null when the data ends exactly at $pos
     *
     * @throws \Exception when the length or the bytes are truncated
     */
    private static function readLengthPrefixed(string $data, int &$pos)
    {
        $size = self::readUint32($data, $pos);
        if ($size === null) {
            return null;
        }

        $remaining = strlen($data) - $pos;
        if ($size < 0 || $size > $remaining) {
            throw new \Exception("数据不完整: 偏移{$pos}处声明{$size}字节,剩余{$remaining}字节");
        }

        // Guarded because substr() at the very end of a string is not portable across PHP versions.
        $bytes = $size > 0 ? substr($data, $pos, $size) : '';
        $pos += $size;

        return $bytes;
    }

    /** Reinterpret an unsigned 32-bit value as signed (no-op on 32-bit PHP). */
    private static function toInt32(int $value): int
    {
        if (PHP_INT_SIZE > 4 && $value > 0x7FFFFFFF) {
            return $value - 0x100000000;
        }

        return $value;
    }
}
测试方法:调用 Tts 类里的 tts() 方法。
效果: