Milvus向量数据库
Milvus 是一个向量数据库。专门用来存储、管理和搜索“向量”的数据库。
使用 Zilliz Cloud 释放 Milvus 向量数据库潜能
什么是向量呢?
在人工智能(AI)领域,文本、图片、音频、视频等各种复杂的数据,都可以通过特定的AI模型转换成一串数字,也就是“向量”。这个向量可以理解为这些数据的“数字指纹”或者“特征表示”。比如,两张相似的图片,它们转换成的向量在数学意义上也会比较接近。
Milvus 主要用来干什么?
- 存储海量向量数据:当你有大量的这类“数字指纹”需要保存时,Milvus 可以高效地存储它们。
- 快速搜索相似向量:这是 Milvus 最核心的功能。比如,你给它一张图片的向量,它能非常快地从数据库里找出跟这张图片最相似的其他图片的向量。这在图片搜索、推荐系统、以图搜图等场景非常有用。
- 支持 AI 应用:通过高效的向量存储和检索能力,Milvus 为各种AI应用提供了底层的数据支持,比如:
- 相似性搜索:如图片搜索、视频搜索、音频搜索、文本语义搜索(找到意思相近的句子)。
- 推荐系统:根据用户的兴趣向量推荐相似的商品或内容。
- 问答系统/聊天机器人:匹配用户问题和知识库中最相关答案的向量。
- 异常检测:找出与正常数据向量模式差异较大的异常数据。
- 新药发现、基因序列分析等更专业的领域。
Milvus 2.6 版本的三大升级方向:更省钱、更快、更强!目标是让这个向量数据库更省成本、搜索更快更准、系统架构更稳定强大。对于程序员来说,可以理解为 Milvus 正在努力变得更“好用、实用、耐用”。
一、又省钱又提速:性能优化,钱包友好
Milvus 2.6 在降本增效方面取得了显著进展,主要体现在以下几个方面:
-
RabitQ 量化技术: 设想您拥有大量高维向量数据(如同高清大图),它们通常非常占用内存。RabitQ 技术运用先进的压缩算法,能够大幅度缩小这些高维向量的体积(理论上内存占用可压缩至原先的 1/32)。同时,通过其独特的“精修”(Refine)技术,在显著降低内存占用的前提下,确保搜索准确率基本不受影响,甚至能将每秒查询率(QPS)提升3倍以上。举例来说,以往存储100张“大图”需要大量空间,现在则可以用极小的空间存储,并且检索速度更快。这对于需要处理海量数据的应用场景(如图片搜索、推荐系统)无疑是一大利好。
-
Sparse-BM25 性能起飞: BM25 是一种经典的文本搜索算法。Milvus 2.6 对该算法进行了深度优化,使其在特定场景下的搜索速度能够显著超越广为人知的 Elasticsearch (ES),在某些数据集上甚至能实现数倍的QPS提升。如果您的应用涉及文本搜索,此项升级将带来显著的性能飞跃。
-
JSON Path Index 加速动态字段过滤: 在实际应用中,数据结构往往并非固定不变(即动态字段),其中可能嵌套着复杂的 JSON 数据(例如用户画像信息)。以往,在这些 JSON 数据中依据特定条件进行过滤,效率较低。Milvus 2.6 新增了 JSON Path Index 功能,允许针对 JSON 数据内的特定路径(如用户的城市信息)创建索引。如此一来,当您需要根据这些动态信息筛选搜索结果时(例如“查找所有位于北京的AI爱好者”),查询速度将得到极大提升,响应时间从数百毫秒骤降至几毫秒级别。
-
支持 Int 向量存储: 简而言之,Milvus 2.6 开始支持使用更小的数据类型(整型)来存储向量。这一特性有助于进一步降低存储成本,并能更好地适配那些追求极致性价比和快速推理的模型服务。
二、搜索更强更好用:功能升级,体验更佳
除了性能提升和成本降低,Milvus 2.6 在搜索功能和用户体验方面也进行了诸多增强:
-
分词器 (Analyzer/Tokenizer) 功能更强大: 在文本搜索领域,如何将语句准确切分成词汇(分词)至关重要,它直接影响最终的搜索效果。Milvus 2.6 强化了其分词功能,增加了对更多语种(如日语、韩语)的支持,并允许用户自定义词典,从而实现更精细的文本处理和更精准的搜索结果。
-
短语匹配 (Phrase Match) 更精准: 以往搜索“人工智能最新进展”与“最新人工智能进展”,得到的结果可能差异不大。然而,Phrase Match 功能更加关注词语的顺序和完整性,能够更准确地捕捉用户的搜索意图。例如,在法律文书检索中搜索“合同违约责任”时,词语的顺序就显得尤为重要。此功能对于需要高精度匹配的应用场景(如智能问答、用户评论分析)非常有价值。
-
时间衰减重排序 (Decay Function): 在许多场景下,信息的时效性非常关键,例如新闻、商品信息、社交媒体帖子等。Decay Function 允许您在获得初步搜索结果后,根据信息的新旧程度对其进行重新排序,将更新、更相关的内容置于更靠前的位置。这样,用户就能更容易地找到“当前最有价值”的信息。
-
集成第三方模型 (Data-In, Data-Out): 以往使用 Milvus 时,用户可能需要先自行将文本、图片等原始数据转换为向量数据,然后再导入 Milvus。现在,Milvus 2.6 能够更便捷地集成第三方模型服务(如 OpenAI、Google Vertex AI 等)。您可以直接将原始文本输入 Milvus,系统会自动调用这些外部模型将其转换为向量并执行搜索,整个流程更加顺畅,显著降低了开发复杂度。
三、架构更稳更灵活:底层优化,支撑未来
为应对日益增长的数据量和日趋复杂的查询需求,Milvus 2.6 在其底层架构层面也实施了多项优化:
-
冷热数据分层存储 (Tiered Storage): 数据天然存在“冷热”之分。经常被访问的热数据存储在高速存储介质(如SSD)上以保证访问性能,而不常用的冷数据则会自动迁移至成本更低的存储介质(如对象存储)中,以优化存储成本。整个过程对用户透明,查询时系统会自动拉取冷数据,实现了性能与成本的有效平衡。
-
实时流处理服务 (Streaming Service): 现代应用对数据的实时处理能力要求越来越高,例如金融反欺诈、实时推荐系统等。Milvus 2.6 引入了全新的 Streaming Service,增强了对实时数据流的接入、索引和查询能力。简而言之,这使得 Milvus 不仅能高效处理静态的大数据集,也能从容应对持续不断的实时数据流。新架构下,数据写入后能更快被检索到,系统故障恢复速度也得到提升,整体扩展性更强。
-
支持十万级别 Collection: Collection 可类比为数据库中的数据表。支持更大数量级的 Collection 意味着 Milvus 能更好地满足拥有大量租户或复杂业务隔离需求的场景。
-
Woodpecker 云原生日志系统: 这是由 Zilliz 自主研发、专为云环境设计的轻量级日志系统。它拥有更佳的写入性能,并且不占用本地磁盘空间,更适应云原生部署。
-
优化文件格式和集群协调: 通过采用优化的存储结构和提升效率(File format v2),减少了不必要的IO操作和内存占用。同时,集群内部的协调机制(Coord Merge)也得到了优化,进一步提升了大规模部署场景下的稳定性和运行效率。
总而言之,Milvus 2.6 通过一系列技术创新和深度优化,致力于为开发者和企业提供一个更经济、更高效、更强大、更易用的向量数据库解决方案,助力用户在AI时代更好地挖掘数据价值。
Milvus 是一个开源的向量数据库,专门用于存储、索引和管理由深度学习模型或其他算法生成的大规模嵌入向量。使用 Milvus 可以高效地进行向量相似性搜索和分析。
下面将给出一个使用 Python SDK (pymilvus
) 操作 Milvus 的具体例子。这个例子会涵盖连接 Milvus 服务、创建集合(Collection)、插入向量、创建索引、搜索向量以及删除集合等基本操作。
前提条件:
-
安装 Milvus 服务:
你需要一个运行中的 Milvus 实例。最简单的方式是使用 Docker:# 拉取最新的 Milvus standalone 镜像 docker pull milvusdb/milvus:v2.4.1-standalone # 你可以选择更新的版本 # 启动 Milvus standalone 容器 docker run -d --name milvus_standalone \ -p 19530:19530 \ -p 9091:9091 \ -v /tmp/milvus/data:/var/lib/milvus/data \ milvusdb/milvus:v2.4.1-standalone
你也可以考虑 Milvus Lite (Python 本地版本,无需 Docker,适合快速体验):
pip install milvus-lite
-
安装 PyMilvus SDK 和依赖:
pip install pymilvus numpy
Python 代码示例:
import random
import numpy as np
from pymilvus import (connections,utility,FieldSchema, CollectionSchema, DataType,Collection
)# --- 0. 基本配置 ---
_HOST = 'localhost' # 如果使用 Docker 启动的 Milvus 服务
_PORT = '19530' # Milvus 默认端口
# _HOST = 'your-milvus-lite-uri.db' # 如果使用 Milvus Lite,URI 可能不同_COLLECTION_NAME = 'hello_milvus_demo'
_ID_FIELD = 'id_field' # 主键字段名
_VECTOR_FIELD = 'vector_field' # 向量字段名
_SCALAR_FIELD = 'scalar_field' # 示例标量字段名_DIM = 8 # 向量维度 (示例中使用8维,实际应用中通常是128, 256, 512, 768等)
_NUM_ENTITIES = 3000 # 插入的向量数量
_TOPK = 3 # 搜索时返回最相似的 TopK 个结果# --- 1. 连接 Milvus 服务 ---
print(f"开始连接 Milvus 服务 ({_HOST}:{_PORT})...")
try:connections.connect(host=_HOST, port=_PORT)print("成功连接 Milvus 服务!")
except Exception as e:print(f"连接 Milvus 服务失败: {e}")exit()# --- 2. 创建集合 (Collection) ---
# 首先检查集合是否存在,如果存在则删除(方便重复运行脚本)
if utility.has_collection(_COLLECTION_NAME):print(f"集合 '{_COLLECTION_NAME}' 已存在,将删除之...")utility.drop_collection(_COLLECTION_NAME)print(f"集合 '{_COLLECTION_NAME}' 已删除。")print(f"开始创建集合 '{_COLLECTION_NAME}'...")
# 定义字段 (Schema)
# 主键字段
field_id = FieldSchema(name=_ID_FIELD,dtype=DataType.INT64,is_primary=True,auto_id=True # 通常推荐自动生成ID,也可以设为False手动指定
)
# 向量字段
field_vector = FieldSchema(name=_VECTOR_FIELD,dtype=DataType.FLOAT_VECTOR,dim=_DIM
)
# 示例标量字段 (可选)
field_scalar = FieldSchema(name=_SCALAR_FIELD,dtype=DataType.INT64, # 可以是 INT64, FLOAT, VARCHAR 等
)# 定义集合的 Schema
schema = CollectionSchema(fields=[field_id, field_vector, field_scalar],description="这是一个 Hello Milvus 演示集合",enable_dynamic_field=False # 是否允许动态字段,2.x 版本新增功能
)# 创建集合
collection = Collection(_COLLECTION_NAME, schema=schema, consistency_level="Strong") # Strong, Bounded, Eventually, Session
print(f"集合 '{_COLLECTION_NAME}' 创建成功!")
print(f"当前 Milvus 中的集合列表: {utility.list_collections()}")# --- 3. 准备并插入数据 ---
print(f"准备插入 {_NUM_ENTITIES} 条数据...")
# 随机生成一些向量数据和标量数据
rng = np.random.default_rng(seed=1234) # For reproducibility
embeddings = rng.random((_NUM_ENTITIES, _DIM)).astype(np.float32)
scalar_data = [random.randint(0, 1000) for _ in range(_NUM_ENTITIES)]# 如果 auto_id=False,你需要提供主键ID数据
# ids = [i for i in range(_NUM_ENTITIES)]
# entities = [ids, embeddings, scalar_data]# 当 auto_id=True 时,不需要提供主键ID
entities = [embeddings, # 对应 vector_fieldscalar_data # 对应 scalar_field
]print("开始插入数据...")
insert_result = collection.insert(entities)
print(f"数据插入完成。插入行数: {insert_result.insert_count}")# Milvus 是近实时数据库,数据插入后需要 flush 才能保证立即可搜
print("开始 flush 数据...")
collection.flush()
print(f"Flush 完成。集合 '{_COLLECTION_NAME}' 中的实体数量: {collection.num_entities}")# --- 4. 创建索引 ---
# 为向量字段创建索引以加速搜索
print(f"开始为向量字段 '{_VECTOR_FIELD}' 创建索引...")
index_params = {"metric_type": "L2", # 相似度度量方式: L2 (欧氏距离), IP (内积), COSINE (余弦相似度)"index_type": "IVF_FLAT", # 索引类型: FLAT, IVF_FLAT, IVF_SQ8, IVF_PQ, HNSW, ANNOY, DISKANN 等"params": {"nlist": 128}, # 索引参数,不同索引类型参数不同 (例如 IVF_FLAT 需要 nlist)
}
collection.create_index(field_name=_VECTOR_FIELD,index_params=index_params,index_name='vector_idx' # 可选,为索引指定一个名称
)
print(f"向量字段 '{_VECTOR_FIELD}' 索引创建成功!")
utility.wait_for_index_building_complete(_COLLECTION_NAME, index_name='vector_idx')
print(f"索引 '{_COLLECTION_NAME}:{vector_idx}' 构建完成。")# (可选)为标量字段创建索引,用于加速基于属性的过滤
# print(f"开始为标量字段 '{_SCALAR_FIELD}' 创建索引...")
# collection.create_index(
# field_name=_SCALAR_FIELD,
# index_name='scalar_idx' # 默认使用 MARISA_TRIE 索引
# )
# print(f"标量字段 '{_SCALAR_FIELD}' 索引创建成功!")
# utility.wait_for_index_building_complete(_COLLECTION_NAME, index_name='scalar_idx')
# print(f"索引 '{_COLLECTION_NAME}:{scalar_idx}' 构建完成。")# --- 5. 加载集合到内存 ---
# 执行搜索前,需要将集合加载到内存
print(f"开始加载集合 '{_COLLECTION_NAME}' 到内存...")
collection.load()
print("集合加载完成。")# --- 6. 执行向量相似性搜索 ---
print(f"准备执行向量相似性搜索 (TopK={_TOPK})...")
# 随机选择一个已插入的向量作为查询向量 (或生成新的查询向量)
query_vectors = embeddings[0:1] # 取第一条数据作为查询向量
# query_vectors = rng.random((1, _DIM)).astype(np.float32) # 或者生成新的随机向量search_params = {"metric_type": "L2","params": {"nprobe": 10}, # nprobe 是 IVF 系列索引在搜索时的参数,一般设置为 nlist 的10%左右
}print("开始搜索...")
results = collection.search(data=query_vectors, # 查询向量anns_field=_VECTOR_FIELD, # 要搜索的向量字段param=search_params, # 搜索参数limit=_TOPK, # 返回最相似的 TopK 个结果expr=f"{_SCALAR_FIELD} > 500", # 可选:标量字段过滤表达式,例如 "scalar_field > 500" 或 "id_field in [1, 2, 3]"output_fields=[_SCALAR_FIELD, _ID_FIELD] # 可选:指定除了距离和主键外,还希望返回的字段# consistency_level="Strong" # 可选:搜索时的一致性级别
)
print("搜索完成!")# 处理搜索结果
print(f"\n搜索结果 (查询向量 0):")
for i, hits in enumerate(results): # results 是一个列表,每个元素对应一个查询向量的结果print(f" 查询向量 {i}:")if not hits:print(" 没有找到匹配的结果。")for hit in hits:print(f" - Hit ID: {hit.id}, 距离(distance): {hit.distance:.4f}, scalar_field: {hit.entity.get(_SCALAR_FIELD)}")# --- 7. (可选)基于主键获取向量 ---
if insert_result.primary_keys:pk_to_get = insert_result.primary_keys[0] # 获取插入的第一个IDprint(f"\n尝试通过主键 {pk_to_get} 获取实体...")entities_by_pk = collection.query(expr=f"{_ID_FIELD} == {pk_to_get}",output_fields=[_VECTOR_FIELD, _SCALAR_FIELD])if entities_by_pk:print(f"获取到的实体: {entities_by_pk[0]}")else:print("未找到实体。")# --- 8. 释放和清理 ---
# 释放集合(从内存中卸载)
print(f"\n开始释放集合 '{_COLLECTION_NAME}'...")
collection.release()
print("集合已释放。")# 删除集合(如果不再需要)
# print(f"开始删除集合 '{_COLLECTION_NAME}'...")
# utility.drop_collection(_COLLECTION_NAME)
# print(f"集合 '{_COLLECTION_NAME}' 已成功删除。")# 断开连接 (脚本结束时通常会自动断开,但显式调用是个好习惯)
print("断开 Milvus 连接...")
connections.disconnect(alias='default') # 'default' 是默认的连接别名
print("Milvus 演示脚本执行完毕!")
代码解释:
- 配置: 定义了 Milvus 服务器地址、端口、集合名称、字段名称、向量维度等常量。
- 连接 Milvus: 使用
connections.connect()
连接到正在运行的 Milvus 服务。 - 创建集合:
- Schema 定义:
FieldSchema
用于定义每个字段的名称、数据类型(如INT64
,FLOAT_VECTOR
)、是否为主键、维度等。CollectionSchema
则包含了所有字段的定义。 auto_id=True
表示主键由 Milvus 自动生成。Collection()
用于实际创建集合。consistency_level
定义了集合的一致性级别,影响数据可见性和搜索结果。
- Schema 定义:
- 插入数据:
- 生成随机的向量数据和标量数据作为示例。
collection.insert()
用于将数据插入集合。注意,数据的顺序和数量需要与CollectionSchema
中定义的非auto_id
字段匹配。collection.flush()
:数据插入后调用flush()
是一个好习惯,它能确保数据被写入磁盘并且可以被索引和搜索。num_entities
会在flush
后更新。
- 创建索引:
- 为向量字段创建索引是进行高效相似性搜索的关键。
index_params
中需要指定metric_type
(如L2
欧氏距离,IP
内积) 和index_type
(如IVF_FLAT
,HNSW
) 以及该索引类型所需的特定参数 (如nlist
forIVF_FLAT
)。collection.create_index()
创建索引。utility.wait_for_index_building_complete()
等待索引构建完成。
- 加载集合: 在执行搜索之前,需要使用
collection.load()
将集合的数据加载到内存中。 - 执行搜索:
collection.search()
是核心的搜索函数。data
:一个或多个查询向量。anns_field
:指定要在哪个向量字段上进行搜索。param
:搜索时索引需要的一些参数 (如nprobe
forIVF_FLAT
)。limit
:返回最相似的K
个结果。expr
:可选的过滤表达式,可以基于标量字段或主键进行过滤。output_fields
:除了默认的 ID 和 distance,还可以指定返回哪些字段的值。
- 获取实体(可选):
collection.query()
可以根据表达式(通常是基于主键或标量字段的过滤)来检索实体。 - 释放和清理:
collection.release()
:将集合从内存中卸载,释放资源。utility.drop_collection()
:如果不再需要该集合,可以将其删除。connections.disconnect()
:断开与 Milvus 服务的连接。
这个例子展示了 Milvus 的基本使用流程。在实际应用中,你可能需要根据具体需求调整向量维度、数据量、索引类型和参数、过滤条件等。建议查阅 PyMilvus 的官方文档以获取更详细的信息和高级用法。