使用高性能NIO框架netty实现IM集群对聊方案

发布于:2024-05-31 ⋅ 阅读:(164) ⋅ 点赞:(0)

前言

在前面的博文中我们分享了原生websoket集群搭建,也用redis 发布订阅实现了集群消息正常有序分发。但是有不少同学希望风向一期netty实现websoket,可以实现对聊方案。其实前面分享的websocket集群解决方案完全可以实现im对聊的,只是性能处理上没有nettty那么好。今天我们就分享一起使用高性能NIO框架netty实现IM集群对聊方案,各位看官敬请鉴赏。

技术积累

什么是netty

Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

netty的核心是支持零拷贝的bytebuf缓冲对象、通用通信api和可扩展的事件模型;它支持多种传输服务并且支持HTTP、Protobuf、二进制、文本、WebSocket 等一系列常见协议,也支持自定义协议。
  
在这里插入图片描述

netty的模型是基于reactor多线程模型,其中mainReactor用于接收客户端请求并转发给subReactor。SubReactor负责通道的读写请求,非 IO 请求(具体逻辑处理)的任务则会直接写入队列,等待 worker threads 进行处理。
在这里插入图片描述

netty如何实现IM

netty支持websocket通讯协议,那么我们就可以用它来实现websoket,实现后端服务主动向前端推送消息的功能。

比如AB用户分别注册到websoket后台,A用户向B用户发送消息,后端接收到A用户消息后判断消息接收者是B用户,然后后端逻辑直接调用B用户websoket连接进行推送即可。

如何实现IM集群

由于websocket 长连接是注册到后端服务本地缓存的,而且这个信道回话是不能被其他中间件缓存的,你们我们就只能在缓存目标用户的服务上拿到会话进行推送消息。

之前博文中也讲到过可以使用消息广播的形式找到目标回话服务,比如Redis的发布订阅、其他Mq等等。当A用户注册到C后端服务上,B服务注册到D后端服务上,这个时候如果A向B发送消息,则需要在C后端服务上增肌广播逻辑,让其他服务感知并监听到消息体,其他服务D收到消息后会验证是否这个用户会话缓存在本地,如果存在则向B前端对送消息,不再则不予处理。这样,就完成了IM集群回话的整个链路流程。
在这里插入图片描述

实战演示

本次实战我们简单使用netty实现IM集群通讯即可,如果需要用于生成环境需要增加一些防卫式程序设计,比如Redis发布监听冗余检验后错误处理等等。

基础配置

maven依赖

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.12.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
    <java.version>8</java.version>
    <spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<!--netty-->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.6.Final</version>
</dependency>
<!-- 整合thymeleaf前端页面 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.68</version>
</dependency>

application配置文件

server:
  port: 9999
spring:
  profiles:
    active: dev
  mvc:
    pathmatch:
      # Springfox使用的路径匹配是基于AntPathMatcher的,而Spring Boot 2.6.X使用的是PathPatternMatcher
      matching-strategy: ant_path_matcher
  redis:
    host: 127.0.0.1
    port: 6379
  thymeleaf:
    mode: HTML
    encoding: UTF-8
    content-type: text/html
    cache: false
    prefix: classpath:/templates/

websocket演示html

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>websocket通讯</title>
</head>
<script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.js"></script>
<script>
    var socket;
    function openSocket() {
        if (typeof (WebSocket) == "undefined") {
            console.log("您的浏览器不支持WebSocket");
        } else {
            console.log("您的浏览器支持WebSocket");
            //实现化WebSocket对象,指定要连接的服务器地址与端口  建立连接
            //等同于socket = new WebSocket("ws://localhost:8888/xxxx/im/25");
            //var socketUrl="${request.contextPath}/im/"+$("#userId").val();
            //var socketUrl = "ws://192.168.112.10:7777/ws/" + $("#userId").val();
            //var socketUrl = "wss://192.168.112.10/ws/"+ $("#userId").val();
            //var socketUrl = "wss://192.168.112.10:8899"
            var socketUrl = "ws://127.0.0.1:88?userId="+$("#userId").val();
            socketUrl = socketUrl.replace("https", "ws").replace("http", "ws");
            console.log(socketUrl);
            if (socket != null) {
                socket.close();
                socket = null;
            }
            socket = new WebSocket(socketUrl);
            //打开事件
            socket.onopen = function () {
                console.log("websocket已打开");
                //socket.send("这是来自客户端的消息" + location.href + new Date());
            };
            //获得消息事件
            socket.onmessage = function (msg) {
                console.log("接收消息为:"+msg.data);
            };
            //关闭事件
            socket.onclose = function () {
                console.log("websocket已关闭");
            };
            //发生了错误事件
            socket.onerror = function () {
                console.log("websocket发生了错误");
            }
        }
    }

    function sendMessage() {
        if (typeof (WebSocket) == "undefined") {
            console.log("您的浏览器不支持WebSocket");
        } else {
            console.log("您的浏览器支持WebSocket");
            console.log('发送消息为:{"fromUserId":"' + $("#userId").val() + '","toUserId":"' + $("#toUserId").val() + '","contentText":"' + $("#contentText").val() + '"}');
            socket.send('{"fromUserId":"' + $("#userId").val() + '","toUserId":"' + $("#toUserId").val() + '","contentText":"' + $("#contentText").val() + '"}');
        }
    }

</script>
<body>
<p>【userId】:<div><input id="userId" name="userId" type="text" value="10"></div>
<p>【toUserId】:<div><input id="toUserId" name="toUserId" type="text" value="20"></div>
<p>【toUserId】:<div><input id="contentText" name="contentText" type="text" value="hello websocket"></div>
<p>【操作】:<div><button onclick="openSocket()">开启socket</button></div>
<p>【操作】:<div><button onclick="sendMessage()">发送消息</button></div>
</body>
</html>

netty搭建IM集群

创建消息体
Message.java

import lombok.Data;

/**
 * Message
 * @author senfel
 * @version 1.0
 * @date 2024/5/17 14:39
 */
@Data
public class Message {
    /**
     * 消息编码
     */
    private String code;

    /**
     * 来自(保证唯一)
     */
    private String fromUserId;

    /**
     * 去自(保证唯一)
     */
    private String toUserId;

    /**
     * 内容
     */
    private String contentText;


}

创建netty websocket连接池
NettyWebSocketPool.java

import io.netty.channel.Channel;
import java.util.concurrent.ConcurrentHashMap;

/**
 * NettyWebSocketPool
 * @author senfel
 * @version 1.0
 * @date 2024/5/23 10:19
 */
public class NettyWebSocketPool {
    /**
     * 通道连接池
     */
    public static final ConcurrentHashMap<String, Channel> CHANNELS = new ConcurrentHashMap<>();
}

创建websocket处理器
WebsocketServerHandler.java

import com.example.ccedemo.im.CommonConstants;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

/**
 * WebsocketServerHandler
 * @author senfel
 * @version 1.0
 * @date 2024/5/22 10:57
 */
@Slf4j
public class WebsocketServerHandler extends SimpleChannelInboundHandler<Object> {

    private final static ThreadLocal<String> USER_LIST = new ThreadLocal<>();

    private WebSocketServerHandshaker handshaker;

    private StringRedisTemplate stringRedisTemplate;

    public WebsocketServerHandler() {
    }

    public WebsocketServerHandler(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        InetSocketAddress reAddr = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = reAddr.getAddress().getHostAddress();
        String clientPort = String.valueOf(reAddr.getPort());
        log.debug("有新的客户端接入:{}:{}", clientIp, clientPort);
    }


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame msg) {
        if (msg instanceof CloseWebSocketFrame) {
            disconnectCurrentUser();
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) msg.retain());
            return;
        }
        if (msg instanceof PingWebSocketFrame) {
            log.info("websocket ping message");
            ctx.channel().write(new PingWebSocketFrame(msg.content().retain()));
        } else if (msg instanceof TextWebSocketFrame) {
            // websocket消息解压成字符串让下一个handler处理
            String text = ((TextWebSocketFrame) msg).text();
            log.info("请求数据|{}", text);
            // 如果不调用这个方法后面的handler就获取不到数据
            ctx.fireChannelRead(text);
        } else {
            log.error("不支持的消息格式");
            throw new UnsupportedOperationException("不支持的消息格式");
        }
    }

    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        if (!req.decoderResult().isSuccess()
                || (!"websocket".equalsIgnoreCase(req.headers().get(HttpHeaderNames.UPGRADE)))) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        WebSocketServerHandshakerFactory wsShakerFactory = new WebSocketServerHandshakerFactory(
                "ws://" + req.headers().get(HttpHeaderNames.HOST), null, false);
        handshaker = wsShakerFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            String uri = req.uri();
            Map<String, String> paramMap = null;
            //如果url包含参数,需要处理
            if (uri.contains(CommonConstants.QUESTION)) {
                paramMap = getUrlParams(uri);
                String newUri = uri.substring(0, uri.indexOf(CommonConstants.QUESTION));
                req.setUri(newUri);
            }
            //缓存当前连接
            assert paramMap != null;
            String channelId = "userId:"+paramMap.get("userId");
            log.info("缓存用户通道信息:{}",ctx.channel().localAddress());
            log.info("缓存用户通道信息:{}",ctx.channel().remoteAddress());
            NettyWebSocketPool.CHANNELS.put(channelId, ctx.channel());
            USER_LIST.set(channelId);
            //写入在线用户
            stringRedisTemplate.opsForValue().set(CommonConstants.WEBSOCKET_CHANNEL_ID_PREFIX+channelId, channelId);
            //建立websocket连接握手
            handshaker.handshake(ctx.channel(), req);
        }
    }

    private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest msg, DefaultFullHttpResponse response) {
        if (response.status().code() != HttpResponseStatus.OK.code()) {
            ByteBuf buf = Unpooled.copiedBuffer(response.status().toString(), CharsetUtil.UTF_8);
            response.content().writeBytes(buf);
            buf.release();
        }
        ChannelFuture cf = ctx.channel().writeAndFlush(response);
        if (!HttpUtil.isKeepAlive(msg) || response.status().code() != HttpResponseStatus.OK.code()) {
            cf.addListener(ChannelFutureListener.CLOSE);
        }
    }


    /**
     * url 参数切割
     * @param url
     * @return
     */
    private Map<String, String> getUrlParams(String url) {
        Map<String, String> map = new HashMap<>(4);
        url = url.replace("?", ";");
        if (!url.contains(";")) {
            return map;
        }
        if (url.split(";").length > 0) {
            String[] arr = url.split(";")[1].split("&");
            for (String s : arr) {
                String[] data = s.split("=");
                if (data.length > 1) {
                    map.put(data[0], data[1]);
                }
            }
            return map;
        } else {
            return map;
        }
    }


    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("消息处理异常:{}", USER_LIST.get(), cause);
        disconnectCurrentUser();
        ctx.close();
    }

    /**
     * 状态触发 检测是否处于空闲状态 间隔时间 60s
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
            log.info("握手完成,连接地址为:{}", ctx.channel().remoteAddress());
        } else if (evt instanceof IdleStateEvent) {
            if (!StringUtils.isEmpty(USER_LIST.get())) {
                //断开连接
                disconnectCurrentUser();
                ctx.disconnect();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    /**
     * disconnectCurrentUser
     * @author senfel
     * @date 2024/5/23 15:13
     * @return void
     */
    private void disconnectCurrentUser() {
        log.info("谁断开了连接:{}",USER_LIST.get());
        log.info("userEventTriggered 触发,断开连接");
        NettyWebSocketPool.CHANNELS.remove(USER_LIST.get());
        stringRedisTemplate.delete(CommonConstants.WEBSOCKET_CHANNEL_ID_PREFIX+USER_LIST.get());
        USER_LIST.remove();
    }
}

创建websocket输入输出处理器
UserWebsocketInHandler.java

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * UserWebsocketInHandler
 * 入站处理器:获取请求数据,完成业务处理,推送消息给浏览器
 * @author senfel
 * @version 1.0
 * @date 2024/5/22 11:10
 */
@Slf4j
public class UserWebsocketInHandler extends SimpleChannelInboundHandler<String> {

    private StringRedisTemplate stringRedisTemplate;

    public UserWebsocketInHandler(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        log.error(Thread.currentThread().getName() + "|" + msg);
        String pattern ="\\{.*\\}|\\[.*\\]";
        Pattern r= Pattern.compile(pattern);
        Matcher m =r.matcher(msg);
        if(m.matches()){
            stringRedisTemplate.convertAndSend("nettyWebsocketMsgPush",msg);
        }else {
            ctx.writeAndFlush(new TextWebSocketFrame(msg));
        }
    }
}
import io.netty.channel.*;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

/**
 * UserWebsocketOutHandler
 * 出站处理器:判断数据是否需要进行封装
 * @author senfel
 * @version 1.0
 * @date 2024/5/22 11:10
 */

public class UserWebsocketOutHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if(msg instanceof String) {
            ctx.write(new TextWebSocketFrame((String) msg), promise);
        } else {
            super.write(ctx, msg, promise);
        }
    }
}

创建netty服务端
NettyWebsocketServer.java

import com.example.ccedemo.im.NettyServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * NettyWebsocketServer
 * @author senfel
 * @version 1.0
 * @date 2024/5/22 11:03
 */
@Slf4j
public class NettyWebsocketServer implements Runnable {

    private StringRedisTemplate stringRedisTemplate;

    /**
     * 服务端IP地址
     */
    private String ip;
    /**
     * 服务端端口号
     */
    private int port;

    public NettyWebsocketServer(String ip, int port, StringRedisTemplate stringRedisTemplate) {
        this.ip = ip;
        this.port = port;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public void run() {
        // 指定boss线程数:主要负责接收连接请求,一般设置为1就可以
        final EventLoopGroup boss = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger index = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NioBoss_%d", this.index.incrementAndGet()));
            }
        });

        // 指定worker线程数:主要负责处理连接就绪的连接,一般设置为CPU的核心数
        final int totalThread = Runtime.getRuntime().availableProcessors();
        final EventLoopGroup worker = new NioEventLoopGroup(totalThread, new ThreadFactory() {
            private AtomicInteger index = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NioSelector_%d_%d", totalThread, this.index.incrementAndGet()));
            }
        });

        // 指定任务处理线程数:主要负责读取数据和处理响应,一般该值设置的比较大,与业务相对应
        final int jobThreads = 1024;
        final EventLoopGroup job = new DefaultEventLoopGroup(jobThreads, new ThreadFactory() {
            private AtomicInteger index = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NioJob_%d_%d", jobThreads, this.index.incrementAndGet()));
            }
        });

        // 日志处理handler:类定义上面有Sharable表示线程安全,可以将对象定义在外面使用
        final LoggingHandler LOGGING_HANDLER = new LoggingHandler();

        // 指定服务端bootstrap
        ServerBootstrap server = new ServerBootstrap();
        server.group(boss, worker)
                // 指定通道类型
                .channel(NioServerSocketChannel.class)
                // 指定全连接队列大小:windows下默认是200,linux/mac下默认是128
                .option(ChannelOption.SO_BACKLOG, 2048)
                // 维持链接的活跃,清除死链接
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // 关闭延迟发送
                .childOption(ChannelOption.TCP_NODELAY, true)
                // 添加handler处理链
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();

                        // 日志处理
                        pipeline.addLast(LOGGING_HANDLER);
                        // 心跳检测:读超时时间、写超时时间、全部超时时间(单位是秒,0表示不处理)
                        pipeline.addLast(new IdleStateHandler(60, 60, 60, TimeUnit.SECONDS));
                        // 处理http请求的编解码器
                        pipeline.addLast(job, "httpServerCodec", new HttpServerCodec());
                        pipeline.addLast(job, "chunkedWriteHandler", new ChunkedWriteHandler());
                        pipeline.addLast(job, "httpObjectAggregator", new HttpObjectAggregator(65536));
                        // 处理websocket的编解码器
                        pipeline.addLast(job, "websocketHandler", new WebsocketServerHandler(stringRedisTemplate));
                        // 自定义处理器
                        pipeline.addLast(job, "userOutHandler", new UserWebsocketOutHandler());
                        pipeline.addLast(job, "userInHandler", new UserWebsocketInHandler(stringRedisTemplate));
                    }
                });
        try {
            // 服务端绑定对外服务地址
            ChannelFuture future = server.bind(ip, port).sync();
            log.info(NettyServer.class + " 启动正在监听: " + future.channel().localAddress());
            // 等待服务关闭,关闭后释放相关资源
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
            job.shutdownGracefully();
        }
    }
}

redis发布订阅

redis配置消息监听
RedisListenerConfig.java

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

/**
 * RedisListenerConfig
 * @author senfel
 * @version 1.0
 * @date 2024/5/24 16:26
 */
@Configuration
public class RedisListenerConfig {

     @Bean
     RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
         container.setConnectionFactory(connectionFactory);
        container.addMessageListener(new LiveRedisKeysExpireListener(), new PatternTopic("nettyWebsocketMsgPush"));
        return container;
    }
}

redis监听消息处理
LiveRedisKeysExpireListener.java

import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import java.util.Objects;

/**
 * LiveRedisKeysExpireListener
 * @author senfel
 * @version 1.0
 * @date 2024/5/24 16:25
 */
public class LiveRedisKeysExpireListener implements MessageListener {

    @Override
    public void onMessage(Message msg, byte[] bytes) {
        System.out.println("监听到需要进行负载转发的消息:" + msg.toString());
        com.example.ccedemo.nettydemo.Message message = JSONObject.parseObject(msg.toString(), com.example.ccedemo.nettydemo.Message.class);
        Channel channel = NettyWebSocketPool.CHANNELS.get("userId:" + message.getToUserId());
        if(Objects.nonNull(channel)){
            channel.writeAndFlush(new TextWebSocketFrame(msg.toString()));
        }
    }
}

实战测试

浏览器分别打开两个无痕页面
http://127.0.0.1:9999/websocket/page
模拟对话两个页面的用户互斥
在这里插入图片描述

分别发送消息实现对聊
在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到