使用
導包
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation 'com.github.akarnokd:rxjava3-retrofit-adapter:3.0.0-RC8'
基本使用
// Basic usage: emits 1, 2, 3 and completes. The extra onNext(4) after onComplete()
// is intentional — it shows that nothing emitted after a terminal event reaches the observer.
Observable.create(ObservableOnSubscribe<Int> { emitter ->
    emitter.onNext(1)
    emitter.onNext(2)
    emitter.onNext(3)
    emitter.onComplete()
    // Emitted after the terminal event: dropped, never delivered to onNext below.
    emitter.onNext(4)
}).subscribe(object : Observer<Int> {
    override fun onSubscribe(d: Disposable) {
        // print > onSubscribe: io.reactivex.rxjava3.internal.operators.observable.ObservableCreate$CreateEmitter
        Log.i(TAG, "onSubscribe: ${d.javaClass.name}")
    }

    override fun onNext(t: Int) {
        // print > 1 2 3
        Log.i(TAG, "onNext: $t")
    }

    // RxJava 3 annotates this parameter @NonNull, so the override takes a non-null Throwable.
    override fun onError(e: Throwable) {
        // Log the throwable itself so the failure cause is not lost.
        Log.e(TAG, "onError", e)
    }

    override fun onComplete() {
        Log.i(TAG, "onComplete")
    }
})
配合Retrofit使用
/**
 * Retrofit service definition for the personal-message endpoint.
 */
interface IFreeService {
    /**
     * Issues `GET api/personalMessage/get/{id}`, with [personalId] substituted for `{id}`.
     *
     * @param personalId value placed into the `{id}` path segment
     * @return a [Single] that emits the response body as a list of [Repo]
     */
    @GET("api/personalMessage/get/{id}")
    fun updatePersonalMessage(@Path("id") personalId: String): Single<List<Repo>>
}
// Build the Retrofit client. Besides the Gson converter, an RxJava 3 call adapter must be
// registered; without it Retrofit cannot adapt the Single<List<Repo>> return type of
// IFreeService and fails when the service method is invoked.
val retrofit = Retrofit.Builder()
    .baseUrl(baseUrl)
    .addConverterFactory(GsonConverterFactory.create())
    // RxJava3CallAdapterFactory is provided by the rxjava3-retrofit-adapter artifact
    // (package hu.akarnokd.rxjava3.retrofit).
    .addCallAdapterFactory(RxJava3CallAdapterFactory.create())
    .build()
val api = retrofit.create(IFreeService::class.java)

// Run the request on the IO scheduler and deliver the result on the Android main thread.
api.updatePersonalMessage(personalId = "1")
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(object : SingleObserver<List<Repo>> {
        // Called as soon as the subscription is made, before the network request runs;
        // use it for initialization work. Keep `d` if the request must be cancelled later
        // (e.g. when the page is closed).
        override fun onSubscribe(d: Disposable) {
        }

        // RxJava 3 annotates these callback parameters @NonNull, so the overrides are non-null.
        override fun onSuccess(t: List<Repo>) {
        }

        override fun onError(e: Throwable) {
        }
    })
注意:頁面關閉時需要解綁(dispose),避免內存泄漏。
Observable源碼解析
Observable#subscribe
// Subscribes the given observer: rejects null, routes the observer through the
// RxJavaPlugins.onSubscribe hook, then delegates to subscribeActual.
// The body of the catch block is elided in this excerpt.
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(@NonNull Observer<? super T> observer) {
Objects.requireNonNull(observer, "observer is null");
try {
// The plugin hook may return a replacement observer; a null result is rejected below.
observer = RxJavaPlugins.onSubscribe(this, observer);
Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
// Key step: invokes this Observable's own subscribeActual implementation.
subscribeActual(observer);
}catch (Throwable e) {
...
}
}
被觀察者通過create創建ObservableCreate對象。
ObservableCreate#subscribeActual
// Wires the downstream observer to the user-supplied source through a CreateEmitter.
@Override
protected void subscribeActual(Observer<? super T> observer) {
// Create the emitter and hand it the downstream observer.
CreateEmitter<T> parent = new CreateEmitter<>(observer);
// Downstream receives its onSubscribe callback, with the emitter as the Disposable.
observer.onSubscribe(parent);
try {
// Subscribe the upstream source, passing it the newly created emitter.
source.subscribe(parent);
} catch (Throwable ex) {
// Fatal errors are rethrown by throwIfFatal; anything else is routed to the emitter's onError.
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
這個就相當於在原有的被觀察者、觀察者模型之間插了一層CreateEmitter:上游發送數據給CreateEmitter,加工後再給到下游。
CreateEmitter
// Emitter handed to the user's ObservableOnSubscribe. It forwards events to the downstream
// observer only while it has not been disposed, and disposes itself after a terminal event.
// Some members are elided in this excerpt.
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
...
// Forwards a value downstream; a null value is converted into an onError signal instead.
@Override
public void onNext(T t) {
if (t == null) {
onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
return;
}
// Values are dropped once this emitter has been disposed.
if (!isDisposed()) {
observer.onNext(t);
}
}
// Delivers the error downstream; if it cannot be delivered (already disposed),
// the error is handed to RxJavaPlugins.onError instead.
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
// Attempts to deliver the error downstream.
// Returns true if it was delivered, false if this emitter was already disposed.
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = ExceptionHelper.createNullPointerException("onError called with a null Throwable.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
// Always dispose after the terminal event, even if the observer throws.
dispose();
}
return true;
}
return false;
}
// Signals completion downstream unless already disposed, then disposes this emitter.
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
// Always dispose after the terminal event, even if the observer throws.
dispose();
}
}
}
// Disposes this emitter via DisposableHelper.
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
// Reports whether the currently held Disposable reference is in the disposed state.
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
總結:
- 接收到上游發送的數據後,如果沒有解綁,會直接轉發數據到下游。
- onComplete和onError都會調用dispose方法,之後再調用onNext,下游也就接收不到數據了。
取消訂閱
上游停止生產,上游不再向下游傳遞數據。
講解:Observable的delay操作符
從上面的Observable源碼解析可以得知,delay方法生成了ObservableDelay對象,鏈式調用時會調用它的subscribeActual方法。
// Subscribes to the source with a DelayObserver that re-emits events on a scheduler worker.
@Override
@SuppressWarnings("unchecked")
public void subscribeActual(Observer<? super T> t) {
Observer<T> observer;
// With delayError set, the downstream observer is used directly (unchecked cast);
// otherwise it is wrapped in a SerializedObserver.
// NOTE(review): presumably the wrapper is needed because a non-delayed error can race
// with still-pending delayed items — confirm against the RxJava source.
if (delayError) {
observer = (Observer<T>)t;
} else {
observer = new SerializedObserver<>(t);
}
// The worker on which the delayed deliveries are scheduled.
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new DelayObserver<>(observer, delay, unit, w, delayError));
}
DelayObserver
// Observer that sits between upstream and downstream and re-emits every event through the
// worker `w` after a delay. The fields downstream, w, delay, unit and delayError, and the
// constructor, are not shown in this excerpt.
static final class DelayObserver<T> implements Observer<T>, Disposable {
Disposable upstream;
@Override
public void onSubscribe(Disposable d) {
// Keep the upstream Disposable; downstream is handed this DelayObserver as its Disposable.
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
downstream.onSubscribe(this);
}
}
// Schedules delivery of the value after the configured delay.
@Override
public void onNext(final T t) {
w.schedule(new OnNext(t), delay, unit);
}
// Schedules delivery of the error: delayed only when delayError is set, otherwise with zero delay.
@Override
public void onError(final Throwable t) {
w.schedule(new OnError(t), delayError ? delay : 0, unit);
}
// Schedules delivery of completion after the configured delay.
@Override
public void onComplete() {
w.schedule(new OnComplete(), delay, unit);
}
@Override
public void dispose() {
// Cancel the upstream work.
upstream.dispose();
// Cancel the pending delayed deliveries by disposing the worker.
w.dispose();
}
// Disposal state is tracked through the worker.
@Override
public boolean isDisposed() {
return w.isDisposed();
}
// Task that delivers one value downstream.
final class OnNext implements Runnable {
private final T t;
OnNext(T t) {
this.t = t;
}
@Override
public void run() {
downstream.onNext(t);
}
}
// Task that delivers the error downstream and then disposes the worker.
final class OnError implements Runnable {
private final Throwable throwable;
OnError(Throwable throwable) {
this.throwable = throwable;
}
@Override
public void run() {
try {
downstream.onError(throwable);
} finally {
// Dispose the worker even if the downstream callback throws.
w.dispose();
}
}
}
// Task that delivers completion downstream and then disposes the worker.
final class OnComplete implements Runnable {
@Override
public void run() {
try {
downstream.onComplete();
} finally {
// Dispose the worker even if the downstream callback throws.
w.dispose();
}
}
}
}