问题:
假设机器A,B,C都用到一个配置表的数据,服务一启动,会把配置表的所有数据缓存到每份机器上,定时器是每5分钟会重新从数据库加载一次到本地缓存,万一这5分钟内配置表发生更新,每个机器上的缓存就对应不上。
解决方法:
利用redis的 pub/sub 通知到所有在线的机器进行缓存删除
在线的服务器收到消息后,自己都去执行删除缓存的操作,并且立马重新刷新缓存
例子:
现在有一款直播互动APP,用送礼物的功能,有用户界面配置(如,更换界面语言),送礼物时风险用户过滤。当我要实现这个功能时,我可以预先把礼物列表,用户配置列表,风险用户列表,加载到本地缓存中。每台机器都有我预先加载好的数据,此时是一致的。
当用户张三更改了界面配置,可能是改了语言类型,或者后台运营人员把上架了新礼物,或者增加了一些风险用户名单,刚好这里所有的修改操作全部落在了机器A这台机器,机器A先写库,后重新从库中加载到本地缓存,此时机器A的数据是最新的,那用户张三在访问礼物列表时,有可能访问到了机器B,这时机器B还是老数据。
为了解决上面那个问题,可以添加上我写的工具包,在这3个类上分别添加订阅,每个类添加订阅时key要求都不一样,这样做的目的是为了保证,当我礼物发生改变时,我只是通知到其他机器去重新拉取GiftCacheImpl的数据,而不会去触发SystemConfigCacheImpl和RiskUserCacheImpl的重新加载数据。
其中PsmDispatcher做接收消息,然后分发用,如:我接收的消息是刷新礼物,那我就在我的handlersMap里面找到这个key对应的值,值是一个接口类,拿到值后去调用handle方法就行。
说明:
消息发布和订阅基于 redis 的 pub/sub,生产者发布消息,所有订阅者都能接收到该消息,和消息队列(mq)的广播消息有点类似。需要注意的是,redis 的 pub/sub 和基于 mq 的广播消息并不一样;
1.只有消费者运行时,才能接收到 redis 的 pub/sub 消息;如果生产者发布消息时,消费者不在运行,并不会保存离线消息供消费者下次启动时消费
2.而 mq 会保存该消息,消费者启动后可以接收到未消费过的离线消息
适用场景:
特定事件发生后,需要通知其他业务进行相应的处理,例如状态的更新,缓存的重新加载不适合需要支持离线消息的业务
如何使用:
附上github链接,这里已经上传了一个工具包,直接使用即可
使用方法,下载jar包,放置仓库里,在使用时可以写一个消息注册中心,这个中心只要负责分发消息到相应的消息处理器即可,因为每个处理器可以处理不一样的业务,我们redis只要监听一个消息订阅者就行了。所有的发布者发布的消息都会先到消息中心,后面再具体分发到具体的消息处理类进行处理。这里附上使用demo的链接:
简单测试和实际工程使用:
1.简单测试:
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.util.concurrent.ExecutorService;
public class PubTest implements MessageHandler{
private JedisPool jedispool;
public PubTest(){
GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = new GenericObjectPoolConfig();
genericObjectPoolConfig.setMinIdle(8);
genericObjectPoolConfig.setMaxIdle(64);
genericObjectPoolConfig.setMaxTotal(256);
genericObjectPoolConfig.setMaxWaitMillis(200);
genericObjectPoolConfig.setTestWhileIdle(true);
genericObjectPoolConfig.setTestOnBorrow(false);
genericObjectPoolConfig.setTestOnReturn(false);
jedispool = new JedisPool(genericObjectPoolConfig, "127.0.0.1", 6379, 1000, null);
Jedis jedis1 = jedispool.getResource();
//先启动订阅
ThreadPoolExecutorFactoryBean threadPoolExecutorFactoryBean = new ThreadPoolExecutorFactoryBean();
threadPoolExecutorFactoryBean.setCorePoolSize(8);
threadPoolExecutorFactoryBean.setMaxPoolSize(1024);
threadPoolExecutorFactoryBean.setQueueCapacity(0);
threadPoolExecutorFactoryBean.setWaitForTasksToCompleteOnShutdown(true);
threadPoolExecutorFactoryBean.setAwaitTerminationSeconds(5);
threadPoolExecutorFactoryBean.initialize();
ExecutorService threadPool = threadPoolExecutorFactoryBean.getObject();
Subscriber subscriber = new Subscriber(jedis1, threadPool);
subscriber.addMessageHandler("global_channel",this);
subscriber.start();
}
public static void main(String[] args) throws InterruptedException {
PubTest pubTest = new PubTest();
pubTest.pub();
}
private void pub()throws InterruptedException{
Jedis jedis2 = jedispool.getResource();
Publisher pub = Publisher.getPublisher(jedis2, "global_channel");
System.out.println("准备开始");
Thread.sleep(5000);
pub.publishJson("hello");
}
@Override
public void handle(String message) {
System.out.println("hello world");
System.out.println(message);
}
}
工程使用:
在使用的工程中加入一下依赖
<groupId>org.junxi</groupId>
<artifactId>publish-common</artifactId>
<version>1.0-SNAPSHOT</version>