首页 > 编程语言 >一次多线程并发查询导致结果混乱的问题的排查和记录·JAVA·2022

一次多线程并发查询导致结果混乱的问题的排查和记录·JAVA·2022

时间:2022-12-31 02:11:07浏览次数:48  
标签:JAVA 管理者 查询 并发 线程 2022 scwSaleStatsRequest 多线程

业务背景

该业务是报表查询类业务:要求从销售出库数据(存储于ElasticSearch索引中)中,按照管理者分组聚合查询各个管理者一定时间段内的动销汇总数据,如出库金额、毛利率等。
但由于出库单索引中,只记录了每条出库记录的货主(商品持有人),而没有记录这些商品持有人的管理者(因为管理者和货主的关系是可变动的),因此无法直接按照管理者聚合,只能将先查出每个管理者所附属的货主,再把这些货主作为条件到ES索引中查询这些货主的销售汇总情况。一次查询,只能查到一个管理者的数据(根据其属下的货主),因此要查询管理者维度的汇总列表,需要分多次查询,有多少个管理者,就需要查询多少次。
为了加速查询过程,使用多线程并发查询ES,提高响应速度。

业务模型和查询过程简要示意如下:

代码示例

	@Override
    public List<ScwSaleStatsGroupByManagerResponse> fetchScwSaleStatsGroupByBusinessManager(ScwSaleStatsGroupByManagerRequest scwSaleStatsRequest) throws ExecutionException, InterruptedException, TimeoutException {
        List<ScwSaleStatsGroupByManagerResponse> responses = new ArrayList<>(scwSaleStatsRequest.getBusinessManagers().size());
        List<List<ScwSaleStatsGroupByManagerRequest.BusinessManagerDto>> businessManagerLists = ListUtils.partition(scwSaleStatsRequest.getBusinessManagers(), 10);
        //多线程并发过程,每10个管理者一批划为一批,并发查询。注意scwSaleStatsRequest
        for (List<ScwSaleStatsGroupByManagerRequest.BusinessManagerDto> businessManagers : businessManagerLists) {
            List<CompletableFuture<ScwSaleStatsGroupByManagerResponse>> responseFutures = new ArrayList<>();
            for (ScwSaleStatsGroupByManagerRequest.BusinessManagerDto businessManager : businessManagers) {
                //设置当前要查询的管理者所附带的货主,作为查询ES的条件
                scwSaleStatsRequest.setScwLogins(businessManager.getScwLogins());
                //实例化Supplier接口,定义好要并发的查询行为
                Supplier<ScwSaleStatsGroupByManagerResponse> supplier = () -> {
                    SearchSourceBuilder sourceBuilder = erpScwGroupSearchSourceBuilder(scwSaleStatsRequest);
                    setRuntimeMappings(sourceBuilder);
                    //设置ES要查询返回的聚合的汇总数据,如动销SKU数量等
                    //……省略一部分汇总
                    sourceBuilder.aggregation(AggregationField.SKU_COUNT.getAggregation());
                    //查询ElasticSearch,得到当前管理者(所管辖的一批货主)的汇总返回结果
                    SearchResponse searchResponse = detailGroupBySearch(sourceBuilder, ElasticSearchIndexEnum.ERP_SCW);
                    ElasticResponseModel elasticResponseModel = formateAggsResponse(searchResponse);
                    ElasticErpScwBO elasticErpScwBO = formateAggs(elasticResponseModel, ElasticErpScwBO.class);
                    elasticErpScwBO.calculateGrossMargin();
                    ScwSaleStatsGroupByManagerResponse response = BeanUtil.toBean(elasticErpScwBO, ScwSaleStatsGroupByManagerResponse.class);
                    response.setBusinessManagerId(businessManager.getBusinessManagerId());
                    return response;
                };
                CompletableFuture<ScwSaleStatsGroupByManagerResponse> responseFuture = CompletableFuture.supplyAsync(supplier, taskThreadPoolTaskScheduler);
                responseFutures.add(responseFuture);
            }
            //十个并发查询完成后,取得结果后再
            for (CompletableFuture<ScwSaleStatsGroupByManagerResponse> responseFuture : responseFutures) {
                responses.add(responseFuture.get(10, TimeUnit.SECONDS));
            }
        }
        //排序等后续操作,此处省略
        //………………
        return responses;
    }

问题表现

测试时,发现每次点击查询,返回的列表数据可能不一样:有几行有时是正确的数据,有时和ES索引中的汇总数据不一样,具体表现为列表中不同行(不同管理者)的汇总数据混淆了,A管理者的汇总数据下一次查询可能就到了B管理者上,且几个管理者可能有一模一样的数据。

排查过程

因为一开始对管理者-货主的关系使用了缓存,首先排查了缓存不存在的问题。
应用中有记录接口日志,可以明显看到两次请求入参完全一样,但是响应结果不同。结合数据”张冠李戴“的情况,基本锁定问题是ES查询条件有误。而错误的根源,只可能是多线程造成了对ES查询参数的改写。

问题分析

将上述代码中关键部分提取如下。

for (ScwSaleStatsGroupByManagerRequest.BusinessManagerDto businessManager : businessManagers) {
    //实例化Supplier接口,定义好要并发的查询行为
    Supplier<ScwSaleStatsGroupByManagerResponse> supplier = () -> {
		//设置当前要查询的管理者所附带的货主,作为查询ES的条件
		scwSaleStatsRequest.setScwLogins(businessManager.getScwLogins());
		//多线程并发过程,注意scwSaleStatsRequest
		//每个过程类似于首页的统计数量,一次同时查询最多1 0个商务经理的数据
		SearchSourceBuilder sourceBuilder = erpScwGroupSearchSourceBuilder(scwSaleStatsRequest);
		setRuntimeMappings(sourceBuilder);
		//…………
				    
		return response;
	};
	//…………
}

第3行代码修饰了scwSaleStatsRequest,给请求赋予了当前要查询的货主列表属性。
在第4行中,方法入参scwSaleStatsRequest被直接包进了接口Supplier实例化后的匿名对象中,其中的属性被用于实例化ES的查询源构建者(SearchSourceBuilder)。
然而,supplier对象中的scwSaleStatsRequest只是引用,相当于对象的地址。当程序运行,10个supplier对象中定义的任务被并发执行时,程序访问的是同一个scwSaleStatsRequest对象中的内容(数据),包括第2行赋予的scwLogins属性。
当不同的线程执行supplier任务时,可能取到了具有相同属性的同一个scwSaleStatsRequest:比如,子线程1和2同时发起,子线程2后于子线程1,在给scwSaleStatsRequest的scwLogins属性赋值后,子线程1才执行查询,则两个运行supplier任务的子线程,其中的scwSaleStatsRequest拥有的都是子线程2赋予的scwLogins属性。由此,两个子线程的查询结果相同也难以避免了。

根源探究

该问题是以顺序编程思维书写多线程程序而出现问题的典范,此处出现的是对共享对象的“脏读”问题,即读取了别的线程修改后对象scwSaleStatsRequest。
不同线程可以共享堆空间,一旦出现了多线程并发,就要考虑内存和变量的共享和竞争:是使用锁和同步工具等机制来使对关键资源的访问互斥、独占?还是想办法直接避免访问关键资源?
在上述问题中,不同线程访问了同一个scwSaleStatsRequest对象,其实相当于在业务开发中,在单例的service层中直接定义变量,只不过后者相对显露,前者相对隐蔽。
如果改动上面的程序如下,把修饰scwSaleStatsRequest对象(给其scwLogins属性赋值)的过程移出并发任务,移动到主线程中,如下代码所示,能解决上述问题吗?

for (ScwSaleStatsGroupByManagerRequest.BusinessManagerDto businessManager : businessManagers) {
		//设置当前要查询的管理者所附带的货主,作为查询ES的条件(该行代码在主线程中执行)
		scwSaleStatsRequest.setScwLogins(businessManager.getScwLogins());
	    //实例化Supplier接口,定义好要并发的查询行为
	    Supplier<ScwSaleStatsGroupByManagerResponse> supplier = () -> {
			//多线程并发过程,注意scwSaleStatsRequest
			//每个过程类似于首页的统计数量,一次同时查询最多1 0个商务经理的数据
			SearchSourceBuilder sourceBuilder = erpScwGroupSearchSourceBuilder(scwSaleStatsRequest);
			setRuntimeMappings(sourceBuilder);
			//…………
				    
			return response;
	};
	//…………
}

显然,这样做也无法解决。虽然子线程中不再修改scwSaleStatsRequest对象,但子线程执行过程中,scwSaleStatsRequest对象中的scwLogins属性仍然可能发生变动:主线程两次或多次修改完scwSaleStatsRequest,子线程才陆续启动。
再引申下,关于不同线程中操作的执行顺序,JAVA规范规定了Happens-Before原则,规定在编译器编译JAVA代码,生成字节码指令时,禁止特定的指令重排。这些原则主要是为了在方便编译器优化程序执行效率的同时,保证不同线程间的并发安全。其中两个原则如下:

程序次序规则:在一个线程内一段代码的执行结果是有序的。
线程启动规则:在主线程A执行过程中,启动子线程B,那么线程A在启动子线程B之前对共享变量的修改结果对线程B可见。

在原始代码中,10个子线程的启动顺序、不同线程中指令的执行顺序是不确定的,先修饰scwSaleStatsRequest请求的线程,不一定先执行查询ES的一步;在上述修改代码中,主线程修饰的scwSaleStatsRequest请求会对接下来的子线程可见,但按照happensBefore原则,10次修饰对于不同的子线程,是“不一定”可见的,没有保证子线程能看到最新的scwSaleStatsRequest请求,但更不能保证子线程看到在各自的循环内,主线程修饰后的旧的scwSaleStatsRequest。

需要解决上述问题,还是需要回到对关键资源共享的规避上。

解决办法

修改代码如下:

for (List<ScwSaleStatsGroupByManagerRequest.BusinessManagerDto> businessManagers : businessManagerLists) {
	List<CompletableFuture<ScwSaleStatsGroupByManagerResponse>> responseFutures = new ArrayList<>();
	for (ScwSaleStatsGroupByManagerRequest.BusinessManagerDto businessManager : businessManagers) {
		//复制原始请求,使每个线程使用自己的请求(资源)
		ScwSaleStatsRequest request = new ScwSaleStatsRequest();
		BeanUtil.copyProperties(scwSaleStatsRequest, request);
		request.setScwLogins(businessManager.getScwLogins());
		Supplier<ScwSaleStatsGroupByManagerResponse> supplier = () -> {
			//每个过程类似于首页的统计数量,一次同时查询最多1 0个商务经理的数据
			SearchSourceBuilder sourceBuilder = erpScwGroupSearchSourceBuilder(request);
			setRuntimeMappings(sourceBuilder);
		//……
		}
	}
}

上述代码中,在循环体中, 每个原来的scwSaleStatsRequest被复制为内容相同,但地址不同的新对象request。各个子线程都使用自己独享的request,根本上规避了对同一个scwSaleStatsRequest对象的冲突。把上述复制和修饰请求的代码移动到supplier任务的前三行也同样没有问题。

总结和感想

此次问题在测试阶段被我自己发现并修改,没有造成业务上的影响。这样的问题也提了个醒,面对多线程编程时要格外小心,正确处理好多线程对关键资源的同时访问问题,采取规避访问或者互斥访问的操作来保证并发安全。

2022年就要过去了,这是如此复杂的一年,世界杯的欢乐过后,回首才是沧桑。在短短一年间,疫情形势和防控措施千变万化,最终在年底发生了巨大的转向。处在潮流中的人们被裹挟着,继续走向未知的未来,华夏大地和蓝色星球都在迷茫中继续前行。

在今年年初更换了公司后,我便一直安居于此,在业务和技术上都有不少积累,也度过了工作上相对快乐的时光。然而,生活中的我也同样要经历祖国同胞和全世界人们共同经历的一切,苦难和欢乐总是相生相随。变换莫测的世界中,唯有调整好心态才有希望和未来。

毫无疑问我没有如去年年底的所料想那般写下更多技术文章。以后还有多少机会做更多技术上原创性的工作呢?我不清楚,毕竟技术和工作只是生活的一部分,还有很多事情或者任务等待着我完成。不过有可能的话,我应该还是愿意继续分享一些经验教训和收获感想,在学习与交流中不断进步。苟日新,日日新,莫过于此。

标签:JAVA,管理者,查询,并发,线程,2022,scwSaleStatsRequest,多线程
From: https://www.cnblogs.com/cavern-builder-zyx/p/17016165.html

相关文章