我們都知道在Java 8 API添加了一個(gè)新的抽象稱為流Stream,可以讓你以一種聲明的方式處理數(shù)據(jù)。Stream 使用一種類似用 SQL 語(yǔ)句從數(shù)據(jù)庫(kù)查詢數(shù)據(jù)的直觀方式來(lái)提供一種對(duì) Java 集合運(yùn)算和表達(dá)的高階抽象。Stream API可以極大提高Java程序員的生產(chǎn)力,讓程序員寫出高效率、干凈、簡(jiǎn)潔的代碼。這種風(fēng)格將要處理的元素集合看作一種流, 流在管道中傳輸, 并且可以在管道的節(jié)點(diǎn)上進(jìn)行處理, 比如篩選, 排序,聚合等。元素流在管道中經(jīng)過(guò)中間操作(intermediate operation)的處理,最后由最終操作(terminal operation)得到前面處理的結(jié)果。
通過(guò)查看API能夠看到Java8 API為我們提供了Stream和parallelStream兩個(gè)不同的方法,那么同樣是流處理,這兩個(gè)方法又有什么區(qū)別呢?首先我們來(lái)看看以下的代碼:
public static void main(String[] args) {
List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9);
System.out.println("運(yùn)行結(jié)果:");
// stream method
numberList.stream().forEach(number -> {
System.out.print(String.format("%d ",number));
});
System.out.println("\r");
// parallelStream method
numberList.parallelStream().forEach(number -> {
System.out.print(String.format("%d ",number));
});
System.out.println("\r");
// parallelStream method
numberList.parallelStream().forEachOrdered(number -> {
System.out.print(String.format("%d ",number));
});
System.out.println("\r");
}
通過(guò)多次運(yùn)行上述代碼,我們可以發(fā)現(xiàn),通過(guò)parallelStream方法迭代集合,每次輸出的結(jié)果都不一樣,而通過(guò)steam方法或parallelStream方法并以forEachOrdered方式,每次執(zhí)行輸出的結(jié)果都是一樣的,并且順序符合集合元素的存放順序。
那么,為什么會(huì)造成這樣的結(jié)果差異呢,難道parallelStram是采用多線程并行的方式運(yùn)行?于是,我們進(jìn)一步修改下我們的代碼來(lái)驗(yàn)證一下猜測(cè)。
public static void main(String[] args) {
System.out.println("運(yùn)行結(jié)果:");
List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9);
// stream method
numberList.stream().forEach(number -> {
System.out.println(String.format("Stream The Current Thread's ID is %d and output number %d ",Thread.currentThread().getId(),number));
});
System.out.println("\r");
// parallelStream method
numberList.parallelStream().forEach(number -> {
System.out.println(String.format("ParallelStream The Current Thread's ID is %d and output number %d ",Thread.currentThread().getId(),number));
});
System.out.println("\r");
// parallelStream method
numberList.parallelStream().forEachOrdered(number -> {
System.out.println(String.format("ParallelStream forEach Ordered The Current Thread's ID is %d and output number %d ",Thread.currentThread().getId(),number));
});
System.out.println("\r");
}
修改后代碼運(yùn)行結(jié)果如下:
運(yùn)行結(jié)果:
Stream The Current Thread's ID is 1 and output number 1
Stream The Current Thread's ID is 1 and output number 2
Stream The Current Thread's ID is 1 and output number 3
Stream The Current Thread's ID is 1 and output number 4
Stream The Current Thread's ID is 1 and output number 5
Stream The Current Thread's ID is 1 and output number 6
Stream The Current Thread's ID is 1 and output number 7
Stream The Current Thread's ID is 1 and output number 8
Stream The Current Thread's ID is 1 and output number 9
ParallelStream The Current Thread's ID is 1 and output number 6
ParallelStream The Current Thread's ID is 19 and output number 9
ParallelStream The Current Thread's ID is 18 and output number 1
ParallelStream The Current Thread's ID is 15 and output number 2
ParallelStream The Current Thread's ID is 17 and output number 4
ParallelStream The Current Thread's ID is 14 and output number 8
ParallelStream The Current Thread's ID is 13 and output number 3
ParallelStream The Current Thread's ID is 16 and output number 7
ParallelStream The Current Thread's ID is 1 and output number 5
ParallelStream forEach Ordered The Current Thread's ID is 15 and output number 1
ParallelStream forEach Ordered The Current Thread's ID is 14 and output number 2
ParallelStream forEach Ordered The Current Thread's ID is 14 and output number 3
ParallelStream forEach Ordered The Current Thread's ID is 14 and output number 4
ParallelStream forEach Ordered The Current Thread's ID is 14 and output number 5
ParallelStream forEach Ordered The Current Thread's ID is 14 and output number 6
ParallelStream forEach Ordered The Current Thread's ID is 14 and output number 7
ParallelStream forEach Ordered The Current Thread's ID is 14 and output number 8
ParallelStream forEach Ordered The Current Thread's ID is 14 and output number 9
Disconnected from the target VM, address: '127.0.0.1:52976', transport: 'socket'
Process finished with exit code 0
通過(guò)上面的運(yùn)行結(jié)果,我們可以看到通過(guò)ParallelStream方法迭代的方法,是采用多線程的,可以看過(guò)每次輸出都是不同的線程ID,而ParallelStream(). forEach Ordered是在多線程的基礎(chǔ)上,保證了數(shù)據(jù)的順序輸出。到此,我們驗(yàn)證了我們的猜測(cè)ParallelStream方法是多線程的,而關(guān)于線程是否并行的驗(yàn)證,我們需進(jìn)一步修改下我們的代碼,于是有了下面的代碼:
public static void main(String[] args) throws InterruptedException {
System.out.println("運(yùn)行結(jié)果:");
List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9);
//for
Long forBegin = System.currentTimeMillis();
for(Integer number : numberList){
//System.out.println(String.format("For The Current Thread's ID is %d and output number %d ",Thread.currentThread().getId(),number));
Thread.sleep(1000);
}
System.out.println(String.format("For execute time cost %d ms",System.currentTimeMillis()-forBegin));
System.out.println("\r");
// stream method
Long streamBegin = System.currentTimeMillis();
numberList.stream().forEach(number -> {
//System.out.println(String.format("Stream The Current Thread's ID is %d and output number %d ",Thread.currentThread().getId(),number));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(String.format("Stream execute time cost %d ms",System.currentTimeMillis()-streamBegin));
System.out.println("\r");
// parallelStream method
Long parallelStreamBegin = System.currentTimeMillis();
numberList.parallelStream().forEach(number -> {
//System.out.println(String.format("ParallelStream The Current Thread's ID is %d and output number %d ",Thread.currentThread().getId(),number));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(String.format("ParallelStream execute time cost %d ms",System.currentTimeMillis()-parallelStreamBegin));
System.out.println("\r");
// parallelStream method
Long parallelStreamForEachOrderBegin = System.currentTimeMillis();
numberList.parallelStream().forEachOrdered(number -> {
//System.out.println(String.format("ParallelStream forEachOrdered The Current Thread's ID is %d and output number %d ",Thread.currentThread().getId(),number));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(String.format("ParallelStream forEachOrdered execute time cost %d ms",System.currentTimeMillis()-parallelStreamForEachOrderBegin));
System.out.println("\r");
}
這里我們加入了傳統(tǒng)的for循環(huán)迭代方式,加入一起比較,由于要體現(xiàn)多線程并行的優(yōu)勢(shì),這里我們將每次循環(huán)里加入線程休眠1秒鐘,運(yùn)行后的結(jié)果如下:
運(yùn)行結(jié)果:
For execute time cost 9032 ms
Stream execute time cost 9079 ms
ParallelStream execute time cost 2011 ms
ParallelStream forEachOrdered execute time cost 9037 ms
通過(guò)運(yùn)行結(jié)果,我們可以看到parallelStream().forEach方式耗時(shí)最短,而另外其他3種方式運(yùn)行的耗時(shí)都幾乎接近。因此,我們可以斷定我們的猜測(cè)是正確的,parallelStream().forEach是通過(guò)多線程并行的方式來(lái)執(zhí)行我們的代碼,而parallelStream(). forEachOrdered也是采用多線程,但由于加入了順序執(zhí)行約束,故程序是采用多線程同步的方式運(yùn)行的,最終耗時(shí)與for、stream兩種單線程執(zhí)行的耗時(shí)接近,但parallelStream(). forEachOrdered由于是多線程,與for、stream兩種單線程的方式相比,優(yōu)勢(shì)在于很好的利用了CPU多核的資源。感興趣的同學(xué)可以通過(guò)以下代碼查看CPU的核數(shù),并通過(guò)jstack dump出堆棧來(lái)查看線程對(duì)CPU使用的情況。
System.out.println("系統(tǒng)一共有"+Runtime.getRuntime().availableProcessors()+"個(gè)cpu");