Preface
Before continuing with the source-code analysis, consider a question first: what are the most critical problems in designing a high-performance MQ? In my view there are two: (1) network transmission of messages, and (2) reading and writing messages. These two largely determine how fast an MQ can be.
This article analyzes RocketMQ's network communication layer, whose source lives in the remoting module. RocketMQ's communication module is built on top of Netty, so it is best to have a systematic understanding of Netty before reading the source; it also helps to be comfortable reading class diagrams. Let's start with the core class structure of the remoting module.
A look at RemotingServer (RemotingClient is similar, so it is not covered separately)
public interface RemotingServer extends RemotingService {

    // Register the processor (and its executor) for a given request code
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);

    // Register the default processor, used when no processor matches the request code
    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);

    // The locally bound listen port
    int localListenPort();

    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);

    // Synchronous send: blocks until the response arrives or the call times out
    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
        RemotingTimeoutException;

    // Asynchronous send: the response is delivered to the InvokeCallback
    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

    // One-way send: fire and forget, no response expected
    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException;
}
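To make the interface concrete, here is a minimal sketch of how a server could register a processor and start up, assuming the NettyRemotingServer implementation and the 4.x package layout of the remoting module; the request code 1000, the class name EchoServerDemo and the pool size are made up for this example.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

public class EchoServerDemo {

    private static final int ECHO_REQUEST_CODE = 1000; // arbitrary request code for this example

    public static void main(String[] args) {
        NettyRemotingServer server = new NettyRemotingServer(new NettyServerConfig());
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // Requests carrying ECHO_REQUEST_CODE are handed to this processor on the given executor;
        // codes without a registered processor fall back to the default processor, if one is set.
        server.registerProcessor(ECHO_REQUEST_CODE, new NettyRequestProcessor() {
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
                return RemotingCommand.createResponseCommand(0, "echo: " + request.getRemark());
            }

            @Override
            public boolean rejectRequest() {
                return false;
            }
        }, executor);

        server.start(); // the start() method analyzed below
    }
}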
A look at RemotingCommand: it is the payload that travels over Netty and it also carries the encoding/decoding logic. Its fields are as follows:
// Request command code, identifying the type of request
private int code;
// Language of the sender; defaults to Java
private LanguageCode language = LanguageCode.JAVA;
// Version number
private int version = 0;
// Per-request sequence number generated on the client, used to match responses to requests
private int opaque = requestId.getAndIncrement();
// Flag; bit 0 marks the RPC type: 0 = request, 1 = response
private int flag = 0;
// Remark / description text
private String remark;
// Extension fields (arbitrary key/value pairs)
private HashMap<String, String> extFields;
// Custom header of each request (transient, so not serialized directly)
private transient CommandCustomHeader customHeader;
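As a quick, hedged illustration of how these fields get filled in (assuming the static factory and accessor methods that RemotingCommand exposes in RocketMQ 4.x; the request code 1000 and the traceId key are made up), a request/response pair might be built like this:

import org.apache.rocketmq.remoting.protocol.RemotingCommand;

public class RemotingCommandDemo {
    public static void main(String[] args) {
        // Request: code identifies the operation, opaque is assigned from the shared counter
        RemotingCommand request = RemotingCommand.createRequestCommand(1000, null);
        request.setRemark("hello");
        request.addExtField("traceId", "abc-123"); // extra key/value pairs travel in extFields
        int id = request.getOpaque();              // per-request sequence number

        // Response: createResponseCommand marks bit 0 of flag, so isResponseType() returns true;
        // the server copies the request's opaque into the response so the client can match them up
        RemotingCommand response = RemotingCommand.createResponseCommand(0, "ok");
        response.setOpaque(id);
        System.out.println(response.isResponseType());
    }
}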
Message format: as the encode() method below shows, each frame consists of a 4-byte total-length field (its value covers everything after the field itself), a 4-byte field that packs the serialization type (1 byte) together with the header length (3 bytes), the header data, and finally the optional body.
A look at message encoding: encode()
public ByteBuffer encode() {
    // Length of the frame after the 4-byte total-length field;
    // starts at 4 for the serialize-type + header-length field
    int length = 4;
    // Encode the header
    byte[] headerData = this.headerEncode();
    // Add the header length
    length += headerData.length;
    // Add the body length
    if (this.body != null) {
        length += body.length;
    }
    // Allocate the ByteBuffer; the extra 4 bytes are for the total-length field itself
    ByteBuffer result = ByteBuffer.allocate(4 + length);
    // Write the 4-byte total length
    result.putInt(length);
    // Write the 4-byte field holding the serialization type and the header length
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
    // Write the header data
    result.put(headerData);
    // Write the body, if any
    if (this.body != null) {
        result.put(this.body);
    }
    // Flip the buffer so it is ready for reading
    result.flip();
    return result;
}
// source is the header length; the serialization type goes into the top byte
public static byte[] markProtocolType(int source, SerializeType type) {
    byte[] result = new byte[4];
    // Byte 0: the serialization type code
    result[0] = type.getCode();
    // Byte 1: bits 16-23 of the header length
    result[1] = (byte) ((source >> 16) & 0xFF);
    // Byte 2: bits 8-15 of the header length
    result[2] = (byte) ((source >> 8) & 0xFF);
    // Byte 3: bits 0-7 of the header length
    result[3] = (byte) (source & 0xFF);
    return result;
}
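Note that this packing caps the header length at 2^24 - 1 bytes, since only the low three bytes carry it. For symmetry, here is a hedged sketch of how the receiving side can recover the serialization type and the header length from that 4-byte field; the class and method names are made up for this example (RocketMQ's own decoding lives in RemotingCommand.decode()).

// Reverse of markProtocolType(): byte 0 holds the serialization type code,
// bytes 1-3 hold the header length in big-endian order.
public final class ProtocolFieldDecoder {

    public static byte parseSerializeTypeCode(byte[] field) {
        return field[0];
    }

    public static int parseHeaderLength(byte[] field) {
        return ((field[1] & 0xFF) << 16)
             | ((field[2] & 0xFF) << 8)
             |  (field[3] & 0xFF);
    }
}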
A look at the start() method
@Override
public void start() {
    // Initialize the default event executor group, used to run the channel handlers
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
            }
        });

    ServerBootstrap childHandler =
        // Set the boss and worker (selector) event loop groups
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
            // Use epoll if available, otherwise NIO select
            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            // Size of the TCP accept backlog: connections already established plus those still in the three-way handshake
            .option(ChannelOption.SO_BACKLOG, 1024)
            // Allow reuse of the local address and port
            .option(ChannelOption.SO_REUSEADDR, true)
            // Disable TCP keepalive
            .option(ChannelOption.SO_KEEPALIVE, false)
            // Disable the Nagle algorithm, which suits the immediate transfer of small payloads
            .childOption(ChannelOption.TCP_NODELAY, true)
            // Send buffer size
            .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
            // Receive buffer size
            .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
            // Listen address
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                            new HandshakeHandler(TlsSystemConfig.tlsMode))
                        .addLast(defaultEventExecutorGroup,
                            new NettyEncoder(),   // encoder
                            new NettyDecoder(),   // decoder
                            // Idle-state detection, used for connection heartbeat handling
                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            // Connection manager: captures new-connection, disconnect and exception events
                            // and dispatches them to the NettyEventExecutor for unified handling
                            new NettyConnectManageHandler(),
                            // After a message has passed through the decoder, it reaches channelRead0 here,
                            // which dispatches it according to the message type
                            new NettyServerHandler()
                        );
                }
            });

    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    try {
        // Bind the listen address and block until the bind completes
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        this.port = addr.getPort();
    } catch (InterruptedException e1) {
        throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
    }

    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }

    // Periodically scan responseTable and clean up timed-out entries
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}
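The last statement in start() schedules scanResponseTable() to run every second after an initial 3-second delay. Conceptually, that scan walks the table of in-flight requests (keyed by opaque) and fails any entry whose timeout has expired, so that callers of invokeSync/invokeAsync do not wait forever for a response that will never come. The sketch below only illustrates that idea; the type and field names are made up, and RocketMQ's actual implementation (in the common parent class of client and server) differs in detail.

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative stand-in for the table of in-flight requests, keyed by opaque.
public class InFlightRequestTable {

    static class PendingRequest {
        final long beginTimestamp = System.currentTimeMillis();
        final long timeoutMillis;

        PendingRequest(long timeoutMillis) {
            this.timeoutMillis = timeoutMillis;
        }

        void failWithTimeout() {
            // complete the caller's future/callback exceptionally with a timeout
        }
    }

    private final ConcurrentHashMap<Integer, PendingRequest> table = new ConcurrentHashMap<>();

    // Remove and fail every request whose deadline has passed.
    public void scan() {
        Iterator<Map.Entry<Integer, PendingRequest>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            PendingRequest pending = it.next().getValue();
            if (pending.beginTimestamp + pending.timeoutMillis <= System.currentTimeMillis()) {
                it.remove();
                pending.failWithTimeout();
            }
        }
    }
}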
From the start() method we can see that RocketMQ's communication layer uses Netty's master-slave (boss/worker) multi-threaded Reactor model. Roughly, the model works as follows:
1. The server-side eventLoopGroupBoss thread group listens for TCP connection requests.
2. Once a connection is established, it is handed off to a thread in eventLoopGroupSelector, which registers the connected socket with its selector.
3. The selectors in eventLoopGroupSelector listen for and handle I/O events; the data then flows through the pipeline, where the handlers are executed one after another on defaultEventExecutorGroup.
4. NettyServerHandler hands each decoded request to the corresponding business thread pool for processing (see the wiring sketch after this list).
Once start() has finished, the next step is to look at how the client and the server interact with each other; before that, though, it is worth looking at their common parent class.
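As mentioned in step 4 above, here is a minimal, hedged sketch of how the thread groups behind this model fit together. The exact construction lives in NettyRemotingServer's constructor, which this article does not show, so the thread counts below and the class ReactorWiringSketch are purely illustrative; only the epoll-vs-NIO decision mirrors the useEpoll() check seen in start().

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultEventExecutorGroup;

public class ReactorWiringSketch {

    public static void main(String[] args) {
        boolean useEpoll = Epoll.isAvailable(); // the same kind of decision start() makes when picking the channel class

        // 1 boss thread: accepts TCP connections.
        EventLoopGroup boss = useEpoll ? new EpollEventLoopGroup(1) : new NioEventLoopGroup(1);

        // N selector threads: accepted sockets are registered here and their I/O events are handled here.
        EventLoopGroup selector = useEpoll ? new EpollEventLoopGroup(3) : new NioEventLoopGroup(3);

        // M worker threads: run the pipeline handlers (encoder, decoder, idle handler, ...).
        DefaultEventExecutorGroup codecGroup = new DefaultEventExecutorGroup(8);

        // Business pool: NettyServerHandler hands decoded requests to the processor registered for their code.
        ExecutorService businessPool = Executors.newFixedThreadPool(16);
    }
}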
Closing remarks
That is it for today's source-code walkthrough. If you have questions or would like to discuss anything, feel free to leave a comment.