Java8 Lambda實(shí)現(xiàn)源碼解析

Java8的lambda應(yīng)該大家都比較熟悉了,這篇文章主要從源碼層面探討一下lambda的設(shè)計(jì)和實(shí)現(xiàn)。

先看下面的示例代碼:

    static class A {
        @Getter
        private String a;

        @Getter
        private Integer b;

        public A(String a, Integer b) {
            this.a = a;
            this.b = b;
        }
    }

    public static void main(String[] args) {
        List<Integer> ret = Lists.newArrayList(new A("a", 1), new A("b", 2), new A("c", 3)).stream()
            .map(A::getB)
            .filter(b -> b >= 2)
            .collect(Collectors.toList());
        System.out.println(ret);
    }

上面代碼中,其實(shí)主要就是幾步:

  1. ArrayList.stream
  2. .map
  3. .filter
  4. .collect

一步步來看,ArrayList.stream 實(shí)際調(diào)用的是Collector.stream方法:

    default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

spliterator()方法生成的是 IteratorSpliterator 對(duì)象,spliterator的意思就是可以split的iterator,這個(gè)主要是用于lambda中的parallelStream中的并行操作,上面的例子中由于調(diào)用的是stream,所以parallel=false。

StreamSupport.stream最后生成的是一個(gè)ReferencePipeline.Head對(duì)象:

    public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }

Head類是從ReferencePipeline派生的,表示lambda的pipeline中的頭節(jié)點(diǎn)。

有了這個(gè)Head對(duì)象之后,在它之上調(diào)用.map,實(shí)際上就是調(diào)用了基類ReferencePipeline.map方法:

    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }

返回的是一個(gè)StatelessOp,表示一個(gè)無狀態(tài)的算子,這個(gè)類也是ReferencePipeline的子類,可以看到它的構(gòu)造函數(shù),第一個(gè)參數(shù)this,表示把Head對(duì)象作為StatelessOp對(duì)象的upstream,也就是它的上游。StatelessOp.opWrapSink方法先不講,后面會(huì)講到。

接著調(diào)用StatelessOp.filter方法,也還是會(huì)回到ReferencePipeline.filter方法:

    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }

可以看到,仍然生成的是一個(gè)StatelessOp對(duì)象,只是它的upstream變了而已。

最后調(diào)用StatelessOp.collect,繼續(xù)回到ReferencePipeline.collect方法:

    public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
        A container;
        if (isParallel()
                && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
            container = collector.supplier().get();
            BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
            forEach(u -> accumulator.accept(container, u));
        }
        else {
            container = evaluate(ReduceOps.makeRef(collector));
        }
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
               ? (R) container
               : collector.finisher().apply(container);
    }

在前面幾步,.map, .filter方法其實(shí)都只是創(chuàng)建StatelessOp對(duì)象,但是到collect就不一樣了,了解spark/flink的就知道,collect其實(shí)是個(gè)action/sink,調(diào)用了collect,就會(huì)真實(shí)地觸發(fā)這個(gè)stream上各個(gè)operator的執(zhí)行。這也就是我們經(jīng)常聽到的lazy execution,所有的操作,只有碰到action的算子才會(huì)開始執(zhí)行。

之前講到這個(gè)stream的parallel=false,所以上面的實(shí)際執(zhí)行邏輯是:

A container = evaluate(ReduceOps.makeRef(collector));
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
               ? (R) container
               : collector.finisher().apply(container);
    }

在進(jìn)入evaluate方法之前,先看一下ReduceOps.makeRef(collector),它實(shí)際上就是基于Collectors.toList生成的CollectorImpl實(shí)例包裝了一層,返回了一個(gè) TerminalOp對(duì)象(實(shí)際是ReduceOp)。

    public static <T, I> TerminalOp<T, I>
    makeRef(Collector<? super T, I, ?> collector) {
        Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
        BiConsumer<I, ? super T> accumulator = collector.accumulator();
        BinaryOperator<I> combiner = collector.combiner();
        class ReducingSink extends Box<I>
                implements AccumulatingSink<T, I, ReducingSink> {
            @Override
            public void begin(long size) {
                state = supplier.get();
            }

            @Override
            public void accept(T t) {
                accumulator.accept(state, t);
            }

            @Override
            public void combine(ReducingSink other) {
                state = combiner.apply(state, other.state);
            }
        }
        return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }

            @Override
            public int getOpFlags() {
                return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                       ? StreamOpFlag.NOT_ORDERED
                       : 0;
            }
        };
    }

上面代碼可以看到,基本也就是直接調(diào)用了collector的實(shí)現(xiàn),稍微需要注意的是,ReducingSink從Box派生,Box的意思就是盒子,它里面有個(gè)state成員,表示一個(gè)計(jì)算的狀態(tài)。ReducingSink就是通過這個(gè)state,進(jìn)行combine, accumulate操作(實(shí)際就是一個(gè)List)。

回到evaluate方法,它實(shí)際調(diào)用了:

terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));

這里this就是最后階段的ReferencePipeline,即StatelessOp,這里我們稱它為 ReferencePipeline$2,即經(jīng)過兩個(gè)算子操作的pipeline。

sourceSpliterator 則會(huì)取到sourceStage的spliterator,即最上面Head的spliterator。

ReduceOp.evaluateSequential:

        public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                           Spliterator<P_IN> spliterator) {
            return helper.wrapAndCopyInto(makeSink(), spliterator).get();
        }

helper即ReferencePipeline$2,這里makeSink即上面返回的ReducingSink重載的方法。
ReferencePipeline.wrapAndCopyInto,在其父類AbstractPipeline中實(shí)現(xiàn):

        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;

wrapSink代碼:

    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);

        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
    }

可以看到,這里就是將pipeline從后至前,分別調(diào)用每個(gè)pipeline的opWrapSink方法,就是一個(gè)責(zé)任鏈的模式。opWrapSink可以看上面map的opWrapSink的filter的opWrapSink實(shí)現(xiàn),map的很簡單,直接調(diào)用mapper.apply,實(shí)際上就是A::getB方法,filter的也很簡單,調(diào)用的是 predicate.test 方法。

接下來到copyInto方法,到這里才會(huì)有真正的執(zhí)行邏輯:

    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }

它會(huì)走入到這部分的邏輯中:

wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();

這里面最重要的是就是中間這行了,由于spliterator持有的Collection引用,是ArrayList,因此它會(huì)調(diào)用ArrayList.forEachRemaining方法:

public void forEachRemaining(Consumer<? super E> action) {
    // ...
    if ((i = index) >= 0 && (index = hi) <= a.length) {
       for (; i < hi; ++i) {
           @SuppressWarnings("unchecked") E e = (E) a[i];
           action.accept(e);
       }
       if (lst.modCount == mc)
           return;
   }
    // ...

這里的action參數(shù),就是上面經(jīng)過責(zé)任鏈封裝的Sink(它也是Consumer的子類)。
而這里調(diào)用action.accept,就會(huì)通過責(zé)任鏈來一層層調(diào)用每個(gè)算子的accept,我們從map的accept開始:

@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
    return new Sink.ChainedReference<P_OUT, R>(sink) {
        @Override
        public void accept(P_OUT u) {
            downstream.accept(mapper.apply(u));
        }
    };
}

可以看到,它先調(diào)用mapper.apply,然后把結(jié)果直接傳給downstream.accept,也就是調(diào)用filter的accept,接著來到ReducingSink.accept,也就是往state中添加一個(gè)結(jié)果元素,這樣forEach執(zhí)行完之后,結(jié)果自然就有了。

看完上面的流程,接下來看一下lambda里面部分類設(shè)計(jì),首先來看一下Stream,它的基類是BaseStream,提供以下接口:

public interface BaseStream<T, S extends BaseStream<T, S>>
        extends AutoCloseable {
    /**
     * 返回stream中元素的迭代器
        */
    Iterator<T> iterator();

    /**
     * 返回stream中元素的spliterator,用于并行執(zhí)行
     */
    Spliterator<T> spliterator();

    /**
     * 是否并行
     */
    boolean isParallel();

    /**
     * 返回串行的stream,即強(qiáng)制parallel=false
     */
    S sequential();

    /**
     * 返回并行的stream,即強(qiáng)制parallel=true
     */
    S parallel();

    // ...
}

直接繼承此接口的,是如IntStream, LongStream,DoubleStream等,這些是在BaseStream基礎(chǔ)上,提供了filter, map, mapToObj, distinct等算子的接口,但是這些算子,是限定類型的,如IntStream.filter, 它接受的就是 IntPredicate, 而不是常規(guī)的Predicate;map方法也是,接受的是 IntUnaryOperator。

IntStream, LongStream這些都是接口,也就是僅僅用來描述算子的。它們的實(shí)現(xiàn)都是基于Pipeline的,基類為 AbstractPipeline,它的幾個(gè)關(guān)鍵成員變量:

     /**
      * 最頂上的pipeline,即Head
      */
    private final AbstractPipeline sourceStage;

    /**
     * 直接上游pipeline
     */
    private final AbstractPipeline previousStage;

    /**
     * 直接下游pipeline
     */
    @SuppressWarnings("rawtypes")
    private AbstractPipeline nextStage;

    /**
     * pipeline深度
     */
    private int depth;

    /**
     * head的spliterator
     */
    private Spliterator<?> sourceSpliterator;

     // ...

這個(gè)基類還提供了pipeline的基礎(chǔ)實(shí)現(xiàn),以及對(duì)BaseStream和PipelineHelper接口的實(shí)現(xiàn),如evaluate, sourceStageSpliterator, wrapAndCopyInto, wrapSink等。

類似地,從AbstractPipeline派生的子類有:IntPipeline, LongPipeline, DoublePipeline, ReferencePipeline等。前面三種比較容易理解,提供的是基于原始類型的lambda操作(且都實(shí)現(xiàn)了對(duì)應(yīng)的XXStream接口),而ReferencePipeline提供的是基于對(duì)象的lambda操作。

類層次如下:


image

注意這些子類,也都是abstract的,每一種pipeline下面,都有Head, StatelessOp, StatefulOp三個(gè)子類。分別用于描述pipeline的頭節(jié)點(diǎn),無狀態(tài)中間算子,有狀態(tài)中間算子。
Head是非抽象類,StatelessOp也是抽象類,它在map、filter、mapToObj等算子中,會(huì)動(dòng)態(tài)創(chuàng)建它的匿名子類,并實(shí)現(xiàn)opWrapSink方法。

通過這種設(shè)計(jì),除了collect之外,所有算子的返回結(jié)果都是Stream的子類,在IntPipeline中,map, flatMap, filter等都返回IntStream,即使它們的實(shí)現(xiàn)可能是StatelessOp, Head等,都對(duì)外提供了統(tǒng)一的接口。同時(shí)由于lambda中每個(gè)算子的實(shí)現(xiàn)是動(dòng)態(tài)的,如最上面例子中A::getB, b -> b>=2等,那就通過每個(gè)算子重載 opWrapSink 方法來動(dòng)態(tài)封裝這些邏輯。

同時(shí),通過將XXStream和XXPipeline分開的設(shè)計(jì),可以保持Stream接口的簡潔(對(duì)用戶透出的接口)。否則如果將BaseStream做成抽象類,將AbstractPipeline相關(guān)的邏輯移到這里面,會(huì)導(dǎo)致Stream變得非常臃腫,在API層面用戶使用的時(shí)候也會(huì)很困惑。

創(chuàng)建Pipeline的地方,則統(tǒng)一收口到了StreamSupport類中,這是一個(gè)大的工廠類。雖然ArrayList, Arrays等類中都提供了stream的方法,但是最后都統(tǒng)一調(diào)用了StreamSupport里來創(chuàng)建Pipeline的實(shí)例,通常也就是創(chuàng)建 XXPipeline.Head對(duì)象,然后通過這個(gè)對(duì)象進(jìn)行其他lambda算子的添加。

接下來看一個(gè)相對(duì)比較復(fù)雜的例子,雙流concat的場景,代碼如下:

    static class Mapper1 implements IntUnaryOperator {

        @Override
        public int applyAsInt(int operand) {
            return operand * operand;
        }
    }

    static class Filter1 implements IntPredicate {

        @Override
        public boolean test(int value) {
            return value >= 2;
        }
    }

    static class Mapper2 implements IntUnaryOperator {

        @Override
        public int applyAsInt(int operand) {
            return operand + operand;
        }
    }

    static class Filter2 implements IntPredicate {

        @Override
        public boolean test(int value) {
            return value >= 10;
        }
    }

    static class Mapper3 implements IntUnaryOperator {

        @Override
        public int applyAsInt(int operand) {
            return operand * operand;
        }
    }

    static class Filter3 implements IntPredicate {

        @Override
        public boolean test(int value) {
            return value >= 10;
        }
    }

    public static void main(String[] args) {
        IntStream s1 = Arrays.stream(new int[] {1, 2, 3})
            .map(new Mapper1())
            .filter(new Filter1());

        IntStream s2 = Arrays.stream(new int[] {4, 5, 6})
            .map(new Mapper2())
            .filter(new Filter2());

        IntStream s3 = IntStream.concat(s1, s2)
            .map(new Mapper3())
            .filter(new Filter3());
        int sum = s3.sum();
    }

上面代碼中,先分別創(chuàng)建兩個(gè)IntStream:s1, s2。然后進(jìn)行concat操作,生成s2,最后調(diào)用sum操作做reduce。

代碼分析還是從sink開始,reduce跟前面的collect類似,實(shí)際會(huì)基于s3這個(gè)stream, 在AbstractPipeline.evaluate方法中執(zhí)行:

terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));

這里terminalOp即為sum這個(gè)ReduceOp,sourceSpliterator為Streams.ConcatSpliterator,也即調(diào)用s3這個(gè)pipeline的wrapAndCopyInto方法:

    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }

這里的wrapSink,就會(huì)將s3中的算子與最后的reduce串在一起,大致如下:
Head(concated s1 + s2 stream) -> Mapper3 -> Filter3 -> ReduceOp(sum)

到目前為止,我們還只看到s3的邏輯,那么s1和s2兩個(gè)stream的mapper和filter邏輯在哪里呢,接著看下面的copyInto方法:

    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
            // ...

上面講到,這里的spliterator是Streams.ConcatSpliterator對(duì)象,看下Streams.ConcatSpliterator.forEachRemaining實(shí)現(xiàn):

        public void forEachRemaining(Consumer<? super T> consumer) {
            if (beforeSplit)
                aSpliterator.forEachRemaining(consumer);
            bSpliterator.forEachRemaining(consumer);
        }

這里就區(qū)分出了兩個(gè)不同的流,對(duì)每個(gè)流的spliterator分別調(diào)用forEachRemaining方法,這里的spliterator是IntWrappingSpliterator, 它是對(duì)s1/s2的一個(gè)封裝,它有兩個(gè)關(guān)鍵成員:

            // 包裝的原始pipeline
        final PipelineHelper<P_OUT> ph;

        // 原始pipeline的spliterator
        Spliterator<P_IN> spliterator;

所以就走到了 IntWrappingSpliterator.foreachMaining方法中:

        public void forEachRemaining(IntConsumer consumer) {
            if (buffer == null && !finished) {
                Objects.requireNonNull(consumer);
                init();

                ph.wrapAndCopyInto((Sink.OfInt) consumer::accept, spliterator);
                finished = true;
            }
            // ...

可以看到,又調(diào)用了原始pipeline的wrapAndCopyInto方法中,而這里的consumer即為上面s3的邏輯。這樣又遞歸回到了AbstractPipeline.wrapAndCopyInto -> AbstractPipeline.wrapSink
-> AbstractPipeline.copyInto方法中,而在這時(shí)的wrapSink中,現(xiàn)在的pipeline就是s1/s2了,這時(shí)就會(huì)對(duì)s1/s2下面的所有算子,調(diào)用AbstractPipeline.opWrapSink串聯(lián)起來,以s1為例就變成:
Head(array[1,2,3]) -> Mapper1 -> Filter1 -> Mapper3 -> Filter3 -> ReduceOp(sum)

這樣s1流跟s3流就串起來執(zhí)行完成了,然后就是s2和s3流串起來執(zhí)行。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容