什么是RxJava(ReactiveX.io鏈式編程)
定義:一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫。
總結(jié):RxJava 是一個 基于事件流、實現(xiàn)異步操作的庫
理解:RXJava是一個響應(yīng)式編程框架 ,采用觀察者設(shè)計模式,觀察者模式本身的目的就是『后臺處理,前臺回調(diào)』的異步機制優(yōu)點:
由于 RxJava是基于事件流的鏈式調(diào)用,所以使得 RxJava:
邏輯簡潔
實現(xiàn)優(yōu)雅
使用簡單
- 作用:
實現(xiàn)異步操作
類似于 Android中的 AsyncTask 、Handler作用
RxJava 有3個基本概念及原理
1.Observable(被觀察者)
2.Observer(觀察者)
3.subscribe(訂閱)事件。
注意
1)RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列。
2)RxJava 規(guī)定,onNext() 接收被觀察者發(fā)送的消息、可以執(zhí)行多次;當不會再有新的 onNext () 發(fā)出時,需要觸發(fā) onCompleted () 方法作為標志。onError():事件隊列異常。在事件處理過程中出異常時,onError() 會被觸發(fā),同時隊列自動終止,不允許再有事件發(fā)出。
3)在一個正確運行的事件序列中, onCompleted() 和 onError () 有且只有一個,并且是事件序列中的最后一個。
4)需要注意的是,onCompleted()和 onError () 二者也是互斥的,即在隊列中調(diào)用了其中一個,就不應(yīng)該再調(diào)用另一個。
依賴庫
//RxJava
implementation 'io.reactivex.rxjava2:rxjava:2.2.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
implementation 'com.squareup.retrofit2:retrofit:2.5.0'//retrofit 庫
implementation 'com.squareup.retrofit2:converter-gson:2.5.0'//轉(zhuǎn)換器,請求結(jié)果轉(zhuǎn)換成Model
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.5.0'//配合Rxjava 使用
簡單使用
public static void baseRx(){
//1.創(chuàng)建被觀察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) {
emitter.onNext("1111");
emitter.onNext("2222");
emitter.onNext("3333");
emitter.onNext("4444");
//emitter.onError(new Throwable("abc"));
//emitter.onComplete();
}
});
//2.創(chuàng)建觀察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {//關(guān)閉線程
Log.e(TAG, "onSubscribe: " );
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext: "+ s );
}
@Override
public void onError(Throwable e) {//失敗
Log.e(TAG, "onError: "+e.getMessage() );
}
@Override
public void onComplete() {//成功
Log.e(TAG, "onComplete: " );
}
};
//3.被觀察者訂閱觀察者
observable.subscribe(observer);
//線程切換
observable
//被訂閱者在子線程中
.subscribeOn(Schedulers.io())
//訂閱者在主線程中
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
//觀察中可以重復(fù)指定線程
observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())//主
.observeOn(Schedulers.io())//子
.observeOn(AndroidSchedulers.mainThread())//主
.subscribe(observer);
}
Android功能使用
final Retrofit homeRetrofit = new Retrofit.Builder()
.baseUrl(ApiServer.homeUrl)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
final ApiServer server = homeRetrofit.create(ApiServer.class);
final Observable<HomeBean> home = server.getHome("" + count);
home.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<HomeBean>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(HomeBean homeBean) {
List<HomeBean.ResultsBean> results = homeBean.getResults();
homeList.addAll(results);
srl.finishRefresh();
srl.finishLoadMore();
adapter.notifyDataSetChanged();
}
@Override
public void onError(Throwable e) {
Log.e("TAG", "onError()" + e.getMessage());
}
@Override
public void onComplete() {
}
});
其他操作符使用( 查看操作符)
- 創(chuàng)建操作符
//遍歷輸出
public static void rxFrom(){
Integer[] a = {1,2,3,4,5};
// Observable.fromArray(1,2,3,4)
//Observable.fromArray("a","b","c")
Observable.fromArray(a).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: "+integer);
}
});
}
//數(shù)組合并輸出
public static void rxJust(){
Integer[] a = {1,2,3};
Integer[] b = {9,8,7};
Observable.just(a,b).subscribe(new Consumer<Integer[]>() {
@Override
public void accept(Integer[] integers) throws Exception {
for (Integer i: integers) {
Log.e(TAG, "accept: "+i);
}
}
});
}
//范圍輸出
public static void rxRange(){
Observable.range(0,20).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: "+integer );
}
});
}
//定時器
public static void rxInterval(){
Observable.interval(1,1,TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "accept: "+aLong );
}
});
}
//閃屏
private void rxjavaInterval() {
final Long time = 5L;
subscribe = Observable.interval(1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e("TAG", "倒計時:" + aLong);
if (aLong < time && !subscribe.isDisposed()) {
tv.setText("記錄改變生活" + (time - aLong - 1));
} else {
Intent intent = new Intent(WelcomActivity.this, MainActivity.class);
startActivity(intent);
finish();
}
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
subscribe.dispose();
subscribe = null;
}
- 過濾操作符
//過濾輸出
public static void rxFilter(){
Integer[] a = {1,2,3,4,5};
Observable.fromArray(a).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
if (integer>3){
return true;
}
return false;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: "+integer );
}
});
}
- 變換操作符
①Map:通過指定一個Fun函數(shù)將Observeble轉(zhuǎn)換成一個新的Observable對象并發(fā)射,觀察者收到新的observable處理。
public static void rxMap(){
Integer[] a = {1,2,3,4,5};
Observable.fromArray(a).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) {
return integer+"abc";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
Log.e(TAG, "accept: "+s );
}
});
}