Java8采用stream、parallelStream迭代的區(qū)別

我們都知道在Java 8 API添加了一個(gè)新的抽象稱為流Stream,可以讓你以一種聲明的方式處理數(shù)據(jù)。Stream 使用一種類似用 SQL 語(yǔ)句從數(shù)據(jù)庫(kù)查詢數(shù)據(jù)的直觀方式來(lái)提供一種對(duì) Java 集合運(yùn)算和表達(dá)的高階抽象。Stream API可以極大提高Java程序員的生產(chǎn)力,讓程序員寫出高效率、干凈、簡(jiǎn)潔的代碼。這種風(fēng)格將要處理的元素集合看作一種流, 流在管道中傳輸, 并且可以在管道的節(jié)點(diǎn)上進(jìn)行處理, 比如篩選, 排序,聚合等。元素流在管道中經(jīng)過(guò)中間操作(intermediate operation)的處理,最后由最終操作(terminal operation)得到前面處理的結(jié)果。
通過(guò)查看API能夠看到Java8 API為我們提供了Stream和parallelStream兩個(gè)不同的方法,那么同樣是流處理,這兩個(gè)方法又有什么區(qū)別呢?首先我們來(lái)看看以下的代碼:

public static void main(String[] args) {
        List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9);
        System.out.println("運(yùn)行結(jié)果:");
        // stream method
        numberList.stream().forEach(number -> {
            System.out.print(String.format("%d ",number));
        });
        System.out.println("\r");
        // parallelStream method
        numberList.parallelStream().forEach(number -> {
            System.out.print(String.format("%d ",number));
        });
        System.out.println("\r");
        // parallelStream method
        numberList.parallelStream().forEachOrdered(number -> {
            System.out.print(String.format("%d ",number));
        });
        System.out.println("\r");
}

通過(guò)多次運(yùn)行上述代碼,我們可以發(fā)現(xiàn),通過(guò)parallelStream方法迭代集合,每次輸出的結(jié)果都不一樣,而通過(guò)steam方法或parallelStream方法并以forEachOrdered方式,每次執(zhí)行輸出的結(jié)果都是一樣的,并且順序符合集合元素的存放順序。
那么,為什么會(huì)造成這樣的結(jié)果差異呢,難道parallelStram是采用多線程并行的方式運(yùn)行?于是,我們進(jìn)一步修改下我們的代碼來(lái)驗(yàn)證一下猜測(cè)。

public static void main(String[] args) {
        System.out.println("運(yùn)行結(jié)果:");
        List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9);
        // stream method
        numberList.stream().forEach(number -> {
            System.out.println(String.format("Stream The Current Thread's ID is %d and output number %d ",Thread.currentThread().getId(),number));
        });
        System.out.println("\r");
        // parallelStream method
        numberList.parallelStream().forEach(number -> {
            System.out.println(String.format("ParallelStream The Current Thread's ID is %d and output number %d ",Thread.currentThread().getId(),number));
        });
        System.out.println("\r");
        // parallelStream method
        numberList.parallelStream().forEachOrdered(number -> {
            System.out.println(String.format("ParallelStream forEach Ordered The Current Thread's ID is %d and output number %d ",Thread.currentThread().getId(),number));
        });
        System.out.println("\r");
}

修改后代碼運(yùn)行結(jié)果如下:

運(yùn)行結(jié)果:
Stream The Current Thread's ID is 1 and output number 1 
Stream The Current Thread's ID is 1 and output number 2 
Stream The Current Thread's ID is 1 and output number 3 
Stream The Current Thread's ID is 1 and output number 4 
Stream The Current Thread's ID is 1 and output number 5 
Stream The Current Thread's ID is 1 and output number 6 
Stream The Current Thread's ID is 1 and output number 7 
Stream The Current Thread's ID is 1 and output number 8 
Stream The Current Thread's ID is 1 and output number 9 

ParallelStream The Current Thread's ID is 1 and output number 6 
ParallelStream The Current Thread's ID is 19 and output number 9 
ParallelStream The Current Thread's ID is 18 and output number 1 
ParallelStream The Current Thread's ID is 15 and output number 2 
ParallelStream The Current Thread's ID is 17 and output number 4 
ParallelStream The Current Thread's ID is 14 and output number 8 
ParallelStream The Current Thread's ID is 13 and output number 3 
ParallelStream The Current Thread's ID is 16 and output number 7 
ParallelStream The Current Thread's ID is 1 and output number 5 

ParallelStream forEach Ordered The Current Thread's ID is 15 and output number 1 
ParallelStream forEach Ordered The Current Thread's ID is 14 and output number 2 
ParallelStream forEach Ordered The Current Thread's ID is 14 and output number 3 
ParallelStream forEach Ordered The Current Thread's ID is 14 and output number 4 
ParallelStream forEach Ordered The Current Thread's ID is 14 and output number 5 
ParallelStream forEach Ordered The Current Thread's ID is 14 and output number 6 
ParallelStream forEach Ordered The Current Thread's ID is 14 and output number 7 
ParallelStream forEach Ordered The Current Thread's ID is 14 and output number 8 
ParallelStream forEach Ordered The Current Thread's ID is 14 and output number 9 

Disconnected from the target VM, address: '127.0.0.1:52976', transport: 'socket'

Process finished with exit code 0

通過(guò)上面的運(yùn)行結(jié)果,我們可以看到通過(guò)ParallelStream方法迭代的方法,是采用多線程的,可以看過(guò)每次輸出都是不同的線程ID,而ParallelStream(). forEach Ordered是在多線程的基礎(chǔ)上,保證了數(shù)據(jù)的順序輸出。到此,我們驗(yàn)證了我們的猜測(cè)ParallelStream方法是多線程的,而關(guān)于線程是否并行的驗(yàn)證,我們需進(jìn)一步修改下我們的代碼,于是有了下面的代碼:

 public static void main(String[] args) throws InterruptedException {
        System.out.println("運(yùn)行結(jié)果:");
        List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9);
        //for
        Long forBegin = System.currentTimeMillis();
        for(Integer number : numberList){
            //System.out.println(String.format("For The Current Thread's ID is %d and output number %d ",Thread.currentThread().getId(),number));
            Thread.sleep(1000);
        }
        System.out.println(String.format("For execute time cost %d ms",System.currentTimeMillis()-forBegin));
        System.out.println("\r");
        // stream method
        Long streamBegin = System.currentTimeMillis();
        numberList.stream().forEach(number -> {
            //System.out.println(String.format("Stream The Current Thread's ID is %d and output number %d ",Thread.currentThread().getId(),number));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(String.format("Stream execute time cost %d ms",System.currentTimeMillis()-streamBegin));
        System.out.println("\r");
        // parallelStream method
        Long parallelStreamBegin = System.currentTimeMillis();
        numberList.parallelStream().forEach(number -> {
            //System.out.println(String.format("ParallelStream The Current Thread's ID is %d and output number %d ",Thread.currentThread().getId(),number));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(String.format("ParallelStream execute time cost %d ms",System.currentTimeMillis()-parallelStreamBegin));
        System.out.println("\r");
        // parallelStream method
        Long parallelStreamForEachOrderBegin = System.currentTimeMillis();
        numberList.parallelStream().forEachOrdered(number -> {
            //System.out.println(String.format("ParallelStream forEachOrdered The Current Thread's ID is %d and output number %d ",Thread.currentThread().getId(),number));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(String.format("ParallelStream forEachOrdered execute time cost %d ms",System.currentTimeMillis()-parallelStreamForEachOrderBegin));
        System.out.println("\r");
}

這里我們加入了傳統(tǒng)的for循環(huán)迭代方式,加入一起比較,由于要體現(xiàn)多線程并行的優(yōu)勢(shì),這里我們將每次循環(huán)里加入線程休眠1秒鐘,運(yùn)行后的結(jié)果如下:

運(yùn)行結(jié)果:
For execute time cost 9032 ms

Stream execute time cost 9079 ms

ParallelStream execute time cost 2011 ms

ParallelStream forEachOrdered execute time cost 9037 ms

通過(guò)運(yùn)行結(jié)果,我們可以看到parallelStream().forEach方式耗時(shí)最短,而另外其他3種方式運(yùn)行的耗時(shí)都幾乎接近。因此,我們可以斷定我們的猜測(cè)是正確的,parallelStream().forEach是通過(guò)多線程并行的方式來(lái)執(zhí)行我們的代碼,而parallelStream(). forEachOrdered也是采用多線程,但由于加入了順序執(zhí)行約束,故程序是采用多線程同步的方式運(yùn)行的,最終耗時(shí)與for、stream兩種單線程執(zhí)行的耗時(shí)接近,但parallelStream(). forEachOrdered由于是多線程,與for、stream兩種單線程的方式相比,優(yōu)勢(shì)在于很好的利用了CPU多核的資源。感興趣的同學(xué)可以通過(guò)以下代碼查看CPU的核數(shù),并通過(guò)jstack dump出堆棧來(lái)查看線程對(duì)CPU使用的情況。

System.out.println("系統(tǒng)一共有"+Runtime.getRuntime().availableProcessors()+"個(gè)cpu");
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容