NamedCache
NamedCache
是 ThreadCache
的内部实现,是 Kafka Streams 中有名字的、服务于单个 Task 的、带有 LRU(最近最少使用)淘汰策略的写缓存(Write-Through/Write-Back Cache)。
- 有名字的 (Named): 每个缓存实例都有一个唯一的名称,通常与它服务的 State Store(状态存储)相关联。例如,一个名为 "my-aggregation-store" 的 KTable 会有一个对应的名为 "my-aggregation-store" 的
NamedCache
。 - 服务于单个 Task: 在 Kafka Streams 的线程模型中,每个 Task 拥有自己独立的
ThreadCache
,而ThreadCache
内部会为每个有状态的 Processor 创建一个NamedCache
。这意味着缓存是 Task 级别的,不存在跨 Task 的缓存共享。 - LRU 淘汰策略: 当缓存大小达到上限时,会优先淘汰掉最近最少被访问的数据,以保证缓存中保留的是热点数据。
- 写缓存: 它的主要目的是批处理和缓冲对底层状态存储(如 RocksDB)的写入操作。通过将多次小的写入合并为一次大的写入,可以显著提升性能,减少对磁盘 I/O 的压力。
核心数据结构:如何同时实现快速查找和 LRU?
NamedCache
的精髓在于它巧妙地结合了两种数据结构,以同时满足“按 Key 快速查找”和“高效实现 LRU 顺序”这两个需求。
// ... existing code ...
class NamedCache {
// ...
// 结构 1: 用于按 Key 快速查找
private final NavigableMap<Bytes, LRUNode> cache = new TreeMap<>();
// 结构 2: 用于记录脏数据
private final Set<Bytes> dirtyKeys = new LinkedHashSet<>();
// 结构 3: 用于维护 LRU 顺序的双向链表
private LRUNode tail;
private LRUNode head;
// ...
}
NavigableMap<Bytes, LRUNode> cache = new TreeMap<>()
:- 作用: 这是缓存的主体存储,负责按 Key 快速查找。
TreeMap
提供了 O(logN) 的查找、插入和删除效率,并且由于它是NavigableMap
,还能支持范围查询(keyRange
)。 - Key:
Bytes
类型,是 Kafka 中对byte[]
的封装。 - Value:
LRUNode
,这是实现 LRU 的关键。它不仅仅存储了值,还包含了指向前后节点的指针。
- 作用: 这是缓存的主体存储,负责按 Key 快速查找。
LRUNode head
和LRUNode tail
:- 作用: 这两个指针分别指向一个手动管理的双向链表的头部和尾部。这个链表就是 LRU 顺序的体现。
head
: 指向最近刚被使用的节点(Most Recently Used)。tail
: 指向最久未被使用的节点(Least Recently Used),是驱逐(evict)操作的首要目标。
Set<Bytes> dirtyKeys = new LinkedHashSet<>()
:- 作用: 专门用来跟踪哪些 Key 对应的数据是“脏”的(即缓存中的数据比底层存储更新,需要被写回)。
LinkedHashSet
: 使用这个结构既能保证 Key 的唯一性,又能维持插入顺序。这在flush
操作中很重要,可以按数据到来的顺序写回。
这三个结构协同工作,构成了一个功能完备的高性能缓存。
LRU 策略的实现 (get
, put
, updateLRU
)
LRU 的核心是:任何一次成功的 get
或 put
操作,都应该将对应的节点移动到双向链表的头部。
get(Bytes key)
:- 通过
cache.get(key)
在TreeMap
中查找LRUNode
。 - 如果找到(缓存命中
hit
),调用updateLRU(node)
。 - 如果未找到(缓存未命中
miss
),返回null
。
- 通过
put(Bytes key, LRUCacheEntry value)
:- 检查
cache
中是否存在该key
。 - 如果存在(覆盖写): 更新
LRUNode
中的value
,并调用updateLRU(node)
将其移到链表头部。 - 如果不存在(新插入): 创建一个新的
LRUNode
,调用putHead(node)
将其插入到链表头部,并将其存入cache
这个TreeMap
中。
- 检查
updateLRU(LRUNode node)
: 这是 LRU 的核心动作。// ... existing code ... private void updateLRU(final LRUNode node) { // 1. 从链表的当前位置断开 remove(node); // 2. 将其重新链接到链表头部 putHead(node); } // ... existing code ...
remove
和putHead
是操作双向链表指针的标准方法,通过修改next
和previous
指针来完成节点的移动。
脏数据管理与刷新 (put
, flush
)
标记为脏 (
put
):// ... existing code ... if (value.isDirty()) { // first remove and then add so we can maintain ordering as the arrival order of the records. dirtyKeys.remove(key); dirtyKeys.add(key); } // ... existing code ...
当一个带有
isDirty()
标记的LRUCacheEntry
被put
进缓存时,它的key
就会被添加到dirtyKeys
集合中。刷新 (
flush
):flush
是将脏数据写回底层存储的关键操作。它由外部的ThreadCache
在特定时机(如 Task 提交commit
时)触发。// ... existing code ... private void flush(final LRUNode evicted) { // ... if (listener == null) { throw new IllegalArgumentException("No listener for namespace " + name + " registered with cache"); } // ... final List<ThreadCache.DirtyEntry> entries = new ArrayList<>(); // ... // 遍历所有脏键 for (final Bytes key : dirtyKeys) { final LRUNode node = getInternal(key); // ... entries.add(new ThreadCache.DirtyEntry(key, node.entry.value(), node.entry)); node.entry.markClean(); // 将条目标记为干净 if (node.entry.value() == null) { deleted.add(node.key); // 如果是删除操作,记录下来 } } // 在调用监听器前清空脏键集合,防止重入问题 dirtyKeys.clear(); // 调用监听器,将脏数据批量写回 listener.apply(entries); // 从缓存中移除已标记为删除的条目 for (final Bytes key : deleted) { delete(key); } } // ... existing code ...
核心流程:
- 检查
listener
是否存在。这个listener
通常就是ThreadCache
,它知道如何与底层的 State Store 交互。 - 遍历
dirtyKeys
集合,从cache
中找出对应的LRUNode
。 - 将这些脏条目(DirtyEntry)收集到一个
List
中。 - 调用
listener.apply(entries)
,将整个列表一次性传递给监听器,由监听器负责批量写入底层存储。 - 将已写入的条目从
dirtyKeys
中移除,并将其在缓存中的状态标记为clean
。
- 检查
驱逐 (evict
)
当 ThreadCache
发现总内存使用量超限时,会调用 NamedCache
的 evict()
方法来释放空间。
// ... existing code ...
synchronized void evict() {
if (tail == null) { // 如果缓存为空,则不执行任何操作
return;
}
// 1. 定位最久未使用的条目
final LRUNode eldest = tail;
currentSizeBytes -= eldest.size();
// 2. 从双向链表中移除
remove(eldest);
// 3. 从 TreeMap 中移除
cache.remove(eldest.key);
// 4. 如果被驱逐的条目是脏的,必须先将其刷新到底层存储
if (eldest.entry.isDirty()) {
flush(eldest);
}
totalCacheSizeSensor.record(currentSizeBytes);
}
// ... existing code ...
核心流程:
- 找到双向链表的尾部
tail
,这就是“最老”的条目。 - 从双向链表 (
remove(eldest)
) 和TreeMap
(cache.remove(eldest.key)
) 中移除这个节点。 - 关键一步:检查被驱逐的条目是否是脏的 (
isDirty()
)。如果是,必须在丢弃它之前调用flush(eldest)
将其数据持久化,否则会造成数据丢失。这是一个典型的**写回缓存(Write-Back Cache)**行为。
范围查询能力
我们来深入探讨 TreeMap
带来的另一个至关重要的能力:支持高效的范围查询。
与 HashMap
不同,HashMap
提供 O(1) 的平均查找时间,但其内部元素是无序的。而 TreeMap
是一个基于红黑树的实现,它保证了其内部的 Key 始终处于排序状态。这个排序特性是实现范围查询的基础。
NamedCache
利用了 TreeMap
实现的 NavigableMap
接口,提供了 keyRange
和 reverseKeyRange
方法。
// ... existing code ...
synchronized Iterator<Bytes> keyRange(final Bytes from, final Bytes to, final boolean toInclusive) {
final Set<Bytes> rangeSet = computeSubSet(from, to, toInclusive);
return keySetIterator(rangeSet, true);
}
synchronized Iterator<Bytes> reverseKeyRange(final Bytes from, final Bytes to) {
final Set<Bytes> rangeSet = computeSubSet(from, to, true);
return keySetIterator(rangeSet, false);
}
private Set<Bytes> computeSubSet(final Bytes from, final Bytes to, final boolean toInclusive) {
if (from == null && to == null) {
return cache.navigableKeySet();
} else if (from == null) {
return cache.headMap(to, toInclusive).keySet();
} else if (to == null) {
return cache.tailMap(from, true).keySet();
} else if (from.compareTo(to) > 0) {
return Collections.emptyNavigableSet();
} else {
return cache.navigableKeySet().subSet(from, true, to, toInclusive);
}
}
// ... existing code ...
computeSubSet
方法: 这个方法是核心。它直接利用NavigableMap
提供的headMap
(小于某个key)、tailMap
(大于等于某个key)和subSet
(介于两个key之间)等方法。因为TreeMap
内部是有序的,所以这些操作非常高效,可以快速定位到范围的起始点,然后顺序遍历直到结束点,而无需扫描整个集合。keyRange
和reverseKeyRange
: 这两个方法封装了computeSubSet
,并提供了正向和反向的迭代器,满足不同的查询需求。
结论:选择 TreeMap
而不是 HashMap
,是 Kafka Streams 在设计 NamedCache
时做出的一个关键权衡。它牺牲了单点查询理论上从 O(1) 到 O(logN) 的性能(在实践中差异可能不大),但换来了对范围查询的高效支持,而这个功能对于 Kafka Streams 的很多核心场景是不可或缺的。
tree map 摊销 O(1) 移动到下一个:通过节点内部的 parent 和 left/right 指针,迭代器可以快速地找到当前节点的后继,而无需从根节点重新搜索。
为什么 Kafka Streams 需要范围查询?
范围查询是 Kafka Streams 交互式查询(Interactive Queries) 和 窗口化操作(Windowing) 的核心基础。如果没有高效的范围查询,很多高级功能将无法实现或性能极差。
1. 窗口存储(Window Stores)
这是最典型的应用场景。当我们对一个流进行窗口化聚合时(例如,每分钟计算一次用户点击量),状态存储的 Key 通常会包含业务 Key 和 时间戳。
例如,一个 TumblingWindows.of(Duration.ofMinutes(1))
的窗口操作,其存储在 RocksDB 和缓存中的 Key 结构可能如下:
Key("user-A", 12:00:00)
->Value(10)
Key("user-A", 12:01:00)
->Value(15)
Key("user-B", 12:00:00)
->Value(5)
Key("user-A", 12:02:00)
->Value(8)
现在,如果我们想查询 "user-A" 在 12:00:00
到 12:01:30
之间的所有活动,就需要一个范围查询:
lowerBound
=Key("user-A", 12:00:00)
upperBound
=Key("user-A", 12:01:30)
NamedCache
(以及底层的 RocksDB)可以利用其排序特性,高效地返回 Key("user-A", 12:00:00)
和 Key("user-A", 12:01:00)
这两条记录,而无需遍历所有用户的记录。
2. 交互式查询(Interactive Queries)
Kafka Streams 允许外部应用通过 RPC 等方式直接查询流处理应用内部的状态,这就是交互式查询。RangeQuery
和 WindowRangeQuery
是两种非常常见的查询类型。
// ... existing code ...
/**
* Interactive query for issuing range queries and scans over KeyValue stores.
* <p>
* A range query retrieves a set of records, specified using an upper and/or lower bound on the keys.
* <p>
* A scan query retrieves all records contained in the store.
* <p>
* Keys' order is based on the serialized byte[] of the keys, not the 'logical' key order.
* @param <K> Type of keys
* @param <V> Type of values
*/
@Evolving
public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
// ... existing code ...
当一个外部请求到达,需要查询某个 Key 区间的数据时(例如,查询所有以 "user-prefix-" 开头的用户数据),这个查询最终会下推到 StateStore
,并首先在 NamedCache
中执行。NamedCache
的 keyRange
方法能够快速地从缓存中返回满足条件的数据,只有当缓存中数据不完整时,才需要穿透到 RocksDB 去查询。
3. KTable 的范围扫描
对于一个 KTable
,我们不仅可以做单点查询 (get
),也可以做范围扫描 (range
)。这在很多业务场景中都很有用,比如:
- 获取某个订单号区间的所有订单详情。
- 获取某个地理区域编码范围内的所有门店信息。
这些操作都直接依赖于底层状态存储和其缓存(NamedCache
)的范围查询能力。
TreeMap
的使用是为了赋予缓存与底层 RocksDB(本身就是一种有序的 KV 存储)相匹配的范围查询能力。这种能力是实现 Kafka Streams 窗口化操作和交互式查询等高级功能的基石,使得 Kafka Streams 不仅仅是一个简单的流式计算引擎,更是一个功能强大的、可查询的流式数据库。
与外部的交互
ThreadCache
: 是NamedCache
的直接管理者。ThreadCache
负责:- 创建和销毁
NamedCache
实例。 - 根据总内存使用情况,决定何时调用
evict()
。 - 在 Task
commit
时,调用flush()
。 - 作为
DirtyEntryFlushListener
,接收flush
出来的数据并写入 RocksDB。
- 创建和销毁
StreamsMetricsImpl
:NamedCache
在构造时会注册多个Sensor
,用于监控自身的性能指标,如缓存命中率 (hitRatioSensor
)、缓存大小 (totalCacheSizeSensor
) 等。这些指标最终会通过 JMX 暴露出来,便于运维和监控。
总结
NamedCache
是一个设计精良、职责单一的组件。它通过TreeMap
+ 手动管理双向链表**的经典组合,高效地实现了支持范围查询的 LRU 缓存。同时,通过 dirtyKeys
集合和 flush
监听器机制,实现了高性能的批量写回功能,并通过与 ThreadCache
的协作,完成了内存控制和数据持久化的生命周期管理,是 Kafka Streams 高性能状态操作的基石。
ThreadCache
ThreadCache
是 Kafka Streams 中线程级别的缓存管理器。在 Kafka Streams 应用中,每个 StreamThread
都有且仅有一个 ThreadCache
实例。它的主要作用是:
- 统一管理内存: 它负责管理一个
StreamThread
所处理的所有 Task 的缓存总大小,确保其不超过配置的阈值 (cache.max.bytes.buffering
)。 - 作为
NamedCache
的工厂和容器: 它根据需要创建、存储和管理多个NamedCache
实例。 - 执行驱逐策略: 当总缓存大小超限时,它负责决定从哪个
NamedCache
中驱逐数据。 - 协调刷新操作: 它充当
NamedCache
和底层 State Store 之间的桥梁,协调flush
(刷新)操作。
简单来说,如果把 NamedCache
比作一个部门的储物柜,那么 ThreadCache
就是管理整栋办公楼所有部门储物柜的物业管理员。它不关心每个储物柜里具体放了什么,但它关心所有储物柜占用的总空间,并负责在空间不足时进行清理。
核心数据结构
ThreadCache
的内部结构相对简单,其核心是管理 NamedCache
的容器和一些统计/配置变量。
// ... existing code ...
public class ThreadCache {
// ...
// 1. 缓存大小上限,由 'cache.max.bytes.buffering' / num.stream.threads 决定
private volatile long maxCacheSizeBytes;
// 2. 核心容器,存储所有 NamedCache 实例
private final Map<String, NamedCache> caches = new HashMap<>();
// 3. 总缓存大小的原子计数器,保证多 Task 操作下的线程安全
private final AtomicLong sizeInBytes = new AtomicLong();
// ... 各种统计计数器 ...
}
maxCacheSizeBytes
: 这是ThreadCache
的内存容量上限。它是一个volatile
变量,意味着可以被动态调整(通过resize
方法)。Map<String, NamedCache> caches
: 这是ThreadCache
的核心容器。- Key (String): 缓存的命名空间(
namespace
)。这个namespace
通常由taskId
和storeName
拼接而成(例如"0_1-my-store"
),确保了每个 Task 的每个 State Store 都有一个唯一的缓存实例。 - Value (
NamedCache
): 之前我们详细分析过的NamedCache
实例。
- Key (String): 缓存的命名空间(
AtomicLong sizeInBytes
: 这是一个非常重要的字段。它实时跟踪该ThreadCache
管理的所有NamedCache
的总大小之和。使用AtomicLong
是为了确保在多 Task 并发访问(虽然StreamThread
是单线程处理 Task,但这里的原子性更多是为了保证内存可见性和操作的原子性)时,对总大小的更新是线程安全的。
内存管理与驱逐策略 (put
, maybeEvict
, resize
)
这是 ThreadCache
最核心的职责。
put(String namespace, Bytes key, LRUCacheEntry value)
:- 调用
getOrCreateCache(namespace)
获取或创建一个NamedCache
。 - 对该
NamedCache
加锁,防止并发修改。 - 调用
cache.put(key, value)
将数据放入NamedCache
。 - 更新总大小
sizeInBytes
。 - 调用
maybeEvict(namespace, cache)
检查是否需要驱逐。
- 调用
maybeEvict(String namespace, NamedCache cache)
:// ... existing code ... private void maybeEvict(final String namespace, final NamedCache cache) { int numEvicted = 0; while (sizeInBytes.get() > maxCacheSizeBytes) { // ... if (cache.isEmpty()) { return; } final long oldSize = cache.sizeInBytes(); cache.evict(); // 调用 NamedCache 的驱逐方法 sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize); numEvicts++; numEvicted++; } // ... }
驱逐逻辑: 当
put
操作导致总缓存大小sizeInBytes
超过maxCacheSizeBytes
时,会立即在当前正在被操作的NamedCache
中循环调用evict()
方法,直到总大小降到阈值以下。这是一种简单直接的驱逐策略:谁惹的祸,谁来承担。resize(long newCacheSizeBytes)
:// ... existing code ... public synchronized void resize(final long newCacheSizeBytes) { final boolean shrink = newCacheSizeBytes < maxCacheSizeBytes; maxCacheSizeBytes = newCacheSizeBytes; if (shrink) { // ... // 使用循环迭代器,公平地从每个 NamedCache 中驱逐数据 final CircularIterator<NamedCache> circularIterator = new CircularIterator<>(caches.values()); while (sizeInBytes.get() > maxCacheSizeBytes) { final NamedCache cache = circularIterator.next(); synchronized (cache) { // ... cache.evict(); // ... } numEvicts++; } } // ... }
缩容驱逐逻辑: 当缓存被动态缩小时,
ThreadCache
会采用一种更公平的策略。它使用一个CircularIterator
(循环迭代器)来轮流从所有的NamedCache
中驱逐条目,每个一次,直到总大小满足新的、更小的限制。这避免了因缩容而只清空某一个NamedCache
的问题。
NamedCache
的生命周期管理 (getOrCreateCache
, close
, clear
)
getOrCreateCache(String name)
: 这是一个典型的 "Factory" 和 "Singleton"(在ThreadCache
范围内)模式的结合。当一个 Task 首次访问其 State Store 的缓存时,ThreadCache
会为其创建一个新的NamedCache
实例并存入caches
这个 Map 中。后续的访问将直接返回已存在的实例。close(String namespace)
: 当一个 Task 被关闭或迁移时,StreamThread
会调用此方法。它会从caches
Map 中移除对应的NamedCache
,更新总缓存大小,并调用namedCache.close()
释放其占用的资源(主要是清空内部数据结构)。clear(String namespace)
: 这个方法用于清空某个NamedCache
的内容,但不移除NamedCache
实例本身。
刷新(Flush)协调 (flush
, addDirtyEntryFlushListener
)
ThreadCache
是 flush
机制的协调者。
addDirtyEntryFlushListener(String namespace, DirtyEntryFlushListener listener)
:CachingKeyValueStore
或其他缓存包装类在初始化时,会调用此方法将自己注册为对应NamedCache
的监听器。这个监听器知道如何将数据写入底层的 RocksDB。flush(String namespace)
: 当StreamTask
执行commit
操作时,会触发对其所有 State Store 的flush
调用,最终会调用到ThreadCache.flush()
。- 找到对应的
NamedCache
。 - 对其加锁。
- 调用
namedCache.flush()
。 namedCache.flush()
内部会收集所有脏数据,并调用之前注册的DirtyEntryFlushListener
的apply
方法,将脏数据回传给CachingKeyValueStore
,由后者完成最终的磁盘写入。
- 找到对应的
与外部的交互
StreamThread
/StreamTask
:ThreadCache
的生命周期与StreamThread
绑定。StreamTask
是ThreadCache
的主要使用者,通过它进行put
,get
,delete
,flush
等操作。CachingKeyValueStore
(及其他缓存包装类): 这些类是ThreadCache
的直接客户。它们将底层的 State Store 和ThreadCache
包装在一起,向上层(Processor API)提供一个统一的、带缓存的存储接口。它们负责将自己注册为DirtyEntryFlushListener
。NamedCache
:ThreadCache
是NamedCache
的管理者和容器,负责其创建、销毁、驱逐和刷新调度。
总结
ThreadCache
在 Kafka Streams 的内存管理中扮演着“中层管理者”的角色。它本身不直接存储数据,而是通过管理一组 NamedCache
来实现对整个线程的缓存进行统一的容量控制、生命周期管理和策略执行(如驱逐和刷新)。它通过命名空间(namespace
)隔离了不同 Task 和 State Store 的缓存,同时又通过全局的字节计数器实现了对总内存的统一监控和管理,是实现 cache.max.bytes.buffering
配置的关键组件。