从零搭建RAG应用:跳过LangChain,掌握文本分块、向量检索、指代消解等核心技术实现
从零搭建RAG应用:跳过LangChain,掌握文本分块、向量检索、指代消解等核心技术实现
RAG(检索增强生成)本质上就是给AI模型外挂一个知识库。平常用ChatGPT只能基于训练数据回答问题,但RAG可以让它查阅你的专有文档——不管是内部报告、技术文档还是业务资料,都能成为AI的参考资源。
很多人第一反应是用LangChain或LlamaIndex这些现成框架,确实能快速搭起来。但自己实现的核心价值在于:你能清楚知道文档是怎么被切分的、向量是怎么生成的、检索逻辑具体怎么跑的。
当系统出现检索不准确、回答质量差、成本过高这些问题时,你能精确定位到是哪个环节的问题。比如是分块策略不合适,还是embedding模型选择有问题,或者是检索参数需要调整。用框架的话,很多时候只能盲目调参数,治标不治本。
另外业务场景往往有特殊需求:PDF表格要特殊处理、某些文档类型需要提取特定元数据、检索结果要按业务规则重排序等等。自己实现就能在任何环节做针对性优化,而不是被框架的设计限制住。
下面我们开始一步一步的进行:
文档解析:让机器能读懂你的文件
做RAG第一步是把各种格式的文档统一处理成纯文本。PDF、Word、txt这些常见格式各有各的解析方式。
import os
import PyPDF2
import docx def load_plain_text(file_path: str) -> str: """Load and return the full contents of a .txt file.""" with open(file_path, 'r', encoding='utf-8') as fp: return fp.read() def extract_text_from_pdf(file_path: str) -> str: """Read every page of a PDF and stitch the text together.""" texts = [] with open(file_path, 'rb') as fp: reader = PyPDF2.PdfReader(fp) for pg in reader.pages: # ensure we don't end up with None page_txt = pg.extract_text() or "" texts.append(page_txt) # separate pages with a newline return "\n".join(texts) def extract_text_from_docx(file_path: str) -> str: """Grab all paragraphs from a .docx document.""" doc = docx.Document(file_path) paras = [p.text for p in doc.paragraphs] return "\n".join(paras)
然后写个统一的路由器,根据文件后缀调用对应的解析函数:
import os def load_document(file_path: str): """Load a document's text based on its file extension.""" _, extension = os.path.splitext(file_path) extension = extension.lower() if extension == '.txt': return read_text_file(file_path) elif extension == '.pdf': return read_pdf_file(file_path) elif extension == '.docx': return read_docx_file(file_path) else: raise ValueError(f"Unsupported file type: {extension}")
这样设计的好处是:文档结构保持完整(用换行符分隔段落),支持各种大小写的文件扩展名,不支持的格式会给出明确提示。
文本分块:把长文档切成合适的片段
因为有上下文的的限制,所以直接把整篇文档丢给LLM就像让人一口吃下整个pizza——不现实。所以需要把文档切成适当大小的块。
def chunk_sentences(text: str, max_length: int = 500) -> list[str]: """Split text into size-limited chunks, breaking only at sentence boundaries.""" # Normalize whitespace and split on basic sentence delimiter segments = text.replace('\n', ' ').split('. ') blocks = [] buffer = [] buffer_len = 0 for segment in segments: seg = segment.strip() if not seg: continue # skip empty strings # Make sure each segment ends with a period if not seg.endswith('.'): seg += '.' seg_len = len(seg) # If adding this segment would exceed max_length, flush the buffer first if buffer and buffer_len + seg_len > max_length: blocks.append(' '.join(buffer)) buffer = [seg] buffer_len = seg_len else: buffer.append(seg) buffer_len += seg_len # Append any remaining sentences if buffer: blocks.append(' '.join(buffer)) return blocks
块大小选择是个权衡问题。小块(200-500字符)适合精确匹配,像索引卡一样;中等块(500-1000字符)能保留更多上下文;大块(1000+字符)上下文丰富但可能模糊焦点。技术文档通常用小块效果更好,叙述性内容可以用大一些的块,并且分块还可以有很多策略可选,在以前的文章中都有总结。
搭建向量数据库:用ChromaDB存储语义信息
文档切块后需要存到向量数据库里,这样才能做语义搜索。ChromaDB是个不错的选择,轻量但功能够用。
import chromadb
from chromadb.utils import embedding_functions # Persistent storage - saves data between sessions
client = chromadb.PersistentClient(path="chroma_db") # Our "brain" for understanding text meaning
sentence_transformer_ef = embedding_functions.SentenceTransformerEmbeddingFunction( model_name="all-MiniLM-L6-v2" # Compact but powerful
) # Create our knowledge repository
collection = client.get_or_create_collection( name="documents_collection", embedding_function=sentence_transformer_ef
)
这里用了几个核心组件:PersistentClient
确保数据持久化存储,重启程序后数据还在;SentenceTransformerEmbeddingFunction
把文本转成向量,让机器理解语义;all-MiniLM-L6-v2
是个轻量级但效果不错的embedding模型。
文档索引:批量处理和元数据管理
接下来要把文档批量处理成可检索的格式。每个文本块都需要唯一ID和元数据,方便后续溯源。
def build_knowledge_units(path: str): """Ingest a file, break it into chunks, and tag each piece with metadata.""" try: # Pull in the raw text raw = load_document(path) # Break the text into bite-sized segments segments = partition_text(raw) # Grab just the filename for provenance name = os.path.basename(path) # Assemble metadata dicts for each segment metadata_records = [ {"source_file": name, "segment_index": idx} for idx in range(len(segments)) ] # Generate a stable ID for each piece unique_keys = [ f"{name}_seg_{idx}" for idx in range(len(segments)) ] return unique_keys, segments, metadata_records except Exception as err: print(f"Failed to process '{path}': {err}") # Return empty lists so downstream code can continue safely return [], [], []
为了提高效率,批量插入比单条插入快很多:
def batch_insert_into_store(store, record_ids, contents, metadata_list): """Insert items into the vector store in optimized batches.""" batch_size = 100 # tuned for ChromaDB throughput for start_idx in range(0, len(contents), batch_size): stop_idx = min(start_idx + batch_size, len(contents)) store.add( documents=contents[start_idx:stop_idx], # text chunks metadatas=metadata_list[start_idx:stop_idx],# chunk metadata ids=record_ids[start_idx:stop_idx] # unique chunk IDs ) def ingest_folder(store, directory: str): """Walk a directory, process each file, and load into the store.""" # gather only regular files entries = [ os.path.join(directory, name) for name in os.listdir(directory) if os.path.isfile(os.path.join(directory, name)) ] for path in entries: filename = os.path.basename(path) print(f"► Processing {filename} …") ids, contents, metadata_list = process_document(path) if contents: batch_insert_into_store(store, ids, contents, metadata_list) print(f"✔ Loaded {len(contents)} chunks from {filename}")
实际运行时你会看到这样的输出:
► Processing customer_faqs.pdf …
✔ Loaded 51 chunks from customer_faqs.pdf ► Processing onboarding_guide.docx …
✔ Loaded 20 chunks from onboarding_guide.docx
语义检索:找到最相关的内容块
有了向量数据库,就能进行语义搜索了。不是简单的关键词匹配,而是理解查询的语义含义。
def run_semantic_query(collection, query: str, top_k: int = 2): """Run a semantic search to find the most relevant chunks.""" return collection.query( query_texts=[query], # The actual search query n_results=top_k # Number of matches to return ) def build_context_and_citations(results): """Combine matched chunks and reference their original sources.""" combined_text = "\n\n".join(results['documents'][0]) references = [ f"{meta['source']} (chunk {meta['chunk']})" for meta in results['metadatas'][0] ] return combined_text, references
想了解底层发生了什么,可以看看搜索结果的详细信息:
def display_search_hits(results): """Nicely formatted display of search output for readability.""" print("\nTop Matches\n" + "=" * 50) hits = results['documents'][0] metadata = results['metadatas'][0] scores = results['distances'][0] for idx in range(len(hits)): snippet = hits[idx] info = metadata[idx] score = scores[idx] print(f"\nMatch #{idx + 1}") print(f"From: {info['source']} — Chunk {info['chunk']}") print(f"Similarity Score: {1 - score:.2f} / 1.00") print(f"Excerpt: {snippet[:150]}...\n")
搜索结果会显示相似度分数和来源文档,帮你判断检索质量。
接入LLM:让模型基于检索内容回答
检索到相关内容后,就要让LLM基于这些内容生成回答。关键是构造好的prompt,确保模型只基于提供的上下文回答。
import os
from openai import OpenAI # Initialize the OpenAI client
client = OpenAI() # Set your OpenAI API key
os.environ["OPENAI_API_KEY"] = "your_api_key" # Replace this with your actual key def build_prompt(context: str, question: str) -> str: """Construct a focused prompt using context and a user question.""" return f"""You are a helpful assistant. Use only the context provided below to answer.
If the answer cannot be found in the context, reply with "I don't have that information." Context:
{context} Question: {question} Answer:""" def ask_openai(question: str, context: str) -> str: """Send the prompt to OpenAI and return the generated response.""" prompt = build_prompt(context, question) try: reply = client.chat.completions.create( model="gpt-4-turbo", messages=[ {"role": "system", "content": "You answer based strictly on the context provided."}, {"role": "user", "content": prompt} ], temperature=0.3, max_tokens=300 ) return reply.choices[0].message.content except Exception as err: return f"Error: {str(err)}"
temperature参数控制生成的随机性:0.0最保守只输出事实,0.5平衡事实和表达,1.0最有创造性。对RAG来说,0.0到0.3比较合适,保证回答基于文档内容。
对话记忆:让AI记住聊天历史
ChatGPT能记住对话上下文,我们的RAG系统也需要这个能力。实现起来其实不复杂,就是维护一个会话状态。
import uuid
from datetime import datetime # In-memory chat log (swap with persistent storage in production)
chat_sessions = {} def start_new_session() -> str: """Initialize a fresh conversation session with a unique ID.""" session_id = str(uuid.uuid4()) chat_sessions[session_id] = [] return session_id def log_message(session_id: str, sender: str, message: str): """Add a message to the session history.""" if session_id not in chat_sessions: chat_sessions[session_id] = [] chat_sessions[session_id].append({ "role": sender, "content": message, "timestamp": datetime.now().isoformat() }) def fetch_recent_messages(session_id: str, limit: int = 5): """Return the last few messages from a session.""" msgs = chat_sessions.get(session_id, []) return msgs[-limit:] def prepare_history_for_model(messages: list) -> str: """Convert messages into a single formatted string.""" return "\n".join( f"{msg['role'].capitalize()}: {msg['content']}" for msg in messages )
这样设计后,每个用户的对话都有独立的session_id,每条消息都会记录到历史中,生成回答时可以参考最近的几条消息作为上下文。
解决指代消解:理解"它"、"那个"指的是什么
用户经常会问一些不完整的问题,比如先问"LaunchPad项目是什么",接着问"它什么时候开始的"。这里的"它"显然指LaunchPad,但AI不知道。需要把后续问题重写成独立完整的问题。
def rewrite_query_with_context(query: str, chat_log: str, client: OpenAI) -> str: """Rewrites a follow-up query as a full standalone question using prior conversation.""" prompt = f"""Rephrase follow-up questions to be fully self-contained.
Refer to the chat history as needed. Return only the rewritten question. Chat History:
{chat_log} Follow-up: {query}
Standalone Question:""" try: response = client.chat.completions.create( model="gpt-3.5-turbo", # Fast, cheap, reliable messages=[{"role": "user", "content": prompt}], temperature=0 # Keep output consistent ) return response.choices[0].message.content except Exception as err: print(f"Failed to contextualize query: {err}") return query # Return original if there's an error
这样"它什么时候开始的"就能自动变成"LaunchPad项目什么时候开始的",确保搜索能找到正确的内容。
完整的对话流程:把所有组件串起来
最后把所有功能整合成一个完整的对话式RAG系统:
def handle_conversational_query( collection, query: str, session_id: str, n_chunks: int = 3
): """Orchestrates the full RAG-based QA flow in a chat session.""" # Step 1: Pull session history and prep it for context injection chat_log = get_conversation_history(session_id) prior_messages = format_history(chat_log) # Step 2: Resolve pronouns or unclear references in the query refined_query = contextualize_query(query, prior_messages, client) print(f"[Refined Query] {refined_query}") # Step 3: Retrieve relevant knowledge from the vector DB search_results = run_semantic_query(collection, refined_query, n_chunks) retrieved_text, citations = build_context_and_citations(search_results) # Step 4: Generate an answer grounded in retrieved content answer = generate_response(refined_query, retrieved_text) # Step 5: Save both user input and AI reply into memory add_message(session_id, "user", query) add_message(session_id, "assistant", answer) return answer, citations
实际使用时的对话流程:
session = start_conversation() # 初始查询
q1 = "LaunchPad是做什么的?"
reply, refs = smart_retrieval(collection, q1, session)
print(f"Answer: {reply}\nSources: {refs}") # 后续查询
q2 = "它什么时候开始的?"
reply, refs = smart_retrieval(collection, q2, session)
print(f"Answer: {reply}\nSources: {refs}")
系统会自动处理指代消解,输出类似这样:
Contextualized:LaunchPad是做什么的?
Answer:LaunchPad帮助初创公司快速原型设计和验证产品想法。
Sources:['program_overview.pdf (chunk 1)'] Contextualized:LaunchPad项目什么时候开始的?
Answer:LaunchPad项目始于2018年。
Sources:['timeline_notes.txt (chunk 3)']
几个实用的优化技巧
做完基础功能后,还有一些进阶优化可以考虑。
1、混合搜索能结合语义搜索和元数据过滤:
# 在语义搜索中添加元数据过滤
collection.query( query_texts=[query], n_results=5, where={"department": "HR"} # 只搜索HR部门的文档
)
2、自动添加来源引用让用户知道答案的出处:
# 在回答后自动添加来源链接
def enhance_response(response, sources): return f"{response}\n\n来源:\n" + "\n".join( f"- {source}" for source in sources )
3、根据文档类型调整分块策略:
# 根据文档类型动态调整块大小
if "financial" in file_name: chunks = split_text(content, chunk_size=300) # 财务文档用小块
else: chunks = split_text(content, chunk_size=600) # 其他文档用大块
4、长对话需要压缩历史:
# 当对话历史太长时自动摘要
def summarize_history(history): prompt = f"总结以下对话的关键信息:\n{history}" return client.chat.completions.create(/*...*/).choices[0].message.content
总结
从零实现RAG系统确实比用现成框架麻烦一些,但带来的好处很明显。
你对每个环节都有完全控制权,可以根据具体需求精确调优。出了问题能快速定位,不用在框架的抽象层里瞎猜。成本也更透明,每个API调用、每个token都在你掌控之中。更重要的是你真正理解了RAG的工作原理,而不是只会调用几个封装好的函数。这种理解在遇到复杂场景时价值巨大。
虽然初期投入的时间多一些,但长期来看绝对值得。特别是对于有特定需求的业务场景,自实现的灵活性是框架无法比拟的。
https://avoid.overfit.cn/post/a9251c8e996b4c24b1b9536537b0c936
作者:Abdur Rahman