JavaEE初阶Day 14:多线程(12)

发布于:2024-04-20 ⋅ 阅读:(28) ⋅ 点赞:(0)

Day 14 :多线程(12)

回顾

锁策略

乐观锁VS悲观锁;轻量级锁VS重量级锁;自旋锁VS挂起等待锁;公平锁VS非公平锁;可重入锁VS不可重入锁;普通斥锁VS读写锁

synchronize基本特性与实现原理

  • 锁升级:偏向锁 --> 轻量级锁 --> 重量级锁
  • 锁消除:编译器优化
  • 锁粗化:编译器优化

CAS的ABA问题

使用CAS编写代码:比较然后再交换

在比较过程中:检查当前内存的值,是否被其他线程修改了,如果被修改了,就要稍后再重试,如果没被修改,接下来就可以直接修改,不会有线程安全问题,没有其他线程穿插执行。但是值没变 != 值没变过,有可能另一个线程把这个值从A变为B,再从B变为A了

ABA在大部分情况下没什么问题,但是在极端情况下,就可能产生bug

如何避免ABA问题,核心思路是引入版本号,约定版本号只能加不能减,每一次操作版本号都要+1,通过CAS判定版本号,如果版本号没有发生改变,数据就一定没有变过

Callable接口

Callable也是用来描述任务的,并且call方法带有返回值,表示这个线程执行结束会得到什么结果

package thread;

public class Demo39 {
    private static int sum = 0;

    public static void main(String[] args) throws InterruptedException {
        //创建一个线程,让这个线程来实现 1 + 2 + 3 +......+ 1000
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                int result = 0;
                for (int i = 0; i <= 1000; i++) {
                    result += i;
                }
                //此处为了把result告知主线程,就需要通过静态成员变量倒腾一下
                sum = result;
            }
        });
        t.start();
        t.join();

        //主线程获取得到结果
        System.out.println(sum);
    }
}

上述代码主线程与t线程耦合太大了,线程内部定义的局部变量是不能被其他线程获取得到的,线程更多,就会更麻烦

Callable就是为了更优雅的解决上述问题

package thread;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class Demo40 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Callable<Integer> callable = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int result = 0;
                for (int i = 0; i <= 1000; i++) {
                    result += i;
                }
                return result;
            }
        };

        //创建线程,把callable搭载到线程内部执行
        FutureTask<Integer> futureTask = new FutureTask<>(callable);
        Thread t = new Thread(futureTask);
        t.start();
        t.join();
        System.out.println(futureTask.get());
    }
}

创建线程的方式

  • 继承Thread
  • 使用Runnable
  • 使用lambda
  • 使用线程池/ThreadFactory
  • 使用Callable

ReentrantLock

ReentrantLock:可重入

package thread;

import java.util.concurrent.locks.ReentrantLock;

public class Demo41 {
    public static void main(String[] args) {
        ReentrantLock locker = new ReentrantLock();
        try {
            //加锁
            locker.lock();
        } finally {
            //解锁
            locker.unlock();
        }
    }
}
  • ReentrantLock提供了公平锁的实现,synchronized只是非公平锁,ReentrantLock locker = new ReentrantLock(true);表示公平锁,false/不填写表示非公平锁

  • ReentrantLock提供tryLock操作,给加锁提供了更多的可操作空间,尝试加锁,如果锁已经被获取到了,直接返回失败,而不会像synchronized遇到锁竞争会阻塞等待,tryLock也可以去指定等待超时时间

  • ReentrantLock搭配Condition类完成等待通知,synchronized搭配wait与notify等待通知机制,Condition可以指定线程唤醒,多个线程wait,notify是唤醒随机一个

信号量Semaphore

信号量就是一个计数器,描述了可用资源的个数,围绕信号量有两个基本操作

  • P操作:计数器+1,申请资源(acquire)
  • V操作:计数器-1,释放资源(release)
package thread;

import java.util.concurrent.Semaphore;

public class Demo42 {
    public static void main(String[] args) throws InterruptedException {
        //4个可用资源
        Semaphore semaphore = new Semaphore(4);
        semaphore.acquire();
        System.out.println("P操作");
        semaphore.acquire();
        System.out.println("P操作");
        semaphore.acquire();
        System.out.println("P操作");
        semaphore.acquire();
        System.out.println("P操作");
        semaphore.acquire();
        System.out.println("P操作");
    }
}

上述代码进行了5次P操作,但是信号量只有4个可用资源,所以在第5次P操作的时候,会出现阻塞等待

锁其实就是特殊的信号量,如果信号量只有0、1两个取值,此时就称为”二元信号量“,本质就是一把锁

package thread;

import java.util.concurrent.Semaphore;

public class Demo43 {

    private static int count = 0;
    public static void main(String[] args) throws InterruptedException {
        Semaphore semaphore = new Semaphore(1);
        Thread t1 = new Thread(()->{
            try {
                for (int i = 0; i < 50000; i++) {
                    semaphore.acquire();
                    count++;
                    semaphore.release();
                }
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        });

        Thread t2 = new Thread(()->{
            try {
                for (int i = 0; i < 50000; i++) {
                    semaphore.acquire();
                    count++;
                    semaphore.release();
                }
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        });

        t1.start();
        t2.start();
        t1.join();
        t2.join();

        System.out.println("count = " + count);
    }
}

CountDownLatch

当我们把一个任务拆分成很多个的时候,可以通过这个工具类来识别任务是否整体执行完毕

package thread;

import java.util.concurrent.CountDownLatch;

public class Demo44 {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            int id = i;
            Thread t = new Thread(()->{
                System.out.println("线程启动" + id);
                try {
                    //假设这里是进行一些“下载”这样的耗时操作
                    Thread.sleep(3000);
                }catch (InterruptedException e){
                    throw new RuntimeException(e);
                }

                System.out.println("线程结束" + id);
                latch.countDown();
            });

            t.start();
        }
        //通过await等待所有线程调用countDown
        latch.await();
        System.out.println("所有线程结束");
    }
}

await会阻塞等待,一直到countDown调用的次数和构造方法指定的次数一致的时候,await才会返回,await不仅仅能代替join,比如有1000个任务,交给4个线程的线程池来执行,如何判定1000个任务执行完了,也可以使用CountDownLatch来判定(这个过程没有线程真正结束)

集合类的多线程安全问题

ArrayList、LinkedList、Stack、Queue、HashMap…大部分都是线程不安全的

  • Vector自带了synchronized,Stack继承了Vector,也自带了synchronized
  • Hashtable也自带了synchronized

加锁不能保证线程一定安全,不加锁也不能确定线程一定不安全

手动加锁比较麻烦,标准库提供了一些其他的解决方案

1. Collections.synchronizedList(new ArrayList)

给ArrayList这些集合类,套一层壳,壳上是给关键方法都加了synchronized,就可以使ArrayList达到类似于vector的效果

2. CopyOnWriteArrayList

写时拷贝:在读的时候读取旧的数组,在写的时候,使用新的数组来写,当写完之后,用新的数组的引用,代替旧的数组的引用(引用赋值操作,是原子的),旧的空间就可以释放了

上述过程,没有任何加锁和阻塞等待,也就能确保读线程不会读出错误的数据

上述操作其实实用性非常高,有些服务器程序需要更新配置文件/数据文件,就可以采取上述策略

  • 显卡渲染画面到显示器就是按照写时拷贝的方式,在显示上一个画面的时候,在背后用额外的空间生成下一个画面,生成完毕了,使用下一个画面代替上一个画面

3. BlockingQueue

多线程使用队列,直接使用BlockingQueue即可

4. ConcurrentHashMap

多线程使用哈希表,HashMap是线程不安全的,Hashtable是带锁的,但是标准库提供了更好的代替,即ConcurrentHashMap

Hashtable加锁是简单粗暴给每个方法加了synchronized,相当于是针对this加锁,只要针对Hashtable上的元素进行操作,就会涉及到锁冲突

ConcurrentHashMap做出了优化

  • 使用**“锁桶”的方式,来代替“一把全局锁”,有效降低锁冲突的概率,即对每个哈希桶进行加锁**

    • 如果两个线程针对两个不同的链表进行操作,是不会涉及到锁冲突的,本身操作两个不同链表上的元素,也没修改“公共变量”,本身就不涉及到线程安全问题,上述提升的收益是非常大的,一个hash表,上面的hash桶的个数是非常多的,大部分的操作都没有锁冲突了(synchronized如果不产生锁冲突,就是偏向锁)
    • 另一方面,看起来锁对象多了,实际上也不会产生更多的额外开销,Java中每个对象都可以作为锁对象,就只需要把synchronized加到链表头节点上,就可以达成上述效果
  • 哈希表中的size,即使插入的元素是不同的链表上的元素,也会涉及到多线程修改同一变量,ConcurrentHashMap引入CAS,通过CAS的方式来修改size,也就避免了加锁操作

  • ConcurrentHashMap针对扩容操作做了特殊优化——化整为零,普通的HashMap要在一次put的过程中完成整个扩容过程,就会使put操作非常卡,ConcurrentHashMap会在扩容的时候,搞两份空间

    • 一份是扩容之前的空间
    • 一份是扩容之后的空间

    每次进行hash表的基本操作,都会把一部分数据从就空间搬到新空间,不是一口气搬完,分多次搬

    搬的过程中

    1. 插入:插入到新的上面

    2. 删除:新的旧的都要删除

    3. 查找:新的旧的都要查找

Java 8之前,ConcurrentHashMap基于分段锁的方式实现,引入若干个锁对象,每个锁对象管理若干个哈希桶,Java 8之后就把这种实现方式废弃了