JDK19特性之虛擬線程和結(jié)構(gòu)化并發(fā)

1 虛擬線程

1.1 前言

在讀《深入理解JVM虛擬機(jī)》這本書前兩章的時(shí)候整理了JDK從1.0到最新版本發(fā)展史,其中記錄了JDK這么多年來演進(jìn)過程中的一些趣聞及引人注目的一些特性,在調(diào)研JDK19新增特性的時(shí)候了解到了虛擬線程這個(gè)概念,于是對(duì)虛擬線程進(jìn)行學(xué)習(xí)整理內(nèi)容如下。

1.2 虛擬線程介紹

1.2.1 虛擬線程

虛擬線程(Virtual Threads) 就猶如名字一樣,并非傳統(tǒng)意義上的JAVA線程。傳統(tǒng)意義上的JAVA線程(以下稱為平臺(tái)線程 或者 載體線程(Carrier Thread) )跟操作系統(tǒng)的內(nèi)核線程是一一映射的關(guān)系(如下圖所示)。而對(duì)于平臺(tái)線程的創(chuàng)建和銷毀所帶來的開銷是非常大的,所以JAVA采用線程池的方式來維護(hù)平臺(tái)線程而避免線程的反復(fù)創(chuàng)建和銷毀。

image.png

然而平臺(tái)線程也會(huì)占用內(nèi)存、CPU資源,往往在CPU和網(wǎng)絡(luò)連接成為系統(tǒng)瓶頸前,平臺(tái)線程首當(dāng)其沖的會(huì)成為系統(tǒng)瓶頸。在單臺(tái)服務(wù)器硬件資源確定的情況下,平臺(tái)線程的數(shù)量同樣也會(huì)因?yàn)橛布Y源而受到限制,也成為單臺(tái)服務(wù)器吞吐量提升的主要障礙。

談回虛擬線程,虛擬線程則是由JDK非操作系統(tǒng)提供的一種線程輕量級(jí)實(shí)現(xiàn),它相較于平臺(tái)線程而言具有以下特性:

  • 不依賴于平臺(tái)線程的數(shù)量;
  • 不會(huì)增加額外的上下文切換開銷;
  • 不會(huì)在代碼的整個(gè)生命周期中阻塞系統(tǒng)線程;
  • 整個(gè)虛擬線程的維護(hù)是通過JVM進(jìn)行管理,作為普通的JAVA對(duì)象存放在RAM
  • 用戶態(tài)調(diào)度不需要內(nèi)核參與
  • ??臻g只需幾 KB(線程通常 1MB)
  • 創(chuàng)建和切換不需要系統(tǒng)調(diào)用

那么意味著若干的虛擬線程可以在同一個(gè)系統(tǒng)線程上運(yùn)行應(yīng)用程序的代碼(如下圖所示),只有在虛擬線程執(zhí)行的時(shí)候才會(huì)消耗系統(tǒng)線程,在等待和休眠時(shí)不會(huì)阻塞系統(tǒng)線程。


image.png

相較于平臺(tái)線程而言,虛擬線程是一種非常廉價(jià)和豐富的線程,可以說虛擬線程的數(shù)量是一種近乎于無限多的線程,它對(duì)硬件的利用率接近于最好,在相同硬件配置服務(wù)器的情況下,虛擬線程比使用平臺(tái)線程具備更高的并發(fā)性,從而提升整個(gè)應(yīng)用程序的吞吐量。如果說平臺(tái)線程和系統(tǒng)線程調(diào)度為1:1的方式,虛擬線程則采用M:N的調(diào)度方式,其中大量的虛擬線程M在較少的系統(tǒng)線程N(yùn)上運(yùn)行。

1.2.2 進(jìn)程,線程,協(xié)程區(qū)別

三者區(qū)別:

維度 進(jìn)程 線程 協(xié)程
調(diào)度方 操作系統(tǒng) 操作系統(tǒng) 用戶程序
切換開銷 大(內(nèi)存空間切換) 中(寄存器、棧切換) 極?。ㄓ脩魬B(tài)棧切換)
內(nèi)存占用 獨(dú)立地址空間(MB 級(jí)) 共享進(jìn)程內(nèi)存,獨(dú)立棧(MB 級(jí)) 幾 KB
創(chuàng)建成本 高(fork) 中(系統(tǒng)調(diào)用) 極低(用戶態(tài)對(duì)象)
并發(fā)數(shù)量 幾十~幾百 幾百~幾千 幾萬~幾百萬
通信方式 管道、消息隊(duì)列、共享內(nèi)存 共享內(nèi)存(需同步) 共享內(nèi)存(無需同步)
安全性 一個(gè)崩潰不影響其他 一個(gè)崩潰可能拖垮整個(gè)進(jìn)程 一個(gè)掛起不影響其他

切換開銷對(duì)比
這是面試最核心的對(duì)比,直接看數(shù)據(jù):

維度 進(jìn)程切換 線程切換 協(xié)程切換
切換模式 用戶態(tài) ? 內(nèi)核態(tài) 用戶態(tài) ? 內(nèi)核態(tài) 純用戶態(tài)
切換地址空間 否(同進(jìn)程)
OS 參與
TLB 緩存影響 失效(嚴(yán)重) 可能失效 不影響
典型耗時(shí) 10~100 微秒 1~10 微秒 0.1~1 微秒

切換開銷:進(jìn)程 >> 線程 >> 協(xié)程,每級(jí)差 1~2 個(gè)數(shù)量級(jí)

1.3 虛擬線程如何被JVM調(diào)度

image.png

先創(chuàng)建一個(gè)虛擬線程,此時(shí)JVM會(huì)將虛擬線程裝載在平臺(tái)線程上,平臺(tái)線程則會(huì)去綁定一個(gè)系統(tǒng)線程。
JVM會(huì)使用調(diào)度程序去使用調(diào)度線程執(zhí)行虛擬線程中的任務(wù)。
任務(wù)執(zhí)行完成之后清空上下文變量,將調(diào)度線程返還至調(diào)度程序等待處理下一個(gè)任務(wù)。

JVM 通過 平臺(tái)線程 執(zhí)行虛擬線程,多個(gè)虛擬線程可以復(fù)用同一個(gè) 平臺(tái)線程。當(dāng)虛擬線程遇到 I/O 阻塞時(shí),JVM 會(huì)把它從載體線程上移除,避免 OS 線程空等,而讓平臺(tái)線程執(zhí)行其他任務(wù)。這種機(jī)制稱為協(xié)作式調(diào)度Cooperative Scheduling),可以極大提高線程調(diào)度的效率。
默認(rèn)情況下,底層實(shí)現(xiàn)主要依賴 ForkJoinPool,它會(huì)管理一組載體線程,并在其中調(diào)度虛擬線程。
簡(jiǎn)單來說,虛擬線程就是讓 Java 線程擺脫了 OS 線程的束縛,使得高并發(fā)處理更加高效。適合 I/O 密集型任務(wù),但對(duì) CPU 密集型任務(wù)提升不大。

1.4 平臺(tái)線程和虛擬線程的區(qū)別

1.4.1 虛擬線程始終是守護(hù)線程

虛擬線程始終是守護(hù)線程,Thread.setDaemon(false) 方法不能將虛擬線程更改為非守護(hù)線程。

注意:當(dāng)所有啟動(dòng)的非守護(hù)線程都終止時(shí),JVM 終止。這意味著 JVM 在退出之前不會(huì)等待虛擬線程完成。

Thread virtualThread = ...; //創(chuàng)建虛擬線程
//virtualThread.setDaemon(true);  //沒有作用

1.4.2 虛擬線程始終具有正常優(yōu)先級(jí)

虛擬線程始終具有正常優(yōu)先級(jí),并且即使使用setPriority(n)方法,也無法更改優(yōu)先級(jí)。在虛擬線程上使用此方法無效。

Thread virtualThread = ...; //創(chuàng)建虛擬線程
//virtualThread.setPriority(Thread.MAX_PRIORITY);  //沒有作用

1.4.3 虛擬線程不是線程組的活動(dòng)成員

虛擬線程不是線程組的活動(dòng)成員。在虛擬線程上調(diào)用時(shí),Thread.getThreadGroup() 返回一個(gè)名為 VirtualThreads 的占位符線程組。

1.4.4 虛擬線程不支持stop、suspend或resume

虛擬線程不支持stop()suspend()resume() 方法。
這些方法在虛擬線程上調(diào)用時(shí)會(huì)引發(fā) UnsupportedOperationException

1.4.5 虛擬線程生命周期

image.png

虛擬線程JVM調(diào)度,JVMVT分配給平臺(tái)線程的動(dòng)作稱為掛載(mount),取消分配的動(dòng)作稱為卸載(unmount),線程狀態(tài)如下

    // 初始狀態(tài)
    private static final int NEW      = 0;
    // 線程啟動(dòng),由于虛擬線程的run()是個(gè)空方法,此時(shí)尚未開始執(zhí)行任務(wù)
    // 真正的任務(wù)執(zhí)行在cont.run
    private static final int STARTED  = 1;
    // 可執(zhí)行,尚未分配平臺(tái)線程
    private static final int RUNNABLE = 2;
    // 可執(zhí)行,已分配平臺(tái)線程
    private static final int RUNNING  = 3;
    // 線程嘗試park
    private static final int PARKING  = 4;
    // 從平臺(tái)線程卸載
    private static final int PARKED   = 5;
    // cont.yield失敗,未從平臺(tái)線程卸載
    private static final int PINNED   = 6;
    // 嘗試cont.yield
    private static final int YIELDING = 7;
    // 終結(jié)態(tài)
    private static final int TERMINATED = 99;

1.5 虛擬線程的目標(biāo)、非目標(biāo)

虛擬線程的目標(biāo):

  • java.lang.Thread 增加一種額外的實(shí)現(xiàn),即虛擬線程,它能做到在幾個(gè)G的JVM堆上創(chuàng)建幾百萬個(gè)活動(dòng)的虛擬線程(這在現(xiàn)在的JDK中幾乎不可能實(shí)現(xiàn)),并且表現(xiàn)出和現(xiàn)在的線程幾乎一樣的行為。
  • 對(duì)虛擬線程問題定位也可以通過已經(jīng)存在的JDK工具,盡可能保持和現(xiàn)在的線程相似的方式。
  • Java 中,經(jīng)典線程是 java.lang.Thread 類的實(shí)例。后面我們也將它們稱為平臺(tái)線程。

虛擬線程的非目標(biāo):

  • 虛擬線程不是為了改變現(xiàn)在這種操作系統(tǒng)級(jí)別的線程的實(shí)現(xiàn)。
  • 虛擬線程不是為了自動(dòng)將已經(jīng)存在的線程構(gòu)造方法自動(dòng)轉(zhuǎn)為虛擬線程。
  • 虛擬線程不是為了改變JMM。
  • 虛擬線程不是為了增加一種新的內(nèi)部線程通信機(jī)制。
  • 除了并行流之外,虛擬線程也不是為了提供一種新的數(shù)據(jù)并行結(jié)構(gòu)。

1.6 如何創(chuàng)建虛擬線程

1.6.1 Thread.startVirtualThread()

此方法創(chuàng)建一個(gè)新的虛擬線程來執(zhí)行給定的 Runnable 任務(wù)。

Runnable runnable = () -> System.out.println("Virtual Thread");
Thread.startVirtualThread(runnable);

//or

Thread.startVirtualThread(() -> {
    //Code to execute in virtual thread
    System.out.println("Virtual Thread");
});

1.6.2 Thread.Builder

如果我們想在創(chuàng)建線程后顯式啟動(dòng)它,我們可以使用 Thread.ofVirtual() 返回一個(gè) VirtualThreadBuilder 實(shí)例。它的 start() 方法啟動(dòng)一個(gè)虛擬線程。這里的 Thread.ofVirtual().start(runnable) 等價(jià)于 Thread.startVirtualThread(runnable)

ThreadFactory factory = Thread.ofVirtual().factory();

我們可以使用Thread.Builder引用來創(chuàng)建和啟動(dòng)多個(gè)線程。

Runnable runnable = () -> System.out.println("Virtual Thread");

Thread.Builder builder = Thread.ofVirtual().name("Virtual-Thread");

Thread t1 = builder.start(runnable); 
Thread t2 = builder.start(runnable);

類似的 APIThread.ofPlatform()也可用于創(chuàng)建平臺(tái)線程。

Thread.Builder builder = Thread.ofPlatform().name("Platform-Thread");

Thread t1 = builder.start(() -> {...}); 
Thread t2 = builder.start(() -> {...});

1.6.3 Executors.newVirtualThreadPerTaskExecutor()

此方法為每個(gè)任務(wù)創(chuàng)建一個(gè)新的虛擬線程。 Executor 創(chuàng)建的線程數(shù)是無限的。

try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    IntStream.range(0, 10_000).forEach(i -> {
        executor.submit(() -> {
            Thread.sleep(Duration.ofSeconds(1));
            return i;
        });
    });
}

1.7 平臺(tái)線程和虛擬線程性能分析

任務(wù)說明:

在控制臺(tái)中打印一條消息之前等待1秒,現(xiàn)在使用Runnable創(chuàng)建10000個(gè)線程,用虛擬線程和平臺(tái)線程執(zhí)行它們,來比較兩者的性能。我們將使用Duration.between()api來測(cè)量執(zhí)行所有任務(wù)的經(jīng)過時(shí)間。

首先,我們使用一個(gè)包含 100 個(gè)平臺(tái)線程的池。這樣,Executor 一次可以運(yùn)行 100 個(gè)任務(wù),其他任務(wù)需要等待。由于我們有 10,000 個(gè)任務(wù),因此完成執(zhí)行的總時(shí)間約為 100 秒。

Instant start = Instant.now();

try (var executor = Executors.newFixedThreadPool(100)) {
  for(int i = 0; i < 10_000; i++) {
    executor.submit(runnable);
  }
}

Instant finish = Instant.now();
long timeElapsed = Duration.between(start, finish).toMillis();  
System.out.println("Total elapsed time : " + timeElapsed);  

輸出
Total elapsed time : 101152 //大概 101 秒

接下來,我們將Executors.newFixedThreadPool(100)替換為Executors.newVirtualThreadPerTaskExecutor()。這將在虛擬線程而不是平臺(tái)線程中執(zhí)行所有任務(wù)。

Instant start = Instant.now();

try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
  for(int i = 0; i < 10_000; i++) {
    executor.submit(runnable);
  }
}

Instant finish = Instant.now();
long timeElapsed = Duration.between(start, finish).toMillis();  
System.out.println("Total elapsed time : " + timeElapsed);  

輸出
Total elapsed time : 1589 // 大概 1.5 秒

請(qǐng)注意虛擬線程的超快性能將執(zhí)行時(shí)間從 100 秒減少到 1.5 秒,而 Runnable 代碼沒有任何變化

1.8 虛擬線程注意點(diǎn)

1.8.1 不要建虛擬線程池

Java 線程池旨在避免創(chuàng)建新操作系統(tǒng)線程的開銷,因?yàn)閯?chuàng)建它們是一項(xiàng)昂貴的操作。但是創(chuàng)建虛擬線程并不昂貴,因此永遠(yuǎn)不需要將它們池化。建議每次需要時(shí)創(chuàng)建一個(gè)新的虛擬線程。 請(qǐng)注意,使用虛擬線程后,我們的應(yīng)用程序可能能夠處理數(shù)百萬個(gè)線程,但其他系統(tǒng)或平臺(tái)一次只能處理幾個(gè)請(qǐng)求。例如,我們可以只有幾個(gè)數(shù)據(jù)庫(kù)連接或與其他服務(wù)器的網(wǎng)絡(luò)連接。 在這些情況下,也不要使用線程池。相反,使用信號(hào)量來確保只有指定數(shù)量的線程正在訪問該資源。

private static final Semaphore SEMAPHORE = new Semaphore(50);

SEMAPHORE.acquire();

try {
  // 信號(hào)量被控制在 50 來訪問請(qǐng)求
  // 訪問數(shù)據(jù)庫(kù)或資源
} finally {
  SEMAPHORE.release();
}

1.8.2 避免使用線程局部變量

虛擬線程支持線程局部行為的方式與平臺(tái)線程相同,但由于虛擬線程可以創(chuàng)建數(shù)百萬個(gè),因此只有在仔細(xì)考慮后才能使用線程局部變量。 例如,如果我們?cè)趹?yīng)用程序中擴(kuò)展一百萬個(gè)虛擬線程,那么將有一百萬個(gè) ThreadLocal 實(shí)例以及它們所引用的數(shù)據(jù)。如此大量的實(shí)例會(huì)給內(nèi)存帶來很大的負(fù)擔(dān),應(yīng)該避免。

1.8.3 使用 ReentrantLock 而不是同步塊

有兩種特定場(chǎng)景,虛擬線程可以阻塞平臺(tái)線程(稱為 OS 線程的固定)

  • 當(dāng)它在同步塊或同步方法內(nèi)執(zhí)行代碼時(shí)
  • 當(dāng)它執(zhí)行本地方法或外部函數(shù)時(shí)

這種同步塊不會(huì)使應(yīng)用程序出錯(cuò),但它會(huì)限制應(yīng)用程序的可擴(kuò)展性,類似于平臺(tái)線程。 如果一個(gè)方法使用非常頻繁并且它使用同步塊,則考慮將其替換為 ReentrantLock 機(jī)制。

public synchronized void m() {
    try {
        // ... 訪問資源
    } finally {
        //
    }
}
private final ReentrantLock lock = new ReentrantLock();

public void m() {
    lock.lock();  // 阻塞
    try {
        // ... 訪問資源
    } finally {
        lock.unlock();
    }
}

2 結(jié)構(gòu)化并發(fā)

2.1 簡(jiǎn)介

傳統(tǒng)并發(fā)的痛點(diǎn):

  • 任務(wù)生命周期不清晰
  • 容易導(dǎo)致資源泄漏
  • 錯(cuò)誤傳播和取消復(fù)雜
javaFuture<?> f1 = executor.submit(task1);
Future<?> f2 = executor.submit(task2);
// 手動(dòng) get(),容易泄漏、取消困難、錯(cuò)誤處理復(fù)雜

結(jié)構(gòu)化并發(fā)是 Java 21 引入的一種并發(fā)任務(wù)管理模型,目的是讓并發(fā)任務(wù)的生命周期有明確的邊界,避免線程泄漏任務(wù)失控。
Java 提供了 核心類:StructuredTaskScope 來實(shí)現(xiàn)結(jié)構(gòu)化并發(fā),它是一個(gè)自動(dòng)關(guān)閉的并發(fā)作用域,可以并發(fā)執(zhí)行多個(gè)子任務(wù),并在作用域結(jié)束時(shí)統(tǒng)一處理結(jié)果或異常。

結(jié)構(gòu)化并發(fā)優(yōu)點(diǎn):

  • 生命周期清晰,子任務(wù)隨 scope 結(jié)束而結(jié)束
  • 自動(dòng)取消,失敗時(shí)自動(dòng)取消其他任務(wù)
  • 錯(cuò)誤聚合,throwIfFailed() 統(tǒng)一處理
  • 避免泄漏,離開代碼塊即清理

2.2 類和方法

2.2.1 關(guān)鍵子類

兩個(gè)關(guān)鍵子類:行為不同

  • ShutdownOnFailure:任一失敗 → 取消其他,全部成功才繼續(xù)(如訂單+用戶)
  • ShutdownOnSuccess<T>:任一成功 → 取消其他,競(jìng)態(tài)取快(如多源緩存)
特性 ShutdownOnFailure ShutdownOnSuccess<T>
觸發(fā) shutdown() 條件 任一任務(wù)失敗 任一任務(wù)成功
取消其他任務(wù)
throwIfFailed() 可用 不可用(編譯錯(cuò)誤)
result() 不可用 可用(返回第一個(gè)成功)
所有失敗時(shí) throwIfFailed() 拋異常 result() 拋 ExecutionException
典型用途 多個(gè)任務(wù)必須都成功 競(jìng)速取最快成功

全部成功(ShutdownOnFailure)并行下載兩個(gè) URL,任意一個(gè)失敗就取消另一個(gè)

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Future<String> user = scope.fork(() -> fetchURL(userURL));
    Future<String> order = scope.fork(() -> fetchURL(orderURL));

    scope.join();           // 等待所有子任務(wù)完成
    scope.throwIfFailed();  // 如果任一失敗,拋出異常

    // 成功:使用結(jié)果
    System.out.println(user.resultNow() + order.resultNow());
} // 自動(dòng)關(guān)閉 scope,若有失敗會(huì)取消未完成任務(wù)

競(jìng)態(tài)取勝 (ShutdownOnSuccess)( race 模式)

javatry (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
    scope.fork(() -> fetchURL(url1));
    scope.fork(() -> fetchURL(url2));
    
    scope.join();
    String result = scope.result(); // 第一個(gè)成功的結(jié)果
}

2.2.2 方法

方法 功能 是否阻塞 返回值
fork() 啟動(dòng)一個(gè)子任務(wù) 不阻塞 Subtask<T>
join() 等待所有子任務(wù)結(jié)束 阻塞 void
joinUntil(Instant) 超時(shí)等待 阻塞 void
shutdown() 主動(dòng)關(guān)閉,取消未完成任務(wù) 不阻塞 void
throwIfFailed() 失敗時(shí)拋異常 不阻塞 void
result() / resultNow() 獲取結(jié)果(特定子類) 不阻塞 取決于子類

方法調(diào)用順序:fork() → fork() → ... → join() → [throwIfFailed() 或 result()]

2.3 與虛線程配合

組合方式:

  • 虛線程執(zhí)行器 + 結(jié)構(gòu)化并發(fā):用 newVirtualThreadPerTaskExecutor() 創(chuàng)建 StructuredTaskScope 的子任務(wù)
  • 同步寫法 + 高并發(fā):代碼像單線程,但實(shí)際并發(fā)百萬級(jí)

示例:

try (var executor = Executors.newVirtualThreadPerTaskExecutor();
     var scope = new StructuredTaskScope.ShutdownOnFailure()) {

    scope.fork(() -> callServiceA());  // 每個(gè) fork 自動(dòng)用虛線程
    scope.fork(() -> callServiceB());

    scope.join();
    scope.throwIfFailed();
}

注意StructuredTaskScope.fork() 默認(rèn)使用 ForkJoinPool.commonPool(),但可以自定義線程工廠使用虛線程。
自定義虛線程的 TaskScope

class VirtualThreadTaskScope extends StructuredTaskScope<Object> {
    private final Executor executor = Executors.newVirtualThreadPerTaskExecutor();
    @Override
    protected <T> Subtask<T> fork(Callable<T> task) {
        var subtask = super.fork(task);
        executor.execute(subtask);
        return subtask;
    }

    @Override
    public void close() {
        ((ExecutorService) executor).close();
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容