
Dissecting the most critical step in GraphRAG: how chunked text is fed to an LLM and assembled into a global relation graph

1. Overall approach: a layered pipeline

  • Pass-1: Per-chunk extraction (Chunk IE)
    Run mention-level extraction on every chunk: entity mentions, relation mentions, evidence (source snippet + offsets), and confidence. Do not rush to "reach verdicts" at this stage; just surface local facts together with their evidence.

  • Pass-2: Global alignment (Entity Linking / Coref)
    Map each mention to a global (canonical) entity. This step resolves same-name/alias/pronoun problems; it is what glues the "fragments" onto "one person / one company".

  • Pass-3: Relation assembly & consolidation
    With global entity IDs as endpoints, merge all relation mentions into global relations (dedupe, aggregate evidence, merge timestamps/attributes, compute an aggregate confidence), annotating each with its sources and evidence.

  • Pass-4: Persistence (Neo4j)
    Idempotent MERGE writes: (:Entity {id}) nodes and (s)-[:TYPE {…props}]->(t) relationships, preserving evidence[], source_doc_id, chunk_id, and offset.

Key point: Pass-1 stays "fine-grained + evidence-rich"; only Pass-2/3 do the "abstraction and merging". This is the most stable, auditable, and traceable arrangement.
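As orientation for the full code in Section 4, here is a minimal sketch of how the four passes chain together. link_all and consolidate are shorthand names of my own for the inline Pass-2/Pass-3 loops inside build_graph_from_text below; the real signatures are richer.

def pipeline(doc_id: str, raw_text: str) -> None:
    chunks = split_text(raw_text)                         # chunking
    ies = [extract_chunk_ie(doc_id, c, i, len(chunks), neighbor="")
           for i, c in enumerate(chunks, 1)]              # Pass-1: chunk IE
    entities, mention2entity = link_all(ies)              # Pass-2: entity linking
    relations = consolidate(ies, mention2entity)          # Pass-3: relation assembly
    write_to_neo4j(entities, relations, doc_id)           # Pass-4: idempotent MERGE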


2. How to feed the chunks to the LLM "correctly"

1. Splitting strategy (avoid broken context)

  • RecursiveCharacterTextSplitter: suggested chunk_size = 800~1200 tokens, chunk_overlap = 150~250 tokens.

  • Split along chapters/headings/paragraphs/punctuation first; where necessary let the overlap span boundaries, so that a relation's verb, subject, and object land in the same chunk, or at least in adjacent chunks.

2. Context packaging for each chunk (very important)

When submitting chunk i, attach two kinds of "light memory":

  • Neighbor window: the last 2~3 sentences of chunk_{i-1} plus, optionally, the first 2~3 sentences of chunk_{i+1}, to guard against sentences that straddle chunk boundaries.

  • Global entity "summary index" (Top-K): this exists once Pass-1 has run; if you don't have it on the first round, a per-chunk cache is enough. On the second round, attach summaries of the global entities discovered so far (a one-line card each) as extra context, so the model can bind aliases/pronouns to existing entities while still inside the chunk.

Light-memory example (attached with the prompt):

Known entities so far (id | name | type | key attrs):
E001 | Alice | Person | {nationality: "SG"}
E007 | Acme Corp | Org | {hq: "Singapore"}
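
A small helper for rendering such cards from the entity registry built in Pass-2 below. format_entity_cards is an illustrative name of my own; CanonicalEntity is defined in Section 4, and a production version should rank entities by relevance to the current chunk rather than take the first top_k entries.

import json

def format_entity_cards(entities, top_k: int = 20) -> str:
    # Render "id | name | type | key attrs" cards, one entity per line.
    # `entities` is the Dict[str, CanonicalEntity] registry from Pass-2.
    lines = ["Known entities so far (id | name | type | key attrs):"]
    for e in list(entities.values())[:top_k]:
        lines.append(f"{e.id} | {e.name} | {e.type} | {json.dumps(e.attrs, ensure_ascii=False)}")
    return "\n".join(lines)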

3. The output structure must be mention-level + evidence

To guard against hallucination and mis-linking, require the model to locate its evidence in the source text:

  • mention: text_span (a 10~40-character excerpt), char_start, char_end

  • relation-mention: subj_mention_id, obj_mention_id, predicate, evidence_span, when/where, confidence

This is what makes deduplication and conflict arbitration possible during Pass-3 aggregation. A sample output for one chunk follows.
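
For concreteness, a hypothetical Pass-1 result for one chunk might look like this (the field names match the ChunkIE schema in Section 4; the values are invented):

{
  "chunk_id": 3,
  "mentions": [
    {"id": "m1", "text": "Alice", "type": "Person", "char_start": 12, "char_end": 17, "confidence": 0.9},
    {"id": "m2", "text": "Acme Corp", "type": "Org", "char_start": 31, "char_end": 40, "confidence": 0.95}
  ],
  "rel_mentions": [
    {"id": "r1", "subj_mention_id": "m1", "obj_mention_id": "m2", "predicate": "WORKS_AT",
     "evidence": "Alice joined Acme Corp in 2021", "char_start": 12, "char_end": 42,
     "when": "2021", "confidence": 0.85}
  ]
}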


3. The algorithm for building "global relations" across chunks

1. Entity alignment (Entity Linking / Coreference)

Candidate generation (the faster the better):

  1. Rule matching: normalize strings (strip whitespace, fold case, convert full-width to half-width, drop company suffixes, etc.), then exact/approximate matching (rapidfuzz works well); a normalization sketch follows this list.

  2. Key attributes: registration number / ISBN / email / URL / place-name hierarchy, and so on.

  3. (Optional) vector similarity: embed name + type + attrs and take the Top-K candidates.
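
A minimal sketch of that normalization, assuming the suffix list below; extend both the list and the character folding for your own domain.

import unicodedata

COMPANY_SUFFIXES = ("股份有限公司", "有限公司", "inc.", "corp.", "ltd.", "llc")  # illustrative; longest first

def normalize_name(s: str) -> str:
    # NFKC folds full-width characters to half-width (e.g. Ａ -> A, （ -> ( ).
    s = unicodedata.normalize("NFKC", s)
    s = s.lower().replace(" ", "")
    for suf in COMPANY_SUFFIXES:
        if s.endswith(suf):
            s = s[: -len(suf)]
            break
    return s

normalize_name("Acme Corp.")      # -> "acmecorp." -> "acme"
normalize_name("阿克米 有限公司")   # -> "阿克米"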

Candidate adjudication (the LLM only "chooses or answers NEW", never free-forms):

  • Send the candidate list (at most 5) plus the current mention's evidence snippet to the LLM, and force it to return {link_to: <candidate_id | "NEW">, confidence}.

  • On "NEW", generate a canonical_id for it (a hash) and register its aliases[] containing the current mention text.

The adjudication prompt is the "entity-link adjudication" template in Section 5.

2. Relation consolidation

Once you have subj_global_id and obj_global_id, merge every relation mention that shares the same endpoint pair and relation type:

  • Relation key: (src_id, predicate, tgt_id)

  • Evidence aggregation: keep a deduplicated evidence[]; merge attributes such as time/place/amount into most-credible sets (e.g. dates[]).

  • Confidence aggregation: e.g. noisy-OR, p_agg = 1 - ∏(1 - p_i) (two mentions at 0.7 and 0.8 aggregate to 1 - 0.3 × 0.2 = 0.94); or an evidence-weighted average.

  • Time range: if timestamps appear in several places, synthesize valid_from/valid_to or dates[].

  • Sources and traceability: keep source_doc_id, chunk_id, mention_ids[], offsets[].

3. Generating inverse/derived relations

  • For explicitly declared invertible relations (e.g. ACQUIRED ↔ WAS_ACQUIRED_BY), auto-generate the inverse edge.

  • Rule inference (e.g. "founder = founded Y in year X") produces FOUNDED; but mark it derived=true clearly and attach the evidence chain (the source relation IDs) to the properties. A sketch of the inverse-edge step follows.
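
A minimal sketch of that step over the relations dict built in Pass-3. The INVERSE_OF table is illustrative; RelAgg is the dataclass defined in Section 4.

INVERSE_OF = {"ACQUIRED": "WAS_ACQUIRED_BY", "PART_OF": "HAS_PART"}  # illustrative

def add_inverse_relations(relations: dict) -> None:
    # Add the inverse edge for each invertible relation, marked derived=True
    # and carrying a pointer back to its source relation key.
    for (src, pred, tgt), agg in list(relations.items()):
        inv = INVERSE_OF.get(pred)
        if inv is None or (tgt, inv, src) in relations:
            continue
        inv_agg = RelAgg(src=tgt, predicate=inv, tgt=src)
        inv_agg.evidences = list(agg.evidences)
        inv_agg.confidence_list = list(agg.confidence_list)
        inv_agg.props = {"derived": True, "derived_from": f"{src}|{pred}|{tgt}"}
        relations[(tgt, inv, src)] = inv_agg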


4. Key code

The code below uses LangChain 0.2+ style and demonstrates:

  1) chunking + batch submission; 2) mention-level output; 3) entity-link adjudication; 4) relation consolidation; 5) writing to Neo4j.
  The model is swappable between OpenAI and local Ollama; both variants are kept below (enable one).

# --- deps ---
# pip install "langchain>=0.2" langchain-community langchain-core pydantic neo4j tiktoken rapidfuzz
# pip install openai   # if using OpenAI
# pip install ollama   # if using local Ollama

from __future__ import annotations
import os, json, hashlib
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, field
from pydantic import BaseModel, ValidationError
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from rapidfuzz import fuzz
from neo4j import GraphDatabase

USE_OPENAI = False
if USE_OPENAI:
    from langchain_openai import ChatOpenAI
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
else:
    from langchain_community.chat_models import ChatOllama
    llm = ChatOllama(model="qwen2.5:14b", temperature=0)

# ---- Neo4j driver ----
driver = GraphDatabase.driver(
    os.getenv("NEO4J_URI", "bolt://localhost:7687"),
    auth=(os.getenv("NEO4J_USER", "neo4j"), os.getenv("NEO4J_PASSWORD", "password")),
)

# ---------- Data structures ----------
class Mention(BaseModel):
    id: str = ""                 # backfilled if the model leaves it empty
    text: str
    type: str                    # predicted entity type
    char_start: int
    char_end: int
    attrs: Dict[str, Any] = {}
    confidence: float = 0.8

class RelMention(BaseModel):
    id: str = ""
    subj_mention_id: str
    obj_mention_id: str
    predicate: str
    evidence: str
    char_start: int
    char_end: int
    when: Optional[str] = None
    where: Optional[str] = None
    props: Dict[str, Any] = {}
    confidence: float = 0.7

class ChunkIE(BaseModel):
    chunk_id: int
    mentions: List[Mention] = []
    rel_mentions: List[RelMention] = []

# ---------- Splitting ----------
def split_text(text: str, chunk_size=1200, chunk_overlap=200) -> List[str]:
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", "。", "!", "?", ";", ",", " ", ""],
    )
    return splitter.split_text(text)

# ---------- Stable IDs ----------
def stable_id(*parts) -> str:
    key = "|".join(json.dumps(p, ensure_ascii=False, sort_keys=True) for p in parts)
    return hashlib.sha256(key.encode("utf-8")).hexdigest()[:24]

# ---------- Mention-level extraction prompt ----------
IE_SYSTEM = """You are a precise IE system. Extract entity mentions and relation mentions.
Return ONLY JSON matching the schema. Use evidence spans from the text (char_start/end)."""

IE_USER = """
DOC_ID: {doc_id}
CHUNK_ID: {chunk_id}/{total}
NEIGHBOR_CONTEXT:
<<<
{neighbor}
>>>
TEXT:
<<<
{chunk}
>>>
Schema:
{schema}
Guidelines:
- Mentions: short span text from this chunk; include char_start/char_end offsets within THIS chunk.
- Relations: use subj_mention_id/obj_mention_id; keep predicate concise (ACQUIRED, FOUNDED, WORKS_AT, PART_OF, etc.)
- Include evidence (a short quote) and optional when/where if present.
- No fabrication. Only facts supported by this text.
"""

ie_prompt = ChatPromptTemplate.from_messages([("system", IE_SYSTEM), ("user", IE_USER)])
ie_parser = JsonOutputParser(pydantic_object=ChunkIE)

def build_neighbor_context(prev_tail: str, next_head: str) -> str:
    sep = "\n---\n" if (prev_tail and next_head) else ""
    return (prev_tail or "").strip() + sep + (next_head or "").strip()

def extract_chunk_ie(doc_id: str, chunk: str, chunk_id: int, total: int, neighbor: str) -> ChunkIE:
    msg = ie_prompt.format_messages(
        doc_id=doc_id, chunk_id=chunk_id, total=total,
        neighbor=neighbor, chunk=chunk,
        schema=json.dumps(ChunkIE.schema(), ensure_ascii=False, indent=2),
    )
    raw = llm.invoke(msg)
    text = getattr(raw, "content", str(raw))
    try:
        data = ChunkIE.parse_obj(ie_parser.parse(text))
    except ValueError:  # covers OutputParserException and pydantic ValidationError
        # Salvage the outermost JSON object from a noisy reply
        start, end = text.find("{"), text.rfind("}")
        data = ChunkIE.parse_obj(json.loads(text[start:end + 1]))
    # Backfill mention/relation ids if the model left them empty
    for m in data.mentions:
        if not m.id:
            m.id = stable_id(doc_id, chunk_id, m.text, m.type, m.char_start, m.char_end)
    for r in data.rel_mentions:
        if not r.id:
            r.id = stable_id(doc_id, chunk_id, r.subj_mention_id, r.predicate,
                             r.obj_mention_id, r.char_start, r.char_end)
    return data

# ---------- Global entity alignment ----------
@dataclass
class CanonicalEntity:
    id: str
    name: str
    type: str
    attrs: Dict[str, Any] = field(default_factory=dict)
    aliases: set = field(default_factory=set)

def name_key(s: str) -> str:
    return s.lower().replace(" ", "").replace("（", "(").replace("）", ")")

def candidates_for(entities: Dict[str, CanonicalEntity], name: str, etype: str, top=5) -> List[CanonicalEntity]:
    # Fast pre-filter: prefer entities of the same type
    same_type = [e for e in entities.values() if e.type == etype]
    # Fuzzy score against the canonical name and every alias
    scored = []
    for e in same_type:
        score = max([fuzz.partial_ratio(name, e.name)]
                    + [fuzz.partial_ratio(name, a) for a in e.aliases])
        scored.append((e, score))
    scored.sort(key=lambda x: x[1], reverse=True)
    return [e for e, score in scored[:top] if score >= 80]

# Braces are doubled so the prompt template does not treat them as variables
LINK_SYSTEM = ('Choose the best candidate id for the mention or NEW. '
               'Return JSON: {{"link_to": "<id|NEW>", "confidence": 0..1}}')

LINK_USER = """
Mention:
name: {name}
type: {etype}
evidence: "{evidence}"
Candidates:
{cands}
Rules:
- If none is a clear match, answer NEW.
- Prefer exact/near-exact alias match; consider type and attrs if present.
"""

link_prompt = ChatPromptTemplate.from_messages([("system", LINK_SYSTEM), ("user", LINK_USER)])

def link_with_llm(name: str, etype: str, evidence: str, cands: List[CanonicalEntity]) -> Tuple[str, float]:
    if not cands:
        return "NEW", 1.0
    cands_txt = "\n".join(
        f"- id={c.id}, name={c.name}, type={c.type}, aliases={list(c.aliases)[:3]}" for c in cands
    )
    msg = link_prompt.format_messages(name=name, etype=etype, evidence=evidence[:120], cands=cands_txt)
    out = llm.invoke(msg)
    try:
        data = json.loads(out.content)
        return data.get("link_to", "NEW"), float(data.get("confidence", 0.7))
    except Exception:
        return "NEW", 0.6

# ---------- Relation consolidation ----------
@dataclass
class RelAgg:
    src: str
    predicate: str
    tgt: str
    evidences: list = field(default_factory=list)   # [{doc_id, chunk_id, text, offsets}]
    when: set = field(default_factory=set)
    where: set = field(default_factory=set)
    props: dict = field(default_factory=dict)
    confidence_list: list = field(default_factory=list)

def combine_confidences(ps: List[float]) -> float:
    # Noisy-OR: p_agg = 1 - prod(1 - p_i)
    from math import prod
    ps = [max(0.0, min(1.0, p)) for p in ps if p is not None]
    return 1 - prod(1 - p for p in ps) if ps else 0.0

# ---------- Persistence ----------
def write_to_neo4j(entities: Dict[str, CanonicalEntity],
                   relations: Dict[Tuple[str, str, str], RelAgg],
                   source_doc_id: str):
    node_cypher = """
    UNWIND $nodes AS n
    MERGE (e:Entity {id: n.id})
    ON CREATE SET e.name = n.name, e.type = n.type, e.aliases = n.aliases,
                  e += n.attrs, e.createdAt = timestamp(), e.updatedAt = timestamp()
    ON MATCH  SET e.name = coalesce(n.name, e.name), e.type = coalesce(n.type, e.type),
                  e.aliases = apoc.coll.toSet(coalesce(e.aliases, []) + n.aliases),
                  e += n.attrs, e.updatedAt = timestamp()
    """
    rel_cypher = """
    UNWIND $rels AS r
    MATCH (s:Entity {id: r.src})
    MATCH (t:Entity {id: r.tgt})
    CALL apoc.merge.relationship(s, r.predicate, {},
        {sourceDocId: $source_doc_id, evidences: r.evidences, when: r.when,
         where: r.where, props: r.props, confidence: r.confidence}, t)
    YIELD rel
    SET rel.updatedAt = timestamp()
    RETURN count(rel)
    """
    with driver.session() as sess:
        nodes = [{"id": e.id, "name": e.name, "type": e.type,
                  "attrs": e.attrs, "aliases": list(e.aliases)} for e in entities.values()]
        sess.run(node_cypher, parameters={"nodes": nodes})
        rels = []
        for (src, pred, tgt), agg in relations.items():
            rels.append({
                "src": src, "predicate": pred, "tgt": tgt,
                # Neo4j properties cannot hold nested maps: store them as JSON strings
                "evidences": [json.dumps(ev, ensure_ascii=False) for ev in agg.evidences],
                "when": list(agg.when), "where": list(agg.where),
                "props": json.dumps(agg.props, ensure_ascii=False),
                "confidence": combine_confidences(agg.confidence_list),
            })
        if rels:
            sess.run(rel_cypher, parameters={"rels": rels, "source_doc_id": source_doc_id})

# ---------- Main pipeline ----------
def build_graph_from_text(doc_id: str, raw_text: str, source_doc_id: str):
    chunks = split_text(raw_text)
    total = len(chunks)

    # Pass-1: per-chunk extraction
    chunk_outputs: List[ChunkIE] = []
    for i, chunk in enumerate(chunks, 1):
        prev_tail = chunks[i - 2][-300:] if i >= 2 else ""
        next_head = chunks[i][:300] if i < total else ""
        neighbor = build_neighbor_context(prev_tail, next_head)
        chunk_outputs.append(extract_chunk_ie(doc_id, chunk, i, total, neighbor))

    # Pass-2: entity alignment
    entities: Dict[str, CanonicalEntity] = {}
    mention2entity: Dict[str, str] = {}   # mention_id -> canonical_id
    for ie in chunk_outputs:
        for m in ie.mentions:
            cands = candidates_for(entities, m.text, m.type, top=5)
            decision, conf = link_with_llm(m.text, m.type, m.text, cands)
            if decision == "NEW" or decision not in entities:   # guard against hallucinated ids
                cid = stable_id(m.text, m.type)
                entities[cid] = CanonicalEntity(id=cid, name=m.text, type=m.type,
                                                attrs=m.attrs, aliases={m.text})
            else:
                cid = decision
                entities[cid].aliases.add(m.text)   # record the alias
            mention2entity[m.id] = cid

    # Pass-3: relation consolidation
    relations: Dict[Tuple[str, str, str], RelAgg] = {}
    for idx, ie in enumerate(chunk_outputs, 1):
        for r in ie.rel_mentions:
            s = mention2entity.get(r.subj_mention_id)
            t = mention2entity.get(r.obj_mention_id)
            if not s or not t:
                continue
            key = (s, r.predicate, t)
            if key not in relations:
                relations[key] = RelAgg(src=s, predicate=r.predicate, tgt=t)
            agg = relations[key]
            agg.evidences.append({"doc_id": doc_id, "chunk_id": idx,
                                  "text": r.evidence[:180],
                                  "offsets": [r.char_start, r.char_end]})
            if r.when:
                agg.when.add(r.when)
            if r.where:
                agg.where.add(r.where)
            agg.confidence_list.append(r.confidence)
            # Merge props (amounts/ratios/...) into sets; alternatively keep only
            # the value from the highest-confidence mention
            for k, v in (r.props or {}).items():
                agg.props.setdefault(k, set()).add(v)

    # Convert prop sets to lists for serialization
    for agg in relations.values():
        for k, v in list(agg.props.items()):
            if isinstance(v, set):
                agg.props[k] = list(v)

    # Pass-4: persistence
    write_to_neo4j(entities, relations, source_doc_id=source_doc_id)

5. Three key prompt templates

1. The "chunk extraction" prompt (IE_SYSTEM/IE_USER above)

  • Require JSON-only output and mandatory offsets; relations must point at subject/object via mention IDs.

  • Keep predicate inside a canonical verb set (a custom enum) to cut down on creative synonyms; a validator sketch follows the enum below.

Example enums (can go into the System message):

Entity types: Person, Org, Product, Event, Location, Law, Concept
Predicates: FOUNDED, ACQUIRED, MERGED_WITH, PART_OF, WORKS_AT, LOCATED_IN, PUBLISHED, REGULATED_BY, INVESTED_IN
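
One way to enforce that enum on the parsing side is a pydantic validator on RelMention from Section 4 (a sketch; PREDICATES mirrors the list above):

from pydantic import validator

PREDICATES = {"FOUNDED", "ACQUIRED", "MERGED_WITH", "PART_OF", "WORKS_AT",
              "LOCATED_IN", "PUBLISHED", "REGULATED_BY", "INVESTED_IN"}

class StrictRelMention(RelMention):
    # Reject predicates outside the canonical set at parse time.
    @validator("predicate")
    def predicate_in_enum(cls, v: str) -> str:
        v = v.strip().upper().replace(" ", "_")
        if v not in PREDICATES:
            raise ValueError(f"predicate {v!r} not in canonical set")
        return v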

2. The "entity-link adjudication" prompt (LINK_SYSTEM/LINK_USER above)

  • The output may only be {"link_to":"<id|NEW>","confidence":0..1}.

  • Pass in the candidate list; the LLM only chooses or answers NEW, never free-forms.

3. "Global proofreading / relation completion" (optional)

Hand the consolidated draft graph (top-N entities and relations) to the LLM for a consistency check and gap-filling (e.g. deriving FOUNDED from evidence sentences).
Note: mark everything it adds with derived=true and keep the "source relation keys".


6. Neo4j graph schema and provenance

A minimal viable schema:

  • (:Entity {id, name, type, aliases[], ...})

  • (:Entity)-[:REL_TYPE {sourceDocId, evidences[], when[], where[], props{}, confidence, derived?}]->(:Entity)

  • (Optional) Dedicated :Evidence nodes are heavyweight; start with array properties, and split them out once data volume or legal/compliance requirements grow.

Constraints and indexes:

CREATE CONSTRAINT entity_id IF NOT EXISTS FOR (n:Entity) REQUIRE n.id IS UNIQUE;
CREATE INDEX ent_type IF NOT EXISTS FOR (n:Entity) ON (n.type);

7. Quality and robustness (practical points)

  1. Evidence first: force evidence and offsets in the output, so every later correction has something to stand on.

  2. Two-stage thinking: first "enumerate fact fragments", then "consolidate into verdicts".

  3. Alias pool: the richer aliases is, the more stable linking becomes; periodically merge graph nodes that share the same email/domain/registration number (a merge sketch follows this list).

  4. Dedup and idempotency: write with MERGE plus APOC's apoc.merge.relationship; use (src, pred, tgt) as the idempotency key for relations.

  5. Words missing across chunks: use the neighbor window and the Top-K global entity summaries as "light memory".

  6. Conflict arbitration: confidence aggregation plus "highest-weight evidence wins".

  7. Observability: log per-chunk latency/failures/retries, and per-document mention/relation density distributions.

  8. Cost control: run chunks concurrently with chain.batch(chunks, config={"max_concurrency": 6}); cache repeated text; call the link-adjudication LLM only for "ambiguous mentions" (a batching sketch follows the merge sketch below).
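
For point 3, a sketch of a periodic merge job using APOC. It assumes an email property on :Entity; apoc.refactor.mergeNodes folds the later nodes into the first one, combining properties and re-pointing relationships.

MERGE_DUPES_CYPHER = """
MATCH (e:Entity)
WHERE e.email IS NOT NULL
WITH e.email AS email, collect(e) AS nodes
WHERE size(nodes) > 1
CALL apoc.refactor.mergeNodes(nodes, {properties: "combine", mergeRels: true})
YIELD node
RETURN count(node)
"""

def merge_duplicate_entities():
    # Fold Entity nodes sharing one email into a single node.
    with driver.session() as sess:
        sess.run(MERGE_DUPES_CYPHER)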
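For point 8, a sketch of concurrent Pass-1 calls via the LCEL batch API, reusing ie_prompt, llm, and ChunkIE from Section 4 (parsing of the raw replies is omitted here):

def extract_all_chunks(doc_id: str, chunks: List[str]) -> List[str]:
    # Fan the per-chunk inputs out across up to 6 concurrent model calls.
    chain = ie_prompt | llm
    inputs = [{
        "doc_id": doc_id, "chunk_id": i, "total": len(chunks),
        "neighbor": "", "chunk": c,
        "schema": json.dumps(ChunkIE.schema(), ensure_ascii=False),
    } for i, c in enumerate(chunks, 1)]
    outs = chain.batch(inputs, config={"max_concurrency": 6})
    return [o.content for o in outs]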


8. A minimal runnable checklist you can reuse directly

  • Input: a raw text string raw_text

  • Steps:

    1. chunks = split_text(raw_text)

    2. Iterate over chunks, extract_chunk_ie(...) → accumulate mentions, rel_mentions

    3. link_with_llm(...) binds each mention to a global entity

    4. Merge into global relations relations[(src,pred,tgt)]

    5. write_to_neo4j(...) MERGE into the store

  • Verify:

    MATCH (s:Entity)-[r]->(t:Entity) RETURN s.name, type(r), t.name, r.confidence, r.evidences[0..2] LIMIT 20;
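
Putting it together, an end-to-end invocation might look like this (the file path and ids are placeholders):

if __name__ == "__main__":
    raw_text = open("docs/sample.txt", encoding="utf-8").read()   # placeholder path
    build_graph_from_text(doc_id="doc-001", raw_text=raw_text, source_doc_id="doc-001")
    print("Graph built; verify with the Cypher query above.")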
    