RxJava源碼分析

從創(chuàng)建一個數(shù)組的Observable說起:

public static <T> Observable<T> from(T[] array) {
    ......
    return create(new OnSubscribeFromArray<T>(array));
}

public static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(RxJavaHooks.onCreate(f)); //RxJavaHooks提供了改寫OnSubscribe的鉤子,可以認為直接返回參數(shù)
}

protected Observable(OnSubscribe<T> f) {
    this.onSubscribe = f;
}

OnSubsribeFromArray:

public final class OnSubscribeFromArray<T> implements OnSubscribe<T> {
    final T[] array;
    public OnSubscribeFromArray(T[] array) {
        this.array = array;
    }

    @Override
    public void call(Subscriber<? super T> child) {
        child.setProducer(new FromArrayProducer<T>(child, array));
    }

當(dāng)調(diào)用Observable的subscribe時,會創(chuàng)建一個ObserverSubscribe包裝Observer

    public final Subscription subscribe(final Observer<? super T> observer) {
        if (observer instanceof Subscriber) {
            return subscribe((Subscriber<? super T>)observer);
        }
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        return subscribe(new ObserverSubscriber<T>(observer));
    }

    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    
        // new Subscriber so onStart it
        subscriber.onStart();

        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            observable.onSubscribe.call(subscriber);
            return subscriber;
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // in case the subscriber can't listen to exceptions anymore
            if (subscriber.isUnsubscribed()) {
                RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
            } else {
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    RxJavaHooks.onObservableError(r);
                    // TODO why aren't we throwing the hook's return value.
                    throw r; // NOPMD
                }
            }
            return Subscriptions.unsubscribed();
        }
    }

顯示調(diào)用subscriber的onStart,然后用SafeSubscriber包裝subscriber,SafeSubscriber是為了保證OnError和OnComplete只會執(zhí)行其中之一,并且執(zhí)行后不再執(zhí)行onNext

接著調(diào)用OnSubscribeFromArray.call(subscriber)

    @Override
    public void call(Subscriber<? super T> child) {
        child.setProducer(new FromArrayProducer<T>(child, array));
    }

    static final class FromArrayProducer<T>
    extends AtomicLong
    implements Producer {
        /** */
        private static final long serialVersionUID = 3534218984725836979L;

        final Subscriber<? super T> child;
        final T[] array;

        int index;

        public FromArrayProducer(Subscriber<? super T> child, T[] array) {
            this.child = child;
            this.array = array;
        }

        @Override
        public void request(long n) {
            if (n < 0) {
                throw new IllegalArgumentException("n >= 0 required but it was " + n);
            }
            if (n == Long.MAX_VALUE) {
                if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                    fastPath();
                }
            } else
            if (n != 0) {
                if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                    slowPath(n);
                }
            }
        }

        void fastPath() {
            final Subscriber<? super T> child = this.child;

            for (T t : array) {
                if (child.isUnsubscribed()) {
                    return;
                }

                child.onNext(t);
            }

            if (child.isUnsubscribed()) {
                return;
            }
            child.onCompleted();
        }

        void slowPath(long r) {
            final Subscriber<? super T> child = this.child;
            final T[] array = this.array;
            final int n = array.length;

            long e = 0L;
            int i = index;

            for (;;) {

                while (r != 0L && i != n) {
                    if (child.isUnsubscribed()) {
                        return;
                    }

                    child.onNext(array[i]);

                    i++;

                    if (i == n) {
                        if (!child.isUnsubscribed()) {
                            child.onCompleted();
                        }
                        return;
                    }

                    r--;
                    e--;
                }

                r = get() + e;

                if (r == 0L) {
                    index = i;
                    r = addAndGet(e);
                    if (r == 0L) {
                        return;
                    }
                    e = 0L;
                }
            }
        }
    }

Subscriber.setProducer:如果包裝了其他subscriber,則轉(zhuǎn)交給其他subscriber,如果沒有包裝subscriber,沒有調(diào)用request,會調(diào)用p.request(Long.MAX_VALUE)。如果之前調(diào)用了request,則調(diào)用p.request(n),n為累積的數(shù)量。

    public void setProducer(Producer p) {
        long toRequest;
        boolean passToSubscriber = false;
        synchronized (this) {
            toRequest = requested;
            producer = p;
            if (subscriber != null) {
                // middle operator ... we pass through unless a request has been made
                if (toRequest == NOT_SET) {
                    // we pass through to the next producer as nothing has been requested
                    passToSubscriber = true;
                }
            }
        }
        // do after releasing lock
        if (passToSubscriber) {
            subscriber.setProducer(producer);
        } else {
            // we execute the request with whatever has been requested (or Long.MAX_VALUE)
            if (toRequest == NOT_SET) {
                producer.request(Long.MAX_VALUE);
            } else {
                producer.request(toRequest);
            }
        }
    }

FromArrayProducer.request(n)

        @Override
        public void request(long n) {
            if (n < 0) {
                throw new IllegalArgumentException("n >= 0 required but it was " + n);
            }
            if (n == Long.MAX_VALUE) {
                if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                    fastPath();
                }
            } else
            if (n != 0) {
                if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                    slowPath(n);
                }
            }
        }

如果n 為 Long.MAX_VALUE,如果之前沒有請求過,則調(diào)用fastPath

如果n不為0,則說明之前調(diào)用了request,則要支持背壓,調(diào)用slowPath

        void fastPath() {
            final Subscriber<? super T> child = this.child;

            for (T t : array) {
                if (child.isUnsubscribed()) {
                    return;
                }

                child.onNext(t);
            }

            if (child.isUnsubscribed()) {
                return;
            }
            child.onCompleted();
        }

只要沒有unsubscribe,會對數(shù)組中每個元素調(diào)用Observer的onNext,然后調(diào)用Observer.onCompleted()

Scheduler

通過subscribeOn和observeOn可以改變操作符執(zhí)行的線程

subscribeOn只能調(diào)用一次,表示subscribe操作所在的線程,第一次ObserveOn之前所以操作都在subscribeOn所指定的線程

observeOn可以調(diào)用多次,調(diào)用之后所有的操作都在observeOn指定的線程上進行操作

subscribeOn,創(chuàng)建一個OperatorSubscribeOn將原來的Observable包裝起來。

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return create(new OperatorSubscribeOn<T>(this, scheduler));
    }

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    final Scheduler scheduler;
    final Observable<T> source;

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);

        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();

                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };

                source.unsafeSubscribe(s);
            }
        });
    }
}

從scheduler中create一個worker,調(diào)用worker的schedule執(zhí)行包裝Observable的subscribe方法。而且保證Producer的request方法在worker線程上調(diào)用。

RxJava中的Scheduler有

  • NewThreadScheduler,NewThreadWorker(包裝了一個ScheduledExecutorService)
  • ComputationScheduler:EventLoopsScheduler,根據(jù)cpu數(shù)量確定worker的數(shù)量,EventLoopWorker包裝了PoolWorker(實際是NewThreadWorker)
  • IOScheduler: CachedThreadScheduler,ThreadWorker: unsubscribe的ThreadWorker可以重用,長期不用的會被清理。

Map操作符

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return create(new OnSubscribeMap<T, R>(this, func));
}

創(chuàng)建一個OnSubscribeMap包裝map函數(shù)和原Observable。

public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {

    final Observable<T> source;

    final Func1<? super T, ? extends R> transformer;

    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }

    @Override
    public void call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);
    }

subscribe時會生成一個MapSubscriber包裝原來subscriber。

    static final class MapSubscriber<T, R> extends Subscriber<T> {

        final Subscriber<? super R> actual;

        final Func1<? super T, ? extends R> mapper;

        boolean done;

        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            R result;

            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }

            actual.onNext(result);
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;

            actual.onError(e);
        }


        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }

        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }

當(dāng)MapSubscriber的onNext方法被調(diào)用時,先用map函數(shù)轉(zhuǎn)換一下,然后傳給包裝subscriber的onNext

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容