流式事件与块响应接口设计流程拆解
01. 流式事件响应初识
流式事件响应(SSE,Server-Sent Event)是一种用于服务器向客户端推送数据的机制,它通过 HTTP 协议将 事件流 不断发送给客户端,典型的场景是实时更新数据,比如股票价格、社交媒体通知、聊天系统等。并且随着 ChatGPT 在 2022 年底的爆火,流式事件响应 也成为 LLM/Agent 应用的标配。
使用 流式事件响应 可以让 Agent/LLM 在漫长的生成过程中,每处理完一个部分的内容即进行响应,从而提升用户的体验感,不至于让用户一个请求等待数十秒乃至几分钟,简单来说,流式事件响应 会处理一部分即返回一部分,而不是等待所有内容都处理完毕后才返回。
例如 ChatGPT 应用在生成内容时,流式事件输出格式如下
event: delta_encoding
data: "v1"
event: delta
data: {"p": "", "o": "add", "v": {"message": {"id": "88085b01-824c-4eca-b7ae-59ba95f71f6d", "author": {"role": "system", "name": null, "metadata": {}}, "create_time": null, "update_time": null, "content": {"content_type": "text", "parts": [""]}, "status": "finished_successfully", "end_turn": true, "weight": 0.0, "metadata": {"is_visually_hidden_from_conversation": true}, "recipient": "all", "channel": null}, "conversation_id": "670e5b00-3368-8008-bc51-9b3c00547c52", "error": null}, "c": 0}
event: delta
data: {"v": {"message": {"id": "aaa2b3d5-23e0-4f20-ab70-3b85a876994b", "author": {"role": "user", "name": null, "metadata": {}}, "create_time": 1728994048.550613, "update_time": null, "content": {"content_type": "text", "parts": ["你能介绍下什么是LLM大语言模型么?"]}, "status": "finished_successfully", "end_turn": null, "weight": 1.0, "metadata": {"serialization_metadata": {"custom_symbol_offsets": []}, "request_id": "8d2fb0609f82d769-NRT", "message_source": null, "timestamp_": "absolute", "message_type": null}, "recipient": "all", "channel": null}, "conversation_id": "670e5b00-3368-8008-bc51-9b3c00547c52", "error": null}, "c": 1}
event: delta
data: {"v": {"message": {"id": "b7830f0a-353a-44a4-bcc3-4b9a304d2f9f", "author": {"role": "assistant", "name": null, "metadata": {}}, "create_time": 1728994048.639616, "update_time": null, "content": {"content_type": "text", "parts": [""]}, "status": "in_progress", "end_turn": null, "weight": 1.0, "metadata": {"citations": [], "content_references": [], "gizmo_id": null, "message_type": "next", "model_slug": "gpt-4o", "default_model_slug": "auto", "parent_id": "aaa2b3d5-23e0-4f20-ab70-3b85a876994b", "model_switcher_deny": []}, "recipient": "all", "channel": null}, "conversation_id": "670e5b00-3368-8008-bc51-9b3c00547c52", "error": null}, "c": 2}
event: delta
data: {"p": "/message/content/parts/0", "o": "append", "v": "大"}
event: delta
data: {"v": "语言"}
event: delta
data: {"v": "模型"}
这种格式使用 自定义流式事件,目前 Coze 也是使用这类格式
event:message
data:{"message":{"role":"user","type":"ack","content":"你好,你是?","content_type":"text","message_id":"7425982033576443915","reply_id":"7425982033576443915","section_id":"7425981978379304995","extra_info":{"local_message_id":"0Xb_YQomIZJioJx1P5gzx","input_tokens":"","output_tokens":"","token":"","plugin_status":"","time_cost":"","workflow_tokens":"","bot_state":"","plugin_request":"","tool_name":"","plugin":"","mock_hit_info":"","log_id":"2024101520425801F9908687A224C64D3A","stream_id":"","message_title":"","stream_plugin_running":"","new_section_id":"","remove_query_id":"","execute_display_name":"","task_type":"","refer_format":"","call_id":""},"status":"","sender_id":"7380392476970287142","content_time":1728996178862,"message_index":"25","source":0},"is_finish":true,"index":0,"conversation_id":"7380392338814320659","seq_id":0}
event:message
data:{"message":{"role":"assistant","type":"verbose","content":"{"msg_type":"time_capsule_recall","data":"{\"wraped_text\":\"# 以下信息来源于用户与你对话,如有助于提升当前回答的完整性、准确性和亲切性,请适当使用:\\n1. “用户编写的信息”:用户主动提供的信息,若其他信息与此类信息有冲突,请优先采纳此类信息。\\n2. “用户画像信息”:用户的个人信息和喜好。\\n3. “用户记忆点信息”:可能与当前会话相关的记忆,每个记忆前标记了用户提供该信息的日期。\\n4. 避免使用隐私和敏感信息,除非用户主动提及;避免生硬插入历史信息;避免使用语义不清或不完整的历史信息。\\n5. 特别注意:避免过度啰嗦或主动罗列过多历史信息,保持对话风格的简洁(尤其在打招呼、主动发起话题等场景)。\\n6. 若无特别说明,默认主语为用户。\\n\\n\\n## 用户记忆点信息\\n1. [2024年06月20日] 感觉有点压力\",\"origin_search_results\":\"[{\\\"text\\\":\\\"感觉有点压力\\\",\\\"event_ms\\\":1718819716908,\\\"ext\\\":{\\\"total_message_cnt\\\":\\\"3\\\",\\\"filter_model_score\\\":\\\"0.92752\\\",\\\"finish_ms\\\":\\\"1718819730231\\\"},\\\"tags\\\":[\\\"subtype-self_conscious\\\"],\\\"item_type\\\":\\\"chat.essential_memory_v2\\\"}]\"}","from_module":null,"from_unit":null}","content_type":"text","message_id":"7425982033576525835","reply_id":"7425982033576443915","section_id":"7425981978379304995","extra_info":{"local_message_id":"","input_tokens":"","output_tokens":"","token":"","plugin_status":"","time_cost":"0.3","workflow_tokens":"","bot_state":"{"bot_id":"7380392476970287142","agent_name":"聊天机器人","agent_id":"7380392476970287142","awaiting":"7380392476970287142"}","plugin_request":"","tool_name":"","plugin":"","mock_hit_info":"","log_id":"2024101520425801F9908687A224C64D3A","stream_id":"","message_title":"","stream_plugin_running":"","new_section_id":"","remove_query_id":"","execute_display_name":"","task_type":"","refer_format":"","call_id":""},"status":"","sender_id":"7380392476970287142","content_time":0,"message_index":"0","source":0},"is_finish":true,"index":1,"conversation_id":"7380392338814320659","seq_id":1}
event:message
data:{"message":{"role":"assistant","type":"verbose","content":"{"msg_type":"knowledge_recall","data":"{\"chunks\":[]}"}","content_type":"text","message_id":"7425982033576624139","reply_id":"7425982033576443915","section_id":"7425981978379304995","extra_info":{"local_message_id":"","input_tokens":"","output_tokens":"","token":"","plugin_status":"","time_cost":"0.2","workflow_tokens":"","bot_state":"{"bot_id":"7380392476970287142","agent_name":"聊天机器人","agent_id":"7380392476970287142","awaiting":"7380392476970287142"}","plugin_request":"","tool_name":"","plugin":"","mock_hit_info":"","log_id":"2024101520425801F9908687A224C64D3A","stream_id":"","message_title":"","stream_plugin_running":"","new_section_id":"","remove_query_id":"","execute_display_name":"","task_type":"","refer_format":"","call_id":""},"status":"","sender_id":"7380392476970287142","content_time":0,"message_index":"0","source":0},"is_finish":true,"index":2,"conversation_id":"7380392338814320659","seq_id":2}
event:message
data:{"message":{"role":"assistant","type":"answer","content":"你","content_type":"text","message_id":"7425982040492802084","reply_id":"7425982033576443915","section_id":"7425981978379304995","extra_info":{"local_message_id":"","input_tokens":"484","output_tokens":"0","token":"484","plugin_status":"","time_cost":"","workflow_tokens":"","bot_state":"{"bot_id":"7380392476970287142","agent_name":"聊天机器人","agent_id":"7380392476970287142","awaiting":"7380392476970287142"}","plugin_request":"","tool_name":"","plugin":"","mock_hit_info":"","log_id":"2024101520425801F9908687A224C64D3A","stream_id":"0c3b69ecd9aa4d9ab76855beb4552518","message_title":"","stream_plugin_running":"","new_section_id":"","remove_query_id":"","execute_display_name":"","task_type":"","refer_format":"","call_id":""},"status":"","sender_id":"7380392476970287142","content_time":1728996179997,"message_index":"26","source":0},"is_finish":false,"index":3,"conversation_id":"7380392338814320659","seq_id":3}
简化后的格式是
event: 自定义事件名称
data: 事件数据
event: 自定义事件名称
data: 事件数据
event: 自定义事件名称
data: 事件数据
Dify 使用的 流式事件响应 结构截取如下
data: {"event": "agent_thought", "conversation_id": "0aa1619a-f0ff-43b4-ad04-9d2ef0381bb1", "message_id": "72e5bbef-dc79-439e-aa6b-51833560f62a", "created_at": 1728996237, "task_id": "ab075c9b-4b58-46ba-8bd5-023456423697", "id": "f6b89ae5-542c-4e1f-8263-d4d9ae6381a6", "position": 1, "thought": "", "observation": "", "tool": "", "tool_labels": {}, "tool_input": "", "message_files": []}
data: {"event": "agent_message", "conversation_id": "0aa1619a-f0ff-43b4-ad04-9d2ef0381bb1", "message_id": "72e5bbef-dc79-439e-aa6b-51833560f62a", "created_at": 1728996237, "task_id": "ab075c9b-4b58-46ba-8bd5-023456423697", "id": "72e5bbef-dc79-439e-aa6b-51833560f62a", "answer": "\u4f60\u597d"}
data: {"event": "agent_message", "conversation_id": "0aa1619a-f0ff-43b4-ad04-9d2ef0381bb1", "message_id": "72e5bbef-dc79-439e-aa6b-51833560f62a", "created_at": 1728996237, "task_id": "ab075c9b-4b58-46ba-8bd5-023456423697", "id": "72e5bbef-dc79-439e-aa6b-51833560f62a", "answer": "\uff01"}
data: {"event": "agent_message", "conversation_id": "0aa1619a-f0ff-43b4-ad04-9d2ef0381bb1", "message_id": "72e5bbef-dc79-439e-aa6b-51833560f62a", "created_at": 1728996237, "task_id": "ab075c9b-4b58-46ba-8bd5-023456423697", "id": "72e5bbef-dc79-439e-aa6b-51833560f62a", "answer": "\u6211\u662f"}
这种方式使用默认的 消息事件,通过在 输出数据 结构内定义 事件,简化后的结构为
data: {"event": "事件名", xxx}
data: {"event": "事件名", xxx}
而在 Flask 中,要实现 流式事件响应 ,其实也非常简单,创建一个 生成器(使用yield关键字),然后使用 flask 内置的 Response 并在传递 上下文 的同时设置 mimetype 为 text/event-stream 即可,示例如下
from typing import Generator
from pkg.response import Response
from flask import FlaskResponse, stream_with_context
def compact_generate_response(response: Union[Response, Generator]) -> FlaskResponse:
"""统一合并处理块内容输出与流式事件输出"""
# 1.检测是否为块输出,直接调用json
if isinstance(response, Response):
return json(response)
else:
# 2.response格式为生成器,代表本次执行流式事件输出
def generate() -> Generator:
"""构建generate,流式从response中获取内容"""
yield from response
# 3.返回携带上下文的流式事件输出
return FlaskResponse(
stream_with_context(generate()),
status=200,
mimetype="text/event-stream",
)
两种流式事件响应结构,没有任何差异,具体使用取决于业务的选择,如果想使用 postman 调试简单一些,可以使用 自定义事件,前端交互逻辑写法简单一些,可以使用 默认事件。
02. 两种响应设计方案与运行流程
我们可以使用 .stream() 或者 .astream() 方法来流式生成对应的内容,不过对于多节点的应用,LangGraph图结构程序 的 stream() 方法仅仅是将 每个节点 执行完毕的内容进行输出,对于节点内部的代码并不会执行 流式事件响应。
所以借用 队列(Queue)+线程(Thread) 的方式来重新实现这个逻辑,思路如下:
- 构建一个 队列(Queue),用于存储数据,队列的数据先进先出,并且是线程安全的,非常适合用于开发。
- 在 LangGraph 构建的 节点(nodes) 中,所有代码都使用 stream() 代替 invoke(),获取数据的时候,将数据添加到 队列(Queue) 中。
- 创建一条线程,专门用于执行 LangGraph图程序,这样不影响主线程。
- 在主线程监听 队列(Queue) 里的数据,并逐个取出,然后进行流式事件响应输出,直到取到结果为 None 的内容结束请求。
基于该思想, 流式事件响应 + 块内容响应 的两种输出响应运行流程如下