StampedLock
StampedLock
其实是对读写锁的一种改进,它支持在读同时进行一个写操作,也就是说,它的性能将会比读写锁更快。
更通俗的讲就是在读锁没有释放的时候是可以获取到一个写锁,获取到写锁之后,读锁阻塞,这一点和读写锁一致,唯一的区别在于 读写锁不支持在没有释放读锁的时候获取写锁 。
StampedLock
有三种模式:
悲观读:允许多个线程获取悲观读锁。
写锁:写锁和悲观读是互斥的。
乐观读:无锁机制,类似于数据库中的乐观锁,它支持在不释放乐观读的时候是可以获取到一个写锁。
参考: 有没有比读写锁更快的锁?
示例代码:
悲观读 + 写锁:
package git.snippets.juc; import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.StampedLock; import java.util.logging.Logger; // 悲观读 + 写锁 public class StampedLockPessimistic { private static final Logger log = Logger.getLogger(StampedLockPessimistic.class.getName()); private static final StampedLock lock = new StampedLock(); //缓存中存储的数据 private static final Map<String, String> mapCache = new HashMap<>(); //模拟数据库存储的数据 private static final Map<String, String> mapDb = new HashMap<>(); static { mapDb.put("zhangsan", "你好,我是张三"); mapDb.put("sili", "你好,我是李四"); } private static void getInfo(String name) { //获取悲观读 long stamp = lock.readLock(); log.info("线程名:" + Thread.currentThread().getName() + " 获取了悲观读锁" + " 用户名:" + name); try { if ("zhangsan".equals(name)) { log.info("线程名:" + Thread.currentThread().getName() + " 休眠中" + " 用户名:" + name); Thread.sleep(3000); log.info("线程名:" + Thread.currentThread().getName() + " 休眠结束" + " 用户名:" + name); } String info = mapCache.get(name); if (null != info) { log.info("在缓存中获取到了数据"); return; } } catch (InterruptedException e) { log.info("线程名:" + Thread.currentThread().getName() + " 释放了悲观读锁"); e.printStackTrace(); } finally { //释放悲观读 lock.unlock(stamp); } //获取写锁 stamp = lock.writeLock(); log.info("线程名:" + Thread.currentThread().getName() + " 获取了写锁" + " 用户名:" + name); try { //判断一下缓存中是否被插入了数据 String info = mapCache.get(name); if (null != info) { log.info("获取到了写锁,再次确认在缓存中获取到了数据"); return; } //这里是往数据库获取数据 String infoByDb = mapDb.get(name); //将数据插入缓存 mapCache.put(name, infoByDb); log.info("缓存中没有数据,在数据库获取到了数据"); } finally { //释放写锁 log.info("线程名:" + Thread.currentThread().getName() + " 释放了写锁" + " 用户名:" + name); lock.unlock(stamp); } } public static void main(String[] args) { //线程1 Thread t1 = new Thread(() -> { getInfo("zhangsan"); }); //线程2 Thread t2 = new Thread(() -> { getInfo("lisi"); }); //线程启动 t1.start(); t2.start(); //线程同步 try { t1.join(); t2.join(); } catch (InterruptedException e) { e.printStackTrace(); } } }
乐观读:
package git.snippets.juc; import java.util.concurrent.locks.StampedLock; import java.util.logging.Logger; // 乐观写 public class StampedLockOptimistic { private static final Logger log = Logger.getLogger(StampedLockOptimistic.class.getName()); private static final StampedLock lock = new StampedLock(); private static int num1 = 1; private static int num2 = 1; /** * 修改成员变量的值,+1 * * @return */ private static int sum() { log.info("求和方法被执行了"); //获取乐观读 long stamp = lock.tryOptimisticRead(); int cnum1 = num1; int cnum2 = num2; log.info("获取到的成员变量值,cnum1:" + cnum1 + " cnum2:" + cnum2); try { //休眠3秒,目的是为了让其他线程修改掉成员变量的值。 Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //判断在运行期间是否存在写操作 true:不存在 false:存在 if (!lock.validate(stamp)) { log.info("存在写操作!"); //存在写锁 //升级悲观读锁 stamp = lock.readLock(); try { log.info("升级悲观读锁"); cnum1 = num1; cnum2 = num2; log.info("重新获取了成员变量的值=========== cnum1=" + cnum1 + " cnum2=" + cnum2); } finally { //释放悲观读锁 lock.unlock(stamp); } } return cnum1 + cnum2; } //使用写锁修改成员变量的值 private static void updateNum() { long stamp = lock.writeLock(); try { num1 = 2; num2 = 2; } finally { lock.unlock(stamp); } } public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { int sum = sum(); log.info("求和结果:" + sum); }); t1.start(); //休眠1秒,目的为了让线程t1能执行到获取成员变量之后 Thread.sleep(1000); updateNum(); t1.join(); log.info("执行完毕"); } }
使用 StampedLock 的注意事项
看名字就能看出来
StampedLock
不支持重入锁。它适用于读多写少的情况,如果不是这种情况,请慎用,性能可能还不如
synchronized
。StampedLock
的悲观读锁、写锁不支持条件变量。千万不能中断阻塞的悲观读锁或写锁,如果调用阻塞线程的
interrupt()
,会导致cpu飙升,如果希望StampedLock
支持中断操作,请使用readLockInterruptibly
(悲观读锁)与writeLockInterruptibly
(写锁)。
CountDownLatch
类似门闩的概念,可以替代 join
,但是比 join
灵活,因为一个线程里面可以多次 countDown
,但是 join
一定要等线程完成才能执行。
其底层原理是:调用 await()
方法的线程会利用 AQS
排队,一旦数字减为0,则会将 AQS
中排队的线程依次唤醒。
代码如下:
package git.snippets.juc; import java.util.concurrent.CountDownLatch; /** * CountDownLatch可以用Join替代 */ public class CountDownLatchAndJoin { public static void main(String[] args) { useCountDownLatch(); useJoin(); } public static void useCountDownLatch() { // use countdownlatch long start = System.currentTimeMillis(); Thread[] threads = new Thread[100000]; CountDownLatch latch = new CountDownLatch(threads.length); for (int i = 0; i < threads.length; i++) { threads[i] = new Thread(() -> { int result = 0; for (int i1 = 0; i1 < 1000; i1++) { result += i1; } // System.out.println("Current thread " + Thread.currentThread().getName() + " finish cal result " + result); latch.countDown(); }); } for (Thread thread : threads) { thread.start(); } try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println("end latch down, time is " + (end - start)); } public static void useJoin() { long start = System.currentTimeMillis(); // use join Thread[] threads = new Thread[100000]; for (int i = 0; i < threads.length; i++) { threads[i] = new Thread(() -> { int result = 0; for (int i1 = 0; i1 < 1000; i1++) { result += i1; } // System.out.println("Current thread " + Thread.currentThread().getName() + " finish cal result " + result); }); } for (Thread thread : threads) { thread.start(); } for (Thread thread : threads) { try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } } long end = System.currentTimeMillis(); System.out.println("end join, time is " + (end - start)); } }
CyclicBarrier
类似栅栏,类比:满了20个乘客就发车 这样的场景。
比如:一个程序可能收集如下来源的数据:
数据库
网络
文件
程序可以并发执行,用线程操作1,2,3,然后操作完毕后再合并, 然后执行后续的逻辑操作,就可以使用 CyclicBarrier
代码如下:
package git.snippets.juc; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * CyclicBarrier示例:满员发车 * * @author <a href="mailto:410486047@qq.com">Grey</a> * @since 1.8 */ public class CyclicBarrierTest { public static void main(String[] args) { CyclicBarrier barrier = new CyclicBarrier(20, () -> { System.out.println("满了20,发车"); }); for (int i = 0; i < 100; i++) { new Thread(() -> { try { barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }).start(); } } }
Semaphore
表示信号量,有如下两个操作:
s.acquire()
信号量减1
s.release()
信号量加1
到 0 以后,就不能执行了,这个可以用于 限流 。
底层原理是:如果没有线程许可可用,则线程阻塞,并通过 AQS 来排队,可以通过 release()
方法来释放许可,当某个线程释放了某个许可后,会从 AQS 中正在排队的第一个线程依次开始唤醒,直到没有空闲许可。
Semaphore 使用示例:有N个线程来访问,我需要限制同时运行的只有信号量大小的线程数。
代码如下:
package git.snippets.juc; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /** * Semaphore用于限流 */ public class SemaphoreUsage { public static void main(String[] args) { Semaphore semaphore = new Semaphore(1); new Thread(() -> { try { semaphore.acquire(); TimeUnit.SECONDS.sleep(2); System.out.println("Thread 1 executed"); } catch (Exception e) { e.printStackTrace(); } finally { semaphore.release(); } }).start(); new Thread(() -> { try { semaphore.acquire(); TimeUnit.SECONDS.sleep(2); System.out.println("Thread 2 executed"); } catch (Exception e) { e.printStackTrace(); } finally { semaphore.release(); } }).start(); } }
Semaphore
可以有 公平 和 非公平 的方式进行配置。
Semaphore
和 CountDownLatch
的区别?
Semaphore 是信号量,可以做限流,限制 n 个线程并发,释放一个线程后就又能进来一个新的线程。
CountDownLatch 是闭锁,带有阻塞的功能,必须等到 n 个线程都执行完后,被阻塞的线程才能继续往下执行。
Guava RateLimiter
采用令牌桶算法,用于限流
示例代码如下
package git.snippets.juc; import com.google.common.util.concurrent.RateLimiter; import java.util.List; import java.util.concurrent.Executor; /** * @author <a href="mailto:410486047@qq.com">Grey</a> * @date 2021/4/21 * @since */ public class RateLimiterUsage { //每秒只发出2个令牌 static final RateLimiter rateLimiter = RateLimiter.create(2.0); static void submitTasks(List<Runnable> tasks, Executor executor) { for (Runnable task : tasks) { rateLimiter.acquire(); // 也许需要等待 executor.execute(task); } } }
注:上述代码需要引入 Guava 包 。
Phaser(Since jdk1.7)
遗传算法,可以用这个结婚的场景模拟: 假设婚礼的宾客有 5 个人,加上新郎和新娘,一共 7 个人。 我们可以把这 7 个人看成 7 个线程,有如下步骤要执行。
到达婚礼现场
吃饭
离开
拥抱(只有新郎和新娘线程可以执行)
每个阶段执行完毕后才能执行下一个阶段,其中拥抱阶段只有新郎新娘这两个线程才能执行。
以上需求,我们可以通过 Phaser 来实现,具体代码和注释如下:
package git.snippets.juc; import java.util.Random; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; public class PhaserUsage { static final Random R = new Random(); static WeddingPhaser phaser = new WeddingPhaser(); static void millSleep() { try { TimeUnit.MILLISECONDS.sleep(R.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { // 宾客的人数 final int guestNum = 5; // 新郎和新娘 final int mainNum = 2; phaser.bulkRegister(mainNum + guestNum); for (int i = 0; i < guestNum; i++) { new Thread(new Person("宾客" + i)).start(); } new Thread(new Person("新娘")).start(); new Thread(new Person("新郎")).start(); } static class WeddingPhaser extends Phaser { @Override protected boolean onAdvance(int phase, int registeredParties) { switch (phase) { case 0: System.out.println("所有人到齐"); return false; case 1: System.out.println("所有人吃饭"); return false; case 2: System.out.println("所有人离开"); return false; case 3: System.out.println("新郎新娘拥抱"); return true; default: return true; } } } static class Person implements Runnable { String name; Person(String name) { this.name = name; } @Override public void run() { // 先到达婚礼现场 arrive(); // 吃饭 eat(); // 离开 leave(); // 拥抱,只保留新郎和新娘两个线程可以执行 hug(); } private void arrive() { millSleep(); System.out.println("name:" + name + " 到来"); phaser.arriveAndAwaitAdvance(); } private void eat() { millSleep(); System.out.println("name:" + name + " 吃饭"); phaser.arriveAndAwaitAdvance(); } private void leave() { millSleep(); System.out.println("name:" + name + " 离开"); phaser.arriveAndAwaitAdvance(); } private void hug() { if ("新娘".equals(name) || "新郎".equals(name)) { millSleep(); System.out.println("新娘新郎拥抱"); phaser.arriveAndAwaitAdvance(); } else { phaser.arriveAndDeregister(); } } } }
Exchanger
用于线程之间交换数据, exchange()
方法是阻塞的,所以要两个 exchange
行为都执行到才会触发交换。
package git.snippets.juc; import java.util.concurrent.Exchanger; import java.util.concurrent.TimeUnit; /** * Exchanger用于两个线程之间交换变量 */ public class ExchangerUsage { static Exchanger<String> semaphore = new Exchanger<>(); public static void main(String[] args) { new Thread(() -> { String s = "T1"; try { s = semaphore.exchange(s); TimeUnit.SECONDS.sleep(2); System.out.println("Thread 1(T1) executed, Result is " + s); } catch (Exception e) { e.printStackTrace(); } }).start(); new Thread(() -> { String s = "T2"; try { s = semaphore.exchange(s); TimeUnit.SECONDS.sleep(2); System.out.println("Thread 2(T2) executed, Result is " + s); } catch (Exception e) { e.printStackTrace(); } }).start(); } }
LockSupport
其他锁的底层用的是 AQS
原先让线程等待需要 wait/await
,现在仅需要 LockSupport.park()
原先叫醒线程需要 notify/notifyAll
,现在仅需要 LockSupport.unpark()
, LockSupport.unpark()
还可以叫醒指定线程,
示例代码:
package git.snippets.juc; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; /** * 阻塞指定线程,唤醒指定线程 */ public class LockSupportUsage { public static void main(String[] args) { Thread t = new Thread(() -> { for (int i = 0; i < 10; i++) { try { if (i == 5) { LockSupport.park(); } if (i == 8) { LockSupport.park(); } TimeUnit.SECONDS.sleep(1); System.out.println(i); } catch (InterruptedException e) { e.printStackTrace(); } } }); t.start(); // unpark可以先于park调用 //LockSupport.unpark(t); try { TimeUnit.SECONDS.sleep(8); } catch (InterruptedException e) { e.printStackTrace(); } LockSupport.unpark(t); System.out.println("after 8 seconds"); } }
实现一个监控元素的容器
实现一个容器,提供两个方法
// 向容器中增加一个元素 void add(T t); // 返回容器大小 int size();
有两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束
方法 1. 使用 wait + notify
实现
方法 2. 使用 CountDownLatch
实现
方法 3. 使用 LockSupport
实现
代码如下:
package git.snippets.juc; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; // 实现一个容器,提供两个方法,add,size,有两个线程, // 线程1添加10个元素到容器中, // 线程2实现监控元素的个数, // 当个数到5个时,线程2给出提示并结束 public class MonitorContainer { public static void main(String[] args) { useLockSupport(); // useCountDownLatch(); // useNotifyAndWait(); } /** * 使用LockSupport */ private static void useLockSupport() { System.out.println("use LockSupport..."); Thread adder; List<Object> list = Collections.synchronizedList(new ArrayList<>()); Thread finalMonitor = new Thread(() -> { LockSupport.park(); if (match(list)) { System.out.println("filled 5 elements size is " + list.size()); LockSupport.unpark(null); } }); adder = new Thread(() -> { for (int i = 0; i < 10; i++) { increment(list); if (match(list)) { LockSupport.unpark(finalMonitor); } } }); adder.start(); finalMonitor.start(); } /** * 使用CountDownLatch */ private static void useCountDownLatch() { System.out.println("use CountDownLatch..."); List<Object> list = Collections.synchronizedList(new ArrayList<>()); CountDownLatch latch = new CountDownLatch(5); Thread adder = new Thread(() -> { for (int i = 0; i < 10; i++) { increment(list); if (i <= 4) { latch.countDown(); } } }); Thread monitor = new Thread(() -> { try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } if (match(list)) { System.out.println("filled 5 elements"); } }); adder.start(); monitor.start(); } /** * notify + wait 实现 */ private static void useNotifyAndWait() { System.out.println("use notify and wait..."); List<Object> list = Collections.synchronizedList(new ArrayList<>()); final Object o = new Object(); Thread adder = new Thread(() -> { synchronized (o) { for (int i = 0; i < 10; i++) { increment(list); if (match(list)) { o.notify(); try { o.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } System.out.println("add finished"); o.notify(); } }); Thread monitor = new Thread(() -> { synchronized (o) { if (match(list)) { System.out.println("5 elements added " + list.size()); o.notify(); try { o.wait(); System.out.println("monitor finished"); o.notify(); } catch (InterruptedException e) { e.printStackTrace(); } } } }); adder.start(); monitor.start(); } /** * 只要是5的倍数,就循环打印 */ private static void useNotifyAndWaitLoop() { List<Object> list = Collections.synchronizedList(new ArrayList<>()); final Object o = new Object(); Thread adder = new Thread(() -> { synchronized (o) { for (; ; ) { increment(list); if (match(list)) { o.notify(); try { o.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }); Thread monitor = new Thread(() -> { synchronized (o) { while (true) { if (match(list)) { System.out.println("filled 5 elements"); } o.notify(); try { o.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } }); adder.start(); monitor.start(); } private static void increment(List<Object> list) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e1) { e1.printStackTrace(); } list.add(new Object()); System.out.println("list add the ele, size is " + list.size()); } private static boolean match(List<Object> list) { return list.size() % 5 == 0; } }
生产者消费者问题
写一个固定容量的同步容器,拥有 put
和 get
方法,以及 getCount
方法,能够支持 2 个生产者线程以及 10 个消费者线程的阻塞调用。
方法 1. 使用 wait/notifyAll
方法 2. ReentrantLock
的 Condition
,本质就是等待队列
package git.snippets.juc; import java.util.LinkedList; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; // 写一个固定容量的同步容器,拥有put和get方法,以及getCount方法,能够支持2个生产者线程以及10个消费者线程的阻塞调用。 public class ProducerAndConsumer { public static void main(String[] args) { // MyContainerByCondition container = new MyContainerByCondition(100); MyContainerByNotifyAndWait container = new MyContainerByNotifyAndWait(100); for (int i = 0; i < 25; i++) { new Thread(container::get).start(); } for (int i = 0; i < 20; i++) { new Thread(() -> container.put(new Object())).start(); } } } // 使用ReentrantLock的Condition class MyContainerByCondition { static ReentrantLock lock = new ReentrantLock(); final int MAX; private final LinkedList<Object> list = new LinkedList<>(); Condition consumer = lock.newCondition(); Condition producer = lock.newCondition(); public MyContainerByCondition(int limit) { this.MAX = limit; } public void put(Object object) { lock.lock(); try { while (getCount() == MAX) { System.out.println("container is full"); try { producer.await(); } catch (InterruptedException e) { e.printStackTrace(); } } list.add(object); consumer.signalAll(); System.out.println("contain add a object, current size " + getCount()); } finally { lock.unlock(); } } public Object get() { lock.lock(); try { while (getCount() == 0) { try { System.out.println("container is empty"); consumer.await(); } catch (InterruptedException e) { e.printStackTrace(); } } Object object = list.removeFirst(); producer.signalAll(); System.out.println("contain get a object, current size " + getCount()); return object; } finally { lock.unlock(); } } public synchronized int getCount() { return list.size(); } } // 使用synchronized的wait和notifyAll class MyContainerByNotifyAndWait { LinkedList<Object> list = null; final int limit; MyContainerByNotifyAndWait(int limit) { this.limit = limit; list = new LinkedList<>(); } synchronized int getCount() { return list.size(); } // index 从0开始计数 synchronized Object get() { while (list.size() == 0) { System.out.println("container is empty"); try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Object o = list.removeFirst(); System.out.println("get a data"); this.notifyAll(); return o; } synchronized void put(Object data) { while (list.size() > limit) { System.out.println("container is full , do not add any more"); try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } list.add(data); System.out.println("add a data"); this.notifyAll(); } }
说明
本文涉及到的所有代码和图例
更多内容见: Java 多线程