定时任务管理原型

发布于:2025-02-11 ⋅ 阅读:(74) ⋅ 点赞:(0)

​​​​​​​

主逻辑代码

# -*- coding: utf-8 -*-
# import apscheduler
import  pandas as pd
import os,copy,json
from datetime import datetime
from loguru import logger
# 导入调度器,此处使用BackgroundScheduler阻塞调度器
from apscheduler.schedulers.background import BackgroundScheduler
# 导入触发器,此处使用IntervalTrigger特定时间间隔触发
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.executors.pool import ThreadPoolExecutor,ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.util import undefined


# cron定时调度(某一定时时刻执行)
# (int|str) 表示参数既可以是int类型,也可以是str类型
# (datetime | str) 表示参数既可以是datetime类型,也可以是str类型
# year (int|str) – 4-digit year -(表示四位数的年份,如2008年)
# month (int|str) – month (1-12) -(表示取值范围为1-12月)
# day (int|str) – day of the (1-31) -(表示取值范围为1-31日)
# week (int|str) – ISO week (1-53) -(格里历2006年12月31日可以写成2006年-W52-7(扩展形式)或2006W527(紧凑形式))
# day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) - (表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示)
# hour (int|str) – hour (0-23) - (表示取值范围为0-23时)
# minute (int|str) – minute (0-59) - (表示取值范围为0-59分)
# second (int|str) – second (0-59) - (表示取值范围为0-59秒)
# start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) - (表示开始时间)
# end_date (datetime|str) – latest possible date/time to trigger on (inclusive) - (表示结束时间)
# timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) -(表示时区取值)

#  interval 间隔调度(每隔多久执行)
# weeks (int) – number of weeks to wait
# days (int) – number of days to wait
# hours (int) – number of hours to wait
# minutes (int) – number of minutes to wait
# seconds (int) – number of seconds to wait
# start_date (datetime|str) – starting point for the interval calculation
# end_date (datetime|str) – latest possible date/time to trigger on
# timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations

# date 定时调度(作业只会执行一次)
# run_date (datetime|str) – the date/time to run the job at  -(任务开始的时间)
# timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
def QhRR(QhVALUE,QhVALUE1):
    print("{}--{}执行定时任务...".format(QhVALUE,QhVALUE1))

class QhApscheduler():
    QhFiled = {
                "QhJabId": "None",            # 任务ID
                "QhJabName": "None",          # 任务名称
                "QhJabFuncName": "None",      # 程序名称 执行的程序名
                "QhJabFuncMiaoShu": "None",   # 程序描述
                "QhTimesType": "None",        # 任务类型  重复 间隔 一次
                "QhYear": "None",             # 年
                "QhMonth": "None",            # 月
                "QhDay": "None",              # 日
                "QhWeek": "None",             # 周
                "QhDayOfWeek": "None",        # 星期几
                "QhHour": "None",             # 小时
                "QhMinute": "None",           # 分钟
                "QhSecond": "None",           # 秒钟
                "QhJabArgs": "None",          # 任务参数  函数参数
                "QhJabKwargs": "None",        # 任务参数  函数参数
                "QhJabStartDate": "None",     # 开始时间
                "QhJabNextRunTime": "None",   # 下次运行时间
                "QhJabLastRunTime": "None",    # 上次运行时间
                "QhJabStatus": "None"         # 任务状态 
                }
    def __init__(self,
                 QhThreadCount=20,
                 QhProcessCount = 10,
                  *args, **kwargs):

        self.QhCheduPath = os.path.dirname(os.path.abspath(__file__))         #绝对路径 用于获取定时模块的根目录
        logger.info("【初始化01】:QhApscheduler的绝对路径:[{}]".format(self.QhCheduPath))
        logger.info("【初始化02】:初始化存储位置&初始化任务池文件,QueHui!")
        self.QhCheduPoolJobDf = self._QhinitJobPoolCsv()
        logger.info("【初始化03】:设置任务线程({})和进程({})的数量,QueHui!".format(str(QhThreadCount),str(QhProcessCount)))
        QhExecutors = {
                'default':ThreadPoolExecutor(QhThreadCount),
                'processpool':ProcessPoolExecutor(QhProcessCount)
                    }
        logger.info("【初始化04】:设置定时任务序列数据库,QueHui!")
        self.QhJobstores = {
            # 'default': SQLAlchemyJobStore(url='sqlite:///QhJobPoolDb/QhJabSqlite.sqlite')
            'default': SQLAlchemyJobStore(url='sqlite:///{}/QhJobPoolDb/QhJabSqlite.sqlite'.format(self.QhCheduPath))  # 可以给绝对路径
                }
        self.Qhscheduler = BackgroundScheduler(jobstores=self.QhJobstores,executors=QhExecutors,misfire_grace_time=3,coalescing=True)
        logger.info("【初始化05】:创建定时任务对象({}),QueHui!".format(self.Qhscheduler))
        logger.info("【初始化06】:启动定时任务对象({}),QueHui!".format(self.Qhscheduler))
        self.Qhscheduler.start()
        logger.info("【初始化07】:初始化已存在任务池任务,QueHui!")
        self._QhInitAddJobFor()

    def _QhinitJobPoolCsv(self):
        # 初始化定时任务数据库文件夹
        # 作者:阙辉
        QhCheduPathpd = "{}\QhJobPoolDb".format(self.QhCheduPath)
        if os.path.exists(QhCheduPathpd):
            logger.info("QhJobPoolDbl路径:[{}] is exist".format(QhCheduPathpd))
            # print("{} is exist!".format(QhCheduPathpd))
        else:
            os.mkdir(QhCheduPathpd)
            logger.info("QhJobPoolDbl路径:[{}] is not exist, create it!".format(QhCheduPathpd))
            # print("{} is not exist, create it!".format(QhCheduPathpd))
        self.QhCheduPoolJobCsv = "{}\QhJobPoolCsv.csv".format(QhCheduPathpd)
        if not os.path.exists(self.QhCheduPoolJobCsv):
            logger.info("QhJobPoolCsv:[{}] is not exist, create it!".format(self.QhCheduPoolJobCsv))
            QhFiled = list(QhApscheduler.QhFiled.keys())
            self.QhCheduPoolJobDf = pd.DataFrame(columns=QhFiled)
            self.QhCheduPoolJobDf.to_csv(self.QhCheduPoolJobCsv, index=False)
        else:
            logger.info("QhJobPoolCsv:[{}] is exist , read it!".format(QhCheduPathpd))
            self.QhCheduPoolJobDf = pd.read_csv(self.QhCheduPoolJobCsv, encoding='gbk')
        self.QhCheduPoolJobDf.fillna("None", inplace=True)
        return self.QhCheduPoolJobDf

    def _QhInitAddJobFor(self,QhIsFor = True):
        # 初始化添加任务
        # 作者:阙辉
        QhJobCount = self.QhCheduPoolJobDf.shape[0]
        if QhJobCount == 0:
            logger.info("【初始化任务01】,任务数量为({})直接返回,QueHui!".format(QhJobCount)) 
            return               # 如果没有任务就返回
        logger.info("【初始化任务02】,任务数量为({}),批量初始化,QueHui!".format(QhJobCount)) 
        for QhInx,QhRow in self.QhCheduPoolJobDf.iterrows():
            QhJabId = QhRow["QhJabId"]                               # 任务ID
            QhJabName = QhRow["QhJabName"]                           # 任务名称
            QhJabFuncName = QhRow["QhJabFuncName"]                   # 程序名称 执行的程序名
            QhTimesType = QhRow["QhTimesType"]                       # 任务类型  重复 间隔 一次
            QhYear = self.QhGeShiZH(QhRow["QhYear"])                 # 年
            QhMonth = self.QhGeShiZH(QhRow["QhMonth"])               # 月
            QhDay = self.QhGeShiZH(QhRow["QhDay"])                   # 日
            QhWeek = self.QhGeShiZH(QhRow["QhWeek"])                 # 周
            QhDayOfWeek = self.QhGeShiZH(QhRow["QhDayOfWeek"])       # 星期几
            QhHour = self.QhGeShiZH(QhRow["QhHour"])                 # 小时
            QhMinute = self.QhGeShiZH(QhRow["QhMinute"])             # 分钟
            QhSecond = self.QhGeShiZH(QhRow["QhSecond"])             # 秒钟
            QhJabArgs = self.QhGeShiZH(QhRow["QhJabArgs"])           # 参数*Args
            QhJabKwargs = self.QhGeShiZH(QhRow["QhJabKwargs"])       # 参数 **Kwargs
            QhJabStatus = self.QhGeShiZH(QhRow["QhJabStatus"])       # 任务状态
            logger.info("【初始化任务02】\n任务ID:{};\n任务名称:{};\n程序名称:{};\n任务状态:{};\n初始化,QueHui!".format(QhJabId,QhJabName,QhJabFuncName,QhJabStatus)) 
            self.QhAddJob(QhJabId,
                          QhTimesType=QhTimesType,
                          QhJabFuncName = QhJabFuncName,
                          QhJabArgs=QhJabArgs,
                          QhJabKwargs=QhJabKwargs,
                          QhName = QhJabName,
                          QhYear = QhYear,
                          QhMonth = QhMonth,
                          QhDay = QhDay,
                          QhWeek = QhWeek,
                          QhDayOfWeek = QhDayOfWeek,
                          QhHour = QhHour,
                          QhMinute = QhMinute,
                          QhSecond = QhSecond,
                          QhIsFor = QhIsFor,
                          QhJabStatus=QhJabStatus)
        # QhIsFor当为True时,保存到csv文件,批量增加时的场景
        QhJobCount = self.QhCheduPoolJobDf.shape[0]
        logger.info("【初始化任务03】批量({})-任务更新保存到CSV-{},QueHui!".format(QhJobCount,QhIsFor)) 
        if QhIsFor == True:self.QhSavePoolToCsv()

    def QhTiggers(self,QhTimesType,
                  QhYear = None,
                  QhMonth = None,
                  QhDay = None,
                  QhWeek = None,
                  QhDayOfWeek = None,
                  QhHour = None,
                  QhMinute = None,
                  QhSecond = None,
                  QhStartDate = None,
                  QhEndDate = None,
                  QhTimezone=None,
                  QhJitter=None
                  ):
        if QhTimesType == "重复":
            QhTiggers = CronTrigger(year=QhYear,
                                    month=QhMonth,
                                    day=QhDay,
                                    week=QhWeek,
                                    day_of_week=QhDayOfWeek,
                                    hour=QhHour,
                                    minute=QhMinute,
                                    second=QhSecond,
                                    start_date=QhStartDate,
                                    end_date=QhEndDate,
                                    timezone=QhTimezone,
                                    jitter=QhJitter)
        elif QhTimesType == "间隔":
            QhTiggers = IntervalTrigger(weeks= int(0) if QhWeek == None else QhWeek,
                                        days= 0 if QhDay == None else QhDay,
                                        hours= 0 if QhHour == None else QhHour,
                                        minutes= 0 if QhMinute == None else QhMinute,
                                        seconds= 0 if QhSecond == None else QhSecond,
                                        start_date=QhStartDate,
                                        end_date=QhEndDate,
                                        timezone=QhTimezone,
                                        jitter=QhJitter)
        elif QhTimesType == "一次":
            QhRunDate = datetime(0 if QhYear == None else QhYear, 
                                 0 if QhMonth == None else QhMonth, 
                                 0 if QhDay == None else QhDay, 
                                 0 if QhHour == None else QhHour, 
                                 0 if QhMinute == None else QhMinute, 
                                 0 if QhSecond == None else QhSecond)
            QhTiggers = DateTrigger(run_date=QhRunDate,
                                    timezone=QhTimezone)
            
        return QhTiggers
        
    def QhAddJob(self, 
                 QhJabId,
                 QhTimesType,    # 重复 间隔 一次
                 QhJabFuncName,
                 QhJabArgs=None,
                 QhJabKwargs=None,
                 QhName=None,
                 QhYear = None,
                 QhMonth = None,
                 QhDay = None,
                 QhWeek = None,
                 QhDayOfWeek = None,
                 QhHour = None,
                 QhMinute = None,
                 QhSecond = None,
                 QhStartDate = None,
                 QhEndDate = None,
                 QhTimezone=None,
                 QhJitter=None,
                 misfire_grace_time=undefined,
                 next_run_time=undefined,
                 jobstore='default',
                 executor='default',
                 coalesce=undefined,
                 max_instances=undefined,
                 replace_existing=False,
                 QhIsFor = False,
                 QhJabStatus = None,
                 QhJobIsOpen = False,
                 ):
        # 1、判断任务是否存在,存在则直接返回
        if self.Qhscheduler.get_job(QhJabId): 
            logger.info("【任务添加01】{} is exist,return,QueHui!".format(QhJabId)) 
            # print("{} is exist!".format(QhJabId))
            return   # 如果任务已经存在,则不添加
        # 2、如果任务已关闭,则不新建,直接返回
        if QhJabStatus == "已关闭" and (not QhJobIsOpen):
            logger.info("【任务添加02】{} is close,return,QueHui!".format(QhJabId))
            # print("{} 任务已关闭,不用新建!".format(QhJabId))
            return
        # 3、set触发器
        logger.info("【任务添加03】设置触发器({}),QueHui!".format(QhTimesType))
        QhTiggers = self.QhTiggers(QhTimesType =QhTimesType,
                                   QhYear = QhYear,
                                   QhMonth = QhMonth,
                                   QhDay = QhDay,
                                   QhWeek = QhWeek,
                                   QhDayOfWeek = QhDayOfWeek,
                                   QhHour = QhHour,
                                   QhMinute = QhMinute,
                                   QhSecond = QhSecond,
                                   QhStartDate = QhStartDate,
                                   QhEndDate = QhEndDate,
                                   QhTimezone=QhTimezone,
                                   QhJitter=QhJitter)
        
        # 4、组装任务参数
        # 如果位置参数Args,和关键字参数Kwargs都不为None,则只取位置参数值
        logger.info("【任务添加04】程序参数格式化(Args&Kwargs,QueHui!")
        if QhJabArgs!=None:
            logger.info("【任务添加04.01】程序参数格式化(Args)-({}),QueHui!".format(QhJabArgs))
            QhJabArgs = tuple(QhJabArgs.split("+"))
        else:
            if QhJabKwargs != None:
                logger.info("【任务添加04.01】程序参数格式化(Kwargs)-({}),QueHui!".format(QhJabKwargs))
                QhJabKwargs = QhJabKwargs.replace("+",',')
                QhJabKwargs = QhJabKwargs.replace("'",'"')
                QhJabKwargs = json.loads(QhJabKwargs)
        # 5、添加任务
        logger.info("【任务添加05】添加程序({})到定时任务,QueHui!".format(QhJabFuncName))
        self.Qhscheduler.add_job(func=globals()[QhJabFuncName],
                                 trigger=QhTiggers,
                                 args=QhJabArgs,
                                 kwargs=QhJabKwargs,
                                 id=QhJabId,
                                 name=QhName,
                                 misfire_grace_time=misfire_grace_time,
                                 next_run_time=next_run_time,
                                 jobstore=jobstore,
                                 executor=executor,
                                 coalesce=coalesce,
                                 max_instances=max_instances,
                                 replace_existing=replace_existing,)

        # 6、还原参数格式,用于存储
        # 函数参数还原输入格式处理  阙辉
        # print(QhJabArgs,type(QhJabArgs))
        logger.info("【任务添加06】还原参数格式,用于存储,QueHui!")
        if isinstance(QhJabArgs, tuple):
            QhJabArgs = str(QhJabArgs).replace("'",'').replace("(",'').\
            replace(")",'').replace(",","+").replace("+  ","+").replace("+ ","+")
        #     print(QhJabArgs,type(QhJabArgs))
        # print(QhJabKwargs,type(QhJabKwargs))
        if isinstance(QhJabKwargs, dict):
            QhJabKwargs = str(QhJabKwargs).replace("'",'"').replace(",","+")
            # print(QhJabKwargs,type(QhJabKwargs))
        QhAddJoblDic = copy.deepcopy(QhApscheduler.QhFiled) 
        QhAddJoblDic["QhJabId"] = QhJabId
        QhAddJoblDic["QhJabName"] = QhName
        QhAddJoblDic["QhJabFuncName"] = "None" if QhJabFuncName==None else QhJabFuncName
        QhAddJoblDic["QhTimesType"] = QhTimesType
        QhAddJoblDic["QhYear"] = "None" if QhYear==None else QhYear
        QhAddJoblDic["QhMonth"] = "None" if QhMonth==None else QhMonth
        QhAddJoblDic["QhDay"] = "None" if QhDay==None else QhDay
        QhAddJoblDic["QhWeek"] = "None" if QhWeek==None else QhWeek
        QhAddJoblDic["QhDayOfWeek"] = "None" if QhDayOfWeek==None else QhDayOfWeek
        QhAddJoblDic["QhHour"] = "None" if QhHour==None else QhHour
        QhAddJoblDic["QhMinute"] = "None" if QhMinute==None else QhMinute
        QhAddJoblDic["QhSecond"] = "None" if QhSecond==None else QhSecond
        QhAddJoblDic["QhJabArgs"] = "None" if QhJabArgs==None else QhJabArgs
        QhAddJoblDic["QhJabKwargs"] = "None" if QhJabKwargs==None else QhJabKwargs
        # QhAddJoblDic["QhJabStartDate"] = "None"
        # QhAddJoblDic["QhJabNextRunTime"] = "None"
        # QhAddJoblDic["QhJabLastRunTime"] = "None"
        QhAddJoblDic["QhJabStatus"] = "已运行"

        # 7 、
        # try:   # 任务id存在,则更新任务,不存在则新增
            # self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf["QhJabId"]==QhJabId].index[0]
        logger.info("【任务添加07】判断任务id存在,则更新任务,不存在则新增,QueHui!")
        if QhJabId in self.QhCheduPoolJobDf["QhJabId"]:
            logger.info("【任务添加07.01】{}任务存在,更新任务数据,QueHui!".format(QhJabId))
            # 判断任务是否存在
            for QhKey,QhValue in QhAddJoblDic.items():
                if QhKey == "QhJabId":continue   
                self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf["QhJabId"]==QhJabId,QhKey] = QhValue
        # except:
        else:
            logger.info("【任务添加07.01】{}任务不存在,新增任务到任务池,QueHui!".format(QhJabId))
            QhCheduPoolJobDfRow = pd.DataFrame([QhAddJoblDic])
            # print(QhCheduPoolJobDfRow)
            try:
                self.QhCheduPoolJobDf = self.QhCheduPoolJobDf._append(QhCheduPoolJobDfRow)
            except:
                self.QhCheduPoolJobDf = self.QhCheduPoolJobDf.append(QhCheduPoolJobDfRow)
            # print(self.QhCheduPoolJobDf)
        # QhIsFor当为False时,保存到csv文件,单个增加时的场景
        if (not QhIsFor):
            logger.info("【任务添加08】单个-{}任务更新保存到CSV-{},QueHui!".format(QhJabId,QhIsFor))
            self.QhSavePoolToCsv()
       
    def QhPauseJob(self,QhJabId,QhIsFor=False):
        # 暂停任务 
        # QhIsFor当为False时,保存到csv文件,单个增加时的场景
        # 阙辉
        try:
            if QhJabId == "":
                # print("任务ID不能空,请输入任务ID")
                logger.warning("【暂停任务01】任务ID不能空,请输入任务ID,QueHui!")
                return
            if self.Qhscheduler.get_job(QhJabId):
                logger.info("【暂停任务02】暂停[{}]任务,QueHui!".format(QhJabId))
                self.Qhscheduler.pause_job(QhJabId)
                # print("暂停任务",QhJabId)
                logger.info("【暂停任务03】设置任务[{}]状态--已暂停,QueHui!".format(QhJabId))
                self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId, 'QhJabStatus'] = '已暂停'
                if (not QhIsFor):
                    logger.info("【暂停任务04】单个-{}任务更新保存到CSV-{},QueHui!".format(QhJabId,QhIsFor))
                    self.QhSavePoolToCsv()
                # AA = self.Qhscheduler.get_job(QhJabId)
                # print(AA.state)
            else:
                logger.info("【暂停任务05】暂停[{}]任务可能不存在,QueHui!".format(QhJabId))
                # print("暂停失败,请检查任务ID,任务可能不存在")
        except Exception as e:
            logger.error("【暂停任务00】暂停[{}]任务报错,QueHui!\n报错消息如下:\n{}".format(QhJabId,e))
            # print("暂停失败,请检查任务ID,任务可能不存在")
    
    def QhPauseJobFor(self,QhIsFor=True):
        # 暂停所有任务
        # 阙辉

        QhJobCount = self.QhCheduPoolJobDf.shape[0]
        if QhJobCount ==0:
            logger.info("【暂停所有任务01】任务数量为({}),QueHui!".format(QhJobCount)) 
            return
        logger.info("【暂停所有任务01】暂停所有任务({}),QueHui!".format(QhJobCount)) 
        for QhInx,QhRow in self.QhCheduPoolJobDf.iterrows():
            QhJabId = QhRow['QhJabId']
            logger.info("【暂停所有任务01.01】暂停任务-{},QueHui!".format(QhJabId)) 
            self.QhPauseJob(QhJabId,QhIsFor=QhIsFor)

        logger.info("【暂停所有任务02】批量({})-任务更新保存到CSV-{},QueHui!".format(QhJobCount,QhIsFor)) 
        if QhIsFor == True:self.QhSavePoolToCsv()
     
    def QhResumeJob(self,QhJabId,QhIsFor=False):
        # 启动任务
        # QhIsFor当为False时,保存到csv文件,单个增加时的场景
        # 阙辉
        try:
            if QhJabId == "":
                # print("任务ID不能空,请输入任务ID")
                logger.warning("【启动任务01】任务ID不能空,请输入任务ID,QueHui!")
                return
            if self.Qhscheduler.get_job(QhJabId):
                logger.info("【启动任务02】启动[{}]任务,QueHui!".format(QhJabId))
                self.Qhscheduler.resume_job(QhJabId)
                logger.info("【启动任务03】设置任务[{}]状态--已运行,QueHui!".format(QhJabId))
                # print("恢复任务",QhJabId)
                self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId, 'QhJabStatus'] = '已运行'
                if (not QhIsFor):
                    logger.info("【启动任务04】单个-{}任务更新保存到CSV-{},QueHui!".format(QhJabId,QhIsFor))
                    self.QhSavePoolToCsv()
            else:
                # print("恢复失败,请检查任务ID,任务可能不存在")
                logger.info("【启动任务05】启动[{}]任务可能不存在,QueHui!".format(QhJabId))
        except Exception as e:
            # print("恢复失败,请检查任务ID,任务可能不存在")
            logger.error("【启动任务00】启动[{}]任务报错,QueHui!\n报错消息如下:\n{}".format(QhJabId,e))

    def QhResumeJobFor(self,QhIsFor=True):
        # 启动所有任务
        # QhIsFor当为False时,保存到csv文件,单个增加时的场景
        # 阙辉

        QhJobCount = self.QhCheduPoolJobDf.shape[0]
        if QhJobCount ==0:
            logger.info("【启动所有任务01】任务数量为({}),QueHui!".format(QhJobCount)) 
            return
        logger.info("【启动所有任务01】启动所有任务({}),QueHui!".format(QhJobCount)) 
        for QhInx,QhRow in self.QhCheduPoolJobDf.iterrows():
            QhJabId = QhRow['QhJabId']
            logger.info("【启动所有任务01.01】启动任务-{},QueHui!".format(QhJabId)) 
            self.QhResumeJob(QhJabId,QhIsFor=QhIsFor)
        logger.info("【启动所有任务02】批量({})-任务更新保存到CSV-{},QueHui!".format(QhJobCount,QhIsFor)) 
        if QhIsFor == True:self.QhSavePoolToCsv()

    def QhRemoveJob(self,QhJabId,QhIsFor=False):
        # 关闭任务
        # QhIsFor当为False时,保存到csv文件,单个增加时的场景
        # 阙辉
        try:
            if QhJabId == "":
                # print("任务ID不能空,请输入任务ID")
                logger.warning("【关闭任务01】任务ID不能空,请输入任务ID,QueHui!")
                return
            if self.Qhscheduler.get_job(QhJabId):
                logger.info("【关闭任务02】关闭[{}]任务,QueHui!".format(QhJabId))
                self.Qhscheduler.remove_job(QhJabId)
                logger.info("【关闭任务03】设置任务[{}]状态--已关闭,QueHui!".format(QhJabId))
                self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId, 'QhJabStatus'] = '已关闭'
                if (not QhIsFor):
                    logger.info("【关闭任务04】单个-{}任务更新保存到CSV-{},QueHui!".format(QhJabId,QhIsFor))
                    self.QhSavePoolToCsv()
            else:
                logger.info("【关闭任务05】关闭[{}]任务可能不存在,QueHui!".format(QhJabId))
        except Exception as e:
            logger.error("【关闭任务00】关闭[{}]任务报错,QueHui!\n报错消息如下:\n{}".format(QhJabId,e))
        
    def QhRemoveJobFor(self,QhIsFor=True):
        # 关闭所有任务
        # QhIsFor当为False时,保存到csv文件,单个增加时的场景
        # 阙辉

        QhJobCount = self.QhCheduPoolJobDf.shape[0]
        if QhJobCount ==0:
            logger.info("【关闭所有任务01】任务数量为({}),QueHui!".format(QhJobCount)) 
            return
        logger.info("【关闭所有任务01】暂停所有任务({}),QueHui!".format(QhJobCount)) 
        for QhInx,QhRow in self.QhCheduPoolJobDf.iterrows():
            QhJabId = QhRow['QhJabId']
            logger.info("【关闭所有任务01.01】关闭任务-{},QueHui!".format(QhJabId)) 
            self.QhRemoveJob(QhJabId,QhIsFor=QhIsFor)
        logger.info("【关闭所有任务02】批量({})-任务更新保存到CSV-{},QueHui!".format(QhJobCount,QhIsFor)) 
        if QhIsFor == True:self.QhSavePoolToCsv()

    def QhReopenJob(self,QhJabId,QhIsFor=False):
        # 重新打开任务
        # QhIsFor当为False时,保存到csv文件,单个增加时的场景
        # 阙辉
        # try:
        if QhJabId == "":
            # print("任务ID不能空,请输入任务ID")
            logger.warning("【打开任务01】任务ID不能空,请输入任务ID,QueHui!")
            return
        QhJabStatus = self.QhGeShiZH(self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId, 'QhJabStatus'].values[0])
        if not self.Qhscheduler.get_job(QhJabId):
            if QhJabStatus == "已关闭":
                # QhJabId = QhRow["QhJabId"]
                QhJabName = self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId, 'QhJabName'].values[0]
                QhJabFuncName = self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId, 'QhJabFuncName'].values[0]
                QhTimesType = self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId, 'QhTimesType'].values[0]
                QhYear = self.QhGeShiZH(self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId, 'QhYear'].values[0])
                QhMonth = self.QhGeShiZH(self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId, 'QhMonth'].values[0])
                QhDay = self.QhGeShiZH(self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId, 'QhDay'].values[0])
                QhWeek = self.QhGeShiZH(self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId, 'QhWeek'].values[0])
                QhDayOfWeek = self.QhGeShiZH(self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId, 'QhDayOfWeek'].values[0])
                QhHour = self.QhGeShiZH(self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId, 'QhHour'].values[0])
                QhMinute = self.QhGeShiZH(self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId, 'QhMinute'].values[0])
                QhSecond = self.QhGeShiZH(self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId, 'QhSecond'].values[0])
                QhJabArgs = self.QhGeShiZH(self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId, 'QhJabArgs'].values[0])
                QhJabKwargs = self.QhGeShiZH(self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId, 'QhJabKwargs'].values[0])       
                
                logger.info("【打开任务01】\n任务ID:{};\n任务名称:{};\n程序名称:{};\n任务状态:{};\n初始化,QueHui!".format(QhJabId,QhJabName,QhJabFuncName,QhJabStatus)) 
                self.QhAddJob(QhJabId,
                            QhTimesType=QhTimesType,
                            QhJabFuncName = QhJabFuncName,
                            QhJabArgs=QhJabArgs,
                            QhJabKwargs=QhJabKwargs,
                            QhName = QhJabName,
                            QhYear = QhYear,
                            QhMonth = QhMonth,
                            QhDay = QhDay,
                            QhWeek = QhWeek,
                            QhDayOfWeek = QhDayOfWeek,
                            QhHour = QhHour,
                            QhMinute = QhMinute,
                            QhSecond = QhSecond,
                            QhJabStatus=QhJabStatus,
                            QhIsFor = QhIsFor,
                            QhJobIsOpen = True)  # 状态已在此函数更新
                    
                    
    def QhReopenJobFor(self,QhIsFor=True):
        # 重新打开所有任务
        # QhIsFor当为False时,保存到csv文件,单个增加时的场景
        # 阙辉

        QhJobCount = self.QhCheduPoolJobDf.shape[0]
        if QhJobCount ==0:
            logger.info("【打开所有任务01】任务数量为({}),QueHui!".format(QhJobCount)) 
            return
        logger.info("【打开所有任务01】启动所有任务({}),QueHui!".format(QhJobCount)) 
        for QhInx,QhRow in self.QhCheduPoolJobDf.iterrows():
            QhJabId = QhRow['QhJabId']
            logger.info("【打开所有任务01.01】打开任务-{},QueHui!".format(QhJabId)) 
            self.QhReopenJob(QhJabId,QhIsFor=QhIsFor)
        logger.info("【打开所有任务02】批量({})-任务更新保存到CSV-{},QueHui!".format(QhJobCount,QhIsFor)) 
        if QhIsFor == True:self.QhSavePoolToCsv()


    def QhShouDongRunJob(self,QhJabId):
        # 手动执行任务
        # 阙辉
        try:
            if QhJabId == "":
            # print("任务ID不能空,请输入任务ID")
                logger.warning("【手动执行任务01】任务ID不能空,请输入任务ID,QueHui!")
                return
            # if self.Qhscheduler.get_job(QhJabId):
            QhJabFuncName = self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId, 'QhJabFuncName'].values[0]
            QhJabArgs = self.QhGeShiZH(self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId, 'QhJabArgs'].values[0])
            QhJabKwargs = self.QhGeShiZH(self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId, 'QhJabKwargs'].values[0])
            QhJabFunc = globals().get(QhJabFuncName)
            logger.info("【手动执行任务01】\n任务ID:{};\n程序名称:{};\n程序参数Args:{};\n程序参数Kwargs:{};\n初始化,QueHui!".format(QhJabId,QhJabFuncName,QhJabArgs,QhJabKwargs)) 
            if QhJabFunc:
                if QhJabArgs!= None:
                    QhJabArgs = tuple(QhJabArgs.split("+"))
                    QhJabFunc(*QhJabArgs)
                elif QhJabKwargs != None:
                    QhJabKwargs = QhJabKwargs.replace("+",',')
                    QhJabKwargs = QhJabKwargs.replace("'",'"')
                    QhJabKwargs = json.loads(QhJabKwargs)
                    QhJabFunc(**QhJabKwargs)
                logger.info("【手动执行任务02】{}-程序执行完成,QueHui!".format(QhJabFuncName))
            else:
                # print(f"Function '{QhJabFuncName}' not found.")
                logger.warning("【手动执行任务02】{}-程序不存在,QueHui!".format(QhJabFuncName))
            # self.Qhscheduler.run_job(QhJabId)
                
        except Exception as e:
            logger.error("【手动执行任务00】程序[{}-{}]执行报错,QueHui!\n报错消息如下:\n{}".format(QhJabId,QhJabFuncName,e))
            # print(f"An error occurred while executing the job: {e}")
            # print("2手动执行任务失败,请检查任务ID,任务可能不存在")

    def QhShouDongRunJobFor(self):
        # 手动运行所有任务
        # 阙辉
        for QhInx,QhRow in self.QhCheduPoolJobDf.iterrows():
            self.QhShouDongRunJob(QhRow['QhJabId'])

        QhJobCount = self.QhCheduPoolJobDf.shape[0]
        if QhJobCount ==0:
            logger.info("【手动执行所有任务01】任务数量为({}),QueHui!".format(QhJobCount)) 
            return
        logger.info("【手动执行所有任务01】启动所有任务({}),QueHui!".format(QhJobCount)) 
        for QhInx,QhRow in self.QhCheduPoolJobDf.iterrows():
            QhJabId = QhRow['QhJabId']
            logger.info("【手动执行所有任务01.01】手动执行任务-{},QueHui!".format(QhJabId)) 
            self.QhShouDongRunJob(QhJabId)
    
    def QhXiuGaiJob(self,QhJabId,
                    QhIsFor=True):
        # 修改任务方案:先删除,修改后重新添加
        pass

    def QhGetJobsState(self):
        # 获取任务情况
        # 阙辉
        for job in self.Qhscheduler.get_jobs():
            print(f"Job ID: {job.id}, 任务的下一次运行时间: {self.QhGetNextJobTime(job.id)}")
            print(f"Job ID: {job.id}, 任务是否有待执行: {job.pending}")
            # print(f"Job ID: {job.id}, 任务是否有待执行: {job.job_state}")
            # print(f"Job ID: {job.id}, 任务是否正在运行: {job.running}")

    def QhGetNextJobTime(self,QhJabId):
        # 任务下一次运行时间
        # 阙辉
        try:
            if QhJabId == "":
                logger.warning("【任务下一次运行时间01】任务ID不能空,请输入任务ID,QueHui!")
                return
            QhJob = self.Qhscheduler.get_job(QhJabId)
            if QhJob:
                logger.info("【任务下一次运行时间02】[{}]获取下一次运行时间,QueHui!".format(QhJabId))
                QhNextRunTime = QhJob.next_run_time
                QhNextRunTimeStr = QhNextRunTime.strftime("%Y-%m-%d %H:%M:%S")
                return QhNextRunTimeStr
                # print(QhNextRunTimeStr)
                # print(type(QhNextRunTime))
            else:
                logger.info("【任务下一次运行时间03】[{}]任务可能不存在,QueHui!".format(QhJabId))
        except Exception as e:
            logger.error("【任务下一次运行时间00】[{}]获取下一次运行时间报错,QueHui!\n报错消息如下:\n{}".format(QhJabId,e))
        

    def QhGeShiZH(self,QhValue):
        # 格式转化  后期需要优化
        # 1、如果是字符串"None","",则返回None
        # 2、如果是字符串,则返回字符串
        # 作者:阙辉
        try:
            if isinstance(QhValue, str):
            # 1、判断是不是字符串
                if QhValue in ["None","","0"]:
                    # 1.1、如果是字符串"None","",则返回None
                    return None
                else:
                    # 1.2、如果非["None","","0"]:
                    try:
                        # 1.2.1、尝试转换为int,转换成功则返回int
                        return int(QhValue)
                    except:
                        # 1.2.2、尝试转换为int,转换失败则返回字符串(原值)
                        return QhValue
            else:
                # 2、如果非字符串,尝试转换为int
                try:
                    # 2.1、尝试转换为int,转换成功则返回int
                    return int(QhValue)
                except:
                    # 2.2、尝试转换为int,转换失败则返回字符串(原值)
                    return QhValue
        except:
            # 以上都失败,则返回原值
            return QhValue
        
    def QhSavePoolToCsv(self):
        # 保存 任务池数据
        # 作者:阙辉
        
        try:
            self.QhCheduPoolJobDf.to_csv(self.QhCheduPoolJobCsv, index=False,encoding='gbk')
            logger.info("【任务保存】,任务保存成功,QueHui!") 
        except Exception as e:
            logger.error("【任务保存】,任务保存失败,QueHui!\n错误消息:\n{}".format(e))

UI代码

# -*- coding: utf-8 -*-

# Form implementation generated from reading ui file 'QhTestJob.ui'
#
# Created by: PyQt5 UI code generator 5.15.7
#
# WARNING: Any manual changes made to this file will be lost when pyuic5 is
# run again.  Do not edit this file unless you know what you are doing.


from PyQt5 import QtCore, QtGui, QtWidgets


class Ui_Form(object):
    def setupUi(self, Form):
        Form.setObjectName("Form")
        Form.resize(632, 610)
        self.gridLayout = QtWidgets.QGridLayout(Form)
        self.gridLayout.setObjectName("gridLayout")
        self.verticalLayout = QtWidgets.QVBoxLayout()
        self.verticalLayout.setObjectName("verticalLayout")
        self.horizontalLayout_5 = QtWidgets.QHBoxLayout()
        self.horizontalLayout_5.setObjectName("horizontalLayout_5")
        self.pushButton = QtWidgets.QPushButton(Form)
        self.pushButton.setObjectName("pushButton")
        self.horizontalLayout_5.addWidget(self.pushButton)
        self.lineEdit_5 = QtWidgets.QLineEdit(Form)
        self.lineEdit_5.setMinimumSize(QtCore.QSize(500, 28))
        self.lineEdit_5.setMaximumSize(QtCore.QSize(500, 16777215))
        self.lineEdit_5.setObjectName("lineEdit_5")
        self.horizontalLayout_5.addWidget(self.lineEdit_5)
        self.verticalLayout.addLayout(self.horizontalLayout_5)
        self.horizontalLayout_4 = QtWidgets.QHBoxLayout()
        self.horizontalLayout_4.setObjectName("horizontalLayout_4")
        self.pushButton_2 = QtWidgets.QPushButton(Form)
        self.pushButton_2.setObjectName("pushButton_2")
        self.horizontalLayout_4.addWidget(self.pushButton_2)
        self.lineEdit = QtWidgets.QLineEdit(Form)
        self.lineEdit.setMinimumSize(QtCore.QSize(230, 28))
        self.lineEdit.setMaximumSize(QtCore.QSize(500, 16777215))
        self.lineEdit.setObjectName("lineEdit")
        self.horizontalLayout_4.addWidget(self.lineEdit)
        self.verticalLayout.addLayout(self.horizontalLayout_4)
        self.horizontalLayout_18 = QtWidgets.QHBoxLayout()
        self.horizontalLayout_18.setObjectName("horizontalLayout_18")
        self.pushButton_18 = QtWidgets.QPushButton(Form)
        self.pushButton_18.setObjectName("pushButton_18")
        self.horizontalLayout_18.addWidget(self.pushButton_18)
        self.lineEdit_18 = QtWidgets.QLineEdit(Form)
        self.lineEdit_18.setMinimumSize(QtCore.QSize(230, 28))
        self.lineEdit_18.setMaximumSize(QtCore.QSize(500, 16777215))
        self.lineEdit_18.setObjectName("lineEdit_18")
        self.horizontalLayout_18.addWidget(self.lineEdit_18)
        self.verticalLayout.addLayout(self.horizontalLayout_18)
        self.horizontalLayout_3 = QtWidgets.QHBoxLayout()
        self.horizontalLayout_3.setObjectName("horizontalLayout_3")
        self.pushButton_3 = QtWidgets.QPushButton(Form)
        self.pushButton_3.setObjectName("pushButton_3")
        self.horizontalLayout_3.addWidget(self.pushButton_3)
        self.lineEdit_2 = QtWidgets.QLineEdit(Form)
        self.lineEdit_2.setMinimumSize(QtCore.QSize(230, 28))
        self.lineEdit_2.setMaximumSize(QtCore.QSize(500, 16777215))
        self.lineEdit_2.setObjectName("lineEdit_2")
        self.horizontalLayout_3.addWidget(self.lineEdit_2)
        self.verticalLayout.addLayout(self.horizontalLayout_3)
        self.horizontalLayout_6 = QtWidgets.QHBoxLayout()
        self.horizontalLayout_6.setObjectName("horizontalLayout_6")
        self.pushButton_6 = QtWidgets.QPushButton(Form)
        self.pushButton_6.setObjectName("pushButton_6")
        self.horizontalLayout_6.addWidget(self.pushButton_6)
        self.lineEdit_6 = QtWidgets.QLineEdit(Form)
        self.lineEdit_6.setMinimumSize(QtCore.QSize(230, 28))
        self.lineEdit_6.setMaximumSize(QtCore.QSize(500, 16777215))
        self.lineEdit_6.setObjectName("lineEdit_6")
        self.horizontalLayout_6.addWidget(self.lineEdit_6)
        self.verticalLayout.addLayout(self.horizontalLayout_6)
        self.horizontalLayout_2 = QtWidgets.QHBoxLayout()
        self.horizontalLayout_2.setObjectName("horizontalLayout_2")
        self.pushButton_4 = QtWidgets.QPushButton(Form)
        self.pushButton_4.setObjectName("pushButton_4")
        self.horizontalLayout_2.addWidget(self.pushButton_4)
        self.lineEdit_3 = QtWidgets.QLineEdit(Form)
        self.lineEdit_3.setMinimumSize(QtCore.QSize(230, 28))
        self.lineEdit_3.setMaximumSize(QtCore.QSize(500, 16777215))
        self.lineEdit_3.setObjectName("lineEdit_3")
        self.horizontalLayout_2.addWidget(self.lineEdit_3)
        self.verticalLayout.addLayout(self.horizontalLayout_2)
        self.horizontalLayout = QtWidgets.QHBoxLayout()
        self.horizontalLayout.setObjectName("horizontalLayout")
        self.pushButton_5 = QtWidgets.QPushButton(Form)
        self.pushButton_5.setObjectName("pushButton_5")
        self.horizontalLayout.addWidget(self.pushButton_5)
        self.lineEdit_4 = QtWidgets.QLineEdit(Form)
        self.lineEdit_4.setMinimumSize(QtCore.QSize(230, 28))
        self.lineEdit_4.setMaximumSize(QtCore.QSize(500, 16777215))
        self.lineEdit_4.setObjectName("lineEdit_4")
        self.horizontalLayout.addWidget(self.lineEdit_4)
        self.verticalLayout.addLayout(self.horizontalLayout)
        self.pushButton_19 = QtWidgets.QPushButton(Form)
        self.pushButton_19.setObjectName("pushButton_19")
        self.verticalLayout.addWidget(self.pushButton_19)
        self.pushButton_24 = QtWidgets.QPushButton(Form)
        self.pushButton_24.setObjectName("pushButton_24")
        self.verticalLayout.addWidget(self.pushButton_24)
        self.pushButton_20 = QtWidgets.QPushButton(Form)
        self.pushButton_20.setObjectName("pushButton_20")
        self.verticalLayout.addWidget(self.pushButton_20)
        self.pushButton_21 = QtWidgets.QPushButton(Form)
        self.pushButton_21.setObjectName("pushButton_21")
        self.verticalLayout.addWidget(self.pushButton_21)
        self.pushButton_22 = QtWidgets.QPushButton(Form)
        self.pushButton_22.setObjectName("pushButton_22")
        self.verticalLayout.addWidget(self.pushButton_22)
        self.pushButton_23 = QtWidgets.QPushButton(Form)
        self.pushButton_23.setObjectName("pushButton_23")
        self.verticalLayout.addWidget(self.pushButton_23)
        self.gridLayout.addLayout(self.verticalLayout, 0, 0, 1, 1)

        self.retranslateUi(Form)
        QtCore.QMetaObject.connectSlotsByName(Form)

    def retranslateUi(self, Form):
        _translate = QtCore.QCoreApplication.translate
        Form.setWindowTitle(_translate("Form", "Form"))
        self.pushButton.setText(_translate("Form", "添加任务"))
        self.pushButton_2.setText(_translate("Form", "暂停任务"))
        self.pushButton_18.setText(_translate("Form", "启动任务"))
        self.pushButton_3.setText(_translate("Form", "删除任务"))
        self.pushButton_6.setText(_translate("Form", "打开任务"))
        self.pushButton_4.setText(_translate("Form", "手动执行"))
        self.pushButton_5.setText(_translate("Form", "获取任务状态"))
        self.pushButton_19.setText(_translate("Form", "获取所有任务的状态"))
        self.pushButton_24.setText(_translate("Form", "手动执行所有任务"))
        self.pushButton_20.setText(_translate("Form", "暂停所有任务"))
        self.pushButton_21.setText(_translate("Form", "恢复所有任务"))
        self.pushButton_22.setText(_translate("Form", "关闭所有任务"))
        self.pushButton_23.setText(_translate("Form", "打开所有任务"))