Official GitHub repository: https://github.com/ververica/flink-cdc-connectors/tree/master
Official documentation: https://ververica.github.io/flink-cdc-connectors/master/
This project uses the MySQL binlog-based CDC connector; for other databases, refer to the official documentation.
1. First add the Maven dependencies and the Scala build configuration. This example uses flink-connector-mysql-cdc 2.2.0 and requires binlog to be enabled on the MySQL server (see the sketch after the build section below).
<properties>
<flink.version>1.13.5</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<encoding>UTF-8</encoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.21</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<!-- add the dependency matching your database -->
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.12</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<!-- Note: this plugin is also important. It configures addScalacArgs; if compilation fails during testing, change the Scala compiler options in IDEA and add -target:jvm-1.8 -->
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<addScalacArgs>-target:jvm-1.8</addScalacArgs>
</configuration>
</plugin>
</plugins>
</build>
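Step 1 also requires binlog to be enabled on the MySQL server. A minimal sketch of the my.cnf settings, assuming a self-managed MySQL instance (the server-id value is just an example and only needs to be unique in the replication topology):

[mysqld]
# unique ID for this server within the replication topology (example value)
server-id = 1
# enable binary logging; CDC needs ROW format with full row images
log-bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL

After restarting MySQL, SHOW VARIABLES LIKE 'log_bin'; should return ON.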
2. With the environment ready, write the code. There are two usages: the DataStream API and the Table/SQL API.
2.1 DataStream API
Note that the package imports differ between connector versions.
import com.ververica.cdc.connectors.mysql.source.MySqlSource
import com.ververica.cdc.connectors.mysql.table.StartupOptions
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
object ReadMysqlCDC {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// enable checkpointing (every 5000 ms)
env.enableCheckpointing(5000)
env.getCheckpointConfig.setCheckpointTimeout(10000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// allow at most one concurrent checkpoint
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// retain externalized checkpoints when the job is cancelled
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
// state backend and checkpoint storage location
env.setStateBackend(new EmbeddedRocksDBStateBackend())
env.getCheckpointConfig.setCheckpointStorage("file:///D:/code/meiya/flink-def/src/main/resources/checkpoint/cdc")
// restart strategy: at most 3 failures within 5 minutes, 10 s delay between restarts
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(5), Time.seconds(10)))
val mySqlSource: MySqlSource[String] = MySqlSource.builder[String]()
.hostname("hd-03")
.port(3306)
.databaseList("db1")
.tableList("db1.stu")
.username("root")
.password("***")
// Either JsonDebeziumDeserializationSchema or StringDebeziumDeserializationSchema can be used here;
// the String form is more verbose. You can also implement the DebeziumDeserializationSchema
// interface yourself to control the output format (see the sketch after this object).
.deserializer(new JsonDebeziumDeserializationSchema)
.startupOptions(StartupOptions.initial)
.build()
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks[String], "aaa").print()
env.execute("Print MySQL Snapshot + Binlog")
}
}
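As mentioned in the comment above, you can plug in your own deserializer. A minimal sketch of a custom DebeziumDeserializationSchema, assuming the flink-cdc 2.2.0 interface; the class name and the emitted string format are hypothetical:

import com.ververica.cdc.debezium.DebeziumDeserializationSchema
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.util.Collector
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.source.SourceRecord

// Hypothetical example: extract the operation type and row images from the
// Debezium change-event envelope and emit them as a plain string.
class SimpleStringDeserializationSchema extends DebeziumDeserializationSchema[String] {
  override def deserialize(record: SourceRecord, out: Collector[String]): Unit = {
    if (record.value() == null) return // skip tombstone records for brevity
    val value = record.value().asInstanceOf[Struct]
    val op = value.getString("op")         // c = insert, u = update, d = delete, r = snapshot read
    val before = value.getStruct("before") // row image before the change (null for inserts)
    val after = value.getStruct("after")   // row image after the change (null for deletes)
    out.collect(s"op=$op, before=$before, after=$after")
  }
  override def getProducedType: TypeInformation[String] = BasicTypeInfo.STRING_TYPE_INFO
}

Passing an instance via .deserializer(new SimpleStringDeserializationSchema) in the builder above would replace the JSON output with this custom format.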
2.2 Table/SQL API
This was originally run on Flink 1.13.0, where the SQL job did not print incremental changes to the console; switching to 1.13.5 made it work correctly.
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row
object ReadMysqlCDC_Sql {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
//table-name is a regular expression, so patterns such as * and . can match multiple tables
tableEnv.executeSql(
"""
CREATE TABLE mysql_binlog (
id INT NOT NULL,
name STRING,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'hd-03',
'port' = '3306',
'username' = 'root',
'password' = '***',
'database-name' = 'db1',
'table-name' = 'stu*',
'scan.startup.mode' = 'initial',
'debezium.inconsistent.schema.handling.mode' = 'warn'
)
""")
val table: Table = tableEnv.sqlQuery("select * from mysql_binlog")
//1. Convert the table to a retract stream and print it:
//   each record is a (Boolean, Row) pair where false marks a retraction
tableEnv.toRetractStream[Row](table).print()
//2. Or print the table directly (env.execute() is not needed in that case)
// table.execute().print()
env.execute("cdc-sql")
}
}
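Besides the two options above, Flink 1.13 also provides toChangelogStream, which keeps the change kind (+I, -U, +U, -D) on each Row instead of encoding retractions as a Boolean flag. A minimal sketch, assuming it replaces the toRetractStream line:

// each printed Row carries its RowKind (+I, -U, +U, -D)
tableEnv.toChangelogStream(table).print()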