昨天總結(jié)了RxJava2中Observable.create().subscribe()源碼,沒看的小伙伴可以看看昨天的博客地址,今天我們來學習下map,如果看懂了昨天的,那么今天的這個map很快就吸收了,我先放張圖片,思想跟昨天的一樣

Observable.create(
//===============這里是第一部分A===============
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("你好世界");
e.onComplete();
}
}
//===============這里是第一部分A-結(jié)束===============
).map(
//===============這里是第二部分B===============
new Function<String, Integer>() {
@Override
public Integer apply(String s) {
return Integer.parseInt(s);
}
//===============這里是第二部分B===============
}).subscribe(
//===============這里是第三部分c==============
new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
}
//===============這里是第三部分c==============
);
分為3大部分A B C,關(guān)于A部分的我就不講解了,因為昨天已經(jīng)涉及到了,今天講解B部分
老規(guī)矩看看.map里面的源碼
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
//繼續(xù)點擊ObservableMap,這里的this表示的上游的Observable,也就是我們的A
//因為我們調(diào)用的是 Observable.map方法,所以mapper表示的就是B部分了
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
Function接口代碼,這個沒什么說的
public interface Function<T, R> {
R apply(T t) throws Exception;
}
ObservableMap類代碼:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
//保存了上游Observable
super(source);
this.function = function;//我們的B部分
}
這里我們看到了AbstractObservableWithUpstream,它是一個抽象類,其實它也是Observable因為他繼承了Observable
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
protected final ObservableSource<T> source;
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
所以我們可以把AbstractObservableWithUpstream當做是Observable。
剛剛我們的this,傳遞給了ObservableSource,ObservableSource它是一個接口來著
public interface ObservableSource<T> {
void subscribe(Observer<? super T> observer);
}
剛好我們Observable這個抽象類實現(xiàn)了ObservableSource這個接口
public abstract class Observable<T> implements ObservableSource<T>{
//....省略很多代碼,貼出實現(xiàn)方法
//等會我們講解Observable.create().map.subscribe()的時候會講到這個方法
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//subscribeActual這個方法很重要,跟我們昨天講的Observable.create().subscrible調(diào)用一樣
subscribeActual(observer);
} catch (NullPointerException e) {
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
記得開頭我的圖片么,我們還有一個C部分沒說,它是Observable.create().map.subscribe(new Observer<Integer>).....onNext ...onError ....省略....
我們點擊這個subscribe進入,不就上面我們說的那部分代碼么?在subscribe方法里面它就會執(zhí)行subscribeActual(observer);再次點擊這個subscribeActual,發(fā)現(xiàn)是一個抽象類
protected abstract void subscribeActual(Observer<? super T> observer);
那么它的實現(xiàn)在哪呢?大家記得ObservableMap這個類么?
它繼承了AbstractObservableWithUpstream,AbstractObservableWithUpstream繼承Observable,所以實現(xiàn)類就在ObservableMap里面了,如下代碼
//我們又回到了ObservableMap類里面了
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
//接下來會去執(zhí)行new MapObserver<T, U>(t, function),
//然后再source.subscribe()(這里關(guān)鍵了),會去調(diào)用A部分的subscribeActual,并將MapObserver傳遞給CreateEmitter里面的observer
source.subscribe(new MapObserver<T, U>(t, function));
}
//
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
//省略..............
}
//這里有個BasicFuseableObserver,我們看看代碼
abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R>{
//省略..............
}
上面代碼里面我們知道它進入了subscribeActual方法后,回去new MapObserver,并且傳遞2參數(shù) t和function,t是什么不就是我們的C部分么
,function是什么,不就是我們的B部分么,MapObserver 它繼承了BasicFuseableObserver,BasicFuseableObserver里面是實現(xiàn)了observer的接口,也就是實現(xiàn)了onComplete onError.......這些方法,沒有實現(xiàn)onNext, onNext的實現(xiàn),交給了MapObserver它是BasicFuseableObserver的繼承者。看看BasicFuseableObserver里面的實現(xiàn)Observer接口
BasicFuseableObserver的類
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
done = true;
actual.onError(t);
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
actual.onComplete();
}
回過頭來繼續(xù)看ObservableMap類里面的代碼
ObservableMap類
public void subscribeActual(Observer<? super U> t) {
//接下來會去執(zhí)行new MapObserver<T, U>(t, function),
//然后再source.subscribe()(這里關(guān)鍵了)
source.subscribe(new MapObserver<T, U>(t, function));
}
我們剛剛分析完了new MapObserver<T, U>(t, function),現(xiàn)在到了 source.subscribe,source是啥呢?它是我們的A部分代碼就是我們傳遞進來的this,當它去調(diào)用source.subscribe(new MapObserver<T, U>(t, function));,也就是Observable調(diào)用的subscribe,那么進一步調(diào)用抽象類subscribeActual
ObservableCreate 源碼
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer; //這里是將MapObserver傳遞進來了,
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);//這里調(diào)用的是ObservableMap里面的onNext,
//因為MapObserver實現(xiàn)了Observer的接口onNext
}
}
@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
我們在Activity里面這樣調(diào)用的
Observable.create(
//===============這里是第一部分A===============
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//這里會調(diào)用CreateEmitter里面的onNext方法,
//因為它實現(xiàn)了ObservableEmitter
//最終會調(diào)用ObservableMap里面的onNext
e.onNext("1");
e.onComplete();
}
}
//===============這里是第一部分A-結(jié)束===============
).map(
.....省略
);
.....省略
MapObserver類
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
//直接調(diào)用這里
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v); //這里的actual就是C部分的代碼
}
actual始終表示我們的下一級
總的來說流程是這樣的
Observable.create(A).map(B).subscribe(C)
從create(A)---->進入到Observable,點擊new ObservableCreate<T>(source)---->進入到ObservableCreate,ObservableCreate繼承了Observable,所以里面必須要實現(xiàn)一個抽象類subscribeActual,此時我們的值賦值給了this.source。
從map(B)----->進入到Observable,點擊new ObservableMap<T, R>(this, mapper)------>進入到ObservableMap,我們將 Observable.create(A),傳遞給了source,將B傳遞給了function,ObservableMap繼承了AbstractObservableWithUpstream同時也繼承了Observable,中規(guī)中矩我們可以認為他是一個Observable,所以它必須實現(xiàn)subscribeActual,source表示的是我們 Observable.create(A),t 就表示我們的C部分。
從subscribe(C)----->進入到Observable,發(fā)現(xiàn)里面有subscribeActual,它是一個抽象類,它的實現(xiàn)在ObservableMap完成,所以此時我們的注意力應該在ObservableMap的subscribeActual的MapObserver類里面,t 在上面已經(jīng)描述了它是C部分,function是B,那么這樣傳遞下來后,在MapObserver的構(gòu)造函數(shù)里,t傳給了actual為=C,function傳給了mapper=B。我們要知道MapObserver類的繼承類BasicFuseableObserver實現(xiàn)了Observer,所以它實現(xiàn)了onError,onComplete,偏偏沒有實現(xiàn)onNext,它的實現(xiàn)就在MapObserver類里面。
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
source是我們的A部分,其實就是調(diào)用A類的Observable
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);//看到這里沒有,調(diào)用這個抽象類
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
subscribeActual(observer);注意這里的這句代碼,然后就會調(diào)用ObservableCreate里面的subscribeActual
@Override
protected void subscribeActual(Observer<? super T> observer) {
//CreateEmitter實現(xiàn)了ObservableEmitter,
//當我們在A中調(diào)用onNext其實就是調(diào)用我媽媽的CreateEmitter實現(xiàn)的方法
//這里面的observer是我們的MapObserver
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;//我們的MapObserver
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);//調(diào)用MapObserver的接口
}
}
@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}

好了今天的map講完了,謝謝大家。
下一篇文章subscribeOn:http://www.itdecent.cn/p/4adb8df73a9c