Java线程池ThreadPoolExecutor
一、线程池特性
1.动态调整线程数量
根据当前的工作负载,ThreadPoolExecutor 可以动态地增加或减少工作线程的数量。当有新的任务提交且当前线程数小于核心线程数时,会创建新的线程;如果已有足够的线程,则将任务放入队列中。如果队列已满并且线程数未达到最大值,还会创建额外的线程来处理任务。
2.队列管理
ThreadPoolExecutor 支持多种类型的阻塞队列,如 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue 等。选择合适的队列类型可以影响线程池的行为和性能。
例如,使用无界队列(如 LinkedBlockingQueue)会导致所有超出核心线程数的任务都被排队,而不会立即创建新的线程;相反,SynchronousQueue 则强制每个任务都由一个可用线程立即处理,否则会被拒绝。
3.拒绝策略
当线程池无法接受新任务时(比如因为线程数已达上限并且队列也满了),会触发拒绝策略。Java 提供了几种内置的拒绝策略实现,也可以自定义拒绝策略:
- AbortPolicy:抛出 RejectedExecutionException。
- CallerRunsPolicy:由调用线程(提交任务的线程)执行该任务。
- DiscardPolicy:静默丢弃任务。
- DiscardOldestPolicy:丢弃队列中最老的任务,并尝试重新提交当前任务。
4.生命周期管理
ThreadPoolExecutor 提供了 shutdown() 和 shutdownNow() 方法来优雅地关闭线程池。前者会等待所有已提交的任务完成后再关闭,后者则试图立即停止所有正在执行的任务,并返回尚未开始的任务列表。
二、参数介绍和基本使用
1.参数介绍
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:线程池中保持的核心线程数,即使这些线程是空闲的。
- maximumPoolSize:线程池允许的最大线程数。
- keepAliveTime:当线程数超过核心线程数时,多余的空闲线程在终止前等待新任务的时间。
- unit:keepAliveTime参数的时间单位。
- workQueue:用于保存等待执行的任务的阻塞队列。
- threadFactory:用于创建新线程的工厂。
- handler:当任务提交到已满的线程池时所使用的拒绝策略。
2.基本使用
public class ThreadPoolExample {
public static void main(String[] args) {
// 定义线程池参数
int corePoolSize = 2; // 核心线程数
int maximumPoolSize = 4; // 最大线程数
long keepAliveTime = 5000; // 空闲线程存活时间
TimeUnit unit = TimeUnit.MILLISECONDS; // 时间单位
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10); // 工作队列
// 创建 ThreadPoolExecutor
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue
);
// 提交任务给线程池执行
for (int i = 0; i < 10; i++) {
final int taskNumber = i;
executor.execute(() -> {
System.out.println("Executing Task " + taskNumber + " by " + Thread.currentThread().getName());
try {
Thread.sleep(2000); // 模拟任务耗时
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 关闭线程池
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {//等待60秒
executor.shutdownNow();//不管线程池中的任务是否完成,都直接中断掉
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
}
}
二、线程池状态
1.线程池状态类型
ThreadPoolExecutor 内部维护了一个状态机来跟踪线程池的不同状态。包含:
- 运行(RUNNING)
- 关闭(SHUTDOWN)
- 停止(STOP)
- 整理(TIDYING)
- 终结(TERMINATED)
2.线程池状态实现
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//初始化 ctl 的值,表示线程池处于运行状态并且当前没有活动的工作线程。
private static final int COUNT_BITS = Integer.SIZE - 3;//表示用于存储线程计数(workerCount)的位数。由于 Java 的 int 类型有 32 位(Integer.SIZE),这里减去 3 位留给状态标志,因此留下 29 位用于线程计数。如果线程数量不满足,则可以将类型改为long类型以及AtomicInteger改了AtomicLong
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;//00011111111111111111111111111111 这是一个掩码,用于从 ctl 中提取出线程计数值。它由 COUNT_BITS 位全为 1 组成,即低 29 位为 1。
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;//当线程池处于运行状态时,高 3 位设置为 -1 的二进制补码形式(即 111...000)。
private static final int SHUTDOWN = 0 << COUNT_BITS;//线程池正在关闭,不再接受新任务,但会继续处理队列中的任务,高 3 位为 000。
private static final int STOP = 1 << COUNT_BITS;//线程池已停止,不再接受新任务,也不再处理队列中的任务,高 3 位为 001。
private static final int TIDYING = 2 << COUNT_BITS;//所有任务都已完成,线程池即将进入终结状态,高 3 位为 010。
private static final int TERMINATED = 3 << COUNT_BITS;//线程池已经完全终止,所有资源都被释放,高 3 位为 011。
// Packing and unpacking ctl
/**
* COUNT_MASK=00011111111111111111111111111111 取反之后 11100000000000000000000000000000 既为RUNNING初始状态
* c & ~COUNT_MASK 与运算得到线程池当前的状态;可以这样理解:当前ctl的值取二进制高三位进行比对,得到的结果就是当前ctl代表的线程池状态
*
*/
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
/**
* COUNT_MASK=00011111111111111111111111111111
* c & COUNT_MASK 去除高三位状态,相当于重置高三位的二进制为0,后29位二进制就是当前线程数量
*/
private static int workerCountOf(int c) { return c & COUNT_MASK; }
/**
* 或运算运算,例如 00011111111111111111111111111111 | 11100000000000000000000000000000 = 11111111111111111111111111111111
* 这样理解,高三位表示状态|低29位表示线程数量,这个时候只要用一个int类型存储,那么将高三位的二进制和低29位的二进制合并一起为一个数,就是ctl的值
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }
三、封装线程池工具类
package com.zzc.common.utils;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
public class ThreadPoolUtils {
private static final Map<String, ThreadPoolExecutor> THREAD_POOL_EXECUTOR;
private static final String COMMON_THREAD_POOL_KEY;
private static final String COMMON_SCHEDULE_EXECUTOR_POOL_KEY;
private static final int DEFAULT_CORE_POOL_SIZE = 1;
private static final int DEFAULT_KEEP_ALIVE_TIME = 30;
private static final int DEFAULT_PROCESSORS = 2;
private static int ALB_PROCESSORS = DEFAULT_PROCESSORS;
static {
THREAD_POOL_EXECUTOR = new ConcurrentHashMap<>();
COMMON_THREAD_POOL_KEY = "COMMON";
COMMON_SCHEDULE_EXECUTOR_POOL_KEY = "SCHEDULE-COMMON";
int processors = Runtime.getRuntime().availableProcessors();
ALB_PROCESSORS = processors;
log.info("availableProcessors:{}, DEFAULT_PROCESS:{}", ALB_PROCESSORS, DEFAULT_PROCESSORS);
ALB_PROCESSORS = Math.max(ALB_PROCESSORS, DEFAULT_PROCESSORS);
}
// private static Executor executor = Executors.newFixedThreadPool()
/**
*
* @param threadPoolKey
* @param corePoolSize
* @param maxPoolSize
* @param keepAliveTime
* @param timeUnit
* @param discardContinueWait 如果被拒绝,则等待时间,单位ms
* @return
*/
public static ThreadPoolExecutor newThreadPoolExecutorDirectAndAsy(String threadPoolKey, int corePoolSize,
int maxPoolSize, int keepAliveTime, TimeUnit timeUnit,
int discardContinueWait) {
return newThreadPoolExecutor(threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit, new SynchronousQueue(true), new DiscardSynchronousQueueWaitPolicy(discardContinueWait));
}
public static ThreadPoolExecutor newThreadPoolExecutorNewThreadToRun(String threadPoolKey, int corePoolSize,
int maxPoolSize, int keepAliveTime, TimeUnit timeUnit) {
return newThreadPoolExecutor(threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit, new SynchronousQueue(true), new NewThreadToRun());
}
public static ThreadPoolExecutor newThreadPoolExecutor(String threadPoolKey, int corePoolSize,
int maxPoolSize, int keepAliveTime, TimeUnit timeUnit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
if (StrUtils.isBlank(threadPoolKey)) {
throw new RuntimeException("threadPoolKey is null");
}
if (THREAD_POOL_EXECUTOR.containsKey(threadPoolKey)) {
return THREAD_POOL_EXECUTOR.get(threadPoolKey);
}
log.info("before new threadPool, threadPoolKey:{}, corePoolSize:{}, maxPoolSize:{}, keepAliveTime:{}, timeUnit:{}", threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit);
corePoolSize = corePoolSize <= 0 ? DEFAULT_CORE_POOL_SIZE : corePoolSize;
maxPoolSize = maxPoolSize <= 0 ? corePoolSize : maxPoolSize;
keepAliveTime = keepAliveTime <= 0 ? DEFAULT_KEEP_ALIVE_TIME : keepAliveTime;
timeUnit = timeUnit == null ? TimeUnit.SECONDS : timeUnit;
ThreadPoolExecutor executor = new PThreadPoolExecutor(threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue, newThreadFactory(threadPoolKey), handler);
log.info("after new thread pool, threadPoolKey:{}, corePoolSize:{}, maxPoolSize:{}, keepAliveTime:{}, timeUnit:{}", threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit);
THREAD_POOL_EXECUTOR.put(threadPoolKey, executor);
return executor;
}
public static ScheduledExecutorService getCommonScheduleExecutorsPool() {
ScheduledThreadPoolExecutor executorService = (ScheduledThreadPoolExecutor) THREAD_POOL_EXECUTOR.get(COMMON_SCHEDULE_EXECUTOR_POOL_KEY);
if (executorService == null) {
executorService = new ScheduledThreadPoolExecutor(2, newThreadFactory(COMMON_SCHEDULE_EXECUTOR_POOL_KEY));
THREAD_POOL_EXECUTOR.put(COMMON_SCHEDULE_EXECUTOR_POOL_KEY, executorService);
}
return executorService;
}
public static ThreadFactory newThreadFactory(String threadPrefix) {
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName(threadPrefix);
thread.setDaemon(true);
log.info("new thread:{}", threadPrefix);
return thread;
}
};
return threadFactory;
}
/**
* 拒绝策略
* 直接给主线程自己执行
*/
static class CallerRunsPolicy extends ThreadPoolExecutor.CallerRunsPolicy {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String threadKey = "";
if (e instanceof PThreadPoolExecutor) {
threadKey = ((PThreadPoolExecutor) e).getThreadPoolKey();
}
if (r instanceof Thread) {
log.warn("CallRuns theadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), ((Thread) r).getClass().getSimpleName());
} else {
log.warn("CallRuns theadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), r.getClass().getSimpleName());
}
super.rejectedExecution(r, e);
}
}
/**
* 拒绝策略 -- activateMQ的做法
* 重新尝试加入队列,等待超时,影响线程执行效率
*/
static class DiscardSynchronousQueueWaitPolicy implements RejectedExecutionHandler {
private long discardContinueWait = 1;
public DiscardSynchronousQueueWaitPolicy(long discardContinueWait) {
if (discardContinueWait <= 0) {
discardContinueWait = 1;
}
this.discardContinueWait = discardContinueWait;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
String threadKey = "";
if (executor instanceof PThreadPoolExecutor) {
threadKey = ((PThreadPoolExecutor) executor).getThreadPoolKey();
}
if (r instanceof Thread) {
log.warn("Discard threadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), ((Thread) r).getClass().getSimpleName());
} else {
log.warn("Discard threadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), r.getClass().getSimpleName());
}
if (!executor.isShutdown()) {
try {
executor.getQueue().poll(discardContinueWait, TimeUnit.MICROSECONDS);
} catch (InterruptedException e) {
log.error("rejectedExecution", e);
}
executor.execute(r);
}
}
}
/**
* 拒绝策略 -- netty的做法
* 创建新的线程,直接执行,直到系统创建不了新的线程为止
*/
static class NewThreadToRun implements RejectedExecutionHandler {
public NewThreadToRun() {
super();
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
String threadKey = "";
if (executor instanceof PThreadPoolExecutor) {
threadKey = ((PThreadPoolExecutor) executor).getThreadPoolKey();
}
if (r instanceof Thread) {
log.warn("NewThreadToRun threadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), ((Thread) r).getClass().getSimpleName());
} else {
log.warn("NewThreadToRun threadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), r.getClass().getSimpleName());
}
try {
final Thread thread = new Thread(r, "new thread: " + threadKey);
thread.start();
} catch (Exception e) {
throw new RejectedExecutionException("Failed to start new thread. threadKey:" + threadKey, e);
}
}
}
/**
* 线程池,添加添加一些日志打印
*/
static class PThreadPoolExecutor extends ThreadPoolExecutor {
private String threadPoolKey;
public PThreadPoolExecutor(String threadPoolKey, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.threadPoolKey = threadPoolKey;
}
@Override
public void execute(Runnable command) {
try {
super.execute(command);
} catch (Exception e) {
log.error("execute error.", e);
}
log.debug("execute runnable, hashCode:{}, threadPoolKey:{}, poolSize:{}, largestPoolSize:{}, activeCount:{}, taskCount:{}, completedTaskCount:{}, queueSize:{}",
command.hashCode(), threadPoolKey, this.getPoolSize(), this.getLargestPoolSize(), this.getActiveCount(), this.getTaskCount(), this.getCompletedTaskCount(), this.getQueue().size());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
if (t != null) {
log.error("execute Runnable error, hashCode:{}", r.hashCode(), t);
}
super.afterExecute(r, t);
}
public String getThreadPoolKey() {
return threadPoolKey;
}
}
}