问题分析
将查询条件subList分为70个一组,通过CompletableFuture执行异步多线程分批次查询数据库,查询完成后在whenCompleteAsync方法中将结果存储在resultList中。
诡异的情况发生了,查询出来的结果resultList中有10000个数据,其中70 * n个为null的对象
try {
//分页的参数 每个大小70 总大小100000
List<List<Long>> subList = ......;
//结果集
List<QueryEntity> resultList = new ArrayList<>();
//全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取
CompletableFuture[] cfs = subList.stream().map(list -> CompletableFuture.supplyAsync(() -> querySQLList(list), threadPoolTaskExecutor).whenCompleteAsync((v, e) -> {
resultList.addAll(v);
})).toArray(CompletableFuture[]::new);
//等待总任务完成,但是封装后无返回值,必须自己whenComplete()获取
CompletableFuture.allOf(cfs).join();
log.info("结果集中空对象的个数: " + resultList.stream().filter(Objects::isNull).count());
//结果集中空对象的个数: 70
} catch (Exception e) {
throw new RuntimeException("异常", e);
}
问题排查
猜测1:查询数据库的问题
查询数据库的过程中,因为网络或者其他原因导致查询出来的结果为null
List<QueryEntity> querySQLList(List<Long> paramList) {
List<QueryEntity> resultList = this.sqlMapper.querySQL(paramList);
log.info("结果集中空对象的个数: " + resultList.stream().filter(Objects::isNull).count());
//结果集中空对象的个数: 0
return resultList;
}
执行方法后,日志打印的空对象个数都是0,所以这一步能查到数据
猜测2: whenCompleteAsync方法的问题
怀疑是whenCompleteAsync方法接受的参数为List中全是空参数,于是在方法中增加日志
CompletableFuture[] cfs = subList.stream().map(list -> CompletableFuture.supplyAsync(() -> querySQLList(list), threadPoolTaskExecutor).whenCompleteAsync((v, e) -> {
resultList.addAll(v);
log.info("结果集中空对象的个数: " + resultList.stream().filter(Objects::isNull).count());
//结果集中空对象的个数: 0
})).toArray(CompletableFuture[]::new);
//等待总任务完成,但是封装后无返回值,必须自己whenComplete()获取
CompletableFuture.allOf(cfs).join();
但是结果也是空对象个数都是0
最终原因
因为是异步多线程,所有有可能会产生并发问题。因为数据大小为10000,一个数据不多也不少,所以一开始没想到是并发导致的问题。
CopyOnWriteArrayList是线程安全的List,替换ArrayList后,就没有发生过List中为空的问题了
//结果集
List<QueryEntity> resultList = new CopyOnWriteArrayList<>();
//....分片查询