本篇文章已授權(quán)微信公眾號 YYGeeker 獨(dú)家發(fā)布轉(zhuǎn)載請標(biāo)明出處
CSDN學(xué)院課程地址
- RxJava2從入門到精通-初級篇:https://edu.csdn.net/course/detail/10036
- RxJava2從入門到精通-中級篇:https://edu.csdn.net/course/detail/10037
- RxJava2從入門到精通-進(jìn)階篇:https://edu.csdn.net/course/detail/10038
- RxJava2從入門到精通-源碼分析篇:https://edu.csdn.net/course/detail/10138
7. RxJava高級用法(一)
7.1 自定義Operator
自定義Operator屬于RxJava的高級用法,可以自己自定義一些適用于常見應(yīng)用場景的操作符。實(shí)現(xiàn)自定義Operator很簡單,只需要實(shí)現(xiàn)RxJava提供的ObservableOperator接口,實(shí)現(xiàn)對應(yīng)的功能即可,同時(shí),使用lift操作符將自定義操作符應(yīng)用到我們的程序中。下面我們使用自定義Operator,該操作符的作用是將List集合轉(zhuǎn)換成String類型的輸出
1、實(shí)現(xiàn)ObservableOperator,創(chuàng)建自定義Operator
public class CustomOperator implements ObservableOperator<String, List<String>> {
@Override
public Observer<? super List<String>> apply(final Observer<? super String> observer) throws Exception {
return new Observer<List<String>>() {
@Override
public void onSubscribe(Disposable d) {
observer.onSubscribe(d);
}
@Override
public void onNext(List<String> strings) {
observer.onNext(strings.toString());
}
@Override
public void onError(Throwable e) {
observer.onError(e);
}
@Override
public void onComplete() {
observer.onComplete();
}
};
}
}
2、使用lift操作符添加自定義Operator
public class Main {
public static void main(String[] args) {
//創(chuàng)建被觀察者
Observable.create(new ObservableOnSubscribe<List<String>>() {
@Override
//默認(rèn)在主線程里執(zhí)行該方法
public void subscribe(@NonNull ObservableEmitter<List<String>> e) throws Exception {
ArrayList<String> list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");
list.add("4");
e.onNext(list);
e.onComplete();
}
})
.lift(new CustomOperator())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext=" + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
}
}
3、輸出結(jié)果
onNext=[1, 2, 3, 4]
onComplete
7.2 自定義Transformer
自定義Transformer表示一個(gè)批量操作符的變換器,如果你在很多Observable中使用相同的一系列操作符,比如每次都要使用到map+take+doOnNext等操作,那么就可以定義一個(gè)通用的Transformer對象,里面可以將需要重復(fù)用到的操作符打包成Transformer對象,使用compose操作符將Transformer對象應(yīng)用到我們的Observable上即可
實(shí)現(xiàn)自定義Transformer很簡單,只需要實(shí)現(xiàn)RxJava提供的ObservableTransformer接口,實(shí)現(xiàn)對應(yīng)的功能即可,同時(shí),使用compose操作符將自定義Transformer應(yīng)用到我們的程序中。下面我們使用自定義Transformer,該Transformer的作用是將發(fā)射的數(shù)據(jù)從Integer轉(zhuǎn)換成String,并取2個(gè)數(shù)據(jù)項(xiàng),同時(shí)在發(fā)射的時(shí)候監(jiān)聽發(fā)射事件,進(jìn)行輸出的打印
1、實(shí)現(xiàn)ObservableTransformer,創(chuàng)建自定義Transformer
public class CustomTransformer implements ObservableTransformer<Integer, String> {
@Override
public ObservableSource<String> apply(io.reactivex.Observable<Integer> upstream) {
return upstream.take(2).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "序號:" + integer + "發(fā)射成功";
}
}).doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s + ",準(zhǔn)備發(fā)射");
}
});
}
}
2、使用compose操作符添加自定義Transformer
public class Main {
public static void main(String[] args) {
//創(chuàng)建被觀察者
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
})
.compose(new CustomTransformer())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
}
3、輸出結(jié)果
序號:1發(fā)射成功,準(zhǔn)備發(fā)射
序號:1發(fā)射成功
序號:2發(fā)射成功,準(zhǔn)備發(fā)射
序號:2發(fā)射成功
在安卓開發(fā)中,通常我們也會(huì)自定義Transformer來實(shí)現(xiàn)我們常用的線程切場景,具體如下
public static <T> ObservableTransformer<T, T> schedulersTransformer() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}
};
}
public static <T> FlowableTransformer<T, T> schedulersTransformerForFlowable() {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> upstream) {
return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}
};
}
7.3 自定義Plugin
自定義Plugin表示自定義插件,自定義插件可以在RxJavaPlugins中提供的接口中去插入自己的一段代碼操作,類似于面向切面編程,或者理解成Android的Hook。如果你需要在所有的訂閱事件中去插入一段統(tǒng)一的操作,或者是監(jiān)聽所有訂閱事件發(fā)生異常時(shí)的回調(diào),都可以使用自定義插件。在實(shí)際應(yīng)用中,目前并未發(fā)現(xiàn)有什么作用
實(shí)現(xiàn)自定義Plugin只需要調(diào)用RxJavaPlugins提供的set方法即可,下面我們通過例子輸出Observable和Observer的地址信息,來驗(yàn)證每次訂閱的時(shí)候,回調(diào)自定義Plugin的方法中,插件對象和源對象是否為同一個(gè)對象
1、通過設(shè)置ObservableSubscribe,每次對Observable操作的時(shí)候回調(diào)
public class Main {
public static void main(String[] args) {
RxJavaPlugins.setOnObservableAssembly(new CustomObservableAssembly());//任意操作符都有回調(diào)
RxJavaPlugins.setOnObservableSubscribe(new CustomObservableSubscribe());//每次subscribe時(shí)候有回調(diào)
Observable observable = getObservable();
Observer<Integer> observer = getObserver();
System.out.println("main observable.toString:" + observable.toString());
System.out.println("main observer.toString:" + observer.toString());
observable.subscribe(observer);
}
public static Observable getObservable() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(5);
emitter.onNext(2);
emitter.onNext(3);
}
});
}
public static Observer<Integer> getObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println("onNext=" + integer);
}
@Override
public void onError(Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onComplete() {
}
};
}
}
2、CustomObservableAssembly
public class CustomObservableAssembly implements Function<Observable, Observable> {
@Override
public Observable apply(Observable observable) throws Exception {
System.out.println("CustomObservableAssembly observable.toString:" + observable.toString());
observable.take(2);
return observable;
}
}
3、CustomObservableSubscribe
public class CustomObservableSubscribe implements BiFunction<Observable, Observer, Observer> {
@Override
public Observer apply(Observable observable, Observer observer) throws Exception {
System.out.println("CustomObservableSubscribe observable.toString:" + observable.toString() + ",observer.toString:" + observer.toString());
return observer;
}
}
4、輸出結(jié)果
地址相同說明是同個(gè)對象,自定義插件Hook成功
CustomObservableAssembly observable.toString:io.reactivex.internal.operators.observable.ObservableCreate@1a93a7ca
main observable.toString:io.reactivex.internal.operators.observable.ObservableCreate@1a93a7ca
main observer.toString:com.hensen.rxjavalearning.Chapter7.Chapter7o3.Main$2@3d82c5f3
CustomObservableSubscribe observable.toString:io.reactivex.internal.operators.observable.ObservableCreate@1a93a7ca,observer.toString:com.hensen.rxjavalearning.Chapter7.Chapter7o3.Main$2@3d82c5f3
onNext=5
onNext=2
onNext=3
補(bǔ)充:
可以通過設(shè)置ErrorHandler,發(fā)生異常時(shí)會(huì)回調(diào)
RxJavaPlugins.setErrorHandler();
可以通過設(shè)置SchedulerHandler來Hook到對應(yīng)的schedule
RxJavaPlugins.setIoSchedulerHandler();
RxJavaPlugins.setNewThreadSchedulerHandler();
RxJavaPlugins.setComputationSchedulerHandler();
RxJavaPlugins.setSingleSchedulerHandler();
錯(cuò)誤演示:
由于CustomObservableAssembly是在任意操作符操作的時(shí)候都會(huì)回調(diào),所以在回調(diào)里面是不可以對observable再進(jìn)行操作符的操作,否則回調(diào)里面observable的操作符還是會(huì)回調(diào)CustomObservableAssembly自身,導(dǎo)致死循環(huán),發(fā)生StackOverflowError
public class CustomObservableAssembly implements Function<Observable, Observable> {
@Override
public Observable apply(Observable observable) throws Exception {
System.out.println("CustomObservableAssembly observable.toString:" + observable.toString());
observable.take(2);
return observable;
}
}
由于CustomObservableSubscribe是在subscribe之后進(jìn)行的回調(diào),如果在回調(diào)里面對observable進(jìn)行操作符的操作,這個(gè)時(shí)候是不會(huì)生效的,因?yàn)樵趕ubscribe之后onNext的函數(shù)是不會(huì)再處理后面新添的操作符,原理與源碼有關(guān)
public class CustomObservableSubscribe implements BiFunction<Observable, Observer, Observer> {
@Override
public Observer apply(Observable observable, Observer observer) throws Exception {
System.out.println("CustomObservableSubscribe observable.toString:" + observable.toString() + ",observer.toString:" + observer.toString());
observable.take(2);
return observer;
}
}