RxJava系列_01ObservableEmitter

RxJava源碼打算花一周看看, 時間太少了, 不知道一周可以看幾個操作符;

按以下的demo來學習一個RxJava的思路;

demo:

Observable
        .create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                LogUtils.log(getClass(), "threadName:" + Thread.currentThread().getName());
                e.onNext(1);
                e.onComplete();
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                LogUtils.log(getClass(), "onSubscribe()");
            }

            @Override
            public void onNext(Integer value) {
                LogUtils.log(getClass(), "onNext()");
            }

            @Override
            public void onError(Throwable e) {
                LogUtils.log(getClass(), "onError()");
            }

            @Override
            public void onComplete() {
                LogUtils.log(getClass(), "onComplete()");
            }
        });
  • 打印結果:
04-23 23:58:14.755 3681-3681/com.test V/AndroidTest: ->onSubscribe()
04-23 23:58:14.755 3681-3681/com.test V/AndroidTest: ->subscribe()
04-23 23:58:14.755 3681-3681/com.test V/AndroidTest: ->onNext()
04-23 23:58:14.755 3681-3681/com.test V/AndroidTest: ->onComplete()
ObservableEmitter.onNext, ObservableEmitter.onComplete到底做了哪些事情?

一、Observable.create:

1.1 Observable.create:
public abstract class Observable<T> implements ObservableSource<T> {
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        return new ObservableCreate<T>(source);
    }
}

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
}
  • 創(chuàng)建ObservableCreate實例, ObservableCreate持有ObservableOnSubscribe的引用;

二、Observable.subscribe:

2.1 Observable.subscribe:
public abstract class Observable<T> implements ObservableSource<T> {
    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        subscribeActual(observer);
    }
}

public final class ObservableCreate<T> extends Observable<T> {
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        /**
         * 1. CreateEmitter與Observer互相持有對方的引用, gc如果采用引用計數(shù)算法, 解決不了這種情況;
         * 2. 可以看出來, 被觀察者持有CreateEmitter的引用, CreateEmitter持有觀察者Oberser的引用, 
         *    被觀察者Observable通過CreateEmitter.onXXX方法觸發(fā)觀察者Observer對應的onXXX方法;
         */
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        source.subscribe(parent);
    }
}
2.2 CreateEmitter:
static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }
}
操作符簡單調(diào)度的流程
  • demo太短, 可能并不能體現(xiàn)這個操作流程的巧妙之處;
如果換成我們自己來寫觀察者模式, 可能出現(xiàn)的代碼是如下方式:
public void register() {
    Observable observable = new Observable();
    observable.addObserver(new Observer(1));
    observable.addObserver(new Observer(2));
    observable.addObserver(new Observer(3));
}

public void publish() {
    observable.publish(something);
}

public class Observable {
    public void publish(something) {
        if(succ) {...}
        else {...}
    }
}
  • 反觀Rxjava, 沒有那么多套路, 直接一個鏈式調(diào)用搞定;
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容