Redis发布-订阅模式之Channel的发布订阅

发布于:2024-05-03 ⋅ 阅读:(29) ⋅ 点赞:(0)

一、简介

Redis 发布订阅(Pus/Sub)是一种消息通信模式:发送者通过 publish发布消息,订阅者通过 subcribe订阅接收消息或通过unsubcribe取消订阅。

主要包含三个部分组成:发布者、订阅者、Channel。

发布者和订阅者属于客户端,Channel 是 Redis 服务端,发布者将消息发布到频道,订阅这个频道的订阅者则收到消息。

二、通过频道(Channel)实现

  • 订阅者订阅频道
  • 发布者向「频道」发布消息
  • 所有订阅「频道」的订阅者收到消息

三、代码示例

(1)发布者发送消息

其实就是定义一个channel管道,填上合适的topic,后续订阅者根据这个topic来订阅消息。

String channel = new ChannelTopic(MATCH_LOG_REDIS_TOPIC).getTopic();
jedis.publish(channel, message);

(2)订阅者订阅频道

@Bean
    public MessageListenerAdapter listenerAdapter(MatchLogMsgHandler matchLogMsgHandler) {
        MessageListenerAdapter adapter = new MessageListenerAdapter(matchLogMsgHandler, "onMessage");
        return adapter;
    }
    @Bean
    public RedisMessageListenerContainer container(@Qualifier("jedisConnectionFactory4Search") RedisConnectionFactory connectionFactory, Executor redisMqAsyncExecutor,
                                                   MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setTaskExecutor(redisMqAsyncExecutor);
        container.addMessageListener(listenerAdapter, new PatternTopic(MATCH_LOG_REDIS_TOPIC));
        return container;
    }
@Component
@Slf4j
public class MatchLogMsgHandler {

    public void onMessage(String message, String pattern) {

        log.info("消费match日志:{}", message);
        try {
            //将从channel拿到的message进行json转换为对象
            Map<String, String> map = JSONObject.parseObject(message, Map.class);
            String userName = map.get("userName");
            String data = map.get("data");

            WebSocketServer.sendInfo(data, userName);
        } catch (Exception e) {
            log.error("MatchLogMsgHandler处理对码日志消息失败", e);
        }
    }
}

这三片代码的看着似乎有点复杂,其实很简单,听我娓娓道来…
其实这三块代码都被springboot的注解变成了bean对象,并交给了springboot来托管。

那么订阅者是怎么订阅到发布者发布的消息呢?

  • 先看第二块代码,这个container其实是在bean加载的流程中,将new PatternTopic(MATCH_LOG_REDIS_TOPIC)这个topic和listenerAdapter的监听器绑定到一块,代表着listenerAdapter的这个监听器只监听这个topic
  • 再看第一块代码,就是声明了一个listenerAdapter监听器,这个监听器监听到消息之 - 后,通过反射调用执行matchLogMsgHandler类中的onMessage方法
  • 最后一块代码,就是创建了matchLogMsgHandler类,在onMessage方法中来具体的处理我们的逻辑。

------------------------------完结撒花------------------------------