Java 8函數式編程學習筆記
author:Gavin
date:2018/11/09
什么是函數式編程
在思考問題時,使用不可變值和函數,函數對一個值進行處理,映射成另一個值。
函數接口
函數接口是只有一個抽象方法的接口,用作Lambda 表達式的類型。
Java中重要的函數接口
| 接口 | 參數 | 返回類型 | 示例 |
|---|---|---|---|
| Predicate<T> | T | boolean | 今天下雨嗎? |
| Consumer<T> | T | void | 輸出一個值 |
| Function<T, R> | T | R | 獲得Artist對象的名字 |
| Supplier<T> | None | T | 工廠方法 |
| UnaryOperator<T> | T | T | 邏輯非(!) |
| BinaryOperator<T> | (T, T) | T | 求兩個數的乘積(*) |
常用的流操作
collect(toList())
collect(toList()) 方法由 Stream 里的值生成一個列表,是一個及早求值操作。
List<String> collected = Stream.of("a", "b", "c")
.collect(Collectors.toList());
assertEquals(Arrays.asList("a", "b", "c"), collected);
map
如果一個函數可以將一種類型的值轉換成另外一種類型,map操作就可以使用該函數,將一個流中的值轉換成一個新的流。
List<String> collected = Stream.of("a", "b", "hello")
.map(string -> string.toUpperCase())
.collect(toList());
assertEquals(asList("A", "B", "HELLO"), collected);

傳給map的 Lambda 表達式只接受一個 String 類型的參數,返回一個新的 String。參數和返回值不必屬于同一種類型,但是 Lambda 表達式必須是 Function 接口的一個實例,Function 接口是只包含一個參數的普通函數接口。

filter
遍歷數據并檢查其中的元素時,可嘗試使用 Stream 中提供的新方法 filter。

List<String> beginningWithNumbers
= Stream.of("a", "1abc", "abc1")
.filter(value -> isDigit(value.charAt(0)))
.collect(toList());
assertEquals(asList("1abc"), beginningWithNumbers);
和map很像,filter 接受一個函數作為參數,該函數用 Lambda 表達式表示。該函數和前面示例中 if 條件判斷語句的功能一樣,如果字符串首字母為數字,則返回 true。若要重構遺留代碼,for 循環(huán)中的 if 條件就是一個很強的信號,可用 filter 方法替代。
由于此方法和 if 條件語句的功能相同,因此其返回值肯定是 true 或者 false。經過過濾, Stream 中符合條件的,即 Lambda 表達式值為 true 的元素被保留下來。該 Lambda 表達式的函數接口正是前面的 Predicate。

flatMap
flatMap 方法可用 Stream 替換值,然后將多個 Stream 連接成一個 Stream。


前面的 map 操作可以用一個新的值替代 Stream 中的值。有時,希望讓 map 操作有點變化,生成一個新的 Stream 對象取而代之。用戶通常不希望結果是一連串的流,此時 flatMap 最能排上用場。
List<Integer> together = Stream.of(asList(1,2), asList(3,4))
.flatMap(numbers -> numbers.stream())
.collect(toList());
assertEquals(asList(1,2,3,4), together);
調用 stream 方法,將每個列表轉換成 Stream 對象,其余部分由 flatMap 方法處理。flatMap 方法的相關函數接口和 map 方法的一樣,都是 Function 接口,只是方法的返回值限定為 Stream 類型罷了。
max 和 min
Stream 上常用的操作之一是求最大值和最小值。Stream API 中的 max 和 min 操作足以解決這一問題。
List<Track> tracks = asList(new Track("Bakai", 524),
new Track("Violets for Your Furs", 378),
new Track("Time Was", 451));
Track shortestTrack = tracks.stream()
.min(Comparator.comparing(track -> track.getLength())).get();
assertEquals(tracks.get(1), shortestTrack);
查找 Stream 中最大或最小元素,首先要考慮的是用什么作為排序的指標。
為了讓 Stream 對象按照曲目長度進行排序,需要傳給它一個 Comparator 對象。Java 8 提供了一個新的靜態(tài)方法 comparing,使用它可以方便地實現一個比較器。
reduce
reduce 操作可以實現一組值中生成一個值。在上述例子中用到的 count、min 和 max 方法,因為常用而被納入標準庫中。事實上,這些方法都是 reduce 操作。
下圖展示了如何通過 reduce 操作對 Stream 中的數字求和。以0作七點-一個空 Stream 的求和結果, accumulator 的值就是所有元素的和。

Lambda 表達式就是 reducer,它執(zhí)行求和操作,有兩個參數:傳入 Stream 中的當前元素和 acc。將兩個參數相加,acc 是累加器,保存著當前的累加結果。
int count = Stream.of(1, 2, 3).reduce(0, (acc, ele) -> acc + ele);
assertEquals(6, count);
Lambda 表達式的返回值是最新的 acc,是上一輪 acc 的值和當前元素相加的結果。reducer 的類型是 BinaryOperator。
展開 reduce 操作:
BinaryOperator<Integer> accumulator = (acc, ele) -> acc + ele;
int count = accumulator.apply(
accumulator.apply(
accumulator.apply(0, 1),
2),
3);
reduce 過程的中間值:
| 元素 | acc | 結果 |
|---|---|---|
| N/A | N/A | 0 |
| 1 | 0 | 1 |
| 2 | 1 | 3 |
| 3 | 3 | 6 |
高階函數
高階函數是指接受另外一個函數作為參數,或返回一個函數的函數。高階函數不難辨認:看函數簽名就夠了。如果函數的參數列表里包含了函數接口,或該函數返回一個函數接口,那么該函數就是高階函數。
map 是一個高階函數,因為它的 mapper 參數是一個函數。事實上 Stream 接口中幾乎所有的函數都是高階函數。之前的排序用到了 comparing 函數,它接受一個函數作為參數,獲取相應的值,同時返回一個 Comparator。Comparator 可能會被誤以為是一個對象,但它有且只有一個抽象方法,所以實際上是一個函數接口。
事實上,可以大膽斷言,Comparator 實際上應該是個函數,但是那時的 Java 只有對象,因為才造出了一個類,一個匿名類。
正確使用 Lambda 表達式
明確要達成什么轉化,而不是說明如何轉化的另外一層含義在于寫出的函數沒有副作用。這一點非常重要,這樣只通過函數的返回值就能充分理解函數的全部作用。
沒有副作用的函數不會改變程序或外界的狀態(tài)。在 Lambda 表達式中使用局部變量,可以不使用 final 關鍵字,但局部變量在既成事實上必須是 final 的。
不論何時,將 Lambda 表達式傳給 Stream 上的高階函數,都應該盡量避免副作用。唯一的例外是 forEach 方法,它是一個終結方法。
基本類型
Java 的泛型是基于對泛型參數類型的擦除-換句話說,假設它是 Object 對象的實例,因此只有裝箱類型才能作為泛型參數。這就解釋了為什么在 Java 中想要一個包含整型值的列表 List<int>,實際上得到的卻是一個包含整型對象的列表List<Integer>。
麻煩的是,由于裝箱類型是對象,因此在內存中存在額外的開銷。比如,整型在內存中占用4字節(jié),整型對象卻要占用16字節(jié)。這一情況在數組上更加嚴重,整型數組中的每個元素只占用基本類型的內存,而整型對象數組中,每個元素都是內存中的一個指針,指向 Java 堆中的某個對象。在最壞的情況下,同樣大小的數組,Integer[]要比 int[] 多占用 6 倍內存。
將基本類型轉換為裝箱類型,稱為裝箱,反之則稱為拆箱,兩者都需要額外的計算開銷。對于需要大量數值運算的算法來說,裝箱和拆箱的計算開銷,以及裝箱類型占用的額外內存,會明顯減緩程序的運行速度。
為了減少這些開銷,Stream 類的某些方法對基本類型和裝箱類型做了區(qū)分。下圖的高階函數 mapToLong 和其它類似函數即為該方面的一個嘗試。在 Java 8 中,僅對整型、長整型和雙浮點型做了特殊處理,因為它們在數值計算中用的最多,特殊處理后的系統(tǒng)性能提升效果最明顯。

對基本類型做特殊處理的方法在命名上有明確的規(guī)范。如果方法返回類型為基本類型,則在基本類型前面加 To,如上圖所示的 ToLongFunction。如果參數類型是基本類型,則不加前綴只需類型名即可,如下圖中的 LongFunction。如果高階函數使用基本類型,則在操作后加后綴 To 再加基本類型,如 mapToLong。

這些基本類型都有與之對應的 Stream,以基本類型名為前綴,如 LongStream。事實上,mapToLong 方法返回的不是一個一般的 Stream,而是一個特殊處理的 Stream。在這個特殊的 Stream 中,map 方法的實現方式也不同,它接受一個 LongUnaryOperator 函數,將一個長整型值映射成另一個長整型值,如下圖所示。通過一些高階函數裝箱方法,如 mapToObj,也可以從一個基本類型的 Stream 得到一個裝箱后的 Stream,如 Stream<Long>。

使用 summaryStatistics 方法統(tǒng)計曲目長度
public static void printTrackLengthStatistics(Album album) {
IntSummaryStatistics trackLengthStats
= album.getTracks()
.mapToInt(track -> track.getLength())
.summaryStatistics();
System.out.println("Max: %d, Min: %d, Ave: %f, Sum: %d",
trackLengthStats.getMax(),
trackLengthStats.getMin(),
trackLengthStats.getAverage(),
trackLengthStats.getSum());
}
這些統(tǒng)計在所有特殊處理的 Stream,如 DoubleStream、LongStream 中都可以得出。
@FunctionInterface
為了提供 Stream 對象可操作性而引入各種新接口,都需要有 Lambda 表達式可以實現它。它們存在的意義在于將代碼塊作為數據打包起來。因此,它們都添加了 @FunctionInterface注釋。
該注釋會強制 javac 檢查一個接口是否符合函數接口的標準。如果該注釋添加給一個枚舉類型、類或另外一個注釋,或者接口包含不止一個抽象方法,javac 就會報錯。
默認方法
Collection 接口中增加了新的 stream 方法,如果讓其它的類在不知道該方法的情況下通過編譯?Java 8 通過如下方式解決該問題:Collection 接口告訴它所有的子類:“如果你沒有實現 stream 方法,就使用我的吧?!?接口中這樣的方法叫做默認方法,在任何接口中,無論函數接口還是非函數接口,都可以使用該方法。
默認方法示例:forEach 實現方式
default void forEach(Consumer<? extends t> action) {
for (T t : this) {
action.accpet(t);
}
}
Optional
reduce 方法的一個重點尚未體積:reduce 方法有兩種形式,一種如前面出現的需要有一個初始值,另一種便是則不需要有初始值。沒有初始值的情況下,reduce 的第一步使用 Stream 中的前兩個元素。有時,reduce 操作不存在有意義的初始值,這樣做就是有意義的,此時,reduce方法返回一個 Optional 對象。
使用 Optional 對象有兩個目的:首先,Optional 對象鼓勵程序員適時檢查變量是否為空,以避免代碼缺陷;其次,它將一個類的 API 中可能為空的值文檔化,這比閱讀實現代碼要簡單很多。
創(chuàng)建某個值的 Optional 對象
Optional<String> a = Optional.of("a");
assertEquals("a", a.get());
Optional 對象也可能為空,因此還有一個對應的工廠方法 empty,另外一個工廠方法 ofNullable 則可以將一個空值轉換成 Optional 對象。
創(chuàng)建一個空的 Optional 對象,并檢查其是否有值
Optional emptyOptional = Optional.empty();
Optional alsoEmpty = Optional.ofNullable(null);
assertEquals(emptyOptional.isPresent());
assertTrue(a.isPresent());
使用 Optional 對象的方式之一是在調用 get() 方法前,先使用 isPresent 檢查 Optional 對象是否有值。使用 orElse 方法則更簡潔,當 Optional 對象為空時,該方法提供了一個備選值。如果計算備選值在計算上太過繁瑣,即可使用 orElseGet 方法。該方法接受一個 Supplier 對象,只有在 Optional 對象真正為空時才會調用。
使用 orElse 和 orElseGet 方法
assertEquals("b", emptyOptional.orElse("b"));
assertEquals("c", emptyOptional.orElseGet(() -> "c"));
高級集合類和收集器
方法引用
Lambda 表達式經常調用參數。比如想得到藝術家的名字,Lambda 的表達式如下:
artist -> artist.getName()
Java 8 提供了一個簡寫語法,叫做方法引用,幫助程序員重用已有方法。代碼如下:
Artist::getName
標準語法為 Classname::methodName。雖然這是一個方法,但是不需要在后面加括號,因為這里并不調用該方法。我們只是提供了和 Lambda 表達式等價的一種結構,在需要時才會調用。凡是使用 Lambda 表達式的地方,就可以使用方法引用。
構造函數同樣也有縮寫形式,如下:
(name, nationality) -> new Artist(name, nationality)
// 使用方法引用,上述代碼可寫為:
Artist::new
另外一個要注意的地方是方法引用自動支持多個參數,前提是選對了正確的函數接口。
還可以用這種方式創(chuàng)建數組,下面的代碼創(chuàng)建了一個字符串型的數組:
String[]::new
使用收集器
標準類庫已經提供了一些有用的收集器:java.util.stream.Collectors。
轉換成其它集合
有一些收集器可以生成其它集合。比如 toList,生成了 java.util.List 類的實例。還有 toSet 和 toCollection,分別生成 Set 和 Collection類的實例。
通常情況下,創(chuàng)建集合時需要調用適當的構造函數指明集合的具體類型:
List<Artist> artiests = new ArrayList<>();
但是調用 toList 或者 toSet 方法時,無需指定具體的類型。Stream 類庫在背后自動為你挑選了合適的類型。你可能希望使用 TreeSet,而不是由框架在背后自動為你指定一種類型的 Set。此時就可以使用 toCollection,它接受一個函數作為參數,來創(chuàng)建集合。
使用 toCollection,用定制的集合收集元素
stream.collect(toCollection(TreeSet::new));
轉換成值
還可以利用收集器讓流生成一個值。maxBy 和 minBy 允許用戶按某種特定的順序生成一個值。下例展示了如何找出成員最多的樂隊。它使用了一個 Lambda 表達式,將藝術家映射為成員變量,然后定義了一個比較器,并將比較器傳入 maxBy 收集器。
找出成員最多的樂隊
public Optional<Artist> biggestGroup(Stream<Artist> artists) {
Function<Artist, Long> getCount = artist -> artist.getMembers().count();
return artists.collect(maxBy(comparing(getCount)));
}
找出一組專輯上曲目的平均數
public double averageNumberOfTracks(List<Artist> albums) {
return albums.stream()
.collect(averagingInt(album -> album.getTrackList().size()));
}
事實上,Java 8 也提供了能完成類似功能的收集器,如 averagingInt??梢允褂?summingInt 及其重載方法求和。 SummaryStatistics也可以使用 summingInt 及其組合收集。
數據分塊
另外一個常用的操作是將其分解成兩個集合。假設有一個藝術家組成的流,你可能希望將其分成兩個部分,一部分是獨唱歌手,另一部分是有多人組成的樂隊??梢允褂?partitioningBy 收集器,它接受一個流,并將其分成兩部分,如下圖所示。它使用 Predicate 對象判斷一個元素應該屬于哪部分,并根據布爾值返回一個 Map 到列表。因此,對于 true List 中的元素, Predicate 返回 true;對其它 List 中的元素,Predicate 返回 false。

將藝術家組成的流分成樂隊和獨唱歌手兩部分
public Map<Boolean, List<Artist>> bandsAndSolo(Stream<Artist> artists) {
// 使用方法引用代替 Lambda 表達式
// return artists.collect(partitioningBy(Artist::isSolo));
return artists.collect(partitioningBy(artist -> artist.isSolo()));
}
數據分組
數據分組是一種更自然的分割數據操作,與將數據分成 true 和 false 兩部分不同,可以使用任意值對數據分組。比如現在有一由專輯組成的流,可以按專輯當中的主唱對專輯分組。
使用主唱對專輯分組
public Map<Artist, List<Album>> albumsByArtist(Stream<Album> albums) {
return albums.collect(groupingBy(album -> album.getMainMusician()));
}
調用流的 collect 方法,傳入一個收集器。groupingBy 收集器接受一個分類函數,用來對數據分組,就像 partitioningBy 一樣,接受一個 Predicate 對象將數據分成 true 和 false 兩部分。我們使用的分類器是一個 Function 對象,和 map 操作用到的一樣。

字符串
很多時候,收集流中的數據都是為了在最后生成一個字符串。
使用流和收集器格式化藝術家姓名
String result =
artists.stream()
.map(Artist::getName)
.collect(Collectors.joining(", ", "[", "]"));
這里使用 map 操作提取出藝術家的姓名,然后使用 Collectors.joining 收集流中的值,該方法可以方便地從一個流得到一個字符串,允許用戶提供分隔符(用以分割元素)、前綴和后綴。
組合收集器
之前我們使用主唱將專輯分組,現在來考慮如何計算一個藝術家的專輯數量。一個簡單的方案是使用前面的方法對專輯先分組后計數。
使用收集器計算每個藝術家的專輯數
public Map<Artist, Long> numberOfAlbums(Stream<Album> albums) {
return albums.collect(groupingBy(album -> album.getMainMusician(), counting()));
}
groupingBy 先將元素分成塊,每塊都與分類函數 getMainMusician 提供的鍵值相關聯,然后使用下游的另一個收集器收集每塊中的元素,最好將結果映射為一個 Map。
使用收集器求每個藝術家的專輯名
public Map<Artist, List<String>> nameOfAlbums(Stream<Album> albums) {
return albums.collect(groupingBy(Album::getMainMusician,
mapping(Album::getName, toList())));
}
每個收集器都是生成最終值的一劑良方。這里需要兩劑配方,一個傳給另一個。Oracle 提供了 mapping 收集器。
mapping 允許在收集器的容器上執(zhí)行類似 map 的操作。但是需要指明使用什么樣的集合類存儲結果,比如 toList。這些收集器就像烏龜疊羅漢,龜龜相馱以致無窮。
mapping 收集器和 map 方法一樣,接受一個 Function 對象作為參數。
這兩個例子我們都用到了第二個收集器,用以收集最終結果的一個子集。這些收集器叫作下游收集器。收集器是生成最終結果的一劑配方,下游收集器則是生成部分結果的配方,主收集器中會用到下游收集器。這種組合使用收集器的方式,使得它們在 Stream 類庫中的作用更加強大。
那些作為基本類型特殊定制的函數,如 averagingInt、summarizingLong等,事實上和調用特殊 Stream 上的方法是等價的,加上它們是為了將它們當做下游收集器來使用的。
重構和定制收集器
使用 reduce 和 StringBuilder 格式化藝術家姓名
StringBuilder reduced =
artists.stream()
.map(Artist::getName)
.reduce(new StringBuilder(), (builder, name) -> {
if (builder.length() > 0)
builder.append(", ");
builder.append(name);
return builder;
}, (left, right) -> left.append(right));
reduced.insert(0, "[");
reduced.append("]");
String result = reduced.toString();
使用 reduce 和 StringCombiner 類格式化藝術家姓名
StringCombiner combined =
artists.stream()
.map(Artist::getName)
.reduce(new StringCombiner(", ", "[", "]"),
StringCombiner::add,
StringCombiner::merge);
String result = combined.toString();
代碼大相徑庭,背后工作是一樣的。我們使用 reduce 操作將姓名和分隔符連接成一個 StringCombiner 對象。不過這次連接姓名操作被代理到了 StringCombiner.add 方法,而連接兩個連接器操作被 StringCombiner.merge 方法代理。
add 方法返回連接新元素后的結果
public StringCombiner add(String element) {
if (areAtStart()) {
builder.append(prefix);
} else {
builder.append(delim);
}
builder.append(element);
return this;
}
add 方法在內部將操作代理給一個 StringBuilder 對象。如果剛開始進行連接,則在最前面添加前綴,否則添加分隔符,然后再添加新的元素。這里返回一個 StringCombiner 對象,因為這是傳給 reduce 操作所需的類型。合并也是同樣的道理,內部操作代理給 StringBuilder 對象。
merge 方法連接兩個 StringCombiner 對象
public StringCombiner merge(StringCombiner other) {
builder.append(other.builder);
return this;
}
使用 reduce 操作,將工作代理給 StringCombiner 對象
String result =
artists.stream()
.map(Artist::getName)
.reduce(new StringJoiner(", ", "[", "]"),
StringJoiner::add,
StringJoiner::merge)
.toString();
因為現有的邏輯在程序中不能重用,因此,我們想將 reduce 操作重構為一個收集器,在程序中任何地方都能使用。不妨將這個收集器叫做 StringCollector。
使用定制的收集器 StringCollector 收集字符串
String result = artists.stream()
.map(Artist::getName)
.collect(new StringCollect(", ", "[", "]"));
既然已經將所有對字符串的連接操作代理給了定制收集器,應用程序就無需關心 StringCollector 對象的任何內部細節(jié),它和框架中其它 Collector 對象用起來是一樣的。
先來實現 Collector 接口,由于 Collector 接口支持泛型,因此先得確定一些具體的類型:
- 待收集元素的類型,這里是 String;
- 累加器的類型 StringJoiner;
- 最終結果的類型,這里依然是 String。
定義字符串收集器
/**
* 自定義StringCollector收集器,需要三個參數:
*
* <pre>
* <li>delimiter:分隔符</li>
* <li>prefix:前綴</li>
* <li>suffix:后綴</li>
* </pre>
*
* @author Gavin
*
*/
public class StringCollector implements Collector<String, StringJoiner, String> {
private final String delimiter;
private final String prefix;
private final String suffix;
/**
* 默認構造函數
*
* @param delimiter
* 分隔符
* @param prefix
* 前綴
* @param suffix
* 后綴
*/
public StringCollector(String delimiter, String prefix, String suffix) {
this.delimiter = delimiter;
this.prefix = prefix;
this.suffix = suffix;
}
/**
* accumulator 是一個函數,將當前元素疊加到收集器
*/
@Override
public BiConsumer<StringJoiner, String> accumulator() {
return StringJoiner::add;
}
/**
* characteristics 返回一個不可變的Set收集器.
*/
@Override
public Set<java.util.stream.Collector.Characteristics> characteristics() {
return new HashSet<>();
}
/**
* combiner 合并兩個容器
*/
@Override
public BinaryOperator<StringJoiner> combiner() {
return StringJoiner::merge;
}
/**
* finisher 方法返回收集操作的最終結果
*/
@Override
public Function<StringJoiner, String> finisher() {
return StringJoiner::toString;
}
/**
* Supplier 是創(chuàng)建容器的工廠
*/
@Override
public Supplier<StringJoiner> supplier() {
return () -> new StringJoiner(delimiter, prefix, suffix);
}
}
收集器的每一個組件都是函數,因此我們使用箭頭表示,流中的值用圓圈表示,最終生成的值用橢圓表示。收集操作一開始,Supplier 先創(chuàng)建出新的容器。

收集器的 accumulator 的作用和 reduce 操作的第二個參數一樣,它結合之前操作的結果和當前值,生成并返回新的值。這一邏輯已經在 StringJoiner 的 add 方法中得以實現,直接引用就好了。
accumulator 是一個函數,它將當前元素疊加到收集器
public BiConsumer<StringJoiner, String> accumulator() {
return StringJoiner::add;
}
這里的 accumulator 用來將流中的值疊加入容器中。

combine 方法很像 reduce 操作的第三個方法。如果有兩個容器,我們需要將其合并。同樣,在前面重構中我們已經實現了該功能,直接使用 StringJoiner.merge 方法就行了。
combiner 合并兩個容器
public BinaryOperator<StringJoiner> combiner() {
return StringJoiner::merge;
}
在收集階段,容器被 combiner 方法成對合并進一個容器,直到最后只剩一個容器為止(如下圖所示)。

在使用收集器之前,重構的最后一步將 toString 方法內聯到方法鏈的末端,這就將 StringJoiner 轉換成了我們想要的字符串。

收集器的 finisher 方法作用相同。我們已經將流中的值疊加入一個可變容器中,但這還不是我們想要的最終結果。這里調用了 finisher 方法,以便進行轉換。在我們想創(chuàng)建字符串等不可變的值時特別有用,這里容器是可變的。
為了實現 finisher 方法,只需將該操作代理給已經實現的 toString 方法即可。
finisher 方法返回收集器操作的最終結果
public Function<StringJoiner, String> finisher() {
return StringJoiner::toString();
}
從最后剩下的容器中得到最終結果。
關于收集器,還有一點一直沒有提及,那就是特征,特征是一組描述收集器的對象,框架可以適當對其優(yōu)化。characteristic 方法定了特征。
此時,finisher 方法不需要對容器做任何操作。更正式地說,此時的 finisher 方法其實就是 identity 函數:它返回傳入參數的值。如果這樣,收集器就展現了 IDENTITY_FINISH 的特征,需要使用 characteristics 方法聲明。
對收集器的歸一化處理
就像之前看到的那樣,定制收集器其實不難,但如果你想為自己領域內的類定制一個收集器,不妨考慮一下其他替代方案。最容易想到的方案是構建若干個集合對象,作為參數傳給領域內類的構造函數。如果領域內的類包含多種集合,這種方式又簡單又適用。
reducing 是一種定制收集器的簡便方式
String result =
artist.stream()
.map(Artist::getName)
.collect(Collectors.reducing(
new StringJoiner(", ", "[", "]"),
name -> new StringJoiner(", ", "[", "]").add(name),
StringJoiner::merge))
.toString();
這種方式非常低效,這也是我們要定制收集器的原因之一。
練習
-
方法引用
-
[x] 轉換大寫的 map 方法;
Stream.of("hello", "world").map(String::toUpperCase); -
[x] 使用 reduce 實現 count 方法;
Collectors.reducing(0L, e -> 1L, Long::sum); -
[ ] 使用 flatMap 連接列表。
-
-
收集器
-
[x] 找出名字最長的藝術家,分別使用收集器和 reduce 高階函數實現。然后對比二者的異同:哪一種方式寫起來更簡單,哪一種凡是讀起來更簡單?以下面的參數為例,該方法的正確返回值為 “Stuart Sutcliffe”:
Stream<String> names = Stream.of("John Lennon", "Paul McCartney", "George Harrison", "Ringo Starr", "Pete Best", "Stuart Sutcliffe");private static Comparator<Artist> byNameLength = comparing(artist -> artist.getName().length()); public static Artist byReduce(List<Artist> artists) { return artists.stream().reduce((acc, artist) -> { return (byNameLength.compare(acc, artist) >= 0) ? acc : artist; }).orElseThrow(RuntimeException::new); } public static Artist byCollecting(List<Artist> artists) { return artists.stream().collect(Collectors.maxBy(byNameLength)).orElseThrow(RuntimeException::new); } -
[x] 假設一個元素為單詞的流,計算每個單詞出現的次數。假設輸入如下,則返回值為一個形如[John -> 3, Paul -> 2, George -> 1] 的 Map:
Stream<String> names = Stream.of("John", "Paul", "George", "John", "Paul", "John");public static Map<String, Long> countWords(Stream<String> names) { return names.collect(groupingBy(name -> name, Collectors.counting())); } -
[x] 用一個定制的收集器實現 Collectors.groupingBy 方法,不需要提供一個下游收集器,只需實現一個最簡單的即可。別看 JDK 的源碼,這是作弊!提示:可從下面這行代碼開始:
public class GroupingBy<T, K> implements Collector<T, Map<K, List<T>>, Map<K, List<T>>>這是一個進階練習,不妨最后再嘗試這道習題。
public class GroupingBy<T, K> implements Collector<T, Map<K, List<T>>, Map<K, List<T>>> { private final static Set<Characteristics> characteristics = new HashSet<>(); static { characteristics.add(Characteristics.IDENTITY_FINISH); } private final Function<? super T, ? extends K> classifier; public GroupingBy(Function<? super T, ? extends K> classifier) { this.classifier = classifier; } @Override public Supplier<Map<K, List<T>>> supplier() { return HashMap::new; } @Override public BiConsumer<Map<K, List<T>>, T> accumulator() { return (map, ele) -> { K key = classifier.apply(ele); List<T> elements = map.computeIfAbsent(key, k -> new ArrayList<>()); elements.add(ele); }; } @Override public BinaryOperator<Map<K, List<T>>> combiner() { return (left, right) -> { right.forEach((key, value) -> { left.merge(key, value, (leftValue, rightValue) -> { leftValue.addAll(rightValue); return leftValue; }); }); return left; }; } @Override public Function<Map<K, List<T>>, Map<K, List<T>>> finisher() { return map -> map; } @Override public Set<Characteristics> characteristics() { return characteristics; } }
-
-
改進Map
使用 Map 的 computeIfAbsent 方法高效計算斐波那契數列。這里的 “高效” 是指避免將那些較小的序列重復計算多次。
public class Fibonacci { private final Map<Integer, Long> cache; public Fibonacci() { cache = new HahMap<>(); cache.put(0, 0L); cache.put(1, 1L); } public long fibonacci(int x) { return cache.computeIfAbsent(x, n -> fibonacci(n - 1) + fibonacci(n - 2)); } }
數據并行化
并行和并發(fā)
并發(fā)是兩個任務共享時間段,并行則是兩個任務在同一個時間發(fā)生,比如運行在多核 CPU 上。如果一個程序要運行兩個任務,并且只有一個 CPU 給它們分配了不同的時間片,那么這就是并發(fā),而不是并行。兩者區(qū)別如下圖所示。

并行化是指為縮短任務執(zhí)行時間,將一個任務分解成幾部分,然后并行執(zhí)行。這和順序執(zhí)行的任務的任務量是一樣的,區(qū)別就像用更多的馬來拉車,花費的時間自然減少了。實際上,和順序執(zhí)行相比,并行化執(zhí)行任務時,CPU承載的工作量更大。
數據并行化:數據并行化是指將數據分成塊,為每塊數據分配單獨的處理單元。還是拿馬拉車那個例子打比方,就像從車里取出一些貨物,放到另一輛車上,兩輛馬車都沿著同樣的路徑到達目的地。
當需要在大量數據上執(zhí)行同樣的操作時,數據并行化很管用。它將問題分解為可在多塊數據上求解的形式,然后對每塊數據執(zhí)行運算,最后將各數據塊上得到的結果匯總,從而獲得最終答案。
并行化流操作
并行化操作流只需改變一個方法調用。如果已經有一個 Stream 對象,調用它的 paraller方法就能讓其擁有并行操作的能力。如果想從一個集合類創(chuàng)建一個流,調用 parallelStream就能立即獲得一個擁有并行能力的流。
并行化計算專輯曲目長度
public int parallelArraySum() {
return albums.parallelStream()
.flatMap(Album::getTracks)
.mapToInt(Track::getLength)
.sum();
}
性能
影響并行流的主要因素有5個,依次分析如下:
-
數據大小
輸入數據的大小會影響并行化處理對性能的提升。
-
源數據結構
每個管道的操作都基于一些初始數據源,通常是集合,將不同的數據源分割相對容易,這里的開銷影響了在管道中并行處理數據時到底能帶來多少性能上的提升。
-
裝箱
處理基本類型比處理裝箱類型要快。
-
核的數量
極端情況下,只有一個核,因此完全沒必要并行化。顯然,擁有的核越多,獲得潛在性能提升的幅度就越大。在實際中,核的數量不單指你的機器上有多少核,更是指運行時你的機器能使用多少核。這也就是說同時運行的其它進程,或者線程相關性(強制線程在某些核或 CPU 上運行)會影響性能。
-
單元處理開銷
比如數據大小,這是一場并行執(zhí)行花費時間和分解合并操作開銷之間的戰(zhàn)爭。花在流中每個元素身上的時間越長,并行操作帶來的性能提升越明顯。
并行求和
private int addIntegers(List<Integer> values) {
return values.parallelStream()
.mapToInt(i -> i)
.sum();
}
在底層,并行流還是沿用了 for/join 框架。fork 遞歸式地分解問題,然后每段并行執(zhí)行,最終由 join 合并結果,返回最后的值。

假設并行流將我們的工作分解開,在一個四核的機器上并行執(zhí)行。
- 數據被分成四塊。
- 如上圖所示,計算工作在每個線程里并行執(zhí)行。這包括將每個 Integer 對象映射為
int值,然后在每個線程里將 1/4 的數字相加。理想情況下,我們希望在這里花的時間越多越好,因為這里時并行操作的最佳場所。 - 然后合并結果。就是
sum操作,但這也可能是reduce、collect或其它終結操作。
根據問題的分解方式,初始化的數據源的特性變得尤其重要,它影響了分解的性能。直觀上看,能重復將數據結構對半分解的難易程度,決定了分解操作的快慢。能對半分解同時意味著待分解的值能夠被等量地分解。
我們可以根據性能的好壞,將核心類庫提供的通用數據結構分成以下 3 組。
-
性能好
ArrayList、數組或 IntStream.range,這些數據結構支持隨機讀取,也就是說它們能輕而易舉地被任意分解。
-
性能一般
HashSet、TreeSet,這些數據結構不易公平地被分解,但是大多數時候分解是可能的。
-
性能差
有些數據結構難易分解,比如,可能要花
O(N)的時間復雜度來分解問題。其中包括LinkedList,對于半分解太難了。還有Streams.iterate和BufferedReader.lines,它們長度未知,因此很難預測該在哪里分解。
初始的數據結構影響巨大。舉一個極端的例子,對比對 10 000 個整數并行求和,使用 ArrayList 要比使用 LinkedList 快 10 倍。這不是說業(yè)務邏輯的性能情況也會如此,只是說明了數據結構對于性能的影響之大。形如使用 LinkedList 這樣難于分解的數據結構并行運行可能更慢。
單獨操作每一塊的種類時,可以分成兩種不同的操作:無狀態(tài)的和有狀態(tài)的。
如果能避開有狀態(tài)的,選用無狀態(tài)操作,就能獲得更好的并行性能。無狀態(tài)操作包括 map、filter 和 flatMap,有操作包括 sorted、distinct 和 limit。
并行化數組操作
這些操作都在工具類 Arrays 中。
| 方法名 | 操作 |
|---|---|
| parallelPrefix | 任意給定一個函數,計算數組的和 |
| parallelSetAll | 使用 Lambda 表達式更新數組元素 |
| parallelSort | 并行化對數組元素排序 |
使用 Lambda 表達式編寫并發(fā)程序
Future
構建復雜并行操作的另外一種方案是使用 Future。Future 像一張欠條,方法不是返回一個值,而是返回一個 Future 對象,該對象第一次創(chuàng)建時沒有值,但以后能拿它 "換回" 一個值。
調用 Future 對象的 get方法獲取值,它會阻塞當前線程,直到返回值。
使用 Future 從外部網站下載專輯信息
@Override
public Album lookupByName(String albumName) {
Future<Credentials> trackLogin = loginTo("track");
Future<Credentials> artistLogin = loginTo("artist");
try {
Future<List<Track>> tracks = lookupTracks(albumName, trackLogin.get());
Future<List<Track>> artists = lookupArtists(albumName, artistLogin.get());
} catch(InterruptedException | ExecutionException e) {
throw new AlbumLookupException(e.getCause());
}
}
如果要將 Future 對象的結果傳給其它任務,會阻塞當前線程的執(zhí)行。這會成為一個性能問題,任務不是平行執(zhí)行了,而是串行執(zhí)行了。
這意味著在登錄兩個服務之前,我們無法啟動任何查找任務。沒必要這樣:lookupTracks 只需要自己的登錄憑證,lookupArtists 也是一樣。我們將理想的行為圖描述出來。

可以將對 get 的調用放到 lookupTracks 和 lookupArtist 方法的中間,這能解決問題,但是代碼丑陋,而且無法再多次調用之間重用登錄憑證。
我們真正需要的是不必調用 get 方法阻塞當前線程,就能操作 Future 對象返回的結果。我們需要將 Future 和回調結合起來使用。
CompletableFuture
這些問題的解決之道 CompletableFuture,它結合了 Future 對象打欠條的注意和使用回調處理事件驅動的任務。其要點是可以組合不同的實例,而不用擔心末日金字塔問題。
使用 CompletableFuture 從外部網站下載專輯信息
public Album lookupByName(String albumName) {
CompletableFuture<List<Artist>> artistLookup
= loginTo("artist")
.thenCompose(artistLogin -> lookupArtists(albumName, artistLogin));
return loginTo("track")
.thenCompose(trackLogin -> lookupTracks(albumName, trackLogin))
.themCombine(artistLookup, (tracks, artist)
-> new Album(albumName, tracks, artists))
.join();
}
loginTo、lookupArtists 和 lookupTracks 方法均返回 CompletableFuture ,而不是 Future。CompletableFuture API 的技巧是注冊 Lambda 表達式,并且把高階函數鏈接起來。方法不同,但道理和 Stream API 的設計是相同的。
在調用最終的方法之前,無法保證 CompletableFuture 對象已經生成結構。CompletableFuture 對象實現了 Future 接口,可以調用 get 方法獲取值。CompletableFuture 對象包含 join 方法,我們在上面調用了該方法,它的作用和 get 方法是一樣的,而且它沒有使用 get 方法時令人倒胃口的檢查異常。
CompletableFuture 的常用情景之一是異步執(zhí)行一段代碼,該段代碼計算并返回一個值。有一個工廠方法 supplyAsnc,用來創(chuàng)建 CompletableFuture 實例。
異步創(chuàng)建 CompletableFuture 實例的示例代碼
CompletableFuture<Track> lookupTrack(String id) {
return CompletableFuture.supplyAsync(() -> {
// 這里會做一些繁重的工作 1
// ...
return track; // 2
}, service); // 3
}
supplyAsync 方法接受一個 Supplier 對象作為參數,然后執(zhí)行它。這里的要點是要執(zhí)行一些耗時的任務,同時不會阻塞當前線程 - 這就是方法名中 Async 的含義。3 處的返回值用來完成 COMP了tableFuture。在 2 處我們提供了一個叫做 service 的 Executor,告訴 COMP了tableFuture 對象在哪里執(zhí)行任務。如果沒有提供 Executor,就會使用相同的 fork/join 線程池并行執(zhí)行。
CompletableFuture 提供了 completeExceptionnally,用于處理異常情況。該方法可以視作 complete 方法的備選項,但不能同時調用 complete 和 completeExceptionally 方法。
出現錯誤時完成 Future
future.completeExceptionanlly(new AlbumLookupException("Unable to find " + name));
CompletableFuture 實例:
- 如果想在鏈的末端執(zhí)行一些代碼而不返回任何值,比如 Comsuper 和 Runnable,那就看看 thenAccept 和 thenRun 方法。
- 可使用 thenApply 方法轉換 CompletableFuture 對象的值,有點像 Stream 的 map 方法。
- 在 CompletableFuture 對象出現異常時,可使用 exceptionally 方法恢復,可以將一個函數注冊到該方法,返回一個代替值。
- 如果你想有一個 map,包含異常情況和正常,請使用 handle 方法。
- 要找出 CompletableFuture 對象到底出了什么問題,可使用 isDone 和 isCompletedExceptionally 方法輔助調查。
CompletableFuture 對于處理并發(fā)任務非常有用,但這并不是唯一的方法。
響應式編程
CompletableFuture 背后的概念可以從單一的返回值推廣到數據流,這就是響應式編程。響應式編程其實是一種聲明式編程方法,它讓程序員以自動的變化和數據流來編程。