首先這是基于rxjava1.0的學(xué)習(xí),最近又出了2.0版本,有了一些改動,首先用法就有了一定的變化,就比如開始我在Android Studio添加的依賴庫選了2.0,結(jié)果測試rxjava-essentials中的代碼時,發(fā)現(xiàn)方法根本找不到了。</br>
不過正所謂學(xué)習(xí)嘛,雖然說更新了2.0版本,但是內(nèi)部的核心肯定是換湯不換藥的,就從1.0版本開始學(xué)!
RxJava的與眾不同
從純Java的觀點看,Rxjava Observale類源自于經(jīng)典的Gang of Four的觀察者模式。
它添加了三個缺少的功能:</br>
- 生產(chǎn)者在沒有更多數(shù)據(jù)可用時能夠發(fā)出信號通知:onCompleted()事件。
- 生產(chǎn)者在發(fā)生錯誤時能夠發(fā)出信號通知:onError()事件。
- RxJava Observables 能夠組合而不是嵌套,從而避免開發(fā)者陷入回調(diào)地獄。
在RxJava的世界里,我們有四種角色:
- Observable
- Observer
- Subscriber
- Subjects
Observables和Subjects是兩個“生產(chǎn)”實體,Observers和Subscribers是兩個“消
費”實體。
以上摘自《rxjava-essentials》
Observer是觀察者接口,而Subscriber是其抽象實現(xiàn)類。
Observable.create()
Observable.create()創(chuàng)建一個Observable();
-
Observable.subscribe(Observer ob)訂閱一個觀察者。
public static void main(String... args) { Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < 5; i++) { subscriber.onNext(i); } subscriber.onCompleted(); } }).subscribe(new Observer<Integer>() { @Override public void onCompleted() { System.out.println("Completed"); } @Override public void onError(Throwable e) { System.out.println("Error"); } @Override public void onNext(Integer integer) { System.out.println("i = " + integer); } }); }
打印結(jié)果:</br>
i = 0</br>
i = 1</br>
i = 2</br>
i = 3</br>
i = 4</br>
Completed</br>
除了可以用Observable.create(),同時還有Observable.from(),Observable.just()。
Observable.from()
-
Observable.from()可以直接將我們的列表丟進參數(shù)中。
public static void main(String... args) { List<Integer> items = new ArrayList<>(); items.add(1); items.add(11); items.add(111); items.add(1111); Observable.from(items) .subscribe(new Observer<Integer>() { @Override public void onCompleted() { System.out.println("Completed"); } @Override public void onError(Throwable e) { System.out.println("Error"); } @Override public void onNext(Integer integer) { System.out.println("i = " + integer); } }); }
打印結(jié)果:</br>
i = 0</br>
i = 1</br>
i = 2</br>
i = 3</br>
i = 4</br>
Completed</br>
Observable.just()
-
Observable.just()可以將我們現(xiàn)有的方法轉(zhuǎn)換成Observable。
public static void main(String... args) { Observable.just(reInt()) .subscribe(new Observer<Integer>() { @Override public void onCompleted() { System.out.println("Completed"); } @Override public void onError(Throwable e) { System.out.println("Error"); } @Override public void onNext(Integer integer) { System.out.println("i = " + integer); } }); } public static int reInt(){ return 12; }
打印結(jié)果:</br>
i = 12</br>
Completed</br>
Subject
- 前面對Observer,Observable,Subscriber都有用到,單單還有一個Subject。
- Subject = Observable + Observer,它可以是一個Observable同時也可以是一個Observer,它作為連接這兩個世界的一座橋梁。
- 一個Subject可以訂閱一個Observable,就像一個觀察者,并且它可以發(fā)射新的數(shù)據(jù),或者傳遞它接受到的數(shù)據(jù),就像一個Observable。很明顯,作為一個Observable,觀察者們或者其它Subject都可以訂閱它。
Rxjava提供了四種不同的Subject:
- PublishSubject
- BehaviorSubject
- ReplaySubject
- AsyncSubject
PublishSubject
public static void main(String... args) {
PublishSubject<String> publishSubject = PublishSubject.create();
publishSubject.subscribe(new Observer<Object>() {
@Override
public void onCompleted() {
System.out.println("Completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Error");
}
@Override
public void onNext(Object o) {
System.out.println(o.toString());
}
});
publishSubject.onNext("HelloWorld");
}
打印結(jié)果:</br>
HelloWorld
- 代碼開始創(chuàng)建了一個PublishSubject,用create()發(fā)射一個String值,之后訂閱了PublishSubject。一直到這里,也沒有數(shù)據(jù)要發(fā)送,直到最后
publishSubject.onNext("HelloWorld")觸發(fā)了觀察者的onNext()方法,將傳遞過來的"HelloWorld"打印出來。
BehaviorSubject
-
BehaviorSubject會首先向他的訂閱者發(fā)送截至訂閱前最新的一個數(shù)據(jù)對象或者初始值,然后正常的發(fā)送訂閱后的數(shù)據(jù)流。
public static void main(String... args) { BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(1); behaviorSubject.subscribe(new Observer<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { System.out.println("i = " + integer); } }); behaviorSubject.onNext(999); }
打印結(jié)果:</br>
i = 1</br>
i = 999</br>
首先接受到創(chuàng)建Observer時的初始值1,然后出發(fā)觀察者的onNext()方法。
ReplaySubject
ReplaySubject會緩存它所訂閱的所有數(shù)據(jù),向任意一個訂閱它的觀察者重發(fā)。
public static void main(String... args) {
ReplaySubject<Integer> replaySubject = ReplaySubject.create();
replaySubject.onNext(333);
replaySubject.onNext(444);
replaySubject.onNext(555);
replaySubject.onCompleted();
replaySubject.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println("i = " + integer);
}
});
}
可以看到,replaySubject先發(fā)出了信息,然后才訂閱了Observer。</br>
打印結(jié)果:</br>
i = 333</br>
i = 444</br>
i = 555</br>
AsyncSubject
當(dāng)Observable完成時AsyncSubject只會發(fā)布最后一個數(shù)據(jù)給已經(jīng)訂閱的每一個觀察者。
public static void main(String... args) {
AsyncSubject<Integer> asyncSubject = AsyncSubject.create();
asyncSubject.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println("i = " + integer);
}
});
asyncSubject.onNext(333);
asyncSubject.onNext(444);
asyncSubject.onNext(555);
asyncSubject.onCompleted();
}
在這里,asyncSubject發(fā)出了三個信息,但是根據(jù)asyncSubject的特性,只會發(fā)布最后一個數(shù)據(jù)給訂閱的觀察者,注意最后這一行代碼:asyncSubject.onCompleted(),如果不加這一行代碼,程序不能識別555是最后一個數(shù)據(jù),將不會打印任何結(jié)果。</br>
打印結(jié)果:</br>
i = 555</br>