Java 21 虛擬線程 vs 緩存線程池與固定線程池

探索 Java 并發(fā)如何從 Java 8 的增強(qiáng)發(fā)展到 Java 21 的虛擬線程,從而實(shí)現(xiàn)輕量級(jí)、可擴(kuò)展且高效的多線程處理。

引言

并發(fā)編程仍然是構(gòu)建可擴(kuò)展、響應(yīng)式 Java 應(yīng)用程序的關(guān)鍵部分。多年來(lái),Java 持續(xù)增強(qiáng)了其多線程編程能力。本文回顧了從 Java 8 到 Java 21 并發(fā)的演進(jìn),重點(diǎn)介紹了重要的改進(jìn)以及 Java 21 中引入的具有重大影響的虛擬線程。

從 Java 8 開(kāi)始,并發(fā) API 出現(xiàn)了顯著的增強(qiáng),例如原子變量、并發(fā)映射以及集成 lambda 表達(dá)式以實(shí)現(xiàn)更具表現(xiàn)力的并行編程。

Java 8 引入的關(guān)鍵改進(jìn)包括:

  • 線程與執(zhí)行器
  • 同步與鎖
  • 原子變量與 ConcurrentMap

Java 21 于 2023 年底發(fā)布,帶來(lái)了虛擬線程這一重大演進(jìn),從根本上改變了 Java 應(yīng)用程序處理大量并發(fā)任務(wù)的方式。虛擬線程為服務(wù)器應(yīng)用程序提供了更高的可擴(kuò)展性,同時(shí)保持了熟悉的"每個(gè)請(qǐng)求一個(gè)線程"的編程模型。

[圖片上傳失敗...(image-ee2bf7-1761803226082)]

或許,Java 21 中最重要的特性就是虛擬線程。
在 Java 21 中,Java 的基本并發(fā)模型保持不變,Stream API 仍然是并行處理大型數(shù)據(jù)集的首選方式。
隨著虛擬線程的引入,并發(fā) API 現(xiàn)在能提供更好的性能。在當(dāng)今的微服務(wù)和可擴(kuò)展服務(wù)器應(yīng)用領(lǐng)域,線程數(shù)量必須增長(zhǎng)以滿足需求。虛擬線程的主要目標(biāo)是使服務(wù)器應(yīng)用程序能夠?qū)崿F(xiàn)高可擴(kuò)展性,同時(shí)仍使用簡(jiǎn)單的"每個(gè)請(qǐng)求一個(gè)線程"模型。

虛擬線程

在 Java 21 之前,JDK 的線程實(shí)現(xiàn)使用的是操作系統(tǒng)線程的薄包裝器。然而,操作系統(tǒng)線程代價(jià)高昂:

  • 如果每個(gè)請(qǐng)求在其整個(gè)持續(xù)時(shí)間內(nèi)消耗一個(gè)操作系統(tǒng)線程,線程數(shù)量很快就會(huì)成為可擴(kuò)展性的瓶頸。
  • 即使使用線程池,吞吐量仍然受到限制,因?yàn)閷?shí)際線程數(shù)量是有上限的。

虛擬線程的目標(biāo)是打破 Java 線程與操作系統(tǒng)線程之間的 1:1 關(guān)系。
虛擬線程應(yīng)用了類(lèi)似于虛擬內(nèi)存的概念。正如虛擬內(nèi)存將大的地址空間映射到較小的物理內(nèi)存一樣,虛擬線程允許運(yùn)行時(shí)通過(guò)將它們映射到少量操作系統(tǒng)線程來(lái)制造擁有許多線程的假象。

平臺(tái)線程是操作系統(tǒng)線程的薄包裝器。
而虛擬線程并不綁定到任何特定的操作系統(tǒng)線程。虛擬線程可以執(zhí)行平臺(tái)線程可以運(yùn)行的任何代碼。這是一個(gè)主要優(yōu)勢(shì)——現(xiàn)有的 Java 代碼通常無(wú)需修改或僅需少量修改即可在虛擬線程上運(yùn)行。虛擬線程由平臺(tái)線程承載,這些平臺(tái)線程仍然由操作系統(tǒng)調(diào)度。

例如,您可以像這樣創(chuàng)建一個(gè)使用虛擬線程的執(zhí)行器:

ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

對(duì)比示例

虛擬線程僅在主動(dòng)執(zhí)行 CPU 密集型任務(wù)時(shí)才消耗操作系統(tǒng)線程。虛擬線程在其生命周期內(nèi)可以在不同的載體線程上掛載或卸載。

通常,當(dāng)虛擬線程遇到阻塞操作時(shí),它會(huì)自行卸載。一旦該阻塞任務(wù)完成,虛擬線程通過(guò)掛載到任何可用的載體線程上來(lái)恢復(fù)執(zhí)行。這種掛載和卸載過(guò)程頻繁且透明地發(fā)生——不會(huì)阻塞操作系統(tǒng)線程。

示例 — 源代碼

Example01CachedThreadPool.java

在此示例中,使用緩存線程池創(chuàng)建了一個(gè)執(zhí)行器:

var executor = Executors.newCachedThreadPool()
package threads;

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

/**
 *
 * @author Milan Karajovic <milan.karajovic.rs@gmail.com>
 *
 */

public class Example01CachedThreadPool {

    public void executeTasks(final int NUMBER_OF_TASKS) {

        final int BLOCKING_CALL = 1;
        System.out.println("Number of tasks which executed using 'newCachedThreadPool()' " + NUMBER_OF_TASKS + " tasks each.");

        long startTime = System.currentTimeMillis();

        try (var executor = Executors.newCachedThreadPool()) {

            IntStream.range(0, NUMBER_OF_TASKS).forEach(i -> {
                executor.submit(() -> {
                    // 模擬阻塞調(diào)用
                    Thread.sleep(Duration.ofSeconds(BLOCKING_CALL));
                    return i;
                });
            });

        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        long endTime = System.currentTimeMillis();
        System.out.println("For executing " + NUMBER_OF_TASKS + " tasks duration is: " + (endTime - startTime) + " ms");
    }

}
package threads;

import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

/**
 *
 * @author Milan Karajovic <milan.karajovic.rs@gmail.com>
 *
 */

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class Example01CachedThreadPoolTest {

    @Test
    @Order(1)
    public void test_1000_tasks() {
        Example01CachedThreadPool example01CachedThreadPool = new Example01CachedThreadPool();
        example01CachedThreadPool.executeTasks(1000);
    }

    @Test
    @Order(2)
    public void test_10_000_tasks() {
        Example01CachedThreadPool example01CachedThreadPool = new Example01CachedThreadPool();
        example01CachedThreadPool.executeTasks(10_000);
    }

    @Test
    @Order(3)
    public void test_100_000_tasks() {
        Example01CachedThreadPool example01CachedThreadPool = new Example01CachedThreadPool();
        example01CachedThreadPool.executeTasks(100_000);
    }

    @Test
    @Order(4)
    public void test_1_000_000_tasks() {
        Example01CachedThreadPool example01CachedThreadPool = new Example01CachedThreadPool();
        example01CachedThreadPool.executeTasks(1_000_000);
    }

}

我 PC 上的測(cè)試結(jié)果:

[圖片上傳失敗...(image-7a34fd-1761803226082)]

[圖片上傳失敗...(image-baed20-1761803226082)]

Example02FixedThreadPool.java

使用固定線程池創(chuàng)建執(zhí)行器:

var executor = Executors.newFixedThreadPool(500)
package threads;

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

/**
 *
 * @author Milan Karajovic <milan.karajovic.rs@gmail.com>
 *
 */

public class Example02FixedThreadPool {

    public void executeTasks(final int NUMBER_OF_TASKS) {

        final int BLOCKING_CALL = 1;
        System.out.println("Number of tasks which executed using 'newFixedThreadPool(500)' " + NUMBER_OF_TASKS + " tasks each.");

        long startTime = System.currentTimeMillis();

        try (var executor = Executors.newFixedThreadPool(500)) {

            IntStream.range(0, NUMBER_OF_TASKS).forEach(i -> {
               executor.submit(() -> {
                   // 模擬阻塞調(diào)用
                  Thread.sleep(Duration.ofSeconds(BLOCKING_CALL));
                  return i;
               });
            });

        }   catch (Exception e) {
            throw new RuntimeException(e);
        }

        long endTime = System.currentTimeMillis();
        System.out.println("For executing " + NUMBER_OF_TASKS + " tasks duration is: " + (endTime - startTime) + " ms");
    }

}
package threads;

import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

/**
 *
 * @author Milan Karajovic <milan.karajovic.rs@gmail.com>
 *
 */

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class Example02FixedThreadPoolTest {

    @Test
    @Order(1)
    public void test_1000_tasks() {
        Example02FixedThreadPool example02FixedThreadPool = new Example02FixedThreadPool();
        example02FixedThreadPool.executeTasks(1000);
    }

    @Test
    @Order(2)
    public void test_10_000_tasks() {
        Example02FixedThreadPool example02FixedThreadPool = new Example02FixedThreadPool();
        example02FixedThreadPool.executeTasks(10_000);
    }

    @Test
    @Order(3)
    public void test_100_000_tasks() {
        Example02FixedThreadPool example02FixedThreadPool = new Example02FixedThreadPool();
        example02FixedThreadPool.executeTasks(100_000);
    }

    @Test
    @Order(4)
    public void test_1_000_000_tasks() {
        Example02FixedThreadPool example02FixedThreadPool = new Example02FixedThreadPool();
        example02FixedThreadPool.executeTasks(1_000_000);
    }

}

我 PC 上的測(cè)試結(jié)果:

[圖片上傳失敗...(image-982508-1761803226082)]

[圖片上傳失敗...(image-205126-1761803226082)]

Example03VirtualThread.java

使用虛擬線程每任務(wù)執(zhí)行器創(chuàng)建執(zhí)行器:

var executor = Executors.newVirtualThreadPerTaskExecutor()
package threads;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

/**
 *
 * @author Milan Karajovic <milan.karajovic.rs@gmail.com>
 *
 */

public class Example03VirtualThread {

    public void executeTasks(final int NUMBER_OF_TASKS) {

        final int BLOCKING_CALL = 1;
        System.out.println("Number of tasks which executed using 'newVirtualThreadPerTaskExecutor()' " + NUMBER_OF_TASKS + " tasks each.");

        long startTime = System.currentTimeMillis();

        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {

            IntStream.range(0, NUMBER_OF_TASKS).forEach(i -> {
               executor.submit(() -> {
                   // 模擬阻塞調(diào)用
                  Thread.sleep(Duration.ofSeconds(BLOCKING_CALL));
                  return i;
               });
            });

        }   catch (Exception e) {
            throw new RuntimeException(e);
        }

        long endTime = System.currentTimeMillis();
        System.out.println("For executing " + NUMBER_OF_TASKS + " tasks duration is: " + (endTime - startTime) + " ms");
    }

}
package threads;

import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

/**
 *
 * @author Milan Karajovic <milan.karajovic.rs@gmail.com>
 *
 */

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class Example03VirtualThreadTest {

    @Test
    @Order(1)
    public void test_1000_tasks() {
        Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
        example03VirtualThread.executeTasks(1000);
    }

    @Test
    @Order(2)
    public void test_10_000_tasks() {
        Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
        example03VirtualThread.executeTasks(10_000);
    }

    @Test
    @Order(3)
    public void test_100_000_tasks() {
        Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
        example03VirtualThread.executeTasks(100_000);
    }

    @Test
    @Order(4)
    public void test_1_000_000_tasks() {
        Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
        example03VirtualThread.executeTasks(1_000_000);
    }

    @Test
    @Order(5)
    public void test_2_000_000_tasks() {
        Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
        example03VirtualThread.executeTasks(2_000_000);
    }

}

我 PC 上的測(cè)試結(jié)果:

[圖片上傳失敗...(image-33c69d-1761803226082)]

[圖片上傳失敗...(image-1ae724-1761803226082)]

結(jié)論

您可以清楚地看到用于處理所有 NUMBER_OF_TASKS 的不同執(zhí)行器實(shí)現(xiàn)之間的執(zhí)行時(shí)間差異。值得嘗試不同的 NUMBER_OF_TASKS 值以觀察性能變化。

虛擬線程的優(yōu)勢(shì)在處理大量任務(wù)時(shí)變得尤其明顯。當(dāng) NUMBER_OF_TASKS 設(shè)置為較高的數(shù)值時(shí)——例如 1,000,000——性能差距是顯著的。如下表所示,虛擬線程在處理大量任務(wù)時(shí)效率要高得多:

[圖片上傳失敗...(image-778ad4-1761803226082)]

我確信,在澄清這一點(diǎn)之后,如果您的應(yīng)用程序使用并發(fā) API 處理大量任務(wù),您會(huì)認(rèn)真考慮遷移到 Java 21 并利用虛擬線程。在許多情況下,這種轉(zhuǎn)變可以顯著提高應(yīng)用程序的性能和可擴(kuò)展性。

源代碼:GitHub Repository – Comparing Threads in Java 21


【注】本文譯自:Java 21 Virtual Threads vs Cached and Fixed Threads

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容