物联网协议之MQTT(二)服务端

发布于:2025-06-08 ⋅ 阅读:(18) ⋅ 点赞:(0)

参考文档:mica-mqtt
示例代码:Gitee仓库

  经过前两章的基础铺垫,基础概念就不再过多赘述,大家还有不清楚的,可以访问上面的参考文档,感谢春哥对MQTT的详细介绍,很有用!
   mica-mqtt目前已经加入dromara组织,不仅支持SpringBoot2SpringBoot3,对SolonJFinal都能支持,作为Java开发者对MQTT的学习有极大的帮助。

本章使用测试工具为 MQTTX ,可回看一章获取物联网协议之MQTT(一)基础概念和设备

一、依赖引入

<dependency>
    <groupId>org.dromara.mica-mqtt</groupId>
    <artifactId>mica-mqtt-server-spring-boot-starter</artifactId>
    <version>${最新版本}</version>
</dependency>

按照官方的依赖,大家可以引入以上依赖。但是,作者其实还有个加强版的(付费版)的组件。不过源码不开源,但是依赖可以免费用。

<dependency>
    <groupId>net.dreamlu</groupId>
    <artifactId>mica-mqttx-server-spring-boot-starter</artifactId>
    <version>${最新版本}</version>
</dependency>

  使用上面的mqttx的依赖就行,比开源版的性能更好,但是接口跟开源版是一样的,完全可以用开源的文档进行使用。

下面贴一张可实现接口,大家可以通过实现下面的接口来自定义功能,下面我会写部分我常用的接口,以供大家参考。
mica-mqtt可实现接口

二、配置项

mqtt:
  server:
    enabled: true               # 是否开启服务端,默认:true
#    ip: 0.0.0.0                # 服务端 ip 默认为空,0.0.0.0,建议不要设置
    port: 1883                  # 端口,默认:1883
    name: Mica-Mqtt-Server      # 名称,默认:Mica-Mqtt-Server
    heartbeat-timeout: 120000   # 心跳超时,单位毫秒,默认: 1000 * 120
    read-buffer-size: 8KB       # 接收数据的 buffer size,默认:8k
    max-bytes-in-message: 10MB  # 消息解析最大 bytes 长度,默认:10M
    auth:
      enable: false             # 是否开启 mqtt 认证
      username: mica            # mqtt 认证用户名
      password: mica            # mqtt 认证密码
    debug: true                 # 如果开启 prometheus 指标收集建议关闭
    stat-enable: true           # 开启指标收集,debug 和 prometheus 开启时需要打开,默认开启,关闭节省内存
    proxy-protocol-enable: false   # 代理协议支持,nginx 可开启 tcp proxy_protocol on; 时转发源 ip 信息。2.4.1 版本开始支持
    web-port: 8083              # http、websocket 端口,默认:8083
    websocket-enable: true      # 是否开启 websocket,默认: true
    http-enable: false          # 是否开启 http api,默认: false
    http-basic-auth:
      enable: false             # 是否开启 http basic auth,默认: false
      username: mica            # http basic auth 用户名
      password: mica            # http basic auth 密码
    ssl:                        # mqtt tcp ssl 认证
      enabled: false            # 是否开启 ssl 认证,2.1.0 开始支持双向认证
      keystore-path:            # 必须参数:ssl keystore 目录,支持 classpath:/ 路径。
      keystore-pass:            # 必选参数:ssl keystore 密码
      truststore-path:          # 可选参数:ssl 双向认证 truststore 目录,支持 classpath:/ 路径。
      truststore-pass:          # 可选参数:ssl 双向认证 truststore 密码
      client-auth: none         # 是否需要客户端认证(双向认证),默认:NONE(不需要)

以上是官网上默认的配置,基本不用改动。


强烈建议大家改一下用户认证,就是把 mqtt.server.auth.enable = true 就行,下面的用户名、密码最好改一下。另外,用户名、密码理论上是没有限制的,但是并不代表采集设备配置服务端没有限制,所以建议大家密码可以有一定的复杂度,但是别离谱,尤其是不常用的符号就免了吧!


物联网平台虽然不想常规的web平台,但是没有认证的设备接入也是有一定风险性的,简单设置一下就能提升安全性,绝对值!

三、认证服务

实现IMqttServerAuthHandler 接口来完成此功能。大家可以把这个理解为微信加好友,大家肯定是不想收到各种各样的垃圾消息,只有通过好友请求(认证)的才允许相互通信。

3.1 接口实现

  个人认为这个功能是一定一定要有的,这样才能保证平台的安全险。
  当然不是一定要实现这个接口,比如上面通过配置把用户名、密码设置好,就是基本的认证功能。当然,实现接口可以实现更复杂的逻辑,比如:①不在系统的客户端不接受连接;②可以设计一套黑名单,拒绝部分IP或客户端连接。

@Configuration(proxyBeanMethods = false)
@Slf4j
@RequiredArgsConstructor
public class MqttAuthListener implements IMqttServerAuthHandler {

    @Value("${mqtt.server.auth.username}")
    private String authUserName;
    @Value("${mqtt.server.auth.password}")
    private String authPassword;

    @Override
    public boolean authenticate(ChannelContext context, String uniqueId, String clientId, String userName, String password) {
        // 下面我们简单以配置文件的用户名、密码作为检验,你也可以自定义逻辑,甚至每个设备有单独的用户名、密码
        boolean auth = authUserName.equals(userName) && authPassword.equals(password);
        if (!auth) {
            log.error("客户端:{}用户名或密码不正确,不能连接。用户名:{},密码:{}", clientId, userName, password);
            return false;
        }
        if (StrUtil.isBlank(clientId)) {
            log.error("客户端:{}为空,不能连接", clientId);
            return false;
        }

        // TODO 比如可以从缓存中判断clientId是否存在,不存在的可以不接收
        // TODO 比如我们可以判断clientId是否被拉黑
        // TODO 比如我们可以通过ChannelContext判断IP是否被拉黑

        return true;
    }
}

如果实现此接口,配置文件里的用户名、密码就不生效了,会以实现接口的逻辑为准。

3.2 测试成功连接

使用MQTTX连接服务端,截图可能有很多次,如果clientId不同,别在意,看功能就行。
连接配置
客户端上线

3.3 测试用户名密码错误连接

修改用户名
服务端拒绝连接

四、客户端上下线监听

实现MqttConnectStatusListener 接口来完成此功能。大家可以理解为以前的QQ,有好友上线会有个通知。

  对于物联网平台,设备上下线是比较重要的,比如设备离线了,那么所有的监测就毫无意义。

4.1 接口实现

@Slf4j
@Component
@RequiredArgsConstructor
public class MqttConnectStatusListener implements IMqttConnectStatusListener {

    @Override
    public void online(ChannelContext context, String clientId, String username) {
        log.info("设备【{}】上线",  clientId);
    }

    @Override
    public void offline(ChannelContext context, String clientId, String username, String reason) {
        log.warn("设备【{}】离线",  clientId);
    }
}

官网上使用的@EventListener的方式来实现,我不是很习惯,就用的实现接口,大家喜欢的可以用一下

@Service
public class MqttConnectStatusListener {
	private static final Logger logger = LoggerFactory.getLogger(MqttConnectStatusListener.class);

	@EventListener
	public void online(MqttClientOnlineEvent event) {
		logger.info("MqttClientOnlineEvent:{}", event);
	}

	@EventListener
	public void offline(MqttClientOfflineEvent event) {
		logger.info("MqttClientOfflineEvent:{}", event);
	}

}

4.2 上线测试

MQTTX连接
服务端上线

4.3 离线测试

MQTTX断开连接
服务端离线

五、消息接收

实现IMqttMessageListener 接口来完成此功能。大家可以理解给微信好友发消息。

5.1 接口实现

@Slf4j
@Component
@RequiredArgsConstructor
public class MqttServerMessageListener implements IMqttMessageListener {

    @Override
    public void onMessage(ChannelContext context, String clientId, String topic, MqttQoS qoS, MqttPublishMessage message) {
        String payload = ByteBufferUtil.toString(message.getPayload(), StandardCharsets.UTF_8);
        log.info("Topic:【{}】,收到客户端:【{}】消息:【{}】",  topic, clientId, payload);
    }
}

5.2 消息测试

消息发送
服务端接收消息

六、消息发送

使用MqttServerTemplate服务,实现服务端向客户端发送消息

6.1 代码实现

topic是必须的,如果你还是不了解topic是什么,建议先学习下。

@RestController
@RequiredArgsConstructor
public class TestPublishController {
    private final MqttServerTemplate mqttServerTemplate;

    @GetMapping("/test/publish")
    public void testPublish() {
        // 向所有订阅topic的客户端发送消息
        mqttServerTemplate.publishAll("/mqtt/testpublish", "mqtt server publishAll message".getBytes());
        // 向指定客户端发送消息
        mqttServerTemplate.publish("mqttx_f1e5c583", "/mqtt/testpublish", "mqtt server publish message".getBytes());
    }
}

6.2 客户端订阅topic

订阅topic

6.3 消息发布

浏览器调用6.1的接口

客户端收到消息

至此,MQTT服务端的基本操作已经完成!