在前面一章節(jié)中,講解了Rxjava的基本的組成,他們之間的關(guān)系,這一章,主要讓大家了解一下在RxJava中的操作符。但是在這之前,我們還得先了解一下 Scheduler(調(diào)度器)
Scheduler(調(diào)度器)
在上一章節(jié),我們講到了,Rxjava的就是異步。
那么如何讓他進(jìn)行異步呢?這就要用到我們的調(diào)度器了。
先看看RxJava中調(diào)度器支持哪幾種調(diào)度:
| 調(diào)度器類型 | 效果 |
|---|---|
| Schedulers.computation(?) | 用于計(jì)算任務(wù),如事件循環(huán)或和回調(diào)處理,不要用于IO操作(IO操作請(qǐng)使用Schedulers.io());默認(rèn)線程數(shù)等于處理器的數(shù)量 |
| Schedulers.from(executor) | 使用指定的Executor作為調(diào)度器 |
| Schedulers.immediate(?) | 在當(dāng)前線程立即開始執(zhí)行任務(wù) |
| Schedulers.io(?) | 用于IO密集型任務(wù),如異步阻塞IO操作,這個(gè)調(diào)度器的線程池會(huì)根據(jù)需要增長(zhǎng);對(duì)于普通的計(jì)算任務(wù),請(qǐng)使用Schedulers.computation();Schedulers.io(?)默認(rèn)是一個(gè)CachedThreadScheduler,很像一個(gè)有線程緩存的新線程調(diào)度器 |
| Schedulers.newThread(?) | 為每個(gè)任務(wù)創(chuàng)建一個(gè)新線程 |
| Schedulers.trampoline(?) | 當(dāng)其它排隊(duì)的任務(wù)完成后,在當(dāng)前線程排隊(duì)開始執(zhí)行 |
有了上述所說的幾種調(diào)度之后,就可以調(diào)用subscribeOn()和observeOn()來(lái)對(duì)線程進(jìn)行調(diào)度了。
subscribeOn()指定:Observable將全部的處理過程(包括發(fā)射數(shù)據(jù)和通知)放在特定的調(diào)度器上執(zhí)行。
ObserveOn()指定:一個(gè)Observable在一個(gè)特定的調(diào)度器上調(diào)用觀察者的onNext, onError和onCompleted方法,
Subscriber subcriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: Error!");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
};
Observable.just("1", "2", "3", "4")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subcriber);
上面這段代碼中,由于指定了1,2,3,4發(fā)射代碼為Schedulers.io(),那么發(fā)射數(shù)據(jù)就將在io線程中執(zhí)行。而onNext, onError和onCompleted則將在主線中執(zhí)行。
Operators(操作符)
map家族
RxJava提供了幾個(gè)mapping函數(shù):map(),flatMap(),concatMap(),flatMapIterable()以及switchMap().所有這些函數(shù)都作用于一個(gè)可觀測(cè)序列,然后變換它發(fā)射的值,最后用一種新的形式返回它們。
map
map 是用于變換的一個(gè)操作符,這在RxJava中占據(jù)了一定的地位,就是因?yàn)樗淖儞Q操作。
Subscriber subcriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: Error!");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
};
Observable.just("1", "2", "3", "4")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
return Integer.parseInt(s);
}
})
.subscribe(subcriber);
在上面的代碼中,我通過map將字符串轉(zhuǎn)化成了整形的1,2,3,4,返回一個(gè)Observable的對(duì)象。
請(qǐng)注意:這個(gè)操作符默認(rèn)不在任何特定的調(diào)度器上執(zhí)行。
flatmap
flatmap對(duì)于新入門的來(lái)說,理解起來(lái)確實(shí)有一定的難度,可以先看一個(gè)簡(jiǎn)單的栗子:
Subscriber subcriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: Error!");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
};
Observable.just("1", "2", "3", "4")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<String, Observable<Integer>>() {
@Override
public Observable<Integer> call(String s) {
return Observable.just(Integer.parseInt(s)+1);
}
})
.subscribe(subcriber);
從上面我們可以看出,map與flatmap很相似,都是用的Func1,而且模式都是<I,O>模式,即是:I轉(zhuǎn)換成O并返回。但是最大的不同點(diǎn)在于:我們flatmap的輸出類型是Observable的類型。
在這里請(qǐng)注意一個(gè)問題:在執(zhí)行flatmap中返回之后(O輸出返回的Observable),并不是立馬把返回的Observable通過Subscribe進(jìn)行訂閱,而是將返回的若干Observables都交給同一個(gè)Observable,然后再進(jìn)行subscribe。
所以,在上面我們先將字符串"1","2", "3", "4" 分別轉(zhuǎn)換成一個(gè)整形的Observable類型,即是:Observable(2),Observable(3),Observable(4),Observable(5)。然后將這些個(gè)Observables統(tǒng)一轉(zhuǎn)換成一個(gè)Observable,再進(jìn)行subscribe。看一下結(jié)果:
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onCompleted: Completed!
那么,這個(gè)flatmap到底有何用呢?可以用在什么地方呢?
假設(shè)這樣一種情景:一個(gè)學(xué)校的老師我們定義為一個(gè)集合A,每個(gè)老師包括了個(gè)人信息和所教課程,一個(gè)老師不可能只教授一門課程,所以我們將老師所教授課程定義為集合B。如果讓你打印每個(gè)老師所教課程,該怎么做?
Teacher[] teachers = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(teachers)
.flatMap(new Func1<Teacher, Observable<Course>>() {
@Override
public Observable<Course> call(Teacher teacher) {
return Observable.from(teacher.getCourses());
}
})
.subscribe(subscriber);
最后再補(bǔ)充一點(diǎn):FlatMap對(duì)這些Observables發(fā)射的數(shù)據(jù)做的是合并(merge)操作,因此它們可能是交錯(cuò)的。這意味著flatMap()函數(shù)在最后的Observable中不能夠保證源Observables確切的發(fā)射順序。
ConcatMap
RxJava的concatMap()函數(shù)解決了flatMap()的交叉問題,提供了一種能夠把發(fā)射的值連續(xù)在一起的鋪平函數(shù),而不是合并它們,如下圖所示:

變換的操作符還有很多:buffer,Scan...等等,大家可以研究一下。
其他操作符
repeat
讓你發(fā)射的數(shù)據(jù)重復(fù)發(fā)射
Subscriber subcriber = new Subscriber<Integer>() {
...
}
};
Observable.just("1", "2","3")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<String, Observable<Integer>>() {
@Override
public Observable<Integer> call(String s) {
return Observable.just(Integer.parseInt(s)+1);
}
})
.repeat(3)
.subscribe(subcriber);
看一下結(jié)果:
onNext: 2
onNext: 3
onNext: 4
onNext: 2
onNext: 3
onNext: 4
onNext: 2
onNext: 3
onNext: 4
onCompleted: Completed!
range
從起始點(diǎn)開始發(fā)射數(shù)據(jù)
Subscriber subcriber = new Subscriber<Integer>() {
...
};
Observable.range(10,3)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subcriber);
結(jié)果為:10,11,12。range(10,3),其中10 是起始,3是數(shù)量。
interval
在需要輪詢的時(shí)候是最好的選擇
Observable.interval(3,TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
...
});
interval()函數(shù)的兩個(gè)參數(shù):一個(gè)指定兩次發(fā)射的時(shí)間間隔,另一個(gè)是用到的時(shí)間單位。
take
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.take(4)
.subscribe(new Subscriber<Integer>() {
...
});
輸出
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.
TakeLast
如果我們想要最后N個(gè)元素,我們只需使用takeLast()函數(shù):
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.takelast(2)
.subscribe(new Subscriber<Integer>() {
...
});
輸出
Next: 7
Next: 8
Sequence complete.
當(dāng)然Rxjava的操作符不止這一點(diǎn),大家可以戳我可以查看更多的操作符的用法。
Rx系列文章:
第一篇《Rx系列之RxJava初識(shí)》
第三篇《Rx系列之Rxjava操作符進(jìn)階-使用場(chǎng)景》