歡迎訪問陳同學(xué)博客原文
Spring Cloud 源碼學(xué)習(xí)之 Hystrix 入門
Spring Cloud 之 Hystrix 跨線程傳遞數(shù)據(jù)
本文學(xué)習(xí)了 Hystrix 工作原理及源碼,關(guān)注點(diǎn)在整體處理流程,不涉及具體的實(shí)現(xiàn)細(xì)節(jié)。后續(xù)將逐漸寫Metrics收集、斷路器、隔離、請求緩存等,有興趣可以關(guān)注奧。
下面 流程圖 來源于 Hystrix Wiki,展現(xiàn)了 Hystrix 工作原理,官方 Wiki 中對每一步都做了詳細(xì)的描述,可以直接參考。

文中源碼基于 Spring Cloud Finchley.SR1、Spring Boot 2.0.6.RELEASE.
工作原理簡述
當(dāng)需要完成某項(xiàng)任務(wù)時(shí),通過 Hystrix 將任務(wù)包裹起來,交由 Hystrix 來完成任務(wù),從而享受 Hystrix 帶來保護(hù)。這和古代鏢局生意有點(diǎn)類似,將任務(wù)委托給鏢局,以期安全完成任務(wù)。
上圖展示了 Hystrix 完成任務(wù)的處理流程,下面對1到9步驟進(jìn)行簡述:
1.構(gòu)建命令
Hystrix 提供了兩個(gè)Command, HystrixCommand 和 HystrixObservableCommand,可以使用這兩個(gè)對象來包裹待執(zhí)行的任務(wù)。
例如使用 @HystrixCommand 注解標(biāo)記方法,Hystrix 將利用AOP自動(dòng)將目標(biāo)方法包裝成HystrixCommand來執(zhí)行。
@HystrixCommand
public String hello() {
...
}
也可以繼承HystrixCommand或HystrixObservableCommand來創(chuàng)建Command,例如:
public class MyCommand extends HystrixCommand {
public MyCommand(HystrixCommandGroupKey group) {
super(group);
}
@Override
protected Object run() throws Exception {
// 需要做的事情及需要返回的結(jié)果
return null;
}
}
任務(wù)委托給 Hystrix 后,Hystrix 可以應(yīng)用自己的一系列保護(hù)機(jī)制,在執(zhí)行用戶任務(wù)的各節(jié)點(diǎn)(執(zhí)行前、執(zhí)行后、異常、超時(shí)等)做一系列的事情。
2.執(zhí)行命令
有四種方式執(zhí)行command。
- R execute():同步執(zhí)行,從依賴服務(wù)得到單一結(jié)果對象
- Future<R> queue():異步執(zhí)行,返回一個(gè) Future 以便獲取執(zhí)行結(jié)果,也是單一結(jié)果對象
- Observable<R> observe():hot observable,創(chuàng)建Observable后會(huì)訂閱Observable,可以返回多個(gè)結(jié)果
- Observable<R> toObservable():cold observable,返回一個(gè)Observable,只有訂閱時(shí)才會(huì)執(zhí)行,可以返回多個(gè)結(jié)果
execute() 的實(shí)現(xiàn)為 queue().get(); queue() 的實(shí)現(xiàn)為 toObservable().toBlocking().toFuture()。
最后Obserable都由toObservable()來創(chuàng)建,本文的主要內(nèi)容就是toObservable()。
// 利用queue()拿到Future, 執(zhí)行 get()同步等待拿到執(zhí)行結(jié)果
public R execute() {
...
return queue().get();
}
// 利用toObservable()得到Observable最后轉(zhuǎn)成Future
public Future<R> queue() {
final Future<R> delegate = toObservable().toBlocking().toFuture();
...
}
// 利用toObservable()得到Observable并直接訂閱它,立即執(zhí)行命令
public Observable<R> observe() {
ReplaySubject<R> subject = ReplaySubject.create();
final Subscription sourceSubscription = toObservable().subscribe(subject);
...
}
3.檢查緩存
第3到9步驟構(gòu)成了 Hystrix 的保護(hù)能力,通過這一些列步驟來執(zhí)行任務(wù),從而起到保護(hù)作用。
如果啟用了 Hystrix Cache,任務(wù)執(zhí)行前將先判斷是否有相同命令執(zhí)行的緩存。如果有則直接返回緩存的結(jié)果;如果沒有緩存的結(jié)果,但啟動(dòng)了緩存,將緩存本次執(zhí)行結(jié)果以供后續(xù)使用。
4.檢查斷路器是否打開
斷路器(circuit-breaker)和保險(xiǎn)絲類似,保險(xiǎn)絲在發(fā)生危險(xiǎn)時(shí)將會(huì)燒斷以保護(hù)電路,而斷路器可以在達(dá)到我們設(shè)定的閥值時(shí)觸發(fā)短路(比如請求失敗率達(dá)到50%),拒絕執(zhí)行任何請求。
如果斷路器被打開,Hystrix 將不會(huì)執(zhí)行命令,直接進(jìn)入Fallback處理邏輯。
5.檢查線程池/信號量情況
Hystrix 隔離方式有線程池隔離和信號量隔離。當(dāng)使用Hystrix線程池時(shí),Hystrix 默認(rèn)為每個(gè)依賴服務(wù)分配10個(gè)線程,當(dāng)10個(gè)線程都繁忙時(shí),將拒絕執(zhí)行命令。信號量同理。
6.執(zhí)行具體的任務(wù)
通過HystrixObservableCommand.construct() 或者 HystrixCommand.run() 來運(yùn)行用戶真正的任務(wù)。
7.計(jì)算鏈路健康情況
每次開始執(zhí)行command、結(jié)束執(zhí)行command以及發(fā)生異常等情況時(shí),都會(huì)記錄執(zhí)行情況,例如:成功、失敗、拒絕以及超時(shí)等情況,會(huì)定期處理這些數(shù)據(jù),再根據(jù)設(shè)定的條件來判斷是否開啟斷路器。
8.命令失敗時(shí)執(zhí)行 Fallback 邏輯
在命令失敗時(shí)執(zhí)行用戶指定的 Fallback 邏輯。上圖中的斷路、線程池拒絕、信號量拒絕、執(zhí)行執(zhí)行、執(zhí)行超時(shí)都會(huì)進(jìn)入 Fallback 處理。
9.返回執(zhí)行結(jié)果
原始結(jié)果將以O(shè)bservable形式返回,在返回給用戶之前,會(huì)根據(jù)調(diào)用方式的不同做一些處理。
下面是 Hystrix Return flow。

源碼學(xué)習(xí)
小故事
由于最終入口都是 toObservable(),就從 AbstractCommand的 Observable<R> toObservable() 方法開始。
Hystrix 使用觀察者模式,Observable 即被觀察者,被觀察者些狀態(tài)變更時(shí),觀察者可以做出各項(xiàng)響應(yīng)。舉個(gè)例子:大廳中一位演講者正在分享,廳中有觀眾和工作人員,可能發(fā)生如下事情:
被觀察者 事件 觀察者
-----------------------------------
演講者 分享到精彩處 -> 觀眾鼓掌
演講者 講的口干舌燥 -> 工作人員遞上一瓶水
演講者 放出自己的二維碼 -> 觀眾掃描
因?yàn)?Hystrix 基于RxJava,RxJava 初次看會(huì)比較復(fù)雜。為了便于下文理解,可以將Observable理解為數(shù)據(jù)源、數(shù)據(jù)發(fā)射器,上面例子中,演講者各種行為都可以抽象為數(shù)據(jù)源在發(fā)射數(shù)據(jù),而各種接收者可以做出各種響應(yīng)。
toObservable()
toObservable() 主要源碼如下:
public Observable<R> toObservable() {
final AbstractCommand<R> _cmd = this;
// 命令執(zhí)行結(jié)束后的清理者
final Action0 terminateCommandCleanup = new Action0() {...};
// 取消訂閱時(shí)處理者
final Action0 unsubscribeCommandCleanup = new Action0() {...};
// 重點(diǎn):Hystrix 核心邏輯: 斷路器、隔離
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {...};
// 發(fā)射數(shù)據(jù)(OnNext表示發(fā)射數(shù)據(jù))時(shí)的Hook
final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {...};
// 命令執(zhí)行完成的Hook
final Action0 fireOnCompletedHook = new Action0() {...};
// 通過Observable.defer()創(chuàng)建一個(gè)Observable
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();
// 首先嘗試從請求緩存中獲取結(jié)果
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
// 使用上面的Func0:applyHystrixSemantics 來創(chuàng)建Observable
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// 如果啟用請求緩存,將Observable包裝成HystrixCachedObservable并進(jìn)行相關(guān)處理
if (requestCacheEnabled && cacheKey != null) {
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
...
} else {
afterCache = hystrixObservable;
}
// 返回Observable
return afterCache
.doOnTerminate(terminateCommandCleanup)
.doOnUnsubscribe(unsubscribeCommandCleanup)
.doOnCompleted(fireOnCompletedHook);
}
});
}
上面的代碼可以換種思維方式來理解。平時(shí)開發(fā)時(shí)都是下面這種模式,按順序不斷的做事情,是一個(gè)很好的執(zhí)行者。
public void methodA{
try {
// 1. 做第一件事情
// 2. 調(diào)用methodB()做第二件事情
// 3. 做第三件事情
...
} catch (Exception e) {
// 處理錯(cuò)誤
} finally {
// 最后一定要做的事情
}
}
用一張圖來看 toObservable() 方法。這種方式是“軍師型”,排兵布陣,先創(chuàng)造了各個(gè)處理者,然后創(chuàng)造被觀察者,再設(shè)置Observable發(fā)生各種情況時(shí)由誰來處理,完全掌控全局。

解釋下Action0、Func1這種對象。Action、Func和Runnable、Callable類似,是一個(gè)可以被執(zhí)行的實(shí)體。Action沒有返回值,Action0...ActionN表示有0..N個(gè)參數(shù),Action0就表示沒有參數(shù);Func有返值,0..N一樣表示參數(shù)。
public interface Action0 extends Action {
void call();
}
public interface Func1<T, R> extends Function {
R call(T t);
}
下面用核心的 applyHystrixSemantics 來闡述一下。
// applyHystrixSemantics 是一個(gè)Func0(理解為執(zhí)行實(shí)體或處理者),表示沒有參數(shù),返回值是Observable。
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
// Func0 做的事情如下
@Override
public Observable<R> call() {
// 如果未訂閱,返回一個(gè)"啞炮" Observable, 即一個(gè)不會(huì)發(fā)射任何數(shù)據(jù)的Observable
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
// 調(diào)用applyHystrixSemantics()來創(chuàng)建Observable
return applyHystrixSemantics(_cmd);
}
};
因此,當(dāng)執(zhí)行Func0: applyHystrixSemantics時(shí),可以得到一個(gè)Observable。toObservable() 大量代碼在準(zhǔn)備處理者(觀察者),實(shí)際使用時(shí)是方法最后的 Observable.defer(new Func0<Observable<R>>(){...}
Observable.defer
defer譯為延遲,表示演講者會(huì)等有觀眾來時(shí)才開始分享。Observable.defer() 就是說:必須有觀察者訂閱了我是,我才開始發(fā)射數(shù)據(jù)。而defer()的參數(shù)是個(gè)Func0,是一個(gè)會(huì)返回Observable的執(zhí)行實(shí)體。下面看看defer():
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// 再一次使用Observable.defer()技能,這次用的是applyHystrixSemantics這個(gè)Func0
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
... // 此處忽略了請求緩存處理,上面已有提及
Observable<R> afterCache;
...
// 為Observable綁定幾個(gè)特定事件的處理者,這都是上門創(chuàng)建的Action0
return afterCache
.doOnTerminate(terminateCommandCleanup)
.doOnUnsubscribe(unsubscribeCommandCleanup)
.doOnCompleted(fireOnCompletedHook);
}
});
applyHystrixSemantics()
接著看applyHystrixSemantics這個(gè)Func0,F(xiàn)unc0的call()中調(diào)用的是applyHystrixSemantics()函數(shù)。
// Semantics 譯為語義, 應(yīng)用Hystrix語義很拗口,其實(shí)就是應(yīng)用Hystrix的斷路器、隔離特性
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// 源碼中有很多executionHook、eventNotifier的操作,這是Hystrix拓展性的一種體現(xiàn)。這里面啥事也沒做,留了個(gè)口子,開發(fā)人員可以拓展
executionHook.onStart(_cmd);
// 判斷斷路器是否開啟
if (circuitBreaker.attemptExecution()) {
// 獲取執(zhí)行信號
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {...};
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {...};
// 判斷是否信號量拒絕
if (executionSemaphore.tryAcquire()) {
try {
// 重點(diǎn):處理隔離策略和Fallback策略
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
return handleSemaphoreRejectionViaFallback();
}
}
// 開啟了斷路器,執(zhí)行Fallback
else {
return handleShortCircuitViaFallback();
}
}
executeCommandAndObserve()
下面看executeCommandAndObserve()方法,處理隔離策略和各種Fallback.
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
final Action1<R> markEmits = new Action1<R>() {...};
final Action0 markOnCompleted = new Action0() {...};
// 利用Func1獲取處理Fallback的 Observable
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
circuitBreaker.markNonSuccess();
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
// 拒絕處理
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
// 超時(shí)處理
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
...
return handleFailureViaFallback(e);
}
}
};
final Action1<Notification<? super R>> setRequestContext ...
Observable<R> execution;
// 利用特定的隔離策略來處理
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
// 綁定Fallback的處理者
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
executeCommandWithSpecifiedIsolation()
接著看隔離特性的處理:executeCommandWithSpecifiedIsolation()
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
// 線程池隔離
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
// 再次使用 Observable.defer(), 通過執(zhí)行Func0來得到Observable
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// 收集metric信息
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
...
try {
... // 獲取真正的用戶Task
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
...
}
// 綁定各種處理者
}).doOnTerminate(new Action0() {...})
.doOnUnsubscribe(new Action0() {...})
// 綁定超時(shí)處理者
.subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
}
// 信號量隔離,和線程池大同小異,全部省略了
else {
return Observable.defer(new Func0<Observable<R>>() {...}
}
}
getUserExecutionObservable()就不接著寫了,可以自己看下,就是拿到用戶真正要執(zhí)行的任務(wù)。這個(gè)任務(wù)就是這樣被Hystrix包裹著,置于層層防護(hù)之下。
倒過來看
上面方法層層調(diào)用,倒過來看,就是先創(chuàng)建一個(gè)Observable,然后綁定各種事件對應(yīng)的處理者,如下圖:

各類doOnXXXX,表示發(fā)生XXX事件時(shí)做什么事情。
參考
- DD 《Spring Cloud微服務(wù)實(shí)戰(zhàn)》
- 朱榮鑫,張?zhí)?,黃迪璇《Spring Cloud微服務(wù)架構(gòu)進(jìn)階》
- Hystrix Wiki: How it works
歡迎關(guān)注陳同學(xué)的公眾號,一起學(xué)習(xí),一起成長
