Spring使用RabbitMQ
创建 Spring 项目后,引入依赖:
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件 application.yml:
spring:
  application:
    name: spring-rabbitmq-demo
  rabbitmq:
    # Discrete connection settings, kept for reference only; the `addresses`
    # URI below encodes the same host, port and credentials in a single value.
    # host: 47.94.9.33
    # port: 5672
    # username: admin
    # password: admin
    # virtual-host: /
    addresses: amqp://admin:admin@47.94.9.33:5672/
Work-Queue(工作队列模式)
声明队列
package com.ljh.mq.springrabbitmqdemo.config;
import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RabbitMQ declarations for the work-queue demo.
 */
@Configuration
public class RabbitMQConfig {

    /**
     * Declares the durable queue named by {@link Constants#WORK_QUEUE}.
     *
     * @return the queue definition produced by {@link QueueBuilder}
     */
    @Bean
    public Queue workQueue() {
        QueueBuilder builder = QueueBuilder.durable(Constants.WORK_QUEUE);
        return builder.build();
    }
}
生产者
package com.ljh.mq.springrabbitmqdemo.controller;
import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * HTTP entry points under {@code /producer} that publish demo messages
 * through the injected {@link RabbitTemplate}.
 */
@RequestMapping("/producer")
@RestController
public class ProducerController {
    private static final Logger log = LoggerFactory.getLogger(ProducerController.class);

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Publishes ten numbered messages addressed to the work queue.
     *
     * @return the fixed confirmation text, which is also written to the log
     */
    @RequestMapping("/work")
    public String work() {
        final String confirmation = "消息发送成功";
        int index = 0;
        while (index < 10) {
            // The empty exchange name selects the default exchange, so the
            // routing key has to be the queue name itself.
            rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, "hello work queue mode~ " + index);
            index++;
        }
        log.info(confirmation);
        return confirmation;
    }
}
消费者
package com.ljh.mq.springrabbitmqdemo.listener;
import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumers for the work-queue demo. Both listener methods are attached to
 * the same queue, {@link Constants#WORK_QUEUE}.
 */
@Component
public class WorkListener {
    private static final Logger log = LoggerFactory.getLogger(WorkListener.class);

    /**
     * Logs a delivery received as the raw {@link Message}.
     *
     * @param message the delivered message
     * @param channel the channel handed in by the listener container; not used here
     */
    @RabbitListener(queues = Constants.WORK_QUEUE)
    public void process1(Message message, Channel channel) {
        final String source = Constants.WORK_QUEUE;
        log.info("[process1]:成功接收到消息:[{}]:{}", source, message);
    }

    /**
     * Logs a delivery received as a {@code String} payload.
     *
     * @param message the message payload
     */
    @RabbitListener(queues = Constants.WORK_QUEUE)
    public void process2(String message) {
        final String source = Constants.WORK_QUEUE;
        log.info("[process2]:成功接收到消息:[{}]:{}", source, message);
    }
}
Publish/Subscribe(发布/订阅模式)
声明队列和交换机
package com.ljh.mq.springrabbitmqdemo.config;
import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
// * 发布订阅模式
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange() {
return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE)
.durable(true)
.build();
}
@Bean("fanoutQueue1")
public Queue fanoutQueue1 () {
return QueueBuilder.durable(Constants.FANOUT_QUEUE1)
.build();
}
@Bean("fanoutQueue2")
public Queue fanoutQueue2 () {
return QueueBuilder.durable(Constants.FANOUT_QUEUE2)
.build();
}
@Bean("bindingFanout1")
public Binding bindingFanout1(@Qualifier("fanoutExchange") FanoutExchange exchange, @Qualifier("fanoutQueue1") Queue queue) {
return BindingBuilder.bind(queue)
.to(exchange);
}
@Bean("bindingFanout2")
public Binding bindingFanout2(@Qualifier("fanoutExchange") FanoutExchange exchange, @Qualifier("fanoutQueue2") Queue queue) {
return BindingBuilder.bind(queue)
.to(exchange);
}
}
生产者
package com.ljh.mq.springrabbitmqdemo.controller;
import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * HTTP entry points under {@code /producer} that publish demo messages
 * through the injected {@link RabbitTemplate}.
 */
@RequestMapping("/producer")
@RestController
public class ProducerController {
    private static final Logger log = LoggerFactory.getLogger(ProducerController.class);

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Publishes ten numbered messages to the fanout exchange.
     *
     * @return the fixed confirmation text, which is also written to the log
     */
    @RequestMapping("/fanout")
    public String fanout() {
        for (int i = 0; i < 10; i++) {
            String msg = "hello publish fanout mode~ " + i;
            // Messages go to the named fanout exchange, not the default one.
            // A fanout exchange does not route by key, so the routing key is
            // left empty; the queues bound to the exchange receive the message.
            rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE, "", msg);
        }
        log.info("消息发送成功");
        return "消息发送成功";
    }
}
消费者
package com.ljh.mq.springrabbitmqdemo.listener;
import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumers for the publish/subscribe demo, one listener per fanout queue.
 */
@Component
public class FanoutListener {
    private static final Logger log = LoggerFactory.getLogger(FanoutListener.class);

    /**
     * Logs a payload delivered from {@link Constants#FANOUT_QUEUE1}.
     *
     * @param message the message payload
     */
    @RabbitListener(queues = Constants.FANOUT_QUEUE1)
    public void process1(String message) {
        final String source = Constants.FANOUT_QUEUE1;
        log.info("[process1]:成功接收到消息:[{}]:{}", source, message);
    }

    /**
     * Logs a payload delivered from {@link Constants#FANOUT_QUEUE2}.
     *
     * @param message the message payload
     */
    @RabbitListener(queues = Constants.FANOUT_QUEUE2)
    public void process2(String message) {
        final String source = Constants.FANOUT_QUEUE2;
        log.info("[process2]:成功接收到消息:[{}]:{}", source, message);
    }
}
Routing(路由模式)
声明队列和交换机
package com.ljh.mq.springrabbitmqdemo.config;
import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RabbitMQ declarations for the routing demo: one direct exchange, two
 * queues, and three routing-key bindings.
 */
@Configuration
public class RabbitMQConfig {

    /**
     * Declares the durable direct exchange named by {@link Constants#DIRECT_EXCHANGE}.
     *
     * @return the direct exchange definition
     */
    @Bean("directExchange")
    public DirectExchange directExchange() {
        ExchangeBuilder builder = ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true);
        return builder.build();
    }

    /**
     * Declares the durable queue named by {@link Constants#DIRECT_QUEUE1}.
     *
     * @return the first direct queue definition
     */
    @Bean("directQueue1")
    public Queue directQueue1() {
        QueueBuilder builder = QueueBuilder.durable(Constants.DIRECT_QUEUE1);
        return builder.build();
    }

    /**
     * Declares the durable queue named by {@link Constants#DIRECT_QUEUE2}.
     *
     * @return the second direct queue definition
     */
    @Bean("directQueue2")
    public Queue directQueue2() {
        QueueBuilder builder = QueueBuilder.durable(Constants.DIRECT_QUEUE2);
        return builder.build();
    }

    /**
     * Binds the first direct queue to the exchange with routing key {@code orange}.
     *
     * @param exchange the {@code directExchange} bean
     * @param queue    the {@code directQueue1} bean
     * @return the binding definition
     */
    @Bean("bindingDirect1")
    public Binding bindingDirect1(@Qualifier("directExchange") DirectExchange exchange, @Qualifier("directQueue1") Queue queue) {
        return bindWithKey(queue, exchange, "orange");
    }

    /**
     * Binds the second direct queue to the exchange with routing key {@code orange}.
     *
     * @param exchange the {@code directExchange} bean
     * @param queue    the {@code directQueue2} bean
     * @return the binding definition
     */
    @Bean("bindingDirect2")
    public Binding bindingDirect2(@Qualifier("directExchange") DirectExchange exchange, @Qualifier("directQueue2") Queue queue) {
        return bindWithKey(queue, exchange, "orange");
    }

    /**
     * Binds the second direct queue to the exchange with routing key {@code black}.
     *
     * @param exchange the {@code directExchange} bean
     * @param queue    the {@code directQueue2} bean
     * @return the binding definition
     */
    @Bean("bindingDirect3")
    public Binding bindingDirect3(@Qualifier("directExchange") DirectExchange exchange, @Qualifier("directQueue2") Queue queue) {
        return bindWithKey(queue, exchange, "black");
    }

    /** Builds a binding of {@code queue} to {@code exchange} under {@code routingKey}. */
    private static Binding bindWithKey(Queue queue, DirectExchange exchange, String routingKey) {
        return BindingBuilder.bind(queue)
                .to(exchange)
                .with(routingKey);
    }
}
生产者
package com.ljh.mq.springrabbitmqdemo.controller;
import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * HTTP entry points under {@code /producer} that publish demo messages
 * through the injected {@link RabbitTemplate}.
 */
@RequestMapping("/producer")
@RestController
public class ProducerController {
    private static final Logger log = LoggerFactory.getLogger(ProducerController.class);

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Publishes one message to the direct exchange using the routing key
     * taken from the request path.
     *
     * @param routingKey the routing key supplied as the last path segment
     * @return confirmation text that echoes the routing key
     */
    @RequestMapping("/direct/{routingKey}")
    public String direct(@PathVariable("routingKey") String routingKey) {
        String payload = "hello routing mode~;routingKey is " + routingKey;
        rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, routingKey, payload);

        log.info("消息发送成功:{}", routingKey);
        String response = "消息发送成功:" + routingKey;
        return response;
    }
}
消费者
package com.ljh.mq.springrabbitmqdemo.listener;
import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumers for the routing demo, one listener per direct queue.
 */
@Component
public class DirectListener {
    private static final Logger log = LoggerFactory.getLogger(DirectListener.class);

    /**
     * Logs a payload delivered from {@link Constants#DIRECT_QUEUE1}.
     *
     * @param message the message payload
     */
    @RabbitListener(queues = Constants.DIRECT_QUEUE1)
    public void process1(String message) {
        final String source = Constants.DIRECT_QUEUE1;
        log.info("队列[{}]成功接收到消息:{}", source, message);
    }

    /**
     * Logs a payload delivered from {@link Constants#DIRECT_QUEUE2}.
     *
     * @param message the message payload
     */
    @RabbitListener(queues = Constants.DIRECT_QUEUE2)
    public void process2(String message) {
        final String source = Constants.DIRECT_QUEUE2;
        log.info("队列[{}]成功接收到消息:{}", source, message);
    }
}
Topics(通配符模式)
声明队列和交换机
package com.ljh.mq.springrabbitmqdemo.config;
import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RabbitMQ declarations for the topics demo: one topic exchange, two queues,
 * and three pattern bindings.
 */
@Configuration
public class RabbitMQConfig {

    /**
     * Declares the durable topic exchange named by {@link Constants#TOPIC_EXCHANGE}.
     *
     * @return the topic exchange definition
     */
    @Bean("topicExchange")
    public TopicExchange topicExchange() {
        ExchangeBuilder builder = ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true);
        return builder.build();
    }

    /**
     * Declares the durable queue named by {@link Constants#TOPIC_QUEUE1}.
     *
     * @return the first topic queue definition
     */
    @Bean("topicQueue1")
    public Queue topicQueue1() {
        QueueBuilder builder = QueueBuilder.durable(Constants.TOPIC_QUEUE1);
        return builder.build();
    }

    /**
     * Declares the durable queue named by {@link Constants#TOPIC_QUEUE2}.
     *
     * @return the second topic queue definition
     */
    @Bean("topicQueue2")
    public Queue topicQueue2() {
        QueueBuilder builder = QueueBuilder.durable(Constants.TOPIC_QUEUE2);
        return builder.build();
    }

    /**
     * Binds the first topic queue to the exchange with pattern {@code *.orange.*}.
     *
     * @param exchange the {@code topicExchange} bean
     * @param queue    the {@code topicQueue1} bean
     * @return the binding definition
     */
    @Bean("bindingTopic1")
    public Binding bindingTopic1(@Qualifier("topicExchange") TopicExchange exchange, @Qualifier("topicQueue1") Queue queue) {
        return bindWithPattern(queue, exchange, "*.orange.*");
    }

    /**
     * Binds the second topic queue to the exchange with pattern {@code *.*.rabbit}.
     *
     * @param exchange the {@code topicExchange} bean
     * @param queue    the {@code topicQueue2} bean
     * @return the binding definition
     */
    @Bean("bindingTopic2")
    public Binding bindingTopic2(@Qualifier("topicExchange") TopicExchange exchange, @Qualifier("topicQueue2") Queue queue) {
        return bindWithPattern(queue, exchange, "*.*.rabbit");
    }

    /**
     * Binds the second topic queue to the exchange with pattern {@code lazy.#}.
     *
     * @param exchange the {@code topicExchange} bean
     * @param queue    the {@code topicQueue2} bean
     * @return the binding definition
     */
    @Bean("bindingTopic3")
    public Binding bindingTopic3(@Qualifier("topicExchange") TopicExchange exchange, @Qualifier("topicQueue2") Queue queue) {
        return bindWithPattern(queue, exchange, "lazy.#");
    }

    /** Builds a binding of {@code queue} to {@code exchange} under the given routing pattern. */
    private static Binding bindWithPattern(Queue queue, TopicExchange exchange, String pattern) {
        return BindingBuilder.bind(queue)
                .to(exchange)
                .with(pattern);
    }
}
生产者
package com.ljh.mq.springrabbitmqdemo.controller;
import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * HTTP entry points under {@code /producer} that publish demo messages
 * through the injected {@link RabbitTemplate}.
 */
@RequestMapping("/producer")
@RestController
public class ProducerController {
    private static final Logger log = LoggerFactory.getLogger(ProducerController.class);

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Publishes one message to the topic exchange using the routing key
     * taken from the request path.
     *
     * @param routingKey the routing key supplied as the last path segment
     * @return confirmation text that echoes the routing key
     */
    @RequestMapping("/topic/{routingKey}")
    public String topic(@PathVariable("routingKey") String routingKey) {
        String payload = "hello topic mode~;routingKey is " + routingKey;
        rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE, routingKey, payload);

        log.info("消息发送成功:{}", routingKey);
        String response = "消息发送成功:" + routingKey;
        return response;
    }
}
消费者
package com.ljh.mq.springrabbitmqdemo.listener;
import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumers for the topics demo, one listener per topic queue.
 */
@Component
public class TopicListener {
    private static final Logger log = LoggerFactory.getLogger(TopicListener.class);

    /**
     * Logs a payload delivered from {@link Constants#TOPIC_QUEUE1}.
     *
     * @param message the message payload
     */
    @RabbitListener(queues = Constants.TOPIC_QUEUE1)
    public void process1(String message) {
        final String source = Constants.TOPIC_QUEUE1;
        log.info("队列[{}]成功接收到消息:{}", source, message);
    }

    /**
     * Logs a payload delivered from {@link Constants#TOPIC_QUEUE2}.
     *
     * @param message the message payload
     */
    @RabbitListener(queues = Constants.TOPIC_QUEUE2)
    public void process2(String message) {
        final String source = Constants.TOPIC_QUEUE2;
        log.info("队列[{}]成功接收到消息:{}", source, message);
    }
}