当前位置: 首页 > news >正文

基于LangChain + Milvus 实现RAG

使用 LangChain + Milvus(Lite 本地部署),并用 Hugging Face 的 thenlper/gte-large-zh 做 embedding。方案覆盖:环境准备、Milvus Lite 用法、embedding 实现、文本切片(chunking)、向量和文本入库、检索示例,以及常见注意点与扩展建议。


一、总结(要实现的目标)

用户输入一份大文本 → LangChain 把文本切片成多个 chunk → 用 thenlper/gte-large-zh 对每个 chunk 产生 embedding → 把 embedding 与对应 chunk(和元数据)写入 Milvus Lite(本地文件形式) → 支持后续相似检索 / RAG 等。


二、环境与先决条件

  • 操作系统:Ubuntu / macOS(Milvus Lite 支持)。

  • Python:建议 Python 3.10+。

  • 建议有 GPU(可选),否则在 CPU 上也能跑但较慢(亲测真的很慢)。

  • 需要网络以下载 HuggingFace 模型文件。

关键安装包(下面命令在虚拟环境中执行):

python -m venv venv && source venv/bin/activate
pip install -U pip# 必要包
pip install langchain pymilvus>=2.4.2 transformers torch sentencepiece# 可选:如果想用 langchain 社区的 Milvus 集成(取决于 langchain 版本)
pip install langchain-milvus langchain-community

说明:pymilvus >= 2.4.2 包含 Milvus Lite;使用本地文件 URI 会自动启用 Milvus Lite。(Milvus)


三、设计要点(简要)

  1. Chunk 切分:因为 gte-large-zh 的最大序列长度是 512 tokens(会截断更长的输入),所以 chunk 的长度要小于或等于 512 tokens。中文中 tokens 与字符近似但不完全等价,建议字符级 chunk_size 约 400–500 字符,并使用例如 50–100 的 overlap。这样能兼顾上下文与不超长截断。(Hugging Face)

  2. Embedding 实现:可直接用 transformersAutoTokenizer + AutoModel,从 outputs.last_hidden_state[:,0](CLS)或 mean-pooling 得到向量。thenlper/gte-large-zh 官方示例使用 outputs.last_hidden_state[:,0](CLS token)并做 L2 归一化作为 embedding。embedding 维度为 1024(gte-large-zh)。(Hugging Face)

  3. 向量库:使用 Milvus Lite(本地文件存储)。Milvus Lite 支持 FLAT 索引(受限),适用于原型 / 本地测试;生产上应使用 Milvus Standalone/Cluster。(Milvus)

  4. LangChain 集成:使用 LangChain 的 Milvus 向量存储适配器(示例使用 Milvus.from_documents 或构造器),把 Embeddings 对象传入。(python.langchain.com)


四、完整可运行示例代码

把下面内容保存为 ingest_and_query.py。它包含:读取文本文件、切分、embedding(基于 transformers + CLS pooling)、写入 Milvus Lite(通过 LangChain),并提供一个查询示例。

"""
ingest_and_query.py
Usage:python ingest_and_query.py ingest /path/to/large_text.txtpython ingest_and_query.py query "你的检索问题"
"""import sys
import os
from typing import List, Optional
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel
from langchain.text_splitter import CharacterTextSplitter
from langchain_core.documents import Document
# 如果你的 langchain 版本使用不同命名空间,请调整导入(下面两种导入形式二选一)
try:# newer: langchain-community / langchain_milvusfrom langchain_milvus import Milvus
except Exception:try:from langchain_community.vectorstores import Milvusexcept Exception:# 最后兜底:从 langchain.vectorstores 导入(视版本可能不可用)from langchain.vectorstores import Milvus  # type: ignorefrom langchain.embeddings.base import Embeddings# -------------------------
# Embeddings wrapper using thenlper/gte-large-zh
# -------------------------
class GTEEmbeddings(Embeddings):def __init__(self, model_name: str = "thenlper/gte-large-zh", device: Optional[str] = None, batch_size: int = 8):self.model_name = model_nameself.device = device or ("cuda" if torch.cuda.is_available() else "cpu")self.batch_size = batch_size# load tokenizer + modelself.tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)self.model = AutoModel.from_pretrained(model_name).to(self.device)self.model.eval()# model dimension: checked on HF model card (gte-large-zh -> 1024)# self.dim = 1024  # optionaldef _embed_batch(self, texts: List[str]) -> List[List[float]]:all_embs = []with torch.no_grad():for i in range(0, len(texts), self.batch_size):batch = texts[i:i + self.batch_size]batch_inputs = self.tokenizer(batch, padding=True, truncation=True, max_length=512, return_tensors="pt").to(self.device)outputs = self.model(**batch_inputs)# official example uses CLS token as embedding:embs = outputs.last_hidden_state[:, 0, :]  # shape (B, D)# L2 normalizeembs = F.normalize(embs, p=2, dim=1).cpu()all_embs.extend(embs.tolist())return all_embsdef embed_documents(self, texts: List[str]) -> List[List[float]]:return self._embed_batch(texts)def embed_query(self, text: str) -> List[float]:return self._embed_batch([text])[0]# -------------------------
# Helper: chunk text -> documents
# -------------------------
def chunk_text_to_documents(text: str, chunk_size: int = 400, chunk_overlap: int = 50, source: str = "input") -> List[Document]:splitter = CharacterTextSplitter(separator="\n", chunk_size=chunk_size, chunk_overlap=chunk_overlap)chunks = splitter.split_text(text)docs = []for i, c in enumerate(chunks):docs.append(Document(page_content=c, metadata={"source": source, "chunk": i}))return docs# -------------------------
# Ingest pipeline: read, chunk, embed, write to Milvus Lite via LangChain
# -------------------------
def ingest_file_to_milvus(txt_path: str, collection_name: str = "docs_collection", uri: str = "./milvus_demo.db"):assert os.path.exists(txt_path), f"{txt_path} not found"with open(txt_path, "r", encoding="utf-8") as f:text = f.read()print("Splitting text...")docs = chunk_text_to_documents(text, chunk_size=400, chunk_overlap=80, source=os.path.basename(txt_path))print(f"Created {len(docs)} chunks.")print("Loading embedding model (this may take a while)...")embeddings = GTEEmbeddings(model_name="thenlper/gte-large-zh")print("Writing to Milvus Lite (via LangChain)...")# from_documents 会对 documents 使用 embedding 计算向量并写入 Milvusvectorstore = Milvus.from_documents(documents=docs,embeddings=embeddings,collection_name=collection_name,connection_args={"uri": uri},drop_old=True,  # 如果希望覆盖旧 collection,设为 True)print("Ingest finished.")return vectorstore# -------------------------
# Query demo
# -------------------------
def query_milvus(query: str, k: int = 3, collection_name: str = "docs_collection", uri: str = "./milvus_demo.db"):print("Loading embeddings and vectorstore handle...")embeddings = GTEEmbeddings(model_name="thenlper/gte-large-zh")vs = Milvus(embeddings=embeddings, collection_name=collection_name, connection_args={"uri": uri})print("Searching...")docs = vs.similarity_search(query, k=k)for i, d in enumerate(docs):print(f"=== Result {i+1} ===")print("Score/metadata:", d.metadata)print(d.page_content[:600].rstrip())print()# -------------------------
# CLI
# -------------------------
def main():if len(sys.argv) < 2:print(__doc__)returncmd = sys.argv[1]if cmd == "ingest":if len(sys.argv) < 3:print("Usage: ingest /path/to/large_text.txt")returningest_file_to_milvus(sys.argv[2])elif cmd == "query":if len(sys.argv) < 3:print('Usage: query "你的检索问题"')returnquery = sys.argv[2]query_milvus(query)else:print("Unknown command:", cmd)if __name__ == "__main__":main()

五、如何运行(示例)

  1. 启动虚拟环境并安装依赖(见上文)。

  2. 保存上面的脚本 ingest_and_query.py

  3. 准备一个大文本文件 large_text.txt(UTF-8)。

  4. 执行入库:

python ingest_and_query.py ingest ./large_text.txt
# 这会创建本地 Milvus Lite 文件: ./milvus_demo.db
  1. 运行检索示例:

python ingest_and_query.py query "中国的首都是哪里?"

六、设计与工程注意点(重要)

  1. Chunk 大小与模型截断:gte-large-zh 最大 512 tokens,会截断更长文本,所以请确保每个 chunk 在这个长度内(示例用 400 字符 + overlap)。若你想在 chunk 中保留更多语义,可做层次切分(先大段拆为若干段,再在需要时再二次细拆)。(Hugging Face)

  2. 向量维度 & 集合 schema:gte-large-zh 输出维度 1024。Milvus collection 的 dimension 要与之对应(LangChain 会处理此细节,但如果你用 pymilvus 原生接口建 collection,要设置 dimension=1024)。(Hugging Face)

  3. Milvus Lite 限制:Milvus Lite 只适合原型/本地测试,只支持 FLAT 索引、不支持 partition 的一些高级特性(比如用于多租户的 partition_key)。若数据量大或需要 partition、高级索引,请部署 Milvus Standalone/Cluster。(Milvus)

  4. 持久化:Milvus Lite 使用本地文件(e.g. ./milvus_demo.db),数据会持久化到该文件。你可以用 milvus-lite dump 导出数据迁移到集群。(Milvus)

  5. 性能 / 批量:对大量文本要批量计算 embedding 并批量 insert(脚本里用 LangChain.from_documents,内部会分批),如果数据量很大,考虑使用 pymilvus[bulk_writer] 的 bulk 插入或把 embedding 计算分布到多台机器上。(Milvus)


七、常见坑与排查

  • 如果 huggingface 下载模型太慢,可先预先 transformers 下载并缓存,或使用镜像/代理。

  • 若模型加载报错 trust_remote_code,可在 from_pretrained 中传 trust_remote_code=True(若模型包含自定义架构)。但 gte-large-zh 通常可以直接加载。(Hugging Face)

  • 若 LangChain 的 Milvus 类导入失败,请检查你安装的包名(langchain-milvus / langchain-community / langchain_milvus 的 API 随版本变动)。如果出问题,可直接使用 pymilvus 的原生 API:MilvusClient("./milvus_demo.db") 创建、create_collectioninsert 等操作(官方文档给有示例)。(Milvus, python.langchain.com)


八、扩展建议(当你准备从本地原型走向生产)

  1. 切换到 Milvus Standalone/Distributed:支持更高级索引(IVF_PQ、HNSW 等)、partition、并发等。(python.langchain.com)

  2. 索引类型调优:FLAT 适用于小规模,数据变多后换更快的近似索引并调参(nlist / nprobe 等)。

  3. 多租户 / 权限:Milvus Lite 不支持用户/角色;生产环境通过 Milvus 服务或 Zilliz Cloud 管理。(Milvus)

  4. Embedding 服务化:若多人并发或需要降低模型部署成本,可把 embedding 模型放到一个推理服务(如 xinference、TorchServe、HF Inference、ONNX或自建 FastAPI 服务),LangChain 侧只调用该服务拿 embedding。

  5. 缓存 & 重用:对重复 chunk 做去重/缓存,避免重复计算 embedding。可以基于 chunk 的 hash 来判断是否已存在。


九、参考(关键文档)

  • Milvus Lite 文档(如何本地运行、API 示例):官方 Run Milvus Lite。(Milvus)

  • Milvus + LangChain 集成文档(示例、from_documents、connection_args 用法):Milvus 与 LangChain 集成。(python.langchain.com)

  • thenlper/gte-large-zh 模型页(max seq len=512, dimension=1024,示例用 CLS token):Hugging Face 模型页面。(Hugging Face)

http://www.dtcms.com/a/344893.html

相关文章:

  • 升级 Docker Compose 到最新版本:从安装到验证全指南
  • SOLIDWORKS 2025智能工具优化设计流程
  • 数据结构: 2-3 树的删除操作 (Deletion)
  • Maven的概念与Maven项目的创建
  • 线程异步操作
  • LoRA内部原理代码解析(52)
  • 【笔记】动手学Ollama 第七章 应用案例 Agent应用
  • SpringBoot项目创建的五种方式
  • 线性回归:机器学习中的基石
  • Unreal Engine UE_LOG
  • BigData大数据应用开发学习笔记(04)离线处理--离线分析Spark SQL
  • 用 Go 从零实现一个简易负载均衡器
  • SSM从入门到实战: 2.7 MyBatis与Spring集成
  • 计算机内存中的整型存储奥秘、大小端字节序及其判断方法
  • Bluedroid vs NimBLE
  • 北京-测试-入职甲方金融-上班第三天
  • AR眼镜巡检系统在工业互联网的应用:AR+IoT
  • JAVA后端开发——API状态字段设计规范与实践
  • 目标检测数据集转换为图像分类数据集
  • Pandas中的SettingWithCopyWarning警告出现原因及解决方法
  • 共享内存详细解释
  • 前端在WebSocket中加入Token的方法
  • 12-Linux系统用户管理及基础权限
  • 塞尔达传说 王国之泪 PC/手机双端 免安装中文版
  • celery
  • C语言翻译环境作业
  • 大学校园安消一体化平台——多警合一实现智能联动与网格化管理
  • 【链表 - LeetCode】19. 删除链表的倒数第 N 个结点
  • Android.mk 基础
  • Electron 核心 API 全解析:从基础到实战场景