1. Purpose of the Flink HistoryServer
The HistoryServer makes it possible to query the statistics of completed jobs even after they have stopped running (and the Flink cluster has been shut down). It also exposes a REST API that accepts HTTP requests and responds with JSON. When a Flink job stops, the JobManager archives the finished job's statistics, and the HistoryServer process can then serve queries against that archive, for example the last completed checkpoint or the configuration the job ran with.
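For example, once the HistoryServer is up, its REST API can be queried with plain curl. A minimal sketch, assuming port 8082 (matching historyserver.web.port in the configuration below); the host placeholder depends on how you expose the service, and <job-id> stands in for a real archived job ID:

# List all archived jobs; the response is JSON
[root@k8s-demo001 ~]# curl http://<historyserver-host>:8082/jobs/overview
# Checkpoint statistics for one archived job
[root@k8s-demo001 ~]# curl http://<historyserver-host>:8082/jobs/<job-id>/checkpoints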
2. Deploying the Flink HistoryServer
1. Create the Flink HistoryServer PVC that will hold the Flink job archive data.
[root@k8s-demo001 ~]# cat flink-historyserver-pvc.yaml
# PersistentVolumeClaim that persists the Flink HistoryServer archive data
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-historyserver-pvc  # HistoryServer PVC name
  namespace: flink               # namespace the PVC belongs to
spec:
  storageClassName: nfs-storage  # StorageClass name; change to your actual SC
  accessModes:
    - ReadWriteMany              # ReadWriteMany access mode
  resources:
    requests:
      storage: 1Gi               # storage capacity; adjust to your needs
[root@k8s-demo001 ~]# kubectl apply -f flink-historyserver-pvc.yaml
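Before moving on, it is worth checking that the claim actually bound (its STATUS column should read Bound), assuming the nfs-storage provisioner is healthy:

[root@k8s-demo001 ~]# kubectl get pvc flink-historyserver-pvc -n flink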
2. Configure the HistoryServer by creating a ConfigMap that holds flink-conf.yaml and the Log4j configuration.
[root@k8s-demo001 ~]# cat flink-historyserver-conf.yaml
kind: ConfigMap
apiVersion: v1
metadata:
  name: flink-historyserver-conf
  namespace: flink
  annotations:
    kubesphere.io/creator: admin
data:
  flink-conf.yaml: |
    blob.server.port: 6124
    kubernetes.jobmanager.annotations: flinkdeployment.flink.apache.org/generation:2
    kubernetes.jobmanager.replicas: 1
    kubernetes.jobmanager.cpu: 1.0
    $internal.flink.version: v1_13
    kubernetes.taskmanager.cpu: 1.0
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    kubernetes.service-account: flink
    kubernetes.cluster-id: flink-historyserver
    kubernetes.container.image: flink-hdfs:1.13.6
    parallelism.default: 2
    kubernetes.namespace: flink
    taskmanager.numberOfTaskSlots: 2
    kubernetes.rest-service.exposed.type: ClusterIP
    kubernetes.operator.reconcile.interval: 15 s
    kubernetes.operator.metrics.reporter.slf4j.interval: 5 MINUTE
    kubernetes.operator.metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory
    jobmanager.memory.process.size: 1024m
    taskmanager.memory.process.size: 1024m
    kubernetes.internal.jobmanager.entrypoint.class: org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
    kubernetes.pod-template-file: /tmp/flink_op_generated_podTemplate_17272077926352838674.yaml
    execution.target: kubernetes-session
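    # The four settings below are what actually drive the HistoryServer:
    # the JobManager writes completed-job archives to jobmanager.archive.fs.dir,
    # and the HistoryServer polls historyserver.archive.fs.dir every
    # historyserver.archive.fs.refresh-interval milliseconds, serving the results
    # on historyserver.web.port. Since both use the file:// scheme, the PVC from
    # step 1 is assumed to be mounted at /opt/flink/flink_history in both the
    # JobManager and HistoryServer pods; otherwise archives are never picked up.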
    jobmanager.archive.fs.dir: file:///opt/flink/flink_history
    historyserver.archive.fs.dir: file:///opt/flink/flink_history
    historyserver.archive.fs.refresh-interval: 10000
    historyserver.web.port: 8082
    web.tmpdir: /opt/flink/webupload
    web.upload.dir: /opt/flink/webupload
    web.cancel.enable: false
    internal.cluster.execution-mode: NORMAL
    queryable-state.proxy.ports: 6125
    state.checkpoints.dir: file:///opt/flink/checkpoints
  log4j.properties: |
    # Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
    monitorInterval=30
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.file.ref = MainAppender
    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
    logger.shaded_zookeeper.level = INFO
    # Log all infos in the given file
    appender.main.name = MainAppender
    appender.main.type = RollingFile
    appender.main.append = true
    appender.main.fileName = ${sys:log.file}
    appender.main.filePattern = ${sys:log.file}.%i
    appender.main.layout.type = PatternLayout
    appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.main.policies.type = Policies
    appender.main.policies.size.type = SizeBasedTriggeringPolicy
    appender.main.policies.size.size = 100MB
    appender.main.policies.startup.type = OnStartupTriggeringPolicy
    appender.main.strategy.type = DefaultRolloverStrategy
    appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
  log4j-console.properties: |
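As with the PVC, the ConfigMap is then applied with kubectl:

[root@k8s-demo001 ~]# kubectl apply -f flink-historyserver-conf.yaml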