Centos7安装Flink+Kafka+Zookeeper并允许外部访问

发布于:2025-06-26 ⋅ 阅读:(19) ⋅ 点赞:(0)

本文将手把手指导如何在 CentOS 7 系统中安装并配置 JDK、Apache Zookeeper、Apache Kafka 以及 Apache Flink,实现分布式流处理平台的基本搭建,并支持外部访问。

业务场景

随着互联网金融的发展,信用卡支付场景日益多样化,用户的交易行为呈现出高频、碎片化特征,欺诈行为也愈发隐蔽与智能化。传统的离线风控手段已难以及时识别异常交易,导致欺诈风险上升,信用卡机构面临严重的资金损失与用户信任危机。

为此,构建了一套基于 Apache Flink 的 实时反欺诈指标计算平台,以 Kafka 为数据传输中枢,结合 Flink 的流处理能力,实时处理交易数据、行为日志,提取用户行为特征,执行规则判断及模型计算,最终将欺诈风险评分结果快速写入 Redis,供风控系统和后台展示查询,实现毫秒级预警与实时干预。

该平台具备高吞吐、低延迟、可扩展、可插拔等特点,可支持多业务线并发接入,保障金融交易系统在复杂环境下的安全与稳定。

技术架构图

实时计算逻辑
Flink 集群
Kafka 集群
业务系统
规则匹配
特征提取
欺诈模型判断
Redis 实时指标库
Flink JobManager
Flink TaskManager 1
Flink TaskManager 2
Flink TaskManager N
Kafka Broker 1
Kafka Broker 2
Kafka Broker 3
Zookeeper 1
Zookeeper 2
Zookeeper 3
Kafka Producer
交易服务/风控系统
可视化大屏/风控后台
邮件/短信/钉钉告警

🧱 一、环境准备

✅ 1. 更新系统

sudo yum update -y

✅ 2. 安装 wget 和 unzip

sudo yum install wget unzip -y

☕ 二、安装 JDK(OpenJDK 8)

Flink、Kafka、Zookeeper 都需依赖 Java 环境。

安装命令:

sudo yum install java-1.8.0-openjdk-devel -y

验证安装:

java -version

输出应类似于:

openjdk version "1.8.0_xx"

🐘 三、安装 Zookeeper(Apache Zookeeper 3.8.4)

Zookeeper 是 Kafka 的依赖组件。

✅ 1. 下载并解压

cd /opt
wget https://downloads.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz
tar -zxvf apache-zookeeper-3.8.4-bin.tar.gz
mv apache-zookeeper-3.8.4-bin zookeeper

✅ 2. 创建配置文件

cd /opt/zookeeper
cp conf/zoo_sample.cfg conf/zoo.cfg

默认配置即可用于测试,也可调整如下:

tickTime=2000
dataDir=/opt/zookeeper/data
clientPort=2181

✅ 3. 创建数据目录并启动

mkdir /opt/zookeeper/data
bin/zkServer.sh start

✅ 4. 开放端口

sudo firewall-cmd --permanent --add-port=2181/tcp
sudo firewall-cmd --reload

🦄 四、安装 Kafka(Kafka 2.2.1 + Scala 2.12)

✅ 1. 下载并解压

cd /opt
wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
tar -zxvf kafka_2.12-2.2.1.tgz
mv kafka_2.12-2.2.1 kafka

✅ 2. 修改配置文件

编辑 config/server.properties

# 修改以下项
broker.id=0
# 服务器的ip
listeners=PLAINTEXT://192.168.206.133:9092
zookeeper.connect=localhost:2181
log.dirs=/opt/kafka/logs

✅ 3. 启动 Kafka

先启动 Zookeeper(如果未启动):

/opt/zookeeper/bin/zkServer.sh start

再启动 Kafka:

cd /opt/kafka
bin/kafka-server-start.sh -daemon config/server.properties

✅ 4. 开放 Kafka 端口

sudo firewall-cmd --permanent --add-port=9092/tcp
sudo firewall-cmd --reload

🌊 五、安装 Flink(Flink 1.18.0)

✅ 1. 下载并解压

cd /opt
wget https://downloads.apache.org/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz
tar -zxvf flink-1.18.0-bin-scala_2.12.tgz
mv flink-1.18.0 flink

✅ 2. 修改配置文件

编辑 conf/flink-conf.yaml

jobmanager.rpc.address: 0.0.0.0
rest.address: 0.0.0.0
rest.bind-address: 0.0.0.0
rest.port: 8081

如果部署为集群,可设置:

taskmanager.numberOfTaskSlots: 2
parallelism.default: 2

✅ 3. 启动 Flink

cd /opt/flink
bin/start-cluster.sh

✅ 4. 验证启动状态

jps

你应能看到:

StandaloneSessionClusterEntrypoint
TaskManagerRunner

✅ 5. 开放 Flink Web UI 端口

sudo firewall-cmd --permanent --add-port=8081/tcp
sudo firewall-cmd --reload

✅ 6. 访问 Flink

浏览器打开:

http://your_server_ip:8081/

📌 六、测试 Kafka 与 Flink 连通性

Flink 任务运行时需要的 Kafka 连接器类在远程 Flink 集群中不存在。需要在远程 Flink 集群安装 Kafka 连接器

# 下载 Kafka 连接器 JAR 文件
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/3.1.0-1.18/flink-connector-kafka-3.1.0-1.18.jar
cp flink-connector-kafka-3.1.0-1.18.jar /opt/flink/lib/

# 下载 flink-connector-base(如果没有)
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-base/1.18.0/flink-connector-base-1.18.0.jar
cp flink-connector-base-1.18.0.jar /opt/flink/lib/

# 下载 kafka-clients(如果版本不匹配)
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.6.0/kafka-clients-3.6.0.jar
cp kafka-clients-3.6.0.jar /opt/flink/lib/

在这里插入图片描述

联通测试方式一

如果你希望验证 Flink 能消费 Kafka 数据流,可使用以下命令在 Kafka 中创建 Topic 并发送测试数据,再用 Flink 示例任务消费它。

Kafka 创建 Topic:

bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092

Kafka 发送消息:

bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092

Flink 运行 KafkaSource 示例代码(你需要自行写 Job 或提供 JAR 包)。

联通测试方式二

package com.example.controller;

import com.example.config.FlinkConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

/**
 * Flink测试控制器
 */
@RestController
@RequestMapping("/api/flink")
@Slf4j
public class FlinkTestController {

    @Autowired
    private FlinkConfig flinkConfig;

    /**
     * 测试Flink连接
     */
    @PostMapping("/test-connection")
    public Map<String, Object> testFlinkConnection() {
        Map<String, Object> result = new HashMap<>();
        
        try {
            log.info("正在测试Flink连接到 {}:{}", 
                    flinkConfig.getRemote().getHost(), 
                    flinkConfig.getRemote().getPort());
            
            // 创建远程执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                    flinkConfig.getRemote().getHost(),
                    flinkConfig.getRemote().getPort()
            );
            
            if (env != null) {
                result.put("status", "success");
                result.put("message", "Flink连接测试成功");
                result.put("flinkHost", flinkConfig.getRemote().getHost());
                result.put("flinkPort", flinkConfig.getRemote().getPort());
                log.info("Flink连接测试成功");
            } else {
                result.put("status", "error");
                result.put("message", "无法创建Flink执行环境");
            }
            
        } catch (Exception e) {
            log.error("Flink连接测试失败", e);
            result.put("status", "error");
            result.put("message", "Flink连接测试失败: " + e.getMessage());
            result.put("error", e.getClass().getSimpleName());
        }
        
        return result;
    }
} 

在这里插入图片描述


✅ 七、总结

组件 默认端口 外部可访问配置
Zookeeper 2181 配置文件已支持
Kafka 9092 listeners=0.0.0.0:9092
Flink 8081 rest.address=0.0.0.0