CacheManager
这个类是 Paimon 中负责 堆内内存缓存(On-Heap Cache) 的核心组件,主要用于缓存从文件中读取的数据页(Data Page)和索引页(Index Page),以减少对文件系统的直接 I/O,从而加速查询性能。
这与 LookupFile
(基于本地磁盘的缓存)是两种不同层次的缓存策略。CacheManager
作用于更细粒度的内存级别。
CacheManager
的核心作用是提供一个受管理的、有容量限制的内存池,用于缓存从数据文件(如 ORC、Parquet)中读取的数据块。
其设计思想可以概括为以下几点:
分层缓存 (Tiered Cache): Paimon 认识到,文件中的不同部分具有不同的访问价值。
- 索引页 (Index Page): 如 ORC/Parquet 文件中的 Stripe/RowGroup 索引、字典等。这些元数据被访问的频率非常高,且对查询性能至关重要。如果能将它们常驻内存,就能快速定位到需要的数据块。
- 数据页 (Data Page): 包含实际的行数据。它们的体积更大,访问模式可能不那么频繁。
- 因此,
CacheManager
内部维护了两个独立的缓存池:indexCache
和dataCache
,允许为更重要的索引数据分配专门的、高优先级的缓存空间。
统一接口: 尽管内部有两个缓存池,但
CacheManager
对外提供了统一的getPage
方法。调用者只需提供一个CacheKey
,CacheManager
会根据CacheKey
中的信息(isIndex()
)自动判断应该使用哪个缓存池。内存管理: 缓存是基于 Paimon 的
MemorySegment
进行管理的,这是一种对byte[]
的封装,可以实现高效的内存操作,并与 Paimon 的其他内存管理组件(如内存池)协同工作。可插拔实现: 通过
Cache.CacheType
和CacheBuilder
,理论上可以支持不同的底层缓存实现(目前主要是 Guava Cache和Caffeine)。
核心成员变量与构造函数
// ... existing code ...
public class CacheManager {
private static final Logger LOG = LoggerFactory.getLogger(CacheManager.class);
// ... existing code ...
public static final int REFRESH_COUNT = 10;
private final Cache dataCache;
private final Cache indexCache;
private int fileReadCount;
// ... existing code ...
public CacheManager(
Cache.CacheType cacheType, MemorySize maxMemorySize, double highPriorityPoolRatio) {
Preconditions.checkArgument(
highPriorityPoolRatio >= 0 && highPriorityPoolRatio < 1,
"The high priority pool ratio should in the range [0, 1).");
MemorySize indexCacheSize =
MemorySize.ofBytes((long) (maxMemorySize.getBytes() * highPriorityPoolRatio));
MemorySize dataCacheSize =
MemorySize.ofBytes((long) (maxMemorySize.getBytes() * (1 - highPriorityPoolRatio)));
this.dataCache = CacheBuilder.newBuilder(cacheType).maximumWeight(dataCacheSize).build();
if (highPriorityPoolRatio == 0) {
this.indexCache = dataCache;
} else {
this.indexCache =
CacheBuilder.newBuilder(cacheType).maximumWeight(indexCacheSize).build();
}
this.fileReadCount = 0;
LOG.info(
"Initialize cache manager with data cache of {} and index cache of {}.",
dataCacheSize,
indexCacheSize);
}
// ... existing code ...
dataCache
&indexCache
: 这就是前面提到的两个核心缓存池,类型为Cache
接口。fileReadCount
: 一个简单的计数器,用于统计缓存未命中(cache miss)并实际从文件读取的次数,便于监控缓存效率。- 构造函数:
- 它接收三个关键参数:
cacheType
(缓存实现类型)、maxMemorySize
(总缓存大小)和highPriorityPoolRatio
(高优先级池,即索引缓存的比例)。 - 根据
highPriorityPoolRatio
,它将总内存maxMemorySize
切分为indexCacheSize
和dataCacheSize
两部分。 - 然后使用
CacheBuilder
分别构建indexCache
和dataCache
。这是一个典型的工厂+建造者模式的应用。 - 特殊处理: 如果
highPriorityPoolRatio
为 0,意味着不为索引分配专门的缓存,此时indexCache
和dataCache
会共享同一个缓存实例,所有数据都进入同一个池子进行淘汰。
- 它接收三个关键参数:
getPage
// ... existing code ...
public MemorySegment getPage(CacheKey key, CacheReader reader, CacheCallback callback) {
Cache cache = key.isIndex() ? indexCache : dataCache;
Cache.CacheValue value =
cache.get(
key,
k -> {
this.fileReadCount++;
try {
return new Cache.CacheValue(
MemorySegment.wrap(reader.read(key)), callback);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
return checkNotNull(value, String.format("Cache result for key(%s) is null", key)).segment;
}
// ... existing code ...
这是 CacheManager
最核心的方法,它完美地封装了“如果缓存有,就从缓存取;如果缓存没有,就加载并放入缓存”的逻辑。
- 参数:
key
:CacheKey
,缓存的键。它唯一标识了一个数据块,通常包含文件路径、偏移量、长度等信息,并通过isIndex()
方法表明自己是索引还是数据。reader
:CacheReader
,一个函数式接口。当缓存未命中时,CacheManager
会调用这个reader
的read(key)
方法来从物理存储(文件)中加载数据。callback
:CacheCallback
,一个回调函数,当缓存项被淘汰时会执行。
- 执行流程:
key.isIndex() ? indexCache : dataCache
: 首先根据key
的类型选择正确的缓存池。cache.get(key, k -> { ... })
: 这是 Guava Cache 的经典用法。- 它会先尝试用
key
从缓存中获取值。 - 如果获取不到(cache miss),它会原子地执行后面提供的 Lambda 表达式(一个
Callable
)。 - 在 Lambda 表达式中:
fileReadCount++
: 增加未命中计数。reader.read(key)
: 调用外部传入的加载器,从磁盘读取数据byte[]
。MemorySegment.wrap(...)
: 将读取到的字节数组包装成MemorySegment
。new Cache.CacheValue(...)
: 将MemorySegment
和callback
一起封装成CacheValue
对象。这个CacheValue
才是真正存储在 Guava Cache 中的值。
- Lambda 表达式的返回值会被放入缓存,并作为
cache.get
的结果返回。
- 它会先尝试用
- 最后,从返回的
CacheValue
中取出MemorySegment
并返回给调用者。
其他方法
invalidPage(CacheKey key)
: 提供了手动让某个缓存项失效的能力。例如,当一个文件被删除或更新时,需要调用此方法来清除其在内存中的旧缓存。SegmentContainer
: 一个静态内部类,是一个用于包装MemorySegment
并记录其访问次数的容器。不过在CacheManager
的主逻辑中并未使用到它,是为了一些更上层的、需要追踪访问频率的特定场景预留的。
REFRESH_COUNT
的作用
/**
* Refreshing the cache comes with some costs, so not every time we visit the CacheManager, but
* every 10 visits, refresh the LRU strategy.
*/
public static final int REFRESH_COUNT = 10;
这个常量和它的注释表明,底层的缓存淘汰策略(如 LRU)的维护操作(比如更新访问时间、调整内部数据结构顺序)是有成本的。为了优化性能,Paimon 的设计者可能在更上层的调用代码中(例如 OrcReader
或 ParquetReader
)实现了某种机制,使得不是每次访问 CacheManager
都触发缓存的内部刷新,而是累积一定次数(这里是10次)后再进行一次,这是一种常见的性能优化技巧,用稍微的策略延迟换取更高的吞吐量。不过,在 CacheManager
类本身的代码里,我们没有看到直接使用 REFRESH_COUNT
的地方,说明这个逻辑是在调用方实现的。
总结
CacheManager
是 Paimon I/O 路径上的一个关键性能优化组件。它通过以下方式提升效率:
- 内存缓存: 将热点数据页和索引页缓存在 JVM 堆内,避免了昂贵的磁盘/网络 I/O。
- 分层策略: 区分对待索引和数据,为更重要的索引提供了专门的、受保护的缓存空间,提高了缓存命中率和查询定位速度。
- 封装与抽象: 将复杂的缓存读写、加载、淘汰逻辑封装在统一的接口背后,让上层的文件读取器可以方便地使用缓存功能,而无需关心具体实现。
它与 LookupFile
(磁盘缓存)共同构成了 Paimon 的多级缓存体系,是 Paimon 实现高性能查询的重要基石。
CacheKey
接口
org.apache.paimon.io.cache.CacheKey
CacheKey
是 Paimon 缓存管理机制的核心组件,其主要职责是为缓存中的数据块(通常是从文件中读取的字节数据)提供一个唯一的、可识别的标识符。一个设计良好的 Key 是实现高效缓存查询、命中和失效的基础。
将从以下几个方面来解析它:
CacheKey
接口设计- 两种具体的实现:
PositionCacheKey
和PageIndexCacheKey
CacheKey
在缓存系统中的实际应用- 与其他
CacheKey
的区别
首先,CacheKey
是一个接口,而不是一个具体的类。这种设计提供了很好的灵活性。接口本身非常简洁,只定义了一个核心方法和两个静态工厂方法。
// ... existing code ...
public interface CacheKey {
static CacheKey forPosition(RandomAccessFile file, long position, int length, boolean isIndex) {
return new PositionCacheKey(file, position, length, isIndex);
}
static CacheKey forPageIndex(RandomAccessFile file, int pageSize, int pageIndex) {
return new PageIndexCacheKey(file, pageSize, pageIndex, false);
}
/** @return Whether this cache key is for index cache. */
boolean isIndex();
// ... existing code ...
}
关键设计点分析:
- 静态工厂方法:
forPosition(...)
和forPageIndex(...)
是创建CacheKey
实例的唯一入口。这种工厂模式隐藏了具体的实现类(PositionCacheKey
和PageIndexCacheKey
),使得调用者无需关心内部细节,只需根据自己的需求(是按位置还是按页索引)来创建 Key。 - 核心方法
isIndex()
: 这个方法是CacheKey
设计的精髓所在。它用于区分这个缓存条目是属于“数据缓存”还是“索引缓存”。在 Paimon 的CacheManager
中,通常会为索引数据分配一个独立的、具有更高优先级的缓存池,以确保索引信息能更持久地驻留在内存中,从而加速查询。CacheManager
正是通过调用key.isIndex()
来决定将请求路由到哪个缓存池。
// ... existing code ...
public MemorySegment getPage(CacheKey key, CacheReader reader, CacheCallback callback) {
// 通过 isIndex() 方法决定使用 indexCache 还是 dataCache
Cache cache = key.isIndex() ? indexCache : dataCache;
Cache.CacheValue value =
cache.get(
// ... existing code ...
两种具体的实现
CacheKey
接口内部定义了两个私有的静态内部类作为其具体实现。
a. PositionCacheKey
这个 Key 用于标识文件中一个任意位置和长度的数据块。
// ... existing code ...
/** Key for file position and length. */
class PositionCacheKey implements CacheKey {
private final RandomAccessFile file;
private final long position;
private final int length;
private final boolean isIndex;
private PositionCacheKey(
RandomAccessFile file, long position, int length, boolean isIndex) {
this.file = file;
this.position = position;
this.length = length;
this.isIndex = isIndex;
}
@Override
public boolean equals(Object o) {
// ... existing code ...
PositionCacheKey that = (PositionCacheKey) o;
return position == that.position
&& length == that.length
&& isIndex == that.isIndex
&& Objects.equals(file, that.file);
}
@Override
public int hashCode() {
return Objects.hash(file, position, length, isIndex);
}
@Override
public boolean isIndex() {
return isIndex;
}
}
// ... existing code ...
- 组成部分: 由
file
(文件句柄)、position
(起始偏移量)、length
(数据长度)和isIndex
标志共同定义。 - 唯一性:
equals()
和hashCode()
方法确保了只有当文件、位置、长度和索引标志都完全相同时,两个 Key 才被认为是相等的。这是它们能作为缓存Map
中 Key 的基础。 - 应用场景: 适用于缓存文件中大小不固定的数据块,例如 Paimon 的
SortLookupStore
中读取的一个数据 Block。
b. PageIndexCacheKey
这个 Key 用于标识文件中一个固定大小的页(Page)。
// ... existing code ...
/** Key for file page index. */
class PageIndexCacheKey implements CacheKey {
private final RandomAccessFile file;
private final int pageSize;
private final int pageIndex;
private final boolean isIndex;
private PageIndexCacheKey(
RandomAccessFile file, int pageSize, int pageIndex, boolean isIndex) {
this.file = file;
this.pageSize = pageSize;
this.pageIndex = pageIndex;
this.isIndex = isIndex;
}
// ... existing code ...
@Override
public boolean equals(Object o) {
// ... existing code ...
PageIndexCacheKey that = (PageIndexCacheKey) o;
return pageSize == that.pageSize
&& pageIndex == that.pageIndex
&& isIndex == that.isIndex
&& Objects.equals(file, that.file);
}
@Override
public int hashCode() {
return Objects.hash(file, pageSize, pageIndex, isIndex);
}
// ... existing code ...
}
}
- 组成部分: 由
file
(文件句柄)、pageSize
(页大小)、pageIndex
(页号)和isIndex
标志定义。 - 唯一性: 同样正确实现了
equals()
和hashCode()
。 - 应用场景: 适用于将文件逻辑上划分为等大小 Page 进行管理的场景。通过页号来定位比通过绝对偏移量更简洁。
CacheKey
在缓存系统中的实际应用
我们通过 BlockCache
的例子来看 CacheKey
是如何工作的。BlockCache
用于缓存从文件中读取的数据块。
// ... existing code ...
public MemorySegment getBlock(
long position, int length, Function<byte[], byte[]> decompressFunc, boolean isIndex) {
// 1. 使用工厂方法创建一个 PositionCacheKey
CacheKey cacheKey = CacheKey.forPosition(file, position, length, isIndex);
SegmentContainer container = blocks.get(cacheKey);
if (container == null || container.getAccessCount() == CacheManager.REFRESH_COUNT) {
// 2. 将 CacheKey 传递给 CacheManager 获取数据
MemorySegment segment =
cacheManager.getPage(
cacheKey,
key -> {
// 如果缓存未命中,则执行此处的加载逻辑
byte[] bytes = readFrom(position, length);
return decompressFunc.apply(bytes);
},
blocks::remove);
// ... existing code ...
}
return container.access();
}
// ... existing code ...
工作流程如下:
- 创建 Key: 当
BlockCache
需要一个数据块时,它会根据块在文件中的position
和length
,调用CacheKey.forPosition(...)
创建一个CacheKey
实例。 - 查询缓存: 这个
cacheKey
被传递给CacheManager
的getPage
方法。 - 路由与获取:
CacheManager
内部会调用cacheKey.isIndex()
来判断应该去哪个缓存池(indexCache
或dataCache
)查找。然后以这个cacheKey
为键,在底层的缓存(如 Caffeine 或 Guava Cache)中查找对应的CacheValue
(其内部包装了MemorySegment
)。 - 缓存加载: 如果缓存未命中,
CacheManager
会执行传入的加载逻辑(lambda 表达式),从文件中读取数据,然后将结果存入缓存,Key 就是之前创建的cacheKey
。
与其他 CacheKey
的区别
值得注意的是,在 Paimon 的代码库中,CacheKey
这个名字在不同的模块中有不同的定义。例如,在 paimon-s3-impl
、paimon-oss-impl
等文件系统实现中,也存在名为 CacheKey
的私有静态类。
// ... existing code ...
private static class CacheKey {
private final Options options;
private final String scheme;
private final String authority;
// ... existing code ...
}
这些 CacheKey
用于缓存文件系统客户端实例(如 S3、OSS 的客户端),它们的 Key 由 scheme
、authority
和配置 options
组成,目的是复用昂贵的客户端连接。
而我们分析的 org.apache.paimon.io.cache.CacheKey
,其应用范围是缓存文件中的数据块,它的 Key 由文件句柄和块位置/索引组成。两者虽然同名,但用途和上下文完全不同,需要注意区分。
总而言之,org.apache.paimon.io.cache.CacheKey
是 Paimon I/O 层面缓存设计的基石,通过其灵活的接口和实现,为不同类型的文件数据(数据块和索引块)提供了统一且可区分的缓存标识,是实现高性能数据读取的重要一环。
BlockCache
这个类在 Paimon 的 Sort-based Lookup
(基于排序的查找)机制中扮演着至关重要的角色,它是连接物理文件块和内存缓存的桥梁。
BlockCache
的核心作用是为单个本地文件(通常是 SortLookupStoreReader
使用的排序文件)提供一个面向块(Block)的、带有缓存功能的读取层。
在 SortLookupStoreReader
中,一个大的排序文件被逻辑上划分为多个数据块(Data Block)和索引块(Index Block)。当需要查找一个 Key 时,它会先通过索引块定位到可能包含这个 Key 的数据块,然后再加载这个数据块到内存中进行精确查找。
如果每次读取数据块都直接从磁盘 I/O,性能会很差。BlockCache
就是为了解决这个问题而设计的,它负责:
- 代理读取: 拦截所有对文件块的读取请求。
- 利用全局缓存: 将读取请求转发给全局的
CacheManager
。如果CacheManager
的内存中已经缓存了该块,就直接返回;如果没有,则从磁盘读取,并放入CacheManager
中。 - 管理本地引用:
BlockCache
自身也维护一个Map<CacheKey, SegmentContainer>
,用于持有从CacheManager
获取到的内存块的引用,并跟踪这些块的访问情况。 - 处理解压缩: 在从磁盘读取数据块后,它还负责调用传入的解压缩函数(
decompressFunc
)对数据进行解压,然后再存入缓存。
简单来说,BlockCache
是 CacheManager
的一个特定场景下的客户端或使用者。CacheManager
是全局的缓存池,而每个打开的排序文件都会有一个自己的 BlockCache
实例来管理自己文件内的块缓存。
核心成员变量
// ... existing code ...
public class BlockCache implements Closeable {
private final RandomAccessFile file;
private final FileChannel channel;
private final CacheManager cacheManager;
private final Map<CacheKey, SegmentContainer> blocks;
public BlockCache(RandomAccessFile file, CacheManager cacheManager) {
this.file = file;
this.channel = this.file.getChannel();
this.cacheManager = cacheManager;
this.blocks = new HashMap<>();
}
// ... existing code ...
file
和channel
:RandomAccessFile
和FileChannel
对象,代表了BlockCache
实例所服务的那个具体的本地文件。当缓存未命中时,会通过channel
从磁盘读取数据。cacheManager
: 全局的CacheManager
实例。BlockCache
所有的缓存操作最终都委托给它。blocks
: 一个HashMap
。这是BlockCache
内部的一级引用缓存。- Key 是
CacheKey
,唯一标识一个文件块(由文件、偏移量、长度等组成)。 - Value 是
SegmentContainer
,一个包装了MemorySegment
(实际内存块)并增加了访问计数的容器。
- Key 是
getBlock
这是 BlockCache
最核心的方法,我们来详细解读它的逻辑。
// ... existing code ...
public MemorySegment getBlock(
long position, int length, Function<byte[], byte[]> decompressFunc, boolean isIndex) {
CacheKey cacheKey = CacheKey.forPosition(file, position, length, isIndex);
SegmentContainer container = blocks.get(cacheKey);
if (container == null || container.getAccessCount() == CacheManager.REFRESH_COUNT) {
MemorySegment segment =
cacheManager.getPage(
cacheKey,
key -> {
byte[] bytes = readFrom(position, length);
return decompressFunc.apply(bytes);
},
blocks::remove);
container = new SegmentContainer(segment);
blocks.put(cacheKey, container);
}
return container.access();
}
// ... existing code ...
CacheKey cacheKey = CacheKey.forPosition(...)
: 首先,根据文件、偏移量、长度和是否为索引块,创建一个全局唯一的CacheKey
。SegmentContainer container = blocks.get(cacheKey)
: 尝试从自己的本地blocks
Map 中获取这个块的容器。if (container == null || container.getAccessCount() == CacheManager.REFRESH_COUNT)
: 这是关键的判断逻辑。在以下两种情况下,需要与全局的CacheManager
交互:container == null
: 本地blocks
Map 中没有这个块的引用,说明这是第一次请求该块,或者该块之前被淘汰了。container.getAccessCount() == CacheManager.REFRESH_COUNT
: 这是一个性能优化点。这个块已经被连续访问了REFRESH_COUNT
次(默认是10次),需要去CacheManager
"刷新"一下。这样做的目的是为了更新该块在CacheManager
内部 LRU 策略中的“新鲜度”,防止一个频繁被BlockCache
访问的热点块因为长时间没有“惊动”CacheManager
而被错误地淘汰。
cacheManager.getPage(...)
: 调用全局缓存管理器获取页面。- 第一个参数
cacheKey
: 要获取的缓存键。 - 第二个参数
key -> { ... }
: 这是一个CacheReader
Lambda。只有在CacheManager
缓存未命中时,这个 Lambda 才会被执行。它的逻辑是: a.readFrom(position, length)
: 从磁盘文件中读取原始的、可能被压缩的字节。 b.decompressFunc.apply(bytes)
: 调用外部传入的解压函数进行解压。 c. 返回解压后的byte[]
,CacheManager
会将其包装成MemorySegment
并存入缓存。 - 第三个参数
blocks::remove
: 这是一个CacheCallback
。它被传递给CacheManager
。当CacheManager
因为内存压力等原因决定淘汰这个缓存块时,会回调这个方法,即blocks.remove(key)
,从而将BlockCache
自己的本地blocks
Map 中对应的条目也一并移除,保持状态同步。
- 第一个参数
container = new SegmentContainer(segment)
: 用从CacheManager
获取到的MemorySegment
创建一个新的SegmentContainer
。blocks.put(cacheKey, container)
: 将新的容器放入本地的blocks
Map 中(覆盖旧的,如果有的话)。return container.access()
: 调用container.access()
,这个方法会增加访问计数,并返回底层的MemorySegment
。
close()
方法
// ... existing code ...
@Override
public void close() throws IOException {
Set<CacheKey> sets = new HashSet<>(blocks.keySet());
for (CacheKey key : sets) {
cacheManager.invalidPage(key);
}
}
}
当一个 SortLookupStoreReader
被关闭时,它会调用其持有的 BlockCache
的 close()
方法。这个方法会遍历其本地 blocks
Map 中所有持有的缓存块的 CacheKey
,并逐个调用 cacheManager.invalidPage(key)
。这会通知全局的 CacheManager
将这些块从缓存中移除,从而释放内存。这是一个良好的资源管理实践,确保与已关闭文件相关的缓存被及时清理。
总结
BlockCache
是一个设计精巧的类,它体现了分层设计的思想:
- 全局与局部: 它作为
CacheManager
(全局缓存)和单个文件读取器(局部需求)之间的适配层。 - 缓存与解压: 它将缓存逻辑和解压缩逻辑结合在一起,为上层提供了透明的、解压后的数据块视图。
- 生命周期管理: 通过
close
方法和传递给CacheManager
的回调,它确保了其管理的缓存块的生命周期与文件的生命周期保持一致。 - 性能优化: 通过
REFRESH_COUNT
机制,它在减少与全局缓存交互开销和维持热点数据活跃度之间取得了平衡。
BlockIterator
BlockIterator
是 Paimon 中用于 读取和遍历单个数据块(Block) 的核心抽象类。它在 Paimon 的 Sort-based Lookup Store
(基于排序的查找存储)中扮演着至关重要的角色,这个存储结构主要用于高效的 Key-Value 点查。
BlockIterator
的核心职责是提供对一个已排序的、存储在内存中的二进制数据块(MemorySlice
)的迭代和查找功能。
它的设计目标非常明确:
- 高性能: 直接操作二进制数据(
MemorySlice
),避免不必要的反序列化开销。 - 支持二分查找: 数据块内部的记录是按 Key 排序的,
BlockIterator
提供了seekTo(targetKey)
方法,可以利用二分查找快速定位。 - 抽象与扩展: 它是一个抽象类,定义了通用的遍历和查找逻辑,但将具体的“寻址”(如何跳到块内第 N 条记录)方式交由子类实现。
我们先来看一下它的关键成员变量:
// ... existing code ...
public abstract class BlockIterator implements Iterator<Map.Entry<MemorySlice, MemorySlice>> {
protected final MemorySliceInput data;
private final int recordCount;
private final Comparator<MemorySlice> comparator;
private BlockEntry polled;
// ... existing code ...
protected final MemorySliceInput data
: 这是最重要的部分,它代表了整个数据块的二进制内容。MemorySliceInput
是一个输入流,可以从中读取数据。protected
修饰符意味着子类可以直接访问它。private final int recordCount
: 记录了这个数据块中总共有多少条 Key-Value 记录。这个值在二分查找时是确定边界的关键。private final Comparator<MemorySlice> comparator
: 一个比较器,用于比较两个 Key(MemorySlice
类型)的大小。这是执行排序和二分查找的基础。private BlockEntry polled
: 一个“预读”或“暂存”的条目。当执行seekTo
操作时,找到的条目会先存放在这里,next()
方法会优先返回这个暂存的条目。这是一种优化,避免了重复读取。
构造函数
// ... existing code ...
public BlockIterator(
MemorySliceInput data, int recordCount, Comparator<MemorySlice> comparator) {
this.data = data;
this.recordCount = recordCount;
this.comparator = comparator;
}
// ... existing code ...
构造函数非常直观,就是初始化上述几个核心成员变量。
Iterator
接口实现 (hasNext
, next
)
BlockIterator
实现了标准的 java.util.Iterator
接口,使其可以被用在 for-each
循环等场景。
// ... existing code ...
@Override
public boolean hasNext() {
return polled != null || data.isReadable();
}
@Override
public BlockEntry next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
if (polled != null) {
BlockEntry result = polled;
polled = null;
return result;
}
return readEntry();
}
// ... existing code ...
hasNext()
: 判断是否还有下一个元素。逻辑是:如果polled
暂存区不为空,或者底层data
输入流还有可读数据,就认为还有下一个。next()
: 获取下一个元素。它会优先检查polled
,如果polled
中有值,就返回它并清空polled
。否则,调用readEntry()
从数据流中读取下一条记录。
readEntry()
: 记录的解析
// ... existing code ...
private BlockEntry readEntry() {
requireNonNull(data, "data is null");
int keyLength;
keyLength = data.readVarLenInt();
MemorySlice key = data.readSlice(keyLength);
int valueLength = data.readVarLenInt();
MemorySlice value = data.readSlice(valueLength);
return new BlockEntry(key, value);
}
// ... existing code ...
这个私有方法定义了数据块中单条记录的二进制格式:
- 可变长度整数 (keyLength): 首先读取一个可变长度的整数,表示 Key 的长度。
- Key 数据 (key): 根据
keyLength
读取相应长度的字节,封装成MemorySlice
。 - 可变长度整数 (valueLength): 接着读取表示 Value 长度的整数。
- Value 数据 (value): 根据
valueLength
读取相应长度的字节。 - 最后将 Key 和 Value 封装成
BlockEntry
返回。
seekTo(MemorySlice targetKey)
: 二分查找
这是 BlockIterator
最核心的功能之一。
// ... existing code ...
public boolean seekTo(MemorySlice targetKey) {
int left = 0;
int right = recordCount - 1;
while (left <= right) {
int mid = left + (right - left) / 2;
seekTo(mid); // <--- 调用抽象方法,跳转到第 mid 条记录的起始位置
BlockEntry midEntry = readEntry(); // 读取该记录
int compare = comparator.compare(midEntry.getKey(), targetKey);
if (compare == 0) {
polled = midEntry; // 找到了,存入 polled
return true;
} else if (compare > 0) { // 中间值比目标大
polled = midEntry; // 可能是第一个大于等于目标的,暂存
right = mid - 1;
} else { // 中间值比目标小
left = mid + 1;
}
}
return false;
}
// ... existing code ...
这是一个标准的二分查找算法。它的巧妙之处在于 seekTo(mid)
这个调用。它调用的是下面的抽象方法,将“物理寻址”的责任交给了子类。当循环结束时,如果找到了 targetKey
,polled
会持有这个条目;如果没找到,polled
会持有第一个大于 targetKey
的条目,这对于范围扫描非常有用。
seekTo(int record)
: 抽象寻址方法
// ... existing code ...
public abstract void seekTo(int record);
// ... existing code ...
这是一个抽象方法,必须由子类实现。它的作用是将底层 data
输入流的读取指针移动到第 record
条记录的起始位置。
子类实现
Paimon 提供了两个主要的子类实现,位于 BlockReader
的内部:
AlignedIterator
: 用于处理定长记录的块。如果块中每条 Key-Value 对的长度都相同,寻址就非常简单:position = record * recordSize
。// ... existing code ... private static class AlignedIterator extends BlockIterator { private final int recordSize; // ... existing code ... @Override public void seekTo(int record) { data.setPosition(record * recordSize); } }
UnalignedIterator
: 用于处理变长记录的块。由于记录长度可变,不能通过乘法直接计算偏移。因此,这种块的末尾会额外存储一个“偏移量索引区”,记录了每条记录的起始位置。seekTo
的实现就是去这个索引区查找偏移量。// ... existing code ... private static class UnalignedIterator extends BlockIterator { private final MemorySlice index; // ... existing code ... @Override public void seekTo(int record) { data.setPosition(index.readInt(record * 4)); } }
总结
BlockIterator
是一个设计精良的类,它通过抽象与具体实现分离的原则,优雅地实现了对不同格式数据块的高效遍历和查找。
- 它定义了统一的二进制记录格式(长度前缀 + 数据)和通用的迭代、二分查找逻辑。
- 它将物理寻址这一依赖于具体块格式的逻辑抽象成
seekTo(int record)
方法,交由子类AlignedIterator
和UnalignedIterator
实现。 - 这种设计使得上层调用者(如
SortLookupStoreReader
)可以透明地处理不同类型的 Block,而无需关心其内部是定长还是变长,大大简化了上层逻辑。
BlockEntry
和 MemorySlice
它们是 Paimon 项目中实现高效数据读写和查找功能的核心组件,尤其在基于排序的查找存储(Sort Lookup Store)中扮演着关键角色。
会分三个部分来解析:
MemorySlice
:内存视图的抽象。BlockEntry
:数据块中键值对的表示。- 两者如何协同工作。
MemorySlice
类分析
MemorySlice
可以理解为一个“内存切片”或“内存视图”。它本身不存储实际数据,而是指向一个更大的内存区域(由 MemorySegment
管理)中的某一段。这种设计的核心优势在于零拷贝(Zero-Copy),在进行数据切分、传递时,无需复制底层字节,极大地提升了性能。
核心源码解读 (paimon-common/src/main/java/org/apache/paimon/memory/MemorySlice.java
):
// ... existing code ...
public final class MemorySlice implements Comparable<MemorySlice> {
private final MemorySegment segment;
private final int offset;
private final int length;
public MemorySlice(MemorySegment segment, int offset, int length) {
this.segment = segment;
this.offset = offset;
this.length = length;
}
public MemorySegment segment() {
return segment;
}
public int offset() {
return offset;
}
public int length() {
return length;
}
public MemorySlice slice(int index, int length) {
if (index == 0 && length == this.length) {
return this;
}
return new MemorySlice(segment, offset + index, length);
}
// ... existing code ...
public byte[] copyBytes() {
byte[] bytes = new byte[length];
segment.get(offset, bytes, 0, length);
return bytes;
}
public MemorySliceInput toInput() {
return new MemorySliceInput(this);
}
@Override
public boolean equals(Object o) {
// ... existing code ...
MemorySlice slice = (MemorySlice) o;
// do lengths match
if (length != slice.length) {
return false;
}
// ... existing code ...
return segment.equalTo(slice.segment, offset, slice.offset, length);
}
@Override
public int hashCode() {
return MurmurHashUtils.hashBytes(segment, offset, length);
}
public static MemorySlice wrap(byte[] bytes) {
return new MemorySlice(MemorySegment.wrap(bytes), 0, bytes.length);
}
// ... existing code ...
@Override
public int compareTo(MemorySlice other) {
int len = Math.min(length, other.length);
for (int i = 0; i < len; i++) {
int res =
(segment.get(offset + i) & 0xFF) - (other.segment.get(other.offset + i) & 0xFF);
if (res != 0) {
return res;
}
}
return length - other.length;
}
}
关键特性总结:
数据结构:
segment
: 底层的MemorySegment
对象,是实际内存(堆上或堆外)的管理者。offset
: 表示该切片在segment
中的起始位置。length
: 表示该切片的长度。 这三个final
字段共同定义了一个不可变的内存视图。
核心功能:
- 零拷贝切片:
slice(int index, int length)
方法创建一个新的MemorySlice
实例,它指向原segment
的一个子区域。这个过程非常轻量,只创建了一个新对象,没有发生任何字节拷贝。 - 内容访问: 提供了
readByte()
,readInt()
等方法直接从内存中读取数据。copyBytes()
方法则会真正地拷贝数据到一个新的byte[]
数组中,这在需要将数据传递给不识别MemorySlice
的外部组件时非常有用。 - 基于内容的比较:
equals()
和hashCode()
方法被重写,它们的行为是基于MemorySlice
所指向的内存区域的实际内容,而不是对象引用。compareTo()
方法实现了按字典序对字节内容进行比较。这使得MemorySlice
可以被安全地用作Map
的键或进行排序。 - 封装与转换:
wrap()
静态方法可以方便地将一个byte[]
数组封装成MemorySlice
。toInput()
方法将其转换为MemorySliceInput
,提供流式读取的能力。
- 零拷贝切片:
BlockEntry
类分析
BlockEntry
是一个逻辑概念,它代表了数据块(Block)中的一个条目,即一个键值对(Key-Value Pair)。它实现了 Java 的 Map.Entry
接口,使其行为符合标准的键值对规范。
核心源码解读 (paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockEntry.java
):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.paimon.lookup.sort;
import org.apache.paimon.memory.MemorySlice;
import java.util.Map.Entry;
import static java.util.Objects.requireNonNull;
/** Entry represents a key value. */
public class BlockEntry implements Entry<MemorySlice, MemorySlice> {
private final MemorySlice key;
private final MemorySlice value;
public BlockEntry(MemorySlice key, MemorySlice value) {
requireNonNull(key, "key is null");
requireNonNull(value, "value is null");
this.key = key;
this.value = value;
}
@Override
public MemorySlice getKey() {
return key;
}
@Override
public MemorySlice getValue() {
return value;
}
@Override
public final MemorySlice setValue(MemorySlice value) {
throw new UnsupportedOperationException();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
BlockEntry entry = (BlockEntry) o;
if (!key.equals(entry.key)) {
return false;
}
return value.equals(entry.value);
}
@Override
public int hashCode() {
int result = key.hashCode();
result = 31 * result + value.hashCode();
return result;
}
}
关键特性总结:
数据结构:
key
:MemorySlice
类型的键。value
:MemorySlice
类型的值。 它不直接持有字节数据,而是持有指向数据的MemorySlice
视图。
核心功能:
- 不可变性:
key
和value
都是final
的,并且setValue()
方法直接抛出UnsupportedOperationException
。这表明BlockEntry
是一个不可变对象,一旦创建,其内容就不会改变,这在多线程或复杂的数据流处理中是安全的。 - 标准接口: 实现
Map.Entry<MemorySlice, MemorySlice>
接口,提供了getKey()
和getValue()
方法,使其可以被用在任何接受Map.Entry
的地方。 - 基于值的比较:
equals()
和hashCode()
方法委托给了其内部的key
和value
(MemorySlice
)。由于MemorySlice
的比较是基于内容的,因此两个BlockEntry
相等,当且仅当它们的键和值的内容分别相等。
- 不可变性:
两者如何协同工作
BlockEntry
和 MemorySlice
的结合是 Paimon 实现高效 I/O 和内存管理的关键。我们可以通过 BlockIterator
的工作流程来理解这一点。
在一个排好序的文件中,数据被组织成一个个的 Block。当需要读取或查找数据时:
- 读取数据块:
SortLookupStoreReader
从文件中读取一个压缩的数据块,解压后放入一个大的MemorySegment
中。 - 创建迭代器:
BlockReader
会基于这个包含整个块数据的MemorySegment
创建一个BlockIterator
。 - 解析条目: 当
BlockIterator.next()
或readEntry()
被调用时,它会从MemorySliceInput
(由MemorySlice
转换而来) 中读取元数据(比如 key 的长度和 value 的长度)。 - 创建视图 (非拷贝): 根据读取到的长度信息,迭代器并不会将 key 和 value 的字节拷贝出来,而是调用
MemorySlice.slice()
方法,创建两个分别指向 key 数据区域和 value 数据区域的MemorySlice
对象。 - 封装成
BlockEntry
: 最后,这两个轻量级的MemorySlice
对象被用来创建一个BlockEntry
实例并返回。
相关代码片段 (paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockIterator.java
):
private BlockEntry readEntry() {
requireNonNull(data, "data is null");
int keyLength;
keyLength = data.readVarLenInt();
MemorySlice key = data.readSlice(keyLength); // 创建 key 的视图
int valueLength = data.readVarLenInt();
MemorySlice value = data.readSlice(valueLength); // 创建 value 的视图
return new BlockEntry(key, value); // 封装成 BlockEntry
}
总结一下这种协同工作的好处:
- 高效: 整个数据块的读取和迭代过程中,键和值的实际字节数据只在内存中存在一份(在
MemorySegment
中)。BlockEntry
和MemorySlice
只是轻量级的视图对象,避免了大量的对象创建和数据拷贝开销。 - 延迟物化: 只有当用户真正需要一个
byte[]
时(例如,调用MemorySlice.copyBytes()
),实际的拷贝才会发生。在进行比较、排序等操作时,都直接在内存视图上进行,速度非常快。 - 清晰的职责分离:
MemorySegment
: 负责管理底层内存(堆上/堆外)。MemorySlice
: 负责提供对MemorySegment
的零拷贝、基于内容的视图。BlockEntry
: 负责将key
和value
的MemorySlice
视图组合成一个逻辑上的键值对记录。
FileBasedBloomFilter
它在 Paimon 中扮演着一个非常重要的角色:为存储在文件中的布隆过滤器数据提供一个高效、带缓存的访问接口。
布隆过滤器是一种空间效率极高的数据结构,用于快速判断一个元素“可能存在”或“绝对不存在”于一个集合中。在 Paimon 这类数据湖存储中,它被用作一种文件索引(File Index),可以在查询时跳过(skip)那些肯定不包含所需数据的数据文件,从而极大地提升查询性能。
下面将从以下几个方面来解析 FileBasedBloomFilter
:
- 核心职责与设计目标
- 构造函数与依赖
- 核心方法
testHash()
的工作机制 - 缓存管理与生命周期
- 与
BloomFilter
类的关系
FileBasedBloomFilter
的核心职责是代理一个存储在物理文件特定位置的布隆过滤器。它的设计目标是:
- 延迟加载(Lazy Loading): 不在初始化时就读取整个布隆过滤器的数据,而是在第一次需要进行判断(调用
testHash
)时才从磁盘加载。 - 缓存集成: 与 Paimon 的缓存系统(
CacheManager
)深度集成,将从磁盘加载的布隆过滤器数据(一个字节数组)缓存到内存中,避免重复的磁盘 I/O。 - 统一接口: 对上层调用者提供与内存中的
BloomFilter
类似的testHash()
方法,屏蔽底层的文件读取和缓存管理的复杂性。
我们先来看它的构造函数,这能帮助我们理解它的组成部分。
// ... existing code ...
public class FileBasedBloomFilter implements Closeable {
private final PageFileInput input;
private final CacheManager cacheManager;
private final BloomFilter filter;
private final long readOffset;
private final int readLength;
private final CacheKey cacheKey;
private int accessCount;
public FileBasedBloomFilter(
PageFileInput input,
CacheManager cacheManager,
long expectedEntries,
long readOffset,
int readLength) {
this.input = input;
this.cacheManager = cacheManager;
checkArgument(expectedEntries >= 0);
this.filter = new BloomFilter(expectedEntries, readLength);
this.readOffset = readOffset;
this.readLength = readLength;
this.accessCount = 0;
this.cacheKey = CacheKey.forPosition(input.file(), readOffset, readLength, true);
}
// ... existing code ...
input: PageFileInput
: 一个文件输入对象,提供了从文件中读取指定位置和长度数据的能力。cacheManager: CacheManager
: Paimon 的缓存管理器,负责管理内存缓存池。expectedEntries: long
: 预期的元素数量。这个值和下面的readLength
一起用来初始化一个逻辑上的BloomFilter
对象,计算出最佳的哈希函数个数。readOffset: long
: 布隆过滤器数据在文件 (input
) 中的起始偏移量。readLength: int
: 布隆过滤器数据的长度(字节数)。filter: BloomFilter
: 一个内部持有的BloomFilter
实例。注意:在构造时,这个filter
内部的BitSet
还没有关联任何实际的内存数据,它只是一个“空壳”,定义了哈希函数个数等元信息。cacheKey: CacheKey
: 根据文件、偏移量和长度创建的一个缓存键。这个 Key 非常关键,它唯一标识了这段布隆过滤器数据在CacheManager
中的身份。isIndex
参数被设置为true
,表明这是一个索引缓存,CacheManager
会将其放入索引专用的缓存池中。
核心方法 testHash()
的工作机制
这是 FileBasedBloomFilter
最核心的方法,它实现了延迟加载和缓存刷新逻辑。
// ... existing code ...
public boolean testHash(int hash) {
accessCount++;
// we should refresh cache in LRU, but we cannot refresh everytime, it is costly.
// so we introduce a refresh count to reduce refresh
if (accessCount == REFRESH_COUNT || filter.getMemorySegment() == null) {
MemorySegment segment =
cacheManager.getPage(
cacheKey,
key -> input.readPosition(readOffset, readLength),
new BloomFilterCallBack(filter));
filter.setMemorySegment(segment, 0);
accessCount = 0;
}
return filter.testHash(hash);
}
// ... existing code ...
执行流程分析:
- 访问计数:
accessCount
递增。 - 触发缓存/加载:
if
条件判断是否需要从CacheManager
获取数据。以下两种情况会触发:filter.getMemorySegment() == null
: 这是首次调用testHash
,或者布隆过滤器的数据已被缓存淘汰。此时filter
内部没有指向任何内存,必须加载。accessCount == REFRESH_COUNT
: 这是一个缓存刷新优化。对于 LRU 这类缓存策略,频繁访问一个条目会让它一直保持“热门”,但并不会提升它在缓存中的优先级。通过设置一个访问次数阈值 (REFRESH_COUNT
),定期地重新getPage
,可以起到“刷新”该条目在缓存中活跃度的作用,使其不容易被淘汰。
- 调用
cacheManager.getPage()
:cacheKey
: 使用之前创建的 Key 去缓存中查找。key -> input.readPosition(...)
: 这是一个 Lambda 表达式,定义了**缓存未命中(Cache Miss)**时的加载逻辑。如果CacheManager
中没有找到cacheKey
对应的数据,就会执行这个 Lambda,从文件的readOffset
处读取readLength
字节的数据。new BloomFilterCallBack(filter)
: 这是一个回调对象。当这个缓存条目因为内存压力等原因被CacheManager
**淘汰(Evict)**时,CacheManager
会调用这个回调对象的onRemoval
方法。
- 关联内存:
cacheManager.getPage()
返回一个MemorySegment
,它指向了包含布隆过滤器数据的内存区域。filter.setMemorySegment(segment, 0)
这行代码将这个内存段与内部的BloomFilter
实例关联起来。至此,filter
才成为一个功能完整的、可以进行判断的布隆过滤器。 - 执行判断:
return filter.testHash(hash)
,调用内部BloomFilter
实例的testHash
方法,在刚刚加载或从缓存中获取的内存上进行位运算,返回最终结果。
缓存管理与生命周期
FileBasedBloomFilter
实现了 Closeable
接口,并与 CacheCallback
配合,实现了完善的生命周期管理。
// ... existing code ...
@Override
public void close() throws IOException {
cacheManager.invalidPage(cacheKey);
}
/** Call back for cache manager. */
private static class BloomFilterCallBack implements CacheCallback {
private final BloomFilter bloomFilter;
private BloomFilterCallBack(BloomFilter bloomFilter) {
this.bloomFilter = bloomFilter;
}
@Override
public void onRemoval(CacheKey key) {
this.bloomFilter.unsetMemorySegment();
}
}
}
close()
方法: 当FileBasedBloomFilter
不再需要时(例如,一个查询任务结束),调用close()
方法会主动通知CacheManager
将cacheKey
对应的缓存页失效(Invalidate)。这会释放缓存占用的内存。BloomFilterCallBack
:- 当缓存条目被动地被
CacheManager
淘汰时,onRemoval
方法会被调用。 this.bloomFilter.unsetMemorySegment()
这行代码会解除内部BloomFilter
与MemorySegment
的关联。这非常重要,它确保了当下次testHash
被调用时,filter.getMemorySegment() == null
条件成立,从而触发重新从CacheManager
加载数据。这保证了数据的一致性。
- 当缓存条目被动地被
FileBasedBloomFilter
和 BloomFilter
是组合(Composition)关系,前者包装了后者。
BloomFilter
: 是一个纯粹的、基于内存的布隆过滤器实现。它接收一个MemorySegment
(代表一块内存),并在这块内存上执行addHash
和testHash
操作。它不关心这块内存从何而来。FileBasedBloomFilter
: 是一个更高层次的封装,它负责处理“脏活累活”:从哪里(PageFileInput
)加载数据、如何缓存数据(CacheManager
)、何时加载(testHash
中的懒加载逻辑)以及如何释放资源(close
和CacheCallback
)。它将BloomFilter
从对文件 I/O 和缓存的感知中解耦出来。
总结
FileBasedBloomFilter
是一个设计精巧的类,它通过代理模式和组合模式,将一个纯内存的 BloomFilter
数据结构适配到了一个基于文件和缓存的复杂环境中。它完美地体现了关注点分离的设计原则,使得 Paimon 能够高效地利用布隆过滤器进行数据过滤,同时通过缓存机制将 I/O 开销降到最低。