Official documentation
Versions
- Flink 1.15.3
- CDC 2.3.0
- Oracle 11g and 12c (the official documentation says 19c is also supported; not tested here)
JAR packages
- 2.1-3.0 : https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/
- 3.1+ : https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-oracle-cdc/
Oracle installation
Installing Oracle 11g with Docker
Installing Oracle 12c with Docker
Enabling archive log mode
-- 1. Shut down the database
SHUTDOWN IMMEDIATE;
-- 2. Start the database in MOUNT state
STARTUP MOUNT;
-- 3. Enable archive log mode
ALTER DATABASE ARCHIVELOG;
-- 4. Open the database
ALTER DATABASE OPEN;
-- 5. Verify the archive status
ARCHIVE LOG LIST;
SP2-0718: illegal ARCHIVE LOG option
SQL> archive log list;
Database log mode Archive Mode
Automatic archival Enabled
Archive destination USE_DB_RECOVERY_FILE_DEST
Oldest online log sequence 20
Next log sequence to archive 22
Current log sequence 22
-- 6. Alternatively, query log_mode directly; the default is NOARCHIVELOG
select log_mode from v$database;
ARCHIVELOG
-- 7. Enable minimal supplemental logging
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-- 8. Enable supplemental logging for all columns
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
Creating the tablespace and user, and granting privileges
These statements follow the official documentation.
CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/xe/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT ANALYZE ANY TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
-- the next grant is not needed when scan.incremental.snapshot.enabled=true (the default)
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
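After running the grants, it can help to confirm what the user actually received. The queries below are a minimal sketch, assuming they are run by a DBA account and that the user name is stored in uppercase as FLINKUSER; they only read standard data dictionary views.

```sql
-- System privileges granted directly to the CDC user
-- (for example CREATE SESSION, FLASHBACK ANY TABLE).
SELECT privilege
FROM dba_sys_privs
WHERE grantee = 'FLINKUSER'
ORDER BY privilege;

-- Roles granted to the CDC user
-- (for example SELECT_CATALOG_ROLE, EXECUTE_CATALOG_ROLE).
SELECT granted_role
FROM dba_role_privs
WHERE grantee = 'FLINKUSER'
ORDER BY granted_role;

-- Object privileges, such as SELECT on the V_$ views
-- and EXECUTE on DBMS_LOGMNR / DBMS_LOGMNR_D.
SELECT table_name, privilege
FROM dba_tab_privs
WHERE grantee = 'FLINKUSER'
ORDER BY table_name, privilege;
```

If a grant from the list above is missing from the output, re-run that statement before starting the Flink job.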
Creating the test tables
Source
CREATE TABLE FLINKUSER.CDC_SOURCE (
ID INTEGER NOT NULL,
NAME VARCHAR2(100),
CONSTRAINT CDC_SOURCE_PK PRIMARY KEY (ID)
) ;
INSERT INTO FLINKUSER.CDC_SOURCE (ID, NAME )VALUES(1, '1');
INSERT INTO FLINKUSER.CDC_SOURCE (ID, NAME )VALUES(2, '2');
……
Sink
CREATE TABLE FLINKUSER.CDC_SINK (
ID INTEGER NOT NULL,
NAME VARCHAR2(100),
CONSTRAINT CDC_SINK_PK PRIMARY KEY (ID)
) ;
CDC Oracle2Oracle
set yarn.application.name=cdc_oracle2oracle;
set parallelism.default=1;
set taskmanager.memory.process.size=3g;
set taskmanager.numberOfTaskSlots=1;
set execution.checkpointing.interval=1000;
set state.checkpoints.dir=hdfs:///flink/checkpoints/cdc_oracle2oracle;
set execution.target=yarn-per-job;
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;
CREATE TABLE oracle_cdc_source (
ID int PRIMARY KEY NOT ENFORCED,
NAME string
) WITH (
'connector' = 'oracle-cdc',
'url' = 'jdbc:oracle:thin:@192.168.44.128:1522:XE',
'hostname' = '192.168.44.128',
'port' = '1522',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'XE',
'schema-name' = 'FLINKUSER',
'table-name' = 'CDC_SOURCE',
'debezium.log.mining.strategy' = 'online_catalog',
'debezium.log.mining.continuous.mine' = 'true'
);
create table oracle_cdc_sink (
ID int PRIMARY KEY NOT ENFORCED,
NAME string
) with (
'connector' = 'jdbc',
'url' = 'jdbc:oracle:thin:@192.168.44.128:1522:XE',
'username' = 'flinkuser',
'password' = 'flinkpw',
'table-name' = 'FLINKUSER.CDC_SINK',
'sink.buffer-flush.max-rows' = '1000000'
);
insert into oracle_cdc_sink select * from oracle_cdc_source;
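Notes:
- In the Oracle CDC SQL, the column names, database-name, schema-name, and table-name must all be uppercase; otherwise problems occur.
- Both the source and sink tables need a primary key; otherwise the row counts do not match and the difference is large. There seems to be an option that supports tables without a primary key, but I have not gotten it to work yet.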
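To check the primary-key requirement and whether the row counts line up, queries along the following lines can be run on the Oracle side. This is a sketch using the test tables created above; it assumes both tables live in the same database and that the connected user can read them.

```sql
-- Confirm that both tables have a primary key (constraint_type 'P').
SELECT table_name, constraint_name
FROM all_constraints
WHERE owner = 'FLINKUSER'
  AND constraint_type = 'P'
  AND table_name IN ('CDC_SOURCE', 'CDC_SINK');

-- Compare the row counts of the source and sink tables.
SELECT (SELECT COUNT(*) FROM FLINKUSER.CDC_SOURCE) AS source_rows,
       (SELECT COUNT(*) FROM FLINKUSER.CDC_SINK)   AS sink_rows
FROM dual;

-- List rows that exist in the source but not (yet) in the sink.
SELECT ID, NAME FROM FLINKUSER.CDC_SOURCE
MINUS
SELECT ID, NAME FROM FLINKUSER.CDC_SINK;
```

While the job is still catching up, the last query may return rows that simply have not been written yet, so it is most meaningful once the source is idle.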
Exceptions
Exception 1
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: oracle_cdc_source[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> Sink: oracle_cdc_sink[3]' (operator cbc357ccb763df2852fee8c4fc7d55f2).
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556)
at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:231)
at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:316)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:201)
at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.resetAndStart(RecreateOnResetOperatorCoordinator.java:394)
at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$6(RecreateOnResetOperatorCoordinator.java:144)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.lambda$closeAsyncWithTimeout$0(ComponentClosingUtils.java:77)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to discover captured tables for enumerator
at com.ververica.cdc.connectors.base.source.IncrementalSource.createEnumerator(IncrementalSource.java:151)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:197)
... 8 more
Caused by: java.lang.RuntimeException: Failed to resolve Oracle database version
at io.debezium.connector.oracle.OracleConnection.resolveOracleDatabaseVersion(OracleConnection.java:159)
at io.debezium.connector.oracle.OracleConnection.<init>(OracleConnection.java:71)
at com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.createOracleConnection(OracleConnectionUtils.java:54)
at com.ververica.cdc.connectors.oracle.source.OracleDialect.openJdbcConnection(OracleDialect.java:90)
at com.ververica.cdc.connectors.oracle.source.OracleDialect.discoverDataCollections(OracleDialect.java:107)
at com.ververica.cdc.connectors.oracle.source.OracleDialect.discoverDataCollections(OracleDialect.java:51)
at com.ververica.cdc.connectors.base.source.IncrementalSource.createEnumerator(IncrementalSource.java:139)
... 9 more
Caused by: java.sql.SQLException: No suitable driver found for jdbc:oracle:thin:@192.168.44.128:1522:XE
# If the url option is not set:
# Caused by: java.sql.SQLException: No suitable driver found for jdbc:oracle:thin:@192.168.44.128:1522/XE
at java.sql.DriverManager.getConnection(DriverManager.java:689)
at java.sql.DriverManager.getConnection(DriverManager.java:208)
at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$0(JdbcConnection.java:184)
at io.debezium.jdbc.JdbcConnection$ConnectionFactoryDecorator.connect(JdbcConnection.java:121)
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:890)
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:885)
at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:643)
at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:517)
at io.debezium.connector.oracle.OracleConnection.resolveOracleDatabaseVersion(OracleConnection.java:129)
... 15 more
- Cause: a JAR conflict with calcite-core-1.10.0.jar.
- Fix: remove that JAR from Flink's lib directory.
Exception 2
Caused by: java.sql.SQLException: Invalid column type
at oracle.jdbc.driver.Redirector$2.redirect(Redirector.java:261)
at oracle.jdbc.driver.Representation.getObject(Representation.java:423)
at oracle.jdbc.driver.Accessor.getObject(Accessor.java:986)
at oracle.jdbc.driver.OracleStatement.getObject(OracleStatement.java:6521)
at oracle.jdbc.driver.InsensitiveScrollableResultSet.getObject(InsensitiveScrollableResultSet.java:909)
at io.debezium.connector.oracle.logminer.LogMinerHelper.lambda$getSystime$0(LogMinerHelper.java:207)
at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:649)
at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:517)
at io.debezium.connector.oracle.logminer.LogMinerHelper.getSystime(LogMinerHelper.java:205)
at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:202)
... 8 more
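Cause: a JAR conflict with ojdbc7.jar.
This exception occurs when the source table contains only one row.
Flink's lib directory does not contain this JAR, but the job log shows that it was loaded, so check whether the JDK directory has a copy: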
find /usr/lib/jvm -name "ojdbc*"
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/jre/lib/ext/ojdbc7.jar
Moving the JAR out of that directory resolves the problem:
mv /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/jre/lib/ext/ojdbc7.jar ~/
Exception 3
2025-06-23 09:26:52
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: oracle_cdc_source[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> Sink: oracle_cdc_sink[3]' (operator cbc357ccb763df2852fee8c4fc7d55f2).
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556)
at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:231)
at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:316)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$9(SourceCoordinator.java:420)
at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalAccessError: tried to access method oracle.sql.Datum.compareBytes([B[B)I from class com.ververica.cdc.connectors.oracle.source.assigner.splitter.OracleChunkSplitter
at com.ververica.cdc.connectors.oracle.source.assigner.splitter.OracleChunkSplitter.isChunkEndGeMax(OracleChunkSplitter.java:283)
at com.ververica.cdc.connectors.oracle.source.assigner.splitter.OracleChunkSplitter.nextChunkEnd(OracleChunkSplitter.java:307)
at com.ververica.cdc.connectors.oracle.source.assigner.splitter.OracleChunkSplitter.splitUnevenlySizedChunks(OracleChunkSplitter.java:249)
at com.ververica.cdc.connectors.oracle.source.assigner.splitter.OracleChunkSplitter.splitTableIntoChunks(OracleChunkSplitter.java:181)
at com.ververica.cdc.connectors.oracle.source.assigner.splitter.OracleChunkSplitter.generateSplits(OracleChunkSplitter.java:80)
at com.ververica.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.getNext(SnapshotSplitAssigner.java:178)
at com.ververica.cdc.connectors.base.source.assigner.HybridSplitAssigner.getNext(HybridSplitAssigner.java:129)
at com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator.assignSplits(IncrementalSourceEnumerator.java:166)
at com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator.handleSplitRequest(IncrementalSourceEnumerator.java:97)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$2(SourceCoordinator.java:230)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$9(SourceCoordinator.java:406)
... 8 more
Same cause as Exception 2, a JAR conflict with ojdbc7.jar,
but this one occurs when the source table contains two or more rows.
Exception 4
Caused by: io.debezium.DebeziumException: Supplemental logging not properly configured. Use: ALTER DATABASE ADD SUPPLEMENTAL LOG DATA
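Minimal supplemental logging is not enabled. Enable it:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
The following query shows whether it is enabled:
SELECT
SUPPLEMENTAL_LOG_DATA_MIN, -- minimal supplemental logging
SUPPLEMENTAL_LOG_DATA_PK, -- primary key supplemental logging
SUPPLEMENTAL_LOG_DATA_UI, -- unique index supplemental logging
SUPPLEMENTAL_LOG_DATA_FK, -- foreign key supplemental logging
SUPPLEMENTAL_LOG_DATA_ALL -- all-column supplemental logging
FROM V$DATABASE;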
Exception 5
Caused by: io.debezium.DebeziumException: Supplemental logging not configured for table XE.FLINKUSER.CDC_SOURCE. Use command: ALTER TABLE FLINKUSER.CDC_SOURCE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS
Supplemental logging for all columns is not enabled. Enable it:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
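The error message itself suggests a narrower, table-level command instead of the database-wide one. The sketch below copies that command from the message and adds a data dictionary query to confirm the result; it assumes a DBA account and the test table from this article.

```sql
-- Table-level alternative, taken verbatim from the Debezium error message.
ALTER TABLE FLINKUSER.CDC_SOURCE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

-- Confirm that a supplemental log group now exists for the table.
SELECT log_group_name, log_group_type, always
FROM dba_log_groups
WHERE owner = 'FLINKUSER'
  AND table_name = 'CDC_SOURCE';
```

Enabling it per table limits the extra redo volume to the captured tables, whereas the database-level statement applies to every table.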
Exception 6
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to discover captured tables for enumerator
at com.ververica.cdc.connectors.base.source.IncrementalSource.createEnumerator(IncrementalSource.java:151) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:197) ~[flink-dist-1.15.3.jar:1.15.3]
... 8 more
Caused by: java.lang.RuntimeException: Failed to resolve Oracle database version
at io.debezium.connector.oracle.OracleConnection.resolveOracleDatabaseVersion(OracleConnection.java:159) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at io.debezium.connector.oracle.OracleConnection.<init>(OracleConnection.java:71) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.createOracleConnection(OracleConnectionUtils.java:54) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at com.ververica.cdc.connectors.oracle.source.OracleDialect.openJdbcConnection(OracleDialect.java:90) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at com.ververica.cdc.connectors.oracle.source.OracleDialect.discoverDataCollections(OracleDialect.java:107) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at com.ververica.cdc.connectors.oracle.source.OracleDialect.discoverDataCollections(OracleDialect.java:51) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at com.ververica.cdc.connectors.base.source.IncrementalSource.createEnumerator(IncrementalSource.java:139) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:197) ~[flink-dist-1.15.3.jar:1.15.3]
... 8 more
Caused by: java.sql.SQLRecoverableException: Listener refused the connection with the following error:
ORA-12514, TNS:listener does not currently know of service requested in connect descriptor
at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:854) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at oracle.jdbc.driver.PhysicalConnection.connect(PhysicalConnection.java:793) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at oracle.jdbc.driver.T4CDriverExtension.getConnection(T4CDriverExtension.java:57) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:747) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:562) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at java.sql.DriverManager.getConnection(DriverManager.java:664) ~[?:1.8.0_242]
at java.sql.DriverManager.getConnection(DriverManager.java:208) ~[?:1.8.0_242]
at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$0(JdbcConnection.java:184) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at io.debezium.jdbc.JdbcConnection$ConnectionFactoryDecorator.connect(JdbcConnection.java:121) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:890) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:885) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:643) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:517) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at io.debezium.connector.oracle.OracleConnection.resolveOracleDatabaseVersion(OracleConnection.java:129) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at io.debezium.connector.oracle.OracleConnection.<init>(OracleConnection.java:71) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.createOracleConnection(OracleConnectionUtils.java:54) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at com.ververica.cdc.connectors.oracle.source.OracleDialect.openJdbcConnection(OracleDialect.java:90) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at com.ververica.cdc.connectors.oracle.source.OracleDialect.discoverDataCollections(OracleDialect.java:107) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at com.ververica.cdc.connectors.oracle.source.OracleDialect.discoverDataCollections(OracleDialect.java:51) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at com.ververica.cdc.connectors.base.source.IncrementalSource.createEnumerator(IncrementalSource.java:139) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:197) ~[flink-dist-1.15.3.jar:1.15.3]
... 8 more
Caused by: oracle.net.ns.NetException: Listener refused the connection with the following error:
ORA-12514, TNS:listener does not currently know of service requested in connect descriptor
at oracle.net.ns.NSProtocolNIO.negotiateConnection(NSProtocolNIO.java:284) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at oracle.net.ns.NSProtocol.connect(NSProtocol.java:340) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at oracle.jdbc.driver.T4CConnection.connect(T4CConnection.java:1596) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:588) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at oracle.jdbc.driver.PhysicalConnection.connect(PhysicalConnection.java:793) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at oracle.jdbc.driver.T4CDriverExtension.getConnection(T4CDriverExtension.java:57) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:747) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:562) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at java.sql.DriverManager.getConnection(DriverManager.java:664) ~[?:1.8.0_242]
at java.sql.DriverManager.getConnection(DriverManager.java:208) ~[?:1.8.0_242]
at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$0(JdbcConnection.java:184) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at io.debezium.jdbc.JdbcConnection$ConnectionFactoryDecorator.connect(JdbcConnection.java:121) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:890) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:885) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:643) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:517) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at io.debezium.connector.oracle.OracleConnection.resolveOracleDatabaseVersion(OracleConnection.java:129) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at io.debezium.connector.oracle.OracleConnection.<init>(OracleConnection.java:71) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.createOracleConnection(OracleConnectionUtils.java:54) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at com.ververica.cdc.connectors.oracle.source.OracleDialect.openJdbcConnection(OracleDialect.java:90) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at com.ververica.cdc.connectors.oracle.source.OracleDialect.discoverDataCollections(OracleDialect.java:107) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at com.ververica.cdc.connectors.oracle.source.OracleDialect.discoverDataCollections(OracleDialect.java:51) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at com.ververica.cdc.connectors.base.source.IncrementalSource.createEnumerator(IncrementalSource.java:139) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:197) ~[flink-dist-1.15.3.jar:1.15.3]
... 8 more
Cause: database-name was set to the SID rather than the service name. Either change it to the service name or use the url option (the earliest version of the official documentation had no url option).
Root cause: when the Oracle CDC source builds the JDBC URL from the configured options, it hard-codes the format jdbc:oracle:thin:@//<host>:<port>/<service_name>, but the SID and service name URL formats differ:
- URL format with a SID:
jdbc:oracle:thin:@<host>:<port>:<SID>
- URL format with a service name:
jdbc:oracle:thin:@//<host>:<port>/<service_name>
To address this, version 2.3.0 added support for a custom URL: Support custom url for incremental snapshot source https://github.com/ververica/flink-cdc-connectors/commit/4d9c0e41e169bf6cd8196a318c65fc965e002f57
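When it is unclear whether a value such as XE is the SID, a service name, or both, the instance can be asked directly. The queries below are a sketch, assuming a user with access to the V$ views; the instance name usually matches the SID, but that is a convention rather than a guarantee.

```sql
-- Instance name, which normally corresponds to the SID.
SELECT instance_name FROM v$instance;

-- Service names configured for the instance.
SELECT value FROM v$parameter WHERE name = 'service_names';

-- Services currently known to the instance.
SELECT name FROM v$services;
```

A name returned by the last two queries can be used as database-name without the url option; a name that is only the SID needs the url option with the SID format shown above.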
Exception 7
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: file is not a valid field name
This exception occurs while reading incremental data. The cause is a lowercase database-name; change it to uppercase:
-- 'database-name' = 'xe',
'database-name' = 'XE',
Exception 8
Caused by: io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.
This exception is a partitioned-table bug (partitioned tables are not supported); it occurs only with partitioned tables. Related links:
https://github.com/apache/flink-cdc/issues/1737
https://developer.aliyun.com/ask/531328
https://github.com/apache/flink-cdc/pull/2479
https://github.com/apache/seatunnel/pull/8265
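Fix: modify OracleConnectionUtils in the source code following the two PRs above, then rebuild the JAR.
Session count keeps growing and is never released
After a Flink job fails (with some exceptions), its JDBC sessions are not released. The job keeps trying to reconnect and keeps failing, so the session count grows sharply, exceeds the available total, and eventually makes Oracle unavailable.
The cause is the one described in the article on Flink restart and failure recovery strategies: with the default settings, a failed streaming job is restarted indefinitely (when checkpointing is enabled and no restart strategy is configured, Flink 1.15 falls back to fixed-delay with Integer.MAX_VALUE attempts). Limiting the number of restart attempts resolves it: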
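To watch for this buildup before Oracle runs out of capacity, the session views can be polled. This is a sketch, assuming a DBA account and the flinkuser account from this article (stored in uppercase).

```sql
-- Sessions currently held by the CDC user, grouped by client.
SELECT username, machine, program, status, COUNT(*) AS session_count
FROM v$session
WHERE username = 'FLINKUSER'
GROUP BY username, machine, program, status
ORDER BY session_count DESC;

-- Current and peak usage against the configured limits.
SELECT resource_name, current_utilization, max_utilization, limit_value
FROM v$resource_limit
WHERE resource_name IN ('processes', 'sessions');
```

A session_count that keeps climbing while the Flink job is restarting is the symptom described here.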
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
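The same limits can probably be set per job next to the other set statements of the job script. This is a sketch, assuming the submission tool accepts these keys in the same unquoted set syntax used for the job above; the keys and values are the ones from the configuration fragment.

```sql
-- Assumption: these keys are honored per job when set in the job script.
set restart-strategy=fixed-delay;
set restart-strategy.fixed-delay.attempts=3;
set restart-strategy.fixed-delay.delay=10 s;
```

With these values the job gives up after three failed restarts instead of reconnecting to Oracle indefinitely.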
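I have not yet reproduced which specific exception triggers this problem and will update the article once I do. Because edits to official-account articles are limited in length, updates can be found by clicking "Read the original" at the bottom of the article.
Exception 9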
Caused by: Error : 1031, Position : 25, Sql = SELECT * FROM "FLINKUSER"."CDC_SOURCE" AS OF SCN 20645794535650, OriginalSql = SELECT * FROM "FLINKUSER"."CDC_SOURCE" AS OF SCN 20645794535650, Error Msg = ORA-01031: insufficient privileges
The user lacks privileges. Grant them with:
GRANT FLASHBACK ANY TABLE TO flinkuser ;
GRANT SELECT ANY TRANSACTION TO flinkuser ;
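To confirm that the two grants took effect, the CDC user can inspect its own session. This is a sketch, assuming a fresh connection as flinkuser made after the grants were issued.

```sql
-- Run while connected as flinkuser; both privileges should be listed.
SELECT privilege
FROM session_privs
WHERE privilege IN ('FLASHBACK ANY TABLE', 'SELECT ANY TRANSACTION')
ORDER BY privilege;
```

If either row is missing, the AS OF SCN query from the error message will keep failing with ORA-01031.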
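Incremental data not syncing
On one project, incremental data stopped syncing without any error. It recovered on its own before I found the cause, and I have not been able to reproduce it since.
Parameter tuning
Large tables need tuned parameters:
'debezium.log.mining.batch.size.min' = '200000',
'debezium.log.mining.batch.size.max' = '50000000',
'scan.incremental.snapshot.enabled' = 'false', -- the default is true; if left unchanged, reads are very slow (a few dozen rows per second)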
SET execution.checkpointing.timeout = 600000s;
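Put together, a tuned source definition might look like the following. This is a sketch only: the table name oracle_cdc_source_tuned is hypothetical, and the connection options are copied from the job earlier in this article.

```sql
-- Hypothetical table name; connection options copied from the job above.
CREATE TABLE oracle_cdc_source_tuned (
  ID int PRIMARY KEY NOT ENFORCED,
  NAME string
) WITH (
  'connector' = 'oracle-cdc',
  'url' = 'jdbc:oracle:thin:@192.168.44.128:1522:XE',
  'hostname' = '192.168.44.128',
  'port' = '1522',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'XE',
  'schema-name' = 'FLINKUSER',
  'table-name' = 'CDC_SOURCE',
  'debezium.log.mining.strategy' = 'online_catalog',
  'debezium.log.mining.continuous.mine' = 'true',
  -- Tuning options for large tables
  'debezium.log.mining.batch.size.min' = '200000',
  'debezium.log.mining.batch.size.max' = '50000000',
  'scan.incremental.snapshot.enabled' = 'false'
);
```

Because the incremental snapshot is disabled here, the GRANT LOCK ANY TABLE statement from the privilege list is needed, as the comment next to that grant indicates.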