1.什么是RxJava(ReactiveX.io鏈?zhǔn)骄幊蹋?/h1>
RXJava是一個響應(yīng)式編程框架,采用觀察者設(shè)計(jì)模式,
觀察者模式本身的目的就是『后臺處理,前臺回調(diào)』的異步機(jī)制
概述:一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫
優(yōu)點(diǎn):異步操作很關(guān)鍵的一點(diǎn)是程序的簡潔性,因?yàn)樵谡{(diào)度過程比較復(fù)雜的
情況下,異步代碼經(jīng)常會既難寫也難被讀懂。Android 創(chuàng)造的AsyncTask和
Handler,其實(shí)都是為了讓異步代碼更加簡潔。RxJava 的優(yōu)勢也是簡潔,但它
的簡潔的與眾不同之處在于,隨著程序邏輯變得越來越復(fù)雜,它依然能夠保持簡
潔。(函數(shù)風(fēng)格、代碼簡單、異步錯誤處理、輕松使用并發(fā))
2.觀察者模式
被觀察者
/**
* 被觀察者
*/
public class Watched{
private List<Watcher > list = new ArrayList<>();
//注冊觀察者
@Override
public void registerWatcher(Watcher watcher) {
list.add(watcher);
}
//移除觀察者
@Override
public void unregisterWatcher(Watcher watcher) {
list.remove(watcher);
}
//清空觀察者
@Override
public void clearWatcher() {
list.clear();
}
//通知觀察者
@Override
public void notifyWathers(String string) {
for (Watcher watcher: list ) {
watcher.update(string);
}
}
}
觀察者
/**
* 觀察者
*/
public class Watcher {
//用于觀察者更新狀態(tài)
@Override
public void update(String string) {
System.out.println(Thread.currentThread().toString() + " : " + string);
}
}
測試類
/**
* 測試類
*/
public class MyClass {
public static void main(String[] args){
//觀察者
Watcher watcher1 = new Watcher();
Watcher watcher2 = new Watcher();
Watcher watcher3 = new Watcher();
//被觀察者
Watched watched = new Watched();
//被觀察者注冊觀察者
watched.registerWatcher(watcher1);
watched.registerWatcher(watcher2);
watched.registerWatcher(watcher3);
//通知
watched.notifyWathers("接收的數(shù)");
//清空
watched.clearWatcher();
}
}
3.基本概念(觀察者模式)
案例:按鈕點(diǎn)擊處理、廣播注冊
通過setOnClickListener()方法,Button持有OnClickListener的引用;當(dāng)用戶
擊時(shí),Button自動調(diào)用OnClickListener的onClick()方法。
Button——>被觀察者
OnClickListener——>觀察者
setOnClickListener ——>訂閱
onClick ——>事件
RxJava 有3個基本概念:
1.Observable(可觀察者,即被觀察者)
2.Observer(觀察者)
3.subscribe(訂閱)事件。
觀察者模式
Observable 和 Observer 通過 subscribe() 方法實(shí)現(xiàn)訂閱關(guān)系,從而
Observable 可以在需要的時(shí)候發(fā)出事件來通知 Observer。
普通事件
onNext() 接收被觀察者發(fā)送的消息
特殊的事件:
onCompleted() 事件隊(duì)列完結(jié)
onError () 事件隊(duì)列異常
注意:
1)RxJava 不僅把每個事件單獨(dú)處理,還會把它們看做一個隊(duì)列。
2)RxJava 規(guī)定,onNext() 接收被觀察者發(fā)送的消息、可以執(zhí)行多次;當(dāng)不
會再有新的 onNext () 發(fā)出時(shí),需要觸發(fā) onCompleted () 方法作為標(biāo)志。
onError():事件隊(duì)列異常。在事件處理過程中出異常時(shí),onError() 會被觸發(fā),同
時(shí)隊(duì)列自動終止,不允許再有事件發(fā)出。
3)在一個正確運(yùn)行的事件序列中, onCompleted() 和 onError () 有且只有一個,
并且是事件序列中的最后一個。
4)需要注意的是,onCompleted()和 onError () 二者也是互斥的,即在隊(duì)列中調(diào)
用了其中一個,就不應(yīng)該再調(diào)用另一個。
3.調(diào)度器
RxJava中調(diào)度器設(shè)置方法
subscribeOn():或者叫做事件產(chǎn)生的線程。
指定 subscribe()所發(fā)生的線程,
即 Observable.OnSubscribe 被激活時(shí)所處的線程。
observeOn():或者叫做事件消費(fèi)的線程。
指定 Subscriber所運(yùn)行在的線程。
幾種調(diào)度器
在RxJava 中Scheduler——調(diào)度器,相當(dāng)于線程控制器,
RxJava 通過它來指定每一段代碼應(yīng)該運(yùn)行在什么樣的線程。
RxJava 已經(jīng)內(nèi)置了幾個Scheduler,它們已經(jīng)適合大多數(shù)的使用場景:
1:Schedulers.immediate():直接在當(dāng)前線程運(yùn)行,相當(dāng)于不指定線程。這是默認(rèn)的Scheduler
2:Schedulers.newThread():總是啟用新線程,并在新線程執(zhí)行操作。
3:Schedulers.io():I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler
行為模式和 newThread()差不多區(qū)別在于 io()的內(nèi)部實(shí)現(xiàn)是是用一個無數(shù)量上限的線
程池可以重用空閑的線程,因此多數(shù)情況下 io()比 newThread()更有效率。不要把計(jì)算
工作放在 io()中可以避免創(chuàng)建不必要的線程。
4:Schedulers.computation():計(jì)算所使用的 Scheduler這個計(jì)算指的是 CPU密集型計(jì)算,
即不會被 I/O 等操作限制性能的操作,例如圖形的計(jì)算。這個 Scheduler使用的固定
的線程池,大小為 CPU核數(shù)。不要把 I/O 操作放在computation()中,否則 I/O 操作
的等待時(shí)間會浪費(fèi)CPU。
5.AndroidSchedulers.mainThread():Android 還有一個專用的
它指定的操作將在 Android主線程運(yùn)行。有了這幾個 Scheduler,就可以使用
subscribeOn()和 observeOn()兩個方法來對線程進(jìn)行控制了。
4.依賴庫
//RxJava
implementation 'io.reactivex.rxjava2:rxjava:2.2.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
implementation 'com.squareup.retrofit2:retrofit:2.5.0'//retrofit 庫
implementation 'com.squareup.retrofit2:converter-gson:2.3.0'//轉(zhuǎn)換器,請求結(jié)果轉(zhuǎn)換成Model
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'//配合Rxjava 使用
implementation 'com.google.code.gson:gson:2.6.2'//Gson 庫
5.簡單使用
public static void baseRx(){
//創(chuàng)建被觀察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) {
emitter.onNext("1111");
emitter.onNext("2222");
emitter.onNext("3333");
emitter.onNext("4444");
//emitter.onError(new Throwable("abc"));
//emitter.onComplete();
}
});
//創(chuàng)建觀察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {//關(guān)閉線程
Log.e(TAG, "onSubscribe: " );
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext: "+ s );
}
@Override
public void onError(Throwable e) {//失敗
Log.e(TAG, "onError: "+e.getMessage() );
}
@Override
public void onComplete() {//成功
Log.e(TAG, "onComplete: " );
}
};
//被觀察者訂閱觀察者
observable.subscribe(observer);
//線程切換
observable
//被訂閱者在子線程中
.subscribeOn(Schedulers.io())
//訂閱者在主線程中
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
//觀察中可以重復(fù)指定線程
observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())//主
.observeOn(Schedulers.io())//子
.observeOn(AndroidSchedulers.mainThread())//主
.subscribe(observer);
}
6.Android功能使用
private void rxAndroidBean() {
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(MyServer.Url)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build();
MyServer myServer = retrofit.create(MyServer.class);
Observable<Bean> call = myServer.getDate2();
call.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Bean>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Bean responseBody) {
Log.e(TAG, "onNext: "+ responseBody.getRESULT() );
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
7.其他操作符使用
//遍歷輸出
public static void rxFrom(){
Integer[] a = {1,2,3,4,5};
Observable.fromArray(a).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: "+integer);
}
});
}
//數(shù)組合并輸出
public static void rxJust(){
Integer[] a = {1,2,3};
Integer[] b = {9,8,7};
Observable.just(a,b).subscribe(new Consumer<Integer[]>() {
@Override
public void accept(Integer[] integers) throws Exception {
for (Integer i: integers) {
Log.e(TAG, "accept: "+i);
}
}
});
}
//范圍輸出
public static void rxRange(){
Observable.range(0,20).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: "+integer );
}
});
}
//過濾輸出
public static void rxFilter(){
Integer[] a = {1,2,3,4,5};
Observable.fromArray(a).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
if (integer>3){
return true;
}
return false;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: "+integer );
}
});
}
//定時(shí)器
public static void rxInterval(){
Observable.interval(1,1,TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "accept: "+aLong );
}
});
}
//數(shù)組轉(zhuǎn)換
public static void rxMap(){
Integer[] a = {1,2,3,4,5};
Observable.fromArray(a).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) {
return integer+"abc";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
Log.e(TAG, "accept: "+s );
}
});
}
//一個對象轉(zhuǎn)換為一組對象
public static void rxFlatMap(){
Integer[] a = {1,2,3,4,5};
Observable.fromArray(a).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
String[] strs = new String[3];
for (int i =0;i<strs.length;i++){
strs[i] = integer + strs[i];
}
return Observable.fromArray(strs);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept: "+s );
}
});
}
//Observable壓縮合并
public static void rxZip(){
Integer[] a= {1,2,3};
Integer[] b={4,5,6};
Observable<Integer> observableA = Observable.fromArray(a);
Observable<Integer> observableB = Observable.fromArray(b);
Observable.zip(observableA, observableB, new BiFunction<Integer, Integer, String>() {
@Override
public String apply(Integer integer, Integer integer2) throws Exception {
return integer + ":" + integer2;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept: "+s );
}
});
}
//合并
public static void rxMerge(){
Integer[] a ={1,2,3};
String[] b = {"abc","aaa","bbb"};
char[] c = {'a','b','c'};
Observable<Integer> A = Observable.fromArray(a);
Observable<String> B = Observable.fromArray(b);
Observable<char[]> C = Observable.fromArray(c);
Observable
.merge(A,B,C)
.subscribe(new Consumer<Serializable>() {
@Override
public void accept(Serializable serializable) throws Exception {
Log.e(TAG, "accept: ."+serializable );
}
});
}
8.RxAndroid好處
用途
是一個實(shí)現(xiàn)異步操作的庫,具有簡潔的鏈?zhǔn)酱a,提供強(qiáng)大的數(shù)據(jù)變換。
優(yōu)勢
異步好簡單、代碼好簡潔,一個簡單、一個簡潔,這就意味著工作效率。
subscribeOn只能定義一次,除非是在定義doOnSubscribe
observeOn可以定義多次,決定后續(xù)代碼所在的線程
9.RxJava:好處
使用Rxjava的好處在于,我們可以方便的切換方法的執(zhí)行線程,對線程動態(tài)
切換,該過程無需我們自己手動創(chuàng)建和啟動線程。使用Rxjava創(chuàng)建的代碼雖然
出現(xiàn)在同一個線程中,但是我們可以設(shè)置使得不同方法在不同線程中執(zhí)行。上述
功能的實(shí)現(xiàn)主要?dú)w功于RxJava的Scheduler實(shí)現(xiàn),Scheduler 提供了『后臺處
理,前臺回調(diào)』的異步機(jī)制。
RXJava是一個響應(yīng)式編程框架,采用觀察者設(shè)計(jì)模式,
觀察者模式本身的目的就是『后臺處理,前臺回調(diào)』的異步機(jī)制
概述:一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫
優(yōu)點(diǎn):異步操作很關(guān)鍵的一點(diǎn)是程序的簡潔性,因?yàn)樵谡{(diào)度過程比較復(fù)雜的
情況下,異步代碼經(jīng)常會既難寫也難被讀懂。Android 創(chuàng)造的AsyncTask和
Handler,其實(shí)都是為了讓異步代碼更加簡潔。RxJava 的優(yōu)勢也是簡潔,但它
的簡潔的與眾不同之處在于,隨著程序邏輯變得越來越復(fù)雜,它依然能夠保持簡
潔。(函數(shù)風(fēng)格、代碼簡單、異步錯誤處理、輕松使用并發(fā))
/**
* 被觀察者
*/
public class Watched{
private List<Watcher > list = new ArrayList<>();
//注冊觀察者
@Override
public void registerWatcher(Watcher watcher) {
list.add(watcher);
}
//移除觀察者
@Override
public void unregisterWatcher(Watcher watcher) {
list.remove(watcher);
}
//清空觀察者
@Override
public void clearWatcher() {
list.clear();
}
//通知觀察者
@Override
public void notifyWathers(String string) {
for (Watcher watcher: list ) {
watcher.update(string);
}
}
}
/**
* 觀察者
*/
public class Watcher {
//用于觀察者更新狀態(tài)
@Override
public void update(String string) {
System.out.println(Thread.currentThread().toString() + " : " + string);
}
}
/**
* 測試類
*/
public class MyClass {
public static void main(String[] args){
//觀察者
Watcher watcher1 = new Watcher();
Watcher watcher2 = new Watcher();
Watcher watcher3 = new Watcher();
//被觀察者
Watched watched = new Watched();
//被觀察者注冊觀察者
watched.registerWatcher(watcher1);
watched.registerWatcher(watcher2);
watched.registerWatcher(watcher3);
//通知
watched.notifyWathers("接收的數(shù)");
//清空
watched.clearWatcher();
}
}
通過setOnClickListener()方法,Button持有OnClickListener的引用;當(dāng)用戶
擊時(shí),Button自動調(diào)用OnClickListener的onClick()方法。
Button——>被觀察者
OnClickListener——>觀察者
setOnClickListener ——>訂閱
onClick ——>事件
1.Observable(可觀察者,即被觀察者)
2.Observer(觀察者)
3.subscribe(訂閱)事件。
Observable 和 Observer 通過 subscribe() 方法實(shí)現(xiàn)訂閱關(guān)系,從而
Observable 可以在需要的時(shí)候發(fā)出事件來通知 Observer。
普通事件
onNext() 接收被觀察者發(fā)送的消息
特殊的事件:
onCompleted() 事件隊(duì)列完結(jié)
onError () 事件隊(duì)列異常
1)RxJava 不僅把每個事件單獨(dú)處理,還會把它們看做一個隊(duì)列。
2)RxJava 規(guī)定,onNext() 接收被觀察者發(fā)送的消息、可以執(zhí)行多次;當(dāng)不
會再有新的 onNext () 發(fā)出時(shí),需要觸發(fā) onCompleted () 方法作為標(biāo)志。
onError():事件隊(duì)列異常。在事件處理過程中出異常時(shí),onError() 會被觸發(fā),同
時(shí)隊(duì)列自動終止,不允許再有事件發(fā)出。
3)在一個正確運(yùn)行的事件序列中, onCompleted() 和 onError () 有且只有一個,
并且是事件序列中的最后一個。
4)需要注意的是,onCompleted()和 onError () 二者也是互斥的,即在隊(duì)列中調(diào)
用了其中一個,就不應(yīng)該再調(diào)用另一個。
subscribeOn():或者叫做事件產(chǎn)生的線程。
指定 subscribe()所發(fā)生的線程,
即 Observable.OnSubscribe 被激活時(shí)所處的線程。
observeOn():或者叫做事件消費(fèi)的線程。
指定 Subscriber所運(yùn)行在的線程。
在RxJava 中Scheduler——調(diào)度器,相當(dāng)于線程控制器,
RxJava 通過它來指定每一段代碼應(yīng)該運(yùn)行在什么樣的線程。
RxJava 已經(jīng)內(nèi)置了幾個Scheduler,它們已經(jīng)適合大多數(shù)的使用場景:
1:Schedulers.immediate():直接在當(dāng)前線程運(yùn)行,相當(dāng)于不指定線程。這是默認(rèn)的Scheduler
2:Schedulers.newThread():總是啟用新線程,并在新線程執(zhí)行操作。
3:Schedulers.io():I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler
行為模式和 newThread()差不多區(qū)別在于 io()的內(nèi)部實(shí)現(xiàn)是是用一個無數(shù)量上限的線
程池可以重用空閑的線程,因此多數(shù)情況下 io()比 newThread()更有效率。不要把計(jì)算
工作放在 io()中可以避免創(chuàng)建不必要的線程。
4:Schedulers.computation():計(jì)算所使用的 Scheduler這個計(jì)算指的是 CPU密集型計(jì)算,
即不會被 I/O 等操作限制性能的操作,例如圖形的計(jì)算。這個 Scheduler使用的固定
的線程池,大小為 CPU核數(shù)。不要把 I/O 操作放在computation()中,否則 I/O 操作
的等待時(shí)間會浪費(fèi)CPU。
5.AndroidSchedulers.mainThread():Android 還有一個專用的
它指定的操作將在 Android主線程運(yùn)行。有了這幾個 Scheduler,就可以使用
subscribeOn()和 observeOn()兩個方法來對線程進(jìn)行控制了。
//RxJava
implementation 'io.reactivex.rxjava2:rxjava:2.2.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
implementation 'com.squareup.retrofit2:retrofit:2.5.0'//retrofit 庫
implementation 'com.squareup.retrofit2:converter-gson:2.3.0'//轉(zhuǎn)換器,請求結(jié)果轉(zhuǎn)換成Model
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'//配合Rxjava 使用
implementation 'com.google.code.gson:gson:2.6.2'//Gson 庫
public static void baseRx(){
//創(chuàng)建被觀察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) {
emitter.onNext("1111");
emitter.onNext("2222");
emitter.onNext("3333");
emitter.onNext("4444");
//emitter.onError(new Throwable("abc"));
//emitter.onComplete();
}
});
//創(chuàng)建觀察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {//關(guān)閉線程
Log.e(TAG, "onSubscribe: " );
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext: "+ s );
}
@Override
public void onError(Throwable e) {//失敗
Log.e(TAG, "onError: "+e.getMessage() );
}
@Override
public void onComplete() {//成功
Log.e(TAG, "onComplete: " );
}
};
//被觀察者訂閱觀察者
observable.subscribe(observer);
//線程切換
observable
//被訂閱者在子線程中
.subscribeOn(Schedulers.io())
//訂閱者在主線程中
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
//觀察中可以重復(fù)指定線程
observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())//主
.observeOn(Schedulers.io())//子
.observeOn(AndroidSchedulers.mainThread())//主
.subscribe(observer);
}
private void rxAndroidBean() {
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(MyServer.Url)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build();
MyServer myServer = retrofit.create(MyServer.class);
Observable<Bean> call = myServer.getDate2();
call.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Bean>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Bean responseBody) {
Log.e(TAG, "onNext: "+ responseBody.getRESULT() );
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
//遍歷輸出
public static void rxFrom(){
Integer[] a = {1,2,3,4,5};
Observable.fromArray(a).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: "+integer);
}
});
}
//數(shù)組合并輸出
public static void rxJust(){
Integer[] a = {1,2,3};
Integer[] b = {9,8,7};
Observable.just(a,b).subscribe(new Consumer<Integer[]>() {
@Override
public void accept(Integer[] integers) throws Exception {
for (Integer i: integers) {
Log.e(TAG, "accept: "+i);
}
}
});
}
//范圍輸出
public static void rxRange(){
Observable.range(0,20).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: "+integer );
}
});
}
//過濾輸出
public static void rxFilter(){
Integer[] a = {1,2,3,4,5};
Observable.fromArray(a).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
if (integer>3){
return true;
}
return false;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: "+integer );
}
});
}
//定時(shí)器
public static void rxInterval(){
Observable.interval(1,1,TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "accept: "+aLong );
}
});
}
//數(shù)組轉(zhuǎn)換
public static void rxMap(){
Integer[] a = {1,2,3,4,5};
Observable.fromArray(a).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) {
return integer+"abc";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
Log.e(TAG, "accept: "+s );
}
});
}
//一個對象轉(zhuǎn)換為一組對象
public static void rxFlatMap(){
Integer[] a = {1,2,3,4,5};
Observable.fromArray(a).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
String[] strs = new String[3];
for (int i =0;i<strs.length;i++){
strs[i] = integer + strs[i];
}
return Observable.fromArray(strs);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept: "+s );
}
});
}
//Observable壓縮合并
public static void rxZip(){
Integer[] a= {1,2,3};
Integer[] b={4,5,6};
Observable<Integer> observableA = Observable.fromArray(a);
Observable<Integer> observableB = Observable.fromArray(b);
Observable.zip(observableA, observableB, new BiFunction<Integer, Integer, String>() {
@Override
public String apply(Integer integer, Integer integer2) throws Exception {
return integer + ":" + integer2;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept: "+s );
}
});
}
//合并
public static void rxMerge(){
Integer[] a ={1,2,3};
String[] b = {"abc","aaa","bbb"};
char[] c = {'a','b','c'};
Observable<Integer> A = Observable.fromArray(a);
Observable<String> B = Observable.fromArray(b);
Observable<char[]> C = Observable.fromArray(c);
Observable
.merge(A,B,C)
.subscribe(new Consumer<Serializable>() {
@Override
public void accept(Serializable serializable) throws Exception {
Log.e(TAG, "accept: ."+serializable );
}
});
}
用途
是一個實(shí)現(xiàn)異步操作的庫,具有簡潔的鏈?zhǔn)酱a,提供強(qiáng)大的數(shù)據(jù)變換。
優(yōu)勢
異步好簡單、代碼好簡潔,一個簡單、一個簡潔,這就意味著工作效率。
subscribeOn只能定義一次,除非是在定義doOnSubscribe
observeOn可以定義多次,決定后續(xù)代碼所在的線程
使用Rxjava的好處在于,我們可以方便的切換方法的執(zhí)行線程,對線程動態(tài)
切換,該過程無需我們自己手動創(chuàng)建和啟動線程。使用Rxjava創(chuàng)建的代碼雖然
出現(xiàn)在同一個線程中,但是我們可以設(shè)置使得不同方法在不同線程中執(zhí)行。上述
功能的實(shí)現(xiàn)主要?dú)w功于RxJava的Scheduler實(shí)現(xiàn),Scheduler 提供了『后臺處
理,前臺回調(diào)』的異步機(jī)制。