加群联系作者vx:xiaoda0423
仓库地址:https://webvueblog.github.io/JavaPlusDoc/
https://1024bat.cn/
好的,咱们直接讲清楚重点:
在 Apache Cassandra 中,Session 是你跟数据库交互的核心对象。
它主要负责干两件事:
管理连接:
Session代表着和 Cassandra 集群 建立好的一组连接(通常是连接池),而不是一个简单的 TCP 连接。
你可以理解成:Session 帮你打通了集群里多个节点之间的路,准备好随时发请求。执行 CQL(Cassandra Query Language)语句:
通过Session,你可以执行查询(SELECT)、插入(INSERT)、更新(UPDATE)、删除(DELETE)等操作。
Session 会帮你把这些 CQL 命令发给合适的节点去处理。
简单来说:
Session 是 Cassandra 客户端和集群之间进行读写交互的通道,且是线程安全的,整个应用通常只创建一个或者少量几个 Session。
举个实际开发里的例子:
// 1. 创建连接集群
CqlSession session = CqlSession.builder()
.withKeyspace("my_keyspace")
.build();
// 2. 用 Session 执行一条查询
ResultSet rs = session.execute("SELECT * FROM users WHERE id = 123");
// 3. 遍历查询结果
for (Row row : rs) {
System.out.println(row.getString("name"));
}
// 4. 用完关闭(重要,避免连接泄漏)
session.close();
这里的
CqlSession是新版 Java Driver 4.x 之后引入的标准(比老的Session更强大一些)。
常见认知误区
误区 |
正解 |
|---|---|
Session 是单一连接 |
❌ 不是,它内部有连接池,可以高并发请求 |
每次用完都要新建 Session |
❌ 不要,Session 是重量级对象,应该复用 |
Session 只能操作一个 keyspace |
❌ 可以动态切换,也可以一开始就绑定某个 keyspace |
🛠 【Kafka消费者配置流程】详细过程说明
① Spring Boot 项目启动阶段
Spring 启动时,扫描到
@Configuration标注的EventConsumerConfig类。Spring 自动执行
@Bean方法:
调用
batchRecordConsumerFactory()方法,注册名字叫"BRConsumerFactory"的 批量消费容器工厂。调用
oneByOneRecordConsumerFactory()方法,注册名字叫"OBORConsumerFactory"的 单条消费容器工厂。
在
getConsumerFactory(true/false)方法里:-
创建
ConcurrentKafkaListenerContainerFactory。设置关联的
ConsumerFactory(封装了 Kafka 连接参数)。配置并发数量(即几条消费线程同时消费)。
配置是否是批量消费。
配置消费完消息后,手动提交 offset (
AckMode.MANUAL_IMMEDIATE)。
整体链路简化图(文字版)
Spring Boot 启动 ↓ 加载 EventConsumerConfig ↓ 注册 KafkaListenerContainerFactory (批量/单条) ↓ 应用内 @KafkaListener 启动监听 ↓ Kafka 推送消息 ↓ KafkaListener 容器拉取消息 ↓ 调用 onMessage 处理逻辑 ↓ 手动提交 offset (ack.acknowledge()) ↓ 继续拉取下一批消息Kafka 消费者配置(
EventConsumerConfig)操作流程说明
1. 项目启动时,Spring 加载配置类
EventConsumerConfig@Configuration注解告诉 Spring:这是一个配置类,会被容器扫描。Spring 创建
EventConsumerConfig实例,并注入需要的配置属性(通过@Value)。
2. 注入配置属性到对象字段
通过
@Value("${xxx}"),从配置文件(比如application.yml)读取Kafka消费者的基础属性,比如:Kafka集群地址 (
kafka.consumer.servers)是否自动提交 (
kafka.consumer.enable.auto.commit)超时时间 (
kafka.consumer.session.timeout)消费者组 ID (
kafka.consumer.group.id)并发线程数 (
kafka.consumer.concurrency) 等等。
如果某些配置没配,带默认值的
@Value注解会保证程序不因空值报错。
3. 创建 Kafka 消费者参数 Map (
consumerConfigs方法)调用
consumerConfigs()方法时,会组装一份消费者所需的配置参数Map<String, Object>。主要包含:
连接Kafka服务器 (
BOOTSTRAP_SERVERS_CONFIG)禁用自动提交offset (
ENABLE_AUTO_COMMIT_CONFIG = false)配置session超时
配置key/value反序列化器
动态组装消费者GroupId(带主机名后缀)
配置拉取最大消息数
配置初始offset位置(
auto.offset.reset)
🔔 特别注意:
此处强制手动提交offset,保证业务处理完成后再提交,避免消息丢失。Kafka Producer 数据流程步骤
从应用代码调用
kafkaTemplate.send()到 Kafka Broker 落盘,整体分成下面 7 个主要步骤:
1. KafkaTemplate 发送消息
应用程序调用
kafkaTemplate.send(topic, key, value)方法发起发送。KafkaTemplate封装了 Producer API,异步地把消息交给 Kafka Client。
2. Producer 将消息序列化
KeySerializer和ValueSerializer(这里用的是StringSerializer)把 key 和 value 转换成字节数组(byte[])。序列化是必须的,因为网络传输需要字节流。
3. 将消息分配到 Partition
根据发送时指定的
key(如果有),使用 Kafka 的 分区器(Partitioner) 算法计算出具体的 partition。如果没指定 key,则使用轮询或随机分配 partition。
4. 消息写入 RecordAccumulator(本地缓存池)
Kafka Producer 不会立刻发送每一条消息,而是先放到内存中的
RecordAccumulator缓冲池中。什么时候触发发送?
-
累积到一定
batch.size大小。或者等待时间超过
linger.ms。
(👉 你的配置:
batch.size、linger.ms就是在控制这里的行为。)
5. Sender 线程异步发送数据
Producer 后台有一个 Sender线程,专门不断从
RecordAccumulator拉取数据打包,异步发送到 Kafka Broker。通过 TCP 网络连接(socket)发送 ProduceRequest。
6. Broker 收到请求并应答(ACK)
Kafka Broker Leader Partition 接收消息。
内存中先缓存,然后立即返回 ACK 应答给 Producer(不一定等落盘! ,Kafka快的原因之一)。
是否等待 ISR 集群同步完副本,取决于
acks配置(你的代码里目前是默认 acks=1)。
7. Producer Callback 回调处理
Producer 收到 Broker 的 ACK。
如果发送成功,触发成功回调(
onSuccess)。如果失败(比如网络中断、broker不可用),触发异常回调(
onFailure),可以重试或记录异常。
Kafka Producer 采用了 "内存批处理 + 异步发送 + 硬件顺序写" 模式,极大提高了消息发送性能与吞吐量。
发送数据(Send Process)
发送一条消息到 Kafka:
ProducerRecord<String, String> record = new ProducerRecord<>("topic-name", "key", "value"); producer.send(record, (metadata, exception) -> { if (exception == null) { System.out.println("发送成功, topic: " + metadata.topic() + ", offset: " + metadata.offset()); } else { exception.printStackTrace(); } });过程细节:
步骤
说明
构造
ProducerRecord封装消息主题、key、value 等信息
调用
send()方法异步将消息发送到消息累加器(RecordAccumulator)
判断是否触发发送
满批量(batch.size)或逗留时间(linger.ms)后,真正打包发送
选择 Partition
Kafka 内部根据 key 的 hash 算法,或随机分配 Partition
网络线程 IO
由 KafkaProducer 的 I/O 线程异步发送数据包到 Kafka Broker
Broker 处理
Kafka Broker 持久化写入、返回应答(根据 acks 决定是否需要副本确认)
回调函数执行
成功/失败都会触发回调(Callback)逻辑
刷新与关闭(Flush & Close)
保证缓冲区内所有数据都发送出去,并优雅关闭资源:
producer.flush(); // 强制立即发送缓冲区内所有数据 producer.close(); // 关闭生产者,释放连接资源Java 实操总结要点
异步发送,提升吞吐量。
批量累加,减少网络请求次数。
配置合理 acks、retries,兼顾性能和可靠性。
合理使用 callback,捕捉异常与记录日志。
flush() + close() ,确保优雅退出。
// 使用线程池处理异步任务 private static final BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>(); private static final ExecutorService fixedThreadPool = new ThreadPoolExecutor( 2, 6, 60L, TimeUnit.SECONDS, blockingQueue );try-catch范围精简,异常日志更规范(避免只打e.getMessage()导致漏掉栈信息)。入参判空 (
null校验) 增强,防止 NPE。JSON转换加上了异常保护。
BlockingQueue泛型加了<Runnable>,防止编译警告。统一使用
@PostMapping代替@RequestMapping(method=POST),风格更一致。日志参数化 (
logger.info("xxx {}", value)),性能更优。定义了一个静态常量阻塞队列,用来存储等待执行的任务(Runnable) 。
是给下面这个线程池:
ExecutorService fixedThreadPool = new ThreadPoolExecutor(..., blockingQueue);用的。
本质是作为线程池任务缓冲区。
当线程池的线程忙不过来时,新来的任务(Runnable)就先塞进blockingQueue,排队等执行。
说白了,就是让线程池不丢任务、顺序排队的地方。
线程池 + 阻塞队列配合流程:
顺着你的代码理解是这样:
线程池初始化
核心线程数:2
最大线程数:6
队列:
blockingQueue
提交任务时(比如异步提交一个 Runnable)
-
如果当前活跃线程数 < 核心线程数(2个): ➔ 直接新建线程执行任务。
如果活跃线程数 ≥ 核心线程数: ➔ 任务塞到
blockingQueue里面排队。如果blockingQueue 满了,并且活跃线程数 < 最大线程数(6): ➔ 继续扩容线程去处理。
如果最大线程数也满了,且队列也满了: ➔ 执行拒绝策略(默认抛异常,可以自定义处理)。
线程空闲60秒后,如果是非核心线程,会被回收销毁。
简单流转:
提交任务 Runnable ↓ 活跃线程数 < 核心线程数(2)? └→ 是:直接新建线程处理 ↓ 否 blockingQueue 队列未满? └→ 是:入队排队等待线程处理 ↓ 否 活跃线程数 < 最大线程数(6)? └→ 是:再创建新线程处理 ↓ 否 执行拒绝策略(抛异常或自定义处理)数据库示例(以 Cassandra 为例)
表结构(示例) :
假设我们在 Cassandra 中有这样一张表:
CREATE TABLE IF NOT EXISTS logs ( id text PRIMARY KEY, partition_key text, sort_key text, log_time text, info text );或者业务数据表:
CREATE TABLE IF NOT EXISTS user_data ( id text PRIMARY KEY, user_id text, device_id text, info text );id:唯一主键(如 MongoDB ObjectId)partition_key, sort_key:用于分区和排序log_time:日志时间戳info:对象数据序列化成的 JSON 字符串
🧱 数据结构设计(详细)
字段
类型
描述
clientId
varchar
客户端 ID(设备或租户编号)
day
varchar
日期(格式如 20240420)
dir
varchar
消息方向(如 "up"=上行, "down"=下行)
objId
varchar
对象 ID,自动生成唯一值(MongoDB 风格)
自动清理过期表 + 自动导出数据备份
节省存储成本
保证系统长期稳定
支持灾备和离线分析需求
目标拆分
1. 定时清理过期表
找出超过 N 个月的历史表(比如保留最近 6 个月)
自动 DROP TABLE
2. 自动导出备份
先把即将删除的表数据导出到文件(比如 JSON 或 CSV)
备份到对象存储、NAS,或者直接推到 Kafka 等离线系统
确认备份完成后再删除表
总体思路
步骤
描述
1
分页查询大表数据(防止一次性读完爆内存)
2
多线程并发导出(加速 IO,提升速率)
3
分批写文件(比如每1000条落一次磁盘)
4
最后合并小文件(optional,根据需要)
private static final int PAGE_SIZE = 1000; // 每页查1000条 private static final int THREAD_COUNT = 4; // 4个线程并发导出 private void exportTableData(String tableName) { ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); String baseDir = "/data/backup/" + tableName; new File(baseDir).mkdirs(); List<Future<File>> futures = new ArrayList<>(); AtomicInteger fileCounter = new AtomicInteger(0); try { Statement stmt = new SimpleStatement(String.format("SELECT * FROM %s.%s;", KEYSPACE, tableName)) .setFetchSize(PAGE_SIZE); ResultSet resultSet = session.execute(stmt); Iterator<Row> iterator = resultSet.iterator(); List<Row> batch = new ArrayList<>(PAGE_SIZE); while (iterator.hasNext()) { batch.add(iterator.next()); if (batch.size() >= PAGE_SIZE) { List<Row> batchToExport = new ArrayList<>(batch); batch.clear(); futures.add(executor.submit(() -> exportBatch(batchToExport, baseDir, fileCounter.incrementAndGet()))); } } // 导出最后剩余不足 PAGE_SIZE 的数据 if (!batch.isEmpty()) { futures.add(executor.submit(() -> exportBatch(batch, baseDir, fileCounter.incrementAndGet()))); } // 等待所有导出任务完成 for (Future<File> future : futures) { future.get(); } System.out.println("[Exporter] Exported table " + tableName + " in " + futures.size() + " parts."); } catch (Exception e) { throw new RuntimeException("Batch export failed for " + tableName, e); } finally { executor.shutdown(); } } private File exportBatch(List<Row> rows, String baseDir, int partNumber) { File file = new File(baseDir, "part_" + partNumber + ".json"); try (PrintWriter writer = new PrintWriter(file)) { ObjectMapper objectMapper = new ObjectMapper(); for (Row row : rows) { Map<String, Object> rowMap = new HashMap<>(); row.getColumnDefinitions().forEach(col -> { rowMap.put(col.getName(), row.getObject(col.getName())); }); writer.println(objectMapper.writeValueAsString(rowMap)); } } catch (IOException e) { throw new RuntimeException("Failed to export part " + partNumber, e); } return file; }导出机制总结
点
做法
作用
分页拉取
每次只拉取1000条
不会 OOM
多线程导出
4线程并行
提升 IO 速度
分文件存储
每批数据一个小文件
易于管理,防止文件过大
可横向扩展
调整 PAGE_SIZE、线程数
灵活适配不同规模数据
高并发 + 防爆内存 + 可扩展 + 可落盘备份
1. 高并发(High Concurrency)
目标:
多线程并行导出,提升大表数据备份速度。具体做法:
技术
描述
ExecutorService线程池
控制固定数量线程同时工作,防止CPU过载
每批数据异步提交导出任务
每拉取一页数据(如1000行),提交一个异步导出任务
Future / CompletableFuture
异步任务结果收集,统一等待完成
写磁盘操作并发执行
提升磁盘 I/O 吞吐量
效果:
最大化利用 CPU、内存、磁盘带宽,数据量再大也能稳定快速导出。2. 防爆内存(Prevent OOM)
目标:
无论数据量多大,始终保持内存使用在合理范围内。具体做法:
技术
描述
Cassandra 分页查询(FetchSize)
每次只拉一小批(如1000条)数据进内存
按批处理
拉一批、写一批、清理一批,避免累积太多对象
异步批次分批导出
每个线程只维护自己那一小批数据
效果:
即使单表百万行、千万行,也不会一次性把内存吃满,保证系统稳定运行。可落盘备份(Durable Backup)
目标:
导出的数据安全持久保存,不丢失、不破坏。具体做法:
技术
描述
按批次小文件保存
每批导出一个小 JSON 文件,如
part_1.json可选压缩(gzip)
小文件自动压缩,节省空间
完成后合并小文件(可选)
也可以保留分片,方便增量恢复
目录规范化管理
每张表一个目录,按表名+日期组织
增加导出校验
导出完成后记录总行数、校验码(MD5)防止误差
🚀 实现方案(Java 控制台多线程进度条)
✨ Step 1:核心工具类
ProgressBarpublic class ProgressBar { private final int total; private final AtomicInteger current = new AtomicInteger(0); private final String taskName; private final int barWidth = 20; public ProgressBar(int total, String taskName) { this.total = total; this.taskName = taskName; } public void step(int count) { current.addAndGet(count); print(); } public void print() { int now = current.get(); double percent = now * 1.0 / total; int len = (int)(barWidth * percent); String bar = "[" + "=".repeat(len) + "-".repeat(barWidth - len) + "] " + String.format("%3d%%", (int)(percent * 100)) + " (" + now + " / " + total + ")" + " 表:" + taskName + (now >= total ? " ✔ Done" : ""); synchronized (System.out) { System.out.print("\r" + bar); // 输出当前行 } } }在程序中缓存到本地的数据时,通常会使用本地持久化存储(如文件、数据库、内存缓存等)来存储数据。这样,即使程序重新启动,也能恢复之前缓存的数据。以下是几种常见的实现方案:
1. 使用文件缓存
将缓存数据保存到本地文件中(如 JSON、XML、或二进制文件),在程序启动时读取文件恢复缓存。
示例:使用 JSON 文件存储缓存
import com.fasterxml.jackson.databind.ObjectMapper; import java.io.File; import java.io.IOException; import java.util.Map; import java.util.HashMap; public class FileCache { private static final ObjectMapper objectMapper = new ObjectMapper(); private static final String CACHE_FILE_PATH = "cache_data.json"; // 缓存文件路径 private static Map<String, String> cache = new HashMap<>(); // 读取缓存数据 public static void loadCache() { File cacheFile = new File(CACHE_FILE_PATH); if (cacheFile.exists()) { try { cache = objectMapper.readValue(cacheFile, Map.class); } catch (IOException e) { e.printStackTrace(); } } } // 将数据缓存到文件 public static void saveCache() { try { objectMapper.writeValue(new File(CACHE_FILE_PATH), cache); } catch (IOException e) { e.printStackTrace(); } } // 获取缓存 public static String getCache(String key) { return cache.get(key); } // 设置缓存 public static void setCache(String key, String value) { cache.put(key, value); saveCache(); } public static void main(String[] args) { // 加载缓存 loadCache(); // 设置缓存 setCache("user_123", "John Doe"); // 获取缓存 String user = getCache("user_123"); System.out.println("User: " + user); } }优点:
简单易实现。
可以在文件中持久化数据,程序重启后可以恢复缓存。
缺点:
如果数据量非常大,使用文件缓存可能会变得缓慢。
文件存储的安全性较差,容易丢失或被篡改。
2. 使用数据库缓存
可以使用嵌入式数据库(如 SQLite)将缓存数据存储在本地数据库中。程序启动时从数据库中恢复缓存。
示例:使用 SQLite 存储缓存
import java.sql.*; public class DatabaseCache { private static final String DB_URL = "jdbc:sqlite:cache.db"; // SQLite数据库文件路径 private static Connection connection; static { try { connection = DriverManager.getConnection(DB_URL); Statement stmt = connection.createStatement(); stmt.execute("CREATE TABLE IF NOT EXISTS cache (key TEXT PRIMARY KEY, value TEXT)"); } catch (SQLException e) { e.printStackTrace(); } } // 获取缓存 public static String getCache(String key) { try { PreparedStatement stmt = connection.prepareStatement("SELECT value FROM cache WHERE key = ?"); stmt.setString(1, key); ResultSet rs = stmt.executeQuery(); if (rs.next()) { return rs.getString("value"); } } catch (SQLException e) { e.printStackTrace(); } return null; } // 设置缓存 public static void setCache(String key, String value) { try { PreparedStatement stmt = connection.prepareStatement("REPLACE INTO cache (key, value) VALUES (?, ?)"); stmt.setString(1, key); stmt.setString(2, value); stmt.executeUpdate(); } catch (SQLException e) { e.printStackTrace(); } } public static void main(String[] args) { // 设置缓存 setCache("user_123", "John Doe"); // 获取缓存 String user = getCache("user_123"); System.out.println("User: " + user); } }优点:
数据持久化到数据库,可以进行复杂的查询操作。
数据可靠性较高,支持事务。
缺点:
相比内存缓存,数据库的读写性能较慢。
需要依赖数据库引擎。
3. 使用内存缓存+定期持久化(如 Redis)
Redis 是一个非常流行的内存缓存系统,通常用于存储快速读取的数据。如果需要缓存并持久化,可以使用 Redis 提供的持久化选项(RDB 或 AOF)。
Redis 持久化方式:
RDB(快照) :在指定时间间隔内生成数据的快照进行持久化。
AOF(追加文件) :记录每次修改操作,保证数据的持久化。
示例:Redis 缓存(Spring Data Redis)
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ValueOperations; import org.springframework.stereotype.Component; @Component public class RedisCache { @Autowired private RedisTemplate<String, String> redisTemplate; // 获取缓存 public String getCache(String key) { ValueOperations<String, String> ops = redisTemplate.opsForValue(); return ops.get(key); } // 设置缓存 public void setCache(String key, String value) { ValueOperations<String, String> ops = redisTemplate.opsForValue(); ops.set(key, value); } }优点:
高性能,支持分布式缓存。
可以持久化缓存数据,提供高可用性。
缺点:
需要额外的 Redis 服务部署。
如果 Redis 配置不当,可能会丢失部分数据。
4. 使用本地内存缓存 + 恢复机制
如果数据量较小且只需要在内存中缓存,可以选择将数据存储在内存中,在程序启动时加载缓存数据。使用一个简单的文件或数据库同步机制,将内存数据持久化到文件或数据库中。
5. 程序重启后的恢复:
无论使用哪种缓存方案,在程序重启时,都需要加载之前存储的缓存数据。常见的做法是:
在启动时读取文件、数据库或 Redis 中存储的数据并恢复到内存缓存中。
根据具体的需求选择是否清理缓存,或是恢复历史数据。
本地缓存恢复机制就是:程序启动时读取缓存快照(文件、数据库)==> 恢复到内存中。
1. 缓存保存
比如你的应用里有个
Map<String, Object> localCache,要在程序运行时,把它保存到硬盘,通常在两个时机:定时保存,比如每5分钟存一次
关闭程序(Shutdown Hook)时保存一次,避免数据丢失
保存方法可以是:
JSON 文件
本地数据库(如 SQLite)
// 定时保存示例 ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); executor.scheduleAtFixedRate(() -> { saveLocalCacheToFile(); }, 5, 5, TimeUnit.MINUTES); // 程序关闭时保存 Runtime.getRuntime().addShutdownHook(new Thread(() -> { saveLocalCacheToFile(); }));2. 缓存恢复
程序启动时,先去读本地文件,把之前保存的缓存数据加载进来。
@PostConstruct public void loadCache() { Map<String, Object> cacheData = loadLocalCacheFromFile(); if (cacheData != null) { localCache.putAll(cacheData); } }简单总结流程:
场景
处理方式
程序运行中
定时保存到文件
程序退出时
保存一次到文件
程序启动时
读取文件恢复
二、使用Redis缓存时,如何做恢复?(Redis本身的持久化机制)
Redis 本身是内存数据库,为了防止宕机数据丢失,有两种主流持久化方式:
1. RDB 持久化(快照方式)
定时(比如每隔5分钟、或者每有100条写入)保存整个内存快照到磁盘(
dump.rdb)。Redis 宕机、重启后,会自动加载
dump.rdb文件,把数据恢复到内存。
特点
轻量级,适合大部分普通应用。
有可能丢失最近一小段时间(最后一次保存后)修改的数据。
配置示例(
redis.conf)save 900 1 # 900秒(15分钟)内有1次修改就保存快照 save 300 10 # 300秒内有10次修改就保存快照 save 60 10000 # 60秒内有10000次修改就保存快照2. AOF 持久化(日志方式)
把每次写命令(如 SET、HSET、LPUSH)追加到 AOF 文件。
Redis 宕机、重启后,按日志回放的方式重新执行这些指令,恢复数据。
特点
数据最完整,可以做到几乎不丢数据。
缺点是AOF文件增长快,需要后台定期压缩(重写)。
配置示例(
redis.conf)appendonly yes # 打开AOF持久化 appendfsync everysec # 每秒钟同步一次 # appendfsync always # 每次写操作都同步,最安全但最慢 # appendfsync no # 不主动同步,依赖系统(性能高但可能丢数据)3. RDB vs AOF 总结对比
持久化方式
优点
缺点
RDB
占用小,恢复快,适合冷备份
宕机时丢失最后一次快照后的数据
AOF
数据最完整,适合核心数据保护
写入频繁,占用磁盘大,恢复速度慢一点
实际项目里一般是:
RDB+AOF同时开启,互为备份。Redis支持同时打开两种模式。
Redis重启时,优先用 AOF 恢复,AOF坏了再用RDB。
三、如果是你写的应用,想结合Redis缓存做恢复,一般这样做
正常用Redis做缓存
Redis配置了RDB或AOF持久化,保障数据存储。
应用层做好容错,比如读Redis失败时,从数据库兜底,或者重新补缓存。
应用程序启动时,可以适当做一次缓存预热(预加载常用数据到缓存,提高命中率)。
示例:
@PostConstruct public void preloadCache() { // 程序启动时,把常用的设备状态、配置,提前加载到Redis里 cabinetsService.preloadCabinetsInfo(); }程序启动时,要把之前保存的缓存数据恢复回来,也就是常说的 ——
缓存恢复 / 缓存预热(Application Start Cache Recovery)
🛠 一、【本地缓存】启动恢复
假设你程序有个本地
ConcurrentHashMap<String, Object> localCache。
那你启动的时候,应该做 这三步:读取磁盘文件(比如 JSON 文件 / 本地数据库 SQLite)
反序列化成对象
putAll到内存缓存中
示例代码(假设用 JSON 文件保存的)
@Component public class LocalCacheManager { private static final Logger logger = LoggerFactory.getLogger(LocalCacheManager.class); private final ConcurrentHashMap<String, Object> localCache = new ConcurrentHashMap<>(); private static final String CACHE_FILE = "/tmp/local_cache.json"; @PostConstruct public void loadLocalCache() { File file = new File(CACHE_FILE); if (!file.exists()) { logger.warn("缓存文件不存在,跳过加载"); return; } try (FileReader reader = new FileReader(file)) { Type type = new TypeToken<Map<String, Object>>(){}.getType(); Map<String, Object> cacheFromFile = new Gson().fromJson(reader, type); if (cacheFromFile != null) { localCache.putAll(cacheFromFile); logger.info("本地缓存加载完成,条数:" + cacheFromFile.size()); } } catch (Exception e) { logger.error("本地缓存恢复失败", e); } } public Object get(String key) { return localCache.get(key); } }📌 注意:
@PostConstruct:Spring容器初始化后自动调用如果是复杂对象(比如设备状态类),反序列化的时候需要注意泛型
启动时检查Redis关键数据是否存在
缺失的话,要去数据库/接口补充加载到Redis里
示例代码(Redis缓存预热)
@Component public class RedisCachePreloader { private static final Logger logger = LoggerFactory.getLogger(RedisCachePreloader.class); @Autowired private StringRedisTemplate redisTemplate; @Autowired private CabinetsService cabinetsService; // 自己的业务服务,查DB @PostConstruct public void preloadCache() { logger.info("启动时预热Redis缓存..."); List<CabinetsInfo> cabinetsList = cabinetsService.queryAllCabinets(); for (CabinetsInfo info : cabinetsList) { if (!Boolean.TRUE.equals(redisTemplate.hasKey(info.getCabinetId()))) { redisTemplate.opsForHash().put("cabinetInfo", info.getCabinetId(), info.getServerAddr()); } } logger.info("Redis缓存预热完成,数量:" + cabinetsList.size()); } }(缓存恢复完整流程)
[程序启动] ↓ 【本地缓存】 - 读取缓存快照文件(JSON、SQLite) - 反序列化 - 填充ConcurrentHashMap 【Redis缓存】 - Redis自动恢复(RDB/AOF) - 程序检测关键数据 - 缓存缺失时补充预热