4 路由模式

发布于:2024-09-19 ⋅ 阅读:(111) ⋅ 点赞:(0)

路由模式

逻辑图

image-20210811143722455

如果我们将生产环境的日志进行处理,而日志是分等级的,我们就按照 error waring info三个等级来讲解

一个消费者是处理【所有】(info,error,warning)的日志,用于做数据仓库,数据挖掘的

一个消费者是处理【错误】(error)日志,用以检测生产环境哪里有bug的

如果有一条 error 的日志,它应当既发送给【所有】,又发送给【错误】

如果有一条 info 的日志,它应当只发送给【所有】

如果有一条 warning 的日志,它应当只发送给【所有】

如果使用发布订阅,将不太好处理以上情形,所有使用路由模式,根据 routingKey 指定规则

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;

public class RoutingProducer {
    /**
     * 生产者 → 消息队列
     * 创建连接工厂,并设置参数
     * 创建连接 Connection
     * 创建通道 Channel
     * ----------------
     * 创建交换机
     * 创建队列
     * 交换机绑定到队列
     * <p>
     * 发送消息
     */
    //定义交换机名称
    private static final String ROUTING_EXCHANGE_NAME = "my_routing_exchange";
    //定义一个 error 队列,仅有 error 的日志到这个队列
    private static final String ERROR_QUEUE_NAME = "my_error_queue";
    //定义一个 all 队列, error info warning 级别的日志都到这个队列
    private static final String ALL_QUEUE_NAME = "my_all_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        if (true) {
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
        }
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //创建交换机,使用路由模式的交换机
        channel.exchangeDeclare(ROUTING_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, false, null);
        //创建队列
        channel.queueDeclare(ERROR_QUEUE_NAME, true, false, false, null);
        channel.queueDeclare(ALL_QUEUE_NAME, true, false, false, null);

        //绑定交换机
        /**
         * String queue                 :队列名称
         * String exchange              :交换机名称
         * String routingKey            :路由键,fanout 广播模式不需要路由键
         * Map<String, Object> arguments:参数
         */
        channel.queueBind(ERROR_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "error");

        channel.queueBind(ALL_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "error");
        channel.queueBind(ALL_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "info");
        channel.queueBind(ALL_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "warning");

        //发送短信
        String[] keys = {"error", "info", "warning"};
        int errorCount = 0;
        int infoCount = 0;
        int warningCount = 0;
        for (int i = 0; i < 30; i++) {
            int random = (int) (Math.random() * (3 - 1 + 1)) + 0;   //生成0,1,2随机数
            String logLevel = keys[random];

            String str = "我是 " + logLevel + "\t消息\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
            System.out.println("发送消息:\t" + str);
            channel.basicPublish(ROUTING_EXCHANGE_NAME, logLevel, null, str.getBytes());

            if (random == 0) {
                errorCount++;
            } else if (random == 1) {
                infoCount++;
            } else if (random == 2) {
                warningCount++;
            }
        }

        System.out.println("error\t共计: " + errorCount + "条");
        System.out.println("info\t共计: " + infoCount + "条");
        System.out.println("warning\t共计: " + warningCount + "条");

        // 关闭资源
        channel.close();
        connection.close();
    }
}

消费者

error

  • 该消费者只订阅 error 的队列
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ErrorRoutingConsumer {
    /**
     * 消息队列 ← 消费者
     * 创建连接工厂,并设置参数
     * 创建连接 Connection
     * 创建通道 Channel
     * 订阅队列
     * 接收消息
     */
    //定义一个 error 队列,仅有 error 的日志到这个队列
    private static final String ERROR_QUEUE_NAME = "my_error_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂,并设置参数
        ConnectionFactory factory = new ConnectionFactory();
        if (true) {
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
        }
        //创建连接 Connection
        Connection connection = factory.newConnection();
        //创建通道 Channel
        Channel channel = connection.createChannel();


        /**
         * consumerTag  消费信息标签
         * delivery     回执
         */
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            byte[] body = delivery.getBody();
            System.out.println("【error消费者】消费消息:\t" + new String(body));
        };
        /**
         * basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)
         * String queue                         :   队列名称
         * boolean autoAck                      :   是否自动确认,如果true,消费者接收到消息会自动发送一个回执给消息队列
         * DeliverCallback deliverCallback      :   回调函数
         * CancelCallback cancelCallback        :   消费者取消订阅时的回调函数
         */
        channel.basicConsume(ERROR_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
    }
}

error info warning

  • 该消费者订阅 all 的队列
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class AllRoutingConsumer {
    /**
     * 消息队列 ← 消费者
     * 创建连接工厂,并设置参数
     * 创建连接 Connection
     * 创建通道 Channel
     * 订阅队列
     * 接收消息
     */
    //定义一个 all 队列, error info warning 级别的日志都到这个队列
    private static final String ALL_QUEUE_NAME = "my_all_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂,并设置参数
        ConnectionFactory factory = new ConnectionFactory();
        if (true) {
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
        }
        //创建连接 Connection
        Connection connection = factory.newConnection();
        //创建通道 Channel
        Channel channel = connection.createChannel();


        /**
         * consumerTag  消费信息标签
         * delivery     回执
         */
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            byte[] body = delivery.getBody();
            System.out.println("【all 消费者】消费消息:\t" + new String(body));
        };
        /**
         * basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)
         * String queue                         :   队列名称
         * boolean autoAck                      :   是否自动确认,如果true,消费者接收到消息会自动发送一个回执给消息队列
         * DeliverCallback deliverCallback      :   回调函数
         * CancelCallback cancelCallback        :   消费者取消订阅时的回调函数
         */
        channel.basicConsume(ALL_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
    }
}

测试

  • 启动生产者,查看 RabbitMQ 网页控制条

  • 启动 error 消费者

  • 启动 all 消费者

  • 再次启动生产者

image-20210811155433733

image-20210811155448009image-20210811155457070

SpringBoot 整合

小结


网站公告

今日签到

点亮在社区的每一天
去签到