探索 Java 并發(fā)如何從 Java 8 的增強(qiáng)發(fā)展到 Java 21 的虛擬線程,從而實(shí)現(xiàn)輕量級(jí)、可擴(kuò)展且高效的多線程處理。
引言
并發(fā)編程仍然是構(gòu)建可擴(kuò)展、響應(yīng)式 Java 應(yīng)用程序的關(guān)鍵部分。多年來(lái),Java 持續(xù)增強(qiáng)了其多線程編程能力。本文回顧了從 Java 8 到 Java 21 并發(fā)的演進(jìn),重點(diǎn)介紹了重要的改進(jìn)以及 Java 21 中引入的具有重大影響的虛擬線程。
從 Java 8 開(kāi)始,并發(fā) API 出現(xiàn)了顯著的增強(qiáng),例如原子變量、并發(fā)映射以及集成 lambda 表達(dá)式以實(shí)現(xiàn)更具表現(xiàn)力的并行編程。
Java 8 引入的關(guān)鍵改進(jìn)包括:
- 線程與執(zhí)行器
- 同步與鎖
- 原子變量與 ConcurrentMap
Java 21 于 2023 年底發(fā)布,帶來(lái)了虛擬線程這一重大演進(jìn),從根本上改變了 Java 應(yīng)用程序處理大量并發(fā)任務(wù)的方式。虛擬線程為服務(wù)器應(yīng)用程序提供了更高的可擴(kuò)展性,同時(shí)保持了熟悉的"每個(gè)請(qǐng)求一個(gè)線程"的編程模型。
[圖片上傳失敗...(image-ee2bf7-1761803226082)]
或許,Java 21 中最重要的特性就是虛擬線程。
在 Java 21 中,Java 的基本并發(fā)模型保持不變,Stream API 仍然是并行處理大型數(shù)據(jù)集的首選方式。
隨著虛擬線程的引入,并發(fā) API 現(xiàn)在能提供更好的性能。在當(dāng)今的微服務(wù)和可擴(kuò)展服務(wù)器應(yīng)用領(lǐng)域,線程數(shù)量必須增長(zhǎng)以滿足需求。虛擬線程的主要目標(biāo)是使服務(wù)器應(yīng)用程序能夠?qū)崿F(xiàn)高可擴(kuò)展性,同時(shí)仍使用簡(jiǎn)單的"每個(gè)請(qǐng)求一個(gè)線程"模型。
虛擬線程
在 Java 21 之前,JDK 的線程實(shí)現(xiàn)使用的是操作系統(tǒng)線程的薄包裝器。然而,操作系統(tǒng)線程代價(jià)高昂:
- 如果每個(gè)請(qǐng)求在其整個(gè)持續(xù)時(shí)間內(nèi)消耗一個(gè)操作系統(tǒng)線程,線程數(shù)量很快就會(huì)成為可擴(kuò)展性的瓶頸。
- 即使使用線程池,吞吐量仍然受到限制,因?yàn)閷?shí)際線程數(shù)量是有上限的。
虛擬線程的目標(biāo)是打破 Java 線程與操作系統(tǒng)線程之間的 1:1 關(guān)系。
虛擬線程應(yīng)用了類(lèi)似于虛擬內(nèi)存的概念。正如虛擬內(nèi)存將大的地址空間映射到較小的物理內(nèi)存一樣,虛擬線程允許運(yùn)行時(shí)通過(guò)將它們映射到少量操作系統(tǒng)線程來(lái)制造擁有許多線程的假象。
平臺(tái)線程是操作系統(tǒng)線程的薄包裝器。
而虛擬線程并不綁定到任何特定的操作系統(tǒng)線程。虛擬線程可以執(zhí)行平臺(tái)線程可以運(yùn)行的任何代碼。這是一個(gè)主要優(yōu)勢(shì)——現(xiàn)有的 Java 代碼通常無(wú)需修改或僅需少量修改即可在虛擬線程上運(yùn)行。虛擬線程由平臺(tái)線程承載,這些平臺(tái)線程仍然由操作系統(tǒng)調(diào)度。
例如,您可以像這樣創(chuàng)建一個(gè)使用虛擬線程的執(zhí)行器:
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
對(duì)比示例
虛擬線程僅在主動(dòng)執(zhí)行 CPU 密集型任務(wù)時(shí)才消耗操作系統(tǒng)線程。虛擬線程在其生命周期內(nèi)可以在不同的載體線程上掛載或卸載。
通常,當(dāng)虛擬線程遇到阻塞操作時(shí),它會(huì)自行卸載。一旦該阻塞任務(wù)完成,虛擬線程通過(guò)掛載到任何可用的載體線程上來(lái)恢復(fù)執(zhí)行。這種掛載和卸載過(guò)程頻繁且透明地發(fā)生——不會(huì)阻塞操作系統(tǒng)線程。
示例 — 源代碼
Example01CachedThreadPool.java
在此示例中,使用緩存線程池創(chuàng)建了一個(gè)執(zhí)行器:
var executor = Executors.newCachedThreadPool()
package threads;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
/**
*
* @author Milan Karajovic <milan.karajovic.rs@gmail.com>
*
*/
public class Example01CachedThreadPool {
public void executeTasks(final int NUMBER_OF_TASKS) {
final int BLOCKING_CALL = 1;
System.out.println("Number of tasks which executed using 'newCachedThreadPool()' " + NUMBER_OF_TASKS + " tasks each.");
long startTime = System.currentTimeMillis();
try (var executor = Executors.newCachedThreadPool()) {
IntStream.range(0, NUMBER_OF_TASKS).forEach(i -> {
executor.submit(() -> {
// 模擬阻塞調(diào)用
Thread.sleep(Duration.ofSeconds(BLOCKING_CALL));
return i;
});
});
} catch (Exception e) {
throw new RuntimeException(e);
}
long endTime = System.currentTimeMillis();
System.out.println("For executing " + NUMBER_OF_TASKS + " tasks duration is: " + (endTime - startTime) + " ms");
}
}
package threads;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
/**
*
* @author Milan Karajovic <milan.karajovic.rs@gmail.com>
*
*/
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class Example01CachedThreadPoolTest {
@Test
@Order(1)
public void test_1000_tasks() {
Example01CachedThreadPool example01CachedThreadPool = new Example01CachedThreadPool();
example01CachedThreadPool.executeTasks(1000);
}
@Test
@Order(2)
public void test_10_000_tasks() {
Example01CachedThreadPool example01CachedThreadPool = new Example01CachedThreadPool();
example01CachedThreadPool.executeTasks(10_000);
}
@Test
@Order(3)
public void test_100_000_tasks() {
Example01CachedThreadPool example01CachedThreadPool = new Example01CachedThreadPool();
example01CachedThreadPool.executeTasks(100_000);
}
@Test
@Order(4)
public void test_1_000_000_tasks() {
Example01CachedThreadPool example01CachedThreadPool = new Example01CachedThreadPool();
example01CachedThreadPool.executeTasks(1_000_000);
}
}
我 PC 上的測(cè)試結(jié)果:
[圖片上傳失敗...(image-7a34fd-1761803226082)]
[圖片上傳失敗...(image-baed20-1761803226082)]
Example02FixedThreadPool.java
使用固定線程池創(chuàng)建執(zhí)行器:
var executor = Executors.newFixedThreadPool(500)
package threads;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
/**
*
* @author Milan Karajovic <milan.karajovic.rs@gmail.com>
*
*/
public class Example02FixedThreadPool {
public void executeTasks(final int NUMBER_OF_TASKS) {
final int BLOCKING_CALL = 1;
System.out.println("Number of tasks which executed using 'newFixedThreadPool(500)' " + NUMBER_OF_TASKS + " tasks each.");
long startTime = System.currentTimeMillis();
try (var executor = Executors.newFixedThreadPool(500)) {
IntStream.range(0, NUMBER_OF_TASKS).forEach(i -> {
executor.submit(() -> {
// 模擬阻塞調(diào)用
Thread.sleep(Duration.ofSeconds(BLOCKING_CALL));
return i;
});
});
} catch (Exception e) {
throw new RuntimeException(e);
}
long endTime = System.currentTimeMillis();
System.out.println("For executing " + NUMBER_OF_TASKS + " tasks duration is: " + (endTime - startTime) + " ms");
}
}
package threads;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
/**
*
* @author Milan Karajovic <milan.karajovic.rs@gmail.com>
*
*/
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class Example02FixedThreadPoolTest {
@Test
@Order(1)
public void test_1000_tasks() {
Example02FixedThreadPool example02FixedThreadPool = new Example02FixedThreadPool();
example02FixedThreadPool.executeTasks(1000);
}
@Test
@Order(2)
public void test_10_000_tasks() {
Example02FixedThreadPool example02FixedThreadPool = new Example02FixedThreadPool();
example02FixedThreadPool.executeTasks(10_000);
}
@Test
@Order(3)
public void test_100_000_tasks() {
Example02FixedThreadPool example02FixedThreadPool = new Example02FixedThreadPool();
example02FixedThreadPool.executeTasks(100_000);
}
@Test
@Order(4)
public void test_1_000_000_tasks() {
Example02FixedThreadPool example02FixedThreadPool = new Example02FixedThreadPool();
example02FixedThreadPool.executeTasks(1_000_000);
}
}
我 PC 上的測(cè)試結(jié)果:
[圖片上傳失敗...(image-982508-1761803226082)]
[圖片上傳失敗...(image-205126-1761803226082)]
Example03VirtualThread.java
使用虛擬線程每任務(wù)執(zhí)行器創(chuàng)建執(zhí)行器:
var executor = Executors.newVirtualThreadPerTaskExecutor()
package threads;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
/**
*
* @author Milan Karajovic <milan.karajovic.rs@gmail.com>
*
*/
public class Example03VirtualThread {
public void executeTasks(final int NUMBER_OF_TASKS) {
final int BLOCKING_CALL = 1;
System.out.println("Number of tasks which executed using 'newVirtualThreadPerTaskExecutor()' " + NUMBER_OF_TASKS + " tasks each.");
long startTime = System.currentTimeMillis();
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
IntStream.range(0, NUMBER_OF_TASKS).forEach(i -> {
executor.submit(() -> {
// 模擬阻塞調(diào)用
Thread.sleep(Duration.ofSeconds(BLOCKING_CALL));
return i;
});
});
} catch (Exception e) {
throw new RuntimeException(e);
}
long endTime = System.currentTimeMillis();
System.out.println("For executing " + NUMBER_OF_TASKS + " tasks duration is: " + (endTime - startTime) + " ms");
}
}
package threads;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
/**
*
* @author Milan Karajovic <milan.karajovic.rs@gmail.com>
*
*/
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class Example03VirtualThreadTest {
@Test
@Order(1)
public void test_1000_tasks() {
Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
example03VirtualThread.executeTasks(1000);
}
@Test
@Order(2)
public void test_10_000_tasks() {
Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
example03VirtualThread.executeTasks(10_000);
}
@Test
@Order(3)
public void test_100_000_tasks() {
Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
example03VirtualThread.executeTasks(100_000);
}
@Test
@Order(4)
public void test_1_000_000_tasks() {
Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
example03VirtualThread.executeTasks(1_000_000);
}
@Test
@Order(5)
public void test_2_000_000_tasks() {
Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
example03VirtualThread.executeTasks(2_000_000);
}
}
我 PC 上的測(cè)試結(jié)果:
[圖片上傳失敗...(image-33c69d-1761803226082)]
[圖片上傳失敗...(image-1ae724-1761803226082)]
結(jié)論
您可以清楚地看到用于處理所有 NUMBER_OF_TASKS 的不同執(zhí)行器實(shí)現(xiàn)之間的執(zhí)行時(shí)間差異。值得嘗試不同的 NUMBER_OF_TASKS 值以觀察性能變化。
虛擬線程的優(yōu)勢(shì)在處理大量任務(wù)時(shí)變得尤其明顯。當(dāng) NUMBER_OF_TASKS 設(shè)置為較高的數(shù)值時(shí)——例如 1,000,000——性能差距是顯著的。如下表所示,虛擬線程在處理大量任務(wù)時(shí)效率要高得多:
[圖片上傳失敗...(image-778ad4-1761803226082)]
我確信,在澄清這一點(diǎn)之后,如果您的應(yīng)用程序使用并發(fā) API 處理大量任務(wù),您會(huì)認(rèn)真考慮遷移到 Java 21 并利用虛擬線程。在許多情況下,這種轉(zhuǎn)變可以顯著提高應(yīng)用程序的性能和可擴(kuò)展性。
源代碼:GitHub Repository – Comparing Threads in Java 21