rxjava1基本元素源碼分析

代碼示例

Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        LogUtils.loge("subscriber call ...");
        if (!subscriber.isUnsubscribed()) {
            subscriber.onNext("test1");
            subscriber.onCompleted();
        }
    }
});

Observer<String> observer = new Observer<String>() {

    @Override
    public void onCompleted() {
        LogUtils.loge("Observer onCompleted");
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(String s) {
        LogUtils.loge("Observer onNext s = " + s);
    }
};
Subscription subscription = observable.subscribe(observer);

常用類說明

被觀察者

rx.Observable

訂閱

rx.Subscription

public interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

觀察者

rx.Observer

public interface Observer<T> {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);
}

執(zhí)行流程

rx.Observable#create(rx.Observable.OnSubscribe<T>)

public static <T> Observable<T> create(OnSubscribe<T> f) {          
    // 加載RxJavaHooks的static,初始化資源
    // 返回一個Observable對象
    return new Observable<T>(RxJavaHooks.onCreate(f));
}

rx.plugins.RxJavaHooks#onCreate(rx.Observable.OnSubscribe<T>)

public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
    Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
    if (f != null) {
        // 這里其實是調用到了onObservableCreate的call方法
        return f.call(onSubscribe);
    }
    return onSubscribe;
}

rx.plugins.RxJavaHooks

static {
    init();
}

static void init() {
    onObservableStart = new Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe>() {
        @Override
        public Observable.OnSubscribe call(Observable t1, Observable.OnSubscribe t2) {
            // 調用開始訂閱的方法
            return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2);
        }
    };
    
    onObservableReturn = new Func1<Subscription, Subscription>() {
        @Override
        public Subscription call(Subscription f) {
            return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeReturn(f);
        }
    };
    
    initCreate();
}

static void initCreate() {
    onObservableCreate = new Func1<Observable.OnSubscribe, Observable.OnSubscribe>() {
        @Override
        public Observable.OnSubscribe call(Observable.OnSubscribe f) {
            /*
            這里1. 初始化RxJavaObservableExecutionHook 
                2. 返回我們傳入的Observable.OnSubscribe
            */
            return RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f);
        }
    };
}

rx.plugins.RxJavaPlugins#getObservableExecutionHook

public RxJavaObservableExecutionHook getObservableExecutionHook() {
    if (observableExecutionHook.get() == null) {
        // 從系統(tǒng)配置文件中查找一個RxJavaObservableExecutionHook的實現(xiàn)類
        Object impl = getPluginImplementationViaProperty(RxJavaObservableExecutionHook.class, System.getProperties());
        // impl = null
        if (impl == null) {
            // 沒有找到就使用這個默認的RxJavaObservableExecutionHookDefault實現(xiàn)類
            observableExecutionHook.compareAndSet(null, RxJavaObservableExecutionHookDefault.getInstance());
            // we don't return from here but call get() again in case of thread-race so the winner will always get returned
        } else {
            // we received an implementation from the system property so use it
            observableExecutionHook.compareAndSet(null, (RxJavaObservableExecutionHook) impl);
        }
    }
    return observableExecutionHook.get();
}

rx.plugins.RxJavaObservableExecutionHookDefault

rx.Observable#subscribe(rx.Observer<? super T>)

public final Subscription subscribe(final Observer<? super T> observer) {
    // 使用ObserverSubscriber對observer進行包裝
    return subscribe(new ObserverSubscriber<T>(observer));
}

rx.Observable#subscribe(rx.Subscriber<? super T>)

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

rx.Observable#subscribe(rx.Subscriber<? super T>, rx.Observable<T>)

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    
    // new Subscriber so onStart it
    subscriber.onStart();
    
    // 保證線程安全
    if (!(subscriber instanceof SafeSubscriber)) {
        // assign to `observer` so we return the protected version
        subscriber = new SafeSubscriber<T>(subscriber);
    }

    try {
        // 調用先RxJavaHooks的onObservableStart的call方法,然后再調用我們在activity中定義的onSubscribe的call方法
        // 這里其實就是調用了開始訂閱
        RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);
    } 
}

rx.plugins.RxJavaHooks#onObservableStart

public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
    Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
    if (f != null) {
        return f.call(instance, onSubscribe);
    }
    return onSubscribe;
}   

rx.plugins.RxJavaHooks#onObservableReturn

public static Subscription onObservableReturn(Subscription subscription) {
    Func1<Subscription, Subscription> f = onObservableReturn;
    if (f != null) {
        return f.call(subscription);
    }
    return subscription;
}

ObserverSubscriber類

public final class ObserverSubscriber<T> extends Subscriber<T> {
    final Observer<? super T> observer;

    public ObserverSubscriber(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        // 調用observer方法
        observer.onNext(t);
    }

    @Override
    public void onError(Throwable e) {
        observer.onError(e);
    }

    @Override
    public void onCompleted() {
        // 調用observer方法
        observer.onCompleted();
    }
}   

源碼閱讀總結

Subscription關聯(lián)觀察者和訂閱者

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    try {
        // 調用先RxJavaHooks的onObservableStart的call方法,然后再調用我們在activity中定義的onSubscribe的call方法
        // 這里其實就是調用了開始訂閱
        RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);
    } 
}

observable.onSubscribe執(zhí)行

RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);

課堂總結

Observable

  1. 觀察得到的-被觀察者
  2. 通過Observable創(chuàng)建一個可觀察的序列(create方法)
  3. 通過subscribe去注冊一個觀察者

Observer

  1. 用于接收數(shù)據(jù)-觀察者
  2. 作為Observable的subsceibe方法的參數(shù)

Subscription

  1. 訂閱,用于描述被觀察者和觀察者之間的關系
  2. 用于取消訂閱和獲取當前訂閱狀態(tài)

OnSubscribe

  1. 當訂閱時會觸發(fā)此接口的調用
  2. 在Observable內部,實際作用是向訂閱者發(fā)射數(shù)據(jù)

Subscribe

  1. 實現(xiàn)了Observer和Subscription
  2. 只有自己才能阻止自己
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容