MQTT 入门教程:三步从 Docker 部署到 Java 客户端实现

发布于:2025-08-07 ⋅ 阅读:(20) ⋅ 点赞:(0)

在物联网(IoT)与边缘计算快速发展的今天,设备间的高效通信成为核心需求。MQTT 作为一种轻量级的发布 / 订阅模式协议,凭借其低带宽占用、强稳定性和灵活的消息路由能力,已成为物联网通信的事实标准。无论是智能家居的设备联动、工业传感器的数据采集,还是车联网的实时信息交互,MQTT 都在其中扮演着关键角色。
本文将从零开始搭建:从使用 Docker 部署轻量级 MQTT 服务器(Broker),到基于 Java 语言实现完整的消息发布与订阅功能,通过清晰的步骤和可直接运行的代码,最短时间内搭建起自己的 MQTT 通信系统。

什么是 MQTT?

MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布 / 订阅模式消息传输协议,专为低带宽、不稳定网络环境设计,广泛应用于物联网(IoT)、传感器网络和移动设备通信等场景。
核心概念

  • Broker:消息服务器,负责接收和转发所有消息
  • Publisher:消息发布者,发送消息到 Broker
  • Subscriber:消息订阅者,从 Broker 接收消息
  • Topic:消息主题,用于消息分类和路由
  • QoS (Quality of Service):服务质量等级,定义消息传递的可靠性
    -在这里插入图片描述

第一步:使用 Docker 部署 MQTT Broker

我们将使用 Eclipse Mosquitto,一个流行的开源 MQTT Broker。

1. 拉取 Mosquitto 镜像

docker pull eclipse-mosquitto

2. 创建配置文件

首先创建一个目录用于存放配置文件和数据:

mkdir -p ~/mosquitto/config ~/mosquitto/data ~/mosquitto/log

创建配置文件 mosquitto.conf:

nano ~/mosquitto/config/mosquitto.conf

添加以下内容:

persistence true
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log
listener 1883
allow_anonymous true
listener 1883:MQTT 默认端口
allow_anonymous true:允许匿名连接(生产环境建议关闭)

3. 启动 Mosquitto 容器

docker run -d \
  --name mosquitto \
  -p 1883:1883 \
  -v ~/mosquitto/config:/mosquitto/config \
  -v ~/mosquitto/data:/mosquitto/data \
  -v ~/mosquitto/log:/mosquitto/log \
  eclipse-mosquitto

4. 验证 Broker 是否运行

docker ps | grep mosquitto

如果看到运行中的容器,说明 Broker 部署成功。

第二步:Java 客户端实现

我们将使用 Eclipse Paho Java 客户端库来实现 MQTT 客户端。

1. 添加依赖

如果使用 Maven,在pom.xml中添加:

<dependencies>
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.5</version>
    </dependency>
</dependencies>

2. MQTT 工具类

首先创建一个工具类封装 MQTT 连接的通用功能:

//运行
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MQTTUtils {
    // MQTT Broker地址
    private static final String BROKER = "tcp://localhost:1883";
    
    /**
     * 创建MQTT客户端并连接到Broker
     * @param clientId 客户端ID,应唯一
     * @return 已连接的MQTT客户端
     * @throws MqttException 连接异常
     */
    public static MqttClient connect(String clientId) throws MqttException {
        // 设置客户端持久化方式为内存
        MemoryPersistence persistence = new MemoryPersistence();
        
        // 创建客户端
        MqttClient client = new MqttClient(BROKER, clientId, persistence);
        
        // 配置连接选项
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true); // 清除会话
        connOpts.setConnectionTimeout(10); // 连接超时时间
        connOpts.setKeepAliveInterval(20); // 心跳间隔
        
        // 连接到Broker
        System.out.println("Connecting to broker: " + BROKER);
        client.connect(connOpts);
        System.out.println("Connected");
        
        return client;
    }
    
    /**
     * 发布消息
     * @param client MQTT客户端
     * @param topic 消息主题
     * @param content 消息内容
     * @param qos 服务质量等级 (0, 1, 2)
     * @throws MqttException 发布异常
     */
    public static void publish(MqttClient client, String topic, String content, int qos) throws MqttException {
        MqttMessage message = new MqttMessage(content.getBytes());
        message.setQos(qos);
        client.publish(topic, message);
        System.out.println("Published message: " + content + " to topic: " + topic);
    }
    
    /**
     * 订阅主题
     * @param client MQTT客户端
     * @param topic 要订阅的主题
     * @param qos 服务质量等级
     * @throws MqttException 订阅异常
     */
    public static void subscribe(MqttClient client, String topic, int qos) throws MqttException {
        System.out.println("Subscribing to topic: " + topic);
        client.subscribe(topic, qos);
    }
}

3. 订阅者客户端实现

//运行
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MQTTSubscriber {
    // 订阅的主题
    private static final String TOPIC = "test/topic";
    // 客户端ID
    private static final String CLIENT_ID = "subscriber-client";
    // QoS等级
    private static final int QOS = 1;

    public static void main(String[] args) {
        MqttClient client = null;
        try {
            // 连接到Broker
            client = MQTTUtils.connect(CLIENT_ID);
            
            // 设置消息监听器
            client.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    System.out.println("Connection lost: " + cause.getMessage());
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    System.out.println("Received message on topic: " + topic);
                    System.out.println("Message content: " + new String(message.getPayload()));
                    System.out.println("QoS: " + message.getQos());
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    // 对于订阅者来说,这个方法通常不需要实现
                }
            });
            
            // 订阅主题
            MQTTUtils.subscribe(client, TOPIC, QOS);
            
            // 保持客户端运行以接收消息
            System.out.println("Waiting for messages...");
            while (true) {
                Thread.sleep(1000);
            }
            
        } catch (MqttException | InterruptedException e) {
            System.err.println("Error: " + e.getMessage());
        } finally {
            if (client != null && client.isConnected()) {
                try {
                    client.disconnect();
                    System.out.println("Disconnected");
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

4. 发布者客户端实现

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;

public class MQTTPublisher {
    // 发布的主题
    private static final String TOPIC = "test/topic";
    // 客户端ID
    private static final String CLIENT_ID = "publisher-client";
    // QoS等级
    private static final int QOS = 1;

    public static void main(String[] args) {
        MqttClient client = null;
        try {
            // 连接到Broker
            client = MQTTUtils.connect(CLIENT_ID);
            
            // 发布几条测试消息
            for (int i = 1; i <= 5; i++) {
                String message = "Hello, MQTT! This is message " + i;
                MQTTUtils.publish(client, TOPIC, message, QOS);
                Thread.sleep(2000); // 间隔2秒发送一条
            }
            
        } catch (MqttException | InterruptedException e) {
            System.err.println("Error: " + e.getMessage());
        } finally {
            if (client != null && client.isConnected()) {
                try {
                    client.disconnect();
                    System.out.println("Disconnected");
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

第三步:运行和测试

1. 启动订阅者

首先运行MQTTSubscriber类,它会连接到 Broker 并开始等待接收消息:

Connecting to broker: tcp://localhost:1883
Connected
Subscribing to topic: test/topic
Waiting for messages...

2. 启动发布者

然后运行MQTTPublisher类,它会发送 5 条消息到指定主题:

Connecting to broker: tcp://localhost:1883
Connected
Published message: Hello, MQTT! This is message 1 to topic: test/topic
Published message: Hello, MQTT! This is message 2 to topic: test/topic

3. 查看结果

在订阅者的控制台,你应该能看到接收到的消息:

Received message on topic: test/topic
Message content: Hello, MQTT! This is message 1
QoS: 1
Received message on topic: test/topic
Message content: Hello, MQTT! This is message 2
QoS: 1

总结
本文介绍了 MQTT 的基本概念,展示了如何使用 Docker 快速部署 Mosquitto Broker,并通过 Java 代码实现了 MQTT 客户端的发布和订阅功能。


网站公告

今日签到

点亮在社区的每一天
去签到