当前位置: 首页 > news >正文

AI Agent设计模式五:Orchestrator

概念 :中央任务调度中枢

  • ✅ 优点:全局资源协调,确保任务执行顺序
  • ❌ 缺点:单点故障风险,可能成为性能瓶颈

在这里插入图片描述

import operator
import os

from langchain.schema import SystemMessage, HumanMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.constants import Send
from pydantic import BaseModel, Field
from typing_extensions import Annotated, TypedDict, List

# 将一个论证标题拆分为多个子标题,大模型并行执行这些子标题,最终汇总所有的结果。

# 初始化模型
llm = ChatOpenAI(
    model="gpt-3.5-turbo",
    openai_api_key=os.environ["GPT_API_KEY"],
    openai_api_base="https://api.chatanywhere.tech/v1",
    streaming=False  # 禁用流式传输
)

# 大模型拆分的某一个维度的结构
class Section(BaseModel):
    name: str = Field(description="章节标题")
    description: str = Field(description="章节概述")

# 结构化输出信息的实体类,输出一个列表
class Sections(BaseModel):
    sections: List[Section] = Field(description="章节列表")

# 大模型用到的一些参数
class State(TypedDict):
    topic: str
    sections: list[Section]
    completed_sections: Annotated[list, operator.add]
    final_report: str

# 大模型并行执行的任务参数
class WorkerState(TypedDict):
    section: Section
    completed_sections: Annotated[list, operator.add]

# 大模型的调度器,将任务拆分
def orchestrator(state: State):
    print("大模型调度器开始拆分任务")
    new_llm = llm.with_structured_output(Sections, method="function_calling")
    output = new_llm.invoke(
        [
            SystemMessage(content="你需要分析论证某个主题,并将其拆分成几个不同的立场进行多视角分析。"),
            HumanMessage(content=f"论证主题:{state['topic']}")
        ]
    )
    print(f"大模型调度器任务拆分完成: {output.sections}")
    return {"sections": output.sections}

# 大模型执行的具体任务
def llm_call(state: WorkerState):
    print(f"大模型执行任务: {state['section'].name}")
    output = llm.invoke(
        [
            SystemMessage(content="根据提供的章节标题和概述,完成论证文章中的其中一个章节。"),
            HumanMessage(content=f"章节标题为:{state['section'].name} 章节概述为:{state['section'].description}")
        ]
    )
    return {"completed_sections": [output.content]}

# 大模型合成最终结果
def synthesizer(state: WorkerState):
    print(f"大模型合成最终结果")
    completed_sections = state["completed_sections"]
    completed_report_sections = "\n\n---\n\n".join(completed_sections)
    return {"final_report": completed_report_sections}

# 分配到多个任务上
def assign_worker(state: State):
    # 需要多个任务并发,但是并不清楚有多少个任务时,使用Send
    return [Send("llm_call", {"section": s}) for s in state["sections"]]

# 创建工作流
work_flow = StateGraph(State)
work_flow.add_node("orchestrator", orchestrator)
work_flow.add_node("llm_call", llm_call)
work_flow.add_node("synthesizer", synthesizer)

work_flow.add_edge(START, "orchestrator")
work_flow.add_conditional_edges(
    "orchestrator",
    assign_worker,
    ["llm_call"]
)
work_flow.add_edge("llm_call", "synthesizer")
work_flow.add_edge("synthesizer", END)

graph = work_flow.compile()

result = graph.invoke({"topic": "如何评价ChatGPT"})
print(f"最终结果: {result['final_report']}")

执行结果
在这里插入图片描述

http://www.dtcms.com/a/113770.html

相关文章:

  • form实现pdf文件转换成jpg文件
  • Spring Cloud 框架为什么能处理高并发
  • python基础-16-处理csv文件和json数据
  • 未来已来:探索AI驱动的HMI设计新方向
  • 动画过渡设置
  • 【JS】接雨水题解
  • 春季赛day15 Snailography
  • 铁电液晶(FLC)与反铁电液晶(AFLC)
  • SCADE One - 弥合基于模型设计与传统编程之间的鸿沟
  • 【学Rust写CAD】31 muldiv255函数(muldiv255.rs)
  • 设计模式简述(三)工厂模式
  • 《C语言代码解析与应用:数组操作的两种实现》
  • ctfshow VIP题目限免 版本控制泄露源码2
  • LeetCode详解之如何一步步优化到最佳解法:20. 有效的括号
  • 配置ASP.NET Core+NLog配置日志示例
  • 基于 FPGA 的分秒计数器
  • 如何实现两个视频融合EasyCVR平台的数据同步?详细步骤指南
  • 爬虫练习案例
  • zk基础—5.Curator的使用与剖析二
  • 打造高效英文单词记忆系统:基于Python的实现与分析
  • $R^n$超平面约束下的向量列
  • 游戏引擎学习第206天
  • React框架的Hooks实现原理
  • MicroPython 开发ESP32应用教程 之 WIFI简单应用 :时间同步、天气信息获取,ST7735 TFT屏驱动及任意中文字符显示
  • Linux制作deb安装包
  • 卡尔曼滤波器浅聊
  • windows 常用命令总结
  • MySQL表的增删改查基础版
  • 【大模型深度学习】如何估算大模型需要的显存
  • JavaScript基础--09-流程控制语句:选择结构(if和switch)