基于Bboss框架的ElasticSearch并发更新版本冲突问题解决
问题描述
基于Bboss框架的ElasticSearch部分数据更新,当同一条数据同时被多个请求进行并发更新时,会遇到的版本冲突问题。
具体报错
partial update result:{"took":7,"timed_out":false,"total":1,"updated":0,"deleted":0,"batches":1,"version_conflicts":1,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1.0,"throttled_until_millis":0,"failures":[]}解决方案
在不使用消息中间件和Redis分布式锁控制并发的情况下,推荐使用直接_update + retry_on_conflict的方式来实现。其原理为,无论是把查询和更新集成在一个http请求的updateByQuery方式;还是通过查询版本号_seq_no和_primary_term,再通过查询带入版本号更新的乐观锁方式,在并发程度较高的情况下都会存在版本冲突导致的更新失败问题。直接_update + retry_on_conflict伪代码实例如下:
@Autowired
private BBossESStarter bbossESStarter;private int partUpdateSpecialEsDataByVersions(String queryFieldName,Object queryFieldValue,String updateFieldName,Map<String,Object> updateFieldMap){ClientInterface clientUtil = bbossESStarter.getConfigRestClient(getESCommonMappingFile());String index = getRealIndexName();Map<String,Object> params=new HashMap<>();params.put("queryFieldName", queryFieldName);params.put("queryFieldValue", queryFieldValue);MapRestResponse result = clientUtil.search(index + "/_search","querySpecialIndexByQueryParams", params);if(result != null && result.getSearchHits() != null){MapSearchHits searchHits = result.getSearchHits();if(searchHits != null && !CollectionUtils.isEmpty(searchHits.getHits())){List<MapSearchHit> hits = searchHits.getHits();MapSearchHit mapSearchHit = hits.get(0);String documentId = mapSearchHit.getId();params.put("updateFieldName",updateFieldName);params.put("updateFieldMap",updateFieldMap);String queryPath = index + "/_doc/" + documentId + "/_update?retry_on_conflict=3";log.debug("partial queryPath:"+queryPath);String resultUpdate = clientUtil.updateByPath(queryPath , "partialUpdateDocumentWithMap", params);log.debug("partial update result:"+resultUpdate);return checkIndexParitalUpdateResult(resultUpdate);}return 0;}return 0;}protected int checkIndexParitalUpdateResult(String result){try{JSONObject obj= JSON.parseObject(result);if("updated".equals(obj.getString("result"))){return 1;}}catch(Exception e){log.error("parse parital result error",e);}return -1;}对应的ES查询xml语句如下:
<property name="partialUpdateDocumentWithMap"><![CDATA[{"script": {"lang": "painless","source": @"""#foreach($item in $updateFieldMap.entrySet())#if($velocityCount > 0);#end#if($updateFieldName)ctx._source.$updateFieldName.$item.key= params.data.$item.key#elsectx._source.$item.key= params.data.$item.key#end#end""","params": {"data":#[updateFieldMap,serialJson=true]}}}]]></property><property name="querySpecialIndexByQueryParams"><![CDATA[{"seq_no_primary_term": true,"query": {"bool": {"must": [{"term": {#[queryFieldName]: #[queryFieldValue]}}]}}}]]></property>下面分别对三个方案的原理和实现方式进行介绍
_update_by_query更新
它内部实际上是一个批量操作,会先执行一个查询,然后对查询匹配的每个文档执行更新。
对于每个匹配的文档,更新操作会先读取文档的当前版本号(包括
_seq_no和_primary_term),然后尝试更新该文档,使用版本控制来确保在读取和更新之间没有其他更改。但是,请注意,
_update_by_query在处理多个文档时,并不是一个原子操作。它是针对每个文档逐个进行更新(虽然内部使用批量API,但更新是逐个进行的,并且整个操作不是原子的)。
简单来说就是在一个Http的查询语句里面同时写入查询和更新内容,在ES内部进行查询和修改,但是从查询到修改中间需要生成一份查询快照,数据更新频繁的时候,生成的快照获取的版本号就不是最新的,导致版本冲突失败。
其伪代码示例如下:
protected int partialUpdateDocumentByQuerys(String queryFieldName,Object queryFieldValue,String updateFieldName,Object updateFieldValue) {try {ClientInterface clientUtil = bbossESStarter.getConfigRestClient(getESCommonMappingFile());{Map<String,Object> params=new HashMap<>();params.put("updateFieldName",updateFieldName);params.put("updateFieldValue",updateFieldValue);params.put("queryFieldName",queryFieldName);params.put("queryFieldValue",queryFieldValue);String result=clientUtil.updateByQuery (getRealIndexName()+"/"+indexTypeName+"/_update_by_query?conflicts=proceed", "partialUpdateDocumentByQuery",params);log.debug("partial update index result:"+result);return checkParitalUpdateResult(result);}}catch(Exception e){log.error("init error",e);return -1;}return -1;}protected int checkParitalUpdateResult(String result){try{JSONObject obj= JSON.parseObject(result);int updated=obj.getInteger("updated");int total=obj.getInteger("total");int confilicts=obj.getInteger("version_conflicts");if(total==0){return 0;}if(total>0 && updated<total && confilicts>0){return -1;}return 1;}catch(Exception e){log.error("parse parital result error",e);}return -1;}xml配置语句如下:
<property name="partialUpdateDocumentByQuery"><![CDATA[{"query": {"bool": {"must": [{"term": {#[queryFieldName]: #[queryFieldValue]}}]}},"script": {"lang": "painless","source": @"""ctx._source.$updateFieldName = params.data;""","params": {"data":#[updateFieldValue,serialJson=true]}}}]]></property>版本号_seq_no和_primary_term更新
通过两次Http请求,一次请求查询出对应的版本号_seq_no和_primary_term信息,第二次请求带入版本号信息进行更新
private int partUpdateSpecialEsDataByVersions2(String queryFieldName,Object queryFieldValue,String updateFieldName,Map<String,Object> updateFieldMap){ClientInterface clientUtil = bbossESStarter.getConfigRestClient(getESCommonMappingFile());String index = getRealIndexName();Map<String,Object> params=new HashMap<>();params.put("queryFieldName", queryFieldName);params.put("queryFieldValue", queryFieldValue);//使用乐观锁,根据版本号更新ES部分指定数据信息,通过2次http请求完成MapRestResponse result = clientUtil.search(index + "/_search","querySpecialIndexByQueryParams", params);if(result != null && result.getSearchHits() != null){MapSearchHits searchHits = result.getSearchHits();if(searchHits != null && !CollectionUtils.isEmpty(searchHits.getHits())){List<MapSearchHit> hits = searchHits.getHits();MapSearchHit mapSearchHit = hits.get(0);String documentId = mapSearchHit.getId();long seqNo = mapSearchHit.getSeqNo();long primaryTerm = mapSearchHit.getPrimaryTerm();params.put("updateFieldName",updateFieldName);params.put("updateFieldMap",updateFieldMap);String queryPath = index + "/_doc/" + documentId + "/_update?seqNo=" + seqNo + "&primaryTerm=" + primaryTerm;log.debug("partial queryPath:"+queryPath);String resultUpdate = clientUtil.updateByPath(queryPath , "partialUpdateDocumentWithMap", params);log.debug("partial update result:"+resultUpdate);return checkIndexParitalUpdateResult(resultUpdate);}return 0;}return 0;}这里的xml方法和解决方案的基本保持一致。
直接_update + retry_on_conflict更新(推荐)
从底层原理来说,它们都是基于版本号进行更新的,但是_update 结合retry_on_conflict的优势是,直接根据文档ID来进行更新,相对于前两个方案来说,节省了一次根据版本号进行查询的开销,它是直接在ES服务器内部实时获取最新的版本号进行更新,还可以自动重试获取版本号,在最大程度上保证了更新成功。
结论
建议根据业务情况,如果有条件,尽可能通过消息中间件或者Redis分布式锁来保证控制并发冲突。
