Paimon 在维表 Lookup Join 场景中的应用

发布于:2025-07-21 ⋅ 阅读:(17) ⋅ 点赞:(0)

背景

Paimon 通过将 LSM-Tree(Log-Structured Merge-Tree)结构和本地缓存机制引入数据湖,为 Flink Lookup Join 提供了一种介于“全内存”和“外部 KV 系统”之间的、高性价比的解决方案。

传统 Flink 维表 Join 的一个痛点是,如果维表数据量大,无法完全加载到 TaskManager 的内存中。Paimon 解决了这个问题,它支持将数据缓存在本地磁盘,并利用 RocksDB 进行管理,实现了内存与磁盘的混合存储。

  • 本地文件缓存 (Local File Cache): Paimon 会将远程存储(如 HDFS/S3)上的数据文件拉取到本地磁盘进行缓存,避免每次查询都通过网络读取。这个缓存由 Caffeine 实现,具备 LRU 和超时淘汰策略。

    // ... existing code ...
    public class LookupFile {
    
        // ... existing code ...
        // ==================== Cache for Local File ======================
    
        public static Cache<String, LookupFile> createCache(
                Duration fileRetention, MemorySize maxDiskSize) {
            return Caffeine.newBuilder()
                    .expireAfterAccess(fileRetention)
                    .maximumWeight(maxDiskSize.getKibiBytes())
                    .weigher(LookupFile::fileWeigh)
                    .removalListener(LookupFile::removalCallback)
                    .executor(Runnable::run)
                    .build();
        }
    
    // ... existing code ...
    
  • RocksDB 作为本地 KV 存储: 对于已经拉取到本地的数据,Paimon 会将其加载到内嵌的 RocksDB 实例中,构建 Key-Value 索引。这样,后续的 Lookup 操作就变成了对本地 RocksDB 的高效点查,而不是对原始文件的扫描。FullCacheLookupTable 和 NoPrimaryKeyLookupTable 等类都利用了 RocksDB 的能力。

    FullCacheLookupTable.java

    // ... existing code ...
    public abstract class FullCacheLookupTable implements LookupTable {
    // ... existing code ...
        protected RocksDBStateFactory stateFactory;
        @Nullable private ExecutorService refreshExecutor;
    // ... existing code ...
    }
    

    RocksDBOptions 文件中也定义了相关的配置项,比如 lookup.cache-rows 用于控制缓存行数。

    // ... existing code ...
    public class RocksDBOptions {
    
        public static final ConfigOption<Long> LOOKUP_CACHE_ROWS =
                key("lookup.cache-rows")
                        .longType()
                        .defaultValue(10_000L)
                        .withDescription("The maximum number of rows to store in the cache.");
    
    // ... existing code ...
    

无主键表怎么使用RocksDB

对于没有主键的表,一个 Join Key 可能会对应多条数据,这和有主键表(一个 Key 只对应一条最新数据)的查找逻辑完全不同。

NoPrimaryKeyLookupTable 正是为了处理这种 "一对多" 的情况而设计的。它虽然和有主键的 PrimaryKeyLookupTable 一样都继承自 FullCacheLookupTable 并使用 RocksDB 作为本地缓存,但其内部实现机制有本质区别。

关键在于它使用的 State 类型不同。

  • PrimaryKeyLookupTable 使用 RocksDBValueState,这是一个标准的 Key-Value 存储,一个 Key 只会映射到一个 Value。
  • NoPrimaryKeyLookupTable 使用 RocksDBListState,这是一个 Key-List<Value> 的存储结构。

NoPrimaryKeyLookupTable 的核心是一个 RocksDBListState 类型的成员变量 state。这个 State 的 Key 是维表的 Join Key,而 Value 是一个包含所有匹配该 Key 的完整数据行的 列表 (List)

NoPrimaryKeyLookupTable.java

// ... existing code ...
public class NoPrimaryKeyLookupTable extends FullCacheLookupTable {

    private final long lruCacheSize;

    private final KeyProjectedRow joinKeyRow;

    private RocksDBListState<InternalRow, InternalRow> state;

    public NoPrimaryKeyLookupTable(Context context, long lruCacheSize) {
// ... existing code ...
    }

    @Override
    public void open() throws Exception {
        init();
        this.state =
                stateFactory.listState(
                        "join-key-index",
                        InternalSerializers.create(
                                TypeUtils.project(projectedType, joinKeyRow.indexMapping())),
                        InternalSerializers.create(projectedType),
                        lruCacheSize);
        bootstrap();
    }
// ... existing code ...

在 open() 方法中,它通过 stateFactory.listState(...) 初始化了 state,明确了这是一个存储列表的 State。

get 方法返回一个列表

当进行 get 操作时,它会从 RocksDBListState 中根据 Key 取出整个列表并返回。这与 PrimaryKeyLookupTable 返回单个 InternalRow 是不同的。

// ... existing code ...
    @Override
    public List<InternalRow> innerGet(InternalRow key) throws IOException {
        return state.get(key);
    }
// ... existing code ...

refreshRow 方法执行 add 操作

在加载和刷新数据时,NoPrimaryKeyLookupTable 的行为不是“更新/覆盖”(put),而是“添加”(add)。每当来一条新的数据,它会被添加到对应 Join Key 的列表中,而不是替换掉旧数据。

// ... existing code ...
    @Override
    protected void refreshRow(InternalRow row, Predicate predicate) throws IOException {
        joinKeyRow.replaceRow(row);
        if (row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER) {
            if (predicate == null || predicate.test(row)) {
                state.add(joinKeyRow, row);
            }
        } else {
// ... existing code ...
        }
    }
// ... existing code ...

可以看到,这里调用的是 state.add(joinKeyRow, row),这会将 row 追加到 joinKeyRow 对应的列表中。

总结

所以,NoPrimaryKeyLookupTable 利用 RocksDB 的方式可以总结为:

  1. 使用 Join Key 作为 RocksDB 的 Key。
  2. 将所有匹配该 Join Key 的数据行组织成一个列表。
  3. 将这个列表序列化后,作为 RocksDB 的 Value 进行存储。

这样就巧妙地通过 Key -> List<Value> 的模式,在 KV 存储上实现了对无主键表的 Lookup Join 支持,能够正确处理一个 Join Key 关联出多条维表数据的情况。

瓶颈与优化

Paimon 的基础方案存在两个主要瓶颈:每个节点加载全量数据本地磁盘 I/O 性能。Paimon 针对这两点提供了有效的优化手段。

Paimon 专栏 有更详细的讨论

优化一:避免全量数据加载 (Partial Cache)

这个问题本质上是 Flink 作业的数据分发没有与 Paimon 表的物理存储结构(Bucket)对齐。Paimon 通过 PrimaryKeyPartialLookupTable 实现了部分数据加载。其核心思想是:根据上游流数据 Join Key 的值,计算出这个 Key 对应 Paimon 表中的哪个 Bucket,然后只加载该 Bucket 的数据

这样,每个 Flink 的 Lookup 并发实例就不再需要加载维表的全量数据,而只需要负责一部分 Bucket 的数据,极大地降低了单个节点的内存消耗和启动时间。

FileStoreLookupFunction 中的逻辑清晰地体现了这一点。当满足特定条件时(例如,Join Key 是主键),它会优先创建 PrimaryKeyPartialLookupTable

// ... existing code ...
        if (options.get(LOOKUP_CACHE_MODE) == LookupCacheMode.AUTO
                && new HashSet<>(table.primaryKeys()).equals(new HashSet<>(joinKeys))) {
            if (isRemoteServiceAvailable(table)) {
// ... existing code ...
            } else {
                try {
                    this.lookupTable =
                            PrimaryKeyPartialLookupTable.createLocalTable(
                                    table, projection, path, joinKeys, getRequireCachedBucketIds());
                    LOG.info(
                            "Remote service isn't available. Created PrimaryKeyPartialLookupTable with LocalQueryExecutor.");
                } catch (UnsupportedOperationException ignore) {
                    LOG.info(
                            "Remote service isn't available. Cannot create PrimaryKeyPartialLookupTable with LocalQueryExecutor "
                                    + "because bucket mode isn't {}. Will create FullCacheLookupTable.",
                            BucketMode.HASH_FIXED);
                }
            }
        }
// ... existing code ...

优化二:缓解本地磁盘 I/O 瓶颈 (Query Service)

为了解决本地磁盘(尤其是 HDD)随机读性能差的问题,Paimon 借鉴了 "服务化" 的思想,推出了 Query Service

可以为一张 Paimon 表启动一个独立的、常驻的 Flink 作业作为查询服务。这个服务可以部署在拥有高性能 SSD 的专用节点上,它预先加载数据并对外提供点查服务。

当维表 Join 任务运行时,它会优先连接这个 Query Service 来获取数据,从而将 I/O 压力从计算节点转移到专用的服务节点上,实现了计算与存储的物理分离,保证了 Lookup 性能的稳定性和高效性。

文档中描述了如何启动该服务:

// ... existing code ...
# Lookup Joins
## Query Service

You can run a Flink Streaming Job to start query service for the table. When QueryService exists, Flink Lookup Join
will prioritize obtaining data from it, which will effectively improve query performance.

{{< tabs "query-service" >}}

{{< tab "Flink SQL" >}}

```sql
CALL sys.query_service('database_name.table_name', parallelism);


`FileStoreLookupFunction` 中的代码逻辑也验证了这一点,它会通过 `isRemoteServiceAvailable` 检查查询服务是否存在,如果存在,则创建远程查询的客户端。

FileStoreLookupFunction.java


// ... existing code ...
        if (options.get(LOOKUP_CACHE_MODE) == LookupCacheMode.AUTO
                && new HashSet<>(table.primaryKeys()).equals(new HashSet<>(joinKeys))) {
            if (isRemoteServiceAvailable(table)) {
                this.lookupTable =
                        PrimaryKeyPartialLookupTable.createRemoteTable(table, projection, joinKeys);
                LOG.info(
                        "Remote service is available. Created PrimaryKeyPartialLookupTable with remote service.");
            } else {
// ... existing code ...

不足

性能与灵活性限制

Paimon 在高频更新场景下不如专用 KV 系统,这主要是由其核心架构决定的。

a. 更新路径更长,延迟更高

  • 专用 KV 系统 (如 HBase/Redis): 通常为在线服务设计,追求极致的低延迟。一次更新操作的路径非常短,数据可以直接写入内存(如 HBase MemStore),并立即对后续查询可见,然后异步刷写到持久化存储。
  • Paimon: Paimon 的设计目标是服务于数据湖的分析场景,它通过 Mini-Batch + LSM-Tree 的方式来处理更新,优先保证吞吐量和批处理效率。一次更新的完整路径如下:
    1. 数据由 Flink Sink 算子写入内存缓冲区 (write-buffer-size)。
    2. 当 Flink Checkpoint 触发时,内存中的数据被刷写到远端存储(如 HDFS/S3),形成新的 Level-0 小文件。(本地占用达到限制也会写远程,但此时没有更新元数据,因此不可见)
    3. 生成一个新的快照(Snapshot),包含这次变更的元数据。
    4. Lookup Join 的 Source 端感知到新的快照后,开始拉取增量数据。
    5. 将增量数据更新到本地的 RocksDB 缓存中。

这个过程与 Flink 的 Checkpoint 周期强绑定,本质上是异步、微批次的。即使 Checkpoint 间隔很短,其延迟也远高于专用 KV 系统毫秒级的同步更新。

b. Compaction 开销

Paimon 依赖于 Compaction(合并)来合并小的增量文件,以保证查询性能。虽然 Compaction 是后台异步执行的,但它会消耗计算和 I/O 资源,在高频写入时,频繁的 Compaction 会对系统造成一定的压力。

c. 代码实现分析

我们来看 NoPrimaryKeyLookupTable.java,它是为无主键表(Append-Only 表)设计的 Lookup 实现。

// ... existing code ...
    @Override
    public void refresh(Iterator<InternalRow> incremental) throws IOException {
        if (userDefinedSeqComparator != null) {
            throw new IllegalArgumentException(
                    "Append table does not support user defined sequence fields.");
        }
        super.refresh(incremental);
    }

    @Override
    protected void refreshRow(InternalRow row, Predicate predicate) throws IOException {
        joinKeyRow.replaceRow(row);
        if (row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER) {
            if (predicate == null || predicate.test(row)) {
                state.add(joinKeyRow, row);
            }
        } else {
            throw new RuntimeException(
                    String.format(
                            "Received %s message. Only INSERT/UPDATE_AFTER values are expected here.",
                            row.getRowKind()));
        }
    }
// ... existing code ...

这里的 refresh 方法由其父类 FullCacheLookupTable 在检测到新快照时调用。它会遍历增量数据(incremental),并逐条调用 refreshRowrefreshRow 将新数据 add 到本地的 RocksDBListState 中。整个流程清晰地表明:本地缓存的更新是滞后的,它依赖于上游 Paimon 表产生新快照,这正是其不适合超高频更新场景的根本原因。

可扩展性依赖

“可扩展性依赖”问题 源于 Paimon Lookup 缓存与 Flink 计算节点的紧耦合

  • 专用 KV 系统: 其架构是存储与计算分离的。当存储容量或 QPS 成为瓶颈时,可以独立地对存储层进行水平扩展(例如,为 HBase 增加 RegionServer 节点)。系统会自动进行数据重分布(Region Splitting/Balancing),对上层应用透明。
  • Paimon Lookup: Paimon 的本地缓存(无论是部分缓存还是全量缓存)是存在于 Flink TaskManager 的本地磁盘上的。Lookup 的能力直接与 Flink Job 的并发度挂钩。
    • 水平扩展方式: 要提升 Lookup 的整体吞吐能力,你需要增加 Flink Job 的并发度,即增加 Flink 的计算节点(TaskManager Slots)。
    • Bucket 数量的制约: Paimon 表的数据是根据 bucket 值物理组织的。为了让增加的 Flink 并发能够均匀地分摊负载,理想情况下,Paimon 表的 bucket 数量应该大于或等于 Flink 的并发度。如果并发度远大于 bucket 数,很多 Flink Task 将会空闲。
    • 运维复杂度: 因此,当面临扩展需求时,可能不仅要扩展 Flink 集群,还需要重新规划 Paimon 表的 bucket 数量。而修改一个已存在表的 bucket 数是一个重操作,通常需要重写整个表的数据,这带来了额外的运维成本和复杂性。

文档中创建表的示例也说明了 bucket 是一个在表创建时就需要规划好的物理属性。

-- ... existing code ...
CREATE TABLE MyTable (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,
    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh) WITH (
    'bucket' = '10'
-- ... existing code ...

GlobalDynamicBucketSink 在很大程度上缓解了可扩展性依赖”问题,但并没有完全消除它。

我们来看 GlobalDynamicBucketSink.java 的源码来理解它的工作原理。

// ... existing code ...
    public DataStreamSink<?> build(DataStream<InternalRow> input, @Nullable Integer parallelism) {
// ... existing code ...
        // Topology:
        // input -- bootstrap -- shuffle by key hash --> bucket-assigner -- shuffle by bucket -->
        // writer --> committer

        SingleOutputStreamOperator<Tuple2<KeyPartOrRow, InternalRow>> bootstraped =
// ... existing code ...
                        .setParallelism(input.getParallelism());

        // 1. shuffle by key hash
        Integer assignerParallelism =
                MathUtils.max(
                        options.dynamicBucketInitialBuckets(),
                        options.dynamicBucketAssignerParallelism());
        if (assignerParallelism == null) {
            assignerParallelism = parallelism;
        }

        KeyPartRowChannelComputer channelComputer =
                new KeyPartRowChannelComputer(rowType, bootstrapType, primaryKeys);
        DataStream<Tuple2<KeyPartOrRow, InternalRow>> partitionByKeyHash =
                partition(bootstraped, channelComputer, assignerParallelism);

        // 2. bucket-assigner
        TupleTypeInfo<Tuple2<InternalRow, Integer>> rowWithBucketType =
                new TupleTypeInfo<>(input.getType(), BasicTypeInfo.INT_TYPE_INFO);
        SingleOutputStreamOperator<Tuple2<InternalRow, Integer>> bucketAssigned =
                partitionByKeyHash
                        .transform(
                                "cross-partition-bucket-assigner",
                                rowWithBucketType,
                                GlobalIndexAssignerOperator.forRowData(table))
                        .setParallelism(partitionByKeyHash.getParallelism());
// ... existing code ...
        // 3. shuffle by bucket

        DataStream<Tuple2<InternalRow, Integer>> partitionByBucket =
                partition(bucketAssigned, new RowWithBucketChannelComputer(schema), parallelism);

        // 4. writer and committer
        return sinkFrom(partitionByBucket, createCommitUser(options.toConfiguration()));
    }
}

从代码注释和实现中我们可以看到一个清晰的 Flink 作业拓扑:

  1. bootstrap:首先通过 IndexBootstrapOperator 为每条数据加载其当前所属的分区和 bucket 信息。
  2. shuffle by key hash:然后,数据不是直接写入,而是根据主键的哈希值进行一次 shuffle。这确保了相同主键的数据会被发送到同一个 bucket-assigner 算子实例。
  3. bucket-assigner:这是核心。GlobalIndexAssignerOperator 是一个有状态的算子,它内部维护了一个全局的索引(通常基于 RocksDB),记录了每个主键当前被分配在哪个 bucket。当新数据到来时,它会查询这个索引:
    • 如果主键已存在,就将数据路由到已有的 bucket。
    • 如果主键是新的,它会根据负载情况(比如每个 bucket 的大小)为其分配一个新的 bucket。
  4. shuffle by bucketassigner 算子为每条数据打上 bucket 标签后,再进行一次 shuffle,将数据发送到负责写入对应 bucket 的 writer 算子。
  5. writer:最后由 DynamicBucketRowWriteOperator 将数据写入 Paimon 表的对应 bucket 文件中。

有了动态分桶,我们不再需要在建表时就精确地预估 bucket 数量。当数据量增长时,系统可以自动增加 bucket 的数量来分散数据,避免单个 bucket 过大导致的数据倾斜和性能问题。这极大地提升了运维的灵活性。


但请注意,bucket-assigner 和 writer 仍然是 Flink 作业的一部分,它们的处理能力依然受限于 Flink 的并发度和资源。扩展 Lookup 性能的根本方式,仍然是增加 Flink 作业的并发度(即增加 writer 的并行实例数),让更多的节点来分担写入和后续的查询压力。这与专用 KV 系统可以独立扩展存储层的模式还是有本质区别的。

Compaction压力

为了给您一个更具批判性和深度的分析,我们不能只看“有没有 Compaction”,而必须深入对比两者在更新路径、Compaction 机制、资源隔离和架构设计上的根本不同。

在线数据库 (OLTP) vs. 数据湖格式 (OLAP) 是所有差异的根源。

  • HBase: 一个纯粹的、分布式的、在线的 NoSQL 数据库。它的首要目标是为海量数据提供低延迟的随机读写。它的所有设计,包括 MemStore、WAL、Region 分片和 Compaction,都是为了服务这个在线目标。
  • Paimon: 一个面向分析场景的数据湖存储格式。它的目标是为 Flink/Spark 等计算引擎提供一个能够进行流式更新和高效分析的统一存储底座。它本质上是文件格式 + 元数据 + 一套读写和维护逻辑,其性能和行为与上层的计算引擎(Flink)强绑定。

更新路径与写入/合并的耦合性 是造成“压力”体感不同的最直接原因。

  • HBase 的解耦设计:

    1. 写入路径Client -> WAL (Write-Ahead Log) -> MemStore (内存). 这个过程极快,写入到内存即可向客户端返回成功。数据对读立即可见。
    2. 刷盘 (Flush): 当 MemStore 满了,数据才会被异步地刷写到磁盘,形成一个 HFile。这个过程不阻塞新的写入。
    3. 合并 (Compaction): RegionServer 会在后台,根据策略,独立地对磁盘上的 HFile 进行合并。这个过程也不阻塞新的写入(新数据依然写入 MemStore)。 批判性分析: HBase 通过 WAL+MemStore 这层高性能缓存,将在线写入后台维护(Flush/Compaction) 这两个操作在时间和资源上完全解耦。因此,即使后台 Compaction 压力很大,也不会直接反压到写入请求上,保证了高频写入的低延迟。
  • Paimon 的耦合性:

    1. 写入路径数据源 -> Flink Operator (内存/磁盘Buffer) -> Checkpoint 触发 -> 刷写到远端存储形成 Level-0 文件 -> Commit 元数据
    2. 合并 (Compaction): Paimon 的 Compaction 策略与写入密切相关。
      • 同步/写时合并: 在默认配置下,Compaction 可能在写入链路中被触发。例如,当 L0 文件过多时,prepareCommit 阶段可能会等待 Compaction 完成,以防止文件数量失控。这在 MergeTreeWriter.java 中有所体现,prepareCommit 方法会根据条件决定是否 waitCompaction

        MergeTreeWriter.java

        // ... existing code ...
        @Override
        public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception {
            flushWriteBuffer(waitCompaction, false);
            if (commitForceCompact) {
                waitCompaction = true;
            }
            // ...
            if (compactManager.shouldWaitForPreparingCheckpoint()) {
                waitCompaction = true;
            }
            trySyncLatestCompaction(waitCompaction);
            return drainIncrement();
        }
        // ... existing code ...
        
        这种耦合会导致 Compaction 的压力直接传递给 Flink 的写入算子,造成反压,进而影响整个 Flink 作业的吞吐。
      • 异步/专用合并作业: Paimon 推荐的最佳实践是将 Compaction 独立出来,由一个专门的 Flink 作业来执行。
        // ... existing code ...
        # Dedicated Compaction
        ## Dedicated Compaction Job
        to the table and perform compactions as needed.
        // ... existing code ...
        
        这确实实现了写入和合并的解耦,但代价是运维复杂度的提升。你需要为一张表维护两个 Flink 作业(一个写入,一个合并),并协调它们。

    批判性分析: Paimon 的“压力”来源于其默认模式下写入和合并的潜在耦合。虽然可以通过专用合并作业解耦,但这将问题从“性能压力”转化为了“运维复杂度”。而 HBase 在架构层面原生就解决了这个问题。

资源隔离与架构

  • HBase: 拥有独立的集群资源。RegionServer 是专门的进程,其 CPU、内存、磁盘 I/O 都服务于数据读写和合并。资源隔离清晰,问题定位和调优(比如调整 Compaction 线程池)都针对性很强。
  • Paimon: 其计算资源完全依赖 Flink。写入、合并、动态分桶的 GlobalIndexAssignerOperator 等都是 Flink 的算子,运行在 TaskManager 的 Slot 中,共享 JVM 堆内存、Managed Memory 和磁盘。
    • GlobalIndexAssignerOperator.java 的实现就是一个有状态的 Flink 算子,它的性能、状态大小、恢复速度都受限于 Flink 的框架。
       
      // ... existing code ...
      public class GlobalIndexAssignerOperator
              extends AbstractStreamOperator<Tuple2<InternalRow, Integer>>
              implements OneInputStreamOperator<
                              Tuple2<KeyPartOrRow, InternalRow>, Tuple2<InternalRow, Integer>>,
                      BoundedOneInput {
      
          private static final long serialVersionUID = 1L;
      
          private final GlobalIndexAssigner assigner;
      
          // ...
      
          @Override
          public void initializeState(StateInitializationContext context) throws Exception {
              super.initializeState(context);
              // ...
              assigner.open(
                      computeManagedMemory(this),
                      ioManager,
                      // ...
          }
      // ... existing code ...
      
  • Paimon 将维护任务“寄生”在通用的计算引擎中,带来了部署上的便利,但也导致了资源竞争和隔离性不足。Compaction 任务的 I/O 尖峰会直接影响同一个 TaskManager 上运行的其他任务。而 HBase 的专用架构提供了更强的资源隔离和稳定性保障。

在接受分钟级读取延迟的前提下,为何高频更新对 Paimon 的压力比对 HBase 更大?答案不在于“Compaction”这个行为本身,而在于系统如何处理它

  1. 写入耦合性: HBase 通过 MemStore 将高频写入与后台 Compaction 完全解耦;Paimon 的默认写入模式与 Compaction 存在耦合,可能导致反压,虽然可通过专用作业解耦,但增加了运维成本。
  2. 架构与资源: HBase 是一个独立的、资源隔离的数据库系统,为 Compaction 等后台任务提供了稳定的运行环境;Paimon 的 Compaction 运行在 Flink 的共享资源环境中,更容易产生资源争抢,且其稳定性与 Flink 作业的稳定性强绑定。
  3. 设计目标: HBase 为在线服务而生,对稳定性和低延迟的追求是第一位的。Paimon 为分析而生,追求的是吞吐量和与计算生态的融合,它在架构上做出了不同的取舍。

所以,不能简单地说 Paimon 的 compaction  “有压力”,更精确的批判性描述应该是:Paimon 在应对高频更新时,其默认架构更容易将后台维护(Compaction)的压力传导至写入链路,并且其资源模型与计算引擎共享,隔离性不如专用的 KV 数据库。用户需要通过更复杂的运维手段(如专用合并作业)来缓解这种压力,这是一个架构取舍(Trade-off)的结果。

结论

Paimon 通过牺牲一部分极致的性能、灵活性和解耦的扩展性,换来了架构的简化、数据链路的统一和更低的持有成本

  • 性能与灵活性限制:Paimon 的性能和灵活性与专用的 KV 系统相比仍有差距。它更适合对查询实时性要求不高(例如,分钟级延迟可接受)且写入负载不会导致 Compaction 严重积压的维表场景。虽然理论上可以支持高频更新,但需要仔细评估其对写入端性能和资源消耗的影响。
  • 可扩展性依赖:Paimon 的可扩展性依赖于 Lookup 节点的水平扩展。虽然动态分桶(Dynamic Bucket)机制极大地增强了数据分布的灵活性,避免了预先固定 Bucket 数的僵化问题,但其扩展模型的核心仍然是通过增加 Flink 计算节点的并发度来提升整体服务能力,这与 KV 系统独立扩展存储资源的方式不同。

总结

Paimon 在维表 Join 场景的应用,是一个典型的在成本、性能和运维复杂度之间做权衡的优秀案例。

  • 收益:

    • 统一存储:将维表数据统一存储在数据湖中,无需同步到外部 KV 系统,简化了数据架构。
    • 简化链路:减少了对 HBase、Redis 等组件的依赖,降低了运维成本。
    • 成本效益:利用本地磁盘和 RocksDB,以较低的硬件成本实现了可观的 Lookup 性能。
  • 不足与权衡:

    • 性能与灵活性:对于更新极其频繁(例如,秒级或毫秒级)的维表,Paimon 的 LSM 结构和基于文件的更新模式,其性能和灵活性仍无法与专业的 KV 系统(如 HBase/Pegasus)媲美。它更适合更新频率较低的场景(分钟级、小时级或天级)。
    • 扩展性模型:Paimon Lookup 的扩展依赖于增加 Flink 计算节点(TaskManagers)的数量和调整 Paimon 表的 Bucket 数量,这是一种计算和存储耦合的扩展方式。而 KV 系统可以独立扩展其存储节点,扩展模型更清晰。

总而言之,Paimon 为数据湖场景下的维表关联提供了一个强大且经济的选择,尤其适合那些对成本敏感、维表更新不那么频繁的业务。通过其灵活的缓存策略和不断演进的优化方案(如 Query Service),Paimon 正在逐步弥补与传统方案的差距。

展望

深入挖掘流计算场景,拓宽应用范围

Paimon 从诞生之初(当时名为 Flink Table Store)就是为了解决 Flink 流式更新数据湖的痛点。

现状分析:

  • 核心能力已具备: Paimon 已经通过 Primary Key 表的 LSM 结构、多种 Merge Engine、Changelog Producer 等机制,为流式更新和流式读取(CDC)提供了强大的支持。
    • 文档中反复强调了其流批一体的能力,如 docs/content/concepts/overview.md 所述,Paimon 在流模式下像消息队列,在批模式下像 Hive 表。
  • 典型场景已落地:
    • CDC 数据入湖: 这是最经典的应用,文档 docs/content/cdc-ingestion/postgres-cdc.md 就展示了如何同步数据库 CDC 数据。
    • 流式维表 Join: 我们之前已经深入讨论过,docs/content/flink/sql-lookup.md 详细描述了此场景。
    • 流式聚合/ETL: 利用 partial-update 或 aggregation 等 Merge Engine,可以在数据写入时就完成聚合,简化流处理拓扑。

未来方向:

  • 更丰富的 Merge Engine: 可能会支持更复杂的业务逻辑,比如窗口聚合、更灵活的去重策略等。
  • 性能优化: 持续优化 Compaction 策略、索引机制(如 Z-order)、查询下推等,降低流式更新的延迟和资源消耗。
    • 生态融合: 加强与 Flink 之外的引擎(如 Spark Streaming)的流式集成。文档 docs/content/spark/sql-query.md 提到 Spark 3.3+ 已支持流式读取,未来可能会进一步增强写入和端到端的流处理能力。

建设自动化的维护服务

降低 Paimon 的运维成本,使其更易于在生产环境大规模使用。

现状分析:

  • 耦合的维护任务: 正如展望中所说,目前快照过期(Snapshot Expiration)、分区过期(Partition Expiration)、Compaction 等维护操作,要么与写入作业耦合,要么需要用户手动启动一个独立的 Dedicated Compaction 作业。
    • docs/content/maintenance/dedicated-compaction.md 详细描述了如何启动专用的合并作业,这虽然解耦了写入和合并,但增加了用户的运维负担。
  • 缺乏统一的治理平台: Paimon 自身提供了丰富的 Metrics(docs/content/maintenance/metrics.md)来暴露内部状态,但它不提供一个开箱即用的、自动化的治理服务来消费这些指标并采取行动。

未来方向 (借鉴 Iceberg 和 Amoro):

  • 独立的、自动化的维护服务: 这正是 Apache Amoro (incubating) 正在做的事情。docs/content/ecosystem/amoro.md 中提到,Amoro 旨在为数据湖格式提供一个 Lakehouse 管理系统,其核心功能之一就是自优化(Self-optimizing),包括自动的 Compaction、数据过期等。
  • 平台化、服务化: 未来理想的状态是,用户只需在平台上注册 Paimon 表,平台服务(如 Amoro Management Service, AMS)就会自动监控表的健康度(小文件数量、数据倾斜度等),并根据预设策略自动触发和调度维护任务(如 Compaction),对用户完全透明。这能极大降低使用门槛。

提升 Paimon 的 Catalog 能力

将 Paimon 从一个“存储格式”提升为一个“服务化的数据资产”,使其能被更广泛的平台和应用集成。

现状分析:

  • 多种 Catalog 后端: Paimon 目前支持多种 Catalog 实现,如 filesystemhivejdbc。这使得它可以与现有的元数据服务集成。docs/content/concepts/catalog.md 对此有详细介绍。
  • 缺乏标准化的服务接口: 现有的 Catalog 都是“嵌入式”的,即 Flink/Spark 作业在客户端直接加载 Catalog 实现类,并与后端(如 Hive Metastore)交互。这种方式存在一些问题:
    • 客户端过重: 需要在客户端配置各种依赖和认证信息。
    • 语言绑定: 主要服务于 JVM 生态(Java/Scala)。虽然有基于 py4j 的 Python API(docs/content/program-api/python-api.md),但本质还是启动 JVM,不够轻量。
    • 权限和治理不便: 难以实现统一的、跨引擎的权限控制和元数据访问审计。

未来方向 (引入 REST Catalog):

  • 对标 Iceberg REST Catalog: Iceberg 社区定义了一套标准的 REST API 来对 Catalog 进行操作(如创建表、获取表元数据等)。任何实现了这套 API 的服务都可以作为 Iceberg 的 Catalog Server。
  • Paimon 的 REST Catalogdocs/content/concepts/rest/overview.md 和 paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 的存在表明,Paimon 社区已经正在实现这个功能。
     
    // ... existing code ...
    public class RESTCatalog implements Catalog {
    
        public static final String HEADER_PREFIX = "header.";
        public static final String MAX_RESULTS = "maxResults";
        public static final String PAGE_TOKEN = "pageToken";
        public static final String QUERY_PARAMETER_WAREHOUSE_KEY = "warehouse";
    
        private final RESTClient client;
        private final ResourcePaths resourcePaths;
    // ... existing code ...
    
  • 带来的好处:
    1. 解耦与轻客户端: 客户端只需通过 HTTP 与 REST 服务交互,无需关心后端的具体实现(是 Hive 还是 JDBC),也无需引入厚重的依赖。
    2. 语言无关: 任何语言(Python, Go, C++ 等)都可以通过 HTTP 客户端轻松集成 Paimon 的元数据管理能力。
    3. 统一治理: 可以在 REST 服务端实现统一的认证、鉴权、审计和策略控制,方便平台进行数据治理。

综上所述,这三个展望描绘了一个清晰的蓝图:在内核层持续深耕流计算能力,在运维层通过自动化服务降低用户成本,在服务层通过标准化的 REST API 开放生态。这三者相辅相成,将推动 Paimon 成为一个更强大、更易用、更开放的实时数据湖平台。


网站公告

今日签到

点亮在社区的每一天
去签到