RxJava基本使用

image

參考:給 Android 開發(fā)者的 RxJava 詳解
GitHub:RxJava、RxAndroid

// Build.gradle配置文件引入
compile 'io.reactivex:rxandroid:1.1.0'
compile 'io.reactivex:rxjava:1.1.0'

1、RxJava可以做什么

RxJava是一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫??梢暂p松實(shí)現(xiàn)線程切換的方式。平時我們做線程切換的幾種方式:

  • Handler
    這個應(yīng)該是Android常用的一種方式,利用線程的Handler發(fā)送消息到相應(yīng)的線程Looper里面,那么這個任務(wù)也將會在該線程執(zhí)行。
handler.sendEmptyMessage(EVENT_DOWNLOAD_DATA);

Handler另外的一種發(fā)送消息的方式:

getUiHandler().post(new Runnable() {
    @Override
    public void run() {
        // do something                
    }
});
  • new Thread()
    新建一個子線程執(zhí)行耗時操作,需要切換到主線更新UI時
getActivity().runOnUiThread(new Runnable() {
    @Override
    public void run() {
         // do something       
    }
});
  • AsyncTask
    內(nèi)部有個線程池執(zhí)行異常任務(wù)doInBackground(),在UI線程中執(zhí)行任務(wù)前onPreExecute()、更新進(jìn)度onProgressUpdate()、完成任務(wù)onPostExecute()的方法。
  • EventBus、otto
    比較通用的事件消息總線開源框架,對于普通的線程切換來說,也是比較重的一種方式。
  • 另外還有廣播、Service等不大靈活的線程切換方式。

相必,以上的幾種方式使用起來都不是那么的簡潔,業(yè)務(wù)復(fù)雜起來,還會導(dǎo)致邏輯上的不清晰,那么RxJava就可以做到邏輯的簡潔,執(zhí)行異步任務(wù)、線程切換都可以在一條鏈上完成。

2、觀察者模式

觀察者模式的例子:

  • Button點(diǎn)擊事件
    Button被觀察者,持有引用,OnClickListener觀察者,通過setOnClickListener()注冊/訂閱點(diǎn)擊事件,發(fā)生onClick()事件。
  • 設(shè)置監(jiān)聽回調(diào)函數(shù)
    比如執(zhí)行選擇會員彈窗Dialog。Dialog被觀察者,持有引用,OnMemberSelectListener觀察者,通過setOnMemberSelectListener()注冊/訂閱選擇會員事件,發(fā)生onMemberSelect(Member member)事件。

3、RxJava觀察者模式實(shí)現(xiàn)步驟

3.1、創(chuàng)建觀察者 Observer

Observable被觀察者 和 Observer觀察者 通過 subscribe() 方法實(shí)現(xiàn)訂閱關(guān)系
事件。

onNext()
onCompleted()
onError()
onStart() // 這是 Subscriber 增加的方法。它總是在 subscribe 所發(fā)生的線程被調(diào)用。不太適用于執(zhí)行任務(wù)前的彈窗等UI操作。
unsubscribe() // 這是 Subscriber 所實(shí)現(xiàn)的另一個接口 Subscription 的方法,用于取消訂閱。避免內(nèi)存泄漏。

Observer 接口,Observer 的抽象類:Subscriber

Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};
3.2、創(chuàng)建被觀察者 Observable

create() 方法是 RxJava 最基本的創(chuàng)造事件序列的方法

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});

另外還提供了一些方法用來快捷創(chuàng)建事件隊(duì)列

// just()
Observable observable = Observable.just("Hello", "Hi", "Aloha");

// from()
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
3.3、Subscribe (訂閱)
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);

觀察者被觀察者換位置:對流式 API 的設(shè)計會造成影響。

在 RxJava 中, Observable 并不是在創(chuàng)建的時候就立即開始發(fā)送事件,而是在它被訂閱的時候。

除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 還支持不完整定義的回調(diào)

// 自動創(chuàng)建 Subscriber ,并使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動創(chuàng)建 Subscriber ,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動創(chuàng)建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

Action0 是 RxJava 的一個接口,它只有一個方法 call(),這個方法是無參無返回值的;由于 onCompleted() 方法也是無參無返回值的,因此 Action0 可以被當(dāng)成一個包裝對象,將 onCompleted() 的內(nèi)容打包起來將自己作為一個參數(shù)傳入 subscribe() 以實(shí)現(xiàn)不完整定義的回調(diào)。這樣其實(shí)也可以看做將 onCompleted() 方法作為參數(shù)傳進(jìn)了 subscribe(),相當(dāng)于其他某些語言中的『閉包』。

4、RxJava操作符

變換:就是將事件序列中的對象或整個序列進(jìn)行加工處理,轉(zhuǎn)換成不同的事件或事件序列

FuncX 和 ActionX 的區(qū)別在 FuncX 包裝的是有返回值的方法。

4.1、map()

事件對象的直接變換

Observable.from(students)
    .map(new Func1<Student, String>() {
        @Override
        public String call(Student student) {
            return student.getName();
        }
    })
    .subscribe(subscriber);
4.2、flatMap()
Observable.from(students)
    .flatMap(new Func1<Student, Observable<Course>>() {
        @Override
        public Observable<Course> call(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);

flatMap() 中返回的是個 Observable 對象,并且這個 Observable 對象并不是被直接發(fā)送到了 Subscriber 的回調(diào)方法中。 flatMap() 的原理是這樣的:

    1. 使用傳入的事件對象創(chuàng)建一個 Observable 對象;
    1. 并不發(fā)送這個 Observable, 而是將它激活,于是它開始發(fā)送事件;
    1. 每一個創(chuàng)建出來的 Observable 發(fā)送的事件,都被匯入同一個 Observable ,而這個 Observable 負(fù)責(zé)將這些事件統(tǒng)一交給 Subscriber 的回調(diào)方法。

由于可以在嵌套的 Observable 中添加異步代碼, flatMap() 也常用于嵌套的異步操作,例如嵌套的網(wǎng)絡(luò)請求。

5、Scheduler 線程切換

  • Schedulers.immediate(): 直接在當(dāng)前線程運(yùn)行,相當(dāng)于不指定線程。這是默認(rèn)的 Scheduler。
  • Schedulers.newThread(): 總是啟用新線程,并在新線程執(zhí)行操作。
  • Schedulers.io(): I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多,區(qū)別在于 io() 的內(nèi)部實(shí)現(xiàn)是是用一個無數(shù)量上限的線程池,可以重用空閑的線程,因此多數(shù)情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免創(chuàng)建不必要的線程。
  • Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費(fèi) CPU。
  • AndroidSchedulers.mainThread(),它指定的操作將在 Android 主線程運(yùn)行。
  • subscribeOn():subscribe() 所發(fā)生的線程,即Observable.OnSubscribe 被激活時所處的線程?;蛘呓凶鍪录a(chǎn)生的線程。
  • observeOn():Subscriber 所運(yùn)行在的線程?;蛘呓凶鍪录M(fèi)的線程。
Observable.just(1, 2, 3, 4) // IO 線程,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // 新線程,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO 線程,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread) 
    .subscribe(subscriber);  // Android 主線程,由 observeOn() 指定

通過 observeOn() 的多次調(diào)用,程序?qū)崿F(xiàn)了線程的多次切換。

不過,不同于 observeOn() , subscribeOn() 的位置放在哪里都可以,但它是只能調(diào)用一次的。

  • doOnSubscribe()

與 Subscriber.onStart() 相對應(yīng)的,有一個方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同樣是在 subscribe() 調(diào)用后而且在事件發(fā)送前執(zhí)行,但區(qū)別在于它可以指定線程。默認(rèn)情況下, doOnSubscribe() 執(zhí)行在 subscribe() 發(fā)生的線程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的話,它將執(zhí)行在離它最近的 subscribeOn() 所指定的線程。

Observable.create(onSubscribe)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            progressBar.setVisibility(View.VISIBLE); // 需要在主線程執(zhí)行
        }
    })
    .subscribeOn(AndroidSchedulers.mainThread()) // 指定主線程
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容