【Java】从0到1实现DAG+Quartz分布式任务调度系统

发布于:2025-07-23 ⋅ 阅读:(15) ⋅ 点赞:(0)

为什么需要“DAG+Quartz”的分布式任务调度?

在我们日常开发过程中,任务调度始终是支撑核心业务流程的“隐形引擎”。无论是电商大促时的“库存扣减→订单生成→支付通知→物流同步”,还是数据处理平台的“日志采集→清洗→聚合→分析”,或是金融系统的“交易对账→风控校验→报表生成”,都需要任务按特定顺序、依赖关系高效执行。

传统定时任务工具(如Quartz)虽能解决“何时执行”的问题,但面对复杂业务场景时,逐渐暴露出三大痛点:

  • 依赖关系难管理:任务间若存在“前置任务未完成则后续任务不能执行”的强依赖(如支付成功后才能发短信),传统工具无法直观描述这种逻辑,只能通过“延迟触发”或“人工干预”勉强实现,效率低且易出错;
  • 并行执行难实现:对于无依赖关系的任务(如大促时同时执行“库存扣减”和“订单生成”),传统工具需手动拆分任务或依赖外部协调,难以动态适配业务变化;
  • 失败处理难闭环:任务失败后,传统工具仅支持简单的“重试”或“标记失败”,无法根据失败类型(如偶发网络问题 vs 永久性错误)动态调整策略(如指数退避重试 vs 熔断跳过),导致资源浪费或业务中断。

DAG(有向无环图) 恰好能通过“节点表示任务、边表示依赖”的模型,清晰描述任务的执行逻辑。结合Quartz的分布式调度能力,可构建一个“能描述依赖、支持并行、灵活容错”的任务调度系统。但现实中,这一目标的实现面临多重挑战:

  • DAG与Quartz的集成复杂度高:Quartz的任务触发逻辑需与DAG的拓扑排序结果深度绑定,如何动态生成任务执行顺序并触发?
  • 分布式环境下的一致性难题:多实例部署时,如何避免任务重复触发?如何保证任务状态(如“执行中”“成功”“失败”)在分布式系统中的准确同步?
  • 失败处理的工程化落地:如何设计可配置的重试策略(如指数退避)?如何通过熔断机制避免“失败任务反复重试”导致的资源耗尽?

本文将围绕这些核心问题,从“环境搭建→核心功能开发→测试验证”全程拆解,手把手实现一个支持“依赖管理、并行执行、动态容错”的分布式任务调度系统,解决传统工具无法应对的复杂场景需求。


一、环境准备

1.1 必须的软件和环境

1.1.1 JDK 1.8
  • 下载地址:https://adoptium.net/temurin/releases/?version=8(选择Windows/Linux/macOS版本)。
  • 验证安装:命令行输入 java -version,输出类似 java version "1.8.0_301" 表示成功。
1.1.2 Maven 3.8+(管理Java依赖)
  • 下载地址:https://maven.apache.org/download.cgi(选择二进制包)。
  • 安装步骤(以Linux为例):
    # 解压到 /opt/maven
    sudo tar -zxvf apache-maven-3.8.6-bin.tar.gz -C /opt/maven
    
    # 配置环境变量(编辑 ~/.bashrc 或 /etc/profile)
    echo 'export MAVEN_HOME=/opt/maven/apache-maven-3.8.6' >> ~/.bashrc
    echo 'export PATH=$MAVEN_HOME/bin:$PATH' >> ~/.bashrc
    source ~/.bashrc
    
    # 验证安装
    mvn -v  # 输出 Maven 版本信息(如 Apache Maven 3.8.6)
    
1.1.3 MySQL 8.0(存储Quartz的元数据)
  • 下载地址:https://dev.mysql.com/downloads/mysql/(选择MySQL Community Server 8.0.x)。
  • 安装步骤(以Linux为例):
    # 下载并解压
    wget https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-8.0.36-linux-glibc2.17-x86_64.tar.xz
    tar -xvf mysql-8.0.36-linux-glibc2.17-x86_64.tar.xz -C /opt
    
    # 创建软链接(方便后续操作)
    sudo ln -s /opt/mysql-8.0.36-linux-glibc2.17-x86_64 /opt/mysql
    
    # 启动MySQL(首次启动需要初始化)
    sudo /opt/mysql/bin/mysqld --initialize-insecure --user=mysql --basedir=/opt/mysql --datadir=/opt/mysql/data
    
    # 配置环境变量(编辑 ~/.bashrc)
    echo 'export PATH=/opt/mysql/bin:$PATH' >> ~/.bashrc
    source ~/.bashrc
    
    # 登录MySQL(初始无密码)
    mysql -u root
    
1.1.4 Redis 7.0+(存储任务状态和分布式锁)
  • 下载地址:https://download.redis.io/releases/redis-7.0.12.tar.gz。
  • 安装步骤(以Linux为例):
    # 下载并解压
    wget https://download.redis.io/releases/redis-7.0.12.tar.gz
    tar -xzf redis-7.0.12.tar.gz -C /opt
    
    # 编译安装
    cd /opt/redis-7.0.12
    make && make install
    
    # 启动Redis(前台模式,方便测试)
    redis-server
    

1.2 创建Spring Boot项目

1.2.1 访问Spring Initializr

打开浏览器,访问 https://start.spring.io/,按以下步骤创建项目:

步骤 操作
Project 选择 Maven Project(Java项目管理工具)
Language 选择 Java
Spring Boot 选择 3.2.0(最新稳定版)
Group 自定义(如 com.example
Artifact 自定义(如 dag-scheduler-demo
Dependencies 添加以下依赖(点击“Add Dependencies”搜索):
- Spring Web(提供HTTP接口)
- Spring Data Redis(操作Redis)
- Quartz Scheduler(分布式任务调度核心)
- Lombok(简化Java代码)
- Fastjson(解析JSON格式的DAG定义)
1.2.2 导入项目到IDE
  • 使用IntelliJ IDEA或Eclipse导入Maven项目(选择 pom.xml 文件)。

1.3 配置数据库和Quartz集群(关键!)

1.3.1 创建MySQL数据库

登录MySQL,创建存储Quartz元数据的数据库:

-- 登录MySQL(初始无密码)
mysql -u root

-- 创建数据库(名称可自定义,这里用 quartz_db)
CREATE DATABASE quartz_db CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;

-- 退出MySQL
exit;
1.3.2 配置Quartz集群(必须!)

src/main/resources/application.yml 中添加以下配置:

# 应用基础配置
server:
  port: 8080  # 服务端口(默认8080)

# Spring Boot 核心配置
spring:
  application:
    name: dag-scheduler-demo  # 应用名称(自定义)

  # Quartz 集群配置(必须!)
  quartz:
    job-store-type: jdbc  # 使用JDBC存储任务元数据(必须)
    properties:
      org.quartz.jobStore:
        class: org.quartz.impl.jdbcjobstore.JobStoreTX  # 事务性存储(必须)
        driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate  # 通用驱动(必须)
        tablePrefix: QRTZ_  # 表前缀(默认,无需修改)
        isClustered: true  # 启用集群(必须!)
        misfireThreshold: 60000  # 任务错过触发的最大时间(1分钟)
      org.quartz.threadPool:
        class: org.quartz.simpl.SimpleThreadPool  # 线程池(必须)
        threadCount: 10  # 线程数(根据任务量调整,默认10)
        threadPriority: 5  # 线程优先级(默认5)

  # MySQL 数据库连接配置
  datasource:
    url: jdbc:mysql://localhost:3306/quartz_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
    username: root  # 替换为你的MySQL用户名(初始无密码)
    password: 123456  # 替换为你的MySQL密码(建议修改)
    driver-class-name: com.mysql.cj.jdbc.Driver  # MySQL驱动类名

  # Redis 配置(存储任务状态和分布式锁)
  redis:
    host: localhost  # 替换为你的Redis地址(本地默认localhost)
    port: 6379       # 替换为你的Redis端口(本地默认6379)
    database: 0      # 使用Redis的第0号数据库
1.3.3 初始化Quartz表(自动完成)

首次启动Spring Boot应用时,Quartz会自动在 quartz_db 数据库中创建所需的表(如 QRTZ_TRIGGERSQRTZ_CRON_TRIGGERS 等)。启动后,登录MySQL验证:

mysql -u root -p  # 输入密码(初始无密码)
USE quartz_db;
SHOW TABLES;  # 应看到以 QRTZ_ 开头的表(如 QRTZ_JOB_DETAILS、QRTZ_TRIGGERS)

二、核心功能开发

2.1 定义DAG的数据结构(JSON格式)

DAG(有向无环图)用JSON格式描述任务关系,包含节点(任务)和边(依赖关系)。例如,一个“订单支付后通知”的DAG定义如下:

{
  "dagId": "order_pay_notify",  // DAG的唯一ID(如订单支付后通知)
  "name": "订单支付后通知",       // DAG的名称(自定义)
  "nodes": [                      // 任务列表(节点)
    {"id": "check_pay", "name": "支付校验", "jobClass": "com.example.job.CheckPayJob"},
    {"id": "send_sms", "name": "发送短信", "jobClass": "com.example.job.SendSmsJob"},
    {"id": "send_email", "name": "发送邮件", "jobClass": "com.example.job.SendEmailJob"},
    {"id": "record_log", "name": "记录日志", "jobClass": "com.example.job.RecordLogJob"}
  ],
  "edges": [                      // 依赖关系(边)
    {"from": "check_pay", "to": "send_sms"},   // 发送短信依赖支付校验
    {"from": "check_pay", "to": "send_email"}, // 发送邮件依赖支付校验
    {"from": "send_sms", "to": "record_log"},  // 记录日志依赖发送短信
    {"from": "send_email", "to": "record_log"} // 记录日志依赖发送邮件
  ]
}

关键说明

  • dagId:DAG的唯一标识(如 order_pay_notify),用于区分不同的任务链路;
  • nodes:任务列表,每个任务有 id(唯一ID)、name(显示名称)、jobClass(任务类的全限定名);
  • edges:依赖关系,from 是前置任务ID,to 是后续任务ID(表示“前置任务完成后才能执行后续任务”)。

2.2 解析DAG:将JSON转为Java对象

需要将JSON格式的DAG定义转换为Java对象,方便程序处理。

2.2.1 定义DAG的节点(TaskNode)和边(TaskEdge)

src/main/java/com/example/dag/entity 目录下创建以下类:

// TaskNode.java(任务节点)
package com.example.dag.entity;

import lombok.Data;

@Data
public class TaskNode {
    private String id;      // 任务ID(唯一,如 check_pay)
    private String name;    // 任务名称(如 支付校验)
    private String jobClass;// 任务类的全限定名(如 com.example.job.CheckPayJob)
}
// TaskEdge.java(任务边,表示依赖关系)
package com.example.dag.entity;

import lombok.Data;

@Data
public class TaskEdge {
    private String from;    // 前置任务ID(如 check_pay)
    private String to;      // 后续任务ID(如 send_sms)
}
2.2.2 定义DAG整体结构(DagDefinition)
// DagDefinition.java(完整的DAG定义)
package com.example.dag.entity;

import lombok.Data;
import java.util.List;

@Data
public class DagDefinition {
    private String dagId;       // DAG的唯一ID(如 order_pay_notify)
    private String name;        // DAG名称(如 订单支付后通知)
    private List<TaskNode> nodes; // 任务节点列表
    private List<TaskEdge> edges; // 依赖边列表
}
2.2.3 解析JSON的DAG解析器(DagParser)

使用Fastjson将JSON字符串转换为 DagDefinition 对象:

// DagParser.java(DAG解析器)
package com.example.dag.parser;

import com.alibaba.fastjson.JSON;
import com.example.dag.entity.DagDefinition;
import org.springframework.stereotype.Component;

@Component
public class DagParser {
    // Fastjson的JSON解析器(自动注入)
    private final JSON jsonParser = JSON.parseObject("");

    /**
     * 将JSON字符串解析为DagDefinition对象
     * @param dagJson JSON格式的DAG定义(如上面的示例)
     * @return DagDefinition对象
     */
    public DagDefinition parse(String dagJson) {
        return jsonParser.parseObject(dagJson, DagDefinition.class);
    }
}

2.3 拓扑排序:生成任务的执行顺序

DAG的任务必须按依赖关系执行(比如“支付校验”完成后才能执行“发送短信”)。我们需要用 拓扑排序 算法(Kahn算法)生成执行顺序。

2.3.1 拓扑排序的原理

拓扑排序的核心是:

  1. 统计每个任务的入度(有多少前置任务);
  2. 将入度为0的任务(没有前置任务)加入执行队列;
  3. 依次执行队列中的任务,并将其后续任务的入度减1;
  4. 重复步骤2-3,直到所有任务都被执行。

如果有任务未被执行,说明DAG中存在循环依赖(比如A依赖B,B依赖A),此时拓扑排序会失败。

2.3.2 实现拓扑排序(TopologySorter)

src/main/java/com/example/dag/service 目录下创建 TopologySorter.java

// TopologySorter.java(拓扑排序工具类)
package com.example.dag.service;

import com.example.dag.entity.TaskEdge;
import com.example.dag.entity.TaskNode;
import com.example.dag.entity.DagDefinition;
import org.springframework.stereotype.Service;

import java.util.*;
import java.util.stream.Collectors;

@Service
public class TopologySorter {

    /**
     * 对DAG进行拓扑排序,生成执行顺序
     * @param dagDefinition DAG定义
     * @return 执行顺序(任务ID列表)
     */
    public List<String> sort(DagDefinition dagDefinition) {
        // 1. 统计每个任务的入度(初始为0)
        Map<String, Integer> inDegreeMap = new HashMap<>();
        for (TaskNode node : dagDefinition.getNodes()) {
            inDegreeMap.put(node.getId(), 0); // 初始入度为0
        }

        // 2. 遍历所有边,更新后续任务的入度(前置任务+1)
        for (TaskEdge edge : dagDefinition.getEdges()) {
            String toNodeId = edge.getTo();
            inDegreeMap.put(toNodeId, inDegreeMap.getOrDefault(toNodeId, 0) + 1);
        }

        // 3. 初始化队列(入度为0的任务)
        Queue<String> queue = new LinkedList<>();
        for (Map.Entry<String, Integer> entry : inDegreeMap.entrySet()) {
            if (entry.getValue() == 0) {
                queue.offer(entry.getKey()); // 入度为0的任务加入队列
            }
        }

        // 4. 生成执行顺序
        List<String> executionOrder = new ArrayList<>();
        while (!queue.isEmpty()) {
            String nodeId = queue.poll(); // 取出一个入度为0的任务
            executionOrder.add(nodeId);

            // 遍历当前节点的所有后续任务,减少它们的入度
            for (TaskEdge edge : dagDefinition.getEdges()) {
                if (edge.getFrom().equals(nodeId)) { // 当前节点是前置任务
                    String nextNodeId = edge.getTo();
                    int newInDegree = inDegreeMap.get(nextNodeId) - 1; // 后续任务入度-1
                    inDegreeMap.put(nextNodeId, newInDegree);

                    // 如果后续任务的入度变为0,加入队列
                    if (newInDegree == 0) {
                        queue.offer(nextNodeId);
                    }
                }
            }
        }

        // 5. 检查是否有循环依赖(总任务数与执行顺序长度不一致说明有环)
        if (executionOrder.size() != dagDefinition.getNodes().size()) {
            throw new RuntimeException("DAG存在循环依赖,无法排序!");
        }

        return executionOrder;
    }
}

2.4 集成Quartz:触发任务执行

Quartz是Java生态中最经典的任务调度框架,支持定时触发和分布式执行。我们需要让Quartz根据拓扑排序的结果,按顺序触发任务。

2.4.1 自定义Quartz任务类(BaseJob)

所有任务需要继承Quartz的 QuartzJobBean 类,并实现 executeInternal 方法(任务的具体逻辑在这里编写)。

src/main/java/com/example/dag/job 目录下创建 BaseJob.java

// BaseJob.java(所有任务的基类)
package com.example.dag.job;

import lombok.extern.slf4j.Slf4j;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;

@Slf4j
public abstract class BaseJob extends QuartzJobBean {

    /**
     * 任务的核心执行逻辑(由子类实现)
     * @param dagId DAG的唯一ID
     * @param nodeId 当前任务的ID
     * @param context Quartz的执行上下文(可传递参数)
     */
    protected abstract void executeTask(String dagId, String nodeId, JobDataMap context);

    /**
     * Quartz触发任务时调用的方法
     */
    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        // 从Quartz的上下文中获取参数(如dagId、nodeId)
        JobDataMap dataMap = context.getJobDetail().getJobDataMap();
        String dagId = dataMap.getString("dagId");
        String nodeId = dataMap.getString("nodeId");

        try {
            log.info("开始执行任务:dagId={}, nodeId={}", dagId, nodeId);
            // 执行当前任务的具体逻辑(由子类实现)
            executeTask(dagId, nodeId, dataMap);
            log.info("任务执行成功:dagId={}, nodeId={}", dagId, nodeId);
            // 任务执行成功,更新状态为SUCCESS(后面会讲状态管理)
            StateManager.markTaskSuccess(dagId, nodeId);
        } catch (Exception e) {
            log.error("任务执行失败:dagId={}, nodeId={}", dagId, nodeId, e);
            // 任务执行失败,更新状态为FAIL(后面会讲失败处理)
            StateManager.markTaskFail(dagId, nodeId, e.getMessage());
        }
    }
}
2.4.2 示例任务类(发送短信)

编写一个具体的任务类(例如“发送短信”任务),继承 BaseJob 并实现 executeTask 方法:

// SendSmsJob.java(发送短信的任务)
package com.example.dag.job;

import com.example.dag.service.StateManager;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobDataMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class SendSmsJob extends BaseJob {

    @Autowired
    private StateManager stateManager;  // 状态管理器(用于记录任务状态)

    @Override
    protected void executeTask(String dagId, String nodeId, JobDataMap context) {
        // 从上下文中获取参数(比如订单ID)
        String orderId = context.getString("orderId");
        log.info("发送短信给订单:{},DAG ID:{},任务ID:{}", orderId, dagId, nodeId);
        
        // 实际调用短信网关(这里模拟)
        // SmsClient.send(orderId, "您的订单已支付成功!");
    }
}
2.4.3 触发DAG执行(DagScheduler)

创建一个调度器类,负责解析DAG、生成执行顺序,并调用Quartz触发每个任务:

// DagScheduler.java(DAG调度器)
package com.example.dag.scheduler;

import com.alibaba.fastjson.JSON;
import com.example.dag.entity.DagDefinition;
import com.example.dag.entity.TaskNode;
import com.example.dag.parser.DagParser;
import com.example.dag.service.TopologySorter;
import com.example.dag.state.StateManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Service;

import java.util.List;

@Slf4j
@Service
@RequiredArgsConstructor
public class DagScheduler {

    private final SchedulerFactoryBean schedulerFactoryBean;  // Quartz调度器工厂
    private final TopologySorter topologySorter;              // 拓扑排序工具类
    private final DagParser dagParser;                        // DAG解析器
    private final StateManager stateManager;                  // 状态管理器

    /**
     * 触发一个DAG的执行
     * @param dagJson JSON格式的DAG定义(如上面的示例)
     */
    public void triggerDag(String dagJson) {
        try {
            // 1. 解析DAG定义(将JSON转为Java对象)
            DagDefinition dagDefinition = dagParser.parse(dagJson);
            log.info("解析DAG成功:dagId={}", dagDefinition.getDagId());

            // 2. 生成拓扑排序的执行顺序(任务ID列表)
            List<String> executionOrder = topologySorter.sort(dagDefinition);
            log.info("生成执行顺序:{}", executionOrder);

            // 3. 遍历执行顺序,逐个触发任务
            for (String nodeId : executionOrder) {
                // 找到当前节点的定义(从dagDefinition中获取)
                TaskNode currentNode = dagDefinition.getNodes().stream()
                        .filter(node -> node.getId().equals(nodeId))
                        .findFirst()
                        .orElseThrow(() -> new RuntimeException("未找到任务节点:" + nodeId));

                // 4. 构建Quartz的JobDetail(任务的详细信息)
                JobDetail jobDetail = JobBuilder.newJob(BaseJob.class)
                        .withIdentity(nodeId, "dag_group")  // 任务唯一标识(节点ID + 分组)
                        .usingJobData("dagId", dagDefinition.getDagId())  // 传递DAG ID
                        .usingJobData("nodeId", nodeId)  // 传递当前任务ID
                        .usingJobData("orderId", "20250721123456")  // 示例参数(订单ID)
                        .build();

                // 5. 构建Quartz的Trigger(触发器,立即触发)
                Trigger trigger = TriggerBuilder.newTrigger()
                        .withIdentity(nodeId + "_trigger", "dag_group")  // 触发器唯一标识
                        .startNow()  // 立即触发
                        .build();

                // 6. 调度执行(Quartz会自动管理分布式触发)
                Scheduler scheduler = schedulerFactoryBean.getScheduler();
                scheduler.scheduleJob(jobDetail, trigger);
                log.info("触发任务成功:dagId={}, nodeId={}", dagDefinition.getDagId(), nodeId);
            }
        } catch (Exception e) {
            log.error("触发DAG失败:dagJson={}", dagJson, e);
            throw new RuntimeException("触发DAG失败", e);
        }
    }
}

2.5 状态管理:记录任务的执行状态

为了知道任务是否成功或失败,需要记录每个任务的执行状态。这里用Redis存储状态(方便分布式访问)。

2.5.1 定义任务状态枚举(TaskState)

src/main/java/com/example/dag/enum 目录下创建 TaskState.java

// TaskState.java(任务状态枚举)
package com.example.dag.enums;

public enum TaskState {
    PENDING,   // 待执行(等待依赖或触发)
    RUNNING,   // 执行中
    SUCCESS,   // 成功
    FAIL,      // 失败
    RETRYING   // 重试中
}
2.5.2 状态管理器(StateManager)

使用Redis存储每个任务的状态,并提供更新状态的方法:

// StateManager.java(状态管理器)
package com.example.dag.state;

import com.example.dag.enums.TaskState;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

@Slf4j
@Component
@RequiredArgsConstructor
public class StateManager {

    private final StringRedisTemplate redisTemplate;  // Redis的字符串模板(自动注入)

    // 存储任务状态的键格式:dag:{dagId}:status:{nodeId}
    private static final String STATUS_KEY_PREFIX = "dag:%s:status:%s";

    // 存储错误信息的键格式:dag:{dagId}:error:{nodeId}
    private static final String ERROR_KEY_PREFIX = "dag:%s:error:%s";

    /**
     * 标记任务为成功
     * @param dagId DAG的唯一ID
     * @param nodeId 任务ID
     */
    public void markTaskSuccess(String dagId, String nodeId) {
        updateTaskState(dagId, nodeId, TaskState.SUCCESS);
    }

    /**
     * 标记任务为失败
     * @param dagId DAG的唯一ID
     * @param nodeId 任务ID
     * @param errorMsg 失败原因
     */
    public void markTaskFail(String dagId, String nodeId, String errorMsg) {
        updateTaskState(dagId, nodeId, TaskState.FAIL);
        // 存储错误信息到Redis(可选,保留7天)
        redisTemplate.opsForValue().set(
                String.format(ERROR_KEY_PREFIX, dagId, nodeId), 
                errorMsg, 
                7, TimeUnit.DAYS
        );
    }

    /**
     * 标记任务为运行中
     * @param dagId DAG的唯一ID
     * @param nodeId 任务ID
     */
    public void markTaskRunning(String dagId, String nodeId) {
        updateTaskState(dagId, nodeId, TaskState.RUNNING);
    }

    /**
     * 更新任务状态
     */
    private void updateTaskState(String dagId, String nodeId, TaskState state) {
        String key = String.format(STATUS_KEY_PREFIX, dagId, nodeId);
        redisTemplate.opsForValue().set(key, state.name());
        log.info("更新任务状态:dagId={}, nodeId={}, state={}", dagId, nodeId, state);
    }

    /**
     * 获取任务状态
     * @param dagId DAG的唯一ID
     * @param nodeId 任务ID
     * @return 任务状态(如SUCCESS、FAIL)
     */
    public TaskState getTaskState(String dagId, String nodeId) {
        String key = String.format(STATUS_KEY_PREFIX, dagId, nodeId);
        String stateStr = redisTemplate.opsForValue().get(key);
        if (stateStr == null) {
            return TaskState.PENDING;  // 默认未开始
        }
        return TaskState.valueOf(stateStr);
    }

    /**
     * 获取任务错误信息
     * @param dagId DAG的唯一ID
     * @param nodeId 任务ID
     * @return 错误信息(无错误返回null)
     */
    public String getTaskError(String dagId, String nodeId) {
        String key = String.format(ERROR_KEY_PREFIX, dagId, nodeId);
        return redisTemplate.opsForValue().get(key);
    }
}

2.6 失败处理:重试与熔断

任务执行失败时,需要自动重试或触发熔断机制,避免无限失败。

2.6.1 失败重试策略(RetryPolicy)

定义一个简单的指数退避重试策略(失败后等待时间逐渐增加):

// RetryPolicy.java(失败重试策略)
package com.example.dag.retry;

import org.springframework.stereotype.Component;

@Component
public class RetryPolicy {

    private int maxRetryCount = 3;      // 最大重试次数(3次)
    private long initialInterval = 1000;  // 初始重试间隔(1秒)
    private double backoffFactor = 2;     // 退避因子(每次间隔翻倍)

    /**
     * 计算下一次重试的间隔时间
     * @param retryCount 当前已重试次数
     * @return 间隔时间(毫秒)
     */
    public long getNextInterval(int retryCount) {
        if (retryCount >= maxRetryCount) {
            return -1;  // 超过最大次数,不再重试
        }
        return (long) (initialInterval * Math.pow(backoffFactor, retryCount));
    }
}
2.6.2 熔断机制(CircuitBreaker)

当某个任务频繁失败(比如失败率超过50%),触发熔断,暂时跳过该任务:

// CircuitBreaker.java(熔断机制)
package com.example.dag.circuitbreaker;

import com.example.dag.enums.TaskState;
import com.example.dag.state.StateManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

@Slf4j
@Component
@RequiredArgsConstructor
public class CircuitBreaker {

    private final StringRedisTemplate redisTemplate;
    private final StateManager stateManager;

    // 存储熔断状态的键格式:dag:{dagId}:circuit:{nodeId}
    private static final String CIRCUIT_KEY_PREFIX = "dag:%s:circuit:%s";

    // 熔断窗口大小(5分钟)
    private static final long WINDOW_SIZE = 5 * 60 * 1000;

    // 失败率阈值(50%)
    private static final double FAIL_RATE_THRESHOLD = 0.5;

    /**
     * 检查是否允许执行任务(熔断未触发时允许)
     * @param dagId DAG的唯一ID
     * @param nodeId 任务ID
     * @return 允许执行返回true,否则返回false
     */
    public boolean allowExecute(String dagId, String nodeId) {
        String key = String.format(CIRCUIT_KEY_PREFIX, dagId, nodeId);
        Long failCount = redisTemplate.opsForValue().get(key);

        if (failCount == null || failCount < 3) {  // 失败次数小于3次,允许执行
            return true;
        }

        // 计算失败率(假设窗口内有10次执行)
        double failRate = (double) failCount / 10;
        return failRate < FAIL_RATE_THRESHOLD;  // 失败率低于50%允许执行
    }

    /**
     * 记录任务失败(用于熔断统计)
     * @param dagId DAG的唯一ID
     * @param nodeId 任务ID
     */
    public void recordFailure(String dagId, String nodeId) {
        String key = String.format(CIRCUIT_KEY_PREFIX, dagId, nodeId);
        redisTemplate.opsForValue().increment(key);  // 失败次数+1
    }
}

三、测试验证

3.1 启动系统

  1. 确保MySQL和Redis已启动;
  2. 在Spring Boot项目的根目录执行 mvn spring-boot:run,启动应用;
  3. 控制台输出 Started DagSchedulerDemoApplication in X seconds 表示启动成功。

3.2 触发DAG执行

通过HTTP接口触发DAG执行(需要先添加一个Controller):

// DagController.java(提供HTTP接口触发DAG)
package com.example.dag.controller;

import com.example.dag.scheduler.DagScheduler;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/dag")
@RequiredArgsConstructor
public class DagController {

    private final DagScheduler dagScheduler;

    /**
     * 触发DAG执行的接口
     */
    @PostMapping("/trigger")
    public String triggerDag(@RequestBody String dagJson) {
        try {
            dagScheduler.triggerDag(dagJson);
            return "DAG触发成功!";
        } catch (Exception e) {
            e.printStackTrace();
            return "DAG触发失败:" + e.getMessage();
        }
    }
}

3.3 发送POST请求触发DAG

使用Postman或curl发送以下请求(替换 dagJson 为你的DAG定义):

curl -X POST http://localhost:8080/dag/trigger \
  -H "Content-Type: application/json" \
  -d '{
    "dagId": "order_pay_notify",
    "name": "订单支付后通知",
    "nodes": [
      {"id": "check_pay", "name": "支付校验", "jobClass": "com.example.job.CheckPayJob"},
      {"id": "send_sms", "name": "发送短信", "jobClass": "com.example.job.SendSmsJob"},
      {"id": "send_email", "name": "发送邮件", "jobClass": "com.example.job.SendEmailJob"},
      {"id": "record_log", "name": "记录日志", "jobClass": "com.example.job.RecordLogJob"}
    ],
    "edges": [
      {"from": "check_pay", "to": "send_sms"},
      {"from": "check_pay", "to": "send_email"},
      {"from": "send_sms", "to": "record_log"},
      {"from": "send_email", "to": "record_log"}
    ]
  }'

3.4 查看任务执行结果

3.4.1 查看控制台输出

任务执行时,控制台会打印每个任务的触发信息(如“触发任务:check_pay”)。

3.4.2 查看Redis状态

使用Redis客户端(如 redis-cli)查看任务状态:

redis-cli
> KEYS "dag:order_pay_notify:status:*"  # 查看所有任务状态键
1) "dag:order_pay_notify:status:check_pay"
2) "dag:order_pay_notify:status:send_sms"
3) "dag:order_pay_notify:status:send_email"
4) "dag:order_pay_notify:status:record_log"

> GET "dag:order_pay_notify:status:check_pay"  # 查看支付校验任务的状态
"SUCCESS"

> GET "dag:order_pay_notify:status:send_sms"  # 查看发送短信任务的状态
"SUCCESS"

> GET "dag:order_pay_notify:status:send_email"  # 查看发送邮件任务的状态
"SUCCESS"

> GET "dag:order_pay_notify:status:record_log"  # 查看记录日志任务的状态
"SUCCESS"

四、常见问题与解决

4.1 Quartz集群启动失败

现象:启动应用时报错 org.quartz.JobPersistenceException: Couldn't store trigger...
原因:Quartz的元数据表未正确创建。
解决:手动执行Quartz的建表脚本(https://github.com/quartz-scheduler/quartz/tree/main/quartz-core/src/main/resources/org/quartz/impl/jdbcjobstore),或检查MySQL用户是否有建表权限。

4.2 任务未按顺序执行

现象:“发送短信”任务在“支付校验”完成前就执行了。
原因:拓扑排序逻辑错误,或Quartz触发任务时未等待依赖。
解决:检查 TopologySorter 的排序逻辑,确保入度计算正确;或在触发任务时,检查前置任务是否已完成(通过 StateManager 查询状态)。

4.3 任务失败后未重试

现象:任务执行失败后,状态直接变为 FAIL,没有重试。
原因:未在任务中调用重试逻辑,或 RetryPolicy 未正确配置。
解决:在 BaseJobexecuteInternal 方法中,捕获异常后调用重试逻辑(如使用 RetryPolicy 计算间隔,重新触发任务)。


总结

到此为止,已经完成了一个支持“任务依赖、并行执行、失败重试”的分布式任务调度系统的搭建。核心流程是:

  1. 用JSON定义DAG的任务关系;
  2. 用拓扑排序生成执行顺序;
  3. 用Quartz触发任务执行;
  4. 用Redis记录任务状态;
  5. 用重试和熔断机制提升健壮性。

当然,这只是一个基础实现,实际生产环境中还可以扩展:

  • 增加任务参数传递(上游任务的输出作为下游任务的输入);
  • 提供可视化界面(如前端拖拽编辑DAG);
  • 集成监控告警(任务失败时发送邮件/钉钉通知);
  • 支持动态修改DAG(运行时调整任务链路)。

最后,希望通过本文,你能大致了解DAG+Quartz整个流程,后续可以在此基础上进一步优化探索!


网站公告

今日签到

点亮在社区的每一天
去签到