關(guān)于RxJava和RxAndroid
強烈建議閱讀這篇文章: http://gank.io/post/560e15be2dca930e00da1083
RxAndroid使用了觀察者模式,屬響應(yīng)型機制。
參考 https://github.com/ReactiveX/RxAndroid/
應(yīng)用場景:異步。它是一個實現(xiàn)異步操作的庫。
優(yōu)勢:保持代碼可讀性
使用實例
IDE Android Studio
添加依賴庫
dependencies {
// ......
compile 'io.reactivex:rxandroid:1.2.1'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
compile 'io.reactivex:rxjava:1.1.6'
}
實例1 直接關(guān)聯(lián)被觀察者與訂閱者
先定義出被觀察者(事件源)和訂閱者。然后把它們關(guān)聯(lián)起來。
當(dāng)訂閱者執(zhí)行了onCompleted()后,就不再接收消息了。
/**
* 被觀察者
*/
Observable.OnSubscribe mObservableAction = new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("mObservableAction: " + mCount);
/**
* Notifies the Observer that the {@link Observable}
* has finished sending push-based notifications
*/
subscriber.onCompleted();// 執(zhí)行了此方法后,將不再接收處理消息
}
};
/**
* 接收消息的訂閱者
*/
Subscriber<String> mSubscriber1 = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: got sth");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onNext(String str) {
Log.d(TAG, "onNext:" + str);
mTv1.setText(str);
}
};
/**
* 作為觀察者 - 接收到事件后執(zhí)行操作
* 不知為何要起 Action1 這個名字
*/
private Action1<String> mActionTv2 = new Action1<String>() {
@Override
public void call(String s) {
mTv2.setText(s);
}
};
findViewById(R.id.act_rx_btn1).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
mCount++;
Log.d(TAG, "onClick: " + mCount);
/**
* 實例1 將事件源與訂閱者關(guān)聯(lián)起來
*/
@SuppressWarnings("unchecked")
Observable<String> observable = Observable.create(mObservableAction)
.subscribeOn(AndroidSchedulers.mainThread());
observable.subscribe(mSubscriber1);// 先通知一個,再通知另一個
observable.subscribe(mActionTv2); // 這個可以一直執(zhí)行下去
}
});
實例2 直接分發(fā)特定事件給訂閱者
private Action1<String> mActionTv3 = new Action1<String>() {
@Override
public void call(String s) {
mTv3.setText(s);
}
};
private Action1<String> mActionShowToast = new Action1<String>() {
@Override
public void call(String s) {
Toast.makeText(RxAndroidActivity.this, s, Toast.LENGTH_SHORT).show();
}
};
findViewById(R.id.act_rx_btn2).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
// 事件產(chǎn)生,分發(fā)給訂閱者
Observable<String> oba1 = Observable.just("事件分發(fā)源 " + mCount);
oba1.observeOn(AndroidSchedulers.mainThread());
oba1.subscribe(mActionTv3);
oba1.subscribe(mActionShowToast);
}
});
循環(huán)產(chǎn)生的消息
在子線程中產(chǎn)生消息,通知UI線程。
在不指定線程的情況下, RxJava 遵循的是線程不變的原則,即:在哪個線程調(diào)用 subscribe(),
就在哪個線程生產(chǎn)事件;在哪個線程生產(chǎn)事件,就在哪個線程消費事件。
如果需要切換線程,就需要用到 Scheduler (調(diào)度器)。
和上一個例子一樣,定時產(chǎn)生一個消息,發(fā)送給訂閱者
private Action1<String> mActionTimer = new Action1<String>() {
@Override
public void call(String s) {
final String second = s;
/**
* 跑在UI線程里更新
*/
runOnUiThread(new Runnable() {
@Override
public void run() {
mTimerTv.setText(second);
}
});
}
};
new Thread(new Runnable() {
@Override
public void run() {
int s = 0;
while (s <= 100) {
Observable<String> timerOb = Observable.just(String.valueOf(s) + "s");
// 指定在主線程發(fā)生回調(diào)
timerOb.observeOn(AndroidSchedulers.mainThread());
timerOb.subscribe(mActionTimer);
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
s++;
}
}
}).start();
實例效果
| 效果圖 | |
|---|---|
![]() 實例1
|
![]() 實例2
|
畫面中的秒數(shù)計數(shù)器一直在更新
線程控制
使用Scheduler的API
調(diào)用Observable.subscribeOn(Schedulers s)來設(shè)定被觀察的任務(wù)執(zhí)行的線程
Observable.observeOn()來設(shè)定回調(diào)使用的線程
以下是Schedulers的部分源碼
public final class Schedulers {
private final Scheduler computationScheduler; // 計算線程 與CPU有關(guān)
private final Scheduler ioScheduler; // 主要用于I/O讀寫
private final Scheduler newThreadScheduler;
// .......
}
可以從注釋中了解到線程切換的效果
-
Schedulers.immediate()不切換線程 -
Schedulers.newThread()對每一次任務(wù)啟動一個新的線程 -
Schedulers.computation()適用于計算工作,比如處理循環(huán)事件,回調(diào)或者其他計算工作。不要在這里進行IO相關(guān)的操作。 -
Schedulers.io()內(nèi)部實現(xiàn)中有一個自增長的線程池,可用于異步的阻塞IO讀寫工作。不要把計算工作放在這里。
還有一個Android專用的UI線程,引入rx.android.schedulers.AndroidSchedulers;
-
AndroidSchedulers.mainThread()使用UI線程
代碼示例:在IO線程讀取圖片,然后顯示在界面上
Observable.create(new Observable.OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = ContextCompat.getDrawable(getApplicationContext(), R.mipmap.ic_launcher);
int count = 0;
while (count < 100) {
count++;// 人為制造一些延時
try {
Thread.sleep(25);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
subscriber.onNext(drawable);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io()) // 在io線程取數(shù)據(jù)
.observeOn(AndroidSchedulers.mainThread())//在主線程執(zhí)行回調(diào)
.subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
mIv1.setImageDrawable(drawable);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(getApplicationContext(), "Error!", Toast.LENGTH_SHORT).show();
}
});
變換
將時間序列中的對象或整個序列進行加工處理,轉(zhuǎn)換成不同的事件或事件序列。串成串串。
使用map()來進行變換
輸入圖片的資源int值,通過map獲得Drawable對象,然后發(fā)送給監(jiān)聽者
這是最簡單最常用的變換方式,一對一的變換
Observable.just(R.mipmap.ic_launcher)
.map(new Func1<Integer, Drawable>() {
@Override
public Drawable call(Integer integer) {
return ContextCompat.getDrawable(getApplicationContext(), integer);
}
})
.subscribeOn(Schedulers.io())// 線程調(diào)度
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Drawable>() {
@Override
public void call(Drawable drawable) {
mIv2.setImageDrawable(drawable);// 顯示圖片
}
});
其中Func1是一個接口;T是輸入對象,R是返回對象
public interface Func1<T, R> extends Function {
R call(T t);
}
使用flatMap
一個對象中持有某個集合,想要把這個集合輸出。
例如User持有一個String list,現(xiàn)在想一個個地獲取list中的內(nèi)容
User tom = new User("Tom");
User jerry = new User("jerry");
tom.profileList.add("p1");
tom.profileList.add("p2");
jerry.profileList.add("p4");
jerry.profileList.add("p5");
Observable.just(tom, jerry)
.flatMap(new Func1<User, Observable<String>>() {
@Override
public Observable<String> call(User user) {
Log.d(TAG, "user: " + user.name);
return Observable.from(user.profileList); // 可以接受Iterable
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d(TAG, "profile: " + s);
}
});
/**
* 示例用戶類
*/
class User {
public User(String name) {
this.name = name;
}
public String name;
public List<String> profileList = new ArrayList<>();
}
/*
輸出
user: Tom
profile: p1
profile: p2
user: jerry
profile: p4
profile: p5
*/
更多請參閱: http://rustfisher.github.io/2017/04/14/Android_note/RxAndroid-Instruction/

