RxJava使用簡史(一)RxJava1的回顧

前言

使用RxJava 已經(jīng)有一年多的時間,從 RxJava 1.0 +Retrofit 2.0 網(wǎng)絡(luò)請求框架,包括常用的 RxBus,RxPermission 等,到后面的RxJava2.0+Retrofit2.0。然而始終沒有認真細致的去歸納總結(jié),基于這個緣由,本文將由淺入深,由Rx的基本用法,到RxJava和Retrofit的封裝,到Rx的源碼分析,做一次系統(tǒng)的學(xué)習(xí)。

https://github.com/ReactiveX/RxJava

本文將分為三個方面闡述:

  • RxJava 是什么
  • RxJava 基本用法
  • RxJava 的用途

一:RxJava的認識

想要用一句話總結(jié)RxJava,或許只有作者的那句話才能精準概括了:RxJava實現(xiàn)了jvm的響應(yīng)式擴展,通過觀察序列來實現(xiàn)異步和基于事件的庫。
如果要加上更細致的闡述那就是:它擴展了觀察者模式,支持數(shù)據(jù)或者事件的序列,添加了一系列的操作符,允許你使用聲明的方式組合序列,同時處理了線程、同步、線程安全、和并發(fā)的問題。

RxJava 依賴代碼

compile "io.reactivex:rxjava:1.2.3"
compile "io.reactivex:rxandroid:1.2.1"

既然是觀察者模式,那么就必然有觀察者和被觀察者,在RxJava中,有兩個類需要搞清楚

Observable :被觀察者,Observer :觀察者。
二者通過 subscribe() 方法來實現(xiàn)訂閱的關(guān)系,這樣一來 Observable 可以在需要的時候發(fā)送事件通知 Observer。

Observable 的創(chuàng)建

    //創(chuàng)建一個 Observable (被觀察者)
    Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("hi");
            subscriber.onCompleted();
        }
    });

創(chuàng)建 Observable 的時候,需要傳遞一個 OnSubscribe 對象 ,這個對象的指定了一個回調(diào),當訂閱事件發(fā)生的時候,call 方法里面的內(nèi)容就會被調(diào)用。在回調(diào)的方法中 可以看到 Subscriber 對象,這個對象實現(xiàn)了 Observer 和 Subscription 接口,事實上,它是 Observer(觀察者) 的升級版。

而 Observable 創(chuàng)建的時候做了什么呢?

 public static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(RxJavaHooks.onCreate(f));
}

RxJavaHooks 的 onCreate() 返回了一個 OnSubscribe 對象,OnSubscribe 是 Action1 的子類,是一個只有一個 call() 方法的接口。RxJavaHooks 的內(nèi)部,則調(diào)用了 RxJavaHooks 的一個接口 Func1 的 call()方法的回調(diào)。

Observer 的創(chuàng)建

    //創(chuàng)建一個 Observer (觀察者)
    Observer observer = new Observer() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onNext(Object o) {
        }
    };

或者可以這樣寫:

     Subscriber subscriber = new Subscriber() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onNext(Object o) {
        }
    };

Observer只是一個接口,他們的作用是一樣的,區(qū)別在于 Subscriber 比 Observer 多了兩個方法 onStart() 和 unsubscribe(), 前者是在事件還沒有發(fā)送之前調(diào)用的,可以進行一些準備的動作,后者則是用于取消訂閱,在發(fā)生訂閱行為的時候,被觀察者會持有觀察者的引用,如果在頁面銷毀或者view銷毀或者其他情況下沒有進行解綁,則會發(fā)生內(nèi)存泄漏。

訂閱

  //訂閱
    observable.subscribe(observer);

這里有個奇怪的現(xiàn)象,你會發(fā)現(xiàn)是被觀察者訂閱了觀察者。之所以這樣實現(xiàn),是為了更好的設(shè)計。
而訂閱行為到底干了什么,為什么要傳遞進去一個接口呢?看下源碼

public final Subscription subscribe(final Observer<? super T> observer) {
    if (observer instanceof Subscriber) {
        return subscribe((Subscriber<? super T>)observer);
    }
    if (observer == null) {
        throw new NullPointerException("observer is null");
    }
    return subscribe(new ObserverSubscriber<T>(observer));
}

重點看下 observable 的 subscribe 方法,他需要傳遞一個 ObserverSubscriber的對象 ,它Subscriber 的子類,而且提供了一個構(gòu)造函數(shù)來傳遞 observer。
在真正的 subscribe 方法中,我們可以看到這么一句話:

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
 // validate and proceed
    if (subscriber == null) {
        throw new IllegalArgumentException("subscriber can not be null");
    }
    if (observable.onSubscribe == null) {
        throw new IllegalStateException("onSubscribe function can not be null.");
        /*
         * the subscribe function can also be overridden but generally that's not the appropriate approach
         * so I won't mention that in the exception
         */
    }

    // new Subscriber so onStart it
    subscriber.onStart();

    /*
     * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
     * to user code from within an Observer"
     */
    // if not already wrapped
    if (!(subscriber instanceof SafeSubscriber)) {
        // assign to `observer` so we return the protected version
        subscriber = new SafeSubscriber<T>(subscriber);
    }

    // The code below is exactly the same an unsafeSubscribe but not used because it would
    // add a significant depth to already huge call stacks.
    try {
        // allow the hook to intercept and/or decorate
        RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
        // special handling for certain Throwable/Error/Exception types
        Exceptions.throwIfFatal(e);
        // in case the subscriber can't listen to exceptions anymore
        if (subscriber.isUnsubscribed()) {
            RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
        } else {
            // if an unhandled error occurs executing the onSubscribe we will propagate it
            try {
                subscriber.onError(RxJavaHooks.onObservableError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                // so we are unable to propagate the error correctly and will just throw
                RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                // TODO could the hook be the cause of the error in the on error handling.
                RxJavaHooks.onObservableError(r);
                // TODO why aren't we throwing the hook's return value.
                throw r; // NOPMD
            }
        }
        return Subscriptions.unsubscribed();
    }
}

subscriber.onStart(); 在 observable 中 調(diào)用了 subscriber 的 onStart()方法,
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); 這個方法,其實就是OnSubscribe對call 方法的調(diào)用。

經(jīng)過上面的三個步驟,就完成了一次RxJava的基本使用。然而事實上,不管是 observable 的創(chuàng)建或者是 observer 或者 Subscriber ,再或者綁定的使用,都有很多種方法可以選擇,事實上,要根據(jù)業(yè)務(wù)邏輯或者不同的場景來選擇使用。比如:

  Observable<String> just = Observable.just( "are", "u", "ok");
    just.subscribe(new Subscriber<String>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(String s) {
            Log.i(TAG, s);
        }
    });

通過just 將傳入的參數(shù)依次的發(fā)送出來,這樣,在 Subscriber 的回調(diào)中(onNext),就可以依次的打印出那幾個字符串。
假定有一個數(shù)據(jù)源,我需要依次發(fā)送數(shù)據(jù)源里面的數(shù)據(jù),from方法則會很恰當,就像這樣:

    String[]  array = new String[]{"are","u","ok"};
    Observable<String> observableStr = Observable.from(array);
    observableStr.subscribe(new Subscriber<String>() {
        @Override
        public void onCompleted() {
            
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(String s) {

        }
    });

同樣的,Subscribe() 方法也可以通過Action來實現(xiàn)某些不完全的定義,比如這樣:

    Action1<String> action1 = new Action1<String>() {
        @Override
        public void call(String s) {
            Log.i(TAG, s);
        }
    };
    observableStr.subscribe(action1);

Scheduler 調(diào)度器

通過 Schduler 調(diào)度器來指定某段代碼運行在哪個線程之中,因為在日常開發(fā)中,我們總是會將耗時的請求放在后臺,通過回調(diào)在前臺更新ui,基于這些重要的線程控制,Scheduler 提供了一系列的方法來進行線程切換。諸如:

    //直接運行,即在當前的線程運行
    Schedulers.immediate();
    //計算的線程,主要用于事件輪詢,回調(diào)的處理等。不能進行耗時的操作。
    Schedulers.computation();
    //開啟一個新的線程,并且運行在新線程
    Schedulers.newThread();
    // 讀寫、網(wǎng)絡(luò)、等耗時操作
    Schedulers.io();
    //Android 主線程 
    AndroidSchedulers.mainThread();

Observable 提供了一些方法來指定我們需要的線程:

observableStr.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

subscribeOn() 方法指定了subscribe() 發(fā)生在IO線程,observeOn() 則指定了其回調(diào)發(fā)生在主線程。準確的說前者是指定事件產(chǎn)生的線程,后者是指定事件消費的線程。

變換

所謂變換,即把事件序列 中的對象或者整個序列進行變換,得到我想要的事件或者序列。通過一個簡單的例子就可以看出來轉(zhuǎn)換的強大之處了。
比如說我想加載指定路徑下的ImageView

    String mImgPath = "...";
    Observable.just(mImgPath)
            .map(new Func1<String, Bitmap>() {
                @Override
                public Bitmap call(String s) {
                    Bitmap bitmap = BitmapFactory.decodeFile(s);
                    return bitmap;
                }
            })
            .subscribe(new Action1<Bitmap>() {
                @Override
                public void call(Bitmap bitmap) {
                    imageView.setImageBitmap(bitmap);
                }
            });

map()方法需要一個 Func1 的回調(diào) ,F(xiàn)unc1是一個簡單的接口,和 Action 不同的地方在于,它是有返回值的。這就達到了轉(zhuǎn)換的目的。我們看到,我們發(fā)送事件的只是一個字符串,通過轉(zhuǎn)換,在我們需要Bitmap的時候,在map() 中把字符串做了轉(zhuǎn)換。

除此之外,還有一個轉(zhuǎn)換的方法 flatMap(),與map()方法不同的地方在于,flatMap中返回的是一個 Observable 對象。通過 flatMap 將事件進行了拆分,可以進行更加細致和精準的處理:
假如有這個一個類,在發(fā)起事件的時候傳遞整個類的對象,最后我想要的是String content,這種情況使用flatMap 就更加方便了。

public class DataBean implements Serializable {
public Data data;
public String message;
public int code;

class Data {
    public String title;
    public Content content;
}

class Content {
    public String content;
}

}

final List<DataBean> beanList = new ArrayList<>();
    Subscriber<String> subscriber1 = new Subscriber<String>() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onNext(String s) {
        }
    };

    Observable.from(beanList)
            .flatMap(new Func1<DataBean, Observable<DataBean.Content>>() {
                @Override
                public Observable<DataBean.Content> call(DataBean bean) {
                    return Observable.just(bean.data.content);
                }
            })
            .flatMap(new Func1<DataBean.Content, Observable<String>>() {
                @Override
                public Observable<String> call(DataBean.Content content) {
                    return Observable.just(content.content);
                }
            })
            .subscribe(subscriber1);

我們會發(fā)現(xiàn) map 是一對一的關(guān)系,而 flatMap 是一對多的關(guān)系,通過 flatmap 可以實現(xiàn)很強大的功能。

小示例

假如我們需要把一個Bitmap格式的圖片保存為png或者jpg文件 ,假如使用RxJava,結(jié)合上面的線程調(diào)度器,我們該怎么寫呢?直接上代碼吧。

 Observable.create(new SaveObservable(bitmap))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new SaveSubscriber());


    //Observable
 private class SaveObservable implements Observable.OnSubscribe<String> {
    private Bitmap drawingCache = null;

    public SaveObservable(Bitmap drawingCache) {
        this.drawingCache = drawingCache;
    }

    @Override
    public void call(Subscriber<? super String> subscriber) {
        if (drawingCache == null) {
            subscriber.onError(new NullPointerException("獲取圖片失敗"));
        } else {
            try {
                File imageFile = new File(imagePath,imageName);
                FileOutputStream outStream = new FileOutputStream(imageFile);
                drawingCache.compress(Bitmap.CompressFormat.JPEG, 100, outStream);
                subscriber.onNext(Environment.getExternalStorageDirectory() + "/store");
                subscriber.onCompleted();
                outStream.flush();
                outStream.close();
            } catch (IOException e) {
                subscriber.onError(e);
            }
        }
    }
}

//Subscriber
private class SaveSubscriber extends Subscriber<String> {

    @Override
    public void onCompleted() {
      //  Toast.makeText(context, "保存成功", Toast.LENGTH_SHORT).show();
    }

    @Override
    public void onError(Throwable e) {
        Log.i(getClass().getSimpleName(), e.toString());
        Toast.makeText(context, "保存失敗:" + e.toString(), Toast.LENGTH_SHORT).show();
    }

    @Override
    public void onNext(String s) {
        Toast.makeText(context, "保存至" + s, Toast.LENGTH_SHORT).show();
    }
}

這樣以來,我們的代碼看起來非常的清爽,而且邏輯清晰。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容