使用reactor-rabbitmq库监听Rabbitmq

发布于:2025-07-06 ⋅ 阅读:(18) ⋅ 点赞:(0)

Reactor RabbitMQ 简介

Reactor RabbitMQ 是 Reactor 项目的一部分,旨在提供基于 Project Reactor 的 RabbitMQ 响应式编程支持。它将 RabbitMQ 的消息队列功能与 Reactor 的非阻塞、背压友好特性结合,适用于高吞吐量的异步消息处理场景。
注意: Reactor RabbitMQ 是对原生amqp-client 的封装,同样性能强大简单易用。reactor-rabbitmq是spring-boot-starter-amqp 之外的另外一种选择。

维度 AMQP-Client Reactor RabbitMQ Spring Boot Starter AMQP
编程模型 命令式、手动管理 响应式、非阻塞 声明式、自动配置
框架依赖 Reactor Spring Boot
适用场景 轻量级/非 Spring 项目 响应式微服务 Spring Boot 企业应用
资源管理 手动 自动 自动
功能丰富度 基础协议操作 背压、高并发优化 事务、确认、死信队列等
学习曲线 中等(需理解 AMQP) 高(需掌握 Reactor) 低(Spring 生态友好)

Reactor RabbitMQ核心特性

  • 响应式流支持:基于 Reactor 的 FluxMono 实现消息的发布与订阅。
  • 背压管理:自动处理消费者与生产者之间的速率匹配。
  • 非阻塞 API:避免传统 RabbitMQ 客户端的线程阻塞问题。
  • 声明式配置:支持通过代码或配置文件定义队列、交换机和绑定。

使用方法

添加依赖

在 Maven 项目中添加以下依赖:

<dependency>  
    <groupId>io.projectreactor.rabbitmq</groupId>  
    <artifactId>reactor-rabbitmq</artifactId>  
    <version>1.5.6</version>  
</dependency>  
创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();  
connectionFactory.setHost("localhost");  
Sender sender = RabbitFlux.createSender(  
    Mono.fromCallable(() -> connectionFactory.newConnection())  
);  
Receiver receiver = RabbitFlux.createReceiver(  
    Mono.fromCallable(() -> connectionFactory.newConnection())  
);  
发送消息
sender.send(  
    Flux.just(new OutboundMessage(  
        "exchange-name",  
        "routing-key",  
        "Hello RabbitMQ".getBytes()  
    ))  
).subscribe();  
接收消息
receiver.consumeAutoAck("queue-name")  
    .map(delivery -> new String(delivery.getBody()))  
    .subscribe(System.out::println);  

高级配置

消息确认模式

支持自动确认(autoAck)和手动确认(manualAck):

receiver.consumeManualAck("queue-name")  
    .delayUntil(delivery ->  
        delivery.ack()  
            .thenReturn(delivery.getBody())  
    )  
    .subscribe();  
错误处理

通过 Reactor 的 onError 机制处理异常:

sender.send(messages)  
    .doOnError(e -> System.err.println("Send failed: " + e))  
    .retry(3)  
    .subscribe();  

集群监听(自动ACK)
 // 1. 配置集群连接
        ReceiverOptions receiverOptions = new ReceiverOptions()
                .connectionFactory(new ConnectionFactory() {{
                    setUsername("guest");
                    setPassword("guest");
                }})
                .connectionSupplier(cf -> cf.newConnection(
                        new Address[]{
                                new Address("localhost", 5672),
                                new Address("localhost", 5673),
                                new Address("localhost", 5674)
                        },
                        "reactive-cluster"
                ));

        // 2. 创建 Receiver
        Receiver receiver = RabbitFlux.createReceiver(receiverOptions);


        // 监听队列(自动负载均衡)
        receiver.consumeAutoAck("queue1") // 队列名(需在集群中预先创建)
                .subscribe(
                        delivery -> {
                            String message = new String(delivery.getBody());
                            System.out.println("收到消息: " + message);
                        },
                        error -> System.err.println("监听错误: " + error)
                );
 // 保持程序运行
        Mono.never().block();
集群监听手动ACK
// 1. 配置集群连接
        ReceiverOptions receiverOptions = new ReceiverOptions()
                .connectionFactory(new ConnectionFactory() {{
                    setUsername("guest");
                    setPassword("guest");
                }})
                .connectionSupplier(cf -> cf.newConnection(
                        new Address[]{
                                new Address("localhost", 5672),
                                new Address("localhost", 5673),
                                new Address("localhost", 5674)
                        },
                        "reactive-cluster"
                ));

        // 2. 创建 Receiver
        Receiver receiver = RabbitFlux.createReceiver(receiverOptions);

        // 消费消息并手动ACK
        receiver.consumeManualAck("queue1")
                .flatMap(delivery -> {
                    try {
                        String message = new String(delivery.getBody());
                        log.info("received message:" + message);
                        // 业务逻辑处理...
                        boolean success = false;
                        int i = RandomUtil.randomInt();
                        if (i % 2 == 0) {
                            success = true;
                        }

                        if (success) {
                            log.info("ack success");
                            // 处理成功,手动ACK
                            return Mono.fromRunnable(() -> delivery.ack())
                                    .thenReturn("ACK");
                        } else {
                            log.info("ack fail");
                            // 处理失败,手动NACK(可选择重试或丢弃)
                            return Mono.fromRunnable(() -> delivery.nack(true)) // false表示不重新入队
                                    .thenReturn("NACK");
                        }
                    } catch (Exception e) {
                        // 异常情况,NACK并可选择重试
                        delivery.nack(true); // true表示重新入队
                        return Mono.error(e);
                    }
                })
                .subscribe(
                        result -> log.info("Message processed:" + result),
                        error -> log.info("Error:" + error)
                );

        // 保持程序运行
        Mono.never().block();

性能优化建议

  • 连接复用:避免频繁创建/关闭连接,使用 Mono 缓存连接。
  • 批量发送:通过 Flux.buffer() 合并多条消息后一次性发送。
  • 线程池调优:自定义 Scheduler 以匹配业务场景的并发需求。

适用场景

  • 微服务间的异步通信。
  • 事件驱动的数据处理流水线。
  • 需要高吞吐量和低延迟的消息系统。

如需进一步功能(如事务、RPC 模式),可参考官方文档或源码示例。