xxl-job源码解读(调度器调度执行器)

发布于:2022-12-12 ⋅ 阅读:(1100) ⋅ 点赞:(0)

一.执行器与调度器流程图

 二.调度器调度执行器时序图

三.执行器接收到调度器的调度信息执行job时序图

 四.调度器远程调用执行器部分代码片段

1.配置类实现InitializingBean接口,初始化bean完成,执行afterPropertiesSet方法

@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {

    private static XxlJobAdminConfig adminConfig = null;
    public static XxlJobAdminConfig getAdminConfig() {
        return adminConfig;
    }


    // ---------------------- XxlJobScheduler ----------------------

    private XxlJobScheduler xxlJobScheduler;

    @Override
    public void afterPropertiesSet() throws Exception {
        adminConfig = this;

        xxlJobScheduler = new XxlJobScheduler();
        /**
            spring初始化bean完成,执行此方法,初始化触发调度
        **/
        xxlJobScheduler.init();
    }
}

 2.调度初始化JobScheduleHelper.getInstance().start()为开启自旋等待job触发

public class XxlJobScheduler  {
    public void init() throws Exception {
        // init i18n
        initI18n();

        // admin trigger pool start
        JobTriggerPoolHelper.toStart();

        // admin registry monitor run
        JobRegistryHelper.getInstance().start();

        // admin fail-monitor run
        JobFailMonitorHelper.getInstance().start();

        // admin lose-monitor run ( depend on JobTriggerPoolHelper )
        JobCompleteHelper.getInstance().start();

        // admin log report start
        JobLogReportHelper.getInstance().start();

        /**
            
        **/
        JobScheduleHelper.getInstance().start();

        logger.info(">>>>>>>>> init xxl-job admin success.");
    }
}

3.调度线程,自旋,等待触发调用执行器JobTriggerPoolHelper.trigger(jobInfo.getId())

public class JobScheduleHelper {

    public void start(){

        // schedule thread
        scheduleThread = new Thread(new Runnable() {
            @Override
            public void run() {
                       
                /**
                调度线程,自旋,等待触发调用执行器
                **/
                while (!scheduleThreadToStop) {

                   JobTriggerPoolHelper.trigger(jobInfo.getId(), 

                }
             }
        });
        scheduleThread.setDaemon(true);
        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
        scheduleThread.start();
}

4.XxlJobTrigger触发器,查询触发策略,远程调用执行器

public class XxlJobTrigger {

    public static void trigger(int jobId,
                               TriggerTypeEnum triggerType,
                               int failRetryCount,
                               String executorShardingParam,
                               String executorParam,
                               String addressList) {
       
            processTrigger(group, jobInfo, finalFailRetryCount
, triggerType, shardingParam[0], shardingParam[1]);
    
    }

    private static void processTrigger(XxlJobGroup group
, XxlJobInfo jobInfo, int finalFailRetryCount
, TriggerTypeEnum triggerType, int index, int total){
   
        ReturnT<String> triggerResult = null;
        if (address != null) {
            triggerResult = runExecutor(triggerParam, address);
        } else {
            triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
        }
       
    }
  
    public static ReturnT<String> runExecutor(TriggerParam triggerParam
, String address){
            ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
            runResult = executorBiz.run(triggerParam);  
    }

5.ExecutorBizClient调度器远程调用run接口,执行定时任务


public class ExecutorBizClient implements ExecutorBiz {

    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "run"
, accessToken, timeout, triggerParam, String.class);
    }
}

 五.执行器执行job部分代码片段

1.XxlJobSpringExecutor容器初始化,调用afterSingletonsInstantiated方法

public class XxlJobSpringExecutor 
extends XxlJobExecutor implements ApplicationContextAware
, SmartInitializingSingleton
, DisposableBean {

    // start
    @Override
    public void afterSingletonsInstantiated() {
        /*initJobHandlerRepository(applicationContext);*/
        initJobHandlerMethodRepository(applicationContext);
        GlueFactory.refreshInstance(1);
        try {
            super.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

2.XxlJobExecutor的start方法,启动netty服务,等待请求

public class XxlJobExecutor  {
 
    public void start() throws Exception {
        initEmbedServer(address, ip, port, appname, accessToken);
    }
}

3.EmbedHttpServerHandler等待调度请求

public static class EmbedHttpServerHandler 
extends SimpleChannelInboundHandler<FullHttpRequest> {
        @Override
        protected void channelRead0(final ChannelHandlerContext ctx
, FullHttpRequest msg) throws Exception {
            bizThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    Object responseObj = process(httpMethod, uri
, requestData, accessTokenReq);

                }
            });
        }

private Object process(HttpMethod httpMethod, String uri
, String requestData, String accessTokenReq) {
            try {
             switch (uri) {
                case "/run":
                        TriggerParam triggerParam = GsonTool
.fromJson(requestData, TriggerParam.class);      
                }
            } catch (Exception e) {
          
            }
        }

4.本地执行类ExecutorBizImpl,执行handler,registJobThread注册执行线程,启动

public class ExecutorBizImpl implements ExecutorBiz {


    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {   
        if (jobThread == null) {
            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId()
, jobHandler, removeOldReason);
        }
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
        return pushResult;
    }

5.执行线程类,执行handler

public class JobThread extends Thread{
    @Override
	public void run() {
		while(!toStop){
		try {
		FutureTask<Boolean> futureTask = new FutureTask<Boolean>(
new Callable<Boolean>() {
				@Override
			public Boolean call() throws Exception {
                  /**
                       执行业务handler
                        **/
						handler.execute();
					return true;
					}                  
				});
							
	}
}

六.讨论“为什么xxl-job底层不用quartz框架”?

本文含有隐藏内容,请 开通VIP 后查看

网站公告


今日签到

点亮在社区的每一天
去签到