Springboot集成Netty

发布于:2025-09-05 ⋅ 阅读:(22) ⋅ 点赞:(0)

集成Netty实现和设备的TCP通讯

:::success
根据本篇文章实现通用启动方式

高效的netty通讯

:::

定义父类Server

不管启动什么类型的长连接,都通过继承父类Server来启动

该父类中创建了统一的启动和停止服务的方法,子类只需要重写 initialize()方法即可

package cn.com.dyl.server;

import io.netty.bootstrap.AbstractBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;

/**
 * @author 荣家燕
 * @company 达友利科技(廊坊)有限公司
 * @create 2023/12/6 14:15
 * @describe
 */

@SuppressWarnings("rawtypes")
@Slf4j
public abstract class Server {
	/**
	 * Whether the server is currently bound and serving.
	 * Volatile because the channel close listener reads it from a Netty thread
	 * without holding this object's monitor.
	 */
	protected volatile boolean isRunning;
	/** Netty configuration (name, port, thread counts, handlers). */
	protected final NettyConfig config;
	/** Event loop group assigned by {@link #initialize()} for accepting connections. */
	protected EventLoopGroup bossGroup;
	/** Event loop group assigned by {@link #initialize()} for channel I/O events. */
	protected EventLoopGroup workerGroup;
	/** Optional thread pool for business logic; may stay {@code null}. */
	protected ExecutorService businessGroup;

	/**
	 * Creates a server bound to the given configuration.
	 *
	 * @param config configuration object
	 */
	protected Server(NettyConfig config) {
		this.config = config;
	}

	/**
	 * Builds the bootstrap for this server. Implementations are expected to
	 * assign {@link #bossGroup} (and, where used, {@link #workerGroup} and
	 * {@link #businessGroup}) so that {@link #stop()} can release them.
	 *
	 * @return the configured bootstrap, ready to be bound
	 */
	protected abstract AbstractBootstrap initialize();

	/**
	 * Initializes the bootstrap and binds it to the configured port.
	 * Calling it while the server is already running is a no-op.
	 * If the bind fails, everything created by {@link #initialize()} is released
	 * again through {@link #stop()}.
	 *
	 * @return {@code true} if the server is running after the call
	 */
	public synchronized boolean start() {
		if (isRunning) {
			log.warn("==={}已经启动,port:{}===", config.name, config.port);
			return true;
		}
		AbstractBootstrap bootstrap = initialize();
		ChannelFuture future = bootstrap.bind(config.port).awaitUninterruptibly();
		if (!future.isSuccess()) {
			log.error("启动失败", future.cause());
			// Release the groups created by initialize(); otherwise their threads leak.
			stop();
			return false;
		}
		isRunning = true;
		// Shut everything down if the server channel is closed by anything other than stop().
		future.channel().closeFuture().addListener(f -> {
			if (isRunning) {
				stop();
			}
		});
		log.warn("==={}启动成功,port:{}===", config.name, config.port);
		return true;
	}

	/**
	 * Stops the server and releases its thread groups.
	 * Safe to call repeatedly and before {@link #start()}: groups that were
	 * never created are skipped.
	 */
	public synchronized void stop() {
		// Clear the flag first so the close listener does not re-enter stop().
		isRunning = false;
		if (bossGroup != null) {
			bossGroup.shutdownGracefully();
		}
		if (workerGroup != null) {
			workerGroup.shutdownGracefully();
		}
		if (businessGroup != null) {
			businessGroup.shutdown();
		}
		log.warn("==={}已经停止,port:{}===", config.name, config.port);
	}
}

定义netty的配置类

所有启动时所需要的处理器编解码器都在这里进行赋值

该类中通过定义启动时的启动标志来启动指定的子类服务

也可以给子类服务定义心跳检测机制

package cn.com.dyl.server;

import cn.com.dyl.netty.Jtt1078Handler;
import cn.com.dyl.webServer.WebMessageHandler;
import io.netty.util.NettyRuntime;
import io.netty.util.internal.ObjectUtil;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;

/**
 * @author 荣家燕
 * @company 达友利科技(廊坊)有限公司
 * @create 2023/12/6 14:15
 * @describe
 */
public class NettyConfig {
    /** Number of worker threads. */
    protected final int workerCore;
    /** Number of business threads. */
    protected final int businessCore;
    /** Reader idle time handed to the idle-state handler. */
    protected final int readerIdleTime;
    /** Writer idle time handed to the idle-state handler. */
    protected final int writerIdleTime;
    /** All-idle time handed to the idle-state handler. */
    protected final int allIdleTime;
    /** Port the server binds to. */
    protected final Integer port;
    /** Handler for JT/T 1078 device messages. */
    protected final Jtt1078Handler jtt1078Handler;
    /** Handler for web-side messages. */
    protected final WebMessageHandler webMessageHandler;
    /** Server instance selected by the enable flags. */
    protected final Server server;
    /** Display name of the service. */
    protected final String name;

    /**
     * Validates the port, resolves thread-count defaults and creates the
     * server that matches the enable flags.
     *
     * @param workerGroup       worker thread count; values {@code <= 0} fall back to processors + 2
     * @param businessGroup     business thread count; values {@code <= 0} fall back to half the processors (at least 1)
     * @param readerIdleTime    reader idle time
     * @param writerIdleTime    writer idle time
     * @param allIdleTime       all-idle time
     * @param port              port to bind; must be non-null and strictly positive
     * @param jtt1078Handler    device message handler
     * @param webMessageHandler web message handler
     * @param name              service name, or {@code null} for a per-type default
     * @param enableHttp        selects {@code NettyServer}; takes precedence over {@code enableWeb}
     * @param enableWeb         selects {@code AudioServer} when {@code enableHttp} is false
     */
    public NettyConfig(int workerGroup,
                       int businessGroup,
                       int readerIdleTime,
                       int writerIdleTime,
                       int allIdleTime,
                       Integer port,
                       Jtt1078Handler jtt1078Handler,
                       WebMessageHandler webMessageHandler,
                       String name,
                       boolean enableHttp,
                       boolean enableWeb) {
        ObjectUtil.checkNotNull(port, "port");
        // Rejects zero and negative ports.
        ObjectUtil.checkPositive(port, "port");

        final int cpuCount = NettyRuntime.availableProcessors();
        if (workerGroup > 0) {
            this.workerCore = workerGroup;
        } else {
            this.workerCore = cpuCount + 2;
        }
        if (businessGroup > 0) {
            this.businessCore = businessGroup;
        } else {
            this.businessCore = Math.max(1, cpuCount / 2);
        }

        this.readerIdleTime = readerIdleTime;
        this.writerIdleTime = writerIdleTime;
        this.allIdleTime = allIdleTime;
        this.port = port;
        this.jtt1078Handler = jtt1078Handler;
        this.webMessageHandler = webMessageHandler;

        // Default service name depends on which kind of server is requested.
        final String fallbackName;
        if (enableHttp) {
            fallbackName = "mediaHttp";
        } else if (enableWeb) {
            fallbackName = "webServer";
        } else {
            fallbackName = "jtt1078";
        }
        this.name = (name == null) ? fallbackName : name;

        // The server is created last so every other field is already assigned
        // when this configuration is handed to it.
        if (enableHttp) {
            this.server = new NettyServer(this);
        } else if (enableWeb) {
            this.server = new AudioServer(this);
        } else {
            this.server = new MediaServer(this);
        }
    }

    /**
     * @return the server created for this configuration
     */
    public Server build() {
        return this.server;
    }

    /**
     * @return a fresh builder with default idle times
     */
    public static NettyConfig.Builder custom() {
        return new Builder();
    }

    /**
     * Fluent builder; Lombok generates chained setters for every field.
     */
    @NoArgsConstructor
    @Accessors(chain = true)
    @Data
    public static class Builder {
        private int workerCore;
        private int businessCore;
        private int readerIdleTime = 240;
        private int writerIdleTime = 0;
        private int allIdleTime = 0;
        private Integer port;
        private Jtt1078Handler jtt1078Handler;
        /** Handler for web-side messages. */
        private WebMessageHandler webMessageHandler;
        private String name;
        private boolean enableHttp;
        private boolean enableWeb;

        /**
         * Sets both thread counts in one call.
         *
         * @param workerCore   worker thread count
         * @param businessCore business thread count
         * @return this builder
         */
        public Builder setThreadGroup(int workerCore, int businessCore) {
            this.businessCore = businessCore;
            this.workerCore = workerCore;
            return this;
        }

        /**
         * Sets all three idle times in one call.
         *
         * @param readerIdleTime reader idle time
         * @param writerIdleTime writer idle time
         * @param allIdleTime    all-idle time
         * @return this builder
         */
        public Builder setIdleStateTime(int readerIdleTime, int writerIdleTime, int allIdleTime) {
            this.allIdleTime = allIdleTime;
            this.writerIdleTime = writerIdleTime;
            this.readerIdleTime = readerIdleTime;
            return this;
        }

        /**
         * Creates the configuration from the collected values and returns its server.
         *
         * @return the created server
         */
        public Server build() {
            NettyConfig nettyConfig = new NettyConfig(
                    workerCore,
                    businessCore,
                    readerIdleTime,
                    writerIdleTime,
                    allIdleTime,
                    port,
                    jtt1078Handler,
                    webMessageHandler,
                    name,
                    enableHttp,
                    enableWeb
            );
            return nettyConfig.build();
        }
    }
}

定义子类启动服务

基于netty的TCP协议,用于接收终端设备的数据

在该类中可以定义编解码器和处理器等

package cn.com.dyl.server;

import cn.com.dyl.netty.HandlerWrapper;
import cn.com.dyl.netty.Jtt1078MessageDecoder;
import io.netty.bootstrap.AbstractBootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultThreadFactory;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author 荣家燕
 * @company 达友利科技(廊坊)有限公司
 * @create 2023/12/6 14:21
 * @describe
 */
@SuppressWarnings({"rawtypes", "NullableProblems"})
public class MediaServer extends Server {
    /**
     * Executor group that runs the business handler off the I/O threads.
     * Kept as a field so {@link #stop()} can shut it down; its threads are
     * non-daemon and would otherwise outlive the server.
     */
    private DefaultEventExecutorGroup codecGroup;

    /**
     * Creates the TCP media server.
     *
     * @param config configuration object
     */
    public MediaServer(NettyConfig config) {
        super(config);
    }

    /**
     * Creates the thread groups and the server bootstrap for device connections.
     *
     * @return the configured bootstrap
     */
    @Override
    protected AbstractBootstrap initialize() {
        // Accepts incoming connections.
        bossGroup = new NioEventLoopGroup(
                1,
                new DefaultThreadFactory(
                        config.name,
                        Thread.MAX_PRIORITY
                )
        );
        // Handles channel I/O; sized from the configuration so that
        // NettyConfig.Builder#setThreadGroup actually takes effect.
        workerGroup = new NioEventLoopGroup(
                config.workerCore,
                new DefaultThreadFactory(
                        config.name,
                        Thread.MAX_PRIORITY
                )
        );
        // Executor group for the business handler.
        codecGroup = new DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
            private final AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "MediaServerCodecThread_" + this.threadIndex.incrementAndGet());
            }
        });
        // Capture the group created by this call so the initializer below
        // keeps using it even if the field is reassigned later.
        final DefaultEventExecutorGroup handlerExecutor = codecGroup;
        // Optional business thread pool.
        if (config.businessCore > 0) {
            businessGroup = new ThreadPoolExecutor(
                    config.businessCore,
                    config.businessCore,
                    1L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(),
                    new DefaultThreadFactory(
                            config.name + "-B",
                            true,
                            Thread.NORM_PRIORITY
                    )
            );
        }
        return new ServerBootstrap()
                // Boss group accepts, worker group does the I/O.
                .group(bossGroup, workerGroup)
                // NIO server channel.
                .channel(NioServerSocketChannel.class)
                // Allow quick rebinding of the port after a restart.
                .option(NioChannelOption.SO_REUSEADDR, true)
                .option(NioChannelOption.SO_BACKLOG, 102400)
                // Low latency: disable Nagle's algorithm on accepted channels.
                .childOption(NioChannelOption.TCP_NODELAY, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    private final HandlerWrapper handler = new HandlerWrapper(config.jtt1078Handler);

                    @Override
                    protected void initChannel(SocketChannel channel) {
                        channel.pipeline()
                                .addLast(new Jtt1078MessageDecoder())
                                .addLast(handlerExecutor, "handler", handler);
                    }
                });
    }

    /**
     * Stops the server and additionally shuts down the handler executor group
     * created by {@link #initialize()}.
     */
    @Override
    public synchronized void stop() {
        try {
            super.stop();
        } finally {
            if (codecGroup != null) {
                codecGroup.shutdownGracefully();
                codecGroup = null;
            }
        }
    }
}

定义启动配置文件

在该类中定义启动的入口

package cn.com.dyl.config;

import cn.com.dyl.netty.Jtt1078Handler;
import cn.com.dyl.server.NettyConfig;
import cn.com.dyl.server.Server;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;

/**
 * @author 荣家燕
 * @company 达友利科技(廊坊)有限公司
 * @create 2023/12/6 14:14
 * @describe
 */
@Order(Integer.MIN_VALUE)
@Configuration
@ConditionalOnProperty(value = "server.enable", havingValue = "true")
public class JTConfig {
    private final Jtt1078Handler jtt1078Handler;

    public JTConfig(Jtt1078Handler jtt1078Handler) {
        this.jtt1078Handler = jtt1078Handler;
    }

    @ConditionalOnProperty(value = "server.media.port")
    @Bean(initMethod = "start", destroyMethod = "stop")
    public Server jtmediaServer(@Value("${server.media.port}") int port) {
        return NettyConfig.custom()
                .setPort(port)
                .setName("1078音视频服务")
                .setJtt1078Handler(jtt1078Handler)
                .build();
    }

集成Netty实现和前端进行WebSocket通讯

定义websocket子类

package cn.com.dyl.server;

import cn.com.dyl.netty.HandlerWrapper;
import cn.com.dyl.webServer.HeartBeatHandler;
import cn.com.dyl.webServer.WebMessageDecoder;
import io.netty.bootstrap.AbstractBootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultThreadFactory;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author 荣家燕
 * @company 达友利科技(廊坊)有限公司
 * @create 2024/1/23 10:20
 * @describe
 */
@SuppressWarnings("rawtypes")
public class AudioServer extends Server {
    /**
     * Executor group that runs the business handler off the I/O threads.
     * Kept as a field so {@link #stop()} can shut it down; its threads are
     * non-daemon and would otherwise outlive the server.
     */
    private DefaultEventExecutorGroup codecGroup;

    /**
     * Creates the WebSocket server.
     *
     * @param config configuration object
     */
    protected AudioServer(NettyConfig config) {
        super(config);
    }

    /**
     * Creates the thread groups and the server bootstrap with the
     * HTTP/WebSocket pipeline.
     *
     * @return the configured bootstrap
     */
    @Override
    protected AbstractBootstrap initialize() {
        // Accepts incoming connections.
        bossGroup = new NioEventLoopGroup(
                1,
                new DefaultThreadFactory(
                        config.name,
                        Thread.MAX_PRIORITY
                )
        );
        // Handles channel I/O; sized from the configuration so that
        // NettyConfig.Builder#setThreadGroup actually takes effect.
        workerGroup = new NioEventLoopGroup(
                config.workerCore,
                new DefaultThreadFactory(
                        config.name,
                        Thread.MAX_PRIORITY
                )
        );
        // Executor group for the business handler.
        codecGroup = new DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
            private final AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "AudioServerCodecThread_" + this.threadIndex.incrementAndGet());
            }
        });
        // Capture the group created by this call so the initializer below
        // keeps using it even if the field is reassigned later.
        final DefaultEventExecutorGroup handlerExecutor = codecGroup;
        // Optional business thread pool.
        if (config.businessCore > 0) {
            businessGroup = new ThreadPoolExecutor(
                    config.businessCore,
                    config.businessCore,
                    1L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(),
                    new DefaultThreadFactory(
                            config.name + "-B",
                            true,
                            Thread.NORM_PRIORITY
                    )
            );
        }

        return new ServerBootstrap()
                // Boss group accepts, worker group does the I/O.
                .group(bossGroup, workerGroup)
                // NIO server channel (TCP/IP transport).
                .channel(NioServerSocketChannel.class)
                // Allow quick rebinding of the port after a restart.
                .option(NioChannelOption.SO_REUSEADDR, true)
                .option(NioChannelOption.SO_BACKLOG, 102400)
                // Low latency: disable Nagle's algorithm on accepted channels.
                .childOption(NioChannelOption.TCP_NODELAY, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    private final HandlerWrapper handler = new HandlerWrapper(config.webMessageHandler);

                    @Override
                    protected void initChannel(SocketChannel channel) {
                        channel.pipeline()
                                .addLast(new HttpServerCodec())
                                .addLast(new ChunkedWriteHandler())
                                .addLast(new HttpObjectAggregator(1024*64))
                                .addLast(new IdleStateHandler(config.readerIdleTime, config.writerIdleTime, config.allIdleTime))
                                // Custom handler reacting to idle-state events.
                                .addLast(new HeartBeatHandler())
                                .addLast(new WebMessageDecoder())
                                .addLast(handlerExecutor, "handler", handler)
                                .addLast(new WebSocketServerProtocolHandler("/audio"));
                    }
                });
    }

    /**
     * Stops the server and additionally shuts down the handler executor group
     * created by {@link #initialize()}.
     */
    @Override
    public synchronized void stop() {
        try {
            super.stop();
        } finally {
            if (codecGroup != null) {
                codecGroup.shutdownGracefully();
                codecGroup = null;
            }
        }
    }
}

websocket的重点

WebSocket的重点在于:首次请求连接时使用的是HTTP方式,握手完成之后才是WebSocket连接

对于http和websocket进行判断并分开处理

package cn.com.dyl.netty;

import cn.com.dyl.threads.SessionManager;
import cn.com.dyl.utils.Packet;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.time.LocalDateTime;

import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;
import static io.netty.handler.codec.http.HttpUtil.setContentLength;

/**
 * @author 荣家燕
 * @company 达友利科技(廊坊)有限公司
 * @create 2023/12/6 15:20
 * @describe
 */
@ChannelHandler.Sharable
@AllArgsConstructor
@Slf4j
public class HandlerWrapper extends ChannelInboundHandlerAdapter {
    //基础消息处理器
    private final JttMessageHandler handler;
    private static WebSocketServerHandshaker handShaker;//握手构造工厂

    //基础消息处理器
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        Channel nettyChannel = ctx.channel();//获取netty的网络传输通道
        //判断是否是websocket
        if (msg instanceof WebSocketFrame) {
            WebSocketFrame frame = (WebSocketFrame) msg;
            handleWebSocketFrame(nettyChannel, frame);
            //判断是否是http请求(websocket建立连接是http请求,需要握手响应)
        } else if (msg instanceof FullHttpRequest) {
            FullHttpRequest httpRequest = (FullHttpRequest) msg;
            handleHttpRequest(nettyChannel, httpRequest);
            //如果都不是那就符合1078 的音视频数据
        } else {
            Packet packet = (Packet) msg;
            handler.channelRead(nettyChannel, packet);
        }
    }

    /**
     * 终端连接过来后的处理
     *
     * @param ctx 上下文
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("<<<<< web服务连接{}", ctx.channel().remoteAddress());
    }

    /**
     * 用来监听客户端是否断开
     *
     * @param ctx 通信管道
     * @throws Exception 异常
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        handler.release(ctx.channel());//如果断开就释放
    }

    /**
     * netty异常处理机制(完成异常处理后要释放并关闭所有的流)
     *
     * @param ctx   通信管道
     * @param cause 异常数据
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        handler.release(ctx.channel());
        ctx.close();
    }

    /**
     * 进行心跳检测,用户超时长时间未操作则会触发
     *
     * @param ctx netty通道
     * @param evt 心跳机制
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {//客户端读超时
                String tag = SessionManager.get(ctx.channel(), "tag");
                log.info("web服务心跳超时: {}", tag);
                handler.release(ctx.channel());//释放
            }
        }
    }

    private void handleHttpRequest(Channel channel, FullHttpRequest request) {
        //如果Http解码失败,返回http异常
        if (!request.decoderResult().isSuccess() || (!"websocket".equals(request.headers().get("Upgrade")))) {
            sendHttpResponse(channel, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        //构造握手响应返回,本机测试
        WebSocketServerHandshakerFactory handShakerFactory = new WebSocketServerHandshakerFactory("ws://localhost:1011/audio", null, false);
        handShaker = handShakerFactory.newHandshaker(request);
        if (handShaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(channel);
        } else {
            handShaker.handshake(channel, request);
        }
    }

    /**
     * websocket处理
     *
     * @param channel 通道
     * @param frame   数据
     */
    private void handleWebSocketFrame(Channel channel, WebSocketFrame frame) {
        //判断是否是关闭链路的指令
        if (frame instanceof CloseWebSocketFrame) {
            handShaker.close(channel, (CloseWebSocketFrame) frame.retain());
            return;
        }
        //判断是否是Ping消息
        if (frame instanceof PingWebSocketFrame) {
            channel.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        //本例程仅支持文本消息,不支持二进制消息
        if (frame instanceof TextWebSocketFrame) {
            //返回应答消息
            String request = ((TextWebSocketFrame) frame).text();
            channel.writeAndFlush(new TextWebSocketFrame(request + ",欢迎使用Netty WebSocket服务,现在时刻:" + LocalDateTime.now()));
        }
        //如果是二进制消息
        if (frame instanceof BinaryWebSocketFrame){
            ByteBuf content = frame.content();
            log.info("收到二进制消息"+content);
        }
    }

    /**
     * 发送http响应
     *
     * @param channel http通道
     * @param req     http请求
     * @param res     http响应
     */
    private static void sendHttpResponse(Channel channel, FullHttpRequest req, FullHttpResponse res) {
        //返回应答给客户端
        if (res.status().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
            setContentLength(res, res.content().readableBytes());
        }
        //如果非keep-Alive,关闭连接
        ChannelFuture future = channel.writeAndFlush(res);
        if (!isKeepAlive(req) || res.status().code() != 200) {
            future.addListener(ChannelFutureListener.CLOSE);
        }
    }
}


网站公告

今日签到

点亮在社区的每一天
去签到