Rxjava2源碼分析

分析目的

  1. Observable發(fā)出數據和Observer接收數據
  2. 如何實現線程調度和操作符原理

文章僅分析Observable不分析帶背壓的Flowable

Observable創(chuàng)建和訂閱

一個常見例子

 Observable.create((ObservableOnSubscribe<String>) emitter -> {
            emitter.onNext("test");
            emitter.onComplete();
        }).map(s -> s + "111")
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull String s) {
                        LPLogger.e("onNext:" + s);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        e.printStackTrace();
                        LPLogger.e("onError:" + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        LPLogger.e("onComplete");
                    }
                });
    }

以上常見例子展示了Observable 創(chuàng)建,訂閱和調度以及轉換操作符的整個過程下面分析各個過程

Observable創(chuàng)建

Observable.create()方法參數ObservableOnSubscribe

public interface ObservableOnSubscribe<T> {
    void subscribe(ObservableEmitter<T> e) throws Exception;
}  

該接口方法subscribe(ObservableEmitter)參數ObservableEmitter實現了onNext(),onError()onComplete()即我們使用發(fā)送數據的地方

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

其中RxJavaPlugins.onAssembly()方法是hook方法,默認返回原值即ObservableCreate(source)是一個Observable,實現了subscribeActual()此方法是訂閱真正執(zhí)行的方法,先不用關注
只需要注意Observable.create()方法傳入參數ObservableOnSubscribe返回ObservableCreate即可

Observable訂閱

先不看線程調度和操作符轉換處理僅看最簡單的部分subscribe()

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        //..
        subscribeActual(observer);
    }

實際是調用的subscribeActual(observer),即前面創(chuàng)建的ObservableCreate里面的subscribeActual()

protected void subscribeActual(Observer<? super T> observer) {
     //實現自ObservableEmitter
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    //這里是外界的回調
    observer.onSubscribe(parent);
    try {
        //這里source即create傳入的ObservableOnSubscribe
        //parent即前面?zhèn)魅氲腛bservableEmitter,即emitter.onNext("test")中的emitter
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

看下CreateEmitter源碼

static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {

    private static final long serialVersionUID = -3434801548987643227L;

    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("xxx"));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onError(Throwable t) {
        if (!tryOnError(t)) {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public boolean tryOnError(Throwable t) {
        if (t == null) {
            t = new NullPointerException("xxx");
        }
        if (!isDisposed()) {
            try {
                observer.onError(t);
            } finally {
                dispose();
            }
            return true;
        }
        return false;
    }

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }
  1. 以上onNext(T)調用了observer.onNext(t)即讓訂閱者接收到發(fā)送端數據
    我們可以看到subscribeActual(observer)中的方法已經將ObservableObserver聯系起來
  2. 同時注意到CreateEmitter是繼承自Disposable即我們可以使用回調中的onSubscribe(Disposable d)中的Disposable去結束Observable發(fā)送,當我們subscribe(Consumer)獲取的返回值同理
  3. 從上面代碼可以得出我們之前記住的一些結論如onError()onComplete()只能調用一次,onSubscribeonNext()之前執(zhí)行等

線程調度

subcribeOn

subscribeOn(Schedulers.io())返回值類似于create(),返回的是ObservableSubscribeOn<T>(this, scheduler)ObservableCreate<T>(source)多一個線程處理,典型的裝飾器模式應用
源碼也和ObservableCreate類似

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        //和CreateEmitter一樣也是包裝Observer,最終調用Observer.onNext之類方法
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        observer.onSubscribe(parent);
        //線程切換SubscribeTask是Runnable最終執(zhí)行的還是source.subscribe(parent);
        //source.subscribe(parent)執(zhí)行后會執(zhí)行到ObservableCreate的subscribeActual()
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

繼續(xù)看

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    //實際是一個Dispose并設置到ObservableSubscribeOn方便管理任務
    DisposeTask task = new DisposeTask(decoratedRun, w);
    w.schedule(task, delay, unit);
    return task;
}

執(zhí)行調用如下不一一展開:
w.schedule(task, delay, unit)->IoScheduler.EventLoopWorker.schedule->NewThreadWorker.scheduleActual()->ScheduledRunnable.call()->ScheduledRunnable.run()->DisposeTask.run()->new SubscribeTask(parent)
即在使用線程池執(zhí)行了source.subscribe(parent)

observeOn

observeOn(AndroidSchedulers.mainThread())中創(chuàng)建的是ObservableObserveOn

protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        //創(chuàng)建worker
        Scheduler.Worker w = scheduler.createWorker();
        //還是和source.subscribe(parent)一致
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

查看ObserveOnObserver是繼承Runnable

public void onNext(T t) {
    if (done) {
        return;
    }
    //先把值存儲到隊列中,然后切換線程處理
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}
//run方法運行到這里
void drainNormal() {
    int missed = 1;

    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = downstream;

    for (;;) {
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }

        for (;;) {
            boolean d = done;
            T v;

            try {
                v = q.poll();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                disposed = true;
                upstream.dispose();
                q.clear();
                a.onError(ex);
                worker.dispose();
                return;
            }
            boolean empty = v == null;

            if (checkTerminated(d, empty, a)) {
                return;
            }

            if (empty) {
                break;
            }
            //切到線程后再執(zhí)行onNext
            a.onNext(v);
        }

        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}
  1. ObserveOnObserveronNext()中把發(fā)送值存到隊列,然后調用schedule()
  2. 調用的是worker.schedule(this);和前面分析subcribeOn一樣直接查看run(),此時已經完成線程切換
  3. run()中調用的是drainNormal(),從1中存儲的隊列中取出值發(fā)送
切換線程分析

AndroidSchedulers.mainThread()實現

private static final class MainHolder {
    static final Scheduler DEFAULT
        = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}

private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
        new Callable<Scheduler>() {
            @Override public Scheduler call() throws Exception {
                return MainHolder.DEFAULT;
            }
        });

/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}

繼續(xù)看HandlerScheduler

public Worker createWorker() {
return new HandlerWorker(handler, async);
}

private static final class HandlerWorker extends Worker {
private final Handler handler;
private final boolean async;

private volatile boolean disposed;

HandlerWorker(Handler handler, boolean async) {
    this.handler = handler;
    this.async = async;
}

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    if (run == null) throw new NullPointerException("run == null");
    if (unit == null) throw new NullPointerException("unit == null");

    if (disposed) {
        return Disposables.disposed();
    }

    run = RxJavaPlugins.onSchedule(run);
    
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    //設置了runnable()后續(xù)發(fā)送到主線程會執(zhí)行run()
    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.

    if (async) {
        message.setAsynchronous(true);
    }

    handler.sendMessageDelayed(message, unit.toMillis(delay));

    // Re-check disposed state for removing in case we were racing a call to dispose().
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }

    return scheduled;
}

ObservableObserveOn中的subscribeActual()創(chuàng)建的worker就是HandlerWorker

  1. AndroidSchedulers.mainThread() 創(chuàng)建了一個帶主線程HandlerHandlerScheduler
  2. schedule()中通過handler.sendMessageDelayed(msg,delay)發(fā)送消息到主線程,因為message設置了Runnable(),消息發(fā)送到主線程后會調用message.callback.run()從而調用schedule()中的參數run(),即實際完成切換線程回調到ObserveOnObserverrun()
操作符原理

僅分析下map操作符,其它操作符類似
直接看ObservableMap源碼

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qd.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}
  1. 可以看到subscribeActual()直接是source.subscribe(parent)類型,我們直接看MapObserveronNext()
  2. 非聚合模式sourceMode的值是NONE,相當于Observer.onNext(mapper.apply(t))mapper.apply(t)則是我們寫的lambda表達式的返回值即s + "111",由此可以看出map是直接把值返回
總結
  1. subscribeActual()方法中實際完成訂閱,subscribe訂閱后各個操作符才完成訂閱,即訂閱是自下而上進行的
  2. 線程操作是通過線程池和Handler完成切換
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • RxJava2源碼分析 RxJava的鼎鼎大名相信Android開發(fā)的同學都非常熟悉了,其實不僅僅有RxJava,...
    BlackFlag閱讀 310評論 0 3
  • 概述 書接上文,上節(jié)我們分析了Rxjava是如何對被觀察線程進行調度的,這節(jié)我們來分析下Rxjava是如何對觀察者...
    yhihua0607閱讀 1,017評論 1 1
  • 該類里有3個重要方法: create subscribe subscribeActual(子類Observable...
    o動感超人o閱讀 886評論 2 50
  • 前言 最近由于項目需要自己搭建了網絡框架,采用時下非常流行的Rxjava2 + Retrofit搭建, Rxjav...
    yhihua0607閱讀 2,929評論 0 8
  • 今天感恩節(jié)哎,感謝一直在我身邊的親朋好友。感恩相遇!感恩不離不棄。 中午開了第一次的黨會,身份的轉變要...
    余生動聽閱讀 10,834評論 0 11

友情鏈接更多精彩內容