分布式微服务系统架构第155集:JavaPlus技术文档平台日更-Java线程池实现原理

发布于:2025-07-07 ⋅ 阅读:(19) ⋅ 点赞:(0)

加群联系作者vx:xiaoda0423

仓库地址:https://webvueblog.github.io/JavaPlusDoc/

https://1024bat.cn/

https://github.com/webVueBlog/fastapi_plus

https://webvueblog.github.io/JavaPlusDoc/

点击勘误issues,哪吒感谢大家的阅读

Java线程池实现原理

1. 线程池概述

1.1 什么是线程池

线程池(Thread Pool)是一种多线程处理形式,预先创建若干个线程,这些线程在没有任务处理时处于等待状态,当有任务来临时分配给其中的一个线程来处理,当处理完后又回到等待状态等待下一个任务。

1.2 为什么要使用线程池

传统方式创建线程的问题:
┌─────────────────────────────────────┐
│  每次需要执行任务时创建新线程        │
│  ↓                                 │
│  线程执行完任务后被销毁              │
│  ↓                                 │
│  频繁创建和销毁线程开销大            │
│  ↓                                 │
│  无法控制线程数量,可能导致系统崩溃   │
└─────────────────────────────────────┘

线程池的优势:
┌─────────────────────────────────────┐
│  ✓ 降低资源消耗                     │
│  ✓ 提高响应速度                     │
│  ✓ 提高线程的可管理性                │
│  ✓ 提供更多更强大的功能              │
└─────────────────────────────────────┘

2. 线程池核心参数

2.1 ThreadPoolExecutor构造参数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

2.2 参数详解

参数

说明

作用

corePoolSize

核心线程数

线程池中始终保持的线程数量

maximumPoolSize

最大线程数

线程池中允许的最大线程数量

keepAliveTime

线程空闲时间

非核心线程空闲时的存活时间

unit

时间单位

keepAliveTime的时间单位

workQueue

工作队列

存储等待执行任务的队列

threadFactory

线程工厂

创建新线程的工厂

handler

拒绝策略

当线程池和队列都满时的处理策略

2.3 参数关系图解

线程池执行流程:
┌─────────────────────────────────────────────────────────┐
│                    提交任务                              │
│                      ↓                                 │
│              核心线程数是否已满?                        │
│                ↙        ↘                              │
│              否          是                             │
│              ↓           ↓                             │
│          创建核心线程    工作队列是否已满?               │
│          执行任务        ↙        ↘                     │
│                        否          是                   │
│                        ↓           ↓                   │
│                    加入队列    最大线程数是否已满?       │
│                              ↙        ↘                │
│                            否          是               │
│                            ↓           ↓               │
│                      创建非核心线程   执行拒绝策略       │
│                      执行任务                           │
└─────────────────────────────────────────────────────────┘

3. 工作队列类型

3.1 常用队列类型

// 1. ArrayBlockingQueue - 有界队列
BlockingQueue<Runnable> queue1 = new ArrayBlockingQueue<>(100);

// 2. LinkedBlockingQueue - 无界队列(默认Integer.MAX_VALUE)
BlockingQueue<Runnable> queue2 = new LinkedBlockingQueue<>();

// 3. SynchronousQueue - 同步队列
BlockingQueue<Runnable> queue3 = new SynchronousQueue<>();

// 4. PriorityBlockingQueue - 优先级队列
BlockingQueue<Runnable> queue4 = new PriorityBlockingQueue<>();

// 5. DelayQueue - 延迟队列
BlockingQueue<Runnable> queue5 = new DelayQueue<>();

3.2 队列特性对比

队列类型

容量

特点

适用场景

ArrayBlockingQueue

有界

基于数组,FIFO

资源有限,需要控制内存使用

LinkedBlockingQueue

无界

基于链表,FIFO

任务量不确定,但要避免拒绝

SynchronousQueue

0

直接传递

任务量大,希望直接处理

PriorityBlockingQueue

无界

优先级排序

任务有优先级要求

DelayQueue

无界

延迟执行

定时任务场景

4. 拒绝策略

4.1 内置拒绝策略

// 1. AbortPolicy - 抛出异常(默认)
RejectedExecutionHandler abort = new ThreadPoolExecutor.AbortPolicy();

// 2. CallerRunsPolicy - 调用者运行
RejectedExecutionHandler caller = new ThreadPoolExecutor.CallerRunsPolicy();

// 3. DiscardPolicy - 丢弃任务
RejectedExecutionHandler discard = new ThreadPoolExecutor.DiscardPolicy();

// 4. DiscardOldestPolicy - 丢弃最老任务
RejectedExecutionHandler discardOldest = new ThreadPoolExecutor.DiscardOldestPolicy();

4.2 自定义拒绝策略

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 记录日志
        System.err.println("任务被拒绝: " + r.toString());
        
        // 可以选择:
        // 1. 保存到数据库或文件
        // 2. 发送到消息队列
        // 3. 降级处理
        // 4. 通知监控系统
        
        // 示例:保存到备用队列
        saveToBackupQueue(r);
    }
    
    private void saveToBackupQueue(Runnable task) {
        // 实现备用处理逻辑
        System.out.println("任务已保存到备用队列");
    }
}

5. 线程池状态

5.1 线程池生命周期

线程池状态转换图:
┌─────────────────────────────────────────────────────────┐
│                                                         │
│    RUNNING ──────shutdown()─────→ SHUTDOWN              │
│       │                              │                 │
│       │                              │                 │
│  shutdownNow()                   队列为空且             │
│       │                        活跃线程为0              │
│       ↓                              ↓                 │
│     STOP ──────队列为空且─────→ TIDYING ──terminated()─→│
│              活跃线程为0              │                 │
│                                      ↓                 │
│                                 TERMINATED              │
│                                                         │
└─────────────────────────────────────────────────────────┘

5.2 状态详解

// 线程池状态常量
privatestaticfinalint RUNNING    = -1 << COUNT_BITS;  // 接受新任务,处理队列任务
privatestaticfinalint SHUTDOWN   =  0 << COUNT_BITS;  // 不接受新任务,处理队列任务
privatestaticfinalint STOP       =  1 << COUNT_BITS;  // 不接受新任务,不处理队列任务
privatestaticfinalint TIDYING    =  2 << COUNT_BITS;  // 所有任务终止,线程数为0
privatestaticfinalint TERMINATED =  3 << COUNT_BITS;  // terminated()方法执行完成

6. 核心源码分析

6.1 execute方法源码分析

public void execute(Runnable command) {
    if (command == null)
        thrownew NullPointerException();
    
    // 获取当前线程池状态和线程数
    int c = ctl.get();
    
    // 1. 如果当前线程数 < 核心线程数,创建核心线程
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    
    // 2. 如果线程池运行中且任务成功加入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 双重检查:如果线程池不是运行状态,移除任务并拒绝
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // 如果没有工作线程,创建一个
        elseif (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. 队列满了,尝试创建非核心线程
    elseif (!addWorker(command, false))
        // 创建失败,执行拒绝策略
        reject(command);
}

6.2 addWorker方法分析

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        
        // 检查线程池状态
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            returnfalse;
        
        for (;;) {
            int wc = workerCountOf(c);
            // 检查线程数是否超限
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                returnfalse;
            // CAS增加线程数
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 创建Worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 再次检查线程池状态
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())
                        thrownew IllegalThreadStateException();
                    // 添加到工作线程集合
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

6.3 Worker内部类

private finalclass Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;
    Runnable firstTask;
    volatilelong completedTasks;
    
    Worker(Runnable firstTask) {
        setState(-1); // 禁止中断直到runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    
    public void run() {
        runWorker(this);
    }
    
    // AQS方法实现
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            returntrue;
        }
        returnfalse;
    }
    
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        returntrue;
    }
}

6.4 runWorker方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 允许中断
    boolean completedAbruptly = true;
    try {
        // 循环获取任务执行
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 检查线程池状态,决定是否中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run(); // 执行任务
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; thrownew Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

7. 实际应用示例

7.1 基础使用示例

public class ThreadPoolExample {
    public static void main(String[] args) {
        // 创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2,                                    // 核心线程数
            4,                                    // 最大线程数
            60L,                                  // 空闲时间
            TimeUnit.SECONDS,                     // 时间单位
            new ArrayBlockingQueue<>(10),         // 工作队列
            new ThreadFactory() {                 // 线程工厂
                private AtomicInteger threadNumber = new AtomicInteger(1);
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r, "MyThread-" + threadNumber.getAndIncrement());
                    t.setDaemon(false);
                    return t;
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
        );
        
        // 提交任务
        for (int i = 0; i < 20; i++) {
            finalint taskId = i;
            executor.execute(() -> {
                System.out.println("执行任务 " + taskId + 
                    " - 线程: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000); // 模拟任务执行
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        // 关闭线程池
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
        }
    }
}

7.2 监控线程池状态

public class ThreadPoolMonitor {
    privatefinal ThreadPoolExecutor executor;
    privatefinal ScheduledExecutorService monitor;
    
    public ThreadPoolMonitor(ThreadPoolExecutor executor) {
        this.executor = executor;
        this.monitor = Executors.newScheduledThreadPool(1);
    }
    
    public void startMonitoring() {
        monitor.scheduleAtFixedRate(() -> {
            System.out.println("=== 线程池状态监控 ===");
            System.out.println("核心线程数: " + executor.getCorePoolSize());
            System.out.println("最大线程数: " + executor.getMaximumPoolSize());
            System.out.println("当前线程数: " + executor.getPoolSize());
            System.out.println("活跃线程数: " + executor.getActiveCount());
            System.out.println("队列大小: " + executor.getQueue().size());
            System.out.println("已完成任务数: " + executor.getCompletedTaskCount());
            System.out.println("总任务数: " + executor.getTaskCount());
            System.out.println("是否关闭: " + executor.isShutdown());
            System.out.println("是否终止: " + executor.isTerminated());
            System.out.println("========================\n");
        }, 0, 5, TimeUnit.SECONDS);
    }
    
    public void stopMonitoring() {
        monitor.shutdown();
    }
}

7.3 优雅关闭线程池

public class GracefulShutdown {
    public static void shutdownThreadPool(ExecutorService executor) {
        executor.shutdown(); // 不再接受新任务
        
        try {
            // 等待已提交任务完成
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                System.out.println("强制关闭线程池");
                executor.shutdownNow(); // 强制关闭
                
                // 等待任务响应中断
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("线程池未能正常关闭");
                }
            }
        } catch (InterruptedException e) {
            System.out.println("关闭过程被中断,强制关闭");
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

8. 最佳实践

8.1 参数配置建议

public class ThreadPoolBestPractices {
    
    // CPU密集型任务
    public static ThreadPoolExecutor createCpuIntensivePool() {
        int cpuCount = Runtime.getRuntime().availableProcessors();
        returnnew ThreadPoolExecutor(
            cpuCount,                           // 核心线程数 = CPU核数
            cpuCount,                           // 最大线程数 = CPU核数
            0L, TimeUnit.MILLISECONDS,          // 无需保持空闲线程
            new LinkedBlockingQueue<>(100),     // 有界队列
            new ThreadFactory() {
                private AtomicInteger counter = new AtomicInteger(1);
                @Override
                public Thread newThread(Runnable r) {
                    returnnew Thread(r, "CPU-Worker-" + counter.getAndIncrement());
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
    
    // IO密集型任务
    public static ThreadPoolExecutor createIoIntensivePool() {
        int cpuCount = Runtime.getRuntime().availableProcessors();
        returnnew ThreadPoolExecutor(
            cpuCount * 2,                       // 核心线程数 = CPU核数 * 2
            cpuCount * 4,                       // 最大线程数 = CPU核数 * 4
            60L, TimeUnit.SECONDS,              // 空闲线程保持60秒
            new LinkedBlockingQueue<>(200),     // 较大的队列
            new ThreadFactory() {
                private AtomicInteger counter = new AtomicInteger(1);
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r, "IO-Worker-" + counter.getAndIncrement());
                    t.setDaemon(false);
                    return t;
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
    
    // 混合型任务
    public static ThreadPoolExecutor createMixedPool() {
        int cpuCount = Runtime.getRuntime().availableProcessors();
        returnnew ThreadPoolExecutor(
            cpuCount + 1,                       // 核心线程数 = CPU核数 + 1
            cpuCount * 2 + 1,                   // 最大线程数 = (CPU核数 + 1) * 2
            60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(150),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
        );
    }
}

8.2 常见问题和解决方案

public class ThreadPoolTroubleshooting {
    
    // 问题1:内存泄漏
    public static void avoidMemoryLeak() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10),  // 使用有界队列
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
        
        // 确保在应用关闭时正确关闭线程池
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("关闭线程池...");
            GracefulShutdown.shutdownThreadPool(executor);
        }));
    }
    
    // 问题2:任务执行异常处理
    publicstaticclass SafeThreadPoolExecutor extends ThreadPoolExecutor {
        public SafeThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                    long keepAliveTime, TimeUnit unit,
                                    BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
        
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            if (t == null && r instanceof Future<?>) {
                try {
                    ((Future<?>) r).get();
                } catch (CancellationException ce) {
                    t = ce;
                } catch (ExecutionException ee) {
                    t = ee.getCause();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
            if (t != null) {
                System.err.println("任务执行异常: " + t.getMessage());
                t.printStackTrace();
                // 可以添加告警、日志记录等
            }
        }
    }
    
    // 问题3:线程池大小动态调整
    public static void dynamicAdjustment(ThreadPoolExecutor executor) {
        // 监控系统负载,动态调整线程池大小
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(() -> {
            double cpuUsage = getCpuUsage(); // 获取CPU使用率
            int queueSize = executor.getQueue().size();
            
            if (cpuUsage > 0.8 && queueSize > 50) {
                // CPU使用率高且队列积压,增加线程
                int currentMax = executor.getMaximumPoolSize();
                executor.setMaximumPoolSize(Math.min(currentMax + 2, 20));
                System.out.println("增加最大线程数到: " + executor.getMaximumPoolSize());
            } elseif (cpuUsage < 0.3 && queueSize < 10) {
                // CPU使用率低且队列空闲,减少线程
                int currentMax = executor.getMaximumPoolSize();
                int coreSize = executor.getCorePoolSize();
                executor.setMaximumPoolSize(Math.max(currentMax - 1, coreSize));
                System.out.println("减少最大线程数到: " + executor.getMaximumPoolSize());
            }
        }, 0, 30, TimeUnit.SECONDS);
    }
    
    private static double getCpuUsage() {
        // 简化的CPU使用率获取,实际应用中可以使用JMX
        return Math.random();
    }
}

9. 总结

9.1 核心要点

  1. 理解参数含义:正确配置核心线程数、最大线程数、队列大小等参数

  2. 选择合适队列:根据业务场景选择有界或无界队列

  3. 制定拒绝策略:根据业务需求选择或自定义拒绝策略

  4. 监控线程池状态:实时监控线程池的运行状态和性能指标

  5. 优雅关闭:确保应用关闭时正确关闭线程池

9.2 性能调优建议

调优步骤:
1. 分析任务特性(CPU密集型 vs IO密集型)
2. 确定合理的线程数量
3. 选择合适的队列类型和大小
4. 配置适当的拒绝策略
5. 添加监控和告警
6. 压力测试验证配置
7. 根据监控数据持续优化

9.3 注意事项

  • 避免使用Executors工具类:推荐手动创建ThreadPoolExecutor

  • 合理设置队列大小:避免无界队列导致内存溢出

  • 处理任务异常:重写afterExecute方法处理任务执行异常

  • 线程池复用:避免频繁创建和销毁线程池

  • 资源清理:确保线程池正确关闭,避免资源泄漏

通过深入理解线程池的实现原理和最佳实践,可以更好地利用线程池提升应用性能和稳定性。


网站公告

今日签到

点亮在社区的每一天
去签到