注:本文摘自《RxJava Essentials》翻譯中文版電子書
StrictMode
StrictMode是一種模式,為了獲得更多出現(xiàn)在代碼中的關(guān)于公共問題的信息。
StrictMode幫助我們偵測敏感的活動,如我們無意的在主線程執(zhí)行磁盤訪問或者網(wǎng)絡(luò)調(diào)用。
為了在我們的App中激活StrictMode,我們只需要在MainActivity中添加幾行代碼,即onCreate()方法中這樣:
@Override
public void onCreate() {
super.onCreate();
if (BuildConfig.DEBUG) {
StrictMode.setThreadPolicy(new StrictMode.ThreadPolicy.Builder().detectAll().penaltyLog().build());
StrictMode.setVmPolicy(new StrictMode.VmPolicy.Builder().detectAll().penaltyLog().build());
}
}
我們并不想它總是激活著,因此我們只在debug構(gòu)建時使用。這種配置將報告每一種關(guān)于主線程用法的違規(guī)做法,并且這些做法都可能與內(nèi)存泄露有關(guān):Activities、BroadcastReceivers、Sqlite等對象。
選擇了penaltyLog(),當(dāng)違規(guī)做法發(fā)生時,StrictMode將會在logcat打印一條信息。
避免阻塞I/O的操作
我們激活StrictMode后,我們開始收到了關(guān)于我們的App錯誤操作磁盤I/O的不良信息。比如:
D/StrictMode StrictMode policy violation; ~duration=998 ms: android.os.StrictMode$StrictModeDiskReadViolation: policy=31 violation=2
at android.os.StrictMode$AndroidBlockGuardPolicy.onReadFromDisk (StrictMode.java:1135)
at libcore.io.BlockGuardOs.open(BlockGuardOs.java:106) at libcore.io.IoBridge.open(IoBridge.java:393)
at java.io.FileOutputStream.<init>(FileOutputStream.java:88)
at android.app.ContextImpl.openFileOutput(ContextImpl.java:918)
at android.content.ContextWrapper.openFileOutput(ContextWrapper. java:185)
at com.packtpub.apps.rxjava_essentials.Utils.storeBitmap (Utils.java:30)
上一條信息告訴我們Utils.storeBitmap()函數(shù)執(zhí)行完耗時998ms:在UI線程上近1秒的不必要的工作和App上近1秒不必要的遲鈍。這是因?yàn)槲覀円宰枞姆绞皆L問磁盤。
Schedulers
Schedulers 是調(diào)度器。RxJava提供了5種調(diào)度器:
- .io()
- .computation()
- .immediate()
- .newThread()
- .trampoline()
Schedulers.io()
這個調(diào)度器時用于I/O操作。它基于根據(jù)需要,增長或縮減來自適應(yīng)的線程池。我們將使用它來修復(fù)我們之前看到的StrictMode違規(guī)做法。由于它專用于I/O操作,所以并不是RxJava的默認(rèn)方法;正確的使用它是由開發(fā)者決定的。
重點(diǎn)需要注意的是線程池是無限制的,大量的I/O調(diào)度操作將創(chuàng)建許多個線程并占用內(nèi)存。一如既往的是,我們需要在性能和簡捷兩者之間找到一個有效的平衡點(diǎn)。
Schedulers.computation()
這個是計算工作默認(rèn)的調(diào)度器,它與I/O操作無關(guān)。它也是許多RxJava方法的默認(rèn)調(diào)度器:buffer(),debounce(),delay(),interval(),sample(),skip()。
Schedulers.immediate()
這個調(diào)度器允許你立即在當(dāng)前線程執(zhí)行你指定的工作。它是timeout(),timeInterval(),以及timestamp()方法默認(rèn)的調(diào)度器。
Schedulers.newThread()
這個調(diào)度器正如它所看起來的那樣:它為指定任務(wù)啟動一個新的線程。
Schedulers.trampoline()
當(dāng)我們想在當(dāng)前線程執(zhí)行一個任務(wù)時,并不是立即,我們可以用.trampoline()將它入隊。這個調(diào)度器將會處理它的隊列并且按序運(yùn)行隊列中每一個任務(wù)。它是repeat()和retry()方法默認(rèn)的調(diào)度器。
非阻塞I/O操作
假設(shè)blockingStoreBitmap方法是一個操作IO的方法,會阻塞線程,如何用RxJava解決:
public static void storeBitmap(Context context, Bitmap bitmap, String filename) {
Schedulers.io().createWorker().schedule(() -> {
blockingStoreBitmap(context, bitmap, filename);
});
}
每次我們調(diào)用storeBitmap(),RxJava處理創(chuàng)建所有它需要從I / O線程池一個特定的I/ O線程執(zhí)行我們的任務(wù)。所有要執(zhí)行的操作都避免在UI線程執(zhí)行并且我們的App比之前要快上1秒
SubscribeOn and ObserveOn
RxJava提供了subscribeOn()方法來用于每個Observable對象:
getApps()
.onBackpressureBuffer()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(...);
observeOn()方法將會在指定的調(diào)度器上返回結(jié)果:如例子中的UI線程。onBackpressureBuffer()方法將告訴Observable發(fā)射的數(shù)據(jù)如果比觀察者消費(fèi)的數(shù)據(jù)要更快的話,它必須把它們存儲在緩存中并提供一個合適的時間給它們。
執(zhí)行網(wǎng)絡(luò)任務(wù)
作為網(wǎng)絡(luò)訪問的第一個案例,我們將創(chuàng)建下面這樣一個場景:
- 加載一個進(jìn)度條。
- 用一個按鈕開始文件下載。
- 下載過程中更新進(jìn)度條。
- 下載完后開始視頻播放。
創(chuàng)建mDownloadProgress,用來管理進(jìn)度的更新,它和download函數(shù)協(xié)同工作。
private PublishSubject<Integer> mDownloadProgress = PublishSubject.create();
private Observable<Boolean> obserbableDownload(String source, String destination) {
return Observable.create(subscriber -> {
try {
boolean result = downloadFile(source, destination);
if (result) {
subscriber.onNext(true);
subscriber.onCompleted();
} else {
subscriber.onError(new Throwable("Download failed."));
}
} catch (Exception e) {
subscriber.onError(e);
}
});
}
現(xiàn)在我們需要觸發(fā)下載操作,點(diǎn)擊下載按鈕:
@OnClick(R.id.button_download)
void download() {
mButton.setText(getString(R.string.downloading));
mButton.setClickable(false);
mDownloadProgress.distinct()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
App.L.debug("Completed");
}
@Override
public void onError(Throwable e) {
App.L.error(e.toString());
}
@Override
public void onNext(Integer progress) {
mArcProgress.setProgress(progress);
}
});
String destination = "sdcardsoftboy.avi";
obserbableDownload("http://archive.blender.org/fileadmin/movies/softboy.avi", destination)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(success -> {
resetDownloadButton();
Intent intent = new Intent(android.content.Intent.ACTION_VIEW);
File file = new File(destination);
intent.setDataAndType(Uri.fromFile(file),"video/avi");
intent.addFlags(Intent.FLAG_ACTIVITY_NEW_TASK);
startActivity(intent);
}, error -> {
Toast.makeText(getActivity(), "Something went south", Toast.LENGTH_SHORT).show();
resetDownloadButton();
});
}
downloadFile:
private boolean downloadFile(String source, String destination) {
boolean result = false;
InputStream input = null;
OutputStream output = null;
HttpURLConnection connection = null;
try {
URL url = new URL(source);
connection = (HttpURLConnection) url.openConnection();
connection.connect();
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
return false;
}
int fileLength = connection.getContentLength();
input = connection.getInputStream();
output = new FileOutputStream(destination);
byte data[] = new byte[4096];
long total = 0;
int count;
while ((count = input.read(data)) != -1) {
total += count;
if (fileLength > 0) {
int percentage = (int) (total * 100 / fileLength);
mDownloadProgress.onNext(percentage);
}
output.write(data, 0, count);
}
mDownloadProgress.onCompleted();
result = true;
} catch (Exception e) {
mDownloadProgress.onError(e);
} finally {
try {
if (output != null) {
output.close();
}
if (input != null) {
input.close();
}
} catch (IOException e) {
mDownloadProgress.onError(e);
}
if (connection != null) {
connection.disconnect();
mDownloadProgress.onCompleted();
}
}
return result;
}