手寫RxJava簡易框架領(lǐng)悟RxJava的美秒

RxJava筆記

前言

看此篇之前最好知道RxJava的使用。由于RxJava內(nèi)部源碼實(shí)現(xiàn)有點(diǎn)復(fù)雜,既然用拆輪子的方式來分析源碼比較難啃,不如換種方式,以造輪子的方式,將源碼中與性能、兼容性、擴(kuò)展性有關(guān)的代碼剔除,只留下核心代碼,加上我個人的理解,帶大家揭秘RxJava的實(shí)現(xiàn)原理(本文不涉及框架的使用介紹)。

一、構(gòu)建觀察者類

Subsribler在RxJava里面是一個抽象類,它實(shí)現(xiàn)了Observer接口。

public interface Observer<T> {

    void onCompleted();

    void onError(Throwable throwable);

    void onNext(T value);
}

public abstract class Subscriber<T> implements Observer<T>{

    public void onStart(){

    }
}

二、構(gòu)建被觀察者

Observable(被觀察者)擁有很多工廠方法和各式各樣的操作符。每個Observable里面都維護(hù)了一個OnSubscribe對象,并通過subscribe()里面的call(Subscriber<? super T> subscriber)方法與觀察者產(chǎn)生聯(lián)系。

public class Observable<T> {

    final OnSubscribe<T> onSubscribe;

    private  Observable(OnSubscribe<T> onSubscribe){
        this.onSubscribe = onSubscribe;
    }

    public static <T> Observable<T> create(OnSubscribe<T> onSubscribe){
        return new Observable<T>(onSubscribe);
    }

    public void subscribe(Subscriber<T> subscriber){
        subscriber.onStart();
        onSubscribe.call(subscriber);
    }

    public interface OnSubscribe<T>{
        void call(Subscriber<? super T> subscriber);
    }
}

三、RxJava的事件流雛形產(chǎn)生

通過上面寫的觀察者和被觀察者,即可寫出一個沒有操作符和線程切換功能的簡易版Rxjava。

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for(int i = 0; i < 10; i++){
                    subscriber.onNext(i);
                }
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(Integer value) {
                System.out.println("Result: "+value);
            }
        });

通過Observable.create將OnSubscribe的匿名類傳給Observable,在subscribe()時回調(diào)OnSubscribe接口中的call方法,同時call方法參數(shù)即為subscribe的參數(shù),即觀察者,因此繼續(xù)回調(diào)subscriber.onNext()即可完成觀察者里的邏輯。

結(jié)果如下:

image.png

四、玩轉(zhuǎn)RxJava里的操作符

RxJava之所以強(qiáng)大好用,與其擁有豐富靈活的操作符是分不開的。那么我們就試著為這個框架添加一個最常用的操作符:map。先看代碼:

    public <R> Observable<R> map(final Fun1<T, R> transformer){
        return create(new OnSubscribe<R>() {
            @Override
            public void call(final Subscriber<? super R> subscriber) {
                Observable.this.onSubscribe.call(new Subscriber<T>() {
                    @Override
                    public void onCompleted() {
                        subscriber.onCompleted();
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        subscriber.onError(throwable);
                    }

                    @Override
                    public void onNext(T value) {
                        subscriber.onNext(transformer.transfer(value));
                    }
                });
            }
        });
    }

    public interface Fun1<T, R>{
        R transfer(T from);
    }

測試代碼

 Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for(int i = 0; i < 10; i++){
                    subscriber.onNext(i);
                }
            }
        }).map(new Observable.Fun1<Integer, String>() {
             @Override
             public String transfer(Integer from) {
                 return String.valueOf(from)+"_Map";
             }
         }
        ).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(String value) {
                System.out.println("Result: "+value);
            }
        });

結(jié)果如下:


image.png
  • 其實(shí)RxJava每調(diào)用一次操作符的方法,就相當(dāng)于在上層數(shù)據(jù)源和下層觀察者之間橋接了一個新的Observable。橋接的Observable內(nèi)部會實(shí)例化新的OnSuscribe和Subscriber。

  • 新建的OnSuscribe的call方法負(fù)責(zé)持有目標(biāo)Subscriber,此時就可以回調(diào)subscriber的方法來完成觀察的行為了。但是這是還沒有數(shù)據(jù)源,想要獲得數(shù)據(jù)源必須調(diào)用源Observable.OnSubscribe的subscribe方法,傳入一個新的Subscriber,這樣就可以在它的onNext()方法中獲得數(shù)據(jù)源,并經(jīng)過傳入的接口處理后,發(fā)送給最終的Subscriber。

總體來說就是源Observable.OnSubscribe將Event往下發(fā)送給橋接Observable.Subscriber,最終橋接Observable.Subscriber將Event做相應(yīng)處理后轉(zhuǎn)發(fā)給目標(biāo)Subscriber。

五、RxJava里的線程切換

RxJava中最激動人心的功能是異步處理,能夠自如地切換線程。

利用subscribeOn() 結(jié)合observeOn() 來實(shí)現(xiàn)線程控制,讓事件的產(chǎn)生和消費(fèi)發(fā)生在不同的線程。 observeOn() 可以多次調(diào)用,Subscriber的執(zhí)行線程與最后一次observeOn()的調(diào)用有關(guān)。但subscribeOn() 多次調(diào)用只有第一個subscribeOn() 起作用。

這是因?yàn)?observeOn() 作用的是Subscriber,而subscribeOn() 作用的是OnSubscribe,這時事件還沒開始發(fā)送,因此subscribeOn()的線程控制可以從事件發(fā)出的開端就造成影響。

線程調(diào)度除了橋接Observable以外,RxJava還用到一個很關(guān)鍵的類Scheduler(調(diào)度器)。

5.1 Scheduler核心代碼如下:
public class Scheduler {
    private final static Scheduler ioScheduler
            = new Scheduler(Executors.newSingleThreadExecutor());

    Executor executor;

    public Scheduler(Executor executor){
        this.executor = executor;
    }

    public Worker createWorker(){
        return new Worker(executor);
    }

    public static class Worker {
        Executor executor1;
        public Worker(Executor executor1){
            this.executor1 = executor1;
        }

        public void schedule(Runnable runnable){
            executor1.execute(runnable);
        }
    }

    public static Scheduler io(){
        return ioScheduler;
    }
}

具體的Scheduler的實(shí)現(xiàn)類就不看了,但我們需要知道,能做到線程切換的關(guān)鍵是Worker的schedule方法,因?yàn)樗鼤褌鬟^來的任務(wù)放入線程池,并在新線程中執(zhí)行。

5.2 實(shí)現(xiàn)observeOn

observeOn是作用于下層Subscriber的,需要讓下層Subscriber的事件處理方法放到新線程中執(zhí)行。為此,在Observable類里面,添加如下代碼:

public Observable<T> observeOn(final Scheduler scheduler){
        return create(new OnSubscribe<T>() {
            @Override
            public void call(final Subscriber<? super T> subscriber) {
                subscriber.onStart();
                final Scheduler.Worker worker = scheduler.createWorker();
                Observable.this.onSubscribe.call(new Subscriber<T>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable throwable) {
                    }

                    @Override
                    public void onNext(final T value) {
                        worker.schedule(new Runnable() {
                            @Override
                            public void run() {
                                subscriber.onNext(value);
                            }
                        });
                    }
                });
            }
        });
    }

測試代碼如下:

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for(int i = 0; i < 10; i++){
                    subscriber.onNext(i);
                }
            }
        }).map(new Observable.Fun1<Integer, String>() {
                   @Override
                   public String transfer(Integer from) {
                       return String.valueOf(from)+"_Map";
                   }
               }
        ).observeOn(Scheduler.io()).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(String value) {
                System.out.println("Result: "+Thread.currentThread().getName());
            }
        });

結(jié)果如下:

image.png
5.3 實(shí)現(xiàn)subscribeOn

subscribeOn是作用于上層OnSubscribe的,可以讓OnSubscribe的call方法在新線程中執(zhí)行。

因此,在Observable類里面,添加如下代碼:

public Observable<T> subscribeOn(final Scheduler scheduler){
        return create(new OnSubscribe<T>() {
            @Override
            public void call(final Subscriber<? super T> subscriber) {
                scheduler.createWorker().schedule(new Runnable() {
                    @Override
                    public void run() {
                        Observable.this.onSubscribe.call(subscriber);
                    }
                });
            }
        });
    }

測試代碼如下:

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                System.out.println("Observable thread: "+Thread.currentThread().getName());
                for(int i = 0; i < 10; i++){
                    subscriber.onNext(i);
                }
            }
        }).map(new Observable.Fun1<Integer, String>() {
                   @Override
                   public String transfer(Integer from) {
                       System.out.println("Map Observable thread: "+Thread.currentThread().getName());
                       return String.valueOf(from)+"_Map";
                   }
               }
        ).observeOn(Scheduler.io()).subscribeOn(Scheduler.io()).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(String value) {
//                System.out.println("Result: "+Thread.currentThread().getName());
            }
        });

結(jié)果如下:


image.png

六、總結(jié)

相信看RxJava這個簡易版的設(shè)計對大家的啟示,比網(wǎng)上的一些源碼解析清晰的多,希望可以拋磚引玉。有時候我們總是認(rèn)為看幾篇博文貌似當(dāng)時就懂了明白了,但是這種理解或者說記憶貌似不持久。過了一段時間總是還給博主了。學(xué)習(xí)還是得深入源碼,從源碼中學(xué)習(xí),然后在結(jié)合其他人的博客查漏補(bǔ)缺,這樣才是自己的東西。大家有興趣可以把flatMap等其他操作符來自己實(shí)現(xiàn)一下。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容