RxJava
集成
compile 'io.reactivex:rxandroid:1.2.1'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
compile 'io.reactivex:rxjava:1.2.1'
創(chuàng)建被觀察者
create
使用OnSubscribe從頭創(chuàng)建一個(gè)Observable,這種方法比較簡(jiǎn)單。需要注意的是,使用該方法創(chuàng)建時(shí),建議在OnSubscribe#call方法中檢查訂閱狀態(tài),以便及時(shí)停止發(fā)射數(shù)據(jù)或者運(yùn)算。
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("item1");
subscriber.onNext("item2");
subscriber.onCompleted();
}
});
from
將一個(gè)Iterable, 一個(gè)Future, 或者一個(gè)數(shù)組,內(nèi)部通過代理的方式轉(zhuǎn)換成一個(gè)Observable。Future轉(zhuǎn)換為OnSubscribe是通過OnSubscribeToObservableFuture進(jìn)行的,Iterable轉(zhuǎn)換通過OnSubscribeFromIterable進(jìn)行。數(shù)組通過OnSubscribeFromArray轉(zhuǎn)換。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
just
將一個(gè)或多個(gè)對(duì)象轉(zhuǎn)換成發(fā)射這個(gè)或這些對(duì)象的一個(gè)Observable。如果是單個(gè)對(duì)象,內(nèi)部創(chuàng)建的是ScalarSynchronousObservable對(duì)象。如果是多個(gè)對(duì)象,則是調(diào)用了from方法創(chuàng)建。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
timer
創(chuàng)建一個(gè)在給定的延時(shí)之后發(fā)射數(shù)據(jù)項(xiàng)為0的Observable<Long>,內(nèi)部通過OnSubscribeTimerOnce工作
Observable.timer(1000,TimeUnit.MILLISECONDS)
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.d("JG",aLong.toString()); // 0
}
});
interval
創(chuàng)建一個(gè)按照給定的時(shí)間間隔發(fā)射從0開始的整數(shù)序列的Observable<Long>,內(nèi)部通過OnSubscribeTimerPeriodically工作
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
//每隔1秒發(fā)送數(shù)據(jù)項(xiàng),從0開始計(jì)數(shù)
//0,1,2,3....
}
});
range
創(chuàng)建一個(gè)發(fā)射指定范圍的整數(shù)序列的Observable<Integer>
Observable.range(2,5).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("JG",integer.toString());// 2,3,4,5,6 從2開始發(fā)射5個(gè)數(shù)據(jù)
}
});
創(chuàng)建觀察者
創(chuàng)建Observer
Observer 即觀察者,它決定事件觸發(fā)的時(shí)候?qū)⒂性鯓拥男袨椤?RxJava 中的 Observer 接口的實(shí)現(xiàn)方式:
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
創(chuàng)建subscriber
除了 Observer 接口之外,RxJava 還內(nèi)置了一個(gè)實(shí)現(xiàn)了 Observer 的抽象類:Subscriber。 Subscriber 對(duì) Observer 接口進(jìn)行了一些擴(kuò)展,但他們的基本使用方式是完全一樣的:
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
訂閱
創(chuàng)建了 Observable 和 Observer 之后,再用 subscribe() 方法將它們聯(lián)結(jié)起來,整條鏈子就可以工作了。代碼形式很簡(jiǎn)單:
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
Scheduler

多切換幾次線程:
當(dāng)使用了多個(gè) subscribeOn() 的時(shí)候,只有第一個(gè) subscribeOn() 起作用。
observeOn() 指定的是它之后的操作所在的線程。因此如果有多次切換線程的需求,只要在每個(gè)想要切換線程的位置調(diào)用一次 observeOn() 即可
Observable.just(1, 2, 3, 4) // IO 線程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新線程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 線程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主線程,由 observeOn() 指定
變換
flatMap() 和 map() 有一個(gè)相同點(diǎn):它也是把傳入的參數(shù)轉(zhuǎn)化之后返回另一個(gè)對(duì)象。但需要注意,和 map() 不同的是, flatMap() 中返回的是個(gè) Observable 對(duì)象,并且這個(gè) Observable 對(duì)象并不是被直接發(fā)送到了 Subscriber 的回調(diào)方法中。 flatMap() 的原理是這樣的:1. 使用傳入的事件對(duì)象創(chuàng)建一個(gè) Observable 對(duì)象;2. 并不發(fā)送這個(gè) Observable, 而是將它激活,于是它開始發(fā)送事件;3. 每一個(gè)創(chuàng)建出來的 Observable 發(fā)送的事件,都被匯入同一個(gè) Observable ,而這個(gè) Observable 負(fù)責(zé)將這些事件統(tǒng)一交給 Subscriber 的回調(diào)方法。這三個(gè)步驟,把事件拆成了兩級(jí),通過一組新創(chuàng)建的 Observable 將初始的對(duì)象『鋪平』之后通過統(tǒng)一路徑分發(fā)了下去。而這個(gè)『鋪平』就是 flatMap() 所謂的 flat。
map
Observable.just("images/logo.png") // 輸入類型 String
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 參數(shù)類型 String
return getBitmapFromPath(filePath); // 返回類型 Bitmap
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 參數(shù)類型 Bitmap
showBitmap(bitmap);
}
});
flatmap
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
過濾
- filter
- ofType
- skip
- take
- distinct
組合
- zip
- merge
- combineLatest
- concat
- startWith
轉(zhuǎn)換
- toList
- toSortedList
- toMap
- toMultiMap