当前位置: 首页 > news >正文

【NLP】使用 LangGraph 构建 RAG 的Research Multi-Agent

本文中,我们介绍了一个使用LangGraph开发的RAG的Research Multi-Agent工具的实际项目。该工具旨在解决需要多个来源和迭代步骤才能得出最终答案的复杂问题。它使用混合搜索和rerank步骤来检索文档,还结合了自我纠正机制,包括幻觉检查过程,以提高响应可靠性,使其成为企业应用程序的理想选择。

一、简介 — Naive 与 Agentic RAG

对于真实的项目的而言,简单的 RAG 方法是不够的,原因如下:

  • 无法理解复杂的查询:无法将复杂的查询分解为多个可管理的子步骤,无法在单一级别处理查询,而不是分析每个步骤并得出统一的结论。
  • 缺乏幻觉或错误处理:简单的 RAG 管道缺乏响应验证步骤和处理幻觉的机制,从而阻止它们通过生成新的响应来纠正错误。
  • 缺乏动态工具使用:Naive RAG 系统不允许使用工具、调用外部 API 或根据工作流条件与数据库交互。

 因此,我们实施了一个多智能体 RAG 系统来解决所有这些问题。事实上,基于智能体的框架可以实现以下功能:

  • 路由和使用工具:路由代理可以对用户的查询进行分类,并将流程引导至适当的节点或工具。这可以实现基于上下文的决策,例如确定文档是否需要完整摘要、是否需要更详细的信息,或者问题是否超出范围。
  • 规划子步骤:复杂的查询通常需要分解成更小、更易于管理的步骤。从查询开始,可以生成一系列步骤,以便在探索查询的不同方面得出结论。例如,如果查询需要比较文档的两个不同部分,基于代理的方法可以识别这种比较需求,分别检索两个源,并将它们合并到最终响应的比较分析中。
  • 反射与纠错:除了简单的响应生成之外,基于代理的方法还可以添加验证步骤,以解决潜在的幻觉、错误或无法准确回答用户查询的响应。这还可以将自我纠正机制与人机交互相结合,将人工输入融入自动化流程。这些功能使基于代理的 RAG 系统成为企业应用更稳健、更可靠的解决方案,因为可靠性是企业应用的首要任务。
  • 共享全局状态:代理 工作流共享全局状态,简化跨多个步骤的状态管理。此共享状态对于维护多代理流程不同阶段的一致性至关重要。

二、项目概述

代理 RAG 图

图表步骤:

  1. 分析和路由查询(自适应 RAG):用户的查询被分类并路由到适当的节点。从那里,系统可以进入下一步(“研究计划生成”),向用户请求更多信息,或者在查询超出范围时立即响应
  2. 研究计划生成:系统会根据请求的复杂程度,生成一个分步研究计划,其中包含一个或多个步骤。然后,系统会返回解决用户问题所需的具体步骤列表。
  3. 研究子图:研究计划生成中定义的每个步骤都会调用一个子图。具体来说,子图首先通过 LLM 生成两个查询。接下来,系统使用集成检索器(使用相似性搜索BM25MMR)检索与这些生成的查询相关的文档。然后,在“重新排序”步骤中应用基于 Cohere 的上下文压缩,最终得出所有步骤中排名前k的相关文档及其相关分数。
  4. 生成步骤:根据相关文档,该工具通过 LLM 生成答案。
  5. 幻觉检查(基于人机交互的自校正 RAG):系统会进行反思步骤,分析生成的答案,以确定其是否与提供的上下文相符,并涵盖所有方面。如果检查失败,则图表工作流程将中断,并提示用户生成修改后的答案或结束流程。

对于向量存储的创建,使用DoclingLangChain实现了基于段落的分块方法,并使用ChromaDB构建了向量数据库。

 

构建向量数据库

文档解析

对于结构复杂的 PDF(包括布局复杂的表格),必须谨慎选择用于解析的工具。许多库在处理页面布局或表格结构复杂的 PDF 时缺乏精确度。

为了解决这个问题,我们使用了开源库Docling 。它能够直接高效地解析文档,并支持导出为所需格式。它可以读取各种常用文档格式,包括 PDF、DOCX、PPTX、XLSX、图像、HTML、AsciiDoc 和 Markdown,并将其导出为 Markdown 和 JSON 格式。Docling 能够全面理解 PDF 文档,包括表格结构、阅读顺序和页面布局。此外,它还支持对扫描 PDF 进行 OCR 识别。

然后将 PDF 中包含的文本转换为 Markdown 格式,这对于遵循基于段落的结构的分块是必要的。

from docling.document_converter import DocumentConverterlogger.info("Starting document processing.")
converter = DocumentConverter()
markdown_document = converter.convert(source).document.export_to_markdown()

提取的文本将具有类似于下图的结构。可以看出,PDF 和表格解析提取的文本保留了原始格式。

根据标题和使用MarkdownHeaderTextSplitter,输出文本随后被分成几块,得到 332 个Document对象的列表(LangChain 文档)。

from langchain_text_splitters import MarkdownHeaderTextSplitterheaders_to_split_on = [("#", "Header 1"),("##", "Header 2")
]markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on)
docs_list = markdown_splitter.split_text(markdown_document)
docs_list
# Output example
[Document(metadata={'Header 2': 'A letter from our Chief Sustainability Officer and our Senior Vice President of Learning and Sustainability'}, page_content="...."),
...]# len(docs_list):
332

向量数据库构建

我们构建一个向量数据库,将句子存储为向量嵌入,并在该数据库中进行搜索。在本例中,我们使用Chroma,并在本地目录“ db_vector”中存储一个持久数据库。

from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddingsembd = OpenAIEmbeddings()vectorstore_from_documents = Chroma.from_documents(documents=docs_list,collection_name="rag-chroma-google-v1",embedding=embd,persist_directory='db_vector'
)

基于LangGraph主图构建

实施的系统包括两个图表:

  • 研究人员图表作为子图,其任务是生成不同的查询,用于从向量数据库中检索和重新排序前 k 个文档。
  • 主图,包含主要工作流程,例如分析用户的查询、生成完成任务所需的步骤、产生响应以及使用人在环机制检查幻觉。

 LangGraph 图形预览

LangGraph的核心概念之一是状态。每次图执行都会创建一个状态,该状态在图中节点执行时传递,每个节点执行后都会用其返回值更新此内部状态。

让我们从构建图形状态开始这个项目。为了实现这一点,我们定义两个类:

  • Router: 包含将用户查询分类为以下类别之一的结果:“更多信息”,“环境”或“一般”。
  • GradeHallucination:包含一个二进制分数,表示反应中是否存在幻觉。
from pydantic import BaseModel, Fieldclass Router(TypedDict):"""Classify user query."""logic: strtype: Literal["more-info", "environmental", "general"]from pydantic import BaseModel, Fieldclass GradeHallucinations(BaseModel):"""Binary score for hallucination present in generation answer."""binary_score: str = Field(description="Answer is grounded in the facts, '1' or '0'")

定义的图形状态为:
InputState:包括用户和代理之间交换的消息列表。
AgentState:包含Router对用户查询的分类、研究计划中要执行的步骤列表、代理可以引用的检索文档列表以及二元评分Gradehuciation。

from dataclasses import dataclass, field
from typing import Annotated, Literal, TypedDict
from langchain_core.documents import Document
from langchain_core.messages import AnyMessage
from langgraph.graph import add_messages
from utils.utils import reduce_docs@dataclass(kw_only=True)
class InputState:"""Represents the input state for the agent.This class defines the structure of the input state, which includesthe messages exchanged between the user and the agent. It serves asa restricted version of the full State, providing a narrower interfaceto the outside world compared to what is maintained iternally."""messages: Annotated[list[AnyMessage], add_messages]"""Messages track the primary execution state of the agent.Typically accumulates a pattern of Human/AI/Human/AI messages.Returns:A new list of messages with the messages from `right` merged into `left`.If a message in `right` has the same ID as a message in `left`, themessage from `right` will replace the message from `left`."""# Primary agent state
@dataclass(kw_only=True)
class AgentState(InputState):"""State of the retrieval graph / agent."""router: Router = field(default_factory=lambda: Router(type="general", logic=""))"""The router's classification of the user's query."""steps: list[str] = field(default_factory=list)"""A list of steps in the research plan."""documents: Annotated[list[Document], reduce_docs] = field(default_factory=list)"""Populated by the retriever. This is a list of documents that the agent can reference."""hallucination: GradeHallucinations = field(default_factory=lambda: GradeHallucinations(binary_score="0"))

步骤1:分析和路线查询


函数analyze_and_route_query返回并更新状态AgentState的路由器变量。函数route_query根据之前的查询分类确定下一步
具体来说,此步骤使用Router对象更新状态,该Router对象的类型变量包含以下值之一:“更多信息”、“环境”或“常规”。根据此信息,工作流将被路由到适当的节点(“create_research_plan”、“ask_for_more_info”或“respond_to_general_query”之一)。

async def analyze_and_route_query(state: AgentState, *, config: RunnableConfig
) -> dict[str, Router]:"""Analyze the user's query and determine the appropriate routing.This function uses a language model to classify the user's query and decide how to route itwithin the conversation flow.Args:state (AgentState): The current state of the agent, including conversation history.config (RunnableConfig): Configuration with the model used for query analysis.Returns:dict[str, Router]: A dictionary containing the 'router' key with the classification result (classification type and logic)."""model = ChatOpenAI(model=GPT_4o, temperature=TEMPERATURE, streaming=True)messages = [{"role": "system", "content": ROUTER_SYSTEM_PROMPT}] + state.messageslogging.info("---ANALYZE AND ROUTE QUERY---")response = cast(Router, await model.with_structured_output(Router).ainvoke(messages))return {"router": response}def route_query(state: AgentState,
) -> Literal["create_research_plan", "ask_for_more_info", "respond_to_general_query"]:"""Determine the next step based on the query classification.Args:state (AgentState): The current state of the agent, including the router's classification.Returns:Literal["create_research_plan", "ask_for_more_info", "respond_to_general_query"]: The next step to take.Raises:ValueError: If an unknown router type is encountered."""_type = state.router["type"]if _type == "environmental":return "create_research_plan"elif _type == "more-info":return "ask_for_more_info"elif _type == "general":return "respond_to_general_query"else:raise ValueError(f"Unknown router type {_type}")

 问题的输出示例:“Retrieve the data center PUE efficiency value in Dublin in 2019”

{"logic":"This is a specific question about the environmental efficiency of a data center in Dublin in 2019, which relates to the Environmental Report.","type":"environmental"
}

步骤1.1超出范围/需要更多信息


然后,我们定义了函数ask_for_more_info和respond_to_general_query,它们通过调用LLM直接为用户生成响应:如果路由器确定需要用户提供更多信息,则将执行第一个函数,而第二个函数则生成对与我们的主题无关的一般查询的响应。在这种情况下,有必要将生成的响应与消息列表连接起来,更新状态中的消息变量。

async def ask_for_more_info(state: AgentState, *, config: RunnableConfig
) -> dict[str, list[BaseMessage]]:"""Generate a response asking the user for more information.This node is called when the router determines that more information is needed from the user.Args:state (AgentState): The current state of the agent, including conversation history and router logic.config (RunnableConfig): Configuration with the model used to respond.Returns:dict[str, list[str]]: A dictionary with a 'messages' key containing the generated response."""model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True)system_prompt = MORE_INFO_SYSTEM_PROMPT.format(logic=state.router["logic"])messages = [{"role": "system", "content": system_prompt}] + state.messagesresponse = await model.ainvoke(messages)return {"messages": [response]}async def respond_to_general_query(state: AgentState, *, config: RunnableConfig
) -> dict[str, list[BaseMessage]]:"""Generate a response to a general query not related to environmental.This node is called when the router classifies the query as a general question.Args:state (AgentState): The current state of the agent, including conversation history and router logic.config (RunnableConfig): Configuration with the model used to respond.Returns:dict[str, list[str]]: A dictionary with a 'messages' key containing the generated response."""model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True)system_prompt = GENERAL_SYSTEM_PROMPT.format(logic=state.router["logic"])logging.info("---RESPONSE GENERATION---")messages = [{"role": "system", "content": system_prompt}] + state.messagesresponse = await model.ainvoke(messages)return {"messages": [response]}

输出示例:“What’s the weather like in Altamura?”

{"logic":"What's the weather like in Altamura?","type":"general"
}
# ---RESPONSE GENERATION---
"I appreciate your question, but I'm unable to provide information about the weather. My focus is on Environmental Reports. If you have any questions related to that topic, please let me know, and I'll be happy to help!"

步骤2:制定research 计划


如果查询分类返回值“environmental”,则用户的请求在文档的范围内,工作流将到达create_research_plan节点,该节点的功能为回答与环境相关的查询创建一个逐步的研究计划。

async def create_research_plan(state: AgentState, *, config: RunnableConfig
) -> dict[str, list[str] | str]:"""Create a step-by-step research plan for answering a environmental-related query.Args:state (AgentState): The current state of the agent, including conversation history.config (RunnableConfig): Configuration with the model used to generate the plan.Returns:dict[str, list[str]]: A dictionary with a 'steps' key containing the list of research steps."""class Plan(TypedDict):"""Generate research plan."""steps: list[str]model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True)messages = [{"role": "system", "content": RESEARCH_PLAN_SYSTEM_PROMPT}] + state.messageslogging.info("---PLAN GENERATION---")response = cast(Plan, await model.with_structured_output(Plan).ainvoke(messages))return {"steps": response["steps"], "documents": "delete"}

Output example to the question “Retrieve the data center PUE efficiency value in Dublin in 2019”:

{"steps":["Look up the PUE (Power Usage Effectiveness) efficiency value for data centers specifically in Dublin for the year 2019 using statistical data sources."]
}

 在这种情况下,用户的请求只需要一步即可检索信息。


步骤3:进行research


此功能从研究计划中迈出第一步,并使用它进行研究。对于研究,该函数调用子图researcher_graph,它返回一个块列表,我们将在下一节中探索。最后,我们通过删除刚刚执行的步骤来更新状态中的steps变量。

async def conduct_research(state: AgentState) -> dict[str, Any]:"""Execute the first step of the research plan.This function takes the first step from the research plan and uses it to conduct research.Args:state (AgentState): The current state of the agent, including the research plan steps.Returns:dict[str, list[str]]: A dictionary with 'documents' containing the research results and'steps' containing the remaining research steps.Behavior:- Invokes the researcher_graph with the first step of the research plan.- Updates the state with the retrieved documents and removes the completed step."""result = await researcher_graph.ainvoke({"question": state.steps[0]}) #graph call directlydocs = result["documents"]step = state.steps[0]logging.info(f"\n{len(docs)} documents retrieved in total for the step: {step}.")return {"documents": result["documents"], "steps": state.steps[1:]}

步骤4:Researcher子图构建

 如上图所示,该图由查询生成步骤和相关块的检索步骤组成,查询生成步骤从主图传递的步骤开始。正如我们对主图所做的那样,让我们继续定义状态QueryState(研究员图中retrieve_documents节点的私有状态)和ResearcherState(researcher图的状态)。

"""States for the researcher subgraph.This module defines the state structures used in the researcher subgraph.
"""from dataclasses import dataclass, field
from typing import Annotated
from langchain_core.documents import Document
from utils.utils import reduce_docs@dataclass(kw_only=True)
class QueryState:"""Private state for the retrieve_documents node in the researcher graph."""query: str@dataclass(kw_only=True)
class ResearcherState:"""State of the researcher graph / agent."""question: str"""A step in the research plan generated by the retriever agent."""queries: list[str] = field(default_factory=list)"""A list of search queries based on the question that the researcher generates."""documents: Annotated[list[Document], reduce_docs] = field(default_factory=list)"""Populated by the retriever. This is a list of documents that the agent can reference."""

步骤4.1:生成查询


此步骤基于问题生成搜索查询(research计划中的一个步骤)。此函数使用LLM生成各种搜索查询来帮助回答问题。

async def generate_queries(state: ResearcherState, *, config: RunnableConfig
) -> dict[str, list[str]]:"""Generate search queries based on the question (a step in the research plan).This function uses a language model to generate diverse search queries to help answer the question.Args:state (ResearcherState): The current state of the researcher, including the user's question.config (RunnableConfig): Configuration with the model used to generate queries.Returns:dict[str, list[str]]: A dictionary with a 'queries' key containing the list of generated search queries."""class Response(TypedDict):queries: list[str]logger.info("---GENERATE QUERIES---")model = ChatOpenAI(model="gpt-4o-mini-2024-07-18", temperature=0)messages = [{"role": "system", "content": GENERATE_QUERIES_SYSTEM_PROMPT},{"role": "human", "content": state.question},]response = cast(Response, await model.with_structured_output(Response).ainvoke(messages))queries = response["queries"]queries.append(state.question)logger.info(f"Queries: {queries}")return {"queries": response["queries"]}

Output example to the question “Retrieve the data center PUE efficiency value in Dublin in 2019”:

{"queries":["Look up the PUE (Power Usage Effectiveness) efficiency value for data centers specifically in Dublin for the year 2019 using statistical data sources.""PUE efficiency value data centers Dublin 2019","Power Usage Effectiveness statistics data centers Dublin 2019"]
}

生成查询后,我们可以使用前面定义的持久数据库定义向量库。

def _setup_vectorstore() -> Chroma:"""Set up and return the Chroma vector store instance."""embeddings = OpenAIEmbeddings()return Chroma(collection_name=VECTORSTORE_COLLECTION,embedding_function=embeddings,persist_directory=VECTORSTORE_DIRECTORY)

在RAG系统中,最关键的部分是文档检索过程。因此,人们对所使用的技术给予了极大的关注:具体来说,选择了一个集合检索器作为混合搜索和Coherer进行重新排序。

混合搜索是“关键字风格”搜索和“矢量风格”搜索的组合。它具有进行关键字搜索的优点,以及从嵌入和向量搜索中获得的语义搜索的优点。Ensemble Retriever是一种检索算法,旨在通过结合多个单独检索器的优势来提高信息检索的性能。这种方法被称为“集成检索”,它使用一种称为往复式秩融合的方法来重新排序和合并来自不同检索器的结果,从而提供比单独使用任何单个检索器更准确和相关的结果。

# Create base retrievers
retriever_bm25 = BM25Retriever.from_documents(documents, search_kwargs={"k": TOP_K})
retriever_vanilla = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": TOP_K})
retriever_mmr = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": TOP_K})ensemble_retriever = EnsembleRetriever(retrievers=[retriever_vanilla, retriever_mmr, retriever_bm25],weights=ENSEMBLE_WEIGHTS,)

重新排序是一种可用于提高RAG管道性能的技术。这是一种非常强大的方法,可以显著提升搜索系统。简而言之,重新排序需要一个查询和一个响应,并输出它们之间的相关性得分。通过这种方式,可以使用任何搜索系统来显示可能包含查询答案的多个文档,然后使用Rerank端点对其进行排序。

但是:为什么我们需要重新排名?

为了解决准确性方面的挑战,使用了两阶段检索作为提高搜索质量的一种手段。在这些两阶段系统中,第一阶段模型(集成检索器)从更大的数据集中检索一组候选文档。然后,使用第二阶段模型(重新排序器)对第一阶段模型检索到的文档进行重新排序。此外,重新排名模型,如Cohere Rerank,是一种在给定查询和文档对时输出相似性分数的模型。此分数可用于对与搜索查询最相关的文档进行重新排序。在重新排序方法中,Cohere-Rerank模型因其显著提高搜索准确性的能力而脱颖而出。该模型与传统的嵌入模型不同,它采用深度学习来直接评估每个文档和查询之间的对齐情况。Cohere-Rerank通过协同处理查询和文档来输出相关性得分,从而产生更细致的文档选择过程。

在这种情况下,检索到的文档会被重新排序,并返回前两个最相关的文档。

from langchain.retrievers.contextual_compression import ContextualCompressionRetriever
from langchain_cohere import CohereRerank
from langchain_community.llms import Cohere# Set up Cohere re-ranking
compressor = CohereRerank(top_n=2, model="rerank-english-v3.0")# Build compression retriever
compression_retriever = ContextualCompressionRetriever(base_compressor=compressor,base_retriever=ensemble_retriever,
)compression_retriever.invoke("Retrieve the data center PUE efficiency in Dublin in 2019"
)

Output example to the question “Retrieve the data center PUE efficiency value in Dublin in 2019”:

[Document(metadata={'Header 2': 'Endnotes', 'relevance_score': 0.27009502}, page_content="- 1 This calculation is based on..."),Document(metadata={'Header 2': 'DATA CENTER GRID REGION CFE', 'relevance_score': 0.20593424}, page_content="2023  \n| Country..." )]

步骤4.2:检索和重新排序文档功能

async def retrieve_and_rerank_documents(state: QueryState, *, config: RunnableConfig
) -> dict[str, list[Document]]:"""Retrieve documents based on a given query.This function uses a retriever to fetch relevant documents for a given query.Args:state (QueryState): The current state containing the query string.config (RunnableConfig): Configuration with the retriever used to fetch documents.Returns:dict[str, list[Document]]: A dictionary with a 'documents' key containing the list of retrieved documents."""logger.info("---RETRIEVING DOCUMENTS---")logger.info(f"Query for the retrieval process: {state.query}")response = compression_retriever.invoke(state.query)return {"documents": response}

步骤4.3 :构建子图

builder = StateGraph(ResearcherState)
builder.add_node(generate_queries)
builder.add_node(retrieve_and_rerank_documents)
builder.add_edge(START, "generate_queries")
builder.add_conditional_edges("generate_queries",retrieve_in_parallel,  # type: ignorepath_map=["retrieve_and_rerank_documents"],
)
builder.add_edge("retrieve_and_rerank_documents", END)
researcher_graph = builder.compile()

步骤5:检查完成


使用conditional_edge,我们构建一个循环,其结束条件由check_finished返回的值决定。此函数检查create_research_plan节点创建的步骤列表中是否没有更多步骤需要处理。完成所有步骤后,流程将继续到响应节点。

def check_finished(state: AgentState) -> Literal["respond", "conduct_research"]:"""Determine if the research process is complete or if more research is needed.This function checks if there are any remaining steps in the research plan:- If there are, route back to the `conduct_research` node- Otherwise, route to the `respond` nodeArgs:state (AgentState): The current state of the agent, including the remaining research steps.Returns:Literal["respond", "conduct_research"]: The next step to take based on whether research is complete."""if len(state.steps or []) > 0:return "conduct_research"else:return "respond

步骤6:响应


根据所进行的research生成对用户查询的最终响应。此函数使用对话历史和research代理检索到的文档来制定全面的答案。

async def respond(state: AgentState, *, config: RunnableConfig
) -> dict[str, list[BaseMessage]]:"""Generate a final response to the user's query based on the conducted research.This function formulates a comprehensive answer using the conversation history and the documents retrieved by the researcher.Args:state (AgentState): The current state of the agent, including retrieved documents and conversation history.config (RunnableConfig): Configuration with the model used to respond.Returns:dict[str, list[str]]: A dictionary with a 'messages' key containing the generated response."""print("--- RESPONSE GENERATION STEP ---")model = ChatOpenAI(model="gpt-4o-2024-08-06", temperature=0)context = format_docs(state.documents)prompt = RESPONSE_SYSTEM_PROMPT.format(context=context)messages = [{"role": "system", "content": prompt}] + state.messagesresponse = await model.ainvoke(messages)return {"messages": [response]}

步骤7:检查幻觉


此步骤检查基于检索到的文档的事实集是否支持LLM在上一步骤中生成的响应,并给出二进制分数。

async def check_hallucinations(state: AgentState, *, config: RunnableConfig
) -> dict[str, Any]:"""Analyze the user's query and checks if the response is supported by the set of facts based on the document retrieved,providing a binary score result.This function uses a language model to analyze the user's query and gives a binary score result.Args:state (AgentState): The current state of the agent, including conversation history.config (RunnableConfig): Configuration with the model used for query analysis.Returns:dict[str, Router]: A dictionary containing the 'router' key with the classification result (classification type and logic)."""model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True)system_prompt = CHECK_HALLUCINATIONS.format(documents=state.documents,generation=state.messages[-1])messages = [{"role": "system", "content": system_prompt}] + state.messageslogging.info("---CHECK HALLUCINATIONS---")response = cast(GradeHallucinations, await model.with_structured_output(GradeHallucinations).ainvoke(messages))return {"hallucination": response}

步骤8:人工审批(人工参与)


如果LLM的回应没有得到一系列事实的支持,那么它很可能包含幻觉。在这种情况下,图形会中断,用户可以控制下一步:只重试上一代步骤,而不重新启动整个工作流或结束流程。这种人在循环步骤确保了用户的控制,同时避免了意外的循环或不希望的操作。

LangGraph中的中断函数通过在特定节点暂停图形、向人类呈现信息以及用他们的输入恢复图形,实现了人类在循环中的工作流程。此功能对于审批、编辑或收集其他输入等任务非常有用。中断函数与Command对象结合使用,以使用人类提供的值恢复图形。

def human_approval(state: AgentState,
):_binary_score = state.hallucination.binary_scoreif _binary_score == "1":return "END"else:retry_generation = interrupt({"question": "Is this correct?","llm_output": state.messages[-1]})if retry_generation == "y":print("voglio continuare")return "respond"else:return "END"

构建主图

from langgraph.graph import END, START, StateGraph
from langgraph.checkpoint.memory import MemorySavercheckpointer = MemorySaver()builder = StateGraph(AgentState, input=InputState)
builder.add_node(analyze_and_route_query)
builder.add_edge(START, "analyze_and_route_query")
builder.add_conditional_edges("analyze_and_route_query", route_query)
builder.add_node(create_research_plan)
builder.add_node(ask_for_more_info)
builder.add_node(respond_to_general_query)
builder.add_node(conduct_research)
builder.add_node("respond", respond)
builder.add_node(check_hallucinations)
builder.add_conditional_edges("check_hallucinations", human_approval, {"END": END, "respond": "respond"})
builder.add_edge("create_research_plan", "conduct_research")
builder.add_conditional_edges("conduct_research", check_finished)
builder.add_edge("respond", "check_hallucinations")graph = builder.compile(checkpointer=checkpointer)

构建main方法(app.py)
“每个函数都被定义为异步,以在生成步骤中启用流行为。

from subgraph.graph_states import ResearcherState
from main_graph.graph_states import AgentState
from utils.utils import config, new_uuid
from subgraph.graph_builder import researcher_graph
from main_graph.graph_builder import InputState, graph
from langgraph.types import Command
import asyncio
import uuidimport asyncio
import time
import builtinsthread = {"configurable": {"thread_id": new_uuid()}}async def process_query(query):inputState = InputState(messages=query)async for c, metadata in graph.astream(input=inputState, stream_mode="messages", config=thread):if c.additional_kwargs.get("tool_calls"):print(c.additional_kwargs.get("tool_calls")[0]["function"].get("arguments"), end="", flush=True)if c.content:time.sleep(0.05)print(c.content, end="", flush=True)if len(graph.get_state(thread)[-1]) > 0:if len(graph.get_state(thread)[-1][0].interrupts) > 0:response = input("\nThe response may contain uncertain information. Retry the generation? If yes, press 'y': ")if response.lower() == 'y':async for c, metadata in graph.astream(Command(resume=response), stream_mode="messages", config=thread):if c.additional_kwargs.get("tool_calls"):print(c.additional_kwargs.get("tool_calls")[0]["function"].get("arguments"), end="")if c.content:time.sleep(0.05)print(c.content, end="", flush=True)async def main():input = builtins.inputprint("Enter your query (type '-q' to quit):")while True:query = input("> ")if query.strip().lower() == "-q":print("Exiting...")breakawait process_query(query)if __name__ == "__main__":asyncio.run(main())

在第一次调用后,检查图形状态是否有中断。如果找到任何图形,可以使用以下命令再次调用该图形:

graph.astream(Command(resume=response), stream_mode="messages", config=thread)

三、结果

现场测试

作为第一个测试,执行以下查询以从不同的表中提取不同的值,结合多步骤方法的功能并利用 Docling 库的解析功能。

复杂问题:Retrieve the data center PUE efficiency values in Singapore 2nd facility in 2019 and 2022. Also retrieve regional average CFE in Asia pacific in 2023

完整结果正确幻觉检查成功通过

聊天机器人生成的步骤:

  • “Look up the PUE efficiency values for the Singapore 2nd facility for the years 2019 and 2022.”,
  • ”Find the regional average CFE for the Asia Pacific region in 2023.”

生成的文本:“- The Power Usage Effectiveness (PUE) for the Singapore 2nd facility in 2019 is not available, as the data for that year is not provided. However, the PUE for 2022 is 1.21

2023年亚太地区平均无碳能源(CFE)为12%。

完整输出

Enter your query (type '-q' to quit):
> Retrieve the data center PUE efficiency values in Singapore 2nd facility in 2019 and 2022. Also retrieve regional average CFE in Asia pacific in 2023 
2025-01-10 20:39:53,381 - INFO - ---ANALYZE AND ROUTE QUERY---
2025-01-10 20:39:53,381 - INFO - MESSAGES: [HumanMessage(content='Retrieve the data center PUE efficiency values in Singapore 2nd facility in 2019 and 2022. Also retrieve regional average CFE in Asia pacific in 2023 ', additional_kwargs={}, response_metadata={}, id='351a00e9-ecda-49e2-b069-19196348a82a')]
{"logic":"Retrieve the data center PUE efficiency values in Singapore 2nd facility in 2019 and 2022. Also retrieve regional average CFE in Asia pacific in 2023","type":"environmental"}2025-01-10 20:39:55,586 - INFO - ---PLAN GENERATION---
{"steps":["Look up the PUE efficiency values for the Singapore 2nd facility for the years 2019 and 2022.","Find the regional average CFE for the Asia Pacific region in 2023."]}2025-01-10 20:39:57,323 - INFO - ---GENERATE QUERIES---
{"queries":["PUE efficiency values Singapore 2nd facility 2019","PUE efficiency values Singapore 2nd facility 2022"]}2025-01-10 20:39:58,285 - INFO - Queries: ['PUE efficiency values Singapore 2nd facility 2019', 'PUE efficiency values Singapore 2nd facility 2022', 'Look up the PUE efficiency values for the Singapore 2nd facility for the years 2019 and 2022.']
2025-01-10 20:39:58,288 - INFO - ---RETRIEVING DOCUMENTS---
2025-01-10 20:39:58,288 - INFO - Query for the retrieval process: PUE efficiency values Singapore 2nd facility 2019
2025-01-10 20:39:59,568 - INFO - ---RETRIEVING DOCUMENTS---
2025-01-10 20:39:59,568 - INFO - Query for the retrieval process: PUE efficiency values Singapore 2nd facility 2022
2025-01-10 20:40:00,891 - INFO - ---RETRIEVING DOCUMENTS---
2025-01-10 20:40:00,891 - INFO - Query for the retrieval process: Look up the PUE efficiency values for the Singapore 2nd facility for the years 2019 and 2022.
2025-01-10 20:40:01,820 - INFO - 
4 documents retrieved in total for the step: Look up the PUE efficiency values for the Singapore 2nd facility for the years 2019 and 2022..
2025-01-10 20:40:01,825 - INFO - ---GENERATE QUERIES---
{"queries":["Asia Pacific regional average CFE 2023","CFE statistics Asia Pacific 2023"]}2025-01-10 20:40:02,778 - INFO - Queries: ['Asia Pacific regional average CFE 2023', 'CFE statistics Asia Pacific 2023', 'Find the regional average CFE for the Asia Pacific region in 2023.']
2025-01-10 20:40:02,780 - INFO - ---RETRIEVING DOCUMENTS---
2025-01-10 20:40:02,780 - INFO - Query for the retrieval process: Asia Pacific regional average CFE 2023
2025-01-10 20:40:03,757 - INFO - ---RETRIEVING DOCUMENTS---
2025-01-10 20:40:03,757 - INFO - Query for the retrieval process: CFE statistics Asia Pacific 2023
2025-01-10 20:40:04,885 - INFO - ---RETRIEVING DOCUMENTS---
2025-01-10 20:40:04,885 - INFO - Query for the retrieval process: Find the regional average CFE for the Asia Pacific region in 2023.
2025-01-10 20:40:06,526 - INFO - 
4 documents retrieved in total for the step: Find the regional average CFE for the Asia Pacific region in 2023..
2025-01-10 20:40:06,530 - INFO - --- RESPONSE GENERATION STEP ---
- The Power Usage Effectiveness (PUE) for the Singapore 2nd facility in 2019 is not available, as the data for that year is not provided. However, the PUE for 2022 is 1.21 [e048d08a-4ef6-77b5-20d3-352dcec590b7].- The regional average Carbon-Free Energy (CFE) in the Asia Pacific for 2023 is 12% [9c489d2f-f16f-572b-abed-ee1d5d0ed379].2025-01-10 20:40:14,918 - INFO - ---CHECK HALLUCINATIONS---
{"binary_score":"1"}> 

现在让我们在ChatGPT上尝试一下。将pdf文件上传到web应用程序后,也进行了相同的查询。

如图所示,ChatGPT返回的值不正确,模型出现幻觉。在这种情况下,幻觉检查步骤将允许反应再生(Self-Reflective RAG)。

四、结论

Agentic RAG:技术挑战和注意事项

尽管性能有所提高,但实施 Agentic RAG 仍存在挑战:

  • 延迟:代理交互的复杂性增加通常会导致更长的响应时间。在速度和准确性之间取得平衡是一项关键挑战。
  • 评估和可观察性:随着 Agentic RAG 系统变得越来越复杂,持续评估和可观察性变得必要。

总而言之,Agentic RAG 标志着人工智能领域的重大突破。通过将大型语言模型的功能与自主推理和信息检索相结合,Agentic RAG 引入了智能和灵活性的全新标准。随着人工智能的不断发展,Agentic RAG 将在各行各业发挥重要作用,彻底改变我们使用技术的方式。

 

相关文章:

  • 前端项目3-01:登录页面
  • 教程 | 一键批量下载 Dify「Markdown 转 Docx」生成的 Word 文件(附源码)
  • 服务器的安装与安全设置
  • 机器学习18-强化学习RLHF
  • Excel基础:数据编辑
  • git 多用户管理 跨平台
  • dify小用
  • Miniconda+Jupyter+PyCharm初始环境配置
  • Linux命令:内置命令与外部命令的本质区别
  • 开疆智能CCLinkIE转ModbusTCP网关连接测联无纸记录仪配置案例
  • 60% 重构项目陷 “越改越烂” 泥潭!
  • 【stm32】HAL库开发——CubeMX配置外部中断和配置PWM
  • Flink部署与应用——Flink架构概览
  • 【机器学习深度学习】线性代数
  • 爬虫简单实操2——以贴吧为例爬取“某吧”前10页的网页代码
  • 华为交换机 USG6311E 新建 vlan
  • python的银行柜台管理系统
  • 大模型在慢性病毒性肝炎预测及诊疗方案制定中的应用研究
  • window显示驱动开发—支持 DXGI DDI(四)
  • 6月份最新代发考试战报:思科华为HCIP HCSE 考试通过