文章目录
前提条件
- 创建资源:MQTT实例、topic、group
- 获取AK、SK
- 安装IDE/VScode,本文以VScode为例
架构流程图
调用Java SDK收发消息
- 下载阿里云云消息队列 MQTT 版的Java SDK的Demo示例作为您代码开发的参考。下载地址为mqtt-java-demo。
- 克隆示例代码至您指定的文件夹。
- 在vscode中,打开下载的代码,并确认pom.xml中已包含以下依赖。
<dependencies>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-onsmqtt</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.5.0</version>
</dependency>
</dependencies>
我们主要是用绿色框中的两个文件+pom.xml文件
- 按代码注释说明填写上述两个文件的相应参数,主要涉及您已在创建资源中所创建的MQTT资源信息。示例代码如下。
package com.aliyun.openservices.lmq.example.demo;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
/**
* 本代码提供签名鉴权模式下 MQ4IOT 客户端发送消息到 MQ4IOT 客户端的示例,其中初始化参数请根据实际情况修改
* 签名模式即使用阿里云账号系统提供的 AccessKey 和 SecretKey 对每个客户端计算出一个独立的签名供客户端识别使用。
* 对于实际业务场景使用过程中,考虑到私钥 SecretKey 的隐私性,可以将签名过程放在受信任的环境完成。
*
* 完整 demo 工程,参考https://github.com/AliwareMQ/lmq-demo
*
*/
public class MQ4IoTConsumerDemo {
public static void main(String[] args) throws Exception {
/**
* MQ4IOT 实例 ID,购买后控制台获取
*/
String instanceId = "XXXXX"; // **输入你自己的实例ID**
/**
* 接入点地址,购买 MQ4IOT 实例,且配置完成后即可获取,接入点地址必须填写分配的域名,不得使用 IP 地址直接连接,否则可能会导致客户端异常。
*/
String endPoint = "XXXXX"; // **输入自己的公网接入点**
/**
* 账号 accesskey,从账号系统控制台获取
* 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
* 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
* 本示例以把AccessKey ID和AccessKey Secret保存在环境变量为例说明。运行本代码示例之前,请先配置环境变量MQTT_AK_ENV和MQTT_SK_ENV
* 例如:export MQTT_AK_ENV=<access_key_id>
* export MQTT_SK_ENV=<access_key_secret>
* 需要将<access_key_id>替换为已准备好的AccessKey ID,<access_key_secret>替换为AccessKey Secret。
*/
String accessKey = System.getenv("MQTT_AK_ENV"); // **AK、SK使用环境变量,方式暴露**
/**
* 账号 secretKey,从账号系统控制台获取,仅在Signature鉴权模式下需要设置
*/
String secretKey = System.getenv("MQTT_SK_ENV");
// 检查环境变量是否设置
if (accessKey == null || accessKey.isEmpty() || secretKey == null || secretKey.isEmpty()) {
System.err.println("环境变量MQTT_AK_ENV和MQTT_SK_ENV必须设置");
System.exit(1);
}
/**
* MQ4IOT clientId,由业务系统分配,需要保证每个 tcp 连接都不一样,保证全局唯一,如果不同的客户端对象(tcp 连接)使用了相同的 clientId 会导致连接异常断开。
* clientId 由两部分组成,格式为 GroupID@@@DeviceId,其中 groupId 在 MQ4IOT 控制台申请,DeviceId 由业务方自己设置,clientId 总长度不得超过64个字符。
*/
String clientId = "GID_XXXXX@@@XXXXX"; // **控制台创建的groupid,clientID自定义,consumer与provider需要设置不同的,这里consumer为demo1,provider为demo2**
/**
* MQ4IOT 消息的一级 topic,需要在控制台申请才能使用。
* 如果使用了没有申请或者没有被授权的 topic 会导致鉴权失败,服务端会断开客户端连接。
*/
final String parentTopic = "XXXXX"; // **父级topic**
/**
* MQ4IOT支持子级 topic,用来做自定义的过滤,此处为示意,可以填写任何字符串,具体参考https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3
* 需要注意的是,完整的 topic 参考 https://help.aliyun.com/document_detail/63620.html?spm=a2c4g.11186623.6.554.21a37f05ynxokW。
*/
final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
/**
* QoS参数代表传输质量,可选0,1,2,根据实际需求合理设置,具体参考 https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3
*/
final int qosLevel = 0;
ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
/**
* 客户端使用的协议和端口必须匹配,具体参考文档 https://help.aliyun.com/document_detail/44866.html?spm=a2c4g.11186623.6.552.25302386RcuYFB
* 如果是 SSL 加密则设置ssl://endpoint:8883
*/
final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
/**
* 客户端设置好发送超时时间,防止无限阻塞
*/
mqttClient.setTimeToWait(5000);
final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
/**
* 客户端连接成功后就需要尽快订阅需要的 topic
*/
System.out.println("connect success");
executorService.submit(new Runnable() {
@Override
public void run() {
try {
final String topicFilter[] = {mq4IotTopic};
final int[] qos = {qosLevel};
mqttClient.subscribe(topicFilter, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
});
}
@Override
public void connectionLost(Throwable throwable) {
System.err.println("Connection lost: " + throwable.getMessage());
throwable.printStackTrace();
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
/**
* 消费消息的回调接口,需要确保该接口不抛异常,该接口运行返回即代表消息消费成功。
* 消费消息需要保证在规定时间内完成,如果消费耗时超过服务端约定的超时时间,对于可靠传输的模式,服务端可能会重试推送,业务需要做好幂等去重处理。超时时间约定参考限制
* https://help.aliyun.com/document_detail/63620.html?spm=a2c4g.11186623.6.546.229f1f6ago55Fj
*/
System.out.println(
"receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
}
});
try {
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
System.out.println("MQTT client connected successfully");
// 添加关闭钩子以优雅地关闭连接
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
if (mqttClient.isConnected()) {
mqttClient.disconnect();
System.out.println("MQTT client disconnected");
}
executorService.shutdown();
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (Exception e) {
System.err.println("Error while shutting down: " + e.getMessage());
}
}));
// 保持程序运行
while (true) {
Thread.sleep(1000);
}
} catch (MqttException e) {
System.err.println("Failed to connect to MQTT broker: " + e.getMessage());
e.printStackTrace();
}
}
}
- 先运行consumer,再运行provider,成功后可以去MQTT控制台查看消息轨迹。
// Consumer 运行命令:
cd /Users/basumin/Desktop/项目/MQTT-demo/mqtt-demo/lmq-java-demo && MQTT_AK_ENV=XXXXXX MQTT_SK_ENV=XXXXXX java -cp "classes:lib/*" com.aliyun.openservices.lmq.example.demo.MQ4IoTConsumerDemo
// producer 运行命令:
cd /Users/basumin/Desktop/项目/MQTT-demo/mqtt-demo/lmq-java-demo && MQTT_AK_ENV=XXXXXX MQTT_SK_ENV=XXXXXX java -cp "classes:lib/*" com.aliyun.openservices.lmq.example.demo.MQ4IoTProducerDemo
启动如下图:
6. 控制台查看消息轨迹,输入group ID,与device ID,点击查询。
特别注意
本地IDE/VScode环境需要提前安装好,本地下载java和maven环境才可以执行成功,maven环境安装参考这篇文档。