当前位置: 首页 > news >正文

[VoiceRAG] RTMiddleTier实时中间层 | WebSocket处理器 | 拦截

第3章:RTMiddleTier(实时中间层)

  • 在第1章:知识库配置(集成向量化)中,我们构建了一个智能数字图书馆

  • 随后在第2章:RAG工具集中,我们为AI配备了专用"助手"——searchreport_grounding工具——使其能够与图书馆交互并提供可信答案。

但这些组件如何协同工作,实现流畅的实时语音对话?你的语音如何传递给AI?AI如何决定使用RAG工具?语音响应又如何无延迟地返回?

这正是**RTMiddleTier(实时中间层)**发挥关键作用的地方。

问题:实时AI对话的编排

想象你正与AI进行语音对话。你说话,AI思考,可能需要查询信息,然后回应。要让这个过程自然流畅,后台需要瞬间完成以下操作:

  • 你的语音需传递给AI
  • AI需理解问题
  • 若问题需从特定知识库获取信息,AI必须暂停常规思考,调用RAG工具(如search工具),等待结果后再继续思考
  • AI形成答案后需转为语音
  • 语音需立即返回浏览器

这是复杂的协作过程,尤其是当AI可能根据你的提示在对话中途决定使用工具时

我们如何管理整个流程,在不干扰实时体验的前提下注入自定义RAG逻辑?

解决方案:RTMiddleTier——智能交通指挥中心

RTMiddleTier(实时中间层)是本应用的核心大脑。它如同智能交通指挥员高级翻译官,位于对话的中央,作为浏览器与强大的Azure OpenAI GPT-4o实时API之间的代理中介

其核心功能:

  1. 监听语音:接收浏览器发送的语音输入
  2. 拦截消息:拦截所有进出Azure OpenAI API的消息(关键点:不仅传递消息,还会解读内容)
  3. 插入智能工具(RAG):当AI决定调用RAG工具(如第2章的search工具)时,RTMiddleTier拦截请求,执行工具,并将结果返回AI(此时第1章的知识库真正发挥作用!)
  4. 处理AI响应:确保AI响应格式正确,并通过report_grounding工具添加来源引用等增强内容
  5. 保障流畅交互:所有操作实时完成,使对话自然响应

本质上,RTMiddleTier通过管理流程并在适当时机注入智能,使AI能够进行动态的、支持RAG的语音对话。

实现

以问题"什么是RTMiddleTier?"为例,其处理流程如下:

在这里插入图片描述

该流程图展示RTMiddleTier如何作为中介:

  • 将语音传递给AI
  • 当AI调用RAG工具时,由其处理与Azure AI Search的交互
  • 将AI的语音响应及相关引用无缝传递给用户

RTMiddleTier的代码实现

RTMiddleTier实现在app/backend/rtmt.py中,使用aiohttp管理浏览器与Azure OpenAI实时API之间的WebSocket连接。

1. 初始化RTMiddleTier

首先在app/backend/app.py中配置RTMiddleTier的Azure OpenAI终端和凭证:

# app/backend/app.py(简化片段)
from rtmt import RTMiddleTierasync def create_app():# ...凭证设置...rtmt = RTMiddleTier(credentials=llm_credential, # Azure OpenAI凭证endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],deployment=os.environ["AZURE_OPENAI_REALTIME_DEPLOYMENT"],voice_choice=os.environ.get("AZURE_OPENAI_REALTIME_VOICE_CHOICE") or "alloy")# ...附加RAG工具和系统消息...# ...关联到web应用...

说明:此代码创建RTMiddleTier实例,配置其连接Azure OpenAI实时API所需的参数,包括语音选择(如"alloy")。

2. 关联到Web应用

RTMiddleTier需通过特定路径接收浏览器连接:

# app/backend/app.py(简化片段)
# ...其他导入...rtmt.attach_to_app(app, "/realtime") # 将RTMiddleTier关联到/realtime路径# ...其余应用设置...

说明:此行告知web应用,所有发送至/realtime的请求应由RTMiddleTier处理,建立WebSocket终端。

3. WebSocket处理器

当浏览器连接/realtime时,app/backend/rtmt.py中的_websocket_handler开始工作:

# app/backend/rtmt.py(简化片段)
# ...导入...class RTMiddleTier:# ...初始化代码...async def _websocket_handler(self, request: web.Request):ws = web.WebSocketResponse()await ws.prepare(request) # 准备WebSocket连接await self._forward_messages(ws) # 开始转发消息return ws

说明:此函数是新实时对话的入口,将请求升级为WebSocket连接后转由_forward_messages处理实际消息中继。

4. 消息转发与拦截

_forward_messages是核心代理逻辑,创建两个并行任务:监听浏览器到OpenAI的消息,以及OpenAI到浏览器的消息。关键是在发送前调用_process_message_to_server_process_message_to_client拦截并可能修改消息。

# app/backend/rtmt.py(简化片段)
import asyncio # 用于并发运行任务class RTMiddleTier:# ...其他方法...async def _forward_messages(self, ws: web.WebSocketResponse):# 建立与Azure OpenAI实时API的连接async with aiohttp.ClientSession(base_url=self.endpoint) as session:async with session.ws_connect("/openai/realtime", # 连接OpenAIheaders=headers, params=params) as target_ws:async def from_client_to_server():async for msg in ws: # 监听来自浏览器的消息if msg.type == aiohttp.WSMsgType.TEXT:new_msg = await self._process_message_to_server(msg, ws) # 拦截!if new_msg is not None:await target_ws.send_str(new_msg) # 发送至OpenAIasync def from_server_to_client():async for msg in target_ws: # 监听来自OpenAI的消息if msg.type == aiohttp.WSMsgType.TEXT:new_msg = await self._process_message_to_client(msg, ws, target_ws) # 拦截!if new_msg is not None:await ws.send_str(new_msg) # 发送至浏览器# 同时运行两个任务await asyncio.gather(from_client_to_server(), from_server_to_client())

说明:此函数创建两个并行"监听站"。发往OpenAI的消息经_process_message_to_server处理,来自OpenAI的消息经_process_message_to_client处理,实现"智能交通指挥"功能。

5. 拦截发往OpenAI的消息(_process_message_to_server

此函数拦截浏览器发送至OpenAI API的消息,主要任务是注入系统消息和可用RAG工具。

# app/backend/rtmt.py(简化片段)
class RTMiddleTier:system_message: Optional[str] = None # 在app.py中设置tools: dict[str, Tool] = {}         # 由attach_rag_tools附加async def _process_message_to_server(self, msg: str, ws: web.WebSocketResponse) -> Optional[str]:message = json.loads(msg.data)if message is not None and message["type"] == "session.update":session = message["session"]if self.system_message is not None:session["instructions"] = self.system_message # 注入系统消息# 若有工具则设置工具选择为'auto'session["tool_choice"] = "auto" if len(self.tools) > 0 else "none"session["tools"] = [tool.schema for tool in self.tools.values()] # 注入工具定义return json.dumps(message) # 发送修改后的消息return msg.data # 若非会话更新则发送原始消息

说明:当浏览器发起新会话或更新时,RTMiddleTier捕获该消息,用自定义system_message(见第2章)覆盖客户端指令,并向OpenAI API提供searchreport_grounding工具的模式定义,告知AI行为准则和可用工具。

6. 拦截来自OpenAI的消息(_process_message_to_client

这是最关键的环节,处理OpenAI的响应和工具调用。

# app/backend/rtmt.py(简化片段)
class RTMiddleTier:_tools_pending = {} # 跟踪工具调用async def _process_message_to_client(self, msg: str, client_ws: web.WebSocketResponse, server_ws: web.WebSocketResponse) -> Optional[str]:message = json.loads(msg.data)updated_message = msg.dataif message is None:return None # 无需处理if message["type"] == "response.output_item.done":item = message["item"]if item["type"] == "function_call": # OpenAI要调用工具!tool = self.tools[item["name"]] # 获取工具(如'search')args = item["arguments"] # 获取参数(如{"query": "RTMiddleTier"})# 执行工具并获取结果!result = await tool.target(json.loads(args)) # 将工具输出返回OpenAIawait server_ws.send_json({"type": "conversation.item.create","item": { "type": "function_call_output", "call_id": item["call_id"], "output": result.to_text() }})# 若工具结果需发送客户端(如引用信息),则转发if result.destination == ToolResultDirection.TO_CLIENT:await client_ws.send_json({"type": "extension.middle_tier_tool_response","tool_name": item["name"],"tool_result": result.to_text()})return None # 不向客户端转发原始工具调用return updated_message # 转发其他消息(如AI语音输出)

说明

  1. OpenAI调用工具:当OpenAI决定使用工具(如search("RTMiddleTier")),会向RTMiddleTier发送消息(即response.output_item.donetype: function_call
  2. RTMiddleTier拦截_process_message_to_client函数拦截此消息
  3. 执行工具:查找工具对应的Python函数(如第2章的_search_tool)并执行,传入OpenAI提供的参数(如搜索词)
  4. 返回结果至OpenAI:工具执行完成后(如Azure AI Search返回结果),RTMiddleTiertool_result返回OpenAI,使AI能继续思考过程
  5. 发送客户端结果:若工具输出与客户端相关(如report_grounding提供的引用信息),则转发至浏览器
  6. 隐藏内部工具调用:通过返回None,阻止原始"工具调用"消息到达浏览器,保持前端对话简洁

总结

RTMiddleTier是本应用的神经中枢,作为智能代理协调浏览器与Azure OpenAI API之间的实时对话流

通过拦截消息,它动态注入system_message并执行RAG工具,使AI能够与自定义知识库交互,实时提供有依据的相关答案。这一复杂中介机制真正实现了动态的、支持RAG的语音体验。

理解RTMiddleTier如何管理后端对话流后,我们将转向语音的采集与处理。在第4章:浏览器音频处理中,将深入探讨浏览器如何捕获你的音频并高效发送至RTMiddleTier

http://www.dtcms.com/a/466803.html

相关文章:

  • 美图秀秀“AI合照”功能风靡欧洲,荣登14国应用商店总榜第一
  • Arduino实战:智能家居控制系统的设计与实现
  • 网站seo评测常州中环做网站多少钱
  • 电影网站建设教程江苏常州建设局网站
  • 格式化json文件
  • PostgreSQL `pg_trgm` 性能调优与索引维护
  • 怎么找个人搭建网站网站h5什么意思
  • 基于单片机的多功能面粉面条馒头面点制作机设计
  • CMP平台(类Cloudera CDP7.3)在华为鲲鹏的Aarch64信创环境中的性能表现
  • HarmonyOS鸿蒙 - 获取设备唯一标识
  • 网站10月份可以做哪些有意思的专题天津网络优化招聘
  • [crackme]026-KeygenMe
  • next 项目中的 ‘use client‘ 是什么意思
  • 高通平台蓝牙学习--蓝牙双 A2DP/AVRCP 功能测试指南:从环境搭建到实操步骤
  • iOS 推送开发完整指南,APNs 配置、证书申请、远程推送实现与上架调试经验分享
  • 单线程拉取消息 + 自定义线程池处理消息,出现线程池超载解决
  • 无锡 网站开发网络优化需要哪些知识
  • 网站开发背景图模板网络培训学校排名
  • ByteDance——jy真题
  • 【原创】SpringBoot3+Vue3个人日记管理系统
  • 做网站需要哪些技术人员金华网站建设策划
  • 第6章 muduo网络库简介(1)
  • 应用层协议之DNS协议
  • AI多维回归模型追踪政策信号:威廉姆斯降息倾向的就业因子分析
  • 哈尔滨自助建站小企业网站建设论文
  • c++的‘-1/-0’用法
  • 苏州企业建设网站价格工会网站建设可以
  • 网站套餐到期是什么意思西安市网页制作公司有哪些
  • 网站设计的内容有哪些网络规划与设计毕业设计
  • 重载和继承的实践