What is Netty?
Netty is an asynchronous, event-driven network application framework built on Java NIO, designed for the rapid development of high-performance, highly reliable network servers and clients. It hides much of the complexity of network programming, ships with rich protocol support, and is widely used in high-performance networked systems.
Why choose Netty?
- High performance: built on NIO with asynchronous, non-blocking I/O
- High concurrency: handles large numbers of concurrent connections
- Ease of use: a concise API that lowers the barrier to network programming
- Extensibility: modular design that is easy to extend and customize
- Stability: battle-tested in many production environments
Core concepts
Channel
A Channel is the basic abstraction for Netty's network operations. It represents a connection to some entity, such as a hardware device, a file, or a network socket.
EventLoop
An EventLoop handles the I/O operations of Channels. One EventLoop can serve many Channels, while each Channel is bound to exactly one EventLoop for its lifetime.
ChannelHandler
A ChannelHandler processes I/O events such as connection establishment, data reads, and data writes.
Pipeline
A Pipeline (ChannelPipeline) is the container for ChannelHandlers; it defines the order in which the handlers process events.
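These four concepts can be seen working together in a minimal sketch using Netty's EmbeddedChannel, a Channel implementation intended for tests that runs a pipeline without real network I/O. The class name PipelineOrderDemo and the "A"/"B" labels are illustrative, not part of Netty.

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;
import java.util.ArrayList;
import java.util.List;

public class PipelineOrderDemo {

    // Builds a two-handler pipeline, fires one inbound message through it,
    // and records the order in which the handlers see the message.
    public static List<String> traceInbound(String msg) {
        final List<String> order = new ArrayList<>();
        EmbeddedChannel channel = new EmbeddedChannel(
                new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object m) {
                        order.add("A");
                        ctx.fireChannelRead(m); // hand the message to the next handler
                    }
                },
                new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object m) {
                        order.add("B"); // last inbound handler: message stops here
                    }
                });
        channel.writeInbound(msg); // simulate data arriving from the network
        channel.finish();
        return order;
    }

    public static void main(String[] args) {
        // Inbound events traverse the pipeline head-to-tail, in addLast order
        System.out.println(traceInbound("ping"));
    }
}
```

Note that an inbound event only reaches the next handler if the current one explicitly forwards it with fireChannelRead.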
Basic code walkthrough
1. Maven dependency
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.94.Final</version>
</dependency>
2. A simple Echo server
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class EchoServer {

    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        // Boss group: accepts incoming connections
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // Worker group: handles I/O on the accepted connections
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // Server bootstrap: wires the groups, channel type, and handlers together
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Add the string codecs
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            // Add the custom business handler
                            pipeline.addLast(new EchoServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind the port and start the server
            ChannelFuture future = bootstrap.bind(port).sync();
            System.out.println("Echo server started, listening on port: " + port);

            // Wait until the server channel is closed
            future.channel().closeFuture().sync();
        } finally {
            // Shut down both event loop groups gracefully
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new EchoServer(8080).start();
    }
}
3. The server handler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String message = (String) msg;
        System.out.println("Server received message: " + message);
        // Echo the message back to the client
        ctx.writeAndFlush("Server reply: " + message + "\n");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client connected: " + ctx.channel().remoteAddress());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client disconnected: " + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
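As an aside, the same handler can be written more safely with SimpleChannelInboundHandler, which performs the cast for you and releases the inbound message automatically. This is a sketch, not part of the original example; the class name TypedEchoServerHandler is mine, and the reply string mirrors the handler above.

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

// SimpleChannelInboundHandler<String> only receives messages of type String,
// casts them for us, and releases the message after channelRead0 returns.
public class TypedEchoServerHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        // msg is already a String; no manual cast needed
        ctx.writeAndFlush("Server reply: " + msg + "\n");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
```

With raw ChannelInboundHandlerAdapter this does not matter here, because StringDecoder already converted the ByteBuf to a String; with reference-counted messages such as ByteBuf, forgetting to release would leak buffers.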
4. The client
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;

public class EchoClient {

    private final String host;
    private final int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new EchoClientHandler());
                        }
                    });

            // Connect to the server
            ChannelFuture future = bootstrap.connect(host, port).sync();
            Channel channel = future.channel();
            System.out.println("Connected to server: " + host + ":" + port);

            // Read console input and send each line to the server
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String line = scanner.nextLine();
                if ("quit".equals(line)) {
                    break;
                }
                channel.writeAndFlush(line + "\n");
            }

            // Close the connection once the user quits; waiting on closeFuture()
            // alone would block forever because nothing else closes the channel
            channel.close().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new EchoClient("localhost", 8080).start();
    }
}
5. The client handler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String message = (String) msg;
        System.out.println("Client received message: " + message);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client connected successfully");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
How to run
- Run the EchoServer class first to start the server
- Then run the EchoClient class to start the client
- Type messages into the client console; the server echoes each one back
Output from running EchoServer:
09:53:01.748 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
09:53:01.755 [main] DEBUG io.netty.channel.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 16
09:53:01.780 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
09:53:01.780 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
09:53:01.820 [main] DEBUG io.netty.util.internal.PlatformDependent0 - -Dio.netty.noUnsafe: false
09:53:01.821 [main] DEBUG io.netty.util.internal.PlatformDependent0 - Java version: 8
09:53:01.822 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
09:53:01.823 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
09:53:01.823 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.storeFence: available
09:53:01.824 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
09:53:01.824 [main] DEBUG io.netty.util.internal.PlatformDependent0 - direct buffer constructor: available
09:53:01.825 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true
09:53:01.825 [main] DEBUG io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9
09:53:01.825 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.DirectByteBuffer.<init>(long, int): available
09:53:01.825 [main] DEBUG io.netty.util.internal.PlatformDependent - sun.misc.Unsafe: available
09:53:01.826 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.tmpdir: C:\Users\H-ZHON~1\AppData\Local\Temp (java.io.tmpdir)
09:53:01.826 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model)
09:53:01.827 [main] DEBUG io.netty.util.internal.PlatformDependent - Platform: Windows
09:53:01.830 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.maxDirectMemory: 3678404608 bytes
09:53:01.830 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.uninitializedArrayAllocationThreshold: -1
09:53:01.832 [main] DEBUG io.netty.util.internal.CleanerJava6 - java.nio.ByteBuffer.cleaner(): available
09:53:01.832 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
09:53:01.833 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false
09:53:01.833 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512
09:53:01.849 [main] DEBUG io.netty.util.internal.PlatformDependent - org.jctools-core.MpscChunkedArrayQueue: available
09:53:02.186 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 2552 (auto-detected)
09:53:02.187 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv4Stack: false
09:53:02.187 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv6Addresses: false
09:53:02.489 [main] DEBUG io.netty.util.NetUtilInitializations - Loopback interface: lo (Software Loopback Interface 1, 127.0.0.1)
09:53:02.490 [main] DEBUG io.netty.util.NetUtil - Failed to get SOMAXCONN from sysctl and file \proc\sys\net\core\somaxconn. Default: 200
09:53:02.744 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: 00:ff:86:ff:fe:50:58:66 (auto-detected)
09:53:02.760 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.level: simple
09:53:02.760 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.targetRecords: 4
09:53:02.786 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 16
09:53:02.786 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 16
09:53:02.786 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192
09:53:02.786 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 9
09:53:02.786 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 4194304
09:53:02.786 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256
09:53:02.786 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64
09:53:02.786 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
09:53:02.786 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192
09:53:02.786 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimIntervalMillis: 0
09:53:02.786 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: false
09:53:02.786 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023
09:53:02.796 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
09:53:02.796 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
09:53:02.796 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
Echo server started, listening on port: 8080
Output from running EchoClient and typing messages into the console:
(The Netty DEBUG startup lines are identical to the server output above and are omitted here.)
Connected to server: localhost:8080
Client connected successfully
09:56:56.659 [main] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 4096
09:56:56.659 [main] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
09:56:56.659 [main] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.chunkSize: 32
09:56:56.659 [main] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.blocking: false
09:56:56.672 [nioEventLoopGroup-2-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
09:56:56.672 [nioEventLoopGroup-2-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
09:56:56.673 [nioEventLoopGroup-2-1] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@692bca4c
Client received message: Server reply: 
hello
Client received message: Server reply: hello
testNettyServer
Client received message: Server reply: testNettyServer
Code analysis
Server-side key points:
- EventLoopGroup: two groups are created; bossGroup accepts incoming connections, while workerGroup handles the I/O of established connections
- ServerBootstrap: the server's bootstrap class, where the channel type, options, and handlers are configured
- ChannelInitializer: initializes each new Channel, adding the codecs and the business handler to its pipeline
- Pipeline: the handler chain that defines the order in which messages are processed
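One subtlety is visible in the client output above: TCP is a byte stream, so StringDecoder alone decodes whatever chunk of bytes happens to arrive, and a logical line can be split or merged across reads (note the first, empty reply). The usual remedy is to put a frame decoder such as LineBasedFrameDecoder in front of StringDecoder. A minimal sketch using EmbeddedChannel; the class name FramedPipelineDemo is illustrative:

```java
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.util.CharsetUtil;

public class FramedPipelineDemo {
    public static void main(String[] args) {
        EmbeddedChannel channel = new EmbeddedChannel(
                // Split the byte stream into frames at '\n' (max line length 1024);
                // the delimiter itself is stripped from each frame
                new LineBasedFrameDecoder(1024),
                // Then turn each complete frame into a String
                new StringDecoder(CharsetUtil.UTF_8));

        // One logical line arriving fragmented, as TCP is free to deliver it
        channel.writeInbound(Unpooled.copiedBuffer("hel", CharsetUtil.UTF_8));
        channel.writeInbound(Unpooled.copiedBuffer("lo\nwor", CharsetUtil.UTF_8));

        // Only the complete line has been decoded; "wor" stays buffered
        System.out.println((String) channel.readInbound()); // prints "hello"
        channel.finish();
    }
}
```

In the Echo example this would mean adding `pipeline.addLast(new LineBasedFrameDecoder(1024));` before the StringDecoder on both sides.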
Client-side key points:
- Bootstrap: the client's bootstrap class
- Connection setup: connect() establishes the connection to the server
- Sending messages: data is written to the server through the Channel
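It is worth stressing that writeAndFlush is asynchronous: it returns a ChannelFuture immediately, before the bytes have actually been written. Attaching a listener is the idiomatic way to learn whether a write succeeded; the helper below is a sketch (the class and method names are mine).

```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;

public class WriteWithListener {

    // Sends one line and reacts to the outcome of the asynchronous write.
    public static void send(Channel channel, String line) {
        ChannelFuture future = channel.writeAndFlush(line + "\n");
        future.addListener((ChannelFutureListener) f -> {
            if (!f.isSuccess()) {
                // The write failed; log the cause and close the connection
                f.cause().printStackTrace();
                f.channel().close();
            }
        });
    }
}
```

The client above ignores the returned future, which is acceptable for a demo but hides write failures in real code.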
Practical advice
- Size the thread pools sensibly: tune them to the number of CPU cores and the nature of the workload
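For context: when no thread count is given, NioEventLoopGroup defaults to 2 × the available processors (overridable with -Dio.netty.eventLoopThreads; that default presumably explains the "eventLoopThreads: 16" line in the logs above). A sketch of sizing the groups explicitly; the numbers are starting points, not recommendations:

```java
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;

public class SizedGroups {
    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();

        // One boss thread is enough when accepting connections on a single port
        EventLoopGroup boss = new NioEventLoopGroup(1);

        // Netty's default worker count is 2 * cores; for CPU-bound handlers,
        // something closer to the core count can be a better starting point
        EventLoopGroup workers = new NioEventLoopGroup(cores);

        System.out.println("cores=" + cores + ", workers=" + cores);

        boss.shutdownGracefully();
        workers.shutdownGracefully();
    }
}
```

Blocking work (database calls, disk I/O) should not run on these event loop threads at all; it belongs in a separate executor.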