通過上一篇《Rx系列之RxJava操作符》,相信已經(jīng)能夠熟練的使用一些基本的操作符了。但是對于我們大家而言,其實(shí)最傳統(tǒng)的命令式編程已經(jīng)是我們順手就可以拈來的,但是,現(xiàn)在用響應(yīng)式編程,突然發(fā)現(xiàn):臥槽,這個(gè)地方用響應(yīng)式怎么寫,這樣寫對么?估計(jì)很多人才開始接觸RxJava的時(shí)候應(yīng)該都有這樣的疑慮。不用擔(dān)心,這一篇就給大家講講RxJava到底該怎么用,在什么情況下用!
RxJava的使用場景
眼尖的小伙伴,可能已經(jīng)發(fā)現(xiàn),在上一篇中,很多那么重要的操作符怎么都沒講!哈哈哈,答案在這里。好廢話不多說,來看看Rxjava到底在哪些情況下可以使用。
動(dòng)態(tài)搜索的場景
我們先來看一個(gè)動(dòng)態(tài)搜索的場景:

假設(shè),我要在這進(jìn)行網(wǎng)絡(luò)搜索,那么,我就要在這里面進(jìn)行網(wǎng)絡(luò)訪問,如果是輸入完成之后點(diǎn)擊確定進(jìn)行搜索還好,但是如果是動(dòng)態(tài)收索呢?只要搜索框中的搜索內(nèi)容一改變,那么是不是就要進(jìn)行網(wǎng)絡(luò)請求呢?那這樣就不是那么友好了。為了解決這樣的問題,rxjava為我們提供了一個(gè)很好的解決方案:
-
使用debounce作為textSearch
debounce()函數(shù)過濾掉由Observable發(fā)射的速率過快的數(shù)據(jù);如果在一個(gè)指定的時(shí)間間隔過去了仍舊沒有發(fā)射一個(gè),那么它將發(fā)射最后的那個(gè)。
debounce()使用TimeUnit對象指定時(shí)間間隔。
是不是感覺棒棒噠,昂,不管你喜不喜歡,反正我是愛死它了。
來看一下示意圖

由上圖我們可以看出,在比較密集的數(shù)據(jù)(2,3,4,5)發(fā)射之后,其實(shí)最終只是發(fā)射5。
附上代碼:
RxTextView.textChanges(editText)
.debounce(5000,TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<CharSequence>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: onError");
}
@Override
public void onNext(CharSequence charSequence) {
Log.d(TAG, "onNext: "+charSequence.toString());
}
});
在這5s內(nèi),我輸入了2,3,4,5(出最后一個(gè)5,其他輸入之后就刪除哈),但是最后得到的結(jié)果卻是:
onNext: 5
注意:這個(gè)操作符會(huì)會(huì)接著最后一項(xiàng)數(shù)據(jù)發(fā)射原始Observable的onCompleted通知,即使這個(gè)通知發(fā)生在你指定的時(shí)間窗口內(nèi)(從最后一項(xiàng)數(shù)據(jù)的發(fā)射算起)。也就是說,onCompleted通知不會(huì)觸發(fā)限流。
在上面可能會(huì)存在一個(gè)疑問,那就是
RxTextView.textChanges(editText)
這是個(gè)什么東西?你丫怎么沒講,哈哈,這個(gè)呀,要在后面的Rx系列中單獨(dú)來講,所以不要著急,暫時(shí)說一下這個(gè)的功能,這個(gè)RxTextView.textChanges(editText)其實(shí)是RxBinding里面的一個(gè)對控件的操作,其功能就跟TextWatcher一樣,就是對數(shù)據(jù)的變更進(jìn)行監(jiān)聽,所以上面的數(shù)據(jù)變化之后5s后將數(shù)據(jù)發(fā)射出去。嗯嗯,到這里就把動(dòng)態(tài)搜索場景講解了。
緩存檢測場景
在請求取數(shù)據(jù)的處理過程中,我們的操作一般是這樣一個(gè)原理:
- ** 首先檢查內(nèi)存是否有緩存**
- 然后檢查文件緩存中是否有
-
最后才從網(wǎng)絡(luò)中取
任何一步一旦發(fā)現(xiàn)數(shù)據(jù)后面的操作都不執(zhí)行
在rxjava中為我們提供了兩個(gè)解決這個(gè)問題的操作符,分別是: concat和first
concat
不交錯(cuò)的發(fā)射兩個(gè)或多個(gè)Observable
concat操作符連接多個(gè)Observable的輸出,就好像它們是一個(gè)Observable,第一個(gè)Observable發(fā)射的所有數(shù)據(jù)在第二個(gè)Observable發(fā)射的任何數(shù)據(jù)前面,以此類推。直到前面一個(gè)Observable終止,Concat
才會(huì)訂閱額外的一個(gè)Observable
請注意上面所說的“就好像它們是一個(gè)Observable”,其實(shí)并不是一個(gè)Observable,是前面一個(gè)停止之后才會(huì)訂閱下一個(gè),所以說他們并不是一個(gè),請君注意咯。

如上所示,就是將兩個(gè)Observable連接起來了。
還有一個(gè)實(shí)例方法concatWith,它是和concat等價(jià)的:Observable.concat(a,b)==a.concatWith(b)
來看一下是不是這個(gè)樣子的:
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: onError");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
};
Observable a = Observable.just(1, 2, 3, 4, 5);
Observable b = Observable.just(6, 7, 8, 9, 10);
Observable.concat(a, b)
.subscribe(subscriber);
然后我們得到:
onNext: 1
...
onNext: 10
onCompleted: onCompleted
這時(shí)估計(jì)就會(huì)有人說了:你不是說這個(gè)操作符其實(shí)是將兩個(gè)訂閱連接起來了嘛!那么,為什么只是在最后打印了onCompleted,在onNext: 5后面不是也應(yīng)該打印一個(gè)嗎?
我們都知道觀察者和被觀察者之間,是由訂閱建立關(guān)系的,那么對于被觀察者來說,確實(shí)我發(fā)射了兩個(gè)數(shù)據(jù)源,但是對于觀察者來說,我不知道你有幾個(gè)數(shù)據(jù)源,我的職責(zé)就只是,數(shù)據(jù)發(fā)射過來后,我打印而已。所以,只有當(dāng)onNext沒有接收到數(shù)據(jù)時(shí),才會(huì)調(diào)用onCompleted。
最后對這個(gè)操作符,再補(bǔ)充一點(diǎn):如果當(dāng)?shù)谝粋€(gè)Observable a拋異常,那么將不會(huì)繼續(xù)執(zhí)行后面的Observable b了。
如果想測試請將上面的
Observable a = Observable.just(1, 2, 3, 4, 5);
變成
Observable a = Observable.just(1, 2, 3, 4, new RuntimeException());
進(jìn)行測試。
first
只發(fā)射第一項(xiàng)(或者滿足某個(gè)條件的第一項(xiàng))數(shù)據(jù)

由上圖我們可以看出,這個(gè)只要第一項(xiàng)滿足條件,后面的將不會(huì)再進(jìn)行發(fā)射,所以只是得到了1這個(gè)數(shù)字。
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: onError");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
};
Observable a = Observable.just(1, 2, 3, 4, 5);
a.first().subscribe(subscriber);
得到結(jié)果:
onNext: 1
onCompleted: onCompleted
這個(gè)應(yīng)該很容易就看出來了。就是只是打印了第一個(gè)數(shù)據(jù)!
在這兒必須為大家區(qū)別一個(gè)操作符:single(),這個(gè)操作符也是只打印一個(gè)數(shù)據(jù)的,但是single()和first()最大的區(qū)別在于:前者只會(huì)發(fā)射一個(gè)數(shù)據(jù),不能發(fā)射多個(gè),否則會(huì)報(bào)錯(cuò);而first確實(shí)滿足條件的那一個(gè)。
如下:
Observable a = Observable.just(1);
a.single().subscribe(subscriber);
估計(jì)到這兒應(yīng)該已經(jīng)有人知道了上面的3個(gè)步驟改真沒寫了,來我們來看看代碼:
final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (memoryCache != null) {
subscriber.onNext(memoryCache);
} else {
subscriber.onCompleted();
}
}
});
Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
String cachePref = rxPreferences.getString("cache").get();
if (!TextUtils.isEmpty(cachePref)) {
subscriber.onNext(cachePref);
} else {
subscriber.onCompleted();
}
}
});
Observable<String> network = Observable.just("network");
//依次檢查memory、disk、network
Observable
.concat(memory, disk, network)
.first()
.subscribeOn(Schedulers.newThread())
.subscribe(s -> {
memoryCache = "memory";
System.out.println("--------------subscribe: " + s);
});
現(xiàn)在看上面的代碼是不是就知道它在干什么了,是不是很簡單!這個(gè)緩存檢測場景就講到這里。
輸入合法場景
在某些時(shí)候,我們需要所以的輸入都合法后,我們的某些按鈕才亮起來,或者才能點(diǎn)擊,如下圖:

在這個(gè)場景中,我們得掌握兩個(gè)操作符:skip和combineLatest
skip
抑制Observable發(fā)射的前N項(xiàng)數(shù)據(jù)

從上圖可以看到,總共發(fā)射了4個(gè)數(shù)據(jù),只有最后兩個(gè)發(fā)射出去了,這就是skip(2)的作用。
Observable.just(1,2,3,4).skip(2).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: onError");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: "+integer);
}
});
得到如下結(jié)果:
onNext: 3
onNext: 4
onCompleted: onCompleted
combineLatest
當(dāng)多個(gè)Observables中的任何一個(gè)發(fā)射了數(shù)據(jù)時(shí),使用一個(gè)函數(shù)結(jié)合每個(gè)Observable發(fā)射的最近數(shù)據(jù)項(xiàng),并且基于這個(gè)函數(shù)的結(jié)果發(fā)射數(shù)據(jù)。
CombineLatest在原始的Observable中任意一個(gè)發(fā)射了數(shù)據(jù)時(shí)發(fā)射一條數(shù)據(jù)。當(dāng)原始Observables的任何一個(gè)發(fā)射了一條數(shù)據(jù)時(shí),CombineLatest
使用一個(gè)函數(shù)結(jié)合它們最近發(fā)射的數(shù)據(jù),然后發(fā)射這個(gè)函數(shù)的返回值。
一開始看到這句話,我又懵b了,這tm幾個(gè)意思?我們先來看看這個(gè)場景的實(shí)現(xiàn)代碼,然后再解釋:
private void combineLatestEvent() {
Observable<CharSequence> usernameObservable = RxTextView.textChanges(mUsername).skip(1);
Observable<CharSequence> emailObservable = RxTextView.textChanges(mEmail).skip(1);
Observable<CharSequence> passwordObservable = RxTextView.textChanges(mPassword).skip(1);
Subscription subscription = Observable.combineLatest(usernameObservable, emailObservable,
passwordObservable,
new Func3<CharSequence, CharSequence, CharSequence, Boolean>() {
@Override
public Boolean call(CharSequence userName, CharSequence email, CharSequence
password) {
boolean isUserNameValid = !TextUtils.isEmpty(userName) && (userName
.toString().length() > 2 && userName.toString().length() < 9);
if (!isUserNameValid) {
mUsername.setError("用戶名無效");
}
boolean isEmailValid = !TextUtils.isEmpty(email) && Patterns
.EMAIL_ADDRESS.matcher(email).matches();
if (!isEmailValid) {
mEmail.setError("郵箱無效");
}
boolean isPasswordValid = !TextUtils.isEmpty(password) && (password
.toString().length() >5 && password.toString().length() < 11);
if (!isPasswordValid) {
mPassword.setError("密碼無效");
}
return isUserNameValid && isEmailValid && isPasswordValid;
}
})
.subscribe(getObserver());
}
private Observer<Boolean> getObserver() {
return new Observer<Boolean>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Boolean aBoolean) {
//更改注冊按鈕是否可用的狀態(tài)
register.setEnabled(aBoolean);
}
};
}
這個(gè)場景,有3個(gè)edittext,分別是mUsername,mEmail,mPassword,通過輸入合法的內(nèi)容進(jìn)行判定注冊按鈕是否亮起來。
當(dāng)我點(diǎn)擊其中的任何一個(gè)進(jìn)行編寫的時(shí)候,就會(huì)發(fā)射數(shù)據(jù),發(fā)射的是什么?是我們編輯的內(nèi)容嗎?其實(shí)不是的,發(fā)射的是結(jié)合Func3這個(gè)方法的返回值,在這里這個(gè)返回值是Boolean型的。返回了boolean型之后,就可以在觀察者里面設(shè)置注冊按鈕是否亮起來。現(xiàn)在再看上面那句高深莫測的話,是不是簡單多了!
這兒可能有人有疑問了:這3個(gè)edittext為什么要使用skip(1)呢?
答案其實(shí)很簡答啊,那就是當(dāng)我們不寫skip(1)的時(shí)候,edittext中沒有輸入任何值的時(shí)候,會(huì)把它當(dāng)作第一個(gè)數(shù)據(jù)進(jìn)行發(fā)射,雖然發(fā)射的是個(gè)空數(shù)據(jù),但是還是會(huì)發(fā)射啊!
奧偶,這個(gè)場景解釋完了!
數(shù)據(jù)過期場景
其實(shí)這個(gè)場景可以和上面的數(shù)據(jù)緩存檢測場景進(jìn)行合并:在緩存檢測場景中,我們知道,如果memory中沒有數(shù)據(jù),就從disk上面尋找,然后再是網(wǎng)絡(luò)請求,那么,問題來了,如果我們的memory中一直有數(shù)據(jù),但是網(wǎng)絡(luò)數(shù)據(jù)已經(jīng)變更了,又由于緩存檢測原則的只要有一個(gè)有數(shù)據(jù)就不會(huì)進(jìn)行網(wǎng)絡(luò)請求了,這就會(huì)造成我們顯示的數(shù)據(jù)一直是一個(gè)舊數(shù)據(jù)。

那這個(gè)該怎么辦呢?
解決方法有如下兩個(gè):
- 采用定時(shí)進(jìn)行清除本地緩存數(shù)據(jù)
- 采用過濾操作符
我們先來看看第一種,如果是進(jìn)行定時(shí)做本地?cái)?shù)據(jù)清空的話,那么就會(huì)用到,我們一個(gè)輪詢的操作符Interval
創(chuàng)建一個(gè)按固定時(shí)間間隔發(fā)射整數(shù)序列的Observable
Interval通俗的講,就是每隔一段時(shí)間過后做什么事情!上一篇已經(jīng)講過了,所以這里就不詳細(xì)講解了,直接上代碼:
Observable.interval(3, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
...
@Override
public void onNext(Long aLong) {
//清除緩存操作
}
});
很多人可能會(huì)想到,那既然,我能夠用清除本地緩存的方法,那么能不能用,每隔一段時(shí)間進(jìn)行請求,讓請求的結(jié)果與本地緩存進(jìn)行合并呢?
答案是肯定的,來看如下代碼:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(final Subscriber<? super String> observer) {
Schedulers.newThread().createWorker()
.schedulePeriodically(new Action0() {
@Override
public void call() {
observer.onNext(doNetworkCallAndGetStringResult());
}
}, INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS);
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
}
})
這個(gè)就是使用schedulePeriodically做輪詢請求
這樣造成每過一定時(shí)間我們就會(huì),清除緩存或者網(wǎng)絡(luò)請求。讀到這兒,是不是感覺這個(gè)方法真爛,哈哈哈,不著急,我們不是還有第二種方法嘛!來接著看第二種方法
-
采用過濾操作符
其實(shí)這個(gè)操作符我們已經(jīng)講過了,那就是操作符first,回顧一下上面的代碼,就是我們的first就是保證,眾多的數(shù)據(jù),有一個(gè)符合條件就發(fā)射數(shù)據(jù),后面的都將不執(zhí)行。我們的是否需要更新的條件不加在這里,就沒天理咯!
Observable source = Observable
.concat(memory, disk, network)
.first(new Func1() {
@Override public Boolean call(Data data) {
return data.isUpToDate();
}
});
哇偶,這個(gè)操作符完美的解決了如上的問題!那你丫的還將那么多,呵呵,我只是給大家講解操作符的使用場景而已,那個(gè)適合哪個(gè)場景,取決你們自己咯!
這一篇主要講解的內(nèi)容的就到這兒了,下面還有一些其他的場景,就簡單的介紹一下。
其他的場景
合并兩個(gè)數(shù)據(jù)源場景
使用merge合并兩個(gè)數(shù)據(jù)源,代碼如下:
Observable.merge(getInfoFromFile(), getInfoFromNet())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: onError");
}
@Override
public void onNext(String data) {
Log.d(TAG, "onNext: only one ! ");
});
Retrofit結(jié)合RxJava場景
這個(gè)場景的話,大家可以查看扔物線大神寫的給 Android 開發(fā)者的 RxJava 詳解,其中講解到了這個(gè)場景的結(jié)合!
就操作符使用場景這一塊而言,大概就講解這么多,如果大家有其他的使用場景,我們可以一起交流哦。感謝大家的支持,謝謝!