【Python】使用 Python 构建 Weaviate 工具类:实现数据插入、语义搜索、混合检索与集合管理
使用 Python 构建 Weaviate 工具类:实现数据插入、语义搜索、混合检索与集合管理
随着 AI 与大模型的广泛应用,向量数据库成为文本、图像、音频等非结构化数据管理的核心组件。本文将通过封装一个基于 Python 的 WeaviateHelper
类,演示如何结合本地向量模型(如 Ollama 部署的 nomic-embed-text
)实现数据管理、插入、语义检索与混合搜索。
目录
- 技术栈概览
- 核心功能介绍
- 完整代码讲解
- 使用示例与效果
技术栈概览
技术 | 说明 |
---|---|
Weaviate | 向量数据库,支持多模态数据管理与语义搜索 |
Ollama | 本地大模型部署平台,用于运行 nomic-embed-text 向量模型 |
Loguru | 更友好的日志库,替代 Python 原生 logging |
Python | 封装 Weaviate 功能逻辑,主导语言 |
核心功能介绍
WeaviateHelper
封装了常见的操作接口,包括但不限于:
- 创建集合并配置向量化器;
- 插入单条或多条数据;
- 语义搜索(near_text);
- 混合搜索(Hybrid Search);
- 按属性删除集合中的数据;
- 获取集合中的所有数据;
- 删除集合、列出集合等管理操作。
完整代码讲解
以下是完整工具类代码(为保证可运行性,部分配置需根据实际环境调整):
pip install weaviate-client
1. 初始化连接
self.client = weaviate.connect_to_custom(http_host=http_host,http_port=http_port,grpc_host=grpc_host,grpc_port=grpc_port,skip_init_checks=True,http_secure=False,grpc_secure=False,
)
✅ 支持通过 IP + 端口连接 Weaviate 服务,同时允许使用本地部署的 Embedding 模型。
2. 创建集合并指定文本向量化模型
self.client.collections.create(weibo_collection,vectorizer_config=Configure.Vectorizer.text2vec_ollama(model="nomic-embed-text",api_endpoint=self.ollama_endpoint),properties=[Property(name=name, data_type=data_type)for name, data_type in properties],
)
✅ 通过
text2vec_ollama
接入 Ollama 本地模型;避免依赖 OpenAI/Replicate API,有利于私有部署与成本控制。
3. 插入数据支持批量导入
collection = self.client.collections.get(weibo_collection)
collection.data.insert_many(data)
✅ 推荐使用
insert_many
批量导入,提升性能。
4. 混合搜索与语义搜索
# 混合搜索:结合 BM25 和语义向量
collection.query.hybrid(query=query,alpha=alpha,limit=limit,return_metadata=MetadataQuery(distance=True, score=True)
)
# 语义搜索:向量相似度搜索
collection.query.near_text(query=query,limit=limit,return_metadata=MetadataQuery(distance=True)
)
✅
hybrid
模式可调整alpha
参数平衡关键词权重与向量语义。
5. 数据删除与集合管理
# 删除属性匹配的数据
where_filter = Filter.by_property(property_name).like(property_value)
collection.data.delete_many(where=where_filter)
# 删除集合
self.client.collections.delete(weibo_collection)
✅ 精细化控制数据生命周期,适用于定期清理历史数据或临时集合。
使用示例与效果
示例代码片段:
helper = WeaviateHelper(...)
helper.create_collection("weibo_collection", [("platform", DataType.TEXT),("username", DataType.TEXT),("content", DataType.TEXT),
])helper.insert_data("weibo_collection", [{'platform': 'weibo','username': 'user01','content': '测试内容测试内容'
}])results = helper.semantic_search("weibo_collection", "内容")
for result in results:print(result.properties)
✅ 示例操作展示了集合创建 → 数据导入 → 语义搜索的完整流程。
完整代码
import traceback
import weaviate
from weaviate.classes.config import Configure, Property
from weaviate.classes.query import MetadataQuery, Filter
from loguru import loggerclass WeaviateHelper:def __init__(self, http_host, http_port, grpc_host, grpc_port, ollama_endpoint):self.client = weaviate.connect_to_custom(http_host=http_host,http_port=http_port,http_secure=False,grpc_host=grpc_host,grpc_port=grpc_port,grpc_secure=False,skip_init_checks=True,)self.ollama_endpoint = ollama_endpointdef create_collection(self, weibo_collection, properties):"""创建集合Args:weibo_collection (str): 集合名称properties (list): 属性列表,例如 [("platform", DataType.TEXT), ("username", DataType.TEXT)]"""try:self.client.collections.create(weibo_collection,vectorizer_config=Configure.Vectorizer.text2vec_ollama(model="nomic-embed-text", api_endpoint=self.ollama_endpoint),properties=[Property(name=name, data_type=data_type)for name, data_type in properties],)except Exception as e:logger.warning(f"Collection '{weibo_collection}' already exists or error: {e}")def insert_data(self, weibo_collection, data):"""插入数据到集合Args:weibo_collection (str): 集合名称data (list): 要插入的数据列表"""collection = self.client.collections.get(weibo_collection)result = collection.data.insert_many(data)logger.info(f"Insertion response: {result}")return resultdef hybrid_search(self, weibo_collection, query, alpha=0.5, limit=1):"""混合搜索(结合向量搜索和关键词搜索)Args:weibo_collection (str): 集合名称query (str): 搜索查询alpha (float): 向量搜索和关键词搜索的权重比例limit (int): 返回结果数量"""collection = self.client.collections.get(weibo_collection)response = collection.query.hybrid(query=query,alpha=alpha,limit=limit,return_metadata=MetadataQuery(distance=True, score=True),)return response.objectsdef semantic_search(self, weibo_collection, query, limit=1):"""语义搜索Args:weibo_collection (str): 集合名称query (str): 搜索查询limit (int): 返回结果数量"""collection = self.client.collections.get(weibo_collection)response = collection.query.near_text(query=query, limit=limit, return_metadata=MetadataQuery(distance=True))return response.objectsdef close(self):"""关闭客户端连接"""self.client.close()def get_all_data(self, weibo_collection):"""获取集合中的所有数据Args:weibo_collection (str): 集合名称Returns:list: 集合中的所有对象"""collection = self.client.collections.get(weibo_collection)response = collection.query.fetch_objects(limit=10000) # 设置较大的限制以获取所有数据return response.objectsdef delete_collection(self, weibo_collection):"""删除指定的集合Args:weibo_collection (str): 要删除的集合名称"""try:self.client.collections.delete(weibo_collection)logger.info(f"Collection '{weibo_collection}' has been deleted successfully")except Exception as e:logger.error(f"Error deleting collection '{weibo_collection}': {e}")def delete_collection_by_property_name(self, weibo_collection, property_name, property_value):"""根据属性名称删除集合中的数据Args:weibo_collection (str): 集合名称property_name (str): 属性名称property_value (str): 属性值"""try:collection = self.client.collections.get(weibo_collection)where_filter = Filter.by_property(property_name).like(property_value)collection.data.delete_many(where=where_filter)logger.info(f"Successfully deleted data where {property_name}={property_value}")except Exception as e:logger.error(f"Error deleting data: {traceback.format_exc()}")def get_all_collections(self):"""获取所有集合的列表Returns:list: 所有集合名称的列表"""try:collections = self.client.collections.list_all()logger.info(f"Found {len(collections)} collections")return collectionsexcept Exception as e:logger.error(f"Error getting collections: {traceback.format_exc()}")return []# 使用示例
if __name__ == "__main__":# 创建助手实例helper = WeaviateHelper(http_host="x.x.x.x",http_port=8080,grpc_host="x.x.x.x",grpc_port=50051,ollama_endpoint="http://x.x.x.x:11434",)try:# 获取所有集合示例collections = helper.get_all_collections()for collection in collections:logger.info(f"Collection name: {collection}")# 定义集合属性# properties = [# ("platform", DataType.TEXT),# ("username", DataType.TEXT),# ("content", DataType.TEXT),# ]# 创建集合# helper.create_collection("weibo_collection", properties)# 插入数据# test_data = [{# 'platform': 'weibo',# 'username': 'username',# 'content': '测试内容测试内容测试'# }]# helper.insert_data("weibo_collection", test_data)# 混合搜索# results = helper.semantic_search("weibo_collection", "weibo")# for result in results:# logger.info(f"帖子内容: {result.properties}")# logger.info(f"语义距离: {result.metadata.distance}, BM25 分数: {result.metadata.score}")# 获取所有数据示例# all_data = helper.get_all_data("weibo_collection")# logger.info("所有数据:")# for item in all_data:# logger.info(f"数据: {item.properties}")# 删除集合示例(取消注释以执行)# helper.delete_collection("weibo_collection")# 删除特定属性的数据示例# helper.delete_collection_by_property_name("weibo_collection", "platform", "weibo")# 获取所有数据示例# all_data = helper.get_all_data("weibo_collection")# logger.info("所有数据:")# for item in all_data:# logger.info(f"数据: {item.properties}")except Exception:logger.error(traceback.format_exc())finally:# 关闭连接helper.close()
总结与建议
✅ 本文优势
- 一站式封装 Weaviate 的常用接口;
- 适配本地 Ollama 模型,降低外部依赖;
- 完善的日志记录与异常捕捉;
- 支持多种搜索方式,满足不同业务场景。
参考文档:https://weaviate.io/developers/weaviate