springboot集成mqtt【启动即连接服务器与订阅主题】

发布于:2024-10-08 ⋅ 阅读:(107) ⋅ 点赞:(0)

0. 碎碎念

    本文是参考《小白版的springboot中集成mqtt服务》《springboot集成mqtt(超级无敌详细)》学习后整理的。我原本想做成“不在项目启动时就连接服务器,而是在需要时再连接,并且连接成功后自动订阅固定主题”的形式,但水平有限,一直没理明白(主要是依赖注入的问题一直有点晕)。
    所以目前这个版本是:项目一启动就连接 MQTT 服务器,连接成功后随即订阅固定主题。

1. 代码部分

    以下代码按我个人理解的顺序排列。项目是直接创建的默认 Spring Boot 项目,JDK 使用默认的 17。

1.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the SpringBootMqttTest demo application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- Dependencies below that declare no <version> rely on the versions managed by this parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.3.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.selma</groupId>
    <artifactId>SpringBootMqttTest</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>SpringBootMqttTest</name>
    <description>SpringBootMqttTest</description>
    <!-- NOTE(review): the empty url/licenses/developers/scm elements look like generated placeholders; confirm before filling in or removing. -->
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <!-- Java language level used to compile the project. -->
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- Web starter: needed for the @RestController endpoints of MqttController. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Lombok (the "little chili pepper"): provides the @Slf4j annotation used by the MQTT classes. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- MQTT-related dependency: Eclipse Paho MQTT v3 client (MqttClient, MqttConnectOptions, callbacks). -->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

1.2 application.yml

# Port the embedded web server listens on.
server:
  port: 8199

spring:
  application:
    name: SpringBootMqttTest

## MQTT configuration
mqtt:
  # Broker URI; injected into MqttConfiguration through @Value("${mqtt.host}").
  host: tcp://broker.emqx.io:1883
  # NOTE(review): the Java code shown passes QoS 0 explicitly; confirm where this value is consumed.
  qos: 1
  # Client identifier; injected into MqttConfiguration through @Value("${mqtt.clientId}").
  # NOTE(review): a fixed ID on a shared broker presumably clashes with any other client using the same ID - confirm, and make it unique if so.
  clientId: mqtt_server_common
  # NOTE(review): timeout and keepalive are presumably expressed in seconds - confirm against the code that reads them.
  timeout: 10
  keepalive: 20

1.3 domain.AjaxResult

    用作 controller 接口的统一返回值。

package com.selma.domain;

import java.util.HashMap;

/**
 * Map-backed response envelope returned by the controllers.
 *
 * <p>Entries are stored under the keys {@code "code"}, {@code "msg"} and {@code "data"};
 * the static factories below decide which of them are populated.
 */
public class AjaxResult extends HashMap<String, Object> {

    /**
     * Creates an empty result with no entries.
     */
    public AjaxResult() {
        super();
    }

    /**
     * Builds an error result with code {@code 500} and the default failure message.
     *
     * @return a result holding {@code "code"} and {@code "msg"}
     */
    public static AjaxResult error() {
        return error("操作失败");
    }

    /**
     * Builds an error result with code {@code 500} and the given message.
     *
     * @param msg message stored under {@code "msg"}
     * @return a result holding {@code "code"} and {@code "msg"}
     */
    public static AjaxResult error(String msg) {
        return error(500, msg);
    }

    /**
     * Builds an error result with the given code and message.
     *
     * @param code value stored under {@code "code"}
     * @param msg  message stored under {@code "msg"}
     * @return a result holding {@code "code"} and {@code "msg"}
     */
    public static AjaxResult error(int code, String msg) {
        return new AjaxResult().put("code", code).put("msg", msg);
    }

    /**
     * Builds a success result with code {@code 200} and the given message.
     *
     * @param msg message stored under {@code "msg"}
     * @return a result holding {@code "msg"} and {@code "code"}
     */
    public static AjaxResult success(String msg) {
        return new AjaxResult().put("msg", msg).put("code", 200);
    }

    /**
     * Builds a success result with code {@code 200} and the default success message.
     *
     * @return a result holding {@code "msg"} and {@code "code"}
     */
    public static AjaxResult success() {
        return success("操作成功");
    }

    /**
     * Builds a success result with code {@code 200} that carries a payload instead of a message.
     *
     * <p>Arguments whose static type is {@code String} resolve to {@link #success(String)};
     * this overload is chosen for every other static type.
     *
     * @param value payload stored under {@code "data"}
     * @return a result holding {@code "code"} and {@code "data"}
     */
    public static AjaxResult success(Object value) {
        return successData(200, value);
    }

    /**
     * Builds a result with the given code that carries a payload.
     *
     * @param code  value stored under {@code "code"}
     * @param value payload stored under {@code "data"}
     * @return a result holding {@code "code"} and {@code "data"}
     */
    public static AjaxResult successData(int code, Object value) {
        return new AjaxResult().put("code", code).put("data", value);
    }

    /**
     * Stores an entry and returns this result so calls can be chained.
     *
     * <p>Unlike {@link java.util.Map#put(Object, Object)}, the previous value mapped to
     * {@code key} is not returned.
     *
     * @param key   entry key
     * @param value entry value
     * @return this result
     */
    @Override
    public AjaxResult put(String key, Object value) {
        super.put(key, value);
        return this;
    }
}

1.4 mqtt.callback.MqttMessageListenerCommon

    回调方法综合类

package com.selma.mqtt.callback;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;

@Slf4j
public class MqttMessageListenerCommon implements MqttCallbackExtended {
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        log.info("MQTT {} 连接成功,连接方式:{}", serverURI, reconnect ? "重连" : "直连");
    }

    @Override
    public void connectionLost(Throwable throwable) {
        log.error("mqtt connectionLost 连接断开: {}", throwable.getMessage());
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("Message received from topic {} : {}", topic, new String(mqttMessage.getPayload()));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete());
    }
}

1.5 mqtt.config.MqttConfiguration

    MQTT 配置类:创建并连接 MqttClient,并将其注册为 Spring Bean 供其他类注入使用。

package com.selma.mqtt.config;

import com.selma.mqtt.callback.MqttMessageListenerCommon;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that creates the shared MQTT client bean and connects it to the broker
 * while the bean is being created.
 */
@Configuration
public class MqttConfiguration {
    /** Broker URI, taken from {@code mqtt.host}. */
    @Value("${mqtt.host}")
    String mqttHostCommon;

    /** Client identifier, taken from {@code mqtt.clientId}. */
    @Value("${mqtt.clientId}")
    String mqttClientIdCommon;

    /**
     * Connection timeout in seconds, taken from {@code mqtt.timeout}.
     * Falls back to 30, the Paho default, when the property is absent.
     */
    @Value("${mqtt.timeout:30}")
    private int connectionTimeoutSeconds;

    /**
     * Keep-alive interval in seconds, taken from {@code mqtt.keepalive}.
     * Falls back to 60, the Paho default, when the property is absent.
     */
    @Value("${mqtt.keepalive:60}")
    private int keepAliveSeconds;

    /**
     * Creates the MQTT client, registers the logging callback and connects to the broker.
     *
     * @return the connected client
     * @throws IllegalStateException if the client cannot be created or the connection fails;
     *                               the original {@link MqttException} is kept as the cause
     */
    @Bean
    public MqttClient mqttClientCommon() {
        try {
            MqttClient mqttClient = new MqttClient(mqttHostCommon, mqttClientIdCommon);
            // Register the callback before connecting so the connect-complete event is not missed.
            mqttClient.setCallback(new MqttMessageListenerCommon());

            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true); // use a clean session
            // Apply the configured timeout and keep-alive; previously these settings were ignored.
            options.setConnectionTimeout(connectionTimeoutSeconds);
            options.setKeepAliveInterval(keepAliveSeconds);
            mqttClient.connect(options);
            return mqttClient;
        } catch (MqttException e) {
            throw new IllegalStateException(
                    "Failed to connect MQTT client '" + mqttClientIdCommon + "' to " + mqttHostCommon, e);
        }
    }
}

1.6 mqtt.service.MqttService

    mqtt有关方法(目前只有我测试的)

package com.selma.mqtt.service;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Service;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;

@Service
@Slf4j
public class MqttService {

    @Resource
    private MqttClient mqttClientCommon;

    /**
     * 项目启动后就调用方法
     *
     * @throws MqttException
     */
    @PostConstruct
    public void defaultSubscribe() {
        try {
            //up是传输上来的信息
            this.subscribeCommon("selma/up", 0);
            this.publishMessageCommon("selma/down", "hello", 0);
//        } catch (MqttException mqttException) {
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 给commonClient订阅指定主题
     *
     * @param topic
     * @param qos
     */
    public void subscribeCommon(String topic, int qos) {
        try {
            mqttClientCommon.subscribe(topic, qos);
        } catch (MqttException mqttException) {
            mqttException.printStackTrace();
        }
    }

    /**
     * 给commonClient的指定主题发送指定消息
     *
     * @param topic
     * @param msg
     * @param qos
     */
    public void publishMessageCommon(String topic, String msg, int qos) {
        try {
            byte[] msgBytes = msg.getBytes();
            MqttMessage message = new MqttMessage(msgBytes);
            message.setQos(qos);
            mqttClientCommon.publish(topic, message);
        } catch (MqttException mqttException) {
            mqttException.printStackTrace();
        }
    }

}

1.7 mqtt.controller.MqttController

package com.selma.mqtt.controller;

import com.selma.domain.AjaxResult;
import com.selma.mqtt.service.MqttService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/selma/mqtt")
@Slf4j
public class MqttController {

    @Resource
    MqttService mqttService;

    @PostMapping("/subscribeCommon")
    public AjaxResult subscribeCommon(String topic) {
        mqttService.subscribeCommon(topic, 0);
        return AjaxResult.success();
    }

    @PostMapping("/publishMessageCommon")
    public AjaxResult publishMessageCommon(String topic, String msg) {
        mqttService.publishMessageCommon(topic, msg, 0);
        return AjaxResult.success();
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到