Reading Notes on Knowledge-Graph-Enhanced RAG Systems (Part 5) Agentic RAG: Agent-Based RAG
Chapter 5 Agentic RAG: Agent-Based RAG
5.1 Theory
5.1.1 Agentic RAG
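An Agentic RAG system contains several retrieval agents and, depending on what a user query needs, calls the appropriate agent(s) to fetch the required data. Its entry point is usually a retriever router, which selects the one or more retrievers best suited to the current task. A common way to build an Agentic RAG system is to use the tool-calling capability of a large language model (LLM), sometimes called function calling.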
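The implementation in 5.2 imitates tool calling through the prompt: the model is asked to reply with JSON shaped like a function call, and the code parses it. With native tool calling, the tools are instead declared to the model as schemas. The sketch below turns a small registry, shaped like the RETRIEVER_TOOLS dict in 5.2 but without the function references, into an OpenAI-style `tools` list, a format that Ollama's `/api/chat` endpoint also accepts in recent versions. Treating every parameter as a required string is an assumption made for brevity, and `REGISTRY` and `to_tool_schemas` are hypothetical names.

```python
import json
from typing import Any, Dict, List

# Hypothetical registry in the same shape as RETRIEVER_TOOLS in 5.2
# (the "function" entries are omitted: only metadata is needed for schemas).
REGISTRY: Dict[str, Dict[str, Any]] = {
    "movie_info_by_title": {
        "description": "Get movie information, including cast and directors, by movie title",
        "parameters": ["title"],
    },
    "movies_info_by_actor": {
        "description": "Get the movies an actor has appeared in, by actor name",
        "parameters": ["actor"],
    },
}


def to_tool_schemas(registry: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Convert the registry into OpenAI-style function-calling tool definitions.

    Assumption: every parameter is a required string.
    """
    tools = []
    for name, info in registry.items():
        properties = {p: {"type": "string"} for p in info["parameters"]}
        tools.append({
            "type": "function",
            "function": {
                "name": name,
                "description": info["description"],
                "parameters": {  # a JSON Schema object describing the arguments
                    "type": "object",
                    "properties": properties,
                    "required": list(info["parameters"]),
                },
            },
        })
    return tools


if __name__ == "__main__":
    print(json.dumps(to_tool_schemas(REGISTRY), indent=2))
```

With native tool calling, the reply typically carries the chosen tool's name and arguments in a structured `tool_calls` field, so the JSON-extraction step from 5.2 is no longer needed.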
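The core idea of an agentic system is that it acts on the user's behalf to carry out tasks. An Agentic RAG system typically consists of the following basic components:
- Retriever router: a function that takes the user's question and returns the best-suited retriever (or retrievers)
- Retriever agents: the retrieval modules that actually fetch the data used to answer the user's question
- Answer critic: a function that takes the retrieval results and checks whether the original question has been answered correctly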
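How the three components interact can be sketched as a control loop. This is a toy under stated assumptions, not the implementation in 5.2: a keyword rule stands in for the LLM router, two lambdas over hard-coded snippets stand in for graph-backed retriever agents, and the critic returns follow-up questions (an empty list means "answered"). The `max_rounds` bound and the check against repeated follow-up questions are this sketch's own additions; all names are hypothetical.

```python
from typing import Callable, Dict, List, Set

Retriever = Callable[[str], List[str]]

# Hypothetical retriever agents over hard-coded snippets (stand-ins for graph queries).
RETRIEVERS: Dict[str, Retriever] = {
    "movie": lambda q: ["Jurassic Park: directed by Steven Spielberg"],
    "person": lambda q: ["Steven Spielberg: born 1946"],
}


def route(query: str) -> List[str]:
    """Retriever router: a keyword rule stands in for the LLM's tool choice."""
    return ["movie"] if "jurassic" in query.lower() else ["person"]


def critic(question: str, evidence: List[str]) -> List[str]:
    """Answer critic: return follow-up questions, or [] when the evidence suffices."""
    if any("born" in snippet for snippet in evidence):
        return []
    return ["When was Steven Spielberg born?"]


def generate(question: str, evidence: List[str]) -> str:
    """Stand-in for the final LLM call: just joins the evidence."""
    return " | ".join(evidence)


def agentic_answer(question: str,
                   router: Callable[[str], List[str]],
                   retrievers: Dict[str, Retriever],
                   answer_critic: Callable[[str, List[str]], List[str]],
                   generator: Callable[[str, List[str]], str],
                   max_rounds: int = 3) -> str:
    evidence: List[str] = []
    pending = [question]
    asked: Set[str] = set()
    for _ in range(max_rounds):
        for q in pending:
            asked.add(q)
            for name in router(q):  # the router picks one or more retriever agents
                evidence.extend(retrievers[name](q))
        follow_ups = answer_critic(question, evidence)
        # Stop when the critic is satisfied or only repeats earlier questions
        pending = [q for q in follow_ups if q not in asked]
        if not pending:
            break
    return generator(question, evidence)


if __name__ == "__main__":
    print(agentic_answer("When was the director of Jurassic Park born?",
                         route, RETRIEVERS, critic, generate))
```

In 5.2 the same roles are played by choose_retriever_tool together with the registered retrievers, evaluate_answer_completeness, and generate_final_answer; answer_question there runs a single critic round rather than a loop.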
5.2 Implementation
```python
import json
import random
import re
from typing import Any, Dict, List, Optional

import requests
from neo4j import GraphDatabase

# Neo4j connection settings
NEO4J_URI = "bolt://localhost:7687"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = "your-password"  # replace with your own password

# Local Ollama LLM settings
OLLAMA_BASE_URL = "http://localhost:11434"
LLM_MODEL = "qwen3:32b"
```
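Hard-coding the password works for a local demo. A common alternative is to read the settings from environment variables and fall back to the demo defaults. The variable names, the `Settings` class, and `load_settings` below are this sketch's own (hypothetical) convention; nothing in the code above reads them.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    neo4j_uri: str
    neo4j_user: str
    neo4j_password: str
    ollama_base_url: str
    llm_model: str


def load_settings() -> Settings:
    """Read settings from environment variables, falling back to this
    chapter's demo defaults (the variable names are an assumption)."""
    env = os.environ
    return Settings(
        neo4j_uri=env.get("NEO4J_URI", "bolt://localhost:7687"),
        neo4j_user=env.get("NEO4J_USER", "neo4j"),
        neo4j_password=env.get("NEO4J_PASSWORD", ""),  # no password baked into the code
        ollama_base_url=env.get("OLLAMA_BASE_URL", "http://localhost:11434"),
        llm_model=env.get("LLM_MODEL", "qwen3:32b"),
    )
```

The driver could then be created with `GraphDatabase.driver(settings.neo4j_uri, auth=(settings.neo4j_user, settings.neo4j_password))` after `settings = load_settings()`.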
```python
# Initialize the Neo4j connection
neo4j_driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))


def remove_think_tags(text: str) -> str:
    """Remove <think>...</think> blocks (and their contents) from the text."""
    pattern = r'<think>.*?</think>'
    cleaned_text = re.sub(pattern, '', text, flags=re.DOTALL)
    return cleaned_text.strip()


def extract_json_from_response(response_text: str) -> Optional[str]:
    """Extract the JSON part of an LLM response, ignoring <think> blocks."""
    cleaned_response = remove_think_tags(response_text)
    if not cleaned_response:
        print("[JSON parse] Response is empty after cleaning")
        return None

    # The JSON payload starts at whichever of '{' or '[' appears first
    start_brace_index = cleaned_response.find('{')
    start_bracket_index = cleaned_response.find('[')
    start_index = -1
    if start_brace_index != -1 and (start_bracket_index == -1 or start_brace_index < start_bracket_index):
        start_index = start_brace_index
    elif start_bracket_index != -1:
        start_index = start_bracket_index

    if start_index != -1:
        potential_json = cleaned_response[start_index:].strip()
        if potential_json:
            try:
                parsed = json.loads(potential_json)
                if isinstance(parsed, (dict, list)):
                    return potential_json
            except json.JSONDecodeError as e:
                print(f"[JSON parse] JSON validation failed: {e}")
    print("[JSON parse] Could not extract valid JSON")
    return None


def call_local_llm(prompt: str, model: str = LLM_MODEL) -> str:
    """Call the local Ollama model through the non-streaming /api/generate endpoint."""
    payload = {"model": model, "prompt": prompt, "stream": False}
    try:
        response = requests.post(f"{OLLAMA_BASE_URL}/api/generate", json=payload)
        response.raise_for_status()
        return response.json()["response"].strip()
    except Exception as e:
        print(f"LLM call failed: {e}")
        return ""


def execute_cypher_query(cypher: str) -> List[Dict]:
    """Run a Cypher query and return each record as a dict."""
    try:
        with neo4j_driver.session() as session:
            result = session.run(cypher)
            return [dict(record) for record in result]
    except Exception as e:
        print(f"Cypher query failed: {e}")
        return []


def serialize_neo4j_result(records: List[Dict]) -> List[Dict]:
    """Convert Neo4j query results into a JSON-serializable structure."""
    def value_to_serializable(val):
        if hasattr(val, '_properties'):  # a Node or Relationship: keep its properties
            return dict(val)
        if isinstance(val, list):
            return [value_to_serializable(item) for item in val]
        return val

    return [
        {key: value_to_serializable(record[key]) for key in record.keys()}
        for record in records
    ]


def generate_cypher_from_question(question: str) -> str:
    """Generate a Cypher query from a natural-language question."""
    prompt = f"""
You are an expert Cypher query generator. Using the graph database schema below, convert the natural-language question into a correct Cypher query.

Graph schema:
- Movie nodes: properties title (movie title) and year (release year)
- Person nodes: property name (person's name)
- Country nodes: property name (country name)
- Reviewer nodes: property name (reviewer's name)

Relationship types:
- (:Person)-[:ACTED_IN]->(:Movie): an actor appeared in a movie
- (:Person)-[:DIRECTED]->(:Movie): a director directed a movie
- (:Movie)-[:PRODUCED_IN]->(:Country): the country where a movie was produced
- (:Reviewer)-[:REVIEWED]->(:Movie): a reviewer reviewed a movie; the score property holds the rating

Question: {question}

Return one valid Cypher query and nothing else. The query must be on a single line:
"""
    result = call_local_llm(prompt)
    print(f"[LLM raw response - Cypher generation] {result}")
    # Drop the <think> block, then take the first line that looks like a Cypher query
    cleaned_result = remove_think_tags(result)
    for line in cleaned_result.split('\n'):
        line = line.strip()
        if line.upper().startswith(('MATCH', 'RETURN')):
            return line
    return cleaned_result.strip()
```
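extract_json_from_response passes everything from the first `{` or `[` to the end of the response to `json.loads`, so it raises `json.JSONDecodeError` ("Extra data") as soon as the model adds any text after the JSON, such as a closing remark. A more tolerant variant can use `json.JSONDecoder.raw_decode`, which parses one JSON value from the start of a string and reports where it stopped, so trailing text is ignored. Trying each `{` or `[` position in turn, and returning the parsed value instead of a string, are this sketch's own choices; `extract_first_json` is a hypothetical name.

```python
import json
import re
from typing import Any, Optional

_DECODER = json.JSONDecoder()


def extract_first_json(response_text: str) -> Optional[Any]:
    """Return the first JSON object or array in an LLM response, or None.

    <think>...</think> blocks are removed first; any text before or after
    the JSON value is ignored.
    """
    text = re.sub(r"<think>.*?</think>", "", response_text, flags=re.DOTALL)
    for match in re.finditer(r"[\[{]", text):
        try:
            # raw_decode parses one JSON value from the start of the slice and
            # also returns where it stopped, so trailing text is not an error
            value, _end = _DECODER.raw_decode(text[match.start():])
        except json.JSONDecodeError:
            continue  # not valid JSON from here; try the next '{' or '['
        if isinstance(value, (dict, list)):
            return value
    return None
```

Because it returns the parsed value, callers such as choose_retriever_tool would no longer need their own `json.loads` of the extracted string.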
```python
# Retriever tool definitions
def text2cypher_retriever(question: str) -> List[Dict]:
    """General-purpose text-to-Cypher retriever."""
    print(f"[Retriever] Using the general text2cypher retriever for: {question}")
    # Step 1: generate the Cypher query
    cypher = generate_cypher_from_question(question)
    print(f"[Retriever] Generated Cypher query: {cypher}")
    # Step 2: run it, converting Node/Relationship values into plain dicts
    results = serialize_neo4j_result(execute_cypher_query(cypher))
    print(f"[Retriever] Query returned {len(results)} record(s)")
    return results


def movie_title_retriever(title: str) -> List[Dict]:
    """Dedicated retriever: look up movies by title."""
    print(f"[Retriever] Using the movie-title retriever to look up: {title}")
    # collect(DISTINCT ...): the two OPTIONAL MATCHes yield one row per
    # (actor, director) pair, which would otherwise repeat names
    query = """
    MATCH (m:Movie)
    WHERE toLower(m.title) CONTAINS $title
    OPTIONAL MATCH (m)<-[:ACTED_IN]-(a:Person)
    OPTIONAL MATCH (m)<-[:DIRECTED]-(d:Person)
    RETURN m AS movie, collect(DISTINCT a.name) AS cast, collect(DISTINCT d.name) AS directors
    """
    try:
        with neo4j_driver.session() as session:
            result = session.run(query, title=title.lower())
            results = []
            for record in result:
                movie_node = record["movie"]
                results.append({
                    "movie": {"title": movie_node["title"], "year": movie_node.get("year")},
                    "cast": list(record["cast"]),
                    "directors": list(record["directors"]),
                })
        print(f"[Retriever] Found {len(results)} matching movie(s)")
        return results
    except Exception as e:
        print(f"[Retriever] Movie-title lookup failed: {e}")
        return []


def actor_movies_retriever(actor: str) -> List[Dict]:
    """Dedicated retriever: look up the movies an actor appeared in."""
    print(f"[Retriever] Using the actor retriever to look up: {actor}")
    query = """
    MATCH (a:Person)-[:ACTED_IN]->(m:Movie)
    WHERE toLower(a.name) CONTAINS $actor
    OPTIONAL MATCH (m)<-[:ACTED_IN]-(co_actor:Person)
    OPTIONAL MATCH (m)<-[:DIRECTED]-(d:Person)
    RETURN m AS movie, collect(DISTINCT co_actor.name) AS cast, collect(DISTINCT d.name) AS directors
    """
    try:
        with neo4j_driver.session() as session:
            result = session.run(query, actor=actor.lower())
            results = []
            for record in result:
                movie_node = record["movie"]
                results.append({
                    "movie": {"title": movie_node["title"], "year": movie_node.get("year")},
                    "cast": list(record["cast"]),
                    "directors": list(record["directors"]),
                })
        print(f"[Retriever] Found {len(results)} matching movie(s)")
        return results
    except Exception as e:
        print(f"[Retriever] Actor lookup failed: {e}")
        return []


def direct_answer_retriever(answer: str) -> str:
    """Direct-answer retriever: returns an answer already present in the conversation."""
    print(f"[Retriever] Using the direct-answer retriever: {answer}")
    return answer
```
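text2cypher_retriever executes whatever Cypher the LLM produces, and generate_cypher_from_question only keeps a line that starts with MATCH or RETURN. Below is a sketch of a slightly stricter extractor: it strips Markdown code fences, also accepts OPTIONAL MATCH, WITH and UNWIND, and refuses queries that contain write keywords. The keyword list is an assumption and is deliberately conservative (it would also reject a title containing the word "Set", or any CALL). It is not a complete safety mechanism; running generated queries in a read transaction (`session.execute_read` in the 5.x Neo4j Python driver) should be a stronger safeguard. `extract_read_only_cypher` is a hypothetical name.

```python
import re
from typing import Optional

READ_CLAUSES = ("MATCH", "OPTIONAL MATCH", "WITH", "UNWIND", "RETURN")
# Hypothetical blocklist of write/admin keywords; not exhaustive.
WRITE_KEYWORDS = re.compile(
    r"\b(CREATE|MERGE|DELETE|DETACH|SET|REMOVE|DROP|LOAD\s+CSV|CALL)\b",
    re.IGNORECASE,
)


def extract_read_only_cypher(llm_output: str) -> Optional[str]:
    """Return the first line that starts with a read clause, or None.

    Returns None as well when that line contains a write keyword.
    """
    text = re.sub(r"<think>.*?</think>", "", llm_output, flags=re.DOTALL)
    text = re.sub(r"`{3}[A-Za-z]*", "", text)  # drop Markdown code fences
    for line in text.splitlines():
        line = line.strip()
        if line.upper().startswith(READ_CLAUSES):
            if WRITE_KEYWORDS.search(line):
                return None  # looks like a write query: refuse to run it
            return line
    return None
```

Returning None lets the caller fall back to an empty result instead of executing a query it cannot trust.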
```python
# Retriever tool registry
RETRIEVER_TOOLS = {
    "text2cypher": {
        "description": "Query the database using the user's question. Use this as the default when no other tool applies.",
        "function": text2cypher_retriever,
        "parameters": ["question"],
    },
    "movie_info_by_title": {
        "description": "Get movie information, including cast and directors, by providing a movie title",
        "function": movie_title_retriever,
        "parameters": ["title"],
    },
    "movies_info_by_actor": {
        "description": "Get information on the movies an actor has appeared in, by providing the actor's name",
        "function": actor_movies_retriever,
        "parameters": ["actor"],
    },
    "answer_given": {
        "description": "If the complete answer to the question already appears in the conversation, use this tool to extract it",
        "function": direct_answer_retriever,
        "parameters": ["answer"],
    },
}


def choose_retriever_tool(question: str, conversation_history: List[Dict]) -> Dict:
    """Retriever router: let the LLM pick the most suitable retriever tool."""
    print(f"[Router] Choosing a retriever for question: {question}")
    # Earlier results are shown so that the answer_given tool can actually be used
    context = "".join(
        f"Previous result: {entry.get('content', '')}\n"
        for entry in conversation_history
        if entry.get("role") == "assistant"
    )
    prompt = "You need to choose the most suitable retrieval tool for the user's question.\n\nAvailable tools:\n"
    for tool_name, tool_info in RETRIEVER_TOOLS.items():
        params = ", ".join(tool_info["parameters"])
        prompt += f"- {tool_name} (parameters: {params}): {tool_info['description']}\n"
    prompt += f"""
Conversation so far:
{context or '(none)'}

User question: {question}

Choose the most suitable tool and provide its arguments, in the following JSON format:
{{"function": {{"name": "tool_name", "arguments": "{{\\"param_name\\": \\"param_value\\"}}"}}}}

Return only the JSON, nothing else:
"""
    response = call_local_llm(prompt)
    print(f"[LLM raw response - tool selection] {response}")
    json_str = extract_json_from_response(response)
    if json_str:
        try:
            tool_choice = json.loads(json_str)
            tool_name = tool_choice["function"]["name"]
            arguments = tool_choice["function"]["arguments"]
            # The prompt asks for arguments as a JSON string; accept a JSON object as well
            if isinstance(arguments, str):
                arguments = json.loads(arguments)
            print(f"[Router] Selected tool: {tool_name}, arguments: {arguments}")
            return {"tool": tool_name, "arguments": arguments}
        except (json.JSONDecodeError, KeyError, TypeError) as e:
            print(f"[Router] Failed to parse the tool choice: {e}")
    else:
        print("[Router] Could not extract JSON, falling back to the default tool")
    return {"tool": "text2cypher", "arguments": {"question": question}}


def execute_retriever_tool(tool_name: str, arguments: Dict) -> Any:
    """Run the chosen retriever tool."""
    if tool_name not in RETRIEVER_TOOLS:
        print(f"[Executor] Unknown tool: {tool_name}, falling back to the default tool")
        tool_name = "text2cypher"
        arguments = {"question": str(arguments)}
    function = RETRIEVER_TOOLS[tool_name]["function"]
    try:
        print(f"[Executor] Running tool: {tool_name}")
        return function(**arguments)
    except Exception as e:
        print(f"[Executor] Tool execution failed: {e}")
        return []


def update_question_with_context(original_question: str, conversation_history: List[Dict]) -> str:
    """Query updater: rewrite the question using the conversation history."""
    if not conversation_history:
        return original_question
    print(f"[Query updater] Rewriting the question with context: {original_question}")
    context = "".join(
        f"Previous answer: {entry.get('content', '')}\n"
        for entry in conversation_history
        if entry.get("role") == "assistant"
    )
    prompt = f"""
You are a question-rewriting expert. Using the conversation history, rewrite the question so that it is more complete, specific, and easy to answer.

Rules:
1. Only change the question when necessary
2. Use earlier answers to fill in missing information
3. Do not go beyond the scope of the original question
4. If the original question is already clear enough, leave it unchanged

Conversation history:
{context}
Original question: {original_question}

Return only the rewritten question text:
"""
    updated_question = call_local_llm(prompt)
    print(f"[LLM raw response - question rewrite] {updated_question}")
    cleaned_question = remove_think_tags(updated_question)
    if cleaned_question:
        print(f"[Query updater] Rewritten question: {cleaned_question}")
        return cleaned_question
    print("[Query updater] Keeping the original question")
    return original_question


def evaluate_answer_completeness(original_question: str, conversation_history: List[Dict]) -> Dict[str, Any]:
    """Answer critic: decide whether the gathered information answers the question."""
    print(f"[Answer critic] Evaluating completeness for: {original_question}")
    context = "".join(
        f"Information gathered: {entry.get('content', '')}\n"
        for entry in conversation_history
        if entry.get("role") == "assistant"
    )
    prompt = f"""
You are an expert at judging answer completeness. Decide whether the original question has been fully answered.

Original question: {original_question}

Information gathered so far:
{context}
Return the result strictly in the following JSON format:
If the information fully answers the question, return: {{"success": true, "follow_up_questions": []}}
If the information is insufficient, list the questions needed to fill the gaps: {{"success": false, "follow_up_questions": ["follow-up question 1", "follow-up question 2"]}}

Return only the JSON, nothing else:
"""
    response = call_local_llm(prompt)
    print(f"[LLM raw response - answer evaluation] {response}")
    json_str = extract_json_from_response(response)
    if json_str:
        try:
            result = json.loads(json_str)
            if isinstance(result, dict) and "success" in result and "follow_up_questions" in result:
                print(f"[Answer critic] Result: Success={result['success']}, "
                      f"Follow-up Questions={len(result['follow_up_questions'])}")
                return result
        except json.JSONDecodeError as e:
            print(f"[Answer critic] JSON parse failed: {e}")
    else:
        print("[Answer critic] Could not extract JSON")
    print("[Answer critic] Falling back to the default result: Success=False")
    return {"success": False, "follow_up_questions": [original_question]}


def process_supplemental_questions(original_question: str, follow_up_questions: List[str],
                                   conversation_history: List[Dict]) -> List[Dict]:
    """Run retrieval for each follow-up question proposed by the answer critic."""
    print(f"[Supplemental retrieval] Processing {len(follow_up_questions)} follow-up question(s)")
    for i, question in enumerate(follow_up_questions, 1):
        print(f"\n[Supplemental retrieval] Follow-up question {i}/{len(follow_up_questions)}: {question}")
        conversation_history = process_single_question(question, conversation_history)
    return conversation_history


def process_single_question(question: str, conversation_history: Optional[List[Dict]] = None) -> List[Dict]:
    """Rewrite, route and retrieve for one question; append the result to the history."""
    if conversation_history is None:
        conversation_history = []
    print(f"[Main] Processing question: {question}")
    optimized_question = update_question_with_context(question, conversation_history)
    tool_choice = choose_retriever_tool(optimized_question, conversation_history)
    retrieval_result = execute_retriever_tool(tool_choice["tool"], tool_choice["arguments"])

    # Make sure the result can be JSON-serialized
    if isinstance(retrieval_result, list):
        retrieval_result = [item if isinstance(item, dict) else str(item) for item in retrieval_result]
    elif not isinstance(retrieval_result, (str, int, float, bool, type(None))):
        retrieval_result = str(retrieval_result)

    # default=str covers any value (e.g. a Neo4j Path) that is still not serializable
    result_text = (f"For the question '{optimized_question}', tool {tool_choice['tool']} returned: "
                   f"{json.dumps(retrieval_result, ensure_ascii=False, default=str)}")
    conversation_history.append({"role": "assistant", "content": result_text})
    print("[Main] Question processed")
    return conversation_history


def generate_final_answer(original_question: str, conversation_history: List[Dict]) -> str:
    """Generate the final answer from the retrieved information only."""
    print("[Answer generator] Generating the final answer from retrieved information")
    context = "".join(
        f"{entry.get('content', '')}\n"
        for entry in conversation_history
        if entry.get("role") == "assistant"
    )
    prompt = f"""
You are a knowledge assistant and must answer the user's question based on the information provided.

Important rules:
1. Use only the information provided
2. If the information is insufficient, state clearly what is missing
3. Do not invent or guess information
4. Keep the answer short and clear

User question: {original_question}

Available information:
{context}
Answer the user's question based on the information above:
"""
    final_answer = call_local_llm(prompt)
    print(f"[LLM raw response - final answer] {final_answer}")
    print("[Answer generator] Final answer generated")
    # Return only the answer, without the model's <think> block
    return remove_think_tags(final_answer)


def answer_question(user_question: str) -> str:
    """Main entry point: answer a user question."""
    print(f"\nProcessing user question: {user_question}")
    print("=" * 60)

    print("\nStage 1: initial retrieval")
    conversation_history = process_single_question(user_question)

    print("\nStage 2: answer completeness evaluation")
    evaluation_result = evaluate_answer_completeness(user_question, conversation_history)

    if not evaluation_result.get("success", False):
        follow_up_questions = evaluation_result.get("follow_up_questions", [])
        if follow_up_questions:
            print("\nStage 3: supplemental retrieval")
            conversation_history = process_supplemental_questions(
                user_question, follow_up_questions, conversation_history)
        else:
            print("\nStage 3: skipped, the critic marked the answer unsuccessful but gave no follow-up questions")
    else:
        print("\nStage 3: skipped, the answer is already complete")

    print("\nStage 4: generating the final answer")
    return generate_final_answer(user_question, conversation_history)


def clear_database_small():
    """Delete all nodes and relationships in the Neo4j database."""
    with neo4j_driver.session() as session:
        session.run("MATCH (n) DETACH DELETE n")
    print("Database cleared")


def create_constraints_small():
    """Create the uniqueness constraints used by the demo."""
    with neo4j_driver.session() as session:
        session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (m:Movie) REQUIRE m.title IS UNIQUE")
        session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (p:Person) REQUIRE p.name IS UNIQUE")
        session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (r:Reviewer) REQUIRE r.name IS UNIQUE")
        session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (c:Country) REQUIRE c.name IS UNIQUE")
    print("Uniqueness constraints created")


def populate_small_demo_data():
    """Write the demo data."""
    movies = [
        {"title": "Jurassic Park", "year": 1993, "country": "United States"},
        {"title": "Schindler's List", "year": 1993, "country": "United States"},
        {"title": "Saving Private Ryan", "year": 1998, "country": "United States"},
    ]
    director = "Steven Spielberg"
    reviewers = ["Alice", "Bob", "Charlie"]
    actors = ["Sam Neill", "Jeff Goldblum", "Julianne Moore", "Liam Neeson",
              "Ben Kingsley", "Tom Hanks", "Matt Damon"]

    with neo4j_driver.session() as session:
        # Country
        session.run("MERGE (:Country {name: $name})", name="United States")
        # Director
        session.run("MERGE (:Person {name: $name})", name=director)
        # Movies with their PRODUCED_IN and DIRECTED relationships
        for m in movies:
            session.run("MERGE (:Movie {title: $title, year: $year})", title=m["title"], year=m["year"])
            session.run(
                """
                MATCH (mov:Movie {title: $title})
                MATCH (c:Country {name: $country})
                MERGE (mov)-[:PRODUCED_IN]->(c)
                """,
                title=m["title"], country=m["country"],
            )
            session.run(
                """
                MATCH (d:Person {name: $director})
                MATCH (mov:Movie {title: $title})
                MERGE (d)-[:DIRECTED]->(mov)
                """,
                director=director, title=m["title"],
            )
        # Reviewers and their scores
        for name in reviewers:
            session.run("MERGE (:Reviewer {name: $name})", name=name)
        for name in reviewers:
            for m in random.sample(movies, k=2):
                # Random score around 8.2, rounded and clamped to [1.0, 10.0]
                score = max(1.0, min(10.0, round(random.gauss(8.2, 0.8), 1)))
                session.run(
                    """
                    MATCH (r:Reviewer {name: $name})
                    MATCH (mov:Movie {title: $title})
                    MERGE (r)-[rev:REVIEWED]->(mov)
                    SET rev.score = $score
                    """,
                    name=name, title=m["title"], score=score,
                )
        # A few actors and their ACTED_IN relationships
        for a in actors:
            session.run("MERGE (:Person {name: $name})", name=a)
        movie_actors = {
            "Jurassic Park": ["Sam Neill", "Jeff Goldblum"],
            "Schindler's List": ["Liam Neeson", "Ben Kingsley"],
            "Saving Private Ryan": ["Tom Hanks", "Matt Damon"],
        }
        for title, cast in movie_actors.items():
            for a in cast:
                session.run(
                    """
                    MATCH (p:Person {name: $actor})
                    MATCH (m:Movie {title: $title})
                    MERGE (p)-[:ACTED_IN]->(m)
                    """,
                    actor=a, title=title,
                )
    print("Small demo dataset written")


def main():
    """Program entry point."""
    clear_database_small()
    create_constraints_small()
    populate_small_demo_data()

    test_question = "Who directed Jurassic Park?"
    try:
        answer = answer_question(test_question)
        print("\nFinal result:")
        print(f"Question: {test_question}")
        print(f"Answer: {answer}")
    except Exception as e:
        print(f"Error while processing the question: {e}")
        import traceback
        traceback.print_exc()
    finally:
        # Close the database connection
        print("\nClosing the database connection")
        neo4j_driver.close()


if __name__ == "__main__":
    main()
```
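```text
Database cleared
Uniqueness constraints created
Small demo dataset written

Processing user question: Who directed Jurassic Park?
============================================================

Stage 1: initial retrieval
[Main] Processing question: Who directed Jurassic Park?
[Router] Choosing a retriever for question: Who directed Jurassic Park?
[LLM raw response - tool selection] <think>
OK, the user is asking "Who directed Jurassic Park?". I need to pick a suitable tool to answer this. Looking at the available tools: text2cypher, movie_info_by_title, movies_info_by_actor and answer_given. The question is about the director of the movie Jurassic Park, so a tool for movie information should be used. movie_info_by_title retrieves information by movie title, including cast and directors, so it is the most suitable. movies_info_by_actor finds movies by actor, which is not needed here. text2cypher would query the database, but when a tool can fetch the information directly, that tool should be preferred. answer_given is clearly not needed, since the question has not been answered yet. So I should choose movie_info_by_title with the movie title "Jurassic Park" as the argument. That will give the director information.
</think>{"function": {"name": "movie_info_by_title", "arguments": "{\"title\": \"Jurassic Park\"}"}}
[Router] Selected tool: movie_info_by_title, arguments: {'title': 'Jurassic Park'}
[Executor] Running tool: movie_info_by_title
[Retriever] Using the movie-title retriever to look up: Jurassic Park
[Retriever] Found 1 matching movie(s)
[Main] Question processed

Stage 2: answer completeness evaluation
[Answer critic] Evaluating completeness for: Who directed Jurassic Park?
[LLM raw response - answer evaluation] <think>
OK, I need to evaluate whether the user's question "Who directed Jurassic Park?" has been fully answered. According to the information obtained, the movie_info_by_title tool returned a result showing that the director is Steven Spielberg; there are two entries, but both are the same person. So the director's name is clearly given, with nothing contradictory or missing. The question is therefore fully answered and no follow-up questions are needed. Return success as true and follow_up_questions as an empty list.
</think>{"success": true, "follow_up_questions": []}
[Answer critic] Result: Success=True, Follow-up Questions=0

Stage 3: skipped, the answer is already complete

Stage 4: generating the final answer
[Answer generator] Generating the final answer from retrieved information
[LLM raw response - final answer] <think>
OK, the user is asking who directed Jurassic Park. I should first check the available information. According to the result returned by movie_info_by_title, the directors list for Jurassic Park contains "Steven Spielberg" twice. This may be a duplicate, but by common knowledge Jurassic Park was indeed directed by Steven Spielberg. Under the rules, though, only the provided information may be used, so I need to confirm the information is correct. Although the director's name is repeated, it is clearly the same director, so the answer is Steven Spielberg. I also make sure no other information source is used, only the given data. The user's question has a clear answer and no extra information is needed.
</think>The director of Jurassic Park is Steven Spielberg.
[Answer generator] Final answer generated

Final result:
Question: Who directed Jurassic Park?
Answer: <think>
OK, the user is asking who directed Jurassic Park. I should first check the available information. According to the result returned by movie_info_by_title, the directors list for Jurassic Park contains "Steven Spielberg" twice. This may be a duplicate, but by common knowledge Jurassic Park was indeed directed by Steven Spielberg. Under the rules, though, only the provided information may be used, so I need to confirm the information is correct. Although the director's name is repeated, it is clearly the same director, so the answer is Steven Spielberg. I also make sure no other information source is used, only the given data. The user's question has a clear answer and no extra information is needed.
</think>The director of Jurassic Park is Steven Spielberg.

Closing the database connection

Process finished with exit code 0
```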
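In the log above, both the answer critic and the final-answer step notice that the directors list contains Steven Spielberg twice. The cause is row multiplication in movie_title_retriever: the two OPTIONAL MATCH clauses produce one row per (actor, director) combination, so with two actors the single director appears in two rows, and `collect()` keeps both. The plain-Python simulation below (lists standing in for Neo4j rows; the helper names are hypothetical) reproduces the effect and shows what `collect(DISTINCT ...)` changes. In Cypher itself, `collect(DISTINCT d.name)`, or a pattern comprehension such as `[(m)<-[:DIRECTED]-(d:Person) | d.name]`, avoids the duplicates.

```python
from itertools import product
from typing import List, Optional, Tuple


def optional_match_rows(actors: List[str], directors: List[str]) -> List[Tuple[str, str]]:
    """One row per (actor, director) pair, which is what two chained OPTIONAL MATCH
    clauses on the same movie produce when both find at least one match."""
    return list(product(actors, directors))


def collect(values: List[Optional[str]]) -> List[str]:
    """Like Cypher's collect(): drops nulls but keeps duplicates."""
    return [v for v in values if v is not None]


def collect_distinct(values: List[Optional[str]]) -> List[str]:
    """Like collect(DISTINCT ...): keeps each value once (first-seen order here;
    Cypher itself does not promise an order)."""
    return list(dict.fromkeys(v for v in values if v is not None))


# Jurassic Park in the demo data: two actors, one director
rows = optional_match_rows(["Sam Neill", "Jeff Goldblum"], ["Steven Spielberg"])
directors = collect([d for _, d in rows])
directors_distinct = collect_distinct([d for _, d in rows])
print(directors, directors_distinct)
```

The same multiplication would inflate the cast list whenever a movie has more than one director, so deduplicating both lists is the safer choice.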