一、线程的调度机制
线程调度是指系统为线程分配CPU使用权的过程
主要两种:
- 协同式线程调度(Cooperative Threads-Scheduling)
- 抢占式线程调度(Preemptive Threads-Scheduling)
1.协同式线程调度的多线程系统
线程执行的时间由线程本身来控制,线程把自己的工作执行完之后,要主动通知系统切换到另外一个线程上。
使用协同式线程调度的最大好处是实现简单,由于线程要把自己的事情做完后才会通知系统进行线程切换,所以没有线程同步的问题,但是坏处也很明显,如果一个线程出了问题,则程序就会一直阻塞
2.抢占式线程调度的多线程系统
每个线程执行的时间以及是否切换都由系统决定。在这种情况下,线程的执行时间不可控,所以不会有「一个线程导致整个进程阻塞」的问题出现
在Java中,Thread.yield()可以让出CPU执行时间,但是对于获取执行时间,线程本身是没有办法的。对于获取CPU执行时间,线程唯一可以使用的手段是设置线程优先级,Java设置了10个级别的程序优先级,当两个线程同时处于Ready状态时,优先级越高的线程越容易被系统选择执行
二、Java线程模型
为什么Java线程调度是抢占式调度?这需要我们了解Java中线程的实现模式。
我们已经知道线程其实是操作系统层面的实体,Java 底层会调用 pthread_create 来创建线程,所以本质上 java 程序创建的线程,就是和操作系统线程是一样的,是 1 对 1 的线程模型
Java中的线程怎么和操作系统层面对应起来呢?
任何语言实现线程主要有三种方式:
- 使用内核线程实现(1:1实现),
- 使用用户线程实现(1:N实现),
- 使用用户线程加轻量级进程混合实现(N:M实现)。
内核线程实现
使用内核线程实现的方式也被称为1: 1实现。 内核线程(Kernel-Level Thread, KLT) 就是直接由操作系统内核(Kernel, 下称内核) 支持的线程, 这种线程由内核来完成线程切换, 内核通过操纵调度器(Scheduler) 对线程进行调度, 并负责将线程的任务映射到各个处理器上。
每个线程都成为一个独立的调度单元,即使其中某一个在系统调用中被阻塞了,也不会影响整个进程继续工作
Linux系统中线程实现方式
- LinuxThreads linux/glibc包在2.3.2之前只实现了LinuxThreads
- NPTL(Native POSIX Thread Library) 为POSIX(Portable Operating System Interface,可移植操作系统接口)标准线程库
- POSIX 标准定义了一套线程操作相关的函数库pthread,用于让程序员更加方便地操作管理线程
- pthread_create是类Unix操作系统(Unix、Linux、Mac OS X等)的创建线程的函数
用户线程实现
完全建立在用户空间的线程库上,系统内核不能感知到用户线程的存在及如何实现的。用户线程的建立、同步、销毁和调度完全在用户态中完成,不需要内核的帮助。因此操作可以是非常快速且低消耗的, 也能够支持规模更大的线程数量
用户线程的优势在于不需要系统内核支援,劣势也在于没有系统内核的支援, 所有的线程操作都需要由用户程序自己去处理
混合实现
- 线程除了依赖内核线程实现和完全由用户程序自己实现之外, 还有一种将内核线程与用户线程一起使用的实现方式, 被称为N:M实现。 在这种混合实现下, 既存在用户线程, 也存在内核线程。
- 用户线程还是完全建立在用户空间中, 因此用户线程的创建、 切换、 析构等操作依然廉价, 并且可以支持大规模的用户线程并发。
- 同样又可以使用内核提供的线程调度功能及处理器映射,并且用户线程的系统调用要通过内核线程来完成。在这种混合模式中, 用户线程与轻量级进程的数量比是不定的,是N:M的关系。
Java线程的实现
从JDK 1.3起, 主流商用Java虚拟机的线程模型普遍都被替换为基于操作系统原生线程模型来实现,即采用1: 1的线程模型。
HotSpot为例,它的每一个Java线程都是直接映射到一个操作系统原生线程来实现的,而且中间没有额外的间接结构, 所以HotSpot自己是不会去干涉线程调度的,全权交给底下的操作系统去处理
虚拟线程
在Java 21中,引入了虚拟线程
用户级线程的实现。虚拟线程是 Java 中的一种轻量级线程,它旨在解决传统线程模型中的一些限制,提供了更高效的并发处理能力,允许创建数千甚至数万个虚拟线程,而无需占用大量操作系统资源
- 虚拟线程适用于执行阻塞式任务,在阻塞期间,可以将CPU资源让渡给其他任务
- 虚拟线程不适合CPU密集计算或非阻塞任务,虚拟线程并不会运行的更快,而是增加了规模
- 虚拟线程是轻量级资源,用完即抛,不需要池化
- 通常我们不需要直接使用虚拟线程,像Tomcat、Jetty、 Netty、 Spring boot等都已支持虛拟线程
ForkJoin作为平台线程来挂载虚拟线程
public class VTDemo {
public static void main(String[] args) throws InterruptedException {
//平台线程
Thread.ofPlatform().start(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread());
}
});
//虚拟线程
Thread vt = Thread.ofVirtual().start(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread());
}
});
//等待虚拟线程打印完毕再退出主程序
vt.join();
}
}
输出:
Thread[#22,Thread-0,5,main]
VirtualThread[#23]/runnable@ForkJoinPool-1-worker-1
三、线程间的通信
1、管道输入输出流
Java 应用生成文件,然后需要将文件上传到云端
我们一般的做法是,文件-->本地磁盘-->云盘,但是通过Java中的管道输入输出流一步到位,则可以避免写入磁盘这一步。
Java中的管道输入/输出流主要包括了如下4种具体实现:
1、面向字节:PipedOutputStream、PipedInputStream
2、面向字符:PipedReader、PipedWriter
public class Piped {
public static void main(String[] args) throws Exception {
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader();
// 将输出流和输入流进行连接,否则在使用时会抛出IOException
out.connect(in);
Thread printThread = new Thread(new Print(in), "PrintThread");
printThread.start();
int receive = 0;
try {
while ((receive = System.in.read()) != -1) {
out.write(receive);
}
} finally {
out.close();
}
}
static class Print implements Runnable {
private PipedReader in;
public Print(PipedReader in) {
this.in = in;
}
@Override
public void run() {
int receive = 0;
try {
while ((receive = in.read()) != -1) {
System.out.print((char) receive);
}
} catch (IOException ex) {
}
}
}
}
2、volatile(最轻量的通信/同步机制)
volatile 关键字用于保证变量的可见性,即当一个变量被声明为 volatile 时,它会保证对该变量的写操作会立即刷新到主内存中,而读操作会从主内存中读取最新的值
public class VolatileDemo {
private static volatile boolean stop = false;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("线程t1开始执行");
int i=0;
while (!stop){
i++;
}
System.out.println("跳出循环");
}
},"t1");
t1.start();
Thread.sleep(1000);
stop = true;
System.out.println("主线程修改stop=true");
}
}
(1)不加volatile时,子线程无法感知主线程修改了stop的值,从而不会退出循环
输出:
线程t1开始执行
主线程修改stop=true
(2)加volatile时,子线程可以感知主线程修改了ready的值,迅速退出循环。
输出:
线程t1开始执行
主线程修改stop=true
跳出循环
Java线程之间的通信由Java内存模型(Java Memory Model,简称JMM)控制,JMM决定一个线程对共享变量的写入何时对另一个线程可见。根据JMM的规定,线程对共享变量的所有操作都必须在自己的本地内存中进行,不能直接从主内存中读取。JMM通过控制主内存与每个线程的本地内存之间的交互,来为Java程序提供内存可见性的保证。
线程B-->共享变量的副本-->主内存共享变量-->共享变量的副本-->线程A
3、Thread.join
等待通知机制
join可以理解成是线程合并,当在一个线程调用另一个线程的join方法时,当前线程阻塞等待被调用join方法的线程执行完毕才能继续执行,所以join的好处能够保证线程的执行顺序,但是如果调用线程的join方法其实已经失去了并行的意义,虽然存在多个线程,但是本质上还是串行的,最后join的实现其实是基于等待通知机制的。
4、等待/通知机制
线程之间相互配合完成某项工作,比如:一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个过程开始于一个线程,而最终执行又是另一个线程。
前者是生产者,后者就是消费者,简单的办法是让消费者线程不断地循环检查变量是否符合预期在while循环中设置不满足的条件,如果条件满足则退出while循环,从而完成消费者的工作。却存在如下问题:
- 难以确保及时性。
- 难以降低开销。如果降低睡眠的时间,比如休眠1毫秒,这样消费者能更加迅速地发现条件变化,但是却可能消耗更多的处理器资源,造成了无端的浪费。
1、wait/notify/notifyAll
基于Object的wait/notify/notifyAll机制。
等待通知机制可以基于对象的wait和notify方法来实现,在一个线程内调用该线程锁对象的wait方法,线程将进入等待队列进行等待直到被唤醒。
- wait(): 调用的线程进入 WAITING状态,等待另外线程的通知或被中断才返回。需要注意,调用wait()方法后,会释放对象的锁。(注:一定要使用synchronized否则报错)
- wait(long): 超时等待一段时间,这里的参数时间是毫秒,也就是等待长达n毫秒,如果没有通知就超时返回
- wait (long,int): 对于超时时间更细粒度的控制,可以达到纳秒
- notify():通知一个在对象上等待的线程,使其从wait方法返回,而返回的前提是该线程获取到了对象的锁,没有获得锁的线程重新进入WAITING状态。
- notifyAll():通知所有等待在该对象上的线程。尽可能用notifyAll(),谨慎使用notify(),因为notify()只会唤醒一个线程,我们无法确保被唤醒的这个线程一定就是我们需要唤醒的线程。
等待需遵循如下原则:
(1)获取对象的锁。
(2)调用对象的wait()方法,如果条件不满足,被通知后仍要检查条件。
(3)条件满足则执行对应的逻辑。
synchronized(对象) {
while (条件不满足) {
对象.wait();
}
对应的逻辑;
}
通知需遵循如下原则。
1)获得对象的锁。
2)改变条件。
3)通知所有等待在对象上的线程。
synchronized(对象) {
改变条件
对象.notifyAll();
}
2、park/unpark
LockSupport是JDK中用来实现线程阻塞和唤醒的工具,线程调用park则等待“许可”,调用unpark则为指定线程提供“许可”。类似于发放许可证
LockSupport很类似于二元信号量(只有1个许可证可供使用),如果这个许可还没有被占用,当前线程获取许可并继续执行;
如果许可已经被占用,当前线程阻塞,等待获取许可。
使用它可以在任何场合使线程阻塞,可以指定任何线程进行唤醒,并且不用担心阻塞和唤醒操作的顺序,但要注意连续多次唤醒的效果和一次唤醒是一样的
Java锁和同步器框架的核心AQS:AbstractQueuedSynchronizer,就是通过调用LockSupport.park()和 LockSupport.unpark()实现线程的阻塞和唤醒的
public class LockSupportDemo {
public static void main(String[] args) throws InterruptedException {
Thread parkThread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("ParkThread开始执行");
// 当没有『许可』时,当前线程暂停运行;有『许可』时,用掉这个『许可』,当前线程恢复运行
LockSupport.park();
System.out.println("ParkThread执行完成");
}
});
parkThread.start();
Thread.sleep(1000);
System.out.println("唤醒parkThread");
// 给线程 parkThread 发放『许可』(多次连续调用 unpark 只会发放一个『许可』)
LockSupport.unpark(parkThread);
}
}