浅谈Spring Boot MQTT功能并实现手动连接操作

发布于:2025-02-19 ⋅ 阅读:(123) ⋅ 点赞:(0)

在Spring Boot中使用自身的MQTT功能并实现手动连接操作,可以通过以下步骤实现。以下是基于搜索结果中的信息整理的实现方法:

1. 添加依赖

pom.xml文件中添加Spring Boot的MQTT依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

2. 创建MQTT配置类

创建一个配置类MqttConfiguration,用于加载MQTT连接的相关配置信息:

package com.example.config;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;

@Configuration
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfiguration {
    private String username;
    private String password;
    private String hostUrl;
    private String clientId;
    private int timeout;
    private int keepalive;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setServerURIs(new String[]{hostUrl});
        options.setCleanSession(true);
        options.setConnectionTimeout(timeout);
        options.setKeepAliveInterval(keepalive);
        options.setAutomaticReconnect(true); // 自动重连
        factory.setConnectionOptions(options);
        return factory;
    }

    // Getters and Setters
}

3. 创建MQTT客户端工具类

创建一个工具类MqttClientUtil,用于手动管理MQTT连接、订阅和发布消息:

package com.example.util;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MqttClientUtil {
    private MqttClient client;

    @Autowired
    private MqttConfiguration mqttConfig;

    public void connect() throws Exception {
        if (client != null && client.isConnected()) {
            System.out.println("客户端已连接,无需重复连接。");
            return;
        }

        client = new MqttClient(mqttConfig.getHostUrl(), mqttConfig.getClientId(), new MemoryPersistence());
        MqttConnectOptions options = mqttConfig.getMqttClientFactory().getConnectionOptions();
        client.connect(options);
        System.out.println("MQTT连接成功:" + mqttConfig.getHostUrl());
    }

    public void subscribe(String topic, int qos) throws MqttException {
        if (!client.isConnected()) {
            throw new IllegalStateException("未连接到MQTT服务器");
        }
        client.subscribe(topic, qos);
        System.out.println("订阅主题成功:" + topic);
    }

    public void publish(String topic, String message, int qos) throws MqttException {
        if (!client.isConnected()) {
            throw new IllegalStateException("未连接到MQTT服务器");
        }
        MqttMessage mqttMessage = new MqttMessage(message.getBytes());
        mqttMessage.setQos(qos);
        client.publish(topic, mqttMessage);
        System.out.println("消息发布成功:" + topic + " -> " + message);
    }

    public void disconnect() throws MqttException {
        if (client != null && client.isConnected()) {
            client.disconnect();
            System.out.println("MQTT连接已断开");
        }
    }
}

4. 配置文件

application.properties中配置MQTT参数:

spring.mqtt.username=your-username
spring.mqtt.password=your-password
spring.mqtt.hostUrl=tcp://broker.hivemq.com:1883
spring.mqtt.clientId=client-id
spring.mqtt.timeout=10
spring.mqtt.keepalive=60

5. 创建测试接口

创建一个测试接口MqttController,用于测试MQTT连接、订阅和发布消息:

package com.example.controller;

import com.example.util.MqttClientUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/mqtt")
public class MqttController {
    @Autowired
    private MqttClientUtil mqttClientUtil;

    @GetMapping("/connect")
    public String connect() {
        try {
            mqttClientUtil.connect();
            return "MQTT连接成功";
        } catch (Exception e) {
            return "连接失败:" + e.getMessage();
        }
    }

    @GetMapping("/subscribe")
    public String subscribe(@RequestParam String topic) {
        try {
            mqttClientUtil.subscribe(topic, 1);
            return "订阅成功:" + topic;
        } catch (Exception e) {
            return "订阅失败:" + e.getMessage();
        }
    }

    @GetMapping("/publish")
    public String publish(@RequestParam String topic, @RequestParam String message) {
        try {
            mqttClientUtil.publish(topic, message, 1);
            return "消息发布成功:" + topic + " -> " + message;
        } catch (Exception e) {
            return "发布失败:" + e.getMessage();
        }
    }

    @GetMapping("/disconnect")
    public String disconnect() {
        try {
            mqttClientUtil.disconnect();
            return "MQTT连接已断开";
        } catch (Exception e) {
            return "断开失败:" + e.getMessage();
        }
    }
}

6. 测试

启动Spring Boot应用后,可以通过以下方式测试MQTT功能:

连接到MQTT服务器

GET http://localhost:8080/mqtt/connect

订阅主题

GET http://localhost:8080/mqtt/subscribe?topic=test/topic

发布消息

GET http://localhost:8080/mqtt/publish?topic=test/topic&message=Hello%20MQTT

断开连接

GET http://localhost:8080/mqtt/disconnect


网站公告

今日签到

点亮在社区的每一天
去签到