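Background
Our team recently started refactoring the membership system and chose Elasticsearch 7.6.1 as the search engine. While writing the business code I ran into a problem with concurrent reads and writes. In Spring Boot we use the RestHighLevelClient to read from and write to ES. Under concurrency, even when a write request has been sent to ES through restHighLevelClient and a success response has come back, ES in fact seems to handle the request asynchronously: at the moment the success response arrives, the write may not yet have taken effect.
Here is the code I used to verify this.
In this method I remove one tag from a member document and overwrite the document in ES: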
public synchronized void delTagcRela(T_MS_TAGC_RELA tar, Integer hrId) throws IOException {
    Integer msSgId = tar.getMsSgId();
    Integer msTagId = tar.getMsTagId();
    VT_MS_SHOPGUI_INFO queryParam = new VT_MS_SHOPGUI_INFO();
    queryParam.setShopGuideHrid(hrId);
    queryParam.setMsSgId(msSgId);
    List<Map<String, Object>> rsts = elasticSearchUtil.searchMemberInner(queryParam);
    if (rsts.isEmpty()) {
        throw new BusinessRuntimeException("No member document found");
    }
    Map<String, Object> rst = rsts.get(0);
    ArrayList<HashMap<String, Object>> tags = (ArrayList) rst.get("tags");
    if (tags == null || tags.size() == 0) {
        throw new BusinessRuntimeException("No matching tag binding record found");
    } else {
        HashMap<String, Object> oldItem = new HashMap<String, Object>(); // entry to delete
        JSONObject paramItem = JSONObject.parseObject(JSON.toJSONString(tar));
        for (HashMap<String, Object> item : tags) {
            // compare by value (java.util.Objects): == on Integer only holds for cached values -128..127
            if (Objects.equals(msTagId, item.get("msTagId"))) {
                oldItem = item; // located
                break;
            }
        }
        tags.remove(oldItem);
        rst.put("tags", tags);
        long startTime = System.currentTimeMillis();
        Boolean sucFlag = elasticSearchUtil.addMemberToEs(JSONObject.parseObject(JSON.toJSONString(rst)), msSgId.toString());
        // Verification only: re-query until the deleted tag no longer appears in search results.
        // Testing sucFlag in the loop condition avoids spinning forever if the write failed.
        boolean continueFlag = true;
        while (sucFlag && continueFlag) {
            List<Map<String, Object>> theInfos = elasticSearchUtil.searchMemberInner(queryParam);
            Map<String, Object> theInfo = theInfos.get(0);
            ArrayList<HashMap<String, Object>> losTags = (ArrayList) theInfo.get("tags");
            System.err.println("*** remaining tags: " + losTags);
            continueFlag = false;
            for (HashMap<String, Object> item : losTags) {
                if (Objects.equals(msTagId, item.get("msTagId"))) {
                    continueFlag = true; // deleted tag still visible to search
                }
            }
        }
        long endTime = System.currentTimeMillis();
        System.err.println("Elapsed: " + (endTime - startTime) + " ms");
    }
}
Relevant methods of elasticSearchUtil:
/**
 * Internal call: query member (customer) information
 * @param tar query conditions (shop guide HR id and msSgId)
 * @return the _source of each matching document as a map
 * @throws IOException
 */
public synchronized List<Map<String, Object>> searchMemberInner(VT_MS_SHOPGUI_INFO tar) throws IOException {
    Integer hrId = tar.getShopGuideHrid();
    Integer msSgId = tar.getMsSgId();
    // build the conditional query
    SearchRequest searchRequest = new SearchRequest("shopguide_business_info");
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
    if (hrId != null) {
        QueryBuilder queryBuilderSGH = QueryBuilders.matchQuery("shopGuideHrid", hrId); // shop guide id
        if (msSgId != null) {
            QueryBuilder queryBuilderSgId = QueryBuilders.matchQuery("msSgId", msSgId); // shop guide identifier
            boolQueryBuilder.must(queryBuilderSgId);
        }
        boolQueryBuilder.must(queryBuilderSGH);
    }
    sourceBuilder.query(boolQueryBuilder);
    // set the search timeout
    sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
    // execute the search
    searchRequest.source(sourceBuilder);
    SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    // parse the results
    List<Map<String, Object>> list = new ArrayList<>();
    for (SearchHit hit : searchResponse.getHits().getHits()) {
        list.add(hit.getSourceAsMap());
    }
    return list;
}
/**
 * Write a shop guide's private member data to ES
 * @param inJson document source
 * @param docId  document id (the caller passes msSgId)
 * @return true if no bulk item failed
 */
public synchronized Boolean addMemberToEs(JSONObject inJson, String docId) {
    BulkRequest bulkRequest = new BulkRequest();
    bulkRequest.timeout("2m"); // bulk timeout: 2 minutes
    bulkRequest.add(new IndexRequest("shopguide_business_info").source(inJson, XContentType.JSON).id(docId));
    BulkResponse bulkResponse;
    try {
        bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    } catch (IOException e) {
        log.error("Failed to write data to ES: " + e.getMessage(), e);
        String msg = e.getMessage();
        // getMessage() may be null, so check before calling length()
        throw new BusinessRuntimeException(msg == null || msg.length() > 30 ? "Service error" : msg);
    }
    return !bulkResponse.hasFailures();
}
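Results and error output
During testing, ignoring efficiency and serializing everything with a heavyweight lock (synchronized), I found that the modified result only showed up in search about 292 ms after bulkResponse.hasFailures() had reported success.
It seems a successful response does not mean the modification has taken effect. The relevant BulkResponse source is below; hasFailures() only reports whether any bulk item failed: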
/**
 * Has anything failed with the execution.
 */
public boolean hasFailures() {
    for (BulkItemResponse response : responses) {
        if (response.isFailed()) {
            return true;
        }
    }
    return false;
}
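What I want to achieve
In short, when a group of logically ordered writes runs concurrently, the lock can be released before ES has finished the modification, and the next request then reads and writes according to the established logic on stale data, which breaks normal business operation.
The only solution I can think of right now is to re-read in a loop after each write (checking the business data or the ES document version) to verify that the write really succeeded, and to release the synchronization lock only after that check.
I'd like to ask whether anyone has a more elegant solution. If anything stated above is mistaken, corrections are very welcome. Many thanks.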
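A minimal sketch of the re-read workaround described above, under stated assumptions: `ReadAfterWrite` and `waitUntilVisible` are hypothetical names, not part of RestHighLevelClient, and the caller supplies the check (for example, re-running a search and testing the returned document). Unlike the unbounded while loop in the verification code, it sleeps between attempts and gives up after a timeout.

```java
import java.util.function.BooleanSupplier;

/**
 * Hypothetical helper, not part of any Elasticsearch client: re-reads until a
 * condition holds or a deadline passes, sleeping with capped exponential back-off.
 */
final class ReadAfterWrite {
    private ReadAfterWrite() {}

    /**
     * @param visible       performs one re-read and returns true once the write is observable
     * @param timeoutMillis give up after roughly this many milliseconds
     * @return true if the condition held before the deadline, false on timeout or interrupt
     */
    static boolean waitUntilVisible(BooleanSupplier visible, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        long backoffMillis = 10;                              // first pause between re-reads
        while (true) {
            if (visible.getAsBoolean()) {
                return true;
            }
            long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMillis <= 0) {
                return false;                                 // bounded: never spin forever
            }
            try {
                Thread.sleep(Math.min(backoffMillis, remainingMillis));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();           // keep the interrupt for the caller
                return false;
            }
            backoffMillis = Math.min(backoffMillis * 2, 200); // cap the back-off at 200 ms
        }
    }
}
```

In delTagcRela the supplier could wrap elasticSearchUtil.searchMemberInner(queryParam) (rethrowing its IOException as java.io.UncheckedIOException) and return true once no entry with msTagId remains; the lock would be released only after a true result, and false would be treated as a failed write. The refresh parameter mentioned in the answer below makes this polling unnecessary in most cases.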
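This isn't a concurrency problem. Haven't you read the basic introduction to Elasticsearch? It is a near-real-time search engine! Whether data is visible depends on the index refresh interval. With the usual setting of one refresh per second, in the worst case your write becomes searchable only after about 1 second, once the next refresh has run. A write request can, however, take a parameter that forces a refresh so the data becomes visible immediately. If you use Spring's elasticsearchTemplate or elasticsearchRepository, they add this refresh parameter by default.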
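The "worst case about 1 second" in the answer follows from simple arithmetic. The sketch below assumes, purely for illustration, that refreshes fire exactly at multiples of the interval starting from t = 0 and that a write becomes searchable at the first refresh strictly after it is indexed; `RefreshTiming` is a hypothetical name. Real ES schedules refreshes per shard, and in 7.x a shard that has not received searches for a while (index.search.idle.after, 30s by default) may skip scheduled refreshes when refresh_interval is not set explicitly.

```java
/**
 * Toy timing model (assumption): refreshes fire at 0, interval, 2 * interval, ...
 * and a write is searchable from the first refresh strictly after it is indexed.
 */
final class RefreshTiming {
    private RefreshTiming() {}

    /** Time at which a write indexed at writeMillis first becomes searchable. */
    static long visibleAt(long writeMillis, long intervalMillis) {
        return (writeMillis / intervalMillis + 1) * intervalMillis;
    }

    /** How long a reader must wait after the write returns before a search can see it. */
    static long visibilityDelay(long writeMillis, long intervalMillis) {
        return visibleAt(writeMillis, intervalMillis) - writeMillis;
    }
}
```

With a 1000 ms interval, a write indexed 708 ms into the cycle waits 292 ms (visibilityDelay(708, 1000)), which is consistent with the delay measured in the question, though it does not prove that was the cause; a write indexed 1 ms after a refresh waits 999 ms, which is the near-1-second worst case.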
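Thanks to the pointers in the accepted answer above, I now understand the crux of the problem. Thanks again for the patient guidance.
Conclusion first
The key to the problem is not concurrency or asynchrony but the refresh mechanism in ES.
About the ES refresh mechanism
Related link: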
https://blog.csdn.net/seanxwq/article/details/122600294
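In short, when ES indexes a new document it first writes it into an in-memory buffer; then, at the configured refresh interval (index.refresh_interval, 1s by default), the buffered documents are refreshed into a new segment that searches can see. So normally a newly written document cannot be searched at the very moment it is written. Note that a refresh is not an fsync to disk; persisting segments to disk is a separate flush operation.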
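To make the mechanism concrete, here is a toy model that replays the scenario from the question: document 1 is overwritten with its tag removed, yet a search keeps returning the old tags until the next refresh. Assumptions: a single shard, documents keyed by id, and refreshes triggered by hand. `ToyNrtIndex` is a hypothetical name, and the model ignores Lucene's real segment structure and the translog.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model of near-real-time search, not Elasticsearch code: index() only
 * buffers a document; search() only sees what the last refresh() published.
 */
final class ToyNrtIndex {
    private final Map<String, String> buffer = new HashMap<>();     // indexed, not yet searchable
    private Map<String, String> searchable = Collections.emptyMap(); // last published view

    /** In this model, the write call returns as soon as the doc is buffered. */
    synchronized void index(String id, String source) {
        buffer.put(id, source);
    }

    /** Publishes buffered docs as a new immutable view; a later write to the same id wins. */
    synchronized void refresh() {
        Map<String, String> next = new HashMap<>(searchable);
        next.putAll(buffer);
        buffer.clear();
        searchable = Collections.unmodifiableMap(next);
    }

    /** Reads only the published view, never the buffer. */
    synchronized String search(String id) {
        return searchable.get(id);
    }
}
```

Calling index("1", "{\"tags\":[]}") after an earlier refresh leaves search("1") returning the old tags until refresh() runs, which is the stale read the verification loop in the question was waiting out.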
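Code analysis
When using restHighLevelClient, you can set a parameter on the request that controls whether ES forces a refresh after the write, or keeps the request open until a refresh has made the write visible: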
public Boolean addMemberToEs(JSONObject inJson, String docId) {
    BulkRequest bulkRequest = new BulkRequest();
    bulkRequest.timeout("2m"); // bulk timeout: 2 minutes
    bulkRequest.add(new IndexRequest("shopguide_business_info").source(inJson, XContentType.JSON).id(docId));
    // return only after a refresh has made this write visible to search
    bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
    BulkResponse bulkResponse;
    try {
        bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    } catch (IOException e) {
        log.error("Failed to write data to ES: " + e.getMessage(), e);
        String msg = e.getMessage();
        throw new BusinessRuntimeException(msg == null || msg.length() > 30 ? "Service error" : msg);
    }
    return !bulkResponse.hasFailures();
}
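The three policies documented in the RefreshPolicy Javadoc can be modeled the same way. This is a toy sketch, not ES internals: `ToyRefreshPolicy` and `ToyRefreshingIndex` are hypothetical names, a background thread stands in for the per-shard refresh interval, and a refresh publishes every buffered write at once.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Toy stand-in for the three WriteRequest.RefreshPolicy values; not the ES enum. */
enum ToyRefreshPolicy { NONE, IMMEDIATE, WAIT_UNTIL }

/**
 * Toy model (not ES internals): a background task refreshes every intervalMillis,
 * and the refresh policy decides when index() is allowed to return.
 */
final class ToyRefreshingIndex implements AutoCloseable {
    private final Map<String, String> buffer = new HashMap<>();     // written, not searchable
    private final Map<String, String> searchable = new HashMap<>(); // what search() sees
    private long refreshCount = 0;                                  // refreshes completed so far
    private final ScheduledExecutorService refresher =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "toy-refresher");
                t.setDaemon(true);                                  // don't keep the JVM alive
                return t;
            });

    ToyRefreshingIndex(long intervalMillis) {
        refresher.scheduleAtFixedRate(this::refresh, intervalMillis, intervalMillis,
                TimeUnit.MILLISECONDS);
    }

    /** Publishes every buffered write at once, like a shard-level refresh. */
    synchronized void refresh() {
        searchable.putAll(buffer);
        buffer.clear();
        refreshCount++;
        notifyAll();                                                // wake WAIT_UNTIL writers
    }

    synchronized void index(String id, String source, ToyRefreshPolicy policy) {
        buffer.put(id, source);
        switch (policy) {
            case IMMEDIATE:
                refresh();                                          // one refresh per write
                break;
            case WAIT_UNTIL: {
                long target = refreshCount + 1;                     // next refresh publishes our doc
                try {
                    while (refreshCount < target) {
                        wait();                                     // releases the lock meanwhile
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for a refresh", e);
                }
                break;
            }
            default:                                                // NONE: visible at a later refresh
                break;
        }
    }

    synchronized String search(String id) {
        return searchable.get(id);
    }

    @Override
    public void close() {
        refresher.shutdownNow();
    }
}
```

With WAIT_UNTIL, as in the addMemberToEs above, bulk() returns only after a refresh has published the write, so by the time the synchronized method returns, the next searchMemberInner call sees the update. The cost, per the Javadoc, is that each request may wait up to one refresh interval; IMMEDIATE avoids that wait but forces a refresh per request, which the Javadoc says does not scale for high indexing or search throughput.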
Excerpt of the source for this request parameter (WriteRequest in the ES client):
/**
 * Should this request trigger a refresh ({@linkplain RefreshPolicy#IMMEDIATE}), wait for a refresh (
 * {@linkplain RefreshPolicy#WAIT_UNTIL}), or proceed ignore refreshes entirely ({@linkplain RefreshPolicy#NONE}, the default).
 */
RefreshPolicy getRefreshPolicy();

ActionRequestValidationException validate();

enum RefreshPolicy implements Writeable {
    /**
     * Don't refresh after this request. The default.
     */
    NONE("false"),
    /**
     * Force a refresh as part of this request. This refresh policy does not scale for high indexing or search throughput but is useful
     * to present a consistent view to for indices with very low traffic. And it is wonderful for tests!
     */
    IMMEDIATE("true"),
    /**
     * Leave this request open until a refresh has made the contents of this request visible to search. This refresh policy is
     * compatible with high indexing and search throughput but it causes the request to wait to reply until a refresh occurs.
     */
    WAIT_UNTIL("wait_for");