Android技術(shù)前沿:RxBus的實踐

RxBus的簡單和強大是基于RxJava技術(shù)的,RxJava天生就是類似sub/pub的觀察者模式,而且很容易處理線程切換。這就使得RxBus天然的支持事件總線。本文基于對RxJava的理解,自定義RxBus,并在目前一個相對輕量級的應(yīng)用中得到了驗證。

自定義RxBus

RxJava中有一種實體對象叫做Subject,官方的解釋是“A Subject is a sort of bridge or proxy that is available in some implementations of ReactiveX that acts both as an observer and as an Observable. Because it is an observer, it can subscribe to one or more Observables, and because it is an Observable, it can pass through the items it observes by reemitting them, and it can also emit new items.” 這句話的意思就是Subject既是observer(觀察者) 也是 Observable(訂閱者),作為observer,它可以訂閱多個Observable并觀察之;作為Observable,它可以被訂閱,并拋出事件。

按照不同的需求,Subject被設(shè)計有四種,分別為:AsyncSubject、BehaviorSubject、PublishSubject和ReplaySubject。這四種Subject的說明可查閱 http://reactivex.io/documentation/subject.html
這里主要講述PublishSubject。

PublishSubject可以實現(xiàn)從哪里訂閱就從哪里開始發(fā)送數(shù)據(jù),這就保證了事件的訂閱和接收的有序。示意圖如下:

錯誤發(fā)生,事件傳遞中斷
public class RxBus {
  private static final String TAG = "RxBus";

  public RxBus() {
  }

  private ConcurrentHashMap<Object, List<Subject>> subjectMapper = new ConcurrentHashMap<>();

  public <T> Observable<T> register(@NonNull Object tag) {
    List<Subject> subjectList = subjectMapper.get(tag);
    if (null == subjectList) {
      subjectList = new ArrayList<>();
      subjectMapper.put(tag, subjectList);
    }

    Subject<T, T> subject = PublishSubject.create();
    subjectList.add(subject);
    Logger.t(TAG).i("[register]subjectMapper: %s", subjectMapper);
    return subject;
  }

  @SuppressWarnings("unchecked")
  public void unregister(@NonNull Object tag, @NonNull Observable observable) {
    List<Subject> subjects = subjectMapper.get(tag);
    if (null != subjects) {
      subjects.remove(observable);
      if (subjects.size() == 0) {
        subjectMapper.remove(tag);
      }
    }
    Logger.t(TAG).i("[unregister]subjectMapper: %s", subjectMapper);
  }

  @SuppressWarnings("unchecked")
  public void post(@NonNull Object tag, @NonNull Object content) {
    List<Subject> subjectList = subjectMapper.get(tag);

    if (subjectList != null) {
      for (Subject subject : subjectList) {
        subject.onNext(content);
      }
    }
    Logger.t(TAG).i("[send]subjectMapper: %s", subjectMapper);
  }
}

代碼解讀:ConcurrentHashMap<Object, List<Subject>> subjectMapper 保存了所有的regist的 observer,每次regist就會PublishSubject.create()并存到該Map中;post事件之后,check subjectMapper是否有對應(yīng)的observer,有則取出執(zhí)行onNext()方法。

使用:

public class TestFragment extends BaseFragment {
  @Inject
  RxBus rxBus;
  private Observable<Object> observable;
  
  @Override
  public void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    component().inject(this);
    // 注冊事件
    observable = rxBus.register(GlobalConfig.RXBUS_TAG_VERIFY_SUBMIT_SUC);
 // 訂閱事件   observable.observeOn(AndroidSchedulers.mainThread())
        .subscribe(new HttpResponseObserver<Object>() {
          @Override
          public void onCompleted() {

          }

          @Override
          public void onNext(Object o) {
            if (null != getActivity()) {
              getActivity().finish();
            }
          }
        });
  }
}

發(fā)送事件: rxBus.post(GlobalConfig.RXBUS_TAG_VERIFY_SUBMIT_SUC, new Object());

EventBus與RxBus討論

一個完美的事件總線應(yīng)該具備哪些功能?

  1. 容易訂閱事件:事件訂閱者只要聲明自己就好了,當(dāng)事件發(fā)生時自然會被調(diào)到。訂閱和取消可以方便綁定到Activity和Fragment的生命周期上。
  2. 容易發(fā)送事件:事件發(fā)送者直接發(fā)送就好了,其他的事都不管。
  3. 方便的切換線程:有些事必須主線程干,有些事必須非主線程干,所以這個還是要說清楚。
  4. 性能:隨著應(yīng)用的成長,總線可能會被重度使用,性能一定要好。

EventBus在以上幾點做到了極致,其使用和性能都已經(jīng)被廣大開發(fā)者所驗證。其優(yōu)點是全面并且性能和分發(fā)效率都很好。

RxBus由于有RxJava的支持,在易用性和性能上都非常好。但是其在大型項目中,事件非常多的情況下,其分發(fā)效率和可靠性沒有得到足夠驗證,這一點不如EventBus。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容