ThreadPoolExecutor 源码分析

发布于:2023-01-22 ⋅ 阅读:(6) ⋅ 点赞:(0) ⋅ 评论:(0)

线程池概述

线程池,是指管理一组同构工作线程的资源池。
线程池在工作队列(Work Queue)中保存了所有等待执行的任务。工作者线程(Work Thread)会从工作队列中获取一个任务并执行,然后返回线程池并等待下一个任务。

线程池比执行任务再创建线程会有以下优势:

  1. 节省资源。通过重用线程来省去在线程创建和销毁过程中产生的开销。
  2. 提高响应性。当执行请求到达时,工作线程通常已经存在,因此不需要等待线程创建,从而提高了响应性。
  3. 防止资源耗尽。通过调整线程池的大小,防止过多线程相互竞争资源。

ThreadPoolExecutor 是Java中线程池的实现类。下图是继承关系:

AbstractExecutorService

«Interface»

Executor

«Interface»

ExecutorService

ThreadPoolExecutor

  • Executor ,基于生产者-消费者模式,提交相当于生产者,执行任务相当于消费者。通过该模式将任务的提交与执行解耦开来。
  • ExecutorService,是Executor 的实现,该接口基于Executor 提供了生命周期的支持,例如任务的运行、关闭和中止,以及通过 submit 方法来创建一个异步任务 Future。
  • AbstractExecutorService,是ExecutorService 的实现,使得下层实现只需要关注任务的执行。
  • ThreadPoolExecutor,是Java中线程池的实现。

下图是ThreadPoolExecutor的大致结构

生命周期

Executor 的实现通常会创建线程来执行任务。由于 Executor 以异步的方式来执行任务,因此之前提交任务的状态不是立即可见的,有些任务已经完成,有些还在运行,有些在等待执行。当关闭应用程序时,是完成当前任务并不接受新任务,还是直接关闭所有任务(不管是在执行还是没有执行)。
为了解决执行服务的生命周期问题,Executor 扩展了 ExecutorService 接口,该接口提供了对生命周期管理的方法。

public interface ExecutorService extends Executor {void shutdown();List<Runnable> shutdownNow();boolean awaitTermination(long timeout, TimeUnit unit);boolean isShutdown();boolean isTerminated();// 以及一些用于创建异步任务的方法<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);Future<?> submit(Runnable task);....}

在ThreadPoolExecutor中提供了五种状态:

下图是状态之间的转换关系:

如果任务队列被填满(在队列大小有限的情况下)或者某个任务被提交到一个已经被关闭的Executor中时应该怎么处理这些情况?JDK提供了一种策略来处理这些情况--饱和策略

在ThreadPoolExecutor中通过ctl字段来维护了线程池的运行状态和线程数量(工作者线程)

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

具体的可以通过两个参数来说明:

  • COUNT_BIT的值为29(32-3)
  • COUNT_MASK 则为高三位为0、低29位全1的字段
private static final int COUNT_BITS = Integer.SIZE - 3;private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

每个状态的取值如下,每个取值都向左边移动了29位:

private static final int RUNNING    = -1 << COUNT_BITS;  private static final int SHUTDOWN   =  0 << COUNT_BITS;  private static final int STOP       =  1 << COUNT_BITS;  private static final int TIDYING    =  2 << COUNT_BITS;  private static final int TERMINATED =  3 << COUNT_BITS;

所以高三位用来表示线程池的状态
之后用来确定线程池中的线程数量都使用COUNT_MASK来计算,这样就能计算低29位

private static int workerCountOf(int c)  { return c & COUNT_MASK; }

ThreadPoolExecutor 通过如下的方法来检测当前线程池的状态:
运行是要比终结状态小的

// 当前状态要比给定的状态小,如 running < terminatedprivate static boolean runStateLessThan(int c, int s) {      return c < s;  }  // 当前状态要高于给定的状态,如 terminated >= runningprivate static boolean runStateAtLeast(int c, int s) {      return c >= s;  }  // 检测当前线程是否在运行private static boolean isRunning(int c) {      return c < SHUTDOWN;  }

饱和(拒绝)策略

ThreadPoolExecutor的饱和策略可以通过调用
setRejectedtExecutionHandler 来修改。
JDK 提供了几种不同的 RejectedExecutionHandler 的实现:

队列任务管理

如果请求速率超过了线程池的处理速率,那么新到来的请求将会累计起来,这些请求会在一个由 Executor 管理的 Runnable 队列(也就是任务队列 Work Queue)中等待。但这仍有可能超出缓存的数量。
基本的任务排队方法有三种:

  • 有界队列:有长度限制
  • 无界队列:没有长度限制
  • 同步移交:立即将元素传输给正在等待的消费者
    BlockingQueue 是ThreadPoolExecutor 的任务队列:
public interface BlockingQueue<E> extends Queue<E> {offer();pool();put();take();....}

JDK 提供了以下几种实现类:

工作者线程

所有的任务都是由工作者线程来执行的,那么工作者线程是如何执行这些任务的,以及线程池是如何维护工作者线程的。下面是ThreadPoolExecutor中的工作者线程类Worker

private final class Worker      extends AbstractQueuedSynchronizer      implements Runnable  {// 线程final Thread thread;// 第一个任务Runnable firstTask;// 执行任务的数量volatile long completedTasks;....}

工作者线程实现了Runnable接口,并且它包含了一个Thread ,说明工作者线程是一个特殊的任务也是一个线程,它可以去执行一些其他任务也可以自控制。

工作者线程继承自AQS,而不是使用的可重入锁ReentrantLock,目的是实现可不重入的特性(线程封闭)。每个工作者线程必须是线程封闭的。
在工作者线程构造器中有一段:

this.thread = getThreadFactory().newThread(this);

通过调用线程工厂创建线程并将this(也就是工作者线程)提交给Thread
也就是执行Thread就启动了自己,所以工作者线程可以自己管理自己
addWorkder中创建工作者线程之后执行了它,这里的t就是thread

t.start();  

线程池中的工作者线程的状态由两个字段来控制:

// 生存时间private volatile long keepAliveTime;// 是否允许核心线程超时等待private volatile boolean allowCoreThreadTimeOut;

keepAliveTime 是当线程数量大于核心线程数数量时工作者线程没有任务时存活的时间
例如当前工作者线程数量是30,核心线程数量上限是20,最大线程数量是30。那么多出来10个线程在线程池比较空闲的时候是需要清除的,因为这是占用了多余的系统资源。
keepAliveTime是为了保证突然之间线程池繁忙的情况,这时候就没必要立马清除这些线程,可以"等等看"有没有突发情况。

allowCoreThreadTimeOut 则是使得核心线程也受keepAliveTime的影响
这些具体体现在从队列中获取任务的时候,下面会详细描述

线程工厂

每当线程池创建一个线程,都是通过线程的工厂方法创建的。
默认的线程工厂方法创建一个新的、非守护的线程,并且不包含特殊的配置信息。通过制定一个工厂方法,可以定制线程池的配置信息。每当线程池需要一个新的线程都会调用getThreadFactory()这个方法。

以下是ThreadPoolExecutor 创建一个工作者线程,是通过工厂方法创建的:

Worker(Runnable firstTask) {      setState(-1); // inhibit interrupts until runWorker      this.firstTask = firstTask;      this.thread = getThreadFactory().newThread(this);  }

ThreadFactory 是一个接口

public interface ThreadFactory {    Thread newThread(Runnable r);  }

在构造线程池ThreadPoolExecutor时,可以传入一个线程工厂,使得创建线程时通过该线程工厂创建。

执行一个任务

ThreadPoolExecutor 是通过实现Executor来执行任务的
具体分为三个步骤:

  1. 如果当前正在运行的线程小于corePoolSize则创建一个工作者线程并将该任务作为该线程的第一个任务执行
  2. 如果当前任务能够进入任务队列,仍需要检查线程池的运行状态。如果线程池关闭则需要将任务交给饱和策略处理。如果没有关闭并且工作者线程为0,则需要创建工作者线程(这个时候任务已经在队列中)。
  3. 如果不能够进入队列,则尝试创建工作者线程去处理任务;如果失败则说明已经饱和,则将任务交给饱和(拒绝)策略处理。

copy from Java线程池实现原理及其在美团业务中的实践

通过源码来理解一下:

public void execute(Runnable command) {  // 任务不能为空    if (command == null)          throw new NullPointerException();// 任务数量int c = ctl.get();      if (workerCountOf(c) < corePoolSize) {          if (addWorker(command, true))              return;          c = ctl.get();      }      if (isRunning(c) && workQueue.offer(command)) {          int recheck = ctl.get();          if (! isRunning(recheck) && remove(command))              reject(command);          else if (workerCountOf(recheck) == 0)              addWorker(null, false);      }      else if (!addWorker(command, false))          reject(command);  }

当执行一个任务会将一个任务放到工作队列中或者是直接创建一个工作者线程去执行该任务。
addWorker 是创建一个工作者线程并运行一个任务的(可以不运行一个任务)
这段代码主要做两个工作:

  1. 保证能正常添加工作者线程,数量不能超过设定的范围并且线程池没有关闭。
  2. 向工作者线程组中添加一个工作者线程并且线程池在正常运行,之后运行该线程去执行任务。
private boolean addWorker(Runnable firstTask, boolean core) {  // 保证能添加工作者线程    retry:      for (int c = ctl.get();;) {          // Check if queue empty only if necessary.          if (runStateAtLeast(c, SHUTDOWN)              && (runStateAtLeast(c, STOP)                  || firstTask != null                  || workQueue.isEmpty()))              return false;            for (;;) {        // 如果超过限定数量,这个数量可以是最小的活跃线程数量可以是最大的活跃线程数量              if (workerCountOf(c)                  >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))                return false;              if (compareAndIncrementWorkerCount(c))              // 只有能增加工作者线程才退出                break retry;              //重新获取并检查运行状态            c = ctl.get();  // Re-read ctl              if (runStateAtLeast(c, SHUTDOWN))                  continue retry;              // else CAS failed due to workerCount change; retry inner loop          }      }  // 从这里创建一个工作者线程并运行该线程    boolean workerStarted = false;      boolean workerAdded = false;      Worker w = null;      try {          w = new Worker(firstTask);          final Thread t = w.thread;          if (t != null) {              final ReentrantLock mainLock = this.mainLock;              mainLock.lock();              try {              // 检测线程池运行状态                if (isRunning(c) ||                      (runStateLessThan(c, STOP) && firstTask == null)) {                    // 检测线程状态,线程状态必须为 NEW                    if (t.getState() != Thread.State.NEW)                          throw new IllegalThreadStateException();                      // 将该工作者线程添加到工作者线程组中                    workers.add(w);                      workerAdded = true;                      int s = workers.size();                      if (s > largestPoolSize)                          largestPoolSize = s;                  }              } finally {                  mainLock.unlock();              }              // 如果工作者线程已经添加则运行该线程            if (workerAdded) {                  t.start();                  workerStarted = true;              }          }      } finally {          if (! workerStarted)              addWorkerFailed(w);      }      return workerStarted;  }折叠 

获取并执行任务

工作者线程获取任务主要有两个途径:

  1. 通过创建工作者线程时给予的
  2. 从任务队列中获取的
    期间会一直检查线程池的状态。如果遇到线程池停止了需要去确保当前线程的中断状态如果线程池没有终止也需要确保线程能够正常运行
final void runWorker(Worker w) {      Thread wt = Thread.currentThread();      Runnable task = w.firstTask;      w.firstTask = null;      w.unlock(); // allow interrupts      boolean completedAbruptly = true;      try {      // 有两种情况    // 1. 执行firstTask    // 2. 获取一个任务并执行        while (task != null || (task = getTask()) != null) {              w.lock();            // 如果线程池已经停止,确保当前线程是中断的            // 如果线程池正在运行,确保线程没有中断        // 第二次获取控制状态是为了确保线程池在关闭的过程中能够正常中断当前线程if ((runStateAtLeast(ctl.get(), STOP) ||   (Thread.interrupted() &&    runStateAtLeast(ctl.get(), STOP))) &&  !wt.isInterrupted())wt.interrupt();// 从这里开始执行任务            try {                  beforeExecute(wt, task);                  try {                      task.run();                      afterExecute(task, null);                  } catch (Throwable ex) {                      afterExecute(task, ex);                      throw ex;                  }              } finally {                  task = null;                // 增加任务执行数量,这里是线程封闭的,所以不需要考虑并发的情况                  w.completedTasks++;                  w.unlock();              }          }          completedAbruptly = false;      } finally {      // 线程自回收        processWorkerExit(w, completedAbruptly);      }  }

从任务队列中获取任务

工作者线程通过 getTask获取一个任务来执行

private Runnable getTask() {      boolean timedOut = false; // Did the last poll() time out?        for (;;) {          int c = ctl.get();            // Check if queue empty only if necessary.          // 检测线程池状态,队列不能为空        if (runStateAtLeast(c, SHUTDOWN)              && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {              decrementWorkerCount();              return null;          }  //         int wc = workerCountOf(c);            // 这一段用来表示核心线程是否受影响        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;  // 工作者线程太多或者任务队列为空        if ((wc > maximumPoolSize || (timed && timedOut))              && (wc > 1 || workQueue.isEmpty())) {              // 减少工作者线程数量            if (compareAndDecrementWorkerCount(c))                  return null;              continue;          }  // 从队列中获取一个任务        try {          // 这一段体现了keepAliveTime的作用        // 超过keepAliveTime给定时间没有获取到任务,那么线程将会被清理/回收掉            Runnable r = timed ?                  workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                  workQueue.take();              if (r != null)                  return r;              timedOut = true;          } catch (InterruptedException retry) {              timedOut = false;          }      }  }

工作者线程回收

processWorkerExit 负责这一工作,具体流程如下:

  1. 从工作者线程集合中移除
  2. 尝试终止线程池
  3. 尝试通过创建一个工作者线程来替换当前线程,这种情况可能由以下原因当前被清除的线程可能由任务异常而退出没有工作者线程执行任务队列中没有任务
private void processWorkerExit(Worker w, boolean completedAbruptly) {      if (completed