Flink CDC Hands-On with Scala

Published: 2022-07-24

The official GitHub repository: https://github.com/ververica/flink-cdc-connectors/tree/master
Official documentation: https://ververica.github.io/flink-cdc-connectors/master/

The code in this article uses the MySQL binlog CDC connector; for other databases, refer to the official docs.

1. First add the Maven dependencies and the Scala configuration. This article uses version 2.2.0 of the CDC connector, and binlog logging must be enabled on the MySQL server.

 <properties>
        <flink.version>1.13.5</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <encoding>UTF-8</encoding>
 </properties>

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
 </dependency>
 <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
 </dependency>
  <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.21</version>
  </dependency>
  <dependency>
            <groupId>com.ververica</groupId>
            <!-- add the dependency matching your database -->
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.2.0</version>
  </dependency>
  <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
  </dependency>
 <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
 </dependency>
 
<!-- Note: this plugin is also important; it configures addScalacArgs. If compilation problems appear during testing, open IDEA's Scala compiler options and add -target:jvm-1.8 there. -->
   <build>
   <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <addScalacArgs>-target:jvm-1.8</addScalacArgs>
                </configuration>
            </plugin>
          </plugins>
   </build>
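
Step 1 mentions that binlog must be enabled on the MySQL server. A minimal sketch of the server-side settings the connector relies on (the values, such as server-id = 1, are illustrative; adjust them for your deployment):

```sql
-- In my.cnf under [mysqld] (illustrative values):
--   server-id        = 1
--   log-bin          = mysql-bin
--   binlog_format    = ROW           -- CDC requires row-based binlog
--   binlog_row_image = FULL
-- After restarting MySQL, verify the settings took effect:
SHOW VARIABLES LIKE 'log_bin';        -- expect ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW
```

The connecting user also needs replication privileges (SELECT, REPLICATION SLAVE, REPLICATION CLIENT) on the monitored databases.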

2. With the environment ready, we can write the code. There are two approaches: Usage for DataStream API and Usage for Table/SQL API.

2.1 DataStream API

The imports differ slightly between connector versions.

import com.ververica.cdc.connectors.mysql.source.MySqlSource
import com.ververica.cdc.connectors.mysql.table.StartupOptions
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

object ReadMysqlCDC {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // enable checkpointing
    env.enableCheckpointing(5000)
    env.getCheckpointConfig.setCheckpointTimeout(10000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // limit the number of concurrent checkpoints
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    // retain externalized checkpoints on job cancellation
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // state backend and checkpoint storage location
    env.setStateBackend(new EmbeddedRocksDBStateBackend())
    env.getCheckpointConfig.setCheckpointStorage("file:///D:/code/meiya/flink-def/src/main/resources/checkpoint/cdc")
    // restart strategy
    env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(5), Time.seconds(10)))

    val mySqlSource: MySqlSource[String] = MySqlSource.builder[String]()
      .hostname("hd-03")
      .port(3306)
      .databaseList("db1")
      .tableList("db1.stu")
      .username("root")
      .password("***")
      // JsonDebeziumDeserializationSchema or StringDebeziumDeserializationSchema;
      // the String form is more verbose. You can also implement the
      // DebeziumDeserializationSchema interface yourself to control the output format.
      .deserializer(new JsonDebeziumDeserializationSchema)
      .startupOptions(StartupOptions.initial)
      .build()

    env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks[String], "MySQL CDC Source").print()
    env.execute("Print MySQL Snapshot + Binlog")
  }

}
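
Each record emitted by JsonDebeziumDeserializationSchema is a Debezium-style JSON envelope whose "op" field marks the change type: "c" (insert), "u" (update), "d" (delete), "r" (snapshot read). A minimal sketch of routing events by that field, using a plain regex instead of a JSON library (the object name CdcEventRouter is illustrative):

```scala
// Routes Debezium JSON change events by their "op" field.
// A regex is enough here because "op" is a flat top-level field in the envelope;
// for production code a real JSON parser (e.g. Jackson, which Flink ships) is safer.
object CdcEventRouter {
  private val OpPattern = """"op"\s*:\s*"([cudr])"""".r

  def opOf(event: String): String =
    OpPattern.findFirstMatchIn(event).map(_.group(1)) match {
      case Some("c") => "INSERT"
      case Some("u") => "UPDATE"
      case Some("d") => "DELETE"
      case Some("r") => "SNAPSHOT"
      case _         => "UNKNOWN"
    }
}
```

In the job above, this could be applied with a map before printing, e.g. `.map(e => CdcEventRouter.opOf(e) + " " + e)`, to make the console output easier to scan.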

2.2 Table/SQL API

Note: running this job on Flink 1.13.0, the SQL query did not print incremental changelog updates to the console; after switching to 1.13.5 it works correctly.

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

object ReadMysqlCDC_Sql {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    // table-name is interpreted as a regular expression, so patterns like * or . can match multiple tables
    tableEnv.executeSql(
      """
               CREATE TABLE mysql_binlog (
               id INT NOT NULL,
               name STRING,
               PRIMARY KEY(id) NOT ENFORCED
              ) WITH (
               'connector' = 'mysql-cdc',
               'hostname' = 'hd-03',
               'port' = '3306',
               'username' = 'root',
               'password' = '***',
               'database-name' = 'db1',
               'table-name' = 'stu*',
               'scan.startup.mode' = 'initial',
               'debezium.inconsistent.schema.handling.mode' = 'warn'
              )
        """)

    val table: Table = tableEnv.sqlQuery("select * from mysql_binlog")
    //1. convert the Table to a retract stream and print it
    tableEnv.toRetractStream[Row](table).print()
    //2. or print the table directly
    //    table.execute().print()
    env.execute("cdc-sql")
  }
}
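
The retract stream printed above carries (Boolean, Row) pairs: true marks a row being added, false a row being retracted (an update appears as a retraction followed by an add). A tiny helper to make the console output self-describing (the name RetractPrinter is illustrative):

```scala
// Renders a retract-stream element as "+ row" (add) or "- row" (retract).
object RetractPrinter {
  def formatChange[T](change: (Boolean, T)): String =
    (if (change._1) "+ " else "- ") + change._2.toString
}
```

In the job above it could be used as `tableEnv.toRetractStream[Row](table).map(RetractPrinter.formatChange[Row] _).print()`.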