Apache Paimon Spark Engine Explained

Published: 2024-02-29
1. Environment Setup

Paimon currently supports Spark 3.5, 3.4, 3.3, 3.2, and 3.1. For the best feature coverage, use the latest Spark version.

Version	     Jar
Spark 3.5	   paimon-spark-3.5-0.7.0-incubating.jar
Spark 3.4	   paimon-spark-3.4-0.7.0-incubating.jar
Spark 3.3	   paimon-spark-3.3-0.7.0-incubating.jar
Spark 3.2	   paimon-spark-3.2-0.7.0-incubating.jar
Spark 3.1	   paimon-spark-3.1-0.7.0-incubating.jar
2. Specify the Paimon Jar File

If you use the HDFS file system, make sure the environment variable HADOOP_HOME or HADOOP_CONF_DIR is set.

When starting spark-sql, append the path of the paimon jar file to the --jars argument.

spark-sql ... --jars /path/to/paimon-spark-3.3-0.7.0-incubating.jar

Alternatively, copy paimon-spark-3.3-0.7.0-incubating.jar into the spark/jars directory of your Spark installation.

3. Specify the Paimon Catalog

Using the Paimon Catalog

When starting spark-sql, register Paimon's Spark catalog with the following configuration. Table files of this warehouse are stored under /tmp/paimon.

spark-sql ... \
    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.warehouse=file:/tmp/paimon \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

Configuration is done through properties under spark.sql.catalog.(catalog_name); in the example above, "paimon" is the catalog name.

After the spark-sql command line has started, run the following SQL to create and switch to the database default.

USE paimon;
USE default;

After switching to the catalog ('USE paimon'), Spark's existing tables cannot be accessed directly; use spark_catalog.${database_name}.${table_name} to access Spark tables.
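
For example (a minimal sketch; the database and table names below are placeholders for whatever already exists in your Spark catalog):

-- While the current catalog is paimon, read an existing Spark table by its fully qualified name.
SELECT * FROM spark_catalog.default.my_spark_table;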

Using the Spark Generic Catalog

When starting spark-sql, register Paimon's Spark Generic Catalog with the following configuration to replace Spark's default catalog, spark_catalog (the default warehouse is spark.sql.warehouse.dir).

Currently, the Spark Generic Catalog is recommended only when a Hive metastore is used. Paimon infers the Hive conf from the Spark session, so you only need to configure Spark's Hive conf.

spark-sql ... \
    --conf spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

With the Spark Generic Catalog, you can use Paimon tables and non-Paimon tables, such as Spark's csv, parquet, or Hive tables, in the same catalog.
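
A minimal sketch of mixing formats under the generic catalog (the table names are hypothetical):

-- A Paimon table and a plain parquet table living side by side in spark_catalog.
CREATE TABLE orders_paimon (id INT, amount DOUBLE) USING paimon TBLPROPERTIES ('primary-key' = 'id');
CREATE TABLE orders_raw (id INT, amount DOUBLE) USING parquet;

-- They can be queried and joined together.
SELECT p.id, p.amount, r.amount AS raw_amount
FROM orders_paimon p JOIN orders_raw r ON p.id = r.id;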

4. Create Table

Paimon Catalog

CREATE TABLE my_table (
    k INT,
    v STRING
) TBLPROPERTIES (
    'primary-key' = 'k'
);
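
Partitioned tables follow the same pattern. A minimal sketch (the table is hypothetical; note that Paimon requires the partition columns of a primary-key table to be included in the primary key):

CREATE TABLE my_partitioned_table (
    k INT,
    v STRING,
    dt STRING
) PARTITIONED BY (dt) TBLPROPERTIES (
    'primary-key' = 'dt,k'
);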

Spark Generic Catalog

CREATE TABLE my_table (
    k INT,
    v STRING
) USING paimon
TBLPROPERTIES (
    'primary-key' = 'k'
);
5. Insert Into Table

Paimon currently supports Spark 3.2+ for SQL writes.

INSERT INTO my_table VALUES (1, 'Hi'), (2, 'Hello');
6. Query Table

SQL Query

SELECT * FROM my_table;

/*
1	Hi
2	Hello
*/

DataFrame Query

val dataset = spark.read.format("paimon").load("file:/tmp/paimon/default.db/my_table")
dataset.show()

/*
+---+------+
| k |     v|
+---+------+
|  1|    Hi|
|  2| Hello|
+---+------+
*/
7. Update Table

Important table property requirements:

  • Only primary-key tables support this feature.
  • The table's merge engine must be deduplicate or partial-update to support this feature.

Note: updating primary-key columns is not supported.

UPDATE my_table SET v = 'new_value' WHERE k = 1;
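
The merge engine can also be set explicitly as a table property at creation time. A minimal sketch (the table name is hypothetical; deduplicate is Paimon's default merge engine):

CREATE TABLE my_updatable_table (
    k INT,
    v STRING
) TBLPROPERTIES (
    'primary-key' = 'k',
    'merge-engine' = 'deduplicate'
);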
8. Merge Into Table

Paimon currently supports the MERGE INTO syntax in Spark 3+, which allows a set of updates, insertions, and deletions based on a source table to be performed in a single commit.

  1. Only primary-key tables are supported.
  2. Updating primary-key columns in the UPDATE clause is not supported.
  3. The WHEN NOT MATCHED BY SOURCE syntax is not supported.

Example 1:

If the row exists in the target table, update it; otherwise, insert it.

MERGE INTO target
USING source
ON target.a = source.a
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *

Example 2:

With multiple conditional clauses.

-- Here both source and target tables have the same schema: (a INT, b INT, c STRING), and a is a primary key.

MERGE INTO target
USING source
ON target.a = source.a
WHEN MATCHED AND target.a = 5 THEN
   UPDATE SET b = source.b + target.b      -- when matched and meet the condition 1, then update b;
WHEN MATCHED AND source.c > 'c2' THEN
   UPDATE SET *    -- when matched and meet the condition 2, then update all the columns;
WHEN MATCHED THEN
   DELETE      -- when matched, delete this row in target table;
WHEN NOT MATCHED AND c > 'c9' THEN
   INSERT (a, b, c) VALUES (a, b * 1.1, c)      -- when not matched but meet the condition 3, then transform and insert this row;
WHEN NOT MATCHED THEN
INSERT *      -- when not matched, insert this row without any transformation;
9. Streaming Write

Paimon currently supports Spark 3+ for streaming writes.

Paimon Structured Streaming only supports the append and complete output modes.

// Create a paimon table if not exists.
spark.sql(s"""
           |CREATE TABLE T (k INT, v STRING)
           |TBLPROPERTIES ('primary-key'='k', 'bucket'='3')
           |""".stripMargin)

// Here we use MemoryStream to fake a streaming source.
// MemoryStream needs an Encoder for the element type and an implicit SQLContext in scope.
import org.apache.spark.sql.execution.streaming.MemoryStream
import spark.implicits._
implicit val sqlCtx: org.apache.spark.sql.SQLContext = spark.sqlContext
val inputData = MemoryStream[(Int, String)]
val df = inputData.toDS().toDF("k", "v")

// Streaming Write to paimon table.
val stream = df
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", "/path/to/checkpoint")
  .format("paimon")
  .start("/path/to/paimon/sink/table")
10. Streaming Read

Paimon currently supports Spark 3.3+ for streaming reads. The supported scan modes are as follows:

Scan Mode           Description
latest              For streaming sources, continuously reads the latest changes without producing a snapshot at the beginning.
latest-full         For streaming sources, produces the latest snapshot on the table upon first startup, and continues to read the latest changes.
from-timestamp      For streaming sources, continuously reads changes starting from the timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning.
from-snapshot       For streaming sources, continuously reads changes starting from the snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning.
from-snapshot-full  For streaming sources, produces the snapshot specified by "scan.snapshot-id" upon first startup, and continuously reads changes.
default             Equivalent to from-snapshot if "scan.snapshot-id" is specified; equivalent to from-timestamp if "scan.timestamp-millis" is specified; otherwise equivalent to latest-full.

Example of the default scan mode:

// No scan-related configs are provided, so the default mode falls back to latest-full.
val query = spark.readStream
  .format("paimon")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .start()

Paimon Structured Streaming supports a variety of streaming read modes, as well as many triggers and read limits:

Key                              Default  Type     Description
read.stream.maxFilesPerTrigger   (none)   Integer  The maximum number of files returned in a single batch.
read.stream.maxBytesPerTrigger   (none)   Long     The maximum number of bytes returned in a single batch.
read.stream.maxRowsPerTrigger    (none)   Long     The maximum number of rows returned in a single batch.
read.stream.minRowsPerTrigger    (none)   Long     The minimum number of rows returned in a single batch; used together with read.stream.maxTriggerDelayMs to create a MinRowsReadLimit.
read.stream.maxTriggerDelayMs    (none)   Long     The maximum delay between two adjacent batches; used together with read.stream.minRowsPerTrigger to create a MinRowsReadLimit.

Example 1:

Use Spark's org.apache.spark.sql.streaming.Trigger.AvailableNow() together with Paimon's read.stream.maxBytesPerTrigger option:

// Trigger.AvailableNow() processes all available data at the start
// of the query in one or multiple batches, then terminates the query.
// Setting read.stream.maxBytesPerTrigger to 128M means that each
// batch processes at most 128 MB of data.
val query = spark.readStream
  .format("paimon")
  .option("read.stream.maxBytesPerTrigger", "134217728")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start()

Example 2:

Use org.apache.spark.sql.connector.read.streaming.ReadMinRows:

// A batch will not be triggered until more than 5,000 rows have accumulated,
// unless the interval since the previous batch exceeds 300 seconds.
val query = spark.readStream
  .format("paimon")
  .option("read.stream.minRowsPerTrigger", "5000")
  .option("read.stream.maxTriggerDelayMs", "300000")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .start()

Paimon Structured Streaming supports reading rows as a changelog (a rowkind column is added to each row to indicate its change type) in the following two ways:

  • Stream directly from the system audit_log table
  • Set read.changelog to true (default: false) and then stream from the table location

// Option 1
val query1 = spark.readStream
  .format("paimon")
  .table("`table_name$audit_log`")
  .writeStream
  .format("console")
  .start()

// Option 2
val query2 = spark.readStream
  .format("paimon")
  .option("read.changelog", "true")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .start()

/*
+I   1  Hi
+I   2  Hello
*/
11. Schema Evolution

Schema evolution allows users to easily modify the current schema of a table to adapt to existing data, or to new data that changes over time, while maintaining data integrity and consistency.

When writing data, Paimon supports automatically merging the schema of the source data with the current table schema, and uses the merged schema as the latest schema of the table. You only need to enable write.merge-schema.

data.write
  .format("paimon")
  .mode("append")
  .option("write.merge-schema", "true")
  .save(location)

When write.merge-schema is enabled, Paimon by default allows the following changes to the table schema:

  • Adding columns
  • Up-casting data types (e.g. Int -> Long)

Paimon also supports explicit type casts between certain types (e.g. String -> Date, Long -> Int); this requires enabling write.merge-schema.explicit-cast.

Schema evolution can also be used in streaming mode.

// MemoryStream needs an Encoder and an implicit SQLContext in scope (see the streaming write example above).
import org.apache.spark.sql.execution.streaming.MemoryStream
import spark.implicits._
implicit val sqlCtx: org.apache.spark.sql.SQLContext = spark.sqlContext
val inputData = MemoryStream[(Int, String)]
inputData
  .toDS()
  .toDF("col1", "col2")
  .writeStream
  .format("paimon")
  .option("checkpointLocation", "/path/to/checkpoint")
  .option("write.merge-schema", "true")
  .option("write.merge-schema.explicit-cast", "true")
  .start(location)

The related configuration options are as follows:

Key                               Description
write.merge-schema                If true, merge the data schema and the table schema automatically before writing data.
write.merge-schema.explicit-cast  If true, allow merging data types if the two types meet the rules for explicit casting.
12. Spark Procedures

The Spark procedures available in Paimon are listed below.

Procedure: compact
Explanation:
  • identifier: the target table identifier. Cannot be empty.
  • partitions: partition filter. Left empty for all partitions. "," means "AND", ";" means "OR".
  • order_strategy: 'order', 'zorder', 'hilbert' or 'none'. Left empty for 'none'.
  • order_columns: the columns that need to be sorted. Left empty if 'order_strategy' is 'none'.
  To sort-compact the two partitions date=01 and date=02, write 'date=01;date=02'. To sort one partition with date=01 and day=01, write 'date=01,day=01'.
Example:
  SET spark.sql.shuffle.partitions=10; -- set the compact parallelism
  CALL sys.compact(table => 'T', partitions => 'p=0', order_strategy => 'zorder', order_by => 'a,b')

Procedure: expire_snapshots
Explanation:
  • table: the target table identifier. Cannot be empty.
  • retain_max: the maximum number of completed snapshots to retain.
  • retain_min: the minimum number of completed snapshots to retain.
  • older_than: timestamp before which snapshots will be removed.
  • max_deletes: the maximum number of snapshots that can be deleted at once.
Example:
  CALL sys.expire_snapshots(table => 'default.T', retainMax => 10)
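
Since partitions and order_strategy may be left empty, a routine maintenance call can be as simple as the following sketch (the table name is a placeholder):

-- Compact every partition of table T without sorting (order_strategy defaults to 'none').
CALL sys.compact(table => 'T');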
13. Spark Type Conversion

The supported type conversions between Spark and Paimon are shown below. All Spark data types under org.apache.spark.sql.types are available.

Spark Data Type                Paimon Data Type                    Atomic Type
StructType                     RowType                             false
MapType                        MapType                             false
ArrayType                      ArrayType                           false
BooleanType                    BooleanType                         true
ByteType                       TinyIntType                         true
ShortType                      SmallIntType                        true
IntegerType                    IntType                             true
LongType                       BigIntType                          true
FloatType                      FloatType                           true
DoubleType                     DoubleType                          true
StringType                     VarCharType, CharType               true
DateType                       DateType                            true
TimestampType                  TimestampType, LocalZonedTimestamp  true
DecimalType(precision, scale)  DecimalType(precision, scale)       true
BinaryType                     VarBinaryType, BinaryType           true
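
As a quick illustration of the mappings above, a table declared with common Spark SQL types (a sketch; the table name is hypothetical) is stored by Paimon using the corresponding Paimon types:

CREATE TABLE type_demo (
    id    INT,             -- IntegerType   -> IntType
    cnt   BIGINT,          -- LongType      -> BigIntType
    name  STRING,          -- StringType    -> VarCharType
    price DECIMAL(10, 2),  -- DecimalType   -> DecimalType
    ts    TIMESTAMP,       -- TimestampType -> TimestampType
    tags  ARRAY<STRING>    -- ArrayType     -> ArrayType
);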

Note

  • Currently, Spark's field comments cannot be described under the Flink client.
  • Conversion between Spark's UserDefinedType and Paimon's UserDefinedType is not supported.
14. Spark 2

Paimon supports Spark 2.4+; Spark 2 only supports reading.

Paimon does not support DDL under Spark 2.4. You can use the Dataset reader and register the Dataset as a temporary table.

val dataset = spark.read.format("paimon").load("file:/tmp/paimon/default.db/word_count")
dataset.createOrReplaceTempView("word_count")
spark.sql("SELECT * FROM word_count").show()