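AutoGen Study Notes Series (10): Advanced - Swarm
This post covers the Swarm chapter in the Advanced section of the official AutoGen tutorial, which explains how Agents in a Team can hand control over to one another at the right moment.
- Official link: https://microsoft.github.io/autogen/stable/user-guide/agentchat-user-guide/swarm.html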
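Swarm
Swarm is a special kind of Team in which an Agent can hand off its task to another Agent by calling a special tool. The pattern was first introduced by OpenAI in its multi-agent design work, and Microsoft adopts the same idea here. As with the other Team types, all Agents in the Team share one context. The biggest difference from the SelectorGroupChat we used earlier is that tasks are handed off by the Agents themselves.
[Note]: The key point is that an Agent gives up control on its own initiative (a hand off), whereas the SelectorGroupChat from the previous post relies on an LLM or a selector function to decide which Agent speaks next.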
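How Does It Work?
At its core, a Swarm is a Team whose Agents take turns generating responses. As in SelectorGroupChat and RoundRobinGroupChat, every Agent broadcasts its messages, so all Agents share the same context.
The difference is that a Swarm picks the next speaker from the most recent HandoffMessage in the context, which means every Agent must be able to produce a HandoffMessage. For AssistantAgent, setting the handoffs parameter is enough to let it hand off tasks. The overall flow is roughly as follows: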
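- For Agents to be able to hand off a task, each of them must be able to generate messages of type HandoffMessage. This mainly matters when you write a custom Agent; for AssistantAgent, setting the handoffs parameter is all it takes;
- When the Team starts on a task, the first Agent interacts with the LLM and then decides locally whether to hand its task off to another Agent;
- When an Agent decides to hand off its current task, it emits a HandoffMessage that names the receiving Agent, which takes over with the same message context;
- The Team keeps running until a termination condition is triggered;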
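To make the flow above concrete, here is a minimal sketch in plain Python. It does not use AutoGen at all: Message, ToyAgent, next_speaker and run_swarm are names I made up for illustration, and the two toy agents are hard-coded functions rather than LLM calls. It only shows the selection rule, namely that the target of the latest handoff in the shared context speaks next.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional


@dataclass
class Message:
    source: str
    content: str
    handoff_target: Optional[str] = None  # set only for handoff messages


# A toy agent is a function from the shared context to its next message.
ToyAgent = Callable[[List[Message]], Message]


def next_speaker(context: List[Message], first: str) -> str:
    """Return the target of the latest handoff, or the first agent if none."""
    for msg in reversed(context):
        if msg.handoff_target is not None:
            return msg.handoff_target
    return first


def run_swarm(agents: Dict[str, ToyAgent], first: str, task: str, max_turns: int = 10) -> List[Message]:
    context = [Message("user", task)]
    for _ in range(max_turns):
        speaker = next_speaker(context, first)
        reply = agents[speaker](context)
        context.append(reply)  # every message is visible to all agents
        if "TERMINATE" in reply.content:
            break
    return context


def triage(context: List[Message]) -> Message:
    # Hand off once; after the specialist reports back, finish.
    if any(m.source == "specialist" and m.handoff_target == "triage" for m in context):
        return Message("triage", "All done. TERMINATE")
    return Message("triage", "Passing to specialist", handoff_target="specialist")


def specialist(context: List[Message]) -> Message:
    return Message("specialist", "Work finished", handoff_target="triage")


history = run_swarm({"triage": triage, "specialist": specialist}, "triage", "Please help")
print([m.source for m in history])
```

Because an agent without a new handoff simply keeps the floor, the only way control moves in this sketch is through an explicit handoff, which is the property that separates Swarm from a selector-driven Team.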
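[Note]: The official docs add a caveat here about parallel tool calls. AssistantAgent produces handoffs through the model's tool-calling ability, and some models can call several tools in a single response. When that happens, the model may generate multiple HandoffMessage candidates at once, which can lead to unpredictable results. The two model clients shipped with AutoGen, OpenAIChatCompletionClient and AzureOpenAIChatCompletionClient, let you disable parallel tool calls by setting parallel_tool_calls=False. In other words, if you plug in your own model client, you need to find a way to disable parallel tool calls yourself.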
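If you do write your own client, one possible safeguard is to check each batch of tool calls before acting on it. The sketch below is my own illustration, not AutoGen's behavior: single_handoff_target is a hypothetical helper, and it assumes handoff tools are named with a transfer_to_ prefix, as the run logs later in this post suggest.

```python
from typing import List, Optional

HANDOFF_PREFIX = "transfer_to_"  # assumed naming convention for handoff tools


def single_handoff_target(tool_call_names: List[str]) -> Optional[str]:
    """Return the one handoff target in a batch of tool calls, or None.

    Raises ValueError when the batch contains more than one handoff,
    because the next speaker would then be ambiguous.
    """
    targets = [
        name[len(HANDOFF_PREFIX):]
        for name in tool_call_names
        if name.startswith(HANDOFF_PREFIX)
    ]
    if len(targets) > 1:
        raise ValueError(f"ambiguous handoff, multiple targets requested: {targets}")
    return targets[0] if targets else None


print(single_handoff_target(["refund_flight"]))
print(single_handoff_target(["refund_flight", "transfer_to_user"]))
```

Failing loudly on two handoffs in one batch is a deliberate choice in this sketch: silently picking one of them would hide exactly the unpredictable behavior the note warns about.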
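Customer Support Example
The first demo in the official docs is a flight refund scenario in which Agents hand off to one another with a human in the loop. The Agents in the diagram play the following roles:
- Travel Agent: handles travel planning and coordinates refunds;
- Flights Refunder: carries out refunds with the refund_flight tool;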
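Workflow
The demo runs roughly as follows:
- The Travel Agent starts the task and evaluates the user's request;
- Depending on the request, one of two things happens:
  - Refund-related: the task is handed off to the Flights Refunder;
  - More information needed from the user: the current Agent hands off to user;
- Once the refund request is approved, the Flights Refunder processes it with the dedicated refund_flight tool;
- When the user replies, the input is passed back to the Team as a HandoffMessage. The message goes straight to the Agent that asked for the input: whichever Agent triggered the human-in-the-loop step receives the reply, regardless of which other Agents took part in between;
- The run ends when the Travel Agent decides the task is complete or a termination condition of the Team is triggered;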
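The routing rule for user input can be sketched without AutoGen. ToyHandoff and build_user_reply below are hypothetical names of mine; the sketch only mirrors the idea that the reply's target is the source of the message that handed off to user.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyHandoff:
    source: str
    target: str
    content: str


def build_user_reply(last_message: ToyHandoff, user_text: str) -> Optional[ToyHandoff]:
    """Wrap the user's text in a handoff aimed at the agent that asked for it."""
    if last_message.target != "user":
        return None  # nobody is waiting for the user
    return ToyHandoff(source="user", target=last_message.source, content=user_text)


request = ToyHandoff(source="flights_refunder", target="user", content="Which flight?")
reply = build_user_reply(request, "Flight 507811")
print(reply)
```

The full demo further down applies the same rule with the real HandoffMessage class, passing target=last_message.source when it resumes the Team.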
Tools & Agents
The workflow above uses the refund_flight tool to process refunds. It is defined as follows:
def refund_flight(flight_id: str) -> str:
    """Refund a flight"""
    return f"Flight {flight_id} refunded"
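- Define the model. We keep using gpt-4o here; for a moderately complex task like this, a larger model reduces the chance of the Team getting stuck in repeated loops:
model_client = OpenAIChatCompletionClient(
    model="gpt-4o",
    # api_key="YOUR_API_KEY",
)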
- The Travel Agent uses the handoffs parameter to declare which targets it may hand a task off to, here flights_refunder and user:
travel_agent = AssistantAgent(
    "travel_agent",
    model_client=model_client,
    handoffs=["flights_refunder", "user"],
    system_message="""You are a travel agent.
    The flights_refunder is in charge of refunding flights.
    If you need information from the user, you must first send your message, then you can handoff to the user.
    Use TERMINATE when the travel planning is complete.""",
)
- The Flights Refunder likewise uses handoffs to specify its handoff targets:
flights_refunder = AssistantAgent(
    "flights_refunder",
    model_client=model_client,
    handoffs=["travel_agent", "user"],
    tools=[refund_flight],
    system_message="""You are an agent specialized in refunding flights.
    You only need flight reference numbers to refund a flight.
    You have the ability to refund a flight using the refund_flight tool.
    If you need information from the user, you must first send your message, then you can handoff to the user.
    When the transaction is complete, handoff to the travel agent to finalize.""",
)
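- Define the termination condition. The Team stops when an Agent hands off to user or when the TERMINATE keyword is detected. The first case pauses the run so the user can supply input, and the second marks the point where the Travel Agent considers the task finished:
termination = HandoffTermination(target="user") | TextMentionTermination("TERMINATE")
team = Swarm([travel_agent, flights_refunder], termination_condition=termination)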
- Define the task:
task = "I need to refund my flight."
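The termination condition in the steps above combines two checks with the | operator. As a rough sketch of how such an OR-composition can work, here is a toy version in plain Python. Msg, Condition, handoff_to and text_mention are stand-ins I invented for illustration; they are not AutoGen classes and say nothing about how AutoGen implements its conditions internally.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Msg:
    source: str
    content: str
    handoff_target: Optional[str] = None


class Condition:
    """Toy termination condition: wraps a predicate over one message."""

    def __init__(self, check: Callable[[Msg], bool]) -> None:
        self._check = check

    def __call__(self, msg: Msg) -> bool:
        return self._check(msg)

    def __or__(self, other: "Condition") -> "Condition":
        # The combined condition fires as soon as either side fires.
        return Condition(lambda m: self(m) or other(m))


def handoff_to(target: str) -> Condition:
    return Condition(lambda m: m.handoff_target == target)


def text_mention(text: str) -> Condition:
    return Condition(lambda m: text in m.content)


stop = handoff_to("user") | text_mention("TERMINATE")

print(stop(Msg("travel_agent", "Which flight?", handoff_target="user")))
print(stop(Msg("travel_agent", "Refund done. TERMINATE")))
print(stop(Msg("travel_agent", "Working on it")))
```

The first two messages stop the toy run and the third does not, which matches the two exits of the demo: a pause for user input and a finished task.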
Complete code:
from typing import Any, Dict, List
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import HandoffTermination, TextMentionTermination
from autogen_agentchat.messages import HandoffMessage
from autogen_agentchat.teams import Swarm
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient
import asyncio, os
os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"
#-------------------------------------------------------#
# Step1. Define the refund tool
def refund_flight(flight_id: str) -> str:
    """Refund a flight"""
    return f"Flight {flight_id} refunded"
#-------------------------------------------------------#
# Step2. Define the model
model_client = OpenAIChatCompletionClient(
    model="gpt-4o",
    # api_key="YOUR_API_KEY",
)
#-------------------------------------------------------#
# Step3. Define the Travel Agent
travel_agent = AssistantAgent(
    "travel_agent",
    model_client=model_client,
    handoffs=["flights_refunder", "user"],
    system_message="""You are a travel agent.
    The flights_refunder is in charge of refunding flights.
    If you need information from the user, you must first send your message, then you can handoff to the user.
    Use TERMINATE when the travel planning is complete.""",
)
#-------------------------------------------------------#
# Step4. Define the Flights Refunder
flights_refunder = AssistantAgent(
    "flights_refunder",
    model_client=model_client,
    handoffs=["travel_agent", "user"],
    tools=[refund_flight],
    system_message="""You are an agent specialized in refunding flights.
    You only need flight reference numbers to refund a flight.
    You have the ability to refund a flight using the refund_flight tool.
    If you need information from the user, you must first send your message, then you can handoff to the user.
    When the transaction is complete, handoff to the travel agent to finalize.""",
)
#-------------------------------------------------------#
# Step5. Define the termination condition and the team
termination = HandoffTermination(target="user") | TextMentionTermination("TERMINATE")
team = Swarm([travel_agent, flights_refunder], termination_condition=termination)
#-------------------------------------------------------#
# Step6. Submit the task
task = "I need to refund my flight."
async def run_team_stream() -> None:
    task_result = await Console(team.run_stream(task=task))
    last_message = task_result.messages[-1]
    # Keep resuming the team for as long as an agent is waiting on user input
    while isinstance(last_message, HandoffMessage) and last_message.target == "user":
        user_message = input("User: ")
        # Send the reply back to the agent that handed off to the user
        task_result = await Console(
            team.run_stream(task=HandoffMessage(source="user", target=last_message.source, content=user_message))
        )
        last_message = task_result.messages[-1]
asyncio.run(
    run_team_stream()
)
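The output is shown below. Note in this example that after the task is handed off and completed, control returns along the same path it came. This hides the internal details from the outside: if you want a refund for a flight, you do not need to know which departments the airline routed it through, only that the money came back: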
$ python demo.py
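Stock Research Example
The second task in the official docs is stock analysis. The overall structure is shown in the figure below and consists of the following modules:
- Planner: assigns each specific task to the Agent whose expertise fits it, makes sure every Agent is used efficiently, and oversees the whole process;
- Financial Analyst: analyzes financial metrics and stock data with the get_stock_data tool;
- News Analyst: collects and summarizes stock-related news with the get_news tool;
- Writer: compiles the findings of the two Agents above into a report;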
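Workflow
The overall flow for this task is as follows:
- The Planner kicks off the task and delegates it step by step to the appropriate Agent;
- Each Agent completes its own task independently, then appends its results to the shared context, which it can read whenever it queries the LLM. Because an Agent may need several rounds with the LLM to reach a final result, it does not report back to the Planner after every query, only once its task is complete;
- As soon as an Agent finishes its assigned task, it hands control back to the Planner;
- The loop continues until a termination condition is triggered or the Planner decides the work is done;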
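The workflow above is a hub-and-spoke layout: the Planner can reach every specialist, and every specialist can only hand back to the Planner. Before building the Team it can help to sanity-check the handoff lists. The two helpers below are hypothetical utilities of my own, not part of AutoGen; the agent names are the ones used in this demo.

```python
from typing import Dict, Iterable, List


def find_unknown_targets(handoffs: Dict[str, List[str]], external: Iterable[str] = ()) -> Dict[str, List[str]]:
    """Map each agent to the handoff targets that are not valid recipients."""
    known = set(handoffs) | set(external)
    problems: Dict[str, List[str]] = {}
    for agent, targets in handoffs.items():
        missing = [t for t in targets if t not in known]
        if missing:
            problems[agent] = missing
    return problems


def is_hub_and_spoke(handoffs: Dict[str, List[str]], hub: str) -> bool:
    """True when the hub reaches every other agent and each of them only hands back to the hub."""
    spokes = [name for name in handoffs if name != hub]
    hub_reaches_all = set(handoffs[hub]) == set(spokes)
    spokes_return = all(handoffs[name] == [hub] for name in spokes)
    return hub_reaches_all and spokes_return


research_handoffs = {
    "planner": ["financial_analyst", "news_analyst", "writer"],
    "financial_analyst": ["planner"],
    "news_analyst": ["planner"],
    "writer": ["planner"],
}

print(find_unknown_targets(research_handoffs))
print(is_hub_and_spoke(research_handoffs, "planner"))
```

For a Team that also hands off to a human, such as the refund demo, pass external=["user"] so that user counts as a valid target.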
Tools & Agents
Based on the description above, two tools are needed:
- get_stock_data for the Financial Analyst:
async def get_stock_data(symbol: str) -> Dict[str, Any]:
    """Get stock market data for a given symbol"""
    return {"price": 180.25, "volume": 1000000, "pe_ratio": 65.4, "market_cap": "700B"}
- get_news for the News Analyst:
async def get_news(query: str) -> List[Dict[str, str]]:
    """Get recent news articles about a company"""
    return [
        {
            "title": "Tesla Expands Cybertruck Production",
            "date": "2024-03-20",
            "summary": "Tesla ramps up Cybertruck manufacturing capacity at Gigafactory Texas, aiming to meet strong demand.",
        },
        {
            "title": "Tesla FSD Beta Shows Promise",
            "date": "2024-03-19",
            "summary": "Latest Full Self-Driving beta demonstrates significant improvements in urban navigation and safety features.",
        },
        {
            "title": "Model Y Dominates Global EV Sales",
            "date": "2024-03-18",
            "summary": "Tesla's Model Y becomes best-selling electric vehicle worldwide, capturing significant market share.",
        },
    ]
- Define the model, again using gpt-4o:
model_client = OpenAIChatCompletionClient(
    model="gpt-4o",
    # api_key="YOUR_API_KEY",
)
- Planner:
planner = AssistantAgent(
    "planner",
    model_client=model_client,
    handoffs=["financial_analyst", "news_analyst", "writer"],
    system_message="""You are a research planning coordinator.
    Coordinate market research by delegating to specialized agents:
    - Financial Analyst: For stock data analysis
    - News Analyst: For news gathering and analysis
    - Writer: For compiling final report
    Always send your plan first, then handoff to appropriate agent.
    Always handoff to a single agent at a time.
    Use TERMINATE when research is complete.""",
)
- Financial Analyst:
financial_analyst = AssistantAgent(
    "financial_analyst",
    model_client=model_client,
    handoffs=["planner"],
    tools=[get_stock_data],
    system_message="""You are a financial analyst.
    Analyze stock market data using the get_stock_data tool.
    Provide insights on financial metrics.
    Always handoff back to planner when analysis is complete.""",
)
- News Analyst:
news_analyst = AssistantAgent(
    "news_analyst",
    model_client=model_client,
    handoffs=["planner"],
    tools=[get_news],
    system_message="""You are a news analyst.
    Gather and analyze relevant news using the get_news tool.
    Summarize key market insights from news.
    Always handoff back to planner when analysis is complete.""",
)
- Writer:
writer = AssistantAgent(
    "writer",
    model_client=model_client,
    handoffs=["planner"],
    system_message="""You are a financial report writer.
    Compile research findings into clear, concise reports.
    Always handoff back to planner when writing is complete.""",
)
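- Define the termination condition. This differs slightly from the official docs: I added a cap on the total number of messages in the Team. I recommend adding one in everyday use, and you can set the number fairly high, because even gpt-4o can end up in an endless loop:
text_termination = TextMentionTermination("TERMINATE")
max_termination = MaxMessageTermination(100)
termination = text_termination | max_termination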
- Define the task:
task = "Conduct market research for TSLA stock"
- Define the team, combining the Agents with the termination condition:
research_team = Swarm(
    participants=[planner, financial_analyst, news_analyst, writer], termination_condition=termination
)
Complete code:
from typing import Any, Dict, List
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import HandoffTermination, TextMentionTermination, MaxMessageTermination
from autogen_agentchat.messages import HandoffMessage
from autogen_agentchat.teams import Swarm
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient
import os, asyncio
os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"
#-----------------------------------------------------------#
# Part1. Define the two tools
async def get_stock_data(symbol: str) -> Dict[str, Any]:
    """Get stock market data for a given symbol"""
    return {"price": 180.25, "volume": 1000000, "pe_ratio": 65.4, "market_cap": "700B"}
async def get_news(query: str) -> List[Dict[str, str]]:
    """Get recent news articles about a company"""
    return [
        {
            "title": "Tesla Expands Cybertruck Production",
            "date": "2024-03-20",
            "summary": "Tesla ramps up Cybertruck manufacturing capacity at Gigafactory Texas, aiming to meet strong demand.",
        },
        {
            "title": "Tesla FSD Beta Shows Promise",
            "date": "2024-03-19",
            "summary": "Latest Full Self-Driving beta demonstrates significant improvements in urban navigation and safety features.",
        },
        {
            "title": "Model Y Dominates Global EV Sales",
            "date": "2024-03-18",
            "summary": "Tesla's Model Y becomes best-selling electric vehicle worldwide, capturing significant market share.",
        },
    ]
#-----------------------------------------------------------#
# Part2. Specify the model
model_client = OpenAIChatCompletionClient(
    model="gpt-4o",
)
#-----------------------------------------------------------#
# Part3. Define the four Agents
planner = AssistantAgent(
    "planner",
    model_client=model_client,
    handoffs=["financial_analyst", "news_analyst", "writer"],
    system_message="""You are a research planning coordinator.
    Coordinate market research by delegating to specialized agents:
    - Financial Analyst: For stock data analysis
    - News Analyst: For news gathering and analysis
    - Writer: For compiling final report
    Always send your plan first, then handoff to appropriate agent.
    Always handoff to a single agent at a time.
    Use TERMINATE when research is complete.""",
)
financial_analyst = AssistantAgent(
    "financial_analyst",
    model_client=model_client,
    handoffs=["planner"],
    tools=[get_stock_data],
    system_message="""You are a financial analyst.
    Analyze stock market data using the get_stock_data tool.
    Provide insights on financial metrics.
    Always handoff back to planner when analysis is complete.""",
)
news_analyst = AssistantAgent(
    "news_analyst",
    model_client=model_client,
    handoffs=["planner"],
    tools=[get_news],
    system_message="""You are a news analyst.
    Gather and analyze relevant news using the get_news tool.
    Summarize key market insights from news.
    Always handoff back to planner when analysis is complete.""",
)
writer = AssistantAgent(
    "writer",
    model_client=model_client,
    handoffs=["planner"],
    system_message="""You are a financial report writer.
    Compile research findings into clear, concise reports.
    Always handoff back to planner when writing is complete.""",
)
#-----------------------------------------------------------#
# Part4. Define the termination condition
text_termination = TextMentionTermination("TERMINATE")
max_termination = MaxMessageTermination(100)
termination = text_termination | max_termination
#-----------------------------------------------------------#
# Part5. Define the team
research_team = Swarm(
    participants=[planner, financial_analyst, news_analyst, writer], termination_condition=termination
)
#-----------------------------------------------------------#
# Part6. Define and run the task
task = "Conduct market research for TSLA stock"
asyncio.run(
    Console(research_team.run_stream(task=task))
)
The output is as follows:
$ python demo.py
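If you scroll up through the output you will see transfer_to_xxx entries several times. They show that each Agent in this Team finishes the task the Planner assigned to it and then hands control back to the Planner.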
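If the output is long, one way to confirm this pattern is to pull the handoff sequence out of the log. The sketch below assumes a simplified one-line-per-call format of the form agent: tool_name, which is my own invention; the real Console output is laid out differently, so the regular expression would need adapting. The sample lines are made up for illustration and are not actual output from the run.

```python
import re
from typing import List, Tuple

# Assumed log shape: "<agent>: transfer_to_<target>" (hypothetical format).
HANDOFF_LINE = re.compile(r"^(?P<source>\w+): transfer_to_(?P<target>\w+)$")


def handoff_path(lines: List[str]) -> List[Tuple[str, str]]:
    """Extract (source, target) pairs from lines that record a handoff call."""
    path = []
    for line in lines:
        match = HANDOFF_LINE.match(line.strip())
        if match:
            path.append((match["source"], match["target"]))
    return path


# Illustrative sample only, not real output.
sample_log = [
    "planner: transfer_to_financial_analyst",
    "financial_analyst: get_stock_data",
    "financial_analyst: transfer_to_planner",
    "planner: transfer_to_writer",
    "writer: transfer_to_planner",
]

for source, target in handoff_path(sample_log):
    print(f"{source} -> {target}")
```

In a hub-and-spoke Team, every pair in the extracted path should have planner on one side.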