从一到无穷大 #25 DataFusion:可嵌入,可扩展的模块化工业级计算引擎实现

发布于:2024-05-09 ⋅ 阅读:(25) ⋅ 点赞:(0)

在这里插入图片描述本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。

本作品 (李兆龙 博文, 由 李兆龙 创作),由 李兆龙 确认,转载请注明版权。

引言

InfluxDB3.0[11],Greptime使用DataFusion,Lindorm TSDB使用Apache Calcite[8],Db2 Event Store使用内部维护的成熟计算引擎[10],这些业界知名的项目向我们宣告了模块化组件正在推动高性能基础架构领域的发展,计算引擎(DataFusion,Calcite,Velox),一致性引擎(braft,etcd raft),序列化工具(DataFusion-Substrait,),远程数据读取服务(OpenDAL)等已经足够成熟,从现在构建一个项目并不一定要自己实现本身属于公有链路的功能,而应该把精力放在更多增值功能之上

事实上不止我这样想,Andy Pavlo在《Databases in 2022: A Year in Review》中提到:

The long-term trend to watch is the proliferation of frameworks like Velox, DataFusion, and Polars. Along with projects like Substrait, the commoditization of these query execution components means that all OLAP DBMSs will be roughly equivalent in the next five years.

使用开源的执行引擎,所有玩家都将具备同 Snowflake 十年前独有的相同向量化执行能力,当存储层对每个人来说都是相同时(云盘/对象存储),区分 DBMS 产品的关键因素将会是那些难以量化的事物,比如稳定性,UI/UX 设计,查询优化等。好在并不是所有人存储层都一致,这意味着我们可以基于不同的场景设计不同的存储引擎,针对不同场景的存储引擎插件化以及智能化引擎参数调优,并佐以智能索引构建,cache等外部能力建设有竞争力的产品。

这事实上也是我们做多模型数据库的初衷,即用最少的人力/代码完成承载更多的业务,控制面资源管理,调度,arbiter,路由推送,分池,读写链路转化,一致性引擎,wal模块(写入,拉取,同步),备份,多region,部分监控能力等模块共用,协议解析与引擎插件化。

架构总览与可扩展性

请添加图片描述

Datafusion的总体架构可以看作七个部分(数据源,Front End,逻辑计划,逻辑计划优化,执行计划,执行计划优化,执行引擎),与传统高度耦合的系统不同,Datafusion的每个部分用户都可以自由扩展其功能特性,这样不仅可以最大化使用高效稳定的开源代码,也可以在不修改开源代码的情况下实现不同系统的定制功能,我个人认为这就是一个组件类开源项目最大的优势。

Catalog and Data Sources

Datafusion提供了一个目录来存储元数据,比如表格结构,类型,统计信息等,这个信息可以协助执行计划执行时跳过部分数据文件。

对于数据源DataFusion实现了常用文件格式的读取模块,例如Apache Parquet, Apache Avro, JSON, CSV, Apache Arrow IPC files,这些都基于TableProviders模块实现,而且实现了投影,过滤和limit/offset下推的功能。用户也可以使用TableProviders API实现额外的读取模块。

Front End

DataFusion直接使用Arrow的类型系统,在执行的过程中直接使用Arrow数组或者单个值的形式传递数据。

DataFusion 支持sql和DataFrame API,并使用相同的底层逻辑计划表示,当然存在部分系统需要更高级的解析特性,LogicalPlanBuilder API提供了Rust builder-style的接口,用于直接构建逻辑计划

逻辑计划与逻辑计划优化器

DataFusion首先实现了LogicalPlan层面上表示和评估表达式树和关系运算符的全套结构;其次实现了允许使用protobuf和 Substrait 将这些结构序列化/反序列化到适合网络传输的格式;最后携带了解析阶段可能已知的统计信息,如row count、null count和最小/最大值。

除此之外,DataFusion还实现了simplification, interval analysis以及range propagation(减少数据扫描,索引选择,子查询优化,join条件优化)等查询优化策略,并实现了一套ReWrite框架

另外DataFusion又有一个庞大的函数库,可以使用sql和DataFrame 调用,这些函数通过操纵Arrow 数组,使用与用户定义函数相同的API来实现

执行引擎

DataFusion的执行引擎从论文来看没有使用什么巧妙的优化技巧,就是使用pull-base的火山式模型流式执行,允许在多个内核之间分配工作,然后用上一些基础但是有效的实现方式。但是对于我来说还是有很多值得学习的地方,因为我们内部实现了一个完整的执行引擎,用于在时序数据库中支持influxql,当前虽然承载了公司内部大多数的监控数据,但是仍旧属于一个及其早期的阶段。项目内部认为项目初期阶段项目敏捷性,执行引擎的性能,可扩展性,稳定性,远高于其他模块,所以一个成熟执行引擎很多必须的功能都是缺失的,比如论文中提到的很多特性。

  1. 流式执行:所有的运算符都以Arrow 数组的形式递增输出,为了实现矢量化执行,每次拉取都是默认为大小为8192行的RecordBatches,对于full sort, final aggregation, hash join等pipelinebreaking的操作,运算符会对数据进行缓冲,必要时会溢出到磁盘。溢出磁盘这个动作目前来看,是一个极其有效的方法,在实际运营过程中,通常会遇到一个页面一次拉起几十个七天的数据查询,一次就会把时序数据库接入层机器的内存打满,一般我们的接入机都使用CVM,带着一块500GB的cbs盘,但是一般都使用不上,如果可以提供这种数据溢出磁盘的功能,不仅可以防止机器的OOM,而且可以利用上cbs盘的容量。
  2. 多核执行:每个 ExecutionPlan 都使用一个或多个并行执行的 Stream来运行。大多数 Streams 只与它们的输入进行协调,但有些 Streams 必须与同级 Streams 进行协调,如 HashJoinExec 在构建共享哈希表时,或 RepartitionExec 在将数据重新分配到不同 Streams 时。为每个 ExecutionPlan 创建的流的数量称为其分区,分区在执行计划时确定。当然执行引擎的并行化实现还是比较复杂的。我们在存储引擎内部的计算引擎中为了控制并行的数量,控制了一个pod内部允许使用的执行流数量,并控制一个查询允许使用的最大执行流数量。目前来看读取数据(seriesfile,tsi,tsm),解码,过滤,归并操作都可以并行。在计算节点内,计算引擎的并行就麻烦了,为了简化执行计划的过程,目前我们只有解码,归并可以做到并行。
  3. 线程调度:使用tokio作为async-runtime,运行时的实现其实也很有意思,很久以前我使用cpp实现过协作式和抢占式的协程框架,也在dragonfly看到了对于boost.fiber的使用,调度包括不仅限与线程间任务窃取,内部状态trace等。我们内部的多线程管理使用task-flow框架搭配future模型,与协程的使用属于两种异步方式的实现。[4]
  4. 内存管理:DataFusion 使用 MemoryPool 管理内存,一个或多个并发运行的查询共享 MemoryPool。当内存消耗发生重大变化时,Stream会通过调用grow and shrink API 记录。Stream使用一种实用的方法,准确跟踪最大的内存消耗(如用于hash merge的hashTable),但不跟踪小的短暂分配(如当前输出batch的内存)。DataFusion 有两种内置内存池实现。GreedyPool:会强制每个进程的内存限制,但不会试图在查询中将资源公平地分配给各个Stream。FairPool:在所有pipeline-breaking Streams之间平均分配资源。**基于 DataFusion 的系统也允许使用相同的 API,通过特定域策略实现自己的 MemoryPool。**以我们内部的实现来看,因为内部的row没有使用Arrow,而是自己实现的,内存占用不算少,在每个算子迭代器中如果有必要都会生成新的row,这其实会造成大量的内存浪费,还好使用的是cpp,我们可以精确的控制内存使用过的生命周期,但事实上还是有不少的内存占用,所以如果我们要这样做的话也需要记录输出batch的内存。
  5. Cache管理:CacheManager 会缓存目录内容(如昂贵的对象存储 LIST 操作)和每个文件的元数据,如规划和剪枝所需的统计信息。和内存管理一样,需要更多定制策略的用户可提供自己的实现(如驱逐策略或限制临时空间)。在独立的系统中,时序数据库会使用更多的cache策略,当然这是计算引擎之山的行为。另外我也怀疑基于DataFusion实现对象存储的缓存是否是一个好主意,可以给予[9]看到高效的利用对象存储是一个与业务极度挂钩的事情,需要包含对象存储读取线程和实际处理线程的均衡,其次各种业务相关的cache方案也是多种多样的,当然不知道DataFusion有没有提供类似接入API。

优化

查询重写

逻辑计划重写包括projection pushdown, filter pushdown, limit pushdown, expression simplification, common subexpression elimination, join predicate extraction, correlated subquery flattening, and outer-to-inner join conversion。

我们的实现中因为不支持join,所以不支持join predicate extraction和outer-to-inner join conversion;我们支持子查询,但是也不支持 common subexpression elimination和correlated subquery flattening,而projection pushdown, filter pushdown, limit pushdown, expression simplification是支持的。

执行计划重写包括eliminating unnecessary sorts, maximizing parallel execution, and determining specific algorithms(hash merge)。
在我们的系统中支持 maximizing parallel execution, and determining specific algorithms,但是eliminating unnecessary sorts是不支持的。

Sorting

DataFusion中的实现基于[5]。实现了独立的 RowFormat,内存耗尽时溢出到临时磁盘文件的能力,以及对 LIMIT(又名 “Top K”)的专门实现。

我们的实现中也有独立的RowFormat,允许灵活的基于各种排序键进行排序(time,group by tags, order by keys等),其次除了精确的limit实现外,还支持了HyperLogLog,用于快速的,可控资源的返回近似的topk。

但是我们没有内存耗尽时溢出到临时磁盘文件的能力,这个能力可以考虑后续建设,是一个很好的容忍突发大查询的方法。

其他

Grouping,Aggregation,Joins,Window Functions[6]我理解被定义为优化似乎有些牵强,因为基本上所有的计算引擎都会需要这些功能。
Pushdown,Leveraging Sort Order可以被认为是优化,但是也被广泛使用。

在时序数据库中Pushdown是一个极其重要的特性,不仅是Pushdown,还包括SubProcess,这个特性更多的被翻译为协处理器,就是查询被分为N个部分发送到N个节点执行计算操作,随后在一个聚合节点进行二次聚合,利用节点并行的能力带来更多的性能,这个方法使得查询能力几乎可以无限扩展,比如物理pod 54个,用户的表包含54个分片,合理分布下,表所属副本主节点应该均匀的分配在pod之间,那么这个查询可以利用54个pod的计算能力,不仅是腾讯云CTSDBi,谷歌的Monarch[7]和阿里云的Lindorm[8]也深度使用了这个能力。

性能

事实上这样一个拥有活跃社区,且目的明确的开源项目可想而知性能一定会不断演进,目前的性能报告更多的是给大家吃一颗定心丸,也是一种亮出自己拳头的方式。
在这里插入图片描述
ClickBench在单核心与DuckDB各有优劣。

在这里插入图片描述
ClickBench在多核心与DuckDB也是各有优劣。

事实上哪种查询在哪种数据集下谁为什么比谁强,以后能不能做的比彼此更好,已经不是什么重要的事情了,查询引擎这种这么多年没有大的变化的东西,本来就是谁投入大,谁就是大爹,工业性如此强的一种模块想要三个人做个一年半载比别人强基本上不是现实的事情。所以这篇论文中性能对比对绝大多数人来说只需要知道DataFusion具有工业级计算引擎的能力就够了。

结束语

以时序数据库领域看,我认为理论最优架构的InfluxDB3.0与Greptime随着不断的投入,基本性能将趋于一致。但是它强任他强,华为云GaussDB,阿里云Lindorm,腾讯云CTSDBi,TDengine作为架构相对类似的四个系统,虽然投入人力不太对等,但是大家之间的优化方式基本透明,随着时间的发展,最多两年,性能不会有数量级别的差异,刚需功能也会基本对齐,公有云客户看重的更多是价格和稳定性,性能很多时候已经不是最核心的点了(当然性能好赚的多也是实话)。

而Apache IotDB作为学院派作品,基本引领了全球时序数据库近几年科研的方向,每年vldb/sigmod/icde发到手软,但是基本上把自己tsfile的书状索引格式更多的把自己放在拿下传统物联网企业的订单,而且更多的注重边云协同,更多的偏向于解决方案,与公有云要吃的不是一块肉。

参考:

  1. 从一到无穷大 #8 Arrow,Parquet and ORC
  2. Apache arrow datafusion到底是什么?它是伪需求吗?和spark引擎有什么区别?
  3. Morsel-Driven Parallelism: 一种NUMA感知的并行Query Execution框架
  4. CeresDB|Tokio 任务调度原理分析
  5. Implementing sorting in database systems. ACM Comput. Surv. 38, 3 (2006)
  6. Efficient Processing of Window Functions in Analytical SQL Queries vldb2015
  7. 从一到无穷大 #12 Planet-Scale In-Memory Time Series Database, Is it really Monarch?
  8. 从一到无穷大 #13 How does Lindorm TSDB solve the high cardinality problem?
  9. 从一到无穷大 #22 基于对象存储执行OLAP分析的学术or工程经验,我们可以从中学习到什么?
  10. 从一到无穷大 #17 Db2 Event Store,A Purpose-Built IoT Database Engine
  11. Flight, DataFusion, Arrow, and Parquet: Using the FDAP Architecture to build InfluxDB 3.0
  12. 万字带你走过数据库的这激荡的三年

网站公告

今日签到

点亮在社区的每一天
去签到