文章目录
十.RabbitMQ
10.1 简单队列实现
简单队列通常指的是一个基本的消息队列,它可以用于在生产者(生产消息的一方)和消费者(消费消息的一方)之间传递消息。
新创建Springboot项目
引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.1</version>
</dependency>
连接工具类
public class ConnectionUtils
{
public static Connection getConnection()
{
try
{
Connection connection = null;
//定义一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务端地址(域名地址/ip)
factory.setHost("127.0.0.1");
//设置服务器端口号
factory.setPort(5672);
//设置虚拟主机(相当于数据库中的库)
factory.setVirtualHost("/");
//设置用户名
factory.setUsername("guest");
//设置密码
factory.setPassword("guest");
connection = factory.newConnection();
return connection;
}
catch (Exception e)
{
return null;
}
}
}
创建生产者
public class Provider01 {
public static void main(String[] args) {
try {
System.out.println("--------生产者-------");
// 获取连接
Connection conn = ConnectionUtils.getConnection();
//创建通道
Channel channel = conn.createChannel();
// 创建队列
// 队列名称,是否持久化,是否排他,是否自动删除,其他参数
channel.queueDeclare("test4072", false, false, false, null);
// 定义发送信息
String msg = "hello rabbitmq-kwh";
// 发送数据
channel.basicPublish("", "test4072", null, msg.getBytes());
System.out.println("发送成功....");
// 关闭资源
channel.close();
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
创建消费者
public class Consumer01 {
public static void main(String[] args) {
try {
System.out.println("======消费者======");
// 获取连接
Connection conn = ConnectionUtils.getConnection();
//创建通道
Channel channel = conn.createChannel();
// 创建队列(有就直接连接。没有则创建)
// 队列名称,是否持久化,是否排他,是否自动删除,其他参数
channel.queueDeclare("test4072", false, false, false, null);
// 消费者消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override //一旦有消息进入 将触发
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body
) throws IOException {
String str = new String(body,"utf-8");
System.out.println("msg==接收=="+str);
}
};
// 监听队列
channel.basicConsume("test4072",true,consumer);
}catch (Exception e) {
e.printStackTrace();
}
}
}
10.2 Work 模式(工作队列)
工作队列的概念
- 工作队列模式:生产者将任务发送到队列中,多个消费者从队列中取出任务并并行处理。这意味着,多个消费者可以共同工作来处理同一个队列中的任务。
- 负载均衡:每个消费者只处理一个任务(消息),通过增加消费者数量,任务的处理可以并行化,提高整体处理能力。
工作队列的特点:
- 任务分配:RabbitMQ 将队列中的任务(消息)分配给可用的消费者,通常是按照“轮询”或“平衡”方式分配,即消费者可以公平地处理任务。
- 任务处理并行化:多个消费者可以并行地从同一个队列中消费消息,从而实现任务的并行处理,减轻单一消费者的负担。
- 消息丢失的风险低:通过合理配置队列和消息持久化机制,即使 RabbitMQ 重启,也能确保任务消息不丢失。
生产者
(只是在简单队列中的生产者中循环发送了信息。)
/**
* Work 模式(工作队列)
*/
public class Provider01 {
public static void main(String[] args) {
try {
System.out.println("--------生产者-------");
// 获取连接
Connection conn = ConnectionUtils.getConnection();
//创建通道
Channel channel = conn.createChannel();
// 创建队列
// 队列名称,是否持久化,是否排他,是否自动删除,其他参数
channel.queueDeclare("test4072", false, false, false, null);
for (int i = 0; i < 50; i++) {
// 定义发送信息
String msg = "hello rabbitmq-kwh"+i;
// 发送数据
channel.basicPublish("", "test4072", null, msg.getBytes());
Thread.sleep(1000);
}
System.out.println("发送成功....");
// 关闭资源
channel.close();
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者01
public class Consumer01 {
public static void main(String[] args) {
try {
System.out.println("======消费者01======");
// 获取连接
Connection conn = ConnectionUtils.getConnection();
//创建通道
Channel channel = conn.createChannel();
// 创建队列(有就直接连接。没有则创建)
// 队列名称,是否持久化,是否排他,是否自动删除,其他参数
channel.queueDeclare("test4072", false, false, false, null);
// 消费者消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override //一旦有消息进入 将触发
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body
) throws IOException {
String str = new String(body,"utf-8");
System.out.println("msg==接收=="+str);
}
};
// 监听队列
channel.basicConsume("test4072",true,consumer);
}catch (Exception e) {
e.printStackTrace();
}
}
}
消费者02
public class Consumer02 {
public static void main(String[] args) {
try {
System.out.println("======消费者02======");
// 获取连接
Connection conn = ConnectionUtils.getConnection();
//创建通道
Channel channel = conn.createChannel();
// 创建队列(有就直接连接。没有则创建)
// 队列名称,是否持久化,是否排他,是否自动删除,其他参数
channel.queueDeclare("test4072", false, false, false, null);
// 消费者消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override //一旦有消息进入 将触发
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body
) throws IOException {
String str = new String(body,"utf-8");
System.out.println("msg==接收=="+str);
}
};
// 监听队列
channel.basicConsume("test4072",true,consumer);
}catch (Exception e) {
e.printStackTrace();
}
}
}
. 消费者 1 与消费者 2 处理的数据条数一样。
. 消费者 1 偶数 ;消费者 2 奇数
这种方式叫轮询分发(Round-robin)。
10.3 公平分发
指消息被均匀地分配给多个消费者,以便各个消费者的负载大致相等。通过这种方式,RabbitMQ 旨在避免某些消费者过载而其他消费者空闲的情况。
在10.2 中,现在有 2 个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而 RabbitMQ 则是不了解这些的。这是因为当消息进入队列,RabbitMQ 就会分派消息。它不看消费者为应答的数目,只是盲目的将消息发给轮询指定的消费者。
改造生产者
/*
同一时刻服务器只会发一条消息给消费者
1 限制发送给消费者不得超过一条消息
*/
channel.basicQos(1);
/**
* 公平分发
*/
public class Provider01 {
public static void main(String[] args) {
try {
System.out.println("--------生产者-------");
// 获取连接
Connection conn = ConnectionUtils.getConnection();
//创建通道
Channel channel = conn.createChannel();
//同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 创建队列
// 队列名称,是否持久化,是否排他,是否自动删除,其他参数
channel.queueDeclare("test4072", false, false, false, null);
for (int i = 0; i < 50; i++) {
// 定义发送信息
String msg = "hello rabbitmq-kwh"+i;
// 发送数据
channel.basicPublish("", "test4072", null, msg.getBytes());
Thread.sleep(1000);
}
System.out.println("发送成功....");
// 关闭资源
channel.close();
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者01
(在10.2 中消费者的基础上,只添加 channel.basicQos(1);
,限制每次只消费一个消息)
public class Consumer01 {
public static void main(String[] args) {
try {
System.out.println("======消费者01======");
// 获取连接
Connection conn = ConnectionUtils.getConnection();
//创建通道
Channel channel = conn.createChannel();
//限制每次只消费一个消息
channel.basicQos(1);
// 创建队列(有就直接连接。没有则创建)
// 队列名称,是否持久化,是否排他,是否自动删除,其他参数
channel.queueDeclare("test4072", false, false, false, null);
// 消费者消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override //一旦有消息进入 将触发
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body
) throws IOException {
String str = new String(body,"utf-8");
System.out.println(envelope.getDeliveryTag()+"msg==接收=="+str);
// 休眠一秒钟
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}finally {
// 手动确认消息
// 第一个参数:消息的序号,
// 第二个参数:是否批量,false 单条消息应答 当为 true 时批量应答
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 监听队列
// 自动应答设为 false
channel.basicConsume("test4072",false,consumer);
}catch (Exception e) {
e.printStackTrace();
}
}
}
消费者02
(同消费者01)
消费者 1 休眠 1 秒,消费者 2 休眠 2 秒。
分别设置接收消息数,手动反馈,关闭自动应答
10.4 RabbitMQ 消息应答与消息持久化
消息应答
概念
**消息应答(ack)**是 RabbitMQ 中一个重要的机制,用于保证消息在被消费者处理后得以正确确认,确保消息不会丢失。如果消费者成功处理了消息,应该发送一个确认应答;如果消费者遇到问题或失败,则可以选择拒绝该消息,甚至重新放回队列供其他消费者处理。
应答类型:
- **自动应答(auto-ack):**自动应答是默认设置,消费者从队列中获取消息后,RabbitMQ 会立即认为该消息已经被成功处理,即使消费者并未真正处理完成。在这种模式下,消息会在被消费后立即从队列中删除,而无需消费者确认。这种模式的缺点是,如果消费者在处理消息时崩溃,消息会丢失。
- **手动应答(manual ack):**消费者处理完消息后,需要显式地发送确认应答,通知 RabbitMQ 该消息已经处理完成。这样,如果消费者没有发送确认应答,RabbitMQ 会重新将消息发送给其他消费者。
配置
// 监听队列
// 参数2:自动应答设为 false; true:开启自动应答
channel.basicConsume("test4072",false,consumer);
参数2为true时:自动确认
只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都认为是消息已经成功消费。一旦rabbitmq 将消息分发给消费者,就会从内存中删除。(会丢失数据消息)
参数2为false时:手动确认
消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。如果有一个消费者挂掉,就会交付给其他消费者。手动告诉 rabbitmq 消息处理完成后,rabbitmq 删除内存中的消息。
反馈:
//手动回馈
channel.basicAck(envelope.getDeliveryTag(),false);
使用 Nack 让消息回到队列中
// 处理条数; 是否批量处理 ;是否放回队列 false 丢弃
channel.basicNack(envelope.getDeliveryTag(),false,true);
生产者
/**
* 消息应答
*/
public class Provider01 {
public static void main(String[] args) {
try {
System.out.println("--------生产者-------");
// 获取连接
Connection conn = ConnectionUtils.getConnection();
//创建通道
Channel channel = conn.createChannel();
//同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 创建队列
// 队列名称,是否持久化,是否排他,是否自动删除,其他参数
channel.queueDeclare("test4072", false, false, false, null);
for (int i = 0; i < 50; i++) {
// 定义发送信息
String msg = "hello rabbitmq-kwh"+i;
// 发送数据
channel.basicPublish("", "test4072", null, msg.getBytes());
Thread.sleep(1000);
}
System.out.println("发送成功....");
// 关闭资源
channel.close();
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者01
public class Consumer01 {
public static void main(String[] args) {
try {
System.out.println("======消费者01======");
// 获取连接
Connection conn = ConnectionUtils.getConnection();
//创建通道
Channel channel = conn.createChannel();
//限制每次只消费一个消息,防止通道中消息阻塞
channel.basicQos(1);
// 创建队列(有就直接连接。没有则创建)
// 队列名称,是否持久化,是否排他,是否自动删除,其他参数
channel.queueDeclare("test4072", false, false, false, null);
// 消费者消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override //一旦有消息进入 将触发
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body
) throws IOException {
String str = "";
try {
str = new String(body,"utf-8");
if(envelope.getDeliveryTag()==3){
int i=1/0;
}
System.out.println(envelope.getDeliveryTag()+"消费者01msg==接收=="+str);
//手动应答 处理完了
// 手动确认消息,即手动反馈
// 第一个参数:消息的序号,
// 第二个参数:是否批量,false 单条消息应答 ;当为 true 时批量应答
channel.basicAck(envelope.getDeliveryTag(),false);
}catch(Exception e){
// e.printStackTrace();
System.out.println("消费者01处理第"+envelope.getDeliveryTag()+"条,时报错,消息内容为"+str);
//手动应答 报错了
// 第一个参数:消息的序号,
// 第二个参数:是否批量,false 单条消息应答 当为 true 时批量应答
// 第三个参数:是否放回队列 ;false丢弃 ,true 放回队列
channel.basicNack(envelope.getDeliveryTag(),false,true);
}
// 休眠一秒钟
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
};
// 监听队列
// 参数2:自动应答设为 false; true:开启自动应答
channel.basicConsume("test4072",false,consumer);
}catch (Exception e) {
e.printStackTrace();
}
}
}
消费者02(同消费者01)
消息持久化
概念
RabbitMQ 的持久化机制是确保消息和队列在系统崩溃、重启或其他故障情况下不会丢失的关键功能。确保消息不会丢失需要做两件事:将队列和消息都标记为持久化。
配置
持久化队列
// 创建队列,
// 队列名称,是否持久化(队列),是否排他,是否自动删除,其他参数
channel.queueDeclare("test4072", true, false, false, null);
消息持久化
// 发送数据
// MessageProperties.PERSISTENT_TEXT_PLAIN:持久化消息
//设置生成者发送消息为持久化信息(要求保存到硬盘上)保存在内存中
//MessageProperties.PERSISTENT_TEXT_PLAIN,指令完成持久化
channel.basicPublish("", "test4072", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
如果改动队列参数配置,需要删除原有的队列,重新建,因为在 rabbitmq 是不允许重新定义一个已存在的队列。
生产者
/**
* 消息持久化
*/
public class Provider01 {
public static void main(String[] args) {
try {
System.out.println("--------生产者-------");
// 获取连接
Connection conn = ConnectionUtils.getConnection();
//创建通道
Channel channel = conn.createChannel();
//同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 创建队列
// 队列名称,是否持久化(队列),是否排他,是否自动删除,其他参数
channel.queueDeclare("test4072", true, false, false, null);
for (int i = 0; i < 50; i++) {
// 定义发送信息
String msg = "hello rabbitmq-kwh"+i;
// 发送数据
// MessageProperties.PERSISTENT_TEXT_PLAIN:持久化消息
channel.basicPublish("", "test4072", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
Thread.sleep(1000);
}
System.out.println("发送成功....");
// 关闭资源
channel.close();
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
10.5 订阅模式
发布订阅(Publish、Subscribe),又根据交换机类型不同分为三种:
- Fanout Exchange:广播
- Direct Exchange:路由
- Topic Exchange:主题
广播模式
将消息交给所有绑定到交换机的队列,生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。交换机把消息发送给绑定过的所有队列.队列的消费者都能拿到消息。实现一条消息被多个消费者消费.
生产者
/**
* 订阅模式---广播
*/
public class Provider01 {
public static void main(String[] args) {
try {
System.out.println("--------生产者-------");
// 获取连接
Connection conn = ConnectionUtils.getConnection();
//创建通道
Channel channel = conn.createChannel();
// 创建交换机
channel.exchangeDeclare("test4072_x01","fanout");
// 创建队列
// 参数1;队列名称,
// 参数2:是否持久化,
// 参数3:是否排他:是否允许其他的connection创建的channel下的channel连接
// 参数3:是否自动删除,
// 参数4:是否空闲时自动删除
// channel.queueDeclare("test4072", false, false, false, null);
// 定义发送信息
String msg = "hello rabbitmq-kwh";
// 发送数据
//参数1:交换机名称
//参数2:
//参数3:
channel.basicPublish("test4072_x01", "", null, msg.getBytes());
System.out.println("发送成功....");
// 关闭资源
channel.close();
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者01
public class Consumer01 {
public static void main(String[] args) {
try {
System.out.println("======消费者01======");
// 获取连接
Connection conn = ConnectionUtils.getConnection();
//创建通道
Channel channel = conn.createChannel();
// 创建队列(有就直接连接。没有则创建)
// 队列名称,是否持久化,是否排他,是否自动删除,其他参数
channel.queueDeclare("test4072_01", false, false, false, null);
//绑定队列到交换机
//参数1: 队列名称
//参数2:交换机名称
// 参数3:
channel.queueBind("test4072_01", "test4072_x01", "");
// 消费者消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override //一旦有消息进入 将触发
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
String str = new String(body,"utf-8");
System.out.println("msg==接收=="+str);
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 监听队列(订阅消息)
//参数2:true:自动应答
channel.basicConsume("test4072_01",false,consumer);
}catch (Exception e) {
e.printStackTrace();
}
}
}
消费者02
public class Consumer02 {
public static void main(String[] args) {
try {
System.out.println("======消费者02======");
// 获取连接
Connection conn = ConnectionUtils.getConnection();
//创建通道
Channel channel = conn.createChannel();
// 创建队列(有就直接连接。没有则创建)
// 队列名称,是否持久化,是否排他,是否自动删除,其他参数
channel.queueDeclare("test4072_02", false, false, false, null);
//绑定队列到交换机
//参数1: 队列名称
//参数2:交换机名称
// 参数3:
channel.queueBind("test4072_02", "test4072_x01", "");
// 消费者消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override //一旦有消息进入 将触发
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
String str = new String(body,"utf-8");
System.out.println("msg==接收=="+str);
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 监听队列(订阅消息)
//参数2:true:自动应答
channel.basicConsume("test4072_02",false,consumer);
}catch (Exception e) {
e.printStackTrace();
}
}
}
路由模式
1.在广播模式中,生产者发布消息,所有消费者都可以获取所有消息。
2.在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key).消息的发送方在向Exchange发送消息时,也必须指定消息的routing key。
3.P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
4.X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
/**
* 发布订阅-----路由模式
*/
public class Publisher01 {
public static void main(String[] args) {
try {
Connection conn = ConnectionUtils.getConnection();
Channel channel = conn.createChannel();
//处理路由键 direct
channel.exchangeDeclare("test4072_x02","direct");
// 消息内容
String message = "Hello direct!11";
channel.basicPublish("test4072_x02","error",null,message.getBytes());
System.out.println("发送成功....");
channel.close();
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者01
public class Consumer01 {
public static void main(String[] args) {
try {
System.out.println("======消费者01======");
// 获取连接
Connection conn = ConnectionUtils.getConnection();
//创建通道
Channel channel = conn.createChannel();
// 创建队列(有就直接连接。没有则创建)
// 队列名称,是否持久化,是否排他,是否自动删除,其他参数
channel.queueDeclare("test4072_01", false, false, false, null);
channel.basicQos(1);
//绑定交换机与队列 放入 key
channel.queueBind("test4072_01","test4072_x02","error");
channel.queueBind("test4072_01","test4072_x02","info");
// 消费者消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override //一旦有消息进入 将触发
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
String str = new String(body,"utf-8");
System.out.println("msg==接收=="+str);
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 监听队列(订阅消息)
//参数2:true:自动应答
channel.basicConsume("test4072_01",false,consumer);
}catch (Exception e) {
e.printStackTrace();
}
}
}
消费者02
public class Consumer02 {
public static void main(String[] args) {
try {
System.out.println("======消费者02======");
// 获取连接
Connection conn = ConnectionUtils.getConnection();
//创建通道
Channel channel = conn.createChannel();
// 创建队列(有就直接连接。没有则创建)
// 队列名称,是否持久化,是否排他,是否自动删除,其他参数
channel.queueDeclare("test4072_02", false, false, false, null);
channel.basicQos(1);
//绑定交换机与队列 放入 key
channel.queueBind("test4072_02","test4072_x02","info");
// 消费者消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override //一旦有消息进入 将触发
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
String str = new String(body,"utf-8");
System.out.println("msg==接收=="+str);
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 监听队列(订阅消息)
//参数2:true:自动应答
channel.basicConsume("test4072_02",false,consumer);
}catch (Exception e) {
e.printStackTrace();
}
}
}
主题模式(通配符模式)
在路由模式的基础上多了一个正则
1.Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
2.Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
3.通配符规则:
#
:匹配一个或多个词
*
:匹配恰好1个词
# 匹配一个或多个 (user.msg.1)
* 匹配一个 (user.goods) user.* user.login.1002
生产者
/**
* 发布订阅-----主题模式
*/
public class Publisher01 {
public static void main(String[] args) {
try {
Connection conn = ConnectionUtils.getConnection();
Channel channel = conn.createChannel();
//处理路由键 direct
channel.exchangeDeclare("test4072_x02","topic");
// 消息内容
String message = "Hello direct!11";
channel.basicPublish("test4072_x02","error.1001",null,message.getBytes());
System.out.println("发送成功....");
channel.close();
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者01
public class Consumer01 {
public static void main(String[] args) {
try {
System.out.println("======消费者01======");
// 获取连接
Connection conn = ConnectionUtils.getConnection();
//创建通道
Channel channel = conn.createChannel();
// 创建队列(有就直接连接。没有则创建)
// 队列名称,是否持久化,是否排他,是否自动删除,其他参数
channel.queueDeclare("test4072_01", false, false, false, null);
channel.basicQos(1);
//绑定交换机与队列 放入 key
channel.queueBind("test4072_01","test4072_x02","error.*");
channel.queueBind("test4072_01","test4072_x02","info");
// 消费者消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override //一旦有消息进入 将触发
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
String str = new String(body,"utf-8");
System.out.println("msg==接收=="+str);
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 监听队列(订阅消息)
//参数2:true:自动应答
channel.basicConsume("test4072_01",false,consumer);
}catch (Exception e) {
e.printStackTrace();
}
}
}
消费者02
public class Consumer02 {
public static void main(String[] args) {
try {
System.out.println("======消费者02======");
// 获取连接
Connection conn = ConnectionUtils.getConnection();
//创建通道
Channel channel = conn.createChannel();
// 创建队列(有就直接连接。没有则创建)
// 队列名称,是否持久化,是否排他,是否自动删除,其他参数
channel.queueDeclare("test4072_02", false, false, false, null);
channel.basicQos(1);
//绑定交换机与队列 放入 key
channel.queueBind("test4072_02","test4072_x02","info");
channel.queueBind("test4072_01","test4072_x02","error");
// 消费者消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override //一旦有消息进入 将触发
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
String str = new String(body,"utf-8");
System.out.println("msg==接收=="+str);
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 监听队列(订阅消息)
//参数2:true:自动应答
channel.basicConsume("test4072_02",false,consumer);
}catch (Exception e) {
e.printStackTrace();
}
}
}
10.6 消息确认机制
10.7 SpringBoot集成MQ
简单队列
导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>
spring-boot-starter-amqp
</artifactId>
</dependency>
配置MQ
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
生产者
@RestController
@RequestMapping("/index")
public class IndexController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public String send() {
rabbitTemplate.convertAndSend("test_4072","hellokwh");
return "发送成功...";
}
}
监听队列
//监听队列
@Component
public class BootRabbitMQListener {
@RabbitListener(queuesToDeclare = @Queue("test_4072"))
public void onMessage01(String message) {
System.out.println("消费者01: " + message);
}
}
工作队列
@RestController
@RequestMapping("/index")
public class IndexController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public String send() {
rabbitTemplate.convertAndSend("test_4072","hellokwh");
return "发送成功...";
}
}
//监听队列
@Component
public class BootRabbitMQListener {
@RabbitListener(queuesToDeclare = @Queue("test_4072"))
public void onMessage01(String message) {
System.out.println("消费者01: " + message);
}
@RabbitListener(queuesToDeclare = @Queue("test_4072"))
public void onMessage02(String message) {
System.out.println("消费者02: " + message);
}
}
公平分发
需要先设置成手动反馈
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
listener:
simple:
acknowledge-mode: manual #手动反馈
@RestController
@RequestMapping("/index")
public class IndexController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public String send() {
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend("test_4072","hellokwh"+i);
}
return "发送成功...";
}
}
//监听队列
@Component
public class BootRabbitMQListener {
@RabbitListener(queuesToDeclare = @Queue("test_4072"))
public void onMessage01(Message message, Channel channel) {
String msg = "";
try {
msg = new String(message.getBody(), "utf-8");
System.out.println("消费者01: " + msg);
// 手动反馈
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
try {
// 放回队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} catch (Exception e1) {
e1.printStackTrace();
}
}finally {
try {
// 休眠一秒钟
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
@RabbitListener(queuesToDeclare = @Queue("test_4072"))
public void onMessage02(Message message, Channel channel) {
String msg = "";
try {
msg = new String(message.getBody(), "utf-8");
System.out.println("消费者02: " + msg);
// 手动反馈
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
try {
// 放回队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} catch (Exception e1) {
e1.printStackTrace();
}
}finally {
try {
// 休眠一秒钟
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
传对象
//对象实体类
public class UserGoods implements Serializable {
private Long goodsId;
private String goodsName;
private Long userId;
}
@RestController
@RequestMapping("/index")
public class IndexController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public String send() {
UserGoods userGoods = new UserGoods(1L, "猕猴桃", 1L);
rabbitTemplate.convertAndSend("test_4072",userGoods);
return "发送成功...";
}
}
@Component
public class BootRabbitMQListener {
@RabbitListener(queuesToDeclare = @Queue("test_4072"))
public void onMessage01(Message message, Channel channel) {
String msg = "";
try {
//获取传过来的对象
UserGoods userGoods = (UserGoods) SerializationUtils.deserialize(message.getBody());
//调用数据库获取商品ID
// 手动反馈
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("消费者01"+userGoods.getUserId()+"购买了商品"+userGoods.getGoodsId()+"--"+userGoods.getGoodsName());
} catch (Exception e) {
e.printStackTrace();
try {
// 放回队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} catch (Exception e1) {
e1.printStackTrace();
}
}finally {
try {
// 休眠一秒钟
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
@RabbitListener(queuesToDeclare = @Queue("test_4072"))
public void onMessage02(Message message, Channel channel) {
String msg = "";
try {
//获取传过来的对象
UserGoods userGoods = (UserGoods) SerializationUtils.deserialize(message.getBody());
//调用数据库获取商品ID
// 手动反馈
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("消费者02"+userGoods.getUserId()+"购买了商品"+userGoods.getGoodsId()+"--"+userGoods.getGoodsName());
} catch (Exception e) {
e.printStackTrace();
try {
// 放回队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} catch (Exception e1) {
e1.printStackTrace();
}
}finally {
try {
// 休眠一秒钟
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
@Configuration
public class RabbitMQConfig {
// 创建队列
@Bean
public Queue queue() {
return new Queue("test_4072");
}
// @Bean
// public Queue myQueue() {
// Queue queue = QueueBuilder.nonDurable("myNonDurableQueue").autoDelete().build();
// return queue;
// // return new QueueBuilder.nonDurable("myNonDurableQueue").build();
// }
}