在大数据处理领域,实现高效、实时的数据处理与分析至关重要。Flink作为强大的流批一体化计算框架,结合StarRocks这一高性能的实时分析型数据库,再搭配TiCDC(TiDB Change Data Capture)用于捕获数据变更,能够构建出极为高效的数据处理链路。本教程将详细介绍如何利用这些技术实现从MySQL数据源抽取数据,经Flink处理后写入StarRocks的完整流程,并对相关表结构和字段进行合理抽象与调整,以保障数据处理的通用性与安全性。
一、技术简介
Flink 1.20
Flink 1.20是Apache Flink的一个重要版本,它进一步强化了流批一体的计算能力。在流处理方面,其能够以低延迟处理大规模的实时数据流;而在批处理场景下,也具备高效的性能表现。Flink提供了丰富的连接器(Connector),方便与各类数据源和数据存储系统进行对接,同时支持使用SQL进行数据处理操作,大大降低了开发成本,提升了开发效率。
StarRocks
StarRocks是一款高性能的实时分析型数据库,采用MPP(Massively Parallel Processing)架构,能够对海量数据进行亚秒级的查询分析。它支持多种数据模型,包括聚合模型、主键模型等,适用于各类数据分析场景,如报表生成、实时看板、即席查询等。StarRocks通过其高效的存储和查询引擎,以及对多种数据格式的支持,为数据的快速分析提供了有力保障。
TiCDC
TiCDC是TiDB生态中的数据变更捕获工具,它基于TiDB的分布式事务和MVCC(Multi-Version Concurrency Control)机制,能够实时捕获TiDB数据库中的数据变更,包括增、删、改操作。TiCDC将这些变更数据以有序的方式输出,为数据同步、实时数据处理等场景提供了可靠的数据源。在本教程中,虽然我们主要从MySQL数据源抽取数据,但TiCDC的原理和应用思路可作为扩展参考,在涉及TiDB数据源时能够快速迁移应用。
二、环境准备
安装与配置Flink 1.20
- 下载Flink 1.20.0:通过curl命令下载安装包,执行
curl -k -O https://archive.apache.org/dist/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz
。 - 解压文件:使用命令
tar -xzvf flink-1.20.0-bin-scala_2.12.tgz
解压下载的压缩包。 - 移动到目标目录(可选):可将解压后的Flink目录移动到
/opt
或其他目标位置,例如执行sudo mv flink-1.20.0 /opt/flink
。 - 配置环境变量:编辑
~/.bashrc
文件,添加如下内容:
export FLINK_HOME=/opt/flink
export PATH=$FLINK_HOME/bin:$PATH
保存并退出文件后,运行 source ~/.bashrc
使修改生效。
5. 配置Flink:Flink默认已配置一些基本设置。若无需集群配置,可跳过 masters
和 workers
文件的配置。如需调整参数,如内存配置或其他作业配置,可修改Flink配置文件 config.yaml
,该文件位于 /opt/flink/conf
目录下。例如,将 bind-host
设置从 localhost
改为 0.0.0.0
,使Flink能够绑定所有网络接口,修改如下:
jobmanager:
bind-host: 0.0.0.0
rpc:
address: 0.0.0.0
port: 6123
memory:
process:
size: 1600m
execution:
failover-strategy: region
taskmanager:
bind-host: 0.0.0.0
host: 0.0.0.0
numberOfTaskSlots: 1
memory:
process:
size: 1728m
parallelism:
address: 0.0.0.0
bind-address: 0.0.0.0
- 启动Flink:进入Flink目录,执行
./bin/start-cluster.sh
启动Flink。若要关闭Flink,执行./bin/stop-cluster.sh
。启动后,可通过浏览器访问Flink Web UI,默认地址为http://<your_server_ip>:8081
(例如http://192.168.1.1:8081
),以查看Flink集群的状态、提交作业等。
安装与配置StarRocks
- 下载与部署:从StarRocks官方网站获取安装包,按照官方文档指引进行下载与解压操作。根据实际的生产环境需求,选择合适的部署方式,如单节点部署用于测试环境,集群部署用于生产环境。
- 配置参数:在StarRocks的配置文件中,对一些关键参数进行设置,如FE(Frontend)节点的内存分配、BE(Backend)节点的存储路径等。例如,在FE节点的
fe.conf
文件中设置query_mem_limit = 2147483648
来限制查询内存,在BE节点的be.conf
文件中设置storage_root_path = /data/starrocks/be
来指定存储路径。 - 启动服务:分别启动FE和BE节点,确保各个节点正常运行且相互通信正常。启动后,可通过MySQL客户端连接到StarRocks,验证其是否正常工作,例如执行
mysql -h <starrocks_fe_host> -P 9030 -u root -p
。
配置MySQL数据源
- 开启Binlog:确保MySQL开启了Binlog功能,在MySQL配置文件(通常为
my.cnf
或my.ini
)中,添加或修改如下配置:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1
修改完成后,重启MySQL服务使配置生效。
2. 创建测试表:在MySQL中创建用于测试的数据表,例如创建一个名为 example_table
的表,表结构如下:
CREATE TABLE example_table (
id BIGINT NOT NULL,
data_column_1 VARCHAR(255),
data_column_2 INT,
PRIMARY KEY (id)
);
向表中插入一些测试数据,以便后续进行数据同步与处理测试。
三、表结构设计与调整
StarRocks表结构设计
在StarRocks中创建用于存储数据的表,以用户标签相关数据存储为例,设计如下表结构:
CREATE TABLE table_demo (
id BIGINT NOT NULL COMMENT '主键',
sign CHAR(32) NOT NULL COMMENT '签名',
shop_id BIGINT NOT NULL COMMENT 'shopID',
shop_type BIGINT NOT NULL COMMENT '类型',
user_id BIGINT NULL COMMENT 'userID',
create_time DATETIME NULL COMMENT '记录创建时间',
operation_type VARCHAR(20) COMMENT '操作类型',
row_change_type VARCHAR(20) COMMENT '行变更类型'
) ENGINE=OLAP
PRIMARY KEY (id)
COMMENT '用户商品表'
DISTRIBUTED BY HASH(`id`) BUCKETS 16
PROPERTIES (
"replication_num" = "3",
"bloom_filter_columns" = "shop_id, user_id",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "false",
"compression" = "LZ4"
);
该表结构设计充分考虑了数据的存储与查询需求,通过主键约束、哈希分布以及相关属性设置,保障数据的高效存储与查询性能。
Flink中MySQL CDC表结构定义
在Flink中通过MySQL CDC连接器读取MySQL数据时,定义如下表结构:
CREATE TABLE mysql_cdc_example (
id BIGINT,
sign STRING COMMENT '签名',
shop_id BIGINT COMMENT 'shopID',
shop_type BIGINT COMMENT '类型',
user_id BIGINT COMMENT 'userID',
create_time TIMESTAMP(0),
operation_type STRING COMMENT '业务操作字段',
operation_timestamp TIMESTAMP_LTZ(3) METADATA FROM 'operation_timestamp' VIRTUAL,
row_change_type STRING METADATA FROM 'row_change_type' VIRTUAL,
PRIMARY KEY (`id`) NOT ENFORCED
)
WITH
(
'connector' ='mysql-cdc',
'hostname' = '192.168.0.1',
'port' = '3306',
'database-name' = 'your_database_name',
'table-name' = 'example_table',
'username' = 'your_username',
'password' = 'your_password',
'debezium.snapshot.mode' = 'initial'
);
该表结构定义与StarRocks中的目标表结构相对应,同时通过WITH参数配置了MySQL CDC连接器的相关信息,包括数据源地址、端口、数据库名、表名、用户名、密码以及快照模式等。
Flink中StarRocks Sink表结构定义
在Flink中定义用于将处理后数据写入StarRocks的Sink表结构如下:
CREATE TABLE starrocks_sink_example (
id BIGINT PRIMARY KEY NOT ENFORCED,
sign STRING,
shop_id BIGINT,
shop_type BIGINT,
user_id bigint,
create_time STRING,
operation_type STRING,
row_change_type STRING
)
WITH
(
'connector'='starrocks',
'sink.max-retries'='5',
'jdbc-url' = 'jdbc:mysql://192.168.0.1:9030/your_database_name?useUnicode=true&characterEncoding=utf-8&useSSL=false&connectTimeout=3000&useUnicode=true&characterEncoding=utf8&useSSL=false&rewriteBatchedStatements=true&&serverTimezone=Asia/Shanghai&sessionVariables=query_timeout=86400',
'load-url'='192.168.0.1:8030',
'table-name' = 'table_demo',
'username'='your_username',
'password'='your_password',
'sink.buffer-flush.interval-ms'='5000',
'sink.parallelism' = '2',
'database-name'='your_database_name'
);
此Sink表结构与StarRocks中的目标表结构一致,通过WITH参数配置了StarRocks连接器的相关信息,如JDBC URL、Load URL、表名、用户名、密码、缓冲刷新间隔以及并行度等,确保Flink能够将处理后的数据准确高效地写入StarRocks。
四、数据同步与处理流程
使用Flink SQL进行数据抽取与转换
- 配置Flink SQL环境:在Flink的SQL客户端或相关集成开发环境中,配置好Flink SQL的运行环境,确保能够执行SQL语句对数据进行操作。
- 编写数据抽取与转换SQL:编写SQL语句从MySQL CDC表中抽取数据,并进行必要的转换操作,例如将时间格式进行转换、根据业务规则对某些字段进行计算等。以下是一个简单的示例,将
create_time
字段从TIMESTAMP
类型转换为字符串类型,并根据operation_type
和row_change_type
字段确定最终的操作类型:
INSERT INTO
starrocks_sink_example
SELECT
id,
sign,
shop_id,
shop_type
user_id,
cast(create_time as CHAR) as create_time,
CASE
WHEN operation_type = 'DELETE' THEN 'DELETE'
WHEN row_change_type = '+I' THEN 'INSERT'
WHEN row_change_type IN ('-U', '+U') THEN 'UPDATE'
WHEN row_change_type = '-D' THEN 'DELETE'
ELSE 'UNKNOWN'
END AS operation_type,
row_change_type
FROM
mysql_cdc_example;
该SQL语句从 mysql_cdc_example
表中读取数据,对 create_time
字段进行类型转换,并根据不同的变更类型确定最终的 operation_type
,然后将处理后的数据插入到 starrocks_sink_example
表中。
使用Routine Load进行数据实时摄入(以Kafka数据源为例)
- 配置Kafka数据源:在Kafka中创建用于存储数据变更的主题,确保数据源能够正常向该主题发送数据。例如,创建一个名为
user_table_changes
的主题。 - 创建StarRocks的Routine Load任务:在StarRocks中创建Routine Load任务,用于实时消费Kafka主题中的数据并写入到StarRocks表中。以下是一个示例:
CREATE ROUTINE LOAD your_load_job_name ON table_demo
COLUMNS (
id,
sign,
shop_id,
shop_type,
user_id,
create_time,
operation_type,
row_change_type,
temp_operation_type=IF(operation_type = 'DELETE', 'DELETE', IF(operation_type = 'UPDATE', 'UPSERT', 'APPEND'))
)
PROPERTIES (
"desired_concurrent_number" = "1",
"max_batch_interval" = "10",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM
KAFKA (
"kafka_broker_list" = "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092",
"kafka_topic" = "user_table_changes",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
该Routine Load任务配置了从Kafka主题 user_table_changes
中读取数据,按照指定的列映射关系写入到 user_table_mapping
表中,并设置了相关的属性,如期望的并发数、最大批次间隔、最大批次行数、最大批次大小、严格模式以及数据格式等。