RxJava2 源碼學(xué)習(xí)

基于 RxJava 2.1.6 RxJava github 地址

為什么要用 RxJava

簡(jiǎn)潔!簡(jiǎn)潔!簡(jiǎn)潔?。ㄖ匾氖虑檎f(shuō)三遍)

RxJava 最大的優(yōu)點(diǎn)就是簡(jiǎn)潔。簡(jiǎn)潔的代碼能讓人心曠神怡,減少 bug 。

RxJava 是一種新的編程模式 響應(yīng)式編程

響應(yīng)式編程是一種基于異步數(shù)據(jù)流概念的編程模式。
數(shù)據(jù)流就像一條河:它可以被觀(guān)測(cè),被過(guò)濾,被操作,或者為新的消費(fèi)者與另外一條流合并為一條新的流。

以上是 RxJava Essentials 中文翻譯版 對(duì)響應(yīng)式編程的介紹。使用 RxJava 可以讓我們?cè)?java 語(yǔ)言中體驗(yàn)什么是響應(yīng)式編程。響應(yīng)式編程有兩個(gè)重要概念

1. 基于異步
2. 數(shù)據(jù)流

如何使用 RxJava

RxJava 的設(shè)計(jì)理念基于 觀(guān)察者模式

在 RxJava 中首先明白有兩個(gè)對(duì)象觀(guān)察者被觀(guān)察者。

觀(guān)察者和被觀(guān)察者可以存在不同的線(xiàn)程之中,所以存在觀(guān)測(cè)線(xiàn)程被觀(guān)測(cè)線(xiàn)程*

觀(guān)察者和被觀(guān)察者通過(guò) subscribe 發(fā)生『訂閱』關(guān)系。

RxJava 提供一些列的鏈?zhǔn)秸{(diào)用,使用起來(lái)如下:

Observable
    .create(...)
    .observeOn(...)
    .subscribeOn(...)
    .subscribe(...)    

subscribe() 方法為鏈?zhǔn)秸{(diào)用的最后一層,create() 和 subscribe() 方式之前可以任意設(shè)置其他操作。

上面提到『響應(yīng)式編程』中的數(shù)據(jù)流就像是一條河。
create() 方法可以比喻為河流的『上游發(fā)源地』,subscribe() 則為河流的『入??凇弧?在這兩個(gè)方法之間我們可以添加觀(guān)察、過(guò)濾等操作。

在鏈?zhǔn)秸{(diào)用中增加一些數(shù)據(jù)處理

Observable
    .create(...)
    .observeOn(...)
    .subscribeOn(...)
    .map(...)
    .filter(...)
    .subscribe(...) 

在 RxJava2 中提供一系列可觀(guān)測(cè)對(duì)象(也就是上面鏈?zhǔn)秸{(diào)用的 Observable 等同功能)

  • io.reactivex.Flowable
  • io.reactivex.Observable
  • io.reactivex.Single
  • io.reactivex.Completable
  • io.reactivex.Maybe

這里我們寫(xiě)一個(gè)例子

    Observable.create((ObservableOnSubscribe<String>) e -> {
        for (int i = 0; i < 5; i++) {
            e.onNext(i + "");
        }
    })
            .observeOn(Schedulers.io())
            .subscribeOn(Schedulers.io())
            .map(s -> {
                System.out.println("map:" + s);
                return s + "_map";
            })
            .filter(o -> {
                System.out.println("flat:" + o);
                if (o.compareTo("3") < 0) {
                    return true;
                }
                return false;
            })
            .subscribe(o -> System.out.println("subscribe:" + o));

輸出結(jié)果如下:

map:0
flat:0_map
subscribe:0_map
map:1
flat:1_map
subscribe:1_map
map:2
flat:2_map
subscribe:2_map
map:3
flat:3_map
map:4
flat:4_map

上面的例子中,我們?cè)?strong>被觀(guān)察者中發(fā)射了 5 個(gè)數(shù)據(jù)源,觀(guān)察者和被觀(guān)察著都在同一個(gè)線(xiàn)程中,通過(guò) map 對(duì)象給每個(gè)發(fā)射的對(duì)象拼接一個(gè) 『_map』字符串,通過(guò) filter 過(guò)濾了比『3』字符串小的對(duì)象。

所以最終在觀(guān)察者中接收到了 3 次消息。

源碼分析 RxJava 中的核心代碼

訂閱關(guān)系的產(chǎn)生

創(chuàng)建被觀(guān)察著

『被觀(guān)測(cè)者』是事件產(chǎn)生的一方,創(chuàng)建方式也有很多種。這里列舉一下 Observable 創(chuàng)建的方式

  1. 通過(guò) create() 方法創(chuàng)建

     public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
         ObjectHelper.requireNonNull(source, "source is null");
         return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
     }
    
  2. 通過(guò) just() 方法創(chuàng)建

     public static <T> Observable<T> just(T item) {
         ObjectHelper.requireNonNull(item, "The item is null");
         return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
     }
    
  3. 通過(guò) fromArray() 方法創(chuàng)建

     public static <T> Observable<T> fromArray(T... items) {
         ObjectHelper.requireNonNull(items, "items is null");
         if (items.length == 0) {
             return empty();
         } else
         if (items.length == 1) {
             return just(items[0]);
         }
         return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
     }
    

其中 ObservableFromArray、ObservableJust、ObservableCreate 都是 Observable 的子類(lèi),而 Observable 本身是一個(gè)抽象。

這些子類(lèi)主要實(shí)現(xiàn) Observe 的抽象方法

protected abstract void subscribeActual(Observer<? super T> observer);

再看一下 RxJavaPlugins.onAssembly() 方法

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

這里需要說(shuō)明一下 RxJavaPlugins.onAssembly()是一個(gè) Hock,如果不做任何 hock 處理,RxJavaPlugins.onAssembly()會(huì)直接返回傳入的對(duì)象。onObservableAssembly 靜態(tài)成員變量為 null

我們用 ObservableCreate 舉例

ObservableCreate 的構(gòu)造方法需要傳入一個(gè) ObservableOnSubscribe 對(duì)象

public ObservableCreate(ObservableOnSubscribe<T> source) {
    this.source = source;
}

并重載 Observable 的 subscribeActual()

protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

subscribeActual() 方法傳入了一個(gè) Observer 對(duì)象并且包裝到 CreateEmitter 對(duì)象中,然后調(diào)用
observer.onSubscribe(parent);source.subscribe(parent);

創(chuàng)建觀(guān)察者

觀(guān)察者比較簡(jiǎn)單,需要實(shí)現(xiàn) 4 個(gè)方法

new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(String s) {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
}
訂閱

通常情況下,我們不需要重載 Observer 的每一個(gè)方法,RxJava 內(nèi)部提供了另一個(gè) LambdaObserver 把 Observer 的四個(gè)方法拆分為 4 個(gè)部分。

Observable.subscribe() 可以只傳入一個(gè) Consumer 對(duì)象。

內(nèi)部會(huì)把 Consumer 包裹在 LambdaObserver 中,并且返回 LambdaObserver

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Disposable> onSubscribe) {
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

    subscribe(ls);

    return ls;
}

然后調(diào)用接收 Observer 的 subscribe() 方法

@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
    } ……
}

這里調(diào)用了 Observable 的 subscribeActual(observer) 方法。
這里就完成了 觀(guān)察者被觀(guān)察著 之間的訂閱關(guān)系

rxJava_01.png

如下一段代碼

    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            for (int i = 0; i < 5; i++) {
                System.out.println("subscribe:" + i);
                e.onNext(i + "");
            }
        }
    })
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    System.out.println("accept:" + s);
                }
            });

調(diào)用的時(shí)序圖如下


rxJava_02.png

線(xiàn)程調(diào)度原理分析

上部分分析的訂閱關(guān)系的創(chuàng)建,都是在當(dāng)前線(xiàn)程之中。RxJava 可以指定 觀(guān)察線(xiàn)程觀(guān)察者線(xiàn)程

observeOn 原理分析

Observable 的 observeOn 方法有三個(gè)

  • Observable<T> observeOn(Scheduler scheduler)
  • Observable<T> observeOn(Scheduler scheduler, boolean delayError)
  • Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize)

這三個(gè)方法中,前兩個(gè)方法都會(huì)再次調(diào)用第三個(gè)方法

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

我們看到這里創(chuàng)建了一個(gè) ObservableObserveOn 對(duì)象,ObservableObserveOnObservable的子類(lèi)。

我們看下 ObservableObserveOnsubscribeActual

protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

這里我們假設(shè)傳入的 SchedulerSchedulers.io() 進(jìn)一步跟蹤分析會(huì)發(fā)現(xiàn) Schedulers.io() 返回的是 IoScheduler
所以會(huì)走上面代碼的 else 分支。

先忽略 scheduler.createWorker() 過(guò)程,先看下 ObserveOnObserver

這里的 sourceObservableCreate ,而 source.subscribe() 會(huì)調(diào)用 ObservableCreate.subscribeActual(observer) 然后調(diào)用 ObserveOnObserver.onSubscribe() 方法

    @Override
    public void onSubscribe(Disposable s) {
        if (DisposableHelper.validate(this.s, s)) {
            this.s = s;
            if (s instanceof QueueDisposable) {
                ……
            }
            queue = new SpscLinkedArrayQueue<T>(bufferSize);
            actual.onSubscribe(this);
        }
    }

這里傳入的 Disposable 是 CreateEmitter 對(duì)象,所以不會(huì)走 if 分支。

然后創(chuàng)建了一個(gè) SpscLinkedArrayQueue 對(duì)象,

緊接著執(zhí)行 actual.onSubscribe() 也就是 LambdaObserver.onSubscribe()

后面應(yīng)該執(zhí)行的是 ObserveOnObserver.onNext() 方法

    public void onNext(T t) {
        if (done) {
            return;
        }

        if (sourceMode != QueueDisposable.ASYNC) {
            queue.offer(t);
        }
        schedule();
    }

看到會(huì)把 onNext(T t) 傳入?yún)?shù)放入隊(duì)列之中,然后執(zhí)行 schedule

    void schedule() {
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }

worker.schedule() 接收的是一個(gè) Runnable 對(duì)象,所以我們從這里可以看出 Observer 的 onNext() 、onComplete()、onError() 等方法都是在線(xiàn)程之中執(zhí)行。

接下來(lái)我們看下線(xiàn)程的創(chuàng)建

ObservableObserveOnsubscribeActual() 方法中的

Scheduler.Worker w = scheduler.createWorker();

Schedulers.io()的跟蹤過(guò)程比較簡(jiǎn)單,最終會(huì)得到一個(gè) IoScheduler

public IoScheduler(ThreadFactory threadFactory) {
    this.threadFactory = threadFactory;
    this.pool = new AtomicReference<CachedWorkerPool>(NONE);
    start();
}

直接看 createWorker()

public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

再看 EventLoopWorker 的構(gòu)造函數(shù)

    EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        this.tasks = new CompositeDisposable();
        this.threadWorker = pool.get();
    }

繼續(xù)看下 CachedWorkerPool 的構(gòu)造方法

    CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
        this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
        this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
        this.allWorkers = new CompositeDisposable();
        this.threadFactory = threadFactory;

        ScheduledExecutorService evictor = null;
        Future<?> task = null;
        if (unit != null) {
            evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
            task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
        }
        evictorService = evictor;
        evictorTask = task;
    }      

終于我們找到了線(xiàn)程池相關(guān)的代碼

 evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
 task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);

pool.get() 方法

    ThreadWorker get() {
        if (allWorkers.isDisposed()) {
            return SHUTDOWN_THREAD_WORKER;
        }
        while (!expiringWorkerQueue.isEmpty()) {
            ThreadWorker threadWorker = expiringWorkerQueue.poll();
            if (threadWorker != null) {
                return threadWorker;
            }
        }

        // No cached worker found, so create a new one.
        ThreadWorker w = new ThreadWorker(threadFactory);
        allWorkers.add(w);
        return w;
    }

這里返回了一個(gè) ThreadWorker

看下 ThreadWorker 的創(chuàng)建過(guò)程

    ThreadWorker(ThreadFactory threadFactory) {
        super(threadFactory);
        this.expirationTime = 0L;
    }

看下父類(lèi)的構(gòu)造函數(shù)

public NewThreadWorker(ThreadFactory threadFactory) {
    executor = SchedulerPoolFactory.create(threadFactory);
}

再跟下去

public static ScheduledExecutorService create(ThreadFactory factory) {
    final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
    if (exec instanceof ScheduledThreadPoolExecutor) {
        ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
        POOLS.put(e, exec);
    }
    return exec;
}

這里又出現(xiàn)了一個(gè)線(xiàn)程池

下面開(kāi)始看 worker.schedule(this)

    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (tasks.isDisposed()) {
            // don't schedule, we are unsubscribed
            return EmptyDisposable.INSTANCE;
        }

        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }

繼續(xù)跟蹤下去

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}

終于我們找到了執(zhí)行線(xiàn)程的方法
總結(jié)一下真?zhèn)€流程

rxJava_03.png

subscribeOn 流程分析

subscribeOn 方法只有一個(gè)

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

和 observeOn 類(lèi)似,這里把 Observable 包裝到 ObservableSubscribeOn 對(duì)象中。

直接看subscribeActual方法

public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    s.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

這里把 Observe 包裹在 SubscribeOnObserver 中,并執(zhí)行

    void setDisposable(Disposable d) {
        DisposableHelper.setOnce(this, d);
    }

這只設(shè)置了 Disposable 只能被執(zhí)行一次。

重點(diǎn)看下 scheduler.scheduleDirect(new SubscribeTask(parent))

先看一些 SubscribeTask 類(lèi)

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}

這里看出 SubscribeTask 也是一個(gè) Runnable 在 run 方法中執(zhí)行 source.subscribe(parent),

所以 SubscribeTask 是觀(guān)察者線(xiàn)程的關(guān)鍵

繼續(xù)依照 IoScheduler 為例

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);

    w.schedule(task, delay, unit);

    return task;
}

這里和 subscribeOn 原理一樣了,通過(guò) Worker 對(duì)象把觀(guān)察者的行為設(shè)置在線(xiàn)程之中。

操作符原理分析

RxJava 里面有很多操作符,這里找一個(gè) map 操作進(jìn)行分析。其他更復(fù)雜的操作不一一分析。

先看一個(gè)例子

    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            for (int i = 0; i < 5; i++) {
                System.out.println("subscribe:" + i);
                e.onNext(i + "");
            }
        }
    })
            .map(new Function<String, String>() {
                @Override
                public String apply(String s) throws Exception {
                    System.out.println("map:" + s);
                    return s + "_map";
                }
            })
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    System.out.println("accept:" + s);
                }
            });

輸出結(jié)果如下:

subscribe:0
map:0
accept:0_map
subscribe:1
map:1
accept:1_map
subscribe:2
map:2
accept:2_map
subscribe:3
map:3
accept:3_map
subscribe:4
map:4
accept:4_map

可以看 ObservableOnSubscribe 里面每次發(fā)送的數(shù)據(jù)都會(huì)到 Function.apply() 方法進(jìn)行『過(guò)濾』,把每個(gè)發(fā)送的 String 轉(zhuǎn)換一下后再發(fā)送給 Consumer

看下 map() 方法

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

根據(jù)上面的經(jīng)驗(yàn),我們直接看 ObservableMap 對(duì)象的 subscribeActual()

@Override
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}

然后繼續(xù)看 MapObserver 對(duì)象的 onNext,其中 function 就是我們?cè)?map() 方法中傳入的 Function 對(duì)象。

先看構(gòu)造函數(shù)

    MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
        super(actual);
        this.mapper = mapper;
    }

在看 onNext()

    public void onNext(T t) {
        if (done) {
            return;
        }

        if (sourceMode != NONE) {
            actual.onNext(null);
            return;
        }

        U v;

        try {
            v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
        } catch (Throwable ex) {
            fail(ex);
            return;
        }
        actual.onNext(v);
    }

可以看出,在執(zhí)行 actual.onNext(v) 之前,會(huì)執(zhí)行 mapper.apply(t) 從而完成轉(zhuǎn)換。

其他更復(fù)雜的操作符,基本都是在各種特定的 XXXObserver 中的 onNext(T t) 方法中做特殊處理。

參考資料

NotRxJava懶人專(zhuān)用指南

RxJava 從入門(mén)到放棄再到不離不棄

RxJava github 地址

觀(guān)察者模式

RxJava系列6(從微觀(guān)角度解讀RxJava源碼)

Rxjava 2 源碼分析

Rxjava 2 源碼分析 (2)

RxJava Essentials 中文翻譯版

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容