Flink ClickHouse 连接器数据读取源码深度解析

发布于:2025-07-07 ⋅ 阅读:(17) ⋅ 点赞:(0)
一、引言

在大数据处理流程中,从存储系统中高效读取数据是进行后续分析的基础。Flink ClickHouse 连接器为我们提供了从 ClickHouse 数据库读取数据的能力,使得我们可以将 ClickHouse 中存储的海量数据引入到 Flink 流处理或批处理作业中进行进一步的分析和处理。下面我们将深入解析其数据读取的源码实现,探索其背后的技术细节和优化策略。

二、整体架构概述

Flink ClickHouse 连接器的数据读取主要围绕 AbstractClickHouseInputFormat 及其子类展开。AbstractClickHouseInputFormat 是一个抽象基类,它定义了读取数据的基本流程和方法,为具体的读取实现提供了统一的接口。具体的读取逻辑由其子类 ClickHouseBatchInputFormatClickHouseShardInputFormat 实现,它们分别适用于不同的场景,以满足多样化的读取需求。

三、核心类及方法详细解析
1. AbstractClickHouseInputFormat
public abstract class AbstractClickHouseInputFormat extends RichInputFormat<RowData, InputSplit>
        implements ResultTypeQueryable<RowData> {

    protected final String[] fieldNames;

    protected final TypeInformation<RowData> rowDataTypeInfo;

    protected final Object[][] parameterValues;

    protected final String parameterClause;

    protected final String filterClause;

    protected final long limit;

    protected AbstractClickHouseInputFormat(
            String[] fieldNames,
            TypeInformation<RowData> rowDataTypeInfo,
            Object[][] parameterValues,
            String parameterClause,
            String filterClause,
            long limit) {
        this.fieldNames = fieldNames;
        this.rowDataTypeInfo = rowDataTypeInfo;
        this.parameterValues = parameterValues;
        this.parameterClause = parameterClause;
        this.filterClause = filterClause;
        this.limit = limit;
    }

AbstractClickHouseInputFormat 继承自 RichInputFormat 并实现了 ResultTypeQueryable 接口。它包含了一些重要的属性,如字段名、行数据类型信息、参数值、过滤条件和限制条件等。这些属性将在数据读取过程中发挥重要作用,用于指定读取的数据范围和格式。构造函数用于初始化这些属性,确保在创建 AbstractClickHouseInputFormat 实例时,所有必要的信息都已正确设置。

2. AbstractClickHouseInputFormat.Builder

AbstractClickHouseInputFormat.Builder 类同样采用了建造者模式,用于构建 AbstractClickHouseInputFormat 的实例。它提供了一系列的 withXXX 方法,允许用户通过链式调用的方式设置各种配置参数,最后通过 build 方法创建具体的输入格式实例。

public Builder withOptions(ClickHouseReadOptions readOptions) {
    this.readOptions = readOptions;
    return this;
}

public Builder withConnectionProperties(Properties connectionProperties) {
    this.connectionProperties = connectionProperties;
    return this;
}

这些 withXXX 方法通过将传入的参数赋值给 Builder 类的成员变量,并返回 this 指针,实现了链式调用的效果。例如,用户可以这样使用:

AbstractClickHouseInputFormat.Builder builder = new AbstractClickHouseInputFormat.Builder();
builder.withOptions(readOptions).withConnectionProperties(connectionProperties);
public AbstractClickHouseInputFormat build() {
    Preconditions.checkNotNull(readOptions);
    Preconditions.checkNotNull(connectionProperties);
    Preconditions.checkNotNull(fieldNames);
    Preconditions.checkNotNull(fieldTypes);
    Preconditions.checkNotNull(rowDataTypeInfo);

    ClickHouseConnectionProvider connectionProvider = null;
    try {
        connectionProvider =
                new ClickHouseConnectionProvider(readOptions, connectionProperties);
        DistributedEngineFull engineFullSchema =
                getDistributedEngineFull(
                        connectionProvider.getOrCreateConnection(),
                        readOptions.getDatabaseName(),
                        readOptions.getTableName());
        boolean isDistributed = engineFullSchema != null;

        if (isDistributed && readOptions.isUseLocal()) {
            initShardInfo(connectionProvider, engineFullSchema);
            initPartitionInfo();
        } else if (readOptions.getPartitionColumn() != null) {
            initPartitionInfo();
        }

        LogicalType[] logicalTypes =
                Arrays.stream(fieldTypes)
                        .map(DataType::getLogicalType)
                        .toArray(LogicalType[]::new);
        return isDistributed && readOptions.isUseLocal()
                ? createShardInputFormat(logicalTypes, engineFullSchema)
                : createBatchInputFormat(logicalTypes);
    } catch (Exception e) {
        throw new RuntimeException("Build ClickHouse input format failed.", e);
    } finally {
        if (connectionProvider != null) {
            connectionProvider.closeConnections();
        }
    }
}

build 方法中,首先会对必要的参数进行非空检查,确保所有必需的配置都已正确设置。接着,会创建 ClickHouseConnectionProvider 对象,用于管理与 ClickHouse 数据库的连接。然后,尝试获取分布式引擎的完整信息,判断当前表是否为分布式表。如果是分布式表且使用本地表,会初始化分片信息和分区信息;如果指定了分区列,也会初始化分区信息。最后,根据是否为分布式表以及是否使用本地表,选择创建 ClickHouseShardInputFormatClickHouseBatchInputFormat 实例。无论创建过程是否成功,都会关闭 ClickHouseConnectionProvider 以释放连接资源。

3. ClickHouseBatchInputFormat 和 ClickHouseShardInputFormat

ClickHouseBatchInputFormat 用于批量读取数据,它会一次性从 ClickHouse 数据库中读取多条记录,减少了与数据库的交互次数,提高了读取性能。而 ClickHouseShardInputFormat 用于分片读取数据,适用于分布式表。在分布式环境中,数据会被分散存储在多个分片上,ClickHouseShardInputFormat 会根据分片信息从各个分片上并行读取数据,从而提高读取效率。

private AbstractClickHouseInputFormat createShardInputFormat(
        LogicalType[] logicalTypes, DistributedEngineFull engineFullSchema) {
    return new ClickHouseShardInputFormat(
            new ClickHouseConnectionProvider(readOptions, connectionProperties),
            new ClickHouseRowConverter(RowType.of(logicalTypes)),
            readOptions,
            engineFullSchema,
            shardMap,
            shardValues,
            fieldNames,
            rowDataTypeInfo,
            parameterValues,
            parameterClause,
            filterClause,
            limit);
}

private AbstractClickHouseInputFormat createBatchInputFormat(LogicalType[] logicalTypes) {
    return new ClickHouseBatchInputFormat(
            new ClickHouseConnectionProvider(readOptions, connectionProperties),
            new ClickHouseRowConverter(RowType.of(logicalTypes)),
            readOptions,
            fieldNames,
            rowDataTypeInfo,
            parameterValues,
            parameterClause,
            filterClause,
            limit);
}

这两个方法分别用于创建 ClickHouseShardInputFormatClickHouseBatchInputFormat 实例,会传入必要的参数,如连接提供者、行转换器、读取选项、字段名等。行转换器 ClickHouseRowConverter 用于将从 ClickHouse 数据库中读取的原始数据转换为 Flink 可以处理的 RowData 格式。

4. FilterPushDownHelper
public class FilterPushDownHelper {

    private static final Map<FunctionDefinition, SqlClause> FILTERS = new HashMap<>();

    static {
        FILTERS.put(BuiltInFunctionDefinitions.EQUALS, EQ);
        FILTERS.put(BuiltInFunctionDefinitions.NOT_EQUALS, NOT_EQ);
        FILTERS.put(BuiltInFunctionDefinitions.GREATER_THAN, GT);
        FILTERS.put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, GT_EQ);
        FILTERS.put(BuiltInFunctionDefinitions.LESS_THAN, LT);
        FILTERS.put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, LT_EQ);
        FILTERS.put(BuiltInFunctionDefinitions.IS_NULL, IS_NULL);
        FILTERS.put(BuiltInFunctionDefinitions.IS_NOT_NULL, IS_NOT_NULL);
        FILTERS.put(BuiltInFunctionDefinitions.AND, AND);
        FILTERS.put(BuiltInFunctionDefinitions.OR, OR);
    }

    public static String convert(List<ResolvedExpression> filters) {
        int filterSize = filters.size();
        return filters.stream()
                .map(expression -> FilterPushDownHelper.convertExpression(expression, filterSize))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(joining(" AND "));
    }

FilterPushDownHelper 类用于将 Flink 的过滤表达式转换为 ClickHouse 可以理解的 SQL 过滤条件。通过静态初始化块,将 Flink 的内置函数定义映射到相应的 SQL 子句。convert 方法会将多个过滤表达式转换为一个 SQL 过滤条件字符串,从而实现过滤条件的下推。过滤条件下推可以减少从 ClickHouse 数据库中读取的数据量,提高读取效率。

四、读取流程总结
  1. 配置参数:使用 AbstractClickHouseInputFormat.BuilderwithXXX 方法设置读取选项、连接属性、字段信息等参数。这些参数将决定数据读取的范围、格式和方式。
  2. 构建输入格式:调用 build 方法,根据是否为分布式表以及是否使用本地表,选择创建 ClickHouseBatchInputFormatClickHouseShardInputFormat 实例。这个过程中会进行参数检查、连接创建、分片信息和分区信息的初始化等操作。
  3. 数据读取:通过创建的输入格式实例,从 ClickHouse 数据库批量或分片读取数据。在读取过程中,可以使用 FilterPushDownHelper 进行过滤条件的下推,减少不必要的数据传输,提高读取效率。
  4. 资源管理:在读取完成后,关闭 ClickHouseConnectionProvider 以释放连接资源,避免资源泄漏。
五、优化建议
  1. 合理使用过滤条件下推:尽量使用 FilterPushDownHelper 提供的功能,将过滤条件下推到 ClickHouse 数据库端进行处理,减少从数据库中读取的数据量。
  2. 并行读取数据:对于分布式表,可以使用 ClickHouseShardInputFormat 进行分片读取,并行从各个分片上读取数据,提高读取效率。
  3. 调整批量大小:根据实际的业务场景和硬件资源,合理调整批量大小,以平衡读取性能和内存使用。
六、结论

通过对 Flink ClickHouse 连接器数据读取源码的深入分析,我们了解了其核心类和方法的实现细节,以及数据读取的整体流程。这有助于我们在实际应用中更好地配置和优化数据读取过程,提高读取性能和准确性。同时,我们也可以根据具体的业务需求对源码进行扩展和定制,以满足更多复杂的场景。