跳转至

Future and Asynchronous Programming⚓︎

824 个字 81 行代码 预计阅读时间 5 分钟

Asynchornous vs Synchronous⚓︎

  • 异步(asynchronous):调用方发出请求后不必等待结果返回,而是立即继续执行后续代码;当结果真正就绪时,通过回调、事件或通知等方式再被动处理
  • 同步(synchronous):发出请求后必须阻塞等待,直到结果返回才能继续下一步
类比:咖啡店买咖啡
  • 同步:你点单后站在柜台前等咖啡做好,期间不能做别的事,拿到咖啡才离开
  • 异步:你点单后拿到一张叫号牌,去座位上刷手机;咖啡做好后,服务员喊号 / 送过来,你再去取
代码对比
// 同步:阻塞当前线程
User user = rpc.getUser(id);
System.out.println(user.name());                   // 直到网络返回才继续

// 异步:立即返回,结果通过回调处理
CompletableFuture<User> fu = rpc.getUserAsync(id);
fu.thenAccept(u -> System.out.println(u.name()));  // 回调里执行
doOtherWork();                                     // 立刻做别的事

下表列举了异步和同步之间的关键区别:

维度 同步 异步
是否阻塞调用线程 ✅ 阻塞 ❌ 立即返回
结果如何拿到 直接返回值 / 异常 回调、事件、Future
CPU 利用率 低(线程空等) 高(线程可做的事)
复杂度 低(线性代码) 高(需处理回调、错误传播)
适用场景 计算快 / 必须结果才能继续 IO/ 远程调用 / 高并发
Java 例子 Stream.forEach() / Future.get() CompletableFuture.thenApply() / NIO
为什么需要异步?
场景 同步耗时 异步耗时
3 RPC 串行 300ms 100ms(并行)
1000 个日志落盘 600ms 50ms(批量)
高并发 QPS 下降 保持

异步 != 多线程:不阻塞线程 = 提高吞吐量

函数并行:

A = F(B) --               FA = future(F(B))
           |--> wait
C = G(A) --               FB = future(G(FA.get()))
           |--> parallel
D = H(A) --               FC = future(H(FA.get()))

Future⚓︎

Future 是异步任务结果的句柄,代表尚未完成的任务。在 Future 上,可以:

  • 查看任务是否完成
  • 获取任务返回值(或异常)
  • 取消任务
  • 限时等待
类比:餐厅取餐小票
  • 你下单后拿到一张小票(Future
  • 厨师后台炒菜(任务执行)
  • 你可以站着等(阻塞)或先玩手机(干别的事)
  • 菜做好后凭小票取餐(get 结果)
  • 等太久可以退单(cancel

核心接口:java.util.concurrent.Future<V>

  • boolean isDone():任务是否已完成(含正常、异常、取消)
  • boolean isCancelled():任务是否已取消
  • boolean cancel(boolean mayInterruptIfRunning):尝试取消任务
  • V get():阻塞等待结果,返回 V 或抛 ExecutionException
  • V get(long timeout, TimeUnit unit):带超时阻塞,超时抛 TimeoutException

Future 最简使用模板:

ExecutorService pool = Executors.newFixedThreadPool(2);
Future<Integer> future = pool.submit(new Callable<Integer>() {
    public Integer call() throws Exception {
        Thread.sleep(1000);
        return 42;
    }
});

// ① 干点别的
doOtherWork();

// ② 阻塞取结果
try {
    Integer result = future.get();        // 阻塞直到任务完成
    System.out.println(result);           // 42
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();   // 重置中断位
} catch (ExecutionException e) {
    Throwable cause = e.getCause();       // 任务内部异常
    cause.printStackTrace();
}

pool.shutdown();
局限性
  • 只能主动轮询或阻塞 get,不能回调
  • 不能链式组合多个异步任务
  • 不能异常链式处理

Java 8 引入 CompletableFuture 解决这些痛点。

CompletableFuture⚓︎

Future vs CompletableFuture

// Future - 阻塞 get
Future<User> f = executor.submit(() -> rpc());
User u = f.get();  // 阻塞

// CompletableFuture - 回调
CompletableFuture<User> cf = CompletableFuture.supplyAsync(() -> rpc());
cf.thenAccept(u -> handle(u));  // 非阻塞

创建 CompletableFuture 4 种方式:

  • supplyAsync:带返回值
    • 示例:supplyAsync(() -> rpc())
  • runAsync:无返回值
    • 示例:runAsync(() -> log())
  • completedFuture:已完结
    • 示例:completedFuture(val)
  • newIncompleteFuture:手动完成
    • 示例:new CF<>() + complete()

串行流水线:thenApply

CompletableFuture<Integer> cf =
    supplyAsync(() -> 1)
        .thenApply(i -> i + 1)      // +1
        .thenApply(i -> i * 2);     // *2
System.out.println(cf.join());      // 4

每一步异步执行,线程复用(ForkJoinPool.commonPool()

并行汇聚:thenCombine / thenAcceptBoth

CompletableFuture<Integer> price = supplyAsync(() -> fetchPrice());
CompletableFuture<Integer> quantity = supplyAsync(() -> fetchQty());

CompletableFuture<BigDecimal> total =
    price.thenCombine(quantity, (p, q) -> p * q);

两个任务并行完成后再聚合,不阻塞主线程。

异常链:exceptionally / handle

supplyAsync(() -> 1 / 0)       // 异常
    .exceptionally(ex -> 0)    // 兜底值
    .thenApply(i -> i + 1)
    .join(); //1
supplyAsync(() -> 1 / 0)
    .handle((res, ex) -> ex != null ? -1 : res)
    .join();                   // -1

异步重试模板

static <T> CompletableFuture<T> retry(Supplier<T> task, int max){
    return supplyAsync(task)
        .handle((r, ex) -> {
            if (ex == null) return completedFuture(r);
            if (max > 0) return retry(task, max - 1);
            return completedExceptionally(ex);
        })
        .thenCompose(cf -> cf);
}

批量等待:allOf / anyOf

List<CompletableFuture<String>> cfs =
    urls.stream()
        .map(u -> supplyAsync(() -> httpGet(u)))
        .collect(toList());

CompletableFuture<Void> all = CompletableFuture.allOf(cfs.toArray(new CompletableFuture[0]));
all.thenRun(() -> System.out.println("全部完成"));

自定义线程池(避免 commonPool 被阻塞)

ExecutorService bizPool = Executors.newFixedThreadPool(16);
CompletableFuture.supplyAsync(() -> rpc(), bizPool)
                 .thenApply(this::convert, bizPool)             // 指定同池
                 .thenAccept(this::save);
bizPool.shutdown();

规则:I/O 阻塞任务必须自建池;计算任务可用 commonPool

并行流 vs CompletableFuture

维度 ParallelStream CompletableFuture
拆分源 spliterator 手动 fork
线程池 commonPool 任意池
结果汇聚 collect thenCombine/allOf
异常处理 受限于 Stream handle/exceptionally

性能对比:并行查询 3 RPC

  • 场景:3 RPC 100ms
  • 串行:300ms
  • 并行流:103ms
  • CompletableFuture + 自定义池:98ms

差距来自线程池可控 + 更少装箱。

评论区

如果大家有什么问题或想法,欢迎在下方留言~