【JavaEE多线程】深入解析Java并发工具类与应用实践

发布于:2024-04-30 ⋅ 阅读:(24) ⋅ 点赞:(0)


JUC(java.util.concurrent)的常见类

concurrent:并发(多线程)

Callable 接口

Callable 是一个 interface。也是一种创建多线程的方式,相当于把线程封装了一个 “返回值”。方便程序员借助多线程的方式计算结果。

Runnable能表示一个任务(run方法),返回void

Callable也能表示一个任务(call方法),返回一个具体的值,类型可以通过泛型参数来指定(Object)

如果进行多线程操作,如果你关心多线程执行的过程,使用Runnable,比如线程池,定时器,就是用的Runnable只关心过程

如果进行多线程操作,如果你关心多线程的计算结果,使用Callable,比如通过多线程的方式计算一个公式,计算1+2+…+1000

代码示例: 创建线程计算 1 + 2 + 3 + … + 1000

不使用 Callable 版本

  • 创建一个类 Result , 包含一个 sum 表示最终结果, lock 表示线程同步使用的锁对象.
  • main 方法中先创建 Result 实例, 然后创建一个线程 t. 在线程内部计算 1 + 2 + 3 + … + 1000.
  • 主线程同时使用 wait 等待线程 t 计算结束. (注意, 如果执行到 wait 之前, 线程 t 已经计算完了, 就不必等待了).
  • 当线程 t 计算完毕后, 通过 notify 唤醒主线程, 主线程再打印结果.
static class Result {
    public int sum = 0;
    public Object lock = new Object();
}
public static void main(String[] args) throws InterruptedException {
    Result result = new Result();
    Thread t = new Thread() {
        @Override
        public void run() {
            int sum = 0;
            for (int i = 1; i <= 1000; i++) {
                sum += i;
           }
            synchronized (result.lock) {
                result.sum = sum;
                result.lock.notify();
           }
       }
   };
    t.start();
    synchronized (result.lock) {
        while (result.sum == 0) {
            result.lock.wait();
       }
        System.out.println(result.sum);
   }
}

可以看到, 上述代码需要一个辅助类 Result, 还需要使用一系列的加锁和 wait notify 操作, 代码复杂, 容易出错.

使用 Callable 版本

  • 创建一个匿名内部类, 实现 Callable 接口. Callable 带有泛型参数. 泛型参数表示返回值的类型.
  • 重写 Callable 的 call 方法, 完成累加的过程. 直接通过返回值返回计算结果.
  • 把 callable 实例使用 FutureTask 包装一下.
  • 创建线程, 线程的构造方法传入 FutureTask . 此时新线程就会执行 FutureTask 内部的 Callable 的call 方法, 完成计算. 计算结果就放到了 FutureTask 对象中.
  • 在主线程中调用 futureTask.get() 能够阻塞等待新线程计算完毕. 并获取到 FutureTask 中的结果.
Callable<Integer> callable = new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        int sum = 0;
        for (int i = 1; i <= 1000; i++) {
            sum += i;
       }
        return sum;
   }
};
FutureTask<Integer> futureTask = new FutureTask<>(callable);
Thread t = new Thread(futureTask);//使用Callable不能作为Thread的构造方法参数,是借助FutureTask
t.start();
int result = futureTask.get();//通过FutureTask获取Callable的call方法的结果,get类似join一样,如果call方法没算完会阻塞等待
System.out.println(result);

可以看到, 使用 Callable 和 FutureTask 之后, 代码简化了很多, 也不必手动写线程同步代码了.

理解Callable

Callable 和 Runnable 相对, 都是描述一个 “任务”. Callable 描述的是带有返回值的任务, Runnable 描述的是不带返回值的任务.

Callable 通常需要搭配 FutureTask 来使用. FutureTask 用来保存 Callable 的返回结果. 因为Callable 往往是在另一个线程中执行的, 啥时候执行完并不确定.

FutureTask 就可以负责这个等待结果出来的工作.

理解FutureTask

想象去吃麻辣烫. 当餐点好后, 后厨就开始做了. 同时前台会给你一张 “小票” . 这个小票就是FutureTask. 后面我们可以随时凭这张小票去查看自己的这份麻辣烫做出来了没

目前为止学到的创建线程的方式:

  1. 直接继承Thread
  2. 实现Runnable
  3. 使用lambda
  4. 使用线程池
  5. 使用Callable

其中1、2、5搭配匿名内部类使用

ReentrantLock

可重入互斥锁,和 synchronized 定位类似,都是用来实现互斥效果,保证线程安全

ReentrantLock的用法:

  • lock():加锁,如果获取不到锁就死等
  • trylock(超时时间):加锁,如果获取不到锁,等待一定的时间之后就放弃加锁
  • unlock():解锁

ReentrantLock具有一些特点,是synchronized 不具备的功能:

  1. 提供了一个tryLock方法进行加锁:

​ 对于lock操作,如果加锁不成功,就会阻塞等待(死等)

​ 对于tryLock,如果加锁失败,直接返回false/也可以设定等待时间

  1. ReentrantLock有两种模式,可以在工作在公平锁状态下,也可以工作在非公平锁状态下。构造方法中通过参数设定的公平/非公平模式
  2. ReentrantLock也有等待通知机制,搭配Condition这样的类来完成,这里的等待通知要比wait、notify功能更强
  3. 但是ReentrantLock也可能容易遗漏unlock(),通常使用finally来执行

ReentrantLock 和 synchronized 的区别:

  • synchronized 是一个关键字,是 JVM 内部实现的(大概率是基于 C++ 实现)。ReentrantLock 是标准库的一个类,在 JVM 外实现的(基于 Java 实现)。
  • synchronized 使用时不需要手动释放锁。ReentrantLock 使用时需要手动释放。使用起来更灵活,但是也容易遗漏 unlock.
  • synchronized 在申请锁失败时,会死等. ReentrantLock 可以通过 trylock 的方式等待一段时间就放弃.
  • synchronized 是非公平锁。ReentrantLock 默认是非公平锁,可以通过构造方法传入一个 true 开启公平锁模式.。

如何选择使用哪个锁?

  • 锁竞争不激烈的时候,使用 synchronized,效率更高,自动释放更方便(实际开发也是用synchronized多)
  • 锁竞争激烈的时候,使用 ReentrantLock,搭配 trylock 更灵活控制加锁的行为,而不是死等
  • 如果需要使用公平锁,使用 ReentrantLock

原子类

原子类内部用的是 CAS 实现,所以性能要比加锁实现 i++ 高很多。原子类有以下几个

  • AtomicBoolean
  • AtomicInteger
  • AtomicIntegerArray
  • AtomicLong
  • AtomicReference
  • AtomicStampedReference

以 AtomicInteger 举例,常见方法有

addAndGet(int delta);   i += delta;
decrementAndGet(); --i;
getAndDecrement(); i--;
incrementAndGet(); ++i;
getAndIncrement(); i++;

应用场景:

  1. 计数需求

​ 播放量、点赞量…

  1. 统计效果

​ 统计出现错误的请求数目、统计收到的请求数目(衡量服务器的压力)、统计每个请求的响应时间=>平均响应时间(衡量服务器的运行效率)…

​ 通过上述统计内容,实现监控服务器,获取/统计/展示/报警

线程池

虽然创建销毁线程比创建销毁进程更轻量,但是在频繁创建销毁线程的时候还是会比较低效.

线程池就是为了解决这个问题。如果某个线程不再使用了,并不是真正把线程释放,而是放到一个 “池子” 中,下次如果需要用到线程就直接从池子中取,不必通过系统来创建了。

ExecutorService 和 Executors

代码示例

  • ExecutorService 表示一个线程池实例.
  • Executors 是一个工厂类,能够创建出几种不同风格的线程池.
  • ExecutorService 的 submit 方法能够向线程池中提交若干个任务.
ExecutorService pool = Executors.newFixedThreadPool(10);
pool.submit(new Runnable() {
    @Override
    public void run() {
        System.out.println("hello");
   }
});

Executors 创建线程池的几种方式

  • newFixedThreadPool:创建固定线程数的线程池
  • newCachedThreadPool:创建线程数目动态增长的线程池.
  • newSingleThreadExecutor:创建只包含单个线程的线程池.
  • newScheduledThreadPool:设定 延迟时间后执行命令,或者定期执行命令。是进阶版的 Timer.

Executors 本质上是 ThreadPoolExecutor 类的封装.

ThreadPoolExecutor

ThreadPoolExecutor 提供了更多的可选参数,可以进一步细化线程池行为的设定.

ThreadPoolExecutor 的构造方法

信号量 Semaphore

信号量, 用来表示 “可用资源的个数”. 本质上就是一个计数器,描述的是当前这个线程,是否“有临界资源可以用”

临界资源:多个线程/进程等并发执行的实体可以公共使用到的资源(多个线程修改同一个资源,这个变量就可以认为是临界资源)

理解信号量

可以把信号量想象成是停车场的展示牌: 当前有车位 100 个. 表示有 100 个可用资源.

当有车开进去的时候, 就相当于申请一个可用资源, 可用车位就 -1 (这个称为信号量的 P 操作)

当有车开出来的时候, 就相当于释放一个可用资源, 可用车位就 +1 (这个称为信号量的 V 操作)

如果计数器的值已经为 0 了, 还尝试申请资源, 就会阻塞等待, 直到有其他线程释放资源.

这个阻塞等待的过程有点像锁是吧。

锁,本质上就是一个特殊的信号量(里面的数值,非0即1,二元信号量)

信号量要比锁更广义,不仅可以描述一个资源,还可以描述N个资源。

但还是锁用的更多一些

Semaphore 的 PV 操作中的加减计数器操作都是原子的, 可以在多线程环境下直接使用.

CountDownLatch

针对特定场景一个组件

同时等待 N 个任务执行结束

好像跑步比赛,10个选手依次就位,哨声响才同时出发;所有选手都通过终点,才能公布成绩。

  • 构造 CountDownLatch 实例,初始化 10 表示有 10 个任务需要完成.
  • 每个任务执行完毕, 都调用 latch.countDown(),在 CountDownLatch 内部的计数器同时自减.
  • 主线程中使用 latch.await(),阻塞等待所有任务执行完毕,相当于计数器为 0 了。(这里await的a是all的意思)

集合类

原来的集合类,大部分都不是线程安全的

Vector,Stack,HashTable,是线程安全的(不建议用),其他的集合类不是线程安全的

Vector、HashTable是上古时期搞出来的集合类

加了锁不一定就线程安全了,不加锁也不一定就线程不安全了

多线程环境使用 ArrayList

  1. 自己使用同步机制 (synchronized 或者 ReentrantLock)
  2. Collections.synchronizedList(new ArrayList)

synchronizedList 是标准库提供的一个基于 synchronized 进行线程同步的 List.

synchronizedList 的关键操作上都带有 synchronized

相当于让ArrayList像Vector一样使用

  1. 使用 CopyOnWriteArrayList

CopyOnWrite容器即写时复制的容器。

  • 当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,
  • 添加完元素之后,再将原容器的引用指向新的容器。

这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会

添加任何元素。

所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。

优点

在读多写少的场景下, 性能很高, 不需要加锁竞争.

缺点

  1. 占用内存较多.
  2. 新写的数据不能被第一时间读取到.

多线程环境使用队列

  1. ArrayBlockingQueue

基于数组实现的阻塞队列

  1. LinkedBlockingQueue

基于链表实现的阻塞队列

  1. PriorityBlockingQueue

基于堆实现的带优先级的阻塞队列

  1. TransferQueue

最多只包含一个元素的阻塞队列

多线程环境使用哈希表

HashMap 本身不是线程安全的

在多线程环境下使用哈希表可以使用:

  • Hashtable
  • ConcurrentHashMap
  1. Hashtable

只是简单的把关键方法加上了 synchronized 关键字

public synchronized V put(K key, V value) {
public synchronized V get(Object key) {

这相当于直接针对 Hashtable 对象本身加锁

  • 如果多线程访问同一个 Hashtable 就会直接造成锁冲突
  • size 属性也是通过 synchronized 来控制同步,也是比较慢的
  • 一旦触发扩容, 就由该线程完成整个扩容过程。这个过程会涉及到大量的元素拷贝,效率会非常低
  1. ConcurrentHashMap

相比于 Hashtable 做出了一系列的改进和优化. 以 Java1.8 为例

  • 读操作没有加锁(但是使用了 volatile 保证从内存读取结果), 只对写操作进行加锁. 加锁的方式仍然是用 synchronized, 但是不是锁整个对象, 而是 “锁桶” (用每个链表的头结点作为锁对象), 大大降低了锁冲突的概率.
  • 充分利用 CAS 特性. 比如 size 属性通过 CAS 来更新. 避免出现重量级锁的情况.
  • 优化了扩容方式: 化整为零
    • 发现需要扩容的线程, 只需要创建一个新的数组, 同时只搬几个元素过去.
    • 扩容期间, 新老数组同时存在.
    • 后续每个来操作 ConcurrentHashMap 的线程, 都会参与搬家的过程. 每个操作负责搬运一小
    • 部分元素.
    • 搬完最后一个元素再把老数组删掉.
    • 这个期间, 插入只往新数组加.
    • 这个期间, 查找需要同时查新数组和老数组

网站公告

今日签到

点亮在社区的每一天
去签到