In a Flink on Kubernetes environment, jobs can be deployed in one of two ways: Native Kubernetes or the Kubernetes Operator. This article walks through a deployment in Native Kubernetes application mode.
I. Environment preparation: create the Kubernetes service account and grant permissions
# Create the namespace
kubectl create ns flink
# Create the service account
kubectl create serviceaccount flink-service-account -n flink
# Bind the edit cluster role to the service account
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
II. Deploy the Flink HistoryServer
1. flink-historyserver-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-historyserver-configmap
namespace: flink
data:
# file format
flink-conf.yaml: |
execution.target: kubernetes-session
rest.address: localhost
rest.bind-address: localhost
jobmanager.bind-host: localhost
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1600m
jobmanager.execution.failover-strategy: region
taskmanager.bind-host: localhost
taskmanager.host: localhost
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
state.backend: filesystem
fs.allowed-fallback-filesystems: s3
s3.endpoint: http://127.0.0.1:9680
s3.access-key: sk
s3.secret-key: sk
s3.path.style.access: true
state.checkpoints.dir: s3://flink/flink-checkpoints
# HistoryServer
jobmanager.archive.fs.dir: s3://flink/completed-jobs/
historyserver.web.address: 0.0.0.0
historyserver.web.port: 8082
historyserver.archive.fs.dir: s3://flink/completed-jobs/
historyserver.archive.fs.refresh-interval: 10000
historyserver.archive.clean-expired-jobs: false
historyserver.archive.retained-jobs: -1
log4j.properties: |
# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
monitorInterval=30
# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = MainAppender
# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name = org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
logger.shaded_zookeeper.level = INFO
# Log all infos in the given file
appender.main.name = MainAppender
appender.main.type = RollingFile
appender.main.append = true
appender.main.fileName = ${sys:log.file}
appender.main.filePattern = ${sys:log.file}.%i
appender.main.layout.type = PatternLayout
appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.main.policies.type = Policies
appender.main.policies.size.type = SizeBasedTriggeringPolicy
appender.main.policies.size.size = 100MB
appender.main.policies.startup.type = OnStartupTriggeringPolicy
appender.main.strategy.type = DefaultRolloverStrategy
appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
log4j-console.properties: |
# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender
# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name = org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
# Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# Log all infos in the given rolling file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
# Flink Deployment Logging Overrides
# rootLogger.level = DEBUG
Apply the ConfigMap:
kubectl apply -f D:/work/code/data-project/test-project/flink-test/src/main/resources/flink-historyserver-configmap.yaml -n flink
2. flink-historyserver-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
namespace: flink
name: flink-historyserver
labels:
app: flink-historyserver
name: flink-historyserver
spec:
replicas: 1
selector:
matchLabels:
name: flink-historyserver
template:
metadata:
namespace: flink
labels:
app: flink-historyserver
name: flink-historyserver
spec:
containers:
- name: flink-historyserver
env:
- name: TZ
value: Asia/Shanghai
image: harbor.com.cn/apache/flink:1.16.2-java8-s3
command: ['sh', '-c']
args:
- '/docker-entrypoint.sh history-server'
ports:
- containerPort: 8082
volumeMounts:
- name: flink-historyserver-confmap
mountPath: /opt/flink/conf
volumes: # volume definitions
- name: flink-historyserver-confmap
configMap:
name: flink-historyserver-configmap
---
kind: Service
apiVersion: v1
metadata:
namespace: flink
name: flink-historyserver
spec:
type: NodePort
ports:
- port: 8082
nodePort: 31082
selector:
name: flink-historyserver
III. Deploy the Flink app
1. Build the initContainer image
The Flink app's job.jar has to end up inside the Flink base image. There are three common approaches:
a. Bake job.jar into the Docker image. Every change to job.jar then requires rebuilding the image, which is tedious on every iteration;
b. Upload job.jar to an HTTP server and download it during the initContainers phase. The inconvenience is that you have to run an HTTP file server;
c. Upload job.jar to a path on a PVC mounted by the Flink job, so the file is already present in the pod's volume. The inconvenience is that getting job.jar onto the PVC is awkward: you need a separate helper pod that mounts the PVC just to upload the file, because a standalone PVC cannot manage files by itself.
This article follows approach (b), adapted to S3: during the initContainers phase, an initContainer downloads job.jar from an S3 file server. The initContainer image runs a small Java program that downloads the artifacts the Flink app needs, and is built from open-jdk-s3.dockerfile:
docker build -t harbor.com.cn/apache/open-jdk-s3:8u342-slim -f open-jdk-s3.dockerfile .
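A minimal open-jdk-s3.dockerfile sketch, assuming the downloader class below is packaged as an executable fat jar and invoked as app.jar (the base image tag and the jar name are illustrative assumptions, not taken from the original setup):

# open-jdk-s3.dockerfile (sketch; base image tag and jar name are assumptions)
FROM openjdk:8u342-jre-slim
WORKDIR /
# The S3ArtifactFetcher class below, built as an executable fat jar together with
# the AWS S3 SDK, commons-lang3 and commons-io; the pod template runs it as: java -jar app.jar ...
COPY target/s3-artifact-fetcher.jar /app.jar

The downloader program built into that jar: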
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
// Artifact fetcher entry point (class name is illustrative); packaged as the app.jar used by the initContainer
public class S3ArtifactFetcher {
public static void main(String[] args) {
if (args == null || args.length == 0) {
args = new String[]{"http://127.0.0.1:9680", "ak", "sk", "flink", "flink-artifact/flink-demo-1.0-SNAPSHOT.jar"};
}
AmazonS3 amazonS3 = null;
try {
amazonS3 = getS3Client(args[0], args[1], args[2], null, null);
String bucketName = args[3];
String[] files = args[4].split(",");
for (String file : files) {
// The first key is the job jar (-> /flink-artifact/); any further keys are extra dependencies (-> /flink-usrlib/)
download(amazonS3, bucketName, file, file.equalsIgnoreCase(files[0]) ? "/flink-artifact/" : "/flink-usrlib/");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (amazonS3 != null) {
amazonS3.shutdown();
}
}
}
@Deprecated
public static AmazonS3 getS3Client(String endPoint, String accessKey, String secretKey, String protocol, String region) {
if (StringUtils.isNotBlank(region)) {
Regions clientRegion = Regions.fromName(region);
AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard().withCredentials(
new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)));
if (null != endPoint && !endPoint.trim().isEmpty()) {
builder = builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endPoint, clientRegion.getName()));
} else {
builder = builder.withRegion(clientRegion.getName());
}
return builder.build();
} else {
BasicAWSCredentials cred = new BasicAWSCredentials(accessKey, secretKey);
ClientConfiguration clientConfiguration = new ClientConfiguration();
if (StringUtils.isBlank(protocol) || "HTTP".equals(protocol)) {
clientConfiguration.setProtocol(Protocol.HTTP);
} else {
clientConfiguration.setProtocol(Protocol.HTTPS);
}
AmazonS3Client client = new AmazonS3Client(cred, clientConfiguration);
client.setEndpoint(endPoint);
return client;
}
}
// Download a single object from the bucket into the given local directory
private static void download(AmazonS3 amazonS3, String bucketName, String objectKey, String dir) {
FileOutputStream fos = null;
InputStream is = null;
try {
boolean exist = amazonS3.doesObjectExist(bucketName, objectKey);
if (!exist) {
// Skip missing keys; keep the shared client open so the remaining downloads can proceed
System.out.println(objectKey + " not exist");
return;
}
String fileName = objectKey;
if (fileName.indexOf("/") > 0) {
fileName = fileName.substring(fileName.lastIndexOf("/") + 1);
}
File file = new File(dir + fileName);
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}
if (file.exists()) {
file.delete();
}
S3Object object = amazonS3.getObject(new GetObjectRequest(bucketName, objectKey));
is = object.getObjectContent();
fos = new FileOutputStream(file);
byte[] b = new byte[1024];
int length;
while ((length = is.read(b)) > 0) {
fos.write(b, 0, length);
}
fos.flush();
System.out.println(Object + " download ok");
} catch (Exception e) {
e.printStackTrace();
} finally {
IOUtils.closeQuietly(fos);
IOUtils.closeQuietly(is);
}
}
}
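The initContainer in the pod template below invokes this program as: java -jar app.jar <endpoint> <access-key> <secret-key> <bucket> <object-keys>. The first comma-separated key (the job jar) is written to /flink-artifact/ and any further keys to /flink-usrlib/, directories that the main Flink container mounts as well.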
2. Build the Flink image
docker build -t harbor.com.cn/apache/flink:1.16.2-java8-s3 -f flink.dockerfile .
flink.dockerfile:
FROM harbor.com.cn/apache/flink:1.16.2-java8
USER flink
RUN mkdir -p /opt/flink/plugins/s3-fs-presto
RUN cp /opt/flink/opt/flink-s3-fs-presto-*.jar /opt/flink/plugins/s3-fs-presto
RUN rm -rf /opt/flink/lib/*.jar
COPY *.jar /opt/flink/lib/
#COPY log4j.properties /opt/flink/conf/
3. Submit the Flink job to Kubernetes with the Java API
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.util.FlinkUserCodeClassLoaders;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
public class FlinkSubmit {
public static void main(String[] args) throws Exception {
// Create the Flink configuration
Configuration configuration = new Configuration();
// Kubernetes-related settings
configuration.setString(KubernetesConfigOptions.CLUSTER_ID, "flink-cluster-1"); // at most 45 characters, lowercase alphanumerics and '-'
// configuration.set(KubernetesConfigOptions.CONTEXT,"user-wuhank8s");
configuration.setString(KubernetesConfigOptions.KUBE_CONFIG_FILE, "C:/Users/lenovo/.kube/config");
configuration.setString(KubernetesConfigOptions.NAMESPACE, "flink"); // target namespace
configuration.setString(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT, "flink-service-account"); // service account used by the JobManager pod
configuration.setString(KubernetesConfigOptions.CONTAINER_IMAGE, "harbor.com.cn/apache/flink:1.16.2-java8-s3");
configuration.set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY, KubernetesConfigOptions.ImagePullPolicy.Always);
configuration.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_NODE_PORT_ADDRESS_TYPE, KubernetesConfigOptions.NodePortAddressType.InternalIP); // which node address to advertise for the NodePort REST service; the default is the node's external IP
configuration.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE, KubernetesConfigOptions.ServiceExposedType.NodePort); // expose the REST service as a NodePort
configuration.set(KubernetesConfigOptions.KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES, 1);
configuration.setString(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName()); // deploy in application mode
configuration.set(PipelineOptions.JARS, Collections.singletonList("local:///opt/flink/artifacts/flink-demo-1.0-SNAPSHOT.jar"));
configuration.setString(PipelineOptions.NAME, "flinkapp:4");
configuration.setString(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, "7dd30d02daad6ecc2f286866f79bd448");
configuration.setString(CoreOptions.FLINK_JVM_OPTIONS, "-Dtest.a=test.a -Dflink_job_name=flinkapp:4");
// org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME log4j-console.properties
configuration.setString(DeploymentOptionsInternal.CONF_DIR, "D:/work/code/data-project/test-project/flink-test/src/main/resources/flink-log");
configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2 * 1024));
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2 * 1024));
configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, Integer.parseInt("1"));
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, Integer.parseInt("1"));
configuration.setString(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
configuration.setBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER, Boolean.FALSE);
configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region");
configuration.setString(JobManagerOptions.ARCHIVE_DIR, "s3://flink/completed-jobs/");
configuration.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0L);
configuration.setString(StateBackendOptions.STATE_BACKEND, "filesystem");
configuration.setString(CoreOptions.ALLOWED_FALLBACK_FILESYSTEMS, "s3");
configuration.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "s3");
configuration.setString("s3.endpoint", "http://127.0.0.1:9680");
configuration.setString("s3.flink.bucket", "flink");
configuration.setString("s3.path.style.access", "true");
configuration.setString("s3.access-key", "sk");
configuration.setString("s3.secret-key", "sk");
configuration.setString(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
configuration.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "s3://flink/state/checkpoint");
configuration.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, "s3://flink/state/savepoints");
configuration.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
configuration.set(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT, CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(5));
configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT, Duration.ofMinutes(4));
configuration.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 10);
configuration.setString("jobstore.expiration-time", "1");
configuration.setString("jobstore.max-capacity", "1");
configuration.setString("historyserver.web.address", "10.32.123.27");
configuration.setString("historyserver.web.port", "31082");
configuration.setString("historyserver.archive.fs.dir", "s3://flink/completed-jobs/");
configuration.setString("historyserver.archive.fs.refresh-interval", "10000");
// configuration.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName()); // alternatively, deploy in session mode
configuration.setString(KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE, Objects.requireNonNull(FlinkSubmit.class.getClassLoader().getResource("flink-pod-template.yaml")).getPath());
// Create the KubernetesClusterClientFactory
KubernetesClusterClientFactory factory = new KubernetesClusterClientFactory();
// Create the KubernetesClusterDescriptor
try (KubernetesClusterDescriptor descriptor = factory.createClusterDescriptor(configuration)) {
// Deploy the Flink application cluster
org.apache.flink.client.deployment.ClusterSpecification clusterSpecification = factory.getClusterSpecification(configuration);
ApplicationConfiguration appConfig = new ApplicationConfiguration(new String[]{}, "com.test.TestWindow");
ClusterClientProvider<String> clusterClientProvider = descriptor.deployApplicationCluster(clusterSpecification, appConfig);
// ClusterClientProvider<String> clusterClientProvider = descriptor.deploySessionCluster(clusterSpecification);
// Other operations can be performed here, e.g. submitting or inspecting jobs
ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient();
String webInterfaceURL = clusterClient.getWebInterfaceURL();
System.out.println("webInterfaceURL = " + webInterfaceURL);
// Is each Kubernetes application cluster isolated from the others?
// After deployment, check: does the job appear in the history server, do the JobManager/TaskManager pods exist, then query the job status
// After cancellation, check: does the Deployment still exist
// Clean up checkpoints older than 7 days
} catch (Exception e) {
e.printStackTrace();
}
}
}
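For the checks noted in the comments above (what to verify after deployment or cancellation), the same descriptor can re-attach to the running application cluster. A minimal sketch, assuming the cluster id and the fixed job id used in FlinkSubmit (the class name FlinkJobStatus is illustrative):

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
public class FlinkJobStatus {
    public static void main(String[] args) throws Exception {
        // Point at the same cluster that FlinkSubmit deployed
        Configuration configuration = new Configuration();
        configuration.setString(KubernetesConfigOptions.CLUSTER_ID, "flink-cluster-1");
        configuration.setString(KubernetesConfigOptions.NAMESPACE, "flink");
        configuration.setString(KubernetesConfigOptions.KUBE_CONFIG_FILE, "C:/Users/lenovo/.kube/config");
        KubernetesClusterClientFactory factory = new KubernetesClusterClientFactory();
        try (KubernetesClusterDescriptor descriptor = factory.createClusterDescriptor(configuration);
             ClusterClient<String> client = descriptor.retrieve("flink-cluster-1").getClusterClient()) {
            // Query the job submitted with the fixed job id above
            JobID jobId = JobID.fromHexString("7dd30d02daad6ecc2f286866f79bd448");
            JobStatus status = client.getJobStatus(jobId).get();
            System.out.println("job status = " + status);
            // Cancelling the job tears the application cluster down; the archived job
            // then shows up in the history server deployed in section II.
            // client.cancel(jobId).get();
        }
    }
}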
flink-pod-template.yaml
apiVersion: v1
kind: Pod
metadata:
name: flink-pod-template
spec:
# restartPolicy: Always / OnFailure / Never
initContainers:
- name: artifacts-fetcher
image: harbor.com.cn/apache/open-jdk-s3:8u342-slim
imagePullPolicy: Always
# Fetch the user jars from S3 with the downloader image built above
command: [ 'java', '-jar', 'app.jar', 'http://127.0.0.1:9680', 'ak', 'sk', 'flink', 'flink-artifact/flink-demo-1.0-SNAPSHOT.jar']
volumeMounts:
- mountPath: /flink-artifact
name: flink-artifact
- mountPath: /flink-usrlib
name: flink-usrlib
containers:
# Do not change the main container name
- name: flink-main-container
env:
- name: TZ
value: Asia/Shanghai
- name: FLINK_NAME
value: test
image: harbor.com.cn/apache/flink:1.16.2-java8-s3
resources:
requests:
ephemeral-storage: 2048Mi
limits:
ephemeral-storage: 2048Mi
volumeMounts:
- mountPath: /opt/flink/volumes/hostpath
name: flink-volume-hostpath
- mountPath: /opt/flink/log
name: flink-logs
- mountPath: /opt/flink/artifacts
name: flink-artifact
- mountPath: /opt/flink/lib/extr-lib
name: flink-usrlib
hostAliases:
- ip: "127.0.0.1"
hostnames:
- "kafka.test.com"
- "zk.test.com"
volumes:
- name: flink-volume-hostpath
hostPath:
path: /tmp
type: Directory
- name: flink-logs
emptyDir: { }
- name: flink-artifact
emptyDir: { }
- name: flink-usrlib
emptyDir: { }
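The emptyDir volumes are what tie the pieces together: the initContainer writes the job jar into flink-artifact and any extra dependency jars into flink-usrlib, the main container mounts them at /opt/flink/artifacts and /opt/flink/lib/extr-lib, and PipelineOptions.JARS in FlinkSubmit points at local:///opt/flink/artifacts/flink-demo-1.0-SNAPSHOT.jar, so the user jar reaches the JobManager and TaskManagers without being baked into the Flink image.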