先上核心代码:
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
@Component
public class TaskHandler {
private static final Logger log = LoggerFactory.getLogger(TaskHandler.class);
@Resource
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
public TaskHandler() {
}
public void async(Runnable task) {
this.threadPoolTaskExecutor.execute(task);
}
public <T, R> List<R> execute(List<T> list, Function<T, R> function) {
int core = Runtime.getRuntime().availableProcessors();
return this.execute(list, function, Math.max(core, 8));
}
public <T, R> List<R> execute(List<T> list, Function<T, R> function, int parallelThreadSize) {
if (list.isEmpty()) {
return Collections.emptyList();
} else if (list.size() == 1) {
R r = function.apply(list.get(0));
return r == null ? Collections.emptyList() : Collections.singletonList(r);
} else {
ArrayList futures;
ArrayList rs;
Iterator var14;
Future future;
if (list.size() <= parallelThreadSize) {
futures = new ArrayList();
Iterator var13 = list.iterator();
while(var13.hasNext()) {
T t = var13.next();
futures.add(this.threadPoolTaskExecutor.submit(() -> {
return function.apply(t);
}));
}
rs = new ArrayList();
var14 = futures.iterator();
while(var14.hasNext()) {
future = (Future)var14.next();
try {
R r = future.get();
if (r != null) {
rs.add(r);
}
} catch (ExecutionException | InterruptedException var9) {
log.error("TaskHandler.execute Error", var9);
}
}
return rs;
} else {
futures = new ArrayList();
List[] var5 = this.allocate(list, parallelThreadSize);
int var6 = var5.length;
List ls;
for(int var7 = 0; var7 < var6; ++var7) {
ls = var5[var7];
futures.add(this.threadPoolTaskExecutor.submit(() -> {
List<R> rs = new ArrayList();
Iterator var3 = ls.iterator();
while(var3.hasNext()) {
T t = var3.next();
R r = function.apply(t);
if (r != null) {
rs.add(r);
}
}
return rs;
}));
}
rs = new ArrayList();
var14 = futures.iterator();
while(var14.hasNext()) {
future = (Future)var14.next();
try {
ls = (List)future.get();
if (!ls.isEmpty()) {
rs.addAll(ls);
}
} catch (ExecutionException | InterruptedException var10) {
}
}
return rs;
}
}
}
private <T> List<T>[] allocate(List<T> list, int parallelThreadSize) {
List<T>[] array = new List[parallelThreadSize];
for(int i = 0; i < list.size(); ++i) {
T t = list.get(i);
int idx = i % parallelThreadSize;
List<T> ls = array[idx];
if (ls == null) {
ls = new ArrayList();
array[idx] = (List)ls;
}
((List)ls).add(t);
}
return array;
}
}
应用示例:
先引入这个东西
@Resource
private TaskHander taskHandler;
多线程执行:
// 第一个参数是列表【实体A】,第二个参数是执行的方法
List<ProductDetail> list = ....;
List<ProductDetailDTO> dtoList = taskHandler.execute(list, vo -> convertToDTO(vo, userInfo));
// 方法代码
private ProductDetailDTO convertToDTO(ProductDetail productDetail, UserInfo userInfo) {
// 逻辑处理 这里将传入的 ProductDetail 转化为 ProductDetailDTO
}
累积多次执行:
List<ProductionSpu> spuInsertList = new ArrayList<>();
// 设置spuInsertList 的size属性
// 根据ProductionSpu的 所有size进行多次查询,结果总和汇集在newSpuLists 中
List<List<ProductionSpu>> newSpuLists = taskHandler.execute(spuInsertList, i -> productionSpuMapper.selectList(Wrappers.<ProductionSpu>lambdaQuery()
.eq(ProductionSpu::getSize, i.getSize())
)
);
异步执行:
taskHandler.async(() -> {
//获取日志
List<FactoryCenterLog> logs = ....;
//插入日志
factoryCenterLogMapper.insertList(logs);
});