本地 Graph-RAG(图 + RAG) 部署与使用落地方案
把文本切片后用 thenlper/gte-large-zh
做 embedding 写入 Milvus(向量库),同时把切片作为图节点写入图数据库(Neo4j),基于向量检索 + 图遍历(Graph expansion)得到上下文,再由本地生成模型做 RAG 答复 — 即 Graph-RAG。
一、系统总体架构(概念图)
(说明)用户输入 → 切分成 chunk → embedding → (1)向量写入 Milvus;(2)作为节点写入 Neo4j,并建立关系(顺序关系、语义相似关系、实体关系等) → 查询时:先用向量搜索得到候选 chunk(seed nodes),再在图上扩展(walk / BFS / personalized PageRank)获得更多上下文 → 把最终候选上下文拼接到 prompt 送入本地生成模型生成答案(或摘要)。
关键组件:
文本切片 & 入库服务(Python 脚本)
Embedding:
thenlper/gte-large-zh
(本地 HF 模型,用于 embedding)向量库:Milvus(本地部署,若轻量可用 Milvus Lite;建议示例用 Milvus Standalone docker)
图数据库:Neo4j(存储片段节点、关系、元数据)
生成模型(可本地推理的 LLM,例如 ChatGLM/baichuan/vicuna 等量化模型或 HF 可用的生成模型)
一个 API 层(FastAPI)提供 ingest / query / admin 接口(可选)
二、部署方式(快速一键 Docker Compose 示例)
把下面 docker-compose.yml
保存并运行(示例包含 Milvus Standalone、Neo4j、Redis 可选缓存)。
version: "3.8"
services:milvus:image: milvusdb/milvus:latestcontainer_name: milvusports:- "19530:19530"- "19121:19121"environment:- TZ=Asia/Shanghaivolumes:- ./volumes/milvus:/var/lib/milvusrestart: unless-stoppedneo4j:image: neo4j:latestcontainer_name: neo4jports:- "7474:7474" # browser- "7687:7687" # boltenvironment:- NEO4J_AUTH=neo4j/neo4jpasswordvolumes:- ./volumes/neo4j:/datarestart: unless-stoppedredis:image: redis:7container_name: redisports:- "6379:6379"restart: unless-stopped
启动:
docker compose up -d
验证:
Milvus:
localhost:19530
(pymilvus 能连)Neo4j: 浏览器访问
http://localhost:7474
,账号neo4j
/neo4jpassword
三、Python 环境与依赖
建议 Python ≥ 3.10。创建虚拟环境并安装:
python -m venv venv && source venv/bin/activate
pip install -U pip
pip install transformers torch sentencepiece langchain pymilvus neo4j numpy scipy faiss-cpu fastapi uvicorn redis
# 如果要用 langchain 的 Milvus 适配器也可以 pip install langchain-milvus / langchain-community
如果使用 GPU,请安装对应的
torch
版本与 CUDA 支持包。
四、数据建模(Milvus collection + Neo4j node schema)
Milvus collection(示例):
主键
doc_id
(int64)embedding
(float_vector,dim=1024)chunk_text
(varchar / string)meta
(json / varchar,保存 source、offset 等)
Neo4j 节点(Label: Chunk):
chunk_id
(唯一,与 Milvus doc_id 对应)text
(chunk 文本)source
,pos
,length
,hash
等元数据
关系(Relationship):(:Chunk)-[:NEXT]->(:Chunk)
顺序关系(:Chunk)-[:SIMILAR {score: 0.9}]->(:Chunk)
语义相似关系可按需加入
MENTIONS_ENTITY
或BELONGS_TO_DOC
等关系
五、入库(ingest)示例脚本
ingest_graph_rag.py
(要点:切分 → embed → 写 Milvus → 写 Neo4j → 建关系)
# ingest_graph_rag.py
from typing import List
import os, hashlib, time
import numpy as np
import torch, torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel
from pymilvus import connections, CollectionSchema, FieldSchema, DataType, Collection, utility
from neo4j import GraphDatabase
from langchain.text_splitter import CharacterTextSplitter# ---------- 配置 ----------
MILVUS_HOST = "localhost"
MILVUS_PORT = 19530
COLLECTION_NAME = "graphrag_chunks"
DIM = 1024 # gte-large-zh 输出维度
NEO4J_URI = "bolt://localhost:7687"
NEO4J_USER = "neo4j"
NEO4J_PASS = "neo4jpassword"# ---------- Embedding class (gte-large-zh) ----------
class GTEEmbedder:def __init__(self, model_name="thenlper/gte-large-zh", device=None, batch_size=8):self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")self.batch = batch_sizeself.tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)self.model = AutoModel.from_pretrained(model_name).to(self.device)self.model.eval()def embed_texts(self, texts: List[str]):embs = []with torch.no_grad():for i in range(0, len(texts), self.batch):b = texts[i:i+self.batch]tokens = self.tokenizer(b, padding=True, truncation=True, max_length=512, return_tensors="pt").to(self.device)out = self.model(**tokens)v = out.last_hidden_state[:,0,:] # CLSv = F.normalize(v, p=2, dim=1).cpu().numpy()embs.append(v)return np.vstack(embs)# ---------- Milvus helper ----------
def ensure_milvus_collection():connections.connect("default", host=MILVUS_HOST, port=str(MILVUS_PORT))if utility.has_collection(COLLECTION_NAME):coll = Collection(COLLECTION_NAME)return coll# schemafields = [FieldSchema(name="doc_id", dtype=DataType.INT64, is_primary=True, auto_id=False),FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=DIM),FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),FieldSchema(name="meta", dtype=DataType.VARCHAR, max_length=65535),]schema = CollectionSchema(fields, description="Graph-RAG chunks")coll = Collection(COLLECTION_NAME, schema=schema)# create index if desiredcoll.create_index("embedding", {"index_type":"IVF_FLAT","metric_type":"L2","params":{"nlist":1024}})coll.load()return coll# ---------- Neo4j helper ----------
class Neo4jClient:def __init__(self, uri, user, pw):self.driver = GraphDatabase.driver(uri, auth=(user, pw))def close(self): self.driver.close()def create_chunk(self, chunk_id, text, meta):with self.driver.session() as s:s.run("MERGE (c:Chunk {chunk_id:$id}) SET c.text=$text, c.meta=$meta",id=chunk_id, text=text, meta=meta)def create_next_rel(self, from_id, to_id):with self.driver.session() as s:s.run("MATCH (a:Chunk {chunk_id:$a}), (b:Chunk {chunk_id:$b}) MERGE (a)-[:NEXT]->(b)", a=from_id, b=to_id)def create_similar_rel(self, a, b, score):with self.driver.session() as s:s.run("MATCH (x:Chunk {chunk_id:$a}), (y:Chunk {chunk_id:$b}) ""MERGE (x)-[r:SIMILAR]->(y) SET r.score=$score", a=a, b=b, score=float(score))# ---------- 切分函数 ----------
def chunk_text(text, chunk_size=400, overlap=80):splitter = CharacterTextSplitter(separator="\n", chunk_size=chunk_size, chunk_overlap=overlap)return splitter.split_text(text)# ---------- 主流程 ----------
def ingest_file(path):text = open(path, "r", encoding="utf-8").read()chunks = chunk_text(text)embedder = GTEEmbedder()milvus = ensure_milvus_collection()neo = Neo4jClient(NEO4J_URI, NEO4J_USER, NEO4J_PASS)# prepare doc idsbase_id = int(time.time()*1000) # or use incrementalids = []texts = []metas = []for i, c in enumerate(chunks):cid = base_id + iids.append(cid)texts.append(c)metas.append(f"pos:{i}")# embeddingsembs = embedder.embed_texts(texts) # shape (N, DIM)# insert milvusmilvus.insert([ids, embs.tolist(), texts, metas])milvus.flush()# insert into neo4j: create nodes + NEXT edges sequentiallyfor i, cid in enumerate(ids):neo.create_chunk(cid, texts[i], metas[i])if i>0:neo.create_next_rel(ids[i-1], cid)# build SIMILAR edges: for each chunk, search top-N in Milvus and create SIMILAR edges for high-score# (use milvus search)TOPN = 5for i, emb in enumerate(embs):search_params = {"metric_type":"L2", "params":{"nprobe":10}}results = milvus.search([emb.tolist()], "embedding", param=search_params, limit=TOPN, expr=None)# results: list [ [ (id, distance), ...] ]for hit in results[0]:hit_id = hit.id# distance -> convert to similarity if desired; here we use inverse distancescore = 1.0/(1.0 + hit.distance)if hit_id != ids[i] and score > 0.5:neo.create_similar_rel(ids[i], hit_id, score)neo.close()print("Ingest finished: chunks:", len(chunks))if __name__ == "__main__":import sysif len(sys.argv) < 2:print("Usage: python ingest_graph_rag.py /path/to/text.txt")else:ingest_file(sys.argv[1])
说明与注意:
Milvus index:示例创建 IVF_FLAT;小规模可用 FLAT(或 IVF_HNSW/HNSW)。
Neo4j 中 SIMILAR 边的阈值/策略可按需调整(可用 cosine similarity / distance threshold)。
chunk_id 的生成策略应保证全局唯一(示例以时间戳为基础)。
六、检索与 Graph expansion(Query/Graph-Retrieval)
检索流程示例:
用户 query → embed query(gte model)
在 Milvus 做向量检索,取 top-K(seed)
用这些 seed 的
chunk_id
在 Neo4j 上做图遍历:例如 BFS 深度 D(如 2),或做 Personalized PageRank,收集所有到达节点并记录距离(distance)合并结果(向量 score、图距离、节点权重等),做加权重排序(例如
score = alpha * vec_score + beta * (1 / (1 + graph_distance))
)选取 top-M 上下文拼接 prompt,送入本地生成模型完成 RAG 回答
下面给出 query_graph_rag.py
的关键实现(简化版):
# query_graph_rag.py
from pymilvus import connections, Collection
from neo4j import GraphDatabase
import numpy as np
from ingest_graph_rag import GTEEmbedder, COLLECTION_NAME, MILVUS_HOST, MILVUS_PORT, NEO4J_URI, NEO4J_USER, NEO4J_PASSconnections.connect("default", host=MILVUS_HOST, port=str(MILVUS_PORT))
milvus = Collection(COLLECTION_NAME)class Neo4jClientSimple:def __init__(self): self.driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASS))def close(self): self.driver.close()def expand_from(self, seed_ids, depth=2, max_nodes=50):# BFS-like expansion returning (chunk_id, distance)with self.driver.session() as s:q = """UNWIND $seeds AS sidMATCH (s:Chunk {chunk_id: sid})CALL apoc.path.expandConfig(s, {relationshipFilter:'SIMILAR>|NEXT>', maxLevel:$depth, limit:$limit}) YIELD pathWITH nodes(path) AS nsUNWIND ns AS nRETURN DISTINCT n.chunk_id AS chunk_id, length(path) AS dist LIMIT $limit"""res = s.run(q, seeds=seed_ids, depth=depth, limit=max_nodes)return [(r["chunk_id"], r["dist"]) for r in res]# retrieve + expand + re-rank
def graph_rag_query(query_text, k=5, expand_depth=2, final_k=5):embedder = GTEEmbedder()q_emb = embedder.embed_texts([query_text])[0].tolist()search_params = {"metric_type":"L2","params":{"nprobe":10}}hits = milvus.search([q_emb], "embedding", param=search_params, limit=k)[0]seeds = [h.id for h in hits]vec_scores = {h.id: 1.0/(1.0 + h.distance) for h in hits}neo = Neo4jClientSimple()expanded = neo.expand_from(seeds, depth=expand_depth, max_nodes=200)neo.close()# compute combined scorecombined = {}for cid, dist in expanded:vec_score = vec_scores.get(cid, 0.0)graph_score = 1.0/(1 + dist)combined[cid] = 0.7*vec_score + 0.3*graph_score# also include original seeds if not in expandedfor s in seeds:combined.setdefault(s, vec_scores.get(s, 0.0))# sortsorted_ids = sorted(combined.items(), key=lambda x: x[1], reverse=True)[:final_k]# fetch texts from milvus to assemble prompt# milvus.query or search by id: use query exprids = [int(x[0]) for x,_ in sorted_ids]res = milvus.query(expr=f"doc_id in {ids}", output_fields=["doc_id","text"])# preserve orderid2text = {r["doc_id"]: r["text"] for r in res}contexts = [id2text[i] for i in ids if i in id2text]# build promptprompt = "请基于下面的上下文回答问题:\n\n"for i,c in enumerate(contexts):prompt += f"[Context {i+1}]\n{c}\n\n"prompt += f"问题:{query_text}\n\n请给出简洁且引用上下文的回答。"return prompt
七、生成(RAG)—— 本地 LLM 集成建议
注意:thenlper/gte-large-zh
只是 embedding 模型,不能生成答案。你需要一个生成型 LLM(本地),常见选项:
ChatGLM 系列(中文对话模型,支持本地推理)
Baichuan / MOSS / LLaMA 系列(需要量化或 GPU)
使用 Hugging Face 的
transformers
支持的 text-generation 模型(若可本地运行)
示例:使用 transformers
的 pipeline("text-generation")
(需替换为你本地可用的生成模型):
from transformers import pipeline# 假设 model_name 指向一个本地可推理的生成模型
generator = pipeline("text-generation", model="local-gen-model", device=0) # or device=-1 for CPU
def generate_answer(prompt):out = generator(prompt, max_new_tokens=256, do_sample=False)return out[0]["generated_text"]# 使用
prompt = graph_rag_query("中国的首都是哪里?")
answer = generate_answer(prompt)
print(answer)
如果你没有强大的 GPU,也可以把生成模型部署为一个独立推理服务(如 text-generation-inference
、FastAPI + bitsandbytes 量化模型、ggml 本地推理等),RAG 服务通过 HTTP 调用推理端点。
八、关键策略与优化(工程实践)
边的建立策略:
顺序关系:每篇文档 chunk 的 NEXT。
语义关系:基于向量相似度 top-N 建 SIMILAR 边(可每天/离线更新)。
实体/主题关系:用NER或OpenIE抽取实体,建立
MENTIONS
关系,把同实体的 chunk 连接(问题导向检索很有用)。
扩展策略(Graph expansion):
深度优先 vs 广度优先:一般用 BFS(深度 1-3),以避免语义漂移。
Personalized PageRank:根据 query seed 给每个节点一个初始权重,然后跑 PPR,选择高得分节点。
最大节点上限:控制返回上下文量(避免 prompt 过长)。
融合排序(Vector + Graph):
线性融合:
score = α * vec_sim + β * graph_score
(α+β=1)学习排序:可收集用户点击/人工标签训练一个小型排序器(LightGBM)。
去重 / 合并:
基于 chunk 的 hash 做重复过滤;检索结果合并相似度极高的文本片段。
增量更新与重建:
新文档入库:只对新增 chunk 建节点并与老节点建立 SIMILAR 边(基于向量搜索 topN)。
周期性重建索引与边(每天/每周),以保证图质量。
性能:
Milvus index 选择 HNSW / IVF + PQ 以在大规模场景下加速搜索。
Neo4j 做复杂图算法(PPR)可能昂贵,必要时将图分析离线化并把重要关系写回 DB。
安全/隐私:
本地部署请注意磁盘加密、访问控制(Neo4j 密码、网络防火墙)。
日志避免保存敏感用户输入(或做脱敏)。
九、示例使用流程
启动服务:
docker compose up -d
准备并入库:
python ingest_graph_rag.py ./data/large_text.txt
查询(命令行 / 通过 API):
from query_graph_rag import graph_rag_query prompt = graph_rag_query("解释什么是分布式事务?") print(prompt) # 然后送到本地生成器生成答案
4.(可选)把检索到的上下文与 query 存入日志用于后续排序器训练。
十、示例 prompt 模板(供生成模型使用)
下面是若干相关上下文,请基于它们回答问题并在回答末尾标注你引用的上下文编号(比如 Context 1):[Context 1]
...文本片段1...[Context 2]
...文本片段2...问题:{user_question}请给出简洁、包含要点的回答;若需要引用上下文请注明 Context 编号;若无法从上下文中直接回答,请诚实说明并给出建议查询方向。
十一、扩展建议与路线图
短期(立刻可做)
部署 Milvus + Neo4j,本地做 ingest 与简单 BFS 扩展。
使用现成的中文生成模型(可 CPU/GPU)做 RAG。
中期(提升质量)
加入实体链接、主题建模,把逻辑关系写入图(提高精确检索)。
实施 PPR 或学习排序器融合向量+图信号。
长期(生产级)
Milvus 升级为分布式集群;Neo4j 做 HA 或用 JanusGraph + Janus/ES 等更大规模图解决方案。
把 embedding/生成服务容器化并做 GPU 调度、监控与限流。
建立评估指标(准确率、召回、回答质量)并做 A/B 测试。
十二、常见问题与排查提示
模型内存不足 / OOM:使用量化模型或在 CPU 上降批量;把 embedding 与生成拆分到不同主机。
Neo4j 查询慢:给常用查询建索引(
CREATE INDEX FOR (c:Chunk) ON (c.chunk_id)
),尽量限制 traversal 的 max nodes。Milvus 搜索速度慢:调整索引类型与 nprobe/nlist 参数;用更合适的 index(HNSW/IVF + PQ)。
重复文本/多余关系:入库前对 chunk 做去重与哈希比对;对 SIMILAR 边设阈值。
Prompt 太长:限制最终上下文 token 总长度,或使用摘要器先压缩上下文。
十三、总结
拉取代码与 Docker Compose(上面给出的 yml)。
准备样本文本,运行
ingest_graph_rag.py
。调整 chunk_size/overlap、SIMILAR 阈值,观察 Neo4j 图结构(Neo4j Browser)。
部署或使用本地生成模型,并把
graph_rag_query
的输出送入生成器观察结果。迭代融合权重 α/β,测试不同 expansion depth,记录效果。