Fork Join in Java

2025年5月7日 | 阅读6分钟

如今,系统都使用多核处理器。多核处理器可以加快计算速度。因此,程序员有必要有效地利用多核处理器,以便在更短的时间内生成结果。Java 中的 Fork/Join 用于有效地利用核心(处理指令的 CPU 大脑)。Fork/Join 将较大的任务分解成较小的子任务。然后将这些子任务分配给各个核心。随后将这些子任务的结果合并以生成最终结果。任务的分解和结果的合并模仿了分而治之算法。Fork 负责分解任务,Join 负责合并任务结果以生成最终结果。

Fork Join in Java

值得注意的是,负责完成子任务的各种线程从不空闲。实际上,它们实现了工作窃取算法,在这种算法中,空闲线程会从忙碌的线程那里窃取工作。

伪代码

需要记住的一个重要观点是,不应该盲目地将问题分解成子问题。将问题分解成子问题是有开销的。如果开销和解决子问题所需的时间大于解决问题本身所需的时间,那么就不应该分解问题。逻辑上可以合理地将问题分解为子问题的界限称为阈值。

Java Fork/Join 程序

以下程序演示了 Java 中 fork/join 的工作原理。

文件名: ForkJoinExample.java

输出

The number 50 is found 5 times.

说明:在代码中,我们扩展了抽象类 RecursiveTask。该类的 compute() 方法包含计算代码。RecursiveTask 类的类型参数 <Integer> 决定了返回结果的类型。使用 Recursive 类创建的搜索任务被提交到 ForkJoinPool。当调用 ForkJoinPool 类的 invoke() 方法时,任务执行开始。invoke() 方法在单次调用中组合了 fork() 和 join() 方法,并等待任务执行完成,然后返回结果。当调用 invoke() 方法时,它会将任务分解成子任务。

Java ForkJoinPool 类

fork/join 框架的核心是 ForkJoinPool 类。ForkJoinPool 类实现了 ExecutorService 接口。它还扩展了 AbstractExecutorService 类并实现了工作窃取算法。

ForkJoinPool 类的方法

方法描述
public int getParallelism()此方法返回池的并行级别。
public boolean isShutdown()如果调用此方法的池已关闭,则方法返回 true;否则返回 false。
public boolean isQuiescent()如果池中的所有工作线程都处于空闲状态,则方法返回 true;否则返回 false。
public boolean hasQueuedSubmissions()如果提交给池的任何任务尚未开始执行,则方法返回 true;否则返回 false。
public int getPoolSize()返回已开始执行但尚未完成执行的工作线程的总数。
public long getStealCount()此方法返回从其他工作线程窃取的任务的总数。
public boolean isTerminating()如果终止过程已开始但尚未完成,则方法返回 true;否则返回 false。
public int getActiveThreadCount()此方法返回当前正在执行或从其他线程窃取任务的线程总数。
public long getQueuedTaskCount()此方法返回工作线程排队中的任务总数。
public ForkJoinTask<?> submit(Runnable task)此方法提交一个 Runnable 任务以供执行,并返回代表该任务的 Future。
public List<Runnable> shutdownNow()此方法尝试停止或取消所有任务,并拒绝所有后续任务。

Fork/Join 实现

有两种方法可以实例化 ForkJoinClass

1) 使用类的构造函数

ForkJoinPool():它是 ForkJoinPool 类的默认构造函数。它创建一个默认池。生成的池支持的并行度等于系统中可用的处理器总数。在上面的示例中,我们使用此构造函数实例化了 ForkJoinPool 类。

ForkJoinPool(int p):这是一个参数化构造函数,也用于创建具有自定义并行度的池。值 p 必须是自然数(> 0),并且不应超过系统中可用的处理器数量。

2) 使用静态方法 commonPool()

ForkJoinPool 类的静态方法 commonPool() 也可用于创建 ForkJoinPool 类的实例。

文件名: JavaForkJoinPoolExample.java

输出

Total Number of available cores in the system processor is: 4
Total number of active threads before invoking: 0
Total number of active threads after invoking: 3
The size of the Common Pool is: 3