1、基本概念
觀察者:Observer,觀察事件變化并處理的主要角色。消費者(Consumer)也可以理解成一種>特殊的觀察者。
被觀察者:觸發(fā)事件并決定什么時候發(fā)送事件的主要角色。(異常和完成也是一種事件)●Observable、Flowable、Single、Completable、Maybe都是被觀察者。
Flowable是支持背壓的一種被觀察者。
Single、Completable、Maybe是簡化版的Observable。
幾種被觀察者通過toObservable/toFlowable/toSingle/toCompletable/toMaybe相互轉(zhuǎn)換。
訂閱(subscribe):觀察者和被觀察者建立關(guān)聯(lián)的操作。
2、onError與onComplete為互斥事件。
3、基本操作符
基本操作符(https://blog.csdn.net/m0_46268254/article/details/139834767)
詳細使用(https://blog.csdn.net/y2653904/article/details/135892911)
just(T...): 將給定的參數(shù)作為事件序列發(fā)出。(最多10個) fromArray不受限制
fromIterable(Iterable): 從一個可迭代對象(如 List 或 Set)創(chuàng)建一個 Observable。
create(ObservableOnSubscribe): 使用自定義邏輯創(chuàng)建一個 Observable。
timer操作符:可以做定時操作,就是延遲執(zhí)行。時間間隔由timer控制。
interval 操作符:定時的周期性操作,與timer的區(qū)別就在于它可以重復(fù)操作。事件間隔由interval控制
subscribeOn(Scheduler): 指定 Observable 的訂閱過程應(yīng)該在哪個線程執(zhí)行。
observeOn(Scheduler): 指定 Observer 的回調(diào)方法應(yīng)該在哪個線程執(zhí)行。來決定下游事件被處理時所處的線程。
4、過濾操作符
filter(Predicate): 只允許滿足條件的項通過。
take(int): 只發(fā)出指定數(shù)量的項。
skip(int): 忽略序列開頭的指定數(shù)量的項。
distinct(): 確保不會發(fā)出重復(fù)的項。
firstElement(): 只發(fā)出第一個元素或者如果沒有元素則不發(fā)出任何東西。
lastElement(): 只發(fā)出最后一個元素或者如果沒有元素則不發(fā)出任何東西。
5、轉(zhuǎn)換操作符
map(Func1): ☆☆☆對每個項應(yīng)用函數(shù),并將結(jié)果發(fā)送給觀察者。
flatMap(Func1): ☆☆☆對每個項應(yīng)用函數(shù),該函數(shù)返回一個新的 Observable,然后合并這些 Observables 的輸出。
concatMap(Func1): 類似于 flatMap,但是它會按順序合并 Observable 的輸出。
buffer(): 收集來自原始 Observable 的項,并以批處理形式轉(zhuǎn)發(fā)它們。
scan(Func2): 累積地應(yīng)用函數(shù)到前一個結(jié)果和當(dāng)前項上,并發(fā)出累積的結(jié)果。
6、組合操作符
zip(Observable, Func2): 按數(shù)量,將多個 Observables 的項組合在一起,并使用提供的函數(shù)來創(chuàng)建新的結(jié)果。
combineLatest(Observable, Func2): 按時間,當(dāng)任意一個 Observable 發(fā)出新值時,將最新的值與另一個 Observable 的最新值結(jié)合。
startWith(T): 發(fā)送事件前追加發(fā)送事件,在原始 Observable 的序列之前添加一個或多個項。
按發(fā)送順序:concat(四個)、concatArray(無限)。串行發(fā)送。
merge(Observable): 按時間,合并多個 Observables 的輸出,但保持原始順序(如果可能的話)。并行發(fā)送。
7、Scheduler線程控制
在 RxJava 中,提供了一個名為 Scheduler 的線程調(diào)度器,RxJava 內(nèi)部提供了4個調(diào)度器,分別是:
- Schedulers.io(): I/O 操作(讀寫文件、數(shù)據(jù)庫、網(wǎng)絡(luò)請求等),與newThread()差不多,區(qū)別在于io() 的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池,可以重用空閑的線程,因此多數(shù)情況下 io() 效率比 newThread() 更高。值得注意的是,在 io() 下,不要進行大量的計算,以免產(chǎn)生不必要的線程;
- Schedulers.newThread(): 開啟新線程操作;
- Schedulers.immediate(): 默認指定的線程,也就是當(dāng)前線程;
- Schedulers.computation():計算所使用的調(diào)度器。這個計算指的是 CPU 密集型計算,即不會被 I/O等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)。值得注意的是,不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU;
- AndroidSchedulers.mainThread(): RxJava 擴展的 Android 主線程;
public class MainActivity extends AppCompatActivity {
private final static String IMGPATH = "https://image.baidu.com/search/detail?";
private ImageView img;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
img = findViewById(R.id.img);
//創(chuàng)建Observable
Observable.just(IMGPATH)//發(fā)送圖片地址
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(String s) throws Exception {
URL url = new URL(IMGPATH);
HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
httpURLConnection.setConnectTimeout(5000);
int responseCode = httpURLConnection.getResponseCode(); // 才開始 request
if (responseCode == HttpURLConnection.HTTP_OK) {
InputStream inputStream = httpURLConnection.getInputStream();
Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
return bitmap;
}
return null;
}
})
.subscribeOn(Schedulers.io())//上面是異步
.observeOn(AndroidSchedulers.mainThread())//下面是主線程
.subscribe(new Observer<Bitmap>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Bitmap bitmap) {
img.setImageBitmap(bitmap);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
}
8、未取消訂閱而引起的內(nèi)存泄漏
在Activity#onDestroy()的時候或者不需要繼續(xù)執(zhí)行的時候應(yīng)該取消訂閱
Observable<String> observable =Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//發(fā)送事件
e.onNext("袁震");
//事件發(fā)送完成
e.onComplete();
}
});
observable.unsubscribeOn(AndroidSchedulers.mainThread());