Android Develop——RxJava2(一)

重要的話寫(xiě)在前面

  • Android RxJava 現(xiàn)在基本已經(jīng)是App的標(biāo)配了。但我這種渣渣還是不太熟悉,所以想要學(xué)習(xí)一下這個(gè)強(qiáng)大的框架。
  • 但是又苦于沒(méi)有很完整的教程,只能拜讀各位大神整理的精華了,本文將大神們編寫(xiě)的總結(jié)的東西歸納整理。方便日后學(xué)習(xí)
  • 若有侵權(quán)問(wèn)題,立馬刪除,表示歉意
  • Season大神 寫(xiě)的 《給初學(xué)者的RxJava2.0教程(1~8)》是我看到的非常完整的教程之一,所以會(huì)有這篇教程中的東西。奉上原文地址:https://juejin.im/post/5848d96761ff4b0058c9d3dc

RxJava2的原理

Season大神講解的并不是各種技術(shù)的術(shù)語(yǔ)與概念而是從本質(zhì)的模式開(kāi)始闡述,從事件流角度來(lái)說(shuō)明RxJava的基本工作原理
有兩根水管,一根為事件產(chǎn)生的水管,上游。一根為事件接收的水管,下游。兩根水管通過(guò)一定的方式連接起來(lái),使得上游每產(chǎn)生一個(gè)事件,下游就能接收到該事件。
使用RxjAVA 需要在gradle中配置

    compile 'io.reactivex.rxjava2:rxjava:2.0.1'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
事件流向

這樣的模式在RxJava中,上游就是被觀察者Observable(可觀察者),下游就是觀察者(Observer),對(duì)應(yīng)到代碼層面來(lái)說(shuō)就是:

  public void startRxjava2Lesson() {
        // TODO: 創(chuàng)建一個(gè)被觀察者 上游對(duì)象
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
            }
        });
        // TODO: 創(chuàng)建觀察被觀察者的對(duì)象——觀察者 下游
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "onNext: ");
            }


            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        };
        //建立連接
        observable.subscribe(observer);
    }

只有當(dāng)上游和下游建立連接之后,上游才會(huì)開(kāi)始發(fā)送事件,也就是Observable調(diào)用了subscribe()方法之后才開(kāi)始發(fā)送事件

  • RxJava2特有的鏈?zhǔn)讲僮鲗?xiě)法
 public void startRxJava2Lesson() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "onNext: " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });
    }

Observable Observable的創(chuàng)建方式

Observable

Observable采用工廠創(chuàng)建模式(?)

 Observable<T> create(@NotNull io.reactivex.ObservableOnSubscribe<T> source)

Defer:直到有訂閱者訂閱了才創(chuàng)建Observable 對(duì)象,才通過(guò)Observable工廠創(chuàng)建Observable并執(zhí)行,這樣確保Observable 每次狀態(tài)為最新**

<T> Observable<T> defer(Func0<Observable<T>> observableFactory)

Empty:返回Obervable 不執(zhí)行任何其他操作,直接執(zhí)行訂閱者的onComplete()方法

<T>Observable<T> empty()

Never:產(chǎn)生一個(gè)不會(huì)執(zhí)行任何參數(shù)永遠(yuǎn)不會(huì)結(jié)束的Observable,起初只用于測(cè)試。
Throw:創(chuàng)建一個(gè)不發(fā)射數(shù)據(jù)并且以錯(cuò)誤結(jié)束的Observable。
From:將數(shù)組/列表用來(lái)創(chuàng)建Observable對(duì)象,將里面的對(duì)象一一當(dāng)參數(shù)執(zhí)行,或者可以用Future來(lái)創(chuàng)建Observable對(duì)象,將Future.get()的值作為參數(shù)執(zhí)行,我們可以指定一個(gè)超時(shí)的值。Observable將等待來(lái)自Future的結(jié)果;如果在超時(shí)之前仍然沒(méi)有結(jié)果返回,Observable將會(huì)觸發(fā)onError()方法通知觀察者有錯(cuò)誤發(fā)生了。

<T> Observable<T> from(Future<? super T> future)
<T> Observable<T> from(Future<? super T> future,long timeout, TimeUnit unit)
<T> Observable<T> from(T[] array)

Interval:每隔一段時(shí)間就產(chǎn)生一個(gè)數(shù)字,這些數(shù)字從0開(kāi)始,一次遞增1直至無(wú)窮大

Observable<long> interval(long initialDelay,long period, TimeUnit unit)

Just:把其他類型的對(duì)象和數(shù)據(jù)類型轉(zhuǎn)化成Observable,just將單個(gè)數(shù)據(jù)轉(zhuǎn)換為發(fā)射那個(gè)數(shù)據(jù)的Observable,Just類似與From,但是From會(huì)將數(shù)組或Iterable的元素取出然后逐個(gè)發(fā)射,而Just只是簡(jiǎn)單的原樣發(fā)射,將數(shù)組或Iterable當(dāng)作單個(gè)數(shù)據(jù),如果傳遞一個(gè)null給Just,它會(huì)返回一個(gè)發(fā)射null值的Observable

<T>Observable<T> just(T t1,T t2,T t3)

Range:創(chuàng)建一組在從n開(kāi)始,個(gè)數(shù)為m的連續(xù)數(shù)字的Observable,比如range(3,10),就是創(chuàng)建3、4、5…12的一組數(shù)字。

Observable<Integer> range(int start,int count)

Repeat:對(duì)某一個(gè)Observable,重復(fù)產(chǎn)生多次結(jié)果

Observable<T> repeat(final long count, Scheduler scheduler)

Timer:創(chuàng)建一連串?dāng)?shù)字,間隔固定時(shí)間

Observable<Long> timer(long initialDelay,long period, TimeUnit unit, Scheduler scheduler)

Observer

Observer采用構(gòu)造方法創(chuàng)建

Observer<Integer> observer = new Observer<Integer>

ObservableEmitter

ObservableEmitter:Emitter是發(fā)射器的意思,這個(gè)就是用來(lái)發(fā)出事件的,它可以發(fā)出三種類型的事件,通過(guò)調(diào)用emitter的onNext(T value)、onComplete()和onError(Throwable t) 就可以發(fā)出對(duì)應(yīng)的事件

  • 上游可以發(fā)送無(wú)限個(gè)onNext 下游也可以接收無(wú)限個(gè)onNext
  • 上游發(fā)送了一個(gè)onComplete后,上游onComplete之后的事件將會(huì)被繼續(xù)發(fā)送,而下游收到onComplete事件后將不繼續(xù)接收事件
  • 上游發(fā)送了一個(gè)onError后,上游onError之后的事件將繼續(xù)發(fā)送,而下游收到onError事件后將不再繼續(xù)接收事件
  • 上游可以不發(fā)送onComplete或者onError
  • onComplete和onError是互斥的,不能發(fā)送多個(gè)onComplete或者onError,這里說(shuō)的是,如果違背了這個(gè)規(guī)則不一定會(huì)引起程序崩潰,發(fā)送多個(gè)onComplete是可以正常運(yùn)行的,依然是收到第一個(gè)onComplete程序就不在接收后續(xù)的事件了,如果發(fā)送多個(gè)onError則收到第二個(gè)onError事件會(huì)導(dǎo)致程序的崩潰

Disposable

Disposable的意思是一次性用品,用完即可丟棄的,在RxJava中對(duì)應(yīng)于上面的水管的粒子,可以理解為兩根管道之間的一個(gè)機(jī)關(guān),當(dāng)調(diào)用它的dispose()方法時(shí),就會(huì)將兩根管道切斷。切斷后上游發(fā)送的事件下游將不會(huì)收到事件。
如果在開(kāi)發(fā)中有多個(gè)Disposable我們?cè)撊绾喂芾砟??在RxJava中已經(jīng)內(nèi)置了一個(gè)容器CompositeDisposable,每當(dāng)我們得到一個(gè)Disposable時(shí)就調(diào)用CompositeDisposable.add()將該Disposable添加到容器中,在需要切斷事件時(shí)調(diào)用CompositeDisposable.clear()即可切斷所有的水管。

subscribe()的重載方法

    public final Disposable subscribe() {}
    public final Disposable subscribe(Consumer<? super T> onNext) {}
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
    public final void subscribe(Observer<? super T> observer) {}
  • 不帶任何參數(shù)的subscribe()表示下游不關(guān)心任何事件,上游發(fā)送的事件都不會(huì)被接收
  • 帶有一個(gè)Consumer參數(shù)的方法表示下游只關(guān)心onNext事件,其他的事件不接受

Rxjava2線程調(diào)度

在RxJava2中,當(dāng)我們?cè)谥骶€程中去創(chuàng)建一個(gè)上游Observable來(lái)發(fā)送事件,則這個(gè)上游默認(rèn)在主線程中發(fā)送事件。
當(dāng)我們?cè)谥骶€程中創(chuàng)建一個(gè)下游Observer來(lái)接收事件,則這個(gè)下游默認(rèn)就在主線程中接收事件

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.d(TAG, "subscribe: " + Thread.currentThread().getName());
                //emit 1
                e.onNext(1);

            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "accept: " + Thread.currentThread().getName());
                Log.d(TAG, "accept: " + integer);
            }
        });
  • 執(zhí)行結(jié)果
09-11 01:27:13.550 19622-19622/com.wx.rxjavastu D/Rxjava2Lesson: subscribe: main
09-11 01:27:13.550 19622-19622/com.wx.rxjavastu D/Rxjava2Lesson: accept: main
09-11 01:27:13.550 19622-19622/com.wx.rxjavastu D/Rxjava2Lesson: accept: 1

如果我們需要改變執(zhí)行的線程,例如我們希望讓上游的Observable在子線程中發(fā)送事件,然后希望讓下游的Cusmer在主線程接收事件,我們可以通過(guò)RxJava內(nèi)置的線程調(diào)度器來(lái)調(diào)整。

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.d(TAG, "subscribe: " + Thread.currentThread().getName());
                //emit 1
                e.onNext(1);

            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "accept: " + Thread.currentThread().getName());
                        Log.d(TAG, "accept: " + integer);
                    }
                });
  • 執(zhí)行結(jié)果
09-11 01:30:37.675 22693-22722/com.wx.rxjavastu D/Rxjava2Lesson: subscribe: RxCachedThreadScheduler-1
09-11 01:30:37.704 22693-22693/com.wx.rxjavastu D/Rxjava2Lesson: accept: main
09-11 01:30:37.704 22693-22693/com.wx.rxjavastu D/Rxjava2Lesson: accept: 1

我們改變事件的發(fā)送接收線程,需要使用

.subscribeOn(Schedulers.newThread())                                              
.observeOn(AndroidSchedulers.mainThread())
  • subscribeOn(Schedulers.newThread()) 指定的是上游發(fā)送事件的線程
  • observeOn()指定的是下游接收事件的線程
  • 多次指定上游線程只有第一次指定有效,其余的會(huì)被忽略
  • 多次指定下游的線程是可以的,也就是說(shuō)每一次observeOn(),下游的線程就會(huì)切換一次
    eg:
 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.d(TAG, "subscribe: " + Thread.currentThread().getName());
                //emit 1
                e.onNext(1);

            }
        }).subscribeOn(Schedulers.io())
                // change thread firest
                .observeOn(AndroidSchedulers.mainThread())
                //2rd
                .observeOn(Schedulers.io())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "accept: " + Thread.currentThread().getName());
                        Log.d(TAG, "accept: " + integer);
                    }
                });
  • 執(zhí)行結(jié)果
09-11 01:34:49.456 26491-26514/com.wx.rxjavastu D/Rxjava2Lesson: subscribe: RxCachedThreadScheduler-1
09-11 01:34:49.503 26491-26522/com.wx.rxjavastu D/Rxjava2Lesson: accept: RxCachedThreadScheduler-2
09-11 01:34:49.503 26491-26522/com.wx.rxjavastu D/Rxjava2Lesson: accept: 1

RxJava中,內(nèi)置的線程選項(xiàng)

  • Schedulers.io() 代表io操作的線程,通常用于網(wǎng)絡(luò),讀寫(xiě)文件等io密集型的操作
  • Schedulers.computation() 代表CPU計(jì)算密集型的操作,例如需要大量的計(jì)算
  • Schedulers.newThread()代表一個(gè)新的常規(guī)新線程
  • AndroidSchedulers.mainThread() 代表Android的主線程
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容