RabbitMQ

发布于:2025-05-29 ⋅ 阅读:(26) ⋅ 点赞:(0)

1.HelloWorld案例

简单队列

不使用SpringBoot  给MQ发送简单消息

package com.hrui.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author hrui
 * @date 2025/5/28 1:36
 */
public class RabbitMQSendMessage {

    public static void main(String[] args) {
        //1.建立连接
        ConnectionFactory factory=new ConnectionFactory();
        //1.1.设置连接参数 分别是:主机名host  端口port 用户名username 密码password 虚拟机vhost
        factory.setHost("xx.xxx.xxx.xx");
        factory.setPort(5672);
        factory.setUsername("xxxx");
        factory.setPassword("xxxxx");
        factory.setVirtualHost("/");


        Connection connection =null;
        Channel channel =null;
        try {
            //1.2.建立连接
            connection = factory.newConnection();

            //2.创建频道(通道)Channel
            channel = connection.createChannel();
            //3.创建队列
            String queueName="simple.queue";
            channel.queueDeclare(queueName, false, false, false, null);

            //4.发送消息
            String message = "hello world";
            channel.basicPublish("", queueName, null, message.getBytes());
            System.out.println("消息发送成功");

        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }finally {
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (RuntimeException | IOException | TimeoutException e) {
                    throw new RuntimeException(e);
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }

    }
}

简单消息不需要使用交换机

简单消息消费者

package com.hrui.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author hrui
 * @date 2025/5/28 1:51
 */
public class RabbitMQReceiveMessage {
    public static void main(String[] args) {
        //1.建立连接
        ConnectionFactory factory=new ConnectionFactory();
        //1.1.设置连接参数 分别是:主机名host  端口port 用户名username 密码password 虚拟机vhost
        factory.setHost("xx.xx.xx.xx");
        factory.setPort(5672);
        factory.setUsername("xxxx");
        factory.setPassword("xxxxx");
        factory.setVirtualHost("/");
        //1.2.建立连接
        Connection connection =null;
        Channel channel =null;
        try {
            connection = factory.newConnection();
            //2.创建频道(通道)Channel
            channel = connection.createChannel();

            //3.创建队列
            String queueName="simple.queue";
            channel.queueDeclare(queueName, false, false, false, null);
            //4.接收消息(订阅该队列中的消息)
            //注意输出的时候是 等待接收消息..... 先输出  说明已经开启了异步
            channel.basicConsume(queueName, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("接收到的消息:"+new String(body));
                }
            });
            System.out.println("等待接收消息.....");
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }

    }
}

上面的简单消息发送其实用的是直连交换机  也就是默认交换机

2.使用SpringAMQP

简化消息发送和接收的API

application.properties中的配置

这里生产者和消费者在一起

spring.rabbitmq.host=192.168.1.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=登录账号
spring.rabbitmq.password=登录密码
spring.rabbitmq.virtual-host=/

2.1.Basic Queue简单队列模型

简单队列消息接收者

2.2.Work Queue工作队列模型

这里先把简单队列的代码注释掉

如果你消费者和生产者分开两个应用  就会出现一人一半消费的现象

我这里因为引用启动前就已经往MQ发送了消息

哪个消费者的@Rabbitlistner 先注册,就可能全部抢完

分开应用会看到一人一半的现象

工作模式其实也是简单模式

简单模式和工作队列模式 都不允许 同一条消息给每个消费者消费

事实上简单队列模式和工作队列模式用的是默认交换机""

MQ的工作流程是

生产者 → 交换机 → 队列 → 消费者
 

发布订阅模式允许同一消息发送给多个消费者.实现方式是加入了exchange(交换机)

2.3.发布、订阅模型-Fanout-扇形交换机 真正广播

Fanout是交换机的一种类型         Fanout交换机常用于广播

交换机只用于消息转发

Fanout Exchange会将接收到的消息路由到每一个跟其绑定的Queue

上面的简单队列是因为我们开始写代码时候就声明了Queue

工作队列也是通过@Bean的方式声明了

那么发布、订阅模式可以自己在web页面中配置交换机和队列

也可以应用内声明  注意注意注意:交换机  队列的声明需要在消费者端声明  这也是下面出现问题的原因  配置和

@RabbitListener 要在同一应用中  才会去声明

把工作队列代码注释,先启动下

package com.hrui.rabbitmq.fanoutExchange;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author hrui
 * @date 2025/5/28 14:52
 */
@Configuration
public class FanoutExchangeTest {

    //声明交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("hrui.fanout");
    }
    @Bean
    public Queue queue1(){
        return new Queue("fanout.queue1");
    }
    @Bean
    public Queue queue2(){
        return new Queue("fanout.queue2");
    }
    //交换机和队列1绑定
    @Bean
    public Binding binding1(FanoutExchange fanoutExchange, Queue queue1){
        return BindingBuilder.bind(queue1).to(fanoutExchange);
    }
    //交换机和队列2绑定
    @Bean
    public Binding binding2(FanoutExchange fanoutExchange, Queue queue2){
        return BindingBuilder.bind(queue2).to(fanoutExchange);
    }
}

启动了  但是web页面上并没有看到 声明的交换机和队列

当我定义了两个消费者  启动之后才看到

package com.hrui.rabbitmq.fanoutExchange;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author hrui
 * @date 2025/5/28 15:21
 */
@Component
public class FanoutExchangeListener {
    @RabbitListener(queues = "fanout.queue1")
    public void receive1(String msg) {
        System.out.println("queue1 收到消息:" + msg);
    }

    @RabbitListener(queues = "fanout.queue2")
    public void receive2(String msg) {
        System.out.println("queue2 收到消息:" + msg);
    }
}

 

发消息

package com.hrui.rabbitmq.fanoutExchange;

import jakarta.annotation.PostConstruct;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author hrui
 * @date 2025/5/28 14:52
 */
@Configuration
public class FanoutExchangeTest {

    //声明交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("hrui.fanout");
    }
    @Bean
    public Queue queue1(){
        return new Queue("fanout.queue1");
    }
    @Bean
    public Queue queue2(){
        return new Queue("fanout.queue2");
    }
    //交换机和队列1绑定
    @Bean
    public Binding binding1(FanoutExchange fanoutExchange, Queue queue1){
        return BindingBuilder.bind(queue1).to(fanoutExchange);
    }
    //交换机和队列2绑定
    @Bean
    public Binding binding2(FanoutExchange fanoutExchange, Queue queue2){
        return BindingBuilder.bind(queue2).to(fanoutExchange);
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void init() throws InterruptedException {
        //第一个参数是交换机   第二个参数是路由键(这里先用""空字符串)  第三个参数是消息
        rabbitTemplate.convertAndSend("hrui.fanout","","hello fanout");
    }
}

接收消息的上面已经写了

运行结果 

2.4.发布、订阅模型-Direct  条件广播

Direct  按一定规则  交换机按一定队则将对应消息发送到对应(指定)的队列  

说明Direct 也是可以适配Fanout  比Fanout更加灵活

上面是在消费者中声明交换机  声明队列的方式

其实还有另外一种更加直接的方式就是用

@RabbitListener注解来接收消息同时来声明交换机   队列   路由Key 就是 Echange  Queue RoutingKey

package com.hrui.rabbitmq.directExchange;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author hrui
 * @date 2025/5/28 15:53
 */
@Component
public class DirectExchangeListener {


    // 消费者1监听 direct.queue1,绑定 routingKey = red, blue
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "hrui.queue3"),
            exchange = @Exchange(name = "hrui.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))
    public void listenDirectQueue1(String msg) {
        System.out.println("消费者1接收到Direct消息: [" + msg + "]");
    }

    // 消费者2监听 direct.queue2,绑定 routingKey = red, yellow
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "hrui.queue4"),
            exchange = @Exchange(name = "hrui.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "yellow"}
    ))
    
    public void listenDirectQueue2(String msg) {
        System.out.println("消费者2接收到Direct消息: [" + msg + "]");
    }
}

都可以接收到red   1只能接收到blue   2只能接收到 yellow

发送消息

package com.hrui.rabbitmq.directExchange;

import jakarta.annotation.PostConstruct;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author hrui
 * @date 2025/5/28 16:44
 */
@RestController
public class SendDirectExchange {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("hrui.direct", "red", "direct exchange msg red"+i);
            rabbitTemplate.convertAndSend("hrui.direct", "blue", "direct exchange msg blue"+i);
            rabbitTemplate.convertAndSend("hrui.direct", "yellow", "direct exchange msg yellow"+i);
        }
    }
}
2.5.发布、订阅模型-Topic  条件广播

Topic和Direct  条件广播非常相似  只不过上面 Direct是用的例如  red blue 

而Topic要求必须多个单词  例如  china.bews      jspan.news

2.6.消息转换器  

消息接收时候

返送时候用的Map接收时候也用Map

发送时候用的Map接收时候也用Map那么 发送时候用List 接收用List


网站公告

今日签到

点亮在社区的每一天
去签到