Hystrix配置項(xiàng)詳解

簡(jiǎn)介

組/前綴 配置項(xiàng) 默認(rèn)值 說(shuō)明
Execution execution.isolation.strategy THREAD 二選一 THREAD 或 SEMAPHORE
hystrix.command.default/ hystrix.command.HystrixCommandKey execution.isolation.thread.timeoutInMilliseconds 1000 調(diào)用超時(shí)時(shí)間設(shè)置,超時(shí)后觸發(fā)fallback
execution.timeout.enabled true 是否啟用超時(shí)機(jī)制
execution.isolation.thread.interruptOnTimeout true 超時(shí)后是否中斷執(zhí)行(只在THREAD模式下有效)
execution.isolation.thread.interruptOnCancel false 取消時(shí)是否中斷執(zhí)行(只在THREAD模式下有效)
execution.isolation.semaphore.maxConcurrentRequests 10 最大并發(fā)數(shù),超過(guò)會(huì)被拒絕(只在SEMAPHORE模式下有效)
Fallback fallback.isolation.semaphore.maxConcurrentRequests 10 fallback最大并發(fā)數(shù)(不論Execution是什么模式,fallback都是SEMAPHORE模式)
hystrix.command.default/ hystrix.command.HystrixCommandKey fallback.enabled true 是否開(kāi)啟fallback功能
Circuit Breaker circuitBreaker.enabled true 是否開(kāi)啟斷路器
hystrix.command.default/ hystrix.command.HystrixCommandKey circuitBreaker.requestVolumeThreshold 20 斷路器開(kāi)啟的最小請(qǐng)求次數(shù)
circuitBreaker.sleepWindowInMilliseconds 5000 斷路器開(kāi)啟后的維持時(shí)間,到時(shí)間后會(huì)處于半開(kāi)狀態(tài)放一個(gè)請(qǐng)求進(jìn)來(lái)
circuitBreaker.errorThresholdPercentage 50 執(zhí)行失敗比例超過(guò)多少后開(kāi)啟斷路
circuitBreaker.forceOpen false 是否強(qiáng)制開(kāi)啟斷路器
circuitBreaker.forceClosed false 是否強(qiáng)制關(guān)閉斷路器
Metrics metrics.rollingStats.timeInMilliseconds 10000 統(tǒng)計(jì)的時(shí)間窗口
hystrix.command.default/ hystrix.command.HystrixCommandKey metrics.rollingStats.numBuckets 10 統(tǒng)計(jì)時(shí)間窗口內(nèi)的細(xì)分個(gè)數(shù)
metrics.rollingPercentile.enabled true 啟用百分比直方圖
metrics.rollingPercentile.timeInMilliseconds 60000 統(tǒng)計(jì)的時(shí)間窗口
metrics.rollingPercentile.numBuckets 6 統(tǒng)計(jì)時(shí)間窗口內(nèi)的細(xì)分個(gè)數(shù)
metrics.rollingPercentile.bucketSize 100 沒(méi)用。。
metrics.healthSnapshot.intervalInMilliseconds 500 HealthCounts 專用統(tǒng)計(jì)窗口(對(duì)斷路器起作用)
Request Context requestCache.enabled true 是否啟用RequestScope的緩存
hystrix.command.default/ hystrix.command.HystrixCommandKey requestLog.enabled true 是否記錄執(zhí)行的細(xì)節(jié)日志
Collapser Properties maxRequestsInBatch Integer.MAX_VALUE 一批的最大請(qǐng)求樹(shù)
hystrix.collapser.default/ hystrix.collapser.HystrixCollapserKey timerDelayInMilliseconds 10 批量處理收集請(qǐng)求的時(shí)間窗口
requestCache.enabled true 啟用requestscope緩存,同Command緩存,配置前綴為hystrix.collapser.XXX
ThreadPool Properties coreSize 10 核心線程數(shù)
hystrix.threadpool.default/ hystrix.threadpool.HystrixThreadPoolKey maximumSize 10 最大線程數(shù)
maxQueueSize -1 等待隊(duì)列最大長(zhǎng)度
queueSizeRejectionThreshold 5 動(dòng)態(tài)調(diào)整等待隊(duì)列大小
keepAliveTimeMinutes 1 空閑線程回收時(shí)間
allowMaximumSizeToDivergeFromCoreSize false 設(shè)為true之后最大線程數(shù)和核心線程數(shù)可以設(shè)不同的值
metrics.rollingStats.timeInMilliseconds 10000 線程池統(tǒng)計(jì)時(shí)間窗口
metrics.rollingStats.numBuckets 10 線程池統(tǒng)計(jì)滑動(dòng)窗口數(shù)

詳解

  • Execution

    • execution.isolation.strategy

      • 兩種主要模式:“線程池隔離”或“信號(hào)量隔離”
      • com.netflix.hystrix.AbstractCommand#executeCommandWithSpecifiedIsolation
      private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
          if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
              return Observable.defer(...)
              ...
              .subscribeOn(threadPool.getScheduler())
          }else{
               return Observable.defer(...);          }
      
      • 這里需要一些RXJAVA的基礎(chǔ)。 上面的邏輯 “subscribeOn(threadPool.getScheduler())” 在 “線程池隔離”模式下會(huì)讓調(diào)用在線程池中執(zhí)行。 而在“信號(hào)量隔離”模式下沒(méi)有特殊設(shè)置,默認(rèn)是在當(dāng)前線程執(zhí)行
    • execution.timeout.enabled

      • com.netflix.hystrix.AbstractCommand#executeCommandAndObserve
      private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
          ...
          if (properties.executionTimeoutEnabled().get()) {
              execution = executeCommandWithSpecifiedIsolation(_cmd)
                      .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
          } else {
              execution = executeCommandWithSpecifiedIsolation(_cmd);
          }
      
          ...
      }
      
      • lift是一個(gè)通用的Obserable操作,類似于代理,里面添加了超時(shí)的攔截邏輯。
      • 內(nèi)部會(huì)創(chuàng)建一個(gè)TimeListner在另外的線程中固定時(shí)間后調(diào)用,幷取消下游訂閱,拋出超時(shí)異常等,詳細(xì)可以查看TimerListener#tick功能
    • execution.isolation.thread.timeoutInMilliseconds

      • 參考TimerListener#getIntervalTimeInMilliseconds
    • execution.isolation.thread.interruptOnTimeout

      • 當(dāng)超時(shí)發(fā)生時(shí)會(huì)調(diào)用TimerListener.tick方法,里面會(huì)調(diào)用unsubscribe
      • com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$FutureCompleterWithConfigurableInterrupt#unsubscribe
      private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
          public void unsubscribe() {
              if (shouldInterruptThread.call()) {
                  futureTask.cancel(true);
              }else{
                  futureTask.cancel(false);
              }
          }
      }
      
    • execution.isolation.thread.interruptOnCancel

      • 當(dāng)原始接口返回Future類型的時(shí)候,這時(shí)候任務(wù)可以被外面手動(dòng)cancel。這個(gè)配置就有作用了。
      • com.netflix.hystrix.HystrixCommand#queue
          public Future<R> queue() {
              final Future<R> f = new Future<R>() {
                   public boolean cancel(boolean mayInterruptIfRunning) {
                          ...
                          final boolean res = delegate.cancel(interruptOnFutureCancel.get());
                          if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                              final Thread t = executionThread.get();
                              t.interrupt();
                          }
                          ...
                   }
              }
          }
      
    • execution.isolation.semaphore.maxConcurrentRequests

      • 指定了SEMAPHORE模式下的最大并發(fā)數(shù)
      • AbstractCommand$TryableSemaphore接口和JDK的Semaphore功能類似,不過(guò)這個(gè)不會(huì)阻塞,并發(fā)性能更好。
      • 使用方式參考AbstractCommand#applyHystrixSemantics
  • Fallback

    • fallback.isolation.semaphore.maxConcurrentRequests
    • fallback.enabled
    • SEMAPHORE用法同EXECUTION, 無(wú)論EXECUTION是什么模式,fallback都是SEMAPHORE模式
  • Circuit Breaker

    • circuitBreaker.enabled

      • AbstractCommand#initCircuitBreaker
      if (enabled) {
          HystrixCircuitBreaker.Factory.getInstance(...);
      }else{
          return new NoOpCircuitBreaker();
      }
      
      • AbstractCommand#applyHystrixSemantics
          if (circuitBreaker.attemptExecution()) {
              ...//繼續(xù)執(zhí)行
          }else{
              return handleShortCircuitViaFallback();//直接調(diào)用fallback
          }
      }       ```
      
      
    • circuitBreaker.requestVolumeThreshold

    • circuitBreaker.errorThresholdPercentage

      • com.netflix.hystrix.HystrixCircuitBreaker$HystrixCircuitBreakerImpl#subscribeToStream
          public void onNext(HealthCounts hc) {
              if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                  // we are not past the minimum volume threshold for the stat windo
              }else{
                  if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                      //we are not past the minimum error threshold for the stat window
                  }else{
                      if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                          circuitOpened.set(System.currentTimeMillis());
                      }
                  }
              }
          }
      
    • circuitBreaker.sleepWindowInMilliseconds

    • circuitBreaker.forceOpen

    • circuitBreaker.forceClosed

      • com.netflix.hystrix.HystrixCircuitBreaker$HystrixCircuitBreakerImpl#allowRequest
          public boolean allowRequest() {
              if (properties.circuitBreakerForceOpen().get()) {
                  return false;
              }
              if (properties.circuitBreakerForceClosed().get()) {
                  return true;
              }
              if (circuitOpened.get() == -1) {
                  return true;
              }else{
                  if (status.get().equals(Status.HALF_OPEN)) {
                      return false;
                  }else{
                      return isAfterSleepWindow();
                  }
              }
          }
          
          private boolean isAfterSleepWindow() {
              final long circuitOpenTime = circuitOpened.get();
              final long currentTime = System.currentTimeMillis();
              final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
              return currentTime > circuitOpenTime + sleepWindowTime;
          }
      
  • Metrics

    • metrics.rollingStats.timeInMilliseconds
    • metrics.rollingStats.numBuckets
    • metrics.rollingPercentile.enabled
    • metrics.rollingPercentile.timeInMilliseconds
    • metrics.rollingPercentile.numBuckets
    • metrics.rollingPercentile.bucketSize
    • metrics.healthSnapshot.intervalInMilliseconds
    • 大致類結(jié)構(gòu)是這樣的
      • BucketedCounterStream 桶計(jì)算基類
        • BucketedCumulativeCounterStream 累計(jì)桶計(jì)算基類
          • CumulativeCollapserEventCounterStream 累計(jì)計(jì)算Collapser事件
          • CumulativeCommandEventCounterStream 累計(jì)計(jì)算Command執(zhí)行事件
          • CumulativeThreadPoolEventCounterStream 累計(jì)計(jì)算線程池事件
        • BucketedRollingCounterStream 滾動(dòng)桶計(jì)算基類
          • HealthCountsStream 健康狀態(tài)統(tǒng)計(jì)(用于斷路器)
          • RollingCollapserEventCounterStream 滾動(dòng)計(jì)算Collapser事件
          • RollingCommandEventCounterStream 滾動(dòng)計(jì)算Command執(zhí)行事件
          • RollingThreadPoolEventCounterStream 滾動(dòng)計(jì)算線程池事件
      • RollingDistributionStream 直方圖基類(百分比)
        • RollingCollapserBatchSizeDistributionStream 統(tǒng)計(jì)Collapser批大小
        • RollingCommandLatencyDistributionStream 統(tǒng)計(jì)Command執(zhí)行延遲
        • RollingCommandUserLatencyDistributionStream 統(tǒng)計(jì)用戶線程執(zhí)行延遲
      • metrics.rollingStats.* 對(duì)大多數(shù)桶計(jì)算實(shí)現(xiàn)有效
      • metrics.rollingPercentile.* 對(duì)所有直方圖統(tǒng)計(jì)有效
      • metrics.healthSnapshot.intervalInMilliseconds特殊,只用于HealthCountsStream, 斷路器會(huì)使用這個(gè)統(tǒng)計(jì)數(shù)據(jù)來(lái)執(zhí)行斷路判斷
    • 整個(gè)metric都是類似的套路,統(tǒng)計(jì)滑動(dòng)時(shí)間窗口內(nèi)的數(shù)據(jù)。主要是用到了Rxjava的window方法
    • 我們以com.netflix.hystrix.metric.consumer.RollingDistributionStream為例
    rollingDistributionStream = stream
        .observe()
        .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //按時(shí)間窗口分組事件
        .flatMap(reduceBucketToSingleDistribution)     //把事件轉(zhuǎn)換成數(shù)據(jù) 
        .startWith(emptyDistributionsToStart)          //數(shù)據(jù)初始結(jié)構(gòu)
        .window(numBuckets, 1)                         //切分成numBuckets份,每次滑動(dòng)一份大小的窗口
        .flatMap(reduceWindowToSingleDistribution)     //統(tǒng)計(jì)每個(gè)窗口numBuckets份的數(shù)據(jù)
        .map(cacheHistogramValues)                     //其他邏輯
        .share()                                       //緩存計(jì)算
        .onBackpressureDrop();                         //下游計(jì)算跟不上上游發(fā)送時(shí),丟棄數(shù)據(jù)          
    
    
  • Request Context

    • requestCache.enabled

      • 開(kāi)啟HTTP request scope的緩存執(zhí)行,同請(qǐng)求在線程間共享
      • 需要設(shè)置緩存的key, 可以使用@CacheResult/@CacheKey/實(shí)現(xiàn)AbstractCommand#getCacheKey其中一種來(lái)實(shí)現(xiàn)
      • 需要一個(gè)servletFilter來(lái)開(kāi)啟和結(jié)束上下文
      public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
          HystrixRequestContext context = HystrixRequestContext.initializeContext();
          try {
              chain.doFilter(request, response);
          } finally {
              context.shutdown();
          }
      }
      
      • 用的人不是很多,可以用SpringCache + @RequestScope實(shí)現(xiàn)
    • requestLog.enabled

      • 貌似只是記錄了所有命令的執(zhí)行情況,幷沒(méi)有實(shí)際的打印動(dòng)作??梢宰约簩?shí)現(xiàn)
      • 參考HystrixRequestLog#getExecutedCommandsAsString
  • Collapser Properties

    • maxRequestsInBatch

      • com.netflix.hystrix.collapser.RequestBatch#offer
      public Observable<ResponseType> offer(RequestArgumentType arg) {
           ...
           if (argumentMap.size() >= maxBatchSize) {
              return null;//超過(guò)批量大小,外層拿到null之后會(huì)新建一個(gè)batch
         }else{
              CollapsedRequestSubject<> collapsedRequest = new CollapsedRequestSubject<>(arg, this);
              //放入緩存
              CollapsedRequestSubject<> existing = argumentMap.putIfAbsent(arg, collapsedRequest);
              if (existing != null) {
                  return existing.toObservable();
              }else{
                  return collapsedRequest.toObservable();
              }
         }
      
      }
      
    • timerDelayInMilliseconds

      • com.netflix.hystrix.collapser.RequestCollapser#submitRequest
      public Observable<ResponseType> submitRequest(final RequestArgumentType arg) {
          if (!timerListenerRegistered.get() && timerListenerRegistered.compareAndSet(false, true)) {
              /* schedule the collapsing task to be executed every x milliseconds (x defined inside CollapsedTask) */
              //設(shè)置定時(shí)任務(wù),timerDelayInMilliseconds后運(yùn)行
              timerListenerReference.set(timer.addListener(new CollapsedTask()));
          }
      }
      
      private class CollapsedTask implements TimerListener{
          public int getIntervalTimeInMilliseconds() {
              return properties.timerDelayInMilliseconds().get();
          }
      
          public void tick(){
              RequestBatch<> currentBatch = batch.get();
              if (currentBatch != null && currentBatch.getSize() > 0) {
                  //新建一個(gè)batch,幷?qǐng)?zhí)行前一個(gè)batch
                  createNewBatchAndExecutePreviousIfNeeded(currentBatch);//新建一個(gè)batch,幷?qǐng)?zhí)行前一個(gè)batch
              }
          }
      }
      
    • requestCache.enabled

      • collapser用的緩存開(kāi)關(guān)
  • Thread Pool Properties

    • coreSize
    • maximumSize
    • maxQueueSize
    • queueSizeRejectionThreshold
    • keepAliveTimeMinutes
    • allowMaximumSizeToDivergeFromCoreSize
    • metrics.rollingStats.timeInMilliseconds
    • metrics.rollingStats.numBuckets
    • 基本都是ThreadPool的常規(guī)配置, 詳見(jiàn)HystrixConcurrencyStrategy#getThreadPool
    • 官方推薦線程數(shù)設(shè)置公式為 線程池大小 = 峰值每秒請(qǐng)求數(shù) * 99%延遲大小 + 富余空間。 比如 30rps * 0.2延遲 = 6, 給一個(gè)富余比例可以設(shè)為10

其他

  • spring如何初始化hystrix?

    1. @EnableCircuitBreaker

    2. spring-cloud-netflix-core -> spring.factories

      org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
      org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration
      
    3. HystrixCircuitBreakerConfiguration

    4. HystrixCommandAspect

    5. HystrixCommandAspect#methodsAnnotatedWithHystrixCommand

  • spring的配置properties是如何注入的?

    1. spring-cloud-netflix-hystrix 會(huì)引入 spring-cloud-netflix-archaius
    2. 他的spring.factories中有 ArchaiusAutoConfiguration
    3. ArchaiusAutoConfiguration#configurableEnvironmentConfiguration
    4. ArchaiusAutoConfiguration#configureArchaius
    5. ArchaiusAutoConfiguration#addArchaiusConfiguration
    6. 以上三部將spring的properties配置轉(zhuǎn)換成netflix的動(dòng)態(tài)ConfigurationManager
  • hystrix屬性是如何動(dòng)態(tài)更新的?

    1. ArchaiusAutoConfiguration$PropagateEventsConfiguration#onApplicationEvent監(jiān)聽(tīng)spring的EnvironmentChangeEvent事件,幷轉(zhuǎn)發(fā)給netflix的配置管理器

      public void onApplicationEvent(EnvironmentChangeEvent event) {
          AbstractConfiguration manager = ConfigurationManager.getConfigInstance();
          for (String key : event.getKeys()) {
              for (ConfigurationListener listener : manager
                      .getConfigurationListeners()) {
                  listener.configurationChanged(new ConfigurationEvent(source, type,
                          key, value, beforeUpdate));         
              }
          }
      }
      
    2. ExpandedConfigurationListenerAdapter#configurationChanged

    3. DynamicProperty$DynamicPropertyListener#setProperty

          private static boolean updateProperty(String propName, Object value) {
              DynamicProperty prop = ALL_PROPS.get(propName);
              if (prop != null && prop.updateValue(value)) {
                  prop.notifyCallbacks();
                  return true;
              }
              return false;
          }
      
    4. HystrixProperty -> HystrixDynamicProperty -> ArchaiusDynamicProperty -> PropertyWrapper -> DynamicProperty

    5. DynamicProperty就是我們最終獲得值和更新值的地方

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容