1 前言
Rxjava由于其基于事件流的鏈?zhǔn)秸{(diào)用、邏輯簡(jiǎn)潔 & 使用簡(jiǎn)單的特點(diǎn),深受各大 Android開(kāi)發(fā)者的歡迎。
RxJava github地址

944365-4c1c1eb44ffe01e5.png
1 定義
- RxJava 在 GitHub 的介紹:
RxJava:a library for composing asynchronous and event-based programs using observable sequences for the Java VM
// 翻譯:RxJava 是一個(gè)在 Java VM 上使用可觀測(cè)的序列來(lái)組成異步的、基于事件的程序的庫(kù)
- 總結(jié):RxJava 是一個(gè) 基于事件流、實(shí)現(xiàn)異步操作的庫(kù)
2 作用
- 實(shí)現(xiàn)異步操作
- 類似于 Android中的 AsyncTask 、Handler作用
3 特點(diǎn)
由于 RxJava的使用方式是:基于事件流的鏈?zhǔn)秸{(diào)用,所以使得 RxJava:
- 邏輯簡(jiǎn)潔
- 實(shí)現(xiàn)優(yōu)雅
- 使用簡(jiǎn)單
更重要的是,隨著程序邏輯的復(fù)雜性提高,它依然能夠保持簡(jiǎn)潔 & 優(yōu)雅
4 原理
4.1 生活例子引入
- 顧客到飯店吃飯

顧客到飯店吃飯.png

944365-07f12da4616b2b68.png
4.2 Rxjava原理介紹
- Rxjava原理 基于.一種擴(kuò)展的觀察者模式
- Rxjava的擴(kuò)展觀察者模式中有4個(gè)角色
| 角色 | 作用 | 類比 |
|---|---|---|
| 被觀察者(Observable) | 產(chǎn)生事件 | 顧客 |
| 觀察者(Observer) | 接收事件,并給出響應(yīng)動(dòng)作 | 廚房 |
| 訂閱(Subscribe) | 連接 被觀察者 & 觀察者 | 服務(wù)員 |
| 事件(Event) | 被觀察者 & 觀察者 溝通的載體 | 菜式 |
具體原理
請(qǐng)結(jié)合上述 顧客到飯店吃飯 的生活例子理解:

944365-5b6e7c8a3bb55f39.png

944365-fc3b7eb5a0ad28d0.png
- 即RxJava原理可總結(jié)為:被觀察者 (Observable) 通過(guò) 訂閱(Subscribe)按順序發(fā)送事件 給觀察者 (Observer), 觀察者(Observer) 按順序接收事件 & 作出對(duì)應(yīng)的響應(yīng)動(dòng)作。具體如下圖:

944365-98ec92df0a4d7e0b.png
至此,RxJava原理講解完畢。
5 基本使用
- 1.分步驟實(shí)現(xiàn):該方法主要為了深入說(shuō)明Rxjava的原理 & 使用,主要用于演示說(shuō)明
- 2.基于事件流的鏈?zhǔn)秸{(diào)用:主要用于實(shí)際使用
5.1 方式1:分步驟實(shí)現(xiàn)
5.1.1 使用步驟

944365-779b0832b164e116.png
5.1.2 步驟詳解
//步驟1:創(chuàng)建被觀察者 (Observable )& 生產(chǎn)事件
Observable<Integer> observable = createObservable1();
//步驟2:創(chuàng)建觀察者 (Observer )并 定義響應(yīng)事件的行為
Observer<Integer> observer = createObserver1();
//步驟3:通過(guò)訂閱(Subscribe)連接觀察者和被觀察者
observable.subscribe(observer);
// 或者 observable.subscribe(subscriber);
步驟1:創(chuàng)建被觀察者 (Observable )& 生產(chǎn)事件
- 即 顧客入飯店 - 坐下餐桌 - 點(diǎn)菜
- 具體實(shí)現(xiàn)
create()--just(T...)--fromXX()
create()
private Observable<Integer> createObservable1() {
// 1. 創(chuàng)建被觀察者 Observable 對(duì)象
// create() 是 RxJava 最基本的創(chuàng)造事件序列的方法
// 此處傳入了一個(gè) OnSubscribe 對(duì)象參數(shù)
// 當(dāng) Observable 被訂閱時(shí),OnSubscribe 的 call() 方法會(huì)自動(dòng)被調(diào)用,即事件序列就會(huì)依照設(shè)定依次被觸發(fā)
// 即觀察者會(huì)依次調(diào)用對(duì)應(yīng)事件的復(fù)寫(xiě)方法從而響應(yīng)事件
// 從而實(shí)現(xiàn)被觀察者調(diào)用了觀察者的回調(diào)方法 & 由被觀察者向觀察者的事件傳遞,即觀察者模式
Observable<Integer> integerObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
// 2. 在復(fù)寫(xiě)的subscribe()里定義需要發(fā)送的事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// 通過(guò) ObservableEmitter類對(duì)象產(chǎn)生事件并通知觀察者
// ObservableEmitter類介紹
// a. 定義:事件發(fā)射器
// b. 作用:定義需要發(fā)送的事件 & 向觀察者發(fā)送事件
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
return integerObservable;
}
just(T...)
//擴(kuò)展:RxJava 提供了其他方法用于 創(chuàng)建被觀察者對(duì)象Observable
// 方法1:just(T...):直接將傳入的參數(shù)依次發(fā)送出來(lái)
private Observable createObservable2() {
Observable observable = Observable.just("A", "B", "C");
return observable;
from(T[])
//擴(kuò)展:RxJava 提供了其他方法用于 創(chuàng)建被觀察者對(duì)象Observable
// 方法2:from(T[]) 將傳入的數(shù)組 / Iterable 拆分成具體對(duì)象后,依次發(fā)送出來(lái)
private Observable createObservable3() {
String[] words = {"A", "B", "C"};
Observable observable = Observable.fromArray(words);
return observable;
}
步驟2:創(chuàng)建觀察者 (Observer )并 定義響應(yīng)事件的行為
- 即 開(kāi)廚房 - 確定對(duì)應(yīng)菜式
- 發(fā)生的事件類型包括:Next事件、Complete事件 & Error事件。具體如下:

944365-8cb0da34f94b0c73.png
具體實(shí)現(xiàn)
- 方式1:采用Observer 接口
- 方式2:采用Subscriber 抽象類
2種方法的區(qū)別- 相同點(diǎn):二者基本使用方式完全一致(實(shí)質(zhì)上,在RxJava的 subscribe 過(guò)程中,Observer總是會(huì)先被轉(zhuǎn)換成Subscriber再使用)
- 不同點(diǎn):Subscriber抽象類對(duì) Observer 接口進(jìn)行了擴(kuò)展,新增了兩個(gè)方法:
- onStart():在還未響應(yīng)事件前調(diào)用,用于做一些初始化工作
- unsubscribe():用于取消訂閱。在該方法被調(diào)用后,觀察者將不再接收 & 響應(yīng)事件
- 調(diào)用該方法前,先使用 isUnsubscribed() 判斷狀態(tài),確定被觀察者Observable是否還持有觀察者Subscriber的引用,如果引用不能及時(shí)釋放,就會(huì)出現(xiàn)內(nèi)存泄露
采用Observer 接口
private Observer<Integer> createObserver1() {
//方式1:采用Observer 接口
// 1. 創(chuàng)建觀察者 (Observer )對(duì)象
Observer<Integer> observer = new Observer<Integer>() {
// 2. 創(chuàng)建對(duì)象時(shí)通過(guò)對(duì)應(yīng)復(fù)寫(xiě)對(duì)應(yīng)事件方法 從而 響應(yīng)對(duì)應(yīng)事件
// 觀察者接收事件前,默認(rèn)最先調(diào)用復(fù)寫(xiě) onSubscribe()
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開(kāi)始采用subscribe連接");
}
// 當(dāng)被觀察者生產(chǎn)Next事件 & 觀察者接收到時(shí),會(huì)調(diào)用該復(fù)寫(xiě)方法 進(jìn)行響應(yīng)
@Override
public void onNext(Integer integer) {
Log.d(TAG, "對(duì)Next事件作出響應(yīng)" + integer);
}
// 當(dāng)被觀察者生產(chǎn)Error事件& 觀察者接收到時(shí),會(huì)調(diào)用該復(fù)寫(xiě)方法 進(jìn)行響應(yīng)
@Override
public void onError(Throwable e) {
Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
}
// 當(dāng)被觀察者生產(chǎn)Complete事件& 觀察者接收到時(shí),會(huì)調(diào)用該復(fù)寫(xiě)方法 進(jìn)行響應(yīng)
@Override
public void onComplete() {
Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
}
};
return observer;
}
方式2:采用Subscriber 抽象類
private Subscriber<Integer> createObserver2() {
//方式2:采用Subscriber 抽象類
// 說(shuō)明:Subscriber類 = RxJava 內(nèi)置的一個(gè)實(shí)現(xiàn)了 Observer 的抽象類,
// 對(duì) Observer 接口進(jìn)行了擴(kuò)展\
// 1. 創(chuàng)建觀察者 (Observer )對(duì)象
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
// 2. 創(chuàng)建對(duì)象時(shí)通過(guò)對(duì)應(yīng)復(fù)寫(xiě)對(duì)應(yīng)事件方法 從而 響應(yīng)對(duì)應(yīng)事件
// 觀察者接收事件前,默認(rèn)最先調(diào)用復(fù)寫(xiě) onSubscribe()
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "開(kāi)始采用subscribe連接");
}
// 當(dāng)被觀察者生產(chǎn)Next事件 & 觀察者接收到時(shí),會(huì)調(diào)用該復(fù)寫(xiě)方法 進(jìn)行響應(yīng)
@Override
public void onNext(Integer integer) {
Log.d(TAG, "對(duì)Next事件作出響應(yīng)" + integer);
}
// 當(dāng)被觀察者生產(chǎn)Error事件& 觀察者接收到時(shí),會(huì)調(diào)用該復(fù)寫(xiě)方法 進(jìn)行響應(yīng)
@Override
public void onError(Throwable t) {
Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
}
// 當(dāng)被觀察者生產(chǎn)Complete事件& 觀察者接收到時(shí),會(huì)調(diào)用該復(fù)寫(xiě)方法 進(jìn)行響應(yīng)
@Override
public void onComplete() {
Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
}
};
return subscriber;
}
步驟3:通過(guò)訂閱(Subscribe)連接觀察者和被觀察者
- 即 顧客找到服務(wù)員 - 點(diǎn)菜 - 服務(wù)員下單到>廚房 - 廚房烹調(diào)
- 具體實(shí)現(xiàn)
observable.subscribe(observer);
// 或者 observable.subscribe(subscriber);
5.2 方式2:優(yōu)雅的實(shí)現(xiàn)方法 - 基于事件流的鏈?zhǔn)秸{(diào)用
- 上述的實(shí)現(xiàn)方式是為了說(shuō)明Rxjava的原理 & 使用
- 在實(shí)際應(yīng)用中,會(huì)將上述步驟&代碼連在一起,從而更加簡(jiǎn)潔、更加優(yōu)雅,即所謂的 RxJava基于事件流的鏈?zhǔn)秸{(diào)用.
private void chainCall1() {
// RxJava的鏈?zhǔn)讲僮? // 1. 創(chuàng)建被觀察者 & 生產(chǎn)事件
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
// 2. 通過(guò)通過(guò)訂閱(subscribe)連接觀察者和被觀察者
// 3. 創(chuàng)建觀察者 & 定義響應(yīng)事件的行為
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開(kāi)始采用subscribe連接");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "對(duì)Next事件" + integer + "作出響應(yīng)");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
}
});
//注:整體方法調(diào)用順序:觀察者.onSubscribe()> 被觀察者.subscribe()> 觀察者.onNext()>觀察者.onComplete()
}
RxJava 2.x 提供了多個(gè)函數(shù)式接口 ,用于實(shí)現(xiàn)簡(jiǎn)便式的觀察者模式。

944365-abda1c2bef8681f3.png
private void chainCall2() {
Observable.just("hello").subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
}
6 額外說(shuō)明
6.1 觀察者 Observer的subscribe()具備多個(gè)重載的方法
public final Disposable subscribe() {}
// 表示觀察者不對(duì)被觀察者發(fā)送的事件作出任何響應(yīng)(但被觀察者還是可以繼續(xù)發(fā)送事件)
public final Disposable subscribe(Consumer<? super T> onNext) {}
// 表示觀察者只對(duì)被觀察者發(fā)送的Next事件作出響應(yīng)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
// 表示觀察者只對(duì)被觀察者發(fā)送的Next事件 & Error事件作出響應(yīng)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
// 表示觀察者只對(duì)被觀察者發(fā)送的Next事件、Error事件 & Complete事件作出響應(yīng)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
// 表示觀察者只對(duì)被觀察者發(fā)送的Next事件、Error事件 、Complete事件 & onSubscribe事件作出響應(yīng)
public final void subscribe(Observer<? super T> observer) {}
// 表示觀察者對(duì)被觀察者發(fā)送的任何事件都作出響應(yīng)
6.2 可采用 Disposable.dispose() 切斷觀察者 與 被觀察者 之間的連接
即觀察者 無(wú)法繼續(xù) 接收 被觀察者的事件,但被觀察者還是可以繼續(xù)發(fā)送事件
// 主要在觀察者 Observer中 實(shí)現(xiàn)
Observer<Integer> observer = new Observer<Integer>() {
// 1. 定義Disposable類變量
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開(kāi)始采用subscribe連接");
// 2. 對(duì)Disposable類變量賦值
mDisposable = d;
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "對(duì)Next事件"+ value +"作出響應(yīng)" );
if (value == 2) {
// 設(shè)置在接收到第二個(gè)事件后切斷觀察者和被觀察者的連接
mDisposable.dispose();
Log.d(TAG, "已經(jīng)切斷了連接:" + mDisposable.isDisposed());
}
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
}
};