Android 用RxJava模擬一個(gè)EventBus ———RxBus

** 本篇文章已授權(quán)微信公眾號(hào) guolin_blog (郭霖)獨(dú)家發(fā)布*

RxBus的核心功能是基于Rxjava的,既然是模擬EventBus,我們需要搞清楚RxJava滿足實(shí)現(xiàn)EventBus的那些條件,這樣才能更好的實(shí)現(xiàn)RxBus。

EventBus是Android上的一個(gè)事件發(fā)布/訂閱的事件總線框架,可以充分的解耦,簡(jiǎn)化了四大組件、UI線程與子線程的間的事件傳遞等等。它基本工作流程如下:

  • 1、訂閱:EventBus.getDefault().register(this);
  • 2、發(fā)送事件:EventBus.getDefault().post(event);
  • 3、接受、處理事件:onEventXXX(Object event);
  • 2、取消訂閱:EventBus.getDefault().unregister(this);

根據(jù)EventBus的工作流程,我們的RxBus首先需要自身的實(shí)例,這一點(diǎn)我們可以仿照EventBus的getDefault()方法,通過(guò)一個(gè)單例來(lái)實(shí)現(xiàn)。有了RxBus實(shí)例就可以進(jìn)行訂閱了,在RxJava中有個(gè)Subject類,它繼承Observable類,同時(shí)實(shí)現(xiàn)了Observer接口,因此Subject可以同時(shí)擔(dān)當(dāng)訂閱者和被訂閱者的角色,這里我們使用Subject的子類PublishSubject來(lái)創(chuàng)建一個(gè)Subject對(duì)象(PublishSubject只有被訂閱后才會(huì)把接收到的事件立刻發(fā)送給訂閱者),在需要接收事件的地方,訂閱該Subject對(duì)象,之后如果Subject對(duì)象接收到事件,則會(huì)發(fā)射給該訂閱者,此時(shí)Subject對(duì)象充當(dāng)被訂閱者的角色。完成了訂閱,在需要發(fā)送事件的地方將事件發(fā)送給之前被訂閱的Subject對(duì)象,則此時(shí)Subject對(duì)象做為訂閱者接收事件,然后會(huì)立刻將事件轉(zhuǎn)發(fā)給訂閱該Subject對(duì)象的訂閱者,以便訂閱者處理相應(yīng)事件,到這里就完成了事件的發(fā)送與處理。最后就是取消訂閱的操作了,Rxjava中,訂閱操作會(huì)返回一個(gè)Subscription對(duì)象,以便在合適的時(shí)機(jī)取消訂閱,防止內(nèi)存泄漏,如果一個(gè)類產(chǎn)生多個(gè)Subscription對(duì)象,我們可以用一個(gè)CompositeSubscription存儲(chǔ)起來(lái),以進(jìn)行批量的取消訂閱。

到這里我們已經(jīng)結(jié)合EventBus對(duì)RxBus的可行性以及大概的實(shí)現(xiàn)流程進(jìn)行了分析,接下來(lái)結(jié)合實(shí)現(xiàn)代碼再做進(jìn)一步的解釋:

public class RxBus {
    private static volatile RxBus mInstance;
    private SerializedSubject<Object, Object> mSubject;
    private HashMap<String, CompositeSubscription> mSubscriptionMap;

    private RxBus() {
        mSubject = new SerializedSubject<>(PublishSubject.create());
    }

    public static RxBus getInstance() {
        if (mInstance == null) {
            synchronized (RxBus.class) {
                if (mInstance == null) {
                    mInstance = new RxBus();
                }
            }
        }
        return mInstance;
    }

    /**
     * 發(fā)送事件
     *
     * @param o
     */
    public void post(Object o) {
        mSubject.onNext(o);
    }

    /**
     * 返回指定類型的Observable實(shí)例
     *
     * @param type
     * @param <T>
     * @return
     */
    public <T> Observable<T> toObservable(final Class<T> type) {
        return mSubject.ofType(type);
    }

    /**
     * 是否已有觀察者訂閱
     *
     * @return
     */
    public boolean hasObservers() {
        return mSubject.hasObservers();
    }

    /**
     * 一個(gè)默認(rèn)的訂閱方法
     *
     * @param type
     * @param next
     * @param error
     * @param <T>
     * @return
     */
    public <T> Subscription doSubscribe(Class<T> type, Action1<T> next, Action1<Throwable> error) {
        return tObservable(type)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(next, error);
    }

    /**
     * 保存訂閱后的subscription
     * @param o
     * @param subscription
     */
    public void addSubscription(Object o, Subscription subscription) {
        if (mSubscriptionMap == null) {
            mSubscriptionMap = new HashMap<>();
        }
        String key = o.getClass().getName();
        if (mSubscriptionMap.get(key) != null) {
            mSubscriptionMap.get(key).add(subscription);
        } else {
            CompositeSubscription compositeSubscription = new CompositeSubscription();
            compositeSubscription.add(subscription);
            mSubscriptionMap.put(key, compositeSubscription);
        }
    }

    /**
     * 取消訂閱
     * @param o
     */
    public void unSubscribe(Object o) {
        if (mSubscriptionMap == null) {
            return;
        }

        String key = o.getClass().getName();
        if (!mSubscriptionMap.containsKey(key)){
            return;
        }
        if (mSubscriptionMap.get(key) != null) {
            mSubscriptionMap.get(key).unsubscribe();
        }

        mSubscriptionMap.remove(key);
    }
}

先看一下這個(gè)私有的構(gòu)造函數(shù):

private RxBus() {
        mSubject = new SerializedSubject<>(PublishSubject.create());
    }

由于Subject類是非線程安全的,所以我們通過(guò)它的子類SerializedSubject將PublishSubject轉(zhuǎn)換成一個(gè)線程安全的Subject對(duì)象。之后可通過(guò)單例方法getInstance()進(jìn)行RxBus的初始化。

toObservable()根據(jù)事件類型,通過(guò)mSubject.ofType(type);得到一個(gè)Observable對(duì)象,讓其它訂閱者來(lái)訂閱。其實(shí)ofType()方法,會(huì)過(guò)濾掉不符合條件的事件類型,然后將滿足條件的事件類型通過(guò)cast()方法,轉(zhuǎn)換成對(duì)應(yīng)類型的Observable對(duì)象,這點(diǎn)可通過(guò)源碼查看。
同時(shí)封裝了一個(gè)簡(jiǎn)單的訂閱方法doSubscribe(),只需要傳入事件類型,相應(yīng)的回調(diào)即可。其實(shí)可以根據(jù)需求在RxBus中擴(kuò)展?jié)M足自己需求的doSubscribe()方法,來(lái)簡(jiǎn)化使用時(shí)的代碼邏輯。

在需要發(fā)送事件的地方調(diào)用post()方法,它間接的通過(guò)mSubject.onNext(o);將事件發(fā)送給訂閱者。

同時(shí)RxBus提供了addSubscription()unSubscribe()方法,分別來(lái)保存訂閱時(shí)返回的Subscription對(duì)象,以及取消訂閱。

接下我們?cè)诰唧w的場(chǎng)景中測(cè)試一下:
1、我們?cè)贏ctivity的onCreate()方法中進(jìn)行進(jìn)行訂閱操作:

private void doSubscribe() {
        Subscription subscription1 = RxBus.getInstance()
                .tObservable(String.class)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        mTv.setText("事件內(nèi)容:" + s);
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {

                    }
                });
        RxBus.getInstance().addSubscription(this, subscription1);
    }

可以看到我們?cè)O(shè)定事件類型為String,并且Subscriber的回調(diào)發(fā)生在主線程,同時(shí)保存了Subscription對(duì)象。
然后通過(guò)一個(gè)Button發(fā)送事件:

mBtn1.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                RxBus.getInstance().post("1024");
            }
        });

我們直接在UI線程發(fā)送了String類型的1024,看效果:

發(fā)送UI線程事件

2、同樣在onCreate()方法中進(jìn)行進(jìn)行訂閱操作:

private void doSubscribe() {
        Subscription subscription2 = RxBus.getInstance()
                .doSubscribe(Integer.class, new Action1<Integer>() {
                    @Override
                    public void call(Integer s) {
                        mTv.setText("事件內(nèi)容:" + s);
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {

                    }
                });
        RxBus.getInstance().addSubscription(this, subscription2);
    }

我們使用了RxBus中封裝好的doSubscribe()方法,設(shè)置事件類型為Integer。
這次我們通過(guò)Button在子線程中發(fā)送一個(gè)事件:

mBtn2.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        RxBus.getInstance().post(2048);
                    }
                }).start();
            }
        });

在子線程發(fā)送了一個(gè)Integer類型的2048,看效果:


發(fā)送子線程事件

3、我們?cè)贉y(cè)試下在廣播中發(fā)送事件,訂閱方式按照?qǐng)鼍?的方式。
然后定義一個(gè)檢測(cè)網(wǎng)絡(luò)狀態(tài)的廣播:

public class NetworkChangeReceiver extends BroadcastReceiver {
    @Override
    public void onReceive(Context context, Intent intent) {
        ConnectivityManager manager = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
        NetworkInfo networkInfo = manager.getActiveNetworkInfo();
        if (networkInfo != null && networkInfo.isAvailable()) {
            RxBus.getInstance().post("網(wǎng)絡(luò)連接成功");
        } else {
            RxBus.getInstance().post("網(wǎng)絡(luò)不可用");
        }
    }
}

在網(wǎng)絡(luò)可用與不可用時(shí)發(fā)送提示事件,然后在onCreate()方法中注冊(cè)廣播:

private void registerReceiver() {
        IntentFilter intentFilter = new IntentFilter();
        intentFilter.addAction("android.net.conn.CONNECTIVITY_CHANGE");
        mReceiver = new NetworkChangeReceiver();
        registerReceiver(mReceiver, intentFilter);
    }

我們手動(dòng)打開(kāi)、關(guān)閉網(wǎng)絡(luò),可以看到mTv上會(huì)顯示網(wǎng)絡(luò)狀態(tài)的提示信息,看效果:


在廣播中發(fā)送事件

最后不要忘了在onDestory()中對(duì)廣播進(jìn)行取消注冊(cè),以及取消訂閱。

protected void onDestroy() {
        super.onDestroy();
        unregisterReceiver(mReceiver);
        RxBus.getInstance().unSubscribe(this);
    }

其它場(chǎng)景有興趣的可自行測(cè)試哦!到這里RxBus的基本功能就實(shí)現(xiàn)了。

但是還不夠完善,一般情況我們都是先訂閱事件,然后發(fā)送事件,如果我們反過(guò)來(lái),先發(fā)送了事件,再進(jìn)行訂閱操作,怎么保證發(fā)送的事件不丟失呢?也就是EventBus中的StickyEven功能。
其實(shí)通過(guò)RxJava實(shí)現(xiàn)類似的功能很簡(jiǎn)單,Subject有一個(gè)子類BehaviorSubject,在被訂閱之前,它可以緩存最近一個(gè)發(fā)送給它的事件,當(dāng)被訂閱后,它會(huì)立刻將緩存事件發(fā)送給訂閱者,這樣就解決了我們之前的疑問(wèn)。RxBus需要做的修改很簡(jiǎn)單:

private RxBus() {
        mSubject = new SerializedSubject<>(BehaviorSubject.create());
    }

但是有一點(diǎn)需要注意BehaviorSubject只能緩存最近的一個(gè)事件,如果有多個(gè)事件怎么辦?對(duì)RxJava來(lái)說(shuō)都不是事,Subject還有一個(gè)子類ReplaySubject,在被訂閱之前,它可以緩存多個(gè)發(fā)送給它的事件,在被訂閱后會(huì)發(fā)送所有事件給訂閱者,相信如何修改RxBus已經(jīng)很明顯了。

有興趣的話可以下載源碼測(cè)試:點(diǎn)我下載哦!

最后推薦一些RxJava的學(xué)習(xí)資源:RxJava入門(mén)給 Android 開(kāi)發(fā)者的 RxJava 詳解

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容