后端并发编程操作简述 Java高并发程序设计 六类并发容器 七种线程池 四种阻塞队列

发布于:2024-11-28 ⋅ 阅读:(28) ⋅ 点赞:(0)

目录

并发集合

1. ConcurrentHashMap:

2. CopyOnWriteArrayList:

3. CopyOnWriteArraySet:

4. BlockingQueue系列:

5. ConcurrentSkipListMap 和 ConcurrentSkipListSet:

6. ConcurrentLinkedDeque:

注意事项

七种线程池

1. FixedThreadPool

2.CachedThreadPool

3.ScheduledThreadPool

4.SingleThreadExecutor

5.SingleThreadScheduledExecutor

6.ForkJoinPool

7. CustomThreadPool(自定义线程池)

四种阻塞队列

1. ArrayBlockingQueue

2. LinkedBlockingQueue

3. PriorityBlockingQueue

4. DelayQueue

综合示例

1.在实际应用中,应该确保线程池和阻塞队列的正确关闭和资源释放。

2.消费者任务中的无限循环需要额外的机制来优雅地终止,例如使用中断或额外的停止标志。

3.根据具体需求选择合适的线程池和阻塞队列实现。


并发集合

除了 ConcurrentLinkedQueue 之外, Java 还提供了多种并发集合类,以满足不同场景下的并发需求。
以下是一些常见的并发集合类:

1. ConcurrentHashMap

这是一个线程安全的哈希表实现,针对多线程环境进行了优化,以减少锁竞争从而提高性
能。
它通过分段锁(在 JDK8 之前)或 CAS Compare-And-Swap )操作(在 JDK8 及之后)来实现
高效的并发访问。
适用于多线程操作大量数据,且读操作远多于写操作的场景。

2. CopyOnWriteArrayList

这是一个线程安全的可变数组,其中所有可变操作(如添加、设置等)都是通过对底层数组
的复制来实现的。
适用于读多写少的场景,因为写操作需要复制整个数组,开销较大。
迭代器是弱一致性的,即在遍历过程中可能看不到其他线程的修改。

3. CopyOnWriteArraySet

这是一个线程安全的 Set 实现,基于 CopyOnWriteArrayList
继承了 CopyOnWriteArrayList 的线程安全性和弱一致性迭代器的特性。

4. BlockingQueue系列

这是一个支持阻塞操作的队列接口,其实现类包括 ArrayBlockingQueue
LinkedBlockingQueue PriorityBlockingQueue DelayQueue SynchronousQueue
LinkedBlockingDeque 等。
这些队列在插入和移除元素时,如果队列为空或已满,则会根据具体情况阻塞调用线程或抛
出异常。
常用于生产者 - 消费者问题,以及需要线程间协调的场景。

5. ConcurrentSkipListMap ConcurrentSkipListSet

这两个类是基于跳表( Skip List )的并发实现。
提供了快速的并发访问能力,并且支持有序的数据结构。
适用于跨多个线程共享访问的有序数据,以及在并发情况下需要排序功能的场景。

6. ConcurrentLinkedDeque

这是一个线程安全的双端队列,支持在队列的两端进行高效的插入和删除操作。
适用于需要频繁在队列两端进行操作的场景。
这些并发集合类在内部实现了必要的同步策略,确保了在多线程环境下的线程安全性。开发者可以根据
具体的需求和场景,选择合适的并发集合类来提高程序的并发性能和安全性
当然,以下是在上述代码的基础上添加了详细注释的版本,以帮助理解每个部分的作用和目的:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConcurrentWriteExample {
    // 定义线程数量和每个线程写入的次数
    private static final int NUM_THREADS = 10;
    private static final int NUM_WRITES_PER_THREAD = 100;

    // 共享资源:一个线程安全的ArrayList(实际上这里不是线程安全的,因为我们会手动添加锁)
    // 注意:如果直接使用ArrayList进行并发写入,需要外部同步机制。
    // 更推荐使用ConcurrentLinkedQueue或其他并发集合。
    private static final List<String> sharedList = new ArrayList<>();

    // 用于控制对共享资源的访问的锁
    private static final Lock lock = new ReentrantLock();

    public static void main(String[] args) {
        // 创建一个固定大小的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS);

        // 提交多个任务到线程池
        for (int i = 0; i < NUM_THREADS; i++) {
            executorService.submit(() -> {
                // 每个任务会循环写入数据到共享列表中
                for (int j = 0; j < NUM_WRITES_PER_THREAD; j++) {
                    writeData("Data from thread " + Thread.currentThread().getId());
                }
            });
        }

        // 关闭线程池,不再接受新任务,但会继续执行已提交的任务
        executorService.shutdown();

        // 等待所有任务完成
        // 注意:在实际应用中,更推荐使用awaitTermination来等待指定时间或直到任务完成,
        // 并且要处理InterruptedException。这里的简单while循环只是为了演示。
        while (!executorService.isTerminated()) {
            // 等待所有任务完成
        }

        // 打印共享列表的大小和内容,以验证并发写入的结果
        System.out.println("Shared list size: " + sharedList.size());
        System.out.println("Shared list content: " + sharedList);
    }

    // 写入数据到共享列表的方法,使用锁来确保线程安全
    private static void writeData(String data) {
        // 获取锁,如果锁不可用,则当前线程会阻塞直到锁可用
        lock.lock();
        try {
            // 在锁的保护下,安全地向共享列表添加数据
            sharedList.add(data);
        } finally {
            // 确保无论如何都会释放锁,以避免死锁
            lock.unlock();
        }
    }
}
注释中强调了以下几点:
1. 线程数量和写入次数 :定义了将有多少线程同时运行,以及每个线程将写入多少次数据。
2. 共享资源 :虽然这里使用了 ArrayList ,但它是非线程安全的。我们通过外部添加
ReentrantLock 来确保线程安全。在实际应用中,更推荐使用 java.util.concurrent 包中的并
发集合。
3. :使用 ReentrantLock 来控制对共享资源的访问,确保同一时间只有一个线程可以写入数据。
4. 线程池 :使用 ExecutorService 来管理线程池,这样可以更高效地管理线程的生命周期和资源。
5. 任务提交 :将多个任务提交到线程池,每个任务都会执行写入操作。
6. 等待任务完成 :使用 shutdown() 方法关闭线程池,并使用 while 循环等待所有任务完成。在实
际应用中,更推荐使用 awaitTermination() 方法。
7. 打印结果 :在所有任务完成后,打印共享列表的大小和内容,以验证并发写入的结果是否正确。
请注意,虽然这个示例展示了如何使用锁来实现线程安全的并发写入,但在实际应用中,如果可能的
话,更推荐使用 java.util.concurrent 包中提供的并发集合和工具类,因为它们通常提供了更高的性
能和更好的可扩展性。

注意事项

使用 ReentrantLock 可以确保对共享资源的互斥访问,但也会引入一定的性能开销。
在实际应用中,如果可以使用 java.util.concurrent 包中的其他并发集合(如
ConcurrentHashMap CopyOnWriteArrayList 等),则可能更合适,因为这些集合已经内置
了线程安全机制。
始终注意死锁和性能问题,特别是在高并发场景下。
这个示例展示了基本的并发写入实现,但在实际应用中,可能需要根据具体需求进行更复杂的处理。
Java 中的 java.util.concurrent 包提供了丰富的线程池和阻塞队列实现,用于满足不同的并发需
求。以下是 Java 中的七种主要线程池和四种常见阻塞队列的详细举例、演示及特点说明,并附有代码
注释。

七种线程池

1. FixedThreadPool

特点:固定数量的线程池,线程数量在创建时指定,任务队列为无界链表。
示例:
// 创建一个固定大小的线程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);

2.CachedThreadPool

特点:会根据需要创建新线程,如果空闲线程超过 60 秒则会被回收,任务队列为同步队列
(无存储能力)。

示例:

// 创建一个可缓存的线程池
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

3.ScheduledThreadPool

特点:可以调度任务在给定的延迟后运行,或者定期执行。

示例:

// 创建一个调度线程池
ScheduledExecutorService scheduledThreadPool =
Executors.newScheduledThreadPool(3);
scheduledThreadPool.schedule(() -> {
System.out.println("Task executed after delay");
}, 1, TimeUnit.SECONDS);

4.SingleThreadExecutor

特点:单线程执行器,确保任务按顺序执行,任务队列为无界链表
示例:
// 创建一个单线程的线程池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

5.SingleThreadScheduledExecutor

特点:单线程调度执行器,确保任务按顺序执行,且可以调度任务。
示例(基于 ScheduledThreadPool 简化):
// 实际上可以通过ScheduledThreadPool(1)来实现单线程调度
ScheduledExecutorService singleThreadScheduledExecutor =
Executors.newScheduledThreadPool(1);

6.ForkJoinPool

特点:用于执行分而治之算法的任务,适用于大量小规模任务的并行处理。

示例:
// 创建一个ForkJoinPool
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.submit(() -> {
// 分而治之的任务
});

7. CustomThreadPool(自定义线程池)

特点:通过 ThreadPoolExecutor 构造函数自定义线程池的核心线程数、最大线程数、存
活时间、任务队列等。
示例:
// 创建一个自定义的线程池
ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
60L, // 线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<Runnable>() // 任务队列
);

四种阻塞队列

1. ArrayBlockingQueue

特点:基于数组的阻塞队列,有界, FIFO (先进先出)顺序。
示例:
BlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(10);

2. LinkedBlockingQueue

特点:基于链表的阻塞队列,可选有界或无界, FIFO 顺序,吞吐量通常高于
ArrayBlockingQueue
示例:
BlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue<>();

3. PriorityBlockingQueue

特点:支持优先级的阻塞队列,无界,基于优先级(自然顺序或提供的 Comparator )进行
排序。
示例:

BlockingQueue<Integer> priorityBlockingQueue = new PriorityBlockingQueue<>();

4. DelayQueue

特点:支持延迟获取元素的阻塞队列,无界,元素只有在其延迟期满时才能被提取。
示例:
BlockingQueue<DelayedElement> delayQueue = new DelayQueue<>();
// DelayedElement 需要实现 Delayed 接口

综合示例

由于篇幅限制,这里只提供一个综合使用 FixedThreadPool ArrayBlockingQueue 的示例,并简
要说明其他线程池和队列的用法。
import java.util.concurrent.*;

public class ThreadPoolAndBlockingQueueExample {
    public static void main(String[] args) {
        // 创建一个固定大小的线程池
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);

        // 创建一个有界的阻塞队列
        BlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(10);

        // 生产者任务
        Runnable producer = () -> {
            try {
                for (int i = 0; i < 20; i++) {
                    arrayBlockingQueue.put(i); // 如果队列满了,会阻塞
                    System.out.println("Produced: " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        // 消费者任务
        Runnable consumer = () -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Integer value = arrayBlockingQueue.take(); // 如果队列空了,会阻塞
                    System.out.println("Consumed: " + value);
                    // 模拟任务处理时间
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        // 提交生产者任务给线程池
        fixedThreadPool.submit(producer);

        // 提交多个消费者任务给线程池(这里为了演示只提交一个,实际可以根据需要提交多个)
        fixedThreadPool.submit(consumer);

        // 注意:为了示例简洁,这里没有添加关闭线程池的代码。在实际应用中,应该在适当的时候调用 shutdown() 方法。
        // 优雅地关闭线程池
        try {
            Thread.sleep(10000); // 等待一段时间,允许生产者和消费者执行
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // 关闭线程池
        fixedThreadPool.shutdown();
    }
}
注意

1.在实际应用中,应该确保线程池和阻塞队列的正确关闭和资源释放。

2.消费者任务中的无限循环需要额外的机制来优雅地终止,例如使用中断或额外的停止标志。

3.根据具体需求选择合适的线程池和阻塞队列实现。