【后端】Java封装一个多线程处理任务,可以设置任务优先级优先插队处理,并且提供根据任务ID取消任务

发布于:2025-09-13 ⋅ 阅读:(19) ⋅ 点赞:(0)

这个需求其实就是一个 带优先级调度、可取消的线程池任务管理器。封装一个 Java Demo,支持:

  1. 任务优先级:数值越大优先级越高,可以“插队”。

  2. 任务取消:可以根据任务 ID 取消还未执行的任务。

  3. 多线程并发执行:用线程池来跑任务。


✨ 基础功能

实现思路

  • 使用 PriorityBlockingQueue 管理任务,保证高优先级任务先执行。

  • 封装一个 Task 类,实现 RunnableComparable<Task>,用于排序。

  • 每个任务有 唯一 ID,方便取消。

  • 使用 ThreadPoolExecutor 配合优先级队列执行任务。


代码示例

import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * 优先级任务
 */
class PriorityTask implements Runnable, Comparable<PriorityTask> {
    private final String taskId;
    private final int priority;
    private final Runnable action;
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    public PriorityTask(String taskId, int priority, Runnable action) {
        this.taskId = taskId;
        this.priority = priority;
        this.action = action;
    }

    public String getTaskId() {
        return taskId;
    }

    public void cancel() {
        cancelled.set(true);
    }

    @Override
    public void run() {
        if (!cancelled.get()) {
            action.run();
        } else {
            System.out.println("任务 " + taskId + " 已被取消,未执行");
        }
    }

    @Override
    public int compareTo(PriorityTask other) {
        // 优先级高的排在前面
        return Integer.compare(other.priority, this.priority);
    }
}

/**
 * 任务管理器
 */
class TaskManager {
    private final ThreadPoolExecutor executor;
    private final ConcurrentHashMap<String, PriorityTask> taskMap = new ConcurrentHashMap<>();

    public TaskManager(int poolSize) {
        PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
        this.executor = new ThreadPoolExecutor(
                poolSize,
                poolSize,
                60L,
                TimeUnit.SECONDS,
                queue
        );
    }

    /** 提交任务 */
    public String submitTask(int priority, Runnable action) {
        String taskId = UUID.randomUUID().toString();
        PriorityTask task = new PriorityTask(taskId, priority, action);
        taskMap.put(taskId, task);
        executor.execute(task);
        return taskId;
    }

    /** 取消任务 */
    public boolean cancelTask(String taskId) {
        PriorityTask task = taskMap.remove(taskId);
        if (task != null) {
            task.cancel();
            return true;
        }
        return false;
    }

    /** 关闭线程池 */
    public void shutdown() {
        executor.shutdown();
    }
}

/**
 * 测试
 */
public class PriorityTaskDemo {
    public static void main(String[] args) throws InterruptedException {
        TaskManager manager = new TaskManager(3);

        // 提交一些任务
        String id1 = manager.submitTask(1, () -> {
            System.out.println("执行任务1,优先级1");
        });

        String id2 = manager.submitTask(10, () -> {
            System.out.println("执行任务2,优先级10");
        });

        String id3 = manager.submitTask(5, () -> {
            System.out.println("执行任务3,优先级5");
        });

        // 取消一个任务
        boolean cancelled = manager.cancelTask(id3);
        System.out.println("取消任务3结果: " + cancelled);

        Thread.sleep(2000);
        manager.shutdown();
    }
}

执行结果示例

可能输出类似:

执行任务1,优先级1
执行任务2,优先级10
任务 c264a955-23e2-4192-8914-bd5e83d9f89d 已被取消,未执行
取消任务3结果: true

说明:

  • 任务 优先级10 插队最先执行。

  • 任务3 被取消,不会执行。

  • 任务1 继续执行。


进一步扩展一下,让任务支持 回调监听(成功/失败/取消),方便在业务里统一埋点


✨ 功能扩展

  1. 每个任务可以带一个 TaskListener 回调,监听任务状态:

    • onSuccess(String taskId)

    • onFailure(String taskId, Throwable error)

    • onCancelled(String taskId)

  2. PriorityTask 内部捕获异常,调用对应的回调方法。

  3. TaskManager 提交任务时可以选择性传入监听器。


完整代码

import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * 任务监听器
 */
interface TaskListener {
    void onSuccess(String taskId);
    void onFailure(String taskId, Throwable error);
    void onCancelled(String taskId);
}

/**
 * 优先级任务
 */
class PriorityTask implements Runnable, Comparable<PriorityTask> {
    private final String taskId;
    private final int priority;
    private final Runnable action;
    private final TaskListener listener;
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    public PriorityTask(String taskId, int priority, Runnable action, TaskListener listener) {
        this.taskId = taskId;
        this.priority = priority;
        this.action = action;
        this.listener = listener;
    }

    public String getTaskId() {
        return taskId;
    }

    public void cancel() {
        cancelled.set(true);
    }

    @Override
    public void run() {
        if (cancelled.get()) {
            if (listener != null) listener.onCancelled(taskId);
            System.out.println("任务 " + taskId + " 已被取消,未执行");
            return;
        }
        try {
            action.run();
            if (listener != null) listener.onSuccess(taskId);
        } catch (Throwable t) {
            if (listener != null) listener.onFailure(taskId, t);
        }
    }

    @Override
    public int compareTo(PriorityTask other) {
        // 优先级高的排在前面
        return Integer.compare(other.priority, this.priority);
    }
}

/**
 * 任务管理器
 */
class TaskManager {
    private final ThreadPoolExecutor executor;
    private final ConcurrentHashMap<String, PriorityTask> taskMap = new ConcurrentHashMap<>();

    public TaskManager(int poolSize) {
        PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
        this.executor = new ThreadPoolExecutor(
                poolSize,
                poolSize,
                60L,
                TimeUnit.SECONDS,
                queue
        );
    }

    /** 提交任务(带监听器) */
    public String submitTask(int priority, Runnable action, TaskListener listener) {
        String taskId = UUID.randomUUID().toString();
        PriorityTask task = new PriorityTask(taskId, priority, action, listener);
        taskMap.put(taskId, task);
        executor.execute(task);
        return taskId;
    }

    /** 简化方法(无监听器) */
    public String submitTask(int priority, Runnable action) {
        return submitTask(priority, action, null);
    }

    /** 取消任务 */
    public boolean cancelTask(String taskId) {
        PriorityTask task = taskMap.remove(taskId);
        if (task != null) {
            task.cancel();
            return true;
        }
        return false;
    }

    /** 关闭线程池 */
    public void shutdown() {
        executor.shutdown();
    }
}

/**
 * 测试
 */
public class PriorityTaskDemo {
    public static void main(String[] args) throws InterruptedException {
        TaskManager manager = new TaskManager(3);

        // 定义一个统一的监听器
        TaskListener listener = new TaskListener() {
            @Override
            public void onSuccess(String taskId) {
                System.out.println("任务 " + taskId + " 执行成功 ✅");
            }

            @Override
            public void onFailure(String taskId, Throwable error) {
                System.out.println("任务 " + taskId + " 执行失败 ❌: " + error.getMessage());
            }

            @Override
            public void onCancelled(String taskId) {
                System.out.println("任务 " + taskId + " 被取消 ⏹️");
            }
        };

        // 提交任务
        String id1 = manager.submitTask(1, () -> {
            System.out.println("执行任务1(优先级1)");
        }, listener);

        String id2 = manager.submitTask(10, () -> {
            System.out.println("执行任务2(优先级10)");
            throw new RuntimeException("模拟异常");
        }, listener);

        String id3 = manager.submitTask(5, () -> {
            System.out.println("执行任务3(优先级5)");
        }, listener);

        // 取消一个任务
        boolean cancelled = manager.cancelTask(id3);
        System.out.println("取消任务3结果: " + cancelled);

        Thread.sleep(2000);
        manager.shutdown();
    }
}

运行结果示例

执行任务1(优先级1)
执行任务2(优先级10)
任务 cc143e3d-3808-4b98-93a8-09ba90e281f2 被取消 ⏹️
任务 cc143e3d-3808-4b98-93a8-09ba90e281f2 已被取消,未执行
任务 ede52aab-e4a3-4896-88a3-e664b1c1a667 执行成功 ✅
取消任务3结果: true
任务 36230882-e049-44a9-8406-a90d34acad47 执行失败 ❌: 模拟异常

这样就实现了:

  • 优先级插队(10 → 5 → 1)

  • 取消任务(取消后不会执行,并回调 onCancelled

  • 失败捕获(异常不会导致线程池崩溃,回调 onFailure

  • 成功回调(回调 onSuccess


再加一个 全局监听器(所有任务统一回调,方便在一个地方打点统计),希望在 一个地方统一打点统计,包括:

  • 任务ID

  • 开始时间

  • 结束时间

  • 耗时

  • 执行状态(成功 / 失败 / 取消)

这可以通过在 TaskManager 里引入一个 全局监听器 GlobalTaskListener 来实现。


✨ 改进方案

  1. 新增 GlobalTaskListener 接口,统一接收所有任务的生命周期事件。

  2. PriorityTask 内部在执行时打点(记录开始、结束时间),并回调到 GlobalTaskListener

  3. TaskManager 可以配置一个全局监听器(所有任务都会走这里)。


🔧 完整代码

import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * 全局任务监听器(统一打点)
 */
interface GlobalTaskListener {
    void onTaskStart(String taskId, int priority, Instant startTime);

    void onTaskSuccess(String taskId, int priority, Instant startTime, Instant endTime, Duration duration);

    void onTaskFailure(String taskId, int priority, Instant startTime, Instant endTime, Duration duration,
            Throwable error);

    void onTaskCancelled(String taskId, int priority, Instant startTime);
}

/**
 * 优先级任务
 */
class PriorityTask implements Runnable, Comparable<PriorityTask> {
    private final String taskId;
    private final int priority;
    private final Runnable action;
    private final AtomicBoolean cancelled = new AtomicBoolean(false);
    private final GlobalTaskListener globalListener;

    public PriorityTask(String taskId, int priority, Runnable action, GlobalTaskListener globalListener) {
        this.taskId = taskId;
        this.priority = priority;
        this.action = action;
        this.globalListener = globalListener;
    }

    public String getTaskId() {
        return taskId;
    }

    public void cancel() {
        cancelled.set(true);
    }

    @Override
    public void run() {
        Instant startTime = Instant.now();

        if (cancelled.get()) {
            if (globalListener != null) {
                globalListener.onTaskCancelled(taskId, priority, startTime);
            }
            System.out.println("任务 " + taskId + " 已被取消,未执行");
            return;
        }

        if (globalListener != null) {
            globalListener.onTaskStart(taskId, priority, startTime);
        }

        try {
            action.run();
            Instant endTime = Instant.now();
            if (globalListener != null) {
                globalListener.onTaskSuccess(taskId, priority, startTime, endTime,
                        Duration.between(startTime, endTime));
            }
        } catch (Throwable t) {
            Instant endTime = Instant.now();
            if (globalListener != null) {
                globalListener.onTaskFailure(taskId, priority, startTime, endTime, Duration.between(startTime, endTime),
                        t);
            }
        }
    }

    @Override
    public int compareTo(PriorityTask other) {
        // 优先级高的排在前面
        return Integer.compare(other.priority, this.priority);
    }
}

/**
 * 任务管理器
 */
class TaskManager {
    private final ThreadPoolExecutor executor;
    private final ConcurrentHashMap<String, PriorityTask> taskMap = new ConcurrentHashMap<>();
    private final GlobalTaskListener globalListener;

    public TaskManager(int poolSize, GlobalTaskListener globalListener) {
        PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
        this.executor = new ThreadPoolExecutor(
                poolSize,
                poolSize,
                60L,
                TimeUnit.SECONDS,
                queue);
        this.globalListener = globalListener;
    }

    /** 提交任务 */
    public String submitTask(int priority, Runnable action) {
        String taskId = UUID.randomUUID().toString();
        PriorityTask task = new PriorityTask(taskId, priority, action, globalListener);
        taskMap.put(taskId, task);
        executor.execute(task);
        return taskId;
    }

    /** 取消任务 */
    public boolean cancelTask(String taskId) {
        PriorityTask task = taskMap.remove(taskId);
        if (task != null) {
            task.cancel();
            return true;
        }
        return false;
    }

    /** 关闭线程池 */
    public void shutdown() {
        executor.shutdown();
    }
}

/**
 * 测试
 */
public class PriorityTaskDemo {
    public static void main(String[] args) throws InterruptedException {
        // 定义全局监听器
        GlobalTaskListener listener = new GlobalTaskListener() {
            @Override
            public void onTaskStart(String taskId, int priority, Instant startTime) {
                System.out.println("任务 " + taskId + "(优先级 " + priority + ")开始执行,时间:" + startTime);
            }

            @Override
            public void onTaskSuccess(String taskId, int priority, Instant startTime, Instant endTime,
                    Duration duration) {
                System.out.println("任务 " + taskId + " 成功 ✅,耗时:" + duration.toMillis() + " ms");
            }

            @Override
            public void onTaskFailure(String taskId, int priority, Instant startTime, Instant endTime,
                    Duration duration, Throwable error) {
                System.out.println("任务 " + taskId + " 失败 ❌,耗时:" + duration.toMillis() + " ms,错误:" + error.getMessage());
            }

            @Override
            public void onTaskCancelled(String taskId, int priority, Instant startTime) {
                System.out.println("任务 " + taskId + " 被取消 ⏹️,取消时间:" + startTime);
            }
        };

        TaskManager manager = new TaskManager(3, listener);

        // 提交几个任务
        String id1 = manager.submitTask(1, () -> {
            try {
                Thread.sleep(500);
            } catch (InterruptedException ignored) {
            }
            System.out.println("执行任务1");
        });

        String id2 = manager.submitTask(10, () -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException ignored) {
            }
            System.out.println("执行任务2");
            throw new RuntimeException("模拟异常");
        });

        String id3 = manager.submitTask(5, () -> {
            try {
                Thread.sleep(300);
            } catch (InterruptedException ignored) {
            }
            System.out.println("执行任务3");
        });

        // 取消任务3
        boolean cancelled = manager.cancelTask(id3);
        System.out.println("取消任务3结果: " + cancelled);

        Thread.sleep(2000);
        manager.shutdown();
    }
}

🖥️ 输出示例

取消任务3结果: true
任务 4504f655-738c-4cdc-b7ba-85db8ccd8c35 被取消 ⏹️,取消时间:2025-09-12T15:49:57.720205Z
任务 6a9d761e-97e8-4e47-a461-cc68217bf19a(优先级 1)开始执行,时间:2025-09-12T15:49:57.719738Z
任务 d489c530-70d7-4ee7-87b8-627eba04c797(优先级 10)开始执行,时间:2025-09-12T15:49:57.719892Z
任务 4504f655-738c-4cdc-b7ba-85db8ccd8c35 已被取消,未执行
执行任务2
任务 d489c530-70d7-4ee7-87b8-627eba04c797 失败 ❌,耗时:218 ms,错误:模拟异常
执行任务1
任务 6a9d761e-97e8-4e47-a461-cc68217bf19a 成功 ✅,耗时:518 ms

这样就可以在一个地方(GlobalTaskListener)统一打点,清楚知道:

  • 任务什么时候开始

  • 是否执行成功 / 失败 / 被取消

  • 每个任务的耗时


网站公告

今日签到

点亮在社区的每一天
去签到