Hystrix是什么?
在微服務(wù)架構(gòu)中,微服務(wù)之間互相依賴較大,相互之間調(diào)用必不可免的會失敗。但當(dāng)下游服務(wù)A因為瞬時流量導(dǎo)致服務(wù)崩潰,其他依賴于A服務(wù)的B、C服務(wù)由于調(diào)用A服務(wù)超時耗費了大量的資源,長時間下去,B、C服務(wù)也會崩潰。Hystrix就是用來解決服務(wù)之間相互調(diào)用失敗,避免產(chǎn)生蝴蝶效應(yīng)的熔斷器,以及提供降級選項。Hystrix通過隔離服務(wù)之間的訪問點,阻止它們之間的級聯(lián)故障以及提供默認(rèn)選項來實現(xiàn)這一目標(biāo),以提高系統(tǒng)的整體健壯性。
用來解決什么問題?
用來避免由于服務(wù)之間依賴較重,出現(xiàn)個別服務(wù)宕機、停止服務(wù)導(dǎo)致大面積服務(wù)雪崩的情況。
小試牛刀
服務(wù)降級
目前有eureka、zuul、product、cart四個服務(wù)。
目的是通過zuul調(diào)用product接口,product接口通過feign調(diào)用cart服務(wù)的接口。product項目調(diào)用cart項目超時觸發(fā)hystrix熔斷及服務(wù)降級。
現(xiàn)將關(guān)鍵配置及代碼陳列如下:
eureka:
server:
port: 8761
eureka:
client:
service-url:
defaultZone: http://${eureka.instance.hostname:localhost}:${server.port:8761}/eureka
spring:
application:
name: eureka
profiles:
active: ${boot.profile:dev}
zuul
服務(wù)配置:
spring:
application:
name: zuul #應(yīng)用名稱
server:
port: 8080 #應(yīng)用服務(wù)端口
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka #配置eureka 默認(rèn)分區(qū)的地址
ribbon:
okhttp:
enabled: true #開啟ribbon 使用OKhttp發(fā)送http請求
ReadTimeout: 5000
ConnectTimeout: 5000
zuul:
prefix: /api #定義全局路由前綴
strip-prefix: true #路由到下游服務(wù)時開啟去除前綴開關(guān)
路由配置:
@Configuration
public class ZuuPatternServiceRouteMapperConfiguration {
/**
* 獲取沒有版本號的路由匹配規(guī)則bean
*
* @return {@link PatternServiceRouteMapper}
* @date 10:27 AM 2019/1/17
**/
@Bean
public PatternServiceRouteMapper patternServiceRouteMapper() {
return new PatternServiceRouteMapper("(?<version>v.*$)", "${name}");
}
}
webMvc配置:
@Configuration
public class WebMvcConfig implements WebMvcConfigurer {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/*")
.allowedOrigins("*");
}
}
product
server:
port: 8082
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka
spring:
application:
name: product
profiles:
active: ${boot.profile:dev}
feign:
client:
config:
default:
connectTimeout: 1000 #feign調(diào)用連接超時時間
readTimeout: 1000 #feign調(diào)用讀取超時時間
loggerLevel: basic #feign調(diào)用的log 等級
啟動類代碼:
@EnableFeignClients
@EnableDiscoveryClient
@SpringBootApplication
@EnableCircuitBreaker
public class ProductApplication {
public static void main(String[] args) {
SpringApplication.run(ProductApplication.class, args);
}
}
controller代碼:
@RequestMapping("/product")
@RestController
public class ProductController {
@Autowired
private CartFeignClient cartFeignClient; //調(diào)用cart服務(wù)的feign client
@HystrixCommand(fallbackMethod = "getDefaultValue") //指定出現(xiàn)異常的默認(rèn)回退方法
@PostMapping("/toCart/{productId}")
public ResponseEntity addCart(@PathVariable("productId") Long productId) throws InterruptedException {
Thread.sleep(5000); //特意讓線程休眠一段時間以觸發(fā)熔斷及服務(wù)降級
Long aLong = cartFeignClient.addCart(productId);
return ResponseEntity.ok(productId);
}
//默認(rèn)降級方法
private ResponseEntity getDefaultValue(Long productId) {
return ResponseEntity.ok(0);
}
}
cart服務(wù)的feign client代碼:
@FeignClient(value = "CART") //指定服務(wù)名稱
public interface CartFeignClient {
@PostMapping("/cart/{productId}") //指定接口路徑
Long addCart(@PathVariable("productId")Long productId); //參數(shù)
}
cart服務(wù)沒什么特殊配置及代碼,只要注冊到eureka以及提供接口就可以了。
服務(wù)配置:
server:
port: 8081
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka
instance:
status-page-url-path: /info
health-check-url-path: /health
spring:
application:
name: cart
profiles:
active: ${boot.profile:dev}
啟動類代碼:
@EnableDiscoveryClient
@SpringBootApplication
public class CartApplication {
public static void main(String[] args) {
SpringApplication.run(CartApplication.class, args);
}
}
接口:
@RestController
@RequestMapping("/cart")
@Api(value = "購物車", tags = {"購物車"}) //swagger
public class CartControllrr {
@ApiOperation("添加商品到購物車") //swagger
@ApiImplicitParam(name = "productId",value = "productId",required = true,paramType ="path",dataType = "String")
@PostMapping("/{productId}")
public ResponseEntity addCart(@PathVariable("productId") Long productId) {
System.out.println(productId);
return ResponseEntity.ok(productId);
}
}
現(xiàn)在將服務(wù)全部啟動,并通過zuul調(diào)用接口即可。
原先直接調(diào)用product接口地址為:http://localhost:8082/product/toCart/1
經(jīng)過zuul后的地址為:http://localhost:8080/api/product/product/toCart/1
正常調(diào)用結(jié)果返回為1,
由于product處理代碼中加入了sleep,導(dǎo)致product服務(wù)調(diào)用超時,觸發(fā)hystrix熔斷及服務(wù)降級,調(diào)用了指定的回退方法,getDefaultValue,返回了0.
[圖片上傳失敗...(image-422c2f-1548757736276)]
這里需要注意的有幾點:
通過網(wǎng)關(guān)zuul調(diào)用時,zuul的超時時間要大于等于hystrix的超時時間配置,否則在zuul層轉(zhuǎn)發(fā)時就已經(jīng)觸發(fā)了zuul的超時,返回 GATEWAY TIME OUT
無重試機制時,通過feign加ribbon進行服務(wù)之間調(diào)用時,hystrix配置超時時間要小于ribbon超時時間,否則在ribbon調(diào)用其他服務(wù)時就已經(jīng)超時了,hystrix無法進行熔斷及降級
如果有重試時,如有組件跟Hystrix配合使用,一般來講,建議Hystrix的超時 > 其他組件的超時,否則將可能導(dǎo)致重試特性失效。例如,如果ribbon超時時間為1秒,重試3次,hystrix超時時間應(yīng)略大于3秒。
-
定義一個fallback方法需要注意以下幾點:
fallback方法必須和指定fallback方法的主方法在一個類中。
fallback方法的參數(shù)必須要和主方法的參數(shù)一致,否則不生效。
使用fallback方法需要根據(jù)依賴服務(wù)設(shè)置合理的超時時間,即execution.isolation.thread.timeoutInMilliseconds的設(shè)置,可以在@HystrixCommand注解上通過HystrixProperty指定。
如果要在fallback方法中獲取異常信息,只需要在fallback方法中,加上一個參數(shù)Throwable throwable就可以了。
GitHub上Netflix給的例子是通過繼承HystrixCommand或者HystrixObservableCommand,然后實現(xiàn)execute()或者queue()方法來運行,但是這種方式代碼侵入性較大。使用注解@HystrixCommand方式的侵入性小一點。
剛才的hystrix的服務(wù)降級示例是同步模式的,也是通常情況下我們所使用的模式。
- 同步command,同步fallback
@HystrixCommand(fallbackMethod = "getDefaultValue")
@PostMapping("/toCart/{productId}")
public ResponseEntity addCart(@PathVariable("productId") Long productId) throws InterruptedException {
Long aLong = cartFeignClient.addCart(productId);
System.out.println(aLong);
return ResponseEntity.ok(productId);
}
private ResponseEntity getDefaultValue(Long productId) {
return ResponseEntity.ok(0);
}
- 異步command,同步fallback
@HystrixCommand(fallbackMethod = "getDefaultAsyncAddCart")
public Future<ResponseEntity<Long>> asyncAddCart(@PathVariable("productId") Long productId) throws InterruptedException {
log.info("異步command:run。。。");
Thread.sleep(5000);//觸發(fā)降級邏輯
return new AsyncResult<ResponseEntity<Long>>() {
@Override
public ResponseEntity<Long> invoke() {
return ResponseEntity.ok(cartFeignClient.addCart(productId));
}
};
}
private ResponseEntity<Long> getDefaultAsyncAddCart(Long productId, Throwable throwable) {
log.info("異步command,同步fallback:run。。。");
return ResponseEntity.ok(0L);
}
- 異步command,異步fallback
@HystrixCommand(fallbackMethod = "getDefaultAsyncAddCart2")
public Future<ResponseEntity<Long>> asyncAddCart2(@PathVariable("productId") Long productId) throws InterruptedException {
log.info("異步command:run。。。");
Thread.sleep(5000);
return new AsyncResult<ResponseEntity<Long>>() {
@Override
public ResponseEntity<Long> invoke() {
return ResponseEntity.ok(cartFeignClient.addCart(productId));
}
};
}
@HystrixCommand //注意,異步fallback 這里必須加@HystixCommand注解,否則運行時報錯
private Future<ResponseEntity<Long>> getDefaultAsyncAddCart2(Long productId, Throwable throwable) {
log.info("異步command,同步fallback:run。。。");
log.warn("", throwable);
return new AsyncResult<ResponseEntity<Long>>() {
@Override
public ResponseEntity<Long> invoke() {
return ResponseEntity.ok(0L);
}
};
}
注意,hystrix不支持同步command,異步fallback。
hystrix配置
我們先看下@HystrixCommand注解中有哪些配置。
| 屬性 | 類型 | 描述 |
|---|---|---|
| groupKey | String | 用于報告、告警、大盤展示時的分組key |
| commandKey | String | hystrix 命令的key值,默認(rèn)是方法名 |
| threadPoolKey | String | 用于區(qū)分不同線程池的key值,hystrix線程池是用來監(jiān)控,緩存,避免個別服務(wù)出現(xiàn)異常導(dǎo)致拖累所有線程都被占用的key值。 |
| fallbackMethod | String | 指定回退/降級的方法,此方法必須要定義在相同的類中,參數(shù)也應(yīng)該相同 |
| commandProperties | HystrixProperty數(shù)組 | 指定hystrix command 屬性值 |
| threadPoolProperties | HystrixProperty數(shù)組 | 指定hystrix線程池的屬性值 |
| ignoreExceptions | Throwable及子類 | 定義應(yīng)該忽略的異常 |
| observableExecutionMode | ObservableExecutionMode | 指定hystrix用于執(zhí)行觀察者命令的模式,默認(rèn)饑餓加載 |
其他配置:詳情見https://github.com/Netflix/Hystrix/wiki/Configuration
@HystixCommand屬性配置官方示例:https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-javanica
命令屬性配置:
執(zhí)行時:
| 屬性 | 默認(rèn) | 描述 | 全局配置 |
|---|---|---|---|
| execution.isolation.strategy | THREAD | 執(zhí)行hystrix命令模式時的隔離模式,默認(rèn)是THREAD。有兩種選項,THREAD線程隔離,SEMAPHORE信號量隔離。THREAD模式會在有限線程的線程池內(nèi)選擇一個單獨的線程執(zhí)行,SEMAPHORE是直接在調(diào)用線程上執(zhí)行,并發(fā)請求受信號量計數(shù)的限制 | hystrix.command.default.execution.isolation.strategy |
| execution.isolation.thread.timeoutInMilliseconds | 1000 | 設(shè)置執(zhí)行hystrix命令包裹方法的超時時間,超過這個時間則執(zhí)行回退邏輯 | hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds |
| execution.timeout.enabled | true | 設(shè)置是否開啟執(zhí)行hystrix命令包裹方法的超時。 | hystrix.command.default.execution.timeout.enabled |
| execution.isolation.thread.interruptOnTimeout | true | 此屬性設(shè)置HystrixCommand.run()在發(fā)生超時時是否應(yīng)中斷執(zhí)行。 |
hystrix.command.default.execution.isolation.thread.interruptOnTimeout |
| execution.isolation.thread.interruptOnCancel | false | 此屬性設(shè)置HystrixCommand.run()在發(fā)生取消時是否應(yīng)中斷執(zhí)行。 |
hystrix.command.default.execution.isolation.thread.interruptOnCancel |
| execution.isolation.semaphore.maxConcurrentRequests | 10 | 當(dāng)使用SEMAPHORE信號量模式時,設(shè)置允許HystrixCommand.run()同時并發(fā)執(zhí)行的最大請求數(shù)。達到最大值時,后面的請求將被拒絕執(zhí)行,并執(zhí)行回退邏輯,如果沒有回退方法,則會拋出異常。生產(chǎn)環(huán)境可根據(jù)實際情況調(diào)整 | hystrix.command.default.fallback.isolation.semaphore.maxConcurrentRequests |
回退時:
| 屬性 | 默認(rèn)值 | 描述 | 全局配置 |
|---|---|---|---|
| fallback.isolation.semaphore.maxConcurrentRequests | 10 | 設(shè)置允許回退方法同時執(zhí)行的最大并發(fā)數(shù),超過這個值,則將拒絕后續(xù)請求并拋出異常(無第二級回退時) | hystrix.command.default.fallback.isolation.semaphore.maxConcurrentRequests |
| fallback.enabled | true | 是否執(zhí)行回退 | hystrix.command.default.fallback.enabled |
斷路器配置:
| 屬性 | 默認(rèn)值 | 描述 | 全局配置 |
|---|---|---|---|
| circuitBreaker.enabled | true | 斷路器開關(guān),此配置決定斷路器是否用于跟蹤運行狀況,以及在其跳閘時是否用于短路請求。 | hystrix.command.default.circuitBreaker.enabled |
| circuitBreaker.requestVolumeThreshold | 20 | 設(shè)置用于觸發(fā)跳閘的滾動窗口的最小失敗請求數(shù)的閾值 | hystrix.command.default.circuitBreaker.requestVolumeThreshold |
| circuitBreaker.sleepWindowInMilliseconds | 5000 | 設(shè)置觸發(fā)斷路器后允許再次執(zhí)行請求前,拒絕請求的時間,單位為毫秒 | hystrix.command.default.circuitBreaker.sleepWindowInMilliseconds |
| circuitBreaker.errorThresholdPercentage | 50 | 設(shè)置觸發(fā)斷路器,走降級邏輯的默認(rèn)百分比最小閾值 | hystrix.command.default.circuitBreaker.errorThresholdPercentage |
| circuitBreaker.forceOpen | false | 是否強制進入斷路器狀態(tài),進入該狀態(tài)將拒絕所有請求 | hystrix.command.default.circuitBreaker.forceOpen |
| circuitBreaker.forceClosed | false | 是否強制關(guān)閉斷路器狀態(tài),關(guān)閉后,將允許所有請求進入 | hystrix.command.default.circuitBreaker.forceClosed |
Metrics:
| 屬性名 | 默認(rèn)值 | 描述 | 全局配置 |
|---|---|---|---|
| metrics.rollingStats.timeInMilliseconds | 10000 | 設(shè)置統(tǒng)計滾動窗口失敗請求次數(shù)的統(tǒng)計時間 | hystrix.command.default.metrics.rollingStats.timeInMilliseconds |
| metrics.rollingStats.numBuckets | 10 | 此屬性設(shè)置滾動統(tǒng)計窗口劃分的存儲桶數(shù)。必須能被metrics.rollingStats.timeInMilliseconds整除,否則拋出異常 | hystrix.command.default.metrics.rollingStats.numBuckets |
| metrics.rollingPercentile.enabled | true | 配置是否應(yīng)跟蹤執(zhí)行延遲并將其計算為百分位數(shù)。 | hystrix.command.default.metrics.rollingPercentile.enabled |
| metrics.rollingPercentile.timeInMilliseconds | 60000 | 設(shè)置滾動窗口的持續(xù)時間,其中保持執(zhí)行時間以允許百分位計算,以毫秒為單位。 | hystrix.command.default.metrics.rollingPercentile.timeInMilliseconds |
| metrics.rollingPercentile.numBucketsmetrics.rollingPercentile.numBuckets | 6 | 設(shè)置rollingPercentile窗口將分成的桶數(shù)。必須能被metrics.rollingPercentile.timeInMilliseconds整除,否則拋出異常 | hystrix.command.default.metrics.rollingPercentile.numBuckets |
| metrics.rollingPercentile.bucketSize | 100 | 設(shè)置每個存儲桶保留的最大執(zhí)行次數(shù) | hystrix.command.default.metrics.rollingPercentile.bucketSize |
| metrics.healthSnapshot.intervalInMilliseconds | 500 | 設(shè)置允許執(zhí)行計算成功和錯誤百分比的健康快照與影響斷路器狀態(tài)之間的等待時間(以毫秒為單位)。 | hystrix.command.default.metrics.healthSnapshot.intervalInMilliseconds |
Request Context:
| 屬性 | 默認(rèn)值 | 描述 | 全局配置 |
|---|---|---|---|
| requestCache.enabled | true | 請求緩存的開關(guān),開啟之后,hystrix的cacheKey會被緩存掉,當(dāng)同一個請求來時,使用緩存的內(nèi)容 | hystrix.command.default.requestCache.enabled |
| requestLog.enabled | true | 是否打印請求的log | hystrix.command.default.requestLog.enabled |
Collapser Properties:
| 屬性 | 默認(rèn)值 | 描述 | 全局配置 |
|---|---|---|---|
| maxRequestsInBatch | Integer.MAX_VALUE | 設(shè)置在批處理之前允許的最大請求數(shù) | hystrix.collapser.default.maxRequestsInBatch |
| timerDelayInMilliseconds | 10 | 設(shè)置在批處理創(chuàng)建完后多少毫秒后執(zhí)行 | hystrix.collapser.default.timerDelayInMilliseconds |
| requestCache.enabled | true | 是否開啟請求緩存 | hystrix.collapser.default.requestCache.enabled |
Thread Properties:
配置hystrix線程池的屬性

| 屬性 | 默認(rèn)值 | 描述 | 全局配置 |
|---|---|---|---|
| coreSize | 10 | 設(shè)置hystrix線程池核心線程池大小 | hystrix.threadpool.default.coreSize |
| maximumSize | 10 | 設(shè)置hystrix線程池最大線程池大小 | hystrix.threadpool.default.maximumSize |
| maxQueueSize | ?1 | 設(shè)置hystrix線程池的最大隊列大小 | hystrix.threadpool.default.maxQueueSize |
| queueSizeRejectionThreshold | 5 | 設(shè)置隊列拒絕閾值,即使未達到maxQueueSize也會發(fā)生拒絕的最大隊列大小,此屬性的存在是因為無法動態(tài)更改maxQueueSize,我們希望允許您動態(tài)更改影響拒絕的隊列大小。當(dāng)maxQueueSize為-1時,此配置不生效 | hystrix.threadpool.default.queueSizeRejectionThreshold |
| keepAliveTimeMinutes | 1 | 設(shè)置線程池線程釋放之前保持活躍狀態(tài)的時間,單位:分鐘 | hystrix.threadpool.default.keepAliveTimeMinutes |
| allowMaximumSizeToDivergeFromCoreSize | false | hystrix.threadpool.default.allowMaximumSizeToDivergeFromCoreSize | |
| metrics.rollingStats.timeInMilliseconds | 10000 | 設(shè)置統(tǒng)計滾動窗口的持續(xù)時間 | hystrix.threadpool.default.metrics.rollingStats.timeInMilliseconds |
| metrics.rollingStats.numBuckets | 10 | 設(shè)置滾動統(tǒng)計窗口分為的桶數(shù)。 | hystrix.threadpool.default.metrics.rollingStats.numBuckets |
服務(wù)容錯保護(Hystrix依賴隔離)
hystrix為每一個命令創(chuàng)建一個獨立的線程池,這樣就算某個hystrix命令由于依賴其他服務(wù)導(dǎo)致出現(xiàn)延遲過高的情況,也只是對該依賴服務(wù)的調(diào)用產(chǎn)生影響,而不會拖累其他服務(wù)。
通過對依賴服務(wù)的線程池隔離實現(xiàn),可以帶來如下優(yōu)勢:
- 應(yīng)用自身得到完全的保護,不會受不可控的依賴服務(wù)影響。即便給依賴服務(wù)分配的線程池被填滿,也不會影響應(yīng)用自身的額其余部分。
- 可以有效的降低接入新服務(wù)的風(fēng)險。如果新服務(wù)接入后運行不穩(wěn)定或存在問題,完全不會影響到應(yīng)用其他的請求。
- 當(dāng)依賴的服務(wù)從失效恢復(fù)正常后,它的線程池會被清理并且能夠馬上恢復(fù)健康的服務(wù),相比之下容器級別的清理恢復(fù)速度要慢得多。
- 當(dāng)依賴的服務(wù)出現(xiàn)配置錯誤的時候,線程池會快速的反應(yīng)出此問題(通過失敗次數(shù)、延遲、超時、拒絕等指標(biāo)的增加情況)。同時,我們可以在不影響應(yīng)用功能的情況下通過實時的動態(tài)屬性刷新(后續(xù)會通過Spring Cloud Config與Spring Cloud Bus的聯(lián)合使用來介紹)來處理它。
- 當(dāng)依賴的服務(wù)因?qū)崿F(xiàn)機制調(diào)整等原因造成其性能出現(xiàn)很大變化的時候,此時線程池的監(jiān)控指標(biāo)信息會反映出這樣的變化。同時,我們也可以通過實時動態(tài)刷新自身應(yīng)用對依賴服務(wù)的閾值進行調(diào)整以適應(yīng)依賴方的改變。
- 除了上面通過線程池隔離服務(wù)發(fā)揮的優(yōu)點之外,每個專有線程池都提供了內(nèi)置的并發(fā)實現(xiàn),可以利用它為同步的依賴服務(wù)構(gòu)建異步的訪問。
總之,通過對依賴服務(wù)實現(xiàn)線程池隔離,讓我們的應(yīng)用更加健壯,不會因為個別依賴服務(wù)出現(xiàn)問題而引起非相關(guān)服務(wù)的異常。同時,也使得我們的應(yīng)用變得更加靈活,可以在不停止服務(wù)的情況下,配合動態(tài)配置刷新實現(xiàn)性能配置上的調(diào)整。
原理及設(shè)計
[圖片上傳失敗...(image-f885c3-1548757736276)]
通過翻看hystrix-javanica的ReadMe以及查看注解@HystrixCommand的引用可以發(fā)現(xiàn),HystrixCommandAspect類是一個很關(guān)鍵的類。
接下來我們從HystrixCommandAspect來入手。
@Aspect
public class HystrixCommandAspect {
private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;
static {
META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
.put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
.put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
.build();
}
//指定切點為@HystrixCommand注解 @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
public void hystrixCollapserAnnotationPointcut() {
}
//切面
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
//先獲取到執(zhí)行的方法數(shù)據(jù)
Method method = getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
//...
//根據(jù)注解類型獲取不同methodHolder的構(gòu)造工廠,methodHolder用來保存方法的一些數(shù)據(jù),如注解以及注解的屬性值等
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
//根據(jù)method創(chuàng)建一個methodHolder
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
//構(gòu)造hystrixCommand對象
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
//獲取方法執(zhí)行類型,同步、異步還是流式。
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
Object result;
try {
if (!metaHolder.isObservable()) {
//非流式,執(zhí)行命令
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
//流式,執(zhí)行命令
result = executeObservable(invokable, executionType, metaHolder);
}
} catch (HystrixBadRequestException e) {
throw e.getCause() != null ? e.getCause() : e;
} catch (HystrixRuntimeException e) {
throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
}
}
非流式命令執(zhí)行代碼:
CommandExecutor.execute()。
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
//......
switch (executionType) {
case SYNCHRONOUS: {
//同步模式執(zhí)行command
return castToExecutable(invokable, executionType).execute();
}
case ASYNCHRONOUS: {
//異步模式執(zhí)行command
HystrixExecutable executable = castToExecutable(invokable, executionType);
if (metaHolder.hasFallbackMethodCommand()
&& ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
return new FutureDecorator(executable.queue());
}
return executable.queue();
}
case OBSERVABLE: {
//流式模式執(zhí)行command
HystrixObservable observable = castToObservable(invokable);
return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
}
default:
throw new RuntimeException("unsupported execution type: " + executionType);
}
}
private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
//轉(zhuǎn)換
if (invokable instanceof HystrixExecutable) {
return (HystrixExecutable) invokable;
}
throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode");
}
private static HystrixObservable castToObservable(HystrixInvokable invokable) {
//轉(zhuǎn)換
if (invokable instanceof HystrixObservable) {
return (HystrixObservable) invokable;
}
throw new RuntimeException("Command should implement " + HystrixObservable.class.getCanonicalName() + " interface to execute in observable mode");
}
同步執(zhí)行的execute()方法有兩個實現(xiàn)類,分別是HystrixCommand以及HystrixCollapser。
通常是HystrixCommand,只有在指定合并多個請求時才會是HystrixCollapser。
我們先看下HystrixCommand的execute()方法。
public R execute() {
try {
//直接調(diào)用queue()方法獲得結(jié)果Future后再調(diào)用get()方法。
return queue().get();
} catch (Exception e) {
//重新拋出異常
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
public Future<R> queue() {
/*
* The Future returned by Observable.toBlocking().toFuture() does not implement the
* interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
* thus, to comply with the contract of Future, we must wrap around it.
*/
final Future<R> delegate = toObservable().toBlocking().toFuture();
final Future<R> f = new Future<R>() {
//...
};
/* special handling of error states that throw immediately */
if (f.isDone()) {
try {
f.get();
return f;
} catch (Exception e) {
//...處理異常
}
}
return f;
}
Hystrix還有哪些待改進的地方?
待完善。
有沒有更好的解決方式
待完善。