RxJava學(xué)習(xí):一個簡單的Demo讓你了解Rxjava的原理

客戶需求

/**
 * 根據(jù)一個Cat-Sdk提供的API,通過關(guān)鍵字來搜索網(wǎng)絡(luò)中符合要求的所有貓的圖片,根據(jù)
 * 其可愛程度的數(shù)據(jù),將這張最可愛的圖片保存到本地中。
 * 完成任務(wù):
 * 1、下載返回的貓的圖片
 * 2、找到最可愛的那張圖片
 * 3、保存到本地
 */

Cat-Sdk API (Version 1.0)

  • model類

      public class Cat implements Comparable<Cat> {
    
          private Bitmap mBitmap;
          private int mCuteness;
      
          @TargetApi(Build.VERSION_CODES.KITKAT)
          @Override
          public int compareTo(Cat another) {
              return Integer.compare(mCuteness, another.mCuteness);
          }
      }
    
  • 阻塞式interface

      public interface Api {
      
          // 通過查詢條件查詢所有符合條件的貓
          List<Cat> queryCats1(String query);
          // 保存最可愛的貓,并返回保存路徑
          Uri store1(Cat cat);
      }
    

業(yè)務(wù)邏輯

最理想的做法:阻塞調(diào)用
    public class CatsHelper {
        private Api api;
        public Uri saveTheCutestCat1(String query) {
            List<Cat> catList = api.queryCats1(query);
            Cat cutestCat = findCutestCat(catList);
            Uri savedUri = api.store1(cutestCat);
            return savedUri;
        } 
        private Cat findCutestCat(List<Cat> catList) {
            return Collections.max(catList);
        }
     }

上面的代碼是不是很清晰?是的。saveTheCutestCat1方法使用參數(shù)來調(diào)用這些方法。當(dāng)一個方法執(zhí)行完并返回結(jié)果后,再利用這個返回結(jié)果去調(diào)用下一個方法。除了簡單有效以外,還有其他優(yōu)點:

  • 組合功能(Composition)

    saveTheCutestCat1 由其他三個方法調(diào)用所組成,通過方法來把一個大功能分割為每個容易理解的小功能。通過方法調(diào)用來組合使用這些小功能,使用和理解起來都相當(dāng)簡單。

  • 異常處理

    每個方法都可以通過拋出異常來結(jié)束運行,該異??梢栽趻伋霎惓5姆椒ɡ锩嫣幚?,也可以在調(diào)用該方法的外面處理。無需每次都處理每個異常,我們還可以在一個地方處理所有可能拋出的異常,直接使用try...catch語句或者向外拋出。

理想是好的,但現(xiàn)實是殘酷的。在Android中不可能只使用阻塞調(diào)用,而我們更多的是使用異步回調(diào)。

異步回調(diào)

這里我們先不去關(guān)心Cat-Sdk API使用異步調(diào)用的api去訪問網(wǎng)絡(luò)資源,暫時假設(shè)我們已經(jīng)獲取到了網(wǎng)絡(luò)資源的結(jié)果了。那么新的API接口變?yōu)檫@樣了:

public interface Api {
        
     interface CatsQueryCallback {
        void onCatListReceived(List<Cat> catList);

        void onError(Exception e);
    }
    interface StoreCallback {
        void onCatStored(Uri uri);

        void onStoredFail(Exception e);
    }
    List<Cat> queryCats2(String query, CatsQueryCallback callback);
    Uri store2(Cat cat, StoreCallback callback);
}

那我們的的CatsHelper也要發(fā)生改變了:

public class CatsHelper {
    private Api api;

    public interface CutestCatCallback {
        void onCutestCatSaved(Uri uri);

        void onQueryFailed(Exception e);
    }

    public void saveTheCutestCat2(String query, final CutestCatCallback cutestCatCallback) {

        api.queryCats2(query, new Api.CatsQueryCallback() {
            @Override
            public void onCatListReceived(List<Cat> catList) {
                Cat cutestCat = findCutestCat(catList);
                api.store2(cutestCat, new Api.StoreCallback() {
                    @Override
                    public void onCatStored(Uri uri) {
                        cutestCatCallback.onCutestCatSaved(uri);
                    }

                    @Override
                    public void onStoredFail(Exception e) {
                        cutestCatCallback.onQueryFailed(e);
                    }
                });
            }

            @Override
            public void onError(Exception e) {
                cutestCatCallback.onQueryFailed(e);
            }
        });
    }

這個這個...。與阻塞調(diào)用比起來,一個是天堂, 一個是地獄啊。業(yè)務(wù)邏輯雖然是一樣的,但是有太多的干擾代碼了;太多的的匿名內(nèi)部類了;組合功能也不見了,需通過回調(diào)接口手動處理;異常也不會自動傳遞了,也需要手動的處理。上面的代碼不僅長得惡心,并且更難發(fā)現(xiàn)潛在的bug。那怎么辦了?怎么辦了?

泛型接口

通過觀察這三個接口(CatsQueryCallback, StoreCallback, CutestCatCallback),會發(fā)現(xiàn)一個共同點:

(1) 都有一個方法來返回結(jié)果(onCutestCatSaved, onCatListReceived, onCatStored)

(2) 都有一個方法來返回異常(onQueryFailed, onError, onStoredFail)

所以,我們可以使用一個泛型接口來替代這三個接口

public interface Callback<T> {
    void onResult(T result);

    void onError(Exception e);
}

由于我們無法修改SDK中api中方法的參數(shù),所以需新創(chuàng)建一個包裝類

public class ApiWrapper {
    private Api api;

    public void queryCats(String query, final Callback<List<Cat>> catsCallback) {
        api.queryCats2(query, new Api.CatsQueryCallback() {
            @Override
            public void onCatListReceived(List<Cat> catList) {
                catsCallback.onResult(catList);
            }

            @Override
            public void onError(Exception e) {
                catsCallback.onError(e);
            }
        });
    }

    public void store(Cat cat, final Callback<Uri> uriCallback) {
        api.store2(cat, new Api.StoreCallback() {
            @Override
            public void onCatStored(Uri uri) {
                uriCallback.onResult(uri);
            }

            @Override
            public void onStoredFail(Exception e) {
                uriCallback.onError(e);
            }
        });
    }       
}

再來看看我們的CatsHelper類

public class CatsHelper {
    private ApiWrapper wrapper;
    public void saveTheCutestCat3(String query, final Callback<Uri> cutestCallback) {
        wrapper.queryCats(query, new Callback<List<Cat>>() {
            @Override
            public void onResult(List<Cat> result) {
                Cat cutestCat = findCutestCat(result);
                wrapper.store(cutestCat, cutestCallback);
            }

            @Override
            public void onError(Exception e) {
                cutestCallback.onError(e);
            }
     });
}

嗯,不錯。與上一版的CatsHelper比較來,看起來確實舒服多了。那么還可以再優(yōu)化嗎?

分離參數(shù)和回調(diào)接口

我們繼續(xù)來觀察它們的共同點:queryCats,store,saveTheCutestCat3這三個異步方法的參數(shù)都是一個回調(diào)接口和一般參數(shù),那么把這個一般參數(shù)和回調(diào)接口分離,讓異步操作只管理一般參數(shù),而返回一個臨時對象來管理回調(diào)接口。

首先創(chuàng)建一個管理回調(diào)接口的臨時對象

public abstract class AsyncJob<T> {

    public abstract void start(Callback<T> callback);
}

再修改我們的api包裝類

public class ApiWrapper {
    private Api api;
    public AsyncJob<List<Cat>> queryCats2(final String query) {

        return new AsyncJob<List<Cat>>() {
            @Override
            public void start(final Callback<List<Cat>> callback) {
                api.queryCats2(query, new Api.CatsQueryCallback() {
                    @Override
                    public void onCatListReceived(List<Cat> catList) {
                        callback.onResult(catList);
                    }
                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });
            }
        };
    }
    public AsyncJob<Uri> store2(final Cat cat) {

        return new AsyncJob<Uri>() {
            @Override
            public void start(final Callback<Uri> callback) {
                api.store2(cat, new Api.StoreCallback() {
                    @Override
                    public void onCatStored(Uri uri) {
                        callback.onResult(uri);
                    }
                    @Override
                    public void onStoredFail(Exception e) {
                        callback.onError(e);
                    }
                });
            }
        };
    }
}

最后修改我們的CatsHelper類

public class CatsHelper {
    private ApiWrapper wrapper;
    public AsyncJob<Uri> saveTheCutestCat4(final String query) {
        final AsyncJob<List<Cat>> catListAsyncJob = wrapper.queryCats2(query);
        
        final AsyncJob<Cat> cutestCatAsyncJob = new AsyncJob<Cat>() {
            @Override
            public void start(final Callback<Cat> callback) {
                catListAsyncJob.start(new Callback<List<Cat>>() {
                    @Override
                    public void onResult(List<Cat> result) {
                        callback.onResult(findCutestCat(result));
                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });
            }
        };
        AsyncJob<Uri> storedUriAsyncJob = new AsyncJob<Uri>() {
            @Override
            public void start(final Callback<Uri> callback) {
                cutestCatAsyncJob.start(new Callback<Cat>() {
                    @Override
                    public void onResult(Cat result) {
                        wrapper.store2(result)
                                .start(new Callback<Uri>() {
                                    @Override
                                    public void onResult(Uri result) {
                                        callback.onResult(result);
                                    }

                                    @Override
                                    public void onError(Exception e) {
                                        callback.onError(e);
                                    }
                                });
                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });
            }
        };
        return storedUriAsyncJob;
    }
}

上面的代碼將整個流程分解為幾個操作,數(shù)據(jù)流向為:

         (async)                 (sync)           (async)
query ===========> List<Cat> -------------> Cat ==========> Uri
    queryCats              findCutest          store

雖然findCutest標(biāo)記的是sync的,是相對qurey和store來說的,但在整個操作中,他還是屬于異步的。因為如果一個操作是異步的,則每個調(diào)用該異步操作的方法也是異步的。

上面的代碼量雖然增多了,但看起來還是比較清晰的,并且更加容易理解上面的異步操作:catListAsyncJob, cutestCatAsyncJob, storedUriAsyncJob. 不過,這個saveTheCutestCat4比saveTheCutestCat3有什么優(yōu)勢嗎?這個這個...。好像是沒有哦。好吧。我們繼續(xù)優(yōu)化。

簡單的映射

在cutestCatAsyncJob那一塊代碼中,其實核心的邏輯只有findCutestCat(result),其他的代碼只是為了啟動AnsyJob并接收結(jié)果和處理異常的干擾代碼。但是這些代碼是通用的,我們可以把他們放到其他地方來讓我們更加專注業(yè)務(wù)邏輯代碼。How to do?

通過一個轉(zhuǎn)換方法來轉(zhuǎn)換AsyncJob 的結(jié)果。但由于Java的限制,無法把方法作為參數(shù),所以需要用一個接口(或者類)并在里面定義一個轉(zhuǎn)換函數(shù):

public interface Func<T, R> {

    R call(T t);
}

當(dāng)我們把 AsyncJob 的結(jié)果轉(zhuǎn)換為其他類型的時候, 我們需要把一個結(jié)果值映射為另外一種類型,這個操作我們稱之為 map。 把該函數(shù)定義到 AsyncJob 類中比較方便,這樣就可以通過 this 來訪問 AsyncJob 對象了。

public abstract class AsyncJob<T> {

    public abstract void start(Callback<T> callback);

    public <R> AsyncJob<R> map(final Func<T, R> func) {
        final AsyncJob<T> source = this;
        return new AsyncJob<R>() {
            @Override
            public void start(final Callback<R> callback) {
                source.start(new Callback<T>() {
                    @Override
                    public void onResult(T result) {
                        R mapped = func.call(result);
                        callback.onResult(mapped);
                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });
            }
        };
    }
}

再來看看CatsHelper類

public class CatsHelper {
    private ApiWrapper wrapper;
    public AsyncJob<Uri> saveTheCutestCat5(final String query) {
        final AsyncJob<List<Cat>> catListAsyncJob = wrapper.queryCats2(query);
        final AsyncJob<Cat> cutestCatAsyncJob = catListAsyncJob.map(new Func<List<Cat>, Cat>() {

            @Override
            public Cat call(List<Cat> catList) {
                return findCutestCat(catList);
            }
        });
         //方式一:
        AsyncJob<Uri> storedUriAsyncJob = cutestCatAsyncJob.map(new Func<Cat, Uri>() {
            @Override
            public Uri call(Cat cat) {
                AsyncJob<Uri> store2 = wrapper.store2(cat);
                //這里的返回值不正確,無法編譯
                return store2;
            }
        });
        //方式二:
        AsyncJob<AsyncJob<Uri>> storedUriAsyncJob2 = cutestCatAsyncJob.map(new Func<Cat, AsyncJob<Uri>>() {
        @Override
            public AsyncJob<Uri> call(Cat cat) {
                AsyncJob<Uri> uriAsyncJob = wrapper.store2(cat);
                //這里的返回值不符合要求
                return uriAsyncJob;
            }
       });          
       return storedUriAsyncJob;
}               
}

根據(jù)方式二,我們只能夠得到一個AsyncJob<AsyncJob>的雙層異步結(jié)果,看來我們需要把它再壓縮成一層AsyncJob才能滿足我們的需求。在這里稱之為flatMap。那么我們轉(zhuǎn)換的結(jié)果不再是R,而是AsyncJob<R>

public abstract class AsyncJob<T> {

    public abstract void start(Callback<T> callback);

    public <R> AsyncJob<R> map(final Func<T, R> func) {
        final AsyncJob<T> source = this;
        return new AsyncJob<R>() {
            @Override
            public void start(final Callback<R> callback) {
                source.start(new Callback<T>() {
                    @Override
                    public void onResult(T result) {
                        R mapped = func.call(result);
                        callback.onResult(mapped);
                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });
            }
        };
    }
    public <R> AsyncJob<R> flatMap(final Func<T, AsyncJob<R>> func) {
        final AsyncJob<T> source = this;
        return new AsyncJob<R>() {
            @Override
            public void start(final Callback<R> callback) {
                source.start(new Callback<T>() {
                    @Override
                    public void onResult(T result) {

                        AsyncJob<R> mapped = func.call(result);
                        mapped.start(new Callback<R>() {
                            @Override
                            public void onResult(R result) {
                                callback.onResult(result);
                            }

                            @Override
                            public void onError(Exception e) {
                                callback.onError(e);
                            }
                        });
                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });
            }
        };
    }
}

最后回到我們的CatsHelper類

public class CatsHelper {
    private ApiWrapper wrapper;
    public AsyncJob<Uri> saveTheCutestCat6(final String query) {
        final AsyncJob<List<Cat>> catListAsyncJob = wrapper.queryCats2(query);
        final AsyncJob<Cat> cutestCatAsyncJob = catListAsyncJob.map(new Func<List<Cat>, Cat>() {

            @Override
            public Cat call(List<Cat> catList) {
                return findCutestCat(catList);
            }
        });
        AsyncJob<Uri> uriAsyncJob = cutestCat.flatMap(new Func<Cat, AsyncJob<Uri>>() {
       
            @Override
            public AsyncJob<Uri> call(Cat cat) {
                return wrapper.store2(cat);
            }
        });
        return uriAsyncJob;
    }               
}

上面的代碼是不是似曾相識?是的。回過頭再去看看前面我們說過的最理想的做法的代碼??赡懿皇呛苊黠@,我們用Java8表達式來看看

public class CatsHelper {
    private ApiWrapper wrapper;
    public AsyncJob<Uri> saveTheCutestCat7(final String query) {
        AsyncJob<List<Cat>> listAsyncJob = wrapper.queryCats2(query);
        AsyncJob<Cat> cutestCat = listAsyncJob.map(cats -> findCutestCat(cats));
        AsyncJob<Uri> uriAsyncJob = cutestCat.flatMap(cat -> wrapper.store2(cat));
        return uriAsyncJob;
    }
    private Cat findCutestCat(List<Cat> catList) {
        return Collections.max(catList);
    }
}

使用RxJava

public class RxJavaApiWrapper {
private Api api;        
        
    public Observable<List<Cat>> rxJavaQueryCats(String query) {
        return Observable.create(new Observable.OnSubscribe<List<Cat>>() {
            @Override
            public void call(Subscriber<? super List<Cat>> subscriber) {
                api.queryCats2(query, new Api.CatsQueryCallback() {
                    @Override
                    public void onCatListReceived(List<Cat> catList) {
                        subscriber.onNext(catList);
                    }

                    @Override
                    public void onError(Exception e) {
                        subscriber.onError(e);
                    }
                });
            }
        });
    }

    public Observable<Uri> rxJavaStore(Cat cat) {
        return Observable.create(new Observable.OnSubscribe<Uri>() {
            @Override
            public void call(Subscriber<? super Uri> subscriber) {
                api.store2(cat, new Api.StoreCallback() {
                    @Override
                    public void onCatStored(Uri uri) {
                        subscriber.onNext(uri);
                    }

                    @Override
                    public void onStoredFail(Exception e) {
                        subscriber.onError(e);
                    }
                });
            }
        });
    }

}
----------------------------------------------------------------------------

public class RxJavaCatsHelper {
    private ApiWrapper wrapper;
    public Observable<Uri> rxJavaSaveTheCutestCat(String query) {
        Observable<List<Cat>> listObservable = wrapper.rxJavaQueryCats(query);
        Observable<Cat> cutestCat = listObservable.map(new Func1<List<Cat>, Cat>() {
            @Override
            public Cat call(List<Cat> catList) {
                return findCutestCat(catList);
            }
        });
        Observable<Uri> uriObservable = cutestCat.flatMap(new Func1<Cat, Observable<Uri>>() {
            @Override
            public Observable<Uri> call(Cat cat) {
                return wrapper.rxJavaStore(cat);
            }
        });

        return uriObservable;
    }

    private Cat findCutestCat(List<Cat> catList) {
        return Collections.max(catList);
    }
}

總結(jié)

  • AsyncJob 等同于 Observable, 不僅僅可以返回一個結(jié)果,還可以返回一系列的結(jié)果,當(dāng)然也可能沒有結(jié)果返回。

  • Callback 等同于 Observer, 除了onNext(T t), onError(Throwable t)以外,還有一個onCompleted()函數(shù),該函數(shù)在結(jié)束繼續(xù)返回結(jié)果的時候通知Observable 。

  • abstract void start(Callback callback) 和 Subscription subscribe(final Observer observer) 類似,返回一個Subscription ,如果你不再需要后面的結(jié)果了,可以取消該任務(wù)。

  • 除了 map 和 flatMap 以外, Observable 還有很多其他常見的轉(zhuǎn)換操作。

看看我們的代碼是不是和RxJava的代碼很相似。當(dāng)然我們的只是一個部分,RxJava還有很多好東西值得我們?nèi)ネ诰虻摹?/p>

參考資料

http://yarikx.github.io/NotRxJava/

http://blog.chengyunfeng.com/?p=729

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容