Docker + PyFlink1.17 数据写入 MySQL

发布于:2025-06-16 ⋅ 阅读:(21) ⋅ 点赞:(0)

# PyFlink 数据写入 MySQL 项目文档

## 1. 项目概述

本项目实现了一个使用 PyFlink 将数据写入 MySQL 的示例。项目使用 Docker 容器化部署,包含 Flink 集群(JobManager 和 TaskManager)以及 MySQL 数据库。

## 2. 环境要求

- Docker

- Docker Compose

- 操作系统:支持 Linux/MacOS/Windows

## 3. 镜像版本及配置

### 3.1 Flink 镜像

#### 基础镜像

- 官方镜像:`flink:1.17.0-scala_2.12-java11`

- 自定义镜像:`flink-scala_2.12-java11-py3.8`

#### Dockerfile 配置

```dockerfile

# 使用官方 Flink 基础镜像

FROM flink:1.17.0-scala_2.12-java11

# 设置环境变量

ENV PYTHON_VERSION=3.8 \

PYTHON_PIP_VERSION=23.2.1 \

FLINK_HOME=/opt/flink \

PATH="/opt/conda/bin:$PATH"

# 安装系统依赖

RUN apt-get update && \

apt-get install -y --no-install-recommends \

wget \

bzip2 \

ca-certificates \

curl \

openjdk-11-jdk-headless \

build-essential \

python3-dev

# 安装 Miniconda

RUN wget https://mirrors.tuna.tsinghua.edu.cn/anaconda/miniconda/Miniconda3-py38_23.5.2-0-Linux-x86_64.sh && \

/bin/bash Miniconda3-py38_23.5.2-0-Linux-x86_64.sh -b -p /opt/conda && \

rm Miniconda3-py38_23.5.2-0-Linux-x86_64.sh

# 安装 PyFlink 及依赖

RUN pip install --no-cache-dir \

"numpy>=1.21.4,<1.22.0" \

"pandas>=1.3.0,<1.4.0" \

apache-flink==1.17.0

# 设置环境变量

ENV PYFLINK_PYTHON=/opt/conda/bin/python \

PYFLINK_DRIVER_EXECUTABLE=/opt/conda/bin/python

```

### 3.2 MySQL 镜像

- 版本:`mysql:8.0`

- 端口映射:3306:3306

- 环境变量:

- MYSQL_ROOT_PASSWORD=root

- MYSQL_DATABASE=test

## 4. 容器部署

### 4.1 构建自定义 Flink 镜像

```bash

# 在 docker_file 目录下执行

docker build -t flink-scala_2.12-java11-py3.8 -f docker_file_flink_1.17_scala_212_java11_py38 .

```

### 4.2 Docker Compose 配置

```yaml

version: '3'

services:

jobmanager:

image: flink-scala_2.12-java11-py3.8

ports:

- "8081:8081"

command: jobmanager

environment:

- JOB_MANAGER_RPC_ADDRESS=jobmanager

volumes:

- ./flink/custom_jars:/opt/flink/lib

networks:

- flink-network

taskmanager:

image: flink-scala_2.12-java11-py3.8

depends_on:

- jobmanager

command: taskmanager

environment:

- JOB_MANAGER_RPC_ADDRESS=jobmanager

volumes:

- ./flink/custom_jars:/opt/flink/lib

networks:

- flink-network

networks:

flink-network:

driver: bridge

```

### 4.3 启动服务

```bash

# 启动 Flink 集群

docker-compose up -d

# 启动 MySQL(需要连接到 Flink 网络)

docker run --name mysql-flink \

-e MYSQL_ROOT_PASSWORD=root \

-e MYSQL_DATABASE=test \

-p 3306:3306 \

--network flink-network \

-d mysql:8.0

```

### 4.4 网络配置说明

#### 4.4.1 网络架构

- 创建了名为 `flink-network` 的桥接网络

- JobManager 和 TaskManager 通过该网络通信

- MySQL 容器也连接到该网络,确保与 Flink 集群互通

#### 4.4.2 网络连接验证

```bash

# 查看网络列表

docker network ls

# 查看网络详情

docker network inspect flink-network

# 测试网络连通性(在 JobManager 容器中)

docker exec -it docker_utils-jobmanager-1 ping mysql-flink

```

#### 4.4.3 关键网络配置点

1. Flink 集群内部通信

- JobManager 和 TaskManager 通过 `flink-network` 网络通信

- 使用服务名 `jobmanager` 作为 RPC 地址

2. MySQL 连接配置

- MySQL 容器通过 `--network flink-network` 连接到 Flink 网络

- 在 PyFlink 代码中使用 `mysql-flink` 作为主机名连接 MySQL

```python

.with_url("jdbc:mysql://mysql-flink:3306/test")

```

3. 端口映射

- Flink Web UI: 8081:8081

- MySQL: 3306:3306

#### 4.4.4 网络故障排查

1. 检查网络连接

```bash

# 查看容器网络连接状态

docker network inspect flink-network

# 检查容器日志

docker logs docker_utils-jobmanager-1

docker logs docker_utils-taskmanager-1

docker logs mysql-flink

```

2. 常见网络问题

- 容器间无法通信:检查是否在同一网络

- 连接超时:检查服务名是否正确

- 端口冲突:检查端口映射

### 4.5 JobManager 与 TaskManager 分离部署说明

- **必须分开部署**:JobManager 和 TaskManager 必须部署在不同容器中。

- **原因**:

1. **资源隔离**:JobManager 负责任务调度和资源管理,TaskManager 负责实际执行任务,合并会导致资源竞争。

2. **高可用性**:JobManager 故障时,TaskManager 需继续运行,合并会导致单点故障。

3. **扩展性**:TaskManager 需根据任务负载动态扩展,合并会限制扩展能力。

- **部署要求**:至少 1 个 JobManager 容器 + 至少 1 个 TaskManager 容器。

## 5. MySQL 配置

### 5.1 创建用户和授权

```sql

CREATE USER IF NOT EXISTS 'root'@'%' IDENTIFIED BY 'root';

GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' WITH GRANT OPTION;

FLUSH PRIVILEGES;

```

### 5.2 创建数据表

```sql

CREATE TABLE IF NOT EXISTS test.users (

id INT PRIMARY KEY,

name VARCHAR(255),

age INT

);

```

## 6. PyFlink 配置

### 6.1 依赖配置

- PyFlink 版本:1.17.0

- 必需依赖:

- numpy>=1.21.4,<1.22.0

- pandas>=1.3.0,<1.4.0

- apache-flink==1.17.0

### 6.2 JDBC 连接器

- 将以下 jar 包放置在 `flink/custom_jars` 目录:

1. Flink JDBC 连接器:

- 文件名:`flink-connector-jdbc-3.1.1-1.17.jar`

- 作用:提供 Flink DataStream API 与 JDBC 数据库的连接能力

- 版本要求:与 Flink 1.17.0 兼容

- 下载地址:

- Maven 中央仓库:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/

- 直接下载链接:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar

- 注意事项:

- 确保与 Flink 1.17.0 兼容

- 使用 DataStream API 的连接器版本

- 文件权限:确保 jar 包在容器中有正确的读取权限

2. MySQL JDBC 驱动:

- 文件名:`mysql-connector-java-8.0.28.jar`

- 作用:提供 MySQL 数据库连接支持

- 版本要求:与 MySQL 8.0 兼容

- 下载地址:

- Maven 中央仓库:https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/

- 直接下载链接:https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar

- 下载方式:

1. 使用 wget 命令下载:

```bash

# 创建目录

mkdir -p flink/custom_jars

cd flink/custom_jars

# 下载 Flink JDBC 连接器

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar

# 下载 MySQL JDBC 驱动

wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar

```

2. 使用 curl 命令下载:

```bash

# 创建目录

mkdir -p flink/custom_jars

cd flink/custom_jars

# 下载 Flink JDBC 连接器

curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar

# 下载 MySQL JDBC 驱动

curl -O https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar

```

- 目录结构:

```

flink/

└── custom_jars/

├── flink-connector-jdbc-3.1.1-1.17.jar

└── mysql-connector-java-8.0.28.jar

```

- 部署位置:

1. JobManager 容器(docker_utils-jobmanager-1):

- 路径:`/opt/flink/lib/`

- 包含:两个 jar 包都需要

- 用途:用于提交和执行 PyFlink 作业

2. TaskManager 容器(docker_utils-taskmanager-1):

- 路径:`/opt/flink/lib/`

- 包含:两个 jar 包都需要

- 用途:用于执行具体的任务

3. MySQL 容器(mysql-flink):

- 不需要部署任何 jar 包

- 只提供数据库服务

- 部署方式:

```yaml

# docker-compose.yml 中的配置

volumes:

- ./flink/custom_jars:/opt/flink/lib

```

这个配置会自动将 jar 包挂载到 JobManager 和 TaskManager 容器中。

- 注意事项:

1. 两个 jar 包都必须放在 `/opt/flink/lib` 目录下

2. 确保 jar 包版本与 Flink 和 MySQL 版本兼容

3. 使用 DataStream API 时,需要使用对应的 JDBC 连接器

4. 不要使用 Table API 的连接器(flink-connector-jdbc-table_2.12-1.17.0.jar)

5. 确保 JobManager 和 TaskManager 容器都能访问到这些 jar 包

## 7. PyFlink 代码示例

```python

from pyflink.datastream import StreamExecutionEnvironment

from pyflink.common.typeinfo import Types

from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions

def main():

# 创建执行环境

env = StreamExecutionEnvironment.get_execution_environment()

# 示例数据

sample_data = [(1, "John", 30), (2, "Alice", 25)]

# 定义类型信息

type_info = Types.ROW([Types.INT(), Types.STRING(), Types.INT()])

# 创建数据流

data_stream = env.from_collection(sample_data, type_info=type_info)

# SQL 语句

sql = "INSERT INTO users (id, name, age) VALUES (?, ?, ?)"

# 配置 JDBC sink

jdbc_sink = JdbcSink.sink(

sql,

type_info,

JdbcConnectionOptions.JdbcConnectionOptionsBuilder()

.with_url("jdbc:mysql://mysql-flink:3306/test")

.with_driver_name("com.mysql.cj.jdbc.Driver")

.with_user_name("root")

.with_password("root")

.build(),

JdbcExecutionOptions.builder()

.with_batch_size(1000)

.with_batch_interval_ms(200)

.with_max_retries(5)

.build()

)

# 添加 sink

data_stream.add_sink(jdbc_sink)

# 执行作业

env.execute("Flink MySQL Output Job")

if __name__ == '__main__':

main()

```

## 8. 执行步骤

1. 复制 PyFlink 脚本到 JobManager 容器

```bash

docker cp flink/test_case/test_flink_simply_output_to_mysql.py docker_utils-jobmanager-1:/opt/flink/

```

2. 进入 JobManager 容器

```bash

docker exec -it docker_utils-jobmanager-1 bash

```

3. 执行 PyFlink 脚本

```bash

flink run -py /opt/flink/test_flink_simply_output_to_mysql.py

```

4. 验证数据写入

```bash

docker exec -i mysql-flink mysql -uroot -proot -e "SELECT * FROM test.users;"

```

## 9. 注意事项

1. 确保 MySQL JDBC 驱动 jar 包已正确放置在 `flink/custom_jars` 目录

2. 检查 MySQL 用户权限配置是否正确

3. 确保 Flink 集群和 MySQL 容器网络互通

4. 注意 PyFlink 版本与 Flink 版本匹配

5. 确保 Python 环境变量正确配置

## 10. 常见问题解决

1. 连接 MySQL 失败

- 检查 MySQL 用户权限

- 验证网络连接

- 确认 MySQL 容器状态

2. PyFlink 执行错误

- 检查 Python 环境

- 验证依赖包版本

- 查看 Flink 日志

3. 数据写入失败

- 检查表结构

- 验证数据类型匹配

- 查看 MySQL 错误日志


网站公告

今日签到

点亮在社区的每一天
去签到