Springboot整合rabbitMQ

发布于:2023-07-27 ⋅ 阅读:(82) ⋅ 点赞:(0)

目录

1.Springboot整合rabbitMQ

2 如何确保消息的可靠性

 2.1 保证消息从生产者到交换机

2.2 保证消息可以从交换机到队列

2.3 如何保证消息在队列

2.4 保证消费者可靠的消费消息

3 如何限制消费者消费消息的条数(消费端限流)

4 设置队列过期时间

4.1 创建队列时为队列设置过期时间(每条信息都是相同时间)

4.2 单独为消息设置过期时间(为某一个设置过期时间)

 5 死信队列

6 延迟队列

7 如何防止消息被重复消费

8 rabbitMQ的常见面试题


1.Springboot整合rabbitMQ

1.创建springboot项目

        //rabbitmq依赖
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.创建两个子项目producer生产者服务    consumer消费者服务

3.生产者服务

①配置文件application.properties

server.port=7000

#rabbitMQ配置  rabbitMQ所在ip
spring.rabbitmq.host=192.168.1.88
# 都有默认值
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

②使用工具类RabbitTemplate发送消息到队列

package com.wzh.producer.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;


@RestController
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("saveOrder")
    public void saveOrder(){
        System.out.println("保存订单到数据库");
        Map<String,Object> map = new HashMap<>();
        map.put("orderId","110");
        map.put("price",2500);
        map.put("num",3);
        map.put("phone","15700085997");
        // 交换机  路由key
        rabbitTemplate.convertAndSend("topic_exchange","a.orange.b",map);
    }

}

③创建主启动类

package com.wzh.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
public class ProducerApp {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApp.class,args);
    }
}

④运行并访问7000端口 

然后rabbitmq客户端就可以查看到,这里都是提前创建好的队列,交换机等。 

4.消费者服务

①配置文件application.properties

server.port=7001

#rabbitMQ配置
spring.rabbitmq.host=192.168.1.88
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

②使用工具类RabbitTemplate发送消息到队列

package com.wzh.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;


@Component
public class MyListener {
    //队列名
    @RabbitListener(queues = "topic_queue01")
    public void test(Map<String,Object> msg){
        System.out.println(msg);
        //进行相关的业务处理
    }

③创建主启动类

package com.wzh.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
public class ConsumerApp {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApp.class,args);
    }
}

④运行主启动类

2 如何确保消息的可靠性

首先确定消息可能在哪些位置丢失---不同的位置可以有不同的解决方案。

 2.1 保证消息从生产者到交换机

confirm确认机制

1. 手动开启确认机制(producer服务的配置文件)

        spring.rabbitmq.publisher-confirm-type=correlated
 2. 为rabbitTemplate设置确认回调函数

package com.wzh;

import com.wzh.producer.ProducerApp;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

/**
 * @ProjectName: springboot-rabbitmq
 * @Package: com.wzh
 * @ClassName: TestRabbitMQ
 * @Author: 王振华
 * @Description:
 * @Date: 2022/9/20 19:56
 * @Version: 1.0
 */
@SpringBootTest(classes=ProducerApp.class)
public class TestRabbitMQ {
    //springboot集成了rabbitMQ提供了一个工具类,该类封装了消息的发送
    @Resource
    private RabbitTemplate rabbitTemplate;

    //测试确认机制
    @Test
    public void testConfirm(){
        //为rabbitTemplate设置确认回调函数
        //ConfirmCallback函数式接口   匿名内部类
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            //不管是否到达交换机  都会触发该方法
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                if(b==false){
                    //根据自己的业务完成相应的代码
                    System.out.println("消息发送失败---订单回滚");
                }
            }
        });
        //这里用的是路由模式
        rabbitTemplate.convertAndSend("direct_exchange","info","hello springboot");
    }
}

查看消费者

package com.wzh.consumer.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @ProjectName: springboot-rabbitmq
 * @Package: com.wzh.consumer.rabbitmq
 * @ClassName: Consumer
 * @Author: 王振华
 * @Description:
 * @Date: 2022/9/20 15:27
 * @Version: 1.0
 */
@Component
public class Consumer {
    @RabbitListener(queues = "router_queue01")
    public void queue01(String s){
        System.out.println(s);
    }

    @RabbitListener(queues = "router_queue02")
    public void queue02(String s){
        System.out.println(s);
    }
}

消费者的参数必须与生产者的参数类型一致

2.2 保证消息可以从交换机到队列

returning机制: 如果消息无法到达队列,则会触发returning机制。如果能从交换机到队列则不会触发returning机制。

默认rabbitMQ不开启该机制。

①开启returning机制(生产者配置文件)

         spring.rabbitmq.publisher-returns=true

②为rabbitTemplate设置returning回调函数

/**
     * 1.开启returning机制 spring.rabbitmq.publisher-returns=true
     * 2.为rabbitTemplate设置returning回调函数
     */
    @Test
    public void testReturning(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            //该方法只有从交换机到队列失败时才会触发
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                //根据自己的业务完成相应的代码
                System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~");
            }
        });
        //这里用的是路由模式
        rabbitTemplate.convertAndSend("direct_exchange","error","hello springboot2");
    }

 ③消费者监听结果

2.3 如何保证消息在队列

  1. 队列持久化---》创建的时候设置持久化

  2. 搭建rabbitmq集群--保证高可用

2.4 保证消费者可靠的消费消息

①修改为手动确认模式

改为手动确认(消费者服务配置文件)

        spring.rabbitmq.listener.simple.acknowledge-mode=manual

②当业务处理完毕后在确认消息给队列让其删除该消息

package com.wzh.consumer.rabbitmq;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

/**
 * @ProjectName: springboot-rabbitmq
 * @Package: com.wzh.consumer.rabbitmq
 * @ClassName: Consumer
 * @Author: 王振华
 * @Description:
 * @Date: 2022/9/20 15:27
 * @Version: 1.0
 */
@Component
public class Consumer {
    @RabbitListener(queues = "router_queue01")
    public void queue01(String s){
        System.out.println(s);
    }

    @RabbitListener(queues = "router_queue02")
    //Message message 封装了信息类, Channel channel 信道
    public void queue02(Message message, Channel channel){
        byte[] body = message.getBody();
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("接受到消息"+new String(body));
        try {

            System.out.println("处理业务代码");
            //这里会因为硬件问题 或其他业务代码问题  导致处理不完  出现异常了 走catch
            System.out.println("业务处理完毕");
            //设置手动确认---队列会把该信息移除。
            //long deliveryTag,消息的标记
            // boolean multiple: 是否把之前没有确认的消息以前确认掉。
            channel.basicAck(deliveryTag,true);
        }catch (Exception e){
            //1.让队列在给我发一次。
            //long deliveryTag, boolean multiple,
            // boolean requeue: 继续发给我 直接扔掉
            try {
                channel.basicNack(deliveryTag,true,true);
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }

        }


    }
}

如何保证消息的可靠性。

  1. 设置confirm和returning机制

  2. 设置队列和交互机的持久化

  3. 搭建rabbitMQ服务集群

  4. 消费者改为手动确认机制。

3 如何限制消费者消费消息的条数(消费端限流)

如果队列里有10w条数据,消费者不管能不能消化,会直接把10w条消息都接受,会导致消费者服务垮掉

  1. 设置消费者消费消息的条数

  2. 消费者端必须为手动确认模式。

① 修改每次拉取消息的条数

 ②测试

 

package com.wzh.consumer.rabbitmq;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

/**
 * @ProjectName: springboot-rabbitmq
 * @Package: com.wzh.consumer.rabbitmq
 * @ClassName: Consumer
 * @Author: 王振华
 * @Description:
 * @Date: 2022/9/20 15:27
 * @Version: 1.0
 */
@Component
public class Consumer {
    @RabbitListener(queues = "test03")
    public void queue01(Message message, Channel channel) throws IOException {
        byte[] body = message.getBody();
        System.out.println("消息的内容:"+new String(body));
        //手动确认
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }

    @RabbitListener(queues = "router_queue02")
    //Message message 封装了信息类, Channel channel 信道
    public void queue02(Message message, Channel channel){
        byte[] body = message.getBody();
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("接受到消息"+new String(body));
        try {

            System.out.println("处理业务代码");
            //这里会因为硬件问题 或其他业务代码问题  导致处理不完  出现异常了 走catch
            System.out.println("业务处理完毕");
            //设置手动确认---队列会把该信息移除。
            //long deliveryTag,消息的标记
            // boolean multiple: 是否把之前没有确认的消息以前确认掉。
            channel.basicAck(deliveryTag,true);
        }catch (Exception e){
            //1.让队列在给我发一次。
            //long deliveryTag, boolean multiple,
            // boolean requeue: 继续发给我 直接扔掉
            try {
                channel.basicNack(deliveryTag,true,true);
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }

        }


    }
}

4 设置队列过期时间

TTL:time to live

可以为整个队列设置也可以单独为某条信息设置

4.1 创建队列时为队列设置过期时间(每条信息都是相同时间)

创建队列 

 

创建交换机

 点击test01进行 交换机与队列绑定

 测试

//测试过期时间
    @Test
    public void test01(){
        for(int i=0;i<10;i++) {
            if(i<5){
                //前五个  同时创建  后五个 每个间隔两秒 创建   过期时间也不一样 
                rabbitTemplate.convertAndSend("test01", "aaa", "hello springboot"+i);
            }else {
                try{
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                rabbitTemplate.convertAndSend("test01", "aaa", "hello springboot" + i);
            }
        }
    }

4.2 单独为消息设置过期时间(为某一个设置过期时间)

当前消息队列为普通队列,没有在创建队列的时候设置过期时间

 @Test
    public void test02() {
        MessagePostProcessor messagePostProcessor=new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("10000");
                return message;
            }
        };
        rabbitTemplate.convertAndSend("test01", "aaa", "Hello Springboot",messagePostProcessor);
    }

 5 死信队列

 

创建普通队列

 创建死信队列

 

 创建普通交换机

创建死信交换机 

 队列绑定交换机

 

 死信交换机与死信队列绑定

 测试

    @Test
    public void test02() {
        for (int i = 0; i < 10; i++)
            rabbitTemplate.convertAndSend("pt_exchange", "dead", "Hello Springboot-------"+i);
        }
    }

6 延迟队列

 这里的判断订单状态 是因为 如果支付系统第29分分钟去支付,支付的比较慢,最后在第31分钟支付成功了。消息30分钟加入死信队列执行库存回滚,就会出错。

 7 如何防止消息被重复消费

 

8 rabbitMQ的常见面试题

1. 如何防止消息被重复消费

2.如何保证消息的可靠性

3.rabbitMQ消息积压过多

 

本文含有隐藏内容,请 开通VIP 后查看

网站公告


今日签到

点亮在社区的每一天
去签到