本文基于何小锋老师的RPC实战和核心原理
核心原理
什么是RPC?
RPC,远程过程调用,远程肯定是指要跨机器而非本机,所以需要用到网络编程才能实现,但是并不是只要通过网络通信访问到另一台机器的应用程序就可以称为 RPC 调用
RPC 是帮助我们屏蔽网络编程细节,实现调用远程方法就跟调用本地(同一个项目中的方法)一样的体验
RPC 就是提供一种透明调用机制,让使用者不必显式地区分本地调用和远程调用。RPC 虽然可以帮助开发者屏蔽远程调用跟本地调用的区别,但毕竟涉及到远程网络通信,所以这里还是有很多使用上的区别
RPC作用主要体现:
- 屏蔽远程调用跟本地调用的区别,让我们感觉就是调用项目内的方法
- 隐藏底层网络通信的复杂性,让我们更专注于业务逻辑
RPC通信流程
发起调用请求叫调用方,被调用的一方叫做服务提供方
1、RPC 是一个远程调用,那么肯定是需要用到网络传输数据,一般使用TCP确保传输数据可靠性;
2、网络上传输的都是二进制数据,但是传入rpc的参数都是对象,因此这里就有了将对象转化为二进制数据的方式,也就是“序列化”;
3、调用方将对象参数序列化为二进制数据,然后通过TCP传输给服务提供方,为了让服务提供方知道数据的开始位置和结束位置,就出现了“协议”,大多数的协议会分成两部分,分别是数据头和消息体。数据头一般用于身份识别,包括协议标识、数据大小、请求类型、序列化类型等信息;消息体主要是请求的业务参数信息和扩展属性等;
4、根据协议的格式,正确拿到传输的数据,将二进制数据转化为对象,也就是反序列化;
5、调用方不需要掌握RPC太多底层,因此就需要用到动态代理技术
协议
协议作用确保一方能够正确解析另一方发送的数据
为什么有了HTTP协议,一些RPC框架还要设计私有协议?
1、相比HTTP,RPC 主要是应用之间的通信,性能要求相对高,HTTP 协议的数据包大小相对请求数据本身要大很多,需要加入很多无用的内容
2、HTTP 协议属于无状态协议,客户端无法对请求和响应进行关联,每次请求都需要重新建立连接,响应完成后再关闭连接(RPC为了吞吐量异步并发发送请求,等待应答,所以需要知道哪个应答对应那个请求)
怎么实现请求和响应对应起来?可以通过消息id,
以 Dubbo 为例,消费者发送请求时,使用 AtomicLong 自增,产生一个 消息 ID。由于 Dubbo 底层 IO 操作是异步的,Dubbo 发送请求之后,需要阻塞等待消费者返回信息。消费者会将消息 ID 保存到 Map 结构中,。为了保证请求响应可以一一对应,这就需要提供者返回的响应信息带上请求者消息 ID。 通过响应的消息 ID,通过 上面提到 Map 存储数据,就能找到对应的请求。
协议设计
协议头,协议体
协议头一般包含协议长度、序列化方式、协议标示、消息 ID、消息类型等
协议体一般只放请求接口方法、请求的业务参数值和一些扩展属性
上面是定长协议,也就是如果以后协议头需要添加字段是不被允许的
整体协议就变成了三部分内容:固定部分、协议头内容、协议体内容,前两部分我们还是可以统称为“协议头”,具体协议如下:
序列化
前面提到过,网络传输的数据必须是二进制数据,但调用方请求的出入参数都是对象。对象是不能直接在网络中传输的,所以我们需要提前把它转成可传输的二进制,并且要求转换算法是可逆的,这个过程我们一般叫做“序列化”。 这时,服务提供方就可以正确地从二进制数据中分割出不同的请求,同时根据请求类型和序列化类型,把二进制的消息体逆向还原成请求对象,这个过程我们称之为“反序列化”。
常见的序列化方式
JDK原生
//JDK原生序列化
public static void test1() throws Exception{
Student yubo = new Student(1, "yubo");
URL url = Test.class.getClassLoader().getResource("student.txt");
new ObjectOutputStream(new FileOutputStream(url.getPath())).writeObject(yubo);
Student stu = (Student)new ObjectInputStream(Test.class.getClassLoader().getResourceAsStream("student.txt")).readObject();
System.out.println(stu.toString());
}
序列化过程就是在读取对象数据的时候,不断加入一些特殊分隔符,这些特殊分隔符用于在反序列化过程中截断用。
- 头部数据用来声明序列化协议、序列化版本,用于高低版本向后兼容
- 对象数据主要包括类名、签名、属性名、属性类型及属性值,当然还有开头结尾等数据,除了属性值属于真正的对象值,其他都是为了反序列化用的元数据
- 存在对象引用、继承的情况下,就是递归遍历“写对象”逻辑
JSON
JSON 是典型的 Key-Value 方式,没有数据类型,是一种文本型序列化框架
但用 JSON 进行序列化有这样两个问题:
- JSON 进行序列化的额外空间开销比较大,对于大数据量服务这意味着需要巨大的内存和磁盘开销;
- JSON 没有类型,但像 Java 这种强类型语言,需要通过反射统一解决,所以性能不会太好。
所以如果 RPC 框架选用 JSON 序列化,服务提供者与服务调用者之间传输的数据量要相对较小,否则将严重影响性能。
Hessian
Hessian 是动态类型、二进制、紧凑的,并且可跨语言移植的一种序列化框架。Hessian 协议要比 JDK、JSON 更加紧凑,性能上要比 JDK、JSON 序列化高效很多,而且生成的字节数也更小。
Student student = new Student(1, "yb0os1");
//student对象转化为byte数组
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Hessian2Output output = new Hessian2Output(outputStream);
output.writeObject(student);
output.flushBuffer();
byte[] data = outputStream.toByteArray();//序列化后的
System.out.println("Hessian序列化后的字节长度: " + data.length);
outputStream.close();
ByteArrayInputStream bis = new ByteArrayInputStream(data);
Hessian2Input input = new Hessian2Input(bis);
Student deStudent = (Student) input.readObject();
input.close();
System.out.println("反序列化结果: " + deStudent);
Hessian 本身也有问题,官方版本对 Java 里面一些常见对象的类型不支持,比如:
- Linked 系列,LinkedHashMap、LinkedHashSet 等,但是可以通过扩展 CollectionDeserializer 类修复;
- Locale 类,可以通过扩展 ContextSerializerFactory 类修复;
- Byte/Short 反序列化的时候变成 Integer
目前高版本hessian好像已经修复了问题
Protobuf
Protobuf 是 Google 公司内部的混合语言数据标准,是一种轻便、高效的结构化数据存储格式,可以用于结构化数据序列化,支持 Java、Python、C++、Go 等语言;Protobuf 使用的时候需要定义 IDL(Interface description language),然后使用不同语言的 IDL 编译器,生成序列化工具类。
优点:
- 序列化后体积相比 JSON、Hessian 小很多;
- IDL 能清晰地描述语义,所以足以帮助并保证应用程序之间的类型不会丢失,无需类似 XML 解析器;
- 序列化反序列化速度很快,不需要通过反射获取类型
- 消息格式升级和兼容性不错,可以做到向后兼容
使用:
1、假设我们要序列化一个 User 对象,创建 user.proto 文件:
syntax = "proto3";
option java_package = "com.example.protobuf";
option java_outer_classname = "UserProto";
message User {
string name = 1;
int32 age = 2;
repeated string hobbies = 3; // 列表字段
}
2、使用 protoc 编译器生成 Java 代码
protoc --java_out=. user.proto
生成后会在 com/example/protobuf 目录下生成 UserProto.java
3、 Java 序列化与反序列化代码
UserProto.User user = UserProto.User.newBuilder()
.setName("yb0os1")
.setAge(18).
addHobbies("篮球")
.addHobbies("java")
.build();
ByteArrayOutputStream output = new ByteArrayOutputStream();
user.writeTo(output);
byte[] data = output.toByteArray();
System.out.println("ProtoBuf序列化后的字节长度: " + data.length);
output.close();
UserProto.User deUser = UserProto.User.parseFrom(data);
System.out.println("反序列化结果: " + deUser);
Protobuf 非常高效,但是对于具有反射和动态能力的语言来说,这样用起来很费劲,这一点就不如 Hessian,比如用 Java 的话,这个预编译过程不是必须的,可以考虑使用 Protostuff。
Protostuff 不需要依赖 IDL 文件,可以直接对 Java 领域对象进行反 / 序列化操作,在效率上跟 Protobuf 差不多,生成的二进制格式和 Protobuf 是完全相同的,可以说是一个 Java 版本的 Protobuf 序列化框架。
但在使用过程中,一些不支持的情况,: 不支持 null(最新版本已经支持了); ProtoStuff 不支持单纯的 Map、List 集合对象,需要包在对象里面。
ProtoStuff
User user = new User("yb0os1",18);
//序列化
Schema<User> schema = RuntimeSchema.getSchema(User.class);
byte[] bytes = ProtostuffIOUtil.toByteArray(
user,
schema,
LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE)
);
System.out.println("ProtoStuff 序列化后的字节长度: " + bytes.length);
// 3. 反序列化
User deserializedUser = schema.newMessage();
ProtostuffIOUtil.mergeFrom(bytes, deserializedUser, schema);
System.out.println("反序列化结果: " + deserializedUser);
总体代码
package com.yb0os1.serialization;
import com.caucho.hessian.io.Hessian2Input;
import com.caucho.hessian.io.Hessian2Output;
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import java.io.*;
import java.net.URL;
public class Test {
public static void main(String[] args) throws Exception {
System.out.println("=== JDK 原生序列化测试 ===");
test1();
System.out.println("\n=== Hessian2 序列化测试 ===");
test2();
System.out.println("\n=== ProtoBuf 序列化测试 ===");
test3();
System.out.println("\n=== ProtoStuff 序列化测试 ===");
test4();
}
//JDK原生序列化
public static void test1() throws Exception{
Student yubo = new Student(1, "yubo");
URL url = Test.class.getClassLoader().getResource("student.txt");
new ObjectOutputStream(new FileOutputStream(url.getPath())).writeObject(yubo);
Student stu = (Student)new ObjectInputStream(Test.class.getClassLoader().getResourceAsStream("student.txt")).readObject();
System.out.println(stu.toString());
}
//Hessian序列化
public static void test2() throws Exception{
Student student = new Student(1, "yb0os1");
//student对象转化为byte数组
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Hessian2Output output = new Hessian2Output(outputStream);
output.writeObject(student);
output.flushBuffer();
byte[] data = outputStream.toByteArray();//序列化后的
System.out.println("Hessian序列化后的字节长度: " + data.length);
outputStream.close();
ByteArrayInputStream bis = new ByteArrayInputStream(data);
Hessian2Input input = new Hessian2Input(bis);
Student deStudent = (Student) input.readObject();
input.close();
System.out.println("反序列化结果: " + deStudent);
}
/// ProtoBuf序列化
public static void test3() throws Exception{
UserProto.User user = UserProto.User.newBuilder()
.setName("yb0os1")
.setAge(18).
addHobbies("篮球")
.addHobbies("java")
.build();
ByteArrayOutputStream output = new ByteArrayOutputStream();
user.writeTo(output);
byte[] data = output.toByteArray();
System.out.println("ProtoBuf序列化后的字节长度: " + data.length);
output.close();
UserProto.User deUser = UserProto.User.parseFrom(data);
System.out.println("反序列化结果: " + deUser);
}
/// ProtoStuff序列化
public static void test4() throws Exception{
User user = new User("yb0os1",18);
//序列化
Schema<User> schema = RuntimeSchema.getSchema(User.class);
byte[] bytes = ProtostuffIOUtil.toByteArray(
user,
schema,
LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE)
);
System.out.println("ProtoStuff 序列化后的字节长度: " + bytes.length);
// 3. 反序列化
User deserializedUser = schema.newMessage();
ProtostuffIOUtil.mergeFrom(bytes, deserializedUser, schema);
System.out.println("反序列化结果: " + deserializedUser);
}
}
如何选择?
序列化的方式不止前面这些,还有 Message pack、kryo 等。那么面对这么多的序列化协议,在 RPC 框架中我们该如何选择呢?
主要考虑性能效率、空间开销、通用性和兼容性、安全性
序列化与反序列化过程是 RPC 调用的一个必须过程,那么序列化与反序列化的性能和效率势必将直接关系到 RPC 框架整体的性能和效率。
空间开销,也就是序列化之后的二进制数据的体积大小。序列化后的字节数据体积越小,网络传输的数据量就越小,传输数据的速度也就越快,由于 RPC 是远程调用,那么网络传输的速度将直接关系到请求响应的耗时
与序列化协议的效率、性能、序列化协议后的体积相比,其通用性和兼容性的优先级会更高,因为他是会直接关系到服务调用的稳定性和可用率的,对于服务的性能来说,服务的可靠性显然更加重要。更加看重这种序列化协议在版本升级后的兼容性是否很好,是否支持更多的对象类型,是否是跨平台、跨语言的,是否有很多人已经用过并且踩过了很多的坑,其次才会去考虑性能、效率和空间开销。
首选的还是 Hessian 与 Protobuf,因为他们在性能、时间开销、空间开销、通用性、兼容性和安全性上,都满足了我们的要求。其中 Hessian 在使用上更加方便,在对象的兼容性上更好;Protobuf 则更加高效,通用性上更有优势。
注意 使用 RPC 框架的过程中,我们构造入参、返回值对象,主要记住以下几点:
- 对象要尽量简单,没有太多的依赖关系,属性不要太多,尽量高内聚;
- 入参对象与返回值对象体积不要太大,更不要传太大的集合;
- 尽量使用简单的、常用的、开发语言原生的对象,尤其是集合类;
- 对象不要有复杂的继承关系,最好不要有父子类的情况。
网络通信
常见网络通信模型
同步阻塞BIO、同步非阻塞NIO、IO多路复用、异步非阻塞AIO
最常见的就是BIO和IO多路复用
阻塞IO
同步阻塞 IO 是最简单、最常见的 IO 模型,在 Linux 中,默认情况下所有的 socket 都是 blocking。
首先,应用进程发起 IO 系统调用后,应用进程被阻塞,转到内核空间处理。之后,内核开始等待数据,等待到数据之后,再将内核中的数据拷贝到用户内存中,整个 IO 处理完毕后返回进程。最后应用的进程解除阻塞状态,运行业务逻辑。
系统内核处理 IO 操作分为两个阶段——等待数据和拷贝数据。而在这两个阶段中,应用进程中 IO 操作的线程会一直都处于阻塞状态,如果是基于 Java 多线程开发,那么每一个 IO 操作都要占用线程,直至 IO 操作结束。
系统内核处理 IO 操作的整个过程中,线程是阻塞的。线程是否需要等待数据由网卡拷贝到内核空间,决定是否阻塞;线程是否需要等待数据由内核空间拷贝到用户空间,决定是否同步。
阻塞 IO 每处理一个 socket 的 IO 请求都会阻塞进程(线程),但使用难度较低。在并发量较低、业务逻辑只需要同步进行 IO 操作的场景下,阻塞 IO 已经满足了需求,并且不需要发起 select 调用,开销上还要比 IO 多路复用低
IO多路复用
多路就是指多个通道,也就是多个网络连接的 IO,而复用就是指多个通道复用在一个复用器上。
多个网络连接的 IO 可以注册到一个复用器(select)上,当用户进程调用了 select,那么整个进程会被阻塞。同时,内核会“监视”所有 select 负责的 socket,当任何一个 socket 中的数据准备好了,select 就会返回。这个时候用户进程再调用 read 操作,将数据从内核中拷贝到用户进程。
当用户进程发起了 select 调用,进程会被阻塞,当发现该 select 负责的 socket 有准备好的数据时才返回,之后才发起一次 read,整个流程要比阻塞 IO 要复杂,似乎也更浪费性能。但它最大的优势在于,用户可以在一个线程内同时处理多个 socket 的 IO 请求。用户可以注册多个 socket,然后不断地调用 select 读取被激活的 socket,即可达到在同一个线程内同时处理多个 IO 请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。
IO 多路复用更适合高并发的场景,可以用较少的进程(线程)处理较多的 socket 的 IO 请求,但使用难度比较高
RPC 调用在大多数的情况下,是一个高并发调用的场景,考虑到系统内核的支持、编程语言的支持以及 IO 模型本身的特点,在 RPC 框架的实现中,在网络通信的处理上,我们会选择 IO 多路复用的方式。开发语言的网络通信框架的选型上,我们最优的选择是基于 Reactor 模式实现的框架,如 Java 语言,首选的框架便是 Netty 框架(Java 还有很多其他 NIO 框架,但目前 Netty 应用得最为广泛),并且在 Linux 环境下,也要开启 epoll 来提升系统性能(Windows 环境下是无法开启 epoll 的,因为系统内核不支持)。
零拷贝
系统内核处理 IO 操作分为两个阶段——等待数据和拷贝数据。等待数据,就是系统内核在等待网卡接收到数据后,把数据写到内核中;而拷贝数据,就是系统内核在获取到数据后,将数据拷贝到用户进程的空间中
应用进程的每一次写操作,都会把数据写到用户空间的缓冲区中,再由 CPU 将数据拷贝到系统内核的缓冲区中,之后再由 DMA 将这份数据拷贝到网卡中,最后由网卡发送出去。这里我们可以看到,一次写操作数据要拷贝两次才能通过网卡发送出去,而用户进程的读操作则是将整个流程反过来,数据同样会拷贝两次才能让应用程序读取到数据。
应用进程的一次完整的读写操作,都需要在用户空间与内核空间中来回拷贝,并且每一次拷贝,都需要 CPU 进行一次上下文切换(由用户进程切换到系统内核,或由系统内核切换到用户进程),这样很浪费 CPU 和性能
零拷贝技术可以减少进程间的数据拷贝,提高数据传输的效率
所谓零拷贝,就是取消用户空间与内核空间之间的数据拷贝操作,应用进程每一次的读写操作,都可以通过一种方式,让应用进程向用户空间写入或者读取数据,就如同直接向内核空间写入或者读取数据一样,再通过 DMA 将内核中的数据拷贝到网卡,或将网卡中的数据 copy 到内核。
零拷贝有两种实现的方式,分别是 mmap+write 方式和 sendfile 方式
mmap是利用了虚拟内存(划分为用户内存和内核内存)映射到用户空间中
sendfile可以直接从磁盘读取到内核中然后直接从内核发送到网卡,不需要进行用户态和内核态的转换拷贝
netty的零拷贝
操作系统的零拷贝跟netty的零拷贝技术不一样,netty是偏向数据优化,OS的零拷贝则是提高了CPU的利用率
思考一个问题,发送数据的时候分多个数据包,那么对于数据包的分割和合并是在内核空间完成,还是在用户空间完成?
当时是用户空间,因为对数据包的处理工作都是由应用程序来处理的,那么这里有没有可能存在数据的拷贝操作?可能会存在,当然不是在用户空间与内核空间之间的拷贝,是用户空间内部内存中的拷贝处理操作。Netty 的零拷贝就是为了解决这个问题,在用户空间对数据操作进行优化。
Netty 是怎么对数据操作进行优化的呢?
- Netty 提供了 CompositeByteBuf 类,它可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免了各个 ByteBuf 之间的拷贝。
- ByteBuf 支持 slice 操作,因此可以将 ByteBuf 分解为多个共享同一个存储区域的 ByteBuf,避免了内存的拷贝。通过 wrap 操作,我们可以将 byte[] 数组、ByteBuf、ByteBuffer 等包装成一个 Netty ByteBuf 对象, 进而避免拷贝操作。
- Netty 框架中很多内部的 ChannelHandler 实现类,都是通过 CompositeByteBuf、slice、wrap 操作来处理 TCP 传输中的拆包与粘包问题的。
那么 Netty 有没有解决用户空间与内核空间之间的数据拷贝问题的方法呢?
Netty 的 ByteBuffer 可以采用 Direct Buffers,使用堆外直接内存进行 Socket 的读写操作,最终的效果与我刚才讲解的虚拟内存所实现的效果是一样的。
Netty 还提供 FileRegion 中包装 NIO 的 FileChannel.transferTo() 方法实现了零拷贝,这与 Linux 中的 sendfile 方式在原理上也是一样的。
动态代理
RPC 会自动给接口生成一个代理类,当我们在项目中注入接口的时候,运行过程中实际绑定的是这个接口生成的代理类。这样在接口方法被调用的时候,它实际上是被生成代理类拦截到了,这样我们就可以在生成的代理类里面,加入远程调用逻辑。
原生、Javassist、Byte Buddy、CGlib等等
动态代理是一种具体的技术框架,那就会涉及到选型。我们可以从这样三个角度去考虑:
- 因为代理类是在运行中生成的,那么代理框架生成代理类的速度、生成代理类的字节码大小等等,都会影响到其性能——生成的字节码越小,运行所占资源就越小。
- 还有就是我们生成的代理类,是用于接口方法请求拦截的,所以每次调用接口方法的时候,都会执行生成的代理类,这时生成的代理类的执行效率就需要很高效。
- 最后一个是从我们的使用角度出发的,我们肯定希望选择一个使用起来很方便的代理类框架,比如我们可以考虑:API 设计是否好理解、社区活跃度、还有就是依赖复杂度等等。
如果没有动态代理帮我们完成方法调用拦截,那么就需要使用静态代理来实现,就需要用户对原始类中所有的方法都重新实现一遍,并且为每个方法附加相似的代码逻辑,如果在RPC中,这种需要代理的类有很多个,就需要针对每个类都创建一个代理类。
gRPC剖析
gRPC 是由 Google 开发并且开源的一款高性能、跨语言的 RPC 框架,当前支持 C、Java 和 Go 等语言。gRPC 有很多特点,比如跨语言,通信协议是基于标准的 HTTP/2 设计的,序列化支持 PB(Protocol Buffer)和 JSON,整个调用示例如下图所示:
使用gRPC
定义一个 say 方法,调用方通过 gRPC 调用服务提供方,然后服务提供方会返回一个字符串给调用方。
为了保证调用方和服务提供方能够正常通信,我们需要先约定一个通信过程中的契约,也就是我们在 Java 里面说的定义一个接口,这个接口里面只会包含一个 say 方法。在 gRPC 里面定义接口是通过写 Protocol Buffer 代码,从而把接口的定义信息通过 Protocol Buffer 语义表达出来。HelloWord 的 Protocol Buffer 代码如下所示:
proto 文件放在 src/main/proto/hello.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.hello";
option java_outer_classname = "HelloProto";
option objc_class_prefix = "HLW";
package hello;
service HelloService{
rpc Say(HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
添加pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.yb0os1</groupId>
<artifactId>gRpc</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>1.60.1</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.60.1</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.60.1</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.21.12</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.2-jre</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.0</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>
com.google.protobuf:protoc:3.21.12:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>
io.grpc:protoc-gen-grpc-java:1.60.1:exe:${os.detected.classifier}
</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
执行mvn clean compile
自动生成的 Java 文件会放到 target/generated-sources/protobuf/java 和 target/generated-sources/protobuf/grpc-java
生成了基础代码之后我们可以进行解析
发送方
package io.grpc.hello;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
public class HelloWorldClient {
private final ManagedChannel channel;
private final HelloServiceGrpc.HelloServiceBlockingStub blockingStub;
/**
* 构建Channel连接
**/
public HelloWorldClient(String host, int port) {
this(ManagedChannelBuilder.forAddress(host, port)
.usePlaintext()
.build());
}
/**
* 构建Stub用于发请求
**/
HelloWorldClient(ManagedChannel channel) {
this.channel = channel;
blockingStub = HelloServiceGrpc.newBlockingStub(channel);
}
/**
* 调用完手动关闭
**/
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
/**
* 发送rpc请求
**/
public void say(String name) {
// 构建入参对象
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
HelloReply response;
try {
// 发送请求
response = blockingStub.say(request);
} catch (StatusRuntimeException e) {
return;
}
System.out.println(response);
}
public static void main(String[] args) throws Exception {
HelloWorldClient client = new HelloWorldClient("127.0.0.1", 50051);
try {
client.say("world");
} finally {
client.shutdown();
}
}
}
分为三个步骤:
- 首先用 host 和 port 生成 channel 连接;
- 然后用前面生成的 HelloService gRPC 创建 Stub 类;
- 最后我们可以用生成的这个 Stub 调用 say 方法发起真正的 RPC 调用,后续其它的 RPC 通信细节就对我们使用者透明了。
进入到 ClientCalls.blockingUnaryCall 方法里面看下逻辑细节,最重要的部分:
在调用端代码里面,我们只需要一行代码就可以发起一个 RPC 调用,而具体这个请求是怎么发送到服务提供者那端的呢?这对于我们 gRPC 使用者来说是完全透明的,我们只要关注是怎么创建出 stub 对象的就可以了。
问题1:入参是一个字符对象,那在 gRPC 是怎么把对象转成二进制数据的呢?
回到上面流程图的第 3 步,在 writePayload 之前,ClientCallImpl 里面有一行代码就是 method.streamRequest(message),看方法签名我们大概就知道它是用来把对象转成一个 InputStream,有了 InputStream 我们就很容易获得入参对象的二进制数据。
这个方法为啥不直接返回二进制数组,而是返回一个 InputStream 对象呢?
InputStream封装了底层传输的字节缓冲区实现,它通常是一组通过指针连接起来的内存块的集合,这些内存块由网络的零拷贝获取的。由于不能保证能够从内存块中获取一个byte[],我们不能传递一个简单的byte[]或byte[][],并且可能需要一个目标byte[]来从缓冲区中获取数据。
另外byte[]的缺点是需要从缓冲区中复制一个大的、连续的数据,而实际上没有什么方法可以使它执行得更好。当使用压缩时,我们也不知道消息未压缩的长度,它是动态解压缩的。
也就是避免了二次拷贝。
接着看 streamRequest 方法的拥有者 method 是个什么对象? method 是 MethodDescriptor 对象关联的一个实例,而 MethodDescriptor 是用来存放要调用 RPC 服务的接口名、方法名、服务调用的方式以及请求和响应的序列化和反序列化实现类
MethodDescriptor 是用来存储一些 RPC 调用过程中的元数据,而在 MethodDescriptor 里面 requestMarshaller 是在绑定请求的时候用来序列化方式对象的,所以当我们调用 method.streamRequest(message) 的时候,实际是调用 requestMarshaller.stream(requestMessage) 方法,而 requestMarshaller 里面会绑定一个 Parser,这个 Parser 才真正地把对象转成了 InputStream 对象
问题2:二进制流经过网络传输后,怎么正确地还原请求前语义?
gRPC 的通信协议是基于标准的 HTTP/2 设计的,而 HTTP/2 相对于常用的 HTTP/1.X 来说,它最大的特点就是多路复用、双向流,该怎么理解这个特点呢?这就好比我们生活中的单行道和双行道,HTTP/1.X 就是单行道,HTTP/2 就是双行道。
gRPC 是基于 HTTP/2 协议,而 HTTP/2 传输基本单位是 Frame,Frame 格式是以固定 9 字节长度的 header,后面加上不定长的 payload 组成,协议格式如下图所示:
上面那个流程图的第 4 步,在 write 到 Netty 里面之前,我们看到在 MessageFramer.writePayload 方法里面会间接调用 writeKnownLengthUncompressed 方法,该方法要做的两件事情就是构造 Frame Header 和 Frame Body,然后再把构造的 Frame 发送到 NettyClientHandler,最后将 Frame 写入到 HTTP/2 Stream 中,完成请求消息的发送
接收方
package com.yb0os1;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.hello.HelloReply;
import io.grpc.hello.HelloRequest;
import io.grpc.hello.HelloServiceGrpc;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
public class HelloWorldServer {
private Server server;
/**
* 对外暴露服务
**/
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new HelloServiceImpl())
.build()
.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
HelloWorldServer.this.stop();
}
});
}
/**
* 关闭端口
**/
private void stop() {
if (server != null) {
server.shutdown();
}
}
/**
* 优雅关闭
**/
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
static class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {
@Override
public void say(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
public static void main(String[] args) throws IOException, InterruptedException {
final HelloWorldServer server = new HelloWorldServer();
server.start();
server.blockUntilShutdown();
}
}
服务对外暴露的目的是让过来的请求在被还原成信息后,能找到对应接口的实现。在这之前,我们需要先保证能正常接收请求,通俗地讲就是要先开启一个 TCP 端口,让调用方可以建立连接,并把二进制数据发送到这个连接通道里面,这里依然只展示最重要的部分。
这四个步骤是用来开启一个 Netty Server,并绑定编解码逻辑的。重点看下 NettyServerHandler 就行了,在这个 Handler 里面会绑定一个 FrameListener,gRPC 会在这个 Listener 里面处理收到数据请求的 Header 和 Body,并且也会处理 Ping、RST 命令等,具体流程如下图所示:
在收到 Header 或者 Body 二进制数据后,NettyServerHandler 上绑定的 FrameListener 会把这些二进制数据转到 MessageDeframer 里面,从而实现 gRPC 协议消息的解析 。
这些 Header 和 Body 数据是怎么分离出来的呢?按照我们前面说的,调用方发过来的是一串二进制数据,这就是我们前面开启 Netty Server 的时候绑定 Default HTTP/2FrameReader 的作用,它能帮助我们按照 HTTP/2 协议的格式自动切分出 Header 和 Body 数据来,而对我们上层应用 gRPC 来说,它可以直接拿拆分后的数据来用。