当前位置: 首页 > news >正文

基于Bboss框架的ElasticSearch并发更新版本冲突问题解决

问题描述

基于Bboss框架的ElasticSearch部分数据更新,当同一条数据同时被多个请求进行并发更新时,会遇到的版本冲突问题。

具体报错

partial update result:{"took":7,"timed_out":false,"total":1,"updated":0,"deleted":0,"batches":1,"version_conflicts":1,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1.0,"throttled_until_millis":0,"failures":[]}

解决方案

在不使用消息中间件和Redis分布式锁控制并发的情况下,推荐使用直接_update + retry_on_conflict的方式来实现。其原理为,无论是把查询和更新集成在一个http请求的updateByQuery方式;还是通过查询版本号_seq_no和_primary_term,再通过查询带入版本号更新的乐观锁方式,在并发程度较高的情况下都会存在版本冲突导致的更新失败问题。直接_update + retry_on_conflict伪代码实例如下:

@Autowired
private BBossESStarter bbossESStarter;private int partUpdateSpecialEsDataByVersions(String queryFieldName,Object queryFieldValue,String updateFieldName,Map<String,Object> updateFieldMap){ClientInterface clientUtil = bbossESStarter.getConfigRestClient(getESCommonMappingFile());String index = getRealIndexName();Map<String,Object> params=new HashMap<>();params.put("queryFieldName", queryFieldName);params.put("queryFieldValue", queryFieldValue);MapRestResponse result =  clientUtil.search(index + "/_search","querySpecialIndexByQueryParams", params);if(result != null && result.getSearchHits() != null){MapSearchHits searchHits = result.getSearchHits();if(searchHits != null && !CollectionUtils.isEmpty(searchHits.getHits())){List<MapSearchHit> hits = searchHits.getHits();MapSearchHit mapSearchHit = hits.get(0);String documentId = mapSearchHit.getId();params.put("updateFieldName",updateFieldName);params.put("updateFieldMap",updateFieldMap);String queryPath = index + "/_doc/" + documentId + "/_update?retry_on_conflict=3";log.debug("partial queryPath:"+queryPath);String resultUpdate = clientUtil.updateByPath(queryPath , "partialUpdateDocumentWithMap", params);log.debug("partial update result:"+resultUpdate);return checkIndexParitalUpdateResult(resultUpdate);}return 0;}return 0;}protected  int checkIndexParitalUpdateResult(String result){try{JSONObject obj= JSON.parseObject(result);if("updated".equals(obj.getString("result"))){return 1;}}catch(Exception e){log.error("parse parital result error",e);}return -1;}

对应的ES查询xml语句如下:

<property name="partialUpdateDocumentWithMap"><![CDATA[{"script": {"lang": "painless","source": @"""#foreach($item in $updateFieldMap.entrySet())#if($velocityCount > 0);#end#if($updateFieldName)ctx._source.$updateFieldName.$item.key= params.data.$item.key#elsectx._source.$item.key= params.data.$item.key#end#end""","params": {"data":#[updateFieldMap,serialJson=true]}}}]]></property><property name="querySpecialIndexByQueryParams"><![CDATA[{"seq_no_primary_term": true,"query": {"bool": {"must": [{"term": {#[queryFieldName]: #[queryFieldValue]}}]}}}]]></property>

下面分别对三个方案的原理和实现方式进行介绍

_update_by_query更新

它内部实际上是一个批量操作,会先执行一个查询,然后对查询匹配的每个文档执行更新。

对于每个匹配的文档,更新操作会先读取文档的当前版本号(包括_seq_no_primary_term),然后尝试更新该文档,使用版本控制来确保在读取和更新之间没有其他更改。

但是,请注意,_update_by_query在处理多个文档时,并不是一个原子操作。它是针对每个文档逐个进行更新(虽然内部使用批量API,但更新是逐个进行的,并且整个操作不是原子的)。

简单来说就是在一个Http的查询语句里面同时写入查询和更新内容,在ES内部进行查询和修改,但是从查询到修改中间需要生成一份查询快照,数据更新频繁的时候,生成的快照获取的版本号就不是最新的,导致版本冲突失败。

其伪代码示例如下:

protected int partialUpdateDocumentByQuerys(String queryFieldName,Object queryFieldValue,String updateFieldName,Object updateFieldValue) {try {ClientInterface clientUtil = bbossESStarter.getConfigRestClient(getESCommonMappingFile());{Map<String,Object> params=new HashMap<>();params.put("updateFieldName",updateFieldName);params.put("updateFieldValue",updateFieldValue);params.put("queryFieldName",queryFieldName);params.put("queryFieldValue",queryFieldValue);String result=clientUtil.updateByQuery (getRealIndexName()+"/"+indexTypeName+"/_update_by_query?conflicts=proceed", "partialUpdateDocumentByQuery",params);log.debug("partial update index result:"+result);return checkParitalUpdateResult(result);}}catch(Exception e){log.error("init error",e);return -1;}return -1;}protected  int checkParitalUpdateResult(String result){try{JSONObject obj= JSON.parseObject(result);int updated=obj.getInteger("updated");int total=obj.getInteger("total");int confilicts=obj.getInteger("version_conflicts");if(total==0){return 0;}if(total>0 && updated<total && confilicts>0){return -1;}return 1;}catch(Exception e){log.error("parse parital result error",e);}return -1;}

xml配置语句如下:

<property name="partialUpdateDocumentByQuery"><![CDATA[{"query": {"bool": {"must": [{"term": {#[queryFieldName]: #[queryFieldValue]}}]}},"script": {"lang": "painless","source": @"""ctx._source.$updateFieldName = params.data;""","params": {"data":#[updateFieldValue,serialJson=true]}}}]]></property>

版本号_seq_no和_primary_term更新

通过两次Http请求,一次请求查询出对应的版本号_seq_no和_primary_term信息,第二次请求带入版本号信息进行更新

private int partUpdateSpecialEsDataByVersions2(String queryFieldName,Object queryFieldValue,String updateFieldName,Map<String,Object> updateFieldMap){ClientInterface clientUtil = bbossESStarter.getConfigRestClient(getESCommonMappingFile());String index = getRealIndexName();Map<String,Object> params=new HashMap<>();params.put("queryFieldName", queryFieldName);params.put("queryFieldValue", queryFieldValue);//使用乐观锁,根据版本号更新ES部分指定数据信息,通过2次http请求完成MapRestResponse result =  clientUtil.search(index + "/_search","querySpecialIndexByQueryParams", params);if(result != null && result.getSearchHits() != null){MapSearchHits searchHits = result.getSearchHits();if(searchHits != null && !CollectionUtils.isEmpty(searchHits.getHits())){List<MapSearchHit> hits = searchHits.getHits();MapSearchHit mapSearchHit = hits.get(0);String documentId = mapSearchHit.getId();long seqNo = mapSearchHit.getSeqNo();long primaryTerm = mapSearchHit.getPrimaryTerm();params.put("updateFieldName",updateFieldName);params.put("updateFieldMap",updateFieldMap);String queryPath = index + "/_doc/" + documentId + "/_update?seqNo=" + seqNo + "&primaryTerm=" + primaryTerm;log.debug("partial queryPath:"+queryPath);String resultUpdate = clientUtil.updateByPath(queryPath , "partialUpdateDocumentWithMap", params);log.debug("partial update result:"+resultUpdate);return checkIndexParitalUpdateResult(resultUpdate);}return 0;}return 0;}

这里的xml方法和解决方案的基本保持一致。

直接_update + retry_on_conflict更新(推荐)

从底层原理来说,它们都是基于版本号进行更新的,但是_update 结合retry_on_conflict的优势是,直接根据文档ID来进行更新,相对于前两个方案来说,节省了一次根据版本号进行查询的开销,它是直接在ES服务器内部实时获取最新的版本号进行更新,还可以自动重试获取版本号,在最大程度上保证了更新成功。

结论

建议根据业务情况,如果有条件,尽可能通过消息中间件或者Redis分布式锁来保证控制并发冲突。

http://www.dtcms.com/a/617804.html

相关文章:

  • Highcharts常见问题解析(5):如何将多个图表导出到同一张图片或 PDF?
  • 什么是中间件?必须要有中间件吗?有哪些国产中间件厂商?
  • 第七章深度解析:从零构建智能体框架——模块化设计与全流程落地指南
  • 机器视觉3D无序抓取如何确保抓取精度,需要从以下五个核心方面入手,形成一个闭环的控制系统
  • Git Bisect - Git Commit 故障排查利器使用详解
  • 青岛科技街网站建设不懂外贸做外贸网站好做吗
  • 2511C++,CTAD简化回调
  • 【ros2】ROS2 C++参数设置指南(含跨节点修改方法)
  • STM32通信接口----USART
  • 解决Web游戏Canvas内容在服务器部署时的显示问题
  • 我爱学算法之—— 哈希
  • Linux字符设备驱动模型
  • C++ List 容器详解:迭代器失效、排序与高效操作
  • 婚纱网站wordpress微商模板
  • GPT问答:泛型、哈希表与缓存、命名参数。251116
  • 免费学软件的自学网站保健品网站建设流程
  • 网络访问流程:HTTPS + TCP + IP
  • 智能体AI、技术浪潮与冲浪哲学
  • 基于 PyTorch + BERT 意图识别与模型微调
  • 沃尔沃公司网站建设微信官方网站建设
  • 网站备案域名怎么买找在农村适合的代加工
  • 42 解决一些问题
  • Claude Code 功能+技巧
  • 基于人脸识别和 MySQL 的考勤管理系统实现
  • AUTOSAR_CP_OS-Operating System for Multi-Core:多核操作系统
  • 什么是 “信任模型” 和 “安全假设”?
  • 【秣厉科技】LabVIEW工具包——HIKRobot(海康机器人系列)
  • 网易UU远程全功能技术解构:游戏级性能突围与安全边界探析
  • 蓝桥杯第八届省赛单片机设计完全入门(零基础保姆级教程)
  • 搭建网站分类建立名词