Java 8 中的 Streams

為什么需要 Stream

Stream 作為 Java 8 的一大亮點,它與 java.io 包里的 InputStream 和 OutputStream 是完全不同的概念。它也不同于 StAX 對 XML 解析的 Stream,也不是 Amazon Kinesis 對大數(shù)據(jù)實時處理的 Stream。Java 8 中的 Stream 是對集合(Collection)對象功能的增強(qiáng),它專注于對集合對象進(jìn)行各種非常便利、高效的聚合操作(aggregate operation),或者大批量數(shù)據(jù)操作 (bulk data operation)。Stream API 借助于同樣新出現(xiàn)的 Lambda 表達(dá)式,極大的提高編程效率和程序可讀性。同時它提供串行和并行兩種模式進(jìn)行匯聚操作,并發(fā)模式能夠充分利用多核處理器的優(yōu)勢,使用 fork/join 并行方式來拆分任務(wù)和加速處理過程。通常編寫并行代碼很難而且容易出錯, 但使用 Stream API 無需編寫一行多線程的代碼,就可以很方便地寫出高性能的并發(fā)程序。所以說,Java 8 中首次出現(xiàn)的 java.util.stream 是一個函數(shù)式語言+多核時代綜合影響的產(chǎn)物。

  • 什么是聚合操作

在傳統(tǒng)的 J2EE 應(yīng)用中,Java 代碼經(jīng)常不得不依賴于關(guān)系型數(shù)據(jù)庫的聚合操作來完成諸如:
客戶每月平均消費金額
最昂貴的在售商品
本周完成的有效訂單(排除了無效的)
取十個數(shù)據(jù)樣本作為首頁推薦
這類的操作。

  • 但在當(dāng)今這個數(shù)據(jù)大爆炸的時代,在數(shù)據(jù)來源多樣化、數(shù)據(jù)海量化的今天,很多時候不得不脫離 RDBMS,或者以底層返回的數(shù)據(jù)為基礎(chǔ)進(jìn)行更上層的數(shù)據(jù)統(tǒng)計。而 Java 的集合 API 中,僅僅有極少量的輔助型方法,更多的時候是程序員需要用 Iterator 來遍歷集合,完成相關(guān)的聚合應(yīng)用邏輯。這是一種遠(yuǎn)不夠高效、笨拙的方法。在 Java 7 中,如果要發(fā)現(xiàn) type 為 grocery 的所有交易,然后返回以交易值降序排序好的交易 ID 集合,-

  • 我們需要這樣寫:
    清單 1. Java 7 的排序、取值實現(xiàn)

List<Transaction> groceryTransactions = new Arraylist<>();
for(Transaction t: transactions){
  if(t.getType() == Transaction.GROCERY){
  groceryTransactions.add(t);
}
}

Collections.sort(groceryTransactions, new Comparator(){
public int compare(Transaction t1, Transaction t2){
return t2.getValue().compareTo(t1.getValue());
}
});

List<Integer> transactionIds = new ArrayList<>();
for(Transaction t: groceryTransactions){
transactionsIds.add(t.getId());
}

而在 Java 8 使用 Stream,代碼更加簡潔易讀;而且使用并發(fā)模式,程序執(zhí)行速度更快。
清單 2. Java 8 的排序、取值實現(xiàn)

List<Integer> transactionsIds = transactions.parallelStream().
 filter(t -> t.getType() == Transaction.GROCERY).
 sorted(comparing(Transaction::getValue).reversed()).
 map(Transaction::getId).
 collect(toList());

Stream 總覽

什么是流

Stream 不是集合元素,它不是數(shù)據(jù)結(jié)構(gòu)并不保存數(shù)據(jù),它是有關(guān)算法和計算的,它更像一個高級版本的 Iterator。原始版本的 Iterator,用戶只能顯式地一個一個遍歷元素并對其執(zhí)行某些操作;高級版本的 Stream,用戶只要給出需要對其包含的元素執(zhí)行什么操作,比如 “過濾掉長度大于 10 的字符串”、“獲取每個字符串的首字母”等,Stream 會隱式地在內(nèi)部進(jìn)行遍歷,做出相應(yīng)的數(shù)據(jù)轉(zhuǎn)換。
Stream 就如同一個迭代器(Iterator),單向,不可往復(fù),數(shù)據(jù)只能遍歷一次,遍歷過一次后即用盡了,就好比流水從面前流過,一去不復(fù)返。
而和迭代器又不同的是,Stream 可以并行化操作,迭代器只能命令式地、串行化操作。顧名思義,當(dāng)使用串行方式去遍歷時,每個 item 讀完后再讀下一個 item。而使用并行去遍歷時,數(shù)據(jù)會被分成多個段,其中每一個都在不同的線程中處理,然后將結(jié)果一起輸出。Stream 的并行操作依賴于 Java7 中引入的 Fork/Join 框架(JSR166y)來拆分任務(wù)和加速處理過程。Java 的并行 API 演變歷程基本如下:
1.0-1.4 中的 java.lang.Thread
5.0 中的 java.util.concurrent
6.0 中的 Phasers 等
7.0 中的 Fork/Join 框架
8.0 中的 Lambda
Stream 的另外一大特點是,數(shù)據(jù)源本身可以是無限的。

流的構(gòu)成

當(dāng)我們使用一個流的時候,通常包括三個基本步驟:
獲取一個數(shù)據(jù)源(source)→ 數(shù)據(jù)轉(zhuǎn)換→執(zhí)行操作獲取想要的結(jié)果,每次轉(zhuǎn)換原有 Stream 對象不改變,返回一個新的 Stream 對象(可以有多次轉(zhuǎn)換),這就允許對其操作可以像鏈條一樣排列,變成一個管道,如下圖所示。

圖 1. 流管道 (Stream Pipeline) 的構(gòu)成
image

有多種方式生成 Stream Source:
從 Collection 和數(shù)組

Collection.stream()
Collection.parallelStream()
Arrays.stream(T array) or Stream.of()
從 BufferedReader

java.io.BufferedReader.lines()
靜態(tài)工廠
java.util.stream.IntStream.range()
java.nio.file.Files.walk()
自己構(gòu)建
java.util.Spliterator
其它
Random.ints()
BitSet.stream()
Pattern.splitAsStream(java.lang.CharSequence)
JarFile.stream()

流的操作類型分為兩種:

  • Intermediate:一個流可以后面跟隨零個或多個 intermediate 操作。其目的主要是打開流,做出某種程度的數(shù)據(jù)映射/過濾,然后返回一個新的流,交給下一個操作使用。這類操作都是惰性化的(lazy),就是說,僅僅調(diào)用到這類方法,并沒有真正開始流的遍歷。
  • Terminal:一個流只能有一個 terminal 操作,當(dāng)這個操作執(zhí)行后,流就被使用“光”了,無法再被操作。所以這必定是流的最后一個操作。Terminal 操作的執(zhí)行,才會真正開始流的遍歷,并且會生成一個結(jié)果,或者一個 side effect。
  • 在對于一個 Stream 進(jìn)行多次轉(zhuǎn)換操作 (Intermediate 操作),每次都對 Stream 的每個元素進(jìn)行轉(zhuǎn)換,而且是執(zhí)行多次,這樣時間復(fù)雜度就是 N(轉(zhuǎn)換次數(shù))個 for 循環(huán)里把所有操作都做掉的總和嗎?其實不是這樣的,轉(zhuǎn)換操作都是 lazy 的,多個轉(zhuǎn)換操作只會在 Terminal 操作的時候融合起來,一次循環(huán)完成。我們可以這樣簡單的理解,Stream 里有個操作函數(shù)的集合,每次轉(zhuǎn)換操作就是把轉(zhuǎn)換函數(shù)放入這個集合中,在 Terminal 操作的時候循環(huán) Stream 對應(yīng)的集合,然后對每個元素執(zhí)行所有的函數(shù)。
    還有一種操作被稱為 short-circuiting。用以指:
    對于一個 intermediate 操作,如果它接受的是一個無限大(infinite/unbounded)的 Stream,但返回一個有限的新 Stream。
    對于一個 terminal 操作,如果它接受的是一個無限大的 Stream,但能在有限的時間計算出結(jié)果。
    當(dāng)操作一個無限大的 Stream,而又希望在有限時間內(nèi)完成操作,則在管道內(nèi)擁有一個 short-circuiting 操作是必要非充分條件。
    清單 3. 一個流操作的示例
int sum = widgets.stream()
.filter(w -> w.getColor() == RED)
 .mapToInt(w -> w.getWeight())
 .sum();

stream() 獲取當(dāng)前小物件的 source,filter 和 mapToInt 為 intermediate 操作,進(jìn)行數(shù)據(jù)篩選和轉(zhuǎn)換,最后一個 sum() 為 terminal 操作,對符合條件的全部小物件作重量求和。

流的使用詳解

簡單說,對 Stream 的使用就是實現(xiàn)一個 filter-map-reduce 過程,產(chǎn)生一個最終結(jié)果,或者導(dǎo)致一個副作用(side effect)。
下面提供最常見的幾種構(gòu)造 Stream 的樣例。

  • 構(gòu)造流的幾種常見方法
// 1. Individual values
Stream stream = Stream.of("a", "b", "c");
// 2. Arrays
String [] strArray = new String[] {"a", "b", "c"};
stream = Stream.of(strArray);
stream = Arrays.stream(strArray);
// 3. Collections
List<String> list = Arrays.asList(strArray);
stream = list.stream();

需要注意的是,對于基本數(shù)值型,目前有三種對應(yīng)的包裝類型 Stream:
IntStream、LongStream、DoubleStream。當(dāng)然我們也可以用 Stream<Integer>、Stream<Long> >、Stream<Double>,但是 boxing 和 unboxing 會很耗時,所以特別為這三種基本數(shù)值型提供了對應(yīng)的 Stream。
Java 8 中還沒有提供其它數(shù)值型 Stream,因為這將導(dǎo)致擴(kuò)增的內(nèi)容較多。而常規(guī)的數(shù)值型聚合運算可以通過上面三種 Stream 進(jìn)行。

  • 數(shù)值流的構(gòu)造
IntStream.of(new int[]{1, 2, 3}).forEach(System.out::println);
IntStream.range(1, 3).forEach(System.out::println);
IntStream.rangeClosed(1, 3).forEach(System.out::println);

  • 流轉(zhuǎn)換為其它數(shù)據(jù)結(jié)構(gòu)
// 1. Array
String[] strArray1 = stream.toArray(String[]::new);
// 2. Collection
List<String> list1 = stream.collect(Collectors.toList());
List<String> list2 = stream.collect(Collectors.toCollection(ArrayList::new));
Set set1 = stream.collect(Collectors.toSet());
Stack stack1 = stream.collect(Collectors.toCollection(Stack::new));
// 3. String
String str = stream.collect(Collectors.joining()).toString();

一個 Stream 只可以使用一次,上面的代碼為了簡潔而重復(fù)使用了數(shù)次。

流的操作

接下來,當(dāng)把一個數(shù)據(jù)結(jié)構(gòu)包裝成 Stream 后,就要開始對里面的元素進(jìn)行各類操作了。常見的操作可以歸類如下。
Intermediate:
map (mapToInt, flatMap 等)、 filter、 distinct、 sorted、 peek、 limit、 skip、 parallel、 sequential、 unordered
Terminal:
forEach、 forEachOrdered、 toArray、 reduce、 collect、 min、 max、 count、 anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 iterator
Short-circuiting:
anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 limit
我們下面看一下 Stream 的比較典型用法。

map/flatMap

  • 轉(zhuǎn)換大寫
List<String> output = wordList.stream().
map(String::toUpperCase).
collect(Collectors.toList());
  • 平方數(shù)
List<Integer> nums = Arrays.asList(1, 2, 3, 4);
List<Integer> squareNums = nums.stream().
map(n -> n * n).
collect(Collectors.toList());

從上面例子可以看出,map 生成的是個 1:1 映射,每個輸入元素,都按照規(guī)則轉(zhuǎn)換成為另外一個元素。還有一些場景,是一對多映射關(guān)系的,這時需要 flatMap。

  • 一對多
Stream<List<Integer>> inputStream = Stream.of(
 Arrays.asList(1),
 Arrays.asList(2, 3),
 Arrays.asList(4, 5, 6)
 );
Stream<Integer> outputStream = inputStream.
flatMap((childList) -> childList.stream());

flatMap 把 input Stream 中的層級結(jié)構(gòu)扁平化,就是將最底層元素抽出來放到一起,最終 output 的新 Stream 里面已經(jīng)沒有 List 了,都是直接的數(shù)字。

filter

filter 對原始 Stream 進(jìn)行某項測試,通過測試的元素被留下來生成一個新 Stream。

  • 留下偶數(shù)
Integer[] sixNums = {1, 2, 3, 4, 5, 6};
Integer[] evens =Stream.of(sixNums).filter(n -> n%2 == 0).toArray(Integer[]::new);
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容