公众号【离心计划】,一起离开地球表面
【RPC系列合集】
| 前言
前一小结我们已经熟悉了实现一个RPC框架,必需和极其重要的组件与功能,这一节我们将会以序列化为起点,构建我们的Sparrow-Rpc。另外我建议进入实战篇后,阅读最好在PC端进行,同步开发。这一节的代码链接如下,每一小节对应一个git tag方便大家阅读。
https://github.com/JAYqq/my-sparrow-rpc/tree/v1.1
| 定义传输格式
我们从客户端出发,将我们的目标请求通过动态代理为我们生成的代理类发送给远端服务然后拿到结果,那么我们的请求中除了方法必要的参数,还有什么“功能性”的属性呢,定义Request也是定义应用层协议的过程,我们需要注意些什么呢?个人拙见,当我们制定协议的时候,最重要的是兼容性,因为协议一定会有一个迭代的过程,如何保证协议升级的过程可以被识别并兼容很重要,而具体的设计方式我们在 【专栏】RPC系列(理论)-协议与序列化 中已经介绍过了,我们直接定义类结构,首先是头部(为了控制文章长度,均省略get/set方法)
public class RpcHeader implements Serializable {
/**
* 版本号
*/
String version;
/**
* traceId
*/
String traceId;
/**
* 类型
*/
String type;
public int getSize() {
return Integer.BYTES + version.getBytes(StandardCharsets.UTF_8).length + Integer.BYTES + traceId.getBytes(StandardCharsets.UTF_8).length + Integer.BYTES + type.getBytes(StandardCharsets.UTF_8).length;
}
}
值得注意的是getSize()方法定义在这边是为后面序列化做准备,返回的值是头部占用的大小,然后定义Request
public class RpcRequest {
private String nameSpace;
private String serviceName;
private String methodName;
/**
* 方法的参数,这边直接用byte接收,这样避免在序列化时重复序列化这部分数据
*/
private byte[] parameters;
/**
* 构造服务签名
* @return
*/
public String buildServiceSign() {
return nameSpace + ":" + serviceName;
}
}
RpcRequest是主要的请求信息,其中我们用nameSpace+serviceName作为一个服务签名表示唯一性,parameters定义为已经经过序列化后的参数。然后我们定义一个RpcCommand作为后面经过Netty的实例格式,这部分设计是因为我们为了课程需要,我将序列化部分分成了自定义序列化协议、jdk序列化以及开源序列化方案三部分,完全只是教学目的。
public class RpcCommand {
RpcHeader header;
byte[] data;
}
这里的data就是我们通过自定义序列化方式序列化好的RpcRequest信息。ok我们定义完了传输格式后的项目结构如下:
我们本节的目的是序列化,所以我们暂时只把眼光放到如何把这些类的实例与二进制数据之间转化即可,上面三个类中我们需要序列化的数据应该只有两种:
RpcHeader和RpcRequest,由我们自定义序列化协议
parameters,我们采用jdk提供的序列化方式
| 序列化阶段
我们设计的过程中遵守面向接口编程的方式,因此我们先定义接口类
public interface Serializer<T> {
/**
* 获取T的长度
*
* @return
*/
int getSize(T t);
/**
* 序列化
*
* @param o
* @return
*/
byte[] serialize(T o, ByteBuffer buffer);
/**
* 反序列化
*
* @return
*/
T parse(ByteBuffer buffer);
/**
* 序列器类型
*/
byte getType();
Class<T> getSerializerClass();
}
我们用byte标识序列化类型,由于篇幅原因我们详解自定义的序列化协议,jdk序列化方式与开源的方案我们粗略过一下,只因为序列化的目的都是一个:在对象与二进制数据之间转换。接下来我们实现RpcRequest的序列化类,看看怎么实现。
public class RpcRequestSerializer implements Serializer<RpcRequest>
首先getSize方法是RpcRequest类中每个实例所占的字节数,我们再贴一下RpcRequest的类结构
public class RpcRequest {
private String nameSpace;
private String serviceName;
private String methodName;
private byte[] parameters;
}
所以getSize的方法如下,大小就是三个
@Override
public int getSize(RpcRequest request) {
return request.getNameSpace().getBytes(StandardCharsets.UTF_8).length + Integer.BYTES +
request.getServiceName().getBytes(StandardCharsets.UTF_8).length + Integer.BYTES +
request.getMethodName().getBytes(StandardCharsets.UTF_8).length + Integer.BYTES +
request.getParameters().length + Integer.BYTES;
}
三个String转换成byte的长度加上paramters的长度,还有四个整型长度是什么?还记得我们理论篇介绍协议的断句么,这里的四个整型代表四个字段的长度,这样方便我们后面创建byte数组来接受四个字段的数据。然后看serialize方法,如何把RpcRequest转换成二进制数据
@Override
public byte[] serialize(RpcRequest o, ByteBuffer buffer)
{
byte[] nameSpaceBytes = o.getNameSpace().getBytes();
buffer.putInt(nameSpaceBytes.length);
buffer.put(nameSpaceBytes);
byte[] serviceBytes = o.getServiceName().getBytes();
buffer.putInt(serviceBytes.length);
buffer.put(serviceBytes);
byte[] methodBytes = o.getMethodName().getBytes();
buffer.putInt(methodBytes.length);
buffer.put(methodBytes);
buffer.putInt(o.getParameters().length);
buffer.put(o.getParameters());
return buffer.array();
}
结构很清楚,我们的目的是把四个字段放到ByteBuffer中,我们只需要将字段数据放到byte数组中,然后依次put到buffer中就行。关于ByteBuffer,它是jdk的nio包下的一个工具,继承关系如下
public abstract class ByteBuffer
extends Buffer
implements Comparable<ByteBuffer>
提供了对字节数据的基本操作,除了Buffer的“原生”方法外,ByteBuffer提供了如
putInt() 往字节数组尾部添加一个整型转换后的字节数据
putDouble() 往字节数组尾部添加一个双精度小数转换后的字节数据
getInt() 按顺序读取一个整型大小的字节数据,也就是读取四个字节
get(byte[] arr) 按顺序读取一个区域的字节数据放到arr中
这样api友好的方法,方便我们操作字节数组。然后我们看反序列的parse方法是如何写的
@Override
public RpcRequest parse(ByteBuffer buffer) {
int len = buffer.getInt();
byte[] arrBytes = new byte[len];
buffer.get(arrBytes);
String nameSpace = new String(arrBytes, StandardCharsets.UTF_8);
len = buffer.getInt();
arrBytes = new byte[len];
buffer.get(arrBytes);
String serviceName = new String(arrBytes, StandardCharsets.UTF_8);
len = buffer.getInt();
arrBytes = new byte[len];
buffer.get(arrBytes);
String methodName = new String(arrBytes, StandardCharsets.UTF_8);
len = buffer.getInt();
arrBytes = new byte[len];
buffer.get(arrBytes);
RpcRequest request = new RpcRequest();
request.setNameSpace(nameSpace);
request.setServiceName(serviceName);
request.setMethodName(methodName);
request.setParameters(arrBytes);
return request;
}
结构也很清晰,逻辑就是先读取下一段的长度len,然后读取len个字节数据作为这个字段的值,只要按照顺序下来解析就行。
这样一个简单序列化RpcRequest的方法就做好了,但这也仅仅是针对这一个类的实例做序列化与反序列化,但是我们通过这个至少清楚了代码层面的原理。接下来,还记得我们RpcRequest中的paramters字段么,它的原型是我们调用远程方法需要传递的参数类型,这部分的序列化我们采用jdk序列化方式,看看jdk是怎么做的
public byte[] toByteArray(Object obj) {
byte[] bytes = null;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeByte(getType());
oos.writeObject(obj);
oos.flush();
bytes = bos.toByteArray();
oos.close();
bos.close();
} catch (IOException ex) {
ex.printStackTrace();
}
return bytes;
}
/**
* 数组转对象
*
* @param bytes
* @return
*/
public Object toObject(byte[] bytes) {
Object obj = null;
try {
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bis);
//先读取
ois.readByte();
obj = ois.readObject();
ois.close();
bis.close();
} catch (IOException | ClassNotFoundException ex) {
ex.printStackTrace();
}
return obj;
}
通过 ObjectOutputStream和 ObjectInputStream 实现,由于都是Object所以反序列化后我们通常需要进行类型转化,转化成我们需要的实例类型。
最后我们再来采用一种开源的序列化方式:Hessian。Hessian是Dubbo的默认序列化协议,使用hessian需要引入依赖
<!-- https://mvnrepository.com/artifact/com.caucho/hessian -->
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
<version>4.0.66</version>
</dependency>
让我们看看Hessian的序列化与反序列的使用方式
@Override
public byte[] serialize(Object o, ByteBuffer buffer) {
try {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Hessian2Output out = new Hessian2Output(outputStream);
out.writeObject(o);
out.flush();
return outputStream.toByteArray();
} catch (Exception e) {
throw new SecurityException("serializer error");
}
}
@Override
public Object parse(ByteBuffer buffer) {
try {
ByteArrayInputStream inputStream = new ByteArrayInputStream(buffer.array());
Hessian2Input input = new Hessian2Input(inputStream);
Object o = input.readObject();
return o;
} catch (Exception e) {
throw new SerializeException("parse object error");
}
}
看起来和jdk序列化方式差不多(说明封装的很好),使用 Hessian2Output 和Hessian2Input 进行操作,在我们的Sparrow-Rpc中,我们的服务响应Response后续我们会使用Hessian进行序列化与反序列化,这样我们可以了解到三种序列化的使用方式,就很nice(虽然很不规范)。除此之外,我们自定义了序列化异常类,便于定位异常类型。
public class SerializeException extends RuntimeException {
public SerializeException(String message) {
super(message);
}
}
这样,我们就完成了我们通信需要用到的序列化部分的功能,但是现在还有一个问题,就是如何优雅的使用我们想要的序列化方式。在真实场景中,有可能是这样的情况,Sparrow-Rpc开始采用的是jdk序列化方式,但是后面想替换成Hessian,这个步骤你想“无感升级”,那么我们就缺少一种可拔插的开发方式。所以 SPI 的出现就解决了这一问题。
| SPI
SPI(Service Provider Interface),是JDK内置的一种 服务提供发现机制,可以用来启用框架扩展和替换组件。简单来说,我们利用SPI可以无缝替换上面的Serializer接口的实现方式,只要修改配置文件即可,调用者与SPI形成了下面的关系,可以看出我们想要替换序列化方式是对调用者透明的,这也是功能解耦的体现。
SPI要求我们需要在META-INF/services/下编写配置文件,目的是为了告诉JDK需要帮我们生成什么实现类的实例,由于我们用到了多种序列化方式,因此我们的配置文件是这样的 (com.sparrow.rpc.core.serialize.Serializer)
com.sparrow.rpc.core.serialize.impl.ObjectSerializer
com.sparrow.rpc.core.serialize.impl.ObjectArraysSerializer
com.sparrow.rpc.core.serialize.impl.RpcRequestSerializer
com.sparrow.rpc.core.serialize.impl.HessianSerializer
为了序列化有一个统一出口,我们实现一个SerializeSupport支持类,对外提供一致序列化方式,我们先定义了两个Map成员
private staticMap<Byte, Serializer<?>> typeSerializerMap = new ConcurrentHashMap<>();
private static Map<Class<?>, Serializer<?>> classSerializerMap = new ConcurrentHashMap<>();
static {
//加载所有序列器
Collection<Serializer> serializers = SpiSupport.loadAll(Serializer.class);
serializers.forEach(serializer -> {
typeSerializerMap.put(serializer.getType(), serializer);
classSerializerMap.put(serializer.getSerializerClass(), serializer);
});
}
我们在序列化时会先在字节数组的第一位放一个Byte类型的值表示序列化类型
typeSerializerMap 是为了在做反序列时,可以先拿到序列化类型,然后找到对应的序列化器做反序列化;同理classSerializerMap也是为了在序列化时做准备,所以我们的序列化与反序列化方法就是
public static <T> byte[] serialize(T t) {
Serializer<T> serializer = (Serializer<T>) classSerializerMap.get(t.getClass());
if (Objects.isNull(serializer)) {
throw new IllegalArgumentException(String.format("Cannot find correct serializer for class:%s", t.getClass()));
}
ByteBuffer buffer = ByteBuffer.allocate(serializer.getSize(t) + 1);
//先放上类型
buffer.put(serializer.getType());
return serializer.serialize(t, buffer);
}
public static <E> E parse(byte[] bytes) {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
//先取出类型
byte type = buffer.get();
Serializer<?> serializer = typeSerializerMap.get(type);
if (Objects.isNull(serializer)) {
throw new IllegalArgumentException(String.format("Unknown type:%s", type));
}
Object rs = serializer.parse(buffer);
return (E) rs;
}
| 小结
这样我们就完成了序列化部分的工作,只要调用SerializeSupport的对应方法,就能利用SPI机制动态生成的序列化器实例进行操作了。具体的序列化与反序列化部分也是利用了ByteBuffer进行读写,只要理清了逻辑,就能明白序列化的工作内容了。
这一节都到这里,下一节我们会讲解Netty相关的部分,看看我们怎么将序列化好的数据发送出去,又怎么接受二进制数据反序列化成我们想要的结果,Netty到底为我们做了哪些工作呢?下期见旁友~