RabbitMQ ③-Spring使用RabbitMQ

发布于:2025-05-13 ⋅ 阅读:(16) ⋅ 点赞:(0)

在这里插入图片描述

Spring使用RabbitMQ

创建 Spring 项目后,引入依赖:

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<!-- Spring Boot AMQP starter dependency. No <version> element is given here;
     presumably it is supplied by Spring Boot dependency management - confirm
     against the project's parent POM. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件 application.yml

spring:
  application:
    name: spring-rabbitmq-demo
  rabbitmq:
    # Alternative form with one property per setting, kept commented out for
    # reference; the single `addresses` URI below carries the same values.
#    host: 47.94.9.33
#    port: 5672
#    username: admin
#    password: admin
#    virtual-host: /
    # NOTE(review): user name and password are embedded in this URI; acceptable
    # for a demo, but they should be externalized for a real deployment.
    addresses: amqp://admin:admin@47.94.9.33:5672/

Work-Queue(工作队列模式)

声明队列

package com.ljh.mq.springrabbitmqdemo.config;

import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that declares the queue used by the work-queue example.
 */
@Configuration
public class RabbitMQConfig {

    /**
     * Work-queue mode: declares a durable queue named by {@link Constants#WORK_QUEUE}.
     *
     * @return the queue definition registered as a bean
     */
    @Bean
    public Queue workQueue() {
        QueueBuilder builder = QueueBuilder.durable(Constants.WORK_QUEUE);
        return builder.build();
    }
}

生产者

package com.ljh.mq.springrabbitmqdemo.controller;

import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * HTTP endpoint that publishes demo messages for the work-queue example.
 */
@RequestMapping("/producer")
@RestController
public class ProducerController {
    private static final Logger log = LoggerFactory.getLogger(ProducerController.class);

    /** Number of messages published by one call to {@link #work()}. */
    private static final int MESSAGE_COUNT = 10;

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Publishes ten numbered messages, using an empty exchange name and
     * {@link Constants#WORK_QUEUE} as the routing key.
     *
     * @return a fixed confirmation string
     */
    @RequestMapping("/work")
    public String work() {
        int index = 0;
        while (index < MESSAGE_COUNT) {
            // With the default exchange (empty name), the routing key is kept
            // identical to the queue name.
            rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, "hello work queue mode~ " + index);
            index++;
        }
        final String result = "消息发送成功";
        log.info(result);
        return result;
    }
}

消费者

package com.ljh.mq.springrabbitmqdemo.listener;

import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Registers two listener methods on the same queue, {@link Constants#WORK_QUEUE}.
 */
@Component
public class WorkListener {

    private static final Logger log = LoggerFactory.getLogger(WorkListener.class);

    /**
     * Logs a delivery received as a raw {@link Message}.
     *
     * @param message the received AMQP message
     * @param channel the channel argument supplied with the delivery; not used here
     */
    @RabbitListener(queues = Constants.WORK_QUEUE)
    public void process1(Message message, Channel channel) {
        final String queueName = Constants.WORK_QUEUE;
        log.info("[process1]:成功接收到消息:[{}]:{}", queueName, message);
    }

    /**
     * Logs a delivery received as a {@code String} payload.
     *
     * @param message the message payload
     */
    @RabbitListener(queues = Constants.WORK_QUEUE)
    public void process2(String message) {
        final String queueName = Constants.WORK_QUEUE;
        log.info("[process2]:成功接收到消息:[{}]:{}", queueName, message);
    }
}

Publish/Subscribe(发布/订阅模式)

声明队列和交换机

package com.ljh.mq.springrabbitmqdemo.config;

import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration for the publish/subscribe example: one fanout exchange,
 * two durable queues, and one binding per queue.
 */
@Configuration
public class RabbitMQConfig {

    /**
     * Publish/subscribe mode: declares the durable fanout exchange.
     *
     * @return the fanout exchange named by {@link Constants#FANOUT_EXCHANGE}
     */
    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange() {
        ExchangeBuilder builder = ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true);
        return builder.build();
    }

    /** @return the durable queue named by {@link Constants#FANOUT_QUEUE1} */
    @Bean("fanoutQueue1")
    public Queue fanoutQueue1() {
        return durableQueue(Constants.FANOUT_QUEUE1);
    }

    /** @return the durable queue named by {@link Constants#FANOUT_QUEUE2} */
    @Bean("fanoutQueue2")
    public Queue fanoutQueue2() {
        return durableQueue(Constants.FANOUT_QUEUE2);
    }

    /**
     * Binds the first fanout queue to the fanout exchange.
     *
     * @param exchange the bean qualified as {@code fanoutExchange}
     * @param queue    the bean qualified as {@code fanoutQueue1}
     * @return the binding between the two
     */
    @Bean("bindingFanout1")
    public Binding bindingFanout1(@Qualifier("fanoutExchange") FanoutExchange exchange, @Qualifier("fanoutQueue1") Queue queue) {
        return bindToFanout(queue, exchange);
    }

    /**
     * Binds the second fanout queue to the fanout exchange.
     *
     * @param exchange the bean qualified as {@code fanoutExchange}
     * @param queue    the bean qualified as {@code fanoutQueue2}
     * @return the binding between the two
     */
    @Bean("bindingFanout2")
    public Binding bindingFanout2(@Qualifier("fanoutExchange") FanoutExchange exchange, @Qualifier("fanoutQueue2") Queue queue) {
        return bindToFanout(queue, exchange);
    }

    /** Builds a durable queue with the given name. */
    private static Queue durableQueue(String name) {
        return QueueBuilder.durable(name).build();
    }

    /** Builds a binding from the queue to the fanout exchange (no routing key). */
    private static Binding bindToFanout(Queue queue, FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }
}

生产者

package com.ljh.mq.springrabbitmqdemo.controller;

import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * HTTP endpoint that publishes demo messages for the publish/subscribe example.
 */
@RequestMapping("/producer")
@RestController
public class ProducerController {
    private static final Logger log = LoggerFactory.getLogger(ProducerController.class);

    /** Number of messages published by one call to {@link #fanout()}. */
    private static final int MESSAGE_COUNT = 10;

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Publishes ten numbered messages to {@link Constants#FANOUT_EXCHANGE}
     * with an empty routing key.
     *
     * @return a fixed confirmation string
     */
    @RequestMapping("/fanout")
    public String fanout() {
        int index = 0;
        while (index < MESSAGE_COUNT) {
            // The message goes to the named fanout exchange; the routing key is
            // left empty (a fanout exchange is expected to ignore it).
            rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE, "", "hello publish fanout mode~ " + index);
            index++;
        }
        final String result = "消息发送成功";
        log.info(result);
        return result;
    }
}

消费者

package com.ljh.mq.springrabbitmqdemo.listener;

import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listens on the two queues declared for the publish/subscribe example and
 * logs every payload received.
 */
@Component
public class FanoutListener {

    private static final Logger log = LoggerFactory.getLogger(FanoutListener.class);

    /**
     * Logs a payload received from {@link Constants#FANOUT_QUEUE1}.
     *
     * @param message the message payload
     */
    @RabbitListener(queues = Constants.FANOUT_QUEUE1)
    public void process1(String message) {
        final String queueName = Constants.FANOUT_QUEUE1;
        log.info("[process1]:成功接收到消息:[{}]:{}", queueName, message);
    }

    /**
     * Logs a payload received from {@link Constants#FANOUT_QUEUE2}.
     *
     * @param message the message payload
     */
    @RabbitListener(queues = Constants.FANOUT_QUEUE2)
    public void process2(String message) {
        final String queueName = Constants.FANOUT_QUEUE2;
        log.info("[process2]:成功接收到消息:[{}]:{}", queueName, message);
    }
}

Routing(路由模式)

声明队列和交换机

package com.ljh.mq.springrabbitmqdemo.config;

import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration for the routing example: one direct exchange, two
 * durable queues, and three routing-key bindings.
 */
@Configuration
public class RabbitMQConfig {

    /**
     * Routing mode: declares the durable direct exchange.
     *
     * @return the direct exchange named by {@link Constants#DIRECT_EXCHANGE}
     */
    @Bean("directExchange")
    public DirectExchange directExchange() {
        ExchangeBuilder builder = ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true);
        return builder.build();
    }

    /** @return the durable queue named by {@link Constants#DIRECT_QUEUE1} */
    @Bean("directQueue1")
    public Queue directQueue1() {
        return durableQueue(Constants.DIRECT_QUEUE1);
    }

    /** @return the durable queue named by {@link Constants#DIRECT_QUEUE2} */
    @Bean("directQueue2")
    public Queue directQueue2() {
        return durableQueue(Constants.DIRECT_QUEUE2);
    }

    /**
     * Binds the first direct queue with routing key {@code orange}.
     *
     * @param exchange the bean qualified as {@code directExchange}
     * @param queue    the bean qualified as {@code directQueue1}
     * @return the binding
     */
    @Bean("bindingDirect1")
    public Binding bindingDirect1(@Qualifier("directExchange") DirectExchange exchange, @Qualifier("directQueue1") Queue queue) {
        return route(queue, exchange, "orange");
    }

    /**
     * Binds the second direct queue with routing key {@code orange}.
     *
     * @param exchange the bean qualified as {@code directExchange}
     * @param queue    the bean qualified as {@code directQueue2}
     * @return the binding
     */
    @Bean("bindingDirect2")
    public Binding bindingDirect2(@Qualifier("directExchange") DirectExchange exchange, @Qualifier("directQueue2") Queue queue) {
        return route(queue, exchange, "orange");
    }

    /**
     * Binds the second direct queue with routing key {@code black}.
     *
     * @param exchange the bean qualified as {@code directExchange}
     * @param queue    the bean qualified as {@code directQueue2}
     * @return the binding
     */
    @Bean("bindingDirect3")
    public Binding bindingDirect3(@Qualifier("directExchange") DirectExchange exchange, @Qualifier("directQueue2") Queue queue) {
        return route(queue, exchange, "black");
    }

    /** Builds a durable queue with the given name. */
    private static Queue durableQueue(String name) {
        return QueueBuilder.durable(name).build();
    }

    /** Builds a binding from the queue to the direct exchange under the given routing key. */
    private static Binding route(Queue queue, DirectExchange exchange, String routingKey) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }
}

生产者

package com.ljh.mq.springrabbitmqdemo.controller;

import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * HTTP endpoint that publishes demo messages for the routing example.
 */
@RequestMapping("/producer")
@RestController
public class ProducerController {
    private static final Logger log = LoggerFactory.getLogger(ProducerController.class);

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Publishes one message to {@link Constants#DIRECT_EXCHANGE} using the
     * routing key taken from the request path.
     *
     * @param routingKey the routing key from the {@code {routingKey}} path segment
     * @return a confirmation string that ends with the routing key
     */
    @RequestMapping("/direct/{routingKey}")
    public String direct(@PathVariable("routingKey") String routingKey) {
        final String payload = "hello routing mode~;routingKey is " + routingKey;
        rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, routingKey, payload);

        log.info("消息发送成功:{}", routingKey);
        final String result = "消息发送成功:" + routingKey;
        return result;
    }
}

消费者

package com.ljh.mq.springrabbitmqdemo.listener;

import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listens on the two queues declared for the routing example and logs every
 * payload received.
 */
@Component
public class DirectListener {
    private static final Logger log = LoggerFactory.getLogger(DirectListener.class);

    /**
     * Logs a payload received from {@link Constants#DIRECT_QUEUE1}.
     *
     * @param message the message payload
     */
    @RabbitListener(queues = Constants.DIRECT_QUEUE1)
    public void process1(String message) {
        final String queueName = Constants.DIRECT_QUEUE1;
        log.info("队列[{}]成功接收到消息:{}", queueName, message);
    }

    /**
     * Logs a payload received from {@link Constants#DIRECT_QUEUE2}.
     *
     * @param message the message payload
     */
    @RabbitListener(queues = Constants.DIRECT_QUEUE2)
    public void process2(String message) {
        final String queueName = Constants.DIRECT_QUEUE2;
        log.info("队列[{}]成功接收到消息:{}", queueName, message);
    }
}

Topics(通配符模式)

声明队列和交换机

package com.ljh.mq.springrabbitmqdemo.config;

import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration for the topics example: one topic exchange, two durable
 * queues, and three pattern bindings.
 */
@Configuration
public class RabbitMQConfig {

    /**
     * Topics mode: declares the durable topic exchange.
     *
     * @return the topic exchange named by {@link Constants#TOPIC_EXCHANGE}
     */
    @Bean("topicExchange")
    public TopicExchange topicExchange() {
        ExchangeBuilder builder = ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true);
        return builder.build();
    }

    /** @return the durable queue named by {@link Constants#TOPIC_QUEUE1} */
    @Bean("topicQueue1")
    public Queue topicQueue1() {
        return durableQueue(Constants.TOPIC_QUEUE1);
    }

    /** @return the durable queue named by {@link Constants#TOPIC_QUEUE2} */
    @Bean("topicQueue2")
    public Queue topicQueue2() {
        return durableQueue(Constants.TOPIC_QUEUE2);
    }

    /**
     * Binds the first topic queue with the pattern {@code *.orange.*}.
     *
     * @param exchange the bean qualified as {@code topicExchange}
     * @param queue    the bean qualified as {@code topicQueue1}
     * @return the binding
     */
    @Bean("bindingTopic1")
    public Binding bindingTopic1(@Qualifier("topicExchange") TopicExchange exchange, @Qualifier("topicQueue1") Queue queue) {
        return route(queue, exchange, "*.orange.*");
    }

    /**
     * Binds the second topic queue with the pattern {@code *.*.rabbit}.
     *
     * @param exchange the bean qualified as {@code topicExchange}
     * @param queue    the bean qualified as {@code topicQueue2}
     * @return the binding
     */
    @Bean("bindingTopic2")
    public Binding bindingTopic2(@Qualifier("topicExchange") TopicExchange exchange, @Qualifier("topicQueue2") Queue queue) {
        return route(queue, exchange, "*.*.rabbit");
    }

    /**
     * Binds the second topic queue with the pattern {@code lazy.#}.
     *
     * @param exchange the bean qualified as {@code topicExchange}
     * @param queue    the bean qualified as {@code topicQueue2}
     * @return the binding
     */
    @Bean("bindingTopic3")
    public Binding bindingTopic3(@Qualifier("topicExchange") TopicExchange exchange, @Qualifier("topicQueue2") Queue queue) {
        return route(queue, exchange, "lazy.#");
    }

    /** Builds a durable queue with the given name. */
    private static Queue durableQueue(String name) {
        return QueueBuilder.durable(name).build();
    }

    /** Builds a binding from the queue to the topic exchange under the given pattern. */
    private static Binding route(Queue queue, TopicExchange exchange, String pattern) {
        return BindingBuilder.bind(queue).to(exchange).with(pattern);
    }
}

生产者

package com.ljh.mq.springrabbitmqdemo.controller;

import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * HTTP endpoint that publishes demo messages for the topics example.
 */
@RequestMapping("/producer")
@RestController
public class ProducerController {
    private static final Logger log = LoggerFactory.getLogger(ProducerController.class);

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Publishes one message to {@link Constants#TOPIC_EXCHANGE} using the
     * routing key taken from the request path.
     *
     * @param routingKey the routing key from the {@code {routingKey}} path segment
     * @return a confirmation string that ends with the routing key
     */
    @RequestMapping("/topic/{routingKey}")
    public String topic(@PathVariable("routingKey") String routingKey) {
        final String payload = "hello topic mode~;routingKey is " + routingKey;
        rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE, routingKey, payload);

        log.info("消息发送成功:{}", routingKey);
        final String result = "消息发送成功:" + routingKey;
        return result;
    }
}

消费者

package com.ljh.mq.springrabbitmqdemo.listener;

import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listens on the two queues declared for the topics example and logs every
 * payload received.
 */
@Component
public class TopicListener {
    private static final Logger log = LoggerFactory.getLogger(TopicListener.class);

    /**
     * Logs a payload received from {@link Constants#TOPIC_QUEUE1}.
     *
     * @param message the message payload
     */
    @RabbitListener(queues = Constants.TOPIC_QUEUE1)
    public void process1(String message) {
        final String queueName = Constants.TOPIC_QUEUE1;
        log.info("队列[{}]成功接收到消息:{}", queueName, message);
    }

    /**
     * Logs a payload received from {@link Constants#TOPIC_QUEUE2}.
     *
     * @param message the message payload
     */
    @RabbitListener(queues = Constants.TOPIC_QUEUE2)
    public void process2(String message) {
        final String queueName = Constants.TOPIC_QUEUE2;
        log.info("队列[{}]成功接收到消息:{}", queueName, message);
    }
}