Mqtt实现消息发送及订阅

发布于:2024-12-08 ⋅ 阅读:(78) ⋅ 点赞:(0)

1. 在 POM 文件中引入依赖

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version> <!-- Check for the latest released version before use -->
</dependency>

2. 编写配置Java文件

/**
 * Spring configuration that builds the shared Paho {@link MqttClient} and the
 * {@link MqttTopic} used for publishing.
 *
 * <p>All settings are read from the {@code mqtt.broker.*} properties.
 */
@Configuration
public class MqttConfig {

    /** Broker URI handed to the {@link MqttClient} constructor. */
    @Value("${mqtt.broker.url}")
    private String brokerUrl;

    /** Client identifier handed to the {@link MqttClient} constructor. */
    @Value("${mqtt.broker.clientId}")
    private String clientId;

    /** User name set on the connect options. */
    @Value("${mqtt.broker.username}")
    private String username;

    /** Password set on the connect options. */
    @Value("${mqtt.broker.password}")
    private String password;

    /** Topic that is subscribed on startup and exposed as the {@link MqttTopic} bean. */
    @Value("${mqtt.broker.topic}")
    private String topic;

    /** QoS used for the subscription; defaults to 0, matching the default used by MqttService. */
    @Value("${mqtt.broker.qos:0}")
    private int qos;

    /** Listener that receives every message arriving on {@link #topic}. */
    @Autowired
    private MqttMessageListenerService mqttMessageListenerService;

    /**
     * Creates the MQTT client, connects it to the broker and subscribes to the configured topic.
     *
     * @return the connected and subscribed client
     * @throws MqttException if the client cannot be created, connected or subscribed
     */
    @Bean
    public MqttClient mqttClient() throws MqttException {
        MqttClient mqttClient = new MqttClient(brokerUrl, clientId);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setUserName(username);
        connOpts.setPassword(password.toCharArray());
        connOpts.setCleanSession(true);
        try {
            // Connect eagerly at startup so the subscription below can be registered.
            mqttClient.connect(connOpts);
            mqttClient.subscribe(topic, qos, mqttMessageListenerService);
        } catch (MqttException e) {
            // Do not leave a half-initialised client behind when startup fails.
            releaseQuietly(mqttClient, e);
            throw e;
        }
        return mqttClient;
    }

    /**
     * Exposes the configured topic of the given client for publishing.
     *
     * @param mqttClient the client bean created by {@link #mqttClient()}
     * @return the topic handle obtained from the client
     */
    @Bean
    public MqttTopic mqttTopic(MqttClient mqttClient) {
        return mqttClient.getTopic(topic);
    }

    /**
     * Disconnects (if needed) and closes a client after a failed startup.
     *
     * <p>Any error raised during cleanup is attached to {@code failure} as a suppressed
     * exception so the original cause stays the primary one.
     *
     * @param client  the client to release
     * @param failure the exception that triggered the cleanup
     */
    private static void releaseQuietly(MqttClient client, MqttException failure) {
        try {
            if (client.isConnected()) {
                client.disconnectForcibly();
            }
            client.close();
        } catch (MqttException cleanupError) {
            failure.addSuppressed(cleanupError);
        }
    }
}

3. 增加订阅的消费类

@Service
@Slf4j
public class MqttMessageListenerService implements IMqttMessageListener {

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("messageArrived Topic: " + topic + " Message: " + new String(mqttMessage.getPayload()));
        String messageStr = new String(mqttMessage.getPayload());
        JSONObject jsonObject = JSONObject.parseObject(messageStr);
        // 创建 DateTimeFormatter 实例,并定义转换格式
        log.info("MqttMessageListener end");
    }

}

4. 测试消息发送

@Service
@Slf4j
public class MqttService {

    @Autowired
    private MqttClient mqttClient;

    @Autowired
    private MqttTopic mqttTopic;

    @Value("${mqtt.broker.qos:0}")
    private int qos;

    public void sendMessage(String messageContent) {
        try {
            MqttMessage message = new MqttMessage(messageContent.getBytes());
            message.setQos(qos); // 设置QoS等级
            mqttTopic.publish(message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void publishMessage(String topic, String messageContent) {
        MqttMessage message = new MqttMessage(messageContent.getBytes());
        message.setQos(qos);
        try {
            mqttClient.publish(topic, message);
        } catch (MqttException e) {
            log.error("publishMessage error", e);
        }
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到