根據(jù)自己的理解,公司用的還是Rxjava 1.x,不過(guò)很久以前使用過(guò),沒(méi)有過(guò)多注意這一塊,不過(guò)還是想弄一下,周末的時(shí)候,主要看了 簡(jiǎn)單使用,如何切換線程,并且去理解這個(gè)過(guò)程
操作符什么的后面理解的時(shí)候,再談?wù)摚冗M(jìn)行就簡(jiǎn)單的create。重點(diǎn)切換線程
一、準(zhǔn)備
查看源碼的版本:
implementation 'io.reactivex:rxjava:1.2.1'
這邊先不糾結(jié)于 Rxjava 2.x,這個(gè)可能后續(xù)去看Rxjava2.x的時(shí)候,再去討論了。
二、進(jìn)入正題
2.1 簡(jiǎn)單使用
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
System.out.println("Observable.OnSubscribe : " + Thread.currentThread());
subscriber.onNext("sss");
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.io())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s + " " + Thread.currentThread());
}
});
簡(jiǎn)單就是這么的使用,打印如下:
Observable.OnSubscribe: Thread[RxNewThreadScheduler-1,5,main]
sss Thread[RxIoScheduler-2,5,main]
2.2 查看subscribeOn的源碼
//rx.Observable.java
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
//1
//rx.Observable 里面的成員create方法
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
-
這里直接創(chuàng)建了一個(gè)
OperatorSubscribeOn, 然后傳遞了一個(gè)this,然后把scheduler線程切換這里注意,這個(gè)
this我感覺(jué)比較關(guān)鍵,把當(dāng)前的Observable對(duì)象傳遞了進(jìn)去;當(dāng)前OperatorSubscribeOn還是通過(guò)Observable的create()方法創(chuàng)建Observable對(duì)象,也就是說(shuō),OperatorSubscribeOn是一個(gè)Observable.OnSubscribe對(duì)象,并且里面含有上一個(gè)Observable對(duì)象
進(jìn)入到 OperatorSubscribeOn類(lèi)中
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
//成員變量
final Scheduler scheduler;
final Observable<T> source;
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
//1.
subscriber.add(inner);
inner.schedule(new Action0() {
@Override
public void call() {
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onCompleted() {
subscriber.onCompleted();
}
//...省略代碼
};
//2.
source.unsafeSubscribe(s);
}
});
}
}
-
其實(shí)就是從
scheduler中創(chuàng)建一個(gè)任務(wù),然后通過(guò)schedule()方法進(jìn)行執(zhí)行。我是這么理解的,相當(dāng)于一個(gè)線程池執(zhí)行
//inner.schedule()類(lèi)比下面 ThreadPool.execute(new Runnable() { @Override public void run() { //執(zhí)行任務(wù) } };具體是
scheduler先不再這個(gè)討論,我這邊現(xiàn)在也還沒(méi)再看(因?yàn)槲沂莻€(gè)菜雞) source.unsafeSubscribe(s)這段語(yǔ)句相當(dāng)于source.subscribe(s),因?yàn)樵创a里面調(diào)用subscribe(s)的時(shí)候也把我們的Subscriber對(duì)象轉(zhuǎn)成了SafeSubscriber的對(duì)象。
到這里發(fā)現(xiàn),subscribeOn相當(dāng)于下面代碼
private Observable<T> source;
ThreadPool.execute(new Runnable() {
@Override
public void run() {
Subscriber<T> subscriber = new Subscriber<T>() {
@Override
void onError(Throwable t) {
sub.onError(t);
}
@Override
void onNext(T t) {
sub.onNext(t);
}
@Override
void onCompleted() {
sub.onCompleted();
}
};
source.subscribe(subscriber);
}
};
2.3 分析為什么subscribeOn多次調(diào)用只有第一次有用
其實(shí) 2.2 查看subscribeOn的源碼已經(jīng)得出了結(jié)論,是在線程里面通過(guò)代理了上一個(gè)的Observable<T>對(duì)象,也就是說(shuō),上游的被當(dāng)前線程池里面的線程接管了。
XXX.subscribeOn(Schedulers.AAA).subscribeOn(Schedulers.BBB).subscribe()
就拿這兩層來(lái)說(shuō),因?yàn)楸旧?code>subscribeOn就是代理上一個(gè)的Observable<T>對(duì)象,從后面往前面看,就是說(shuō),BBB這個(gè)線程池,要給前一個(gè)AAA的Observable<T>對(duì)象,如下偽代碼:
private Observable<T> sourceAAA;
ThreadPoolBBB.execute(new Runnable() {
@Override
public void run() {
Subscriber<T> subscriber = new Subscriber<T>() {
//...省略
@Override
void onNext(T t) {
//這個(gè)是BBB線程的執(zhí)行
sub.onNext(t);
}
//...省略
};
sourceAAA.subscribe(subscriber);
}
};
而AAA的前面是XXX,那偽代碼應(yīng)該如下:
private Observable<T> sourceXXX;
ThreadPoolAAA.execute(new Runnable() {
@Override
public void run() {
Subscriber<T> subscriber = new Subscriber<T>() {
//...省略
@Override
void onNext(T t) {
//這個(gè)是AAA線程的執(zhí)行
sub.onNext(t);
}
//...省略
};
sourceXXX.subscribe(subscriber);
}
};
好了,上面好像還是看不太出來(lái),然后我們組合一下偽代碼來(lái)看看
private Observable<T> sourceAAA;
ThreadPoolBBB.execute(new Runnable() {
@Override
public void run() {
Subscriber<T> subscriber = new Subscriber<T>() {
//...省略
@Override
void onNext(T t) {
//這個(gè)是BBB線程的執(zhí)行
sub.onNext {
//偽代碼遷移,內(nèi)部執(zhí)行
private Observable<T> sourceXXX;
ThreadPoolAAA.execute(new Runnable() {
@Override
public void run() {
Subscriber<T> subscriber = new Subscriber<T>() {
//...省略
@Override
void onNext(T t) {
//這個(gè)是AAA線程的執(zhí)行
sub.onNext(t);
}
//...省略
};
sourceXXX.subscribe(subscriber);
}
};
}
}
//...省略
};
sourceAAA.subscribe(subscriber);
}
};
簡(jiǎn)化一下代碼,用線程代替就是,相當(dāng)于如下:
new Thread("BBB") {
@Override
public void run() {
new Thread("AAA") {
@Override
public void run() {
System.out.println("" + Thread.currentThread());
}
}.start();
}
}.start();
打印可想而知,是里面 AAA線程;
Thread[AAA,5,main]
2.4 查看observerOn的源碼
//rx.Observable.java
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, RxRingBuffer.SIZE);
}
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
return observeOn(scheduler, false, bufferSize);
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
//...
//2.
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
//1.
return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
好家伙,看了一下,連續(xù)調(diào)用了4層,最總還是通過(guò)create()創(chuàng)建了一個(gè)Observable<T>對(duì)象,
-
OnSubscribeLift和前面看subscribeOn()一樣,第一個(gè)參數(shù)onSubscribe是上一個(gè)Observable<T>對(duì)象的onSubscribe,也就是說(shuō),subscribeOn()和observerOn()的比較大的區(qū)別是,subscribeOn()“代理的是上一個(gè)對(duì)象的Observable<T>”,而observerOn()是“代理的是上一個(gè)的OnSubscribe<T>”,所以observerOn()就是改變了下游的線程切換。第二個(gè)參數(shù)是上一步生成的OperatorObserveOn進(jìn)行了傳遞到里面。 -
scheduler的對(duì)象傳給了OperatorObserveOn,這個(gè)類(lèi)干了切換換線程的操作。
可以看出observerOn()通過(guò)兩個(gè)類(lèi)(OperatorObserveOn和OnSubscribeLift)來(lái)管理,也不知道是為什么,是不是有可能方便單元測(cè)試,還是為了單一職責(zé)呢,就不糾結(jié)于此了。如果我寫(xiě)可能就一個(gè)類(lèi)做了。
2.4.1 先看OnSubscribeLift
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
final OnSubscribe<T> parent;
final Operator<? extends R, ? super T> operator;
public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
this.parent = parent;
this.operator = operator;
}
@Override
public void call(Subscriber<? super R> o) {
//...
//1.
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
} catch (Throwable e) {
//...
}
//...
}
}
-
Operator#call(OperatorObserveOn)然后通過(guò)Subscriber參數(shù)來(lái)生成了一個(gè)新的Subscriber然后通過(guò)上一個(gè)的OnSubscribe<T>對(duì)象傳遞。
2.4.2 OperatorObserveOn
這個(gè)類(lèi)比較長(zhǎng),分開(kāi)來(lái)了,
public final class OperatorObserveOn<T> implements Operator<T, T> {
private final Scheduler scheduler;
//...略
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
//...略
// 1.
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
//...略
}
//...略
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
//...略
}
- 接著 2.4.1 的
Operator#call,里面就調(diào)用了OperatorObserveOn的內(nèi)部類(lèi)ObserveOnSubscriber進(jìn)行把上一個(gè)的Subscriber代理了一下,然后具有了有了切換線程的能力
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
final Subscriber<? super T> child;
final Scheduler.Worker recursiveScheduler;
final Queue<Object> queue;
//...略
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
if (UnsafeAccess.isUnsafeAvailable()) {
queue = new SpscArrayQueue<Object>(calculatedSize);
} else {
queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
}
}
//...略
void init() {
Subscriber<? super T> localChild = child;
localChild.add(recursiveScheduler);
localChild.add(this);
}
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
//1.
if (!queue.offer(NotificationLite.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
@Override
public void onCompleted() {
//1.
if (isUnsubscribed() || finished) {
return;
}
finished = true;
schedule();
}
@Override
public void onError(final Throwable e) {
//1.
if (isUnsubscribed() || finished) {
RxJavaHooks.onError(e);
return;
}
error = e;
schedule();
}
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
@Override
public void call() {
final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;
//...略
for (;;) {
long requestAmount = requested.get();
//...略
while (requestAmount != currentEmission) {
Object v = q.poll();
boolean empty = v == null;
if (checkTerminated(done, empty, localChild, q)) {
return;
}
if (empty) {
break;
}
//1.
localChild.onNext(NotificationLite.<T>getValue(v));
}
//...略
if (requestAmount == currentEmission) {
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
return;
}
}
//...略
}
}
boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {
if (e != null) {
//1.
a.onError(e);
return true;
} else {
//1.
a.onCompleted();
return true;
}
return false;
}
}
上面已經(jīng)省去了很多代碼
-
就是通過(guò)傳入進(jìn)來(lái)的
child(Subscriber)對(duì)象,然后在外面調(diào)用Subscriber#onNext()方法的時(shí)候,實(shí)際就進(jìn)行了 queue 入隊(duì)操作,然后通過(guò)recursiveScheduler.schedule(this)切換到線程中執(zhí)行if (!queue.offer(NotificationLite.next(t))) { onError(new MissingBackpressureException()); return; } schedule(); protected void schedule() { if (counter.getAndIncrement() == 0) { recursiveScheduler.schedule(this); } } //recursiveScheduler.schedule(this); 最終執(zhí)行到這里 public void call() { }recursiveScheduler.schedule(this);最終執(zhí)行了call()方法,從而達(dá)到了且換線程的目的。我根據(jù)根據(jù)這個(gè)思路寫(xiě)了偽代碼
static final class ObserverOnSubscribe<T> extends Subscriber<T> implements Runnable { private Scheduler scheduler; private Subscriber<T> child; private Queue<T> queue; public ObserverOnSubscribe(Scheduler scheduler, Subscriber<T> child) { this.scheduler = scheduler; this.child = child; queue = new LinkedList<>(); } @Override public void run() { if (queue.isEmpty()) { child.onCompleted(); return; } T poll = queue.poll(); child.onNext(poll); } //... @Override void onNext(T t) { queue.offer(t); schedule(); } @Override void onCompleted() { schedule(); } public void schedule() { scheduler.execute(this); } }
2.5 observerOn每次調(diào)用,后面都會(huì)進(jìn)行線程切換,但不會(huì)影響前面的線程
observerOn每次調(diào)用,后面都會(huì)進(jìn)行線程切換,但不會(huì)影響前面的,因?yàn)?code>observerOn“代理了”OnSubscribe,就相當(dāng)于影響下游的切換。好比如下代碼
new Thread("BBB") {
@Override
public void run() {
System.out.println("" + Thread.currentThread());
new Thread("AAA") {
@Override
public void run() {
System.out.println("" + Thread.currentThread());
}
}.start();
}
}.start();
總結(jié)
subscribeOn :
整條線路上第一次切換的有效,但有效的范圍分為
a. 有ObserverOn的話,那就是ObserverOn的下下一個(gè),因?yàn)镺bserverOn只影響下一個(gè)
b. 如果只用了subscribeOn,那就是subscribeOn第一次切換的那個(gè)線程,然后知道ObserverOn來(lái)進(jìn)行切換
<img src="http://reactivex.io/documentation/operators/images/schedulers.png" alt="">
這個(gè)代理的是 Observrable 對(duì)象
//thread-1
thread {
Observeable#subscribe("創(chuàng)建了一新的");
}
為什么subscribeOn重復(fù)設(shè)置沒(méi)有使用,只有第一次呢?
就像這樣
//thread-2
thread {
//thread-1
thread {
Observeable#subscribe("創(chuàng)建了一新的");
}
}
這樣讀取出來(lái)其實(shí)還是thread-1,thread-2就沒(méi)什么意義了。
observeOn:影響下一次訂閱
observeOn代理的是Observable.OnSubscribe這個(gè)接口,就是onNext的這個(gè)接口,往下面?zhèn)鞯倪@個(gè)接口,所以會(huì)影響下面的線程。
//thread-2
thread {
//thread-1
thread {
Observeable#subscribe(() -> {
Run run = this.run;
thread {
run.run();
}
});
}
}
以上,如果有錯(cuò)誤,非常感謝您的指正,分享就是為了學(xué)習(xí),寫(xiě)錯(cuò)了,雖然丟臉,但是能學(xué)到東西也很舒服。
區(qū)別:subscribeOn()和observerOn()的比較大的區(qū)別是,subscribeOn()“代理的是上一個(gè)對(duì)象的Observable<T>”,而observerOn()是“代理的是上一個(gè)的OnSubscribe<T>”,所以observerOn()就是改變了下游的線程切換。