Future 与异步编程
Thread 和 Runnable 的缺陷
回顾之前学习的,创建线程的方式,有两种
- 继承
Thread类,重写run()接口 - 实现
Runnable接口,实现run()接口
但是这两种实现方式都有一个缺陷:没有返回值,如果需要运行一个带返回值的任务,原有的方式就不够用了。
所以 JDK 提供了一个接口:Callable,只有一个 call() 方法,这个方法既支持泛型,还支持抛出异常,这是 Runnable 不具备的能力
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}Future
Callabel 一般不会单独使用,而是搭配线程池来使用:
Future<Integer> future = executorService.submit(() -> 123);
Integer result = future.get();
System.out.println(result);其中提交给线程池运行后的结果不是直接返回的,而是通过 Future 来进行包装,Future 提供了检查计算是否完成、是否取消,阻塞获取计算结果的方法
public interface Future<V> {
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws ...;
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
}其中最重要的方法是 get(),他的作用获取 Callable 的执行结果
- 如果此刻
Callable定义的任务还没执行完,那么会阻塞当前线程,直到运算完成;所以Future也提供了带超时时间的get()方法,目的就是避免持续阻塞进而引发的资源耗尽或死锁等问题 get()方法可以抛出异常,其中ExecutionException是Callable任务执行过程中抛出的异常的包装异常- 必须主动调用
get()方法才能得到任务结果,Future无法做到异步回调通知
FutureTask 可执行的 Future
FutureTask 是 Future 的实现类,除此之外还实现了 Runnable,使它具备了:
- 可以被线程执行的能力,(
Future只是一个结果的 "占位符",它并不能直接运行) - 可以获取结果的能力
状态机的体现
FutureTask 的 state 属性和 CAS 修改的方式,保证了 run() 方法只被执行一次
private volatile int state;
private static final int NEW = 0; // 创建完成
private static final int COMPLETING = 1;
private static final int NORMAL = 2; // 正常完成
private static final int EXCEPTIONAL = 3; // 执行异常
private static final int CANCELLED = 4; // 被取消
private static final int INTERRUPTING = 5; // 正在中断
private static final int INTERRUPTED = 6; // 已中断run() 方法只执行一次的原因
因为 run() 方法执行的时候会优先检查 state 是否为 NEW,是才能执行,否则不执行
public void run() {
if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
try {
} finally {
}
}调用 get() 方法阻塞获取结果
- 如果当前任务未执行完,那么会通过
LockSupport.park()来挂起调用线程 - 如果当前任务执行完成了,
run()方法会调用LockSupport.unpark()所有等待的线程,并把结果存放到outcome属性中
CompletableFuture 异步任务编排
Future 和 FutureTask 虽然给线程任务提供了获取任务结果的能力,但是其本质上还是阻塞等待的方式,无法做到回调通知,因此在复杂的流程编排中还不够用
CompletableFuture 拥有这种能力,能实现多任务流程编排、异步回调等功能
常用创建方式
| 方法 | 说明 |
|---|---|
CompletableFuture.runAsync(Runnable action) | 异步执行有返回结果的任务 |
CompletableFuture.supplyAsync(Supplier<T> supplier) | 异步执行无返回结果的任务 |
默认会使用
ForkJoinPool.commonPool(),在实际生产环境中必须指定自定义线程池,避免影响其他使用公共组件的功能(比如paparallelStreamra)
编排模式
串行依赖(A → B → C)
串行依赖是
CompletableFuture的典型编排模型,适用于每个线程任务存在先后依赖关系的情况
thenApply(Function<T,R>):有输入有输出thenAccept(Consumer<T>):有输入无输出thenRun(Runnable):无输入无输出thenCompose(Function<T, CompletableFuture<R>>):有输入有输出,和thenApply相比,它避免了CompletableFuture的嵌套
并行合并(A & B -> C)
并行合并适用于每个任务之间没有相互依赖关系,可以同时并行,并最终聚合成一个结果返回的情况
CompletableFuture<R> other, BiFunction<T,U,R>):两个CompletableFuture都完成后,输入两个任务的结果,输出合并的结果CompletableFuture<Void> other, BiConsumer<T,U>):两个CompletableFuture都完成后,消费两个结果,输入两个任务的结果,无输出CompletableFuture<Void> runAfterBoth(other, Runnable):两个CompletableFuture都完成后,执行Runnable,无输入无输出- 多任务聚合(更常用):
CompletableFuture.allOf(cf1, cf2, cf3):所有任务都完成才继续CompletableFuture.anyOf(cf1, cf2, cf3):所有任务中任意一个完成就继续
异常处理
exceptionally(Function<Throwable, T>):仅处理异常,返回 fallback 值handle(BiFunction<T, Throwable, R>):同时处理正常结果和异常(推荐)whenComplete(BiFunction<T, Throwable, Void>):类似 finally,不改变结果
此外,对于获取任务结果的方法,有 get() 和 join() 两种,他们在处理异常上也有着不同
get()
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (true) {
throw new RuntimeException("任务失败");
}
return "结果";
});
try {
String result = future.get(); // ❌ 必须 try-catch,且必须要分别处理两种异常
} catch (InterruptedException e) {
// 处理线程中断
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
// 处理任务执行异常
Throwable cause = e.getCause(); // 获取原始异常
System.out.println("执行失败:" + cause.getMessage());
}join()
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (true) {
throw new RuntimeException("任务失败");
}
return "结果";
});
try {
String result = future.join(); // ✅ 不需要处理受检异常
} catch (CompletionException e) {
// ❗可选 catch,通常用于获取原始异常
Throwable cause = e.getCause(); // 获取原始异常
System.out.println("执行失败:" + cause.getMessage());
}
// 或者更简洁:不 catch,让异常向上抛出
String result = future.join(); // 异常会自动传播| 方法 | 来源 | 异常类型 | 是否必须 catch |
|---|---|---|---|
get() | Future 接口 | InterruptedException (线程中断异常) ExecutionException (任务执行异常) | ✅ 必须 |
join() | CompletableFuture 特有 | CompletionException | ❌ 可选 |
最佳实践
自定义线程池,尽可能不滥用
ForkJoinPool.commonPool(),这个线程池是一个公共线程池,为了 CPU 密集型任务设计的,涉及到 IO 密集型(需要等待)任务容易把线程资源耗尽,还会卡住其他公共组件,比如parallelStrem每个
CompletableFuture都应有异常处理,如果其中一个抛出异常而又没有handle()等异常处理方法时,会中断后续的步骤尽可能多用
join()少用get(),因为join()仅抛出RuntimeException,搭配handle()或者exceptionally()方法返回默认值,可以不处理保证代码的整洁