Since we moved to Flink, I have not written a Spark Streaming job in years. One machine learning job, however, still runs on Spark ML + Spark Streaming; it is not easy to migrate to Flink, so it has been kept around.
On top of that, all of our Spark jobs run on Kubernetes, and we have no tool or platform for monitoring and maintaining Spark Streaming jobs there. Since there are only a few such jobs and no new ones will be added, I put together a few shell scripts, a WeCom (企业微信) robot webhook, and Linux crontab to implement status monitoring with alerting, log collection, and automatic restart for the Spark Streaming jobs.
1. Startup script
#!/bin/bash
#source ~/.bashrc
cd "$(cd "$(dirname "$0")" && pwd)"
# Base name (without extension) of the yaml file used to submit the spark-on-k8s job
yamlname1=ParkPrediction
# The app name must be 1-63 characters, may contain digits, lowercase letters and hyphens (-), and must not start with a hyphen
appname=${yamlname1,,}
date=$(date +"%Y-%m-%d")
yamlname=${yamlname1}-${date}
appname=${appname}-${date}
# Log directory
logdir=/home/hadoop/pdEnv/spark_k8s/ml/log/${appname}
[ ! -d "$logdir" ] && mkdir -p "$logdir"
# Log collection: tail each running/failed pod of this app into its own file
log2file() {
    running_pod=$(kubectl get pod -n bigdata 2>/dev/null | grep ${appname} | grep "Running\|Error" | awk '{print $1}')
    for podName in ${running_pod}
    do
        # skip pods whose logs are already being tailed by an earlier kubectl logs process
        logcommand=$(ps aux | grep kubectl | grep logs | grep $podName)
        logpath=${logdir}/${podName}.log
        if [[ -z "$logcommand" ]]; then
            echo "writing pod logs to: ${logpath}"
            nohup kubectl logs -f --tail=100 ${podName} -n bigdata >> ${logpath} 2>&1 &
        fi
    done
}
# Generate the execution file from the yaml template
[ ! -d executor ] && mkdir executor
cp ${yamlname1}.yaml executor/${yamlname}-exec.yaml
sed -i "s/\${appname}/${appname}/g" executor/${yamlname}-exec.yaml
# Delete the existing application, if any
kubectl delete -f executor/${yamlname}-exec.yaml --wait
# Submit the execution file
sleep 20s
echo "applying executor/${yamlname}-exec.yaml"
kubectl apply -f executor/${yamlname}-exec.yaml
sleep 5s
# Watch the driver pod status and collect logs
while true
do
    sleep 5s
    #echo "checking driver pod status"
    status=$(kubectl get pod -n bigdata | grep ${appname}-driver | awk '{print $3}')
    if [[ ${status} == "Error" ]]; then
        log2file
        exit 255
    elif [[ ${status} == "" ]]; then
        # the driver pod may not have been created yet; wait and check once more
        sleep 120s
        status=$(kubectl get pod -n bigdata | grep ${appname}-driver | awk '{print $3}')
        if [[ ${status} == "" ]]; then
            echo "driver pod not found, please check the yaml file"
            exit 255
        elif [[ ${status} == "Error" ]]; then
            kubectl logs --tail=100 ${appname}-driver -n bigdata
            exit 255
        fi
    elif [[ ${status} == "Running" ]]; then
        log2file
        continue
    elif [[ ${status} == "Completed" ]]; then
        exit 0
    else
        continue
    fi
    sleep 5s
done
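Note that the sed step above assumes ParkPrediction.yaml contains the literal placeholder ${appname} wherever the application name appears (for example in metadata.name of the spark-operator SparkApplication); the yaml itself is not shown here. A quick sanity check of the template and a preview of the generated file might look like this (a sketch, not part of the startup script):
# confirm the placeholder is present in the template
grep -nF '${appname}' ParkPrediction.yaml
# preview what today's generated execution file would contain
sed "s/\${appname}/parkprediction-$(date +%Y-%m-%d)/g" ParkPrediction.yaml | head -n 20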
Launch it with:
nohup bash ParkPrediction.sh > ParkPrediction.log 2>&1 &
ParkPrediction.log then records the paths of the driver log and the executor logs, which makes it easy to locate them when troubleshooting.
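With the naming scheme used in the script, the per-pod logs collected by log2file end up in the date-suffixed log directory, so today's driver output can be followed roughly like this (a sketch derived from the paths defined above):
appname=parkprediction-$(date +%Y-%m-%d)
tail -f /home/hadoop/pdEnv/spark_k8s/ml/log/${appname}/${appname}-driver.log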
2. Status monitoring script + automatic restart
#!/bin/bash
# spark-on-k8s job names whose status should be monitored
apps=("parkareaprediction" "parkprediction")
# list the RUNNING SparkApplications and strip the date suffix from their names
# (see the stand-alone example after this script)
running_app=$(kubectl get sparkapplication -n bigdata 2>/dev/null \
    | grep -E "parkareaprediction|parkprediction" \
    | grep "RUNNING" \
    | awk '{print $1}' \
    | sed -E 's/-[0-9]{4}-[0-9]{2}-[0-9]{2}//')
echo running_app=$running_app
for app in ${apps[@]}
do
    if [[ "${running_app}" =~ "${app}" ]]; then
        echo "$app is running"
    else
        echo "$app is not running"
        # alert via the WeCom (企业微信) robot webhook
        curl 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=this-is-your-robot-key' \
            -H 'Content-Type: application/json' \
            -d '
        {
            "msgtype": "text",
            "text": {
                "mentioned_mobile_list":["your-phone-number"],
                "content": "Huawei Cloud production sparkStreaming job: '$app' is not running, restarting it now"
            }
        }'
        echo
        cd /home/hadoop/pdEnv/spark_k8s/ml
        if [ "$app" == "parkprediction" ]; then
            echo "ParkPrediction job log paths:"
            tail -n 10 ParkPrediction.log
            echo "-----------restarting ParkPrediction---------------"
            nohup bash ParkPrediction.sh > ParkPrediction.log 2>&1 &
        elif [ "$app" == "parkareaprediction" ]; then
            echo "ParkAreaPrediction job log paths:"
            tail -n 10 ParkAreaPrediction.log
            echo "-----------restarting ParkAreaPrediction---------------"
            nohup bash ParkAreaPrediction.sh > ParkAreaPrediction.log 2>&1 &
        else
            echo "unknown job name: $app"
        fi
    fi
done
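To make the name matching in the monitor concrete: a SparkApplication submitted by the startup script carries a date suffix, which the sed at the top of the script strips off before comparing against the apps list, for example (a stand-alone illustration, not part of the script):
# a job submitted on 2024-05-01 shows up as parkprediction-2024-05-01
echo "parkprediction-2024-05-01" | sed -E 's/-[0-9]{4}-[0-9]{2}-[0-9]{2}//'
# prints: parkprediction, which then matches the entry in the apps array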
Then configure crontab to run the status monitoring script periodically:
# check the spark streaming job status every 20 minutes
*/20 * * * * bash /home/hadoop/pdEnv/spark_k8s/ml/StreamMonitor.sh >> /home/hadoop/pdEnv/spark_k8s/ml/StreamMonitor.log
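Whether the cron job is firing, and whether any restarts were triggered, can be checked from the accumulated monitor log, for example:
crontab -l | grep StreamMonitor
tail -n 50 /home/hadoop/pdEnv/spark_k8s/ml/StreamMonitor.log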
The end result: whenever one of the streaming jobs stops running, the WeCom robot @-mentions the configured phone number with an alert, and the job is restarted automatically within the next 20-minute check window.