根据并发和响应延迟,实现语音识别接口自动切换需求
需求描述:
- 当请求的语音识别的请求数量大于3或者请求语音识别接口3秒不可达无响应,切换备用语音识别接口
科大讯飞语音识别作为备用接口
科大讯飞的API文档:
- 进入讯飞开放平台的网页,注册账号,然后选择语音识别和实时语音听写服务。
- 接口地址与 API 文档见讯飞开放平台官网的"语音听写(流式版)WebAPI"文档。
阅读文档可知,讯飞的语音识别接口要求上传的音频为 pcm 格式,即不带文件头的原始音频数据。
go代码实现
package serviceimport ("context""crypto/hmac""crypto/sha256""encoding/base64""encoding/json""fmt""github.com/gorilla/websocket""io""net/url""os""os/exec""strings""time"
)// 判断文件扩展名并返回文件类型
// getFileExtension returns the lower-cased extension of filePath without the
// leading dot (e.g. "dir/Sound.WAV" -> "wav"). It returns "" when the path
// contains no dot at all.
func getFileExtension(filePath string) string {
	idx := strings.LastIndex(filePath, ".")
	if idx < 0 {
		// No extension present. The previous version computed
		// filePath[LastIndex+1:] with LastIndex == -1, which returned the
		// whole lower-cased path and mis-classified extension-less files.
		return ""
	}
	return strings.ToLower(filePath[idx+1:])
}
// convertToPCM converts the audio file at inputFile to raw PCM (16-bit
// little-endian, mono, 16 kHz) using the external ffmpeg binary and returns
// the converted bytes.
//
// Fixes over the previous version: the output path is a unique temporary
// file instead of a fixed "output.pcm", so concurrent requests (the exact
// scenario this service is built for) no longer clobber each other, and
// ffmpeg is invoked with -y so a leftover file cannot make it fail.
func convertToPCM(inputFile string) ([]byte, error) {
	// Reserve a unique temp file for ffmpeg's output.
	tmp, err := os.CreateTemp("", "asr-*.pcm")
	if err != nil {
		return nil, fmt.Errorf("failed to create temp pcm file: %v", err)
	}
	outputFile := tmp.Name()
	tmp.Close()
	defer func() {
		// Best-effort cleanup of the temporary pcm file.
		if err := os.Remove(outputFile); err != nil {
			fmt.Println("Warning: failed to remove temporary pcm file")
		}
	}()

	// -f s16le: raw signed 16-bit little-endian; -ac 1: mono; -ar 16000: 16 kHz.
	cmd := exec.Command("ffmpeg", "-y", "-i", inputFile, "-f", "s16le", "-ac", "1", "-ar", "16000", outputFile)
	if err := cmd.Run(); err != nil {
		return nil, fmt.Errorf("failed to convert to pcm: %v", err)
	}

	// Read the whole converted stream back into memory.
	convertedFile, err := os.Open(outputFile)
	if err != nil {
		return nil, fmt.Errorf("failed to open converted pcm file: %v", err)
	}
	defer convertedFile.Close()
	audioData, err := io.ReadAll(convertedFile)
	if err != nil {
		return nil, fmt.Errorf("failed to read pcm file: %v", err)
	}
	return audioData, nil
}
func recognizeSpeechFromAudioStream(audioFilePath string) (string, error) {// 判断音频文件格式ext := getFileExtension(audioFilePath)// 如果是 wav 或 mp3 格式,则转换为 pcmvar audioStream []bytevar err errorif ext == "wav" || ext == "mp3" {audioStream, err = convertToPCM(audioFilePath)if err != nil {return "", fmt.Errorf("failed to convert audio file to pcm: %v", err)}} else if ext == "pcm" {// 如果已经是 pcm 格式,直接读取文件audioFile, err := os.Open(audioFilePath)if err != nil {return "", fmt.Errorf("failed to open pcm file: %v", err)}defer audioFile.Close()audioStream, err = io.ReadAll(audioFile)if err != nil {return "", fmt.Errorf("failed to read pcm file: %v", err)}} else {return "", fmt.Errorf("unsupported audio format: %s", ext)}hostUrl := "wss://iat-api.xfyun.cn/v2/iat"appid := "xxxxx"apiSecret := "xxxxxx" // API密钥,需替换为自己的值apiKey := "xxxxxx" // API Key,需替换为自己的值const (STATUS_FIRST_FRAME = 0 // 标识音频的第一帧STATUS_CONTINUE_FRAME = 1 // 标识音频的中间帧STATUS_LAST_FRAME = 2 // 标识音频的最后一帧)// 打开 WebSocket 连接st := time.Now() // 获取当前时间用于后续计算总耗时d := websocket.Dialer{HandshakeTimeout: 30 * time.Second, // 增加握手超时时间}conn, resp, err := d.Dial(assembleAuthUrl(hostUrl, apiKey, apiSecret), nil)if err != nil {return "", fmt.Errorf("failed to connect WebSocket: %v", err)} else if resp.StatusCode != 101 {fmt.Printf("WebSocket connection failed with status code: %d", resp.StatusCode)return "", fmt.Errorf("WebSocket connection failed with status code: %d", resp.StatusCode)}defer conn.Close() // 关闭WebSocket连接// 设定音频数据处理参数var frameSize = 1280var intervel = 40 * time.Millisecond// 开启协程发送数据ctx, cancel := context.WithCancel(context.Background())defer cancel() // 取消上下文,通知相关操作停止// 读取字节流并发送音频数据go func() {status := STATUS_FIRST_FRAME// 定义一个缓存buffer := make([]byte, frameSize)for i := 0; i < len(audioStream); i += frameSize {// 这里加入select语句来处理上下文取消select {case <-ctx.Done():// 如果上下文被取消,结束循环fmt.Println("session end ---")returndefault:}// 读取当前帧数据end := i + frameSizeif end > len(audioStream) {end = len(audioStream) // 
如果是最后一帧,修正帧的结束位置status = STATUS_LAST_FRAME // 设置为最后一帧}copy(buffer, audioStream[i:end]) // 将当前帧数据复制到buffer// 根据状态发送音频数据switch status {case STATUS_FIRST_FRAME:frameData := map[string]interface{}{"common": map[string]interface{}{"app_id": appid,},"business": map[string]interface{}{"language": "zh_cn","domain": "iat","accent": "mandarin",},"data": map[string]interface{}{"status": STATUS_FIRST_FRAME,"format": "audio/L16;rate=16000","audio": base64.StdEncoding.EncodeToString(buffer[:end-i]),"encoding": "raw",},}conn.WriteJSON(frameData) // 发送数据status = STATUS_CONTINUE_FRAME // 设置为中间帧time.Sleep(intervel) // 延迟发送下一帧case STATUS_CONTINUE_FRAME:frameData := map[string]interface{}{"data": map[string]interface{}{"status": STATUS_CONTINUE_FRAME,"format": "audio/L16;rate=16000","audio": base64.StdEncoding.EncodeToString(buffer[:end-i]),"encoding": "raw",},}conn.WriteJSON(frameData)// 判断是否为最后一帧if status == STATUS_LAST_FRAME {fmt.Println(" send last ") // 打印发送最后一帧数据return}case STATUS_LAST_FRAME:frameData := map[string]interface{}{"data": map[string]interface{}{"status": STATUS_LAST_FRAME,"format": "audio/L16;rate=16000","audio": base64.StdEncoding.EncodeToString(buffer[:end-i]),"encoding": "raw",},}conn.WriteJSON(frameData)return}//time.Sleep(intervel)}}()// 读取 WebSocket 返回的识别结果var fullResult strings.Builderfor {var respData RespData_, msg, err := conn.ReadMessage()if err != nil {return "", fmt.Errorf("failed to read message: %v", err)}err = json.Unmarshal(msg, &respData)if err != nil {return "", fmt.Errorf("failed to parse response:%v", err.Error())}if respData.Code != 0 {return "", fmt.Errorf("error code: %d, message: %s", respData.Code, respData.Message)}fullResult.WriteString(respData.Data.Result.String())if respData.Data.Status == 2 {fmt.Println(respData.Code, respData.Message)fmt.Println("Final result received in :", time.Since(st))//return respData.Data.Result.String(), nilcancel() //通知协程停止发送break}}return fullResult.String(), nil}// RespData 用于解析返回的响应数据
// RespData is the top-level message returned on the XFYun IAT WebSocket.
type RespData struct {
	Sid     string `json:"sid"`     // server-assigned session id
	Code    int    `json:"code"`    // 0 on success; non-zero is an error code
	Message string `json:"message"` // status / error description from the server
	Data    Data   `json:"data"`    // recognition payload
}
// Data wraps one batch of recognition output.
type Data struct {
	Result Result `json:"result"` // partial or final recognition result
	Status int    `json:"status"` // 2 marks the final message of the session
}
// Result is one recognition result segment as returned by XFYun.
type Result struct {
	Ls  bool   `json:"ls"`  // last-segment flag in the server response
	Rg  []int  `json:"rg"`  // NOTE(review): presumably the progressive-result replacement range — confirm against XFYun docs
	Sn  int    `json:"sn"`  // segment sequence number
	Pgs string `json:"pgs"` // progressive recognition mode flag (unused by this code)
	Ws  []Ws   `json:"ws"`  // word groups making up this segment
}
func (t *Result) String() string {var wss stringfor _, v := range t.Ws {wss += v.String()}return wss
}// Ws 用于封装识别单词的位置信息
// Ws is a word group together with its begin position in the audio.
type Ws struct {
	Bg int  `json:"bg"` // begin offset reported by the server
	Cw []Cw `json:"cw"` // candidate words at this position
}
func (w *Ws) String() string {var wss stringfor _, v := range w.Cw {wss += v.W}return wss
}// Cw 用于封装单词信息
// Cw is a single candidate word.
type Cw struct {
	Sc int    `json:"sc"` // candidate score
	W  string `json:"w"`  // word text
}
func assembleAuthUrl(hosturl string, apiKey, apiSecret string) string {ul, err := url.Parse(hosturl)if err != nil {fmt.Println(err)}// 获取当前时间,用于签名date := time.Now().UTC().Format(time.RFC1123)// 拼接签名字符串signString := []string{"host: " + ul.Host, "date: " + date, "GET " + ul.Path + " HTTP/1.1"}sgin := strings.Join(signString, "\n")fmt.Println(sgin)// 计算HMAC签名sha := HmacWithShaTobase64("hmac-sha256", sgin, apiSecret)fmt.Println(sha)// 构建请求参数authUrl := fmt.Sprintf("hmac username=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"", apiKey,"hmac-sha256", "host date request-line", sha)// base64编码后加入URLauthorization := base64.StdEncoding.EncodeToString([]byte(authUrl))v := url.Values{}v.Add("host", ul.Host)v.Add("date", date)v.Add("authorization", authorization)// 返回完整的请求URLcallurl := hosturl + "?" + v.Encode()return callurl
}// HmacWithShaTobase64 计算HMAC SHA256签名并返回base64编码结果
// HmacWithShaTobase64 returns the base64 encoding of the HMAC-SHA256 digest
// of data keyed with key. The algorithm argument is retained for call-site
// compatibility; SHA-256 is always used.
func HmacWithShaTobase64(algorithm, data, key string) string {
	h := hmac.New(sha256.New, []byte(key))
	io.WriteString(h, data) // hash.Hash writes never fail
	return base64.StdEncoding.EncodeToString(h.Sum(nil))
}
Whisper语音识别作为主接口使用
- 使用的语音识别模型是:BELLE-2/Belle-whisper-large-v3-zh-punct,它对标点符号识别比较好。
使用Python封装接口
from flask import Flask, request, jsonify
from transformers import pipeline
from opencc import OpenCC
from pydub import AudioSegment # ✅ 新增导入
import numpy as np
import io

app = Flask(__name__)

# Initialize the Whisper ASR pipeline on the first CUDA device.
transcriber = pipeline(
    "automatic-speech-recognition",
    model="BELLE-2/Belle-whisper-large-v3-zh-punct",
    device="cuda:0"
)

# Force decoding to Chinese transcription so the model does not auto-detect
# another language or fall back to translation.
transcriber.model.config.forced_decoder_ids = (
    transcriber.tokenizer.get_decoder_prompt_ids(
        language="zh",
        task="transcribe"
    )
)

# Converter from Traditional to Simplified Chinese.
cc = OpenCC('t2s')


@app.route("/transcribe", methods=["POST"])
def transcribe_audio():
    """POST /transcribe: transcribe an uploaded audio file with Whisper.

    Expects a multipart form with a ``file`` field (any format pydub/ffmpeg
    can decode: wav/mp3/m4a/ogg, ...). Returns
    ``{"transcription": [{"text": ...}]}`` with the text converted to
    Simplified Chinese, or an ``error`` payload with a 400 status on bad
    input.
    """
    if "file" not in request.files:
        return jsonify({"error": "请求中没有文件"}), 400
    file = request.files["file"]
    if file.filename == "":
        return jsonify({"error": "没有选择文件"}), 400

    # Read the raw upload into an in-memory stream.
    audio_bytes = file.read()
    audio_stream = io.BytesIO(audio_bytes)

    # Decode with pydub (ffmpeg-backed). An undecodable upload used to bubble
    # up as an unhandled exception (HTTP 500); report it as a client error.
    try:
        audio = AudioSegment.from_file(audio_stream)
    except Exception as exc:
        return jsonify({"error": "无法解码音频文件", "detail": str(exc)}), 400

    # Whisper expects mono 16 kHz input.
    audio = audio.set_channels(1).set_frame_rate(16000)

    # Convert 16-bit integer samples to float32 in [-1, 1].
    waveform = np.array(audio.get_array_of_samples()).astype(np.float32) / 32768.0
    samplerate = audio.frame_rate

    # Run the ASR pipeline.
    result = transcriber({"array": waveform, "sampling_rate": samplerate})

    # Traditional -> Simplified Chinese.
    text = result.get("text", "")
    simplified_text = cc.convert(text)

    segments = [{"text": simplified_text}]
    print("transcription =>", segments)
    return jsonify({"transcription": segments})


if __name__ == "__main__":
    # NOTE(review): debug=True enables the Werkzeug debugger and is unsafe to
    # expose on 0.0.0.0 in production — disable before deploying.
    app.run(debug=True, port=13667, host="0.0.0.0")
Go 调用 Python 接口
package serviceimport ("WorldEpcho/src/config""WorldEpcho/src/config/e""bytes""context""encoding/json""fmt""github.com/gin-gonic/gin""github.com/google/uuid""io""io/ioutil""log""mime/multipart""net/http""os""sync/atomic""time"
)// ---------------- 新增:全局并发计数 ----------------
// inFlightASR counts speech-recognition requests that have been accepted but
// not yet answered. It is read and updated only through sync/atomic and
// drives the "> 3 concurrent requests -> switch to XFYun" decision.
var inFlightASR int32
// TranscriptionSegment matches one element of the "transcription" array in
// the Whisper service's JSON response.
type TranscriptionSegment struct {
	// Start float64 `json:"start"`
	// End   float64 `json:"end"`
	Text string `json:"text"` // recognized text of this segment
}
// TranscriptionResponse is the JSON body returned by the Whisper service.
type TranscriptionResponse struct {
	Transcription []TranscriptionSegment `json:"transcription"`
}
func TranscribeAudio(c *gin.Context) {/*判断用户是否登录*/_, isLogin := IsUserLoggedIn(c)if !isLogin {log.Println("用户未登录")c.JSON(http.StatusOK, gin.H{"code": 0, "message": "用户未登录"})return}// 从请求中获取文件file, _, err := c.Request.FormFile("file")if err != nil {c.JSON(http.StatusBadRequest, gin.H{"code": 0, "message": "读取请求文件失败"})return}defer file.Close()// 准备发送请求url := config.Conf.TranscribeUrlfmt.Println(config.ColorBlue, "request url: ", url, config.ColorReset)body := &bytes.Buffer{}writer := multipart.NewWriter(body)part, err := writer.CreateFormFile("file", "filename")if err != nil {c.JSON(http.StatusInternalServerError, gin.H{"code": 0, "message": "创建表单语音文件失败"})return}// 将文件复制到请求体_, err = io.Copy(part, file)if err != nil {c.JSON(http.StatusInternalServerError, gin.H{"code": 0, "message": "将文件赋值到请求体失败"})return}writer.Close()// 发送请求fmt.Println(config.ColorCyan, "发送请求...", config.ColorReset)request, err := http.NewRequest("POST", url, body)if err != nil {c.JSON(http.StatusInternalServerError, gin.H{"code": 0, "message": "发送请求数据失败"})return}request.Header.Set("Content-Type", writer.FormDataContentType())client := &http.Client{}resp, err := client.Do(request)if err != nil {c.JSON(http.StatusInternalServerError, gin.H{"code": 0, "message": "请求失败"})return}defer resp.Body.Close()// 读取响应respBody, err := ioutil.ReadAll(resp.Body)if err != nil {c.JSON(http.StatusInternalServerError, gin.H{"code": 0, "message": "获取响应数据失败"})return}// 解析 JSON 到结构体var transResp TranscriptionResponseerr = json.Unmarshal(respBody, &transResp)if err != nil {c.JSON(http.StatusInternalServerError, gin.H{"code": 0, "message": "解析json结构体错误"})return}// 输出结果c.JSON(http.StatusOK, gin.H{"code": 1, "message": transResp})fmt.Println(config.ColorPurple, "语音转换后响应结果:", config.ColorReset)fmt.Println(config.ColorGreen, transResp, config.ColorReset)}// 语音识别接口
func MGTranscribeAudio(c *gin.Context) {// 1) 校验 Token(原逻辑不变)tokenString := c.GetHeader("Token")if tokenString == "" {c.JSON(http.StatusOK, gin.H{"code": e.UnauthorizedStatus, "data": nil, "message": "请求头中无token,或未授权的token访问"})fmt.Println("请求头中无token,或未授权的token访问")return}isValid, err := IsValidMiGuToken(tokenString)if err != nil || !isValid {c.JSON(http.StatusOK, gin.H{"code": e.InvalidToken, "data": nil, "message": "无效或已过期的令牌"})fmt.Println("无效或已过期的令牌")return}// 2) 读取上传文件(保持参数与字段名不变:file)srcFile, fileHeader, err := c.Request.FormFile("file")if err != nil {c.JSON(http.StatusBadRequest, gin.H{"code": e.InvalidParams, "data": nil, "message": "读取请求音频文件失败"})fmt.Println("读取请求音频文件失败")return}defer srcFile.Close()// 把文件读入内存,便于两用(咪咕请求 + 讯飞降级)fileBytes, err := io.ReadAll(srcFile)if err != nil {c.JSON(http.StatusInternalServerError, gin.H{"code": e.InternalError, "data": nil, "message": "读取音频数据失败"})fmt.Println("读取音频数据失败")return}// 为讯飞接口准备一个临时文件路径(降级时使用)tmp, err := os.CreateTemp("", "asr-*"+uuid.New().String()+fileHeader.Filename)if err != nil {c.JSON(http.StatusInternalServerError, gin.H{"code": e.InternalError, "data": nil, "message": "创建临时文件失败"})return}tmpPath := tmp.Name()if _, err = tmp.Write(fileBytes); err != nil {tmp.Close()os.Remove(tmpPath)c.JSON(http.StatusInternalServerError, gin.H{"code": e.InternalError, "data": nil, "message": "写入临时文件失败"})return}tmp.Close()defer os.Remove(tmpPath)// 3) 准备咪咕请求体(multipart)url := config.Conf.TranscribeUrlfmt.Println(config.ColorBlue, "request url: ", url, config.ColorReset)body := &bytes.Buffer{}writer := multipart.NewWriter(body)part, err := writer.CreateFormFile("file", fileHeader.Filename)if err != nil {fmt.Printf("创建表单语音文件失败: %v", err)c.JSON(http.StatusInternalServerError, gin.H{"code": e.InternalError, "data": nil, "message": "创建表单语音文件失败"})return}if _, err = io.Copy(part, bytes.NewReader(fileBytes)); err != nil {c.JSON(http.StatusInternalServerError, gin.H{"code": e.InternalError, "data": nil, "message": 
"将文件赋值到请求体失败"})fmt.Println("将文件赋值到请求体失败")return}writer.Close()contentType := writer.FormDataContentType()// 打印请求参数(原样保留)fmt.Println("请求参数:")fmt.Println("URL:", url)fmt.Println("Headers: Content-Type:", contentType)fmt.Println("Token:", tokenString)fmt.Println("File Name:", fileHeader.Filename)// 4) 并发与超时控制// 4.1 先判断并发是否过高(>3)——过高则直接走讯飞降级if atomic.AddInt32(&inFlightASR, 1) > 3 {atomic.AddInt32(&inFlightASR, -1) // 撤销这次占位fmt.Println("当前尚未回复的语音识别请求数 > 3,切换到科大讯飞接口")xfResult, err := recognizeSpeechFromAudioStream(tmpPath)if err != nil {c.JSON(http.StatusOK, gin.H{"code": e.InternalError, "data": nil, "message": "科大讯飞识别失败: " + err.Error()})return}response := gin.H{"code": e.SUCCESS,"data": gin.H{"fullSpeechParse": xfResult},"message": "语音解析成功(已使用科大讯飞负载均衡)",}// 序列化响应数据为 JSON 字符串jsonData, err := json.Marshal(response)if err != nil {// 处理序列化错误fmt.Println("序列化 JSON 数据出错:", err)return}fmt.Println(config.ColorCyan, "语音识别响应数据 ==> ", string(jsonData), config.ColorReset)c.JSON(http.StatusOK, response)return}// 能走到这里,说明并发未超阈值,我们开启咪咕请求协程// 注意:这里不再增加计数(上面已经 +1),协程结束时 -1defer func() {atomic.AddInt32(&inFlightASR, -1)}()// 4.2 建立通道与上下文,用于 3s 超时兜底切换type miGuResp struct {body []byteerr error}resultCh := make(chan miGuResp, 1)// 使用可取消的 context 以便在主 goroutine 决定超时后终止请求reqCtx, cancel := context.WithCancel(context.Background())// 5) 发送咪咕请求的协程go func(b []byte, ct string) {client := &http.Client{}req, err := http.NewRequestWithContext(reqCtx, "POST", url, bytes.NewReader(b))if err != nil {resultCh <- miGuResp{nil, fmt.Errorf("发送请求数据失败: %w", err)}return}req.Header.Set("Content-Type", ct)// 如果服务端需要 Token 放 Header,可在此处设置;原代码打印 Token,但未设置到 Header// 如有需要:req.Header.Set("Token", tokenString)resp, err := client.Do(req)if err != nil {resultCh <- miGuResp{nil, fmt.Errorf("请求失败: %w", err)}return}defer resp.Body.Close()respBody, err := io.ReadAll(resp.Body)if err != nil {resultCh <- miGuResp{nil, fmt.Errorf("获取响应数据失败: %w", err)}return}resultCh <- miGuResp{respBody, nil}}(body.Bytes(), contentType)// 6) 
3 秒等待咪咕结果,否则切到讯飞select {case r := <-resultCh:if r.err != nil {// 咪咕直接失败,降级到讯飞fmt.Println("咪咕请求报错,切换到科大讯飞:", r.err)xfResult, err := recognizeSpeechFromAudioStream(tmpPath)if err != nil {fmt.Println("讯飞语音识别失败: " + err.Error())c.JSON(http.StatusOK, gin.H{"code": e.InternalError, "data": nil, "message": "语音识别失败: " + err.Error()})return}fmt.Println("Whisper语音识别失败,已使用科大讯飞)")response := gin.H{"code": e.SUCCESS,"data": gin.H{"fullSpeechParse": xfResult},"message": "语音解析成功",}// 序列化响应数据为 JSON 字符串jsonData, err := json.Marshal(response)if err != nil {// 处理序列化错误fmt.Println("序列化 JSON 数据出错:", err)return}fmt.Println(config.ColorCyan, "语音识别响应数据 ==> ", string(jsonData), config.ColorReset)c.JSON(http.StatusOK, response)return}// 咪咕返回成功,解析 JSONvar transResp TranscriptionResponseif err := json.Unmarshal(r.body, &transResp); err != nil {// 解析失败也走降级fmt.Println("解析咪咕JSON失败,切换到科大讯飞: ", err.Error())xfResult, derr := recognizeSpeechFromAudioStream(tmpPath)if derr != nil {fmt.Println("科大讯飞识别失败: " + derr.Error())c.JSON(http.StatusOK, gin.H{"code": e.InternalError, "data": nil, "message": "科大讯飞识别失败: " + derr.Error()})return}fmt.Println("语音解析成功(Whisper语音识别 JSON异常,已使用科大讯飞)")response := gin.H{"code": e.SUCCESS,"data": gin.H{"fullSpeechParse": xfResult},"message": "语音解析成功",}// 序列化响应数据为 JSON 字符串jsonData, err := json.Marshal(response)if err != nil {// 处理序列化错误fmt.Println("序列化 JSON 数据出错:", err)return}fmt.Println(config.ColorCyan, "语音识别响应数据 ==> ", string(jsonData), config.ColorReset)c.JSON(http.StatusOK, response)return}// 汇总咪咕转写文本var fullTranscript stringfor _, segment := range transResp.Transcription {fullTranscript += segment.Text}response := gin.H{"code": e.SUCCESS,"data": gin.H{"fullSpeechParse": fullTranscript},"message": "语音解析成功",}// 序列化响应数据为 JSON 字符串jsonData, err := json.Marshal(response)if err != nil {// 处理序列化错误fmt.Println("序列化 JSON 数据出错:", err)return}fmt.Println(config.ColorCyan, "语音识别响应数据 ==> ", string(jsonData), config.ColorReset)c.JSON(http.StatusOK, response)returncase <-time.After(3 * 
time.Second):// 3 秒不可达,切换讯飞fmt.Println("Whisper 3 秒不可达,切换到科大讯飞")cancel() // 取消咪咕请求(让协程尽快退出)xfResult, err := recognizeSpeechFromAudioStream(tmpPath)if err != nil {fmt.Println("科大讯飞识别失败: " + err.Error())c.JSON(http.StatusOK, gin.H{"code": e.InternalError, "data": nil, "message": "科大讯飞识别失败: " + err.Error()})return}fmt.Println("语音解析成功(咪咕超时,已使用科大讯飞)")response := gin.H{"code": e.SUCCESS,"data": gin.H{"fullSpeechParse": xfResult},"message": "语音解析成功",}// 序列化响应数据为 JSON 字符串jsonData, err := json.Marshal(response)if err != nil {// 处理序列化错误fmt.Println("序列化 JSON 数据出错:", err)return}fmt.Println(config.ColorCyan, "语音识别响应数据 ==> ", string(jsonData), config.ColorReset)c.JSON(http.StatusOK, response)return}}
添加路由
// 语音识别
router.POST("/SpeechConvertor", service.MGTranscribeAudio)