Rxjava 切換線程

根據(jù)自己的理解,公司用的還是Rxjava 1.x,不過(guò)很久以前使用過(guò),沒(méi)有過(guò)多注意這一塊,不過(guò)還是想弄一下,周末的時(shí)候,主要看了 簡(jiǎn)單使用,如何切換線程,并且去理解這個(gè)過(guò)程

操作符什么的后面理解的時(shí)候,再談?wù)摚冗M(jìn)行就簡(jiǎn)單的create。重點(diǎn)切換線程

一、準(zhǔn)備

查看源碼的版本:

implementation 'io.reactivex:rxjava:1.2.1'

這邊先不糾結(jié)于 Rxjava 2.x,這個(gè)可能后續(xù)去看Rxjava2.x的時(shí)候,再去討論了。

二、進(jìn)入正題

2.1 簡(jiǎn)單使用

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        System.out.println("Observable.OnSubscribe : " + Thread.currentThread());
        subscriber.onNext("sss");
        subscriber.onCompleted();
    }
}).subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.io())
        .subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
                System.out.println(s + " " + Thread.currentThread());
            }
        });

簡(jiǎn)單就是這么的使用,打印如下:

Observable.OnSubscribe: Thread[RxNewThreadScheduler-1,5,main]
sss Thread[RxIoScheduler-2,5,main]

2.2 查看subscribeOn的源碼

//rx.Observable.java
public final Observable<T> subscribeOn(Scheduler scheduler) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    //1
    //rx.Observable 里面的成員create方法
    return create(new OperatorSubscribeOn<T>(this, scheduler));
}
  1. 這里直接創(chuàng)建了一個(gè)OperatorSubscribeOn, 然后傳遞了一個(gè) this,然后把scheduler線程切換

    這里注意,這個(gè) this 我感覺(jué)比較關(guān)鍵,把當(dāng)前的Observable對(duì)象傳遞了進(jìn)去;當(dāng)前OperatorSubscribeOn還是通過(guò) Observablecreate()方法創(chuàng)建Observable對(duì)象,也就是說(shuō),OperatorSubscribeOn是一個(gè)Observable.OnSubscribe對(duì)象,并且里面含有上一個(gè)Observable對(duì)象

進(jìn)入到 OperatorSubscribeOn類(lèi)中

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    //成員變量
    final Scheduler scheduler;
    final Observable<T> source;
        
    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        //1.
        subscriber.add(inner);
        inner.schedule(new Action0() {
            @Override
            public void call() {
                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        subscriber.onError(e);
                    }

                    @Override
                    public void onCompleted() {
                        subscriber.onCompleted();
                    }
                    //...省略代碼
                };
              //2.
                source.unsafeSubscribe(s);
            }
        });
    }
}
  1. 其實(shí)就是從scheduler中創(chuàng)建一個(gè)任務(wù),然后通過(guò)schedule()方法進(jìn)行執(zhí)行。

    我是這么理解的,相當(dāng)于一個(gè)線程池執(zhí)行

    //inner.schedule()類(lèi)比下面
    ThreadPool.execute(new Runnable() {
        @Override
        public void run() {
             //執(zhí)行任務(wù)
        }
    };
    

    具體是scheduler先不再這個(gè)討論,我這邊現(xiàn)在也還沒(méi)再看(因?yàn)槲沂莻€(gè)菜雞)

  2. source.unsafeSubscribe(s)這段語(yǔ)句相當(dāng)于source.subscribe(s),因?yàn)樵创a里面調(diào)用subscribe(s)的時(shí)候也把我們的Subscriber對(duì)象轉(zhuǎn)成了SafeSubscriber的對(duì)象。

到這里發(fā)現(xiàn),subscribeOn相當(dāng)于下面代碼

private Observable<T> source;

ThreadPool.execute(new Runnable() {
    @Override
    public void run() {
            Subscriber<T> subscriber = new Subscriber<T>() {
                    @Override
                    void onError(Throwable t) {
                        sub.onError(t);
                    }

                    @Override
                    void onNext(T t) {
                        sub.onNext(t);
                    }

                    @Override
                    void onCompleted() {
                        sub.onCompleted();
                    }
                };
        source.subscribe(subscriber);
    }
};

2.3 分析為什么subscribeOn多次調(diào)用只有第一次有用

其實(shí) 2.2 查看subscribeOn的源碼已經(jīng)得出了結(jié)論,是在線程里面通過(guò)代理了上一個(gè)的Observable<T>對(duì)象,也就是說(shuō),上游的被當(dāng)前線程池里面的線程接管了。

XXX.subscribeOn(Schedulers.AAA).subscribeOn(Schedulers.BBB).subscribe()

就拿這兩層來(lái)說(shuō),因?yàn)楸旧?code>subscribeOn就是代理上一個(gè)的Observable<T>對(duì)象,從后面往前面看,就是說(shuō),BBB這個(gè)線程池,要給前一個(gè)AAAObservable<T>對(duì)象,如下偽代碼:

private Observable<T> sourceAAA;

ThreadPoolBBB.execute(new Runnable() {
    @Override
    public void run() {
            Subscriber<T> subscriber = new Subscriber<T>() {
                    //...省略
                    @Override
                    void onNext(T t) {
                        //這個(gè)是BBB線程的執(zhí)行
                        sub.onNext(t);
                    }
                            //...省略
                };
        sourceAAA.subscribe(subscriber);
    }
};

AAA的前面是XXX,那偽代碼應(yīng)該如下:

private Observable<T> sourceXXX;

ThreadPoolAAA.execute(new Runnable() {
    @Override
    public void run() {
            Subscriber<T> subscriber = new Subscriber<T>() {
                    //...省略
                    @Override
                    void onNext(T t) {
                        //這個(gè)是AAA線程的執(zhí)行
                        sub.onNext(t);
                    }
                            //...省略
                };
        sourceXXX.subscribe(subscriber);
    }
};

好了,上面好像還是看不太出來(lái),然后我們組合一下偽代碼來(lái)看看

private Observable<T> sourceAAA;

ThreadPoolBBB.execute(new Runnable() {
    @Override
    public void run() {
            Subscriber<T> subscriber = new Subscriber<T>() {
                    //...省略
                    @Override
                    void onNext(T t) {
                        //這個(gè)是BBB線程的執(zhí)行
                        sub.onNext {
                                //偽代碼遷移,內(nèi)部執(zhí)行
                                     private Observable<T> sourceXXX;

                            ThreadPoolAAA.execute(new Runnable() {
                                @Override
                                public void run() {
                                    Subscriber<T> subscriber = new Subscriber<T>() {
                                                //...省略
                                                @Override
                                                void onNext(T t) {
                                                    //這個(gè)是AAA線程的執(zhí)行
                                                    sub.onNext(t);
                                                }
                                                //...省略
                                            };
                                    sourceXXX.subscribe(subscriber);
                                }
                            };
                        }
                    }
                            //...省略
                };
        sourceAAA.subscribe(subscriber);
    }
};

簡(jiǎn)化一下代碼,用線程代替就是,相當(dāng)于如下:

new Thread("BBB") {
  @Override
  public void run() {
    new Thread("AAA") {
      @Override
      public void run() {
        System.out.println("" + Thread.currentThread());
      }
    }.start();
  }
}.start();

打印可想而知,是里面 AAA線程;

Thread[AAA,5,main]

2.4 查看observerOn的源碼

//rx.Observable.java
public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, RxRingBuffer.SIZE);
}

public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
        return observeOn(scheduler, false, bufferSize);
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        //...
            //2.
        return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
            //1.
        return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

好家伙,看了一下,連續(xù)調(diào)用了4層,最總還是通過(guò)create()創(chuàng)建了一個(gè)Observable<T>對(duì)象,

  1. OnSubscribeLift和前面看subscribeOn()一樣,第一個(gè)參數(shù)onSubscribe是上一個(gè)Observable<T>對(duì)象的onSubscribe,也就是說(shuō),subscribeOn()observerOn()的比較大的區(qū)別是,subscribeOn()“代理的是上一個(gè)對(duì)象的Observable<T>”,而observerOn()是“代理的是上一個(gè)的OnSubscribe<T>”,所以observerOn()就是改變了下游的線程切換。第二個(gè)參數(shù)是上一步生成的OperatorObserveOn進(jìn)行了傳遞到里面。
  2. scheduler的對(duì)象傳給了OperatorObserveOn,這個(gè)類(lèi)干了切換換線程的操作。

可以看出observerOn()通過(guò)兩個(gè)類(lèi)(OperatorObserveOnOnSubscribeLift)來(lái)管理,也不知道是為什么,是不是有可能方便單元測(cè)試,還是為了單一職責(zé)呢,就不糾結(jié)于此了。如果我寫(xiě)可能就一個(gè)類(lèi)做了。

2.4.1 先看OnSubscribeLift

public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {

    final OnSubscribe<T> parent;
    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Subscriber<? super R> o) {
      //...
      //1.
      Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
      try {
        // new Subscriber created and being subscribed with so 'onStart' it
        st.onStart();
        parent.call(st);
      } catch (Throwable e) {
        //...
      }
      //...
    }
}
  1. Operator#callOperatorObserveOn)然后通過(guò)Subscriber參數(shù)來(lái)生成了一個(gè)新的Subscriber然后通過(guò)上一個(gè)的OnSubscribe<T>對(duì)象傳遞。

2.4.2 OperatorObserveOn

這個(gè)類(lèi)比較長(zhǎng),分開(kāi)來(lái)了,

public final class OperatorObserveOn<T> implements Operator<T, T> {

private final Scheduler scheduler;
  //...略
@Override
  public Subscriber<? super T> call(Subscriber<? super T> child) {
     //...略
    // 1.
      ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
      parent.init();
      return parent;

  }
   //...略
  
}
//...略
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
  //...略
}
  1. 接著 2.4.1Operator#call,里面就調(diào)用了OperatorObserveOn的內(nèi)部類(lèi)ObserveOnSubscriber進(jìn)行把上一個(gè)的Subscriber代理了一下,然后具有了有了切換線程的能力
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
    final Subscriber<? super T> child;
    final Scheduler.Worker recursiveScheduler;
    final Queue<Object> queue;
    //...略
    public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
        this.child = child;
        this.recursiveScheduler = scheduler.createWorker();
        if (UnsafeAccess.isUnsafeAvailable()) {
            queue = new SpscArrayQueue<Object>(calculatedSize);
        } else {
            queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
        }
    }
  //...略
    void init() {
        Subscriber<? super T> localChild = child;
        localChild.add(recursiveScheduler);
        localChild.add(this);
    }

    
    @Override
    public void onNext(final T t) {
        if (isUnsubscribed() || finished) {
            return;
        }
        //1.
        if (!queue.offer(NotificationLite.next(t))) {
            onError(new MissingBackpressureException());
            return;
        }
        schedule();
    }

    @Override
    public void onCompleted() {
      //1.
        if (isUnsubscribed() || finished) {
            return;
        }
        finished = true;
        schedule();
    }

    @Override
    public void onError(final Throwable e) {
        //1.
        if (isUnsubscribed() || finished) {
            RxJavaHooks.onError(e);
            return;
        }
        error = e;
        schedule();
    }

    protected void schedule() {
        if (counter.getAndIncrement() == 0) {
            recursiveScheduler.schedule(this);
        }
    }

    @Override
    public void call() {
        final Queue<Object> q = this.queue;
        final Subscriber<? super T> localChild = this.child;
        //...略
        for (;;) {
            long requestAmount = requested.get();
              //...略
            while (requestAmount != currentEmission) {
                Object v = q.poll();
                boolean empty = v == null;

                if (checkTerminated(done, empty, localChild, q)) {
                    return;
                }

                if (empty) {
                    break;
                }
                                //1.
                localChild.onNext(NotificationLite.<T>getValue(v));
            }
                      //...略
            if (requestAmount == currentEmission) {
                if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
                    return;
                }
            }
            //...略
        }
    }

    boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {
      if (e != null) {
        //1.
             a.onError(e);
        return true;
      } else {
        //1.
            a.onCompleted();
        return true;
      }
      return false;
    }
}

上面已經(jīng)省去了很多代碼

  1. 就是通過(guò)傳入進(jìn)來(lái)的childSubscriber)對(duì)象,然后在外面調(diào)用 Subscriber#onNext()方法的時(shí)候,實(shí)際就進(jìn)行了 queue 入隊(duì)操作,然后通過(guò)recursiveScheduler.schedule(this)切換到線程中執(zhí)行

    if (!queue.offer(NotificationLite.next(t))) {
      onError(new MissingBackpressureException());
      return;
    }
    schedule();
    
    protected void schedule() {
      if (counter.getAndIncrement() == 0) {
        recursiveScheduler.schedule(this);
      }
    }
    //recursiveScheduler.schedule(this); 最終執(zhí)行到這里
    public void call() {
      
    }
    

    recursiveScheduler.schedule(this);最終執(zhí)行了call()方法,從而達(dá)到了且換線程的目的。

    我根據(jù)根據(jù)這個(gè)思路寫(xiě)了偽代碼

    static final class ObserverOnSubscribe<T> extends Subscriber<T> implements Runnable {
        private Scheduler scheduler;
        private Subscriber<T> child;
        private Queue<T> queue;
    
        public ObserverOnSubscribe(Scheduler scheduler, Subscriber<T> child) {
            this.scheduler = scheduler;
            this.child = child;
    
            queue = new LinkedList<>();
        }
    
        @Override
        public void run() {
            if (queue.isEmpty()) {
                child.onCompleted();
                return;
            }
            T poll = queue.poll();
            child.onNext(poll);
        }
         //...
    
        @Override
        void onNext(T t) {
            queue.offer(t);
            schedule();
        }
    
        @Override
        void onCompleted() {
            schedule();
        }
    
        public void schedule() {
            scheduler.execute(this);
        }
    }
    

2.5 observerOn每次調(diào)用,后面都會(huì)進(jìn)行線程切換,但不會(huì)影響前面的線程

observerOn每次調(diào)用,后面都會(huì)進(jìn)行線程切換,但不會(huì)影響前面的,因?yàn)?code>observerOn“代理了”OnSubscribe,就相當(dāng)于影響下游的切換。好比如下代碼

new Thread("BBB") {
    @Override
    public void run() {
        System.out.println("" + Thread.currentThread());
        
        new Thread("AAA") {
            @Override
            public void run() {
                System.out.println("" + Thread.currentThread());
            }
        }.start();
    }
}.start();

總結(jié)

subscribeOn :

  1. 整條線路上第一次切換的有效,但有效的范圍分為

    a. 有ObserverOn的話,那就是ObserverOn的下下一個(gè),因?yàn)镺bserverOn只影響下一個(gè)

    b. 如果只用了subscribeOn,那就是subscribeOn第一次切換的那個(gè)線程,然后知道ObserverOn來(lái)進(jìn)行切換

<img src="http://reactivex.io/documentation/operators/images/schedulers.png" alt="">

這個(gè)代理的是 Observrable 對(duì)象

//thread-1
thread {
   Observeable#subscribe("創(chuàng)建了一新的");
}

為什么subscribeOn重復(fù)設(shè)置沒(méi)有使用,只有第一次呢?

就像這樣

//thread-2
thread {
  //thread-1
   thread {
   Observeable#subscribe("創(chuàng)建了一新的");
    }
}

這樣讀取出來(lái)其實(shí)還是thread-1,thread-2就沒(méi)什么意義了。

observeOn:影響下一次訂閱

observeOn代理的是Observable.OnSubscribe這個(gè)接口,就是onNext的這個(gè)接口,往下面?zhèn)鞯倪@個(gè)接口,所以會(huì)影響下面的線程。

//thread-2
thread {
  //thread-1
   thread {
   Observeable#subscribe(() -> {
        Run run = this.run;
        thread {
        run.run();
      }
   });
    }
}

以上,如果有錯(cuò)誤,非常感謝您的指正,分享就是為了學(xué)習(xí),寫(xiě)錯(cuò)了,雖然丟臉,但是能學(xué)到東西也很舒服。

區(qū)別:subscribeOn()observerOn()的比較大的區(qū)別是,subscribeOn()“代理的是上一個(gè)對(duì)象的Observable<T>”,而observerOn()是“代理的是上一個(gè)的OnSubscribe<T>”,所以observerOn()就是改變了下游的線程切換。

Rxjava線程切換簡(jiǎn)單的實(shí)現(xiàn)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容