线程池基础学习

发布于:2024-02-29 ⋅ 阅读:(40) ⋅ 点赞:(0)

作者简介:码上言


代表教程:Spring Boot + vue-element 开发个人博客项目实战教程


专栏内容:个人博客系统


我的文档网站:http://xyhwh-nav.cn/

线程池

什么是线程池?

或问:谈谈你对线程池的理解

在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源。在 Java 中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收。所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁,这就是”池化资源”技术产生的原因。

线程池(Thread Pool)是一种基于池化思想管理线程的工具,它维护多个线程。在线程池中,总有几个活跃线程。当需要使用线程来执行任务时,可以从池子中随便拿一个空闲线程来用,当完成工作时,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务。

这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。

线程池有哪些好处?

或者提问:为什么要使用线程池?、有哪些作用?

线程:频繁的开启线程或者停止线程,线程需要从新被cpu从就绪到运行状态调度,需要发生cpu的上下文切换,效率非常低。

线程池:线程复用,控制最大的并发数,管理线程

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

来聊一聊JDK中的 Executors线程池吧

为了避免重复的创建线程,线程池的出现可以让线程进行复用。当有工作来,就会向线程池拿一个线程,当工作完成后,并不是直接关闭线程,而是将这个线程归还给线程池供其他任务使用。

一个线程池包含以下四个组成部分:

  • 线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务。
  • 工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务。
  • 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等。
  • 任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。

JDK提供了哪些默认线程池的实现?

或问:Java有哪些实现线程池的方式

一共有4种自带的线程池。通过以下构造函数我们可以看到线程池内部都是使用ThreadPoolExecutor来创建线程池。

1、Executors.newCachedThreadPool()

创建一个可缓存线程的线程池,如果线程池长度超过处理需要,可灵活回收部分空闲(60 秒不执行任务)的线程;当任务数增加时,此线程池又可以自行添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说 JVM)能够创建的最大线程大小。

 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
 }

2、Executors.newFixedThreadPool(2)

创建一个固定大小的线程池,每次提交一个任务就创建一个线程,直到线程达到线程池的最大值。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。如果希望在服务器上使用线程池,建议使用 newFixedThreadPool方法来创建线程池,这样能获得更好的性能。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

3、Executors.newScheduledThreadPool(3)

创建一个大小无限的线程池,此线程池支持定时及周期性任务执行。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
}

4、Executors.newSingleThreadExecutor()

创建一个单线程的线程池,它只会用唯一的工作线程来执行任务,相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}

说一下为什么不使用Executors创建线程池

在《阿里巴巴JAVA开发手册》有这样一条强制规定:线程池不允许使用Executors去创建,而应该通过ThreadPoolExecutor方式,这样处理方式更加明确线程池运行规则,规避资源耗尽风险。Executors 返回的线程池对象的弊端如下:

(1) FixedThreadPoolSingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。

(2) CachedThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

来说一下ThreadPoolExecutor创建线程池的几个常用参数?

ThreadPoolExecutor 是 Java 中的一个强大且灵活的线程池实现,它实现了 ExecutorService 接口。相比于 Executors 类中提供的简单线程池工厂方法,ThreadPoolExecutor 允许更灵活地配置线程池的各个参数,以满足不同场景的需求。

一般都说线程池的五个参数意义,其实他有七个,只不过一般都问五个(前5个参数)。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler, threadFactory, handler);
}

从上面的源码看出这就是我们自定义线程池所需要的参数,但是这些参数都是什么意思呢,才是面试的重要回答。

其实开发者都已经在方法上注明了,我们要适应阅读英文文档。

image240eacb110f54093.png

线程池ThreadPoolExecutor核心参数有哪些


2

  • corePoolSize:核心线程数量。不管它们创建以后是不是空闲的。线程池需要保持 corePoolSize 数量的线程,除非设置了 allowCoreThreadTimeOut。当线程中的线程数目达到corePoolSize之后,就会把到达的任务放到缓存队列中。

  • maximumPoolSize:线程池的最大线程数。线程池中最多允许创建 maximumPoolSize 个线程。此值必须大于等于1

  • keepAliveTime:多余的空闲线程的存活时间。经过 keepAliveTime 时间后,当前线程池数量超过corePoolSize时,当空闲时间达到keepAliveTime值时,多余空闲线程会被销毁直到只剩下corePoolSize个线程为止。

    默认情况下,只有当线程池中线程数大于核心线程数corePoolSize时keepAliveTime才会起作用,直到线程池中的线程数不大于核心线程数时才会不起作用。

  • unit:keepAliveTime 的时间单位。

  • workQueue:任务队列,存放待执行的任务。当提交的任务数超过核心线程数大小后,再提交的任务就存放在这里。它仅仅用来存放被 execute 方法提交的 Runnable 任务。

  • threadFactory:线程工厂。这里面可以自定义线程名称,当进行虚拟机栈分析时,看着名字就知道这个线程是哪里来的,不会懵逼。

  • handler :拒绝策略:当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务。

那么这几个参数的值是设置的,如何得出来的?

或者问:线程池如何合理配置参数?

美团技术团队在《Java线程池实现原理及其在美团业务中的实践》这篇文章中介绍到对线程池参数实现可自定义配置的思路和方法。

《Java并发编程实战》一书中给出的计算方式是这样的:

image9c780d29b1a5ca68.png

美团调研的现有解决方案列表

img

然后结合自己使用的线程参数说明一下,目前选用的是第二种方案,设置为 2*CPU 核心数,有点像是把任务都当做 IO 密集型去处理

美团给出了动态化配置的解决方案。

简单说明一下动态化线程池的核心设计:

1、简化线程池配置:线程池构造参数有8个,但是最核心的是3个:corePoolSize、maximumPoolSize,workQueue,它们最大程度地决定了线程池的任务分配和线程分配策略。

2、参数可动态修改:为了解决参数不好配,修改参数成本高等问题。在Java线程池留有高扩展性的基础上,封装线程池,允许线程池监听同步外部的消息,根据消息进行修改配置。

3、增加线程池监控:对某事物缺乏状态的观测,就对其改进无从下手。

图16 动态修改线程池参数新旧流程对比

线程池里的任务是IO密集型还是计算密集型的?

CPU 密集型简单理解就是利用 CPU 计算能力的任务,比如你在内存中对大量数据进行排序。一般这种场景的线程数设置为CPU核心数+1。

但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。一般线程数设置为 2*CPU核心数

线程池都有哪些状态

RUNNING:这是最正常的状态,接受新的任务,处理等待队列中的任务。

SHUTDOWN:不接受新的任务提交,但是会继续处理等待队列中的任务。

STOP:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程。

TIDYING:所有的任务都销毁了,workCount 为 0,线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()。

TERMINATED:terminated()方法结束后,线程池的状态就会变成这个。

线程池创建的线程会一直在运行状态吗?

不会的,例如:配置核心线程数corePoolSize为2 、最大线程数maximumPoolSize 为5,我们可以通过配置超出corePoolSize核心线程数后创建的线程的存活时间例如为60s,在60s内没有核心线程一直没有任务执行,则会停止该线程。

说一下ThreadPoolExecutor线程池的工作流程?

或者:线程池底层ThreadPoolExecutor底层实现原理

首先,所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:

  1. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
  2. 如果正在运行的线程数量小于 corePoolSize,则创建并启动一个线程来执行新提交的任务。
  3. 如果正在运行的线程数量大于等于corePoolSize ,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
  4. 如果正在运行的线程数量>= corePoolSize && 正在运行的线程数量 < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
  5. 如果正在运行的线程数量>= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常
  6. 当一个线程完成任务时,会从队列中取出下一个任务去执行。
  7. 当非核心线程没有任务可做时。并且超过了keepAliveTime这个时间了的话,线程池会判断,当前运行的线程数小于corePoolSize时,那么这个线程就会被回收掉,最后回收到coorePoolSize这个大小。

1

线程池满了有哪些拒绝策略,简单说一下。

当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到 maximumPoolSize 最大线程数配置。此时,再多余的任务,则会触发线程池的拒绝策略了。

拒绝策略提供顶级接口 RejectedExecutionHandler ,其中方法 rejectedExecution 即定制具体的拒绝策略的执行逻辑。
JDK默认提供了四种拒绝策略:

  • AbortPolicy(默认)抛出异常,中止任务。抛出拒绝执行 RejectedExecutionException 异常信息。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
  • CallerRunsPolicy使用调用线程执行任务。当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大。
  • DiscardPolicy: 直接丢弃,无任何操作,不予任何处理也不抛出异常,如果允许任务丢失,这是最好的一种方案。
  • DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务。

总结:

四种拒绝策略是相互独立无关的,选择何种策略去执行,还得结合具体的业务场景。实际工作中,一般直接使用 ExecutorService 的时候,都是使用的默认的 defaultHandler ,也即 AbortPolicy 策略。

线程池队列满了,任务会丢失吗?

如果队列满了,且任务总数 > 最大线程数则当前线程会走拒绝策略。

也可以自定义异拒绝异常,将该任务缓存到redis、本地文件、mysql中后期项目启动实现补偿。可以实现实现RejectedExecutionHandler接口,可自定义处理器。

线程池简单例子

实例代码如下:

  • 创建了一个ThreadPoolExecutor线程池,指定线程池的核心线程数为4
  • 循环生成10个线程任务,提交到ThreadPoolExecutor线程池中
  • 在任务中打印了线程的name和i的值。
   public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                4,
                8,
                1L,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<Runnable>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );
        for (int i = 0; i < 10; i++) {
            int data = i;
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName()+ "->" +data);
                }
            });
        }
    }

结果:

pool-1-thread-1->0
pool-1-thread-1->4
pool-1-thread-1->5
pool-1-thread-1->6
pool-1-thread-3->2
pool-1-thread-2->1
pool-1-thread-4->3
pool-1-thread-7->9
pool-1-thread-5->7
pool-1-thread-6->8

从上述的示例代码结果中可以看到pool-1-tread-这个就是线程池中线程的名字,在例子中定义了线程池核心线程数为4个,最大为8个,而我们循环创建了10个任务,所以在运行结果中可以看到部分线程池中的线程复用了。这就是线程池的特性,可以实现线程的复用,不用为每一个线程任务单独创建线程。

线程池底层是如何实现复用的

线程池的核心功能就是实现了线程的重复利用,那么线程池是如何实现线程的复用呢?

在示例结果中可以发现,创建的10个线程任务,提交到了线程池,执行时永远都只会打印了线程池中最大的8个执行线程的名字。说明线程池中,有一组8个线程,不断的在运行着,当有线程任务提交到线程时,线程池空闲的线程就会将提交的任务放到当前执行。提交进来的线程任务,虽然是实现了runnable接口,但是在线程池中它们不会作为线程进行调用,而是被当做了简单一个Java对象,在执行任务中,调用的是线程任务的run方法而已。

进入到threadPoolExecutor调用的execute方法中。

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
    
        int c = ctl.get();
        // 如果运行的线程少于核心线程数,请尝试以给定的命令作为第一个任务启动新线程。
        if (workerCountOf(c) < corePoolSize) {
            //对addworker的调用以原子方式检查runstate和workercount,因此可以通过返回false来防止在不应该添加线程时出现错误警报。
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果任务可以成功排队,那么我们仍然需要再次检查是否应该添加线程(因为自上次检查以来已有的线程已死亡),或者池是否在进入此方法后关闭。
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                // 我们重新检查状态,如果停止,则回滚排队
                reject(command);
            else if (workerCountOf(recheck) == 0)
                // 如果当前的线程池没有线程,则启动新线程。
                addWorker(null, false);
        }
       // 如果无法将任务排队,则尝试添加新线程。如果失败了,我们知道我们被关闭或饱和,所以拒绝任务。
        else if (!addWorker(command, false))
            reject(command);
    }

其中调用的addWorker是线程中添加工作线程的核心代码,主要实现的步骤如下:

  • 增加ctl的值,如果满足条件则 尝试更新worker线程的总数,如果失败则调到retry:进行重试。
  • 新建Worker实例,加锁,判断线程池状态正常则向workers中添加新的工作线程worker。

添加工作线程的源码如下:

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        // 增加ctl的值,如果满足条件则新增worker的个数
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 检查线程池的状态和workQueue和firstTask是否为null
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // 如果core为true,则工作线程池与corePoolSize值比,否则与最小线程数比较
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
               // 尝试更新worker的总数,如果失败则调到retry:进行重试
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 新建Worker实例 
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                //加锁
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
					// 判断线程池的状态
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 将worker加入线程池的工作线程中
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Worker继承了AQS,也就是有一定锁的特性。代码如下:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable{
}

Worker类中,runWorker方法就是线程池中的工作线程最核心的代码,在创建Worker对象的时候,会把线程和任务一起封装到Worker内部,然后调用runWorker方法来让线程执行任务。源码如下:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //这个是我们提交的线程任务对象
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 如果Worker中需要处理的Task不为空,则继续往下执行。
            while (task != null || (task = getTask()) != null) {
                // 工作线程加锁,确保只处理当前的任务
                w.lock();
                //如果池正在停止,请确保线程已中断;否则,请确保线程未中断。这需要在第二种情况下重新检查,以便在清除中断时处理shutdownnow race
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 我们的任务的实际执行,只是对提交给线程池的Runnable,其实最终在工作线程中,对这个Runnable的任务只是当做了一个简单的对象,调用的run方法而不是start方法。
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

可以看出线程执行完任务不会退出的原因是runWorker内部使用了while死循环,当第一个任务执行完之后,会不断地通过getTask方法获取任务,只要能获取到任务,就会调用run方法,继续执行任务,这就是线程能够复用的主要原因。但是如果从getTask获取不到方法的时候,最后就会调用finally中的processWorkerExit方法,来将线程退出。

在线程池源码中如何判断线程有没有在执行任务

在Worker类中继承了AbstractQueuedSynchronizer(AQS),每次在执行任务之前都会调用Worker的lock方法,执行完任务之后,会调用unlock方法,这样做的目的就可以通过Woker的加锁状态就能判断出当前线程是否正在运行任务。如果想知道线程是否正在运行任务,只需要调用Woker的tryLock方法,根据是否加锁成功就能判断,加锁成功说明当前线程没有加锁,也就没有执行任务了,在调用shutdown方法关闭线程池的时候,就用这种方式来判断线程有没有在执行任务,如果没有的话,来尝试打断没有执行任务的线程。

线程池是如何关闭的

线程池提供了shutdownshutdownNow两个方法来关闭线程池。

shutdown方法:就是将线程池的状态修改为SHUTDOWN,然后尝试打断空闲的线程。

shutdownNow方法:就是将线程池的状态修改为STOP,然后尝试打断所有的线程,从阻塞队列中移除剩余的任务,这也是为什么shutdownNow不能执行剩余任务的原因。

所以也可以看出shutdown方法和shutdownNow方法的主要区别就是,shutdown之后还能处理在队列中的任务,shutdownNow直接就将任务从队列中移除,线程池里的线程就不再处理了。

线程池可以有哪些监控

线程池本身提供了一些方法来获取线程池的运行状态。

  • getCompletedTaskCount:已经执行完成的任务数量

  • getLargestPoolSize:线程池里曾经创建过的最大的线程数量。这个主要是用来判断线程是否满过。

  • getActiveCount:获取正在执行任务的线程数据

  • getPoolSize:获取当前线程池中线程数量的大小

除此之外可以在runWorker方法里面,在执行任务之前会回调beforeExecute方法,执行任务之后会回调afterExecute方法,而这些方法默认都是空实现,你可以自己继承ThreadPoolExecutor来扩展重写这些方法,来实现自己想要的功能。

线程池是如何回收线程的?

Java 中的线程池通过线程池的维护者(ThreadPoolExecutor)来进行线程的回收。线程池的回收主要发生在两个方面:

空闲线程的回收: 如果线程池中的线程处于空闲状态(即没有执行任务且在空闲时间超过一定阈值),则这些空闲线程可能会被回收。线程池中的核心线程数在空闲时不会被回收,但超过核心线程数的线程如果在一定时间内没有执行任务,可能会被回收。

超时线程的回收: 当线程池中的线程数超过核心线程数,并且工作队列已满,新的任务无法立即执行,那么超过核心线程数的线程可能会被保留一段时间以等待新任务,但如果在一定时间内没有新任务提交,这些超过核心线程数的线程也可能被回收。

线程池如何知道一个线程的任务已经执行完成

1、在线程池内部,当我们把一个任务丢给线程池去执行,线程池会调度工作线程来执行这个任务的 run 方法,run 方法正常结束,也就意味着任务完成了。所以线程池中的工作线程是通过同步调用任务的 run()方法并且等待 run 方法返回后,再去统计任务的完成数量

2、如果想在线程池外部去获得线程池内部任务的执行状态,有以下几种方法可以实现:

  • 使用isTerminated()(不推荐使用)

    线程池提供了一个 isTerminated()方法,可以判断线程池的运行状态,我们可以循环判断 isTerminated()方法的返回结果来了解线程池的运行状态,一旦线程池的运行状态是 Terminated,意味着线程池中的所有任务都已经执行完了。想要通过这个方法获取状态的前提是,程序中主动调用了线程池的 shutdown() 方法。在实际业务中,一般不会主动去关闭线程池,因此这个方法在实用性和灵活性方面都不是很好。

  • submit()

    在使用 ThreadPoolExecutor 时,任务可以通过 Future 对象来获取执行结果。Future 是一个接口,它表示一个异步计算的结果。任务提交给线程池时,可以使用 Callable 接口来表示具有返回值的任务。submit 方法提交任务并返回一个 Future 对象,通过这个对象可以获取任务执行的结果。通过Future,线程池可以判断任务是否完成,也可以获取任务的执行结果。代码如下:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(...);
    
    Future<Integer> future = executor.submit(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            // 执行具体任务,返回结果
            return 42;
        }
    });
    
    // 判断任务是否完成
    if (future.isDone()) {
        // 任务已完成
        // 获取任务的执行结果
        Integer result = future.get();
        // 处理结果
    }
    
    
  • CountDownLatch 计数器

    过初始化指定一个计数器进行倒计时,其中有两个方法分别是 await()阻塞线程,以及 countDown() 进行倒计时,一旦倒计时归零,所以被阻塞在 await()方法的线程都会被释放。

    定义一个 CountDownLatch 对象并且计数器为 1接着在线程池代码块后面调用 await()方法阻塞主线程,**然后,当传入到线程池中的任务执行完成后,调用countDown()方法表示任务执行结束。**最后,计数器归零 0,唤醒阻塞在 await()方法的线程。

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
                4,
                8,
                1L,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<Runnable>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );
        CountDownLatch countDownLatch = new CountDownLatch(1);
        executor.execute(new Runnable() {
            @Override
            public void run() {
                // 开始执行任务
                try {
                    Thread.sleep(3000); // 模拟任务执行时间
                    countDownLatch.countDown(); // 任务执行结束后,计数器减1
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        // 阻塞main线程 当任务执行结束调用countDown()方法使得计数器归零后,唤醒主线程。
        countDownLatch.await();
        executor.shutdown();
    }
}

不管是线程池内部还是外部,要想知道线程是否执行结束,我们必须要获取线程执行结束后的状态,而线程本身没有返回值,所以只能通过阻塞-唤醒的方式来实现。

本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到