多线程(基础三)
单例模式
🔑单例模式是设计模式之一,那设计模式又是啥呢?设计模式就类似于棋谱,是前人写代码时总结出的一些固定的套路,单例模式指是某个类在程序中只存在唯一的实例,不会创建出多个实例。单例模式有两种实现方式:饿汉模式,懒汉模式
饿汉模式
饿汉模式是程序一启动就创建了实例,实例创建的早
class Singleton{
private static Singleton singleton = new Singleton();
public static Singleton getInstance(){
return singleton;
}
private Singleton(){
}
}
✅类属性引用了一个实例对象,(类属性在了类对象里,而类对象在整个程序中只有一份,所以类属性就只有一份)。当 程序一启动,这个Singleton类就会被加载,就会随之创建一个类对象,那类属性又在类对象里,所以也就是,当程序一启动,类属性就被初始化,singleton就引用了Singleton实例,正因为实例创建的比较早,所以叫饿汉模式
✅把构造方法设为私有的,保证类外无法创建实例,而类属性只有一份,那Singleton实例只有一份,这样就实现了单例模式(只能创建一个实例)
懒汉模式(单线程版)
懒汉模式是第一次用的啥时候再创建实例,一直不用就不创建,这就是懒汉模式
class SingletonLazy{
private static SingletonLazy singletonLazy = null;
public static SingletonLazy getInstace(){
if(singletonLazy==null){
singletonLazy = new SingletonLazy();
}
return singletonLazy;
}
private SingletonLazy(){
}
}
✅第一次用的时候创建实例,以后再用直接返回。类加载的时候不创建实例,调用方法的时候再创建,这就是懒汉模式
懒汉模式(多线程版)
思考:上面两种单例模式如果在多线程环境下会出现线程安全问题吗?
✅对于饿汉模式每次调用方法都是读操作,所以不会出现线程安全问题,但是对于懒汉模式,在多线程环境中,下面代码中的if语句和new语句,两个线程可能会同时执行,当其中一个线程判断if条件成立,new对象之前,另一个线程也判断if成立,然后也new对象,这样两个线程就new了两个对象,造成了线程安全问题。
public static SingletonLazy getInstace(){
if(singletonLazy==null){
singletonLazy = new SingletonLazy();
}
return singletonLazy;
}
改进版1:
✅造成线程安全问题的原因主要是if语句和new对象语句不是原子操作,一个线程在执行时,另一个线程也可以执行。 所以解决方式就是加锁,让if语句(读操作)和new对象语句(写操作)是一个原子操作。这样两个线程执行同步代码块就有了先后顺序。这样就不会产生new出两个对象这样的bug了
class SingletonLazy{
private static SingletonLazy singletonLazy = null;
public static SingletonLazy getInstace(){
synchronized (Singleton.class){
if(singletonLazy==null){
singletonLazy = new SingletonLazy();
}
}
return singletonLazy;
}
private SingletonLazy(){
}
}
改进版2:
✅虽然加锁解决了这样的一个问题,但是又产生了一个新问题,那就是只有当第一次初始化的时候可能会产生线程安全问题,等实例创建好之后再调用getInstance()就不会产生线程安全问题了,但是按上面代码,每次调用方法,都要执行同步代码块,也就是每次都要加锁释放锁(加锁释放锁就会产生较大的时间开销),这样就很不合理,创建好实例之后再调用方法就不会产生线程安全问题了,也就没必要在加锁释放锁了,直接返回实例就行了,看下面的改进版
class SingletonLazy{
private static SingletonLazy singletonLazy = null;
public static SingletonLazy getInstace(){
if(singletonLazy==null){
synchronized (Singleton.class){
if(singletonLazy == null){
singletonLazy = new SingletonLazy();
}
}
}
return singletonLazy;
}
private SingletonLazy(){
}
}
✅外层if判定是否已经初始化好了,如果初始化好了,就直接返回,如果没有初始化,尝试加锁然后尝试初始化,内层if是判定当前线程拿到锁之后,再判定一下是否真的要进行初始化
✅如果多个线程并发执行,只有其中一个线程能先拿到锁,其余线程顶多执行到synchronized (Singleton.class)就阻塞了,当第一个拿到锁的线程创建完对象释放锁后,其余线程如果还没进入外层if那就直接返回,如果其余线程进入了外层if,就能尝试加锁,然后执行代码,但是由于第一个线程创建了对象,其余线程就不能创建对象了,所以再用内层的if判定一下是否真的需要创建线程。
public static SingletonLazy getInstace(){
if(singletonLazy==null){
synchronized (Singleton.class){
if(singletonLazy == null){
singletonLazy = new SingletonLazy();
}
}
}
return singletonLazy;
}
改进版3:
上面方法中,有的是读操作,有的是写操作,如果一个线程把数据读到寄存器了,然后另一个线程会不会直接复用寄存器中的值呢,也就是会不会因为编译器优化导致内存可见性问题呢?这个是不确定的,按理来说每个线程有自己的寄存器,不会出现这种复用的情况,但是编译器优化不好说从哪个角度优化,不好说会不会出现复用这种情况,所以加上volatile关键字,禁止编译器优化,是比较稳健的
class SingletonLazy{
volatile private static SingletonLazy singletonLazy = null;
public static SingletonLazy getInstace(){
if(singletonLazy==null){
synchronized (Singleton.class){
if(singletonLazy == null){
singletonLazy = new SingletonLazy();
}
}
}
return singletonLazy;
}
private SingletonLazy(){
}
}
阻塞队列
阻塞队列应用场景
阻塞队列是一种线程安全的数据结构,也遵循队列先进先出的原则,与普通队列不同的是:当队列满时,尝试入队列就会阻塞(修改线程的状态,让线程对应的PCB暂时不参与调度),直到有其他线程从队列中取走元素,当队列空时,尝试出队列就会阻塞,直到有其他线程往队列中加入元素
阻塞队列的一个典型应用场景是生产者消费者模型,这是一种典型的开发模型
比如说上面这个例子,服务器A给服务器B发生数据,A直接给B发送数据,开发A的代码时得考虑B是如何接收的,开发B的代码时得考虑A是如何发送的,这就是耦合性比较强。极端情况下,A这边出了问题,可能就会把B搞挂,B这边出了问题,A的发送代码可能也会出现异常,搞不好就把A搞挂了
上面这种开发方式耦合性太高,下面引入生产者消费者模型,就能降低耦合,生产者消费者模型就是通过一个容器来解决生产者和消费者之间的强耦合问题,阻塞队列就能充当这种容器
服务器A发送数据给阻塞队列,服务器B把数据从阻塞队列取走,A和B不直接交互这样就能降低耦合,
💛💛💛💛💛💛💛💛💛💛💛💛💛💛💛💛💛💛💛💛💛💛💛💛💛💛💛💛💛💛💛
==上面是生产者消费者模型的一大优势:解决高耦合性的问题,下面来看生产者消费者模型的另一大优势:==比如在 “秒杀” 场景下, 服务器同一时刻可能会收到大量的支付请求. 如果直接处理这些支付请求,
服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程). 这个时候就可以把这些请求都放
到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求。这样做可以有效进行 “削峰”, 防止服务器被突然到来的一波请求直接冲垮 (如果请求的线程突然很多,就让请求的线程阻塞等待)
总结:生产者消费者模型的两大优势:降低生产者和消费者之间的强耦合,能够削峰填谷,提高整个系统的抗风险程度
阻塞队列使用
package thread;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
public class Demo13 {
public static void main(String[] args) {
BlockingDeque<Integer> queue = new LinkedBlockingDeque<>();
Thread coustmer = new Thread(()->{
while(true){
try {
int value = queue.take();
System.out.println("消耗元素"+value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
coustmer.start();
Thread producer = new Thread(() ->{
int n = 0;
while (true){
System.out.println("生产元素"+n);
try {
queue.put(n);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
n++;
}
});
producer.start();
}
}
✅生产者把数据放到阻塞队列中,然后消费者从阻塞队列中取走数据
BlockingDeque是个接口,LinkedBlockingDeque类实现了这个接口,底层是链表,这个阻塞队列最大容量是MAX_VALUE (0x7fffffff)(int类型最大值)
阻塞队列实现
我们是实现的这个阻塞队列,是基于数组实现的,下面看代码
package thread;
public class MyBlockedQueue {
private int[] elem;
public MyBlockedQueue(int capacity){
elem = new int[capacity];
}
int head = 0;
int rear = 0;
int size = 0;
//出队列
public Integer take() throws InterruptedException {
int ret = 0;
synchronized (this){
if(size == 0){
this.wait();
}
ret = elem[head];
head = (head+1)% elem.length;
size--;
this.notify();//唤醒因队列满而wait()的线程,也可能会唤醒因队列空而wait()的线程
}
return ret;
}
//入队列
public void put(int value) throws InterruptedException {
synchronized (this){
if(size== elem.length){
this.wait();
}
elem[rear] = value;
rear = (rear+1)% elem.length;
size++;
this.notify();//主要是为了唤醒因队列空而wait()的线程,也可能会唤醒因队列满而wait()的线程
}
}
}
✅阻塞队列相比于普通的队列,不仅要实现线程安全(不能出现满了之后又入队列这种情况),还得实现阻塞的效果(满了再入要阻塞,空了再出要阻塞),如果满了再入就阻塞,让这个线程不再参与锁的竞争,等待被唤醒之后再参与锁竞争
🔑其实真实的环境中,生产者消费者都是有多个的,所以生产者对应的线程就有多个,消费者对应的线程也就有多个
✅线程安全的实现;入队列和出队列都要加锁,避免因为多个线程并发执行导致队列满了还能入队列(比如队列差一个元素就满了,多个线程并发执行了if判断语句,然后多个线程都判定为未满,然后每个线程都入了一个元素,),或者队列空了还能又出队列这种线程安全问题。
✅阻塞的实现:wait()和notify() ,如果对于一个阻塞队列如果生产者效率比消费者效率高,那队列早晚得满,满了之后再入元素就要使生产者队列阻塞,然后等消费者取完元素,调用notify(),唤醒生产者中阻塞的线程。同理,阻塞队列里空了再出也是一样
✅上面这种写法有可能还会出现线程安全问题,思考下面的过程:对于一个阻塞队列如果生产者效率比消费者效率高,那队列早晚得满,然后当满了的时候,可能又有个生产者线程尝试入队列,那这个线程就会释放锁,然后阻塞等待,释放锁之后,这个对象锁可能是生产者中的一个线程拿到,也可能是消费者中的一个线程拿到,因为这些线程都要竞争锁,不好说哪个线程拿到,假如生产者线程拿到锁,那又会释放锁,然后阻塞等待。假如释放锁之后,这个锁又让生产者线程拿到了,那该线程又释放锁,然后阻塞等待。也就是说可能有多个生产者线程因为队列满而阻塞等待。那总有一次这个锁会让消费者线程拿到了,然后出队列一个元素,然后notify() ,这时候因为调用了wait()而阻塞等待的生产者线程可能有多个,notify()会挑选其中一个唤醒,然后消费者线程执行完,释放锁,这个锁可能被生产者线程拿到,也可能让消费者线程拿到,如果让消费者线程拿到,那就再出一个元素,然后notify()一次,唤醒一个生产者线程;如果让生产者线程拿到,那就入队列一个元素,然后notify()一次,这时唤醒的是一个生产者线程,然后该线程执行完释放锁,然后下次的锁假如被生产者拿到,就会从wait()往下执行,就会满了还入元素,这就是bug,主要是因为满了之后出队列一个,而紧接着入队列可能多个,就会出现满了之后还能入队列的情况。解决办法就是把if判定改为while判定,让阻塞的生产者线程拿到锁之后,再判定一下阻塞队列是不是真的不为满
package thread;
public class MyBlockedQueue {
private int[] elem;
public MyBlockedQueue(int capacity){
elem = new int[capacity];
}
int head = 0;
int rear = 0;
int size = 0;
//出队列
public Integer take() throws InterruptedException {
int ret = 0;
synchronized (this){
while (size == 0){
this.wait();
}
ret = elem[head];
head = (head+1)% elem.length;
size--;
this.notify();
}
return ret;
}
//入队列
public void put(int value) throws InterruptedException {
synchronized (this){
while (size== elem.length){//使用while让阻塞的线程拿到锁之后再判定一次现在的队列是不是真的不为满,因为再一次拿到锁之后,已经过了一段时间了,可能拿到锁之后阻塞队列又满了,所以还得再判断一下
this.wait();
}
elem[rear] = value;
rear = (rear+1)% elem.length;
size++;
this.notify();
}
}
}
定时器
线程池
线程池介绍
❤️我们之前学过字符串常量池,字符串常量池,就是为了避免每次创建字符串的时候都需要在堆上开辟空间,所以搞了一个字符串常量池,每次创建字符串时都往常量池里加入一个字符串,后续如果再写一个字符串,如果常量池里有这个字符串,就不需要再创建字符串对象了。
❤️这里的常量池也是为了避免频繁的创建销毁线程带来的时间开销。高并发场景下,创建销毁进程成本比较高,所以为了解决这种问题,引入了线程,同一进程中的线程创建销毁不需要申请释放资源,这样频繁的创建销毁线程就不需要那么高的成本了。但是在更频繁的场景下,频繁创建销毁线程也是开销较大的,操作系统也有点吃不消了,于是就引入了线程池,把线程创建好后放到池子里,每次用线程就从池子里取,用完还给线程,而不是用完直接销毁,这样就能使开销更少。
❤️为啥从池子里取线程就比系统创建线程来的快呢?
- 因为从池子里取线程是纯用户态操作,而系统创建线程是内核态操作,通常认为牵扯到内核态的操作比用户态的操作更低效
CPU的两种状态:内核态和用户态,内核态运行操作系统程序,用户态运行用户程序。
为啥牵扯到内核态的操作就比用户态的操作更低效呢?
- 因为操作系统本身有很多任务,不能说立即执行创建销毁线程这个操作,可能执行完其他任务再来执行创建销毁线程这个操作,所以相对来说牵扯到内核态的操作更低效
使用java标准库中的线程池
package thread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
public class Demo17 {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(10);
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("哈哈");
}
};
threadPool.submit(runnable);
}
}
✅我们一般使用上面这种方式来创建一个线程池,ExecutorService (执行器服务)是一个接口,newFixedThreadPool 是 Executors的一个静态方法,10代表创建固定数量的线程是10个线程。然后调用submit()给线程池提交一个任务,由线程池中的线程执行任务。
✅而这种通过静态方法来创建实例称为工厂方法,这种设计模式称为工厂模式(设计模式的一种),ThreadPoolExecutor是原始的线程池类,通过这个类的构造方法也可以创建线程池对象,但是这个类的构造方法有一些使用上的限制,使用起来比较麻烦,所以就给这个类的构造方法封装了一层,通过Executor这个类的静态方法来调用ThreadPoolExecutor的构造方法,实现线程池对象的创建。
✅我们一般是通过new调用构造方法来创建对象的,但是有时候使用构造方法会有一些限制,不方便使用,所以需要给构造方法再封装一层,外面起到封装作用的方法就是工厂方法。
✅构造方法的限制:构造方法必须得和类名一样,所以要实现不同版本的构造,构造方法得重载,重载要求参数列表必须不同,而有的时候实现不同版本的构造时,参数列表又一样,这就比较难办:
class Point{
double x;
double y;
double r;
double a;
//通过笛卡尔坐标系构建一个点
public Point(double x ,double y){
this.x = x;
this.y = y;
}
//通过极坐标系构建一个点
public Point(double r, double a){
this.r = r;
this.a = a;
}
}
上面两种构建点的方式不同,但是构造方法的参数列表又是相同的,这就会出现编译错误。所以构造方法是有局限性的。为了解决上述问题,就可以使用工厂模式:
class Point{
double x;
double y;
double r;
double a;
public void setX(double x) {
this.x = x;
}
public void setY(double y) {
this.y = y;
}
public void setR(double r) {
this.r = r;
}
public void setA(double a) {
this.a = a;
}
public static Point makePointByXy(double x, double y){
Point p = new Point();
p.setX(x);
p.setY(y);
return p;
}
public static Point makePointByRa(double r, double a){
Point p = new Point();
p.setR(r);
p.setA(a);
return p;
}
}
通过静态的方法把构造方法封装起来,这样就避免了构造方法的局限性。
线程池自主实现
实现一个自定义数量线程的线程池:
class MyThreadPool{
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
public MyThreadPool(int n){
for (int i = 0; i < n; i++) {
Thread thread = new Thread(() -> {
while (true){
try {
//如果阻塞队列为空,则take()操作会使当前线程阻塞
Runnable runnable = queue.take();
runnable.run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
}
}
public void submit(Runnable runnable){
try {
//如果阻塞对列为满,则put()操作会使当前线程阻塞。
queue.put(runnable);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Demo18 {
public static void main(String[] args) {
MyThreadPool myThreadPool = new MyThreadPool(10);
for (int i = 0; i < 100; i++) {
myThreadPool.submit(()->{
System.out.println("哈哈哈");
});
}
}
}
通过submit()方法给线程池提交一个任务,然后用一个阻塞队列(因为是多线程环境下,为了保证线程安全,所以使用阻塞队列)来存储这些任务,然后通过 MyThreadPool类的构造方法一次创建固定数量的线程池,==每个线程的任务是从阻塞队列中取出一个任务,然后执行任务。==线程池其实就类似于生产者消费者模型,生产者就是主线程,主线程把任务加到线程池中,消费者就是线程池,线程池中的线程就会把这些任务解决
每个线程都会死循环地从阻塞队列中取任务执行,如果线程池中的线程取任务时,阻塞队列为空,那当前线程就阻塞等待, 当在阻塞队列中插入任务时,才有可能被唤醒。
线程池存在的目的是降低因为频繁的创建销毁线程带来的开销,线程的创建是内核态操作,而线程池中的线程一旦创建好之后,执行任务就是纯用户态操作。普通线程执行完任务就销毁了,而线程池中的线程一旦创建好之后便不会销毁,可以一直不停的执行任务,所以降低了频繁创建销毁线程带来的时间开销。
❤️上面这种方式是在构造方法创建的固定数量的线程,下面这种方式是写一个类代表了线程池中的工作线程,并且创建工作线程的数量也是根据插入阻塞队列的任务的数量决定的。
package thread;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
class Worker extends Thread{
private BlockingQueue<Runnable> queue = null;
public Worker(BlockingQueue<Runnable> queue){
this.queue = queue;
}
@Override
public void run() {
while (true){
try {
Runnable runnable = queue.take();
runnable.run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class MThreadPool {
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
private int maxNum = 10;
public int num = 0;
public void submit(Runnable runnable){
try {
queue.put(runnable);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(num < maxNum){
Worker worker = new Worker(queue);
worker.start();
num++;
}
}
}
class Test2{
public static void main(String[] args) {
MThreadPool threadPool = new MThreadPool();
for (int i = 0; i < 100; i++) {
threadPool.submit(()->{
System.out.println("哈哈");
});
}
}
}
✅Worker代表工作线程,调用submit()方法时,给阻塞队列中加一个任务,并且如果已经创建的线程数量小于规定的最大创建线程的数量,就创建线程对象,并且给线程对象传过去阻塞队列,这样工作线程就能从阻塞队列中取出任务并执行了。这些工作线程对象和线程池对象中的属性都引用了同一个阻塞队列,所以给线程池对象中的阻塞队列中插入任务,就是给工作线程中的阻塞队列中插入任务。