什么是 CompletableFuture?

2024年10月23日 | 阅读 12 分钟

CompletableFuture 用于异步编程。异步编程意味着编写非阻塞代码。它在一个独立于主应用程序线程的线程上运行任务,并通知主线程其进度、完成或失败情况。

这样,主线程就不会阻塞或等待任务完成。其他任务可以并行执行。并行化可以提高程序的性能。

CompletableFuture 是 Java 中的一个类。它属于 java.util.concurrent 包。它实现了 CompletionStage 和 Future 接口。

CompletionStage

  • 当另一个完成阶段完成后,它会执行一个操作并返回一个值。
  • 一个可能触发其他任务的任务模型。

因此,它是链条中的一个元素。

当多个线程尝试完成 - 异常完成或取消 CompletableFuture 时,只有一个会成功。

Future vs. CompletableFuture

CompletableFuture 是 Java 8 中引入的 Java Future API 的扩展。

Future 用于异步编程。它提供了 isDone() 和 get() 两个方法。这些方法在计算完成后检索结果。

Future 的局限性

  • Future 不能相互完成。
  • 我们无法在不阻塞的情况下对 Future 的结果执行进一步的操作。
  • Future 没有异常处理。
  • 我们无法组合多个 Future。

Future 有很多局限性,这就是我们有 CompletableFuture 的原因。CompletableFuture 提供了创建多个 Future、链式调用和组合的广泛方法。它还具有全面的异常处理支持。

创建 CompletableFuture

我们只能使用以下无参构造函数来创建 CompletableFuture。

示例

最常用的 CompletableFuture 方法是

  • supplyAsync(): 它异步完成其工作。Supplier 的结果默认由 ForkJoinPool.commonPool() 中的一个任务运行。supplyAsync() 方法返回 CompletableFuture,我们可以在其上应用其他方法。
  • thenApply(): 该方法接受一个函数作为参数。当此阶段正常完成时,它返回一个新的 CompletableStage。新阶段用作提供的函数的参数。
  • join(): 该方法在完成时返回结果值。如果异常完成,它还会抛出 CompletionException(未检查异常)。

文件名:CompletableFutureExample1.java

输出

CompletableFuture in Java

CompletableFuture 的异步方法

CompletableFuture 提供了一组异步方法,支持非阻塞操作,允许在单独的线程中执行任务而不阻塞调用线程。这些方法对于开发响应式应用程序至关重要,尤其是在处理 I/O 密集型任务、复杂计算或任何持续时间不可预测的过程时。以下是 CompletableFuture 提供的一些关键异步方法的摘要。

thenApplyAsync()

thenApplyAsync() 函数异步处理任务结果,并生成一个包含修改后结果的新 CompletableFuture。ForkJoinPool.commonPool() 中的一个独立线程负责此处理,确保操作不会阻塞调用线程,并通过利用异步执行模式来增强应用程序的响应能力。

文件名: CompletableFutureExample.java

输出

Hello, John!

thenAcceptAsync()

thenAcceptAsync() 函数支持异步消费任务的输出,但不会返回任何值。执行发生在 ForkJoinPool.commonPool() 的一个独立线程上,确保主工作流不受此过程的中断。

文件名:ThenAcceptAsyncExample.java

输出

User Score: 85

runAsync()

runAsync() 函数支持以异步方式执行任务,不期望返回值。执行在 ForkJoinPool.commonPool() 中的一个独立线程上进行,确保任务不会中断主工作流。

文件名:RunAsyncExample.java

输出

This is an asynchronous task running in a separate thread.
Asynchronous task completed.

CompletableFsutrue 类方法

修饰符和类型方法和描述
CompletableFuture<Void>acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
生成一个新的 CompletionStage,当当前阶段或指定的另一个阶段正常完成时,将使用第一个完成阶段的结果作为输入触发提供的操作的执行。
CompletableFuture<Void>acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
创建一个新的 `CompletionStage`,一旦当前阶段或指定的另一个阶段成功完成,将通过此阶段的默认执行机制异步执行给定的操作,并使用第一个完成阶段的结果作为操作的输入。
CompletableFuture<Void>acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
生成一个新的 `CompletionStage`,当当前阶段或指定的另一个阶段正常完成时,将使用指定的执行器异步执行提供的函数,并将第一个完成阶段的结果输入到函数中。
<U> CompletableFuture<U>applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
生成一个新的 `CompletionStage`,一旦当前阶段或指定的另一个阶段成功完成,将执行预定的函数,并将第一个完成阶段的结果作为函数的参数。
<U> CompletableFuture<U>applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
创建一个新的 `CompletionStage`,一旦当前阶段或指定的另一个阶段成功完成,将使用此阶段的默认执行机制异步执行给定的函数。第一个完成阶段的结果将用作函数的输入。
<U> CompletableFuture<U>applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
生成一个新的 `CompletionStage`,当当前阶段或指定的另一个阶段正常完成时,将使用指定的执行器异步执行提供的函数。函数将接收第一个成功完成的阶段的结果作为其输入。
booleancancel(boolean mayInterruptIfRunning)
如果此 `CompletableFuture` 尚未完成,则将其标记为已取消,并抛出一个 `CancellationException`。
booleancomplete(T value)
为 `get()` 和类似方法设置指定的值,前提是此 `CompletableFuture` 尚未完成。
booleancompleteExceptionally(Throwable ex)
如果 `CompletableFuture` 尚未完成,则调用 `get()` 和相关方法将抛出指定的异常。
CompletableFuture<T>exceptionally(Function<Throwable,? extends T> fn)
生成一个新的 `CompletableFuture`,当当前 `CompletableFuture` 完成时,该新 Future 将完成。如果当前 Future 异常完成,新 Future 将通过将提供的函数应用于导致完成的异常来完成。反之,如果当前 `CompletableFuture` 正常完成,新 Future 也会以相同的值正常完成。
Tget()
阻塞直到此 Future 完成,然后检索其结果。
Tget(long timeout, TimeUnit unit)
暂停执行,最多指定的时间,直到此 Future 完成,然后在该时间范围内可用时返回其结果。
TgetNow(T valueIfAbsent)
如果操作已完成,则返回结果值或抛出任何遇到的异常;否则,返回指定的 valueIfAbsent。
intgetNumberOfDependents()
提供此 `CompletableFuture` 正在等待其完成的 `CompletableFutures` 数量的估计值。
<U> CompletableFuture<U>handle(BiFunction<? super T,Throwable,? extends U> fn)
生成一个新的 `CompletionStage`,在当前阶段完成时(无论是正常还是异常),使用该阶段的结果和任何异常来执行提供的函数。
<U> CompletableFuture<U>handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
创建一个新的 `CompletionStage`,当当前阶段完成时(无论是正常还是异常),将使用此阶段的默认执行机制异步执行指定的函数。该函数将接收当前阶段的结果和任何抛出的异常作为输入。
<U> CompletableFuture<U>handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
生成一个新的 `CompletionStage`,当当前阶段完成时(无论是正常还是异常),将使用指定的执行器执行提供的函数。该函数将接收当前阶段的结果和任何异常作为参数。
booleanisCancelled()
通过返回 true 来指示 `CompletableFuture` 是否在正常完成之前被取消。
booleanisCompletedExceptionally()
如果 CompletableFuture 由于异常而完成,则返回 true,表明其操作过程中发生了错误。
booleanisDone()
如果 CompletableFuture 因任何原因完成(包括正常完成、遇到异常或被取消),则为 true。
Tjoin()
该方法在完成时检索结果值,或者如果 CompletableFuture 因异常而完成,则抛出未检查异常。
voidobtrudeException(Throwable ex)
该操作确保后续尝试通过 get() 方法或类似方法检索结果时将抛出指定的异常,而不管 CompletableFuture 的当前完成状态如何。
voidobtrudeValue(T value)
该操作强制更新 get() 方法及其相关方法将来返回的值,无论 CompletableFuture 是否已完成。
CompletableFuture<Void>runAfterBoth(CompletionStage<?> other, Runnable action)
创建一个新的 CompletionStage,当此阶段和提供的另一个阶段都成功完成后,将触发指定的 action。
CompletableFuture<Void>runAfterBothAsync(CompletionStage<?> other, Runnable action)
创建一个新的 CompletionStage,在当前阶段和指定的其他阶段成功完成后,将通过此阶段的默认异步执行服务触发指定的 action。
CompletableFuture<Void>runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)
创建一个新的 CompletionStage,当此阶段和另一个阶段都正常完成后,将使用指定的执行器执行一个 action。
CompletableFuture<Void>runAfterEither(CompletionStage<?> other, Runnable action)
创建一个新的 CompletionStage,当此阶段或指定的另一个阶段正常完成时,将执行一个 action。
CompletableFuture<Void>runAfterEitherAsync(CompletionStage<?> other, Runnable action)
启动一个新的 CompletionStage,当此阶段或指定的另一个阶段正常完成后,将使用此阶段的默认执行机制异步执行给定的 action。
CompletableFuture<Void>runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)
生成一个新的 CompletionStage,一旦此阶段或指定的其他阶段成功完成,将使用提供的执行器执行一个指定的 action,并将第一个成功完成的阶段的结果作为 action 的输入。
static CompletableFuture<Void>runAsync(Runnable runnable)
创建一个新的 CompletableFuture,它将在 ForkJoinPool.commonPool() 的一个任务执行指定的 action 后异步完成。
static CompletableFuture<Void>runAsync(Runnable runnable, Executor executor)
生成一个新的 CompletableFuture,它将在指定执行器中的一个任务异步执行指定的 action 后完成。
static <U> CompletableFuture<U>supplyAsync(Supplier<U> supplier)
创建一个新的 CompletableFuture,它将在 ForkJoinPool.commonPool() 的一个任务中异步完成,使用由给定 Supplier 产生的值。
static <U> CompletableFuture<U>supplyAsync(Supplier<U> supplier, Executor executor)
生成一个新的 CompletableFuture,它将在指定的执行器中执行的任务异步完成,该完成基于调用提供的 Supplier 所获得的值。
CompletableFuture<Void>thenAccept(Consumer<? super T> action)
创建一个新的 CompletionStage,当此阶段正常完成时,将执行提供的 action,并将此阶段的结果作为参数传递给该 action。
CompletableFuture<Void>thenAcceptAsync(Consumer<? super T> action)
启动一个新的 CompletionStage,在当前阶段正常完成后,将使用此阶段的默认执行服务异步执行给定的 action,并将此阶段的结果作为参数传递给 action。
CompletableFuture<Void>thenAcceptAsync(Consumer<? super T> action, Executor executor)
生成一个新的 CompletionStage,当此阶段正常完成后,将使用指定的 Executor 执行提供的 action,并将此阶段的结果作为 action 的输入。
<U> CompletableFuture<Void>thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
这会创建一个新的 CompletionStage,当此阶段和指定的其他阶段都正常完成后,将使用两个阶段的结果作为参数执行给定的 action。
<U> CompletableFuture<Void>thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
启动一个新的 CompletionStage,在当前阶段和给定的其他阶段都成功完成后,将使用此阶段的默认执行机制异步执行指定的 action,并将两个阶段的结果作为 action 的参数。
<U> CompletableFuture<Void>thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
生成一个新的 CompletionStage,一旦此阶段和指定的其他阶段都成功完成,将使用指定的执行器执行提供的函数,并将两个阶段的结果作为函数的参数。
<U> CompletableFuture<U>thenApply(Function<? super T,? extends U> fn)
创建一个新的 CompletionStage,当此阶段成功完成后,将执行给定的函数,并将此阶段的结果作为函数的参数。
<U> CompletableFuture<U>thenApplyAsync(Function<? super T,? extends U> fn)
启动一个新的 CompletionStage,在当前阶段正常完成后,将使用此阶段的默认执行服务异步执行指定的函数,并将阶段的结果作为函数的输入。
<U> CompletableFuture<U>thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
生成一个新的 CompletionStage,当此阶段成功完成后,将使用指定的 Executor 执行提供的函数,并将此阶段的结果作为函数的参数。
<U,V> CompletableFuture<V>thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
创建一个新的 CompletionStage,当此阶段和指定的其他阶段都正常完成后,将运行给定的函数,并将两个阶段的结果作为参数。
<U,V> CompletableFuture<V>thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
这会启动一个新的 CompletionStage,在当前阶段和指定的其他阶段都成功完成后,将使用此阶段的默认执行服务异步执行提供的函数,并将两个阶段的结果作为函数的参数。
<U,V> CompletableFuture<V>thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
生成一个新的 CompletionStage,当此阶段和指定的其他阶段都成功完成后,将使用选择的执行器执行指定的函数,并将两个阶段的结果作为函数的参数。
<U> CompletableFuture<U>thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
这会创建一个新的 CompletionStage,当此阶段正常完成后,将执行给定的函数,并将此阶段本身作为参数传递给该函数。
<U> CompletableFuture<U>thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
启动一个新的 CompletionStage,在当前阶段成功完成后,将使用此阶段的默认执行机制异步执行指定的函数,并将此阶段本身作为参数传递给函数。
<U> CompletableFuture<U>thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)
生成一个新的 CompletionStage,当此阶段成功完成后,将使用指定的 Executor 执行提供的函数,并将此阶段的结果作为函数的参数。
CompletableFuture<Void>thenRun(Runnable action)
创建一个新的 CompletionStage,当此阶段成功完成后,将执行指定的 action。
CompletableFuture<Void>thenRunAsync(Runnable action)
这会启动一个新的 CompletionStage,在当前阶段成功完成后,将使用此阶段的默认执行服务异步执行给定的 action。
CompletableFuture<Void>thenRunAsync(Runnable action, Executor executor)
生成一个新的 CompletionStage,当此阶段正常完成后,将使用选择的 Executor 执行指定的 action。
CompletableFuture<T>toCompletableFuture()
该语句简单地返回 CompletableFuture 的当前实例。
StringtoString()
这会返回一个标识此 CompletableFuture 并描述其完成状态的字符串。
CompletableFuture<T>whenComplete(BiConsumer<? super T,? super Throwable> action)
创建一个新的 CompletionStage,当此阶段完成后,将执行指定的 action,并传递与此阶段相同的 Outcome 或异常。
CompletableFuture<T>whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
这会启动一个新的 CompletionStage,当此阶段完成后,将使用此阶段的默认执行服务异步执行给定的 action,并反映此阶段的结果或异常。
CompletableFuture<T>whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
生成一个新的 CompletionStage,当此阶段完成后,将使用指定的 Executor 执行指定的 action,并保持与此阶段相同的 Outcome 或异常。

CompletableFuture 的异常处理

考虑下面的图,它表示五个 CF。

CompletableFuture in Java

假设五个 CFs 正在执行,并且 CF21 引发了一个异常,那么所有依赖的 CF (CF31 和 CF41) 都会出错。这意味着:

  • 调用 isCompletedExceptionally() 方法将返回 true。
  • 调用 get() 将抛出 ExecutionException,该异常会引发根异常。

考虑下面的图,我们在其中创建了带异常的 CF30。

CompletableFuture in Java

当 CF21 正常执行时,CF30 仅传输值。如果它引发异常,CF30 会处理它并为 CF31 生成值。

有三种方法可以处理异常


下一个主题Java 教程