Mqtt使用笔记

发布于:2025-08-01 ⋅ 阅读:(20) ⋅ 点赞:(0)

环境:JDK17 Springboot 3.2.4

  • 引入jar包
<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-integration</artifactId>
</dependency>

<dependency>
     <groupId>org.springframework.integration</groupId>
     <artifactId>spring-integration-stream</artifactId>
</dependency>

<dependency>
     <groupId>org.springframework.integration</groupId>
     <artifactId>spring-integration-mqtt</artifactId>
</dependency>
  • 配置MqttClient bean
    @Bean
    public MqttClient mqttClient() throws MqttException {
        //new MemoryPersistence()确保文件paho文件不会生成,或者new MqttDefaultFilePersistence(dir)自定义文件生成的路径
        MqttClient client = new MqttClient("tcp://mqtt.xxxx.cn:1883", MqttClient.generateClientId(), new MemoryPersistence());
        client.setCallback(new MqttCallbackListener());
        client.connect(mqttConnectOptions());
        client.subscribe(MqttTopicEnum.TOPIC_GENERIC);
        return client;
    }

    /**
     * 连接参数
     */
    private MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("账号");
        options.setPassword("密码".toCharArray());
        options.setServerURIs(new String[]{"tcp://mqtt.xxxx.cn:1883"});
        options.setConnectionTimeout(30);
        options.setKeepAliveInterval(60);
        options.setAutomaticReconnect(true);
        //cleanSession 为false时会持久化会话,重连时会使用之前的会话信息包括未处理的消息,设置为true时重连后开启的是新的会话
        options.setCleanSession(true);
        return options;
    }
  • 发布消息
   /**
     * 发布消息 - MQTT
     * 参数:
     * qos 表示服务质量,0 最多一次消息可能送达一次也可能无法送达;1 至少一次,可能出现重复;2 恰好一次确保每条消息仅被接收一次
     * qos 服务质量越高越耗性能
     * retained 是否将当前topic最新的数据保留(其它数据来了会覆盖)用于新订阅者能立即获取到最新状态数据
     * mqttClient.getTopic("topic").publish().waitForCompletion() 手动确认消息发送成功, 关注mqttClient.publish方法源码可发现调用
     */
    @Override
    public void publish(Object message, String topic) {
        if (SystemUtil.isNull(message)) {
            return;
        }
        String data = message instanceof String value ? value : JSON.toJSONString(message);
        byte[] payload = data.getBytes(StandardCharsets.UTF_8);
        try {
            mqttClient.publish(topic, payload, 0, false);
        } catch (Exception e) {
            log.warn("MQTT发送消息失败【{}】【{}】", topic, data, e);
        }
    }
  • 接收消息
public class MqttCallbackListener implements MqttCallbackExtended {

    /**
     * 连接成功订阅主题
     */
    @Override
    public void connectComplete(boolean reconnect, String uri) {
        log.info("MQTT连接成功【{}】【{}】", (reconnect ? "重连" : "直连"), uri);
    }

    @Override
    public void connectionLost(Throwable throwable) {
        log.warn("MQTT连接断开", throwable);
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        try {
            String data = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
            log.info("MQTT收到主题为【{}】的数据:【{}】", topic, data);
        } catch (Exception e) {
            log.warn("MQTT数据处理发生异常", e);
        }
    }

    @SneakyThrows(MqttException.class)
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        String data = new String(token.getMessage().getPayload(), StandardCharsets.UTF_8);
        String clientId = token.getClient().getClientId();
        log.info("MQTT发送消息成功【{}】【{}】", clientId, data);
    }
}

注意:JDK 21 会有不同后续跟进


网站公告

今日签到

点亮在社区的每一天
去签到