文章目录
前言
之前写过直接在springboot项目中整合websocet的操作,但tomcat对于websocket的长连接而言显得太过笨重了。
长连接放入netty服务中。
其他正常接口请求放于tomcat中。
netty简介
Netty
是一个NIO
客户端服务器框架,可快速轻松地开发网络应用程序,例如协议服务器和客户端。它极大地简化和简化了网络编程,例如 TCP 和 UDP 套接字服务器。
配置前的准备
本次使用的各项版本如下:
- jdk 17
- maven 3.6.3
- springboot 3.4.0
- netty-all 4.1.87.Final
项目配置
增加依赖
常规的springboot项目中,想要使用netty很简单,只需要引入netty的pom依赖即可。
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.87.Final</version>
</dependency>
启动类配置
在springboot的启动类中,监听项目的运行状态,若项目启动成功,则进行netty服务的部署。
import cn.xj.service.NettyService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Application implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(Application.class,args);
}
@Autowired
private NettyService nettyService;
/**
* netty 容器启动
* 定义默认绑定端口 9090
* @param args
* @throws Exception
*/
@Override
public void run(String... args) throws Exception {
nettyService.start();
}
}
netty-server 主要配置类
增加如下的配置代码,各项配置说明在代码中已做标注。
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.cors.CorsConfig;
import io.netty.handler.codec.http.cors.CorsConfigBuilder;
import io.netty.handler.codec.http.cors.CorsHandler;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
/**
* netty 服务
*/
@Service
public class NettyService {
@Value("${websocket.server.port:9090}")
private int port;
@Autowired
private MyAuthHandler myAuthHandler;
@Autowired
private MyChannelHandler myChannelHandler;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
public void start() throws InterruptedException {
try {
// 创建线程池,用于处理服务器的接受连接请求
bossGroup = new NioEventLoopGroup(1);
// 创建线程池,用于处理 已经接受的连接 ,包括读取数据、处理数据和发送数据
workerGroup = new NioEventLoopGroup();
// 创建服务器启动器
ServerBootstrap serverBootstrap = new ServerBootstrap();
// (非必备)打印日志
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
// 绑定关联线程池
serverBootstrap.group(bossGroup,workerGroup);
// 指定 IO 模型
serverBootstrap.channel(NioServerSocketChannel.class);
// 初始化连接通道配置
// 此处可以放入一个自定义的子类 ,继承 ChannelInitializer 即可,可以采取spring注入
serverBootstrap.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel channel) throws Exception {
// 职责链模式
ChannelPipeline pipeline = channel.pipeline();
// 设置自定义通道消息处理器
// 会出现转换异常(class java.lang.String cannot be cast to class io.netty.handler.codec.http.HttpObject (java.lang.String is in module java.base of loader 'bootstrap'; io.netty.handler.codec.http.HttpObject is in unnamed module of loader 'app'))
//pipeline.addLast(new StringDecoder(), new StringEncoder());
// http 支持
pipeline.addLast(new HttpServerCodec());
//解决跨域问题
CorsConfig corsConfig = CorsConfigBuilder.forAnyOrigin().allowNullOrigin().allowCredentials().build();
pipeline.addLast(new CorsHandler(corsConfig));
// 大数据流支持
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(65536));
// 绑定鉴权处理
pipeline.addLast(myAuthHandler);
// websocket 绑定 path
pipeline.addLast(new WebSocketServerProtocolHandler("/"));
// 心跳检测
pipeline.addLast(new IdleStateHandler(3000,3000,10, TimeUnit.SECONDS));
// 绑定自定义数据接受处理器
pipeline.addLast(myChannelHandler);
}
});
// 设置服务器可以挂起未处理的连接的数量
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
// 绑定服务器监听端口, 同步等待成功
ChannelFuture sync = serverBootstrap.bind(port).sync();
System.out.println("启动正在监听: " + sync.channel().localAddress());
// 关闭服务器通道(阻塞监听关闭)
sync.channel().closeFuture().sync();
} finally {
// 释放线程池资源
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
配置鉴权处理类 MyAuthHandler
本次操作中不写鉴权的逻辑,仅打印日志做一个输出处理。
仅在每个连接建立的时候,会进入其中处理。
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import org.springframework.stereotype.Component;
@Component
public class MyAuthHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
System.out.println("****** 进行鉴权处理 ******");
// 鉴权处理 不满足权限设定直接抛出异常即可
ctx.fireChannelRead(request.retain()); // 不加这个链路不会继续向下执行
// 仅在开始建立连接操作时鉴权,当连接已创建则无需继续鉴权
ctx.pipeline().remove(MyAuthHandler.class);
}
}
这里需要特别注意,鉴权失败时,仅需要对上游抛出异常即可。但鉴权成功,需要让当前责任链继续向下执行,必须要添加ctx.fireChannelRead(request.retain());
配置连接成功后的处理类 MyChannelHandler
这里在建立连接成功后会打印日志信息,其次在前端成功发送消息后,将原消息返回前端。
import com.alibaba.fastjson2.JSONObject;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
public class MyChannelHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
/**
* 处理接收到的消息
* @param ctx 通道上下文
* @param message 消息对象
*/
// @Override
// protected void channelRead0(ChannelHandlerContext ctx, Object message) throws Exception {
// System.out.println("Server received: " + message.toString());
// ctx.writeAndFlush("from service: Hello,client!");
// ctx.fireChannelActive();
//
// }
/**
* 处理I/O事件的异常
* @param ctx 通道上下文
* @param cause 异常原因
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel incoming = ctx.channel();
System.out.println("[" + incoming.remoteAddress() + "] 出现异常: " + cause.getMessage());
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("收到消息:" + msg.text());
// 写回消息
Map<String, Object> map = new HashMap<>();
map.put("data","server 回执:server" + msg.text());
ctx.channel().writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(map)));
}
}
测试
apipost 连接测试
在apipost中选择新建websocket连接,如下
netty-server 中绑定的端口,本次设定的是9090
,websocket绑定的path地址为 /
,所以连接地址配置为ws://localhost:9090/
。
发送消息测试: