路由模式
逻辑图
如果我们将生产环境的日志进行处理,而日志是分等级的,我们就按照 error waring info三个等级来讲解
一个消费者是处理【所有】(info,error,warning)的日志,用于做数据仓库,数据挖掘的
一个消费者是处理【错误】(error)日志,用以检测生产环境哪里有bug的
如果有一条 error 的日志,它应当既发送给【所有】,又发送给【错误】
如果有一条 info 的日志,它应当只发送给【所有】
如果有一条 warning 的日志,它应当只发送给【所有】
如果使用发布订阅,将不太好处理以上情形,所有使用路由模式,根据 routingKey 指定规则
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;
public class RoutingProducer {
/**
* 生产者 → 消息队列
* 创建连接工厂,并设置参数
* 创建连接 Connection
* 创建通道 Channel
* ----------------
* 创建交换机
* 创建队列
* 交换机绑定到队列
* <p>
* 发送消息
*/
//定义交换机名称
private static final String ROUTING_EXCHANGE_NAME = "my_routing_exchange";
//定义一个 error 队列,仅有 error 的日志到这个队列
private static final String ERROR_QUEUE_NAME = "my_error_queue";
//定义一个 all 队列, error info warning 级别的日志都到这个队列
private static final String ALL_QUEUE_NAME = "my_all_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
if (true) {
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
}
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//创建交换机,使用路由模式的交换机
channel.exchangeDeclare(ROUTING_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, false, null);
//创建队列
channel.queueDeclare(ERROR_QUEUE_NAME, true, false, false, null);
channel.queueDeclare(ALL_QUEUE_NAME, true, false, false, null);
//绑定交换机
/**
* String queue :队列名称
* String exchange :交换机名称
* String routingKey :路由键,fanout 广播模式不需要路由键
* Map<String, Object> arguments:参数
*/
channel.queueBind(ERROR_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "error");
channel.queueBind(ALL_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "error");
channel.queueBind(ALL_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "info");
channel.queueBind(ALL_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "warning");
//发送短信
String[] keys = {"error", "info", "warning"};
int errorCount = 0;
int infoCount = 0;
int warningCount = 0;
for (int i = 0; i < 30; i++) {
int random = (int) (Math.random() * (3 - 1 + 1)) + 0; //生成0,1,2随机数
String logLevel = keys[random];
String str = "我是 " + logLevel + "\t消息\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
System.out.println("发送消息:\t" + str);
channel.basicPublish(ROUTING_EXCHANGE_NAME, logLevel, null, str.getBytes());
if (random == 0) {
errorCount++;
} else if (random == 1) {
infoCount++;
} else if (random == 2) {
warningCount++;
}
}
System.out.println("error\t共计: " + errorCount + "条");
System.out.println("info\t共计: " + infoCount + "条");
System.out.println("warning\t共计: " + warningCount + "条");
// 关闭资源
channel.close();
connection.close();
}
}
消费者
error
- 该消费者只订阅 error 的队列
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ErrorRoutingConsumer {
/**
* 消息队列 ← 消费者
* 创建连接工厂,并设置参数
* 创建连接 Connection
* 创建通道 Channel
* 订阅队列
* 接收消息
*/
//定义一个 error 队列,仅有 error 的日志到这个队列
private static final String ERROR_QUEUE_NAME = "my_error_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂,并设置参数
ConnectionFactory factory = new ConnectionFactory();
if (true) {
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
}
//创建连接 Connection
Connection connection = factory.newConnection();
//创建通道 Channel
Channel channel = connection.createChannel();
/**
* consumerTag 消费信息标签
* delivery 回执
*/
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
byte[] body = delivery.getBody();
System.out.println("【error消费者】消费消息:\t" + new String(body));
};
/**
* basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)
* String queue : 队列名称
* boolean autoAck : 是否自动确认,如果true,消费者接收到消息会自动发送一个回执给消息队列
* DeliverCallback deliverCallback : 回调函数
* CancelCallback cancelCallback : 消费者取消订阅时的回调函数
*/
channel.basicConsume(ERROR_QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
}
}
error info warning
- 该消费者订阅 all 的队列
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class AllRoutingConsumer {
/**
* 消息队列 ← 消费者
* 创建连接工厂,并设置参数
* 创建连接 Connection
* 创建通道 Channel
* 订阅队列
* 接收消息
*/
//定义一个 all 队列, error info warning 级别的日志都到这个队列
private static final String ALL_QUEUE_NAME = "my_all_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂,并设置参数
ConnectionFactory factory = new ConnectionFactory();
if (true) {
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
}
//创建连接 Connection
Connection connection = factory.newConnection();
//创建通道 Channel
Channel channel = connection.createChannel();
/**
* consumerTag 消费信息标签
* delivery 回执
*/
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
byte[] body = delivery.getBody();
System.out.println("【all 消费者】消费消息:\t" + new String(body));
};
/**
* basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)
* String queue : 队列名称
* boolean autoAck : 是否自动确认,如果true,消费者接收到消息会自动发送一个回执给消息队列
* DeliverCallback deliverCallback : 回调函数
* CancelCallback cancelCallback : 消费者取消订阅时的回调函数
*/
channel.basicConsume(ALL_QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
}
}
测试
启动生产者,查看 RabbitMQ 网页控制条
启动 error 消费者
启动 all 消费者
再次启动生产者