RxJava2.x常用操作符總結(一)

一、創(chuàng)建操作符

1、create

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {

? ? @Override

? ? public void subscribe(ObservableEmitter<String> e) throws Exception {

? ? ? ? e.onNext("Hello Observer");

? ? ? ? e.onComplete();

? ? }

});

說明:創(chuàng)建一個被觀察者Observable

2、just

Observable.just(1, 2, 3)

.subscribe(new Observer < Integer > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "=================onSubscribe");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "=================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "=================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "=================onComplete ");

? ? }

});

打?。?/p>

=================onSubscribe

=================onNext 1

=================onNext 2

=================onNext 3

=================onComplete

說明:創(chuàng)建一個被觀察者,并發(fā)送事件,發(fā)送的事件不可以超過10個以上。

3、fromArray

Integer[] array = new Integer[]{1, 2, 3, 4};

? ? ? ? Observable.fromArray(array)

? ? ? ? ? ? ? ? .subscribe(new Observer < Integer > () {

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onSubscribe(Disposable d) {

? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "=================onSubscribe");

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onNext(Integer integer) {

? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "=================onNext " + integer);

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onError(Throwable e) {

? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "=================onError ");

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onComplete() {

? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "=================onComplete ");

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? });

打?。?/p>

=================onSubscribe

=================onNext 1

=================onNext 2

=================onNext 3

=================onNext 4

=================onComplete

說明:這個方法和 just() 類似,只不過 fromArray 可以傳入多于10個的變量,并且可以傳入一個數(shù)組。

4、fromCallable

Observable.fromCallable(new Callable < Integer > () {

? ? @Override

? ? public Integer call() throws Exception {

? ? ? ? return 1;

? ? }

})

.subscribe(new Consumer < Integer > () {

? ? @Override

? ? public void accept(Integer integer) throws Exception {

? ? ? ? Log.d(TAG, "================accept " + integer);

? ? }

});

================accept 1

說明:這里的 Callable 是 java.util.concurrent 中的 Callable,Callable 和 Runnable 的用法基本一致,只是它會返回一個結果值,這個結果值就是發(fā)給觀察者的。

5、fromFuture

FutureTask < String > futureTask = new FutureTask < > (new Callable < String > () {

? ? @Override

? ? public String call() throws Exception {

? ? ? ? Log.d(TAG, "CallableDemo is Running");

? ? ? ? return "返回結果";

? ? }

});

Observable.fromFuture(futureTask)

? ? .doOnSubscribe(new Consumer < Disposable > () {

? ? @Override

? ? public void accept(Disposable disposable) throws Exception {

? ? ? ? futureTask.run();

? ? }

})

.subscribe(new Consumer < String > () {

? ? @Override

? ? public void accept(String s) throws Exception {

? ? ? ? Log.d(TAG, "================accept " + s);

? ? }

});

打?。?/p>

CallableDemo is Running

================accept 返回結果

說明:參數(shù)中的 Future 是 java.util.concurrent 中的 Future,F(xiàn)uture 的作用是增加了 cancel() 等方法操作 Callable,它可以通過 get() 方法來獲取 Callable 返回的值。

doOnSubscribe() 的作用就是只有訂閱時才會發(fā)送事件。

6、fromIterable

List<Integer> list = new ArrayList<>();

list.add(0);

list.add(1);

list.add(2);

list.add(3);

Observable.fromIterable(list)

.subscribe(new Observer < Integer > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "=================onSubscribe");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "=================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "=================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "=================onComplete ");

? ? }

});

打?。?/p>

=================onSubscribe

=================onNext 0

=================onNext 1

=================onNext 2

=================onNext 3

=================onComplete

說明:直接發(fā)送一個 List 集合數(shù)據(jù)給觀察者

8、defer

// i 要定義為成員變量

Integer i = 100;


Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {

? ? @Override

? ? public ObservableSource<? extends Integer> call() throws Exception {

? ? ? ? return Observable.just(i);

? ? }

});

i = 200;

Observer<Integer> observer = new Observer<Integer>() {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? }

? ? @Override

? ? public void onComplete() {

? ? }

};

observable.subscribe(observer);

i = 300;

observable.subscribe(observer);

打?。?/p>

================onNext 200

================onNext 300

說明:因為 defer() 只有觀察者訂閱的時候才會創(chuàng)建新的被觀察者,所以每訂閱一次就會打印一次,并且都是打印 i 最新的值。

9、timer

Observable.timer(5, TimeUnit.SECONDS)

? ? ? ? ? ? ? ? .subscribe(new Observer<Long>() {

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onSubscribe(Disposable d) {

? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "===============onSubscribe " + d);

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onNext(Long aLong) {

? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "===============onNext " + aLong);

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onError(Throwable e) {

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onComplete() {

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? });

打?。?/p>

2019-08-19 13:30:09.819 26417-26417/com.xbox D/SDK: ===============onSubscribe null

2019-08-19 13:30:14.820 26417-26446/com.xbox D/SDK: ===============onNext 0

說明:timer的第一個參數(shù)是延時5秒,第二個參數(shù)是單位。當執(zhí)行完onSubscribe方法后延時5秒執(zhí)行onNext方法,返回的參數(shù)為0

10、interval

Observable.interval(2, TimeUnit.SECONDS)

? ? ? ? ? ? ? ? .subscribe(new Observer < Long > () {

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onSubscribe(Disposable d) {

? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "==============onSubscribe ");

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onNext(Long aLong) {

? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "==============onNext " + aLong);

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onError(Throwable e) {

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onComplete() {

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? });

打印:

==============onSubscribe

==============onNext 0

==============onNext 1

==============onNext 2

==============onNext 3

.............

.............

說明:第一個參數(shù)為每隔多少秒執(zhí)行onNext方法(從0開始),當執(zhí)行完onSubscribe后,每隔2秒執(zhí)行onNext方法

11、intervalRange

Observable.intervalRange(4, 5, 8, 3, TimeUnit.SECONDS)

? ? ? ? ? ? ? ? .subscribe(new Observer < Long > () {

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onSubscribe(Disposable d) {

? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "==============onSubscribe ");

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onNext(Long aLong) {

? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "==============onNext " + aLong);

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onError(Throwable e) {

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onComplete() {

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? });

打?。?/p>

==============onSubscribe

==============onNext 4

==============onNext 5

==============onNext 6

==============onNext 7

==============onNext 8

說明:

第一個參數(shù):從4開始計數(shù)

第二個參數(shù):一共執(zhí)行onNext5次(五個值)

第三個參數(shù):當執(zhí)行完onSubscribe方法之后,隔8秒開始執(zhí)行第一個onNext方法

第四個參數(shù):執(zhí)行完第一個onNext之后,每隔3秒執(zhí)行下一個onNext方法

依次+1遞增

12、range

Observable.range(2, 5)

? ? ? ? ? ? ? ? .subscribe(new Observer < Integer > () {

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onSubscribe(Disposable d) {

? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "==============onSubscribe ");

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onNext(Integer aLong) {

? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "==============onNext " + aLong);

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onError(Throwable e) {

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onComplete() {

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? });

打?。?/p>

==============onSubscribe

==============onNext 2

==============onNext 3

==============onNext 4

==============onNext 5

==============onNext 6

說明:從2開始執(zhí)行5次onNext方法,+1遞增

13、rangeLong

說明:與 range() 一樣,只是數(shù)據(jù)類型為 Long 這里就不上代碼了

14、 empty() & never() & error()

Observable.empty()

.subscribe(new Observer < Object > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "==================onSubscribe");

? ? }

? ? @Override

? ? public void onNext(Object o) {

? ? ? ? Log.d(TAG, "==================onNext");

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "==================onError " + e);

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "==================onComplete");

? ? }

});

說明:

empty():直接發(fā)送 onComplete() 事件

never():不發(fā)送任何事件

error():發(fā)送 onError() 事件

二、轉換操作符

1、map

Observable.just(1, 2, 3)

.map(new Function < Integer, String > () {

? ? @Override

? ? public String apply(Integer integer) throws Exception {

? ? ? ? return "I'm " + integer;

? ? }

})

.subscribe(new Observer < String > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.e(TAG, "===================onSubscribe");

? ? }

? ? @Override

? ? public void onNext(String s) {

? ? ? ? Log.e(TAG, "===================onNext " + s);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? }

? ? @Override

? ? public void onComplete() {

? ? }

});

打?。?/p>

===================onSubscribe

===================onNext I'm 1

===================onNext I'm 2

===================onNext I'm 3

說明:map 可以將被觀察者發(fā)送的數(shù)據(jù)類型轉變成其他的類型,上述代碼的作用是將 Integer 類型的數(shù)據(jù)轉換成 String。

2、flatMap

List<Person> personList = new ArrayList<>();

? ? ? ? List<Plan> plansList = new ArrayList<>();

? ? ? ? List<String> actionList = new ArrayList<>();

? ? ? ? actionList.add("玩游戲");

? ? ? ? actionList.add("寫作業(yè)");

? ? ? ? actionList.add("看書");

? ? ? ? Plan plan = new Plan("1", "小明的計劃");

? ? ? ? plan.setActionList(actionList);

? ? ? ? plansList.add(plan);

? ? ? ? Person person = new Person("小明", plansList);

? ? ? ? personList.add(person);

? ? ? ? List<String> actionList2 = new ArrayList<>();

? ? ? ? List<Plan> plansList2 = new ArrayList<>();

? ? ? ? actionList2.add("開電腦");

? ? ? ? actionList2.add("打王者");

? ? ? ? actionList2.add("吃雞");

? ? ? ? Plan plan2 = new Plan("2", "小紅的計劃");

? ? ? ? plan2.setActionList(actionList2);

? ? ? ? plansList2.add(plan2);

? ? ? ? Person person2 = new Person("小紅", plansList2);

? ? ? ? personList.add(person2);

? ? ? ? Observable.fromIterable(personList)

? ? ? ? ? ? ? ? .flatMap(new Function<Person, ObservableSource<Plan>>() {

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public ObservableSource<Plan> apply(Person person) {

? ? ? ? ? ? ? ? ? ? ? ? return Observable.fromIterable(person.getPlanList());

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? })

? ? ? ? ? ? ? ? .flatMap(new Function<Plan, ObservableSource<String>>() {

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public ObservableSource<String> apply(Plan plan) throws Exception {

? ? ? ? ? ? ? ? ? ? ? ? return Observable.fromIterable(plan.getActionList());

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? })

? ? ? ? ? ? ? ? .subscribe(new Observer<String>() {

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onSubscribe(Disposable d) {

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onNext(String s) {

? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "==================action: " + s);

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onError(Throwable e) {

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onComplete() {

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? });

public class Person {

? ? ? ? private String name;

? ? ? ? private List<Plan> planList;

? ? ? ? public Person(String name, List<Plan> planList) {

? ? ? ? ? ? this.name = name;

? ? ? ? ? ? this.planList = planList;

? ? ? ? }

? ? ? ? public String getName() {

? ? ? ? ? ? return name;

? ? ? ? }

? ? ? ? public void setName(String name) {

? ? ? ? ? ? this.name = name;

? ? ? ? }

? ? ? ? public List<Plan> getPlanList() {

? ? ? ? ? ? return planList;

? ? ? ? }

? ? ? ? public void setPlanList(List<Plan> planList) {

? ? ? ? ? ? this.planList = planList;

? ? ? ? }

? ? }

? ? public class Plan {

? ? ? ? private String time;

? ? ? ? private String content;

? ? ? ? private List<String> actionList = new ArrayList<>();

? ? ? ? public Plan(String time, String content) {

? ? ? ? ? ? this.time = time;

? ? ? ? ? ? this.content = content;

? ? ? ? }

? ? ? ? public String getTime() {

? ? ? ? ? ? return time;

? ? ? ? }

? ? ? ? public void setTime(String time) {

? ? ? ? ? ? this.time = time;

? ? ? ? }

? ? ? ? public String getContent() {

? ? ? ? ? ? return content;

? ? ? ? }

? ? ? ? public void setContent(String content) {

? ? ? ? ? ? this.content = content;

? ? ? ? }

? ? ? ? public List<String> getActionList() {

? ? ? ? ? ? return actionList;

? ? ? ? }

? ? ? ? public void setActionList(List<String> actionList) {

? ? ? ? ? ? this.actionList = actionList;

? ? ? ? }

? ? }

打?。?/p>

==================action: 玩游戲

==================action: 寫作業(yè)

==================action: 看書

==================action: 開電腦

==================action: 打王者

==================action: 吃雞

說明:通過flatMap打印出所有的action

3、concatMap

說明:與flatMap用法基本一樣,只不過 concatMap() 轉發(fā)出來的事件是有序的,而 flatMap() 是無序的

4、buffer

Observable.just(1, 2, 3, 4, 5,6,7)

? ? ? ? ? ? ? ? .buffer(3, 2)

? ? ? ? ? ? ? ? .subscribe(new Observer < List < Integer >> () {

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onSubscribe(Disposable d) {

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onNext(List < Integer > integers) {

? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "================緩沖區(qū)大?。?" + integers.size());

? ? ? ? ? ? ? ? ? ? ? ? for (Integer i: integers) {

? ? ? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "================元素: " + i);

? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onError(Throwable e) {

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onComplete() {

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? });

打印:

================緩沖區(qū)大?。?3

================元素: 1

================元素: 2

================元素: 3

================緩沖區(qū)大?。?3

================元素: 3

================元素: 4

================元素: 5

================緩沖區(qū)大?。?3

================元素: 5

================元素: 6

================元素: 7

================緩沖區(qū)大?。?1

================元素: 7

說明:buffer從需要發(fā)送的事件當中獲取一定數(shù)量的事件,并將這些事件放到緩沖區(qū)當中一并發(fā)出。

第一個參數(shù)代表緩沖區(qū)元素的數(shù)量,第二個參數(shù)表示下一次事件序列的時候要跳過多少元素,比如例子中第二個參數(shù)是2,那么在次遍歷就從3開始,跳過了1和2。

5、groupBy

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

? ? ? ? ? ? ? ? .groupBy(new Function<Integer, Integer>() {

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public Integer apply(Integer integer) throws Exception {

? ? ? ? ? ? ? ? ? ? ? ? return integer % 3;

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? })

? ? ? ? ? ? ? ? .subscribe(new Observer<GroupedObservable<Integer, Integer>>() {

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onSubscribe(Disposable d) {

? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "====================onSubscribe ");

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onNext(final GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) {

? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "====================onNext ");

? ? ? ? ? ? ? ? ? ? ? ? integerIntegerGroupedObservable.subscribe(new Observer<Integer>() {

? ? ? ? ? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? ? ? ? ? public void onSubscribe(Disposable d) {

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "====================GroupedObservable onSubscribe ");

? ? ? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? ? ? ? ? public void onNext(Integer integer) {

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "====================GroupedObservable onNext? groupName: " + integerIntegerGroupedObservable.getKey() + " value: " + integer);

? ? ? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? ? ? ? ? public void onError(Throwable e) {

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "====================GroupedObservable onError ");

? ? ? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? ? ? ? ? public void onComplete() {

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "====================GroupedObservable onComplete ");

? ? ? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ? ? });

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onError(Throwable e) {

? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "====================onError ");

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void onComplete() {

? ? ? ? ? ? ? ? ? ? ? ? Log.d(TAG, "====================onComplete ");

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? });

打?。?/p>

====================onSubscribe

====================onNext

====================GroupedObservable onSubscribe

====================GroupedObservable onNext? groupName: 1 value: 1

====================onNext

====================GroupedObservable onSubscribe

====================GroupedObservable onNext? groupName: 2 value: 2

====================onNext

====================GroupedObservable onSubscribe

====================GroupedObservable onNext? groupName: 0 value: 3

====================GroupedObservable onNext? groupName: 1 value: 4

====================GroupedObservable onNext? groupName: 2 value: 5

====================GroupedObservable onNext? groupName: 0 value: 6

====================GroupedObservable onNext? groupName: 1 value: 7

====================GroupedObservable onNext? groupName: 2 value: 8

====================GroupedObservable onNext? groupName: 0 value: 9

====================GroupedObservable onNext? groupName: 1 value: 10

====================GroupedObservable onComplete

====================GroupedObservable onComplete

====================onComplete

說明:在 groupBy() 方法返回的參數(shù)是分組的名字(integerIntegerGroupedObservable.getKey()),每返回一個值,那就代表會創(chuàng)建一個組,以上的代碼就是將1~10的數(shù)據(jù)分成3組。

6、scan

Observable.just(1, 2, 3)

.scan(new BiFunction < Integer, Integer, Integer > () {

? ? @Override

? ? public Integer apply(Integer integer, Integer integer2) throws Exception {

? ? ? ? Log.d(TAG, "====================apply ");

? ? ? ? Log.d(TAG, "====================integer " + integer);

? ? ? ? Log.d(TAG, "====================integer2 " + integer2);

? ? ? ? return integer + integer2;

? ? }

})

.subscribe(new Consumer < Integer > () {

? ? @Override

? ? public void accept(Integer integer) throws Exception {

? ? ? ? Log.d(TAG, "====================accept " + integer);

? ? }

});

打?。?/p>

====================accept 1

====================apply

====================integer 1

====================integer2 2

====================accept 3

====================apply

====================integer 3

====================integer2 3

====================accept 6

說明:scan將數(shù)據(jù)以一定的邏輯聚合起來,相當于從第一個元素開始(1)開始? 元素1和元素2相加和與元素3相加

7、window

Observable.just(1, 2, 3, 4, 5)

.window(2)

.subscribe(new Observer < Observable < Integer >> () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "=====================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(Observable < Integer > integerObservable) {

? ? ? ? integerObservable.subscribe(new Observer < Integer > () {

? ? ? ? ? ? @Override

? ? ? ? ? ? public void onSubscribe(Disposable d) {

? ? ? ? ? ? ? ? Log.d(TAG, "=====================integerObservable onSubscribe ");

? ? ? ? ? ? }

? ? ? ? ? ? @Override

? ? ? ? ? ? public void onNext(Integer integer) {

? ? ? ? ? ? ? ? Log.d(TAG, "=====================integerObservable onNext " + integer);

? ? ? ? ? ? }

? ? ? ? ? ? @Override

? ? ? ? ? ? public void onError(Throwable e) {

? ? ? ? ? ? ? ? Log.d(TAG, "=====================integerObservable onError ");

? ? ? ? ? ? }

? ? ? ? ? ? @Override

? ? ? ? ? ? public void onComplete() {

? ? ? ? ? ? ? ? Log.d(TAG, "=====================integerObservable onComplete ");

? ? ? ? ? ? }

? ? ? ? });

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "=====================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "=====================onComplete ");

? ? }

});

打?。?/p>

=====================onSubscribe

=====================integerObservable onSubscribe

=====================integerObservable onNext 1

=====================integerObservable onNext 2

=====================integerObservable onComplete

=====================integerObservable onSubscribe

=====================integerObservable onNext 3

=====================integerObservable onNext 4

=====================integerObservable onComplete

=====================integerObservable onSubscribe

=====================integerObservable onNext 5

=====================integerObservable onComplete

=====================onComplete

說明:window 中的 count 的參數(shù)就是代表指定的數(shù)量,例如將 count 指定為2,那么每發(fā)2個數(shù)據(jù)就會將這2個數(shù)據(jù)分成一組。例中,window() 將 1~5 的事件分成了3組。

三、組合操作符

1、concat

Observable.concat(Observable.just(1, 2),

Observable.just(3, 4),

Observable.just(5, 6),

Observable.just(7, 8))

.subscribe(new Observer < Integer > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? }

? ? @Override

? ? public void onComplete() {

? ? }

});

打?。?/p>

================onNext 1

================onNext 2

================onNext 3

================onNext 4

================onNext 5

================onNext 6

================onNext 7

================onNext 8

說明:可以將多個觀察者組合在一起,然后按照之前發(fā)送順序發(fā)送事件。需要注意的是,concat() 最多只可以發(fā)送4個事件。

2、concatArray

Observable.concatArray(Observable.just(1, 2),

Observable.just(3, 4),

Observable.just(5, 6),

Observable.just(7, 8),

Observable.just(9, 10))

.subscribe(new Observer < Integer > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? }

? ? @Override

? ? public void onComplete() {

? ? }

});

打?。?/p>

================onNext 1

================onNext 2

================onNext 3

================onNext 4

================onNext 5

================onNext 6

================onNext 7

================onNext 8

================onNext 9

================onNext 10

說明:與 concat() 作用一樣,不過 concatArray() 可以發(fā)送多于 4 個被觀察者。

3、merge

Observable.merge(

Observable.interval(1, TimeUnit.SECONDS).map(new Function < Long, String > () {

? ? @Override

? ? public String apply(Long aLong) throws Exception {

? ? ? ? return "A" + aLong;

? ? }

}),

Observable.interval(1, TimeUnit.SECONDS).map(new Function < Long, String > () {

? ? @Override

? ? public String apply(Long aLong) throws Exception {

? ? ? ? return "B" + aLong;

? ? }

}))

? ? .subscribe(new Observer < String > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? }

? ? @Override

? ? public void onNext(String s) {

? ? ? ? Log.d(TAG, "=====================onNext " + s);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? }

? ? @Override

? ? public void onComplete() {

? ? }

});

打?。?/p>

=====================onNext A0

=====================onNext B0

=====================onNext A1

=====================onNext B1

=====================onNext B2

=====================onNext A2

=====================onNext B3

=====================onNext A3

=====================onNext A4

=====================onNext B4

=====================onNext A5

=====================onNext B5

..............

說明:這個方法月 concat() 作用基本一樣,知識 concat() 是串行發(fā)送事件,而 merge() 并行發(fā)送事件。

把merge換成concat之后打印如下:

=====================onNext A0

=====================onNext A1

=====================onNext A2

=====================onNext A3

=====================onNext A4

..............

說明:只有等到第一個被觀察者發(fā)送完事件之后,第二個被觀察者才會發(fā)送事件。mergeArray() 與 merge() 的作用是一樣的,只是它可以發(fā)送4個以上的被觀察者,這里就不再贅述了。

4、concatArrayDelayError() & mergeArrayDelayError()

Observable.concatArray(

Observable.create(new ObservableOnSubscribe < Integer > () {

? ? @Override

? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {

? ? ? ? e.onNext(1);

? ? ? ? e.onError(new NumberFormatException());

? ? }

}), Observable.just(2, 3, 4))

? ? .subscribe(new Observer < Integer > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "===================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "===================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? }

});

打?。?/p>

===================onNext 1

===================onError

說明:在 concatArray() 和 mergeArray() 兩個方法當中,如果其中有一個被觀察者發(fā)送了一個 Error 事件,那么就會停止發(fā)送事件,如果你想 onError() 事件延遲到所有被觀察者都發(fā)送完事件后再執(zhí)行的話,就可以使用? concatArrayDelayError() 和 mergeArrayDelayError();

從結果可以知道,確實中斷了,現(xiàn)在換用 concatArrayDelayError(),代碼如下:

Observable.concatArrayDelayError(

Observable.create(new ObservableOnSubscribe < Integer > () {

? ? @Override

? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {

? ? ? ? e.onNext(1);

? ? ? ? e.onError(new NumberFormatException());

? ? }

}), Observable.just(2, 3, 4))

.subscribe(new Observer < Integer > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "===================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "===================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? }

});

打?。?/p>

===================onNext 1

===================onNext 2

===================onNext 3

===================onNext 4

===================onError

說明:出現(xiàn)錯誤延遲回調

5、zip

Observable.zip(Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS)

? ? .map(new Function<Long, String>() {

? ? ? ? @Override

? ? ? ? public String apply(Long aLong) throws Exception {

? ? ? ? ? ? String s1 = "A" + aLong;

? ? ? ? ? ? Log.d(TAG, "===================A 發(fā)送的事件 " + s1);

? ? ? ? ? ? return s1;

? ? ? ? }}),

? ? ? ? Observable.intervalRange(1, 6, 1, 1, TimeUnit.SECONDS)

? ? ? ? ? ? .map(new Function<Long, String>() {

? ? ? ? ? ? @Override

? ? ? ? ? ? public String apply(Long aLong) throws Exception {

? ? ? ? ? ? ? ? String s2 = "B" + aLong;

? ? ? ? ? ? ? ? Log.d(TAG, "===================B 發(fā)送的事件 " + s2);

? ? ? ? ? ? ? ? return s2;

? ? ? ? ? ? }

? ? ? ? }),

? ? ? ? new BiFunction<String, String, String>() {

? ? ? ? ? ? @Override

? ? ? ? ? ? public String apply(String s, String s2) throws Exception {

? ? ? ? ? ? ? ? String res = s + s2;

? ? ? ? ? ? ? ? return res;

? ? ? ? ? ? }

? ? ? ? })

.subscribe(new Observer<String>() {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "===================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(String s) {

? ? ? ? Log.d(TAG, "===================onNext " + s);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "===================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "===================onComplete ");

? ? }

});

打?。?/p>

===================onSubscribe

===================A 發(fā)送的事件 A1

===================B 發(fā)送的事件 B1

===================onNext A1B1

===================A 發(fā)送的事件 A2

===================B 發(fā)送的事件 B2

===================onNext A2B2

===================A 發(fā)送的事件 A3

===================B 發(fā)送的事件 B3

===================onNext A3B3

===================A 發(fā)送的事件 A4

===================B 發(fā)送的事件 B4

===================onNext A4B4

===================A 發(fā)送的事件 A5

===================B 發(fā)送的事件 B5

===================onNext A5B5

===================onComplete

說明:會將多個被觀察者合并,根據(jù)各個被觀察者發(fā)送事件的順序一個個結合起來,最終發(fā)送的事件數(shù)量會與源 Observable 中最少事件的數(shù)量一樣,上面代碼中有兩個 Observable,第一個發(fā)送事件的數(shù)量為5個,第二個發(fā)送事件的數(shù)量為6個??梢园l(fā)現(xiàn)最終接收到的事件數(shù)量是5,那么為什么第二個 Observable 沒有發(fā)送第6個事件呢?因為在這之前第一個 Observable 已經(jīng)發(fā)送了 onComplete 事件,所以第二個 Observable 不會再發(fā)送事件。

6、combineLatest() & combineLatestDelayError()

Observable.combineLatest(

Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS)

? ? .map(new Function < Long, String > () {@Override

? ? public String apply(Long aLong) throws Exception {

? ? ? ? String s1 = "A" + aLong;

? ? ? ? Log.d(TAG, "===================A 發(fā)送的事件 " + s1);

? ? ? ? return s1;

? ? }

}),

Observable.intervalRange(1, 5, 2, 2, TimeUnit.SECONDS)

? ? .map(new Function < Long, String > () {@Override

? ? public String apply(Long aLong) throws Exception {

? ? ? ? String s2 = "B" + aLong;

? ? ? ? Log.d(TAG, "===================B 發(fā)送的事件 " + s2);

? ? ? ? return s2;

? ? }

}),

new BiFunction < String, String, String > () {@Override

? ? public String apply(String s, String s2) throws Exception {

? ? ? ? String res = s + s2;

? ? ? ? return res;

? ? }

})

.subscribe(new Observer < String > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "===================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(String s) {

? ? ? ? Log.d(TAG, "===================最終接收到的事件 " + s);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "===================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "===================onComplete ");

? ? }

});

打印:

===================onSubscribe

===================A 發(fā)送的事件 A1

===================A 發(fā)送的事件 A2

===================B 發(fā)送的事件 B1

===================最終接收到的事件 A2B1

===================A 發(fā)送的事件 A3

===================最終接收到的事件 A3B1

===================A 發(fā)送的事件 A4

===================B 發(fā)送的事件 B2

===================最終接收到的事件 A4B1

===================最終接收到的事件 A4B2

===================B 發(fā)送的事件 B3

===================最終接收到的事件 A4B3

===================B 發(fā)送的事件 B4

===================最終接收到的事件 A4B4

===================B 發(fā)送的事件 B5

===================最終接收到的事件 A4B5

===================onComplete

說明:combineLatest() 的作用與 zip() 類似,但是 combineLatest() 發(fā)送事件的序列是與發(fā)送的時間線有關的,當 combineLatest() 中所有的 Observable 都發(fā)送了事件,只要其中有一個 Observable 發(fā)送事件,這個事件就會和其他 Observable 最近發(fā)送的事件結合起來發(fā)送,分析上面的代碼,Observable A 會每隔1秒就發(fā)送一次事件,Observable B 會隔2秒發(fā)送一次事件。當發(fā)送 A1 事件之后,因為 B 并沒有發(fā)送任何事件,所以根本不會發(fā)生結合。當 B 發(fā)送了 B1 事件之后,就會與 A 最近發(fā)送的事件 A2 結合成 A2B1,這樣只有后面一有被觀察者發(fā)送事件,這個事件就會與其他被觀察者最近發(fā)送的事件結合起來了。

因為 combineLatestDelayError() 就是多了延遲發(fā)送 onError() 功能,這里就不再贅述了。

7、reduce

Observable.just(3, 7, 10, 2)

.reduce(new BiFunction < Integer, Integer, Integer > () {

? ? @Override

? ? public Integer apply(Integer integer, Integer integer2) throws Exception {

? ? ? ? int res = integer + integer2;

? ? ? ? Log.d(TAG, "====================integer " + integer);

? ? ? ? Log.d(TAG, "====================integer2 " + integer2);

? ? ? ? Log.d(TAG, "====================res " + res);

? ? ? ? return res;

? ? }

})

.subscribe(new Consumer < Integer > () {

? ? @Override

? ? public void accept(Integer integer) throws Exception {

? ? ? ? Log.d(TAG, "==================accept " + integer);

? ? }

});

打?。?/p>

====================integer 3

====================integer2 7

====================res 10

====================integer 10

====================integer2 10

====================res 20

====================integer 20

====================integer2 2

====================res 22

==================accept 22

說明:與 scan() 操作符的作用也是將發(fā)送數(shù)據(jù)以一定邏輯聚合起來,這兩個的區(qū)別在于 scan() 每處理一次數(shù)據(jù)就會將事件發(fā)送給觀察者,而 reduce() 會將所有數(shù)據(jù)聚合在一起才會發(fā)送事件給觀察者。從結果可以看到,其實就是前2個數(shù)據(jù)聚合之后,然后再與后1個數(shù)據(jù)進行聚合,一直到?jīng)]有數(shù)據(jù)為止,把最終的結果通過觀察者一次輸出。

8、collect

Observable.just(1, 2, 3, 4)

.collect(new Callable < ArrayList < Integer >> () {

? ? @Override

? ? public ArrayList < Integer > call() throws Exception {

? ? ? ? return new ArrayList < > ();

? ? }

},

new BiConsumer < ArrayList < Integer > , Integer > () {

? ? @Override

? ? public void accept(ArrayList < Integer > integers, Integer integer) throws Exception {

? ? ? ? integers.add(integer);

? ? }

})

.subscribe(new Consumer < ArrayList < Integer >> () {

? ? @Override

? ? public void accept(ArrayList < Integer > integers) throws Exception {

? ? ? ? Log.d(TAG, "===============accept " + integers);

? ? }

});

打?。?/p>

===============accept [1, 2, 3, 4]

說明:

將數(shù)據(jù)收集到數(shù)據(jù)結構當中。

9、startWith() & startWithArray()

Observable.just(5, 6, 7)

.startWithArray(2, 3, 4)

.startWith(1)

.subscribe(new Consumer < Integer > () {

? ? @Override

? ? public void accept(Integer integer) throws Exception {

? ? ? ? Log.d(TAG, "================accept " + integer);

? ? }

});

打?。?/p>

================accept 1

================accept 2

================accept 3

================accept 4

================accept 5

================accept 6

================accept 7

說明:在發(fā)送事件之前追加事件,startWith() 追加一個事件,startWithArray() 可以追加多個事件。追加的事件會先發(fā)出。

10、count

Observable.just(1, 2, 3)

.count()

.subscribe(new Consumer < Long > () {

? ? @Override

? ? public void accept(Long aLong) throws Exception {

? ? ? ? Log.d(TAG, "=======================aLong " + aLong);

? ? }

});

打?。?/p>

=======================aLong 3

說明:返回被觀察者發(fā)送事件的數(shù)量。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容