RabbitMQ 应用

发布于:2025-05-25 ⋅ 阅读:(23) ⋅ 点赞:(0)

RabbitMQ 应用

1. 工作模式介绍

1.1 Simple (简单模式)
(1) 概述

image-20250524105705619

特点: 一个生产者, 一个消费者. 消息只能被消费一次. 也称为点对点 (Point-to-Point) 模式.

(2) 代码演示

Producer:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ProducerDemo {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立连接

        //(1) IP
        //(2) 端口号
        //(3) 账号
        //(4) 密码
        //(5) 虚拟主机
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("120.53.237.101");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("vhost1");
        Connection connection = factory.newConnection();

        //2.开启信道
        Channel channel = connection.createChannel();

        //3.声明交换机 (使用内置交换机)

        //4.声明队列
        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
         *              Map<String,Object> arguments)
         *  参数说明:
         *  queue:队列名称
         *  durable:可持久化
         *  exclusive:是否独占
         *  autoDelete:是否自动删除
         *  arguments:参数
         */
        channel.queueDeclare("hello", true, false, false, null);

        //5. 发送消息
        /**
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         *  参数说明:
         *  exchange:交换机名称
         *  routingKey:路由标签. (对于内置交换机,routingkey和队列名称保持一致)
         *  props:属性配置
         *  body:消息
         */
        for (int i = 0; i < 10; i++) {
            String msg = "hello rabbitmq" + i;
            channel.basicPublish("", "hello", null, msg.getBytes());
            System.out.println("消息发送成功" + i);
        }


        //6. 资源释放
        channel.close();
        connection.close();
    }
}

Consumer:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerDemo {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1. 创建连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("120.53.237.101");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("vhost1");
        Connection connection = connectionFactory.newConnection();
        //2. 创建Channel
        Channel channel = connection.createChannel();
        //3. 声明队列 (如果消费者要订阅的队列已经存在, 则可以省略)
        channel.queueDeclare("hello",true, false, false, null);
        //4. 消费消息
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * 参数说明:
         * queue: 队列名称
         * autoAck: 是否自动确认
         * callback: 接收到消息后, 执行的逻辑
         */
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //从队列中收到消息, 就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+ new String(body));
            }
        };

        channel.basicConsume("hello", true, consumer);

        //等待程序执行完成
        Thread.sleep(3000);

        //5. 释放资源
        channel.close();
        connection.close();
    }
}

1.2 Work Queue (工作队列模式)
(1) 概述

image-20250524105844133

一个生产者 § , 多个消费者 (C1, C2 …).

特点: 消息不会重复, 多个消费者共同消费 Queue 中的消息.

适用场景: 集群环境中做异步处理.

(2) 代码演示

Producer:

package com.wang.rabbitmq.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.wang.rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);//密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列   (使用内置的交换机)
        //如果队列不存在, 则创建 ; 如果队列已经存在, 则不创建
        /**
         * 第一个参数: 队列名称,这里是 Constants.WORK_QUEUE,指定队列名。
         * 第二个参数: b,durable,是否持久化。true 表示服务器重启后队列还在。
         * 第三个参数: b1,exclusive,是否独占。false 表示非独占,多个消费者可访问。
         * 第四个参数: b2,autoDelete,自动删除。false 表示当最后一个消费者断开后不自动删。
         * 第五个参数: map,arguments,(额外参数),null 就是没有
         */
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);

        //4. 发送消息
        for (int i = 0; i < 10; i++) {
            String msg = "hello work queue...." + i;
            /**
             * 第一个参数: exchange,这里是空字符串,说明用默认交换器. 。
             * 第二个参数: routingKey,这里是队列名,(因为默认交换器下,routingKey 就是队列名,确保消息路由到对应队列)
             * 第三个参数: basicProperties,消息的属性,比如持久化、优先级等,null 表示使用默认属性。
             * 第四个参数: 消息本体,转成字节数组发送.
             */
            channel.basicPublish("",Constants.WORK_QUEUE, null, msg.getBytes());
        }

        System.out.println("消息发送成功~");

        //6. 资源释放
        channel.close();
        connection.close();
    }
}

Consumer1:

package com.wang.rabbitmq.work;

import com.rabbitmq.client.*;
import com.wang.rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列   (使用内置的交换机)
        //如果队列不存在, 则创建, 如果队列存在, 则不创建
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
        //4. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //从队列中收到消息, 就会执行的方法
            @Override
            /**
             * consumerTag:消费者标签,标识消费者。
             * envelope:包含消息元数据,比如消息 ID、投递标签等。
             * properties:消息的属性,如持久化等设置。
             * body:消息体,字节数组,需要转字符串。
             */
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+ new String(body));
            }
        };
        /**
         *
         1.Constants.WORK_QUEUE: 指定从哪个队列消费消息.
         2.b: true(autoAck):表示是否自动确认消息. true(自动确认)
         意味着当消息发送给消费者后,RabbitMQ 会立即从队列中删除该消息(无需消费者手动确认);
         若为 `false`,则需消费者手动发送确认后,消息才会从队列移除。
         3.consumer:消费者对象,包含处理消息的逻辑.
         */
        channel.basicConsume(Constants.WORK_QUEUE, true, consumer);

        //6. 资源释放
//        channel.close();
//        connection.close();
    }
}

Consumer2:

package com.wang.rabbitmq.work;

import com.rabbitmq.client.*;
import com.wang.rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列   (使用内置的交换机)
        //如果队列不存在, 则创建, 如果队列存在, 则不创建
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
        //4. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //从队列中收到消息, 就会执行的方法
            @Override
            /**
             * consumerTag:消费者标签,标识消费者。
             * envelope:包含消息元数据,比如消息 ID、投递标签等。
             * properties:消息的属性,如持久化等设置。
             * body:消息体,字节数组,需要转字符串。
             */
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+ new String(body));
            }
        };
        /**
         *
         1.Constants.WORK_QUEUE: 指定从哪个队列消费消息.
         2.b: true(autoAck):表示是否自动确认消息. true(自动确认)
         意味着当消息发送给消费者后,RabbitMQ 会立即从队列中删除该消息(无需消费者手动确认);
         若为 `false`,则需消费者手动发送确认后,消息才会从队列移除。
         3.consumer:消费者对象,包含处理消息的逻辑.
         */
        channel.basicConsume(Constants.WORK_QUEUE, true, consumer);

        //6. 资源释放
//        channel.close();
//        connection.close();
    }
}

1.3 Publish/Subscribe (发布/订阅模式)
(1) 概述

image-20250524110202878

X: 交换机. 交换机将消息按一定规则路由到一个或多个队列上.

交换机有四种类型:

  1. Fanout: 广播类型. 将消息发给所有绑定到交换机的队列 --> (Publish/Subscribe 模式)
  2. Direct: 定向类型. 把消息交给符合指定 routing key 的队列 --> (Routing 模式)
  3. Topic: 通配符类型. 把消息交给符合 routing pattern 的队列 --> (Topics模式)
  4. headers: headers 类型的交换机不依赖于路由键来路由消息, 而是根据消息内容中的 headers 属性进行匹配. headers 类型的交换器性能会很差, 而且不实用, 基本很少使用.

[!TIP]

注:

image-20250312103123895

路由键 (Routing key). 生产者和交换机之间的路由键称为 Routing key, 交换机和队列之间的路由键称为 Binding key (Binding key也是 Routing key 的一种, 有时候也直接写做 Routing key).

发布/订阅模式中的交换机是 Fanout (广播类型), X 收到消息后会将消息发给所有队列. C1 订阅 Q1 ; C2 订阅 Q2.

(2) 代码演示

Producer:

package com.wang.rabbitmq.fanout;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.wang.rabbitmq.constant.Constants;


import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明交换机
        /**
         * 第一个参数: 交换器名称,这里 Constants.FANOUT_EXCHANGE,指定交换器的标识。
         * 第二个参数: 交换器类型,BuiltinExchangeType.FANOUT,说明是扇出类型,也就是广播模式。
         * 第三个参数: durable,true 表示持久化,服务器重启后交换器还在。
         */
        channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
        //4. 声明队列
        /**
         * 第一个参数: 队列名称,这里是 Constants.WORK_QUEUE,指定队列名。
         * 第二个参数: b,durable,是否持久化。true 表示服务器重启后队列还在。
         * 第三个参数: b1,exclusive,是否独占。false 表示非独占,多个消费者可访问。
         * 第四个参数: b2,autoDelete,自动删除。false 表示当最后一个消费者断开后不自动删。
         * 第五个参数: map,arguments,(额外参数),null 就是没有
         */
        channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);
        channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);
        //5. 交换机和队列绑定
        channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");
        channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");

        //6. 发布消息
        String msg = "hello fanout....";
        channel.basicPublish(Constants.FANOUT_EXCHANGE,"", null, msg.getBytes());
        System.out.println("消息发送成功");
        //7. 释放资源
        channel.close();
        connection.close();

    }
}

Consumer1:

package com.wang.rabbitmq.fanout;

import com.rabbitmq.client.*;
import com.wang.rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);//密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列
        channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);
        //4. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //从队列中收到消息, 就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+ new String(body));
            }
        };
        channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);
    }
}

Consumer2:

package com.wang.rabbitmq.fanout;

import com.rabbitmq.client.*;
import com.wang.rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列
        channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);
        //4. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //从队列中收到消息, 就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+ new String(body));
            }
        };
        channel.basicConsume(Constants.FANOUT_QUEUE2, true, consumer);

    }
}

1.4 Routing (路由模式)

(1) 概述

image-20250524112203207

发布订阅模式是无条件的将所有消息分发给所有消费者, 而路由模式是 Exchange 根据 RoutingKey 的是否匹配, 将数据筛选后发给对应的消费者队列.

适合场景: 需要根据特定规则分发消息的场景.

如上图, Binding key 有 a, b ,c 三种. 当 Routing key == a 时, 对应消息发给 Q1, Q2 ; 当 Routing key == b / c 时, 对应消息发给 Q2.

(2) 代码演示

Producer:

package com.wang.rabbitmq.direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.wang.rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 路由模式生产者
 */
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明交换机
        channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);
        //4. 声明队列
        channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);
        //5. 绑定交换机和队列
        channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");

        //6. 发送消息
        String msg = "hello direct, my routingkey is a....";
        channel.basicPublish(Constants.DIRECT_EXCHANGE,"a", null, msg.getBytes());

        String msg_b = "hello direct, my routingkey is b....";
        channel.basicPublish(Constants.DIRECT_EXCHANGE,"b", null, msg_b.getBytes());

        String msg_c = "hello direct, my routingkey is c....";
        channel.basicPublish(Constants.DIRECT_EXCHANGE,"c", null, msg_c.getBytes());
        System.out.println("消息发送成功");

        //7. 释放资源
        channel.close();
        connection.close();
    }
}

Consumer1:

package com.wang.rabbitmq.direct;

import com.rabbitmq.client.*;
import com.wang.rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列
        channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);
        //4. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //从队列中收到消息, 就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+ new String(body));
            }
        };
        channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);

    }
}

Consumer2:

package com.wang.rabbitmq.direct;

import com.rabbitmq.client.*;
import com.wang.rabbitmq.constant.Constants;


import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列
        channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);
        //4. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //从队列中收到消息, 就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+ new String(body));
            }
        };
        channel.basicConsume(Constants.DIRECT_QUEUE2, true, consumer);

    }
}

1.5 Topics (通配符模式)
(1) 概述

image-20250524112321369

通配符模式是路由模式的升级版. 在路由模式的基础上添加了通配符, 使得匹配规则更加灵活.

(2) 代码演示

Producer:

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import constant.Constants;

public class TopicRabbitProducer {
    public static String TOPIC_EXCHANGE_NAME = "test_topic";
    public static String TOPIC_QUEUE_NAME1 = "topic_queue1";
    public static String TOPIC_QUEUE_NAME2 = "topic_queue2";

    public static void main(String[] args) throws Exception {
        //1. 创建channel通道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST); //ip 默认值localhost
        factory.setPort(Constants.PORT); //默认值5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟机名称,默认 /
        factory.setUsername(Constants.USER_NAME); //用户名,默认guest
        factory.setPassword(Constants.PASSWORD); //密码,默认guest
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //2. 创建交换机
        channel.exchangeDeclare(Constants.TOPIC_EXCHANGE_NAME,
                BuiltinExchangeType.TOPIC, true, false, false, null);
        //3. 声明队列
        //如果没有这样的一个队列,会自动创建,如果有,则不创建
        channel.queueDeclare(Constants.TOPIC_QUEUE_NAME1, true, false, false, null);
        channel.queueDeclare(Constants.TOPIC_QUEUE_NAME2, true, false, false, null);
        //4. 绑定队列和交换机
        //队列1绑定error,仅接收error信息
        channel.queueBind(Constants.TOPIC_QUEUE_NAME1,Constants.TOPIC_EXCHANGE_NAME,
                "*.error");
        //队列2绑定info,error: error,info信息都接收
        channel.queueBind(Constants.TOPIC_QUEUE_NAME2,Constants.TOPIC_EXCHANGE_NAME,
                "#.info");
        channel.queueBind(Constants.TOPIC_QUEUE_NAME2,Constants.TOPIC_EXCHANGE_NAME,
                "*.error");
        //5. 发送消息
        String msg = "hello topic, I'm order.error";
        channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"order.error",null,msg.getBytes());
        String msg_black = "hello topic, I'm order.pay.info";
        channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"order.pay.info",null,msg_black.getBytes());
        String msg_green= "hello topic, I'm pay.error";
        channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"pay.error",null,msg_green.getBytes());
        //6.释放资源
        channel.close();
        connection.close();
    }
}

Consumer:

import com.rabbitmq.client.*;
import constant.Constants;
import java.io.IOException;

public class TopicRabbitmqConsumer1 {
    public static void main(String[] args) throws Exception {
        //1. 创建channel通道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST); //ip 默认值localhost
        factory.setPort(Constants.PORT); //默认值5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟机名称,默认 /
        factory.setUsername(Constants.USER_NAME); //用户名,默认guest
        factory.setPassword(Constants.PASSWORD); //密码,默认guest
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //2. 接收消息,并消费
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息: " + new String(body));
            }
        };
        channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);
    }
}
1.6 RPC (RPC 通信)
(1) 概述

image-20250524112353264

RPC 通信中, 没有生产者和消费者,只有客户端和服务端.

客户端发送请求, 接收响应 ; 服务端接收请求, 发送响应.

image-20250524112405840

工作流程:

  1. 客户端发送消息到一个指定的队列, 并在消息属性中设置 replyTo 字段, 这个字段指定了一个回调队列, 表示在这个队列里接收服务端响应.
  2. 服务端接收到请求后, 处理请求并发送响应到指定的回调队列.
  3. 客户端在回调队列上等待响应消息. 一旦收到响应, 客户端会检查消息的 correlationId 属性, 以确保同一请求对应的响应.
(2) 代码演示

客户端:

package com.wang.rabbitmq.rpc;

import com.rabbitmq.client.*;
import com.wang.rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

/**
 * rpc 客户端
 * 1. 发送请求
 * 2. 接收响应
 */
public class RpcClient {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
        //3. 发送请求
        String msg = "hello rpc...";
        //设置请求的唯一标识 (correlationId)
        String correlationID = UUID.randomUUID().toString();
        //设置请求的相关属性
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                .correlationId(correlationID)
                .replyTo(Constants.RPC_RESPONSE_QUEUE)
                .build();
        channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());

        //4. 接收响应
        //使用阻塞队列, 来存储响应信息
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String respMsg = new String(body);
                System.out.println("接收到回调消息: "+ respMsg);
                if (correlationID.equals(properties.getCorrelationId())){
                    //如果correlationID校验一致
                    response.offer(respMsg);
                }
            }
        };
        channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
        String result = response.take();
        System.out.println("[RPC Client 响应结果]:"+ result);

    }
}

服务端:

package com.wang.rabbitmq.rpc;

import com.rabbitmq.client.*;
import com.wang.rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * RPC server
 * 1. 接收请求
 * 2. 发送响应
 */
public class RpcServer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 接收请求
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String request = new String(body,"UTF-8");
                System.out.println("接收到请求:"+ request);
                String response = "针对request:"+ request +", 响应成功";
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
                channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());
                channel.basicAck(envelope.getDeliveryTag(), false);

            }
        };
        channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);
    }
}

2. SpringBoot 完成工作模式

详见项目 rabbitmq-spring-demo

(1) 配置文件: application.yml
spring:
  application:
    name: rabbitmq-spring-demo
  rabbitmq:
    host: 120.53.237.101
    port: 5672
    username: admin
    password: admin
    virtual-host: vhost2

(2) config 包

RabbitMQConfig 类:

(用来配置要使用的 交换机, 队列, 交换机和队列之间的绑定关系)

package com.wang.rabbitmqspringdemo.config;

import com.wang.rabbitmqspringdemo.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    //工作模式 (使用默认交换机)
    @Bean("workQueue")
    public Queue workQueue(){
        return QueueBuilder.durable(Constants.WORK_QUEUE).build();
    }

    //广播模式
    @Bean("fanoutQueue1")
    public Queue fanoutQueue1(){
        return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();
    }

    @Bean("fanoutQueue2")
    public Queue fanoutQueue2(){
        return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();
    }
    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange(){
        return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build();
    }

    @Bean("fanoutQueueBinding1")
    public Binding fanoutQueueBinding1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue1") Queue queue){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
    @Bean("fanoutQueueBinding2")
    public Binding fanoutQueueBinding2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue2") Queue queue){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
    
    //路由模式
    @Bean("directQueue1")
    public Queue directQueue1(){
        return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();
    }
    @Bean("directQueue2")
    public Queue directQueue2(){
        return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();
    }
    @Bean("directExchange")
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();
    }
    @Bean("directQueueBinding1")
    public Binding directQueueBinding1(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue1") Queue queue){
        return BindingBuilder.bind(queue).to(directExchange).with("orange");
    }

    @Bean("directQueueBinding2")
    public Binding directQueueBinding2(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue2") Queue queue){
        return BindingBuilder.bind(queue).to(directExchange).with("black");
    }

    @Bean("directQueueBinding3")
    public Binding directQueueBinding3(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue2") Queue queue){
        return BindingBuilder.bind(queue).to(directExchange).with("orange");
    }
    
    //通配符模式
    @Bean("topicQueue1")
    public Queue topicQueue1(){
        return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();
    }
    @Bean("topicQueue2")
    public Queue topicQueue2(){
        return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();
    }
    @Bean("topicExchange")
    public TopicExchange topicExchange(){
        return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();
    }

    @Bean("topicQueueBinding1")
    public Binding topicQueueBinding1(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue1") Queue queue){
        return BindingBuilder.bind(queue).to(topicExchange).with("*.orange.*");
    }
    @Bean("topicQueueBinding2")
    public Binding topicQueueBinding2(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){
        return BindingBuilder.bind(queue).to(topicExchange).with("*.*.rabbit");
    }
    @Bean("topicQueueBinding3")
    public Binding topicQueueBinding3(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){
        return BindingBuilder.bind(queue).to(topicExchange).with("lazy.#");
    }
}

(3) Constant 包

Constants 类:

(用来配置常量 (交换机名称, 队列名称) )

package com.wang.rabbitmqspringdemo.constant;

public class Constants {
    public static final String WORK_QUEUE = "work.queue";

    //发布订阅模式
    public static final String FANOUT_QUEUE1 = "fanout.queue1";
    public static final String FANOUT_QUEUE2 = "fanout.queue2";
    public static final String FANOUT_EXCHANGE = "fanout.exchange";
    //路由模式
    public static final String DIRECT_QUEUE1 = "direct.queue1";
    public static final String DIRECT_QUEUE2 = "direct.queue2";
    public static final String DIRECT_EXCHANGE = "direct.exchange";
    //通配符模式
    public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";
    public static final String TOPIC_EXCHANGE = "topic.exchange";
}

(4) controller 包

ProducerController 类:

(写生产者代码)

使用 rabbbitTemplate对象的 convertAndSend 方法来发送消息.

package com.wang.rabbitmqspringdemo.controller;


import com.wang.rabbitmqspringdemo.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RequestMapping("/producer")
@RestController
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/work")
    public String work(){
        for (int i = 0; i < 10; i++) {
            //使用默认交换机, RoutingKey 和队列名称一致
            // 发送消息
            rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, "hello spring amqp: work..."+i);
        }
        return "发送成功";
    }
    @RequestMapping("/fanout")
    public String fanout(){
        rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE,"", "hello spring amqp:fanout...");
        return "发送成功";
    }

    @RequestMapping("/direct/{routingKey}")
    public String direct(@PathVariable("routingKey") String routingKey){
        rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, routingKey, "hello spring amqp:direct, my routing key is "+routingKey);
        return "发送成功";
    }

    @RequestMapping("/topic/{routingKey}")
    public String topic(@PathVariable("routingKey") String routingKey){
        rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE, routingKey, "hello spring amqp:topic, my routing key is "+routingKey);
        return "发送成功";
    }
}

(5) listener 包
工作队列模式 (WorkListener 类)
package com.wang.rabbitmqspringdemo.listener;

import com.rabbitmq.client.Channel;
import com.wang.rabbitmqspringdemo.constant.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class WorkListener {

    @RabbitListener(queues = Constants.WORK_QUEUE)
    public void queueListener1(Message message, Channel channel){
        System.out.println("listener 1 ["+Constants.WORK_QUEUE+"] 接收到消息:" +message + ",channel:"+channel);
    }

    @RabbitListener(queues = Constants.WORK_QUEUE)
    public void queueListener2(String message){
        System.out.println("listener 2 ["+Constants.WORK_QUEUE+"] 接收到消息:" +message);
    }
}

广播模式 (FanoutListener 类)
package com.wang.rabbitmqspringdemo.listener;

import com.rabbitmq.client.Channel;
import com.wang.rabbitmqspringdemo.constant.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutListener {

    @RabbitListener(queues = Constants.FANOUT_QUEUE1)
    public void queueListener1(String message){
        System.out.println("队列["+ Constants.FANOUT_QUEUE1+"] 接收到消息:" +message);
    }

    @RabbitListener(queues = Constants.FANOUT_QUEUE2)
    public void queueListener2(String message){
        System.out.println("队列["+Constants.FANOUT_QUEUE2+"] 接收到消息:" +message);
    }
}

路由模式 (DirectListener 类)
package com.wang.rabbitmqspringdemo.listener;

import com.wang.rabbitmqspringdemo.constant.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DirectListener {

    @RabbitListener(queues = Constants.DIRECT_QUEUE1)
    public void queueListener1(String message){
        System.out.println("队列["+Constants.DIRECT_QUEUE1+"] 接收到消息:" +message);
    }

    @RabbitListener(queues = Constants.DIRECT_QUEUE2)
    public void queueListener2(String message){
        System.out.println("队列["+Constants.DIRECT_QUEUE2+"] 接收到消息:" +message);
    }
}

通配符模式 (TopicListener 类)
package com.wang.rabbitmqspringdemo.listener;


import com.wang.rabbitmqspringdemo.constant.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TopicListener {

    @RabbitListener(queues = Constants.TOPIC_QUEUE1)
    public void queueListener1(String message){
        System.out.println("队列["+Constants.TOPIC_QUEUE1+"] 接收到消息:" +message);
    }

    @RabbitListener(queues = Constants.TOPIC_QUEUE2)
    public void queueListener2(String message){
        System.out.println("队列["+Constants.TOPIC_QUEUE2+"] 接收到消息:" +message);
    }
}


网站公告

今日签到

点亮在社区的每一天
去签到