RabbitMQ工作模式(下)

发布于:2025-09-07 ⋅ 阅读:(19) ⋅ 点赞:(0)

路由模式

在这里插入图片描述

交换机通过不同的 routingkey 绑定队列,生产者通过 routingkey 来向不同的队列发送消息

生产者代码演示:

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //设置 MQ 参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);
        factory.setPort(Constants.PORT);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        //建立连接
        Connection connection = factory.newConnection();
        // 开启信道
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare("DIRECT_EXCHANGE", BuiltinExchangeType.DIRECT, true, false, null);
        //声明队列
        channel.queueDeclare("direct1", true, false, false, null);
        channel.queueDeclare("direct2", true, false, false, null);
        //绑定队列和交换机
        channel.queueBind("direct1","DIRECT_EXCHANGE","a");
        channel.queueBind("direct2","DIRECT_EXCHANGE","a");
        channel.queueBind("direct2","DIRECT_EXCHANGE","b");
        channel.queueBind("direct2","DIRECT_EXCHANGE","c");
        //发送消息
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("DIRECT_EXCHANGE", "a", null, ("hello" + i).getBytes());
            channel.basicPublish("DIRECT_EXCHANGE", "b", null, ("hello" + i).getBytes());
            channel.basicPublish("DIRECT_EXCHANGE", "c", null, ("hello" + i).getBytes());
        }
        //关闭资源
        channel.close();
        connection.close();
    }
}

前面的建立连接的代码大家可以抽离出来,这里不抽离是方便大家了解整个代码的编写过程

在上面我们建立的队列和交换机的绑定关系图如下:
在这里插入图片描述
这里可以看到同样的 routingkey 为 a 绑定了两个队列,如果生产者使用 a 这个 routingkey 发送消息,那么两个队列都会接收到
在这里插入图片描述

消费者代码演示:

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //设置 MQ 参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);
        factory.setPort(Constants.PORT);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        //建立连接
        Connection connection = factory.newConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare("direct1", true, false, false, null);
        //消费消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        channel.basicConsume("direct1", true, consumer);
    }
}
public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //设置 MQ 参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);
        factory.setPort(Constants.PORT);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        //建立连接
        Connection connection = factory.newConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare("direct2", true, false, false, null);
        //进行消费
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        channel.basicConsume("direct2", true, consumer);
    }
}

通配符模式

在这里插入图片描述

这里有两种符号需要认识,首先 * 表示匹配一个单词,# 表示匹配 0 - n 个单词,只要符合上面的路由规则交换机就会将消息发送到对应的队列上。

生产者代码演示:

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);
        factory.setPort(Constants.PORT);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = factory.newConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, null);
        //声明队列
        channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);
        //绑定队列和交换机
        channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.orange.*");
        channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.rabbit");
        channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "lazy.#");
        //发送消息
        for (int i = 0; i < 10; i++) {
            channel.basicPublish(Constants.TOPIC_EXCHANGE, "quick.orange.rabbit", null, ("quick.orange.rabbit hello" + i).getBytes());
            channel.basicPublish(Constants.TOPIC_EXCHANGE, "lazy", null, ("lazy hello" + i).getBytes());
        }
        //关闭资源
        channel.close();
        connection.close();
    }
}

消费者代码演示:

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);
        factory.setPort(Constants.PORT);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = factory.newConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
        //进行消费
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        channel.basicConsume(Constants.TOPIC_QUEUE1, true, consumer);
    }
}
public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);
        factory.setPort(Constants.PORT);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = factory.newConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //声明队列 避免没有队列发生异常
        channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);
        //从队列中获取信息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        channel.basicConsume(Constants.TOPIC_QUEUE2, true, consumer);
    }
}

RPC

在这里插入图片描述

PRC 模式一般很少使用,当我们的生产者需要消费者的响应的时候,我们才会使用这个模式。

这里有两个重要的参数,correlation_id 是消息的标识符,主要用于区分消息,由于Client需要得到 Server 的响应所以这里的 correlation_id 需要区分这是哪条消息

reply_to 用于Client指定对应的队列去路由消息

Client 代码演示:

public class RpcClient {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);
        factory.setPort(Constants.PORT);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = factory.newConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
        // 设置消息
        String msg = "hello rpc";
        //设置请求的唯一标识
        String correlationId = UUID.randomUUID().toString();
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .correlationId(correlationId)
                .replyTo(Constants.RPC_RESPONSE_QUEUE)
                .build();
        channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, properties, msg.getBytes());
        //接收响应
        //使用阻塞队列存储响应信息
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        //消费者逻辑
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String responseMsg = new String(body);
                System.out.println("接收到回调信息:" + responseMsg);
                if (correlationId.equals(properties.getAppId())) {
                    response.offer(responseMsg);
                }
            }
        };
        channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true , consumer);
        //取出消息进行消费
        String res = response.take();
        System.out.println("最终结果:" + res);
    }
}

代码简单介绍:
String correlationId = UUID.randomUUID().toString(); 用于标识消息

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.correlationId(correlationId)
.replyTo(Constants.RPC_RESPONSE_QUEUE)
.build();
通过设置 属性,将我们的 correlationId 和 replyTo(指定对应的队列接发消息)设置进去

Server 代码演示:

public class PpcServer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);
        factory.setPort(Constants.PORT);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = factory.newConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
        //由于使用的是默认的交换机,所以绑定队列省略
        //消费者最多获取到一个未确认的消息
        channel.basicQos(1);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String request = new String(body, "UTF-8");
                System.out.println("接收到的请求为:" + request);
                String response = "针对 resquest 的响应为:" + request + "响应成功";
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
                channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);
    }
}

代码简单介绍
channel.basicQos(1); 这是rabbitmq 的高级特性,用于消费者最多接收多少个未确认的消息,可以调节消费者的吞吐量

在 RPC 中我们也要设置correlationId,这里要求correlationId和生产者的correlationId要保持一致,这样生产者才能识别出来这是哪一条消息

AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
.correlationId(properties.getCorrelationId())
.build();
channel.basicPublish(“”, Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());

Publisher Confirms(发布确认)

这个也可以当作是生产者将消息成功发送到指定的broker的可靠保证的方式

发布确认一共有三种方式:单独确认,批量确认,异步确认

下面是建立连接的方法:

    //建立连接
    private static Connection createConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);
        factory.setPort(Constants.PORT);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        return factory.newConnection();
    }

Publishing Messages Individually(单独确认)

单独确认就是生成者每发送一条消息,都要等待broker 回复 ack,只有等到ack,才会发送下一则消息,效率不高

开启发布确认模式需要进行下面的设置:

channel.confirmSelect();

等到ack 回复可以使用下面的方法:

channel.waitForConfirmsOrDie(5000);

    /**
     * 单独确认模式
     */
    public static void publishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {
        //获取连接
        Connection connection = createConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //设置为发布确认模式
        channel.confirmSelect();
        //声明队列
        channel.queueDeclare(Constants.PUBLISHER_CONFIRM_QUEUE, true, false, false, null);
        //发布消息
        long start = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = "publishingMessagesIndividually:" + i;
            channel.basicPublish("", Constants.PUBLISHER_CONFIRM_QUEUE, null, msg.getBytes());
            //最多等待消息确认的时间 5s
            channel.waitForConfirmsOrDie(5000);
        }
        long end = System.currentTimeMillis();
        System.out.println("单独确认消息总耗时:" + (end - start));
    }

Publishing Messages in Batches(批量确认)

批量确认顾名思义就是等待消息发送到一定数量的时候才进行确认,这里有个问题就是在发生故障时我们无法确切知道哪里出了问题,因此我们可能需要将整个批次保存在内存中,以记录一些有意义的信息或重新发布消息。此外,该解决方案仍然是同步的,因此会阻塞消息的发布。下面的官方的解释:

在这里插入图片描述

对比我们的单独确认,批量确认确实能够提高我们的吞吐量。

    /**
     * 批量确认模式
     */
    public static void publishingMessagesBatches() throws IOException, TimeoutException, InterruptedException {
        //获取连接
        Connection connection = createConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //设置为发布确认模式
        channel.confirmSelect();
        //声明队列
        channel.queueDeclare(Constants.PUBLISHER_CONFIRM_QUEUE, true, false, false, null);
        //发布消息
        long start = System.currentTimeMillis();
        //计数
        int count = 0;
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = "publishingMessagesBatches:" + i;
            channel.basicPublish("", Constants.PUBLISHER_CONFIRM_QUEUE, null, msg.getBytes());
            count++;
            if(count == 100) {
                //最大等待时间
                channel.waitForConfirmsOrDie(5000);
                count = 0;
            }
        }
        //最后一次确认,确保剩余的不足100条消息全部确认掉
        if(count > 0) {
            channel.waitForConfirmsOrDie(5000);
        }
        long end = System.currentTimeMillis();
        System.out.println("批量确认消息总耗时:" + (end - start));
    }

Handling Publisher Confirms Asynchronously(异步确认)

异步确认是指发送消息和接收消息的 ack 这两个动作是异步的,也就意味着在高吞吐量的情况下,我们可以更好地应对,比起前两种方式,这种异步确认的方式确实效率更高

    /**
     * 异步确认模式
     */
    public static void handlingPublisherConfirmsAsynchronously() throws IOException, TimeoutException, InterruptedException {
        //获取连接
        Connection connection = createConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //设置为发布确认模式
        channel.confirmSelect();
        //声明队列
        channel.queueDeclare(Constants.PUBLISHER_CONFIRM_QUEUE, true, false, false, null);
        //发布消息
        long start = System.currentTimeMillis();
        //存放消息序号的容器
        SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());
        //添加监听器
        channel.addConfirmListener(new ConfirmListener() {
            //收到确认信息
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                if(multiple) {
                    //将当前消息以及当前消息前面所有的消息删除
                    confirmSet.headSet(deliveryTag + 1).clear();
                } else {
                    //只删除当前消息
                    confirmSet.remove(deliveryTag);
                }
            }

            //收到nack 的消息
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                if (multiple){
                    confirmSet.headSet(deliveryTag+1).clear();
                }else {
                    confirmSet.remove(deliveryTag);
                }
                //业务需要根据实际场景进行处理, 比如重发, 此处代码省略
            }
        });

        //发布消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = "handlingPublisherConfirmsAsynchronously: " + i;
            //将消息的序号放入到容器中
            //channel.getNextPublishSeqNo() 在确认模式下,返回要发布的下一条消息的序列号。
            confirmSet.add(channel.getNextPublishSeqNo());
            channel.basicPublish("", Constants.PUBLISHER_CONFIRM_QUEUE, null, msg.getBytes());
        }
        while(!confirmSet.isEmpty()) {
            Thread.sleep(10);
        }
        long end = System.currentTimeMillis();
        System.out.println("异步确认消息总耗时:" + (end - start));
    }

代码解读:
异步确认最重要的就是设置监听器:我们要设置两个方法,一个是接收到 ack 的处理,另一个则是接收到 nack 的处理,这里有一个参数 multiple 【表示是否批量确认,如果是批量确认的话,意味着接受到序列号为deliveryTag的消息的时候,小于等于deliveryTag的消息一同都要被确认掉,如果不是批量确认,那就单独确认,需要我们一条一条消息进行确认】

        //添加监听器
        channel.addConfirmListener(new ConfirmListener() {
            //收到确认信息
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                if(multiple) {
                    //将当前消息以及当前消息前面所有的消息删除
                    confirmSet.headSet(deliveryTag + 1).clear();
                } else {
                    //只删除当前消息
                    confirmSet.remove(deliveryTag);
                }
            }

            //收到nack 的消息
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                if (multiple){
                    confirmSet.headSet(deliveryTag+1).clear();
                }else {
                    confirmSet.remove(deliveryTag);
                }
                //业务需要根据实际场景进行处理, 比如重发, 此处代码省略
            }
        });

其次就是要创建一个容器用于保存消息的序列号

        //存放消息序号的容器
        SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());

channel.getNextPublishSeqNo() 这个方法是在确认模式下,返回要发布的下一条消息的序列号deliveryTag

channel.getNextPublishSeqNo()

网站公告

今日签到

点亮在社区的每一天
去签到