当前位置: 首页 > news >正文

ES04-批量写入

ES04-批量写入

文章目录

  • ES04-批量写入
  • 1-Docker部署ES-有kibana
      • 方式 1:官方「一键本地体验」脚本(最快)
      • 方式 2:自定义 docker-compose 脚本(推荐生产/开发)
      • 常用附属命令
      • 端口 & 防火墙
  • 2-Docker部署ES-无kibana
      • 1. 建目录 & 赋权
      • 2. 环境变量文件 `.env`
      • 3. 纯 ES 的 docker-compose.yml
      • 4. 启动
      • 5. 验证
      • 6. 常用运维
      • 7. 注意事项
  • 3-ES从CSV进行数据导入
      • ✅ 项目结构
      • ✅ 1. `requirements.txt`
      • ✅ 2. `.env`(**不要提交到Git**)
      • ✅ 3. `scripts/bulk_insert.py`
      • ✅ 4. `README.md`
    • 注意事项
      • ✅ 6. 输出示例
    • 4-ES从CSV进行数据导入-分批插入数据


1-Docker部署ES-有kibana

下面给出一份「开箱即用」的 Elasticsearch 8.x Docker 部署脚本合集,全部命令在 Ubuntu 22.04/24.04 + Docker 20.10+ 验证通过,直接复制即可运行。
如想一键完成,可直接用 Elastic 官方脚本(方式 1);如需自定义目录、内存、集群名等,可用手动 docker-compose 脚本(方式 2)。


方式 1:官方「一键本地体验」脚本(最快)

# 1. 安装 Docker(已安装可跳过)
curl -fsSL https://get.docker.com | sudo sh# 2. 一键拉起 ES+Kibana(默认最新 8.x)
curl -fsSL https://elastic.co/start-local | sh

执行完后会在当前目录生成 elastic-start-local/ 文件夹,里面自带

  • start.sh – 启动
  • stop.sh – 停止
  • docker-compose.yml – 如要改端口、内存、开启远程访问可直接编辑。

默认只能本机访问,需要外网访问时把 docker-compose.yml 里的 127.0.0.1:9200127.0.0.1:5601 中的 127.0.0.1: 删掉即可,再执行 ./stop.sh && ./start.sh


方式 2:自定义 docker-compose 脚本(推荐生产/开发)

  1. 创建目录
mkdir -p /data/es/{data,logs,plugins,config,certs} \/data/kibana/{data,logs,config} \/data/es_docker
cd /data/es_docker
  1. 自定义环境变量 .env
cat > .env <<'EOF'
ELASTIC_VERSION=8.18.0
ES_JAVA_OPTS="-Xms1g -Xmx1g"
ELASTIC_PASSWORD=Elastic@123
KIBANA_PASSWORD=Kibana@123
CLUSTER_NAME=demo-es
COMPOSE_PROJECT_NAME=es
EOF
  1. 编写 docker-compose.yml
version: "3.9"
services:elasticsearch:image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTIC_VERSION}container_name: es01environment:- node.name=es01- cluster.name=${CLUSTER_NAME}- discovery.type=single-node- bootstrap.memory_lock=true- "ES_JAVA_OPTS=${ES_JAVA_OPTS}"- xpack.security.enabled=true- xpack.security.http.ssl.enabled=false   # 如要https可改为true- ELASTIC_PASSWORD=${ELASTIC_PASSWORD}ulimits:memlock:soft: -1hard: -1volumes:- /data/es/data:/usr/share/elasticsearch/data- /data/es/logs:/usr/share/elasticsearch/logs- /data/es/plugins:/usr/share/elasticsearch/pluginsports:- "9200:9200"- "9300:9300"networks:- elastichealthcheck:test: ["CMD-SHELL", "curl -s -u elastic:${ELASTIC_PASSWORD} http://localhost:9200 | grep -q cluster_uuid"]interval: 30stimeout: 10sretries: 5kibana:image: docker.elastic.co/kibana/kibana:${ELASTIC_VERSION}container_name: kib01depends_on:- elasticsearchenvironment:- ELASTICSEARCH_HOSTS=http://elasticsearch:9200- ELASTICSEARCH_USERNAME=kibana_system- ELASTICSEARCH_PASSWORD=${KIBANA_PASSWORD}- I18N_LOCALE=zh-CNvolumes:- /data/kibana/data:/usr/share/kibana/dataports:- "5601:5601"networks:- elasticnetworks:elastic:driver: bridge
  1. 赋权 & 启动
sudo chown -R 1000:1000 /data/es /data/kibana
docker compose up -d
  1. 查看状态
docker compose ps
curl -u elastic:Elastic@123 http://localhost:9200

浏览器访问 http://<服务器IP>:5601 即可打开 Kibana,用户名 elastic,密码即 .env 里设置的 ELASTIC_PASSWORD


常用附属命令

  • 关闭/重启
docker compose stop      # 停止
docker compose start     # 再次启动
docker compose down -v   # 删除容器+网络(数据卷保留)
  • 重置 elastic/kibana_system 密码
docker exec -it es01 bin/elasticsearch-reset-password -u elastic -i
docker exec -it es01 bin/elasticsearch-reset-password -u kibana_system -i
  • 查看日志
docker logs -f es01
docker logs -f kib01

端口 & 防火墙

# 如启 firewalld 需放行
sudo firewall-cmd --permanent --add-port=9200/tcp --add-port=5601/tcp
sudo firewall-cmd --reload

云服务器还需在控制台安全组开放对应端口。

至此,Elasticsearch 与 Kibana 的 Docker 部署执行脚本已全部给出,可直接落地使用。


2-Docker部署ES-无kibana

下面给出「只跑 Elasticsearch,不装 Kibana」的最小可运行方案,依旧基于 Docker / docker-compose,复制即可用。
目录、内存、端口、集群名全部可自定义,单节点开发/测试/小规格生产 都能直接落地。


1. 建目录 & 赋权

sudo mkdir -p /data/es/{data,logs,plugins,config,certs}
# 容器内 elasticsearch 进程默认 uid:gid = 1000:1000
sudo chown -R 1000:1000 /data/es

2. 环境变量文件 .env

cat >/data/es/.env <<'EOF'
ELASTIC_VERSION=8.18.0
CLUSTER_NAME=es-single
NODE_NAME=es01
ES_JAVA_OPTS="-Xms1g -Xmx1g"
ELASTIC_PASSWORD=Elastic@123
# 如要外网访问,把下面改成 0.0.0.0
NETWORK_HOST=0.0.0.0
EOF

3. 纯 ES 的 docker-compose.yml

version: "3.9"
services:elasticsearch:image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTIC_VERSION}container_name: ${NODE_NAME}restart: unless-stoppedenvironment:- node.name=${NODE_NAME}- cluster.name=${CLUSTER_NAME}- discovery.type=single-node        # 单节点必加- bootstrap.memory_lock=true- "ES_JAVA_OPTS=${ES_JAVA_OPTS}"- xpack.security.enabled=true- xpack.security.http.ssl.enabled=false   # 关闭 https,省证书- ELASTIC_PASSWORD=${ELASTIC_PASSWORD}- network.host=${NETWORK_HOST}ulimits:memlock:soft: -1hard: -1nofile:soft: 65536hard: 65536volumes:- /data/es/data:/usr/share/elasticsearch/data- /data/es/logs:/usr/share/elasticsearch/logs- /data/es/plugins:/usr/share/elasticsearch/pluginsports:- "9200:9200"- "9300:9300"networks:- esnethealthcheck:test: ["CMD-SHELL", "curl -s -u elastic:${ELASTIC_PASSWORD} http://localhost:9200 | grep -q cluster_uuid"]interval: 30stimeout: 10sretries: 5networks:esnet:driver: bridge

4. 启动

cd /data/es
docker compose up -d          # 拉镜像 & 后台运行
docker compose ps             # 看状态
docker compose logs -f        # 实时日志

5. 验证

curl -u elastic:Elastic@123 http://localhost:9200
# 返回集群信息即成功

6. 常用运维

# 重启/停止/删除
docker compose restart
docker compose stop
docker compose down           # 容器+网络删除,数据卷保留# 改密码
docker exec -it es01 bin/elasticsearch-reset-password -u elastic -i# 升级小版本
# 改 .env 里的 ELASTIC_VERSION → docker compose pull → docker compose up -d

7. 注意事项

  1. 生产多节点请去掉 discovery.type=single-node,改用
    discovery.seed_hosts / cluster.initial_master_nodes
  2. 如果 内存不足 1 G,把 .env 里的 ES_JAVA_OPTS 改成 -Xms512m -Xmx512m,但官方最低建议 1 GB。
  3. 需要 HTTPS 时把
    xpack.security.http.ssl.enabled=true 并挂证书目录即可。
  4. 云服务器记得 安全组/防火墙 放通 9200。

至此,纯 Elasticsearch 的 Docker 部署脚本 已就绪,复制即可用,Kibana 完全不装。


3-ES从CSV进行数据导入

注意:CSV需要有文件头(对应的是index的key)

下面我将为你创建一个完整的Python项目,用于:

  1. 生成本地CSV文件(模拟数据)
  2. 将CSV数据转换为Elasticsearch(ES)批量插入格式(bulk API)
  3. 使用带用户名密码认证的ES客户端批量插入数据

✅ 项目结构

csv_to_es/
│
├── data/
│   └── sample_data.csv          # 自动生成的CSV文件
├── scripts/
│   └── bulk_insert.py           # 主脚本:CSV → ES
├── requirements.txt              # 依赖
├── .env                          # 环境变量(ES连接信息)
└── README.md                     # 使用说明

✅ 1. requirements.txt

elasticsearch==8.13.0
python-dotenv==1.0.0
pandas==2.2.2

✅ 2. .env不要提交到Git

ES_HOST=https://localhost:9200
ES_USER=elastic
ES_PASSWORD=your_password
ES_INDEX=my_index
ES_CA_CERT=/path/to/http_ca.crt  # 如果是自签名证书

✅ 3. scripts/bulk_insert.py

import os
import csv
import pandas as pd
from elasticsearch import Elasticsearch
from dotenv import load_dotenvload_dotenv()# 配置
CSV_PATH = os.path.join(os.path.dirname(__file__), '..', 'data', 'sample_data.csv')
ES_HOST = os.getenv("ES_HOST")
ES_USER = os.getenv("ES_USER")
ES_PASSWORD = os.getenv("ES_PASSWORD")
ES_INDEX = os.getenv("ES_INDEX")
ES_CA_CERT = os.getenv("ES_CA_CERT")def create_sample_csv():os.makedirs(os.path.dirname(CSV_PATH), exist_ok=True)data = [{"id": 1, "name": "Alice", "age": 30},{"id": 2, "name": "Bob", "age": 25},{"id": 3, "name": "Charlie", "age": 35},]df = pd.DataFrame(data)df.to_csv(CSV_PATH, index=False)print(f"[INFO] CSV文件已生成:{CSV_PATH}")def csv_to_bulk_actions(csv_file, index_name):actions = []with open(csv_file, newline='', encoding='utf-8') as f:reader = csv.DictReader(f)for row in reader:action = {"_index": index_name,"_source": row}actions.append(action)return actionsdef bulk_insert_to_es(actions):es = Elasticsearch([ES_HOST],basic_auth=(ES_USER, ES_PASSWORD),ca_certs=ES_CA_CERT if ES_CA_CERT else None,verify_certs=bool(ES_CA_CERT))if not es.ping():raise ConnectionError("无法连接到Elasticsearch")from elasticsearch.helpers import bulksuccess, _ = bulk(es, actions)print(f"[INFO] 成功插入 {success} 条记录到索引 '{ES_INDEX}'")def main():create_sample_csv()actions = csv_to_bulk_actions(CSV_PATH, ES_INDEX)bulk_insert_to_es(actions)if __name__ == "__main__":main()

✅ 4. README.md

# CSV to Elasticsearch Bulk Insert## 使用步骤1. 克隆项目
```bash
git clone <your-repo>
cd csv_to_es
  1. 安装依赖
pip install -r requirements.txt
  1. 配置环境变量
    复制 .env.example.env 并填写你的ES连接信息。

  2. 运行脚本

python scripts/bulk_insert.py

注意事项

  • 确保ES服务可访问
  • 如果使用自签名证书,请填写 ES_CA_CERT 路径
  • 默认索引为 my_index,可在 .env 中修改

---### ✅ 5. 快速运行(本地测试)```bash
# 创建虚拟环境(可选)
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate# 安装依赖
pip install -r requirements.txt# 配置 .env 文件(填写你的ES信息)# 运行
python scripts/bulk_insert.py

✅ 6. 输出示例

[INFO] CSV文件已生成:.../csv_to_es/data/sample_data.csv
[INFO] 成功插入 3 条记录到索引 'my_index'

如需支持:

  • 自动创建索引并设置mapping
  • 增量更新(按id去重)
  • 定时任务(cron/airflow)

可以继续扩展,我可以帮你加上。需要吗?


4-ES从CSV进行数据导入-分批插入数据

from elasticsearch import Elasticsearch, helpers
import pandas as pd# 1. 连接写法(8.x 推荐用 Elasticsearch(hosts=…))
es = Elasticsearch(hosts=[{'scheme': 'http', 'host': '127.0.0.1', 'port': 9200}],basic_auth=('username', 'password')
)# 2. 必须给 index 一个名字
INDEX_NAME = 'your_index_name'# 3. 批处理函数
def process_chunk(chunk: pd.DataFrame):actions = []for _, row in chunk.iterrows():          # iterrows() 是方法,要加括号row_dict = row.astype(str).to_dict()# 将usrId转化为字符串,并确保长度为6,不足在前面添加0row_dict['usrId'] = str(row_dict['usrId']).zfill(6)actions.append({"_index": INDEX_NAME,"_source": row_dict})# 4. 真正写入 ESif actions:print(f'开始添加 {len(actions)} 条记录到 {INDEX_NAME}')try:helpers.bulk(es, actions, raise_on_error=True)print(f'成功写入 {len(actions)} 条记录')except Exception as e:               # 注意大小写print(f'索引过程失败: {e}')# 5. 主流程
def main():csv_file = './xxx/xxx.csv'chunk_size = 10_000for chunk in pd.read_csv(csv_file, chunksize=chunk_size, encoding='gbk'):process_chunk(chunk)if __name__ == '__main__':main()

文章转载自:

http://h3ULpepE.qbgfp.cn
http://0bzySVTm.qbgfp.cn
http://0JUU2Xl2.qbgfp.cn
http://CO4GHWWM.qbgfp.cn
http://Ze3uQO6d.qbgfp.cn
http://Kh6v1OBe.qbgfp.cn
http://EX8Lxpqx.qbgfp.cn
http://vy4ctTNL.qbgfp.cn
http://DcEg7vhe.qbgfp.cn
http://zIZamblu.qbgfp.cn
http://M9GFIwp9.qbgfp.cn
http://eXSVVypm.qbgfp.cn
http://x6M7uzbC.qbgfp.cn
http://iF2TQ9KY.qbgfp.cn
http://mUw3kR2o.qbgfp.cn
http://Z9m8RdBy.qbgfp.cn
http://fEjeNAvR.qbgfp.cn
http://rXZb26yI.qbgfp.cn
http://n3ak3RnY.qbgfp.cn
http://EwhONLGi.qbgfp.cn
http://LEWNyJ7a.qbgfp.cn
http://Qjs0EyEc.qbgfp.cn
http://rJ1Bods4.qbgfp.cn
http://Y48TRwgX.qbgfp.cn
http://f9zC3Anx.qbgfp.cn
http://fyDLYM65.qbgfp.cn
http://xHr8tTmY.qbgfp.cn
http://aLbBbJ7S.qbgfp.cn
http://tqJMKffM.qbgfp.cn
http://Z3T1TC2o.qbgfp.cn
http://www.dtcms.com/a/368274.html

相关文章:

  • 大数据毕业设计推荐:基于Spark的零售时尚精品店销售数据分析系统【Hadoop+python+spark】
  • 企业数字安全双保险:终端安全与数据防泄漏如何构筑全方位防护体系
  • 信息系统安全保护措施文件方案
  • 【C++】 list 容器模拟实现解析
  • 鹿客发布旗舰新品AI智能锁V6 Max,打造AI家庭安全领域新标杆
  • 【GEOS-Chem 输入数据】使用 AWS CLI 访问 GEOS-Chem 数据
  • 23种设计模式——原型模式 (Prototype Pattern)详解
  • 《Cocos Creator的2D、3D渲染使用记录》
  • Conda 使用py环境隔离
  • 数据结构:栈和队列力扣算法题
  • 深度学习之第八课迁移学习(残差网络ResNet)
  • 数据一致性、AI样本可追溯性与数据治理
  • 基于MATLAB的CNN大气散射传播率计算与图像去雾实现
  • 【Redis】初识 Redis 与基础数据结构
  • 分布式常见面试题整理
  • “卧槽,系统又崩了!”——别慌,这也许是你看过最通俗易懂的分布式入门
  • 数字时代的 “安全刚需”:为什么销售管理企业都在做手机号码脱敏
  • 乐观并发: TCP 与编程实践
  • 两条平面直线之间通过三次多项式曲线进行过渡的方法介绍
  • if __name__=‘__main__‘的用处
  • MySQL知识回顾总结----数据类型
  • WeaveFox AI智能开发平台介绍
  • Oracle:select top 5
  • sub3G、sub6G和LB、MB、HB、MHB、LMHB、UHB之间的区别和联系
  • Tenda AC20路由器缓冲区溢出漏洞分析
  • 52核心52线程,Intel下一代CPU憋了个大的
  • 50kNm风能传动轴扭转疲劳检测试验台指标
  • 蓓韵安禧DHA温和配方:安全营养的智慧守护
  • Kafka面试精讲 Day 8:日志清理与数据保留策略
  • 轨迹文件缺少时间