一、介绍
Fork/Join是一种在多线程领域中常用的算法或技术,其核心思想是将大任务分割成若干个小任务,然后将这些小任务分配给多个线程并行处理,最终将结果合并起来。这种思想可以应用于多种场景,例如图像处理、批处理、并行排序等。
ForkJoin模式的任务分解和执行过程:
Fork/Join框架的基本原理是,每个任务都是由一个或多个子任务组成的。当一个任务被分解为多个子任务时,子任务会不断地被拆分,直到满足拆分条件为止,然后再将这些子任务合并起来,形成完整的结果集。在Fork/Join框架中,任务的拆分是通过fork()方法来实现的,合并是通过join()方法来实现的。
二、ForJoin框架
在Java中,JUC包提供了一套ForkJoin框架的实现,具体以ForkJoinPool线程池的形式提供,并且该线程池在Java8的Lambda并行流框架中充当着底层框架的角色,ForkJoin框架主要包含以下几个组件:
- ForkJoinPool:
- 这是ForkJoin框架的核心组件,它是一个线程池,专门用于执行ForkJoin任务。ForkJoinPool会根据系统的CPU核心数来自动调整线程池中的线程数,以优化资源利用率。
- 线程池中的每个线程都有一个自己的任务队列。当一个线程完成了它的任务后,它会尝试从其他线程的任务队列中“窃取”任务来执行(这种机制称为工作窃取(Work-Stealing)算法)。这种机制有助于平衡负载,使得所有线程都能保持忙碌状态,从而提高CPU的利用率。
- 在工作窃取算法的实现过程中,ForkJoinPool会维护一个优先级队列(priority queue),用于存储等待被窃取的任务。
- ForkJoinTask:
- ForkJoinTask是一个抽象类,代表可以异步执行的任务。它有两个主要的子类:
- RecursiveTask:用于有返回结果的任务。当任务被拆分成子任务时,通常使用RecursiveTask来表示。
- RecursiveAction:用于没有返回结果的任务。
- ForkJoinTask的主要方法是fork()和join()。fork()方法用于将任务提交给ForkJoinPool进行异步执行,而join()方法则用于等待任务的完成并获取结果(如果任务有结果的话)。
- ForkJoinTask是一个抽象类,代表可以异步执行的任务。它有两个主要的子类:
- ForkJoinWorkerThread:
- 这是ForkJoinPool中的工作线程。每个ForkJoinWorkerThread都有一个自己的任务队列,并且使用优先级队列来实现工作窃取算法。
- 这些工作线程会不断地从自己的任务队列中取出任务并执行,当自己的任务队列为空时,它们会尝试从其他线程的任务队列中窃取任务。
因为ForkJoinTask比较复杂,并且其抽象方法比较多,故在日常使用时一般不会直接继承ForkJoinTask来 实 现自 定 义 的任 务 类 ,而 是 通 过继 承 ForkJoinTask两 子 类RecursiveTask或 者RecursiveAction之一去实现自定义任务类,自定义任务类需要实现这些子类的compute()方法,该方法的执行流程一般如下:
if 任务足够小
直接返回结果
else
分解成N个子任务
依次调用每个子任务的fork方法执行子任务
依次调用每个子任务的join方法,等待子任务的完成,然后合并执行结果
三、ForJoin框架实战
1、并行求和
- 使用RecursiveTask来实现并行求和的功能:
package chatpter10;
import java.util.concurrent.RecursiveTask;
public class SumTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 10; // 设定阈值
private int[] array;
private int low, high;
public SumTask(int[] array, int low, int high) {
this.array = array;
this.low = low;
this.high = high;
}
@Override
protected Integer compute() {
int sum = 0;
if (high - low < THRESHOLD) { // 如果子数组长度小于阈值,直接计算
for (int i = low; i <= high; i++) {
sum += array[i];
}
} else { // 否则,分解任务
int mid = (low + high) >>> 1;
SumTask left = new SumTask(array, low, mid);
SumTask right = new SumTask(array, mid + 1, high);
// 并行执行任务
left.fork(); // fork一个任务,在后台异步执行
sum = right.compute() + left.join(); // 等待左侧任务完成,并加上右侧任务的结果
}
return sum;
}
}
- 提交任务:
创建一个ForkJoinPool实例,并将RecursiveTask提交给这个线程池来执行
package chatpter10;
import org.junit.Test;
import java.util.concurrent.ForkJoinPool;
public class TestForkJoin {
@Test
public void test01() {
ForkJoinPool pool = new ForkJoinPool(); // 创建一个ForkJoinPool实例
int[] array = new int[1000]; // 初始化数组
int sum = 0;
for(int i = 1; i< 1000; i++) {
array[i-1] = i;
sum += i;
}
System.out.println("Sum: " + sum);
SumTask task = new SumTask(array, 0, array.length - 1); // 创建一个SumTask实例
Integer result = pool.invoke(task); // 提交任务并等待结果
System.out.println("Sum: " + result);
}
}
运行结果:
Sum: 499500
Sum: 499500
Process finished with exit code 0
RecursiveTask的泛型参数表示任务的返回类型。在上面的例子中,SumTask的返回类型是Integer,表示求和的结果。你可以通过调用RecursiveTask的get方法(或者join方法,如果任务已经提交给ForkJoinPool)来获取结果。
注意:虽然RecursiveTask提供了自动管理线程池的功能,但在使用时仍需注意线程安全问题。特别是当任务涉及共享数据时,需要确保数据的访问是线程安全的。
四、核心API
1、构造器
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
构造器的四个参数具体介绍如下:
Parallelism (并行级别/并行度):
- 这个参数指定了 ForkJoinPool 的目标并行度,也就是线程池中的线程数量。
- 默认值通常是 Runtime.getRuntime().availableProcessors(),即系统的处理器数量。
- 较大的并行度可以提高计算速度,但也会增加系统资源消耗。
ForkJoinWorkerThreadFactory:
当ForkJoin框架创建一个新的线程时,同样会用到线程创建工厂。只不过这个线程工厂不再需要实现ThreadFactory接口,而是需要实ForkJoinWorkerThreadFactory接口。后者是一个函数式接口,只需要实现一个名叫newThread()的方法。在ForkJoin框架中有一个默认的ForkJoinWorkerThreadFactory接口实现:DefaultForkJoinWorkerThreadFactory。UncaughtExceptionHandler(未捕获的异常处理器): 当执行的任务中出现异常,并从任务中被抛出时,就会被handler捕获。
AsyncMode (异步模式):asyncMode参数表示任务是否为异步模式,其默认值为false。
2、提交任务的方式
向ForkJoinPool线程池提交任务可与分为两类,如下:
- 提交外部任务
提价外部任务有三种方式:- invoke()
- invoke()方法提交任务并等待其完成,然后返回任务的计算结果。也就是说,它会阻塞调用线程直到任务完成。
- 与submit()类似,invoke()也有返回值,但它不需要使用Future对象来获取结果,因为结果是直接返回的。
- 如果当前线程是ForkJoinPool的工作线程并且属于当前ForkJoinPool,invoke()会直接调用FJTask的exec()方法。否则,任务将被添加到ForkJoinPool的任务队列,并分配一个ForkJoinWorkerThread来执行,然后等待其完成并返回结果。
- execute()
- execute()方法用于提交一个任务,该任务将被异步执行。也就是说,提交任务后立即返回,不等待任务完成。
- execute()方法没有返回值,即它不返回表示异步计算结果的Future对象。
- 如果当前线程是ForkJoinPool的工作线程并且属于当前ForkJoinPool,则execute()方法会调用forkOrSubmit()将任务加入到工作线程的任务队列。否则,任务将被添加到ForkJoinPool的任务队列,并分配一个ForkJoinWorkerThread来执行。
- submit()
- submit()方法也用于提交一个任务,但与execute()不同,它返回一个表示异步计算结果的ForkJoinTask对象。
- 可以使用返回的ForkJoinTask对象的get()方法来等待并获取任务的计算结果。
- submit()的行为与execute()类似,但在任务完成后,它会将结果封装在ForkJoinTask对象中返回。
- invoke()
- 提交子任务
在ForkJoinPool中,子任务的提交通常是通过ForkJoinTask的fork()方法进行的。ForkJoinTask是ForkJoinPool用于执行并行任务的基础类,它有两个主要的子类:RecursiveAction(无返回值的任务)和RecursiveTask(有返回值的任务)。
以下是子任务提交的基本过程:
- 创建ForkJoinTask:首先,你需要创建一个或多个ForkJoinTask对象来表示你的任务。这些任务可以是RecursiveAction或RecursiveTask的实例。
- 实现compute()方法:在RecursiveAction或RecursiveTask中,你需要实现compute()方法。这个方法是实际执行任务的地方。在RecursiveTask中,compute()方法应该返回一个结果;在RecursiveAction中,compute()方法没有返回值。
- 使用fork()方法提交子任务:在compute()方法中,你可以使用fork()方法来提交子任务。fork()方法会异步地执行这个任务,并立即返回一个表示该任务的ForkJoinTask对象。这个对象可以用来等待任务完成(通过调用join()方法)或获取任务的结果(如果任务是RecursiveTask)。
- 使用join()方法等待任务完成:你可以在需要的地方调用join()方法来等待一个或多个任务完成。join()方法会阻塞当前线程,直到任务完成。对于RecursiveTask,join()方法还会返回任务的结果。
注意:ForkJoinPool使用了一种称为“工作窃取”的算法来平衡负载。当一个线程完成了自己的任务时,它会尝试从其他线程的任务队列中“窃取”任务来执行。这有助于保持所有线程都保持忙碌状态,从而提高CPU的利用率。