一、简介
Redis 发布订阅(Pus/Sub)是一种消息通信模式:发送者通过 publish发布消息,订阅者通过 subcribe订阅接收消息或通过unsubcribe取消订阅。
主要包含三个部分组成:发布者、订阅者、Channel。
发布者和订阅者属于客户端,Channel 是 Redis 服务端,发布者将消息发布到频道,订阅这个频道的订阅者则收到消息。
二、通过频道(Channel)实现
- 订阅者订阅频道
- 发布者向「频道」发布消息
- 所有订阅「频道」的订阅者收到消息
三、代码示例
(1)发布者发送消息
其实就是定义一个channel管道,填上合适的topic,后续订阅者根据这个topic来订阅消息。
String channel = new ChannelTopic(MATCH_LOG_REDIS_TOPIC).getTopic();
jedis.publish(channel, message);
(2)订阅者订阅频道
@Bean
public MessageListenerAdapter listenerAdapter(MatchLogMsgHandler matchLogMsgHandler) {
MessageListenerAdapter adapter = new MessageListenerAdapter(matchLogMsgHandler, "onMessage");
return adapter;
}
@Bean
public RedisMessageListenerContainer container(@Qualifier("jedisConnectionFactory4Search") RedisConnectionFactory connectionFactory, Executor redisMqAsyncExecutor,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setTaskExecutor(redisMqAsyncExecutor);
container.addMessageListener(listenerAdapter, new PatternTopic(MATCH_LOG_REDIS_TOPIC));
return container;
}
@Component
@Slf4j
public class MatchLogMsgHandler {
public void onMessage(String message, String pattern) {
log.info("消费match日志:{}", message);
try {
//将从channel拿到的message进行json转换为对象
Map<String, String> map = JSONObject.parseObject(message, Map.class);
String userName = map.get("userName");
String data = map.get("data");
WebSocketServer.sendInfo(data, userName);
} catch (Exception e) {
log.error("MatchLogMsgHandler处理对码日志消息失败", e);
}
}
}
这三片代码的看着似乎有点复杂,其实很简单,听我娓娓道来…
其实这三块代码都被springboot的注解变成了bean对象,并交给了springboot来托管。
那么订阅者是怎么订阅到发布者发布的消息呢?
- 先看第二块代码,这个container其实是在bean加载的流程中,将new PatternTopic(MATCH_LOG_REDIS_TOPIC)这个topic和listenerAdapter的监听器绑定到一块,代表着listenerAdapter的这个监听器只监听这个topic。
- 再看第一块代码,就是声明了一个listenerAdapter监听器,这个监听器监听到消息之 - 后,通过反射调用执行matchLogMsgHandler类中的onMessage方法。
- 最后一块代码,就是创建了matchLogMsgHandler类,在onMessage方法中来具体的处理我们的逻辑。
------------------------------完结撒花------------------------------