使用京东AsyncTool实现异步编排

发布于:2025-02-18 ⋅ 阅读:(146) ⋅ 点赞:(0)

 asyncTool: 解决任意的多线程并行、串行、阻塞、依赖、回调的并行框架,可以任意组合各线程的执行顺序,带全链路执行结果回调。多线程编排一站式解决方案。来自于京东主App后台。

/**
     * 批量更新用户(使用京东AsyncTool)
     *
     * @param idList
     */
    public void batchUpdateAsyncTool(List<Long> idList) {
        Boolean resultBoolean = processOrderAsyncTool(idList);
        if (resultBoolean) {
            System.out.println("所有都执行完毕");
        }
    }
@Transactional(rollbackFor = Exception.class)
    public Boolean processOrderAsyncTool(List<Long> idList) {
        List<WorkerWrapper> workerWrapperList = new ArrayList<>();
        for (int i = 0; i < idList.size(); i++) {
            UserServiceUpdateAsyncTool userServiceUpdateAsyncToolObject = new UserServiceUpdateAsyncTool();
            WorkerWrapper<Long, Boolean> workerWrapper = new WorkerWrapper.Builder<Long, Boolean>()
                    .id("wrapper" + idList.get(i))
                    .worker(userServiceUpdateAsyncToolObject)
                    .callback(userServiceUpdateAsyncToolObject)
                    .param(idList.get(i))//1+1
                    .build();
            workerWrapperList.add(workerWrapper);
        }
        try {
            //3个WorkerWrapper一起begin
            com.jd.platform.async.executor.Async.beginWork(1000, asyncExecutor, workerWrapperList);
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
        workerWrapperList.stream().forEach(workerWrapper -> {
            System.out.println("workResult:" + workerWrapper.getWorkResult());
            if ("EXCEPTION".equals(workerWrapper.getWorkResult().getResultState().name())) {
                if (workerWrapper.getWorkResult().getEx() instanceof BusinessException) {
                    throw new BusinessException(workerWrapper.getWorkResult().getEx().getMessage());
                } else {
                    throw new RuntimeException(workerWrapper.getWorkResult().getEx().getMessage());
                }
            }
        });
        return Boolean.TRUE;
    }
@Slf4j
@Service
public class UserServiceUpdateAsyncTool implements IWorker<Long, Boolean>, ICallback<Long, Boolean> {

    private UserService userService;


    @Transactional(rollbackFor = Exception.class)
    public Boolean funcAsyncTool(Long id) {
        System.out.println("lo开始=" + id);
        userService = ApplicationContextHolder.context.getBean(UserService.class);
//        try {
//            Thread.sleep(5000);
        User user = new User();
        user.setAge(Integer.valueOf(String.valueOf(Long.valueOf("30") + id)));
        if (id.equals(2L)) {
            throw new BusinessException("出现了2异常");
        }
        else if (id.equals(3L)) {
            throw new RuntimeException("出现了3异常");
        }
        }
        this.userService.update(user, Wrappers.lambdaUpdate(User.class).eq(User::getId, id));
//        } catch (InterruptedException e) {

//        } catch (BusinessException e) {

//        } catch (RuntimeException e) {

//        }
        System.out.println("lo结束=" + id);
        return Boolean.TRUE;
    }

    @Override
    public void begin() {

    }

    @Override
    public void result(boolean b, Long s, WorkResult<Boolean> workResult) {
//        if ("EXCEPTION".equals(workResult.getResultState().name())) {
//            throw new BusinessException(workResult.getEx().getMessage());
//        }
    }

    @Override
    public Boolean action(Long s, Map<String, WorkerWrapper> map) {
        return this.funcAsyncTool(s);
    }

    @Override
    public Boolean defaultValue() {
        return null;
    }
}

运行结果:

{
    "success": false,
    "code": "505",
    "message": "出现了2异常",
    "data": null
}

入参:[1,2,3,4,5]

运行结果,2、3没有更新,更新了1、4、5


网站公告

今日签到

点亮在社区的每一天
去签到