RabbitMQ是一个开源的消息代理软件,实现了高级消息队列协议(AMQP)。它允许应用程序通过消息队列进行异步通信,提高系统的解耦性和扩展性。本文将展示一个简单的RabbitMQ生产者和消费者实现。
核心组件
1. 生产者(Producer.java)
生产者负责创建消息并将其发送到RabbitMQ队列:
package com.qcby.rabbitmq.one;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Produce {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1"); // RabbitMQ服务器IP
factory.setUsername("lql"); // 用户名
factory.setPassword("liu20020624."); // 密码
// 2. 建立连接和通道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 3. 声明队列(如果不存在则创建)
channel.queueDeclare(
QUEUE_NAME, // 队列名称
false, // 是否持久化
false, // 是否独占
false, // 是否自动删除
null // 其他参数
);
// 4. 发送消息
String message = "hello world";
channel.basicPublish(
"", // 使用默认交换机
QUEUE_NAME, // 路由键(队列名称)
null, // 消息属性
message.getBytes() // 消息体
);
System.out.println("发送消息完毕");
}
}
}
关键点说明:
使用
ConnectionFactory
配置RabbitMQ连接queueDeclare()
创建队列(幂等操作)basicPublish()
发送消息到默认交换机使用try-with-resources自动关闭连接
2. 消费者(Consumer.java)
消费者监听队列并处理接收到的消息:
package com.qcby.rabbitmq.one;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂(同生产者)
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");//换成你自己的ip地址
factory.setUsername("lql");
factory.setPassword("liu20020624.");
// 2. 建立连接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3. 定义消息处理回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("收到消息: " + message);
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
};
// 4. 开始消费消息
channel.basicConsume(
QUEUE_NAME, // 队列名称
true, // 自动确认
deliverCallback, // 消息处理回调
cancelCallback // 取消回调
);
}
}
关键点说明:
DeliverCallback
处理接收到的消息CancelCallback
处理消费中断情况basicConsume()
启动消息监听消费者需要保持运行状态以持续接收消息
工作流程
生产者工作流:
消费者工作流:
运行说明
启动顺序:
先启动消费者(保持运行状态)
再启动生产者(发送消息)
预期输出:
生产者控制台:
发送消息完毕
消费者控制台:
收到消息: hello world
常见问题解决
连接失败:
检查RabbitMQ服务状态:
rabbitmqctl status
验证防火墙设置(开放5672端口)
确认用户名/密码权限
消息未接收:
确保消费者在生产者之前启动
检查队列名称是否一致
验证网络连通性:
telnet <IP> 5672
SLF4J警告:
在pom.xml中添加日志实现依赖:<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.36</version> </dependency>
这个示例展示了RabbitMQ最基本的消息传递模式。实际应用中,可以结合交换机、绑定键、不同消息确认模式等实现更复杂的消息路由和处理逻辑。
通过这个示例,您可以快速理解RabbitMQ的核心概念和工作原理。建议从简单队列开始,逐步探索更高级的功能如发布/订阅、路由、主题匹配等。