Func1 和 Action1 非常相似,也是 RxJava 的一個接口,用于包裝含有一個參數(shù)的方法。 Func1 和 Action 的區(qū)別在于 Func1 包裝的是有返回值的方法。另外,和 ActionX 一樣, FuncX 也有多個,用于不同參數(shù)個數(shù)的方法。FuncX 和 ActionX 的區(qū)別在 FuncX 包裝的是有返回值的方法。
1.flatMapIterable: (將數(shù)據(jù)轉(zhuǎn)換后再發(fā)送)
private Observable<String> flatMapIterableObserver(){
return Observable.just(1,2,3)
.flatMapIterable(new Func1<Integer, Iterable<? extends String>>() {
@Override
public Iterable<? extends String> call(Integer integer) {
ArrayList<String> strings = new ArrayList<>();
for (int i=0;i<3;i++){
strings.add("flatMapIterableObserver "+integer);
}
return strings;
}
});
}
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver 1
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver 1
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver 1
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver 2
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver 2
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver 2
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver 3
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver 3
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver 3
2.map:(將數(shù)據(jù)源Observable發(fā)送給每個數(shù)據(jù)進(jìn)行指定函數(shù)轉(zhuǎn)換,再將轉(zhuǎn)換后的數(shù)據(jù)發(fā)送出去)
private Observable<Integer> mapObservable(){
return Observable.just(1,2,3)
.map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
return integer*10;
}
});
}
09-13 19:30:11.802 32095-32095/com.example.administrator.rxjavademo E/call: map: 10
09-13 19:30:11.803 32095-32095/com.example.administrator.rxjavademo E/call: map: 20
09-13 19:30:11.803 32095-32095/com.example.administrator.rxjavademo E/call: map: 30
- flatMap() 和 map() 有一個相同點:它也是把傳入的參數(shù)轉(zhuǎn)化之后返回另一個對象。
- flatMap() 中返回的是個 Observable 對象,并且這個 Observable 對象并不是被直接發(fā)送到了 Subscriber 的回調(diào)方法中,map返回的是結(jié)果集。
- flatMap() 的原理是這樣的:1. 使用傳入的事件對象創(chuàng)建一個 Observable 對象;2. 并不發(fā)送這個 Observable, 而是將它激活,于是它開始發(fā)送事件;3. 每一個創(chuàng)建出來的 Observable 發(fā)送的事件,都被匯入同一個 Observable ,而這個 Observable 負(fù)責(zé)將這些事件統(tǒng)一交給 Subscriber 的回調(diào)方法。這三個步驟,把事件拆成了兩級,通過一組新創(chuàng)建的 Observable 將初始的對象『鋪平』之后通過統(tǒng)一路徑分發(fā)了下去。而這個『鋪平』就是 flatMap() 所謂的 flat
- map適用于一對一轉(zhuǎn)換,flatmap適用于一對多,多對多的場景
3.groupBy:(將數(shù)據(jù)源轉(zhuǎn)換為groupBy篩選后的Observable對象)
private Observable<GroupedObservable<Integer,String>> groupedByStringObservable(){
return Observable.just(1,2,3,4,5,6,7,8,9)
.groupBy(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
return integer % 2;
}
}, new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "groupBy: "+integer;
}
});
}
groupedByStringObservable().subscribe(new Action1<GroupedObservable<Integer, String>>() {
@Override
public void call(GroupedObservable<Integer, String> integerStringGroupedObservable) {
if (integerStringGroupedObservable.getKey()==0){
integerStringGroupedObservable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("call", s);
}
});
}
}
});
09-13 19:17:28.812 31719-31719/com.example.administrator.rxjavademo E/call: groupBy: 2
09-13 19:17:28.812 31719-31719/com.example.administrator.rxjavademo E/call: groupBy: 4
09-13 19:17:28.812 31719-31719/com.example.administrator.rxjavademo E/call: groupBy: 6
09-13 19:17:28.812 31719-31719/com.example.administrator.rxjavademo E/call: groupBy: 8
4.cast:(將Observable發(fā)送的數(shù)據(jù)強(qiáng)轉(zhuǎn)成另外一種類型)
5.scan:(做一次計算,有條件、有篩選的輸出最終結(jié)果)
6.throttleWithTimeout:(時間限流, 低于指定時間的都將被過濾)
private Observable<Integer> createObserver(){
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i=0;i<10;i++){
if (!subscriber.isUnsubscribed()){
subscriber.onNext(i);
}
int sleep = 100;
if (i%3==0){
sleep = 300;
}
try{
Thread.sleep(sleep);
} catch (Exception e){
e.printStackTrace();
}
}
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.computation());
}
private Observable<Integer> throttleWithThimoutObserver(){
return createObserver().throttleWithTimeout(200,TimeUnit.MILLISECONDS);
}
對小于3的倍數(shù)的數(shù)據(jù)延遲100毫秒后發(fā)送新數(shù)據(jù), 100毫秒小于過濾的時間,數(shù)據(jù)被過濾掉;
輸出日志如下:
09-14 10:06:19.673 2916-2964/com.example.administrator.rxjavademo E/call: integer: 0
09-14 10:06:20.174 2916-2964/com.example.administrator.rxjavademo E/call: integer: 3
09-14 10:06:20.674 2916-2964/com.example.administrator.rxjavademo E/call: integer: 6
09-14 10:06:21.175 2916-2964/com.example.administrator.rxjavademo E/call: integer: 9
7.distinct:(去重,不能List元素出重, 可以list對象出重)
private Observable<Integer> distinctObserver(){
return Observable.just(1,2,4,5,6,8,4,3,2,1).distinct();
}
09-14 10:33:51.124 3710-3710/com.example.administrator.rxjavademo E/call: integer: 1
09-14 10:33:51.124 3710-3710/com.example.administrator.rxjavademo E/call: integer: 2
09-14 10:33:51.124 3710-3710/com.example.administrator.rxjavademo E/call: integer: 4
09-14 10:33:51.124 3710-3710/com.example.administrator.rxjavademo E/call: integer: 5
09-14 10:33:51.125 3710-3710/com.example.administrator.rxjavademo E/call: integer: 6
09-14 10:33:51.125 3710-3710/com.example.administrator.rxjavademo E/call: integer: 8
09-14 10:33:51.125 3710-3710/com.example.administrator.rxjavademo E/call: integer: 3
8.elementAt:(下標(biāo)順序過濾數(shù)據(jù)源)
private Observable<Integer> elementAtObserver(){
return Observable.just(1,3,8,10,9).elementAt(3);
}
09-14 11:05:48.591 5006-5006/com.example.administrator.rxjavademo E/call: integer: 10
9.filter:(根據(jù)函數(shù)進(jìn)行過濾操作,返回true就往下執(zhí)行,否則過濾掉 , 和last類似)
private Observable<Integer> filterObserver(){
return Observable.just(0,1,2,3,4,5,6,7,8,9)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer<3;
}
});
}
09-14 11:39:34.319 5856-5856/com.example.administrator.rxjavademo E/call: integer: 0
09-14 11:39:34.319 5856-5856/com.example.administrator.rxjavademo E/call: integer: 1
09-14 11:39:34.319 5856-5856/com.example.administrator.rxjavademo E/call: integer: 2
10.first:(只會返回第一條或者滿足條件的第一條數(shù)據(jù))
Observable.just(0,1,2,3,4,5).first(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer>3;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("call","integer: "+integer);
}
});
//BlockingObservable不會做任何處理,只會阻塞;
int result = Observable.just(0,1,2,3,4,5)
.toBlocking()
.first(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer>3;
}
});
09-14 11:56:10.987 6328-6328/com.example.administrator.rxjavademo E/call: integer: 4
11.skip和take: (skip操作符過濾掉前面的數(shù)據(jù),take只取前面數(shù)據(jù) ,將后面的數(shù)據(jù)全部過濾掉)
12.sample和throttleFrist:(sample一次性發(fā)送間隔的幾個數(shù)據(jù),throttleFrist間隔時間后發(fā)送一個數(shù)據(jù))
13.join:(將兩個Observable在有效的時間內(nèi)拼接)
private Observable<String> getLeftObservable(){
return Observable.just("a","b","c");
}
private Observable<Long> getRightObservable(){
return Observable.just(1L,2L,3L);
}
private Observable<String> joinObserver(){
return getLeftObservable().join(getRightObservable(), new Func1<String, Observable<Long>>() {
@Override
public Observable<Long> call(String s) {
return Observable.timer(1000, TimeUnit.MILLISECONDS);
}
}, new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
return Observable.timer(1000, TimeUnit.MILLISECONDS);
}
}, new Func2<String, Long, String>() {
@Override
public String call(String s, Long aLong) {
return s+" : "+aLong;
}
});
}
09-14 15:08:57.223 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: b : 1
09-14 15:08:57.223 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: a : 1
09-14 15:08:57.223 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: c : 1
09-14 15:08:57.224 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: b : 2
09-14 15:08:57.224 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: a : 2
09-14 15:08:57.224 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: c : 2
09-14 15:08:57.225 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: b : 3
09-14 15:08:57.225 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: a : 3
09-14 15:08:57.225 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: c : 3
14.merge:(將多個Observable發(fā)送的數(shù)據(jù)整合在一起后發(fā)送,但發(fā)送的數(shù)據(jù)可能是錯亂的,如果不想錯亂可以使用concat)
private Observable<Integer> mergeObserver(){
return Observable.merge(Observable.just(1,2,3),Observable.just(4,5,6));
}
09-14 15:39:05.915 10013-10013/com.example.administrator.rxjavademo E/call: mergeObserver: 1
09-14 15:39:05.915 10013-10013/com.example.administrator.rxjavademo E/call: mergeObserver: 2
09-14 15:39:05.915 10013-10013/com.example.administrator.rxjavademo E/call: mergeObserver: 3
09-14 15:39:05.915 10013-10013/com.example.administrator.rxjavademo E/call: mergeObserver: 4
09-14 15:39:05.915 10013-10013/com.example.administrator.rxjavademo E/call: mergeObserver: 5
09-14 15:39:05.915 10013-10013/com.example.administrator.rxjavademo E/call: mergeObserver: 6
15.mergeDelayError:(類似merge ,但是遇到異常后會繼續(xù)組合操作,等所有數(shù)據(jù)發(fā)送完成后才將這個異常拋出)
private Observable<Integer> mergeDelayErrorObserver(){
return Observable.mergeDelayError(Observable
.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i=0;i<5;i++){
if (i==3){
subscriber.onError(new Throwable("onError"));
}
subscriber.onNext(i);
}
}
}),Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i=0;i<5;i++){
if (i==3){
subscriber.onNext(i+5);
}
subscriber.onCompleted();
}
}
}));
}
09-14 15:44:31.088 10353-10353/com.example.administrator.rxjavademo E/call: mergeObserver: 0
09-14 15:44:31.088 10353-10353/com.example.administrator.rxjavademo E/call: mergeObserver: 1
09-14 15:44:31.088 10353-10353/com.example.administrator.rxjavademo E/call: mergeObserver: 2
09-14 15:44:31.088 10353-10353/com.example.administrator.rxjavademo E/call: mergeObserver: 3
09-14 15:44:31.088 10353-10353/com.example.administrator.rxjavademo E/call: mergeObserver: 4
09-14 15:44:31.088 10353-10353/com.example.administrator.rxjavademo E/call: mergeObserver: 8
09-14 15:44:31.090 10353-10353/com.example.administrator.rxjavademo E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.example.administrator.rxjavademo, PID: 10353
rx.exceptions.OnErrorNotImplementedException: onError
16.startWith:(需要發(fā)送的數(shù)據(jù)源前面插入數(shù)據(jù))
private Observable<Integer> startWithObserver(){
return Observable.just(1,2,3).startWith(-1,4);
}
09-14 15:49:55.637 10729-10729/com.example.administrator.rxjavademo E/call: mergeObserver: -1
09-14 15:49:55.637 10729-10729/com.example.administrator.rxjavademo E/call: mergeObserver: 4
09-14 15:49:55.637 10729-10729/com.example.administrator.rxjavademo E/call: mergeObserver: 1
09-14 15:49:55.637 10729-10729/com.example.administrator.rxjavademo E/call: mergeObserver: 2
09-14 15:49:55.637 10729-10729/com.example.administrator.rxjavademo E/call: mergeObserver: 3
17.onErrorReturn:(捕獲異常并返回了指定的字符串給訂閱者)
private Observable<String> createObserver(){
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i=1;i<=6;i++){
if (i<3){
subscriber.onNext("onNext: "+i);
} else {
subscriber.onError(new Throwable("onError"));
}
}
}
});
}
private Observable<String> onErrorReturnObserver() {
return createObserver().onErrorReturn(new Func1<Throwable, String>() {
@Override
public String call(Throwable throwable) {
return "onErrorReturn";
}
});
}
09-14 16:25:28.983 11558-11558/com.example.administrator.rxjavademo E/onErrorReturnObserver: onNext: onNext: 1
09-14 16:25:28.983 11558-11558/com.example.administrator.rxjavademo E/onErrorReturnObserver: onNext: onNext: 2
09-14 16:25:28.983 11558-11558/com.example.administrator.rxjavademo E/onErrorReturnObserver: onNext: onErrorReturn
09-14 16:25:28.983 11558-11558/com.example.administrator.rxjavademo E/onErrorReturnObserver: onCompleted
18.onErrorResumeNext:(發(fā)生異常的時候, 創(chuàng)建新的Observable來繼續(xù)發(fā)送數(shù)據(jù))
19.onExceptionResumeNext:(類似onErrorResumeNext,不同之處是對異常數(shù)據(jù)做判斷 ,如果是Exception就會使用另一個Observable代替原來的繼續(xù)發(fā)數(shù)據(jù),否則將錯誤分發(fā)給Subscriber)
20.retry:(發(fā)生錯誤的時候會重新訂閱,而且可以重復(fù)多次,但是這樣也就有可能造成死循環(huán),建議指定最大重復(fù)次數(shù))
private Observable<Integer> createObserver(){
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
Log.e("createObserver","subscriber");
for (int i=0;i<3;i++){
if (i==2){
subscriber.onError(new Exception("Exception"));
} else {
subscriber.onNext(i);
}
}
}
});
}
createObserver().retry(2).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e("retry","onCompleted");
}
@Override
public void onError(Throwable e) {
Log.e("retry","onError: "+e.getMessage());
}
@Override
public void onNext(Integer integer) {
Log.e("retry","call: "+integer);
}
});
09-17 17:34:02.069 18668-18668/com.example.administrator.rxjavademo E/createObserver: subscriber
09-17 17:34:02.069 18668-18668/com.example.administrator.rxjavademo E/retry: call: 0
09-17 17:34:02.069 18668-18668/com.example.administrator.rxjavademo E/retry: call: 1
09-17 17:34:02.070 18668-18668/com.example.administrator.rxjavademo E/createObserver: subscriber
09-17 17:34:02.070 18668-18668/com.example.administrator.rxjavademo E/retry: call: 0
09-17 17:34:02.070 18668-18668/com.example.administrator.rxjavademo E/retry: call: 1
09-17 17:34:02.071 18668-18668/com.example.administrator.rxjavademo E/createObserver: subscriber
09-17 17:34:02.071 18668-18668/com.example.administrator.rxjavademo E/retry: call: 0
09-17 17:34:02.071 18668-18668/com.example.administrator.rxjavademo E/retry: call: 1
09-17 17:34:02.071 18668-18668/com.example.administrator.rxjavademo E/retry: onError: Exception
21.delay:(延遲發(fā)送)
22.do:(給Observable的各個階段加上監(jiān)聽,執(zhí)行到的時候就觸發(fā))
- doOnEach() :Observable每次發(fā)送一個數(shù)據(jù)的時候就會觸發(fā)這個回調(diào),無論Observable調(diào)用的是onNext,onError還是onCompleted.
- doOnNext: 只有Observable調(diào)用onNext 發(fā)送數(shù)據(jù)的時候才會調(diào)用;
- doOnError: 只有Observable通過onError 分發(fā)錯誤的時候才會觸發(fā)回調(diào),并且調(diào)用Throwble對象作為參數(shù)傳遞到回調(diào)函數(shù)去;
- doOnComplete:只有Observable調(diào)用doOnComplete 發(fā)送結(jié)束事件的時候才會觸發(fā)回調(diào);
- doOnSubscribe和doOnUnsubscribe: 會在Subscrible進(jìn)行訂閱和反訂閱的時候才會觸發(fā)回調(diào);
- doOnTerminate:會在Observable結(jié)束前觸發(fā)回調(diào),無論是正常結(jié)束還是異常結(jié)束;
- finallyDo: 會在Observable結(jié)束后觸發(fā)回調(diào),無論是正常結(jié)束還是異常結(jié)束;
23.subscribeOn和observeOn:(subscribeOn是在哪個線程上訂閱,也就是用subscribeOn指定要工作的線程;observeOn是在哪個線程上觀察,也就是結(jié)果被使用的線程)
private Observable<Integer> observableOnserver(){
return createObserver()
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.newThread());
}
private Observable<Integer> createObserver(){
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
Log.e("createObserver","call: "+Thread.currentThread().getName());
subscriber.onNext(1);
subscriber.onCompleted();
}
});
}
private Observable<Integer> subscribeOnObserver(){
return createObserver()
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.immediate());
}
09-18 10:34:52.832 3048-3092/com.example.administrator.rxjavademo E/createObserver: call: RxComputationThreadPool-1
09-18 10:34:52.833 3048-3092/com.example.administrator.rxjavademo E/computation: call: 1 RxComputationThreadPool-1
09-18 10:34:52.843 3048-3091/com.example.administrator.rxjavademo E/createObserver: call: RxNewThreadScheduler-1
09-18 10:34:52.844 3048-3048/com.example.administrator.rxjavademo E/mainThread: call: 1 main
24.using:(創(chuàng)建一個在Observable生命周期內(nèi)存活的資源,但是Observable終止的時候,資源也會被銷毀)
25.all:(對Observable發(fā)送的所有數(shù)據(jù)進(jìn)行判斷,如果全部滿足就返回true,否則返回false)
26.amb:(最多將9個Observable結(jié)合起來,看哪個先發(fā)送(包括onError和onComplete),后發(fā)送的將被丟棄)
27.contains:(判斷發(fā)送的所有數(shù)據(jù)有沒有包含某個數(shù)據(jù),如果包含就返回true,Observable沒發(fā)送完所有數(shù)據(jù)前不會返回數(shù)據(jù))
28.isEmpty:(判斷Observable是否發(fā)送過數(shù)據(jù),如果發(fā)送過了就返回false;如果Observavble已經(jīng)結(jié)束了都還沒發(fā)送這個數(shù)據(jù),則返回true)
29.concat:(將發(fā)送的數(shù)據(jù)組合起來,類似startWith和merge)
30.from:(接收一個對象作為參數(shù)來創(chuàng)建Observable,參數(shù)對象可以是Iterable,Callable,Future和數(shù)組)
31.just:(接收對象作為輸入,然后創(chuàng)建一個發(fā)送該對象的Observable,對象可以是數(shù)字,字符串,數(shù)組,Iterate對象等)
- from()創(chuàng)建方式和just()操作符類似,但是just操作符創(chuàng)建的Observable會將整個參數(shù)對象作為數(shù)據(jù)一下子發(fā)送出去,例如參數(shù)是個含有10個數(shù)字的數(shù)組,使用from創(chuàng)建Observable就會發(fā)送10次,而just創(chuàng)建的Observable會一次將整個數(shù)組發(fā)送出去;
- 一般如果用from轉(zhuǎn)換多個數(shù)據(jù),比如 ArrayList等包含多個數(shù)據(jù)的數(shù)組或者列表, just用于處理單個的數(shù)據(jù)。
- from 遇到只包含一個數(shù)據(jù)的時候,會使用just創(chuàng)建Observable; just 遇到多于一個的情況,會使用from 創(chuàng)建 Observable
32.自定義操作符:
A. 可以多次調(diào)用Subscriber的onNext方法,但是同個數(shù)據(jù)只能調(diào)用一次;
B. 可以調(diào)用Subscriber的onComplete或者onError方法,但是這兩個方法是互斥的,調(diào)用了其中一個就不能調(diào)用另一個,并且一旦調(diào)用了兩者中的任何一個方法就不能調(diào)用onNext方法;
C. 如果無法保證無法保證上面兩條原則,可以對自定義操作符加上serialize操作符,這個操作符會強(qiáng)制性發(fā)送正確的數(shù)據(jù);
D. 自定義操作內(nèi)部不能阻塞;
E.如果有異常的時候,不能繼續(xù)發(fā)送正常的數(shù)據(jù),要立刻調(diào)用Subscriber的onError() 來將異常拋出;
F.null也屬于一種數(shù)據(jù), 可以正常發(fā)送,和完全不發(fā)送是兩回事;
G.如果通過組合多個Rxjava原生操作符就能達(dá)到目的, 就不要使用自定義操作符實現(xiàn);例如:
- first(操作符是通過take(1).single()來實現(xiàn)的;
- ignoreElements()是通過filter(alwaysFalse())來實現(xiàn)的;
- reduce(a)是通過scan(a).last()來實現(xiàn)的;
參考:
- RxJava響應(yīng)式編程
- RxJavaX GitHub
- RxJavaX 官方
- RxJavaX 中文文檔
- 給 Android 開發(fā)者的 RxJava 詳解