ThingsBoard MQTT 连接认证过程 源码分析+图例

发布于:2024-06-03 ⋅ 阅读:(252) ⋅ 点赞:(0)

整个连接过程如图所示:

 高清图片链接

1、环境准备

  • thingsboard3.5.1 源码启动。(不懂怎么启动的,大家可以看我的博文ThingsBoard3.5.1源码启动
  • MQTTX 客户端(用来连接 thingsboard MQTT)
  • 默认配置。queue.type=in-memory,cache.type=caffeine

因为我们的目的,是快速了解 thingsboard 的启动过程,所以所有的配置全部采用默认的方式。默认消息队列采用内存队列ConcurrentHashMap,缓存也采用内存缓存caffeine。

使用 customerA 用户账号密码登录,使用设备A1 AccessToken 连接。

2、源码分析

2.1 连接消息生产

2.1.1 入口

大家知道MQTT是基于TCP协议之上的轻量级通信协议,而TCP协议是面向连接、请求响应的通信协议。所以在 thingsboard 这一侧必然有一个服务器实现,用来等待客户端的连接。这个实现就是MqttTransportService

thingsboard 采用 netty 来实现一个MQTT server。

org.thingsboard.server.transport.mqtt.MqttTransportService

@PostConstruct
    public void init() throws Exception {
        log.info("Setting resource leak detector level to {}", leakDetectorLevel);
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));

        log.info("Starting MQTT transport...");
        bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
        workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new MqttTransportServerInitializer(context, false))
                .childOption(ChannelOption.SO_KEEPALIVE, keepAlive);

        serverChannel = b.bind(host, port).sync().channel();
        if (sslEnabled) {
            b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new MqttTransportServerInitializer(context, true))
                    .childOption(ChannelOption.SO_KEEPALIVE, keepAlive);
            sslServerChannel = b.bind(sslHost, sslPort).sync().channel();
        }
        log.info("Mqtt transport started!");
    }

其中,关系到 netty server 性能的 bossGroupThreadCount,workerGroupThreadCount

thingsboard 提取出两个参数变量 

NETTY_BOSS_GROUP_THREADS

NETTY_WORKER_GROUP_THREADS

方便用户根据自己的设备台数、部署架构,来优化自己的 netty 性能。

 netty server 的请求处理过程如下图所示,圆圈为具体实现类,方框为方法。

在 MqttTransportHandler#processMqttMsg 方法中,因为我们的消息类型是连接,所以我们会进入 processConnect 方法。

org.thingsboard.server.transport.mqtt.MqttTransportHandler

void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
        if (msg.fixedHeader() == null) {
            log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());
            ctx.close();
            return;
        }
        deviceSessionCtx.setChannel(ctx);
        if (CONNECT.equals(msg.fixedHeader().messageType())) {
            processConnect(ctx, (MqttConnectMessage) msg);
        } else if (deviceSessionCtx.isProvisionOnly()) {
            processProvisionSessionMsg(ctx, msg);
        } else {
            enqueueRegularSessionMsg(ctx, msg);
        }
    }

 在 MqttTransportHandler#processConnect 方法中,由于采用 AccessToken 的授权方式,所以会进入 processAuthTokenConnect

MqttTransportHandler#processAuthTokenConnect 方法中,获取我们在MQTTX填的用户名、密码,然后委托给 DefaultTransportService#process 处理

org.thingsboard.server.transport.mqtt.MqttTransportHandler

void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
        log.debug("[{}][{}] Processing connect msg for client: {}!", address, sessionId, msg.payload().clientIdentifier());
        String userName = msg.payload().userName();
        String clientId = msg.payload().clientIdentifier();
        deviceSessionCtx.setMqttVersion(getMqttVersion(msg.variableHeader().version()));
        if (DataConstants.PROVISION.equals(userName) || DataConstants.PROVISION.equals(clientId)) {
            deviceSessionCtx.setProvisionOnly(true);
            ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SUCCESS, msg));
        } else {
            X509Certificate cert;
            if (sslHandler != null && (cert = getX509Certificate()) != null) {
                processX509CertConnect(ctx, cert, msg);
            } else {
                processAuthTokenConnect(ctx, msg);
            }
        }
    }

private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
        String userName = connectMessage.payload().userName();
        log.debug("[{}][{}] Processing connect msg for client with user name: {}!", address, sessionId, userName);
        TransportProtos.ValidateBasicMqttCredRequestMsg.Builder request = TransportProtos.ValidateBasicMqttCredRequestMsg.newBuilder()
                .setClientId(connectMessage.payload().clientIdentifier());
        if (userName != null) {
            request.setUserName(userName);
        }
        byte[] passwordBytes = connectMessage.payload().passwordInBytes();
        if (passwordBytes != null) {
            String password = new String(passwordBytes, CharsetUtil.UTF_8);
            request.setPassword(password);
        }
        transportService.process(DeviceTransportType.MQTT, request.build(),
                new TransportServiceCallback<>() {
                    @Override
                    public void onSuccess(ValidateDeviceCredentialsResponse msg) {
                        onValidateDeviceResponse(msg, ctx, connectMessage);
                    }

                    @Override
                    public void onError(Throwable e) {
                        log.trace("[{}] Failed to process credentials: {}", address, userName, e);
                        ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage));
                        ctx.close();
                    }
                });
    }

2.1.2 DefaultTransportService

 一路跟下去

DefaultTbQueueRequestTemplate#sendToRequestTemplate 方法会调用 TbQueueProducer接口 send 方法,往主题 tb_transport.api.requests 发送消息。TbQueueProducer实现类是InMemoryTbQueueProducer

void sendToRequestTemplate(Request request, UUID requestId, SettableFuture<Response> future, ResponseMetaData<Response> responseMetaData) {
        log.trace("[{}] Sending request, key [{}], expTime [{}], request {}", requestId, request.getKey(), responseMetaData.expTime, request);
        if (messagesStats != null) {
            messagesStats.incrementTotal();
        }
        // 将消息发送给消息队列topic是tb_transport.api.requests
        requestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {
            @Override
            public void onSuccess(TbQueueMsgMetadata metadata) {
                if (messagesStats != null) {
                    messagesStats.incrementSuccessful();
                }
                log.trace("[{}] Request sent: {}, request {}", requestId, metadata, request);
            }

            @Override
            public void onFailure(Throwable t) {
                if (messagesStats != null) {
                    messagesStats.incrementFailed();
                }
                pendingRequests.remove(requestId);
                future.setException(t);
            }
        });
    }
1、TbQueueProducer 接口的实现类有很多个,具体发送消息的实现类是哪一个呢?
因为我们使用内存队列方式启动,所以实现类是 InMemoryTbQueueProducer

2、怎么确定发送的主题是 tb_transport.api.requests

主题是通过 requestTemplate获取的

而 requestTemplate又是 DefaultTbQueueRequestTemplate的一个属性,通过 Builder 构建器注入进来的。

对于DefaultTbQueueRequestTemplate的初始化,thingsboard 提供了很多基于不同种消息队列的实现方式。我们现在所用的是内存队列,所以进入InMemoryTbTransportQueueFactory

InMemoryTbTransportQueueFactory中,对于DefaultTbQueueRequestTemplate.requestTemplate

的初始化,使用的是TbQueueTransportApiSettings的配置。

requestsTopic 读取的,就是 tb_transport.api.requests 这一主题。

3、更进一步

认真分析初始化过程,得出下面请求主题的初始化图。

2.1.3 InMemoryTbQueueProducer

InMemoryTbQueueProducer#send 调用 DefaultInMemoryStorage#put 方法 

DefaultInMemoryStorage 往自己持有的 ConcurrentHashMap 中存放消息,

key 是主题 tb_transport.api.requests,value 是存放有消息的 LinkedBlockingQueue 内存队列

2.1.4 一个更抽象的发送模型

TbQueueProducer 往队列 queue 发送消息,主题 tb_transport.api.requests,而不管这个消息的实现是内存队列、kafka、RabbitMQ、ServiceBus 等。TbQueueConsumer 从queue中消费消息。至此,生产连接请求消息的过程结束。

2.2 消费消息

2.2.1 InMemoryTbQueueConsumer

我们知道现在消息生产者接口 TbQueueProducer 的实现类是 InMemoryTbQueueProducer,则它必然有一个消息消费者实现接口 TbQueueConsumer,消费者实现类是 InMemoryTbQueueConsumer

 InMemoryTbQueueConsumer 中对于消息的消费只有把消息从 ConcurrentHashMap 拉取出来的逻辑,而没有具体处理的逻辑,则处理的逻辑,是存在于调用这个 poll 方法的地方。

org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer

@Override
    public List<T> poll(long durationInMillis) {
        if (subscribed) {
            @SuppressWarnings("unchecked")
            List<T> messages = partitions
                    .stream()
                    .map(tpi -> {
                        try {
                            return storage.get(tpi.getFullTopicName());
                        } catch (InterruptedException e) {
                            if (!stopped) {
                                log.error("Queue was interrupted.", e);
                            }
                            return Collections.emptyList();
                        }
                    })
                    .flatMap(List::stream)
                    .map(msg -> (T) msg).collect(Collectors.toList());
            if (messages.size() > 0) {
                return messages;
            }
            try {
                Thread.sleep(durationInMillis);
            } catch (InterruptedException e) {
                if (!stopped) {
                    log.error("Failed to sleep.", e);
                }
            }
        }
        return Collections.emptyList();
    }

poll 方法的调用端,全局是搜不到的。

 我们可以探究一下它的构造方法,看看谁初始化了它,则谁就有可能调用它的 poll 方法。排除掉它自己,有两个类初始化了 InMemoryTbQueueConsumer,分别是 InMemoryMonolithQueueFactory 和 InMemoryTbTransportQueueFactory

InMemoryTbTransportQueueFactory 订阅的主题,是 tb_transport.api.responses 不是我们要找的 tb_transport.api.requests,忽略。

2.2.2 InMemoryMonolithQueueFactory

我们先来看一下 InMemoryMonolithQueueFactoryInMemoryMonolithQueueFactory 里面有一个方法,传入的 TbQueueTransportApiSettings,刚好就是我们请求消息的主题配置类。

org.thingsboard.server.queue.provider.InMemoryMonolithQueueFactory
@Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> createTransportApiRequestConsumer() {
        return new InMemoryTbQueueConsumer<>(storage, transportApiSettings.getRequestsTopic());
    }


org.thingsboard.server.queue.settings.TbQueueTransportApiSettings 
@Lazy
@Data
@Component
public class TbQueueTransportApiSettings {
    // tb_transport.api.requests
    @Value("${queue.transport_api.requests_topic}")
    private String requestsTopic;
}

查看对方法 createTransportApiRequestConsumer 的调用,找到一个非具体队列实现的调用类TbCoreTransportApiService

2.2.3 TbCoreTransportApiService

TbCoreTransportApiService 初始化 init 方法,会创建 TbQueueConsumer——也就是具体的实现类 InMemoryTbQueueConsumer 注入到 DefaultTbQueueResponseTemplate.requestTemplate,然后执行 DefaultTbQueueResponseTemplate#init() 方法。

 2.2.4 DefaultTbQueueResponseTemplate

至此,我们找到了 InMemoryTbQueueConsumer#poll 调用的地方。

继续往下,看看对于消息 requests,是怎么消费的。 

 2.2.5 DefaultTransportApiService

 通过 AccessToken 查找到设备的授权 DeviceCredentials (即device_credentials表记录)然后构造 DeviceInfo 返回给设备端。

org.thingsboard.server.service.transport.DefaultTransportApiService

// credentialsId 就是 AccessToken
private ListenableFuture<TransportApiResponseMsg> validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) {
        DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId);
        if (credentials != null && credentials.getCredentialsType() == credentialsType) {
            return getDeviceInfo(credentials);
        } else {
            return getEmptyTransportApiResponseFuture();
        }
    }

 

2.2.6 消费消息流程图

3、总结