【分布式任务调度】(四)XXL-JOB的任务调度及执行流程

发布于:2022-10-29 ⋅ 阅读:(3975) ⋅ 点赞:(3)

1.概述

从上一篇《XXL-JOB的注册与发现流程及原理》了解到了XXL-JOB的注册原理,在这个基础上就可以聊一聊XXL-JOB的调度和执行流程了。
XXL-JOB要完成一次任务调度,需要发起两次Http通信请求,分别是:

  • 调度中心调用执行器,执行定时任务。
  • 执行器回调调度中心,上报定时任务执行结果。

在调度和执行的整个流程中,XXL-JOB使用了大量的异步操作,减少调度中心的资源压力,以此在集中式调度配置与性能之间找到平衡点。

2.对调度流程的思考

在研究XXL-JOB的执行流程之前,不妨先思考一下,如果我们自己来实现一个定时任务的调度,要从哪些方面去入手?


有了前面基本博文的基础,我们现在已经有了一个调度中心集群,一个执行器,以及一个定时任务配置。在不考虑异步执行的情况下,要实现任务调度就非常简单了。

我们只需要在调度中心启动一个线程,不断的去扫描任务配置表,判断该任务是否到了触发时间
如果到了触发时间,调度中心就使用这个任务对应的执行器配置,获取到执行器的ip、端口,直接发起Http请求,等待执行器执行完毕后,响应一个执行结果。这样,一个最简单定时任务就完成了。

当然,这样的定时任务只能存在与Demo中,想要在生产环境中运行,还需要解决很多问题,这里例举一些:

  • 线程中循环扫描任务配置表是否过于频繁
  • 调度中心集群同时执行一个任务,造成重复调度如何处理
  • 任务是否到了触发时间该如何判断
  • 当前线程既要做扫描,又要做调度,同步请求的阻塞过程影响到了其他任务的调度该如何处理
  • 同步调用执行器,执行器执行任务耗时很长阻塞了调度中心的线程怎么办
  • ……

带着这样的问题,我们一起看看XXL-JOB是如何解决的。

不记得定时任务的配置过程的同学,可以回顾一下《XXL-JOB执行器配置及定时任务的创建》

3.调度中心流程

调度中心做的第一件事,就是启动线程不断的扫描定时任务的配置表,我们可以从初始化方法中找到这个线程的初始化过程,在JobScheduleHelper.getInstance().start()中,会启动两个线程,分别是:
在这里插入图片描述

  • scheduleThread:这个线程就是用来扫描任务配置表,并判断当前任务是否应该触发。
  • ringThread:大部分的任务触发都在这个线程中,这个线程会从时间轮中去获取数据。

3.1.任务配置扫描

scheduleThread主要是在循环扫描定时任务配置表xxl_job_info,这里会引出第一个问题调度中心集群是如何避免重复调度问题的
其实就是使用了一个分布式锁,当然分布式锁的实现有很多种,XXL-JOB选择的是使用MySQL的排他锁来实现的,其实就是一句简单的SQL:

select * from xxl_job_lock where lock_name = 'schedule_lock' for update;

每个调度中心的scheduleThread在扫描任务表之前,都会先执行这个语句,如果前面还有其他的线程在执行扫描的过程,当前线程就会被阻塞,等待锁释放。

第二个问题,如何避免频繁的查询数据库呢?
我们先看一下实现的代码:
在这里插入图片描述
这段代码的意思是,如果本次扫描和触发判断的逻辑,消耗时间小于1s,就需要让线程sleep一段时间,这个时间值的计算包含了两个字段值:

  • preReadSuc:是一个boolean值,当查询出的需要做触发判断的 定时任务列表 为空时,值为false,反之,值为true
  • PRE_READ_MS:是一个常量值,表示 5秒。

也就是说,如果查询的定时任务列表有值,则表示程序在正常的处理定时任务,此时就让线程Sleep 0到1秒,如果定时任务列表没有值,则有可能下一秒查询还是没有值,则Sleep 4-5秒,以此来减少查询次数。
至于为什么这里的阈值为5秒,我们可以接着往下看。

定时任务扫描条件
查询定时任务配置的时候,判断是否触发有一个很重要的时间点:当前系统时间
但是我们使用当前系统时间进行精确匹配查询的话,数据是有可能不准确的。比如当前获取到的时间是2022年10月26日18点整,但是由于程序耗时,io耗时等影响,实际进行查询的时候可能是18点过1秒了,那这个18点整的任务就错过了。

一般我们会通过将时间点扩大为一个时间段的方式来处理,例如XXL-JOB的处理方式就是,给当前系统时间加上5秒,然后查询出 触发时间 <=(当前时间+5s) 的数据,得到一个最迟触发时间5秒后 的任务列表。
在这里插入图片描述
接下来,就会依次处理这个列表中每一个任务的实际触发时机。

3.2.任务触发时机

XXL-JOB将查询出的任务列表数据分为了三个部分:

  • 已超时5秒以上
  • 已超时但不足5秒
  • 还未到触发时间或恰好到触发时间(通过时间戳来做得判断,恰好到的情况很少)

由于当前时间nowTime已经固定,而每个任务的触发时间可能会不一样,以触发时间来做一个时间轴,就可以用图示直观的表示这三个部分的数据,如下图:
在这里插入图片描述
这里的触发时间,指的是xxl_job_info表中trigger_next_time字段值,这个值在创建、更新任务或者在任务触发时更新。具体是通过当前时间CRON表达式来计算出一个具体的时间。


3.2.1.已超时5秒以上

根据调度过期策略的配置,有两种不同的执行流程,默认情况下的调度过期策略是忽略,已经超时5秒以上的任务会被丢弃掉。另外一种策略是立即执行一次,就是字面意思,立即触发一次任务调度。
在这里插入图片描述


3.2.2.还未到触发时间

这部分任务会被放入到时间轮中,时间轮中的数据会在ringThread线程中不断的被取出,然后调用trigger方法进行任务触发。
在这里插入图片描述

时间轮是一种用于实现定时器、延时调度等功能的算法,广泛的运用与各种中间件中,例如:Netty、Kafka、Dubbo等。如果想详细的了解时间轮算法,可以自行百度一下,而在XXL-JOB中,使用的是HashMap来实现的,具体的做法是:

先获取到triggerNextTime的值,这是一个时间戳,通过下面的算法可以获取到这个时间戳对应的秒数。

// [0,59]
int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);

然后 以 ringSecond 为key,jobId(任务Id)为value,put到HashMap中。

private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

 private void pushTimeRing(int ringSecond, int jobId){
    List<Integer> ringItemData = ringData.get(ringSecond);
    if (ringItemData == null) {
        ringItemData = new ArrayList<Integer>();
        ringData.put(ringSecond, ringItemData);
    }
    ringItemData.add(jobId);
}

剥开时间轮神秘的面纱,其实实现起来非常的简单,当然XXL-JOB中的时间轮算法只是一种最简单的运用。

3.2.3.超时未超过5秒

处于这个时间段内的任务,会立即触发一次,并更新下次触发时间。
需要注意的是,如果发现下次触发时间满足在未来5秒内会触发,还会将这个任务直接放入到时间轮中,下次触发就不再从数据库获取数据开始执行了,当然,为了避免重复添加这个任务到时间轮中,会再次更新下次触发时间。

3.3.任务触发

多数的任务都是通过ringThread使用时间轮来进行触发的。
在这里插入图片描述
先获取到当前时间的秒数,然后从时间轮中取出当前秒前一秒的所有任务,最后循环ringItemData,依次触发其中的每一个任务。
在这里插入图片描述
任务的执行是一个相对耗时的操作,对于这种没有紧密的事务关联又相对耗时的操作,一般都会选择使用异步处理,所以trigger方法会把请求扔到线程池triggerPool中处理。
XXL-JOB中的triggerPool有快慢两种线程池fastTriggerPoolslowTriggerPool,主要是做一个线程池的隔离,将执行偏慢的任务放到slowTriggerPool中,避免执行较慢的任务占用过多的字段导致正常的任务也不能快速的调度。
一个任务如果远程调度的时间超过500ms(不是任务执行时间)就可以标记一次慢任务,在10分钟内同一个任务表标记慢10次就会进入到slowTriggerPool中运行了。

线程中的任务会经过保存日志、请求参数封装、路由策略等操作,最终会获取到一个执行器的地址,通过Http调用run方法,执行任务。
在这里插入图片描述


至此,调度中心的调度流程就已经结束了,接下来就等待执行器回调,获取任务执行结果。

4.执行器流程

我们在执行器中配置一个定时任务的时候,会在需要定时执行的方法上使用@XxlJob来标记这个方法,官方示例中是这么去标记一个方法的。
在这里插入图片描述
注解中的demoJobHandler就是任务处理器的名称。

在服务启动,并且单例的Bean注册完成时,会回调afterSingletonsInstantiated方法,进而调用initJobHandlerMethodRepository方法。
在这里插入图片描述
以官方的任务示例为例,这里处理的是任务处理器demoJobHandler与其所在的类(这里使用的是Spring管理,所以是bean)及其标记的方法,将类信息、方法信息生成了一个jobHandler对象来作为value,用demoJobHandler作为key,保存它们之间的映射关系。

4.1.执行器任务调度

任务处理器选择
执行器对象在获取到调度器传入的参数之后,会根据任务处理器名称获取到对应的jobHandler
在这里插入图片描述


阻塞策略判断
XXL-JOB提供了三种阻塞策略,分别是:

  • 单机串行:前一个任务还没有执行完毕,就等前一个任务执行完再执行当次的任务
  • 丢弃后续调度:前一个任务没有执行完毕,就终止当次任务。
  • 覆盖之前调度:前一个任务没有执行完毕,就把它停掉,并执行当次任务。

这里需要说明的是,XXL-JOB中为每一个被@XxlJob标记的方法都创建了一个专用的线程jobThread来执行定时任务。这是一个懒加载的线程,即在方法被触发时才会创建。
jobThread中,有两个重要的成员变量:

  • handler:这个就是上面说的jobHandler
  • triggerQueue:触发队列,单机串行的策略会使用到这个队列。
    在这里插入图片描述
    注:如果当前方法是第一次执行,显然就不会有前一个没有执行完毕的任务,所以第一次被触发的方法不会进入阻塞策略的选择中。

任务执行
通过了阻塞策略判断之后,调度参数会被push到triggerQueue中,jobThread会从这个队列中获取数据。
在这里插入图片描述
然后通过 handler.execute(),通过反射来调用实际的定时任务方法。
在这里插入图片描述
注:如果配置了超时时间的话,这里会通过callable来执行 handler.execute(),并在get方法中超时时间,如下:

// 超时则会抛出 TimeoutException
futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);

4.2.任务回调

最终任务在执行完毕之后,被被push到回调队列callBackQueue中,回调线程会从队列中获取到回调信息,通过callback方法回调调度中心。
在这里插入图片描述
调度中心在获取到回到的信息之后,就会更新数据库的任务信息。

5.流程图

至此,一次任务调度和执行的流程就结束了。
在这里插入图片描述

6.总结

异步实践
XXL-JOB的调度流程中使用了大量的异步用法,总结起来就是两种:

  • 通过线程池来执行异步操作
  • 通过自旋线程 + 阻塞队列的方式来执行异步操作

源码中对多线程的使用方式是一种非常好的,我们完全可以参照这里的源码,在自己的项目里面实现异步调度。

调度流程
XXL-JOB调度流程的思想是比较容易理解的,整个流程看起来很舒服。

  • 获取任务:调度线程不断的扫描任务表,查询出将要执行的任务。
  • 前置处理:对每一个任务都做一次触发时间的计算,能够立即触发的就立即触发,不能立即触发的就放在时间轮中触发,不能触发的就抛弃掉。
  • 触发任务:调度线程不断的从时间轮中获取任务并触发。
  • 异步调度:调度中心将调度与触发做了异步处理,使用触发线程池来做Http调用。
  • 任务执行:执行器为每个任务都分配了一个线程,自己处理自己的任务,任务之间不会互相影响。
  • 任务回调:将执行结果回传到调度中心中,更新任务运行状态。
本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到