flink:通过table api把文件中读取的数据写入MySQL

发布于:2024-03-15 ⋅ 阅读:(89) ⋅ 点赞:(0)

当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键:如果定义了主键,则连接器将以 upsert 模式工作;否则连接器将以 append 模式工作。

package cn.edu.tju.demo2;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.*;
import org.apache.flink.types.Row;

public class Test41 {
    // The target table "demo" must already exist in MySQL:
    // create table demo (userId varchar(50) not null,total bigint,avgVal double);
    // Note: no primary key is declared in the sink DDL below, so the legacy JDBC
    // connector runs in append mode (with a PK it would run in upsert mode).
    private static final String FILE_PATH = "info.txt";

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 keeps the demo output deterministic and single-threaded.
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register a CSV-backed source table "input" over the local file.
        // Expected row layout: userId,ts,val (see info.txt sample).
        tableEnv.connect(new FileSystem().path(FILE_PATH))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("userId", DataTypes.VARCHAR(50))
                        .field("ts", DataTypes.INT())
                        .field("val", DataTypes.DOUBLE()))
                .createTemporaryTable("input");

        // Per-user aggregation: row count per user and average of "val".
        Table dataTable = tableEnv.from("input");
        Table aggregateTable = dataTable
                .groupBy("userId")
                .select("userId, userId.count as total, val.avg as avgVal");

        // DDL registering the MySQL sink via the legacy ('connector.*') JDBC properties.
        // BUG FIX: the password value was missing its opening quote
        // ("= 123456'"), which made this DDL unparsable at runtime.
        String sql =
                "create table jdbcOutputTable (" +
                        " userId varchar(50) not null,total bigint,avgVal double " +
                        ") with (" +
                        " 'connector.type' = 'jdbc', " +
                        " 'connector.url' = 'jdbc:mysql://xx.xx.xx.xx:3306/test', " +
                        " 'connector.table' = 'demo', " +
                        " 'connector.driver' = 'com.mysql.jdbc.Driver', " +
                        " 'connector.username' = 'root', " +
                        " 'connector.password' = '123456' )";

        tableEnv.sqlUpdate(sql);

        // Route the aggregated rows into the registered JDBC sink table.
        aggregateTable.insertInto("jdbcOutputTable");

        // Trigger the actual job execution (legacy Table API entry point).
        tableEnv.execute("my job");
    }
}

文件info.txt

user1,1680000890,31.6
user2,1681111900,38.3
user1,1680000890,34.9

网站公告

今日签到

点亮在社区的每一天
去签到