I. Flowchart of the executor and the scheduler

II. Sequence diagram: the scheduler dispatching to the executor

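III. Sequence diagram: the executor running a job after it receives the scheduler's dispatch message
IV. Code excerpts: the scheduler calling the executor remotely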
1. The config class implements InitializingBean; once the bean is initialized, Spring calls afterPropertiesSet
@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {

    private static XxlJobAdminConfig adminConfig = null;

    public static XxlJobAdminConfig getAdminConfig() {
        return adminConfig;
    }

    // ---------------------- XxlJobScheduler ----------------------

    private XxlJobScheduler xxlJobScheduler;

    @Override
    public void afterPropertiesSet() throws Exception {
        adminConfig = this;
        xxlJobScheduler = new XxlJobScheduler();
        // Spring calls this method once the bean is initialized; it starts the scheduler
        xxlJobScheduler.init();
    }

    // ... (excerpt) the destroy() method required by DisposableBean is not shown
}
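The static-holder pattern in this excerpt can be tried without Spring. The following is a minimal sketch, not the xxl-job code: `InitHook`, `DemoScheduler` and `DemoAdminConfig` are hypothetical stand-ins, and the container's callback is simulated by calling `afterPropertiesSet()` by hand.

```java
// Hypothetical stand-in for Spring's InitializingBean so the sketch runs without Spring.
interface InitHook {
    void afterPropertiesSet() throws Exception;
}

// Hypothetical stand-in for XxlJobScheduler; it only records that init() ran.
class DemoScheduler {
    private boolean started = false;

    void init() {
        started = true;
    }

    boolean isStarted() {
        return started;
    }
}

class DemoAdminConfig implements InitHook {
    // Static holder: lets classes that are not managed by the container reach the config.
    private static DemoAdminConfig adminConfig = null;

    static DemoAdminConfig getAdminConfig() {
        return adminConfig;
    }

    private DemoScheduler scheduler;

    @Override
    public void afterPropertiesSet() {
        adminConfig = this;            // publish the initialized instance
        scheduler = new DemoScheduler();
        scheduler.init();              // start scheduling only after the bean is ready
    }

    DemoScheduler getScheduler() {
        return scheduler;
    }
}
```

Before the callback runs, `DemoAdminConfig.getAdminConfig()` returns null, which is why the holder should only be read after the container has finished initializing the bean.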
2. Scheduler initialization: JobScheduleHelper.getInstance().start() starts the loop that spins while waiting for jobs to fire
public class XxlJobScheduler {

    public void init() throws Exception {
        // init i18n
        initI18n();

        // admin trigger pool start
        JobTriggerPoolHelper.toStart();

        // admin registry monitor run
        JobRegistryHelper.getInstance().start();

        // admin fail-monitor run
        JobFailMonitorHelper.getInstance().start();

        // admin lose-monitor run ( depend on JobTriggerPoolHelper )
        JobCompleteHelper.getInstance().start();

        // admin log report start
        JobLogReportHelper.getInstance().start();

        // start the schedule thread, which spins and waits for jobs to fire
        JobScheduleHelper.getInstance().start();

        logger.info(">>>>>>>>> init xxl-job admin success.");
    }
}
3. The schedule thread spins, waiting to trigger the executor through JobTriggerPoolHelper.trigger(jobInfo.getId(), ...)
public class JobScheduleHelper {

    public void start() {
        // schedule thread
        scheduleThread = new Thread(new Runnable() {
            @Override
            public void run() {
                // the schedule thread spins here, waiting to trigger the executor call
                while (!scheduleThreadToStop) {
                    // ... (excerpt) the lookup of the due jobInfo is omitted
                    JobTriggerPoolHelper.trigger(jobInfo.getId() /* , remaining arguments are cut off in this excerpt */);
                }
            }
        });
        scheduleThread.setDaemon(true);
        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
        scheduleThread.start();
    }
}
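To make the loop above concrete, here is a minimal, self-contained sketch of a daemon schedule thread. It is an illustration under stated assumptions, not the xxl-job implementation: `DemoScheduleLoop`, its in-memory job map and the 10 ms polling interval are all hypothetical, and everything else the real JobScheduleHelper does is left out.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

class DemoScheduleLoop {
    // jobId -> next trigger time in epoch millis (hypothetical in-memory stand-in for the job table)
    private final Map<Integer, Long> nextTriggerTime = new ConcurrentHashMap<>();
    // records which jobs were triggered, in order
    private final List<Integer> triggered = new CopyOnWriteArrayList<>();
    private volatile boolean toStop = false;
    private Thread scheduleThread;

    void addJob(int jobId, long triggerAtMillis) {
        nextTriggerTime.put(jobId, triggerAtMillis);
    }

    void start() {
        scheduleThread = new Thread(() -> {
            while (!toStop) {
                long now = System.currentTimeMillis();
                for (Map.Entry<Integer, Long> entry : nextTriggerTime.entrySet()) {
                    // fire each due job once: remove it first, then trigger it
                    if (entry.getValue() <= now
                            && nextTriggerTime.remove(entry.getKey(), entry.getValue())) {
                        trigger(entry.getKey());
                    }
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(10); // polling interval (assumption)
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        scheduleThread.setDaemon(true);
        scheduleThread.setName("demo-scheduleThread");
        scheduleThread.start();
    }

    // Stand-in for JobTriggerPoolHelper.trigger(...): here it only records the job id.
    void trigger(int jobId) {
        triggered.add(jobId);
    }

    void stop() {
        toStop = true;
        scheduleThread.interrupt();
        try {
            scheduleThread.join(1000);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    // Waits until at least `count` jobs were triggered; returns false on timeout.
    boolean awaitTriggered(int count, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (triggered.size() < count) {
            if (System.currentTimeMillis() > deadline) {
                return false;
            }
            try {
                TimeUnit.MILLISECONDS.sleep(5);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    List<Integer> getTriggered() {
        return triggered;
    }
}
```

The thread is a daemon for the same reason as in the excerpt: it must not keep the JVM alive once the application shuts down.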
4. XxlJobTrigger, the trigger: looks up the trigger strategy and calls the executor remotely
public class XxlJobTrigger {

    public static void trigger(int jobId,
                               TriggerTypeEnum triggerType,
                               int failRetryCount,
                               String executorShardingParam,
                               String executorParam,
                               String addressList) {
        // ... (excerpt) loading of group, jobInfo, finalFailRetryCount and shardingParam is omitted
        processTrigger(group, jobInfo, finalFailRetryCount
                , triggerType, shardingParam[0], shardingParam[1]);
    }

    private static void processTrigger(XxlJobGroup group
            , XxlJobInfo jobInfo, int finalFailRetryCount
            , TriggerTypeEnum triggerType, int index, int total) {
        // ... (excerpt) building triggerParam and choosing the executor address is omitted
        ReturnT<String> triggerResult = null;
        if (address != null) {
            triggerResult = runExecutor(triggerParam, address);
        } else {
            triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
        }
    }

    public static ReturnT<String> runExecutor(TriggerParam triggerParam
            , String address) {
        // ... (excerpt) exception handling is omitted
        ReturnT<String> runResult = null;
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        runResult = executorBiz.run(triggerParam);
        return runResult;
    }
}
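The branch in processTrigger (call the executor if an address was chosen, otherwise return a failure) can be sketched on its own. This is a simplified illustration: `DemoResult`, `DemoTrigger` and the "take the first address" routing are hypothetical, and the numeric codes 200 and 500 are assumptions for the sketch rather than values taken from the excerpt.

```java
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical stand-in for ReturnT<String>.
class DemoResult {
    static final int SUCCESS_CODE = 200; // assumption for this sketch
    static final int FAIL_CODE = 500;    // assumption for this sketch

    final int code;
    final String msg;

    DemoResult(int code, String msg) {
        this.code = code;
        this.msg = msg;
    }
}

class DemoTrigger {
    // Simplest possible routing: take the first registered address, or null if there is none.
    static String routeFirst(List<String> addressList) {
        return (addressList == null || addressList.isEmpty()) ? null : addressList.get(0);
    }

    // runExecutor is passed in as a function so the sketch needs no network access.
    static DemoResult processTrigger(String triggerParam,
                                     List<String> addressList,
                                     BiFunction<String, String, DemoResult> runExecutor) {
        String address = routeFirst(addressList);
        if (address != null) {
            return runExecutor.apply(triggerParam, address);
        }
        return new DemoResult(DemoResult.FAIL_CODE, "no executor address available");
    }
}
```

Passing the remote call in as a function keeps the routing decision testable in isolation; in the excerpt the same role is played by the static runExecutor method.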
5. ExecutorBizClient: the scheduler calls the executor's run endpoint remotely to execute the scheduled job
public class ExecutorBizClient implements ExecutorBiz {

    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "run"
                , accessToken, timeout, triggerParam, String.class);
    }
}
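What such a call looks like on the wire can be sketched with the JDK's own HTTP API (Java 11+). This is not how XxlJobRemotingUtil is implemented; it swaps in `java.net.http.HttpRequest` purely for illustration. `DemoExecutorClient` is hypothetical, and the header name `XXL-JOB-ACCESS-TOKEN` and the JSON content type are assumptions, not something the excerpt shows.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

class DemoExecutorClient {
    private final String addressUrl;
    private final String accessToken;
    private final int timeoutSeconds;

    DemoExecutorClient(String addressUrl, String accessToken, int timeoutSeconds) {
        // make sure addressUrl + "run" forms a valid path
        this.addressUrl = addressUrl.endsWith("/") ? addressUrl : addressUrl + "/";
        this.accessToken = accessToken;
        this.timeoutSeconds = timeoutSeconds;
    }

    // Builds (but does not send) the POST request for the "run" endpoint.
    HttpRequest buildRunRequest(String triggerParamJson) {
        HttpRequest.Builder builder = HttpRequest.newBuilder(URI.create(addressUrl + "run"))
                .timeout(Duration.ofSeconds(timeoutSeconds))
                .header("Content-Type", "application/json;charset=UTF-8") // assumption
                .POST(HttpRequest.BodyPublishers.ofString(triggerParamJson));
        if (accessToken != null && !accessToken.isEmpty()) {
            builder.header("XXL-JOB-ACCESS-TOKEN", accessToken); // header name is an assumption
        }
        return builder.build();
    }
}
```

Sending the request would be a matter of passing it to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`; the sketch stops at building it so that it runs without a live executor.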
V. Code excerpts: the executor running a job
1. XxlJobSpringExecutor: once the container has instantiated its singletons, afterSingletonsInstantiated is called
public class XxlJobSpringExecutor
        extends XxlJobExecutor implements ApplicationContextAware
        , SmartInitializingSingleton
        , DisposableBean {

    // start
    @Override
    public void afterSingletonsInstantiated() {
        /*initJobHandlerRepository(applicationContext);*/
        initJobHandlerMethodRepository(applicationContext);
        GlueFactory.refreshInstance(1);
        try {
            super.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
2. XxlJobExecutor's start method starts the embedded Netty server and waits for requests
public class XxlJobExecutor {

    public void start() throws Exception {
        initEmbedServer(address, ip, port, appname, accessToken);
    }
}
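3. EmbedHttpServerHandler waits for dispatch requests
public static class EmbedHttpServerHandler
        extends SimpleChannelInboundHandler<FullHttpRequest> {

    @Override
    protected void channelRead0(final ChannelHandlerContext ctx
            , FullHttpRequest msg) throws Exception {
        // ... (excerpt) httpMethod, uri, requestData and accessTokenReq are read from msg
        bizThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                Object responseObj = process(httpMethod, uri
                        , requestData, accessTokenReq);
                // ... (excerpt) writing responseObj back to the caller is omitted
            }
        });
    }

    private Object process(HttpMethod httpMethod, String uri
            , String requestData, String accessTokenReq) {
        try {
            switch (uri) {
                case "/run":
                    TriggerParam triggerParam = GsonTool
                            .fromJson(requestData, TriggerParam.class);
                    // ... (excerpt) triggerParam is then handed to ExecutorBiz.run (see item 4)
                // ... (excerpt) other cases are omitted
            }
        } catch (Exception e) {
            // ... (excerpt) error handling is omitted
        }
        // ... (excerpt) return statements are omitted
    }
}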
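The dispatch-by-URI idea in process can be sketched without Netty. The class below is a hypothetical simplification: `DemoDispatcher`, the plain-string results, the `/beat` case and the exact validation order are assumptions for illustration, and the `/run` branch only echoes its input instead of deserializing a TriggerParam.

```java
class DemoDispatcher {
    // Returns a plain string instead of a ReturnT so the sketch has no dependencies.
    static String process(String httpMethod, String uri,
                          String requestData, String accessTokenReq,
                          String expectedToken) {
        // reject anything that is not a POST
        if (!"POST".equalsIgnoreCase(httpMethod)) {
            return "FAIL: HTTP method not supported";
        }
        if (uri == null || uri.trim().isEmpty()) {
            return "FAIL: uri is empty";
        }
        // only check the token when the executor is configured with one
        if (expectedToken != null && !expectedToken.trim().isEmpty()
                && !expectedToken.equals(accessTokenReq)) {
            return "FAIL: access token is wrong";
        }
        switch (uri) {
            case "/beat": // hypothetical health-check endpoint
                return "SUCCESS";
            case "/run":
                // the real handler would deserialize requestData and start the job
                return "SUCCESS: run accepted for " + requestData;
            default:
                return "FAIL: uri-mapping(" + uri + ") not found";
        }
    }
}
```

Running the dispatch on a business thread pool, as the excerpt does with bizThreadPool, keeps slow job handling off the I/O threads that accept requests.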
4. ExecutorBizImpl, the local implementation: runs the handler; registJobThread registers the execution thread and starts it
public class ExecutorBizImpl implements ExecutorBiz {

    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        // ... (excerpt) jobThread, jobHandler and removeOldReason are resolved earlier in the full method
        if (jobThread == null) {
            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId()
                    , jobHandler, removeOldReason);
        }
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
        return pushResult;
    }
}
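The "one thread per job, fed through a queue" structure implied by registJobThread and pushTriggerQueue can be sketched as follows. All names here (`DemoJobThread`, `DemoJobThreadRegistry`, `requestStop`, `awaitExecuted`) are hypothetical, trigger parameters are plain strings, and "executing" a job only records its parameter.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class DemoJobThread extends Thread {
    private final LinkedBlockingQueue<String> triggerQueue = new LinkedBlockingQueue<>();
    private final List<String> executed = new CopyOnWriteArrayList<>();
    private volatile boolean toStop = false;

    DemoJobThread(int jobId) {
        setName("demo-JobThread-" + jobId);
        setDaemon(true);
    }

    // Queues a trigger for this job; the worker thread picks it up asynchronously.
    boolean pushTriggerQueue(String triggerParam) {
        return triggerQueue.offer(triggerParam);
    }

    void requestStop() {
        toStop = true;
        interrupt();
    }

    @Override
    public void run() {
        while (!toStop) {
            try {
                String param = triggerQueue.poll(100, TimeUnit.MILLISECONDS);
                if (param != null) {
                    executed.add(param); // stand-in for running the job handler
                }
            } catch (InterruptedException e) {
                // woken up by requestStop(); the loop condition decides whether to exit
            }
        }
    }

    // Waits until at least `count` triggers were executed; returns false on timeout.
    boolean awaitExecuted(int count, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (executed.size() < count) {
            if (System.currentTimeMillis() > deadline) {
                return false;
            }
            try {
                TimeUnit.MILLISECONDS.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    List<String> getExecuted() {
        return executed;
    }
}

class DemoJobThreadRegistry {
    private static final ConcurrentMap<Integer, DemoJobThread> repository = new ConcurrentHashMap<>();

    // Starts a new worker for the job and stops the one it replaces, if any.
    static DemoJobThread registJobThread(int jobId) {
        DemoJobThread newThread = new DemoJobThread(jobId);
        newThread.start();
        DemoJobThread oldThread = repository.put(jobId, newThread);
        if (oldThread != null) {
            oldThread.requestStop();
        }
        return newThread;
    }

    static DemoJobThread loadJobThread(int jobId) {
        return repository.get(jobId);
    }
}
```

Because the caller only enqueues the trigger, the run request can return as soon as the push succeeds, while the job itself executes later on its own thread.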
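5. JobThread, the execution thread class: runs the handler
public class JobThread extends Thread {

    @Override
    public void run() {
        while (!toStop) {
            try {
                FutureTask<Boolean> futureTask = new FutureTask<Boolean>(
                        new Callable<Boolean>() {
                            @Override
                            public Boolean call() throws Exception {
                                // run the business handler
                                handler.execute();
                                return true;
                            }
                        });
                // ... (excerpt) starting futureTask and waiting for its result is omitted
            } catch (Throwable e) {
                // ... (excerpt) error handling is omitted
            }
        }
    }
}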
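Wrapping the handler in a FutureTask makes it possible to bound its running time. The following is a minimal sketch of that technique using only `java.util.concurrent`; `DemoTimeoutRunner`, the string results and the millisecond timeout are hypothetical and not taken from JobThread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class DemoTimeoutRunner {
    // Runs the handler on its own thread and waits at most timeoutMillis for it.
    static String runWithTimeout(Callable<Boolean> handler, long timeoutMillis) {
        FutureTask<Boolean> futureTask = new FutureTask<>(handler);
        Thread futureThread = new Thread(futureTask);
        futureThread.setDaemon(true);
        futureThread.start();
        try {
            futureTask.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "SUCCESS";
        } catch (TimeoutException e) {
            return "TIMEOUT";
        } catch (ExecutionException e) {
            // the handler itself threw; the cause carries the original exception
            return "FAIL: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        } finally {
            // ask a still-running handler to stop; harmless if it already finished
            futureThread.interrupt();
        }
    }
}
```

Note that interrupting the thread is only a request: a handler that never checks its interrupt status or blocks in an interruptible call will keep running after the timeout is reported.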
VI. Discussion: why doesn't xxl-job use the Quartz framework under the hood?
