使用Reindex迁移Elasticsearch集群数据详解(下)
#作者:stackofumbrella
文章目录
- 使用Ingest Node
- Reindex远程服务器的索引到本地
- version_type用法
- op_type用法
- conflicts用法
- query用法
- size设置
- sort配置
- 查看进度
- 使用Task API查询进度
- 取消reindex
使用Ingest Node
这个功能是最好用的,当source是因为不合理的结构,需要重新结构化所有的数据时,通过ingest node,可以很方便的在新的index中获得不一样的mapping和value
POST _reindex
{"source": {"index": "source"},"dest": {"index": "dest","pipeline": "some_ingest_pipeline"}
}
Reindex远程服务器的索引到本地
在本地目标Elasticsearch中需要在elasticsearch.yml中添加白名单设置,然后重启Elasticsearch服务
reindex.remote.whitelist: ["elasticsearch.aliyuncs.com:9200"]
POST _reindex
{"source": {"remote": {"host": "http://remotehost:9200","username": "elastic","password": "password"},"index": "source","query": {"match": {"test": "data"}}},"dest": {"index": "dest"}
}
默认的buffer的size是100M,在scroll size是1000的情况下,如果单个document的平均大小超过100K,则有可能会报错,因此遇到非常大的document需要减小batch的size
POST _reindex
{"source": {"remote": {"host": "http://remotehost:9200"},"index": "source","size": 100,"query": {"match": {"test": "data"}}},"dest": {"index": "dest"}
}
version_type用法
如果新的index中有数据,并且可能会发生冲突,这时可以设置"version_type": “internal”,则Elasticsearch强制性的将文档转储到目标中,覆盖具有相同类型和id的任何内容
POST _reindex
{"source": {"index": "old_index"},"dest": {"index": "new_index","version_type": "internal"}
}
就像_update_by_query,reindex会生成源索引的快照,但它的目标必须是一个不同的索引,避免版本冲突,dest对象可以像index API一样进行配置,以乐观锁控制并发,Elasticsearch将会直接将文档转储到dest中,覆盖任何具有相同类型和id的document
POST _reindex
{"source": {"index": "mm"},"dest": {"index": "new_mm","version_type": "internal"}
}
如果把version_type设置为external,则Elasticsearch会从source读取version字段,当遇到具有相同类型和id的documents,只更新new version。简单来说,就是在reindex的时候,目标index可以不是一个新的index,而是有数据的,如果源index和目标index里面有相同类型和id的document,对于使用internal是直接覆盖,使用external的话,只有当source的version更加新的时候才更新。
POST _reindex
{"source": {"index": "mm"},"dest": {"index": "mm","version_type": "external"}
}
op_type用法
把op_type设置为create,只在目标index中添加不存在的document,如果相同的document已经存在,则会报version conflict的错误。
POST _reindex
{"source": {"index": "mm"},"dest": {"index": "new_mm","op_type": "create"}
}
conflicts用法
默认当发生version conflict的时候,_reindex会被abort,除非把conflict设置为“proceed”
POST _reindex
{"conflicts": "proceed","source": {"index": "mm"},"dest": {"index": "new_mm","op_type": "create"}
}
query用法
通过query把需要_reindex的document限定在一定的范围
POST _reindex
{"source": {"index": "mm","type": "tweet","query": {"term": {"user": "alen"}}},"dest": {"index": "new_mm"}
}
size设置
通过size可以控制复制多少内容
POST _reindex
{"size": 1,"source": {"index": "mm"},"dest": {"index": "new_mm"}
}
sort配置
把时间排序前1000个document复制到目标索引
POST _reindex
{"size": 1000,"source": {"index": "mm","sort": { "date": "desc" }},"dest": {"index": "new_mm"}
}
查看进度
使用Task API查询进度
一般来说如果源index很大,则可能需要比较长的时间来完成reindex工作,如果需要查看进度,可以通过tasks API查看
GET _tasks?detailed=true&actions=*reindex
{"nodes" : {"r1A2WoRbTwKZ516z6NEs5A" : {"name" : "r1A2WoR","transport_address" : "127.0.0.1:9300","host" : "127.0.0.1","ip" : "127.0.0.1:9300","attributes" : {"testattr" : "test","portsfile" : "true"},"tasks" : {"r1A2WoRbTwKZ516z6NEs5A:36619" : {"node" : "r1A2WoRbTwKZ516z6NEs5A","id" : 36619,"type" : "transport","action" : "indices:data/write/reindex","status" : { "total" : 6154, "updated" : 3500, "created" : 0, "deleted" : 0, "batches" : 4, "version_conflicts" : 0, "noops" : 0, "retries": { "bulk": 0, "search": 0 }, "throttled_millis": 0 },"description" : "" }}}}
}
取消reindex
这里的task_id可以通过tasks API获得
POST _tasks/task_id:1/_cancel
Reindex迁移注意事项
Reindex会占用大量资源,建议在低峰期执行
大数据量reindex建议使用slices并行处理
可以使用wait_for_completion=false异步执行
监控任务进度,避免影响集群性能
考虑版本兼容性问题,特别是跨大版本迁移时