rxjava的基礎(chǔ)概念
觀察者模式于發(fā)布-訂閱模型
rxjava有四個(gè)基礎(chǔ)概念
- Observable 被觀察對(duì)象
- Observer 觀察者對(duì)象
- subscribe 訂閱
- 事件 事件回調(diào),包括onNext、onError、onCompleted
rxjava中事件的概念
-
onCompleted():事件隊(duì)列完結(jié),不再有onNext()發(fā)出時(shí),需要觸發(fā)onCompleted()方法作為完結(jié)標(biāo)志 -
onError(Throwable e):事件隊(duì)列異常,同時(shí)該事件隊(duì)列終止,在同一個(gè)事件流中,onError和onCompleted只能存在一個(gè) -
onNext(T param):普通的事件回調(diào)
rxjava的基本實(shí)現(xiàn)
-
創(chuàng)建Observer
直接new一個(gè)Observer接口
Observer<String> observer = new Observer<String>() { @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { System.out.println(e.getMessage()); } @Override public void onNext(String s) { System,out.println(s); } };或者new一個(gè)rxjava中實(shí)現(xiàn)了Observer接口的Subscribe抽象類
Subscriber<String> subscriber = new Subscriber<String>() { public void onCompleted() { System.out.println("onCompleted"); } public void onError(Throwable e) { System.out.println(e.getMessage()); } public void onNext(String s) { System.out.println(s + " on onNext"); } }; -
創(chuàng)建Observable對(duì)象
Observable<String> observable = Observable.create(subscriber1 -> { System.out.println("call"); subscriber1.onNext("sss"); subscriber1.onNext("sss1"); subscriber1.onCompleted(); }); -
訂閱過(guò)程,調(diào)用subscribe方法
observable.subscribe(subscriber);
通過(guò)源碼可以發(fā)現(xiàn),subscribe方法最后都會(huì)調(diào)用
Subscription subscribe(Subscriber<? super T> subscriber)
這個(gè)方法,所以第一步在創(chuàng)建觀察者對(duì)象的時(shí)候,即使使用Observer,最終都會(huì)轉(zhuǎn)換為Subscriber對(duì)象。
這個(gè)方法的核心步驟為:
subscriber.onStart(); //可選的準(zhǔn)備方法
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); //Observable被訂閱,發(fā)出事件流
return Subscriptions.unsubscribed(); //取消訂閱
通過(guò)查看Subscriber的源碼可以發(fā)現(xiàn),它相比Observer多出了2個(gè)重要的方法:
- onStart()
在事件發(fā)出之前作一些準(zhǔn)備工作,默認(rèn)方法實(shí)現(xiàn)為空 - unSubscribe()
取消訂閱,釋放Observalbe對(duì)Subscribe的引用,防止內(nèi)存泄漏
rxjava的常用操作符
記錄一下自己學(xué)習(xí)的幾個(gè)rxjava的操作符
-
map
將事件對(duì)象轉(zhuǎn)換為另一個(gè)對(duì)象,例如
Observable.from(strs) .map(new Func1<String, Integer>() { @Override public Integer call(String s) { return s.hashCode(); } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); } });結(jié)合lambda表達(dá)式就可以寫成為
Observable.from(strs) .map(s -> s.hashCode()) .subscribe(integer -> System.out.println(integer));代碼非常的簡(jiǎn)潔。這段代碼獲取了每個(gè)string的hashcode并進(jìn)行打印
-
flatMap
先記下這里示例代碼中的需要使用對(duì)象
static class Student { List<String> courses; //課程 public Student() { } public void setCourses(List<String> courses) { this.courses = courses; } public List<String> getCourses() { return courses; } }有些時(shí)候,map并能滿足我們的所有需求,比如我使用map,獲取到的對(duì)象是一個(gè)List,這個(gè)時(shí)候,我們?cè)诒闅vlist的時(shí)候就要在回調(diào)方法里面寫for循環(huán),就類似這樣:
Observable.from(students) .map(student -> student.getCourses()).subscribe(strings -> { for (String course : strings) { System.out.println(course); } });如果我需要再把每一個(gè)course放入事件流,那么就又需要自己去create Observer,非常的不簡(jiǎn)潔。于是,我們可以使用flatMap操作符,flatMap操作符可以把事件對(duì)象轉(zhuǎn)換為另一個(gè)對(duì)象的Observable
示例代碼如下:Observable.from(students) .flatMap(new Func1<Student, Observable<String>>() { @Override public Observable<String> call(Student student) { return Observable.from(student.getCourses()); } }) .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } });配合lamdba寫成
Observable.from(students) .flatMap(student -> Observable.from(student.getCourses())) .subscribe(s -> System.out.println(s));即使需要繼續(xù)下去,也可以繼續(xù)使用flatMap操作符,代碼十分簡(jiǎn)潔
-
compose操作符
通過(guò)源碼可以看到,map/flatMap操作符的實(shí)現(xiàn)其實(shí)是通過(guò)left()實(shí)現(xiàn)的,在rxjava中,除了left,還有一個(gè)變換叫ompose(Transformer)
這個(gè)操作符針對(duì)自身進(jìn)行變換,事件序列并不會(huì)改變
例如原先的代碼中需要多個(gè)left()變換:observable1 .lift1() .lift2() .subscribe(subscriber1);
observable2
.lift1()
.lift2()
.subscribe(subscriber2);
observable3
.lift1()
.lift2()
.subscribe(subscriber3);
observable4
.lift1()
.lift2()
.subscribe(subscriber1);
```
這段代碼簡(jiǎn)單封裝后可以寫成:
```
private Observable liftAll(Observable observable) {
return observable
.lift1()
.lift2()
}
liftAll(observable1).subscribe(subscriber1);
liftAll(observable2).subscribe(subscriber2);
liftAll(observable3).subscribe(subscriber3);
liftAll(observable4).subscribe(subscriber4);
```
這段代碼看起來(lái)比之前的代碼舒服很多,但是封裝了observable,感覺(jué)不是很靈活,于是代碼可以改寫成:
```
class AllLeftTransformers implements Observable.Transformer<Integer, String> {
@Override
public Observable<String> call(Observable<Integer> observable) {
return observable.left1().left2();
}
}
AllLeftTransformers liftAll = new AllLiftTransfomers();
//訂閱事件
observable1.compose(leftAll).subscriber(subscribe1);
observable2.compose(leftAll).subscriber(subscribe2);
observable3.compose(leftAll).subscriber(subscribe3);
observable4.compose(leftAll).subscriber(subscribe4);
```
這樣相對(duì)于自己封裝的優(yōu)勢(shì)就在于,不用把Observable封裝,并不會(huì)減少Observable的靈活性
rxjava的線程調(diào)度
rxjava內(nèi)置了線程調(diào)度(scheduler)
示例代碼:
Observable.from(array)
.observeOn(Schedulers.immediate())
.subscribeOn(Schedulers.io())
.subscribe(i -> System.out.println(i + ""));
上面代碼的意思就是觀察在當(dāng)前線程,訂閱事件發(fā)生在io線程
關(guān)于rxjava的Schedulers里面常見(jiàn)的線程有如下幾種
- Schedulers.immediate() 當(dāng)前線程
- Schedulers.io() io線程
- Schedulers.newThread() 新的線程
- Schedulers.computation() 計(jì)算線程,即CPU密集的線程
其中rxAndroid里面還有一個(gè)AndroidSchedulers.mainThread(),表示Android的ui線程
再比如比較經(jīng)典的,在Android中請(qǐng)求網(wǎng)絡(luò)數(shù)據(jù):
getId()
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscriber(user->userView.setId(user.id));
其中
- 觀察者表示的就是ui界面,使用在android主線程進(jìn)行
- 被觀察者應(yīng)該是觀察是否發(fā)生網(wǎng)絡(luò)請(qǐng)求獲取數(shù)據(jù),所以訂閱事件發(fā)生在io線程2