当前位置: 首页 > news >正文

AI Agent设计模式二:Parallelization

概念 :并行任务执行引擎

  • ✅ 优点:提升吞吐量,充分利用多核资源
  • ❌ 缺点:复杂度高,存在竞态条件风险

在这里插入图片描述

from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
import os

# 初始化模型
client = ChatOpenAI(
    model="deepseek-r1",
    openai_api_key=os.environ["BAILIAN_API_KEY"],
    openai_api_base="https://dashscope.aliyuncs.com/compatible-mode/v1",
    streaming=False  # 禁用流式传输
)

# 定义实体类
class PhilosophyState(TypedDict):
    topic: str
    part0: str
    part1: str
    part2: str
    combined_output: str

meta_prompt = """
    针对如下问题进行思考,并得出结论。
    问题如下:{topic}
    你分析的角度如下:{aspect}
"""

# 道家观点解释
def part0_interpreter(state: PhilosophyState):
    print(f"道家视点解释开始 :{state['topic']}")

    prompt = meta_prompt.format(topic = state['topic'], aspect = "道家")
    response = client.invoke(prompt)

    print(f"道家观点:{response}")
    return {'part0': response}

# 儒学观点解释
def part1_interpreter(state: PhilosophyState):
    print(f"儒学观点解释开始 :{state['topic']}")
    prompt = meta_prompt.format(topic=state['topic'], aspect="儒学")
    response = client.invoke(prompt)
    print(f"儒学观点:{response}")
    return {'part1': response}

# 法学观点解释
def part2_interpreter(state: PhilosophyState):
    print(f"法学观点解释开始 :{state['topic']}")
    prompt = meta_prompt.format(topic=state['topic'], aspect="法学")
    response = client.invoke(prompt)
    print(f"法学观点:{response}")
    return {'part2': response}

def aggregate_results(state: PhilosophyState):
    combined = f"{state['part0']}\n{state['part1']}\n{state['part2']}"
    return {'combined_output': combined }

# 创建工作流
workflow = StateGraph(PhilosophyState)

# 添加节点
workflow.add_node("part0_interpreter", part0_interpreter)
workflow.add_node("part1_interpreter", part1_interpreter)
workflow.add_node("part2_interpreter", part2_interpreter)
workflow.add_node("aggregate_results", aggregate_results)

# 添加节点边
workflow.add_edge(START, "part0_interpreter")
workflow.add_edge(START, "part1_interpreter")
workflow.add_edge(START, "part2_interpreter")
workflow.add_edge("part0_interpreter", "aggregate_results")
workflow.add_edge("part1_interpreter", "aggregate_results")
workflow.add_edge("part2_interpreter", "aggregate_results")
workflow.add_edge("aggregate_results", END)

# 编译工作流
app = workflow.compile()

result = app.invoke({"topic": "治国之道在于平衡各方利益"})
print(result)

执行结果
在这里插入图片描述

http://www.dtcms.com/a/111217.html

相关文章:

  • 【新能源汽车整车动力学模型深度解析:面向MATLAB/Simulink仿真测试工程师的硬核指南】
  • PyTorch:解锁AI新时代的钥匙
  • Python基于时间序列分析的降雨量预测系统的设计与实现【附源码、文档说明】
  • 一周学会Pandas2 Python数据处理与分析-Jupyter Notebook安装
  • C++类的特殊成员函数:构造、拷贝构造与析构函数详解
  • F#语言的折线图
  • Prolog语言的强化学习
  • MySQL 知识点详解(索引、存储引擎、事务、锁机制、优化)
  • 当机器学习遇见购物车分析:FP-Growth算法全解析
  • 对模板方法模式的理解
  • WPF设计学习记录滴滴滴6
  • 池化技术的深度解析与实践指南【大模型总结】
  • 【51单片机】2-6【I/O口】电动车简易防盗报警器实现
  • Python循环控制语句
  • 幻觉抵抗优化大模型:teapotllm
  • Linux 线程1-线程的概念、线程与进程区别、线程的创建、线程的调度机制、线程函数传参
  • SpringBoot+Spring+MyBatis相关知识点
  • MQL5教程 05 指标开发实战:双色线、双线变色MACD、跨时间周期均线
  • TSMaster在新能源汽车研发测试中的硬核应用指南
  • 【rockchip】使用RKMPP+RGA解码H264并转换数据格式输出
  • 一文理解什么是中值模糊
  • C++多线程函数介绍
  • 【Kafka基础】ZooKeeper在Kafka中的核心作用:分布式系统中枢神经系统
  • 【LeetCode Solutions】LeetCode 141 ~ 145 题解
  • RocketMQ 中的 ProducerManager 组件剖析
  • 【Java Stream详解】
  • 提高:图论:强连通分量 图的遍历
  • Nginx功能及应用全解:从负载均衡到反向代理的全面剖析
  • OpenAI:人工智能领域的探索者与变革者
  • 黑马点评redis改 part 1