序言
上一次提到了Java 1.5中提供新的多線程模型,在大多數(shù)情況下,這已經(jīng)能夠滿足日常開發(fā)的需要。但是偶爾也許覺得那一套模型還是覺得欠缺點(diǎn)什么,于是乎,Java 7/8中又提供了新的多線程模型。
Java 8中提供了并行流以及**ForkJoinPool **(FJP)和lambda(據(jù)說Java 8的lambda只是語法糖,沒有深究過)
ForkJoinPool / ForkJoinTask
這一套工具是由Java 7提供的。要使用這種方法之前,應(yīng)該有所了解函數(shù)式編程,如果有過JavaScript或者其他一個(gè)腳本語言的開發(fā),應(yīng)該對(duì)此不會(huì)陌生。另外,通過這種方法,比較難確定實(shí)際上是否使用了超過一個(gè)線程,因?yàn)檫@是由流的具體實(shí)現(xiàn)決定的。最后,在默認(rèn)情況下是通過ForkJoinPool.commonPool()實(shí)現(xiàn)并行的。這個(gè)通用池由JVM來管理,并且被JVM進(jìn)程內(nèi)的所有線程共享。(以下示例代碼若非特別說明均要求Java 7及以上,部分代碼出于簡(jiǎn)潔,使用了lambda表達(dá)式,因此需要Java 8及以上才可以運(yùn)行??梢詫ambda表達(dá)式用匿名內(nèi)部類替代,即可在Java 7下編譯通過)
多線程經(jīng)常會(huì)伴隨著并行計(jì)算(并行不等于并發(fā)),雖然并不絕對(duì),但是通常與并行或多或少存在著聯(lián)系。而并行計(jì)算的特點(diǎn)在于將較為復(fù)雜、龐大的任務(wù),拆解成互不相干、較為簡(jiǎn)單、小型的任務(wù),最后將各個(gè)小任務(wù)的結(jié)果匯總、分析、處理,得到原本大任務(wù)的結(jié)果。這樣做的目的無非是提高效率,充分利用硬件計(jì)算資源,有效規(guī)避瓶頸效應(yīng)。
接下來以計(jì)算y! - x!為例,展示代碼:
// 計(jì)算y! - x!的值
class MyJob extends RecursiveTask<Integer> {
private int y;
private int x;
public MyJob(int x, int y) {
this.x = x;
this.y = y;
}
@Override
protected Integer compute() {
if (x > 2) { // 先計(jì)算x的階乘
MyJob subJobX = new MyJob(x - 1, -1);
subJobX.fork();
x *= subJobX.join();
if (y == -1) { // 判斷是否為遞歸計(jì)算
return x; // 遞歸計(jì)算則返回階乘結(jié)果
} else if (y > 2) { // 計(jì)算y的階乘
MyJob subJobY = new MyJob(y - 1, -1);
subJobY.fork();
y *= subJobY.join();
return y - x; // 輸出最后任務(wù)的結(jié)果
} else { // 正常輸入,不會(huì)進(jìn)入這個(gè)分支
System.err.println("Error.");
return 0;
}
} else {
return 2;
}
}
}
public class ForkJoinExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
MyJob job = new MyJob(4, 10);
Future<Integer> result = forkJoinPool.submit(job);
while (!result.isDone()) {
System.err.println("Waiting for result");
Thread.sleep(0, 1);
}
forkJoinPool.shutdown();
System.err.println("The results is " + result.get());
}
}
輸出結(jié)果:
Waiting for result
Waiting for result
The results is 3628776
在這個(gè)例子中,我把計(jì)算n * (n - 1)作為最小的任務(wù)。因此先判斷x是否大于2,如果不大于2,則fork出一個(gè)分支,計(jì)算(x - 1)!的值,y同理。最后得到x!與y!的值,相減,得出最后的結(jié)果。代碼中,fork出來的分支中,參數(shù)y我作為一個(gè)標(biāo)志,如果為-1則不是原始調(diào)用,而是fork的子任務(wù),只需要負(fù)責(zé)計(jì)算傳遞進(jìn)來x的階乘即可。
這段代碼中,你看不出任務(wù)在哪里完成(哪個(gè)線程)、由誰完成、什么時(shí)候完成。正如前面所述,甚至你很難看出來是否是大于一個(gè)線程在執(zhí)行。
當(dāng)然你可以嘗試在compte方法中增加System.err.printf("%s is running.%n",Thread.currentThread().getName());的語句來查看輸出,到底有幾個(gè)線程在運(yùn)行。不過根據(jù)我的實(shí)踐來說,一般情況下,應(yīng)該只有一個(gè)線程在運(yùn)行。這不是說代碼有問題,主要有兩個(gè)因素:
- 代碼設(shè)計(jì)不合理,我的代碼中,計(jì)算x的階乘與y的階乘是分開的,并沒有一起fork,因此本質(zhì)上并發(fā)性其實(shí)沒有顯示出來。我的代碼其實(shí)相當(dāng)于先計(jì)算x!,然后計(jì)算y!,最后計(jì)算y! - x!
- 由于例子中代碼的計(jì)算量很小,以當(dāng)前CPU的計(jì)算能力有盈余。針對(duì)這種情況,可以嘗試多開幾個(gè)任務(wù)同時(shí)并行查看輸出結(jié)果。
當(dāng)然我覺得上面的例子不好,因此想了另一張場(chǎng)景,并用代碼演示一下。
比如現(xiàn)在需要制作一個(gè)網(wǎng)絡(luò)爬蟲,爬什么呢,就爬簡(jiǎn)述首頁推薦文章每篇文章的字?jǐn)?shù)。代碼中涉及網(wǎng)絡(luò)請(qǐng)求和正則表達(dá)式的部分就不說明了,其中用了我自己寫的一個(gè)小工具類Spider.java和HttpRequester.java。
class JianshuSpiderJob extends RecursiveTask<List<String>> {
private static final String HOST = "http://www.itdecent.cn";
private String url;
public JianshuSpiderJob() {
this(HOST);
}
protected JianshuSpiderJob(String url) {
this.url = url;
}
protected List<String> requestHomepage() throws IOException {
List<String> result = new ArrayList<>();
Spider.newHost(new URL(this.url)).get((responseCode, responseHeaders, responseStream) -> { // 請(qǐng)求簡(jiǎn)書主頁
if (responseCode == 200) {
Pattern indexPattern = Pattern.compile("(/p/[a-z0-9]+)\">([^<>]+)</a></h4>");
Matcher indexMatcher = indexPattern.matcher(responseStream.toString());
// 從這里開始派分子任務(wù)
List<JianshuSpiderJob> subJobs = new ArrayList<>();
while (indexMatcher.find()) {
String subUrl = indexMatcher.group(1);
String subTitle = indexMatcher.group(2);
JianshuSpiderJob subJob = new JianshuSpiderJob(subUrl);
subJob.fork();
subJobs.add(subJob);
result.add(String.format("%s=%s", subTitle, subUrl));
}
// 銜接子任務(wù)的結(jié)果
for (JianshuSpiderJob job : subJobs) {
List<String> list = job.join();
if (list.size() > 0) {
String[] subResult = list.get(0).split("=");
for (int i = 0; i < result.size(); i++) {
if (result.get(i).indexOf(subResult[0]) > 0) {
String[] localResult = result.get(i).split("=");
result.remove(i);
result.add(String.format("%s=%s", localResult[0], subResult[1]));
break;
}
}
}
}
} else { // 網(wǎng)絡(luò)錯(cuò)誤
System.err.println("There is an error when trying to get homepage.");
}
return responseCode;
});
return result; // 返回最終結(jié)果
}
protected List<String> requestSubPage() throws IOException {
List<String> result = new ArrayList<>();
Map<String, String> requestHeader = new HashMap<>();
requestHeader.put("User-Agent", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.71 Safari/537.36");
// 獲取文章頁面的詳細(xì)信息
Spider.newHost(new URL(HOST + this.url))
.setRequestHeaders(requestHeader)
.get((responseCode, responseHeaders, responseStream) -> { // 請(qǐng)求具體的文章頁
String html = responseStream.toString();
Pattern contextPattern = Pattern.compile("\"slug\":\"([a-z0-9]+)\".*?\"wordage\":(\\d+)");
Matcher contextMatcher = contextPattern.matcher(html);
if (contextMatcher.find())
result.add(String.format("%s=%s", contextMatcher.group(1), contextMatcher.group(2)));
return responseCode;
});
return result;
}
@Override
protected List<String> compute() {
try {
System.err.printf("%s is running.%n", Thread.currentThread().getName()); // 顯示當(dāng)前工作線程
if (HOST.equals(this.url)) {
return this.requestHomepage();
} else {
return this.requestSubPage();
}
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
}
public class ForkJoinExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
JianshuSpiderJob job = new JianshuSpiderJob();
forkJoinPool.submit(job);
List<String> result = job.get();
// 輸出結(jié)果
System.err.println("Article\tWords");
for (String str : result) {
String[] s = str.split("=");
if (s.length > 1)
System.err.printf("%s\t%s%n", s[0], s[1]);
else
System.err.println("Err -> " + str);
}
forkJoinPool.shutdown();
}
}
這里例子相比較之前那個(gè)計(jì)算階乘比較有典型,因?yàn)榫W(wǎng)絡(luò)請(qǐng)求本身是阻塞。一個(gè)請(qǐng)求可能幾毫秒就可以返回,也可以幾秒鐘才返回,甚至等了幾秒鐘以后,連接被中斷,請(qǐng)求失敗。例子中,并沒有考慮網(wǎng)絡(luò)異常的情況下。
運(yùn)行結(jié)果:
ForkJoinPool-1-worker-1 is running.
ForkJoinPool-1-worker-1 is running.
ForkJoinPool-1-worker-0 is running.
ForkJoinPool-1-worker-2 is running.
ForkJoinPool-1-worker-0 is running.
ForkJoinPool-1-worker-3 is running.
ForkJoinPool-1-worker-0 is running.
ForkJoinPool-1-worker-2 is running.
ForkJoinPool-1-worker-4 is running.
ForkJoinPool-1-worker-3 is running.
ForkJoinPool-1-worker-6 is running.
ForkJoinPool-1-worker-4 is running.
ForkJoinPool-1-worker-3 is running.
ForkJoinPool-1-worker-4 is running.
ForkJoinPool-1-worker-2 is running.
ForkJoinPool-1-worker-0 is running.
ForkJoinPool-1-worker-7 is running.
ForkJoinPool-1-worker-4 is running.
ForkJoinPool-1-worker-3 is running.
ForkJoinPool-1-worker-0 is running.
ForkJoinPool-1-worker-4 is running.
Article Words
又一年輕姑娘離世:請(qǐng)記住這些“1+1=死神”的藥物! 2862
報(bào)告大王,新概念英語一至四冊(cè)全套資源(視頻、音頻、電子書)已被我活捉! 861
模式學(xué)習(xí)|找到最適合你的學(xué)習(xí)“套路” 4009
我們女孩子真不容易,既要貌美如花,又要賺錢養(yǎng)家。 1603
二十歲出頭的你,別急著想要出人頭地 2787
別怕,誰的大學(xué)不迷茫 1649
這10年,多少人從郭敬明到咪蒙 3143
老實(shí)人浪起來,你我都招架不住 2165
如果沒有回報(bào),我會(huì)堅(jiān)持寫作多久 2595
簡(jiǎn)書早報(bào)161030——《不負(fù)責(zé)任吐槽,四本買完看完就后悔的暢銷書》 2029
時(shí)而自信,時(shí)而自卑,如何改變這種雙重人生? 2855
有哪些小樂器,是學(xué)習(xí)起來非常方便的? 4259
窮人,最可怕的是總說自己窮 1838
時(shí)光回去,只愿未曾遇到你(五十七) 3109
親愛的,千萬別把孩子養(yǎng)得“輸不起”! 2580
你憑什么詆毀我的愛豆!向語言暴力Say No! 2177
史上最全36個(gè)虐腹動(dòng)作:想要馬甲線,人魚線的朋友練起來 776
兩只蝸牛的愛情 3382
比文招親【星言夙駕,說于桑田】十二 2493
《簡(jiǎn)書歷史月刊003·三千年來誰著史》上線 1118
從結(jié)果來看,整個(gè)過程最多的時(shí)候使用了8個(gè)線程來完成這個(gè)任務(wù)。每次運(yùn)行具體使用的線程數(shù)都不一樣,讀者也可以將這段代碼復(fù)制過去,并引用連接中的兩個(gè)類,看看結(jié)果如何。(由于運(yùn)行結(jié)果依賴于簡(jiǎn)書服務(wù)器返回的結(jié)果,隨著時(shí)間推移,程序結(jié)果很可能不正確,望知悉)
在整個(gè)代碼中,程序員并不知道具體任務(wù)是如何分配,程序員的關(guān)注點(diǎn)只在業(yè)務(wù)邏輯本身上,而不用關(guān)心有關(guān)于線程調(diào)度的問題。具體的調(diào)度交給里FJP。
ForkJoinTask 中拋出異常
而在ForkJoinTask,可能會(huì)引發(fā)Unchecked Exception,因此可以調(diào)用ForkJoinTask.isCompletedAbnormally()來判斷是否任務(wù)在執(zhí)行中出現(xiàn)異常。如果返回值為Throwable類型則表明在執(zhí)行過程中出現(xiàn)Unchecked Exception;若返回值為CancellationException則表明任務(wù)在執(zhí)行過程中被取消;如果任務(wù)還沒有結(jié)束或者正常完成,沒有異常,則返回null。
ForkJoinPool / ForkJoinTask 與 Executor 關(guān)聯(lián)
從類繼承圖上可以看到,F(xiàn)orkJoinPool 間接繼承了Executor,因此可以認(rèn)為兩者師出同門,只不過后者提供更加便捷API,使程序員將關(guān)注點(diǎn)更加集中在業(yè)務(wù)上。既然兩者師承一派,那么很多地方是一樣或類似的,這里著重說一下不同的地方。
| 區(qū)別 | Executor | ForkJoinPool |
|---|---|---|
| 接受的對(duì)象 | Runnable和Callable的實(shí)例 | Runnable、Callable和ForkJoinTask的實(shí)例 |
| 調(diào)度模式 | 處于后面等待中的任務(wù)需要等待前面任務(wù)執(zhí)行后才有機(jī)會(huì)被執(zhí)行,是否被執(zhí)行取決于具體的調(diào)度規(guī)則 | 采用work-stealing模式幫助其他線程執(zhí)行任務(wù),即ExcuteService解決的是并發(fā)問題,而ForkJoinPool解決的是并行問題。 |
對(duì)于了解類UNIX系統(tǒng)的人來說,對(duì)于fork這個(gè)詞應(yīng)該不會(huì)陌生。這里fork的含義基本相同,即一個(gè)大任務(wù)分支出多個(gè)小任務(wù)執(zhí)行,而小任務(wù)的執(zhí)行過程中可能還會(huì)分支出更小的任務(wù),如此往復(fù),直到分支出來的任務(wù)是原子任務(wù)。
而join是等待剛才fork出去的分支,返回結(jié)果。順序與fork正好相反,執(zhí)行結(jié)果不斷的join向上,最后那個(gè)大任務(wù)的結(jié)果就出來了。
其實(shí)FJP中還有一個(gè)Actor模型,但是我沒用過,就不介紹了,感興趣的可以善用搜索引擎。
Java 8 中的Stream
這個(gè)Stream不同于OIO中的Stream,不是一種輸出/輸出流,其本身不包含任何數(shù)據(jù),更像一種迭代器。這在后面的例子中會(huì)提現(xiàn)出來,這個(gè)Stream允許并行的對(duì)集合類型進(jìn)行迭代操作,并且依托于lambda表達(dá)式,可以用極為簡(jiǎn)便的代碼完成對(duì)集合的CRUD操作。而Stream之所以能夠提供并行迭代的,是因?yàn)槠鋬?nèi)部使用了FJP的模型(以下代碼若非特別說明均需要Java 8及以上)
一般來說,使用一個(gè)Stream的流程是:
- 取得一個(gè)數(shù)據(jù)源 source
- 數(shù)據(jù)轉(zhuǎn)換
- CRUD操作
- 返回新的Stream
Stream不會(huì)改變數(shù)據(jù)源,每次都會(huì)返回一個(gè)新的數(shù)據(jù)集合。而數(shù)據(jù)源可以是這些來源:
- Collection 對(duì)象
- Collection.stream()
- Collection.parallelStream()
- 數(shù)組
- Arrays.stream(T array)
- Stream.of()
- BufferedReader
- BufferedReader.lines()
- java.util.stream.IntStream.range()
- java.nio.file.Files.walk()
- java.util.Spliterator
- Random.ints()
- BitSet.stream()
- Pattern.splitAsStream(java.lang.CharSequence)
- JarFile.stream()
Stream的操作大致分為兩大列:
- Intermediate,一個(gè)Stream后面可以跟隨任意個(gè)Intermediate操作。其主要目的過過濾、映射數(shù)據(jù),值得一提的是intermediate是lazy的,因此只有調(diào)用相關(guān)方法后才會(huì)進(jìn)行相關(guān)Stream的真正操作(例如打開文件等)
- Terminal,一個(gè)Stream只能有一個(gè)Terminal。一旦執(zhí)行操作后,這個(gè)Stream就已經(jīng)結(jié)束了,因此Terminal一定是一個(gè)Stream的最后一個(gè)操作。Terminal的調(diào)用才會(huì)真正開始Stream的遍歷,并且會(huì)產(chǎn)生一個(gè)結(jié)果。
Stream的使用方法
理論安利的半天,看看比較直觀的代碼,比如從隨機(jī)數(shù)中找到大于x的值:(輸出大于50的數(shù)字)
public class StreamExample {
public static void main(String[] args) {
IntStream stream = new Random().ints(0, 100).limit(50); // 構(gòu)造Stream,生成50個(gè)[0,100)之間隨機(jī)數(shù),這行代碼結(jié)束的時(shí)候,數(shù)字還沒有生成
stream.filter(value -> value > 50) // 此時(shí)隨機(jī)數(shù)還沒有生成
.forEach(System.out::println); // 直到要輸出的時(shí)候,才從數(shù)據(jù)源獲取數(shù)據(jù)
}
}
代碼有沒有很簡(jiǎn)潔?傳統(tǒng)方法需要各種各樣的for循環(huán),這里全部沒有了。首先要格外強(qiáng)調(diào)的是:
- Stream是延遲操作的
- Stream本身是不包含任何數(shù)據(jù)
- Stream的數(shù)據(jù)均來自于數(shù)據(jù)源
- Stream只有執(zhí)行Terminal操作時(shí),才從數(shù)據(jù)源上獲取數(shù)據(jù)
- Stream的(輸入)數(shù)據(jù)源可以是無窮大的
- Stream的輸出不能是無窮的,必須是一個(gè)有限集合
這里舉個(gè)例子,說明一下數(shù)據(jù)源可以是無限的。常規(guī)的集合,數(shù)組、列表等都是有限集合,集合可以是非常大(受限于硬件限制),但必定有限。什么是無限的集合?數(shù)學(xué)上有個(gè)概念,叫自然數(shù),定義是所有正整數(shù)加上0的集合,而正整數(shù)這個(gè)子集合是無窮的。那么在Java中如何表示這個(gè)無限集合 自然數(shù)呢?
class NaturalNumber implements Supplier<BigInteger> {
private BigInteger num;
public NaturalNumber() {
this.num = BigInteger.valueOf(-1);
}
@Override
public BigInteger get() {
this.num = this.num.add(BigInteger.ONE);
return this.num;
}
}
這樣就構(gòu)造了一個(gè)無限的自然數(shù)集合,通過Stream.generate()方法來構(gòu)建與這個(gè)無限集合相關(guān)的Stream對(duì)象,Stream每次獲取值或調(diào)用get方法,無窮無盡。另外還有一個(gè)更簡(jiǎn)便的無窮的自然數(shù)集合,只有一句話:
Stream.iterate(0, val -> val + 1);
不過實(shí)際上這個(gè)有有窮的集合,受限于Integer數(shù)據(jù)類型的限制,最大只能到Integer.MAX_VALUE
那么什么是輸出不能是無窮的呢?有輸入,就可以輸出,為什么不能無限輸出呢?以這個(gè)自然數(shù)發(fā)生器來看個(gè)例子:
public class StreamExample {
public static void main(String[] args) {
Stream.generate(new NaturalNumber()).forEach(System.err::println);
}
}
編譯沒有問題,運(yùn)行起來也沒有問題。但是...似乎程序永遠(yuǎn)也不會(huì)停下來,因?yàn)镾tream能夠得到無窮的輸入,那么就可以無盡的輸出。永不停歇,大多數(shù)情況下,我們不希望程序會(huì)這樣,同樣以這個(gè)自然數(shù)發(fā)生器為例,可能我希望計(jì)算從m到n自然數(shù)的累加值。但是數(shù)據(jù)源是無限的,怎么辦?
public class StreamExample {
static class FinalFieldHelper<T> {
private T obj;
public FinalFieldHelper(T obj) {
this.obj = obj;
}
public T value() {
return this.obj;
}
public void value(T obj) {
this.obj = obj;
}
}
public static void main(String[] args) {
final int m = 10000, n = 100000;
final FinalFieldHelper<BigInteger> result = new FinalFieldHelper<>(BigInteger.ZERO);
Stream.generate(new NaturalNumber()).limit(n).skip(m).forEach(bigInteger -> result.value(bigInteger.add(result.value())));
System.err.printf("from %d to %d -> %s%n",m,n,result.value().toString());
}
}
是的,正如你所見的那樣,使用limit方法,將一個(gè)無限集合截取成有限集合,然后再進(jìn)行操作。因?yàn)閷?duì)于無限集合而言,調(diào)用任何一個(gè)Terminal操作都會(huì)導(dǎo)致程序掛起。(FinalFieldHelper是一個(gè)輔助類,因?yàn)閮?nèi)部類訪問外部類的變量必須是final的,所以在這里我無法更新result的值,用了這么個(gè)類變通一下)
這里介紹一下Stream的一些常用方法
| 方法 | 用途 |
|---|---|
| distinct | 去除重復(fù)對(duì)象,其結(jié)果依賴于具體對(duì)象的equals方法 |
| filter | 過濾數(shù)據(jù)源中的結(jié)果,產(chǎn)生新的Stream,參數(shù)為過濾的方法 |
| map | 對(duì)于Stream中包含的元素使用給定的轉(zhuǎn)換函數(shù)進(jìn)行轉(zhuǎn)換操作,新生成的Stream只包含轉(zhuǎn)換生成的元素。這個(gè)方法有三個(gè)對(duì)于原始類型的變種方法,分別是:mapToInt,mapToLong和mapToDouble。這三個(gè)方法也比較好理解,比如mapToInt就是把原始Stream轉(zhuǎn)換成一個(gè)新的Stream,這個(gè)新生成的Stream中的元素都是int類型。之所以會(huì)有這樣三個(gè)變種方法,可以免除自動(dòng)裝箱/拆箱的額外消耗; |
| flatMap | 和map類似,不同的是其每個(gè)元素轉(zhuǎn)換得到的是Stream對(duì)象,會(huì)把子Stream中的元素壓縮到父集合中; |
| peek | 生成一個(gè)包含原Stream的所有元素的新Stream,同時(shí)會(huì)提供一個(gè)消費(fèi)函數(shù)(Consumer實(shí)例),新Stream每個(gè)元素被消費(fèi)的時(shí)候都會(huì)執(zhí)行給定的消費(fèi)函數(shù); |
| limit | 對(duì)一個(gè)Stream進(jìn)行截?cái)嗖僮?,獲取其前N個(gè)元素,如果原Stream中包含的元素個(gè)數(shù)小于N,那就獲取其所有的元素; |
| skip | 返回一個(gè)丟棄原Stream的前N個(gè)元素后剩下元素組成的新Stream,如果原Stream中包含的元素個(gè)數(shù)小于N,那么返回空Stream; |
下面就這些常用方法,寫一些對(duì)應(yīng)的例子
public class StreamTestCase {
private final Object[] source = new Object[]{"a", "b", null, "c", new String[]{"d1", "d2", "d3"}, "e", "a", "b", "c", "f"};
private void printForEach(String methodName, Stream stream) {
if (methodName != null)
System.err.printf("===%s Start===%n", methodName);
System.err.print('[');
stream.forEach(o -> {
if (o == null)
System.err.print(o);
else if (o instanceof Stream)
this.printForEach(null, (Stream) o);
else if (o instanceof String || o instanceof Boolean)
System.err.print(o);
else {
Object[] obj = (Object[]) o;
System.err.print('[');
for (Object oo : obj) {
System.err.print(oo);
System.err.print(' ');
}
System.err.print(']');
}
System.err.print(' ');
});
System.err.println(']');
if (methodName != null)
System.err.printf("===%s Finish===%n", methodName);
}
@Test
public void distinctTest() {
this.printForEach("distinctTest", Stream.of(source).distinct()); // 去掉重復(fù)元素,后面的abc就被去掉了
}
@Test
public void peekTest() {
this.printForEach("piikTest", Stream.of(source).peek(o -> System.err.println("Peek -> " + o))); // 一定要有終端方法,peek才會(huì)被調(diào)用
}
@Test
public void filterTest() {
this.printForEach("filterTest", Stream.of(source).filter(o -> o != null && o instanceof String)); // 過濾掉了null和數(shù)組
}
@Test
public void limitTest() {
this.printForEach("limitTest", Stream.of(source).limit(4)); // 截取前四個(gè)元素
}
@Test
public void skipTest() {
this.printForEach("skipTest", Stream.of(source).skip(4)); // 去掉前四個(gè)元素
}
@Test
public void mapTest() {
// 將源對(duì)象根據(jù)自定義規(guī)則進(jìn)行類型轉(zhuǎn)換
// 我的轉(zhuǎn)化規(guī)則是null保持不變
// 其他元素非String的轉(zhuǎn)換為True
// String類型,首字母Ascii碼為偶數(shù)的為True 其余false
this.printForEach("mapTest", Stream.of(source).map(o -> {
if (o == null) return null;
if (o.getClass().isArray()) return Boolean.TRUE;
return (o.toString().charAt(0) & 1) == 0;
}));
}
@Test
public void flatMapTest() {
this.printForEach("flatMapTest", Stream.of(source).flatMap((Function<Object, Stream<?>>) o -> {
if (o == null) return null;
if (o.getClass().isArray()) return Stream.of(Boolean.TRUE);
return Stream.of((o.toString().charAt(0) & 1) == 0);
}));
}
}