Android 帶大家進入RxJava的世界

Rx指的是ReactiveX,也是Reactive Extensions的縮寫,是一個使用可觀察數(shù)據(jù)流進行異步編程的編程接口,結合了觀察者模式、迭代器模式和函數(shù)式編程的精華,是一種編程思想的突破,它影響了許多其它的程序庫和框架以及編程語言

RxJava里最核心的兩個東西Observable和Subscriber,Observable指的是被觀察者、事件源,Subscriber指的是被觀察者、訂閱者、用戶。

舉個簡單的例子,像微信里面的服務號和公眾號,我們關注了某個服務號或公眾號,只要發(fā)出消息,我們都能同時收到。這里的服務號和公眾號就是Observable,Subscriber指的就是我們用戶。

看起來RxJava蠻像觀察者模式,不過有一點不同Observable如果沒有Subscriber不會發(fā)出任何事件

下面就讓我們一起進入RxJava的世界吧

參考資料學習網(wǎng)址

1、Grokking RxJava
2、github地址
3、ReactiveX/RxJava文檔中文版
4、ReactiveX官網(wǎng)
5、拋物線博客

配置環(huán)境

在builde.gradle里面添加

compile 'io.reactivex:rxandroid:1.2.1'
compile 'io.reactivex:rxjava:1.1.6'

基本實現(xiàn)

1、Hello world

創(chuàng)建Observable對象,通過Observable.create()來創(chuàng)建,sub.onNext()可以發(fā)出信息,最后調(diào)用sub.onCompleted()來完成,還有一個sub.onError()方法是出現(xiàn)異常提供的一個方法,一旦調(diào)用了sub.onError()或者sub.onCompleted(),后面的邏輯代碼都不會執(zhí)行。

Observable<String> myObservable = Observable.create(
        new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> sub) {
                sub.onNext("Hello, world1!");
                sub.onNext("Hello, world2!");
                sub.onCompleted();
            }
        }
);

創(chuàng)建Subscriber對象,實例化Subscriber抽象類,實現(xiàn)onNext、onCompleted、onError三個方法,依次響應Observable對象里面的sub.onNext()、sub.onCompleted()和sub.onError()方法

Subscriber<String> mySubscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d("rxjava","onNext="+s); 
    }
    @Override
    public void onCompleted() { 
        Log.d("rxjava","onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.d("rxjava","onError="+e);
    }
};

最后Subscriber訂閱Observable,這里Observable可以被多個Subscriber訂閱

myObservable.subscribe(mySubscriber);
myObservable.subscribe(mySubscriber2);
myObservable.subscribe(mySubscriber3);

2、簡化代碼

上面的代碼可以轉(zhuǎn)變?yōu)檫@樣:

Observable<String> myObservable = Observable.just("Hello, world1!","Hello, world2!");
Action1<String> onNextAction = new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println(s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    @Override
    public void call(Throwable e) {
    }
};
Action0 onCompletedAction = new Action0() {
    @Override
    public void call() {
    }
};
// 自動創(chuàng)建 Subscriber ,并使用 onNextAction 來定義 onNext()
myObservable.subscribe(onNextAction);
// 自動創(chuàng)建 Subscriber ,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
myObservable.subscribe(onNextAction,onErrorAction);
// 自動創(chuàng)建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted()
myObservable.subscribe(onNextAction,onErrorAction,onCompletedAction);

Observable.just("Hello, world1!","Hello, world2!")會直接創(chuàng)建Observable對象,
然后依次調(diào)用sub.onNext("Hello, world1!"); sub.onNext("Hello, world2!"); sub.onCompleted();

一般onErrorAction和onCompletedAction我們可以不用管理 最后代碼可優(yōu)化成:

Observable.just("Hello, world1!","Hello, world2!")
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        System.out.println(s);
                    }
                });

java8語法lambda可以寫成

Observable.just("Hello, world!")
                .subscribe(s -> System.out.println(s));

3、操作符map

簡單的一個例子,傳遞一個字符串,然后獲取它的長度,最后打印出來,在RxJava里我們可以這樣實現(xiàn):

Observable.just("Hello, world!")
        .map(new Func1<String,Integer>(){
            @Override
            public Integer call(String s){
                return s.length();
            }
        })
        .subscribe(new Action1<Integer>(){
            @Override
            public void call(Integer i){
                System.out.println(Integer.toString(i));
            }
        });

這里map方法需要結合Func1接口來使用,F(xiàn)unc1里第一個String類型是對應接收just類型,第二個Integer類型是返回值,供下一個對象接收

4、操作符from

如果我們要循環(huán)遍歷一個數(shù)組或者集合,在RxJava里面可以使用from操作符來實現(xiàn)

String[] items = { "0", "1", "2", "3", "4", "5" };
        Observable.from(items)
                .subscribe(new Action1<String>(){
                    @Override
                    public void call(String str){
                        //依次遍歷打印items
                        System.out.println(str);
                    }
                });

操作符from會循環(huán)遍歷方法參數(shù)里面的數(shù)組或者集合,然后依次調(diào)用onNext()方法

5、操作符flatMap

如果我們要實現(xiàn)多重循環(huán),可以使用flatMap操作符來實現(xiàn)

String[][] itemArray = {{"1","2","3"},{"4","5","6"},{"7","8","9"}};
Observable.from(itemArray)
        .flatMap(new Func1<String[],Observable<String>>(){
            @Override
            public Observable<String> call(String[] s){
                return Observable.from(s);
            }
        })
        .subscribe(new Subscriber<String>(){
            @Override
            public void onCompleted(){
            }
            @Override
            public void onError(Throwable e){
            }
            @Override
            public void onNext(String str){
                Log.d("rxJava","str="+str);
            }
        });

上面代碼執(zhí)行后,依次打印str=1到9,這里要注意的是Func1第二個參數(shù)類型是Observable<?>

6、操作符filter

字面理解應該就知道它的作用了,過濾不需要的數(shù)據(jù)

Observable.just(1, 2, 3, 4, 5)
    .filter(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer item) {
            return( item < 4 );
        }
    })
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }
        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }
        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

上面代碼執(zhí)行結果,只會打印1到3,后面4和5不符合,則過濾

7、操作符merge

合并多個Observables

Observable<Integer> odds = Observable.just(1, 3, 5);
Observable<Integer> evens = Observable.just(2, 4, 6);
Observable.merge(odds, evens)
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer item) {
                System.out.println("Next: " + item);
            }
            @Override
            public void onError(Throwable error) {
                System.err.println("Error: " + error.getMessage());
            }
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }
        });

上面代碼執(zhí)行后結果:依次打印的是1、3、5、2、4、6

*  Javadoc: merge(Iterable)
*  Javadoc: merge(Iterable,int)
*  Javadoc: merge(Observable[])
*  Javadoc: merge(Observable,Observable) (接受二到九個Observable)

除了傳遞多個Observable給merge,你還可以傳遞一個Observable列表List,數(shù)組,甚至是一個發(fā)射Observable序列的Observable,merge將合并它們的輸出作為單個Observable的輸出

8、操作符timer和interval

5秒后打印數(shù)據(jù),第一個參數(shù)是多少時間執(zhí)行,第二個參數(shù)是時間單位

Observable.timer(5,TimeUnit.SECONDS)
        .subscribe(new Action1<Long>(){
            @Override
            public void call(Long aLong){
                Log.d("rxJava","5秒后執(zhí)行"+aLong);
            }
        });

1秒后每5秒循環(huán)打印,第一個參數(shù)為多少時間后執(zhí)行,第二個為沒多少時間執(zhí)行,第三個是時間單位

Observable.interval(1,5,TimeUnit.SECONDS)
        .subscribe(new Action1<Long>(){
            @Override
            public void call(Long aLong){
                Log.d("rxJava","1秒后每5秒循環(huán)執(zhí)行"+aLong);
            }
        });

這里aLong返回的值應該是打印的次數(shù)

9、線程切換

主要通過subscribeOn()和 observeOn()來實現(xiàn)
subscribeOn()是控制Observable.OnSubscribe的線程切換, subscribeOn()的線程切換發(fā)生在 OnSubscribe中,即在它通知上一級 OnSubscribe時,這時事件還沒有開始發(fā)送,因此控制可以從事件發(fā)出的開端就造成影響
observeOn()則是控制Subscriber的線程切換,observeOn() 的線程切換則發(fā)生在它內(nèi)建的 Subscriber中,即發(fā)生在它即將給下一級 Subscriber發(fā)送事件時,因此控制的是它后面的線程
Schedulers.io():IO線程
AndroidSchedulers.mainThread():主線程
Schedulers.newThread():新開線程

Observable.just(1, 2, 3, 4) // IO 線程,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // 新線程,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO 線程,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread) 
    .subscribe(subscriber);  // Android 主線程,由 observeOn() 指定

10、防止按鈕重復點擊

1秒鐘內(nèi)只會執(zhí)行第一次點擊

RxView.clicks(view)
        .throttleFirst(1,TimeUnit.SECONDS)
        .subscribe(new Observer<Object>(){
            @Override
            public void onCompleted(){
            }
            @Override
            public void onError(Throwable e){
            }
            @Override
            public void onNext(Object o){
            }
        });

RxView是RxBinding庫里的對象,需要添加環(huán)境

compile 'com.jakewharton.rxbinding:rxbinding:0.4.0'

throttleFirst():在每次事件觸發(fā)后的一定時間間隔內(nèi)丟棄新的事件

11、RxBus代替EventBus

public class RxBus {
    // 主題
    private final Subject<Object, Object> bus;
    // PublishSubject只會把在訂閱發(fā)生的時間點之后來自原始Observable的數(shù)據(jù)發(fā)射給觀察者
    private RxBus() {
        bus = new SerializedSubject<>(PublishSubject.create());
    }
    public static RxBus get() {
        return RxBusHolder.sInstance;
    }
    public void unSubscribe(CompositeSubscription compositeSubscription){
        if (compositeSubscription != null && !compositeSubscription.isUnsubscribed())
            compositeSubscription.unsubscribe();
    }
    private static class RxBusHolder {
        private static final RxBus sInstance = new RxBus();
    }
    // 提供了一個新的事件
    public void post(Object o) {
        bus.onNext(o);
    }
    // 根據(jù)傳遞的 eventType 類型返回特定類型(eventType)的 被觀察者
    public <T> Observable<T> toObserverable(Class<T> eventType) {
        return bus.ofType(eventType);
    }
}

RxBus提供了三個方法toObserverable()獲取Observable對象、unSubscribe()解除訂閱、post()發(fā)送數(shù)據(jù)。

在MainActivity中實現(xiàn)

    public class MainActivity extends AppCompatActivity {
        private CompositeSubscription allSubscription = new CompositeSubscription();
        @Override
        protected void onCreate(Bundle savedInstanceState){
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);
            ...
            initRxBus();
        }
        private void initRxBus(){
            //注冊訂閱RxBus并添加到CompositeSubscription,在onDestroy()里可以統(tǒng)一解除訂閱
            allSubscription.add(RxBus.get().toObserverable(OneEvent.class).subscribe(this::response));
        }
        @Override
        protected void onDestroy(){
            super.onDestroy();
            //解除訂閱
            RxBus.get().unSubscribe(allSubscription);
        }
        /**
         * RxBus響應接收數(shù)據(jù)方法
         * @param event
         */
        public void response(OneEvent event) {
            Toast.makeText(getApplicationContext(),event.msg,Toast.LENGTH_LONG).show();
        }
        class OneEvent {
            String msg;
            public OneEvent(String msg) {
                this.msg = msg;
            }
        }
    }

最后在需要發(fā)送數(shù)據(jù)的地方調(diào)用,跟EventBus還是滿相似的

RxBus.get().post(new OneEvent("hello bus"));

12、結合Retrofit使用
這里我就直接拿介紹Retrofit那篇文章的例子在這里說明下

@POST("query")
Observable<PostQueryInfo> searchRx(@Query("type") String type, @Query("postid") String postid);
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("http://www.kuaidi100.com/")
                 //添加數(shù)據(jù)解析ConverterFactory
                .addConverterFactory(GsonConverterFactory.create()) 
                 //添加RxJava
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())   
                .build();
        GitHubService apiService = retrofit.create(GitHubService.class);
        apiService.searchRx("yuantong","500379523313")
                //訪問網(wǎng)絡切換異步線程
                .subscribeOn(Schedulers.io())
                //響應結果處理切換成主線程
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<PostQueryInfo>() {
                    @Override
                    public void onCompleted() {
                         //請求結束回調(diào)
                    }
                    @Override
                    public void onError(Throwable e) {
                         //錯誤回調(diào)
                        e.printStackTrace();
                    }
                    @Override
                    public void onNext(PostQueryInfo postQueryInfo) {
                         //成功結果返回
                        Log.e("APP",postQueryInfo.getNu());
                    }
                });

Retrofit支持RxJava可以返回Observable對象,所以我們就可以直接拿來用,很方便吧
.subscribeOn(Schedulers.io()請求網(wǎng)絡切換成IO線程
.observeOn(AndroidSchedulers.mainThread())切換成主線程更新UI
onNext(PostQueryInfo postQueryInfo)成功回調(diào)解析數(shù)據(jù)
public void onError(Throwable e)異常回調(diào)拋出異常

Tips

配置java8 lambda環(huán)境

1、在Module中的build.gradle中添加,然后Make Module 'xxx'

apply plugin: 'me.tatarka.retrolambda'
buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath 'me.tatarka:gradle-retrolambda:3.2.5'
    }
}
repositories {
    mavenCentral()
}
android {
    compileOptions {
        sourceCompatibility JavaVersion.VERSION_1_8
        targetCompatibility JavaVersion.VERSION_1_8
    }
    ...
}

2、測試 下面代碼不報紅,配置環(huán)境OK

textView.setOnClickListener(v -> 
    Toast.makeText(getApplicationContext(),"lambda",Toast.LENGTH_LONG).show()
);

學到最后其實RxJava只是剛入了個門,還有很多很多需要我們?nèi)W習的,這里只是跟大家一起看了看RxJava的世界是什么樣子的,如果要學好,還是要多去看看api文檔,多練習才能真正的熟練使用它。

革命尚未成功,我們?nèi)孕枧?/strong>

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容