1. 引入POM文件
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version> <!-- 请检查最新版本 --> </dependency>
2. 编写配置Java文件
@Configuration
public class MqttConfig {
@Value("${mqtt.broker.url}")
private String brokerUrl;
@Value("${mqtt.broker.clientId}")
private String clientId;
@Value("${mqtt.broker.username}")
private String username;
@Value("${mqtt.broker.password}")
private String password;
@Value("${mqtt.broker.topic}")
private String topic;
@Value("${mqtt.broker.qos}")
private int qos;
@Autowired
private MqttMessageListenerService mqttMessageListenerService;
@Bean
public MqttClient mqttClient() throws MqttException {
MqttClient mqttClient = new MqttClient(brokerUrl, clientId);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(username);
connOpts.setPassword(password.toCharArray());
connOpts.setCleanSession(true);
// 暂时不连接 后续用到可以连接
mqttClient.connect(connOpts);
mqttClient.subscribe(topic, qos, mqttMessageListenerService);
return mqttClient;
}
@Bean
public MqttTopic mqttTopic(MqttClient mqttClient) {
return mqttClient.getTopic(topic);
}
}
3. 增加订阅的消费类
@Service @Slf4j public class MqttMessageListenerService implements IMqttMessageListener { @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { log.info("messageArrived Topic: " + topic + " Message: " + new String(mqttMessage.getPayload())); String messageStr = new String(mqttMessage.getPayload()); JSONObject jsonObject = JSONObject.parseObject(messageStr); // 创建 DateTimeFormatter 实例,并定义转换格式 log.info("MqttMessageListener end"); } }
4.测试消息发送
@Service @Slf4j public class MqttService { @Autowired private MqttClient mqttClient; @Autowired private MqttTopic mqttTopic; @Value("${mqtt.broker.qos:0}") private int qos; public void sendMessage(String messageContent) { try { MqttMessage message = new MqttMessage(messageContent.getBytes()); message.setQos(qos); // 设置QoS等级 mqttTopic.publish(message); } catch (Exception e) { e.printStackTrace(); } } public void publishMessage(String topic, String messageContent) { MqttMessage message = new MqttMessage(messageContent.getBytes()); message.setQos(qos); try { mqttClient.publish(topic, message); } catch (MqttException e) { log.error("publishMessage error", e); } } }