Redis 发布订阅模式详解:实现高效实时消息通信

发布于:2025-09-04 ⋅ 阅读:(18) ⋅ 点赞:(0)

概述

Redis 发布订阅(Pub/Sub)是一种消息通信模式,发送者(发布者)将消息发送到频道,而接收者(订阅者)可以接收它们。这种模式实现了消息的生产者和消费者之间的完全解耦,是构建实时消息系统的理想选择。

核心概念

1. 频道(Channel)

消息传输的通道,发布者向频道发送消息,订阅者从频道接收消息。

2. 发布者(Publisher)

向频道发送消息的客户端。

3. 订阅者(Subscriber)

订阅频道并接收消息的客户端。

4. 模式订阅(Pattern Subscription)

使用通配符订阅多个匹配的频道。

基本命令

发布消息

PUBLISH channel message
  • 将消息发送到指定频道

  • 返回接收到消息的订阅者数量

订阅频道

SUBSCRIBE channel1 channel2 ...
  • 订阅一个或多个频道

  • 进入订阅状态,阻塞等待消息

取消订阅

UNSUBSCRIBE [channel1 channel2 ...]
  • 取消订阅指定频道

  • 不指定参数则取消所有订阅

模式订阅

PSUBSCRIBE pattern1 pattern2 ...
  • 使用通配符订阅多个频道

  • 支持 *(匹配任意字符)和 ?(匹配单个字符)

查看订阅信息

PUBSUB CHANNELS [pattern]  # 查看活跃频道
PUBSUB NUMSUB [channel...] # 查看指定频道的订阅数
PUBSUB NUMPAT              # 查看模式订阅的数量

Spring Boot 中的实现

1. 配置 Redis 连接

# application.yml
spring:
  redis:
    host: localhost
    port: 6379
    password: 
    database: 0

2. Redis 配置类

@Configuration
public class RedisConfig {
    
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
    
    @Bean
    public ChannelTopic smsTopic() {
        return new ChannelTopic("sms:user:accept:query");
    }
}

3. 消息发布者

@Component
public class MessagePublisher {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private ChannelTopic smsTopic;
    
    /**
     * 向指定频道发布消息
     */
    public void publish(String channel, Object message) {
        redisTemplate.convertAndSend(channel, message);
    }
    
    /**
     * 向预设频道发布消息
     */
    public void publishSmsMessage(String userId) {
        redisTemplate.convertAndSend(smsTopic.getTopic(), userId);
        log.info("向频道 {} 发布消息: {}", smsTopic.getTopic(), userId);
    }
}

要理解 MessagePublisher 中预设频道指定频道的区别,核心是从「频道的定义时机」「使用场景」「灵活性与维护性」三个维度切入 —— 两者本质都是基于 Redis 发布订阅(Pub/Sub)机制传递消息,但在「频道值如何确定」和「适用场景」上完全不同。以下结合代码和 Redis Pub/Sub 原理详细讲解:

一、先明确基础:Redis 发布订阅(Pub/Sub)的核心逻辑

在分析两者区别前,需先回顾 Redis Pub/Sub 的基本概念:

  • 频道(Channel):消息的 “通道”,类似广播电台的 “频率”,发布者(Publisher)向某个频道发消息,所有订阅该频道的消费者(Subscriber)都会收到消息。
  • 发布者:调用 PUBLISH channel message 命令向频道发消息(对应代码中 redisTemplate.convertAndSend(...))。
  • 消费者:调用 SUBSCRIBE channel 命令订阅频道,实时接收该频道的消息。

MessagePublisher 的两个方法,本质都是封装了 Redis 的 PUBLISH 操作,区别仅在于「频道(channel)的值从哪来」。

二、指定频道(publish 方法):动态传入频道,灵活度高

/**
 * 向指定频道发布消息
 */
public void publish(String channel, Object message) {
    redisTemplate.convertAndSend(channel, message);
}
 频道的定义方式:调用时动态传入
  • 「指定频道」的核心是:频道的名称(如 channel 参数的值)不是预先固定的,而是在调用 publish 方法时,由调用者根据业务需求动态传入
  • 示例:若业务需要向 “用户 123 的短信通知频道” 和 “订单 456 的状态通知频道” 发消息,调用方式如下
    // 向“sms_notify_user_123”频道发消息(用户123的短信通知)
    messagePublisher.publish("sms_notify_user_123", "您有一条新短信");
    // 向“order_status_456”频道发消息(订单456的状态更新)
    messagePublisher.publish("order_status_456", "订单已发货");
  • 这里的 sms_notify_user_123 和 order_status_456 都是 “指定频道”—— 调用时才确定具体值。
2. 核心特点
特点 说明
灵活性极高 支持任意频道名称,可根据业务场景动态生成(如拼接用户 ID、订单 ID)
无预设依赖 不需要提前定义频道,调用时直接传入即可,无配置或注入依赖
通用性强 可用于所有需要 Pub/Sub 的场景(不仅限于短信,还能用于订单、通知等)
调用者需感知频道 调用者必须明确知道 “要向哪个频道发消息”,需自己管理频道名称的正确性
3. 适用场景
  • 业务场景不固定,需要动态生成频道的场景(如 “按用户 ID 拆分频道”“按订单 ID 拆分频道”);
  • 跨业务复用(同一方法可用于短信、订单、日志等不同模块的消息发布);
  • 临时或一次性的消息发布(如调试时向特定测试频道发消息)。

三、预设频道(publishSmsMessage 方法):预先定义频道,专注特定业务

再看 publishSmsMessage 方法的代码:

@Autowired
private ChannelTopic smsTopic; // 注入预设的频道对象

/**
 * 向预设频道发布消息
 */
public void publishSmsMessage(String userId) {
    redisTemplate.convertAndSend(smsTopic.getTopic(), userId);
    log.info("向频道 {} 发布消息: {}", smsTopic.getTopic(), userId);
}
1. 频道的定义方式:启动时预先注入 / 配置
  • 「预设频道」的核心是:频道的名称在项目启动前就已固定(通过配置或代码定义),并通过 ChannelTopic 类封装后注入到 MessagePublisher 中,调用时无需传入频道,直接使用预设值
  • 关键依赖 ChannelTopic smsTopic 的定义逻辑(通常在配置类中):
@Configuration
public class RedisPubSubConfig {
    // 1. 从配置文件读取预设的短信频道名称(如 application.yml 中配置)
    @Value("${redis.pubsub.sms.topic}")
    private String smsTopicName;

    // 2. 定义预设频道的 ChannelTopic 对象,注入Spring容器
    @Bean
    public ChannelTopic smsTopic() {
        // 预设频道名称固定为配置文件中的值(如 "sms_user_accept_query")
        return new ChannelTopic(smsTopicName);
    }
}

此时 smsTopic.getTopic() 的值是固定的(如 sms_user_accept_query),调用 publishSmsMessage 时,始终向这个预设频道发消息:

// 调用时只需传入 userId,频道固定为 smsTopic 对应的预设值
messagePublisher.publishSmsMessage("123"); 
// 实际效果:向 "sms_user_accept_query" 频道发布消息 "123"
2. 核心特点
特点 说明
频道固定 频道名称在启动时就已确定,调用时无法修改,专注于特定业务(如短信)
依赖预设配置 需要提前在配置文件或代码中定义频道,通过 Spring 注入 ChannelTopic
业务关联性强 方法名(publishSmsMessage)和频道(smsTopic)都绑定 “短信” 业务,不可跨业务复用
调用者无需感知频道 调用者只需传入业务参数(如 userId),无需关心具体频道名称,降低使用成本
3. 适用场景
  • 业务场景固定,频道无需动态变化的场景(如你之前代码中的 “短信发送请求通知”—— 所有短信请求的消费通知,都通过同一个预设频道触发);
  • 专注单一业务模块(如仅用于短信、仅用于订单),避免频道名称混乱;
  • 团队协作场景:预先定义频道名称,所有开发者统一使用,避免因频道名称拼写错误导致消息无法接收(如统一用 sms_user_accept_query,而非有人写 sms_accept_user_query)。

四、预设频道 vs 指定频道:核心区别对比

为了更清晰区分,整理成表格:

对比维度 预设频道(publishSmsMessage 指定频道(publish
频道来源 启动时通过配置 / 代码预设,注入 ChannelTopic 调用时由调用者动态传入 String 类型的频道名称
频道灵活性 低(固定不变,仅服务特定业务) 高(可动态生成,支持任意场景)
业务关联性 强(绑定单一业务,如短信) 弱(通用,可跨业务复用)
调用复杂度 低(只需传业务参数,如 userId 高(需传 “频道 + 业务参数”,需确保频道名称正确)
维护成本 低(频道统一管理,避免拼写错误) 高(需手动管理频道名称,易因拼写错误导致消息丢失)
典型使用场景 短信发送通知、订单状态变更(固定频道) 按用户 / 订单拆分的动态通知、临时调试消息

4. 消息订阅者

@Component
public class MessageSubscriber extends MessageListenerAdapter {
    
    private static final Logger log = LoggerFactory.getLogger(MessageSubscriber.class);
    
    /**
     * 处理接收到的消息
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel());
        String body = new String(message.getBody());
        
        log.info("收到消息 - 频道: {}, 内容: {}", channel, body);
        
        // 根据不同的频道进行不同的处理
        handleMessage(channel, body);
    }
    
    /**
     * 消息处理逻辑
     */
    private void handleMessage(String channel, String message) {
        switch (channel) {
            case "sms:user:accept:query":
                processSmsMessage(message);
                break;
            case "order:create:notice":
                processOrderMessage(message);
                break;
            default:
                log.warn("未知频道: {}", channel);
        }
    }
    
    private void processSmsMessage(String userId) {
        log.info("处理用户 {} 的短信消息", userId);
        // 具体的业务逻辑
    }
    
    private void processOrderMessage(String orderId) {
        log.info("处理订单 {} 的创建消息", orderId);
        // 具体的业务逻辑
    }
}

5. 消息监听容器配置

@Configuration
public class RedisPubSubConfig {
    
    @Autowired
    private RedisConnectionFactory redisConnectionFactory;
    
    @Autowired
    private MessageSubscriber messageSubscriber;
    
    @Bean
    public RedisMessageListenerContainer redisContainer() {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        
        // 添加频道订阅
        container.addMessageListener(messageSubscriber, 
            Arrays.asList(
                new ChannelTopic("sms:user:accept:query"),
                new ChannelTopic("order:create:notice"),
                new ChannelTopic("user:status:update")
            ));
        
        // 添加模式订阅
        container.addMessageListener(messageSubscriber, 
            new PatternTopic("logs:*"));
        
        return container;
    }
}

高级特性

1. 模式匹配订阅

// 订阅所有以 sms: 开头的频道
PSUBSCRIBE sms:*

// 订阅所有以 :notice 结尾的频道  
PSUBSCRIBE *:notice

// 订阅符合特定模式的频道
PSUBSCRIBE user:?:status

2. 消息格式设计

建议使用 JSON 格式的消息体:

{
  "eventType": "USER_SMS_RECEIVED",
  "timestamp": 1621234567890,
  "data": {
    "userId": "12345",
    "deviceId": "device_001",
    "messageContent": "Hello World"
  },
  "version": "1.0"
}

3. 错误处理和重试机制

@Component
public class ReliableMessageSubscriber {
    
    @Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 1000))
    public void handleMessageWithRetry(String message) {
        try {
            processMessage(message);
        } catch (Exception e) {
            log.error("消息处理失败,将进行重试", e);
            throw e; // 触发重试
        }
    }
    
    @Recover
    public void recover(Exception e, String message) {
        log.error("消息处理最终失败,存入死信队列: {}", message, e);
        // 将消息存入死信队列
        deadLetterQueue.add(message);
    }
}

实战应用场景

1. 实时消息通知系统

// 发布消息
public void notifyUser(String userId, String message) {
    String channel = "user:notify:" + userId;
    redisTemplate.convertAndSend(channel, 
        Map.of("type", "notification", "content", message, "time", System.currentTimeMillis()));
}

// 订阅消息
@RedisListener(topic = "user:notify:*")
public void onUserNotification(String channel, Map<String, Object> message) {
    String userId = channel.substring("user:notify:".length());
    // 推送消息到用户界面
    webSocketService.sendToUser(userId, message);
}

2. 分布式系统事件总线

// 系统事件发布
public void publishEvent(SystemEvent event) {
    redisTemplate.convertAndSend("system:events", 
        Map.of("eventType", event.getType(), "payload", event.getData()));
}

// 事件处理
@RedisListener(topic = "system:events")
public void handleSystemEvent(Map<String, Object> event) {
    String eventType = (String) event.get("eventType");
    Object payload = event.get("payload");
    
    switch (eventType) {
        case "USER_CREATED":
            handleUserCreated(payload);
            break;
        case "ORDER_PAID":
            handleOrderPaid(payload);
            break;
        // 其他事件处理...
    }
}

3. 实时数据同步

// 数据变更时发布消息
@Transactional
public void updateUserProfile(User user) {
    userRepository.save(user);
    
    // 发布数据变更消息
    redisTemplate.convertAndSend("data:sync:user", 
        Map.of("id", user.getId(), "action", "UPDATE", "timestamp", System.currentTimeMillis()));
}

// 其他服务监听数据变更
@RedisListener(topic = "data:sync:user")
public void onUserDataChange(Map<String, Object> change) {
    // 更新本地缓存或索引
    cacheService.refreshUserCache((Long) change.get("id"));
}

2. 消息压缩

对于大消息,可以先压缩再发送:

public void publishCompressed(String channel, Object message) {
    String json = objectMapper.writeValueAsString(message);
    byte[] compressed = compress(json.getBytes());
    redisTemplate.convertAndSend(channel, compressed);
}

3. 批量处理

@Component
public class BatchMessageProcessor {
    
    private final List<String> messageBuffer = new ArrayList<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    
    @PostConstruct
    public void init() {
        scheduler.scheduleAtFixedRate(this::processBatch, 100, 100, TimeUnit.MILLISECONDS);
    }
    
    @RedisListener(topic = "high:frequency:channel")
    public void onMessage(String message) {
        synchronized (messageBuffer) {
            messageBuffer.add(message);
        }
    }
    
    private void processBatch() {
        List<String> batch;
        synchronized (messageBuffer) {
            batch = new ArrayList<>(messageBuffer);
            messageBuffer.clear();
        }
        
        if (!batch.isEmpty()) {
            businessService.processBatch(batch);
        }
    }
}

监控和运维

1. 监控指标

@Component
public class PubSubMonitor {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @Scheduled(fixedRate = 60000) // 每分钟监控一次
    public void monitorPubSub() {
        // 监控频道数量
        Set<String> channels = redisTemplate.execute((RedisCallback<Set<String>>) connection -> 
            connection.pubSubChannels("*".getBytes()).stream()
                .map(bytes -> new String(bytes))
                .collect(Collectors.toSet()));
        
        // 监控消息吞吐量
        Map<String, Long> messageStats = new HashMap<>();
        for (String channel : channels) {
            Long subscribers = redisTemplate.execute((RedisCallback<Long>) connection -> 
                connection.pubSubNumSub(channel.getBytes()).get(channel.getBytes()));
            messageStats.put(channel, subscribers);
        }
        
        log.info("PubSub 监控 - 频道数: {}, 订阅统计: {}", channels.size(), messageStats);
    }
}

2. 异常处理

@Configuration
public class RedisErrorConfig {
    
    @Bean
    public RedisMessageListenerContainer redisContainer(RedisConnectionFactory factory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        container.setErrorHandler(new RedisPubSubErrorHandler());
        return container;
    }
    
    public class RedisPubSubErrorHandler implements ErrorHandler {
        @Override
        public void handleError(Throwable t) {
            log.error("Redis Pub/Sub 错误", t);
            // 发送告警通知
            alertService.sendAlert("Redis Pub/Sub 异常", t.getMessage());
        }
    }
}

总结

Redis 发布订阅模式提供了强大的实时消息通信能力,具有以下优势:

  1. 高性能:基于内存操作,吞吐量高

  2. 实时性:消息即时推送,延迟低

  3. 解耦性:生产者和消费者完全解耦

  4. 灵活性:支持频道和模式订阅

  5. 扩展性:易于水平扩展

但在使用时也需要注意:

  • 消息不持久化,重启会丢失

  • 没有消息确认机制

  • 不适合需要严格顺序的场景

通过合理的架构设计和代码实现,Redis Pub/Sub 可以成为构建实时应用系统的强大工具。


网站公告

今日签到

点亮在社区的每一天
去签到