[Soul 源碼之旅] 1.8.6 Soul插件初體驗(yàn) (Hystrix 2)

在講 Hystrix 插件前,我們需要先了解一下 Hystrix 的具體實(shí)現(xiàn),Hystrix 限流框架基于 大名鼎鼎的 RxJava 實(shí)現(xiàn),我們這節(jié)先來(lái)了解一下 RxJava。

1.8.6.1 響應(yīng)式編程

響應(yīng)式編程簡(jiǎn)稱 FRP (Function Reatcive Programing)詳細(xì)定義大家可以看一下 stackOverFlow 的這位大神解釋,簡(jiǎn)單來(lái)說(shuō)他就是一種基于事件的模型,我們需要知道某個(gè)事件是否發(fā)生有兩種方式,一種就是主動(dòng)輪訓(xùn),我們把它稱為 Proactive 方式。另一種就是被動(dòng)接收反饋,我們稱為 Reactive。
響應(yīng)式編程是未來(lái)的一種趨勢(shì),對(duì)于后端java 程序員關(guān)于響應(yīng)式編程第一個(gè)應(yīng)該會(huì)想到 Spring 5 引入的 WebFlux ,而在 Android 中,響應(yīng)式編程發(fā)展卻快得多,最主要原因是使用場(chǎng)景,Android 客戶端中存在大量的和服務(wù)器交互的程序,所以存在大量的異步回掉過(guò)程,一般這種請(qǐng)看就會(huì)使用到 RxJava ,它可以更加簡(jiǎn)化代碼的邏輯。

1.8.6.2 Rxjava 使用入門

我們先來(lái)看一下一個(gè)RxJava 調(diào)用的實(shí)例:

//創(chuàng)建被觀察者
        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("111");
                subscriber.onNext("222");
                subscriber.onNext("333");
                subscriber.onCompleted();
                
            }
        });
        //創(chuàng)建觀察者
        Observer<String> observer = new Observer<String>() {

            @Override
            public void onCompleted() {
                System.out.println("Complete");
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onNext(String s) {
                System.out.println(s);
            }
        };
        //注冊(cè),將觀察者和被觀察者關(guān)聯(lián),將會(huì)觸發(fā)OnSubscribe.call方法
        observable.subscribe(observer);

我們先來(lái)介紹一下 RxJava 最重要的兩個(gè)成員:

  • Observable 可以理解為被觀察者,RxJava 中使用了觀察者模式。
  • Observer 可以理解為觀察者。
    Observable 方法
  • call 即被觀察者的所有活動(dòng),這里可以調(diào)用 subscriber.onNext 向訂閱者發(fā)送消息。
  • subscribe 這個(gè)是所有方法的啟動(dòng)源頭,我將其主要邏輯整理如下,首先他會(huì)調(diào)用 Observerstart方法,告訴它開(kāi)始了,然后調(diào)用自己的 call 方法,假如發(fā)生異常,就調(diào)用 subscriberonError 方法,然后 unsubscribed , 也就是將被觀察者注冊(cè)的觀察者清空。
    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        subscriber.onStart();
        try {
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
             subscriber.onError(RxJavaHooks.onObservableError(e));
            return Subscriptions.unsubscribed();
        }
    }

接下來(lái)我們說(shuō)一下 observer 的三個(gè)方法:

  • onCompleted 當(dāng) Observable 的 call 完成后調(diào)用,它后 onError 只有一個(gè)會(huì)被調(diào)用
  • onError 我們可以參考上面的源碼,在 調(diào)用 call 異常時(shí)調(diào)用
  • onNext Observable 每個(gè)發(fā)生時(shí)間都會(huì)調(diào)用。

1.8.6.3 Rxjava 各種操作

  • Schedule 調(diào)度線程
    我們將我們的代碼稍微改一下
        final CountDownLatch latch = new CountDownLatch(1);
        //創(chuàng)建被觀察者
        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("111");
                subscriber.onNext("222");
                subscriber.onNext("333");
                subscriber.onCompleted();
                System.out.println(Thread.currentThread().getName());
            }
        });
        //創(chuàng)建觀察者
        Observer<String> observer = new Observer<String>() {

            @Override
            public void onCompleted() {
                System.out.println(Thread.currentThread().getName() + "-Complete");
                latch.countDown();
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onNext(String s) {
                System.out.println(Thread.currentThread().getName() + "-" +s);
            }
        };
        //注冊(cè),將觀察者和被觀察者關(guān)聯(lián),將會(huì)觸發(fā)OnSubscribe.call方法
        observable.subscribeOn(Schedulers.newThread()) // 指定 subscribe() 發(fā)生在新的線程
            .observeOn(Schedulers.newThread()).subscribe(observer);
        latch.await();

得到結(jié)果入下:

RxNewThreadScheduler-2
RxNewThreadScheduler-1-111
RxNewThreadScheduler-1-222
RxNewThreadScheduler-1-333
RxNewThreadScheduler-1-Complete

這對(duì)于我們來(lái)說(shuō)確實(shí)是很酷的事情,這里subscribeOnobserveOn 分別指定了我們生產(chǎn)者在調(diào)用 call 的線程和消費(fèi)者的線程。

  • map 轉(zhuǎn)換
    map

    我們可以參考如上面的圖和下面的代碼,假如 observable 發(fā)射的是 Integer 類型,而 Obsever 需要接收 String 那么我們就可以通過(guò) map 進(jìn)行轉(zhuǎn)換。
   observable.subscribeOn(Schedulers.newThread()) 
            .observeOn(Schedulers.newThread()).map(i -> i +"").subscribe(observer);
  • flatmap
    flatmap 對(duì)于很多新手來(lái)說(shuō)是最難理解的,flat 可以理解為平鋪,我們需要將 Observable 發(fā)射的時(shí)間平鋪開(kāi)來(lái)的場(chǎng)景,如我們需要將 Stirng 使用空格分割成一個(gè)個(gè)單詞進(jìn)行消費(fèi),如下:
 observable.subscribeOn(Schedulers.newThread()) 
            .observeOn(Schedulers.newThread()).flatMap(s -> Observable.from(s.split(" "))).subscribe(observer);

我們也可以參考圖片,我們這里將一個(gè)OnNext 事件轉(zhuǎn)為一個(gè) Observable 然后將這些 Observable 匯總成一個(gè) ObservableObsever 進(jìn)行消費(fèi)。

flatmpa

  • from 如上,我們可以通過(guò)from 將一個(gè)數(shù)組快速轉(zhuǎn)為 Observable
  • filter 顧名思義,就是將一部分事件過(guò)濾掉,這里很多操作和 stream 是類似的。
  • distinct 去重。

1.8.6.4 總結(jié)

我們這次只是簡(jiǎn)單地學(xué)習(xí)了一下 RxJava 的原理,RxJava 還有很多好玩的地方等著我們?nèi)ヌ剿?,有興趣的小伙伴可以深入學(xué)習(xí)一下,非常適合入門響應(yīng)式編程,我們下一節(jié)將接收 Hystrix 如何通過(guò) RxJava 實(shí)現(xiàn)降級(jí)和熔斷。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容