原理
Rx是Reactive Extensions的縮寫的簡寫,可以使用可觀察數(shù)據(jù)流對(duì)編程接口進(jìn)行異步編程,它結(jié)合了觀察者模式,迭代器模式和函數(shù)式的精華。
Rxjava是一種異步數(shù)據(jù)處理庫,也是一種觀察者模式。最早是Netflix公司用于重構(gòu)當(dāng)前架構(gòu)時(shí)減少REST調(diào)用的次數(shù),參考了Microsoft公司的響應(yīng)式編程,把Microsoft的Rx庫遷移到Java JVM中,其中最有名的就是RxJava。
它的特點(diǎn)主要有以下:
- 支持Java 8 Lambda。
- 支持異步和同步。
- 單一依賴關(guān)系。
- 簡潔,優(yōu)雅。
RxAndroid
在開發(fā)項(xiàng)目的時(shí)候,開發(fā)者在使用Rxjava時(shí)會(huì)搭配RxAndroid,他是針對(duì)Rxjava在Android平臺(tái)使用的一個(gè)響應(yīng)式擴(kuò)展組件。使用RxAndroid的Schedulers(調(diào)度器)可以解決Android主線程問題, 多線程等問題。
觀察者模式的四大要素
- Observable 被觀察者
- Observer
- 觀察者 subscribe 訂閱
- 事件

觀察者訂閱被觀察者,一旦被觀察者發(fā)出事件,觀察者就可以接收到。
擴(kuò)展的觀察者模式

onNext()訂閱了一個(gè)事件,當(dāng)事件完成時(shí)會(huì)回調(diào)onComplete(),在完成過程中發(fā)生了異常會(huì)回調(diào)onError()。
使用
依賴
//在Project的gradle下添加maven倉庫
maven { url "https://oss.jfrog.org/libs-snapshot" }
implementation 'io.reactivex.rxjava3:rxjava:3.0.4'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
Hello World
//1.創(chuàng)建被觀察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("hello world");
emitter.onComplete();
}
});
//2.創(chuàng)建觀察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("onSubscribe():");
}
@Override
public void onNext(@NonNull String s) {
System.out.println("onNext():" + s);
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("onError():" + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete():");
}
};
//3.訂閱事件
observable.subscribe(observer);
注意:onError()和onComplete()只會(huì)回調(diào)一個(gè)。
操作符
Creating Observables(創(chuàng)建 Observable)
Create
//鏈?zhǔn)綄懛? Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("hello world");
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe():"+d.toString());
}
@Override
public void onNext(String o) {
System.out.println("onNext():" + o);
}
@Override
public void onError(Throwable e) {
System.out.println("onError():" + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete():");
}
});
Just
使用將為你創(chuàng)建一個(gè)Observable并自動(dòng)為你調(diào)用onNext( )發(fā)射數(shù)據(jù),just中傳遞的參數(shù)將直接在Observer的onNext()方法中接收到。
Observable.just("hello world").subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
From
將一個(gè)Iterable, 一個(gè)Future, 或者一個(gè)數(shù)組轉(zhuǎn)換成一個(gè)Observable,遍歷集合,發(fā)送每個(gè)item。相當(dāng)于多次回調(diào)onNext()方法,每次傳入一個(gè)item。
List<String> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
list.add("Hello" + i);
}
Observable.fromArray(list).subscribe(new Observer<List<String>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<String> strings) {
System.out.println(strings);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Defer
當(dāng)觀察者訂閱時(shí),才創(chuàng)建Observable,并且針對(duì)每個(gè)觀察者創(chuàng)建都是一個(gè)新的Observable。 以何種方式創(chuàng)建這個(gè)Observable對(duì)象,當(dāng)滿足回調(diào)條件后,就會(huì)進(jìn)行相應(yīng)的回調(diào)。
value = "2020/12/13";
Observable<String> observable = Observable.defer(new Supplier<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> get() throws Throwable {
return Observable.just(value);
}
});
value = "12345";
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Empty/Never/Throw
Empty是創(chuàng)建一個(gè)不發(fā)射任何數(shù)據(jù)但是正常終止的Observable。 Never是創(chuàng)建一個(gè)不發(fā)射數(shù)據(jù)也不終止的Observable。 Throw是創(chuàng)建一個(gè)不發(fā)射數(shù)據(jù)以一個(gè)錯(cuò)誤終止的Observable。 這三個(gè)操作符生成的Observable行為非常特殊和受限。測(cè)試的時(shí)候很有用,有時(shí)候也用于結(jié)合其它的Observables,或者作為其它需要Observable的操作符的參數(shù)。
Observable.defer(new Supplier<ObservableSource<?>>() {
@Override
public ObservableSource<?> get() throws Throwable {
return Observable.error(new Throwable("你寫了個(gè)bug"));
}
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Object o) {
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onComplete() {
}
});
Interval
創(chuàng)建一個(gè)按固定時(shí)間間隔發(fā)射整數(shù)序列的Observable,可用作定時(shí)器。即按照固定1秒一次調(diào)用onNext()方法。
//TrampolineScheduler不會(huì)立即執(zhí)行,當(dāng)其他排隊(duì)任務(wù)結(jié)束時(shí)才執(zhí)行,TrampolineScheduler運(yùn)行在主線程。
Observable.interval(1000, TimeUnit.MILLISECONDS, Schedulers.trampoline()).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Long aLong) {
System.out.println(aLong);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Repeat
創(chuàng)建一個(gè)Observable,該Observable的事件可以重復(fù)調(diào)用。
Observable.just(123).repeat(2).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Start
返回一個(gè)Observable,它發(fā)射一個(gè)類似于函數(shù)聲明的值。
Timer
創(chuàng)建一個(gè)Observable,它在一個(gè)給定的延遲后發(fā)射一個(gè)特殊的值,即表示延遲2秒后,調(diào)用onNext()方法。
Observable.timer(2000, TimeUnit.MILLISECONDS,Schedulers.trampoline()).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
System.out.println(aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Transforming Observables(轉(zhuǎn)換 Observable)
Map
Map就是把原來的Observable對(duì)象轉(zhuǎn)換成另一個(gè)Observable對(duì)象,同時(shí)將傳輸?shù)臄?shù)據(jù)進(jìn)行一些靈活的操作,方便Observer獲得想要的數(shù)據(jù)形式。
//Integer to String
Observable.just(123).map(new Function<Integer, String>() {
@Override
public String apply(Integer s) throws Exception {
return s.toString();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
flatMap對(duì)于數(shù)據(jù)的轉(zhuǎn)換比map()更加徹底,如果發(fā)送的數(shù)據(jù)是集合,flatmap()重新生成一個(gè)Observable對(duì)象,并把數(shù)據(jù)轉(zhuǎn)換成Observer想要的數(shù)據(jù)形式。它可以返回任何它想返回的Observable對(duì)象。
Observable.just(1, 2, 3, 4, 5).flatMap(new Function<Integer, ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> apply(Integer integer) throws Exception {
return Observable.just(integer.toString());
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String o) {
System.out.println(o);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
GroupBy
根據(jù)規(guī)則對(duì)數(shù)據(jù)進(jìn)行分組。
Observable.just(1, 2, 3, 4, 5).groupBy(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer % 2==0?"偶數(shù)":"奇數(shù)";
}
}).subscribe(new Observer<GroupedObservable<String, Integer>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull final GroupedObservable<String, Integer> arg0) {
arg0.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(arg0.getKey() + "-------" + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Buffer
定期收集Observable的數(shù)據(jù)放進(jìn)一個(gè)數(shù)據(jù)包裹,然后發(fā)射這些數(shù)據(jù)包裹,而不是一次發(fā)射一個(gè)值。
Observable.just(1, 2, 3, 4, 5,6).buffer(3).subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<Integer> integers) {
System.out.println(integers);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Scan
將數(shù)據(jù)進(jìn)行累加。
Observable.range(1, 5).scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Window
window和buffer相似,它返回的是一個(gè)Observable對(duì)象,它根據(jù)一系列任務(wù)規(guī)則把數(shù)據(jù)聚集到一個(gè)列表。
// window第一個(gè)參數(shù)count:每個(gè)窗口應(yīng)發(fā)射前的最大大小;第二個(gè):在啟動(dòng)新窗口之前需要跳過多少項(xiàng)
Observable.range(1, 5).window(5, 1).subscribe(new Observer<Observable<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(final Observable<Integer> arg0) {
System.out.println(arg0);
arg0.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println("---"+integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Filtering Observables(過濾 Observable)
Debounce
操作間隔一定時(shí)間內(nèi)沒有做任何操作,數(shù)據(jù)才會(huì)發(fā)送到觀察者。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> arg0) throws Exception {
for (int i = 0; i < 10; i++) {
Thread.sleep(2000);
arg0.onNext(i);
}
arg0.onComplete();
}
}).debounce(1, TimeUnit.SECONDS).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer o) {
System.out.println(o);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Distinct
去掉重復(fù)數(shù)據(jù)的操作符。
Observable.just(1, 2, 3, 2, 3).distinct().subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
ElementAt
取出指定位置的數(shù)據(jù)。
Observable.just(1, 2, 3, 2, 3).distinct().subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Filter
對(duì)數(shù)據(jù)進(jìn)行指定規(guī)則的過濾。
Observable.just(1, 2, 3, 4).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer > 2;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
First
取數(shù)據(jù)中的第一個(gè)數(shù)據(jù)。
//first參數(shù):defaultItem: 當(dāng)前Observable不發(fā)射任何內(nèi)容時(shí)發(fā)出的默認(rèn)項(xiàng)
Observable.just(1, 2, 3,4,5).first(10).subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
});
IgnoreElements
忽略所有的數(shù)據(jù),不向觀察者發(fā)送數(shù)據(jù),直接回調(diào)onError或onComplete()。
Observable.just(6, 9, 1, 3).distinct().ignoreElements().subscribe(new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onComplete() {
}
@Override
public void onError(Throwable e) {
}
});
Last
列表數(shù)據(jù)最后指定的數(shù)位項(xiàng)數(shù)據(jù)。 SingleObserver只發(fā)射一條單一的數(shù)據(jù),或者一條異常通知,不能發(fā)射完成通知,其中數(shù)據(jù)與通知只能發(fā)射一個(gè)。
Observable.just(6, 4, 2,4).distinct().map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer.toString();
}
}).last("4").subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable e) {
}
});
Observable.just(6, 4, 2,4).distinct().map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer.toString();
}
}).last("4").subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable e) {
}
});
Sample
對(duì)數(shù)據(jù)源進(jìn)行樣本采集,發(fā)送給觀察者。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> arg0) throws Exception {
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
arg0.onNext(i);
}
arg0.onComplete();
}
}).sample(4, TimeUnit.SECONDS).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Skip
跳過指定列表項(xiàng)數(shù)據(jù)的指定項(xiàng)數(shù)據(jù)。
Observable.just(6, 3, 2, 1).skip(2).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
SkipLast
跳過列表數(shù)據(jù)的最后幾位數(shù)據(jù)。
Observable.just(1, 2, 3, 5).skipLast(2).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Take
只取列表數(shù)據(jù)的前幾項(xiàng)。
Observable.just(1, 2, 3, 4).take(2).takeLast(1).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
TakeLast
取列表數(shù)據(jù)項(xiàng)的最后幾項(xiàng)數(shù)據(jù)。 Consumer是簡易版的Observer,他有多重重載,可以自定義你需要處理的信息,他只提供一個(gè)回調(diào)接口accept,由于沒有onError和onCompete,無法再 接受到onError或者onCompete之后,實(shí)現(xiàn)函數(shù)回調(diào)。
Observable.just(1, 2, 3, 4).takeLast(2).take(1).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
Combining Observables(組合 Observable)
Zip
通過一個(gè)函數(shù)將多個(gè)Observables的發(fā)射物結(jié)合到一起,基于這個(gè)函數(shù)的結(jié)果為每個(gè)結(jié)合體發(fā)射單個(gè)數(shù)據(jù)項(xiàng)。當(dāng)其中一個(gè)Observable發(fā)送數(shù)據(jù)結(jié)束或異常,另外一個(gè)也停止發(fā)送。
Observable<Integer> observable = Observable.just(10, 20, 30);
Observable<Integer> observable1 = Observable.just(4, 8, 12, 16);
Observable.zip(observable, observable1, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
System.out.println(integer);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Merge
合并多個(gè)Observables的發(fā)射物。
Observable<Integer> observable = Observable.just(10, 20, 30);
Observable<Integer> observable1 = Observable.just(4, 8, 12, 16);
Observable.merge(observable, observable1).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
System.out.println(integer);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
StartWith
在數(shù)據(jù)序列的開頭插入一條指定的項(xiàng)。
Observable<Integer> observable = Observable.just(10, 20, 30);
Observable<Integer> observable1 = Observable.just(4, 8, 12, 16);
Disposable subscribe = observable.startWith(observable1).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
CombineLatest
當(dāng)兩個(gè)Observables中的任何一個(gè)發(fā)射了數(shù)據(jù)時(shí),使用一個(gè)函數(shù)結(jié)合每個(gè)Observable發(fā)射的最近數(shù)據(jù)項(xiàng),并且基于這個(gè)函數(shù)的結(jié)果發(fā)射數(shù)據(jù)。
Observable<Integer> observable = Observable.just(1, 3, 5);
Observable<Integer> observable1 = Observable.just(2, 4, 6);
Observable.combineLatest(observable, observable1, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Throwable {
System.out.println("integer:" + integer + "---" + "integer2:" + integer2);
return integer + integer2;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
System.out.println(integer);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Join
任何時(shí)候,只要在另一個(gè)Observable發(fā)射的數(shù)據(jù)定義的時(shí)間窗口內(nèi),這個(gè)Observable發(fā)射了一條數(shù)據(jù),就結(jié)合兩個(gè)Observable發(fā)射的數(shù)據(jù)。
String[] args1 = new String[]{"張欣1", "張欣2", "張欣3", "張欣4", "張欣5"};
String[] args2 = new String[]{"春曉1", "春曉2", "春曉3", "春曉4"};
Observable<String> o1 = Observable.fromArray(args1);
Observable<String> o2 = Observable.fromArray(args2);
//相同的數(shù)組可以進(jìn)行合并
o2.join(o1, new Function<String, Observable<Long>>() {
@Override
public Observable<Long> apply(String s) throws Exception {
return Observable.timer(2, TimeUnit.SECONDS);
}
}, new Function<String, Observable<Long>>() {
@Override
public Observable<Long> apply(String s) {
return Observable.timer(2, TimeUnit.SECONDS);
}
}, new BiFunction<String, String, String>() {
@Override
public String apply(String s, String s2) throws Exception {
return s + "-&--" + s2;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
System.out.println(s);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
SwitchOnNext
將一個(gè)發(fā)射多個(gè)Observables的Observable轉(zhuǎn)換成另一個(gè)單獨(dú)的Observable,后者發(fā)射那些Observables最近發(fā)射的數(shù)據(jù)項(xiàng)。
final Observable<Long> observable1 = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
final Observable<Long> observable2 = Observable.intervalRange(10, 5, 1, 500, TimeUnit.MILLISECONDS);
Observable<Observable<Long>> sources = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {
@Override
public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
emitter.onNext(observable1);
Thread.sleep(1000);
// 此時(shí)發(fā)射一個(gè)新的observable2,將會(huì)取消訂閱observable1
emitter.onNext(observable2);
emitter.onComplete();
}
});
// 創(chuàng)建發(fā)射含有Error通知的Observable序列的Observable
Observable<Observable<Long>> sourcesError = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {
@Override
public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
emitter.onNext(observable1);
// emitter.onNext(Observable.error(new Exception("Error Test1!"))); // 發(fā)射一個(gè)發(fā)射Error通知的Observable
// emitter.onNext(Observable.error(new Exception("Error Test2!"))); // 發(fā)射一個(gè)發(fā)射Error通知的Observable
Thread.sleep(1000);
// 此時(shí)發(fā)射一個(gè)新的observable2,將會(huì)取消訂閱observable1
emitter.onNext(observable2);
emitter.onComplete();
}
});
// 1. switchOnNext(ObservableSource<ObservableSource> sources, int bufferSize)
// 可選參數(shù) bufferSize: 緩存數(shù)據(jù)項(xiàng)大小
// 接受一個(gè)發(fā)射Observable序列的Observable類型的sources,
// 當(dāng)sources發(fā)射一個(gè)新的Observable后,則會(huì)取消訂閱前面的舊observable,直接開始接受新Observable的數(shù)據(jù)
Disposable subscribe = Observable.switchOnNext(sources)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long integer) throws Exception {
System.out.println("--> accept(1): " + integer);
}
});
System.out.println("--------------------------------------------------------------------");
// 2. switchOnNextDelayError(ObservableSource<ObservableSource> sources, int prefetch)
// 可選參數(shù) prefetch: 與讀取數(shù)據(jù)項(xiàng)大小
// 當(dāng)sources發(fā)射一個(gè)新的Observable后,則會(huì)取消訂閱前面的舊observable,直接開始接受新Observable的數(shù)據(jù),
// 保留onError通知直到合并后的Observable所有的數(shù)據(jù)發(fā)射完成,在那時(shí)它才會(huì)把onError傳遞給觀察者
Observable.switchOnNextDelayError(sourcesError)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(2)");
}
@Override
public void onNext(Long t) {
System.out.println("--> onNext(2): " + t);
}
@Override
public void onError(Throwable e) {
// 判斷是否是CompositeException對(duì)象(發(fā)生多個(gè)Observable出現(xiàn)Error時(shí)會(huì)發(fā)送的對(duì)象)
if (e instanceof CompositeException) {
CompositeException compositeException = (CompositeException) e;
List<Throwable> exceptions = compositeException.getExceptions();
System.out.println("--> onError(2): " + exceptions);
} else {
System.out.println("--> onError(2): " + e);
}
}
@Override
public void onComplete() {
System.out.println("--> onComplete(2)");
}
});
Error Handling Operators(處理錯(cuò)誤)
Catch
Catch操作符攔截原始Observable的onError通知,將它替換為其它的數(shù)據(jù)項(xiàng)或數(shù)據(jù)序列,讓產(chǎn)生的Observable能夠正常終止或者根本不終止。 還有一個(gè)叫onErrorResumeNext的操作符,它的行為與Catch相似。 RxJava將Catch實(shí)現(xiàn)為三個(gè)不同的操作符:
- onErrorReturn 讓Observable遇到錯(cuò)誤時(shí)發(fā)射一個(gè)特殊的項(xiàng)并且正常終止。
- onErrorResumeNext 讓Observable在遇到錯(cuò)誤時(shí)開始發(fā)射第二個(gè)Observable的數(shù)據(jù)序。
- onExceptionResumeNext 讓Observable在遇到錯(cuò)誤時(shí)繼續(xù)發(fā)射后面的數(shù)據(jù)項(xiàng)。
Observable.just(1,2,3).onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Throwable {
return null;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
System.out.println(integer);
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onComplete() {
}
});
Retry
如果原始Observable遇到錯(cuò)誤,重新訂閱它期望它能正常終止。 retryWhen和retry類似,區(qū)別是,retryWhen將onError中的Throwable傳遞給一個(gè)函數(shù),這個(gè)函數(shù)產(chǎn)生另一個(gè)Observable,retryWhen觀察它的結(jié)果再?zèng)Q定是不是要重新訂閱原始的Observable。如果這個(gè)Observable發(fā)射了一項(xiàng)數(shù)據(jù),它就重新訂閱,如果這個(gè)Observable發(fā)射的是onError通知,它就將這個(gè)通知傳遞給觀察者然后終止。 retryWhen默認(rèn)在trampoline調(diào)度器上執(zhí)行,你可以通過參數(shù)指定其它的調(diào)度器。
場景:網(wǎng)絡(luò)請(qǐng)求失敗重試操作。
final AtomicInteger atomicInteger = new AtomicInteger(3);
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
emitter.onNext(String.valueOf(System.currentTimeMillis()));
emitter.onError(new Error(String.valueOf(atomicInteger.decrementAndGet())));
}
}).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Throwable {
return throwableObservable;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
System.out.println(s);
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onComplete() {
}
});
更多相關(guān)文章
Android如何進(jìn)階:http://docs.qq.com/doc/DWHFqVHBMVEJPWUx
Android面試題匯總:http://docs.qq.com/doc/DWGZIRFh5VEtYWE1D
Android音視頻需要學(xué)習(xí)哪些:http://docs.qq.com/doc/DWFFWZHNPTHZVdHFX
Android常有的開源框架有哪些框:docs.qq.com/doc/DWHlGYUdseVhsSUda
Android車載應(yīng)需要學(xué)習(xí)哪些:docs.qq.com/doc/DWEl0blBabXVvU2Nw Android
Framework怎么學(xué):docs.qq.com/doc/DWFdlc2JocEtNbEJ1
Schedulers(調(diào)度器)
它是RxJava以一種及其簡單的方式解決多線程問題的機(jī)制。
種類
io() 用于I/O操作。 computation() 計(jì)算,計(jì)算工作默認(rèn)的調(diào)度器,與I/O操作無關(guān)。 immediate() 立即執(zhí)行,允許立即在當(dāng)前線程執(zhí)行你指定的工作。 newThread() 新線程,為指定任務(wù)創(chuàng)建新線程。 trampoline() 順序處理,按需處理隊(duì)列,并運(yùn)行隊(duì)列的每一個(gè)任務(wù)。
AndroidSchedulers
RxAndroid提供在Android平臺(tái)的調(diào)度器(指定觀察者在主線程)。
- SubscribeOn 方法用于每個(gè)Observable對(duì)象
- ObserveOn 方法用于每個(gè)Subscriber(Observer)對(duì)象
observable.subscribeOn(Schedulers.newThread())
.unsubscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.onErrorResumeNext(new HttpErrorHandler<T>())
.subscribe(observer);
使用場景
與Retrofit結(jié)合使用
retrofitBuilder = new Retrofit.Builder();
retrofitBuilder.client(okHttpClient)
.addConverterFactory(ScalarsConverterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create());
public <T> void doCall(LifecycleOwner owner, Observable<T> observable, final HttpCallBack<T> httpCallBack) {
if (observable == null || httpCallBack == null) {
throw new IllegalArgumentException("observable或HttpCallBack為空");
}
//觀察者_(dá)網(wǎng)絡(luò)請(qǐng)求狀態(tài)
BaseObserver<T> observer = new BaseObserver<T>() {
@Override
public void onNext(T t) {
try {
if (t != null) {
httpCallBack.onSuccess(t);
}
} catch (Exception e) {
e.printStackTrace();
httpCallBack.onFailure(e);
}
}
@Override
public void onError(Throwable e) {
httpCallBack.onFailure(e);
}
};
if (owner == null) {
//被觀察者訂閱觀察者,根據(jù)生命周期取消訂閱,子線程訂閱主線程觀察
observable.subscribeOn(Schedulers.newThread())
.unsubscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.map(getAppErrorHandler())
.onErrorResumeNext(new HttpErrorHandler<T>())
.subscribe(observer);
} else {
//被觀察者訂閱觀察者,根據(jù)生命周期取消訂閱,子線程訂閱主線程觀察
observable.subscribeOn(Schedulers.newThread())
.unsubscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.map(getAppErrorHandler())
.onErrorResumeNext(new HttpErrorHandler<T>())
.as(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(owner, Lifecycle.Event.ON_DESTROY)))
.subscribe(observer);
}
}
與RxPermission結(jié)合使用
RxPermission是基于RxJava的Android動(dòng)態(tài)權(quán)限申請(qǐng)框架。
使用(簡單封裝)
public void initPermissions(String[] permissions, PermissionResult permissionResult) {
if (rxPermissions == null) {
rxPermissions = new RxPermissions(this);
}
rxPermissions.requestEachCombined(permissions)
.subscribe(permission -> {
if (permission.granted) {
permissionResult.onSuccess();
} else if (permission.shouldShowRequestPermissionRationale) {
permissionResult.onFailure();
} else {
permissionResult.onFailureWithNeverAsk();
}
});
}
代替EventBus
EventBus是一個(gè)Android端優(yōu)化的publish/subscribe消息總線,簡化了應(yīng)用程序內(nèi)各組件間、組件與后臺(tái)線程間的通信。更多相關(guān)請(qǐng)參考Android事件總線之EventBus。 RxJava也可以實(shí)現(xiàn)事件總線,因?yàn)樗鼈兌家罁?jù)于觀察者模式。我們使用RxJava替換EventBus,可以減少App的體積。
使用
private static volatile RxBus instance;
private PublishSubject<Object> mRxtBus;
public static RxBus getDefault() {
if (instance == null) {
synchronized (RxBus.class) {
instance = new RxBus();
}
}
return instance;
}
private RxBus() {
mRxtBus = PublishSubject.create();
}
public void post(String tag, Object event) {
Message msg = new Message(tag, event);
mRxtBus.onNext(msg);
}
public <T> Observable<T> toEvent(Class<T> eventType) {
return mRxtBus.ofType(eventType);
}
}
發(fā)送
RxBus.getDefault().post("payValue",code);
接收
subscribe = RxBus.getDefault().toEvent(RxBus.Message.class).subscribe(new Consumer<RxBus.Message>() {
@Override
public void accept(RxBus.Message message) throws Throwable {
if ("payValue".equals(message.getTag())) {
Log.e("yhj", "accept: " + message.getEvent().toString());
}
}
});
解綁
if (subscribe != null && !subscribe.isDisposed()) {
subscribe.dispose();
}
Rxjava內(nèi)存泄漏的處理
Rxjava的使用不當(dāng)會(huì)導(dǎo)致內(nèi)存泄漏,使用AutoDispose可以解決這個(gè)問題,它是一個(gè)隨Android生命周期事件自動(dòng)解綁Rxjava訂閱的方便工具。
使用
結(jié)合JetPack的LifeCycle(生命周期感知型組件),根據(jù)生命周期取消訂閱。
//被觀察者訂閱觀察者,根據(jù)生命周期取消訂閱,子線程訂閱主線程觀察
observable.subscribeOn(Schedulers.newThread())
.unsubscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.map(getAppErrorHandler())
.onErrorResumeNext(new HttpErrorHandler<T>())
.as(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(owner, Lifecycle.Event.ON_DESTROY)))
.subscribe(observer);
總結(jié)
本文主要是對(duì)RxJava使用及Android常見使用場景進(jìn)行總結(jié),掌握這些還遠(yuǎn)遠(yuǎn)不夠,RxJava還有許多強(qiáng)大的功能,諸如從磁盤/內(nèi)存中獲取緩存數(shù)據(jù),背壓策略,聯(lián)想搜索優(yōu)化等等。后面在項(xiàng)目開發(fā)中遇到相關(guān)場景再進(jìn)行總結(jié),更新。本文若有不當(dāng)之處,請(qǐng)批評(píng)指正。