Deploying and Running Flink via Native Kubernetes


In a Flink on Kubernetes environment, jobs can be deployed in two ways: Native Kubernetes and the Kubernetes Operator. This article walks through deploying and running a job in Native Kubernetes application mode.

I. Environment preparation: create the Kubernetes service account and grant permissions
# Create the namespace
kubectl create ns flink
# Create the service account
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions (bind the edit ClusterRole to the service account)
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
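
To sanity-check the binding (optional), kubectl can impersonate the service account:

kubectl auth can-i create pods --as=system:serviceaccount:flink:flink-service-account -n flink
kubectl auth can-i create deployments --as=system:serviceaccount:flink:flink-service-account -n flink
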
II. Deploy the Flink history server

1. flink-historyserver-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-historyserver-configmap
  namespace: flink
data:
  # file format
  flink-conf.yaml: |
    execution.target: kubernetes-session
    rest.address: localhost
    rest.bind-address: localhost
    jobmanager.bind-host: localhost
    jobmanager.rpc.address: localhost
    jobmanager.rpc.port: 6123
    jobmanager.memory.process.size: 1600m
    jobmanager.execution.failover-strategy: region
    taskmanager.bind-host: localhost
    taskmanager.host: localhost
    taskmanager.memory.process.size: 1728m
    taskmanager.numberOfTaskSlots: 1
    parallelism.default: 1
    state.backend: filesystem
    fs.allowed-fallback-filesystems: s3
    s3.endpoint: http://127.0.0.1:9680
    s3.access-key: sk
    s3.secret-key: sk
    s3.path.style.access: true
    state.checkpoints.dir: s3://flink/flink-checkpoints
    # HistoryServer
    jobmanager.archive.fs.dir: s3://flink/completed-jobs/
    historyserver.web.address: 0.0.0.0
    historyserver.web.port: 8082
    historyserver.archive.fs.dir: s3://flink/completed-jobs/
    historyserver.archive.fs.refresh-interval: 10000
    historyserver.archive.clean-expired-jobs: false
    historyserver.archive.retained-jobs: -1
  log4j.properties: |
    # Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
    monitorInterval=30
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.file.ref = MainAppender
    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
    logger.shaded_zookeeper.level = INFO
    # Log all infos in the given file
    appender.main.name = MainAppender
    appender.main.type = RollingFile
    appender.main.append = true
    appender.main.fileName = ${sys:log.file}
    appender.main.filePattern = ${sys:log.file}.%i
    appender.main.layout.type = PatternLayout
    appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.main.policies.type = Policies
    appender.main.policies.size.type = SizeBasedTriggeringPolicy
    appender.main.policies.size.size = 100MB
    appender.main.policies.startup.type = OnStartupTriggeringPolicy
    appender.main.strategy.type = DefaultRolloverStrategy
    appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
  log4j-console.properties: |
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
     
    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
     
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
     
    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
     
    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
     
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
     
    # Flink Deployment Logging Overrides
    # rootLogger.level = DEBUG

kubectl apply -f D:/work/code/data-project/test-project/flink-test/src/main/resources/flink-historyserver-configmap.yaml -n flink
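
Confirm the ConfigMap was created and lists the three expected keys (flink-conf.yaml, log4j.properties, log4j-console.properties):

kubectl -n flink describe configmap flink-historyserver-configmap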

2. flink-historyserver-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: flink
  name: flink-historyserver
  labels:
    app: flink-historyserver
    name: flink-historyserver
spec:
  replicas: 1
  selector:
    matchLabels:
      name: flink-historyserver
  template:
    metadata:
      namespace: flink
      labels:
        app: flink-historyserver
        name: flink-historyserver
    spec:
      containers:
        - name: flink-historyserver
          env:
            - name: TZ
              value: Asia/Shanghai
          image: harbor.com.cn/apache/flink:1.16.2-java8-s3
          command: ['sh', '-c']
          args:
            - '/docker-entrypoint.sh history-server'
          ports:
            - containerPort: 8082
          volumeMounts:
            - name: flink-historyserver-confmap
              mountPath: /opt/flink/conf
      volumes:  # mount the history server ConfigMap as the Flink conf directory
        - name: flink-historyserver-confmap
          configMap:
            name: flink-historyserver-configmap
---
kind: Service
apiVersion: v1
metadata:
  namespace: flink
  name: flink-historyserver
spec:
  type: NodePort
  ports:
    - port: 8082
      nodePort: 31082
  selector:
    name: flink-historyserver
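
Apply the manifest (the file name below is an assumption; use whatever the Deployment/Service above is saved as), after which the history server UI is reachable on any node at NodePort 31082:

kubectl apply -f flink-historyserver-deployment.yaml -n flink
kubectl -n flink get pods -l name=flink-historyserver
kubectl -n flink get svc flink-historyserver
# UI: http://<node-ip>:31082
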
III. Deploy the Flink application

1. Build the initContainer image

The Flink application's job.jar has to be made available inside the Flink base image (or on the Pod); three common approaches are:

a. Bake job.jar into the Docker image. Every change to job.jar then forces an image rebuild, which quickly becomes tedious.
b. Upload job.jar to an HTTP/object-storage server and download it during the initContainers phase. The drawback is that you have to run a file server.
c. Upload job.jar to a PVC mounted by the Flink job, so the jar is already present on the Pod's volume. The drawback is that getting job.jar onto the PVC is awkward: you need a separate helper Pod that mounts the PVC just to copy the file in, because a bare PVC offers no way to manage files.

This article follows approach b: during the initContainers phase, a small Java program (packaged as app.jar in a dedicated image) downloads job.jar and any other resource files the Flink app needs from an S3-compatible file server. The image is built with open-jdk-s3.dockerfile; a sketch of the Dockerfile and the downloader source follow the build command below:

docker build -t harbor.com.cn/apache/open-jdk-s3:8u342-slim -f open-jdk-s3.dockerfile .
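
The article does not show open-jdk-s3.dockerfile itself; a minimal sketch, assuming a JRE 8 slim base image and a pre-built shaded jar named app.jar that contains the downloader class below, might look like this:

# open-jdk-s3.dockerfile (sketch; base image tag and jar name are assumptions)
FROM openjdk:8u342-jre-slim
WORKDIR /opt/app
# app.jar bundles the AWS S3 SDK plus the downloader's main class
COPY app.jar /opt/app/app.jar

The downloader source:
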
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
 
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
 
// Class declaration and imports added for completeness; the class name is illustrative,
// the original article only shows the method bodies.
public class S3Downloader {
 
    // args: endpoint, access key, secret key, bucket, comma-separated object keys
    public static void main(String[] args) {
        if (args == null || args.length == 0) {
            args = new String[]{"http://127.0.0.1:9680", "ak", "sk", "flink", "flink-artifact/flink-demo-1.0-SNAPSHOT.jar"};
        }
        AmazonS3 amazonS3 = null;
        try {
            amazonS3 = getS3Client(args[0], args[1], args[2], null, null);
            String buketName = args[3];
            String[] files = args[4].split(",");
            for (String file : files) {
                download(amazonS3, buketName, file, file.equalsIgnoreCase(files[0]) ? "/flink-artifact/" : "/flink-usrlib/");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (amazonS3 != null) {
                amazonS3.shutdown();
            }
        }
    }
 
    @Deprecated
    public static AmazonS3 getS3Client(String endPoint, String accessKey, String secretKey, String protocol, String region) {
        if (StringUtils.isNotBlank(region)) {
            Regions clientRegion = Regions.fromName(region);
            AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard().withCredentials(
                    new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)));
            if (null != endPoint && !endPoint.trim().isEmpty()) {
                builder = builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endPoint, clientRegion.getName()));
            } else {
                builder = builder.withRegion(clientRegion.getName());
            }
            return builder.build();
        } else {
            BasicAWSCredentials cred = new BasicAWSCredentials(accessKey, secretKey);
            ClientConfiguration clientConfiguration = new ClientConfiguration();
            if (StringUtils.isBlank(protocol) || "HTTP".equals(protocol)) {
                clientConfiguration.setProtocol(Protocol.HTTP);
            } else {
                clientConfiguration.setProtocol(Protocol.HTTPS);
            }
            AmazonS3Client client = new AmazonS3Client(cred, clientConfiguration);
            client.setEndpoint(endPoint);
            return client;
        }
    }
 
    private static void download(AmazonS3 amazonS3, String buketName, String Object, String dir) {
        FileOutputStream fos = null;
        InputStream is = null;
        try {
            boolean exist = amazonS3.doesObjectExist(buketName, Object);
            if (!exist) {
                System.out.println(Object + " not exist");
                amazonS3.shutdown();
                return;
            }
            String fileName = Object;
            if (fileName.indexOf("/") > 0) {
                fileName = fileName.substring(fileName.lastIndexOf("/") + 1);
            }
            File file = new File(dir + fileName);
            if (!file.getParentFile().exists()) {
                file.getParentFile().mkdirs();
            }
            if (file.exists()) {
                file.delete();
            }
            S3Object object = amazonS3.getObject(new GetObjectRequest(buketName, Object));
            is = object.getObjectContent();
            fos = new FileOutputStream(file);
            byte[] b = new byte[1024];
            int length;
            while ((length = is.read(b)) > 0) {
                fos.write(b, 0, length);
            }
            fos.flush();
            System.out.println(Object + " download ok");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeQuietly(fos);
            IOUtils.closeQuietly(is);
        }
    }
 
}
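
Argument convention (matching the init container command in the pod template further below): endpoint, access key, secret key, bucket, then a comma-separated list of object keys; the first key is written to /flink-artifact/ and any remaining keys to /flink-usrlib/. For example:

java -jar app.jar http://127.0.0.1:9680 ak sk flink flink-artifact/flink-demo-1.0-SNAPSHOT.jar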

2. Build the Flink image

docker build -t harbor.com.cn/apache/flink:1.16.2-java8-s3 -f flink.dockerfile .
FROM harbor.com.cn/apache/flink:1.16.2-java8
USER flink
# Enable the presto S3 filesystem by copying it from opt/ into plugins/
RUN mkdir -p /opt/flink/plugins/s3-fs-presto
RUN cp /opt/flink/opt/flink-s3-fs-presto-*.jar /opt/flink/plugins/s3-fs-presto
# Replace the default lib jars with the jars present in the build context
RUN rm -rf /opt/flink/lib/*.jar
COPY *.jar /opt/flink/lib/
#COPY log4j.properties /opt/flink/conf/
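
A quick sanity check of the resulting image (the official Flink entrypoint passes unrecognized commands straight through to exec, so this just lists the plugin directory):

docker run --rm harbor.com.cn/apache/flink:1.16.2-java8-s3 ls /opt/flink/plugins/s3-fs-presto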

3. Submit the Flink job to Kubernetes via the Java API

import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.util.FlinkUserCodeClassLoaders;
 
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
 
public class FlinkSubmit {
 
    public static void main(String[] args) throws Exception {
        // Create the Flink configuration object
        Configuration configuration = new Configuration();
        // Kubernetes-related settings
        configuration.setString(KubernetesConfigOptions.CLUSTER_ID, "flink-cluster-1");// at most 45 characters, lowercase alphanumeric and '-'
        // configuration.set(KubernetesConfigOptions.CONTEXT,"user-wuhank8s");
        configuration.setString(KubernetesConfigOptions.KUBE_CONFIG_FILE, "C:/Users/lenovo/.kube/config");
        configuration.setString(KubernetesConfigOptions.NAMESPACE, "flink"); // namespace
        configuration.setString(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT, "flink-service-account");// service account used by the JobManager
        configuration.setString(KubernetesConfigOptions.CONTAINER_IMAGE, "harbor.com.cn/apache/flink:1.16.2-java8-s3");
        configuration.set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY, KubernetesConfigOptions.ImagePullPolicy.Always);
        configuration.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_NODE_PORT_ADDRESS_TYPE, KubernetesConfigOptions.NodePortAddressType.InternalIP);// which node address the REST service advertises; the default is the node's external IP
        configuration.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE, KubernetesConfigOptions.ServiceExposedType.NodePort);// expose the REST service as a NodePort
        configuration.set(KubernetesConfigOptions.KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES, 1);
 
        configuration.setString(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());// deployment target: application mode
        configuration.set(PipelineOptions.JARS, Collections.singletonList("local:///opt/flink/artifacts/flink-demo-1.0-SNAPSHOT.jar"));
        configuration.setString(PipelineOptions.NAME, "flinkapp:4");
        configuration.setString(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, "7dd30d02daad6ecc2f286866f79bd448");
        configuration.setString(CoreOptions.FLINK_JVM_OPTIONS, "-Dtest.a=test.a -Dflink_job_name=flinkapp:4");
        //  org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME  log4j-console.properties
        configuration.setString(DeploymentOptionsInternal.CONF_DIR, "D:/work/code/data-project/test-project/flink-test/src/main/resources/flink-log");
 
        configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2 * 1024));
        configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2 * 1024));
        configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, Integer.parseInt("1"));
        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, Integer.parseInt("1"));
 
        configuration.setString(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
        configuration.setBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER, Boolean.FALSE);
        configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region");
        configuration.setString(JobManagerOptions.ARCHIVE_DIR, "s3://flink/completed-jobs/");
        configuration.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0L);
 
        configuration.setString(StateBackendOptions.STATE_BACKEND, "filesystem");
        configuration.setString(CoreOptions.ALLOWED_FALLBACK_FILESYSTEMS, "s3");
        configuration.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "s3");
        configuration.setString("s3.endpoint", "http://127.0.0.1:9680");
        configuration.setString("s3.flink.bucket", "flink");
        configuration.setString("s3.path.style.access", "true");
        configuration.setString("s3.access-key", "sk");
        configuration.setString("s3.secret-key", "sk");
 
        configuration.setString(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
        configuration.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "s3://flink/state/checkpoint");
        configuration.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, "s3://flink/state/savepoints");
        configuration.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
        configuration.set(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT, CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(5));
        configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT, Duration.ofMinutes(4));
        configuration.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 10);
 
        configuration.setString("jobstore.expiration-time", "1");
        configuration.setString("jobstore.max-capacity", "1");
        configuration.setString("historyserver.web.address", "10.32.123.27");
        configuration.setString("historyserver.web.port", "31082");
        configuration.setString("historyserver.archive.fs.dir", "s3://flink/completed-jobs/");
        configuration.setString("historyserver.archive.fs.refresh-interval", "10000");
 
//        configuration.set(DeploymentOptions.TARGET,KubernetesDeploymentTarget.SESSION.getName());// alternative: session mode
        configuration.setString(KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE, Objects.requireNonNull(FlinkSubmit.class.getClassLoader().getResource("flink-pod-template.yaml")).getPath());
        // Create the KubernetesClusterClientFactory
        KubernetesClusterClientFactory factory = new KubernetesClusterClientFactory();
        // Create the KubernetesClusterDescriptor
        try (KubernetesClusterDescriptor descriptor = factory.createClusterDescriptor(configuration)) {
            // Deploy the Flink application cluster
            org.apache.flink.client.deployment.ClusterSpecification clusterSpecification = factory.getClusterSpecification(configuration);
            ApplicationConfiguration appConfig = new ApplicationConfiguration(new String[]{}, "com.test.TestWindow");
 
            ClusterClientProvider<String> clusterClientProvider = descriptor.deployApplicationCluster(clusterSpecification, appConfig);
//            ClusterClientProvider<String> clusterClientProvider = descriptor.deploySessionCluster(clusterSpecification);
            // Further operations can be performed here, e.g. querying job status
            ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient();
            String webInterfaceURL = clusterClient.getWebInterfaceURL();
            System.out.println("webInterfaceURL = " + webInterfaceURL);
            // Is each Kubernetes application cluster fully isolated?
            // After deployment, check: history server entry, JobManager/TaskManager pods, then query the job status
            // After cancellation, check whether the Deployment is removed
            // Clean up checkpoints older than 7 days
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
}
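
After deployApplicationCluster returns, Flink's native Kubernetes integration creates a JobManager Deployment named after the cluster-id (flink-cluster-1 here) plus its rest/internal Services; the names and labels below follow Flink's native Kubernetes conventions and are shown as an assumption:

kubectl -n flink get deployment flink-cluster-1
kubectl -n flink get pods -l app=flink-cluster-1
kubectl -n flink get svc | grep flink-cluster-1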

flink-pod-template.yaml

apiVersion: v1
kind: Pod
metadata:
  name: flink-pod-template
spec:
#  restartPolicy: Always | OnFailure | Never
  initContainers:
    - name: artifacts-fetcher
      image: harbor.com.cn/apache/open-jdk-s3:8u342-slim
      imagePullPolicy: Always
      # Fetch the user jars from S3 using the downloader image built above
      command: [ 'java', '-jar', 'app.jar', 'http://127.0.0.1:9680', 'ak', 'sk', 'flink', 'flink-artifact/flink-demo-1.0-SNAPSHOT.jar']
      volumeMounts:
        - mountPath: /flink-artifact
          name: flink-artifact
        - mountPath: /flink-usrlib
          name: flink-usrlib
  containers:
    # Do not change the main container name
    - name: flink-main-container
      env:
        - name: TZ
          value: Asia/Shanghai
        - name: FLINK_NAME
          value: test
      image: harbor.com.cn/apache/flink:1.16.2-java8-s3
      resources:
        requests:
          ephemeral-storage: 2048Mi
        limits:
          ephemeral-storage: 2048Mi
      volumeMounts:
        - mountPath: /opt/flink/volumes/hostpath
          name: flink-volume-hostpath
        - mountPath: /opt/flink/log
          name: flink-logs
        - mountPath: /opt/flink/artifacts
          name: flink-artifact
        - mountPath: /opt/flink/lib/extr-lib
          name: flink-usrlib
  hostAliases:
    - ip: "127.0.0.1"
      hostnames:
        - "kafka.test.com"
        - "zk.test.com"
  volumes:
    - name: flink-volume-hostpath
      hostPath:
        path: /tmp
        type: Directory
    - name: flink-logs
      emptyDir: { }
    - name: flink-artifact
      emptyDir: { }
    - name: flink-usrlib
      emptyDir: { }
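
If the submitted job fails because the jar is missing, the first place to look is the init container's log on the Flink pod:

kubectl -n flink logs <pod-name> -c artifacts-fetcher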

