方案二、Nacos+Gateway(结合SpringCloud生态)
方案三、Zookeeper + Redis + RabbitMQ方案实现
Chat-Web服务监听Zookeeper节点清理Redis与RabbitMQ残留数据
一、背景
Netty 是一个基于 Java NIO 的高性能网络应用框架,广泛应用于高并发、低延迟的通信场景(如游戏服务器、即时通讯、RPC 框架等)。
单体架构:单台服务器运行一个 Netty 实例,处理所有客户端请求,适合轻量级应用或开发测试阶段,资源集中但存在单点风险。
集群架构:多台服务器协同工作,通过负载均衡、分布式通信等技术共同处理请求,提升性能和可靠性,通过横向扩展解决性能瓶颈,适合高并发、高可用性要求的场景,但需处理分布式复杂性。
二、Netty 单体架构的优缺点
优点
简单易用:无需考虑分布式协调、数据分片等问题,开发逻辑直接(如直接操作 Channel 和 EventLoop)。部署方便,一台服务器即可运行,适合快速验证业务逻辑。
低延迟通信:所有请求在同一进程内处理,避免网络传输和序列化开销,适合对延迟敏感的场景(如实时游戏)。
资源集中管理:共享线程池、缓存等资源,减少重复创建开销。调试方便,可直接通过日志或调试工具定位问题。
成本低:无需额外负载均衡器或分布式中间件,硬件和运维成本较低。
缺点
单点故障风险:服务器宕机或网络中断会导致整个服务不可用,缺乏容灾能力。
性能瓶颈:单台服务器的 CPU、内存、网络带宽有限,无法支撑超大规模并发(如百万级连接)。
扩展性差:垂直扩展(升级硬件)成本高,且受物理限制;水平扩展(增加服务器)需重构为集群架构。
维护困难:随着业务增长,单体代码可能变得臃肿,模块间耦合度高,难以维护和迭代。
三、Netty 集群架构的优缺点
优点
高可用性:通过多节点部署和心跳检测,实现故障自动转移(如使用 ZooKeeper 或 etcd 管理节点状态)。单节点故障不影响整体服务,适合金融、电商等对稳定性要求高的场景。
弹性扩展:水平扩展方便,通过增加服务器即可提升处理能力(如支持千万级连接)。结合负载均衡(如 Nginx、LVS)或服务发现(如 Consul)动态分配流量。
负载均衡:请求均匀分发到多个节点,避免单节点过载,提升资源利用率。支持根据业务优先级或用户特征进行智能路由(如灰度发布)。
数据一致性支持:结合分布式缓存(如 Redis)或数据库分片,解决多节点数据同步问题。适合需要强一致性的场景(如订单处理、支付系统)。
缺点
复杂性增加:需处理分布式事务、序列化、网络分区(脑裂)等问题,开发难度显著提升。需要引入中间件(如 Kafka、RocketMQ)或框架(如 Spring Cloud)协调节点间通信。
性能开销:节点间通信需经过网络传输和序列化/反序列化,增加延迟(如 gRPC 的 Protobuf 编码)。负载均衡器可能成为瓶颈(如 Nginx 性能不足时需升级或分片)。
运维成本高:需监控多节点状态、日志聚合(如 ELK)、分布式追踪(如 SkyWalking)等。部署和升级需考虑滚动重启、数据迁移等操作,流程复杂。
一致性挑战:分布式环境下难以保证强一致性,需权衡 CAP 理论(如采用最终一致性模型)。需设计幂等、重试、补偿机制应对网络异常。
四、适用场景对比
选择单体:若业务规模小、对延迟敏感且无需高可用,单体 Netty 是简单高效的选择。
选择集群:若需支撑高并发、高可用或未来扩展,集群架构是必然趋势,但需投入更多资源解决分布式问题。
五、Netty单体架构代码实现
请参考:【Netty实战】基于Netty+WebSocket的IM通信后台服务代码详解-CSDN博客
六、Netty集群架构方案实现
方案一、Nginx负载均衡实现集群(较为简单)
Nginx配置
http {
upstream netty_cluster {
server 192.168.1.101:875; # 节点1
server 192.168.1.102:875; # 节点2
ip_hash; # 基于客户端IP的会话保持
}
server {
listen 80;
location /ws {
proxy_pass http://netty_cluster;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
}
前端连接方式
const socket = new WebSocket("ws://your-nginx-ip:875/ws");
由于单节点不可能有全部的channel信息,后续的会话转发可参考方案三中的RabbitMQ实现
方案二、Nacos+Gateway(结合SpringCloud生态)
Netty服务
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project>
<dependencies>
<!-- Netty核心 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.86.Final</version>
</dependency>
<!-- Nacos服务发现 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
</project>
application.yml
server:
port: 875 # Netty服务端口
spring:
application:
name: netty-service
cloud:
nacos:
discovery:
server-addr: nacos-server:8848
namespace: prod
ephemeral: true
netty:
websocket:
path: /ws
启动类
@SpringBootApplication
@EnableDiscoveryClient
public class NettyServerApplication {
public static void main(String[] args) {
SpringApplication.run(NettyServerApplication.class, args);
}
@Bean
public ApplicationRunner nettyStarter() {
return args -> {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WSServerInitializer())
.bind(875).sync()
.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
};
}
}
gateway网关服务
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project>
<dependencies>
<!-- Spring Cloud Gateway -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<!-- Nacos服务发现 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
</dependencies>
</project>
application.yml
server:
port: 8080
spring:
application:
name: api-gateway
cloud:
gateway:
discovery:
locator:
enabled: true
routes:
- id: netty-ws-route
uri: lb://netty-service
predicates:
- Path=/ws/**
filters:
- StripPrefix=1
- name: RequestRateLimiter
args:
redis-rate-limiter.replenishRate: 100
redis-rate-limiter.burstCapacity: 200
nacos:
discovery:
server-addr: nacos-server:8848
前端连接方式
// 通过Gateway连接
const socket = new WebSocket("ws://your-gateway-ip/ws");
由于单节点不可能有全部的channel信息,后续的会话转发可参考方案三中的RabbitMQ实现
方案三、Zookeeper + Redis + RabbitMQ方案实现
redis自动分配端口
其实这里也可以将端口与在线人数放在Redis中,改成zookeeper方案可以不需要在中断连接后,监听并且清理在线人数和端口,因为netty与zk建立的临时节点,中断连接后,会自动删除该临时节点。
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.4.3</version>
</dependency>
/**
* Jedis 连接池工具类
*/
public class JedisPoolUtils {
private static final JedisPool jedisPool;
static {
//配置连接池
JedisPoolConfig poolConfig = new JedisPoolConfig();
//最大连接数
poolConfig.setMaxTotal(10);
//最大空闲连接
poolConfig.setMaxIdle(10);
//最小空闲连接
poolConfig.setMinIdle(5);
//最长等待时间,ms
poolConfig.setMaxWaitMillis(1500);
//创建连接池对象
jedisPool = new JedisPool(poolConfig,
"127.0.0.1",
6379,
1000,
"root");
}
public static Jedis getJedis(){
return jedisPool.getResource();
}
}
// 动态分配端口
public static Integer selectPort(Integer port) {
String portKey = "netty_port";
Jedis jedis = JedisPoolUtils.getJedis();
Map<String, String> portMap = jedis.hgetAll(portKey);
System.out.println(portMap);
// 由于map中的key都应该是整数类型的port,所以先转换成整数后,再比对,否则string类型的比对会有问题
List<Integer> portList = portMap.entrySet().stream()
.map(entry -> Integer.valueOf(entry.getKey()))
.collect(Collectors.toList());
// step1: 编码到此处先运行测试看一下结果
System.out.println(portList);
Integer nettyPort = null;
if (portList == null || portList.isEmpty()) {
// step2: 编码到此处先运行测试看一下结果
jedis.hset(portKey, port+"", initOnlineCounts);
nettyPort = port;
} else {
// 循环portList,获得最大值,并且累加10
Optional<Integer> maxInteger = portList.stream().max(Integer::compareTo);
Integer maxPort = maxInteger.get().intValue();
Integer currentPort = maxPort + 10;
jedis.hset(portKey, currentPort+"", initOnlineCounts);
nettyPort = currentPort;
}
// step3: 编码到此处先运行测试看一下最终结果
return nettyPort;
}
// 删除端口分配关系
public static void removePort(Integer port) {
String portKey = "netty_port";
Jedis jedis = JedisPoolUtils.getJedis();
jedis.hdel(portKey, port+"");
}
这样就可以在启动类中自动分配端口
public static void main(String[] args) throws Exception {
// 定义主从线程组
// 定义主线程池,用于接受客户端的连接,但是不做任何处理,比如老板会谈业务,拉到业务就会交给下面的员工去做了
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 定义从线程池,处理主线程池交过来的任务,公司业务员开展业务,完成老板交代的任务
EventLoopGroup workerGroup = new NioEventLoopGroup();
// Netty服务启动的时候,从redis中查找有没有端口,如果没有则用875,如果有则把端口累加1(或10)再启动
Integer nettyPort = selectPort(875);
try {
// 构建Netty服务器
ServerBootstrap server = new ServerBootstrap(); // 服务的启动类
server.group(bossGroup, workerGroup) // 把主从线程池组放入到启动类中
.channel(NioServerSocketChannel.class) // 设置Nio的双向通道
.childHandler(new WSServerInitializer()); // 设置处理器,用于处理workerGroup
// 启动server,并且绑定分配的端口号,同时启动方式为"同步"
ChannelFuture channelFuture = server.bind(nettyPort).sync();
// 监听关闭的channel
channelFuture.channel().closeFuture().sync();
} finally {
// 优雅的关闭线程池组
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
// 移除现有的redis与netty的端口关系
removePort(nettyPort);
}
}
Zookeeper实现Netty服务的注册、在线人数
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.5.0</version>
</dependency>
/**
* Zookeeper 配置类
*/
public class CuratorConfig {
private static String host = "127.0.0.1:3191"; // 单机/集群的ip:port地址
private static Integer connectionTimeoutMs = 30 * 1000; // 连接超时时间
private static Integer sessionTimeoutMs = 3 * 1000; // 会话超时时间
private static Integer sleepMsBetweenRetry = 2 * 1000; // 每次重试的间隔时间
private static Integer maxRetries = 3; // 最大重试次数
private static String namespace = "IM"; // 命名空间(root根节点名称)
// curator客户端
private static CuratorFramework client;
static {
// 声明重试策略
RetryPolicy backoffRetry = new ExponentialBackoffRetry(sleepMsBetweenRetry, maxRetries);
// 声明初始化客户端
client = CuratorFrameworkFactory.builder()
.connectString(host)
.connectionTimeoutMs(connectionTimeoutMs)
.sessionTimeoutMs(sessionTimeoutMs)
.retryPolicy(backoffRetry)
.namespace(namespace)
.build();
client.start(); // 启动curator客户端
}
public static CuratorFramework getClient() {
return client;
}
}
/**
* Netty服务节点类
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class NettyServerNode {
private String ip; // IP地址
private Integer port; // 服务端口
private Integer onlineCounts = 0; // 在线人数
}
/**
* Zookeeper注册工具类 - 用于注册Netty服务节点和管理在线人数统计
*/
public class ZookeeperRegister {
/**
* 注册Netty服务到Zookeeper
* @param nodeName 节点名称(如服务名称)
* @param ip Netty服务IP地址
* @param port Netty服务端口号
* @throws Exception 可能抛出的异常
*/
public static void registerNettyServer(String nodeName,
String ip,
Integer port) throws Exception {
// 获取Zookeeper客户端连接
CuratorFramework zkClient = CuratorConfig.getClient();
String path = "/" + nodeName;
// 检查父节点是否存在,不存在则创建持久化节点
Stat stat = zkClient.checkExists().forPath(path);
if (stat == null) {
zkClient.create()
.creatingParentsIfNeeded() // 自动创建父节点
.withMode(CreateMode.PERSISTENT) // 持久化节点
.forPath(path);
} else {
System.out.println(stat.toString());
}
// 创建临时顺序节点存储Netty服务信息(EPHEMERAL_SEQUENTIAL表示临时顺序节点)
NettyServerNode serverNode = new NettyServerNode();
serverNode.setIp(ip);
serverNode.setPort(port);
String nodeJson = JsonUtils.objectToJson(serverNode); // 对象转JSON
zkClient.create()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL) // 临时顺序节点
.forPath(path + "/im-", nodeJson.getBytes()); // 节点路径格式:/nodeName/im-0000000001
}
/**
* 获取本机IP地址
* @return 本机IP地址
* @throws Exception 可能抛出的异常
*/
public static String getLocalIp() throws Exception {
InetAddress addr = InetAddress.getLocalHost();
String ip = addr.getHostAddress();
System.out.println("本机IP地址:" + ip);
return ip;
}
/**
* 增加在线人数统计
* @param serverNode Netty服务节点信息
* @throws Exception 可能抛出的异常
*/
public static void incrementOnlineCounts(NettyServerNode serverNode) throws Exception {
dealOnlineCounts(serverNode, 1); // 增加1个在线人数
}
/**
* 减少在线人数统计
* @param serverNode Netty服务节点信息
* @throws Exception 可能抛出的异常
*/
public static void decrementOnlineCounts(NettyServerNode serverNode) throws Exception {
dealOnlineCounts(serverNode, -1); // 减少1个在线人数
}
/**
* 处理在线人数的增减操作(核心方法)
* @param serverNode Netty服务节点信息
* @param counts 变化量(+1表示增加,-1表示减少)
* @throws Exception 可能抛出的异常
*/
public static void dealOnlineCounts(NettyServerNode serverNode,
Integer counts) throws Exception {
// 获取Zookeeper客户端连接
CuratorFramework zkClient = CuratorConfig.getClient();
// 创建分布式读写锁(防止并发修改问题)
InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(
zkClient, "/rw-locks");
readWriteLock.writeLock().acquire(); // 获取写锁
try {
String path = "/server-list";
// 获取所有子节点
List<String> list = zkClient.getChildren().forPath(path);
// 遍历所有节点
for (String node : list) {
String pendingNodePath = path + "/" + node;
// 获取节点数据
String nodeValue = new String(zkClient.getData().forPath(pendingNodePath));
// 反序列化为NettyServerNode对象
NettyServerNode pendingNode = JsonUtils.jsonToPojo(nodeValue, NettyServerNode.class);
// 匹配IP和端口的服务节点
if (pendingNode.getIp().equals(serverNode.getIp()) &&
(pendingNode.getPort().intValue() == serverNode.getPort().intValue())) {
// 更新在线人数
pendingNode.setOnlineCounts(pendingNode.getOnlineCounts() + counts);
String nodeJson = JsonUtils.objectToJson(pendingNode);
// 写回Zookeeper
zkClient.setData().forPath(pendingNodePath, nodeJson.getBytes());
}
}
} finally {
readWriteLock.writeLock().release(); // 释放写锁
}
}
}
然后启动服务时将节点注册到Zookeeper上
public static void main(String[] args) throws Exception {
// 定义主从线程组
// 定义主线程池,用于接受客户端的连接,但是不做任何处理,比如老板会谈业务,拉到业务就会交给下面的员工去做了
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 定义从线程池,处理主线程池交过来的任务,公司业务员开展业务,完成老板交代的任务
EventLoopGroup workerGroup = new NioEventLoopGroup();
// Netty服务启动的时候,从redis中查找有没有端口,如果没有则用875,如果有则把端口累加1(或10)再启动
Integer nettyPort = selectPort(875);
// 注册当前netty服务到zookeeper中
ZookeeperRegister.registerNettyServer("server-list",
ZookeeperRegister.getLocalIp(),
nettyPort);
try {
// 构建Netty服务器
ServerBootstrap server = new ServerBootstrap(); // 服务的启动类
server.group(bossGroup, workerGroup) // 把主从线程池组放入到启动类中
.channel(NioServerSocketChannel.class) // 设置Nio的双向通道
.childHandler(new WSServerInitializer()); // 设置处理器,用于处理workerGroup
// 启动server,并且绑定自动分配的端口号,同时启动方式为"同步"
ChannelFuture channelFuture = server.bind(nettyPort).sync();
// 监听关闭的channel
channelFuture.channel().closeFuture().sync();
} finally {
// 优雅的关闭线程池组
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
// 移除现有的redis与netty的端口关系
removePort(nettyPort);
}
}
Chat-Web服务根据人数最少策略拿到Netty地址
<!-- zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.3.0</version>
</dependency>
zookeeper:
curator:
host: 127.0.0.1:3191
connectionTimeoutMs: 30000
sessionTimeoutMs: 3000
sleepMsBetweenRetry: 2000
maxRetries: 3
namespace: itzixi-im
@Component
@ConfigurationProperties(prefix = "zookeeper.curator")
@Data
public class CuratorConfig extends BaseInfoProperties {
private String host; // 单机/集群的ip:port地址
private Integer connectionTimeoutMs; // 连接超时时间
private Integer sessionTimeoutMs; // 会话超时时间
private Integer sleepMsBetweenRetry; // 每次重试的间隔时间
private Integer maxRetries; // 最大重试次数
private String namespace; // 命名空间(root根节点名称)
public static final String path = "/server-list";
@Bean("curatorClient")
public CuratorFramework curatorClient() {
// 三秒后重连一次,只连一次
//RetryPolicy retryOneTime = new RetryOneTime(3000);
// 每3秒重连一次,重连3次
//RetryPolicy retryNTimes = new RetryNTimes(3, 3000);
// 每3秒重连一次,总等待时间超过10秒则停止重连
//RetryPolicy retryPolicy = new RetryUntilElapsed(10 * 1000, 3000);
// 随着重试次数的增加,重试的间隔时间也会增加(推荐)
RetryPolicy backoffRetry = new ExponentialBackoffRetry(sleepMsBetweenRetry, maxRetries);
// 声明初始化客户端
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(host)
.connectionTimeoutMs(connectionTimeoutMs)
.sessionTimeoutMs(sessionTimeoutMs)
.retryPolicy(backoffRetry)
.namespace(namespace)
.build();
client.start(); // 启动curator客户端
return client;
}
@Resource(name = "curatorClient")
private CuratorFramework zkClient;
@PostMapping("getNettyOnlineInfo")
public GraceJSONResult getNettyOnlineInfo() throws Exception {
// 从zookeeper中获得当前已经注册的netty 服务列表
String path = "/server-list";
List<String> list = zkClient.getChildren().forPath(path);
List<NettyServerNode> serverNodeList = new ArrayList<>();
for (String node:list) {
// System.out.println(node);
String nodeValue = new String(zkClient.getData().forPath(path + "/" + node));
// System.out.println(nodeValue);
NettyServerNode serverNode = JsonUtils.jsonToPojo(nodeValue, NettyServerNode.class);
serverNodeList.add(serverNode);
}
// 计算当前哪个zk的node是最少人数连接,获得[ip:port]并且返回给前端
Optional<NettyServerNode> minNodeOptional = serverNodeList
.stream()
.min(Comparator.comparing(nettyServerNode -> nettyServerNode.getOnlineCounts()));
NettyServerNode minNode = minNodeOptional.get();
return Result.ok(minNode);
}
这样前端就可以根据调用此接口获得的Netty节点进行连接
RabbitMQ实现Netty服务对消息的监听消费
我们这里将使用RabbitMQ的topic消息队列将消息广播到所有Netty服务,各个Netty服务进行查找要发送的用户的channel,最终会有一台找到了并且进行发送或者都没找到存储到数据库。
当然我们也可以用Redis实现,只需要将用户ID与Netty服务的节点进行绑定,当发送消息时去Redis找到要发送的用户channel所在的节点,使用RabbitMQ发送到对应节点的队列即可,可以不用广播到所有Netty节点了。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
/**
* RabbitMQ连接工具类 - 提供RabbitMQ连接池管理和消息收发功能
*/
public class RabbitMQConnectUtils {
// 连接池集合,用于复用连接
private final List<Connection> connections = new ArrayList<>();
// 连接池最大连接数限制
private final int maxConnection = 20;
// RabbitMQ服务器配置
private final String host = "127.0.0.1";
private final int port = 5682;
private final String username = "root";
private final String password = "1234";
private final String virtualHost = "IM"; // 虚拟主机
// RabbitMQ连接工厂
public ConnectionFactory factory;
/**
* 获取RabbitMQ连接工厂
* @return ConnectionFactory实例
*/
public ConnectionFactory getRabbitMqConnection() {
return getFactory();
}
/**
* 获取连接工厂(单例模式)
* @return 初始化好的ConnectionFactory
*/
public ConnectionFactory getFactory() {
initFactory();
return factory;
}
/**
* 初始化连接工厂配置
*/
private void initFactory() {
try {
if (factory == null) {
factory = new ConnectionFactory();
factory.setHost(host); // 设置主机地址
factory.setPort(port); // 设置端口
factory.setUsername(username); // 设置用户名
factory.setPassword(password); // 设置密码
factory.setVirtualHost(virtualHost); // 设置虚拟主机
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 发送消息到RabbitMQ
* @param message 消息内容
* @param exchange 交换机名称
* @param routingKey 路由键
* @throws Exception 可能抛出的异常
*/
public void sendMsg(String message, String exchange, String routingKey) throws Exception {
// 从连接池获取连接
Connection connection = getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 发布消息(消息持久化)
channel.basicPublish(exchange,
routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("utf-8"));
// 关闭通道
channel.close();
// 归还连接到连接池
setConnection(connection);
}
/**
* 从指定队列获取单条消息
* @param queue 队列名称
* @param autoAck 是否自动确认
* @return GetResponse对象,包含消息内容
* @throws Exception 可能抛出的异常
*/
public GetResponse basicGet(String queue, boolean autoAck) throws Exception {
GetResponse getResponse = null;
// 从连接池获取连接
Connection connection = getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 获取消息
getResponse = channel.basicGet(queue, autoAck);
// 关闭通道
channel.close();
// 归还连接到连接池
setConnection(connection);
return getResponse;
}
/**
* 从连接池获取连接
* @return RabbitMQ连接
* @throws Exception 可能抛出的异常
*/
public Connection getConnection() throws Exception {
return getAndSetConnection(true, null);
}
/**
* 归还连接到连接池
* @param connection 要归还的连接
* @throws Exception 可能抛出的异常
*/
public void setConnection(Connection connection) throws Exception {
getAndSetConnection(false, connection);
}
/**
* 监听指定交换机的队列消息(FANOUT模式)
* @param fanout_exchange 交换机名称
* @param queueName 队列名称
* @throws Exception 可能抛出的异常
*/
public void listen(String fanout_exchange, String queueName) throws Exception {
// 获取连接
Connection connection = getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明FANOUT类型交换机(持久化)
channel.exchangeDeclare(fanout_exchange,
BuiltinExchangeType.FANOUT,
true, false, false, null);
// 声明队列(持久化,非排他,非自动删除)
channel.queueDeclare(queueName, true, false, false, null);
// 绑定队列到交换机(FANOUT模式不需要路由键)
channel.queueBind(queueName, fanout_exchange, "");
// 创建消费者
Consumer consumer = new DefaultConsumer(channel){
/**
* 消息处理回调方法
* @param consumerTag 消费者标签
* @param envelope 消息信封(包含交换机和路由信息)
* @param properties 消息属性
* @param body 消息体
*/
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
// 解析消息内容
String msg = new String(body);
System.out.println("body = " + msg);
String exchange = envelope.getExchange();
System.out.println("exchange = " + exchange);
// 处理fanout_exchange类型的消息
if (exchange.equalsIgnoreCase("fanout_exchange")) {
// 反序列化消息内容
DataContent dataContent = JsonUtils.jsonToPojo(msg, DataContent.class);
String senderId = dataContent.getChatMsg().getSenderId();
String receiverId = dataContent.getChatMsg().getReceiverId();
// 1. 发送消息给接收者(支持多设备)
List<io.netty.channel.Channel> receiverChannels =
UserChannelSession.getMultiChannels(receiverId);
UserChannelSession.sendToTarget(receiverChannels, dataContent);
// 2. 同步消息给发送者的其他设备(排除当前设备)
String currentChannelId = dataContent.getExtend();
List<io.netty.channel.Channel> senderChannels =
UserChannelSession.getMyOtherChannels(senderId, currentChannelId);
UserChannelSession.sendToTarget(senderChannels, dataContent);
}
}
};
// 开始消费消息(自动确认模式)
channel.basicConsume(queueName, true, consumer);
}
/**
* 连接池核心管理方法(线程安全)
* @param isGet true表示获取连接,false表示归还连接
* @param connection 要归还的连接(isGet为false时有效)
* @return 获取到的连接(isGet为true时有效)
* @throws Exception 可能抛出的异常
*/
private synchronized Connection getAndSetConnection(boolean isGet, Connection connection) throws Exception {
// 确保连接工厂已初始化
getRabbitMqConnection();
if (isGet) {
// 获取连接逻辑
if (connections.isEmpty()) {
// 连接池为空,创建新连接
return factory.newConnection();
}
// 从连接池取出第一个连接
Connection newConnection = connections.get(0);
connections.remove(0);
// 检查连接是否有效
if (newConnection.isOpen()) {
return newConnection;
} else {
// 连接已关闭,创建新连接
return factory.newConnection();
}
} else {
// 归还连接逻辑
if (connections.size() < maxConnection) {
// 连接池未满,回收连接
connections.add(connection);
}
// 连接池已满,不回收(连接会被自动关闭)
return null;
}
}
}
修改ChatHandler信息处理类,消息不再在此类中处理,而是发给RabbitMQ
/**
* ChatHandler类
*/
// SimpleChannelInboundHandler: 对于请求来说,相当于入站(入境)
// TextWebSocketFrame: 用于为websocket专门处理的文本数据对象,Frame是数据(消息)的载体
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
// 用于记录和管理所有客户端的channel组
public static ChannelGroup clients =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx,
TextWebSocketFrame msg) throws Exception {
// 获得客户端传输过来的消息
String content = msg.text();
System.out.println("接受到的数据:" + content);
// 1. 获取客户端发来的消息并且解析
DataContent dataContent = JsonUtils.jsonToPojo(content, DataContent.class);
ChatMsg chatMsg = dataContent.getChatMsg();
String msgText = chatMsg.getMsg();
String receiverId = chatMsg.getReceiverId();
String senderId = chatMsg.getSenderId();
// 判断是否黑名单 start
// 如果双方只要有一方是黑名单,则终止发送
Result result = OkHttpUtil.get("http://127.0.0.1:1000/friendship/isBlack?friendId1st=" + receiverId
+ "&friendId2nd=" + senderId);
boolean isBlack = (Boolean)result.getData();
System.out.println("当前的黑名单关系为: " + isBlack);
if (isBlack) {
return;
}
// 判断是否黑名单 end
// 时间校准,以服务器的时间为准
chatMsg.setChatTime(LocalDateTime.now());
Integer msgType = chatMsg.getMsgType();
// 获取channel
Channel currentChannel = ctx.channel();
String currentChannelId = currentChannel.id().asLongText();
String currentChannelIdShort = currentChannel.id().asShortText();
// 2. 判断消息类型,根据不同的类型来处理不同的业务
if (msgType == MsgTypeEnum.CONNECT_INIT.type) {
// 当websocket初次open的时候,初始化channel,把channel和用户userid关联起来
UserChannelSession.putMultiChannels(senderId, currentChannel);
UserChannelSession.putUserChannelIdRelation(currentChannelId, senderId);
NettyServerNode minNode = dataContent.getServerNode();
// System.out.println(minNode);
// 初次连接后,该节点下的在线人数累加
ZookeeperRegister.incrementOnlineCounts(minNode);
// 获得ip+端口,在redis中设置关系,以便在前端设备断线后减少在线人数
Jedis jedis = JedisPoolUtils.getJedis();
jedis.set(senderId, JsonUtils.objectToJson(minNode));
} else if (msgType == MsgTypeEnum.WORDS.type
|| msgType == MsgTypeEnum.IMAGE.type
|| msgType == MsgTypeEnum.VIDEO.type
|| msgType == MsgTypeEnum.VOICE.type
) {
// 此处为mq异步解耦,保存信息到数据库,数据库无法获得信息的主键id,
// 所以此处可以用snowflake直接生成唯一的主键id
Snowflake snowflake = new Snowflake(new IdWorkerConfigBean());
String sid = snowflake.nextId();
System.out.println("sid = " + sid);
String iid = IdWorker.getIdStr();
System.out.println("iid = " + iid);
chatMsg.setMsgId(sid);
// 此处receiverId所对应的channel为空
// 发送消息
// List<Channel> receiverChannels = UserChannelSession.getMultiChannels(receiverId);
// if (receiverChannels == null || receiverChannels.size() == 0 || receiverChannels.isEmpty()) {
// receiverChannels为空,表示用户离线/断线状态,消息不需要发送,后续可以存储到数据库
// chatMsg.setIsReceiverOnLine(false);
// } else {
// chatMsg.setIsReceiverOnLine(true);
if (msgType == MsgTypeEnum.VOICE.type) {
chatMsg.setIsRead(false);
}
dataContent.setChatMsg(chatMsg);
String chatTimeFormat = LocalDateUtils
.format(chatMsg.getChatTime(),
LocalDateUtils.DATETIME_PATTERN_2);
dataContent.setChatTime(chatTimeFormat);
// UserChannelSession.sendToTarget(receiverChannels, dataContent);
// 通过RabbitMQ发送消息
MessagePublisher.sendMsgToOtherNettyServer(JsonUtils.objectToJson(dataContent));
// 当receiverChannels为空不为空的时候,同账户多端设备接受消息
// for (Channel c : receiverChannels) {
// Channel findChannel = clients.find(c.id());
// if (findChannel != null) {
//
// // if (msgType == MsgTypeEnum.VOICE.type) {
// // chatMsg.setIsRead(false);
// // }
// // dataContent.setChatMsg(chatMsg);
// // String chatTimeFormat = LocalDateUtils
// // .format(chatMsg.getChatTime(),
// // LocalDateUtils.DATETIME_PATTERN_2);
// // dataContent.setChatTime(chatTimeFormat);
// // 发送消息给在线的用户
// findChannel.writeAndFlush(
// new TextWebSocketFrame(
// JsonUtils.objectToJson(dataContent)));
// }
//
// }
// }
// TODO: 消息持久化到数据库(通过MQ异步处理或者其他方式)
}
// 此处也不需要了,都在mq的监听中完成
// dataContent.setChatMsg(chatMsg);
// String chatTimeFormat = LocalDateUtils
// .format(chatMsg.getChatTime(),
// LocalDateUtils.DATETIME_PATTERN_2);
// dataContent.setChatTime(chatTimeFormat);
// dataContent.setExtend(currentChannelId);
//
// List<Channel> myOtherChannels = UserChannelSession
// .getMyOtherChannels(senderId, currentChannelId);
// UserChannelSession.sendToMyOthers(myOtherChannels, dataContent);
// for (Channel c : myOtherChannels) {
// Channel findChannel = clients.find(c.id());
// if (findChannel != null) {
// // dataContent.setChatMsg(chatMsg);
// // String chatTimeFormat = LocalDateUtils
// // .format(chatMsg.getChatTime(),
// // LocalDateUtils.DATETIME_PATTERN_2);
// // dataContent.setChatTime(chatTimeFormat);
// // 同步消息给在线的其他设备端
// findChannel.writeAndFlush(
// new TextWebSocketFrame(
// JsonUtils.objectToJson(dataContent)));
// }
// }
// currentChannel.writeAndFlush(new TextWebSocketFrame(currentChannelId));
// clients.writeAndFlush(new TextWebSocketFrame(currentChannelId));
// 调试输出当前会话状态
UserChannelSession.outputMulti();
}
/**
* 客户端连接到服务端之后(打开链接)
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel currentChannel = ctx.channel();
String currentChannelId = currentChannel.id().asLongText();
System.out.println("客户端建立连接,channel对应的长id为:" + currentChannelId);
// 获得客户端的channel,并且存入到ChannelGroup中进行管理(作为一个客户端群组)
clients.add(currentChannel);
}
/**
* 关闭连接,移除channel
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel currentChannel = ctx.channel();
String currentChannelId = currentChannel.id().asLongText();
System.out.println("客户端关闭连接,channel对应的长id为:" + currentChannelId);
// 移除多余的会话
String userId = UserChannelSession.getUserIdByChannelId(currentChannelId);
UserChannelSession.removeUselessChannels(userId, currentChannelId);
clients.remove(currentChannel);
// zk中在线人数累减
Jedis jedis = JedisPoolUtils.getJedis();
NettyServerNode minNode = JsonUtils.jsonToPojo(jedis.get(userId),
NettyServerNode.class);
ZookeeperRegister.decrementOnlineCounts(minNode);
}
/**
* 发生异常并且捕获,移除channel
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel currentChannel = ctx.channel();
String currentChannelId = currentChannel.id().asLongText();
System.out.println("发生异常捕获,channel对应的长id为:" + currentChannelId);
// 发生异常之后关闭连接(关闭channel)
ctx.channel().close();
// 随后从ChannelGroup中移除对应的channel
clients.remove(currentChannel);
// 移除多余的会话
String userId = UserChannelSession.getUserIdByChannelId(currentChannelId);
UserChannelSession.removeUselessChannels(userId, currentChannelId);
// zk中在线人数累减
Jedis jedis = JedisPoolUtils.getJedis();
NettyServerNode minNode = JsonUtils.jsonToPojo(jedis.get(userId),
NettyServerNode.class);
ZookeeperRegister.decrementOnlineCounts(minNode);
}
}
public class RabbitMQConnectUtils {
private final List<Connection> connections = new ArrayList<>();
private final int maxConnection = 20;
private final String host = "127.0.0.1";
private final int port = 5682;
private final String username = "root";
private final String password = "1234";
private final String virtualHost = "IM";
public ConnectionFactory factory;
public ConnectionFactory getRabbitMqConnection() {
return getFactory();
}
public ConnectionFactory getFactory() {
initFactory();
return factory;
}
private void initFactory() {
try {
if (factory == null) {
factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void sendMsg(String message, String exchange, String routingKey) throws Exception {
Connection connection = getConnection();
Channel channel = connection.createChannel();
channel.basicPublish(exchange,
routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("utf-8"));
channel.close();
setConnection(connection);
}
public GetResponse basicGet(String queue, boolean autoAck) throws Exception {
GetResponse getResponse = null;
Connection connection = getConnection();
Channel channel = connection.createChannel();
getResponse = channel.basicGet(queue, autoAck);
channel.close();
setConnection(connection);
return getResponse;
}
public Connection getConnection() throws Exception {
return getAndSetConnection(true, null);
}
public void setConnection(Connection connection) throws Exception {
getAndSetConnection(false, connection);
}
public void listen(String fanout_exchange, String queueName) throws Exception {
Connection connection = getConnection();
Channel channel = connection.createChannel();
// FANOUT 发布订阅模式(广播模式)
channel.exchangeDeclare(fanout_exchange,
BuiltinExchangeType.FANOUT,
true, false, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, fanout_exchange, "");
Consumer consumer = new DefaultConsumer(channel){
/**
* 重写消息配送方法
* @param consumerTag 消息的标签(标识)
* @param envelope 信封(一些信息,比如交换机路由等等信息)
* @param properties 配置信息
* @param body 收到的消息数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body);
System.out.println("body = " + msg);
String exchange = envelope.getExchange();
System.out.println("exchange = " + exchange);
if (exchange.equalsIgnoreCase("fanout_exchange")) {
DataContent dataContent = JsonUtils.jsonToPojo(msg, DataContent.class);
String senderId = dataContent.getChatMsg().getSenderId();
String receiverId = dataContent.getChatMsg().getReceiverId();
// 广播至集群的其他节点并且发送给用户聊天信息
List<io.netty.channel.Channel> receiverChannels =
UserChannelSession.getMultiChannels(receiverId);
UserChannelSession.sendToTarget(receiverChannels, dataContent);
// 广播至集群的其他节点并且同步给自己其他设备聊天信息
String currentChannelId = dataContent.getExtend();
List<io.netty.channel.Channel> senderChannels =
UserChannelSession.getMyOtherChannels(senderId, currentChannelId);
UserChannelSession.sendToTarget(senderChannels, dataContent);
}
}
};
/**
* queue: 监听的队列名
* autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
* callback: 回调函数,处理监听到的消息
*/
channel.basicConsume(queueName, true, consumer);
}
private synchronized Connection getAndSetConnection(boolean isGet, Connection connection) throws Exception {
getRabbitMqConnection();
if (isGet) {
if (connections.isEmpty()) {
return factory.newConnection();
}
Connection newConnection = connections.get(0);
connections.remove(0);
if (newConnection.isOpen()) {
return newConnection;
} else {
return factory.newConnection();
}
} else {
if (connections.size() < maxConnection) {
connections.add(connection);
}
return null;
}
}
}
public class MessagePublisher {
public static void sendMsgToOtherNettyServer(String msg) throws Exception {
RabbitMQConnectUtils connectUtils = new RabbitMQConnectUtils();
String fanout_exchange = "fanout_exchange";
connectUtils.sendMsg(msg, fanout_exchange, "");
}
}
Chat-Web服务监听Zookeeper节点清理Redis与RabbitMQ残留数据
spring:
rabbitmq:
host: 127.0.0.1
port: 5682
username: root
password: 1234
virtual-host: wechat-dev
删除队列只可以使用RabbitAdmin,RabbitTemplate无法删除
/**
* RabbitAdmin的配置类
*/
@Configuration
public class RabbitAdminConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private Integer port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
/**
* 构建RabbitMQ的连接工厂
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setPassword(username);
connectionFactory.setUsername(password);
connectionFactory.setVirtualHost(virtualHost);
return connectionFactory;
}
/**
* 构建RabbitAdmin
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
return new RabbitAdmin(connectionFactory);
}
}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryPolicy;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
/**
* Zookeeper Curator 客户端配置类
* 功能:
* 1. 初始化Curator客户端连接
* 2. 监听Zookeeper节点变化
* 3. 处理节点删除时的清理工作(Redis/RabbitMQ)
*/
@Slf4j
@Component
@ConfigurationProperties(prefix = "zookeeper.curator") // 从配置文件中读取前缀为zookeeper.curator的属性
@Data // Lombok注解,自动生成getter/setter
public class CuratorConfig extends BaseInfoProperties {
// Zookeeper连接配置
private String host; // Zookeeper服务器地址(格式:ip:port)
private Integer connectionTimeoutMs; // 连接超时时间(毫秒)
private Integer sessionTimeoutMs; // 会话超时时间(毫秒)
private Integer sleepMsBetweenRetry; // 重试间隔时间(毫秒)
private Integer maxRetries; // 最大重试次数
private String namespace; // 命名空间(相当于根节点)
// 监听的Zookeeper路径
public static final String path = "/server-list";
// Redis和RabbitMQ操作模板
@Autowired
private RedisTemplate redisTemplate;
@Resource
private RabbitAdmin rabbitAdmin;
/**
* 创建CuratorFramework客户端Bean
* @return 配置好的Curator客户端实例
*/
@Bean("curatorClient")
public CuratorFramework curatorClient() {
// 使用指数退避策略进行重试(推荐)
// 参数:初始重试间隔时间,最大重试次数
RetryPolicy backoffRetry = new ExponentialBackoffRetry(sleepMsBetweenRetry, maxRetries);
// 构建Curator客户端
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(host) // Zookeeper服务器地址
.connectionTimeoutMs(connectionTimeoutMs) // 连接超时时间
.sessionTimeoutMs(sessionTimeoutMs) // 会话超时时间
.retryPolicy(backoffRetry) // 重试策略
.namespace(namespace) // 命名空间隔离
.build();
client.start(); // 启动客户端
// 注册节点监听器
add(path, client);
return client;
}
/**
* 注册Zookeeper节点监听器
* @param path 监听的节点路径
* @param client Curator客户端实例
*/
public void add(String path, CuratorFramework client) {
// 创建节点缓存
CuratorCache curatorCache = CuratorCache.build(client, path);
// 添加监听器
curatorCache.listenable().addListener((type, oldData, data) -> {
// type: 事件类型(NODE_CREATED, NODE_CHANGED, NODE_DELETED)
// oldData: 事件发生前的节点数据
// data: 事件发生后的节点数据
switch (type.name()) {
case "NODE_CREATED":
log.info("(子)节点创建");
break;
case "NODE_CHANGED":
log.info("(子)节点数据变更");
break;
case "NODE_DELETED":
log.info("(子)节点删除");
// 反序列化被删除节点的数据
NettyServerNode oldNode = JsonUtils.jsonToPojo(
new String(oldData.getData()),
NettyServerNode.class
);
log.info("被删除节点路径: {}, 节点值: {}", oldData.getPath(), oldNode);
// 1. 清理Redis中的相关数据
String oldPort = oldNode.getPort() + "";
String portKey = "netty_port";
redis.hdel(portKey, oldPort); // 删除Redis中存储的端口信息
// 2. 删除RabbitMQ中对应的队列
String queueName = "netty_queue_" + oldPort;
rabbitAdmin.deleteQueue(queueName); // 删除RabbitMQ队列
break;
default:
log.info("未处理的事件类型: {}", type);
break;
}
});
curatorCache.start(); // 启动监听
}
}
七、结语
在分布式系统日益普及的今天,Netty 作为高性能网络通信框架,其单体架构与集群架构的选择需紧密结合业务需求、团队能力和资源投入进行权衡。
单体架构 以 简单、低延迟、低成本 为核心优势,适合快速验证、轻量级应用或资源受限的场景。然而,其 单点故障风险 和 性能天花板 决定了它难以支撑大规模并发或高可用性要求,长期来看可能成为业务增长的瓶颈。
集群架构 通过 分布式扩展、容灾能力和负载均衡 解决了单体架构的痛点,是支撑高并发、高稳定性系统的关键方案。但随之而来的是 复杂性提升、性能开销增加 以及 运维成本高企 等挑战,需要团队具备分布式系统设计、监控治理和故障恢复的成熟经验。
实践建议:
初期优先单体:在业务初期或内部工具开发中,优先选择单体架构以快速迭代,降低开发成本。
渐进式迁移:当并发量接近单机极限(如 10K+ 连接)或可用性要求提升时,通过服务拆分、网关层抽象或消息队列(如 Kafka)逐步向集群过渡。
技术选型平衡:集群架构中需合理选择负载均衡策略(如轮询、最少连接)、序列化协议(如 Protobuf、JSON)和一致性模型(如最终一致性),避免过度设计。
关注可观测性:集群环境下需加强日志聚合(ELK)、分布式追踪(SkyWalking)和链路压测,确保问题可定位、性能可优化。
最终目标:无论选择单体还是集群,均应以 业务价值 为导向,避免为“分布式而分布式”。在技术复杂性与业务需求间找到平衡点,才能构建出既高效又稳定的网络通信系统。
上述三种方案可大致实现Netty集群,如果有更高性能的方案或者疑问欢迎评论区留言讨论!