The stream.reduce method in JDK 8

Streams are one of the new features of JDK 8, and they bring many new methods with them. reduce is one of these.

Reduction operations

First, what is reduce? The official documentation 【1】 explains it thoroughly, so it is quoted in full below.

A reduction operation (also called a fold) takes a sequence of input elements and combines them into a single summary result by repeated application of a combining operation, such as finding the sum or maximum of a set of numbers, or accumulating elements into a list. The streams classes have multiple forms of general reduction operations, called reduce() and collect(), as well as multiple specialized reduction forms such as sum(), max(), or count().

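In other words, a reduction (also known as a fold) repeatedly applies a combining operation to the elements of an input sequence (summing a set of numbers, finding their maximum, or gathering the elements into a list) and ends up with a single summary result. The stream classes offer several general-purpose reductions, reduce() and collect(), alongside specialized ones such as sum(), max(), and count().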
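To make the distinction between general and specialized reductions concrete, here is a minimal sketch. The class name ReductionKinds and its helper methods are made up for illustration; only the stream calls themselves come from the JDK.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class ReductionKinds {
    // General reduction: fold the elements with an identity and a combining function.
    static int sumWithReduce(List<Integer> numbers) {
        return numbers.stream().reduce(0, Integer::sum);
    }

    // Specialized reductions, available on the primitive IntStream.
    static int sumSpecialized(List<Integer> numbers) {
        return numbers.stream().mapToInt(Integer::intValue).sum();
    }

    static int maxOrDefault(List<Integer> numbers, int fallback) {
        return numbers.stream().mapToInt(Integer::intValue).max().orElse(fallback);
    }

    static long count(List<Integer> numbers) {
        return numbers.stream().count();
    }

    // Mutable reduction: accumulate the elements into a new list.
    static List<Integer> toList(List<Integer> numbers) {
        return numbers.stream().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(3, 1, 4, 1, 5);
        System.out.println(sumWithReduce(numbers));   // 14
        System.out.println(sumSpecialized(numbers));  // 14
        System.out.println(maxOrDefault(numbers, 0)); // 5
        System.out.println(count(numbers));           // 5
        System.out.println(toList(numbers));          // [3, 1, 4, 1, 5]
    }
}
```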

Of course, such operations can be readily implemented as simple sequential loops, as in:
int sum = 0;
for (int x : numbers) {
sum += x;
}

A plain sequential loop can of course do the same job:

    int sum = 0;
    for (int x : numbers) {
       sum += x;
    }

However, there are good reasons to prefer a reduce operation over a mutative accumulation such as the above. Not only is a reduction "more abstract" -- it operates on the stream as a whole rather than individual elements -- but a properly constructed reduce operation is inherently parallelizable, so long as the function(s) used to process the elements are associative and stateless.

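Still, reduce has real advantages over this kind of mutative accumulation. A reduction is "more abstract" because it treats the stream as a whole rather than as individual elements. More importantly, a properly constructed reduction is inherently parallelizable, as long as the functions that process the elements are associative and stateless.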

For example, given a stream of numbers for which we want to find the sum, we can write:
int sum = numbers.stream().reduce(0, (x,y) -> x+y);
or:
int sum = numbers.stream().reduce(0, Integer::sum);
These reduction operations can run safely in parallel with almost no modification:
int sum = numbers.parallelStream().reduce(0, Integer::sum);

For example, a stream of numbers can be summed like this:

    int sum = numbers.stream().reduce(0, (x,y) -> x+y);

Or:

    int sum = numbers.stream().reduce(0, Integer::sum);

These reductions can run safely in parallel with almost no modification:

    int sum = numbers.parallelStream().reduce(0, Integer::sum);

Reduction parallellizes well because the implementation can operate on subsets of the data in parallel, and then combine the intermediate results to get the final correct answer. (Even if the language had a "parallel for-each" construct, the mutative accumulation approach would still required the developer to provide thread-safe updates to the shared accumulating variable sum, and the required synchronization would then likely eliminate any performance gain from parallelism.) Using reduce() instead removes all of the burden of parallelizing the reduction operation, and the library can provide an efficient parallel implementation with no additional synchronization required.

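Reduction parallelizes well because the implementation can work on subsets of the data in parallel and then combine the intermediate results into the correct final answer. (Even if the language had a "parallel for-each" construct, the mutative accumulation approach, that is, the for loop shown earlier, would still require the developer to make updates to the shared accumulator variable sum thread-safe, and the synchronization needed for that would likely cancel out any gain from parallelism.) Using reduce() removes the burden of parallelizing the reduction: the library can provide an efficient parallel implementation with no additional synchronization.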
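The claim that the same reduction gives the same answer sequentially and in parallel can be checked with a small sketch. The class ParallelSum and the helper oneTo are hypothetical names used only for this illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelSum {
    // Builds the list 1, 2, ..., n as test data.
    static List<Integer> oneTo(int n) {
        return IntStream.rangeClosed(1, n).boxed().collect(Collectors.toList());
    }

    static int sequentialSum(List<Integer> numbers) {
        return numbers.stream().reduce(0, Integer::sum);
    }

    // Same identity and same associative, stateless function; only the stream source changes.
    static int parallelSum(List<Integer> numbers) {
        return numbers.parallelStream().reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        List<Integer> numbers = oneTo(1000);
        System.out.println(sequentialSum(numbers)); // 500500
        System.out.println(parallelSum(numbers));   // 500500
    }
}
```

Because 0 is an identity for addition and addition is associative, the way the library partitions the input does not affect the result.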

The "widgets" examples shown earlier shows how reduction combines with other operations to replace for loops with bulk operations. If widgets is a collection of Widget objects, which have a getWeight method, we can find the heaviest widget with:
OptionalInt heaviest = widgets.parallelStream()
.mapToInt(Widget::getWeight)
.max();

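The earlier "widgets" example shows how reduction works together with other operations to replace for loops with bulk operations. If widgets is a collection of Widget objects, each with a getWeight method, the following code finds the weight of the heaviest widget: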

     OptionalInt heaviest = widgets.parallelStream()
                                   .mapToInt(Widget::getWeight)
                                   .max();
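The snippet above is not runnable on its own because Widget is never defined. The following sketch fills that gap with a hypothetical Widget class; the class, its constructor, and the widgetsOf helper are assumptions made purely so the example compiles.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalInt;
import java.util.stream.Collectors;

class HeaviestWidget {
    // Hypothetical stand-in for the Widget type mentioned in the documentation.
    static final class Widget {
        private final int weight;

        Widget(int weight) {
            this.weight = weight;
        }

        int getWeight() {
            return weight;
        }
    }

    // Illustrative helper: builds one widget per given weight.
    static List<Widget> widgetsOf(int... weights) {
        return Arrays.stream(weights).mapToObj(Widget::new).collect(Collectors.toList());
    }

    // Returns the largest weight, or an empty OptionalInt when there are no widgets.
    static OptionalInt heaviestWeight(List<Widget> widgets) {
        return widgets.parallelStream()
                      .mapToInt(Widget::getWeight)
                      .max();
    }

    public static void main(String[] args) {
        System.out.println(heaviestWeight(widgetsOf(3, 9, 4))); // OptionalInt[9]
        System.out.println(heaviestWeight(widgetsOf()));        // OptionalInt.empty
    }
}
```

Note that max() yields the weight, not the Widget itself, and that the result is an OptionalInt because the stream may be empty.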

In its more general form, a reduce operation on elements of type <T> yielding a result of type <U> requires three parameters:
<U> U reduce(U identity,
BiFunction<U, ? super T, U> accumulator,
BinaryOperator<U> combiner);

In its more general form, a reduce operation over elements of type <T> that returns a result of type <U> takes three parameters:

 <U> U reduce(U identity,
              BiFunction<U, ? super T, U> accumulator,
              BinaryOperator<U> combiner);

Here, the identity element is both an initial seed value for the reduction and a default result if there are no input elements. The accumulator function takes a partial result and the next element, and produces a new partial result. The combiner function combines two partial results to produce a new partial result. (The combiner is necessary in parallel reductions, where the input is partitioned, a partial accumulation computed for each partition, and then the partial results are combined to produce a final result.)

Here identity is both the initial value of the reduction and the default result when there are no input elements. The accumulator function takes a partial result and the next element and produces a new partial result. The combiner function merges two partial results into a new partial result. (The combiner is required for parallel reductions, where the input is partitioned, a partial accumulation is computed for each partition, and the partial results are then combined into the final result.)

More formally, the identity value must be an identity for the combiner function. This means that for all u, combiner.apply(identity, u) is equal to u. Additionally, the combiner function must be associative and must be compatible with the accumulator function: for all u and t, combiner.apply(u, accumulator.apply(identity, t)) must be equals() to accumulator.apply(u, t).

More formally, the identity value must be an identity for the combiner: for every u, combiner.apply(identity, u) must equal u. In addition, the combiner must be associative and must be compatible with the accumulator: for every u and t, combiner.apply(u, accumulator.apply(identity, t)) must be equals() to accumulator.apply(u, t).
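These two requirements can be spot-checked on sample values. The sketch below is only a sanity check on specific inputs, not a proof that the laws hold for all u and t; ReduceLaws, isIdentityFor, and isCompatible are hypothetical names.

```java
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

class ReduceLaws {
    // Checks combiner.apply(identity, sample).equals(sample) for one sample value.
    static <U> boolean isIdentityFor(U identity, BinaryOperator<U> combiner, U sample) {
        return combiner.apply(identity, sample).equals(sample);
    }

    // Checks combiner.apply(u, accumulator.apply(identity, t)).equals(accumulator.apply(u, t))
    // for one pair (u, t).
    static <T, U> boolean isCompatible(U identity,
                                       BiFunction<U, ? super T, U> accumulator,
                                       BinaryOperator<U> combiner,
                                       U u, T t) {
        U viaCombiner = combiner.apply(u, accumulator.apply(identity, t));
        U direct = accumulator.apply(u, t);
        return viaCombiner.equals(direct);
    }

    public static void main(String[] args) {
        BinaryOperator<Integer> sum = Integer::sum;
        BiFunction<Integer, String, Integer> addLength = (acc, s) -> acc + s.length();

        System.out.println(isIdentityFor(0, sum, 7)); // true:  0 + 7 == 7
        System.out.println(isIdentityFor(1, sum, 7)); // false: 1 + 7 == 8

        System.out.println(isCompatible(0, addLength, sum, 10, "abc")); // true:  13 vs 13
        System.out.println(isCompatible(1, addLength, sum, 10, "abc")); // false: 14 vs 13
    }
}
```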

The three-argument form is a generalization of the two-argument form, incorporating a mapping step into the accumulation step. We could re-cast the simple sum-of-weights example using the more general form as follows:
int sumOfWeights = widgets.stream()
.reduce(0,
(sum, b) -> sum + b.getWeight(),
Integer::sum);

The three-argument overload generalizes the two-argument one by folding a mapping step into the accumulation step. The sum-of-weights example can be rewritten in the general form:

     int sumOfWeights = widgets.stream()
                               .reduce(0,
                                       (sum, b) -> sum + b.getWeight(),
                                       Integer::sum);

though the explicit map-reduce form is more readable and therefore should usually be preferred. The generalized form is provided for cases where significant work can be optimized away by combining mapping and reducing into a single function.

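Although the explicit map-reduce form is more readable and therefore usually preferred, the generalized form still has its uses: fusing mapping and reducing into a single function can optimize away significant work.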
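The two styles can be placed side by side. In this sketch, Widget and widgetsOf are again hypothetical stand-ins added only so the code is self-contained.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class SumOfWeights {
    // Hypothetical stand-in for the Widget type mentioned in the documentation.
    static final class Widget {
        private final int weight;

        Widget(int weight) {
            this.weight = weight;
        }

        int getWeight() {
            return weight;
        }
    }

    // Illustrative helper: builds one widget per given weight.
    static List<Widget> widgetsOf(int... weights) {
        return Arrays.stream(weights).mapToObj(Widget::new).collect(Collectors.toList());
    }

    // Explicit map-reduce: map each widget to its weight, then sum.
    static int explicitMapReduce(List<Widget> widgets) {
        return widgets.stream().mapToInt(Widget::getWeight).sum();
    }

    // Three-argument reduce: the mapping is folded into the accumulator.
    static int fusedReduce(List<Widget> widgets) {
        return widgets.stream()
                      .reduce(0,
                              (sum, w) -> sum + w.getWeight(),
                              Integer::sum);
    }

    public static void main(String[] args) {
        List<Widget> widgets = widgetsOf(2, 3, 5);
        System.out.println(explicitMapReduce(widgets)); // 10
        System.out.println(fusedReduce(widgets));       // 10
    }
}
```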

The three forms of reduce

reduce has three overloads:

Optional<T> reduce(BinaryOperator<T> accumulator)
T reduce(T identity, BinaryOperator<T> accumulator)
<U> U reduce(U identity,
             BiFunction<U, ? super T, U> accumulator,
             BinaryOperator<U> combiner)

The simple form

Optional<T> reduce(BinaryOperator<T> accumulator)

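This method takes a BinaryOperator<T> and returns an Optional<T>.

BinaryOperator<T> is a functional interface 【2】 that represents an operation on two operands of the same type, producing a result of that same type.

Optional<T> is another JDK 8 feature 【3】: a container object that may or may not hold a non-null value. get() returns the contained value and throws NoSuchElementException if no value is present.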

    private static void testReduce1Empty(){
        List<Integer> list = new ArrayList<>();
        System.out.println(list.stream().reduce((a,b)->a+b)); // Optional.empty
    }

    private static void testReduce1EmptyGet(){
        List<Integer> list = new ArrayList<>();
        System.out.println(list.stream().reduce((a,b)->a+b).get());
        // Exception in thread "main" java.util.NoSuchElementException: 
        //No value present
        // https://docs.oracle.com/javase/8/docs/api/java/util/Optional.html
    }

Usage example 1

    private static void testReduce1(){
        List<Integer> list = new ArrayList<>();
        list.add(1);
        System.out.println(list.stream().reduce((a,b)->a+b)); // Optional[1]
    }

    private static void testReduce1Get(){
        List<Integer> list = new ArrayList<>();
        list.add(1);
        System.out.println(list.stream().reduce((a,b)->a+b).get()); // 1
    }
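
Since get() throws on an empty Optional, callers usually want a safer way to unwrap the result. A minimal sketch using orElse and ifPresent, both standard Optional methods; the class and method names are illustrative only:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

class OptionalResult {
    // One-argument reduce: empty input gives Optional.empty().
    static Optional<Integer> sum(List<Integer> numbers) {
        return numbers.stream().reduce(Integer::sum);
    }

    // Falls back to 0 instead of throwing when the list is empty.
    static int sumOrZero(List<Integer> numbers) {
        return sum(numbers).orElse(0);
    }

    public static void main(String[] args) {
        List<Integer> empty = Collections.emptyList();
        List<Integer> some = Arrays.asList(1, 2, 3);

        System.out.println(sumOrZero(empty)); // 0
        System.out.println(sumOrZero(some));  // 6

        // Runs the action only when a value is present; prints nothing for the empty list.
        sum(empty).ifPresent(v -> System.out.println("sum = " + v));
        sum(some).ifPresent(v -> System.out.println("sum = " + v)); // sum = 6
    }
}
```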

Specialized reduction methods in the library

    private static void testTransferSum(){
        List<List<Integer>> lists = new ArrayList<>();
        List<Integer> list1 = new ArrayList<>();
        list1.add(1);
        List<Integer> list2 = new ArrayList<>();
        list2.add(2);
        lists.add(list1);
        lists.add(list2);

        int rel = lists.stream().mapToInt(x->x.get(0)).sum();
        System.out.println(rel); // 3
    }

    private static void testTransferSumEmpty(){
        List<List<Integer>> lists = new ArrayList<>();

        int rel = lists.stream().mapToInt(x->x.get(0)).sum();
        System.out.println(rel); // 0: sum() of an empty stream is 0
    }
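
The Javadoc for IntStream.sum() describes it as a special case of a reduction, equivalent to reduce(0, Integer::sum). The sketch below compares the two on the same inputs as the examples above; the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SpecializedSum {
    // Specialized form: sum() on an IntStream.
    static int viaSum(List<List<Integer>> lists) {
        return lists.stream().mapToInt(x -> x.get(0)).sum();
    }

    // General form on the same IntStream, with 0 as the identity.
    static int viaReduce(List<List<Integer>> lists) {
        return lists.stream().mapToInt(x -> x.get(0)).reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        List<List<Integer>> lists = Arrays.asList(Arrays.asList(1), Arrays.asList(2));
        List<List<Integer>> empty = new ArrayList<>();

        System.out.println(viaSum(lists));    // 3
        System.out.println(viaReduce(lists)); // 3
        System.out.println(viaSum(empty));    // 0
        System.out.println(viaReduce(empty)); // 0
    }
}
```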

The two-argument form

T reduce(T identity,  BinaryOperator<T> accumulator)

Compared with the simple form, the two-argument form adds two things:

  1. It handles an empty stream automatically
    private static void testTwoParaSumEmpty(){
        List<Integer> list = new ArrayList<>();
        int rel = list.stream().reduce(1,Integer::sum);
        System.out.println(rel); // 1: the identity is returned for an empty stream
    }
  2. It lets the caller supply an initial value
    private static void testTwoParaSum(){
        List<Integer> list = new ArrayList<>();
        list.add(1);
        list.add(2);
        int rel = list.stream().reduce(1,Integer::sum);
        System.out.println(rel); // 4 = 1 + 1 + 2
    }
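
One caveat about the "initial value": the Javadoc requires identity to be an identity for the accumulator, and 1 is not an identity for addition. Sequentially it acts as a seed, but on a parallel stream it may be applied once per partition. A hedged sketch of a safer way to add an offset, with illustrative names sumWithSeed and offsetPlusSum:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class TwoArgIdentity {
    // Sequential reduce with an arbitrary seed, as in the examples above.
    static int sumWithSeed(List<Integer> numbers, int seed) {
        return numbers.stream().reduce(seed, Integer::sum);
    }

    // Keeps 0 as the identity and adds the offset once, outside the reduction,
    // so the result does not depend on how a parallel stream is partitioned.
    static int offsetPlusSum(List<Integer> numbers, int offset) {
        return offset + numbers.parallelStream().reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2);
        List<Integer> empty = Collections.emptyList();

        System.out.println(sumWithSeed(numbers, 1));   // 4
        System.out.println(sumWithSeed(empty, 1));     // 1
        System.out.println(offsetPlusSum(numbers, 1)); // 4
        System.out.println(offsetPlusSum(empty, 1));   // 1
    }
}
```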

The three-argument form

<U> U reduce(U identity,
             BiFunction<U, ? super T, U> accumulator,
             BinaryOperator<U> combiner)

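The three-argument form introduces a new parameter, combiner, and mainly provides two enhancements:

  1. The result can have a different type from the elements of the stream.
  2. Combined with parallelStream, the reduction can run in parallel, which can improve efficiency.

Interface BiFunction<T,U,R>

The accumulator is of type BiFunction<T,U,R> 【4】, an interface with three type parameters:
T - the type of the first argument (here U, the type of the accumulator's result)
U - the type of the second argument (here T, the type of the stream's elements)
R - the type of the result (here also U)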
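
The mapping between BiFunction's type parameters and reduce's can be seen by writing an accumulator out with explicit types. In this sketch the stream elements are List<Integer> and the result is Integer; ADD_FIRST and addFirst are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

class AccumulatorTypes {
    // BiFunction<T, U, R> instantiated as BiFunction<Integer, List<Integer>, Integer>:
    //   first argument  = partial result (reduce's U, here Integer)
    //   second argument = next element   (reduce's T, here List<Integer>)
    //   result          = new partial result (reduce's U, here Integer)
    static final BiFunction<Integer, List<Integer>, Integer> ADD_FIRST =
            (partial, list) -> partial + list.get(0);

    static int addFirst(int partial, List<Integer> list) {
        return ADD_FIRST.apply(partial, list);
    }

    public static void main(String[] args) {
        System.out.println(addFirst(10, Arrays.asList(5, 6))); // 15

        // The same function object can be passed to reduce as the accumulator.
        List<List<Integer>> lists = Arrays.asList(Arrays.asList(1), Arrays.asList(2));
        System.out.println(lists.stream().reduce(0, ADD_FIRST, Integer::sum)); // 3
    }
}
```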

Example

    private static void testThreeParaSum(){
        List<List<Integer>> lists = new ArrayList<>();
        List<Integer> list1 = new ArrayList<>();
        list1.add(1);
        List<Integer> list2 = new ArrayList<>();
        list2.add(2);
        lists.add(list1);
        lists.add(list2);

        int rel = lists.stream().reduce(1, (a,b)->a + b.get(0), (x,y)->x+y);
        System.out.println(rel); // 4 = 1 + 1 + 2
    }

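Run sequentially, this code works, and it solves the problem of the result type differing from the type of the stream's elements. It clearly violates the documented contract for the combiner, though: the identity value must be an identity for the combiner, meaning that for every u, combiner.apply(identity, u) must equal u. Here identity is 1 and the combiner is (x, y) -> x + y, so combiner.apply(1, u) is u + 1 rather than u.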

Restrictions on the combiner

    private static void testThreeParaSumParallel(){
        List<List<Integer>> lists = new ArrayList<>();
        List<Integer> list1 = new ArrayList<>();
        list1.add(1);
        List<Integer> list2 = new ArrayList<>();
        list2.add(2);
        lists.add(list1);
        lists.add(list2);

        int rel = lists.parallelStream().reduce(1, (a,b)->a + b.get(0), (x,y)->x+y);
        System.out.println(rel); // return 5 instead of 4
    }
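
One way to stay within the contract is to use 0 as the identity and add any offset outside the reduction. The sketch below is an illustration under that assumption, with made-up names sumOfFirsts and sampleLists; it is not the only possible fix.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class CombinerContract {
    // 0 is an identity for Integer::sum, and the accumulator and combiner are compatible,
    // so the result is the same for sequential and parallel streams.
    static int sumOfFirsts(List<List<Integer>> lists, boolean parallel) {
        Stream<List<Integer>> stream = parallel ? lists.parallelStream() : lists.stream();
        return stream.reduce(0, (a, b) -> a + b.get(0), Integer::sum);
    }

    // Same data as the examples above: [[1], [2]].
    static List<List<Integer>> sampleLists() {
        return Arrays.asList(Arrays.asList(1), Arrays.asList(2));
    }

    public static void main(String[] args) {
        List<List<Integer>> lists = sampleLists();
        System.out.println(sumOfFirsts(lists, false));    // 3
        System.out.println(sumOfFirsts(lists, true));     // 3
        System.out.println(1 + sumOfFirsts(lists, true)); // 4: offset added once, outside reduce
    }
}
```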

In the sequential case, on the other hand, the combiner does not actually come into play:

    private static void testThreeParaSumNonSense(){
        List<List<Integer>> lists = new ArrayList<>();
        List<Integer> list1 = new ArrayList<>();
        list1.add(1);
        List<Integer> list2 = new ArrayList<>();
        list2.add(2);
        lists.add(list1);
        lists.add(list2);

        int rel = lists.stream().reduce(1, (a,b)->a + b.get(0), (x,y)->0);
        System.out.println(rel); // return 4
    }
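
This can be observed by counting combiner invocations. The sketch below assumes the behavior of the OpenJDK reference implementation, where a sequential reduce never calls the combiner; the specification does not promise this, so treat the count as an observation rather than a guarantee. CombinerCalls and its method are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class CombinerCalls {
    // Returns how many times the combiner ran during a sequential three-argument reduce.
    static int sequentialCombinerCalls(List<List<Integer>> lists) {
        AtomicInteger calls = new AtomicInteger();
        lists.stream().reduce(0,
                (a, b) -> a + b.get(0),
                (x, y) -> {
                    calls.incrementAndGet(); // record each combiner invocation
                    return x + y;
                });
        return calls.get();
    }

    public static void main(String[] args) {
        List<List<Integer>> lists = Arrays.asList(Arrays.asList(1), Arrays.asList(2));
        // Assumption: OpenJDK's sequential path uses only the accumulator, so this prints 0.
        System.out.println(sequentialCombinerCalls(lists));
    }
}
```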

【1】Reduction
【2】BinaryOperator
【3】Optional
【4】BiFunction
