Python 操作 Elasticsearch
什么是 Elasticsearch
Elasticsearch 是一个基于 Lucene 的开源分布式搜索与分析引擎,能在海量数据中以毫秒级速度进行全文检索、结构化查询和实时数据分析,广泛用于站内搜索、日志分析和业务监控,数据以 JSON 文档形式存储,并支持水平扩展。
安装 Python Elasticsearch 客户端
安装最新的版本
# 安装最新的版本
pip install elasticsearch
安装指定的版本
# 安装指定的版本,假如要安装的版本是 8.14.0
pip install elasticsearch==8.14.0
对于 Elasticsearch 8.x,如果你启用了安全认证,需要加上 certifi
pip install certifi
连接 Elasticsearch
连接无密码的 es
from elasticsearch import Elasticsearches = Elasticsearch("http://localhost:9200")if es.ping():print("Elasticsearch 连接成功")
else:raise ConnectionError("Elasticsearch 连接失败")
连接有密码的 es,有有效证书的
from elasticsearch import Elasticsearch# 如果需要用户名和密码
es = Elasticsearch(# es 地址"https://my-domain:9200",# 账号密码basic_auth=("账号", "密码"),# verify_certs=True: 告诉 Python 客户端, 请严格验证 SSL 证书链是否受信任。verify_certs=True
)if es.ping():print("Elasticsearch 连接成功")
else:raise ConnectionError("Elasticsearch 连接失败")
连接有密码的 es,无有效证书的
from elasticsearch import Elasticsearch# 如果需要用户名和密码
es = Elasticsearch(# es 地址"https://10.10.10.1:9200",# 账号密码basic_auth=("账号", "密码"),# verify_certs=False: 告诉 Python 客户端, 不检查 SSL 证书链是否受信任。verify_certs=False,# 不显示 证书无效 的警告ssl_show_warn=False
)if es.ping():print("Elasticsearch 连接成功")
else:raise ConnectionError("Elasticsearch 连接失败")
索引操作
创建索引
# 自定义索引名称
index_name = "products"mapping = {"mappings": {"properties": {"name": {"type": "text"},"price": {"type": "float"},"category": {"type": "keyword"},"created_at": {"type": "date"}}}
}# 创建索引
if not es.indices.exists(index=index_name):es.indices.create(index=index_name, body=mapping)print("索引创建成功")
查询所有索引
indices = es.indices.get_alias(name="*")
print(indices)
查询索引是否存在
# 索引存在 返回 True, 索引不存在 返回 False
print(es.indices.exists(index=index_name))
查询索引数据结构
mapping = es.indices.get_mapping(index=index_name)
print(mapping)
删除指定的索引
deleteResult = es.indices.delete(index="products")
print(deleteResult) # 打印 {'acknowledged': True}
向索引中添加新字段
# 索引名称
index_name = "products"# 定义要添加的新字段 mapping
new_field = {"properties": {"age": {"type": "integer"}}
}# 更新 mapping
response = es.indices.put_mapping(index=index_name, body=new_field)
print(response) # 打印 {'acknowledged': True}
同一集群 迁移单个索引
# 被迁移的索引
source_index = "old-index"
# 迁移到的目标索引
target_index = "new-index"# 1️ 创建新索引(可自定义结构)
new_mapping = {"mappings": {"properties": {"name": {"type": "text"},"age": {"type": "integer"}}}
}# 创建目标索引
es.indices.create(index=target_index, body=new_mapping)# 2️ 迁移数据
res = es.reindex(body={"source": {"index": source_index},"dest": {"index": target_index}
})
从一个集群到另一个集群 迁移单个索引
例如:有如下2个集群
源集群(old):http://192.168.1.10:9200
目标集群(new):http://192.168.1.20:9200
1. 在目标集群中注册源集群远程连接(白名单)
PUT _cluster/settings
{"persistent": {"reindex.remote.whitelist": ["192.168.1.10:9200"]}
}
2. 在目标集群执行 py 迁移代码
from elasticsearch import Elasticsearches = Elasticsearch(# 目标 es 地址"https://192.168.1.20:9200",# 账号密码basic_auth=("elastic", "123456"),# verify_certs=False: 告诉 Python 客户端, 不检查 SSL 证书链是否受信任。verify_certs=False,# 不显示 证书无效 的警告ssl_show_warn=False
)if es.ping():print("Elasticsearch 连接成功")
else:raise ConnectionError("Elasticsearch 连接失败")# 迁移
es.reindex(body={"source": {"remote": {"host": "http://192.168.1.10:9200","username": "elastic","password": "123456"},"index": "old-index"},"dest": {"index": "new-index"}
})
添加 Document 操作
添加一条 doc,索引不存在时创建
index_name = "products"doc = {"name": "Alice","age": 25
}# _id 自动生成的方式
res = es.index(index=index_name, document=doc)# 指定 _id
# res = es.index(index=index_name, id="001", document=doc)print(res) # 打印 {'_index': 'products', '_id': '7z7kYZoBTWmfHFoJzD9j', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1, '_primary_term': 1}
ps. 如果想索引不存在 就不添加,使用 es.indices.exists(index=index_name) 进行判断
只在文档不存在时添加(幂等创建)
try:es.create(index="users", id="user_1001", document={"name": "Bob","age": 30})
except:print("文档已存在")
不存在则添加,存在则更新
es.update(index="users",id="user_1003",doc={"name": "David", "age": 27},doc_as_upsert=True
)
批量添加数据
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk# 如果需要用户名和密码
es = Elasticsearch(# es 地址"https://10.10.10.1:9200",# 账号密码basic_auth=("账号", "密码"),# verify_certs=False: 告诉 Python 客户端, 不检查 SSL 证书链是否受信任。verify_certs=False,# 不显示 证书无效 的警告ssl_show_warn=False
)if es.ping():print("Elasticsearch 连接成功")
else:raise ConnectionError("Elasticsearch 连接失败")# 要添加的批量数据
actions = [{"_index": "users", "_id": 1, "_source": {"name": "Alice", "age": 25}},{"_index": "users", "_id": 2, "_source": {"name": "Bob", "age": 30}},{"_index": "users", "_id": 3, "_source": {"name": "Charlie", "age": 22}},
]# 批量添加
success, _ = bulk(es, actions)
print("成功写入文档数:", success)
查询 Document 操作
根据 _id 查询单条 doc
