分布式微服务系统架构第161集:过亿设备架构-AI生成架构图

发布于:2025-08-11 ⋅ 阅读:(15) ⋅ 点赞:(0)

加群联系作者vx:xiaoda0423

仓库地址:https://webvueblog.github.io/JavaPlusDoc/

https://1024bat.cn/

https://github.com/webVueBlog/fastapi_plus

https://webvueblog.github.io/JavaPlusDoc/

点击勘误issues,哪吒感谢大家的阅读

1. 正常情况下(网关没挂)

假设有一台设备,ID 是 ABC123

  • 它连接到 网关1 (192.168.1.10)

  • 网关1 会在 Redis 里写一条记录:

key   = device:gateway:ABC123
value = 192.168.1.10
TTL   = 120秒
  • 网关1 每收到设备的心跳,就 把这条记录的 TTL 续回 120 秒(就像闹钟一直推迟)。

  • Nginx 用 一致性哈希,保证这个设备始终连到 网关1


2. 网关宕机后会发生什么

假设 网关1 突然断电 / 崩溃

  1. 设备的 TCP 连接断了

  • 因为网关死了,设备发的数据没人收,连接会超时或直接被操作系统断开。

  • Redis 里的 key 不再续期

    • 原来网关1 每次收到心跳就延长 TTL

    • 现在它挂了,所以 TTL 会一直往下掉,直到 120 秒后过期

  • 设备尝试重新连接

    • 设备断开后会按程序设置重连,重新通过域名 iot-gateway.example.com:9000 去连 Nginx。


    3. Nginx 的一致性哈希重新分配

    • Nginx 收到设备的重新连接请求时,会用设备的 IP 去算一个哈希值,然后决定转发到哪台网关。

    • 由于 网关1 已经宕机,Nginx 会自动把流量转到 网关2 或 网关3

    • 这样,设备就会直接连到新的网关。


    4. 新网关接管设备

    • 新网关(比如网关2)收到设备连接时,会重新在 Redis 写入映射

    key   = device:gateway:ABC123
    value = 192.168.1.11
    TTL   = 120秒
    • 后端业务(通过 Kafka)就会继续收到来自这个设备的数据,整个过程对业务几乎是无感的。


    5. 过程总结(小白版)

    可以想象成:

    1. Redis 像一个登记簿,记录“哪个设备在哪个网关”;

    2. 网关像前台接待,会不断去登记簿上续签(心跳);

    3. 网关死了 → 不再续签 → 登记过期 → 设备被 Nginx 重新安排到另一个前台(新网关);

    4. 新网关继续接待,并重新登记。

    61ed5027-a9eb-45e4-a658-2b67b76ddcf3.png

    狠活儿来了⚙️——这是一个可跑的 Spring Boot + Netty 接入网关模板,带上 Redis(连接映射 & 心跳续期) 和 Kafka(消息分发) 。你拿过去改下协议解码就能用。

    目录结构

    iot-gateway/
    ├─ pom.xml
    ├─ src/main/java/com/example/gw/
    │  ├─ GatewayApplication.java
    │  ├─ config/
    │  │  ├─ NettyProperties.java
    │  │  ├─ KafkaConfig.java
    │  │  └─ RedisConfig.java
    │  ├─ netty/
    │  │  ├─ NettyServer.java
    │  │  ├─ initializer/GatewayInitializer.java
    │  │  ├─ codec/FrameDecoder.java
    │  │  ├─ codec/MessageDecoder.java
    │  │  ├─ codec/MessageEncoder.java
    │  │  └─ handler/
    │  │     ├─ AuthRegisterHandler.java
    │  │     ├─ HeartbeatHandler.java
    │  │     └─ BusinessDispatcherHandler.java
    │  ├─ service/
    │  │  ├─ ChannelManager.java
    │  │  ├─ DeviceRouterService.java
    │  │  └─ HeartbeatRenewService.java
    │  ├─ mq/KafkaProducer.java
    │  ├─ model/
    │  │  ├─ DeviceMessage.java
    │  │  └─ Consts.java
    │  └─ util/Bytes.java
    └─ src/main/resources/
       ├─ application.yml
       └─ logback-spring.xml

    pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0"  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0  http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.example</groupId>
      <artifactId>iot-gateway</artifactId>
      <version>1.0.0</version>
      <properties>
        <java.version>17</java.version>
        <spring.boot.version>3.3.1</spring.boot.version>
        <netty.version>4.1.110.Final</netty.version>
      </properties>
      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type><scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
      <dependencies>
        <!-- Spring Boot -->
        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency>
        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId></dependency>
        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency>
        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></dependency>
    
        <!-- Netty -->
        <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>${netty.version}</version></dependency>
    
        <!-- Kafka -->
        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
    
        <!-- Redis -->
        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>
    
        <!-- utils -->
        <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
        <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
      </dependencies>
      <build>
        <plugins>
          <plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin>
          <plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId>
            <configuration><source>${java.version}</source><target>${java.version}</target></configuration>
          </plugin>
        </plugins>
      </build>
    </project>

    application.yml

    server:
      port: 8080
    
    netty:
      port: 7001
      bossThreads: 1
      workerThreads: 8
      backlog: 1024
      soRcvbuf: 1048576
      soSndbuf: 1048576
      soReuseaddr: true
      soKeepalive: true
      writeBufWaterMarkLow: 33554432    # 32MB
      writeBufWaterMarkHigh: 67108864   # 64MB
      idleReaderSeconds: 90
    
    spring:
      kafka:
        bootstrap-servers: 127.0.0.1:9092
        producer:
          acks: all
          retries: 3
          linger-ms: 5
          batch-size: 32768
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
    
      data:
        redis:
          host: 127.0.0.1
          port: 6379
          lettuce:
            pool:
              max-active: 64
              max-idle: 32
              min-idle: 8
    
    gateway:
      kafkaTopic: device.upstream
      redisDeviceKeyPrefix: device_gateway:
      host: ${HOSTNAME:gw-1}   # 当前网关标识(或 IP)
    
    logging:
      level:
        root: info
        com.example.gw: info

    GatewayApplication.java

    package com.example.gw;
    
    import com.example.gw.netty.NettyServer;
    import lombok.RequiredArgsConstructor;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    import jakarta.annotation.PostConstruct;
    
    @SpringBootApplication
    @RequiredArgsConstructor
    public class GatewayApplication {
        private final NettyServer nettyServer;
    
        public static void main(String[] args) {
            // 重要:提升文件句柄数(Linux: ulimit -n 1000000)
            SpringApplication.run(GatewayApplication.class, args);
        }
    
        @PostConstruct
        public void startNetty() {
            nettyServer.start();
        }
    }

    config/NettyProperties.java

    package com.example.gw.config;
    
    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.stereotype.Component;
    
    @Data
    @Component
    @ConfigurationProperties(prefix = "netty")
    public class NettyProperties {
      private int port;
      private int bossThreads;
      private int workerThreads;
      private int backlog;
      private int soRcvbuf;
      private int soSndbuf;
      private boolean soReuseaddr;
      private boolean soKeepalive;
      private int writeBufWaterMarkLow;
      private int writeBufWaterMarkHigh;
      private int idleReaderSeconds;
    }

    config/RedisConfig.java

    package com.example.gw.config;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
    import org.springframework.data.redis.core.StringRedisTemplate;
    
    @Configuration
    public class RedisConfig {
      @Bean
      public StringRedisTemplate stringRedisTemplate(LettuceConnectionFactory factory) {
        return new StringRedisTemplate(factory);
      }
    }

    config/KafkaConfig.java

    package com.example.gw.config;
    
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class KafkaConfig {
      // Spring Boot autoconfig 已足够;如需拦截器、分区器可在此扩展
    }

    netty/NettyServer.java

    package com.example.gw.netty;
    
    import com.example.gw.config.NettyProperties;
    import com.example.gw.netty.initializer.GatewayInitializer;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.PooledByteBufAllocator;
    import io.netty.channel.*;
    import io.netty.channel.epoll.Epoll;
    import io.netty.channel.epoll.EpollEventLoopGroup;
    import io.netty.channel.epoll.EpollServerSocketChannel;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.util.ResourceLeakDetector;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    
    @Slf4j
    @Component
    @RequiredArgsConstructor
    public class NettyServer {
    
      private final NettyProperties props;
      private final GatewayInitializer initializer;
    
      public void start() {
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED);
    
        boolean epoll = Epoll.isAvailable();
        EventLoopGroup boss = epoll ? new EpollEventLoopGroup(props.getBossThreads())
                                    : new NioEventLoopGroup(props.getBossThreads());
        EventLoopGroup worker = epoll ? new EpollEventLoopGroup(props.getWorkerThreads())
                                      : new NioEventLoopGroup(props.getWorkerThreads());
    
        try {
          ServerBootstrap b = new ServerBootstrap()
              .group(boss, worker)
              .channel(epoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
              .option(ChannelOption.SO_BACKLOG, props.getBacklog())
              .option(ChannelOption.SO_REUSEADDR, props.isSoReuseaddr())
              .childOption(ChannelOption.SO_KEEPALIVE, props.isSoKeepalive())
              .childOption(ChannelOption.SO_RCVBUF, props.getSoRcvbuf())
              .childOption(ChannelOption.SO_SNDBUF, props.getSoSndbuf())
              .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
                  new WriteBufferWaterMark(props.getWriteBufWaterMarkLow(), props.getWriteBufWaterMarkHigh()))
              .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
              .childHandler(initializer);
    
          ChannelFuture f = b.bind(props.getPort()).sync();
          log.info("Netty gateway started on port {}", props.getPort());
          f.channel().closeFuture().addListener(cf -> {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
          });
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          log.error("Netty start interrupted", e);
        } catch (Exception e) {
          log.error("Netty start failed", e);
          boss.shutdownGracefully();
          worker.shutdownGracefully();
        }
      }
    }

    initializer/GatewayInitializer.java

    package com.example.gw.netty.initializer;
    
    import com.example.gw.config.NettyProperties;
    import com.example.gw.netty.codec.FrameDecoder;
    import com.example.gw.netty.codec.MessageDecoder;
    import com.example.gw.netty.codec.MessageEncoder;
    import com.example.gw.netty.handler.AuthRegisterHandler;
    import com.example.gw.netty.handler.HeartbeatHandler;
    import com.example.gw.netty.handler.BusinessDispatcherHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.timeout.IdleStateHandler;
    import lombok.RequiredArgsConstructor;
    import org.springframework.stereotype.Component;
    
    @Component
    @RequiredArgsConstructor
    public class GatewayInitializer extends ChannelInitializer<SocketChannel> {
    
      private final NettyProperties props;
      private final AuthRegisterHandler authRegisterHandler;
      private final HeartbeatHandler heartbeatHandler;
      private final BusinessDispatcherHandler dispatcherHandler;
    
      @Override
      protected void initChannel(SocketChannel ch) {
        ch.pipeline()
          // 拆包:根据协议做帧界定(示例长度前缀/分隔符/固定头尾,自选)
          .addLast("frameDecoder", new FrameDecoder())
          // 协议解码(把ByteBuf→DeviceMessage)
          .addLast("msgDecoder", new MessageDecoder())
          // 协议编码(下行时DeviceMessage→ByteBuf)
          .addLast("msgEncoder", new MessageEncoder())
          // 空闲检测(读空闲触发心跳处理/断开)
          .addLast("idle", new IdleStateHandler(props.getIdleReaderSeconds(), 0, 0))
          // 注册鉴权(首包携带deviceId等)
          .addLast("auth", authRegisterHandler)
          // 心跳
          .addLast("hb", heartbeatHandler)
          // 分发到Kafka
          .addLast("dispatcher", dispatcherHandler);
      }
    }

    codec/FrameDecoder.java(示例:简单定界或长度前缀,自己换成 JT808 也行)

    package com.example.gw.netty.codec;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    
    import java.util.List;
    
    /** 示例:长度前缀(2字节) + body */
    public class FrameDecoder extends ByteToMessageDecoder {
      @Override
      protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < 2) return;
        in.markReaderIndex();
        int len = in.readUnsignedShort();
        if (in.readableBytes() < len) {
          in.resetReaderIndex();
          return;
        }
        ByteBuf frame = in.readRetainedSlice(len);
        out.add(frame);
      }
    }

    codec/MessageDecoder.java(把帧→业务对象)

    package com.example.gw.netty.codec;
    
    import com.example.gw.model.DeviceMessage;
    import com.example.gw.util.Bytes;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageDecoder;
    
    import java.util.List;
    
    /** Demo 协议:| devLen(1) | devId(N) | msgType(1) | payload(剩余) | */
    public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
      @Override
      protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) {
        int devLen = buf.readUnsignedByte();
        byte[] devBytes = new byte[devLen];
        buf.readBytes(devBytes);
        String deviceId = new String(devBytes);
    
        int msgType = buf.readUnsignedByte();
        byte[] payload = new byte[buf.readableBytes()];
        buf.readBytes(payload);
    
        DeviceMessage msg = new DeviceMessage();
        msg.setDeviceId(deviceId);
        msg.setMsgType(msgType);
        msg.setPayload(payload);
        msg.setTs(System.currentTimeMillis());
    
        out.add(msg);
      }
    }

    codec/MessageEncoder.java(下行需要时用)

    package com.example.gw.netty.codec;
    
    import com.example.gw.model.DeviceMessage;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    
    public class MessageEncoder extends MessageToByteEncoder<DeviceMessage> {
      @Override
      protected void encode(ChannelHandlerContext ctx, DeviceMessage msg, ByteBuf out) {
        byte[] dev = msg.getDeviceId().getBytes();
        out.writeShort(1 + dev.length + 1 + (msg.getPayload() == null ? 0 : msg.getPayload().length)); // length
        out.writeByte(dev.length);
        out.writeBytes(dev);
        out.writeByte(msg.getMsgType());
        if (msg.getPayload() != null) out.writeBytes(msg.getPayload());
      }
    }

    model/DeviceMessage.java

    package com.example.gw.model;
    
    import lombok.Data;
    
    @Data
    public class DeviceMessage {
      private String deviceId;
      private int msgType;         // 自定义或协议中的消息类型
      private byte[] payload;      // 原始负载
      private long ts;             // 网关接收时间
    }

    model/Consts.java

    package com.example.gw.model;
    
    public interface Consts {
      int TYPE_REGISTER = 0x01;
      int TYPE_HEARTBEAT = 0x02;
      int TYPE_GPS = 0x10;
    }

    service/ChannelManager.java(内存 + Redis 映射)

    package com.example.gw.service;
    
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelId;
    import lombok.RequiredArgsConstructor;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.stereotype.Service;
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;
    
    @Service
    @RequiredArgsConstructor
    public class ChannelManager {
      private final StringRedisTemplate redis;
    
      @Value("${gateway.redisDeviceKeyPrefix}")
      private String keyPrefix;
    
      @Value("${gateway.host}")
      private String host;
    
      // 本机内存:deviceId -> Channel
      private final Map<String, Channel> deviceChannel = new ConcurrentHashMap<>();
      // 反查:channelId -> deviceId
      private final Map<ChannelId, String> channelDevice = new ConcurrentHashMap<>();
    
      public void bind(String deviceId, Channel ch) {
        deviceChannel.put(deviceId, ch);
        channelDevice.put(ch.id(), deviceId);
        // 写入 Redis(60s 续期)
        redis.opsForValue().set(keyPrefix + deviceId, host, 60, TimeUnit.SECONDS);
      }
    
      public void renew(String deviceId) {
        redis.expire(keyPrefix + deviceId, 60, TimeUnit.SECONDS);
      }
    
      public void unbind(Channel ch) {
        String deviceId = channelDevice.remove(ch.id());
        if (deviceId != null) {
          deviceChannel.remove(deviceId);
          // 不立即删 Redis,交给 TTL 过期;也可显式删除:
          // redis.delete(keyPrefix + deviceId);
        }
      }
    
      public Channel getChannel(String deviceId) {
        return deviceChannel.get(deviceId);
      }
    
      public String getDeviceId(Channel ch) {
        return channelDevice.get(ch.id());
      }
    }

    service/DeviceRouterService.java(查 Redis 看设备在哪台网关)

    package com.example.gw.service;
    
    import lombok.RequiredArgsConstructor;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.stereotype.Service;
    
    @Service
    @RequiredArgsConstructor
    public class DeviceRouterService {
      private final StringRedisTemplate redis;
      @Value("${gateway.redisDeviceKeyPrefix}")
      private String keyPrefix;
    
      public String getGatewayHost(String deviceId) {
        return redis.opsForValue().get(keyPrefix + deviceId);
      }
    }

    service/HeartbeatRenewService.java(定时续期:可选)

    package com.example.gw.service;
    
    import lombok.RequiredArgsConstructor;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Service;
    
    /** 如果设备心跳很频繁,可在收到心跳时直接续期,不需要此任务。此类仅演示。 */
    @Service
    @RequiredArgsConstructor
    public class HeartbeatRenewService {
      private final ChannelManager channelManager;
    
      @Scheduled(fixedDelay = 30000)
      public void renewAll() {
        // 可遍历已知 deviceId 做 redis TTL 续期(不建议大规模遍历,这里仅示例)
      }
    }

    netty/handler/AuthRegisterHandler.java(设备首包注册)

    package com.example.gw.netty.handler;
    
    import com.example.gw.model.Consts;
    import com.example.gw.model.DeviceMessage;
    import com.example.gw.service.ChannelManager;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    
    @Slf4j
    @Component
    @ChannelHandler.Sharable
    @RequiredArgsConstructor
    public class AuthRegisterHandler extends SimpleChannelInboundHandler<DeviceMessage> {
    
      private final ChannelManager channelManager;
    
      @Override
      protected void channelRead0(ChannelHandlerContext ctx, DeviceMessage msg) {
        if (msg.getMsgType() == Consts.TYPE_REGISTER) {
          String deviceId = msg.getDeviceId();
          channelManager.bind(deviceId, ctx.channel());
          log.info("Device registered: {} via {}", deviceId, ctx.channel().remoteAddress());
          // TODO: 回应注册应答(按协议编码)
          // ctx.writeAndFlush(ackMessage);
          return; // 注册消息不再下发,直接吃掉
        }
        ctx.fireChannelRead(msg); // 交给下一个
      }
    
      @Override
      public void channelInactive(ChannelHandlerContext ctx) {
        channelManager.unbind(ctx.channel());
      }
    
      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.warn("Auth handler error", cause);
        ctx.close();
      }
    }

    netty/handler/HeartbeatHandler.java

    package com.example.gw.netty.handler;
    
    import com.example.gw.model.Consts;
    import com.example.gw.model.DeviceMessage;
    import com.example.gw.service.ChannelManager;
    import io.netty.channel.*;
    import io.netty.handler.timeout.IdleStateEvent;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    
    @Slf4j
    @Component
    @ChannelHandler.Sharable
    @RequiredArgsConstructor
    public class HeartbeatHandler extends SimpleChannelInboundHandler<DeviceMessage> {
      private final ChannelManager channelManager;
    
      @Override
      protected void channelRead0(ChannelHandlerContext ctx, DeviceMessage msg) {
        if (msg.getMsgType() == Consts.TYPE_HEARTBEAT) {
          String deviceId = channelManager.getDeviceId(ctx.channel());
          if (deviceId != null) channelManager.renew(deviceId);
          // TODO: 心跳应答(可回包)
          return;
        }
        ctx.fireChannelRead(msg);
      }
    
      @Override
      public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (evt instanceof IdleStateEvent e) {
          log.info("Idle close: {}", ctx.channel().remoteAddress());
          ctx.close();
        } else {
          ctx.fireUserEventTriggered(evt);
        }
      }
    }

    netty/handler/BusinessDispatcherHandler.java(推送到 Kafka)

    package com.example.gw.netty.handler;
    
    import com.example.gw.model.DeviceMessage;
    import com.example.gw.mq.KafkaProducer;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import io.netty.channel.*;
    
    @Slf4j
    @Component
    @ChannelHandler.Sharable
    @RequiredArgsConstructor
    public class BusinessDispatcherHandler extends SimpleChannelInboundHandler<DeviceMessage> {
      private final KafkaProducer producer;
      private final ObjectMapper mapper = new ObjectMapper();
    
      @Value("${gateway.kafkaTopic}")
      private String topic;
    
      @Override
      protected void channelRead0(ChannelHandlerContext ctx, DeviceMessage msg) throws Exception {
        // 这里可以做协议转 JSON 的映射(示例直接吐原字段)
        String json = mapper.writeValueAsString(msg);
        producer.send(topic, msg.getDeviceId(), json);
      }
    
      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.warn("Dispatcher error", cause);
        ctx.close();
      }
    }

    mq/KafkaProducer.java

    package com.example.gw.mq;
    
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    @Slf4j
    @Component
    @RequiredArgsConstructor
    public class KafkaProducer {
    
      private final KafkaTemplate<String, String> kafkaTemplate;
    
      public void send(String topic, String key, String value) {
        kafkaTemplate.send(topic, key, value)
            .whenComplete((res, ex) -> {
              if (ex != null) {
                log.warn("Kafka send fail: {}", ex.getMessage());
              } else {
                RecordMetadata md = res.getRecordMetadata();
                log.debug("Kafka ok topic={} partition={} offset={}", md.topic(), md.partition(), md.offset());
              }
            });
      }
    }

    util/Bytes.java(占位:你可以放 CRC、BCE、JT808 转义等)

    package com.example.gw.util;
    
    public class Bytes {
      public static int u8(byte b) { return b & 0xFF; }
    }

    logback-spring.xml(可选)

    <configuration>
      <springProperty scope="context" name="APP_NAME" source="spring.application.name" defaultValue="iot-gateway"/>
      <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder><pattern>%d{HH:mm:ss.SSS} %-5level [%thread] %logger{36} - %msg%n</pattern></encoder>
      </appender>
      <root level="INFO"><appender-ref ref="CONSOLE"/></root>
    </configuration>

    如何跑起来

    1. 起 Redis、Kafka

    2. application.yml 按你机器改好端口/IP

    3. mvn spring-boot:run

    4. (可选)前置 Nginx/LVS 用 一致性 hash(deviceId) 转发到多台网关,确保“同一设备优先落到同一台机”。

    Nginx 简单示例(stream 层 TCP 转发):

    stream {
      upstream gw_hash {
        hash $remote_addr consistent;  # 生产建议自行接入支持按deviceId哈希的四层LB
        server 10.0.0.11:7001;
        server 10.0.0.12:7001;
        server 10.0.0.13:7001;
      }
      server {
        listen 7001;
        proxy_pass gw_hash;
      }
    }

    伸缩&高可用要点(别跳过)

    • 一台机承载连接:合理配置 ulimit -n、开启 epoll、直连内存+堆外缓冲可到 10~15 万长连接/台(看内存/网卡/内核参数)。

    • 多机扩:前置 LB(IP Hash / Consistent Hash)+ Redis 的 deviceId→host 映射 + 心跳 TTL 续期。

    • 宕机迁移:网关挂了 TCP 断开 → 设备自动重连 → 打到其它网关 → 注册时覆盖 Redis host 即完成接管。

    • 业务解耦:Netty 只做“接入+协议解析+转 Kafka”,下游微服务消费 Kafka 做业务。

    • 反向下发:业务若要下发,先查 Redis 看设备在哪台网关,再通过网关本地 ChannelManager 找 Channel 写回(或走 RPC/网关间转发)。


    如果你要换成 JT808

    • 把 FrameDecoder / MessageDecoder / MessageEncoder 换成 808 的转义/校验/拆包实现;

    • AuthRegisterHandler 处理 0x0100 注册应答、0x0002 心跳等;

    • BusinessDispatcherHandler 按 808 消息 ID 把 GPS(0x0200)/报警等转 Kafka。

    行,给你一份能落地的方案包:

    • 两套 Nginx:A. 纯 L4 一致性哈希(最稳) 、B. 按 deviceId 做一致性哈希(OpenResty/Lua,适合强粘性)

    • 扩容与过亿设备分层架构建议

    • 宕机/容灾做法

    • 阿里云部署实操路线(GA/NLB/ACK/ECS)


    架构总览(推荐分层)

    (全球/全国接入)
    [Alibaba Cloud GA]  ——TCP——>  [NLB(多AZ, TCP, ProxyProtocol v2)]
                                       │
                                  TCP 四层透传
                                       │
                            [Nginx Stream 层 × N(可横向扩)]
                                       │
                    L4 源IP一致性哈希 或 L7 解析后按 deviceId 一致性哈希
                                       │
                             [Netty Gateway 池 × M(多AZ)]
                                       │
                               [Redis Cluster 租约/路由]
                                       │
                                   [Kafka/Pulsar]
    • NLB负责公网/跨 AZ/自动 HC;Nginx 层一致性哈希与连接扇出Netty承接长连(10–15 万/台);Redis做 deviceId→host 租约 + 心跳 TTL;Kafka/Pulsar落后端。

    • 过亿设备量级:假设 12 万/台,需 ~830 台 Netty;Nginx 与 NLB均可横向扩(连接是端到端驻留,Nginx只做内网转发)。


    A) 纯 L4 源 IP 一致性哈希(开箱即用,最稳)

    nginx.conf(含注解)

    # ===== 全局:进程与FD =====
    worker_processes auto;                 # 跟随CPU核数
    worker_rlimit_nofile 2000000;          # 打开文件上限,配合系统limits
    
    events {
        worker_connections  200000;        # 每worker最大并发连接
        use epoll;
        multi_accept on;
    }
    
    stream {
        # ===== 上游:网关池(多AZ IP)=====
        upstream gw_pool {
            # 一致性哈希:最小迁移,节点增删重映射少
            hash $binary_remote_addr consistent;     # 按“源IP”粘性
            # 网关实例池(示例)
            server 10.0.1.11:9000 max_fails=3 fail_timeout=5s;
            server 10.0.2.12:9000 max_fails=3 fail_timeout=5s;
            server 10.0.3.13:9000 max_fails=3 fail_timeout=5s;
            # 持续横向扩:直接追加 server 行,reload 即生效
        }
    
        # ===== 接入Server:入口7000(给NLB指向)=====
        server {
            # reuseport把监听负载分散到多个内核队列(Linux 3.9+)
            listen 7000 reuseport proxy_protocol;   # 打开PROXY协议保留真实源IP(NLB需启PPv2)
            proxy_protocol on;
    
            proxy_connect_timeout 3s;               # 首次连接握手超时
            proxy_timeout 86400s;                   # 长连接保活
            proxy_pass gw_pool;                     # 交由一致性哈希上游
    
            # TCP层健康检查:开源版 stream 无“主动HC”
            # 依靠NLB对本server端口做健康检查;本处用被动: max_fails/fail_timeout
            # 若必须主动HC,可编译第三方 stream_upstream_check_module 或用Nginx Plus。
            # 也可部署两个Nginx层+互相做旁路HC切换(keepalived)。
        }
    }

    说明

    • 优点:配置简单,极稳当;一致性哈希对扩缩容迁移最小

    • 缺点:NAT 下“同出口IP一大群设备”会粘在同一网关,分布可能偏斜。如果你国家级分布,通常已足够均衡;若强制均衡,见 B 方案。


    B) 按 deviceId 做一致性哈希(OpenResty/Lua,强粘性,精细均衡)

    原理:在 stream 子系统里用 preread_by_lua* 读取首帧(长度+JSON),解析出 deviceId,把它放入变量 $hash_key,然后 hash $hash_key consistent;

    nginx.conf(OpenResty 版,含注解)

    worker_processes auto;
    worker_rlimit_nofile 2000000;
    
    events { worker_connections 200000; use epoll; multi_accept on; }
    
    stream {
        lua_socket_log_errors off;
        lua_package_path "/usr/local/openresty/lualib/?.lua;;";
    
        # 解析首包:协议固定为 [4-byte BE length][JSON]
        lua_shared_dict tmp 10m;
    
        # 上游:对 $hash_key 做一致性哈希
        upstream gw_pool_by_dev {
            hash $hash_key consistent;
            server 10.0.1.11:9000 max_fails=3 fail_timeout=5s;
            server 10.0.2.12:9000 max_fails=3 fail_timeout=5s;
            server 10.0.3.13:9000 max_fails=3 fail_timeout=5s;
        }
    
        server {
            listen 7000 reuseport proxy_protocol;
            proxy_protocol on;
            proxy_connect_timeout 3s;
            proxy_timeout 86400s;
    
            # 在代理前读取首包,取出 deviceId,设置 $hash_key
            preread_by_lua_block {
                local sock = ngx.req.socket(true)     -- downstream (client) socket
                sock:settimeouts(1000, 0, 0)          -- 1s 读超时
                -- 读取4字节长度
                local len_buf, err = sock:peek(4)
                if not len_buf or #len_buf < 4 then
                    return ngx.exit(ngx.ERROR)
                end
                local b1,b2,b3,b4 = string.byte(len_buf,1,4)
                local len = b1*16777216 + b2*65536 + b3*256 + b4
                if len <= 0 or len > 1024*64 then
                    return ngx.exit(ngx.ERROR)
                end
                -- 再探测读取 payload(不消耗缓冲,保持给后端)
                local frame, err2 = sock:peek(4 + len)
                if not frame or #frame < (4+len) then
                    return ngx.exit(ngx.ERROR)
                end
                local json = string.sub(frame, 5)     -- 去掉4字节长度
                local cjson = require "cjson.safe"
                local obj = cjson.decode(json)
                if not obj or obj.type ~= "auth" or not obj.deviceId then
                    return ngx.exit(ngx.ERROR)
                end
                -- 设置哈希键(变量默认空串,必须赋值)
                ngx.var.hash_key = obj.deviceId
            }
    
            # 交给上游(按deviceId一致性哈希)
            proxy_pass gw_pool_by_dev;
        }
    }

    说明

    • 优势:设备层面强粘性 + 更均衡(避免大NAT出口倾斜)。

    • 注意:必须确保首帧就是鉴权JSON(你的 Netty 项目已使用此约定)。

    • 性能:OpenResty 的 peek + 常量 JSON 解析开销很小(µs 级),可水平扩多个 Nginx 实例分担。


    扩容与“不断加设备”的方法论

    水平扩展步骤(无中断)

    1. 先扩 Netty 池:新增 ECS/ACK 实例,加入 gw_pool(A 方案按 IP、B 方案按 deviceId)。

    2. 上游 reloadnginx -t && nginx -s reload(连接不丢,端到端保留)。

    3. 观察迁移量:一致性哈希仅搬迁小部分连接(理论约 = 新权重 / 新总权重)。

    4. 再扩 Nginx 层(若 Nginx 层的连接数或带宽/中断队列逼近上限):加 ECS,挂到 NLB;NLB 健康后开始分流。

    5. 跨地域:加一个 Region 的整套 NLB+Nginx+Netty,前面套 GA 做最优路由。

    容量估算(经验值,非极限)

    • Netty:10–15 万/台(你已有参数);

    • Nginx(OpenResty) :单台稳定 50–100 万 TCP 透传连接(足够,且可横扩);

    • NLB:云侧承载千万级连接没压力,按地域和账限扩容;

    • Redis Cluster:只保存活跃设备 Key(带 TTL),内存按 1–2KB/Key 粗估,分片水平扩


    宕机&容灾(节点挂了怎么办)

    • 网关(Netty)挂

      • 不再续租 → dev:{id} TTL 到期;设备心跳/重连 → 由 NLB/Nginx 一致性哈希到其它健康网关

      • 新网关 REG(Lua 原子)→ 成为新宿主,并通过 PUB/SUB 向旧宿主发 kick(如果旧还存活)。

    • Nginx 节点挂

      • NLB 健康检查失败 → 流量自动摘除;剩余 Nginx 继续转发;

      • 连接驻留在端到端 TCP:穿过 Nginx 的连接会断(该 Nginx 为中间跳点)。多台 Nginx 并行可把影响面摊薄。

    • Redis 主挂

      • Redis Sentinel/Cluster 自动选主;网关端幂等续租/注册;Lua 保证路由原子性。

    • 单 AZ 故障

      • NLB/GA 跨 AZ/Region;上游池中跨 AZ/MIX的网关节点;

      • 业务端“快速重连 + 指数退避**”容错。


    阿里云部署实操(建议路线)

    1) 网络与计算

    • VPC + 多可用区子网;每个 AZ 放 NLB 实例与 ECS/ACK(Nginx 层、Netty 层)。

    • ECS 机型:选 c7/g7 等新代实例,25Gbps/40Gbps 网卡、多队列(RSS) ;开启 Enhanced Network

    • 安全组:放行 7000(NLB→Nginx)、9000(Nginx→Netty)、Redis/Kafka 内网端口;限制来源。

    2) 入口层

    • Global Accelerator (GA) (可选):跨地域就近接入 + 智能路由;监听 TCP 7000。

    • NLB(Network Load Balancer) :

      • 监听 TCP 7000,后端指向 Nginx ECS

      • 开启 Proxy Protocol v2(保留源IP,后端 Nginx 配置 proxy_protocol on)。

      • 健康检查:TCP/应用端口。

    3) 转发层(Nginx/OpenResty)

    • 部署为 ECS AutoScaling 组(ESS) ,镜像内置 Nginx 配置模板与守护。

    • A 方案:部署开源 Nginx;B 方案:部署 OpenResty。

    • 系统参数nofile=2,000,000somaxconn=65535、合理内核缓冲;

    • 日志:仅记 error/access 的摘要,落 SLS(日志服务) 。

    4) 网关层(Netty)

    • 可用 ACK(容器服务K8s版) 部署 StatefulSet,也可 ECS 裸机 Systemd。

    • 开启 EPC/NUMA、直连内存、epoll;你的 Netty 项目已经准备好。

    • HPA/ESS:按连接数/CPU/队列水位扩缩容;灰度放量。

    5) Redis/Kafka

    • Redis:选 云数据库Redis版(主从/集群) ,AOF 开;避免自建坑。

    • Kafka:选 消息队列 Kafka 版,跨 AZ 部署;上报落库/消费分析。


    运维要点与自动化

    • 动态上游

      • 开源 Nginx 不支持原生 API 动态 upstream,建议:

      • 或使用 NGINX Plus/Envoy(原生API/EDS)做动态后端。

    1. 把 upstream gw_pool*.conf 拆分到 conf.d/upstreams/*.conf

    2. 扩缩容由脚本/Operator 写入/删除 server 行,

    3. nginx -t && nginx -s reload 零停机生效。

  • 容量告警

    • Nginx:active connectionsaccepts/handled/dropworker_conns 接近上限报警;

    • Netty:FD使用率、写缓冲水位、事件循环队列长度、心跳失败率;

    • Redis:keyspace hits/misses、过期速率、eval QPS、延迟;

    • 链路:NLB 5xx/丢包、GA健康端口状态。

  • 内核/NIC

    • ethtool -G rx/tx 4096、启用 gro(小包密集可评估关闭 lro)、txqueuelen 20000

    • 中断打散:irqbalance 或手工 smp_affinity


    你接下来怎么做(最小可行路线)

    1. 先上 A 方案(L4 源IP一致性哈希):足够扛量,配置最稳。

    2. 如果看到 NAT 倾斜(某些网关连接偏多),再切 B 方案(OpenResty 按 deviceId 哈希),把倾斜打散。

    3. 前面加 NLB(或 GA+NLB)做公网入口与健康剔除;Nginx、Netty 两层都可水平扩

    4. 持续接入设备:只要加机器 + reload,一致性哈希只搬小部分流量,无感扩容

    5. 宕机:交给 NLB 健康检查+Redis 租约自愈,无需人工干预。


网站公告

今日签到

点亮在社区的每一天
去签到