整个工具的代码都在Gitee或者Github地址内
gitee:solomon-parent: 这个项目主要是总结了工作上遇到的问题以及学习一些框架用于整合例如:rabbitMq、reids、Mqtt、S3协议的文件服务器、mongodb
需要引入的JAR包(版本根据自身要求使用,本教程用的版本均为最新)
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
</dependency>
1.新增JokTask注解
/**
* xxl-job注解
*/
@Target(value = { ElementType.FIELD, ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface JobTask {
@AliasFor(annotation = Component.class)
String value() default "";
/**
* 执行器主键ID
*/
int jobGroup() default 1;
/**
* 任务描述
*/
String taskName();
/**
* 负责人
*/
String author();
/**
* 报警邮件
*/
String alarmEmail() default "";
/**
* 调度类型 默认不调度
*/
ScheduleTypeEnum scheduleType() default ScheduleTypeEnum.NONE;
/**
* 调度配置 CRON(* * * * * ?) FIX_RATE(30秒)
*/
String scheduleConf() default "";
/**
* 运行模式
*/
GlueTypeEnum glueType() default GlueTypeEnum.BEAN;
/**
* 执行器,任务Handler名称
*/
String executorHandler();
/**
* 执行器 任务参数
*/
String executorParam() default "";
/**
* 路由策略
*/
ExecutorRouteStrategyEnum executorRouteStrategy() default ExecutorRouteStrategyEnum.FIRST;
/**
* 子任务ID,多个逗号分隔
*/
String childJobId() default "";
/**
* 调度过期策略
*/
MisfireStrategyEnum misfireStrategy() default MisfireStrategyEnum.DO_NOTHING;
/**
* 阻塞处理策略
*/
ExecutorBlockStrategyEnum executorBlockStrategy() default ExecutorBlockStrategyEnum.SERIAL_EXECUTION;
/**
* 任务执行超时时间,单位秒
*/
int executorTimeout() default 0;
/**
* 失败重试次数
*/
int executorFailRetryCount() default 0;
/**
* 是否启动
*/
boolean start() default true;
}
2.新增xxl-job配置
public class XxlJobCondition implements Condition {
@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
// 从环境配置中读取 "my.feature.enabled" 属性
String enabled = ValidateUtils.getOrDefault(context.getEnvironment().getProperty("xxl.enabled"),"true");
// 返回属性值是否为 "true"
return BooleanUtil.toBoolean(enabled);
}
}
@Configuration
@Import(XxlJobProperties.class)
public class XxlJobConfig {
@Bean
@ConditionalOnMissingBean(XxlJobSpringExecutor.class)
@Conditional(XxlJobCondition.class)
public XxlJobSpringExecutor xxlJobExecutor(XxlJobProperties profile) throws Exception {
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(profile.getAdminAddresses());
xxlJobSpringExecutor.setAppname(profile.getAppName());
xxlJobSpringExecutor.setIp(profile.getIp());
xxlJobSpringExecutor.setPort(profile.getPort());
xxlJobSpringExecutor.setAccessToken(profile.getAccessToken());
if(ValidateUtils.isNotEmpty(profile.getLogPath())){
xxlJobSpringExecutor.setLogPath(profile.getLogPath());
}
xxlJobSpringExecutor.setLogRetentionDays(profile.getLogRetentionDays());
return xxlJobSpringExecutor;
}
}
3.新增通用Job处理类
public abstract class AbstractJobConsumer extends IJobHandler {
protected final Logger logger = LoggerUtils.logger(getClass());
private final JobTask jobTask = getClass().getAnnotation(JobTask.class);
public void execute() throws Exception{
String jobParam = XxlJobHelper.getJobParam();
logger.info("BeanName:{},任务参数:{},开启任务调度",getXxlJobBeanName(),jobParam);
StopWatch stopWatch = new StopWatch();
try {
stopWatch.start();
handle(jobParam);
}catch(Throwable e){
logger.error("AbstractJobConsumer:调度报错 异常为:", e);
saveLog(e);
throw e;
} finally {
stopWatch.stop();
Double second = Double.parseDouble(String.valueOf(stopWatch.getLastTaskTimeMillis())) / 1000;
logger.info("BeanName:{},结束任务调度,耗时:{}秒",getXxlJobBeanName(),second);
}
}
public abstract void handle(String jobParam);
public String getXxlJobBeanName(){
return ValidateUtils.isNotEmpty(jobTask) ? jobTask.executorHandler() : "";
}
/**
* 保存消费失败的消息
*/
public abstract void saveLog(Throwable throwable);
}
4.新增自动创建任务实体类
public class XxlJobInfo {
private int id; // 主键ID
private int jobGroup; // 执行器主键ID
private String jobDesc;
private String author; // 负责人
private String alarmEmail; // 报警邮件
private ScheduleTypeEnum scheduleType; // 调度类型
private String scheduleConf; // 调度配置,值含义取决于调度类型
private MisfireStrategyEnum misfireStrategy; // 调度过期策略
private ExecutorRouteStrategyEnum executorRouteStrategy; // 执行器路由策略
private String executorHandler; // 执行器,任务Handler名称
private String executorParam; // 执行器,任务参数
private ExecutorBlockStrategyEnum executorBlockStrategy; // 阻塞处理策略
private int executorTimeout; // 任务执行超时时间,单位秒
private int executorFailRetryCount; // 失败重试次数
private GlueTypeEnum glueType; // GLUE类型 #com.xxl.job.core.glue.GlueTypeEnum
private String glueSource; // GLUE源代码
private String glueRemark; // GLUE备注
private String childJobId; // 子任务ID,多个逗号分隔
private int triggerStatus; // 调度状态:0-停止,1-运行
public XxlJobInfo() {
super();
}
public XxlJobInfo(JobTask jobTask) {
super();
this.jobGroup = jobTask.jobGroup();
this.jobDesc = jobTask.taskName();
this.author = jobTask.author();
this.alarmEmail = jobTask.alarmEmail();
this.scheduleType = jobTask.scheduleType();
this.scheduleConf = jobTask.scheduleConf();
this.misfireStrategy = jobTask.misfireStrategy();
this.executorRouteStrategy = jobTask.executorRouteStrategy();
this.executorHandler = jobTask.executorHandler();
this.executorParam = jobTask.executorParam();
this.executorBlockStrategy = jobTask.executorBlockStrategy();
this.executorTimeout = jobTask.executorTimeout();
this.executorFailRetryCount = jobTask.executorFailRetryCount();
this.glueType = jobTask.glueType();
this.childJobId = jobTask.childJobId();
this.triggerStatus = jobTask.start() ? 1 : 0;
}
public void update(JobTask jobTask){
this.jobGroup = jobTask.jobGroup();
this.jobDesc = jobTask.taskName();
this.author = jobTask.author();
this.alarmEmail = jobTask.alarmEmail();
this.scheduleType = jobTask.scheduleType();
this.scheduleConf = jobTask.scheduleConf();
this.misfireStrategy = jobTask.misfireStrategy();
this.executorRouteStrategy = jobTask.executorRouteStrategy();
this.executorHandler = jobTask.executorHandler();
this.executorParam = jobTask.executorParam();
this.executorBlockStrategy = jobTask.executorBlockStrategy();
this.executorTimeout = jobTask.executorTimeout();
this.executorFailRetryCount = jobTask.executorFailRetryCount();
this.glueType = jobTask.glueType();
this.childJobId = jobTask.childJobId();
this.triggerStatus = jobTask.start() ? 1 : 0;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getJobGroup() {
return jobGroup;
}
public void setJobGroup(int jobGroup) {
this.jobGroup = jobGroup;
}
public String getJobDesc() {
return jobDesc;
}
public void setJobDesc(String jobDesc) {
this.jobDesc = jobDesc;
}
public String getAuthor() {
return author;
}
public void setAuthor(String author) {
this.author = author;
}
public String getAlarmEmail() {
return alarmEmail;
}
public void setAlarmEmail(String alarmEmail) {
this.alarmEmail = alarmEmail;
}
public ScheduleTypeEnum getScheduleType() {
return scheduleType;
}
public void setScheduleType(ScheduleTypeEnum scheduleType) {
this.scheduleType = scheduleType;
}
public String getScheduleConf() {
return scheduleConf;
}
public void setScheduleConf(String scheduleConf) {
this.scheduleConf = scheduleConf;
}
public MisfireStrategyEnum getMisfireStrategy() {
return misfireStrategy;
}
public void setMisfireStrategy(MisfireStrategyEnum misfireStrategy) {
this.misfireStrategy = misfireStrategy;
}
public ExecutorRouteStrategyEnum getExecutorRouteStrategy() {
return executorRouteStrategy;
}
public void setExecutorRouteStrategy(ExecutorRouteStrategyEnum executorRouteStrategy) {
this.executorRouteStrategy = executorRouteStrategy;
}
public String getExecutorHandler() {
return executorHandler;
}
public void setExecutorHandler(String executorHandler) {
this.executorHandler = executorHandler;
}
public String getExecutorParam() {
return executorParam;
}
public void setExecutorParam(String executorParam) {
this.executorParam = executorParam;
}
public ExecutorBlockStrategyEnum getExecutorBlockStrategy() {
return executorBlockStrategy;
}
public void setExecutorBlockStrategy(ExecutorBlockStrategyEnum executorBlockStrategy) {
this.executorBlockStrategy = executorBlockStrategy;
}
public int getExecutorTimeout() {
return executorTimeout;
}
public void setExecutorTimeout(int executorTimeout) {
this.executorTimeout = executorTimeout;
}
public int getExecutorFailRetryCount() {
return executorFailRetryCount;
}
public void setExecutorFailRetryCount(int executorFailRetryCount) {
this.executorFailRetryCount = executorFailRetryCount;
}
public GlueTypeEnum getGlueType() {
return glueType;
}
public void setGlueType(GlueTypeEnum glueType) {
this.glueType = glueType;
}
public String getGlueSource() {
return glueSource;
}
public void setGlueSource(String glueSource) {
this.glueSource = glueSource;
}
public String getGlueRemark() {
return glueRemark;
}
public void setGlueRemark(String glueRemark) {
this.glueRemark = glueRemark;
}
public String getChildJobId() {
return childJobId;
}
public void setChildJobId(String childJobId) {
this.childJobId = childJobId;
}
public int getTriggerStatus() {
return triggerStatus;
}
public void setTriggerStatus(int triggerStatus) {
this.triggerStatus = triggerStatus;
}
}
5.新增枚举
public enum ExecutorBlockStrategyEnum implements BaseEnum<String> {
SERIAL_EXECUTION("单机串行"),
/*CONCURRENT_EXECUTION("并行"),*/
DISCARD_LATER("丢弃后续调度"),
COVER_EARLY("覆盖之前调度");
private final String desc;
ExecutorBlockStrategyEnum(String desc) {
this.desc = desc;
}
@Override
public String getDesc() {
return desc;
}
@Override
public String label() {
return this.name();
}
@Override
public String key() {
return this.name();
}
}
public enum ExecutorRouteStrategyEnum implements BaseEnum<String> {
FIRST("第一个"),
LAST("最后一个"),
ROUND("轮询"),
RANDOM("随机"),
CONSISTENT_HASH("一致性HASH"),
LEAST_FREQUENTLY_USED("最不经常使用"),
LEAST_RECENTLY_USED("最近最久未使用"),
FAILOVER("故障转移"),
BUSYOVER("忙碌转移"),
SHARDING_BROADCAST("分片广播");
private final String desc;
ExecutorRouteStrategyEnum(String desc) {
this.desc = desc;
}
@Override
public String getDesc() {
return desc;
}
@Override
public String label() {
return this.name();
}
@Override
public String key() {
return this.name();
}
}
public enum GlueTypeEnum implements BaseEnum<String> {
BEAN("BEAN"),
GLUE_GROOVY("GLUE(Java)"),
GLUE_SHELL("GLUE(Shell)"),
GLUE_PYTHON("GLUE(Python)"),
GLUE_PHP("GLUE(Php)"),
GLUE_NODEJS("GLUE(Nodejs)"),
GLUE_POWERSHELL("GLUE(PowerShell)");
private final String desc;
GlueTypeEnum(String desc) {
this.desc = desc;
}
@Override
public String getDesc() {
return desc;
}
@Override
public String label() {
return this.name();
}
@Override
public String key() {
return this.name();
}
}
public enum MisfireStrategyEnum implements BaseEnum<String> {
/**
* do nothing
*/
DO_NOTHING("忽略"),
/**
* fire once now
*/
FIRE_ONCE_NOW("立即执行一次");
private final String desc;
MisfireStrategyEnum(String desc) {
this.desc = desc;
}
@Override
public String getDesc() {
return desc;
}
@Override
public String label() {
return this.name();
}
@Override
public String key() {
return this.name();
}
}
public enum ScheduleTypeEnum implements BaseEnum<String> {
NONE("不调度"),
/**
* schedule by cron
*/
CRON("CRON调度"),
/**
* schedule by fixed rate (in seconds)
*/
FIX_RATE("固定速率"),
/**
* schedule by fix delay (in seconds), after the last time
*/
/*FIX_DELAY(I18nUtil.getString("schedule_type_fix_delay"))*/;
private final String desc;
ScheduleTypeEnum(String desc) {
this.desc = desc;
}
@Override
public String getDesc() {
return desc;
}
@Override
public String label() {
return this.name();
}
@Override
public String key() {
return this.name();
}
}
6.新增初始化方法
@Configuration
@Import(XxlJobProperties.class)
public class XxlJobInit extends AbstractMessageLineRunner<JobTask> {
private final XxlJobProperties profile;
public XxlJobInit(ApplicationContext applicationContext, XxlJobProperties profile) {
this.profile = profile;
SpringUtil.setContext(applicationContext);
}
@Override
public void init(List<Object> clazzList) throws Exception {
if(!profile.getEnabled()){
logger.error("xxl-Job不启用,不初始化定时任务");
return;
}
String cookie = login();
String adminAddresses = profile.getAdminAddresses();
if(!adminAddresses.endsWith("/")){
adminAddresses = adminAddresses + "/";
}
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
for(Object obj : clazzList){
Class<?> clazz = obj.getClass();
JobTask jobTask = AnnotationUtil.getAnnotation(clazz, JobTask.class);
if(ValidateUtils.isEmpty(jobTask)){
logger.error("{}没有JobTask注解,不进行初始化",obj.getClass().getSimpleName());
continue;
}
String className = obj.getClass().getSimpleName();
String executorHandler = ValidateUtils.getOrDefault(jobTask.executorHandler(),className);
List<XxlJobInfo> xxlJobInfoList = findByExecutorHandler(cookie,executorHandler);
Map<String,XxlJobInfo> xxlJobInfoMap = Lambda.toMap(xxlJobInfoList, XxlJobInfo::getExecutorHandler);
XxlJobInfo xxlJobInfo = xxlJobInfoMap.get(executorHandler);
String url = adminAddresses + (ValidateUtils.isEmpty(xxlJobInfo)? "jobinfo/add" : "jobinfo/update");
xxlJobInfo = ValidateUtils.isEmpty(xxlJobInfo) ? new XxlJobInfo(jobTask,className) : xxlJobInfo.update(jobTask,className);
xxlJobInfo.setExecutorHandler(executorHandler);
// 发送 POST 请求
execute(cookie,url,JSONUtil.toBean(JSONUtil.toJsonStr(xxlJobInfo), new TypeReference<Map<String,Object>>() {},true));
//启用或禁止任务,调度类型必须不是不调度才可以
if(!ValidateUtils.equalsIgnoreCase(jobTask.scheduleType().name(), ScheduleTypeEnum.NONE.name())){
url = adminAddresses + (jobTask.start() ? "jobinfo/start" : "jobinfo/stop");
enabled(cookie,executorHandler,url);
} else {
logger.info("{}类的调度类型为不调度,不允许启用或者禁止任务",className);
}
XxlJobSpringExecutor.registJobHandler(executorHandler, (IJobHandler) obj);
}
xxlJobSpringExecutor.start();
}
/**
* 登陆网页
*/
private String login() throws Exception {
String adminAddresses = profile.getAdminAddresses();
String userName = profile.getUserName();
String password = profile.getPassword();
if(ValidateUtils.isEmpty(adminAddresses)){
throw new Exception("xxl-job的网页url不允许为空");
}
if(ValidateUtils.isEmpty(userName)){
throw new Exception("xxl-job的账号不允许为空");
}
if(ValidateUtils.isEmpty(password)){
throw new Exception("xxl-job的密码不允许为空");
}
if(!adminAddresses.endsWith("/")){
adminAddresses = adminAddresses + "/";
}
String url = adminAddresses + "login";
// 构建请求参数
Map<String, Object> paramMap = new HashMap<>();
paramMap.put("userName", userName);
paramMap.put("password", password);
// 发送 POST 请求
return getCookie(url,paramMap);
}
/**
* 启动任务
*/
private void enabled(String cookie,String executorHandler,String url) throws Exception {
List<XxlJobInfo> xxlJobInfoList = findByExecutorHandler(cookie,executorHandler);
Map<String,XxlJobInfo> xxlJobInfoMap = Lambda.toMap(xxlJobInfoList, XxlJobInfo::getExecutorHandler);
XxlJobInfo xxlJobInfo = xxlJobInfoMap.get(executorHandler);
if(ValidateUtils.isEmpty(xxlJobInfo)){
throw new Exception("xxl-job启动"+executorHandler+"任务失败");
}
Map<String, Object> paramMap = new HashMap<>();
paramMap.put("id", xxlJobInfo.getId());
execute(cookie,url,paramMap);
}
/**
* 调用xxl-job的任务页面查询
*/
private List<XxlJobInfo> findByExecutorHandler(String cookie,String executorHandler) throws Exception {
String adminAddresses = profile.getAdminAddresses();
if(!adminAddresses.endsWith("/")){
adminAddresses = adminAddresses + "/";
}
String url = adminAddresses + "jobinfo/pageList?jobGroup=1&triggerStatus=-1&start="+0+"&length="+10+"&executorHandler="+executorHandler;
String body = execute(cookie,url,null);
Map<String,Object> resultMap = JSONUtil.toBean(body, new TypeReference<Map<String, Object>>() {},true);
Object obj = resultMap.get("data");
return JSONUtil.toList(JSONUtil.toJsonStr(obj),XxlJobInfo.class);
}
/**
* 调用接口
*/
private HttpResponse executeResponse(String cookie,String url,Map<String, Object> paramMap) throws Exception {
HttpRequest request = HttpUtil.createPost(url);
if(ValidateUtils.isNotEmpty(cookie)){
request = request.cookie(cookie); // 需要替换为实际的认证信息
}
if(ValidateUtils.isNotEmpty(paramMap)){
request = request.form(paramMap);
}
HttpResponse response = request.execute();
String body = response.body();
String code = null;
String msg = null;
if(JSONUtil.isTypeJSON(body)){
Map<String,Object> resultMap = JSONUtil.toBean(body, new TypeReference<Map<String, Object>>() {},true);
code = ValidateUtils.isEmpty(resultMap.get("code")) ? null : resultMap.get("code").toString();
msg = ValidateUtils.isEmpty(resultMap.get("msg")) ? null : resultMap.get("msg").toString();
}
if(!response.isOk() || (ValidateUtils.isNotEmpty(code) && ValidateUtils.notEqualsIgnoreCase(code,"200"))){
throw new Exception("调用xxl-job接口:["+url+"]失败,请求参数是:["+JSONUtil.toJsonStr(paramMap)+"],原因:"+msg);
}
return response;
}
/**
* 获取登陆网页的cookie
*/
private String getCookie(String url, Map<String, Object> paramMap) throws Exception {
try (HttpResponse response = executeResponse(null, url, paramMap)) {
List<String> cookies = response.headerList("Set-Cookie");
if(ValidateUtils.isEmpty(cookies)){
throw new Exception("获取xxl-job的cookie失败");
}
return cookies.get(0);
}
}
/**
* 获取接口返回的数据
*/
private String execute(String cookie, String url, Map<String, Object> paramMap) throws Exception {
try (HttpResponse response = executeResponse(cookie, url, paramMap)) {
return response.body();
}
}
}
7.新增配置
@ConfigurationProperties("xxl")
public class XxlJobProperties {
// XXL-Job 管理控制台的地址。
private String adminAddresses;
// 用于与管理控制台安全通信的访问令牌。
private String accessToken;
// 执行器的应用名称,用于注册和标识。
private String appName;
// 执行器的自定义地址(如果有指定)。
private String address;
// 执行器机器的 IP 地址。
private String ip;
// 执行器运行的端口号。
private int port;
// 存储执行日志的路径。
private String logPath;
// 保留执行日志的天数。
private Integer logRetentionDays = 30;
//网页登陆账户
private String userName;
//网页登陆密码
private String password;
//是否启用
private boolean enabled = true;
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public boolean getEnabled() {
return enabled;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getAdminAddresses() {
return adminAddresses;
}
public void setAdminAddresses(String adminAddresses) {
this.adminAddresses = adminAddresses;
}
public String getAccessToken() {
return accessToken;
}
public void setAccessToken(String accessToken) {
this.accessToken = accessToken;
}
public String getAppName() {
return appName;
}
public void setAppName(String appName) {
this.appName = appName;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public int getLogRetentionDays() {
return logRetentionDays;
}
public void setLogRetentionDays(int logRetentionDays) {
this.logRetentionDays = logRetentionDays;
}
}
然后如果遇到找不到的类,可以在我的gitee和github里面能找到