多线程执行List的addAll方法产生的并发问题

发布于:2024-04-26 ⋅ 阅读:(27) ⋅ 点赞:(0)

问题分析

将查询条件subList分为70个一组,通过CompletableFuture执行异步多线程分批次查询数据库,查询完成后在whenCompleteAsync方法中将结果存储在resultList中。

诡异的情况发生了,查询出来的结果resultList中有10000个数据,其中70 * n个为null的对象

try {
    //分页的参数 每个大小70 总大小100000
    List<List<Long>> subList = ......;
    //结果集
    List<QueryEntity> resultList = new ArrayList<>();
    //全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取
    CompletableFuture[] cfs = subList.stream().map(list -> CompletableFuture.supplyAsync(() -> querySQLList(list), threadPoolTaskExecutor).whenCompleteAsync((v, e) -> {
             resultList.addAll(v);
           })).toArray(CompletableFuture[]::new);
    //等待总任务完成,但是封装后无返回值,必须自己whenComplete()获取
    CompletableFuture.allOf(cfs).join();
    
    log.info("结果集中空对象的个数: " + resultList.stream().filter(Objects::isNull).count());
    //结果集中空对象的个数: 70
    
} catch (Exception e) {
    throw new RuntimeException("异常", e);
}

问题排查

猜测1:查询数据库的问题

查询数据库的过程中,因为网络或者其他原因导致查询出来的结果为null

List<QueryEntity> querySQLList(List<Long> paramList) {
    List<QueryEntity> resultList = this.sqlMapper.querySQL(paramList);
    log.info("结果集中空对象的个数: " + resultList.stream().filter(Objects::isNull).count());
    //结果集中空对象的个数: 0
    return resultList;
}

执行方法后,日志打印的空对象个数都是0,所以这一步能查到数据

猜测2: whenCompleteAsync方法的问题

怀疑是whenCompleteAsync方法接受的参数为List中全是空参数,于是在方法中增加日志

CompletableFuture[] cfs = subList.stream().map(list -> CompletableFuture.supplyAsync(() -> querySQLList(list), threadPoolTaskExecutor).whenCompleteAsync((v, e) -> {
             resultList.addAll(v);
    		log.info("结果集中空对象的个数: " + resultList.stream().filter(Objects::isNull).count());
    		//结果集中空对象的个数: 0
           })).toArray(CompletableFuture[]::new);
    //等待总任务完成,但是封装后无返回值,必须自己whenComplete()获取
    CompletableFuture.allOf(cfs).join();

但是结果也是空对象个数都是0

最终原因

因为是异步多线程,所有有可能会产生并发问题。因为数据大小为10000,一个数据不多也不少,所以一开始没想到是并发导致的问题。

CopyOnWriteArrayList是线程安全的List,替换ArrayList后,就没有发生过List中为空的问题了

//结果集
List<QueryEntity> resultList = new CopyOnWriteArrayList<>();
//....分片查询