大模型及agent开发6 OpenAI Assistant API 高阶应用 - 流式输出功能
1.Assistant API 的主要优点:
减少编码工作量、自动管理上下文窗口、安全的访问控制、工具和文档的轻松集成
本节讲应用设计和性能
流式输出:借助流式输出,可以让应用程序实时处理和响应用户输入。具体来说,这种技术允许数据在生成的同时即刻传输和处理,而不必等待整个数据处理过程完成。这样,用户无需等待全部数据加载完成即可开始接收到部分结果,从而显著提高了应用的反应速度和交互流畅性。
eg:对话时回答是逐渐展现的。
在流式输出的实现方式中,我们需要在调用 client.chat.completions.create() 时添加 stream=True 参数,用以指定启用流式输出,从而允许 API 逐步发送生成的文本片段,然后使用 for 循环来迭代 completion 对象
code如下:
from openai import OpenAI
client = OpenAI()
completion = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "你是一位乐于助人的人工智能小助理。"},
{"role": "user", "content": "你好,请你介绍一下你自己。"}
],
stream=True
)
for chunk in completion:
print(chunk.choices[0].delta)
OpenAI 提供了两种主要的流实现方法:第一种基于 WebSocket 的直接实时通信方法,第二种是适用于本身不支持此类交互的平台的模拟流。两种流实现方法各有其特点和应用场景:
基于 WebSocket 的直接实时通信方法:
WebSocket 是一种网络通信协议,它允许在用户的浏览器和服务器之间建立一个不断开的连接,通过这个连接可以实现双向的数据传输。
模拟流
是为了在不支持 WebSocket 或其他实时协议的环境中提供类似的功能。这种方法通过定期发送HTTP请求来“模拟”一个持续的数据流。常见的实现方式包括长轮询和服务器发送事件(Server-Sent Events, SSE)。
在这两种方法中,模拟流更加常用,因为它依赖于标准的HTTP请求和响应,易于理解和实现。
二.具体实操:
1.Assistant API 如何开启流式传输
要在一个完整的生命周期内启用流式传输,可以分别在Create Run、 Create Thread and Run和 Submit Tool Outputs 这三个 API 端点中添加 "stream": True 参数。
这样设置后,API 返回的响应将是一个事件流。
首先,我们按照 Assistant API创建对话或代理的标准流程来构建应用实例。如下代码所示:
from openai import OpenAI
# 构建 OpenAI 客户端对象的实例
client = OpenAI()
# Step 1. 创建 Assistant 对象
assistant = client.beta.assistants.create(
model="gpt-4o-mini-2024-07-18",
name="Good writer", # 优秀的作家
instructions="You are an expert at writing excellent literature" # 你是一位善于写优秀文学作品的专家
)
# Step 2. 创建 Thread 对象
thread = client.beta.threads.create()
# Step 3. 向Thread中添加Message
message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="写一篇关于一个小女孩在森林里遇到一只怪兽的故事。详细介绍她的所见所闻,并描述她的心里活动"
)
在Create Run的过程中,添加stream=True参数,开启流媒体传输。代码如下:
run = client.beta.threads.runs.create(
assistant_id=assistant.id,
thread_id=thread.id,
stream=True # 开启流式传输
)
print(run)
for event in run:
print(event)
其返回的结果是一个 openai.Stream 对象。这个对象表示的是一个流式数据通道,可以用来接收连续传输的数据。
2.Assistant API 流式传输中的事件流
整个事件流的核心流程包括:当新的运行被创建时,发出 thread.run.created 事件;当运行完成时,发出 thread.run.completed 事件。在运行期间选择创建消息时,将发出一个 thread.message.created 事件、一个 thread.message.in_progress 事件、多个 thread.message.delta 事件,以及最终的 thread.message.completed 事件。
在处理过程中,任何需要的信息都可以通过访问数据结构的方式来获取。
run = client.beta.threads.runs.create(
assistant_id=assistant.id,
thread_id=thread.id,
stream=True # 开启流媒体传输
)
for event in run:
#print(event.event)
if event.event == 'thread.message.delta':
# 提取 text delta 的 value
value = event.data.delta.content[0].text.value
print(value)
按照相同的思路,我们还可以尝试测试Create Thread and Run和 Submit Tool Outputs两个端点在启用流媒体传输时的实现方法。
首先来看Create Thread and Run方法。这个方法很容易理解,它所做的事情就是把创建Thread对象实例、向Thread对象实例中追加Message信息以及构建Run状态这三个单独的步骤合并在了一个.beta.threads.create_and_run方法中,所以调用方法也发生了一定的变化,代码如下:
run = client.beta.threads.create_and_run(
assistant_id=assistant.id,
thread={
"messages": [
{"role": "user", "content": "写一篇歌颂中国的文章"}
]
},
stream=True,
)
for event in run:
print(event)
3.如何在函数调用中启用流式传输
这里我们定义一个外部函数get_current_weather,用来获取指定地点的当前天气状况。
import json
def get_current_weather(location, unit="celsius"):
"""Get the current weather in a given location in China"""
if "beijing" in location.lower():
return json.dumps({"location": "Beijing", "temperature": "15", "unit": unit})
elif "shanghai" in location.lower():
return json.dumps({"location": "Shanghai", "temperature": "20", "unit": unit})
elif "guangzhou" in location.lower():
return json.dumps({"location": "Guangzhou", "temperature": "25", "unit": unit})
else:
return json.dumps({"location": location, "temperature": "unknown"})
接下来,需要为 get_current_weather 函数编写一个 JSON Schema的表示。这个 Json Schema的描述将用于指导大模型何时以及如何调用这个外部函数。通过定义必要的参数、预期的数据格式和响应类型,可以确保大模型能够正确并有效地利用这个功用这个功能。定义如下:
get_weather_desc = {
"type": "function",
"function": {
"name": "get_current_weather",
"description": "Get the current weather in a given location",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g.beijing",
},
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
},
"required": ["location"],
},
},
}
然后,定义一个available_functions 字典,用来映射函数名到具体的函数对象,因为在大模型识别到需要调用外部函数的时候,我们需要动态地调用函数并获取到函数的执行结果。
available_functions = {
"get_current_weather": get_current_weather,
}
在准备好外部函数后,我们在定义 Assistant 对象时需要使用 tools 参数。此参数是一个列表的数据类型,我们要将可用的外部工具的 JSON Schema 描述添加进去,从而让 Assistant 可以正确识别这些工具,代码如下:
from openai import OpenAI
client = OpenAI()
# Step 1. 创建一个新的 assistant 对象实例
assistant = client.beta.assistants.create(
name="你是一个实时天气小助理",
instructions="你可以调用工具,获取到当前的实时天气,再给出最终的回复",
model="gpt-4o-mini-2024-07-18",
tools=[get_weather_desc],
)
# Step 2. 创建一个新的 thread 对象实例
thread = client.beta.threads.create()
然后,还是按照Assistant API的标准流程,将Messages追加到Thread中,并执行Run运行状态。代码如下:
# Step 3. 将消息追加到 Thread 中
message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="北京现在的天气怎么样?"
)
# Step 4. 执行 运行
run = client.beta.threads.runs.create(
thread_id=thread.id,
assistant_id=assistant.id,
stream=True
)
for event in run:
print(event)
从上述的输出中可以分析出:**与直接生成回复的流式过程相比,明显的区别体现在 `thread.run.step.delta` 事件中。**在这一事件中,增量更新的是大模型输出的参数`arguments`,这个参数中的内容是用于执行外部函数的,而非对“北京现在的天气怎么样?”这个提问的回答。此外,流程会暂停在 `thread.run.requires_action` 事件处,整个过程不会自动生成最终的回复,而是等待外部操作的完成。
因此要在操作中识别这两点
这里我们仍然可以通过直接提取数据结构的方式来获取到想要的信息。
# Step 2. 创建一个新的 thread 对象实例
thread = client.beta.threads.create()
# Step 3. 将消息追加到 Thread 中
message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="北京现在的天气怎么样?"
)
# Step 4. 执行 运行
run = client.beta.threads.runs.create(
thread_id=thread.id,
assistant_id=assistant.id,
stream=True
)
for event in run:
if event.event == 'thread.run.step.delta':
# 打印大模型输出的参数
print("Delta Event Arguments:")
print(event.data.delta.step_details.tool_calls[0].function.arguments)
print("--------------------------------------------------") # 添加分隔符
if event.event == 'thread.run.requires_action':
# 打印需要采取行动的详细信息
print("Requires Action Data:")
print(event.data)
print("--------------------------------------------------") # 添加分隔符
在 event.event == 'thread.run.requires_action' 事件中,当识别到需要执行的外部函数及其参数后,我们可以根据Function Calling中介绍的方法来执行这些函数。在流媒体的事件流中我们只要适当的进行修改,就可以完成这个过程。代码如下:
# Step 2. 创建一个新的 thread 对象实例
thread = client.beta.threads.create()
# Step 3. 将消息追加到 Thread 中
message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="北京现在的天气怎么样?"
)
# Step 4. 执行 运行
run = client.beta.threads.runs.create(
thread_id=thread.id,
assistant_id=assistant.id,
stream=True
)
tool_outputs = []
for event in run:
# print(f"event:{event}")
if event.event == 'thread.run.requires_action':
# 先拿到thread.run.requires_action事件的全部信息
function_info = event.data
print("--------------------------------------------------")
print(f"Function Info: {function_info}")
required_action = function_info.required_action
print("--------------------------------------------------")
print(f"Required Action: {required_action}")
tool_calls = required_action.submit_tool_outputs.tool_calls
print("--------------------------------------------------")
print(f"Tool Calls: {tool_calls}")
# 执行外部函数,使用循环是因为`Assistant API` 可以并行执行多个函数
for tool_call in tool_calls:
tool_id = tool_call.id
function = tool_call.function
function_name = function.name
function_args = json.loads(function.arguments)
function_result = available_functions[function_name](**function_args)
print("--------------------------------------------------")
print(f"Function Result for {function_name}: {function_result}")
tool_outputs.append({"tool_call_id": tool_id, "output":function_result})
print("--------------------------------------------------")
print(f"Tool Outputs: {tool_outputs}")
当获取到外部函数的执行结果后,需要将这些结果再次追加到 Thread 中,使其再次进入到队列中,以继续回答北京现在的天气怎么样?这个原始的问题。正如我们在上节课中介绍的,使用.beta.threads.runs.submit_tool_outputs方法用于提交外部工具的输出,此方法也支持流媒体输出,如果在这个阶段需要开启流媒体传输,我们就需要使用 stream=True 参数来明确指定。代码如下:
# Step 2. 创建一个新的 thread 对象实例
thread = client.beta.threads.create()
# Step 3. 将消息追加到 Thread 中
message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="北京现在的天气怎么样?"
)
# Step 4. 执行 运行
run = client.beta.threads.runs.create(
thread_id=thread.id,
assistant_id=assistant.id,
stream=True
)
tool_outputs = []
for event in run:
# print(f"event:{event}")
if event.event == 'thread.run.requires_action':
# 先拿到thread.run.requires_action事件的全部信息
function_info = event.data
print("--------------------------------------------------")
print(f"Function Info: {function_info}")
required_action = function_info.required_action
print("--------------------------------------------------")
print(f"Required Action: {required_action}")
tool_calls = required_action.submit_tool_outputs.tool_calls
print("--------------------------------------------------")
print(f"Tool Calls: {tool_calls}")
# 执行外部函数,使用循环是因为`Assistant API` 可以并行执行多个函数
for tool_call in tool_calls:
tool_id = tool_call.id
function = tool_call.function
function_name = function.name
function_args = json.loads(function.arguments)
function_result = available_functions[function_name](**function_args)
print("--------------------------------------------------")
print(f"Function Result for {function_name}: {function_result}")
tool_outputs.append({"tool_call_id": tool_id, "output":function_result})
print("--------------------------------------------------")
print(f"Tool Outputs: {tool_outputs}")
run_tools = client.beta.threads.runs.submit_tool_outputs(
thread_id=thread.id,
run_id=function_info.id,
tool_outputs=tool_outputs,
stream=True)
for event_tool in run_tools:
# print(f"event_tool:{event_tool}")
if event_tool.event == 'thread.message.delta':
# 提取 text delta 的 value
text = event_tool.data.delta.content[0].text.value
print("Delta Text Output:", text)
print("--------------------------------------------------") # 添加分隔符
if event_tool.event == 'thread.message.completed':
# 提取 text delta 的 value
full_text = event_tool.data.content[0].text.value
print("Completed Message Text:", full_text)
print("--------------------------------------------------") # 添加分隔符
由此可见,当流式传输中涉及到函数调用的中间过程时,实际上也是发生了两次`Run`操作。为了使第二个`Run`能够接续第一个`Run`的输出,关键在于两者之间需要共享第一个`Run`的`Run id`。此外,在第二个`Run`的运行状态中需要设置`stream=True`以启用流式传输,确保两个`Run`状态的输出能够有效链接,形成一次连贯的响应。整个过程的事件流非常多,需要大家仔细体会。