Sentinel基于并發(fā)線程數(shù)流控驗(yàn)證

一、Sentinel基于并發(fā)線程數(shù)流控

采用基于線程數(shù)的限流模式后,我們不需要再顯式地去進(jìn)行線程池隔離,Sentinel 會控制訪問該資源的線程數(shù),超出的請求直接拒絕,直到堆積的線程處理完成。相當(dāng)于是以資源為維度, 隔離出了每一種資源對應(yīng)的不同線程數(shù)。
例如,當(dāng)應(yīng)用所依賴的下游應(yīng)用由于某種原因?qū)е路?wù)不穩(wěn)定、響應(yīng)延遲增加,對于調(diào)用者來說,意味著吞吐量下降和更多的線程數(shù)占用,極端情況下甚至導(dǎo)致線程池耗盡。為應(yīng)對太多線程占用的情況,業(yè)內(nèi)有使用隔離的方案,比如通過不同業(yè)務(wù)邏輯使用不同線程池來隔離業(yè)務(wù)自身之間的資源爭搶(線程池隔離)。這種隔離方案雖然隔離性比較好,但是代價(jià)就是線程數(shù)目太多,線程上下文切換的 overhead 比較大,特別是對低延時(shí)的調(diào)用有比較大的影響。Sentinel 并發(fā)線程數(shù)限流不負(fù)責(zé)創(chuàng)建和管理線程池,而是簡單統(tǒng)計(jì)當(dāng)前請求上下文的線程數(shù)目,如果超出閾值,新的請求會被立即拒絕,效果類似于信號量隔離。

二、代碼以及事例驗(yàn)證過程

1、初始化限流規(guī)則, 設(shè)置訪問該資源的最大的線程數(shù)為20, 拒絕策略使用RuleConstant.FLOW_GRADE_THREAD, 基于訪問訪問資源的線程數(shù)限流.


private static void initFlowRule() {
    List rules =new ArrayList();
    FlowRule rule1 =new FlowRule();
    rule1.setResource("methodA");
    // set limit concurrent thread for 'methodA' to 20
    rule1.setCount(20);
    rule1.setGrade(RuleConstant.FLOW_GRADE_THREAD);
    rule1.setLimitApp("default");
    rules.add(rule1);
    FlowRuleManager.loadRules(rules);
}

2、同時(shí)啟動100個(gè)線程, 訪問資源, 剛開始時(shí)模擬methodB的業(yè)務(wù)耗時(shí)1s, 當(dāng)運(yùn)行60s后, methodB的業(yè)務(wù)耗時(shí)降低為20ms

public static void main(String[] args) throws Exception {
        System.out.println(
                "MethodA will call methodB. After running for a while, methodB becomes fast, "
                        + "which make methodA also become fast ");
        //設(shè)定時(shí)間,60s后將methodB的業(yè)務(wù)耗時(shí)修改為20ms
        tick();
        //設(shè)置線程數(shù)限流規(guī)則
        initFlowRule();
        //同時(shí)啟動100個(gè)線程, 訪問資源, 剛開始時(shí)模擬methodB的業(yè)務(wù)耗時(shí)1s, 當(dāng)運(yùn)行60s后, methodB的業(yè)務(wù)耗時(shí)降低為20ms
        for (int i = 0; i < threadCount; i++) {
            Thread entryThread = new Thread(() -> {
                while (true) {
                    Entry methodA = null;
                    try {
                        TimeUnit.MILLISECONDS.sleep(5);
                        methodA = SphU.entry("methodA");
                        activeThread.incrementAndGet();
                        Entry methodB = SphU.entry("methodB");
                        //模擬業(yè)務(wù)耗時(shí)
                        TimeUnit.MILLISECONDS.sleep(methodBRunningTime);
                        methodB.exit();
                        pass.addAndGet(1);
                    } catch (BlockException e1) {
                        block.incrementAndGet();
                    } catch (Exception e2) {
                        // biz exception
                    } finally {
                        total.incrementAndGet();
                        if (methodA != null) {
                            methodA.exit();
                            activeThread.decrementAndGet();
                        }
                    }
                }
            });
            entryThread.setName("working thread");
            entryThread.start();
        }
    }

methodA = SphU.entry("methodA");對a進(jìn)行了限流,而methodB只是單純模擬業(yè)務(wù)耗時(shí)。
activeThread.incrementAndGet();是AtomicInteger的原子操作先加1再獲取值。
pass.addAndGet(1);是通過的條數(shù)
block.incrementAndGet();是失敗的條數(shù)
tick()方法開啟了另一個(gè)工作線程是打印日志信息以及在60s后講methodB的業(yè)務(wù)耗時(shí)將為20ms

static class TimerTask implements Runnable {

        @Override
        public void run() {
            long start = System.currentTimeMillis();
            System.out.println("begin to statistic!!!");

            long oldTotal = 0;
            long oldPass = 0;
            long oldBlock = 0;

            while (!stop) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                }
                long globalTotal = total.get();
                long oneSecondTotal = globalTotal - oldTotal;
                oldTotal = globalTotal;

                long globalPass = pass.get();
                long oneSecondPass = globalPass - oldPass;
                oldPass = globalPass;

                long globalBlock = block.get();
                long oneSecondBlock = globalBlock - oldBlock;
                oldBlock = globalBlock;

                System.out.println(seconds + " total qps is: " + oneSecondTotal);
                System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
                        + ", pass:" + oneSecondPass
                        + ", block:" + oneSecondBlock
                        + " activeThread:" + activeThread.get());
                if (seconds-- <= 0) {
                    stop = true;
                }
                if (seconds == 40) {
                    System.out.println("method B is running much faster; more requests are allowed to pass");
                    methodBRunningTime = 20;
                }
            }

            long cost = System.currentTimeMillis() - start;
            System.out.println("time cost: " + cost + " ms");
            System.out.println("total:" + total.get() + ", pass:" + pass.get()
                    + ", block:" + block.get());
            System.exit(0);
        }
    }
image.png

活躍線程數(shù)就是最初限流的20,pass數(shù)從20激增到800左右展示了methodB業(yè)務(wù)耗時(shí)20s之后通過訪問資源的請求qps應(yīng)該是顯著增加的,可以看到, 在methodB業(yè)務(wù)耗時(shí)降低的前后, 存活的線程數(shù)activeThread大概都是20個(gè)左右, 在同樣是20個(gè)線程的處理請求時(shí), 很明顯能看到在methodB業(yè)務(wù)耗時(shí)降低前后, pass由原來的20個(gè)左右, 上升到900左右.

image.png
image.png

總結(jié):

Sentinel基于并發(fā)線程數(shù)的流量控制, 能以資源為維度, 為不同的資源, 配置不用的線程數(shù), 控制訪問每個(gè)資源的線程數(shù), 有點(diǎn)類似于信號量Semaphore方式, 信號量的資源數(shù)即為并發(fā)線程數(shù), 比如當(dāng)有請求需要訪問被Sentinel保護(hù)的資源時(shí), 會首先去獲取信號量Semaphore中的資源, 只有成功從semaphore.acquire();獲取資源, 才能訪問應(yīng)用資源.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容