RabbitMQ入门:生产者和消费者示例

发布于:2025-08-18 ⋅ 阅读:(23) ⋅ 点赞:(0)

RabbitMQ是一个开源的消息代理软件,实现了高级消息队列协议(AMQP)。它允许应用程序通过消息队列进行异步通信,提高系统的解耦性和扩展性。本文将展示一个简单的RabbitMQ生产者和消费者实现。

核心组件

1. 生产者(Producer.java)

生产者负责创建消息并将其发送到RabbitMQ队列:

package com.qcby.rabbitmq.one;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Produce {
    public static final String QUEUE_NAME = "hello";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1"); // RabbitMQ服务器IP
        factory.setUsername("lql");     // 用户名
        factory.setPassword("liu20020624."); // 密码
        
        // 2. 建立连接和通道
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            // 3. 声明队列(如果不存在则创建)
            channel.queueDeclare(
                QUEUE_NAME,   // 队列名称
                false,        // 是否持久化
                false,        // 是否独占
                false,        // 是否自动删除
                null          // 其他参数
            );
            
            // 4. 发送消息
            String message = "hello world";
            channel.basicPublish(
                "",           // 使用默认交换机
                QUEUE_NAME,   // 路由键(队列名称)
                null,         // 消息属性
                message.getBytes() // 消息体
            );
            System.out.println("发送消息完毕");
        }
    }
}

关键点说明

  • 使用ConnectionFactory配置RabbitMQ连接

  • queueDeclare()创建队列(幂等操作)

  • basicPublish()发送消息到默认交换机

  • 使用try-with-resources自动关闭连接

2. 消费者(Consumer.java)

消费者监听队列并处理接收到的消息:

package com.qcby.rabbitmq.one;

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static final String QUEUE_NAME = "hello";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂(同生产者)
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");//换成你自己的ip地址
        factory.setUsername("lql");
        factory.setPassword("liu20020624.");
        
        // 2. 建立连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        // 3. 定义消息处理回调
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("收到消息: " + message);
        };
        
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消息消费被中断");
        };
        
        // 4. 开始消费消息
        channel.basicConsume(
            QUEUE_NAME,   // 队列名称
            true,         // 自动确认
            deliverCallback, // 消息处理回调
            cancelCallback   // 取消回调
        );
    }
}

关键点说明

  • DeliverCallback处理接收到的消息

  • CancelCallback处理消费中断情况

  • basicConsume()启动消息监听

  • 消费者需要保持运行状态以持续接收消息

工作流程

  1. 生产者工作流

  2. 消费者工作流

运行说明

  1. 启动顺序

    • 先启动消费者(保持运行状态)

    • 再启动生产者(发送消息)

  2. 预期输出

    • 生产者控制台:发送消息完毕

    • 消费者控制台:收到消息: hello world

常见问题解决

  1. 连接失败

    • 检查RabbitMQ服务状态:rabbitmqctl status

    • 验证防火墙设置(开放5672端口)

    • 确认用户名/密码权限

  2. 消息未接收

    • 确保消费者在生产者之前启动

    • 检查队列名称是否一致

    • 验证网络连通性:telnet <IP> 5672

  3. SLF4J警告
    在pom.xml中添加日志实现依赖:

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.36</version>
    </dependency>

这个示例展示了RabbitMQ最基本的消息传递模式。实际应用中,可以结合交换机、绑定键、不同消息确认模式等实现更复杂的消息路由和处理逻辑。


        通过这个示例,您可以快速理解RabbitMQ的核心概念和工作原理。建议从简单队列开始,逐步探索更高级的功能如发布/订阅、路由、主题匹配等。


网站公告

今日签到

点亮在社区的每一天
去签到