RabbitMQ 应用
1. 工作模式介绍
1.1 Simple (简单模式)
(1) 概述
特点: 一个生产者, 一个消费者. 消息只能被消费一次. 也称为点对点 (Point-to-Point) 模式.
(2) 代码演示
Producer:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ProducerDemo {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
//(1) IP
//(2) 端口号
//(3) 账号
//(4) 密码
//(5) 虚拟主机
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("120.53.237.101");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("vhost1");
Connection connection = factory.newConnection();
//2.开启信道
Channel channel = connection.createChannel();
//3.声明交换机 (使用内置交换机)
//4.声明队列
/**
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
* Map<String,Object> arguments)
* 参数说明:
* queue:队列名称
* durable:可持久化
* exclusive:是否独占
* autoDelete:是否自动删除
* arguments:参数
*/
channel.queueDeclare("hello", true, false, false, null);
//5. 发送消息
/**
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
* 参数说明:
* exchange:交换机名称
* routingKey:路由标签. (对于内置交换机,routingkey和队列名称保持一致)
* props:属性配置
* body:消息
*/
for (int i = 0; i < 10; i++) {
String msg = "hello rabbitmq" + i;
channel.basicPublish("", "hello", null, msg.getBytes());
System.out.println("消息发送成功" + i);
}
//6. 资源释放
channel.close();
connection.close();
}
}
Consumer:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerDemo {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1. 创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("120.53.237.101");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("vhost1");
Connection connection = connectionFactory.newConnection();
//2. 创建Channel
Channel channel = connection.createChannel();
//3. 声明队列 (如果消费者要订阅的队列已经存在, 则可以省略)
channel.queueDeclare("hello",true, false, false, null);
//4. 消费消息
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* 参数说明:
* queue: 队列名称
* autoAck: 是否自动确认
* callback: 接收到消息后, 执行的逻辑
*/
DefaultConsumer consumer = new DefaultConsumer(channel){
//从队列中收到消息, 就会执行的方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:"+ new String(body));
}
};
channel.basicConsume("hello", true, consumer);
//等待程序执行完成
Thread.sleep(3000);
//5. 释放资源
channel.close();
connection.close();
}
}
1.2 Work Queue (工作队列模式)
(1) 概述
一个生产者 § , 多个消费者 (C1, C2 …).
特点: 消息不会重复, 多个消费者共同消费 Queue 中的消息.
适用场景: 集群环境中做异步处理.
(2) 代码演示
Producer:
package com.wang.rabbitmq.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.wang.rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);//账号
connectionFactory.setPassword(Constants.PASSWORD);//密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明队列 (使用内置的交换机)
//如果队列不存在, 则创建 ; 如果队列已经存在, 则不创建
/**
* 第一个参数: 队列名称,这里是 Constants.WORK_QUEUE,指定队列名。
* 第二个参数: b,durable,是否持久化。true 表示服务器重启后队列还在。
* 第三个参数: b1,exclusive,是否独占。false 表示非独占,多个消费者可访问。
* 第四个参数: b2,autoDelete,自动删除。false 表示当最后一个消费者断开后不自动删。
* 第五个参数: map,arguments,(额外参数),null 就是没有
*/
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
//4. 发送消息
for (int i = 0; i < 10; i++) {
String msg = "hello work queue...." + i;
/**
* 第一个参数: exchange,这里是空字符串,说明用默认交换器. 。
* 第二个参数: routingKey,这里是队列名,(因为默认交换器下,routingKey 就是队列名,确保消息路由到对应队列)
* 第三个参数: basicProperties,消息的属性,比如持久化、优先级等,null 表示使用默认属性。
* 第四个参数: 消息本体,转成字节数组发送.
*/
channel.basicPublish("",Constants.WORK_QUEUE, null, msg.getBytes());
}
System.out.println("消息发送成功~");
//6. 资源释放
channel.close();
connection.close();
}
}
Consumer1:
package com.wang.rabbitmq.work;
import com.rabbitmq.client.*;
import com.wang.rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME);//账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明队列 (使用内置的交换机)
//如果队列不存在, 则创建, 如果队列存在, 则不创建
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
//4. 消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
//从队列中收到消息, 就会执行的方法
@Override
/**
* consumerTag:消费者标签,标识消费者。
* envelope:包含消息元数据,比如消息 ID、投递标签等。
* properties:消息的属性,如持久化等设置。
* body:消息体,字节数组,需要转字符串。
*/
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:"+ new String(body));
}
};
/**
*
1.Constants.WORK_QUEUE: 指定从哪个队列消费消息.
2.b: true(autoAck):表示是否自动确认消息. true(自动确认)
意味着当消息发送给消费者后,RabbitMQ 会立即从队列中删除该消息(无需消费者手动确认);
若为 `false`,则需消费者手动发送确认后,消息才会从队列移除。
3.consumer:消费者对象,包含处理消息的逻辑.
*/
channel.basicConsume(Constants.WORK_QUEUE, true, consumer);
//6. 资源释放
// channel.close();
// connection.close();
}
}
Consumer2:
package com.wang.rabbitmq.work;
import com.rabbitmq.client.*;
import com.wang.rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME);//账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明队列 (使用内置的交换机)
//如果队列不存在, 则创建, 如果队列存在, 则不创建
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
//4. 消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
//从队列中收到消息, 就会执行的方法
@Override
/**
* consumerTag:消费者标签,标识消费者。
* envelope:包含消息元数据,比如消息 ID、投递标签等。
* properties:消息的属性,如持久化等设置。
* body:消息体,字节数组,需要转字符串。
*/
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:"+ new String(body));
}
};
/**
*
1.Constants.WORK_QUEUE: 指定从哪个队列消费消息.
2.b: true(autoAck):表示是否自动确认消息. true(自动确认)
意味着当消息发送给消费者后,RabbitMQ 会立即从队列中删除该消息(无需消费者手动确认);
若为 `false`,则需消费者手动发送确认后,消息才会从队列移除。
3.consumer:消费者对象,包含处理消息的逻辑.
*/
channel.basicConsume(Constants.WORK_QUEUE, true, consumer);
//6. 资源释放
// channel.close();
// connection.close();
}
}
1.3 Publish/Subscribe (发布/订阅模式)
(1) 概述
X: 交换机. 交换机将消息按一定规则路由到一个或多个队列上.
交换机有四种类型:
- Fanout: 广播类型. 将消息发给所有绑定到交换机的队列 --> (Publish/Subscribe 模式)
- Direct: 定向类型. 把消息交给符合指定 routing key 的队列 --> (Routing 模式)
- Topic: 通配符类型. 把消息交给符合 routing pattern 的队列 --> (Topics模式)
- headers: headers 类型的交换机不依赖于路由键来路由消息, 而是根据消息内容中的 headers 属性进行匹配. headers 类型的交换器性能会很差, 而且不实用, 基本很少使用.
[!TIP]
注:
路由键 (Routing key). 生产者和交换机之间的路由键称为 Routing key, 交换机和队列之间的路由键称为 Binding key (Binding key也是 Routing key 的一种, 有时候也直接写做 Routing key).
发布/订阅模式中的交换机是 Fanout (广播类型), X 收到消息后会将消息发给所有队列. C1 订阅 Q1 ; C2 订阅 Q2.
(2) 代码演示
Producer:
package com.wang.rabbitmq.fanout;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.wang.rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME);//账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明交换机
/**
* 第一个参数: 交换器名称,这里 Constants.FANOUT_EXCHANGE,指定交换器的标识。
* 第二个参数: 交换器类型,BuiltinExchangeType.FANOUT,说明是扇出类型,也就是广播模式。
* 第三个参数: durable,true 表示持久化,服务器重启后交换器还在。
*/
channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
//4. 声明队列
/**
* 第一个参数: 队列名称,这里是 Constants.WORK_QUEUE,指定队列名。
* 第二个参数: b,durable,是否持久化。true 表示服务器重启后队列还在。
* 第三个参数: b1,exclusive,是否独占。false 表示非独占,多个消费者可访问。
* 第四个参数: b2,autoDelete,自动删除。false 表示当最后一个消费者断开后不自动删。
* 第五个参数: map,arguments,(额外参数),null 就是没有
*/
channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);
channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);
//5. 交换机和队列绑定
channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");
channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");
//6. 发布消息
String msg = "hello fanout....";
channel.basicPublish(Constants.FANOUT_EXCHANGE,"", null, msg.getBytes());
System.out.println("消息发送成功");
//7. 释放资源
channel.close();
connection.close();
}
}
Consumer1:
package com.wang.rabbitmq.fanout;
import com.rabbitmq.client.*;
import com.wang.rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);//账号
connectionFactory.setPassword(Constants.PASSWORD);//密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明队列
channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);
//4. 消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
//从队列中收到消息, 就会执行的方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:"+ new String(body));
}
};
channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);
}
}
Consumer2:
package com.wang.rabbitmq.fanout;
import com.rabbitmq.client.*;
import com.wang.rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME);//账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明队列
channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);
//4. 消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
//从队列中收到消息, 就会执行的方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:"+ new String(body));
}
};
channel.basicConsume(Constants.FANOUT_QUEUE2, true, consumer);
}
}
1.4 Routing (路由模式)
(1) 概述
发布订阅模式是无条件的将所有消息分发给所有消费者, 而路由模式是 Exchange 根据 RoutingKey 的是否匹配, 将数据筛选后发给对应的消费者队列.
适合场景: 需要根据特定规则分发消息的场景.
如上图, Binding key 有 a, b ,c 三种. 当 Routing key == a 时, 对应消息发给 Q1, Q2 ; 当 Routing key == b / c 时, 对应消息发给 Q2.
(2) 代码演示
Producer:
package com.wang.rabbitmq.direct;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.wang.rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 路由模式生产者
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME);//账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明交换机
channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);
//4. 声明队列
channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);
//5. 绑定交换机和队列
channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");
channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");
channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");
channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");
//6. 发送消息
String msg = "hello direct, my routingkey is a....";
channel.basicPublish(Constants.DIRECT_EXCHANGE,"a", null, msg.getBytes());
String msg_b = "hello direct, my routingkey is b....";
channel.basicPublish(Constants.DIRECT_EXCHANGE,"b", null, msg_b.getBytes());
String msg_c = "hello direct, my routingkey is c....";
channel.basicPublish(Constants.DIRECT_EXCHANGE,"c", null, msg_c.getBytes());
System.out.println("消息发送成功");
//7. 释放资源
channel.close();
connection.close();
}
}
Consumer1:
package com.wang.rabbitmq.direct;
import com.rabbitmq.client.*;
import com.wang.rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME);//账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明队列
channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);
//4. 消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
//从队列中收到消息, 就会执行的方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:"+ new String(body));
}
};
channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);
}
}
Consumer2:
package com.wang.rabbitmq.direct;
import com.rabbitmq.client.*;
import com.wang.rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME);//账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明队列
channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);
//4. 消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
//从队列中收到消息, 就会执行的方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:"+ new String(body));
}
};
channel.basicConsume(Constants.DIRECT_QUEUE2, true, consumer);
}
}
1.5 Topics (通配符模式)
(1) 概述
通配符模式是路由模式的升级版. 在路由模式的基础上添加了通配符, 使得匹配规则更加灵活.
(2) 代码演示
Producer:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import constant.Constants;
public class TopicRabbitProducer {
public static String TOPIC_EXCHANGE_NAME = "test_topic";
public static String TOPIC_QUEUE_NAME1 = "topic_queue1";
public static String TOPIC_QUEUE_NAME2 = "topic_queue2";
public static void main(String[] args) throws Exception {
//1. 创建channel通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.HOST); //ip 默认值localhost
factory.setPort(Constants.PORT); //默认值5672
factory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟机名称,默认 /
factory.setUsername(Constants.USER_NAME); //用户名,默认guest
factory.setPassword(Constants.PASSWORD); //密码,默认guest
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//2. 创建交换机
channel.exchangeDeclare(Constants.TOPIC_EXCHANGE_NAME,
BuiltinExchangeType.TOPIC, true, false, false, null);
//3. 声明队列
//如果没有这样的一个队列,会自动创建,如果有,则不创建
channel.queueDeclare(Constants.TOPIC_QUEUE_NAME1, true, false, false, null);
channel.queueDeclare(Constants.TOPIC_QUEUE_NAME2, true, false, false, null);
//4. 绑定队列和交换机
//队列1绑定error,仅接收error信息
channel.queueBind(Constants.TOPIC_QUEUE_NAME1,Constants.TOPIC_EXCHANGE_NAME,
"*.error");
//队列2绑定info,error: error,info信息都接收
channel.queueBind(Constants.TOPIC_QUEUE_NAME2,Constants.TOPIC_EXCHANGE_NAME,
"#.info");
channel.queueBind(Constants.TOPIC_QUEUE_NAME2,Constants.TOPIC_EXCHANGE_NAME,
"*.error");
//5. 发送消息
String msg = "hello topic, I'm order.error";
channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"order.error",null,msg.getBytes());
String msg_black = "hello topic, I'm order.pay.info";
channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"order.pay.info",null,msg_black.getBytes());
String msg_green= "hello topic, I'm pay.error";
channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"pay.error",null,msg_green.getBytes());
//6.释放资源
channel.close();
connection.close();
}
}
Consumer:
import com.rabbitmq.client.*;
import constant.Constants;
import java.io.IOException;
public class TopicRabbitmqConsumer1 {
public static void main(String[] args) throws Exception {
//1. 创建channel通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.HOST); //ip 默认值localhost
factory.setPort(Constants.PORT); //默认值5672
factory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟机名称,默认 /
factory.setUsername(Constants.USER_NAME); //用户名,默认guest
factory.setPassword(Constants.PASSWORD); //密码,默认guest
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//2. 接收消息,并消费
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息: " + new String(body));
}
};
channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);
}
}
1.6 RPC (RPC 通信)
(1) 概述
RPC 通信中, 没有生产者和消费者,只有客户端和服务端.
客户端发送请求, 接收响应 ; 服务端接收请求, 发送响应.
工作流程:
- 客户端发送消息到一个指定的队列, 并在消息属性中设置 replyTo 字段, 这个字段指定了一个回调队列, 表示在这个队列里接收服务端响应.
- 服务端接收到请求后, 处理请求并发送响应到指定的回调队列.
- 客户端在回调队列上等待响应消息. 一旦收到响应, 客户端会检查消息的 correlationId 属性, 以确保同一请求对应的响应.
(2) 代码演示
客户端:
package com.wang.rabbitmq.rpc;
import com.rabbitmq.client.*;
import com.wang.rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
/**
* rpc 客户端
* 1. 发送请求
* 2. 接收响应
*/
public class RpcClient {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME);//账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
//3. 发送请求
String msg = "hello rpc...";
//设置请求的唯一标识 (correlationId)
String correlationID = UUID.randomUUID().toString();
//设置请求的相关属性
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
.correlationId(correlationID)
.replyTo(Constants.RPC_RESPONSE_QUEUE)
.build();
channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());
//4. 接收响应
//使用阻塞队列, 来存储响应信息
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String respMsg = new String(body);
System.out.println("接收到回调消息: "+ respMsg);
if (correlationID.equals(properties.getCorrelationId())){
//如果correlationID校验一致
response.offer(respMsg);
}
}
};
channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
String result = response.take();
System.out.println("[RPC Client 响应结果]:"+ result);
}
}
服务端:
package com.wang.rabbitmq.rpc;
import com.rabbitmq.client.*;
import com.wang.rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* RPC server
* 1. 接收请求
* 2. 发送响应
*/
public class RpcServer {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME);//账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 接收请求
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String request = new String(body,"UTF-8");
System.out.println("接收到请求:"+ request);
String response = "针对request:"+ request +", 响应成功";
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
.correlationId(properties.getCorrelationId())
.build();
channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);
}
}
2. SpringBoot 完成工作模式
详见项目 rabbitmq-spring-demo
(1) 配置文件: application.yml
spring:
application:
name: rabbitmq-spring-demo
rabbitmq:
host: 120.53.237.101
port: 5672
username: admin
password: admin
virtual-host: vhost2
(2) config 包
RabbitMQConfig 类:
(用来配置要使用的 交换机, 队列, 交换机和队列之间的绑定关系)
package com.wang.rabbitmqspringdemo.config;
import com.wang.rabbitmqspringdemo.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
//工作模式 (使用默认交换机)
@Bean("workQueue")
public Queue workQueue(){
return QueueBuilder.durable(Constants.WORK_QUEUE).build();
}
//广播模式
@Bean("fanoutQueue1")
public Queue fanoutQueue1(){
return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();
}
@Bean("fanoutQueue2")
public Queue fanoutQueue2(){
return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();
}
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange(){
return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build();
}
@Bean("fanoutQueueBinding1")
public Binding fanoutQueueBinding1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue1") Queue queue){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean("fanoutQueueBinding2")
public Binding fanoutQueueBinding2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue2") Queue queue){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
//路由模式
@Bean("directQueue1")
public Queue directQueue1(){
return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();
}
@Bean("directQueue2")
public Queue directQueue2(){
return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();
}
@Bean("directExchange")
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();
}
@Bean("directQueueBinding1")
public Binding directQueueBinding1(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue1") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("orange");
}
@Bean("directQueueBinding2")
public Binding directQueueBinding2(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue2") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("black");
}
@Bean("directQueueBinding3")
public Binding directQueueBinding3(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue2") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("orange");
}
//通配符模式
@Bean("topicQueue1")
public Queue topicQueue1(){
return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();
}
@Bean("topicQueue2")
public Queue topicQueue2(){
return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();
}
@Bean("topicExchange")
public TopicExchange topicExchange(){
return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();
}
@Bean("topicQueueBinding1")
public Binding topicQueueBinding1(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue1") Queue queue){
return BindingBuilder.bind(queue).to(topicExchange).with("*.orange.*");
}
@Bean("topicQueueBinding2")
public Binding topicQueueBinding2(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){
return BindingBuilder.bind(queue).to(topicExchange).with("*.*.rabbit");
}
@Bean("topicQueueBinding3")
public Binding topicQueueBinding3(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){
return BindingBuilder.bind(queue).to(topicExchange).with("lazy.#");
}
}
(3) Constant 包
Constants 类:
(用来配置常量 (交换机名称, 队列名称) )
package com.wang.rabbitmqspringdemo.constant;
public class Constants {
public static final String WORK_QUEUE = "work.queue";
//发布订阅模式
public static final String FANOUT_QUEUE1 = "fanout.queue1";
public static final String FANOUT_QUEUE2 = "fanout.queue2";
public static final String FANOUT_EXCHANGE = "fanout.exchange";
//路由模式
public static final String DIRECT_QUEUE1 = "direct.queue1";
public static final String DIRECT_QUEUE2 = "direct.queue2";
public static final String DIRECT_EXCHANGE = "direct.exchange";
//通配符模式
public static final String TOPIC_QUEUE1 = "topic.queue1";
public static final String TOPIC_QUEUE2 = "topic.queue2";
public static final String TOPIC_EXCHANGE = "topic.exchange";
}
(4) controller 包
ProducerController 类:
(写生产者代码)
使用 rabbbitTemplate对象的 convertAndSend 方法来发送消息.
package com.wang.rabbitmqspringdemo.controller;
import com.wang.rabbitmqspringdemo.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RequestMapping("/producer")
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/work")
public String work(){
for (int i = 0; i < 10; i++) {
//使用默认交换机, RoutingKey 和队列名称一致
// 发送消息
rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, "hello spring amqp: work..."+i);
}
return "发送成功";
}
@RequestMapping("/fanout")
public String fanout(){
rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE,"", "hello spring amqp:fanout...");
return "发送成功";
}
@RequestMapping("/direct/{routingKey}")
public String direct(@PathVariable("routingKey") String routingKey){
rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, routingKey, "hello spring amqp:direct, my routing key is "+routingKey);
return "发送成功";
}
@RequestMapping("/topic/{routingKey}")
public String topic(@PathVariable("routingKey") String routingKey){
rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE, routingKey, "hello spring amqp:topic, my routing key is "+routingKey);
return "发送成功";
}
}
(5) listener 包
工作队列模式 (WorkListener 类)
package com.wang.rabbitmqspringdemo.listener;
import com.rabbitmq.client.Channel;
import com.wang.rabbitmqspringdemo.constant.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class WorkListener {
@RabbitListener(queues = Constants.WORK_QUEUE)
public void queueListener1(Message message, Channel channel){
System.out.println("listener 1 ["+Constants.WORK_QUEUE+"] 接收到消息:" +message + ",channel:"+channel);
}
@RabbitListener(queues = Constants.WORK_QUEUE)
public void queueListener2(String message){
System.out.println("listener 2 ["+Constants.WORK_QUEUE+"] 接收到消息:" +message);
}
}
广播模式 (FanoutListener 类)
package com.wang.rabbitmqspringdemo.listener;
import com.rabbitmq.client.Channel;
import com.wang.rabbitmqspringdemo.constant.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class FanoutListener {
@RabbitListener(queues = Constants.FANOUT_QUEUE1)
public void queueListener1(String message){
System.out.println("队列["+ Constants.FANOUT_QUEUE1+"] 接收到消息:" +message);
}
@RabbitListener(queues = Constants.FANOUT_QUEUE2)
public void queueListener2(String message){
System.out.println("队列["+Constants.FANOUT_QUEUE2+"] 接收到消息:" +message);
}
}
路由模式 (DirectListener 类)
package com.wang.rabbitmqspringdemo.listener;
import com.wang.rabbitmqspringdemo.constant.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DirectListener {
@RabbitListener(queues = Constants.DIRECT_QUEUE1)
public void queueListener1(String message){
System.out.println("队列["+Constants.DIRECT_QUEUE1+"] 接收到消息:" +message);
}
@RabbitListener(queues = Constants.DIRECT_QUEUE2)
public void queueListener2(String message){
System.out.println("队列["+Constants.DIRECT_QUEUE2+"] 接收到消息:" +message);
}
}
通配符模式 (TopicListener 类)
package com.wang.rabbitmqspringdemo.listener;
import com.wang.rabbitmqspringdemo.constant.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TopicListener {
@RabbitListener(queues = Constants.TOPIC_QUEUE1)
public void queueListener1(String message){
System.out.println("队列["+Constants.TOPIC_QUEUE1+"] 接收到消息:" +message);
}
@RabbitListener(queues = Constants.TOPIC_QUEUE2)
public void queueListener2(String message){
System.out.println("队列["+Constants.TOPIC_QUEUE2+"] 接收到消息:" +message);
}
}