RxBus

导包

compile 'io.reactivex.rxjava2:rxjava:2.1.6'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'com.jakewharton.rxrelay2:rxrelay:2.0.0'

调用

RxBus.getInstance().send(user);(user为UserModel实例)


register.png

RxBus源碼

import com.jakewharton.rxrelay2.PublishRelay;
import com.jakewharton.rxrelay2.Relay;

import java.util.concurrent.ConcurrentHashMap;

import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;


/**
 * Application-wide event bus backed by a serialized {@link PublishRelay}.
 *
 * <p>Regular events are pushed with {@link #send(Object)} and received through one of the
 * {@code register(...)} overloads, each of which returns a {@link Disposable} that the caller
 * should hand to {@link #unregister(Disposable...)} when it no longer wants events.
 *
 * <p>Sticky events are kept in a map keyed by the event's exact runtime class (at most one
 * event per class). {@link #sendSticky(Object)} only stores the event; it is delivered when a
 * consumer later calls {@code registerSticky(...)} or {@code registerStickyJustHere(...)}.
 */
public class RxBus {

    // Must be volatile: without it the double-checked locking in getInstance() may let
    // another thread observe a non-null reference to a not-yet-fully-constructed RxBus.
    private static volatile RxBus instance;

    private final Relay<Object> bus;

    /** Latest sticky event per event class. */
    final ConcurrentHashMap<Class<?>, Object> mStickMap = new ConcurrentHashMap<>();

    // Private constructor: instances are obtained through getInstance() only.
    private RxBus() {
        bus = PublishRelay.create().toSerialized();
    }

    /**
     * Returns the shared bus, creating it on first use.
     *
     * @return the single {@code RxBus} instance
     */
    public static RxBus getInstance() {
        RxBus local = instance;
        if (local == null) {
            synchronized (RxBus.class) {
                local = instance;
                if (local == null) {
                    local = new RxBus();
                    instance = local;
                }
            }
        }
        return local;
    }

    /**
     * Publishes an event to all current subscribers.
     *
     * @param event the event object to publish
     */
    public void send(Object event) {
        bus.accept(event);
    }

    /**
     * Exposes the bus as a stream restricted to one event type.
     *
     * @param eventType class of the events to receive
     * @param <T>       event type
     * @return an observable emitting only events that are instances of {@code eventType}
     */
    public <T> Observable<T> toObservable(Class<T> eventType) {
        return bus.ofType(eventType);
    }

    /**
     * Stores a sticky event, replacing any previously stored event of the same class.
     * The event is not pushed to subscribers of the regular bus.
     *
     * @param event the event to retain; its runtime class is used as the lookup key
     */
    public void sendSticky(Object event) {
        mStickMap.put(event.getClass(), event);
    }

    /**
     * Consumes the stored sticky event of the given type exactly once: the event is removed
     * from the store and, if one was present, delivered to {@code consumer} on
     * {@code scheduler}.
     *
     * @param eventType class the sticky event was stored under
     * @param scheduler scheduler on which {@code consumer} is invoked
     * @param consumer  receiver of the event
     * @param <T>       event type
     */
    public <T> void registerStickyJustHere(final Class<T> eventType, Scheduler scheduler, Consumer<T> consumer) {
        // remove() is a single atomic step, so two concurrent callers cannot both receive the
        // event, and a newer event stored after this point is not wiped by a late clear.
        T event = eventType.cast(mStickMap.remove(eventType));
        if (event != null) {
            Observable.just(event).observeOn(scheduler).subscribe(consumer);
        }
    }

    /**
     * Same as {@link #registerStickyJustHere(Class, Scheduler, Consumer)}, delivering on the
     * Android main thread.
     */
    public <T> void registerStickyJustHere(Class<T> eventType, Consumer<T> consumer) {
        registerStickyJustHere(eventType, AndroidSchedulers.mainThread(), consumer);
    }

    /**
     * Delivers the stored sticky event of the given type, if any, to {@code consumer} on
     * {@code scheduler}. The event stays in the store until {@link #clearSticky(Class)} is
     * called or it is replaced.
     *
     * @param eventType class the sticky event was stored under
     * @param scheduler scheduler on which {@code consumer} is invoked
     * @param consumer  receiver of the event
     * @param <T>       event type
     */
    public <T> void registerSticky(Class<T> eventType, Scheduler scheduler, final Consumer<T> consumer) {
        T event = eventType.cast(mStickMap.get(eventType));
        if (event != null) {
            // Honor the requested scheduler (it was previously accepted but never applied).
            Observable.just(event).observeOn(scheduler).subscribe(consumer);
        }
    }

    /**
     * Same as {@link #registerSticky(Class, Scheduler, Consumer)}, delivering on the Android
     * main thread.
     */
    public <T> void registerSticky(Class<T> eventType, Consumer<T> consumer) {
        registerSticky(eventType, AndroidSchedulers.mainThread(), consumer);
    }

    /**
     * Removes the stored sticky event of the given type, if present.
     *
     * @param eventType class the sticky event was stored under
     */
    public <T> void clearSticky(Class<T> eventType) {
        mStickMap.remove(eventType);
    }

    /**
     * @return {@code true} if the underlying relay currently has at least one subscriber
     */
    public boolean hasObservers() {
        return bus.hasObservers();
    }

    /**
     * Subscribes to events of {@code eventType}, invoking {@code onNext} on {@code scheduler}.
     *
     * @return the subscription handle; pass it to {@link #unregister(Disposable...)} to stop
     */
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext);
    }

    /**
     * Subscribes with the full set of callbacks, observed on {@code scheduler}.
     *
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext,
                                   Consumer<? super Throwable> onError,
                                   Action onComplete, Consumer<? super Disposable> onSubscribe) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError, onComplete, onSubscribe);
    }

    /**
     * Subscribes with next/error/complete callbacks, observed on {@code scheduler}.
     *
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext,
                                   Consumer<? super Throwable> onError,
                                   Action onComplete) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError, onComplete);
    }

    /**
     * Subscribes with next/error callbacks, observed on {@code scheduler}.
     *
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext,
                                   Consumer<? super Throwable> onError) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError);
    }

    /**
     * Subscribes to events of {@code eventType}, invoking {@code onNext} on the Android main
     * thread.
     *
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Consumer<T> onNext) {
        return register(eventType, AndroidSchedulers.mainThread(), onNext);
    }

    /**
     * Subscribes with the full set of callbacks, observed on the Android main thread.
     *
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Consumer<T> onNext,
                                   Consumer<? super Throwable> onError,
                                   Action onComplete, Consumer<? super Disposable> onSubscribe) {
        return register(eventType, AndroidSchedulers.mainThread(), onNext, onError, onComplete, onSubscribe);
    }

    /**
     * Subscribes with next/error/complete callbacks, observed on the Android main thread.
     *
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Consumer<T> onNext,
                                   Consumer<? super Throwable> onError,
                                   Action onComplete) {
        return register(eventType, AndroidSchedulers.mainThread(), onNext, onError, onComplete);
    }

    /**
     * Subscribes with next/error callbacks, observed on the Android main thread.
     *
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Consumer<T> onNext,
                                   Consumer<? super Throwable> onError) {
        return register(eventType, AndroidSchedulers.mainThread(), onNext, onError);
    }

    /**
     * Disposes every given subscription that is non-null and not already disposed.
     *
     * @param disposables subscriptions to cancel; a null array or null elements are ignored
     */
    public void unregister(Disposable... disposables) {
        if (disposables == null) {
            return;
        }
        for (Disposable disposable : disposables) {
            if (disposable != null && !disposable.isDisposed()) {
                disposable.dispose();
            }
        }
    }
}
最后編輯于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • Android 自定義View的各種姿勢(shì)1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 178,765評(píng)論 25 709
  • 新的應(yīng)用中用到了RxBus,這里簡(jiǎn)單敘述一下. 1.添加依賴 // RxJava 2implementation'...
    沐沐小風(fēng)閱讀 523評(píng)論 1 0
  • 了解RxJava也蠻久了,原來(lái)一直不了解其中的原理,尤其是配合Retrofit組合之后線程切換和類型轉(zhuǎn)換老是暈,剛...
    RoboyCore閱讀 1,045評(píng)論 0 1
  • Rxjava目前已經(jīng)很火了,如果你尚未了解,可以查看rxjava詳情。RxBus并不是一個(gè)庫(kù),而是一種模式,用過(guò)E...
    圈圈貓閱讀 1,559評(píng)論 0 1
  • 物有本末,事有終始。 “但愿世間人無(wú)病,何愁架上藥生塵” 吃藥只是手段,不生病才是根本。 賺錢只是方法,幸福的活著...
    望心鏡閱讀 660評(píng)論 0 3

友情链接更多精彩内容