CompletableFuture详解
# CompletableFuture详解
# 1、CompletableFuture简介
# 简介
CompletableFuture
是 Java 8 引入的一个用于异步编程的类。 在CompletableFuture
出现之前,我们可以使用线程池配合Future来处理异步任务。但是线程池的异步操作功能难以编写复杂异步操作的组合和控制。
所以JDK8推出了CompletableFuture
,它允许我们编写非阻塞的代码,并提供了一种简洁的方式来处理异步任务和并发操作,支持多种异步操作的组合和控制。
# CompletableFuture应用场景
还是拿狗吃骨头举例:
秀逗想吃骨头,但是它必须等待男主人把骨头砸碎,女主人把熬汤的料包准备好,再把碎骨头和料包放在一起熬汤,最后等汤熬好秀逗才能吃上。
# 使用线程池实现:
import java.util.concurrent.*;
public class TestA {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
1,
9,
0,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
Future<String> future1 = executor.submit(() -> {
System.out.println("男主人砸骨头...");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "骨头准备好了";
});
Future<String> future2 = executor.submit(() -> {
System.out.println("女主人准备料包...");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "料包准备好了";
});
Future<String> future3 = executor.submit(() -> {
System.out.println("开始熬骨头汤...");
try {
String data1 = future1.get();
String data2 = future2.get();
} catch (Exception e) {
e.printStackTrace();
}
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "骨头汤做好了";
});
try {
String s = future3.get();
System.out.println(s);
System.out.println("秀逗开始喝骨头汤~");
} catch (Exception e) {
e.printStackTrace();
}
executor.shutdown();
}
}
执行结果:
男主人砸骨头...
女主人准备料包...
开始熬骨头汤...
骨头汤做好了
秀逗开始喝骨头汤~
# 使用CompletableFuture实现:
import java.util.concurrent.*;
public class TestA {
private static ThreadPoolExecutor executorService = new ThreadPoolExecutor(
1,
9,
0,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
// 使用自定义的线程池executorService执行任务
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
System.out.println("男主人砸骨头...");
try {
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
e.printStackTrace();
}
return "骨头准备好了";
}, executorService);
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
System.out.println("女主人准备料包...");
// 模拟从数据源B获取数据
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
return "料包准备好了";
}, executorService);
// 合并两个 CompletableFuture
CompletableFuture<String> combinedFuture = futureA.thenCombine(futureB, (dataA, dataB) -> {
// System.out.println(dataA + "&" + dataB);
// 合并数据
System.out.println("开始熬骨头汤...");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "骨头汤做好了";
});
// 处理数据
combinedFuture.thenAccept(combinedData -> {
System.out.println(combinedData);
System.out.println("秀逗开始喝骨头汤~");
});
// 确保所有任务完成
combinedFuture.join();
executorService.shutdown();
}
}
执行结果:
男主人砸骨头...
女主人准备料包...
开始熬骨头汤...
骨头汤做好了
秀逗开始喝骨头汤~
从上面的场景实现可以看出,单纯的使用线程池处理多任务的组合或者等待,需要自己处理每个任务的结果获取和组合等情况,也得自己try catch
处理异常,如果子任务非常多的情况下,这种处理方式就会变得非常繁琐容易出错。
如果使用CompletableFuture
来处理异步任务的组合、结果获取等情况,可以利用CompletableFuture
提供的API,比如thenCombine
、thenAccept
等操作来灵活组合任务。
# 使用 CompletableFuture 的优势
简化异步任务组合:
CompletableFuture 提供了 thenCombine, thenCompose, allOf 等方法,可以轻松组合多个异步任务的结果。
例如,thenCombine 可以将两个异步任务的结果合并在一起处理,而 thenCompose 允许你将一个异步操作的结果作为另一个异步操作的输入。
提高代码可读性:
使用 CompletableFuture 的链式调用使得异步操作的逻辑更加直观和易于理解。
代码结构更清晰,不需要显式地获取 Future 的结果和处理任务的执行顺序。
方便的错误处理:
CompletableFuture 提供了 exceptionally, handle 等方法来处理异步操作中的异常,使得错误处理更加简洁。
可以在任务链中处理异常,而不需要在每个任务中都编写错误处理逻辑。
灵活的任务调度:
通过 CompletableFuture,你可以指定任务的执行顺序和依赖关系,从而灵活地控制任务的执行流程。
CompletableFuture 允许你将异步任务的执行和结果处理逻辑分离,从而提高了代码的灵活性和可维护性。
# CompletableFuture和线程池处理异步任务的比较
# 线程池
需要手动管理 Future 对象和任务之间的依赖关系。
必须显式地调用 future1.get() 和 future2.get() 来等待任务完成,增加了复杂性。
合成任务的逻辑可能变得复杂,特别是在处理多个任务时。
# CompletableFuture
通过 thenCombine 轻松合并两个异步任务的结果。
使用链式调用和内置的异常处理方法,简化了代码和错误处理。
更直观地表达了任务之间的依赖关系和执行顺序。
# 2、CompletableFuture的继承体系
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
}
Future接口就不多说了,可以参考 FutureTask详解 (opens new window)。
# CompletionStage
接口
CompletionStage
接口的核心:
异步阶段: 每个 CompletionStage 表示异步计算的一个阶段。计算可能是由上一个阶段的完成触发的,并可能会触发下一个阶段。
链式调用: CompletionStage 的方法返回值都是 CompletionStage 类型,这样可以实现链式调用,从而将多个异步任务链接在一起。
函数式编程: 提供了许多函数式编程方法,如 thenApply、thenCompose、thenAccept 等,允许我们根据实际业务需求对异步任务进行组合和编排。
CompletionStage
接口里面定义了非常多的方法。 所有支持链式调用的方法的返回值都是CompletionStage类型,这样才能实现如下的链式调用:future1.thenApply(xxx).thenApply(xxx).thenCompose(…).thenRun(..)
# 3、CompletableFuture的常用方法
# ①、创建CompletableFuture对象
最简单的创建CompletableFuture
对象方式:
CompletableFuture<String> completableFuture = new CompletableFuture<>();
最简单的CompletableFuture使用示例:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class TestA {
public static void main(String[] args) {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
Thread t1 = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
completableFuture.complete("完成completableFuture");
}, "t1");
// 新创建一个t1线程 来完成 completableFuture
t1.start();
try {
String s = completableFuture.get();
System.out.println(s);
} catch (Exception e) {
e.printStackTrace();
}
}
}
执行结果:
完成completableFuture
使用静态工厂方法创建 CompletableFuture 对象:
// 异步地计算结果并返回
CompletableFuture.supplyAsync(Supplier<T> supplier)
// 异步地执行任务,但不返回结果
CompletableFuture.runAsync(Runnable runnable)
# ②、提交任务的方法runAsync
runAsync
方法: 异步执行无返回结果的任务
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class TestA {
public static void main(String[] args) {
CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
System.out.println("异步执行任务");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
try {
runAsync.get();
System.out.println("runAsync执行完毕");
} catch (Exception e) {
e.printStackTrace();
}
}
}
执行结果:
异步执行任务
runAsync执行完毕
注意runAsync(Runnable runnable)
方法使用的是ForkJoinPool线程池
或者使用ThreadPerTaskExecutor
执行任务。
其中ForkJoinPool计划后续单独写一篇博介绍。
而ThreadPerTaskExecutor
的实现非常简单:
就是一个CompletableFuture
的静态内部类
直接创建一个新的线程执行任务。
static final class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) { new Thread(r).start(); }
}
还有一个重载方法:runAsync(Runnable runnable,Executor executor)
可以传入自定义的线程池执行任务。
# ③、提交任务的方法supplyAsync
supplyAsync
方法: 异步执行有返回结果的任务
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class TestA {
public static void main(String[] args) {
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
System.out.println("异步执行任务");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "supplyAsync执行完毕";
});
try {
String s = supplyAsync.get();
System.out.println(s);
} catch (Exception e) {
e.printStackTrace();
}
}
}
执行结果:
异步执行任务
supplyAsync执行完毕
supplyAsync
也有一个重载方法:CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
可以传入自定义的线程池执行任务。
# ④、链式调用的方法
# thenRun
thenRun
方法在当前 CompletionStage
完成后,执行一个指定的 Runnable 操作。
这个操作不依赖于之前的计算结果,也不返回任何结果。
示例:
CompletableFuture.supplyAsync(() -> {
// 模拟计算
return 42;
}).thenRun(() -> {
// 在前一个计算完成后执行的操作
System.out.println("在前一个计算完成后执行的操作");
});
# thenAccept
thenAccept
方法在当前 CompletionStage
完成后,接受一个指定的 Consumer
操作。
该操作接收前一个计算的结果,但不返回任何结果。
示例:
CompletableFuture.supplyAsync(() -> {
// 模拟计算
return "Hello, World!";
}).thenAccept(result -> {
// 在前一个计算完成后,处理结果
System.out.println("Result: " + result);
});
# thenApply
thenApply
方法在当前 CompletionStage
完成后,应用一个指定的 Function
操作,并返回一个新的 CompletionStage,
其结果是应用函数后的计算结果。
示例:
CompletableFuture.supplyAsync(() -> {
// 模拟计算
return 5;
}).thenApply(result -> {
// 在前一个计算完成后,处理并转换结果
return result * 2;
}).thenAccept(result -> {
// 输出转换后的结果
System.out.println("计算结果: " + result); // 10
});
# whenComplete
whenComplete 方法允许我们在 CompletionStage
完成后,无论成功还是失败,执行一个 BiConsumer
操作。这个操作接收计算的结果和可能发生的异常。
示例:
CompletableFuture.supplyAsync(() -> {
// 模拟计算
double score = Math.random();
System.out.println(score);
if (score < 0.6) {
throw new RuntimeException("不及格!");
}
return "及格!";
}).whenComplete((result, ex) -> {
if (ex != null) {
System.err.println("Error: " + ex.getMessage());
} else {
System.out.println("Result: " + result);
}
});
# whenCompleteAsync
whenCompleteAsync
方法与 whenComplete
相似,但它在默认的或指定的异步执行器上异步执行操作。
这个方法常用于需要异步执行后处理的场景。
示例:
CompletableFuture.supplyAsync(() -> {
// 模拟计算
double score = Math.random();
System.out.println(score);
if (score < 0.6) {
throw new RuntimeException("不及格!");
}
return "及格!";
}).whenCompleteAsync((result, ex) -> {
if (ex != null) {
System.err.println("Error: " + ex.getMessage());
} else {
System.out.println("Result: " + result);
}
}, Executors.newFixedThreadPool(2)); // 使用指定的 Executor
# 注意点:
上面说的几个方法 都有对应的 xxxAsync
方法,就像whenComplete
和whenCompleteAsync
的区别类似。
whenComplete
的执行线程是触发 CompletionStage
完成的线程。如果你在主线程中调用 whenComplete
,则 BiConsumer
操作将在主线程中执行。如果 CompletionStage
是在一个线程池中完成的,那么 BiConsumer
操作会在那个线程池里的线程上执行。
whenCompleteAsync
方法允许你指定一个 Executor
,以控制异步操作的执行线程。即使 CompletionStage
在某个线程中完成,whenCompleteAsync
中的操作会在指定的 Executor
上异步执行。
# ⑤、CompletableFuture
的组合
# thenCompose
thenCompose
用于将一个 CompletableFuture
的结果映射到另一个 CompletableFuture
。
适用于后一个异步操作依赖于前一个异步操作结果的场景。
示例:
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 5);
CompletableFuture<Integer> future2 = future1.thenCompose(
result -> CompletableFuture.supplyAsync(() -> result * 2));
future2.thenAccept(result -> System.out.println("Result: " + result)); // 输出: Result: 10
# thenCombine
thenCombine
用于将两个独立的 CompletableFuture
的结果合并成一个结果。
它允许你在两个异步任务完成后进行操作,这两个任务可以是独立的。
适用于两个不关联的异步任务结果合并的场景。
示例:
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 5);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2,
(result1, result2) -> result1 + result2);
// 输出: Combined Result: 15
combinedFuture.thenAccept(result -> System.out.println("Combined Result: " + result));
# allOf
allOf
用于等待一组 CompletableFuture
全部完成。这对于在多个异步任务都完成后执行某个操作很有用。
生产中可以利用allOf
处理需要多个异步调用接口并合并结果的场景。
注意:
allOf
返回的是CompletableFuture<Void>
并不包含结果,如果需要得到全部任务的结果,还是需要使用任务返回的CompletableFuture
对象调用get
方法或者join
方法。所以下面的示例最后使用了thenApply
方法,获取了全部结果并汇总后返回一个新的CompletableFuture
。
示例:
public static void main(String[] args){
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "task1 ";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "task2 ";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
return "task3 ";
});
// allOf 等待全部任务完成
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future1, future2, future3);
// 获取全部任务结果 并处理
CompletableFuture<String> thenApply = allOfFuture.thenApply((v) -> {
// join 不需要显示处理异常
String s1 = future1.join();
String s2 = future2.join();
String s3 = future3.join();
return s1 + s2 + s3;
});
try {
// get需要显示处理异常
String result = thenApply.get();
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
}
# anyOf
anyOf
用于等待一组 CompletableFuture
中的任意一个完成。它返回一个新的 CompletableFuture
,该 CompletableFuture
会在任意一个输入 CompletableFuture
完成时完成。
示例:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
}
return "Task 1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task 2");
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2);
// 输出: First completed: Task 2
anyOfFuture.thenAccept(result -> System.out.println("First completed: " + result));
# acceptEither
这个方法和anyOf 类似。
acceptEither 处理的是两个任务之间有一个完成就继续执行下一个任务。
示例:
public static void main(String[] args){
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "task1 ";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "task2 ";
});
future1.acceptEither(future2,(res)->{
System.out.println(res+"完成"); // 输出task1
System.out.println("task3执行");
});
}
# 注意点 CompletableFuture的join方法和get方法的区别
join()
是处理 CompletableFuture
的结果时一种更简单的方式,因为它只抛出 CompletionException
(运行时异常),而不需要处理 InterruptedException
和 ExecutionException
(非运行时异常)。
get()
提供了更详细的异常处理能力,但需要处理更多的异常类型。
# ⑥、异常处理
CompletableFuture 提供了多种方法来处理和传播异常,正确的使用这些方法,能提升程序的可控性和健壮性,正确记录异常信息也能对程序的维护提供保障。
# exceptionally
exceptionally
方法用于处理异步操作中发生的异常。它允许你定义一个回调函数来处理异常情况,并返回一个默认值。
示例:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
int a =0;
if(a>0){
return "执行完毕~";
}else {
throw new RuntimeException("报错了!");
}
});
CompletableFuture<String> handledFuture = future.exceptionally(ex -> {
System.out.println("处理异常信息: " + ex.getMessage());
return "报错后返回默认值";
});
handledFuture.thenAccept(result -> System.out.println("Result: " + result));
# handle
handle
方法既处理结果也处理异常。它接收两个参数:正常结果和异常,然后返回一个新值。无论是正常完成还是异常完成,handle
方法都会被调用。
示例:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
int a =0;
if(a>0){
return "执行完毕~";
}else {
throw new RuntimeException("报错了!");
}
});
CompletableFuture<String> handledFuture = future.handle((result, ex) -> {
if (ex != null) {
System.out.println("处理异常信息: " + ex.getMessage());
return "报错后返回默认值";
} else {
return result;
}
});
handledFuture.thenAccept(result -> System.out.println("Result: " + result));
# whenComplete
whenComplete
方法允许我们在 CompletableFuture
完成时执行一个回调,无论是正常完成还是异常完成。它的回调函数接收两个参数:结果和异常。
示例:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
int a = 0;
if (a > 0) {
return "执行完毕~";
} else {
throw new RuntimeException("报错了!");
}
});
future.whenComplete((result, ex) -> {
if (ex != null) {
System.out.println("处理异常信息: " + ex.getMessage());
} else {
System.out.println("Result: " + result);
}
});
# 4、CompletableFuture实现原理分析
下面源码基于JDK8
先思考两个问题:
- ①、CompletableFuture链式调用的任务存放在哪,一个任务完成后如果通知另一个任务?
- ②、任意多个CompletableFuture的组合执行是如何存放任务的?
先看下CompletableFuture的几个属性:
// 这三个常量定义了不同的任务完成模式:
// SYNC 代表同步模式,ASYNC 代表异步模式,NESTED 代表嵌套模式
static final int SYNC = 0;
static final int ASYNC = 1;
static final int NESTED = -1;
// NIL 用于表示一个空的结果
static final AltResult NIL = new AltResult(null);
// 这个属性存储 `CompletableFuture` 的结果或者一个 `AltResult` 对象
volatile Object result;
// `Completion` 类型的链表头,用于链式任务处理
// 实际上是 Treiber stack数据结构
volatile Completion stack;
关于 Treiber stack可以参考另一篇文章 FutureTask详解 (opens new window)
Completion 抽象类:
Completion
抽象类用于表示一个完成操作,它可能会在任务完成时被触发,并且它们在链表中以 Treiber stack 形式存储。
abstract static class Completion extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
// `next` 属性指向链表中的下一个 `Completion` 对象
volatile Completion next;
/**
* 在任务完成时执行完成操作,如果需要的话,返回可能需要传播的依赖。
*
* @param mode 完成模式,可以是 SYNC、ASYNC 或 NESTED
* @return 如果存在依赖需要传播,则返回它
*/
abstract CompletableFuture<?> tryFire(int mode);
/**
* 返回 true 如果当前 `Completion` 对象可能仍然可以被触发。
* 用于 `cleanStack` 方法来检查当前 `Completion` 是否仍然有效。
*
* @return 如果 `Completion` 可能仍然可以触发,则返回 true
*/
abstract boolean isLive();
// 实现 `Runnable` 接口的 `run` 方法,调用 `tryFire` 方法,模式为 ASYNC
public final void run() {
tryFire(ASYNC);
}
// 实现 `ForkJoinTask` 的 `exec` 方法,调用 `tryFire` 方法,模式为 ASYNC
public final boolean exec() {
tryFire(ASYNC);
return true;
}
// 实现 `ForkJoinTask` 的方法,`getRawResult` 返回 null
public final Void getRawResult() {
return null;
}
// 实现 `ForkJoinTask` 的方法,`setRawResult` 不执行任何操作
public final void setRawResult(Void v) {}
}
Completion 的子类: 下面这张图来自 https://p0.meituan.net/travelcube/5a889b90d0f2c2a0f6a4f294b9094194112106.png
# 任务链式调用:
以下面这段伪代码举例:
CompletableFuture.supplyAsync(() -> {task1}).thenApply(()->{task2}).thenRun(()->{task3});
上面通过supplyAsync、thenApply、thenRun
方法,分别提交了3个任务,每个方法都会返回一个CompletableFuture
对象。
其中task2会被放入task1的CompletableFuture
维护的Completion stack
也就是Treiber栈中。当task1完成时,会从自己的Treiber栈中
弹出下一个任务task2执行。
如此传递下去、task3会被放入task2的CompletableFuture
维护的Completion stack
也就是Treiber栈中。当task2完成时,会从自己的Treiber栈中弹出下一个任务task3执行。
总结下任务的链式调用机制:
栈结构: 每个 CompletableFuture 维护了一个 Completion 链表,链表中存储了所有需要在当前 CompletableFuture 完成时执行的任务。当一个任务(如task1)完成时,它会遍历链表中的 Completion 对象,将任务按顺序执行。
任务触发:
run
方法会调用tryFire
方法来触发链表中的任务,这样每个任务都能按照预定顺序执行。
其实对于任务的链式调用完全可以不使用栈结构,直接用一个指针把所有任务串成一个单向链表结构就可以了。
但实际上任务还有许多复杂的组合方式,要支持这些任务的各种组合,就需要使用栈结构了。
# 多个CompletableFuture的组合执行
参考《Java并发实现原理:JDK源码剖析》书中的任务组合方式。
任务1完成后,任务2和任务3可以并行执行。
任务2完成后任务4可以执行。
任务2和任务3都完成后任务5可以执行。
任务3完成后任务6可以执行。
其中任务4、5、6可以并行执行。
任务4、5、6有一个完成了,任务7可以执行。
这种任务的组合方式 就成了一种网状结构,并且组成了有向无环图。
我们分析下这个图:
任务1的CompletableFuture
的栈中存储了任务2、任务3。
任务2的CompletableFuutre
的栈中存储了任务4、任务5;
任务3的CompletableFuutre
的栈中存储了任务5、任务6;
任务4、5、6的CompletableFuutre
的栈中存储了任务7;
其中有个问题需要注意:
问:
任务2、任务3的CompletableFuture里面,都存储了任务5,那么任务5是不是会被触发两次,并且执行2次呢?
答:
任务5会被触发2次,但它会判断任务2、任务3的结果是不是都完成了,如果只完成其中一个,它就不会执行。所以任务5会被触发2次但只有任务2、任务3都完成了才会真正执行一次。 对于任务7也是同样的道理。
代码实现上图任务编排示例:
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "任务1";
});
CompletableFuture<String> future2 = future1.thenApplyAsync((result1) -> {
return "任务2";
});
CompletableFuture<String> future3 = future1.thenApplyAsync((result1) -> {
return "任务3";
});
CompletableFuture<String> future4 = future2.thenApplyAsync((result2) -> {
return "任务4";
});
CompletableFuture<String> future5 = future2.thenCombine(future3, (result2, result3) -> {
return "任务5";
});
CompletableFuture<String> future6 = future3.thenApplyAsync((result3) -> {
return "任务6";
});
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future4, future5, future6);
CompletableFuture<String> future7 = anyOf.thenApply((result) -> {
return "任务7";
});
System.out.println(future7.join());
}
可以看出CompletableFuture对于异步任务的组合能力真的很强大。
对于allOf
或者anyOf
这种可以组合任意多个任务的方法是如何处理的呢?
首先看下两个任务的组合:
比如thenCombine
方法,表示的是and逻辑,对应BiApply
内部类。而applyToEither
方法,表示的是or逻辑,对应OrApply
内部类。
BiApply
和OrApply
都是二元操作符,只能传入二个被依赖的任务。
下面以BiApply
为例说明:
BiApply
构造函数:
BiApply(Executor executor, CompletableFuture<V> dep,
CompletableFuture<T> src, CompletableFuture<U> snd,
BiFunction<? super T,? super U,? extends V> fn) {
super(executor, dep, src, snd); this.fn = fn;
}
BiApply通过src和snd两个属性关联被依赖的两个CompletableFuture
,thenCombine
方法会尝试将BiApply
压入这两个被依赖的CompletableFuture
的栈中,每个被依赖的CompletableFuture
完成时都会尝试触发BiApply
的tryFire
方法, BiApply
会检查两个依赖是否都完成,如果完成则开始执行。为了解决重复触发的问题, BiApply
利用CAS操作,执行时会先通过CAS设置状态位,避免重复触发。这也解释了上面说的依赖多个任务会不会被触发多次的问题。
有了上面的基础后再看多个任务的组合:
allOf
对应的是 and逻辑,对应BiRelay
内部类。anyOf
对应的是or逻辑,对应OrRelay
内部类。
对于任意多个的多任务组合实际上都可以重组成多个两个任务的组合。
这里还拿《Java并发实现原理:JDK源码剖析》 中的图看一下:
可以看到 三个任务的组合,可以被重组为多次两个任务的组合。
而BiRelay
和OrRelay
要做的就是把多个任务中的两个重组成一个中间任务,然后再拿这个中间任务继续和另外的任务重组成另一个中间任务,直到最后重组成一个最终的任务。
allOf
或者anyOf
最终会把多个被依赖的CompletableFuture构建成一棵平衡二叉树,执行层层通知,直到根节点,触发回调监听。
拿上面举例就是future7会触发最终的回调监听。
对应的源码:
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
return orTree(cfs, 0, cfs.length - 1);
}
这里只看一下andTree方法:
andTree
用于将多个 CompletableFuture
组织成一个树形结构,以便在所有 CompletableFuture
完成时进行统一的处理。
static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs, int lo, int hi) {
// 创建一个新的 CompletableFuture 对象,用于最终的合并结果
CompletableFuture<Void> d = new CompletableFuture<Void>();
// 如果范围为空(lo > hi),说明没有任务,直接设置结果为 NIL
if (lo > hi) {
d.result = NIL;
} else {
// 定义两个子任务的 CompletableFuture
CompletableFuture<?> a, b;
// 计算中间索引
int mid = (lo + hi) >>> 1;
// 将任务数组分为两半,递归处理每一半
// 如果 lo == mid,则只取当前任务
// 否则递归处理左半部分
if ((a = (lo == mid ? cfs[lo] :
andTree(cfs, lo, mid))) == null ||
// 如果 hi == mid+1,则只取当前任务
// 否则递归处理右半部分
(b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
andTree(cfs, mid+1, hi))) == null)
throw new NullPointerException();
// 尝试将 BiRelay 任务推送到 CompletableFuture d 的栈中
// biRelay 方法用于设置任务的触发逻辑
if (!d.biRelay(a, b)) {
// 如果 biRelay 返回 false,则创建一个新的 BiRelay 对象
// 这个对象会处理两个子任务的完成情况
BiRelay<?,?> c = new BiRelay<>(d, a, b);
// 将 BiRelay 任务推送到 a 的栈中
a.bipush(b, c);
// 尝试触发 BiRelay 任务的执行
c.tryFire(SYNC);
}
}
// 返回最终合并后的 CompletableFuture 对象
return d;
}
# runAsync
执行流程分析
CompletableFuture定义了非常多的内部类以支持其强大的异步编程能力。
有两个 runAsync
方法用于异步执行任务,分别使用默认线程池和自定义线程池。
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
// 使用默认线程池执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
// 使用自定义线程池执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
我们先看使用默认线程池执行任务的runAsync方法。
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
// 检查 Runnable 是否为空,避免空指针异常
if (f == null) throw new NullPointerException();
// 创建一个新的 CompletableFuture 实例,用于表示任务结果
CompletableFuture<Void> d = new CompletableFuture<Void>();
// 提交一个新的 AsyncRun 实例到线程池进行异步执行
e.execute(new AsyncRun(d, f));
// 返回 CompletableFuture 实例
return d;
}
AsyncRun 内部类:
// 异步任务的内部类,继承自 ForkJoinTask,并实现 Runnable 和 AsynchronousCompletionTask 接口
static final class AsyncRun extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
CompletableFuture<Void> dep; // 存储任务结果的 CompletableFuture
Runnable fn; // 要执行的 Runnable 任务
// 构造函数,初始化 dep 和 fn
AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
this.dep = dep;
this.fn = fn;
}
// 返回任务的原始结果,这里为 null,因为任务没有返回值
public final Void getRawResult() { return null; }
// 设置任务的原始结果,这里不需要设置任何结果
public final void setRawResult(Void v) {}
// 执行任务,调用 run() 方法
public final boolean exec() { run(); return true; }
// 运行异步任务
public void run() {
CompletableFuture<Void> d;
Runnable f;
// 检查 CompletableFuture 和 Runnable 是否不为空
if ((d = dep) != null && (f = fn) != null) {
dep = null; // 清空引用,避免内存泄漏
fn = null;
// 如果 CompletableFuture 的结果还未设置(即任务尚未完成)
if (d.result == null) {
try {
f.run(); // 执行 Runnable 任务
d.completeNull(); // 任务成功完成,设置 CompletableFuture 为完成状态
} catch (Throwable ex) {
d.completeThrowable(ex); // 任务执行过程中发生异常,设置 CompletableFuture 为异常状态
}
}
d.postComplete(); // 执行任务完成后的处理
}
}
}
总结:
调用 runAsync:
根据是否指定线程池,调用相应的 runAsync 方法,使用默认线程池或自定义线程池。创建 CompletableFuture 实例:
在 asyncRunStage 方法中,创建一个新的CompletableFuture<Void>
实例 d。提交任务到线程池: 在 asyncRunStage 方法中,创建一个新的 AsyncRun 实例,并将其提交到线程池执行。
AsyncRun 执行任务: 线程池中的线程执行 AsyncRun 的 run 方法。
执行 Runnable 任务,并根据任务的执行结果完成 CompletableFuture。
如果任务正常完成,调用 d.completeNull()。
如果任务执行过程中发生异常,调用 d.completeThrowable(ex)。
最后,调用 d.postComplete() 进行任务完成后的处理。
执行任务的默认线程池 ForkJoinPool.commonPool()
,这个方法会返回一个ForkJoinPool。
ForkJoinPool
的execute
方法接收的是一个ForkJoinTask
对象。
public void execute(ForkJoinTask<?> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
}
ForkJoinTask是个抽象类
public abstract class ForkJoinTask<V> implements Future<V>, Serializable
而我们再看下CompletableFuture的常用方法接受的是什么对象:
Runnable、Supplier、BiConsumer、BiFunction
等。
这些接口可以参考 JDK版本特性(JDK8\11\17\21版本) (opens new window) 中的JDK8版本特性的内容。
所以CompletableFuture
需要把这些接口适配成ForkJoinTask
才能用ForkJoinPool
来执行。
下图来源《Java并发实现原理:JDK源码剖析》
上面runAsync
方法就是把Runnable
转换成AsyncRun
(ForkJoinTask的子类),然后提交给ForkJoinPool执行。
类似的还有:
supplierAsync
会把Supplier
转换成AsyncSupply
(ForkJoinTask的子类),然后提交给ForkJoinPool执行。
其余的就放在下图了:
总之这些适配的内部类都是ForkJoinTask的子类。
其余的具体是什么方法把什么类型的任务转换成什么类型提交给ForkJoinPool,可以参考JDK8 CompletableFuture源码。
# 5、CompletableFuture的生产使用示例
# 生产环境业务场景分析
本质上就是异步任务的编排场景。
灵活运用supplyAsync、thenApply、allOf
等方法。
# ①、有依赖关系的任务编排
使用supplyAsync
异步执行有返回值的任务,利用thenApply
组合单个依赖关系,利用allOf
组合全部结果。
代码示例:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "任务一 ";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "任务二 ";
});
CompletableFuture<String> future3 = future2.thenApply((result2) -> {
// 任务三 依赖 任务二
return result2 + "任务三 ";
});
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future3);
CompletableFuture<String> finalFuture = allOf.thenApply((v) -> {
String result1 = future1.join();
String result3 = future3.join();
return result1 + result3;
});
System.out.println(finalFuture.join());
# ②、无依赖关系的任务编排
使用supplyAsync
异步执行有返回值的任务,利用allOf
组合全部结果。
代码示例:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "任务一 ";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "任务二 ";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
return "任务三 ";
});
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
CompletableFuture<String> finalFuture = allOf.thenApply((v) -> {
String result1 = future1.join();
String result2 = future2.join();
String result3 = future3.join();
return result1 + result2 + result3;
});
System.out.println(finalFuture.join());
# ③、其他情况
后一个任务依赖前两个任务的情况,可以使用thenCombine
。 (也可以使用allOf
替代)
代码示例:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "任务一 ";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "任务二 ";
});
CompletableFuture<String> thenCombine = future1.thenCombine(future2, (result1, result2) -> {
return result1 + result2;
});
CompletableFuture<String> future3 = thenCombine.thenApply((combine) -> {
return combine + "任务三 ";
});
CompletableFuture<String> finalFuture = future3.thenApply((result3) -> {
return result3;
});
System.out.println(finalFuture.join());
后一个任务依赖前n个任务中的任意一个,可以使用anyOf
。
代码示例:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "任务一 ";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "任务二 ";
});
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2);
CompletableFuture<String> future3 = anyOf.thenApply((combine) -> {
return combine + "任务三 ";
});
CompletableFuture<String> finalFuture = future3.thenApply((result3) -> {
return result3;
});
System.out.println(finalFuture.join());
# 6、CompletableFuture的使用注意事项
# ①、使用自定义线程池
默认情况下,CompletableFuture
使用公共的 ForkJoinPool.commonPool()
作为线程池。 这个线程池是全局共享的,如果过多异步任务占用这个线程池会导致资源紧张,也不利于线程管理。
建议使用 CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
方法,将自定义的 Executor
传递给 CompletableFuture
。这样可以避免线程池资源的争用,提高任务执行的性能和稳定性。
示例:
private static ThreadPoolExecutor executorService = new ThreadPoolExecutor(
1,
9,
0,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
// 使用自定义的线程池executorService执行任务
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
return "使用自定义线程池执行任务~";
}, executorService);
}
# ②、父子任务要做线程池隔离
在使用 CompletableFuture
进行任务链式操作时,建议为父任务和子任务使用不同的线程池。这可以避免线程池资源竞争,提高任务的并行处理能力。可以通过不同的 Executor
对象来实现线程池隔离。
最重要的是如果不做隔离可能会出现死锁的情况。
死锁情况示例:
private static ThreadPoolExecutor executorService = new ThreadPoolExecutor(
1,
9,
0,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return CompletableFuture.supplyAsync(() -> {
return "子任务";
}, executorService).join() + " 父任务";
}, executorService);
String result = future.join();
System.out.println(result);
executorService.shutdown();
}
上面线程池的使用情况,先执行父任务,使用了一个核心线程,核心线程就一个用完了, 再执行子任务就会把任务放到工作队列等待执行。
父任务又需要等待子任务完成才能释放线程资源,此时就形成了死锁。
最合理的解决方式就是子任务使用另外的一个线程池执行。
# ③、合理捕获异常
上面第3节第⑥点说了异常处理的几种方式。
可以用get()
方法阻塞式try catch
获取异常信息。
其余的CompletableFuture提供的方法比如exceptionally,返回的都是CompletionException ,CompletionException
中真正的异常堆栈信息需要使用Throwable.getCause()方法返回。
可以参考美团技术的这篇文章 (opens new window)中写的CompletableFuture异常处理工具类,来处理异常。
public class ExceptionUtils {
public static Throwable extractRealException(Throwable throwable) {
//这里判断异常类型是否为CompletionException、ExecutionException,如果是则进行提取,否则直接返回。
if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
if (throwable.getCause() != null) {
return throwable.getCause();
}
}
return throwable;
}
}
我们可以对比下如果不提取看看打印的是什么?
异常处理示例:
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("抛异常!");
});
future.thenApply((result) -> {
return "执行成功~";
}).exceptionally((exc) -> {
// 直接打印异常
System.out.println(exc);
// 打印异常的getCause()返回信息
System.out.println(exc.getCause());
return "异常默认值";
});
}
执行结果:
java.util.concurrent.CompletionException: java.lang.RuntimeException: 抛异常!
java.lang.RuntimeException: 抛异常!
可以看到真正的异常信息被CompletionException包了一层。
# 总结
第4节的原理分析写的有点乱、后续还是要多看源码,再重新整理一下。
参考资料:
https://tech.meituan.com/2022/05/12/principles-and-practices-of-completablefuture.html
https://javaguide.cn/java/concurrent/completablefuture-intro.html
《Java并发实现原理:JDK源码剖析》