当前位置: 首页 > news >正文

【智能体cooragent】_process_workflow 结构拆解分析

这个函数是整个架构的核心,也是理解起来最难的地方,下面详细分析一下它的流程。

啥也不说,先列代码(后面分析)

async def _process_workflow(workflow: CompiledWorkflow, initial_state: dict[str, Any]
) -> AsyncGenerator[dict[str, Any], None]:"""处理自定义工作流的事件流"""current_node = Noneworkflow_id = initial_state["workflow_id"]yield { # start_of_workflow 时的返回值"event": "start_of_workflow","data": {"workflow_id": workflow_id, "input": initial_state["messages"]},}try:current_node = workflow.start_nodestate = State(**initial_state)while current_node != "__end__":agent_name = current_nodelogger.info(f"Started node: {agent_name}")yield {"event": "start_of_agent","data": {"agent_name": agent_name,"agent_id": f"{workflow_id}_{agent_name}_1",},}node_func = workflow.nodes[current_node]command = await node_func(state)if hasattr(command, "update") and command.update:for key, value in command.update.items():if key != "messages":state[key] = valueif key == "messages" and isinstance(value, list) and value:# State ignores coordinator messages, which not only lacks contextual benefits# but may also cause other unpredictable effects.if agent_name != "coordinator":state["messages"] += valuelast_message = value[-1]if "content" in last_message:if agent_name == "coordinator":content = last_message["content"]if content.startswith("handover"):# mark handoff, do not send maesagesglobal is_handoff_caseis_handoff_case = Truecontinueif agent_name in ["planner", "coordinator", "agent_proxy"]:content = last_message["content"]chunk_size = 10  # send 10 words for each chunkfor i in range(0, len(content), chunk_size):chunk = content[i : i + chunk_size]if "processing_agent_name" in state:agent_name = state["processing_agent_name"]yield {"event": "messages","agent_name": agent_name,"data": {"message_id": f"{workflow_id}_{agent_name}_msg_{i}","delta": {"content": chunk},},}await asyncio.sleep(0.01)if agent_name == "agent_factory" and key == "new_agent_name":yield {"event": "new_agent_created","agent_name": value,"data": {"new_agent_name": value,"agent_obj": agent_manager.available_agents[value],},}yield {"event": "end_of_agent","data": {"agent_name": agent_name,"agent_id": f"{workflow_id}_{agent_name}_1",},}next_node = command.gotocurrent_node = next_nodeyield {"event": "end_of_workflow","data": {"workflow_id": workflow_id,"messages": [{"role": "user", "content": "workflow completed"}],},}cache.dump(workflow_id, initial_state["workflow_mode"])except Exception as e:import tracebacktraceback.print_exc()logger.error("Error in Agent workflow: %s", str(e))yield {"event": "error","data": {"workflow_id": workflow_id,"error": str(e),},}

_process_workflow 函数返回一个字典到 run_agent_workflow ,这个返回值共有 7 中类型,分别在7个 yeild 处。

下面分别说明这 7 个 yeild 返回时的 event 类型:

其中有一个 while 循环,其循环依次代表一个智能体的从开始到结束,在while循环的外面:循环上面有一个yeild,下面有一个yeild,它们返回时的event分别表示start_of_workflow 和 end_of_workflow,进入while循环,第一个 yeild 返回时表示此时的event为 start_of_agent ,第二个返回时表示此时的 event 是 messages ,第三个返回时表示此时的 event 是 new_agent_created(比较特殊),第四个返回时表示此时的 event 是 end_of_agent,最后一个是 error 时的返回。

下面分析每次yeild返回前所执行的事情:

第一个:

    current_node = Noneworkflow_id = initial_state["workflow_id"]yield { # start_of_workflow 时的返回值"event": "start_of_workflow","data": {"workflow_id": workflow_id, "input": initial_state["messages"]},}

给节点赋值为none,因为此时还没开始第一个智能体的执行。


第二个:

try:current_node = workflow.start_nodestate = State(**initial_state)while current_node != "__end__":agent_name = current_nodelogger.info(f"Started node: {agent_name}")yield {"event": "start_of_agent","data": {"agent_name": agent_name,"agent_id": f"{workflow_id}_{agent_name}_1",},}

上面代码中显示的是第一次给节点更新值(后面可能是在这个while循环结尾给节点更新值,不会再执行while循环上面的两句了),状态更新


第三、四、五个:

            node_func = workflow.nodes[current_node]command = await node_func(state)if hasattr(command, "update") and command.update:for key, value in command.update.items():if key != "messages":state[key] = valueif key == "messages" and isinstance(value, list) and value:# State ignores coordinator messages, which not only lacks contextual benefits# but may also cause other unpredictable effects.if agent_name != "coordinator":state["messages"] += valuelast_message = value[-1]if "content" in last_message:if agent_name == "coordinator":content = last_message["content"]if content.startswith("handover"):# mark handoff, do not send maesagesglobal is_handoff_caseis_handoff_case = Truecontinueif agent_name in ["planner", "coordinator", "agent_proxy"]:content = last_message["content"]chunk_size = 10  # send 10 words for each chunkfor i in range(0, len(content), chunk_size):chunk = content[i : i + chunk_size]if "processing_agent_name" in state:agent_name = state["processing_agent_name"]yield {"event": "messages","agent_name": agent_name,"data": {"message_id": f"{workflow_id}_{agent_name}_msg_{i}","delta": {"content": chunk},},}await asyncio.sleep(0.01)if agent_name == "agent_factory" and key == "new_agent_name":yield {"event": "new_agent_created","agent_name": value,"data": {"new_agent_name": value,"agent_obj": agent_manager.available_agents[value],},}yield {"event": "end_of_agent","data": {"agent_name": agent_name,"agent_id": f"{workflow_id}_{agent_name}_1",},}

调用大模型获得其推理结果,随后的event也是不定的,不同的智能体阶段可能不同。

如果是messages,会一点一点地输出。(第三个yeild)

如果 agent_name == "agent_factory" and key == "new_agent_name",则返回第四个yeild

如果是end_of_agent,会直接到end_of_agent地yeild返回。(第五个yeild)


第六个:

            next_node = command.gotocurrent_node = next_nodeyield {"event": "end_of_workflow","data": {"workflow_id": workflow_id,"messages": [{"role": "user", "content": "workflow completed"}],},}

current_node = "__end__"时,while循环结束,执行一下节点更新,然后返回yeild


第七个:

第七个是发生错误时的 yeild 返回值。

http://www.dtcms.com/a/311243.html

相关文章:

  • 一维dp-序列类型-最长有效括号
  • XGBoost三部曲:XGBoost参数详解
  • 机械臂的轨迹生成的多种方案
  • 信号完整性、电源完整性与电磁兼容的含义
  • Removing Digits(Dynamic Programming)
  • SEA-RAFT:更简单、更高效、更准确的RAFT架构
  • 人工智能与交通:智能出行的变革与未来
  • OneCode 3.0表达式从语法到执行的全链路设计
  • 解锁智能油脂润滑系统:加速度与温振传感器选型协同攻略
  • 【隧道篇 / IPsec】(7.6) ❀ 02. 如何删除向导创建的IPsec安全隧道 (点对点) ❀ FortiGate 防火墙
  • 阿里云:Ubuntu系统部署宝塔
  • 【Go语言-Day 29】从time.Now()到Ticker:Go语言time包实战指南
  • eSIM技术深度解析:从物理芯片到数字革命
  • SAP 标准代码测试OO ALV案例分享
  • ubuntu22.04离线一键安装gpu版docker
  • Unity —— Android 应用构建与发布​
  • 社群团购市场选择与开源技术赋能下的下沉市场开拓策略研究——以开源AI智能名片、链动2+1模式与S2B2C商城小程序为例
  • 苹果MAC 安卓模拟器
  • 2561. 重排水果
  • 48Days-Day12 | 添加字符,数组变换,装箱问题
  • 2025牛客暑期多校训练营1(G,E,L,K,I)
  • 力扣 hot100 Day63
  • (LeetCode 面试经典 150 题) 138. 随机链表的复制 (哈希表)
  • Jupyter notebook如何显示行号?
  • 邮科工业交换机:互联网世界的“隐形守护者”
  • 【DL学习笔记】计算图与自动求导
  • K8S部署ELK(一):部署Filebeat日志收集器
  • 红黑树(RBTree)
  • Redis面试精讲 Day 7:GEO地理位置应用详解
  • Mysql在页内是怎么查找数据的?