【多线程】TaskHandler-分享一个便携式多线程应用工具

发布于:2024-12-18 ⋅ 阅读:(45) ⋅ 点赞:(0)

先上核心代码:

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

/**
 * Small helper around a {@link ThreadPoolTaskExecutor} for fire-and-forget tasks and for
 * mapping a list through a function on several pool threads.
 *
 * <p>The {@code execute} methods block the calling thread until every submitted chunk has
 * finished (or the calling thread is interrupted).
 */
@Component
public class TaskHandler {
    private static final Logger log = LoggerFactory.getLogger(TaskHandler.class);

    /** Lower bound for the parallelism chosen by {@link #execute(List, Function)}. */
    private static final int MIN_DEFAULT_PARALLELISM = 8;

    @Resource
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    public TaskHandler() {
    }

    /**
     * Hands {@code task} to the thread pool and returns without waiting for it.
     *
     * @param task the work to run on a pool thread
     */
    public void async(Runnable task) {
        this.threadPoolTaskExecutor.execute(task);
    }

    /**
     * Maps {@code list} through {@code function} using a default parallelism of
     * {@code max(availableProcessors, 8)}.
     *
     * @param list     the inputs; {@code null} is treated as empty
     * @param function the mapping applied to each element
     * @param <T>      input element type
     * @param <R>      result element type
     * @return the non-null results, see {@link #execute(List, Function, int)}
     */
    public <T, R> List<R> execute(List<T> list, Function<T, R> function) {
        int core = Runtime.getRuntime().availableProcessors();
        return this.execute(list, function, Math.max(core, MIN_DEFAULT_PARALLELISM));
    }

    /**
     * Maps {@code list} through {@code function} on at most {@code parallelThreadSize} pool tasks
     * and waits for all of them.
     *
     * <ul>
     *   <li>An empty (or {@code null}) list returns an empty list without touching the pool.</li>
     *   <li>A single-element list is processed on the calling thread; an exception thrown by
     *       {@code function} propagates to the caller in that case.</li>
     *   <li>Otherwise the list is cut into contiguous chunks, one pool task per chunk. Results
     *       are returned in input order. {@code null} results are dropped. An element whose
     *       mapping throws is logged and skipped; the other elements are still returned.</li>
     * </ul>
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt flag is restored and
     * the results gathered so far are returned; tasks already submitted are not cancelled.
     *
     * @param list               the inputs; {@code null} is treated as empty
     * @param function           the mapping applied to each element
     * @param parallelThreadSize maximum number of pool tasks to submit; must be positive when the
     *                           list has more than one element
     * @param <T>                input element type
     * @param <R>                result element type
     * @return the non-null mapping results
     * @throws IllegalArgumentException if {@code parallelThreadSize <= 0} and the list has more
     *                                  than one element
     */
    public <T, R> List<R> execute(List<T> list, Function<T, R> function, int parallelThreadSize) {
        if (list == null || list.isEmpty()) {
            return Collections.emptyList();
        }
        if (list.size() == 1) {
            R single = function.apply(list.get(0));
            return single == null ? Collections.emptyList() : Collections.singletonList(single);
        }
        if (parallelThreadSize <= 0) {
            throw new IllegalArgumentException(
                    "parallelThreadSize must be positive: " + parallelThreadSize);
        }

        // Never create more chunks than elements, so no chunk is ever empty.
        int chunkCount = Math.min(list.size(), parallelThreadSize);
        List<Future<List<R>>> futures = new ArrayList<>(chunkCount);
        for (List<T> chunk : partition(list, chunkCount)) {
            // Block lambda with an explicit return so the Callable overload of submit is chosen.
            futures.add(this.threadPoolTaskExecutor.submit(() -> {
                return applyAll(chunk, function);
            }));
        }

        List<R> results = new ArrayList<>(list.size());
        for (Future<List<R>> future : futures) {
            try {
                results.addAll(future.get());
            } catch (ExecutionException e) {
                log.error("TaskHandler.execute Error", e);
            } catch (InterruptedException e) {
                // Restore the flag for the caller and stop waiting: any further get() would
                // throw immediately anyway.
                Thread.currentThread().interrupt();
                log.error("TaskHandler.execute Error", e);
                break;
            }
        }
        return results;
    }

    /**
     * Applies {@code function} to every element of {@code chunk}, keeping non-null results in
     * order. A {@link RuntimeException} from one element is logged and that element is skipped.
     */
    private <T, R> List<R> applyAll(List<T> chunk, Function<T, R> function) {
        List<R> mapped = new ArrayList<>(chunk.size());
        for (T item : chunk) {
            try {
                R result = function.apply(item);
                if (result != null) {
                    mapped.add(result);
                }
            } catch (RuntimeException e) {
                log.error("TaskHandler.execute Error", e);
            }
        }
        return mapped;
    }

    /**
     * Splits {@code list} into {@code chunkCount} contiguous, non-empty copies whose sizes differ
     * by at most one. Requires {@code 1 <= chunkCount <= list.size()}.
     */
    private <T> List<List<T>> partition(List<T> list, int chunkCount) {
        int base = list.size() / chunkCount;
        int remainder = list.size() % chunkCount;
        List<List<T>> chunks = new ArrayList<>(chunkCount);
        int from = 0;
        for (int i = 0; i < chunkCount; i++) {
            // The first `remainder` chunks take one extra element.
            int to = from + base + (i < remainder ? 1 : 0);
            // Copy so worker threads do not share a view of the caller's list.
            chunks.add(new ArrayList<>(list.subList(from, to)));
            from = to;
        }
        return chunks;
    }
}

应用示例:

先在需要使用的类中注入 TaskHandler:

@Resource
private TaskHandler taskHandler;

多线程执行:

// 第一个参数是列表【实体A】,第二个参数是执行的方法
 List<ProductDetail> list = ....;
 List<ProductDetailDTO> dtoList = taskHandler.execute(list, vo -> convertToDTO(vo, userInfo));
// 方法代码
 private ProductDetailDTO convertToDTO(ProductDetail productDetail, UserInfo userInfo) {
    // 逻辑处理 这里将传入的 ProductDetail  转化为 ProductDetailDTO 
 }

累积多次执行:

List<ProductionSpu> spuInsertList = new ArrayList<>();
// 设置spuInsertList 的size属性
// 根据ProductionSpu的 所有size进行多次查询,结果总和汇集在newSpuLists 中
            List<List<ProductionSpu>> newSpuLists = taskHandler.execute(spuInsertList, i -> productionSpuMapper.selectList(Wrappers.<ProductionSpu>lambdaQuery()
                         
                            .eq(ProductionSpu::getSize, i.getSize())
                          
                    )
            );

异步执行:

 taskHandler.async(() -> {
            //获取日志
            List<FactoryCenterLog> logs = ....;
            //插入日志
            factoryCenterLogMapper.insertList(logs);
        });

网站公告

今日签到

点亮在社区的每一天
去签到