Skip to content

Future 与异步编程

Thread 和 Runnable 的缺陷

回顾之前学习的,创建线程的方式,有两种

  • 继承 Thread 类,重写 run() 接口
  • 实现 Runnable 接口,实现 run() 接口

但是这两种实现方式都有一个缺陷:没有返回值,如果需要运行一个带返回值的任务,原有的方式就不够用了。

所以 JDK 提供了一个接口:Callable,只有一个 call() 方法,这个方法既支持泛型,还支持抛出异常,这是 Runnable 不具备的能力

java
@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

Future

Callabel 一般不会单独使用,而是搭配线程池来使用:

java
Future<Integer> future = executorService.submit(() -> 123);
Integer result = future.get();
System.out.println(result);

其中提交给线程池运行后的结果不是直接返回的,而是通过 Future 来进行包装,Future 提供了检查计算是否完成、是否取消,阻塞获取计算结果的方法

java
public interface Future<V> {
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws ...;
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
}

其中最重要的方法是 get(),他的作用获取 Callable 的执行结果

  • 如果此刻 Callable 定义的任务还没执行完,那么会阻塞当前线程,直到运算完成;所以 Future 也提供了带超时时间的 get() 方法,目的就是避免持续阻塞进而引发的资源耗尽或死锁等问题
  • get() 方法可以抛出异常,其中 ExecutionExceptionCallable 任务执行过程中抛出的异常的包装异常
  • 必须主动调用 get() 方法才能得到任务结果,Future 无法做到异步回调通知

FutureTask 可执行的 Future

FutureTaskFuture 的实现类,除此之外还实现了 Runnable,使它具备了:

  • 可以被线程执行的能力,(Future 只是一个结果的 "占位符",它并不能直接运行)
  • 可以获取结果的能力

状态机的体现

FutureTaskstate 属性和 CAS 修改的方式,保证了 run() 方法只被执行一次

java
    private volatile int state;
    private static final int NEW          = 0;      // 创建完成
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;      // 正常完成
    private static final int EXCEPTIONAL  = 3;      // 执行异常
    private static final int CANCELLED    = 4;      // 被取消
    private static final int INTERRUPTING = 5;      // 正在中断
    private static final int INTERRUPTED  = 6;      // 已中断

run() 方法只执行一次的原因

因为 run() 方法执行的时候会优先检查 state 是否为 NEW,是才能执行,否则不执行

java
    public void run() {
        if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread()))
            return;
        try {
            
        } finally {

        }
    }

调用 get() 方法阻塞获取结果

  • 如果当前任务未执行完,那么会通过 LockSupport.park() 来挂起调用线程
  • 如果当前任务执行完成了,run() 方法会调用 LockSupport.unpark() 所有等待的线程,并把结果存放到 outcome 属性中

CompletableFuture 异步任务编排

FutureFutureTask 虽然给线程任务提供了获取任务结果的能力,但是其本质上还是阻塞等待的方式,无法做到回调通知,因此在复杂的流程编排中还不够用

CompletableFuture 拥有这种能力,能实现多任务流程编排、异步回调等功能

常用创建方式

方法说明
CompletableFuture.runAsync(Runnable action)异步执行有返回结果的任务
CompletableFuture.supplyAsync(Supplier<T> supplier)异步执行无返回结果的任务

默认会使用 ForkJoinPool.commonPool(),在实际生产环境中必须指定自定义线程池,避免影响其他使用公共组件的功能(比如 paparallelStreamra

编排模式

串行依赖(A → B → C)

串行依赖是 CompletableFuture 的典型编排模型,适用于每个线程任务存在先后依赖关系的情况

  • thenApply(Function<T,R>):有输入有输出
  • thenAccept(Consumer<T>):有输入无输出
  • thenRun(Runnable):无输入无输出
  • thenCompose(Function<T, CompletableFuture<R>>):有输入有输出,和 thenApply 相比,它避免了 CompletableFuture 的嵌套

并行合并(A & B -> C)

并行合并适用于每个任务之间没有相互依赖关系,可以同时并行,并最终聚合成一个结果返回的情况

  • CompletableFuture<R> other, BiFunction<T,U,R>):两个 CompletableFuture 都完成后,输入两个任务的结果,输出合并的结果
  • CompletableFuture<Void> other, BiConsumer<T,U>):两个 CompletableFuture 都完成后,消费两个结果,输入两个任务的结果,无输出
  • CompletableFuture<Void> runAfterBoth(other, Runnable):两个 CompletableFuture 都完成后,执行 Runnable,无输入无输出
  • 多任务聚合(更常用)
    • CompletableFuture.allOf(cf1, cf2, cf3):所有任务都完成才继续
    • CompletableFuture.anyOf(cf1, cf2, cf3):所有任务中任意一个完成就继续

异常处理

  • exceptionally(Function<Throwable, T>):仅处理异常,返回 fallback 值
  • handle(BiFunction<T, Throwable, R>):同时处理正常结果和异常(推荐)
  • whenComplete(BiFunction<T, Throwable, Void>):类似 finally,不改变结果

此外,对于获取任务结果的方法,有 get() 和 join() 两种,他们在处理异常上也有着不同

  • get()
java
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (true) {
        throw new RuntimeException("任务失败");
    }
    return "结果";
});

try {
    String result = future.get();  // ❌ 必须 try-catch,且必须要分别处理两种异常
} catch (InterruptedException e) {
    // 处理线程中断
    Thread.currentThread().interrupt();
} catch (ExecutionException e) {
    // 处理任务执行异常
    Throwable cause = e.getCause();  // 获取原始异常
    System.out.println("执行失败:" + cause.getMessage());
}
  • join()
java
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (true) {
        throw new RuntimeException("任务失败");
    }
    return "结果";
});

try {
    String result = future.join();  // ✅ 不需要处理受检异常
} catch (CompletionException e) {
    // ❗可选 catch,通常用于获取原始异常
    Throwable cause = e.getCause();  // 获取原始异常
    System.out.println("执行失败:" + cause.getMessage());
}

// 或者更简洁:不 catch,让异常向上抛出
String result = future.join();  // 异常会自动传播
方法来源异常类型是否必须 catch
get()Future 接口InterruptedException (线程中断异常)
ExecutionException (任务执行异常)
✅ 必须
join()CompletableFuture 特有CompletionException❌ 可选

最佳实践

  • 自定义线程池,尽可能不滥用 ForkJoinPool.commonPool(),这个线程池是一个公共线程池,为了 CPU 密集型任务设计的,涉及到 IO 密集型(需要等待)任务容易把线程资源耗尽,还会卡住其他公共组件,比如 parallelStrem

  • 每个 CompletableFuture 都应有异常处理,如果其中一个抛出异常而又没有 handle() 等异常处理方法时,会中断后续的步骤

  • 尽可能多用 join() 少用 get(),因为 join() 仅抛出 RuntimeException,搭配 handle() 或者 exceptionally() 方法返回默认值,可以不处理保证代码的整洁