Introduction
In the big data processing space, Apache Flink is a powerful stream and batch processing framework, while ClickHouse is a high-performance columnar database built for online analytical processing (OLAP). The Flink ClickHouse connector bridges the two, letting users read from and write to ClickHouse directly from Flink. This article walks through the connector in detail, covering its features, usage, and dependency setup.
Project Overview
The Flink ClickHouse connector is a Flink SQL connector built on the ClickHouse JDBC driver that lets you operate on a ClickHouse database directly from Flink. The project currently supports Source/Sink Table and Flink Catalog functionality. If you run into problems, open an issue in the project repository; contributions are also welcome.
Key Features
- Rich functionality: works as both a source and a sink, and can be managed through a Flink Catalog.
- Flexible configuration: offers options such as batch size, flush interval, and maximum retries, so you can tune it to your workload.
Prerequisites
Dependency Configuration
The project has not been published to Maven Central, so before using it you need to build and install (or deploy) it to your own repository:
# Clone the project
git clone https://github.com/itinycheng/flink-connector-clickhouse.git
# Enter the project directory
cd flink-connector-clickhouse/
# List the remote branches
git branch -r
# Check out the branch you need
git checkout $branch_name
# Install the project to your local repository, or deploy it to a remote one
mvn clean install -DskipTests
mvn clean deploy -DskipTests
Then add the following dependency to your pom.xml, using the version that matches the branch you built:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-clickhouse</artifactId>
    <version>1.16.0-SNAPSHOT</version>
</dependency>
Connector Options
The connector exposes a set of configuration options; the most common ones are described below:
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| url | required | none | String | The ClickHouse JDBC URL, in the format `jdbc:(ch\|clickhouse)://<host>:<port>`. |
| username | optional | none | String | The ClickHouse username. `username` and `password` must both be specified if either is specified. |
| password | optional | none | String | The ClickHouse password. |
| database-name | optional | default | String | The ClickHouse database name. |
| table-name | required | none | String | The ClickHouse table name. |
| use-local | optional | false | Boolean | Read/write the underlying local tables directly when the target table uses the Distributed engine. |
| sink.batch-size | optional | 1000 | Integer | The maximum flush size; a flush is triggered once this many records are buffered. |
| sink.flush-interval | optional | 1s | Duration | The flush interval; asynchronous flush threads write out buffered data at least this often. |
| sink.max-retries | optional | 3 | Integer | The maximum number of retries when writing records to the database fails. |
| sink.update-strategy | optional | update | String | Convert a record of type UPDATE_AFTER to an update statement or an insert statement, or discard it. Available values: update, insert, discard. |
| sink.partition-strategy | optional | balanced | String | Partition strategy: balanced (round-robin), hash (by partition key), shuffle (random). |
| sink.partition-key | optional | none | String | The partition key used by the hash strategy. |
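To show how several of these options fit together, here is a minimal sketch of a sink definition targeting a table backed by the Distributed engine; the table name t_order_sink, its columns, and the connection details are all hypothetical.
-- Hypothetical sink over a ClickHouse Distributed table:
-- write to the underlying local tables, hash-partitioned by `order_id`,
-- and rewrite UPDATE_AFTER records as plain inserts.
CREATE TABLE t_order_sink (
  `order_id` BIGINT,
  `amount` DECIMAL(18, 2),
  `created_at` TIMESTAMP(3),
  PRIMARY KEY (`order_id`) NOT ENFORCED
) WITH (
  'connector' = 'clickhouse',
  'url' = 'jdbc:ch://127.0.0.1:8123',
  'database-name' = 'default',
  'table-name' = 't_order',
  'use-local' = 'true',
  'sink.partition-strategy' = 'hash',
  'sink.partition-key' = 'order_id',
  'sink.update-strategy' = 'insert',
  'sink.batch-size' = '1000',
  'sink.flush-interval' = '2s',
  'sink.max-retries' = '3'
);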
Data Type Mapping
Flink and ClickHouse each have their own type system; the connector maps between them as shown in the following table:
| Flink Type | ClickHouse Type |
|---|---|
| CHAR | String |
| VARCHAR | String / IP / UUID |
| STRING | String / Enum |
| BOOLEAN | UInt8 |
| BYTES | FixedString |
| DECIMAL | Decimal / Int128 / Int256 / UInt64 / UInt128 / UInt256 |
| TINYINT | Int8 |
| SMALLINT | Int16 / UInt8 |
| INTEGER | Int32 / UInt16 / Interval |
| BIGINT | Int64 / UInt32 |
| FLOAT | Float32 |
| DOUBLE | Float64 |
| DATE | Date |
| TIME | DateTime |
| TIMESTAMP | DateTime |
| TIMESTAMP_LTZ | DateTime |
| INTERVAL_YEAR_MONTH | Int32 |
| INTERVAL_DAY_TIME | Int64 |
| ARRAY | Array |
| MAP | Map |
| ROW | Not supported |
| MULTISET | Not supported |
| RAW | Not supported |
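As a concrete illustration of the mapping, here is a hedged sketch that pairs a hypothetical ClickHouse table with a matching Flink DDL; the table t_profile and its columns are made up for this example.
-- ClickHouse side (executed in ClickHouse, not Flink), for reference:
--   CREATE TABLE t_profile (
--       id      UUID,
--       name    String,
--       level   UInt8,
--       balance Decimal(18, 2),
--       tags    Array(String),
--       updated DateTime
--   ) ENGINE = MergeTree ORDER BY id;

-- Matching Flink table, following the mapping table above:
CREATE TABLE t_profile (
  `id` VARCHAR,             -- UUID -> VARCHAR
  `name` STRING,            -- String -> STRING
  `level` SMALLINT,         -- UInt8 -> SMALLINT
  `balance` DECIMAL(18, 2), -- Decimal -> DECIMAL
  `tags` ARRAY<STRING>,     -- Array(String) -> ARRAY<STRING>
  `updated` TIMESTAMP       -- DateTime -> TIMESTAMP
) WITH (
  'connector' = 'clickhouse',
  'url' = 'jdbc:ch://127.0.0.1:8123',
  'database-name' = 'default',
  'table-name' = 't_profile'
);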
How to Use
Creating and Reading/Writing a Table
In Flink SQL, register a ClickHouse table with a CREATE TABLE statement, then read from and write to it. For example:
-- register a clickhouse table `t_user` in flink sql.
CREATE TABLE t_user (
`user_id` BIGINT,
`user_type` INTEGER,
`language` STRING,
`country` STRING,
`gender` STRING,
`score` DOUBLE,
`list` ARRAY<STRING>,
`map` MAP<STRING, BIGINT>,
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'clickhouse',
'url' = 'jdbc:ch://127.0.0.1:8123',
'database-name' = 'tutorial',
'table-name' = 'users',
'sink.batch-size' = '500',
'sink.flush-interval' = '1000',
'sink.max-retries' = '3'
);
-- read data from clickhouse
SELECT user_id, user_type FROM t_user;
-- write data into the clickhouse table from the table `T`
INSERT INTO t_user
SELECT
    CAST(`user_id` AS BIGINT),
    `user_type`,
    `lang`,
    `country`,
    `gender`,
    `score`,
    ARRAY['CODER', 'SPORTSMAN'],
    CAST(MAP['BABA', CAST(10 AS BIGINT), 'NIO', CAST(8 AS BIGINT)] AS MAP<STRING, BIGINT>)
FROM T;
Creating and Using ClickHouseCatalog
SQL
> CREATE CATALOG clickhouse WITH (
'type' = 'clickhouse',
'url' = 'jdbc:ch://127.0.0.1:8123',
'username' = 'username',
'password' = 'password',
'database-name' = 'default',
'use-local' = 'false',
...
);
> USE CATALOG clickhouse;
> SELECT user_id, user_type FROM `default`.`t_user` LIMIT 10;
> INSERT INTO `default`.`t_user` SELECT ...;
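Since ClickHouseCatalog implements Flink's standard Catalog interface, the usual metadata commands should also work against it once the catalog is selected; a brief sketch:
> SHOW DATABASES;
> SHOW TABLES;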
Scala
// Imports assume the connector's package layout; adjust them to the branch you built.
import java.util
import org.apache.flink.connector.clickhouse.catalog.ClickHouseCatalog
import org.apache.flink.connector.clickhouse.config.ClickHouseConfig
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val setting = EnvironmentSettings.inStreamingMode()
val tEnv = TableEnvironment.create(setting)
val props = new util.HashMap[String, String]()
props.put(ClickHouseConfig.DATABASE_NAME, "default")
props.put(ClickHouseConfig.URL, "jdbc:ch://127.0.0.1:8123")
props.put(ClickHouseConfig.USERNAME, "username")
props.put(ClickHouseConfig.PASSWORD, "password")
props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s")
val cHcatalog = new ClickHouseCatalog("clickhouse", props)
tEnv.registerCatalog("clickhouse", cHcatalog)
tEnv.useCatalog("clickhouse")
tEnv.executeSql("insert into `clickhouse`.`default`.`t_table` select...");
Java
// Imports assume the connector's package layout; adjust them to the branch you built.
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.connector.clickhouse.catalog.ClickHouseCatalog;
import org.apache.flink.connector.clickhouse.config.ClickHouseConfig;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.Catalog;

EnvironmentSettings setting = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(setting);
Map<String, String> props = new HashMap<>();
props.put(ClickHouseConfig.DATABASE_NAME, "default");
props.put(ClickHouseConfig.URL, "jdbc:ch://127.0.0.1:8123");
props.put(ClickHouseConfig.USERNAME, "username");
props.put(ClickHouseConfig.PASSWORD, "password");
props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s");
Catalog cHcatalog = new ClickHouseCatalog("clickhouse", props);
tEnv.registerCatalog("clickhouse", cHcatalog);
tEnv.useCatalog("clickhouse");
tEnv.executeSql("insert into `clickhouse`.`default`.`t_table` select...");
Summary
The Flink ClickHouse connector provides a convenient way to integrate Flink with ClickHouse, enabling efficient reads and writes from Flink jobs. The material above should give you a solid picture of how to use it. In practice, tune the connector options to your specific workload for the best performance, and consider contributing to the project to help it evolve.