要用RxJava,那么就得先了解兩個概念
Observable被觀察者
Observer觀察者
比如放羊,羊是被觀察者,放羊娃就是觀察者,當羊餓了的時候,咩咩的叫個不停,相當于向放羊娃發(fā)出信號說我餓了,你看著辦,放羊娃呢,在聽到羊的叫聲后就會做出相應(yīng)的處理.那么這里前提是二者要關(guān)聯(lián)成為觀察與被觀察的關(guān)系.否則沒有關(guān)聯(lián)的關(guān)系,放羊娃才不會那么傻去喂養(yǎng)和自己沒關(guān)系的羊.
觀實者與被觀察者就是這樣的狀態(tài),本來各不相干,但是一旦二者關(guān)聯(lián),那么觀察者就和被觀察者形成關(guān)系,觀察者會對被觀察者的言行舉止做出相應(yīng)的響應(yīng).
Observable被觀察者(小羊)
首先是被觀察者的創(chuàng)建:(羊來了)
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
? ? ? ? ? ? ? ? ? // ?羊咩咩的叫個不停于是用e向外發(fā)射信號
????????????????????e.onNext("我餓了");
}
});
此處有兩個概念:
ObservableOnSubscribe :?一個接口,創(chuàng)建被觀察者時需要傳入他的實例
ObservableEmitter :?事件發(fā)射器
create方法做了判空處理后,返回了ObservableCreate的實例
ObservableCreate是Observable的子類實現(xiàn)
Observer?觀察者(放羊娃)
觀察者的創(chuàng)建
Observer observer = new Observer() {
@Override
public void onSubscribe(Disposable d) { }
@Override
public void onNext(String s) {
//?接收到羊發(fā)來的信號s,我知道啦,馬上給你準備草去
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {? ? }
};
Observer用來對被觀察者發(fā)射出的事件做相應(yīng)的處理,也可理解為對監(jiān)聽到被觀察者的相應(yīng)的狀態(tài)做出相應(yīng)的處理.
observable.subscribe(observer);
最后通過subscribe方法完成被觀察者與觀察者的訂閱連接?,此時羊與放羊娃明確關(guān)系.
subscribe方法內(nèi)部實現(xiàn)其實最終調(diào)用的是observable的subscribeActual?實現(xiàn)方法
@Override
protected void subscribeActual(Observer observer) {
CreateEmitter parent = new CreateEmitter(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
可以看到,該方法會將我們傳入的observer再次傳給CreateEmitter, CreateEmitter是ObservableCreater?的一個內(nèi)部類,同時實現(xiàn)了ObservableEmitter,Disposable這兩個接口,相當于關(guān)聯(lián)了Observer的基礎(chǔ)上又做了進一步的封裝,因為其內(nèi)部實現(xiàn)最終事件的執(zhí)行都是通過傳入的observer來完成的
其持有了observer實例后,在可以執(zhí)行observer的相應(yīng)事件方法的基礎(chǔ)上進一步的添加了一些控制訂閱,以及狀態(tài)判斷的方法.如下源碼可見:
static final class CreateEmitter
extends AtomicReference
implements ObservableEmitter, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer observer;
CreateEmitter(Observer observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter serialize() {
return new SerializedEmitter(this);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
ObservableEmitter也是一個接口,是Emitter的子類,如下源碼:
public interface ObservableEmitter extends Emitter
public interface Emitter {
/**
* Signal a normal value.
* @param value the value to signal, not null
*/
void onNext(@NonNull T value);
/**
* Signal a Throwable exception.
* @param error the Throwable to signal, not null
*/
void onError(@NonNull Throwable error);
/**
* Signal a completion.
*/
void onComplete();
}
***********************************************************
public interface Disposable {
/**
* Dispose the resource, the operation should be idempotent.
*/
void dispose();
/**
* Returns true if this resource has been disposed.
* @return true if this resource has been disposed
*/
boolean isDisposed();
}
onSubscribe方法將觀察者和emmitter發(fā)射器關(guān)聯(lián)起來,
往下走source.subscribe(parent);
此處的source就是我們在創(chuàng)建被觀察者是傳入的(new ObservableOnSubscribe())
至此可以看到ObservableOnSubscribe實例的subscriber方法將自身與關(guān)聯(lián)了Observer的事件發(fā)射器進行了關(guān)聯(lián).
總結(jié)
1.被觀察者Observabler的創(chuàng)建最終返回的是ObservableCreate的實例,所以被觀察者Observabler的狀態(tài)以及事件的執(zhí)行都是通過ObservableCreate的內(nèi)部類CreateEmitter來實現(xiàn)的,
2.CreateEmitter對訂閱的Observer進行了關(guān)聯(lián)封裝
3.被觀察者Observable通過subscribe方法將觀察者傳給他的內(nèi)部類CreateEmitter用來進行關(guān)聯(lián)封裝.
4.ObservableCreate創(chuàng)建的時候接收了Observable.create方法傳入的(newObservableOnSubscribe())并與自身的內(nèi)部類CreateEmitter進行了關(guān)聯(lián)
5.被觀察者Observabler要執(zhí)行的事件都會在傳入的ObservableOnSubscribe實例的subscriber方法中進行操作再經(jīng)由關(guān)聯(lián)的CreateEmitter向外發(fā)射.最終由CreateEmitter關(guān)聯(lián)封裝的Observer接收事件并作出響應(yīng).