Hystrix工作機制

在復雜的分布式應用中有著許多的依賴,各個依賴都有難免在某個時刻失敗,如果應用不隔離各個依賴,降低外部的風險,那容易拖垮整個應用。

舉個電商場景中常見的例子,比如訂單服務調用了庫存服務、商品服務、積分服務、支付服務,系統均正常情況下,訂單模塊正常運行。


但是當積分服務發(fā)生異常時且會阻塞30s時,訂單服務就有有部分請求失敗,且工作線程阻塞在調用積分服務上。


流量高峰時,問題會更加嚴重,訂單服務的所有請求都會阻塞在調用積分服務上,工作線程全部掛起,導致機器資源耗盡,訂單服務也不可用,造成級聯影響,整個集群宕機,這種稱為雪崩效應。


所以需要一種機制,使得單個服務出現故障時,整個集群可用性不受到影響。Hystrix就是實現這種機制的框架;


【入口】Hystrix的執(zhí)行入口是HystrixCommand或HystrixObservableCommand對象,通常在Spring應用中會通過注解和AOP來實現對象的構造,以降低對業(yè)務代碼的侵入性;

【緩存】HystrixCommand對象實際開始執(zhí)行后,首先是否開啟緩存,若開啟緩存且命中,則直接返回;

【熔斷】若熔斷器打開,則執(zhí)行短路,直接走降級邏輯;若熔斷器關閉,繼續(xù)下一步,進入隔離邏輯。熔斷器的狀態(tài)主要基于窗口期內執(zhí)行失敗率,若失敗率過高,則熔斷器自動打開;

【隔離】用戶可配置走線程池隔離或信號量隔離,判斷線程池任務已滿(或信號量),則進入降級邏輯;否則繼續(xù)下一步,實際由線程池任務線程執(zhí)行業(yè)務調用;

【執(zhí)行】實際開始執(zhí)行業(yè)務調用,若執(zhí)行失敗或異常,則進入降級邏輯;若執(zhí)行成功,則正常返回;

【超時】通過定時器延時任務檢測業(yè)務調用執(zhí)行是否超時,若超時則取消業(yè)務執(zhí)行的線程,進入降級邏輯;若未超時,則正常返回。線程池、信號量兩種策略均隔離方式支持超時配置(信號量策略存在缺陷);

【降級】進入降級邏輯后,當業(yè)務實現了HystrixCommand.getFallback() 方法,則返回降級處理的數據;當未實現時,則返回異常;

【統計】業(yè)務調用執(zhí)行結果成功、失敗、超時等均會進入統計模塊,通過健康統計結果來決定熔斷器打開或關閉。
----

熔斷

若電路中正確地安置了保險絲,那么保險絲就會在電流異常升高到一定的高度和一定的時候,自身熔斷切斷電流,從而起到保護電路安全運行的作用。Hystrix提供的熔斷器就有類似功能,應用調用某個服務提供者,當一定時間內請求總數超過配置的閾值,且窗口期內錯誤率過高,那Hystrix就會對調用請求熔斷,后續(xù)的請求直接短路,直接進入降級邏輯,執(zhí)行本地的降級策略。

Hystrix具有自我調節(jié)的能力,熔斷器打開在一定時間后,會嘗試通過一個請求,并根據執(zhí)行結果調整熔斷器狀態(tài),讓熔斷器在closed,open,half-open三種狀態(tài)之間自動切換。


【HystrixCircuitBreaker】boolean attemptExecution():每次HystrixCommand執(zhí)行,都要調用這個方法,判斷是否可以繼續(xù)執(zhí)行,若熔斷器狀態(tài)為打開且超過休眠窗口,更新熔斷器狀態(tài)為half-open;通過CAS原子變更熔斷器狀態(tài)來保證只放過一條業(yè)務請求實際調用提供方,并根據執(zhí)行結果調整狀態(tài)。

public boolean attemptExecution() {

? ? //判斷配置是否強制打開熔斷器

? ? if (properties.circuitBreakerForceOpen().get()) {

? ? ? ? return false;

? ? }

? ? //判斷配置是否強制關閉熔斷器

? ? if (properties.circuitBreakerForceClosed().get()) {

? ? ? ? return true;

? ? }

? ? //判斷熔斷器開關是否關閉

? ? if (circuitOpened.get() == -1) {

? ? ? ? return true;

? ? } else {

? ? ? ? //判斷請求是否在休眠窗口后

? ? ? ? if (isAfterSleepWindow()) {

? ? ? ? ? ? //更新開關為半開,并允許本次請求通過

? ? ? ? ? ? if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {

? ? ? ? ? ? ? ? return true;

? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? return false;

? ? ? ? ? ? }

? ? ? ? } else {

? ? ? ? ? ? //拒絕請求

? ? ? ? ? ? return false;

? ? ? ? }

? ? }

}


【HystrixCircuitBreaker】void markSuccess():HystrixCommand執(zhí)行成功后調用,當熔斷器狀態(tài)為half-open,更新熔斷器狀態(tài)為closed。此種情況為熔斷器原本為open,放過單條請求實際調用服務提供者,并且后續(xù)執(zhí)行成功,Hystrix自動調節(jié)熔斷器為closed。


public void markSuccess() {

? ? //更新熔斷器開關為關閉

? ? if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {

? ? ? ? //重置訂閱健康統計

? ? ? ? metrics.resetStream();

? ? ? ? Subscription previousSubscription = activeSubscription.get();

? ? ? ? if (previousSubscription != null) {

? ? ? ? ? ? previousSubscription.unsubscribe();

? ? ? ? }

? ? ? ? Subscription newSubscription = subscribeToStream();

? ? ? ? activeSubscription.set(newSubscription);

? ? ? ? //更新熔斷器開關為關閉

? ? ? ? circuitOpened.set(-1L);

? ? }

}

【HystrixCircuitBreaker】void markNonSuccess():HystrixCommand執(zhí)行成功后調用,若熔斷器狀態(tài)為half-open,更新熔斷器狀態(tài)為open。此種情況為熔斷器原本為open,放過單條請求實際調用服務提供者,并且后續(xù)執(zhí)行失敗,Hystrix繼續(xù)保持熔斷器打開,并把此次請求作為休眠窗口期開始時間。

public void markNonSuccess() {

? ? ? //更新熔斷器開關,從半開變?yōu)榇蜷_

? ? ? if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {

? ? ? ? ? //記錄失敗時間,作為休眠窗口開始時間

? ? ? ? ? circuitOpened.set(System.currentTimeMillis());

? ? ? }

? }

private Subscription subscribeToStream() {

? ? //訂閱監(jiān)控統計信息

? ? return metrics.getHealthCountsStream()

? ? ? ? ? ? .observe()

? ? ? ? ? ? .subscribe(new Subscriber<HealthCounts>() {

? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? public void onCompleted() {}

? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? public void onError(Throwable e) {}

? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? public void onNext(HealthCounts hc) {

? ? ? ? ? ? ? ? ? ? // 判斷總請求數量是否超過配置閾值,若未超過,則不改變熔斷器狀態(tài)

? ? ? ? ? ? ? ? ? ? if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {

? ? ? ? ? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? ? ? ? ? //判斷請求錯誤率是否超過配置錯誤率閾值,若未超過,則不改變熔斷器狀態(tài);若超過,則錯誤率過高,更新熔斷器狀態(tài)未打開,拒絕后續(xù)請求

? ? ? ? ? ? ? ? ? ? ? ? if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {

? ? ? ? ? ? ? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? ? ? ? ? ? ? if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? circuitOpened.set(System.currentTimeMillis());

? ? ? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? }

? ? ? ? ? ? });

}

資源隔離

在貨船中,為了防止漏水和火災的擴散,一般會將貨倉進行分割,避免了一個貨倉出事導致整艘船沉沒的悲劇。同樣的,在Hystrix中,也采用了這樣的艙壁模式,將系統中的服務提供者隔離起來,一個服務提供者延遲升高或者失敗,并不會導致整個系統的失敗,同時也能夠控制調用這些服務的并發(fā)度。如下圖,訂單服務調用下游積分、庫存等服務使用不同的線程池,當積分服務故障時,只會把對應線程池打滿,而不會影響到其他服務的調用。Hystrix隔離模式支持線程池和信號量兩種方式。


1,信號量模式

信號量模式控制單個服務提供者執(zhí)行并發(fā)度,比如單個CommondKey下正在請求數為N,若N小于maxConcurrentRequests,則繼續(xù)執(zhí)行;若大于等于maxConcurrentRequests,則直接拒絕,進入降級邏輯。信號量模式使用請求線程本身執(zhí)行,沒有線程上下文切換,開銷較小,但超時機制失效。

【AbstractCommand】Observable<R>applyHystrixSemantics(finalAbstractCommand<R> _cmd):嘗試獲取信號量,若能獲取到,則繼續(xù)調用服務提供者;若不能獲取到,則進入降級策略。

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {

? ? executionHook.onStart(_cmd);

? ? //判斷熔斷器是否通過

? ? if (circuitBreaker.attemptExecution()) {

? ? ? ? //獲取信號量

? ? ? ? final TryableSemaphore executionSemaphore = getExecutionSemaphore();

? ? ? ? final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);

? ? ? ? final Action0 singleSemaphoreRelease = new Action0() {

? ? ? ? ? ? @Override

? ? ? ? ? ? public void call() {

? ? ? ? ? ? ? ? if (semaphoreHasBeenReleased.compareAndSet(false, true)) {

? ? ? ? ? ? ? ? ? ? executionSemaphore.release();

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? };

? ? ? ? final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {

? ? ? ? ? ? @Override

? ? ? ? ? ? public void call(Throwable t) {

? ? ? ? ? ? ? ? eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);

? ? ? ? ? ? }

? ? ? ? };

? ? ? ? //嘗試獲取信號量

? ? ? ? if (executionSemaphore.tryAcquire()) {

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? //記錄業(yè)務執(zhí)行開始時間

? ? ? ? ? ? ? ? executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());

? ? ? ? ? ? ? ? //繼續(xù)執(zhí)行業(yè)務

? ? ? ? ? ? ? ? return executeCommandAndObserve(_cmd)

? ? ? ? ? ? ? ? ? ? ? ? .doOnError(markExceptionThrown)

? ? ? ? ? ? ? ? ? ? ? ? .doOnTerminate(singleSemaphoreRelease)

? ? ? ? ? ? ? ? ? ? ? ? .doOnUnsubscribe(singleSemaphoreRelease);

? ? ? ? ? ? } catch (RuntimeException e) {

? ? ? ? ? ? ? ? return Observable.error(e);

? ? ? ? ? ? }

? ? ? ? } else {

? ? ? ? ? ? //信號量拒絕,進入降級邏輯

? ? ? ? ? ? return handleSemaphoreRejectionViaFallback();

? ? ? ? }

? ? } else {

? ? ? ? //熔斷器拒絕,直接短路,進入降級邏輯

? ? ? ? return handleShortCircuitViaFallback();

? ? }

}

【AbstractCommand】TryableSemaphore getExecutionSemaphore():獲取信號量實例,若當前隔離模式為信號量,則根據commandKey獲取信號量,不存在時初始化并緩存;若當前隔離模式為線程池,則使用默認信號量TryableSemaphoreNoOp.DEFAULT,全部請求可通過。

protected TryableSemaphore getExecutionSemaphore() {

? ? //判斷隔離模式是否為信號量

? ? if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {

? ? ? ? if (executionSemaphoreOverride == null) {

? ? ? ? ? ? //獲取信號量

? ? ? ? ? ? TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());

? ? ? ? ? ? if (_s == null) {

? ? ? ? ? ? ? ? //初始化信號量并緩存

? ? ? ? ? ? ? ? executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));

? ? ? ? ? ? ? ? //返回信號量

? ? ? ? ? ? ? ? return executionSemaphorePerCircuit.get(commandKey.name());

? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? return _s;

? ? ? ? ? ? }

? ? ? ? } else {

? ? ? ? ? ? return executionSemaphoreOverride;

? ? ? ? }

? ? } else {

? ? ? ? //返回默認信號量,任何請求均可通過

? ? ? ? return TryableSemaphoreNoOp.DEFAULT;

? ? }

}

線程池模式控制單個服務提供者執(zhí)行并發(fā)度,代碼上都會先走獲取信號量,只是使用默認信號量,全部請求可通過,然后實際調用線程池邏輯。線程池模式下,比如單個CommondKey下正在請求數為N,若N小于maximumPoolSize,會先從 Hystrix 管理的線程池里面獲得一個線程,然后將參數傳遞給任務線程去執(zhí)行真正調用,如果并發(fā)請求數多于線程池線程個數,就有任務需要進入隊列排隊,但排隊隊列也有上限,如果排隊隊列也滿,則進去降級邏輯。線程池模式可以支持異步調用,支持超時調用,存在線程切換,開銷大。

【AbstractCommand】Observable<R>executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd):從線程池中獲取線程,并執(zhí)行,過程中記錄線程狀態(tài)。

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {

? ? ? //判斷是否為線程池隔離模式

? ? ? if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {

? ? ? ? ? return Observable.defer(new Func0<Observable<R>>() {

? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? public Observable<R> call() {

? ? ? ? ? ? ? ? ? executionResult = executionResult.setExecutionOccurred();

? ? ? ? ? ? ? ? ? if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {

? ? ? ? ? ? ? ? ? ? ? return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));

? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? //統計信息

? ? ? ? ? ? ? ? ? metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

? ? ? ? ? ? ? ? ? //判斷是否超時,若超時,直接拋出異常

? ? ? ? ? ? ? ? ? if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {

? ? ? ? ? ? ? ? ? ? ? return Observable.error(new RuntimeException("timed out before executing run()"));

? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? //更新線程狀態(tài)為已開始

? ? ? ? ? ? ? ? ? if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {

? ? ? ? ? ? ? ? ? ? ? HystrixCounters.incrementGlobalConcurrentThreads();

? ? ? ? ? ? ? ? ? ? ? threadPool.markThreadExecution();

? ? ? ? ? ? ? ? ? ? ? endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());

? ? ? ? ? ? ? ? ? ? ? executionResult = executionResult.setExecutedInThread();

? ? ? ? ? ? ? ? ? ? ? //執(zhí)行hook,若異常,則直接拋出異常

? ? ? ? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? ? ? ? executionHook.onThreadStart(_cmd);

? ? ? ? ? ? ? ? ? ? ? ? ? executionHook.onRunStart(_cmd);

? ? ? ? ? ? ? ? ? ? ? ? ? executionHook.onExecutionStart(_cmd);

? ? ? ? ? ? ? ? ? ? ? ? ? return getUserExecutionObservable(_cmd);

? ? ? ? ? ? ? ? ? ? ? } catch (Throwable ex) {

? ? ? ? ? ? ? ? ? ? ? ? ? return Observable.error(ex);

? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? ? ? ? //空返回

? ? ? ? ? ? ? ? ? ? ? return Observable.empty();

? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? }

? ? ? ? ? }).doOnTerminate(new Action0() {

? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? public void call() {

? ? ? ? ? ? ? ? ? //結束邏輯,省略

? ? ? ? ? ? ? }

? ? ? ? ? }).doOnUnsubscribe(new Action0() {

? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? public void call() {

? ? ? ? ? ? ? ? ? //取消訂閱邏輯,省略

? ? ? ? ? ? ? }

? ? ? ? ? ? ? //從線程池中獲取業(yè)務執(zhí)行線程

? ? ? ? ? }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {

? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? public Boolean call() {

? ? ? ? ? ? ? ? ? //判斷是否超時

? ? ? ? ? ? ? ? ? return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;

? ? ? ? ? ? ? }

? ? ? ? ? }));

? ? ? } else {

? ? ? ? ? //信號量模式

? ? ? ? ? //省略

? ? ? }

? }

【HystrixThreadPool】Subscription schedule(final Action0 action):HystrixContextScheduler是Hystrix對rx中Scheduler調度器的重寫,主要為了實現在Observable未被訂閱時,不執(zhí)行命令,以及支持在命令執(zhí)行過程中能夠打斷運行。在rx中,Scheduler將生成對應的Worker給Observable用于執(zhí)行命令,由Worker具體負責相關執(zhí)行線程的調度,ThreadPoolWorker是Hystrix自行實現的Worker,執(zhí)行調度的核心方法。


public Subscription schedule(final Action0 action) {

? ? //若無訂閱,則不執(zhí)行直接返回

? ? if (subscription.isUnsubscribed()) {

? ? ? ? return Subscriptions.unsubscribed();

? ? }

? ? ScheduledAction sa = new ScheduledAction(action);

? ? subscription.add(sa);

? ? sa.addParent(subscription);

? ? //獲取線程池

? ? ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();

? ? //提交執(zhí)行任務

? ? FutureTask<?> f = (FutureTask<?>) executor.submit(sa);

? ? sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

? ? return sa;

}


Hystrix超時機制降低了第三方依賴項延遲過高對調用方的影響,使請求快速失敗。主要通過延遲任務機制實現,包括注冊延時任務過程和執(zhí)行延時任務過程。

當隔離策略為線程池時,主線程訂閱執(zhí)行結果,線程池中任務線程調用提供者服務端,同時會有定時器線程在一定時間后檢測任務是否完成,若未完成則表示任務超時,拋出超時異常,并且后續(xù)任務線程的執(zhí)行結果也會跳過不再發(fā)布;若已完成則表示任務在超時時間內完成執(zhí)行完成,定時器檢測任務結束。

當隔離策略為信號量時,主線程訂閱執(zhí)行結果并實際調用提供者服務端(沒有任務線程),當超出指定時間,主線程仍然會執(zhí)行完業(yè)務調用,然后拋出超時異常。信號量模式下超時配置有一定缺陷,不能取消在執(zhí)行的調用,并不能限制主線程返回時間。

【AbstractCommand】Observable<R>executeCommandAndObserve(finalAbstractCommand<R> \_cmd):超時檢測入口,執(zhí)行l(wèi)ift(new HystrixObservableTimeoutOperator<R>(\_cmd))關聯超時檢測任務。

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {

? ? //省略

? ? Observable<R> execution;

? ? //判斷是否開啟超時檢測

? ? if (properties.executionTimeoutEnabled().get()) {

? ? ? ? execution = executeCommandWithSpecifiedIsolation(_cmd)

? ? ? ? ? ? ? ? //增加超時檢測操作

? ? ? ? ? ? ? ? .lift(new HystrixObservableTimeoutOperator<R>(_cmd));

? ? } else {

? ? ? ? //正常執(zhí)行

? ? ? ? execution = executeCommandWithSpecifiedIsolation(_cmd);

? ? }

? ? return execution.doOnNext(markEmits)

? ? ? ? ? ? .doOnCompleted(markOnCompleted)

? ? ? ? ? ? .onErrorResumeNext(handleFallback)

? ? ? ? ? ? .doOnEach(setRequestContext);

}

【HystrixObservableTimeoutOperator】Subscriber<? super R> call(final Subscriber<? super R> child):創(chuàng)建檢測任務,并關聯延遲任務;若檢測任務執(zhí)行時仍未執(zhí)行完成,則拋出超時異常;若已執(zhí)行完成或異常,則清除檢測任務。

public Subscriber<? super R> call(final Subscriber<? super R> child) {

? ? ? ? final CompositeSubscription s = new CompositeSubscription();

? ? ? ? child.add(s);

? ? ? ? final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();

? ? ? ? //實列化監(jiān)聽器

? ? ? ? TimerListener listener = new TimerListener() {

? ? ? ? ? ? @Override

? ? ? ? ? ? public void tick() {

? ? ? ? ? ? ? ? //若任務未執(zhí)行完成,則更新為超時

? ? ? ? ? ? ? ? if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {

? ? ? ? ? ? ? ? ? ? // 上報超時失敗

? ? ? ? ? ? ? ? ? ? originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);

? ? ? ? ? ? ? ? ? ? // 取消訂閱

? ? ? ? ? ? ? ? ? ? s.unsubscribe();

? ? ? ? ? ? ? ? ? ? final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {

? ? ? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? ? ? public void run() {

? ? ? ? ? ? ? ? ? ? ? ? ? ? child.onError(new HystrixTimeoutException());

? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? });

? ? ? ? ? ? ? ? ? ? //拋出超時異常

? ? ? ? ? ? ? ? ? ? timeoutRunnable.run();

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? ? ? //超時時間配置

? ? ? ? ? ? @Override

? ? ? ? ? ? public int getIntervalTimeInMilliseconds() {

? ? ? ? ? ? ? ? return originalCommand.properties.executionTimeoutInMilliseconds().get();

? ? ? ? ? ? }

? ? ? ? };

? ? ? ? //注冊監(jiān)聽器,關聯檢測任務

? ? ? ? final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);

? ? ? ? originalCommand.timeoutTimer.set(tl);

? ? ? ? Subscriber<R> parent = new Subscriber<R>() {

? ? ? ? ? ? @Override

? ? ? ? ? ? public void onCompleted() {

? ? ? ? ? ? ? ? if (isNotTimedOut()) {

? ? ? ? ? ? ? ? ? ? // 未超時情況下,任務執(zhí)行完成,清除超時檢測任務

? ? ? ? ? ? ? ? ? ? tl.clear();

? ? ? ? ? ? ? ? ? ? child.onCompleted();

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? ? ? @Override

? ? ? ? ? ? public void onError(Throwable e) {

? ? ? ? ? ? ? ? if (isNotTimedOut()) {

? ? ? ? ? ? ? ? ? ? // 未超時情況下,任務執(zhí)行異常,清除超時檢測任務

? ? ? ? ? ? ? ? ? ? tl.clear();

? ? ? ? ? ? ? ? ? ? child.onError(e);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? ? ? @Override

? ? ? ? ? ? public void onNext(R v) {

? ? ? ? ? ? ? ? ? ? //未超時情況下,發(fā)布執(zhí)行結果;超時時則直接跳過發(fā)布執(zhí)行結果

? ? ? ? ? ? ? ? if (isNotTimedOut()) {

? ? ? ? ? ? ? ? ? ? child.onNext(v);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? ? ? //判斷是否超時

? ? ? ? ? ? private boolean isNotTimedOut() {

? ? ? ? ? ? ? ? return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||

? ? ? ? ? ? ? ? ? ? ? ? originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);

? ? ? ? ? ? }

? ? ? ? };

? ? ? ? s.add(parent);

? ? ? ? return parent;

? ? }

}

【HystrixTimer】Reference<TimerListener>addTimerListener(finalTimerListener listener):addTimerListener通過java的定時任務服務scheduleAtFixedRate在延遲超時時間后執(zhí)行。

public Reference<TimerListener> addTimerListener(final TimerListener listener) {

? ? //初始化xian

? ? startThreadIfNeeded();

? ? //構造檢測任務

? ? Runnable r = new Runnable() {

? ? ? ? @Override

? ? ? ? public void run() {

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? listener.tick();

? ? ? ? ? ? } catch (Exception e) {

? ? ? ? ? ? ? ? logger.error("Failed while ticking TimerListener", e);

? ? ? ? ? ? }

? ? ? ? }

? ? };

? ? //延遲執(zhí)行檢測任務

? ? ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);

? ? return new TimerReference(listener, f);

}


降級

Hystrix降級邏輯作為兜底的策略,當出現業(yè)務執(zhí)行異常、線程池或信號量已滿、執(zhí)行超時等情況時,會進入降級邏輯。降級邏輯中應從內存或靜態(tài)邏輯獲取通用返回,盡量不依賴依賴網絡調用,如果未實現降級方法或降級方法中也出現異常,則業(yè)務線程中會引發(fā)異常。


【AbstractCommand】Observable<R> getFallbackOrThrowException(finalAbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException):首先判斷是否為不可恢復異常,若是則不走降級邏輯,直接異常返回;其次判斷是否能獲取到降級信號量,然后走降級邏輯;當降級邏輯中也發(fā)生異?;蛘邲]有降級方法實現時,則異常返回。

private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {

? ? final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();

? ? long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();

? ? executionResult = executionResult.addEvent((int) latency, eventType);

? ? //判斷是否為不可恢復異常,如棧溢出、OOM等

? ? if (isUnrecoverable(originalException)) {

? ? ? ? logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback. ", originalException);

? ? ? ? Exception e = wrapWithOnErrorHook(failureType, originalException);

? ? ? ? //直接返回異常

? ? ? ? return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + " " + message + " and encountered unrecoverable error.", e, null));

? ? } else {

? ? ? ? //判斷為是否可恢復錯誤

? ? ? ? if (isRecoverableError(originalException)) {

? ? ? ? ? ? logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException);

? ? ? ? }

? ? ? ? //判斷降級配置是否打開

? ? ? ? if (properties.fallbackEnabled().get()) {

? ? ? ? ? /**

? ? ? ? ? ? * 省略

? ? ? ? ? ? */

? ? ? ? ? ? final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {

? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? public Observable<R> call(Throwable t) {

? ? ? ? ? ? ? ? ? ? Exception e = wrapWithOnErrorHook(failureType, originalException);

? ? ? ? ? ? ? ? ? ? Exception fe = getExceptionFromThrowable(t);

? ? ? ? ? ? ? ? ? ? long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();

? ? ? ? ? ? ? ? ? ? Exception toEmit;

? ? ? ? ? ? ? ? ? ? //是否是不支持操作異常,當業(yè)務中沒有覆寫getFallBack方法時,會拋出此異常

? ? ? ? ? ? ? ? ? ? if (fe instanceof UnsupportedOperationException) {

? ? ? ? ? ? ? ? ? ? ? ? logger.debug("No fallback for HystrixCommand. ", fe);

? ? ? ? ? ? ? ? ? ? ? ? eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey);

? ? ? ? ? ? ? ? ? ? ? ? executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING);

? ? ? ? ? ? ? ? ? ? ? ? toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe);

? ? ? ? ? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? ? ? ? ? //執(zhí)行降級邏輯時發(fā)生異常

? ? ? ? ? ? ? ? ? ? ? ? logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe);

? ? ? ? ? ? ? ? ? ? ? ? eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey);

? ? ? ? ? ? ? ? ? ? ? ? executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE);

? ? ? ? ? ? ? ? ? ? ? ? toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe);

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? //判斷異常是否包裝

? ? ? ? ? ? ? ? ? ? if (shouldNotBeWrapped(originalException)) {

? ? ? ? ? ? ? ? ? ? ? ? //拋出異常

? ? ? ? ? ? ? ? ? ? ? ? return Observable.error(e);

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? //拋出異常

? ? ? ? ? ? ? ? ? ? return Observable.error(toEmit);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? };

? ? ? ? ? ? //獲取降級信號量

? ? ? ? ? ? final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();

? ? ? ? ? ? final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);

? ? ? ? ? ? final Action0 singleSemaphoreRelease = new Action0() {

? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? public void call() {

? ? ? ? ? ? ? ? ? ? if (semaphoreHasBeenReleased.compareAndSet(false, true)) {

? ? ? ? ? ? ? ? ? ? ? ? fallbackSemaphore.release();

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? }

? ? ? ? ? ? };

? ? ? ? ? ? Observable<R> fallbackExecutionChain;

? ? ? ? ? ? // 嘗試獲取降級信號量

? ? ? ? ? ? if (fallbackSemaphore.tryAcquire()) {

? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? //判斷是否定義了fallback方法

? ? ? ? ? ? ? ? ? ? if (isFallbackUserDefined()) {

? ? ? ? ? ? ? ? ? ? ? ? executionHook.onFallbackStart(this);

? ? ? ? ? ? ? ? ? ? ? ? //執(zhí)行降級邏輯

? ? ? ? ? ? ? ? ? ? ? ? fallbackExecutionChain = getFallbackObservable();

? ? ? ? ? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? ? ? ? ? //執(zhí)行降級邏輯

? ? ? ? ? ? ? ? ? ? ? ? fallbackExecutionChain = getFallbackObservable();

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? } catch (Throwable ex) {

? ? ? ? ? ? ? ? ? ? fallbackExecutionChain = Observable.error(ex);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? return fallbackExecutionChain

? ? ? ? ? ? ? ? ? ? ? ? .doOnEach(setRequestContext)

? ? ? ? ? ? ? ? ? ? ? ? .lift(new FallbackHookApplication(_cmd))

? ? ? ? ? ? ? ? ? ? ? ? .lift(new DeprecatedOnFallbackHookApplication(_cmd))

? ? ? ? ? ? ? ? ? ? ? ? .doOnNext(markFallbackEmit)

? ? ? ? ? ? ? ? ? ? ? ? .doOnCompleted(markFallbackCompleted)

? ? ? ? ? ? ? ? ? ? ? ? .onErrorResumeNext(handleFallbackError)

? ? ? ? ? ? ? ? ? ? ? ? .doOnTerminate(singleSemaphoreRelease)

? ? ? ? ? ? ? ? ? ? ? ? .doOnUnsubscribe(singleSemaphoreRelease);

? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? //處理降級信號量拒絕異常

? ? ? ? ? ? ? return handleFallbackRejectionByEmittingError();

? ? ? ? ? ? }

? ? ? ? } else {

? ? ? ? ? ? //處理降級配置關閉時異常

? ? ? ? ? ? return handleFallbackDisabledByEmittingError(originalException, failureType, message);

? ? ? ? }

? ? }

}

【HystrixCommand】R getFallback():HystrixCommand默認拋出操作不支持異常,需要子類覆寫getFalBack方法實現降級邏輯。

protected R getFallback() {

? ? throw new UnsupportedOperationException("No fallback available.");

}

健康統計

Hystrix基于通過滑動窗口的數據統計判定服務失敗占比選擇性熔斷,能夠實現快速失敗并走降級邏輯。步驟如下:

AbstractCommand執(zhí)行完成后調? handleCommandEnd?法將執(zhí)行結果HystrixCommandCompletion事件發(fā)布到事件流中;

事件流通過 Observable.window()?法將事件按時間分組,并通過 flatMap()?法將事件按類型(成功、失敗等)聚合成桶,形成桶流;

再將各個桶使?Observable.window()按窗口內桶數量聚合成滑動窗?數據;

將滑動窗口數據聚合成數據對象(如健康數據流、累計數據等);

熔斷器CircuitBreaker初始化時訂閱健康數據流,根據健康情況修改熔斷器的開關。


【AbstractCommand】void handleCommandEnd(boolean commandExecutionStarted):在業(yè)務執(zhí)行完畢后,會調用handleCommandEnd方法,在此方法中,上報執(zhí)行結果executionResult,這也是健康統計的入口。

private void handleCommandEnd(boolean commandExecutionStarted) {

? ? Reference<TimerListener> tl = timeoutTimer.get();

? ? if (tl != null) {

? ? ? ? tl.clear();

? ? }

? ? long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp;

? ? executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);

? ? //執(zhí)行結果上報健康統計

? ? if (executionResultAtTimeOfCancellation == null) {

? ? ? ? metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);

? ? } else {

? ? ? ? metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);

? ? }

? ? if (endCurrentThreadExecutingCommand != null) {

? ? ? ? endCurrentThreadExecutingCommand.call();

? ? }

}

【BucketedRollingCounterStream】BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,final Func2<Bucket, Event, Bucket> appendRawEventToBucket,final Func2<Output, Bucket, Output> re-duceBucket)

健康統計類HealthCountsStream的滑動窗口實現主要是在父類BucketedRollingCounterStream,首先父類BucketedCounterStream將事件流處理成桶流,BucketedRollingCounterStream處理成滑動窗口,然后由HealthCountsStream傳入的reduceBucket函數處理成健康統計信息.

protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? final Func2<Bucket, Event, Bucket> appendRawEventToBucket,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? final Func2<Output, Bucket, Output> reduceBucket) {

? ? //調用父類,數據處理成桶流

? ? super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);

? ? //根據傳入的reduceBucket函數,處理滑動窗口內數據

? ? Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {

? ? ? ? @Override

? ? ? ? public Observable<Output> call(Observable<Bucket> window) {

? ? ? ? ? ? return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);

? ? ? ? }

? ? };

? ? //對父類桶流數據進行操作

? ? this.sourceStream = bucketedStream

? ? //窗口內桶數量為numBuckets,每次移動1個桶

? ? ? ? ? ? .window(numBuckets, 1)

? ? ? ? ? ? //滑動窗口內數據處理

? ? ? ? ? ? .flatMap(reduceWindowToSummary)

? ? ? ? ? ? .doOnSubscribe(new Action0() {

? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? public void call() {

? ? ? ? ? ? ? ? ? ? isSourceCurrentlySubscribed.set(true);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? })

? ? ? ? ? ? .doOnUnsubscribe(new Action0() {

? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? public void call() {

? ? ? ? ? ? ? ? ? ? isSourceCurrentlySubscribed.set(false);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? })

? ? ? ? ? ? .share()

? ? ? ? ? ? .onBackpressureDrop();

}

【HealthCounts】HealthCounts plus(long[] eventTypeCounts):對桶內數據按事件類型累計,生成統計數據HealthCounts;

public HealthCounts plus(long[] eventTypeCounts) {

? ? long updatedTotalCount = totalCount;

? ? long updatedErrorCount = errorCount;

? ? long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];

? ? long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];

? ? long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];

? ? long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];

? ? long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];

? ? //總數

? ? updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);

? ? //失敗數

? ? updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);

? ? return new HealthCounts(updatedTotalCount, updatedErrorCount);

}


summary


在分布式環(huán)境中,不可避免地會有許多服務的依賴項中有的失敗。Hystrix作為一個庫,可通過添加熔斷、隔離、降級等邏輯來幫助用戶控制分布式服務之間的交互,以提高系統的整體彈性。主要功能如下:

保護系統,控制來自訪問第三方依賴項(通常是通過網絡)的延遲和失敗

阻止復雜分布式系統中的級聯故障

快速失敗并快速恢復

平滑降級

近乎實時的監(jiān)控,警報和控制

Hystrix使用過程中,有一些要注意的點:

覆寫的getFallback()方法,盡量不要有網絡依賴。如果有網絡依賴,建議采用多次降級,即在getFallback()內實例化 HystrixCommand,并執(zhí)行Command。getFallback()盡量保證高性能返回,快速降級。

HystrixCommand 建議采用的是線程隔離策略。

hystrix.threadpool.default.allowMaximumSizeToDivergeFromCoreSize設置為true時,hystrix.threadpool.default.maximumSize才會生效。最大線程數需要根據業(yè)務自身情況和性能測試結果來考量,盡量初始時設置小一些,支持動態(tài)調整大小,因為它是減少負載并防止資源在延遲發(fā)生時被阻塞的主要工具。

信號隔離策略下,執(zhí)行業(yè)務邏輯時,使用的是應用服務的父級線程(如Tomcat容器線程)。所以,一定要設置好并發(fā)量,有網絡開銷的調用,不建議使用該策略,容易導致容器線程排隊堵塞,從而影響整個應用服務。

另外Hystrix高度依賴RxJava這個響應式函數編程框架,簡單了解RxJava的使用方式,有利于理解源碼邏輯。

?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容