Flink ClickHouse 连接器:实现 Flink 与 ClickHouse 无缝对接

发布于:2025-07-07 ⋅ 阅读:(18) ⋅ 点赞:(0)

引言

在大数据处理领域,Apache Flink 是一款强大的流处理和批处理框架,而 ClickHouse 则是一个高性能的列式数据库,专为在线分析处理(OLAP)场景设计。Flink ClickHouse 连接器为这两者之间搭建了一座桥梁,使得用户能够在 Flink 中方便地与 ClickHouse 数据库进行交互,实现数据的读写操作。本文将详细介绍 Flink ClickHouse 连接器的相关内容,包括其特点、使用方法、依赖配置等。

项目概述

Flink ClickHouse 连接器是一个用于 Flink SQL 的连接器,它基于 ClickHouse JDBC 实现,允许用户在 Flink 中直接操作 ClickHouse 数据库。目前,该项目支持 Source/Sink TableFlink Catalog 功能。如果你在使用过程中遇到任何问题,可以在项目仓库中创建 issue,同时也欢迎为项目贡献代码。

主要特点

  • 丰富的功能支持:支持作为数据源和数据接收器,并且可以通过 Flink Catalog 进行管理。
  • 配置灵活:提供了多种配置选项,如批量大小、刷新间隔、最大重试次数等,方便用户根据实际需求进行调整。

项目使用前准备

依赖配置

该项目尚未发布到 Maven 中央仓库,因此在使用之前,需要将其部署或安装到自己的仓库中。具体步骤如下:

# 克隆项目
git clone https://github.com/itinycheng/flink-connector-clickhouse.git

# 进入项目目录
cd flink-connector-clickhouse/

# 显示远程分支
git branch -r

# 检出所需分支
git checkout $branch_name

# 安装或部署项目到自己的仓库
mvn clean install -DskipTests
mvn clean deploy -DskipTests

pom.xml 文件中添加以下依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-clickhouse</artifactId>
    <version>1.16.0-SNAPSHOT</version>
</dependency>

连接器选项

Flink ClickHouse 连接器提供了一系列配置选项,以下是一些常用选项的介绍:

Option Required Default Type Description
url required none String The ClickHouse jdbc url in format `jdbc:(ch
username optional none String The ‘username’ and ‘password’ must both be specified if any of them is specified.
password optional none String The ClickHouse password.
database-name optional default String The ClickHouse database name.
table-name required none String The ClickHouse table name.
use-local optional false Boolean Directly read/write local tables in case of distributed table engine.
sink.batch-size optional 1000 Integer The max flush size, over this will flush data.
sink.flush-interval optional 1s Duration Over this flush interval mills, asynchronous threads will flush data.
sink.max-retries optional 3 Integer The max retry times when writing records to the database failed.
sink.update-strategy optional update String Convert a record of type UPDATE_AFTER to update/insert statement or just discard it, available: update, insert, discard.
sink.partition-strategy optional balanced String Partition strategy: balanced(round-robin), hash(partition key), shuffle(random).
sink.partition-key optional none String Partition key used for hash strategy.

数据类型映射

Flink 和 ClickHouse 有各自的数据类型体系,Flink ClickHouse 连接器提供了它们之间的映射关系,如下表所示:

Flink Type ClickHouse Type
CHAR String
VARCHAR String / IP / UUID
STRING String / Enum
BOOLEAN UInt8
BYTES FixedString
DECIMAL Decimal / Int128 / Int256 / UInt64 / UInt128 / UInt256
TINYINT Int8
SMALLINT Int16 / UInt8
INTEGER Int32 / UInt16 / Interval
BIGINT Int64 / UInt32
FLOAT Float32
DOUBLE Float64
DATE Date
TIME DateTime
TIMESTAMP DateTime
TIMESTAMP_LTZ DateTime
INTERVAL_YEAR_MONTH Int32
INTERVAL_DAY_TIME Int64
ARRAY Array
MAP Map
ROW Not supported
MULTISET Not supported
RAW Not supported

如何使用

创建并读写表

在 Flink SQL 中,可以通过 CREATE TABLE 语句注册一个 ClickHouse 表,然后进行读写操作。示例代码如下:

-- register a clickhouse table `t_user` in flink sql.
CREATE TABLE t_user (
    `user_id` BIGINT,
    `user_type` INTEGER,
    `language` STRING,
    `country` STRING,
    `gender` STRING,
    `score` DOUBLE,
    `list` ARRAY<STRING>,
    `map` Map<STRING, BIGINT>,
    PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
    'connector' = 'clickhouse',
    'url' = 'jdbc:ch://127.0.0.1:8123',
    'database-name' = 'tutorial',
    'table-name' = 'users',
    'sink.batch-size' = '500',
    'sink.flush-interval' = '1000',
    'sink.max-retries' = '3'
);

-- read data from clickhouse 
SELECT user_id, user_type from t_user;

-- write data into the clickhouse table from the table `T`
INSERT INTO t_user
SELECT cast(`user_id` as BIGINT), `user_type`, `lang`, `country`, `gender`, `score`, ARRAY['CODER', 'SPORTSMAN'], CAST(MAP['BABA', cast(10 as BIGINT), 'NIO', cast(8 as BIGINT)] AS MAP<STRING, BIGINT>) FROM T;

创建并使用 ClickHouseCatalog

SQL 方式
> CREATE CATALOG clickhouse WITH (
    'type' = 'clickhouse',
    'url' = 'jdbc:ch://127.0.0.1:8123',
    'username' = 'username',
    'password' = 'password',
    'database-name' = 'default',
    'use-local' = 'false',
    ...
);

> USE CATALOG clickhouse;
> SELECT user_id, user_type FROM `default`.`t_user` limit 10;
> INSERT INTO `default`.`t_user` SELECT ...;
Scala 方式
val tEnv = TableEnvironment.create(setting)

val props = new util.HashMap[String, String]()
props.put(ClickHouseConfig.DATABASE_NAME, "default")
props.put(ClickHouseConfig.URL, "jdbc:ch://127.0.0.1:8123")
props.put(ClickHouseConfig.USERNAME, "username")
props.put(ClickHouseConfig.PASSWORD, "password")
props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s")
val cHcatalog = new ClickHouseCatalog("clickhouse", props)
tEnv.registerCatalog("clickhouse", cHcatalog)
tEnv.useCatalog("clickhouse")

tEnv.executeSql("insert into `clickhouse`.`default`.`t_table` select...");
Java 方式
TableEnvironment tEnv = TableEnvironment.create(setting);

Map<String, String> props = new HashMap<>();
props.put(ClickHouseConfig.DATABASE_NAME, "default");
props.put(ClickHouseConfig.URL, "jdbc:ch://127.0.0.1:8123");
props.put(ClickHouseConfig.USERNAME, "username");
props.put(ClickHouseConfig.PASSWORD, "password");
props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s");
Catalog cHcatalog = new ClickHouseCatalog("clickhouse", props);
tEnv.registerCatalog("clickhouse", cHcatalog);
tEnv.useCatalog("clickhouse");

tEnv.executeSql("insert into `clickhouse`.`default`.`t_table` select...");

总结

Flink ClickHouse 连接器为 Flink 和 ClickHouse 之间的集成提供了便捷的解决方案,使得用户能够在 Flink 中高效地读写 ClickHouse 数据库。通过本文的介绍,你应该对该连接器的使用方法有了较为全面的了解。在实际应用中,可以根据具体需求调整连接器的配置选项,以达到最佳的性能和效果。同时,也欢迎参与项目的开发和贡献,共同推动该项目的发展。