Paimon 读磁盘内存缓存设计

发布于:2025-07-13 ⋅ 阅读:(20) ⋅ 点赞:(0)

CacheManager

这个类是 Paimon 中负责 堆内内存缓存(On-Heap Cache) 的核心组件,主要用于缓存从文件中读取的数据页(Data Page)和索引页(Index Page),以减少对文件系统的直接 I/O,从而加速查询性能。

这与  LookupFile(基于本地磁盘的缓存)是两种不同层次的缓存策略。CacheManager 作用于更细粒度的内存级别。

CacheManager 的核心作用是提供一个受管理的、有容量限制的内存池,用于缓存从数据文件(如 ORC、Parquet)中读取的数据块

其设计思想可以概括为以下几点:

  1. 分层缓存 (Tiered Cache): Paimon 认识到,文件中的不同部分具有不同的访问价值。

    • 索引页 (Index Page): 如 ORC/Parquet 文件中的 Stripe/RowGroup 索引、字典等。这些元数据被访问的频率非常高,且对查询性能至关重要。如果能将它们常驻内存,就能快速定位到需要的数据块。
    • 数据页 (Data Page): 包含实际的行数据。它们的体积更大,访问模式可能不那么频繁。
    • 因此,CacheManager 内部维护了两个独立的缓存池:indexCache 和 dataCache,允许为更重要的索引数据分配专门的、高优先级的缓存空间。
  2. 统一接口: 尽管内部有两个缓存池,但 CacheManager 对外提供了统一的 getPage 方法。调用者只需提供一个 CacheKeyCacheManager 会根据 CacheKey 中的信息(isIndex())自动判断应该使用哪个缓存池。

  3. 内存管理: 缓存是基于 Paimon 的 MemorySegment 进行管理的,这是一种对 byte[] 的封装,可以实现高效的内存操作,并与 Paimon 的其他内存管理组件(如内存池)协同工作。

  4. 可插拔实现: 通过 Cache.CacheType 和 CacheBuilder,理论上可以支持不同的底层缓存实现(目前主要是 Guava Cache和Caffeine)。

核心成员变量与构造函数

// ... existing code ...
public class CacheManager {

    private static final Logger LOG = LoggerFactory.getLogger(CacheManager.class);

// ... existing code ...
    public static final int REFRESH_COUNT = 10;

    private final Cache dataCache;
    private final Cache indexCache;

    private int fileReadCount;

// ... existing code ...
    public CacheManager(
            Cache.CacheType cacheType, MemorySize maxMemorySize, double highPriorityPoolRatio) {
        Preconditions.checkArgument(
                highPriorityPoolRatio >= 0 && highPriorityPoolRatio < 1,
                "The high priority pool ratio should in the range [0, 1).");
        MemorySize indexCacheSize =
                MemorySize.ofBytes((long) (maxMemorySize.getBytes() * highPriorityPoolRatio));
        MemorySize dataCacheSize =
                MemorySize.ofBytes((long) (maxMemorySize.getBytes() * (1 - highPriorityPoolRatio)));
        this.dataCache = CacheBuilder.newBuilder(cacheType).maximumWeight(dataCacheSize).build();
        if (highPriorityPoolRatio == 0) {
            this.indexCache = dataCache;
        } else {
            this.indexCache =
                    CacheBuilder.newBuilder(cacheType).maximumWeight(indexCacheSize).build();
        }
        this.fileReadCount = 0;
        LOG.info(
                "Initialize cache manager with data cache of {} and index cache of {}.",
                dataCacheSize,
                indexCacheSize);
    }
// ... existing code ...
  • dataCache & indexCache: 这就是前面提到的两个核心缓存池,类型为 Cache 接口。
  • fileReadCount: 一个简单的计数器,用于统计缓存未命中(cache miss)并实际从文件读取的次数,便于监控缓存效率。
  • 构造函数:
    • 它接收三个关键参数:cacheType(缓存实现类型)、maxMemorySize(总缓存大小)和 highPriorityPoolRatio(高优先级池,即索引缓存的比例)。
    • 根据 highPriorityPoolRatio,它将总内存 maxMemorySize 切分为 indexCacheSize 和 dataCacheSize 两部分。
    • 然后使用 CacheBuilder 分别构建 indexCache 和 dataCache。这是一个典型的工厂+建造者模式的应用。
    • 特殊处理: 如果 highPriorityPoolRatio 为 0,意味着不为索引分配专门的缓存,此时 indexCache 和 dataCache 会共享同一个缓存实例,所有数据都进入同一个池子进行淘汰。

getPage

// ... existing code ...
    public MemorySegment getPage(CacheKey key, CacheReader reader, CacheCallback callback) {
        Cache cache = key.isIndex() ? indexCache : dataCache;
        Cache.CacheValue value =
                cache.get(
                        key,
                        k -> {
                            this.fileReadCount++;
                            try {
                                return new Cache.CacheValue(
                                        MemorySegment.wrap(reader.read(key)), callback);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        });
        return checkNotNull(value, String.format("Cache result for key(%s) is null", key)).segment;
    }
// ... existing code ...

这是 CacheManager 最核心的方法,它完美地封装了“如果缓存有,就从缓存取;如果缓存没有,就加载并放入缓存”的逻辑。

  • 参数:
    • keyCacheKey,缓存的键。它唯一标识了一个数据块,通常包含文件路径、偏移量、长度等信息,并通过 isIndex() 方法表明自己是索引还是数据。
    • readerCacheReader,一个函数式接口。当缓存未命中时,CacheManager 会调用这个 reader 的 read(key) 方法来从物理存储(文件)中加载数据。
    • callbackCacheCallback,一个回调函数,当缓存项被淘汰时会执行。
  • 执行流程:
    1. key.isIndex() ? indexCache : dataCache: 首先根据 key 的类型选择正确的缓存池。
    2. cache.get(key, k -> { ... }): 这是 Guava Cache 的经典用法。
      • 它会先尝试用 key 从缓存中获取值。
      • 如果获取不到(cache miss),它会原子地执行后面提供的 Lambda 表达式(一个 Callable)。
      • 在 Lambda 表达式中:
        • fileReadCount++: 增加未命中计数。
        • reader.read(key): 调用外部传入的加载器,从磁盘读取数据 byte[]
        • MemorySegment.wrap(...): 将读取到的字节数组包装成 MemorySegment
        • new Cache.CacheValue(...): 将 MemorySegment 和 callback 一起封装成 CacheValue 对象。这个 CacheValue 才是真正存储在 Guava Cache 中的值。
      • Lambda 表达式的返回值会被放入缓存,并作为 cache.get 的结果返回。
    3. 最后,从返回的 CacheValue 中取出 MemorySegment 并返回给调用者。

其他方法

  • invalidPage(CacheKey key): 提供了手动让某个缓存项失效的能力。例如,当一个文件被删除或更新时,需要调用此方法来清除其在内存中的旧缓存。
  • SegmentContainer: 一个静态内部类,是一个用于包装 MemorySegment 并记录其访问次数的容器。不过在 CacheManager 的主逻辑中并未使用到它,是为了一些更上层的、需要追踪访问频率的特定场景预留的。

REFRESH_COUNT 的作用

/**
 * Refreshing the cache comes with some costs, so not every time we visit the CacheManager, but
 * every 10 visits, refresh the LRU strategy.
 */
public static final int REFRESH_COUNT = 10;

这个常量和它的注释表明,底层的缓存淘汰策略(如 LRU)的维护操作(比如更新访问时间、调整内部数据结构顺序)是有成本的。为了优化性能,Paimon 的设计者可能在更上层的调用代码中(例如 OrcReader 或 ParquetReader)实现了某种机制,使得不是每次访问 CacheManager 都触发缓存的内部刷新,而是累积一定次数(这里是10次)后再进行一次,这是一种常见的性能优化技巧,用稍微的策略延迟换取更高的吞吐量。不过,在 CacheManager 类本身的代码里,我们没有看到直接使用 REFRESH_COUNT 的地方,说明这个逻辑是在调用方实现的。

总结

CacheManager 是 Paimon I/O 路径上的一个关键性能优化组件。它通过以下方式提升效率:

  • 内存缓存: 将热点数据页和索引页缓存在 JVM 堆内,避免了昂贵的磁盘/网络 I/O。
  • 分层策略: 区分对待索引和数据,为更重要的索引提供了专门的、受保护的缓存空间,提高了缓存命中率和查询定位速度。
  • 封装与抽象: 将复杂的缓存读写、加载、淘汰逻辑封装在统一的接口背后,让上层的文件读取器可以方便地使用缓存功能,而无需关心具体实现。

它与 LookupFile(磁盘缓存)共同构成了 Paimon 的多级缓存体系,是 Paimon 实现高性能查询的重要基石。

CacheKey 接口

org.apache.paimon.io.cache.CacheKey

CacheKey 是 Paimon 缓存管理机制的核心组件,其主要职责是为缓存中的数据块(通常是从文件中读取的字节数据)提供一个唯一的、可识别的标识符。一个设计良好的 Key 是实现高效缓存查询、命中和失效的基础。

将从以下几个方面来解析它:

  1. CacheKey 接口设计
  2. 两种具体的实现:PositionCacheKey 和 PageIndexCacheKey
  3. CacheKey 在缓存系统中的实际应用
  4. 与其他 CacheKey 的区别

首先,CacheKey 是一个接口,而不是一个具体的类。这种设计提供了很好的灵活性。接口本身非常简洁,只定义了一个核心方法和两个静态工厂方法。

// ... existing code ...
public interface CacheKey {

    static CacheKey forPosition(RandomAccessFile file, long position, int length, boolean isIndex) {
        return new PositionCacheKey(file, position, length, isIndex);
    }

    static CacheKey forPageIndex(RandomAccessFile file, int pageSize, int pageIndex) {
        return new PageIndexCacheKey(file, pageSize, pageIndex, false);
    }

    /** @return Whether this cache key is for index cache. */
    boolean isIndex();
// ... existing code ...
}

关键设计点分析:

  • 静态工厂方法forPosition(...) 和 forPageIndex(...) 是创建 CacheKey 实例的唯一入口。这种工厂模式隐藏了具体的实现类(PositionCacheKey 和 PageIndexCacheKey),使得调用者无需关心内部细节,只需根据自己的需求(是按位置还是按页索引)来创建 Key。
  • 核心方法 isIndex(): 这个方法是 CacheKey 设计的精髓所在。它用于区分这个缓存条目是属于“数据缓存”还是“索引缓存”。在 Paimon 的 CacheManager 中,通常会为索引数据分配一个独立的、具有更高优先级的缓存池,以确保索引信息能更持久地驻留在内存中,从而加速查询。CacheManager 正是通过调用 key.isIndex() 来决定将请求路由到哪个缓存池。

// ... existing code ...
    public MemorySegment getPage(CacheKey key, CacheReader reader, CacheCallback callback) {
        // 通过 isIndex() 方法决定使用 indexCache 还是 dataCache
        Cache cache = key.isIndex() ? indexCache : dataCache;
        Cache.CacheValue value =
                cache.get(
// ... existing code ...

两种具体的实现

CacheKey 接口内部定义了两个私有的静态内部类作为其具体实现。

a. PositionCacheKey

这个 Key 用于标识文件中一个任意位置和长度的数据块。

// ... existing code ...
    /** Key for file position and length. */
    class PositionCacheKey implements CacheKey {

        private final RandomAccessFile file;
        private final long position;
        private final int length;
        private final boolean isIndex;

        private PositionCacheKey(
                RandomAccessFile file, long position, int length, boolean isIndex) {
            this.file = file;
            this.position = position;
            this.length = length;
            this.isIndex = isIndex;
        }

        @Override
        public boolean equals(Object o) {
// ... existing code ...
            PositionCacheKey that = (PositionCacheKey) o;
            return position == that.position
                    && length == that.length
                    && isIndex == that.isIndex
                    && Objects.equals(file, that.file);
        }

        @Override
        public int hashCode() {
            return Objects.hash(file, position, length, isIndex);
        }

        @Override
        public boolean isIndex() {
            return isIndex;
        }
    }
// ... existing code ...
  • 组成部分: 由 file(文件句柄)、position(起始偏移量)、length(数据长度)和 isIndex 标志共同定义。
  • 唯一性equals() 和 hashCode() 方法确保了只有当文件、位置、长度和索引标志都完全相同时,两个 Key 才被认为是相等的。这是它们能作为缓存 Map 中 Key 的基础。
  • 应用场景: 适用于缓存文件中大小不固定的数据块,例如 Paimon 的 SortLookupStore 中读取的一个数据 Block。

b. PageIndexCacheKey

这个 Key 用于标识文件中一个固定大小的页(Page)

// ... existing code ...
    /** Key for file page index. */
    class PageIndexCacheKey implements CacheKey {

        private final RandomAccessFile file;
        private final int pageSize;
        private final int pageIndex;
        private final boolean isIndex;

        private PageIndexCacheKey(
                RandomAccessFile file, int pageSize, int pageIndex, boolean isIndex) {
            this.file = file;
            this.pageSize = pageSize;
            this.pageIndex = pageIndex;
            this.isIndex = isIndex;
        }
// ... existing code ...
        @Override
        public boolean equals(Object o) {
// ... existing code ...
            PageIndexCacheKey that = (PageIndexCacheKey) o;
            return pageSize == that.pageSize
                    && pageIndex == that.pageIndex
                    && isIndex == that.isIndex
                    && Objects.equals(file, that.file);
        }

        @Override
        public int hashCode() {
            return Objects.hash(file, pageSize, pageIndex, isIndex);
        }
// ... existing code ...
    }
}
  • 组成部分: 由 file(文件句柄)、pageSize(页大小)、pageIndex(页号)和 isIndex 标志定义。
  • 唯一性: 同样正确实现了 equals() 和 hashCode()
  • 应用场景: 适用于将文件逻辑上划分为等大小 Page 进行管理的场景。通过页号来定位比通过绝对偏移量更简洁。

CacheKey 在缓存系统中的实际应用

我们通过 BlockCache 的例子来看 CacheKey 是如何工作的。BlockCache 用于缓存从文件中读取的数据块。

// ... existing code ...
    public MemorySegment getBlock(
            long position, int length, Function<byte[], byte[]> decompressFunc, boolean isIndex) {

        // 1. 使用工厂方法创建一个 PositionCacheKey
        CacheKey cacheKey = CacheKey.forPosition(file, position, length, isIndex);

        SegmentContainer container = blocks.get(cacheKey);
        if (container == null || container.getAccessCount() == CacheManager.REFRESH_COUNT) {
            // 2. 将 CacheKey 传递给 CacheManager 获取数据
            MemorySegment segment =
                    cacheManager.getPage(
                            cacheKey,
                            key -> {
                                // 如果缓存未命中,则执行此处的加载逻辑
                                byte[] bytes = readFrom(position, length);
                                return decompressFunc.apply(bytes);
                            },
                            blocks::remove);
// ... existing code ...
        }
        return container.access();
    }
// ... existing code ...

工作流程如下:

  1. 创建 Key: 当 BlockCache 需要一个数据块时,它会根据块在文件中的 position 和 length,调用 CacheKey.forPosition(...) 创建一个 CacheKey 实例。
  2. 查询缓存: 这个 cacheKey 被传递给 CacheManager 的 getPage 方法。
  3. 路由与获取CacheManager 内部会调用 cacheKey.isIndex() 来判断应该去哪个缓存池(indexCache 或 dataCache)查找。然后以这个 cacheKey 为键,在底层的缓存(如 Caffeine 或 Guava Cache)中查找对应的 CacheValue(其内部包装了 MemorySegment)。
  4. 缓存加载: 如果缓存未命中,CacheManager 会执行传入的加载逻辑(lambda 表达式),从文件中读取数据,然后将结果存入缓存,Key 就是之前创建的 cacheKey

与其他 CacheKey 的区别

值得注意的是,在 Paimon 的代码库中,CacheKey 这个名字在不同的模块中有不同的定义。例如,在 paimon-s3-implpaimon-oss-impl 等文件系统实现中,也存在名为 CacheKey 的私有静态类。

// ... existing code ...
private static class CacheKey {

        private final Options options;
        private final String scheme;
        private final String authority;
// ... existing code ...
}

这些 CacheKey 用于缓存文件系统客户端实例(如 S3、OSS 的客户端),它们的 Key 由 schemeauthority 和配置 options 组成,目的是复用昂贵的客户端连接。

而我们分析的 org.apache.paimon.io.cache.CacheKey,其应用范围是缓存文件中的数据块,它的 Key 由文件句柄和块位置/索引组成。两者虽然同名,但用途和上下文完全不同,需要注意区分。

总而言之,org.apache.paimon.io.cache.CacheKey 是 Paimon I/O 层面缓存设计的基石,通过其灵活的接口和实现,为不同类型的文件数据(数据块和索引块)提供了统一且可区分的缓存标识,是实现高性能数据读取的重要一环。

BlockCache

这个类在 Paimon 的 Sort-based Lookup(基于排序的查找)机制中扮演着至关重要的角色,它是连接物理文件块内存缓存的桥梁。

BlockCache 的核心作用是为单个本地文件(通常是 SortLookupStoreReader 使用的排序文件)提供一个面向块(Block)的、带有缓存功能的读取层

在 SortLookupStoreReader 中,一个大的排序文件被逻辑上划分为多个数据块(Data Block)和索引块(Index Block)。当需要查找一个 Key 时,它会先通过索引块定位到可能包含这个 Key 的数据块,然后再加载这个数据块到内存中进行精确查找。

如果每次读取数据块都直接从磁盘 I/O,性能会很差。BlockCache 就是为了解决这个问题而设计的,它负责:

  1. 代理读取: 拦截所有对文件块的读取请求。
  2. 利用全局缓存: 将读取请求转发给全局的 CacheManager。如果 CacheManager 的内存中已经缓存了该块,就直接返回;如果没有,则从磁盘读取,并放入 CacheManager 中。
  3. 管理本地引用BlockCache 自身也维护一个 Map<CacheKey, SegmentContainer>,用于持有从 CacheManager 获取到的内存块的引用,并跟踪这些块的访问情况。
  4. 处理解压缩: 在从磁盘读取数据块后,它还负责调用传入的解压缩函数(decompressFunc)对数据进行解压,然后再存入缓存。

简单来说,BlockCache 是 CacheManager 的一个特定场景下的客户端使用者CacheManager 是全局的缓存池,而每个打开的排序文件都会有一个自己的 BlockCache 实例来管理自己文件内的块缓存。

核心成员变量

// ... existing code ...
public class BlockCache implements Closeable {

    private final RandomAccessFile file;
    private final FileChannel channel;
    private final CacheManager cacheManager;
    private final Map<CacheKey, SegmentContainer> blocks;

    public BlockCache(RandomAccessFile file, CacheManager cacheManager) {
        this.file = file;
        this.channel = this.file.getChannel();
        this.cacheManager = cacheManager;
        this.blocks = new HashMap<>();
    }
// ... existing code ...
  • file 和 channelRandomAccessFile 和 FileChannel 对象,代表了 BlockCache 实例所服务的那个具体的本地文件。当缓存未命中时,会通过 channel 从磁盘读取数据。
  • cacheManager: 全局的 CacheManager 实例。BlockCache 所有的缓存操作最终都委托给它。
  • blocks: 一个 HashMap。这是 BlockCache 内部的一级引用缓存
    • Key 是 CacheKey,唯一标识一个文件块(由文件、偏移量、长度等组成)。
    • Value 是 SegmentContainer,一个包装了 MemorySegment(实际内存块)并增加了访问计数的容器。

getBlock

这是 BlockCache 最核心的方法,我们来详细解读它的逻辑。

// ... existing code ...
    public MemorySegment getBlock(
            long position, int length, Function<byte[], byte[]> decompressFunc, boolean isIndex) {

        CacheKey cacheKey = CacheKey.forPosition(file, position, length, isIndex);

        SegmentContainer container = blocks.get(cacheKey);
        if (container == null || container.getAccessCount() == CacheManager.REFRESH_COUNT) {
            MemorySegment segment =
                    cacheManager.getPage(
                            cacheKey,
                            key -> {
                                byte[] bytes = readFrom(position, length);
                                return decompressFunc.apply(bytes);
                            },
                            blocks::remove);
            container = new SegmentContainer(segment);
            blocks.put(cacheKey, container);
        }
        return container.access();
    }
// ... existing code ...
  1. CacheKey cacheKey = CacheKey.forPosition(...): 首先,根据文件、偏移量、长度和是否为索引块,创建一个全局唯一的 CacheKey
  2. SegmentContainer container = blocks.get(cacheKey): 尝试从自己的本地 blocks Map 中获取这个块的容器。
  3. if (container == null || container.getAccessCount() == CacheManager.REFRESH_COUNT): 这是关键的判断逻辑。在以下两种情况下,需要与全局的 CacheManager 交互:
    • container == null: 本地 blocks Map 中没有这个块的引用,说明这是第一次请求该块,或者该块之前被淘汰了。
    • container.getAccessCount() == CacheManager.REFRESH_COUNT这是一个性能优化点。这个块已经被连续访问了 REFRESH_COUNT 次(默认是10次),需要去 CacheManager "刷新"一下。这样做的目的是为了更新该块在 CacheManager 内部 LRU 策略中的“新鲜度”,防止一个频繁被 BlockCache 访问的热点块因为长时间没有“惊动”CacheManager 而被错误地淘汰。
  4. cacheManager.getPage(...): 调用全局缓存管理器获取页面。
    • 第一个参数 cacheKey: 要获取的缓存键。
    • 第二个参数 key -> { ... }: 这是一个 CacheReader Lambda。只有在 CacheManager 缓存未命中时,这个 Lambda 才会被执行。它的逻辑是: a. readFrom(position, length): 从磁盘文件中读取原始的、可能被压缩的字节。 b. decompressFunc.apply(bytes): 调用外部传入的解压函数进行解压。 c. 返回解压后的 byte[]CacheManager 会将其包装成 MemorySegment 并存入缓存。
    • 第三个参数 blocks::remove: 这是一个 CacheCallback。它被传递给 CacheManager。当 CacheManager 因为内存压力等原因决定淘汰这个缓存块时,会回调这个方法,即 blocks.remove(key),从而将 BlockCache 自己的本地 blocks Map 中对应的条目也一并移除,保持状态同步。
  5. container = new SegmentContainer(segment): 用从 CacheManager 获取到的 MemorySegment 创建一个新的 SegmentContainer
  6. blocks.put(cacheKey, container): 将新的容器放入本地的 blocks Map 中(覆盖旧的,如果有的话)。
  7. return container.access(): 调用 container.access(),这个方法会增加访问计数,并返回底层的 MemorySegment

close() 方法

// ... existing code ...
    @Override
    public void close() throws IOException {
        Set<CacheKey> sets = new HashSet<>(blocks.keySet());
        for (CacheKey key : sets) {
            cacheManager.invalidPage(key);
        }
    }
}

当一个 SortLookupStoreReader 被关闭时,它会调用其持有的 BlockCache 的 close() 方法。这个方法会遍历其本地 blocks Map 中所有持有的缓存块的 CacheKey,并逐个调用 cacheManager.invalidPage(key)。这会通知全局的 CacheManager 将这些块从缓存中移除,从而释放内存。这是一个良好的资源管理实践,确保与已关闭文件相关的缓存被及时清理。

总结

BlockCache 是一个设计精巧的类,它体现了分层设计的思想:

  • 全局与局部: 它作为 CacheManager(全局缓存)和单个文件读取器(局部需求)之间的适配层。
  • 缓存与解压: 它将缓存逻辑和解压缩逻辑结合在一起,为上层提供了透明的、解压后的数据块视图。
  • 生命周期管理: 通过 close 方法和传递给 CacheManager 的回调,它确保了其管理的缓存块的生命周期与文件的生命周期保持一致。
  • 性能优化: 通过 REFRESH_COUNT 机制,它在减少与全局缓存交互开销和维持热点数据活跃度之间取得了平衡。

BlockIterator

BlockIterator 是 Paimon 中用于 读取和遍历单个数据块(Block) 的核心抽象类。它在 Paimon 的 Sort-based Lookup Store(基于排序的查找存储)中扮演着至关重要的角色,这个存储结构主要用于高效的 Key-Value 点查。

BlockIterator 的核心职责是提供对一个已排序的、存储在内存中的二进制数据块(MemorySlice)的迭代查找功能。

它的设计目标非常明确:

  • 高性能: 直接操作二进制数据(MemorySlice),避免不必要的反序列化开销。
  • 支持二分查找: 数据块内部的记录是按 Key 排序的,BlockIterator 提供了 seekTo(targetKey) 方法,可以利用二分查找快速定位。
  • 抽象与扩展: 它是一个抽象类,定义了通用的遍历和查找逻辑,但将具体的“寻址”(如何跳到块内第 N 条记录)方式交由子类实现。

我们先来看一下它的关键成员变量:

// ... existing code ...
public abstract class BlockIterator implements Iterator<Map.Entry<MemorySlice, MemorySlice>> {

    protected final MemorySliceInput data;

    private final int recordCount;
    private final Comparator<MemorySlice> comparator;

    private BlockEntry polled;
// ... existing code ...
  • protected final MemorySliceInput data: 这是最重要的部分,它代表了整个数据块的二进制内容。MemorySliceInput 是一个输入流,可以从中读取数据。protected 修饰符意味着子类可以直接访问它。
  • private final int recordCount: 记录了这个数据块中总共有多少条 Key-Value 记录。这个值在二分查找时是确定边界的关键。
  • private final Comparator<MemorySlice> comparator: 一个比较器,用于比较两个 Key(MemorySlice 类型)的大小。这是执行排序和二分查找的基础。
  • private BlockEntry polled: 一个“预读”或“暂存”的条目。当执行 seekTo 操作时,找到的条目会先存放在这里,next() 方法会优先返回这个暂存的条目。这是一种优化,避免了重复读取。

构造函数

// ... existing code ...
    public BlockIterator(
            MemorySliceInput data, int recordCount, Comparator<MemorySlice> comparator) {
        this.data = data;
        this.recordCount = recordCount;
        this.comparator = comparator;
    }
// ... existing code ...

构造函数非常直观,就是初始化上述几个核心成员变量。

Iterator 接口实现 (hasNextnext)

BlockIterator 实现了标准的 java.util.Iterator 接口,使其可以被用在 for-each 循环等场景。

// ... existing code ...
    @Override
    public boolean hasNext() {
        return polled != null || data.isReadable();
    }

    @Override
    public BlockEntry next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }

        if (polled != null) {
            BlockEntry result = polled;
            polled = null;
            return result;
        }

        return readEntry();
    }
// ... existing code ...

  • hasNext(): 判断是否还有下一个元素。逻辑是:如果 polled 暂存区不为空,或者底层 data 输入流还有可读数据,就认为还有下一个。
  • next(): 获取下一个元素。它会优先检查 polled,如果 polled 中有值,就返回它并清空 polled。否则,调用 readEntry() 从数据流中读取下一条记录。

readEntry(): 记录的解析

// ... existing code ...
    private BlockEntry readEntry() {
        requireNonNull(data, "data is null");

        int keyLength;
        keyLength = data.readVarLenInt();
        MemorySlice key = data.readSlice(keyLength);

        int valueLength = data.readVarLenInt();
        MemorySlice value = data.readSlice(valueLength);

        return new BlockEntry(key, value);
    }
// ... existing code ...

这个私有方法定义了数据块中单条记录的二进制格式:

  1. 可变长度整数 (keyLength): 首先读取一个可变长度的整数,表示 Key 的长度。
  2. Key 数据 (key): 根据 keyLength 读取相应长度的字节,封装成 MemorySlice
  3. 可变长度整数 (valueLength): 接着读取表示 Value 长度的整数。
  4. Value 数据 (value): 根据 valueLength 读取相应长度的字节。
  5. 最后将 Key 和 Value 封装成 BlockEntry 返回。

seekTo(MemorySlice targetKey): 二分查找

这是 BlockIterator 最核心的功能之一。

// ... existing code ...
    public boolean seekTo(MemorySlice targetKey) {
        int left = 0;
        int right = recordCount - 1;

        while (left <= right) {
            int mid = left + (right - left) / 2;

            seekTo(mid); // <--- 调用抽象方法,跳转到第 mid 条记录的起始位置
            BlockEntry midEntry = readEntry(); // 读取该记录
            int compare = comparator.compare(midEntry.getKey(), targetKey);

            if (compare == 0) {
                polled = midEntry; // 找到了,存入 polled
                return true;
            } else if (compare > 0) { // 中间值比目标大
                polled = midEntry; // 可能是第一个大于等于目标的,暂存
                right = mid - 1;
            } else { // 中间值比目标小
                left = mid + 1;
            }
        }

        return false;
    }
// ... existing code ...

这是一个标准的二分查找算法。它的巧妙之处在于 seekTo(mid) 这个调用。它调用的是下面的抽象方法,将“物理寻址”的责任交给了子类。当循环结束时,如果找到了 targetKeypolled 会持有这个条目;如果没找到,polled 会持有第一个大于 targetKey 的条目,这对于范围扫描非常有用。

 seekTo(int record): 抽象寻址方法

// ... existing code ...
    public abstract void seekTo(int record);
// ... existing code ...

这是一个抽象方法,必须由子类实现。它的作用是将底层 data 输入流的读取指针移动到第 record 条记录的起始位置

子类实现

Paimon 提供了两个主要的子类实现,位于 BlockReader 的内部:

  • AlignedIterator: 用于处理定长记录的块。如果块中每条 Key-Value 对的长度都相同,寻址就非常简单:position = record * recordSize

    // ... existing code ...
    private static class AlignedIterator extends BlockIterator {
    
        private final int recordSize;
        // ... existing code ...
        @Override
        public void seekTo(int record) {
            data.setPosition(record * recordSize);
        }
    }
    
  • UnalignedIterator: 用于处理变长记录的块。由于记录长度可变,不能通过乘法直接计算偏移。因此,这种块的末尾会额外存储一个“偏移量索引区”,记录了每条记录的起始位置。seekTo 的实现就是去这个索引区查找偏移量。

    // ... existing code ...
    private static class UnalignedIterator extends BlockIterator {
    
        private final MemorySlice index;
        // ... existing code ...
        @Override
        public void seekTo(int record) {
            data.setPosition(index.readInt(record * 4));
        }
    }
    

总结

BlockIterator 是一个设计精良的类,它通过抽象与具体实现分离的原则,优雅地实现了对不同格式数据块的高效遍历和查找。

  • 它定义了统一的二进制记录格式(长度前缀 + 数据)和通用的迭代、二分查找逻辑
  • 它将物理寻址这一依赖于具体块格式的逻辑抽象成 seekTo(int record) 方法,交由子类 AlignedIterator 和 UnalignedIterator 实现。
  • 这种设计使得上层调用者(如 SortLookupStoreReader)可以透明地处理不同类型的 Block,而无需关心其内部是定长还是变长,大大简化了上层逻辑。

BlockEntry 和 MemorySlice 

它们是 Paimon 项目中实现高效数据读写和查找功能的核心组件,尤其在基于排序的查找存储(Sort Lookup Store)中扮演着关键角色。

会分三个部分来解析:

  1. MemorySlice:内存视图的抽象。
  2. BlockEntry:数据块中键值对的表示。
  3. 两者如何协同工作。

MemorySlice 类分析

MemorySlice 可以理解为一个“内存切片”或“内存视图”。它本身不存储实际数据,而是指向一个更大的内存区域(由 MemorySegment 管理)中的某一段。这种设计的核心优势在于零拷贝(Zero-Copy),在进行数据切分、传递时,无需复制底层字节,极大地提升了性能。

核心源码解读 (paimon-common/src/main/java/org/apache/paimon/memory/MemorySlice.java):

// ... existing code ...
public final class MemorySlice implements Comparable<MemorySlice> {

    private final MemorySegment segment;
    private final int offset;
    private final int length;

    public MemorySlice(MemorySegment segment, int offset, int length) {
        this.segment = segment;
        this.offset = offset;
        this.length = length;
    }

    public MemorySegment segment() {
        return segment;
    }

    public int offset() {
        return offset;
    }

    public int length() {
        return length;
    }

    public MemorySlice slice(int index, int length) {
        if (index == 0 && length == this.length) {
            return this;
        }

        return new MemorySlice(segment, offset + index, length);
    }
// ... existing code ...
    public byte[] copyBytes() {
        byte[] bytes = new byte[length];
        segment.get(offset, bytes, 0, length);
        return bytes;
    }

    public MemorySliceInput toInput() {
        return new MemorySliceInput(this);
    }

    @Override
    public boolean equals(Object o) {
// ... existing code ...
        MemorySlice slice = (MemorySlice) o;

        // do lengths match
        if (length != slice.length) {
            return false;
        }
// ... existing code ...
        return segment.equalTo(slice.segment, offset, slice.offset, length);
    }

    @Override
    public int hashCode() {
        return MurmurHashUtils.hashBytes(segment, offset, length);
    }

    public static MemorySlice wrap(byte[] bytes) {
        return new MemorySlice(MemorySegment.wrap(bytes), 0, bytes.length);
    }
// ... existing code ...
    @Override
    public int compareTo(MemorySlice other) {
        int len = Math.min(length, other.length);
        for (int i = 0; i < len; i++) {
            int res =
                    (segment.get(offset + i) & 0xFF) - (other.segment.get(other.offset + i) & 0xFF);
            if (res != 0) {
                return res;
            }
        }
        return length - other.length;
    }
}

关键特性总结:

  • 数据结构:

    • segment: 底层的 MemorySegment 对象,是实际内存(堆上或堆外)的管理者。
    • offset: 表示该切片在 segment 中的起始位置。
    • length: 表示该切片的长度。 这三个 final 字段共同定义了一个不可变的内存视图。
  • 核心功能:

    • 零拷贝切片slice(int index, int length) 方法创建一个新的 MemorySlice 实例,它指向原 segment 的一个子区域。这个过程非常轻量,只创建了一个新对象,没有发生任何字节拷贝。
    • 内容访问: 提供了 readByte()readInt() 等方法直接从内存中读取数据。copyBytes() 方法则会真正地拷贝数据到一个新的 byte[] 数组中,这在需要将数据传递给不识别 MemorySlice 的外部组件时非常有用。
    • 基于内容的比较equals() 和 hashCode() 方法被重写,它们的行为是基于 MemorySlice 所指向的内存区域的实际内容,而不是对象引用。compareTo() 方法实现了按字典序对字节内容进行比较。这使得 MemorySlice 可以被安全地用作 Map 的键或进行排序。
    • 封装与转换wrap() 静态方法可以方便地将一个 byte[] 数组封装成 MemorySlicetoInput() 方法将其转换为 MemorySliceInput,提供流式读取的能力。

BlockEntry 类分析

BlockEntry 是一个逻辑概念,它代表了数据块(Block)中的一个条目,即一个键值对(Key-Value Pair)。它实现了 Java 的 Map.Entry 接口,使其行为符合标准的键值对规范。

核心源码解读 (paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockEntry.java):

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.paimon.lookup.sort;

import org.apache.paimon.memory.MemorySlice;

import java.util.Map.Entry;

import static java.util.Objects.requireNonNull;

/** Entry represents a key value. */
public class BlockEntry implements Entry<MemorySlice, MemorySlice> {

    private final MemorySlice key;
    private final MemorySlice value;

    public BlockEntry(MemorySlice key, MemorySlice value) {
        requireNonNull(key, "key is null");
        requireNonNull(value, "value is null");
        this.key = key;
        this.value = value;
    }

    @Override
    public MemorySlice getKey() {
        return key;
    }

    @Override
    public MemorySlice getValue() {
        return value;
    }

    @Override
    public final MemorySlice setValue(MemorySlice value) {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        BlockEntry entry = (BlockEntry) o;

        if (!key.equals(entry.key)) {
            return false;
        }
        return value.equals(entry.value);
    }

    @Override
    public int hashCode() {
        int result = key.hashCode();
        result = 31 * result + value.hashCode();
        return result;
    }
}

关键特性总结:

  • 数据结构:

    • keyMemorySlice 类型的键。
    • valueMemorySlice 类型的值。 它不直接持有字节数据,而是持有指向数据的 MemorySlice 视图。
  • 核心功能:

    • 不可变性key 和 value 都是 final 的,并且 setValue() 方法直接抛出 UnsupportedOperationException。这表明 BlockEntry 是一个不可变对象,一旦创建,其内容就不会改变,这在多线程或复杂的数据流处理中是安全的。
    • 标准接口: 实现 Map.Entry<MemorySlice, MemorySlice> 接口,提供了 getKey() 和 getValue() 方法,使其可以被用在任何接受 Map.Entry 的地方。
    • 基于值的比较equals() 和 hashCode() 方法委托给了其内部的 key 和 value (MemorySlice)。由于 MemorySlice 的比较是基于内容的,因此两个 BlockEntry 相等,当且仅当它们的键和值的内容分别相等。

两者如何协同工作

BlockEntry 和 MemorySlice 的结合是 Paimon 实现高效 I/O 和内存管理的关键。我们可以通过 BlockIterator 的工作流程来理解这一点。

在一个排好序的文件中,数据被组织成一个个的 Block。当需要读取或查找数据时:

  1. 读取数据块SortLookupStoreReader 从文件中读取一个压缩的数据块,解压后放入一个大的 MemorySegment 中。
  2. 创建迭代器BlockReader 会基于这个包含整个块数据的 MemorySegment 创建一个 BlockIterator
  3. 解析条目: 当 BlockIterator.next() 或 readEntry() 被调用时,它会从 MemorySliceInput (由 MemorySlice 转换而来) 中读取元数据(比如 key 的长度和 value 的长度)。
  4. 创建视图 (非拷贝): 根据读取到的长度信息,迭代器并不会将 key 和 value 的字节拷贝出来,而是调用 MemorySlice.slice() 方法,创建两个分别指向 key 数据区域和 value 数据区域的 MemorySlice 对象。
  5. 封装成 BlockEntry: 最后,这两个轻量级的 MemorySlice 对象被用来创建一个 BlockEntry 实例并返回。

相关代码片段 (paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockIterator.java):

private BlockEntry readEntry() {
    requireNonNull(data, "data is null");

    int keyLength;
    keyLength = data.readVarLenInt();
    MemorySlice key = data.readSlice(keyLength); // 创建 key 的视图

    int valueLength = data.readVarLenInt();
    MemorySlice value = data.readSlice(valueLength); // 创建 value 的视图

    return new BlockEntry(key, value); // 封装成 BlockEntry
}

总结一下这种协同工作的好处:

  • 高效: 整个数据块的读取和迭代过程中,键和值的实际字节数据只在内存中存在一份(在 MemorySegment 中)。BlockEntry 和 MemorySlice 只是轻量级的视图对象,避免了大量的对象创建和数据拷贝开销。
  • 延迟物化: 只有当用户真正需要一个 byte[] 时(例如,调用 MemorySlice.copyBytes()),实际的拷贝才会发生。在进行比较、排序等操作时,都直接在内存视图上进行,速度非常快。
  • 清晰的职责分离:
    • MemorySegment: 负责管理底层内存(堆上/堆外)。
    • MemorySlice: 负责提供对 MemorySegment 的零拷贝、基于内容的视图。
    • BlockEntry: 负责将 key 和 value 的 MemorySlice 视图组合成一个逻辑上的键值对记录。

FileBasedBloomFilter 

它在 Paimon 中扮演着一个非常重要的角色:为存储在文件中的布隆过滤器数据提供一个高效、带缓存的访问接口

布隆过滤器是一种空间效率极高的数据结构,用于快速判断一个元素“可能存在”或“绝对不存在”于一个集合中。在 Paimon 这类数据湖存储中,它被用作一种文件索引(File Index),可以在查询时跳过(skip)那些肯定不包含所需数据的数据文件,从而极大地提升查询性能。

下面将从以下几个方面来解析 FileBasedBloomFilter

  1. 核心职责与设计目标
  2. 构造函数与依赖
  3. 核心方法 testHash() 的工作机制
  4. 缓存管理与生命周期
  5. 与 BloomFilter 类的关系

FileBasedBloomFilter 的核心职责是代理一个存储在物理文件特定位置的布隆过滤器。它的设计目标是:

  • 延迟加载(Lazy Loading): 不在初始化时就读取整个布隆过滤器的数据,而是在第一次需要进行判断(调用 testHash)时才从磁盘加载。
  • 缓存集成: 与 Paimon 的缓存系统(CacheManager)深度集成,将从磁盘加载的布隆过滤器数据(一个字节数组)缓存到内存中,避免重复的磁盘 I/O。
  • 统一接口: 对上层调用者提供与内存中的 BloomFilter 类似的 testHash() 方法,屏蔽底层的文件读取和缓存管理的复杂性。

我们先来看它的构造函数,这能帮助我们理解它的组成部分。

// ... existing code ...
public class FileBasedBloomFilter implements Closeable {

    private final PageFileInput input;
    private final CacheManager cacheManager;
    private final BloomFilter filter;
    private final long readOffset;
    private final int readLength;
    private final CacheKey cacheKey;
    private int accessCount;

    public FileBasedBloomFilter(
            PageFileInput input,
            CacheManager cacheManager,
            long expectedEntries,
            long readOffset,
            int readLength) {
        this.input = input;
        this.cacheManager = cacheManager;
        checkArgument(expectedEntries >= 0);
        this.filter = new BloomFilter(expectedEntries, readLength);
        this.readOffset = readOffset;
        this.readLength = readLength;
        this.accessCount = 0;
        this.cacheKey = CacheKey.forPosition(input.file(), readOffset, readLength, true);
    }
// ... existing code ...
  • input: PageFileInput: 一个文件输入对象,提供了从文件中读取指定位置和长度数据的能力。
  • cacheManager: CacheManager: Paimon 的缓存管理器,负责管理内存缓存池。
  • expectedEntries: long: 预期的元素数量。这个值和下面的 readLength 一起用来初始化一个逻辑上的 BloomFilter 对象,计算出最佳的哈希函数个数。
  • readOffset: long: 布隆过滤器数据在文件 (input) 中的起始偏移量。
  • readLength: int: 布隆过滤器数据的长度(字节数)。
  • filter: BloomFilter: 一个内部持有的 BloomFilter 实例。注意:在构造时,这个 filter 内部的 BitSet 还没有关联任何实际的内存数据,它只是一个“空壳”,定义了哈希函数个数等元信息。
  • cacheKey: CacheKey: 根据文件、偏移量和长度创建的一个缓存键。这个 Key 非常关键,它唯一标识了这段布隆过滤器数据在 CacheManager 中的身份。isIndex 参数被设置为 true,表明这是一个索引缓存,CacheManager 会将其放入索引专用的缓存池中。

核心方法 testHash() 的工作机制

这是 FileBasedBloomFilter 最核心的方法,它实现了延迟加载和缓存刷新逻辑。

// ... existing code ...
    public boolean testHash(int hash) {
        accessCount++;
        // we should refresh cache in LRU, but we cannot refresh everytime, it is costly.
        // so we introduce a refresh count to reduce refresh
        if (accessCount == REFRESH_COUNT || filter.getMemorySegment() == null) {
            MemorySegment segment =
                    cacheManager.getPage(
                            cacheKey,
                            key -> input.readPosition(readOffset, readLength),
                            new BloomFilterCallBack(filter));
            filter.setMemorySegment(segment, 0);
            accessCount = 0;
        }
        return filter.testHash(hash);
    }
// ... existing code ...

执行流程分析:

  1. 访问计数accessCount 递增。
  2. 触发缓存/加载if 条件判断是否需要从 CacheManager 获取数据。以下两种情况会触发:
    • filter.getMemorySegment() == null: 这是首次调用 testHash,或者布隆过滤器的数据已被缓存淘汰。此时 filter 内部没有指向任何内存,必须加载。
    • accessCount == REFRESH_COUNT: 这是一个缓存刷新优化。对于 LRU 这类缓存策略,频繁访问一个条目会让它一直保持“热门”,但并不会提升它在缓存中的优先级。通过设置一个访问次数阈值 (REFRESH_COUNT),定期地重新 getPage,可以起到“刷新”该条目在缓存中活跃度的作用,使其不容易被淘汰。
  3. 调用 cacheManager.getPage():
    • cacheKey: 使用之前创建的 Key 去缓存中查找。
    • key -> input.readPosition(...): 这是一个 Lambda 表达式,定义了**缓存未命中(Cache Miss)**时的加载逻辑。如果 CacheManager 中没有找到 cacheKey 对应的数据,就会执行这个 Lambda,从文件的 readOffset 处读取 readLength 字节的数据。
    • new BloomFilterCallBack(filter): 这是一个回调对象。当这个缓存条目因为内存压力等原因被 CacheManager **淘汰(Evict)**时,CacheManager 会调用这个回调对象的 onRemoval 方法。
  4. 关联内存cacheManager.getPage() 返回一个 MemorySegment,它指向了包含布隆过滤器数据的内存区域。filter.setMemorySegment(segment, 0) 这行代码将这个内存段与内部的 BloomFilter 实例关联起来。至此,filter 才成为一个功能完整的、可以进行判断的布隆过滤器。
  5. 执行判断return filter.testHash(hash),调用内部 BloomFilter 实例的 testHash 方法,在刚刚加载或从缓存中获取的内存上进行位运算,返回最终结果。

缓存管理与生命周期

FileBasedBloomFilter 实现了 Closeable 接口,并与 CacheCallback 配合,实现了完善的生命周期管理。

// ... existing code ...
    @Override
    public void close() throws IOException {
        cacheManager.invalidPage(cacheKey);
    }

    /** Call back for cache manager. */
    private static class BloomFilterCallBack implements CacheCallback {

        private final BloomFilter bloomFilter;

        private BloomFilterCallBack(BloomFilter bloomFilter) {
            this.bloomFilter = bloomFilter;
        }

        @Override
        public void onRemoval(CacheKey key) {
            this.bloomFilter.unsetMemorySegment();
        }
    }
}
  • close() 方法: 当 FileBasedBloomFilter 不再需要时(例如,一个查询任务结束),调用 close() 方法会主动通知 CacheManager 将 cacheKey 对应的缓存页失效(Invalidate)。这会释放缓存占用的内存。
  • BloomFilterCallBack:
    • 当缓存条目被动地被 CacheManager 淘汰时,onRemoval 方法会被调用。
    • this.bloomFilter.unsetMemorySegment() 这行代码会解除内部 BloomFilter 与 MemorySegment 的关联。这非常重要,它确保了当下次 testHash 被调用时,filter.getMemorySegment() == null 条件成立,从而触发重新从 CacheManager 加载数据。这保证了数据的一致性。

FileBasedBloomFilter 和 BloomFilter 是组合(Composition)关系,前者包装了后者。

  • BloomFilter: 是一个纯粹的、基于内存的布隆过滤器实现。它接收一个 MemorySegment(代表一块内存),并在这块内存上执行 addHash 和 testHash 操作。它不关心这块内存从何而来。
  • FileBasedBloomFilter: 是一个更高层次的封装,它负责处理“脏活累活”:从哪里(PageFileInput)加载数据、如何缓存数据(CacheManager)、何时加载(testHash 中的懒加载逻辑)以及如何释放资源(close 和 CacheCallback)。它将 BloomFilter 从对文件 I/O 和缓存的感知中解耦出来。

总结

FileBasedBloomFilter 是一个设计精巧的类,它通过代理模式和组合模式,将一个纯内存的 BloomFilter 数据结构适配到了一个基于文件和缓存的复杂环境中。它完美地体现了关注点分离的设计原则,使得 Paimon 能够高效地利用布隆过滤器进行数据过滤,同时通过缓存机制将 I/O 开销降到最低。


网站公告

今日签到

点亮在社区的每一天
去签到