Fluss
在Fluss出现之前,实时数据处理领域普遍采用以Apache Kafka为代表的消息队列和以Apache Flink为代表的流处理引擎组成的“黄金搭档”。然而,这个架构在面向 实时分析(Real-time Analytics) 的场景下暴露了诸多弊端,这些弊端正是Fluss诞生的驱动力。
传统消息队列为“消息”而非“分析”设计:Kafka等系统采用行式存储,其设计目标是作为可靠的日志管道(Log Pipe),保证消息的完整投递。但在分析场景下,用户通常只关心海量字段中的某几个(即“列”)。行式存储迫使计算引擎(如Flink)必须读取并反序列化整行数据,然后才能丢弃不需要的列。这个过程不仅在磁盘IO和网络传输上造成了巨大浪费,还在Flink端增加了额外的CPU开销。
流处理中的“状态之痛”:复杂的流处理任务,特别是双流Join,需要在Flink内部维护巨大的状态(State)。随着业务发展,这个状态可能膨胀到数十TB甚至上百TB。这带来了几个严峻挑战:
- 稳定性差:巨大的状态导致Checkpoint(检查点)耗时极长,容易超时失败,进而引发任务频繁重启,严重影响作业稳定性。
- 运维困难:状态的恢复和回溯(backfill)成本极高,一次数据订正或逻辑升级可能需要数天时间。
- 资源隔离性差:计算与状态存储耦合在Flink的TaskManager中,无法独立扩展,资源竞争激烈。
数据孤岛与“黑盒”问题:
- 数据可见性差:Kafka中的数据和Flink的内部状态对于开发者来说是“黑盒”。无法像数据库一样轻松地进行点查(例如,查询某个特定key的数据)或进行数据探查(如
LIMIT
、COUNT
),导致问题排查和数据调试极为低效。 数据无法实时更新的问题:Kafka 中的数据是仅追加、不可变的。如果业务需要更新或删除已有数据(例如,修正订单状态、更新用户信息),在 Kafka 中实现起来非常复杂,通常需要依赖下游的计算引擎或数据库来处理。
- 湖流割裂(Lake-Stream Divide):实时数据(热数据)存储在消息队列中,而历史数据(冷数据)沉淀在数据湖(如Paimon, Iceberg)中。两者在存储上是割裂的,形成了数据孤岛。用户无法用一套统一的接口对全量数据进行分析,数据的回溯和回填流程复杂且存在延迟。
- 数据可见性差:Kafka中的数据和Flink的内部状态对于开发者来说是“黑盒”。无法像数据库一样轻松地进行点查(例如,查询某个特定key的数据)或进行数据探查(如
为了填补 “为分析而生的流式存储” 这一市场空白,解决上述痛点,Fluss应运而生。其名称源自“FLink Unified Streaming Storage”,寓意着它与Flink的深度融合;同时“Fluss”在德语中意为“河流”,象征着数据如河流般汇入数据湖,生生不息。
Fluss是什么?
Fluss是一个为实时分析场景构建的下一代流式存储系统。
它旨在成为湖仓(Lakehouse)架构的实时数据层,通过创新地将列式存储和实时更新能力融合到流存储中,从根本上解决了传统“消息队列+流引擎”架构在分析场景的短板。
根据项目文档(website/docs/intro.md
),Fluss的核心定位是:
"Fluss is a streaming storage built for real-time analytics which can serve as the real-time data layer for Lakehouse architectures."
它具备以下关键特征:
- 两种表类型:提供仅追加的日志表(Log Table)和支持增删改的主键表(Primary Key Table),满足不同业务需求。
- 毫秒级读写延迟:具备与消息队列相媲美的低延迟、高吞吐能力。
- 深度引擎集成:与Apache Flink深度集成,并规划支持Spark、Trino等更多计算引擎。
- 云原生与开源:采用存算分离架构,目前已进入Apache软件基金会(ASF)孵化,拥有活跃的开源社区。
Fluss如何解决核心问题?
Fluss通过四大核心能力,精准地解决了上述背景中提到的痛点。
1. 问题:分析效率低下
解决方案:可裁剪的列式流(Columnar Stream with Server-Side Pruning)
Fluss从根本上改变了数据的存储方式。它将实时流入的数据以Apache Arrow列式格式进行存储(website/blog/2024-12-12-fluss-intro.md
)。
- 工作原理:当Flink等计算引擎发起读取请求时,它会告诉Fluss自己需要哪些列。Fluss在 服务端(Server-Side) 完成列裁剪,只将请求的列数据通过网络传输给Flink。
- 论证与收益:
- IO与网络优化:避免了读取和传输无关数据,极大降低了IO和网络开销。根据淘天业务实践,当43个字段的表只消费13个时,IO消耗降低了20%以上。官方基准测试显示,读取10%的列可以带来近10倍的吞吐提升。
- 计算资源节省:由于数据裁剪前置在存储层,Flink无需再承担反序列化和裁剪的开销,节省了宝贵的CPU资源。
2. 问题:大State作业不稳定且运维复杂
解决方案:KV存储与增量合并引擎(Merge Engine + Delta Join)
Fluss通过其主键表(Primary Key Table)和独特的Delta Join范式,将Flink中沉重的State外置到Fluss存储层,实现了计算与状态的解耦。
- 工作原理:
- 外置排序State:对于需要排序的场景(如取最新值),不再需要Flink的排序State。数据直接写入Fluss主键表,利用其内置的Merge Engine(如
versioned
或first_row
)在存储层完成数据的合并与去重,保证了主键的唯一性。 - 外置Join State (Delta Join):对于双流Join,不再让Flink在内存中缓存两路流的数据。取而代之的是,将两条流分别写入两个Fluss主键表。当一条流的数据到达时,它会触发一次对另一路流对应Fluss表的 点查(Point Lookup) 来完成Join。这巧妙地将Join State从Flink内部转移到了Fluss的持久化存储中。
- 外置排序State:对于需要排序的场景(如取最新值),不再需要Flink的排序State。数据直接写入Fluss主键表,利用其内置的Merge Engine(如
- 论证与收益:
- 稳定性与性能:淘天成交归因任务通过此方案成功卸载了95TB的Flink State,使得Checkpoint时间大幅缩短,任务CPU和内存使用降低了**80%**以上,彻底解决了大State导致的稳定性问题。
- 灵活性与效率:State外置后,业务逻辑的迭代和数据回溯变得极为高效。回追一天的数据,耗时比传统双流Join减少了**70%**以上。
3. 问题:数据“黑盒”与调试困难
解决方案:实时数据探查(Real-time Point Query & State "White-Boxing")
Fluss的主键表内置了基于LSM-Tree的KV索引,使其不仅是流存储,还是一个高性能的KV数据库。
- 工作原理:
- 直接点查:用户可以通过API或SQL直接对Fluss表进行高性能的主键点查,以及
LIMIT
、COUNT
等探索性查询。 - 实时更新:Fluss 原生支持流式更新。它内部维护了一个可变的键值存储(KvStore),可以直接对数据进行更新和删除,并生成相应的变更日志(CDC)。
- State白盒化:由于Flink State已被外置到Fluss主键表中,开发者可以直接查询这些“State表”,实时查看State的内容,极大地提升了问题定位和调试的效率。
- 直接点查:用户可以通过API或SQL直接对Fluss表进行高性能的主键点查,以及
- 论证与收益:
- 提升开发效率:解决了传统消息队列和Flink State无法探查的痛点,实现了从“黑盒”到“白盒”的转变,显著提高了开发和运维效率。
4. 问题:湖流割裂
解决方案:湖流一体(Unified Stream and Lakehouse)
Fluss通过内置的 分层服务(Tiering Service)和联合读取(Union Read) 能力,无缝打通了实时流存储与离线湖存储。
- 工作原理:
- 自动归档:Fluss内部的Tiering Service会自动将实时数据(热数据)平滑地、异步地沉降(Tiering)到指定的湖存储(如Apache Paimon)中,形成历史数据(冷数据)。这个过程对用户透明,保证了元数据的一致性。
- 统一访问:Flink可以通过Union Read功能,发起一个查询,该查询可以同时访问Fluss中的实时数据和Paimon中的历史数据,获得一个逻辑上统一、完整的视图。
- 论证与收益:
- 打破数据孤岛:实现了流(Fluss)与湖(Paimon)的无缝集成,提供秒级新鲜度的统一数据分析能力。
- 简化架构:避免了复杂的ETL链路和数据冗余,降低了架构复杂度和维护成本。
- 高效回溯:数据回溯和回填可以直接通过统一视图完成,极大提升了效率。
关键特性与架构实现
我们来看看介绍中的核心特性是如何通过其架构实现的:
核心特性 |
技术架构实现 |
分析 |
---|---|---|
实时读写 & 流式更新 |
由 TabletServer 中的 LogStore 和 KvStore 协同实现。 |
这是对 Kafka "仅追加、不可变"模型的直接改进。 |
CDC 订阅 |
KvStore 的数据变更会生成完整的变更日志(Changelog),并存储在 LogStore 中。 |
这个特性非常强大。它意味着任何对主数据的更新都会产生一条可供下游消费的 CDC 数据流。这使得构建端到端的实时数据链路成为可能,例如,下游系统可以订阅这些变更来更新物化视图或同步到其他系统。 |
列式裁剪 & 低成本 |
介绍中提到使用列存格式。 |
这是分析型数据库的典型特征。通过按列存储数据,当查询只涉及部分列时,系统只需读取相关列的数据,可以极大减少 I/O 和网络传输,从而将查询性能提升10倍并降低成本。 |
实时点查 |
由 KvStore 提供高性能的主键点查能力。 |
这使得 Fluss 可以直接作为实时处理任务中的维表(Dimension Table)使用,解决了 Flink 在进行流与维表关联时,需要依赖外部 KV 存储(如 HBase, Redis)的痛点,简化了架构。 |
湖流一体 |
数据可以分层存储到远端存储 (Remote Storage)。 |
这是实现成本效益和强大分析能力的关键。热数据存储在 TabletServer 中以保证低延迟读写,而冷数据则可以卸载到成本更低的对象存储(如 S3),形成数据湖。这种架构无缝集成了流存储和数据湖,既为数据湖提供了新鲜的实时数据源,也让流存储中的数据能利用数据湖强大的批量分析能力。 |
系统架构
根据官方文档,Fluss 是一个标准的分布式系统,其主要组件包括:
CoordinatorServer: 集群的"大脑",负责元数据管理、节点管理、数据均衡和故障恢复等控制平面的任务。
TabletServer: 集群的"肌肉",负责存储数据(
LogStore
和KvStore
)、响应用户的读写请求等数据平面的任务。Zookeeper: 用于集群协调和元数据存储(未来计划用内部组件替换,以减少外部依赖)。
Remote Storage: 作为持久化和分层存储的底座,是实现"湖流一体"的关键。
Client (Flink Connector): 提供与计算生态(特别是 Flink)无缝集成的接口,用户可以通过 Flink SQL 方便地操作 Fluss。
总结
Fluss的出现并非是对现有技术的简单替代,而是针对实时分析这一核心场景的深刻洞察与架构重塑。它通过列式存储解决了分析效率问题,通过State外置解决了大作业稳定性问题,通过KV能力解决了数据可见性问题,并通过湖流一体架构解决了数据孤岛问题。
这些环环相扣的解决方案,使其成为构建下一代高效、稳定、透明的实时数仓和数据湖仓(Lakehouse)的理想基石。
Fluss 项目的整体结构
Apache Fluss 是一个为实时分析而构建的流式存储系统,旨在作为现代数据湖(Lakehouse)架构的实时数据层。它通过创新的设计,连接了流处理和数据湖,实现了低延迟、高吞-吐的数据处理。
项目主要结构
Fluss 是一个基于 Java 和 Maven 构建的多模块项目。从文件树和 pom.xml
配置文件可以看出其清晰的模块划分,每个模块各司其职:
fluss-common
: 核心公共模块,包含了被项目内其他模块广泛使用的工具类、自定义注解、以及核心的抽象定义。例如,FileSystem
的抽象接口 (fluss-common/src/main/java/com/alibaba/fluss/fs/FileSystem.java
) 和插件加载工具 (PluginUtils.java
) 就在这个模块中定义,为支持多种底层存储系统和可扩展性提供了基础。fluss-rpc
: 负责客户端(Client)和服务端(Server)之间的远程过程调用(RPC)通信。它定义了通信协议和服务接口,是系统分布式能力的基础。从pom.xml
(fluss-rpc/pom.xml
) 可以看到它依赖于fluss-common
。fluss-client
: 客户端模块。它封装了与 Fluss 服务端交互的细节,为上层应用(如 Flink 作业)提供了简洁易用的 API。NOTICE
文件 (fluss-client/src/main/resources/META-INF/NOTICE
) 显示它依赖了一些基础库,如frocksdbjni
,可能用于客户端的某些状态缓存。fluss-server
: 服务端核心模块。这是 Fluss 的大脑,负责处理客户端的读写请求、管理表和数据的元信息、维护数据一致性、执行数据压缩和分层存储等核心逻辑。其NOTICE
文件 (fluss-server/src/main/resources/META-INF/NOTICE
) 表明它使用了frocksdbjni
(RocksDB 的 JNI 封装)。fluss-flink/
: Flink 集成模块。这是 Fluss 连接生态系统的关键部分,它被进一步划分为:fluss-flink-common
: 包含了与 Flink 集成的通用逻辑,如FlussSource
和FlussSourceBuilder
的基本实现。它依赖fluss-client
来与 Fluss 服务端通信。fluss-flink-1.18
,fluss-flink-1.19
,fluss-flink-1.20
: 针对不同 Flink 版本的特定连接器实现。这种结构保证了 Fluss 对主流 Flink 版本的良好兼容性和支持。
fluss-filesystems/
: 可插拔的文件系统支持模块。Fluss 的存储层是抽象的,这个模块下包含了对具体文件系统的实现。例如,fluss-fs-oss
(fluss-filesystems/fluss-fs-oss/pom.xml
) 实现了对阿里云对象存储(OSS)的支持。这使得 Fluss 可以轻松地部署在不同的云环境或本地数据中心。fluss-lake/
: 与数据湖格式集成的模块。例如fluss-lake-lance/pom.xml
表明 Fluss 能够与 Lance 这种现代列式数据格式进行集成,这是其实现高性能分析查询(如列裁剪)的关键。fluss-dist
: 发行版打包模块。它负责将fluss-server
、各种插件(如文件系统插件)以及依赖项打包成一个可以独立部署和运行的软件包。从pom.xml
(fluss-dist/pom.xml
) 可以看到它聚合了fluss-server
和fluss-fs-*
等模块。website/
: 项目的官方文档网站,使用 Docusaurus 构建,提供了丰富的入门指南、架构说明和开发文档。
架构设计
Fluss 采用了经典的分层、客户端-服务端(Client-Server)的分布式架构,其设计思想体现了高内聚、低耦合和高可扩展性。
分层架构
- 集成与应用层 (Integration & Application Layer): 这是最顶层,直接面向用户。用户通过 Apache Flink 等计算引擎的连接器(Connector)来读写 Fluss 中的数据。
fluss-flink
模块就属于这一层,其中的FlussSource
(fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlussSource.java
) 是 Flink 作业消费 Fluss 数据的入口。 - 客户端层 (Client Layer):
fluss-client
模块。它为上层应用屏蔽了底层 RPC 通信、服务发现、数据序列化等复杂性,提供了面向流和表的操作 API。 - 服务端层 (Server Layer):
fluss-server
模块。作为系统的核心,它是一个独立的、可水平扩展的集群。负责处理所有的数据读写请求、管理表的 Schema 和元数据、执行后台的数据整理(Compaction)和数据分层(Tiering)任务。 - 存储层 (Storage Layer): 这是一个抽象层。Fluss 将数据存储在底层的持久化文件系统上。通过
fluss-common
中定义的FileSystem
接口,Fluss 将自身逻辑与具体的存储实现解耦。fluss-filesystems
目录下的模块(如fluss-fs-oss
)提供了对 HDFS, S3, OSS 等多种存储系统的具体实现。
- 集成与应用层 (Integration & Application Layer): 这是最顶层,直接面向用户。用户通过 Apache Flink 等计算引擎的连接器(Connector)来读写 Fluss 中的数据。
核心设计理念
- 存算分离: Fluss Server 作为存储集群,与 Flink 等计算集群分离部署。这种架构提供了极大的灵活性和可扩展性,计算和存储资源可以根据各自的负载独立伸缩。
- 可插拔性 (Pluggability):
FileSystem
的设计是可插拔架构的最佳体现。用户可以根据自己的基础设施选择合适的存储后端,甚至可以开发自己的插件来支持私有存储。PluginUtils.java
的存在也暗示了项目广泛使用了 Java SPI (Service Provider Interface) 机制来动态加载插件。 - 列式流存储 (Columnar Stream): 这是 Fluss 的一个核心创新。不同于 Kafka 等将数据作为字节流存储,Fluss 将流数据以列式格式存储。如博客文章 (
website/blog/2025-07-10-fluss-joins-asf.md
) 所述,这使得在流式读取时可以进行列裁剪(Column Pruning)和谓词下推,极大地提升了分析查询的性能并降低了网络开销。fluss-lake-lance
模块的集成也印证了这一点。 - 流与湖的统一: Fluss 旨在打破流存储和数据湖之间的壁垒。它既能提供流式数据亚秒级的实时读写,又能通过其列式存储和对数据湖格式的兼容,高效地支持分析查询,扮演了实时数据入湖前置存储和加速层的角色。
模块与类之间的关系
模块间的依赖关系清晰地定义了类之间的调用关系:
数据流向: 一个典型的数据读取流程是:
- Flink 作业中的
FlussSource
(fluss-flink-common
) 被创建。 FlussSource
内部会使用fluss-client
提供的 API 来创建与 Fluss Server 的连接。fluss-client
通过fluss-rpc
模块,向fluss-server
发起数据读取请求。fluss-server
接收到请求后,根据元数据定位到数据所在的文件。fluss-server
通过FileSystem
的抽象接口,从底层存储(如 OSS)读取数据文件。- 数据通过
fluss-rpc
返回给fluss-client
,最终被FlussSource
消费。
- Flink 作业中的
依赖关系:
fluss-common
是最基础的模块,不依赖项目中任何其他模块。- 几乎所有其他核心模块(
fluss-rpc
,fluss-client
,fluss-server
,fluss-filesystems/*
等)都依赖fluss-common
。 fluss-client
和fluss-server
都依赖fluss-rpc
来进行通信。fluss-flink-common
依赖fluss-client
,作为 Flink 与 Fluss 系统交互的桥梁。fluss-dist
模块在打包时,会将fluss-server
和所有需要的fluss-filesystems
插件聚合在一起,形成一个完整的、可部署的服务。
总而言之,Fluss 通过一个精心设计的多模块、分层、可插拔的架构,成功地将列式存储的分析优势引入到流式存储领域。其清晰的模块划分和依赖关系,不仅保证了项目本身的高内聚和低耦合,也为未来的功能扩展和生态集成打下了坚实的基础。
Key-Value 存储引擎
实现了毫秒级的快速查找。其核心实现主要依赖于以下几点:
LSM-Tree 架构: Fluss 的 KV 存储引擎基于 LSM-Tree (Log-Structured Merge Tree) 架构。这种架构非常适合高吞吐的写操作和快速的点查询。
- 写入优化: 数据首先被写入内存中的 MemTable,然后批量刷写到磁盘上不可变的 SSTable 文件中。这使得随机写操作转换为了顺序写,效率极高。
- 快速查找: 查询时,系统会首先检查内存中的 MemTable,如果数据不存在,再逐层查找磁盘上的 SSTable 文件。由于 SSTable 内部是有序的,并且通常有布隆过滤器等索引结构,查找速度非常快。
- 底层实现 (RocksDB): 从代码库中的 RocksDBKvBuilder.java 等文件可以看出,Fluss 底层使用了 RocksDB 作为其本地 KV 存储引擎。RocksDB 是一个由 Facebook 开源的高性能嵌入式 KV 数据库,它本身就是基于 LSM-Tree 实现的,为 Fluss 提供了稳定和高效的单机读写能力。
日志流上的 KV 索引: 如 taobao-practice.md 中所述,Fluss 在高吞吐的日志流之上直接构建 KV 索引。这意味着数据一旦写入,就能立刻通过主键进行查询。这种设计不仅实现了快速查找,还使得其产生的 changelog (变更日志) 能被 Apache Flink 等流计算引擎原生消费,极大地简化了实时数仓的架构,降低了计算和延迟。
总结来说,Fluss 结合了 LSM-Tree 的理论优势和 RocksDB 的成熟工程实现,并将其创新性地应用在流存储之上,从而为实时分析场景提供了高 QPS、毫秒级延迟的 KV 查找能力。
com.alibaba.fluss.server.log
从代码结构来看,这套日志存储(LogStore)的设计思想与 Apache Kafka 非常相似,它是一个高性能、持久化、分布式的 仅追加日志(Append-only Log) 系统。
其核心实现可以概括为以下几个层次和关键组件:
1. 核心分层结构
存储系统是分层的,从管理范围由大到小依次是:
LogManager
: 最高层的管理器。它负责管理一个 Tablet Server 节点上所有的日志实例。它维护着从TableBucket
(可以理解为表的一个分桶)到LogTablet
的映射关系,并处理日志的加载、创建和清理。LogTablet
: 代表一个TableBucket
的完整存储实例。它封装了底层的LocalLog
和RemoteLogManager
,并处理与副本、写入状态(WriterStateManager
)相关的复杂逻辑。对上层来说,读写操作的主要入口就是LogTablet
。LocalLog
: 本地日志的核心实现。它管理着一组日志分段文件(LogSegment
),负责将数据写入本地磁盘。这个类不是线程安全的,其并发控制依赖于上层的LogTablet
。LogSegments
: 一个持有LogSegment
集合的容器,通常是一个按起始偏移量(Base Offset)排序的跳表(ConcurrentSkipListMap
)。它能根据给定的偏移量快速定位到对应的LogSegment
。LogSegment
: 日志的基本存储单元,对应磁盘上的一个数据文件(.log
)和两个索引文件(.index
和.timeindex
)。当一个LogSegment
达到预设的大小或时间阈值后,它会被“滚动”(roll),即变为只读,并创建一个新的LogSegment
用于写入。
2. 关键组件和工作流程
写入流程 (Append)
- 数据写入请求到达
LogTablet
。 LogTablet
将请求转发给LocalLog
。LocalLog
定位到当前活跃的(active)LogSegment
。- 数据被追加到该
LogSegment
对应的.log
文件的末尾。 LogSegment
会定期更新.index
和.timeindex
文件,以建立偏移量/时间戳到物理文件位置的映射。
读取流程 (Read/Fetch)
- 读取请求(如从某个
offset
开始读)到达LogTablet
。 LogTablet
将请求转发给LocalLog
。LocalLog
使用LogSegments
容器,通过二分查找快速定位到包含该offset
的LogSegment
。- 在找到的
LogSegment
中,使用OffsetIndex
(.index
文件) 再次进行二分查找,找到不大于目标offset
的最大索引条目,从而获得数据在.log
文件中的物理起始位置。 - 从该物理位置开始顺序读取数据。
索引文件
OffsetIndex
(.index
文件): 存储稀疏的偏移量索引。它不为每一条消息都建立索引,而是每隔N字节或N条消息记录一条,内容是<相对偏移量, 物理位置>
。这使得在占用空间很小的情况下,依然能极大地加速查找过程。TimeIndex
(.timeindex
文件): 类似地,存储时间戳到偏移量的映射,用于支持按时间戳查找。
日志滚动与清理
- 滚动 (
roll
): 当活跃的LogSegment
满足滚动条件(如文件大小达到log.segment.bytes
),LocalLog
会关闭当前活跃的 segment,并以当前的nextOffset
作为起始偏移量创建一个新的LogSegment
。 - 清理:
LogManager
会定期检查旧的LogSegment
,根据保留策略(如时间或总大小)删除不再需要的 segment 文件,释放磁盘空间。
容灾与恢复
recoveryPoint
:LocalLog
中一个重要的概念,表示已经安全刷写到磁盘(flushed)的最新偏移量。当服务重启时,会从recoveryPoint
之后开始恢复,丢弃未刷盘的数据,保证一致性。LogLoader
: 服务启动时,由LogLoader
负责扫描日志目录,加载所有的LogSegment
和索引,并恢复出每个LocalLog
的状态。
分层存储 (Tiered Storage)
- 包内还包含了
remote
相关的类,如RemoteLogManager
和RemoteLogStorage
。这表明系统支持分层存储。 - 当本地的
LogSegment
被滚动变为只读后,RemoteLogManager
中的LogTieringTask
会将这些 segment 文件异步上传到一个更廉价的远程存储系统(如 HDFS, S3, OSS)。这样既能保证热数据的本地快速访问,又能低成本地长期保存冷数据。
总结
com.alibaba.fluss.server.log
包实现了一个高效、可靠且可扩展的日志存储系统:
- 高性能: 通过顺序写、内存映射(MMap)和稀疏索引机制,实现了极高的写入和读取吞吐量。
- 持久化: 数据通过刷盘机制持久化到磁盘,并通过
recoveryPoint
保证故障恢复后的一致性。 - 可扩展: 日志被切分成段(Segment),便于管理和清理,也为分层存储打下了基础。
- 成本效益: 通过分层存储,将冷数据转移到低成本的远程文件系统,有效降低了存储成本。
TabletServer
TabletServer
本身是一个协调者和生命周期管理器。它并不直接处理每一次读写请求中 KV 和 Log 的交互细节,而是负责初始化并“组装”负责这些工作的核心组件,即 KvManager
和 LogManager
。
真正的、紧密的联系发生在更底层的 KvTablet
层面。Fluss 采用预写日志(Write-Ahead Logging, WAL)的设计模式来保证数据的一致性和持久性。这意味着对于支持更新的 PrimaryKey 表,任何对 KV 存储的修改(增、删、改),都必须先将这次变更操作记录到 Log 存储中,成功之后才能继续后续的 KV 更新流程。
下面我们通过代码来验证这一流程。
TabletServer
:组件的初始化与装配
在 TabletServer
的 startServices
方法中,我们可以清晰地看到各个管理器的初始化顺序。
// ... existing code ...
this.scheduler = new FlussScheduler(conf.get(BACKGROUND_THREADS));
scheduler.startup();
this.logManager = LogManager.create(conf, zkClient, scheduler, clock);
logManager.startup();
this.kvManager = KvManager.create(conf, zkClient, logManager);
kvManager.startup();
this.authorizer = AuthorizerLoader.createAuthorizer(conf, zkClient, pluginManager);
// ... existing code ...
分析:
TabletServer
首先创建并启动了logManager
。- 紧接着,它在创建
kvManager
时,将刚刚创建的logManager
实例作为参数传递了进去:KvManager.create(conf, zkClient, logManager)
。
这就在管理器层面建立了依赖关系:KvManager
的运行依赖于 LogManager
。
KvTablet
:WAL 机制的核心实现
KvManager
负责管理 KvTablet
的生命周期。当它创建一个 KvTablet
实例时,会从 LogManager
获取对应的 LogTablet
,并将后者注入到前者中。这种关系在 KvTablet
的成员变量和构造函数中得到了体现。
// ... existing code ...
@ThreadSafe
public final class KvTablet {
// ... existing code ...
private final PhysicalTablePath physicalPath;
private final TableBucket tableBucket;
private final LogTablet logTablet;
// ... existing code ...
private final RocksDBKv rocksDBKv;
private final KvPreWriteBuffer kvPreWriteBuffer;
// ... existing code ...
public KvTablet(
PhysicalTablePath physicalPath,
// ... existing code ...
LogTablet logTablet,
KvFormat kvFormat,
// ... existing code ...
分析:
KvTablet
类中持有一个final
的LogTablet
成员变量。这表明每个KvTablet
实例都与一个确定的LogTablet
实例绑定,负责记录其变更日志。
当有数据写入 KvTablet
时,这种绑定关系就开始发挥作用。putAsLeader
方法是处理主副本(Leader)写入的核心逻辑。
// ... existing code ...
public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] targetColumns)
throws Exception {
return inWriteLock(
kvLock,
() -> {
// ... 省略了部分代码 ...
// 1. 为变更构建 WAL (Write-Ahead Log) 记录
WALBuilder walBuilder = createWALBuilder();
// ... 遍历 kvRecords, 合并数据并生成变更 ...
// walBuilder.append(ChangeType.INSERT, newRow);
// walBuilder.append(ChangeType.UPDATE_AFTER, newRow);
// ...
// 2. 将构建好的 WAL 记录写入 LogTablet
LogAppendInfo logAppendInfo = logTablet.appendAsLeader(walBuilder.build());
// 3. 如果日志写入成功 (非重复请求), 则将变更写入内存缓冲区 (kvPreWriteBuffer)
// 后续会由后台线程异步刷写到 RocksDB
if (logAppendInfo.duplicated()) {
kvPreWriteBuffer.truncateTo(
logEndOffsetOfPrevBatch, TruncateReason.DUPLICATED);
}
return logAppendInfo;
});
}
// ... existing code ...
分析:
- 构建日志: 当
putAsLeader
被调用时,它首先会根据传入的KvRecordBatch
(一批KV记录)和存储中已有的旧值,计算出需要发生的具体变更(Insert, Update, Delete),并使用WALBuilder
来构建一条或多条日志条目。 - 写入日志: 最关键的一步是调用
logTablet.appendAsLeader(...)
。这里它将构建好的日志数据发送给LogTablet
进行持久化。 - 更新KV状态: 只有当日志成功写入后,
KvTablet
才会继续更新它自己的内部状态(先写入内存中的kvPreWriteBuffer
,最终再刷盘到 RocksDB)。
这个流程完美地诠释了 WAL 机制:数据变更必须先以日志的形式被持久化,然后才能更新数据本身。
TabletServer
如何响应读请求?
TabletServer
自身不直接处理 I/O 请求,它会启动一个 RPC 服务 TabletService
来接收和响应客户端的请求。
// ... existing code ...
this.tabletService =
new TabletService(
serverId,
remoteFileSystem,
zkClient,
replicaManager,
metadataCache,
metadataManager,
authorizer);
RequestsMetrics requestsMetrics =
RequestsMetrics.createTabletServerRequestMetrics(tabletServerMetricGroup);
this.rpcServer =
RpcServer.create(
conf,
endpoints,
tabletService,
tabletServerMetricGroup,
requestsMetrics);
rpcServer.start();
// ... existing code ...
TabletService
实现了 TabletServerGateway
接口,该接口定义了所有可用的读写操作,例如:
fetchLog
: 从日志表读取数据。lookup
: 根据主键进行点查。prefixLookup
: 根据主键前缀进行查询。
TabletServerGateway.java
/**
* Fetch log data from the specified table bucket. The request can send by the client scanner or
* other tablet server.
*
* @return the fetch response.
*/
@RPC(api = ApiKeys.FETCH_LOG)
CompletableFuture<FetchLogResponse> fetchLog(FetchLogRequest request);
/**
* Put kv data to the specified table bucket.
*
* @return the produce response.
*/
@RPC(api = ApiKeys.PUT_KV)
CompletableFuture<PutKvResponse> putKv(PutKvRequest request);
/**
* Lookup value from the specified table bucket by key.
*
* @return the fetch response.
*/
@RPC(api = ApiKeys.LOOKUP)
CompletableFuture<LookupResponse> lookup(LookupRequest request);
/**
* Prefix lookup to get value by prefix key.
*
* @return Prefix lookup response.
*/
@RPC(api = ApiKeys.PREFIX_LOOKUP)
CompletableFuture<PrefixLookupResponse> prefixLookup(PrefixLookupRequest request);
TabletService
会将这些请求进一步委托给 ReplicaManager
进行处理。
单元测试中的验证
这种行为在项目的单元测试 KvTabletTest.java
中得到了反复的验证。测试用例的普遍模式是:
- 调用
kvTablet.putAsLeader()
写入一批数据。 - 调用
readLogRecords()
方法从logTablet
中读取日志。 - 断言读出的日志内容与预期的变更类型和数据完全一致。
例如,下面的测试代码片段(逻辑简化)展示了这一点:
// ... existing code ...
@Test
public void testPartialUpdate() throws Exception {
// ... existing code ...
// 1. 准备一批KV数据
KvRecordBatch kvRecordBatch =
kvRecordBatchFactory.ofRecords(
// ... records ...
);
// 2. 调用 putAsLeader 写入 KvTablet
kvTablet.putAsLeader(kvRecordBatch, targetColumns);
// 3. 从 LogTablet 读取日志
LogRecords actualLogRecords = readLogRecords();
// 4. 准备预期的日志内容
List<MemoryLogRecords> expectedLogs =
Collections.singletonList(
logRecords(
// ... expected changes ...
));
// 5. 断言实际日志与预期日志相符
checkEqual(actualLogRecords, expectedLogs, rowType);
// ... existing code ...
总结
TabletServer
通过依赖注入的方式,在 KvManager
创建时传入了 logManager
,从而在宏观上建立了 KV 和 Log 组件的联系。
而微观上,每一个 KvTablet
实例都与一个 LogTablet
实例紧密耦合。通过严格执行 Write-Ahead Logging (WAL) 策略——即在 putAsLeader
方法中,先将变更写入 LogTablet
,再更新自身状态——Fluss 确保了 PrimaryKey
表的数据一致性、可恢复性,并天然地生成了可供下游消费的 Changelog(变更日志流)。这种设计也是其能够高效支持实时更新和点查的关键所在。
为什么不使用Rocks DB 的 WAL
Fluss 在使用 RocksDB 时,特意禁用了 RocksDB 内置的 WAL (Write-Ahead Logging) 功能。这是一个经过深思熟虑的架构决策。
根本原因在于:Fluss 在 RocksDB 之上构建了自己的一套更高级、更符合其分布式流存储定位的 WAL 机制,即 LogTablet
。
我们可以从代码中直接找到禁用 RocksDB WAL 的证据。
在 RocksDBResourceContainer.java
中,创建写选项 WriteOptions
时,明确地调用了 setDisableWAL(true)
。
// ... existing code ...
/** Gets the RocksDB {@link WriteOptions} to be used for write operations. */
public WriteOptions getWriteOptions() {
// Disable WAL by default
WriteOptions opt = new WriteOptions().setDisableWAL(true);
handlesToClose.add(opt);
return opt;
}
/** Gets the RocksDB {@link ReadOptions} to be used for read operations. */
// ... existing code ...
同样,在 RocksDBWriteBatchWrapper.java
中,创建批量写入的 WriteOptions
时也禁用了 WAL。
// ... existing code ...
this.toClose.add(this.batch);
// Use default write options with disabled WAL
this.options = new WriteOptions().setDisableWAL(true);
// We own this object, so we must ensure that we close it.
this.toClose.add(this.options);
}
// ... existing code ...
为什么这么做?
既然已经有了 RocksDB 的 WAL,为什么还要自己实现一套呢?主要有以下几个关键原因:
为分布式环境设计:
- RocksDB 是一个嵌入式的本地存储引擎,它的 WAL 文件也是写在本地磁盘上的,主要用于单机故障恢复。
- Fluss 是一个分布式存储系统。它的 WAL (
LogTablet
) 是建立在可插拔的FileSystem
抽象之上的,可以将日志持久化到 HDFS、S3、OSS 等分布式文件系统中。这使得当某个TabletServer
节点宕机后,可以从一个高可用的共享存储中恢复数据和状态,这是分布式系统容错能力的基础。
WAL 即数据流 (Changelog as a Stream):
- 在 Fluss 的设计理念中,WAL 不仅仅是用于内部恢复的日志,它本身就是一份宝贵的数据——变更数据捕获 (CDC) 日志流。
- 这份由
LogTablet
产生的日志流可以被下游的流处理引擎(如 Flink)直接消费,用于实时分析、构建物化视图等场景。Fluss 通过ArrowWalBuilder
等实现,可以生成高效的、列式的、对分析友好的日志格式。 - 而 RocksDB 的 WAL 是其内部实现细节,格式不透明,且不适合作为外部系统消费的数据源。
避免双重写入开销:
- 如果同时开启 Fluss 的
LogTablet
和 RocksDB 的 WAL,那么每一次数据写入都会触发两次日志写入操作:一次写入到分布式文件系统(Fluss WAL),一次写入到本地磁盘(RocksDB WAL)。 - 这会带来不必要的 I/O 开销和性能损耗(即“写放大”问题)。通过禁用 RocksDB 的 WAL,Fluss 确保了数据只被记录一次(在
LogTablet
中),然后异步地、批量地刷写到 RocksDB 的 SST 文件中,从而获得了更高的写入性能。
- 如果同时开启 Fluss 的
总结
可以这样理解它们的关系:Fluss 将 RocksDB 纯粹用作一个高性能的、支持主键查找的本地 KV 存储引擎,负责维护数据的最新状态快照。而关于数据的持久性、一致性和可恢复性,则完全由 Fluss 自己实现的、更上层的、分布式的 LogTablet
(WAL) 来保证。
这个设计是 Fluss 能够将流存储和KV 存储的优点结合起来,并成为一个真正的“流式数据湖存储层”的核心所在。
列式存储:RocksDB 列族 vs. Apache Arrow
在 Fluss 的语境下,“列式存储”主要指的是利用 Apache Arrow 格式实现的列式数据组织方式,而不是 RocksDB 的列族(Column Family)。
下面是详细解释:
RocksDB 的列族 (Column Family)
- 是什么: RocksDB 的列族是一种逻辑上的数据隔离机制。你可以把它想象成一个数据库实例中的多个“表”。所有的 KV 对都必须属于某一个列族(包括默认列族)。
- 物理存储: 它不是列式存储。在物理上,同一个列族内的数据仍然是按 Key-Value 对(即行式)存储在 SST 文件中的。
- 作用: 主要用于将不同类型或访问模式的 KV 对分离开,以便对它们应用不同的配置(如不同的压缩算法、不同的 MemTable 大小、不同的 Block Cache 策略等),从而进行性能调优。Fluss 使用它来管理 RocksDB 内部资源。
Apache Arrow
- 是什么: Arrow 是一个跨语言、跨平台的内存中列式数据格式。它定义了一套标准的、与语言无关的内存数据结构,用于高效地处理和传输分析型数据。
- 物理存储 (在内存中): Arrow 在内存中是真正的列式存储。同一列的所有值都存放在一块连续的内存中。
- 作用:
- 高效分析: 这种布局对于分析查询(OLAP)极为友好。例如,
SELECT a, b FROM table
这样的查询只需要读取 a 和 b 两列的内存,而无需扫描完整的行数据,这被称为“列裁剪”或“投影下推”。 - 向量化计算: CPU 可以利用 SIMD(单指令多数据流)指令对连续内存中的列数据进行批量计算,极大地提升了计算性能。
- 零拷贝传输: 由于格式标准化,不同的进程或系统(如 Java 进程和 Python 进程)之间可以共享 Arrow 格式的内存数据,无需进行昂贵的序列化和反序列化。
- 高效分析: 这种布局对于分析查询(OLAP)极为友好。例如,
Fluss 巧妙地将两者结合起来,发挥各自的优势:
- 在日志(WAL)中:
KvTablet
在生成 WAL 时,可以通过ArrowWalBuilder
将变更数据以 Arrow 格式写入LogTablet
。这意味着 Fluss 的变更日志流(Changelog)本身就是列式的,下游的 Flink 作业可以直接消费这种高效的列式数据进行实时分析。 - 在 KV 存储中:
KvTablet
将经过RowMerger
合并后的最终行数据,序列化成一个二进制的字节数组(BinaryRow
),然后将这个字节数组作为 Value,主键作为 Key,存入 RocksDB。
总结一下这个配合关系:
Fluss 使用 Apache Arrow 来组织其内存中和日志流中的数据,以获得列式存储带来的分析性能优势;同时,它利用 RocksDB 作为高性能的、行式的本地 KV 引擎,来持久化每一行的最新状态快照,并提供快速的主键查找能力。
格式转化
在 KvTablet
的设计中,Arrow 和 RocksDB 扮演着完全不同但又相辅相成的角色:
- Apache Arrow: 主要用于内存中和日志流 (Changelog) 的数据表示。它的列式结构为实时分析和高效数据传输提供了巨大优势。
- RocksDB: 作为本地持久化的 KV 存储引擎。它负责存储每个主键对应的最新状态的行数据,并提供快速的主键查找能力。
我们从数据进入 KvTablet
(putAsLeader
方法) 到最终持久化的完整路径来分析格式转换:
输入阶段
- 输入格式:
putAsLeader
方法的输入是KvRecordBatch
。在这个Batch
内部,数据是以KvRecord
的形式存在的,每条KvRecord
包含一个key
(主键) 和一个row
(行数据)。这个row
的具体格式是BinaryRow
,这是一种紧凑的、面向行的二进制格式,适合在 JVM 内部进行高效处理。
处理阶段 (putAsLeader
内部)
这是最关键的阶段,数据会在这里“兵分两路”,并发生格式转换。
第一路:写入日志流 (Log/Changelog)
目标: 将数据的变更过程(
+I
,-U
,+U
,-D
)记录下来,形成可供外部消费的日志流。转换过程:
在
putAsLeader
方法中,会调用createWalBuilder
来创建一个WalBuilder
。createWalBuilder
会根据配置的logFormat
来决定具体的WalBuilder
实现。关键代码如下:// ... existing code ... private WalBuilder createWalBuilder(int schemaId, RowType rowType) throws Exception { switch (logFormat) { case INDEXED: // ... return new IndexWalBuilder(schemaId, memorySegmentPool); case ARROW: return new ArrowWalBuilder( schemaId, arrowWriterProvider.getOrCreateWriter( // ... rowType, arrowCompressionInfo), // ... existing code ...
当
logFormat
配置为ARROW
时,会创建一个ArrowWalBuilder
。在
putAsLeader
的循环中,当调用walBuilder.append(ChangeType, BinaryRow)
时,ArrowWalBuilder
内部会将传入的BinaryRow
(行式) 的数据转换并写入到其持有的VectorSchemaRoot
(Arrow 的内存列式表示) 中。最后,当调用
walBuilder.build()
时,ArrowWalBuilder
会将VectorSchemaRoot
中的列式数据序列化成一个批次,交给LogTablet
去持久化。
格式转换:
BinaryRow
(行式) -> Apache Arrow (列式)。这次转换是为了生成对分析查询友好的、高效的列式日志流。
第二路:更新 KV 状态
目标: 将每个主键对应的最新数据存入 RocksDB,以便快速查询。
转换过程:
- 在
putAsLeader
的循环中,经过RowMerger
合并后,得到一个代表最新状态的BinaryRow
。 - 调用
ValueEncoder.encodeValue(schemaId, newRow)
将这个BinaryRow
连同schemaId
一起打包成一个byte[]
。这个byte[]
保留了行式的二进制布局。 - 这个
byte[]
作为 Value,连同主键的byte[]
(Key),被写入KvPreWriteBuffer
(内存缓冲区)。 - 最终,当
KvPreWriteBuffer
刷盘时,这些 Key-Value 对被原封不动地写入 RocksDB。
- 在
格式转换: 几乎没有转换。
BinaryRow
被简单地封装成一个byte[]
存入 RocksDB。RocksDB 并不理解BinaryRow
的内部结构,它只是把它当作一个普通的字节数组来存储。
输出/读取阶段
- 从日志流读取 (Flink Consumer): Flink 连接器会从
LogTablet
读取数据。由于日志是以 Arrow 格式存储的,Flink 可以直接在内存中以零拷贝的方式处理这些列式数据,进行高效的实时分析。 - 从 KV 存储读取 (Point Lookup): 当需要根据主键查询数据时(例如在
putAsLeader
中获取oldValue
),KvTablet
会从 RocksDB 中根据 Key 取出对应的byte[]
Value。然后使用ValueDecoder
将这个byte[]
反序列化回BinaryRow
对象,以便在 JVM 中使用。
磁盘怎么存 Arrow
Apache Arrow 的主要设计目标是一个内存中的列式数据格式。那么它是如何被持久化到磁盘上的呢?
核心答案:通过序列化。 任何内存中的数据结构,只要定义一套规则将其转换成字节流(byte stream),就可以被写入磁盘。Arrow 就有一套标准的进程间通信(IPC)序列化格式。
我们结合代码来看这个过程:
构建内存中的 Arrow 数据 (
ArrowWalBuilder
) 在KvTablet.putAsLeader
方法中,当logFormat
为ARROW
时,会创建一个ArrowWalBuilder
。这个 Builder 内部持有一个ArrowWriter
,ArrowWriter
又管理着一个内存中的VectorSchemaRoot
(这就是 Arrow 在内存中的表示)。当walBuilder.append()
被调用时,数据被一行行地写入这个内存结构中,并被组织成列式。序列化为字节流 (
MemoryLogRecordsArrowBuilder
)ArrowWalBuilder
内部实际委托了MemoryLogRecordsArrowBuilder
来完成工作。当walBuilder.build()
被调用时,MemoryLogRecordsArrowBuilder
会执行最关键的一步:- 它会调用
arrowWriter.finish()
,这会把内存中VectorSchemaRoot
的数据,按照 Arrow IPC Stream Format 规范,序列化成一个完整的字节块。 - 然后,它在这个 Arrow 字节块前面,再包装上一个 Fluss 自定义的日志批次头(
LogRecordBatch Header
),这个头包含了魔数、Writer ID、序列号等元数据。
我们可以在
MemoryLogRecordsArrowBuilder.java
的build()
和writeBatchHeader()
方法中看到这个逻辑。// ... existing code ... public MultiBytesView build() throws IOException { if (aborted) { throw new IllegalStateException("Attempting to build an aborted record batch"); } if (bytesView != null) { if (resetBatchHeader) { // 关键点1: 写入Fluss自定义的批次头 writeBatchHeader(); } return bytesView; } // 关键点2: 调用 arrowWriter.finish() 完成内存中列式数据的构建 arrowWriter.finish(); // ... // 关键点3: 将序列化后的 Arrow 数据和 Header 组合成一个字节视图 (MultiBytesView) this.bytesView = pagedOutputView.wrapBytes(); writeBatchHeader(); return bytesView; } // ... existing code ...
- 它会调用
写入磁盘 (
LogTablet
)build()
方法返回的MultiBytesView
(本质上就是byte[]
) 会被传递给logTablet.appendAsLeader()
。LogTablet
并不关心这些字节内部是 Arrow 格式还是其他格式,它的职责就是将这个完整的字节块作为一个原子单元,写入到后端的持久化存储中(例如 HDFS、OSS 或本地文件系统)。
总结一下: 可以把这个过程类比为“保存一张图片”。图片在内存中可能是解压后的像素矩阵(类似 Arrow 的 VectorSchemaRoot
),当你点击“保存”时,程序会按照 PNG 或 JPEG 格式(类似 Arrow IPC 格式)将像素矩阵压缩并序列化成一个文件字节流,然后写入磁盘。
所以,Fluss 是将内存中的 Arrow 列式数据,通过标准的 IPC 格式序列化成字节流,再由 LogTablet
将这个字节流持久化到磁盘,从而实现了用 Arrow 格式存储日志。
写入 先写内存 buffer 再写 RocksDB 这里有格式转化?
数据 先进入 KvPreWriteBuffer
(内存缓冲区),后续再由后台任务批量刷写(flush)到 RocksDB。
那么这个路径上是否有格式转换呢?答案是:有一次关键的“编码”,但之后就没有“转换”了。
我们来梳理一下这条路径上的数据形态:
进入
putAsLeader
方法:- 输入数据是
KvRecordBatch
,其内部的行数据是BinaryRow
对象。这是一个结构化的 JVM 对象。
- 输入数据是
写入
KvPreWriteBuffer
之前:- 在
putAsLeader
的业务逻辑中(例如rowMerger.merge
之后),我们得到了一个最终要存入 KV 存储的、代表最新状态的BinaryRow
对象。 - 接下来,代码调用了
ValueEncoder.encodeValue(schemaId, newRow)
。 - 这就是关键的“编码”步骤。它将结构化的
BinaryRow
对象,连同一个 schemaId,打包成一个扁平的byte[]
字节数组。这个字节数组的内部布局是 Fluss 自定义的行式二进制格式。 - 这个
byte[]
作为 Value,连同主键(Key),被放入KvPreWriteBuffer
。
// ... existing code ... // ... walBuilder.append(ChangeType.UPDATE_AFTER, newRow); // ... // 关键的编码步骤在这里发生 kvPreWriteBuffer.put( key, ValueEncoder.encodeValue(schemaId, newRow), logOffset + 1); logOffset += 2; // ... existing code ...
- 在
从
KvPreWriteBuffer
到 RocksDB:KvPreWriteBuffer
本质上就是一个内存中的Map<byte[], byte[]>
。- 当
kvPreWriteBuffer.flush()
被调用时,它会取出这些缓存的byte[]
Key 和byte[]
Value。 - 然后,它将这些
byte[]
原封不动地交给RocksDBKv
的WriteBatch
。 RocksDB
接收到这些byte[]
,也不会去解析它们的内部结构,而是直接将它们作为不透明的二进制块(blob)存入 SST 文件。
总结一下: 在这个路径上,真正的格式变化只发生了一次:即在数据存入 KvPreWriteBuffer
之前,将 JVM 中的 BinaryRow
对象编码成 byte[]
。
此后,这个 byte[]
在从内存缓冲区 (KvPreWriteBuffer
) 到 RocksDB 的整个过程中,其格式和内容不再发生任何改变。所以,这里没有“多次转换”,而是一次“编码”和多次“搬运”。
总结
综合以上分析,我们可以清晰地回答:
Arrow 和 RocksDB 的存储关系是什么?
- 它们是分工明确的协作关系。
- Arrow 用于日志流 (Changelog),以列式格式存储数据的变更历史,服务于外部的实时分析场景。
- RocksDB 用于状态存储 (State Store),以行式 (Key-Value) 格式存储数据的最新快照,服务于内部的高性能点查和数据更新。
有多次格式转化吗?
- 是的,但转换是目标明确且高效的。
- 主要的格式转换发生在写入日志流的路径上:
BinaryRow
(行式) -> Arrow (列式)。这次转换是值得的,因为它为下游的分析应用创造了巨大的性能优势。 - 在写入 RocksDB 的路径上,格式转换非常轻微,基本上只是将
BinaryRow
包装成byte[]
,开销极小。 - 整个流程避免了不必要的、重复的转换。例如,数据进入系统后以
BinaryRow
形式存在,直到需要写入 Arrow 日志时才进行一次转换,而不是在每一步都进行序列化和反序列化。这种设计在性能和功能之间取得了很好的平衡。
Fluss 与 Flink 的“深度集成”体现在哪里?
Fluss 既是独立的服务器,又与 Flink 深度集成。 这听起来有些矛盾,但实际上是两个层面的事情。
作为独立服务器: Fluss 拥有自己的
coordinator-server
和tablet-server
进程。它们负责管理元数据、存储数据、处理读写请求、执行副本复制和分层存储等。从这个角度看,它和 Kafka、HBase 一样,是一个可以独立部署和运行的存储系统。 可以不使用 Flink,通过 Fluss 的 Java Client 直接与其交互。与 Flink 的深度集成: 这是 Fluss 设计的精髓所在,也是其宣称的“为实时分析而生的流存储”的核心。这种集成远超“提供一个 Connector”的范畴。
统一的元数据和 Catalog: Fluss 为 Flink 提供了
FlussCatalog
。当 在 Flink SQL 中执行CREATE TABLE
、ALTER TABLE
等 DDL 操作时,Flink 会通过这个 Catalog 直接操作 Fluss 的元数据。 不需要在 Flink 和 Fluss 两边分别建表,实现了元数据层面的统一。优化的读写路径:
fluss-flink-connector
不仅仅是一个简单的 Source/Sink。它利用了 Fluss 的特性进行了深度优化。例如,在读取时,它可以利用 Fluss 的列式存储格式进行列裁剪 (Column Pruning),只读取需要的列,大大减少了 I/O 和网络开销。计算下推 (Pushdown): 这是“深度集成”最典型的体现。Fluss 的路线图提到了更多的下推优化,如过滤下推 (Filter Pushdown)。这意味着 Flink SQL 中的
WHERE
条件可以直接下推到 Fluss 的tablet-server
执行。数据在存储节点就被过滤掉了,只有符合条件的数据才会通过网络传输给 Flink 的计算节点,极大地提升了性能。这是普通 Connector 很难做到的。流式更新与 Delta Join: Fluss 支持高性能的 Key-Value 式流式更新(Upsert),这与 Flink 的 Retraction/Upsert 机制完美契合。Fluss 的路线图还提到了支持新的 Delta Join,这旨在解决 Flink 中流与流 Join 的状态管理和性能痛点,这需要存储层(Fluss)和计算层(Flink)的协同设计。
小结: Fluss 的独立性体现在部署和基础服务上,而其与 Flink 的深度集成则体现在计算与存储的协同优化上,目标是将部分计算逻辑下推到离数据最近的地方,从而实现极致的实时分析性能。
Flink、Fluss、Paimon 是如何配合的?
这三者的关系构成了一个典型的流式湖仓 (Streaming Lakehouse) 架构,实现了数据在不同生命周期的分层存储和统一访问。
Fluss (热层/流存储层): 负责存储近期的、实时的数据。它提供毫秒级的读写延迟,支持高频的流式更新和查询。可以把它看作是数据的“高速缓存”和“实时入口”。
Paimon (温/冷层/湖存储层): 负责长期、海量数据的归档和存储。Paimon 基于 HDFS/S3 等对象存储,成本极低,适合存储全量历史数据。它虽然也支持更新,但延迟通常在分钟级,更适合批处理和近实时分析。
Flink (计算引擎层): 作为统一的计算引擎,串联起整个数据流。
它们的配合流程如下:
- 数据写入: 所有实时数据首先通过 Flink 作业写入 Fluss 表。
- 分层到 Paimon (Tiering): 这是关键的一步。Fluss 内部有一个异步的后台服务,负责将 Fluss 中“变冷”的数据(即不再频繁更新的旧数据)自动、透明地迁移到 Paimon 表中。这个过程对用户是无感知的。
- 统一查询: 当用户通过 Flink SQL 查询一张表时,
FlussCatalog
会呈现一个统一的视图。Flink 会智能地判断:- 如果查询的是近期数据,请求会发往 Fluss。
- 如果查询的是历史数据,请求会发往 Paimon。
- 如果查询的时间范围横跨了热层和冷层,Flink 会分别从 Fluss 和 Paimon 读取数据,并在计算层进行合并(Union),最终返回完整的结果。
从文档中可以看到,这种湖仓一体的架构是 Fluss 的核心卖点之一。
2025-08-07-taobao-practice.md
// ... existing code ...
Under the Fluss lake-stream integrated architecture, Fluss provides a **fully managed** unified data service.
Fluss and Paimon store stream and lake data respectively, output a Catalog to the computing engine (such as Flink), and the data is output externally in the form of a unified table.
Consumers can directly access the data in Fluss and lake storage in the form of Union Read.
// ... existing code ...
如何建立 Paimon 提交?
Paimon 的数据写入和 Schema 变更等操作 强依赖 Flink 作业。那么,Fluss 是如何在后台自动将数据 Tiering 到 Paimon 的呢?
答案是:Fluss 将这个 Tiering 过程本身也实现成了一个 Flink 作业。
Fluss写远程存储 和 写Paimon是独立的
从文档 quickstart/flink.md
中我们可以找到明确的证据:
flink.md
// ... existing code ...
# Real-Time Analytics With Flink
## Integrate with Paimon
### Start the Lakehouse Tiering Service
To integrate with [Apache Paimon](https://paimon.apache.org/), you need to start the `Lakehouse Tiering Service`.
Open a new terminal, navigate to the `fluss-quickstart-flink` directory, and execute the following command within this directory to start the service:
```shell
docker compose exec jobmanager \
/opt/flink/bin/flink run \
/opt/flink/opt/fluss-flink-tiering-$FLUSS_VERSION$.jar \
--fluss.bootstrap.servers coordinator-server:9123 \
--datalake.format paimon \
--datalake.paimon.metastore filesystem \
--datalake.paimon.warehouse /tmp/paimon
这个命令清晰地揭示了整个机制:
1. `docker compose exec jobmanager ...`: 这个命令是在 Flink 的 JobManager 容器内执行的。
2. `/opt/flink/bin/flink run ...`: 这正是在向 Flink 集群提交一个作业。
3. `/opt/flink/opt/fluss-flink-tiering-....jar`: 这个 JAR 包就是 Fluss 提供的“分层服务”作业。
这个特殊的 Flink 作业启动后,会常驻在 Flink 集群中。它的逻辑是:
* 作为消费者,从 Fluss 中读取数据。
* 作为生产者,将读取到的数据写入 Paimon。
这样就巧妙地解决了“Paimon 依赖 Flink”的问题—— Fluss 把自己的一个核心后台功能(数据分层)打包成了一个 Flink 应用,利用 Flink 的能力来完成向 Paimon 的数据同步 。这再次体现了 Fluss 与 Flink 生态的深度融合。
FlussLakeTieringEntrypoint
FlussLakeTieringEntrypoint.java
这个文件本身,就是最有力的证据,证明了“消费 Fluss 写 Paimon”的过程是一个外部的、独立的 Flink 作业,而不是 Fluss Server 的内置功能。
让我们逐行分析代码中的关键线索:
// ... existing code ...
/** The entrypoint for Flink to tier fluss data to lake format like paimon. */
public class FlussLakeTieringEntrypoint {
private static final String FLUSS_CONF_PREFIX = "fluss.";
public static void main(String[] args) throws Exception {
// 1. 这是一个标准的 Java 应用入口,用于被 `flink run` 命令调用
final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
Map<String, String> paramsMap = params.toMap();
// 2. 从命令行参数中提取 Fluss 的连接信息
// 这表明它是一个客户端,需要连接到远端的 Fluss 集群
Map<String, String> flussConfigMap = extractAndRemovePrefix(paramsMap, FLUSS_CONF_PREFIX);
String bootstrapServers = flussConfigMap.get(ConfigOptions.BOOTSTRAP_SERVERS.key());
if (bootstrapServers == null) {
throw new IllegalArgumentException(
String.format(
"The bootstrap server to fluss is not configured, please configure %s",
FLUSS_CONF_PREFIX + ConfigOptions.BOOTSTRAP_SERVERS.key()));
}
// ... existing code ...
// 3. 获取 Flink 的执行环境,这是构建 Flink 作业的标准步骤
final StreamExecutionEnvironment execEnv =
StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig);
// 4. 构建并提交一个 Flink 作业
JobClient jobClient =
LakeTieringJobBuilder.newBuilder(
execEnv,
Configuration.fromMap(flussConfigMap),
Configuration.fromMap(lakeConfigMap),
dataLake)
.build();
System.out.printf(
"Starting data tiering service from Fluss to %s, jobId is %s.....%n",
dataLake, jobClient.getJobID());
}
}
结论: 这段代码清晰地表明,fluss-flink-tiering.jar
包是一个标准的 Flink 作业。它通过 main
函数启动,从命令行解析参数(如 Fluss 的地址 fluss.bootstrap.servers
),然后构建一个 Flink 的 StreamExecutionEnvironment
并提交作业。
这完全证实了我们之前的讨论:它是一个运行在 Flink 集群上的独立进程,其角色是 Fluss 的一个“消费者”和 Paimon 的一个“生产者”。
为什么不直接由 Flink 消费写 Paimon?(为什么需要第一份拷贝?)
FlussLakeTieringEntrypoint
启动的这个 Flink 作业,其本质是一个由 Fluss Server 协调和管理的、常驻的、分布式的 ETL (Extract-Transform-Load) 作业。
让我们看看源码是如何证明这一点的:
这个过程不是 Flink 作业“想拉就拉”。它是一个“拉(Pull)”模型,但拉取的内容和时机受到了 Fluss Server 的严格控制。
证据 1:
LakeTableTieringManager.java
(Fluss Server 端) 这个类存在于fluss-server
中,是整个分层服务的“大脑”。它维护着所有需要分层到数据湖的表的状态(tieringStates
)、心跳(liveTieringTableIds
)和调度队列(pendingTieringTables
)。它决定了哪个表、在什么时候、应该被分层。证据 2:
TieringSourceEnumerator.java
(Flink 作业端) 这是 Flink 作业的“协调员”。它并不盲目地读取数据。如文档和源码所示,它会定期(由tiering.poll.table.interval
控制)向 Fluss Server 的LakeTableTieringManager
发送心跳和请求(lakeTieringHeartbeatRequest
),询问:“我应该处理哪些任务(表)?”证据 3:
website/blog/2025-07-01-tiering-service.md
这篇博文的架构图清晰地画出了这个交互流程:Enumerator
->CoordinatorService
->Enumerator
->Reader
。这证实了 Flink 作业是一个“听从指挥的工人”,而不是“自由行动的强盗”。
结论:这个过程由 Flink 作业执行,但其生命周期和任务分配完全由 Fluss Server 端的 LakeTableTieringManager
进行协调。
做什么?—— 数据转换与重组织 (“Compaction”)
这个 Flink 作业的核心价值在于转换和重组织,为分析场景优化数据。
证据 1:
LakeTieringFactory.java
和PaimonLakeTieringFactory.java
这是一个典型的工厂模式。FlussLakeTieringEntrypoint
会根据配置(--datalake.format paimon
)加载PaimonLakeTieringFactory
。这个工厂会创建:LakeWriter
: 负责将从 Fluss 读出的LogRecord
写入 Paimon。这个过程包含了数据格式的适配和转换。LakeCommitter
: 负责将写入的文件提交到 Paimon,形成 Paimon 的快照。
证据 2:
TieringSourceReader.java
它从Enumerator
接收到任务分片(TieringSplit
)后,会真正地从 Fluss 读取数据,然后调用LakeWriter
将数据写入目标数据湖。
这个过程实现了从“为流优化的列存(Fluss/Arrow)”到“为分析优化的列存(Paimon/Parquet)”的转变。Paimon 在接收到这些数据后,会进一步执行其内部的 Compaction 策略,将小文件合并成大文件,但这都得益于 Flink Tiering 作业为其准备好了“原料”。
如何保证一致性?—— 双重提交 (Two-Phase Commit)
这个过程的可靠性由一个精巧的“双重提交”机制保证。
证据 1:
TieringCommitOperator.java
这是 Flink 作业中的一个关键算子。它的逻辑非常清晰:- 调用
lakeCommitter.commit()
,将数据先提交到 Paimon,并获取一个 Paimon 的快照 ID。 - 如果第一步成功,再调用 flussTableLakeSnapshotCommitter
.commit()
,将这个 Paimon 的快照 ID 写回 Fluss Server。
- 调用
证据 2:
TieringCommitOperatorTest.java
测试用例verifyLakeSnapshot
验证了这个行为:它断言admin.getLatestLakeSnapshot(tablePath)
能从 Fluss Server 中获取到正确的、刚刚提交的 Paimon 快照信息。
这个机制确保了 Fluss 和 Paimon 之间的元数据同步,从而实现了我们之前讨论的 Union Read
。
现在我们可以给出最精确的结论了:
第一份数据(Fluss 内置 Tiering):是 Fluss 自身的物理备份和冷存储。它将本地磁盘的原生日志文件原封不动搬到对象存储,目的是降本和支持长周期流式回溯。它是一种存储介质的替换。
第二份数据(Paimon 中的数据):是由一个受 Fluss Server 协调的、独立的 Flink ETL 作业生成的。它不是物理文件的拷贝,而是逻辑数据的转换和重组织。其目的是将数据转化为为分析而优化的、高度“紧凑(Compacted)”的格式和布局,以服务于大规模分析查询。
所以,第二个过程 是一个复杂的、有价值的 ETL 过程。而这个过程之所以独立为一个 Flink 作业,是为了利用 Flink 强大的分布式计算能力、状态管理和生态连接性,同时与 Fluss Server 解耦,使其可以独立伸缩和管理。
Lakehouse (湖仓) 简介(改自官网)
Lakehouse 是一种新兴的开放式架构,它融合了数据湖和数据仓库各自的优点,集数据湖的可扩展性与成本效益和数据仓库的可靠性与高性能于一身。诸如 Apache Iceberg、Apache Paimon、Apache Hudi 和 Delta Lake 等知名的数据湖格式在 Lakehouse 架构中扮演着关键角色,它们在一个统一的平台内,促进了数据存储、可靠性和分析能力之间的和谐平衡。
作为一种现代化架构,Lakehouse 能有效应对数据管理和分析的复杂需求。但由于其实现方式的限制,它们很难满足要求亚秒级数据新鲜度的实时分析场景。使用这些数据湖格式时,会陷入一个两难的境地:
如果要求低延迟,就需要频繁地写入和提交,这意味着会产生大量微小的 Parquet 文件。这会导致读取效率低下,因为查询必须处理海量的小文件。
如果要求高读取效率,就需要累积数据,直到可以写入较大的 Parquet 文件,但这又会带来高得多的延迟。
总而言之,即使在最优的使用条件下,这些数据湖格式通常最多也只能实现分钟级的数据新鲜度。
Streaming Lakehouse:统一流与湖仓
Fluss 是一种流式存储,支持亚秒级低延迟的流式读写。通过结合 Lakehouse 存储,Fluss 在 Lakehouse 之上提供实时流数据服务,从而统一了数据流和数据湖仓。这不仅为数据湖仓带来了低延迟,也为数据流赋予了强大的分析能力。
为了构建 Streaming Lakehouse,Fluss 维护了一个分层服务,该服务将来自 Fluss 集群的实时数据合并(compact)成数据湖格式,并存入 Lakehouse 存储中。Fluss 集群中的数据以流式 Arrow 格式存储,针对低延迟读写操作进行了优化,是短期数据存储的理想选择。
相比之下,Lakehouse 中经过合并的数据以压缩率更高的 Parquet 格式存储,针对高效分析和长期存储进行了优化。因此,Fluss 集群中的数据构成了实时数据层,可保留数天的数据并达到亚秒级新鲜度;而 Lakehouse 中的数据则构成了历史数据层,可保留数月的数据并达到分钟级新鲜度。
Streaming Lakehouse 的核心功能
Streaming Lakehouse 的核心思想是流与湖仓之间共享数据和共享元数据,从而避免了数据冗余和元数据不一致的问题。它提供的一些强大功能包括:
统一元数据 :Fluss 为流中和湖仓中的数据提供了统一的表元数据。因此,用户只需管理一张表,就可以访问实时流数据、历史数据,或是两者的并集。
联合读取 (Union Reads) :计算引擎在查询这张表时,将读取实时流数据和 Lakehouse 数据的并集。目前,只有 Flink 支持联合读取,但对更多引擎的支持已在规划中。
实时湖仓 :联合读取帮助 Lakehouse 从近实时分析演进到真正的实时分析。这使企业能够从实时数据中获得更有价值的洞察。
可分析的数据流 :联合读取帮助数据流拥有强大的分析能力。这降低了开发流式应用的复杂性,简化了调试过程,并允许即时访问实时数据洞察。
连接湖仓生态 :在将数据合并到 Lakehouse 的同时,Fluss 会保持表元数据与数据湖目录的同步。这使得像 Spark、StarRocks、Flink、Trino 这样的外部引擎可以通过连接数据湖目录来直接读取数据。
目前,Fluss 支持使用 Paimon 作为 Lakehouse 存储,对更多数据湖格式的支持已在规划之中。
Paimon
Apache Paimon 创新性地将湖格式(lake format)与 LSM(日志结构合并树)结构相结合,为湖仓(lake)架构带来了高效的更新能力。要将 Fluss 与 Paimon 集成,必须启用湖仓存储并配置 Paimon 作为湖仓存储。
当创建或修改表时,如果设置了选项 'table.datalake.enabled' = 'true',Fluss 将自动创建一个具有相同表路径的对应 Paimon 表。该 Paimon 表的 schema(模式)与 Fluss 表的 schema 相匹配,只是在末尾额外增加了三个系统列:bucket、offset 和 __timestamp。
这些系统列帮助 Fluss 客户端以流式方式从 Paimon 消费数据,例如通过偏移量(offset)或时间戳(timestamp)来定位到特定的存储桶(bucket)。
Flink SQL
USE CATALOG fluss_catalog;
CREATE TABLE fluss_order_with_lake (
`order_key` BIGINT,
`cust_key` INT NOT NULL,
`total_price` DECIMAL(15, 2),
`order_date` DATE,
`order_priority` STRING,
`clerk` STRING,
`ptime` AS PROCTIME(),
PRIMARY KEY (`order_key`) NOT ENFORCED
) WITH (
'table.datalake.enabled' = 'true',
'table.datalake.freshness' = '30s'
);
然后,数据湖仓分层服务会持续地将数据从 Fluss 分层到 Paimon。参数 table.datalake.freshness 控制着 Fluss 向 Paimon 表写入数据的频率。默认情况下,数据新鲜度为 3 分钟。
对于主键表,变更日志(changelogs)也会以 Paimon 格式生成,从而可以通过 Paimon API 进行流式消费。
从 Fluss 0.7 版本开始,还可以在创建启用了 datalake 的 Fluss 表时,通过在 Fluss 表属性子句中使用 paimon. 前缀来指定 Paimon 表的属性。
CREATE TABLE fluss_order_with_lake (
`order_key` BIGINT,
`cust_key` INT NOT NULL,
`total_price` DECIMAL(15, 2),
`order_date` DATE,
`order_priority` STRING,
`clerk` STRING,
`ptime` AS PROCTIME(),
PRIMARY KEY (`order_key`) NOT ENFORCED
) WITH (
'table.datalake.enabled' = 'true',
'table.datalake.freshness' = '30s',
'paimon.file.format' = 'orc',
'paimon.deletion-vectors.enabled' = 'true'
);
例如,可以指定 Paimon 的 file.format 属性来更改 Paimon 表的文件格式,或者设置 deletion-vectors.enabled 来为 Paimon 表启用或禁用删除向量(deletion vectors)。
读取表
使用 Apache Flink 读取
对于设置了 'table.datalake.enabled' = 'true' 的表,其数据存在于两个层次:一层保留在 Fluss 中,另一层已经分层到 Paimon。
可以选择该表的两种视图:
仅 Paimon 视图,提供分钟级的延迟,但具有更好的分析性能。
Fluss 和 Paimon 数据的组合视图,提供秒级的延迟,但查询性能可能会略有下降。
只读取 Paimon 中的数据
要只读取存储在 Paimon 中的数据,请在表名中使用 $lake 后缀。以下示例演示了这一点:
-- 假设我们有一个名为 `orders` 的表
-- 从 Paimon 读取
SELECT COUNT(*) FROM orders$lake;
-- 我们也可以查询系统表
SELECT * FROM orders$lake$snapshots;
当在查询中指定 $lake 后缀时,该表的行为就像一个标准的 Paimon 表,并继承其所有功能。 这使可以充分利用 Flink 对 Paimon 的查询支持和优化,例如查询系统表、时间旅行(time travel)等。 更多信息,请参阅 Paimon 的 SQL 查询文档。
联合读取 Fluss 和 Paimon 中的数据
要读取包含 Fluss(新)数据和 Paimon(历史)数据的完整数据集,只需查询不带任何后缀的表即可。以下示例说明了这一点:
-- 查询将联合 Fluss 和 Paimon 的数据
SELECT SUM(order_count) AS total_orders FROM ads_nation_purchase_power;
这个查询的运行速度可能比只从 Paimon 读取要慢,但它返回的是最新的数据。如果多次执行该查询,由于数据的持续摄入,可能会观察到不同的结果。
使用其他引擎读取
由于从 Fluss 分层到 Paimon 的数据是作为标准 Paimon 表存储的,可以使用任何支持 Paimon 的引擎来读取它。以下是使用 StarRocks 的一个示例:
首先,在 StarRocks 中创建一个 Paimon catalog:
CREATE EXTERNAL CATALOG paimon_catalog
PROPERTIES (
"type" = "paimon",
"paimon.catalog.type" = "filesystem",
"paimon.catalog.warehouse" = "/tmp/paimon_data_warehouse"
);
注意:paimon.catalog.type 和 paimon.catalog.warehouse 的配置值必须与在 server.yaml 中为 Fluss 配置 Paimon 作为湖仓存储时使用的值相匹配。
然后,可以使用 StarRocks 查询 orders 表:
-- 该表位于数据库 `fluss` 中
SELECT COUNT(*) FROM paimon_catalog.fluss.orders;
-- 查询系统表以查看表的快照
SELECT * FROM paimon_catalog.fluss.enriched_orders$snapshots;
数据类型映射
与 Paimon 集成时,Fluss 会自动在 Fluss 数据类型和 Paimon 数据类型之间进行转换。 下表显示了 Fluss 数据类型和 Paimon 数据类型之间的映射关系:
Fluss 数据类型 |
Paimon 数据类型 |
---|---|
BOOLEAN |
BOOLEAN |
TINYINT |
TINYINT |
SMALLINT |
SMALLINT |
INT |
INT |
BIGINT |
BIGINT |
FLOAT |
FLOAT |
DOUBLE |
DOUBLE |
DECIMAL |
DECIMAL |
STRING |
STRING |
CHAR |
CHAR |
DATE |
DATE |
TIME |
TIME |
TIMESTAMP |
TIMESTAMP |
TIMESTAMP WITH LOCAL TIMEZONE |
TIMESTAMP WITH LOCAL TIMEZONE |
BINARY |
BINARY |
BYTES |
BYTES |