SpringBoot整合RabbitMQ

发布于:2024-04-10 ⋅ 阅读:(147) ⋅ 点赞:(0)

一、流程图概括

 二、引入依赖

<dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

三、配置RabbitMQ连接

application.propertiesapplication.yml中配置RabbitMQ服务器的连接参数:

 1、DirectExchange(直连交换机)

3.1、消费者

package com.yy.consumer;


import com.yy.model.Ordering;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConsumer {
    //启动时注册队列和交换机,启动多次为什么不报错?启动的时候
    // ,会根据这个队列或者交换机的名称先去查找有没有这个队列或者交换机,如果有什么都不做,如果没有创建一个新的新的队列和交换机

    //注册一个队列
    @Bean
    public Queue directQueue() {
        return QueueBuilder.durable("direct_01").build();
    }
    //注册交换机
    @Bean
    public DirectExchange directExchange() {
        return ExchangeBuilder.directExchange("direct_exchange").build();
    }
    //绑定队列和交换机
    @Bean
    public Binding bindDirect(Queue directQueue, DirectExchange directExchange) {
         return BindingBuilder.bind(directQueue).to(directExchange).with("123");
    }
    
//监听消息(string类型)消费者
//    @RabbitListener(queues = "direct_01")
//    public void receiveMessage(String message) {
//            System.out.println("消费者1接收到消息:" + message);
//        }


    //监听消息(Ordering对象类型)
    //@RabbitListener(queues = "direct_01")
    public void receiveMessage(Ordering ordering) {
        System.out.println("消费者1接收到消息:" + ordering);
    }
}

3.2、生产者

package com.yy.provider;

import com.yy.model.Ordering;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class DirectProvider {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //发送消息
    public void send(Object msg) {
        rabbitTemplate.convertAndSend("direct_exchange", "123", msg);
    }
//    //发送对象类型消息
//    public void send2(Ordering ordering) {
//        rabbitTemplate.convertAndSend("direct_exchange", "123", ordering);
//    }
}

 3.3、发送例ordering对象类型消息

ordering对象

package com.yy.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.util.Queue;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Ordering implements Serializable {
    private int id;
    private String name;
}

 需要添加配置类,将对象转换成字符串

package com.yy.config;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitConfig {

    @Bean //将对象转换成字符串
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

 测试类

package com.yy;

import cn.hutool.core.thread.ThreadUtil;
import com.yy.model.Ordering;
import com.yy.provider.DirectProvider;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.IOException;

@SpringBootTest
class RabbitMqAppTests {
    @Autowired
    private DirectProvider directProvider;

    //测试string类型
    @Test
    void test() throws IOException {
//        1
       // directProvider.send("123456789101111");
       // ThreadUtil.safeSleep(5000);
//      2
        for(int i = 0; i < 10; i++){
            directProvider.send("你好呀"+i);
            System.out.println("发送成功"+i);
            ThreadUtil.safeSleep(1000);
        }
        System.in.read();
    }

    //测试Ordering对象类型
    @Test
    void test1() throws IOException {
        for(int i = 0; i < 10; i++){
            Ordering ordering = Ordering.builder().id(i).name("yy" + i).build();
            directProvider.send(ordering);
            System.out.println("发送成功"+i);
            ThreadUtil.safeSleep(1000);
        }
        System.in.read();
    }
}

 

3.4 一个交换机对多个队列的特点:

3.5 一个队列对多个消费者特点:

 代码:

消费者2(DirectConsumer2)

package com.yy.consumer;


import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

//@Configuration
public class DirectConsumer2 {
    //启动时注册队列和交换机,启动多次为什么不报错?启动的时候
    // ,会根据这个队列或者交换机的名称先去查找有没有这个队列或者交换机,如果有什么都不做,如果没有创建一个新的新的队列和交换机

    //注册一个队列
    @Bean
    public Queue directQueue2() {
        return QueueBuilder.durable("direct_02").build();
    }
    //注册交换机
    //@Bean
   // public DirectExchange directExchange() {
        //return ExchangeBuilder.directExchange("direct_exchange").build();
   // }
    //绑定队列和交换机
    @Bean
    public Binding bindDirect2(Queue directQueue2, DirectExchange directExchange) {
         return BindingBuilder.bind(directQueue2).to(directExchange).with("123");
    }
    //监听消息
    @RabbitListener(queues = "direct_02")
    public void receiveMessage(String message) {
            System.out.println("direct_01接收到消息:" + message);
        }
}

消费者3(DirectConsumer3)

package com.yy.consumer;


import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

//@Configuration
public class DirectConsumer3 {


    //监听消息
    @RabbitListener(queues = "direct_01")
    public void receiveMessage(String message) {
            System.out.println("消费者2接收到消息:" + message);
        }
}

 2、FanoutExchange(广播交换机)

2.1消费者

package com.yy.consumer;


import com.yy.model.Ordering;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConsumer {
    //启动时注册队列和交换机,启动多次为什么不报错?启动的时候
    // ,会根据这个队列或者交换机的名称先去查找有没有这个队列或者交换机,如果有什么都不做,如果没有创建一个新的新的队列和交换机

    //注册一个队列
    @Bean
    public Queue fanoutQueue() {
        return QueueBuilder.durable("fanout01").build();
    }
    //注册交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return ExchangeBuilder.fanoutExchange("fanoutExchange").build();
    }
    //绑定队列和交换机
    @Bean
    public Binding bindFanout(Queue fanoutQueue, FanoutExchange fanoutExchange) {
         return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
    }
    //监听消息
    @RabbitListener(queues = "fanout01")
    public void receiveMessage(Ordering message) {
            System.out.println("消费者1接收到消息:" + message);
        }
    //监听消息,定义两个消费者同时监听同一队列消息
    @RabbitListener(queues = "fanout01")
    public void receiveMessage1(Ordering message) {
        System.out.println("消费者2接收到消息:" + message);
    }

}

2.2生产者

package com.yy.provider;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class FanoutProvider {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //发送消息
    public void send(Object msg) {
        rabbitTemplate.convertAndSend("fanoutExchange", "", msg);
    }
}

 3.3测试

 //广播模式
    @Test
    void test3() throws IOException {
        for(int i = 0; i < 10; i++){
            Ordering ordering = Ordering.builder().id(i).name("yy" + i).build();
            fanoutProvider.send(ordering);
            System.out.println("发送成功"+i);
            ThreadUtil.safeSleep(1000);
        }
        System.in.read();
    }

4、TopicExchange

4.1 消费者

package com.yy.consumer;

import com.yy.model.Ordering;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicConsumer {

    @Bean
    public TopicExchange topicExchange(){
        return  ExchangeBuilder.topicExchange("Topic_E01").build();
    }
    @Bean
    public Queue topicQueue1(){
        return   QueueBuilder.durable("小龙").build();
    }
    @Bean
    public Queue topicQueue2(){
        return   QueueBuilder.durable("海洋").build();
    }
    @Bean
    public Queue topicQueue3(){
        return   QueueBuilder.durable("文超").build();
    }
    //注册交换机
    @Bean //交换机与队列关系
    public Binding TopicBinding1(Queue topicQueue1, TopicExchange topicExchange){
        return BindingBuilder.bind(topicQueue1).to(topicExchange).with("#");
    }
    @Bean //交换机与队列关系
    public Binding TopicBinding2(Queue topicQueue2,TopicExchange topicExchange){
        return BindingBuilder.bind(topicQueue2).to(topicExchange).with("1.6.*");
    }
    @Bean //交换机与队列关系
    public Binding TopicBinding3(Queue topicQueue3,TopicExchange topicExchange){
        return BindingBuilder.bind(topicQueue3).to(topicExchange).with("1.8.*");
    }
    @RabbitListener(queues = "小龙")
    public void receiveMessage(Ordering msg){
        System.out.println("小龙 收到消息:"+msg);
    }
    @RabbitListener(queues = "海洋")
    public void receiveMessage2(Ordering msg){
        System.out.println("海洋 收到消息:"+msg);
    }
    @RabbitListener(queues = "文超")
    public void receiveMessage3(Ordering msg){
        System.out.println("文超 收到消息:"+msg);
    }
}

4.2生产者

package com.yy.provider;

import com.yy.model.Girl;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class TopicProvider {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(Girl girl) {
        rabbitTemplate.convertAndSend("Topic_E01",girl.getHeight(), girl);
    }
}

 测试:

@Autowired
    private TopicProvider topicProvider;

    @Test
    void test2() throws IOException {

        Girl girl = Girl.builder().id(100).name("小红").height("1.6.8").build();
        topicProvider.send(girl);

        System.in.read();
    }

四、问题:

1、我们在启动时每次注册队列和交换机,启动多次为什么不报错?

答:启动的时候 ,会根据这个队列或者交换机的名称先去查找有没有这个队列或者交换机,如果有什么都不做,如果没有创建一个新的新的队列和交换机

2、交换机对应的是队列,队列对应的是消费者

直流交换机发送消息给队列,对应一个队列是给一个队列发送全部消息

                                                     对应两个队列就是复制一份给每个队列都发送全部消息

          一个队列如果有一个消费者就是发送全部消息

                        如果有多个消费者来消费该队列就是平均分发全部消息到消费者

广播交换机也是一样,一个多个消费者就是平均分发全部消息到多个消费者


网站公告

今日签到

点亮在社区的每一天
去签到