Flink 2.0 Study Notes: Table API & SQL


Sample code: stevensu1/EC0720

Table API & SQL

The Table API and SQL are Flink's interfaces for unified stream and batch processing. The Table API is a language-integrated query API for Java, Scala, and Python that allows composing queries from relational operators such as selection, filter, and join in a very intuitive way. Flink's SQL support is based on Apache Calcite, which implements the SQL standard. Queries specified in either interface have the same semantics and produce the same result, regardless of whether the input is continuous (streaming: unbounded) or bounded (batch: bounded).
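As a quick sketch (the table "Orders" and its columns are assumed here for illustration; they are not part of this project), the same query can be written with Table API operators or as SQL, with identical semantics on bounded and unbounded input:

import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class TableApiVsSql {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Assumes a table Orders(user STRING, amount INT) is already registered
        Table orders = tableEnv.from("Orders");

        // Table API: compose relational operators (filter, then selection)
        Table viaApi = orders
                .filter($("amount").isGreater(10))
                .select($("user"), $("amount"));

        // SQL: same semantics and same result, streaming or batch
        Table viaSql = tableEnv.sqlQuery(
                "SELECT `user`, amount FROM Orders WHERE amount > 10");

        viaApi.printSchema();
        viaSql.printSchema();
    }
}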

Our goal here is to synchronize MySQL tables and their data.

First, set up the Maven dependencies. For the API itself we only need flink-table-api-java: Overview | Apache Flink

If you run the program in an IDE, you also need three more modules, flink-clients, flink-table-runtime, and flink-table-planner-loader: Overview | Apache Flink

Next comes the MySQL connectivity: JDBC | Apache Flink

JDBC SQL Connector

The JDBC connector allows reading data from and writing data into any relational database with a JDBC driver. This document describes how to set up the JDBC connector to run SQL queries against relational databases.

If a primary key is defined in the DDL, the JDBC sink operates in upsert mode and can exchange UPDATE/DELETE messages with the external system; otherwise it operates in append mode and does not support consuming UPDATE/DELETE messages.
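As a concrete sketch (reusing this article's local test database settings; the sink table name upsert_sink is hypothetical), it is the PRIMARY KEY declaration in the DDL that switches the JDBC sink into upsert mode:

// With the PRIMARY KEY line the sink runs in upsert mode and can consume
// UPDATE/DELETE messages; drop it and the sink runs in append mode.
tableEnv.executeSql(
        "CREATE TABLE upsert_sink (" +
        "  id INT PRIMARY KEY NOT ENFORCED, " +
        "  name STRING" +
        ") WITH (" +
        "  'connector' = 'jdbc', " +
        "  'url' = 'jdbc:mysql://localhost:3306/test', " +
        "  'table-name' = 'upsert_sink', " +
        "  'username' = 'root', " +
        "  'password' = 'root'" +
        ")");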

Then add the corresponding Maven dependencies one by one: flink-connector-jdbc-core, mysql-connector-java, and flink-connector-jdbc-mysql.
With that, all required dependencies are in place. In practice, though, the program usually needs to be packaged and uploaded to the Flink server through the web UI; the Flink server then discovers and runs our jar via the Java SPI mechanism. Java SPI was covered in the earlier article "About Red Hat Single Sign-On's User Storage SPI".
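Concretely, Flink discovers table and connector factories through SPI descriptor files on the classpath, for example:

# Table/connector factories are listed in SPI descriptor files such as:
META-INF/services/org.apache.flink.table.factories.Factory

Each dependency ships its own copy of this file listing its factory classes, so the fat jar must merge them rather than letting one copy overwrite the others; that is exactly what the ServicesResourceTransformer in the POM below does.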

The plugin configuration is documented on the official site:

First Steps | Apache Flink. We therefore need to add the officially provided Maven packaging plugin: How to use Maven | Apache Flink

The complete POM is as follows:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>FLINKTAS-TEST-Catalog</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>FLINKTAS-TEST-Catalog</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>2.0.0</flink.version>
    </properties>

    <dependencies>
        <!--flink-clients,flink-table-runtime,flink-table-planner-loader- -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--flink-table-api-java    -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!--flink-connector-jdbc-core,mysql-connector-java,flink-connector-jdbc-mysql		-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.28</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc-mysql</artifactId>
            <version>4.0.0-2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc-core</artifactId>
            <version>4.0.0-2.0</version>
        </dependency>


        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Replace this with the main class of your job -->
                                    <mainClass>org.example.App</mainClass>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>15</source>
                    <target>15</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Now let's implement the Java processing flow.

First, a word about Catalogs: a catalog can register an entire database into the TableEnvironment in one step, as sketched below.

Catalogs | Apache Flink
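In sketch form (tableEnv and catalog are the objects constructed in the full example further down), registering and selecting a catalog makes every table of the MySQL database queryable without any per-table DDL:

// Register the catalog and make it the session's current catalog; all
// tables found in the MySQL metadata become visible to SQL immediately
tableEnv.registerCatalog("my_catalog", catalog);
tableEnv.useCatalog("my_catalog");
tableEnv.executeSql("SHOW TABLES").print(); // lists the tables read from MySQL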

The flink-connector-jdbc-mysql module already ships a catalog implementation for MySQL, MySqlCatalog, but it cannot create physical tables, so we extend it with the corresponding table-creation logic.

Here is my implementation:

package org.example;


import org.apache.flink.connector.jdbc.mysql.database.catalog.MySqlCatalog;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.*;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.types.DataType;


import java.sql.*;
import java.util.*;

public class MyMySqlCatalog extends MySqlCatalog {


    public MyMySqlCatalog(ClassLoader userClassLoader, String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
        super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
    }

    public MyMySqlCatalog(ClassLoader userClassLoader, String catalogName, String defaultDatabase, String baseUrl, Properties connectionProperties) {
        super(userClassLoader, catalogName, defaultDatabase, baseUrl, connectionProperties);
    }

    @Override
    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {

        // Check that the target database exists
        if (!databaseExists(tablePath.getDatabaseName())) {
            throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
        }

        // If the table already exists, skip silently when ignoreIfExists is set;
        // otherwise fail fast
        if (tableExists(tablePath)) {
            if (ignoreIfExists) {
                return;
            }
            throw new TableAlreadyExistException(getName(), tablePath);
        }
        Connection conn = null;
        try {
            conn = DriverManager.getConnection(baseUrl + tablePath.getDatabaseName(), this.getUsername(), this.getPassword());

            String createTableSql = generateCreateTableSql(tablePath.getObjectName(), table);

            try (PreparedStatement stmt = conn.prepareStatement(createTableSql)) {
                stmt.execute();
            }

        } catch (SQLException e) {
            throw new CatalogException(
                    String.format("Failed to create table %s", tablePath.getFullName()), e);
        } finally {
            try {
                if (conn != null) {
                    conn.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    private String generateCreateTableSql(String tableName, CatalogBaseTable table) {
        StringBuilder sql = new StringBuilder();
        sql.append("CREATE TABLE IF NOT EXISTS `").append(tableName).append("` (");

        // Build the column definitions from the unresolved schema
        Schema schema = table.getUnresolvedSchema();
        List<String> columnDefs = new ArrayList<>();

        for (Schema.UnresolvedColumn column : schema.getColumns()) {
            if (column instanceof Schema.UnresolvedPhysicalColumn) {
                Schema.UnresolvedPhysicalColumn physicalColumn =
                        (Schema.UnresolvedPhysicalColumn) column;
                String columnDef = String.format("`%s` %s",
                        physicalColumn.getName(),
                        convertFlinkTypeToMySql((DataType) physicalColumn.getDataType()));
                columnDefs.add(columnDef);
            }
        }

        // Append a PRIMARY KEY clause when the schema defines one, so the
        // physical MySQL table carries the same key as the Flink schema
        schema.getPrimaryKey().ifPresent(pk -> {
            List<String> pkCols = new ArrayList<>();
            for (String col : pk.getColumnNames()) {
                pkCols.add("`" + col + "`");
            }
            columnDefs.add("PRIMARY KEY (" + String.join(", ", pkCols) + ")");
        });

        sql.append(String.join(", ", columnDefs));
        sql.append(")");

        return sql.toString();
    }

    private String convertFlinkTypeToMySql(DataType dataType) {
        // Simplified type conversion; extend it as needed
        String typeName = dataType.getLogicalType().getTypeRoot().name();
        switch (typeName) {
            case "INTEGER":
                return "INT";
            case "VARCHAR":
                return "VARCHAR(255)";
            case "BIGINT":
                return "BIGINT";
            case "DOUBLE":
                return "DOUBLE";
            case "BOOLEAN":
                return "BOOLEAN";
            case "TIMESTAMP_WITHOUT_TIME_ZONE":
                return "TIMESTAMP";
            default:
                return "TEXT";
        }
    }
}

Finally, here is the code that performs the data synchronization:

package org.example;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import java.util.List;

import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.api.DataTypes.STRING;

/**
 * Syncs my_table_01 into my_table_03 through a custom MySQL catalog.
 */
public class App {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name = "my_catalog";
        String defaultDatabase = "test";
        String username = "root";
        String password = "root";
        String baseUrl = "jdbc:mysql://localhost:3306";

        MyMySqlCatalog catalog = new MyMySqlCatalog(
                ClassLoader.getSystemClassLoader(),
                name,
                defaultDatabase,
                username,
                password,
                baseUrl

        );
        tableEnv.registerCatalog("my_catalog", catalog);

        // set the JdbcCatalog as the current catalog of the session
        tableEnv.useCatalog("my_catalog");
        // Verify the catalog connection: list the tables and probe for the target
        List<String> tables = catalog.listTables(defaultDatabase);
        System.out.println("Tables in " + defaultDatabase + ": " + tables);
        boolean exists = catalog.tableExists(ObjectPath.fromString("test.my_table_03"));
        // Create the table if it does not exist yet
        if (!exists) {
            // Define the table schema: id (primary key) and name
            Schema.Builder schemaBuilder = Schema.newBuilder();
            schemaBuilder.column("id", INT().notNull());
            schemaBuilder.column("name", STRING());
            schemaBuilder.primaryKey("id");
            Schema schema = schemaBuilder.build();
            CatalogTable catalogTable = CatalogTable.newBuilder()
                    .schema(schema)
                    .build();

            catalog.createTable(ObjectPath.fromString("test.my_table_03"), catalogTable, true);
        }
        }

        tableEnv.executeSql("SELECT * FROM my_table_01").print();
        tableEnv.executeSql("SELECT * FROM my_table_03").print();
        // 执行同步
        tableEnv.executeSql("INSERT INTO my_table_03 SELECT id, name FROM my_table_01");
        System.out.println("Hello World!");
    }
}

Execution result: (screenshot omitted)

However, if the database has many tables, registering a whole catalog can consume a lot of Flink memory, so you can instead register only the tables you need into the table environment:

package org.example;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;


/**
 * Syncs my_table_01 into my_table_02 using per-table registration.
 */
public class App {
    public static void main(String[] args) throws Exception {

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();

        TableEnvironment tableEnv = TableEnvironment.create(settings);


        registerMySqlTable(tableEnv); // my_table_01
        Table table1 = tableEnv.from("my_table_01");
        table1.printSchema();
        tableEnv.executeSql("SELECT * FROM my_table_01").print();


        registerMySqlTable02(tableEnv); // my_table_02
        Table table2 = tableEnv.from("my_table_02");
        table2.printSchema();
        tableEnv.executeSql("SELECT * FROM my_table_02").print();


        // Run the synchronization and wait for it to finish
        tableEnv.executeSql("INSERT INTO my_table_02 SELECT id, name FROM my_table_01").await();
        System.out.println("Sync to my_table_02 finished.");
    }

    /**
     * Registers MySQL table my_table_02 in the Flink table environment.
     */
    public static void registerMySqlTable02(TableEnvironment tableEnv) {
        tableEnv.executeSql(
                "CREATE TABLE my_table_02 (" +
                        "id INT PRIMARY KEY NOT ENFORCED, " +
                        "name STRING" +
                        ") WITH (" +
                        "'connector' = 'jdbc', " +
                        "'url' = 'jdbc:mysql://localhost:3306/test', " +
                        "'table-name' = 'my_table_02', " +
                        "'username' = 'root', " +
                        "'password' = 'root'" +
                        ")"
        );
    }

    /**
     * Registers MySQL table my_table_01 in the Flink table environment.
     */
    public static void registerMySqlTable(TableEnvironment tableEnv) {
        tableEnv.executeSql(
                "CREATE TABLE my_table_01 (" +
                        "id INT PRIMARY KEY NOT ENFORCED," +
                        "name STRING" +
                        ") WITH (" +
                        "'connector' = 'jdbc'," +
                        "'url' = 'jdbc:mysql://localhost:3306/test'," +
                        "'table-name' = 'my_table_01'," +
                        "'username' = 'root'," +
                        "'password' = 'root'" +
                        ")"
        );
    }
}

This approach achieves the same data synchronization. As a final optimization, consider using a JDBC connection pool.
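A minimal sketch of that idea, assuming HikariCP is added as an extra dependency (the class below is illustrative, not part of the project above): the DriverManager.getConnection call in MyMySqlCatalog could then draw connections from a shared pool instead of opening a new one per operation.

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

import java.sql.Connection;
import java.sql.SQLException;

public final class CatalogConnectionPool {
    private static final HikariDataSource POOL;

    static {
        HikariConfig cfg = new HikariConfig();
        cfg.setJdbcUrl("jdbc:mysql://localhost:3306/test"); // the article's local test DB
        cfg.setUsername("root");
        cfg.setPassword("root");
        cfg.setMaximumPoolSize(5); // a small pool suffices for catalog metadata calls
        POOL = new HikariDataSource(cfg);
    }

    private CatalogConnectionPool() {
    }

    public static Connection getConnection() throws SQLException {
        return POOL.getConnection();
    }
}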

