多线程与并发编程深度解析
文章目录
1. 引言:为什么需要多线程与并发编程
在当今计算机硬件发展的浪潮中,多核处理器已经成为主流。如何充分利用多核CPU的计算能力,提高程序的性能和响应速度,成为程序员必须面对的问题。多线程与并发编程正是解决这一问题的关键技术。
多线程编程允许程序在同一个进程中同时执行多个线程,从而充分利用CPU资源,提高程序的执行效率。并发编程则是一种更广泛的编程范式,它关注的是如何正确地处理多个任务的执行,即使这些任务可能在不同的时间点运行。
然而,并发编程并非易事。由于线程之间的交互和资源共享,会带来一系列复杂的问题,如竞态条件、死锁、活锁等。这些问题往往难以重现和调试,给程序开发带来了巨大的挑战。
本文将深入探讨Java平台下的多线程与并发编程技术,从基础的线程概念到高级的并发工具,从理论原理到实践应用,全面解析并发编程的各个方面。希望通过本文的介绍,能够帮助读者更好地理解和掌握并发编程,写出高效、可靠、安全的并发程序。
2. Java内存模型与线程基础
2.1 线程的生命周期
在Java中,线程的生命周期可以分为五种状态:
- 新建状态(New):当线程对象被创建时,它处于新建状态。此时线程已经分配了必要的资源,但尚未启动。
Thread thread = new Thread(); // 新建状态
- 就绪状态(Runnable):当线程对象调用了start()方法后,它就进入了就绪状态。此时线程已经具备了运行的条件,但尚未获得CPU的使用权。
thread.start(); // 进入就绪状态
运行状态(Running):当线程获得了CPU的使用权,开始执行run()方法中的代码时,它就进入了运行状态。
阻塞状态(Blocked):当线程因为某种原因(如等待I/O、获取锁等)暂时放弃CPU的使用权时,它就进入了阻塞状态。阻塞状态的线程不能立即进入运行状态,必须等待特定的条件满足。
死亡状态(Terminated):当线程执行完run()方法或被强制终止时,它就进入了死亡状态。死亡的线程不能再被启动。
// 示例:线程生命周期演示
public class ThreadLifecycleDemo {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
System.out.println("线程运行中...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程即将结束...");
});
System.out.println("线程状态: " + thread.getState()); // NEW
thread.start();
System.out.println("线程状态: " + thread.getState()); // RUNNABLE
try {
thread.join();
System.out.println("线程状态: " + thread.getState()); // TERMINATED
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2.2 Java内存模型(JMM)
Java内存模型(Java Memory Model, JMM)定义了一组规范,来屏蔽各种硬件和操作系统的内存访问差异,以实现让Java程序在各种平台下都能达到一致的内存访问效果。JMM的主要目标是定义程序中各个变量的访问规则,即在虚拟机中将变量存储到内存和从内存中取出变量这样的底层细节。
JMM规定了所有的变量都存储在主内存(Main Memory)中。每条线程还有自己的工作内存(Working Memory),线程对变量的所有操作(读取、赋值等)都必须在工作内存中进行,而不能直接读写主内存中的变量。不同线程之间无法直接访问对方工作内存中的变量,线程间的通信必须通过主内存来完成。
// JMM示例
class JMMExample {
private int count = 0; // 存储在主内存中
public void increment() {
count++; // 操作在工作内存中进行,然后写回主内存
}
public int getCount() {
return count; // 从主内存中读取到工作内存,然后返回
}
}
2.3 volatile关键字
volatile是Java提供的一种轻量级的同步机制,它有两个主要作用:
- 保证变量的可见性:当一个线程修改了被volatile修饰的变量,其他线程能立即看到最新的值。
- 禁止指令重排序优化:volatile关键字会插入内存屏障,禁止处理器和编译器对指令进行重排序。
// volatile示例
class VolatileExample {
private volatile boolean flag = false;
public void writer() {
flag = true; // 写操作,会立即刷新到主内存
}
public void reader() {
if (flag) { // 读操作,会直接从主内存读取
// do something
}
}
}
注意事项:
- volatile不保证原子性:volatile只能保证可见性,不能保证复合操作的原子性。
- volatile不能替代synchronized:volatile不能修饰final类型的变量,也不能修饰引用类型。
- volatile的使用场景:通常用于状态标记量,如boolean flag,或者确保单例模式的双重检查锁定。
2.4 synchronized关键字
synchronized是Java提供的内置锁机制,它既可以修饰方法,也可以修饰代码块。synchronized关键字可以保证在同一时刻,只有一个线程可以执行被修饰的方法或代码块。
// synchronized修饰方法
class SynchronizedMethod {
public synchronized void method() {
// 同步方法,锁对象为当前对象(this)
}
}
// synchronized修饰代码块
class SynchronizedBlock {
public void method() {
synchronized (this) { // 同步代码块,锁对象为当前对象(this)
// 同步代码块
}
}
}
synchronized的原理:
- JVM通过monitor(监视器)实现同步,每个对象都可以作为monitor。
- 当线程尝试获取锁时,会先检查monitor的计数器,如果为0,表示锁未被获取,线程将获取锁并将计数器设为1。
- 如果计数器不为0,表示锁已被其他线程获取,当前线程将阻塞,直到锁被释放。
- 当线程释放锁时,计数器减1,当计数器为0时,锁被释放。
注意事项:
- 避免锁的过度使用:锁的获取和释放是有开销的,不必要的同步会降低程序性能。
- 避免锁的嵌套:容易导致死锁问题。
- 注意锁的对象:不同的锁对象之间不会相互影响,确保使用正确的锁对象。
// synchronized注意事项示例
class SynchronizedExample {
private final Object lock1 = new Object();
private final Object lock2 = new Object();
public void method1() {
synchronized (lock1) {
// 同步代码块1
synchronized (lock2) {
// 同步代码块2 - 可能导致死锁
}
}
}
public void method2() {
synchronized (lock2) {
// 同步代码块2
synchronized (lock1) {
// 同步代码块1 - 可能导致死锁
}
}
}
}
3. 并发集合类
3.1 ConcurrentHashMap
ConcurrentHashMap是Java并发包中提供的一个线程安全的哈希表实现,它可以在高并发环境下提供良好的性能。与Hashtable不同,ConcurrentHashMap采用了分段锁(Segment)技术,将数据分成多个段,每段都有自己的锁,从而允许多个线程同时访问不同的段,提高了并发性能。
在Java 8中,ConcurrentHashMap的实现进行了优化,取消了分段锁,改用CAS和synchronized来实现。当没有哈希冲突时,使用CAS操作来保证原子性;当出现哈希冲突时,使用synchronized锁定链表或红黑树的头部,保证并发安全。
// ConcurrentHashMap示例
import java.util.concurrent.ConcurrentHashMap;
public class ConcurrentHashMapExample {
public static void main(String[] args) {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 并发写入
for (int i = 0; i < 10; i++) {
final int key = i;
new Thread(() -> {
map.put("key" + key, key);
System.out.println("Put key" + key + " success");
}).start();
}
// 并发读取
for (int i = 0; i < 10; i++) {
final int key = i;
new Thread(() -> {
Integer value = map.get("key" + key);
System.out.println("Get key" + key + " value: " + value);
}).start();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Final map size: " + map.size());
}
}
ConcurrentHashMap的主要方法:
- put(K key, V value):将指定的键值对映射到此表中。
- get(Object key):返回指定键所映射的值。
- remove(Object key):从此表中移除该键及其对应的值。
- size():返回此表中的键值映射数量。
- containsKey(Object key):如果此表包含指定键的映射,则返回true。
注意事项:
- ConcurrentHashMap不能保证复合操作的原子性,如putIfAbsent。
- 遍历时可能不能反映最新的更新,因为遍历时不会锁定整个表。
- 计算操作(如compute、merge等)可能会被阻塞,直到操作完成。
3.2 CopyOnWriteArrayList
CopyOnWriteArrayList是Java并发包中提供的一个线程安全的List实现,它的特点是写操作时复制底层数组,从而保证读操作的高性能。CopyOnWriteArrayList适用于读多写少的场景。
// CopyOnWriteArrayList示例
import java.util.concurrent.CopyOnWriteArrayList;
public class CopyOnWriteArrayListExample {
public static void main(String[] args) {
CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>();
// 并发添加元素
for (int i = 0; i < 5; i++) {
final int num = i;
new Thread(() -> {
list.add(num);
System.out.println("Added: " + num);
}).start();
}
// 并发遍历
new Thread(() -> {
for (Integer num : list) {
System.out.println("Iterating: " + num);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Final list size: " + list.size());
}
}
CopyOnWriteArrayList的主要方法:
- add(E e):将指定元素添加到此列表的尾部。
- get(int index):返回此列表中指定位置上的元素。
- remove(int index):移除此列表中指定位置上的元素。
- size():返回此列表的元素数。
- iterator():返回在此列表元素上进行迭代的列表迭代器。
注意事项:
- 写操作(add、set、remove等)会复制整个底层数组,开销较大。
- 遍历时使用的是创建迭代器时的数组快照,不能反映最新的修改。
- 不适合写多读少的场景,因为写操作的开销较大。
- 内存占用问题:由于每次写操作都会复制数组,可能导致内存占用翻倍。
3.3 BlockingQueue接口及其实现
BlockingQueue是Java并发包中提供的一个阻塞队列接口,它支持两个附加操作:
- 当队列元素为空时,获取元素的线程会阻塞,直到队列中有可用元素。
- 当队列已满时,添加元素的线程会阻塞,直到队列中有可用空间。
BlockingQueue常用于生产者-消费者模型,是并发编程中非常实用的工具。
// BlockingQueue示例
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueExample {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
// 生产者线程
new Thread(() -> {
try {
for (int i = 0; i < 20; i++) {
queue.put(i); // 如果队列满,会阻塞
System.out.println("Produced: " + i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 消费者线程
new Thread(() -> {
try {
for (int i = 0; i < 20; i++) {
int num = queue.take(); // 如果队列空,会阻塞
System.out.println("Consumed: " + num);
Thread.sleep(100); // 模拟处理时间
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
BlockingQueue的主要实现类:
- ArrayBlockingQueue:基于数组的有界阻塞队列,按FIFO原则对元素进行排序。
- LinkedBlockingQueue:基于链表的阻塞队列,按FIFO原则对元素进行排序,吞吐量通常高于ArrayBlockingQueue。
- PriorityBlockingQueue:支持优先级的无界阻塞队列。
- DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
- SynchronousQueue:不存储元素的阻塞队列,每个put操作必须等待一个take操作。
BlockingQueue的主要方法:
- add(E e):将指定元素添加到此队列中,如果队列已满,则抛出IllegalStateException。
- offer(E e):将指定元素添加到此队列中,如果队列已满,则返回false。
- put(E e):将指定元素添加到此队列中,如果队列已满,则等待可用空间。
- take():获取并移除此队列的头部元素,如果队列为空,则等待可用元素。
- poll(long timeout, TimeUnit unit):获取并移除此队列的头部元素,如果队列为空,则等待指定时间。
注意事项:
- 使用put和take方法时要注意处理InterruptedException。
- 有界队列可以防止资源耗尽,但需要在生产者和消费者之间平衡。
- 无界队列可能导致内存耗尽,使用时要谨慎。
4. 线程池
4.1 ThreadPoolExecutor详解
ThreadPoolExecutor是Java并发包中提供的线程池实现,它是最灵活、最强大的线程池实现。通过合理配置ThreadPoolExecutor,可以有效地控制线程的执行,提高系统的性能和稳定性。
ThreadPoolExecutor的核心参数:
- corePoolSize:核心线程数,线程池中长期存活的线程数量。
- maximumPoolSize:最大线程数,线程池中允许的最大线程数量。
- keepAliveTime:空闲线程存活时间,超过这个时间的空闲线程将被终止。
- unit:keepAliveTime的时间单位。
- workQueue:工作队列,用于存储等待执行的任务。
- threadFactory:线程工厂,用于创建新线程。
- handler:拒绝策略,当线程池和队列都满时,如何处理新任务。
// ThreadPoolExecutor示例
import java.util.concurrent.*;
public class ThreadPoolExecutorExample {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // 核心线程数
10, // 最大线程数
60, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(100), // 工作队列
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("MyThread-" + thread.getId());
return thread;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
// 提交任务
for (int i = 0; i < 20; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println("Task " + taskId + " is running by " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 关闭线程池
executor.shutdown();
try {
// 等待所有任务完成
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
ThreadPoolExecutor的执行流程:
- 当提交一个新任务时,如果当前线程数小于核心线程数,则创建一个新的核心线程来执行任务。
- 如果当前线程数大于或等于核心线程数,则将任务添加到工作队列中。
- 如果工作队列已满,且当前线程数小于最大线程数,则创建一个新的非核心线程来执行任务。
- 如果工作队列已满,且当前线程数等于最大线程数,则执行拒绝策略。
ThreadPoolExecutor的主要方法:
- execute(Runnable command):执行指定的任务。
- submit(Callable task):提交一个返回值的任务用于执行。
- shutdown():启动一次顺序关闭,执行以前提交的任务,但不接受新任务。
- shutdownNow():尝试停止所有的活动执行任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
- getPoolSize():返回线程池的当前线程数。
- getActiveCount():返回正在执行任务的线程数。
注意事项:
- 合理设置线程池大小:CPU密集型任务通常设置为CPU核心数+1,IO密集型任务可以设置大一些。
- 选择合适的工作队列:有界队列可以防止资源耗尽,无界队列可能导致内存溢出。
- 处理拒绝策略:根据业务需求选择合适的拒绝策略,如丢弃任务、记录日志或由调用者执行。
- 正确关闭线程池:使用shutdown()或shutdownNow()方法,并处理InterruptedException。
4.2 Executors工厂类
Executors是Java并发包中提供的一个工厂类,它提供了一些静态方法来创建预配置的线程池。虽然使用Executors可以方便地创建线程池,但在高并发场景下,推荐使用ThreadPoolExecutor来创建线程池,以避免资源耗尽的风险。
// Executors示例
import java.util.concurrent.*;
public class ExecutorsExample {
public static void main(String[] args) {
// 创建固定大小的线程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
// 创建单线程执行器
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
// 创建缓存线程池
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 创建定时线程池
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
// 创建单线程定时执行器
ScheduledExecutorService singleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
// 提交任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
fixedThreadPool.execute(() -> {
System.out.println("Task " + taskId + " is running by " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 关闭线程池
fixedThreadPool.shutdown();
}
}
Executors提供的线程池类型:
- newFixedThreadPool(int nThreads):创建固定大小的线程池,核心线程数和最大线程数相同,使用无界队列。
- newSingleThreadExecutor():创建单线程执行器,核心线程数和最大线程数都为1,使用无界队列。
- newCachedThreadPool():创建可缓存的线程池,核心线程数为0,最大线程数为Integer.MAX_VALUE,使用SynchronousQueue。
- newScheduledThreadPool(int corePoolSize):创建定时线程池,可以执行定时任务和周期性任务。
- newSingleThreadScheduledExecutor():创建单线程定时执行器,功能类似newScheduledThreadPool,但核心线程数和最大线程数都为1。
注意事项:
- 避免使用无界队列:newFixedThreadPool和newSingleThreadExecutor使用无界队列,可能导致内存溢出。
- newCachedThreadPool的线程数可能无限制增长:当任务到达速度超过处理速度时,会创建大量线程。
- 定时线程池的资源管理:合理设置核心线程数,避免资源浪费。
4.3 Fork/Join框架
Fork/Join框架是Java 7引入的一种并行执行任务的框架,它将一个大任务分解成多个小任务,并行执行这些小任务,最后将结果合并。Fork/Join框架是基于工作窃取(Work-Stealing)算法实现的,可以提高多核CPU的利用率。
// Fork/Join示例
import java.util.concurrent.*;
public class ForkJoinExample {
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
// 创建一个计算1到100和的任务
SumTask task = new SumTask(1, 100);
// 提交任务
Future<Integer> result = pool.submit(task);
try {
System.out.println("Result: " + result.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
pool.shutdown();
}
static class SumTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 10; // 任务分解阈值
private int start;
private int end;
public SumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
// 如果任务足够小,直接计算
if (end - start <= THRESHOLD) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 任务太大,分解成两个子任务
int mid = (start + end) / 2;
SumTask leftTask = new SumTask(start, mid);
SumTask rightTask = new SumTask(mid + 1, end);
// 执行子任务
leftTask.fork();
rightTask.fork();
// 合并子任务结果
sum = leftTask.join() + rightTask.join();
}
return sum;
}
}
}
Fork/Join框架的核心组件:
- ForkJoinPool:执行Fork/Join任务的线程池,管理工作线程和工作队列。
- ForkJoinTask:Fork/Join框架的任务抽象类,有两个子类:RecursiveAction(无返回值任务)和RecursiveTask(有返回值任务)。
- fork():异步执行任务,将任务分解成子任务。
- join():等待任务执行完成,获取结果。
- compute():任务的计算逻辑,通常在这里实现任务的分解和合并。
注意事项:
- 合理设置任务分解阈值:阈值太小会导致任务分解开销过大,阈值太大会影响并行度。
- 避免内存过度消耗:任务分解时要注意内存使用,避免内存溢出。
- 处理异常:ForkJoinTask中的异常需要通过Future.get()获取,或者在任务中捕获并处理。
5. 锁机制
5.1 ReentrantLock
ReentrantLock是Java并发包中提供的一个可重入互斥锁,它类似于synchronized,但提供了更灵活的锁定机制。ReentrantLock支持公平锁和非公平锁两种模式,并且可以响应中断、尝试获取锁和定时获取锁。
// ReentrantLock示例
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockExample {
private final ReentrantLock lock = new ReentrantLock();
private int count = 0;
public void increment() {
lock.lock(); // 获取锁
try {
count++; // 临界区代码
} finally {
lock.unlock(); // 确保锁被释放
}
}
public int getCount() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
// 尝试获取锁示例
public boolean tryIncrement() {
if (lock.tryLock()) { // 尝试获取锁
try {
count++;
return true;
} finally {
lock.unlock();
}
}
return false;
}
// 定时获取锁示例
public boolean tryIncrementWithTimeout() throws InterruptedException {
if (lock.tryLock(1, TimeUnit.SECONDS)) { // 尝试获取锁,最多等待1秒
try {
count++;
return true;
} finally {
lock.unlock();
}
}
return false;
}
}
ReentrantLock的主要特性:
- 可重入:同一个线程可以多次获取已经持有的锁。
- 公平性:可以设置为公平锁(FIFO)或非公平锁(默认,吞吐量更高)。
- 响应中断:可以响应中断,避免线程无法获取锁时被永久阻塞。
- 尝试获取锁:可以尝试获取锁,获取失败后立即返回。
- 定时获取锁:可以在指定时间内尝试获取锁。
ReentrantLock的主要方法:
- lock():获取锁,如果锁已被其他线程获取,则等待。
- unlock():释放锁。
- tryLock():尝试获取锁,如果锁不可用,则返回false。
- tryLock(long time, TimeUnit unit):尝试获取锁,在指定时间内获取成功返回true,否则返回false。
- lockInterruptibly():获取锁,但可以响应中断。
- isLocked():查询锁是否被任何线程持有。
- isHeldByCurrentThread():查询锁是否被当前线程持有。
注意事项:
- 确保锁的释放:使用try-finally块确保锁一定会被释放。
- 避免死锁:不要在持锁时调用其他可能阻塞的方法。
- 注意锁的粒度:锁的粒度越小,并发性越高,但要确保不会破坏原子性。
5.2 ReadWriteLock
ReadWriteLock是Java并发包中提供的一种读写锁实现,它将锁分为读锁和写锁两种。读锁是共享的,多个线程可以同时持有读锁;写锁是独占的,只有一个线程可以持有写锁。ReadWriteLock适用于读多写少的场景,可以提高并发性能。
// ReadWriteLock示例
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockExample {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private int count = 0;
// 写操作
public void increment() {
rwLock.writeLock().lock(); // 获取写锁
try {
count++; // 临界区代码
} finally {
rwLock.writeLock().unlock(); // 释放写锁
}
}
// 读操作
public int getCount() {
rwLock.readLock().lock(); // 获取读锁
try {
return count; // 临界区代码
} finally {
rwLock.readLock().unlock(); // 释放读锁
}
}
// 降级锁示例:先获取写锁,再获取读锁,然后释放写锁
public void downgrade() {
rwLock.writeLock().lock();
try {
count++;
rwLock.readLock().lock(); // 获取读锁
} finally {
rwLock.writeLock().unlock(); // 释放写锁,但保留读锁
}
// 此时持有读锁
try {
// 可以进行读操作
} finally {
rwLock.readLock().unlock(); // 释放读锁
}
}
}
ReadWriteLock的主要特性:
- 读共享:多个线程可以同时持有读锁。
- 写独占:只有一个线程可以持有写锁。
- 锁降级:写锁可以降级为读锁,但读锁不能升级为写锁。
- 公平性:可以设置为公平锁或非公平锁。
ReadWriteLock的主要方法:
- readLock():返回读锁。
- writeLock():返回写锁。
注意事项:
- 避免锁饥饿:在写操作频繁的场景下,读操作可能会被饿死(一直获取不到锁)。
- 注意锁的升级:Java不支持锁的升级(读锁升级为写锁),可能会导致死锁。
- 合理使用锁:在读多写少的场景下使用,如果写操作频繁,性能可能不如独占锁。
5.3 StampedLock
StampedLock是Java 8引入的一种新的锁机制,它是对ReadWriteLock的增强,提供了三种模式的锁:写锁、悲观读锁和乐观读。StampedLock在读多写少的场景下性能优于ReadWriteLock,因为它支持乐观读,避免了不必要的锁获取和释放。
// StampedLock示例
import java.util.concurrent.locks.StampedLock;
public class StampedLockExample {
private final StampedLock stampedLock = new StampedLock();
private double x;
private double y;
// 写操作
public void move(double deltaX, double deltaY) {
long stamp = stampedLock.writeLock(); // 获取写锁
try {
x += deltaX;
y += deltaY;
} finally {
stampedLock.unlockWrite(stamp); // 释放写锁
}
}
// 悲观读操作
public double distanceFromOrigin() {
long stamp = stampedLock.readLock(); // 获取读锁
try {
return Math.sqrt(x * x + y * y);
} finally {
stampedLock.unlockRead(stamp); // 释放读锁
}
}
// 乐观读操作
public double distanceFromOriginOptimistic() {
long stamp = stampedLock.tryOptimisticRead(); // 尝试获取乐观读锁
double currentX = x;
double currentY = y;
if (!stampedLock.validate(stamp)) { // 检查乐观读锁是否仍然有效
stamp = stampedLock.readLock(); // 如果无效,获取悲观读锁
try {
currentX = x;
currentY = y;
} finally {
stampedLock.unlockRead(stamp); // 释放悲观读锁
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
// 锁转换示例:从写锁转换为读锁
public void moveAndDistance(double deltaX, double deltaY) {
long stamp = stampedLock.writeLock(); // 获取写锁
try {
x += deltaX;
y += deltaY;
stamp = stampedLock.tryConvertToReadLock(stamp); // 尝试转换为读锁
if (stamp == 0) {
stampedLock.unlockWrite(stamp); // 转换失败,释放写锁
stamp = stampedLock.readLock(); // 获取读锁
}
// 此时持有读锁
double distance = Math.sqrt(x * x + y * y);
System.out.println("Distance from origin: " + distance);
} finally {
stampedLock.unlock(stamp); // 释放锁
}
}
}
StampedLock的主要特性:
- 乐观读:不获取锁,而是检查锁是否仍然有效,适用于读多写少的场景。
- 三种锁模式:写锁(独占)、悲观读锁(共享)和乐观读(无锁)。
- 锁转换:支持从写锁转换为读锁,但不支持从读锁转换为写锁。
- 非公平性:不支持公平性,性能更高。
StampedLock的主要方法:
- writeLock():获取写锁。
- readLock():获取读锁。
- tryOptimisticRead():尝试获取乐观读锁。
- tryConvertToReadLock(long stamp):尝试将写锁转换为读锁。
- tryConvertToWriteLock(long stamp):尝试将读锁或乐观读锁升级为写锁。
- unlockWrite(long stamp):释放写锁。
- unlockRead(long stamp):释放读锁。
- unlock(long stamp):释放锁。
- validate(long stamp):检查乐观读锁是否仍然有效。
注意事项:
- 乐观读不是绝对安全的:在获取乐观读锁后,如果数据被修改,需要重新获取悲观读锁。
- 避免锁升级:不支持从读锁升级为写锁,可能会导致死锁。
- 注意释放锁:必须使用相同的stamp值来释放锁,否则会抛出异常。
- 不支持可重入:StampedLock不是可重入锁,一个线程不能重复获取已经持有的锁。
5.4 锁优化策略
锁优化是提高并发程序性能的重要手段,合理的锁优化可以减少锁竞争,提高系统的吞吐量。以下是一些常用的锁优化策略:
- 减少锁的持有时间:尽量只锁定必要的代码段,减少锁的持有时间,提高并发性。
// 不好的做法:锁定整个方法
public synchronized void badMethod() {
// 一些不需要同步的代码
// 临界区代码
// 一些不需要同步的代码
}
// 好的做法:只锁定必要的代码段
public void goodMethod() {
// 一些不需要同步的代码
synchronized (this) {
// 临界区代码
}
// 一些不需要同步的代码
}
- 减小锁的粒度:将大锁拆分成多个小锁,减少锁竞争。
// 不好的做法:使用一个大的锁
class BadExample {
private final Object lock = new Object();
private Map<String, String> map1 = new HashMap<>();
private Map<String, String> map2 = new HashMap<>();
public void put(String key, String value) {
synchronized (lock) {
map1.put(key, value);
map2.put(key, value);
}
}
}
// 好的做法:使用多个小锁
class GoodExample {
private final Object lock1 = new Object();
private final Object lock2 = new Object();
private Map<String, String> map1 = new HashMap<>();
private Map<String, String> map2 = new HashMap<>();
public void put(String key, String value) {
synchronized (lock1) {
map1.put(key, value);
}
synchronized (lock2) {
map2.put(key, value);
}
}
}
- 锁分离:将不同类型的操作使用不同的锁,减少锁竞争。
// 锁分离示例
class LockSplitExample {
private final Object readLock = new Object();
private final Object writeLock = new Object();
private Map<String, String> map = new HashMap<>();
public void get(String key) {
synchronized (readLock) {
return map.get(key);
}
}
public void put(String key, String value) {
synchronized (writeLock) {
map.put(key, value);
}
}
}
- 锁粗化:对于频繁加锁和解锁的代码块,可以将其合并为一个大的代码块,减少锁操作的开销。
// 不好的做法:频繁加锁解锁
for (int i = 0; i < 10; i++) {
synchronized (lock) {
// 临界区代码
}
}
// 好的做法:锁粗化
synchronized (lock) {
for (int i = 0; i < 10; i++) {
// 临界区代码
}
}
使用并发集合:使用Java并发包提供的并发集合类,如ConcurrentHashMap、CopyOnWriteArrayList等,它们已经实现了高效的并发控制。
使用CAS操作:对于简单的原子操作,可以使用CAS(Compare-And-Swap)操作,避免使用锁。
// CAS示例
import java.util.concurrent.atomic.AtomicInteger;
class CASExample {
private AtomicInteger count = new AtomicInteger(0);
public void increment() {
// 使用CAS操作递增
int current, next;
do {
current = count.get();
next = current + 1;
} while (!count.compareAndSet(current, next));
}
}
读写锁分离:对于读多写少的场景,使用读写锁(ReadWriteLock)来提高并发性能。
避免锁嵌套:避免在一个锁的临界区内获取另一个锁,可能导致死锁。
// 不好的做法:可能导致死锁
class DeadlockExample {
private final Object lock1 = new Object();
private final Object lock2 = new Object();
public void badMethod1() {
synchronized (lock1) {
synchronized (lock2) {
// 临界区代码
}
}
}
public void badMethod2() {
synchronized (lock2) {
synchronized (lock1) {
// 临界区代码
}
}
}
}
- 使用锁超时:避免线程无限期等待锁,可以设置锁获取的超时时间。
// 锁超时示例
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class LockTimeoutExample {
private final Lock lock = new ReentrantLock();
public boolean tryWithTimeout() {
try {
if (lock.tryLock(1, TimeUnit.SECONDS)) { // 尝试获取锁,最多等待1秒
try {
// 临界区代码
return true;
} finally {
lock.unlock();
}
}
return false;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
}
- 使用无锁数据结构:对于某些场景,可以使用无锁数据结构,如ConcurrentLinkedQueue,它们基于CAS操作实现。
6. 原子类
6.1 基本类型原子类
Java并发包提供了几个用于基本类型的原子类,它们使用CAS(Compare-And-Swap)操作来实现原子性,比使用锁更高效。这些类包括AtomicInteger、AtomicLong和AtomicBoolean。
// 基本类型原子类示例
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicBoolean;
public class AtomicPrimitiveExample {
private AtomicInteger atomicInteger = new AtomicInteger(0);
private AtomicLong atomicLong = new AtomicLong(0L);
private AtomicBoolean atomicBoolean = new AtomicBoolean(false);
// AtomicInteger示例
public void increment() {
atomicInteger.incrementAndGet(); // 原子递增
}
public int get() {
return atomicInteger.get(); // 原子获取
}
public int compareAndSet(int expect, int update) {
return atomicInteger.compareAndSet(expect, update) ? 1 : 0; // 原子比较并设置
}
// AtomicLong示例
public void add(long delta) {
atomicLong.addAndGet(delta); // 原子增加
}
public long getLong() {
return atomicLong.get(); // 原子获取
}
// AtomicBoolean示例
public void setTrue() {
atomicBoolean.set(true); // 原子设置
}
public boolean get() {
return atomicBoolean.get(); // 原子获取
}
public boolean compareAndSet(boolean expect, boolean update) {
return atomicBoolean.compareAndSet(expect, update); // 原子比较并设置
}
}
基本类型原子类的主要方法:
- get():获取当前值。
- set(int newValue):设置为新值。
- lazySet(int newValue):延迟设置为新值,可能不会立即对其他线程可见。
- compareAndSet(int expect, int update):如果当前值等于expect,则更新为update。
- weakCompareAndSet(int expect, int update):弱版本的compareAndSet,可能失败。
- getAndSet(int newValue):获取当前值并设置为新值。
- getAndIncrement():获取当前值并递增。
- incrementAndGet():递增并获取新值。
- getAndDecrement():获取当前值并递减。
- decrementAndGet():递减并获取新值。
- getAndAdd(int delta):获取当前值并增加delta。
- addAndGet(int delta):增加delta并获取新值。
注意事项:
- 原子类只能保证单个操作的原子性,不能保证复合操作的原子性。
- 原子类的性能通常优于锁,但在高竞争场景下,CAS操作可能因为自旋导致CPU消耗增加。
- 原子类不是可重入的,同一个线程不能重复获取已经持有的原子值。
6.2 数组类型原子类
Java并发包还提供了用于数组元素的原子类,如AtomicIntegerArray、AtomicLongArray和AtomicReferenceArray。这些类可以原子地更新数组中的元素。
// 数组类型原子类示例
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.AtomicReferenceArray;
public class AtomicArrayExample {
private AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(10);
private AtomicLongArray atomicLongArray = new AtomicLongArray(10);
private AtomicReferenceArray<String> atomicReferenceArray = new AtomicReferenceArray<>(10);
// AtomicIntegerArray示例
public void set(int index, int value) {
atomicIntegerArray.set(index, value); // 原子设置数组元素
}
public int get(int index) {
return atomicIntegerArray.get(index); // 原子获取数组元素
}
public int addAndGet(int index, int delta) {
return atomicIntegerArray.addAndGet(index, delta); // 原子增加数组元素
}
// AtomicLongArray示例
public void setLong(int index, long value) {
atomicLongArray.set(index, value); // 原子设置数组元素
}
public long getLong(int index) {
return atomicLongArray.get(index); // 原子获取数组元素
}
// AtomicReferenceArray示例
public void setReference(int index, String value) {
atomicReferenceArray.set(index, value); // 原子设置数组元素
}
public String getReference(int index) {
return atomicReferenceArray.get(index); // 原子获取数组元素
}
public boolean compareAndSet(int index, String expect, String update) {
return atomicReferenceArray.compareAndSet(index, expect, update); // 原子比较并设置数组元素
}
}
数组类型原子类的主要方法:
- get(int i):获取数组中第i个元素的值。
- set(int i, int newValue):设置数组中第i个元素的值。
- lazySet(int i, int newValue):延迟设置数组中第i个元素的值。
- compareAndSet(int i, int expect, int update):如果数组中第i个元素的值等于expect,则更新为update。
- weakCompareAndSet(int i, int expect, int update):弱版本的compareAndSet,可能失败。
- getAndSet(int i, int newValue):获取数组中第i个元素的值并设置为新值。
- getAndAdd(int i, int delta):获取数组中第i个元素的值并增加delta。
- addAndGet(int i, int delta):增加数组中第i个元素的delta并获取新值。
注意事项:
- 数组类型原子类只能保证单个数组元素操作的原子性,不能保证整个数组操作的原子性。
- 数组类型原子类的性能通常优于锁,但在高竞争场景下,CAS操作可能因为自旋导致CPU消耗增加。
- 数组类型原子类不是可重入的,同一个线程不能重复获取已经持有的数组元素。
6.3 引用类型原子类
Java并发包提供了用于引用类型的原子类,如AtomicReference、AtomicStampedReference和AtomicMarkableReference。这些类可以原子地更新引用类型的数据。
// 引用类型原子类示例
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;
import java.util.concurrent.atomic.AtomicMarkableReference;
public class AtomicReferenceExample {
private AtomicReference<String> atomicReference = new AtomicReference<>();
private AtomicStampedReference<String> atomicStampedReference = new AtomicStampedReference<>("initial", 0);
private AtomicMarkableReference<String> atomicMarkableReference = new AtomicMarkableReference<>("initial", false);
// AtomicReference示例
public void set(String value) {
atomicReference.set(value); // 原子设置引用
}
public String get() {
return atomicReference.get(); // 原子获取引用
}
public boolean compareAndSet(String expect, String update) {
return atomicReference.compareAndSet(expect, update); // 原子比较并设置引用
}
// AtomicStampedReference示例
public void setWithStamp(String value, int stamp) {
atomicStampedReference.set(value, stamp); // 原子设置引用和版本号
}
public String getReference() {
return atomicStampedReference.getReference(); // 原子获取引用
}
public int getStamp() {
return atomicStampedReference.getStamp(); // 原子获取版本号
}
public boolean compareAndSet(String expect, String update, int expectStamp, int updateStamp) {
return atomicStampedReference.compareAndSet(expect, update, expectStamp, updateStamp); // 原子比较并设置引用和版本号
}
// AtomicMarkableReference示例
public void setWithMark(String value, boolean mark) {
atomicMarkableReference.set(value, mark); // 原子设置引用和标记
}
public String getReference() {
return atomicMarkableReference.getReference(); // 原子获取引用
}
public boolean getMark() {
return atomicMarkableReference.getMark(); // 原子获取标记
}
public boolean compareAndSet(String expect, String update, boolean expectMark, boolean updateMark) {
return atomicMarkableReference.compareAndSet(expect, update, expectMark, updateMark); // 原子比较并设置引用和标记
}
}
引用类型原子类的主要方法:
- get():获取当前引用。
- set(V newValue):设置为新引用。
- lazySet(V newValue):延迟设置为新引用,可能不会立即对其他线程可见。
- compareAndSet(V expect, V update):如果当前引用等于expect,则更新为update。
- weakCompareAndSet(V expect, V update):弱版本的compareAndSet,可能失败。
- getAndSet(V newValue):获取当前引用并设置为新引用。
对于AtomicStampedReference:
- getReference():获取当前引用。
- getStamp():获取当前版本号。
- set(V newValue, int newStamp):设置为新引用和新版本号。
- compareAndSet(V expect, V update, int expectStamp, int updateStamp):如果当前引用和版本号分别等于expect和expectStamp,则更新为update和updateStamp。
对于AtomicMarkableReference:
- getReference():获取当前引用。
- getMark():获取当前标记。
- set(V newValue, boolean newMark):设置为新引用和新标记。
- compareAndSet(V expect, V update, boolean expectMark, boolean updateMark):如果当前引用和标记分别等于expect和expectMark,则更新为update和updateMark。
注意事项:
- 引用类型原子类只能保证单个操作的原子性,不能保证复合操作的原子性
- 引用类型原子类在高竞争场景下,CAS操作可能因为自旋导致CPU消耗增加。
- AtomicReference不能解决ABA问题,而AtomicStampedReference和AtomicMarkableReference可以解决ABA问题。
6.4 字段更新器
Java并发包提供了字段更新器类,如AtomicIntegerFieldUpdater、AtomicLongFieldUpdater和AtomicReferenceFieldUpdater。这些类可以原子地更新对象中的字段,而不需要将字段声明为原子类型。
// 字段更新器示例
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
public class FieldUpdaterExample {
private static class Counter {
volatile int count;
volatile long longValue;
volatile String referenceValue;
}
private Counter counter = new Counter();
private static final AtomicIntegerFieldUpdater<Counter> countUpdater =
AtomicIntegerFieldUpdater.newUpdater(Counter.class, "count");
private static final AtomicLongFieldUpdater<Counter> longValueUpdater =
AtomicLongFieldUpdater.newUpdater(Counter.class, "longValue");
private static final AtomicReferenceFieldUpdater<Counter, String> referenceValueUpdater =
AtomicReferenceFieldUpdater.newUpdater(Counter.class, String.class, "referenceValue");
// AtomicIntegerFieldUpdater示例
public void increment() {
countUpdater.incrementAndGet(counter); // 原子递增字段
}
public int getCount() {
return countUpdater.get(counter); // 原子获取字段
}
// AtomicLongFieldUpdater示例
public void add(long delta) {
longValueUpdater.addAndGet(counter, delta); // 原子增加字段
}
public long getLongValue() {
return longValueUpdater.get(counter); // 原子获取字段
}
// AtomicReferenceFieldUpdater示例
public void setReferenceValue(String value) {
referenceValueUpdater.set(counter, value); // 原子设置字段
}
public String getReferenceValue() {
return referenceValueUpdater.get(counter); // 原子获取字段
}
public boolean compareAndSetReferenceValue(String expect, String update) {
return referenceValueUpdater.compareAndSet(counter, expect, update); // 原子比较并设置字段
}
}
字段更新器的主要方法:
- get(T obj):获取对象obj中字段的值。
- set(T obj, int newValue):设置对象obj中字段的值为newValue。
- lazySet(T obj, int newValue):延迟设置对象obj中字段的值为newValue。
- compareAndSet(T obj, int expect, int update):如果对象obj中字段的值等于expect,则更新为update。
- weakCompareAndSet(T obj, int expect, int update):弱版本的compareAndSet,可能失败。
- getAndSet(T obj, int newValue):获取对象obj中字段的值并设置为newValue。
- getAndAdd(T obj, int delta):获取对象obj中字段的值并增加delta。
- addAndGet(T obj, int delta):增加对象obj中字段的delta并获取新值。
注意事项:
- 字段必须是volatile修饰的,否则会抛出IllegalArgumentException。
- 字段不能是final修饰的,否则会抛出IllegalArgumentException。
- 字段更新器是受限的,只能更新指定类型的字段,不能更新其他类型的字段。
- 字段更新器的性能通常优于锁,但在高竞争场景下,CAS操作可能因为自旋导致CPU消耗增加。
6.5 原子累加器
Java并发包提供了原子累加器类,如DoubleAccumulator、DoubleAdder、LongAccumulator和LongAdder。这些类适用于高并发场景下的统计计算,比原子类型有更好的性能。
// 原子累加器示例
import java.util.concurrent.DoubleAccumulator;
import java.util.concurrent.DoubleAdder;
import java.util.concurrent.LongAccumulator;
import java.util.concurrent.LongAdder;
public class AtomicAccumulatorExample {
private DoubleAccumulator doubleAccumulator = new DoubleAccumulator(Double::sum, 0.0);
private DoubleAdder doubleAdder = new DoubleAdder();
private LongAccumulator longAccumulator = new LongAccumulator(Long::sum, 0L);
private LongAdder longAdder = new LongAdder();
// DoubleAccumulator示例
public void accumulate(double value) {
doubleAccumulator.accumulate(value); // 累加值
}
public double get() {
return doubleAccumulator.get(); // 获取当前值
}
public void reset() {
doubleAccumulator.reset(); // 重置为初始值
}
// DoubleAdder示例
public void add(double value) {
doubleAdder.add(value); // 增加值
}
public double sum() {
return doubleAdder.sum(); // 获取当前和
}
public void reset() {
doubleAdder.reset(); // 重置为0
}
// LongAccumulator示例
public void accumulate(long value) {
longAccumulator.accumulate(value); // 累加值
}
public long get() {
return longAccumulator.get(); // 获取当前值
}
public void reset() {
longAccumulator.reset(); // 重置为初始值
}
// LongAdder示例
public void increment() {
longAdder.increment(); // 递增1
}
public void add(long value) {
longAdder.add(value); // 增加值
}
public long sum() {
return longAdder.sum(); // 获取当前和
}
public void reset() {
longAdder.reset(); // 重置为0
}
}
原子累加器的主要方法:
对于DoubleAccumulator和LongAccumulator:
- accumulate(double x / long x):累加值x。
- get():获取当前值。
- reset():重置为初始值。
- doubleValue() / longValue():获取当前值的double/long表示。
- intValue():获取当前值的int表示。
- floatValue():获取当前值的float表示。
对于DoubleAdder和LongAdder:
- increment() / decrement():递增1/递减1。
- add(double x / long x):增加值x。
- sum():获取当前和。
- reset():重置为0。
- doubleValue() / longValue():获取当前和的double/long表示。
- intValue():获取当前和的int表示。
- floatValue():获取当前和的float表示。
注意事项:
- 原子累加器适用于高并发场景下的统计计算,不适合需要精确值的场景。
- 原子累加器的性能通常优于原子类型,但在低并发场景下,原子类型可能更高效。
- 原子累加器不是可重入的,同一个线程不能重复获取已经持有的累加器。
- 原子累加器不支持CAS操作,只支持累加和获取操作。
7. 并发工具类
7.1 CountDownLatch
CountDownLatch是Java并发包中提供的一个同步工具类,它允许一个或多个线程等待其他线程完成操作后再继续执行。CountDownLatch通过一个计数器来实现,计数器的初始值表示需要等待的线程数量,每当一个线程完成操作,计数器减1,当计数器为0时,等待的线程可以继续执行。
// CountDownLatch示例
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int threadCount = 3;
CountDownLatch latch = new CountDownLatch(threadCount);
// 创建并启动多个工作线程
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
System.out.println("Thread " + Thread.currentThread().getName() + " is working");
try {
Thread.sleep(1000); // 模拟工作
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread " + Thread.currentThread().getName() + " finished");
latch.countDown(); // 完成工作,计数器减1
}).start();
}
System.out.println("Main thread is waiting for other threads to finish");
latch.await(); // 等待计数器为0
System.out.println("All threads have finished, main thread continues");
}
}
CountDownLatch的主要方法:
- CountDownLatch(int count):构造一个CountDownLatch,初始计数器值为count。
- void await():使当前线程等待,直到计数器为0。
- boolean await(long timeout, TimeUnit unit):使当前线程等待,直到计数器为0或超时。
- void countDown():递减计数器的值,如果计数器达到0,则唤醒所有等待的线程。
- long getCount():返回当前计数器的值。
注意事项:
- CountDownLatch是一次性的,一旦计数器为0,就不能再重用。
- CountDownLatch的计数器不能为负数,否则会抛出异常。
- 在调用await()方法时,如果线程被中断,会抛出InterruptedException。
- CountDownLatch适用于一个线程等待多个线程完成的场景,不适用于多个线程互相等待的场景。
7.2 CyclicBarrier
CyclicBarrier是Java并发包中提供的一个同步工具类,它允许一组线程相互等待,直到所有线程都到达某个公共屏障点后再继续执行。与CountDownLatch不同,CyclicBarrier可以重复使用。
// CyclicBarrier示例
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
int threadCount = 3;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("All threads have reached the barrier, main task continues");
});
// 创建并启动多个线程
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
System.out.println("Thread " + Thread.currentThread().getName() + " is working");
try {
Thread.sleep(1000); // 模拟工作
System.out.println("Thread " + Thread.currentThread().getName() + " reached the barrier");
barrier.await(); // 等待其他线程到达屏障
System.out.println("Thread " + Thread.currentThread().getName() + " passed the barrier");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
CyclicBarrier的主要方法:
- CyclicBarrier(int parties):构造一个CyclicBarrier,初始值为parties,表示需要等待的线程数量。
- CyclicBarrier(int parties, Runnable barrierAction):构造一个CyclicBarrier,并指定当所有线程到达屏障时执行的barrierAction。
- int await():使当前线程等待,直到所有线程都到达屏障。
- int await(long timeout, TimeUnit unit):使当前线程等待,直到所有线程都到达屏障或超时。
- int getParties():返回需要等待的线程数量。
- int getNumberWaiting():返回已到达屏障的线程数量。
- boolean isBroken():返回屏障是否被破坏(例如,某个线程被中断)。
- void reset():重置屏障到初始状态。
注意事项:
- CyclicBarrier可以重复使用,而CountDownLatch不能。
- 在调用await()方法时,如果线程被中断,会抛出BrokenBarrierException或InterruptedException。
- 如果屏障被破坏(例如,某个线程被中断),所有等待的线程会抛出BrokenBarrierException。
- CyclicBarrier适用于多个线程互相等待的场景,如分阶段计算。
7.3 Semaphore
Semaphore是Java并发包中提供的一个同步工具类,它控制同时访问特定资源的线程数量。Semaphore通过一个计数器来实现,计数器的初始值表示可用的许可证数量,每当一个线程获取许可证,计数器减1,当计数器为0时,其他线程必须等待,直到有许可证被释放。
// Semaphore示例
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
public static void main(String[] args) {
int permits = 3; // 允许同时访问的线程数
Semaphore semaphore = new Semaphore(permits);
// 创建并启动多个线程
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " is trying to acquire a permit");
semaphore.acquire(); // 获取许可证
System.out.println(Thread.currentThread().getName() + " acquired a permit, working...");
Thread.sleep(1000); // 模拟工作
System.out.println(Thread.currentThread().getName() + " finished, releasing the permit");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // 释放许可证
}
}).start();
}
}
}
Semaphore的主要方法:
- Semaphore(int permits):构造一个Semaphore,初始许可证数量为permits。
- Semaphore(int permits, boolean fair):构造一个Semaphore,并指定是否使用公平模式。
- void acquire():获取一个许可证,如果没有可用的许可证,则等待。
- void acquire(int permits):获取指定数量的许可证,如果没有足够的许可证,则等待。
- boolean tryAcquire():尝试获取一个许可证,如果成功返回true,否则返回false。
- boolean tryAcquire(long timeout, TimeUnit unit):尝试获取一个许可证,如果在指定时间内获取成功返回true,否则返回false。
- void release():释放一个许可证。
- void release(int permits):释放指定数量的许可证。
- int availablePermits():返回当前可用的许可证数量。
- int getQueueLength():返回正在等待许可证的线程数量。
- boolean hasQueuedThreads():返回是否有线程正在等待许可证。
注意事项:
- Semaphore的许可证数量可以为负数,表示已经释放了更多的许可证。
- 在调用acquire()方法时,如果线程被中断,会抛出InterruptedException。
- Semaphore可以用于实现资源池,如数据库连接池。
- Semaphore可以用于实现限流,控制系统的并发访问量。
7.4 Exchanger
Exchanger是Java并发包中提供的一个同步工具类,它允许两个线程在某个点上交换对象。Exchanger提供了一个同步点,当两个线程都到达这个同步点时,它们可以交换彼此的数据。
// Exchanger示例
import java.util.concurrent.Exchanger;
public class ExchangerExample {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
// 创建并启动两个线程
new Thread(() -> {
try {
String data1 = "Thread1 data";
System.out.println(Thread.currentThread().getName() + " is exchanging: " + data1);
String data2 = exchanger.exchange(data1); // 交换数据
System.out.println(Thread.currentThread().getName() + " received: " + data2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Thread1").start();
new Thread(() -> {
try {
Thread.sleep(1000); // 模拟工作
String data1 = "Thread2 data";
System.out.println(Thread.currentThread().getName() + " is exchanging: " + data1);
String data2 = exchanger.exchange(data1); // 交换数据
System.out.println(Thread.currentThread().getName() + " received: " + data2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Thread2").start();
}
}
Exchanger的主要方法:
- Exchanger():构造一个Exchanger。
- V exchange(V x):与另一个线程交换数据,如果没有线程等待,则等待。
- V exchange(V x, long timeout, TimeUnit unit):与另一个线程交换数据,如果在指定时间内没有线程等待,则抛出TimeoutException。
注意事项:
- Exchanger只能用于两个线程之间的数据交换,不能用于多个线程。
- 在调用exchange()方法时,如果线程被中断,会抛出InterruptedException。
- Exchanger适用于生产者-消费者模式,如数据缓冲区的交换。
- Exchanger的交换是原子操作,要么两个线程都完成交换,都不完成。
8. 并发编程最佳实践与常见陷阱
8.1 最佳实践
- 避免过度同步:同步会带来性能开销,只在必要时使用同步。
// 不好的做法:过度同步
public class BadExample {
private final Object lock = new Object();
private int count;
public void increment() {
synchronized (lock) { // 不必要的同步
count++;
}
}
public int getCount() {
synchronized (lock) { // 不必要的同步
return count;
}
}
}
// 好的做法:减少同步
public class GoodExample {
private AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet(); // 使用原子类,避免同步
}
public int getCount() {
return count.get(); // 使用原子类,避免同步
}
}
- 使用合适的并发集合:根据场景选择合适的并发集合,如ConcurrentHashMap、CopyOnWriteArrayList等。
// 不好的做法:使用同步集合
public class BadExample {
private Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
public void put(String key, String value) {
map.put(key, value); // 每次操作都需要同步
}
}
// 好的做法:使用并发集合
public class GoodExample {
private ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
public void put(String key, String value) {
map.put(key, value); // 内部使用分段锁,性能更好
}
}
- 避免锁的嵌套:避免在一个锁的临界区内获取另一个锁,可能导致死锁。
// 不好的做法:可能导致死锁
public class BadExample {
private final Object lock1 = new Object();
private final Object lock2 = new Object();
public void badMethod1() {
synchronized (lock1) {
synchronized (lock2) {
// 临界区代码
}
}
}
public void badMethod2() {
synchronized (lock2) {
synchronized (lock1) {
// 临界区代码
}
}
}
}
// 好的做法:避免锁的嵌套
public class GoodExample {
private final Object lock1 = new Object();
private final Object lock2 = new Object();
public void goodMethod1() {
synchronized (lock1) {
// 临界区代码1
}
synchronized (lock2) {
// 临界区代码2
}
}
public void goodMethod2() {
synchronized (lock2) {
// 临界区代码2
}
synchronized (lock1) {
// 临界区代码1
}
}
}
- 使用线程池:避免频繁创建和销毁线程,使用线程池来管理线程。
// 不好的做法:频繁创建线程
public class BadExample {
public void executeTask() {
new Thread(() -> {
// 执行任务
}).start();
}
}
// 好的做法:使用线程池
public class GoodExample {
private ExecutorService executor = Executors.newFixedThreadPool(10);
public void executeTask() {
executor.submit(() -> {
// 执行任务
});
}
}
- 使用不可变对象:不可变对象是线程安全的,不需要同步。
// 不好的做法:可变对象
public class BadExample {
private int value;
public void setValue(int value) {
this.value = value; // 需要同步
}
public int getValue() {
return value; // 需要同步
}
}
// 好的做法:不可变对象
public class GoodExample {
private final int value;
public GoodExample(int value) {
this.value = value;
}
public int getValue() {
return value; // 不需要同步
}
}
- 使用并发工具类:使用CountDownLatch、CyclicBarrier、Semaphore等并发工具类,简化并发编程。
// 不好的做法:手动实现等待
public class BadExample {
private boolean ready = false;
public void setReady() {
ready = true;
}
public void waitForReady() {
while (!ready) {
Thread.yield(); // 忙等待,浪费CPU资源
}
}
}
// 好的做法:使用CountDownLatch
public class GoodExample {
private CountDownLatch latch = new CountDownLatch(1);
public void setReady() {
latch.countDown(); // 唤醒等待的线程
}
public void waitForReady() throws InterruptedException {
latch.await(); // 等待,不浪费CPU资源
}
}
8.2 常见陷阱
- 死锁:多个线程互相等待对方持有的锁,导致所有线程都无法继续执行。
// 死锁示例
public class DeadlockExample {
private final Object lock1 = new Object();
private final Object lock2 = new Object();
public void method1() {
synchronized (lock1) {
synchronized (lock2) {
// 临界区代码
}
}
}
public void method2() {
synchronized (lock2) {
synchronized (lock1) {
// 临界区代码
}
}
}
}
解决方法:
- 避免锁的嵌套。
- 使用锁超时。
- 按固定顺序获取锁。
- 活锁:多个线程互相谦让,导致所有线程都无法继续执行。
// 活锁示例
public class LivelockExample {
private static class Person {
private String name;
private boolean isPolite;
public Person(String name, boolean isPolite) {
this.name = name;
this.isPolite = isPolite;
}
public void passThroughDoor(Door door) {
while (door.isBusy()) {
if (isPolite) {
// 礼貌地让对方先过
System.out.println(name + " says: 'After you'");
door.setBusy(false);
} else {
// 不礼貌地自己先过
System.out.println(name + " says: 'I'm going first'");
door.setBusy(true);
}
}
}
}
private static class Door {
private boolean busy = false;
public boolean isBusy() {
return busy;
}
public void setBusy(boolean busy) {
this.busy = busy;
}
}
}
解决方法:
- 使用随机性。
- 设置最大重试次数。
- 饥饿:某些线程长时间得不到执行机会,导致无法完成工作。
// 饥饿示例
public class StarvationExample {
private static class Worker implements Runnable {
private int id;
public Worker(int id) {
this.id = id;
}
@Override
public void run() {
while (true) {
System.out.println("Worker " + id + " is working");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
Thread worker1 = new Thread(new Worker(1));
Thread worker2 = new Thread(new Worker(2));
worker1.setPriority(Thread.MAX_PRIORITY); // 高优先级
worker2.setPriority(Thread.MIN_PRIORITY); // 低优先级
worker1.start();
worker2.start();
}
}
解决方法:
- 使用公平锁。
- 避免设置过高的线程优先级。
- 竞态条件:多个线程共享数据,导致结果依赖于线程执行的顺序。
// 竞态条件示例
public class RaceConditionExample {
private int count = 0;
public void increment() {
count++; // 非原子操作,可能导致竞态条件
}
public int getCount() {
return count;
}
}
解决方法:
- 使用同步。
- 使用原子类。
- 使用不可变对象。
- 内存可见性问题:一个线程对共享变量的修改对其他线程不可见。
// 内存可见性问题示例
public class VisibilityExample {
private boolean flag = false;
public void writer() {
flag = true; // 修改对其他线程不可见
}
public void reader() {
while (!flag) { // 可能永远看不到flag的更新
Thread.yield();
}
}
}
解决方法:
- 使用volatile关键字。
- 使用synchronized关键字。
- 使用并发工具类。
- 并发修改异常:在遍历集合时修改集合的结构。
// 并发修改异常示例
public class ConcurrentModificationExample {
public static void main(String[] args) {
List<String> list = new ArrayList<>();
list.add("A");
list.add("B");
list.add("C");
new Thread(() -> {
for (String item : list) {
System.out.println(item);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(() -> {
list.add("D"); // 并发修改,可能抛出ConcurrentModificationException
}).start();
}
}
解决方法:
- 使用并发集合,如CopyOnWriteArrayList。
- 使用同步集合,如Collections.synchronizedList。
- 使用迭代器的remove()方法。
9. 总结
多线程与并发编程是现代软件开发中的重要技能,它可以帮助我们充分利用多核CPU的计算能力,提高程序的性能和响应速度。然而,并发编程也带来了一系列复杂的问题,如竞态条件、死锁、活锁等,这些问题往往难以重现和调试,给程序开发带来了巨大的挑战。
在本文中,我们深入探讨了Java平台下的多线程与并发编程技术,从基础的线程概念到高级的并发工具,从理论原理到实践应用,全面解析了并发编程的各个方面。我们学习了Java内存模型与线程基础、并发集合类、线程池、锁机制、原子类、并发工具类等内容,并提供了详细的代码示例和注意事项。
在实际开发中,我们应该遵循并发编程的最佳实践,避免常见的陷阱,写出高效、可靠、安全的并发程序。同时,我们也应该不断学习和探索新的并发技术和工具,以适应不断变化的硬件环境和应用需求。
最后,记住并发编程是一门艺术,需要不断实践和总结,只有通过不断的尝试和失败,才能真正掌握并发编程的精髓。希望本文能够帮助读者更好地理解和掌握并发编程,为成为一名优秀的并发程序员打下坚实的基础。
如果有哪里写的不对,评论或者私信我~我会及时更改