基于LangChain + Milvus 实现RAG
使用 LangChain + Milvus(Lite 本地部署),并用 Hugging Face 的 thenlper/gte-large-zh
做 embedding。方案覆盖:环境准备、Milvus Lite 用法、embedding 实现、文本切片(chunking)、向量和文本入库、检索示例,以及常见注意点与扩展建议。
一、总结(要实现的目标)
用户输入一份大文本 → LangChain 把文本切片成多个 chunk → 用 thenlper/gte-large-zh
对每个 chunk 产生 embedding → 把 embedding 与对应 chunk(和元数据)写入 Milvus Lite(本地文件形式) → 支持后续相似检索 / RAG 等。
二、环境与先决条件
-
操作系统:Ubuntu / macOS(Milvus Lite 支持)。
-
Python:建议 Python 3.10+。
-
建议有 GPU(可选),否则在 CPU 上也能跑但较慢(亲测真的很慢)。
-
需要网络以下载 HuggingFace 模型文件。
关键安装包(下面命令在虚拟环境中执行):
python -m venv venv && source venv/bin/activate
pip install -U pip# 必要包
pip install langchain pymilvus>=2.4.2 transformers torch sentencepiece# 可选:如果想用 langchain 社区的 Milvus 集成(取决于 langchain 版本)
pip install langchain-milvus langchain-community
说明:pymilvus
>= 2.4.2 包含 Milvus Lite;使用本地文件 URI 会自动启用 Milvus Lite。(Milvus)
三、设计要点(简要)
-
Chunk 切分:因为
gte-large-zh
的最大序列长度是 512 tokens(会截断更长的输入),所以 chunk 的长度要小于或等于 512 tokens。中文中 tokens 与字符近似但不完全等价,建议字符级 chunk_size 约 400–500 字符,并使用例如 50–100 的 overlap。这样能兼顾上下文与不超长截断。(Hugging Face) -
Embedding 实现:可直接用
transformers
的AutoTokenizer
+AutoModel
,从outputs.last_hidden_state[:,0]
(CLS)或 mean-pooling 得到向量。thenlper/gte-large-zh
官方示例使用outputs.last_hidden_state[:,0]
(CLS token)并做 L2 归一化作为 embedding。embedding 维度为 1024(gte-large-zh)。(Hugging Face) -
向量库:使用 Milvus Lite(本地文件存储)。Milvus Lite 支持 FLAT 索引(受限),适用于原型 / 本地测试;生产上应使用 Milvus Standalone/Cluster。(Milvus)
-
LangChain 集成:使用 LangChain 的 Milvus 向量存储适配器(示例使用
Milvus.from_documents
或构造器),把Embeddings
对象传入。(python.langchain.com)
四、完整可运行示例代码
把下面内容保存为 ingest_and_query.py
。它包含:读取文本文件、切分、embedding(基于 transformers + CLS pooling)、写入 Milvus Lite(通过 LangChain),并提供一个查询示例。
"""
ingest_and_query.py
Usage:python ingest_and_query.py ingest /path/to/large_text.txtpython ingest_and_query.py query "你的检索问题"
"""import sys
import os
from typing import List, Optional
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel
from langchain.text_splitter import CharacterTextSplitter
from langchain_core.documents import Document
# 如果你的 langchain 版本使用不同命名空间,请调整导入(下面两种导入形式二选一)
try:# newer: langchain-community / langchain_milvusfrom langchain_milvus import Milvus
except Exception:try:from langchain_community.vectorstores import Milvusexcept Exception:# 最后兜底:从 langchain.vectorstores 导入(视版本可能不可用)from langchain.vectorstores import Milvus # type: ignorefrom langchain.embeddings.base import Embeddings# -------------------------
# Embeddings wrapper using thenlper/gte-large-zh
# -------------------------
class GTEEmbeddings(Embeddings):def __init__(self, model_name: str = "thenlper/gte-large-zh", device: Optional[str] = None, batch_size: int = 8):self.model_name = model_nameself.device = device or ("cuda" if torch.cuda.is_available() else "cpu")self.batch_size = batch_size# load tokenizer + modelself.tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)self.model = AutoModel.from_pretrained(model_name).to(self.device)self.model.eval()# model dimension: checked on HF model card (gte-large-zh -> 1024)# self.dim = 1024 # optionaldef _embed_batch(self, texts: List[str]) -> List[List[float]]:all_embs = []with torch.no_grad():for i in range(0, len(texts), self.batch_size):batch = texts[i:i + self.batch_size]batch_inputs = self.tokenizer(batch, padding=True, truncation=True, max_length=512, return_tensors="pt").to(self.device)outputs = self.model(**batch_inputs)# official example uses CLS token as embedding:embs = outputs.last_hidden_state[:, 0, :] # shape (B, D)# L2 normalizeembs = F.normalize(embs, p=2, dim=1).cpu()all_embs.extend(embs.tolist())return all_embsdef embed_documents(self, texts: List[str]) -> List[List[float]]:return self._embed_batch(texts)def embed_query(self, text: str) -> List[float]:return self._embed_batch([text])[0]# -------------------------
# Helper: chunk text -> documents
# -------------------------
def chunk_text_to_documents(text: str, chunk_size: int = 400, chunk_overlap: int = 50, source: str = "input") -> List[Document]:splitter = CharacterTextSplitter(separator="\n", chunk_size=chunk_size, chunk_overlap=chunk_overlap)chunks = splitter.split_text(text)docs = []for i, c in enumerate(chunks):docs.append(Document(page_content=c, metadata={"source": source, "chunk": i}))return docs# -------------------------
# Ingest pipeline: read, chunk, embed, write to Milvus Lite via LangChain
# -------------------------
def ingest_file_to_milvus(txt_path: str, collection_name: str = "docs_collection", uri: str = "./milvus_demo.db"):assert os.path.exists(txt_path), f"{txt_path} not found"with open(txt_path, "r", encoding="utf-8") as f:text = f.read()print("Splitting text...")docs = chunk_text_to_documents(text, chunk_size=400, chunk_overlap=80, source=os.path.basename(txt_path))print(f"Created {len(docs)} chunks.")print("Loading embedding model (this may take a while)...")embeddings = GTEEmbeddings(model_name="thenlper/gte-large-zh")print("Writing to Milvus Lite (via LangChain)...")# from_documents 会对 documents 使用 embedding 计算向量并写入 Milvusvectorstore = Milvus.from_documents(documents=docs,embeddings=embeddings,collection_name=collection_name,connection_args={"uri": uri},drop_old=True, # 如果希望覆盖旧 collection,设为 True)print("Ingest finished.")return vectorstore# -------------------------
# Query demo
# -------------------------
def query_milvus(query: str, k: int = 3, collection_name: str = "docs_collection", uri: str = "./milvus_demo.db"):print("Loading embeddings and vectorstore handle...")embeddings = GTEEmbeddings(model_name="thenlper/gte-large-zh")vs = Milvus(embeddings=embeddings, collection_name=collection_name, connection_args={"uri": uri})print("Searching...")docs = vs.similarity_search(query, k=k)for i, d in enumerate(docs):print(f"=== Result {i+1} ===")print("Score/metadata:", d.metadata)print(d.page_content[:600].rstrip())print()# -------------------------
# CLI
# -------------------------
def main():if len(sys.argv) < 2:print(__doc__)returncmd = sys.argv[1]if cmd == "ingest":if len(sys.argv) < 3:print("Usage: ingest /path/to/large_text.txt")returningest_file_to_milvus(sys.argv[2])elif cmd == "query":if len(sys.argv) < 3:print('Usage: query "你的检索问题"')returnquery = sys.argv[2]query_milvus(query)else:print("Unknown command:", cmd)if __name__ == "__main__":main()
五、如何运行(示例)
-
启动虚拟环境并安装依赖(见上文)。
-
保存上面的脚本
ingest_and_query.py
。 -
准备一个大文本文件
large_text.txt
(UTF-8)。 -
执行入库:
python ingest_and_query.py ingest ./large_text.txt
# 这会创建本地 Milvus Lite 文件: ./milvus_demo.db
-
运行检索示例:
python ingest_and_query.py query "中国的首都是哪里?"
六、设计与工程注意点(重要)
-
Chunk 大小与模型截断:gte-large-zh 最大 512 tokens,会截断更长文本,所以请确保每个 chunk 在这个长度内(示例用 400 字符 + overlap)。若你想在 chunk 中保留更多语义,可做层次切分(先大段拆为若干段,再在需要时再二次细拆)。(Hugging Face)
-
向量维度 & 集合 schema:gte-large-zh 输出维度 1024。Milvus collection 的 dimension 要与之对应(LangChain 会处理此细节,但如果你用 pymilvus 原生接口建 collection,要设置
dimension=1024
)。(Hugging Face) -
Milvus Lite 限制:Milvus Lite 只适合原型/本地测试,只支持 FLAT 索引、不支持 partition 的一些高级特性(比如用于多租户的 partition_key)。若数据量大或需要 partition、高级索引,请部署 Milvus Standalone/Cluster。(Milvus)
-
持久化:Milvus Lite 使用本地文件(e.g.
./milvus_demo.db
),数据会持久化到该文件。你可以用milvus-lite dump
导出数据迁移到集群。(Milvus) -
性能 / 批量:对大量文本要批量计算 embedding 并批量 insert(脚本里用 LangChain.from_documents,内部会分批),如果数据量很大,考虑使用
pymilvus[bulk_writer]
的 bulk 插入或把 embedding 计算分布到多台机器上。(Milvus)
七、常见坑与排查
-
如果 huggingface 下载模型太慢,可先预先
transformers
下载并缓存,或使用镜像/代理。 -
若模型加载报错
trust_remote_code
,可在from_pretrained
中传trust_remote_code=True
(若模型包含自定义架构)。但gte-large-zh
通常可以直接加载。(Hugging Face) -
若 LangChain 的
Milvus
类导入失败,请检查你安装的包名(langchain-milvus
/langchain-community
/langchain_milvus
的 API 随版本变动)。如果出问题,可直接使用pymilvus
的原生 API:MilvusClient("./milvus_demo.db")
创建、create_collection
、insert
等操作(官方文档给有示例)。(Milvus, python.langchain.com)
八、扩展建议(当你准备从本地原型走向生产)
-
切换到 Milvus Standalone/Distributed:支持更高级索引(IVF_PQ、HNSW 等)、partition、并发等。(python.langchain.com)
-
索引类型调优:FLAT 适用于小规模,数据变多后换更快的近似索引并调参(nlist / nprobe 等)。
-
多租户 / 权限:Milvus Lite 不支持用户/角色;生产环境通过 Milvus 服务或 Zilliz Cloud 管理。(Milvus)
-
Embedding 服务化:若多人并发或需要降低模型部署成本,可把 embedding 模型放到一个推理服务(如
xinference
、TorchServe、HF Inference、ONNX或自建 FastAPI 服务),LangChain 侧只调用该服务拿 embedding。 -
缓存 & 重用:对重复 chunk 做去重/缓存,避免重复计算 embedding。可以基于 chunk 的 hash 来判断是否已存在。
九、参考(关键文档)
-
Milvus Lite 文档(如何本地运行、API 示例):官方 Run Milvus Lite。(Milvus)
-
Milvus + LangChain 集成文档(示例、from_documents、connection_args 用法):Milvus 与 LangChain 集成。(python.langchain.com)
-
thenlper/gte-large-zh 模型页(max seq len=512, dimension=1024,示例用 CLS token):Hugging Face 模型页面。(Hugging Face)