RxJava的基本流程以及線程切換可以參考之前的文章
http://www.itdecent.cn/p/2adaea7237c4
1、序言
RxJava除了擁有邏輯簡潔的事件流鏈?zhǔn)秸{(diào)用,使用簡單外其豐富的操作符基本可以滿足日常開發(fā)中的各種實(shí)現(xiàn)邏輯
Rx的基本操作符分類

下面會逐一講解每一類操作符的使用
2、創(chuàng)建操作符

2.1、基本創(chuàng)建操作符
create作為RxJava最基本的創(chuàng)建操作,用來完整的創(chuàng)建一個(gè)被觀察者Observable對象
通過create創(chuàng)建一個(gè)被觀察Observable對象
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
//重寫subscribe 寫入實(shí)際的代碼邏輯
if (!e.isDisposed()) {
e.onNext("RxText");
}
}
});
創(chuàng)建觀察者Observer對象
Observer observer = new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
LogUtils.showLog("message == " + (String) o);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
通過subscribe進(jìn)行關(guān)聯(lián)
observable.subscribe(observer);
在實(shí)際使用時(shí)一般鏈?zhǔn)秸{(diào)用
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
if (!e.isDisposed()) {
e.onNext("test");
}
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
LogUtils.showLog("s == " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
2.2、快速創(chuàng)建操作符
使用場景,快速的創(chuàng)建被觀察者并進(jìn)行數(shù)據(jù)發(fā)送
1、just()
作用:快速創(chuàng)建1個(gè)被觀察者對象Observable
注意:just只能傳入最多10個(gè)數(shù)據(jù)
Observable.just(1, 2, 3, 4, 5)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
LogUtils.showLog("integer == " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
輸出:
D/hzfTag1204: integer == 1
integer == 2
integer == 3
integer == 4
integer == 5
2、fromArray()
作用:快速創(chuàng)建一個(gè)被觀察者,直接發(fā)送傳入的數(shù)組數(shù)據(jù),當(dāng)發(fā)送數(shù)據(jù)大于10時(shí)可以考慮采用fromArray
int[] arrays = {1, 2, 3, 4, 5};
Observable.fromArray(arrays)
.subscribe(new Observer<int[]>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(int[] ints) {
//遍歷數(shù)組并輸出
for (int num : ints) {
LogUtils.showLog("num == " + num);
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
3、 fromIterable
作用:快速創(chuàng)建一個(gè)Observable并將集合當(dāng)中的數(shù)據(jù)發(fā)送
fromIterable的使用與fromArray一致,數(shù)據(jù)由fromArray的數(shù)組改為集合,不做具體的贅述了
BUT!!! 我在做測試的時(shí)候發(fā)現(xiàn)了一個(gè)好玩兒的事情.....
fromArray中也可以傳一個(gè)list;但是fromIterable不能傳數(shù)組
fromIterable不能傳數(shù)組根據(jù)代碼很明顯,其對參數(shù)做了限制

但fromArray沒有做限制,當(dāng)我用以下代碼操作時(shí)可以正確拿到list中的數(shù)據(jù)
List list = new ArrayList();
list.add("1");
list.add("2");
list.add("3");
Observable.fromArray(list)
.subscribe(new Observer<List>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List ints) {
LogUtils.showLog("ints == "+ints.get(2));
}
@Override
public void onError(Throwable e) {
LogUtils.showLog("exception == "+e.getMessage().toString());
}
@Override
public void onComplete() {
}
});

通過斷點(diǎn)分析源碼fromArray當(dāng)中的邏輯,當(dāng)fromArray傳入一個(gè)數(shù)組,會走到item.length == 1這個(gè)判斷當(dāng)中,并且最終走的是just操作符

所以通過fromArray傳入集合就相當(dāng)于是just(list);但是為什么會走到items.length == 1這個(gè)判斷當(dāng)中?
這里面涉及到Java可變參數(shù)的概念,fromArray后面參數(shù)是T...意思是參數(shù)不確定,可以傳多個(gè)參數(shù),傳入幾個(gè)參數(shù)這個(gè)items的length就是多少,所以fromArray不論傳的是list還是array,只要傳的是一個(gè)參數(shù),最終都相當(dāng)于通過just將數(shù)據(jù)發(fā)送出去了(相當(dāng)于把對象通過just發(fā)出去);當(dāng)fromArray中的參數(shù)大于1時(shí),會將參數(shù)封裝成為一個(gè)T[]數(shù)組,再將數(shù)組中的每一個(gè)元素逐一發(fā)送。

4、never
作用:只會調(diào)用onSubscribe方法,不會回調(diào)onNext onError onComplete等回調(diào)方法;通過源碼可以看出,內(nèi)部的subscribeActual方法只是調(diào)用了onSubscribe,并沒有執(zhí)行其他的回調(diào)方法

5、empty
作用:當(dāng)訂閱后,被觀察者只會發(fā)送一個(gè)onComplete事件,最終Observer的回調(diào)只有onSubscribe和onComplete會執(zhí)行
6、error
作用:訂閱后僅發(fā)送Error事件,error的參數(shù)可以自定義異常發(fā)送給onError
2.3、延遲創(chuàng)建操作符
延遲創(chuàng)建操作符的需求場景:
(1)當(dāng)經(jīng)過n秒后,執(zhí)行操作x
(2)每經(jīng)過n秒,周期的執(zhí)行操作x
延遲創(chuàng)建操作符的分類:

1、timer
作用:快速創(chuàng)建一個(gè)Observable,并指定一段時(shí)間后發(fā)送onNext(0)事件;onNext的參數(shù)為long類型,默認(rèn)數(shù)值為0
final long startTime = System.currentTimeMillis();
Observable.timer(5,TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
LogUtils.showLog((System.currentTimeMillis() - startTime) + " ms后接收到了數(shù)據(jù) " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
12-05 10:07:11.351 12187-12207/com.hzf.test.myapplication D/hzfTag1205: 5005 ms后接收到了數(shù)據(jù) 0
2、defer
作用:等到實(shí)際subscribe訂閱時(shí)才會創(chuàng)建一個(gè)Observable;可以保證Observable的數(shù)據(jù)在訂閱時(shí)是最新的數(shù)據(jù)
Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() throws Exception {
return Observable.just(test1);
}
});
test1 = "test222";
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
LogUtils.showLog("s == "+s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
12-05 10:22:00.299 12308-12308/com.hzf.test.myapplication D/hzfTag1205: s == test222
通過defer的源碼可以出來被觀察者的創(chuàng)建是在subscribeActual訂閱時(shí)
(ObservableDefer)

3、interval
作用:快速創(chuàng)建一個(gè)被觀察者Observable對象,每隔指定時(shí)間發(fā)送一個(gè)事件
interval的參數(shù)最多可用的為4個(gè)參數(shù)
參數(shù)1:初始延遲發(fā)送事件的時(shí)間
參數(shù)2:間隔發(fā)送事件的時(shí)間
參數(shù)3:TimeUnit指定的時(shí)間類型
參數(shù)4:Scheduler,可以手動(dòng)創(chuàng)建一個(gè)worker指定interval的運(yùn)行線程(如果不手動(dòng)選擇第四個(gè)參數(shù),默認(rèn)interval發(fā)生在子線程)
Observable.interval(3, 5, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
LogUtils.showLog("aLong == "+aLong+",當(dāng)前線程為 "+Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
運(yùn)行結(jié)果為:
12-05 11:17:32.539 13352-13372/com.hzf.test.myapplication D/hzfTag1205: aLong == 0,當(dāng)前線程為 RxComputationThreadPool-1
12-05 11:17:37.539 13352-13372/com.hzf.test.myapplication D/hzfTag1205: aLong == 1,當(dāng)前線程為 RxComputationThreadPool-1
12-05 11:17:42.539 13352-13372/com.hzf.test.myapplication D/hzfTag1205: aLong == 2,當(dāng)前線程為 RxComputationThreadPool-1
12-05 11:17:47.539 13352-13372/com.hzf.test.myapplication D/hzfTag1205: aLong == 3,當(dāng)前線程為 RxComputationThreadPool-1
也可以指定調(diào)度器,例如:
Observable.interval(3, 5, TimeUnit.SECONDS, new Scheduler() {
@Override
public Worker createWorker() {
return AndroidSchedulers.mainThread().createWorker();
}
})
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
LogUtils.showLog("aLong == "+aLong+",當(dāng)前線程為 "+Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
12-05 11:20:19.386 13450-13450/com.hzf.test.myapplication D/hzfTag1205: aLong == 0,當(dāng)前線程為 main
12-05 11:20:24.388 13450-13450/com.hzf.test.myapplication D/hzfTag1205: aLong == 1,當(dāng)前線程為 main
12-05 11:20:29.388 13450-13450/com.hzf.test.myapplication D/hzfTag1205: aLong == 2,當(dāng)前線程為 main
12-05 11:20:34.388 13450-13450/com.hzf.test.myapplication D/hzfTag1205: aLong == 3,當(dāng)前線程為 main
如果運(yùn)用線程操作符的話,經(jīng)過我的實(shí)驗(yàn),當(dāng)調(diào)用subscribeOn時(shí)是不起作用的,實(shí)際發(fā)生事件的線程依然是子線程或者指定的調(diào)度器;而調(diào)用observeOn時(shí),onNext接收事件的線程即為observeOn所指定的線程。
2、intervalRange
作用:快讀創(chuàng)建一個(gè)被觀察者對象,每隔指定時(shí)間發(fā)送數(shù)據(jù),但是與interval不同的是intervalRange可以指定發(fā)送的數(shù)據(jù)數(shù)量
參數(shù)1:起始的事件序號
參數(shù)2:事件數(shù)量
參數(shù)3:第1次事件延遲發(fā)送的時(shí)間
參數(shù)4:事件間的間隔時(shí)間
參數(shù)5:時(shí)間單位
參數(shù)6:Scheduler
Observable.intervalRange(3, 3, 3, 2, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
LogUtils.showLog("aLong == " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
運(yùn)行結(jié)果:
12-05 11:30:08.864 13737-13757/com.hzf.test.myapplication D/hzfTag1205: aLong == 3
12-05 11:30:10.864 13737-13757/com.hzf.test.myapplication D/hzfTag1205: aLong == 4
12-05 11:30:12.864 13737-13757/com.hzf.test.myapplication D/hzfTag1205: aLong == 5
2、range
作用:快速創(chuàng)建一個(gè)被觀察者,并連續(xù)發(fā)送一個(gè)事件序列,可指定范圍。功能與intervalRange類似,但實(shí)現(xiàn)的功能會相對簡單一些。
參數(shù)1:起始數(shù)據(jù)
參數(shù)2:發(fā)送多少條數(shù)據(jù)
//從5開始發(fā)送 發(fā)送10個(gè)數(shù)據(jù)
Observable.range(5, 10)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
LogUtils.showLog("integer == " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
2、rangeLong
作用:與range類似,但支持long類型