一、核心设计原则
- 明确适用场景
- I/O密集型任务:网络请求、文件读写、数据库查询(优先异步)。
- 非阻塞响应需求:Web服务快速响应、实时消息推送。
- 避免滥用:短时计算或顺序依赖任务(如链式调用)不适合异步。
- 任务拆分与组合
- 粒度控制:将大任务拆分为独立子任务(如分步处理订单:支付→物流→通知)。
- 组合模式:
- 并行执行:
CompletableFuture.allOf()
等待所有任务完成。 - 任选最快:
CompletableFuture.anyOf()
获取首个结果。
- 并行执行:
二、编码规范
- 异步任务管理
- 自定义线程池:
关键参数:@Bean public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); // 核心线程数 executor.setMaxPoolSize(20); // 最大线程数 executor.setQueueCapacity(100); // 任务队列容量 executor.setThreadNamePrefix("async-"); executor.initialize(); return executor; }
- CPU密集型任务:线程数 ≈ CPU核数。
- I/O密集型任务:线程数 = CPU核数 × (等待时间/计算时间)。
- 资源释放:
try (ExecutorService pool = Executors.newCachedThreadPool()) { CompletableFuture.runAsync(() -> task(), pool); } // 自动关闭线程池(Java 9+)
- 异常处理
- 强制兜底机制:
future.whenComplete((result, ex) -> { if (ex != null) { log.error("任务失败", ex); sendAlert(ex); // 异步异常通知 } });
- 结果降级:
future.exceptionally(ex -> "默认值"); // 异常时返回降级数据
- 上下文传递
- ThreadLocal失效解决方案:
// 使用TransmittableThreadLocal传递上下文 TtlRunnable wrappedTask = TtlRunnable.wrap(() -> { String traceId = MDC.get("traceId"); // 获取异步任务中的上下文 System.out.println("Trace ID: " + traceId); }); CompletableFuture.runAsync(wrappedTask);
三、性能优化
- 背压控制
- 响应式编程中限制数据流速:
Flux.range(1, 1000) .limitRate(100) // 每次处理100条 .subscribe();
- 响应式编程中限制数据流速:
- 超时机制
future.orTimeout(3, TimeUnit.SECONDS) // 超时后返回指定结果 .completeOnTimeout("超时结果", 3, TimeUnit.SECONDS);
- 避免阻塞回调
- 错误示例:在回调中执行同步阻塞操作。
- 正确做法:阻塞任务分配独立线程池:
future.thenApplyAsync(result -> { blockingOperation(result); // 阻塞操作 return result; }, ioExecutor); // 使用I/O专用线程池
四、监控与测试
- 指标监控
- 线程池状态:活跃线程数、队列堆积量(通过Micrometer接入Prometheus)。
- 任务耗时:记录每个阶段的执行时间。
- 单元测试
- 使用
CountDownLatch
等待异步任务完成:CountDownLatch latch = new CountDownLatch(1); future.thenRun(latch::countDown); latch.await(1, TimeUnit.SECONDS);
- 使用
五、框架级实践
- Spring异步配置
- 启用注解:
@Configuration @EnableAsync public class AsyncConfig { @Bean public Executor taskExecutor() { // 配置线程池 } }
- 异步方法标记:
@Service public class OrderService { @Async public void sendNotification(Order order) { /* 异步发送邮件 */ } }
- 响应式替代方案
- WebFlux流式处理:
@GetMapping("/stream") public Flux streamData() { return webClient.get() .uri("data-source") .retrieve() .bodyToFlux(Data.class); }
六、常见陷阱规避
问题 | 解决方案 |
---|---|
回调地狱 | 使用thenCompose() 扁平化嵌套,或async/await 风格(Loom虚拟线程)。 |
线程池资源耗尽 | 按任务类型隔离线程池(CPU/I/O分离)。 |
上下文丢失 | 使用TransmittableThreadLocal 传递MDC日志上下文。 |
七、核心实践总结表
类别 | 关键实践 | 示例代码 |
---|---|---|
线程池配置 | 按任务类型隔离线程池,设置合理队列大小 | new ThreadPoolExecutor(4, 8, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100)) |
异常处理 | 强制添加全局异常处理器 | future.exceptionally(ex -> handle(ex)) |
任务编排 | 使用thenCombine 组合结果,allOf 等待完成 |
future1.thenCombine(future2, (a,b) -> merge(a,b)) |
上下文传递 | 使用Transmitt:ableThreadLocal 或手动传递Context |
TtlRunnable.wrap(task) |
💡 生产建议:优先选择
CompletableFuture
处理复杂任务编排,结合自定义线程池和异常兜底机制。高并发场景可引入响应式编程(WebFlux)或Project Loom虚拟线程(Java 19+)进一步优化。