什么是Rx呢?
ReactiveX是Reactive Extensions的縮寫,一般簡寫為Rx.Rx是響應式編程的意思,本質是觀察者模式, 是以觀察者(Observer)和訂閱者(Subscriber)為基礎的異步響應方式. 在Android編程時, 經(jīng)常會使用后臺線程, 那么就可以使用這種方式.
RxJava ?
RxJava 主要的作用就是鏈式完成異步操作,并且非常強大,RxJava最核心的兩個東西是Observables(被觀察者,事件源)和Subscribers(訂閱者)。Observables發(fā)出一系列事件,Subscribers處理這些事件。
RxAndroid?
RxAndroid來源于RxJava, 在RxJava的基礎上擴展了一些Android的功能.主要的作用也是異步,隨意定制主線程、子線程的操作,鏈式編程,讓我們的代碼可讀性大大增強,總之,是非常好用。
我們可以了解一下官方的開源項目
我們再學習RxAndroid之前肯定是要做一些準備工作的
我們用Android Studio新建項目,直接在build.gradle中添加以下依賴
compile 'io.reactivex:rxandroid:1.2.1'
compile 'io.reactivex:rxjava:1.1.6'
創(chuàng)建觀察者
- 定義一個方法,它完成某些任務,然后從異步調用中返回一個值,這個方法是觀察者的一部分
- 將這個異步調用本身定義為一個Observable
- 觀察者通過訂閱(Subscribe)操作關聯(lián)那個Observable
- 繼續(xù)你的業(yè)務邏輯,等方法返回時,Observable會發(fā)射結果觀察者的方法會開始處理結果或結果集
觀察者回調方法
- 回調方法(onNext、onCompleted、onError)
- onNext( T item)
Observable調用這個方法發(fā)射數(shù)據(jù),方法的參數(shù)就是Observable發(fā)射的數(shù)據(jù),這個方法可能會被調用多次,取決于你的實現(xiàn) - onError(Exception e)
當Observable遇到錯誤或者無法返回期望的數(shù)據(jù)時會調用這個方法,這個調用會終止Observable,后續(xù)不會再調用onNext和onCompleted方法,onError方法的參數(shù)書拋出異常 - onCompleted
正常終止,如果沒有遇到錯誤,Observable在最后一次調用onNext之后調用此方法
創(chuàng)建操作Create
- 你可以使用Create操作符從頭開始創(chuàng)建一個Observable,給這個操作符傳遞一個接收觀察者作為參數(shù)的函數(shù),編寫這個函數(shù)讓它的行為表現(xiàn)為一個Observable,恰當?shù)恼{用觀察者的onNext,onError和onCompleted方法
- 建議你在傳遞給create方法的函數(shù)中檢查觀察者的isUnsubscribed狀態(tài),以便在沒有觀察者的時候,讓你的Observable停止發(fā)射數(shù)據(jù)或者做昂貴的運算
//新建一個RxUtils類
public class RxUtils {
//創(chuàng)建一個觀察者方法
public static void createObserable(){
//定義被觀察者
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override public void call(Subscriber<? super String> subscriber) {
if (!subscriber.isUnsubscribed()){
subscriber.onNext("hello");// 發(fā)送事件
subscriber.onNext("world");// 發(fā)送事件
subscriber.onNext(downLoadJosn());// 發(fā)送事件
subscriber.onCompleted();// 完成事件
}
}
});
//訂閱者
Subscriber<String> showSub = new Subscriber<String>() {
@Override public void onCompleted() {
Log.i("adu","onCompleted");
}
@Override public void onError(Throwable e) {
Log.i("adu","onError");
}
@Override public void onNext(String s) {
Log.i("adu","onNext"+s);
}
};
observable.subscribe(showSub);//關聯(lián)被觀察者
}
/**
* 創(chuàng)建字符串
*/
public static String downLoadJosn(){
return "json data";
}
}
我們在MainActivity中去創(chuàng)建一個Button的點擊事件,然后調用這個方法
會在日志中看到結果是按照順序依次執(zhí)行

/**
* create第二種方式
*/
public static void createPrint(){
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override public void call(Subscriber<? super Integer> subscriber) {
if(!subscriber.isUnsubscribed()){
for(int i= 1;i<=10;i++){
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}
}).subscribe(new Subscriber<Integer>() {
@Override public void onCompleted() {
Log.i("adu","onCompleted");
}
@Override public void onError(Throwable e) {
Log.i("adu","onError");
}
@Override public void onNext(Integer integer) {
Log.i("adu","onNext==》》"+integer);
}
});
}
調用方法,查看日志

From操作
將其他種類的對象和數(shù)據(jù)類型轉換為Observable
當你使用Observable時,如果你要處理的數(shù)據(jù)都可以轉換成展現(xiàn)為Observables,而不是需要使用Observables和其他類型的數(shù)據(jù)
/**
* 使用在被觀察者,返回的對象一般都是數(shù)值類型
*/
public static void from(){
Integer [] items={1,2,3,4,5,6,7,8,9};
Observable observable = Observable.from(items);
observable.subscribe(new Action1() {
@Override public void call(Object o) {
Log.i("adu",o.toString());
}
});
}
調用方法,查看日志

interval操作
interval操作符既可以延遲執(zhí)行一段邏輯
/**
* 指定某一時刻進行數(shù)據(jù)發(fā)送
*/
public static void interval(){
Integer [] items = {1,2,3,4,5};
Observable observable = Observable.interval(1,1, TimeUnit.SECONDS);//每隔一秒發(fā)送數(shù)據(jù)
observable.subscribe(new Action1() {
@Override public void call(Object o) {
Log.i("adu",o.toString());
}
});
}
just操作
獲取輸入數(shù)據(jù), 直接分發(fā), 更加簡潔, 省略其他回調.
/**
* 處理數(shù)組集合
*/
public static void just(){
Integer[] items1={1,2,3,4,5};
Integer[] items2={4,5,1,6,0};
Observable observable = Observable.just(items1,items2);
observable.subscribe(new Subscriber<Integer[]>() {
@Override public void onCompleted() {
Log.i("adu","onCompleted");
}
@Override public void onError(Throwable e) {
Log.i("adu","onError");
}
@Override public void onNext(Integer[] integers) {
for (int i= 0;i<integers.length;i++){
Log.i("adu","onNext==》》"+integers[i]);
}
}
});
}
調用方法,查看日志 range操作
range操作符的作用根據(jù)出入的初始值n和數(shù)目m發(fā)射一系列大于等于n的m個值。
/**
* 指定輸出數(shù)據(jù)的范圍
*/
public static void range(){
Observable observable = Observable.range(23,4);
observable.subscribe(new Subscriber<Integer>() {
@Override public void onCompleted() {
Log.i("adu","onCompleted");
}
@Override public void onError(Throwable e) {
Log.i("adu","onError");
}
@Override public void onNext(Integer integer) {
Log.i("adu","onNext==》》"+integer);
}
});
}
調用方法,查看日志 過濾
/**
* 過濾某些條件
*/
public static void filter(){
Observable observable = Observable.just(1,2,3,4,5,6,7,8);
observable.filter(new Func1<Integer,Boolean>() {
@Override public Boolean call(Integer integer) {
return integer<5;
}
}).observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {
@Override public void onCompleted() {
Log.i("adu","onCompleted");
}
@Override public void onError(Throwable e) {
Log.i("adu","onError");
}
@Override public void onNext(Integer integer) {
Log.i("adu","inNext==》》"+integer);
}
});
}
調用方法,查看日志