【智能体cooragent】新智能体创建相关代码解析
这里的新智能体创建使用的函数是 agent_factory_node ,它创建出来的智能体(输出):它的格式如下:
Agent(user_id='test', agent_name='traffic_planner', nick_name='traffic_planner', description='交通规划智能体,整合行程信息或相关智能体输出,安排出发/到达时间、路线规划、票价计算,生成详尽的交通计划。', llm_type=<LLMType.REASONING: 'reasoning'>, selected_tools=[Tool(name='maps_direction_driving', description='驾车路径规划 API 可以根据用户起终点经纬度坐标规划以小客车、轿车通勤出行的方案,并且返回通勤方案的数据。'), Tool(name='maps_direction_transit_integrated', description='根据用户起终点经纬度坐标规划综合各类公共(火车、公交、地铁)交通方式的通勤方案,并且返回通勤方案的数据,跨城场景下必须传起点城市与终点城市'), Tool(name='searchFlightItineraries', description='Search for purchasable flight options and the lowest price using the departure city three-letter code, arrival city three-letter code, and departure date. (e.g. BJS for Beijing, SHA for Shanghai, CAN for Guangzhou, HFE for Hefei).'), Tool(name='tavily_tool', description='A search engine optimized for comprehensive, accurate, and trusted results. Useful for when you need to answer questions about current events. Input should be a search query.')], prompt='# Role: 交通规划智能体 (Traffic Planner)\你是一个专门负责交通规划的智能体,名为 `traffic_planner`。你的核心功能是整合行程信息或者从其他智能体接收到的输出内容,安排出发/到达时间、路线规划、票价计算,并生成一份详细的交通计划。\# 步骤:\1. **接收输入**: \\\\- 输入可以包括行程详情(例如起点、终点、日期)、其他智能体的处理结果,或者其他相关信息。\2. **分析需求并调用工具**:\\\\- 使用 `maps_direction_driving` 工具规划驾车路线,若用户要求驾车方案。\\\\- 使用 `maps_direction_transit_integrated` 工具规划公共交通路线,涵盖火车、公交和地铁等多模式通勤选项。\\\\- 如果涉及航空旅行,则使用 `searchFlightItineraries` 获取不同航班的可购选项及最低价格。\\\\- 使用 `tavily_tool` 搜索实时票价、交通规则或任何额外的背景信息。\3. **生成交通计划**:\\\\- 根据所有收集到的信息,决定最佳的出发时间和到达时间。\\\\- 计算总费用,确保包含所有可能的隐性成本(如燃油费、过路费等)。\\\\- 将路线按优先级排序,考虑时效性和经济性。\4. **输出详细交通计划**:\\\\- 输出应为一份清晰易读的 Markdown 报告,包含以下部分:\\\\- 出发时间与到达时间\\\\- 路线建议(具体路线描述、交通工具)\\\\- 费用明细(票价、附加费用等)\# 注意事项:\- 确保所有输出的时间均为本地时区时间。\- 在提供多种选项时,需明确标注每种方案的优劣点(如最快、最便宜等)。\\\\- 若有不确定性(如实时交通变化),应在报告中说明可能存在变动。\\\\- 最终输出的报告需保持一致的格式,方便阅读与后续使用。')
随后在 _run_agent_workflow 中将这个格式转换为 .json 格式,
代码为
if event_type == "new_agent_created" and "data" in res and "agent_obj" in res["data"]:agent_obj: BaseModel = res["data"]["agent_obj"]agent_json = agent_obj.model_dump_json(indent=2) if agent_obj else Noneif agent_json:res["data"]["agent_obj"] = agent_jsonelse:logger.warning("Could not serialize agent object for new_agent_created event.")if "agent_obj" in res["data"]: del res["data"]["agent_obj"]
结果为:(转化前后对比)
序列化前 (原始对象):
# 复杂的 Agent 对象,无法直接传输或显示
agent_obj = Agent(agent_name="researcher",description="...",tools=[...],# ... 其他属性
)
序列化后 (JSON 字符串):
{"agent_name": "researcher","nick_name": "researcher", "description": "This agent specializes in research tasks...","user_id": "share","llm_type": "basic","selected_tools": [...],"prompt": "..."
}