【阿里云实战】基于MQTT的Java SDK收发消息-终端和终端消息收发

发布于:2025-09-02 ⋅ 阅读:(18) ⋅ 点赞:(0)

前提条件

  • 创建资源:MQTT实例、topic、group
  • 获取AK、SK
  • 安装IDE/VScode,本文以VScode为例

架构流程图

在这里插入图片描述

调用Java SDK收发消息

  1. 下载阿里云云消息队列 MQTT 版的Java SDK的Demo示例作为您代码开发的参考。下载地址为mqtt-java-demo
  2. 克隆示例代码至您指定的文件夹。
  3. 在vscode中,打开下载的代码,并确认pom.xml中已包含以下依赖。
<dependencies>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.10</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>4.5.0</version>
        </dependency>
</dependencies>

我们主要是用绿色框中的两个文件+pom.xml文件
在这里插入图片描述

  1. 按代码注释说明填写上述两个文件的相应参数,主要涉及您已在创建资源中所创建的MQTT资源信息。示例代码如下。
package com.aliyun.openservices.lmq.example.demo;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;

/**
 * 本代码提供签名鉴权模式下 MQ4IOT 客户端发送消息到 MQ4IOT 客户端的示例,其中初始化参数请根据实际情况修改
 * 签名模式即使用阿里云账号系统提供的 AccessKey 和 SecretKey 对每个客户端计算出一个独立的签名供客户端识别使用。
 * 对于实际业务场景使用过程中,考虑到私钥 SecretKey 的隐私性,可以将签名过程放在受信任的环境完成。
 *
 * 完整 demo 工程,参考https://github.com/AliwareMQ/lmq-demo
 * 
 */

public class MQ4IoTConsumerDemo {
    public static void main(String[] args) throws Exception {
        /**
         * MQ4IOT 实例 ID,购买后控制台获取
         */
        String instanceId = "XXXXX"; // **输入你自己的实例ID**
        /**
         * 接入点地址,购买 MQ4IOT 实例,且配置完成后即可获取,接入点地址必须填写分配的域名,不得使用 IP 地址直接连接,否则可能会导致客户端异常。
         */
        String endPoint = "XXXXX";  // **输入自己的公网接入点**
        /**
         * 账号 accesskey,从账号系统控制台获取
         * 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
         * 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
         * 本示例以把AccessKey ID和AccessKey Secret保存在环境变量为例说明。运行本代码示例之前,请先配置环境变量MQTT_AK_ENV和MQTT_SK_ENV
         * 例如:export MQTT_AK_ENV=<access_key_id>
         *      export MQTT_SK_ENV=<access_key_secret>
         * 需要将<access_key_id>替换为已准备好的AccessKey ID,<access_key_secret>替换为AccessKey Secret。
         */
        String accessKey = System.getenv("MQTT_AK_ENV");  // **AK、SK使用环境变量,方式暴露**
        /**
         * 账号 secretKey,从账号系统控制台获取,仅在Signature鉴权模式下需要设置
         */
        String secretKey = System.getenv("MQTT_SK_ENV");
        
        // 检查环境变量是否设置
        if (accessKey == null || accessKey.isEmpty() || secretKey == null || secretKey.isEmpty()) {
            System.err.println("环境变量MQTT_AK_ENV和MQTT_SK_ENV必须设置");
            System.exit(1);
        }
        
        /**
         * MQ4IOT clientId,由业务系统分配,需要保证每个 tcp 连接都不一样,保证全局唯一,如果不同的客户端对象(tcp 连接)使用了相同的 clientId 会导致连接异常断开。
         * clientId 由两部分组成,格式为 GroupID@@@DeviceId,其中 groupId 在 MQ4IOT 控制台申请,DeviceId 由业务方自己设置,clientId 总长度不得超过64个字符。
         */
        String clientId = "GID_XXXXX@@@XXXXX"; // **控制台创建的groupid,clientID自定义,consumer与provider需要设置不同的,这里consumer为demo1,provider为demo2**
        /**
         * MQ4IOT 消息的一级 topic,需要在控制台申请才能使用。
         * 如果使用了没有申请或者没有被授权的 topic 会导致鉴权失败,服务端会断开客户端连接。
         */
        final String parentTopic = "XXXXX";  // **父级topic**
        /**
         * MQ4IOT支持子级 topic,用来做自定义的过滤,此处为示意,可以填写任何字符串,具体参考https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3
         * 需要注意的是,完整的 topic 参考 https://help.aliyun.com/document_detail/63620.html?spm=a2c4g.11186623.6.554.21a37f05ynxokW。
         */
        final String mq4IotTopic = parentTopic + "/" + "testMq4Iot"; 

        /**
         * QoS参数代表传输质量,可选0,1,2,根据实际需求合理设置,具体参考 https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3
         */
        final int qosLevel = 0;
        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        /**
         * 客户端使用的协议和端口必须匹配,具体参考文档 https://help.aliyun.com/document_detail/44866.html?spm=a2c4g.11186623.6.552.25302386RcuYFB
         * 如果是 SSL 加密则设置ssl://endpoint:8883
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * 客户端设置好发送超时时间,防止无限阻塞
         */
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                /**
                 * 客户端连接成功后就需要尽快订阅需要的 topic
                 */
                System.out.println("connect success");
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            final String topicFilter[] = {mq4IotTopic};
                            final int[] qos = {qosLevel};
                            mqttClient.subscribe(topicFilter, qos);
                        } catch (MqttException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }

            @Override
            public void connectionLost(Throwable throwable) {
                System.err.println("Connection lost: " + throwable.getMessage());
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                /**
                 * 消费消息的回调接口,需要确保该接口不抛异常,该接口运行返回即代表消息消费成功。
                 * 消费消息需要保证在规定时间内完成,如果消费耗时超过服务端约定的超时时间,对于可靠传输的模式,服务端可能会重试推送,业务需要做好幂等去重处理。超时时间约定参考限制
                 * https://help.aliyun.com/document_detail/63620.html?spm=a2c4g.11186623.6.546.229f1f6ago55Fj
                 */
                System.out.println(
                    "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        
        try {
            mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
            System.out.println("MQTT client connected successfully");
            
            // 添加关闭钩子以优雅地关闭连接
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                try {
                    if (mqttClient.isConnected()) {
                        mqttClient.disconnect();
                        System.out.println("MQTT client disconnected");
                    }
                    executorService.shutdown();
                    if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
                        executorService.shutdownNow();
                    }
                } catch (Exception e) {
                    System.err.println("Error while shutting down: " + e.getMessage());
                }
            }));
            
            // 保持程序运行
            while (true) {
                Thread.sleep(1000);
            }
        } catch (MqttException e) {
            System.err.println("Failed to connect to MQTT broker: " + e.getMessage());
            e.printStackTrace();
        }
    }
}
  1. 先运行consumer,再运行provider,成功后可以去MQTT控制台查看消息轨迹。
// Consumer 运行命令:
cd /Users/basumin/Desktop/项目/MQTT-demo/mqtt-demo/lmq-java-demo && MQTT_AK_ENV=XXXXXX MQTT_SK_ENV=XXXXXX java -cp "classes:lib/*" com.aliyun.openservices.lmq.example.demo.MQ4IoTConsumerDemo

// producer 运行命令:
cd /Users/basumin/Desktop/项目/MQTT-demo/mqtt-demo/lmq-java-demo && MQTT_AK_ENV=XXXXXX MQTT_SK_ENV=XXXXXX java -cp "classes:lib/*" com.aliyun.openservices.lmq.example.demo.MQ4IoTProducerDemo

启动如下图:
在这里插入图片描述
在这里插入图片描述
6. 控制台查看消息轨迹,输入group ID,与device ID,点击查询。
在这里插入图片描述

特别注意

本地IDE/VScode环境需要提前安装好,本地下载java和maven环境才可以执行成功,maven环境安装参考这篇文档

参考文档:
快速使用MQTT的Java SDK收发消息(终端和终端消息收发)


网站公告

今日签到

点亮在社区的每一天
去签到