java实现RabbitMQ消息发送和接收功能(包含测试)

发布于:2025-06-13 ⋅ 阅读:(20) ⋅ 点赞:(0)

以下是一个完整的Java类,同时包含RabbitMQ消息发送和接收功能,使用纯Java实现(非Spring Boot),包含Maven依赖:

import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class RabbitMQManager {

    private static final Logger logger = LoggerFactory.getLogger(RabbitMQManager.class);
    
    private final ConnectionFactory factory;
    private Connection connection;
    private Channel sendChannel;
    private Channel receiveChannel;
    private volatile boolean running = true;
    
    // 配置参数
    private final String host;
    private final int port;
    private final String username;
    private final String password;

    public RabbitMQManager(String host, int port, String username, String password) {
        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
        
        factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
    }

    /**
     * 初始化RabbitMQ连接和通道
     */
    public void initialize() throws IOException, TimeoutException {
        connection = factory.newConnection();
        logger.info("成功连接到RabbitMQ服务器: {}:{}", host, port);
        
        // 创建发送通道(用于feedback队列)
        sendChannel = connection.createChannel();
        // 声明发送队列(持久化)
        sendChannel.queueDeclare("feedback", true, false, false, null);
        logger.info("发送队列 'feedback' 已声明");
        
        // 创建接收通道(单独通道用于消费消息)
        receiveChannel = connection.createChannel();
    }

    /**
     * 发送消息到feedback队列
     * @param message 要发送的消息内容
     */
    public void sendToFeedbackQueue(String message) throws IOException {
        if (sendChannel == null || !sendChannel.isOpen()) {
            throw new IllegalStateException("发送通道未初始化或已关闭");
        }
        
        sendChannel.basicPublish(
            "", // 使用默认交换机
            "feedback", // 路由键为队列名
            MessageProperties.PERSISTENT_TEXT_PLAIN, // 设置消息持久化
            message.getBytes(StandardCharsets.UTF_8)
        );
        
        logger.info("消息已发送到feedback队列: {}", message);
    }

    /**
     * 开始监听指定队列的消息
     * @param queueName 要监听的队列名称
     */
    public void startListening(String queueName) throws IOException {
        if (receiveChannel == null || !receiveChannel.isOpen()) {
            throw new IllegalStateException("接收通道未初始化或已关闭");
        }
        
        // 声明队列(如果不存在则创建)
        receiveChannel.queueDeclare(queueName, true, false, false, null);
        logger.info("开始监听队列: {}", queueName);

        // 设置每次只接收一条消息(公平分发)
        receiveChannel.basicQos(1);

        // 创建消费者
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            logger.info("收到消息 [{}]: {}", delivery.getEnvelope().getDeliveryTag(), message);
            
            try {
                // 处理消息
                processMessage(message, queueName);
                
                // 手动确认消息
                receiveChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            } catch (Exception e) {
                logger.error("消息处理失败", e);
                // 处理失败时拒绝消息(不重新入队)
                receiveChannel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
            }
        };

        // 取消消费者回调
        CancelCallback cancelCallback = consumerTag -> {
            logger.warn("消费者被取消: {}", consumerTag);
        };

        // 开始消费消息(关闭自动确认)
        receiveChannel.basicConsume(queueName, false, deliverCallback, cancelCallback);
        
        logger.info("监听器已启动,等待消息... (按CTRL+C停止)");

        // 保持程序运行
        try {
            while (running) {
                Thread.sleep(1000); // 防止CPU空转
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.info("监听线程被中断");
        }
    }

    /**
     * 消息处理逻辑(可根据需要自定义)
     */
    private void processMessage(String message, String queueName) {
        logger.info("处理来自队列 [{}] 的消息: {}", queueName, message);
        
        // 示例:如果是feedback队列的消息,可以发送响应
        if ("feedback".equals(queueName)) {
            logger.info("处理feedback消息: {}", message);
        } else {
            // 其他队列的处理逻辑
            logger.info("处理来自 {} 队列的消息", queueName);
        }
        
        // 这里可以添加业务逻辑,例如:
        // - 解析JSON消息
        // - 调用其他服务
        // - 发送响应到另一个队列
    }

    /**
     * 停止监听并关闭资源
     */
    public void shutdown() {
        running = false;
        logger.info("停止监听...");
        closeResources();
    }

    private void closeResources() {
        try {
            if (sendChannel != null && sendChannel.isOpen()) {
                sendChannel.close();
            }
            if (receiveChannel != null && receiveChannel.isOpen()) {
                receiveChannel.close();
            }
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
            logger.info("RabbitMQ连接已关闭");
        } catch (IOException | TimeoutException e) {
            logger.error("关闭资源时出错", e);
        }
    }

    public static void main(String[] args) {
        // 配置RabbitMQ连接参数
        String host = "localhost";
        int port = 5672;
        String username = "guest";
        String password = "guest";
        String listenQueue = "my_queue"; // 监听的队列名称
        
        RabbitMQManager manager = new RabbitMQManager(host, port, username, password);
        
        try {
            // 初始化连接
            manager.initialize();
            
            // 添加关闭钩子
            Runtime.getRuntime().addShutdownHook(new Thread(manager::shutdown));
            
            // 启动监听线程
            new Thread(() -> {
                try {
                    manager.startListening(listenQueue);
                } catch (IOException e) {
                    logger.error("监听失败", e);
                }
            }).start();
            
            // 主线程发送测试消息到feedback队列
            Thread.sleep(2000); // 等待监听器启动
            
            for (int i = 1; i <= 5; i++) {
                String message = "测试消息 #" + i;
                manager.sendToFeedbackQueue(message);
                Thread.sleep(1000);
            }
            
            // 保持程序运行
            while (manager.running) {
                Thread.sleep(1000);
            }
            
        } catch (IOException | TimeoutException | InterruptedException e) {
            logger.error("RabbitMQ操作失败", e);
            manager.shutdown();
        }
    }
}

Maven 依赖 (pom.xml)

<dependencies>
    <!-- RabbitMQ Java Client -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.16.0</version>
    </dependency>
    
    <!-- SLF4J 日志 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.7</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>2.0.7</version>
    </dependency>
</dependencies>

关键设计说明:

  1. 双通道设计

    • sendChannel:专门用于发送消息到feedback队列
    • receiveChannel:专门用于从其他队列接收消息
    • 分离通道避免发送和接收操作相互阻塞
  2. 消息发送功能

    public void sendToFeedbackQueue(String message) throws IOException {
        sendChannel.basicPublish(
            "", // 默认交换机
            "feedback", 
            MessageProperties.PERSISTENT_TEXT_PLAIN, // 持久化消息
            message.getBytes(StandardCharsets.UTF_8)
        );
    }
    
  3. 消息接收功能

    • 支持监听任意指定队列
    • 手动消息确认机制
    • 公平分发设置(basicQos(1))
  4. 资源管理

    • 统一的initialize()方法初始化连接
    • 优雅的shutdown()方法关闭资源
    • 使用Runtime shutdown hook确保程序退出时清理资源
  5. 线程模型

    • 主线程:发送消息到feedback队列
    • 单独线程:持续监听消息队列
    • 使用volatile running标志控制线程退出

使用说明:

  1. 运行程序

    mvn compile exec:java -Dexec.mainClass="RabbitMQManager"
    
  2. 测试流程

    • 程序启动后会自动初始化RabbitMQ连接
    • 启动监听线程监听my_queue队列
    • 主线程发送5条测试消息到feedback队列
    • 按CTRL+C停止程序
  3. 自定义配置

    • 修改main方法中的连接参数
    • 更改监听的队列名称
    • 在processMessage()方法中添加业务逻辑

扩展建议:

  1. 添加消息序列化

    // 发送对象
    public void sendObjectToFeedback(Object obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(obj);
        oos.flush();
        byte[] bytes = bos.toByteArray();
        
        sendChannel.basicPublish("", "feedback", 
            MessageProperties.PERSISTENT_BASIC, bytes);
    }
    
  2. 添加JSON支持

    <!-- 添加Jackson依赖 -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.15.2</version>
    </dependency>
    
    // 发送JSON
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(myObject);
    sendToFeedbackQueue(json);
    
    // 接收JSON
    MyObject obj = mapper.readValue(message, MyObject.class);
    
  3. 添加重连机制

    public void ensureConnected() throws IOException, TimeoutException {
        if (connection == null || !connection.isOpen()) {
            initialize();
        }
    }
    

这个实现提供了生产级别的RabbitMQ操作,包含:

  • 连接管理
  • 通道分离
  • 消息持久化
  • 手动确认
  • 错误处理
  • 优雅关闭
  • 完整的日志记录

您可以根据实际需求调整队列名称、消息处理逻辑和错误处理策略。