也談rxjava2

眾所周知Android開發(fā)不能在主線程中進行耗時操作,所以一些操作必須放在子線程中進行,這樣一來就就會涉及到涉及線程的創(chuàng)建及線程間的通信。當然Android系統(tǒng)也提供了AsyncTask,但是在處理嵌套處理方面做的并不優(yōu)雅。rxjava采用事件流的方式來解決了這一問題,當然rxjava的作用及優(yōu)點不止是這個,還有很多的功能在使用起來也是讓人愛不釋手。本文不是對rxjava的用法及功能進行介紹,而是對rxjava的內(nèi)部原理進行分析。

rxjava主要是通過發(fā)布/訂閱模式來實現(xiàn)事件的控制和處理。兩個接口和簡單的一行代碼就能明白rxjava的原理:

//發(fā)布者
public interface ObservableSource<T> {
    void subscribe(@NonNull Observer<? super T> observer);
}
//訂閱者
public interface Observer<T> {

void onSubscribe(@NonNull Disposable d);

void onNext(@NonNull T t);

void onError(@NonNull Throwable e);

void onComplete();

}
//訂閱
observableSource.subsrcibe(observer)
簡單用法
Observable.create(ObservableOnSubscribe<String> {
            //代碼1
            it.onNext("hello world")
        }).flatMap(Function<String, ObservableSource<String>> {
            val value = it
            ObservableSource {
            //代碼2
                it.onNext("flatmap-->$value")
            }
        }).subscribe(object : Observer<String> {
            override fun onComplete() {
            }
            override fun onSubscribe(d: Disposable) {
            }
            override fun onNext(t: String) {
            //代碼3
                e("MainActivity", "t--->$t")
            }
            override fun onError(e: Throwable) {
            }

        })

上面這段段代碼創(chuàng)建了三個主要的對象:ObservableOnSubscribe(A)、ObservableFlatMap(B)和Observer(C),然后通過subscribe()方法將這個鏈串了起來。首先來看下Observable.create()方法:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

這里創(chuàng)建了一個ObservableCreate對象,ObservableCreate繼承自O(shè)bsevable,Observable實現(xiàn)了ObservableSource接口。然后調(diào)用了flatMap方法,最終會創(chuàng)建ObservableFlatMap對象,這相當于B訂閱了A。當調(diào)用了subsrcibe方法時,相當于C訂閱了B。我們來看下ObservableFlatMap中的subsrcibe()方法,然后又調(diào)用了subscribeActual()這個核心方法:

 @Override
public void subscribeActual(Observer<? super U> t) {
    if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
        return;
    }
    source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}

這里的source對象就是他訂閱的A對象,調(diào)用的subscribe(Observer o)方法就是代碼方法,這里的o對象就是上面代碼中的MergeObserver對象本身,緊接著又調(diào)用了MergeObserver.onNext()方法:

 @Override
    public void onNext(T t) {
        // safeguard against misbehaving sources
        if (done) {
            return;
        }
        ObservableSource<? extends U> p;
        try {
            p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            s.dispose();
            onError(e);
            return;
        }
       //省略部分代碼
    }

這里的mapper對象就是代碼2處創(chuàng)建的Function對象,然后返回一個ObservaleSource對象,之后繼續(xù)調(diào)用了下一個訂閱者(也就是對象A)的onNext()方法。至此這個事件流就通過訂閱鏈依次到每一個訂閱者。

我們通過簡單的代碼來快速了解下rxjava的原理:

//發(fā)布者
public interface ObservableSource<T> {
    void subscribe(Observer<? super T> observer);}
//訂閱者
public interface Observer<T> {
    void onNext(T t);
}
//中間訂閱者
public interface Function<V, K> {
  K apply(V v);
}
public abstract class FlatMapObservable<T> implements Observer<T>, ObservableSource<T> {
    private T mT;
    public <K> ObservableSource<K> flatMap(Function<T, ? extends ObservableSource<K>> function) {
    return function.apply(mT);
}
    @Override
    public void onNext(T t) {
    mT = t;
    }
}

//dome
public class Test {
    public static void main(String[] args) {

    new FlatMapObservable<String>() {
        @Override
        public void subscribe(Observer<? super String> observer) {
            observer.onNext("hello  world");
        }
    }.flatMap(new Function<String, ObservableSource<Boolean>>() {
        @Override
        public ObservableSource<Boolean> apply(String s) {
            return new ObservableSource<Boolean>() {
                @Override
                public void subscribe(Observer<? super Boolean> observer) {
                    observer.onNext(false);
                }
            };
        }
    }).subscribe(new Observer<Boolean>() {
        @Override
        public void onNext(Boolean aBoolean) {
            System.out.println(aBoolean);

        }
    });
 }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容