Elasticsearch 如何实现跨数据中心的数据同步?
实战场景:
双数据中心容灾,要求RPO<5分钟,RTO<30分钟
RPO(Recovery Point Objective):
RPO指的是灾难发生后,系统能够恢复到的数据更新点的时间。简单来说,它衡量的是数据丢失的量。在你的例子中,RPO<5分钟意味着在灾难发生后,系统能够恢复到灾难发生前5分钟内的数据状态,从而确保数据丢失量控制在5分钟以内。
RTO(Recovery Time Objective):
RTO则是指从灾难发生到业务系统完全恢复并可以重新提供服务所需的时间。它衡量的是业务中断的时间长度。在你的例子中,RTO<30分钟意味着在灾难发生后,系统能够在30分钟内完全恢复并重新提供服务,从而将业务中断的时间控制在30分钟以内。
1. CCR核心配置(北京->上海 单向同步)
# 建立集群间安全连接
PUT /_cluster/settings
{"persistent": {"cluster.remote.shanghai_cluster.seeds": "es-sh-node1:9300,es-sh-node2:9300","cluster.remote.shanghai_cluster.skip_unavailable": true}
}# 创建跟随策略(同步订单索引)
PUT /_ccr/auto_follow/jd_orders
{"remote_cluster" : "shanghai_cluster","leader_index_patterns" : ["jd_orders-*"],"follow_index_pattern" : "sh_{{leader_index}}","max_read_request_operation_count" : 5120,"max_outstanding_read_requests" : 24
}
2. Logstash增量备份(双向同步商品索引)
input {elasticsearch {hosts => ["http://es-bj-node1:9200"]index => "jd_goods"query => '{ "query": { "range": { "@timestamp": { "gte": "now-5m" }}}}'docinfo => truesize => 500scroll => "5m"}
}output {# 上海集群写入elasticsearch {hosts => ["http://es-sh-node1:9200"]index => "%{[@metadata][_index]}"document_id => "%{[@metadata][_id]}"pipeline => "timestamp_pipeline"}# 本地备份elasticsearch {hosts => ["http://localhost:9200"]index => "jd_goods_backup"}
}
3. 网络优化配置
# 跨数据中心专用线程池
thread_pool.search.size: 32
thread_pool.search.queue_size: 2000# 传输层参数优化
transport.tcp.compress: true
transport.profiles.default.tcp_keep_alive: true
transport.profiles.default.tcp_no_delay: true
监控方案(基于Kibana):
{"alert": {"name": "CCR延迟告警","conditions": {"script": "ctx.results[0].hits.hits[0]._source.follower_lag > 300000"},"actions": {"webhook": "http://alert.jd.com/ccr_warn"}}
}
实战经验:
- 使用专用10Gbps通道,实测同步延迟120-180ms
- 索引分片数=数据中心数量×2(北京8节点集群使用24分片)
- 采用时间戳管道统一时区:
PUT _ingest/pipeline/timestamp_pipeline
{"processors": [{"date": {"field": "bj_timestamp","target_field": "@timestamp","timezone": "Asia/Shanghai","formats": ["ISO8601"]}}]
}