RecctiveX與RxJava
?在講到RxJava之前我們首先要了解什么是ReactiveX,因?yàn)镽xJava是ReactiveX的一種java實(shí)現(xiàn)。ReactiveX是Reactive Extensions的縮寫,一般簡(jiǎn)寫為Rx。微軟給的定義是,Rx是一個(gè)函數(shù)庫(kù),讓開發(fā)者可以利用可觀察序列和LINQ風(fēng)格查詢操作符來編寫異步和基于事件的程序。開發(fā)者可以用Observables表示異步數(shù)據(jù)流,用LINQ操作符查詢異部數(shù)據(jù)流,用Schedulers參數(shù)化異步數(shù)據(jù)流的并發(fā)處理,Rx可以這樣定義:Rx=Observables+LINQ+Schedulers。
?說到異步操作,我們會(huì)想到Android的AsyncTask和Handler。但是隨著請(qǐng)求數(shù)量越來越多,代碼邏輯將會(huì)變得將越來越復(fù)雜而RxJava卻依舊能保持清晰的邏輯。RxJava的原理就是創(chuàng)建一個(gè)Observable對(duì)象來干活,然后使用各種操作符建立起來的鏈?zhǔn)讲僮?,就如同流水線一樣,把你想要處理的數(shù)據(jù)一步步的加工成你想要的成品,然后發(fā)射給Subscriber處理。
?RxJava的異步操作是通過擴(kuò)展觀察者模式來實(shí)現(xiàn)的,RxJava有4個(gè)角色Observable、Observer、Subscriber和Suject,這4個(gè)角色后面會(huì)具體講解。Observable和Observer通過subscribe方法實(shí)現(xiàn)訂閱關(guān)系,Observable就可以在需要的時(shí)候通知Observer,官方文檔。
RxJava基本實(shí)現(xiàn)
?在使用RxJava前請(qǐng)先在Android Studio中配置gradle:
dependencies{
//最新版本
//implementation "io.reactivex.rxjava3:rxjava:3.x.y"
implementation 'io.reactivex.rxjava:1.2.0'
implementation 'io.reactivex.rxandroid:1.2.1'
}
?其中RxAndroid是RxJava在Android平臺(tái)的擴(kuò)展。它包含了一些能夠簡(jiǎn)化Android開發(fā)的工具,比如特殊的調(diào)度器。
?RxJava的基本用法為3個(gè)步驟。
?1.創(chuàng)建Observer(觀察者)
?它決定實(shí)踐觸發(fā)的時(shí)候?qū)⒂性鯓拥男袨?。代碼如下:
Subscriber subscriber = new Subscriber<String>(){
@Overrride
public void onCompleted(){
Log.d(TAG,"onCompleted");
}
@Overrride
public void onError(Throwable e){
Log.d(TAG,"onError");
}
@Overrride
public void onNext(String s){
Log.d(TAG,"onNext"+s);
}
@Overrride
public void onStart(){
Log.d(TAG,"onNext"+s);
}
}
?其中onCompleted、onError和onNext是必須要實(shí)現(xiàn)的方法,其含義如下。
- onCompleted:時(shí)間隊(duì)列完結(jié)。RxJava不僅把每個(gè)事件單獨(dú)處理,其還會(huì)把它們看作一個(gè)隊(duì)列。當(dāng)不會(huì)再有新的onNext發(fā)出時(shí),需要觸發(fā)onCompleted()方法作為完成標(biāo)志。
- onError:事件隊(duì)列異常。在事件處理過程中出現(xiàn)異常時(shí),onError()會(huì)被觸發(fā),同時(shí)隊(duì)列自動(dòng)終止,不允許再有事件發(fā)出。
- onNext:普通的事件。將要處理的事件添加到事件隊(duì)列中。
- onStart:它會(huì)在事件還未發(fā)送之前被調(diào)用,它可以用于做一些準(zhǔn)備工作。例如數(shù)據(jù)的清零或重置。這是一個(gè)可選的方法,默認(rèn)情況下它的實(shí)現(xiàn)為空。
當(dāng)然,如果實(shí)現(xiàn)簡(jiǎn)單的功能,也可以用到Observer來創(chuàng)建觀察者。Observer是一個(gè)接口,而上面用到的Subscriber是在Observer的基礎(chǔ)上進(jìn)行的擴(kuò)展。在后文的Subscriber訂閱過程中Observer也會(huì)先被轉(zhuǎn)換為Subscriber來使用。用Observer創(chuàng)建觀察者,如下所示:
Observer<String> observer = new Observer<String> (){
@Overrride
public void onCompleted(){
Log.d(TAG,"onCompleted");
}
@Overrride
public void onError(Throwable e){
Log.d(TAG,"onError");
}
@Overrride
public void onNext(String s){
Log.d(TAG,"onNext"+s);
}
}
?2.創(chuàng)建Observable(被觀察者)
?它決定什么時(shí)候出發(fā)事以及怎樣的事件。RxJava使用create方法來創(chuàng)建一個(gè)Observable,并為它定義事件觸發(fā)規(guī)則,如下所示:
Observable observable = Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(Subsccribe<? super String> subscribe){
subscribe.onNext("楊影楓")
subscribe.onNext("月眉兒")
subscribe.onCompleted();
}
})
?通過調(diào)用Subscriber的方法,不斷地事件添加到任務(wù)隊(duì)列中。也可用just方法來實(shí)現(xiàn):
Observable observable = Observable.just("楊影楓","月眉兒");
?上述代碼會(huì)依次調(diào)用onNext("楊影楓")、onNext("月眉兒")、onCompleted()。還可以用from方法來實(shí)現(xiàn),如下所示:
String[] words = {"楊影楓","月眉兒"};
Observable observabvle = Observable.form(words);
?上述代碼調(diào)用的方法順序和just方法是一樣的。
?3.Subscribe(訂閱)
?訂閱只需要一行代碼就可以了,如下所示:
observable.subscribe(subscriber);
?運(yùn)行代碼查看Log:
?D/RxJava:onStart
?D/RxJava:onNext楊影楓
?D/RxJava:onNext月眉兒
?D/RxJava:onCompleted
?和預(yù)想的一樣調(diào)用onStart方法,接著調(diào)用兩個(gè)onNext方法,調(diào)用onCompleted方法。
RxJava的不完整定義回調(diào)
?前面介紹了回調(diào)的接收主要依賴于subscribe(Observer)和subscribe(Subscriber)。此外。RxJava還提供了另一種回調(diào)方式,也就是不完整回調(diào)。在講到不完整回調(diào)之前我們首先要了解Action。查看RxJava源碼,發(fā)現(xiàn)一堆Action。
public interface Action0 extends Action{
void call();
}
?再看Action1:
public interface Action1<T> extends Action{
void call(T t);
}
?再看Action9:
public interface Action9<T1,T2,T3,T4,T5,T6,T7,T8,T9> extends Action{
void call(T1 t1,T2 t2,T3 t3,T4 t4,T5 t5,T6 t6,T7 t7,T8 t8,T9 t9);
}
?很明顯,Aciton后的數(shù)字代表回調(diào)的參數(shù)類型數(shù)量,之前創(chuàng)建Observer和訂閱代碼也就可以改寫為下面的代碼:
Action1<String> onNextAction = new Action1<String>(){
@Override
public void call(String s){
Log.d(TAG,"onNext"+s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>(){
@Override
public void call(Throwable throwable){
}
};
Action0 onCompletedAction = new Action0(){
@Override
public void call(){
Log.d(TAG,"onCompleted");
}
};
observable.subscride(onNextAction,onErrorAction,onCompletedAction);
?
我們定義了onNextAction來處理 onNext的回調(diào),同理,我們還定義了onErrorAction和onCompletedAction來分別處理onError和onCompleted的回調(diào),最后我們把它們傳給subscribe方法。很顯然這樣寫的靈活更大一些;同時(shí)我們也可以只傳一個(gè)或者兩個(gè)Action,如下所示:
observable.subscride(onNextAction);
observable.subscribe(onNextAction,onErrorAction);
?第一行之定義了onNextAction來處理onNext的回調(diào);而第二行則定義了onNextAction處onNext的回調(diào),onErrorAction來處理onError的回調(diào)
作者:張三兒 郵箱:921849651@qq.com
注:歡迎大佬指點(diǎn)江山,希望大家多多支持作者。持續(xù)技術(shù)分享!