引子
首先Lambda配合Stream擁有很強(qiáng)大的數(shù)據(jù)處理能力,并且能夠以更加清晰的表達(dá)方式描述數(shù)據(jù),大大減少了代碼的冗余。在平常開(kāi)發(fā)中,能大大提高開(kāi)發(fā)效率,學(xué)習(xí)它的目的也正因?yàn)槿绱?,此文介紹了一些Lambda相關(guān)的知識(shí)以及一些注意事項(xiàng),避免濫用反而起到反作用。
Lambda基本介紹
Lambda:可以理解為一種匿名函數(shù):它沒(méi)有名稱(chēng),但它有參數(shù)列表、函數(shù)主體、返回類(lèi)型,可能還有一個(gè)可以?huà)伋龅漠惓A斜怼?/p>
Lambda示例
// 在以前,我們使用匿名類(lèi)是這樣:
Thread t = new Thread(new Runnable() {
public void run(){
System.out.println("Hello world");
}
});
// 現(xiàn)在用Lambda表達(dá)式的話(huà),看起來(lái)是這樣:
Thread t = new Thread(() -> System.out.println("Hello world"));
從上面的例子中可以看出,采用匿名內(nèi)部類(lèi)和采用Lambda的寫(xiě)法,Lambda的寫(xiě)法明顯更加精簡(jiǎn)和清晰了
Lambda最基本的構(gòu)成
() -> System.out.println("Hello world");
- 參數(shù)列表: 空參則括號(hào)里面什么都不寫(xiě)
- ->: 把參數(shù)列表與Lambda主體分隔開(kāi)
- Lambda主體: 代碼具體邏輯
帶入?yún)?并且有返回值
// 第一種寫(xiě)法,如果Lambda主體部分不帶花括號(hào),可以不用寫(xiě)return,返回的具體類(lèi)型編譯器會(huì)自動(dòng)推斷
(String s1, String s2) -> s1.concat(s2);
// 第二種寫(xiě)法,如果Lambda主體部分加了花括號(hào),要帶返回值必須加上return,否者就是Void類(lèi)型的匿名函數(shù)
(String s1, String s2) -> {
return s1.concat(s2);
};
默認(rèn)方法
如果要在接口中添加新方法,則必須在實(shí)現(xiàn)該接口的類(lèi)中提供其實(shí)現(xiàn)代碼。為了解決這個(gè)問(wèn)題,Java 8引入了默認(rèn)方法的概念,它允許接口具有默認(rèn)方法,而不會(huì)影響其實(shí)現(xiàn)類(lèi)。默認(rèn)方法不是抽象方法,子類(lèi)實(shí)現(xiàn)了該接口會(huì)繼承該默認(rèn)實(shí)現(xiàn),子類(lèi)也可以覆蓋該默認(rèn)實(shí)現(xiàn)。
對(duì)于學(xué)習(xí)函數(shù)式接口關(guān)系不大,可以當(dāng)做是一個(gè)新特性,如果不打算了解可以直接跳過(guò)。
子類(lèi)可以繼承接口的默認(rèn)方法
// 定義接口1
interface MyInterface1 {
default void defaultMethod() {
System.out.println("defaultMethod1");
}
}
// 定義接口2,接口2繼承了接口1,也默認(rèn)繼承了接口1的默認(rèn)方法
interface MyInterface2 extends MyInterface1 {
}
static class A implements MyInterface2 {
}
static class B implements MyInterface1 {
}
public static void main(String[] args) {
A a = new A();
a.defaultMethod();
B b = new B();
b.defaultMethod();
}
// 輸出
// defaultMethod1
// defaultMethod1
子類(lèi)可以覆蓋接口的默認(rèn)方法
// 定義接口1
interface MyInterface1 {
default void defaultMethod() {
System.out.println("defaultMethod1");
}
}
static class A implements MyInterface1 {
public void defaultMethod() {
System.out.println("defaultMethod1 from MyInterface1");
}
}
public static void main(String[] args) {
A a = new A();
a.defaultMethod();
}
// 輸出
// defaultMethod1 from MyInterface1
子類(lèi)實(shí)現(xiàn)了兩個(gè)擁有相同默認(rèn)方法,可以通過(guò):接口名稱(chēng).super.方法名()調(diào)用
// 定義接口1
interface MyInterface1 {
default void defaultMethod() {
System.out.println("defaultMethod1");
}
}
interface MyInterface2 {
default void defaultMethod() {
System.out.println("defaultMethod2");
}
}
static class A implements MyInterface1,MyInterface2 {
// 此時(shí)必須實(shí)現(xiàn)該方法,否則通過(guò)不了編譯
@Override
public void defaultMethod() {
// 如果要調(diào)用MyInterface2的默認(rèn)方法,可以使用MyInterface2.super.defaultMethod();
MyInterface2.super.defaultMethod();
}
}
public static void main(String[] args) {
A a = new A();
a.defaultMethod();
}
// 輸出
// defaultMethod2
子類(lèi)對(duì)接口默認(rèn)方法調(diào)用規(guī)則
- 類(lèi)中的方法優(yōu)先級(jí)最高。類(lèi)或父類(lèi)中聲明的方法的優(yōu)先級(jí)高于任何聲明為默認(rèn)方法的優(yōu)先級(jí)。
- 如果無(wú)法依據(jù)第一條進(jìn)行判斷,那么子接口的優(yōu)先級(jí)更高:函數(shù)簽名相同時(shí),優(yōu)先選擇擁有最具體實(shí)現(xiàn)的默認(rèn)方法的接口。
- 最后,如果還是無(wú)法判斷,繼承了多個(gè)接口的類(lèi)必須通過(guò)顯式覆蓋和調(diào)用期望的方法,顯式地選擇使用哪一個(gè)默認(rèn)方法的實(shí)現(xiàn)。
上面第1條規(guī)則以及第3條規(guī)則都已經(jīng)展示過(guò),下面展示第二條規(guī)則,該類(lèi)圖繼承關(guān)系如下:
按照規(guī)則2,MyInterface2比MyInterface1更加具體,所以A會(huì)調(diào)用MyInterface2的defalutMethod
interface MyInterface1 {
default void defaultMethod() {
System.out.println("defaultMethod1");
}
}
interface MyInterface2 extends MyInterface1{
default void defaultMethod() {
System.out.println("defaultMethod2");
}
}
static class A implements MyInterface1,MyInterface2 {
}
public static void main(String[] args) {
A a = new A();
a.defaultMethod();
}
// 輸出
// defaultMethod2
接口靜態(tài)方法
與接口的默認(rèn)方法類(lèi)似,需要加上關(guān)鍵字static,靜態(tài)方法需要,并且由于定義是完整的并且方法是靜態(tài)的,因此在實(shí)現(xiàn)類(lèi)中不能覆蓋或更改這些方法。
interface MyInterface1 {
default void defaultMethod() {
System.out.println("defaultMethod1");
}
static void staticMethod() {
System.out.println("staticMethod1");
}
}
public static void main(String[] args) {
MyInterface1.staticMethod();
}
// 輸出
// staticMethod1
過(guò)于簡(jiǎn)單,就不上更多的例子了,下面直接說(shuō)明與默認(rèn)方法的區(qū)別就差不多了解了。
與默認(rèn)方法的相同點(diǎn):
- 靜態(tài)方法與默認(rèn)方法必須要有默認(rèn)的實(shí)現(xiàn);
- 靜態(tài)方法與默認(rèn)方法都不是抽象方法;
與默認(rèn)方法的不同點(diǎn):
- 子類(lèi)實(shí)現(xiàn)了該接口,子類(lèi)不會(huì)繼承靜態(tài)接口方法,并且也不能覆蓋靜態(tài)接口方法,但是子類(lèi)可以定義與父接口一樣的靜態(tài)方法,如果要調(diào)用接口的靜態(tài)方法只能以接口名稱(chēng).方法名()來(lái)調(diào)用;接口的默認(rèn)方法子類(lèi)是可以繼承默認(rèn)方法并且也可以覆蓋的
函數(shù)式接口
說(shuō)起Lambda,就必須了解函數(shù)式接口,因?yàn)橐褂肔ambda,必須在函數(shù)式接口上使用。
函數(shù)式接口:就是一個(gè)有且僅有一個(gè)抽象方法,但是可以有多個(gè)默認(rèn)方法的接口,這樣的接口可以隱式轉(zhuǎn)換為L(zhǎng)ambda表達(dá)式。一般在函數(shù)式接口上都有個(gè)注解@FunctionalInterface,該注解的作用類(lèi)似@Override一樣告訴編譯器這是一個(gè)函數(shù)式接口,用于編譯期間檢測(cè)該接口是否僅有一個(gè)抽象方法,如果擁有多個(gè)則編譯不通過(guò)。如下圖所示
在函數(shù)式接口上使用lambda表達(dá)式
函數(shù)式接口可以被隱式轉(zhuǎn)換為 lambda 表達(dá)式。
如下例子
Thread t = new Thread(() -> System.out.println("Hello world"));
我們可以看看Thread的構(gòu)造:
public Thread(Runnable target) {
init(null, target, "Thread-" + nextThreadNum(), 0);
}
其中入?yún)镽unnable類(lèi)型的接口,繼續(xù)查看Runnable接口
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
可以看出在jdk1.8中,Runnable就是一個(gè)函數(shù)式接口
Runnable r1 = () -> System.out.println("Hello world")
// 等價(jià)于
Runnable r2 = new Runnable() {
public void run(){
System.out.println("Hello world");
}
}
run()方法簽名:參數(shù)列表為空,返回為void;lambda簽名:() -> void 參數(shù)列表為空,返回為void可以看出Runnable的run方法簽名與lambda的簽名匹配,我們將這種對(duì)方法抽象描述叫作函數(shù)描述符
在java8中,提供了很多函數(shù)式接口,可以用于描述各種Lambda表達(dá)式的簽名
| 函數(shù)式接口 | 函數(shù)描述符 |
|---|---|
| Predicate<T> | T->boolean |
| Consumer<T> | T->void |
| Function<T,R> | T->R |
| Supplier<T> | ()->T |
| UnaryOperator<T> | T->T |
| BiPredicate<L,R> | (L,R)->boolean |
| BiConsumer<T,U> | (T,U)->void |
| BiFunction<T,U,R> | (T,U)->R |
這些都是較為常用的函數(shù)式接口,還有很多都在
java.util.function包下,有興趣可以自行查看。
Stream
一個(gè)新的抽象,稱(chēng)為流,可以以聲明的方式處理數(shù)據(jù)。提供了一系列的api,使用類(lèi)似sql語(yǔ)句直觀(guān)的方式來(lái)提供對(duì)集合處理的高階抽象。
另外還有兩個(gè)特點(diǎn):
- 流水線(xiàn):很多流操作本身會(huì)返回一個(gè)流,這樣多個(gè)操作就可以鏈接起來(lái),形成一個(gè)大的流水線(xiàn)。這樣做可以對(duì)操作進(jìn)行優(yōu)化, 比如延遲執(zhí)行和短路。流水線(xiàn)的操作可以看作對(duì)數(shù)據(jù)源進(jìn)行數(shù)據(jù)庫(kù)式查詢(xún)。
- 內(nèi)部迭代:以前對(duì)集合遍歷都是通過(guò)Iterator或者For-Each的方式, 顯式的在集合外部進(jìn)行迭代, 這叫做外部迭代。 Stream提供了內(nèi)部迭代的方式。
示例
// 創(chuàng)建一個(gè)1至10的集合
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
numbers
.stream() //將集合轉(zhuǎn)化成串行流
.filter(i -> i % 2 == 0) //過(guò)濾掉奇數(shù)
.limit(3) //取出前三個(gè)元素
.map(Double::valueOf) // 將int類(lèi)型轉(zhuǎn)換成double類(lèi)型
.forEach(System.out::println); // 迭代并打印出元素
// 最終輸出結(jié)果:2.0 4.0 6.0
如果采用傳統(tǒng)的For-Each迭代方式來(lái)處理集合,可以想象代碼可不是這短短這幾行了。
整個(gè)流水線(xiàn)的操作包含兩個(gè)
- 中間操作:形成一條操作流水線(xiàn)。例如:filter()對(duì)流操作并返回一個(gè)流,limit()對(duì)流操作也會(huì)返回流,這樣可以通過(guò)多個(gè)中間操作連接起來(lái)合成一個(gè)查詢(xún)。注意:整個(gè)流水線(xiàn)中,除非觸發(fā)了一個(gè)終端操作,否者中間操作不會(huì)執(zhí)行任何處理。因?yàn)槎鄠€(gè)中間操作可以合并起來(lái),在終端操作時(shí)一次性全部處理。
- 終端操作:執(zhí)行流水線(xiàn),生成處理結(jié)果。
方法引用
上面例子中有一行代碼為:map(Double::valueOf),::這個(gè)寫(xiě)法是什么意思呢?
實(shí)際上Double::valueOf就是一個(gè)方法引用。map(Double::valueOf)等價(jià)于map(element -> Double.value(element))
類(lèi)名放在分隔符::前,方法的名稱(chēng)放在后面。
例如,Double::valueOf就是引用了Double類(lèi)中定義的方法valueOf,并且不需要加括號(hào);
方法引用就是Lambda表達(dá)式的快捷寫(xiě)法,例如:
-
(Integer i) -> Double.valueOf(i)---Double::valueOf -
(String s) -> System.out.println(s)---System.out::println -
(str, i) -> str.substring(i)---String::substring
方法引用的種類(lèi)
- 靜態(tài)方法引用:ClassName::methodName
- 實(shí)例上的實(shí)例方法引用:instanceReference::methodName
- 超類(lèi)上的實(shí)例方法引用:super::methodName
- 類(lèi)型上的實(shí)例方法引用:ClassName::methodName
- 構(gòu)造方法引用:Class::new
- 數(shù)組構(gòu)造方法引用:TypeName[]::new
生成流
- Collection的stream()方法或者parallelStream() ,例如Arrays.asList(1,2,3).stream()。
- Arrays.stream(Object[]) 例如Arrays.stream(new int[]{1,2,3})。
- 使用流的靜態(tài)方法,比如Stream.of(Object[]), IntStream.range(int, int) 或者 Stream.iterate(Object, UnaryOperator) ,如Stream.iterate(0, n -> n * 2) , 或者generate(Supplier<T> s) 如Stream.generate(Math::random)。
- BufferedReader.lines() 從文件中獲得行的流。
- Files類(lèi)的操作路徑的方法,如list、find、walk等。
- 隨機(jī)數(shù)流Random.ints()。
- 其它一些類(lèi)提供了創(chuàng)建流的方法,如BitSet.stream(), Pattern.splitAsStream(java.lang.CharSequence), 和 JarFile.stream()。
- 更底層的使用StreamSupport,它提供了將Spliterator轉(zhuǎn)換成流的方法。
// 列舉一些常用創(chuàng)建流的例子
List<Integer> list = Arrays.asList(1, 2, 3);
// Collection的stream方法
Stream<Integer> stream = list.stream();
// Stream的of方法
Stream<List<Integer>> stream2 = Stream.of(list);
// BufferedReader的lines方法
BufferedReader bufferedReader = new BufferedReader(new FileReader("filePath"));
Stream<String> lines = bufferedReader.lines();
中間操作
filter
Stream<T> filter(Predicate<? super T> predicate)
返回此流中匹配元素組成的流
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
numbers
.stream()
.filter(i -> i % 2 == 0) //過(guò)濾掉奇數(shù)
.forEach(System.out::println); // 終端操作,打印結(jié)果
// 輸出:2 4 6 8 10
map
<R> Stream<R> map(Function<? super T, ? extends R> mapper)
返回一個(gè)流,該流的元素映射成另外的值,新的值類(lèi)型可以與原來(lái)的類(lèi)型不同
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
numbers
.stream()
.map(i -> i + "str ") // 轉(zhuǎn)換成String
.forEach(System.out::print); // 終端操作,打印結(jié)果
// 輸出:1str 2str 3str 4str 5str 6str 7str 8str 9str 10str
mapToInt
IntStream mapToInt(ToIntFunction<? super T> mapper)
返回一個(gè)IntStream,該流的元素映射成int類(lèi)型的流 IntStream:原始流
List<String> strings = Arrays.asList("1","2","3");
strings
.stream()
.mapToInt(Integer::parseInt) // 轉(zhuǎn)換成int
.forEach(System.out::println); // 終端操作,打印結(jié)果
// 輸出:int類(lèi)型的 1 2 3
mapToLong,mapToDouble與mapToInt類(lèi)似只不過(guò)原始類(lèi)型不同而已,下面會(huì)單獨(dú)講解這三個(gè)原始流的作用及區(qū)別。
flatMap
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper)
返回一個(gè)Stream,和map類(lèi)似,不同的是會(huì)將每個(gè)元素的扁平化。
flatMap的理解可能稍微有點(diǎn)難,通過(guò)下面兩個(gè)例子來(lái)展示。
例子1:引用java8實(shí)戰(zhàn)中的例子 將集合中的兩個(gè)字符串根據(jù)字母去重復(fù)
List<String> strings = Arrays.asList("Hello", "World");
List<String> list = strings
.stream() // 將集合轉(zhuǎn)成流
.map(s -> s.split("")) // 轉(zhuǎn)換成['H','e','l','l','o'],['W','o','r','l','d'] 兩個(gè)數(shù)組
.flatMap(Arrays::stream) // 將兩個(gè)數(shù)組扁平化成為['H','e','l','l','o','W','o','r','l','d'],實(shí)際上還是把兩個(gè)數(shù)組再次轉(zhuǎn)成流
.distinct() // 去除重復(fù)元素
.collect(Collectors.toList()); // 終端操作,轉(zhuǎn)化成集合
System.out.println(list);
// 輸出: [H, e, l, o, W, r, d]
引入java8實(shí)戰(zhàn)的流程圖如下:
例子2:
List<Integer> numbers1 = Arrays.asList(1, 2, 3);
List<Integer> numbers2 = Arrays.asList(4, 5, 6);
Stream.of(numbers1, numbers2) // 將兩個(gè)集合轉(zhuǎn)成流
.flatMap(numbers -> numbers.stream()) // 兩個(gè)集合流扁平化為[1,2,3,4,5,6]
.forEach(System.out::println);
// 輸出: 1,2,3,4,5,6
distinct
Stream<T> distinct()
過(guò)濾流中重復(fù)的元素
Arrays.asList(1, 2, 3, 2, 3, 4)
.stream()
.distinct() // 去除重復(fù)
.forEach(System.out::println);
// 輸出: 1234
sorted
Stream<T> sorted()
對(duì)流中的元素順序排序
Arrays.asList(1, 3, 5, 2, 4)
.stream()
.sorted() // 順序排序
.forEach(System.out::println);
// 輸出: 12345
上面的例子中只支持順序排序,如果要倒序呢?Stream中sorted還提供了一個(gè)重載方法:
Stream<T> sorted(Comparator<? super T> comparator);
可以通過(guò)傳入Comparator來(lái)實(shí)現(xiàn)自己的排序規(guī)則
Arrays.asList(1, 3, 5, 2, 4)
.stream()
.sorted(Comparator.reverseOrder()) // 倒序排序
.forEach(System.out::println);
// 輸出: 54321
limit
Stream<T> limit(long maxSize)
截取流,返回一個(gè)不超過(guò)給定長(zhǎng)度的流
Arrays.asList(1, 2, 3, 4, 5)
.stream()
.limit(3) // 截取前三個(gè)元素
.forEach(System.out::println);
// 輸出: 123
skip
Stream<T> skip(long n)
跳過(guò)給定長(zhǎng)度的流
Arrays.asList(1, 2, 3, 4, 5)
.stream()
.skip(2) // 跳過(guò)前兩個(gè)元素
.forEach(System.out::println);
// 輸出: 345
parallel
S parallel()
將流轉(zhuǎn)成并行流
Arrays.asList(1, 2, 3, 4, 5)
.stream()
.parallel() // 轉(zhuǎn)成并行流
.forEach(System.out::println);
// 由于是并行的,每次輸出結(jié)果都會(huì)不一致
// 輸出: 1 5 2 4 3
parallel線(xiàn)程安全需要注意的點(diǎn)
直接上例子
// 反例1
for (int i = 0; i < 5; i++) {
List<Integer> list = new ArrayList<>();
IntStream.rangeClosed(1, 1000).parallel().forEach(element->{
list.add(element);
});
System.out.println(list.size());
}
// 輸出:981 990 962 ...... 多次運(yùn)行會(huì)發(fā)現(xiàn)每次結(jié)果都不一樣,并且有時(shí)還會(huì)報(bào)ArrayIndexOutOfBoundsException數(shù)組越界
// 這邊體現(xiàn)了在多線(xiàn)程中操作共享變量引發(fā)的問(wèn)題,例如list容器當(dāng)前容量為50,兩個(gè)線(xiàn)程同時(shí)進(jìn)入方法體,此時(shí)線(xiàn)程A持有的list里面有49個(gè)元素,線(xiàn)程B持有的list里面也是49個(gè)元素,然后線(xiàn)程A執(zhí)行l(wèi)ist.add()完成,此時(shí)容器內(nèi)的元素的數(shù)量有50,由于線(xiàn)程之間不可見(jiàn),線(xiàn)程B也進(jìn)入到了add方法并且過(guò)了list容器擴(kuò)容的檢查,然后添加元素時(shí)發(fā)生ArrayIndexOutOfBoundsException
// 如果要能安全的新增,那么可以使用線(xiàn)程安全的容器
List<Integer> list = Collections.synchronizedList(new ArrayList<>());
List<Integer> list = new CopyOnWriteArrayList<>();
// 反例2
List<Integer> list = new ArrayList<>(1000);
long count = IntStream.rangeClosed(1, 1000).parallel().map(element -> {
list.add(element);
return element;
}).count();
long count = IntStream.rangeClosed(1, 1000).parallel().peek(element -> {
list.add(element);
}).count();
// 使用并行流時(shí),不要去操作共享變量,以上例子皆為反例
parallel性能上需要注意的點(diǎn)
對(duì)并行流的效率進(jìn)行測(cè)試,每臺(tái)機(jī)器上的結(jié)果可能不一致,請(qǐng)自行注意。下面例子全部采用遍歷五次,取其中最快的一次。
// 串行與并行流效率測(cè)試 基于i7 8核cpu
// 對(duì)100_000_000求和
// for求和性能測(cè)試
static void testFor(long size) {
List<Long> timeList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
long sum = 0;
long start = System.currentTimeMillis();
for (long j = 0L; j <= size; j++) {
sum += j;
}
long end = System.currentTimeMillis();
timeList.add((end - start));
}
System.out.println("For 處理時(shí)間:" + (timeList.stream().mapToLong(Long::longValue)).min().getAsLong() + "ms");
}
// 并行流求和性能測(cè)試
static void testParallel(long size) {
List<Long> timeList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
long start = System.currentTimeMillis();
Stream.iterate(0L, (element -> element + 1L)).limit(size).parallel().reduce(0L,Long::sum);
long end = System.currentTimeMillis();
timeList.add((end - start));
}
System.out.println("ParallelStream 處理時(shí)間:" + (timeList.stream().mapToLong(Long::longValue)).min().getAsLong() + "ms");
}
public static void main(String[] args) {
// 初始值
long size = 10_000_000L;
testFor(size);
testParallelStream(size);
}
// 輸出為:
// For 處理時(shí)間:5ms
// ParallelStream 處理時(shí)間:254ms
為什么并行的會(huì)比傳統(tǒng)For要慢,是因?yàn)镾tream.iterate生成的是裝箱對(duì)象,在求和過(guò)程中,裝箱對(duì)象需要拆箱,計(jì)算完還會(huì)在裝箱,數(shù)據(jù)量越大,那么采用裝箱對(duì)象計(jì)算則會(huì)越慢??梢陨晕⒏囊恍写a:
Stream.iterate(0L, (element -> element + 1)).limit(size).parallel().reduce(0L,Long::sum);更改為
Stream.iterate(0L, (element -> element + 1)).mapToLong(Long::longValue).limit(size).parallel().reduce(0L,Long::sum);,這邊生成流的時(shí)候先轉(zhuǎn)成原始流,然后在去做計(jì)算
// 并行流求和性能測(cè)試
static void testParallelStream(long size) {
List<Long> timeList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
long start = System.currentTimeMillis();
Stream.iterate(0L, (element -> element + 1L)).mapToLong(Long::longValue).limit(size).parallel().reduce(0L,Long::sum);
long end = System.currentTimeMillis();
timeList.add((end - start));
}
System.out.println("ParallelStream 處理時(shí)間:" + (timeList.stream().mapToLong(Long::longValue)).min().getAsLong() + "ms");
}
public static void main(String[] args) {
// 初始值
long size = 10_000_000L;
testFor(size);
testParallelStream(size);
}
// 輸出為:
// For 處理時(shí)間:5ms
// ParallelStream 處理時(shí)間:143ms
// 可以看出提升了接近一倍的性能,在數(shù)據(jù)量更大的情況下,會(huì)更高。
// 在java8里,還提供了3個(gè)生成原始流的對(duì)象:LongStream,DoubleStream,IntStream,下面直接測(cè)試采用原始流來(lái)做測(cè)試
static void testParallelLongStream(long size) {
List<Long> timeList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
long start = System.currentTimeMillis();
LongStream.rangeClosed(0, size).parallel().sum();
long end = System.currentTimeMillis();
timeList.add((end - start));
}
System.out.println("ParallelLongStream 處理時(shí)間:" + (timeList.stream().mapToLong(Long::longValue)).min().getAsLong() + "ms");
}
public static void main(String[] args) {
// 初始值
long size = 10_000_000L;
testFor(size);
testParallelStream(size);
testParallelLongStream(size)
}
// 輸出為:
// For 處理時(shí)間:5ms
// ParallelStream 處理時(shí)間:148ms
// ParallelLongStream 處理時(shí)間:1ms
雖然將序列流轉(zhuǎn)成并行流很容易,但是不恰當(dāng)?shù)氖褂梅吹箷?huì)成為負(fù)優(yōu)化。在數(shù)據(jù)量不大的情況下,并行不一定比順序的要快,反倒要慢上很多,因?yàn)閿?shù)據(jù)量小的情況下,在線(xiàn)程的上下文切換之間的開(kāi)銷(xiāo)已經(jīng)大于數(shù)據(jù)處理的開(kāi)銷(xiāo)了。以及在做數(shù)值計(jì)算的情況下,要留意是否是裝箱對(duì)象,自動(dòng)裝箱拆箱在數(shù)據(jù)量大起來(lái)會(huì)成為性能上的累贅。
下面再看一個(gè)例子
static void testStructure(Collection<Long> c) {
List<Long> timeList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
long start = System.currentTimeMillis();
c.parallelStream().reduce(0L, Long::sum);
long end = System.currentTimeMillis();
timeList.add((end - start));
}
// 取五次中最快的一次
System.out.println("處理時(shí)間:" + (timeList.stream().mapToLong(Long::longValue)).min().getAsLong() + "ms");
}
public static void main(String[] args) {
// 使用ArrayList容器
ArrayList<Long> arrayList = Stream.iterate(1L, a -> a + 1L).limit(10_000_000L).collect(toCollection(ArrayList::new));
// 使用LinkedList容器
LinkedList<Long> linkedList = Stream.iterate(1L, a -> a + 1L).limit(10_000_000L).collect(toCollection(LinkedList::new));
testStructure(linkedList);
testStructure(arrayList);
}
// 輸出
// 處理時(shí)間:420ms
// 處理時(shí)間:36ms
在選用數(shù)據(jù)結(jié)構(gòu)上,可以看出ArrayList在并行中效率要高于LinkedList,這是因?yàn)锳rrayList的拆分效率比LinkedList高得多,前者用不著遍歷就可以平均拆分,而后者則必須遍歷。
按照可分解性總結(jié)了一些流數(shù)據(jù)源適不適于并行
| 數(shù)據(jù)源 | 可分解性 |
|---|---|
| ArrayList | 極佳 |
| IntStream.range | 極佳 |
| HashSet | 好 |
| TreeSet | 好 |
| LinkedList | 差 |
| Stream.iterate | 差 |
parallel操作上需要注意的點(diǎn)
并行流底層使用的是java7引入的Fork/Join(并發(fā)框架),它可以以并行的方式將任務(wù)拆分成更小的任務(wù),然后將每個(gè)子任務(wù)的結(jié)果合并起來(lái)生成整體的結(jié)果。此文不多描述,有興趣者自行查閱。需要注意的是,使用并行流時(shí),內(nèi)部使用了默認(rèn)的 ForkJoinPool,池的大小為默認(rèn)的cup核數(shù)-1(java8實(shí)戰(zhàn)說(shuō)的是默認(rèn)核數(shù),如果看過(guò)此書(shū)的請(qǐng)自行測(cè)試),
Runtime.getRuntime().availableProcessors()來(lái)查看cpu的核心數(shù)量。
parallel運(yùn)行時(shí)監(jiān)控的線(xiàn)程數(shù)
在使用并行流時(shí)請(qǐng)注意,如果為IO密集型的并行,如果在多處使用,極有可能會(huì)影響所有的并行流,因?yàn)槭褂玫氖窍到y(tǒng)全局的ForkJoinPool,當(dāng)池子里的線(xiàn)程被占用了,那么別處要使用線(xiàn)程只能等待它被釋放。
// 模擬8個(gè)任務(wù),獨(dú)占線(xiàn)程并且不釋放
Runnable runnable = () -> IntStream.rangeClosed(1, 8).parallel().forEach(c -> {
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 啟用任務(wù)
new Thread(runnable).start();
System.out.println("任務(wù)開(kāi)始");
// 等待一會(huì),讓池子里的線(xiàn)程充分被占用
Thread.sleep(1000);
IntStream.rangeClosed(0, 1000).parallel().forEach(c -> {
try {
// 打印當(dāng)前前程,查看是否使用了ForkJoinPool中的線(xiàn)程
System.out.println(Thread.currentThread().getName());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
可以看出,當(dāng)池子里的線(xiàn)程被占用完,別的地方使用了并行流,完全變成了單線(xiàn)程執(zhí)行。如果要避免這種情況,可以設(shè)置JVM啟動(dòng)參數(shù)
-Djava.util.concurrent.ForkJoinPool.common.parallelism=16來(lái)設(shè)置ForkJoinPool的大小,也可以使用代碼System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "16")來(lái)設(shè)置全局的參數(shù),以上兩種方法及其不推薦,因?yàn)樗鼘⒂绊懰械牟⑿辛?,推薦使用自定義ForkJoinPool的方式,如下所示
Runnable runnable = () -> IntStream.rangeClosed(1, 8).parallel().forEach(c -> {
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 啟用任務(wù)
new Thread(runnable).start();
System.out.println("任務(wù)開(kāi)始");
// 設(shè)置一個(gè)容量為10的ForkJoinPool
ForkJoinPool forkJoinPool = new ForkJoinPool(10);
// 執(zhí)行任務(wù)
ForkJoinTask<?> submit = forkJoinPool.submit(() -> {
IntStream.rangeClosed(0, 20).parallel().forEach(c -> {
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
});
while (!submit.isDone()) {
Thread.sleep(500);
}
sequential
S sequential()
將流轉(zhuǎn)成序列流
Arrays.asList(1, 2, 3, 4, 5)
.parallelStream()
.sequential() // 轉(zhuǎn)成序列流
.forEach(System.out::println);
// 輸出: 1 2 3 4 5
終端操作
allMatch,anyMatch,noneMatch
boolean anyMatch(Predicate<? super T> predicate)
anyMatch:此流的任意元素有一個(gè)匹配返回ture,都不匹配返回false
boolean allMatch(Predicate<? super T> predicate)
allMatch:此流的所有元素是都匹配返回ture,否者為false
boolean noneMatch(Predicate<? super T> predicate)
noneMatch:此流中沒(méi)有一個(gè)元素匹配返回ture,否者返回false
// 全部匹配
System.out.println(Stream.of(5, 6, 7, 8, 9).allMatch(i -> i >= 5)); // true
System.out.println(Stream.of(5, 6, 7, 8, 9).allMatch(i -> i > 5)); // false
// 任意一個(gè)匹配
System.out.println(Stream.of(5, 6, 7, 8, 9).anyMatch(i -> i > 5)); // true
System.out.println(Stream.of(5, 6, 7, 8, 9).anyMatch(i -> i > 9)); // false
// 都不匹配
System.out.println(Stream.of(5, 6, 7, 8, 9).noneMatch(i -> i > 5)); // false
System.out.println(Stream.of(5, 6, 7, 8, 9).noneMatch(i -> i > 9)); // true
reduce
聚合操作 sum()、max()、min()、count()調(diào)用的都是reduce
Optional<T> reduce(BinaryOperator<T> accumulator)
無(wú)初始值,按傳入的lambda的累加規(guī)則來(lái)聚合數(shù)據(jù)
// 無(wú)默認(rèn)值,求和
Optional<Integer> sum1 = Arrays.asList(1, 2, 3, 4, 5)
.stream()
.reduce((a, b) -> a + b);
System.out.println(sum1.get()); // 輸出:15
T reduce(T identity, BinaryOperator<T> accumulator)
第一個(gè)參數(shù)為初始值,第二個(gè)參數(shù)為累加器(歸并數(shù)據(jù)的lambda)
// 有默認(rèn)值,求和
Integer sum2 = Arrays.asList(1, 2, 3, 4, 5)
.stream()
.reduce(5, (a, b) -> a + b);
System.out.println(sum2); // 輸出:20
// 求最大值
Integer max = Arrays.asList(1, 2, 3, 4, 5)
.stream()
.reduce(0, Integer::max); // 也可以寫(xiě)成 reduce(0, (a, b) -> a > b ? a : b);
System.out.println(max); // 輸出:20
<U> U reduce(U identity,BiFunction<U, ? super T, U> accumulator,BinaryOperator<U> combiner)
combiner:合并器,用于合并累加器的值,這個(gè)參數(shù)只有在并行流下才會(huì)生效
reduce操作可以并行進(jìn)行,為了避免競(jìng)爭(zhēng),每個(gè)reduce線(xiàn)程都會(huì)有獨(dú)立的result,combiner的作用在于合并每個(gè)線(xiàn)程的result得到最終結(jié)果。
Integer reduce = Arrays.asList(1, 2, 3, 4, 5)
.parallelStream()
.reduce(0, (a, b) -> a + b, (c, d) -> c + d);
System.out.println(reduce); // 輸出:20
reduce在并行流中的注意事項(xiàng)
System.out.println(
Arrays.asList(1, 2, 3)
.parallelStream()
.reduce(0,(a, b) -> (a - b),(c, d) -> c + d)
);
// 如果無(wú)意料,那么輸出將會(huì)是 -6,當(dāng)運(yùn)行程序的時(shí)候結(jié)果卻是 -2,這與我們的預(yù)期結(jié)果大大不符
// 為什么會(huì)是-3呢,那么在序列流和并行流結(jié)果不一致,將以上代碼修改一下,把參數(shù)和線(xiàn)程打印出來(lái)
System.out.println(
Arrays.asList(1, 2, 3)
.parallelStream()
.reduce(0,
(a, b) -> {
System.out.format("a:%s b:%s Thread:%s \n", a, b, Thread.currentThread().getName());
return a - b;
},
(c, d) -> {
System.out.format("c:%s d:%s Thread:%s \n", c, d, Thread.currentThread().getName());
return c - d;
}
));
在并行流中,reduce計(jì)算的方式與序列流不同,這歸根于fork/join的特殊性,所有任務(wù)不斷拆分,如果有初始值,那么會(huì)在累加階段會(huì)以每個(gè)初始值與流中的數(shù)據(jù)累加,例如初始值為1,執(zhí)行一個(gè)求和的累加,那么如果有N個(gè)元素,那么最終結(jié)果值為SUM + (N * 1),在相乘,相加,相減等等計(jì)算在使用并行流時(shí)需要好好考慮由并行帶來(lái)的影響,當(dāng)然如果只是聚合計(jì)算(sum,avg,max,min)可以放心的使用,如果采用自定義計(jì)算規(guī)則,那么一定需要謹(jǐn)慎使用,并測(cè)試。
findFirst,findAny
Optional<T> findFirst()
返回此流的第一個(gè)元素的Optional,如果流為空,則返回空Optional。
Optional<T> findAny()
返回此流的任意一個(gè)元素的Optional,如果流為空,則返回空Optional。
findFirst在并行流中的執(zhí)行代價(jià)非常大,需要注意
Optional<Integer> first = Arrays.asList(1, 2, 3, 4, 5)
.stream().findFirst();
System.out.println(first.get()); // 輸出 1
Optional<Integer> any = Arrays.asList(1, 2, 3, 4, 5)
.stream().findAny();
System.out.println(any.get()); // 因?yàn)槭琼樞蛄?,所以輸?
collect
<R, A> R collect(Collector<? super T, A, R> collector)
收集,對(duì)數(shù)據(jù)做聚合,將流轉(zhuǎn)換為其他形式,比如List,Map,Integer,Long...
// 準(zhǔn)備一些初始數(shù)據(jù)
@Data
@AllArgsConstructorclass
Student {
private String name;
private Integer age;
}
// 初始化數(shù)據(jù)
Student student1 = new Student("zhangsan", 20);
Student student2 = new Student("lisi", 15);
Student student3 = new Student("wangwu", 10);
Student student4 = new Student("zhaoliu", 20);
List<Student> students = Arrays.asList(student1, student2, student3, student4);
// 如果要取出所有學(xué)生的姓名并轉(zhuǎn)成集合可以寫(xiě)成
List<String> names = students.stream()
.map(Student::getName) // 獲取name
.collect(Collectors.toList()); // 轉(zhuǎn)成List
System.out.println(names); // 輸出:[zhangsan, lisi, wangwu, zhaoliu]
// 以年齡為key,姓名為value轉(zhuǎn)成Map可以寫(xiě)成
Map<Integer, String> map = students.stream()
.collect(Collectors.toMap(Student::getAge, Student::getName)); // 此寫(xiě)法會(huì)有問(wèn)題,如果Map的key重復(fù)了,會(huì)報(bào)java.lang.IllegalStateException: Duplicate key 如果可以確保key不會(huì)重復(fù)就可以省略第三個(gè)參數(shù)
Map<Integer, String> map = students.stream()
.collect(Collectors.toMap(Student::getAge, Student::getName, (first, second) -> second)); // 前面兩個(gè)參數(shù)是映射key和value,第三個(gè)參數(shù)為如果key重復(fù)了要如何處理,是保留舊的還是選擇新的
System.out.println(map); // 輸出:{20=zhaoliu, 10=wangwu, 15=lisi} 因?yàn)閦hangsan和zhaoliu的年齡都是20,按照我們的策略,始終選擇新的,所以key為20的value是zhaoliu
Map<Integer, List<Student>> groupByAge = students.stream()
.collect(Collectors.groupingBy(Student::getAge)); // 根據(jù)age分組
System.out.println(groupByAge);
// 輸出:{20=[Student(name=zhangsan, age=20), Student(name=zhaoliu, age=20)], 10=[Student(name=wangwu, age=10)], 15=[Student(name=lisi, age=15)]}
<R> R collect(Supplier<R> supplier,BiConsumer<R, ? super T> accumulator,BiConsumer<R, R> combiner)
supplier:定義一個(gè)容器
accumulator:該容器怎么添加流中的數(shù)據(jù)
combiner:容器如何去聚合
// 仿Collectors.toList(),簡(jiǎn)單實(shí)現(xiàn)一個(gè)toList()
// 1.定義一個(gè)List容器
// 2.調(diào)用List的add方法將元素添加到容器中
// 3.采用List的addAll方法聚合容器
List<Integer> toList = Arrays.asList(1, 2, 3, 4).stream().collect(ArrayList::new, List::add, List::addAll);
System.out.println(toList);
// 輸出:[1, 2, 3, 4]
// 仿Collectors.toMap(),簡(jiǎn)單實(shí)現(xiàn)toMap()
// 1.定義一個(gè)Map容器
// 2.調(diào)用Map的merge方法將元素添加到容器中
// 3.采用Map的putAll方法聚合容器
Map<Object, Object> map = students.stream()
.collect(HashMap::new,
(holder, element) -> {
holder.merge(element.getAge(), element.getName(), (u, v) -> {
return u;
// throw new IllegalStateException(String.format("Duplicate key %s", u));
});
}, Map::putAll);
System.out.println(map);
// 輸出:{20=zhangsan, 10=wangwu, 15=lisi}
Optional
java8添加的容器對(duì)象,在一些場(chǎng)景下避免使用null檢查而設(shè)定的類(lèi),盡可能避免的NullPointerException。
創(chuàng)建Optional實(shí)例的靜態(tài)方法
Optional私有了構(gòu)造函數(shù),只能通過(guò)Optional對(duì)外提供了三個(gè)靜態(tài)方法構(gòu)造實(shí)例
empty
public static<T> Optional<T> empty()
返回一個(gè)空Optional實(shí)例
Optional<Object> empty = Optional.empty();
System.out.println(empty); // 輸出:Optional.empty
of
public static <T> Optional<T> of(T value)
創(chuàng)建一個(gè)包含非null值的Optional容器,如果值為null會(huì)直接拋出NullPointerException
Optional.of("");
Optional.of(null); // 會(huì)拋空指針異常
ofNullable
public static <T> Optional<T> ofNullable(T value)
創(chuàng)建一個(gè)可以包含null值的Optional容器
Optional.ofNullable(null);
Optional實(shí)例方法
get
public T get()
返回Optional容器保存的非null值,如果值為null會(huì)拋出NoSuchElementException
Optional<Object> empty = Optional.empty();
System.out.println(empty.get()); // 拋NoSuchElementException
Optional<Object> nullable = Optional.ofNullable(null);
System.out.println(nullable.get()); // 拋NoSuchElementException
Optional<Object> non = Optional.ofNullable("test");
System.out.println(non.get()); // 輸出:test
orElse
public T orElse(T other)
如果存在則返回值,否則返回other,是否存在的判斷為null或者是一個(gè)empty
Object data1 = Optional.ofNullable(null).orElse("data");
System.out.println(data1); // 輸出:data
Object data2 = Optional.ofNullable("data1").orElse("data2");
System.out.println(data2); // 輸出:data1
orElseGet
public T orElseGet(Supplier<? extends T> other)
如果存在則返回值,否則調(diào)用other并返回該調(diào)用的結(jié)果。
Object data = Optional.ofNullable(null).orElseGet(() -> "data");
System.out.println(data); // 輸出:data
orElse與orElseGet的區(qū)別
public static <T> T getValue(T oldValue, T newValue) {
System.out.println(newValue);
return newValue;
}
Optional.ofNullable("a").orElse(getValue("test1"));
Optional.ofNullable("b").orElseGet(() -> "test2");
// 此時(shí)會(huì)看到控制臺(tái)輸出了test1
// orElse即使值不為空的情況下,也會(huì)調(diào)用orElse()中的函數(shù),而orElseGet是在值為空的情況下才會(huì)調(diào)用other中的函數(shù)
// 如果有如下代碼那么將是致命的
User u1 = Optional.ofNullable(user).orElse(userMapper.findUserById("1")); // 1
User u2 = Optional.ofNullable(user).orElse(new User()); // 2
// 哪怕user不為空也會(huì)去查詢(xún)一次數(shù)據(jù)庫(kù),或者user不為空也會(huì)創(chuàng)建一個(gè)對(duì)象,加大了性能消耗,所以在使用時(shí)需要注意
orElseThrow
public <X extends Throwable> T orElseThrow(Supplier<? extends X> exceptionSupplier) throws X
如果存在返回包含的值,否則拋出定義的異常。
// 當(dāng)user為空的時(shí)候,拋出定義的異常
User user = Optional.ofNullable(user).orElseThrow(() -> new RuntimeException("User為空"));
isPresent
public boolean isPresent()
如果存在值則返回true,否則返回false。
boolean b1 = Optional.empty().isPresent(); // false
boolean b2 = Optional.ofNullable(null).isPresent(); // false
boolean b3 = Optional.ofNullable("").isPresent(); // ture
ifPresent
public void ifPresent(Consumer<? super T> consumer)
如果存在值,則執(zhí)行指定的操作,否則不執(zhí)行任何操作。
Optional.ofNullable(null).ifPresent(System.out::println); // 不會(huì)執(zhí)行
Optional.ofNullable("data").ifPresent(System.out::println); // 輸出data
filter
public Optional<T> filter(Predicate<? super T> predicate)
根據(jù)給定的條件過(guò)濾值,過(guò)濾后如果存在值則返回Optional包裝的值,否則返回空的Optional。
Optional<String> str1 = Optional.ofNullable("123456").filter(element -> element.contains("123"));
System.out.println(str1); // 輸出:Optional[123456]
Optional<String> str2 = Optional.ofNullable("123456").filter(element -> element.contains("789"));
System.out.println(str2); // 輸出:Optional.empty
map
public<U> Optional<U> map(Function<? super T, ? extends U> mapper)
如果存在值,就對(duì)該值執(zhí)行mapping調(diào)用,返回mapping后Optional包裝的值,否則返回空的Optional。
Student student = new Student("zhangsan",10);
Optional<String> name = Optional.of(student).map(Student::getName); // 返回一個(gè)Optional包裝的值
flatMap
public<U> Optional<U> flatMap(Function<? super T, Optional<U>> mapper)
如果值存在,就對(duì)該值執(zhí)行mapping調(diào)用,返回一個(gè) Optional 類(lèi)型的值,否則就返回一個(gè)空的 Optional 對(duì)象
Student student = new Student("zhangsan",10);
Optional<Integer> age = Optional.of(student).flatMap(s -> Optional.ofNullable(s.getAge())); // 返回一個(gè)Optional包裝的值
map與flatMap的區(qū)別
map接受的入?yún)槿我忸?lèi)型,flatMap接受的入?yún)镺ptional<U>類(lèi)型,返回的都是Optional<U>
總結(jié)
- lambda由參數(shù)列表,箭頭,主體組成。
- 函數(shù)式接口只能擁有一個(gè)抽象方法,可以擁有多個(gè)默認(rèn)方法,多個(gè)靜態(tài)方法。
- 方法引用實(shí)際就是Lambda的快捷寫(xiě)法。
- 流只能遍歷一次。遍歷完之后,我們就說(shuō)這個(gè)流已經(jīng)被消費(fèi)掉了。你可以從原始數(shù)據(jù)源那里再獲得一個(gè)新的流來(lái)重新遍歷一遍。
- 并行流是采用ForkJoin實(shí)現(xiàn)的。
- 在并行流中,不要在peek,map中不要去修改外部數(shù)據(jù)。
- 并行流使用需要注意,不要靠猜測(cè),請(qǐng)多測(cè)試。
- 接口默認(rèn)方法,優(yōu)先級(jí)最低,子類(lèi)會(huì)繼承默認(rèn)方法并且可以覆蓋默認(rèn)方法。如果因?yàn)槎嗬^承問(wèn)題引起沖突(子類(lèi)實(shí)現(xiàn)了兩個(gè)接口,兩個(gè)接口都擁有相同的方法名,相同函數(shù)描述符),那么必須覆蓋該方法,如果期望調(diào)用某接口中的默認(rèn)方法,可以使用X.super.m(…)來(lái)顯示調(diào)用哪個(gè)接口的默認(rèn)方法。
- 接口靜態(tài)方法,子類(lèi)不會(huì)繼承,也不能覆蓋,但是可以定義一個(gè)名稱(chēng)相同返回值相同的普通或靜態(tài)方法。









