作者:IT魔幻師
博客:www.huyingzi.top
轉(zhuǎn)載請注明出處:http://www.itdecent.cn/p/afeba5aea533
一、創(chuàng)建型操作符
主要用于創(chuàng)建被觀察者
-
just
create的快捷創(chuàng)建操作,create操作符必須手動調(diào)用onNext才能觸發(fā)事件,just會自動觸發(fā)@Test public void testjust() { //just是create的快捷創(chuàng)建操作 Observable.just("我是你爸爸","我是你爸爸2").subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { //此處會依次收到j(luò)ust參數(shù)傳遞過來的值 } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } -
fromArray
相比于just,fromArray適用于多參數(shù)的情況.@Test public void testFromArray() { Observable.fromArray(new String[]{"我是你爸爸", "我是你爸爸2", "我是你爸爸3", "我是你爸爸4"}).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { System.out.println("onNext "+s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } -
range
創(chuàng)建在一定范圍內(nèi)的事件@Test public void testRange() { //從5開始執(zhí)行11次事件 Observable.range(5,11).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { System.out.println(integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } -
empty
主要適用于調(diào)用后不需要返回參數(shù)只需要關(guān)心結(jié)果,如:發(fā)起網(wǎng)絡(luò)請求后在onComplete()中處理結(jié)果即可他不會回調(diào)onNext函數(shù).@Test public void testempty() { Observable.empty().subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Object o) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { System.out.println("執(zhí)行結(jié)束"); } }); } -
interval
定時器操作符,需要依賴Android的api不能在純java環(huán)境下使用//每隔1單位秒的時間執(zhí)行一次 Observable.interval(1, TimeUnit.SECONDS); -
intervalRange
定時器操作符,需要依賴Android的api不能在純java環(huán)境下使用//從0開始每隔1000毫秒發(fā)送50個事件 初始延時0 Observable.intervalRange(0,50,0,1000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { System.out.println(aLong); } }); timer
跟interval一樣.
二、轉(zhuǎn)換操作符
將事件類型轉(zhuǎn)換成我們想要的結(jié)果
- map
@Test public void testMap() { //場景:根據(jù)圖片地址最終轉(zhuǎn)換成bitmap Observable.just("icon01.png","icon02.png").map(new Function<String, Bitmap>() { @Override public Bitmap apply(String url) throws Exception { //在此次模擬執(zhí)行網(wǎng)絡(luò)請求等操作 // ... 此處省略 Bitmap mBitmap = Bitmap.createBitmap(200,200, Bitmap.Config.ARGB_8888); return mBitmap; } }).subscribe(new Observer<Bitmap>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Bitmap bitmap) { //在此次就可以 以此得到請求到的圖片 System.out.println("得到結(jié)果:"+bitmap); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } - flatMap
在上一個事件完成后才能開始下一個事件的情況@Test public void testFlatMap() { //比如:token過期了 必須先請求一個token 再進(jìn)行登錄請求 Observable.just("getToken","login").flatMap(new Function<String, ObservableSource<?>>() { @Override public ObservableSource<?> apply(String s) throws Exception { System.out.println("執(zhí)行事件:"+s); return createRespone(s); } }).subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Object o) { //依次回調(diào)處理結(jié)果 System.out.println(o); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } private ObservableSource<?> createRespone(final String s) { //根據(jù)請求再創(chuàng)建一個被觀察者,觀察上一個請求是否成功了 return Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { System.out.println("上一個事件已經(jīng)執(zhí)行完成開始執(zhí)行此事件:"+s); //此處是基于getToken完成之后才會執(zhí)行 emitter.onNext(s); } }); } - groupBy
對傳入的事件進(jìn)行分組,分組的條件可以自己指定@Test public void testGroupBy() { Observable.just(1,2,3,4).groupBy(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return integer>2?"A組":"B組"; } }).subscribe(new Consumer<GroupedObservable<String, Integer>>() { @Override public void accept(final GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception { //stringIntegerGroupedObservable 是一個分組后的被觀察者 stringIntegerGroupedObservable.subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { String key = stringIntegerGroupedObservable.getKey(); System.out.println("key="+key+" "+integer); } }); } }); } - buffer
大批量數(shù)據(jù)需要處理的時候,對其進(jìn)行分批次處理@Test public void testBuffer() { //將6條數(shù)據(jù)每2條分一個組執(zhí)行 Observable.just(1,2,3,4,5,6).buffer(2).subscribe(new Observer<List<Integer>>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(List<Integer> integers) { //以此回調(diào)每個組的數(shù)據(jù) System.out.println(integers); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } - range
上一個結(jié)果作為下一個參數(shù),所有的結(jié)果累加得到最終結(jié)果,文件合并或者字符串拼接等場景.@Test public void testScan() { Observable.range(1,5).scan(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer integer, Integer integer2) throws Exception { //第一個參數(shù)integer為之前所有結(jié)果的和 就是累加的形式 //相當(dāng)于 第一個文件跟第二個文件合并,合并后的結(jié)果跟第三個文件合并...最終合并成一個大文件 return integer+integer2; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { System.out.println(integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); }
三、過濾操作符
- filter
對事件進(jìn)行過濾或者不過濾的處理@Test public void testFilter() { Observable.just(1,2,3,4,5,6).filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { //此次決定是否過濾 //true 不過濾 //false 過濾掉不計(jì)入結(jié)果中 return integer>2; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { //接受過濾后的結(jié)果 System.out.println(integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } - take
限制產(chǎn)生事件的數(shù)量@Test public void testTake() { //每隔1單位秒的時間執(zhí)行一次 take限制只執(zhí)行5次 Observable.interval(1, TimeUnit.SECONDS).take(5).subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { System.out.println(aLong+""); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } - distinct
過濾重復(fù)事件@Test public void testDistinct() { Observable.just(1,2,2,2,3,3,6,6,7).distinct().subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { System.out.println(integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } - elementAt
過濾指定的事件//指定過慮出第5個事件 Observable.just(1,2,2,2,3,3,6,6,7).elementAt(5).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println(integer); } });
四、條件操作符
- all
判斷所有事件是否滿足一個條件,如果全部滿足則為trueObservable.just(1,2,3,4,5,6).all(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { //所有的事件都大于2嗎 return integer>2; } }).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { //此次返回時間結(jié)果 System.out.println(aBoolean); } }); - contains
判斷所有事件中是否包含某項(xiàng)事件Observable.just(1,2,3,4,5).contains(3).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { //此處返回是否包含3的結(jié)果 System.out.println(aBoolean); } }); - any
所有事件中只要有有一個符合條件即為trueObservable.just(1,2,3,4,5).any(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer==3; } }).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { System.out.println(aBoolean); } }); - isEmpty
判斷一個觀察者是否有事件Observable.just(1).isEmpty().subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { //有事件返回true 空事件返回false System.out.println(aBoolean); } }); - defaultIfEmpty
如果被觀察者不發(fā)送任何事件,則會發(fā)送默認(rèn)事件.defaultIfEmpty(0) - skipWhile
跳過滿足條件的事件//從0開始每隔1000毫秒發(fā)送50個事件 初始延時0 Observable.intervalRange(0,50,0,1000, TimeUnit.MILLISECONDS).skipWhile(new Predicate<Long>() { @Override public boolean test(Long aLong) throws Exception { //跳過<10的事件 return aLong<10; } }).subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { System.out.println(aLong); } });
五、合并操作符
將被觀察者進(jìn)行合并
- startWith
把需要的事件合并成一個事件進(jìn)行處理,會先處理startWith添加的事件//把需要的事件合并成一個事件進(jìn)行處理,會先處理2,4,6,8的事件 Observable.just(1,3,5,7).startWith(Observable.just(2,4,6,8)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println(integer); } }); - concat
合并最多4個事件 以先來后到的順序進(jìn)行處理,跟startWith相反。//合并兩個事件 123 會優(yōu)先處理 Observable.concat( Observable.just(1,2,3), Observable.just(4,5,6)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println(integer); } }); - merge
merge合并多個被觀察者,合并之后按照時間順序并行執(zhí)行Flowable observable1 = Flowable.intervalRange(0,4,1,500,TimeUnit.MILLISECONDS); Flowable observable2 = Flowable.intervalRange(10,4,1,500,TimeUnit.MILLISECONDS); Flowable observable3 = Flowable.intervalRange(20,4,1,500,TimeUnit.MILLISECONDS); Flowable.merge(observable2,observable3,observable1).subscribe(new Consumer() { @Override public void accept(Object o) throws Exception { System.out.println(o); } }); - mergeDelayError
延遲拋出異常事件,當(dāng)合并的其它事件都執(zhí)行完成之后再拋出異常//延遲拋出異常事件 Flowable observable1 = Flowable.create(new FlowableOnSubscribe<Publisher<?>>() { @Override public void subscribe(FlowableEmitter<Publisher<?>> emitter) throws Exception { //假設(shè)此處發(fā)生了異常 emitter.onError(new NullPointerException()); } }, BackpressureStrategy.BUFFER); Flowable observable2 = Flowable.intervalRange(10,4,1,500,TimeUnit.MILLISECONDS); Flowable.mergeDelayError(observable1,observable2).subscribe(new Consumer() { @Override public void accept(Object o) throws Exception { System.out.println(o); } }); - zip
將多個被觀察者壓縮成單個,輸出事件最少的被觀察者結(jié)果