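AutoGen Study Notes Series (8): Advanced - Custom Agents
Starting with this article, the series moves into the Advanced part of the official AutoGen tutorial. If you are new to AutoGen, make sure you have read all seven articles of the Tutorial part of this series and put them into practice (or at least typed out the code from the official tutorial yourself), because from here on I will assume you already know the basic AutoGen operations and will skim over some details instead of repeating them.
The presentation also changes a little: classes are now pulled out and described on their own. One reason is that the code keeps getting longer and would be unwieldy in a single block; the other is that the comments on each class can be more detailed this way. Rest assured that every subsection still ends with complete code that can be run directly.
Links to the Tutorial part of this series: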
- AutoGen Study Notes Series (1): Tutorial - Model
- AutoGen Study Notes Series (2): Tutorial - Messages
- AutoGen Study Notes Series (3): Tutorial - Agents
- AutoGen Study Notes Series (4): Tutorial - Teams
- AutoGen Study Notes Series (5): Tutorial - Human-in-the-Loop
- AutoGen Study Notes Series (6): Tutorial - Termination
- AutoGen Study Notes Series (7): Tutorial - Managing State
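First, a recap of the key points so far:
- An Agent is the smallest unit of execution that interacts with an LLM;
- Multiple Agents can be combined into a Team, which maintains the context shared by all its Agents and can be paused, resumed, and terminated;
- A Team supports concurrency internally, but you have to make the faster Agents wait for the slower ones at the right moments;
- With human-in-the-loop, a Team can wait for your input at the points it considers critical, and you decide where the Team goes next;
- Termination conditions on a Team save your precious tokens and keep the Team from looping forever;
- AutoGen provides a very simple way to write the state of an Agent/Team to disk and read it back;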
This article covers the Custom Agents section in the Advanced part of the official documentation:
- Official documentation: https://microsoft.github.io/autogen/stable/user-guide/agentchat-user-guide/custom-agents.html# ;
Custom Agents
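AutoGen also lets you define your own Agent. According to the official documentation, every Agent inherits from the base class BaseChatAgent and must override the following methods and property:
- on_messages(): the actual behavior in response to incoming messages; it is what run() ends up calling, and it must return a Response object;
- on_reset(): resets the Agent to its initial state, for example by clearing its context;
- produced_message_types: the list of ChatMessage types the Agent can produce;
- [Optional] on_messages_stream(): the streaming counterpart of on_messages(); if it is not implemented, the default implementation calls on_messages() and yields the messages contained in its response;
As the list shows, only a few members need to be overridden in a custom Agent, but it is still worth writing clear, unambiguous comments for the class.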
CountDownAgent
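The demo on the official site is an Agent that counts down. It is more useful than it looks: add a one-second delay to each step, such as await asyncio.sleep(1) (time.sleep(1) also works, but it blocks the event loop while it waits), and it becomes a countdown timer Agent. You can then use this Agent in a Team for flow control, for example interrupting the task when the time is up so that the whole Team waits for human intervention.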
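A delay inside an async method is usually written with await asyncio.sleep rather than time.sleep, because the awaited version lets other coroutines run while the countdown waits. The following is a minimal sketch that does not use AutoGen at all; the coroutine names countdown and heartbeat are hypothetical and the delays are shortened so it finishes quickly.

```python
import asyncio
from typing import List


async def countdown(count: int, delay: float, log: List[str]) -> None:
    # Hypothetical stand-in for the countdown loop of an Agent
    for i in range(count, 0, -1):
        log.append(f"{i}...")
        await asyncio.sleep(delay)  # yields control to the event loop while waiting
    log.append("Done!")


async def heartbeat(beats: int, delay: float, log: List[str]) -> None:
    # Any other work that shares the same event loop
    for _ in range(beats):
        log.append("beat")
        await asyncio.sleep(delay)


async def main() -> List[str]:
    log: List[str] = []
    await asyncio.gather(countdown(3, 0.02, log), heartbeat(3, 0.02, log))
    return log


log = asyncio.run(main())
print(log)
```

With time.sleep in place of await asyncio.sleep inside countdown, the heartbeat could not log anything until the countdown had finished, because the event loop would be blocked for the whole wait.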
CountDownAgent class implementation:
# A custom Agent must inherit from the BaseChatAgent base class
class CountDownAgent(BaseChatAgent):
    def __init__(self, name: str, count: int = 3):
        super().__init__(name, "A simple agent that counts down.")
        self._count = count

    # Override produced_message_types and expose it as a property
    @property
    def produced_message_types(self) -> Sequence[type[ChatMessage]]:
        return (TextMessage,)

    # Override on_messages: consume the async generator on_messages_stream and make sure a Response object is returned
    async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
        response: Response | None = None
        async for message in self.on_messages_stream(messages, cancellation_token):
            if isinstance(message, Response):
                response = message
        assert response is not None
        return response

    # Override on_messages_stream: produce a series of AgentEvent, ChatMessage, and Response objects and send them out with yield
    async def on_messages_stream(
        self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[AgentEvent | ChatMessage | Response, None]:
        inner_messages: List[AgentEvent | ChatMessage] = []
        for i in range(self._count, 0, -1):
            msg = TextMessage(content=f"{i}...", source=self.name)
            inner_messages.append(msg)
            # This one line turns it into a countdown timer Agent; unlike time.sleep(1) it does not block the event loop
            await asyncio.sleep(1)
            yield msg
        yield Response(chat_message=TextMessage(content="Done!", source=self.name), inner_messages=inner_messages)

    # Override on_reset: no context is kept here, so there is nothing to reset
    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        pass
Complete code:
from typing import AsyncGenerator, List, Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import AgentEvent, ChatMessage, TextMessage
from autogen_core import CancellationToken
import asyncio
import time  # only needed if you use the blocking time.sleep(1) variant

#-----------------------------------------
# Copy the CountDownAgent code from above to here
#-----------------------------------------

async def run_countdown_agent() -> None:
    countdown_agent = CountDownAgent("countdown")
    async for message in countdown_agent.on_messages_stream([], CancellationToken()):
        if isinstance(message, Response):
            print(message.chat_message.content)
        else:
            print(message.content)

asyncio.run(run_countdown_agent())
The output is as follows:
$ python demo.py
3...
2...
1...
Done!
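The relationship between on_messages() and on_messages_stream() in CountDownAgent can be looked at in isolation: the stream yields intermediate messages and finally one response object, and the non-streaming method simply drains the stream and keeps that last object. The sketch below reproduces this pattern with the standard library only; Msg and FinalResponse are hypothetical stand-ins for TextMessage and Response, not AutoGen classes.

```python
import asyncio
from dataclasses import dataclass, field
from typing import AsyncGenerator, List, Optional, Union


@dataclass
class Msg:  # hypothetical stand-in for TextMessage
    content: str


@dataclass
class FinalResponse:  # hypothetical stand-in for Response
    chat_message: Msg
    inner_messages: List[Msg] = field(default_factory=list)


async def stream(count: int) -> AsyncGenerator[Union[Msg, FinalResponse], None]:
    # Yield every intermediate message, then one final response that also carries them
    inner: List[Msg] = []
    for i in range(count, 0, -1):
        msg = Msg(f"{i}...")
        inner.append(msg)
        yield msg
    yield FinalResponse(Msg("Done!"), inner)


async def collect(count: int) -> FinalResponse:
    # Drain the stream and keep only the final response, as on_messages() does
    final: Optional[FinalResponse] = None
    async for item in stream(count):
        if isinstance(item, FinalResponse):
            final = item
    if final is None:
        raise RuntimeError("the stream ended without a final response")
    return final


result = asyncio.run(collect(3))
print([m.content for m in result.inner_messages], result.chat_message.content)
```

Writing the logic once in the streaming method and deriving the non-streaming one from it keeps the two code paths from drifting apart.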
ArithmeticAgent
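The second example is an arithmetic Agent. It takes an integer, passes it to _operator_func, and returns the resulting integer; all of this happens inside on_messages().
ArithmeticAgent class implementation: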
# Arithmetic Agent
class ArithmeticAgent(BaseChatAgent):
    def __init__(self, name: str, description: str, operator_func: Callable[[int], int]) -> None:
        super().__init__(name, description=description)
        self._operator_func = operator_func
        self._message_history: List[ChatMessage] = []

    # Override produced_message_types and expose it as a property
    @property
    def produced_message_types(self) -> Sequence[type[ChatMessage]]:
        return (TextMessage,)

    # Override on_messages
    async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
        # Record the incoming messages in the history
        self._message_history.extend(messages)
        # Parse the number in the last message of the history
        assert isinstance(self._message_history[-1], TextMessage)
        number = int(self._message_history[-1].content)
        # Apply the member function _operator_func to the number
        result = self._operator_func(number)
        # Prepare a new TextMessage as the return value
        response_message = TextMessage(content=str(result), source=self.name)
        # Append this Agent's own reply to the history
        self._message_history.append(response_message)
        # Return the response
        return Response(chat_message=response_message)

    # Override on_reset
    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        pass
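[Note]: AutoGen allows on_messages() to be called with an empty message list. This means the Agent was called before and is now being called again without any new messages from the caller, so the Agent has to produce its reply from the history it stored itself. Keeping that history up to date is therefore very important.
The custom class above maintains it inside on_messages(): self._message_history.extend(messages) records the incoming messages and self._message_history.append(response_message) records the Agent's own reply.
You may wonder why the official Note singles out the empty-input case. Not every Agent framework supports being called this way; it is simply how AutoGen is designed. One practical benefit is that you can ask an Agent to respond again, for example when you are not satisfied with the previous result, without sending the same content a second time.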
We can try this out by changing the function run_countdown_agent() from the CountDownAgent example above to the following:
async def run_countdown_agent() -> None:
    countdown_agent = CountDownAgent("countdown")
    async for message in countdown_agent.on_messages_stream([], CancellationToken()):
        if isinstance(message, Response):
            print(message.chat_message.content)
        else:
            print(message.content)
    # After the normal call has finished, run the Agent once more without passing any new input
    print('-' * 50)
    final_res = await countdown_agent.run()  # Added: no task is passed, so the Agent runs again with empty input
    for item in final_res.messages:
        print(item.content)
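The output is shown below. The countdown is printed twice, but note that the second call uses run() rather than on_messages_stream(), so it prints all of its results at once after the whole countdown has finished: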
$ python temp.py
3...
2...
1...
Done!
--------------------------------------------------
3...
2...
1...
Done!
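CountDownAgent keeps no history, so the run above cannot show what an empty call means for an Agent like ArithmeticAgent. The following is a minimal sketch, not AutoGen code: HistoryKeepingCalculator is a hypothetical stand-in that uses plain strings instead of TextMessage and only mirrors the extend, read-last, append steps of on_messages().

```python
from typing import Callable, List, Sequence


class HistoryKeepingCalculator:
    """Hypothetical stand-in for ArithmeticAgent; plain strings replace TextMessage."""

    def __init__(self, operator_func: Callable[[int], int]) -> None:
        self._operator_func = operator_func
        self._history: List[str] = []

    def on_messages(self, messages: Sequence[str]) -> str:
        # Record whatever the caller sent (possibly nothing)
        self._history.extend(messages)
        if not self._history:
            raise ValueError("no message to operate on yet")
        # Always work from the last entry of the stored history
        number = int(self._history[-1])
        result = str(self._operator_func(number))
        # Record the reply so that a later empty call can continue from it
        self._history.append(result)
        return result


agent = HistoryKeepingCalculator(lambda x: x + 1)
first = agent.on_messages(["10"])  # a new message from the caller
second = agent.on_messages([])     # called again with nothing new
print(first, second)
```

In this sketch the second call operates on the stand-in's own previous reply, which only works because both the incoming message and the reply were stored.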
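Back to the official tutorial: with the arithmetic Agent defined, we can create a Team of five Agents with the following functions:
- add 1 to the input number: x+1;
- subtract 1 from the input number: x-1;
- multiply the input number by 2: x*2;
- divide the input number by 2 and round down: floor(x/2);
- return the input number unchanged: x;
The code that implements this is as follows: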
async def run_number_agents() -> None:
    # Five Agents, each applying one operation to the input number
    add_agent = ArithmeticAgent("add_agent", "Adds 1 to the number.", lambda x: x + 1)
    multiply_agent = ArithmeticAgent("multiply_agent", "Multiplies the number by 2.", lambda x: x * 2)
    subtract_agent = ArithmeticAgent("subtract_agent", "Subtracts 1 from the number.", lambda x: x - 1)
    divide_agent = ArithmeticAgent("divide_agent", "Divides the number by 2 and rounds down.", lambda x: x // 2)
    identity_agent = ArithmeticAgent("identity_agent", "Returns the number as is.", lambda x: x)
    # Terminate once the conversation contains 10 messages
    termination_condition = MaxMessageTermination(10)
    # Create the Team
    selector_group_chat = SelectorGroupChat(
        [add_agent, multiply_agent, subtract_agent, divide_agent, identity_agent],
        model_client=OpenAIChatCompletionClient(model="gpt-4o"),
        termination_condition=termination_condition,
        allow_repeated_speaker=True,  # Allow the same agent to speak multiple times, necessary for this task.
        selector_prompt=(
            "Available roles:\n{roles}\nTheir job descriptions:\n{participants}\n"
            "Current conversation history:\n{history}\n"
            "Please select the most appropriate role for the next message, and only return the role name."
        ),
    )
    # Define the task as a list of two messages
    # The first message states what the LLM has to do: turn the given value into 25
    # The second message provides the input value, here 10
    task: List[ChatMessage] = [
        TextMessage(content="Apply the operations to turn the given number into 25.", source="user"),
        TextMessage(content="10", source="user"),
    ]
    stream = selector_group_chat.run_stream(task=task)
    await Console(stream)

# Run the task
asyncio.run(run_number_agents())
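The selector_prompt above is an ordinary template with three placeholders that AutoGen fills in before asking the model to pick the next speaker. The sketch below only illustrates how such a template expands; render_selector_prompt is a hypothetical helper, and the formats chosen for roles, participants, and history are assumptions made for illustration, not a copy of what AutoGen produces internally.

```python
from typing import Dict, List

SELECTOR_PROMPT = (
    "Available roles:\n{roles}\nTheir job descriptions:\n{participants}\n"
    "Current conversation history:\n{history}\n"
    "Please select the most appropriate role for the next message, and only return the role name."
)


def render_selector_prompt(agents: Dict[str, str], history: List[str]) -> str:
    # Assumed formats: one "name: description" line per agent, and a list of names
    roles = "\n".join(f"{name}: {description}" for name, description in agents.items())
    participants = str(list(agents))
    return SELECTOR_PROMPT.format(roles=roles, participants=participants, history="\n".join(history))


prompt = render_selector_prompt(
    {"add_agent": "Adds 1 to the number.", "multiply_agent": "Multiplies the number by 2."},
    ["user: Apply the operations to turn the given number into 25.", "user: 10"],
)
print(prompt)
```

Printing the expanded prompt like this is a quick way to check that every placeholder in a custom template is spelled correctly.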
The complete demo code is as follows:
from typing import Callable, Sequence, List
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.messages import ChatMessage, TextMessage
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.ui import Console
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient
import asyncio
import os

os.environ["OPENAI_API_KEY"] = "your OpenAI API key"

class ArithmeticAgent(BaseChatAgent):
    def __init__(self, name: str, description: str, operator_func: Callable[[int], int]) -> None:
        super().__init__(name, description=description)
        self._operator_func = operator_func
        self._message_history: List[ChatMessage] = []

    @property
    def produced_message_types(self) -> Sequence[type[ChatMessage]]:
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
        self._message_history.extend(messages)
        assert isinstance(self._message_history[-1], TextMessage)
        number = int(self._message_history[-1].content)
        result = self._operator_func(number)
        response_message = TextMessage(content=str(result), source=self.name)
        self._message_history.append(response_message)
        return Response(chat_message=response_message)

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        pass

async def run_number_agents() -> None:
    add_agent = ArithmeticAgent("add_agent", "Adds 1 to the number.", lambda x: x + 1)
    multiply_agent = ArithmeticAgent("multiply_agent", "Multiplies the number by 2.", lambda x: x * 2)
    subtract_agent = ArithmeticAgent("subtract_agent", "Subtracts 1 from the number.", lambda x: x - 1)
    divide_agent = ArithmeticAgent("divide_agent", "Divides the number by 2 and rounds down.", lambda x: x // 2)
    identity_agent = ArithmeticAgent("identity_agent", "Returns the number as is.", lambda x: x)
    termination_condition = MaxMessageTermination(10)
    selector_group_chat = SelectorGroupChat(
        [add_agent, multiply_agent, subtract_agent, divide_agent, identity_agent],
        model_client=OpenAIChatCompletionClient(model="gpt-4o"),
        termination_condition=termination_condition,
        allow_repeated_speaker=True,
        selector_prompt=(
            "Available roles:\n{roles}\nTheir job descriptions:\n{participants}\n"
            "Current conversation history:\n{history}\n"
            "Please select the most appropriate role for the next message, and only return the role name."
        ),
    )
    task: List[ChatMessage] = [
        TextMessage(content="Apply the operations to turn the given number into 25.", source="user"),
        TextMessage(content="10", source="user"),
    ]
    stream = selector_group_chat.run_stream(task=task)
    await Console(stream)

asyncio.run(run_number_agents())
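The output is shown below. In this run the LLM applied +1, *2, +1, +1, +1, =, =, = to 10, eight operations in total. Together with the two user messages that makes 10 messages, so the run was stopped by MaxMessageTermination(10) reaching its limit: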
$ python demo.py
---------- user ----------
Apply the operations to turn the given number into 25.
---------- user ----------
10
---------- add_agent ----------
11
---------- multiply_agent ----------
22
---------- add_agent ----------
23
---------- add_agent ----------
24
---------- add_agent ----------
25
---------- identity_agent ----------
25
---------- identity_agent ----------
25
---------- identity_agent ----------
25
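The log above can be checked by hand: replaying the speakers in the order they appear reproduces every printed value, and counting the two user messages plus one reply per speaker gives the 10 messages at which MaxMessageTermination(10) stops the run. The sketch below does this replay in plain Python; the OPERATIONS table copies the five lambdas from the demo, and replay is a hypothetical helper.

```python
from typing import Callable, Dict, List

OPERATIONS: Dict[str, Callable[[int], int]] = {
    "add_agent": lambda x: x + 1,
    "multiply_agent": lambda x: x * 2,
    "subtract_agent": lambda x: x - 1,
    "divide_agent": lambda x: x // 2,
    "identity_agent": lambda x: x,
}


def replay(start: int, speakers: List[str]) -> List[int]:
    # Apply each speaker's operation to the previous value, as the Team did
    values: List[int] = []
    current = start
    for name in speakers:
        current = OPERATIONS[name](current)
        values.append(current)
    return values


# Speaker order copied from the log above
speakers = [
    "add_agent", "multiply_agent", "add_agent", "add_agent", "add_agent",
    "identity_agent", "identity_agent", "identity_agent",
]
values = replay(10, speakers)
total_messages = 2 + len(values)  # two user messages plus one reply per speaker
print(values, total_messages)
```

The target 25 is reached after the fifth reply; the remaining three identity_agent replies only use up the message budget, so a termination condition that also watches for the target value would end the run earlier.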
Using Custom Model Clients in Custom Agents
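In this subsection the official documentation shows how to use more third-party or custom models. If you import the autogen_ext.models package in VS Code, you will find that at the time of writing it ships only five sub-packages: openai, azure, cache, replay, and semantic_kernel.
One way to use a model such as DeepSeek or Gemini is therefore to define an Agent ourselves. The official tutorial uses Gemini in this subsection. Before starting, you need to request a free API key from the official site (Google can afford it):
- Official site: https://aistudio.google.com/prompts/new_chat;
After opening Google AI Studio and signing in, click the button in the upper-left corner to get a free API key. Store this key somewhere safe as well.
Then install the dependency:
$ pip install google-genai
Next comes the code. We again inherit from the base class BaseChatAgent and override one property plus three methods. The complete code is as follows:
[Note]: The official demo is missing two imports, autogen_agentchat.messages.TextMessage and autogen_agentchat.ui.Console; I have added them here.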
import os
from typing import AsyncGenerator, Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import AgentEvent, ChatMessage, TextMessage
from autogen_agentchat.ui import Console
from autogen_core import CancellationToken
from autogen_core.model_context import UnboundedChatCompletionContext
from autogen_core.models import AssistantMessage, RequestUsage, UserMessage
from google import genai
from google.genai import types
import asyncio

# Must be set before the class is defined, because the default value of api_key reads it
os.environ["GEMINI_API_KEY"] = "your Gemini API key"

class GeminiAssistantAgent(BaseChatAgent):
    def __init__(
        self,
        name: str,
        description: str = "An agent that provides assistance with ability to use tools.",
        model: str = "gemini-1.5-flash-002",
        api_key: str = os.environ["GEMINI_API_KEY"],
        system_message: str | None = "You are a helpful assistant that can respond to messages. Reply with TERMINATE when the task has been completed.",
    ):
        super().__init__(name=name, description=description)
        self._model_context = UnboundedChatCompletionContext()
        self._model_client = genai.Client(api_key=api_key)
        self._system_message = system_message
        self._model = model

    @property
    def produced_message_types(self) -> Sequence[type[ChatMessage]]:
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
        final_response = None
        async for message in self.on_messages_stream(messages, cancellation_token):
            if isinstance(message, Response):
                final_response = message
        if final_response is None:
            raise AssertionError("The stream should have returned the final result.")
        return final_response

    async def on_messages_stream(
        self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[AgentEvent | ChatMessage | Response, None]:
        # Add the incoming messages to the model context
        for msg in messages:
            await self._model_context.add_message(UserMessage(content=msg.content, source=msg.source))
        # Build the conversation history as a list of "source: content" lines
        history = [
            (msg.source if hasattr(msg, "source") else "system")
            + ": "
            + (msg.content if isinstance(msg.content, str) else "")
            + "\n"
            for msg in await self._model_context.get_messages()
        ]
        # Call Gemini to generate a reply
        response = self._model_client.models.generate_content(
            model=self._model,
            contents=f"History: {history}\nGiven the history, please provide a response",
            config=types.GenerateContentConfig(
                system_instruction=self._system_message,
                temperature=0.3,
            ),
        )
        # Build the usage record from the usage metadata of the Gemini response
        usage = RequestUsage(
            prompt_tokens=response.usage_metadata.prompt_token_count,
            completion_tokens=response.usage_metadata.candidates_token_count,
        )
        # Add the reply to the model context
        await self._model_context.add_message(AssistantMessage(content=response.text, source=self.name))
        yield Response(
            chat_message=TextMessage(content=response.text, source=self.name, models_usage=usage),
            inner_messages=[],
        )

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        """Reset the assistant by clearing the model context."""
        await self._model_context.clear()

gemini_assistant = GeminiAssistantAgent("gemini_assistant")
asyncio.run(
    Console(gemini_assistant.run_stream(task="What is the capital of New York?"))
)
The output is as follows:
$ python demo.py
---------- user ----------
What is the capital of New York?
---------- gemini_assistant ----------
Albany
TERMINATE
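One detail of the prompt built in on_messages_stream() is easy to miss: history is a Python list, so interpolating it into an f-string inserts the list's repr, with brackets, quotes, and escaped newlines, rather than plain lines of text. The sketch below shows the difference with the standard library only; Turn is a hypothetical stand-in for the model-context messages, and the joined variant is just one possible alternative, not what the official demo does.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Turn:  # hypothetical stand-in for UserMessage / AssistantMessage
    source: str
    content: str


def history_lines(turns: List[Turn]) -> List[str]:
    # Same "source: content\n" shape as the list comprehension in the demo
    return [turn.source + ": " + turn.content + "\n" for turn in turns]


turns = [
    Turn("user", "What is the capital of New York?"),
    Turn("gemini_assistant", "Albany"),
]
lines = history_lines(turns)

as_list_repr = f"History: {lines}"               # what interpolating the list produces
as_joined_text = "History:\n" + "".join(lines)   # alternative: join the lines first
print(as_list_repr)
print(as_joined_text)
```

Both forms carry the same information, and the demo works with the first one; joining the lines simply gives the model a cleaner transcript.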
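The official tutorial then shows how to combine an OpenAI Agent with the custom Gemini Agent above into a Team that does the following:
- the OpenAI Agent writes a four-line poem;
- the Gemini Agent scores and critiques it;
If you have followed these notes carefully, this part should be easy to understand. The combined complete code is as follows: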
# !pip install google-genai
import os
from typing import AsyncGenerator, Sequence
from autogen_agentchat.agents import BaseChatAgent, AssistantAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import AgentEvent, ChatMessage, TextMessage
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_core import CancellationToken
from autogen_core.model_context import UnboundedChatCompletionContext
from autogen_core.models import AssistantMessage, RequestUsage, UserMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination
from google import genai
from google.genai import types
import asyncio

os.environ["OPENAI_API_KEY"] = "your OpenAI API key"
# The class below reads GEMINI_API_KEY as the default value of api_key, so it must be set here
os.environ["GEMINI_API_KEY"] = "your Gemini API key"

class GeminiAssistantAgent(BaseChatAgent):
    def __init__(
        self,
        name: str,
        description: str = "An agent that provides assistance with ability to use tools.",
        model: str = "gemini-1.5-flash-002",
        api_key: str = os.environ["GEMINI_API_KEY"],
        system_message: str | None = "You are a helpful assistant that can respond to messages. Reply with TERMINATE when the task has been completed.",
    ):
        super().__init__(name=name, description=description)
        self._model_context = UnboundedChatCompletionContext()
        self._model_client = genai.Client(api_key=api_key)
        self._system_message = system_message
        self._model = model

    @property
    def produced_message_types(self) -> Sequence[type[ChatMessage]]:
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
        final_response = None
        async for message in self.on_messages_stream(messages, cancellation_token):
            if isinstance(message, Response):
                final_response = message
        if final_response is None:
            raise AssertionError("The stream should have returned the final result.")
        return final_response

    async def on_messages_stream(
        self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[AgentEvent | ChatMessage | Response, None]:
        # Add the incoming messages to the model context
        for msg in messages:
            await self._model_context.add_message(UserMessage(content=msg.content, source=msg.source))
        # Build the conversation history as a list of "source: content" lines
        history = [
            (msg.source if hasattr(msg, "source") else "system")
            + ": "
            + (msg.content if isinstance(msg.content, str) else "")
            + "\n"
            for msg in await self._model_context.get_messages()
        ]
        # Call Gemini to generate a reply
        response = self._model_client.models.generate_content(
            model=self._model,
            contents=f"History: {history}\nGiven the history, please provide a response",
            config=types.GenerateContentConfig(
                system_instruction=self._system_message,
                temperature=0.3,
            ),
        )
        # Build the usage record from the usage metadata of the Gemini response
        usage = RequestUsage(
            prompt_tokens=response.usage_metadata.prompt_token_count,
            completion_tokens=response.usage_metadata.candidates_token_count,
        )
        # Add the reply to the model context
        await self._model_context.add_message(AssistantMessage(content=response.text, source=self.name))
        yield Response(
            chat_message=TextMessage(content=response.text, source=self.name, models_usage=usage),
            inner_messages=[],
        )

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        """Reset the assistant by clearing the model context."""
        await self._model_context.clear()

#----------------------------------------------------------------#
# Define an OpenAI Agent
primary_agent = AssistantAgent(
    "primary",
    model_client=OpenAIChatCompletionClient(model="gpt-4o-mini"),
    system_message="You are a helpful AI assistant.",
)
# Define a Gemini Agent
gemini_critic_agent = GeminiAssistantAgent(
    "gemini_critic",
    system_message="Provide constructive feedback. Respond with 'APPROVE' to when your feedbacks are addressed.",
)
# Terminate when APPROVE is mentioned or when the conversation reaches 10 messages
termination = TextMentionTermination("APPROVE") | MaxMessageTermination(10)
team = RoundRobinGroupChat([primary_agent, gemini_critic_agent], termination_condition=termination)
# Run the Team
asyncio.run(
    Console(team.run_stream(task="Write a Haiku poem with 4 lines about the fall season."))
)
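The run is shown below. In my run the program finished on its own after these two top-tier LLMs went back and forth for 3 rounds (7 messages in total). Your runs should take about the same number of rounds; at best, the APPROVE condition is triggered right after the first round: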
$ python demo.py
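How the two termination conditions combine with round-robin turn taking can be sketched without AutoGen or any API key. Everything below is a hypothetical stand-in: Condition, text_mention, max_messages, and run_round_robin are illustrative names, messages are plain strings, and the writer and critic functions are scripted instead of calling an LLM.

```python
from itertools import cycle
from typing import Callable, List, Sequence


class Condition:
    """Hypothetical stand-in for a termination condition; not the AutoGen class."""

    def __init__(self, check: Callable[[List[str]], bool]) -> None:
        self._check = check

    def __call__(self, messages: List[str]) -> bool:
        return self._check(messages)

    def __or__(self, other: "Condition") -> "Condition":
        # The combined condition fires as soon as either side fires
        return Condition(lambda messages: self(messages) or other(messages))


def text_mention(text: str) -> Condition:
    return Condition(lambda messages: any(text in m for m in messages))


def max_messages(limit: int) -> Condition:
    return Condition(lambda messages: len(messages) >= limit)


Agent = Callable[[List[str]], str]


def run_round_robin(task: str, agents: Sequence[Agent], stop: Condition) -> List[str]:
    # The task counts as the first message; agents then take turns in a fixed order
    messages = [task]
    for agent in cycle(agents):
        if stop(messages):
            break
        messages.append(agent(messages))
    return messages


def writer(messages: List[str]) -> str:
    return f"draft {sum(m.startswith('draft') for m in messages) + 1}"


def critic(messages: List[str]) -> str:
    drafts = sum(m.startswith("draft") for m in messages)
    return "APPROVE" if drafts >= 2 else "please revise"


def stubborn_critic(messages: List[str]) -> str:
    return "please revise"


task = "Write a Haiku poem with 4 lines about the fall season."
approved = run_round_robin(task, [writer, critic], text_mention("APPROVE") | max_messages(10))
capped = run_round_robin(task, [writer, stubborn_critic], text_mention("APPROVE") | max_messages(10))
print(len(approved), approved[-1])
print(len(capped), capped[-1])
```

The second run shows why the message cap is worth keeping next to the text condition: if the critic never approves, the cap is the only thing that ends the conversation.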
Making the Custom Agent Declarative
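Above we saw how to define a custom Agent. The official tutorial goes on to show how to save and load the configuration of a custom Agent. This requires an extra configuration class, GeminiAssistantAgentConfig, which inherits from pydantic's BaseModel. GeminiAssistantAgent then additionally inherits from Component[GeminiAssistantAgentConfig], sets component_config_schema, and implements the _from_config and _to_config methods.
[Note]: This is about configuration, not state. The former is the basic description of the Agent; the latter is the Agent's memory. An earlier article in this series covered how to save an Agent's state.
- AutoGen Study Notes Series (7): Tutorial - Managing State: https://blog.csdn.net/nenchoumi3119/article/details/146048857?spm=1001.2014.3001.5501 ;
In practice, configuration and state are usually saved together, because the point of saving is to make sharing and backup easy. This matters especially for custom Agents: a built-in Agent can usually be re-created from a few lines of code and then only needs its state loaded, whereas the settings of a custom Agent are described nowhere else.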
GeminiAssistantAgentConfig class implementation:
# A class that describes the Agent's configuration
class GeminiAssistantAgentConfig(BaseModel):
    name: str
    description: str = "An agent that provides assistance with ability to use tools."
    model: str = "gemini-1.5-flash-002"
    system_message: str | None = None
Parts added to the GeminiAssistantAgent class:
class GeminiAssistantAgent(BaseChatAgent, Component[GeminiAssistantAgentConfig]):
    component_config_schema = GeminiAssistantAgentConfig

    # The two extra methods needed to save and load the configuration
    @classmethod
    def _from_config(cls, config: GeminiAssistantAgentConfig) -> Self:
        return cls(
            name=config.name, description=config.description, model=config.model, system_message=config.system_message
        )

    def _to_config(self) -> GeminiAssistantAgentConfig:
        return GeminiAssistantAgentConfig(
            name=self.name,
            description=self.description,
            model=self._model,
            system_message=self._system_message,
        )
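The round trip that _to_config and _from_config make possible, and the difference between configuration and state, can be sketched with the standard library alone. This is not how Component works internally: SketchConfig and SketchAgent are hypothetical names, and a dataclass plus json stands in for the pydantic model and for dump_component / load_component.

```python
import json
from dataclasses import asdict, dataclass
from typing import List, Optional


@dataclass
class SketchConfig:  # hypothetical stand-in for GeminiAssistantAgentConfig
    name: str
    model: str = "gemini-1.5-flash-002"
    system_message: Optional[str] = None


class SketchAgent:  # hypothetical stand-in for GeminiAssistantAgent
    def __init__(self, name: str, model: str = "gemini-1.5-flash-002", system_message: Optional[str] = None) -> None:
        self.name = name
        self._model = model
        self._system_message = system_message
        self.history: List[str] = []  # state: deliberately not part of the configuration

    def to_config(self) -> SketchConfig:
        return SketchConfig(name=self.name, model=self._model, system_message=self._system_message)

    @classmethod
    def from_config(cls, config: SketchConfig) -> "SketchAgent":
        return cls(name=config.name, model=config.model, system_message=config.system_message)


original = SketchAgent("gemini_assistant", system_message="Be brief.")
original.history.append("user: hello")

payload = json.dumps(asdict(original.to_config()))  # what would be written to disk
restored = SketchAgent.from_config(SketchConfig(**json.loads(payload)))
print(payload)
print(restored.name, restored.history)
```

The restored object has the same settings but an empty history, which is the reason state has to be saved and loaded separately from the configuration.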
Complete code:
import os
from typing import AsyncGenerator, Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import AgentEvent, ChatMessage, TextMessage
from autogen_core.models import UserMessage, AssistantMessage, RequestUsage
from autogen_core.model_context import UnboundedChatCompletionContext
from autogen_core import CancellationToken, Component
from pydantic import BaseModel
from typing_extensions import Self
from google import genai
from google.genai import types

# Must be set before the class is defined, because the default value of api_key reads it
os.environ["GEMINI_API_KEY"] = "your Gemini API key"

class GeminiAssistantAgentConfig(BaseModel):
    name: str
    description: str = "An agent that provides assistance with ability to use tools."
    model: str = "gemini-1.5-flash-002"
    system_message: str | None = None

class GeminiAssistantAgent(BaseChatAgent, Component[GeminiAssistantAgentConfig]):
    component_config_schema = GeminiAssistantAgentConfig

    def __init__(
        self,
        name: str,
        description: str = "An agent that provides assistance with ability to use tools.",
        model: str = "gemini-1.5-flash-002",
        api_key: str = os.environ["GEMINI_API_KEY"],
        system_message: str | None = "You are a helpful assistant that can respond to messages. Reply with TERMINATE when the task has been completed.",
    ):
        super().__init__(name=name, description=description)
        self._model_context = UnboundedChatCompletionContext()
        self._model_client = genai.Client(api_key=api_key)
        self._system_message = system_message
        self._model = model

    @property
    def produced_message_types(self) -> Sequence[type[ChatMessage]]:
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
        final_response = None
        async for message in self.on_messages_stream(messages, cancellation_token):
            if isinstance(message, Response):
                final_response = message
        if final_response is None:
            raise AssertionError("The stream should have returned the final result.")
        return final_response

    async def on_messages_stream(
        self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[AgentEvent | ChatMessage | Response, None]:
        for msg in messages:
            await self._model_context.add_message(UserMessage(content=msg.content, source=msg.source))
        history = [
            (msg.source if hasattr(msg, "source") else "system")
            + ": "
            + (msg.content if isinstance(msg.content, str) else "")
            + "\n"
            for msg in await self._model_context.get_messages()
        ]
        response = self._model_client.models.generate_content(
            model=self._model,
            contents=f"History: {history}\nGiven the history, please provide a response",
            config=types.GenerateContentConfig(
                system_instruction=self._system_message,
                temperature=0.3,
            ),
        )
        usage = RequestUsage(
            prompt_tokens=response.usage_metadata.prompt_token_count,
            completion_tokens=response.usage_metadata.candidates_token_count,
        )
        await self._model_context.add_message(AssistantMessage(content=response.text, source=self.name))
        yield Response(
            chat_message=TextMessage(content=response.text, source=self.name, models_usage=usage),
            inner_messages=[],
        )

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        """Reset the assistant by clearing the model context."""
        await self._model_context.clear()

    @classmethod
    def _from_config(cls, config: GeminiAssistantAgentConfig) -> Self:
        return cls(
            name=config.name, description=config.description, model=config.model, system_message=config.system_message
        )

    def _to_config(self) -> GeminiAssistantAgentConfig:
        return GeminiAssistantAgentConfig(
            name=self.name,
            description=self.description,
            model=self._model,
            system_message=self._system_message,
        )

#------------------------------------------------------------------
# Create the first Agent and dump its configuration
gemini_assistant = GeminiAssistantAgent("gemini_assistant")
config = gemini_assistant.dump_component()
print(config.model_dump_json(indent=2))
# Use the first Agent's configuration to initialize another Agent
loaded_agent = GeminiAssistantAgent.load_component(config)
print(loaded_agent)
The output is as follows:
$ python demo.py
{
  "provider": "__main__.GeminiAssistantAgent",
  "component_type": "agent",
  "version": 1,
  "component_version": 1,
  "description": null,
  "label": "GeminiAssistantAgent",
  "config": {
    "name": "gemini_assistant",
    "description": "An agent that provides assistance with ability to use tools.",
    "model": "gemini-1.5-flash-002",
    "system_message": "You are a helpful assistant that can respond to messages. Reply with TERMINATE when the task has been completed."
  }
}
<__main__.GeminiAssistantAgent object at 0x104665b70>