微服務(wù)實戰(zhàn)SpringCloud之Hystrix

Hystrix是什么?

在微服務(wù)架構(gòu)中,微服務(wù)之間互相依賴較大,相互之間調(diào)用必不可免的會失敗。但當(dāng)下游服務(wù)A因為瞬時流量導(dǎo)致服務(wù)崩潰,其他依賴于A服務(wù)的B、C服務(wù)由于調(diào)用A服務(wù)超時耗費了大量的資源,長時間下去,B、C服務(wù)也會崩潰。Hystrix就是用來解決服務(wù)之間相互調(diào)用失敗,避免產(chǎn)生蝴蝶效應(yīng)的熔斷器,以及提供降級選項。Hystrix通過隔離服務(wù)之間的訪問點,阻止它們之間的級聯(lián)故障以及提供默認(rèn)選項來實現(xiàn)這一目標(biāo),以提高系統(tǒng)的整體健壯性。

用來解決什么問題?

用來避免由于服務(wù)之間依賴較重,出現(xiàn)個別服務(wù)宕機、停止服務(wù)導(dǎo)致大面積服務(wù)雪崩的情況。

小試牛刀

服務(wù)降級

目前有eureka、zuul、product、cart四個服務(wù)。

目的是通過zuul調(diào)用product接口,product接口通過feign調(diào)用cart服務(wù)的接口。product項目調(diào)用cart項目超時觸發(fā)hystrix熔斷及服務(wù)降級。

現(xiàn)將關(guān)鍵配置及代碼陳列如下:

eureka:

server:
  port: 8761

eureka:
  client:
    service-url:
      defaultZone: http://${eureka.instance.hostname:localhost}:${server.port:8761}/eureka

spring:
  application:
    name: eureka
  profiles:
    active: ${boot.profile:dev}

zuul

服務(wù)配置:

spring:
  application:
    name: zuul #應(yīng)用名稱
server:
  port: 8080 #應(yīng)用服務(wù)端口

eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka  #配置eureka 默認(rèn)分區(qū)的地址

ribbon:
  okhttp:
    enabled: true #開啟ribbon 使用OKhttp發(fā)送http請求

  ReadTimeout: 5000
  ConnectTimeout: 5000

zuul:
  prefix: /api #定義全局路由前綴
  strip-prefix: true #路由到下游服務(wù)時開啟去除前綴開關(guān)

路由配置:

@Configuration
public class ZuuPatternServiceRouteMapperConfiguration {

    /**
     * 獲取沒有版本號的路由匹配規(guī)則bean
     *
     * @return {@link PatternServiceRouteMapper}
     * @date 10:27 AM 2019/1/17
     **/
    @Bean
    public PatternServiceRouteMapper patternServiceRouteMapper() {

        return new PatternServiceRouteMapper("(?<version>v.*$)", "${name}");
    }
}

webMvc配置:

@Configuration
public class WebMvcConfig implements WebMvcConfigurer {

    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/*")
                .allowedOrigins("*");
    }
}

product

server:
  port: 8082

eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka
spring:
  application:
    name: product
  profiles:
    active: ${boot.profile:dev}

feign:
  client:
    config:
      default:
        connectTimeout: 1000 #feign調(diào)用連接超時時間
        readTimeout: 1000 #feign調(diào)用讀取超時時間
        loggerLevel: basic #feign調(diào)用的log 等級

啟動類代碼:

@EnableFeignClients
@EnableDiscoveryClient
@SpringBootApplication
@EnableCircuitBreaker
public class ProductApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProductApplication.class, args);
    }
}

controller代碼:

@RequestMapping("/product")
@RestController
public class ProductController {

    @Autowired
    private CartFeignClient cartFeignClient; //調(diào)用cart服務(wù)的feign client

    @HystrixCommand(fallbackMethod = "getDefaultValue") //指定出現(xiàn)異常的默認(rèn)回退方法
    @PostMapping("/toCart/{productId}")
    public ResponseEntity addCart(@PathVariable("productId") Long productId) throws InterruptedException {
        Thread.sleep(5000); //特意讓線程休眠一段時間以觸發(fā)熔斷及服務(wù)降級
        Long aLong = cartFeignClient.addCart(productId);
        return ResponseEntity.ok(productId);
    }
    //默認(rèn)降級方法
    private ResponseEntity getDefaultValue(Long productId) {
        return ResponseEntity.ok(0);
    }
}

cart服務(wù)的feign client代碼:

@FeignClient(value = "CART") //指定服務(wù)名稱
public interface CartFeignClient {

    @PostMapping("/cart/{productId}") //指定接口路徑
    Long addCart(@PathVariable("productId")Long productId); //參數(shù)
}

cart服務(wù)沒什么特殊配置及代碼,只要注冊到eureka以及提供接口就可以了。

服務(wù)配置:

server:
  port: 8081

eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka
  instance:
    status-page-url-path: /info
    health-check-url-path: /health

spring:
  application:
    name: cart
  profiles:
    active: ${boot.profile:dev}

啟動類代碼:

@EnableDiscoveryClient
@SpringBootApplication
public class CartApplication {

    public static void main(String[] args) {
        SpringApplication.run(CartApplication.class, args);
    }

}

接口:

@RestController
@RequestMapping("/cart")
@Api(value = "購物車", tags = {"購物車"}) //swagger
public class CartControllrr {

    @ApiOperation("添加商品到購物車") //swagger 
    @ApiImplicitParam(name = "productId",value = "productId",required = true,paramType ="path",dataType = "String")
    @PostMapping("/{productId}")
    public ResponseEntity addCart(@PathVariable("productId") Long productId) {
        System.out.println(productId);
        return ResponseEntity.ok(productId);
    }
}

現(xiàn)在將服務(wù)全部啟動,并通過zuul調(diào)用接口即可。

原先直接調(diào)用product接口地址為:http://localhost:8082/product/toCart/1

經(jīng)過zuul后的地址為:http://localhost:8080/api/product/product/toCart/1

正常調(diào)用結(jié)果返回為1,

由于product處理代碼中加入了sleep,導(dǎo)致product服務(wù)調(diào)用超時,觸發(fā)hystrix熔斷及服務(wù)降級,調(diào)用了指定的回退方法,getDefaultValue,返回了0.

[圖片上傳失敗...(image-422c2f-1548757736276)]

這里需要注意的有幾點:

  1. 通過網(wǎng)關(guān)zuul調(diào)用時,zuul的超時時間要大于等于hystrix的超時時間配置,否則在zuul層轉(zhuǎn)發(fā)時就已經(jīng)觸發(fā)了zuul的超時,返回 GATEWAY TIME OUT

  2. 無重試機制時,通過feign加ribbon進行服務(wù)之間調(diào)用時,hystrix配置超時時間要小于ribbon超時時間,否則在ribbon調(diào)用其他服務(wù)時就已經(jīng)超時了,hystrix無法進行熔斷及降級

  3. 如果有重試時,如有組件跟Hystrix配合使用,一般來講,建議Hystrix的超時 > 其他組件的超時,否則將可能導(dǎo)致重試特性失效。例如,如果ribbon超時時間為1秒,重試3次,hystrix超時時間應(yīng)略大于3秒。

  4. 定義一個fallback方法需要注意以下幾點:

    • fallback方法必須和指定fallback方法的主方法在一個類中。

    • fallback方法的參數(shù)必須要和主方法的參數(shù)一致,否則不生效。

    • 使用fallback方法需要根據(jù)依賴服務(wù)設(shè)置合理的超時時間,即execution.isolation.thread.timeoutInMilliseconds的設(shè)置,可以在@HystrixCommand注解上通過HystrixProperty指定。

  5. 如果要在fallback方法中獲取異常信息,只需要在fallback方法中,加上一個參數(shù)Throwable throwable就可以了。

GitHub上Netflix給的例子是通過繼承HystrixCommand或者HystrixObservableCommand,然后實現(xiàn)execute()或者queue()方法來運行,但是這種方式代碼侵入性較大。使用注解@HystrixCommand方式的侵入性小一點。

剛才的hystrix的服務(wù)降級示例是同步模式的,也是通常情況下我們所使用的模式。

  • 同步command,同步fallback
@HystrixCommand(fallbackMethod = "getDefaultValue")
@PostMapping("/toCart/{productId}")
public ResponseEntity addCart(@PathVariable("productId") Long productId) throws InterruptedException {
    Long aLong = cartFeignClient.addCart(productId);
    System.out.println(aLong);
    return ResponseEntity.ok(productId);
}

private ResponseEntity getDefaultValue(Long productId) {
    return ResponseEntity.ok(0);
}
  • 異步command,同步fallback
@HystrixCommand(fallbackMethod = "getDefaultAsyncAddCart")
public Future<ResponseEntity<Long>> asyncAddCart(@PathVariable("productId") Long productId) throws InterruptedException {
    log.info("異步command:run。。。");
    Thread.sleep(5000);//觸發(fā)降級邏輯
    return new AsyncResult<ResponseEntity<Long>>() {
        @Override
        public ResponseEntity<Long> invoke() {
            return ResponseEntity.ok(cartFeignClient.addCart(productId));
        }
    };
}

private ResponseEntity<Long> getDefaultAsyncAddCart(Long productId, Throwable throwable) {
    log.info("異步command,同步fallback:run。。。");
    return ResponseEntity.ok(0L);
}
  • 異步command,異步fallback
@HystrixCommand(fallbackMethod = "getDefaultAsyncAddCart2")
public Future<ResponseEntity<Long>> asyncAddCart2(@PathVariable("productId") Long productId) throws InterruptedException {
    log.info("異步command:run。。。");
    Thread.sleep(5000);
    return new AsyncResult<ResponseEntity<Long>>() {
        @Override
        public ResponseEntity<Long> invoke() {
            return ResponseEntity.ok(cartFeignClient.addCart(productId));
        }
    };
}

@HystrixCommand //注意,異步fallback 這里必須加@HystixCommand注解,否則運行時報錯
private Future<ResponseEntity<Long>> getDefaultAsyncAddCart2(Long productId, Throwable throwable) {
    log.info("異步command,同步fallback:run。。。");
    log.warn("", throwable);
    return new AsyncResult<ResponseEntity<Long>>() {
        @Override
        public ResponseEntity<Long> invoke() {
            return ResponseEntity.ok(0L);
        }
    };
}

注意,hystrix不支持同步command,異步fallback。

hystrix配置

我們先看下@HystrixCommand注解中有哪些配置。

屬性 類型 描述
groupKey String 用于報告、告警、大盤展示時的分組key
commandKey String hystrix 命令的key值,默認(rèn)是方法名
threadPoolKey String 用于區(qū)分不同線程池的key值,hystrix線程池是用來監(jiān)控,緩存,避免個別服務(wù)出現(xiàn)異常導(dǎo)致拖累所有線程都被占用的key值。
fallbackMethod String 指定回退/降級的方法,此方法必須要定義在相同的類中,參數(shù)也應(yīng)該相同
commandProperties HystrixProperty數(shù)組 指定hystrix command 屬性值
threadPoolProperties HystrixProperty數(shù)組 指定hystrix線程池的屬性值
ignoreExceptions Throwable及子類 定義應(yīng)該忽略的異常
observableExecutionMode ObservableExecutionMode 指定hystrix用于執(zhí)行觀察者命令的模式,默認(rèn)饑餓加載

其他配置:詳情見https://github.com/Netflix/Hystrix/wiki/Configuration

@HystixCommand屬性配置官方示例:https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-javanica

命令屬性配置:

執(zhí)行時:

屬性 默認(rèn) 描述 全局配置
execution.isolation.strategy THREAD 執(zhí)行hystrix命令模式時的隔離模式,默認(rèn)是THREAD。有兩種選項,THREAD線程隔離,SEMAPHORE信號量隔離。THREAD模式會在有限線程的線程池內(nèi)選擇一個單獨的線程執(zhí)行,SEMAPHORE是直接在調(diào)用線程上執(zhí)行,并發(fā)請求受信號量計數(shù)的限制 hystrix.command.default.execution.isolation.strategy
execution.isolation.thread.timeoutInMilliseconds 1000 設(shè)置執(zhí)行hystrix命令包裹方法的超時時間,超過這個時間則執(zhí)行回退邏輯 hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds
execution.timeout.enabled true 設(shè)置是否開啟執(zhí)行hystrix命令包裹方法的超時。 hystrix.command.default.execution.timeout.enabled
execution.isolation.thread.interruptOnTimeout true 此屬性設(shè)置HystrixCommand.run()在發(fā)生超時時是否應(yīng)中斷執(zhí)行。 hystrix.command.default.execution.isolation.thread.interruptOnTimeout
execution.isolation.thread.interruptOnCancel false 此屬性設(shè)置HystrixCommand.run()在發(fā)生取消時是否應(yīng)中斷執(zhí)行。 hystrix.command.default.execution.isolation.thread.interruptOnCancel
execution.isolation.semaphore.maxConcurrentRequests 10 當(dāng)使用SEMAPHORE信號量模式時,設(shè)置允許HystrixCommand.run()同時并發(fā)執(zhí)行的最大請求數(shù)。達到最大值時,后面的請求將被拒絕執(zhí)行,并執(zhí)行回退邏輯,如果沒有回退方法,則會拋出異常。生產(chǎn)環(huán)境可根據(jù)實際情況調(diào)整 hystrix.command.default.fallback.isolation.semaphore.maxConcurrentRequests

回退時:

屬性 默認(rèn)值 描述 全局配置
fallback.isolation.semaphore.maxConcurrentRequests 10 設(shè)置允許回退方法同時執(zhí)行的最大并發(fā)數(shù),超過這個值,則將拒絕后續(xù)請求并拋出異常(無第二級回退時) hystrix.command.default.fallback.isolation.semaphore.maxConcurrentRequests
fallback.enabled true 是否執(zhí)行回退 hystrix.command.default.fallback.enabled

斷路器配置:

屬性 默認(rèn)值 描述 全局配置
circuitBreaker.enabled true 斷路器開關(guān),此配置決定斷路器是否用于跟蹤運行狀況,以及在其跳閘時是否用于短路請求。 hystrix.command.default.circuitBreaker.enabled
circuitBreaker.requestVolumeThreshold 20 設(shè)置用于觸發(fā)跳閘的滾動窗口的最小失敗請求數(shù)的閾值 hystrix.command.default.circuitBreaker.requestVolumeThreshold
circuitBreaker.sleepWindowInMilliseconds 5000 設(shè)置觸發(fā)斷路器后允許再次執(zhí)行請求前,拒絕請求的時間,單位為毫秒 hystrix.command.default.circuitBreaker.sleepWindowInMilliseconds
circuitBreaker.errorThresholdPercentage 50 設(shè)置觸發(fā)斷路器,走降級邏輯的默認(rèn)百分比最小閾值 hystrix.command.default.circuitBreaker.errorThresholdPercentage
circuitBreaker.forceOpen false 是否強制進入斷路器狀態(tài),進入該狀態(tài)將拒絕所有請求 hystrix.command.default.circuitBreaker.forceOpen
circuitBreaker.forceClosed false 是否強制關(guān)閉斷路器狀態(tài),關(guān)閉后,將允許所有請求進入 hystrix.command.default.circuitBreaker.forceClosed

Metrics:

屬性名 默認(rèn)值 描述 全局配置
metrics.rollingStats.timeInMilliseconds 10000 設(shè)置統(tǒng)計滾動窗口失敗請求次數(shù)的統(tǒng)計時間 hystrix.command.default.metrics.rollingStats.timeInMilliseconds
metrics.rollingStats.numBuckets 10 此屬性設(shè)置滾動統(tǒng)計窗口劃分的存儲桶數(shù)。必須能被metrics.rollingStats.timeInMilliseconds整除,否則拋出異常 hystrix.command.default.metrics.rollingStats.numBuckets
metrics.rollingPercentile.enabled true 配置是否應(yīng)跟蹤執(zhí)行延遲并將其計算為百分位數(shù)。 hystrix.command.default.metrics.rollingPercentile.enabled
metrics.rollingPercentile.timeInMilliseconds 60000 設(shè)置滾動窗口的持續(xù)時間,其中保持執(zhí)行時間以允許百分位計算,以毫秒為單位。 hystrix.command.default.metrics.rollingPercentile.timeInMilliseconds
metrics.rollingPercentile.numBucketsmetrics.rollingPercentile.numBuckets 6 設(shè)置rollingPercentile窗口將分成的桶數(shù)。必須能被metrics.rollingPercentile.timeInMilliseconds整除,否則拋出異常 hystrix.command.default.metrics.rollingPercentile.numBuckets
metrics.rollingPercentile.bucketSize 100 設(shè)置每個存儲桶保留的最大執(zhí)行次數(shù) hystrix.command.default.metrics.rollingPercentile.bucketSize
metrics.healthSnapshot.intervalInMilliseconds 500 設(shè)置允許執(zhí)行計算成功和錯誤百分比的健康快照與影響斷路器狀態(tài)之間的等待時間(以毫秒為單位)。 hystrix.command.default.metrics.healthSnapshot.intervalInMilliseconds

Request Context:

屬性 默認(rèn)值 描述 全局配置
requestCache.enabled true 請求緩存的開關(guān),開啟之后,hystrix的cacheKey會被緩存掉,當(dāng)同一個請求來時,使用緩存的內(nèi)容 hystrix.command.default.requestCache.enabled
requestLog.enabled true 是否打印請求的log hystrix.command.default.requestLog.enabled

Collapser Properties:

屬性 默認(rèn)值 描述 全局配置
maxRequestsInBatch Integer.MAX_VALUE 設(shè)置在批處理之前允許的最大請求數(shù) hystrix.collapser.default.maxRequestsInBatch
timerDelayInMilliseconds 10 設(shè)置在批處理創(chuàng)建完后多少毫秒后執(zhí)行 hystrix.collapser.default.timerDelayInMilliseconds
requestCache.enabled true 是否開啟請求緩存 hystrix.collapser.default.requestCache.enabled

Thread Properties:

配置hystrix線程池的屬性

image
屬性 默認(rèn)值 描述 全局配置
coreSize 10 設(shè)置hystrix線程池核心線程池大小 hystrix.threadpool.default.coreSize
maximumSize 10 設(shè)置hystrix線程池最大線程池大小 hystrix.threadpool.default.maximumSize
maxQueueSize ?1 設(shè)置hystrix線程池的最大隊列大小 hystrix.threadpool.default.maxQueueSize
queueSizeRejectionThreshold 5 設(shè)置隊列拒絕閾值,即使未達到maxQueueSize也會發(fā)生拒絕的最大隊列大小,此屬性的存在是因為無法動態(tài)更改maxQueueSize,我們希望允許您動態(tài)更改影響拒絕的隊列大小。當(dāng)maxQueueSize為-1時,此配置不生效 hystrix.threadpool.default.queueSizeRejectionThreshold
keepAliveTimeMinutes 1 設(shè)置線程池線程釋放之前保持活躍狀態(tài)的時間,單位:分鐘 hystrix.threadpool.default.keepAliveTimeMinutes
allowMaximumSizeToDivergeFromCoreSize false hystrix.threadpool.default.allowMaximumSizeToDivergeFromCoreSize
metrics.rollingStats.timeInMilliseconds 10000 設(shè)置統(tǒng)計滾動窗口的持續(xù)時間 hystrix.threadpool.default.metrics.rollingStats.timeInMilliseconds
metrics.rollingStats.numBuckets 10 設(shè)置滾動統(tǒng)計窗口分為的桶數(shù)。 hystrix.threadpool.default.metrics.rollingStats.numBuckets

服務(wù)容錯保護(Hystrix依賴隔離)

hystrix為每一個命令創(chuàng)建一個獨立的線程池,這樣就算某個hystrix命令由于依賴其他服務(wù)導(dǎo)致出現(xiàn)延遲過高的情況,也只是對該依賴服務(wù)的調(diào)用產(chǎn)生影響,而不會拖累其他服務(wù)。

通過對依賴服務(wù)的線程池隔離實現(xiàn),可以帶來如下優(yōu)勢:

  • 應(yīng)用自身得到完全的保護,不會受不可控的依賴服務(wù)影響。即便給依賴服務(wù)分配的線程池被填滿,也不會影響應(yīng)用自身的額其余部分。
  • 可以有效的降低接入新服務(wù)的風(fēng)險。如果新服務(wù)接入后運行不穩(wěn)定或存在問題,完全不會影響到應(yīng)用其他的請求。
  • 當(dāng)依賴的服務(wù)從失效恢復(fù)正常后,它的線程池會被清理并且能夠馬上恢復(fù)健康的服務(wù),相比之下容器級別的清理恢復(fù)速度要慢得多。
  • 當(dāng)依賴的服務(wù)出現(xiàn)配置錯誤的時候,線程池會快速的反應(yīng)出此問題(通過失敗次數(shù)、延遲、超時、拒絕等指標(biāo)的增加情況)。同時,我們可以在不影響應(yīng)用功能的情況下通過實時的動態(tài)屬性刷新(后續(xù)會通過Spring Cloud Config與Spring Cloud Bus的聯(lián)合使用來介紹)來處理它。
  • 當(dāng)依賴的服務(wù)因?qū)崿F(xiàn)機制調(diào)整等原因造成其性能出現(xiàn)很大變化的時候,此時線程池的監(jiān)控指標(biāo)信息會反映出這樣的變化。同時,我們也可以通過實時動態(tài)刷新自身應(yīng)用對依賴服務(wù)的閾值進行調(diào)整以適應(yīng)依賴方的改變。
  • 除了上面通過線程池隔離服務(wù)發(fā)揮的優(yōu)點之外,每個專有線程池都提供了內(nèi)置的并發(fā)實現(xiàn),可以利用它為同步的依賴服務(wù)構(gòu)建異步的訪問。

總之,通過對依賴服務(wù)實現(xiàn)線程池隔離,讓我們的應(yīng)用更加健壯,不會因為個別依賴服務(wù)出現(xiàn)問題而引起非相關(guān)服務(wù)的異常。同時,也使得我們的應(yīng)用變得更加靈活,可以在不停止服務(wù)的情況下,配合動態(tài)配置刷新實現(xiàn)性能配置上的調(diào)整。

原理及設(shè)計

[圖片上傳失敗...(image-f885c3-1548757736276)]

通過翻看hystrix-javanica的ReadMe以及查看注解@HystrixCommand的引用可以發(fā)現(xiàn),HystrixCommandAspect類是一個很關(guān)鍵的類。

接下來我們從HystrixCommandAspect來入手。

@Aspect
public class HystrixCommandAspect {

    private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;

    static {
        META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
                .put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
                .put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
                .build();
    }

//指定切點為@HystrixCommand注解 @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")

    public void hystrixCommandAnnotationPointcut() {
    }

    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
    public void hystrixCollapserAnnotationPointcut() {
    }
    //切面
    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
        //先獲取到執(zhí)行的方法數(shù)據(jù)
        Method method = getMethodFromTarget(joinPoint);
        Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
        //...
        //根據(jù)注解類型獲取不同methodHolder的構(gòu)造工廠,methodHolder用來保存方法的一些數(shù)據(jù),如注解以及注解的屬性值等
        MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
        //根據(jù)method創(chuàng)建一個methodHolder
        MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
        //構(gòu)造hystrixCommand對象
        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
        //獲取方法執(zhí)行類型,同步、異步還是流式。
        ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
                metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

        Object result;
        try {
            if (!metaHolder.isObservable()) {
                //非流式,執(zhí)行命令
                result = CommandExecutor.execute(invokable, executionType, metaHolder);
            } else {
                //流式,執(zhí)行命令
                result = executeObservable(invokable, executionType, metaHolder);
            }
        } catch (HystrixBadRequestException e) {
            throw e.getCause() != null ? e.getCause() : e;
        } catch (HystrixRuntimeException e) {
            throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
        }
        return result;
    }
}    

非流式命令執(zhí)行代碼:

CommandExecutor.execute()。

public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
    //......
    switch (executionType) {
        case SYNCHRONOUS: {
            //同步模式執(zhí)行command
            return castToExecutable(invokable, executionType).execute();
        }
        case ASYNCHRONOUS: {
            //異步模式執(zhí)行command
            HystrixExecutable executable = castToExecutable(invokable, executionType);
            if (metaHolder.hasFallbackMethodCommand()
                    && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                return new FutureDecorator(executable.queue());
            }
            return executable.queue();
        }
        case OBSERVABLE: {
            //流式模式執(zhí)行command
            HystrixObservable observable = castToObservable(invokable);
            return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
        }
        default:
            throw new RuntimeException("unsupported execution type: " + executionType);
    }
}

private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
        //轉(zhuǎn)換
        if (invokable instanceof HystrixExecutable) {
            return (HystrixExecutable) invokable;
        }
        throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode");
    }

    private static HystrixObservable castToObservable(HystrixInvokable invokable) {
        //轉(zhuǎn)換
        if (invokable instanceof HystrixObservable) {
            return (HystrixObservable) invokable;
        }
        throw new RuntimeException("Command should implement " + HystrixObservable.class.getCanonicalName() + " interface to execute in observable mode");
    }

同步執(zhí)行的execute()方法有兩個實現(xiàn)類,分別是HystrixCommand以及HystrixCollapser。

通常是HystrixCommand,只有在指定合并多個請求時才會是HystrixCollapser。

我們先看下HystrixCommand的execute()方法。

public R execute() {
    try {
        //直接調(diào)用queue()方法獲得結(jié)果Future后再調(diào)用get()方法。
        return queue().get();
    } catch (Exception e) {
        //重新拋出異常
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}

public Future<R> queue() {
        /*
         * The Future returned by Observable.toBlocking().toFuture() does not implement the
         * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
         * thus, to comply with the contract of Future, we must wrap around it.
         */
        final Future<R> delegate = toObservable().toBlocking().toFuture();
        
        final Future<R> f = new Future<R>() {

            //...
            
        };

        /* special handling of error states that throw immediately */
        if (f.isDone()) {
            try {
                f.get();
                return f;
            } catch (Exception e) {
                //...處理異常
            }
        }

        return f;
    }

Hystrix還有哪些待改進的地方?

待完善。

有沒有更好的解決方式

待完善。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容