這篇文章基于RxJava2.0
RxJava是什么?
官網(wǎng)說RxJava是一個可觀測的序列來組成異步的額,基于事件的庫,簡單來說,它就是一個實現(xiàn)異步的庫,可以代替Android的API如AsyncTask ,Handler等等。
RxJava為什么好?
RxJava其實就是提供一套異步編程的API,這套API是基于觀察者模式的,而且是鏈式調(diào)用的,所以使用RxJava編寫的代碼邏輯會非常簡潔。
觀察者模式:
定義:對象間一種一對多的關(guān)系,使得每當(dāng)一個對象改變,則所有依賴于它的對象都會得到通知并被自動更新。
作用:解耦,UI層與具體的業(yè)務(wù)邏輯解耦。
官方支持時間?
官方支持更新到2020年12月31日,沒關(guān)系的,還有Rxjava3。
使用場景?
可以進行數(shù)據(jù)庫的寫入,大圖片的載入,文件壓縮和解壓等各種需要放在后臺工作的耗時操作,都可以使用RxJava來實現(xiàn),可以使用RxJava來實現(xiàn)響應(yīng)式編程。
如何使用?
添加依賴:
//RxJava2
implementation 'io.reactivex.rxjava2:rxjava:2.2.0'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
通過RxJava最簡單的一個例子引出它的三個基本元素:觀察者,被觀察者和訂閱。

被觀察者
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Hello");//發(fā)送事件時,觀察者會回調(diào)onNext方法
emitter.onComplete();//這個順序不能顛倒,如果onNext方法放在最后,onNext就不會執(zhí)行了
}
}).subscribeOn(Schedulers.io())//實際項目中網(wǎng)絡(luò)請求在io線程
.observeOn(AndroidSchedulers.mainThread());//這里意為觀察者在主線程更新UI
create是RXJava中最基本的操作符,
觀察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {//這個方法在訂閱前就會被調(diào)用
Log.i("log", "onSubscribe->" + d);
//d.dispose();//取消發(fā)射事件
}
@Override
public void onNext(String s) {
Log.i("log", "onNext->" + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
訂閱觀察者:
observable.subscribe(observer);
被觀察者訂閱觀察者后,observable中的方法會被立刻回調(diào) 。
除了Observable,還有其它4個被觀察者可以操作,一共5種

接下來看看其它4種被觀察者是如何使用的
被觀察者(背壓)Flowable
Flowable flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 1000000000; i++) {
emitter.onNext(i);
//這里還是發(fā)射了20條數(shù)據(jù)
Log.d("log", "subscribe: " + i);
}
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io());
觀察者(背壓)
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d("log", "onSubscribe");
//這里體現(xiàn)的是響應(yīng)式拉取
//s.request(Long.MAX_VALUE);//指定下游(觀察者)接收數(shù)據(jù)的最大值
s.request(10);
}
@Override
public void onNext(Integer integer) {
//拉取10條數(shù)據(jù)
Log.d("log", "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
Log.d("log", "onComplete");
}
};
被觀察者(Single)
Single.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(SingleEmitter<String> emitter) throws Exception {
emitter.onSuccess("消息");
}
}).subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
}
@Override
public void onError(Throwable e) {
}
});
被觀察者(Completable)
Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter emitter) throws Exception {
emitter.onComplete();
emitter.onError(new Exception());
}
});
被觀察者(Maybe)
Maybe.create(new MaybeOnSubscribe<String>() {
@Override
public void subscribe(MaybeEmitter<String> emitter) throws Exception {
emitter.onSuccess("消息");
emitter.onComplete();
emitter.onError(new Exception());
}
});
操作符:操作符包括創(chuàng)建操作符,轉(zhuǎn)換操作符,組合操作符,功能操作符,過濾操作符,條件操作符。

舉個just的例子
just和creat一樣,也是創(chuàng)建,只是最多不能超過10個參數(shù)
//可以傳入多個參數(shù)
//Observable.just("a",1,2,"b").subscribe(new Observer<Object>() {
//也可以傳入一個方法
Observable.just(getNumber()).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
Log.i("log", "" + o.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
public int getNumber() {
return 1;
}


//組合操作符
Observable.concat(Observable.just(1, 2),
Observable.just(5, 6),
Observable.just(3, 4),
Observable.just(7, 8)).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.i("log", "" + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
輸出結(jié)果為:1,2,5,6,3,4,7,8


Observable.just(1, 2)
.subscribeOn(Schedulers.io())
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 2;//發(fā)送數(shù)字小于2的消息
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.i("log", "" + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});

RxJava的線程切換是如何實現(xiàn)的?
來了解下線程切換的代碼實現(xiàn)
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Hello");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io())//網(wǎng)絡(luò)請求在io線程
.observeOn(AndroidSchedulers.mainThread());//這里意為觀察者在主線程更新UI
RxJava的線程切換是通過Scheduler(線程調(diào)度器)來實現(xiàn)的,Scheduler的作用是簡化了異步操作。
subscribeOn:通過接收一個Schedule參數(shù),來指定對數(shù)據(jù)的處理運行在特定的調(diào)度器Schedule上,若多次設(shè)定,則只有一次起作用。
observeOn:接收一個Schedule參數(shù),來指定下游(RxJava官網(wǎng)把觀察者稱為下游)操作運行在特定的線程調(diào)度器Schedule上,若多次設(shè)定,每次均起作用。
Schedule的種類如下:

1)Schedules.io()
用于IO密集型的操作,例如讀寫SD卡文件,查詢數(shù)據(jù)庫,訪問網(wǎng)絡(luò)等,具有線程緩存機制,在此調(diào)度器接收到任務(wù)后,先檢查線程緩存池中,是否有空閑的線程,如果有,則復(fù)用,如果沒有則創(chuàng)建新的線程,并加入到線程池中,如果每次都沒有空閑線程使用,可以無上限的創(chuàng)建新線程。
2)Schedulers.newThread(?)
在每執(zhí)行一個任務(wù)時創(chuàng)建一個新的線程,不具有線程緩存機制,因為創(chuàng)建一個新的線程比復(fù)用一個線程更耗時耗力,雖然使用Schedulers.io(?)的地方,都可以使用Schedulers.newThread(?),但是,Schedulers.newThread(?)的效率沒有Schedulers.io(?)高。
3)Schedulers.computation()
用于CPU 密集型計算任務(wù),即不會被 I/O 等操作限制性能的耗時操作,例如xml,json文件的解析,Bitmap圖片的壓縮取樣等,具有固定的線程池,大小為CPU的核數(shù)。不可以用于I/O操作,因為I/O操作的等待時間會浪費CPU。
4)Schedulers.trampoline()
在當(dāng)前線程立即執(zhí)行任務(wù),如果當(dāng)前線程有任務(wù)在執(zhí)行,則會將其暫停,等插入進來的任務(wù)執(zhí)行完之后,再將未完成的任務(wù)接著執(zhí)行。
5)Schedulers.single()
擁有一個線程單例,所有的任務(wù)都在這一個線程中執(zhí)行,當(dāng)此線程中有任務(wù)執(zhí)行時,其他任務(wù)將會按照先進先出的順序依次執(zhí)行。
6)Scheduler.from(@NonNull Executor executor)
指定一個線程調(diào)度器,由此調(diào)度器來控制任務(wù)的執(zhí)行策略。
7)AndroidSchedulers.mainThread()
在Android UI線程中執(zhí)行任務(wù),為Android開發(fā)定制。
RxJava中的背壓
背壓出現(xiàn)的原因:
當(dāng)上下游在不同的線程中,通過Observable發(fā)射,處理,響應(yīng)數(shù)據(jù)流時,如果上游發(fā)射數(shù)據(jù)的速度快于下游接收處理數(shù)據(jù)的速度,這樣對于那些沒來得及處理的數(shù)據(jù)就會造成積壓,這些數(shù)據(jù)既不會丟失,也不會被垃圾回收機制回收,而是存放在一個異步緩存池中,如果緩存池中的數(shù)據(jù)一直得不到處理,越積越多,最后就會造成內(nèi)存溢出,這便是響應(yīng)式編程中的背壓(backpressure)問題
背壓策略的解決思路:
利用響應(yīng)式拉取,響應(yīng)式拉取是觀察者主動去被觀察者那里拉取事件,而被觀察者則是被動等待通知再發(fā)射事件。
BackpressureStrategy背壓策略:
1)MISSING
MissingEmitter:在此策略下,通過Create方法創(chuàng)建的Flowable相當(dāng)于沒有指定背壓策略,不會對通過onNext發(fā)射的數(shù)據(jù)做緩存或丟棄處理,需要下游通過背壓操作符。
2)ERROR
ErrorAsyncEmitter:在此策略下,如果放入Flowable的異步緩存池中的數(shù)據(jù)超限了,則會拋出MissingBackpressureException異常。
3)BUFFER
BufferAsyncEmitter:部維護了一個緩存池SpscLinkedArrayQueue,其大小不限,此策略下,如果Flowable默認的異步緩存池滿了,會通過此緩存池暫存數(shù)據(jù),它與Observable的異步緩存池一樣,可以無限制向里添加數(shù)據(jù),不會拋出MissingBackpressureException異常,但會導(dǎo)致OOM
4)DROP
DropAsyncEmitter:在此策略下,如果Flowable的異步緩存池滿了,會丟掉上游發(fā)送的數(shù)據(jù)。
5)LATEST
LatestAsyncEmitter:與Drop策略一樣,如果緩存池滿了,會丟掉將要放入緩存池中的數(shù)據(jù),不同的是,不管緩存池的狀態(tài)如何,LATEST都會將最后一條數(shù)據(jù)強行放入緩存池中,來保證觀察者在接收到完成通知之前,能夠接收到Flowable最新發(fā)射的一條數(shù)據(jù)
RxJava的生命周期
為什么要關(guān)注RxJava的生命周期?
來看一個小例子
MainActivity.java
Flowable flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 1000000000; i++) {
emitter.onNext(i);
Log.d("log", "subscribe: " + i);
}
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io());
比如我們這里有一個按鈕,從MainActivity跳轉(zhuǎn)到MainActivity2中,當(dāng)跳到MainActivity2的時候,可以通過log看到日志中還是輸出打印,這樣是很不友好的,更重要的是會造成內(nèi)存溢出,所以這就是為什么要對RxJava的生命周期進行管理了。
RxLifecycle的github地址:
https://github.com/trello/RxLifecycle
我這里用的是3.0的Rxlifecycle,需要依賴依androidx,Rxlifecycle和androidx的配置如下:
dependencies {
implementation fileTree(dir: 'libs', include: ['*.jar'])
implementation 'androidx.appcompat:appcompat:1.0.2'
implementation 'androidx.constraintlayout:constraintlayout:2.0.0-alpha2'
testImplementation 'junit:junit:4.12'
androidTestImplementation 'androidx.test:runner:1.1.0'
androidTestImplementation 'androidx.test.espresso:espresso-core:3.1.0'
//RxJava2
implementation 'io.reactivex.rxjava2:rxjava:2.2.0'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
//3.0這個是基于androidx的
implementation 'com.trello.rxlifecycle3:rxlifecycle:3.0.0'
implementation 'com.trello.rxlifecycle3:rxlifecycle-android:3.0.0'
implementation 'com.trello.rxlifecycle3:rxlifecycle-components:3.0.0'
}
然后讓我們的Activity繼承RxAppCompatActivity即可。
Observable.interval(1, TimeUnit.SECONDS).doOnDispose(new Action() {
@Override
public void run() throws Exception {
}
})
.compose(this.<Long>bindUntilEvent(ActivityEvent.PAUSE));
這里我設(shè)置的是在pause的時候取消訂閱,需要結(jié)合rxlifecycle使用,當(dāng)然也可以設(shè)置別的狀態(tài)來管理,比如onPause,onDestroy等。