1、什么是觀察者模式:
觀察者模式是基于Subject這個概念的,Subject是一種特殊對象,又叫主題或者被觀察者。當(dāng)改變時,那些由他保存的一系列對象將會得到通知,而這一系列對象被稱作Observer(觀察者)。
2、擴展的觀察者模式
在Rxjava中主要有4個角色
1、Observable
2、Subject
3、Observer
4、Subscriber
Observable和Subject是兩個“生產(chǎn)實體”,Observer和Subscriber是兩個“消費實體”。直白的說Observable對應(yīng)于觀察者模式中的被觀察者,而Observer和Subscriber對應(yīng)觀察者模式中的觀察者。而Subscriber其實是一個實現(xiàn)了Observer的抽象類。Subject比較復(fù)雜。
Rxjava如何使用的
第一步:創(chuàng)建觀察者Observer
Observer observer = new Observer() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
}
};
開篇我們講過RxJava是基于一種擴展的觀察這模式實現(xiàn),這里多出的onCompleted和onError正是對觀察者模式的擴展。ps:onNext就相當(dāng)于普通觀察者模式中的update
RxJava中添加了普通觀察者模式缺失的三個功能:
RxJava中規(guī)定當(dāng)不再有新的事件發(fā)出時,可以調(diào)用onCompleted()方法作為標(biāo)示;
當(dāng)事件處理出現(xiàn)異常時框架自動觸發(fā)onError()方法;
同時Observables支持鏈?zhǔn)秸{(diào)用,從而避免了回調(diào)嵌套的問題。
第二步:創(chuàng)建被觀察者Observable
Observable.create()方法可以創(chuàng)建一個Observable,使用crate()創(chuàng)建Observable需要一個OnSubscribe對象,這個對象繼承Action1。當(dāng)觀察者訂閱我們的Observable時,它作為一個參數(shù)傳入并執(zhí)行call()函數(shù)。
Observable observable1 = Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Object o) {
}
});
除了create能創(chuàng)建Observable,還有just和from也能創(chuàng)建
just(T)方式
/***第二種:創(chuàng)建被1觀察者**/
Observable observable2 = Observable.just("one", "two", "three", "foth");
//上面代碼會調(diào)用
//onNext("one");
//onNext("two");
//onNext("three");
//onNext("foth");
//onCompleted();
from(T[])/form(Iterable<? extends T>)
/***第三種創(chuàng)建被觀察者*/
String[] array = {"one", "two", "three", "foth"};
Observable observable3 = Observable.from(array);
//onNext("one");
//onNext("two");
//onNext("three");
//onNext("foth");
//onCompleted();
第三部:被觀察者Observable訂閱觀察者Observer
這里就不同于普通的觀察者模式,這里是被觀察者訂閱觀察者
Observable.subscribe(obserber);
連接起來寫就是這樣的
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for(int i = 0; i < 5; i++){
subscriber.onNext(i);
}
}
}).subscribe(new Observer<Integer>(){
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Toast.makeText(MainActivity.this, "create"+integer, Toast.LENGTH_SHORT).show();
}
});
上述代碼一旦調(diào)用subscribe()訂閱后就會自動觸發(fā)call方法,call()中的Subscriber其實就是一個觀察者observer,subscriber是實現(xiàn)了Observer的接口。
在看subscribe()這個方法內(nèi)部將傳進來的Observer做了一層代理,將他轉(zhuǎn)換成Subscriber??纯丛创a:
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
subscriber.onStart();
try {
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
return Subscriptions.unsubscribed();
}
}
返回的是observable.onSubscribe,正是create()方法中的Subscribe。
subscribe()的參數(shù)除了可以是Observer和Subscriber以外還可以是Action1,Action0,這是一種簡單的回調(diào),只有一個call(T)方法。
Observable.just("1111", "2222")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Toast.makeText(MainActivity.this, "just print log"+s, Toast.LENGTH_SHORT).show();
}
});
Observable.from(str)
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Toast.makeText(MainActivity.this, "just print log", Toast.LENGTH_SHORT).show();
}
});
Schedulers線程的切換

合起來的鏈?zhǔn)讲僮骶褪沁@樣:
Observable.from(str)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Toast.makeText(MainActivity.this, "just print log", Toast.LENGTH_SHORT).show();
}
});