33、SparkStreaming作业监控,日志收集,自动重启shell脚本

发布于:2025-03-02 ⋅ 阅读:(105) ⋅ 点赞:(0)

自从上了flink后,其实已经好几年没有编写SparkStreaming作业了。但是还有一个机器学习的作业是通过sparkml+sparkstreaming的。这个不方便迁移到flink上,所以一直保留着。
再有就是,我们的spark作业都是运行在k8s上的。并没有工具或者平台进行sparkstreaming作业的监控和维护。因为作业较少,以后也不会再新增类似作业了,所以我就编写几个shell脚本+企业微信机器人+Linux的crontab来时间sparkstreaming作业的状态监控告警,日志收集和自动重启。

1.启动脚本

#!/bin/sh
#source ~/.bashrc

cd $(cd "`dirname "$0"`";pwd)
#这是作业提交spark-on-k8s的yaml文件无后缀名字
yamlname1=ParkPrediction

#名称为1-63个字符,可包含数字、小写英文字以及短划线(-)、不能以短线(-)开头
appname=${yamlname1,,}

date=$(date  +"%Y-%m-%d" -d "-0 day")

yamlname=${yamlname1}-${date}
appname=${appname}-${date}
#指定日志路径
logdir=/home/hadoop/pdEnv/spark_k8s/ml/log/${appname}
[ ! -d $logdir  ] && mkdir -p $logdir
#定义日志输出方法
log2file() {
        running_pod=$(kubectl get pod -n bigdata | grep ${appname} | grep "Running\|Error"  2>/dev/null| awk '{print $1}')

        for podName in ${running_pod[@]}
        do
                logcommand=$(ps -aux | grep kubectl | grep logs |grep $podName)
                logpath=${logdir}/${podName}.log
                if [[ -z "$logcommand" ]]; then
                        echo '输出日志文件:'${logpath}
                        nohup kubectl logs -f  --tail=100 ${podName} -n bigdata >> ${logpath} 2>&1 &
                fi
        done
}

#定义执行文件
[ ! -d executor  ] && mkdir executor
cp ${yamlname1}.yaml executor/${yamlname}-exec.yaml
sed -i "s/\${appname}/${appname}/g" executor/${yamlname}-exec.yaml

#删除已有作业
kubectl delete -f executor/${yamlname}-exec.yaml --wait


#提交执行文件
sleep 20s
echo "执行 executor/${yamlname}-exec.yaml"
kubectl apply -f executor/${yamlname}-exec.yaml
sleep 5s

#监听状态获取日志
while [ 1 ]
do
   sleep 5s
   #echo "获取 driver pod状态"
   status=`kubectl get pod  -n bigdata|grep ${appname}-driver|awk '{print $3}'`
   if [[ ${status} == "Error" ]]; then
       log2file
       exit 255
   elif [[ ${status} == "" ]]; then
       sleep 120s
       status=`kubectl get pod  -n bigdata|grep ${appname}-driver|awk '{print $3}'`
       if [[ ${status} == "" ]]; then
           echo "pod 异常,请检查yaml文件"
           exit 255
       elif [[ ${status} == "Error" ]]; then
           kubectl logs --tail=100 ${appname}-driver -n bigdata
           exit 255
       fi
   elif [[ ${status} == "Running" ]];then
       log2file
       continue
   elif [[ ${status} == "Completed" ]]; then
       exit 0
   else
       continue
   fi
   sleep 5s
done

执行命令为:

nohup sh ParkPrediction.sh > ParkPrediction.log 2>&1 &

执行效果为:ParkPrediction.log里会记录driver的日志路径和executor日志的路径。方便排查问题查看日志
在这里插入图片描述
在这里插入图片描述

2.状态监控脚本+自动重启

#!/bin/bash
#预定义需要状态监听的spark-on-k8s作业名称
apps=("parkareaprediction" "parkprediction")

running_app=$(kubectl get sparkapplication -n bigdata 2>/dev/null \
    | grep -E "parkareaprediction|parkprediction" \
    | grep "RUNNING" \
    | awk '{print $1}' \
    | sed -E 's/-[0-9]{4}-[0-9]{2}-[0-9]{2}//')

echo running_app=$running_app

for app in ${apps[@]}
        do
                if [[ "${running_app[@]}" =~ "${app}" ]]; then
                        echo $app " is running"
                else
                        echo $app " is not running"
                        curl 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=this-is-your-robot-key' \
                        -H 'Content-Type: application/json' \
                        -d '
                        {
                                        "msgtype": "text",
                                        "text": {
                                                "mentioned_mobile_list":["your-phone-number"],
                                                "content": "华为云生产环境sparkStreaming实时任务:'$app' is not running,即将重启"
                                        }
                        }'
                        echo
                       cd /home/hadoop/pdEnv/spark_k8s/ml
                       if [ "$app" == "parkprediction" ]; then
                          echo "ParkPrediction 作业日志路径如下:"
                          tail -n 10 ParkPrediction.log
                          echo "-----------开始重启 ParkPrediction---------------"
                          nohup sh ParkPrediction.sh > ParkPrediction.log 2>&1 &
                        elif [ "$app" == "parkareaprediction" ]; then
                          echo "ParkAreaPrediction 作业日志路径如下:"
                          tail -n 10 ParkAreaPrediction.log
                          echo "-----------开始重启 ParkAreaPrediction---------------"
                          nohup sh ParkAreaPrediction.sh > ParkAreaPrediction.log 2>&1 &
                        else
                          echo "未知作业名:$app"
                        fi
                fi

        done

然后配置crontab定时执行状态监控脚本

#每20分钟执行检查一次spark streaming作业运行状态
*/20 * * * * sh /home/hadoop/pdEnv/spark_k8s/ml/StreamMonitor.sh >>  /home/hadoop/pdEnv/spark_k8s/ml/StreamMonitor.log

效果如下:
在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到