一、推薦閱讀
可能有部分同學(xué)對(duì) Hystrix 的特性了解的不是很清晰,推薦如下文章,寫的真的好;
二、如何使用Hystrix
目前Hystrix最新版本是1.5.13,在項(xiàng)目的pom文件中加上依賴
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>1.5.13</version>
</dependency>
1.Command方式
要想使用hystrix,只需要繼承HystrixCommand或HystrixObservableCommand,兩者主要區(qū)別是:
- 前者的命令邏輯寫在
run();后者的命令邏輯寫在construct() - 前者的
run()是由新創(chuàng)建的線程執(zhí)行;后者的construct()是由調(diào)用程序線程執(zhí)行 - 前者一個(gè)實(shí)例只能向調(diào)用程序發(fā)送(emit)單條數(shù)據(jù),;后者一個(gè)實(shí)例可以順序發(fā)送多條數(shù)據(jù)
1.1執(zhí)行命令
execute()、queue()、observe()、toObservable()這4個(gè)方法用來觸發(fā)執(zhí)行run()/construct(),一個(gè)實(shí)例只能執(zhí)行一次這4個(gè)方法,特別說明的是HystrixObservableCommand沒有execute()和queue()。
4個(gè)方法的主要區(qū)別:
-
execute():以同步堵塞方式執(zhí)行run()。調(diào)用execute()后,hystrix先創(chuàng)建一個(gè)新線程運(yùn)行run(),接著調(diào)用程序 要在execute()調(diào)用處一直堵塞著,直到run()運(yùn)行完成 -
queue():以異步非堵塞方式執(zhí)行run()。一調(diào)用queue()就直接返回一個(gè)Future對(duì)象,同時(shí)hystrix創(chuàng)建一個(gè)新線程運(yùn)行run(),調(diào)用程序通過Future.get()拿到run()的返回結(jié)果,而Future.get()是堵塞執(zhí)行的 -
observe():事件注冊(cè)前執(zhí)行run()/construct()。第一步是事件注冊(cè)前,先調(diào)用observe()自動(dòng)觸發(fā) 執(zhí)行run()/construct()(如果繼承的是HystrixCommand,hystrix將創(chuàng)建新線程非堵塞執(zhí)行run();如果繼承的是HystrixObservableCommand,將以調(diào)用程序線程堵塞執(zhí)行construct()),第二步是從observe()返回后調(diào)用程序調(diào)用subscribe()完成事件注冊(cè),如果run()/construct()執(zhí)行成功則觸發(fā)onNext()和onCompleted(),如果執(zhí)行異常則觸發(fā)onError() -
toObservable():事件注冊(cè)后執(zhí)行run()/construct()。第一步是事件注冊(cè)前,一調(diào)用toObservable()就直接返回一個(gè)Observable<String>對(duì)象,第二步調(diào)用subscribe()完成事件注冊(cè)后自動(dòng)觸發(fā)執(zhí)行run()/construct()(如果繼承的是HystrixCommand,hystrix將創(chuàng)建新線程非堵塞執(zhí)行run(),調(diào)用程序不必等待run();如果繼承的是HystrixObservableCommand,將以調(diào)用程序線程堵塞執(zhí)行construct(),調(diào)用程序等待construct()執(zhí)行完才能繼續(xù)往下走),如果run()/construct()執(zhí)行成功則觸發(fā)onNext()和onCompleted(),如果執(zhí)行異常則觸發(fā)onError()
1.2 HystrixCommand
public class CommandHelloWorld2 extends HystrixCommand<String> {
private final String name;
protected CommandHelloWorld2(String name) {
super(//命令分組用于對(duì)依賴操作分組,便于統(tǒng)計(jì),匯總等.
Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup"))
//配置依賴超時(shí)時(shí)間,500毫秒
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(500))
//HystrixCommondKey工廠定義依賴名稱
.andCommandKey(HystrixCommandKey.Factory.asKey("commandHelloWorld2"))
//使用HystrixThreadPoolKey工廠定義線程池名稱
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool")));
this.name = name;
}
@Override
protected String getFallback() {
return "execute Falled";
}
@Override
protected String run() throws Exception {
//sleep 2秒 ,調(diào)用會(huì)超時(shí)
// TimeUnit.MILLISECONDS.sleep(2000);
return "Hello " + name + " thread : " + Thread.currentThread().getName();
}
public static void main(String[] args) throws Exception {
CommandHelloWorld2 commandHelloWorld2 = new CommandHelloWorld2("test-Fallback");
String s = commandHelloWorld2.execute();
System.out.println(" 同步 ====== " + s);
/*Future<String> queue = commandHelloWorld2.queue();
String s1 = queue.get();*/
}
}
-
execute() 執(zhí)行
HystrixCommand內(nèi)部的execute方法,可以實(shí)現(xiàn)run方法的同步執(zhí)行String s = commandHelloWorld2.execute(); -
queue() 執(zhí)行
HystrixCommand內(nèi)部的queue方法,可以實(shí)現(xiàn)run方法的異步執(zhí)行,如果依賴多個(gè)下游接口 ,通過異步方式,可以同時(shí)執(zhí)行,提高接口性能。Future<String> queue = commandHelloWorld2.queue(); String s1 = queue.get() -
通過執(zhí)行的結(jié)果發(fā)現(xiàn)
run()是由新創(chuàng)建的線程執(zhí)行,結(jié)果如下同步 ====== Hello test-Fallback thread : hystrix-HelloWorldPool-1
構(gòu)造方法
主要是相關(guān)參數(shù)設(shè)置,具體的參數(shù)的作用后續(xù)會(huì)介紹
protected CommandHelloWorld2(String name) {
super(//命令分組用于對(duì)依賴操作分組,便于統(tǒng)計(jì),匯總等.
Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup"))
//配置依賴超時(shí)時(shí)間,500毫秒
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(500))
//HystrixCommondKey工廠定義依賴名稱
.andCommandKey(HystrixCommandKey.Factory.asKey("commandHelloWorld2"))
//使用HystrixThreadPoolKey工廠定義線程池名稱
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool")));
this.name = name;
}
以下是封裝的具體點(diǎn)的參數(shù)設(shè)置,具體的參數(shù)設(shè)置根據(jù)業(yè)務(wù)需求而定;
public CommandHelloWorld4(Integer id) {
super(setter());
this.id = id;
}
private static Setter setter() {
return ApiSetter.setter("getNum");
}
public class ApiSetter {
public static HystrixCommand.Setter setter(String commandKeyName, String threadPoolKeyName) {
return setter("ApiGroup",commandKeyName,threadPoolKeyName);
}
public static HystrixCommand.Setter setter(String commandKeyName) {
return setter(commandKeyName,"Api-Pool");
}
/**
* @author liweihan
* @time 2017/12/20 16:57
* @description 相關(guān)參數(shù)設(shè)置
* @param groupKeyName 服務(wù)分組名
* @param commandKeyName 服務(wù)標(biāo)識(shí)名稱
* @param threadPoolKeyName 線程池名稱
* @return
*/
public static HystrixCommand.Setter setter(String groupKeyName, String commandKeyName, String threadPoolKeyName) {
//服務(wù)分組
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey(groupKeyName);
//服務(wù)標(biāo)識(shí)
HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(commandKeyName);
//線程池名稱
HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey(threadPoolKeyName);
//線程配置
HystrixThreadPoolProperties.Setter threadPoolProperties = HystrixThreadPoolProperties.Setter()
.withCoreSize(25)
.withKeepAliveTimeMinutes(5)
.withMaxQueueSize(Integer.MAX_VALUE)
.withQueueSizeRejectionThreshold(10000);
//命令屬性的配置
HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
.withExecutionIsolationThreadInterruptOnTimeout(true)
.withExecutionTimeoutInMilliseconds(3000) //設(shè)置超時(shí)時(shí)間為3秒時(shí)自動(dòng)熔斷
.withCircuitBreakerErrorThresholdPercentage(20);//失敗率達(dá)到20%自動(dòng)熔斷
//返回
return HystrixCommand.Setter
.withGroupKey(groupKey)
.andCommandKey(commandKey)
.andThreadPoolKey(threadPoolKey)
.andThreadPoolPropertiesDefaults(threadPoolProperties)
.andCommandPropertiesDefaults(commandProperties);
}
}
1.3 HystrixObservableCommand
代碼如下:
public class HelloWorldHystrixObservableCommand extends HystrixObservableCommand<String> {
private final String name;
public HelloWorldHystrixObservableCommand(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}
// @Override
// protected String getFallback() {
// System.out.println("觸發(fā)了降級(jí)!");
// return "exeucute fallback";
// }
@Override
protected Observable<String> construct() {
System.out.println("in construct! thread:" + Thread.currentThread().getName());
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> observer) {
try {
System.out.println("in call of construct! thread:" + Thread.currentThread().getName());
if (!observer.isUnsubscribed()) {
// observer.onError(getExecutionException()); // 直接拋異常退出,不會(huì)往下執(zhí)行
observer.onNext("Hello1" + " thread:" + Thread.currentThread().getName());
observer.onNext("Hello2" + " thread:" + Thread.currentThread().getName());
observer.onNext(name + " thread:" + Thread.currentThread().getName());
System.out.println("complete before------" + " thread:" + Thread.currentThread().getName());
observer.onCompleted(); // 不會(huì)往下執(zhí)行observer的任何方法
System.out.println("complete after------" + " thread:" + Thread.currentThread().getName());
observer.onNext("abc"); // 不會(huì)執(zhí)行到
}
} catch (Exception e) {
observer.onError(e);
}
}
} );
}
public static void main(String[] args) throws Exception{
Observable<String> hotObservable = new HelloWorldHystrixObservableCommand("Hlx").observe();
// Observable<String> hotObservable = new HelloWorldHystrixObservableCommand("Hlx").toObservable();
Thread.sleep(2000);
System.out.println("睡眠中。。。。。");
// 注冊(cè)觀察者事件
// subscribe()是非堵塞的
hotObservable.subscribe(new Observer<String>() {
// 先執(zhí)行onNext再執(zhí)行onCompleted
@Override
public void onCompleted() {
System.out.println("hotObservable of ObservableCommand completed");
}
@Override
public void onError(Throwable e) {
System.out.println("hotObservable of ObservableCommand error");
e.printStackTrace();
}
@Override
public void onNext(String v) {
System.out.println("hotObservable of ObservableCommand onNext: " + v);
}
});
}
}
執(zhí)行結(jié)果如下:
in construct! thread:main
in call of construct! thread:main
complete before------ thread:main
complete after------ thread:main
睡眠中。。。。。
hotObservable of ObservableCommand onNext: Hello1 thread:main
hotObservable of ObservableCommand onNext: Hello2 thread:main
hotObservable of ObservableCommand onNext: Hlx thread:main
hotObservable of ObservableCommand completed
- 一個(gè)實(shí)例通過observer.onNext可以順序發(fā)送多條數(shù)據(jù)
- 通過執(zhí)行結(jié)果可知調(diào)用
observe()自動(dòng)觸發(fā)執(zhí)行construct()方法 - 通過執(zhí)行結(jié)果 thread:main 可知
construct()是由調(diào)用程序線程執(zhí)行
toObservable()方法
將toObservable()的注釋放開,observe()注釋掉,執(zhí)行main方法,結(jié)果如下:
睡眠中。。。。。
in construct! thread:main
in call of construct! thread:main
hotObservable of ObservableCommand onNext: Hello1 thread:main
hotObservable of ObservableCommand onNext: Hello2 thread:main
hotObservable of ObservableCommand onNext: Hlx thread:main
complete before------ thread:main
hotObservable of ObservableCommand completed
complete after------ thread:main
- 直接調(diào)用
toObservable()并不會(huì)執(zhí)行construct()方法,調(diào)用subscribe()完成事件注冊(cè)后自動(dòng)觸發(fā)執(zhí)行construct()
2.fallback(降級(jí))
使用fallback機(jī)制很簡單,繼承HystrixCommand只需重寫getFallback(),繼承HystrixObservableCommand只需重寫resumeWithFallback(),比如HelloWorldHystrixCommand加上下面代碼片段:
@Override
protected String getFallback() {
return "execute Falled";
}
fallback實(shí)際流程是當(dāng)run()/construct()被觸發(fā)執(zhí)行時(shí)或執(zhí)行中發(fā)生錯(cuò)誤時(shí),將轉(zhuǎn)向執(zhí)行getFallback()/resumeWithFallback()。結(jié)合下圖,4種錯(cuò)誤情況將觸發(fā)fallback:
- 當(dāng)
construct()或者run()方法執(zhí)行過程中拋出異常。 - 當(dāng)回路器打開,命令的執(zhí)行進(jìn)入了熔斷狀態(tài)。
- 當(dāng)命令執(zhí)行的線程池和隊(duì)列或者信號(hào)量已經(jīng)滿容。
- 命令執(zhí)行超時(shí)。
若失敗回退方法執(zhí)行失敗,或者用戶未提供失敗回退方法,Hystrix 會(huì)根據(jù)調(diào)用執(zhí)行命令的方法的不同而產(chǎn)生不同的行為:
-
execute()—— 拋出異常 -
queue()—— 成功返回Future對(duì)象,但其get()方法被調(diào)用時(shí),會(huì)拋出異常 -
observe()—— 返回Observable對(duì)象,當(dāng)你訂閱它的時(shí)候,會(huì)立即調(diào)用 subscriber 的onError方法中止請(qǐng)求 -
toObservable()—— 返回Observable對(duì)象,當(dāng)你訂閱它的時(shí)候,會(huì)立即調(diào)用 subscriber 的onError方法中止請(qǐng)求

途中大致的執(zhí)行順序如下:
1、構(gòu)建 HystrixCommand 或者 HystrixObservableCommand 對(duì)象
2、執(zhí)行命令(即上述 Command 對(duì)象包裝的邏輯)
3、結(jié)果是否有緩存
4、請(qǐng)求線路(類似電路)是否是開路
5、線程池/請(qǐng)求隊(duì)列/信號(hào)量占滿時(shí)會(huì)發(fā)生什么
6、使用 HystrixObservableCommand.construct() 還是 HystrixCommand.run()
7、計(jì)算鏈路健康度
8、失敗回退邏輯
9、返回正常回應(yīng)
接下來,我們一一驗(yàn)證4中情況
1、當(dāng)construct()或者run()方法執(zhí)行過程中拋出異常,代碼如下:
public class HystrixFallbackException extends HystrixCommand<String> {
private final String name;
public HystrixFallbackException(String name) {
super(HystrixCommandGroupKey.Factory.asKey("FallbackGroup"));
this.name = name;
}
@Override
protected String run() throws Exception {
/*---------------會(huì)觸發(fā)fallback的case-------------------*/
//1.主動(dòng)拋出異常
// throw new HystrixTimeoutException();
// throw new RuntimeException("this command will trigger fallback");
// throw new Exception("this command will trigger fallback");
// throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, commandClass, message, cause, fallbackException);
// 2.除零異常
//int i = 1/0;
/*---------------不會(huì)觸發(fā)fallback的case-------------------*/
// 3.被捕獲的異常不會(huì)觸發(fā)fallback
/*try {
throw new RuntimeException("this command never trigger fallback");
} catch(Exception e) {
e.printStackTrace();
}*/
// 4.HystrixBadRequestException異常由非法參數(shù)或非系統(tǒng)錯(cuò)誤引起,不會(huì)觸發(fā)fallback,也不會(huì)被計(jì)入熔斷器
throw new HystrixBadRequestException("HystrixBadRequestException is never trigger fallback");
//return name;
}
@Override
protected String getFallback() {
return "fallback: " + name;
}
public static void main(String[] args) {
HystrixFallbackException hlx = new HystrixFallbackException("Hlx");
try {
String execute = hlx.execute();
System.out.println(execute);
} catch (Exception e) {
System.out.println("run()拋出HystrixBadRequestException時(shí),會(huì)被捕獲到這里" + e.getCause());
}
}
}
幾種異常情況已經(jīng)在代碼中注釋了,可直接去嘗試每種情況
非HystrixBadRequestException異常:當(dāng)拋出HystrixBadRequestException時(shí),調(diào)用程序可以捕獲異常,沒有觸發(fā)
getFallback(),而其他異常則會(huì)觸發(fā)getFallback(),調(diào)用程序?qū)@得getFallback()的返回
2、命令執(zhí)行超時(shí)
代碼如下:
public class HystrixFallbackTimeOut extends HystrixCommand<String> {
private final String name;
protected HystrixFallbackTimeOut(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}
@Override
protected String getFallback() {
return "execute Falled";
}
@Override
protected String run() throws Exception {
//sleep 2秒 ,調(diào)用會(huì)超時(shí)
TimeUnit.MILLISECONDS.sleep(2000);
return "Hello " + name + " thread : " + Thread.currentThread().getName();
}
public static void main(String[] args) throws Exception {
HystrixFallbackTimeOut hystrixFallbackTimeOut = new HystrixFallbackTimeOut("Fallback-timeout");
String s = hystrixFallbackTimeOut.execute();
System.out.println(" 同步 ====== " + s);
}
}
- run()方法由于睡眠了2s,Hystrix的默認(rèn)的執(zhí)行超時(shí)時(shí)間為1000ms,所以執(zhí)行會(huì)超時(shí)
3、當(dāng)回路器打開,命令的執(zhí)行進(jìn)入了熔斷狀態(tài)
代碼如下:
/**
*
* CircuitBreakerRequestVolumeThreshold設(shè)置為3,意味著10s內(nèi)請(qǐng)求超過3次才會(huì)觸發(fā)熔斷器
* circuitBreakerErrorThresholdPercentage設(shè)置為80,錯(cuò)誤率是為%80才會(huì)觸發(fā)熔斷器
* 必須兩個(gè)參數(shù)同時(shí)滿足才會(huì)才會(huì)觸發(fā)熔斷器
* run()中使命令超時(shí)進(jìn)入fallback,執(zhí)行4次run后,將被熔斷,進(jìn)入降級(jí),即不進(jìn)入run()而直接進(jìn)入fallback
*
*/
public class HystrixFallbackCircuitBreaker extends HystrixCommand<String> {
private Integer id;
public HystrixFallbackCircuitBreaker(Integer id) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CircuitBreakerTestGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("CircuitBreakerTestKey"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("CircuitBreakerTest"))
.andThreadPoolPropertiesDefaults( // 配置線程池
HystrixThreadPoolProperties.Setter()
.withCoreSize(200) // 配置線程池里的線程數(shù),設(shè)置足夠多線程,以防未熔斷卻打滿threadpool
)
.andCommandPropertiesDefaults( // 配置熔斷器
HystrixCommandProperties.Setter()
//開啟熔斷器
.withCircuitBreakerEnabled(true)
//滑動(dòng)窗口內(nèi)(10s)的請(qǐng)求數(shù)閾值,只有達(dá)到了這個(gè)閾值,才有可能熔斷。默認(rèn)是 20,如果這個(gè)時(shí)間段只有19個(gè)請(qǐng)求,就算全部失敗了,也不會(huì)自動(dòng)熔斷。
.withCircuitBreakerRequestVolumeThreshold(3)
//錯(cuò)誤率閾值,默認(rèn) 50%,比如(10s)內(nèi)有100個(gè)請(qǐng)求,其中有60個(gè)發(fā)生異常,那么這段時(shí)間的錯(cuò)誤率是 60,已經(jīng)超過了錯(cuò)誤率閾值,熔斷器會(huì)自動(dòng)打開。
.withCircuitBreakerErrorThresholdPercentage(80)
// .withCircuitBreakerForceOpen(true) // 置為true時(shí),所有請(qǐng)求都將被拒絕,直接到fallback
// .withCircuitBreakerForceClosed(true) // 置為true時(shí),將忽略錯(cuò)誤
// .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE) // 信號(hào)量隔離
// .withExecutionTimeoutInMilliseconds(5000)
)
);
this.id = id;
}
@Override
protected String run() throws Exception {
System.out.println("running run():" + id);
if (id % 2 == 0 && id <= 10) { //讓小于等于10的偶數(shù)返回
return "running run():" + id;
} else { //讓奇數(shù)或大于10的數(shù)進(jìn)入fallback
TimeUnit.MILLISECONDS.sleep(2000);
return id+"";
}
}
@Override
protected String getFallback() {
return " ====== CircuitBreaker fallback" + id + " ,是否進(jìn)入熔斷:" + super.isCircuitBreakerOpen();
}
public static void main(String[] args) {
for(int i = 0; i < 50; i++) {
try {
System.out.println("===========" + new HystrixFallbackCircuitBreaker(i).execute());
} catch(Exception e) {
System.out.println("run()拋出HystrixBadRequestException時(shí),被捕獲到這里" + e.getCause());
}
}
}
- 我們配置10s內(nèi)請(qǐng)求數(shù)大于3個(gè)時(shí)就啟動(dòng)熔斷器,請(qǐng)求錯(cuò)誤率大于80%時(shí)就熔斷,然后for循環(huán)發(fā)起請(qǐng)求,當(dāng)請(qǐng)求符合熔斷條件時(shí)將觸發(fā)
getFallback()。
4、當(dāng)命令執(zhí)行的線程池和隊(duì)列或者信號(hào)量已經(jīng)滿容
代碼如下:
package com.jimingqiang.study.hystrix.hystrixbegin.failback;
import com.netflix.hystrix.*;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* Created by QDHL on 2018/10/11.
*
* @author mingqiang ji
*/
public class HystrixFallbackThreadPool extends HystrixCommand<String> {
private final String name;
public HystrixFallbackThreadPool(String name) {
// super(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"));
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolTest"))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(5000)
)
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter()
.withCoreSize(3) // 配置線程池里的線程數(shù)
)
);
this.name = name;
}
@Override
protected String run() throws Exception {
TimeUnit.MILLISECONDS.sleep(3000);
return name;
}
@Override
protected String getFallback() {
return "fallback: " + name;
}
public static void main(String[] args) {
for(int i = 0; i < 10; i++) {
try {
Future<String> future = new HystrixFallbackThreadPool("Hlx"+i).queue();
} catch(Exception e) {
System.out.println("run()拋出HystrixBadRequestException時(shí),被捕獲到這里" + e.getCause());
}
}
for(int i = 0; i < 20; i++) {
try {
System.out.println("===========" + new HystrixFallbackThreadPool("Hlx").execute());
} catch(Exception e) {
System.out.println("run()拋出HystrixBadRequestException時(shí),被捕獲到這里" + e.getCause());
}
}
}
}
-
我們配置線程池?cái)?shù)目為3,然后先用一個(gè)for循環(huán)執(zhí)行
queue(),觸發(fā)的run()sleep 3s,然后再用第2個(gè)for循環(huán)執(zhí)行execute(),發(fā)現(xiàn)所有execute()都觸發(fā)了fallback,這是因?yàn)榈?個(gè)for的線程還在sleep,占用著線程池所有線程,導(dǎo)致第2個(gè)for的所有命令都無法獲取到線程。3.隔離策略
hystrix提供了兩種隔離策略:線程池隔離和信號(hào)量隔離。hystrix默認(rèn)采用線程池隔離。
- 線程池隔離:不同服務(wù)通過使用不同線程池,彼此間將不受影響,達(dá)到隔離效果。我們通過andThreadPoolKey配置使用命名為
ThreadPoolTest的線程池,實(shí)現(xiàn)與其他命名的線程池天然隔離,如果不配置andThreadPoolKey則使用withGroupKey配置來命名線程池 - 信號(hào)量隔離:線程隔離會(huì)帶來線程開銷,有些場景(比如無網(wǎng)絡(luò)請(qǐng)求場景)可能會(huì)因?yàn)橛瞄_銷換隔離得不償失,為此hystrix提供了信號(hào)量隔離,當(dāng)服務(wù)的并發(fā)數(shù)大于信號(hào)量閾值時(shí)將進(jìn)入fallback。通過
withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)配置為信號(hào)量隔離,通過withExecutionIsolationSemaphoreMaxConcurrentRequests配置執(zhí)行并發(fā)數(shù)。
3.1 線程池隔離
在該模式下,用戶請(qǐng)求會(huì)被提交到各自的線程池中執(zhí)行,把執(zhí)行每個(gè)下游服務(wù)的線程分離,這樣,某個(gè)依賴服務(wù)的高延遲只會(huì)拖慢這個(gè)依賴服務(wù)對(duì)應(yīng)的線程池,從而達(dá)到資源隔離的作用。當(dāng)線程池來不及處理并且請(qǐng)求隊(duì)列塞滿時(shí),新進(jìn)來的請(qǐng)求將快速失敗,可以避免依賴問題擴(kuò)散。
優(yōu)勢
- 減少所依賴服務(wù)發(fā)生故障時(shí)的影響面,比如ServiceA服務(wù)發(fā)生異常,導(dǎo)致請(qǐng)求大量超時(shí),對(duì)應(yīng)的線程池被打滿,這時(shí)并不影響ServiceB、ServiceC的調(diào)用。
- 如果接口性能有變動(dòng),可以方便的動(dòng)態(tài)調(diào)整線程池的參數(shù)或者是超時(shí)時(shí)間,前提是Hystrix參數(shù)實(shí)現(xiàn)了動(dòng)態(tài)調(diào)整。
缺點(diǎn)
- 請(qǐng)求在線程池中執(zhí)行,肯定會(huì)帶來任務(wù)調(diào)度、排隊(duì)和上下文切換帶來的開銷。
- 因?yàn)樯婕暗娇缇€程,那么就存在ThreadLocal數(shù)據(jù)的傳遞問題,比如在主線程初始化的ThreadLocal變量,在線程池線程中無法獲取
注意
因?yàn)镠ystrix默認(rèn)使用了線程池模式,所以對(duì)于每個(gè)Command,在初始化的時(shí)候,會(huì)創(chuàng)建一個(gè)對(duì)應(yīng)的線程池,如果項(xiàng)目中需要進(jìn)行降級(jí)的接口非常多,比如有上百個(gè)的話,不太了解Hystrix內(nèi)部機(jī)制的同學(xué),按照默認(rèn)配置直接使用,可能就會(huì)造成線程資源的大量浪費(fèi)。
- 線程池隔離:不同服務(wù)通過使用不同線程池,彼此間將不受影響,達(dá)到隔離效果。我們通過andThreadPoolKey配置使用命名為
線程池的使用示意圖如下圖所示,當(dāng)n個(gè)請(qǐng)求線程并發(fā)對(duì)某個(gè)接口請(qǐng)求調(diào)用時(shí),會(huì)先從hystrix管理的線程池里面獲得一個(gè)線程,然后將參數(shù)傳遞給這個(gè)線程去執(zhí)行真正調(diào)用。線程池的大小有限,默認(rèn)是10個(gè)線程,可以使用maxConcurrentRequests參數(shù)配置,如果并發(fā)請(qǐng)求數(shù)多于線程池線程個(gè)數(shù),就有線程需要進(jìn)入隊(duì)列排隊(duì),但排隊(duì)隊(duì)列也有上限,默認(rèn)是 5,如果排隊(duì)隊(duì)列也滿,則必定有請(qǐng)求線程會(huì)走fallback流程。
線程池模式可以支持異步調(diào)用,支持超時(shí)調(diào)用,支持直接熔斷,存在線程切換,開銷大。

代碼如下:
public class HystrixThreadPoolLsolation extends HystrixCommand<String> {
private final String name;
public HystrixThreadPoolLsolation(String name,String threadPoolName) {
// super(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"));
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(threadPoolName))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(5000)
)
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter()
.withCoreSize(3) // 配置線程池里的線程數(shù)
)
);
this.name = name;
}
@Override
protected String run() throws Exception {
TimeUnit.MILLISECONDS.sleep(3000);
return name+"-"+Thread.currentThread().getName();
}
@Override
protected String getFallback() {
return "fallback: " + name;
}
public static void main(String[] args) {
for(int i = 0; i < 10; i++) {
try {
Future<String> future = new HystrixThreadPoolLsolation("Hlx"+i,"thread-pool-1").queue();
} catch(Exception e) {
System.out.println("run()拋出HystrixBadRequestException時(shí),被捕獲到這里" + e.getCause());
}
}
for(int i = 0; i < 20; i++) {
try {
System.out.println("===========" + new HystrixThreadPoolLsolation("Hlx","thread-pool-2").execute());
} catch(Exception e) {
System.out.println("run()拋出HystrixBadRequestException時(shí),被捕獲到這里" + e.getCause());
}
}
}
}
- 我們配置線程池?cái)?shù)目為3,然后先用一個(gè)for循環(huán)執(zhí)行
queue(),觸發(fā)的run()sleep 3s,然后再用第2個(gè)for循環(huán)執(zhí)行execute(),發(fā)現(xiàn)所有execute()沒有觸發(fā)fallback,而是繼續(xù)執(zhí)行,這是因?yàn)閮蓚€(gè)命令配置了不同的線程池。
3.2 信號(hào)量
如果要使用信號(hào)量模式,需要配置參數(shù)execution.isolation.strategy=ExecutionIsolationStrategy.SEMAPHORE;
另外,為了限制對(duì)下游依賴的并發(fā)調(diào)用量,可以配置Hystrix的
execution.isolation.semaphore.maxConcurrentRequests
當(dāng)并發(fā)請(qǐng)求數(shù)達(dá)到閾值時(shí),請(qǐng)求線程可以快速失敗,執(zhí)行降級(jí)。
信號(hào)量的使用示意圖如下圖所示,當(dāng)n個(gè)并發(fā)請(qǐng)求去調(diào)用一個(gè)目標(biāo)服務(wù)接口時(shí),都要獲取一個(gè)信號(hào)量才能真正去調(diào)用目標(biāo)服務(wù)接口,但信號(hào)量有限,默認(rèn)是10個(gè),可以使用maxConcurrentRequests參數(shù)配置,如果并發(fā)請(qǐng)求數(shù)多于信號(hào)量個(gè)數(shù),信號(hào)量在達(dá)到上限時(shí),會(huì)拒絕后續(xù)請(qǐng)求的訪問,則必定有請(qǐng)求線程會(huì)走fallback流程,從而達(dá)到限流和防止雪崩的目的。

信號(hào)量模式從始至終都只有請(qǐng)求線程自身,是同步調(diào)用模式,不支持超時(shí)調(diào)用,由于沒有線程的切換,開銷非常小。
在該模式下,接收請(qǐng)求和執(zhí)行下游依賴在同一個(gè)線程內(nèi)完成,
比如一個(gè)接口中依賴了3個(gè)下游:serviceA、serviceB、serviceC,且這3個(gè)服務(wù)返回的數(shù)據(jù)互相不依賴,這種情況下如果針對(duì)A、B、C的熔斷降級(jí)使用信號(hào)量模式,那么接口耗時(shí)就等于請(qǐng)求A、B、C服務(wù)耗時(shí)的總和,無疑這不是好的方案。
代碼如下:
package com.jimingqiang.study.hystrix.hystrixbegin.Isolationstrategy;
import com.netflix.hystrix.*;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* 測試信號(hào)量隔離
* 默認(rèn)執(zhí)行run()用的是主線程,為了模擬并行執(zhí)行command,這里我們自己創(chuàng)建多個(gè)線程來執(zhí)行command
* 設(shè)置ExecutionIsolationSemaphoreMaxConcurrentRequests為3,意味著信號(hào)量最多允許執(zhí)行run的并發(fā)數(shù)為3,超過則觸發(fā)降級(jí),即不執(zhí)行run而執(zhí)行g(shù)etFallback
* 設(shè)置FallbackIsolationSemaphoreMaxConcurrentRequests為1,意味著信號(hào)量最多允許執(zhí)行fallback的并發(fā)數(shù)為1,超過則拋異常fallback execution rejected
*/
public class HystrixSemaphorelLsolation extends HystrixCommand<String> {
private final String name;
public HystrixSemaphorelLsolation(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SemaphoreTestGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("SemaphoreTestKey"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SemaphoreTestThreadPoolKey"))
.andCommandPropertiesDefaults( // 配置信號(hào)量隔離
HystrixCommandProperties.Setter()
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE) // 信號(hào)量隔離
.withExecutionIsolationSemaphoreMaxConcurrentRequests(3)
.withFallbackIsolationSemaphoreMaxConcurrentRequests(1)
)
// 設(shè)置了信號(hào)量隔離后,線程池配置將變無效
// .andThreadPoolPropertiesDefaults(
// HystrixThreadPoolProperties.Setter()
// .withCoreSize(13) // 配置線程池里的線程數(shù)
// )
);
this.name = name;
}
@Override
protected String run() throws Exception {
return "run(): name="+name+",線程名是" + Thread.currentThread().getName();
}
@Override
protected String getFallback() {
return "getFallback(): name="+name+",線程名是" + Thread.currentThread().getName();
}
public static void main(String[] args) throws Exception{
try {
Thread.sleep(2000);
for(int i = 0; i < 5; i++) {
final int j = i;
// 自主創(chuàng)建線程來執(zhí)行command,創(chuàng)造并發(fā)場景
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("===========" + new HystrixSemaphorelLsolation("HLX" + j).execute());
}
});
thread.start();
}
} catch(Exception e) {
e.printStackTrace();
}
System.in.read();
}
}
-
由于并發(fā)執(zhí)行5個(gè)線程,ExecutionIsolationSemaphoreMaxConcurrentRequests為3,設(shè)置FallbackIsolationSemaphoreMaxConcurrentRequests為1,最終執(zhí)行結(jié)果是3個(gè)線程可以并發(fā)執(zhí)行
run(),1個(gè)線程執(zhí)行
getFallback(),一個(gè)線程拋出異常;
4.熔斷器
在生活中,如果電路的負(fù)載過高,保險(xiǎn)箱會(huì)自動(dòng)跳閘,以保護(hù)家里的各種電器,這就是熔斷器的一個(gè)活生生例子。在Hystrix中也存在這樣一個(gè)熔斷器,當(dāng)所依賴的服務(wù)不穩(wěn)定時(shí),能夠自動(dòng)熔斷,并提供有損服務(wù),保護(hù)服務(wù)的穩(wěn)定性。在運(yùn)行過程中,Hystrix會(huì)根據(jù)接口的執(zhí)行狀態(tài)(成功、失敗、超時(shí)和拒絕),收集并統(tǒng)計(jì)這些數(shù)據(jù),根據(jù)這些信息來實(shí)時(shí)決策是否進(jìn)行熔斷。

線路的開路閉路詳細(xì)邏輯如下:
- 假設(shè)線路內(nèi)的容量(請(qǐng)求QPS)達(dá)到一定閾值(通過
HystrixCommandProperties.circuitBreakerRequestVolumeThreshold()配置) - 同時(shí),假設(shè)線路內(nèi)的錯(cuò)誤率達(dá)到一定閾值(通過
HystrixCommandProperties.circuitBreakerErrorThresholdPercentage()配置) - 熔斷器將從『閉路』轉(zhuǎn)換成『開路』
- 若此時(shí)是『開路』狀態(tài),熔斷器將短路后續(xù)所有經(jīng)過該熔斷器的請(qǐng)求,這些請(qǐng)求直接走『失敗回退邏輯』
- 經(jīng)過一定時(shí)間(即『休眠窗口』,通過
HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds()配置),后續(xù)第一個(gè)請(qǐng)求將會(huì)被允許通過熔斷器(此時(shí)熔斷器處于『半開』狀態(tài)),若該請(qǐng)求失敗,熔斷器將又進(jìn)入『開路』狀態(tài),且在休眠窗口內(nèi)保持此狀態(tài);若該請(qǐng)求成功,熔斷器將進(jìn)入『閉路』狀態(tài),回到邏輯1循環(huán)往復(fù)。
? 代碼可以參考 【2.fallback(降級(jí))中的 3、當(dāng)回路器打開,命令的執(zhí)行進(jìn)入了熔斷狀態(tài)的代碼片段 】
5.請(qǐng)求緩存
5.1 請(qǐng)求緩存有如下好處:
-
不同請(qǐng)求路徑上針對(duì)同一個(gè)依賴服務(wù)進(jìn)行的重復(fù)請(qǐng)求(有同一個(gè)緩存 Key),不會(huì)真實(shí)請(qǐng)求多次
這個(gè)特性在企業(yè)級(jí)系統(tǒng)中非常有用,在這些系統(tǒng)中,開發(fā)者往往開發(fā)的只是系統(tǒng)功能的一部分。(注:這樣,開發(fā)者彼此隔離,不太可能使用同樣的方法或者策略去請(qǐng)求同一個(gè)依賴服務(wù)提供的資源)
例如,請(qǐng)求一個(gè)用戶的
Account的邏輯如下所示,這個(gè)邏輯往往在系統(tǒng)不同地方被用到:Account account = new UserGetAccount(accountId).execute(); //or Observable<Account> accountObservable = new UserGetAccount(accountId).observe();Hystrix 的
RequestCache只會(huì)在內(nèi)部執(zhí)行run()方法一次,上面兩個(gè)線程在執(zhí)行HystrixCommand命令時(shí),會(huì)得到相同的結(jié)果,即使這兩個(gè)命令是兩個(gè)不同的實(shí)例。 -
數(shù)據(jù)獲取具有一致性
因?yàn)榫彺娴拇嬖?,除了第一次?qǐng)求需要真正訪問依賴服務(wù)以外,后續(xù)請(qǐng)求全部從緩存中獲取,可以保證在同一個(gè)用戶請(qǐng)求內(nèi),不會(huì)出現(xiàn)依賴服務(wù)返回不同的回應(yīng)的情況。
-
避免不必要的線程執(zhí)行
在
construct()或run()方法執(zhí)行之前,會(huì)先從請(qǐng)求緩存中獲取數(shù)據(jù),因此,Hystrix 能利用這個(gè)特性避免不必要的線程執(zhí)行,減小系統(tǒng)開銷。若 Hystrix 沒有實(shí)現(xiàn)請(qǐng)求緩存,那么
HystrixCommand和HystrixObservableCommand的實(shí)現(xiàn)者需要自己在construct()或run()方法中實(shí)現(xiàn)緩存,這種方式無法避免不必要的線程執(zhí)行開銷。
5.2 緩存的使用
要使用hystrix cache功能,第一個(gè)要求是重寫getCacheKey(),用來構(gòu)造cache key;第二個(gè)要求是構(gòu)建context,如果請(qǐng)求B要用到請(qǐng)求A的結(jié)果緩存,A和B必須同處一個(gè)context。通過HystrixRequestContext.initializeContext()和context.shutdown()可以構(gòu)建一個(gè)context,這兩條語句間的所有請(qǐng)求都處于同一個(gè)context。
代碼如下:
public class CommandHelloWorld3 extends HystrixCommand<String> {
private final int id;
protected CommandHelloWorld3(int id) {
super(HystrixCommandGroupKey.Factory.asKey("RequestCacheCommand"));
this.id = id;
}
@Override
protected String run() throws Exception {
System.out.println(Thread.currentThread().getName() + " execute id = " + id);
return "execute=" + id;
}
//重寫getCacheKey,實(shí)現(xiàn)區(qū)分不同請(qǐng)求的邏輯
@Override
protected String getCacheKey() {
System.out.println(" --- ");
return String.valueOf(id);
}
public static void main(String[] args) {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
CommandHelloWorld3 commandHelloWorld3_a = new CommandHelloWorld3(22);
CommandHelloWorld3 commandHelloWorld3_b = new CommandHelloWorld3(22);
System.out.println("a執(zhí)行結(jié)果:" + commandHelloWorld3_a.execute());
System.out.println("a執(zhí)行結(jié)果是否從緩存中獲?。? + commandHelloWorld3_a.isResponseFromCache);
System.out.println("b執(zhí)行結(jié)果:" + commandHelloWorld3_b.execute());
System.out.println("b執(zhí)行結(jié)果是否從緩存中獲取:" + commandHelloWorld3_b.isResponseFromCache);
} finally {
context.shutdown();
}
context = HystrixRequestContext.initializeContext();
try {
CommandHelloWorld3 commandHelloWorld3_c = new CommandHelloWorld3(22);
System.out.println("c執(zhí)行結(jié)果:" + commandHelloWorld3_c.execute());
System.out.println("c執(zhí)行結(jié)果是否從緩存中獲?。? + commandHelloWorld3_c.isResponseFromCache);
} finally {
context.shutdown();
}
}
}
執(zhí)行結(jié)果如下:
---
---
hystrix-RequestCacheCommand-1 execute id = 22
a執(zhí)行結(jié)果:execute=22
a執(zhí)行結(jié)果是否從緩存中獲取:false
---
---
b執(zhí)行結(jié)果:execute=22
b執(zhí)行結(jié)果是否從緩存中獲?。簍rue
---
---
hystrix-RequestCacheCommand-2 execute id = 22
c執(zhí)行結(jié)果:execute=22
c執(zhí)行結(jié)果是否從緩存中獲?。篺alse
6.合并請(qǐng)求collapsing
下圖展示了在兩種場景(未增加『請(qǐng)求合并器』和增加『請(qǐng)求合并器』)下,線程和網(wǎng)絡(luò)連接數(shù)量(假設(shè)所有請(qǐng)求在一個(gè)很小的時(shí)間窗口內(nèi),例如 10ms,是『并發(fā)』的):

為什么要使用請(qǐng)求合并?
例如,這里有一個(gè)包含 300 個(gè)視頻對(duì)象的列表,需要遍歷這個(gè)列表,并對(duì)每一個(gè)對(duì)象調(diào)用 getSomeAttribute() 方法,但如果簡單處理的話,可能會(huì)導(dǎo)致 300 次的網(wǎng)絡(luò)請(qǐng)求(假設(shè) getSomeAttribute() 方法內(nèi)需要發(fā)出網(wǎng)絡(luò)請(qǐng)求),每一個(gè)網(wǎng)絡(luò)請(qǐng)求可能都會(huì)花上幾毫秒(顯然,這種方式非常容易拖慢系統(tǒng))。
通過使用 HystrixCollapser,Hystrix 能自動(dòng)完成請(qǐng)求的合并,可以將300個(gè)網(wǎng)絡(luò)請(qǐng)求降低為只發(fā)送一次網(wǎng)絡(luò)請(qǐng)求,大大的減少了網(wǎng)絡(luò)的耗時(shí)。
Hystrix中的請(qǐng)求合并,就是利用一個(gè)合并處理器,將對(duì)同一個(gè)服務(wù)發(fā)起的連續(xù)請(qǐng)求合并成一個(gè)請(qǐng)求進(jìn)行處理(這些連續(xù)請(qǐng)求的時(shí)間窗默認(rèn)為10ms),可以有效節(jié)省網(wǎng)絡(luò)帶寬和線程池資源
請(qǐng)求合并帶來的額外開銷
請(qǐng)求合并會(huì)導(dǎo)致依賴服務(wù)的請(qǐng)求延遲增高(該延遲為等待請(qǐng)求的延遲),延遲的最大值為合并時(shí)間窗口大小。
若某個(gè)請(qǐng)求耗時(shí)的可能是 5ms,合并時(shí)間窗口為 10ms,但是現(xiàn)在必須再等10ms看看還有沒有其他的請(qǐng)求一起的,這樣一個(gè)請(qǐng)求的耗時(shí)就從5ms增加到15ms了。
請(qǐng)求合并帶來的額外開銷是否值得,取決于將要執(zhí)行的命令,高延遲的命令相比較而言不會(huì)有太大的影響,因?yàn)檫@個(gè)時(shí)候時(shí)間窗的時(shí)間消耗就顯得微不足道了
另外,如果一個(gè)命令具有高并發(fā)度,并且能批量處理多個(gè),甚至上百個(gè)的話,請(qǐng)求合并帶來的性能開銷會(huì)因?yàn)橥掏铝康臉O大提升而基本可以忽略,因?yàn)?Hystrix 會(huì)減少這些請(qǐng)求所需的線程和網(wǎng)絡(luò)連接數(shù)量。如果一個(gè)合并時(shí)間窗口內(nèi)只有 1~2 個(gè)請(qǐng)求,將請(qǐng)求合并顯然不是明智的選擇。
代碼如下:
package com.jimingqiang.study.hystrix.hystrixbegin.collapsing;
import com.netflix.hystrix.*;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class HystrixCollapsing extends HystrixCollapser<List<String>, String, Integer> {
private final Integer key;
public HystrixCollapsing(Integer key) {
this.key = key;
}
/**
* 請(qǐng)求參數(shù)
* 如果有多個(gè)參數(shù)需要被綁定,創(chuàng)建一個(gè)單獨(dú)的對(duì)象來包含它們,或者使用Tuple。
* @return
*/
@Override
public Integer getRequestArgument() {
return key;
}
/**
* 創(chuàng)建一個(gè)批量請(qǐng)求命令
* @param requests 保存了延遲時(shí)間窗中收集到的所有單個(gè)的請(qǐng)求的參數(shù) String是返回結(jié)果類型,Integer是請(qǐng)求參數(shù)類型
* @return
*/
@Override
protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) {
return new BatchCommand(requests); // 把批量請(qǐng)求傳給command類
}
//
/**
* 把批量請(qǐng)求的結(jié)果和對(duì)應(yīng)的請(qǐng)求一一對(duì)應(yīng)起來
* @param batchResponse 響應(yīng)的結(jié)果
* @param requests 請(qǐng)求參數(shù)
*/
@Override
protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {
int count = 0;
for (CollapsedRequest<String, Integer> request : requests) {
request.setResponse(batchResponse.get(count++));
}
}
// command類
private static final class BatchCommand extends HystrixCommand<List<String>> {
private final Collection<CollapsedRequest<String, Integer>> requests;
private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CollepsingGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("CollepsingKey")
));
this.requests = requests;
}
@Override
protected List<String> run() {
ArrayList<String> response = new ArrayList<String>();
// 處理每個(gè)請(qǐng)求,返回結(jié)果
for (CollapsedRequest<String, Integer> request : requests) {
// artificial response for each argument received in the batch
response.add("ValueForKey: " + request.getArgument() + " thread:" + Thread.currentThread().getName());
}
return response;
}
}
public static void main(String[] args) throws Exception {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
Future<String> f1 = new HystrixCollapsing(1).queue();
Future<String> f2 = new HystrixCollapsing(2).queue();
Future<String> f3 = new HystrixCollapsing(3).queue();
Future<String> f4 = new HystrixCollapsing(4).queue();
Future<String> f5 = new HystrixCollapsing(5).queue();
// f5和f6,如果sleep時(shí)間夠小則會(huì)合并,如果sleep時(shí)間夠大則不會(huì)合并,默認(rèn)10ms
TimeUnit.MILLISECONDS.sleep(1000);
Future<String> f6 = new HystrixCollapsing(6).queue();
System.out.println(f1.get());
System.out.println(f2.get());
System.out.println(f3.get());
System.out.println(f4.get());
System.out.println(f5.get());
System.out.println(f6.get());
// note:numExecuted表示共有幾個(gè)命令執(zhí)行,1個(gè)批量多命令請(qǐng)求算一個(gè),這個(gè)實(shí)際值可能比代碼寫的要多,因?yàn)閐ue to non-determinism of scheduler since this example uses the real timer
int numExecuted = HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size();
System.out.println("num executed: " + numExecuted);
int numLogs = 0;
for (HystrixInvokableInfo<?> command : HystrixRequestLog.getCurrentRequest().getAllExecutedCommands()) {
numLogs++;
System.err.println(command.getCommandKey().name() + " => command.getExecutionEvents(): " + command.getExecutionEvents());
}
System.out.println(numLogs==numExecuted);
} finally {
context.shutdown();
}
}
}
重要一點(diǎn),兩個(gè)請(qǐng)求能自動(dòng)合并的前提是兩者足夠“近”,即合并時(shí)間窗口,兩者啟動(dòng)執(zhí)行的間隔時(shí)長要足夠小,默認(rèn)為10ms,即超過10ms將不自動(dòng)合并。
-
我們連續(xù)發(fā)起多個(gè)queue請(qǐng)求,依次返回f1~f6共6個(gè)Future對(duì)象,根據(jù)打印結(jié)果可知f2~f5同處一個(gè)線程,說明這4個(gè)請(qǐng)求被合并了,而f6由另一個(gè)線程執(zhí)行,這是因?yàn)?em>f5和f6中間隔了一個(gè)sleep,超過了合并要求的最大間隔時(shí)長,f1為什么沒有合并到一起,我很疑惑
執(zhí)行結(jié)果如下:
ValueForKey: 1 thread:hystrix-CollepsingGroup-1 ValueForKey: 2 thread:hystrix-CollepsingGroup-2 ValueForKey: 3 thread:hystrix-CollepsingGroup-2 ValueForKey: 4 thread:hystrix-CollepsingGroup-2 ValueForKey: 5 thread:hystrix-CollepsingGroup-2 CollepsingKey => command.getExecutionEvents(): [SUCCESS, COLLAPSED] ValueForKey: 6 thread:hystrix-CollepsingGroup-3 CollepsingKey => command.getExecutionEvents(): [SUCCESS, COLLAPSED] num executed: 3 CollepsingKey => command.getExecutionEvents(): [SUCCESS, COLLAPSED] true
7. 配置策略
? 具體的配置可以查看官網(wǎng) 官網(wǎng)配置地址,下邊僅是簡單的整理
-
HystrixCommandProperties
/* --------------統(tǒng)計(jì)相關(guān)------------------*/ // 統(tǒng)計(jì)滾動(dòng)的時(shí)間窗口,默認(rèn):5000毫秒(取自circuitBreakerSleepWindowInMilliseconds) private final HystrixProperty metricsRollingStatisticalWindowInMilliseconds; // 統(tǒng)計(jì)窗口的Buckets的數(shù)量,默認(rèn):10個(gè),每秒一個(gè)Buckets統(tǒng)計(jì) private final HystrixProperty metricsRollingStatisticalWindowBuckets; // number of buckets in the statisticalWindow // 是否開啟監(jiān)控統(tǒng)計(jì)功能,默認(rèn):true private final HystrixProperty metricsRollingPercentileEnabled; /* --------------熔斷器相關(guān)------------------*/ // 熔斷器在整個(gè)統(tǒng)計(jì)時(shí)間內(nèi)是否開啟的閥值,默認(rèn)20。也就是在metricsRollingStatisticalWindowInMilliseconds(默認(rèn)10s)內(nèi)至少請(qǐng)求20次,熔斷器才發(fā)揮起作用 private final HystrixProperty circuitBreakerRequestVolumeThreshold; // 熔斷時(shí)間窗口,默認(rèn):5秒.熔斷器中斷請(qǐng)求5秒后會(huì)進(jìn)入半打開狀態(tài),放下一個(gè)請(qǐng)求進(jìn)來重試,如果該請(qǐng)求成功就關(guān)閉熔斷器,否則繼續(xù)等待一個(gè)熔斷時(shí)間窗口 private final HystrixProperty circuitBreakerSleepWindowInMilliseconds; //是否啟用熔斷器,默認(rèn)true. 啟動(dòng) private final HystrixProperty circuitBreakerEnabled; //默認(rèn):50%。當(dāng)出錯(cuò)率超過50%后熔斷器啟動(dòng) private final HystrixProperty circuitBreakerErrorThresholdPercentage; //是否強(qiáng)制開啟熔斷器阻斷所有請(qǐng)求,默認(rèn):false,不開啟。置為true時(shí),所有請(qǐng)求都將被拒絕,直接到fallback private final HystrixProperty circuitBreakerForceOpen; //是否允許熔斷器忽略錯(cuò)誤,默認(rèn)false, 不開啟 private final HystrixProperty circuitBreakerForceClosed; /* --------------信號(hào)量相關(guān)------------------*/ //使用信號(hào)量隔離時(shí),命令調(diào)用最大的并發(fā)數(shù),默認(rèn):10 private final HystrixProperty executionIsolationSemaphoreMaxConcurrentRequests; //使用信號(hào)量隔離時(shí),命令fallback(降級(jí))調(diào)用最大的并發(fā)數(shù),默認(rèn):10 private final HystrixProperty fallbackIsolationSemaphoreMaxConcurrentRequests; /* --------------其他------------------*/ //使用命令調(diào)用隔離方式,默認(rèn):采用線程隔離,ExecutionIsolationStrategy.THREAD private final HystrixProperty executionIsolationStrategy; //使用線程隔離時(shí),調(diào)用超時(shí)時(shí)間,默認(rèn):1秒 private final HystrixProperty executionIsolationThreadTimeoutInMilliseconds; //線程池的key,用于決定命令在哪個(gè)線程池執(zhí)行 private final HystrixProperty executionIsolationThreadPoolKeyOverride; //是否開啟fallback降級(jí)策略 默認(rèn):true private final HystrixProperty fallbackEnabled; // 使用線程隔離時(shí),是否對(duì)命令執(zhí)行超時(shí)的線程調(diào)用中斷(Thread.interrupt())操作.默認(rèn):true private final HystrixProperty executionIsolationThreadInterruptOnTimeout; // 是否開啟請(qǐng)求日志,默認(rèn):true private final HystrixProperty requestLogEnabled; //是否開啟請(qǐng)求緩存,默認(rèn):true private final HystrixProperty requestCacheEnabled; // Whether request caching is enabled.
-
HystrixCollapserProperties
//請(qǐng)求合并是允許的最大請(qǐng)求數(shù),默認(rèn): Integer.MAX_VALUE private final HystrixProperty maxRequestsInBatch; //批處理過程中每個(gè)命令延遲的時(shí)間,默認(rèn):10毫秒 private final HystrixProperty timerDelayInMilliseconds; //批處理過程中是否開啟請(qǐng)求緩存,默認(rèn):開啟 private final HystrixProperty requestCacheEnabled;
-
HystrixThreadPoolProperties
/* 配置線程池大小,默認(rèn)值10個(gè). 建議值:請(qǐng)求高峰時(shí)99.5%的平均響應(yīng)時(shí)間 + 向上預(yù)留一些即可 */ private final HystrixProperty corePoolSize; /* 配置線程值等待隊(duì)列長度,默認(rèn)值:-1 建議值:-1表示不等待直接拒絕,測試表明線程池使用直接決絕策略+ 合適大小的非回縮線程池效率最高.所以不建議修改此值。 當(dāng)使用非回縮線程池時(shí),queueSizeRejectionThreshold,keepAliveTimeMinutes 參數(shù)無效 */ private final HystrixProperty maxQueueSize;
參考: