当前位置: 首页 > news >正文

【Python】使用 Python 构建 Weaviate 工具类:实现数据插入、语义搜索、混合检索与集合管理

使用 Python 构建 Weaviate 工具类:实现数据插入、语义搜索、混合检索与集合管理

随着 AI 与大模型的广泛应用,向量数据库成为文本、图像、音频等非结构化数据管理的核心组件。本文将通过封装一个基于 Python 的 WeaviateHelper 类,演示如何结合本地向量模型(如 Ollama 部署的 nomic-embed-text)实现数据管理、插入、语义检索与混合搜索。


目录

  1. 技术栈概览
  2. 核心功能介绍
  3. 完整代码讲解
  4. 使用示例与效果

技术栈概览

技术说明
Weaviate向量数据库,支持多模态数据管理与语义搜索
Ollama本地大模型部署平台,用于运行 nomic-embed-text 向量模型
Loguru更友好的日志库,替代 Python 原生 logging
Python封装 Weaviate 功能逻辑,主导语言

核心功能介绍

WeaviateHelper 封装了常见的操作接口,包括但不限于:

  • 创建集合并配置向量化器;
  • 插入单条或多条数据;
  • 语义搜索(near_text);
  • 混合搜索(Hybrid Search);
  • 按属性删除集合中的数据;
  • 获取集合中的所有数据;
  • 删除集合、列出集合等管理操作。

完整代码讲解

以下是完整工具类代码(为保证可运行性,部分配置需根据实际环境调整):

pip install weaviate-client

1. 初始化连接

self.client = weaviate.connect_to_custom(http_host=http_host,http_port=http_port,grpc_host=grpc_host,grpc_port=grpc_port,skip_init_checks=True,http_secure=False,grpc_secure=False,
)

✅ 支持通过 IP + 端口连接 Weaviate 服务,同时允许使用本地部署的 Embedding 模型。


2. 创建集合并指定文本向量化模型

self.client.collections.create(weibo_collection,vectorizer_config=Configure.Vectorizer.text2vec_ollama(model="nomic-embed-text",api_endpoint=self.ollama_endpoint),properties=[Property(name=name, data_type=data_type)for name, data_type in properties],
)

✅ 通过 text2vec_ollama 接入 Ollama 本地模型;避免依赖 OpenAI/Replicate API,有利于私有部署与成本控制。


3. 插入数据支持批量导入

collection = self.client.collections.get(weibo_collection)
collection.data.insert_many(data)

✅ 推荐使用 insert_many 批量导入,提升性能。


4. 混合搜索与语义搜索

# 混合搜索:结合 BM25 和语义向量
collection.query.hybrid(query=query,alpha=alpha,limit=limit,return_metadata=MetadataQuery(distance=True, score=True)
)
# 语义搜索:向量相似度搜索
collection.query.near_text(query=query,limit=limit,return_metadata=MetadataQuery(distance=True)
)

hybrid 模式可调整 alpha 参数平衡关键词权重与向量语义。


5. 数据删除与集合管理

# 删除属性匹配的数据
where_filter = Filter.by_property(property_name).like(property_value)
collection.data.delete_many(where=where_filter)
# 删除集合
self.client.collections.delete(weibo_collection)

✅ 精细化控制数据生命周期,适用于定期清理历史数据或临时集合。


使用示例与效果

示例代码片段:

helper = WeaviateHelper(...)
helper.create_collection("weibo_collection", [("platform", DataType.TEXT),("username", DataType.TEXT),("content", DataType.TEXT),
])helper.insert_data("weibo_collection", [{'platform': 'weibo','username': 'user01','content': '测试内容测试内容'
}])results = helper.semantic_search("weibo_collection", "内容")
for result in results:print(result.properties)

✅ 示例操作展示了集合创建 → 数据导入 → 语义搜索的完整流程。

完整代码

import traceback
import weaviate
from weaviate.classes.config import Configure, Property
from weaviate.classes.query import MetadataQuery, Filter
from loguru import loggerclass WeaviateHelper:def __init__(self, http_host, http_port, grpc_host, grpc_port, ollama_endpoint):self.client = weaviate.connect_to_custom(http_host=http_host,http_port=http_port,http_secure=False,grpc_host=grpc_host,grpc_port=grpc_port,grpc_secure=False,skip_init_checks=True,)self.ollama_endpoint = ollama_endpointdef create_collection(self, weibo_collection, properties):"""创建集合Args:weibo_collection (str): 集合名称properties (list): 属性列表,例如 [("platform", DataType.TEXT), ("username", DataType.TEXT)]"""try:self.client.collections.create(weibo_collection,vectorizer_config=Configure.Vectorizer.text2vec_ollama(model="nomic-embed-text", api_endpoint=self.ollama_endpoint),properties=[Property(name=name, data_type=data_type)for name, data_type in properties],)except Exception as e:logger.warning(f"Collection '{weibo_collection}' already exists or error: {e}")def insert_data(self, weibo_collection, data):"""插入数据到集合Args:weibo_collection (str): 集合名称data (list): 要插入的数据列表"""collection = self.client.collections.get(weibo_collection)result = collection.data.insert_many(data)logger.info(f"Insertion response: {result}")return resultdef hybrid_search(self, weibo_collection, query, alpha=0.5, limit=1):"""混合搜索(结合向量搜索和关键词搜索)Args:weibo_collection (str): 集合名称query (str): 搜索查询alpha (float): 向量搜索和关键词搜索的权重比例limit (int): 返回结果数量"""collection = self.client.collections.get(weibo_collection)response = collection.query.hybrid(query=query,alpha=alpha,limit=limit,return_metadata=MetadataQuery(distance=True, score=True),)return response.objectsdef semantic_search(self, weibo_collection, query, limit=1):"""语义搜索Args:weibo_collection (str): 集合名称query (str): 搜索查询limit (int): 返回结果数量"""collection = self.client.collections.get(weibo_collection)response = collection.query.near_text(query=query, limit=limit, return_metadata=MetadataQuery(distance=True))return response.objectsdef close(self):"""关闭客户端连接"""self.client.close()def get_all_data(self, weibo_collection):"""获取集合中的所有数据Args:weibo_collection (str): 集合名称Returns:list: 集合中的所有对象"""collection = self.client.collections.get(weibo_collection)response = collection.query.fetch_objects(limit=10000)  # 设置较大的限制以获取所有数据return response.objectsdef delete_collection(self, weibo_collection):"""删除指定的集合Args:weibo_collection (str): 要删除的集合名称"""try:self.client.collections.delete(weibo_collection)logger.info(f"Collection '{weibo_collection}' has been deleted successfully")except Exception as e:logger.error(f"Error deleting collection '{weibo_collection}': {e}")def delete_collection_by_property_name(self, weibo_collection, property_name, property_value):"""根据属性名称删除集合中的数据Args:weibo_collection (str): 集合名称property_name (str): 属性名称property_value (str): 属性值"""try:collection = self.client.collections.get(weibo_collection)where_filter = Filter.by_property(property_name).like(property_value)collection.data.delete_many(where=where_filter)logger.info(f"Successfully deleted data where {property_name}={property_value}")except Exception as e:logger.error(f"Error deleting data: {traceback.format_exc()}")def get_all_collections(self):"""获取所有集合的列表Returns:list: 所有集合名称的列表"""try:collections = self.client.collections.list_all()logger.info(f"Found {len(collections)} collections")return collectionsexcept Exception as e:logger.error(f"Error getting collections: {traceback.format_exc()}")return []# 使用示例
if __name__ == "__main__":# 创建助手实例helper = WeaviateHelper(http_host="x.x.x.x",http_port=8080,grpc_host="x.x.x.x",grpc_port=50051,ollama_endpoint="http://x.x.x.x:11434",)try:# 获取所有集合示例collections = helper.get_all_collections()for collection in collections:logger.info(f"Collection name: {collection}")# 定义集合属性# properties = [#     ("platform", DataType.TEXT),#     ("username", DataType.TEXT),#     ("content", DataType.TEXT),# ]# 创建集合# helper.create_collection("weibo_collection", properties)# 插入数据# test_data = [{#     'platform': 'weibo',#     'username': 'username',#     'content': '测试内容测试内容测试'# }]# helper.insert_data("weibo_collection", test_data)# 混合搜索# results = helper.semantic_search("weibo_collection", "weibo")# for result in results:#     logger.info(f"帖子内容: {result.properties}")#     logger.info(f"语义距离: {result.metadata.distance}, BM25 分数: {result.metadata.score}")# 获取所有数据示例# all_data = helper.get_all_data("weibo_collection")# logger.info("所有数据:")# for item in all_data:#     logger.info(f"数据: {item.properties}")# 删除集合示例(取消注释以执行)# helper.delete_collection("weibo_collection")# 删除特定属性的数据示例# helper.delete_collection_by_property_name("weibo_collection", "platform", "weibo")# 获取所有数据示例# all_data = helper.get_all_data("weibo_collection")# logger.info("所有数据:")# for item in all_data:#     logger.info(f"数据: {item.properties}")except Exception:logger.error(traceback.format_exc())finally:# 关闭连接helper.close()

总结与建议

✅ 本文优势

  • 一站式封装 Weaviate 的常用接口;
  • 适配本地 Ollama 模型,降低外部依赖;
  • 完善的日志记录与异常捕捉;
  • 支持多种搜索方式,满足不同业务场景。

参考文档:https://weaviate.io/developers/weaviate

相关文章:

  • 服务器安装xfce桌面环境并通过浏览器操控
  • Vue大数据量前端性能优化策略
  • 为什么服务器突然变慢?从硬件到软件的排查方法
  • 【Linux笔记】防火墙firewall与相关实验(iptables、firewall-cmd、firewalld)
  • 服务器网络配置 netplan一个网口配置两个ip(双ip、辅助ip、别名IP别名)
  • 每日算法刷题计划Day12 5.21:leetcode不定长滑动窗口求最短/最长3道题,,用时1h40min(有点长了)
  • SQLMesh 宏操作符详解:@IF 的条件逻辑与高级应用
  • 使用 Matter.js 创建封闭箱体与里面的小球
  • Python学习Day1:安装
  • 数独求解器3.0 增加latex格式读取
  • 通过TDE透明加密实现SQL Server数据库免改造加密
  • SQL 数值计算全解析:ABS、CEIL、FLOOR与ROUND函数深度精讲
  • 红杉资本2025 AI 峰会之Cybersecurity
  • 一文理解TCP与UDP
  • [CSS3]百分比布局
  • 第十三届蓝桥杯国赛PythonA题解
  • epoll_wait未触发的小Bug
  • Axure应用交互设计:动态面板嵌套实现超强体验感菜单表头
  • DL00987-基于深度学习YOLOv11的红外鸟类目标检测含完整数据集
  • Java 项目管理工具:Maven 与 Gradle 的深度对比与选择
  • 开发网站的财务分析/域名申请的流程