当前位置: 首页 > news >正文

【Agent的革命之路——LangGraph】工作流中的 map-reduce 模式

这节我们来探索一下工作流设计中常用的 Map-Reduce 模式,在介绍 Map-Reduce 模式之前,我们想来看下 LangGraph 中的Send 对象已经结构化输出。

Send对象

默认情况下,节点(Nodes)和边(Edges)会预先定义,并在同一共享状态(State)下运行。然而,某些情况下可能无法提前确定边的具体数量,或者需要同时存在不同版本的状态。一个典型例子是 Map-Reduce(映射-归约)设计模式:在此模式中,第一个节点可能生成一个对象列表,而后续需要对列表中的每个对象应用另一个节点。此时,对象的数量可能未知(意味着边的数量无法预先确定),且下游节点的输入状态应是独立的(每个生成的对象对应一个独立的状态)。
为支持此类设计模式,LangGraph 允许从**条件边(conditional edges)**返回 Send 对象。Send 接收两个参数:第一个是目标节点的名称,第二个是要传递给该节点的状态(State)。
下面是Send的伪代码:

def continue_to_jokes(state: OverallState):
    return [Send("generate_joke", {"subject": s}) for s in state['subjects']]

graph.add_conditional_edges("node_a", continue_to_jokes)

Structured outputs 结构化输出

对于许多应用程序(例如聊天机器人),模型需要直接用自然语言响应用户。但是,在某些情况下,我们需要模型以结构化格式输出。例如,我们可能希望将模型输出存储在数据库中,并确保输出符合数据库模式。这需要激发结构化输出的概念,可以指示模型以特定的输出结构进行响应。
在这里插入图片描述
下面我们展示一下结构化输出的伪代码,该伪代码说明了使用结构化输出时推荐的工作流程。 Langchain提供了一种方法with_structured_output(),该方法可自动将模式与模型结合并解析输出的过程。该辅助功能可用于所有支持结构化输出的模型提供商。

# Define schema
schema = {"foo": "bar"}
# Bind schema to model
model_with_structure = model.with_structured_output(schema)
# Invoke the model to produce structured output that matches the schema
structured_output = model_with_structure.invoke(user_input)

我们稍微改动一下上面伪代码,使真实的代码更有可比性。

  1. 定义模式
    通过Pydantic定义结构化输出,它提供了类型的提示和验证,
from pydantic import BaseModel, Field
class ResponseFormatter(BaseModel):
    """Always use this tool to structure your response to the user."""
    answer: str = Field(description="The answer to the user's question")
    followup_question: str = Field(description="A followup question the user could ask")
  1. 返回结构化输出
    有几种结构化输出的方式,也是根据不同的模型提供商提供的输出方式,我一一举例。
    工具调用模式:
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4o", temperature=0)
# Bind responseformatter schema as a tool to the model
model_with_tools = model.bind_tools([ResponseFormatter])
# Invoke the model
ai_msg = model_with_tools.invoke("What is the powerhouse of the cell?")

# Get the tool call arguments
ai_msg.tool_calls[0]["args"]
# 输出结果
{'answer': "The powerhouse of the cell is the mitochondrion. Mitochondria are organelles that generate most of the cell's supply of adenosine triphosphate (ATP), which is used as a source of chemical energy.",
 'followup_question': 'What is the function of ATP in the cell?'}
 
# Parse the dictionary into a pydantic object
pydantic_object = ResponseFormatter.model_validate(ai_msg.tool_calls[0]["args"])

当使用工具调用时,需要将工具调用参数从字典中解析为原始架构。并且,我们还需要指示模型在我们要执行结构化输出时始终使用该工具,这是提供商特定的设置。
json模式:

from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4o", model_kwargs={ "response_format": { "type": "json_object" } })
ai_msg = model.invoke("Return a JSON object with key 'random_ints' and a value of 10 random ints in [0-99]")
ai_msg.content
'\n{\n  "random_ints": [23, 47, 89, 15, 34, 76, 58, 3, 62, 91]\n}'

使用JSON模式时,需要将输出解析为JSON对象。
当然Langchain提供了一个辅助功能(with_structured_output())来简化该过程。
在这里插入图片描述
这两者都将模式绑定到模型作为工具,并将输出解析为指定的输出模式。

# Bind the schema to the model
model_with_structure = model.with_structured_output(ResponseFormatter)
# Invoke the model
structured_output = model_with_structure.invoke("What is the powerhouse of the cell?")
# 打印这个 pydantic object
structured_output
# 得到下面结果
ResponseFormatter(answer="The powerhouse of the cell is the mitochondrion. Mitochondria are organelles that generate most of the cell's supply of adenosine triphosphate (ATP), which is used as a source of chemical energy.", followup_question='What is the function of ATP in the cell?')

Map-reduce

我们经过上面介绍了Send对象还有结构化输出,对于我们接下来写 Map-reduce 模式提供了很好的铺垫。
下面我们来开始写个例子,由于Map-reduce模式操作对于高效任务分解和并行处理至关重要。此例子涉及将任务分解为较小的子任务,并行处理每个子任务,并汇总所有已完成子任务的结果。
例子的场景:给定一个来自用户的一般主题,生成相关主题列表,为每个主题生成一个笑话,并从结果列表中选择最佳笑话。在这个设计模式中,第一个节点可能会生成一个对象列表(例如,相关主题),我们想要将其他节点(例如,生成一个笑话)应用于所有这些对象(例如,主题)。然而,出现了两个主要挑战。
(1)当我们布置图形时,对象(例如,主体)的数量可能提前未知(意味着可能不知道边的数量),
(2)下游节点的输入状态应该不同(每个生成的对象一个)。
LangGraph通过其SendAPI解决了这些挑战。通过利用条件边,Send可以将不同的状态(例如,主题)分配给节点的多个实例(例如,笑话生成)。重要的是,发送的状态可以与核心图的状态不同,从而实现灵活和动态的工作流管理。
在这里插入图片描述
下面是本例子的详细代码,我在每个组件上都加了关键注释:

import operator
from typing import Annotated
from typing_extensions import TypedDict

from langchain_anthropic import ChatAnthropic

from langgraph.types import Send
from langgraph.graph import END, StateGraph, START

from pydantic import BaseModel, Field

# 定义 model and prompts
subjects_prompt = """Generate a comma separated list of between 2 and 5 examples related to: {topic}."""
joke_prompt = """Generate a joke about {subject}"""
best_joke_prompt = """Below are a bunch of jokes about {topic}. Select the best one! Return the ID of the best one.

{jokes}"""


class Subjects(BaseModel):
    subjects: list[str]


class Joke(BaseModel):
    joke: str


class BestJoke(BaseModel):
    id: int = Field(description="Index of the best joke, starting with 0", ge=0)


model = ChatAnthropic(model="claude-3-5-sonnet-20240620")

# 定义组件

# 这将作为主图的全局状态。
# 它包含一个主题(我们期望用户提供),
# 然后会生成一个主题列表,并为每个主题生成一个笑话。
class OverallState(TypedDict):
    topic: str
    subjects: list
    # 注意这里我们使用了 operator.add
	# 这是因为我们希望将所有从单个节点生成的笑话合并到一个列表中 —— 这本质上是“归约”(reduce)部分。
    jokes: Annotated[list, operator.add]
    best_selected_joke: str


# 这将作为节点的状态,我们将所有主题“映射”到该节点以生成笑话。
class JokeState(TypedDict):
    subject: str


# 这是我们将用于生成笑话主题的函数。
def generate_topics(state: OverallState):
    prompt = subjects_prompt.format(topic=state["topic"])
    response = model.with_structured_output(Subjects).invoke(prompt)
    return {"subjects": response.subjects}


# 这里我们根据主题生成一个笑话。
def generate_joke(state: JokeState):
    prompt = joke_prompt.format(subject=state["subject"])
    response = model.with_structured_output(Joke).invoke(prompt)
    return {"jokes": [response.joke]}


# 这里我们定义了遍历生成主题的逻辑,
# 并将其作为图中的一条边使用。
def continue_to_jokes(state: OverallState):
    # 我们将返回一个 `Send` 对象列表。
	# 每个 `Send` 对象包含图中一个节点的名称,
	# 以及要发送给该节点的状态。
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]


# 这里我们将评判最佳笑话。
def best_joke(state: OverallState):
    jokes = "\n\n".join(state["jokes"])
    prompt = best_joke_prompt.format(topic=state["topic"], jokes=jokes)
    response = model.with_structured_output(BestJoke).invoke(prompt)
    return {"best_selected_joke": state["jokes"][response.id]}


# 构建图:在这里我们将所有内容整合在一起以构建我们的图。
graph = StateGraph(OverallState)
graph.add_node("generate_topics", generate_topics)
graph.add_node("generate_joke", generate_joke)
graph.add_node("best_joke", best_joke)
graph.add_edge(START, "generate_topics")
graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
graph.add_edge("generate_joke", "best_joke")
graph.add_edge("best_joke", END)
app = graph.compile()

这是我们的graph:
在这里插入图片描述
调用该graph:

# Call the graph: here we call it to generate a list of jokes
for s in app.stream({"topic": "animals"}):
    print(s)

得到结果:

{'generate_topics': {'subjects': ['Lions', 'Elephants', 'Penguins', 'Dolphins']}}
{'generate_joke': {'jokes': ["Why don't elephants use computers? They're afraid of the mouse!"]}}
{'generate_joke': {'jokes': ["Why don't dolphins use smartphones? Because they're afraid of phishing!"]}}
{'generate_joke': {'jokes': ["Why don't you see penguins in Britain? Because they're afraid of Wales!"]}}
{'generate_joke': {'jokes': ["Why don't lions like fast food? Because they can't catch it!"]}}
{'best_joke': {'best_selected_joke': "Why don't dolphins use smartphones? Because they're afraid of phishing!"}}

通过上面的示例中,通过设计Map-Reduce 模式来处理生成笑话的任务。它被分为了 Map 阶段,和 Reduce 阶段,在Map阶段generate_topics 节点生成与主题相关的多个子主题(subjects)。然后,通过 continue_to_jokes 函数,将每个子主题映射到 generate_joke 节点,生成对应的笑话。每个子主题的处理是独立的,因此可以并行执行。而在Reduce 阶段,对任务进行了分解,生成笑话的任务可以分解为多个独立的子任务(每个子主题生成一个笑话),适合并行处理。首先 Map-Reduce 模式天然支持这种任务分解和并行化。其次主题的数量是动态的,无法预先确定。Map-Reduce 模式可以灵活处理动态数量的子任务。最终需要将所有生成的笑话合并为一个列表,并选择最佳笑话。Reduce 阶段提供了这种汇总功能。Map-Reduce 模式将任务分解为清晰的阶段(Map 和 Reduce),使代码结构更清晰,易于理解和维护。

相关文章:

  • 【力扣】98.验证搜索二叉树
  • 线性表之顺序表
  • 《LeetCode 763. 划分字母区间 | 高效分割字符串》
  • linux-5.10.110内核源码分析 - bcm2711 pcie BAR地址分配
  • 牛客寒假训练营3
  • 芯片引脚描述或电路原理图中的Ipd、Ipu是什么意思?
  • fps武器系统6:随机弹道
  • vLLM专题(四)-故障排除
  • vue中如何动态的增减组件的类名(class)
  • OpenPose
  • 计算机世界的寻宝游戏:用C语言解密五大查找算法
  • 数据仓库与数据湖的协同工作:智慧数据管理的双引擎
  • 类和对象(5)——抽象类和接口
  • 云存储:云计算储存
  • 安卓设备调试h5页面(调试)
  • 量化交易入门指南
  • 《Stable Diffusion绘画完全指南:从入门到精通的Prompt设计艺术》-配套代码示例
  • 位运算,双指针,二分,排序算法
  • WeMos D1+PIR+Android 的小场景制作
  • freertos源码分析DAY12 (软件定时器)
  • 李强会见巴西总统卢拉
  • 西北大学副校长范代娣成陕西首富?系家庭财富,本人已从上市公司退出
  • 通辽警方侦破一起积压21年的命案:嫌疑人企图强奸遭反抗后杀人
  • 2025年度十大IP!IP SH荣膺文化综合类TOP10
  • 多省市已开展现房销售试点,去年全国现房销售面积占比超30%
  • 当我们提起拉动消费时,应该拉动什么消费?