Elasticsearch并发更新冲突问题与解决
文章目录
一、引出问题:多线程更新导致版本冲突
1、问题重现
ES7.10版本:
<dependencies><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>7.10.0</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.10.0</version></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.13</version></dependency>
</dependencies>
该案例将:
创建一个测试索引
初始化一个文档
多线程并发更新同一文档(不处理版本控制)
触发版本冲突异常(VersionConflictEngineException)
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClient;import java.io.IOException;
import java.util.concurrent.CountDownLatch;public class EsConcurrentUpdateConflict {// ES客户端private static RestHighLevelClient client;// 测试索引名private static final String INDEX_NAME = "test_concurrent_update";// 测试文档ID(固定为1,确保并发更新同一文档)private static final String DOC_ID = "1";public static void main(String[] args) throws IOException, InterruptedException {// 1. 初始化ES客户端initClient();try {// 2. 创建测试索引createIndex();// 3. 初始化文档(初始版本为1)initDocument();// 4. 并发更新:启动10个线程同时更新同一文档int threadCount = 10;CountDownLatch latch = new CountDownLatch(threadCount);for (int i = 0; i < threadCount; i++) {final int threadId = i;new Thread(() -> {try {updateDocument(threadId);} catch (Exception e) {System.err.println("线程" + threadId + "更新失败:" + e.getMessage());} finally {latch.countDown();}}).start();}latch.await(); // 等待所有线程执行完毕} finally {// 5. 关闭客户端client.close();}}// 初始化ES客户端(连接本地ES,默认端口9200)private static void initClient() {RestClientBuilder builder = RestClient.builder(new org.apache.http.HttpHost("192.168.56.10", 9200, "http"));client = new RestHighLevelClient(builder);}// 创建索引(设置映射:一个简单的text字段)private static void createIndex() throws IOException {CreateIndexRequest request = new CreateIndexRequest(INDEX_NAME);// 设置索引配置(可选)request.settings(Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0));// 设置映射(字段:content,类型text)String mappingJson = "{\n" +" \"properties\": {\n" +" \"content\": {\n" +" \"type\": \"text\"\n" +" }\n" +" }\n" +"}";request.mapping(mappingJson, XContentType.JSON);CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);if (response.isAcknowledged()) {System.out.println("索引创建成功:" + INDEX_NAME);} else {throw new RuntimeException("索引创建失败");}}// 初始化文档(版本会从1开始)private static void initDocument() throws IOException {IndexRequest request = new IndexRequest(INDEX_NAME);request.id(DOC_ID);// 初始内容String json = "{\n" +" \"content\": \"初始内容\"\n" +"}";request.source(json, XContentType.JSON);IndexResponse response = client.index(request, RequestOptions.DEFAULT);if (response.getResult() == DocWriteResponse.Result.CREATED) {System.out.println("初始文档创建成功,版本:" + response.getVersion());}}// 并发更新文档(不指定版本,依赖ES自动版本控制)private static void updateDocument(int threadId) throws IOException {UpdateRequest request = new UpdateRequest(INDEX_NAME, DOC_ID);// 更新内容:标记当前线程IDString updateJson = "{\n" +" \"content\": \"线程" + threadId + "更新的内容\"\n" +"}";request.doc(updateJson, XContentType.JSON);// 立即刷新request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);// 执行更新(不处理版本冲突,直接提交)UpdateResponse response = client.update(request, RequestOptions.DEFAULT);// 如果更新成功,打印版本if (response.getResult() == DocWriteResponse.Result.UPDATED) {System.out.println("线程" + threadId + "更新成功,新版本:" + response.getVersion());}}
}
// 结果
线程5更新成功,新版本:2
线程2更新失败:Elasticsearch exception [type=version_conflict_engine_exception, reason=[1]: version conflict, required seqNo [7], primary term [1]. current document has seqNo [8] and primary term [1]]
线程7更新失败:Elasticsearch exception [type=version_conflict_engine_exception, reason=[1]: version conflict, required seqNo [7], primary term [1]. current document has seqNo [8] and primary term [1]]
线程8更新失败:Elasticsearch exception [type=version_conflict_engine_exception, reason=[1]: version conflict, required seqNo [7], primary term [1]. current document has seqNo [8] and primary term [1]]
线程6更新失败:Elasticsearch exception [type=version_conflict_engine_exception, reason=[1]: version conflict, required seqNo [7], primary term [1]. current document has seqNo [8] and primary term [1]]
线程0更新失败:Elasticsearch exception [type=version_conflict_engine_exception, reason=[1]: version conflict, required seqNo [7], primary term [1]. current document has seqNo [8] and primary term [1]]
线程9更新失败:Elasticsearch exception [type=version_conflict_engine_exception, reason=[1]: version conflict, required seqNo [6], primary term [1]. current document has seqNo [7] and primary term [1]]
线程4更新失败:Elasticsearch exception [type=version_conflict_engine_exception, reason=[1]: version conflict, required seqNo [7], primary term [1]. current document has seqNo [8] and primary term [1]]
线程3更新失败:Elasticsearch exception [type=version_conflict_engine_exception, reason=[1]: version conflict, required seqNo [7], primary term [1]. current document has seqNo [8] and primary term [1]]
2、冲突原理
Elasticsearch 使用乐观锁控制文档更新:每个文档有一个_version字段(或_seq_no+_primary_term),每次更新会自动递增版本。
当多个线程并发更新同一文档时,第一个线程更新成功后版本会递增,后续线程的更新请求会发现版本不匹配(本地版本 < 服务器当前版本),从而抛出VersionConflictEngineException。
案例中未指定版本参数(request.version(...)),客户端默认使用 “当前最新版本” 更新,但并发场景下会因版本变化导致冲突。
二、冲突处理
在实际开发中,处理Elasticsearch的并发更新冲突(版本冲突)需要结合业务场景选择合适的策略,核心目标是保证数据一致性并减少冲突对业务的影响。以下是常见的处理方式及实现方案:
1、自动重试(推荐,适用于冲突频率低的场景)
思路:捕获版本冲突异常后,重新获取文档最新版本和数据,基于最新数据重试更新,直到成功或达到最大重试次数。
适用场景:更新逻辑简单、冲突不频繁(如用户资料修改、商品库存微调)。
/*** 带重试机制的更新方法* @param maxRetries 最大重试次数*/
private static boolean updateWithRetry(int threadId, int maxRetries) throws IOException {int retryCount = 0;while (retryCount < maxRetries) {try {// 1. 先获取文档最新数据(确保基于最新内容更新)GetResponse getResponse = client.get(new GetRequest(INDEX_NAME, DOC_ID), RequestOptions.DEFAULT);if (!getResponse.isExists()) {System.out.println("文档不存在,更新失败");return false;}// 获取最新版本(或seq_no+primary_term)long currentVersion = getResponse.getVersion();// 2. 基于最新数据构建更新请求(这里示例为追加线程ID到content)UpdateRequest request = new UpdateRequest(INDEX_NAME, DOC_ID);String currentContent = (String) getResponse.getSource().get("content");String newContent = currentContent + " | 线程" + threadId + "更新";request.doc("content", newContent);// 可选:显式指定基于当前版本更新(增强安全性)request.version(currentVersion);// 3. 执行更新UpdateResponse response = client.update(request, RequestOptions.DEFAULT);System.out.println("线程" + threadId + "更新成功(重试" + retryCount + "次),新版本:" + response.getVersion());return true;} catch (ElasticsearchException e) {// 判断是否为版本冲突异常if (e.isVersionConflictException()) {retryCount++;System.out.println("线程" + threadId + "发生版本冲突,准备重试(第" + retryCount + "次)");// 可选:重试前短暂休眠,减少冲突概率try {Thread.sleep(100);} catch (InterruptedException ie) {Thread.currentThread().interrupt();return false;}} else {// 其他异常(如网络错误),直接抛出throw e;}}}System.out.println("线程" + threadId + "超过最大重试次数(" + maxRetries + "次),更新失败");return false;
}
关键点:
重试前必须重新获取文档最新数据(避免基于旧数据更新,导致“丢失更新”);
控制最大重试次数(如3-5次),避免无限重试消耗资源;
可添加随机休眠时间(如50-200ms),减少并发线程再次冲突的概率。
2、使用retry_on_conflict参数(适用于简单更新场景)
思路:通过UpdateRequest的retryOnConflict(int)方法,指定ES服务器在发生冲突时自动重试的次数(无需客户端手动处理)。
适用场景:更新逻辑简单(如仅修改单个字段)、冲突频率低,且不需要客户端复杂处理的场景。
private static void updateWithRetryOnConflict(int threadId) throws IOException {UpdateRequest request = new UpdateRequest(INDEX_NAME, DOC_ID);// 设置服务器自动重试次数(如3次)request.retryOnConflict(3);// 简单更新:直接覆盖content(实际应避免覆盖,推荐基于最新数据更新)request.doc("content", "线程" + threadId + "更新的内容(retry_on_conflict)");try {UpdateResponse response = client.update(request, RequestOptions.DEFAULT);System.out.println("线程" + threadId + "更新成功,新版本:" + response.getVersion());} catch (ElasticsearchException e) {if (e.isVersionConflictException()) {System.err.println("线程" + threadId + "超过服务器重试次数,更新失败");} else {throw e;}}
}
注意:
服务器重试的是“更新操作”本身,若更新逻辑依赖客户端本地计算(如基于旧值累加),可能导致错误(因为服务器无法获取客户端的计算逻辑);
仅适用于“无状态更新”(如直接设置字段值),不适用于“有状态更新”(如count = count + 1)。
3、脚本更新(减少冲突窗口,适用于有状态更新)
思路:使用ES的Script Update,将更新逻辑(如累加、字段合并)放在服务器端脚本中执行,避免客户端“先查询再更新”的窗口期,从根源减少冲突概率。
适用场景:更新逻辑可通过脚本表达(如库存增减、计数器累加),且需要原子性操作。
实现示例(以计数器累加为例):
private static void updateWithScript(int threadId) throws IOException {UpdateRequest request = new UpdateRequest(INDEX_NAME, DOC_ID);// 脚本:将count字段加1(若字段不存在则初始化为1)Script script = new Script(ScriptType.INLINE,"painless","ctx._source.count = (ctx._source.count ?: 0) + 1",Collections.emptyMap());request.script(script);// 可选:设置服务器自动重试request.retryOnConflict(3);try {UpdateResponse response = client.update(request, RequestOptions.DEFAULT);System.out.println("线程" + threadId + "脚本更新成功,count更新后的值:" + response.getGetResult().getSourceAsMap().get("count"));} catch (ElasticsearchException e) {if (e.isVersionConflictException()) {System.err.println("线程" + threadId + "脚本更新冲突,更新失败");} else {throw e;}}
}
优势:
脚本在服务器端原子执行,无需客户端查询后再更新,缩短了冲突窗口;
适合“基于当前值修改”的场景(如库存、点赞数),避免并发下的“丢失更新”。
4、业务层面规避(适用于可容忍最终一致性的场景)
若业务可接受最终一致性(而非强一致性),可采用以下策略减少冲突:
批量更新:将高频次小更新合并为低频次大更新(如1分钟批量同步一次);
分片更新:将大文档拆分为多个小文档(按ID哈希分片),减少同一文档的并发更新;
冲突忽略:非核心字段(如日志、浏览量)冲突时可直接忽略,接受最终覆盖(需评估业务影响)。
5、悲观锁(适用于强一致性、低并发场景)
ES本身不支持悲观锁,但可通过外部机制实现(如分布式锁):
更新前通过Redis/ZooKeeper获取分布式锁,确保同一时间只有一个线程更新文档;
更新完成后释放锁。
适用场景:并发极低、但对数据一致性要求极高的场景(如订单状态更新)。
缺点:会降低并发性能,且可能引入锁超时、死锁等问题,需谨慎使用。
