引言:
在业务系统中,消息中间件常用于实现流量削峰、异步通信等场景中发挥重要作用,通过消息监听机制可实现数据的异步获取与处理;而实际业务中常需根据特定场景(如系统维护、下游服务异常等)动态暂停或启动消息监听,对此即需要通过动态控制消费的启动和停止。以下操作方可实现该功能
一、引入依赖
引入springboot web以及rabbitmq的依赖
<!--springboot web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--springboot web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
二、配置application.yaml 配置文件
spring:
rabbitmq:
addresses: 1127.0.0.1:5672
username: admin
password: konne20211220@.+
virtual-host: /
三、创建消费服务
@Configuration
public class Consumer {
@Autowired
private RabbitTemplate rabbitTemplate;
//服务中运行的数量
public int serverRunSite;
Logger logger = LoggerFactory.getLogger(Consumer.class);
/**
* 消费rabbitmq 中 testConsumer 的数据,并设置手动确认消息
* testListener 是rabbitmq中的唯一id,不能重复
*/
@RabbitListener(id = "testListener", queues = "testConsumer", ackMode = "MANUAL")
public void receiveMessage(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long tag, Channel channel) {
logger.info("收到方案新增信息:{}", msg);
try {
channel.basicAck(tag,false);
} catch (Exception e) {
logger.error("消息确认失败,msg:{}",msg);
}
int limitSize = 10;
//服务限制的数量和服务当前运行的数量一致进行停止当前服务监听
if(limitSize == serverRunSite){
rabbitTemplate.convertAndSend("datasource_add", msg);
logger.warn("当前服务消费数量已达上限,数据扔回队列");
}else{
//处理下游业务
}
}
}
四、实现rabbitmq的消费者启动和停止
@RestController
public class UpdateStatusController {
//获取到rabbitmq的注册器
@Autowired
private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
/**
* 更改状态
* @param status 0 开启,1关闭
*/
@GetMapping("updateStatus")
private void updateStatus(int status) {
if(status == 0){
this.startConsumption();
}else{
this.stopConsumption();
}
}
/**
* 通过rabbitListenerEndpointRegistry 根据唯一id获取container
* 关闭监听
*/
public void stopConsumption() {
AbstractMessageListenerContainer container = (AbstractMessageListenerContainer) rabbitListenerEndpointRegistry.getListenerContainer("testListener");
container.stop();
}
/**
* 开启监听
*/
public void startConsumption() {
AbstractMessageListenerContainer container = (AbstractMessageListenerContainer) rabbitListenerEndpointRegistry.getListenerContainer("testListener");
container.start();
}
}
按照以上步骤,方可实现通过接口控制rabbitmq中的消费者 启动 与 停止