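[Agent: cooragent] Structural Breakdown of _process_workflow
This function is the core of the whole architecture and also the hardest part to understand, so its flow is analyzed in detail below.
Without further ado, here is the code first (the analysis follows).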
async def _process_workflow(
    workflow: CompiledWorkflow, initial_state: dict[str, Any]
) -> AsyncGenerator[dict[str, Any], None]:
    """Process the event stream of a custom workflow."""
    current_node = None
    workflow_id = initial_state["workflow_id"]

    yield {  # value yielded at start_of_workflow
        "event": "start_of_workflow",
        "data": {"workflow_id": workflow_id, "input": initial_state["messages"]},
    }

    try:
        current_node = workflow.start_node
        state = State(**initial_state)

        while current_node != "__end__":
            agent_name = current_node
            logger.info(f"Started node: {agent_name}")

            yield {
                "event": "start_of_agent",
                "data": {
                    "agent_name": agent_name,
                    "agent_id": f"{workflow_id}_{agent_name}_1",
                },
            }

            node_func = workflow.nodes[current_node]
            command = await node_func(state)

            if hasattr(command, "update") and command.update:
                for key, value in command.update.items():
                    if key != "messages":
                        state[key] = value

                    if key == "messages" and isinstance(value, list) and value:
                        # State ignores coordinator messages, which not only lacks contextual benefits
                        # but may also cause other unpredictable effects.
                        if agent_name != "coordinator":
                            state["messages"] += value
                        last_message = value[-1]
                        if "content" in last_message:
                            if agent_name == "coordinator":
                                content = last_message["content"]
                                if content.startswith("handover"):
                                    # mark handoff, do not send messages
                                    global is_handoff_case
                                    is_handoff_case = True
                                    continue
                            if agent_name in ["planner", "coordinator", "agent_proxy"]:
                                content = last_message["content"]
                                chunk_size = 10  # send 10 characters per chunk
                                for i in range(0, len(content), chunk_size):
                                    chunk = content[i : i + chunk_size]
                                    if "processing_agent_name" in state:
                                        agent_name = state["processing_agent_name"]

                                    yield {
                                        "event": "messages",
                                        "agent_name": agent_name,
                                        "data": {
                                            "message_id": f"{workflow_id}_{agent_name}_msg_{i}",
                                            "delta": {"content": chunk},
                                        },
                                    }
                                    await asyncio.sleep(0.01)

                    if agent_name == "agent_factory" and key == "new_agent_name":
                        yield {
                            "event": "new_agent_created",
                            "agent_name": value,
                            "data": {
                                "new_agent_name": value,
                                "agent_obj": agent_manager.available_agents[value],
                            },
                        }

            yield {
                "event": "end_of_agent",
                "data": {
                    "agent_name": agent_name,
                    "agent_id": f"{workflow_id}_{agent_name}_1",
                },
            }

            next_node = command.goto
            current_node = next_node

        yield {
            "event": "end_of_workflow",
            "data": {
                "workflow_id": workflow_id,
                "messages": [{"role": "user", "content": "workflow completed"}],
            },
        }

        cache.dump(workflow_id, initial_state["workflow_mode"])

    except Exception as e:
        import traceback

        traceback.print_exc()
        logger.error("Error in Agent workflow: %s", str(e))
        yield {
            "event": "error",
            "data": {
                "workflow_id": workflow_id,
                "error": str(e),
            },
        }
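_process_workflow is an async generator that yields dictionaries to run_agent_workflow. There are 7 kinds of yielded values, one at each of the 7 yield statements.
The event type at each of these 7 yields is as follows:
The function contains a while loop, and each iteration covers one agent from start to finish. Outside the loop there is one yield before it and one after it, whose events are start_of_workflow and end_of_workflow respectively. Inside the loop, the first yield emits start_of_agent, the second emits messages, the third emits new_agent_created (a special case), and the fourth emits end_of_agent. The last yield, in the except branch, emits error.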
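To make the ordering of the events concrete, here is a minimal sketch that mirrors only the yield structure of the function. The names toy_process_workflow, TOY_NODES, and collect_events are hypothetical and exist only for this illustration; the real function works on State and command objects and emits messages conditionally.

```python
import asyncio
from typing import Any, AsyncGenerator

# Hypothetical stand-in for workflow.nodes: each node maps to (text, next_node).
TOY_NODES = {
    "coordinator": ("hello", "planner"),
    "planner": ("plan", "__end__"),
}


async def toy_process_workflow(start_node: str) -> AsyncGenerator[dict[str, Any], None]:
    """Simplified generator that reproduces only the order of the events."""
    yield {"event": "start_of_workflow"}
    try:
        current_node = start_node
        while current_node != "__end__":
            yield {"event": "start_of_agent", "agent_name": current_node}
            text, next_node = TOY_NODES[current_node]  # KeyError for unknown nodes
            yield {"event": "messages", "agent_name": current_node, "delta": text}
            yield {"event": "end_of_agent", "agent_name": current_node}
            current_node = next_node
        yield {"event": "end_of_workflow"}
    except Exception as e:
        yield {"event": "error", "error": str(e)}


async def collect_events(start_node: str) -> list[str]:
    """Drain the generator and keep only the event names."""
    return [ev["event"] async for ev in toy_process_workflow(start_node)]


print(asyncio.run(collect_events("coordinator")))
```

Each agent contributes one start_of_agent / end_of_agent pair, and the whole run is bracketed by start_of_workflow and end_of_workflow. If a node fails, the sequence ends with error and no end_of_workflow is emitted.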
The following sections analyze what is executed before each yield.
First:
current_node = None
workflow_id = initial_state["workflow_id"]

yield {  # value yielded at start_of_workflow
    "event": "start_of_workflow",
    "data": {"workflow_id": workflow_id, "input": initial_state["messages"]},
}
current_node is initialized to None because the first agent has not started executing yet.
Second:
try:
    current_node = workflow.start_node
    state = State(**initial_state)

    while current_node != "__end__":
        agent_name = current_node
        logger.info(f"Started node: {agent_name}")

        yield {
            "event": "start_of_agent",
            "data": {
                "agent_name": agent_name,
                "agent_id": f"{workflow_id}_{agent_name}_1",
            },
        }
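The code above shows the first assignment of the current node and the initialization of the state. After that, the node is updated only at the end of each while iteration; the two statements above the loop are never executed again.
Third, fourth, and fifth: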
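The node-walking pattern described here can be sketched in isolation. This is a simplified illustration, not cooragent's implementation: ToyCommand, NODES, and walk are hypothetical names, and a plain dict stands in for State.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToyCommand:
    """Hypothetical stand-in for the command object returned by a node."""
    goto: str
    update: dict[str, Any] = field(default_factory=dict)


async def coordinator(state: dict[str, Any]) -> ToyCommand:
    return ToyCommand(goto="planner", update={"seen_coordinator": True})


async def planner(state: dict[str, Any]) -> ToyCommand:
    return ToyCommand(goto="__end__", update={"plan": "step 1"})


NODES = {"coordinator": coordinator, "planner": planner}


async def walk(start_node: str, initial_state: dict[str, Any]) -> tuple[list[str], dict[str, Any]]:
    visited: list[str] = []
    current_node = start_node    # assigned once, before the loop
    state = dict(initial_state)  # built once, before the loop
    while current_node != "__end__":
        visited.append(current_node)
        command = await NODES[current_node](state)
        state.update(command.update)
        current_node = command.goto  # every later update happens here
    return visited, state


visited, final_state = asyncio.run(walk("coordinator", {"workflow_id": "wf-1"}))
print(visited, final_state)
```

The loop never needs a separate routing table: each node decides where to go next through the goto field of the command it returns, and "__end__" is the sentinel that stops the walk.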
node_func = workflow.nodes[current_node]
command = await node_func(state)

if hasattr(command, "update") and command.update:
    for key, value in command.update.items():
        if key != "messages":
            state[key] = value

        if key == "messages" and isinstance(value, list) and value:
            # State ignores coordinator messages, which not only lacks contextual benefits
            # but may also cause other unpredictable effects.
            if agent_name != "coordinator":
                state["messages"] += value
            last_message = value[-1]
            if "content" in last_message:
                if agent_name == "coordinator":
                    content = last_message["content"]
                    if content.startswith("handover"):
                        # mark handoff, do not send messages
                        global is_handoff_case
                        is_handoff_case = True
                        continue
                if agent_name in ["planner", "coordinator", "agent_proxy"]:
                    content = last_message["content"]
                    chunk_size = 10  # send 10 characters per chunk
                    for i in range(0, len(content), chunk_size):
                        chunk = content[i : i + chunk_size]
                        if "processing_agent_name" in state:
                            agent_name = state["processing_agent_name"]

                        yield {
                            "event": "messages",
                            "agent_name": agent_name,
                            "data": {
                                "message_id": f"{workflow_id}_{agent_name}_msg_{i}",
                                "delta": {"content": chunk},
                            },
                        }
                        await asyncio.sleep(0.01)

        if agent_name == "agent_factory" and key == "new_agent_name":
            yield {
                "event": "new_agent_created",
                "agent_name": value,
                "data": {
                    "new_agent_name": value,
                    "agent_obj": agent_manager.available_agents[value],
                },
            }

yield {
    "event": "end_of_agent",
    "data": {
        "agent_name": agent_name,
        "agent_id": f"{workflow_id}_{agent_name}_1",
    },
}
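This part awaits the current node's function, which calls the LLM and returns its result. The events that follow are not fixed and can differ from agent to agent.
For messages, the content is emitted piece by piece, 10 characters per chunk, and only for planner, coordinator, and agent_proxy (the third yield).
If agent_name == "agent_factory" and key == "new_agent_name", the fourth yield is emitted (new_agent_created).
end_of_agent is emitted once the update loop has finished, at the end of every iteration (the fifth yield).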
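The piece-by-piece output of messages can be sketched on its own. This is a minimal illustration under stated assumptions: stream_chunks and collect_chunks are hypothetical helper names, and the sleep is reduced to 0 so the example runs instantly (the original waits 0.01 s between chunks).

```python
import asyncio
from typing import Any, AsyncGenerator


async def stream_chunks(
    workflow_id: str, agent_name: str, content: str, chunk_size: int = 10
) -> AsyncGenerator[dict[str, Any], None]:
    """Yield the content as fixed-size character slices, one event per slice."""
    for i in range(0, len(content), chunk_size):
        yield {
            "event": "messages",
            "agent_name": agent_name,
            "data": {
                # The slice offset i makes each message_id unique within one message.
                "message_id": f"{workflow_id}_{agent_name}_msg_{i}",
                "delta": {"content": content[i : i + chunk_size]},
            },
        }
        await asyncio.sleep(0)  # hand control back to the event loop between chunks


async def collect_chunks(content: str) -> list[dict[str, Any]]:
    return [ev async for ev in stream_chunks("wf-1", "planner", content)]


chunk_events = asyncio.run(collect_chunks("abcdefghijklmnopqrstuvwxy"))
print([ev["data"]["delta"]["content"] for ev in chunk_events])
```

Note that the slicing is by character, not by word, so a 25-character string produces chunks of 10, 10, and 5 characters. A consumer can rebuild the full text by concatenating the delta contents in order.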
Sixth:
    # last two statements inside the while loop
    next_node = command.goto
    current_node = next_node

# after the while loop has exited
yield {
    "event": "end_of_workflow",
    "data": {
        "workflow_id": workflow_id,
        "messages": [{"role": "user", "content": "workflow completed"}],
    },
}
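The node update runs at the end of every iteration. When command.goto is "__end__", that update sets current_node to "__end__", the while loop exits, and the end_of_workflow yield is reached.
Seventh:
The seventh is the value yielded when an error occurs.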
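Because the exception is caught inside the generator and turned into an event, the consumer never sees a raised exception and has to inspect the event type instead. The sketch below illustrates this under assumptions: failing_workflow and consume are hypothetical names, and the RuntimeError simulates a node that fails.

```python
import asyncio
from typing import Any, AsyncGenerator


async def failing_workflow(workflow_id: str) -> AsyncGenerator[dict[str, Any], None]:
    """Toy generator whose only node fails, mirroring the try/except layout."""
    yield {"event": "start_of_workflow", "data": {"workflow_id": workflow_id}}
    try:
        raise RuntimeError("node failed")  # simulated failure inside a node
    except Exception as e:
        yield {
            "event": "error",
            "data": {"workflow_id": workflow_id, "error": str(e)},
        }


async def consume(workflow_id: str) -> list[str]:
    """Collect error messages by checking the event field of each item."""
    errors: list[str] = []
    async for ev in failing_workflow(workflow_id):
        if ev["event"] == "error":
            errors.append(ev["data"]["error"])
    return errors


print(asyncio.run(consume("wf-1")))
```

One consequence of this layout is that a failed run ends with an error event rather than end_of_workflow, so a consumer that waits for end_of_workflow should also treat error as a terminal event.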