什么是 CompletableFuture?2024年10月23日 | 阅读 12 分钟 CompletableFuture 用于异步编程。异步编程意味着编写非阻塞代码。它在一个独立于主应用程序线程的线程上运行任务,并通知主线程其进度、完成或失败情况。 这样,主线程就不会阻塞或等待任务完成。其他任务可以并行执行。并行化可以提高程序的性能。 CompletableFuture 是 Java 中的一个类。它属于 java.util.concurrent 包。它实现了 CompletionStage 和 Future 接口。 CompletionStage- 当另一个完成阶段完成后,它会执行一个操作并返回一个值。
- 一个可能触发其他任务的任务模型。
因此,它是链条中的一个元素。 当多个线程尝试完成 - 异常完成或取消 CompletableFuture 时,只有一个会成功。 Future vs. CompletableFutureCompletableFuture 是 Java 8 中引入的 Java Future API 的扩展。 Future 用于异步编程。它提供了 isDone() 和 get() 两个方法。这些方法在计算完成后检索结果。 Future 的局限性- Future 不能相互完成。
- 我们无法在不阻塞的情况下对 Future 的结果执行进一步的操作。
- Future 没有异常处理。
- 我们无法组合多个 Future。
Future 有很多局限性,这就是我们有 CompletableFuture 的原因。CompletableFuture 提供了创建多个 Future、链式调用和组合的广泛方法。它还具有全面的异常处理支持。 创建 CompletableFuture我们只能使用以下无参构造函数来创建 CompletableFuture。 示例最常用的 CompletableFuture 方法是 - supplyAsync(): 它异步完成其工作。Supplier 的结果默认由 ForkJoinPool.commonPool() 中的一个任务运行。supplyAsync() 方法返回 CompletableFuture,我们可以在其上应用其他方法。
- thenApply(): 该方法接受一个函数作为参数。当此阶段正常完成时,它返回一个新的 CompletableStage。新阶段用作提供的函数的参数。
- join(): 该方法在完成时返回结果值。如果异常完成,它还会抛出 CompletionException(未检查异常)。
文件名:CompletableFutureExample1.java 输出  CompletableFuture 的异步方法CompletableFuture 提供了一组异步方法,支持非阻塞操作,允许在单独的线程中执行任务而不阻塞调用线程。这些方法对于开发响应式应用程序至关重要,尤其是在处理 I/O 密集型任务、复杂计算或任何持续时间不可预测的过程时。以下是 CompletableFuture 提供的一些关键异步方法的摘要。 thenApplyAsync()thenApplyAsync() 函数异步处理任务结果,并生成一个包含修改后结果的新 CompletableFuture。ForkJoinPool.commonPool() 中的一个独立线程负责此处理,确保操作不会阻塞调用线程,并通过利用异步执行模式来增强应用程序的响应能力。 文件名: CompletableFutureExample.java 输出 thenAcceptAsync()thenAcceptAsync() 函数支持异步消费任务的输出,但不会返回任何值。执行发生在 ForkJoinPool.commonPool() 的一个独立线程上,确保主工作流不受此过程的中断。 文件名:ThenAcceptAsyncExample.java 输出 runAsync()runAsync() 函数支持以异步方式执行任务,不期望返回值。执行在 ForkJoinPool.commonPool() 中的一个独立线程上进行,确保任务不会中断主工作流。 文件名:RunAsyncExample.java 输出 This is an asynchronous task running in a separate thread.
Asynchronous task completed.
CompletableFsutrue 类方法| 修饰符和类型 | 方法和描述 |
|---|
| CompletableFuture<Void> | acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) 生成一个新的 CompletionStage,当当前阶段或指定的另一个阶段正常完成时,将使用第一个完成阶段的结果作为输入触发提供的操作的执行。 | | CompletableFuture<Void> | acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) 创建一个新的 `CompletionStage`,一旦当前阶段或指定的另一个阶段成功完成,将通过此阶段的默认执行机制异步执行给定的操作,并使用第一个完成阶段的结果作为操作的输入。 | | CompletableFuture<Void> | acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor) 生成一个新的 `CompletionStage`,当当前阶段或指定的另一个阶段正常完成时,将使用指定的执行器异步执行提供的函数,并将第一个完成阶段的结果输入到函数中。 | | <U> CompletableFuture<U> | applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn) 生成一个新的 `CompletionStage`,一旦当前阶段或指定的另一个阶段成功完成,将执行预定的函数,并将第一个完成阶段的结果作为函数的参数。 | | <U> CompletableFuture<U> | applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn) 创建一个新的 `CompletionStage`,一旦当前阶段或指定的另一个阶段成功完成,将使用此阶段的默认执行机制异步执行给定的函数。第一个完成阶段的结果将用作函数的输入。 | | <U> CompletableFuture<U> | applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor) 生成一个新的 `CompletionStage`,当当前阶段或指定的另一个阶段正常完成时,将使用指定的执行器异步执行提供的函数。函数将接收第一个成功完成的阶段的结果作为其输入。 | | boolean | cancel(boolean mayInterruptIfRunning) 如果此 `CompletableFuture` 尚未完成,则将其标记为已取消,并抛出一个 `CancellationException`。 | | boolean | complete(T value) 为 `get()` 和类似方法设置指定的值,前提是此 `CompletableFuture` 尚未完成。 | | boolean | completeExceptionally(Throwable ex) 如果 `CompletableFuture` 尚未完成,则调用 `get()` 和相关方法将抛出指定的异常。 | | CompletableFuture<T> | exceptionally(Function<Throwable,? extends T> fn) 生成一个新的 `CompletableFuture`,当当前 `CompletableFuture` 完成时,该新 Future 将完成。如果当前 Future 异常完成,新 Future 将通过将提供的函数应用于导致完成的异常来完成。反之,如果当前 `CompletableFuture` 正常完成,新 Future 也会以相同的值正常完成。 | | T | get() 阻塞直到此 Future 完成,然后检索其结果。 | | T | get(long timeout, TimeUnit unit) 暂停执行,最多指定的时间,直到此 Future 完成,然后在该时间范围内可用时返回其结果。 | | T | getNow(T valueIfAbsent) 如果操作已完成,则返回结果值或抛出任何遇到的异常;否则,返回指定的 valueIfAbsent。 | | int | getNumberOfDependents() 提供此 `CompletableFuture` 正在等待其完成的 `CompletableFutures` 数量的估计值。 | | <U> CompletableFuture<U> | handle(BiFunction<? super T,Throwable,? extends U> fn) 生成一个新的 `CompletionStage`,在当前阶段完成时(无论是正常还是异常),使用该阶段的结果和任何异常来执行提供的函数。 | | <U> CompletableFuture<U> | handleAsync(BiFunction<? super T,Throwable,? extends U> fn) 创建一个新的 `CompletionStage`,当当前阶段完成时(无论是正常还是异常),将使用此阶段的默认执行机制异步执行指定的函数。该函数将接收当前阶段的结果和任何抛出的异常作为输入。 | | <U> CompletableFuture<U> | handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor) 生成一个新的 `CompletionStage`,当当前阶段完成时(无论是正常还是异常),将使用指定的执行器执行提供的函数。该函数将接收当前阶段的结果和任何异常作为参数。 | | boolean | isCancelled() 通过返回 true 来指示 `CompletableFuture` 是否在正常完成之前被取消。 | | boolean | isCompletedExceptionally() 如果 CompletableFuture 由于异常而完成,则返回 true,表明其操作过程中发生了错误。 | | boolean | isDone() 如果 CompletableFuture 因任何原因完成(包括正常完成、遇到异常或被取消),则为 true。 | | T | join() 该方法在完成时检索结果值,或者如果 CompletableFuture 因异常而完成,则抛出未检查异常。 | | void | obtrudeException(Throwable ex) 该操作确保后续尝试通过 get() 方法或类似方法检索结果时将抛出指定的异常,而不管 CompletableFuture 的当前完成状态如何。 | | void | obtrudeValue(T value) 该操作强制更新 get() 方法及其相关方法将来返回的值,无论 CompletableFuture 是否已完成。 | | CompletableFuture<Void> | runAfterBoth(CompletionStage<?> other, Runnable action) 创建一个新的 CompletionStage,当此阶段和提供的另一个阶段都成功完成后,将触发指定的 action。 | | CompletableFuture<Void> | runAfterBothAsync(CompletionStage<?> other, Runnable action) 创建一个新的 CompletionStage,在当前阶段和指定的其他阶段成功完成后,将通过此阶段的默认异步执行服务触发指定的 action。 | | CompletableFuture<Void> | runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor) 创建一个新的 CompletionStage,当此阶段和另一个阶段都正常完成后,将使用指定的执行器执行一个 action。 | | CompletableFuture<Void> | runAfterEither(CompletionStage<?> other, Runnable action) 创建一个新的 CompletionStage,当此阶段或指定的另一个阶段正常完成时,将执行一个 action。 | | CompletableFuture<Void> | runAfterEitherAsync(CompletionStage<?> other, Runnable action) 启动一个新的 CompletionStage,当此阶段或指定的另一个阶段正常完成后,将使用此阶段的默认执行机制异步执行给定的 action。 | | CompletableFuture<Void> | runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor) 生成一个新的 CompletionStage,一旦此阶段或指定的其他阶段成功完成,将使用提供的执行器执行一个指定的 action,并将第一个成功完成的阶段的结果作为 action 的输入。 | | static CompletableFuture<Void> | runAsync(Runnable runnable) 创建一个新的 CompletableFuture,它将在 ForkJoinPool.commonPool() 的一个任务执行指定的 action 后异步完成。 | | static CompletableFuture<Void> | runAsync(Runnable runnable, Executor executor) 生成一个新的 CompletableFuture,它将在指定执行器中的一个任务异步执行指定的 action 后完成。 | | static <U> CompletableFuture<U> | supplyAsync(Supplier<U> supplier) 创建一个新的 CompletableFuture,它将在 ForkJoinPool.commonPool() 的一个任务中异步完成,使用由给定 Supplier 产生的值。 | | static <U> CompletableFuture<U> | supplyAsync(Supplier<U> supplier, Executor executor) 生成一个新的 CompletableFuture,它将在指定的执行器中执行的任务异步完成,该完成基于调用提供的 Supplier 所获得的值。 | | CompletableFuture<Void> | thenAccept(Consumer<? super T> action) 创建一个新的 CompletionStage,当此阶段正常完成时,将执行提供的 action,并将此阶段的结果作为参数传递给该 action。 | | CompletableFuture<Void> | thenAcceptAsync(Consumer<? super T> action) 启动一个新的 CompletionStage,在当前阶段正常完成后,将使用此阶段的默认执行服务异步执行给定的 action,并将此阶段的结果作为参数传递给 action。 | | CompletableFuture<Void> | thenAcceptAsync(Consumer<? super T> action, Executor executor) 生成一个新的 CompletionStage,当此阶段正常完成后,将使用指定的 Executor 执行提供的 action,并将此阶段的结果作为 action 的输入。 | | <U> CompletableFuture<Void> | thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) 这会创建一个新的 CompletionStage,当此阶段和指定的其他阶段都正常完成后,将使用两个阶段的结果作为参数执行给定的 action。 | | <U> CompletableFuture<Void> | thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) 启动一个新的 CompletionStage,在当前阶段和给定的其他阶段都成功完成后,将使用此阶段的默认执行机制异步执行指定的 action,并将两个阶段的结果作为 action 的参数。 | | <U> CompletableFuture<Void> | thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor) 生成一个新的 CompletionStage,一旦此阶段和指定的其他阶段都成功完成,将使用指定的执行器执行提供的函数,并将两个阶段的结果作为函数的参数。 | | <U> CompletableFuture<U> | thenApply(Function<? super T,? extends U> fn) 创建一个新的 CompletionStage,当此阶段成功完成后,将执行给定的函数,并将此阶段的结果作为函数的参数。 | | <U> CompletableFuture<U> | thenApplyAsync(Function<? super T,? extends U> fn) 启动一个新的 CompletionStage,在当前阶段正常完成后,将使用此阶段的默认执行服务异步执行指定的函数,并将阶段的结果作为函数的输入。 | | <U> CompletableFuture<U> | thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) 生成一个新的 CompletionStage,当此阶段成功完成后,将使用指定的 Executor 执行提供的函数,并将此阶段的结果作为函数的参数。 | | <U,V> CompletableFuture<V> | thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) 创建一个新的 CompletionStage,当此阶段和指定的其他阶段都正常完成后,将运行给定的函数,并将两个阶段的结果作为参数。 | | <U,V> CompletableFuture<V> | thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) 这会启动一个新的 CompletionStage,在当前阶段和指定的其他阶段都成功完成后,将使用此阶段的默认执行服务异步执行提供的函数,并将两个阶段的结果作为函数的参数。 | | <U,V> CompletableFuture<V> | thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) 生成一个新的 CompletionStage,当此阶段和指定的其他阶段都成功完成后,将使用选择的执行器执行指定的函数,并将两个阶段的结果作为函数的参数。 | | <U> CompletableFuture<U> | thenCompose(Function<? super T,? extends CompletionStage<U>> fn) 这会创建一个新的 CompletionStage,当此阶段正常完成后,将执行给定的函数,并将此阶段本身作为参数传递给该函数。 | | <U> CompletableFuture<U> | thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn) 启动一个新的 CompletionStage,在当前阶段成功完成后,将使用此阶段的默认执行机制异步执行指定的函数,并将此阶段本身作为参数传递给函数。 | | <U> CompletableFuture<U> | thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor) 生成一个新的 CompletionStage,当此阶段成功完成后,将使用指定的 Executor 执行提供的函数,并将此阶段的结果作为函数的参数。 | | CompletableFuture<Void> | thenRun(Runnable action) 创建一个新的 CompletionStage,当此阶段成功完成后,将执行指定的 action。 | | CompletableFuture<Void> | thenRunAsync(Runnable action) 这会启动一个新的 CompletionStage,在当前阶段成功完成后,将使用此阶段的默认执行服务异步执行给定的 action。 | | CompletableFuture<Void> | thenRunAsync(Runnable action, Executor executor) 生成一个新的 CompletionStage,当此阶段正常完成后,将使用选择的 Executor 执行指定的 action。 | | CompletableFuture<T> | toCompletableFuture() 该语句简单地返回 CompletableFuture 的当前实例。 | | String | toString() 这会返回一个标识此 CompletableFuture 并描述其完成状态的字符串。 | | CompletableFuture<T> | whenComplete(BiConsumer<? super T,? super Throwable> action) 创建一个新的 CompletionStage,当此阶段完成后,将执行指定的 action,并传递与此阶段相同的 Outcome 或异常。 | | CompletableFuture<T> | whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) 这会启动一个新的 CompletionStage,当此阶段完成后,将使用此阶段的默认执行服务异步执行给定的 action,并反映此阶段的结果或异常。 | | CompletableFuture<T> | whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) 生成一个新的 CompletionStage,当此阶段完成后,将使用指定的 Executor 执行指定的 action,并保持与此阶段相同的 Outcome 或异常。 |
CompletableFuture 的异常处理考虑下面的图,它表示五个 CF。  假设五个 CFs 正在执行,并且 CF21 引发了一个异常,那么所有依赖的 CF (CF31 和 CF41) 都会出错。这意味着: - 调用 isCompletedExceptionally() 方法将返回 true。
- 调用 get() 将抛出 ExecutionException,该异常会引发根异常。
考虑下面的图,我们在其中创建了带异常的 CF30。  当 CF21 正常执行时,CF30 仅传输值。如果它引发异常,CF30 会处理它并为 CF31 生成值。 有三种方法可以处理异常
|