RAG学习(五)——查询构建、Text2SQL、查询重构与分发
检索优化(二)
一、查询构建
在前面的章节中,我们探讨了如何通过向量嵌入和相似度搜索来从非结构化数据中检索信息。然而,在实际应用中,我们常常需要处理更加复杂和多样化的数据,包括结构化数据(如SQL数据库)、半结构化数据(如带有元数据的文档)以及图数据。用户的查询也可能不仅仅是简单的语义匹配,而是包含复杂的过滤条件、聚合操作或关系查询。
1.1 什么是查询构建
构建查询键是指在接收用户查询时,把查询中的自然语言语义、结构化条件、过滤逻辑等信息,转化为一个或多个 可执行的检索键 (query keys),用于与数据库或索引结构进行高效匹配。
你可以把“查询键”理解为查询的“向量化/标准化表示”,它既可能是:
- 语义向量(embedding,用于语义相似度搜索)
- 符号化条件(SQL 过滤条件、时间戳、标签等)
- 图模式(graph pattern,用于图数据库)
换句话说,构建查询键是把自然语言或复杂查询“翻译”为数据库能理解的检索指令。
如上图所示,这是 信息检索(尤其是 RAG)中的查询构建流程,主要分为三个阶段:
1. Multi Representation Indexing (多表示索引)
- 输入:原始文档(Raw Documents)
- 处理:通过 LLM 把文档转化为更适合检索的表示形式,比如:
- Retrieval optimized summary(面向检索优化的摘要)
- 元数据(Metadata)
- 存储:
- Vectorstore:存储语义向量(便于相似度搜索)
- Docstore:存储结构化文本、元数据
👉 这一阶段的目标是:让文档更容易被检索
2. Query Transformation (查询转换)
- 输入:用户的自然语言查询
- 处理:通过 LLM 对用户的查询进行重构,例如:
- 改写问题(Re-phrase query)
- 生成子查询(Sub-queries)
- 把长对话压缩成更清晰的单一查询(Condensed chat to query)
- 输出:Retrieval optimized question(适合检索的查询)
👉 这一阶段的目标是:让查询更容易匹配文档。
3. Query Construction (查询构建)
- 输入:优化后的查询 + 元数据条件
- 处理:LLM 将自然语言转化为数据库可执行的检索语句:
- 添加 Metadata filters(如 “year=2023”、“type=medical”)
- 生成 SQL 或者 其他查询语言(如 Elasticsearch DSL、GraphQL、SPARQL)
- 输出:可直接用于执行的“查询计划”(Question + Filters/SQL)
👉 这一阶段的目标是:把自然语言“翻译”为结构化的检索指令。
1.2 构建查询的基本流程
在构建向量索引时,常常会为文档块(Chunks)附加元数据(Metadata),例如文档来源、发布日期、作者、章节、类别等。这些元数据为我们提供了在语义搜索之外进行精确过滤的可能。
自查询检索器(Self-Query Retriever) 是LangChain中实现这一功能的核心组件。它的工作流程如下:
-
定义元数据结构:首先,需要向LLM清晰地描述文档内容和每个元数据字段的含义及类型。
-
查询解析
:当用户输入一个自然语言查询时,自查询检索器会调用LLM,将查询分解为两部分:
- 查询字符串(Query String):用于进行语义搜索的部分。
- 元数据过滤器(Metadata Filter):从查询中提取出的结构化过滤条件。
-
执行查询:检索器将解析出的查询字符串和元数据过滤器发送给向量数据库,执行一次同时包含语义搜索和元数据过滤的查询。
例如,对于查询“关于2022年发布的机器学习的论文”,自查询检索器会将其解析为:
- 查询字符串: “查找 2023 年发表的关于 Graph Neural Networks 的医学论文”
- 元数据过滤器:
year == 2023
、ORDER BY impact_factor DESC
1.3代码实践
接下来以B站视频为例来看看如何使用SelfQueryRetriever
。完整代码如下
import os
from langchain_deepseek import ChatDeepSeek
from langchain_community.document_loaders import BiliBiliLoader
from langchain.chains.query_constructor.base import AttributeInfo
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain_community.vectorstores import Chroma
from langchain_huggingface import HuggingFaceEmbeddings
import logginglogging.basicConfig(level=logging.INFO)# 1. 初始化视频数据
video_urls = ["https://www.bilibili.com/video/BV1Bo4y1A7FU", "https://www.bilibili.com/video/BV1ug4y157xA","https://www.bilibili.com/video/BV1yh411V7ge",
]bili = []
try:loader = BiliBiliLoader(video_urls=video_urls)docs = loader.load()for doc in docs:original = doc.metadata# 提取基本元数据字段metadata = {'title': original.get('title', '未知标题'),'author': original.get('owner', {}).get('name', '未知作者'),'source': original.get('bvid', '未知ID'),'view_count': original.get('stat', {}).get('view', 0),'length': original.get('duration', 0),}doc.metadata = metadatabili.append(doc)except Exception as e:print(f"加载BiliBili视频失败: {str(e)}")if not bili:print("没有成功加载任何视频,程序退出")exit()# 2. 创建向量存储
embed_model = HuggingFaceEmbeddings(model_name="BAAI/bge-small-zh-v1.5")
vectorstore = Chroma.from_documents(bili, embed_model)# 3. 配置元数据字段信息
metadata_field_info = [AttributeInfo(name="title",description="视频标题(字符串)",type="string", ),AttributeInfo(name="author",description="视频作者(字符串)",type="string",),AttributeInfo(name="view_count",description="视频观看次数(整数)",type="integer",),AttributeInfo(name="length",description="视频长度(整数)",type="integer")
]# 4. 创建自查询检索器
llm = ChatDeepSeek(model="deepseek-chat", temperature=0, api_key=os.getenv("DEEPSEEK_API_KEY"))retriever = SelfQueryRetriever.from_llm(llm=llm,vectorstore=vectorstore,document_contents="记录视频标题、作者、观看次数等信息的视频元数据",metadata_field_info=metadata_field_info,enable_limit=True,verbose=True
)# 5. 执行查询示例
queries = ["时间最短的视频","时长大于600秒的视频"
]for query in queries:print(f"\n--- 查询: '{query}' ---")results = retriever.invoke(query)if results:for doc in results:title = doc.metadata.get('title', '未知标题')author = doc.metadata.get('author', '未知作者')view_count = doc.metadata.get('view_count', '未知')length = doc.metadata.get('length', '未知')print(f"标题: {title}")print(f"作者: {author}")print(f"观看次数: {view_count}")print(f"时长: {length}秒")print("="*50)else:print("未找到匹配的视频")
代码执行后的结果:
--- 查询: '时间最短的视频' ---
INFO:httpx:HTTP Request: POST https://api.deepseek.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:langchain.retrievers.self_query.base:Generated Query: query=' ' filter=None limit=None
标题: 《吴恩达 x OpenAI Prompt课程》【专业翻译,配套代码笔记】03.Prompt如何迭代优化
作者: 二次元的Datawhale
观看次数: 7168
时长: 806秒
==================================================
标题: 《吴恩达 x OpenAI Prompt课程》【专业翻译,配套代码笔记】01.课程介绍
作者: 二次元的Datawhale
观看次数: 45442
时长: 390秒
==================================================
标题: 《吴恩达 x OpenAI Prompt课程》【专业翻译,配套代码笔记】02.Prompt 的构建原则
作者: 二次元的Datawhale
观看次数: 19024
时长: 1063秒
==================================================--- 查询: '时长大于600秒的视频' ---
INFO:httpx:HTTP Request: POST https://api.deepseek.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:langchain.retrievers.self_query.base:Generated Query: query=' ' filter=Comparison(comparator=<Comparator.GT: 'gt'>, attribute='length', value=600) limit=None
标题: 《吴恩达 x OpenAI Prompt课程》【专业翻译,配套代码笔记】03.Prompt如何迭代优化
作者: 二次元的Datawhale
观看次数: 7168
时长: 806秒
==================================================
标题: 《吴恩达 x OpenAI Prompt课程》【专业翻译,配套代码笔记】02.Prompt 的构建原则
作者: 二次元的Datawhale
观看次数: 19024
时长: 1063秒
==================================================
分析:
代码的执行流程如下所示:
- 准备数据与日志
- 定义 B 站视频 URL,开启
logging
.
- 定义 B 站视频 URL,开启
- 加载与清洗文档
- 用
BiliBiliLoader(video_urls=...)
抓取视频文档docs
。 - 逐条扁平化元数据(标题、作者、播放量、时长等),并替换到
doc.metadata
,只保留简单字段(避免嵌套 dict 影响存储/过滤)。 - 若加载失败或为空则退出。
- 用
- 向量化并入库
- 用
HuggingFaceEmbeddings("BAAI/bge-small-zh-v1.5")
生成中文向量。 Chroma.from_documents(bili, embed_model)
建立向量库(同时把清洗过的元数据写入 metadata)。
- 用
- 声明可检索的元数据模式
- 构造
metadata_field_info
(若干AttributeInfo
),描述各字段名、类型、含义:title(str)
,author(str)
,view_count(int)
,length(int)
。
- 构造
- 初始化 LLM 与自查询检索器
- 用
ChatDeepSeek
(读取DEEPSEEK_API_KEY
)作为解析器。 SelfQueryRetriever.from_llm(...)
创建自查询检索器:它会把自然语言问题自动解析为「向量查询 + 元数据过滤 + 可选 limit」的结构化查询计划。
- 用
- 执行查询
- 循环两条查询:
- “时间最短的视频”(期望解析出 limit=1 或与时长相关的约束,但排序不是所有向量库都原生支持,该句在多数实现中更可能转成“取最相关的一条”)。
- “时长大于600秒的视频”(解析为
length > 600
的过滤条件 + 语义向量检索)。
retriever.invoke(query)
返回Document
列表,逐条打印 metadata。
- 循环两条查询:
本环节的关键代码——构建自查询检索器的代码如下所示:
retriever = SelfQueryRetriever.from_llm(llm=llm,vectorstore=vectorstore,document_contents="记录视频标题、作者、观看次数等信息的视频元数据",metadata_field_info=metadata_field_info,enable_limit=True,verbose=True
)
document_contents
:告诉 LLM 文档主体是什么(有助于生成更精准的“向量 query”)。enable_limit=True
:允许 LLM 在解析中设定返回条数。verbose=True
:打印解析出的结构化查询。
二、Text2SQL
在数据世界中,除了向量数据库能够处理的非结构化数据,关系型数据库(如 MySQL, PostgreSQL, SQLite)仍然是存储和管理结构化数据的核心。文本到SQL(Text-to-SQL)正是为了打破人与结构化数据之间的语言障碍而生。它利用大语言模型(LLM)将用户的自然语言问题,直接翻译成可以在数据库上执行的SQL查询语句。
例如,它的目标是:
- 输入:用户的自然语言问题(例如 “查找2020年销售额排名前5的产品”)
- 输出:对应的 SQL 查询语句(例如
SELECT product_name, SUM(sales) FROM sales WHERE year = 2020 GROUP BY product_name ORDER BY SUM(sales) DESC LIMIT 5;
)
这样用户就无需学习 SQL 语法,就能直接与数据库交互。
2.1 Text2SQL具体示例
场景:我们有一个电商数据库,包含以下表:
customers(customer_id, name, city)
orders(order_id, customer_id, order_date, order_amount)
用户问题:
“请给我找出 2021 年订单总金额最高的前 3 个客户的姓名。”
流程解析:
-
语义理解
- 找“客户姓名” →
customers.name
- “订单总金额” →
SUM(orders.order_amount)
- 时间约束 →
YEAR(order_date) = 2021
- 排序规则 →
ORDER BY SUM(...) DESC
- 取前 3 →
LIMIT 3
- 找“客户姓名” →
-
Schema Linking
- “客户姓名” →
customers.name
- “订单” →
orders
- “订单金额” →
orders.order_amount
- “客户与订单的关系” →
customers.customer_id = orders.customer_id
- “客户姓名” →
-
SQL 生成
SELECT c.name, SUM(o.order_amount) AS total_amount FROM customers c JOIN orders o ON c.customer_id = o.customer_id WHERE YEAR(o.order_date) = 2021 GROUP BY c.name ORDER BY total_amount DESC LIMIT 3;
-
执行与结果返回
数据库执行 SQL,返回前 3 个客户的姓名与金额。
2.2 优化策略
- 提供精确的数据库模式 (Schema):这是最基础也是最关键的一步。我们需要向LLM提供数据库中相关表的
CREATE TABLE
语句。这就像是给了LLM一张地图,让它了解数据库的结构,包括表名、列名、数据类型和外键关系。
目标:让模型“只看到必要的真相”,同时尽量降低“无关表/列”带来的幻觉。
策略:
-
最小可用 Schema 包:
-
只注入与当前任务强相关的表(Top-k 表/列裁剪)。
-
每个表提供:
CREATE TABLE
、主外键、索引、行数估计(row_count)、示例行(1~3条,必要时脱敏)。
-
-
跨表关系显式化:
- 列出外键图(Join Graph):
orders.customer_id → customers.customer_id
,并给出推荐连接路径(最短/最强约束路径)。
- 列出外键图(Join Graph):
-
方言与函数白名单:
- 指定数据库方言(MySQL / PostgreSQL / SQLite…),同时提供常用日期/字符串函数速查(如
DATE_TRUNC()
vsDATE_FORMAT()
),减少方言误用。
- 指定数据库方言(MySQL / PostgreSQL / SQLite…),同时提供常用日期/字符串函数速查(如
-
命名规范映射:
- 给出常见别名→真实列的映射(如 “price/amount/cost → orders.order_amount”),但此处只是 schema 补充,不等同 RAG 的同义词库(见下一节)。
-
格式与顺序:
- 以“库级摘要→表级 DDL(数据定义语言)→外键图→函数白名单→示例行”的顺序组织,利于模型逐步定位信息。
-
提供少量高质量的示例 (Few-shot Examples):在提示(Prompt)中加入一些“问题-SQL”的示例对,可以极大地提升LLM生成查询的准确性。这相当于给了LLM几个范例,让它学习如何根据相似的问题构建查询。
目标:用极少但覆盖面好的示例,让模型学到“构造策略”,而非死记硬背。
策略:- 覆盖关键操作的最小基集:筛选(
WHERE
)、投影、排序、聚合(COUNT/AVG/SUM
)、多表JOIN
、GROUP BY
、HAVING
、子查询/CTE、窗口函数(如需要)。 - 一问多说:相同 SQL 意图的多种自然语言表述(口语/书面/含代词/含同义词),提升鲁棒性。
- 方言一致:示例 SQL 必须与目标数据库方言一致。
- 去偶然性:避免依赖具体值的示例(如某个客户名),用参数化/占位符或“可替换值”的数据。
- 可解释的“对齐步骤”:在示例中加入结构化中间标注(而非链路推理文本),如:
intent: aggregation
tables: customers, orders
join_keys: customers.customer_id = orders.customer_id
filters: YEAR(order_date)=2021
- 这样既教会“思路”,又不会暴露长篇思维链。
- 覆盖关键操作的最小基集:筛选(
-
利用RAG增强上下文:这是更进一步的策略。我们可以像RAGFlow一样,为数据库构建一个专门的“知识库”2,其中不仅包含表的DDL(数据定义语言),还可以包含:
- 表和字段的详细描述:用自然语言解释每个表是做什么的,每个字段代表什么业务含义。
- 同义词和业务术语:例如,将用户的“花费”映射到数据库的
cost
字段。 - 复杂的查询示例:提供一些包含
JOIN
、GROUP BY
或子查询的复杂问答对。 当用户提问时,系统首先从这个知识库中检索最相关的信息(如相关的表结构、字段描述、相似的Q&A),然后将这些信息和用户的问题一起组合成一个内容更丰富的提示,交给LLM生成最终的SQL查询。这种方式极大地降低了“幻觉”的风险,提高了查询的准确度。
目标:把“要查什么”和“能怎么查”的知识,在提示阶段就喂给模型,降低幻觉与方言/业务错配。
-
错误修正与反思 (Error Correction and Reflection):在生成SQL后,系统会尝试执行它。如果数据库返回错误,可以将错误信息反馈给LLM,让它“反思”并修正SQL语句,然后重试。这个迭代过程可以显著提高查询的成功率。
目标:把“执行器”纳入回路,用真实反馈修正 SQL。
常见可修正错误类型与策略:- UnknownColumn / UnknownTable:提示最相似列/表(Levenshtein/embedding 最近邻);
- AmbiguousColumn:自动补全前缀;
- TypeError(字符串与数值/日期比较):加上显式转换或改用等价函数;
- Join 爆炸/重复:增加主键去重/
DISTINCT
或移动聚合到子查询; - 方言不兼容:按方言映射表替换函数;
- 性能问题:加
WHERE
选择性、推下LIMIT
、避免跨大表CROSS JOIN
2.3 代码与简单实现
这段代码整体上是一个 简化的 RAG(检索增强生成)知识库系统,用来辅助 Text2SQL,核心思路是:
- 用嵌入模型(BGE-Small)对表结构和 SQL 示例进行向量化;
- 存储在 Milvus 向量数据库中;
- 当用户提问时,先检索最相关的 Schema 和 SQL 示例;
- 再将这些检索结果提供给 LLM,辅助生成更精确的 SQL。
import os
import json
import sqlite3
import numpy as np
from typing import List, Dict, Any
from sentence_transformers import SentenceTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from pymilvus import connections, MilvusClient, FieldSchema, CollectionSchema, DataType, Collectionclass BGESmallEmbeddingFunction:"""BGE-Small中文嵌入函数,用于Text2SQL知识库向量化"""def __init__(self, model_name="BAAI/bge-small-zh-v1.5", device="cpu"):self.model_name = model_nameself.device = deviceself.model = SentenceTransformer(model_name, device=device)self.dense_dim = self.model.get_sentence_embedding_dimension()def encode_text(self, texts):"""编码文本为密集向量"""if isinstance(texts, str):texts = [texts]embeddings = self.model.encode(texts,normalize_embeddings=True,batch_size=16,convert_to_numpy=True)return embeddings@propertydef dim(self):"""返回向量维度"""return self.dense_dim
封装了 BGE-small-zh 模型,负责 将文本转化为稠密向量表示。
self.model.encode(texts, normalize_embeddings=True)
—— 生成标准化后的向量,用于语义相似度计算。self.dense_dim
—— 获取嵌入维度,用于 Milvus 建表时指定向量维度。
class SimpleKnowledgeBase:
def __init__(self, milvus_uri: str = "http://localhost:19530"):self.milvus_uri = milvus_uriself.collection_name = "text2sql_knowledge_base"self.milvus_client = Noneself.collection = Noneself.embedding_function = BGESmallEmbeddingFunction(model_name="BAAI/bge-small-zh-v1.5",device="cpu")self.sql_examples = []self.table_schemas = []self.data_loaded = False
作用:初始化向量模型,定义知识库的存储结构。
default_examples = [{"question": "查询所有用户信息", "sql": "SELECT * FROM users", "description": "获取用户记录"},{"question": "按城市统计用户", "sql": "SELECT city, COUNT(*) FROM users GROUP BY city", "description": "城市分组"}
]
作用:定义一些 Few-shot 示例,帮助 LLM 学习 SQL 生成方式。
default_schemas = [{"table_name": "users", "description": "用户信息表", "columns": [{"name": "id", "type": "INTEGER", "description": "用户ID"},{"name": "name", "type": "VARCHAR", "description": "用户姓名"}]},{"table_name": "orders", "description": "订单信息表", ...}
]
作用:提供数据库表的 Schema,供模型参考。
def vectorize_and_store(self):"""向量化数据并存储到Milvus"""self.create_collection()all_texts = []all_metadata = []for example in self.sql_examples:text = f"问题: {example['question']} SQL: {example['sql']} 描述: {example.get('description', '')}"all_texts.append(text)all_metadata.append({"content_type": "sql_example","question": example['question'],"sql": example['sql'],"description": example.get('description', ''),"table_name": ""})for schema in self.table_schemas:columns_desc = ", ".join([f"{col['name']} ({col['type']}): {col.get('description', '')}" for col in schema['columns']])text = f"表 {schema['table_name']}: {schema['description']} 字段: {columns_desc}"all_texts.append(text)all_metadata.append({"content_type": "table_schema","question": "","sql": "","description": schema['description'],"table_name": schema['table_name']})embeddings = self.embedding_function.encode_text(all_texts)insert_data = []for i, (embedding, metadata) in enumerate(zip(embeddings, all_metadata)):insert_data.append([metadata["content_type"],metadata["question"],metadata["sql"],metadata["description"],metadata["table_name"],embedding.tolist()])self.collection.insert(insert_data)self.collection.flush()self.collection.load()
把 SQL 示例和表结构 转化为嵌入向量,并存入 Milvus。这也就是RAG 的关键 —— 把知识转化为向量,方便检索。
def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:"""搜索相关的知识库信息"""if not self.data_loaded:self.load_data()query_embedding = self.embedding_function.encode_text([query])[0]search_params = {"metric_type": "IP", "params": {}}results = self.collection.search([query_embedding.tolist()],anns_field="embedding",param=search_params,limit=top_k,output_fields=["content_type", "question", "sql", "description", "table_name"])[0]formatted_results = []for hit in results:result = {"score": float(hit.distance),"content_type": hit.entity.get("content_type"),"question": hit.entity.get("question"),"sql": hit.entity.get("sql"),"description": hit.entity.get("description"),"table_name": hit.entity.get("table_name")}formatted_results.append(result)return formatted_results
给定用户问题,计算相似度,在知识库中找到最相关的 SQL 示例 / Schema。
def _fallback_search(self, query: str, top_k: int) -> List[Dict[str, Any]]:"""降级搜索方法(简单文本匹配)"""results = []query_lower = query.lower()for example in self.sql_examples:question_lower = example['question'].lower()sql_lower = example['sql'].lower()score = 0for word in query_lower.split():if word in question_lower:score += 2if word in sql_lower:score += 1if score > 0:results.append({"score": score,"content_type": "sql_example","question": example['question'],"sql": example['sql'],"description": example.get('description', ''),"table_name": ""})results.sort(key=lambda x: x['score'], reverse=True)return results[:top_k]
当 Milvus 检索失败时,用简单的字符串匹配做 backup。
def add_sql_example(self, question, sql, description=""):self.sql_examples.append(...)embedding = self.embedding_function.encode_text([f"问题: {question} SQL: {sql}"])[0]self.collection.insert([...])
支持动态扩展知识库。
工作流程总结:
- 知识准备
- 收集 SQL 示例和数据库 Schema。
- 用 BGE 模型向量化,存入 Milvus。
- 用户提问
- 输入自然语言问题,例如:「查询年龄大于30的用户」。
- 向量检索
- 对问题做向量化,去 Milvus 检索最相似的 SQL 示例和 Schema。
- 结果组合(Prompt 构造)
- 检索出的 Schema(表结构)、示例 SQL 一起作为提示,交给 LLM。
- SQL 生成
- LLM 生成候选 SQL。
- 执行 + 错误修正
- 执行 SQL,如果报错,将错误信息返回给 LLM,要求修正。
输出结果:
向量维度: (2, 512)问题 1: 查询所有用户
----------------------------------------
SQL: SELECT * FROM users
返回 3 行数据1. (1, '张三', 25, '北京')2. (2, '李四', 32, '上海')... 还有 1 行问题 2: 年龄大于30的用户
----------------------------------------
SQL: SELECT * FROM users WHERE age > 30
返回 2 行数据1. (2, '李四', 32, '上海')2. (3, '王五', 35, '深圳')问题 3: 统计用户总数
----------------------------------------
SQL: SELECT COUNT(*) FROM users
返回 1 行数据1. (3,)
三、查询重构与分发
用户的原始问题往往不是最优的检索输入。它可能过于复杂、包含歧义,或者与文档的实际措辞存在偏差。为了解决这些问题,我们需要在检索之前对用户的查询进行“预处理”,这就是本节要探讨的查询重构与分发。
查询翻译产出的是更好的查询;查询路由决定把哪个查询送到哪个检索器/数据源(例如:结构化条件→SQL;定义型问题→百科索引;代码示例→代码片段库;长文本语义→向量库)。两者常配合:先翻译出多种形式,再用路由器按规则/小模型打分分发,最后统一融合结果。
3.1 查询翻译
查询翻译就是把用户的原始提问改写成更利于检索系统理解与匹配的一个或多个“检索友好型查询”(searchable queries)。这些改写会解决歧义、补全上下文、贴合语料措辞或索引特性(向量/关键词/结构化 SQL 等)。
3.1.1 提示工程
这是最直接的查询重构方法。通过精心设计的提示词(Prompt),可以引导 LLM 将用户的原始查询改写得更清晰、更具体,或者转换成一种更利于检索的叙述风格。
提示工程用稳定的提示模板,让 LLM 将口语化/含糊的原问改写为结构清晰、字段齐备、面向检索的查询文本(可多版本:关键词式、向量式、SQL 条件式)。
模板要素:
- 明确任务:“将用户问题改写为面向检索的短查询,保留核心名词与约束条件。”
- 约束输出:长度上限、必须包含/禁止包含的词、时间/地域格式化、实体标准化(同义词折叠)。
- 多目标生成:同时产出
keyword_query
、vector_query_summary
、sql_predicates
。
模板示例:
# 使用大模型将自然语言转换为排序指令
prompt = f"""你是一个智能助手,请将用户的问题转换成一个用于排序视频的JSON指令。你需要识别用户想要排序的字段和排序方向。
- 排序字段必须是 'view_count' (观看次数) 或 'length' (时长) 之一。
- 排序方向必须是 'asc' (升序) 或 'desc' (降序) 之一。例如:
- '时间最短的视频' 或 '哪个视频时间最短' 应转换为 {{"sort_by": "length", "order": "asc"}}
- '播放量最高的视频' 或 '哪个视频最火' 应转换为 {{"sort_by": "view_count", "order": "desc"}}请根据以下问题生成JSON指令:
原始问题: "{query}"JSON指令:"""
完整代码如下所示:
import os
from langchain_deepseek import ChatDeepSeek
from langchain_community.document_loaders import BiliBiliLoader
from langchain.chains.query_constructor.base import AttributeInfo
from openai import OpenAI
from langchain_community.vectorstores import Chroma
from langchain_huggingface import HuggingFaceEmbeddings
import logginglogging.basicConfig(level=logging.INFO)# 1. 初始化视频数据
video_urls = ["https://www.bilibili.com/video/BV1Bo4y1A7FU", "https://www.bilibili.com/video/BV1ug4y157xA","https://www.bilibili.com/video/BV1yh411V7ge",
]bili = []
try:loader = BiliBiliLoader(video_urls=video_urls)docs = loader.load()for doc in docs:original = doc.metadata# 提取基本元数据字段metadata = {'title': original.get('title', '未知标题'),'author': original.get('owner', {}).get('name', '未知作者'),'source': original.get('bvid', '未知ID'),'view_count': original.get('stat', {}).get('view', 0),'length': original.get('duration', 0),}doc.metadata = metadatabili.append(doc)except Exception as e:print(f"加载BiliBili视频失败: {str(e)}")if not bili:print("没有成功加载任何视频,程序退出")exit()# 2. 创建向量存储
embed_model = HuggingFaceEmbeddings(model_name="BAAI/bge-small-zh-v1.5")
vectorstore = Chroma.from_documents(bili, embed_model)# 3. 配置元数据字段信息
metadata_field_info = [AttributeInfo(name="title",description="视频标题(字符串)",type="string", ),AttributeInfo(name="author",description="视频作者(字符串)",type="string",),AttributeInfo(name="view_count",description="视频观看次数(整数)",type="integer",),AttributeInfo(name="length",description="视频长度(整数)",type="integer")
]# 4. 初始化LLM客户端
client = OpenAI(base_url="https://api.deepseek.com",api_key=os.getenv("DEEPSEEK_API_KEY")
)# 5. 获取所有文档用于排序
all_documents = vectorstore.similarity_search("", k=len(bili)) # 6. 执行查询示例
queries = ["时间最短的视频","播放量最高的视频"
]for query in queries:print(f"\n--- 原始查询: '{query}' ---")# 使用大模型将自然语言转换为排序指令prompt = f"""你是一个智能助手,请将用户的问题转换成一个用于排序视频的JSON指令。你需要识别用户想要排序的字段和排序方向。
- 排序字段必须是 'view_count' (观看次数) 或 'length' (时长) 之一。
- 排序方向必须是 'asc' (升序) 或 'desc' (降序) 之一。例如:
- '时间最短的视频' 或 '哪个视频时间最短' 应转换为 {{"sort_by": "length", "order": "asc"}}
- '播放量最高的视频' 或 '哪个视频最火' 应转换为 {{"sort_by": "view_count", "order": "desc"}}请根据以下问题生成JSON指令:
原始问题: "{query}"JSON指令:"""response = client.chat.completions.create(model="deepseek-chat",messages=[{"role": "user", "content": prompt}],temperature=0,response_format={"type": "json_object"})try:import jsoninstruction_str = response.choices[0].message.contentinstruction = json.loads(instruction_str)print(f"--- 生成的排序指令: {instruction} ---")sort_by = instruction.get('sort_by')order = instruction.get('order')if sort_by in ['length', 'view_count'] and order in ['asc', 'desc']:# 在代码中执行排序reverse_order = (order == 'desc')sorted_docs = sorted(all_documents, key=lambda doc: doc.metadata.get(sort_by, 0), reverse=reverse_order)# 获取排序后的第一个结果if sorted_docs:doc = sorted_docs[0]title = doc.metadata.get('title', '未知标题')author = doc.metadata.get('author', '未知作者')view_count = doc.metadata.get('view_count', '未知')length = doc.metadata.get('length', '未知')print(f"标题: {title}")print(f"作者: {author}")print(f"观看次数: {view_count}")print(f"时长: {length}秒")print("="*50)else:print("没有找到任何视频")else:print("生成的指令无效,无法执行排序")except (json.JSONDecodeError, KeyError) as e:print(f"解析或执行指令失败: {e}")
结果:
--- 原始查询: '时间最短的视频' ---
INFO:httpx:HTTP Request: POST https://api.deepseek.com/chat/completions "HTTP/1.1 200 OK"
--- 生成的排序指令: {'sort_by': 'length', 'order': 'asc'} ---
标题: 《吴恩达 x OpenAI Prompt课程》【专业翻译,配套代码笔记】01.课程介绍
作者: 二次元的Datawhale
观看次数: 45458
时长: 390秒
==================================================--- 原始查询: '播放量最高的视频' ---
INFO:httpx:HTTP Request: POST https://api.deepseek.com/chat/completions "HTTP/1.1 200 OK"
--- 生成的排序指令: {'sort_by': 'view_count', 'order': 'desc'} ---
标题: 《吴恩达 x OpenAI Prompt课程》【专业翻译,配套代码笔记】01.课程介绍
作者: 二次元的Datawhale
观看次数: 45458
时长: 390秒
==================================================
3.1.2 多查询分解
当用户提出一个复杂的问题时,直接用整个问题去检索可能效果不佳,因为它可能包含多个子主题或意图。分解技术的核心思想是将这个复杂问题拆分成多个更简单、更具体的子问题。然后,系统分别对每个子问题进行检索,最后将所有检索到的结果合并、去重,形成一个更全面的上下文,再交给 LLM 生成最终答案。
两种典型形态
- 分解型(Decomposition):将复合任务拆为若干原子子问题(实体定义、条件约束、子目标)。
- 扩展型(Expansion):围绕同一意图生成多种等价问法/同义词组合,覆盖语料措辞差异。
操作流程
- 用 LLM 识别意图槽位(实体、关系、时间、指标、场景)。
- 产出 N 个子查询/改写(可设置 diversity 提示:鼓励不同词汇/句式)。
- 对候选查询去重与门控(用向量相似度>τ 则去重;过滤过短/过长;黑名单词)。
- 发到多个检索器(BM25/向量/正则/SQL),并行召回。
- 用 RRF(Reciprocal Rank Fusion)/Borda/学习排序融合,取 Top-K。
- 可选:将 Top 文档做交叉编码器重排(如 bge-reranker、co-condenser 等)。
示例:
- 原始问题:“在《流浪地球》中,刘慈欣对人工智能和未来社会结构有何看法?”
- 分解后的子问题:
- “《流浪地球》中描述的人工智能技术有哪些?”
- “《流浪地球》中描绘的未来社会是怎样的?”
- “刘慈欣关于人工智能的观点是什么?”
3.1.3 退步提示(Step-Back Prompting)
退步提示是让模型先“后退半步”,把问题提升到更抽象、更本质的表述(principle/higher-level question),再由此生成具体检索查询。常用于表述很“表层”但语料以“概念/原理/定义”出现的场景。
操作流程
- 生成高层问题:请模型把原问概括为 1~2 个“更一般”的问题或原则表述。
- 再下钻:由高层问题反推 1~3 条可检索的具体查询(关键词/向量摘要)。
- 混合检索:高层与下钻查询共同检索,融合去偏。
示例
图中展示了两个例子(理想气体问题 & Estella Leopold 的教育经历),对比了 Chain-of-Thought 和 Step-Back Prompting 的表现。这两个原始问题分别是:
- 例 1(物理问题):气体温度增加 2 倍,体积增加 8 倍,压力会怎样变化?
- 例 2(教育经历问题):Estella Leopold 在 1954 年 8 月到 11 月之间就读于哪所学校?
Chain-of-Thought(直接推理,左侧)是让模型直接一步步算/推理。但它出现了一些错误:在例 1 中推理链条出错,算出结果“压力减少 16 倍”而非正确的“4 倍”;在例 2 中模型误读时间与地点信息,给出错误学校。这表示Chain-of-Thought 容易被局部细节带偏,犯逻辑错误或事实性错误。
Step-Back Prompting(右侧)分成两个步骤:
- Step 1:抽象(Abstraction)
- 把原始问题上升到一个更普遍、更高层次的问题。
- 例 1:问“这个问题背后的物理原理是什么?” → 得到“理想气体定律”。
- 例 2:问“她的教育经历是什么?” → 得到完整的教育背景列表(本科、硕士、博士)。
- Step 2:推理(Reasoning)
- 基于抽象问题的答案(即背景知识/原理),再回到原始问题进行解答。
- 例 1:用理想气体定律 PV=nRTPV = nRTPV=nRT → 正确推导压力减少 4 倍。
- 例 2:从教育经历中锁定时间段 → 正确判断她在耶鲁大学博士阶段,回答“她当时在 Yale University”。
最终,Step-Back Prompting 避免了原始 CoT 的错误,得出了正确答案。
3.1.4 假设性文档嵌入(HyDE)
让模型先生成一段“假设性答案文档”(看起来像真正的答案摘要,但不承诺其正确),然后对这段文档做向量嵌入,用它去查相似文档。直觉:答案的嵌入往往比“问题的嵌入”更接近真实答案所在文本。
假设性文档嵌入(Hypothetical Document Embeddings, HyDE)是一种无需微调即可显著提升向量检索质量的查询改写技术,由 Luyu Gao 等人在其论文中首次提出3。其核心是解决一个普遍存在于检索任务中的难题:用户的查询(Query)通常简短、关键词有限,而数据库中存储的文档则内容详实、上下文丰富,两者在语义向量空间中可能存在“鸿沟”,导致直接用查询向量进行搜索效果不佳。Zilliz 的一篇技术博客4也对该技术进行了深入浅出的解读。
操作流程(标准 HyDE Pipeline)
- 生成假设文档:
- 体裁:与你语料风格一致(技术摘要、条例条目、FAQ 口吻等)。
- 长度:一般 80~200 字(过短信息不足,过长稀释语义)。
- 约束:中立、可错但不妄断(避免硬性数值与来源声明)。
- 向量化:对假设文档做 embedding(与语料同模型/同维度)。
- 向量检索:以该向量在向量库检索 Top-K。
- 重排与验证:用交叉编码器重排;把真实文档交给生成模型回答并做自洽/引用校验。
1.输入(Instruction + Query)
用户提出问题,例如:
- “How long does it take to remove wisdom tooth?”
- “How has the COVID-19 pandemic impacted mental health?”
- 韩语问题:“ 인간은 언제 불을 사용했는가?”
这些问题可能措辞不同、语言不同,甚至是开放式问题。
2.HyDE 生成假设性文档(Generated Document)
GPT 接到问题后,不是直接回答用户,而是写一段“看似合理的回答”。
- 例 1:生成 “It usually takes between 30 minutes and two hours to remove a wisdom tooth …”
- 例 2:生成 “… depression and anxiety had increased by 20% since the start of the pandemic …”
- 例 3:生成韩语段落 “… 불을 사용한 기록은 약 80만 년 전부터 나타나…”
这些文档 不一定完全正确,但包含了与答案相关的核心词汇、上下文、语义。
3.向量化 & 检索(Contriever 阶段)**
把生成的假设文档送入向量模型(如 Contriever),得到嵌入向量。
➡️ 然后在向量数据库里检索,找到 最相似的真实文档。
4.返回真实文档(Real Document)
最终系统返回语料库中真正存在的文档片段:
- 例 1:真实文档说明“拔智齿时间可能几分钟到 20 分钟以上”。
- 例 2:真实研究表明“COVID-19 患者中抑郁水平显著更高”。
- 例 3:真实考古学研究指出“人类使用火大约在 142 万年前”。
这些才是最终作为证据的检索结果。
HyDE 的原理是先由 LLM 生成一篇“假设性回答文档”,再将该文档向量化用于检索。这样比直接用原始问题更贴近真实答案所在的文档,从而提升召回率和检索效果。
3.2 查询路由
查询路由(Query Routing) 是用于优化复杂 RAG 系统的一项关键技术。当系统接入了多个不同的数据源或具备多种处理能力时,就需要一个“智能调度中心”来分析用户的查询,并动态选择最合适的处理路径。其本质是替代硬编码规则,通过语义理解将查询分发至最匹配的数据源、处理组件或提示模板,从而提升系统的效率与答案的准确性。
在多数据源 / 多检索器 / 多提示模板的 RAG 体系里,查询路由就是一个“智能调度中心(mixture-of-experts 的门控/Gating)”。它读取用户查询,判定意图与约束,再把请求分发到最合适的:
- 数据源(向量库、BM25 索引、SQL/OLAP、时序库、网页检索……)
- 检索器或工具(reranker、代码搜索、表格问答、数学求解器……)
- 提示模板与查询翻译策略(单查询/多查询、退步提示、HyDE 等)
目标:在 质量(可证答案率、准确度)× 成本(时延、token) 上最优。
查询路由的应用场景十分广泛。
- 数据源路由:这是最常见的场景。根据查询意图,将其路由到不同的知识库。例如:
- 查询“最新的 iPhone 有什么功能?” -> 路由到产品文档向量数据库。
- 查询“我上次订购了什么?” -> 路由到用户历史SQL数据库(执行Text-to-SQL)。
- 查询“A公司和B公司的投资关系是怎样的?” -> 路由到企业知识图谱数据库。
- 组件路由:根据问题的复杂性,将其分配给不同的处理组件,以平衡成本和效果。
- 简单FAQ → 直接进行向量检索,速度快、成本低。
- 复杂操作或需要与外部API交互 → 调用 Agent 来执行任务。
- 提示模板路由:为不同类型的任务动态选择最优的提示词模板,以优化生成效果。
- 数学问题 → 选用包含分步思考(Step-by-Step)逻辑的提示模板。
- 代码生成 → 选用专门为代码优化过的提示模板。
3.2.1 基于 LLM 的意图识别
这个方法的本质是:用 LLM 对输入问题进行分类 → 把分类结果作为意图标签 → 用意图标签驱动路由分支 → 最终选择最合适的处理链。
基于 LLM 的意图识别的核心思想是:
- 让大语言模型直接对用户问题进行分类,得到一个结构化的标签(例如 “川菜”、“粤菜”、“其他”)。
- 然后根据这个标签,决定要把问题交给哪个下游的处理链(retriever、工具、数据源、提示模板)。
换句话说:
LLM 在这里充当了一个 智能分类器(代替传统的关键词匹配 / 规则硬编码),利用语义理解能力来做更精准的任务/领域识别 → 再交由路由器分发。
代码
import os
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_deepseek import ChatDeepSeek
from langchain_core.runnables import RunnableBranchllm = ChatDeepSeek(model="deepseek-chat", temperature=0, api_key=os.getenv("DEEPSEEK_API_KEY"))# 1. 设置不同菜系的处理链
sichuan_prompt = ChatPromptTemplate.from_template("你是一位川菜大厨。请用正宗的川菜做法,回答关于「{question}」的问题。"
)
sichuan_chain = sichuan_prompt | llm | StrOutputParser()cantonese_prompt = ChatPromptTemplate.from_template("你是一位粤菜大厨。请用经典的粤菜做法,回答关于「{question}」的问题。"
)
cantonese_chain = cantonese_prompt | llm | StrOutputParser()# 定义备用通用链
general_prompt = ChatPromptTemplate.from_template("你是一个美食助手。请回答关于「{question}」的问题。"
)
general_chain = general_prompt | llm | StrOutputParser()# 2. 创建路由链
classifier_prompt = ChatPromptTemplate.from_template("""根据用户问题中提到的菜品,将其分类为:['川菜', '粤菜', 或 '其他']。不要解释你的理由,只返回一个单词的分类结果。问题: {question}"""
)
classifier_chain = classifier_prompt | llm | StrOutputParser()# 定义路由分支
router_branch = RunnableBranch((lambda x: "川菜" in x["topic"], sichuan_chain),(lambda x: "粤菜" in x["topic"], cantonese_chain),general_chain # 默认选项
)# 组合成完整路由链
full_router_chain = {"topic": classifier_chain, "question": lambda x: x["question"]} | router_branch
print("完整的路由链创建成功。\n")# 3. 运行演示查询
demo_questions = [{"question": "麻婆豆腐怎么做?"}, # 应该路由到川菜{"question": "白切鸡的正宗做法是什么?"}, # 应该路由到粤菜{"question": "番茄炒蛋需要放糖吗?"} # 应该路由到其他
]for i, item in enumerate(demo_questions, 1):question = item["question"]print(f"\n--- 问题 {i}: {question} ---")try:# 获取路由决策topic = classifier_chain.invoke({"question": question})print(f"路由决策: {topic}")# 执行完整链result = full_router_chain.invoke(item)print(f"回答: {result}")except Exception as e:print(f"执行错误: {e}")
结果(部分):
--- 问题 1: 麻婆豆腐怎么做? ---
路由决策: 川菜
回答: 好的,老师傅来啰!麻婆豆腐是川菜的看家菜,讲究的是「麻、辣、烫、鲜、
香、酥、活」七大境界。缺一样,都不是那个味儿。今天,我就把灶头上的硬功夫,给你摆一摆。---### **正宗麻婆豆腐(陈麻婆豆腐)**这道菜的灵魂不在于步骤多复杂,而在于对材料和火候的极致讲究。#### **一、核心食材(选对料,就成功了一大半)**1. **豆腐**:首选**石膏豆腐(南豆腐)**。质地细嫩,能充分吸收味道。韧豆腐 (北豆腐)也可,但口感稍逊。切成一厘米五见方的小块,用淡盐水浸泡备用(可去 豆腥且让豆腐不易碎)。
2. **肉末**:必须是**牛肉末**。传统用法是黄牛肉,炒出来的“酥”才香。用猪肉 是妥协,风味差一截。
3. **郫县豆瓣酱**:菜的灵魂!要选陈年**鹃城牌**或**丹丹牌**的红油豆瓣,颜 色暗红,香味醇厚。用时需用刀剁得极细,才能出色出味。
4. **花椒面**:必须是**现焙现舂的汉源大红袍花椒**。市面上的花椒粉香气跑了 大半。干锅小火焙香花椒,用擀面杖碾碎成面,那个麻香才是顶级的。
5. **蒜苗**:也叫青蒜。取蒜苗头(蒜白部分),用刀斜切成“马耳朵”形,最后撒 上,是“活”的点睛之笔。
6. **调料**:豆豉(剁碎)、姜末、蒜末、酱油、盐、水淀粉(红薯淀粉最佳)。
7. **辣椒面**:提辣增色,选用二荆条或秦椒面,香而不燥。#### **二、烹饪步骤(火候是精髓)**1. **煵肉酥(读 nàn)**:* 热锅冷油(菜籽油更香),下牛肉末,中火煸炒。* 将水分完全炒干,炒至牛肉末变得金黄酥香,吐油。这一步是“酥”的关键, 千万不能急。2. **炒臊子(炒酱料)**:* 将肉末拨到一边,留底油,转为中小火。* 下入剁细的郫县豆瓣酱,慢炒至油色红亮、香味四溢(约需1-2分钟)。 * 接着下剁碎的豆豉、姜末、蒜末,与豆瓣酱一起炒出复合香味。* 注意火候,绝不能炒焦。3. **烧豆腐**:* 沿着锅边烹入少许酱油,激出酱香。* 立刻倒入适量的**开水或肉汤**(水量约能没过豆腐的一半)。* 轻轻滑入沥干水分的豆腐块,用勺子背轻轻推匀(切忌用锅铲乱搅,豆腐会 碎)。* 加入少许盐和糖(糖是提鲜,吃不出甜味)调味。* 中火烧制,让味道慢慢烧进豆腐里。期间用勺子背轻轻推几次,防止粘锅。 4. **勾芡(三次勾芡法)**:* 这是让汤汁“包”住豆腐,达到“烫”和“鲜”的关键。* 第一次勾芡:汤汁烧掉一半时,淋入三分之一水淀粉,轻轻推动,让芡汁糊 化。* 第二次勾芡:汤汁变浓稠后,再淋入三分之一,继续推动。* 第三次勾芡:出锅前,淋入最后的水淀粉,大火收汁至红油亮汁紧紧包裹每 一块豆腐。* 撒入“马耳朵”蒜苗,略推一下即可。5. **出锅装盘与点睛**:* 将豆腐盛入碗中,立刻在正中央厚厚的撒上那一把**现舂的花椒面**。 * 最后再淋上一勺滚烫的**红油**,“刺啦”一声,彻底激发出花椒的麻香和辣 椒的辣香。#### **三、老师傅的秘诀*** **麻辣顺序**:一定是“辣在先,麻在后”。麻辣味来自炒制的豆瓣酱和辣椒面, 而麻味主要靠最后撒的花椒面,顺序不能错。
* **烫**:要用深碗盛装,保温性好。豆腐内部极其烫口,才是正宗。
* **活**:就是指绿色的蒜苗在红油中栩栩如生,不仅好看,更带来清新的口感, 解腻增香。
* **不勾芡的麻婆豆腐就是耍流氓**:汤汁清汤寡水,味道根本挂不到豆腐上。 好了,这道“陈麻婆豆腐”就算成了。你按这个方子做,保证麻辣鲜香烫酥活,巴适得 板!慢用,缺啥子调料再问我!
3.2.2嵌入相似性路由(Embedding Similarity Routing)
为每个数据源/工具/模板建立一个或多个原型向量(Prototypes),把它们放进“路由索引”。用户查询取嵌入后,与这些原型计算相似度(通常余弦相似度),选出最匹配的路由。
score(q,rj)=cos(E(q)∥E(q)∥,Cj∥Cj∥)\operatorname{score}(q, r_j) = \cos\left( \frac{E(q)}{\| E(q) \|}, \frac{C_j}{\| C_j \|} \right) score(q,rj)=cos(∥E(q)∥E(q),∥Cj∥Cj)
其中E(q)E(q)E(q)是查询嵌入,CjC_jCj是第jjj各路由原型向量。
第一步:定义路由描述并向量化
为每个路由创建一个详细的文本描述,并使用嵌入模型将其转换为向量,供后续相似度计算使用。
第二步:定义目标链
创建路由最终要分发到的目标处理链,并用一个字典 route_map
将路由名称和链对应起来。
第三步:定义路由函数
定义一个 route
函数,接收用户问题,计算与各路由描述的相似度,选择最相似的路由并调用相应的处理链。
第四步:组合并调用
最后,将 route
函数包装成一个 RunnableLambda
,形成一个完整的、可执行的路由链。
完整代码如下
import os
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_deepseek import ChatDeepSeek
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_core.runnables import RunnableLambda, RunnablePassthrough, RunnablePassthrough
from langchain_community.utils.math import cosine_similarity
import numpy as np# 1. 定义路由描述
sichuan_route_prompt = "你是一位处理川菜的专家。用户的问题是关于麻辣、辛香、重口味的菜肴,例如水煮鱼、麻婆豆腐、鱼香肉丝、宫保鸡丁、花椒、海椒等。"
cantonese_route_prompt = "你是一位处理粤菜的专家。用户的问题是关于清淡、鲜美、原汁原味的菜肴,例如白切鸡、老火靓汤、虾饺、云吞面等。"route_prompts = [sichuan_route_prompt, cantonese_route_prompt]
route_names = ["川菜", "粤菜"]# 初始化嵌入模型,并对路由描述进行向量化
embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-small-zh-v1.5")
route_prompt_embeddings = embeddings.embed_documents(route_prompts)
print(f"已定义 {len(route_names)} 个路由: {', '.join(route_names)}")# 2. 定义不同路由的目标链
llm = ChatDeepSeek(model="deepseek-chat", temperature=0, api_key=os.getenv("DEEPSEEK_API_KEY"))# 定义川菜和粤菜处理链
sichuan_chain = (PromptTemplate.from_template("你是一位川菜大厨。请用正宗的川菜做法,回答关于「{query}」的问题。")| llm| StrOutputParser()
)
cantonese_chain = (PromptTemplate.from_template("你是一位粤菜大厨。请用经典的粤菜做法,回答关于「{query}」的问题。")| llm| StrOutputParser()
)route_map = { "川菜": sichuan_chain, "粤菜": cantonese_chain }
print("川菜和粤菜的处理链创建成功。\n")# 3. 创建路由函数
def route(info):# 对用户查询进行嵌入query_embedding = embeddings.embed_query(info["query"])# 计算与各路由提示的余弦相似度similarity_scores = cosine_similarity([query_embedding], route_prompt_embeddings)[0]# 找到最相似的路由chosen_route_index = np.argmax(similarity_scores)chosen_route_name = route_names[chosen_route_index]print(f"路由决策: 检测到问题与“{chosen_route_name}”最相似。")# 获取对应的处理链chosen_chain = route_map[chosen_route_name]# 直接调用选中的链并返回结果return chosen_chain.invoke(info)# 创建完整的路由链
full_chain = RunnableLambda(route)# 4. 运行演示查询
demo_queries = ["水煮鱼怎么做才嫩?", # 应该路由到川菜"如何做一碗清淡的云吞面?", # 应该路由到粤菜"麻婆豆腐的核心调料是什么?", # 应该路由到川菜
]for i, query in enumerate(demo_queries, 1):print(f"\n--- 问题 {i}: {query} ---")try:# 传入字典,full_chain 会直接返回最终答案result = full_chain.invoke({"query": query})print(f"回答: {result}")except Exception as e:print(f"执行错误: {e}")
3.2.3 两种 Trick 的协同与仲裁
-
双通道投票:
FinalScorej=α⋅EmbedScorej+(1−α)⋅LLMScorejFinalScore_j=\alpha⋅EmbedScore_j+(1−\alpha)⋅LLMScore_j FinalScorej=α⋅EmbedScorej+(1−α)⋅LLMScorej
其中LLMScore
可由confidence
或离线校准后的概率给出。 -
规则优先级:
- 高敏/高合规域:LLM 置信度不足 → 并发 + 更强证据门槛;
- 极低相似度:直接拒识,回退到“通用向量库 + 询问澄清”。
-
成本驱动:在同等质量预期下,优先选择成本更低的路由。
参考文献
1.第四章检索优化