學(xué)習(xí)Java 8 Stream Api (5) - Stream 周邊及其他

Stream API

經(jīng)過前面 4 篇內(nèi)容的學(xué)習(xí),我們已經(jīng)掌握了 Stream 大部分的知識,本節(jié)我們針對之前 Stream 未涉及的內(nèi)容及周邊知識點做個補充。

Fork/Join 框架

fork/join 框架是 Java 7 中引入的新特性 ,它是一個工具,通過 「 分而治之 」 的方法嘗試將所有可用的處理器內(nèi)核使用起來幫助加速并行處理。

在實際使用過程中,這種 「 分而治之 」的方法意味著框架首先要 fork ,遞歸地將任務(wù)分解為較小的獨立子任務(wù),直到它們足夠簡單以便異步執(zhí)行。然后,join 部分開始工作,將所有子任務(wù)的結(jié)果遞歸地連接成單個結(jié)果,或者在返回 void 的任務(wù)的情況下,程序只是等待每個子任務(wù)執(zhí)行完畢。

02_fork_join_principle.jpg

為了提供有效的并行執(zhí)行,fork/join 框架使用了一個名為 ForkJoinPool 的線程池,用于管理 ForkJoinWorkerThread 類型的工作線程。

Fork/Join 優(yōu)點

Fork/Join 架構(gòu)使用了一種名為工作竊?。?work-stealing )算法來平衡線程的工作負(fù)載。

簡單來說,工作竊取算法就是空閑的線程試圖從繁忙線程的隊列中竊取工作。

默認(rèn)情況下,每個工作線程從其自己的雙端隊列中獲取任務(wù)。但如果自己的雙端隊列中的任務(wù)已經(jīng)執(zhí)行完畢,雙端隊列為空時,工作線程就會從另一個忙線程的雙端隊列尾部或全局入口隊列中獲取任務(wù),因為這是最大概率可能找到工作的地方。

這種方法最大限度地減少了線程競爭任務(wù)的可能性。它還減少了工作線程尋找任務(wù)的次數(shù),因為它首先在最大可用的工作塊上工作。

Fork/Join 使用

ForkJoinTask 是 ForkJoinPool 線程之中執(zhí)行的任務(wù)的基本類型。我們?nèi)粘J褂脮r,一般不直接使用 ForkJoinTask ,而是擴展它的兩個子類中的任意一個

  1. 任務(wù)不返回結(jié)果 ( 返回 void ) 的 RecursiveAction
  2. 返回值的任務(wù)的 RecursiveTask <V>

這兩個類都有一個抽象方法 compute() ,用于定義任務(wù)的邏輯。

我們所要做的,就是繼承任意一個類,然后實現(xiàn) compute() 方法,步驟如下:

  1. 創(chuàng)建一個表示工作總量的對象
  2. 選擇合適的閾值
  3. 定義分割工作的方法
  4. 定義執(zhí)行工作的方法

如下是使用 Fork/Join 方式實現(xiàn)的1至1000006587的 Fork/Join 方式累加,我們和單線程的循環(huán)累加做了下對比,在 Intel i5-4460 的 PC 機器下,單線程執(zhí)行使用了 650 ms,使用了 Fork/Join 方式執(zhí)行 210 ms,優(yōu)化效果挺明顯。


public class NumberAddTask extends RecursiveTask<Long> {

    private static final int THRESHOLD = 10_0000;
    private final int begin;
    private final int end;

    public NumberAddTask(int begin, int end) {
        super();
        this.begin = begin;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - begin <= THRESHOLD) {
            long sum = 0;
            for(int i = begin; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
        int mid = (begin + end) /2;
        NumberAddTask t1 = new NumberAddTask(begin, mid);
        NumberAddTask t2 = new NumberAddTask(mid + 1,  end);
        ForkJoinTask.invokeAll(t1, t2);
        return t1.join() + t2.join();
    }
}

// 1至1000006587的Fork/Join方式累加
@Test
public void testAddForkJoin() {
    long begin = System.currentTimeMillis();
    int n = 10_0000_6587;
    ForkJoinPool pool = ForkJoinPool.commonPool();
    log.info("1 + 2 + ... {} = {}", n, pool.invoke(new NumberAddTask(1, n)));
    long end = System.currentTimeMillis();
    log.info("ForkJoin方式執(zhí)行時間:{}ms", end - begin);
}

// 1至1000006587的單線程累加
@Test
public void testAddFunction() {
    long begin = System.currentTimeMillis();
    int n = 10_0000_6587;
    long sum = 0;
    for(int i = 1; i <= n; i++ ) {
        sum += i;
    }
    log.info("1 + 2 + ... {} = {}", n, sum);
    long end = System.currentTimeMillis();
    log.info("函數(shù)方式執(zhí)行時間:{}ms", end - begin);
}

Fork/Join 使用場景

我使用 Java 8 官方 Api 中 RecursiveTask 的示例,創(chuàng)建了一個計算斐波那契數(shù)列的 Fork/Join 實現(xiàn),雖然官方也提到了這是愚蠢的實現(xiàn)斐波那契數(shù)列方法,甚至效果還不如單線程的遞歸計算,但是這也說明了 Fork/Join 并非萬能的。

@Test
public void testForkJoin() {
    // 執(zhí)行f(40) = 102334155使用3411ms
    // 執(zhí)行f(80) 2個多小時,無法計算出結(jié)果
    long begin = System.currentTimeMillis();
    int n = 40;
    ForkJoinPool pool = ForkJoinPool.commonPool();
    log.info("ForkJoinPool初始化時間:{}ms", System.currentTimeMillis() - begin);
    log.info("斐波那契數(shù)列f({}) = {}", n, pool.invoke(new FibonacciTask(n)));
    long end = System.currentTimeMillis();
    log.info("ForkJoin方式執(zhí)行時間:{}ms", end - begin);
}

// 不用遞歸計算斐波那契數(shù)列反而更快
@Test
public void testFibonacci() {
    // 執(zhí)行f(50000) 使用 110ms
    // 輸出 f(50000) = 17438開頭的10450位長的整數(shù)
    long begin = System.currentTimeMillis();
    int n = 50000;
    log.info("斐波那契數(shù)列f({}) = {}", n, FibonacciUtil.fibonacci(n));
    long end = System.currentTimeMillis();
    log.info("函數(shù)方式執(zhí)行時間:{}ms", end - begin);
}

以上代碼見 StreamOtherTest 。

Fork/Join 最大的優(yōu)點是提供了工作竊取算法,可以在多核CPU處理器上加速并行處理,他并非多線程開發(fā)替代品。

那么他們之間有什么區(qū)別呢?

Fork/Join框架是從jdk7中新特性,它同ThreadPoolExecutor一樣,也實現(xiàn)了Executor和ExecutorService接口。它使用了一個無限隊列來保存需要執(zhí)行的任務(wù),而線程的數(shù)量則是通過構(gòu)造函數(shù)傳入,如果沒有向構(gòu)造函數(shù)中傳入希望的線程數(shù)量,那么當(dāng)前計算機可用的CPU數(shù)量會被設(shè)置為線程數(shù)量作為默認(rèn)值。

ForkJoinPool主要用來使用分治法(Divide-and-Conquer Algorithm)來解決問題。典型的應(yīng)用比如快速排序算法。這里的要點在于,F(xiàn)orkJoinPool需要使用相對少的線程來處理大量的任務(wù)。比如要對1000萬個數(shù)據(jù)進行排序,那么會將這個任務(wù)分割成兩個500萬的排序任務(wù)和一個針對這兩組500萬數(shù)據(jù)的合并任務(wù)。以此類推,對于500萬的數(shù)據(jù)也會做出同樣的分割處理,到最后會設(shè)置一個閾值來規(guī)定當(dāng)數(shù)據(jù)規(guī)模到多少時,停止這樣的分割處理。比如,當(dāng)元素的數(shù)量小于10時,會停止分割,轉(zhuǎn)而使用插入排序?qū)λ鼈冞M行排序。那么到最后,所有的任務(wù)加起來會有大概2000000+個。問題的關(guān)鍵在于,對于一個任務(wù)而言,只有當(dāng)它所有的子任務(wù)完成之后,它才能夠被執(zhí)行。

所以當(dāng)使用ThreadPoolExecutor時,使用分治法會存在問題,因為ThreadPoolExecutor中的線程無法像任務(wù)隊列中再添加一個任務(wù)并且在等待該任務(wù)完成之后再繼續(xù)執(zhí)行。而使用ForkJoinPool時,就能夠讓其中的線程創(chuàng)建新的任務(wù),并掛起當(dāng)前的任務(wù),此時線程就能夠從隊列中選擇子任務(wù)執(zhí)行。

那么使用ThreadPoolExecutor或者ForkJoinPool,會有什么差異呢?

首先,使用ForkJoinPool能夠使用數(shù)量有限的線程來完成非常多的具有父子關(guān)系的任務(wù),比如使用4個線程來完成超過200萬個任務(wù)。但是,使用ThreadPoolExecutor時,是不可能完成的,因為ThreadPoolExecutor中的Thread無法選擇優(yōu)先執(zhí)行子任務(wù),需要完成200萬個具有父子關(guān)系的任務(wù)時,也需要200萬個線程,顯然這是不可行的。

在實踐中,ThreadPoolExecutor通常用于同時(并行)處理許多獨立請求(又稱為事務(wù)),F(xiàn)ork/Join通常用于加速一項連貫的工作任務(wù)。

parallelStream 并行化

parallelStream 其實就是一個并行執(zhí)行的流.它通過默認(rèn)的 ForkJoinPool ,可以提高你的多線程任務(wù)的速度。parallelStream 具有并行處理能力,處理的過程會分而治之,也就是將一個大任務(wù)切分成多個小任務(wù),這表示每個任務(wù)都是一個操作,可以并行處理。

parallelStream 的使用

使用方式:

  1. 創(chuàng)建時返回并行流:如 Collection<T>.parallelStream()
  2. 過程中轉(zhuǎn)換為并行流:如 Stream<T>.parallel()
  3. 如果需要,轉(zhuǎn)換為順序流:Stream<T>.sequential()
// 并行流時,并非按照1,2,3...500的順序輸出
IntStream.range(1, 500).parallel().forEach(System.out::println);

parallelStream 的陷阱

由于 parallelStream 使用的是 ForkJoinPool 中的 commonPool,該方法默認(rèn)創(chuàng)建程序運行時所在計算機處理器內(nèi)核數(shù)量的線程,當(dāng)同時存在多個工作并行執(zhí)行時,F(xiàn)orkJoinPool 中的線程將被消耗完,而當(dāng)有的worker因為執(zhí)行耗時操作,將導(dǎo)致其他工作也被阻塞,而此時我們也不清楚哪個任務(wù)導(dǎo)致了阻塞。這就是 parallelStream 的陷阱。

parallelStream 是無法預(yù)測的,而且想要正確地使用它有些棘手。幾乎任何 parallelStream 的使用都會影響程序中其他部分的性能,而且是一種無法預(yù)測的方式。但是在調(diào)用stream.parallel() 或者 parallelStream() 時候在我的代碼里之前我仍然會重新審視一遍他給我的程序究竟會帶來什么問題,他能有多大的提升,是否有使用他的意義。

那么到底是使用 stream 還是 parallelStream 呢?通過下面3個標(biāo)準(zhǔn)來鑒定

1. 是否需要并行?

在回答這個問題之前,你需要弄清楚你要解決的問題是什么,數(shù)據(jù)量有多大,計算的特點是什么?并不是所有的問題都適合使用并發(fā)程序來求解,比如當(dāng)數(shù)據(jù)量不大時,順序執(zhí)行往往比并行執(zhí)行更快。畢竟,準(zhǔn)備線程池和其它相關(guān)資源也是需要時間的。但是,當(dāng)任務(wù)涉及到I/O操作并且任務(wù)之間不互相依賴時,那么并行化就是一個不錯的選擇。通常而言,將這類程序并行化之后,執(zhí)行速度會提升好幾個等級。

2. 任務(wù)之間是否是獨立的?是否會引起任何競態(tài)條件?

如果任務(wù)之間是獨立的,并且代碼中不涉及到對同一個對象的某個狀態(tài)或者某個變量的更新操作,那么就表明代碼是可以被并行化的。

3. 結(jié)果是否取決于任務(wù)的調(diào)用順序?

由于在并行環(huán)境中任務(wù)的執(zhí)行順序是不確定的,因此對于依賴于順序的任務(wù)而言,并行化也許不能給出正確的結(jié)果。

創(chuàng)建流的其他方式

我們在第1篇中記錄了幾種創(chuàng)建流的方式,但還是遺漏了一部分,再此稍作補充。

從I/O通道

方式1:從緩存流中讀取為Stream,詳見如下代碼:

final String name = "明玉";
// 從網(wǎng)絡(luò)上讀取文字內(nèi)容
new BufferedReader(
        new InputStreamReader(
                new URL("https://www.txtxzz.com/txt/download/NWJhZjI3YjIzYWQ3N2UwMTZiNDQwYWE3")
                // new URL("https://api.apiopen.top/getAllUrl")
                        .openStream()))
        .lines()
        .filter(str -> StrUtil.contains(str, name))
        .forEach(System.out::println);

方式2:從文件系統(tǒng)獲取下級路徑及文件,詳見如下代碼:

// 獲取文件系統(tǒng)的下級路徑及其文件
Files.walk(FileSystems.getDefault().getPath("D:\\soft"))
        .forEach(System.out::println);

方式3:從文件系統(tǒng)獲取文件內(nèi)容,詳見如下代碼:

Files.lines(FileSystems.getDefault().getPath("D:\\", "a.txt"))
    // .parallel()
    .limit(200)
    .forEach(System.out::println);

方式4:讀取JarFile內(nèi)的文件,詳見如下代碼:

new JarFile("D:\\J2EE_Tools\\repository\\org\\springframework\\spring-core\\5.2.6.RELEASE\\spring-core-5.2.6.RELEASE.jar")
        .stream()
        .filter(entry -> StrUtil.contains(entry.getName(), "Method"))
        .forEach(System.out::println);

獲取隨機數(shù)字流

使用類Random的ints、longs、doubles的方法,根據(jù)傳遞不同的參數(shù),可以產(chǎn)生無限數(shù)字流、有限數(shù)字流、以及指定范圍的有限或無限數(shù)字流,示例如下:

double v = new Random()
        .doubles(30, 2, 45)
        .peek(System.out::println)
        .max()
        .getAsDouble();
log.info("一串隨機數(shù)的最大值為:{}", v);

位向量流

將BitSet中位向量為真的轉(zhuǎn)換為Stream,示例如下:

BitSet bitSet = new BitSet(8);
bitSet.set(1);
bitSet.set(6);
log.info("cardinality值{}", bitSet.cardinality());
bitSet.stream().forEach(System.out::println);

正則分割流

將字符串按照正則表達式分隔成子串流,示例如下:

Pattern.compile(":")
        .splitAsStream("boo:and:foo")
        .map(String::toUpperCase)
        .forEach(System.out::println);

Stream 的其他方法

轉(zhuǎn)為無序流

使用 unordered() 方法可將 Stream 隨時轉(zhuǎn)為無序流。

轉(zhuǎn)換為Spliterator

使用 spliterator() 方法可將 Stream 轉(zhuǎn)為 Spliterator,Spliterator 介紹請看 https://juejin.im/post/5cf2622de51d4550bf1ae7ff。

綜合示例

根據(jù)1962年第1屆百花獎至2018年第34屆百花獎數(shù)據(jù),有以下數(shù)據(jù),編寫代碼按照獲得最佳男主角的演員次數(shù)排名,次數(shù)相同的按照參演年份正序排,并打印他所參演的電影。

序號 最佳男主角 電影
第1屆1962年 崔嵬 《紅旗譜》
第2屆1963年 張良 《哥倆好
第3屆1980年 李仁堂 《淚痕》
第4屆1981年 達式常 《燕歸來》
第5屆1982年 王心剛 《知音》
第6屆1983年 嚴(yán)順開 《阿Q正傳》
第7屆1984年 楊在葆 《血,總是熱的》
第8屆1985年 呂曉禾 《高山下的花環(huán)》
第9屆1986年 楊在葆 《代理市長》
第10屆1987年 姜文 《芙蓉鎮(zhèn)》
第11屆1988年 張藝謀 《老井》
第12屆1989年 姜文 《春桃》
第13屆1990年 古月 《開國大典》
第14屆1991年 李雪健 《焦裕祿》
第15屆1992年 王鐵成 《周恩來》
第16屆1993年 古月 《毛澤東的故事》
第17屆1994年 李保田 《鳳凰琴》
第18屆1995年 李仁堂 《被告山杠爺》
第19屆1996年 張國立 《混在北京》
第20屆1997年 高明 《孔繁森》
第21屆1998年 葛優(yōu) 《甲方乙方》
第22屆1999年 趙本山 《男婦女主任》
第23屆2000年 潘長江 《明天我愛你》
第24屆2001年 王慶祥 《生死抉擇》
第25屆2002年 葛優(yōu) 《大腕》
第26屆2003年 盧奇 《鄧小平》
第27屆2004年 葛優(yōu) 《手機》
第27屆2004年 李幼斌 《驚心動魂》
第28屆2006年 吳軍 《張思德》
第29屆2008年 張涵予 《集結(jié)號》
第30屆2010年 陳坤 《畫皮》
第31屆2012年 文章 《失戀33天》
第32屆2014年 黃曉明 《中國合伙人》
第33屆2016年 馮紹峰 《狼圖騰》
第34屆2018年 吳京 《戰(zhàn)狼2》

根據(jù)題目要求,創(chuàng)建 HundredFlowersAwards 實體用來存儲上述數(shù)據(jù),我們分析題目要求最終需要轉(zhuǎn)換為以演員為主的信息,然后再根據(jù)演員的獲獎次數(shù)及出演年份做排序。
所以創(chuàng)建 ActorInfo 實體,包含 演員姓名和出演電影的信息。出演電影也需創(chuàng)建實體 FilmInfo ,包含 出演年份和電影名稱。

有了如上存儲數(shù)據(jù)實體信息后,代碼實現(xiàn)邏輯如下:

  1. 將百花獎的集合數(shù)據(jù)轉(zhuǎn)換為 Stream
  2. 將該數(shù)據(jù)流轉(zhuǎn)換為Map類型,Map 的 key 為演員名,Map 的 Value 為演員信息
  3. 對于重復(fù)出現(xiàn)的演員,我們需要把電影信息追加到該演員出現(xiàn)的電影列表中
  4. 對于處理完的 Map 數(shù)據(jù),將該 Map 的 values 數(shù)據(jù)再次轉(zhuǎn)換為 Stream
  5. 將該 Stream 排序即可。
list.stream()
    .collect(Collectors.toMap(HundredFlowersAwards::getActorName, ActorInfo::new, ActorInfo::addFilmInfos))
    .values()
    .stream()
    .sorted(new ActorComparator())
    .forEach(System.out::println);

本節(jié)代碼見 StreamOtherTest 。

經(jīng)過幾天的學(xué)習(xí)和總結(jié),以上就是 Java Stream Api 的全部內(nèi)容了。從開始認(rèn)識 Stream Api,我們逐漸了解了使用 Stream Api 的流程:創(chuàng)建 Stream 、中間操作、終端操作。
我們對創(chuàng)建 Stream 、中間操作、終端操作的各個 api 方法進行了介紹及案例演示,之后我們還單獨抽出一節(jié)講解了 Collector 接口的實現(xiàn)及使用。
上述內(nèi)容雖然文字不多,大部分都在代碼中給出了演示,希望大家能下載下來代碼并運行,以加深印象。

以上是前傳部分的學(xué)習(xí)內(nèi)容了,接下來我們將進入到 Reactor 部分的學(xué)習(xí)。

源碼下載:https://github.com/crystalxmumu/spring-web-flux-study-note

參考

  1. 【Java8新特性】關(guān)于Java8的Stream API,看這一篇就夠了
  2. 一文秒懂 Java Fork/Join
  3. 深入淺出parallelStream
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容