重要的話寫(xiě)在前面
- Android RxJava 現(xiàn)在基本已經(jīng)是App的標(biāo)配了。但我這種渣渣還是不太熟悉,所以想要學(xué)習(xí)一下這個(gè)強(qiáng)大的框架。
- 但是又苦于沒(méi)有很完整的教程,只能拜讀各位大神整理的精華了,本文將大神們編寫(xiě)的總結(jié)的東西歸納整理。方便日后學(xué)習(xí)
- 若有侵權(quán)問(wèn)題,立馬刪除,表示歉意
- Season大神 寫(xiě)的 《給初學(xué)者的RxJava2.0教程(1~8)》是我看到的非常完整的教程之一,所以會(huì)有這篇教程中的東西。奉上原文地址:https://juejin.im/post/5848d96761ff4b0058c9d3dc
RxJava2的原理
Season大神講解的并不是各種技術(shù)的術(shù)語(yǔ)與概念而是從本質(zhì)的模式開(kāi)始闡述,從事件流角度來(lái)說(shuō)明RxJava的基本工作原理
有兩根水管,一根為事件產(chǎn)生的水管,上游。一根為事件接收的水管,下游。兩根水管通過(guò)一定的方式連接起來(lái),使得上游每產(chǎn)生一個(gè)事件,下游就能接收到該事件。
使用RxjAVA 需要在gradle中配置
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

這樣的模式在RxJava中,上游就是被觀察者Observable(可觀察者),下游就是觀察者(Observer),對(duì)應(yīng)到代碼層面來(lái)說(shuō)就是:
public void startRxjava2Lesson() {
// TODO: 創(chuàng)建一個(gè)被觀察者 上游對(duì)象
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
}
});
// TODO: 創(chuàng)建觀察被觀察者的對(duì)象——觀察者 下游
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext: ");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
};
//建立連接
observable.subscribe(observer);
}
只有當(dāng)上游和下游建立連接之后,上游才會(huì)開(kāi)始發(fā)送事件,也就是Observable調(diào)用了subscribe()方法之后才開(kāi)始發(fā)送事件
- RxJava2特有的鏈?zhǔn)讲僮鲗?xiě)法
public void startRxJava2Lesson() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
Observable Observable的創(chuàng)建方式
Observable
Observable采用工廠創(chuàng)建模式(?)
Observable<T> create(@NotNull io.reactivex.ObservableOnSubscribe<T> source)
Defer:直到有訂閱者訂閱了才創(chuàng)建Observable 對(duì)象,才通過(guò)Observable工廠創(chuàng)建Observable并執(zhí)行,這樣確保Observable 每次狀態(tài)為最新**
<T> Observable<T> defer(Func0<Observable<T>> observableFactory)
Empty:返回Obervable 不執(zhí)行任何其他操作,直接執(zhí)行訂閱者的onComplete()方法
<T>Observable<T> empty()
Never:產(chǎn)生一個(gè)不會(huì)執(zhí)行任何參數(shù)永遠(yuǎn)不會(huì)結(jié)束的Observable,起初只用于測(cè)試。
Throw:創(chuàng)建一個(gè)不發(fā)射數(shù)據(jù)并且以錯(cuò)誤結(jié)束的Observable。
From:將數(shù)組/列表用來(lái)創(chuàng)建Observable對(duì)象,將里面的對(duì)象一一當(dāng)參數(shù)執(zhí)行,或者可以用Future來(lái)創(chuàng)建Observable對(duì)象,將Future.get()的值作為參數(shù)執(zhí)行,我們可以指定一個(gè)超時(shí)的值。Observable將等待來(lái)自Future的結(jié)果;如果在超時(shí)之前仍然沒(méi)有結(jié)果返回,Observable將會(huì)觸發(fā)onError()方法通知觀察者有錯(cuò)誤發(fā)生了。
<T> Observable<T> from(Future<? super T> future)
<T> Observable<T> from(Future<? super T> future,long timeout, TimeUnit unit)
<T> Observable<T> from(T[] array)
Interval:每隔一段時(shí)間就產(chǎn)生一個(gè)數(shù)字,這些數(shù)字從0開(kāi)始,一次遞增1直至無(wú)窮大
Observable<long> interval(long initialDelay,long period, TimeUnit unit)
Just:把其他類型的對(duì)象和數(shù)據(jù)類型轉(zhuǎn)化成Observable,just將單個(gè)數(shù)據(jù)轉(zhuǎn)換為發(fā)射那個(gè)數(shù)據(jù)的Observable,Just類似與From,但是From會(huì)將數(shù)組或Iterable的元素取出然后逐個(gè)發(fā)射,而Just只是簡(jiǎn)單的原樣發(fā)射,將數(shù)組或Iterable當(dāng)作單個(gè)數(shù)據(jù),如果傳遞一個(gè)null給Just,它會(huì)返回一個(gè)發(fā)射null值的Observable
<T>Observable<T> just(T t1,T t2,T t3)
Range:創(chuàng)建一組在從n開(kāi)始,個(gè)數(shù)為m的連續(xù)數(shù)字的Observable,比如range(3,10),就是創(chuàng)建3、4、5…12的一組數(shù)字。
Observable<Integer> range(int start,int count)
Repeat:對(duì)某一個(gè)Observable,重復(fù)產(chǎn)生多次結(jié)果
Observable<T> repeat(final long count, Scheduler scheduler)
Timer:創(chuàng)建一連串?dāng)?shù)字,間隔固定時(shí)間
Observable<Long> timer(long initialDelay,long period, TimeUnit unit, Scheduler scheduler)
Observer
Observer采用構(gòu)造方法創(chuàng)建
Observer<Integer> observer = new Observer<Integer>
ObservableEmitter
ObservableEmitter:Emitter是發(fā)射器的意思,這個(gè)就是用來(lái)發(fā)出事件的,它可以發(fā)出三種類型的事件,通過(guò)調(diào)用emitter的onNext(T value)、onComplete()和onError(Throwable t) 就可以發(fā)出對(duì)應(yīng)的事件
- 上游可以發(fā)送無(wú)限個(gè)onNext 下游也可以接收無(wú)限個(gè)onNext
- 上游發(fā)送了一個(gè)onComplete后,上游onComplete之后的事件將會(huì)被繼續(xù)發(fā)送,而下游收到onComplete事件后將不繼續(xù)接收事件
- 上游發(fā)送了一個(gè)onError后,上游onError之后的事件將繼續(xù)發(fā)送,而下游收到onError事件后將不再繼續(xù)接收事件
- 上游可以不發(fā)送onComplete或者onError
- onComplete和onError是互斥的,不能發(fā)送多個(gè)onComplete或者onError,這里說(shuō)的是,如果違背了這個(gè)規(guī)則不一定會(huì)引起程序崩潰,發(fā)送多個(gè)onComplete是可以正常運(yùn)行的,依然是收到第一個(gè)onComplete程序就不在接收后續(xù)的事件了,如果發(fā)送多個(gè)onError則收到第二個(gè)onError事件會(huì)導(dǎo)致程序的崩潰
Disposable
Disposable的意思是一次性用品,用完即可丟棄的,在RxJava中對(duì)應(yīng)于上面的水管的粒子,可以理解為兩根管道之間的一個(gè)機(jī)關(guān),當(dāng)調(diào)用它的dispose()方法時(shí),就會(huì)將兩根管道切斷。切斷后上游發(fā)送的事件下游將不會(huì)收到事件。
如果在開(kāi)發(fā)中有多個(gè)Disposable我們?cè)撊绾喂芾砟??在RxJava中已經(jīng)內(nèi)置了一個(gè)容器CompositeDisposable,每當(dāng)我們得到一個(gè)Disposable時(shí)就調(diào)用CompositeDisposable.add()將該Disposable添加到容器中,在需要切斷事件時(shí)調(diào)用CompositeDisposable.clear()即可切斷所有的水管。
subscribe()的重載方法
public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}
- 不帶任何參數(shù)的subscribe()表示下游不關(guān)心任何事件,上游發(fā)送的事件都不會(huì)被接收
- 帶有一個(gè)Consumer參數(shù)的方法表示下游只關(guān)心onNext事件,其他的事件不接受
Rxjava2線程調(diào)度
在RxJava2中,當(dāng)我們?cè)谥骶€程中去創(chuàng)建一個(gè)上游Observable來(lái)發(fā)送事件,則這個(gè)上游默認(rèn)在主線程中發(fā)送事件。
當(dāng)我們?cè)谥骶€程中創(chuàng)建一個(gè)下游Observer來(lái)接收事件,則這個(gè)下游默認(rèn)就在主線程中接收事件
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG, "subscribe: " + Thread.currentThread().getName());
//emit 1
e.onNext(1);
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + Thread.currentThread().getName());
Log.d(TAG, "accept: " + integer);
}
});
- 執(zhí)行結(jié)果
09-11 01:27:13.550 19622-19622/com.wx.rxjavastu D/Rxjava2Lesson: subscribe: main
09-11 01:27:13.550 19622-19622/com.wx.rxjavastu D/Rxjava2Lesson: accept: main
09-11 01:27:13.550 19622-19622/com.wx.rxjavastu D/Rxjava2Lesson: accept: 1
如果我們需要改變執(zhí)行的線程,例如我們希望讓上游的Observable在子線程中發(fā)送事件,然后希望讓下游的Cusmer在主線程接收事件,我們可以通過(guò)RxJava內(nèi)置的線程調(diào)度器來(lái)調(diào)整。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG, "subscribe: " + Thread.currentThread().getName());
//emit 1
e.onNext(1);
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + Thread.currentThread().getName());
Log.d(TAG, "accept: " + integer);
}
});
- 執(zhí)行結(jié)果
09-11 01:30:37.675 22693-22722/com.wx.rxjavastu D/Rxjava2Lesson: subscribe: RxCachedThreadScheduler-1
09-11 01:30:37.704 22693-22693/com.wx.rxjavastu D/Rxjava2Lesson: accept: main
09-11 01:30:37.704 22693-22693/com.wx.rxjavastu D/Rxjava2Lesson: accept: 1
我們改變事件的發(fā)送接收線程,需要使用
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
- subscribeOn(Schedulers.newThread()) 指定的是上游發(fā)送事件的線程
- observeOn()指定的是下游接收事件的線程
- 多次指定上游線程只有第一次指定有效,其余的會(huì)被忽略
- 多次指定下游的線程是可以的,也就是說(shuō)每一次observeOn(),下游的線程就會(huì)切換一次
eg:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG, "subscribe: " + Thread.currentThread().getName());
//emit 1
e.onNext(1);
}
}).subscribeOn(Schedulers.io())
// change thread firest
.observeOn(AndroidSchedulers.mainThread())
//2rd
.observeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + Thread.currentThread().getName());
Log.d(TAG, "accept: " + integer);
}
});
- 執(zhí)行結(jié)果
09-11 01:34:49.456 26491-26514/com.wx.rxjavastu D/Rxjava2Lesson: subscribe: RxCachedThreadScheduler-1
09-11 01:34:49.503 26491-26522/com.wx.rxjavastu D/Rxjava2Lesson: accept: RxCachedThreadScheduler-2
09-11 01:34:49.503 26491-26522/com.wx.rxjavastu D/Rxjava2Lesson: accept: 1
RxJava中,內(nèi)置的線程選項(xiàng)
- Schedulers.io() 代表io操作的線程,通常用于網(wǎng)絡(luò),讀寫(xiě)文件等io密集型的操作
- Schedulers.computation() 代表CPU計(jì)算密集型的操作,例如需要大量的計(jì)算
- Schedulers.newThread()代表一個(gè)新的常規(guī)新線程
- AndroidSchedulers.mainThread() 代表Android的主線程