Gradio全解11——Streaming:流式传输的视频应用(9)——使用FastRTC+Gemini创建沉浸式音频+视频的艺术评论家
Gradio全解11——Streaming:流式传输的视频应用(9)——使用FastRTC+Gemini创建沉浸式音频+视频的艺术评论家
- 11.9 使用FastRTC+Gemini创建实时沉浸式音频+视频的艺术评论家
- 11.9.1 准备工作及音频图像编码器
- 1. 项目说明及准备工作
- 2. 音频和图像编码器
- 11.9.2 使用Gemini+fastrtc.Stream进行音视频处理
- 1. GeminiHandler:实时音视频发送与接受
- 2. 设置流媒体fastrtc.Stream对象并启动UI
- 11.9.3 Gradio.Blocks替换Stream.ui实现自定义界面
- 1. 实现代码及解读
- 2. 运行效果及参考资源
本章目录如下:
- 《Gradio全解11——Streaming:流式传输的视频应用(1)——FastRTC:Python实时通信库》
- 《Gradio全解11——Streaming:流式传输的视频应用(2)——Twilio:网络服务提供商》
- 《Gradio全解11——Streaming:流式传输的视频应用(3)——YOLO系列模型技术架构与实战》
- 《Gradio全解11——Streaming:流式传输的视频应用(4)——基于Gradio.WebRTC+YOLO的实时目标检测》
- 《Gradio全解11——Streaming:流式传输的视频应用(5)——RT-DETR:实时端到端检测模型》
- 《Gradio全解10——Streaming:流式传输的视频应用(6)——基于RT-DETR模型构建目标检测系统》
- 《Gradio全解11——Streaming:流式传输的视频应用(7)——多模态Gemini模型及其思考模式》
- 《Gradio全解11——Streaming:流式传输的视频应用(8)——Gemini Live API:实时音视频连接》
- 《Gradio全解11——Streaming:流式传输的视频应用(9)——使用FastRTC+Gemini创建沉浸式音频+视频的艺术评论家》
11.9 使用FastRTC+Gemini创建实时沉浸式音频+视频的艺术评论家
作为对本章知识的总结,本节将实现一个综合演示:让Gemini扮演艺术评论家,对用户通过FastRTC上传的艺术作品进行点评。本节内容包括准备工作及音频图像编码器、实现Gemini音视频处理程序使用gr.Blocks替换Stream.ui。
11.9.1 准备工作及音频图像编码器
本节先介绍项目及安装等准备工作,然后实现音频和图像编码器。
1. 项目说明及准备工作
FastRTC是一个支持通过WebRTC构建低延迟实时应用的库,Gemini是DeepMind发布的支持多模态和实时任务的大模型。本演示将完成以下工作:
- 将网络摄像头和麦克风数据流式传输至Gemini的实时会话。
- 定期发送视频帧(及可选上传图像)至模型。
- 实时流式返回模型的音频响应。
- 创建精美的全屏Gradio WebRTC用户界面。
在开始之前,需已安装Python>=3.10并获取GEMINI_API_KEY,并安装以下依赖:
pip install "fastrtc[vad, tts]" gradio google-genai python-dotenv websockets pillow
2. 音频和图像编码器
本节实现编码器功能,它将音频转换为base64编码数据,将图像转换为base64编码的JPEG格式,转换代码如下所示:
import base64
import numpy as np
from io import BytesIO
from PIL import Image
def encode_audio(data: np.ndarray) -> dict:"""Encode audio data (int16 mono) for Gemini."""return {"mime_type": "audio/pcm","data": base64.b64encode(data.tobytes()).decode("UTF-8"),}
def encode_image(data: np.ndarray) -> dict:with BytesIO() as output_bytes:pil_image = Image.fromarray(data)pil_image.save(output_bytes, "JPEG")bytes_data = output_bytes.getvalue()base64_str = str(base64.b64encode(bytes_data), "utf-8")return {"mime_type": "image/jpeg", "data": base64_str}
这段代码包含两个函数,分别用于音频和图像数据的编码处理,解读如下:
- encode_audio:接收int16格式的单声道音频numpy数组,将音频数据转换为字节流并进行base64编码,最后返回包含MIME类型和编码数据的字典。输出格式适用于Gemini系统。
- encode_image:接收图像数据的numpy数组,使用Pillow库将数组转为JPEG格式图像。将图像数据转换为字节流并进行base64编码,最后返回包含JPEG类型和编码数据的字典。
两个函数都实现了将原始二进制数据转换为base64编码字符串的功能,并附带相应的MIME类型信息,这种编码方式常用于网络传输或API交互场景。音频处理保持原始PCM格式,而图像处理则转换为JPEG格式进行压缩。
11.9.2 使用Gemini+fastrtc.Stream进行音视频处理
本节先详细讲述实时进行音视频发送与接受的GeminiHandler对象的处理逻辑,然后设置fastrtc.Stream并启动其UI进行展示。
1. GeminiHandler:实时音视频发送与接受
异步音视频流处理核心类GeminiHandler的实现代码如下所示:
import asyncio
import os
import time
import numpy as np
import websockets
from dotenv import load_dotenv
from google import genai
from fastrtc import AsyncAudioVideoStreamHandler, wait_for_item, WebRTCError
load_dotenv()
class GeminiHandler(AsyncAudioVideoStreamHandler):def __init__(self) -> None:super().__init__("mono",output_sample_rate=24000,input_sample_rate=16000,)self.audio_queue = asyncio.Queue()self.video_queue = asyncio.Queue()self.session = Noneself.last_frame_time = 0.0self.quit = asyncio.Event()async def start_up(self):await self.wait_for_args()api_key = self.latest_args[3]hf_token = self.latest_args[4]if hf_token is None or hf_token == "":raise WebRTCError("HF Token is required")os.environ["HF_TOKEN"] = hf_tokenclient = genai.Client(api_key=api_key, http_options={"api_version": "v1alpha"})config = {"response_modalities": ["AUDIO"], "system_instruction": "You are an art critic that will critique the artwork passed in as an image to the user. Critique the artwork in a funny and lighthearted way. Be concise and to the point. Be friendly and engaging. Be helpful and informative. Be funny and lighthearted."}async with client.aio.live.connect(model="gemini-2.0-flash-exp", # Replaceable version: gemini-2.0-flash. Latest version: gemini-live-2.5-flash-previewconfig=config,) as session:self.session = sessionwhile not self.quit.is_set():turn = self.session.receive()try:async for response in turn:if data := response.data:audio = np.frombuffer(data, dtype=np.int16).reshape(1, -1)self.audio_queue.put_nowait(audio)except websockets.exceptions.ConnectionClosedOK:print("connection closed")break# Video: receive and (optionally) send frames to Geminiasync def video_receive(self, frame: np.ndarray):self.video_queue.put_nowait(frame)if self.session and (time.time() - self.last_frame_time > 1.0):self.last_frame_time = time.time()await self.session.send(input=encode_image(frame))# If there is an uploaded image passed alongside the WebRTC component,# it will be available in latest_args[2]if self.latest_args[2] is not None:await self.session.send(input=encode_image(self.latest_args[2]))async def video_emit(self) -> np.ndarray:frame = await wait_for_item(self.video_queue, 0.01)if frame is not None:return frame# Fallback while waiting for first framereturn np.zeros((100, 100, 3), dtype=np.uint8)# Audio: forward microphone audio to Geminiasync def receive(self, frame: tuple[int, np.ndarray]) -> None:_, array = framearray = array.squeeze() # (num_samples,)audio_message = encode_audio(array)if self.session:await self.session.send(input=audio_message)# Audio: emit Gemini’s audio back to the clientasync def emit(self):array = await wait_for_item(self.audio_queue, 0.01)if array is not None:return (self.output_sample_rate, array)return arrayasync def shutdown(self) -> None:if self.session:self.quit.set()await self.session.close()self.quit.clear()
该类继承自AsyncAudioVideoStreamHandler,实现了一个基于Gemini Live API的、通过WebRTC进行实时音视频处理的双向流传输系统,它使用Gemini模型进行实时艺术评论(图像分析)和对话处理,核心功能解析如下:
- __init__(self)方法:初始化类参数。①音频队列(audio_queue)和视频队列(video_queue):使用asyncio实现非阻塞IO操作,用于异步数据交换。②quit事件:传输关闭控制机制。③采样率配置:支持16kHz输入/24kHz输出的音频流转换。
- start_up(self):初始化Gemini会话配置并连接,处理响应消息。 config:系统指令设置响应模态为AUDIO,并让模型扮演艺术评论角色。
client.aio.live.connect(...)
:配置模型参数(gemini-2.0-flash-exp),实时持续接收Gemini响应并存入客户端的音频队列audio_queue。 - 音频处理流程。receive():接受麦克风输入,编码后发送至Gemini。emit():从音频队列audio_queue获取Gemini响应音频,并返回给客户端处理。两个函数通过队列缓冲处理音频数据,最终实现音频流的双向传输。
- 视频处理流程。video_receive():接收摄像头视频流图像帧并存入video_queue,按1秒间隔发送至Gemini,且每秒最多发送一帧视频(避免对API造成过载)。同时可额外处理预设图像(latest_args[2] ),因此可选择同时发送上传的图像(gr.Image)和网络摄像头视频帧。video_emit():从视频队列video_queue获取视频帧输出(含空帧处理),将输出发送到客户端。注意:Gemini Live API只返回评论视频帧和图像帧的音频流,视频队列video_queue中的视频直接来自客户端的摄像头。
系统在初始化时加载环境变量,然后建立Gemini长连接并循环处理音视频数据流:每秒发送一帧视频进行分析,并实时双向传输音频,最后再关闭时清理会话资源。GeminiHandler类的模型版本可替换,支持自定义系统指令,其模块化的队列设计非常便于扩展 。
2. 设置流媒体fastrtc.Stream对象并启动UI
我们将在WebRTC组件旁添加一个可选的gr.Image输入组件,在向Gemini发送帧时,处理程序可通过self.latest_args[2]访问该图像。fastrtc.Stream代码如下所示:
import gradio as gr
from fastrtc import Stream, WebRTC, get_hf_turn_credentials
stream = Stream(handler=GeminiHandler(),modality="audio-video",mode="send-receive",server_rtc_configuration=get_hf_turn_credentials(ttl=600*10000),rtc_configuration=get_hf_turn_credentials(),additional_inputs=[gr.Markdown("## 🎨 Art Critic\n\n""Provide an image of your artwork or hold it up to the webcam, and Gemini will critique it for you.""To get a Gemini API key, please visit the [Gemini API Key](https://aistudio.google.com/apikey) page.""To get an HF Token, please visit the [HF Token](https://huggingface.co/settings/tokens) page."),gr.Image(label="Artwork", value="mona_lisa.jpg", type="numpy", sources=["upload", "clipboard"]),gr.Textbox(label="Gemini API Key", type="password"),gr.Textbox(label="HF Token", type="password"),],ui_args={"icon": "https://www.gstatic.com/lamda/images/gemini_favicon_f069958c85030456e93de685481c559f160ea06b.png","pulse_color": "rgb(255, 255, 255)","icon_button_color": "rgb(255, 255, 255)","title": "Gemini Audio Video Chat",},time_limit=90,concurrency_limit=5,
)
if __name__ == "__main__":stream.ui.launch()
在FastRTC的Stream中,设置各项配置,核心功能有:
- FastRTC流配置。首先,配置自定义的Gemini处理程序,设置为支持音视频模态及双向通信模式。然后,设置服务器端TURN和客户端TURN,使用Hugging Face的TURN服务器实现NAT穿透,10小时(600*10000毫秒)的凭证有效期确保长时会话稳定性。
- Gradio界面设计。首先,定义输入组件,包括Markdown说明文档、图像上传组件、和安全凭证输入。然后,实现UI定制,包括Gemini官方图标URL、按钮颜色及标题。
- 设置系统约束,包括单次会话最长90秒和最大并发连接数5。
该界面结合WebRTC+P2P通信减少中间环节延迟,实现低延迟架构。通过密码框保护API密钥,符合敏感信息处理规范。运行效果如图11-10所示:
11.9.3 Gradio.Blocks替换Stream.ui实现自定义界面
在11.10.2节中,使用fastrtc.Stream对象的默认ui启动,如果希望实现自定义界面且保留Stream设置,该怎么办呢?
1. 实现代码及解读
使用gr.Blocks替换Stream.ui实现代码如下所示:
stream = Stream(handler=GeminiHandler(),modality="audio-video",mode="send-receive",rtc_configuration=get_cloudflare_turn_credentials_async,time_limit=180 if get_space() else None,additional_inputs=[gr.Image(label="Image", type="numpy", sources=["upload", "clipboard"])],ui_args={"icon": "https://www.gstatic.com/lamda/images/gemini_favicon_f069958c85030456e93de685481c559f160ea06b.png","pulse_color": "rgb(255, 255, 255)","icon_button_color": "rgb(255, 255, 255)","title": "Gemini Audio Video Chat",},
)css = """
#video-source {max-width: 600px !important; max-height: 600 !important;}
"""
with gr.Blocks(css=css) as demo:gr.HTML("""<div style='display: flex; align-items: center; justify-content: center; gap: 20px'><div style="background-color: var(--block-background-fill); border-radius: 8px"><img src="https://www.gstatic.com/lamda/images/gemini_favicon_f069958c85030456e93de685481c559f160ea06b.png" style="width: 100px; height: 100px;"></div><div><h1>Gen AI SDK Voice Chat</h1><p>Speak with Gemini using real-time audio + video streaming</p><p>Powered by <a href="https://gradio.app/">Gradio</a> and <a href=https://freddyaboulton.github.io/gradio-webrtc/">WebRTC</a>⚡️</p><p>Get an API Key <a href="https://support.google.com/googleapi/answer/6158862?hl=en">here</a></p></div></div>""")with gr.Row() as row:with gr.Column():webrtc = WebRTC(label="Video Chat", modality="audio-video",mode="send-receive", elem_id="video-source",rtc_configuration=get_cloudflare_turn_credentials_async,icon="https://www.gstatic.com/lamda/images/gemini_favicon_f069958c85030456e93de685481c559f160ea06b.png",pulse_color="rgb(255, 255, 255)",icon_button_color="rgb(255, 255, 255)",)with gr.Column():image_input = gr.Image(label="Image", type="numpy", sources=["upload", "clipboard"])webrtc.stream(GeminiHandler(), inputs=[webrtc, image_input],outputs=[webrtc], time_limit=180 if get_space() else None,concurrency_limit=2 if get_space() else None)
stream.ui = demo
if __name__ == "__main__":if (mode := os.getenv("MODE")) == "UI":stream.ui.launch(server_port=7860)elif mode == "PHONE":raise ValueError("Phone mode not supported for this demo")else:stream.ui.launch(server_port=7860)
本段代码先定义fastrtc.Stream配置,然后构建Gradio界面,最后定义组件交互逻辑,详细解读如下:
- fastrtc.Stream流媒体配置。核心组件,处理音视频流传输和AI集成,使用GeminiHandler作为数据处理核心。支持双工通信模式(send-receive),使用Cloudflare TURN服务器处理NAT穿透。动态设置会话时长限制180秒,通过get_space()函数判断是否启用限制。
- Gradio界面构建。采用Blocks模式创建响应式布局。首先,通过CSS约束视频源尺寸(600x600像素)。然后,左侧显示Gemini品牌图标和产品标题,右侧提供API获取指引。最后在双列布局中,左列是实时音视频组件(WebRTC),右列是图像输入组件(支持上传/粘贴)。严格使用Gemini官方视觉元素(图标、配色),通过HTML/CSS实现品牌展示区。
- 组件交互逻辑。WebRTC组件绑定GeminiHandler处理器,同时接收音视频流和图像输入。设置时长限制和并发限制数,通过get_space()函数实现动态资源配置。启动时根据环境变量MODE选择UI模式或Phone模式(后者暂不支持)
2. 运行效果及参考资源
最终运行效果如图11-11所示:
如需进一步学习,请参考资源:
- Gemini音视频聊天示例——Gemini艺术评论家参考代码:gradio/Gemini-Art-Critic🖇️链接11-61。Gradio Blocks界面版本:fastrtc/gemini-audio-video🖇️链接11-62。
- FastRTC的Audio + Video音视频用户指南:🖇️链接11-63。
- Gradio参考资料:Create a Real-Time Immersive Audio + Video Demo with FastRTC🖇️链接11-64。