RxBus實現(xiàn)過程詳解

1,什么是RxBus

rxbus不是什么框架,它只是一個通過rxjava實現(xiàn)eventbus的類
在android中使用時,它還還可以引用AndroidLifecycle來解決內(nèi)存溢出問題
它是觀察者模式的一種應(yīng)用,方便了我們在不同頁面與不同線程間的通信

2,代碼

RxBus的代碼實現(xiàn)


public class RxBus {
    private volatile static RxBus mDefaultInstance;
    //事件總線
    private final Subject<Object> mBus;
    //粘性事件存儲
    private final Map<Class<?>, Object> mStickyEventMap;

    private RxBus() {
        mBus = PublishSubject.create().toSerialized();
        mStickyEventMap = new ConcurrentHashMap<>();
    }

    public static RxBus getInstance() {
        if (mDefaultInstance == null) {
            synchronized (RxBus.class) {
                if (mDefaultInstance == null) {
                    mDefaultInstance = new RxBus();
                }
            }
        }
        return mDefaultInstance;
    }

    /**
     * 發(fā)送事件
     */
    public void post(Object event) {
        mBus.onNext(event);
    }

    /**
     * 使用Rxlifecycle解決RxJava引起的內(nèi)存泄漏
     */
    public <T> Observable<T> toObservable(LifecycleOwner owner, final Class<T> eventType) {
        LifecycleProvider<Lifecycle.Event> provider = AndroidLifecycle.createLifecycleProvider(owner);
        return mBus.ofType(eventType).compose(provider.<T>bindToLifecycle());
    }

    /**
     * 判斷是否有訂閱者
     */
    public boolean hasObservers() {
        return mBus.hasObservers();
    }

    public void reset() {
        mDefaultInstance = null;
    }

    /**
     * 發(fā)送一個新Sticky事件
     */
    public void postSticky(Object event) {
        synchronized (mStickyEventMap) {
            mStickyEventMap.put(event.getClass(), event);
        }
        post(event);
    }

    /**
     * 根據(jù)傳遞的 eventType 類型返回特定類型(eventType)的 被觀察者
     * 使用Rxlifecycle解決RxJava引起的內(nèi)存泄漏
     */
    public <T> Observable<T> toObservableSticky(LifecycleOwner owner, final Class<T> eventType) {
        synchronized (mStickyEventMap) {
            LifecycleProvider<Lifecycle.Event> provider = AndroidLifecycle.createLifecycleProvider(owner);
            Observable<T> observable = mBus.ofType(eventType).compose(provider.<T>bindToLifecycle());
            final Object event = mStickyEventMap.get(eventType);

            if (event != null) {
                return observable.mergeWith(Observable.create(subscriber -> subscriber.onNext(eventType.cast(event))));
            } else {
                return observable;
            }
        }
    }

    /**
     * 根據(jù)eventType獲取Sticky事件
     */
    public <T> T getStickyEvent(Class<T> eventType) {
        synchronized (mStickyEventMap) {
            return eventType.cast(mStickyEventMap.get(eventType));
        }
    }

    /**
     * 移除指定eventType的Sticky事件
     */
    public <T> T removeStickyEvent(Class<T> eventType) {
        synchronized (mStickyEventMap) {
            return eventType.cast(mStickyEventMap.remove(eventType));
        }
    }

    /**
     * 移除所有的Sticky事件
     */
    public void removeAllStickyEvents() {
        synchronized (mStickyEventMap) {
            mStickyEventMap.clear();
        }
    }

}

事件實體類

public class MsgEvent {
    private String msg;

    public MsgEvent(String msg) {
        this.msg = msg;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }
}

接收事件(觀察者)

 RxBus.getInstance()
                .toObservable(this, MsgEvent.class)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<MsgEvent>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(MsgEvent msgEvent) {
                        text.setText("one " + msgEvent.getMsg());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

事件發(fā)送

RxBus.getInstance().post(new MsgEvent("Java"));

RxBus實現(xiàn)原理

初始化分析

首先,RxBus是一個單利模式,這沒什么可以說的,畢竟后面使用到RxBus需要是公共唯一的類。

事件總線,現(xiàn)在來說一下Subject<Object> mBus,這是一個事件總線,什么是事件總線呢?
在簡單觀察模式中,觀察者訂閱被觀察者,單被觀察者狀態(tài)或者數(shù)據(jù)發(fā)生變化時通知觀察者,這是一對一的關(guān)系。
但當(dāng)觀察者和被觀察者是多個或者不確定數(shù)量的時候,這就需要一個總線來存儲這些觀察者和被觀察者,方便在發(fā)送通知的時候找到對應(yīng)的觀察者。

public class RxBus {
    
    ...
    
    //事件總線
    private final Subject<Object> mBus;
    
    private RxBus() {
        mBus = PublishSubject.create().toSerialized();
        mStickyEventMap = new ConcurrentHashMap<>();
    }
    
    ...
}

這里是通過Rxjava中的PublishSubject.create().toSerialized() 來創(chuàng)建總線用來存儲觀察者。簡單的就把它當(dāng)做集合吧。

觀察者的創(chuàng)建分析

public class RxBus{
    
    ...
    
    /**
     * 使用Rxlifecycle解決RxJava引起的內(nèi)存泄漏
    */
    public <T> Observable<T> toObservable(LifecycleOwner owner, final Class<T> eventType) {
        LifecycleProvider<Lifecycle.Event> provider = AndroidLifecycle.createLifecycleProvider(owner);
        return mBus.ofType(eventType).compose(provider.<T>bindToLifecycle());
    }
    
    /**
     * 發(fā)送事件
    */
    public void post(Object event) {
        mBus.onNext(event);
    }
    ...
}

回顧觀察者創(chuàng)建代碼

 RxBus.getInstance()
                .toObservable(this, MsgEvent.class)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<MsgEvent>() { ... });

在觀察者創(chuàng)建時,先調(diào)用RxBus中的toObservable獲取一個回調(diào)事件類型為MsgEvent的被觀察者Observable,其中傳入了LifecycleOwner,
這個是為了防止當(dāng)頁面關(guān)閉以后,訂閱事件還沒有結(jié)束。

然后再執(zhí)行.subscribe(),并傳入一個新建的觀察者Observer,在subscribe的作用就是讓觀察者訂閱被觀察者

事件發(fā)送

執(zhí)行Subject的next進行發(fā)送

3,RxBus粘性事件

什么是粘性事件

一般情況都是先創(chuàng)建觀察者并加入到總線中,然后在執(zhí)行事件發(fā)送,觀察者就可以收到相應(yīng)的事件

但是有時候也出現(xiàn)先發(fā)送事件,然后再創(chuàng)建觀察者,這個時候就收不到之前的事件了,使用粘性事件就可以做到后創(chuàng)建的觀察者也可以收到之前的事件。

class RxBus{

    ...
    
    /**
     * 發(fā)送一個新Sticky事件
     */
    public void postSticky(Object event) {
        synchronized (mStickyEventMap) {
            mStickyEventMap.put(event.getClass(), event);
        }
        post(event);
    }

    /**
     * 根據(jù)傳遞的 eventType 類型返回特定類型(eventType)的 被觀察者
     * 使用Rxlifecycle解決RxJava引起的內(nèi)存泄漏
     */
    public <T> Observable<T> toObservableSticky(LifecycleOwner owner, final Class<T> eventType) {
        synchronized (mStickyEventMap) {
            LifecycleProvider<Lifecycle.Event> provider = AndroidLifecycle.createLifecycleProvider(owner);
            Observable<T> observable = mBus.ofType(eventType).compose(provider.<T>bindToLifecycle());
            final Object event = mStickyEventMap.get(eventType);

            if (event != null) {
                return observable.mergeWith(Observable.create(subscriber -> subscriber.onNext(eventType.cast(event))));
            } else {
                return observable;
            }
        }
    }
    
    ...
}

粘性事件實際就是創(chuàng)建一個Map<Class<?>, Object> mStickyEventMap,用于存儲所有發(fā)送過的粘性事件,當(dāng)創(chuàng)建粘性觀察者時,會從這map中知道對應(yīng)的EventType類型的被觀察者Observable,并返回
observable.mergeWith(Observable.create(subscriber -> subscriber.onNext(eventType.cast(event)))),這是返回Observable時代碼發(fā)送了從map中對應(yīng)的事件,這樣新創(chuàng)建的觀察者也可以能馬上收到之前的事件

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • RxBus、EventBus因為解耦太徹底,濫用的話,項目可維護性會越來越低;一些簡單場景更推薦用回調(diào)、Subje...
    YoKey閱讀 16,186評論 32 81
  • 在正文開始之前的最后,放上 GitHub 鏈接和引入依賴的 gradle 代碼: Github: https://...
    松江野人閱讀 6,139評論 0 1
  • 轉(zhuǎn)一篇文章 原地址:http://gank.io/post/560e15be2dca930e00da1083 前言...
    jack_hong閱讀 1,028評論 0 2
  • 作者寄語 很久之前就想寫一個專題,專寫Android開發(fā)框架,專題的名字叫 XXX 從入門到放棄 ,沉淀了這么久,...
    戴定康閱讀 7,729評論 13 85
  • 文/葉孜 我是一只流浪貓,我遇見了一個人。 她很溫柔,會輕輕的抱著我, 她很漂亮,美麗的臉龐讓我一次就記住了她, ...
    小小葉不愛謊言閱讀 388評論 0 0

友情鏈接更多精彩內(nèi)容