RxJava2源碼學習總結(jié)(二)Map講解

昨天總結(jié)了RxJava2中Observable.create().subscribe()源碼,沒看的小伙伴可以看看昨天的博客地址,今天我們來學習下map,如果看懂了昨天的,那么今天的這個map很快就吸收了,我先放張圖片,思想跟昨天的一樣

圖片1.png
Observable.create(
                //===============這里是第一部分A===============
                new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> e) throws Exception {
                        e.onNext("你好世界");
                        e.onComplete();
                    }
                }
                //===============這里是第一部分A-結(jié)束===============
        ).map(
                //===============這里是第二部分B===============
                new Function<String, Integer>() {
                    @Override
                    public Integer apply(String s) {
                        return Integer.parseInt(s);
                    }
                    //===============這里是第二部分B===============
                }).subscribe(
                //===============這里是第三部分c==============
                new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Integer value) {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onComplete() {
                    }
                }
                //===============這里是第三部分c==============
        );

分為3大部分A B C,關(guān)于A部分的我就不講解了,因為昨天已經(jīng)涉及到了,今天講解B部分
老規(guī)矩看看.map里面的源碼

 public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
      //繼續(xù)點擊ObservableMap,這里的this表示的上游的Observable,也就是我們的A
      //因為我們調(diào)用的是 Observable.map方法,所以mapper表示的就是B部分了
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

Function接口代碼,這個沒什么說的

public interface Function<T, R> {
    R apply(T t) throws Exception;
}

ObservableMap類代碼:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
      //保存了上游Observable
        super(source);
        this.function = function;//我們的B部分
    }

這里我們看到了AbstractObservableWithUpstream,它是一個抽象類,其實它也是Observable因為他繼承了Observable

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
    protected final ObservableSource<T> source;
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }
    @Override
    public final ObservableSource<T> source() {
        return source;
    }
}

所以我們可以把AbstractObservableWithUpstream當做是Observable。
剛剛我們的this,傳遞給了ObservableSource,ObservableSource它是一個接口來著

public interface ObservableSource<T> {
    void subscribe(Observer<? super T> observer);
}

剛好我們Observable這個抽象類實現(xiàn)了ObservableSource這個接口

public abstract class Observable<T> implements ObservableSource<T>{
 //....省略很多代碼,貼出實現(xiàn)方法
   //等會我們講解Observable.create().map.subscribe()的時候會講到這個方法
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            //subscribeActual這個方法很重要,跟我們昨天講的Observable.create().subscrible調(diào)用一樣
            subscribeActual(observer);
        } catch (NullPointerException e) { 
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            RxJavaPlugins.onError(e);
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

記得開頭我的圖片么,我們還有一個C部分沒說,它是Observable.create().map.subscribe(new Observer<Integer>).....onNext ...onError ....省略....
我們點擊這個subscribe進入,不就上面我們說的那部分代碼么?在subscribe方法里面它就會執(zhí)行subscribeActual(observer);再次點擊這個subscribeActual,發(fā)現(xiàn)是一個抽象類

    protected abstract void subscribeActual(Observer<? super T> observer);

那么它的實現(xiàn)在哪呢?大家記得ObservableMap這個類么?
它繼承了AbstractObservableWithUpstream,AbstractObservableWithUpstream繼承Observable,所以實現(xiàn)類就在ObservableMap里面了,如下代碼

//我們又回到了ObservableMap類里面了
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }
    @Override
    public void subscribeActual(Observer<? super U> t) {
        //接下來會去執(zhí)行new MapObserver<T, U>(t, function),
          //然后再source.subscribe()(這里關(guān)鍵了),會去調(diào)用A部分的subscribeActual,并將MapObserver傳遞給CreateEmitter里面的observer
        source.subscribe(new MapObserver<T, U>(t, function));
    }
//
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;
        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }
         U v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

    //省略..............
}

//這里有個BasicFuseableObserver,我們看看代碼
abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R>{
    //省略..............
}

上面代碼里面我們知道它進入了subscribeActual方法后,回去new MapObserver,并且傳遞2參數(shù) t和function,t是什么不就是我們的C部分么
,function是什么,不就是我們的B部分么,MapObserver 它繼承了BasicFuseableObserver,BasicFuseableObserver里面是實現(xiàn)了observer的接口,也就是實現(xiàn)了onComplete onError.......這些方法,沒有實現(xiàn)onNext, onNext的實現(xiàn),交給了MapObserver它是BasicFuseableObserver的繼承者。看看BasicFuseableObserver里面的實現(xiàn)Observer接口

BasicFuseableObserver的類
  @Override
    public void onError(Throwable t) {
        if (done) {
            RxJavaPlugins.onError(t);
            return;
        }
        done = true;
        actual.onError(t);
    }

 @Override
    public void onComplete() {
        if (done) {
            return;
        }
        done = true;
        actual.onComplete();
    }

回過頭來繼續(xù)看ObservableMap類里面的代碼

ObservableMap類
  public void subscribeActual(Observer<? super U> t) {
        //接下來會去執(zhí)行new MapObserver<T, U>(t, function),
          //然后再source.subscribe()(這里關(guān)鍵了)
        source.subscribe(new MapObserver<T, U>(t, function));
    }

我們剛剛分析完了new MapObserver<T, U>(t, function),現(xiàn)在到了 source.subscribe,source是啥呢?它是我們的A部分代碼就是我們傳遞進來的this,當它去調(diào)用source.subscribe(new MapObserver<T, U>(t, function));,也就是Observable調(diào)用的subscribe,那么進一步調(diào)用抽象類subscribeActual

ObservableCreate 源碼
 @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
        private static final long serialVersionUID = -3434801548987643227L;
        final Observer<? super T> observer;
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;    //這里是將MapObserver傳遞進來了,
        }
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);//這里調(diào)用的是ObservableMap里面的onNext,
                                  //因為MapObserver實現(xiàn)了Observer的接口onNext
            }
        }
        @Override
        public void onError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
            } else {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

我們在Activity里面這樣調(diào)用的

   Observable.create(
                        //===============這里是第一部分A===============
            new ObservableOnSubscribe<String>() {
                      @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
//這里會調(diào)用CreateEmitter里面的onNext方法,
//因為它實現(xiàn)了ObservableEmitter
//最終會調(diào)用ObservableMap里面的onNext
                  e.onNext("1"); 
                  e.onComplete();
                   }
             }
                        //===============這里是第一部分A-結(jié)束===============
                ).map(
                       .....省略
                );
                    .....省略
MapObserver類
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }
          //直接調(diào)用這里
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }
            U v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);  //這里的actual就是C部分的代碼
        }

actual始終表示我們的下一級
總的來說流程是這樣的
Observable.create(A).map(B).subscribe(C)
從create(A)---->進入到Observable,點擊new ObservableCreate<T>(source)---->進入到ObservableCreate,ObservableCreate繼承了Observable,所以里面必須要實現(xiàn)一個抽象類subscribeActual,此時我們的值賦值給了this.source。

從map(B)----->進入到Observable,點擊new ObservableMap<T, R>(this, mapper)------>進入到ObservableMap,我們將 Observable.create(A),傳遞給了source,將B傳遞給了function,ObservableMap繼承了AbstractObservableWithUpstream同時也繼承了Observable,中規(guī)中矩我們可以認為他是一個Observable,所以它必須實現(xiàn)subscribeActual,source表示的是我們 Observable.create(A),t 就表示我們的C部分。

從subscribe(C)----->進入到Observable,發(fā)現(xiàn)里面有subscribeActual,它是一個抽象類,它的實現(xiàn)在ObservableMap完成,所以此時我們的注意力應該在ObservableMap的subscribeActual的MapObserver類里面,t 在上面已經(jīng)描述了它是C部分,function是B,那么這樣傳遞下來后,在MapObserver的構(gòu)造函數(shù)里,t傳給了actual為=C,function傳給了mapper=B。我們要知道MapObserver類的繼承類BasicFuseableObserver實現(xiàn)了Observer,所以它實現(xiàn)了onError,onComplete,偏偏沒有實現(xiàn)onNext,它的實現(xiàn)就在MapObserver類里面。

  @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

source是我們的A部分,其實就是調(diào)用A類的Observable

 @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            subscribeActual(observer);//看到這里沒有,調(diào)用這個抽象類
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            RxJavaPlugins.onError(e);
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

subscribeActual(observer);注意這里的這句代碼,然后就會調(diào)用ObservableCreate里面的subscribeActual

 @Override
    protected void subscribeActual(Observer<? super T> observer) {
//CreateEmitter實現(xiàn)了ObservableEmitter,
//當我們在A中調(diào)用onNext其實就是調(diào)用我媽媽的CreateEmitter實現(xiàn)的方法
//這里面的observer是我們的MapObserver
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
        private static final long serialVersionUID = -3434801548987643227L;
        final Observer<? super T> observer;
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;//我們的MapObserver
        }
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);//調(diào)用MapObserver的接口
            }
        }

        @Override
        public void onError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
            } else {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

關(guān)系圖.png

好了今天的map講完了,謝謝大家。
下一篇文章subscribeOn:http://www.itdecent.cn/p/4adb8df73a9c

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容