集成Netty实现和设备的TCP通讯
:::success
根据本篇文章实现通用启动方式
高效的netty通讯
:::
定义父类Server
不管启动什么类型的长连接,都通过继承父类Server来启动
该父类中创建了统一的启动和停止服务的方法,子类只需要重写 initialize()方法即可
package cn.com.dyl.server;
import io.netty.bootstrap.AbstractBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
/**
* @author 荣家燕
* @company 达友利科技(廊坊)有限公司
* @create 2023/12/6 14:15
* @describe
*/
@SuppressWarnings({"rawtypes", "AssignmentUsedAsCondition"})
@Slf4j
public abstract class Server {
//判断是否启动
protected boolean isRunning;
//netty配置类
protected final NettyConfig config;
//负责处理连接
protected EventLoopGroup bossGroup;
//负责处理channel通道的I/O事件
protected EventLoopGroup workerGroup;
//多线程业务组
protected ExecutorService businessGroup;
/**
* 构造方法
*
* @param config 配置类
*/
protected Server(NettyConfig config) {
this.config = config;
}
protected abstract AbstractBootstrap initialize();
/**
* 启动方法
*
* @return 启动结果
*/
public synchronized boolean start() {
if (isRunning) {
log.warn("==={}已经启动,port:{}===", config.name, config.port);
return isRunning;
}
AbstractBootstrap bootstrap = initialize();
ChannelFuture future = bootstrap.bind(config.port).awaitUninterruptibly();
future.channel().closeFuture().addListener(f -> {
if (isRunning) {
stop();
}
});
if (future.cause() != null) {
log.error("启动失败", future.cause());
}
if (isRunning = future.isSuccess()) {
log.warn("==={}启动成功,port:{}===", config.name, config.port);
}
return isRunning;
}
/**
* 暂停方法
*/
public synchronized void stop() {
//赋值启动标志
isRunning = false;
//关闭
bossGroup.shutdownGracefully();
//释放资源
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
if (businessGroup != null) {
businessGroup.shutdown();
}
//输出日志
log.warn("==={}已经停止,port:{}===", config.name, config.port);
}
}
定义netty的配置类
所有启动时所需要的处理器编解码器都在这里进行赋值
该类中通过定义启动时的启动标志来启动指定的子类服务
也可以给子类服务定义心跳检测机制
package cn.com.dyl.server;
import cn.com.dyl.netty.Jtt1078Handler;
import cn.com.dyl.webServer.WebMessageHandler;
import io.netty.util.NettyRuntime;
import io.netty.util.internal.ObjectUtil;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
/**
* @author 荣家燕
* @company 达友利科技(廊坊)有限公司
* @create 2023/12/6 14:15
* @describe
*/
public class NettyConfig {
protected final int workerCore;//worker数量
protected final int businessCore;//business数量
protected final int readerIdleTime;//读卡器空闲时间
protected final int writerIdleTime;//写入程序空闲时间
protected final int allIdleTime;//所有空闲时间
protected final Integer port;//端口号
protected final Jtt1078Handler jtt1078Handler;//处理器
protected final WebMessageHandler webMessageHandler;//web端处理器
protected final Server server;//服务入口类
protected final String name;//服务名称
public NettyConfig(int workerGroup,
int businessGroup,
int readerIdleTime,
int writerIdleTime,
int allIdleTime,
Integer port,
Jtt1078Handler jtt1078Handler,
WebMessageHandler webMessageHandler,
String name,
boolean enableHttp,
boolean enableWeb) {
ObjectUtil.checkNotNull(port, "port");
ObjectUtil.checkPositive(port, "port");//检查是否严格为正
int processors = NettyRuntime.availableProcessors();
this.workerCore = workerGroup > 0 ? workerGroup : processors + 2;
this.businessCore = businessGroup > 0 ? businessGroup : Math.max(1, processors >> 1);
this.readerIdleTime = readerIdleTime;
this.writerIdleTime = writerIdleTime;
this.allIdleTime = allIdleTime;
this.port = port;
this.jtt1078Handler = jtt1078Handler;
this.webMessageHandler = webMessageHandler;
//http服务传输标志
if (enableHttp) {
this.name = name != null ? name : "mediaHttp";
this.server = new NettyServer(this);
} else if (enableWeb) {
this.name = name != null ? name : "webServer";
this.server = new AudioServer(this);
} else {
this.name = name != null ? name : "jtt1078";
this.server = new MediaServer(this);
}
}
public Server build() {
return server;
}
public static NettyConfig.Builder custom() {
return new Builder();
}
@NoArgsConstructor
@Accessors(chain = true)
@Data
public static class Builder {
private int workerCore;
private int businessCore;
private int readerIdleTime = 240;
private int writerIdleTime = 0;
private int allIdleTime = 0;
private Integer port;
private Jtt1078Handler jtt1078Handler;
private WebMessageHandler webMessageHandler;//web端处理器
private String name;
private boolean enableHttp;
private boolean enableWeb;
public Builder setThreadGroup(int workerCore, int businessCore) {
this.workerCore = workerCore;
this.businessCore = businessCore;
return this;
}
public Builder setIdleStateTime(int readerIdleTime, int writerIdleTime, int allIdleTime) {
this.readerIdleTime = readerIdleTime;
this.writerIdleTime = writerIdleTime;
this.allIdleTime = allIdleTime;
return this;
}
/**
* 服务创建
*
* @return 创建的服务
*/
public Server build() {
return new NettyConfig(
this.workerCore,
this.businessCore,
this.readerIdleTime,
this.writerIdleTime,
this.allIdleTime,
this.port,
this.jtt1078Handler,
this.webMessageHandler,
this.name,
this.enableHttp,
this.enableWeb
).build();
}
}
}
定义子类启动服务
基于netty的TCP协议,用于接收终端设备的数据
在该类中可以定义编解码器和处理器等
package cn.com.dyl.server;
import cn.com.dyl.netty.HandlerWrapper;
import cn.com.dyl.netty.Jtt1078MessageDecoder;
import io.netty.bootstrap.AbstractBootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author 荣家燕
* @company 达友利科技(廊坊)有限公司
* @create 2023/12/6 14:21
* @describe
*/
@SuppressWarnings({"rawtypes", "NullableProblems"})
public class MediaServer extends Server {
/**
* 构造方法
*
* @param config 配置类
*/
public MediaServer(NettyConfig config) {
super(config);
}
/**
* 重写初始化方法
*
* @return 返回初始化后的数据
*/
@Override
protected AbstractBootstrap initialize() {
//负责处理连接
bossGroup = new NioEventLoopGroup(
1,
new DefaultThreadFactory(
config.name,
Thread.MAX_PRIORITY
)
);
//负责处理channel通道的I/O事件
workerGroup = new NioEventLoopGroup(
Runtime.getRuntime().availableProcessors(),
new DefaultThreadFactory(
config.name,
Thread.MAX_PRIORITY
)
);
//增加业务逻辑处理线程组
DefaultEventExecutorGroup defaultEventExecutorGroup = new DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "MediaServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
//线程数量
if (config.businessCore > 0) {
businessGroup = new ThreadPoolExecutor(
config.businessCore,
config.businessCore,
1L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new DefaultThreadFactory(
config.name + "-B",
true,
Thread.NORM_PRIORITY
)
);
}
//和bossGroup一一对应
return new ServerBootstrap()
//设置核心线程组和业务工作线程组
.group(bossGroup, workerGroup)
//设置nio类型的channel
.channel(NioServerSocketChannel.class)
//快速复用端口,避免端口冲突,原理tcp连接需要2ML时间单位回收,这个配置加快进度
.option(NioChannelOption.SO_REUSEADDR, true)
.option(NioChannelOption.SO_BACKLOG, 102400)
//设置低延迟
.childOption(NioChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
private final HandlerWrapper handler = new HandlerWrapper(config.jtt1078Handler);
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline()
.addLast(new Jtt1078MessageDecoder())
.addLast(defaultEventExecutorGroup, "handler", handler);
}
});
}
}
定义启动配置文件
再该类中定义启动的入口
package cn.com.dyl.config;
import cn.com.dyl.netty.Jtt1078Handler;
import cn.com.dyl.server.NettyConfig;
import cn.com.dyl.server.Server;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
/**
* @author 荣家燕
* @company 达友利科技(廊坊)有限公司
* @create 2023/12/6 14:14
* @describe
*/
@Order(Integer.MIN_VALUE)
@Configuration
@ConditionalOnProperty(value = "server.enable", havingValue = "true")
public class JTConfig {
private final Jtt1078Handler jtt1078Handler;
public JTConfig(Jtt1078Handler jtt1078Handler) {
this.jtt1078Handler = jtt1078Handler;
}
@ConditionalOnProperty(value = "server.media.port")
@Bean(initMethod = "start", destroyMethod = "stop")
public Server jtmediaServer(@Value("${server.media.port}") int port) {
return NettyConfig.custom()
.setPort(port)
.setName("1078音视频服务")
.setJtt1078Handler(jtt1078Handler)
.build();
}
集成Netty实现和前端进行webScoket通讯
定义websocket子类
package cn.com.dyl.server;
import cn.com.dyl.netty.HandlerWrapper;
import cn.com.dyl.webServer.HeartBeatHandler;
import cn.com.dyl.webServer.WebMessageDecoder;
import io.netty.bootstrap.AbstractBootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author 荣家燕
* @company 达友利科技(廊坊)有限公司
* @create 2024/1/23 10:20
* @describe
*/
@SuppressWarnings("rawtypes")
public class AudioServer extends Server {
/**
* 构造方法
*
* @param config 配置类
*/
protected AudioServer(NettyConfig config) {
super(config);
}
@Override
protected AbstractBootstrap initialize() {
//负责处理连接
bossGroup = new NioEventLoopGroup(
1,
new DefaultThreadFactory(
config.name,
Thread.MAX_PRIORITY
)
);
//负责处理channel通道的I/O事件
workerGroup = new NioEventLoopGroup(
Runtime.getRuntime().availableProcessors(),
new DefaultThreadFactory(
config.name,
Thread.MAX_PRIORITY
)
);
//增强也认为逻辑处理线程组
DefaultEventExecutorGroup defaultEventExecutorGroup = new DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AudioServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
//线程数量
if (config.businessCore > 0) {
businessGroup = new ThreadPoolExecutor(
config.businessCore,
config.businessCore,
1L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new DefaultThreadFactory(
config.name + "-B",
true,
Thread.NORM_PRIORITY
)
);
}
return new ServerBootstrap()
//设置核心线程组和业务工作线程组
.group(bossGroup, workerGroup)
//设置nio类型的channel
.channel(NioServerSocketChannel.class)//通过TCP/IP方式进行传输
//快速复用端口,避免端口冲入,原理tcp连接需要2ML时间单位回收,这个;配置加快进度
.option(NioChannelOption.SO_REUSEADDR, true)
.option(NioChannelOption.SO_BACKLOG, 102400)
//设置低延迟
.childOption(NioChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
private final HandlerWrapper handler = new HandlerWrapper(config.webMessageHandler);
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline()
.addLast(new HttpServerCodec())
.addLast(new ChunkedWriteHandler())
.addLast(new HttpObjectAggregator(1024*64))
.addLast(new IdleStateHandler(config.readerIdleTime, config.writerIdleTime, config.allIdleTime))
//自定义的空闲状态监测的handler
.addLast(new HeartBeatHandler())
.addLast(new WebMessageDecoder())
.addLast(defaultEventExecutorGroup, "handler", handler)
.addLast(new WebSocketServerProtocolHandler("/audio"));
}
});
}
}
websocket的重点
websocket的重点在于首次请求连接时使用的http方式,而之后才是webscoket连接
对于http和websocket进行判断并分开处理
package cn.com.dyl.netty;
import cn.com.dyl.threads.SessionManager;
import cn.com.dyl.utils.Packet;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;
import static io.netty.handler.codec.http.HttpUtil.setContentLength;
/**
* @author 荣家燕
* @company 达友利科技(廊坊)有限公司
* @create 2023/12/6 15:20
* @describe
*/
@ChannelHandler.Sharable
@AllArgsConstructor
@Slf4j
public class HandlerWrapper extends ChannelInboundHandlerAdapter {
//基础消息处理器
private final JttMessageHandler handler;
private static WebSocketServerHandshaker handShaker;//握手构造工厂
//基础消息处理器
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Channel nettyChannel = ctx.channel();//获取netty的网络传输通道
//判断是否是websocket
if (msg instanceof WebSocketFrame) {
WebSocketFrame frame = (WebSocketFrame) msg;
handleWebSocketFrame(nettyChannel, frame);
//判断是否是http请求(websocket建立连接是http请求,需要握手响应)
} else if (msg instanceof FullHttpRequest) {
FullHttpRequest httpRequest = (FullHttpRequest) msg;
handleHttpRequest(nettyChannel, httpRequest);
//如果都不是那就符合1078 的音视频数据
} else {
Packet packet = (Packet) msg;
handler.channelRead(nettyChannel, packet);
}
}
/**
* 终端连接过来后的处理
*
* @param ctx 上下文
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
log.info("<<<<< web服务连接{}", ctx.channel().remoteAddress());
}
/**
* 用来监听客户端是否断开
*
* @param ctx 通信管道
* @throws Exception 异常
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
handler.release(ctx.channel());//如果断开就释放
}
/**
* netty异常处理机制(完成异常处理后要释放并关闭所有的流)
*
* @param ctx 通信管道
* @param cause 异常数据
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
handler.release(ctx.channel());
ctx.close();
}
/**
* 进行心跳检测,用户超时长时间未操作则会触发
*
* @param ctx netty通道
* @param evt 心跳机制
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {//客户端读超时
String tag = SessionManager.get(ctx.channel(), "tag");
log.info("web服务心跳超时: {}", tag);
handler.release(ctx.channel());//释放
}
}
}
private void handleHttpRequest(Channel channel, FullHttpRequest request) {
//如果Http解码失败,返回http异常
if (!request.decoderResult().isSuccess() || (!"websocket".equals(request.headers().get("Upgrade")))) {
sendHttpResponse(channel, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
//构造握手响应返回,本机测试
WebSocketServerHandshakerFactory handShakerFactory = new WebSocketServerHandshakerFactory("ws://localhost:1011/audio", null, false);
handShaker = handShakerFactory.newHandshaker(request);
if (handShaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(channel);
} else {
handShaker.handshake(channel, request);
}
}
/**
* websocket处理
*
* @param channel 通道
* @param frame 数据
*/
private void handleWebSocketFrame(Channel channel, WebSocketFrame frame) {
//判断是否是关闭链路的指令
if (frame instanceof CloseWebSocketFrame) {
handShaker.close(channel, (CloseWebSocketFrame) frame.retain());
return;
}
//判断是否是Ping消息
if (frame instanceof PingWebSocketFrame) {
channel.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
return;
}
//本例程仅支持文本消息,不支持二进制消息
if (frame instanceof TextWebSocketFrame) {
//返回应答消息
String request = ((TextWebSocketFrame) frame).text();
channel.writeAndFlush(new TextWebSocketFrame(request + ",欢迎使用Netty WebSocket服务,现在时刻:" + LocalDateTime.now()));
}
//如果是二进制消息
if (frame instanceof BinaryWebSocketFrame){
ByteBuf content = frame.content();
log.info("收到二进制消息"+content);
}
}
/**
* 发送http响应
*
* @param channel http通道
* @param req http请求
* @param res http响应
*/
private static void sendHttpResponse(Channel channel, FullHttpRequest req, FullHttpResponse res) {
//返回应答给客户端
if (res.status().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
setContentLength(res, res.content().readableBytes());
}
//如果非keep-Alive,关闭连接
ChannelFuture future = channel.writeAndFlush(res);
if (!isKeepAlive(req) || res.status().code() != 200) {
future.addListener(ChannelFutureListener.CLOSE);
}
}
}