一、創(chuàng)建操作符
1、create
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
? ? @Override
? ? public void subscribe(ObservableEmitter<String> e) throws Exception {
? ? ? ? e.onNext("Hello Observer");
? ? ? ? e.onComplete();
? ? }
});
說明:創(chuàng)建一個被觀察者Observable
2、just
Observable.just(1, 2, 3)
.subscribe(new Observer < Integer > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "=================onSubscribe");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "=================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "=================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "=================onComplete ");
? ? }
});
打?。?/p>
=================onSubscribe
=================onNext 1
=================onNext 2
=================onNext 3
=================onComplete
說明:創(chuàng)建一個被觀察者,并發(fā)送事件,發(fā)送的事件不可以超過10個以上。
3、fromArray
Integer[] array = new Integer[]{1, 2, 3, 4};
? ? ? ? Observable.fromArray(array)
? ? ? ? ? ? ? ? .subscribe(new Observer < Integer > () {
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onSubscribe(Disposable d) {
? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "=================onSubscribe");
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onNext(Integer integer) {
? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "=================onNext " + integer);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onError(Throwable e) {
? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "=================onError ");
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onComplete() {
? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "=================onComplete ");
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? });
打?。?/p>
=================onSubscribe
=================onNext 1
=================onNext 2
=================onNext 3
=================onNext 4
=================onComplete
說明:這個方法和 just() 類似,只不過 fromArray 可以傳入多于10個的變量,并且可以傳入一個數(shù)組。
4、fromCallable
Observable.fromCallable(new Callable < Integer > () {
? ? @Override
? ? public Integer call() throws Exception {
? ? ? ? return 1;
? ? }
})
.subscribe(new Consumer < Integer > () {
? ? @Override
? ? public void accept(Integer integer) throws Exception {
? ? ? ? Log.d(TAG, "================accept " + integer);
? ? }
});
================accept 1
說明:這里的 Callable 是 java.util.concurrent 中的 Callable,Callable 和 Runnable 的用法基本一致,只是它會返回一個結果值,這個結果值就是發(fā)給觀察者的。
5、fromFuture
FutureTask < String > futureTask = new FutureTask < > (new Callable < String > () {
? ? @Override
? ? public String call() throws Exception {
? ? ? ? Log.d(TAG, "CallableDemo is Running");
? ? ? ? return "返回結果";
? ? }
});
Observable.fromFuture(futureTask)
? ? .doOnSubscribe(new Consumer < Disposable > () {
? ? @Override
? ? public void accept(Disposable disposable) throws Exception {
? ? ? ? futureTask.run();
? ? }
})
.subscribe(new Consumer < String > () {
? ? @Override
? ? public void accept(String s) throws Exception {
? ? ? ? Log.d(TAG, "================accept " + s);
? ? }
});
打?。?/p>
CallableDemo is Running
================accept 返回結果
說明:參數(shù)中的 Future 是 java.util.concurrent 中的 Future,F(xiàn)uture 的作用是增加了 cancel() 等方法操作 Callable,它可以通過 get() 方法來獲取 Callable 返回的值。
doOnSubscribe() 的作用就是只有訂閱時才會發(fā)送事件。
6、fromIterable
List<Integer> list = new ArrayList<>();
list.add(0);
list.add(1);
list.add(2);
list.add(3);
Observable.fromIterable(list)
.subscribe(new Observer < Integer > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "=================onSubscribe");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "=================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "=================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "=================onComplete ");
? ? }
});
打?。?/p>
=================onSubscribe
=================onNext 0
=================onNext 1
=================onNext 2
=================onNext 3
=================onComplete
說明:直接發(fā)送一個 List 集合數(shù)據(jù)給觀察者
8、defer
// i 要定義為成員變量
Integer i = 100;
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
? ? @Override
? ? public ObservableSource<? extends Integer> call() throws Exception {
? ? ? ? return Observable.just(i);
? ? }
});
i = 200;
Observer<Integer> observer = new Observer<Integer>() {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? }
? ? @Override
? ? public void onComplete() {
? ? }
};
observable.subscribe(observer);
i = 300;
observable.subscribe(observer);
打?。?/p>
================onNext 200
================onNext 300
說明:因為 defer() 只有觀察者訂閱的時候才會創(chuàng)建新的被觀察者,所以每訂閱一次就會打印一次,并且都是打印 i 最新的值。
9、timer
Observable.timer(5, TimeUnit.SECONDS)
? ? ? ? ? ? ? ? .subscribe(new Observer<Long>() {
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onSubscribe(Disposable d) {
? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "===============onSubscribe " + d);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onNext(Long aLong) {
? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "===============onNext " + aLong);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onError(Throwable e) {
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onComplete() {
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? });
打?。?/p>
2019-08-19 13:30:09.819 26417-26417/com.xbox D/SDK: ===============onSubscribe null
2019-08-19 13:30:14.820 26417-26446/com.xbox D/SDK: ===============onNext 0
說明:timer的第一個參數(shù)是延時5秒,第二個參數(shù)是單位。當執(zhí)行完onSubscribe方法后延時5秒執(zhí)行onNext方法,返回的參數(shù)為0
10、interval
Observable.interval(2, TimeUnit.SECONDS)
? ? ? ? ? ? ? ? .subscribe(new Observer < Long > () {
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onSubscribe(Disposable d) {
? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "==============onSubscribe ");
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onNext(Long aLong) {
? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "==============onNext " + aLong);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onError(Throwable e) {
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onComplete() {
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? });
打印:
==============onSubscribe
==============onNext 0
==============onNext 1
==============onNext 2
==============onNext 3
.............
.............
說明:第一個參數(shù)為每隔多少秒執(zhí)行onNext方法(從0開始),當執(zhí)行完onSubscribe后,每隔2秒執(zhí)行onNext方法
11、intervalRange
Observable.intervalRange(4, 5, 8, 3, TimeUnit.SECONDS)
? ? ? ? ? ? ? ? .subscribe(new Observer < Long > () {
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onSubscribe(Disposable d) {
? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "==============onSubscribe ");
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onNext(Long aLong) {
? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "==============onNext " + aLong);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onError(Throwable e) {
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onComplete() {
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? });
打?。?/p>
==============onSubscribe
==============onNext 4
==============onNext 5
==============onNext 6
==============onNext 7
==============onNext 8
說明:
第一個參數(shù):從4開始計數(shù)
第二個參數(shù):一共執(zhí)行onNext5次(五個值)
第三個參數(shù):當執(zhí)行完onSubscribe方法之后,隔8秒開始執(zhí)行第一個onNext方法
第四個參數(shù):執(zhí)行完第一個onNext之后,每隔3秒執(zhí)行下一個onNext方法
依次+1遞增
12、range
Observable.range(2, 5)
? ? ? ? ? ? ? ? .subscribe(new Observer < Integer > () {
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onSubscribe(Disposable d) {
? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "==============onSubscribe ");
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onNext(Integer aLong) {
? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "==============onNext " + aLong);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onError(Throwable e) {
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onComplete() {
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? });
打?。?/p>
==============onSubscribe
==============onNext 2
==============onNext 3
==============onNext 4
==============onNext 5
==============onNext 6
說明:從2開始執(zhí)行5次onNext方法,+1遞增
13、rangeLong
說明:與 range() 一樣,只是數(shù)據(jù)類型為 Long 這里就不上代碼了
14、 empty() & never() & error()
Observable.empty()
.subscribe(new Observer < Object > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "==================onSubscribe");
? ? }
? ? @Override
? ? public void onNext(Object o) {
? ? ? ? Log.d(TAG, "==================onNext");
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "==================onError " + e);
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "==================onComplete");
? ? }
});
說明:
empty():直接發(fā)送 onComplete() 事件
never():不發(fā)送任何事件
error():發(fā)送 onError() 事件
二、轉換操作符
1、map
Observable.just(1, 2, 3)
.map(new Function < Integer, String > () {
? ? @Override
? ? public String apply(Integer integer) throws Exception {
? ? ? ? return "I'm " + integer;
? ? }
})
.subscribe(new Observer < String > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.e(TAG, "===================onSubscribe");
? ? }
? ? @Override
? ? public void onNext(String s) {
? ? ? ? Log.e(TAG, "===================onNext " + s);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? }
? ? @Override
? ? public void onComplete() {
? ? }
});
打?。?/p>
===================onSubscribe
===================onNext I'm 1
===================onNext I'm 2
===================onNext I'm 3
說明:map 可以將被觀察者發(fā)送的數(shù)據(jù)類型轉變成其他的類型,上述代碼的作用是將 Integer 類型的數(shù)據(jù)轉換成 String。
2、flatMap
List<Person> personList = new ArrayList<>();
? ? ? ? List<Plan> plansList = new ArrayList<>();
? ? ? ? List<String> actionList = new ArrayList<>();
? ? ? ? actionList.add("玩游戲");
? ? ? ? actionList.add("寫作業(yè)");
? ? ? ? actionList.add("看書");
? ? ? ? Plan plan = new Plan("1", "小明的計劃");
? ? ? ? plan.setActionList(actionList);
? ? ? ? plansList.add(plan);
? ? ? ? Person person = new Person("小明", plansList);
? ? ? ? personList.add(person);
? ? ? ? List<String> actionList2 = new ArrayList<>();
? ? ? ? List<Plan> plansList2 = new ArrayList<>();
? ? ? ? actionList2.add("開電腦");
? ? ? ? actionList2.add("打王者");
? ? ? ? actionList2.add("吃雞");
? ? ? ? Plan plan2 = new Plan("2", "小紅的計劃");
? ? ? ? plan2.setActionList(actionList2);
? ? ? ? plansList2.add(plan2);
? ? ? ? Person person2 = new Person("小紅", plansList2);
? ? ? ? personList.add(person2);
? ? ? ? Observable.fromIterable(personList)
? ? ? ? ? ? ? ? .flatMap(new Function<Person, ObservableSource<Plan>>() {
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public ObservableSource<Plan> apply(Person person) {
? ? ? ? ? ? ? ? ? ? ? ? return Observable.fromIterable(person.getPlanList());
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? })
? ? ? ? ? ? ? ? .flatMap(new Function<Plan, ObservableSource<String>>() {
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public ObservableSource<String> apply(Plan plan) throws Exception {
? ? ? ? ? ? ? ? ? ? ? ? return Observable.fromIterable(plan.getActionList());
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? })
? ? ? ? ? ? ? ? .subscribe(new Observer<String>() {
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onSubscribe(Disposable d) {
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onNext(String s) {
? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "==================action: " + s);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onError(Throwable e) {
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onComplete() {
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? });
public class Person {
? ? ? ? private String name;
? ? ? ? private List<Plan> planList;
? ? ? ? public Person(String name, List<Plan> planList) {
? ? ? ? ? ? this.name = name;
? ? ? ? ? ? this.planList = planList;
? ? ? ? }
? ? ? ? public String getName() {
? ? ? ? ? ? return name;
? ? ? ? }
? ? ? ? public void setName(String name) {
? ? ? ? ? ? this.name = name;
? ? ? ? }
? ? ? ? public List<Plan> getPlanList() {
? ? ? ? ? ? return planList;
? ? ? ? }
? ? ? ? public void setPlanList(List<Plan> planList) {
? ? ? ? ? ? this.planList = planList;
? ? ? ? }
? ? }
? ? public class Plan {
? ? ? ? private String time;
? ? ? ? private String content;
? ? ? ? private List<String> actionList = new ArrayList<>();
? ? ? ? public Plan(String time, String content) {
? ? ? ? ? ? this.time = time;
? ? ? ? ? ? this.content = content;
? ? ? ? }
? ? ? ? public String getTime() {
? ? ? ? ? ? return time;
? ? ? ? }
? ? ? ? public void setTime(String time) {
? ? ? ? ? ? this.time = time;
? ? ? ? }
? ? ? ? public String getContent() {
? ? ? ? ? ? return content;
? ? ? ? }
? ? ? ? public void setContent(String content) {
? ? ? ? ? ? this.content = content;
? ? ? ? }
? ? ? ? public List<String> getActionList() {
? ? ? ? ? ? return actionList;
? ? ? ? }
? ? ? ? public void setActionList(List<String> actionList) {
? ? ? ? ? ? this.actionList = actionList;
? ? ? ? }
? ? }
打?。?/p>
==================action: 玩游戲
==================action: 寫作業(yè)
==================action: 看書
==================action: 開電腦
==================action: 打王者
==================action: 吃雞
說明:通過flatMap打印出所有的action
3、concatMap
說明:與flatMap用法基本一樣,只不過 concatMap() 轉發(fā)出來的事件是有序的,而 flatMap() 是無序的
4、buffer
Observable.just(1, 2, 3, 4, 5,6,7)
? ? ? ? ? ? ? ? .buffer(3, 2)
? ? ? ? ? ? ? ? .subscribe(new Observer < List < Integer >> () {
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onSubscribe(Disposable d) {
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onNext(List < Integer > integers) {
? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "================緩沖區(qū)大?。?" + integers.size());
? ? ? ? ? ? ? ? ? ? ? ? for (Integer i: integers) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "================元素: " + i);
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onError(Throwable e) {
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onComplete() {
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? });
打印:
================緩沖區(qū)大?。?3
================元素: 1
================元素: 2
================元素: 3
================緩沖區(qū)大?。?3
================元素: 3
================元素: 4
================元素: 5
================緩沖區(qū)大?。?3
================元素: 5
================元素: 6
================元素: 7
================緩沖區(qū)大?。?1
================元素: 7
說明:buffer從需要發(fā)送的事件當中獲取一定數(shù)量的事件,并將這些事件放到緩沖區(qū)當中一并發(fā)出。
第一個參數(shù)代表緩沖區(qū)元素的數(shù)量,第二個參數(shù)表示下一次事件序列的時候要跳過多少元素,比如例子中第二個參數(shù)是2,那么在次遍歷就從3開始,跳過了1和2。
5、groupBy
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
? ? ? ? ? ? ? ? .groupBy(new Function<Integer, Integer>() {
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public Integer apply(Integer integer) throws Exception {
? ? ? ? ? ? ? ? ? ? ? ? return integer % 3;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? })
? ? ? ? ? ? ? ? .subscribe(new Observer<GroupedObservable<Integer, Integer>>() {
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onSubscribe(Disposable d) {
? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "====================onSubscribe ");
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onNext(final GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) {
? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "====================onNext ");
? ? ? ? ? ? ? ? ? ? ? ? integerIntegerGroupedObservable.subscribe(new Observer<Integer>() {
? ? ? ? ? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? ? ? ? ? public void onSubscribe(Disposable d) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "====================GroupedObservable onSubscribe ");
? ? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? ? ? ? ? public void onNext(Integer integer) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "====================GroupedObservable onNext? groupName: " + integerIntegerGroupedObservable.getKey() + " value: " + integer);
? ? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? ? ? ? ? public void onError(Throwable e) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "====================GroupedObservable onError ");
? ? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? ? ? ? ? public void onComplete() {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "====================GroupedObservable onComplete ");
? ? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? });
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onError(Throwable e) {
? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "====================onError ");
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void onComplete() {
? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "====================onComplete ");
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? });
打?。?/p>
====================onSubscribe
====================onNext
====================GroupedObservable onSubscribe
====================GroupedObservable onNext? groupName: 1 value: 1
====================onNext
====================GroupedObservable onSubscribe
====================GroupedObservable onNext? groupName: 2 value: 2
====================onNext
====================GroupedObservable onSubscribe
====================GroupedObservable onNext? groupName: 0 value: 3
====================GroupedObservable onNext? groupName: 1 value: 4
====================GroupedObservable onNext? groupName: 2 value: 5
====================GroupedObservable onNext? groupName: 0 value: 6
====================GroupedObservable onNext? groupName: 1 value: 7
====================GroupedObservable onNext? groupName: 2 value: 8
====================GroupedObservable onNext? groupName: 0 value: 9
====================GroupedObservable onNext? groupName: 1 value: 10
====================GroupedObservable onComplete
====================GroupedObservable onComplete
====================onComplete
說明:在 groupBy() 方法返回的參數(shù)是分組的名字(integerIntegerGroupedObservable.getKey()),每返回一個值,那就代表會創(chuàng)建一個組,以上的代碼就是將1~10的數(shù)據(jù)分成3組。
6、scan
Observable.just(1, 2, 3)
.scan(new BiFunction < Integer, Integer, Integer > () {
? ? @Override
? ? public Integer apply(Integer integer, Integer integer2) throws Exception {
? ? ? ? Log.d(TAG, "====================apply ");
? ? ? ? Log.d(TAG, "====================integer " + integer);
? ? ? ? Log.d(TAG, "====================integer2 " + integer2);
? ? ? ? return integer + integer2;
? ? }
})
.subscribe(new Consumer < Integer > () {
? ? @Override
? ? public void accept(Integer integer) throws Exception {
? ? ? ? Log.d(TAG, "====================accept " + integer);
? ? }
});
打?。?/p>
====================accept 1
====================apply
====================integer 1
====================integer2 2
====================accept 3
====================apply
====================integer 3
====================integer2 3
====================accept 6
說明:scan將數(shù)據(jù)以一定的邏輯聚合起來,相當于從第一個元素開始(1)開始? 元素1和元素2相加和與元素3相加
7、window
Observable.just(1, 2, 3, 4, 5)
.window(2)
.subscribe(new Observer < Observable < Integer >> () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "=====================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(Observable < Integer > integerObservable) {
? ? ? ? integerObservable.subscribe(new Observer < Integer > () {
? ? ? ? ? ? @Override
? ? ? ? ? ? public void onSubscribe(Disposable d) {
? ? ? ? ? ? ? ? Log.d(TAG, "=====================integerObservable onSubscribe ");
? ? ? ? ? ? }
? ? ? ? ? ? @Override
? ? ? ? ? ? public void onNext(Integer integer) {
? ? ? ? ? ? ? ? Log.d(TAG, "=====================integerObservable onNext " + integer);
? ? ? ? ? ? }
? ? ? ? ? ? @Override
? ? ? ? ? ? public void onError(Throwable e) {
? ? ? ? ? ? ? ? Log.d(TAG, "=====================integerObservable onError ");
? ? ? ? ? ? }
? ? ? ? ? ? @Override
? ? ? ? ? ? public void onComplete() {
? ? ? ? ? ? ? ? Log.d(TAG, "=====================integerObservable onComplete ");
? ? ? ? ? ? }
? ? ? ? });
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "=====================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "=====================onComplete ");
? ? }
});
打?。?/p>
=====================onSubscribe
=====================integerObservable onSubscribe
=====================integerObservable onNext 1
=====================integerObservable onNext 2
=====================integerObservable onComplete
=====================integerObservable onSubscribe
=====================integerObservable onNext 3
=====================integerObservable onNext 4
=====================integerObservable onComplete
=====================integerObservable onSubscribe
=====================integerObservable onNext 5
=====================integerObservable onComplete
=====================onComplete
說明:window 中的 count 的參數(shù)就是代表指定的數(shù)量,例如將 count 指定為2,那么每發(fā)2個數(shù)據(jù)就會將這2個數(shù)據(jù)分成一組。例中,window() 將 1~5 的事件分成了3組。
三、組合操作符
1、concat
Observable.concat(Observable.just(1, 2),
Observable.just(3, 4),
Observable.just(5, 6),
Observable.just(7, 8))
.subscribe(new Observer < Integer > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? }
? ? @Override
? ? public void onComplete() {
? ? }
});
打?。?/p>
================onNext 1
================onNext 2
================onNext 3
================onNext 4
================onNext 5
================onNext 6
================onNext 7
================onNext 8
說明:可以將多個觀察者組合在一起,然后按照之前發(fā)送順序發(fā)送事件。需要注意的是,concat() 最多只可以發(fā)送4個事件。
2、concatArray
Observable.concatArray(Observable.just(1, 2),
Observable.just(3, 4),
Observable.just(5, 6),
Observable.just(7, 8),
Observable.just(9, 10))
.subscribe(new Observer < Integer > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? }
? ? @Override
? ? public void onComplete() {
? ? }
});
打?。?/p>
================onNext 1
================onNext 2
================onNext 3
================onNext 4
================onNext 5
================onNext 6
================onNext 7
================onNext 8
================onNext 9
================onNext 10
說明:與 concat() 作用一樣,不過 concatArray() 可以發(fā)送多于 4 個被觀察者。
3、merge
Observable.merge(
Observable.interval(1, TimeUnit.SECONDS).map(new Function < Long, String > () {
? ? @Override
? ? public String apply(Long aLong) throws Exception {
? ? ? ? return "A" + aLong;
? ? }
}),
Observable.interval(1, TimeUnit.SECONDS).map(new Function < Long, String > () {
? ? @Override
? ? public String apply(Long aLong) throws Exception {
? ? ? ? return "B" + aLong;
? ? }
}))
? ? .subscribe(new Observer < String > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? }
? ? @Override
? ? public void onNext(String s) {
? ? ? ? Log.d(TAG, "=====================onNext " + s);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? }
? ? @Override
? ? public void onComplete() {
? ? }
});
打?。?/p>
=====================onNext A0
=====================onNext B0
=====================onNext A1
=====================onNext B1
=====================onNext B2
=====================onNext A2
=====================onNext B3
=====================onNext A3
=====================onNext A4
=====================onNext B4
=====================onNext A5
=====================onNext B5
..............
說明:這個方法月 concat() 作用基本一樣,知識 concat() 是串行發(fā)送事件,而 merge() 并行發(fā)送事件。
把merge換成concat之后打印如下:
=====================onNext A0
=====================onNext A1
=====================onNext A2
=====================onNext A3
=====================onNext A4
..............
說明:只有等到第一個被觀察者發(fā)送完事件之后,第二個被觀察者才會發(fā)送事件。mergeArray() 與 merge() 的作用是一樣的,只是它可以發(fā)送4個以上的被觀察者,這里就不再贅述了。
4、concatArrayDelayError() & mergeArrayDelayError()
Observable.concatArray(
Observable.create(new ObservableOnSubscribe < Integer > () {
? ? @Override
? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {
? ? ? ? e.onNext(1);
? ? ? ? e.onError(new NumberFormatException());
? ? }
}), Observable.just(2, 3, 4))
? ? .subscribe(new Observer < Integer > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "===================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "===================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? }
});
打?。?/p>
===================onNext 1
===================onError
說明:在 concatArray() 和 mergeArray() 兩個方法當中,如果其中有一個被觀察者發(fā)送了一個 Error 事件,那么就會停止發(fā)送事件,如果你想 onError() 事件延遲到所有被觀察者都發(fā)送完事件后再執(zhí)行的話,就可以使用? concatArrayDelayError() 和 mergeArrayDelayError();
從結果可以知道,確實中斷了,現(xiàn)在換用 concatArrayDelayError(),代碼如下:
Observable.concatArrayDelayError(
Observable.create(new ObservableOnSubscribe < Integer > () {
? ? @Override
? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {
? ? ? ? e.onNext(1);
? ? ? ? e.onError(new NumberFormatException());
? ? }
}), Observable.just(2, 3, 4))
.subscribe(new Observer < Integer > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "===================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "===================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? }
});
打?。?/p>
===================onNext 1
===================onNext 2
===================onNext 3
===================onNext 4
===================onError
說明:出現(xiàn)錯誤延遲回調
5、zip
Observable.zip(Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS)
? ? .map(new Function<Long, String>() {
? ? ? ? @Override
? ? ? ? public String apply(Long aLong) throws Exception {
? ? ? ? ? ? String s1 = "A" + aLong;
? ? ? ? ? ? Log.d(TAG, "===================A 發(fā)送的事件 " + s1);
? ? ? ? ? ? return s1;
? ? ? ? }}),
? ? ? ? Observable.intervalRange(1, 6, 1, 1, TimeUnit.SECONDS)
? ? ? ? ? ? .map(new Function<Long, String>() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public String apply(Long aLong) throws Exception {
? ? ? ? ? ? ? ? String s2 = "B" + aLong;
? ? ? ? ? ? ? ? Log.d(TAG, "===================B 發(fā)送的事件 " + s2);
? ? ? ? ? ? ? ? return s2;
? ? ? ? ? ? }
? ? ? ? }),
? ? ? ? new BiFunction<String, String, String>() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public String apply(String s, String s2) throws Exception {
? ? ? ? ? ? ? ? String res = s + s2;
? ? ? ? ? ? ? ? return res;
? ? ? ? ? ? }
? ? ? ? })
.subscribe(new Observer<String>() {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "===================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(String s) {
? ? ? ? Log.d(TAG, "===================onNext " + s);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "===================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "===================onComplete ");
? ? }
});
打?。?/p>
===================onSubscribe
===================A 發(fā)送的事件 A1
===================B 發(fā)送的事件 B1
===================onNext A1B1
===================A 發(fā)送的事件 A2
===================B 發(fā)送的事件 B2
===================onNext A2B2
===================A 發(fā)送的事件 A3
===================B 發(fā)送的事件 B3
===================onNext A3B3
===================A 發(fā)送的事件 A4
===================B 發(fā)送的事件 B4
===================onNext A4B4
===================A 發(fā)送的事件 A5
===================B 發(fā)送的事件 B5
===================onNext A5B5
===================onComplete
說明:會將多個被觀察者合并,根據(jù)各個被觀察者發(fā)送事件的順序一個個結合起來,最終發(fā)送的事件數(shù)量會與源 Observable 中最少事件的數(shù)量一樣,上面代碼中有兩個 Observable,第一個發(fā)送事件的數(shù)量為5個,第二個發(fā)送事件的數(shù)量為6個??梢园l(fā)現(xiàn)最終接收到的事件數(shù)量是5,那么為什么第二個 Observable 沒有發(fā)送第6個事件呢?因為在這之前第一個 Observable 已經(jīng)發(fā)送了 onComplete 事件,所以第二個 Observable 不會再發(fā)送事件。
6、combineLatest() & combineLatestDelayError()
Observable.combineLatest(
Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS)
? ? .map(new Function < Long, String > () {@Override
? ? public String apply(Long aLong) throws Exception {
? ? ? ? String s1 = "A" + aLong;
? ? ? ? Log.d(TAG, "===================A 發(fā)送的事件 " + s1);
? ? ? ? return s1;
? ? }
}),
Observable.intervalRange(1, 5, 2, 2, TimeUnit.SECONDS)
? ? .map(new Function < Long, String > () {@Override
? ? public String apply(Long aLong) throws Exception {
? ? ? ? String s2 = "B" + aLong;
? ? ? ? Log.d(TAG, "===================B 發(fā)送的事件 " + s2);
? ? ? ? return s2;
? ? }
}),
new BiFunction < String, String, String > () {@Override
? ? public String apply(String s, String s2) throws Exception {
? ? ? ? String res = s + s2;
? ? ? ? return res;
? ? }
})
.subscribe(new Observer < String > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "===================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(String s) {
? ? ? ? Log.d(TAG, "===================最終接收到的事件 " + s);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "===================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "===================onComplete ");
? ? }
});
打印:
===================onSubscribe
===================A 發(fā)送的事件 A1
===================A 發(fā)送的事件 A2
===================B 發(fā)送的事件 B1
===================最終接收到的事件 A2B1
===================A 發(fā)送的事件 A3
===================最終接收到的事件 A3B1
===================A 發(fā)送的事件 A4
===================B 發(fā)送的事件 B2
===================最終接收到的事件 A4B1
===================最終接收到的事件 A4B2
===================B 發(fā)送的事件 B3
===================最終接收到的事件 A4B3
===================B 發(fā)送的事件 B4
===================最終接收到的事件 A4B4
===================B 發(fā)送的事件 B5
===================最終接收到的事件 A4B5
===================onComplete
說明:combineLatest() 的作用與 zip() 類似,但是 combineLatest() 發(fā)送事件的序列是與發(fā)送的時間線有關的,當 combineLatest() 中所有的 Observable 都發(fā)送了事件,只要其中有一個 Observable 發(fā)送事件,這個事件就會和其他 Observable 最近發(fā)送的事件結合起來發(fā)送,分析上面的代碼,Observable A 會每隔1秒就發(fā)送一次事件,Observable B 會隔2秒發(fā)送一次事件。當發(fā)送 A1 事件之后,因為 B 并沒有發(fā)送任何事件,所以根本不會發(fā)生結合。當 B 發(fā)送了 B1 事件之后,就會與 A 最近發(fā)送的事件 A2 結合成 A2B1,這樣只有后面一有被觀察者發(fā)送事件,這個事件就會與其他被觀察者最近發(fā)送的事件結合起來了。
因為 combineLatestDelayError() 就是多了延遲發(fā)送 onError() 功能,這里就不再贅述了。
7、reduce
Observable.just(3, 7, 10, 2)
.reduce(new BiFunction < Integer, Integer, Integer > () {
? ? @Override
? ? public Integer apply(Integer integer, Integer integer2) throws Exception {
? ? ? ? int res = integer + integer2;
? ? ? ? Log.d(TAG, "====================integer " + integer);
? ? ? ? Log.d(TAG, "====================integer2 " + integer2);
? ? ? ? Log.d(TAG, "====================res " + res);
? ? ? ? return res;
? ? }
})
.subscribe(new Consumer < Integer > () {
? ? @Override
? ? public void accept(Integer integer) throws Exception {
? ? ? ? Log.d(TAG, "==================accept " + integer);
? ? }
});
打?。?/p>
====================integer 3
====================integer2 7
====================res 10
====================integer 10
====================integer2 10
====================res 20
====================integer 20
====================integer2 2
====================res 22
==================accept 22
說明:與 scan() 操作符的作用也是將發(fā)送數(shù)據(jù)以一定邏輯聚合起來,這兩個的區(qū)別在于 scan() 每處理一次數(shù)據(jù)就會將事件發(fā)送給觀察者,而 reduce() 會將所有數(shù)據(jù)聚合在一起才會發(fā)送事件給觀察者。從結果可以看到,其實就是前2個數(shù)據(jù)聚合之后,然后再與后1個數(shù)據(jù)進行聚合,一直到?jīng)]有數(shù)據(jù)為止,把最終的結果通過觀察者一次輸出。
8、collect
Observable.just(1, 2, 3, 4)
.collect(new Callable < ArrayList < Integer >> () {
? ? @Override
? ? public ArrayList < Integer > call() throws Exception {
? ? ? ? return new ArrayList < > ();
? ? }
},
new BiConsumer < ArrayList < Integer > , Integer > () {
? ? @Override
? ? public void accept(ArrayList < Integer > integers, Integer integer) throws Exception {
? ? ? ? integers.add(integer);
? ? }
})
.subscribe(new Consumer < ArrayList < Integer >> () {
? ? @Override
? ? public void accept(ArrayList < Integer > integers) throws Exception {
? ? ? ? Log.d(TAG, "===============accept " + integers);
? ? }
});
打?。?/p>
===============accept [1, 2, 3, 4]
說明:
將數(shù)據(jù)收集到數(shù)據(jù)結構當中。
9、startWith() & startWithArray()
Observable.just(5, 6, 7)
.startWithArray(2, 3, 4)
.startWith(1)
.subscribe(new Consumer < Integer > () {
? ? @Override
? ? public void accept(Integer integer) throws Exception {
? ? ? ? Log.d(TAG, "================accept " + integer);
? ? }
});
打?。?/p>
================accept 1
================accept 2
================accept 3
================accept 4
================accept 5
================accept 6
================accept 7
說明:在發(fā)送事件之前追加事件,startWith() 追加一個事件,startWithArray() 可以追加多個事件。追加的事件會先發(fā)出。
10、count
Observable.just(1, 2, 3)
.count()
.subscribe(new Consumer < Long > () {
? ? @Override
? ? public void accept(Long aLong) throws Exception {
? ? ? ? Log.d(TAG, "=======================aLong " + aLong);
? ? }
});
打?。?/p>
=======================aLong 3
說明:返回被觀察者發(fā)送事件的數(shù)量。