跳转至

Virtual Thread and Reactive⚓︎

308 个字 54 行代码 预计阅读时间 2 分钟

百万并发 HttpServer

// 平台线程
Executors.newFixedThreadPool(200) -> OOM

// 虚拟线程
Executors.newVirtualThreadPerTaskExecutor() -> 1M + 连接

本地 Demo1s 创建 1M 虚拟线程,内存 < 1GB

维度 平台线程 虚拟线程
内存 1MB/ \(\approx\) 几百字节
创建 系统调用 用户态
数量 数千 数百万
阻塞代价 内核调度 挂起续体

冷知识:虚拟线程曾用名为「纤程 (fiber)

虚拟线程核心:Continuation 续体

%%{init: { 'theme': 'base', 'themeVariables': { 
    'actorBkg': '#fff9e5', 
    'actorBorder': '#ffcc5c', 
    'noteBkgColor': '#fff9e5', 
    'noteBorderColor': '#ffcc5c' 
}}}%%
sequenceDiagram
    participant VT
    participant Carrier

    VT->>Carrier: run()

    Note over VT: Socket.read()阻塞

    Carrier->>VT: **挂起**→保存PC/栈

    Carrier->>Carrier: 偷任务

    Note over VT: IO完成

    Carrier->>VT: **恢复**→继续执行

挂起 / 恢复由 JVM 完成,OS 无感知。

创建虚拟线程的方法:

// ① 快速创建
Thread v = Thread.startVirtualThread(() -> task());

// ② Builder
Thread.ofVirtual().name("vt-", 0).start(() -> task());

// ③ Executor
try (var exe = Executors.newVirtualThreadPerTaskExecutor()) {
    exe.submit(() -> task());
}

结构化并发(structured concurrency):自动 join、取消剩余、错误传播

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Future<String> f1 = scope.fork(() -> rpc1());
    Future<String> f2 = scope.fork(() -> rpc2());
    scope.join();               // 等全部完成
    scope.throwIfFailed();      // 任一失败抛异常
    return f1.resultNow() + f2.resultNow();
}

阻塞不再是罪:I/O 示例

//虚拟线程内阻
塞read
try (var server = ServerSocketChannel.open()) {
    server.bind(new InetSocketAddress(8080));
    while (true) {
        SocketChannel ch = server.accept();
        Thread.startVirtualThread(() -> {
            try {
                ch.read(buf);  // 阻塞 -> 挂起
                ch.write(buf);
            } catch (IOException ignore) {}
        });
    }
}
  • 阻塞不会占用平台线程,性能 ≈ NIO

固定(pinning)陷阱

synchronized (lock) {       // 内部 Monitor
    InputStream.read();     // 阻塞 -> pinned
}
  • 虚拟线程仍占用平台线程
  • 解决:
    • 改用 ReentrantLock
    • 避免在 synchronized 内阻塞 I/O

调试与监控:

  • jstack:显示 VirtualThread[#123]
  • jcmd:Thread.dump_to_file
  • VisualVM:插件 2.1+ 可聚合 VT
  • JFR:JDK20+ 记录 VT 事件

Reactive 编程思想:

graph LR
    %% 定义节点
    P[Publisher]
    S[Subscriber]

    %% 中间节点
    N1[nNext]
    N2[nComplete]
    N3[nError]

    %% 连接关系
    %% --o 表示末端带圆点的连接线
    P --o N1
    N1 --> S

    P --o N2
    N2 --> S

    P --o N3
    N3 --> S

Reactor 核心 API

// Mono(0|1)
Mono<String> mono =
    Mono.fromCallable(() -> rpc())
        .subscribeOn(Schedulers.parallel())
        .timeout(Duration.ofSeconds(2))
        .onErrorReturn("fallback");

// Flux(N)
Flux<Integer> flux =
    Flux.range(1, 5)
        .map(i -> i * 2)
        .filter(i -> i > 5);

背压(back-pressure) 机制:

Flux.interval(Duration.ofMillis(1))   // 生产快
    .onBackpressureBuffer(1000)       // 缓冲 1k
    .subscribe(i -> slowConsumer(i)); // 消费慢

溢出时丢弃 / 报错 / 缓冲,防止 OOM

VT + Reactive:对比选择

维度 虚拟线程 Reactive
代码风格 同步阻塞 链式回调
学习成本 ( 操作符 )
调试 栈可追踪 回调栈深
适用 IO 密集 IO 密集 + 高组合
背压 ( 需额外 ) 原生

二选一?-> 混合:VT 调用 Reactive 客户端

性能对比:VT vs NIO vs Reactive

  • 场景:Echo 服务器,1k 并发连接
  • 平台线程:12k QPS,内存 800MB
  • NIO+Reactor:28k QPS,内存 300MB
  • 虚拟线程:27k QPS,内存 <300MB

评论区

如果大家有什么问题或想法,欢迎在下方留言~