Flink Table API 编程入门实践
前言
Apache Flink 是目前大数据实时计算领域的明星产品,Flink Table API 则为开发者提供了声明式、类似 SQL 的数据处理能力,兼具 SQL 的易用性与编程 API 的灵活性。本文将带你快速了解 Flink Table API 的基本用法,并通过代码示例帮助你快速上手。
一、环境准备
在 Flink 中,所有 Table API 操作都需要基于 TableEnvironment
。对于流处理场景,我们一般这样创建环境:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
二、数据源定义
Table API 支持多种数据源。最常见的两种方式为:
1. 从 DataStream 创建 Table
DataStream<MyPojo> dataStream = env.fromElements(
new MyPojo("Alice", 12),
new MyPojo("Bob", 10)
);
Table table = tableEnv.fromDataStream(dataStream);
2. 从外部系统注册 Table
比如从 Kafka 注册一张表:
tableEnv.executeSql(
"CREATE TABLE user_orders (" +
" user_id STRING, " +
" order_amount DOUBLE " +
") WITH (" +
" 'connector' = 'kafka', " +
" 'topic' = 'orders', " +
" 'properties.bootstrap.servers' = 'localhost:9092', " +
" 'format' = 'json'" +
")"
);
三、Table API 常见操作
Table API 提供了丰富的数据处理能力,如筛选、聚合、分组、连接等。例如:
import static org.apache.flink.table.api.Expressions.$;
// 筛选和选择字段
Table result = table
.filter($("age").isGreater(10))
.select($("name"), $("age"));
// 分组聚合
Table agg = table
.groupBy($("name"))
.select($("name"), $("age").avg().as("avg_age"));
四、结果输出
将 Table 转换为 DataStream,方便后续处理或输出:
DataStream<Row> resultStream = tableEnv.toDataStream(result);
resultStream.print();
五、与 SQL API 结合
Table API 与 SQL API 可以无缝结合。例如:
Table sqlResult = tableEnv.sqlQuery(
"SELECT name, AVG(age) as avg_age FROM my_table GROUP BY name"
);
六、完整示例
下面是一个完整的 Flink Table API 示例,演示数据流到 Table 的转换、聚合与结果输出:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
public class TableApiDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 创建数据流
DataStream<MyPojo> dataStream = env.fromElements(
new MyPojo("Alice", 12),
new MyPojo("Bob", 10),
new MyPojo("Alice", 15)
);
// 转换为 Table
Table table = tableEnv.fromDataStream(dataStream);
// Table API 查询
Table result = table
.groupBy($("name"))
.select($("name"), $("age").avg().as("avg_age"));
// 输出结果
DataStream<Row> resultStream = tableEnv.toDataStream(result);
resultStream.print();
env.execute();
}
public static class MyPojo {
public String name;
public Integer age;
public MyPojo() {}
public MyPojo(String name, Integer age) {
this.name = name;
this.age = age;
}
}
}
七、常见问题与建议
- 字段名区分大小写,需与数据结构一致。
- Table API 与 SQL API 可混用,灵活应对不同场景。
- 生产环境推荐结合 Catalog 管理元数据。
- Flink 1.14 以后批流统一,建议优先采用流模式开发。
结语
Flink Table API 极大地提升了大数据实时处理的开发效率,结合 SQL 的易用性和 API 的灵活性,非常适合复杂业务场景的数据处理。希望本文能帮你快速入门 Flink Table API,后续还可以深入了解窗口聚合、UDF、自定义 Connector 等高级特性。
如果你在学习和实践中遇到问题,欢迎留言交流!