当前位置: 首页 > news >正文

Magentic-ui项目相关整理

协作相关

src/magentic_ui/
├── teams/
│   ├── orchestrator/
│   │   ├── _group_chat.py      # GroupChat 类定义,管理参与者的状态
│   │   ├── _orchestrator.py    # Orchestrator 类定义,维护对话的状态
│   │   └── orchestrator_config.py    # 相关的配置 
│   └── roundrobin_orchestrator.py    # 轮询状态的管理
├── agents/
│   ├── file_surfer/
│   │   └── _file_surfer.py
│   ├── web_surfer/
│   │   └── _web_surfer.py
│   └── users/
│       ├── _dummy_user_proxy.py
│       └── _metadata_user_proxy.py
├── backend/
│   ├── teammanager/
│       └── _teammanager.py
└── task_team.py

核心文件的核心函数

  • _group_chat.py:

    • __init__:智能体团队的创建

  • _orchestrator.py:

    • __init__:协调器的初始化

    • _orchestrate_step_planning:进行规划

    • _orchestrate_step_execution:分配合适的智能体进行执行

    • handle_agent_response:处理智能体的响应(例如:某个Agent:我已完成任务,xxx)

    • handle_start:将所有的消息转发给智能体,消息流转的入口点

  • _file_surfer.py:

    • on_messages:消息的转化

    • on_messages_stream:自己的智能体消息的响应

  • _web_surfer.py:

    • on_messages:消息的转化

    • on_messages_stream:自己的智能体消息的响应

    • _handle_action:处理具体的网页操作,返回操作结果消息

    • _handle_tool_call:处理工具调用,返回工具调用结果消息

  • _dummy_user_proxy.py:

    • on_messages:消息的转化

    • on_messages_stream:自己的智能体消息的响应

  • _metadata_user_proxy.py:

    • on_messages:消息的转化

    • on_messages_stream:自己的智能体消息的响应

  • _teammanager.py:

    • run_stream:团队的运行相关,顶级

  • task_team.py:

    • get_task_team:创建智能体的实例

关系的一些区分处理:

handle_start 和 _orchestrate_step_execution之间的关系:

  1. 消息的流转过程

    1. handle_start 是入口点,负责接收原始消息并转发给所有智能体.

    2. 消息添加到message_histroy中保存

    3. 调用_orchestrate_step 开始协调过程,是进行计划还是分发给智能体执行。

    4. _orchestrate_step_execution 处理的是经过的具体消息,也就是planning后的消息

  2. 执行流程

    1. handle_start 启动整个对话流程

    2. _orchestrate_step 根据当前状态决定是进入规划模式还是执行模式

    3. orchestrate_step_execution 负责具体的执行阶段

  3. 通过self._state进行信息的共享

智能体之间的协作:

初始化阶段

  1. 智能体的创建(src\magentic_ui\task_team.py)下的get_task_team

    async def get_task_team(...):# 1. 创建各个智能体实例file_surfer = FileSurfer(name="file_surfer",model_client=model_client_file_surfer,work_dir=paths.internal_run_dir,bind_dir=paths.external_run_dir,model_context_token_limit=magentic_ui_config.model_context_token_limit,approval_guard=approval_guard,)# 2.创建并注册智能体team = GroupChat(participants=[web_surfer, user_proxy, coder_agent, file_surfer],orchestrator_config=orchestrator_config,model_client=model_client_orch,memory_provider=memory_provider,)# 3. 初始化团队await team.lazy_init()return team
  2. 团队的创建(src\magentic_ui\teams\orchestrator\_group_chat.py)下的GroupChat,需要有参与的智能体,模型的客户端,协调器

    class GroupChat(BaseGroupChat, Component[GroupChatConfig]):def __init__(self,participants: List[ChatAgent],model_client: ChatCompletionClient,orchestrator_config: OrchestratorConfig,...):# 1. 调用父类初始化super().__init__(participants,group_chat_manager_name="Orchestrator",group_chat_manager_class=Orchestrator,...)# 2. 初始化内部变量self._orchestrator_config = orchestrator_configself._model_client = model_clientself._message_factory = MessageFactory()self._memory_provider = memory_provider
  3.  协调器的初始化(src\magentic_ui\teams\orchestrator\_orchestrator.py)下的Orchestrato
    class Orchestrator(BaseGroupChatManager):def __init__(self,name: str,group_topic_type: str,output_topic_type: str,participant_topic_types: List[str],participant_names: List[str],participant_descriptions: List[str],...):# 初始化基础属性self._model_client = model_clientself._config = configself._user_agent_topic = "user_proxy"self._web_agent_topic = "web_surfer"

消息相关

团队相关

teammanager.py VS task_team.py VS _orchestrator.py

  1. teammanager.py是管理团队的生命周期

  2. task_team.py只是团队的创建,不管生命周期

  3. _orchestrator.py仅是团队的内部协调,进行通信

用户相关

用户代理是如何加入到团队中?

# src\magentic_ui\task_team.py
# 用户代理的创建
user_proxy: DummyUserProxy | MetadataUserProxy | UserProxyAgentif magentic_ui_config.user_proxy_type == "dummy":user_proxy = DummyUserProxy(name="user_proxy")elif magentic_ui_config.user_proxy_type == "metadata":assert (magentic_ui_config.task is not None), "Task must be provided for metadata user proxy"assert (magentic_ui_config.hints is not None), "Hints must be provided for metadata user proxy"assert (magentic_ui_config.answer is not None), "Answer must be provided for metadata user proxy"user_proxy = MetadataUserProxy(name="user_proxy",description="Metadata User Proxy Agent",task=magentic_ui_config.task,helpful_task_hints=magentic_ui_config.hints,task_answer=magentic_ui_config.answer,model_client=model_client_orch,)else:user_proxy_input_func = make_agentchat_input_func(input_func)user_proxy = UserProxyAgent(description=USER_PROXY_DESCRIPTION,name="user_proxy",input_func=user_proxy_input_func,)# 轮询代理if websurfer_loop_team:# simplified team of only the web surferteam = RoundRobinGroupChat(participants=[web_surfer, user_proxy],max_turns=10000,)await team.lazy_init()return team# 由LLM智能选择team = GroupChat(participants=[web_surfer, user_proxy, coder_agent, file_surfer],orchestrator_config=orchestrator_config,model_client=model_client_orch,memory_provider=memory_provider,)await team.lazy_init()return team

_metadata_user_proxy.py为例,它是创建复杂的用户代理,另一个是创建简单的用户代理

async def on_messages_stream(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]:"""处理输入消息并生成响应流这是核心方法,处理所有消息逻辑"""# 1. 严格模式下的提示重写if (self.how_helpful == "strict"and self.helpful_task_hintsand self.helpful_task_hints != "Helpful hints are not available for this task."and self.rewritten_helpful_task_hints is None):self.rewritten_helpful_task_hints = await self._rewrite_helpful_hints(cancellation_token)# 2. 消息处理chat_messages = thread_to_context(list(messages),agent_name=self.name,is_multimodal=self._model_client.model_info["vision"],)self._chat_history.extend(chat_messages)# 3. 阶段判断if ("type" in messages[-1].metadataand messages[-1].metadata["type"] == "plan_message"):self.have_encountered_plan_message = Trueself.in_planning_phase = Trueelse:if self.have_encountered_plan_message:self.in_planning_phase = Falseelse:self.in_planning_phase = True# 4. 模型上下文准备await self._model_context.clear()system_message = SystemMessage(content=self._get_system_message())await self._model_context.add_message(system_message)# 5. 添加聊天历史for msg in self._chat_history:await self._model_context.add_message(msg)# 6. 获取token限制的历史记录token_limited_history = await self._model_context.get_messages()# 7. 规划阶段处理if self.in_planning_phase:if self.simulated_user_type in ["co-planning", "co-planning-and-execution"]:if (self.max_co_planning_rounds is Noneor self.current_co_planning_round < self.max_co_planning_rounds):# 生成规划响应result = await self._model_client.create(messages=token_limited_history,cancellation_token=cancellation_token,)yield Response(chat_message=TextMessage(content=result.content,source=self.name,metadata={"co_planning_round": str(self.current_co_planning_round),"user_plan_reply": "llm","helpful_task_hints": self.rewritten_helpful_task_hintsif self.rewritten_helpful_task_hintselse self.helpful_task_hints,},),inner_messages=[],)self.current_co_planning_round += 1else:# 达到最大规划轮次yield Response(chat_message=TextMessage(content="accept",source=self.name,metadata={"co_planning_round": str(self.current_co_planning_round),"user_plan_reply": "accept",},),inner_messages=[],)else:# 非协作规划模式yield Response(chat_message=TextMessage(content="accept",source=self.name,metadata={"co_planning_round": str(self.current_co_planning_round),"user_plan_reply": "accept",},),inner_messages=[],)# 8. 执行阶段处理else:if self.simulated_user_type in ["co-execution", "co-planning-and-execution"]:if (self.max_co_execution_rounds is Noneor self.current_co_execution_round < self.max_co_execution_rounds):# 生成执行响应result = await self._model_client.create(messages=token_limited_history,cancellation_token=cancellation_token,)yield Response(chat_message=TextMessage(content=result.content,source=self.name,metadata={"user_execution_reply": "llm"},),inner_messages=[],)self.current_co_execution_round += 1else:# 达到最大执行轮次yield Response(chat_message=TextMessage(content="I don't know, you figure it out, don't ask me again.",source=self.name,),)else:# 非协作执行模式yield Response(chat_message=TextMessage(content="I don't know, you figure it out, don't ask me again.",source=self.name,metadata={"user_execution_reply": "idk"},),)

感觉像用户代理这块,不需要进行LLM的调用,我认为的消息流程是:用户输入-->用户代理-->Orchestrator-->智能体-->LLM-->(智能体-->LLM)可能多个-->Orchesatrator-->用户代理-->用户界面的显示。由于在智能体那块以及调用了LLM,有了确切的解决,我认为正确是:

  • 用户代理只负责消息的传递和显示

  • Orchestrator负责任务的规划和分配(交给智能体)

  • 智能体负责调用LLM和处理任务

  • 响应通过用户代理直接显示给用户

原始消息从哪里来的,怎么传到handle_start?

 @rpcasync def handle_start(self, message: GroupChatStart, ctx: MessageContext) -> None:
# @rpc 是远程过程调用,运行函数被远程调用,就像调用本地函数一样
# message 来自于外部的调用.venv\Lib\site-packages\autogen_agentchat\teams\_group_chat\_base_group_chat.py 
文件中的 run_stream 方法,就是负责将初始值(初始消息)通过 GroupChatStart 消息传递
给群聊管理器(GroupChatManager)的地方。具体流程如下:'''
对第一点的self.team的解释,实际上self.team就是代指GroupChat
路径:src\magentic_ui\teams\orchestrator\_group_chat.py下的run_stream谁去调用?
不会直接 new 一个 GroupChat 然后直接 run_stream,而是通过 TeamManager 统一管理
GroupChat.run_stream 的实际调用,主要是通过 TeamManager 的 self.team.run_stream 
实现的,这就是 “间接调用”
(本文件的)14,15,16行在src\magentic_ui\backend\teammanager\teammanager.py下的run_stream 调用
的313到315行代码中async for message in self.team.run_stream(task=task, cancellation_token=cancellation_token):
其中的self.team.run_stream也就代指了GroupChat.run_stream
src\magentic_ui\backend\teammanager\teammanager.py 的 run_stream 由
src\magentic_ui\backend\web\managers\connection.py 下start_stream方法的run_stream调用
'''
1.调用 self.team.run_stream(task=...),其中 task 可以是字符串、消息对象、
消息列表,或者为 None2.run_stream 方法会把 task 转换成 messages,然后封装到 GroupChatStart(
messages=messages) 这个消息对象里。(路径:.venv\Lib\site-packages\autogen_agentchat\teams\_group_chat\_base_group_chat.py)if task is None:# 如果 task 是 None,则 messages 也是 Nonepasselif isinstance(task, str):# 如果 task 是字符串,就会被包装成一个 TextMessage,放到 messages 列表里messages = [TextMessage(content=task, source="user")]elif isinstance(task, BaseChatMessage):# 如果 task 是单个消息对象,也会被放到 messages 列表里。messages = [task]elif isinstance(task, list):# 如果 task 是消息对象列表,则直接赋值给 messages。...# 也就是把 messages 作为参数,封装进 GroupChatStart 消息对象里  499行await self._runtime.send_message(GroupChatStart(messages=messages),recipient=AgentId(type=self._group_chat_manager_topic_type, key=self._team_id),cancellation_token=cancellation_token,)3.通过 self._runtime.send_message(...),把 GroupChatStart 
消息发送给群聊管理器(GroupChatManager)。
# .venv/Lib/site-packages/autogen_agentchat/teams/_group_chat/_base_group_chat.py 中的run_stream方法await self._runtime.send_message(GroupChatStart(messages=messages),recipient=AgentId(type=self._group_chat_manager_topic_type, key=self._team_id),cancellation_token=cancellation_token,)
# recipient=AgentId(type=self._group_chat_manager_topic_type, key=self._team_id) 
# 明确指定了接收者就是群聊管理器, _group_chat_manager_topic_type 就是Orchestrator这个群聊管理器实例4.群聊管理器收到 GroupChatStart 后,会调用 handle_start 方法,正式启动群聊流程。
# 对于 GroupChatStart 消息,自动调用handle_start方法,message参数就是刚收到的 GroupChatStart消息

self.team是一个团队对象,类型是GroupChat,在业务层(比如 WebSocket、CLI、API)不会直接操作 GroupChat,而是通过 TeamManager 这个中间层来操作团队。现在到了GroupChat.run_stream了,它内部继承了.venv\Lib\site-packages\autogen_agentchat\teams\_group_chat\_base_group_chat.py下的run_stream,这个就是上面代码框中描述的第二点。现在把消息封装到GroupChatStart

  1. WebSocket 路由/管理器调用 WebSocketManager.start_stream

  2. WebSocketManager.start_stream 调用 TeamManager.run_stream

  3. TeamManager.run_stream 内部调用 self.team.run_stream

  4. self.team 实际上就是 GroupChat 或其子类的实例

  5. 最终执行到 GroupChat.run_stream

相关文章:

  • 如何自动化测试 DependencyMatcher 规则效果(CI/CD 集成最佳实践)
  • 60天python训练计划----day52
  • Flutter 状态管理与 API 调用的完美结合:从理论到实践
  • RapidNJ软件的安装
  • 独立看门狗(IWDG)与窗口看门狗(WWDG)
  • 6.14星期六休息一天
  • 从0开始学习语言模型--Day01--亲自构筑语言模型的重要性
  • IPv4详解
  • Qt:Qt桌面程序正常退出注意事项
  • 陈小群飞机随笔总结
  • 【编译原理】第九章 运行时存储
  • linux msyql8 允许远程连接
  • 数据库资源帖
  • 第11次课 深搜1 A
  • Javascript什么是回调函数?
  • LangChain面试内容整理-知识点13:输出解析(OutputParser)模块
  • Seata的事务隔离级别是如何保证的?
  • 案例:塔能科技智启某市光域,勾勒城市照明宏图
  • NY248NY254美光科技闪存NY258NY261
  • 使用 C# 源生成器(Source Generators)进行高效开发:增强 Blazor 及其他功能
  • 网站建设工具有哪些/百度网页版入口链接
  • 门户网站建设自查整改/登封网站关键词优化软件
  • 怎么做网站推销产品/百度推广一年大概多少钱
  • 南昌谁做网站设计/自媒体账号申请
  • wordpress 超级卡/重庆百度关键词优化软件
  • 个人网站注册平台钱/磁力猫引擎入口