剖析graph-rag中最核心的一步:切片文本如何输入到大模型并且构建整体的关系
一、总体思路分层
Pass-1:分块抽取(Chunk IE)
对每个切片做提及级别抽取:实体提及(mention)、关系提及(rel-mention)、证据(原文片段/offset)、置信度。此阶段不要急着“下定论”,只把局部事实 + 证据抛出来。Pass-2:全局对齐(Entity Linking / Coref)
把每个提及映射到全局实体(canonical entity)。这一步解决同名/别名/代词问题,是把“碎片”粘到“一个人/一个公司”上。Pass-3:关系汇总(Relation Assembly & Consolidation)
以“全局实体ID”为端点,将所有关系提及合并成全局关系(去重、聚合证据、合并时间戳/属性、计算聚合置信度),并标注来源与证据。Pass-4:入库(Neo4j)
幂等 MERGE 写库:(:Entity {id})
,(s)-[:TYPE {…props}]->(t)
,保留evidence[]/source_doc_id/chunk_id/offset
。
关键:Pass-1 是“粒度小+证据充分”;Pass-2/3 才做“抽象与合并”。这样最稳、可审计、可回溯。
二、如何把切片“正确喂给”大模型
1. 切割策略(避免上下文断裂)
RecursiveCharacterTextSplitter
:建议chunk_size=800~1200 tokens
,chunk_overlap=150~250 tokens
。优先按章节/标题/段落/标点切;必要时用重叠跨界,保证关系动词和主客体尽量在同一块或覆盖于相邻块。
2. 每块输入的上下文打包(非常重要)
对第 i 块输入时,附加两类“轻记忆”:
邻近窗口:
chunk_{i-1}
的末尾 23 句 +chunk_{i+1}
的开头 23 句(可选),防止句子跨段。全局实体“摘要索引”(Top-K):Pass-1 后面会有,但首轮没有的话,用逐块内缓存即可。第二轮跑时,把已发现的全局实体摘要(每个1行小卡片)作为上下文附加,帮助模型在 chunk 内就把别名/代词绑定到已有实体。
轻记忆示例(随 prompt 附加):
Known entities so far (id | name | type | key attrs):
E001 | Alice | Person | {nationality: "SG"}
E007 | Acme Corp | Org | {hq: "Singapore"}
3. 输出结构一定要提及级 + 证据
为防止幻觉与错配,要求模型定位原文证据:
mention:
text_span
(摘录10~40字)、char_start
、char_end
relation-mention:
subj_mention_id
、obj_mention_id
、predicate
、evidence_span
、when/where
、confidence
这样可以在 Pass-3 聚合时做去重与冲突仲裁。
三、跨切片构建“整体关系”的算法
1. 实体对齐(Entity Linking / Coreference)
候选生成(越快越好):
规则匹配:规范化字符串(去空白、大小写、全半角、公司后缀等),精确/近似匹配(可用
rapidfuzz
)。关键属性:注册号/ISBN/邮箱/URL/地名层级等。
(可选)向量相似:
name + type + attrs
生成嵌入,Top-K 候选。
候选裁决(LLM 只做“选择/NEW”而非自由发挥):
把候选列表(最多5个)+ 当前 mention 的证据片段发给 LLM,强制返回
{link_to: <candidate_id | "NEW">, confidence}
。若
"NEW"
,为其生成canonical_id
(哈希),并登记其aliases[]
含当前 mention 文本。
裁决提示词见后文模板《链接判定提示词》。
2. 关系汇总(Consolidation)
拿到 subj_global_id
与 obj_global_id
后,把同一对端点、同一关系类型的多条关系提及合并:
关系键:
(src_id, predicate, tgt_id)
证据聚合:保存去重后的
evidence[]
;时间/地点/数额等属性用最可信或合并集合(如dates[]
)。置信度聚合:例如
p_agg = 1 - ∏(1 - p_i)
;或按证据权重加权平均。时间范围:若多处出现时间,合成
valid_from/valid_to
或dates[]
。来源与可追溯:保留
source_doc_id
、chunk_id
、mention_ids[]
、offsets[]
。
3. 生成反向/派生关系
明确定义的可逆关系(如
ACQUIRED
↔WAS_ACQUIRED_BY
)自动生成逆向边。规则推理(如 “创始人 = 在 X 年 时创立 Y”)产出
FOUNDED
;但把derived=true
标记清楚,并把证据链(来源关系ID)挂在属性中。
四、代码关键片段
下面用 LangChain 0.2+ 写法,演示:
分块+批量送模;2) 提及级输出;3) 实体链接裁决;4) 关系汇总;5) 写入 Neo4j。
模型你可换 OpenAI / 本地 Ollama;此处保留两种写法(择一启用)。
# --- deps ---
# pip install langchain>=0.2 langchain-community langchain-core pydantic neo4j tiktoken rapidfuzz
# pip install openai # 如用OpenAI
# pip install ollama # 如用本地Ollamafrom __future__ import annotations
import os, json, hashlib
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, field
from pydantic import BaseModel, Field, ValidationError
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.runnables import RunnableLambda
from rapidfuzz import fuzz, process
from neo4j import GraphDatabaseUSE_OPENAI = False
if USE_OPENAI:from langchain_openai import ChatOpenAIllm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
else:from langchain_community.chat_models import ChatOllamallm = ChatOllama(model="qwen2.5:14b", temperature=0)# ---- Neo4j driver ----
driver = GraphDatabase.driver(os.getenv("NEO4J_URI", "bolt://localhost:7687"),auth=(os.getenv("NEO4J_USER", "neo4j"), os.getenv("NEO4J_PASSWORD", "password"))
)# ---------- 数据结构 ----------
class Mention(BaseModel):id: strtext: strtype: str # 预测类型char_start: intchar_end: intattrs: Dict[str, Any] = {}confidence: float = 0.8class RelMention(BaseModel):id: strsubj_mention_id: strobj_mention_id: strpredicate: strevidence: strchar_start: intchar_end: intwhen: Optional[str] = Nonewhere: Optional[str] = Noneprops: Dict[str, Any] = {}confidence: float = 0.7class ChunkIE(BaseModel):chunk_id: intmentions: List[Mention] = []rel_mentions: List[RelMention] = []# ---------- 切割 ----------
def split_text(text: str, chunk_size=1200, chunk_overlap=200):splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap,separators=["\n\n", "\n", "。", "!", "?", ";", ",", " ", ""])return splitter.split_text(text)# ---------- 稳定ID ----------
def stable_id(*parts) -> str:key = "|".join([json.dumps(p, ensure_ascii=False, sort_keys=True) for p in parts])return hashlib.sha256(key.encode("utf-8")).hexdigest()[:24]# ---------- 提及级抽取 Prompt ----------
IE_SYSTEM = """You are a precise IE system. Extract entity mentions and relation mentions.
Return ONLY JSON matching the schema. Use evidence spans from the text (char_start/end)."""IE_USER = """
DOC_ID: {doc_id}
CHUNK_ID: {chunk_id}/{total}
NEIGHBOR_CONTEXT:
<<<
{neighbor}
>>>
TEXT:
<<<
{chunk}
>>>Schema:
{schema}
Guidelines:
- Mentions: short span text from this chunk; include char_start/char_end offsets within THIS chunk.
- Relations: use subj_mention_id/obj_mention_id; keep predicate concise (ACQUIRED, FOUNDED, WORKS_AT, PART_OF, etc.)
- Include evidence (a short quote) and optional when/where if present.
- No fabrication. Only facts supported by this text.
"""ie_prompt = ChatPromptTemplate.from_messages([("system", IE_SYSTEM), ("user", IE_USER)])
ie_parser = JsonOutputParser(pydantic_object=ChunkIE)def build_neighbor_context(prev_tail: str, next_head: str) -> str:return (prev_tail or "").strip() + ("\n---\n" if (prev_tail and next_head) else "") + (next_head or "").strip()def extract_chunk_ie(doc_id: str, chunk: str, chunk_id: int, total: int, neighbor: str) -> ChunkIE:msg = ie_prompt.format_messages(doc_id=doc_id, chunk_id=chunk_id, total=total,neighbor=neighbor, chunk=chunk,schema=json.dumps(ChunkIE.schema(), ensure_ascii=False, indent=2))raw = llm.invoke(msg)text = getattr(raw, "content", str(raw))try:data = ie_parser.parse(text)except ValidationError:start, end = text.find("{"), text.rfind("}")data = ie_parser.parse(text[start:end+1])# 填mention id(若模型未填)for m in data.mentions:if not m.id:m.id = stable_id(doc_id, chunk_id, m.text, m.type, m.char_start, m.char_end)for r in data.rel_mentions:if not r.id:r.id = stable_id(doc_id, chunk_id, r.subj_mention_id, r.predicate, r.obj_mention_id, r.char_start, r.char_end)return data# ---------- 实体全局对齐 ----------
@dataclass
class CanonicalEntity:id: strname: strtype: strattrs: Dict[str, Any] = field(default_factory=dict)aliases: set = field(default_factory=set)def name_key(s: str) -> str:return s.lower().replace(" ", "").replace("(", "(").replace(")", ")")def candidates_for(entities: Dict[str, CanonicalEntity], name: str, etype: str, top=5) -> List[CanonicalEntity]:# 先快速筛:同type优先same_type = [e for e in entities.values() if e.type == etype]# 近似匹配choices = [(e, max(fuzz.partial_ratio(name, e.name), *(fuzz.partial_ratio(name, a) for a in e.aliases) )) for e in same_type]choices.sort(key=lambda x: x[1], reverse=True)return [e for e, score in choices[:top] if score >= 80]LINK_SYSTEM = "Choose the best candidate id for the mention or NEW. Return JSON: {\"link_to\": \"<id|NEW>\", \"confidence\": 0..1}"
LINK_USER = """
Mention:
name: {name}
type: {etype}
evidence: "{evidence}"Candidates:
{cands}Rules:
- If none is a clear match, answer NEW.
- Prefer exact/near-exact alias match; consider type and attrs if present.
"""link_prompt = ChatPromptTemplate.from_messages([("system", LINK_SYSTEM), ("user", LINK_USER)])def link_with_llm(name: str, etype: str, evidence: str, cands: List[CanonicalEntity]) -> Tuple[str, float]:if not cands:return "NEW", 1.0cands_txt = "\n".join([f"- id={c.id}, name={c.name}, type={c.type}, aliases={list(c.aliases)[:3]}" for c in cands])msg = link_prompt.format_messages(name=name, etype=etype, evidence=evidence[:120], cands=cands_txt)out = llm.invoke(msg)try:data = json.loads(out.content)return data.get("link_to", "NEW"), float(data.get("confidence", 0.7))except Exception:return "NEW", 0.6# ---------- 汇总关系 ----------
@dataclass
class RelAgg:src: strpredicate: strtgt: strevidences: list = field(default_factory=list) # [{doc_id, chunk_id, text, offsets}]when: set = field(default_factory=set)where: set = field(default_factory=set)props: dict = field(default_factory=dict)confidence_list: list = field(default_factory=list)def combine_confidences(ps: List[float]) -> float:from math import prodps = [max(0.0, min(1.0, p)) for p in ps if p is not None]return 1 - prod([(1-p) for p in ps]) if ps else 0.0# ---------- 入库 ----------
def write_to_neo4j(entities: Dict[str, CanonicalEntity], relations: Dict[Tuple[str,str,str], RelAgg], source_doc_id: str):node_cypher = """UNWIND $nodes AS nMERGE (e:Entity {id:n.id})ON CREATE SET e.name=n.name, e.type=n.type, e.aliases=n.aliases, e += n.attrs, e.createdAt=timestamp(), e.updatedAt=timestamp()ON MATCH SET e.name=coalesce(n.name, e.name), e.type=coalesce(n.type, e.type), e.aliases=apoc.coll.toSet(coalesce(e.aliases, []) + n.aliases), e += n.attrs, e.updatedAt=timestamp()"""rel_cypher = """UNWIND $rels AS rMATCH (s:Entity {id:r.src})MATCH (t:Entity {id:r.tgt})CALL apoc.merge.relationship(s, r.predicate, {}, {sourceDocId:$source_doc_id, evidences:r.evidences, when:r.when, where:r.where, props:r.props, confidence:r.confidence}, t)YIELD relSET rel.updatedAt=timestamp()RETURN count(rel)"""with driver.session() as sess:nodes = [{"id": e.id, "name": e.name, "type": e.type, "attrs": e.attrs, "aliases": list(e.aliases)} for e in entities.values()]sess.run(node_cypher, parameters={"nodes": nodes})rels = []for (src,pred,tgt), agg in relations.items():rels.append({"src": src, "predicate": pred, "tgt": tgt,"evidences": agg.evidences, "when": list(agg.when), "where": list(agg.where),"props": agg.props, "confidence": combine_confidences(agg.confidence_list)})if rels:sess.run(rel_cypher, parameters={"rels": rels, "source_doc_id": source_doc_id})# ---------- 主流程 ----------
def build_graph_from_text(doc_id: str, raw_text: str, source_doc_id: str):chunks = split_text(raw_text)total = len(chunks)# Pass-1: 分块抽取chunk_outputs: List[ChunkIE] = []for i, chunk in enumerate(chunks, 1):prev_tail = chunks[i-2][-300:] if i-2 >= 0 else ""next_head = chunks[i][:300] if i < total else ""neighbor = build_neighbor_context(prev_tail, next_head)ie = extract_chunk_ie(doc_id, chunk, i, total, neighbor)chunk_outputs.append(ie)# Pass-2: 实体对齐entities: Dict[str, CanonicalEntity] = {}mention2entity: Dict[str, str] = {} # mention_id -> canonical_idfor ie in chunk_outputs:for m in ie.mentions:# 候选cands = candidates_for(entities, m.text, m.type, top=5)decision, conf = link_with_llm(m.text, m.type, m.text, cands)if decision == "NEW":cid = stable_id(m.text, m.type)entities[cid] = CanonicalEntity(id=cid, name=m.text, type=m.type, attrs=m.attrs, aliases={m.text})else:cid = decision# 记录别名entities[cid].aliases.add(m.text)mention2entity[m.id] = cid# Pass-3: 关系汇总relations: Dict[Tuple[str,str,str], RelAgg] = {}for idx, ie in enumerate(chunk_outputs, 1):for r in ie.rel_mentions:s = mention2entity.get(r.subj_mention_id)t = mention2entity.get(r.obj_mention_id)if not s or not t: continuekey = (s, r.predicate, t)if key not in relations:relations[key] = RelAgg(src=s, predicate=r.predicate, tgt=t)agg = relations[key]agg.evidences.append({"doc_id": doc_id, "chunk_id": idx,"text": r.evidence[:180], "offsets": [r.char_start, r.char_end]})if r.when: agg.when.add(r.when)if r.where: agg.where.add(r.where)agg.confidence_list.append(r.confidence)# 可选择把金额/比例等 props 合并为列表或选最大置信度那条for k, v in (r.props or {}).items():if k not in agg.props: agg.props[k] = set()agg.props[k].add(v)# 将 set 转 listfor agg in relations.values():for k,v in list(agg.props.items()):if isinstance(v, set): agg.props[k] = list(v)# Pass-4: 入库write_to_neo4j(entities, relations, source_doc_id=source_doc_id)
五、三份关键提示词模板
1. “分块抽取”提示词(上文 IE_SYSTEM
/IE_USER
已给)
要求只输出 JSON,必须给 offsets,关系要用提及ID指向主客体。
predicate
控制在规范动词集合(自定义枚举)内,减少花样词。
示例枚举(可放到 System):
Entity types: Person, Org, Product, Event, Location, Law, Concept
Predicates: FOUNDED, ACQUIRED, MERGED_WITH, PART_OF, WORKS_AT, LOCATED_IN, PUBLISHED, REGULATED_BY, INVESTED_IN
2. “实体链接判定”提示词(上文 LINK_SYSTEM
/LINK_USER
已给)
输出只能是
{"link_to":"<id|NEW>","confidence":0..1}
。传入候选列表,LLM只做“选择或NEW”,避免自由发挥。
3. “全局校对/补关系(可选)”
对全局汇总后的草图(top-N 实体与关系),请 LLM 做一致性检查与遗漏补全(例如从证据语句推导 FOUNDED
)。
注意:把新增的标为 derived=true
并保留“来源关系键”。
六、Neo4j 图模式与溯源
建议最小可行模式:
(:Entity {id, name, type, aliases[], ...})
(:Entity)-[:REL_TYPE {sourceDocId, evidences[], when[], where[], props{}, confidence, derived?}]->(:Entity)
(可选)
:Evidence
节点太重,先用数组属性即可;等数据量/法务需求提升后再拆。
约束与索引:
CREATE CONSTRAINT entity_id IF NOT EXISTS FOR (n:Entity) REQUIRE n.id IS UNIQUE;
CREATE INDEX ent_type IF NOT EXISTS FOR (n:Entity) ON (n.type);
七、质量与鲁棒性(实践要点)
证据先行:强制输出 evidence 与 offsets,后续任何纠错都有依据。
两阶段思路:先“列举事实片段”,再“汇总定论”。
别名池:
aliases
越丰富,链接越稳;可在图中周期性把相同邮箱/域名/注册号的节点合并。去重与幂等:写库时用
MERGE
+ APOCapoc.merge.relationship
;关系以(src, pred, tgt)
作为幂等键。跨切片缺词:使用邻近窗口与Top-K 全局实体摘要当“轻记忆”。
冲突仲裁:置信度聚合 + “最高权重证据优先”。
可观测性:记录每块耗时/失败/重试次数、每文档提及/关系密度分布。
成本控制:
chain.batch(chunks, config={"max_concurrency": 6})
并发;重复文本用缓存;只对“模糊提及”调用链接判定 LLM。
八、你可以直接复用的最小运行清单
输入:原始文本字符串
raw_text
步骤:
chunks = split_text(raw_text)
遍历
chunks
→extract_chunk_ie(...)
→ 累积mentions, rel_mentions
link_with_llm(...)
把 mention 绑定到全局实体合并成全局关系
relations[(src,pred,tgt)]
write_to_neo4j(...)
MERGE 入库
验证:
MATCH (s:Entity)-[r]->(t:Entity) RETURN s.name, type(r), t.name, r.confidence, r.evidences[0..2] LIMIT 20;