本文的分析基于RxJava1.1.5版本,flatMap是為了一對(duì)多的轉(zhuǎn)換而設(shè)計(jì)的,具體的實(shí)現(xiàn)運(yùn)用了merge和map的操作,而最終也還是基于了lift()方法,是轉(zhuǎn)換的思想,下面是具體的分析
1、首先創(chuàng)建一個(gè)簡(jiǎn)單的例子,代碼如下
final List<Student> students = new ArrayList<>();
List<Course> jayList = new ArrayList<>();
jayList.add(new Course("語(yǔ)文", "何炅"));
jayList.add(new Course("英語(yǔ)", "謝娜"));
jayList.add(new Course("物理", "何時(shí)風(fēng)"));
students.add(new Student(1, "周杰倫", jayList));
List<Course> jjList = new ArrayList<>();
jjList.add(new Course("數(shù)學(xué)", "鄧軍權(quán)"));
jjList.add(new Course("生物", "搖風(fēng)"));
jjList.add(new Course("物理", "何時(shí)風(fēng)"));
jjList.add(new Course("語(yǔ)文", "何炅"));
students.add(new Student(2, "林俊杰", jjList));
List<Course> luhanList = new ArrayList<>();
luhanList.add(new Course("英語(yǔ)", "謝娜"));
luhanList.add(new Course("生物", "搖風(fēng)"));
luhanList.add(new Course("語(yǔ)文", "何炅"));
students.add(new Student(3, "鹿晗", luhanList));
Observable.create(new Observable.OnSubscribe<Student>() {
@Override
public void call(Subscriber<? super Student> subscriber) {
for (Student s : students) {
subscriber.onNext(s);
}
}
}).flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
Log.e("TAG", "學(xué)生名稱為:" + student.getName());
return Observable.from(student.getmList());
}
}).subscribe(new Subscriber<Course>() {
@Override
public void onCompleted() {
Log.e("TAG", "---onComplete()------");
}
@Override
public void onError(Throwable e) {
Log.e("TAG", "---onError()------");
}
@Override
public void onNext(Course course) {
Log.e("TAG", "課程名稱為:" + course.getCourseName() + ", 任課老師為:" + course.getTechName());
}
});
以上用到的Student類還有Course類如下
class Student {
private int id;
private String name;
private List<Course> mList;
public Student(int id, String name, List<Course> mList) {
this.id = id;
this.name = name;
this.mList = mList;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public List<Course> getmList() {
return mList;
}
public void setmList(List<Course> mList) {
this.mList = mList;
}
}
class Course {
private String courseName;
private String techName;
public Course(String courseName, String techName) {
this.courseName = courseName;
this.techName = techName;
}
public String getCourseName() {
return courseName;
}
public void setCourseName(String courseName) {
this.courseName = courseName;
}
public String getTechName() {
return techName;
}
public void setTechName(String techName) {
this.techName = techName;
}
}
2、下面是具體的分析
首先進(jìn)入到flatMap()方法中,flatMap的代碼如下
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
if (getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
}
return merge(map(func));
}
判斷直接跳過(guò),主要看返回值,返回值調(diào)用了merge()方法,并且以map()方法的返回值作為參數(shù),那么我們首先進(jìn)入到map()方法中看看
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
這個(gè)方法中將會(huì)調(diào)用以func1對(duì)象為參數(shù),創(chuàng)建OperatorMap對(duì)象,然后將OperatorMap對(duì)象作為參數(shù)調(diào)用lift()方法,那么進(jìn)入到lift()方法看看
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
這個(gè)方法中將以初始被觀察者對(duì)象中的onSubscribe(本文中我們將初始被觀察者對(duì)象稱為ob_init,將ob_init中的onSubscribe稱為onSub_init)和OperatorMap對(duì)象為參數(shù)創(chuàng)建第一個(gè)OnSubscribeLift對(duì)象(稱為onSublift_one),同時(shí)以onSublift_one為參數(shù)創(chuàng)建新的被觀察者對(duì)象(稱為ob_one),那么到此map完畢,它將將ob_one返回作為merge()方法的參數(shù),那么下面進(jìn)入到merge()方法中
public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
if (source.getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
}
return source.lift(OperatorMerge.<T>instance(false));
}
在merge()方法中,前面判斷忽略,直接看返回值,發(fā)現(xiàn)它將用ob_one去調(diào)用lift()方法,并且會(huì)創(chuàng)建OperatorMerge對(duì)象作為lift()方法的參數(shù),那么通過(guò)看前面lift()方法的作用,我們可以知道,它將會(huì)以O(shè)peratorMerge對(duì)象和ob_one中的onSubscribe作為參數(shù)再次創(chuàng)建新的OnSubscribeLift對(duì)象(稱為onSublift_merge),同時(shí)會(huì)以onSublift_merge作為參數(shù),再次創(chuàng)建新的被觀察者對(duì)象(稱為ob_merge),那么現(xiàn)在我們就可以知道,flatMap()方法的最終返回值為ob_merge對(duì)象,那么下面ob_merge將會(huì)調(diào)用訂閱方法subscribe(),并且會(huì)傳入初始觀察者對(duì)象(稱為sub_init),那么下面進(jìn)入到subscribe()中看看
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
}
// new Subscriber so onStart it
subscriber.onStart();
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
異常忽略...
}
}
subscribe()最終會(huì)調(diào)用靜態(tài)的subscribe()方法,傳入的參數(shù)為sub_init對(duì)象和ob_merge對(duì)象,忽略掉前面的判斷直接到hook.onSubscribeStart(observable,observable.onSubscribe).call(subscriber)這一句,在這里onSubscribeStart方法將原路返回傳入的observable.onSubscribe,那么傳入的傳入的observable.onSubscribe其實(shí)就是ob_merge中的onSubscribe,那么它調(diào)用的call()方法應(yīng)該就是onSublift_merge對(duì)象中的call()方法,也就是OnSubscribeLift類中的call()方法,傳入的參數(shù)為sub_init,下面進(jìn)入到該call()方法看看
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
在這個(gè)方法中,onLift()方法將會(huì)將傳入的參數(shù)原路返回,也就是返回值就是傳入的operator,這個(gè)operator就是在創(chuàng)建onSublift_merge對(duì)象時(shí)保存的operator,也就是OperatorMerge對(duì)象,那么也就是會(huì)調(diào)用OperatorMerge對(duì)象中的call()方法,傳入的參數(shù)是sub_init,下面進(jìn)入到OperatorMerge對(duì)象中的call()方法
@Override
public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
MergeProducer<T> producer = new MergeProducer<T>(subscriber);
subscriber.producer = producer;
child.add(subscriber);
child.setProducer(producer);
return subscriber;
}
在這個(gè)方法中的主要作用就是將sub_init對(duì)象進(jìn)行包裝,重新創(chuàng)建一個(gè)觀察者對(duì)象(稱為sub_merge),并且返回該對(duì)象,那么在OnSubscribeLift類中的call()方法中的Subscriber<? super T> st = hook.onLift(operator).call(o)這個(gè)操作所創(chuàng)建的觀察者對(duì)象就為sub_merge,接著call()方法會(huì)執(zhí)行,parent.call(st),這里傳入的參數(shù)就是sub_merge,但是這里需要特別注意,parent的值為ob_one對(duì)象中的onSubscribe,也就是在利用map()方法創(chuàng)建的被觀察者對(duì)象中的onSubscribe,那么它調(diào)用的call()方法就是OnSubscribeLift類中的call()方法,所以程序?qū)⒃俅螆?zhí)行OnSubscribeLift類中的call()方法,這次傳入的參數(shù)是sub_merge,那么這次的operator就是OperatorMap對(duì)象,那么它以sub_merge為參數(shù)調(diào)用call()方法,調(diào)用的就是OperatorMap類中的方法,下面進(jìn)入到該方法
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
return parent;
}
在這個(gè)方法中,將會(huì)以sub_merge和func1對(duì)象(transformer保存的就是func1對(duì)象)為參數(shù)創(chuàng)建新的觀察者對(duì)象(稱為sub_one),并且返回,那么在OnSubscribeLift類中的call()方法中返回的對(duì)象將是sub_one,那么繼續(xù)往下執(zhí)行,將再次來(lái)到parent.call(st),那么這次的st就是sub_one,parent就是ob_init對(duì)象中的onSubscribe,也就是初始被觀察者對(duì)象中的onSubscribe,那么它調(diào)用的call()方法,將會(huì)回到一下代碼
public void call(Subscriber<? super Student> subscriber) {
for (Student s : students) {
subscriber.onNext(s);
}
}
現(xiàn)在的觀察者對(duì)象已經(jīng)是sub_one,那么它調(diào)用的onNext()方法就是OperatorMap類中的靜態(tài)內(nèi)部類MapSubscriber中的onNext()方法,那么進(jìn)入到該方法
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
在這個(gè)方法中,主要就是result = mapper.call(t)這個(gè)操作,這里的mapper就是func1對(duì)象,那么func1對(duì)象調(diào)用的call()方法有回到了我們開(kāi)始flatMap中的回調(diào)call()方法,它將會(huì)返回一個(gè)Observable對(duì)象,那么接著會(huì)調(diào)用actual.onNext(result),這里的actual就是sub_merge對(duì)象,也就是OperatorMerge類中創(chuàng)建的MergeSubscriber對(duì)象,那么調(diào)用它的onNext()方法,我們進(jìn)入到它的onNext()方法看看,傳入的參數(shù)是func1對(duì)象返回的Observable對(duì)象
@Override
public void onNext(Observable<? extends T> t) {
if (t == null) {
return;
}
if (t == Observable.empty()) {
emitEmpty();
} else
if (t instanceof ScalarSynchronousObservable) {
tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
} else {
InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
addInner(inner);
t.unsafeSubscribe(inner);
emit();
}
}
在這個(gè)方法中,忽略掉前面的判斷,直接進(jìn)入else分析,這里InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++)將會(huì)以sub_merge和uniqueId為參數(shù)再次創(chuàng)建一個(gè)觀察者對(duì)象(稱為inner_sub),然后t.unsafeSubscribe(inner)這個(gè)操作,因?yàn)閠為func1對(duì)象所返回的Observable對(duì)象,所以將會(huì)將inner_sub為參數(shù),調(diào)用unsafeSubscribe()方法,那么進(jìn)入到unsafeSubscribe()方法
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
// new Subscriber so onStart it
subscriber.onStart();
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
return Subscriptions.unsubscribed();
}
}
在這個(gè)方法中主要看hook.onSubscribeStart(this, onSubscribe).call(subscriber)這里,這里的onSubscribe就是func1對(duì)象返回的Observable對(duì)象中的onSubscribe,所以調(diào)用它的call()方法,那么在這個(gè)call()方法中肯定會(huì)調(diào)用subscribe.onNext()方法,那么這個(gè)subscriber就是傳進(jìn)來(lái)的參數(shù),也就是inner_sub,那么將會(huì)調(diào)用inner_sub中的onNext()方法,也就是InnerSubscriber類中的onNext()方法,下面進(jìn)入到該方法
public void onNext(T t) {
parent.tryEmit(this, t);
}
在這個(gè)方法中的parent就是創(chuàng)建inner_sub時(shí)的傳入的父級(jí)觀察者對(duì)象,也就是MergeSubscriber對(duì)象,也就是sub_merge,那么調(diào)用該對(duì)象的tryEmit()方法,下面進(jìn)入該方法,傳入的參數(shù)是inner_sub,t ( t為最終輸出的數(shù)據(jù))
void tryEmit(T value) {
boolean success = false;
long r = producer.get();
if (r != 0L) {
synchronized (this) {
// if nobody is emitting and child has available requests
r = producer.get();
if (!emitting && r != 0L) {
emitting = true;
success = true;
}
}
}
if (success) {
emitScalar(value, r);
} else {
queueScalar(value);
}
}
這里的關(guān)鍵句在 emitScalar(value, r)這里,它將會(huì)將最終需要輸出的值和r作為參數(shù)調(diào)用emitScalar()方法,下面進(jìn)入到emitScalar()方法
protected void emitScalar(T value, long r) {
boolean skipFinal = false;
try {
try {
child.onNext(value);
} catch (Throwable t) {
if (!delayErrors) {
Exceptions.throwIfFatal(t);
skipFinal = true;
this.unsubscribe();
this.onError(t);
return;
}
getOrCreateErrorQueue().offer(t);
}
if (r != Long.MAX_VALUE) {
producer.produced(1);
}
int produced = scalarEmissionCount + 1;
if (produced == scalarEmissionLimit) {
scalarEmissionCount = 0;
this.requestMore(produced);
} else {
scalarEmissionCount = produced;
}
// check if some state changed while emitting
synchronized (this) {
skipFinal = true;
if (!missed) {
emitting = false;
return;
}
missed = false;
}
} finally {
if (!skipFinal) {
synchronized (this) {
emitting = false;
}
}
}
emitLoop();
}
在這個(gè)方法中的最主要的操作就是child.onNext(value)這個(gè)了,在這里,終于看到了child,這個(gè)child就是初始的觀察者,也就是我們一開(kāi)始創(chuàng)建的觀察者,那么它調(diào)用onNext()方法就是以下代碼
public void onNext(Course course) {
Log.e("TAG", "課程名稱為:" + course.getCourseName() + ", 任課老師為:" + course.getTechName());
}
這個(gè)方法就是我們自己創(chuàng)建觀察者對(duì)象時(shí)的回調(diào)方法,就是最終的調(diào)用方法,到這里,整個(gè)流程也就打通了,因?yàn)檫@是正常情況下的流程,所以忽略了很多的判斷和特殊的情況,最后,這個(gè)過(guò)程實(shí)在是有點(diǎn)復(fù)雜,所以,可能描寫的有點(diǎn)亂,望見(jiàn)諒