目录
3.3 定义本地缓存工具类(AlarmLocalCache)
前言
基于Spring事件驱动的异步缓存更新机制是一种解耦、高效、实时的数据同步方案,适用于分布式系统或微服务架构中缓存一致性的维护
一、事件源
1.1 角色:触发缓存更新的组件(如Redis消息监听器、MQTT监听器、数据库变更监听器)
1.2 当检测到数据变更时,发布自定义事件(如CacheEvent
)
1.3 载体:封装变更数据(如设备ID、状态)及上下文信息(如频道、操作类型)
1.4 CacheEvent 类
public class CacheEvent {
private RedisChannelEnum channel; // 事件频道(如DEVICE、ALARM)
private String message; // 序列化后的数据(如JSON)
// 构造方法、Getter/Setter 省略
}
二、事件监听器(Event Listener)
- 角色:异步消费事件并更新本地缓存的组件。
- 关键注解:
@EventListener
:声明监听特定事件。@Async
:实现异步处理。@Order
:控制监听器执行顺序(可选)。
1.4 本地缓存(Local Cache)
- 角色:存储热点数据的内存缓存(如
ConcurrentHashMap
、Caffeine)。 - 动作:监听器将事件中的数据反序列化后写入缓存。
三、完整案例
3.1 定义事件枚举(RedisChannelEnum)
public enum RedisChannelEnum {
DEVICE("DEVICE"), // 设备状态频道
ALARM("ALARM"); // 告警频道
private final String name;
RedisChannelEnum(String name) {
this.name = name;
}
public String getName() {
return name;
}
// 根据名称获取枚举
public static RedisChannelEnum getChannel(String name) {
return Arrays.stream(values())
.filter(c -> c.name.equals(name))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Unknown channel: " + name));
}
}
3.2 发布缓存事件(模拟Redis监听器)
@Component
public class RedisMessageListener {
@Autowired
private ApplicationEventPublisher publisher;
// 模拟Redis消息到达
public void simulateRedisMessage(String channelName, String message) {
RedisChannelEnum channel = RedisChannelEnum.getChannel(channelName);
publisher.publishEvent(new CacheEvent(channel, message));
}
}
3.3 定义本地缓存工具类(AlarmLocalCache)
public class AlarmLocalCache {
private static final ConcurrentHashMap<String, DeviceCache> DEVICE_CACHE = new ConcurrentHashMap<>();
public static void putDevice(String deviceName, DeviceCache deviceCache) {
DEVICE_CACHE.put(deviceName, deviceCache);
}
public static DeviceCache getDevice(String deviceName) {
return DEVICE_CACHE.get(deviceName);
}
}
3.4 异步事件监听器(更新设备缓存)
@Slf4j
@Component
public class DeviceCacheUpdater {
@Async
@Order(1) // 明确执行顺序(可选)
@EventListener(value = CacheEvent.class, condition = "#event.channel.name()=='DEVICE'")
public void updateDeviceCache(CacheEvent event) {
log.info("异步处理设备缓存事件: {}", event.getMessage());
try {
// 反序列化JSON
DeviceCache deviceCache = JSONObject.parseObject(event.getMessage(), DeviceCache.class);
// 更新本地缓存
AlarmLocalCache.putDevice(deviceCache.getDeviceName(), deviceCache);
log.info("设备缓存更新成功: {}", deviceCache.getDeviceName());
} catch (Exception e) {
log.error("处理设备缓存事件失败: {}", event.getMessage(), e);
}
}
}
3.5 测试代码
@SpringBootTest
public class CacheUpdateTest {
@Autowired
private RedisMessageListener redisListener;
@Test
public void testCacheUpdate() throws InterruptedException {
// 模拟Redis发送设备状态变更事件
String jsonMessage = "{\"deviceName\":\"sensor_001\",\"status\":\"online\"}";
redisListener.simulateRedisMessage("DEVICE", jsonMessage);
// 等待异步任务完成(实际生产环境无需等待)
Thread.sleep(1000);
// 验证本地缓存
DeviceCache cachedDevice = AlarmLocalCache.getDevice("sensor_001");
Assertions.assertEquals("online", cachedDevice.getStatus());
}
}
四、 执行流程
事件发布:
RedisMessageListener
模拟Redis消息到达,发布CacheEvent
事件(频道DEVICE
,消息{"deviceName":"sensor_001","status":"online"}
)。事件路由:
Spring事件机制根据@EventListener
的条件过滤,仅当事件频道为DEVICE
时,触发DeviceCacheUpdater
的updateDeviceCache
方法。异步处理:
@Async
注解使方法在独立线程执行,避免阻塞事件发布线程。缓存更新:
解析JSON消息,将设备状态存入AlarmLocalCache
,后续业务可直接从本地缓存读取数据。
五、机制优势
- 解耦:事件生产者与消费者无直接依赖,便于独立扩展。
- 异步非阻塞:提高系统吞吐量,适合高并发场景。
- 实时性:通过事件驱动实现数据变更的近实时同步。
- 可观测性:可通过日志追踪事件流转全链路。
六、实际应用场景
- 分布式缓存协调:在微服务间同步Redis缓存变更。
- 数据库变更监听:通过Binlog监听实现数据库到缓存的异步更新。
- 实时数据管道:将Kafka消息转换为本地缓存更新。