RxJava核心代碼解析

RxJava是一個非常好用的框架,它的作用是將一個復(fù)雜的業(yè)務(wù)邏輯,分拆成一個個小的功能模塊,然后將這些功能串起來,達到思路清晰的目的。

關(guān)于如何詳細使用rxjava,這里就不再介紹了,可以點擊 這里
進行查看,rxjava核心代碼很簡潔巧妙,本文主要介紹核心代碼實現(xiàn)。

我們先看一個例子。

        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                Log.e(TAG, "call");
                subscriber.onNext("hello");
                subscriber.onNext("beauty");
                subscriber.onCompleted();
            }
        })
        .subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {         
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(String s) {
                Log.e(TAG, "onNext s: " + s);
            }
        });

05-31 14:36:06.469 29241-29241/? E/rx2: call
05-31 14:36:06.470 29241-29241/? E/rx2: onNext s: hello
05-31 14:36:06.470 29241-29241/? E/rx2: onNext s: beauty
05-31 14:36:06.470 29241-29241/? E/rx2: onCompleted

這個例子創(chuàng)建了一個被觀察者和一個訂閱者,被觀察者產(chǎn)生了兩個字符串,然后傳遞給了訂閱者。

我們看一下源碼

public class Observable<T> {
    //創(chuàng)建被觀察者,需要傳入訂閱事件f,這個是被觀察者和訂閱者之間的紐帶,我們就稱呼它為訂閱紐帶吧,避免混淆
    public final static <T> Observable<T> create(OnSubscribe<T> f) {
        //hook里面什么都沒有做,只是將f返回
        return new Observable<T>(hook.onCreate(f));
    }
    //保存訂閱紐帶
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }

    //訂閱方法
    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

    private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        //通知訂閱者已經(jīng)開始
        subscriber.onStart();
        try {
            //onSubscribe是訂閱紐帶,這里直接調(diào)用訂閱紐帶的call方法,并將訂閱者傳了過去,然后執(zhí)行subscriber.onNext("hello");方法,那么訂閱者也就收到了該事件
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            return Subscriptions.unsubscribed();
        }
    }
}

這個例子,從代碼實現(xiàn)層來看就是訂閱者訂閱被觀察者后,觸發(fā)訂閱紐帶的call方法,call方法產(chǎn)生數(shù)據(jù),再傳給訂閱者。

現(xiàn)在將例子改一下,要將產(chǎn)生的字符串后面加上“*”號,我們可以通過map方法

        //為了方便后面說明,我們稱create返回的被觀察者為原始被觀察者,這個OnSubscribe我們稱為原始訂閱紐帶
        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                Log.e(TAG, "call");
                subscriber.onNext("hello");
                subscriber.onNext("beauty");
                subscriber.onCompleted();
            }
        })
        //map方法里面有map被觀察者,map訂閱紐帶,map訂閱者,見后面的代碼
        .map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                Log.e(TAG, "map call s: " + s);
                return s + "*";
            }
        })
        //這個Subscriber我們稱為最終訂閱者
        .subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted");
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(String s) {
                Log.e(TAG, "onNext s: " + s);
            }
        });

打印結(jié)果如下
05-31 14:39:03.707 29692-29692/? E/rx2: call
05-31 14:39:03.707 29692-29692/? E/rx2: map call s: hello
05-31 14:39:03.708 29692-29692/? E/rx2: onNext s: hello*
05-31 14:39:03.708 29692-29692/? E/rx2: map call s: beauty
05-31 14:39:03.708 29692-29692/? E/rx2: onNext s: beauty*
05-31 14:39:03.708 29692-29692/? E/rx2: onCompleted
我們可以看到,在原始訂閱紐帶里面調(diào)用來subscriber.onNext("hello");后,會將字符串傳遞到map的call方法里面,然后再調(diào)用最終訂閱者的onNext方法

這個例子將一件事情分拆層兩個步驟去實現(xiàn),產(chǎn)生字符串,加工字符串。我們看一下具體是怎么做到的

這個例子跟上一個的差別是在中間加了map方法,我們看一下map方法的實現(xiàn)

public class Observable<T> {
      public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        //這里看一下OperatorMap類,func是要加工數(shù)據(jù)的方法
        return lift(new OperatorMap<T, R>(func));
    }

    //該方法返回的是被觀察者,相當(dāng)于新建了一個觀察者,為了方便說明,我們稱該返回的被觀察者是map被觀察者,訂閱紐帶為map訂閱紐帶
    //注意,下面的call方法是理解整個框架的關(guān)鍵,有點繞,冷靜下來分析一下
    //我們現(xiàn)在就來梳理這個例子的調(diào)用流程,在這之前,我們定義了幾個名稱:原始被觀察者、原始訂閱紐帶、map被觀察者、map訂閱紐帶、map訂閱者,最終訂閱者,先熟悉一下這幾個詞
    //1.在最后調(diào)用subscribe(new Subscriber<String>())的時候,也是就是執(zhí)行map被觀察者的subscribe方法時,會調(diào)用map訂閱紐帶的call方法,也就是下面lift里面call方法
    //2.調(diào)用hook.onLift(operator).call(o)得到的是map訂閱者,o是最終訂閱者,map訂閱者是最終訂閱者的代理,里面的onNext方法多了一層在轉(zhuǎn)換
    //3.onSubscribe是這個類的訂閱紐帶,也就是初始訂閱紐帶。調(diào)用onSubscribe.call(st)方法執(zhí)行初始訂閱者的call方法,也就是上面代碼塊的subscriber.onNext("hello")這里。
    //這里的subscriber訂閱者是map訂閱者,調(diào)用了它的call方法,在OperatorMap這個類里面,執(zhí)行的是里面的訂閱者的onNext(T t)方法,調(diào)用o.onNext(transformer.call(t));。
    //t就是傳進來的"hello",通過transformer進行轉(zhuǎn)換,這里的o是最終訂閱者,t是原始訂閱紐帶傳過來的"hello"字符串,這樣就完成了轉(zhuǎn)換,所以最終訂閱者o就收到來轉(zhuǎn)化后的數(shù)據(jù)
    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                Subscriber<? super T> st = hook.onLift(operator).call(o);
                st.onStart();
                onSubscribe.call(st);
            }
        });
    }
}

public final class OperatorMap<T, R> implements Operator<R, T> {
    private final Func1<? super T, ? extends R> transformer;
    //加工數(shù)據(jù)的類,負責(zé)轉(zhuǎn)換數(shù)據(jù)
    public OperatorMap(Func1<? super T, ? extends R> transformer{
        this.transformer = transformer;
    }
    //為了方便說明,我們稱該方法返回的訂閱者是map訂閱者,從代碼來看,生成的這個訂閱者是最終訂閱者的代理,唯一不同的是多了一步transformer.call(t)的轉(zhuǎn)化
    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o){
        return new Subscriber<T>(o) {
            @Override
            public void onCompleted() {
                o.onCompleted();
            }
            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }
            @Override
            public void onNext(T t) {
                //這里的o是最終訂閱者,t是原始訂閱紐帶傳過來的
                o.onNext(transformer.call(t));
            }
        };
    }
}

這里有一點繞,我畫了一張圖來梳理這個關(guān)系
rxjava調(diào)用關(guān)系

上面的說明再結(jié)合這張圖,關(guān)系就能梳理清楚了

總結(jié)一下,調(diào)用subscribe訂閱時,觸發(fā)map被觀察者的call方法,調(diào)用onSubscribe.call(st),st又是最終訂閱者的代理(該代理獲取到數(shù)據(jù)后加工數(shù)據(jù)),onSubscribe是原始訂閱紐帶,觸發(fā)產(chǎn)生數(shù)據(jù)的onNext方法,然后將數(shù)據(jù)傳遞給map訂閱者,加工完數(shù)據(jù)后,再傳遞給最終訂閱者的onNext方法

每次添加一個map方法時,在原先的連接關(guān)系中間插入了一層關(guān)系,對上一層的被觀察者來說,插入了一個訂閱者;對下一層對訂閱者而言,插入了一個被觀察者,通過這種方式,又將原先對鏈條連接上了。

對于map方法是這個邏輯,對應(yīng)其它的方法,也是類似的原理

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • http://blog.csdn.net/yyh352091626/article/details/5330472...
    奈何心善閱讀 3,647評論 0 0
  • 轉(zhuǎn)一篇文章 原地址:http://gank.io/post/560e15be2dca930e00da1083 前言...
    jack_hong閱讀 1,029評論 0 2
  • 當(dāng)你主動去做的時候,會促進你的思考。把任何一件事,放到整體去看。 舉個例子:當(dāng)你脖子有點疼痛,供血不足,導(dǎo)致頭暈。...
    持續(xù)行動派閱讀 186評論 0 0
  • 緣起 先看一段jsx代碼 其中有這么一段話: The Point#set() functions are also...
    清水蘆葦閱讀 998評論 0 0
  • 2016好像一眨眼就過去了,15年圣誕節(jié)的時候小火車群里除了旺爺之外我們六個都是單身狗, 我還記得15年圣誕節(jié)的時...
    Wan9rui閱讀 197評論 0 0

友情鏈接更多精彩內(nèi)容