当前位置: 首页 > news >正文

本地 Graph-RAG(图 + RAG) 部署与使用落地方案

把文本切片后用 thenlper/gte-large-zh 做 embedding 写入 Milvus(向量库),同时把切片作为图节点写入图数据库(Neo4j),基于向量检索 + 图遍历(Graph expansion)得到上下文,再由本地生成模型做 RAG 答复 — 即 Graph-RAG。


一、系统总体架构(概念图)

(说明)用户输入 → 切分成 chunk → embedding → (1)向量写入 Milvus;(2)作为节点写入 Neo4j,并建立关系(顺序关系、语义相似关系、实体关系等) → 查询时:先用向量搜索得到候选 chunk(seed nodes),再在图上扩展(walk / BFS / personalized PageRank)获得更多上下文 → 把最终候选上下文拼接到 prompt 送入本地生成模型生成答案(或摘要)。

关键组件:

  • 文本切片 & 入库服务(Python 脚本)

  • Embedding:thenlper/gte-large-zh(本地 HF 模型,用于 embedding)

  • 向量库:Milvus(本地部署,若轻量可用 Milvus Lite;建议示例用 Milvus Standalone docker)

  • 图数据库:Neo4j(存储片段节点、关系、元数据)

  • 生成模型(可本地推理的 LLM,例如 ChatGLM/baichuan/vicuna 等量化模型或 HF 可用的生成模型)

  • 一个 API 层(FastAPI)提供 ingest / query / admin 接口(可选)


二、部署方式(快速一键 Docker Compose 示例)

把下面 docker-compose.yml 保存并运行(示例包含 Milvus Standalone、Neo4j、Redis 可选缓存)。

version: "3.8"
services:milvus:image: milvusdb/milvus:latestcontainer_name: milvusports:- "19530:19530"- "19121:19121"environment:- TZ=Asia/Shanghaivolumes:- ./volumes/milvus:/var/lib/milvusrestart: unless-stoppedneo4j:image: neo4j:latestcontainer_name: neo4jports:- "7474:7474"   # browser- "7687:7687"   # boltenvironment:- NEO4J_AUTH=neo4j/neo4jpasswordvolumes:- ./volumes/neo4j:/datarestart: unless-stoppedredis:image: redis:7container_name: redisports:- "6379:6379"restart: unless-stopped

启动:

docker compose up -d

验证:

  • Milvus: localhost:19530(pymilvus 能连)

  • Neo4j: 浏览器访问 http://localhost:7474,账号 neo4j / neo4jpassword


三、Python 环境与依赖

建议 Python ≥ 3.10。创建虚拟环境并安装:

python -m venv venv && source venv/bin/activate
pip install -U pip
pip install transformers torch sentencepiece langchain pymilvus neo4j numpy scipy faiss-cpu fastapi uvicorn redis
# 如果要用 langchain 的 Milvus 适配器也可以 pip install langchain-milvus / langchain-community

如果使用 GPU,请安装对应的 torch 版本与 CUDA 支持包。


四、数据建模(Milvus collection + Neo4j node schema)

Milvus collection(示例):

  • 主键 doc_id(int64)

  • embedding(float_vector,dim=1024)

  • chunk_text(varchar / string)

  • meta(json / varchar,保存 source、offset 等)

Neo4j 节点(Label: Chunk)

  • chunk_id(唯一,与 Milvus doc_id 对应)

  • text(chunk 文本)

  • source, pos, length, hash 等元数据
    关系(Relationship):

  • (:Chunk)-[:NEXT]->(:Chunk) 顺序关系

  • (:Chunk)-[:SIMILAR {score: 0.9}]->(:Chunk) 语义相似关系

  • 可按需加入 MENTIONS_ENTITYBELONGS_TO_DOC 等关系


五、入库(ingest)示例脚本

ingest_graph_rag.py(要点:切分 → embed → 写 Milvus → 写 Neo4j → 建关系)

# ingest_graph_rag.py
from typing import List
import os, hashlib, time
import numpy as np
import torch, torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel
from pymilvus import connections, CollectionSchema, FieldSchema, DataType, Collection, utility
from neo4j import GraphDatabase
from langchain.text_splitter import CharacterTextSplitter# ---------- 配置 ----------
MILVUS_HOST = "localhost"
MILVUS_PORT = 19530
COLLECTION_NAME = "graphrag_chunks"
DIM = 1024  # gte-large-zh 输出维度
NEO4J_URI = "bolt://localhost:7687"
NEO4J_USER = "neo4j"
NEO4J_PASS = "neo4jpassword"# ---------- Embedding class (gte-large-zh) ----------
class GTEEmbedder:def __init__(self, model_name="thenlper/gte-large-zh", device=None, batch_size=8):self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")self.batch = batch_sizeself.tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)self.model = AutoModel.from_pretrained(model_name).to(self.device)self.model.eval()def embed_texts(self, texts: List[str]):embs = []with torch.no_grad():for i in range(0, len(texts), self.batch):b = texts[i:i+self.batch]tokens = self.tokenizer(b, padding=True, truncation=True, max_length=512, return_tensors="pt").to(self.device)out = self.model(**tokens)v = out.last_hidden_state[:,0,:]   # CLSv = F.normalize(v, p=2, dim=1).cpu().numpy()embs.append(v)return np.vstack(embs)# ---------- Milvus helper ----------
def ensure_milvus_collection():connections.connect("default", host=MILVUS_HOST, port=str(MILVUS_PORT))if utility.has_collection(COLLECTION_NAME):coll = Collection(COLLECTION_NAME)return coll# schemafields = [FieldSchema(name="doc_id", dtype=DataType.INT64, is_primary=True, auto_id=False),FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=DIM),FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),FieldSchema(name="meta", dtype=DataType.VARCHAR, max_length=65535),]schema = CollectionSchema(fields, description="Graph-RAG chunks")coll = Collection(COLLECTION_NAME, schema=schema)# create index if desiredcoll.create_index("embedding", {"index_type":"IVF_FLAT","metric_type":"L2","params":{"nlist":1024}})coll.load()return coll# ---------- Neo4j helper ----------
class Neo4jClient:def __init__(self, uri, user, pw):self.driver = GraphDatabase.driver(uri, auth=(user, pw))def close(self): self.driver.close()def create_chunk(self, chunk_id, text, meta):with self.driver.session() as s:s.run("MERGE (c:Chunk {chunk_id:$id}) SET c.text=$text, c.meta=$meta",id=chunk_id, text=text, meta=meta)def create_next_rel(self, from_id, to_id):with self.driver.session() as s:s.run("MATCH (a:Chunk {chunk_id:$a}), (b:Chunk {chunk_id:$b}) MERGE (a)-[:NEXT]->(b)", a=from_id, b=to_id)def create_similar_rel(self, a, b, score):with self.driver.session() as s:s.run("MATCH (x:Chunk {chunk_id:$a}), (y:Chunk {chunk_id:$b}) ""MERGE (x)-[r:SIMILAR]->(y) SET r.score=$score", a=a, b=b, score=float(score))# ---------- 切分函数 ----------
def chunk_text(text, chunk_size=400, overlap=80):splitter = CharacterTextSplitter(separator="\n", chunk_size=chunk_size, chunk_overlap=overlap)return splitter.split_text(text)# ---------- 主流程 ----------
def ingest_file(path):text = open(path, "r", encoding="utf-8").read()chunks = chunk_text(text)embedder = GTEEmbedder()milvus = ensure_milvus_collection()neo = Neo4jClient(NEO4J_URI, NEO4J_USER, NEO4J_PASS)# prepare doc idsbase_id = int(time.time()*1000)  # or use incrementalids = []texts = []metas = []for i, c in enumerate(chunks):cid = base_id + iids.append(cid)texts.append(c)metas.append(f"pos:{i}")# embeddingsembs = embedder.embed_texts(texts)  # shape (N, DIM)# insert milvusmilvus.insert([ids, embs.tolist(), texts, metas])milvus.flush()# insert into neo4j: create nodes + NEXT edges sequentiallyfor i, cid in enumerate(ids):neo.create_chunk(cid, texts[i], metas[i])if i>0:neo.create_next_rel(ids[i-1], cid)# build SIMILAR edges: for each chunk, search top-N in Milvus and create SIMILAR edges for high-score# (use milvus search)TOPN = 5for i, emb in enumerate(embs):search_params = {"metric_type":"L2", "params":{"nprobe":10}}results = milvus.search([emb.tolist()], "embedding", param=search_params, limit=TOPN, expr=None)# results: list [ [ (id, distance), ...] ]for hit in results[0]:hit_id = hit.id# distance -> convert to similarity if desired; here we use inverse distancescore = 1.0/(1.0 + hit.distance)if hit_id != ids[i] and score > 0.5:neo.create_similar_rel(ids[i], hit_id, score)neo.close()print("Ingest finished: chunks:", len(chunks))if __name__ == "__main__":import sysif len(sys.argv) < 2:print("Usage: python ingest_graph_rag.py /path/to/text.txt")else:ingest_file(sys.argv[1])

说明与注意:

  • Milvus index:示例创建 IVF_FLAT;小规模可用 FLAT(或 IVF_HNSW/HNSW)。

  • Neo4j 中 SIMILAR 边的阈值/策略可按需调整(可用 cosine similarity / distance threshold)。

  • chunk_id 的生成策略应保证全局唯一(示例以时间戳为基础)。


六、检索与 Graph expansion(Query/Graph-Retrieval)

检索流程示例:

  1. 用户 query → embed query(gte model)

  2. 在 Milvus 做向量检索,取 top-K(seed)

  3. 用这些 seed 的 chunk_id 在 Neo4j 上做图遍历:例如 BFS 深度 D(如 2),或做 Personalized PageRank,收集所有到达节点并记录距离(distance)

  4. 合并结果(向量 score、图距离、节点权重等),做加权重排序(例如 score = alpha * vec_score + beta * (1 / (1 + graph_distance))

  5. 选取 top-M 上下文拼接 prompt,送入本地生成模型完成 RAG 回答

下面给出 query_graph_rag.py 的关键实现(简化版):

# query_graph_rag.py
from pymilvus import connections, Collection
from neo4j import GraphDatabase
import numpy as np
from ingest_graph_rag import GTEEmbedder, COLLECTION_NAME, MILVUS_HOST, MILVUS_PORT, NEO4J_URI, NEO4J_USER, NEO4J_PASSconnections.connect("default", host=MILVUS_HOST, port=str(MILVUS_PORT))
milvus = Collection(COLLECTION_NAME)class Neo4jClientSimple:def __init__(self): self.driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASS))def close(self): self.driver.close()def expand_from(self, seed_ids, depth=2, max_nodes=50):# BFS-like expansion returning (chunk_id, distance)with self.driver.session() as s:q = """UNWIND $seeds AS sidMATCH (s:Chunk {chunk_id: sid})CALL apoc.path.expandConfig(s, {relationshipFilter:'SIMILAR>|NEXT>', maxLevel:$depth, limit:$limit}) YIELD pathWITH nodes(path) AS nsUNWIND ns AS nRETURN DISTINCT n.chunk_id AS chunk_id, length(path) AS dist LIMIT $limit"""res = s.run(q, seeds=seed_ids, depth=depth, limit=max_nodes)return [(r["chunk_id"], r["dist"]) for r in res]# retrieve + expand + re-rank
def graph_rag_query(query_text, k=5, expand_depth=2, final_k=5):embedder = GTEEmbedder()q_emb = embedder.embed_texts([query_text])[0].tolist()search_params = {"metric_type":"L2","params":{"nprobe":10}}hits = milvus.search([q_emb], "embedding", param=search_params, limit=k)[0]seeds = [h.id for h in hits]vec_scores = {h.id: 1.0/(1.0 + h.distance) for h in hits}neo = Neo4jClientSimple()expanded = neo.expand_from(seeds, depth=expand_depth, max_nodes=200)neo.close()# compute combined scorecombined = {}for cid, dist in expanded:vec_score = vec_scores.get(cid, 0.0)graph_score = 1.0/(1 + dist)combined[cid] = 0.7*vec_score + 0.3*graph_score# also include original seeds if not in expandedfor s in seeds:combined.setdefault(s, vec_scores.get(s, 0.0))# sortsorted_ids = sorted(combined.items(), key=lambda x: x[1], reverse=True)[:final_k]# fetch texts from milvus to assemble prompt# milvus.query or search by id: use query exprids = [int(x[0]) for x,_ in sorted_ids]res = milvus.query(expr=f"doc_id in {ids}", output_fields=["doc_id","text"])# preserve orderid2text = {r["doc_id"]: r["text"] for r in res}contexts = [id2text[i] for i in ids if i in id2text]# build promptprompt = "请基于下面的上下文回答问题:\n\n"for i,c in enumerate(contexts):prompt += f"[Context {i+1}]\n{c}\n\n"prompt += f"问题:{query_text}\n\n请给出简洁且引用上下文的回答。"return prompt

七、生成(RAG)—— 本地 LLM 集成建议

注意thenlper/gte-large-zh 只是 embedding 模型,不能生成答案。你需要一个生成型 LLM(本地),常见选项:

  • ChatGLM 系列(中文对话模型,支持本地推理)

  • Baichuan / MOSS / LLaMA 系列(需要量化或 GPU)

  • 使用 Hugging Face 的 transformers 支持的 text-generation 模型(若可本地运行)

示例:使用 transformerspipeline("text-generation")(需替换为你本地可用的生成模型):

from transformers import pipeline# 假设 model_name 指向一个本地可推理的生成模型
generator = pipeline("text-generation", model="local-gen-model", device=0)  # or device=-1 for CPU
def generate_answer(prompt):out = generator(prompt, max_new_tokens=256, do_sample=False)return out[0]["generated_text"]# 使用
prompt = graph_rag_query("中国的首都是哪里?")
answer = generate_answer(prompt)
print(answer)

如果你没有强大的 GPU,也可以把生成模型部署为一个独立推理服务(如 text-generation-inference、FastAPI + bitsandbytes 量化模型、ggml 本地推理等),RAG 服务通过 HTTP 调用推理端点。


八、关键策略与优化(工程实践)

  1. 边的建立策略

    • 顺序关系:每篇文档 chunk 的 NEXT。

    • 语义关系:基于向量相似度 top-N 建 SIMILAR 边(可每天/离线更新)。

    • 实体/主题关系:用NER或OpenIE抽取实体,建立 MENTIONS 关系,把同实体的 chunk 连接(问题导向检索很有用)。

  2. 扩展策略(Graph expansion)

    • 深度优先 vs 广度优先:一般用 BFS(深度 1-3),以避免语义漂移。

    • Personalized PageRank:根据 query seed 给每个节点一个初始权重,然后跑 PPR,选择高得分节点。

    • 最大节点上限:控制返回上下文量(避免 prompt 过长)。

  3. 融合排序(Vector + Graph)

    • 线性融合:score = α * vec_sim + β * graph_score(α+β=1)

    • 学习排序:可收集用户点击/人工标签训练一个小型排序器(LightGBM)。

  4. 去重 / 合并

    • 基于 chunk 的 hash 做重复过滤;检索结果合并相似度极高的文本片段。

  5. 增量更新与重建

    • 新文档入库:只对新增 chunk 建节点并与老节点建立 SIMILAR 边(基于向量搜索 topN)。

    • 周期性重建索引与边(每天/每周),以保证图质量。

  6. 性能

    • Milvus index 选择 HNSW / IVF + PQ 以在大规模场景下加速搜索。

    • Neo4j 做复杂图算法(PPR)可能昂贵,必要时将图分析离线化并把重要关系写回 DB。

  7. 安全/隐私

    • 本地部署请注意磁盘加密、访问控制(Neo4j 密码、网络防火墙)。

    • 日志避免保存敏感用户输入(或做脱敏)。


九、示例使用流程

  1. 启动服务:

    docker compose up -d
    
  2. 准备并入库:

    python ingest_graph_rag.py ./data/large_text.txt
    
  3. 查询(命令行 / 通过 API):

    from query_graph_rag import graph_rag_query
    prompt = graph_rag_query("解释什么是分布式事务?")
    print(prompt)
    # 然后送到本地生成器生成答案
    

4.(可选)把检索到的上下文与 query 存入日志用于后续排序器训练。


十、示例 prompt 模板(供生成模型使用)

下面是若干相关上下文,请基于它们回答问题并在回答末尾标注你引用的上下文编号(比如 Context 1):[Context 1]
...文本片段1...[Context 2]
...文本片段2...问题:{user_question}请给出简洁、包含要点的回答;若需要引用上下文请注明 Context 编号;若无法从上下文中直接回答,请诚实说明并给出建议查询方向。

十一、扩展建议与路线图

短期(立刻可做)

  • 部署 Milvus + Neo4j,本地做 ingest 与简单 BFS 扩展。

  • 使用现成的中文生成模型(可 CPU/GPU)做 RAG。

中期(提升质量)

  • 加入实体链接、主题建模,把逻辑关系写入图(提高精确检索)。

  • 实施 PPR 或学习排序器融合向量+图信号。

长期(生产级)

  • Milvus 升级为分布式集群;Neo4j 做 HA 或用 JanusGraph + Janus/ES 等更大规模图解决方案。

  • 把 embedding/生成服务容器化并做 GPU 调度、监控与限流。

  • 建立评估指标(准确率、召回、回答质量)并做 A/B 测试。


十二、常见问题与排查提示

  • 模型内存不足 / OOM:使用量化模型或在 CPU 上降批量;把 embedding 与生成拆分到不同主机。

  • Neo4j 查询慢:给常用查询建索引(CREATE INDEX FOR (c:Chunk) ON (c.chunk_id)),尽量限制 traversal 的 max nodes。

  • Milvus 搜索速度慢:调整索引类型与 nprobe/nlist 参数;用更合适的 index(HNSW/IVF + PQ)。

  • 重复文本/多余关系:入库前对 chunk 做去重与哈希比对;对 SIMILAR 边设阈值。

  • Prompt 太长:限制最终上下文 token 总长度,或使用摘要器先压缩上下文。


十三、总结

  1. 拉取代码与 Docker Compose(上面给出的 yml)。

  2. 准备样本文本,运行 ingest_graph_rag.py

  3. 调整 chunk_size/overlap、SIMILAR 阈值,观察 Neo4j 图结构(Neo4j Browser)。

  4. 部署或使用本地生成模型,并把 graph_rag_query 的输出送入生成器观察结果。

  5. 迭代融合权重 α/β,测试不同 expansion depth,记录效果。

http://www.dtcms.com/a/345153.html

相关文章:

  • Unreal Engine AActor
  • 机器学习--线性回归
  • K8S - NetworkPolicy的使用
  • Spring发布订阅模式详解
  • 国产CANFD芯片技术特性与应用前景综述:以ASM1042系列为例
  • 宝可梦:去吧皮卡丘/去吧伊布 PC/手机双端(Pokemon-Lets Go Pikachu)免安装中文版
  • MeterSphere接口自动化共享cookie
  • 开发避坑指南(33):Mybatisplus QueryWrapper 自定义查询语句防注入解决方案
  • 【Cmake】Cmake概览
  • C2039 “unref“:不是“osgEarth::Symbology::Style”的成员 问题分析及解决方法
  • 【RA-Eco-RA4E2-64PIN-V1.0 开发板】步进电机驱动
  • 育教大师广州专插本培训机构指南
  • STM32项目分享:基于STM32的焊接工位智能排烟系统
  • 视频编码异常的表现
  • 【Linux系列】Linux 中替换文件中的字符串
  • 基于SpringBoot的考研学习交流平台【2026最新】
  • Nginx 创建和配置虚拟主机
  • 掌握设计模式--命令模式
  • 全面解析 `strchr` 字符串查找函数
  • Java面试宝典:Redis底层原理(持久化+分布式锁)
  • 智慧农业新基建:边缘计算网关在精准农业中的落地实践案例
  • C#_高性能内存处理:Span<T>, Memory<T>, ArrayPool
  • const(常量)
  • Android.bp 基础
  • 安全帽检测算法如何提升工地安全管理效率
  • AI 向量库:从文本到数据的奇妙之旅​
  • 编排之神--Kubernetes中包管理Helm工具详解
  • Jmeter压测实操指南
  • 金融量化入门:Pandas 时间序列处理与技术指标实战(含金叉死叉 / 均线策略)
  • GaussDB SQL引擎(1)-SQL执行流程