背景
Paimon 通过将 LSM-Tree(Log-Structured Merge-Tree)结构和本地缓存机制引入数据湖,为 Flink Lookup Join 提供了一种介于“全内存”和“外部 KV 系统”之间的、高性价比的解决方案。
传统 Flink 维表 Join 的一个痛点是,如果维表数据量大,无法完全加载到 TaskManager 的内存中。Paimon 解决了这个问题,它支持将数据缓存在本地磁盘,并利用 RocksDB 进行管理,实现了内存与磁盘的混合存储。
本地文件缓存 (Local File Cache): Paimon 会将远程存储(如 HDFS/S3)上的数据文件拉取到本地磁盘进行缓存,避免每次查询都通过网络读取。这个缓存由
Caffeine
实现,具备 LRU 和超时淘汰策略。// ... existing code ... public class LookupFile { // ... existing code ... // ==================== Cache for Local File ====================== public static Cache<String, LookupFile> createCache( Duration fileRetention, MemorySize maxDiskSize) { return Caffeine.newBuilder() .expireAfterAccess(fileRetention) .maximumWeight(maxDiskSize.getKibiBytes()) .weigher(LookupFile::fileWeigh) .removalListener(LookupFile::removalCallback) .executor(Runnable::run) .build(); } // ... existing code ...
RocksDB 作为本地 KV 存储: 对于已经拉取到本地的数据,Paimon 会将其加载到内嵌的 RocksDB 实例中,构建 Key-Value 索引。这样,后续的 Lookup 操作就变成了对本地 RocksDB 的高效点查,而不是对原始文件的扫描。
FullCacheLookupTable
和NoPrimaryKeyLookupTable
等类都利用了 RocksDB 的能力。FullCacheLookupTable.java
// ... existing code ... public abstract class FullCacheLookupTable implements LookupTable { // ... existing code ... protected RocksDBStateFactory stateFactory; @Nullable private ExecutorService refreshExecutor; // ... existing code ... }
RocksDBOptions
文件中也定义了相关的配置项,比如lookup.cache-rows
用于控制缓存行数。// ... existing code ... public class RocksDBOptions { public static final ConfigOption<Long> LOOKUP_CACHE_ROWS = key("lookup.cache-rows") .longType() .defaultValue(10_000L) .withDescription("The maximum number of rows to store in the cache."); // ... existing code ...
无主键表怎么使用RocksDB
对于没有主键的表,一个 Join Key 可能会对应多条数据,这和有主键表(一个 Key 只对应一条最新数据)的查找逻辑完全不同。
NoPrimaryKeyLookupTable
正是为了处理这种 "一对多" 的情况而设计的。它虽然和有主键的 PrimaryKeyLookupTable
一样都继承自 FullCacheLookupTable
并使用 RocksDB 作为本地缓存,但其内部实现机制有本质区别。
关键在于它使用的 State 类型不同。
PrimaryKeyLookupTable
使用RocksDBValueState
,这是一个标准的 Key-Value 存储,一个 Key 只会映射到一个 Value。NoPrimaryKeyLookupTable
使用RocksDBListState
,这是一个 Key-List<Value> 的存储结构。
NoPrimaryKeyLookupTable
的核心是一个 RocksDBListState
类型的成员变量 state
。这个 State 的 Key 是维表的 Join Key,而 Value 是一个包含所有匹配该 Key 的完整数据行的 列表 (List)。
NoPrimaryKeyLookupTable.java
// ... existing code ...
public class NoPrimaryKeyLookupTable extends FullCacheLookupTable {
private final long lruCacheSize;
private final KeyProjectedRow joinKeyRow;
private RocksDBListState<InternalRow, InternalRow> state;
public NoPrimaryKeyLookupTable(Context context, long lruCacheSize) {
// ... existing code ...
}
@Override
public void open() throws Exception {
init();
this.state =
stateFactory.listState(
"join-key-index",
InternalSerializers.create(
TypeUtils.project(projectedType, joinKeyRow.indexMapping())),
InternalSerializers.create(projectedType),
lruCacheSize);
bootstrap();
}
// ... existing code ...
在 open()
方法中,它通过 stateFactory.listState(...)
初始化了 state
,明确了这是一个存储列表的 State。
get
方法返回一个列表
当进行 get
操作时,它会从 RocksDBListState
中根据 Key 取出整个列表并返回。这与 PrimaryKeyLookupTable
返回单个 InternalRow
是不同的。
// ... existing code ...
@Override
public List<InternalRow> innerGet(InternalRow key) throws IOException {
return state.get(key);
}
// ... existing code ...
refreshRow
方法执行 add
操作
在加载和刷新数据时,NoPrimaryKeyLookupTable
的行为不是“更新/覆盖”(put),而是“添加”(add)。每当来一条新的数据,它会被添加到对应 Join Key 的列表中,而不是替换掉旧数据。
// ... existing code ...
@Override
protected void refreshRow(InternalRow row, Predicate predicate) throws IOException {
joinKeyRow.replaceRow(row);
if (row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER) {
if (predicate == null || predicate.test(row)) {
state.add(joinKeyRow, row);
}
} else {
// ... existing code ...
}
}
// ... existing code ...
可以看到,这里调用的是 state.add(joinKeyRow, row)
,这会将 row
追加到 joinKeyRow
对应的列表中。
总结
所以,NoPrimaryKeyLookupTable
利用 RocksDB 的方式可以总结为:
- 使用 Join Key 作为 RocksDB 的 Key。
- 将所有匹配该 Join Key 的数据行组织成一个列表。
- 将这个列表序列化后,作为 RocksDB 的 Value 进行存储。
这样就巧妙地通过 Key -> List<Value>
的模式,在 KV 存储上实现了对无主键表的 Lookup Join 支持,能够正确处理一个 Join Key 关联出多条维表数据的情况。
瓶颈与优化
Paimon 的基础方案存在两个主要瓶颈:每个节点加载全量数据和本地磁盘 I/O 性能。Paimon 针对这两点提供了有效的优化手段。
Paimon 专栏 有更详细的讨论
优化一:避免全量数据加载 (Partial Cache)
这个问题本质上是 Flink 作业的数据分发没有与 Paimon 表的物理存储结构(Bucket)对齐。Paimon 通过 PrimaryKeyPartialLookupTable
实现了部分数据加载。其核心思想是:根据上游流数据 Join Key 的值,计算出这个 Key 对应 Paimon 表中的哪个 Bucket,然后只加载该 Bucket 的数据。
这样,每个 Flink 的 Lookup 并发实例就不再需要加载维表的全量数据,而只需要负责一部分 Bucket 的数据,极大地降低了单个节点的内存消耗和启动时间。
FileStoreLookupFunction
中的逻辑清晰地体现了这一点。当满足特定条件时(例如,Join Key 是主键),它会优先创建 PrimaryKeyPartialLookupTable
。
// ... existing code ...
if (options.get(LOOKUP_CACHE_MODE) == LookupCacheMode.AUTO
&& new HashSet<>(table.primaryKeys()).equals(new HashSet<>(joinKeys))) {
if (isRemoteServiceAvailable(table)) {
// ... existing code ...
} else {
try {
this.lookupTable =
PrimaryKeyPartialLookupTable.createLocalTable(
table, projection, path, joinKeys, getRequireCachedBucketIds());
LOG.info(
"Remote service isn't available. Created PrimaryKeyPartialLookupTable with LocalQueryExecutor.");
} catch (UnsupportedOperationException ignore) {
LOG.info(
"Remote service isn't available. Cannot create PrimaryKeyPartialLookupTable with LocalQueryExecutor "
+ "because bucket mode isn't {}. Will create FullCacheLookupTable.",
BucketMode.HASH_FIXED);
}
}
}
// ... existing code ...
优化二:缓解本地磁盘 I/O 瓶颈 (Query Service)
为了解决本地磁盘(尤其是 HDD)随机读性能差的问题,Paimon 借鉴了 "服务化" 的思想,推出了 Query Service。
可以为一张 Paimon 表启动一个独立的、常驻的 Flink 作业作为查询服务。这个服务可以部署在拥有高性能 SSD 的专用节点上,它预先加载数据并对外提供点查服务。
当维表 Join 任务运行时,它会优先连接这个 Query Service 来获取数据,从而将 I/O 压力从计算节点转移到专用的服务节点上,实现了计算与存储的物理分离,保证了 Lookup 性能的稳定性和高效性。
文档中描述了如何启动该服务:
// ... existing code ...
# Lookup Joins
## Query Service
You can run a Flink Streaming Job to start query service for the table. When QueryService exists, Flink Lookup Join
will prioritize obtaining data from it, which will effectively improve query performance.
{{< tabs "query-service" >}}
{{< tab "Flink SQL" >}}
```sql
CALL sys.query_service('database_name.table_name', parallelism);
`FileStoreLookupFunction` 中的代码逻辑也验证了这一点,它会通过 `isRemoteServiceAvailable` 检查查询服务是否存在,如果存在,则创建远程查询的客户端。
FileStoreLookupFunction.java
// ... existing code ...
if (options.get(LOOKUP_CACHE_MODE) == LookupCacheMode.AUTO
&& new HashSet<>(table.primaryKeys()).equals(new HashSet<>(joinKeys))) {
if (isRemoteServiceAvailable(table)) {
this.lookupTable =
PrimaryKeyPartialLookupTable.createRemoteTable(table, projection, joinKeys);
LOG.info(
"Remote service is available. Created PrimaryKeyPartialLookupTable with remote service.");
} else {
// ... existing code ...
不足
性能与灵活性限制
Paimon 在高频更新场景下不如专用 KV 系统,这主要是由其核心架构决定的。
a. 更新路径更长,延迟更高
- 专用 KV 系统 (如 HBase/Redis): 通常为在线服务设计,追求极致的低延迟。一次更新操作的路径非常短,数据可以直接写入内存(如 HBase MemStore),并立即对后续查询可见,然后异步刷写到持久化存储。
- Paimon: Paimon 的设计目标是服务于数据湖的分析场景,它通过 Mini-Batch + LSM-Tree 的方式来处理更新,优先保证吞吐量和批处理效率。一次更新的完整路径如下:
- 数据由 Flink Sink 算子写入内存缓冲区 (
write-buffer-size
)。 - 当 Flink Checkpoint 触发时,内存中的数据被刷写到远端存储(如 HDFS/S3),形成新的 Level-0 小文件。(本地占用达到限制也会写远程,但此时没有更新元数据,因此不可见)
- 生成一个新的快照(Snapshot),包含这次变更的元数据。
- Lookup Join 的 Source 端感知到新的快照后,开始拉取增量数据。
- 将增量数据更新到本地的 RocksDB 缓存中。
- 数据由 Flink Sink 算子写入内存缓冲区 (
这个过程与 Flink 的 Checkpoint 周期强绑定,本质上是异步、微批次的。即使 Checkpoint 间隔很短,其延迟也远高于专用 KV 系统毫秒级的同步更新。
b. Compaction 开销
Paimon 依赖于 Compaction(合并)来合并小的增量文件,以保证查询性能。虽然 Compaction 是后台异步执行的,但它会消耗计算和 I/O 资源,在高频写入时,频繁的 Compaction 会对系统造成一定的压力。
c. 代码实现分析
我们来看 NoPrimaryKeyLookupTable.java
,它是为无主键表(Append-Only 表)设计的 Lookup 实现。
// ... existing code ...
@Override
public void refresh(Iterator<InternalRow> incremental) throws IOException {
if (userDefinedSeqComparator != null) {
throw new IllegalArgumentException(
"Append table does not support user defined sequence fields.");
}
super.refresh(incremental);
}
@Override
protected void refreshRow(InternalRow row, Predicate predicate) throws IOException {
joinKeyRow.replaceRow(row);
if (row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER) {
if (predicate == null || predicate.test(row)) {
state.add(joinKeyRow, row);
}
} else {
throw new RuntimeException(
String.format(
"Received %s message. Only INSERT/UPDATE_AFTER values are expected here.",
row.getRowKind()));
}
}
// ... existing code ...
这里的 refresh
方法由其父类 FullCacheLookupTable
在检测到新快照时调用。它会遍历增量数据(incremental
),并逐条调用 refreshRow
。refreshRow
将新数据 add
到本地的 RocksDBListState
中。整个流程清晰地表明:本地缓存的更新是滞后的,它依赖于上游 Paimon 表产生新快照,这正是其不适合超高频更新场景的根本原因。
可扩展性依赖
“可扩展性依赖”问题 源于 Paimon Lookup 缓存与 Flink 计算节点的紧耦合。
- 专用 KV 系统: 其架构是存储与计算分离的。当存储容量或 QPS 成为瓶颈时,可以独立地对存储层进行水平扩展(例如,为 HBase 增加 RegionServer 节点)。系统会自动进行数据重分布(Region Splitting/Balancing),对上层应用透明。
- Paimon Lookup: Paimon 的本地缓存(无论是部分缓存还是全量缓存)是存在于 Flink TaskManager 的本地磁盘上的。Lookup 的能力直接与 Flink Job 的并发度挂钩。
- 水平扩展方式: 要提升 Lookup 的整体吞吐能力,你需要增加 Flink Job 的并发度,即增加 Flink 的计算节点(TaskManager Slots)。
- Bucket 数量的制约: Paimon 表的数据是根据
bucket
值物理组织的。为了让增加的 Flink 并发能够均匀地分摊负载,理想情况下,Paimon 表的bucket
数量应该大于或等于 Flink 的并发度。如果并发度远大于bucket
数,很多 Flink Task 将会空闲。 - 运维复杂度: 因此,当面临扩展需求时,可能不仅要扩展 Flink 集群,还需要重新规划 Paimon 表的
bucket
数量。而修改一个已存在表的bucket
数是一个重操作,通常需要重写整个表的数据,这带来了额外的运维成本和复杂性。
文档中创建表的示例也说明了 bucket
是一个在表创建时就需要规划好的物理属性。
-- ... existing code ...
CREATE TABLE MyTable (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh) WITH (
'bucket' = '10'
-- ... existing code ...
GlobalDynamicBucketSink
在很大程度上缓解了可扩展性依赖”问题,但并没有完全消除它。
我们来看 GlobalDynamicBucketSink.java
的源码来理解它的工作原理。
// ... existing code ...
public DataStreamSink<?> build(DataStream<InternalRow> input, @Nullable Integer parallelism) {
// ... existing code ...
// Topology:
// input -- bootstrap -- shuffle by key hash --> bucket-assigner -- shuffle by bucket -->
// writer --> committer
SingleOutputStreamOperator<Tuple2<KeyPartOrRow, InternalRow>> bootstraped =
// ... existing code ...
.setParallelism(input.getParallelism());
// 1. shuffle by key hash
Integer assignerParallelism =
MathUtils.max(
options.dynamicBucketInitialBuckets(),
options.dynamicBucketAssignerParallelism());
if (assignerParallelism == null) {
assignerParallelism = parallelism;
}
KeyPartRowChannelComputer channelComputer =
new KeyPartRowChannelComputer(rowType, bootstrapType, primaryKeys);
DataStream<Tuple2<KeyPartOrRow, InternalRow>> partitionByKeyHash =
partition(bootstraped, channelComputer, assignerParallelism);
// 2. bucket-assigner
TupleTypeInfo<Tuple2<InternalRow, Integer>> rowWithBucketType =
new TupleTypeInfo<>(input.getType(), BasicTypeInfo.INT_TYPE_INFO);
SingleOutputStreamOperator<Tuple2<InternalRow, Integer>> bucketAssigned =
partitionByKeyHash
.transform(
"cross-partition-bucket-assigner",
rowWithBucketType,
GlobalIndexAssignerOperator.forRowData(table))
.setParallelism(partitionByKeyHash.getParallelism());
// ... existing code ...
// 3. shuffle by bucket
DataStream<Tuple2<InternalRow, Integer>> partitionByBucket =
partition(bucketAssigned, new RowWithBucketChannelComputer(schema), parallelism);
// 4. writer and committer
return sinkFrom(partitionByBucket, createCommitUser(options.toConfiguration()));
}
}
从代码注释和实现中我们可以看到一个清晰的 Flink 作业拓扑:
bootstrap
:首先通过IndexBootstrapOperator
为每条数据加载其当前所属的分区和 bucket 信息。shuffle by key hash
:然后,数据不是直接写入,而是根据主键的哈希值进行一次 shuffle。这确保了相同主键的数据会被发送到同一个bucket-assigner
算子实例。bucket-assigner
:这是核心。GlobalIndexAssignerOperator
是一个有状态的算子,它内部维护了一个全局的索引(通常基于 RocksDB),记录了每个主键当前被分配在哪个 bucket。当新数据到来时,它会查询这个索引:- 如果主键已存在,就将数据路由到已有的 bucket。
- 如果主键是新的,它会根据负载情况(比如每个 bucket 的大小)为其分配一个新的 bucket。
shuffle by bucket
:assigner
算子为每条数据打上 bucket 标签后,再进行一次 shuffle,将数据发送到负责写入对应 bucket 的writer
算子。writer
:最后由DynamicBucketRowWriteOperator
将数据写入 Paimon 表的对应 bucket 文件中。
有了动态分桶,我们不再需要在建表时就精确地预估 bucket 数量。当数据量增长时,系统可以自动增加 bucket 的数量来分散数据,避免单个 bucket 过大导致的数据倾斜和性能问题。这极大地提升了运维的灵活性。
但请注意,bucket-assigner 和 writer 仍然是 Flink 作业的一部分,它们的处理能力依然受限于 Flink 的并发度和资源。扩展 Lookup 性能的根本方式,仍然是增加 Flink 作业的并发度(即增加 writer 的并行实例数),让更多的节点来分担写入和后续的查询压力。这与专用 KV 系统可以独立扩展存储层的模式还是有本质区别的。
Compaction压力
为了给您一个更具批判性和深度的分析,我们不能只看“有没有 Compaction”,而必须深入对比两者在更新路径、Compaction 机制、资源隔离和架构设计上的根本不同。
在线数据库 (OLTP) vs. 数据湖格式 (OLAP) 是所有差异的根源。
- HBase: 一个纯粹的、分布式的、在线的 NoSQL 数据库。它的首要目标是为海量数据提供低延迟的随机读写。它的所有设计,包括 MemStore、WAL、Region 分片和 Compaction,都是为了服务这个在线目标。
- Paimon: 一个面向分析场景的数据湖存储格式。它的目标是为 Flink/Spark 等计算引擎提供一个能够进行流式更新和高效分析的统一存储底座。它本质上是文件格式 + 元数据 + 一套读写和维护逻辑,其性能和行为与上层的计算引擎(Flink)强绑定。
更新路径与写入/合并的耦合性 是造成“压力”体感不同的最直接原因。
HBase 的解耦设计:
- 写入路径:
Client -> WAL (Write-Ahead Log) -> MemStore (内存)
. 这个过程极快,写入到内存即可向客户端返回成功。数据对读立即可见。 - 刷盘 (Flush): 当 MemStore 满了,数据才会被异步地刷写到磁盘,形成一个 HFile。这个过程不阻塞新的写入。
- 合并 (Compaction): RegionServer 会在后台,根据策略,独立地对磁盘上的 HFile 进行合并。这个过程也不阻塞新的写入(新数据依然写入 MemStore)。 批判性分析: HBase 通过 WAL+MemStore 这层高性能缓存,将在线写入和后台维护(Flush/Compaction) 这两个操作在时间和资源上完全解耦。因此,即使后台 Compaction 压力很大,也不会直接反压到写入请求上,保证了高频写入的低延迟。
- 写入路径:
Paimon 的耦合性:
- 写入路径:
数据源 -> Flink Operator (内存/磁盘Buffer) -> Checkpoint 触发 -> 刷写到远端存储形成 Level-0 文件 -> Commit 元数据
。 - 合并 (Compaction): Paimon 的 Compaction 策略与写入密切相关。
- 同步/写时合并: 在默认配置下,Compaction 可能在写入链路中被触发。例如,当 L0 文件过多时,
prepareCommit
阶段可能会等待 Compaction 完成,以防止文件数量失控。这在MergeTreeWriter.java
中有所体现,prepareCommit
方法会根据条件决定是否waitCompaction
。MergeTreeWriter.java
这种耦合会导致 Compaction 的压力直接传递给 Flink 的写入算子,造成反压,进而影响整个 Flink 作业的吞吐。// ... existing code ... @Override public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception { flushWriteBuffer(waitCompaction, false); if (commitForceCompact) { waitCompaction = true; } // ... if (compactManager.shouldWaitForPreparingCheckpoint()) { waitCompaction = true; } trySyncLatestCompaction(waitCompaction); return drainIncrement(); } // ... existing code ...
- 异步/专用合并作业: Paimon 推荐的最佳实践是将 Compaction 独立出来,由一个专门的 Flink 作业来执行。
这确实实现了写入和合并的解耦,但代价是运维复杂度的提升。你需要为一张表维护两个 Flink 作业(一个写入,一个合并),并协调它们。// ... existing code ... # Dedicated Compaction ## Dedicated Compaction Job to the table and perform compactions as needed. // ... existing code ...
- 同步/写时合并: 在默认配置下,Compaction 可能在写入链路中被触发。例如,当 L0 文件过多时,
批判性分析: Paimon 的“压力”来源于其默认模式下写入和合并的潜在耦合。虽然可以通过专用合并作业解耦,但这将问题从“性能压力”转化为了“运维复杂度”。而 HBase 在架构层面原生就解决了这个问题。
- 写入路径:
资源隔离与架构
- HBase: 拥有独立的集群资源。RegionServer 是专门的进程,其 CPU、内存、磁盘 I/O 都服务于数据读写和合并。资源隔离清晰,问题定位和调优(比如调整 Compaction 线程池)都针对性很强。
- Paimon: 其计算资源完全依赖 Flink。写入、合并、动态分桶的
GlobalIndexAssignerOperator
等都是 Flink 的算子,运行在 TaskManager 的 Slot 中,共享 JVM 堆内存、Managed Memory 和磁盘。GlobalIndexAssignerOperator.java
的实现就是一个有状态的 Flink 算子,它的性能、状态大小、恢复速度都受限于 Flink 的框架。// ... existing code ... public class GlobalIndexAssignerOperator extends AbstractStreamOperator<Tuple2<InternalRow, Integer>> implements OneInputStreamOperator< Tuple2<KeyPartOrRow, InternalRow>, Tuple2<InternalRow, Integer>>, BoundedOneInput { private static final long serialVersionUID = 1L; private final GlobalIndexAssigner assigner; // ... @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); // ... assigner.open( computeManagedMemory(this), ioManager, // ... } // ... existing code ...
- Paimon 将维护任务“寄生”在通用的计算引擎中,带来了部署上的便利,但也导致了资源竞争和隔离性不足。Compaction 任务的 I/O 尖峰会直接影响同一个 TaskManager 上运行的其他任务。而 HBase 的专用架构提供了更强的资源隔离和稳定性保障。
在接受分钟级读取延迟的前提下,为何高频更新对 Paimon 的压力比对 HBase 更大?答案不在于“Compaction”这个行为本身,而在于系统如何处理它:
- 写入耦合性: HBase 通过 MemStore 将高频写入与后台 Compaction 完全解耦;Paimon 的默认写入模式与 Compaction 存在耦合,可能导致反压,虽然可通过专用作业解耦,但增加了运维成本。
- 架构与资源: HBase 是一个独立的、资源隔离的数据库系统,为 Compaction 等后台任务提供了稳定的运行环境;Paimon 的 Compaction 运行在 Flink 的共享资源环境中,更容易产生资源争抢,且其稳定性与 Flink 作业的稳定性强绑定。
- 设计目标: HBase 为在线服务而生,对稳定性和低延迟的追求是第一位的。Paimon 为分析而生,追求的是吞吐量和与计算生态的融合,它在架构上做出了不同的取舍。
所以,不能简单地说 Paimon 的 compaction “有压力”,更精确的批判性描述应该是:Paimon 在应对高频更新时,其默认架构更容易将后台维护(Compaction)的压力传导至写入链路,并且其资源模型与计算引擎共享,隔离性不如专用的 KV 数据库。用户需要通过更复杂的运维手段(如专用合并作业)来缓解这种压力,这是一个架构取舍(Trade-off)的结果。
结论
Paimon 通过牺牲一部分极致的性能、灵活性和解耦的扩展性,换来了架构的简化、数据链路的统一和更低的持有成本。
- 性能与灵活性限制:Paimon 的性能和灵活性与专用的 KV 系统相比仍有差距。它更适合对查询实时性要求不高(例如,分钟级延迟可接受)且写入负载不会导致 Compaction 严重积压的维表场景。虽然理论上可以支持高频更新,但需要仔细评估其对写入端性能和资源消耗的影响。
- 可扩展性依赖:Paimon 的可扩展性依赖于 Lookup 节点的水平扩展。虽然动态分桶(Dynamic Bucket)机制极大地增强了数据分布的灵活性,避免了预先固定 Bucket 数的僵化问题,但其扩展模型的核心仍然是通过增加 Flink 计算节点的并发度来提升整体服务能力,这与 KV 系统独立扩展存储资源的方式不同。
总结
Paimon 在维表 Join 场景的应用,是一个典型的在成本、性能和运维复杂度之间做权衡的优秀案例。
收益:
- 统一存储:将维表数据统一存储在数据湖中,无需同步到外部 KV 系统,简化了数据架构。
- 简化链路:减少了对 HBase、Redis 等组件的依赖,降低了运维成本。
- 成本效益:利用本地磁盘和 RocksDB,以较低的硬件成本实现了可观的 Lookup 性能。
不足与权衡:
- 性能与灵活性:对于更新极其频繁(例如,秒级或毫秒级)的维表,Paimon 的 LSM 结构和基于文件的更新模式,其性能和灵活性仍无法与专业的 KV 系统(如 HBase/Pegasus)媲美。它更适合更新频率较低的场景(分钟级、小时级或天级)。
- 扩展性模型:Paimon Lookup 的扩展依赖于增加 Flink 计算节点(TaskManagers)的数量和调整 Paimon 表的 Bucket 数量,这是一种计算和存储耦合的扩展方式。而 KV 系统可以独立扩展其存储节点,扩展模型更清晰。
总而言之,Paimon 为数据湖场景下的维表关联提供了一个强大且经济的选择,尤其适合那些对成本敏感、维表更新不那么频繁的业务。通过其灵活的缓存策略和不断演进的优化方案(如 Query Service),Paimon 正在逐步弥补与传统方案的差距。
展望
深入挖掘流计算场景,拓宽应用范围
Paimon 从诞生之初(当时名为 Flink Table Store)就是为了解决 Flink 流式更新数据湖的痛点。
现状分析:
- 核心能力已具备: Paimon 已经通过 Primary Key 表的 LSM 结构、多种 Merge Engine、Changelog Producer 等机制,为流式更新和流式读取(CDC)提供了强大的支持。
- 文档中反复强调了其流批一体的能力,如
docs/content/concepts/overview.md
所述,Paimon 在流模式下像消息队列,在批模式下像 Hive 表。
- 文档中反复强调了其流批一体的能力,如
- 典型场景已落地:
- CDC 数据入湖: 这是最经典的应用,文档
docs/content/cdc-ingestion/postgres-cdc.md
就展示了如何同步数据库 CDC 数据。 - 流式维表 Join: 我们之前已经深入讨论过,
docs/content/flink/sql-lookup.md
详细描述了此场景。 - 流式聚合/ETL: 利用
partial-update
或aggregation
等 Merge Engine,可以在数据写入时就完成聚合,简化流处理拓扑。
- CDC 数据入湖: 这是最经典的应用,文档
未来方向:
- 更丰富的 Merge Engine: 可能会支持更复杂的业务逻辑,比如窗口聚合、更灵活的去重策略等。
- 性能优化: 持续优化 Compaction 策略、索引机制(如 Z-order)、查询下推等,降低流式更新的延迟和资源消耗。
-
- 生态融合: 加强与 Flink 之外的引擎(如 Spark Streaming)的流式集成。文档
docs/content/spark/sql-query.md
提到 Spark 3.3+ 已支持流式读取,未来可能会进一步增强写入和端到端的流处理能力。
- 生态融合: 加强与 Flink 之外的引擎(如 Spark Streaming)的流式集成。文档
建设自动化的维护服务
降低 Paimon 的运维成本,使其更易于在生产环境大规模使用。
现状分析:
- 耦合的维护任务: 正如展望中所说,目前快照过期(Snapshot Expiration)、分区过期(Partition Expiration)、Compaction 等维护操作,要么与写入作业耦合,要么需要用户手动启动一个独立的
Dedicated Compaction
作业。docs/content/maintenance/dedicated-compaction.md
详细描述了如何启动专用的合并作业,这虽然解耦了写入和合并,但增加了用户的运维负担。
- 缺乏统一的治理平台: Paimon 自身提供了丰富的 Metrics(
docs/content/maintenance/metrics.md
)来暴露内部状态,但它不提供一个开箱即用的、自动化的治理服务来消费这些指标并采取行动。
未来方向 (借鉴 Iceberg 和 Amoro):
- 独立的、自动化的维护服务: 这正是 Apache Amoro (incubating) 正在做的事情。
docs/content/ecosystem/amoro.md
中提到,Amoro 旨在为数据湖格式提供一个 Lakehouse 管理系统,其核心功能之一就是自优化(Self-optimizing),包括自动的 Compaction、数据过期等。 - 平台化、服务化: 未来理想的状态是,用户只需在平台上注册 Paimon 表,平台服务(如 Amoro Management Service, AMS)就会自动监控表的健康度(小文件数量、数据倾斜度等),并根据预设策略自动触发和调度维护任务(如 Compaction),对用户完全透明。这能极大降低使用门槛。
提升 Paimon 的 Catalog 能力
将 Paimon 从一个“存储格式”提升为一个“服务化的数据资产”,使其能被更广泛的平台和应用集成。
现状分析:
- 多种 Catalog 后端: Paimon 目前支持多种 Catalog 实现,如
filesystem
,hive
,jdbc
。这使得它可以与现有的元数据服务集成。docs/content/concepts/catalog.md
对此有详细介绍。 - 缺乏标准化的服务接口: 现有的 Catalog 都是“嵌入式”的,即 Flink/Spark 作业在客户端直接加载 Catalog 实现类,并与后端(如 Hive Metastore)交互。这种方式存在一些问题:
- 客户端过重: 需要在客户端配置各种依赖和认证信息。
- 语言绑定: 主要服务于 JVM 生态(Java/Scala)。虽然有基于
py4j
的 Python API(docs/content/program-api/python-api.md
),但本质还是启动 JVM,不够轻量。 - 权限和治理不便: 难以实现统一的、跨引擎的权限控制和元数据访问审计。
未来方向 (引入 REST Catalog):
- 对标 Iceberg REST Catalog: Iceberg 社区定义了一套标准的 REST API 来对 Catalog 进行操作(如创建表、获取表元数据等)。任何实现了这套 API 的服务都可以作为 Iceberg 的 Catalog Server。
- Paimon 的 REST Catalog:
docs/content/concepts/rest/overview.md
和paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
的存在表明,Paimon 社区已经正在实现这个功能。// ... existing code ... public class RESTCatalog implements Catalog { public static final String HEADER_PREFIX = "header."; public static final String MAX_RESULTS = "maxResults"; public static final String PAGE_TOKEN = "pageToken"; public static final String QUERY_PARAMETER_WAREHOUSE_KEY = "warehouse"; private final RESTClient client; private final ResourcePaths resourcePaths; // ... existing code ...
- 带来的好处:
- 解耦与轻客户端: 客户端只需通过 HTTP 与 REST 服务交互,无需关心后端的具体实现(是 Hive 还是 JDBC),也无需引入厚重的依赖。
- 语言无关: 任何语言(Python, Go, C++ 等)都可以通过 HTTP 客户端轻松集成 Paimon 的元数据管理能力。
- 统一治理: 可以在 REST 服务端实现统一的认证、鉴权、审计和策略控制,方便平台进行数据治理。
综上所述,这三个展望描绘了一个清晰的蓝图:在内核层持续深耕流计算能力,在运维层通过自动化服务降低用户成本,在服务层通过标准化的 REST API 开放生态。这三者相辅相成,将推动 Paimon 成为一个更强大、更易用、更开放的实时数据湖平台。