当前位置: 首页 > news >正文

使用langgraph 构建RAG 智能问答代理

RAG 智能问答代理:

✅ 支持用户持续提问

✅ 根据模型判断是否需要查资料

✅ 自动调用 PDF 检索工具查找内容

✅ 自动引用内容回答

✅ 可以输入 exit / quit 退出

下载需要的library

pip install langchain-google-genai
pip install langgraph
pip install langchain-community
pip install langchain-chroma
pip install pypdf
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage, ToolMessage, SystemMessage, HumanMessage, AIMessage
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_core.tools import tool
from operator import add as add_messages
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_chroma import Chroma
import os

🔹 from typing import TypedDict, Annotated, Sequence
这些是 Python 的标准类型提示工具(来自 typing 模块):

TypedDict:允许你定义带类型的字典。例如,用于声明状态数据结构。

Annotated:用于在类型提示中附加额外的元信息。常用于 Pydantic 或运行时类型校验。

Sequence:表示有序的元素集合(比如 list 或 tuple),用于函数签名中明确参数是一个有序结构。

🔹 from langchain_core.messages import BaseMessage, ToolMessage, SystemMessage, HumanMessage, AIMessage
这些是 LangChain Core 中的消息类型,用于构建对话流。常见于聊天模型的输入/输出:

BaseMessage:所有消息的基类。

HumanMessage:用户输入的信息(人类说的话)。

AIMessage:AI 生成的回复。

SystemMessage:系统预设信息,例如系统提示或指令。

ToolMessage:来自外部工具的消息(比如函数调用返回的结果)。

这些类型可帮助聊天模型更准确地理解不同来源的消息。

🔹 from langchain_google_genai import ChatGoogleGenerativeAI
这是用于接入 Google Gemini 模型(以前称 PaLM)的 LangChain 接口:

ChatGoogleGenerativeAI:允许你使用 Google 的生成式 AI(例如 Gemini)进行聊天任务,支持多轮对话、函数调用、上下文管理等。

🔹 from langchain_google_genai import GoogleGenerativeAIEmbeddings
此类用于生成文本嵌入向量(Embeddings):

GoogleGenerativeAIEmbeddings:调用 Google Gemini 的嵌入接口,把文字转为向量(常用于检索、语义搜索、向量数据库等)。

🔹 from langchain_core.tools import tool
tool 是一个 装饰器,用于将一个普通函数转换为可供语言模型调用的工具(tool)。

常用于实现带函数调用能力的聊天模型(Function Calling 或 Tool Use)。

🔹 from operator import add as add_messages
Python 标准库的 operator.add 函数用于执行 a + b 操作。

这里使用了 as add_messages 给它起了别名,暗示这个函数可能会被用于合并两个消息列表

🔹 from langgraph.graph import StateGraph, START, END
这是 LangGraph 的核心模块,用于构建有状态的对话图(Stateful Computation Graph):

StateGraph:LangGraph 的状态图结构,用于构建可追踪的多步骤对话流程。

START, END:图中的特殊节点,表示开始和结束点。通常用于定义流程的入口和出口。

LangGraph 是 LangChain 的实验性扩展,可构建类似状态机的复杂应用,例如代理、多工具工作流。

🔹 from langgraph.prebuilt import ToolNode
ToolNode 是一个预构建的节点类,表示图中执行工具调用的节点。

它可自动处理工具输入/输出,在 LangGraph 中连接工具和对话状态。

🔹 from langchain_community.document_loaders import PyPDFLoader
PyPDFLoader 是一个文档加载器,用于从 PDF 文件中提取文本内容。

常用于文档问答(RAG)场景,比如加载报告、合同、书籍等。

🔹 from langchain.text_splitter import RecursiveCharacterTextSplitter
RecursiveCharacterTextSplitter 是一种文本切分器,用于将文档按段落或句子切成小块(chunks),便于索引和嵌入。

支持递归回退策略,根据标点符号逐级切分,保持语义连贯。

🔹 from langchain_chroma import Chroma
Chroma 是向量数据库的接口,支持将嵌入存入本地或远程数据库,并进行语义搜索。

它用于构建带记忆或文档检索能力的聊天系统。

llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash", google_api_key=GOOGLE_API_KEY, temperature=0)

创建一个 低温度设定的 Gemini 2.5 Flash 模型客户端,用于多轮对话、函数调用(Tool Use)、文档问答等任务。
temperature 控制模型生成的 随机性/创造性。
模型在生成时将非常稳定、可预测、偏向选择概率最高的输出。非常适合用于确定性应用,如问答系统、数据处理、文档提取等。

embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001",google_api_key=GOOGLE_API_KEY
)

创建一个 embeddings 实例,它使用 Google 的 Gemini 嵌入模型(embedding-001)将文本转化为高维向量,用于语义检索、向量数据库存储或文本相似度比较等下游任务。

pdf_path = "Stock_Market_Performance_2024.pdf"if not os.path.exists(pdf_path):raise FileNotFoundError(f"PDF file not found: {pdf_path}")

os.path.exists(pdf_path) 会检查这个路径下是否存在对应的文件或文件夹。
raise FileNotFoundError(…) 是 Python 的一种错误抛出机制。
当找不到文件时,主动抛出 FileNotFoundError 异常。

pdf_loader = PyPDFLoader(pdf_path)  # This loads the PDF# Checks if the PDF is there
try:pages = pdf_loader.load()print(f"PDF has been loaded and has {len(pages)} pages")
except Exception as e:print(f"Error loading PDF: {e}")raise

PyPDFLoader(pdf_path) 是 LangChain 提供的一个 PDF 文档加载器类。
它会读取 PDF 文件,为后续的问答系统或嵌入向量处理做准备。
但注意:这行代码只是初始化了一个加载器对象,并没有真正加载内容。真正加载发生在 .load() 方法中。

.load() 方法才是真正执行 读取 PDF 文件内容 的操作。
它会返回一个按页拆分的文本内容列表(List[Document]),每一页是一个 LangChain Document 对象,包含内容和元信息(如页码)。

raise:再次抛出这个异常,终止程序运行(防止之后的代码继续运行在错误状态下)。

创建一个「递归字符级文本切分器」,把文档切成一段段长度不超过 1000 字符的 小块文本(chunk),且相邻的文本块之间有 200 字符的重叠(overlap)。

# Chunking Process
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000,chunk_overlap=200
)

相邻两个 chunk 之间会有 200 个字符的重叠内容。
目的是:保持语义连续性,避免重要信息被截断在边界上。

把 PDF 文档内容切分成小段(chunks),转化为向量,然后存入本地的 Chroma 向量数据库,用于后续的语义搜索或问答(RAG)应用。

pages_split = text_splitter.split_documents(pages)  # We now apply this to our pagespersist_directory = r"vec_db"
collection_name = "stock_market"# If our collection does not exist in the directory, we create using the os command
if not os.path.exists(persist_directory):os.makedirs(persist_directory)try:# Here, we actually create the chroma database using our embeddigns modelvectorstore = Chroma.from_documents(documents=pages_split,embedding=embeddings,persist_directory=persist_directory,collection_name=collection_name)print(f"Created ChromaDB vector store!")except Exception as e:print(f"Error setting up ChromaDB: {str(e)}")raise

pages 是前面通过 PyPDFLoader().load() 得到的文档页(每页是一个 Document 对象)。

text_splitter.split_documents(pages) 会对每页文本进行切分(chunking),生成多个小段文本,每段仍然是一个 Document 对象。

切分规则由 text_splitter 定义(如:1000 字,重叠 200 字)。

检查本地磁盘上是否已经存在名为 “vec_db” 的文件夹。

如果不存在,则使用 os.makedirs() 创建它,防止写入数据库时报错。

documents=pages_split:要写入向量数据库的 Document 列表(每个含文本和元数据)。

embedding=embeddings:使用哪个嵌入模型把文本转成向量。你前面定义的是 Google Gemini 的嵌入模型。

persist_directory=persist_directory:Chroma 向量数据库在磁盘上保存的文件夹路径。

collection_name=collection_name:给这组向量一个逻辑名称(一个“集合”)。

从 Chroma 向量数据库中创建一个 Retriever(检索器),用于后续问答(RAG)或聊天任务中从文档中“找出最相关的内容”。

# Now we create our retriever
retriever = vectorstore.as_retriever(search_type="similarity",search_kwargs={"k": 5}  # K is the amount of chunks to return
)

创建一个“文档检索器(Retriever)”,每次输入一个查询问题时,它会返回与问题最相关的 5 个文档块(chunk),基于语义相似度。
vectorstore 是你刚刚用 Chroma 建好的本地向量数据库,包含了你 PDF 文档的所有嵌入向量。
.as_retriever() 是 LangChain 的方法,它会将向量数据库对象封装为一个可检索的接口对象。
指定检索方式为 “similarity”,即使用向量相似度搜索。
k=5 表示每次检索返回 最相关的前 5 个文档块(chunk)。

将你之前创建的 retriever 封装成一个可以被 LLM 代理(如 Gemini 或 LangChain Agent)调用的工具。工具的作用是:从 PDF 文档中查找问题相关的段落内容,并返回结果给语言模型。

@tool
def retriever_tool(query: str) -> str:"""This tool searches and returns the information from the Stock Market Performance 2024 document."""docs = retriever.invoke(query)if not docs:return "I found no relevant information in the Stock Market Performance 2024 document."results = []for i, doc in enumerate(docs):results.append(f"Document {i+1}:\n{doc.page_content}")return "\n\n".join(results)

创建了一个名为 retriever_tool 的函数工具,用于在向量数据库中检索有关用户查询的问题的文档段落(chunks),然后格式化成文本返回。

定义一个名为 retriever_tool 的函数

它接收一个字符串 query,表示用户的查询问题

返回一个字符串(即结果文本)

函数的文档注释(docstring)
这段文字会作为 工具的说明文档,提供给代理模型了解工具用途

retriever.invoke(query) 会返回与你的问题语义最相近的文档段落列表(通常是 5 个)

如果找不到相关段落(即检索结果为空),直接返回一段提示信息。
这样可以避免模型误解为“有内容但没展示”。

将所有结果用两个换行符拼接成一个字符串

返回给 LLM,作为检索器的最终输出

将你定义的外部工具(retriever_tool)绑定到语言模型(llm)上,让它具有调用工具的能力。

tools = [retriever_tool]llm = llm.bind_tools(tools)

创建一个名为 tools 的列表,里面包含一个你自定义的工具函数 retriever_tool。

retriever_tool 是你前面定义过并用 @tool 装饰过的检索工具,它可以从 PDF 文档中找出与用户问题相关的段落。

把你定义的工具 tools 绑定到语言模型对象 llm 上,使得它在生成回答时可以调用这些工具。

这使得模型可以在需要外部知识时 自动调用 retriever_tool 来辅助回答。

llm 是你使用的 ChatGoogleGenerativeAI(…)(Gemini 模型)对象。

.bind_tools(tools) 返回的是一个新模型对象,这个对象的能力比原始 llm 更强:

它能识别工具说明(docstring)

它知道哪些工具可以使用、怎么使用(参数是什么)

它会在遇到问题无法回答时自动决定是否调用工具

你可以把这个看作是“增强模型”:它除了聊天之外,还能主动调用外部函数。

构建一个LangGraph 智能代理(Agent)系统中的状态管理与控制逻辑,这是 LangChain 中创建多步骤对话流程(如工具调用、LLM响应、条件跳转等)时的关键组成部分。

class AgentState(TypedDict):messages: Annotated[Sequence[BaseMessage], add_messages]def should_continue(state: AgentState):"""Check if the last message contains tool calls."""result = state['messages'][-1]return hasattr(result, 'tool_calls') and len(result.tool_calls) > 0

AgentState 是什么?
这是一个使用 TypedDict 定义的 自定义状态类型,用来在 LangGraph 工作流中传递和记录会话状态。

messages 是 AgentState 中的唯一字段,表示当前会话的消息历史。

它是一个 消息对象的列表(Sequence[BaseMessage]),每个对象可以是:

HumanMessage:用户输入

AIMessage:模型响应

ToolMessage:工具返回结果

SystemMessage:系统指令

LangChain 中统一继承自 BaseMessage。

这部分是 LangGraph 的特有写法,表示:

这个 messages 字段在每一步节点运行时应该追加消息(而不是替换),由 add_messages 函数控制。

🧠 作用:
LangGraph 会根据 add_messages 逻辑自动将新的消息(如 LLM 的回答、工具调用结果)加入到 messages 这个状态列表中,实现完整会话追踪。

should_continue() 是做什么的?
它是一个用于流程控制(Conditional Edge)的函数,告诉 LangGraph:

“根据当前的 AgentState,我该不该继续调用工具(tool)?”

获取当前状态中最后一条消息(即刚生成的 LLM 响应)

判断:

最后一条消息是否有 tool_calls 属性(即模型有没有提出要调用某个工具)

并且这些工具调用的列表不为空

如果满足这两个条件,就返回 True,表示应该“继续”,即跳转到下一步工具调用节点。

否则就返回 False,表示无需再调用工具,可以终止或输出结果。

什么是 system prompt?

在大多数对话式 AI 系统中(包括 GPT、Gemini、Claude 等),system prompt 是用来定义模型角色与行为规则的提示语。

它在每次对话开始时就被送入模型,作为“隐藏的第一句话”,影响模型后续所有行为。

在 LangChain 或 LangGraph 中,常通过 SystemMessage(content=system_prompt) 明确注入。

system_prompt = """
You are an intelligent AI assistant who answers questions about Stock Market Performance in 2024 based on the PDF document loaded into your knowledge base.
Use the retriever tool available to answer questions about the stock market performance data. You can make multiple calls if needed.
If you need to look up some information before asking a follow up question, you are allowed to do that!
Please always cite the specific parts of the documents you use in your answers.
"""

“你现在是一个股市问答专家,但不能靠记忆胡说八道。你只能通过一个叫 retriever_tool 的工具从我们给你的 PDF 文档中查资料,才能回答问题。你可以多次查资料,而且请在回答时引用文档的内容。”

构建一个具备“工具调用能力”的语言模型代理(LLM Agent)的关键组件,用于在 LangGraph 中调用语言模型并处理对话状态。

tools_dict = {our_tool.name: our_tool for our_tool in tools}  # Creating a dictionary of our tools# LLM Agent
def call_llm(state: AgentState) -> AgentState:"""Function to call the LLM with the current state."""messages = list(state['messages'])messages = [SystemMessage(content=system_prompt)] + messagesmessage = llm.invoke(messages)return {'messages': [message]}

创建一个以工具名称为 key、工具本身为 value 的字典,用于在后续流程中快速根据名称定位工具。

定义了一个 LangGraph 中的 节点函数(node),在流程图运行时,它会根据当前 state 调用语言模型并返回新结果。

从当前的对话状态 AgentState 中提取消息列表

state[‘messages’] 是之前记录的用户提问 / 工具回复 / 模型回答等内容

把你之前定义的 system_prompt 加到消息列表的最前面

这一步至关重要:它确保每次模型调用都记住“我是谁、我该怎么答、要用 retriever_tool 查资料”

把你之前定义的 system_prompt 加到消息列表的最前面

这一步至关重要:它确保每次模型调用都记住“我是谁、我该怎么答、要用 retriever_tool 查资料”

把刚才模型生成的新 AIMessage 包装成一个新的 AgentState 字典

LangGraph 中,所有节点函数都要返回类似这种结构(新状态),以便下一步继续推进

🧠 配合你之前的 add_messages 注释,这个新消息会自动合并进原有消息列表中。

LangGraph 智能代理流程中的一个核心步骤:执行工具调用的动作节点。

# Retriever Agent
def take_action(state: AgentState) -> AgentState:"""Execute tool calls from the LLM's response."""tool_calls = state['messages'][-1].tool_calls# 取出对话历史中的最后一条消息(即模型刚刚生成的 AIMessage)
.	# tool_calls 是该消息中模型请求调用的工具清单results = []for t in tool_calls:# 可能有多个工具请求(例如一个问句查两段内容),所以逐一处理# t 表示一个单独的工具调用print(f"Calling Tool: {t['name']} with query: {t['args'].get('query', 'No query provided')}")# 实时打印工具名称和参数(便于调试)if not t['name'] in tools_dict:  # Checks if a valid tool is presentprint(f"\nTool: {t['name']} does not exist.")result = "Incorrect Tool Name, Please Retry and Select tool from List of Available tools."# 如果模型给出的工具名不合法(拼错或没注册),返回错误提示# 避免运行未知或恶意工具名else:result = tools_dict[t['name']].invoke(t['args'].get('query', ''))print(f"Result length: {len(str(result))}")# 从 tools_dict 里取出对应的工具函数(如 retriever_tool)# 调用 .invoke() 方法执行工具(工具的实际逻辑如从 PDF 中查找内容)# 获取结果(通常是字符串),打印结果长度# Appends the Tool Messageresults.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))# LangChain 使用 ToolMessage 来表示“工具的返回结果”# 它需要:# tool_call_id: 和模型请求时的 ID 对应(用于追踪)# name: 工具名# content: 工具运行的结果(必须是字符串)print("Tools Execution Complete. Back to the model!")return {'messages': results}

这个函数叫 take_action,它的作用是:

从上一步模型生成的响应中提取所有工具调用(tool_calls),逐个执行对应工具函数,并把结果封装成 ToolMessage 发送回模型。

用户提问 → LLM 判断是否需要工具 → 如果需要 → 调用工具 → 再回到 LLM → 最终回答。

graph = StateGraph(AgentState)
graph.add_node("llm", call_llm)
graph.add_node("retriever_agent", take_action)graph.add_conditional_edges("llm",should_continue,{True: "retriever_agent", False: END}
)
graph.add_edge("retriever_agent", "llm")
graph.set_entry_point("llm")
rag_agent = graph.compile()

创建一个新的 LangGraph 对话流程图,名为 graph
注册两个节点函数(处理流程的步骤):

“llm”:使用语言模型(如 Gemini 2.5)生成回复或提出工具调用请求 → 对应函数 call_llm

“retriever_agent”:真正执行工具调用 → 对应函数 take_action

执行完 “llm” 节点后

使用 should_continue() 判断模型返回是否包含工具调用(tool_calls)

如果是:

跳转到 “retriever_agent” 节点 → 实际调用工具

如果否:

直接终止(END)→ 模型回答完毕

一旦工具调用完毕,工具节点 retriever_agent 会输出 ToolMessage 到状态中

然后再次执行 “llm” 节点,让模型根据工具返回结果继续回答或再发起下一轮调用

告诉 LangGraph:流程从 “llm” 节点开始(即用户提问后,先调用语言模型)

     ┌────────────┐│   llm      │ ◄─────────────┐└────┬───────┘               ││                       │should_continue()              ││         │                 │Yes       No                 │↓         ↓                ▲
┌────────────┐  └───────▶ END   │
│retriever_agent│               │
└──────┬───────┘                ││                        │└────────▶───────────────┘

RAG 智能问答代理的主程序入口,以命令行(终端)方式运行,实现一个可以连续提问、基于 PDF 自动查资料、调用工具回答的智能问答助手。

print("\n=== RAG AGENT===")while True:user_input = input("\nWhat is your question: ")if user_input.lower() in ['exit', 'quit']:breakmessages = [HumanMessage(content=user_input)]  # converts back to a HumanMessage typeresult = rag_agent.invoke({"messages": messages})print("\n=== ANSWER ===")print(result['messages'][-1].content)

进入一个无限循环,等待用户在命令行输入问题

input() 是 Python 的标准输入函数

每次输入的内容都会被作为提问交给智能代理

如果用户输入了 exit 或 quit(不区分大小写),就退出循环,结束程序

是一种常见的 CLI 退出机制

将用户输入内容包装成 HumanMessage 对象

这是 LangChain 中对话格式的标准写法,便于与模型/工具交互

messages 是一个 list,因为智能代理是多轮对话系统(可以支持多条消息历史)

你在前面已经构建并 compile() 出了一个图形化代理:rag_agent

.invoke() 方法会触发整个 LangGraph 流程,包括:

调用语言模型(llm 节点)

判断是否需要工具(should_continue)

如果需要,调用工具(retriever_agent 节点)

工具返回结果后,再次让模型生成最终回答

输出 LangGraph 最终执行结果中的最后一条消息(通常是 AI 的回答)

result[‘messages’] 是整个状态流中的历史消息列表

[-1] 表示最后一条,通常是模型的回答 AIMessage

User input (What happened in Q2 2024?)[HumanMessage(content=user_input)]↓
rag_agent.invoke(...)  → 调用 LangGraph 图↓
[llm](需要工具?)[retriever_agent][llm]↓
模型综合结果回答↓
输出 ANSWER
=== RAG AGENT===What is your question: how was the SMP500 performing in 2024?
Calling Tool: retriever_tool with query: S&P 500 performance in 2024
Result length: 4385
Tools Execution Complete. Back to the model!=== ANSWER ===
The S&P 500 index had a remarkably strong performance in 2024, delivering roughly a 25% total return (around +23% in price terms). This marked the second consecutive year of over 20% returns for the S&P 500, a feat not seen since the late 1990s. The gains were disproportionately driven by mega-cap technology stocks, particularly the "Magnificent 7" companies, which accounted for over half (about 54%) of the S&P 500's total return for the year.(Source: Document 1, Document 4, Document 5)What is your question: How did OpenAI perform in 2024?
Calling Tool: retriever_tool with query: OpenAI performance 2024
Result length: 4078
Tools Execution Complete. Back to the model!=== ANSWER ===
I am sorry, but I cannot provide information on OpenAI's performance in 2024. The provided documents discuss the overall U.S. stock market performance, highlighting the strong performance of mega-cap technology stocks and companies benefiting from AI adoption, such as Nvidia, Netflix, and Alphabet (Google). However, OpenAI is not mentioned in the provided text.What is your question: exit
http://www.dtcms.com/a/275741.html

相关文章:

  • Kotlin文件
  • 【GESP】C++ 2025年6月一级考试-客观题真题解析
  • 小学家长和老师最喜欢的出题神器!
  • 大模型量化相关
  • 二分法寻找无序序列的峰值
  • 【Scratch】从入门到放弃(五):指令大全-运算、变量、自制积木
  • 第14次课 认识图 A
  • 一分钟快速了解Apache
  • 阿里开源AI大模型ThinkSound如何为视频配上灵魂之声
  • 分层架构的C++高并发内存池性能优化
  • 【PTA数据结构 | C语言版】出栈序列的合法性
  • Paimon Lookup 哈希文件和Sort文件选择
  • 粒子滤波|粒子滤波的相关算法理论介绍
  • el-tree 懒加载 loadNode
  • Vue》》总结
  • Flutter、React Native、Uni-App 的比较与分析
  • Redis分布式锁面试笔记
  • wedo智能车库-----第31节(免费分享图纸)
  • 【离线数仓项目】——数据模型开发实战
  • Kafka——聊聊Kafka的版本号
  • 前后端分离项目的完整部署(Jenkins自动化部署)
  • ScreenToGif开源免费GIF录制制作工具,一键生成编辑GIF文件,自用多年
  • 【嵌入式】51单片机学习笔记-Keil5软件安装教程
  • Qt6中出现 OpenCV(4.10.0) Error: Assertion failed
  • 软件开发模型
  • UV的使用总结
  • Git企业级开发(多人协作)
  • 从万亿参数到「会动手」:Kimi-K2 如何重新定义开源大模型的边界
  • Linux/Ubuntu安装go
  • 【Linux网络】IP 协议详解:结构、地址与交付机制全面解析