Java并发编程: 第十章 Forkjoin

发布于:2024-05-06 ⋅ 阅读:(32) ⋅ 点赞:(0)

一、介绍

Fork/Join是一种在多线程领域中常用的算法或技术,其核心思想是将大任务分割成若干个小任务,然后将这些小任务分配给多个线程并行处理,最终将结果合并起来。这种思想可以应用于多种场景,例如图像处理、批处理、并行排序等。

ForkJoin模式的任务分解和执行过程:
在这里插入图片描述
Fork/Join框架的基本原理是,每个任务都是由一个或多个子任务组成的。当一个任务被分解为多个子任务时,子任务会不断地被拆分,直到满足拆分条件为止,然后再将这些子任务合并起来,形成完整的结果集。在Fork/Join框架中,任务的拆分是通过fork()方法来实现的,合并是通过join()方法来实现的。

二、ForJoin框架

在Java中,JUC包提供了一套ForkJoin框架的实现,具体以ForkJoinPool线程池的形式提供,并且该线程池在Java8的Lambda并行流框架中充当着底层框架的角色,ForkJoin框架主要包含以下几个组件:

  • ForkJoinPool:
    • 这是ForkJoin框架的核心组件,它是一个线程池,专门用于执行ForkJoin任务。ForkJoinPool会根据系统的CPU核心数来自动调整线程池中的线程数,以优化资源利用率。
    • 线程池中的每个线程都有一个自己的任务队列。当一个线程完成了它的任务后,它会尝试从其他线程的任务队列中“窃取”任务来执行(这种机制称为工作窃取(Work-Stealing)算法)。这种机制有助于平衡负载,使得所有线程都能保持忙碌状态,从而提高CPU的利用率。
    • 在工作窃取算法的实现过程中,ForkJoinPool会维护一个优先级队列(priority queue),用于存储等待被窃取的任务。
  • ForkJoinTask:
    • ForkJoinTask是一个抽象类,代表可以异步执行的任务。它有两个主要的子类:
      • RecursiveTask:用于有返回结果的任务。当任务被拆分成子任务时,通常使用RecursiveTask来表示。
      • RecursiveAction:用于没有返回结果的任务。
    • ForkJoinTask的主要方法是fork()和join()。fork()方法用于将任务提交给ForkJoinPool进行异步执行,而join()方法则用于等待任务的完成并获取结果(如果任务有结果的话)。
  • ForkJoinWorkerThread:
    • 这是ForkJoinPool中的工作线程。每个ForkJoinWorkerThread都有一个自己的任务队列,并且使用优先级队列来实现工作窃取算法。
    • 这些工作线程会不断地从自己的任务队列中取出任务并执行,当自己的任务队列为空时,它们会尝试从其他线程的任务队列中窃取任务。

因为ForkJoinTask比较复杂,并且其抽象方法比较多,故在日常使用时一般不会直接继承ForkJoinTask来 实 现自 定 义 的任 务 类 ,而 是 通 过继 承 ForkJoinTask两 子 类RecursiveTask或 者RecursiveAction之一去实现自定义任务类,自定义任务类需要实现这些子类的compute()方法,该方法的执行流程一般如下:

if 任务足够小
	直接返回结果
else 
	分解成N个子任务
	依次调用每个子任务的fork方法执行子任务
	依次调用每个子任务的join方法,等待子任务的完成,然后合并执行结果

三、ForJoin框架实战

1、并行求和

  • 使用RecursiveTask来实现并行求和的功能:
package chatpter10;

import java.util.concurrent.RecursiveTask;
public class SumTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 10; // 设定阈值
    private int[] array;
    private int low, high;

    public SumTask(int[] array, int low, int high) {
        this.array = array;
        this.low = low;
        this.high = high;
    }
    @Override
    protected Integer compute() {
        int sum = 0;
        if (high - low < THRESHOLD) { // 如果子数组长度小于阈值,直接计算
            for (int i = low; i <= high; i++) {
                sum += array[i];
            }
        } else { // 否则,分解任务
            int mid = (low + high) >>> 1;
            SumTask left = new SumTask(array, low, mid);
            SumTask right = new SumTask(array, mid + 1, high);
            // 并行执行任务
            left.fork(); // fork一个任务,在后台异步执行
            sum = right.compute() + left.join(); // 等待左侧任务完成,并加上右侧任务的结果
        }
        return sum;
    }
}

  • 提交任务:
    创建一个ForkJoinPool实例,并将RecursiveTask提交给这个线程池来执行
package chatpter10;

import org.junit.Test;

import java.util.concurrent.ForkJoinPool;
public class TestForkJoin {
    @Test
    public void test01() {
        ForkJoinPool pool = new ForkJoinPool(); // 创建一个ForkJoinPool实例
        int[] array = new int[1000]; // 初始化数组
        int sum = 0;
        for(int i = 1; i< 1000; i++) {
            array[i-1] = i;
            sum += i;
        }
        System.out.println("Sum: " + sum);
        SumTask task = new SumTask(array, 0, array.length - 1); // 创建一个SumTask实例
        Integer result = pool.invoke(task); // 提交任务并等待结果
        System.out.println("Sum: " + result);
    }
}

运行结果:

Sum: 499500
Sum: 499500

Process finished with exit code 0

RecursiveTask的泛型参数表示任务的返回类型。在上面的例子中,SumTask的返回类型是Integer,表示求和的结果。你可以通过调用RecursiveTask的get方法(或者join方法,如果任务已经提交给ForkJoinPool)来获取结果。

注意:虽然RecursiveTask提供了自动管理线程池的功能,但在使用时仍需注意线程安全问题。特别是当任务涉及共享数据时,需要确保数据的访问是线程安全的。

四、核心API

1、构造器

    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }

构造器的四个参数具体介绍如下:

  • Parallelism (并行级别/并行度)

    • 这个参数指定了 ForkJoinPool 的目标并行度,也就是线程池中的线程数量。
    • 默认值通常是 Runtime.getRuntime().availableProcessors(),即系统的处理器数量。
    • 较大的并行度可以提高计算速度,但也会增加系统资源消耗。
  • ForkJoinWorkerThreadFactory:
    当ForkJoin框架创建一个新的线程时,同样会用到线程创建工厂。只不过这个线程工厂不再需要实现ThreadFactory接口,而是需要实ForkJoinWorkerThreadFactory接口。后者是一个函数式接口,只需要实现一个名叫newThread()的方法。在ForkJoin框架中有一个默认的ForkJoinWorkerThreadFactory接口实现:DefaultForkJoinWorkerThreadFactory。

  • UncaughtExceptionHandler(未捕获的异常处理器): 当执行的任务中出现异常,并从任务中被抛出时,就会被handler捕获。

  • AsyncMode (异步模式):asyncMode参数表示任务是否为异步模式,其默认值为false。

2、提交任务的方式

向ForkJoinPool线程池提交任务可与分为两类,如下:

  • 提交外部任务
    提价外部任务有三种方式:
    • invoke()
      • invoke()方法提交任务并等待其完成,然后返回任务的计算结果。也就是说,它会阻塞调用线程直到任务完成。
      • 与submit()类似,invoke()也有返回值,但它不需要使用Future对象来获取结果,因为结果是直接返回的。
      • 如果当前线程是ForkJoinPool的工作线程并且属于当前ForkJoinPool,invoke()会直接调用FJTask的exec()方法。否则,任务将被添加到ForkJoinPool的任务队列,并分配一个ForkJoinWorkerThread来执行,然后等待其完成并返回结果。
    • execute()
      • execute()方法用于提交一个任务,该任务将被异步执行。也就是说,提交任务后立即返回,不等待任务完成。
      • execute()方法没有返回值,即它不返回表示异步计算结果的Future对象。
      • 如果当前线程是ForkJoinPool的工作线程并且属于当前ForkJoinPool,则execute()方法会调用forkOrSubmit()将任务加入到工作线程的任务队列。否则,任务将被添加到ForkJoinPool的任务队列,并分配一个ForkJoinWorkerThread来执行。
    • submit()
      • submit()方法也用于提交一个任务,但与execute()不同,它返回一个表示异步计算结果的ForkJoinTask对象。
      • 可以使用返回的ForkJoinTask对象的get()方法来等待并获取任务的计算结果。
      • submit()的行为与execute()类似,但在任务完成后,它会将结果封装在ForkJoinTask对象中返回。
  • 提交子任务
    在ForkJoinPool中,子任务的提交通常是通过ForkJoinTask的fork()方法进行的。ForkJoinTask是ForkJoinPool用于执行并行任务的基础类,它有两个主要的子类:RecursiveAction(无返回值的任务)和RecursiveTask(有返回值的任务)。

以下是子任务提交的基本过程:

  • 创建ForkJoinTask:首先,你需要创建一个或多个ForkJoinTask对象来表示你的任务。这些任务可以是RecursiveAction或RecursiveTask的实例。
  • 实现compute()方法:在RecursiveAction或RecursiveTask中,你需要实现compute()方法。这个方法是实际执行任务的地方。在RecursiveTask中,compute()方法应该返回一个结果;在RecursiveAction中,compute()方法没有返回值。
  • 使用fork()方法提交子任务:在compute()方法中,你可以使用fork()方法来提交子任务。fork()方法会异步地执行这个任务,并立即返回一个表示该任务的ForkJoinTask对象。这个对象可以用来等待任务完成(通过调用join()方法)或获取任务的结果(如果任务是RecursiveTask)。
  • 使用join()方法等待任务完成:你可以在需要的地方调用join()方法来等待一个或多个任务完成。join()方法会阻塞当前线程,直到任务完成。对于RecursiveTask,join()方法还会返回任务的结果。

注意:ForkJoinPool使用了一种称为“工作窃取”的算法来平衡负载。当一个线程完成了自己的任务时,它会尝试从其他线程的任务队列中“窃取”任务来执行。这有助于保持所有线程都保持忙碌状态,从而提高CPU的利用率。