当前位置: 首页 > news >正文

基于CAMEL 的Workforce 实现多智能体协同工作系统

文章目录

    • 一、workforce 简介
      • 1.架构设计
      • 2.通信机制
    • 二、workforce 工作流程图示例
      • 1.用户角色
      • 2.工作流程
    • 三、workforce 中重要函数说明
      • 1.`__init__`函数
      • 2.`add_single_agent_worker` 函数
      • 3.`add_role_playing_worker` 函数
      • 4.`add_workforce` 函数
    • 四、基于workforce实现多智能体协调
      • (1)创建一个 Workforce 实例
      • (2)worker定义
      • (3)添加到Workforce 的工作节点
      • (4)创建一个任务
      • (5)启动 Workforce 的任务处理流程
      • 完整示例代码
    • 五、遇到问题

一、workforce 简介

Workforce是CAMEL框架中的一个多智能体协同工作系统。它以一种简洁的方式让多个智能体协作完成任务,类似于一个高效的团队合作系统。

1.架构设计

Workforce采用层级架构设计。一个workforce可以包含多个工作节点(worker nodes),每个工作节点可以包含一个或多个智能体作为工作者。工作节点由workforce内部的**协调智能体(coordinator agent)**管理,协调智能体根据工作节点的描述及其工具集来分配任务。

2.通信机制

Workforce内部的通信基于任务通道(task channel)。Workforce初始化时会创建一个所有节点共享的通道。任务会被发布到这个通道中,每个工作节点会监听通道,接受分配给它的任务并解决。当任务完成后,工作节点会将结果发布回通道,结果会作为其他任务的"依赖项"保留在通道中,供所有工作节点共享。

通过这种机制,workforce能够以协作和高效的方式解决任务。

二、workforce 工作流程图示例

如图,展示了如何通过以下智能体协作完成一个请求,即“创建一个产品的登录页面”

在这里插入图片描述

1.用户角色

  • Root Node (Manager):作为系统的管理者,负责接收任务并协调任务的分解和分配。

  • Coordinator Agent (协调智能体)Task Manager Agent (任务管理智能体):管理任务分解、依赖关系、分发任务,以及监控任务完成情况。

  • Leaf Nodes (Workers):执行任务的实际智能体,分别承担不同的角色(如“内容撰写者”和“代码撰写者”)。

2.工作流程

(a) 用户需求的接收

用户发出任务请求(例如“创建一个登录页面”,图中 1)。

Coordinator Agent 接收需求,作为入口点。

(b) 任务分解与定义

Coordinator Agent 通过任务分解策略,将请求拆分为多个子任务(如任务 A.1 和 A.2),并定义:

这些任务被送到 Task Manager Agent 进行分发(图中 2 和 3)。

© 任务的分配

Task Manager Agent 将任务分发到 Channel(图中 4),这是一个任务管理中枢。

任务按角色需求分配到 Leaf Nodes (Workers),包括:

(d) Leaf Nodes 执行任务

内容撰写者 (Content Writer)

代码撰写者 (Code Writer)

(e) 结果整合与返回

Coordinator Agent 汇总所有任务结果(如 A.1 和 A.2 的结果)。

将完整的任务结果返回给用户(图中 18)。

三、workforce 中重要函数说明

1.__init__函数

    def __init__(
        self,
        description: str,
        children: Optional[List[BaseNode]] = None,
        coordinator_agent_kwargs: Optional[Dict] = None,
        task_agent_kwargs: Optional[Dict] = None,
        new_worker_agent_kwargs: Optional[Dict] = None,
    ) -> None:
        super().__init__(description)
        self._child_listening_tasks: Deque[asyncio.Task] = deque()
        self._children = children or []
        self.new_worker_agent_kwargs = new_worker_agent_kwargs

        coord_agent_sys_msg = BaseMessage.make_assistant_message(
            role_name="Workforce Manager",
            content="You are coordinating a group of workers. A worker can be "
            "a group of agents or a single agent. Each worker is "
            "created to solve a specific kind of task. Your job "
            "includes assigning tasks to a existing worker, creating "
            "a new worker for a task, etc.",
        )
        self.coordinator_agent = ChatAgent(
            coord_agent_sys_msg, **(coordinator_agent_kwargs or {})
        )

        task_sys_msg = BaseMessage.make_assistant_message(
            role_name="Task Planner",
            content="You are going to compose and decompose tasks.",
        )
        self.task_agent = ChatAgent(task_sys_msg, **(task_agent_kwargs or {}))

        # If there is one, will set by the workforce class wrapping this
        self._task: Optional[Task] = None
        self._pending_tasks: Deque[Task] = deque()

其中coordinator_agent的本质为ChatAgent,role_name=“Workforce Manager”,对应的提示词为:

"You are coordinating a group of workers. A worker can be "
            "a group of agents or a single agent. Each worker is "
            "created to solve a specific kind of task. Your job "
            "includes assigning tasks to a existing worker, creating "
            "a new worker for a task, etc."

task_agentt的本质也是ChatAgent,role_name=“Task Planner”,对应的提示词为:

"You are going to compose and decompose tasks."

2.add_single_agent_worker 函数

    @check_if_running(False)
    def add_single_agent_worker(
        self, description: str, worker: ChatAgent
    ) -> Workforce:
        r"""Add a worker node to the workforce that uses a single agent.

        Args:
            description (str): Description of the worker node.
            worker (ChatAgent): The agent to be added.

        Returns:
            Workforce: The workforce node itself.
        """
        worker_node = SingleAgentWorker(description, worker)
        self._children.append(worker_node)
        return self

调用add_single_agent_worker时会添加单个工作节点worker_node.

3.add_role_playing_worker 函数

    def add_role_playing_worker(
        self,
        description: str,
        assistant_role_name: str,
        user_role_name: str,
        assistant_agent_kwargs: Optional[Dict] = None,
        user_agent_kwargs: Optional[Dict] = None,
        chat_turn_limit: int = 3,
    ) -> Workforce:
        r"""Add a worker node to the workforce that uses `RolePlaying` system.

        Args:
            description (str): Description of the node.
            assistant_role_name (str): The role name of the assistant agent.
            user_role_name (str): The role name of the user agent.
            assistant_agent_kwargs (Optional[Dict], optional): The keyword
                arguments to initialize the assistant agent in the role
                playing, like the model name, etc. Defaults to `None`.
            user_agent_kwargs (Optional[Dict], optional): The keyword arguments
                to initialize the user agent in the role playing, like the
                model name, etc. Defaults to `None`.
            chat_turn_limit (int, optional): The maximum number of chat turns
                in the role playing. Defaults to 3.

        Returns:
            Workforce: The workforce node itself.
        """
        worker_node = RolePlayingWorker(
            description,
            assistant_role_name,
            user_role_name,
            assistant_agent_kwargs,
            user_agent_kwargs,
            chat_turn_limit,
        )
        self._children.append(worker_node)
        return self

调用add_role_playing_worker时会添加RolePlayingWorker的工作节点.

4.add_workforce 函数

	@check_if_running(False)
    def add_workforce(self, workforce: Workforce) -> Workforce:
        r"""Add a workforce node to the workforce.

        Args:
            workforce (Workforce): The workforce node to be added.

        Returns:
            Workforce: The workforce node itself.
        """
        self._children.append(workforce)
        return self

调用add_workforce时会添加workforce类型的工作节点.

四、基于workforce实现多智能体协调

关键步骤为

(1)创建一个 Workforce 实例

workforce = Workforce(description="旅游攻略制作与评估工作组",new_worker_agent_kwargs={'model':model},coordinator_agent_kwargs={'model':model},task_agent_kwargs={'model':model})

(2)worker定义

planner_agent = ChatAgent(
            system_message="""你是一个专业的旅行规划师。你的职责是:
                1. 根据景点分布规划合理的游览顺序
                2. 为每天安排适量的景点和活动
                3. 考虑用餐、休息等时间
                4. 注意不同季节的特点
                请确保行程安排合理且具有可行性。""",
            model=model,
            output_language='中文'
        )

(3)添加到Workforce 的工作节点

workforce.add_single_agent_worker(
    "负责制定详细行程规划",
    worker=planner_agent
)

(4)创建一个任务

from camel.tasks import Task

# 创建一个用于测试的任务
task = Task(
    content="规划一个3天的巴黎旅行计划。",
    id="0",  # id可以是任何标记字符串
)

(5)启动 Workforce 的任务处理流程

task = workforce.process_task(task)

完整示例代码

from camel.agents import ChatAgent
from camel.models import ModelFactory
from camel.types import ModelPlatformType
from camel.messages import BaseMessage
from camel.societies.workforce import Workforce
from camel.toolkits import SearchToolkit
from camel.tasks import Task
from camel.toolkits import FunctionTool
from camel.configs import SiliconFlowConfig  # 关键

from dotenv import load_dotenv
import os

load_dotenv()

api_key = os.getenv('Siliconflow_API_KEY')

model = ModelFactory.create(
    model_platform=ModelPlatformType.SILICONFLOW,
    model_type="Qwen/Qwen2.5-72B-Instruct",
    model_config_dict=SiliconFlowConfig(temperature=0.2).as_dict(),
    api_key=api_key
)
        
# 创建一个 Workforce 实例
workforce = Workforce(description="旅游攻略制作与评估工作组",new_worker_agent_kwargs={'model':model},coordinator_agent_kwargs={'model':model},task_agent_kwargs={'model':model})

search_tool = FunctionTool(SearchToolkit().search_duckduckgo)

search_agent = ChatAgent(
            system_message="""你是一个专业的旅游信息搜索助手。你的职责是:
                1. 搜索目的地的主要景点信息
                2. 搜索当地特色美食信息
                3. 搜索交通和住宿相关信息
                请确保信息的准确性和实用性。""",
            model=model,
            tools=[search_tool],
            output_language='中文'
        )

planner_agent = ChatAgent(
            system_message="""你是一个专业的旅行规划师。你的职责是:
                1. 根据景点分布规划合理的游览顺序
                2. 为每天安排适量的景点和活动
                3. 考虑用餐、休息等时间
                4. 注意不同季节的特点
                请确保行程安排合理且具有可行性。""",
            model=model,
            output_language='中文'
        )

reviewer_agent = ChatAgent(
    system_message="""你是一个经验丰富的旅行爱好者。你的职责是:
        1. 从游客角度评估行程的合理性
        2. 指出可能的问题和改进建议
        3. 补充实用的旅行小贴士
        4. 评估行程的性价比
        请基于实际旅行经验给出中肯的建议。""",
    model=model,
    output_language='中文'
)

# 添加工作节点
workforce.add_single_agent_worker(
    "负责搜索目的地相关信息",
    worker=search_agent
).add_single_agent_worker(
    "负责制定详细行程规划",
    worker=planner_agent
).add_single_agent_worker(
    "负责从游客角度评估行程",
    worker=reviewer_agent
)

from camel.tasks import Task

# 创建一个用于测试的任务
task = Task(
    content="规划一个3天的巴黎旅行计划。",
    id="0",  # id可以是任何标记字符串
)

task = workforce.process_task(task)

print(task.result)

五、遇到问题

关键报错代码

If you have a specific format in mind or a particular context for this number, please let me know!
Traceback (most recent call last):
  File "/home/allyoung/camel_course/03-2-33-Workforce.py", line 84, in <module>
    task = workforce.process_task(task)
  File "/home/allyoung/camel/camel/societies/workforce/utils.py", line 69, in wrapper
    return func(self, *args, **kwargs)
  File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 153, in process_task
    asyncio.run(self.start())
  File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/asyncio/base_events.py", line 641, in run_until_complete
    return future.result()
  File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 469, in start
    await self._listen_to_channel()
  File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 437, in _listen_to_channel
    await self._post_ready_tasks()
  File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 402, in _post_ready_tasks
    assignee_id = self._find_assignee(task=ready_task)
  File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 288, in _find_assignee
    result_dict = json.loads(response.msg.content)
  File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
  File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

相关文章:

  • Guava:Google开源的Java工具库,太强大了
  • ZCS的随机游走的题解
  • 用Llama 3微调私有知识库:本地部署避坑指南
  • 大屏技术汇集【目录】
  • CMake 函数和宏
  • 34-三数之和
  • 应用案例 | 核能工业:M-PM助力核工业科研项目
  • 华为网路设备学习-16 虚拟路由器冗余协议(VRRP)
  • vue设置自定义logo跟标题
  • 基于ISO 26262的汽车芯片认证流程解读
  • 使用PlotNeuralNet绘制ResNet50模型
  • 第十五次CCF-CSP认证(含C++源码)
  • VC6.0图文安装教程
  • NFT在艺术品市场的影响:面纵花魄还是一场夢?
  • 【读点论文】Chain Replication for Supporting High Throughput and Availability
  • PLY格式文件如何转换成3DTiles格式——使用GISBox软件实现高效转换
  • 【NPU 系列专栏 3.0 -- scale-out 和 scale-in 和 scale-up 和 scale-down
  • Vue学习汇总(JS长期更新版)
  • 【leetcode hot 100 22】括号生成
  • 算法2--两数相加
  • 电信宽带办理/索引擎优化 seo
  • 卡通网站建设/免费推广软件工具
  • 上海做公司网站的公司/深圳关键词
  • 手机网站建设+上海/营销型网站更受用户欢迎的原因是
  • 沈阳网站建设招标公司/青岛网站快速排名提升
  • dw做网站怎么用到java/北京高端网站建设