Integrating Redis with Spring Boot for Publish/Subscribe with an ACK Mechanism (Event-Driven Approach)


This post walks through the full workflow of integrating Spring Boot with Redis Streams to implement publish/subscribe with a reliable ACK mechanism.

I. Overall Architecture

A producer writes events to a Redis Stream (XADD). Consumers receive them through a consumer group (XREADGROUP) via a StreamMessageListenerContainer, process them, and acknowledge with XACK. Entries that are never acknowledged remain in the group's pending list; a scheduled job reclaims and retries them, and entries that exceed the retry limit are moved to a dead-letter stream.

II. Implementation Steps

Step 1: Add Maven Dependencies

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

Step 2: Configure the Redis Connection

# application.yml
spring:
  redis:
    host: localhost
    port: 6379
    lettuce:
      pool:
        max-active: 16
        max-idle: 8
# Custom Redis Stream settings used by this application
app:
  redis:
    stream: app-events
    group: app-group
    consumer: consumer-${random.int(1000)}   # random suffix gives each instance a distinct consumer name
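
The rest of this post reads these values with @Value. A more type-safe alternative is a @ConfigurationProperties bean; the following is only a sketch under that assumption (hypothetical class name, and it must be enabled with @EnableConfigurationProperties or @ConfigurationPropertiesScan):

// Sketch: binds the custom app.redis.* block above into a single bean
@Data // Lombok generates the getters/setters the binder needs
@ConfigurationProperties(prefix = "app.redis")
public class AppRedisProperties {
    private String stream;    // app.redis.stream
    private String group;     // app.redis.group
    private String consumer;  // app.redis.consumer
}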

Step 3: Create the Consumer Group

@Configuration
@Slf4j
public class RedisConfig {

    @Value("${app.redis.stream}")
    private String streamKey;

    @Value("${app.redis.group}")
    private String groupName;

    // A @Bean method must not return void; an ApplicationRunner runs once at startup
    @Bean
    public ApplicationRunner createConsumerGroup(StringRedisTemplate redisTemplate) {
        return args -> {
            try {
                redisTemplate.opsForStream().createGroup(streamKey, groupName);
            } catch (Exception e) {
                // XGROUP CREATE fails with BUSYGROUP if the group already exists
                log.info("Consumer group already exists: {}", groupName);
            }
        };
    }
}
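
If you prefer not to rely on catching the BUSYGROUP error, here is a sketch of an explicit existence check (same fields as above; it assumes the XINFO GROUPS reply from opsForStream().groups() can be streamed):

// Sketch: only issue XGROUP CREATE when the group is not yet registered on the stream
@Bean
public ApplicationRunner ensureConsumerGroup(StringRedisTemplate redisTemplate) {
    return args -> {
        boolean exists = Boolean.TRUE.equals(redisTemplate.hasKey(streamKey))
                && redisTemplate.opsForStream().groups(streamKey).stream()
                        .anyMatch(g -> groupName.equals(g.groupName()));
        if (!exists) {
            // ReadOffset.latest() starts the new group at the current end of the stream
            redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.latest(), groupName);
        }
    };
}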

Step 4: Configure the Message Listener Container (continuing the same RedisConfig class)

@Configuration
public class RedisConfig {

    // Thread pool used by the listener container to poll the stream
    @Bean(name = "redisStreamTaskExecutor")
    public ThreadPoolTaskExecutor redisStreamTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setThreadNamePrefix("redis-stream-");
        return executor;
    }

    // Message listener container; start/stop are tied to the bean lifecycle
    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamContainer(
            RedisConnectionFactory factory,
            @Qualifier("redisStreamTaskExecutor") ThreadPoolTaskExecutor executor) {

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        .pollTimeout(Duration.ofSeconds(1)) // XREADGROUP BLOCK timeout
                        .executor(executor)
                        .batchSize(10)                      // COUNT per poll
                        .build();

        return StreamMessageListenerContainer.create(factory, options);
    }
}

Step 5: Register the Message Listener

@Component
@RequiredArgsConstructor
public class StreamListenerRegistrar {

    @Value("${app.redis.stream}")
    private String streamKey;

    @Value("${app.redis.group}")
    private String groupName;

    @Value("${app.redis.consumer}")
    private String consumerName;

    // @PostConstruct methods must be parameterless, so the collaborators are injected as fields
    private final StreamMessageListenerContainer<String, MapRecord<String, String, String>> container;
    private final RedisMessageProcessor processor;

    @PostConstruct
    public void registerListener() {
        StreamMessageListenerContainer.StreamReadRequest<String> readRequest =
                StreamMessageListenerContainer.StreamReadRequest
                        .builder(StreamOffset.create(streamKey, ReadOffset.lastConsumed()))
                        .consumer(Consumer.from(groupName, consumerName))
                        .autoAcknowledge(false) // manual ACK
                        .build();

        container.register(readRequest, processor);
    }
}
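
register(...) returns a Subscription. Keeping it is optional, but it lets you wait until polling has actually started and remove the listener cleanly on shutdown; a sketch with hypothetical field and method names (readRequest built exactly as above):

// Sketch: hold the Subscription so the listener can be awaited and removed explicitly
private Subscription subscription;

@PostConstruct
public void registerListener() {
    // ... build readRequest exactly as shown above ...
    this.subscription = container.register(readRequest, processor);
    try {
        // Block briefly until the container is actively polling this request
        this.subscription.await(Duration.ofSeconds(2));
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

@PreDestroy
public void unregister() {
    if (subscription != null) {
        container.remove(subscription);
    }
}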

Step 6: Implement the Message Processor

@Component
@Slf4j
@RequiredArgsConstructor
public class RedisMessageProcessor implements StreamListener<String, MapRecord<String, String, String>> {

    private final StringRedisTemplate redisTemplate;

    @Value("${app.redis.stream}")
    private String streamKey;

    @Value("${app.redis.group}")
    private String groupName;

    @Override
    public void onMessage(MapRecord<String, String, String> record) {
        CompletableFuture.runAsync(() -> {
            try {
                // Business logic
                processBusiness(record);

                // ACK only after successful processing
                redisTemplate.opsForStream().acknowledge(streamKey, groupName, record.getId());
            } catch (Exception e) {
                // No ACK on failure: the entry stays in the pending list for the retry job
                log.warn("Processing failed for {}", record.getId(), e);
            }
        });
    }

    // Public so the pending-message processor (step 7) can reuse it for retries
    public void processBusiness(MapRecord<String, String, String> record) throws Exception {
        String eventType = record.getValue().get("eventType");
        String payload = record.getValue().get("payload");

        // Dispatch by event type
        switch (eventType) {
            case "ORDER_CREATED": handleOrder(payload); break;
            case "PAYMENT_PROCESSED": handlePayment(payload); break;
        }
    }
}
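
handleOrder and handlePayment are business-specific and are not shown here. Purely as an illustration, a minimal sketch of one handler, assuming the payload is a JSON string and using Jackson (already on the classpath via spring-boot-starter-web):

// Sketch: hypothetical handler that parses the JSON payload; "orderId" is an assumed field name
private final ObjectMapper objectMapper = new ObjectMapper();

private void handleOrder(String payload) throws Exception {
    Map<String, Object> order = objectMapper.readValue(
            payload, new TypeReference<Map<String, Object>>() {});
    log.info("Processing order {}", order.get("orderId"));
    // ... invoke the actual order service here ...
}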

Step 7: Implement the Pending Message Processor

@Component
@Slf4j
@RequiredArgsConstructor
public class PendingMessageProcessor {

    private final StringRedisTemplate redisTemplate;
    private final RedisMessageProcessor messageProcessor;

    @Value("${app.redis.stream}")
    private String streamKey;

    @Value("${app.redis.group}")
    private String groupName;

    @Value("${app.redis.consumer}")
    private String consumerName;

    // Sweep the pending list once per minute (requires @EnableScheduling, see the sketch below)
    @Scheduled(fixedRate = 60000)
    public void processPendingMessages() {
        // 1. Fetch up to 100 pending (delivered but unacknowledged) entries
        PendingMessages pending = redisTemplate.opsForStream()
                .pending(streamKey, groupName, Range.unbounded(), 100);

        pending.forEach(this::handlePendingMessage);
    }

    private void handlePendingMessage(PendingMessage pending) {
        try {
            // 2. Re-claim the entry for this consumer if it has been idle for at least 30 seconds
            List<MapRecord<String, String, String>> records = redisTemplate.<String, String>opsForStream()
                    .claim(streamKey, groupName, consumerName,
                           Duration.ofSeconds(30),
                           pending.getId());

            if (!records.isEmpty()) {
                MapRecord<String, String, String> record = records.get(0);

                // 3. Retry the business processing
                messageProcessor.processBusiness(record);

                // 4. ACK on success
                redisTemplate.opsForStream().acknowledge(streamKey, groupName, record.getId());
            }
        } catch (Exception e) {
            // 5. After too many delivery attempts, move the entry to the dead-letter queue
            if (pending.getTotalDeliveryCount() > 3) {
                moveToDeadLetterQueue(pending);
            }
        }
    }

    private void moveToDeadLetterQueue(PendingMessage pending) {
        // Fetch the original entry by its ID
        List<MapRecord<String, String, String>> records = redisTemplate.<String, String>opsForStream()
                .range(streamKey, Range.just(pending.getId().getValue()));

        if (!records.isEmpty()) {
            // Copy it to the dead-letter stream
            redisTemplate.opsForStream().add("dead-letter:" + streamKey, records.get(0).getValue());

            // Acknowledge the original so it leaves the pending list
            redisTemplate.opsForStream().acknowledge(streamKey, groupName, pending.getId());
        }
    }
}
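
The @Scheduled sweep above only runs if scheduling is enabled in the application; a minimal sketch of the application class (hypothetical class name):

@SpringBootApplication
@EnableScheduling // required for the scheduled pending-message sweep in step 7
public class RedisStreamDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(RedisStreamDemoApplication.class, args);
    }
}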

Step 8: Implement the Message Producer

@Service
@RequiredArgsConstructor
public class RedisMessageProducer {

    private final StringRedisTemplate redisTemplate;

    @Value("${app.redis.stream}")
    private String streamKey;

    public String sendMessage(String eventType, String payload) {
        Map<String, String> message = Map.of(
            "eventType", eventType,
            "payload", payload,
            "timestamp", String.valueOf(System.currentTimeMillis())
        );

        // XADD returns the generated entry ID
        return redisTemplate.opsForStream()
                .add(streamKey, message)
                .getValue();
    }
}
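
For reference, XADD returns the generated entry ID (for example 1721980800000-0), which sendMessage passes back to the caller. A quick usage sketch (hypothetical runner bean and payload):

// Sketch: publish a test event at startup and print the entry ID returned by XADD
@Bean
public CommandLineRunner demoPublish(RedisMessageProducer producer) {
    return args -> {
        String id = producer.sendMessage("ORDER_CREATED", "{\"orderId\":\"1001\"}");
        System.out.println("Published stream entry " + id);
    };
}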

Step 9: Expose a REST Endpoint

@RestController
@RequestMapping("/messages")
@RequiredArgsConstructor
public class MessageController {

    private final RedisMessageProducer producer;

    @PostMapping
    public String sendMessage(@RequestBody MessageRequest request) {
        return producer.sendMessage(request.getEventType(), request.getPayload());
    }

    @Data
    public static class MessageRequest {
        private String eventType;
        private String payload;
    }
}

III. Message Lifecycle Flows

1. Normal processing flow: the producer XADDs an entry, the listener container delivers it to a consumer via XREADGROUP, the business logic runs, and the consumer XACKs the entry, removing it from the pending list.

2. Pending-message flow: entries that were delivered but never acknowledged stay in the pending list; the scheduled job XCLAIMs them for the current consumer, retries the business logic, and XACKs on success.

3. How the ACK mechanism works: Redis tracks every delivered-but-unacknowledged entry of a consumer group in the Pending Entries List (PEL); XACK removes an entry from the PEL, so withholding the ACK is exactly what makes retries and dead-lettering possible.
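
To make the ACK semantics concrete, the same read/acknowledge cycle can be driven through StreamOperations directly, without the listener container; a sketch assuming redisTemplate, streamKey, groupName and consumerName are injected as in the earlier steps:

// Sketch: read one entry as this consumer, process it, then XACK it.
// Until acknowledge(...) runs, the entry stays in the group's Pending Entries List.
StreamOperations<String, String, String> ops = redisTemplate.opsForStream();

List<MapRecord<String, String, String>> records = ops.read(
        Consumer.from(groupName, consumerName),
        StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
        StreamOffset.create(streamKey, ReadOffset.lastConsumed()));

for (MapRecord<String, String, String> record : records) {
    // ... business processing ...
    ops.acknowledge(groupName, record); // same effect as acknowledge(streamKey, groupName, record.getId())
}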

IV. Production Recommendations

  1. Consumer naming strategy

    // Give each instance a stable, unique consumer name (hostname + port)
    @Value("${app.redis.consumer}")
    private String consumerName;

    @Autowired
    private Environment environment;

    @PostConstruct
    public void initConsumerName() {
        try {
            String hostName = InetAddress.getLocalHost().getHostName();
            String port = environment.getProperty("server.port", "8080");
            consumerName = "consumer-" + hostName + "-" + port;
        } catch (UnknownHostException e) {
            // Fall back to the randomly suffixed name from application.yml
        }
    }
  2. Externalize the retry policy (see the sketch after this list for how these values can be bound)

    app:
      pending:
        max_retry: 5
        retry_interval: 30000 # 30 seconds
  3. Dead-letter queue monitoring

    @Scheduled(fixedRate = 3600000) // check once per hour
    public void checkDeadLetterQueue() {
        Long size = redisTemplate.opsForStream().size("dead-letter:" + streamKey);
        if (size != null && size > 0) {
            alertService.sendAlert(size + " unprocessed messages in the dead-letter queue");
        }
    }
  4. Cap the stream length

    // Redis Streams have no per-entry TTL; cap the stream length instead.
    // Here an XTRIM after each XADD keeps roughly the latest 10,000 entries
    // (approximate trimming is cheaper than an exact MAXLEN).
    public String sendMessage(String eventType, String payload) {
        Map<String, String> message = Map.of("eventType", eventType, "payload", payload);
        RecordId id = redisTemplate.opsForStream().add(streamKey, message);
        redisTemplate.opsForStream().trim(streamKey, 10000, true);
        return id.getValue();
    }
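
As referenced in item 2, here is a sketch of binding those properties and using them to drive the pending-message sweep (hypothetical class name and default values):

// Sketch: bind app.pending.* so the retry limit and claim idle time are configurable
@Component
public class PendingRetryPolicy {

    @Value("${app.pending.max_retry:3}")
    private long maxRetry;

    @Value("${app.pending.retry_interval:30000}")
    private long retryIntervalMs;

    // Replaces the hard-coded "> 3" check in step 7
    public boolean shouldDeadLetter(long totalDeliveryCount) {
        return totalDeliveryCount > maxRetry;
    }

    // Passed to claim(...) as the minimum idle time instead of Duration.ofSeconds(30)
    public Duration minIdleTime() {
        return Duration.ofMillis(retryIntervalMs);
    }
}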

V. Summary

This post walked through a complete solution for publish/subscribe on Spring Boot and Redis with an ACK mechanism:

  1. Event-driven architecture: a Redis Stream listener container provides true publish/subscribe delivery

  2. Reliable ACK mechanism: manual acknowledgement ensures a message is only removed after it has been processed successfully

  3. Automatic recovery: the pending-message processor reclaims and retries failed messages

  4. Dead-letter queue: unprocessable messages are isolated so they cannot block the system

  5. Production-ready: multi-instance deployment, externalized configuration, and monitoring/alerting are all covered

The approach suits scenarios that need highly reliable message delivery, such as order processing, payment systems, and event sourcing, providing delivery guarantees without sacrificing throughput.

