簡(jiǎn)介
| 組/前綴 | 配置項(xiàng) | 默認(rèn)值 | 說(shuō)明 |
|---|---|---|---|
| Execution | execution.isolation.strategy | THREAD | 二選一 THREAD 或 SEMAPHORE |
| hystrix.command.default/ hystrix.command.HystrixCommandKey | execution.isolation.thread.timeoutInMilliseconds | 1000 | 調(diào)用超時(shí)時(shí)間設(shè)置,超時(shí)后觸發(fā)fallback |
| execution.timeout.enabled | true | 是否啟用超時(shí)機(jī)制 | |
| execution.isolation.thread.interruptOnTimeout | true | 超時(shí)后是否中斷執(zhí)行(只在THREAD模式下有效) | |
| execution.isolation.thread.interruptOnCancel | false | 取消時(shí)是否中斷執(zhí)行(只在THREAD模式下有效) | |
| execution.isolation.semaphore.maxConcurrentRequests | 10 | 最大并發(fā)數(shù),超過(guò)會(huì)被拒絕(只在SEMAPHORE模式下有效) | |
| Fallback | fallback.isolation.semaphore.maxConcurrentRequests | 10 | fallback最大并發(fā)數(shù)(不論Execution是什么模式,fallback都是SEMAPHORE模式) |
| hystrix.command.default/ hystrix.command.HystrixCommandKey | fallback.enabled | true | 是否開(kāi)啟fallback功能 |
| Circuit Breaker | circuitBreaker.enabled | true | 是否開(kāi)啟斷路器 |
| hystrix.command.default/ hystrix.command.HystrixCommandKey | circuitBreaker.requestVolumeThreshold | 20 | 斷路器開(kāi)啟的最小請(qǐng)求次數(shù) |
| circuitBreaker.sleepWindowInMilliseconds | 5000 | 斷路器開(kāi)啟后的維持時(shí)間,到時(shí)間后會(huì)處于半開(kāi)狀態(tài)放一個(gè)請(qǐng)求進(jìn)來(lái) | |
| circuitBreaker.errorThresholdPercentage | 50 | 執(zhí)行失敗比例超過(guò)多少后開(kāi)啟斷路 | |
| circuitBreaker.forceOpen | false | 是否強(qiáng)制開(kāi)啟斷路器 | |
| circuitBreaker.forceClosed | false | 是否強(qiáng)制關(guān)閉斷路器 | |
| Metrics | metrics.rollingStats.timeInMilliseconds | 10000 | 統(tǒng)計(jì)的時(shí)間窗口 |
| hystrix.command.default/ hystrix.command.HystrixCommandKey | metrics.rollingStats.numBuckets | 10 | 統(tǒng)計(jì)時(shí)間窗口內(nèi)的細(xì)分個(gè)數(shù) |
| metrics.rollingPercentile.enabled | true | 啟用百分比直方圖 | |
| metrics.rollingPercentile.timeInMilliseconds | 60000 | 統(tǒng)計(jì)的時(shí)間窗口 | |
| metrics.rollingPercentile.numBuckets | 6 | 統(tǒng)計(jì)時(shí)間窗口內(nèi)的細(xì)分個(gè)數(shù) | |
| metrics.rollingPercentile.bucketSize | 100 | 沒(méi)用。。 | |
| metrics.healthSnapshot.intervalInMilliseconds | 500 | HealthCounts 專用統(tǒng)計(jì)窗口(對(duì)斷路器起作用) | |
| Request Context | requestCache.enabled | true | 是否啟用RequestScope的緩存 |
| hystrix.command.default/ hystrix.command.HystrixCommandKey | requestLog.enabled | true | 是否記錄執(zhí)行的細(xì)節(jié)日志 |
| Collapser Properties | maxRequestsInBatch | Integer.MAX_VALUE | 一批的最大請(qǐng)求樹(shù) |
| hystrix.collapser.default/ hystrix.collapser.HystrixCollapserKey | timerDelayInMilliseconds | 10 | 批量處理收集請(qǐng)求的時(shí)間窗口 |
| requestCache.enabled | true | 啟用requestscope緩存,同Command緩存,配置前綴為hystrix.collapser.XXX | |
| ThreadPool Properties | coreSize | 10 | 核心線程數(shù) |
| hystrix.threadpool.default/ hystrix.threadpool.HystrixThreadPoolKey | maximumSize | 10 | 最大線程數(shù) |
| maxQueueSize | -1 | 等待隊(duì)列最大長(zhǎng)度 | |
| queueSizeRejectionThreshold | 5 | 動(dòng)態(tài)調(diào)整等待隊(duì)列大小 | |
| keepAliveTimeMinutes | 1 | 空閑線程回收時(shí)間 | |
| allowMaximumSizeToDivergeFromCoreSize | false | 設(shè)為true之后最大線程數(shù)和核心線程數(shù)可以設(shè)不同的值 | |
| metrics.rollingStats.timeInMilliseconds | 10000 | 線程池統(tǒng)計(jì)時(shí)間窗口 | |
| metrics.rollingStats.numBuckets | 10 | 線程池統(tǒng)計(jì)滑動(dòng)窗口數(shù) |
詳解
-
Execution
-
execution.isolation.strategy
- 兩種主要模式:“線程池隔離”或“信號(hào)量隔離”
- com.netflix.hystrix.AbstractCommand#executeCommandWithSpecifiedIsolation
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) { if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) { return Observable.defer(...) ... .subscribeOn(threadPool.getScheduler()) }else{ return Observable.defer(...); }- 這里需要一些RXJAVA的基礎(chǔ)。 上面的邏輯 “subscribeOn(threadPool.getScheduler())” 在 “線程池隔離”模式下會(huì)讓調(diào)用在線程池中執(zhí)行。 而在“信號(hào)量隔離”模式下沒(méi)有特殊設(shè)置,默認(rèn)是在當(dāng)前線程執(zhí)行
-
execution.timeout.enabled
- com.netflix.hystrix.AbstractCommand#executeCommandAndObserve
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) { ... if (properties.executionTimeoutEnabled().get()) { execution = executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator<R>(_cmd)); } else { execution = executeCommandWithSpecifiedIsolation(_cmd); } ... }- lift是一個(gè)通用的Obserable操作,類似于代理,里面添加了超時(shí)的攔截邏輯。
- 內(nèi)部會(huì)創(chuàng)建一個(gè)TimeListner在另外的線程中固定時(shí)間后調(diào)用,幷取消下游訂閱,拋出超時(shí)異常等,詳細(xì)可以查看TimerListener#tick功能
-
execution.isolation.thread.timeoutInMilliseconds
- 參考TimerListener#getIntervalTimeInMilliseconds
-
execution.isolation.thread.interruptOnTimeout
- 當(dāng)超時(shí)發(fā)生時(shí)會(huì)調(diào)用TimerListener.tick方法,里面會(huì)調(diào)用unsubscribe
- com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$FutureCompleterWithConfigurableInterrupt#unsubscribe
private static class FutureCompleterWithConfigurableInterrupt implements Subscription { public void unsubscribe() { if (shouldInterruptThread.call()) { futureTask.cancel(true); }else{ futureTask.cancel(false); } } } -
execution.isolation.thread.interruptOnCancel
- 當(dāng)原始接口返回Future類型的時(shí)候,這時(shí)候任務(wù)可以被外面手動(dòng)cancel。這個(gè)配置就有作用了。
- com.netflix.hystrix.HystrixCommand#queue
public Future<R> queue() { final Future<R> f = new Future<R>() { public boolean cancel(boolean mayInterruptIfRunning) { ... final boolean res = delegate.cancel(interruptOnFutureCancel.get()); if (!isExecutionComplete() && interruptOnFutureCancel.get()) { final Thread t = executionThread.get(); t.interrupt(); } ... } } } -
execution.isolation.semaphore.maxConcurrentRequests
- 指定了SEMAPHORE模式下的最大并發(fā)數(shù)
- AbstractCommand$TryableSemaphore接口和JDK的Semaphore功能類似,不過(guò)這個(gè)不會(huì)阻塞,并發(fā)性能更好。
- 使用方式參考AbstractCommand#applyHystrixSemantics
-
-
Fallback
- fallback.isolation.semaphore.maxConcurrentRequests
- fallback.enabled
- SEMAPHORE用法同EXECUTION, 無(wú)論EXECUTION是什么模式,fallback都是SEMAPHORE模式
-
Circuit Breaker
-
circuitBreaker.enabled
- AbstractCommand#initCircuitBreaker
if (enabled) { HystrixCircuitBreaker.Factory.getInstance(...); }else{ return new NoOpCircuitBreaker(); }- AbstractCommand#applyHystrixSemantics
if (circuitBreaker.attemptExecution()) { ...//繼續(xù)執(zhí)行 }else{ return handleShortCircuitViaFallback();//直接調(diào)用fallback } } ``` circuitBreaker.requestVolumeThreshold
-
circuitBreaker.errorThresholdPercentage
- com.netflix.hystrix.HystrixCircuitBreaker$HystrixCircuitBreakerImpl#subscribeToStream
public void onNext(HealthCounts hc) { if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { // we are not past the minimum volume threshold for the stat windo }else{ if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { //we are not past the minimum error threshold for the stat window }else{ if (status.compareAndSet(Status.CLOSED, Status.OPEN)) { circuitOpened.set(System.currentTimeMillis()); } } } } circuitBreaker.sleepWindowInMilliseconds
circuitBreaker.forceOpen
-
circuitBreaker.forceClosed
- com.netflix.hystrix.HystrixCircuitBreaker$HystrixCircuitBreakerImpl#allowRequest
public boolean allowRequest() { if (properties.circuitBreakerForceOpen().get()) { return false; } if (properties.circuitBreakerForceClosed().get()) { return true; } if (circuitOpened.get() == -1) { return true; }else{ if (status.get().equals(Status.HALF_OPEN)) { return false; }else{ return isAfterSleepWindow(); } } } private boolean isAfterSleepWindow() { final long circuitOpenTime = circuitOpened.get(); final long currentTime = System.currentTimeMillis(); final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get(); return currentTime > circuitOpenTime + sleepWindowTime; }
-
-
Metrics
- metrics.rollingStats.timeInMilliseconds
- metrics.rollingStats.numBuckets
- metrics.rollingPercentile.enabled
- metrics.rollingPercentile.timeInMilliseconds
- metrics.rollingPercentile.numBuckets
- metrics.rollingPercentile.bucketSize
- metrics.healthSnapshot.intervalInMilliseconds
- 大致類結(jié)構(gòu)是這樣的
- BucketedCounterStream 桶計(jì)算基類
- BucketedCumulativeCounterStream 累計(jì)桶計(jì)算基類
- CumulativeCollapserEventCounterStream 累計(jì)計(jì)算Collapser事件
- CumulativeCommandEventCounterStream 累計(jì)計(jì)算Command執(zhí)行事件
- CumulativeThreadPoolEventCounterStream 累計(jì)計(jì)算線程池事件
- BucketedRollingCounterStream 滾動(dòng)桶計(jì)算基類
- HealthCountsStream 健康狀態(tài)統(tǒng)計(jì)(用于斷路器)
- RollingCollapserEventCounterStream 滾動(dòng)計(jì)算Collapser事件
- RollingCommandEventCounterStream 滾動(dòng)計(jì)算Command執(zhí)行事件
- RollingThreadPoolEventCounterStream 滾動(dòng)計(jì)算線程池事件
- BucketedCumulativeCounterStream 累計(jì)桶計(jì)算基類
- RollingDistributionStream 直方圖基類(百分比)
- RollingCollapserBatchSizeDistributionStream 統(tǒng)計(jì)Collapser批大小
- RollingCommandLatencyDistributionStream 統(tǒng)計(jì)Command執(zhí)行延遲
- RollingCommandUserLatencyDistributionStream 統(tǒng)計(jì)用戶線程執(zhí)行延遲
- metrics.rollingStats.* 對(duì)大多數(shù)桶計(jì)算實(shí)現(xiàn)有效
- metrics.rollingPercentile.* 對(duì)所有直方圖統(tǒng)計(jì)有效
- metrics.healthSnapshot.intervalInMilliseconds特殊,只用于HealthCountsStream, 斷路器會(huì)使用這個(gè)統(tǒng)計(jì)數(shù)據(jù)來(lái)執(zhí)行斷路判斷
- BucketedCounterStream 桶計(jì)算基類
- 整個(gè)metric都是類似的套路,統(tǒng)計(jì)滑動(dòng)時(shí)間窗口內(nèi)的數(shù)據(jù)。主要是用到了Rxjava的window方法
- 我們以com.netflix.hystrix.metric.consumer.RollingDistributionStream為例
rollingDistributionStream = stream .observe() .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //按時(shí)間窗口分組事件 .flatMap(reduceBucketToSingleDistribution) //把事件轉(zhuǎn)換成數(shù)據(jù) .startWith(emptyDistributionsToStart) //數(shù)據(jù)初始結(jié)構(gòu) .window(numBuckets, 1) //切分成numBuckets份,每次滑動(dòng)一份大小的窗口 .flatMap(reduceWindowToSingleDistribution) //統(tǒng)計(jì)每個(gè)窗口numBuckets份的數(shù)據(jù) .map(cacheHistogramValues) //其他邏輯 .share() //緩存計(jì)算 .onBackpressureDrop(); //下游計(jì)算跟不上上游發(fā)送時(shí),丟棄數(shù)據(jù) -
Request Context
-
requestCache.enabled
- 開(kāi)啟HTTP request scope的緩存執(zhí)行,同請(qǐng)求在線程間共享
- 需要設(shè)置緩存的key, 可以使用@CacheResult/@CacheKey/實(shí)現(xiàn)AbstractCommand#getCacheKey其中一種來(lái)實(shí)現(xiàn)
- 需要一個(gè)servletFilter來(lái)開(kāi)啟和結(jié)束上下文
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { chain.doFilter(request, response); } finally { context.shutdown(); } }- 用的人不是很多,可以用SpringCache + @RequestScope實(shí)現(xiàn)
-
requestLog.enabled
- 貌似只是記錄了所有命令的執(zhí)行情況,幷沒(méi)有實(shí)際的打印動(dòng)作??梢宰约簩?shí)現(xiàn)
- 參考HystrixRequestLog#getExecutedCommandsAsString
-
-
Collapser Properties
-
maxRequestsInBatch
- com.netflix.hystrix.collapser.RequestBatch#offer
public Observable<ResponseType> offer(RequestArgumentType arg) { ... if (argumentMap.size() >= maxBatchSize) { return null;//超過(guò)批量大小,外層拿到null之后會(huì)新建一個(gè)batch }else{ CollapsedRequestSubject<> collapsedRequest = new CollapsedRequestSubject<>(arg, this); //放入緩存 CollapsedRequestSubject<> existing = argumentMap.putIfAbsent(arg, collapsedRequest); if (existing != null) { return existing.toObservable(); }else{ return collapsedRequest.toObservable(); } } } -
timerDelayInMilliseconds
- com.netflix.hystrix.collapser.RequestCollapser#submitRequest
public Observable<ResponseType> submitRequest(final RequestArgumentType arg) { if (!timerListenerRegistered.get() && timerListenerRegistered.compareAndSet(false, true)) { /* schedule the collapsing task to be executed every x milliseconds (x defined inside CollapsedTask) */ //設(shè)置定時(shí)任務(wù),timerDelayInMilliseconds后運(yùn)行 timerListenerReference.set(timer.addListener(new CollapsedTask())); } }private class CollapsedTask implements TimerListener{ public int getIntervalTimeInMilliseconds() { return properties.timerDelayInMilliseconds().get(); } public void tick(){ RequestBatch<> currentBatch = batch.get(); if (currentBatch != null && currentBatch.getSize() > 0) { //新建一個(gè)batch,幷?qǐng)?zhí)行前一個(gè)batch createNewBatchAndExecutePreviousIfNeeded(currentBatch);//新建一個(gè)batch,幷?qǐng)?zhí)行前一個(gè)batch } } } -
requestCache.enabled
- collapser用的緩存開(kāi)關(guān)
-
-
Thread Pool Properties
- coreSize
- maximumSize
- maxQueueSize
- queueSizeRejectionThreshold
- keepAliveTimeMinutes
- allowMaximumSizeToDivergeFromCoreSize
- metrics.rollingStats.timeInMilliseconds
- metrics.rollingStats.numBuckets
- 基本都是ThreadPool的常規(guī)配置, 詳見(jiàn)HystrixConcurrencyStrategy#getThreadPool
- 官方推薦線程數(shù)設(shè)置公式為 線程池大小 = 峰值每秒請(qǐng)求數(shù) * 99%延遲大小 + 富余空間。 比如 30rps * 0.2延遲 = 6, 給一個(gè)富余比例可以設(shè)為10
其他
-
spring如何初始化hystrix?
@EnableCircuitBreaker
-
spring-cloud-netflix-core -> spring.factories
org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\ org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration HystrixCircuitBreakerConfiguration
HystrixCommandAspect
HystrixCommandAspect#methodsAnnotatedWithHystrixCommand
-
spring的配置properties是如何注入的?
- spring-cloud-netflix-hystrix 會(huì)引入 spring-cloud-netflix-archaius
- 他的spring.factories中有 ArchaiusAutoConfiguration
- ArchaiusAutoConfiguration#configurableEnvironmentConfiguration
- ArchaiusAutoConfiguration#configureArchaius
- ArchaiusAutoConfiguration#addArchaiusConfiguration
- 以上三部將spring的properties配置轉(zhuǎn)換成netflix的動(dòng)態(tài)ConfigurationManager
-
hystrix屬性是如何動(dòng)態(tài)更新的?
-
ArchaiusAutoConfiguration$PropagateEventsConfiguration#onApplicationEvent監(jiān)聽(tīng)spring的EnvironmentChangeEvent事件,幷轉(zhuǎn)發(fā)給netflix的配置管理器
public void onApplicationEvent(EnvironmentChangeEvent event) { AbstractConfiguration manager = ConfigurationManager.getConfigInstance(); for (String key : event.getKeys()) { for (ConfigurationListener listener : manager .getConfigurationListeners()) { listener.configurationChanged(new ConfigurationEvent(source, type, key, value, beforeUpdate)); } } } ExpandedConfigurationListenerAdapter#configurationChanged
-
DynamicProperty$DynamicPropertyListener#setProperty
private static boolean updateProperty(String propName, Object value) { DynamicProperty prop = ALL_PROPS.get(propName); if (prop != null && prop.updateValue(value)) { prop.notifyCallbacks(); return true; } return false; } HystrixProperty -> HystrixDynamicProperty -> ArchaiusDynamicProperty -> PropertyWrapper -> DynamicProperty
DynamicProperty就是我們最終獲得值和更新值的地方
-