ES04-批量写入
ES04-批量写入
文章目录
- ES04-批量写入
- 1-Docker部署ES-有kibana
- 方式 1:官方「一键本地体验」脚本(最快)
- 方式 2:自定义 docker-compose 脚本(推荐生产/开发)
- 常用附属命令
- 端口 & 防火墙
- 2-Docker部署ES-无kibana
- 1. 建目录 & 赋权
- 2. 环境变量文件 `.env`
- 3. 纯 ES 的 docker-compose.yml
- 4. 启动
- 5. 验证
- 6. 常用运维
- 7. 注意事项
- 3-ES从CSV进行数据导入
- ✅ 项目结构
- ✅ 1. `requirements.txt`
- ✅ 2. `.env`(**不要提交到Git**)
- ✅ 3. `scripts/bulk_insert.py`
- ✅ 4. `README.md`
- 注意事项
- ✅ 6. 输出示例
- 4-ES从CSV进行数据导入-分批插入数据
1-Docker部署ES-有kibana
下面给出一份「开箱即用」的 Elasticsearch 8.x Docker 部署脚本合集,全部命令在 Ubuntu 22.04/24.04 + Docker 20.10+ 验证通过,直接复制即可运行。
如想一键完成,可直接用 Elastic 官方脚本(方式 1);如需自定义目录、内存、集群名等,可用手动 docker-compose 脚本(方式 2)。
方式 1:官方「一键本地体验」脚本(最快)
# 1. 安装 Docker(已安装可跳过)
curl -fsSL https://get.docker.com | sudo sh# 2. 一键拉起 ES+Kibana(默认最新 8.x)
curl -fsSL https://elastic.co/start-local | sh
执行完后会在当前目录生成 elastic-start-local/
文件夹,里面自带
start.sh
– 启动stop.sh
– 停止docker-compose.yml
– 如要改端口、内存、开启远程访问可直接编辑。
默认只能本机访问,需要外网访问时把 docker-compose.yml
里的 127.0.0.1:9200
与 127.0.0.1:5601
中的 127.0.0.1:
删掉即可,再执行 ./stop.sh && ./start.sh
。
方式 2:自定义 docker-compose 脚本(推荐生产/开发)
- 创建目录
mkdir -p /data/es/{data,logs,plugins,config,certs} \/data/kibana/{data,logs,config} \/data/es_docker
cd /data/es_docker
- 自定义环境变量
.env
cat > .env <<'EOF'
ELASTIC_VERSION=8.18.0
ES_JAVA_OPTS="-Xms1g -Xmx1g"
ELASTIC_PASSWORD=Elastic@123
KIBANA_PASSWORD=Kibana@123
CLUSTER_NAME=demo-es
COMPOSE_PROJECT_NAME=es
EOF
- 编写
docker-compose.yml
version: "3.9"
services:elasticsearch:image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTIC_VERSION}container_name: es01environment:- node.name=es01- cluster.name=${CLUSTER_NAME}- discovery.type=single-node- bootstrap.memory_lock=true- "ES_JAVA_OPTS=${ES_JAVA_OPTS}"- xpack.security.enabled=true- xpack.security.http.ssl.enabled=false # 如要https可改为true- ELASTIC_PASSWORD=${ELASTIC_PASSWORD}ulimits:memlock:soft: -1hard: -1volumes:- /data/es/data:/usr/share/elasticsearch/data- /data/es/logs:/usr/share/elasticsearch/logs- /data/es/plugins:/usr/share/elasticsearch/pluginsports:- "9200:9200"- "9300:9300"networks:- elastichealthcheck:test: ["CMD-SHELL", "curl -s -u elastic:${ELASTIC_PASSWORD} http://localhost:9200 | grep -q cluster_uuid"]interval: 30stimeout: 10sretries: 5kibana:image: docker.elastic.co/kibana/kibana:${ELASTIC_VERSION}container_name: kib01depends_on:- elasticsearchenvironment:- ELASTICSEARCH_HOSTS=http://elasticsearch:9200- ELASTICSEARCH_USERNAME=kibana_system- ELASTICSEARCH_PASSWORD=${KIBANA_PASSWORD}- I18N_LOCALE=zh-CNvolumes:- /data/kibana/data:/usr/share/kibana/dataports:- "5601:5601"networks:- elasticnetworks:elastic:driver: bridge
- 赋权 & 启动
sudo chown -R 1000:1000 /data/es /data/kibana
docker compose up -d
- 查看状态
docker compose ps
curl -u elastic:Elastic@123 http://localhost:9200
浏览器访问 http://<服务器IP>:5601
即可打开 Kibana,用户名 elastic
,密码即 .env
里设置的 ELASTIC_PASSWORD
。
常用附属命令
- 关闭/重启
docker compose stop # 停止
docker compose start # 再次启动
docker compose down -v # 删除容器+网络(数据卷保留)
- 重置 elastic/kibana_system 密码
docker exec -it es01 bin/elasticsearch-reset-password -u elastic -i
docker exec -it es01 bin/elasticsearch-reset-password -u kibana_system -i
- 查看日志
docker logs -f es01
docker logs -f kib01
端口 & 防火墙
# 如启 firewalld 需放行
sudo firewall-cmd --permanent --add-port=9200/tcp --add-port=5601/tcp
sudo firewall-cmd --reload
云服务器还需在控制台安全组开放对应端口。
至此,Elasticsearch 与 Kibana 的 Docker 部署执行脚本已全部给出,可直接落地使用。
2-Docker部署ES-无kibana
下面给出「只跑 Elasticsearch,不装 Kibana」的最小可运行方案,依旧基于 Docker / docker-compose,复制即可用。
目录、内存、端口、集群名全部可自定义,单节点、开发/测试/小规格生产 都能直接落地。
1. 建目录 & 赋权
sudo mkdir -p /data/es/{data,logs,plugins,config,certs}
# 容器内 elasticsearch 进程默认 uid:gid = 1000:1000
sudo chown -R 1000:1000 /data/es
2. 环境变量文件 .env
cat >/data/es/.env <<'EOF'
ELASTIC_VERSION=8.18.0
CLUSTER_NAME=es-single
NODE_NAME=es01
ES_JAVA_OPTS="-Xms1g -Xmx1g"
ELASTIC_PASSWORD=Elastic@123
# 如要外网访问,把下面改成 0.0.0.0
NETWORK_HOST=0.0.0.0
EOF
3. 纯 ES 的 docker-compose.yml
version: "3.9"
services:elasticsearch:image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTIC_VERSION}container_name: ${NODE_NAME}restart: unless-stoppedenvironment:- node.name=${NODE_NAME}- cluster.name=${CLUSTER_NAME}- discovery.type=single-node # 单节点必加- bootstrap.memory_lock=true- "ES_JAVA_OPTS=${ES_JAVA_OPTS}"- xpack.security.enabled=true- xpack.security.http.ssl.enabled=false # 关闭 https,省证书- ELASTIC_PASSWORD=${ELASTIC_PASSWORD}- network.host=${NETWORK_HOST}ulimits:memlock:soft: -1hard: -1nofile:soft: 65536hard: 65536volumes:- /data/es/data:/usr/share/elasticsearch/data- /data/es/logs:/usr/share/elasticsearch/logs- /data/es/plugins:/usr/share/elasticsearch/pluginsports:- "9200:9200"- "9300:9300"networks:- esnethealthcheck:test: ["CMD-SHELL", "curl -s -u elastic:${ELASTIC_PASSWORD} http://localhost:9200 | grep -q cluster_uuid"]interval: 30stimeout: 10sretries: 5networks:esnet:driver: bridge
4. 启动
cd /data/es
docker compose up -d # 拉镜像 & 后台运行
docker compose ps # 看状态
docker compose logs -f # 实时日志
5. 验证
curl -u elastic:Elastic@123 http://localhost:9200
# 返回集群信息即成功
6. 常用运维
# 重启/停止/删除
docker compose restart
docker compose stop
docker compose down # 容器+网络删除,数据卷保留# 改密码
docker exec -it es01 bin/elasticsearch-reset-password -u elastic -i# 升级小版本
# 改 .env 里的 ELASTIC_VERSION → docker compose pull → docker compose up -d
7. 注意事项
- 生产多节点请去掉
discovery.type=single-node
,改用
discovery.seed_hosts
/cluster.initial_master_nodes
。 - 如果 内存不足 1 G,把
.env
里的ES_JAVA_OPTS
改成-Xms512m -Xmx512m
,但官方最低建议 1 GB。 - 需要 HTTPS 时把
xpack.security.http.ssl.enabled=true
并挂证书目录即可。 - 云服务器记得 安全组/防火墙 放通 9200。
至此,纯 Elasticsearch 的 Docker 部署脚本 已就绪,复制即可用,Kibana 完全不装。
3-ES从CSV进行数据导入
注意:CSV需要有文件头(对应的是index的key)
下面我将为你创建一个完整的Python项目,用于:
- 生成本地CSV文件(模拟数据)
- 将CSV数据转换为Elasticsearch(ES)批量插入格式(bulk API)
- 使用带用户名密码认证的ES客户端批量插入数据
✅ 项目结构
csv_to_es/
│
├── data/
│ └── sample_data.csv # 自动生成的CSV文件
├── scripts/
│ └── bulk_insert.py # 主脚本:CSV → ES
├── requirements.txt # 依赖
├── .env # 环境变量(ES连接信息)
└── README.md # 使用说明
✅ 1. requirements.txt
elasticsearch==8.13.0
python-dotenv==1.0.0
pandas==2.2.2
✅ 2. .env
(不要提交到Git)
ES_HOST=https://localhost:9200
ES_USER=elastic
ES_PASSWORD=your_password
ES_INDEX=my_index
ES_CA_CERT=/path/to/http_ca.crt # 如果是自签名证书
✅ 3. scripts/bulk_insert.py
import os
import csv
import pandas as pd
from elasticsearch import Elasticsearch
from dotenv import load_dotenvload_dotenv()# 配置
CSV_PATH = os.path.join(os.path.dirname(__file__), '..', 'data', 'sample_data.csv')
ES_HOST = os.getenv("ES_HOST")
ES_USER = os.getenv("ES_USER")
ES_PASSWORD = os.getenv("ES_PASSWORD")
ES_INDEX = os.getenv("ES_INDEX")
ES_CA_CERT = os.getenv("ES_CA_CERT")def create_sample_csv():os.makedirs(os.path.dirname(CSV_PATH), exist_ok=True)data = [{"id": 1, "name": "Alice", "age": 30},{"id": 2, "name": "Bob", "age": 25},{"id": 3, "name": "Charlie", "age": 35},]df = pd.DataFrame(data)df.to_csv(CSV_PATH, index=False)print(f"[INFO] CSV文件已生成:{CSV_PATH}")def csv_to_bulk_actions(csv_file, index_name):actions = []with open(csv_file, newline='', encoding='utf-8') as f:reader = csv.DictReader(f)for row in reader:action = {"_index": index_name,"_source": row}actions.append(action)return actionsdef bulk_insert_to_es(actions):es = Elasticsearch([ES_HOST],basic_auth=(ES_USER, ES_PASSWORD),ca_certs=ES_CA_CERT if ES_CA_CERT else None,verify_certs=bool(ES_CA_CERT))if not es.ping():raise ConnectionError("无法连接到Elasticsearch")from elasticsearch.helpers import bulksuccess, _ = bulk(es, actions)print(f"[INFO] 成功插入 {success} 条记录到索引 '{ES_INDEX}'")def main():create_sample_csv()actions = csv_to_bulk_actions(CSV_PATH, ES_INDEX)bulk_insert_to_es(actions)if __name__ == "__main__":main()
✅ 4. README.md
# CSV to Elasticsearch Bulk Insert## 使用步骤1. 克隆项目
```bash
git clone <your-repo>
cd csv_to_es
- 安装依赖
pip install -r requirements.txt
-
配置环境变量
复制.env.example
为.env
并填写你的ES连接信息。 -
运行脚本
python scripts/bulk_insert.py
注意事项
- 确保ES服务可访问
- 如果使用自签名证书,请填写
ES_CA_CERT
路径 - 默认索引为
my_index
,可在.env
中修改
---### ✅ 5. 快速运行(本地测试)```bash
# 创建虚拟环境(可选)
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate# 安装依赖
pip install -r requirements.txt# 配置 .env 文件(填写你的ES信息)# 运行
python scripts/bulk_insert.py
✅ 6. 输出示例
[INFO] CSV文件已生成:.../csv_to_es/data/sample_data.csv
[INFO] 成功插入 3 条记录到索引 'my_index'
如需支持:
- 自动创建索引并设置mapping
- 增量更新(按id去重)
- 定时任务(cron/airflow)
可以继续扩展,我可以帮你加上。需要吗?
4-ES从CSV进行数据导入-分批插入数据
from elasticsearch import Elasticsearch, helpers
import pandas as pd# 1. 连接写法(8.x 推荐用 Elasticsearch(hosts=…))
es = Elasticsearch(hosts=[{'scheme': 'http', 'host': '127.0.0.1', 'port': 9200}],basic_auth=('username', 'password')
)# 2. 必须给 index 一个名字
INDEX_NAME = 'your_index_name'# 3. 批处理函数
def process_chunk(chunk: pd.DataFrame):actions = []for _, row in chunk.iterrows(): # iterrows() 是方法,要加括号row_dict = row.astype(str).to_dict()# 将usrId转化为字符串,并确保长度为6,不足在前面添加0row_dict['usrId'] = str(row_dict['usrId']).zfill(6)actions.append({"_index": INDEX_NAME,"_source": row_dict})# 4. 真正写入 ESif actions:print(f'开始添加 {len(actions)} 条记录到 {INDEX_NAME}')try:helpers.bulk(es, actions, raise_on_error=True)print(f'成功写入 {len(actions)} 条记录')except Exception as e: # 注意大小写print(f'索引过程失败: {e}')# 5. 主流程
def main():csv_file = './xxx/xxx.csv'chunk_size = 10_000for chunk in pd.read_csv(csv_file, chunksize=chunk_size, encoding='gbk'):process_chunk(chunk)if __name__ == '__main__':main()