Spring Cloud 源碼學(xué)習(xí)之 Hystrix 工作原理

歡迎訪問陳同學(xué)博客原文

Spring Cloud 源碼學(xué)習(xí)之 Hystrix 入門
Spring Cloud 之 Hystrix 跨線程傳遞數(shù)據(jù)

本文學(xué)習(xí)了 Hystrix 工作原理及源碼,關(guān)注點(diǎn)在整體處理流程,不涉及具體的實(shí)現(xiàn)細(xì)節(jié)。后續(xù)將逐漸寫Metrics收集、斷路器、隔離、請求緩存等,有興趣可以關(guān)注奧。

下面 流程圖 來源于 Hystrix Wiki,展現(xiàn)了 Hystrix 工作原理,官方 Wiki 中對每一步都做了詳細(xì)的描述,可以直接參考。

image

文中源碼基于 Spring Cloud Finchley.SR1、Spring Boot 2.0.6.RELEASE.

工作原理簡述

當(dāng)需要完成某項(xiàng)任務(wù)時(shí),通過 Hystrix 將任務(wù)包裹起來,交由 Hystrix 來完成任務(wù),從而享受 Hystrix 帶來保護(hù)。這和古代鏢局生意有點(diǎn)類似,將任務(wù)委托給鏢局,以期安全完成任務(wù)。

上圖展示了 Hystrix 完成任務(wù)的處理流程,下面對1到9步驟進(jìn)行簡述:

1.構(gòu)建命令

Hystrix 提供了兩個(gè)Command, HystrixCommandHystrixObservableCommand,可以使用這兩個(gè)對象來包裹待執(zhí)行的任務(wù)。

例如使用 @HystrixCommand 注解標(biāo)記方法,Hystrix 將利用AOP自動(dòng)將目標(biāo)方法包裝成HystrixCommand來執(zhí)行。

@HystrixCommand
public String hello() {
    ...
}

也可以繼承HystrixCommand或HystrixObservableCommand來創(chuàng)建Command,例如:

public class MyCommand extends HystrixCommand {
    
    public MyCommand(HystrixCommandGroupKey group) {
        super(group);
    }

    @Override
    protected Object run() throws Exception {
        // 需要做的事情及需要返回的結(jié)果
        return null;
    }
}

任務(wù)委托給 Hystrix 后,Hystrix 可以應(yīng)用自己的一系列保護(hù)機(jī)制,在執(zhí)行用戶任務(wù)的各節(jié)點(diǎn)(執(zhí)行前、執(zhí)行后、異常、超時(shí)等)做一系列的事情。

2.執(zhí)行命令

有四種方式執(zhí)行command。

  • R execute():同步執(zhí)行,從依賴服務(wù)得到單一結(jié)果對象
  • Future<R> queue():異步執(zhí)行,返回一個(gè) Future 以便獲取執(zhí)行結(jié)果,也是單一結(jié)果對象
  • Observable<R> observe():hot observable,創(chuàng)建Observable后會(huì)訂閱Observable,可以返回多個(gè)結(jié)果
  • Observable<R> toObservable():cold observable,返回一個(gè)Observable,只有訂閱時(shí)才會(huì)執(zhí)行,可以返回多個(gè)結(jié)果

execute() 的實(shí)現(xiàn)為 queue().get(); queue() 的實(shí)現(xiàn)為 toObservable().toBlocking().toFuture()。

最后Obserable都由toObservable()來創(chuàng)建,本文的主要內(nèi)容就是toObservable()。

// 利用queue()拿到Future, 執(zhí)行 get()同步等待拿到執(zhí)行結(jié)果
public R execute() {
    ...
    return queue().get();
}

// 利用toObservable()得到Observable最后轉(zhuǎn)成Future
public Future<R> queue() {
    final Future<R> delegate = toObservable().toBlocking().toFuture();
    ...
} 

// 利用toObservable()得到Observable并直接訂閱它,立即執(zhí)行命令
public Observable<R> observe() {
    ReplaySubject<R> subject = ReplaySubject.create();
    final Subscription sourceSubscription = toObservable().subscribe(subject);
    ...
}

3.檢查緩存

第3到9步驟構(gòu)成了 Hystrix 的保護(hù)能力,通過這一些列步驟來執(zhí)行任務(wù),從而起到保護(hù)作用。

如果啟用了 Hystrix Cache,任務(wù)執(zhí)行前將先判斷是否有相同命令執(zhí)行的緩存。如果有則直接返回緩存的結(jié)果;如果沒有緩存的結(jié)果,但啟動(dòng)了緩存,將緩存本次執(zhí)行結(jié)果以供后續(xù)使用。

4.檢查斷路器是否打開

斷路器(circuit-breaker)和保險(xiǎn)絲類似,保險(xiǎn)絲在發(fā)生危險(xiǎn)時(shí)將會(huì)燒斷以保護(hù)電路,而斷路器可以在達(dá)到我們設(shè)定的閥值時(shí)觸發(fā)短路(比如請求失敗率達(dá)到50%),拒絕執(zhí)行任何請求。

如果斷路器被打開,Hystrix 將不會(huì)執(zhí)行命令,直接進(jìn)入Fallback處理邏輯。

5.檢查線程池/信號量情況

Hystrix 隔離方式有線程池隔離和信號量隔離。當(dāng)使用Hystrix線程池時(shí),Hystrix 默認(rèn)為每個(gè)依賴服務(wù)分配10個(gè)線程,當(dāng)10個(gè)線程都繁忙時(shí),將拒絕執(zhí)行命令。信號量同理。

6.執(zhí)行具體的任務(wù)

通過HystrixObservableCommand.construct() 或者 HystrixCommand.run() 來運(yùn)行用戶真正的任務(wù)。

7.計(jì)算鏈路健康情況

每次開始執(zhí)行command、結(jié)束執(zhí)行command以及發(fā)生異常等情況時(shí),都會(huì)記錄執(zhí)行情況,例如:成功、失敗、拒絕以及超時(shí)等情況,會(huì)定期處理這些數(shù)據(jù),再根據(jù)設(shè)定的條件來判斷是否開啟斷路器。

8.命令失敗時(shí)執(zhí)行 Fallback 邏輯

在命令失敗時(shí)執(zhí)行用戶指定的 Fallback 邏輯。上圖中的斷路、線程池拒絕、信號量拒絕、執(zhí)行執(zhí)行、執(zhí)行超時(shí)都會(huì)進(jìn)入 Fallback 處理。

9.返回執(zhí)行結(jié)果

原始結(jié)果將以O(shè)bservable形式返回,在返回給用戶之前,會(huì)根據(jù)調(diào)用方式的不同做一些處理。

下面是 Hystrix Return flow。

image

源碼學(xué)習(xí)

小故事

由于最終入口都是 toObservable(),就從 AbstractCommand的 Observable<R> toObservable() 方法開始。

Hystrix 使用觀察者模式,Observable 即被觀察者,被觀察者些狀態(tài)變更時(shí),觀察者可以做出各項(xiàng)響應(yīng)。舉個(gè)例子:大廳中一位演講者正在分享,廳中有觀眾和工作人員,可能發(fā)生如下事情:

被觀察者 事件         觀察者
-----------------------------------
演講者 分享到精彩處 -> 觀眾鼓掌
演講者 講的口干舌燥 -> 工作人員遞上一瓶水
演講者 放出自己的二維碼 -> 觀眾掃描

因?yàn)?Hystrix 基于RxJava,RxJava 初次看會(huì)比較復(fù)雜。為了便于下文理解,可以將Observable理解為數(shù)據(jù)源、數(shù)據(jù)發(fā)射器,上面例子中,演講者各種行為都可以抽象為數(shù)據(jù)源在發(fā)射數(shù)據(jù),而各種接收者可以做出各種響應(yīng)。

toObservable()

toObservable() 主要源碼如下:

public Observable<R> toObservable() {
    final AbstractCommand<R> _cmd = this;
    // 命令執(zhí)行結(jié)束后的清理者
    final Action0 terminateCommandCleanup = new Action0() {...};
    // 取消訂閱時(shí)處理者
    final Action0 unsubscribeCommandCleanup = new Action0() {...};
    // 重點(diǎn):Hystrix 核心邏輯: 斷路器、隔離
    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {...};
    // 發(fā)射數(shù)據(jù)(OnNext表示發(fā)射數(shù)據(jù))時(shí)的Hook
    final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {...};
    // 命令執(zhí)行完成的Hook
    final Action0 fireOnCompletedHook = new Action0() {...};

    // 通過Observable.defer()創(chuàng)建一個(gè)Observable
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            final boolean requestCacheEnabled = isRequestCachingEnabled();
            final String cacheKey = getCacheKey();

            // 首先嘗試從請求緩存中獲取結(jié)果
            if (requestCacheEnabled) {
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if (fromCache != null) {
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                }
            }

            // 使用上面的Func0:applyHystrixSemantics 來創(chuàng)建Observable
            Observable<R> hystrixObservable =
                    Observable.defer(applyHystrixSemantics)
                            .map(wrapWithAllOnNextHooks);

            Observable<R> afterCache;

            // 如果啟用請求緩存,將Observable包裝成HystrixCachedObservable并進(jìn)行相關(guān)處理
            if (requestCacheEnabled && cacheKey != null) {
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                ...
            } else {
                afterCache = hystrixObservable;
            }

            // 返回Observable
            return afterCache
                    .doOnTerminate(terminateCommandCleanup)   
                    .doOnUnsubscribe(unsubscribeCommandCleanup)
                    .doOnCompleted(fireOnCompletedHook);
        }
    });
}

上面的代碼可以換種思維方式來理解。平時(shí)開發(fā)時(shí)都是下面這種模式,按順序不斷的做事情,是一個(gè)很好的執(zhí)行者。

public void methodA{
    try {
        // 1. 做第一件事情
        // 2. 調(diào)用methodB()做第二件事情
        // 3. 做第三件事情
        ...
    } catch (Exception e) {
        // 處理錯(cuò)誤
    } finally {
        // 最后一定要做的事情
    }
}

用一張圖來看 toObservable() 方法。這種方式是“軍師型”,排兵布陣,先創(chuàng)造了各個(gè)處理者,然后創(chuàng)造被觀察者,再設(shè)置Observable發(fā)生各種情況時(shí)由誰來處理,完全掌控全局。

image

解釋下Action0、Func1這種對象。Action、Func和Runnable、Callable類似,是一個(gè)可以被執(zhí)行的實(shí)體。Action沒有返回值,Action0...ActionN表示有0..N個(gè)參數(shù),Action0就表示沒有參數(shù);Func有返值,0..N一樣表示參數(shù)。

public interface Action0 extends Action {
    void call();
}
public interface Func1<T, R> extends Function {
    R call(T t);
}

下面用核心的 applyHystrixSemantics 來闡述一下。

// applyHystrixSemantics 是一個(gè)Func0(理解為執(zhí)行實(shí)體或處理者),表示沒有參數(shù),返回值是Observable。
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
    // Func0 做的事情如下
    @Override
    public Observable<R> call() {
        // 如果未訂閱,返回一個(gè)"啞炮" Observable, 即一個(gè)不會(huì)發(fā)射任何數(shù)據(jù)的Observable
        if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
            return Observable.never();
        }
        // 調(diào)用applyHystrixSemantics()來創(chuàng)建Observable
        return applyHystrixSemantics(_cmd);
    }
};

因此,當(dāng)執(zhí)行Func0: applyHystrixSemantics時(shí),可以得到一個(gè)Observable。toObservable() 大量代碼在準(zhǔn)備處理者(觀察者),實(shí)際使用時(shí)是方法最后的 Observable.defer(new Func0<Observable<R>>(){...}

Observable.defer

defer譯為延遲,表示演講者會(huì)等有觀眾來時(shí)才開始分享。Observable.defer() 就是說:必須有觀察者訂閱了我是,我才開始發(fā)射數(shù)據(jù)。而defer()的參數(shù)是個(gè)Func0,是一個(gè)會(huì)返回Observable的執(zhí)行實(shí)體。下面看看defer():

return Observable.defer(new Func0<Observable<R>>() {
    @Override
    public Observable<R> call() {
        // 再一次使用Observable.defer()技能,這次用的是applyHystrixSemantics這個(gè)Func0
        Observable<R> hystrixObservable =
                Observable.defer(applyHystrixSemantics)
                        .map(wrapWithAllOnNextHooks);
        ... // 此處忽略了請求緩存處理,上面已有提及
        Observable<R> afterCache;
        ...
        // 為Observable綁定幾個(gè)特定事件的處理者,這都是上門創(chuàng)建的Action0
        return afterCache
                .doOnTerminate(terminateCommandCleanup) 
                .doOnUnsubscribe(unsubscribeCommandCleanup) 
                .doOnCompleted(fireOnCompletedHook);
    }
});

applyHystrixSemantics()

接著看applyHystrixSemantics這個(gè)Func0,F(xiàn)unc0的call()中調(diào)用的是applyHystrixSemantics()函數(shù)。

// Semantics 譯為語義, 應(yīng)用Hystrix語義很拗口,其實(shí)就是應(yīng)用Hystrix的斷路器、隔離特性
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // 源碼中有很多executionHook、eventNotifier的操作,這是Hystrix拓展性的一種體現(xiàn)。這里面啥事也沒做,留了個(gè)口子,開發(fā)人員可以拓展
    executionHook.onStart(_cmd);

    // 判斷斷路器是否開啟
    if (circuitBreaker.attemptExecution()) {
        // 獲取執(zhí)行信號
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        final Action0 singleSemaphoreRelease = new Action0() {...};
        final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {...};

        // 判斷是否信號量拒絕
        if (executionSemaphore.tryAcquire()) {
            try {
                // 重點(diǎn):處理隔離策略和Fallback策略
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            return handleSemaphoreRejectionViaFallback();
        }
    } 
    // 開啟了斷路器,執(zhí)行Fallback
    else {
        return handleShortCircuitViaFallback();
    }
}

executeCommandAndObserve()

下面看executeCommandAndObserve()方法,處理隔離策略和各種Fallback.

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

    final Action1<R> markEmits = new Action1<R>() {...};
    final Action0 markOnCompleted = new Action0() {...};

    // 利用Func1獲取處理Fallback的 Observable
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            circuitBreaker.markNonSuccess();
            Exception e = getExceptionFromThrowable(t);
            executionResult = executionResult.setExecutionException(e);
            // 拒絕處理
            if (e instanceof RejectedExecutionException) {
                return handleThreadPoolRejectionViaFallback(e);
            // 超時(shí)處理    
            } else if (t instanceof HystrixTimeoutException) {
                return handleTimeoutViaFallback();
            } else if (t instanceof HystrixBadRequestException) {
                return handleBadRequestByEmittingError(e);
            } else {
                ...
                return handleFailureViaFallback(e);
            }
        }
    };

    final Action1<Notification<? super R>> setRequestContext ...

    Observable<R> execution;
    // 利用特定的隔離策略來處理
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }

    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            // 綁定Fallback的處理者
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}

executeCommandWithSpecifiedIsolation()

接著看隔離特性的處理:executeCommandWithSpecifiedIsolation()

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    // 線程池隔離
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // 再次使用 Observable.defer(), 通過執(zhí)行Func0來得到Observable
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                // 收集metric信息
                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
                ...
                try {
                    ... // 獲取真正的用戶Task
                    return getUserExecutionObservable(_cmd);
                } catch (Throwable ex) {
                    return Observable.error(ex);
                }
                ...
            }
            // 綁定各種處理者
        }).doOnTerminate(new Action0() {...})
            .doOnUnsubscribe(new Action0() {...})
            // 綁定超時(shí)處理者
            .subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    } 
    // 信號量隔離,和線程池大同小異,全部省略了
    else {
        return Observable.defer(new Func0<Observable<R>>() {...}
    }
}

getUserExecutionObservable()就不接著寫了,可以自己看下,就是拿到用戶真正要執(zhí)行的任務(wù)。這個(gè)任務(wù)就是這樣被Hystrix包裹著,置于層層防護(hù)之下。

倒過來看

上面方法層層調(diào)用,倒過來看,就是先創(chuàng)建一個(gè)Observable,然后綁定各種事件對應(yīng)的處理者,如下圖:

image

各類doOnXXXX,表示發(fā)生XXX事件時(shí)做什么事情。

參考

  • DD 《Spring Cloud微服務(wù)實(shí)戰(zhàn)》
  • 朱榮鑫,張?zhí)?,黃迪璇《Spring Cloud微服務(wù)架構(gòu)進(jìn)階》
  • Hystrix Wiki: How it works

歡迎關(guān)注陳同學(xué)的公眾號,一起學(xué)習(xí),一起成長

image
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容