Apache Flink 1.14.6 Standalone Cluster Deployment Guide (for an Existing Hadoop Environment)


Cluster Plan

Node Role    Hostname   IP Address       Services
JobManager   hadoop1    192.168.16.219   JobManager, Web UI
TaskManager  hadoop2    192.168.16.67    TaskManager
TaskManager  hadoop3    192.168.16.249   TaskManager

Environment Preparation (all nodes)

1. Install JDK 11 (if not already installed)

sudo yum install -y java-11-openjdk-devel
echo 'export JAVA_HOME=/usr/lib/jvm/java-11-openjdk' | sudo tee /etc/profile.d/java.sh
source /etc/profile.d/java.sh
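
A quick sanity check that the JDK and JAVA_HOME are visible in the current shell:

# Verify the installation on each node
java -version
echo $JAVA_HOME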

2. Configure Host Resolution (all nodes)

sudo tee -a /etc/hosts << EOF
192.168.16.219 hadoop1
192.168.16.67 hadoop2
192.168.16.249 hadoop3
EOF
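
start-cluster.sh later starts the TaskManagers over SSH from hadoop1, so passwordless SSH to the workers is required (your Hadoop setup may already provide it). A minimal sketch, run on hadoop1:

# Confirm the hostnames resolve
getent hosts hadoop1 hadoop2 hadoop3

# Set up passwordless SSH to the workers (skip if already configured)
ssh-keygen -t rsa -N "" -f ~/.ssh/id_rsa
ssh-copy-id hadoop2
ssh-copy-id hadoop3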

Flink Installation and Configuration (on the JobManager node hadoop1)

1. Download and Extract Flink

wget https://archive.apache.org/dist/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz
sudo tar -zxvf flink-1.14.6-bin-scala_2.12.tgz -C /opt/
sudo ln -s /opt/flink-1.14.6 /opt/flink

2. Configure Environment Variables

sudo tee /etc/profile.d/flink.sh << 'EOF'
export FLINK_HOME=/opt/flink
export PATH=$PATH:$FLINK_HOME/bin
EOF
source /etc/profile.d/flink.sh

3. Configure the Core Flink Files

flink-conf.yaml
sudo tee $FLINK_HOME/conf/flink-conf.yaml << 'EOF'
# Basic configuration
jobmanager.rpc.address: hadoop1
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 4
parallelism.default: 2

# Network configuration
# taskmanager.host is deliberately left unset so that each TaskManager
# announces its own hostname (a shared 0.0.0.0 value here would be wrong)
rest.address: hadoop1
rest.port: 8081

# Checkpoint configuration
execution.checkpointing.interval: 10000
state.backend: filesystem
state.checkpoints.dir: hdfs://hadoop1:9000/flink/checkpoints

# Hadoop integration
fs.hdfs.hadoopconf: /usr/local/hadoop/etc/hadoop
classloader.resolve-order: parent-first
EOF
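
With this layout the cluster exposes 2 TaskManagers × 4 slots = 8 task slots in total; the default parallelism of 2 means a single job occupies only a quarter of them, leaving room for several jobs to run side by side.
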
masters
sudo tee $FLINK_HOME/conf/masters << 'EOF'
hadoop1:8081
EOF
workers
sudo tee $FLINK_HOME/conf/workers << 'EOF'
hadoop2
hadoop3
EOF

4. Configure Hadoop Integration

# Copy the Hadoop config files into Flink's conf directory
sudo cp /usr/local/hadoop/etc/hadoop/core-site.xml $FLINK_HOME/conf/
sudo cp /usr/local/hadoop/etc/hadoop/hdfs-site.xml $FLINK_HOME/conf/
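
Note that Flink 1.14 no longer bundles Hadoop, so the hdfs:// scheme used by the checkpoint directory only works when Flink can see the Hadoop classes. The usual way to provide them, assuming the hadoop command is on the PATH:

# Expose the Hadoop classpath to Flink (appended to the profile script,
# which is distributed to the other nodes in the next step)
echo 'export HADOOP_CLASSPATH=$(hadoop classpath)' | sudo tee -a /etc/profile.d/flink.sh
source /etc/profile.d/flink.sh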

Distribute Flink to All Nodes

# Copy the installation to the TaskManager nodes
scp -r /opt/flink-1.14.6 hadoop2:/opt/
scp -r /opt/flink-1.14.6 hadoop3:/opt/

# Create the symlinks
ssh hadoop2 "ln -s /opt/flink-1.14.6 /opt/flink"
ssh hadoop3 "ln -s /opt/flink-1.14.6 /opt/flink"

# Distribute the environment variable script
scp /etc/profile.d/flink.sh hadoop2:/etc/profile.d/
scp /etc/profile.d/flink.sh hadoop3:/etc/profile.d/

# The profile script is picked up automatically on the next login shell;
# running "source" over ssh has no lasting effect, so no extra step is needed

Create the HDFS Directories (on hadoop1)

hdfs dfs -mkdir -p /flink/checkpoints
hdfs dfs -chmod 777 /flink/checkpoints
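
The savepoint commands later in this guide target hdfs://hadoop1:9000/flink/savepoints, so it is convenient to create that directory now as well:

hdfs dfs -mkdir -p /flink/savepoints
hdfs dfs -chmod 777 /flink/savepoints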

Start the Flink Cluster (on hadoop1)

# Start the cluster
$FLINK_HOME/bin/start-cluster.sh

# Start the history server (optional; see the note below)
$FLINK_HOME/bin/historyserver.sh start
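
The history server only has something to show if job archiving is configured. A minimal sketch for flink-conf.yaml (the archive path under /flink is an assumption; any HDFS directory works):

# flink-conf.yaml -- job archiving for the history server (example paths)
jobmanager.archive.fs.dir: hdfs://hadoop1:9000/flink/completed-jobs
historyserver.archive.fs.dir: hdfs://hadoop1:9000/flink/completed-jobs
historyserver.web.port: 8082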

Verify the Cluster Status

1. Check the Processes

# hadoop1 (JobManager)
jps | grep -E 'StandaloneSessionClusterEntrypoint|HistoryServer'

# hadoop2 and hadoop3 (TaskManager)
jps | grep TaskManager

2. Access the Web UI

http://192.168.16.219:8081
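
If a browser is not handy, the same information is available from the REST API; with the layout above, the /overview endpoint should report 2 registered TaskManagers and 8 slots:

# Expect "taskmanagers": 2 and "slots-total": 8
curl -s http://192.168.16.219:8081/overview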

3. Command-Line Verification

# List running jobs
$FLINK_HOME/bin/flink list

# Submit a test job to confirm the TaskManagers accept work
$FLINK_HOME/bin/flink run -m hadoop1:8081 $FLINK_HOME/examples/streaming/WordCount.jar

Run Test Jobs

1. Submit the WordCount Example

$FLINK_HOME/bin/flink run \
  -m hadoop1:8081 \
  $FLINK_HOME/examples/streaming/WordCount.jar

2. Submit the Socket Streaming Example

# Terminal 1 (on hadoop1): start the data source
nc -lk 9999

# Terminal 2: submit the job; pass --hostname because the job runs on a
# TaskManager node, where the default "localhost" would not reach nc
$FLINK_HOME/bin/flink run \
  -m hadoop1:8081 \
  $FLINK_HOME/examples/streaming/SocketWindowWordCount.jar \
  --hostname hadoop1 --port 9999
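
SocketWindowWordCount prints its counts to stdout on whichever TaskManager runs it, so the output lands in a .out file on hadoop2 or hadoop3 rather than in the submitting terminal:

# Third terminal: watch the job output on a TaskManager node
ssh hadoop2 'tail -f /opt/flink/log/flink-*-taskexecutor-*.out'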

Cluster Management Commands

Common Commands

# Start the cluster
$FLINK_HOME/bin/start-cluster.sh

# Stop the cluster
$FLINK_HOME/bin/stop-cluster.sh

# Start the history server
$FLINK_HOME/bin/historyserver.sh start

# Stop the history server
$FLINK_HOME/bin/historyserver.sh stop

Job Management

# Submit a job
$FLINK_HOME/bin/flink run -m hadoop1:8081 /path/to/job.jar

# Cancel a job
$FLINK_HOME/bin/flink cancel <jobid>

# Trigger a savepoint
$FLINK_HOME/bin/flink savepoint <jobid> hdfs://hadoop1:9000/flink/savepoints
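
A savepoint is only useful if a job is later restarted from it; the run command accepts the savepoint path with -s (the savepoint-XXXX directory name below is a placeholder for the path printed by the savepoint command):

# Resume a job from a savepoint
$FLINK_HOME/bin/flink run -m hadoop1:8081 \
  -s hdfs://hadoop1:9000/flink/savepoints/savepoint-XXXX \
  /path/to/job.jar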

Hadoop YARN Integration (optional)

1. Configure YARN Mode

# Point the YARN client at the Hadoop configuration
echo 'export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop' | sudo tee -a /etc/profile.d/flink.sh
source /etc/profile.d/flink.sh

# Test the YARN integration
$FLINK_HOME/bin/flink run-application -t yarn-application \
  -Djobmanager.memory.process.size=2048m \
  -Dtaskmanager.memory.process.size=4096m \
  $FLINK_HOME/examples/streaming/WordCount.jar
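
Application mode starts one cluster per job; for many short jobs a long-running YARN session may fit better. A sketch (the -jm/-tm sizes are illustrative):

# Start a detached YARN session
$FLINK_HOME/bin/yarn-session.sh -d -jm 2048m -tm 4096m

# Jobs submitted afterwards attach to the session by default
$FLINK_HOME/bin/flink run $FLINK_HOME/examples/streaming/WordCount.jar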

2. Common YARN Commands

# List YARN applications
yarn application -list

# Kill a Flink YARN application
yarn application -kill <applicationid>

Troubleshooting

1. Check the Logs

# JobManager log
tail -f $FLINK_HOME/log/flink-*-standalonesession-*.log

# TaskManager log
tail -f $FLINK_HOME/log/flink-*-taskexecutor-*.log

2. Common Issues and Fixes

Port conflicts
# Find the process holding a port
netstat -tunlp | grep <port>

HDFS permission problems
# Fix the HDFS permissions
hdfs dfs -chmod -R 777 /flink

Memory misconfiguration
# Raise the sizes in flink-conf.yaml
jobmanager.memory.process.size: 2048m
taskmanager.memory.process.size: 4096m
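
A TaskManager that starts but never shows up in the Web UI is often a connectivity problem; the RPC and REST ports from the configuration above must be reachable from the workers:

# From hadoop2/hadoop3: verify the JobManager ports are open
nc -zv hadoop1 6123
nc -zv hadoop1 8081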

Production Recommendations

1. Security Configuration

# flink-conf.yaml
security.ssl.enabled: true
security.ssl.keystore: /path/to/keystore.jks
security.ssl.truststore: /path/to/truststore.jks

2. High-Availability Configuration

# flink-conf.yaml
high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop1:9000/flink/ha/
high-availability.zookeeper.quorum: hadoop1:2181,hadoop2:2181,hadoop3:2181
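
ZooKeeper HA only pays off with at least one standby JobManager; standbys are declared in conf/masters, one host:webui-port per line. A sketch assuming hadoop2 doubles as the standby:

# conf/masters
hadoop1:8081
hadoop2:8081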

3. Monitoring Configuration

# flink-conf.yaml
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250-9260
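
Each JobManager and TaskManager process binds its own metrics endpoint on a free port within the 9250-9260 range, so the Prometheus scrape configuration should cover the whole range on all three hosts.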

4. Resource Isolation

# Create a dedicated user for Flink
sudo useradd -m -s /bin/bash flink
sudo chown -R flink:flink /opt/flink-1.14.6

Uninstall Flink

# Stop the services
$FLINK_HOME/bin/stop-cluster.sh
$FLINK_HOME/bin/historyserver.sh stop

# Remove the files
sudo rm -rf /opt/flink-1.14.6
sudo rm /opt/flink
sudo rm /etc/profile.d/flink.sh

# Clean up HDFS
hdfs dfs -rm -r /flink

Version Upgrade

# 1. Stop the current cluster
$FLINK_HOME/bin/stop-cluster.sh

# 2. Back up the configuration
cp -r $FLINK_HOME/conf /tmp/flink-conf-backup

# 3. Download the new version
wget https://archive.apache.org/dist/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz

# 4. Extract it and migrate the configuration (review the keys first: some change between versions)
sudo tar -zxvf flink-1.15.0-bin-scala_2.12.tgz -C /opt/
sudo cp /tmp/flink-conf-backup/* /opt/flink-1.15.0/conf/

# 5. Update the symlink
sudo ln -sfn /opt/flink-1.15.0 /opt/flink

# 6. Start the new cluster
$FLINK_HOME/bin/start-cluster.sh

With the steps above, you can deploy an Apache Flink 1.14.6 Standalone cluster on top of an existing Hadoop cluster, with checkpoints and savepoints stored in HDFS.

