Flink Oracle CDC Environment Configuration and Verification

Published: 2025-07-05
I. Oracle Database Core Configuration
1. Enable Archive Logging

Oracle CDC relies on archived redo logs to obtain incremental change data. Enable archiving as follows:

Non-CDB database configuration:

-- Connect to the database as DBA
CONNECT sys/password AS SYSDBA;

-- Configure the recovery area path and size
ALTER SYSTEM SET db_recovery_file_dest_size = 10G;
ALTER SYSTEM SET db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' SCOPE=SPFILE;

-- Restart the database and enable archive log mode
SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;

-- Verify that archive log mode is enabled
ARCHIVE LOG LIST;
-- The output should show: Database log mode: Archive Mode

CDB database configuration (multitenant architecture):

-- Connect to the CDB root container
CONNECT sys/password@//localhost:1521/ORCLCDB AS SYSDBA;

-- Configure the recovery area (same as non-CDB)
ALTER SYSTEM SET db_recovery_file_dest_size = 10G;
ALTER SYSTEM SET db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' SCOPE=SPFILE;
SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;

-- Switch into the PDB container (e.g. ORCLPDB1)
ALTER SESSION SET CONTAINER = ORCLPDB1;
2. Enable Supplemental Logging

Supplemental logging captures the before and after images of changed rows. Enable it for the target tables or for the whole database:

-- Enable supplemental logging for a specific table (capture all columns)
ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

-- Enable minimal supplemental logging for the whole database
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
3. Create a Dedicated User and Grant Privileges
-- Create a tablespace (non-CDB)
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf'
SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

-- Create the user and grant base privileges (non-CDB)
-- Note: system privileges and object privileges must be granted in separate statements
CREATE USER flinkuser IDENTIFIED BY flinkpw
DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs;
GRANT CREATE SESSION, SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE TO flinkuser;
GRANT FLASHBACK ANY TABLE, SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE, EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION, LOGMINING, ANALYZE ANY TO flinkuser;

-- Grant LogMiner execution privileges
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;

-- Grant view query privileges (required for reading log metadata;
-- object grants accept only one object per statement)
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;

CDB-specific configuration:

-- In a CDB, create the user with CONTAINER=ALL
CREATE USER flinkuser IDENTIFIED BY flinkpw
DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
GRANT CREATE SESSION, SET CONTAINER TO flinkuser CONTAINER=ALL;
-- Add the CONTAINER=ALL suffix to the other grants as well (e.g. GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL)
II. Flink Environment Integration
1. Add the Maven Dependency (project development)
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-oracle-cdc</artifactId>
    <version>3.0.1</version>
    <scope>provided</scope>
</dependency>
2. SQL Client Deployment (non-Maven environments)
  1. Download the connector JAR: flink-sql-connector-oracle-cdc-3.0.1.jar
  2. Place the JAR in the $FLINK_HOME/lib/ directory
  3. Restart the Flink cluster so the dependency takes effect
III. Flink SQL Table Definition and Parameters
1. Complete DDL Example (with metadata columns)
-- Set the checkpoint interval (optional)
SET 'execution.checkpointing.interval' = '5s';

-- Create the Oracle CDC table (with metadata columns)
CREATE TABLE oracle_products (
    id INT,
    name STRING,
    description STRING,
    weight DECIMAL(10, 3),
    -- Metadata columns: capture database change information
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
    table_name STRING METADATA FROM 'table_name' VIRTUAL,
    op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = '192.168.1.100',
    'port' = '1521',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'ORCLCDB',
    'schema-name' = 'inventory',
    'table-name' = 'products',
    -- Key parameters
    'debezium.log.mining.strategy' = 'online_catalog',
    'debezium.log.mining.continuous.mine' = 'true',
    'scan.startup.mode' = 'initial',
    'scan.incremental.snapshot.enabled' = 'true'
);
2. Key Parameters
Parameter | Required | Default | Type | Description
--------- | -------- | ------- | ---- | -----------
connector | yes | (none) | String | Must be 'oracle-cdc'
hostname | yes | (none) | String | IP or hostname of the Oracle server (may be omitted when 'url' is set)
username | yes | (none) | String | Oracle username (must hold the privileges granted above)
password | yes | (none) | String | Oracle password
database-name | yes | (none) | String | Database name (e.g. ORCLCDB)
schema-name | yes | (none) | String | Schema name (e.g. inventory)
table-name | yes | (none) | String | Table name (e.g. products)
port | no | 1521 | Integer | Database port
url | no | auto-built | String | JDBC connection string (takes precedence over hostname+port), format: jdbc:oracle:thin:@host:port/database
scan.startup.mode | no | initial | String | Startup mode: initial (snapshot + redo log) or latest-offset (latest changes only)
scan.incremental.snapshot.enabled | no | true | Boolean | Enables the incremental snapshot (parallel, lock-free reads); keep the default
debezium.log.mining.strategy | no | online_catalog | String | Log mining strategy: online_catalog (online catalog) or redo_log_catalog (catalog written to the redo log)
debezium.log.mining.continuous.mine | no | true | Boolean | Continuously mine the redo log (keeps incremental reads going)
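When 'url' is omitted, the connector derives the JDBC connection string from hostname, port and database-name. The sketch below only illustrates the URL format the 'url' option expects; the helper is hypothetical, not the connector's actual code:

```java
public class JdbcUrlSketch {
    // Builds a thin-driver URL in the format jdbc:oracle:thin:@host:port/database
    static String buildUrl(String hostname, int port, String database) {
        return "jdbc:oracle:thin:@" + hostname + ":" + port + "/" + database;
    }

    public static void main(String[] args) {
        System.out.println(buildUrl("192.168.1.100", 1521, "ORCLCDB"));
        // prints jdbc:oracle:thin:@192.168.1.100:1521/ORCLCDB
    }
}
```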
IV. Environment Verification and Testing
1. Prepare Test Data (Oracle)
-- Create the test table (assumes the inventory schema already exists)
CREATE TABLE inventory.products (
    id INT PRIMARY KEY,
    name VARCHAR2(100),
    price NUMBER(10, 2),
    stock INT,
    update_time TIMESTAMP
);

-- Insert test data
INSERT INTO inventory.products VALUES (1, 'Laptop', 5999.00, 100, SYSDATE);
INSERT INTO inventory.products VALUES (2, 'Smartphone', 3999.00, 200, SYSDATE);
COMMIT;
2. Verify Data Synchronization with Flink SQL
-- Query the Oracle CDC table (the first query triggers the snapshot read)
SELECT * FROM oracle_products;

-- Expected output: the two inserted rows
-- Subsequent updates in Oracle are captured by Flink in real time
UPDATE inventory.products SET price = 6499.00 WHERE id = 1;  
COMMIT;
3. DataStream API Verification Example (parallel mode)
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.oracle.source.OracleSourceBuilder;
import org.apache.flink.cdc.connectors.oracle.source.OracleSourceBuilder.OracleIncrementalSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class OracleCdcParallelExample {
    public static void main(String[] args) throws Exception {
        // Extra Debezium properties passed through to the source
        Properties debeziumProperties = new Properties();
        debeziumProperties.setProperty("log.mining.strategy", "online_catalog");

        // Configure the Oracle source (parallel incremental snapshot mode)
        OracleIncrementalSource<String> source = new OracleSourceBuilder<String>()
                .hostname("192.168.1.100")
                .port(1521)
                .databaseList("ORCLCDB")
                .schemaList("inventory")
                .tableList("inventory.products")
                .username("flinkuser")
                .password("flinkpw")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .splitSize(1000) // snapshot chunk size
                .debeziumProperties(debeziumProperties)
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // checkpoint every 5 seconds
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Oracle CDC Source")
            .setParallelism(4) // read with parallelism 4
            .print();

        env.execute("Oracle CDC Test");
    }
}
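JsonDebeziumDeserializationSchema emits one JSON string per change event, containing before/after row images and an op code (c = create, u = update, d = delete, r = snapshot read). The sketch below pulls the op code out of a simplified sample event with plain string handling; a real job would use a JSON library such as Jackson, and the sample payload is illustrative, not a verbatim connector output:

```java
public class ChangeEventSketch {
    // Naive extraction of the top-level "op" field from a Debezium-style
    // JSON event; for illustration only, not robust JSON parsing.
    static String extractOp(String json) {
        int i = json.indexOf("\"op\":\"");
        if (i < 0) {
            return null;
        }
        int start = i + 6; // skip past the literal "op":"
        return json.substring(start, json.indexOf('"', start));
    }

    public static void main(String[] args) {
        // Simplified sample event (an insert on inventory.products)
        String event = "{\"before\":null,\"after\":{\"ID\":1,\"NAME\":\"Laptop\"},\"op\":\"c\"}";
        System.out.println(extractOp(event)); // prints c
    }
}
```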
V. Common Problems and Solutions
  1. Archive logging not enabled

    ERROR: ORA-01232: archived log is disabled

    • Solution: confirm that ALTER DATABASE ARCHIVELOG was executed and restart the database so the setting takes effect.
  2. Insufficient privileges

    ERROR: ORA-01031: insufficient privileges

    • Solution: check that the user holds the key privileges such as LOGMINING and SELECT ANY TRANSACTION, and re-run the grant statements.
  3. Incremental snapshot fails (table without a primary key)

    ERROR: Table has no primary key, cannot split snapshot chunks

    • Solution: add a primary key to the table, or specify a chunk key manually:
      'scan.incremental.snapshot.chunk.key-column' = 'id'  -- replace with an actual column name

  4. CDB/PDB connection failure

    • Solution: add the PDB name in the Flink DDL:
      'debezium.database.pdb.name' = 'ORCLPDB1'  -- replace with the actual PDB name

  5. Checkpoint timeout during the snapshot phase

    • Solution: adjust the Flink configuration so large-table snapshots do not fail checkpoints:
      SET 'execution.checkpointing.interval' = '10min';
      SET 'execution.checkpointing.tolerable-failed-checkpoints' = '100';
VI. Production Environment Recommendations
  1. Archive log cleanup policy

    • Delete expired archive logs regularly, e.g. with RMAN (run in the RMAN shell, not SQL*Plus):
      -- Delete archive logs older than 7 days
      DELETE ARCHIVELOG ALL COMPLETED BEFORE 'SYSDATE-7';

  2. Connection pool tuning

    • Adjust the connection pool size in the Flink DDL:
      'connection.pool.size' = '30'  -- tune to the required concurrency

  3. Performance monitoring

    • Monitor the Oracle view V$LOGMNR_CONTENTS to check log mining status, or watch task parallelism and throughput in the Flink Web UI.

The steps above complete the end-to-end configuration and verification of Flink Oracle CDC. In production, pay particular attention to archive log space management, CDB/PDB adaptation, and parallelism tuning for large-table snapshots to ensure data consistency and system stability.

