一、Sentinel基于并發(fā)線程數(shù)流控
采用基于線程數(shù)的限流模式后,我們不需要再顯式地去進(jìn)行線程池隔離,Sentinel 會控制訪問該資源的線程數(shù),超出的請求直接拒絕,直到堆積的線程處理完成。相當(dāng)于是以資源為維度, 隔離出了每一種資源對應(yīng)的不同線程數(shù)。
例如,當(dāng)應(yīng)用所依賴的下游應(yīng)用由于某種原因?qū)е路?wù)不穩(wěn)定、響應(yīng)延遲增加,對于調(diào)用者來說,意味著吞吐量下降和更多的線程數(shù)占用,極端情況下甚至導(dǎo)致線程池耗盡。為應(yīng)對太多線程占用的情況,業(yè)內(nèi)有使用隔離的方案,比如通過不同業(yè)務(wù)邏輯使用不同線程池來隔離業(yè)務(wù)自身之間的資源爭搶(線程池隔離)。這種隔離方案雖然隔離性比較好,但是代價(jià)就是線程數(shù)目太多,線程上下文切換的 overhead 比較大,特別是對低延時(shí)的調(diào)用有比較大的影響。Sentinel 并發(fā)線程數(shù)限流不負(fù)責(zé)創(chuàng)建和管理線程池,而是簡單統(tǒng)計(jì)當(dāng)前請求上下文的線程數(shù)目,如果超出閾值,新的請求會被立即拒絕,效果類似于信號量隔離。
二、代碼以及事例驗(yàn)證過程
1、初始化限流規(guī)則, 設(shè)置訪問該資源的最大的線程數(shù)為20, 拒絕策略使用RuleConstant.FLOW_GRADE_THREAD, 基于訪問訪問資源的線程數(shù)限流.
private static void initFlowRule() {
List rules =new ArrayList();
FlowRule rule1 =new FlowRule();
rule1.setResource("methodA");
// set limit concurrent thread for 'methodA' to 20
rule1.setCount(20);
rule1.setGrade(RuleConstant.FLOW_GRADE_THREAD);
rule1.setLimitApp("default");
rules.add(rule1);
FlowRuleManager.loadRules(rules);
}
2、同時(shí)啟動100個(gè)線程, 訪問資源, 剛開始時(shí)模擬methodB的業(yè)務(wù)耗時(shí)1s, 當(dāng)運(yùn)行60s后, methodB的業(yè)務(wù)耗時(shí)降低為20ms
public static void main(String[] args) throws Exception {
System.out.println(
"MethodA will call methodB. After running for a while, methodB becomes fast, "
+ "which make methodA also become fast ");
//設(shè)定時(shí)間,60s后將methodB的業(yè)務(wù)耗時(shí)修改為20ms
tick();
//設(shè)置線程數(shù)限流規(guī)則
initFlowRule();
//同時(shí)啟動100個(gè)線程, 訪問資源, 剛開始時(shí)模擬methodB的業(yè)務(wù)耗時(shí)1s, 當(dāng)運(yùn)行60s后, methodB的業(yè)務(wù)耗時(shí)降低為20ms
for (int i = 0; i < threadCount; i++) {
Thread entryThread = new Thread(() -> {
while (true) {
Entry methodA = null;
try {
TimeUnit.MILLISECONDS.sleep(5);
methodA = SphU.entry("methodA");
activeThread.incrementAndGet();
Entry methodB = SphU.entry("methodB");
//模擬業(yè)務(wù)耗時(shí)
TimeUnit.MILLISECONDS.sleep(methodBRunningTime);
methodB.exit();
pass.addAndGet(1);
} catch (BlockException e1) {
block.incrementAndGet();
} catch (Exception e2) {
// biz exception
} finally {
total.incrementAndGet();
if (methodA != null) {
methodA.exit();
activeThread.decrementAndGet();
}
}
}
});
entryThread.setName("working thread");
entryThread.start();
}
}
methodA = SphU.entry("methodA");對a進(jìn)行了限流,而methodB只是單純模擬業(yè)務(wù)耗時(shí)。
activeThread.incrementAndGet();是AtomicInteger的原子操作先加1再獲取值。
pass.addAndGet(1);是通過的條數(shù)
block.incrementAndGet();是失敗的條數(shù)
tick()方法開啟了另一個(gè)工作線程是打印日志信息以及在60s后講methodB的業(yè)務(wù)耗時(shí)將為20ms
static class TimerTask implements Runnable {
@Override
public void run() {
long start = System.currentTimeMillis();
System.out.println("begin to statistic!!!");
long oldTotal = 0;
long oldPass = 0;
long oldBlock = 0;
while (!stop) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
long globalTotal = total.get();
long oneSecondTotal = globalTotal - oldTotal;
oldTotal = globalTotal;
long globalPass = pass.get();
long oneSecondPass = globalPass - oldPass;
oldPass = globalPass;
long globalBlock = block.get();
long oneSecondBlock = globalBlock - oldBlock;
oldBlock = globalBlock;
System.out.println(seconds + " total qps is: " + oneSecondTotal);
System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
+ ", pass:" + oneSecondPass
+ ", block:" + oneSecondBlock
+ " activeThread:" + activeThread.get());
if (seconds-- <= 0) {
stop = true;
}
if (seconds == 40) {
System.out.println("method B is running much faster; more requests are allowed to pass");
methodBRunningTime = 20;
}
}
long cost = System.currentTimeMillis() - start;
System.out.println("time cost: " + cost + " ms");
System.out.println("total:" + total.get() + ", pass:" + pass.get()
+ ", block:" + block.get());
System.exit(0);
}
}

活躍線程數(shù)就是最初限流的20,pass數(shù)從20激增到800左右展示了methodB業(yè)務(wù)耗時(shí)20s之后通過訪問資源的請求qps應(yīng)該是顯著增加的,可以看到, 在methodB業(yè)務(wù)耗時(shí)降低的前后, 存活的線程數(shù)activeThread大概都是20個(gè)左右, 在同樣是20個(gè)線程的處理請求時(shí), 很明顯能看到在methodB業(yè)務(wù)耗時(shí)降低前后, pass由原來的20個(gè)左右, 上升到900左右.


總結(jié):
Sentinel基于并發(fā)線程數(shù)的流量控制, 能以資源為維度, 為不同的資源, 配置不用的線程數(shù), 控制訪問每個(gè)資源的線程數(shù), 有點(diǎn)類似于信號量Semaphore方式, 信號量的資源數(shù)即為并發(fā)線程數(shù), 比如當(dāng)有請求需要訪問被Sentinel保護(hù)的資源時(shí), 會首先去獲取信號量Semaphore中的資源, 只有成功從semaphore.acquire();獲取資源, 才能訪問應(yīng)用資源.