Guava并发编程深度解析
回调注册的核心概念
什么是回调注册?
回调注册是一种异步编程模式,它允许我们在启动一个异步任务后,预先定义任务完成时要执行的逻辑(回调函数),而不需要阻塞等待任务完成。
为什么需要回调注册?
在传统同步编程中:
Future<String> future = executor.submit(task); // 提交任务
String result = future.get(); // 阻塞等待结果
processResult(result); // 处理结果
这种模式存在两个关键问题:
- 线程阻塞:主线程在
get()
处被阻塞 - 资源浪费:CPU 在等待期间无法执行其他任务
回调注册通过非阻塞通知机制解决这些问题:
ListenableFuture<String> future = executor.submit(task);
Futures.addCallback(future, new FutureCallback<String>() {
public void onSuccess(String result) {
// 任务完成后自动执行
processResult(result);
}
});
// 主线程继续执行其他任务
JVM 运行流程详解
以下通过一个完整的订单处理系统示例,展示回调注册在 JVM 中的执行流程:
import com.google.common.util.concurrent.*;
import java.util.concurrent.*;
public class OrderSystem {
// 1. 创建可监听线程池
private static final ListeningExecutorService executor =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
public static void main(String[] args) {
System.out.println("【主线程】启动订单处理系统");
// 2. 提交订单处理任务
ListenableFuture<OrderResult> future = processOrder("ORDER-101");
// 3. 注册回调函数
Futures.addCallback(future, new FutureCallback<OrderResult>() {
@Override
public void onSuccess(OrderResult result) {
System.out.println("【回调线程】订单处理成功: " + result.orderId());
System.out.println("【回调线程】库存更新: " + result.stock());
}
@Override
public void onFailure(Throwable t) {
System.err.println("【回调线程】订单处理失败: " + t.getMessage());
}
}, executor);
// 4. 主线程继续执行其他任务
System.out.println("【主线程】继续处理其他业务");
processOtherTasks();
// 5. 关闭线程池
executor.shutdown();
}
// 模拟订单处理
private static ListenableFuture<OrderResult> processOrder(String orderId) {
System.out.println("【任务线程】开始处理订单: " + orderId);
return executor.submit(() -> {
// 模拟耗时操作
Thread.sleep(1000);
System.out.println("【任务线程】订单处理完成: " + orderId);
return new OrderResult(orderId, new Random().nextInt(100));
});
}
// 模拟其他任务
private static void processOtherTasks() {
System.out.println("【主线程】执行其他业务逻辑");
// 模拟其他工作
try {
Thread.sleep(500); } catch (InterruptedException e) {
}
}
// 领域模型
record OrderResult(String orderId, int stock) {
}
}
JVM 运行流程分析
阶段1:初始化(主线程)
- JVM 加载
OrderSystem
类 - 主线程创建监听线程池(2个工作线程)
- 调用
processOrder()
提交任务 - 注册回调函数(但尚未执行)
- 主线程继续执行
processOtherTasks()