ByteToMessageCodec
ByteToMessageCodec 是一个结合了 ByteToMessageDecoder 和 MessageToByteEncoder 的编解码器,可以实时地将字节流编码或解码为消息,反之亦然。
public abstract class ByteToMessageCodec<I> extends ChannelHandlerAdapter {
private final TypeParameterMatcher outboundMsgMatcher;
private final MessageToByteEncoder<I> encoder;
private final ByteToMessageDecoder decoder = new ByteToMessageDecoder() {
@Override
public void decode(ChannelHandlerContext ctx, Buffer in) throws Exception {
ByteToMessageCodec.this.decode(ctx, in);
}
@Override
protected void decodeLast(ChannelHandlerContext ctx, Buffer in) throws Exception {
ByteToMessageCodec.this.decodeLast(ctx, in);
}
};
protected ByteToMessageCodec() {
this((BufferAllocator) null);
}
protected ByteToMessageCodec(Class<? extends I> outboundMessageType) {
this(outboundMessageType, null);
}
protected ByteToMessageCodec(BufferAllocator allocator) {
outboundMsgMatcher = TypeParameterMatcher.find(this, ByteToMessageCodec.class, "I");
encoder = new Encoder(allocator);
}
protected ByteToMessageCodec(Class<? extends I> outboundMessageType, BufferAllocator allocator) {
outboundMsgMatcher = TypeParameterMatcher.get(outboundMessageType);
encoder = new Encoder(allocator);
}
@Override
public final boolean isSharable() {
return false;
}
public boolean acceptOutboundMessage(Object msg) throws Exception {
return outboundMsgMatcher.match(msg);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
decoder.channelRead(ctx, msg);
}
@Override
public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
return encoder.write(ctx, msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
decoder.channelReadComplete(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
decoder.channelInactive(ctx);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
try {
decoder.handlerAdded(ctx);
} finally {
encoder.handlerAdded(ctx);
}
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
try {
decoder.handlerRemoved(ctx);
} finally {
encoder.handlerRemoved(ctx);
}
}
protected abstract void encode(ChannelHandlerContext ctx, I msg, Buffer out) throws Exception;
protected abstract void decode(ChannelHandlerContext ctx, Buffer in) throws Exception;
protected void decodeLast(ChannelHandlerContext ctx, Buffer in) throws Exception {
if (in.readableBytes() > 0) {
decode(ctx, in);
}
}
private final class Encoder extends MessageToByteEncoder<I> {
private final BufferAllocator allocator;
Encoder(BufferAllocator allocator) {
this.allocator = allocator;
}
@Override
public boolean acceptOutboundMessage(Object msg) throws Exception {
return ByteToMessageCodec.this.acceptOutboundMessage(msg);
}
@Override
protected Buffer allocateBuffer(ChannelHandlerContext ctx, I msg) throws Exception {
BufferAllocator alloc = allocator != null? allocator : ctx.bufferAllocator();
return alloc.allocate(256);
}
@Override
protected void encode(ChannelHandlerContext ctx, I msg, Buffer out) throws Exception {
ByteToMessageCodec.this.encode(ctx, msg, out);
}
}
}
MessageToByteEncoder
MessageToByteEncoder
是一个用于将消息编码为字节流的抽象类。它继承自 ChannelHandlerAdapter
,并通过两个抽象方法来实现编码功能:allocateBuffer 用于分配一个 Buffer,encode 用于将消息编码到 Buffer 中。它通过 acceptOutboundMessage 方法决定是否处理给定的消息类型,并在 write 方法中执行编码操作。
public abstract class MessageToByteEncoder<I> extends ChannelHandlerAdapter {
private final TypeParameterMatcher matcher;
protected MessageToByteEncoder() {
matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");
}
protected MessageToByteEncoder(Class<? extends I> outboundMessageType) {
matcher = TypeParameterMatcher.get(requireNonNull(outboundMessageType, "outboundMessageType"));
}
public boolean acceptOutboundMessage(Object msg) throws Exception {
return matcher.match(msg);
}
@Override
public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
Buffer buf = null;
try {
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
buf = allocateBuffer(ctx, cast);
try (AutoCloseable ignore = autoClosing(cast)) {
encode(ctx, cast, buf);
}
if (buf.readableBytes() > 0) {
Future<Void> f = ctx.write(buf);
buf = null;
return f;
}
return ctx.write(ctx.bufferAllocator().allocate(0));
}
return ctx.write(msg);
} catch (EncoderException e) {
return ctx.newFailedFuture(e);
} catch (Throwable e) {
return ctx.newFailedFuture(new EncoderException(e));
} finally {
if (buf != null) {
buf.close();
}
}
}
protected abstract Buffer allocateBuffer(ChannelHandlerContext ctx, I msg) throws Exception;
protected abstract void encode(ChannelHandlerContext ctx, I msg, Buffer out) throws Exception;
}
ByteToMessageDecoder
ByteToMessageDecoder 是 Netty 5 中用于处理 TCP 粘包拆包的解码器基础类。它通过维护一个累积缓冲区 cumulation,将接收到的 Buffer 连续拼接、缓存,并在适当时机调用 decode(…) 方法将字节流转为高层消息对象
public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
public static final Cumulator MERGE_CUMULATOR = new MergeCumulator();
public static final Cumulator COMPOSITE_CUMULATOR = new CompositeBufferCumulator();
private final int discardAfterReads = 16;
private final Cumulator cumulator;
// 累积的字节缓冲区,保存之前未解码完的字节
private Buffer cumulation;
// 是否每次只解码一条消息,默认关闭以提升性能,开启后适合协议升级场景
private boolean singleDecode;
// 标记是否是第一次解码调用
private boolean first;
// 标记本次 decode() 是否产出并传递了消息,决定是否需要继续读。
private boolean firedChannelRead;
// 标记当前的 channelRead 是 decoder 主动触发的,防止重复触发 read()。
private boolean selfFiredChannelRead;
// 统计读操作次数
private int numReads;
// 包装的上下文对象
private ByteToMessageDecoderContext context;
protected ByteToMessageDecoder() {
this(MERGE_CUMULATOR);
}
protected ByteToMessageDecoder(Cumulator cumulator) {
this.cumulator = requireNonNull(cumulator, "cumulator");
}
@Override
public final boolean isSharable() {
// Can't be sharable as we keep state.
return false;
}
public void setSingleDecode(boolean singleDecode) {
this.singleDecode = singleDecode;
}
public boolean isSingleDecode() {
return singleDecode;
}
protected int actualReadableBytes() {
return internalBuffer().readableBytes();
}
protected Buffer internalBuffer() {
return cumulation;
}
@Override
public final void handlerAdded(ChannelHandlerContext ctx) throws Exception {
context = new ByteToMessageDecoderContext(ctx);
handlerAdded0(context);
}
protected void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
}
@Override
public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Buffer buf = cumulation;
if (buf != null) {
cumulation = null;
numReads = 0;
int readable = buf.readableBytes();
if (readable > 0) {
ctx.fireChannelRead(buf);
ctx.fireChannelReadComplete();
} else {
buf.close();
}
}
handlerRemoved0(context);
}
protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
// 1. TCP 是流式协议,发送方即便发送了一整条消息,接收方可能会分多次读取数据
// 2. 每次读取都触发channelRead,读取到的msg是Buffer类型,内容可能不完整,需要累积合并到 cumulation
// 3. 每次读取都会触发callDecode,去尝试根据现有的数据,进行解码,如果成功则向调用链传递结果,否则啥也不干,等待下次 channelRead
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof Buffer) {
selfFiredChannelRead = true;
try {
Buffer data = (Buffer) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.bufferAllocator(), cumulation, data);
}
assert context.delegatingCtx() == ctx || ctx == context;
// 尝试解码
callDecode(context, cumulation);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
// 如果数据已读尽且无剩余,释放 buffer
if (cumulation != null && cumulation.readableBytes() == 0) {
numReads = 0;
if (cumulation.isAccessible()) {
cumulation.close();
}
cumulation = null;
} else if (++numReads >= discardAfterReads) {
// 如果累计读取次数到达阈值,主动丢弃已消费字节,防止 cumulation 越来越大
numReads = 0;
discardSomeReadBytes();
}
// 跟踪本次读数据是否至少成功解码过一次, 向下游传播了消息
firedChannelRead |= context.fireChannelReadCallCount() > 0;
// 状态清理
context.reset();
}
} else {
ctx.fireChannelRead(msg);
}
}
// 在 channelRead 之后,执行 channelReadComplete
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 重置读取次数,清理 cumulation 的无效字节。
numReads = 0;
discardSomeReadBytes();
// 解码还未成功过,自动读取关闭,在 channelRead完成后,执行本方法时,手动触发下一次读取
if (selfFiredChannelRead && !firedChannelRead && !ctx.channel().getOption(ChannelOption.AUTO_READ)) {
ctx.read();
}
// 重置状态标志位并通知下游 handler。
firedChannelRead = false;
selfFiredChannelRead = false;
ctx.fireChannelReadComplete();
}
protected final void discardSomeReadBytes() {
// 丢弃 cumulation 中已消费的字节。只在存在历史累积(!first)时执行
if (cumulation != null && !first) {
cumulator.discardSomeReadBytes(cumulation);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
assert context.delegatingCtx() == ctx || ctx == context;
channelInputClosed(context, true);
}
@Override
public void channelShutdown(ChannelHandlerContext ctx, ChannelShutdownDirection direction) throws Exception {
ctx.fireChannelShutdown(direction);
if (direction == ChannelShutdownDirection.Inbound) {
assert context.delegatingCtx() == ctx || ctx == context;
channelInputClosed(context, false);
}
}
// Channel 被关闭时,尝试从 cumulation 中解码剩余数据,释放资源,并向下传递 fire* 事件
private void channelInputClosed(ByteToMessageDecoderContext ctx, boolean callChannelInactive) {
try {
channelInputClosed(ctx);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
if (cumulation != null) {
cumulation.close();
cumulation = null;
}
if (ctx.fireChannelReadCallCount() > 0) {
ctx.reset();
ctx.fireChannelReadComplete();
}
if (callChannelInactive) {
ctx.fireChannelInactive();
}
}
}
void channelInputClosed(ByteToMessageDecoderContext ctx) throws Exception {
if (cumulation != null) {
callDecode(ctx, cumulation);
if (!ctx.isRemoved()) {
if (cumulation == null) {
try (Buffer buffer = ctx.bufferAllocator().allocate(0)) {
decodeLast(ctx, buffer);
}
} else {
decodeLast(ctx, cumulation);
}
}
} else {
try (Buffer buffer = ctx.bufferAllocator().allocate(0)) {
decodeLast(ctx, buffer);
}
}
}
void callDecode(ByteToMessageDecoderContext ctx, Buffer in) {
try {
// 当仍有未解码的数据 (in.readableBytes() > 0) 且当前 Handler 还在 pipeline 中(未被移除),就继续调用 decode() 试图解码更多消息。
while (in.readableBytes() > 0 && !ctx.isRemoved()) {
int oldInputLength = in.readableBytes();
int numReadCalled = ctx.fireChannelReadCallCount();
decodeRemovalReentryProtection(ctx, in);
if (ctx.isRemoved()) {
break;
}
if (numReadCalled == ctx.fireChannelReadCallCount()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(StringUtil.simpleClassName(getClass()) +".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
protected void decodeLast(ChannelHandlerContext ctx, Buffer in) throws Exception {
if (in.readableBytes() > 0) {
decodeRemovalReentryProtection(ctx, in);
}
}
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, Buffer in) throws Exception {
decode(ctx, in);
}
// 建议结合 FixedLengthFrameDecoder阅读,理解decode方法做了什么
protected abstract void decode(ChannelHandlerContext ctx, Buffer in) throws Exception;
// ...
}
ByteToMessageDecoderContext
ByteToMessageDecoderContext
是 Netty 为解码器设计的包装上下文,用于统计 fireChannelRead 次数,以精确控制解码行为和数据流转。
static final class ByteToMessageDecoderContext extends DelegatingChannelHandlerContext {
private int fireChannelReadCalled;
private ByteToMessageDecoderContext(ChannelHandlerContext ctx) {
super(ctx);
}
void reset() {
fireChannelReadCalled = 0;
}
int fireChannelReadCallCount() {
return fireChannelReadCalled;
}
@Override
public ChannelHandlerContext fireChannelRead(Object msg) {
fireChannelReadCalled ++;
super.fireChannelRead(msg);
return this;
}
}
Cumulator
Cumulator 接口用于处理和累积多个 Buffer 数据。它有两个主要方法,分别负责将多个 Buffer 合并成一个更大的 Buffer,并管理已经处理过的数据。
public interface Cumulator {
// BufferAllocator alloc:缓冲区分配器,用于分配新的 Buffer
// Buffer cumulation:当前的累积数据缓冲区
// Buffer in:新的输入数据缓冲区
// 将多个 Buffer 合并成一个新的 Buffer,即将当前 Buffer 中的可读数据与新的数据合并
Buffer cumulate(BufferAllocator alloc, Buffer cumulation, Buffer in);
// 丢弃缓冲区中已经读取的数据,返回一个新的缓冲区,去除了之前已处理过的部分
Buffer discardSomeReadBytes(Buffer cumulation);
}
CompositeBufferCumulator
CompositeBufferCumulator
适用于大量 Buffer 分段输入时,通过复合缓冲区高效拼接,避免数据拷贝和缓冲区重分配。
private static final class CompositeBufferCumulator implements Cumulator {
@Override
public Buffer cumulate(BufferAllocator alloc, Buffer cumulation, Buffer in) {
// 如果累加缓冲区 cumulation 没有可读字节了,直接释放并返回输入缓冲区 in
if (cumulation.readableBytes() == 0) {
cumulation.close();
return in;
}
try (in) {
// 输入缓冲区 in 没有可读字节,保留原来的 cumulation。
if (in.readableBytes() == 0) {
return cumulation;
}
// 如果累加缓冲区是只读的,复制出一个可写副本
if (cumulation.readOnly()) {
Buffer tmp = cumulation.copy();
cumulation.close();
cumulation = tmp;
}
// 如果当前累加区已经是 CompositeBuffer,直接扩展进去
if (CompositeBuffer.isComposite(cumulation)) {
CompositeBuffer composite = (CompositeBuffer) cumulation;
composite.extendWith(prepareInForCompose(in));
return composite;
}
return alloc.compose(Arrays.asList(cumulation.send(), prepareInForCompose(in)));
}
}
// 确保 in 是只读安全的后,调用 send() 移交所有权用于合并。
private static Send<Buffer> prepareInForCompose(Buffer in) {
return in.readOnly() ? in.copy().send() : in.send();
}
// 通过 readSplit(0) 剪掉已经读过的部分,释放空间。相比 compact() 更适合复合缓冲区(CompositeBuffer)
@Override
public Buffer discardSomeReadBytes(Buffer cumulation) {
cumulation.readSplit(0).close();
return cumulation;
}
@Override
public String toString() {
return "CompositeBufferCumulator";
}
}
MergeCumulator
MergeCumulator
更适合频繁接收小块数据的场景,追求高访问性能和代码简单;但当数据量变大时,可能因为频繁扩容带来复制开销。
private static final class MergeCumulator implements Cumulator {
@Override
public Buffer cumulate(BufferAllocator alloc, Buffer cumulation, Buffer in) {
// 如果当前累积的 Buffer 为空
if (cumulation.readableBytes() == 0) {
cumulation.close();
return in;
}
try (in) {
final int required = in.readableBytes();
// 如果当前的累积 Buffer 没有足够的空间,或者它是只读,就扩展大小。
if (required > cumulation.writableBytes() || cumulation.readOnly()) {
return expandCumulationAndWrite(alloc, cumulation, in);
}
cumulation.writeBytes(in);
return cumulation;
}
}
// 如果 cumulation 中的已读字节数超过可写字节数(readerOffset() > writableBytes()),则调用 compact() 方法来压缩缓冲区,移除已读的数据
@Override
public Buffer discardSomeReadBytes(Buffer cumulation) {
if (cumulation.readerOffset() > cumulation.writableBytes()) {
cumulation.compact();
}
return cumulation;
}
private static Buffer expandCumulationAndWrite(BufferAllocator alloc, Buffer oldCumulation, Buffer in) {
// 1. 计算新的 Buffer 大小:计算新的 Buffer 大小,确保它能够容纳当前的 cumulation 和输入的 Buffer。新的大小是当前已读字节和输入字节总和的下一个最接近的 2 的幂次方。
final int newSize = safeFindNextPositivePowerOfTwo(oldCumulation.readableBytes() + in.readableBytes());
// 创建新的 Buffer:根据是否是只读的 Buffer 来决定如何创建新的 Buffer。如果是只读的,会重新分配内存;否则,直接扩展当前的 Buffer。
Buffer newCumulation = oldCumulation.readOnly() ? alloc.allocate(newSize) :
oldCumulation.ensureWritable(newSize);
// 3. 将旧数据和新数据写入新 Buffer:如果创建了新的 Buffer,将旧的 cumulation 和输入的 Buffer 都写入新的 Buffer。
try {
if (newCumulation != oldCumulation) {
newCumulation.writeBytes(oldCumulation);
}
newCumulation.writeBytes(in);
return newCumulation;
} finally {
// 4. 关闭旧的 Buffer:如果创建了新的 Buffer,则关闭原先的 cumulation,释放内存。
if (newCumulation != oldCumulation) {
oldCumulation.close();
}
}
}
@Override
public String toString() {
return "MergeCumulator";
}
}
}
FixedLengthFrameDecoder
在此引入这个类,只是因为这个类最简单,方便大家理解 解码器的工作流程。
FixedLengthFrameDecoder
:按照固定的字节长度切割每一帧,适用于每条消息长度一致的协议(如定长二进制协议)。LengthFieldBasedFrameDecoder
:根据消息中指定位置的“长度字段”值来动态切割完整帧,适用于二进制协议。LineBasedFrameDecoder
:以 \n 或 \r\n 为分隔符,将每行作为一帧,适用于基于文本的行协议。DelimiterBasedFrameDecoder
:使用自定义的分隔符(如 $_、#END#)拆分帧,适用于定界符结束的文本协议。
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
private final int frameLength;
public FixedLengthFrameDecoder(int frameLength) {
checkPositive(frameLength, "frameLength");
this.frameLength = frameLength;
}
public FixedLengthFrameDecoder(int frameLength, Cumulator cumulator) {
super(cumulator);
checkPositive(frameLength, "frameLength");
this.frameLength = frameLength;
}
// ByteToMessageDecoder(channelRead[首次读] -> callDecode[循环读取数据]
// ->
// decodeRemovalReentryProtection)
// ->
// FixedLengthFrameDecoder.decode
@Override
protected final void decode(ChannelHandlerContext ctx, Buffer in) throws Exception {
Object decoded = decode0(ctx, in);
if (decoded != null) {
// 只要解码成功,就向下传递结果
// ByteToMessageDecoderContext.fireChannelRead
ctx.fireChannelRead(decoded);
}
}
protected Object decode0(@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, Buffer in) throws Exception {
if (in.readableBytes() < frameLength) {
return null;
} else {
return in.readSplit(frameLength);
}
}
}