Langchain RAG介绍和最佳实践
RAG 介绍
RAG 系统深度解析及实践指南
一、RAG 核心概念
定义:检索增强生成(Retrieval-Augmented Generation)系统通过结合信息检索与生成模型,解决纯生成模型的幻觉问题。
企业级价值:
- 知识实时更新(无需重新训练模型)
- 精准控制数据来源
- 审计追踪能力
二、RAG 核心组件(按实现优先级排序)
- 索引系统(Indexing)
- 检索系统(Retrieval)
- 生成系统(Generation)
- 增强组件(Reranking/Query Expansion)
1. 索引系统架构
(1) 文档加载
最佳实践代码:
from langchain_community.document_loaders import (
DirectoryLoader,
UnstructuredFileLoader,
SeleniumURLLoader
)
# 企业级多源加载
loaders = {
'pdf': DirectoryLoader('./docs', glob="**/*.pdf", loader_cls=UnstructuredFileLoader),
'web': SeleniumURLLoader(urls=["https://company-wiki.com"])
}
documents = []
for loader in loaders.values():
documents.extend(loader.load())
(2) 文本分块
企业级分块策略:
from langchain.text_splitter import (
RecursiveCharacterTextSplitter,
MarkdownHeaderTextSplitter
)
# 层级分块(保留文档结构)
markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=[("#", "Header1")])
md_splits = markdown_splitter.split_text(document.page_content)
# 通用分块
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=512,
chunk_overlap=64,
length_function=len,
add_start_index=True
)
all_splits = text_splitter.split_documents(documents)
(3) 向量编码
Embedding 原理:
- 将文本映射到高维空间(通常768-1536维)
- 相似文本在向量空间中距离相近
生产级实现:
from langchain_openai import OpenAIEmbeddings
from langchain.embeddings import HuggingFaceBgeEmbeddings
# 商业方案
oai_embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
# 开源方案
bge_embeddings = HuggingFaceBgeEmbeddings(
model_name="BAAI/bge-large-zh-v1.5",
encode_kwargs={'normalize_embeddings': True}
)
(4) 向量存储
from langchain_chroma import Chroma
from langchain_community.vectorstores import FAISS
# 生产环境推荐
vectorstore = Chroma.from_documents(
documents=all_splits,
embedding=oai_embeddings,
persist_directory="./chroma_db"
)
# 内存型方案
faiss_db = FAISS.from_documents(all_splits, bge_embeddings)
2. 检索系统详解
(1) 基础检索流程
(2) 混合检索实现
from langchain.retrievers import BM25Retriever, EnsembleRetriever
# 稀疏检索
bm25_retriever = BM25Retriever.from_documents(all_splits)
bm25_retriever.k = 3
# 稠密检索
vector_retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
# 混合检索
ensemble_retriever = EnsembleRetriever(
retrievers=[bm25_retriever, vector_retriever],
weights=[0.3, 0.7]
)
(3) Rerank 重排序
实现原理:
- 使用交叉编码器进行精细相关性评估
- 典型方案:Cohere Rerank、BGE Reranker
生产级代码:
from langchain_cohere import CohereRerank
reranker = CohereRerank(
cohere_api_key="your_key",
top_n=5 # 保留最终结果数
)
retrieval_chain = ensemble_retriever | reranker
3. 生成系统优化
关键策略:
contextualize_q_prompt = """基于以下上下文,请用中文专业地回答:
上下文:{context}
问题:{question}
要求:
1. 如果上下文不相关,回答'信息不足'
2. 引用具体数据时标明出处
3. 使用Markdown格式排版"""
rag_prompt = ChatPromptTemplate.from_template(contextualize_q_prompt)
生成过程:
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4-1106-preview")
retrieval_chain = (
{"context": ensemble_retriever, "question": RunnablePassthrough()}
| rag_prompt
| llm
| StrOutputParser()
)
4. RAG 全流程构建
(1) 架构流程图
(2) 端到端实现
# 企业级完整链
class EnterpriseRAG:
def __init__(self):
self.retriever = self._build_retriever()
self.llm = ChatOpenAI()
def _build_retriever(self):
# 包含数据加载、处理、存储全流程
...
return ensemble_retriever
def query(self, question: str) -> str:
result = retrieval_chain.invoke(question)
self._audit_log(question, result)
return result
def _audit_log(self, query, response):
# 记录审计日志
pass
# 使用示例
rag_system = EnterpriseRAG()
response = rag_system.query("公司Q3财报的关键数据是什么?")
5. RAG 优化策略
(1) 检索优化
# 查询扩展
from langchain.retrievers import QueryAugmentationRetriever
from langchain.retrievers.multi_query import MultiQueryRetriever
augmented_retriever = MultiQueryRetriever.from_llm(
retriever=base_retriever,
llm=llm,
include_original=True
)
# 元数据过滤
vectorstore.as_retriever(search_kwargs={"filter": {"department": "finance"}})
(2) 生成优化
# 渐进式生成
def iterative_generation(contexts):
for context in contexts[:3]:
yield llm.generate(context_prompt.format(context))
# 事实校验
from langchain_community.guards import FactChecker
checker = FactChecker(retriever=retriever)
checked_response = checker.check(response)
(3) 企业级增强方案
# 异步处理
async def process_batch(questions):
semaphore = asyncio.Semaphore(10) # 并发控制
async with semaphore:
return await chain.abatch(questions)
# 缓存机制
from langchain.cache import RedisSemanticCache
langchain.llm_cache = RedisSemanticCache(
redis_url="redis://localhost:6379",
embedding=oai_embeddings
)
6. RAG 应用场景
典型应用案例:
- 企业知识库:结合Confluence/Notion数据源
- 技术文档查询:精准定位API文档
- 合规审查:快速检索法律条款
- 客户服务:实时产品信息查询
金融领域特化案例:
class FinancialRAG(EnterpriseRAG):
def __init__(self):
super().__init__()
self._attach_analytics()
def _build_retriever(self):
# 添加财务数据特殊处理
self.embedding = FinBERTEmbeddings()
...
def _attach_analytics(self):
# 集成数据分析模块
from analytics_lib import FinancialAnalyzer
self.analyzer = FinancialAnalyzer()
def query_financial(self, question):
raw_data = self.retriever.get_relevant_documents(question)
analyzed = self.analyzer.process(raw_data)
return self.llm.generate(analyzed)
生产环境监控指标
# 关键性能指标
METRICS = {
'retrieval_precision': ...,
'latency': ...,
'cache_hit_rate': ...,
'hallucination_rate': ...
}
# Prometheus 集成
from prometheus_client import start_http_server, Summary
REQUEST_TIME = Summary('request_processing_seconds', 'Time spent processing request')
@REQUEST_TIME.time()
def process_request(request):
# 业务逻辑
RAG 架构演进路线
基础版RAG -> 混合检索型 -> 领域增强型
-> 流式处理 -> 分布式RAG集群
建议从简单原型开始快速验证,逐步加入企业级特性:先实现基础检索流程,再优化召回率,最后增强生成质量。最新技术趋势表明,结合图数据库的知识图谱增强RAG将成为下一代发展方向。
RAG方案
内部知识库 RAG+Agent 专项实施方案
一、数据治理与知识建模(关键基础)
- 多源数据对接
- 建立统一接入网关:
- Confluence:通过REST API增量同步(保留页面层级)
- 文件系统:部署Filebeat监听共享目录变化(支持PDF/Word/Markdown)
- 业务系统:对接CRM/ERP元数据接口(获取业务实体关系)
- 智能文档解析
- 分层解析策略:
- 动态分块优化
- 混合分块机制:
- 基础分块:RecursiveCharacterTextSplitter(512 tokens)
- 语义分块:利用BERT模型检测段落边界
- 结构分块:对技术文档保留章节层级关系
- 元数据增强:
- 自动附加文档来源、最后更新时间、编辑者
- 业务标签:部门/项目/保密等级
二、检索增强架构设计(核心模块)
- 多级混合检索
- 召回组件选型
- 向量引擎:FAISS HNSW64索引(精度优化模式)
- 关键词库:Elasticsearch 倒排索引(BM25算法)
- 知识图谱:Neo4j存储实体关系(支持多跳查询)
- 重排序优化
- 两阶段排序策略:
- 第一阶段:Cohere Rerank(top 50→10)
- 第二阶段:领域适配模型(Fine-tuned BERT)
- 业务规则注入:
- 时效性加权(近3月文档+20%)
- 权威性加权(高管签发文档+15%)
三、Agent决策引擎设计
- 意图识别层
- 分类体系:
- 复杂任务处理
- 工作流示例:
用户:对比2023年两个项目的技术方案差异 → Agent分解: 1. 检索项目A方案文档(版本控制) 2. 检索项目B方案文档 3. 调用Diff工具生成对比报告 4. 提取关键差异点摘要
- 业务工具集成
- 内部工具对接:
- 审批系统:自动生成工单
- 知识图谱编辑器:补全缺失关系
- 版本对比工具:集成Git文档比对
四、企业级特性实现
- 权限控制体系
- 四级访问控制:
1. 文档级:AD组权限继承 2. 段落级:敏感信息脱敏 3. 操作级:读写分离控制 4. 审计级:水印追踪
- 动态知识更新
- 变更捕获机制:
- 文档修改 → 触发增量索引更新
- 紧急通知 → 插入优先缓存层
- 知识过期 → 自动标注失效状态
- 效果评估系统
- 评估矩阵:
- bad case分析:
- 建立典型误召回样本库
- 设置混淆词检测规则(如产品型号近似词)
五、落地实施路径
- 数据准备阶段(2周)
- 选择试点部门(建议从技术文档开始)
- 建立标注团队(标注500+标准问答对)
- 效果调优阶段(4周)
- 分阶段验证:
迭代1:基准检索(0.58准确率) 迭代2:混合召回(0.72) 迭代3:rerank优化(0.85+)
- 权限灰度上线(1周)
- 按角色分批开放:
- Day1:只读权限测试组
- Day3:带审批的写入权限
- Day7:全功能开放
六、关键成功要素
- 知识质量看板
- 实时监测:
- 文档覆盖率(目标>95%)
- 知识新鲜度(周更新率>30%)
- 检索衰减曲线(季度衰减<5%)
- 领域适配方案
- 建立企业专属词库:
- 产品型号别名映射表
- 内部术语标准词典
- 禁用词过滤列表
- 容灾保护机制
- 三层降级策略:
正常模式:混合检索+Agent决策 降级模式1:关键词检索优先 降级模式2:静态FAQ库应答
该方案需配套建立跨部门协同机制,建议设置RAG运营官角色统筹知识治理,初期聚焦20%核心文档实现80%问答覆盖率,逐步形成知识生态闭环。
企业级 RAG+Agent 系统落地实施路线图
第一阶段:需求分析与设计规划(2-4周)
- 业务场景定义
- 明确核心场景:客户服务/内部知识库/合规审查等
- 确定覆盖文档类型:PDF/Word/Confluence/数据库等
- 定义成功指标:响应准确率(>95%)、延迟(<3s)、并发能力(1000+ QPS)
- 技术需求分析
- 数据安全等级:GDPR/CCPA合规要求
- 集成系统清单:AD域认证、监控系统、日志平台
- 硬件资源评估:GPU服务器集群规模、分布式存储方案
- 架构蓝图设计
第二阶段:数据治理体系建设(4-8周)
- 多源数据管道
- 建立自动化数据摄取流程:
- 文档预处理标准
- 分级清洗策略:
- Level 1:格式标准化(PDF转Markdown)
- Level 2:噪音去除(页眉页脚/水印)
- Level 3:元数据增强(添加部门/版本/密级标签)
- 智能分块策略
- 动态分块机制:
- 技术文档:按API端点分块(保留代码示例)
- 合同文件:按条款分块(保留层级关系)
- 会议纪要:实体识别分块(提取决策点)
第三阶段:核心引擎构建(6-12周)
- 混合检索系统
- 四级召回架构:
1. 关键词检索(Elasticsearch) 2. 向量检索(FAISS集群) 3. 图关系检索(Neo4j) 4. 业务规则检索(预置策略库)
- Agent决策框架
- 分层决策模型:
- 企业级记忆管理
- 三层记忆体系:
- 短期记忆:对话上下文缓存(Redis集群)
- 长期记忆:用户行为向量化存储
- 组织记忆:企业知识图谱更新机制
第四阶段:企业级增强组件(持续迭代)
- 安全控制层
- 五层防护体系:
1. 输入过滤:SQL注入/XSS检测 2. 权限控制:基于AD的RBAC模型 3. 输出审查:敏感词过滤+人工复核队列 4. 审计追踪:全流程操作日志 5. 漏洞扫描:定期渗透测试
- 运维监控体系
- 关键监控面板:
- 性能看板:P99延迟/错误率/GPU利用率
- 知识新鲜度:文档更新时间分布
- 安全态势:异常请求热力图
- 持续学习机制
- 闭环优化流程:
第五阶段:部署与扩展(2-4周)
- 混合云部署架构
- 典型拓扑:
[公有云] ├── API网关 ├── 无状态服务 └── 缓存集群 [私有云] ├── 向量数据库 ├── 知识图谱 └── 敏感数据存储
- 渐进式上线策略
- 四阶段发布:
1. 内部测试:核心部门试用 2. 影子模式:并行运行不实际响应 3. 区域灰度:按分公司逐步放开 4. 全量上线:旧系统熔断保护
- 扩展能力设计
- 插件化架构:
- 新数据源适配器插件
- 第三方工具对接插件
- 领域模型微调插件
第六阶段:运营与优化(持续进行)
- 知识运营体系
- 三大知识工程角色:
- 知识架构师:设计文档分类体系
- 数据工程师:维护数据管道
- 领域专家:质量审核与标注
- 性能调优方向
- 典型优化场景:
- 冷启动加速:预加载热点知识
- 缓存策略:向量结果分级缓存
- 模型蒸馏:小模型接力处理简单query
- 业务价值度量
- ROI评估模型:
节省人力成本 = 日均处理量 × 人工处理平均耗时 × 人力成本系数 质量提升收益 = 错误率下降百分点 × 单错误挽回金额
关键成功要素
- 组织协同机制
- 建立跨部门AI治理委员会
- 制定知识贡献激励机制
- 技术风险控制
- 设计降级方案(如ES回退检索)
- 保留人工接管通道
- 用户体验管理
- 设计渐进式响应:
- 第一步:快速返回知识片段
- 第二步:生成结构化摘要
- 第三步:提供深度分析选项
此方案需根据企业实际情况调整,建议从最小可行产品(MVP)起步,每阶段设置明确验证指标,重点保障知识治理体系与安全控制机制的早期建设。
RAG 简单文档检索实现
包含文档上传、预处理、会话记忆、双阶段检索(Embedding + Rerank)等核心功能,使用当前较优的中文模型方案:
1. 环境准备
pip install langchain langchain-community faiss-cpu unstructured huggingface_hub sentence-transformers torch
2. 完整代码实现
import os
from io import StringIO
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import ChatPromptTemplate
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.chat_models import ChatOpenAI
from langchain.memory import ConversationBufferWindowMemory
class RAGSystem:
def __init__(self):
# 初始化配置
self.embedding_model = "BAAI/bge-small-zh-v1.5" # 中英双语Embedding
self.rerank_model = "BAAI/bge-reranker-base" # 重排序模型
self.chunk_size = 512 # 文本块大小
self.chunk_overlap = 64 # 块重叠量
self.top_k = 5 # 初步检索数量
self.rerank_top_k = 3 # 重排序后保留数量
# 初始化模型
self.embeddings = HuggingFaceEmbeddings(
model_name=self.embedding_model,
model_kwargs={'device': 'cpu'},
encode_kwargs={'normalize_embeddings': True}
)
# 初始化对话记忆
self.memory = ConversationBufferWindowMemory(
memory_key="chat_history",
k=3,
return_messages=True
)
def process_uploaded_file(self, file_path):
"""处理上传的TXT文件"""
loader = TextLoader(file_path, autodetect_encoding=True)
documents = loader.load()
# 文本分块与清洗
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=self.chunk_size,
chunk_overlap=self.chunk_overlap,
separators=["\n\n", "\n", "。", "!", "?"]
)
splits = text_splitter.split_documents(documents)
# 简单文本清洗
cleaned_splits = []
for doc in splits:
cleaned_content = doc.page_content.replace('\u3000', ' ').strip()
if len(cleaned_content) > 10: # 移除空内容
doc.page_content = cleaned_content
cleaned_splits.append(doc)
# 创建向量库
self.vector_store = FAISS.from_documents(
cleaned_splits,
self.embeddings
)
return len(cleaned_splits)
def _hyde_expansion(self, query):
"""HyDE查询扩展优化"""
llm = ChatOpenAI(temperature=0.1)
template = """根据以下问题生成一个假设性的答案框架:
问题:{query}
假设答案应包含:"""
prompt = ChatPromptTemplate.from_template(template)
chain = prompt | llm
return chain.invoke({"query": query}).content
def get_retriever(self):
"""创建带优化的检索器"""
base_retriever = self.vector_store.as_retriever(
search_type="similarity",
search_kwargs={"k": self.top_k}
)
# 历史感知提示词
contextualize_q_prompt = """根据聊天历史和后续问题,重构搜索查询。
历史:
{chat_history}
问题:
{input}"""
return create_history_aware_retriever(
ChatOpenAI(temperature=0),
base_retriever,
contextualize_q_prompt
)
def query(self, question):
"""执行带记忆的查询"""
# 结合HyDE扩展
expanded_query = f"{question} {self._hyde_expansion(question)}"
# 构建处理链
retriever = self.get_retriever()
qa_prompt = ChatPromptTemplate.from_template("""
你是一个专业助手,请基于以下上下文回答:
{context}
历史对话:
{chat_history}
问题:{input}""")
question_answer_chain = create_stuff_documents_chain(
ChatOpenAI(temperature=0.3),
qa_prompt
)
rag_chain = create_retrieval_chain(retriever, question_answer_chain)
# 执行查询
response = rag_chain.invoke({
"input": expanded_query,
"chat_history": self.memory.load_memory_variables({})["chat_history"]
})
# 更新记忆
self.memory.save_context(
{"input": question},
{"output": response["answer"]}
)
return response["answer"]
# 使用示例
if __name__ == "__main__":
rag = RAGSystem()
# 上传并处理文档
file_path = "知识库.txt" # 替换为实际文件路径
doc_count = rag.process_uploaded_file(file_path)
print(f"已处理 {doc_count} 个文本块")
# 测试查询
print(rag.query("我们公司的主要产品是什么?"))
print(rag.query("这些产品的技术优势有哪些?")) # 此处会使用对话历史
3. 关键技术解析
1. 双阶段检索优化:
- 第一阶段:使用BAAI/bge-small-zh-v1.5进行语义检索
- 第二阶段:使用BAAI/bge-reranker-base重排序(需单独调用)
2. 特色功能实现:
- HyDE扩展:通过生成假设答案框架增强查询表述
- 动态分块策略:根据中文标点优化分块逻辑
- 记忆管理:保留最近3轮对话历史作为上下文
- 数据清洗:处理全角空格等常见格式问题
4. 性能优化建议:
# 在process_uploaded_file中添加:
from langchain_community.document_transformers import EmbeddingsRedundantFilter
from langchain.retrievers import ContextualCompressionRetriever
# 添加去重过滤
filter = EmbeddingsRedundantFilter(embeddings=self.embeddings)
compression_retriever = ContextualCompressionRetriever(
base_compressor=filter,
base_retriever=base_retriever
)
4. 模型选择建议表
组件 | 推荐模型 | 特点 |
---|---|---|
Embedding | BAAI/bge-small-zh-v1.5 | 中英双语优化,768维度 |
Reranker | BAAI/bge-reranker-base | 与Embedding模型配套使用效果最佳 |
LLM | Qwen-14B-Chat | 中文支持优秀,需本地部署 |
向量数据库 | FAISS | 本地快速检索,适合中小规模知识库 |
5. 扩展建议
- 添加PDF/Word支持:使用
unstructured
库扩展文件格式 - 实现Rerank功能:添加
FlagEmbedding
库调用Rerank模型 - 部署API:使用FastAPI包装为服务接口
- 可视化:集成Gradio构建交互界面