当前位置: 首页 > news >正文

基于Python 批量导入实体与关系到 Neo4j 数据库的完整实践

引言

在知识图谱构建过程中,将结构化或半结构化数据导入到图数据库是关键步骤之一。本文将详细介绍如何使用 Python 解析自定义格式数据,并通过 Neo4j 官方驱动将实体和关系批量导入到 Neo4j 数据库中,帮助你快速搭建自己的知识图谱。我也在最后附上完整代码,可直接运行。

在知识图谱项目中,我们经常需要处理各种格式的数据并将其转换为图数据库中的实体(Entities)和关系(Relationships)。本项目需要解决:

  1. 解析自定义分隔符的原始数据,区分实体和关系
  2. 建立与 Neo4j 数据库的连接
  3. 将解析后的实体和关系批量写入 Neo4j
  4. 确保数据导入的准确性和效率

一、环境依赖与说明

  • 图数据库:Neo4j - 最流行的开源图数据库,适合存储和查询复杂关系数据
  • 编程语言:Python - 强大的数据处理能力和丰富的库支持
  • 驱动库:neo4j - Neo4j 官方 Python 驱动,提供高效的数据库交互能力

二代码解读

1. 导入依赖与配置连接信息

首先需要导入必要的库并配置 Neo4j 连接信息:

from neo4j import GraphDatabase# Neo4j 连接配置
uri = "neo4j://181.149.205.11:14802"
user = "neo4j"
password = "your_password"
database = "neo4j"

2. 定义数据分隔符

原始数据使用了自定义分隔符,我们需要先定义这些分隔符以便正确解析数据:

# 自定义分隔符
TUPLE_DELIMITER = "{tuple_delimiter}"
RECORD_DELIMITER = "{record_delimiter}"
COMPLETION_DELIMITER = "{completion_delimiter}"

3. 原始数据准备

这里是我们需要导入的原始数据,包含实体和关系信息:

# 原始数据
raw_data = '''
(entity{tuple_delimiter}费鲁扎巴德{tuple_delimiter}地理{tuple_delimiter}费鲁扎巴德将奥雷利亚人作为人质扣押)  
{record_delimiter}  
(entity{tuple_delimiter}奥雷利亚{tuple_delimiter}地理{tuple_delimiter}寻求释放人质的国家)  
{record_delimiter}  
(entity{tuple_delimiter}昆塔拉{tuple_delimiter}地理{tuple_delimiter}协商用资金交换人质的国家)  
{record_delimiter}  
{record_delimiter}  
(entity{tuple_delimiter}蒂鲁齐亚{tuple_delimiter}地理{tuple_delimiter}费鲁扎巴德的首都,奥雷利亚人被关押的地方)  
{record_delimiter}  
(entity{tuple_delimiter}克罗哈拉{tuple_delimiter}地理{tuple_delimiter}昆塔拉的首都城市)  
{record_delimiter}  
(entity{tuple_delimiter}卡申{tuple_delimiter}地理{tuple_delimiter}奥雷利亚的首都城市)  
{record_delimiter}  
(entity{tuple_delimiter}萨缪尔·纳马拉{tuple_delimiter}人物{tuple_delimiter}曾在蒂鲁齐亚的阿尔哈米亚监狱服刑的奥雷利亚人)  
{record_delimiter}  
(entity{tuple_delimiter}阿尔哈米亚监狱{tuple_delimiter}地理{tuple_delimiter}位于蒂鲁齐亚的监狱)  
{record_delimiter}  
(entity{tuple_delimiter}杜尔克·巴塔格拉尼{tuple_delimiter}人物{tuple_delimiter}曾被扣为人质的奥雷利亚记者)  
{record_delimiter}  
(entity{tuple_delimiter}梅吉·塔兹巴{tuple_delimiter}人物{tuple_delimiter}布拉蒂纳斯国民和曾被扣为人质的环保主义者)  
{record_delimiter}  
(relationship{tuple_delimiter}费鲁扎巴德{tuple_delimiter}奥雷利亚{tuple_delimiter}费鲁扎巴德与奥雷利亚进行了人质交换谈判{tuple_delimiter}0.2)  
{record_delimiter}  
(relationship{tuple_delimiter}昆塔拉{tuple_delimiter}奥雷利亚{tuple_delimiter}昆塔拉促成了费鲁扎巴德和奥雷利亚之间的人质交换{tuple_delimiter}0.2)  
{record_delimiter}  
(relationship{tuple_delimiter}昆塔拉{tuple_delimiter}费鲁扎巴德{tuple_delimiter}昆塔拉促成了费鲁扎巴德和奥雷利亚之间的人质交换{tuple_delimiter}0.2)  
{record_delimiter}  
(relationship{tuple_delimiter}萨缪尔·纳马拉{tuple_delimiter}阿尔哈米亚监狱{tuple_delimiter}萨缪尔·纳马拉是阿尔哈米亚监狱的囚犯{tuple_delimiter}0.8)  
{record_delimiter}  
(relationship{tuple_delimiter}萨缪尔·纳马拉{tuple_delimiter}梅吉·塔兹巴{tuple_delimiter}萨缪尔·纳马拉和梅吉·塔兹巴在同一次人质释放中被交换{tuple_delimiter}0.2)  
{record_delimiter}  
(relationship{tuple_delimiter}萨缪尔·纳马拉{tuple_delimiter}杜尔克·巴塔格拉尼{tuple_delimiter}萨缪尔·纳马拉和杜尔克·巴塔格拉尼在同一次人质释放中被交换{tuple_delimiter}0.2)  
{record_delimiter}  
(relationship{tuple_delimiter}梅吉·塔兹巴{tuple_delimiter}杜尔克·巴塔格拉尼{tuple_delimiter}梅吉·塔兹巴和杜尔克·巴塔格拉尼在同一次人质释放中被交换{tuple_delimiter}0.2)  
{record_delimiter}  
(relationship{tuple_delimiter}萨缪尔·纳马拉{tuple_delimiter}费鲁扎巴德{tuple_delimiter}萨缪尔·纳马拉是费鲁扎巴德的人质{tuple_delimiter}0.2)  
{record_delimiter}  
(relationship{tuple_delimiter}梅吉·塔兹巴{tuple_delimiter}费鲁扎巴德{tuple_delimiter}梅吉·塔兹巴是费鲁扎巴德的人质{tuple_delimiter}0.2)  
{record_delimiter}  
(relationship{tuple_delimiter}杜尔克·巴塔格拉尼{tuple_delimiter}费鲁扎巴德{tuple_delimiter}杜尔克·巴塔格拉尼是费鲁扎巴德的人质{tuple_delimiter}0.2)  
{completion_delimiter}
'''.format(tuple_delimiter=TUPLE_DELIMITER,record_delimiter=RECORD_DELIMITER,completion_delimiter=COMPLETION_DELIMITER
)

4. 数据解析函数

接下来实现数据解析函数,将原始数据转换为结构化的实体和关系:

def parse_data(data):entities = []relationships = []# 移除完成符record_data = data.replace(COMPLETION_DELIMITER, '') # 提取每条记录并去除空行records = [r.strip() for r in record_data.strip().split(RECORD_DELIMITER) if r.strip()]for record in records:# 去掉最外层的括号content = record[1:-1].strip() if record.startswith('(') else record.strip()parts = content.split(TUPLE_DELIMITER)if parts[0] == "entity":entities.append({"entity_name": parts[1].strip(),"entity_type": parts[2].strip(),"entity_description": parts[3].strip()})elif parts[0] == "relationship":relationships.append({"source_entity": parts[1].strip(),"target_entity": parts[2].strip(),"relationship_description": parts[3].strip(),"relationship_strength": float(parts[4].strip())})return entities, relationships

解析函数的工作流程:

  1. 移除数据中的完成符
  2. 按记录分隔符拆分数据,过滤空记录
  3. 处理每条记录,去除外层括号
  4. 按元组分隔符拆分记录内容
  5. 根据记录类型(实体/关系)将数据存入不同列表
  6. 返回解析后的实体和关系列表

5. Neo4j 数据写入类

实现一个类来处理与 Neo4j 的交互,包括连接管理和数据写入:

class Neo4jGraphData:def __init__(self, uri, user, password, database="neo4j"):"""初始化数据库连接"""self.driver = GraphDatabase.driver(uri, auth=(user, password))self.database = databasedef close(self):"""关闭数据库连接"""self.driver.close()def _create_entities(self, tx, entities):"""创建实体节点"""for entity in entities:tx.run("""MERGE (e:Entity {name: $name})SET e.type = $type, e.description = $description""",name=entity["entity_name"],type=entity["entity_type"],description=entity["entity_description"])def _create_relationships(self, tx, relationships):"""创建实体之间的关系"""for rel in relationships:tx.run("""MATCH (a:Entity {name: $source})MATCH (b:Entity {name: $target})MERGE (a)-[r:relationships]->(b)SET r.description = $description, r.strength = $strength""",source=rel["source_entity"],target=rel["target_entity"],description=rel["relationship_description"],strength=rel["relationship_strength"])def write_entities_and_relationships(self, entities, relationships):"""写入实体和关系到数据库"""with self.driver.session(database=self.database) as session:session.write_transaction(self._create_entities, entities)session.write_transaction(self._create_relationships, relationships)print("✅ 实体和关系已成功写入 Neo4j")

这个类的核心设计点:

  • 使用 MERGE 而非 CREATE 确保不会创建重复节点
  • 将实体和关系的创建分开处理,逻辑更清晰
  • 使用事务(transaction)确保数据一致性
  • 提供明确的连接关闭方法,避免资源泄露

6. 主函数执行流程

最后实现主函数,完成数据解析和导入的完整流程:

if __name__ == "__main__":# 解析数据entities, relationships = parse_data(raw_data)# 打印解析结果(可选)print("Entities:", entities)print("Relationships:", relationships)# 初始化并连接Neo4j数据库writer = Neo4jGraphData(uri, user, password, database)# 写入数据if entities or relationships:writer.write_entities_and_relationships(entities, relationships)else:print("❌ 未解析到任何实体或关系,请检查数据格式。")# 关闭连接writer.close()

三、代码解析与关键技术点

1. 数据解析策略

原始数据采用了类似元组的格式,每条记录用括号包裹,内部使用自定义分隔符分割字段。解析过程中:

  • 首先按记录分隔符拆分数据
  • 对每条记录进行处理,提取核心内容
  • 根据记录类型(entity/relationship)构建不同结构的字典
  • 保留实体类型、描述和关系强度等重要属性

2. Neo4j 写入策略

在写入 Neo4j 时,我们采用了以下策略确保数据质量:

  • 使用 MERGE 语句:避免创建重复节点,当节点已存在时仅更新属性
  • 分步骤写入:先写入所有实体,再写入关系,确保关系引用的实体已存在
  • 事务处理:将所有实体写入作为一个事务,所有关系写入作为另一个事务,保证数据一致性
  • 属性丰富:为节点添加类型和描述属性,为关系添加描述和强度属性

3. 连接管理

通过类的方式封装数据库连接,确保资源正确释放:

  • __init__ 方法中创建连接
  • 提供 close 方法关闭连接
  • 使用 with 语句管理会话生命周期

四、运行结果与验证

运行程序后,如果一切正常,会输出:

✅ 实体和关系已成功写入 Neo4j

你可以通过 Neo4j Browser 执行查询验证数据是否正确导入:

// 查询所有实体
MATCH (n) RETURN n LIMIT 25// 查询所有关系
MATCH ()-[r]->() RETURN r LIMIT 25// 查询特定实体及其关系
MATCH (n:Entity {name: "费鲁扎巴德"})-[r]->(m) RETURN n, r, m

结果图如下:
在这里插入图片描述

五、完整代码

from neo4j import GraphDatabase# Neo4j 连接配置
uri = "neo4j://181.149.205.11:14802"
user = "neo4j"
password = "your_password"
database = "neo4j"# 自定义分隔符
TUPLE_DELIMITER = "{tuple_delimiter}"
RECORD_DELIMITER = "{record_delimiter}"
COMPLETION_DELIMITER = "{completion_delimiter}"# 原始数据(注意:这里保留原始格式)
raw_data = '''(entity{tuple_delimiter}费鲁扎巴德{tuple_delimiter}地理{tuple_delimiter}费鲁扎巴德将奥雷利亚人作为人质扣押)  
{record_delimiter}  
(entity{tuple_delimiter}奥雷利亚{tuple_delimiter}地理{tuple_delimiter}寻求释放人质的国家)  
{record_delimiter}  
(entity{tuple_delimiter}昆塔拉{tuple_delimiter}地理{tuple_delimiter}协商用资金交换人质的国家)  
{record_delimiter}  
{record_delimiter}  
(entity{tuple_delimiter}蒂鲁齐亚{tuple_delimiter}地理{tuple_delimiter}费鲁扎巴德的首都,奥雷利亚人被关押的地方)  
{record_delimiter}  
(entity{tuple_delimiter}克罗哈拉{tuple_delimiter}地理{tuple_delimiter}昆塔拉的首都城市)  
{record_delimiter}  
(entity{tuple_delimiter}卡申{tuple_delimiter}地理{tuple_delimiter}奥雷利亚的首都城市)  
{record_delimiter}  
(entity{tuple_delimiter}萨缪尔·纳马拉{tuple_delimiter}人物{tuple_delimiter}曾在蒂鲁齐亚的阿尔哈米亚监狱服刑的奥雷利亚人)  
{record_delimiter}  
(entity{tuple_delimiter}阿尔哈米亚监狱{tuple_delimiter}地理{tuple_delimiter}位于蒂鲁齐亚的监狱)  
{record_delimiter}  
(entity{tuple_delimiter}杜尔克·巴塔格拉尼{tuple_delimiter}人物{tuple_delimiter}曾被扣为人质的奥雷利亚记者)  
{record_delimiter}  
(entity{tuple_delimiter}梅吉·塔兹巴{tuple_delimiter}人物{tuple_delimiter}布拉蒂纳斯国民和曾被扣为人质的环保主义者)  
{record_delimiter}  
(relationship{tuple_delimiter}费鲁扎巴德{tuple_delimiter}奥雷利亚{tuple_delimiter}费鲁扎巴德与奥雷利亚进行了人质交换谈判{tuple_delimiter}0.2)  
{record_delimiter}  
(relationship{tuple_delimiter}昆塔拉{tuple_delimiter}奥雷利亚{tuple_delimiter}昆塔拉促成了费鲁扎巴德和奥雷利亚之间的人质交换{tuple_delimiter}0.2)  
{record_delimiter}  
(relationship{tuple_delimiter}昆塔拉{tuple_delimiter}费鲁扎巴德{tuple_delimiter}昆塔拉促成了费鲁扎巴德和奥雷利亚之间的人质交换{tuple_delimiter}0.2)  
{record_delimiter}  
(relationship{tuple_delimiter}萨缪尔·纳马拉{tuple_delimiter}阿尔哈米亚监狱{tuple_delimiter}萨缪尔·纳马拉是阿尔哈米亚监狱的囚犯{tuple_delimiter}0.8)  
{record_delimiter}  
(relationship{tuple_delimiter}萨缪尔·纳马拉{tuple_delimiter}梅吉·塔兹巴{tuple_delimiter}萨缪尔·纳马拉和梅吉·塔兹巴在同一次人质释放中被交换{tuple_delimiter}0.2)  
{record_delimiter}  
(relationship{tuple_delimiter}萨缪尔·纳马拉{tuple_delimiter}杜尔克·巴塔格拉尼{tuple_delimiter}萨缪尔·纳马拉和杜尔克·巴塔格拉尼在同一次人质释放中被交换{tuple_delimiter}0.2)  
{record_delimiter}  
(relationship{tuple_delimiter}梅吉·塔兹巴{tuple_delimiter}杜尔克·巴塔格拉尼{tuple_delimiter}梅吉·塔兹巴和杜尔克·巴塔格拉尼在同一次人质释放中被交换{tuple_delimiter}0.2)  
{record_delimiter}  
(relationship{tuple_delimiter}萨缪尔·纳马拉{tuple_delimiter}费鲁扎巴德{tuple_delimiter}萨缪尔·纳马拉是费鲁扎巴德的人质{tuple_delimiter}0.2)  
{record_delimiter}  
(relationship{tuple_delimiter}梅吉·塔兹巴{tuple_delimiter}费鲁扎巴德{tuple_delimiter}梅吉·塔兹巴是费鲁扎巴德的人质{tuple_delimiter}0.2)  
{record_delimiter}  
(relationship{tuple_delimiter}杜尔克·巴塔格拉尼{tuple_delimiter}费鲁扎巴德{tuple_delimiter}杜尔克·巴塔格拉尼是费鲁扎巴德的人质{tuple_delimiter}0.2)  
{completion_delimiter}'''.format(tuple_delimiter=TUPLE_DELIMITER,record_delimiter=RECORD_DELIMITER,completion_delimiter=COMPLETION_DELIMITER
)# ✅ 不修改 parse_data(data)
def parse_data(data):entities = []relationships = []record_data = data.replace(COMPLETION_DELIMITER, '') records = [r.strip() for r in record_data.strip().split(RECORD_DELIMITER) if r.strip()] # 先提取每条记录再去除空行for record in records:content = record[1:-1].strip() if record.startswith('(') else record.strip() # 去掉最外层的括号parts = content.split(TUPLE_DELIMITER)if parts[0] == "entity":entities.append({"entity_name": parts[1].strip(),"entity_type": parts[2].strip(),"entity_description": parts[3].strip()})elif parts[0] == "relationship":relationships.append({"source_entity": parts[1].strip(),"target_entity": parts[2].strip(),"relationship_description": parts[3].strip(),"relationship_strength": float(parts[4].strip())})return entities, relationships# ✅ 使用 Class 实现写入 Neo4j
class Neo4jGraphData:def __init__(self, uri, user, password, database="neo4j"):self.driver = GraphDatabase.driver(uri, auth=(user, password))self.database = databasedef close(self):"""关闭数据库连接"""self.driver.close()def _create_entities(self, tx, entities):"""创建实体节点,如果想添加其它属性,通过这个SET e.type = $type, e.description = $description设置"""for entity in entities:tx.run("""MERGE (e:Entity {name: $name})SET e.type = $type, e.description = $description""",name=entity["entity_name"],type=entity["entity_type"],description=entity["entity_description"])def _create_relationships(self, tx, relationships):"""创建实体之间的关系,如果需要添加其它属性,通过SET r.description = $description, r.strength = $strength设置"""for rel in relationships:tx.run("""MATCH (a:Entity {name: $source})MATCH (b:Entity {name: $target})MERGE (a)-[r:relationships]->(b)SET r.description = $description, r.strength = $strength""",source=rel["source_entity"],target=rel["target_entity"],description=rel["relationship_description"],strength=rel["relationship_strength"])def write_entities_and_relationships(self, entities, relationships):"""写入实体和关系"""with self.driver.session(database=self.database) as session:session.write_transaction(self._create_entities, entities)session.write_transaction(self._create_relationships, relationships)print("✅ 实体和关系已成功写入 Neo4j")if __name__ == "__main__":entities, relationships = parse_data(raw_data)print("Entities:", entities)print("Relationships:", relationships)writer = Neo4jGraphData(uri, user, password, database) # 初始化并连接Neo4j数据库if entities or relationships:writer.write_entities_and_relationships(entities, relationships)else:print("❌ 未解析到任何实体或关系,请检查数据格式。")writer.close()  # 
http://www.dtcms.com/a/313587.html

相关文章:

  • jconsole与jvisualvm监控
  • 数据结构基础 - 平衡二叉树
  • async/await和Promise之间的关系是什么?(补充)
  • NSA稀疏注意力深度解析:DeepSeek如何将Transformer复杂度从O(N²)降至线性,实现9倍训练加速
  • 能表示旋转的矩阵是一个流形吗?
  • 【大模型篇】:GPT-Llama-Qwen-Deepseek
  • 数据结构重点内容
  • Go语言实战案例:多协程并发下载网页内容
  • 《 ThreadLocal 工作机制深度解析:高并发场景的利与弊》
  • Mysql深入学习:InnoDB执行引擎篇
  • C++ : 反向迭代器的模拟实现
  • 【图像处理基石】如何使用deepseek进行图像质量的分析?
  • vllm0.8.5:思维链(Chain-of-Thought, CoT)微调模型的输出结果包括</think>,提供一种关闭思考过程的方法
  • MCP协议:CAD地图应用的AI智能化解决方案(唯杰地图MCP)
  • 【数据结构与算法】数据结构初阶:排序内容加餐(二)——文件归并排序思路详解(附代码实现)
  • 【C++】面向对象编程
  • C语言(长期更新)第8讲 函数递归
  • 网络通信与Socket套接字详解
  • C#模式匹配用法与总结
  • 网页 URL 转 Markdown API 接口
  • 大模型中的Token和Tokenizer:核心概念解析
  • 【Unity3D实例-功能-镜头】俯视角
  • MySQL极简安装挑战
  • 数据结构代码
  • IO流-数据流
  • 语义分割--deeplabV3+
  • 企业级AI Agent构建实践:从理论到落地的完整指南
  • 机器学习中的经典算法
  • 算法讲解--最大连续1的个数
  • C++异常与智能指针,资源泄露