揭秘KafkaStreams 线程缓存:NamedCache深度解析

发布于:2025-09-12 ⋅ 阅读:(19) ⋅ 点赞:(0)

NamedCache

NamedCache 是 ThreadCache 的内部实现,是 Kafka Streams 中有名字的、服务于单个 Task 的、带有 LRU(最近最少使用)淘汰策略的写缓存(Write-Through/Write-Back Cache)

  • 有名字的 (Named): 每个缓存实例都有一个唯一的名称,通常与它服务的 State Store(状态存储)相关联。例如,一个名为 "my-aggregation-store" 的 KTable 会有一个对应的名为 "my-aggregation-store" 的 NamedCache
  • 服务于单个 Task: 在 Kafka Streams 的线程模型中,每个 Task 拥有自己独立的 ThreadCache,而 ThreadCache 内部会为每个有状态的 Processor 创建一个 NamedCache。这意味着缓存是 Task 级别的,不存在跨 Task 的缓存共享。
  • LRU 淘汰策略: 当缓存大小达到上限时,会优先淘汰掉最近最少被访问的数据,以保证缓存中保留的是热点数据。
  • 写缓存: 它的主要目的是批处理和缓冲对底层状态存储(如 RocksDB)的写入操作。通过将多次小的写入合并为一次大的写入,可以显著提升性能,减少对磁盘 I/O 的压力。

核心数据结构:如何同时实现快速查找和 LRU?

NamedCache 的精髓在于它巧妙地结合了两种数据结构,以同时满足“按 Key 快速查找”和“高效实现 LRU 顺序”这两个需求。

// ... existing code ...
class NamedCache {
    // ...
    // 结构 1: 用于按 Key 快速查找
    private final NavigableMap<Bytes, LRUNode> cache = new TreeMap<>();
    // 结构 2: 用于记录脏数据
    private final Set<Bytes> dirtyKeys = new LinkedHashSet<>();
    // 结构 3: 用于维护 LRU 顺序的双向链表
    private LRUNode tail;
    private LRUNode head;
    // ...
}
  1. NavigableMap<Bytes, LRUNode> cache = new TreeMap<>():

    • 作用: 这是缓存的主体存储,负责按 Key 快速查找。TreeMap 提供了 O(logN) 的查找、插入和删除效率,并且由于它是 NavigableMap,还能支持范围查询(keyRange)。
    • KeyBytes 类型,是 Kafka 中对 byte[] 的封装。
    • ValueLRUNode,这是实现 LRU 的关键。它不仅仅存储了值,还包含了指向前后节点的指针。
  2. LRUNode head 和 LRUNode tail:

    • 作用: 这两个指针分别指向一个手动管理的双向链表的头部和尾部。这个链表就是 LRU 顺序的体现。
    • head: 指向最近刚被使用的节点(Most Recently Used)。
    • tail: 指向最久未被使用的节点(Least Recently Used),是驱逐(evict)操作的首要目标。
  3. Set<Bytes> dirtyKeys = new LinkedHashSet<>():

    • 作用: 专门用来跟踪哪些 Key 对应的数据是“脏”的(即缓存中的数据比底层存储更新,需要被写回)。
    • LinkedHashSet: 使用这个结构既能保证 Key 的唯一性,又能维持插入顺序。这在 flush 操作中很重要,可以按数据到来的顺序写回。

这三个结构协同工作,构成了一个功能完备的高性能缓存。

LRU 策略的实现 (getputupdateLRU)

LRU 的核心是:任何一次成功的 get 或 put 操作,都应该将对应的节点移动到双向链表的头部。

  • get(Bytes key):

    1. 通过 cache.get(key) 在 TreeMap 中查找 LRUNode
    2. 如果找到(缓存命中 hit),调用 updateLRU(node)
    3. 如果未找到(缓存未命中 miss),返回 null
  • put(Bytes key, LRUCacheEntry value):

    1. 检查 cache 中是否存在该 key
    2. 如果存在(覆盖写): 更新 LRUNode 中的 value,并调用 updateLRU(node) 将其移到链表头部。
    3. 如果不存在(新插入): 创建一个新的 LRUNode,调用 putHead(node) 将其插入到链表头部,并将其存入 cache 这个 TreeMap 中。
  • updateLRU(LRUNode node): 这是 LRU 的核心动作。

    // ... existing code ...
    private void updateLRU(final LRUNode node) {
        // 1. 从链表的当前位置断开
        remove(node);
        // 2. 将其重新链接到链表头部
        putHead(node);
    }
    // ... existing code ...
    

    remove 和 putHead 是操作双向链表指针的标准方法,通过修改 next 和 previous 指针来完成节点的移动。

脏数据管理与刷新 (putflush)
  • 标记为脏 (put):

    // ... existing code ...
        if (value.isDirty()) {
            // first remove and then add so we can maintain ordering as the arrival order of the records.
            dirtyKeys.remove(key);
            dirtyKeys.add(key);
        }
    // ... existing code ...
    

    当一个带有 isDirty() 标记的 LRUCacheEntry 被 put 进缓存时,它的 key 就会被添加到 dirtyKeys 集合中。

  • 刷新 (flush)flush 是将脏数据写回底层存储的关键操作。它由外部的 ThreadCache 在特定时机(如 Task 提交 commit 时)触发。

     
    // ... existing code ...
    private void flush(final LRUNode evicted) {
        // ...
        if (listener == null) {
            throw new IllegalArgumentException("No listener for namespace " + name + " registered with cache");
        }
        // ...
        final List<ThreadCache.DirtyEntry> entries = new ArrayList<>();
        // ...
        // 遍历所有脏键
        for (final Bytes key : dirtyKeys) {
            final LRUNode node = getInternal(key);
            // ...
            entries.add(new ThreadCache.DirtyEntry(key, node.entry.value(), node.entry));
            node.entry.markClean(); // 将条目标记为干净
            if (node.entry.value() == null) {
                deleted.add(node.key); // 如果是删除操作,记录下来
            }
        }
        // 在调用监听器前清空脏键集合,防止重入问题
        dirtyKeys.clear();
        // 调用监听器,将脏数据批量写回
        listener.apply(entries);
        // 从缓存中移除已标记为删除的条目
        for (final Bytes key : deleted) {
            delete(key);
        }
    }
    // ... existing code ...
    

    核心流程:

    1. 检查 listener 是否存在。这个 listener 通常就是 ThreadCache,它知道如何与底层的 State Store 交互。
    2. 遍历 dirtyKeys 集合,从 cache 中找出对应的 LRUNode
    3. 将这些脏条目(DirtyEntry)收集到一个 List 中。
    4. 调用 listener.apply(entries),将整个列表一次性传递给监听器,由监听器负责批量写入底层存储。
    5. 将已写入的条目从 dirtyKeys 中移除,并将其在缓存中的状态标记为 clean
驱逐 (evict)

当 ThreadCache 发现总内存使用量超限时,会调用 NamedCache 的 evict() 方法来释放空间。

// ... existing code ...
    synchronized void evict() {
        if (tail == null) { // 如果缓存为空,则不执行任何操作
            return;
        }
        // 1. 定位最久未使用的条目
        final LRUNode eldest = tail;
        currentSizeBytes -= eldest.size();
        // 2. 从双向链表中移除
        remove(eldest);
        // 3. 从 TreeMap 中移除
        cache.remove(eldest.key);
        // 4. 如果被驱逐的条目是脏的,必须先将其刷新到底层存储
        if (eldest.entry.isDirty()) {
            flush(eldest);
        }
        totalCacheSizeSensor.record(currentSizeBytes);
    }
// ... existing code ...

核心流程:

  1. 找到双向链表的尾部 tail,这就是“最老”的条目。
  2. 从双向链表 (remove(eldest)) 和 TreeMap (cache.remove(eldest.key)) 中移除这个节点。
  3. 关键一步:检查被驱逐的条目是否是脏的 (isDirty())。如果是,必须在丢弃它之前调用 flush(eldest) 将其数据持久化,否则会造成数据丢失。这是一个典型的**写回缓存(Write-Back Cache)**行为。

范围查询能力

我们来深入探讨 TreeMap 带来的另一个至关重要的能力:支持高效的范围查询

与 HashMap 不同,HashMap 提供 O(1) 的平均查找时间,但其内部元素是无序的。而 TreeMap 是一个基于红黑树的实现,它保证了其内部的 Key 始终处于排序状态。这个排序特性是实现范围查询的基础。

NamedCache 利用了 TreeMap 实现的 NavigableMap 接口,提供了 keyRange 和 reverseKeyRange 方法。

// ... existing code ...
    synchronized Iterator<Bytes> keyRange(final Bytes from, final Bytes to, final boolean toInclusive) {
        final Set<Bytes> rangeSet = computeSubSet(from, to, toInclusive);
        return keySetIterator(rangeSet, true);
    }

    synchronized Iterator<Bytes> reverseKeyRange(final Bytes from, final Bytes to) {
        final Set<Bytes> rangeSet = computeSubSet(from, to, true);
        return keySetIterator(rangeSet, false);
    }

    private Set<Bytes> computeSubSet(final Bytes from, final Bytes to, final boolean toInclusive) {
        if (from == null && to == null) {
            return cache.navigableKeySet();
        } else if (from == null) {
            return cache.headMap(to, toInclusive).keySet();
        } else if (to == null) {
            return cache.tailMap(from, true).keySet();
        } else if (from.compareTo(to) > 0) {
            return Collections.emptyNavigableSet();
        } else {
            return cache.navigableKeySet().subSet(from, true, to, toInclusive);
        }
    }
// ... existing code ...
  • computeSubSet 方法: 这个方法是核心。它直接利用 NavigableMap 提供的 headMap(小于某个key)、tailMap(大于等于某个key)和 subSet(介于两个key之间)等方法。因为 TreeMap 内部是有序的,所以这些操作非常高效,可以快速定位到范围的起始点,然后顺序遍历直到结束点,而无需扫描整个集合。
  • keyRange 和 reverseKeyRange: 这两个方法封装了 computeSubSet,并提供了正向和反向的迭代器,满足不同的查询需求。

结论:选择 TreeMap 而不是 HashMap,是 Kafka Streams 在设计 NamedCache 时做出的一个关键权衡。它牺牲了单点查询理论上从 O(1) 到 O(logN) 的性能(在实践中差异可能不大),但换来了对范围查询的高效支持,而这个功能对于 Kafka Streams 的很多核心场景是不可或缺的。

tree map 摊销 O(1) 移动到下一个:通过节点内部的 parent 和 left/right 指针,迭代器可以快速地找到当前节点的后继,而无需从根节点重新搜索。


为什么 Kafka Streams 需要范围查询?

范围查询是 Kafka Streams 交互式查询(Interactive Queries) 和 窗口化操作(Windowing) 的核心基础。如果没有高效的范围查询,很多高级功能将无法实现或性能极差。

1. 窗口存储(Window Stores)

这是最典型的应用场景。当我们对一个流进行窗口化聚合时(例如,每分钟计算一次用户点击量),状态存储的 Key 通常会包含业务 Key 和 时间戳

例如,一个 TumblingWindows.of(Duration.ofMinutes(1)) 的窗口操作,其存储在 RocksDB 和缓存中的 Key 结构可能如下:

  • Key("user-A", 12:00:00) -> Value(10)
  • Key("user-A", 12:01:00) -> Value(15)
  • Key("user-B", 12:00:00) -> Value(5)
  • Key("user-A", 12:02:00) -> Value(8)

现在,如果我们想查询 "user-A" 在 12:00:00 到 12:01:30 之间的所有活动,就需要一个范围查询:

  • lowerBound = Key("user-A", 12:00:00)
  • upperBound = Key("user-A", 12:01:30)

NamedCache(以及底层的 RocksDB)可以利用其排序特性,高效地返回 Key("user-A", 12:00:00) 和 Key("user-A", 12:01:00) 这两条记录,而无需遍历所有用户的记录。

2. 交互式查询(Interactive Queries)

Kafka Streams 允许外部应用通过 RPC 等方式直接查询流处理应用内部的状态,这就是交互式查询。RangeQuery 和 WindowRangeQuery 是两种非常常见的查询类型。

// ... existing code ...
/**
 * Interactive query for issuing range queries and scans over KeyValue stores.
 * <p>
 *  A range query retrieves a set of records, specified using an upper and/or lower bound on the keys.
 * <p>
 *  A scan query retrieves all records contained in the store.
 * <p>
 *  Keys' order is based on the serialized byte[] of the keys, not the 'logical' key order.
 * @param <K> Type of keys
 * @param <V> Type of values
 */
@Evolving
public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
// ... existing code ...

当一个外部请求到达,需要查询某个 Key 区间的数据时(例如,查询所有以 "user-prefix-" 开头的用户数据),这个查询最终会下推到 StateStore,并首先在 NamedCache 中执行。NamedCache 的 keyRange 方法能够快速地从缓存中返回满足条件的数据,只有当缓存中数据不完整时,才需要穿透到 RocksDB 去查询。

3. KTable 的范围扫描

对于一个 KTable,我们不仅可以做单点查询 (get),也可以做范围扫描 (range)。这在很多业务场景中都很有用,比如:

  • 获取某个订单号区间的所有订单详情。
  • 获取某个地理区域编码范围内的所有门店信息。

这些操作都直接依赖于底层状态存储和其缓存(NamedCache)的范围查询能力。

TreeMap 的使用是为了赋予缓存与底层 RocksDB(本身就是一种有序的 KV 存储)相匹配的范围查询能力。这种能力是实现 Kafka Streams 窗口化操作和交互式查询等高级功能的基石,使得 Kafka Streams 不仅仅是一个简单的流式计算引擎,更是一个功能强大的、可查询的流式数据库。


与外部的交互

  • ThreadCache: 是 NamedCache 的直接管理者。ThreadCache 负责:
    • 创建和销毁 NamedCache 实例。
    • 根据总内存使用情况,决定何时调用 evict()
    • 在 Task commit 时,调用 flush()
    • 作为 DirtyEntryFlushListener,接收 flush 出来的数据并写入 RocksDB。
  • StreamsMetricsImplNamedCache 在构造时会注册多个 Sensor,用于监控自身的性能指标,如缓存命中率 (hitRatioSensor)、缓存大小 (totalCacheSizeSensor) 等。这些指标最终会通过 JMX 暴露出来,便于运维和监控。

总结

NamedCache 是一个设计精良、职责单一的组件。它通过TreeMap + 手动管理双向链表**的经典组合,高效地实现了支持范围查询的 LRU 缓存。同时,通过 dirtyKeys 集合和 flush 监听器机制,实现了高性能的批量写回功能,并通过与 ThreadCache 的协作,完成了内存控制和数据持久化的生命周期管理,是 Kafka Streams 高性能状态操作的基石。

ThreadCache 

ThreadCache 是 Kafka Streams 中线程级别的缓存管理器。在 Kafka Streams 应用中,每个 StreamThread 都有且仅有一个 ThreadCache 实例。它的主要作用是:

  • 统一管理内存: 它负责管理一个 StreamThread 所处理的所有 Task 的缓存总大小,确保其不超过配置的阈值 (cache.max.bytes.buffering)。
  • 作为 NamedCache 的工厂和容器: 它根据需要创建、存储和管理多个 NamedCache 实例。
  • 执行驱逐策略: 当总缓存大小超限时,它负责决定从哪个 NamedCache 中驱逐数据。
  • 协调刷新操作: 它充当 NamedCache 和底层 State Store 之间的桥梁,协调 flush(刷新)操作。

简单来说,如果把 NamedCache 比作一个部门的储物柜,那么 ThreadCache 就是管理整栋办公楼所有部门储物柜的物业管理员。它不关心每个储物柜里具体放了什么,但它关心所有储物柜占用的总空间,并负责在空间不足时进行清理。


核心数据结构

ThreadCache 的内部结构相对简单,其核心是管理 NamedCache 的容器和一些统计/配置变量。

// ... existing code ...
public class ThreadCache {
    // ...
    // 1. 缓存大小上限,由 'cache.max.bytes.buffering' / num.stream.threads 决定
    private volatile long maxCacheSizeBytes;
    // 2. 核心容器,存储所有 NamedCache 实例
    private final Map<String, NamedCache> caches = new HashMap<>();

    // 3. 总缓存大小的原子计数器,保证多 Task 操作下的线程安全
    private final AtomicLong sizeInBytes = new AtomicLong();

    // ... 各种统计计数器 ...
}
  1. maxCacheSizeBytes: 这是 ThreadCache 的内存容量上限。它是一个 volatile 变量,意味着可以被动态调整(通过 resize 方法)。
  2. Map<String, NamedCache> caches: 这是 ThreadCache 的核心容器。
    • Key (String): 缓存的命名空间(namespace)。这个 namespace 通常由 taskId 和 storeName 拼接而成(例如 "0_1-my-store"),确保了每个 Task 的每个 State Store 都有一个唯一的缓存实例。
    • Value (NamedCache): 之前我们详细分析过的 NamedCache 实例。
  3. AtomicLong sizeInBytes: 这是一个非常重要的字段。它实时跟踪该 ThreadCache 管理的所有 NamedCache 的总大小之和。使用 AtomicLong 是为了确保在多 Task 并发访问(虽然 StreamThread 是单线程处理 Task,但这里的原子性更多是为了保证内存可见性和操作的原子性)时,对总大小的更新是线程安全的。

内存管理与驱逐策略 (putmaybeEvictresize)

这是 ThreadCache 最核心的职责。

  • put(String namespace, Bytes key, LRUCacheEntry value):

    1. 调用 getOrCreateCache(namespace) 获取或创建一个 NamedCache
    2. 对该 NamedCache 加锁,防止并发修改。
    3. 调用 cache.put(key, value) 将数据放入 NamedCache
    4. 更新总大小 sizeInBytes
    5. 调用 maybeEvict(namespace, cache) 检查是否需要驱逐。
  • maybeEvict(String namespace, NamedCache cache):

    // ... existing code ...
    private void maybeEvict(final String namespace, final NamedCache cache) {
        int numEvicted = 0;
        while (sizeInBytes.get() > maxCacheSizeBytes) {
            // ...
            if (cache.isEmpty()) {
                return;
            }
            final long oldSize = cache.sizeInBytes();
            cache.evict(); // 调用 NamedCache 的驱逐方法
            sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
            numEvicts++;
            numEvicted++;
        }
        // ...
    }
    

    驱逐逻辑: 当 put 操作导致总缓存大小 sizeInBytes 超过 maxCacheSizeBytes 时,会立即当前正在被操作的 NamedCache 中循环调用 evict() 方法,直到总大小降到阈值以下。这是一种简单直接的驱逐策略:谁惹的祸,谁来承担。

  • resize(long newCacheSizeBytes):

    // ... existing code ...
    public synchronized void resize(final long newCacheSizeBytes) {
        final boolean shrink = newCacheSizeBytes < maxCacheSizeBytes;
        maxCacheSizeBytes = newCacheSizeBytes;
        if (shrink) {
            // ...
            // 使用循环迭代器,公平地从每个 NamedCache 中驱逐数据
            final CircularIterator<NamedCache> circularIterator = new CircularIterator<>(caches.values());
            while (sizeInBytes.get() > maxCacheSizeBytes) {
                final NamedCache cache = circularIterator.next();
                synchronized (cache) {
                    // ...
                    cache.evict();
                    // ...
                }
                numEvicts++;
            }
        }
        // ...
    }
    

    缩容驱逐逻辑: 当缓存被动态缩小时,ThreadCache 会采用一种更公平的策略。它使用一个 CircularIterator(循环迭代器)来轮流从所有的 NamedCache 中驱逐条目,每个一次,直到总大小满足新的、更小的限制。这避免了因缩容而只清空某一个 NamedCache 的问题。

NamedCache 的生命周期管理 (getOrCreateCachecloseclear)

  • getOrCreateCache(String name): 这是一个典型的 "Factory" 和 "Singleton"(在 ThreadCache 范围内)模式的结合。当一个 Task 首次访问其 State Store 的缓存时,ThreadCache 会为其创建一个新的 NamedCache 实例并存入 caches 这个 Map 中。后续的访问将直接返回已存在的实例。
  • close(String namespace): 当一个 Task 被关闭或迁移时,StreamThread 会调用此方法。它会从 caches Map 中移除对应的 NamedCache,更新总缓存大小,并调用 namedCache.close() 释放其占用的资源(主要是清空内部数据结构)。
  • clear(String namespace): 这个方法用于清空某个 NamedCache 的内容,但不移除 NamedCache 实例本身。

刷新(Flush)协调 (flushaddDirtyEntryFlushListener)

ThreadCache 是 flush 机制的协调者。

  • addDirtyEntryFlushListener(String namespace, DirtyEntryFlushListener listener)CachingKeyValueStore 或其他缓存包装类在初始化时,会调用此方法将自己注册为对应 NamedCache 的监听器。这个监听器知道如何将数据写入底层的 RocksDB。

  • flush(String namespace): 当 StreamTask 执行 commit 操作时,会触发对其所有 State Store 的 flush 调用,最终会调用到 ThreadCache.flush()

    1. 找到对应的 NamedCache
    2. 对其加锁。
    3. 调用 namedCache.flush()
    4. namedCache.flush() 内部会收集所有脏数据,并调用之前注册的 DirtyEntryFlushListener 的 apply 方法,将脏数据回传给 CachingKeyValueStore,由后者完成最终的磁盘写入。

与外部的交互

  • StreamThread / StreamTaskThreadCache 的生命周期与 StreamThread 绑定。StreamTask 是 ThreadCache 的主要使用者,通过它进行 putgetdeleteflush 等操作。
  • CachingKeyValueStore (及其他缓存包装类): 这些类是 ThreadCache 的直接客户。它们将底层的 State Store 和 ThreadCache 包装在一起,向上层(Processor API)提供一个统一的、带缓存的存储接口。它们负责将自己注册为 DirtyEntryFlushListener
  • NamedCacheThreadCache 是 NamedCache 的管理者和容器,负责其创建、销毁、驱逐和刷新调度。

总结

ThreadCache 在 Kafka Streams 的内存管理中扮演着“中层管理者”的角色。它本身不直接存储数据,而是通过管理一组 NamedCache 来实现对整个线程的缓存进行统一的容量控制、生命周期管理和策略执行(如驱逐和刷新)。它通过命名空间(namespace)隔离了不同 Task 和 State Store 的缓存,同时又通过全局的字节计数器实现了对总内存的统一监控和管理,是实现 cache.max.bytes.buffering 配置的关键组件。


网站公告

今日签到

点亮在社区的每一天
去签到