当前位置: 首页 > news >正文

LangGraph认知篇-Send机制

Send 机制简介

        在LangGraph 中,节点(Nodes)和边(Edges)默认是预先定义的,且基于同一个共享状态(state)进行操作。但在某些场景下,例如实现映射-规约(map-reduce)等设计模式时,可能存在以下需求:

  • 动态分支创建:运行时根据数据决定分支数量(如处理用户上传的多个文档)

  • 状态隔离:不同分支需要独立状态(如并行处理不同主题的文本生成)

  • 并行处理:支持map-reduce等并行模式(如批量API调用)

  • 未知拓扑:处理无法预先定义图结构的场景(如动态工作流生成)

        为满足上述需求,LangGraph 提供了 Send 机制,其核心功能是:通过条件边(conditional edges)动态生成下游节点的调用指令,实现状态的按需分发和节点的动态触发

Send机制与静态图结构的区别

特性静态图Send机制
分支数量预先定义运行时动态生成
状态传递所有节点共享同一状态各分支拥有独立状态
执行模式顺序/固定路径执行并行/动态路径执行
适用场景固定拓扑结构数据驱动的动态拓扑

Send 对象的核心属性

  • node(str):目标节点的名称,即需要被调用的节点标识;

  • arg(Any):传递给目标节点的状态数据,可以是任意类型,通常是与该节点任务相关的特定信息。

工作机制图解

官方代码示例

from typing import Annotated, TypedDict
import operator
from langgraph.types import Send
from langgraph.graph import StateGraph, END, START# 1. 定义整体状态结构
class OverallState(TypedDict):subjects: list[str]  # 存储需要生成笑话的主题列表# 存储生成的笑话列表,使用operator.add指定合并方式(列表元素累加)jokes: Annotated[list[str], operator.add]# 2. 定义条件边处理函数:动态生成Send对象
def continue_to_jokes(state: OverallState):# 为每个主题创建一个Send对象,指定调用"generate_joke"节点并传递该主题return [Send("generate_joke", {"subject": s}) for s in state['subjects']]# 3. 构建状态图工作流
builder = StateGraph(OverallState)# 4. 添加"generate_joke"节点:根据传入的主题生成笑话
# 该节点接收包含"subject"的状态,返回包含笑话的字典(会合并到整体状态的jokes列表中)
builder.add_node("generate_joke", lambda state: {"jokes": [f"Joke about {state['subject']}"]})# 5. 配置起始节点的条件边:通过continue_to_jokes函数动态决定下一步
builder.add_conditional_edges(START, continue_to_jokes)# 6. 配置"generate_joke"节点的后续节点:生成笑话后到达结束节点
builder.add_edge("generate_joke", END)# 7. 编译工作流图
graph = builder.compile()# 8. 执行工作流:输入包含两个主题的状态
result = graph.invoke({"subjects": ["cats", "dogs"]})
print(result)
# 输出:{'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs']}
  • 定义整体状态,收集笑话主题列表,并存储;

  • 创建条件边处理函数,该函数根据主状态中的 subjects 列表,为每个主题生成一个 Send 对象;

  • 构建工作流

    • 添加 generate_joke 节点,其逻辑是基于传入的 subject 生成笑话;

    • 将起始节点(START)通过条件边关联到 continue_to_jokes 函数;

    • 配置 generate_joke 节点完成后指向结束节点(END)。

  • 执行效果:当输入 {"subjects": ["cats", "dogs"]} 时,工作流会:

    • 通过 Send 动态触发两次 generate_joke 节点调用;

    • 第一次调用使用 {"subject": "cats"} 生成对应笑话;

    • 第二次调用使用 {"subject": "dogs"} 生成对应笑话;

    • 最终汇总结果为 {'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs']}

Map-reduce 设计模式简介

        映射 - 归约(Map-Reduce)是一种用于大规模数据处理的分布式计算设计模式,由 Google 提出,旨在通过拆分任务、并行处理和结果聚合,高效处理海量数据。其核心思想是将复杂任务分解为两个主要阶段:映射(Map) 和归约(Reduce),从而实现数据的分布式处理和结果的高效汇总。

核心思想与流程

        Map-Reduce 的工作流程可概括为 “拆分 - 处理 - 合并”,具体分为三个关键步骤:

  • 映射(Map)阶段

    • 输入:原始数据集(通常是大规模、非结构化或半结构化数据,如日志、文本等)。

    • 操作:由 “映射函数”(Map Function)将输入数据拆分为一系列键值对(Key-Value Pair)。例如,对文本数据进行分词时,可将每个单词作为 “键(Key)”,出现次数(初始为 1)作为 “值(Value)”,即 (单词, 1)

    • 目的:将原始数据转换为易于处理的中间格式,为后续聚合做准备。

  • 洗牌(Shuffle)阶段(隐式步骤)

    • 作用:在 Map 和 Reduce 之间自动执行,将 Map 阶段输出的键值对按 “键” 进行分组、排序和分发,确保相同键的所有值被发送到同一个 Reduce 节点。

    • 例如:将所有 (“apple”, 1) 键值对汇总到一起,形成 (“apple”, [1, 1, 1])

  • 归约(Reduce)阶段

    • 输入:Shuffle 阶段处理后的键值对(同一个键对应一组值)。

    • 操作:由 “归约函数”(Reduce Function)对同一键的所有值进行聚合计算(如求和、计数、平均值等)。例如,对 (“apple”, [1, 1, 1]) 求和,得到 (“apple”, 3),即 “apple” 出现 3 次。

    • 目的:将分散的中间结果合并为最终输出(如统计结果、分析报告等)。

Map-reduce 与 LangGraph 中 Send 机制的结合

        在 LangGraph 工作流框架中,Map-Reduce 模式可通过 Send 机制实现:

  • Map 阶段:用 Send 为每个输入对象生成独立任务(如为列表中的每个主题生成文本);

  • Reduce 阶段:收集所有任务的输出,通过归约节点汇总结果(如合并多个文本生成最终报告)。

        这种结合既保留了 Map-Reduce 的并行处理能力,又通过动态节点调用适配了更灵活的工作流场景。

LangGraph 实现Map-Reduce 示例

from typing import TypedDict
from langgraph.graph import StateGraph, Send, END# 状态定义
class MapReduceState(TypedDict):documents: list[str]        # 初始文档chunks: list[str]           # 文档分块summaries: list[str]        # 分块摘要final_summary: str          # 最终摘要# 1. 文档分块(初始节点)
def split_documents(state: MapReduceState):documents = state["documents"]chunks = [chunk for doc in documents for chunk in split_text(doc)]return {"chunks": chunks}# 2. 动态路由(Send核心)
def map_router(state: MapReduceState):return [Send("summarize_chunk", {"chunk": c})for c in state["chunks"]]# 3. 摘要生成(并行执行)
def summarize_chunk(state: MapReduceState):return {"summary": llm(f"Summarize: {state['chunk']}")}# 4. 结果聚合
def reduce_summaries(state: MapReduceState):all_summaries = [s for branch in state.values() if "summary" in s]return {"final_summary": combine_summaries(all_summaries)}# 构建工作流
graph = StateGraph(MapReduceState)
graph.add_node("split_docs", split_documents)
graph.add_node("summarize_chunk", summarize_chunk)
graph.add_node("reduce", reduce_summaries)# 动态分支配置
graph.add_conditional_edges("split_docs", map_router)
graph.add_conditional_edges("summarize_chunk", lambda _: "reduce")
graph.add_edge("reduce", END)# 执行工作流
result = graph.invoke({"documents": ["long_text1", "long_text2"],"chunks": [],"summaries": [],"final_summary": ""
})
print(result["final_summary"])

    执行流程:

    1. 主节点完成 → 触发条件边函数

    2. 生成 Send 对象列表 → 创建并行分支

    3. 每个分支执行指定节点 → 独立状态输入

    4. 所有分支完成后 → 触发聚合节点

    其核心机制包括:

    • 动态路由:运行时根据chunks列表长度决定创建多少分支,生成的Send列表会被LangGraph自动并行处理;

    动态路由(Send核心)def map_router(state: MapReduceState):return [Send("summarize_chunk", {"chunk": c})for c in state["chunks"]]
    • 状态隔离原理:每个分支只接收必要数据(减少内存开销),LangGraph自动管理分支状态的创建和传递;

    # 主状态(分割后)
    {"documents": [doc1, doc2],"chunks": [chunk1, chunk2, chunk3, ...],"chunk_summaries": [],"final_summary": ""
    }# 分支独立状态示例
    [{"chunk": "chunk1内容", "chunk_id": 0},  # 分支1{"chunk": "chunk2内容", "chunk_id": 1},  # 分支2{"chunk": "chunk3内容", "chunk_id": 2}   # 分支3
    ] 
    • 结果聚合机制:summarize_chunk 全部执行完毕后,会调用reduce节点,通过state.values() 获取所有分支的状态,汇总出最终的结果;

    graph.add_conditional_edges("summarize_chunk", lambda _: "reduce")# 4. 结果聚合
    def reduce_summaries(state: MapReduceState):all_summaries = [s for branch in state.values() if "summary" in s]return {"final_summary": combine_summaries(all_summaries)}

    Send 机制典型应用场景

    • 文档分块处理

    Send("process_chunk", {"chunk": c}) for c in doc_chunks
    • API 并行调用

    Send("call_api", {"request": r}) for r in requests
    • 批量数据处理

    Send("process_item", {"item": i}) for i in batch
    • 动态工作流生成

    Send(next_step, config) for config in dynamic_configs

    注意事项

    1. 状态设计:分支节点接收的状态是独立的,不包含主状态的所有字段

      # 推荐
      Send("process", {"item_id": 123})# 不推荐
      Send("process", full_state)
    2. 聚合触发:所有分支必须完成才能触发下游节点

      graph.add_conditional_edges("worker_node",lambda _: "aggregate_node"  # 所有分支完成后触发
      )
    3. 错误处理:单个分支失败会影响整个工作流

    4. 性能考虑:避免创建过多分支导致资源耗尽

    参考文献

    https://github.com/langchain-ai/langgraph

    Overview

    http://www.dtcms.com/a/308172.html

    相关文章:

  1. TypeScript 基础介绍(二)
  2. QT6 Python UI文件转换PY文件的方法
  3. 如何为C#加入EPPlus 包
  4. 【Flask基础②】 | 路由、响应与异常处理
  5. 微服务快速集成 TraceId
  6. 企业智脑1.3.2版本发布,设备管理+智能体OS双核驱动,重构数字生产力边界
  7. 【车联网kafka】Kafka核心架构与实战经验(第二篇)
  8. 网络与信息安全有哪些岗位:(4)应急响应工程师
  9. 【MySQL集群架构与实践3】使用Dcoker实现读写分离
  10. VuePress 使用详解
  11. 安卓基础布局核心知识点整理
  12. 基于UDP的SNMP协议
  13. Svelte 5 完全指南:从入门到跨端应用开发
  14. 【Keras学习笔记】开发环境搭建
  15. MATLAB 实现 SRCNN 图像超分辨率重建
  16. toFixed()方法的报错注意
  17. C++11原子操作实现公平自旋锁
  18. 【IQA技术专题】DISTS代码讲解
  19. 深入剖析:C++ 手写实现 unordered_map 与 unordered_set 全流程指南
  20. Qt 如何从 .ts 文件提取所有源文
  21. 2024年SEVC SCI2区,一致性虚拟领航者跟踪群集算法GDRRT*-PSO+多无人机路径规划,深度解析+性能实测
  22. TDengine 中 TDgp 中添加算法模型(异常检测)
  23. 【生活篇】Ubuntu22.04安装网易云客户端
  24. 河南萌新联赛2025第(三)场:河南理工大学(补题)
  25. .NET 10 中的新增功能系列文章3—— .NET MAUI 中的新增功能
  26. gen_compile_commands.sh
  27. elk部署加日志收集
  28. 网络爬虫(python)入门
  29. webpack-babel
  30. 开发避坑短篇(11):Oracle DATE(7)到MySQL时间类型精度冲突解决方案