加群联系作者vx:xiaoda0423
仓库地址:https://webvueblog.github.io/JavaPlusDoc/
https://1024bat.cn/
https://github.com/webVueBlog/fastapi_plus
https://webvueblog.github.io/JavaPlusDoc/
点击勘误issues,哪吒感谢大家的阅读
1. 正常情况下(网关没挂)
假设有一台设备,ID 是 ABC123
它连接到 网关1 (192.168.1.10)
网关1 会在 Redis 里写一条记录:
key = device:gateway:ABC123
value = 192.168.1.10
TTL = 120秒
网关1 每收到设备的心跳,就 把这条记录的 TTL 续回 120 秒(就像闹钟一直推迟)。
Nginx 用 一致性哈希,保证这个设备始终连到 网关1。
2. 网关宕机后会发生什么
假设 网关1 突然断电 / 崩溃:
设备的 TCP 连接断了
因为网关死了,设备发的数据没人收,连接会超时或直接被操作系统断开。
Redis 里的 key 不再续期
-
原来网关1 每次收到心跳就延长 TTL
现在它挂了,所以 TTL 会一直往下掉,直到 120 秒后过期。
设备尝试重新连接
-
设备断开后会按程序设置重连,重新通过域名
iot-gateway.example.com:9000
去连 Nginx。
3. Nginx 的一致性哈希重新分配
Nginx 收到设备的重新连接请求时,会用设备的 IP 去算一个哈希值,然后决定转发到哪台网关。
由于 网关1 已经宕机,Nginx 会自动把流量转到 网关2 或 网关3。
这样,设备就会直接连到新的网关。
4. 新网关接管设备
新网关(比如网关2)收到设备连接时,会重新在 Redis 写入映射:
key = device:gateway:ABC123 value = 192.168.1.11 TTL = 120秒
后端业务(通过 Kafka)就会继续收到来自这个设备的数据,整个过程对业务几乎是无感的。
5. 过程总结(小白版)
可以想象成:
Redis 像一个登记簿,记录“哪个设备在哪个网关”;
网关像前台接待,会不断去登记簿上续签(心跳);
网关死了 → 不再续签 → 登记过期 → 设备被 Nginx 重新安排到另一个前台(新网关);
新网关继续接待,并重新登记。
61ed5027-a9eb-45e4-a658-2b67b76ddcf3.png 狠活儿来了⚙️——这是一个可跑的 Spring Boot + Netty 接入网关模板,带上 Redis(连接映射 & 心跳续期) 和 Kafka(消息分发) 。你拿过去改下协议解码就能用。
目录结构
iot-gateway/ ├─ pom.xml ├─ src/main/java/com/example/gw/ │ ├─ GatewayApplication.java │ ├─ config/ │ │ ├─ NettyProperties.java │ │ ├─ KafkaConfig.java │ │ └─ RedisConfig.java │ ├─ netty/ │ │ ├─ NettyServer.java │ │ ├─ initializer/GatewayInitializer.java │ │ ├─ codec/FrameDecoder.java │ │ ├─ codec/MessageDecoder.java │ │ ├─ codec/MessageEncoder.java │ │ └─ handler/ │ │ ├─ AuthRegisterHandler.java │ │ ├─ HeartbeatHandler.java │ │ └─ BusinessDispatcherHandler.java │ ├─ service/ │ │ ├─ ChannelManager.java │ │ ├─ DeviceRouterService.java │ │ └─ HeartbeatRenewService.java │ ├─ mq/KafkaProducer.java │ ├─ model/ │ │ ├─ DeviceMessage.java │ │ └─ Consts.java │ └─ util/Bytes.java └─ src/main/resources/ ├─ application.yml └─ logback-spring.xml
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>iot-gateway</artifactId> <version>1.0.0</version> <properties> <java.version>17</java.version> <spring.boot.version>3.3.1</spring.boot.version> <netty.version>4.1.110.Final</netty.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring.boot.version}</version> <type>pom</type><scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <!-- Spring Boot --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId></dependency> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></dependency> <!-- Netty --> <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>${netty.version}</version></dependency> <!-- Kafka --> <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency> <!-- Redis --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency> <!-- utils --> <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency> <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency> </dependencies> <build> <plugins> <plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin> <plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId> <configuration><source>${java.version}</source><target>${java.version}</target></configuration> </plugin> </plugins> </build> </project>
application.yml
server: port: 8080 netty: port: 7001 bossThreads: 1 workerThreads: 8 backlog: 1024 soRcvbuf: 1048576 soSndbuf: 1048576 soReuseaddr: true soKeepalive: true writeBufWaterMarkLow: 33554432 # 32MB writeBufWaterMarkHigh: 67108864 # 64MB idleReaderSeconds: 90 spring: kafka: bootstrap-servers: 127.0.0.1:9092 producer: acks: all retries: 3 linger-ms: 5 batch-size: 32768 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer data: redis: host: 127.0.0.1 port: 6379 lettuce: pool: max-active: 64 max-idle: 32 min-idle: 8 gateway: kafkaTopic: device.upstream redisDeviceKeyPrefix: device_gateway: host: ${HOSTNAME:gw-1} # 当前网关标识(或 IP) logging: level: root: info com.example.gw: info
GatewayApplication.java
package com.example.gw; import com.example.gw.netty.NettyServer; import lombok.RequiredArgsConstructor; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import jakarta.annotation.PostConstruct; @SpringBootApplication @RequiredArgsConstructor public class GatewayApplication { private final NettyServer nettyServer; public static void main(String[] args) { // 重要:提升文件句柄数(Linux: ulimit -n 1000000) SpringApplication.run(GatewayApplication.class, args); } @PostConstruct public void startNetty() { nettyServer.start(); } }
config/NettyProperties.java
package com.example.gw.config; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; @Data @Component @ConfigurationProperties(prefix = "netty") public class NettyProperties { private int port; private int bossThreads; private int workerThreads; private int backlog; private int soRcvbuf; private int soSndbuf; private boolean soReuseaddr; private boolean soKeepalive; private int writeBufWaterMarkLow; private int writeBufWaterMarkHigh; private int idleReaderSeconds; }
config/RedisConfig.java
package com.example.gw.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.core.StringRedisTemplate; @Configuration public class RedisConfig { @Bean public StringRedisTemplate stringRedisTemplate(LettuceConnectionFactory factory) { return new StringRedisTemplate(factory); } }
config/KafkaConfig.java
package com.example.gw.config; import org.springframework.context.annotation.Configuration; @Configuration public class KafkaConfig { // Spring Boot autoconfig 已足够;如需拦截器、分区器可在此扩展 }
netty/NettyServer.java
package com.example.gw.netty; import com.example.gw.config.NettyProperties; import com.example.gw.netty.initializer.GatewayInitializer; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.ResourceLeakDetector; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @Slf4j @Component @RequiredArgsConstructor public class NettyServer { private final NettyProperties props; private final GatewayInitializer initializer; public void start() { ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED); boolean epoll = Epoll.isAvailable(); EventLoopGroup boss = epoll ? new EpollEventLoopGroup(props.getBossThreads()) : new NioEventLoopGroup(props.getBossThreads()); EventLoopGroup worker = epoll ? new EpollEventLoopGroup(props.getWorkerThreads()) : new NioEventLoopGroup(props.getWorkerThreads()); try { ServerBootstrap b = new ServerBootstrap() .group(boss, worker) .channel(epoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, props.getBacklog()) .option(ChannelOption.SO_REUSEADDR, props.isSoReuseaddr()) .childOption(ChannelOption.SO_KEEPALIVE, props.isSoKeepalive()) .childOption(ChannelOption.SO_RCVBUF, props.getSoRcvbuf()) .childOption(ChannelOption.SO_SNDBUF, props.getSoSndbuf()) .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(props.getWriteBufWaterMarkLow(), props.getWriteBufWaterMarkHigh())) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(initializer); ChannelFuture f = b.bind(props.getPort()).sync(); log.info("Netty gateway started on port {}", props.getPort()); f.channel().closeFuture().addListener(cf -> { boss.shutdownGracefully(); worker.shutdownGracefully(); }); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("Netty start interrupted", e); } catch (Exception e) { log.error("Netty start failed", e); boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
initializer/GatewayInitializer.java
package com.example.gw.netty.initializer; import com.example.gw.config.NettyProperties; import com.example.gw.netty.codec.FrameDecoder; import com.example.gw.netty.codec.MessageDecoder; import com.example.gw.netty.codec.MessageEncoder; import com.example.gw.netty.handler.AuthRegisterHandler; import com.example.gw.netty.handler.HeartbeatHandler; import com.example.gw.netty.handler.BusinessDispatcherHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.timeout.IdleStateHandler; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; @Component @RequiredArgsConstructor public class GatewayInitializer extends ChannelInitializer<SocketChannel> { private final NettyProperties props; private final AuthRegisterHandler authRegisterHandler; private final HeartbeatHandler heartbeatHandler; private final BusinessDispatcherHandler dispatcherHandler; @Override protected void initChannel(SocketChannel ch) { ch.pipeline() // 拆包:根据协议做帧界定(示例长度前缀/分隔符/固定头尾,自选) .addLast("frameDecoder", new FrameDecoder()) // 协议解码(把ByteBuf→DeviceMessage) .addLast("msgDecoder", new MessageDecoder()) // 协议编码(下行时DeviceMessage→ByteBuf) .addLast("msgEncoder", new MessageEncoder()) // 空闲检测(读空闲触发心跳处理/断开) .addLast("idle", new IdleStateHandler(props.getIdleReaderSeconds(), 0, 0)) // 注册鉴权(首包携带deviceId等) .addLast("auth", authRegisterHandler) // 心跳 .addLast("hb", heartbeatHandler) // 分发到Kafka .addLast("dispatcher", dispatcherHandler); } }
codec/FrameDecoder.java(示例:简单定界或长度前缀,自己换成 JT808 也行)
package com.example.gw.netty.codec; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; /** 示例:长度前缀(2字节) + body */ public class FrameDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { if (in.readableBytes() < 2) return; in.markReaderIndex(); int len = in.readUnsignedShort(); if (in.readableBytes() < len) { in.resetReaderIndex(); return; } ByteBuf frame = in.readRetainedSlice(len); out.add(frame); } }
codec/MessageDecoder.java(把帧→业务对象)
package com.example.gw.netty.codec; import com.example.gw.model.DeviceMessage; import com.example.gw.util.Bytes; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import java.util.List; /** Demo 协议:| devLen(1) | devId(N) | msgType(1) | payload(剩余) | */ public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) { int devLen = buf.readUnsignedByte(); byte[] devBytes = new byte[devLen]; buf.readBytes(devBytes); String deviceId = new String(devBytes); int msgType = buf.readUnsignedByte(); byte[] payload = new byte[buf.readableBytes()]; buf.readBytes(payload); DeviceMessage msg = new DeviceMessage(); msg.setDeviceId(deviceId); msg.setMsgType(msgType); msg.setPayload(payload); msg.setTs(System.currentTimeMillis()); out.add(msg); } }
codec/MessageEncoder.java(下行需要时用)
package com.example.gw.netty.codec; import com.example.gw.model.DeviceMessage; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; public class MessageEncoder extends MessageToByteEncoder<DeviceMessage> { @Override protected void encode(ChannelHandlerContext ctx, DeviceMessage msg, ByteBuf out) { byte[] dev = msg.getDeviceId().getBytes(); out.writeShort(1 + dev.length + 1 + (msg.getPayload() == null ? 0 : msg.getPayload().length)); // length out.writeByte(dev.length); out.writeBytes(dev); out.writeByte(msg.getMsgType()); if (msg.getPayload() != null) out.writeBytes(msg.getPayload()); } }
model/DeviceMessage.java
package com.example.gw.model; import lombok.Data; @Data public class DeviceMessage { private String deviceId; private int msgType; // 自定义或协议中的消息类型 private byte[] payload; // 原始负载 private long ts; // 网关接收时间 }
model/Consts.java
package com.example.gw.model; public interface Consts { int TYPE_REGISTER = 0x01; int TYPE_HEARTBEAT = 0x02; int TYPE_GPS = 0x10; }
service/ChannelManager.java(内存 + Redis 映射)
package com.example.gw.service; import io.netty.channel.Channel; import io.netty.channel.ChannelId; import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @Service @RequiredArgsConstructor public class ChannelManager { private final StringRedisTemplate redis; @Value("${gateway.redisDeviceKeyPrefix}") private String keyPrefix; @Value("${gateway.host}") private String host; // 本机内存:deviceId -> Channel private final Map<String, Channel> deviceChannel = new ConcurrentHashMap<>(); // 反查:channelId -> deviceId private final Map<ChannelId, String> channelDevice = new ConcurrentHashMap<>(); public void bind(String deviceId, Channel ch) { deviceChannel.put(deviceId, ch); channelDevice.put(ch.id(), deviceId); // 写入 Redis(60s 续期) redis.opsForValue().set(keyPrefix + deviceId, host, 60, TimeUnit.SECONDS); } public void renew(String deviceId) { redis.expire(keyPrefix + deviceId, 60, TimeUnit.SECONDS); } public void unbind(Channel ch) { String deviceId = channelDevice.remove(ch.id()); if (deviceId != null) { deviceChannel.remove(deviceId); // 不立即删 Redis,交给 TTL 过期;也可显式删除: // redis.delete(keyPrefix + deviceId); } } public Channel getChannel(String deviceId) { return deviceChannel.get(deviceId); } public String getDeviceId(Channel ch) { return channelDevice.get(ch.id()); } }
service/DeviceRouterService.java(查 Redis 看设备在哪台网关)
package com.example.gw.service; import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; @Service @RequiredArgsConstructor public class DeviceRouterService { private final StringRedisTemplate redis; @Value("${gateway.redisDeviceKeyPrefix}") private String keyPrefix; public String getGatewayHost(String deviceId) { return redis.opsForValue().get(keyPrefix + deviceId); } }
service/HeartbeatRenewService.java(定时续期:可选)
package com.example.gw.service; import lombok.RequiredArgsConstructor; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; /** 如果设备心跳很频繁,可在收到心跳时直接续期,不需要此任务。此类仅演示。 */ @Service @RequiredArgsConstructor public class HeartbeatRenewService { private final ChannelManager channelManager; @Scheduled(fixedDelay = 30000) public void renewAll() { // 可遍历已知 deviceId 做 redis TTL 续期(不建议大规模遍历,这里仅示例) } }
netty/handler/AuthRegisterHandler.java(设备首包注册)
package com.example.gw.netty.handler; import com.example.gw.model.Consts; import com.example.gw.model.DeviceMessage; import com.example.gw.service.ChannelManager; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @Slf4j @Component @ChannelHandler.Sharable @RequiredArgsConstructor public class AuthRegisterHandler extends SimpleChannelInboundHandler<DeviceMessage> { private final ChannelManager channelManager; @Override protected void channelRead0(ChannelHandlerContext ctx, DeviceMessage msg) { if (msg.getMsgType() == Consts.TYPE_REGISTER) { String deviceId = msg.getDeviceId(); channelManager.bind(deviceId, ctx.channel()); log.info("Device registered: {} via {}", deviceId, ctx.channel().remoteAddress()); // TODO: 回应注册应答(按协议编码) // ctx.writeAndFlush(ackMessage); return; // 注册消息不再下发,直接吃掉 } ctx.fireChannelRead(msg); // 交给下一个 } @Override public void channelInactive(ChannelHandlerContext ctx) { channelManager.unbind(ctx.channel()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.warn("Auth handler error", cause); ctx.close(); } }
netty/handler/HeartbeatHandler.java
package com.example.gw.netty.handler; import com.example.gw.model.Consts; import com.example.gw.model.DeviceMessage; import com.example.gw.service.ChannelManager; import io.netty.channel.*; import io.netty.handler.timeout.IdleStateEvent; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @Slf4j @Component @ChannelHandler.Sharable @RequiredArgsConstructor public class HeartbeatHandler extends SimpleChannelInboundHandler<DeviceMessage> { private final ChannelManager channelManager; @Override protected void channelRead0(ChannelHandlerContext ctx, DeviceMessage msg) { if (msg.getMsgType() == Consts.TYPE_HEARTBEAT) { String deviceId = channelManager.getDeviceId(ctx.channel()); if (deviceId != null) channelManager.renew(deviceId); // TODO: 心跳应答(可回包) return; } ctx.fireChannelRead(msg); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { if (evt instanceof IdleStateEvent e) { log.info("Idle close: {}", ctx.channel().remoteAddress()); ctx.close(); } else { ctx.fireUserEventTriggered(evt); } } }
netty/handler/BusinessDispatcherHandler.java(推送到 Kafka)
package com.example.gw.netty.handler; import com.example.gw.model.DeviceMessage; import com.example.gw.mq.KafkaProducer; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import io.netty.channel.*; @Slf4j @Component @ChannelHandler.Sharable @RequiredArgsConstructor public class BusinessDispatcherHandler extends SimpleChannelInboundHandler<DeviceMessage> { private final KafkaProducer producer; private final ObjectMapper mapper = new ObjectMapper(); @Value("${gateway.kafkaTopic}") private String topic; @Override protected void channelRead0(ChannelHandlerContext ctx, DeviceMessage msg) throws Exception { // 这里可以做协议转 JSON 的映射(示例直接吐原字段) String json = mapper.writeValueAsString(msg); producer.send(topic, msg.getDeviceId(), json); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.warn("Dispatcher error", cause); ctx.close(); } }
mq/KafkaProducer.java
package com.example.gw.mq; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @Slf4j @Component @RequiredArgsConstructor public class KafkaProducer { private final KafkaTemplate<String, String> kafkaTemplate; public void send(String topic, String key, String value) { kafkaTemplate.send(topic, key, value) .whenComplete((res, ex) -> { if (ex != null) { log.warn("Kafka send fail: {}", ex.getMessage()); } else { RecordMetadata md = res.getRecordMetadata(); log.debug("Kafka ok topic={} partition={} offset={}", md.topic(), md.partition(), md.offset()); } }); } }
util/Bytes.java(占位:你可以放 CRC、BCE、JT808 转义等)
package com.example.gw.util; public class Bytes { public static int u8(byte b) { return b & 0xFF; } }
logback-spring.xml(可选)
<configuration> <springProperty scope="context" name="APP_NAME" source="spring.application.name" defaultValue="iot-gateway"/> <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> <encoder><pattern>%d{HH:mm:ss.SSS} %-5level [%thread] %logger{36} - %msg%n</pattern></encoder> </appender> <root level="INFO"><appender-ref ref="CONSOLE"/></root> </configuration>
如何跑起来
起 Redis、Kafka
application.yml
按你机器改好端口/IPmvn spring-boot:run
(可选)前置 Nginx/LVS 用 一致性 hash(deviceId) 转发到多台网关,确保“同一设备优先落到同一台机”。
Nginx 简单示例(stream 层 TCP 转发):
stream { upstream gw_hash { hash $remote_addr consistent; # 生产建议自行接入支持按deviceId哈希的四层LB server 10.0.0.11:7001; server 10.0.0.12:7001; server 10.0.0.13:7001; } server { listen 7001; proxy_pass gw_hash; } }
伸缩&高可用要点(别跳过)
一台机承载连接:合理配置
ulimit -n
、开启epoll
、直连内存+堆外缓冲可到 10~15 万长连接/台(看内存/网卡/内核参数)。多机扩:前置 LB(IP Hash / Consistent Hash)+ Redis 的
deviceId→host
映射 + 心跳 TTL 续期。宕机迁移:网关挂了 TCP 断开 → 设备自动重连 → 打到其它网关 → 注册时覆盖 Redis host 即完成接管。
业务解耦:Netty 只做“接入+协议解析+转 Kafka”,下游微服务消费 Kafka 做业务。
反向下发:业务若要下发,先查 Redis 看设备在哪台网关,再通过网关本地 ChannelManager 找 Channel 写回(或走 RPC/网关间转发)。
如果你要换成 JT808:
把
FrameDecoder / MessageDecoder / MessageEncoder
换成 808 的转义/校验/拆包实现;AuthRegisterHandler
处理 0x0100 注册应答、0x0002 心跳等;BusinessDispatcherHandler
按 808 消息 ID 把 GPS(0x0200)/报警等转 Kafka。
行,给你一份能落地的方案包:
两套 Nginx:A. 纯 L4 一致性哈希(最稳) 、B. 按 deviceId 做一致性哈希(OpenResty/Lua,适合强粘性)
扩容与过亿设备分层架构建议
宕机/容灾做法
阿里云部署实操路线(GA/NLB/ACK/ECS)
架构总览(推荐分层)
(全球/全国接入) [Alibaba Cloud GA] ——TCP——> [NLB(多AZ, TCP, ProxyProtocol v2)] │ TCP 四层透传 │ [Nginx Stream 层 × N(可横向扩)] │ L4 源IP一致性哈希 或 L7 解析后按 deviceId 一致性哈希 │ [Netty Gateway 池 × M(多AZ)] │ [Redis Cluster 租约/路由] │ [Kafka/Pulsar]
NLB负责公网/跨 AZ/自动 HC;Nginx 层做一致性哈希与连接扇出;Netty承接长连(10–15 万/台);Redis做
deviceId→host
租约 + 心跳 TTL;Kafka/Pulsar落后端。过亿设备量级:假设 12 万/台,需 ~830 台 Netty;Nginx 与 NLB均可横向扩(连接是端到端驻留,Nginx只做内网转发)。
A) 纯 L4 源 IP 一致性哈希(开箱即用,最稳)
nginx.conf
(含注解)# ===== 全局:进程与FD ===== worker_processes auto; # 跟随CPU核数 worker_rlimit_nofile 2000000; # 打开文件上限,配合系统limits events { worker_connections 200000; # 每worker最大并发连接 use epoll; multi_accept on; } stream { # ===== 上游:网关池(多AZ IP)===== upstream gw_pool { # 一致性哈希:最小迁移,节点增删重映射少 hash $binary_remote_addr consistent; # 按“源IP”粘性 # 网关实例池(示例) server 10.0.1.11:9000 max_fails=3 fail_timeout=5s; server 10.0.2.12:9000 max_fails=3 fail_timeout=5s; server 10.0.3.13:9000 max_fails=3 fail_timeout=5s; # 持续横向扩:直接追加 server 行,reload 即生效 } # ===== 接入Server:入口7000(给NLB指向)===== server { # reuseport把监听负载分散到多个内核队列(Linux 3.9+) listen 7000 reuseport proxy_protocol; # 打开PROXY协议保留真实源IP(NLB需启PPv2) proxy_protocol on; proxy_connect_timeout 3s; # 首次连接握手超时 proxy_timeout 86400s; # 长连接保活 proxy_pass gw_pool; # 交由一致性哈希上游 # TCP层健康检查:开源版 stream 无“主动HC” # 依靠NLB对本server端口做健康检查;本处用被动: max_fails/fail_timeout # 若必须主动HC,可编译第三方 stream_upstream_check_module 或用Nginx Plus。 # 也可部署两个Nginx层+互相做旁路HC切换(keepalived)。 } }
说明
优点:配置简单,极稳当;一致性哈希对扩缩容迁移最小。
缺点:NAT 下“同出口IP一大群设备”会粘在同一网关,分布可能偏斜。如果你国家级分布,通常已足够均衡;若强制均衡,见 B 方案。
B) 按 deviceId 做一致性哈希(OpenResty/Lua,强粘性,精细均衡)
原理:在 stream 子系统里用
preread_by_lua*
读取首帧(长度+JSON),解析出deviceId
,把它放入变量$hash_key
,然后hash $hash_key consistent;
nginx.conf
(OpenResty 版,含注解)worker_processes auto; worker_rlimit_nofile 2000000; events { worker_connections 200000; use epoll; multi_accept on; } stream { lua_socket_log_errors off; lua_package_path "/usr/local/openresty/lualib/?.lua;;"; # 解析首包:协议固定为 [4-byte BE length][JSON] lua_shared_dict tmp 10m; # 上游:对 $hash_key 做一致性哈希 upstream gw_pool_by_dev { hash $hash_key consistent; server 10.0.1.11:9000 max_fails=3 fail_timeout=5s; server 10.0.2.12:9000 max_fails=3 fail_timeout=5s; server 10.0.3.13:9000 max_fails=3 fail_timeout=5s; } server { listen 7000 reuseport proxy_protocol; proxy_protocol on; proxy_connect_timeout 3s; proxy_timeout 86400s; # 在代理前读取首包,取出 deviceId,设置 $hash_key preread_by_lua_block { local sock = ngx.req.socket(true) -- downstream (client) socket sock:settimeouts(1000, 0, 0) -- 1s 读超时 -- 读取4字节长度 local len_buf, err = sock:peek(4) if not len_buf or #len_buf < 4 then return ngx.exit(ngx.ERROR) end local b1,b2,b3,b4 = string.byte(len_buf,1,4) local len = b1*16777216 + b2*65536 + b3*256 + b4 if len <= 0 or len > 1024*64 then return ngx.exit(ngx.ERROR) end -- 再探测读取 payload(不消耗缓冲,保持给后端) local frame, err2 = sock:peek(4 + len) if not frame or #frame < (4+len) then return ngx.exit(ngx.ERROR) end local json = string.sub(frame, 5) -- 去掉4字节长度 local cjson = require "cjson.safe" local obj = cjson.decode(json) if not obj or obj.type ~= "auth" or not obj.deviceId then return ngx.exit(ngx.ERROR) end -- 设置哈希键(变量默认空串,必须赋值) ngx.var.hash_key = obj.deviceId } # 交给上游(按deviceId一致性哈希) proxy_pass gw_pool_by_dev; } }
说明
优势:设备层面强粘性 + 更均衡(避免大NAT出口倾斜)。
注意:必须确保首帧就是鉴权JSON(你的 Netty 项目已使用此约定)。
性能:OpenResty 的
peek
+ 常量 JSON 解析开销很小(µs 级),可水平扩多个 Nginx 实例分担。
扩容与“不断加设备”的方法论
水平扩展步骤(无中断)
先扩 Netty 池:新增 ECS/ACK 实例,加入
gw_pool
(A 方案按 IP、B 方案按 deviceId)。上游 reload:
nginx -t && nginx -s reload
(连接不丢,端到端保留)。观察迁移量:一致性哈希仅搬迁小部分连接(理论约 = 新权重 / 新总权重)。
再扩 Nginx 层(若 Nginx 层的连接数或带宽/中断队列逼近上限):加 ECS,挂到 NLB;NLB 健康后开始分流。
跨地域:加一个 Region 的整套 NLB+Nginx+Netty,前面套 GA 做最优路由。
容量估算(经验值,非极限)
Netty:10–15 万/台(你已有参数);
Nginx(OpenResty) :单台稳定 50–100 万 TCP 透传连接(足够,且可横扩);
NLB:云侧承载千万级连接没压力,按地域和账限扩容;
Redis Cluster:只保存活跃设备 Key(带 TTL),内存按 1–2KB/Key 粗估,分片水平扩。
宕机&容灾(节点挂了怎么办)
网关(Netty)挂:
-
不再续租 →
dev:{id}
TTL 到期;设备心跳/重连 → 由 NLB/Nginx 一致性哈希到其它健康网关;新网关
REG
(Lua 原子)→ 成为新宿主,并通过PUB/SUB
向旧宿主发kick
(如果旧还存活)。
Nginx 节点挂:
-
NLB 健康检查失败 → 流量自动摘除;剩余 Nginx 继续转发;
连接驻留在端到端 TCP:穿过 Nginx 的连接会断(该 Nginx 为中间跳点)。多台 Nginx 并行可把影响面摊薄。
Redis 主挂:
-
Redis Sentinel/Cluster 自动选主;网关端幂等续租/注册;Lua 保证路由原子性。
单 AZ 故障:
-
NLB/GA 跨 AZ/Region;上游池中跨 AZ/MIX的网关节点;
业务端“快速重连 + 指数退避**”容错。
阿里云部署实操(建议路线)
1) 网络与计算
VPC + 多可用区子网;每个 AZ 放 NLB 实例与 ECS/ACK(Nginx 层、Netty 层)。
ECS 机型:选 c7/g7 等新代实例,25Gbps/40Gbps 网卡、多队列(RSS) ;开启 Enhanced Network。
安全组:放行 7000(NLB→Nginx)、9000(Nginx→Netty)、Redis/Kafka 内网端口;限制来源。
2) 入口层
Global Accelerator (GA) (可选):跨地域就近接入 + 智能路由;监听 TCP 7000。
NLB(Network Load Balancer) :
-
监听 TCP 7000,后端指向 Nginx ECS;
开启 Proxy Protocol v2(保留源IP,后端 Nginx 配置
proxy_protocol on
)。健康检查:TCP/应用端口。
3) 转发层(Nginx/OpenResty)
部署为 ECS AutoScaling 组(ESS) ,镜像内置 Nginx 配置模板与守护。
A 方案:部署开源 Nginx;B 方案:部署 OpenResty。
系统参数:
nofile=2,000,000
、somaxconn=65535
、合理内核缓冲;日志:仅记 error/access 的摘要,落 SLS(日志服务) 。
4) 网关层(Netty)
可用 ACK(容器服务K8s版) 部署 StatefulSet,也可 ECS 裸机 Systemd。
开启 EPC/NUMA、直连内存、epoll;你的 Netty 项目已经准备好。
HPA/ESS:按连接数/CPU/队列水位扩缩容;灰度放量。
5) Redis/Kafka
Redis:选 云数据库Redis版(主从/集群) ,AOF 开;避免自建坑。
Kafka:选 消息队列 Kafka 版,跨 AZ 部署;上报落库/消费分析。
运维要点与自动化
动态上游:
-
开源 Nginx 不支持原生 API 动态 upstream,建议:
或使用 NGINX Plus/Envoy(原生API/EDS)做动态后端。
把
upstream gw_pool*.conf
拆分到conf.d/upstreams/*.conf
,扩缩容由脚本/Operator 写入/删除 server 行,
nginx -t && nginx -s reload
零停机生效。
容量告警:
-
Nginx:
active connections
、accepts/handled/drop
、worker_conns
接近上限报警;Netty:FD使用率、写缓冲水位、事件循环队列长度、心跳失败率;
Redis:
keyspace hits/misses
、过期速率、eval QPS、延迟;链路:NLB 5xx/丢包、GA健康端口状态。
内核/NIC:
-
ethtool -G rx/tx 4096
、启用gro
(小包密集可评估关闭 lro)、txqueuelen 20000
;中断打散:
irqbalance
或手工smp_affinity
。
你接下来怎么做(最小可行路线)
先上 A 方案(L4 源IP一致性哈希):足够扛量,配置最稳。
如果看到 NAT 倾斜(某些网关连接偏多),再切 B 方案(OpenResty 按 deviceId 哈希),把倾斜打散。
前面加 NLB(或 GA+NLB)做公网入口与健康剔除;Nginx、Netty 两层都可水平扩。
持续接入设备:只要加机器 + reload,一致性哈希只搬小部分流量,无感扩容。
宕机:交给 NLB 健康检查+Redis 租约自愈,无需人工干预。