配置
<properties>
<!-- Dependency versions -->
<!-- NOTE(review): FallbackCommand in this document configures its timeout with
     withExecutionIsolationThreadTimeoutInMilliseconds, while SingleServiceCommand uses
     withExecutionTimeoutEnabled / withExecutionTimeoutInMilliseconds. Confirm that the
     Hystrix version pinned below actually provides every setter the examples call. -->
<hystrix.version>1.3.16</hystrix.version>
<hystrix-metrics-event-stream.version>1.1.2</hystrix-metrics-event-stream.version>
</properties>
<dependencies>
<!-- Hystrix core: HystrixCommand, HystrixCommandGroupKey, HystrixCommandProperties -->
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>${hystrix.version}</version>
</dependency>
<!-- Hystrix metrics event stream artifact (versioned independently of hystrix-core here) -->
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-metrics-event-stream</artifactId>
<version>${hystrix-metrics-event-stream.version}</version>
</dependency>
</dependencies>
使用
超时降级
package com.fulton_shaw;
import com.fulton_shaw.common.util.concurrent.ThreadUtils;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
/**
 * Demo command that returns {@code "SUCCEED RUN"} from {@link #run()} after a random
 * sleep, and {@code "FALLBACK"} from {@link #getFallback()}.
 * <p>
 * The command is configured with a {@value #TIMEOUT_MILLIS} ms execution timeout.
 *
 * @author xiaohuadong
 * @date 2018/11/06
 */
public class FallbackCommand extends HystrixCommand<String> {
    /** Execution timeout handed to the Hystrix command properties, in milliseconds. */
    private static final int TIMEOUT_MILLIS = 500;

    /**
     * Creates a command registered under the given group key.
     *
     * @param name value used as the Hystrix command group key
     */
    public FallbackCommand(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(name))
                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter()
                                .withExecutionIsolationThreadTimeoutInMilliseconds(TIMEOUT_MILLIS)));
    }

    /**
     * @return the fixed marker string {@code "FALLBACK"}
     */
    @Override
    protected String getFallback() {
        return "FALLBACK";
    }

    /**
     * Simulated work followed by a success marker.
     *
     * @return the fixed marker string {@code "SUCCEED RUN"}
     * @throws Exception declared by the overridden method
     */
    @Override
    protected String run() throws Exception {
        // presumably sleeps a random duration between 300 and 700 (ms?) so that the
        // timeout is sometimes exceeded -- TODO confirm against ThreadUtils.sleepRandom
        ThreadUtils.sleepRandom(300, 700);
        return "SUCCEED RUN";
    }
}
package com.fulton_shaw;
import com.fulton_shaw.common.util.concurrent.ExpectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
/**
 * Entry point that repeatedly executes {@link FallbackCommand} and logs each result.
 *
 * @author xiaohuadong
 * @date 2018/11/06
 */
public class FallbackMain {
    private static final Logger LOG = LoggerFactory.getLogger(FallbackMain.class);

    /**
     * Runs the demo: each attempt builds a new command, executes it, logs the result and
     * reports whether the fallback value was returned.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // presumably keeps invoking the callable until it has returned true 20 times
        // -- TODO confirm against ExpectUtils.untilTrueCount
        ExpectUtils.untilTrueCount(20, new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                // a new command instance is created for every attempt
                FallbackCommand command = new FallbackCommand("will-you-fallback");
                String result = command.execute();
                LOG.info("{}", result);
                // constant-first equals stays null-safe if execute() returns null
                return "FALLBACK".equals(result);
            }
        });
        LOG.info("end");
        // explicit exit; NOTE(review): presumably so lingering worker threads cannot keep
        // the JVM alive -- confirm whether this is still required
        System.exit(0);
    }
}
基本设计
接口+Factory+intern构成一个Hystrix的全局注册中心
同时,将UnitTest放到类的后面,以便发布时,将测试包含在内。
大量依赖RxJava库,其中,Observable,Operator非常经典。
HystrixCommand运行时,是处于线程池中的。
服务场景
1.单个服务熔断:一个服务正常能够接受2个并发访问,测试4个并发访问,带有超时的场景
2.服务之间存在依赖的熔断:服务A正常接受5个并发访问,服务B接受3个并发访问,服务C接受2个并发访问,服务A依赖B,B依赖C。
单服务模式下Hystrix的使用测试
构造ThroughPutService代表一般的服务,该服务能够在一定时间内返回,同时,有一个支持的最大并发量上限
package com.fulton_shaw.common.lang.testing;
import com.fulton_shaw.common.lang.FixedValueGetter;
import com.fulton_shaw.common.lang.ValueGetter;
import com.fulton_shaw.common.util.concurrent.ThreadUtils;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Simulated service with a configurable service time and an upper bound on the number
 * of calls that may be in service at the same time.
 *
 * @author xiaohuadong
 * @date 2018/11/08
 */
@ThreadSafe
public class ThroughPutService implements Service {
    private static final Logger LOG = LoggerFactory.getLogger(ThroughPutService.class);
    /* Service time, unit: ms. When less than or equal to 0, no waiting is needed */
    private ValueGetter<Long> serviceTimeStrategy;
    /* Maximum concurrency. When less than or equal to 0, there is no limit */
    private final int maxConcurrent;
    /* Hooks invoked around a granted service call */
    private Runnable beforeService;
    private Runnable afterService;
    /* Invoked when the service call is rejected */
    private Runnable onDeny;
    /* Invoked when the service call raises an exception */
    private Function<Exception,Void> onException;
    /* Number of calls currently holding a service slot */
    private AtomicInteger currentConcurrent;

    /**
     * Invokes the service.
     * <p>
     * If a slot is available the call runs {@code beforeService}, sleeps for the time
     * supplied by {@code serviceTimeStrategy}, releases the slot and then runs
     * {@code afterService}; otherwise {@code onDeny} is run. Any exception raised on the
     * way is passed to {@code onException} instead of being propagated.
     *
     * @return whether the service was granted a slot ({@code false} means rejected)
     */
    public boolean tryService() {
        LOG.debug("{} --> beginning tryService,current concurrent = {}",ThreadUtils.getCurrentThreadName(),currentConcurrent.get());
        boolean canService = acquireSlot();
        try {
            if (canService) {
                LOG.debug("{} --> gain tryService,current concurrent = {}",ThreadUtils.getCurrentThreadName(),currentConcurrent.get());
                try {
                    beforeService.run();
                    Long sleep = serviceTimeStrategy.get();
                    LOG.debug("{} --> will sleep {} ms",ThreadUtils.getCurrentThreadName(),sleep);
                    ThreadUtils.sleep(sleep);
                } finally {
                    // Always give the slot back: without this, an exception thrown by
                    // the hook, the strategy or the sleep would leak the slot forever
                    // and eventually make the service reject every caller.
                    currentConcurrent.decrementAndGet();
                }
                // runs after the slot has been released, as before
                afterService.run();
            } else {
                onDeny.run();
            }
        } catch (Exception e) {
            try {
                onException.apply(e);
            } catch (Exception ae) {
                // the handler itself failed; log both and carry on
                LOG.error("error happened while handing {}",e.toString(),ae);
            }
        }
        LOG.debug("{} --> end tryService,current concurrent = {}",ThreadUtils.getCurrentThreadName(),currentConcurrent.get());
        return canService;
    }

    /**
     * Tries to take one concurrency slot.
     *
     * @return {@code true} if the counter was incremented for this caller,
     *         {@code false} if the counter had already reached {@code maxConcurrent}
     */
    private boolean acquireSlot() {
        if (maxConcurrent <= 0) {
            // unlimited: still count the call so the logged counter stays meaningful
            currentConcurrent.incrementAndGet();
            return true;
        }
        // compare-and-set retry loop: re-read the counter whenever another thread
        // changed it between our read and our update
        while (true) {
            int old = currentConcurrent.get();
            if (old >= maxConcurrent) {
                return false;
            }
            if (currentConcurrent.compareAndSet(old, old + 1)) {
                return true;
            }
        }
    }
}
ThroughPutService类的tryService方法尝试调用一个服务,如果当前并发量尚未达到上限,则提供服务,否则,调用onDeny。若有错误发生,则调用onException。
下面是对该Service的一个测试
/**
 * Small driver that calls one shared {@link ThroughPutService} from several threads.
 */
public class ThroughPutServiceMain {
    /**
     * Builds a service from a {@code RandomLongGetter(300, 500)} and the limit {@code 2},
     * then lets 5 threads call {@code tryService()} on it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // per the accompanying text: service time is random in 300~500 ms, max concurrency is 2
        final ThroughPutService service = ThroughPutService.makeService(new RandomLongGetter(300, 500), 2);
        ThreadUtils.startThreadForTimesAndWait(5, new ProcessMethod<Integer>() {
            // no @Nullable here: the method returns void, so there is no value to qualify
            @Override
            public void apply(@Nullable Integer input) {
                service.tryService();
            }
        });
    }
}
service的服务时间在300~500毫秒之间,随机返回。最大允许的并发量是2。测试程序开启了5个线程来消费这个服务,下面的结果显示,只有其中两个线程得到服务,其他均被拒绝。
得到服务的是Thread-2和Thread-3(日志中输出了 gain tryService)。
输出结果:
15:39:46.789 [Thread-2] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-2 --> beginning tryService,current concurrent = 0
15:39:46.789 [Thread-4] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-4 --> beginning tryService,current concurrent = 0
15:39:46.789 [Thread-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-3 --> beginning tryService,current concurrent = 0
15:39:46.789 [Thread-5] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-5 --> beginning tryService,current concurrent = 0
15:39:46.789 [Thread-1] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-1 --> beginning tryService,current concurrent = 0
15:39:46.791 [Thread-5] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-5 --> end tryService,current concurrent = 2
15:39:46.791 [Thread-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-3 --> gain tryService,current concurrent = 2
15:39:46.791 [Thread-2] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-2 --> gain tryService,current concurrent = 2
15:39:46.791 [Thread-1] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-1 --> end tryService,current concurrent = 2
15:39:46.791 [Thread-4] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-4 --> end tryService,current concurrent = 2
15:39:46.791 [Thread-2] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-2 --> will sleep 415 ms
15:39:46.791 [Thread-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-3 --> will sleep 461 ms
15:39:47.206 [Thread-2] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-2 --> end tryService,current concurrent = 1
15:39:47.252 [Thread-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-3 --> end tryService,current concurrent = 0
ThroughPutService不能服务的情况有一种:就是达到最大并发量。
但是对于客户端而言,ThroughPutService不能服务还包括一种情况:ThroughPutService本身的服务质量--响应时间超时。
HystrixCommand封装ThroughPutService
我们首先识别出ThroughPutService服务不可用的情况,就是:1.超过服务的最大并发量 2.服务本身超时
在下面的Command的封装中,run会调用service,但是service可能返回false,因此,在返回false的情况下,我们应当抛出异常,表明服务不可用,迫使Hystrix转向getFallback()调用。
此外,即使service正常可用,但是服务时间超时,Hystrix本身会对run进行超时判断,因此,它也会转向getFallback()
package com.fulton_shaw.third_party_demo.hystrix;
/**
 * Hystrix command wrapping a single {@link ThroughPutService} call.
 * <p>
 * {@link #run()} fails with an exception when the service rejects the call, so that
 * Hystrix proceeds to {@link #getFallback()}; the command also has an execution timeout.
 */
public class SingleServiceCommand extends HystrixCommand<Void> {
    private static final Logger LOG = LoggerFactory.getLogger(SingleServiceCommand.class);
    /** The service this command calls; assigned once in the constructor. */
    private final ThroughPutService service;

    /**
     * @param service the service to call from {@link #run()}
     * @param time    execution timeout in milliseconds; must fit in an {@code int}
     * @throws IllegalArgumentException if {@code time} is outside the {@code int} range
     */
    protected SingleServiceCommand(ThroughPutService service,long time) {
        super(HystrixCommand.Setter
                .withGroupKey(HystrixCommandGroupKey.Factory.asKey(SingleServiceCommand.class.getSimpleName()))
                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter()
                                .withExecutionTimeoutEnabled(true)
                                .withExecutionTimeoutInMilliseconds(toIntMillis(time)))
        );
        this.service = service;
    }

    /**
     * Narrows a timeout to {@code int} without silently wrapping: a plain cast would
     * turn an oversized value into an unrelated (possibly negative) timeout.
     */
    private static int toIntMillis(long time) {
        if (time < Integer.MIN_VALUE || time > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("timeout does not fit in an int: " + time);
        }
        return (int) time;
    }

    /**
     * Calls the service once.
     *
     * @return always {@code null}
     * @throws RuntimeException if {@code tryService()} returned {@code false}
     */
    @Override
    protected Void run() throws Exception {
        boolean succeed = service.tryService();
        LOG.info("{} --> tryService succeed? {} ",ThreadUtils.getCurrentThreadName(),succeed);
        if (!succeed) {
            // a rejected call is reported as a failure so that the fallback is used
            throw new RuntimeException(String.format("%s --> try service rejected", ThreadUtils.getCurrentThreadName()));
        }
        return null;
    }

    /**
     * Logs the name of the thread running the fallback.
     *
     * @return always {@code null}
     */
    @Override
    protected Void getFallback() {
        LOG.info("{} --> getting Fallback",Thread.currentThread().getName());
        return null;
    }
}
测试封装的HystrixCommand
package com.fulton_shaw.third_party_demo.hystrix;
/**
 * Demo that executes several {@link SingleServiceCommand}s against one shared
 * {@link ThroughPutService} and reports how many calls the service rejected.
 */
public class SingleServiceDemo {
    private static final Logger LOG = LoggerFactory.getLogger(SingleServiceDemo.class);

    /**
     * Builds the service and the commands, executes every command on its own thread,
     * then logs the number of rejected calls.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // per the accompanying text: service time is random in 1000-3000 ms, max concurrency is 2
        final ThroughPutService service = ThroughPutService.makeService(new RandomLongGetter(1000, 3000), 2);
        // counts the calls that were rejected by the service
        final AtomicInteger denied = new AtomicInteger(0);
        service.setOnDeny(new Runnable() {
            @Override
            public void run() {
                denied.incrementAndGet();
            }
        });
        int timeout = 2000;
        int commandCount = 4;
        final SingleServiceCommand[] commands = new SingleServiceCommand[commandCount];
        for (int i = 0; i < commandCount; i++) {
            commands[i] = new SingleServiceCommand(service,timeout);
        }
        // One thread per command: use commandCount rather than repeating the literal, so
        // the thread count cannot drift away from the array length.
        // NOTE(review): assumes the helper passes indices 0..commandCount-1 -- confirm.
        ThreadUtils.startThreadForTimesAndWait(commandCount, new ProcessMethod<Integer>() {
            // no @Nullable here: the method returns void, so there is no value to qualify
            @Override
            public void apply(@Nullable Integer i) {
                commands[i].execute();
            }
        });
        LOG.info("concurrent denied = {}", denied.get());
    }
}
在上面的程序中,我们新建了一个服务,随机返回时间是1000-3000ms之间,我们将服务的超时时间设置为2000ms,然后使用denied变量记录服务被拒绝的数量。然后,我们新建了4个对该服务的调用实例,由Hystrix管理。
从下面的输出结果中,我们可以看出4和3获得服务执行,其中4超时,3没有超时,1和2均被拒绝服务。
由于1,2被拒绝服务,因此它们抛出异常之后,Hystrix转向了Fallback,我们可以看到的是,抛出异常之后,getFallback仍然处于同一线程之中。
但是4转向Fallback之后,却是由HystrixTimer-1来调用的,而原来的线程仍然继续运行,我们可以看到4最后也输出了 end tryService。
输出结果:
16:22:04.243 [hystrix-SingleServiceCommand-4] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-4 --> beginning tryService,current concurrent = 0
16:22:04.244 [hystrix-SingleServiceCommand-4] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-4 --> gain tryService,current concurrent = 1
16:22:04.243 [hystrix-SingleServiceCommand-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-3 --> beginning tryService,current concurrent = 0
16:22:04.244 [hystrix-SingleServiceCommand-4] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-4 --> will sleep 2665 ms
16:22:04.244 [hystrix-SingleServiceCommand-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-3 --> gain tryService,current concurrent = 2
16:22:04.244 [hystrix-SingleServiceCommand-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-3 --> will sleep 1284 ms
16:22:04.244 [hystrix-SingleServiceCommand-1] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-1 --> beginning tryService,current concurrent = 2
16:22:04.244 [hystrix-SingleServiceCommand-1] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-1 --> end tryService,current concurrent = 2
16:22:04.244 [hystrix-SingleServiceCommand-1] INFO com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - hystrix-SingleServiceCommand-1 --> tryService succeed? false
16:22:04.244 [hystrix-SingleServiceCommand-2] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-2 --> beginning tryService,current concurrent = 2
16:22:04.244 [hystrix-SingleServiceCommand-2] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-2 --> end tryService,current concurrent = 2
16:22:04.244 [hystrix-SingleServiceCommand-2] INFO com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - hystrix-SingleServiceCommand-2 --> tryService succeed? false
16:22:04.245 [hystrix-SingleServiceCommand-1] DEBUG com.netflix.hystrix.AbstractCommand - Error executing HystrixCommand.run(). Proceeding to fallback logic ...
java.lang.RuntimeException: hystrix-SingleServiceCommand-1 --> try service rejected
at com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand.run(SingleServiceCommand.java:34) ~[classes/:?]
...
16:22:04.245 [hystrix-SingleServiceCommand-2] DEBUG com.netflix.hystrix.AbstractCommand - Error executing HystrixCommand.run(). Proceeding to fallback logic ...
java.lang.RuntimeException: hystrix-SingleServiceCommand-2 --> try service rejected
at com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand.run(SingleServiceCommand.java:34) ~[classes/:?]
...
16:22:04.254 [hystrix-SingleServiceCommand-2] INFO com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - hystrix-SingleServiceCommand-2 --> getting Fallback
16:22:04.257 [hystrix-SingleServiceCommand-1] INFO com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - hystrix-SingleServiceCommand-1 --> getting Fallback
16:22:05.528 [hystrix-SingleServiceCommand-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-3 --> end tryService,current concurrent = 1
16:22:05.528 [hystrix-SingleServiceCommand-3] INFO com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - hystrix-SingleServiceCommand-3 --> tryService succeed? true
16:22:06.231 [hystrix-SingleServiceCommand-4] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-4 --> end tryService,current concurrent = 0
16:22:06.231 [hystrix-SingleServiceCommand-4] INFO com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - hystrix-SingleServiceCommand-4 --> tryService succeed? true
16:22:06.232 [HystrixTimer-1] INFO com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - HystrixTimer-1 --> getting Fallback
16:22:06.233 [main] INFO com.fulton_shaw.third_party_demo.hystrix.SingleServiceDemo - concurrent denied = 2
多服务和服务存在依赖的场景测试
2.服务之间存在依赖的熔断:服务A正常接受5个并发访问,服务B接受3个并发访问,服务C接受2个并发访问,服务A依赖B,B依赖C。
只需要将服务
參考
http://www.itdecent.cn/p/fc19f6ed6d0d
http://www.itdecent.cn/p/05f3e75b3739
Hystrix中的设计哲学
Hystrix使用总结
execute方法会开启新的线程执行,当前线程阻塞等待那个线程。
所谓信号量隔离,是指当服务并发量大于信号量设定的值时,自动降级
熔断是指降级的终极情况,
- 假设线路内的容量(请求QPS)达到一定阈值(通过 HystrixCommandProperties.circuitBreakerRequestVolumeThreshold() 配置)
- 同时,假设线路内的错误率达到一定阈值(通过 HystrixCommandProperties.circuitBreakerErrorThresholdPercentage() 配置)
- 熔断器将从『闭路』转换成『开路』
- 若此时是『开路』状态,熔断器将短路后续所有经过该熔断器的请求,这些请求直接走『失败回退逻辑』
- 经过一定时间(即『休眠窗口』,通过 HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds() 配置),后续第一个请求将会被允许通过熔断器(此时熔断器处于『半开』状态),若该请求失败,熔断器将又进入『开路』状态,且在休眠窗口内保持此状态;若该请求成功,熔断器将进入『闭路』状态,回到逻辑1循环往复。
其他特性
缓存
组合请求
Dashboard监控