一、disposable用法
public Flowable<Integer> getRxJavaFlowableRealExample() {
return Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
if (emitter.isCancelled()) {
break;
}
while (emitter.requested() == 0) {
}
LogUtils.debug(TAG, "getRxJavaFlowableRealExample---:" + Thread.currentThread().getName() + "-request-emit:" + emitter.requested());
LogUtils.debug(TAG, "getRxJavaFlowableRealExample---:" + Thread.currentThread().getName() + "--:" + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR);
}
public void rxJavaFlowableConsumeExample() {
Disposable disposable = model.getRxJavaFlowableRealExample()
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.error(TAG, "rxJavaFlowableConsumeExample--:" + Thread.currentThread().getName() + "-Consumer-:" + integer);
}
});
compositeDisposable.add(disposable);
}
1、在activity的生命周期結(jié)束前清除compositeDisposable即可
public void onFinishActivity() {
compositeDisposable.dispose();
}
二、注意事項
1、 compositeDisposable.dispose(); 此處只是把觀察者和被觀察者之前的通道切斷,觀察者不能接收數(shù)據(jù),而被觀察者則會繼續(xù)發(fā)送數(shù)據(jù)到結(jié)束為止。
2、如上例所示,如果在無限for循環(huán)中不添加
if (emitter.isCancelled()) {
break;
}
這樣的語句,則被觀察者會無限發(fā)送數(shù)據(jù)。
日志
備注:(關(guān)閉acitivity情況下,等于已經(jīng)切斷通道,可以requested會保持某個值,一直打印數(shù)據(jù))
08-31 14:14:15.203 29916-30066/com.example.zhang D/MainModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2-request-emit:45
getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2--:87145
getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2-request-emit:45
getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2--:87146
getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2-request-emit:45
getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2--:87147
getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2-request-emit:45
getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2--:87148
getRxJavaFlowableRealExample---:RxCachedThreadScheduler-2-request-emit:45
三、總結(jié)
1、被觀察者如果是無限發(fā)送數(shù)據(jù),或者有線程阻塞的情況下,要手動控制
類似
if (emitter.isCancelled()) {
break;
}