金融监管制度问答助手项目学习笔记(二)----RAG和评估
1、RAG流程
查询改写 → 多路检索(语义 + BM25)→ 重排序 → 上下文拼接 → Prompt 构造 → 生成回答
🧩 一、函数总体作用
def generation_with_knowledge_retrieval_local(query_str_input,category,retriever,bm25_retriever,storage_context,reranker: BaseNodePostprocessor | None = None,
) -> CompletionResponse:
📘 功能:
根据用户问题
query_str_input,结合语义检索和 BM25 检索,从知识库中找出相关内容,经过重排序与过滤后,构造 Prompt,最后调用语言模型生成回答。
📥 参数解释:
| 参数 | 类型 | 含义 |
|---|---|---|
query_str_input | str | 用户输入的问题 |
category | str | 如选择题,问答题 |
retriever | 向量检索器(如 QdrantRetriever) | 用于语义检索(基于向量相似度) |
bm25_retriever | BM25Retriever | 用于关键词检索(基于倒排索引) |
storage_context | StorageContext | 存储层,用于从 docstore 获取节点文本 |
reranker | BaseNodePostprocessor | 可选,用于结果重排序(如 cross-encoder reranker) |
返回:
一个元组
(response, context_str_list)
response: 模型生成的回答
context_str_list: 参与生成的知识块(上下文)
🧠 二、查询改写(Query Rewrite)
使用Qwen3-8B来进行问题改写,提示词如下
提示词:
QUERY_REWRITE_PROMPT = """##角色定义##
你是一个金融监管制度智能问答搜索Query改写助手。##指导规则##
1、你需要结合国家银监会、人民银行、外汇局发布的金融监管制度对用户问题Query进行泛化改写1-6条子Query。
2、你生成的子Query有助于用户去知识库中检索到相关、全面、延展的知识来回答用户问题,多条子Query之间应该彼此独立不重复,保证生成的Query数量最少生成原则。
3、你只需要输出改写后的多条子Query,多条子Query之间用[next]分割,也不要输出其他分析内容,下面是输出示例:
银行业金融机构申请信息安全管理体系认证应向哪个监管机构提交申请?[next]申请信息安全管理体系认证的机构应向哪个部门提出申请?[next]...##输入##
用户问题Query:
{query_str}"""
QUERY_REWRITE_PROMPT_input = QUERY_REWRITE_PROMPT.format(query_str=query_str_input)
rewrite_query_str = model_generate_nothink(QUERY_REWRITE_PROMPT_input)
-
使用一个「问题改写 prompt 模板」来调用模型;
-
目的:让模型生成多条语义等价的查询(比如同义问法),增强召回。
例如:
输入问题:
"今年中国银行的净利润是多少?"
可能生成:
"今年中国银行净利润是多少?[next]中国银行2024年盈利情况[next]中国银行的财务表现如何?"
🧩 三、解析多路查询(Query Expansion)
rewrite_split = rewrite_query_str.strip().split("[next]")
把模型输出按 [next] 切分成多条子查询。
如上例结果:
rewrite_split = ["今年中国银行净利润是多少?","中国银行2024年盈利情况","中国银行的财务表现如何?"
]
这相当于多路检索(Multi-query Retrieval),让系统覆盖更多表述方式。
🔍 四、语义检索 + 关键词检索(多路召回)
node_with_scores=[]
node_id_children_clear=[]for query_strs in rewrite_split:if query_strs=='': continuequery_bundle = QueryBundle(query_str=query_strs.strip())# 语义检索 (embedding 相似度)node_with_scores_bge = retriever._retrieve(query_bundle)# BM25 检索 (关键词匹配)node_with_scores_bm25 = bm25_retriever.retrieve(chinese_tokenizer(query_strs.strip()))
这一步:
-
用向量模型(xiaobu-embedding-v2)在向量数据库 Qdrant 中检索,返回top10;
-
同时用 BM25 (传统文本倒排索引)检索,检索前使用jieba分词,返回top10;
-
两种结果互补:
-
语义检索 → 语义相似内容;
-
BM25 → 精确关键词命中内容。
-
🧩 五、去重合并召回结果
for node in node_with_scores_bm25:if node.node.node_id not in node_id_children_clear:node_id_children_clear.append(node.node.node_id)node_with_scores.append(node)for node in node_with_scores_bge:if node.node.node_id not in node_id_children_clear:node_id_children_clear.append(node.node.node_id)node_with_scores.append(node)
这一段逻辑:
-
按顺序(BM25 → 向量检索)合并结果;
-
通过
node.node.node_id去重; -
最终得到一个不重复的召回集合。
🧠 六、重排序(Reranker)
if reranker:node_with_reranker = reranker.postprocess_nodes(node_with_scores, query_bundle)
如果提供了重排序器(reranker,如 cross-encoder 模型),
就根据查询与节点文本的相关性重新打分排序。
本项目使用:
BAAI/bge-reranker-v2-m3
在语义层面判断哪个 chunk 与 query 最贴切。
结果是一个 node_with_reranker 列表,按相关性降序排列,返回top10。
🧹 七、重复父节点过滤(Parent Node 去重)
node_with_filter = []
node_id_parent_clear=[]
for node in node_with_reranker:if NodeRelationship.PARENT in node.node.relationships:node.node.text = storage_context.docstore.get_node(node.node.relationships[NodeRelationship.PARENT].node_id).textif node.node.parent_node.node_id not in node_id_parent_clear:node_id_parent_clear.append(node.node.parent_node.node_id)node_with_filter.append(node)
解释:
| 步骤 | 含义 |
|---|---|
NodeRelationship.PARENT | 如果 node 是子块,则取出它的父节点文本 |
docstore.get_node() | 从存储中加载父节点完整文本 |
| 去重逻辑 | 过滤掉重复父节点的块,防止重复内容进入上下文 |
这一步的意义:
在分块检索场景中,不同子块可能来自同一个父段落;
这会导致内容重复、Prompt 太长。
因此要只保留每个父节点一次。
📚 八、拼接上下文知识块
context_str_list = [f"|知识点{i+1}|"+node.text for i,node in enumerate(node_with_filter)]
context_str = "\n".join(context_str_list)
示例输出:
|知识点1| 中国银行2024年净利润同比增长10%,主要由于利息收入提升。
|知识点2| 财务报告显示,净息差保持稳定,手续费收入增长5%。
...
这样生成一个完整的“知识上下文段落”,供模型生成回答。
💬 九、构造最终 Prompt 并调用生成模型
fmt_qa_prompt = QA_TEMPLATE_NO_THINK_COT.format(context_str=context_str, query_str=query_str_input
)
response = model_generate_nothink(fmt_qa_prompt)
-
QA_TEMPLATE_NO_THINK_COT回答模板(prompt)
QA_TEMPLATE_NO_THINK_COT = """##角色定义##
你是一个金融监管制度智能问答专家,你需要准确理解检索的金融文档知识来回复用户问题。##指导规则##
1、你需要精准理解用户问题,利用检索的金融文档知识来回复用户问题。
2、用户问题包括选择题和问答题:* 选择题(单选或者多选),你需要结合检索的金融文档知识,准确理解后给出正确选项,你只需要输出正确的选项即可。* 问答题,你需要结合检索的金融文档知识,利用相关信息来回复用户问题,答案需要尽可能简洁且准确,你只需要输出答案即可。
3、下面是输出格式,你需要按照以下格式输出:相关知识点引用:你需要思考那些知识点是与用户问题相关的,有助于生成准确答案,在这里写入相关知识点。答案:在这里写入回复用户问题的回复,用户问题是选择题时你只需要给出正确的答案选项,用户问题是问答题时你只需要给出精简的回复。##输入##
检索的金融文档知识:
{context_str}用户问题:
{query_str}"""
-
model_generate_nothink()就是调用 LLM(如 ChatGLM、Qwen、Llama 等)生成最终答案。
🧾 十、返回结果
return response, context_str_list
输出两个值:
-
response: 生成的答案; -
context_str_list: 参与回答的知识段落(便于调试或可视化)。
📈 总体流程图
用户问题 query_str_input│▼┌────────────┬────────────┐│ Query Rewrite (多路查询) │└────────────┴────────────┘│▼┌────────────┬────────────┐│ BM25 检索 │ 向量检索 (Xiaobu Embedding) │└────────────┴────────────┘│▼去重 + 合并结果│▼(可选) Reranker 重排序│▼父节点去重 / 合并│▼拼接上下文 context_str│▼Prompt = 模板 + 上下文 + 问题│▼调用模型 model_generate_nothink()│▼→ 返回回答 + 知识片段
✅ 小结
| 阶段 | 模块 | 作用 |
|---|---|---|
| 🔁 查询改写 | Query Rewrite | 生成多条语义等价查询,扩大召回 |
| 🔍 检索 | retriever + bm25_retriever | 语义 + 关键词双通道召回 |
| 🔢 去重合并 | node_id 去重 | 保证结果唯一性 |
| 🧮 重排序 | reranker | 提升相关性(可选) |
| 🧱 父节点过滤 | storage_context.docstore | 去除重复父块 |
| 📚 上下文拼接 | context_str | 构造 Prompt 输入 |
| 🧠 模型生成 | model_generate_nothink | 输出自然语言答案 |
2、评估
评估指标
选择题分数=答对的选择题数/总的选择题数
问答题分数=cos_similar(生成答案,参考答案)求和平均
(BERTScore:精确率、召回率、F1)
一、导入与初始化部分
import asyncio
from dotenv import dotenv_values
from llama_index.core import Settings, StorageContext
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.legacy.llms import OpenAILike as OpenAI
from qdrant_client import models
from llama_index.core.postprocessor import SentenceTransformerRerank
from ragfiles.rag import QdrantRetriever, generation_with_knowledge_retrieval_local, model_generate_nothink, model_generate_think
from llama_index.retrievers.bm25 import BM25Retriever
from llama_index.core.node_parser import SentenceSplitter, HierarchicalNodeParser, get_leaf_nodes
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.schema import Document, MetadataMode, NodeRelationship
from ragfiles.transformation import CustomFilePathExtractor, CustomTitleExtractor
from ragfiles.template import QA_TEMPLATE_THINK
import os, time, json, jieba
from tqdm import tqdm
功能:
-
导入 RAG 系统相关模块(
LlamaIndex、Qdrant、BM25、Reranker 等) -
导入自定义模块
ragfiles.rag和ragfiles.transformation -
导入中文分词工具
jieba -
导入进度条工具
tqdm用于显示处理进度
二、中文分词函数
def chinese_tokenizer(text: str):return " ".join(jieba.cut_for_search(text))
-
使用
jieba.cut_for_search对文本进行 中文搜索分词 -
将文本分词后返回空格分隔的字符串,方便 BM25 检索
三、向量模型与 Qdrant 初始化
COLLECTION_NAME="finance2025"
bge_large_embeding = HuggingFaceEmbedding(model_name="./xiaobu-embedding-v2",cache_folder="./",embed_batch_size=1024,
)
Settings.embed_model = bge_large_embeding
-
使用 Xiaobu 嵌入模型(本地 HuggingFace)
-
设置 batch size=1024
-
将其设置为全局 embedding 模型
lock_file = os.path.join("./qdrants/xiaobuembedding_1024_100", ".lock")
if os.path.exists(lock_file):os.remove(lock_file)
-
删除可能存在的锁文件,避免 Qdrant 数据库被占用
client = QdrantClient(path="./qdrants/xiaobuembedding_1024_100")
vector_store = QdrantVectorStore(client=client,collection_name="finance2025",parallel=8,batch_size=512,
)
-
初始化 Qdrant 本地客户端和向量存储
-
parallel=8表示多线程插入/查询 -
batch_size=512表示向量批量操作大小
四、BM25 检索器初始化
storage_context = StorageContext.from_defaults(persist_dir="./qdrants/bm25data")
dict_list = storage_context.docstore.get_all_ref_doc_info()
nodes_list=[]
for doc_id,value in dict_list.items():nodes_list.extend(value.node_ids)
all_nodes = storage_context.docstore.get_nodes(nodes_list)
leaf_nodes = get_leaf_nodes(all_nodes)
for node in leaf_nodes:node.text = chinese_tokenizer(node.text)
bm25_retriever = BM25Retriever.from_defaults(nodes=leaf_nodes, similarity_top_k=10)
逻辑:
-
从持久化存储
./qdrants/bm25data读取所有文档节点 -
获取叶子节点(最小分块)
-
对每个节点文本进行中文分词
-
初始化 BM25 检索器,
similarity_top_k=10表示检索 top 10 文档
五、向量检索器与重排序器
bge_large_retriever = QdrantRetriever(vector_store, bge_large_embeding, similarity_top_k=10)
rerank = SentenceTransformerRerank(top_n=10, model="./bge-reranker-v2-m3")
-
bge_large_retriever:向量检索器 -
rerank:用 SentenceTransformer 做交叉编码重排序 top 10 节点,提高检索精度
六、RAG 本地问答封装函数
def rag_qa_local(query, category):response = ""context_str = ""try:response, context_str = generation_with_knowledge_retrieval_local(query, category, bge_large_retriever, bm25_retriever, storage_context, rerank)print("输出 "+response)answer = response.split('答案:')[1].strip()return answer, context_strexcept Exception as e:print("return answer error "+str(e))return response, context_str
功能:
-
对输入 query 使用本地 RAG 检索生成回答
-
返回两个值:
-
answer→ 解析出答案:后的内容 -
context_str→ 用于生成的知识上下文
-
七、主函数 main() 逻辑
data = open('./测试集/dev500.json','r').readlines()
rag_file = open('./dev_answer.json','w')
-
读取测试集 JSON 每行一条
-
输出文件
dev_answer.json用于保存结果
测试集demo如下:
{"id":0,"category":"选择题","question":"某股份制银行2019年已完成“两增”目标且普惠贷款占比超10%,其在2020年的最低考核要求是什么?","content":" A. 必须继续保持30%以上的增速 \n B. 只需维持贷款余额和户数不低于年初水平 \n C. 需额外增加信用贷款比例 \n D. 暂停当年的小微企业贷款任务 "}
{"id":1,"category":"选择题","question":"以下哪些属于柜台债券市场的合法交易品种?","content":" A. 现券买卖 \n B. 股指期货 \n C. 质押式回购 \n D. 买断式回购 "}
{"id":2,"category":"选择题","question":"境外机构境内发行债券的登记手续通常由谁负责办理?","content":" A. 开户银行\n B. 境内主承销商\n C. 当地税务机关\n D. 债券认购方"}
{"id":3,"category":"选择题","question":"债券市场资金专户允许的资金支出包括哪些?","content":" A) 支付债券交易价款 \n B) 购买海外股票资产 \n C) 投资收益汇至母公司账户 \n D) 房地产项目定向投资 "}
{"id":4,"category":"选择题","question":"质押贷款期限内允许的行为有哪些?","content":" A. 借款人申请展期 \n B. 存款行主动变更存单利率 \n C. 出质人自行办理存单挂失 \n D. 继承人办理存款过户手续 "}
{"id":5,"category":"问答题","question":"若发现某金融机构在回购交易中多次突破质押比例上限,监管当局可采取哪些处置手段?","content":null}
初始化统计变量
choose_count=0
answer_count=0
count=0
-
选择题数、应答题数、漏答数
循环处理每条测试样例
for item in tqdm(data):item = json.loads(item)id = item['id']category = item['category']question = item['question']content = item['content']if content!=None:input_data = category+'\n'+question+'\n'+contentelse:input_data = category+'\n'+question
-
构造输入数据:类别 + 问题 + 可选内容
-
目的是让模型同时看到问题类别、正文上下文
调用 RAG 问答
answer, context_str = rag_qa_local(input_data, category)
-
调用前面封装的函数
-
返回回答 + 上下文
选择题处理逻辑
choose_list=[]
if item['category']=="选择题":for choose_item in ['A','B','C','D','E','F']:if choose_item in answer:choose_list.append(choose_item)if choose_list==[]:# RAG无法给出答案,使用think模式QA_TEMPLATE_WITH_THINK_input = QA_TEMPLATE_THINK.format(query_str=input_data)think_answer = model_generate_think(QA_TEMPLATE_WITH_THINK_input)jieduan_answer = think_answer.split('答案:')[1].strip()for choose_item in ['A','B','C','D','E','F']:if choose_item in jieduan_answer:choose_list.append(choose_item)count+=1answer=choose_listchoose_count+=1
逻辑:
-
检查答案是否包含 A-F
-
如果 RAG 无法给出答案,使用 Think 模式推理(类似 CoT,让 LLM 思考再答)
-
更新统计信息
非选择题(开放问答)处理
else:answer_count+=1if answer=="":QA_TEMPLATE_WITHOUT_RAG_input = QA_TEMPLATE_WITHOUT_RAG.format(query_str=input_data)think_answer = model_generate_think(QA_TEMPLATE_WITHOUT_RAG_input)jieduan_answer = think_answer.split('答案:')[1].strip()answer = jieduan_answercount+=1
-
如果 RAG 无法回答开放问题,也用 Think 模式补救
-
避免空答案
写入输出文件
rag_file.write(json.dumps({"id": item["id"],"answer": answer,"context": context_str
}, ensure_ascii=False)+'\n')
-
保存答案和知识上下文(便于调试和复盘)
统计信息打印
print("选择题"+str(choose_count),"应答题"+str(answer_count),"漏答题"+str(count))
🧩 总体流程总结
-
初始化:向量模型 + Qdrant + BM25 + Reranker
-
读取测试集
-
构造输入:类别 + 问题 + 内容
-
RAG 检索生成回答
-
查询改写、多路检索(BM25 + Embedding)
-
去重、父节点过滤
-
上下文拼接 → 模型生成回答
-
-
Think 模式:RAG 无法回答时,启用 LLM 推理
-
选择题处理:提取 A-F
-
写入输出文件
-
统计输出结果
💡 特点:
-
使用 多路检索 + 重排序 + Think 模式补救 提高答题覆盖率
-
支持 选择题和开放题
-
保存 上下文和答案,方便可追溯
-
全部在本地完成,无需在线 API(除非 Think 模式依赖 LLM API)
