Flink 跨 Catalog 读写(kafka写hive)生产实践
背景
- Flink Table/SQL 任务开发中,常见多种数据源混合使用,如 Kafka、Hive 等。
- 生产要求:Kafka 等 Source 表不要注册到 Hive Metastore,Hive Sink 表必须在 HiveCatalog/Hive Metastore 中注册,以保证数据治理、权限、血缘等规范。
常见问题
- 误操作:在 HiveCatalog 下创建了 Kafka Source 表,导致该表结构同步进 Hive Metastore,Hive CLI 能查到“假表”,影响治理。
- 跨 Catalog 查询:切换 Catalog 后直接用表名引用其它 Catalog 下的表,会报 “Object not found” 错误。
方法:全限定名写法(生产环境标准)
步骤
注册 HiveCatalog,但不切换
tenv.registerCatalog("myhive", hive);
在默认 Catalog 下创建 Kafka 源表
tenv.useCatalog("default_catalog"); tenv.executeSql("CREATE TABLE kafka_source (...) WITH ('connector'='kafka', ...)"); --------------------- 比如: tenv.useCatalog("default_catalog"); tenv.executeSql( "CREATE TABLE IF NOT EXISTS kafka_source ( " + " id STRING, " + " name STRING, " + " age INT " + ") WITH ( " + " 'connector' = 'kafka', " + " 'topic' = 'flinktest1', " + " 'properties.bootstrap.servers' = '192.168.77.88:9092', " + " 'properties.group.id' = 'flink-group', " + " 'scan.startup.mode' = 'earliest-offset', " + " 'format' = 'json', " + " 'json.fail-on-missing-field' = 'false', " + " 'json.ignore-parse-errors' = 'true' " + ")" );
、
```
切换到 HiveCatalog,创建 Sink 表
tenv.useCatalog("myhive"); tenv.executeSql("CREATE TABLE hive_sink (...) WITH ('connector'='hive', ...)"); ----------------------- 比如: tenv.useCatalog(name); tenv.executeSql( "CREATE TABLE IF NOT EXISTS hive_sink ( " + " id STRING, " + " name STRING, " + " age INT " + ") WITH ( " + " 'connector' = 'hive', " + " 'table-name' = 'hive_sink' " + ")" );
insert 时,使用全限定名引用 Source 表
tenv.executeSql("INSERT INTO hive_sink SELECT * FROM default_catalog.default_database.kafka_source");
关键点
- Kafka Source 表只在 Flink 的 default_catalog 下注册,不写入 Hive Metastore。
- Hive Sink 表只在 HiveCatalog 下注册,Hive CLI 能查到。
- 跨 Catalog SQL 必须用全限定名 catalog.database.table。
优点
- 保证 Hive Metastore 只含有真实的 Hive 表,血缘、权限清晰。
- 避免非 Hive connector 表污染 HiveCatalog。
- 生产安全,结构清晰。
最佳实践建议
- Kafka/HBase/Print 等 Source 表,严禁在 HiveCatalog 下创建!
- 跨 Catalog 访问 Source,SQL 必须用全限定名
catalog.database.table
。 - 团队协作时将此写法纳入开发规范。
示意图
[default_catalog] [HiveCatalog(myhive)]
| kafka_source | hive_sink
| (仅 Flink 可见) | (Hive 可见)
| |
insert ... select default_catalog.default_database.kafka_source