一、 拆分使用
先創(chuàng)建被觀察者和觀察者,然后建立訂閱關(guān)系,這樣在觀察者中就會(huì)接收到個(gè)生命周期的回調(diào):
@Test
public void test(){
//1. 創(chuàng)建被觀察者
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
// 發(fā)送消息
e.onNext(1);
e.onNext(2);
e.onComplete();
}
});
//2. 創(chuàng)建觀察者
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("建立訂閱關(guān)系");
}
@Override
public void onNext(Integer integer) {
//接受到消息
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
System.out.println("完成");
}
};
//3. 建立訂閱關(guān)系
observable.subscribe(observer);
}
運(yùn)行結(jié)果:
建立訂閱關(guān)系
1
2
完成
二、 鏈?zhǔn)秸{(diào)用(一般都是這種寫(xiě)法):
@Test
public void test2() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("建立訂閱關(guān)系");
}
@Override
public void onNext(Integer integer) {
//接受到消息
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
System.out.println("完成");
}
});
}
三、更簡(jiǎn)單的觀察者
@Test
public void test3() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
}
Consumer相對(duì)Observer簡(jiǎn)化了很多,沒(méi)有了onSubscribe() onError () onComplete (),當(dāng)然也無(wú)法對(duì)這些進(jìn)行監(jiān)聽(tīng)了。
四、創(chuàng)建操作符
上面用的creat是創(chuàng)建被觀察者的一種操作符,另外常用的還有just、justArrat、range、empty,直接看運(yùn)行結(jié)果去理解就好了。
empty這里說(shuō)下,這個(gè)使用場(chǎng)景比如一個(gè)耗時(shí)操作不要任何數(shù)據(jù)反饋去更新UI,只是顯示和隱藏加載動(dòng)畫(huà)。(先不用去糾結(jié)耗時(shí)操作在哪里添加)
@Test
public void test4() {
System.out.println("-----------------just");
Observable.just(1, 2, 3).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
System.out.println("-----------------fromArray");
Observable.fromArray(new Integer[]{1,2,3}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
System.out.println("-----------------range");
Observable.range(0, 3).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
System.out.println("-----------------empty");
Observable.empty().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("建立訂閱關(guān)系");
}
@Override
public void onNext(Object object) {
//接受到消息
System.out.println(object);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
System.out.println("完成");
}
});
運(yùn)行結(jié)果:
-----------------just
1
2
3
-----------------fromArray
1
2
3
-----------------range
0
1
2
-----------------empty
建立訂閱關(guān)系
完成
五、合并操作符
合并操作是指合并被觀察者,用同一個(gè)觀察者去接受,常用的有concatWith、startWith、concat、merge、zip,這里為了顯示出合并的區(qū)別,用了另一個(gè)創(chuàng)建創(chuàng)建操作符intervalRange,比如Observable.intervalRange(0, 10, 0, 1, TimeUnit.SECONDS),這個(gè)代表從0開(kāi)始發(fā)送10個(gè)數(shù),延遲0秒后開(kāi)始執(zhí)行,每1秒發(fā)送一次。
用這兩個(gè)被觀察者測(cè)試上面幾個(gè)合并操作符:
//發(fā)送0-4
Observable observable1 = Observable.intervalRange(0, 5, 0, 1, TimeUnit.SECONDS);
//發(fā)送10-14
Observable observable2 = Observable.intervalRange(10, 5, 0, 1, TimeUnit.SECONDS);
測(cè)試函數(shù):
private void concatWith() {
Log.e(TAG, "-----------------concatWith");
observable1.concatWith(observable2).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "concatWith: " + aLong);
}
});
}
private void startWith() {
Log.e(TAG, "-----------------startWith");
observable1.startWith(observable2).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "startWith: " + aLong);
}
});
}
public void concat() {
Log.e(TAG, "-----------------concat");
Observable.concat(observable1,observable2).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "concat: " + aLong);
}
});
}
public void merge() {
Log.e(TAG, "-----------------merge");
Observable.merge(observable1,observable2).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "merge: " + aLong);
}
});
}
運(yùn)行結(jié)果:
E/MainActivity: -----------------concatWith
E/MainActivity: accept: 0
E/MainActivity: accept: 1
E/MainActivity: accept: 2
E/MainActivity: accept: 3
E/MainActivity: accept: 4
E/MainActivity: accept: 10
E/MainActivity: accept: 11
E/MainActivity: accept: 12
E/MainActivity: accept: 13
E/MainActivity: accept: 14
E/MainActivity: -----------------startWith
E/MainActivity: accept: 10
E/MainActivity: accept: 11
E/MainActivity: accept: 12
E/MainActivity: accept: 13
E/MainActivity: accept: 14
E/MainActivity: accept: 0
E/MainActivity: accept: 1
E/MainActivity: accept: 2
E/MainActivity: accept: 3
E/MainActivity: accept: 4
E/MainActivity: -----------------concat
E/MainActivity: accept: 0
E/MainActivity: accept: 1
E/MainActivity: accept: 2
E/MainActivity: accept: 3
E/MainActivity: accept: 4
E/MainActivity: accept: 10
E/MainActivity: accept: 11
E/MainActivity: accept: 12
E/MainActivity: accept: 13
E/MainActivity: accept: 14
E/MainActivity: -----------------merge
E/MainActivity: -----------------merge
E/MainActivity: accept: 10
E/MainActivity: accept: 0
E/MainActivity: accept: 1
E/MainActivity: accept: 11
E/MainActivity: accept: 2
E/MainActivity: accept: 12
E/MainActivity: accept: 3
E/MainActivity: accept: 13
E/MainActivity: accept: 14
E/MainActivity: accept: 4
根據(jù)上面結(jié)果總結(jié):
-
concatWith和startWith是執(zhí)行的先后順序不一樣,是同步執(zhí)行的 -
concatWith和concat都是順序執(zhí)行,只是寫(xiě)法不一樣 -
concat和merge寫(xiě)法一樣,但是merge是異步的,兩個(gè)被觀察者沒(méi)有先后順序,各自執(zhí)行。
還有一種zip操作符,把被觀察者合并時(shí)一一對(duì)應(yīng),直接看使用方式:
private void zip() {
Observable observable1 = Observable.just("語(yǔ)文", "數(shù)學(xué)", "英語(yǔ)");
Observable observable2 = Observable.just("100", "80", "60");
Observable.zip(observable1, observable2, new BiFunction() {
@Override
public Object apply(Object o, Object o2) throws Exception {
return o.toString() + ":" + o2.toString();
}
})
.subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.e(TAG, "accept: " + o);
}
});
}
運(yùn)行結(jié)果:
E/MainActivity: accept: 語(yǔ)文:100
E/MainActivity: accept: 數(shù)學(xué):80
E/MainActivity: accept: 英語(yǔ):60
六、變換操作符
常見(jiàn)的有map、concatMap、flatMap、groupBy、buffer
先通過(guò)最簡(jiǎn)單的map來(lái)看看變換操作符是干什么的
private void map() {
Log.e(TAG, "-----------------map");
Observable.just(1)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "轉(zhuǎn)化為String" + integer;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String string) throws Exception {
Log.e(TAG, "merge: " + string);
}
});
}
運(yùn)行結(jié)果:
E/MainActivity: -----------------map
E/MainActivity: accept: 轉(zhuǎn)化為String1
也就是說(shuō)map里可以把被觀察者傳遞過(guò)來(lái)的數(shù)據(jù)轉(zhuǎn)換成另一種數(shù)據(jù)格式傳遞給觀察者,這里是Integer轉(zhuǎn)String,比如你也可以被觀察者傳遞過(guò)來(lái)一個(gè)URL,在Function直接網(wǎng)絡(luò)請(qǐng)求,轉(zhuǎn)化成請(qǐng)求結(jié)果給觀察者。
扯多了,繼續(xù)看上面的操作符flatMap:
private void flatMap() {
Observable.just(1)
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(final Integer integer) throws Exception {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("轉(zhuǎn)化為String" + integer);
e.onNext("我還可以再發(fā)送" + integer);
e.onNext("我還可以隨便發(fā)送" + integer);
}
});
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String string) throws Exception {
Log.e(TAG, "accept: " + string);
}
});
}
運(yùn)行結(jié)果:
E/MainActivity: accept: 轉(zhuǎn)化為String1
E/MainActivity: accept: 我還可以再發(fā)送1
E/MainActivity: accept: 我還可以隨便發(fā)送1
這個(gè)相對(duì)map更靈活,map是的Function里直接返回的是轉(zhuǎn)換之后的數(shù)據(jù),一對(duì)一的,而flatMap的Function返回的是另一個(gè)被觀察者,所以這個(gè)可以在里面隨意發(fā)送給觀察者。
在用concatMap之前先看flatMap的另一種操作:
private void flatMap2() {
Observable.just(1, 2, 3)
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add(integer + "." + (1 + i));
}
return Observable.fromIterable(list).delay(1, TimeUnit.SECONDS);
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String string) throws Exception {
Log.e(TAG, "accept: " + string);
}
});
}
在收到被觀察者發(fā)來(lái)的數(shù)據(jù)后,生產(chǎn)一個(gè)List再延遲1s發(fā)送給觀察者,看下運(yùn)行結(jié)果:
E/MainActivity: accept: 2.1
E/MainActivity: accept: 2.2
E/MainActivity: accept: 2.3
E/MainActivity: accept: 1.1
E/MainActivity: accept: 3.1
E/MainActivity: accept: 3.2
E/MainActivity: accept: 3.3
E/MainActivity: accept: 1.2
E/MainActivity: accept: 1.3
每個(gè)都是先.1再.2再.3沒(méi)錯(cuò),但是整體并沒(méi)有按照1、2、3順序執(zhí)行,說(shuō)明他們是異步執(zhí)行的,類似合并操作符中的merge(其實(shí)內(nèi)部調(diào)用的就是merge)??赐赀@個(gè)問(wèn)題,就可以猜到concatMap的作用了,就不貼了,是完全按順序同步輸出的。
然后來(lái)groupBy操作符:
private void group() {
Observable.just(20, 40, 60, 80, 100)
.groupBy(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer >= 60 ? "及格" : "不及格";
}
})
.subscribe(new Consumer<GroupedObservable<String, Integer>>() {
@Override
public void accept(final GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception {
stringIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: " + integer + ":" + stringIntegerGroupedObservable.getKey());
}
});
}
});
}
輸出結(jié)果:
E/MainActivity: accept: 20:不及格
E/MainActivity: accept: 40:不及格
E/MainActivity: accept: 60:及格
E/MainActivity: accept: 80:及格
E/MainActivity: accept: 100:及格
buffer操作符:
private void buffer() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 100; i++) {
e.onNext(i);
}
e.onComplete();
}
})
.buffer(20)
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
}
運(yùn)行結(jié)果:
E/MainActivity: accept: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
E/MainActivity: accept: [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
E/MainActivity: accept: [40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
E/MainActivity: accept: [60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
E/MainActivity: accept: [80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
這兩種沒(méi)什么特殊的,groupBy是按條件分組, buffer是分批發(fā)送。
七、過(guò)濾操作符
filter、take、distinct、elementAl
//條件篩選,輸出B、C
public void filter() {
Observable.just("A", "B", "C")
.filter(new Predicate<String>() {
@Override
public boolean test(String s) throws Exception {
if ("A".equals(s)) {
return false;
}
return true;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept: " + s);
}
});
}
//用于停止定時(shí)器,輸出0、1、2、3、4
public void take() {
Observable.interval(1, TimeUnit.SECONDS)
.take(5)// 5次之后停下
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "accept: " + aLong);
}
});
}
//過(guò)濾重復(fù),輸出1、2、3
public void distinct() {
Observable.just(1,1,2,3,3)
.distinct()
.subscribe(new Consumer<Integer>() { // 下游 觀察者
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
}
//制定發(fā)送角標(biāo),輸出B
public void elementAt() {
Observable.just("A", "B", "C")
.elementAt(1, "X")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
}
八、條件操作符
any、all、contains,這些就是改變Java中if的書(shū)寫(xiě)方式,與、或、包含
all:全部為true,才是true,只要有一個(gè)為false,就是false
any:全部為 false,才是false, 只要有一個(gè)為true,就是true
contains :是否包含
//等于Java中if的連續(xù)判斷,有一個(gè)等于C就返回false,輸出false
public void all() {
Observable.just("A", "B", "C", "D")
.all(new Predicate<String>() {
@Override
public boolean test(String s) throws Exception {
return !s.equals("C");
}
})
.subscribe(new Consumer<Boolean>() { // 下游 觀察者
@Override
public void accept(Boolean s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
}
//判斷包含
public void contains() {
Observable.just("A", "B", "C", "D")
.contains("C")
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
}
//和上面的All相反,有一個(gè)等于C就返回true,輸出true
public void any() {
Observable.just("A", "B", "C", "D")
.any(new Predicate<String>() {
@Override
public boolean test(String s) throws Exception {
return !s.equals("C");
}
})
.subscribe(new Consumer<Boolean>() { // 下游 觀察者
@Override
public void accept(Boolean s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
}
九、異常處理操作符
onErrorReturn、onErrorResumeNext、onExceptionResumeNext、retry
先模擬個(gè)錯(cuò)誤:
public void onError() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 10; i++) {
if (i == 5) {
e.onError(new Throwable("模擬一個(gè)錯(cuò)誤"));
}
e.onNext(i);
}
e.onComplete();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
運(yùn)行結(jié)果:
D/MainActivity: onNext: 0
D/MainActivity: onNext: 1
D/MainActivity: onNext: 2
D/MainActivity: onNext: 3
D/MainActivity: onNext: 4
D/MainActivity: onError: 模擬一個(gè)錯(cuò)誤
上面代碼會(huì)在觀察者的onError中收到回調(diào),然后來(lái)看一下異常操作符能干什么,先看onErrorReturn和onErrorResumeNext,區(qū)別就是onErrorReturn發(fā)送一次,onErrorResumeNext可以任意發(fā),跟上面很多其他的操作符一樣:
private void onErrorReturn() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 10; i++) {
if (i == 5) {
e.onError(new Throwable("模擬一個(gè)錯(cuò)誤"));
}
e.onNext(i);
}
e.onComplete();
}
})
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
return 400;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
運(yùn)行結(jié)果:
D/MainActivity: onNext: 400
D/MainActivity: onComplete:
onErrorResumeNext:
public void onErrorResumeNext() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 10; i++) {
if (i == 5) {
e.onError(new Throwable("模擬一個(gè)錯(cuò)誤"));
}
e.onNext(i);
}
e.onComplete();
}
})
.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(400);
e.onNext(4000);
e.onNext(40000);
e.onComplete();
}
});
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
運(yùn)行結(jié)果:
D/MainActivity: onNext: 0
D/MainActivity: onNext: 1
D/MainActivity: onNext: 2
D/MainActivity: onNext: 3
D/MainActivity: onNext: 4
D/MainActivity: onNext: 400
D/MainActivity: onNext: 4000
D/MainActivity: onNext: 40000
D/MainActivity: onComplete:
這里兩個(gè)注意點(diǎn):
-
onErrorReturn發(fā)生error后會(huì)自動(dòng)調(diào)用onComplete(),而onErrorResumeNext需要根據(jù)需要手動(dòng)調(diào)用 - 都不會(huì)再觸發(fā)觀察者的
onError()回調(diào),除非onErrorResumeNext中再手動(dòng)調(diào)用e.onError()
然后看下onExceptionResumeNext代碼:
public void onExceptionResumeNext() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 10; i++) {
if (i == 5) {
e.onError(new Throwable("模擬一個(gè)錯(cuò)誤"));
}
e.onNext(i);
}
e.onComplete();
}
})
.onExceptionResumeNext(new ObservableSource<Integer>() {
@Override
public void subscribe(Observer<? super Integer> observer) {
observer.onNext(400);
observer.onNext(4000);
observer.onNext(40000);
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
運(yùn)行結(jié)果:
D/MainActivity: onNext: 0
D/MainActivity: onNext: 1
D/MainActivity: onNext: 2
D/MainActivity: onNext: 3
D/MainActivity: onNext: 4
D/MainActivity: onError: 模擬一個(gè)錯(cuò)誤
跟onErrorResumeNext的運(yùn)行結(jié)果對(duì)比,很明顯沒(méi)有400、4000、40000,說(shuō)明新的Observer并不會(huì)起作用,這里用的是Throwable,如果是用Exception,同樣也會(huì)有400、4000、40000,所以:onErrorResumeNext和onExceptionResumeNext對(duì)Exception的處理是一樣的流程,區(qū)別在于對(duì)Error處理的時(shí)候,是否會(huì)使用新的Observer發(fā)送消息,也就是onExceptionResumeNext不處理Error,直接回調(diào)觀察者的onError (),onErrorResumeNext都處理,不會(huì)再調(diào)用觀察者的onError ()。
然后是retry這個(gè)操作符,這個(gè)很簡(jiǎn)單,貼出三種常用的:
public void retry() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 10; i++) {
if (i == 5) {
e.onError(new IllegalAccessError("模擬錯(cuò)誤"));
}
e.onNext(i);
}
e.onComplete();
}
})
//不設(shè)置重試次數(shù)
.retry( new Predicate<Throwable>() {
@Override
public boolean test(Throwable throwable) throws Exception {
//true表示不停地重試 , false表示不重試
return true;
}
})
// //設(shè)置重試次數(shù)
// .retry(3, new Predicate<Throwable>() {
// @Override
// public boolean test(Throwable throwable) throws Exception {
// //true表示按設(shè)置的次數(shù)重試 , false表示不重試
// return true;
// }
// })
//
// //可獲取重試次數(shù)
// .retry(new BiPredicate<Integer, Throwable>() {
// @Override
// public boolean test(Integer integer, Throwable throwable) throws Exception {
// //相對(duì)上面兩種,這個(gè)integer表示重試次數(shù), 返回值跟上面一樣
// return true;
// }
// })
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
十、線程切換
默認(rèn)發(fā)送和接收都是在主線程:
private void schedulers() {
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.e(TAG, "發(fā)送: " + Thread.currentThread().getName());
e.onNext("123");
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "接收: " + Thread.currentThread().getName());
}
});
}
輸出:
E/MainActivity: 發(fā)送: main
E/MainActivity: 接收: main
可以通過(guò)subscribeOn()會(huì)同時(shí)修改觀察者和被觀察者的線程,通過(guò)observeOn()只設(shè)置觀察者線程,通過(guò)AndroidSchedulers.mainThread()得到主線程,通過(guò)Schedulers.io()得到子線程:
private void schedulers() {
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.e(TAG, "發(fā)送: " + Thread.currentThread().getName());
e.onNext("123");
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "接收: " + Thread.currentThread().getName());
}
});
}
這一段,先通過(guò)subscribeOn(Schedulers.io())把觀察者和被觀察者都設(shè)置到子線程,如果不寫(xiě)下面這句observeOn(AndroidSchedulers.mainThread()),會(huì)輸出:
E/MainActivity: 發(fā)送: RxCachedThreadScheduler-1
E/MainActivity: 接收: RxCachedThreadScheduler-1
但是下面又用observeOn(AndroidSchedulers.mainThread())把觀察者改回子線程,所以輸出:
E/MainActivity: 發(fā)送: RxCachedThreadScheduler-1
E/MainActivity: 接收: main
十一、背壓模式
當(dāng)上下游運(yùn)行在不同的線程中,且上游發(fā)射數(shù)據(jù)的速度大于下游接收處理數(shù)據(jù)的速度時(shí),就會(huì)產(chǎn)生背壓?jiǎn)栴},內(nèi)存使用越來(lái)越多,這時(shí)候就需要用Flowable去處理。Flowable會(huì)對(duì)上游發(fā)送的時(shí)間進(jìn)行緩存,緩存池也滿了(超出128)的時(shí)候會(huì)有4種不通的處理方式:
- BackpressureStrategy.ERROR:就會(huì)拋出異常
- BackpressureStrategy.DROP:把后面發(fā)射的事件丟棄
- BackpressureStrategy.LATEST:把前面發(fā)射的事件丟棄
- BackpressureStrategy.BUFFER:這種不會(huì)有上限,但是如果上游發(fā)送太多,也會(huì)造成內(nèi)存使用越來(lái)越大
Flowable的使用跟Observable很類似,簡(jiǎn)單使用:
private void backpressure() {
Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
// 改成129就會(huì)崩潰
for (int i = 0; i < 128; i++) {
e.onNext(i); // todo 1
}
e.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "接收: " + integer);
}
});
}
然后看一種完整模式的觀察者:
private void backpressure() {
Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
// 改成129就會(huì)崩潰
for (int i = 0; i < 128; i++) {
e.onNext(i); // todo 1
}
e.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "接收: " + integer);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
}
這一會(huì)發(fā)現(xiàn)觀察者收不到任何消息,這里跟Observable有個(gè)區(qū)別,就是訂閱的方法subscribe()的參數(shù),Observable訂閱對(duì)應(yīng)的是Observer,而Flowable對(duì)應(yīng)的是Subscriber,Observer和Subscriber對(duì)應(yīng)的回調(diào)onSubscribe(..)參數(shù)不同,Subscriber的onSubscribe(..)參數(shù)拿到的是一個(gè)Subscription,這個(gè)需要主動(dòng)去取數(shù)據(jù),比如:
@Override
public void onSubscribe(Subscription s) {
s.request(10);
}
這樣就會(huì)onNext()中就會(huì)收到前10個(gè)。那這個(gè)使用就很靈活了,根據(jù)代碼需要,可以在需要的地方主動(dòng)調(diào)用s.request(..),讓觀察者接收到數(shù)據(jù)。
十二、一個(gè)展示網(wǎng)絡(luò)圖片的例子
private void getImage(final String path) {
Observable.just(path)
// 通過(guò)map變換操作符把String轉(zhuǎn)換成Bitmap
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(String s) throws Exception {
URL url = new URL(path);
HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
httpURLConnection.setConnectTimeout(5000);
int responseCode = httpURLConnection.getResponseCode();
if (HttpURLConnection.HTTP_OK == responseCode) {
Bitmap bitmap = BitmapFactory.decodeStream(httpURLConnection.getInputStream());
return bitmap;
}
return null;
}
})
// 下載圖片在子線程中
.subscribeOn(Schedulers.io())
// 設(shè)置圖片在主線程中
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Bitmap>() {
@Override
// 開(kāi)始操作前
public void onSubscribe(Disposable d) {
progressDialog = new ProgressDialog(MainActivity.this);
progressDialog.setMessage("正在下載中...");
progressDialog.show();
}
@Override
// 收到Bitmap
public void onNext(Bitmap bitmap) {
if (imageView != null) {
imageView.setImageBitmap(bitmap);
}
}
@Override
// 下載錯(cuò)誤
public void onError(Throwable e) {
if (progressDialog != null) {
progressDialog.dismiss();
}
if (imageView != null) {
imageView.setImageResource(R.mipmap.ic_launcher);
}
Log.e(TAG, "onError: " + e.toString());
}
@Override
// 下載完成
public void onComplete() {
if (progressDialog != null) {
progressDialog.dismiss();
}
}
});
}