【无标题】

发布于:2025-08-03 ⋅ 阅读:(14) ⋅ 点赞:(0)

Flink 跨 Catalog 读写(kafka写hive)生产实践

背景

  • Flink Table/SQL 任务开发中,常见多种数据源混合使用,如 Kafka、Hive 等。
  • 生产要求:Kafka 等 Source 表不要注册到 Hive Metastore,Hive Sink 表必须在 HiveCatalog/Hive Metastore 中注册,以保证数据治理、权限、血缘等规范。

常见问题

  • 误操作:在 HiveCatalog 下创建了 Kafka Source 表,导致该表结构同步进 Hive Metastore,Hive CLI 能查到“假表”,影响治理。
  • 跨 Catalog 查询:切换 Catalog 后直接用表名引用其它 Catalog 下的表,会报 “Object not found” 错误。

方法:全限定名写法(生产环境标准)

步骤

  1. 注册 HiveCatalog,但不切换

    tenv.registerCatalog("myhive", hive);
    
  2. 在默认 Catalog 下创建 Kafka 源表

    tenv.useCatalog("default_catalog"); 
    tenv.executeSql("CREATE TABLE kafka_source (...) WITH ('connector'='kafka', ...)");
    ---------------------
    比如:
          tenv.useCatalog("default_catalog"); 
          tenv.executeSql(
                "CREATE TABLE IF NOT EXISTS kafka_source ( " +
                        " id STRING, " +
                        " name STRING, " +
                        " age INT " +
                        ") WITH ( " +
                        " 'connector' = 'kafka', " +
                        " 'topic' = 'flinktest1', " +
                        " 'properties.bootstrap.servers' = '192.168.77.88:9092', " +
                        " 'properties.group.id' = 'flink-group', " +
                        " 'scan.startup.mode' = 'earliest-offset', " +
                        " 'format' = 'json', " +
                        " 'json.fail-on-missing-field' = 'false', " +
                        " 'json.ignore-parse-errors' = 'true' " +
                        ")"
        );
    
    


```

  1. 切换到 HiveCatalog,创建 Sink 表

    tenv.useCatalog("myhive");
    tenv.executeSql("CREATE TABLE hive_sink (...) WITH ('connector'='hive', ...)");
    -----------------------
    比如:
        tenv.useCatalog(name);
        tenv.executeSql(
                "CREATE TABLE IF NOT EXISTS hive_sink ( " +
                        " id STRING, " +
                        " name STRING, " +
                        " age INT " +
                        ") WITH ( " +
                        " 'connector' = 'hive', " +
                        " 'table-name' = 'hive_sink' " +
                        ")"
        );
    
    
    
  2. insert 时,使用全限定名引用 Source 表

    tenv.executeSql("INSERT INTO hive_sink SELECT * FROM default_catalog.default_database.kafka_source");
    
    

关键点

  • Kafka Source 表只在 Flink 的 default_catalog 下注册,不写入 Hive Metastore。
  • Hive Sink 表只在 HiveCatalog 下注册,Hive CLI 能查到。
  • 跨 Catalog SQL 必须用全限定名 catalog.database.table。

优点

  • 保证 Hive Metastore 只含有真实的 Hive 表,血缘、权限清晰。
  • 避免非 Hive connector 表污染 HiveCatalog。
  • 生产安全,结构清晰。

最佳实践建议

  • Kafka/HBase/Print 等 Source 表,严禁在 HiveCatalog 下创建!
  • 跨 Catalog 访问 Source,SQL 必须用全限定名 catalog.database.table
  • 团队协作时将此写法纳入开发规范。

示意图

[default_catalog]      [HiveCatalog(myhive)]
    |   kafka_source          |   hive_sink
    |   (仅 Flink 可见)         |   (Hive 可见)
    |                        |
           insert ... select default_catalog.default_database.kafka_source


网站公告

今日签到

点亮在社区的每一天
去签到