Debezium SchemaNameAdjuster 分析

发布于:2024-12-18 ⋅ 阅读:(48) ⋅ 点赞:(0)

Debezium SchemaNameAdjuster 分析

目录

1. 概述

SchemaNameAdjuster 是 Debezium 中的一个工具类,主要用于确保 Schema 名称符合 Avro 命名规范。在数据库变更事件被转换为 Kafka 消息时,需要为每个表和字段创建相应的 Avro Schema,而这些名称必须符合 Avro 的命名规则。

2. 核心功能

  1. 名称校验

    • 检查 Schema 名称是否符合 Avro 命名规范
    • 验证首字符和非首字符的合法性
  2. 名称调整

    • 将不合法字符替换为合法字符(默认使用下划线’_')
    • 保持名称的语义性和可读性
  3. 冲突处理

    • 检测并处理名称冲突
    • 支持自定义冲突处理策略

3. 实现原理

3.1 核心接口

public interface SchemaNameAdjuster {
   
    /**
     * 调整提议的名称使其符合 Avro 命名规范
     */
    String adjust(String proposedName);
}

3.2 名称验证规则

  1. 首字符规则
public static boolean isValidFullnameFirstCharacter(char c) {
   
    return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || c == '_';
}
  1. 非首字符规则
public static boolean isValidFullnameNonFirstCharacter(char c) {
   
    return c == '.' || isValidFullnameFirstCharacter(c) || (c >= '0' && c <= '9');
}

3.3 调整策略

  1. 默认策略
SchemaNameAdjuster adjuster = SchemaNameAdjuster.create("_", (original, replacement, conflict) -> {
   
    LOGGER.warn("Schema name '{}' is invalid, using '{}' instead", original, replacement);
});
  1. 自定义替换
SchemaNameAdjuster customAdjuster = SchemaNameAdjuster.create(
    c -> c == '-' ? "_" : String.valueOf(c),
    (original, replacement, conflict) -> {
   
        // 自定义冲突处理逻辑
    }
);

4. 应用场景

4.1 表 Schema 构建

在构建数据库表的 Schema 时,需要为 Value Schema 和 Key Schema 生成合法的 Avro 名称:

SchemaBuilder valSchemaBuilder = SchemaBuilder.struct()
    .name(schemaNameAdjuster.adjust(schemaNamePrefix + ".Value"));
SchemaBuilder keySchemaBuilder = SchemaBuilder.struct()
    .name(schemaNameAdjuster.adjust(schemaNamePrefix + ".Key"));

4.2 CloudEvents 格式转换

在将 Debezium 事件转换为 CloudEvents 格式时,需要调整 Schema 名称:

CESchemaBuilder ceSchemaBuilder = defineSchema()
    .withName(schemaNameAdjuster.adjust(maker.ceEnvelopeSchemaName()))

4.3 逻辑表路由

在进行逻辑表路由时,需要为新的目标主题生成合法的 Schema 名称:

valueBuilder.name(schemaNameAdjuster.adjust(newTopicName + ".Value"));

4.4 心跳机制

在配置心跳机制时,需要确保心跳消息的 Schema 名称符合规范:

return new HeartbeatImpl(
    interval,
    topic,
    logicalName,
    schemaNameAdjuster);

4.5 基本用例

  1. 表名转换
String tableName = "my-table";
String adjustedName = adjuster.adjust(tableName);  // 结果: "my_table"
  1. 复杂Schema名称
String complexName = "com.example.my-schema.v2";
String adjusted = adjuster.adjust(complexName);    // 结果: "com.example.my_schema.v2"
  1. 特殊字符处理
String specialChars = "table$name@2.0";
String adjusted = adjuster.adjust(specialChars);   // 结果: "table_name_2.0"

4.6 具体Schema生成示例

让我们以一个具体的表结构为例,展示 SchemaNameAdjuster 如何处理 Schema 名称:

-- 原始表结构
CREATE TABLE inventory.products (
    id INT PRIMARY KEY,
    name VARCHAR(255),
    description TEXT,
    weight DECIMAL(5,

网站公告

今日签到

点亮在社区的每一天
去签到