Java 多線程模型與并發(fā)設(shè)計(jì)

序言

上一次提到了Java 1.5中提供新的多線程模型,在大多數(shù)情況下,這已經(jīng)能夠滿足日常開發(fā)的需要。但是偶爾也許覺得那一套模型還是覺得欠缺點(diǎn)什么,于是乎,Java 7/8中又提供了新的多線程模型。

Java 8中提供了并行流以及**ForkJoinPool **(FJP)和lambda(據(jù)說Java 8的lambda只是語法糖,沒有深究過)

ForkJoinPool / ForkJoinTask

這一套工具是由Java 7提供的。要使用這種方法之前,應(yīng)該有所了解函數(shù)式編程,如果有過JavaScript或者其他一個(gè)腳本語言的開發(fā),應(yīng)該對(duì)此不會(huì)陌生。另外,通過這種方法,比較難確定實(shí)際上是否使用了超過一個(gè)線程,因?yàn)檫@是由的具體實(shí)現(xiàn)決定的。最后,在默認(rèn)情況下是通過ForkJoinPool.commonPool()實(shí)現(xiàn)并行的。這個(gè)通用池由JVM來管理,并且被JVM進(jìn)程內(nèi)的所有線程共享。(以下示例代碼若非特別說明均要求Java 7及以上,部分代碼出于簡(jiǎn)潔,使用了lambda表達(dá)式,因此需要Java 8及以上才可以運(yùn)行??梢詫ambda表達(dá)式用匿名內(nèi)部類替代,即可在Java 7下編譯通過)

多線程經(jīng)常會(huì)伴隨著并行計(jì)算(并行不等于并發(fā)),雖然并不絕對(duì),但是通常與并行或多或少存在著聯(lián)系。而并行計(jì)算的特點(diǎn)在于將較為復(fù)雜、龐大的任務(wù),拆解成互不相干、較為簡(jiǎn)單、小型的任務(wù),最后將各個(gè)小任務(wù)的結(jié)果匯總、分析、處理,得到原本大任務(wù)的結(jié)果。這樣做的目的無非是提高效率,充分利用硬件計(jì)算資源,有效規(guī)避瓶頸效應(yīng)。

接下來以計(jì)算y! - x!為例,展示代碼:

// 計(jì)算y! - x!的值
class MyJob extends RecursiveTask<Integer> {

    private int y;
    private int x;

    public MyJob(int x, int y) {
        this.x = x;
        this.y = y;
    }

    @Override
    protected Integer compute() {
        if (x > 2) { // 先計(jì)算x的階乘
            MyJob subJobX = new MyJob(x - 1, -1);
            subJobX.fork();
            x *= subJobX.join();

            if (y == -1) { // 判斷是否為遞歸計(jì)算
                return x; // 遞歸計(jì)算則返回階乘結(jié)果
            } else if (y > 2) { // 計(jì)算y的階乘
                MyJob subJobY = new MyJob(y - 1, -1);
                subJobY.fork();
                y *= subJobY.join();

                return y - x; // 輸出最后任務(wù)的結(jié)果
            } else { // 正常輸入,不會(huì)進(jìn)入這個(gè)分支
                System.err.println("Error.");
                return 0;
            }
        } else {
            return 2;
        }
    }
}

public class ForkJoinExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        MyJob job = new MyJob(4, 10);

        Future<Integer> result = forkJoinPool.submit(job);

        while (!result.isDone()) {
            System.err.println("Waiting for result");
            Thread.sleep(0, 1);
        }
        forkJoinPool.shutdown();
        System.err.println("The results is " + result.get());
    }
}

輸出結(jié)果:

Waiting for result
Waiting for result
The results is 3628776

在這個(gè)例子中,我把計(jì)算n * (n - 1)作為最小的任務(wù)。因此先判斷x是否大于2,如果不大于2,則fork出一個(gè)分支,計(jì)算(x - 1)!的值,y同理。最后得到x!與y!的值,相減,得出最后的結(jié)果。代碼中,fork出來的分支中,參數(shù)y我作為一個(gè)標(biāo)志,如果為-1則不是原始調(diào)用,而是fork的子任務(wù),只需要負(fù)責(zé)計(jì)算傳遞進(jìn)來x的階乘即可。

這段代碼中,你看不出任務(wù)在哪里完成(哪個(gè)線程)、由誰完成、什么時(shí)候完成。正如前面所述,甚至你很難看出來是否是大于一個(gè)線程在執(zhí)行。

當(dāng)然你可以嘗試在compte方法中增加System.err.printf("%s is running.%n",Thread.currentThread().getName());的語句來查看輸出,到底有幾個(gè)線程在運(yùn)行。不過根據(jù)我的實(shí)踐來說,一般情況下,應(yīng)該只有一個(gè)線程在運(yùn)行。這不是說代碼有問題,主要有兩個(gè)因素:

  • 代碼設(shè)計(jì)不合理,我的代碼中,計(jì)算x的階乘與y的階乘是分開的,并沒有一起fork,因此本質(zhì)上并發(fā)性其實(shí)沒有顯示出來。我的代碼其實(shí)相當(dāng)于先計(jì)算x!,然后計(jì)算y!,最后計(jì)算y! - x!
  • 由于例子中代碼的計(jì)算量很小,以當(dāng)前CPU的計(jì)算能力有盈余。針對(duì)這種情況,可以嘗試多開幾個(gè)任務(wù)同時(shí)并行查看輸出結(jié)果。

當(dāng)然我覺得上面的例子不好,因此想了另一張場(chǎng)景,并用代碼演示一下。

比如現(xiàn)在需要制作一個(gè)網(wǎng)絡(luò)爬蟲,爬什么呢,就爬簡(jiǎn)述首頁推薦文章每篇文章的字?jǐn)?shù)。代碼中涉及網(wǎng)絡(luò)請(qǐng)求和正則表達(dá)式的部分就不說明了,其中用了我自己寫的一個(gè)小工具類Spider.javaHttpRequester.java。

class JianshuSpiderJob extends RecursiveTask<List<String>> {
    private static final String HOST = "http://www.itdecent.cn";
    private String url;

    public JianshuSpiderJob() {
        this(HOST);
    }

    protected JianshuSpiderJob(String url) {
        this.url = url;
    }

    protected List<String> requestHomepage() throws IOException {
        List<String> result = new ArrayList<>();

        Spider.newHost(new URL(this.url)).get((responseCode, responseHeaders, responseStream) -> { // 請(qǐng)求簡(jiǎn)書主頁
            if (responseCode == 200) {
                Pattern indexPattern = Pattern.compile("(/p/[a-z0-9]+)\">([^<>]+)</a></h4>");
                Matcher indexMatcher = indexPattern.matcher(responseStream.toString());

                // 從這里開始派分子任務(wù)
                List<JianshuSpiderJob> subJobs = new ArrayList<>();
                while (indexMatcher.find()) {
                    String subUrl = indexMatcher.group(1);
                    String subTitle = indexMatcher.group(2);
                    JianshuSpiderJob subJob = new JianshuSpiderJob(subUrl);
                    subJob.fork();
                    subJobs.add(subJob);
                    result.add(String.format("%s=%s", subTitle, subUrl));
                }

                // 銜接子任務(wù)的結(jié)果
                for (JianshuSpiderJob job : subJobs) {
                    List<String> list = job.join();
                    if (list.size() > 0) {
                        String[] subResult = list.get(0).split("=");
                        for (int i = 0; i < result.size(); i++) {
                            if (result.get(i).indexOf(subResult[0]) > 0) {
                                String[] localResult = result.get(i).split("=");
                                result.remove(i);
                                result.add(String.format("%s=%s", localResult[0], subResult[1]));
                                break;
                            }
                        }
                    }
                }
            } else { // 網(wǎng)絡(luò)錯(cuò)誤
                System.err.println("There is an error when trying to get homepage.");
            }
            return responseCode;
        });

        return result; // 返回最終結(jié)果
    }

    protected List<String> requestSubPage() throws IOException {
        List<String> result = new ArrayList<>();
        Map<String, String> requestHeader = new HashMap<>();
        requestHeader.put("User-Agent", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.71 Safari/537.36");
        // 獲取文章頁面的詳細(xì)信息
        Spider.newHost(new URL(HOST + this.url))
                .setRequestHeaders(requestHeader)
                .get((responseCode, responseHeaders, responseStream) -> { // 請(qǐng)求具體的文章頁
                    String html = responseStream.toString();
                    Pattern contextPattern = Pattern.compile("\"slug\":\"([a-z0-9]+)\".*?\"wordage\":(\\d+)");
                    Matcher contextMatcher = contextPattern.matcher(html);
                    if (contextMatcher.find())
                        result.add(String.format("%s=%s", contextMatcher.group(1), contextMatcher.group(2)));
                    return responseCode;
                });
        return result;
    }

    @Override
    protected List<String> compute() {
        try {
            System.err.printf("%s is running.%n", Thread.currentThread().getName()); // 顯示當(dāng)前工作線程
            if (HOST.equals(this.url)) {
                return this.requestHomepage();
            } else {
                return this.requestSubPage();
            }
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }
}

public class ForkJoinExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        JianshuSpiderJob job = new JianshuSpiderJob();
        forkJoinPool.submit(job);

        List<String> result = job.get();

        // 輸出結(jié)果
        System.err.println("Article\tWords");
        for (String str : result) {
            String[] s = str.split("=");
            if (s.length > 1)
                System.err.printf("%s\t%s%n", s[0], s[1]);
            else
                System.err.println("Err -> " + str);
        }
        forkJoinPool.shutdown();

    }
}

這里例子相比較之前那個(gè)計(jì)算階乘比較有典型,因?yàn)榫W(wǎng)絡(luò)請(qǐng)求本身是阻塞。一個(gè)請(qǐng)求可能幾毫秒就可以返回,也可以幾秒鐘才返回,甚至等了幾秒鐘以后,連接被中斷,請(qǐng)求失敗。例子中,并沒有考慮網(wǎng)絡(luò)異常的情況下。

運(yùn)行結(jié)果:

ForkJoinPool-1-worker-1 is running.
ForkJoinPool-1-worker-1 is running.
ForkJoinPool-1-worker-0 is running.
ForkJoinPool-1-worker-2 is running.
ForkJoinPool-1-worker-0 is running.
ForkJoinPool-1-worker-3 is running.
ForkJoinPool-1-worker-0 is running.
ForkJoinPool-1-worker-2 is running.
ForkJoinPool-1-worker-4 is running.
ForkJoinPool-1-worker-3 is running.
ForkJoinPool-1-worker-6 is running.
ForkJoinPool-1-worker-4 is running.
ForkJoinPool-1-worker-3 is running.
ForkJoinPool-1-worker-4 is running.
ForkJoinPool-1-worker-2 is running.
ForkJoinPool-1-worker-0 is running.
ForkJoinPool-1-worker-7 is running.
ForkJoinPool-1-worker-4 is running.
ForkJoinPool-1-worker-3 is running.
ForkJoinPool-1-worker-0 is running.
ForkJoinPool-1-worker-4 is running.
Article Words
又一年輕姑娘離世:請(qǐng)記住這些“1+1=死神”的藥物!  2862
報(bào)告大王,新概念英語一至四冊(cè)全套資源(視頻、音頻、電子書)已被我活捉! 861
模式學(xué)習(xí)|找到最適合你的學(xué)習(xí)“套路”  4009
我們女孩子真不容易,既要貌美如花,又要賺錢養(yǎng)家。    1603
二十歲出頭的你,別急著想要出人頭地   2787
別怕,誰的大學(xué)不迷茫  1649
這10年,多少人從郭敬明到咪蒙 3143
老實(shí)人浪起來,你我都招架不住  2165
如果沒有回報(bào),我會(huì)堅(jiān)持寫作多久 2595
簡(jiǎn)書早報(bào)161030——《不負(fù)責(zé)任吐槽,四本買完看完就后悔的暢銷書》  2029
時(shí)而自信,時(shí)而自卑,如何改變這種雙重人生?   2855
有哪些小樂器,是學(xué)習(xí)起來非常方便的?  4259
窮人,最可怕的是總說自己窮   1838
時(shí)光回去,只愿未曾遇到你(五十七)   3109
親愛的,千萬別把孩子養(yǎng)得“輸不起”!  2580
你憑什么詆毀我的愛豆!向語言暴力Say No! 2177
史上最全36個(gè)虐腹動(dòng)作:想要馬甲線,人魚線的朋友練起來 776
兩只蝸牛的愛情 3382
比文招親【星言夙駕,說于桑田】十二   2493
《簡(jiǎn)書歷史月刊003·三千年來誰著史》上線   1118

從結(jié)果來看,整個(gè)過程最多的時(shí)候使用了8個(gè)線程來完成這個(gè)任務(wù)。每次運(yùn)行具體使用的線程數(shù)都不一樣,讀者也可以將這段代碼復(fù)制過去,并引用連接中的兩個(gè)類,看看結(jié)果如何。(由于運(yùn)行結(jié)果依賴于簡(jiǎn)書服務(wù)器返回的結(jié)果,隨著時(shí)間推移,程序結(jié)果很可能不正確,望知悉)

在整個(gè)代碼中,程序員并不知道具體任務(wù)是如何分配,程序員的關(guān)注點(diǎn)只在業(yè)務(wù)邏輯本身上,而不用關(guān)心有關(guān)于線程調(diào)度的問題。具體的調(diào)度交給里FJP。

ForkJoinTask 中拋出異常

而在ForkJoinTask,可能會(huì)引發(fā)Unchecked Exception,因此可以調(diào)用ForkJoinTask.isCompletedAbnormally()來判斷是否任務(wù)在執(zhí)行中出現(xiàn)異常。如果返回值為Throwable類型則表明在執(zhí)行過程中出現(xiàn)Unchecked Exception;若返回值為CancellationException則表明任務(wù)在執(zhí)行過程中被取消;如果任務(wù)還沒有結(jié)束或者正常完成,沒有異常,則返回null。

ForkJoinPool / ForkJoinTask 與 Executor 關(guān)聯(lián)

從類繼承圖上可以看到,F(xiàn)orkJoinPool 間接繼承了Executor,因此可以認(rèn)為兩者師出同門,只不過后者提供更加便捷API,使程序員將關(guān)注點(diǎn)更加集中在業(yè)務(wù)上。既然兩者師承一派,那么很多地方是一樣或類似的,這里著重說一下不同的地方。

區(qū)別 Executor ForkJoinPool
接受的對(duì)象 Runnable和Callable的實(shí)例 Runnable、Callable和ForkJoinTask的實(shí)例
調(diào)度模式 處于后面等待中的任務(wù)需要等待前面任務(wù)執(zhí)行后才有機(jī)會(huì)被執(zhí)行,是否被執(zhí)行取決于具體的調(diào)度規(guī)則 采用work-stealing模式幫助其他線程執(zhí)行任務(wù),即ExcuteService解決的是并發(fā)問題,而ForkJoinPool解決的是并行問題。

對(duì)于了解類UNIX系統(tǒng)的人來說,對(duì)于fork這個(gè)詞應(yīng)該不會(huì)陌生。這里fork的含義基本相同,即一個(gè)大任務(wù)分支出多個(gè)小任務(wù)執(zhí)行,而小任務(wù)的執(zhí)行過程中可能還會(huì)分支出更小的任務(wù),如此往復(fù),直到分支出來的任務(wù)是原子任務(wù)。

而join是等待剛才fork出去的分支,返回結(jié)果。順序與fork正好相反,執(zhí)行結(jié)果不斷的join向上,最后那個(gè)大任務(wù)的結(jié)果就出來了。

其實(shí)FJP中還有一個(gè)Actor模型,但是我沒用過,就不介紹了,感興趣的可以善用搜索引擎。

Java 8 中的Stream

這個(gè)Stream不同于OIO中的Stream,不是一種輸出/輸出流,其本身不包含任何數(shù)據(jù),更像一種迭代器。這在后面的例子中會(huì)提現(xiàn)出來,這個(gè)Stream允許并行的對(duì)集合類型進(jìn)行迭代操作,并且依托于lambda表達(dá)式,可以用極為簡(jiǎn)便的代碼完成對(duì)集合的CRUD操作。而Stream之所以能夠提供并行迭代的,是因?yàn)槠鋬?nèi)部使用了FJP的模型(以下代碼若非特別說明均需要Java 8及以上)

一般來說,使用一個(gè)Stream的流程是:

  1. 取得一個(gè)數(shù)據(jù)源 source
  2. 數(shù)據(jù)轉(zhuǎn)換
  3. CRUD操作
  4. 返回新的Stream

Stream不會(huì)改變數(shù)據(jù)源,每次都會(huì)返回一個(gè)新的數(shù)據(jù)集合。而數(shù)據(jù)源可以是這些來源:

  • Collection 對(duì)象
    • Collection.stream()
    • Collection.parallelStream()
  • 數(shù)組
    • Arrays.stream(T array)
    • Stream.of()
  • BufferedReader
    • BufferedReader.lines()
  • java.util.stream.IntStream.range()
  • java.nio.file.Files.walk()
  • java.util.Spliterator
  • Random.ints()
  • BitSet.stream()
  • Pattern.splitAsStream(java.lang.CharSequence)
  • JarFile.stream()

Stream的操作大致分為兩大列:

  • Intermediate,一個(gè)Stream后面可以跟隨任意個(gè)Intermediate操作。其主要目的過過濾、映射數(shù)據(jù),值得一提的是intermediate是lazy的,因此只有調(diào)用相關(guān)方法后才會(huì)進(jìn)行相關(guān)Stream的真正操作(例如打開文件等)
  • Terminal,一個(gè)Stream只能有一個(gè)Terminal。一旦執(zhí)行操作后,這個(gè)Stream就已經(jīng)結(jié)束了,因此Terminal一定是一個(gè)Stream的最后一個(gè)操作。Terminal的調(diào)用才會(huì)真正開始Stream的遍歷,并且會(huì)產(chǎn)生一個(gè)結(jié)果。

Stream的使用方法

理論安利的半天,看看比較直觀的代碼,比如從隨機(jī)數(shù)中找到大于x的值:(輸出大于50的數(shù)字)

public class StreamExample {
    public static void main(String[] args) {
        IntStream stream = new Random().ints(0, 100).limit(50); // 構(gòu)造Stream,生成50個(gè)[0,100)之間隨機(jī)數(shù),這行代碼結(jié)束的時(shí)候,數(shù)字還沒有生成

        stream.filter(value -> value > 50) // 此時(shí)隨機(jī)數(shù)還沒有生成
                .forEach(System.out::println); // 直到要輸出的時(shí)候,才從數(shù)據(jù)源獲取數(shù)據(jù)
    }
}

代碼有沒有很簡(jiǎn)潔?傳統(tǒng)方法需要各種各樣的for循環(huán),這里全部沒有了。首先要格外強(qiáng)調(diào)的是:

  • Stream是延遲操作的
  • Stream本身是不包含任何數(shù)據(jù)
  • Stream的數(shù)據(jù)均來自于數(shù)據(jù)源
  • Stream只有執(zhí)行Terminal操作時(shí),才從數(shù)據(jù)源上獲取數(shù)據(jù)
  • Stream的(輸入)數(shù)據(jù)源可以是無窮大的
  • Stream的輸出不能是無窮的,必須是一個(gè)有限集合

這里舉個(gè)例子,說明一下數(shù)據(jù)源可以是無限的。常規(guī)的集合,數(shù)組、列表等都是有限集合,集合可以是非常大(受限于硬件限制),但必定有限。什么是無限的集合?數(shù)學(xué)上有個(gè)概念,叫自然數(shù),定義是所有正整數(shù)加上0的集合,而正整數(shù)這個(gè)子集合是無窮的。那么在Java中如何表示這個(gè)無限集合 自然數(shù)呢?

class NaturalNumber implements Supplier<BigInteger> {
    private BigInteger num;

    public NaturalNumber() {
        this.num = BigInteger.valueOf(-1);
    }

    @Override
    public BigInteger get() {
        this.num = this.num.add(BigInteger.ONE);
        return this.num;
    }
}

這樣就構(gòu)造了一個(gè)無限的自然數(shù)集合,通過Stream.generate()方法來構(gòu)建與這個(gè)無限集合相關(guān)的Stream對(duì)象,Stream每次獲取值或調(diào)用get方法,無窮無盡。另外還有一個(gè)更簡(jiǎn)便的無窮的自然數(shù)集合,只有一句話:

Stream.iterate(0, val -> val + 1);

不過實(shí)際上這個(gè)有有窮的集合,受限于Integer數(shù)據(jù)類型的限制,最大只能到Integer.MAX_VALUE

那么什么是輸出不能是無窮的呢?有輸入,就可以輸出,為什么不能無限輸出呢?以這個(gè)自然數(shù)發(fā)生器來看個(gè)例子:

public class StreamExample {
    public static void main(String[] args) {
        Stream.generate(new NaturalNumber()).forEach(System.err::println);
    }
}

編譯沒有問題,運(yùn)行起來也沒有問題。但是...似乎程序永遠(yuǎn)也不會(huì)停下來,因?yàn)镾tream能夠得到無窮的輸入,那么就可以無盡的輸出。永不停歇,大多數(shù)情況下,我們不希望程序會(huì)這樣,同樣以這個(gè)自然數(shù)發(fā)生器為例,可能我希望計(jì)算從m到n自然數(shù)的累加值。但是數(shù)據(jù)源是無限的,怎么辦?

public class StreamExample {
    static class FinalFieldHelper<T> {
        private T obj;

        public FinalFieldHelper(T obj) {
            this.obj = obj;
        }

        public T value() {
            return this.obj;
        }

        public void value(T obj) {
            this.obj = obj;
        }
    }

    public static void main(String[] args) {
        final int m = 10000, n = 100000;
        final FinalFieldHelper<BigInteger> result = new FinalFieldHelper<>(BigInteger.ZERO);
        Stream.generate(new NaturalNumber()).limit(n).skip(m).forEach(bigInteger -> result.value(bigInteger.add(result.value())));

        System.err.printf("from %d to %d -> %s%n",m,n,result.value().toString());
    }
}

是的,正如你所見的那樣,使用limit方法,將一個(gè)無限集合截取成有限集合,然后再進(jìn)行操作。因?yàn)閷?duì)于無限集合而言,調(diào)用任何一個(gè)Terminal操作都會(huì)導(dǎo)致程序掛起。(FinalFieldHelper是一個(gè)輔助類,因?yàn)閮?nèi)部類訪問外部類的變量必須是final的,所以在這里我無法更新result的值,用了這么個(gè)類變通一下)

這里介紹一下Stream的一些常用方法

方法 用途
distinct 去除重復(fù)對(duì)象,其結(jié)果依賴于具體對(duì)象的equals方法
filter 過濾數(shù)據(jù)源中的結(jié)果,產(chǎn)生新的Stream,參數(shù)為過濾的方法
map 對(duì)于Stream中包含的元素使用給定的轉(zhuǎn)換函數(shù)進(jìn)行轉(zhuǎn)換操作,新生成的Stream只包含轉(zhuǎn)換生成的元素。這個(gè)方法有三個(gè)對(duì)于原始類型的變種方法,分別是:mapToInt,mapToLong和mapToDouble。這三個(gè)方法也比較好理解,比如mapToInt就是把原始Stream轉(zhuǎn)換成一個(gè)新的Stream,這個(gè)新生成的Stream中的元素都是int類型。之所以會(huì)有這樣三個(gè)變種方法,可以免除自動(dòng)裝箱/拆箱的額外消耗;
flatMap 和map類似,不同的是其每個(gè)元素轉(zhuǎn)換得到的是Stream對(duì)象,會(huì)把子Stream中的元素壓縮到父集合中;
peek 生成一個(gè)包含原Stream的所有元素的新Stream,同時(shí)會(huì)提供一個(gè)消費(fèi)函數(shù)(Consumer實(shí)例),新Stream每個(gè)元素被消費(fèi)的時(shí)候都會(huì)執(zhí)行給定的消費(fèi)函數(shù);
limit 對(duì)一個(gè)Stream進(jìn)行截?cái)嗖僮?,獲取其前N個(gè)元素,如果原Stream中包含的元素個(gè)數(shù)小于N,那就獲取其所有的元素;
skip 返回一個(gè)丟棄原Stream的前N個(gè)元素后剩下元素組成的新Stream,如果原Stream中包含的元素個(gè)數(shù)小于N,那么返回空Stream;

下面就這些常用方法,寫一些對(duì)應(yīng)的例子

public class StreamTestCase {
    private final Object[] source = new Object[]{"a", "b", null, "c", new String[]{"d1", "d2", "d3"}, "e", "a", "b", "c", "f"};

    private void printForEach(String methodName, Stream stream) {
        if (methodName != null)
            System.err.printf("===%s Start===%n", methodName);
        System.err.print('[');
        stream.forEach(o -> {
            if (o == null)
                System.err.print(o);
            else if (o instanceof Stream)
                this.printForEach(null, (Stream) o);
            else if (o instanceof String || o instanceof Boolean)
                System.err.print(o);
            else {
                Object[] obj = (Object[]) o;
                System.err.print('[');
                for (Object oo : obj) {
                    System.err.print(oo);
                    System.err.print(' ');
                }
                System.err.print(']');
            }
            System.err.print(' ');
        });
        System.err.println(']');
        if (methodName != null)
            System.err.printf("===%s Finish===%n", methodName);
    }

    @Test
    public void distinctTest() {
        this.printForEach("distinctTest", Stream.of(source).distinct()); // 去掉重復(fù)元素,后面的abc就被去掉了
    }

    @Test
    public void peekTest() {
        this.printForEach("piikTest", Stream.of(source).peek(o -> System.err.println("Peek -> " + o))); // 一定要有終端方法,peek才會(huì)被調(diào)用
    }

    @Test
    public void filterTest() {
        this.printForEach("filterTest", Stream.of(source).filter(o -> o != null && o instanceof String)); // 過濾掉了null和數(shù)組
    }

    @Test
    public void limitTest() {
        this.printForEach("limitTest", Stream.of(source).limit(4)); // 截取前四個(gè)元素
    }

    @Test
    public void skipTest() {
        this.printForEach("skipTest", Stream.of(source).skip(4)); // 去掉前四個(gè)元素
    }

    @Test
    public void mapTest() {
        // 將源對(duì)象根據(jù)自定義規(guī)則進(jìn)行類型轉(zhuǎn)換
        // 我的轉(zhuǎn)化規(guī)則是null保持不變
        // 其他元素非String的轉(zhuǎn)換為True
        // String類型,首字母Ascii碼為偶數(shù)的為True 其余false
        this.printForEach("mapTest", Stream.of(source).map(o -> {
            if (o == null) return null;
            if (o.getClass().isArray()) return Boolean.TRUE;
            return (o.toString().charAt(0) & 1) == 0;
        }));
    }

    @Test
    public void flatMapTest() {
        this.printForEach("flatMapTest", Stream.of(source).flatMap((Function<Object, Stream<?>>) o -> {
            if (o == null) return null;
            if (o.getClass().isArray()) return Stream.of(Boolean.TRUE);
            return Stream.of((o.toString().charAt(0) & 1) == 0);
        }));
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,680評(píng)論 19 139
  • 一、多線程 說明下線程的狀態(tài) java中的線程一共有 5 種狀態(tài)。 NEW:這種情況指的是,通過 New 關(guān)鍵字創(chuàng)...
    Java旅行者閱讀 4,871評(píng)論 0 44
  • Android 自定義View的各種姿勢(shì)1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 179,308評(píng)論 25 708
  • SET答辯失敗原因 1.耗時(shí)過長(zhǎng)。在通知了答辯時(shí)間控制在30分鐘內(nèi),PPT演講時(shí)間控制在10分鐘的情況下,仍然不注...
    三笑奈若何閱讀 254評(píng)論 0 0
  • 本周小結(jié) 本周實(shí)驗(yàn)室的工作上周的用戶周一反饋說,下載的模板書里數(shù)據(jù)不對(duì),周一操作測(cè)試環(huán)境的數(shù)據(jù)庫給解決了一下,同時(shí)...
    im天行閱讀 235評(píng)論 0 1

友情鏈接更多精彩內(nèi)容