# PyFlink 数据写入 MySQL 项目文档
## 1. 项目概述
本项目实现了一个使用 PyFlink 将数据写入 MySQL 的示例。项目使用 Docker 容器化部署,包含 Flink 集群(JobManager 和 TaskManager)以及 MySQL 数据库。
## 2. 环境要求
- Docker
- Docker Compose
- 操作系统:支持 Linux/MacOS/Windows
## 3. 镜像版本及配置
### 3.1 Flink 镜像
#### 基础镜像
- 官方镜像:`flink:1.17.0-scala_2.12-java11`
- 自定义镜像:`flink-scala_2.12-java11-py3.8`
#### Dockerfile 配置
```dockerfile
# 使用官方 Flink 基础镜像
FROM flink:1.17.0-scala_2.12-java11
# 设置环境变量
ENV PYTHON_VERSION=3.8 \
PYTHON_PIP_VERSION=23.2.1 \
FLINK_HOME=/opt/flink \
PATH="/opt/conda/bin:$PATH"
# 安装系统依赖
RUN apt-get update && \
apt-get install -y --no-install-recommends \
wget \
bzip2 \
ca-certificates \
curl \
openjdk-11-jdk-headless \
build-essential \
python3-dev
# 安装 Miniconda
RUN wget https://mirrors.tuna.tsinghua.edu.cn/anaconda/miniconda/Miniconda3-py38_23.5.2-0-Linux-x86_64.sh && \
/bin/bash Miniconda3-py38_23.5.2-0-Linux-x86_64.sh -b -p /opt/conda && \
rm Miniconda3-py38_23.5.2-0-Linux-x86_64.sh
# 安装 PyFlink 及依赖
RUN pip install --no-cache-dir \
"numpy>=1.21.4,<1.22.0" \
"pandas>=1.3.0,<1.4.0" \
apache-flink==1.17.0
# 设置环境变量
ENV PYFLINK_PYTHON=/opt/conda/bin/python \
PYFLINK_DRIVER_EXECUTABLE=/opt/conda/bin/python
```
### 3.2 MySQL 镜像
- 版本:`mysql:8.0`
- 端口映射:3306:3306
- 环境变量:
- MYSQL_ROOT_PASSWORD=root
- MYSQL_DATABASE=test
## 4. 容器部署
### 4.1 构建自定义 Flink 镜像
```bash
# 在 docker_file 目录下执行
docker build -t flink-scala_2.12-java11-py3.8 -f docker_file_flink_1.17_scala_212_java11_py38 .
```
### 4.2 Docker Compose 配置
```yaml
version: '3'
services:
jobmanager:
image: flink-scala_2.12-java11-py3.8
ports:
- "8081:8081"
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
volumes:
- ./flink/custom_jars:/opt/flink/lib
networks:
- flink-network
taskmanager:
image: flink-scala_2.12-java11-py3.8
depends_on:
- jobmanager
command: taskmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
volumes:
- ./flink/custom_jars:/opt/flink/lib
networks:
- flink-network
networks:
flink-network:
driver: bridge
```
### 4.3 启动服务
```bash
# 启动 Flink 集群
docker-compose up -d
# 启动 MySQL(需要连接到 Flink 网络)
docker run --name mysql-flink \
-e MYSQL_ROOT_PASSWORD=root \
-e MYSQL_DATABASE=test \
-p 3306:3306 \
--network flink-network \
-d mysql:8.0
```
### 4.4 网络配置说明
#### 4.4.1 网络架构
- 创建了名为 `flink-network` 的桥接网络
- JobManager 和 TaskManager 通过该网络通信
- MySQL 容器也连接到该网络,确保与 Flink 集群互通
#### 4.4.2 网络连接验证
```bash
# 查看网络列表
docker network ls
# 查看网络详情
docker network inspect flink-network
# 测试网络连通性(在 JobManager 容器中)
docker exec -it docker_utils-jobmanager-1 ping mysql-flink
```
#### 4.4.3 关键网络配置点
1. Flink 集群内部通信
- JobManager 和 TaskManager 通过 `flink-network` 网络通信
- 使用服务名 `jobmanager` 作为 RPC 地址
2. MySQL 连接配置
- MySQL 容器通过 `--network flink-network` 连接到 Flink 网络
- 在 PyFlink 代码中使用 `mysql-flink` 作为主机名连接 MySQL
```python
.with_url("jdbc:mysql://mysql-flink:3306/test")
```
3. 端口映射
- Flink Web UI: 8081:8081
- MySQL: 3306:3306
#### 4.4.4 网络故障排查
1. 检查网络连接
```bash
# 查看容器网络连接状态
docker network inspect flink-network
# 检查容器日志
docker logs docker_utils-jobmanager-1
docker logs docker_utils-taskmanager-1
docker logs mysql-flink
```
2. 常见网络问题
- 容器间无法通信:检查是否在同一网络
- 连接超时:检查服务名是否正确
- 端口冲突:检查端口映射
### 4.5 JobManager 与 TaskManager 分离部署说明
- **必须分开部署**:JobManager 和 TaskManager 必须部署在不同容器中。
- **原因**:
1. **资源隔离**:JobManager 负责任务调度和资源管理,TaskManager 负责实际执行任务,合并会导致资源竞争。
2. **高可用性**:JobManager 故障时,TaskManager 需继续运行,合并会导致单点故障。
3. **扩展性**:TaskManager 需根据任务负载动态扩展,合并会限制扩展能力。
- **部署要求**:至少 1 个 JobManager 容器 + 至少 1 个 TaskManager 容器。
## 5. MySQL 配置
### 5.1 创建用户和授权
```sql
CREATE USER IF NOT EXISTS 'root'@'%' IDENTIFIED BY 'root';
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' WITH GRANT OPTION;
FLUSH PRIVILEGES;
```
### 5.2 创建数据表
```sql
CREATE TABLE IF NOT EXISTS test.users (
id INT PRIMARY KEY,
name VARCHAR(255),
age INT
);
```
## 6. PyFlink 配置
### 6.1 依赖配置
- PyFlink 版本:1.17.0
- 必需依赖:
- numpy>=1.21.4,<1.22.0
- pandas>=1.3.0,<1.4.0
- apache-flink==1.17.0
### 6.2 JDBC 连接器
- 将以下 jar 包放置在 `flink/custom_jars` 目录:
1. Flink JDBC 连接器:
- 文件名:`flink-connector-jdbc-3.1.1-1.17.jar`
- 作用:提供 Flink DataStream API 与 JDBC 数据库的连接能力
- 版本要求:与 Flink 1.17.0 兼容
- 下载地址:
- Maven 中央仓库:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/
- 直接下载链接:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar
- 注意事项:
- 确保与 Flink 1.17.0 兼容
- 使用 DataStream API 的连接器版本
- 文件权限:确保 jar 包在容器中有正确的读取权限
2. MySQL JDBC 驱动:
- 文件名:`mysql-connector-java-8.0.28.jar`
- 作用:提供 MySQL 数据库连接支持
- 版本要求:与 MySQL 8.0 兼容
- 下载地址:
- Maven 中央仓库:https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/
- 直接下载链接:https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
- 下载方式:
1. 使用 wget 命令下载:
```bash
# 创建目录
mkdir -p flink/custom_jars
cd flink/custom_jars
# 下载 Flink JDBC 连接器
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar
# 下载 MySQL JDBC 驱动
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
```
2. 使用 curl 命令下载:
```bash
# 创建目录
mkdir -p flink/custom_jars
cd flink/custom_jars
# 下载 Flink JDBC 连接器
curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar
# 下载 MySQL JDBC 驱动
curl -O https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
```
- 目录结构:
```
flink/
└── custom_jars/
├── flink-connector-jdbc-3.1.1-1.17.jar
└── mysql-connector-java-8.0.28.jar
```
- 部署位置:
1. JobManager 容器(docker_utils-jobmanager-1):
- 路径:`/opt/flink/lib/`
- 包含:两个 jar 包都需要
- 用途:用于提交和执行 PyFlink 作业
2. TaskManager 容器(docker_utils-taskmanager-1):
- 路径:`/opt/flink/lib/`
- 包含:两个 jar 包都需要
- 用途:用于执行具体的任务
3. MySQL 容器(mysql-flink):
- 不需要部署任何 jar 包
- 只提供数据库服务
- 部署方式:
```yaml
# docker-compose.yml 中的配置
volumes:
- ./flink/custom_jars:/opt/flink/lib
```
这个配置会自动将 jar 包挂载到 JobManager 和 TaskManager 容器中。
- 注意事项:
1. 两个 jar 包都必须放在 `/opt/flink/lib` 目录下
2. 确保 jar 包版本与 Flink 和 MySQL 版本兼容
3. 使用 DataStream API 时,需要使用对应的 JDBC 连接器
4. 不要使用 Table API 的连接器(flink-connector-jdbc-table_2.12-1.17.0.jar)
5. 确保 JobManager 和 TaskManager 容器都能访问到这些 jar 包
## 7. PyFlink 代码示例
```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions
def main():
# 创建执行环境
env = StreamExecutionEnvironment.get_execution_environment()
# 示例数据
sample_data = [(1, "John", 30), (2, "Alice", 25)]
# 定义类型信息
type_info = Types.ROW([Types.INT(), Types.STRING(), Types.INT()])
# 创建数据流
data_stream = env.from_collection(sample_data, type_info=type_info)
# SQL 语句
sql = "INSERT INTO users (id, name, age) VALUES (?, ?, ?)"
# 配置 JDBC sink
jdbc_sink = JdbcSink.sink(
sql,
type_info,
JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.with_url("jdbc:mysql://mysql-flink:3306/test")
.with_driver_name("com.mysql.cj.jdbc.Driver")
.with_user_name("root")
.with_password("root")
.build(),
JdbcExecutionOptions.builder()
.with_batch_size(1000)
.with_batch_interval_ms(200)
.with_max_retries(5)
.build()
)
# 添加 sink
data_stream.add_sink(jdbc_sink)
# 执行作业
env.execute("Flink MySQL Output Job")
if __name__ == '__main__':
main()
```
## 8. 执行步骤
1. 复制 PyFlink 脚本到 JobManager 容器
```bash
docker cp flink/test_case/test_flink_simply_output_to_mysql.py docker_utils-jobmanager-1:/opt/flink/
```
2. 进入 JobManager 容器
```bash
docker exec -it docker_utils-jobmanager-1 bash
```
3. 执行 PyFlink 脚本
```bash
flink run -py /opt/flink/test_flink_simply_output_to_mysql.py
```
4. 验证数据写入
```bash
docker exec -i mysql-flink mysql -uroot -proot -e "SELECT * FROM test.users;"
```
## 9. 注意事项
1. 确保 MySQL JDBC 驱动 jar 包已正确放置在 `flink/custom_jars` 目录
2. 检查 MySQL 用户权限配置是否正确
3. 确保 Flink 集群和 MySQL 容器网络互通
4. 注意 PyFlink 版本与 Flink 版本匹配
5. 确保 Python 环境变量正确配置
## 10. 常见问题解决
1. 连接 MySQL 失败
- 检查 MySQL 用户权限
- 验证网络连接
- 确认 MySQL 容器状态
2. PyFlink 执行错误
- 检查 Python 环境
- 验证依赖包版本
- 查看 Flink 日志
3. 数据写入失败
- 检查表结构
- 验证数据类型匹配
- 查看 MySQL 错误日志