Java多线程从入门到精通

发布于:2025-06-09 ⋅ 阅读:(25) ⋅ 点赞:(0)

一、基础概念

1.1 进程与线程

        进程是指运行中的程序。 比如我们使用浏览器,需要启动这个程序,操作系统会给这个程序分配一定的资源(占用内存资源)。

        线程是CPU调度的基本单位,每个线程执行的都是某一个进程的代码的某个片段。

1.2 多线程

多线程是指:单个进程中同时运行多个线程。

多线程的不低是为了提高CPU的利用率。

可以通过避免一些网络IO或者磁盘IO等需要等待的操作,让CPU去调度其他线程。

多线程的局限

  • 如果线程数量特别多,CPU在切换线程上下文时,会额外造成很大的消耗。
  • 任务的拆分需要依赖业务场景,有一些异构化的任务,很难对任务拆分,还有很多业务并不是多线程处理更好。
  • 线程安全问题:虽然多线程带来了一定的性能提升,但是再做一些操作时,多线程如果操作临界资源,可能会发生一些数据不一致的安全问题,甚至涉及到锁操作时,会造成死锁问题。

1.3 串行、并行、并发

串行就是一个一个排队,第一个做完,第二个才能上。

并行就是同时处理。(一起上!!!)

多线程中的并发概念(CPU调度线程的概念)。CPU在极短的时间内,反复切换执行不同的线程,看似好像是并行,但是只是CPU高速的切换。

并行囊括并发。

并行就是多核CPU同时调度多个线程,是真正的多个线程同时执行。

单核CPU无法实现并行效果,单核CPU是并发。

二、线程的创建

2.1 继承Thread类 重写run方法

启动线程是调用start方法,这样会创建一个新的线程,并执行线程的任务。

如果直接调用run方法,这样会让当前线程执行run方法中的业务逻辑。

public class MiTest {

    public static void main(String[] args) {
        MyJob t1 = new MyJob();
        t1.start();
        for (int i = 0; i < 100; i++) {
            System.out.println("main:" + i);
        }
    }

}
class MyJob extends Thread{
    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            System.out.println("MyJob:" + i);
        }
    }
}

2.2 实现Runnable接口 重写run方法

public class MiTest {

    public static void main(String[] args) {
        MyRunnable myRunnable = new MyRunnable();
        Thread t1 = new Thread(myRunnable);
        t1.start();
        for (int i = 0; i < 1000; i++) {
            System.out.println("main:" + i);
        }
    }

}

class MyRunnable implements Runnable{

    @Override
    public void run() {
        for (int i = 0; i < 1000; i++) {
            System.out.println("MyRunnable:" + i);
        }

    }
}

匿名内部类方式:

Thread t1 = new Thread(new Runnable() {
    @Override
    public void run() {
        for (int i = 0; i < 1000; i++) {
            System.out.println("匿名内部类:" + i);
        }
    }
});

lambda方式:

Thread t2 = new Thread(() -> {
    for (int i = 0; i < 100; i++) {
        System.out.println("lambda:" + i);
    }
});

2.3 实现Callable 重写call方法,配合FutureTask

Callable一般用于有返回结果的非阻塞的执行方法。

public class MiTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1. 创建MyCallable
        MyCallable myCallable = new MyCallable();
        //2. 创建FutureTask,传入Callable
        FutureTask futureTask = new FutureTask(myCallable);
        //3. 创建Thread线程
        Thread t1 = new Thread(futureTask);
        //4. 启动线程
        t1.start();
        //5. 做一些操作
        //6. 要结果
        Object count = futureTask.get();
        System.out.println("总和为:" + count);
    }
}

class MyCallable implements Callable{

    @Override
    public Object call() throws Exception {
        int count = 0;
        for (int i = 0; i < 100; i++) {
            count += i;
        }
        return count;
    }
}

三、线程的使用

3.1 线程的状态

NEW:Thread对象被创建出来了,但是还没有执行start方法。

RUNNABLE:Thread对象调用了start方法,就为RUNNABLE状态(CPU调度/没有调度)

WAITING:可以理解为是阻塞、等待状态,因为处在这三种状态下,CPU不会调度当前线程

WAITING:调用wait方法就会处于WAITING状态,需要被手动唤醒

TERMINATED:run方法执行完毕,线程生命周期到头了

3.2 线程的常用方法

3.2.1 获取当前线程

Thread的静态方法获取当前线程对象:

public static void main(String[] args) throws ExecutionException, InterruptedException {
	// 获取当前线程的方法
    Thread main = Thread.currentThread();
    System.out.println(main);
    // "Thread[" + getName() + "," + getPriority() + "," +  group.getName() + "]";
    // Thread[main,5,main]
}

3.2.2 线程的名字

public static void main(String[] args) throws ExecutionException, InterruptedException {
    Thread t1 = new Thread(() -> {
        System.out.println(Thread.currentThread().getName());
    });
    t1.setName("模块-功能-计数器");
    t1.start();
}

3.2.3 线程的优先级

低到高:1-10

public static void main(String[] args) throws ExecutionException, InterruptedException {
    Thread t1 = new Thread(() -> {
        for (int i = 0; i < 1000; i++) {
            System.out.println("t1:" + i);
        }
    });
    Thread t2 = new Thread(() -> {
        for (int i = 0; i < 1000; i++) {
            System.out.println("t2:" + i);
        }
    });
    t1.setPriority(1);
    t2.setPriority(10);
    t2.start();
    t1.start();
}

3.2.4 线程的让步

可以通过Thread的静态方法yield,让当前线程从运行状态转变为就绪状态。

public static void main(String[] args) throws ExecutionException, InterruptedException {
    Thread t1 = new Thread(() -> {
        for (int i = 0; i < 100; i++) {
            if(i == 50){
                Thread.yield();
            }
            System.out.println("t1:" + i);
        }
    });
    Thread t2 = new Thread(() -> {
        for (int i = 0; i < 100; i++) {
            System.out.println("t2:" + i);
        }
    });
    t2.start();
    t1.start();
}

3.2.5 线程的休眠

Thread的静态方法,让线程从运行状态转变为等待状态

sleep有两个方法重载:

  • 第一个就是native修饰的,让线程转为等待状态的效果
public static native void sleep(long millis) throws InterruptedException;
  • 第二个是可以传入毫秒和一个纳秒的方法(如果纳秒值大于等于0.5毫秒,就给休眠的毫秒值+1。如果传入的毫秒值是0,纳秒值不为0,就休眠1毫秒)
 public static void sleep(long millis, int nanos)
    throws InterruptedException {
        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }

        if (nanos < 0 || nanos > 999999) {
            throw new IllegalArgumentException(
                                "nanosecond timeout value out of range");
        }

        if (nanos >= 500000 || (nanos != 0 && millis == 0)) {
            millis++;
        }

        sleep(millis);
    }
public static void main(String[] args) throws InterruptedException {
    System.out.println(System.currentTimeMillis());
    Thread.sleep(1000);
    System.out.println(System.currentTimeMillis());
}

3.2.6 线程的强占

某一个线程下去调用Thread的非静态方法join方法。

如果在main线程中调用了t1.join(),那么main线程会进入到等待状态,需要等待t1线程全部执行完毕,在恢复到就绪状态等待CPU调度。

如果在main线程中调用了t1.join(2000),那么main线程会进入到等待状态,需要等待t1执行2s后,在恢复到就绪状态等待CPU调度。如果在等待期间,t1已经结束了,那么main线程自动变为就绪状态等待CPU调度。

public static void main(String[] args) throws InterruptedException {
    Thread t1 = new Thread(() -> {
        for (int i = 0; i < 10; i++) {
            System.out.println("t1:" + i);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    t1.start();
    for (int i = 0; i < 10; i++) {
        System.out.println("main:" + i);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (i == 1){
            try {
                t1.join(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

3.2.7 线程的等待和唤醒

获取synchronized锁资源的线程通过wait方法进去到锁的等待池,并且会释放锁资源。

可以让获取synchronized锁资源的线程,通过notify或者notifyAll方法,将等待池中的线程唤醒,添加到锁池中。

notify随机的唤醒等待池中的一个线程到锁池。

notifyAll将等待池中的全部线程都唤醒,并且添加到锁池。

在调用wait方法和notify以及norifyAll方法时,必须在synchronized修饰的代码块或者方法内部才可以,因为要操作基于某个对象的锁的信息维护。

public static void main(String[] args) throws InterruptedException {
    Thread t1 = new Thread(() -> {
        sync();
    },"t1");

    Thread t2 = new Thread(() -> {
        sync();
    },"t2");
    t1.start();
    t2.start();
    Thread.sleep(12000);
    synchronized (MiTest.class) {
        MiTest.class.notifyAll();
    }
}

public static synchronized void sync()  {
    try {
        for (int i = 0; i < 10; i++) {
            if(i == 5) {
                MiTest.class.wait();
            }
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName());
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

四、锁

4.1锁的分类

1 可重入锁、不可重入锁

Java中提供的synchronized,ReentrantLock,ReentrantReadWriteLock都是可重入锁。

重入:当前线程获取到A锁,在获取之后尝试再次获取A锁是可以直接拿到的。

不可重入:当前线程获取到A锁,在获取之后尝试再次获取A锁,无法获取到的,因为A锁被当前线程占用着,需要等待自己释放锁再获取锁。

2 乐观锁、悲观锁

Java中提供的synchronized,ReentrantLock,ReentrantReadWriteLock都是悲观锁。

Java中提供的CAS操作,就是乐观锁的一种实现。

悲观锁:获取不到锁资源时,会将当前线程挂起(进入BLOCKED、WAITING),线程挂起会涉及到用户态和内核的太的切换,而这种切换是比较消耗资源的。

  • 用户态:JVM可以自行执行的指令,不需要借助操作系统执行。
  • 内核态:JVM不可以自行执行,需要操作系统才可以执行。

乐观锁:获取不到锁资源,可以再次让CPU调度,重新尝试获取锁资源。

Atomic原子性类中,就是基于CAS乐观锁实现的。

3 公平锁、非公平锁

Java中提供的synchronized只能是非公平锁。

Java中提供的ReentrantLock,ReentrantReadWriteLock可以实现公平锁和非公平锁

公平锁:线程A获取到了锁资源,线程B没有拿到,线程B去排队,线程C来了,锁被A持有,同时线程B在排队。直接排到B的后面,等待B拿到锁资源或者是B取消后,才可以尝试去竞争锁资源。

非公平锁:线程A获取到了锁资源,线程B没有拿到,线程B去排队,线程C来了,先尝试竞争一波

  • 拿到锁资源:开心,插队成功。
  • 没有拿到锁资源:依然要排到B的后面,等待B拿到锁资源或者是B取消后,才可以尝试去竞争锁资源。

4 互斥锁、共享锁

Java中提供的synchronized、ReentrantLock是互斥锁。

Java中提供的ReentrantReadWriteLock,有互斥锁也有共享锁。

互斥锁:同一时间点,只会有一个线程持有者当前互斥锁。

共享锁:同一时间点,当前共享锁可以被多个线程同时持有。

4.2synchronized

synchronized的锁是基于对象实现的,使用一般就是同步方法和同步代码块。

synchronized的优化:

锁消除:在synchronized修饰的代码中,如果不存在操作临界资源的情况,会触发锁消除,你即便写了synchronized,他也不会触发。

锁膨胀:如果在一个循环中,频繁的获取和释放做资源,这样带来的消耗很大,锁膨胀就是将锁的范围扩大,避免频繁的竞争和获取锁资源带来不必要的消耗。

锁升级:ReentrantLock的实现,是先基于乐观锁的CAS尝试获取锁资源,如果拿不到锁资源,才会挂起线程。

  • 无锁、匿名偏向:当前对象没有作为锁存在。
  • 偏向锁:如果当前锁资源,只有一个线程在频繁的获取和释放,那么这个线程过来,只需要判断,当前指向的线程是否是当前线程 。
    • 如果是,直接拿着锁资源走。
    • 如果当前线程不是,基于CAS的方式,尝试将偏向锁指向当前线程。如果获取不到,触发锁升级,升级为轻量级锁。(偏向锁状态出现了锁竞争的情况)
  • 轻量级锁:会采用自旋锁的方式去频繁的以CAS的形式获取锁资源(采用的是自适应自旋锁
    • 如果成功获取到,拿着锁资源走
    • 如果自旋了一定次数,没拿到锁资源,锁升级。
  • 重量级锁:就是最传统的synchronized方式,拿不到锁资源,就挂起当前线程。(用户态&内核态)

偏向锁在升级为轻量级锁时,会涉及到偏向锁撤销,需要等到一个安全点(STW),才可以做偏向锁撤销,在明知道有并发情况,就可以选择不开启偏向锁,或者是设置偏向锁延迟开启。

因为JVM在启动时,需要加载大量的.class文件到内存中,这个操作会涉及到synchronized的使用,为了避免出现偏向锁撤销操作,JVM启动初期,有一个延迟4s开启偏向锁的操作

如果正常开启偏向锁了,那么不会出现无锁状态,对象会直接变为匿名偏向。

4.3ReentrantLock

  • ReentrantLock是个类,synchronized是关键字,当然都是在JVM层面实现互斥锁的方式

效率区别:

  • 如果竞争比较激烈,推荐ReentrantLock去实现,不存在锁升级概念。而synchronized是存在锁升级概念的,如果升级到重量级锁,是不存在锁降级的。

底层实现区别:

  • 实现原理是不一样,ReentrantLock基于AQS实现的,synchronized是基于ObjectMonitor

功能向的区别:

  • ReentrantLock的功能比synchronized更全面。
    • ReentrantLock支持公平锁和非公平锁
    • ReentrantLock可以指定等待锁资源的时间。

非公平锁的流程

释放锁

五、阻塞队列

生产者 消费者彼此之间不会直接通讯的,而是通过一个容器(队列)进行通讯。

所以生产者生产完数据后扔到容器中,不通用等待消费者来处理。

消费者不需要去找生产者要数据,直接从容器中获取即可。

而这种容器最常用的结构就是队列。

5.1ArrayBlockingQueue

ArrayBlockingQueue在初始化的时候,必须指定当前队列的长度。

因为ArrayBlockingQueue是基于数组实现的队列结构,数组长度不可变,必须提前设置数组长度信息。

public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
    // 必须设置队列的长度
    ArrayBlockingQueue queue = new ArrayBlockingQueue(4);

    // 生产者扔数据
    queue.add("1");
    queue.offer("2");
    queue.offer("3",2,TimeUnit.SECONDS);
    queue.put("2");

    // 消费者取数据
    System.out.println(queue.remove());
    System.out.println(queue.poll());
    System.out.println(queue.poll(2,TimeUnit.SECONDS));
    System.out.println(queue. Take());
}

生产者方法实现原理

add方法实现

add方法本身就是调用了offer方法,如果offer方法返回false,直接抛出异常

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        // 抛出的异常
        throw new IllegalStateException("Queue full");
}

offer方法实现

public boolean offer(E e) {
    // 要求存储的数据不允许为null,为null就抛出空指针
    checkNotNull(e);
    // 当前阻塞队列的lock锁
    final ReentrantLock lock = this.lock;
    // 为了保证线程安全,加锁
    lock.lock();
    try {
        // 如果队列中的元素已经存满了,
        if (count == items.length)
            // 返回false
            return false;
        else {
            // 队列没满,执行enqueue将元素添加到队列中
            enqueue(e);
            // 返回true
            return true;
        }
    } finally {
        // 操作完释放锁
        lock.unlock();
    }
}

//==========================================================
private void enqueue(E x) {
    // 拿到数组的引用
    final Object[] items = this.items;
    // 将元素放到指定位置
    items[putIndex] = x;
    // 对inputIndex进行++操作,并且判断是否已经等于数组长度,需要归位
    if (++putIndex == items.length)
        // 将索引设置为0
        putIndex = 0;
    // 元素添加成功,进行++操作。
    count++;
    // 将一个Condition中阻塞的线程唤醒。
    notEmpty.signal();
}

offer(time,unit)方法

生产者在添加数据时,如果队列已经满了,阻塞一会。

  • 阻塞到消费者消费了消息,然后唤醒当前阻塞线程
  • 阻塞到了time时间,再次判断是否可以添加,不能,不添加。
// 如果线程在挂起的时候,如果对当前阻塞线程的中断标记位进行设置,此时会抛出异常直接结束
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
	// 非空检验
    checkNotNull(e);
    // 将时间单位转换为纳秒
    long nanos = unit.toNanos(timeout);
    // 加锁
    final ReentrantLock lock = this.lock;
    // 允许线程中断并排除异常的加锁方式
    lock.lockInterruptibly();
    try {
        // 为什么是while(虚假唤醒)
        // 如果元素个数和数组长度一致,队列慢了
        while (count == items.length) {
            // 判断等待的时间是否还充裕
            if (nanos <= 0)
                // 不充裕,直接添加失败
                return false;
            // 挂起等待,会同时释放锁资源(对标sync的wait方法)
            // awaitNanos会挂起线程,并且返回剩余的阻塞时间
            // 恢复执行时,需要重新获取锁资源
            nanos = notFull.awaitNanos(nanos);
        }
        // 说明队列有空间了,enqueue将数据扔到阻塞队列中
        enqueue(e);
        return true;
    } finally {
        // 释放锁资源
        lock.unlock();
    }
}

put方法

如果队列是满的, 就一直挂起,直到被唤醒,或者被中断。

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            // await方法一直阻塞,直到被唤醒或者中断标记位
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

消费者方法实现原理

remove方法

// remove方法就是调用了poll
public E remove() {
    E x = poll();
    // 如果有数据,直接返回
    if (x != null)
        return x;
    // 没数据抛出异常
    else
        throw new NoSuchElementException();
}

poll方法

// 拉取数据
public E poll() {
    // 加锁操作
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果没有数据,直接返回null,如果有数据,执行dequeue,取出数据并返回
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

//==========================================================
// 取出数据
private E dequeue() {
    // 将成员变量引用到局部变量
    final Object[] items = this.items;
    // 直接获取指定索引位置的数据
    E x = (E) items[takeIndex];
    // 将数组上指定索引位置设置为null
    items[takeIndex] = null;
    // 设置下次取数据时的索引位置
    if (++takeIndex == items.length)
        takeIndex = 0;
    // 对count进行--操作
    count--;
    // 迭代器内容,先跳过
    if (itrs != null)
        itrs.elementDequeued();
    // signal方法,会唤醒当前Condition中排队的一个Node。
    // signalAll方法,会将Condition中所有的Node,全都唤醒
    notFull.signal();
    // 返回数据。
    return x;
}

poll(time, unit)方法

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 转换时间单位
    long nanos = unit.toNanos(timeout);
    // 竞争锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 如果没有数据
        while (count == 0) {
            if (nanos <= 0)
                // 没数据,也无法阻塞了,返回null
                return null;
            // 没数据,挂起消费者线程
            nanos = notEmpty.awaitNanos(nanos);
        }
        // 取数据
        return dequeue();
    } finally {
        lock.unlock();
    }
}

take()方法

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 虚假唤醒
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

六、线程池

6.1构建线程池

6.1.1newFixedThreadPool

        这个线程池的线程数是固定的,在创建时指定。构建方法:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
}

        构建好当前线程池后,线程个数已经固定好(线程是懒加载,在构建之初,线程并没有构建出来,而是随着人任务的提交才会将线程在线程池中国构建出来)。如果线程没构建,线程会待着任务执行被创建和执行。如果线程都已经构建好了,此时任务会被放到LinkedBlockingQueue无界队列中存放,等待线程从LinkedBlockingQueue中去take出任务,然后执行。

6.1.2newSingleThreadExecutor

        单例线程池,线程池中只有一个工作线程在处理任务,业务涉及到顺序消费可以使用:

// 当前这里就是构建单例线程池的方式
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        // 在内部依然是构建了ThreadPoolExecutor,设置的线程个数为1
        // 当任务投递过来后,第一个任务会被工作线程处理,后续的任务会被扔到阻塞队列中
        // 投递到阻塞队列中任务的顺序,就是工作线程处理的顺序
        // 当前这种线程池可以用作顺序处理的一些业务中
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
    // 线程池的使用没有区别,跟正常的ThreadPoolExecutor没区别
    FinalizableDelegatedExecutorService(ExecutorService executor) {
        super(executor);
    }
    // finalize是当前对象被GC干掉之前要执行的方法
    // 当前FinalizableDelegatedExecutorService的目的是为了在当前线程池被GC回收之前
    // 可以执行shutdown,shutdown方法是将当前线程池停止,并且干掉工作线程
    // 但是不能基于这种方式保证线程池一定会执行shutdown
    // finalize在执行时,是守护线程,这种线程无法保证一定可以执行完毕。
    // 在使用线程池时,如果线程池是基于一个业务构建的,在使用完毕之后,一定要手动执行shutdown,
    // 否则会造成JVM中一堆线程
    protected void finalize() {
        super.shutdown();
    }
}

6.1.3newCachedThreadPool

构建方式:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

当第一次提交任务到线程池时,会直接构建一个工作线程。

这个工作线程带执行完人后,60秒没有任务可以执行后,会结束。

如果在等待60秒期间有任务进来,他会再次拿到这个任务去执行。

如果后续提升任务时,没有线程是空闲的,那么就构建工作线程去执行。

最大的一个特点,任务只要提交到当前的newCachedThreadPool中,就必然有工作线程可以处理。

6.1.4newScheduleThreadPool

        是一个定时任务的线程,可以以一个周期去执行一个任务,或者延迟多久执行一个任务一次。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

        在原来的线程池基础上实现了定时任务的功能,原理是基于DelayQueue实现的延迟执行。周期性执行是任务执行完毕后,再次扔回到阻塞队列。

6.2ThreadPoolExecutor应用

        ThreadPoolExecutor提供的七个核心参数:
public ThreadPoolExecutor(
    int corePoolSize,           // 核心工作线程(当前任务执行结束后,不会被销毁)
    int maximumPoolSize,        // 最大工作线程(代表当前线程池中,一共可以有多少个工作线程)
    long keepAliveTime,         // 非核心工作线程在阻塞队列位置等待的时间
    TimeUnit unit,              // 非核心工作线程在阻塞队列位置等待时间的单位
    BlockingQueue<Runnable> workQueue,   // 任务在没有核心工作线程处理时,任务先扔到阻塞队列中
    ThreadFactory threadFactory,         // 构建线程的线程工作,可以设置thread的一些信息
    RejectedExecutionHandler handler) {  // 当线程池无法处理投递过来的任务时,执行当前的拒绝策略
    // 初始化线程池的操作
}
        拒绝策略:

AbortPolicy:当前拒绝策略会在无法处理任务时,直接抛出一个异常:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}
//默认的拒绝策略 
private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

 

CallerRunsPolicy:当前拒绝策略会在线程池无法处理任务时,将任务交给调用者处理

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}

DiscardPolicy:当前拒绝策略会在线程池无法处理任务时,直接将任务丢弃掉

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}

DiscardOldestPolicy:当前拒绝策略会在线程池无法处理任务时,将队列中最早的任务丢弃掉,将当前任务再次尝试交给线程池处理

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        e.getQueue().poll();
        e.execute(r);
    }
}

自定义Policy:根据自己的业务,可以将任务扔到数据库,也可以做其他操作

private static class MyRejectedExecution implements RejectedExecutionHandler{
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("根据自己的业务情况,决定编写的代码!");
    }
}

代码构建线程池,并处理有无返回结果的任务:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    //1. 构建线程池
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            2,
            5,
            10,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(5),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("test-ThreadPoolExecutor");
                    return thread;
                }
            },
            new MyRejectedExecution()
    );

    //2. 让线程池处理任务,没返回结果
    threadPool.execute(() -> {
        System.out.println("没有返回结果的任务");
    });

    //3. 让线程池处理有返回结果的任务
    Future<Object> future = threadPool.submit(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            System.out.println("我有返回结果!");
            return "返回结果";
        }
    });
    Object result = future.get();
    System.out.println(result);

    //4. 如果是局部变量的线程池,记得用完要shutdown
    threadPool.shutdown();
}



private static class MyRejectedExecution implements RejectedExecutionHandler{
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("根据自己的业务情况,决定编写的代码!");
    }
}
核心属性:
// 当前是线程池的核心属性
// 当前的ctl其实就是一个int类型的数值,内部是基于AtomicInteger套了一层,进行运算时,是原子性的。
// ctl表示着线程池中的2个核心状态:
// 线程池的状态:ctl的高3位,表示线程池状态
// 工作线程的数量:ctl的低29位,表示工作线程的个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// Integer.SIZE:在获取Integer的bit位个数
// 声明了一个常量:COUNT_BITS = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
00000000 00000000 00000000 00000001
00100000 00000000 00000000 00000000
00011111 11111111 11111111 11111111
// CAPACITY就是当前工作线程能记录的工作线程的最大个数
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;


// 线程池状态的表示
// 当前五个状态中,只有RUNNING状态代表线程池没问题,可以正常接收任务处理
// 111:代表RUNNING状态,RUNNING可以处理任务,并且处理阻塞队列中的任务。
private static final int RUNNING    = -1 << COUNT_BITS;
// 000:代表SHUTDOWN状态,不会接收新任务,正在处理的任务正常进行,阻塞队列的任务也会做完。
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001:代表STOP状态,不会接收新任务,正在处理任务的线程会被中断,阻塞队列的任务一个不管。
private static final int STOP       =  1 << COUNT_BITS;
// 010:代表TIDYING状态,这个状态是否SHUTDOWN或者STOP转换过来的,代表当前线程池马上关闭,就是过渡状态。
private static final int TIDYING    =  2 << COUNT_BITS;
// 011:代表TERMINATED状态,这个状态是TIDYING状态转换过来的,转换过来只需要执行一个terminated方法。
private static final int TERMINATED =  3 << COUNT_BITS;

// 在使用下面这几个方法时,需要传递ctl进来

// 基于&运算的特点,保证只会拿到ctl高三位的值。
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 基于&运算的特点,保证只会拿到ctl低29位的值。
private static int workerCountOf(int c)  { return c & CAPACITY; }
线程池状态和切换方式:

ThreadPoolExecutor的execute方法
// 提交任务到线程池的核心方法
// command就是提交过来的任务
public void execute(Runnable command) {
    // 提交的任务不能为null
    if (command == null)
        throw new NullPointerException();
    // 获取核心属性ctl,用于后面的判断
    int c = ctl.get();
    // 如果工作线程个数,小于核心线程数。
    // 满足要求,添加核心工作线程
    if (workerCountOf(c) < corePoolSize) {
        // addWorker(任务,是核心线程吗)
        // addWorker返回true:代表添加工作线程成功
        // addWorker返回false:代表添加工作线程失败
        // addWorker中会基于线程池状态,以及工作线程个数做判断,查看能否添加工作线程
        if (addWorker(command, true))
            // 工作线程构建出来了,任务也交给command去处理了。
            return;
        // 说明线程池状态或者是工作线程个数发生了变化,导致添加失败,重新获取一次ctl
        c = ctl.get();
    }
    // 添加核心工作线程失败,往这走
    // 判断线程池状态是否是RUNNING,如果是,正常基于阻塞队列的offer方法,将任务添加到阻塞队列
    if (isRunning(c) && workQueue.offer(command)) {
        // 如果任务添加到阻塞队列成功,走if内部
        // 如果任务在扔到阻塞队列之前,线程池状态突然改变了。
        // 重新获取ctl
        int recheck = ctl.get();
        // 如果线程池的状态不是RUNNING,将任务从阻塞队列移除,
        if (!isRunning(recheck) && remove(command))
            // 并且直接拒绝策略
            reject(command);
        // 在这,说明阻塞队列有我刚刚放进去的任务
        // 查看一下工作线程数是不是0个
        // 如果工作线程为0个,需要添加一个非核心工作线程去处理阻塞队列中的任务
        // 发生这种情况有两种:
        // 1. 构建线程池时,核心线程数是0个。
        // 2. 即便有核心线程,可以设置核心线程也允许超时,设置allowCoreThreadTimeOut为true,代表核心线程也可以超时
        else if (workerCountOf(recheck) == 0)
            // 为了避免阻塞队列中的任务饥饿,添加一个非核心工作线程去处理
            addWorker(null, false);
    }
    // 任务添加到阻塞队列失败
    // 构建一个非核心工作线程
    // 如果添加非核心工作线程成功,直接完事,告辞
    else if (!addWorker(command, false))
        // 添加失败,执行决绝策略
        reject(command);
}
execute方法的流程图:

ThreadPoolExecutor的addWorker方法:
  • 一、校验线程池的状态以及工作线程个数
  • 二、添加工作线程并且启动工作线程

校验线程池的状态以及工作线程个数

// 添加工作线程之校验源码
private boolean addWorker(Runnable firstTask, boolean core) {
    // 外层for循环在校验线程池的状态
    // 内层for循环是在校验工作线程的个数

    // retry是给外层for循环添加一个标记,是为了方便在内层for循坏跳出外层for循环
    retry:
    for (;;) {
        // 获取ctl
        int c = ctl.get();
        // 拿到ctl的高3位的值
        int rs = runStateOf(c);
//==========================线程池状态判断==================================================
        // 如果线程池状态是SHUTDOWN,并且此时阻塞队列有任务,工作线程个数为0,添加一个工作线程去处理阻塞队列的任务

        // 判断线程池的状态是否大于等于SHUTDOWN,如果满足,说明线程池不是RUNNING
        if (rs >= SHUTDOWN &&
            // 如果这三个条件都满足,就代表是要添加非核心工作线程去处理阻塞队列任务
            // 如果三个条件有一个没满足,返回false,配合!,就代表不需要添加
            !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
            // 不需要添加工作线程
            return false;

        for (;;) {
//==========================工作线程个数判断================================================== 
            // 基于ctl拿到低29位的值,代表当前工作线程个数   
            int wc = workerCountOf(c);
            // 如果工作线程个数大于最大值了,不可以添加了,返回false
            if (wc >= CAPACITY ||
                // 基于core来判断添加的是否是核心工作线程
                // 如果是核心:基于corePoolSize去判断
                // 如果是非核心:基于maximumPoolSize去判断
                wc >= (core ? corePoolSize : maximumPoolSize))
                // 代表不能添加,工作线程个数不满足要求
                return false;
            // 针对ctl进行 + 1,采用CAS的方式
            if (compareAndIncrementWorkerCount(c))
                // CAS成功后,直接退出外层循环,代表可以执行添加工作线程操作了。
                break retry;
            // 重新获取一次ctl的值
            c = ctl.get(); 
            // 判断重新获取到的ctl中,表示的线程池状态跟之前的是否有区别
            // 如果状态不一样,说明有变化,重新的去判断线程池状态
            if (runStateOf(c) != rs)
                // 跳出一次外层for循环
                continue retry;
        }
    }
    // 省略添加工作线程以及启动的过程
}

添加工作线程并且启动工作线程

private boolean addWorker(Runnable firstTask, boolean core) {
    // 省略校验部分的代码

    // 添加工作线程以及启动工作线程~~~
    // 声明了三个变量
    // 工作线程启动了没,默认false
    boolean workerStarted = false;
    // 工作线程添加了没,默认false
    boolean workerAdded = false;
    // 工作线程,默认为null
    Worker w = null;

    try {
        // 构建工作线程,并且将任务传递进去
        w = new Worker(firstTask);
        // 获取了Worker中的Thread对象
        final Thread t = w.thread;
        // 判断Thread是否不为null,在new Worker时,内部会通过给予的ThreadFactory去构建Thread交给Worker
        // 一般如果为null,代表ThreadFactory有问题。
        if (t != null) {
            // 加锁,保证使用workers成员变量以及对largestPoolSize赋值时,保证线程安全
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 再次获取线程池状态。
                int rs = runStateOf(ctl.get());
                // 再次判断
                // 如果满足  rs < SHUTDOWN  说明线程池是RUNNING,状态正常,执行if代码块
                // 如果线程池状态为SHUTDOWN,并且firstTask为null,添加非核心工作处理阻塞队列任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 到这,可以添加工作线程。
                    // 校验ThreadFactory构建线程后,不能自己启动线程,如果启动了,抛出异常
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    // private final HashSet<Worker> workers = new HashSet<Worker>();
                    // 将new好的Worker添加到HashSet中。
                    workers.add(w);
                    // 获取了HashSet的size,拿到工作线程个数
                    int s = workers.size();
                    // largestPoolSize在记录最大线程个数的记录
                    // 如果当前工作线程个数,大于最大线程个数的记录,就赋值
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 添加工作线程成功
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 如果工作线程添加成功,
            if (workerAdded) {
                // 直接启动Worker中的线程
                t.start();
                // 启动工作线程成功
                workerStarted = true;
            }
        }
    } finally {
        // 做补偿的操作,如果工作线程启动失败,将这个添加失败的工作线程处理掉
        if (!workerStarted)
            addWorkerFailed(w);
    }
    // 返回工作线程是否启动成功
    return workerStarted;
}
// 工作线程启动失败,需要不的步长操作
private void addWorkerFailed(Worker w) {
    // 因为操作了workers,需要加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 如果w不为null,之前Worker已经new出来了。
        if (w != null)
            // 从HashSet中移除
            workers.remove(w);
        // 同时对ctl进行 - 1,代表去掉了一个工作线程个数
        decrementWorkerCount();
        // 因为工作线程启动失败,判断一下状态的问题,是不是可以走TIDYING状态最终到TERMINATED状态了。
        tryTerminate();
    } finally {
        // 释放锁
        mainLock.unlock();
    }
}


网站公告

今日签到

点亮在社区的每一天
去签到