系統(tǒng)的整理一下 java8 Streams 的使用。
思想
是函數式編程(functional programming)的一種 Java 實現
強調將計算過程分解成可復用的函數,主要使用 map 方法和 reduce 方法組合而成的 MapReduce 算法,最好的實現 Apache Hadoop
關于函數式編程,請參考阮一峰 的 函數式編程初探
Streams 和 Collections 的不同
- 不儲存元素。 Stream 不是儲存元素的數據結構;相反的,它通過管道對源就像數據結構、數組、構造方法、IO流的元素進行操作。
- 純粹的方法。 Stream 上的操作會產生結果,但不會修改其來源。例如,過濾從集合獲取的流會生成一個沒有過濾元素的新 Stream,而不是從源集合中刪除元素。
- 惰性化。 許多流操作(例如過濾,映射或重復移除)可以被懶性化實現,從而為優(yōu)化提供機會。例如,“find the first String with three consecutive vowels”不需要檢查所有的輸入字符串。流操作分為中間 intermediate (Stream-producing) 操作和終端 terminal (value-or side-effect-producing) 操作,intermediate 操作總是惰性的。
- 可能沒有限制。 盡管集合的大小有限,但流不需要。諸如 limit(n) 或 findFirstf() 之類的短路操作可以允許無限流上的計算在有限的時間內完成。
- 一次性。 流的元素在流的生命周期中僅訪問過一次。像 Iterator 一樣,必須生成一個新的流來重新訪問源的相同元素。
集合和流,它們有不同的關注點,集合主要關注集合的有效管理和訪問。
相反,流不直接提供訪問和操作元素的手段,而是關注于聲明性地描述它們的來源和將在該來源上進行的計算操作。如果流操作沒有你想要的功能,你可以使用 iterator() 或 spliterator() 來遍歷操作。
**
增強 for 循環(huán)內部還是使用 iterator() 來進行遍歷,它們同屬于外部遍歷器,java8 集合的 forEach 和 stream 的 forEach() 屬于內部遍歷器,流的內部遍歷器可以使用到流的并行(parallel)特性,從而加快速度。
**
- 枚舉,迭代器和增強的 for 循環(huán)都是外部迭代器(記著方法 iterator(),next() 或 hasNext() 嗎?)。
- java8 集合 forEach 和 stream 的 forEach 為外部迭代器。
流操作分為中間操作(intermediate operations)和終端操作(terminal operations),結合形成流管道。流管道由源(例如集合,數組,生成器函數或 I/O 通道)組成; 隨后是零個或多個中間操作,例如 Stream.filter 或 Stream.map 和諸如 Stream.forEach 或 Stream.reduce 之類的終端操作。
Intermediate operations(中間操作)
中間操作返回一個新的流(Stream<T>)。
他們總是惰性的,執(zhí)行諸如 filter() 之類的中間操作實際上并不執(zhí)行任何過濾,而是創(chuàng)建一個新的流,該流在遍歷時包含與給定謂詞相匹配的初始流的元素。在管道的終端操作被執(zhí)行時對源的流水遍歷才會開始。
中間操作進一步分為無狀態(tài)和有狀態(tài)操作。無狀態(tài)操作(如 filter 和 map)在處理新元素時不會保留先前看到的元素的狀態(tài) -- 每個元素可以獨立于其他元素上的操作進行處理。有狀態(tài)的操作(如 distinct 和 sorted)可能會在處理新元素時結合之前看到的元素的狀態(tài)。
有狀態(tài)的操作可能需要在生成結果之前處理整個輸入。例如,只有在查看了流的所有元素之后,才能對排序流產生任何結果。因此,在并行計算中,一些包含有狀態(tài)中間操作的管道可能需要對數據進行多次傳遞,或者可能需要緩存重要數據。只包含無狀態(tài)中間操作的流水線可以一次處理,無論是順序處理還是并行處理,只需最少的數據緩沖。
下面列舉個人常用的一些操作:
-
map
返回由給定函數作用于此流的元素后產生的結果組成的流。
給定函數為無干涉,無狀態(tài)的操作作用于每個元素。不然之后的操作結果可能不會很準確。
-
無干涉
無干涉主要是指在流操作期間不去修改源流。 -
無狀態(tài)
無狀態(tài)是指我們在處理時不產生中間狀態(tài),操作不依賴之前的狀態(tài)。
-
distinct
distinct 保證輸出的流中包含唯一的元素,它是通過 Object.equals(Object) 來檢查是否包含相同的元素。它是一個有狀態(tài)的中間操作。
在并行流中對無序數組去重效率更高,對于有序數組可以使用
unordered()無序檢索提高速度,或者使用sequential()來實現串行。相反有序數組更適合使用串行流。
-
peek
peek 產生一個和原流相同的流,并在遍歷流的過程中去消費每個元素。
使用 peek 的主要目的是“看,不要動”
此方法主要用于支持調試,您希望在元素流經管道中的某個點時看到這些元素:請謹慎使用此方法作為副作用,因為它有可能會修改源流。
-
flatMap
返回一個流,該流包含將原流的每個元素替換為映射函數應用于每個元素而生成的映射流的內容的結果。每個映射流都將其內容放入此流后關閉。(如果映射流為空,則使用空流代替)
簡而言之,就是將原流每個元素通過映射函數生成的新流組合成一個新流。
flatMap() 操作具有對流的元素應用一對多轉換,然后將生成的元素展平為新流的效果。
example
orders 是采購訂單流,并且每個采購訂單都包含一系列采購列,則以下內容會生成包含所有訂單中的所有采購列的流:
orders.flatMap(order -> order.getLineItems().stream())...
如果 path 是文件的路徑,那么下面的內容會生成包含在該文件中的單詞流:
Stream<String> lines = Files.lines(path, StandardCharsets.UTF_8);
Stream<String> words = lines.flatMap(line -> Stream.of(line.split(" +")));
上面 flatmap 中的映射函數使用的正則比較簡單,具體單詞劃分的正則不是這樣。
-
count
返回此流中元素的數量。這是一個簡寫,相當于:
return mapToLong(e -> 1L).sum();
Terminal operations(終端操作)
終端操作返回確定類型的結果
如 Stream.forEach 或 IntStream.sum,可能會遍歷流以產生結果或副作用(side-effect)。終端操作執(zhí)行后,流管道被視為消耗,并不能再使用;如果你需要再次遍歷相同的數據源,則必須返回到數據源以獲取新的流。在幾乎所有情況下,終端操作都非常急切,在返回之前完成數據源的遍歷和管道的處理。只有終端操作 iterator() 和 spliterator() 不是。
副作用(side-effect)
副作用可能會違反無狀態(tài)要求和對線程安全產生危害。
許多計算可能會產生副作用,但是可以更安全有效地表達,而不會產生副作用,例如使用 reduction 而不是 mutable accumulators。少量流操作(例如 forEach() 和 peek())只能通過副作用操作;這些應該小心使用。
比如我們在對流操做以期望得到想要的結果,而無意修改了原始流,便產生了副作用。
折疊(Reduction operations)
歸約操作(也稱為折疊)采用一系列輸入元素,并通過重復應用組合操作(例如查找一組數字的和或最大值)或將元素累加到列表中來將它們組合為單個匯總結果。流類具有多種形式的通用歸約操作,稱為 reduce() 和 collect(),以及多個專用簡化形式,如 sum(),max() 或 count()。
可變歸約(Mutable reduction)
可變歸約操作將輸入元素累加到可變結果容器中,例如 Collection 或 StringBuilder,因為它處理流中的元素。
可變縮減操作稱為 collect(),因為它將所需結果一起收集到結果容器(如集合)中。 收集操作需要三個功能:構造結果容器的新實例的供應者函數,將輸入元素并入結果容器的累加器函數以及將一個結果容器的內容合并到另一個結果容器的組合函數。
供應器 (supplier())
-
累加器 (accumulator())
- 組合器 (combiner())
- 修整器 (finisher()) 可省略
<R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super T> accumulator,
BiConsumer<R, R> combiner);
例如下面的代碼:
ArrayList<String> strings = new ArrayList<>();
for (T element : stream) {
strings.add(element.toString());
}
我們可以寫成:
ArrayList<String> strings = stream.collect(() -> new ArrayList<>(),
(c, e) -> c.add(e.toString()),
(c1, c2) -> c1.addAll(c2));
簡寫作:
List<String> strings = stream.map(Object::toString)
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
在這里,我們的供應器是 ArrayList 構造函數,累加器將字符串化的元素添加到 ArrayList,組合器只是簡單地使用 addAll 將字符串從一個容器復制到另一個容器中。
collect 的供應器,累加器和組合器三個方面緊密耦合。我們可以使用抽象的 Collector 來包含三個方面,上面的代碼可以重寫為:
List<String> strings = stream.map(Object::toString)
.collect(Collectors.toList());
收集器(Collectors)實現類
一種可變減少操作,將輸入元素累加到可變結果容器中,可選地,在處理完所有輸入元素后,將累加結果轉換為最終表示形式??s減操作可以按順序執(zhí)行也可以并行執(zhí)行。
可變減少操作的例子包括:
- 將元素累加到集合中;
toListtoMaptoSettoCollection - 使用
StringBuilder連接字符串;
joining - 計算關于總和,最小值,最大值或平均值等元素的摘要信息;
- 求和
counting()collectingAndThen - 匯總
summarizingDoublesummingDouble... - 最大值、最小值
maxByminBy - 平均值
averagingDoubleaveragingInt...
- 求和
- 計算“數據透視表”摘要,例如“賣方最大價值交易”等。
- 分組
groupingBy - 分割
partitioningBy
例子:
- 分組
// Accumulate names into a List
List<String> list = people.stream().map(Person::getName).collect(Collectors.toList());
// Accumulate names into a TreeSet
Set<String> set = people.stream().map(Person::getName).collect(Collectors
.toCollection(TreeSet::new));
// Convert elements to strings and concatenate them, separated by commas
String joined = things.stream()
.map(Object::toString)
.collect(Collectors.joining(", "));
// Compute sum of salaries of employee
int total = employees.stream()
.collect(Collectors.summingInt(Employee::getSalary)));
// Group employees by department
Map<Department, List<Employee>> byDept = employees.stream()
.collect(Collectors.groupingBy(Employee::getDepartment));
// Compute sum of salaries by department
Map<Department, Integer> totalByDept = employees.stream()
.collect(Collectors.groupingBy(Employee::getDepartment,
Collectors.summingInt(Employee::getSalary)));
// Partition students into passing and failing
Map<Boolean, List<Student>> passingFailing = students.stream()
.collect(Collectors.partitioningBy(s -> s.getGrade() >= PASS_THRESHOLD));