Future and Asynchronous Programming⚓︎
约 824 个字 81 行代码 预计阅读时间 5 分钟
Asynchornous vs Synchronous⚓︎
- 异步(asynchronous):调用方发出请求后不必等待结果返回,而是立即继续执行后续代码;当结果真正就绪时,通过回调、事件或通知等方式再被动处理
- 同步(synchronous):发出请求后必须阻塞等待,直到结果返回才能继续下一步
类比:咖啡店买咖啡
- 同步:你点单后站在柜台前等咖啡做好,期间不能做别的事,拿到咖啡才离开
- 异步:你点单后拿到一张叫号牌,去座位上刷手机;咖啡做好后,服务员喊号 / 送过来,你再去取
代码对比
下表列举了异步和同步之间的关键区别:
| 维度 | 同步 | 异步 |
|---|---|---|
| 是否阻塞调用线程 | ✅ 阻塞 | ❌ 立即返回 |
| 结果如何拿到 | 直接返回值 / 异常 | 回调、事件、Future |
| CPU 利用率 | 低(线程空等) | 高(线程可做的事) |
| 复杂度 | 低(线性代码) | 高(需处理回调、错误传播) |
| 适用场景 | 计算快 / 必须结果才能继续 | IO/ 远程调用 / 高并发 |
| Java 例子 | Stream.forEach() / Future.get() |
CompletableFuture.thenApply() / NIO |
为什么需要异步?
| 场景 | 同步耗时 | 异步耗时 |
|---|---|---|
| 3 个 RPC 串行 | 300ms | 100ms(并行) |
| 1000 个日志落盘 | 600ms | 50ms(批量) |
| 高并发 QPS | 下降 | 保持 |
异步 != 多线程:不阻塞线程 = 提高吞吐量
函数并行:
A = F(B) -- FA = future(F(B))
|--> wait
C = G(A) -- FB = future(G(FA.get()))
|--> parallel
D = H(A) -- FC = future(H(FA.get()))
Future⚓︎
Future 是异步任务结果的句柄,代表尚未完成的任务。在 Future 上,可以:
- 查看任务是否完成
- 获取任务返回值(或异常)
- 取消任务
- 限时等待
类比:餐厅取餐小票
- 你下单后拿到一张小票(
Future) - 厨师后台炒菜(任务执行)
- 你可以站着等(阻塞)或先玩手机(干别的事)
- 菜做好后凭小票取餐(
get结果) - 等太久可以退单(
cancel)
核心接口:java.util.concurrent.Future<V>
boolean isDone():任务是否已完成(含正常、异常、取消)boolean isCancelled():任务是否已取消boolean cancel(boolean mayInterruptIfRunning):尝试取消任务V get():阻塞等待结果,返回V或抛ExecutionExceptionV get(long timeout, TimeUnit unit):带超时阻塞,超时抛TimeoutException
Future 最简使用模板:
ExecutorService pool = Executors.newFixedThreadPool(2);
Future<Integer> future = pool.submit(new Callable<Integer>() {
public Integer call() throws Exception {
Thread.sleep(1000);
return 42;
}
});
// ① 干点别的
doOtherWork();
// ② 阻塞取结果
try {
Integer result = future.get(); // 阻塞直到任务完成
System.out.println(result); // 42
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 重置中断位
} catch (ExecutionException e) {
Throwable cause = e.getCause(); // 任务内部异常
cause.printStackTrace();
}
pool.shutdown();
局限性
- 只能主动轮询或阻塞
get,不能回调 - 不能链式组合多个异步任务
- 不能异常链式处理
Java 8 引入
CompletableFuture解决这些痛点。
CompletableFuture⚓︎
Future vs CompletableFuture:
// Future - 阻塞 get
Future<User> f = executor.submit(() -> rpc());
User u = f.get(); // 阻塞
// CompletableFuture - 回调
CompletableFuture<User> cf = CompletableFuture.supplyAsync(() -> rpc());
cf.thenAccept(u -> handle(u)); // 非阻塞
创建 CompletableFuture 的 4 种方式:
supplyAsync:带返回值- 示例:
supplyAsync(() -> rpc())
- 示例:
runAsync:无返回值- 示例:
runAsync(() -> log())
- 示例:
completedFuture:已完结- 示例:
completedFuture(val)
- 示例:
newIncompleteFuture:手动完成- 示例:
new CF<>() + complete()
- 示例:
串行流水线:thenApply
CompletableFuture<Integer> cf =
supplyAsync(() -> 1)
.thenApply(i -> i + 1) // +1
.thenApply(i -> i * 2); // *2
System.out.println(cf.join()); // 4
每一步异步执行,线程复用(ForkJoinPool.commonPool()
并行汇聚:thenCombine / thenAcceptBoth
CompletableFuture<Integer> price = supplyAsync(() -> fetchPrice());
CompletableFuture<Integer> quantity = supplyAsync(() -> fetchQty());
CompletableFuture<BigDecimal> total =
price.thenCombine(quantity, (p, q) -> p * q);
两个任务并行完成后再聚合,不阻塞主线程。
异常链:exceptionally / handle
异步重试模板
static <T> CompletableFuture<T> retry(Supplier<T> task, int max){
return supplyAsync(task)
.handle((r, ex) -> {
if (ex == null) return completedFuture(r);
if (max > 0) return retry(task, max - 1);
return completedExceptionally(ex);
})
.thenCompose(cf -> cf);
}
批量等待:allOf / anyOf
List<CompletableFuture<String>> cfs =
urls.stream()
.map(u -> supplyAsync(() -> httpGet(u)))
.collect(toList());
CompletableFuture<Void> all = CompletableFuture.allOf(cfs.toArray(new CompletableFuture[0]));
all.thenRun(() -> System.out.println("全部完成"));
自定义线程池(避免 commonPool 被阻塞)
ExecutorService bizPool = Executors.newFixedThreadPool(16);
CompletableFuture.supplyAsync(() -> rpc(), bizPool)
.thenApply(this::convert, bizPool) // 指定同池
.thenAccept(this::save);
bizPool.shutdown();
规则:I/O 阻塞任务必须自建池;计算任务可用 commonPool
并行流 vs CompletableFuture:
| 维度 | ParallelStream | CompletableFuture |
|---|---|---|
| 拆分源 | spliterator |
手动 fork |
| 线程池 | commonPool |
任意池 |
| 结果汇聚 | collect |
thenCombine/allOf |
| 异常处理 | 受限于 Stream |
handle/exceptionally |
性能对比:并行查询 3 个 RPC
- 场景:3 个 RPC 各 100ms
- 串行:300ms
- 并行流:103ms
CompletableFuture+ 自定义池:98ms
差距来自线程池可控 + 更少装箱。
评论区
如果大家有什么问题或想法,欢迎在下方留言~