第一篇用了1.0版本的RxJava 從現(xiàn)在開始來使用2.0.1,還是使用新的好一些
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
解釋一下其中兩個陌生的玩意:ObservableEmitter和Disposable.
ObservableEmitter: Emitter是發(fā)射器的意思,那就很好猜了,這個就是用來發(fā)出事件的,它可以發(fā)出三種類型的事件,通過調(diào)用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分別發(fā)出next事件、complete事件和error事件。
這里使用要注意有使用規(guī)則
ObservableEmitter 可以發(fā)送無限個onNext,但是complete和onError時間只能發(fā)送其中一個
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
但是只要在收到onError或者onComplete后就不會再接收onNext,這時候會出現(xiàn)的情況就是被觀察的ObservableEmitter反射器可能還會繼續(xù)發(fā)送onNext但是接收這Observer不會再接收onNext時間。
注: 關于onComplete和onError唯一并且互斥這一點, 是需要自行在代碼中進行控制, 如果你的代碼邏輯中違背了這個規(guī)則, 并不一定會導致程序崩潰. 比如發(fā)送多個onComplete是可以正常運行的, 依然是收到第一個onComplete就不再接收了, 但若是發(fā)送多個onError, 則收到第二個onError事件會導致程序會崩潰.
作者:Season_zlc
鏈接:http://www.itdecent.cn/p/464fa025229e
來源:簡書
著作權歸作者所有。商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權,非商業(yè)轉(zhuǎn)載請注明出處。
那么Disposable用來干什么的呢,它的作用就是讓Observer不在接收onNext onComplete和onError事件
比如在一連串業(yè)務操作中 如果中途出現(xiàn)什么狀況需要停掉正在進行的業(yè)務 就可以使用這個,比如像下面這樣用
Observer<String> observer = new Observer<String>() {
private Disposable mDisposable = null;
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
}
@Override
public void onNext(String value) {
if ("111".equals(value)) {
if (mDisposable != null && !mDisposable.isDisposed()) {
mDisposable.dispose();
mDisposable = null;
}
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
另外還有一個不同的地方是
public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}
observable.subscribe();被重載了很多個 而且和1.0的也是有一些區(qū)別 那具體怎么用呢
1、沒有參數(shù)的 調(diào)用后Observer不會任何事件,而observable的subscribe中的事件調(diào)用會依次執(zhí)行
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
// 這里相當于1.0中的call函數(shù) 調(diào)用的順序還是和1.0的時候一樣的 只是使用的類和函數(shù)名有了少許改變
//ObservableEmitter 時間發(fā)射器 其實可以理解為一個觀察者對象
log("Observable CurrentThread : " + Thread.currentThread().getName());
e.onNext("Hello");
e.onComplete();// 1.0中函數(shù)名為onCompleted()
}
});
打印日志結果
08-08 11:18:39.713 24454-24454/com.example.rxjavademo I/MainActivity----Test: Observable CurrentThread : main
2、subscribe(Consumer<? super T> onNext) 參數(shù)為一個Consumer對象的 從下面的代碼中可以看出,這里調(diào)用Observer只關心收到的onNext()不關系onComplete和onError 其實這里和上一篇講的1.0中的Action1這個類的作用很相似就是對onNext的函數(shù)封裝成了一個對于對象來使用,因為Consumer的內(nèi)部函數(shù)accept(T t)就是一個有參無返回的函數(shù) 而onNext(T t) 剛好也是同樣的,所以這樣你是不是發(fā)現(xiàn)onError(T t) 也是同樣的是這樣的。請繼續(xù)看第三條
public interface Consumer<T> {
/**
* Consume the given value.
* @param t the value
* @throws Exception on error
*/
void accept(T t) throws Exception;
}
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
// 這里相當于1.0中的call函數(shù) 調(diào)用的順序還是和1.0的時候一樣的 只是使用的類和函數(shù)名有了少許改變
//ObservableEmitter 時間發(fā)射器 其實可以理解為一個觀察者對象
log("Observable CurrentThread : " + Thread.currentThread().getName());
e.onNext("Hello");
e.onComplete();// 1.0中函數(shù)名為onCompleted()
}
});
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
log("Consumer CurrentThread : " + Thread.currentThread().getName());
log("onNext :" + s);
}
};
observable.subscribe(consumer);
日志打印結果
08-08 11:23:37.163 28780-28780/com.example.baiguosong.rxjavademo I/MainActivity----Test: Observable CurrentThread : main
08-08 11:23:37.163 28780-28780/com.example.baiguosong.rxjavademo I/MainActivity----Test: Consumer CurrentThread : main
08-08 11:23:37.163 28780-28780/com.example.baiguosong.rxjavademo I/MainActivity----Test: onNext :Hello
3、subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)。上面第二條中講到Consumer就是對onNext(T t) 和onError(Throwabe t)的封裝,看果不其然兩個參數(shù)的時候就關系的是onNext和onError看日志就知道onNext和onError都執(zhí)行了
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
// 這里相當于1.0中的call函數(shù) 調(diào)用的順序還是和1.0的時候一樣的 只是使用的類和函數(shù)名有了少許改變
//ObservableEmitter 時間發(fā)射器 其實可以理解為一個觀察者對象
log("Observable CurrentThread : " + Thread.currentThread().getName());
e.onNext("Hello");
// e.onComplete();// 1.0中函數(shù)名為onCompleted()
e.onError(new NullPointerException());
}
});
Consumer<String> onNextConsumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
log("Consumer CurrentThread : " + Thread.currentThread().getName());
log("onNext :" + s);
}
};
Consumer<Throwable> onErrorConsumer = new Consumer<Throwable>() {
@Override
public void accept(Throwable e) throws Exception {
log("onError : " + e.toString());
}
};
observable.subscribe(onNextConsumer,onErrorConsumer);
日志打印結果
08-08 11:35:49.043 7751-7751/com.example.baiguosong.rxjavademo I/MainActivity----Test: Observable CurrentThread : main
08-08 11:35:49.043 7751-7751/com.example.baiguosong.rxjavademo I/MainActivity----Test: Consumer CurrentThread : main
08-08 11:35:49.043 7751-7751/com.example.baiguosong.rxjavademo I/MainActivity----Test: onNext :Hello
08-08 11:35:49.043 7751-7751/com.example.baiguosong.rxjavademo I/MainActivity----Test: onError : java.lang.NullPointerException
4、subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete)經(jīng)過前面的講解那么三個參數(shù)就一目了然了 第三個參數(shù)Action肯定就是對應的onComplete()的函數(shù)的封裝了 無參無返回值,老規(guī)矩還是直接上代碼
public interface Action {
/**
* Runs the action and optionally throws a checked exception.
* @throws Exception if the implementation wishes to throw a checked exception
*/
void run() throws Exception;
}
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
// 這里相當于1.0中的call函數(shù) 調(diào)用的順序還是和1.0的時候一樣的 只是使用的類和函數(shù)名有了少許改變
//ObservableEmitter 時間發(fā)射器 其實可以理解為一個觀察者對象
log("Observable CurrentThread : " + Thread.currentThread().getName());
e.onNext("Hello");
// e.onComplete();// 1.0中函數(shù)名為onCompleted()
// e.onError(new NullPointerException());
e.onComplete();
}
});
Consumer<String> onNextConsumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
log("Consumer CurrentThread : " + Thread.currentThread().getName());
log("onNext :" + s);
}
};
Consumer<Throwable> onErrorConsumer = new Consumer<Throwable>() {
@Override
public void accept(Throwable e) throws Exception {
log("onError : " + e.toString());
}
};
Action onCompleteAction = new Action() {
@Override
public void run() throws Exception {
log("onComplete : run");
}
};
observable.subscribe(onNextConsumer,onErrorConsumer,onCompleteAction);
日志打印結果
08-08 11:43:57.733 14894-14894/com.example.baiguosong.rxjavademo I/MainActivity----Test: Observable CurrentThread : main
08-08 11:43:57.733 14894-14894/com.example.baiguosong.rxjavademo I/MainActivity----Test: Consumer CurrentThread : main
08-08 11:43:57.733 14894-14894/com.example.baiguosong.rxjavademo I/MainActivity----Test: onNext :Hello
08-08 11:43:57.733 14894-14894/com.example.baiguosong.rxjavademo I/MainActivity----Test: onComplete : run
5、subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) 這里又多了一個參數(shù)Consumer<? super Disposable> 通過前面的舉例說明這里的第四個參數(shù)就沒什么難度了吧 Disposable這個不就是前面講到的那個讓Observer不再接收事件的函數(shù)嘛
6、subscribe(Observer<? super T> observer) 這個也很好理解了 直接看代碼
Observer<String> observer = new Observer<String>() {
private Disposable mDisposable = null;
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
}
@Override
public void onNext(String value) {
log("onNext :" + value);
}
@Override
public void onError(Throwable e) {
log("onError :" + e.toString());
}
@Override
public void onComplete() {
log("onComplete :");
}
};
observable.subscribe(observer);
日志打印結果
08-08 11:50:43.103 20994-20994/com.example.baiguosong.rxjavademo I/MainActivity----Test: Observable CurrentThread : main
08-08 11:50:43.103 20994-20994/com.example.baiguosong.rxjavademo I/MainActivity----Test: onNext :Hello
08-08 11:50:43.103 20994-20994/com.example.baiguosong.rxjavademo I/MainActivity----Test: onComplete :
以上內(nèi)容就是2.0.1版本和1.0版本的對比講解后面的內(nèi)容都會基于2.0.1版本來學習研究,歡迎拍磚指教。
注:部分內(nèi)容是看了這位大佬的 傳送門
http://www.itdecent.cn/p/464fa025229e