我们在上篇实现了一个轻量级 RPC 框架,现在要进一步优化 —— 加入异步响应支持,让 RPC 通信变得真正高效、非阻塞、支持并发。
一、为什么需要异步调用?
上篇的 RPC 框架是“同步阻塞”的:
每次发送请求后,必须等待服务端响应后才能返回结果
对于高并发请求,会导致线程阻塞、资源浪费、吞吐低下
理想方案:客户端请求后立即返回一个 Future 对象,等响应来了再异步设置结果,主线程不用阻塞。
二、核心机制:RequestId + ChannelFuture + Map 缓存
我们引入一个全局 ConcurrentHashMap<Long, CompletableFuture<RpcResponse>>:
public class RpcResponseRegistry {
public static final Map<Long, CompletableFuture<RpcResponse>> futures = new ConcurrentHashMap<>();
}
每次发送请求时:
生成唯一 requestId
创建一个 CompletableFuture 存入
RpcResponseRegistry
异步监听服务端响应,当收到响应时设置结果
三、发送请求(非阻塞)
public class NettyClient {
private static AtomicLong REQUEST_ID_GEN = new AtomicLong();
public static CompletableFuture<RpcResponse> send(RpcRequest request, Channel channel) {
long requestId = REQUEST_ID_GEN.incrementAndGet();
request.setRequestId(requestId);
CompletableFuture<RpcResponse> future = new CompletableFuture<>();
RpcResponseRegistry.futures.put(requestId, future);
channel.writeAndFlush(request); // 不阻塞等待
return future;
}
}
四、接收响应并完成 Future
服务端处理完成后写出响应:
ctx.writeAndFlush(response);
客户端接收时通过 requestId
找到对应的 future 并完成它:
public class RpcClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) {
long requestId = response.getRequestId();
CompletableFuture<RpcResponse> future = RpcResponseRegistry.futures.remove(requestId);
if (future != null) {
future.complete(response);
}
}
}
五、客户端调用示例
// 获取动态代理
HelloService proxy = proxyFactory.getProxy(HelloService.class);
// 发起异步调用
CompletableFuture<RpcResponse> future = NettyClient.send(request, channel);
future.thenAccept(response -> {
System.out.println("异步结果:" + response.getResult());
});
// 主线程可以干其他事情
System.out.println("我可以继续做别的事情了!");
六、增强:自动类型转换 + 异常处理
你可以在 Future 结果返回时:
自动封装返回值类型(比如String,List<T>)
自动处理异常并打印日志
future.whenComplete((resp, ex) -> {
if (ex != null) {
System.err.println("RPC 异常: " + ex.getMessage());
} else {
System.out.println("返回结果: " + resp.getResult());
}
});
七、总结
通过本篇的异步机制改造,我们的 Netty RPC 框架已经拥有了:
✅ 异步非阻塞调用
✅ CompletableFuture 响应映射
✅ 并发安全的请求响应管理
✅ 更高的吞吐能力和性能提升