功能性操作符
-
作用
輔助被觀察者(Observable) 在發(fā)送事件時(shí)實(shí)現(xiàn)一些功能性需求,如錯誤處理、線程調(diào)度等等.
-
類型

*###### 應(yīng)用場景 & 對應(yīng)操作符詳解
1.subscribe()
作用
訂閱,即連接觀察者 & 被觀察者,使得被觀察者 & 觀察者 形成訂閱關(guān)系。
<-- Observable.subscribe(Subscriber) 的內(nèi)部實(shí)現(xiàn) -->
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
// 在觀察者 subscriber抽象類復(fù)寫的方法 onSubscribe.call(subscriber),用于初始化工作
// 通過該調(diào)用,從而回調(diào)觀察者中的對應(yīng)方法從而響應(yīng)被觀察者生產(chǎn)的事件
// 從而實(shí)現(xiàn)被觀察者調(diào)用了觀察者的回調(diào)方法 & 由被觀察者向觀察者的事件傳遞,即觀察者模式
// 同時(shí)也看出:Observable只是生產(chǎn)事件,真正的發(fā)送事件是在它被訂閱的時(shí)候,即當(dāng) subscribe() 方法執(zhí)行時(shí)
}
2.subscribeOn() & observeOn()
作用
線程控制,即指定 被觀察者 (Observable) / 觀察者(Observer) 的工作線程類型。
| 類型 | 含義 | 應(yīng)用場景 |
|---|---|---|
| Schedulers.immediate() | 當(dāng)前線程 = 不指定線程 | 默認(rèn) |
| AndroidSchedulers.mainThread() | Android主線程 | 操作UI |
| Schedulers.newThread() | 常規(guī)新線程 | 耗時(shí)等操作 |
| Schedulers.io() | io操作線程 | 網(wǎng)絡(luò)請求、讀寫文件等io密集型操作 |
| Schedulers.computation() | CPU計(jì)算操作線程 | 大量計(jì)算操作 |
注:
若Observable.subscribeOn()多次指定被觀察者生產(chǎn)事件的線程,則只有第一次指定有效,其余的指定線程無效.
.若Observable.observeOn()多次指定觀察者 接收 & 響應(yīng)事件的線程,則每次指定均有效,即每指定一次,就會進(jìn)行一次線程的切換.
3.delay()
作用
使得被觀察者延遲一段時(shí)間再發(fā)送事件。
// 1. 指定延遲時(shí)間
// 參數(shù)1 = 時(shí)間;參數(shù)2 = 時(shí)間單位
delay(long delay,TimeUnit unit)
// 2. 指定延遲時(shí)間 & 調(diào)度器
// 參數(shù)1 = 時(shí)間;參數(shù)2 = 時(shí)間單位;參數(shù)3 = 線程調(diào)度器
delay(long delay,TimeUnit unit,mScheduler scheduler)
// 3. 指定延遲時(shí)間 & 錯誤延遲
// 錯誤延遲,即:若存在Error事件,則如常執(zhí)行,執(zhí)行后再拋出錯誤異常
// 參數(shù)1 = 時(shí)間;參數(shù)2 = 時(shí)間單位;參數(shù)3 = 錯誤延遲參數(shù)
delay(long delay,TimeUnit unit,boolean delayError)
// 4. 指定延遲時(shí)間 & 調(diào)度器 & 錯誤延遲
// 參數(shù)1 = 時(shí)間;參數(shù)2 = 時(shí)間單位;參數(shù)3 = 線程調(diào)度器;參數(shù)4 = 錯誤延遲參數(shù)
delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延遲多長時(shí)間并添加調(diào)度器,錯誤通知可以設(shè)置是否延遲
事例
public void DelayOperators(View view){
Observable.just(1,2,3)
.delay(3, TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.e("sss","接受到了時(shí)間"+integer);
}
@Override
public void onError(Throwable e) {
Log.e("sss","發(fā)生了錯誤事件");
}
@Override
public void onComplete() {
Log.e("sss","事件已經(jīng)完成");
}
});
}
4.do()
作用
在某個(gè)事件的生命周期中調(diào)用。
do()操作符有很多個(gè),具體如下:

事例
public void DoOperators(View view){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new Throwable("發(fā)生錯誤了"));
}
}).doOnEach(new Consumer<Notification<Integer>>() {
// 當(dāng)Observable每發(fā)送1次數(shù)據(jù)事件就會調(diào)用1次
@Override
public void accept(Notification<Integer> integerNotification) throws Exception {
Log.e("sss","doOnEach"+integerNotification.getValue());
}
}).doOnNext(new Consumer<Integer>() {
// 執(zhí)行Next事件前調(diào)用
@Override
public void accept(Integer integer) throws Exception {
Log.e("sss","doOnNext:"+integer);
}
}).doAfterNext(new Consumer<Integer>() {
// 執(zhí)行Next事件后調(diào)用
@Override
public void accept(Integer integer) throws Exception {
Log.e("sss","doAfterNext:"+integer);
}
}).doOnComplete(new Action() {
// Observable正常發(fā)送事件完畢后調(diào)用
@Override
public void run() throws Exception {
Log.e("sss","doOnComplete");
}
}).doOnError(new Consumer<Throwable>() {
// Observable發(fā)送錯誤事件時(shí)調(diào)用
@Override
public void accept(Throwable throwable) throws Exception {
Log.e("sss","doOnError:"+throwable.getMessage());
}
}).doOnSubscribe(new Consumer<Disposable>() {
// 觀察者訂閱時(shí)調(diào)用
@Override
public void accept(Disposable disposable) throws Exception {
Log.e("sss","doOnSubscribe---->");
}
}).doAfterTerminate(new Action() {
// Observable發(fā)送事件完畢后調(diào)用,無論正常發(fā)送完畢 / 異常終止
@Override
public void run() throws Exception {
Log.e("sss","doAfterTerminate");
}
}).doFinally(new Action() {
// 最后執(zhí)行
@Override
public void run() throws Exception {
Log.e("sss","doFinally:");
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.e("sss", "接收到了事件"+ integer );
}
@Override
public void onError(Throwable e) {
Log.e("sss", "對Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.e("sss", "對Complete事件作出響應(yīng)");
}
});
}
5.onErrorReturn()
作用
遇到錯誤時(shí),發(fā)送1個(gè)特殊事件 & 正常終止,可捕獲在它之前發(fā)生的異常。
事例
public void onErrorReturnOperators(View view){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Throwable("發(fā)生了錯誤"));
}
}).onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
Log.e("sss", "在onErrorReturn處理了錯誤: "+throwable.toString() );
return 888;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.e("sss", "接收到了事件"+ integer );
}
@Override
public void onError(Throwable e) {
Log.e("sss","對Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.e("sss","對Complete事件作出響應(yīng)");
}
});
}
6.onErrorResumeNext()
作用
遇到錯誤時(shí),發(fā)送1個(gè)新的Observable,
注:onErrorResumeNext()攔截的錯誤 = Throwable;若需攔截Exception請用onExceptionResumeNext()
若onErrorResumeNext()攔截的錯誤 = Exception,則會將錯誤傳遞給觀察者的onError方法
事例:
public void onErrorResumeNextOperators(View view){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Throwable("發(fā)生錯誤了"));
}
}).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
return Observable.just(11,22);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.e("sss","接受到了時(shí)間"+integer);
}
@Override
public void onError(Throwable e) {
Log.e("sss","對Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.e("sss","對Complete事件作出響應(yīng)");
}
});
}
7.onExceptionResumeNext()
作用
遇到錯誤時(shí),發(fā)送1個(gè)新的Observable
注:
onExceptionResumeNext()攔截的錯誤 = Exception;若需攔截Throwable請用onErrorResumeNext()
若onExceptionResumeNext()攔截的錯誤 = Throwable,則會將錯誤傳遞給觀察者的onError方法
事例
public void onExceptionResumeNextOperators(View view){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Exception("發(fā)生了異常"));
}
}).onExceptionResumeNext(new Observable<Integer>() {
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
observer.onNext(11);
observer.onNext(22);
observer.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.e("sss", "接收到了事件"+ integer );
}
@Override
public void onError(Throwable e) {
Log.e("sss","對Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.e("sss","對Complete事件作出響應(yīng)");
}
});
}
8.retry()
作用
重試,即當(dāng)出現(xiàn)錯誤時(shí),讓被觀察者(Observable)重新發(fā)射數(shù)據(jù),即接收到 onError()時(shí),重新訂閱 & 發(fā)送事件,Throwable 和 Exception都可攔截。
<-- 1. retry() -->
// 作用:出現(xiàn)錯誤時(shí),讓被觀察者重新發(fā)送數(shù)據(jù)
// 注:若一直錯誤,則一直重新發(fā)送
<-- 2. retry(long time) -->
// 作用:出現(xiàn)錯誤時(shí),讓被觀察者重新發(fā)送數(shù)據(jù)(具備重試次數(shù)限制
// 參數(shù) = 重試次數(shù)
<-- 3. retry(Predicate predicate) -->
// 作用:出現(xiàn)錯誤后,判斷是否需要重新發(fā)送數(shù)據(jù)(若需要重新發(fā)送& 持續(xù)遇到錯誤,則持續(xù)重試)
// 參數(shù) = 判斷邏輯
<-- 4. retry(new BiPredicate<Integer, Throwable>) -->
// 作用:出現(xiàn)錯誤后,判斷是否需要重新發(fā)送數(shù)據(jù)(若需要重新發(fā)送 & 持續(xù)遇到錯誤,則持續(xù)重試
// 參數(shù) = 判斷邏輯(傳入當(dāng)前重試次數(shù) & 異常錯誤信息)
<-- 5. retry(long time,Predicate predicate) -->
// 作用:出現(xiàn)錯誤后,判斷是否需要重新發(fā)送數(shù)據(jù)(具備重試次數(shù)限制
// 參數(shù) = 設(shè)置重試次數(shù) & 判斷邏輯
事例
public void retryOperators(View view){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
}
}).retry().subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.e("sss", "接收到了事件"+ integer );
}
@Override
public void onError(Throwable e) {
Log.e("sss","對Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.e("sss","對Complete事件作出響應(yīng)");
}
});
}
9.retryUntil()
作用
出現(xiàn)錯誤后,判斷是否需要重新發(fā)送數(shù)據(jù)。
10.retryWhen()
作用
遇到錯誤時(shí),將發(fā)生的錯誤傳遞給一個(gè)新的被觀察者(Observable),并決定是否需要重新訂閱原始被觀察者(Observable)& 發(fā)送事件。
public void retryWhenOperators(View view){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("發(fā)生錯誤了"));
e.onNext(3);
}
})
// 遇到error事件才會回調(diào)
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
// 參數(shù)Observable<Throwable>中的泛型 = 上游操作符拋出的異常,可通過該條件來判斷異常的類型
// 返回Observable<?> = 新的被觀察者 Observable(任意類型)
// 此處有兩種情況:
// 1. 若 新的被觀察者 Observable發(fā)送的事件 = Error事件,那么 原始Observable則不重新發(fā)送事件:
// 2. 若 新的被觀察者 Observable發(fā)送的事件 = Next事件 ,那么原始的Observable則重新發(fā)送事件:
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {
// 1. 若返回的Observable發(fā)送的事件 = Error事件,則原始的Observable不重新發(fā)送事件
// 該異常錯誤信息可在觀察者中的onError()中獲得
return Observable.error(new Throwable("retryWhen終止啦"));
// 2. 若返回的Observable發(fā)送的事件 = Next事件,則原始的Observable重新發(fā)送事件(若持續(xù)遇到錯誤,則持續(xù)重試)
// return Observable.just(1);
}
});
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.e("sss", "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.e("sss", "對Error事件作出響應(yīng)" + e.toString());
// 獲取異常錯誤信息
}
@Override
public void onComplete() {
Log.e("sss", "對Complete事件作出響應(yīng)");
}
});
}
11.repeat()
作用
無條件地、重復(fù)發(fā)送 被觀察者事件
事例
public void repeatOperators(View view){
Observable.just(1,2,3,4)
.repeat(3)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.e("sss", "接收到了事件" + integer);
}
@Override
public void onError(Throwable e) {
Log.e("sss", "對Error事件作出響應(yīng)" + e.toString());
}
@Override
public void onComplete() {
Log.e("sss", "對Complete事件作出響應(yīng)");
}
});
}
12.repeatWhen()
作用
有條件地、重復(fù)發(fā)送 被觀察者事件
實(shí)例
public void repeatWhenOperators(View view){
Observable.just(1,2,3)
.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Object o) throws Exception {
return Observable.just(2);
}
});
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("sss", "開始采用subscribe連接");
}
@Override
public void onNext(Integer integer) {
Log.e("sss", "接收到了事件" + integer);
}
@Override
public void onError(Throwable e) {
Log.e("sss", "對Error事件作出響應(yīng):" + e.toString());
}
@Override
public void onComplete() {
Log.e("sss", "對Complete事件作出響應(yīng)");
}
});
}
-
實(shí)際開發(fā)中的應(yīng)用
1.有條件網(wǎng)絡(luò)請求輪詢
repeatWhen()的使用
int i = 0;
public void webPolling(View view) {
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://fy.iciba.com/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
GetRequest_Interface request_interface = retrofit.create(GetRequest_Interface.class);
Observable<Translation> observable = request_interface.getCall();
observable.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Object o) throws Exception {
if (i > 3) {
return Observable.error(new Throwable("輪詢結(jié)果"));
}
return Observable.just(1).delay(2000, TimeUnit.MILLISECONDS);
}
});
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Translation>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Translation translation) {
i++;
Log.e("sss", translation.getContent().getOut());
}
@Override
public void onError(Throwable e) {
Log.e("sss", e.toString());
}
@Override
public void onComplete() {
}
});
}
2.網(wǎng)絡(luò)請求出錯重連
// 設(shè)置變量
// 可重試次數(shù)
private int maxConnectCount = 10;
// 當(dāng)前已重試次數(shù)
private int currentRetryCount = 0;
// 重試等待時(shí)間
private int waitRetryTime = 0;
public void requestRetry(View view){
Retrofit retrofit=new Retrofit.Builder()
.baseUrl("http://fy.iciba.com/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
final GetRequest_Interface request=retrofit.create(GetRequest_Interface.class);
Observable<Translation> observable=request.getCall();
observable.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
// 參數(shù)Observable<Throwable>中的泛型 = 上游操作符拋出的異常,可通過該條件來判斷異常的類型
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {
// 輸出異常信息
Log.e("sss", "發(fā)生異常 = "+ throwable.toString());
/**
* 需求1:根據(jù)異常類型選擇是否重試
* 即,當(dāng)發(fā)生的異常 = 網(wǎng)絡(luò)異常 = IO異常 才選擇重試
*/
if (throwable instanceof IOException){
Log.e("sss", "屬于IO異常,需重試" );
/**
* 需求2:限制重試次數(shù)
* 即,當(dāng)已重試次數(shù) < 設(shè)置的重試次數(shù),才選擇重試
*/
if (currentRetryCount < maxConnectCount){
// 記錄重試次數(shù)
currentRetryCount++;
Log.e("sss", "重試次數(shù) = " + currentRetryCount);
/**
* 需求2:實(shí)現(xiàn)重試
* 通過返回的Observable發(fā)送的事件 = Next事件,從而使得retryWhen()重訂閱,最終實(shí)現(xiàn)重試功能
*
* 需求3:延遲1段時(shí)間再重試
* 采用delay操作符 = 延遲一段時(shí)間發(fā)送,以實(shí)現(xiàn)重試間隔設(shè)置
*
* 需求4:遇到的異常越多,時(shí)間越長
* 在delay操作符的等待時(shí)間內(nèi)設(shè)置 = 每重試1次,增多延遲重試時(shí)間1s
*/
// 設(shè)置等待時(shí)間
waitRetryTime = 1000 + currentRetryCount* 1000;
Log.e("sss", "等待時(shí)間 =" + waitRetryTime);
return Observable.just(1).delay(waitRetryTime, TimeUnit.MILLISECONDS);
}else{
// 若重試次數(shù)已 > 設(shè)置重試次數(shù),則不重試
// 通過發(fā)送error來停止重試(可在觀察者的onError()中獲取信息)
return Observable.error(new Throwable("重試次數(shù)已超過設(shè)置次數(shù) = " +currentRetryCount + ",即 不再重試"));
}
}
// 若發(fā)生的異常不屬于I/O異常,則不重試
// 通過返回的Observable發(fā)送的事件 = Error事件 實(shí)現(xiàn)(可在觀察者的onError()中獲取信息)
else{
return Observable.error(new Throwable("發(fā)生了非網(wǎng)絡(luò)異常(非I/O異常)"));
}
}
});
}
}).subscribeOn(Schedulers.io()) // 切換到IO線程進(jìn)行網(wǎng)絡(luò)請求
.observeOn(AndroidSchedulers.mainThread()) // 切換回到主線程 處理請求結(jié)果
.subscribe(new Observer<Translation>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Translation result) {
// 接收服務(wù)器返回的數(shù)據(jù)
Log.e("sss", "發(fā)送成功");
Log.e("sss",result.getContent().getOut());
// result.show();
}
@Override
public void onError(Throwable e) {
// 獲取停止重試的信息
Log.e("sss", e.toString());
}
@Override
public void onComplete() {
}
});
}