??Rxjava 在這些年在android開發(fā)中非常的火爆,它和Retrofit 的結(jié)合堪稱完美,他們可以把我們從以前我們進行網(wǎng)絡(luò)請求中各種線程切換,各種接口回掉中解放出來了??梢宰屛覀兊倪壿嬜兊姆浅G逦?,便于代碼維護。我們公司的項目目前使用的Rxjava 版本還是1.0版本的,準(zhǔn)備打算升級到使用Rx2.0。所以打算學(xué)習(xí)一下Rxjava 2
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("hello world");
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext "+s);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError");
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
10-13 16:24:39.176 11578-11578/? E/huangjie: onSubscribe
10-13 16:24:39.176 11578-11578/? E/huangjie: onNext hello world
10-13 16:24:39.176 11578-11578/? E/huangjie: onComplete
上面代碼中我們看到了兩個陌生的對象,ObservableEmitter和Disposable.
ObservableEmitter:
?? Emitter是發(fā)射器的意思,那就很好猜了,這個就是用來發(fā)出事件的,它可以發(fā)出三種類型的事件,通過調(diào)用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分別發(fā)出next事件、complete事件和error事件
Disposable:
??這個單詞的字面意思是一次性用品,用完即可丟棄的. 那么在RxJava中怎么去理解它呢, 可以想象成一條河流 當(dāng)調(diào)用它的dispose()方法時, 它就會放下柵欄, 從而導(dǎo)致下游收不到事件.
在Rxjava 2 中,Observable不再支持訂閱Subscriber了,而是需要使用Observer作為觀察者。但是有一點是沒有變化的,不管是在Rxjava 1 還是在Rxjava 2中,被觀察者,觀察者,訂閱 三者缺一不可。只有在使用了subscribe()方法過后(就是被觀察者訂閱觀察者),被觀察者才會發(fā)送數(shù)據(jù)。
在Rxjava 2中有五種觀察者模式

| 類型 | 描述 |
|---|---|
| Observale | 能夠發(fā)射0或者n 個數(shù)據(jù),并以成功或者錯誤事件終止 |
| Flowable | 能夠發(fā)射0或n個數(shù)據(jù),并以成功或者錯誤事件終止,支持背壓,可以控制數(shù)據(jù)源發(fā)射的速度 |
| Single | 只能發(fā)射單個數(shù)據(jù)或錯誤事件 |
| Completable | 從來不發(fā)射數(shù)據(jù),只處理onComplete 和onError事件,可以看成Rxjava的Runnable |
| Maybe | 能夠發(fā)射0或者n個數(shù)據(jù),要么成功,要么失敗。有點類似于Optional |
do操作符
??do操作符可以給Observable的生命周期的各個階段加上一系列的監(jiān)聽,當(dāng)Observable執(zhí)行到各個階段的時,這些回掉就會觸發(fā)。
Observable.just("hello")
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "doOnNext" + s);
}
})
.doAfterNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "doAfterNext" + s);
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doOnComplete");
}
})
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.e(TAG, "doOnSubscribe");
}
})
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doAfterTerminate");
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doFinally");
}
})
.doOnEach(new Consumer<Notification<String>>() {
@Override
public void accept(Notification<String> stringNotification) throws Exception {
Log.e(TAG, "doOnEach" + (stringNotification.isOnNext() ? "onNext" : stringNotification.isOnComplete() ? "onComplete" : "onError"));
}
})
.doOnLifecycle(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.e(TAG, "doOnLifecycle");
}
}, new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doOnLifecycle run");
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.e(TAG, " onNext" + s);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, " onError");
}
@Override
public void onComplete() {
}
});
E/huangjie: doOnSubscribe
E/huangjie: doOnLifecycle
E/huangjie: doOnNext hello
E/huangjie: doOnEach onNext
E/huangjie: onNext hello
E/huangjie: doAfterNext hello
E/huangjie: doOnComplete
E/huangjie: doOnEach onComplete
E/huangjie: doFinally
E/huangjie: doAfterTerminate
根據(jù)打印的日志我們看出Rxjava 內(nèi)部數(shù)據(jù)的流向
| 操作符 | 用途 |
|---|---|
| doOnSubscribe | 一旦觀察者訂閱了Observable,它就會調(diào)用 |
| doOnLifecycle | 可以在觀察者訂閱之后,設(shè)置是否取消訂閱 |
| doOnNext | 它產(chǎn)生的Observable 每發(fā)射一項就會調(diào)用它一次,它的Consumer接受發(fā)射的數(shù)據(jù)項,一般用于在subscrible之前的數(shù)據(jù)進行處理 |
| doOnEach | 它產(chǎn)生的Observable每發(fā)射一項數(shù)據(jù)就會調(diào)用它一次,不僅包括onNext,還包括onError和onCompleted |
| doAfterNext | 在onNext之后執(zhí)行,而doOnNext()是在onNext之前執(zhí)行 |
| doOnComplete | 當(dāng)它產(chǎn)生的Observable在正常終止調(diào)用onComplete時會被調(diào)用 |
| doFinally | 在當(dāng)它產(chǎn)生的Observable終止之后會被調(diào)用,無論是正常終止,還是異常終止,doFinally 優(yōu)于doAfterTerminate的調(diào)用 |
| doAfterTerminate | 注冊一個Action,當(dāng)Observable調(diào)用onComplete或onError時觸發(fā) |
上面的代碼中我們發(fā)現(xiàn)了Consumer 和Action,我可以認(rèn)為其實Consumer 就是Rxjava 1.x 的Action1, Consumer是單一參數(shù),Action是無參數(shù)類型。