当前位置: 首页 > news >正文

Milvus/ES 插入方案对比

在 Python 中加载它并打印一个示例嵌入的维度。 

python -c "from sentence_transformers import SentenceTransformer; model = SentenceTransformer('/root/.cache/modelscope/hub/models/Qwen/Qwen3-Embedding-0.6B'); example_embedding = model.encode('test sentence'); print(example_embedding.shape[0])"

代码环境:

  • ubuntu_22.04_cu124_pytorch,版本 V1.0.2 
  • GPU:RTX3090/24G * 1 
  • CPU:Intel Xeon 8360Y * 12
  • 内存:32GB
  • 硬盘:系统盘 160GB 

Milvus数据库插入数据耗时

from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import json
from sentence_transformers import SentenceTransformer
from tqdm import tqdm # 导入 tqdm
import time # 导入 time# Milvus 连接参数
MILVUS_HOST = "localhost" # 请根据您的Milvus实例修改
MILVUS_PORT = "19530"   # 请根据您的Milvus实例修改# Collection 参数
COLLECTION_NAME = "recipe_collection"
VECTOR_DIM = 1024 # 修改为 Qwen3-Embedding-0.6B 模型的实际输出维度# 文本嵌入模型
EMBEDDING_MODEL_NAME = "/root/.cache/modelscope/hub/models/Qwen/Qwen3-Embedding-0.6B"
model = None # 将在 insert_data 函数中加载def connect_milvus():"""连接到 Milvus"""connections.connect("default", host=MILVUS_HOST, port=MILVUS_PORT)print(f"成功连接到 Milvus: {MILVUS_HOST}:{MILVUS_PORT}")def create_collection():"""创建 Milvus Collection 并定义 Schema"""if utility.has_collection(COLLECTION_NAME):print(f"Collection '{COLLECTION_NAME}' 已经存在,正在删除旧 Collection...")Collection(COLLECTION_NAME).drop()print(f"旧 Collection '{COLLECTION_NAME}' 删除成功.")fields = [FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=512), # 增加长度以适应更长的标题FieldSchema(name="description", dtype=DataType.VARCHAR, max_length=8192), # 再次增加长度以适应更长的描述FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=VECTOR_DIM)]schema = CollectionSchema(fields, description="食谱数据集")collection = Collection(name=COLLECTION_NAME, schema=schema)print(f"Collection '{COLLECTION_NAME}' 创建成功.")# 创建索引index_params = {"metric_type":"L2","index_type":"IVF_FLAT","params":{"nlist":128}}collection.create_index(field_name="vector", index_params=index_params)print("索引创建成功.")return collectiondef insert_data(collection, data_path):"""从 JSON 文件批量插入数据到 Milvus"""global modelif model is None:print(f"加载文本嵌入模型: {EMBEDDING_MODEL_NAME}...")model = SentenceTransformer(EMBEDDING_MODEL_NAME)print("模型加载完成.")print("正在读取数据...")with open(data_path, 'r', encoding='utf-8', errors='ignore') as f:recipes = []for line_num, line in enumerate(f):try:recipes.append(json.loads(line))except json.JSONDecodeError as e:print(f"警告: JSON 解析错误在文件 {data_path} 的第 {line_num + 1} 行: {e}. 跳过此行.")print(f"数据读取完成,共 {len(recipes)} 条记录.")if not recipes:print("错误: 未能从文件中读取到任何有效的食谱数据。请检查JSON文件格式和内容。")returnbatch_size = 1000total_inserted = 0start_total_time = time.time()for i in tqdm(range(0, len(recipes), batch_size), desc="插入进度"):batch_start_time = time.time()batch = recipes[i:i + batch_size]titles = [item.get("name", "") for item in batch] # 使用 'name' 作为标题descriptions = [item.get("description", "") for item in batch]# 组合标题和描述生成向量texts_to_embed = [f"{title} {description}" for title, description in zip(titles, descriptions)]embeddings = model.encode(texts_to_embed).tolist()entities = []for j, recipe in enumerate(batch):entities.append({"title": titles[j],"description": descriptions[j],"vector": embeddings[j]})if entities:collection.insert(entities)total_inserted += len(entities)collection.flush() # 每次插入后立即flush,确保数据写入磁盘batch_end_time = time.time()tqdm.write(f"批次 {i // batch_size + 1} 插入 {len(entities)} 条数据,耗时 {batch_end_time - batch_start_time:.2f} 秒.")end_total_time = time.time()print(f"总共插入 {total_inserted} 条数据. 总耗时 {end_total_time - start_total_time:.2f} 秒.")if __name__ == "__main__":connect_milvus()collection = create_collection()insert_data(collection, "/root/recipe_corpus_full.json") 
(base) root@9gpu-com:~# python '/root/milvus_insert.py'
成功连接到 Milvus: localhost:19530
Collection 'recipe_collection' 已经存在,正在删除旧 Collection...
旧 Collection 'recipe_collection' 删除成功.
Collection 'recipe_collection' 创建成功.
索引创建成功.
加载文本嵌入模型: /root/.cache/modelscope/hub/models/Qwen/Qwen3-Embedding-0.6B...
模型加载完成.
正在读取数据...
警告: JSON 解析错误在文件 /root/recipe_corpus_full.json 的第 211588 行: Unterminated string starting at: line 1 column 179 (char 178). 跳过此行.
数据读取完成,共 211587 条有效记录.
正在生成向量...
Batches: 100%|████████████████████████████████████| 6613/6613 [12:35<00:00,  8.75it/s]
向量生成完成.
插入进度: 100%|█████████████████████████████████████| 212/212 [38:45<00:00, 10.97s/it]
总共插入 211587 条数据. 总耗时 2325.87 秒.

ES数据库插入数据耗时

from elasticsearch import Elasticsearch, helpers
import json
from tqdm import tqdm
import time
import requests # 导入 requests
from sentence_transformers import SentenceTransformer # 导入 SentenceTransformer# Elasticsearch 连接参数
ES_HOST = "127.0.0.1" # 显式使用 IP 地址
ES_PORT = 9200
ES_INDEX = "recipes_with_vectors" # 修改索引名称,以区分之前的纯文本索引# 文本嵌入模型
EMBEDDING_MODEL_NAME = "/root/.cache/modelscope/hub/models/Qwen/Qwen3-Embedding-0.6B"
VECTOR_DIM = 1024 # Qwen3-Embedding-0.6B 模型的维度
model = None # 将在 insert_data_to_es 函数中加载def test_es_connection_with_requests():"""使用 requests 库测试 Elasticsearch 连接"""url = f"http://{ES_HOST}:{ES_PORT}"print(f"正在使用 requests 测试连接到 Elasticsearch: {url}...")try:response = requests.get(url, timeout=5)response.raise_for_status() # 如果状态码不是 2xx,则抛出 HTTPError 异常print(f"requests 成功连接到 Elasticsearch: {response.json()}")return Trueexcept requests.exceptions.RequestException as e:print(f"requests 连接 Elasticsearch 失败!详细错误: {e}")return Falsedef connect_elasticsearch():"""连接到 Elasticsearch"""es = Elasticsearch(f"http://{ES_HOST}:{ES_PORT}")try:if not es.ping():raise ValueError("Elasticsearch ping 失败!服务可能未运行或不可访问。")except Exception as e:raise ValueError(f"连接 Elasticsearch 失败!详细错误: {e}")print(f"成功连接到 Elasticsearch: {ES_HOST}:{ES_PORT}")return esdef create_es_index(es_client, index_name):"""创建 Elasticsearch 索引并定义 Mapping (包含 dense_vector) """if es_client.indices.exists(index=index_name):print(f"Elasticsearch 索引 '{index_name}' 已经存在,正在删除旧索引...")es_client.indices.delete(index=index_name)print(f"旧索引 '{index_name}' 删除成功.")mapping = {"properties": {"name": {"type": "text"},"dish": {"type": "keyword"},"description": {"type": "text"},"recipeIngredient": {"type": "text"},"recipeInstructions": {"type": "text"},"author": {"type": "keyword"},"keywords": {"type": "keyword"},"vector": {"type": "dense_vector", "dims": VECTOR_DIM} # 添加 dense_vector 字段}}print(f"正在使用 VECTOR_DIM: {VECTOR_DIM} 来创建 dense_vector 字段。") # 调试打印es_client.indices.create(index=index_name, body={"mappings": mapping}) # 重新使用 body 参数print(f"Elasticsearch 索引 '{index_name}' 创建成功.")def generate_actions(recipes_list, index_name, embeddings):"""生成用于批量插入的 action 字典,包含向量"""for i, doc in enumerate(recipes_list):yield {"_index": index_name,"_source": {"name": doc.get("name", ""),"dish": doc.get("dish", ""),"description": doc.get("description", ""),"recipeIngredient": doc.get("recipeIngredient", ""),"recipeInstructions": doc.get("recipeInstructions", ""),"author": doc.get("author", ""),"keywords": doc.get("keywords", ""),"vector": embeddings[i].tolist() # 添加向量}}def insert_data_to_es(es_client, data_path, index_name):"""从 JSON 文件批量插入数据到 Elasticsearch (带向量) """global modelif model is None:print(f"加载文本嵌入模型: {EMBEDDING_MODEL_NAME}...")model = SentenceTransformer(EMBEDDING_MODEL_NAME)print("模型加载完成.")print("正在读取数据...")valid_recipes = []with open(data_path, 'r', encoding='utf-8', errors='ignore') as f:for line_num, line in enumerate(f):try:valid_recipes.append(json.loads(line))except json.JSONDecodeError as e:print(f"警告: JSON 解析错误在文件 {data_path} 的第 {line_num + 1} 行: {e}. 跳过此行.")print(f"数据读取完成,共 {len(valid_recipes)} 条有效记录.")if not valid_recipes:print("错误: 未能从文件中读取到任何有效的食谱数据。请检查JSON文件格式和内容。")returnprint("正在生成向量...")texts_to_embed = [f"{doc.get('name', '')} {doc.get('description', '')}" for doc in valid_recipes]embeddings = model.encode(texts_to_embed, show_progress_bar=True) # 显示嵌入进度print("向量生成完成.")actions_generator = generate_actions(valid_recipes, index_name, embeddings)start_total_time = time.time()success_count = 0for ok, item in tqdm(helpers.streaming_bulk(es_client, actions_generator, chunk_size=1000, request_timeout=60),total=len(valid_recipes), desc="Elasticsearch 插入进度"):if not ok:print(f"插入失败: {item}")else:success_count += 1end_total_time = time.time()print(f"\n总共成功插入 {success_count} 条数据到 Elasticsearch. 总耗时 {end_total_time - start_total_time:.2f} 秒.")if __name__ == "__main__":try:if not test_es_connection_with_requests():print("requests 无法连接到 Elasticsearch,停止执行。")else:es = connect_elasticsearch()create_es_index(es, ES_INDEX)insert_data_to_es(es, "/root/recipe_corpus_full.json", ES_INDEX)except Exception as e:print(f"发生错误: {e}") 
(base) root@9gpu-com:~# python es_insert.py
正在使用 requests 测试连接到 Elasticsearch: http://127.0.0.1:9200...
requests 成功连接到 Elasticsearch: {'name': '9gpu-com', 'cluster_name': 'elasticsearch', 'cluster_uuid': 'g-C_9E91Qp6E9WhcJGovhg', 'version': {'number': '7.11.1', 'build_flavor': 'default', 'build_type': 'tar', 'build_hash': 'ff17057114c2199c9c1bbecc727003a907c0db7a', 'build_date': '2021-02-15T13:44:09.394032Z', 'build_snapshot': False, 'lucene_version': '8.7.0', 'minimum_wire_compatibility_version': '6.8.0', 'minimum_index_compatibility_version': '6.0.0-beta1'}, 'tagline': 'You Know, for Search'}
成功连接到 Elasticsearch: 127.0.0.1:9200
正在使用 VECTOR_DIM: 1024 来创建 dense_vector 字段。
/root/es_insert.py:62: DeprecationWarning: The 'body' parameter is deprecated for the 'create' API and will be removed in a future version. Instead use API parameters directly. See https://github.com/elastic/elasticsearch-py/issues/1698 for more informationes_client.indices.create(index=index_name, body={"mappings": mapping}) # 重新使用 body 参数
Elasticsearch 索引 'recipes_with_vectors' 创建成功.
加载文本嵌入模型: /root/.cache/modelscope/hub/models/Qwen/Qwen3-Embedding-0.6B...
模型加载完成.
正在读取数据...
警告: JSON 解析错误在文件 /root/recipe_corpus_full.json 的第 211588 行: Unterminated string starting at: line 1 column 179 (char 178). 跳过此行.
数据读取完成,共 211587 条有效记录.
正在生成向量...
Batches: 100%|████████████████████████████████████| 6613/6613 [12:34<00:00,  8.76it/s]
向量生成完成.
Elasticsearch 插入进度: 100%|████████████████| 211587/211587 [05:35<00:00, 630.11it/s]总共成功插入 211587 条数据到 Elasticsearch. 总耗时 335.79 秒.

相关文章:

  • OD 算法题 B卷【最多团队】
  • SeaTunnel与Hive集成
  • Mkdocs 阅读时间统计插件
  • 华为云Flexus+DeepSeek征文 | 基于华为云ModelArts Studio搭建PandaWiki知识库问答系统
  • 极客时间《后端存储实战课》阅读笔记
  • linux 阻塞和非阻塞
  • 【一天一个知识点】RAG 是“问答脑”,智能体是“有行动力的大脑”
  • XP POWER EJ ET EY FJ FR 系列软件和驱动程序和手侧
  • 『uniapp』onThemeChange监听主题样式,动态主题不正确生效,样式被覆盖的坑
  • 如何提高电脑打字速度?
  • PHP Swoft2 框架精华系列:Controller 控制器组件解析,用法详解
  • leetcode 1432. 改变一个整数能得到的最大差值 中等
  • PCB设计教程【大师篇】stm32开发板PCB布线(电源部分)
  • 基于C_PSO与BP神经网络回归模型的特征选择方法研究(Python实现)
  • Nginx超快速入门
  • Vite:下一代前端构建工具的革命性突破
  • 对于数据库触发器自动执行的理解
  • 使用VSCode开发MCU,FreeRTOS进Hard_Fault调试
  • idea2024使用卡顿
  • golang-linux环境配置
  • 广东佛山南海区疫情最新情况/成都网站seo公司
  • 做网站的IDE/百度在线下载
  • 如何做网站模版/百度seo公司一路火
  • 培训web网站开发/如何建立电商平台
  • 用服务器建立网站/关键词林俊杰mp3在线听
  • 日本网站制作/关键词优化排名第一