基于 LangGraph 框架实现智能研究助手示例程序
上一篇文章介绍LangGraph 是对 LangChain 的重要补充,专为解决复杂智能体流程而设计。它通过图结构、状态驱动、条件控制等机制,使 AI 应用具备更强的流程控制能力、可观测性和人机协作能力。
这一篇实现一个基于 LangGraph 框架的智能研究助手示例程序,主要功能是构建一个能够回答用户问题的工作流系统。
1. 核心功能
实现了一个完整的智能研究助手工作流:接收用户问题 → 分析问题 → (可选)网络搜索 → 生成最终答案
核心概念:
- State(状态):应用程序的状态信息
- Node(节点):执行特定任务的函数
- Edge(边):连接节点的路径,定义执行流程
- Graph(图):由节点和边组成的工作流图
import os
from typing import Dict, List, TypedDict, Annotated
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langchain_core.tools import Tool
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
import json# 添加图形可视化所需的导入
try:from IPython.display import Image, displayIPYTHON_AVAILABLE = True
except ImportError:IPYTHON_AVAILABLE = Falseprint("📝 注意:IPython 不可用,将保存图片到文件")
2. 主要组件
状态定义
使用 TypedDict 定义 ResearchState
状态结构,包含:
question
: 用户原始问题messages
: 消息历史记录(使用 Annotated 支持消息累积)search_results
: 搜索结果need_more_info
: 是否需要更多信息的标志final_answer
: 最终答案
# ===========================
# 定义应用状态 State
# ===========================class ResearchState(TypedDict):"""定义研究助手的状态结构LangGraph 使用 TypedDict 来定义状态,这样可以:1. 明确数据结构2. 提供类型检查3. 在节点间传递状态信息"""# 用户的原始问题question: str# 消息历史记录(使用 Annotated 类型来支持消息累积)messages: Annotated[List[BaseMessage], add_messages]# 搜索结果search_results: List[str]# 是否需要更多信息need_more_info: bool# 最终答案final_answer: str
模拟工具
mock_web_search()
: 模拟网络搜索功能,根据预设的关键词返回相应的搜索结果- 创建了名为
web_search
的 Tool 对象,用于集成到工作流中
# ===========================
# 模拟工具定义
# ===========================
def mock_web_search(query: str) -> str:"""模拟网络搜索工具在实际应用中,这里会调用真实的搜索 API"""# 模拟搜索结果mock_results = {"python": ["Python是一种高级编程语言,由Guido van Rossum于1991年首次发布。","Python具有简洁的语法和强大的库生态系统,广泛用于Web开发、数据科学、人工智能等领域。","Python的设计哲学强调代码的可读性和简洁性。"],"机器学习": ["机器学习是人工智能的一个分支,通过算法让计算机从数据中学习模式。","主要包括监督学习、无监督学习和强化学习三大类。","常用的机器学习库包括scikit-learn、TensorFlow、PyTorch等。"],"langraph": ["LangGraph是LangChain生态系统中用于构建有状态应用的框架。","它提供了图状态管理、节点连接、条件路由等功能。","特别适合构建复杂的AI Agent和工作流系统。"]}# 简单的关键词匹配for keyword, results in mock_results.items():if keyword.lower() in query.lower():return json.dumps(results, ensure_ascii=False)return json.dumps(["未找到相关信息,请尝试其他关键词。"], ensure_ascii=False)# 创建搜索工具
search_tool = Tool(name="web_search",description="搜索网络信息来回答用户问题",func=mock_web_search
)
节点函数
analyze_question_node()
: 分析用户问题,判断是否需要搜索更多信息search_node()
: 使用搜索工具获取信息,处理搜索结果generate_answer_node()
: 基于搜索结果或直接生成最终答案
# ===========================
# 定义节点函数
# ===========================def analyze_question_node(state: ResearchState) -> ResearchState:"""问题分析节点功能:1. 分析用户问题2. 判断是否需要搜索更多信息3. 决定下一步行动"""print("🔍 正在分析用户问题...")question = state["question"]# 简单的问题分析逻辑need_search = True# 一些不需要搜索的简单问题simple_questions = ["你好", "再见", "谢谢"]if any(simple_word in question for simple_word in simple_questions):need_search = False# 更新状态state["need_more_info"] = need_searchstate["messages"].append(SystemMessage(content=f"正在分析问题:{question}"))print(f" 问题:{question}")print(f" 需要搜索:{need_search}")return statedef search_node(state: ResearchState) -> ResearchState:"""搜索节点功能:1. 使用搜索工具获取信息2. 处理搜索结果3. 更新状态"""print("🌐 正在搜索相关信息...")question = state["question"]# 直接调用搜索工具函数search_result = mock_web_search(question)# 解析搜索结果try:results = json.loads(search_result)state["search_results"] = resultsexcept json.JSONDecodeError:state["search_results"] = [search_result]# 更新消息历史state["messages"].append(SystemMessage(content=f"搜索完成,找到 {len(state['search_results'])} 条结果"))print(f" 搜索结果数量:{len(state['search_results'])}")for i, result in enumerate(state["search_results"]):print(f" 结果 {i+1}:{result[:100]}...")return statedef generate_answer_node(state: ResearchState) -> ResearchState:"""答案生成节点功能:1. 基于搜索结果生成答案2. 创建最终回复3. 完成处理流程"""print("📝 正在生成最终答案...")question = state["question"]search_results = state.get("search_results", [])# 如果不需要搜索,直接生成简单回复if not state.get("need_more_info", True):simple_responses = {"你好": "您好!我是智能研究助手,有什么可以帮助您的吗?","再见": "再见!希望我的回答对您有帮助。","谢谢": "不客气!很高兴能够帮助您。"}for key, response in simple_responses.items():if key in question:state["final_answer"] = responsebreakelse:state["final_answer"] = "您好!请问有什么可以帮助您的吗?"else:# 基于搜索结果生成详细答案if search_results:answer = f"根据我的搜索,关于「{question}」的信息如下:\n\n"for i, result in enumerate(search_results, 1):answer += f"{i}. {result}\n"answer += f"\n希望这些信息能够回答您关于「{question}」的问题。如果您需要更多详细信息,请随时告诉我!"else:answer = f"抱歉,我没有找到关于「{question}」的相关信息。请尝试换个角度提问,或者提供更多背景信息。"state["final_answer"] = answer# 更新消息历史state["messages"].append(AIMessage(content=state["final_answer"]))print(f" 生成答案长度:{len(state['final_answer'])} 字符")print(f" 答案预览:{state['final_answer'][:100]}...")return state
条件判断函数
should_search()
: 决定执行流程是进入搜索节点还是直接生成答案
# ===========================
# 定义条件判断函数
# ===========================def should_search(state: ResearchState) -> str:"""条件边函数:判断是否需要进行搜索返回值:- "search": 需要搜索- "generate": 直接生成答案"""if state.get("need_more_info", True):return "search"else:return "generate"
工作流构建
create_research_graph()
: 构建完整的 LangGraph 工作流图,包括:- 创建状态图
- 添加节点
- 设置入口点
- 添加条件边和常规边
- 编译图为可执行应用
# ===========================
# 构建工作流图
# ===========================def create_research_graph():"""创建研究助手的工作流图图结构:START -> analyze_question -> [条件判断] -> search -> generate_answer -> END\-> generate_answer -> END"""print("🏗️ 正在构建 LangGraph 工作流...")# 1. 创建状态图workflow = StateGraph(ResearchState)# 2. 添加节点workflow.add_node("analyze_question", analyze_question_node)workflow.add_node("search", search_node)workflow.add_node("generate_answer", generate_answer_node)# 3. 设置入口点workflow.set_entry_point("analyze_question")# 4. 添加条件边(从问题分析到搜索或直接生成答案)workflow.add_conditional_edges("analyze_question", # 起始节点should_search, # 条件函数{"search": "search", # 如果需要搜索"generate": "generate_answer" # 如果直接生成答案})# 5. 添加常规边workflow.add_edge("search", "generate_answer") # 搜索后生成答案workflow.add_edge("generate_answer", END) # 生成答案后结束# 6. 编译图app = workflow.compile()print("✅ LangGraph 工作流构建完成!")return app
3. 演示与测试
run_research_assistant()
: 运行单个研究助手查询流程demo_langgraph()
: 演示多个测试用例,包括关于 Python、机器学习、LangGraph 的问题和简单问候语visualize_graph()
: 可视化 LangGraph 工作流图,支持保存为本地图片或在 IPython 中显示
def run_research_assistant(question: str):"""运行研究助手参数:- question: 用户问题返回:- 处理结果"""print(f"\n{'='*50}")print(f"🤖 智能研究助手开始工作")print(f"{'='*50}")# 创建图应用app = create_research_graph()# 初始化状态initial_state = ResearchState(question=question,messages=[HumanMessage(content=question)],search_results=[],need_more_info=True,final_answer="")print(f"\n📥 收到用户问题:{question}")# 执行工作流result = app.invoke(initial_state)print(f"\n{'='*50}")print(f"📤 最终回答:")print(f"{'='*50}")print(result["final_answer"])return result# ===========================
# 第七步:演示和测试
# ===========================def demo_langgraph():"""演示 LangGraph 的各种功能"""print("🚀 LangGraph 框架演示开始")print("=" * 60)# 测试用例test_questions = ["什么是 Python?","机器学习的基本概念是什么?","你好","LangGraph 是什么?"]for i, question in enumerate(test_questions, 1):print(f"\n🔹 测试案例 {i}")result = run_research_assistant(question)print(f"\n📊 执行统计:")print(f" - 消息数量:{len(result['messages'])}")print(f" - 搜索结果:{len(result['search_results'])}")print(f" - 是否搜索:{result['need_more_info']}")# 添加分隔线if i < len(test_questions):print("\n" + "-" * 60)def visualize_graph():"""可视化 LangGraph 工作流图"""print(f"\n{'='*50}")print("🎨 正在生成 LangGraph 工作流可视化图...")print(f"{'='*50}")try:# 创建图应用用于可视化app = create_research_graph()# 首先尝试保存图片到本地(无论是否有IPython)saved_successfully = Falsetry:print("💾 保存 Mermaid 图片到本地...")png_bytes = app.get_graph().draw_mermaid_png()with open("langgraph_workflow_mermaid.png", "wb") as f:f.write(png_bytes)print("✅ Mermaid 图片已保存为 langgraph_workflow_mermaid.png")saved_successfully = Trueexcept Exception as e:print(f"⚠️ Mermaid 图片保存失败:{e}")try:print("💾 保存 Graphviz 图片到本地...")png_bytes = app.get_graph().draw_png()with open("langgraph_workflow_graphviz.png", "wb") as f:f.write(png_bytes)print("✅ Graphviz 图片已保存为 langgraph_workflow_graphviz.png")saved_successfully = Trueexcept Exception as e2:print(f"❌ Graphviz 图片保存也失败:{e2}")# 如果在IPython环境中,也显示图片if IPYTHON_AVAILABLE:try:print("🖼️ 在 IPython 中显示图片...")png_bytes = app.get_graph().draw_mermaid_png()display(Image(png_bytes))print("✅ IPython 显示成功")except Exception as e:print(f"⚠️ IPython 显示失败:{e}")try:png_bytes = app.get_graph().draw_png()display(Image(png_bytes))print("✅ IPython Graphviz 显示成功")except Exception as e2:print(f"❌ IPython 显示也失败:{e2}")if saved_successfully:print("✅ 图片已成功保存到本地!")else:print("❌ 图片保存失败,请安装相关依赖:")print(" pip install pygraphviz")print(" pip install graphviz")except Exception as e:print(f"❌ 图形可视化失败:{e}")print("💡 请确保已安装相关依赖:")print(" pip install pygraphviz")print(" pip install graphviz")print("✅ LangGraph 可视化完成")if __name__ == "__main__":"""主函数:运行 LangGraph 演示这个案例展示了 LangGraph 的核心特性:1. 状态管理:通过 TypedDict 定义清晰的状态结构2. 节点设计:每个节点专注于特定功能3. 边的连接:控制执行流程4. 条件路由:根据状态动态选择路径5. 工具集成:集成外部工具和服务"""print("""🎯 LangGraph 框架核心概念总结:1. State(状态):- 使用 TypedDict 定义- 在节点间传递信息- 支持类型检查和注解2. Node(节点):- 执行具体任务的函数- 接收并返回状态- 可以是任何Python函数3. Edge(边):- 连接节点的路径- 定义执行顺序- 支持条件判断4. Graph(图):- 组织节点和边- 编译成可执行应用- 支持复杂的工作流逻辑5. 优势:- 可视化工作流- 状态持久化- 错误处理和重试- 并行执行支持- 工具集成简单""")# 运行演示demo_langgraph()print("\n🎉 LangGraph 演示完成!")print("💡 您可以修改 test_questions 来测试更多场景。")# 添加图形可视化visualize_graph()
总结
该示例全面展示了 LangGraph 的核心特性:
- 状态管理:通过 TypedDict 定义清晰的状态结构
- 节点设计:每个节点专注于特定功能
- 边的连接:控制执行流程
- 条件路由:根据状态动态选择路径
- 工具集成:集成外部工具和服务
- 工作流可视化:支持生成流程图