A Pitfall Story: One Java Thread Pool Parameter Nearly Caused a Production Incident

Original article:
https://mp.weixin.qq.com/s/ZR6Ikt9Srw55xppjchPiYg

I. Introduction

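Recently, while refactoring and tuning the thread pool of a Dubbo service, we switched the worker threads to the CachedThreadPool strategy. After the release, however, the number of threads in the pool kept climbing and nearly caused a production incident.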


This article explains what was going on inside the thread pool.

II. Dubbo thread pools

The source code of CachedThreadPool in Dubbo:

package org.apache.dubbo.common.threadpool.support.cached;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
import org.apache.dubbo.common.threadpool.ThreadPool;
import org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static org.apache.dubbo.common.constants.CommonConstants.ALIVE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.CORE_THREADS_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_ALIVE;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CORE_THREADS;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_QUEUES;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_THREAD_NAME;
import static org.apache.dubbo.common.constants.CommonConstants.QUEUES_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY;

/**
 * This thread pool is self-tuned. Thread will be recycled after idle for one minute, and new thread will be created for
 * the upcoming request.
 *
 * @see java.util.concurrent.Executors#newCachedThreadPool()
 */
public class CachedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        // 1. Thread name prefix; defaults to "Dubbo" if not set
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        // 2. Core pool size
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        // 3. Maximum pool size; defaults to Integer.MAX_VALUE
        int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
        // 4. Queue size
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        // 5. Idle time after which a thread is reclaimed, in milliseconds
        int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
        // 6. Build the pool with ThreadPoolExecutor from java.util.concurrent (JUC)
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

As you can see, Dubbo essentially creates the pool with ThreadPoolExecutor from the JUC (java.util.concurrent) package. Its constructor looks like this:

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

The overall flow is roughly as follows:


1. While the pool has fewer than corePoolSize threads, each new task creates a new thread, even if some existing threads are idle.

2. Once the pool has reached corePoolSize, newly submitted tasks are put into workQueue and wait to be picked up by a worker.

3. When workQueue is full and the pool has fewer than maximumPoolSize threads (which requires maximumPoolSize > corePoolSize), a new thread is created to run the task.

4. When workQueue is full and the pool already has maximumPoolSize threads, newly submitted tasks are handed to the RejectedExecutionHandler.

5. When the pool has more than corePoolSize threads, any thread that stays idle for keepAliveTime is terminated.

In addition, if allowCoreThreadTimeOut(true) is set, core threads that stay idle for keepAliveTime are terminated as well.
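The rules above can be observed directly with a tiny pool. The following is a minimal sketch using only the JDK; the class name PoolFlowDemo and the sizes (core 1, max 2, queue capacity 1) are arbitrary choices for illustration. Each task blocks on a latch so the pool cannot drain while we watch it:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolFlowDemo {

    /** Submits four tasks that block until released, recording how the pool handled each one. */
    public static List<String> simulate() {
        CountDownLatch release = new CountDownLatch(1);
        // corePoolSize = 1, maximumPoolSize = 2, bounded queue with room for one task
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1));
        List<String> log = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // keep this worker busy until the latch opens
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                log.add("task" + i + ": poolSize=" + pool.getPoolSize()
                        + ", queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                // The default handler is AbortPolicy, which throws
                log.add("task" + i + ": rejected");
            }
        }
        release.countDown(); // let all running and queued tasks finish
        pool.shutdown();
        return log;
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

Under these assumptions the four lines read `task1: poolSize=1, queued=0`, `task2: poolSize=1, queued=1`, `task3: poolSize=2, queued=1` and `task4: rejected`: the second task waits in the queue instead of getting a thread, and only once the queue is full does the pool grow beyond corePoolSize.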

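The JDK ships four built-in RejectedExecutionHandler policies (nested classes of ThreadPoolExecutor):

1. AbortPolicy: throws a RejectedExecutionException, so the submitter learns immediately that the task was not accepted. This is the default handler of ThreadPoolExecutor.

2. CallerRunsPolicy: runs the rejected task itself in the thread that called execute(), unless the executor has been shut down, in which case the task is discarded.

3. DiscardOldestPolicy: discards the oldest task in the queue, that is, the one added first and next in line to run, and then retries submitting the new task.

4. DiscardPolicy: silently drops the task that cannot be accepted, without any notification. Use it only where the business can tolerate losing tasks.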
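A quick way to confirm what CallerRunsPolicy does is to saturate a one-thread pool and record which thread runs the rejected task. This is a hypothetical sketch (the class and method names are made up) that relies only on standard ThreadPoolExecutor behaviour:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallerRunsDemo {

    /** Returns the name of the thread that executed the task the pool could not accept. */
    public static String runnerOfRejectedTask() {
        CountDownLatch release = new CountDownLatch(1);
        // One thread, no queue capacity, CallerRunsPolicy as the rejection handler
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());

        // Task 1 occupies the only worker thread until the latch is released.
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Task 2 can neither be queued nor get a new thread, so it is rejected and
        // CallerRunsPolicy runs it synchronously inside execute() on the calling thread.
        AtomicReference<String> runner = new AtomicReference<>();
        pool.execute(() -> runner.set(Thread.currentThread().getName()));

        release.countDown();
        pool.shutdown();
        return runner.get();
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName()
                + ", rejected task ran on: " + runnerOfRejectedTask());
    }
}
```

Run from `main`, this reports that the rejected task ran on `main`. That is also why CallerRunsPolicy acts as a form of back-pressure: while the submitting thread is busy running the task, it cannot submit more.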

Note that Dubbo's rejection policy, AbortPolicyWithReport, actually extends ThreadPoolExecutor.AbortPolicy; it mainly adds output of key pool information and thread stack dumps before rejecting the task.
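To illustrate the idea behind such a policy, here is a hypothetical subclass of ThreadPoolExecutor.AbortPolicy that records a summary of the pool state and then delegates to the parent, which throws. It is not Dubbo's implementation: the class name ReportingAbortPolicy is invented and the message format only loosely imitates Dubbo's alert text:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical policy: report the pool state, then abort like AbortPolicy. */
public class ReportingAbortPolicy extends ThreadPoolExecutor.AbortPolicy {

    private volatile String lastReport; // most recent rejection summary

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        lastReport = String.format(
                "Thread pool is EXHAUSTED! Pool Size: %d (core: %d, max: %d, largest: %d), queued: %d",
                e.getPoolSize(), e.getCorePoolSize(), e.getMaximumPoolSize(),
                e.getLargestPoolSize(), e.getQueue().size());
        System.err.println(lastReport); // a real service would use a logger
        super.rejectedExecution(r, e);  // AbortPolicy: throws RejectedExecutionException
    }

    public String lastReport() {
        return lastReport;
    }

    /** Saturates a one-thread pool and returns the report produced for the rejected task. */
    public static String demo() {
        ReportingAbortPolicy policy = new ReportingAbortPolicy();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(), policy);
        pool.execute(() -> {
            try {
                release.await(); // hold the only worker
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            pool.execute(() -> { });
        } catch (RejectedExecutionException expected) {
            // The caller still sees the exception, exactly as with plain AbortPolicy
        }
        release.countDown();
        pool.shutdown();
        return policy.lastReport();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With the one-thread pool in `demo()`, the report is `Thread pool is EXHAUSTED! Pool Size: 1 (core: 1, max: 1, largest: 1), queued: 0`. getActiveCount() is left out on purpose, because the worker may not have started its task yet when the report is built.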

III. Thread pool configuration

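Thread pool configuration matters a great deal but is easily overlooked. If the configuration is unreasonable, or threads are rarely reused, the pool will still create and destroy threads frequently.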

1. How do we calculate a reasonable core thread count?

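We can calculate it from the interface's average response time (RT) and the QPS the service needs to support. For example, if the average RT is 0.005 s, one worker thread can process 1 / 0.005 = 200 tasks per second. If a single machine must support 30,000 QPS, it therefore needs 30,000 / 200 = 150 core threads.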

In formula form: $N_{\text{core}} = \text{QPS} \div (1 \div \text{RT}_{\text{avg}}) = \text{QPS} \times \text{RT}_{\text{avg}}$
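This sizing rule is a form of Little's law. The helper below is a minimal sketch of the arithmetic, with RT taken in whole milliseconds and the result rounded up; the name coreThreads is hypothetical, and the estimate ignores RT variance and traffic bursts, so it is better read as a lower bound than as a final setting:

```java
public class CoreThreadEstimate {

    /**
     * Estimates the threads needed to sustain a target throughput:
     * threads = QPS x average RT (Little's law), rounded up.
     */
    public static long coreThreads(long targetQps, long avgRtMillis) {
        // qps * rt(ms) / 1000 is the average number of busy threads; adding 999 rounds up
        return (targetQps * avgRtMillis + 999) / 1000;
    }

    public static void main(String[] args) {
        System.out.println(coreThreads(30_000, 5)); // the example above
        System.out.println(coreThreads(10_000, 5)); // daytime load from Section IV
    }
}
```

`coreThreads(30_000, 5)` gives 150, matching the example above, and `coreThreads(10_000, 5)` gives 50 for the daytime load (RT < 5 ms, QPS < 10,000) described in Section IV.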

2. The easily overlooked @Async annotation

In a Spring application that defines no TaskExecutor bean, @Async falls back to SimpleAsyncTaskExecutor. Left unconfigured, this is effectively no thread pool at all: it creates a brand-new thread for every task and never reuses threads.

So remember: if you use @Async, always configure an executor, for example:

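import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@EnableAsync
@Configuration
public class ThreadPoolConfig {
    private static final int CORE_POOL_SIZE = 100;       // core (default) number of threads
    private static final int MAX_POOL_SIZE = 400;        // maximum number of threads
    private static final int KEEP_ALIVE_SECONDS = 60;    // idle time before a non-core thread is reclaimed, in seconds
    private static final int QUEUE_CAPACITY = 0;         // queue capacity; 0 means a SynchronousQueue (direct hand-off)
    private static final String THREAD_NAME_PREFIX = "Async-Service-"; // thread name prefix

    @Bean("taskExecutor")
    public ThreadPoolTaskExecutor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(CORE_POOL_SIZE);
        executor.setMaxPoolSize(MAX_POOL_SIZE);
        executor.setQueueCapacity(QUEUE_CAPACITY);
        executor.setKeepAliveSeconds(KEEP_ALIVE_SECONDS);
        executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
        // Throw RejectedExecutionException when the pool is saturated
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        // Initialize the underlying ThreadPoolExecutor
        executor.initialize();
        return executor;
    }
}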
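The cost of the unconfigured default can be sketched without Spring. The hypothetical ThreadReuseDemo below compares a thread-per-task Executor, which only mimics the default behaviour of SimpleAsyncTaskExecutor (it is not that class), with a small fixed pool, by counting how many distinct threads ran 100 tasks:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadReuseDemo {

    /** Runs {@code tasks} tiny tasks on the executor and counts the distinct threads that ran them. */
    public static int distinctThreads(Executor executor, int tasks) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                names.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            done.await(); // wait until every task has recorded its thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names.size();
    }

    public static void main(String[] args) {
        // Thread-per-task: a new thread for every submission, nothing is reused
        Executor threadPerTask = r -> new Thread(r).start();
        // A small reusable pool
        ExecutorService pooled = Executors.newFixedThreadPool(4);

        System.out.println("thread-per-task: " + distinctThreads(threadPerTask, 100));
        System.out.println("pooled:          " + distinctThreads(pooled, 100));
        pooled.shutdown();
    }
}
```

Under these assumptions the thread-per-task executor reports 100 distinct threads and the fixed pool 4, since a fixed pool of four creates exactly four workers and reuses them for every later task.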

IV. What caused the thread count to surge?

We configured the Dubbo server's worker threads as follows:

corethreads: 150
threads: 800
threadpool: cached
queues: 10

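Looks reasonable, right? The small queue was meant to absorb brief jitter so that the pool would not run short momentarily. Nothing seems wrong here, and for daytime traffic (RT < 5 ms, QPS < 10,000) the core threads are more than enough. Yet after the release the thread count kept climbing until it hit the 800 ceiling, and we received this alert: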

org.apache.dubbo.remoting.RemotingException("Server side(IP,20880) thread pool is exhausted, detail msg:Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-IP:20880, Pool Size: 800 (active: 4, core: 300, max: 800, largest: 800), Task: 4101304 (completed: 4101301), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://IP:20880!"

As the alert shows, when the pool reached its maximum size only a handful of threads were active, which was not what we expected at all.

V. Simulating the scenario

Consider this part of the source code:

 queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues))

It follows that:

When queues is 0, the work queue is a SynchronousQueue; when it is negative, an unbounded LinkedBlockingQueue; when it is positive, a LinkedBlockingQueue bounded to that capacity.
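The same selection rule can be isolated and inspected. The sketch below copies the ternary from the Dubbo snippet into a hypothetical helper, queueFor, and prints the resulting queue type and its remaining capacity:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class QueueSelection {

    /** Same rule as the Dubbo snippet: 0 -> hand-off, negative -> unbounded, positive -> bounded. */
    public static BlockingQueue<Runnable> queueFor(int queues) {
        if (queues == 0) {
            return new SynchronousQueue<>();       // no storage: each task must be taken by a waiting thread
        }
        if (queues < 0) {
            return new LinkedBlockingQueue<>();    // capacity Integer.MAX_VALUE, effectively unbounded
        }
        return new LinkedBlockingQueue<>(queues);  // bounded: offer() fails once 'queues' tasks are waiting
    }

    public static void main(String[] args) {
        for (int q : new int[] {0, -1, 10}) {
            BlockingQueue<Runnable> queue = queueFor(q);
            System.out.println("queues=" + q + " -> " + queue.getClass().getSimpleName()
                    + ", remainingCapacity=" + queue.remainingCapacity());
        }
    }
}
```

This prints a SynchronousQueue with remaining capacity 0 for `queues=0`, a LinkedBlockingQueue with capacity 2147483647 (Integer.MAX_VALUE) for `queues=-1`, and a LinkedBlockingQueue with capacity 10 for `queues=10`. The bounded case is the relevant one: above corePoolSize, ThreadPoolExecutor adds a thread only when offer() to the queue fails, so a short burst that fills a small bounded queue triggers thread creation.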

The core and maximum thread counts could hardly be the culprit, so I suspected the queue size setting.

To reproduce the problem, I wrote a simple simulation:

package com.bytearch.fast.cloud;

import java.util.concurrent.*;

public class TestThreadPool {

    public final static int queueSize = 10;
    public static void main(String[] args) {
        ExecutorService executorService = getThreadPool(queueSize);
        for (int i = 0; i < 100000; i++) {
            int finalI = i;

            try {
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        doSomething(finalI);
                    }
                });
            } catch (Exception e) {
                System.out.println("emsg:" + e.getMessage());
            }
            if (i % 20 == 0) {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        System.out.println("all done!");
        try {
            Thread.sleep(1000000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    public static ExecutorService getThreadPool(int queues) {
        int cores = 150;
        int threads = 800;
        int alive = 60 * 1000;

        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)));
    }

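    public static void doSomething(final int i) {
        try {
            // Simulate a call with an RT of about 5 ms
            Thread.sleep(5);
            // Note: Thread.activeCount() estimates the live threads in the current thread group,
            // not the pool's count of busy threads
            System.out.println("thread:" + Thread.currentThread().getName() +  ", active:" + Thread.activeCount() + ", do:" + i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }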
}

Simulation results:

queueSize | Result
0         | No exception
10        | Rejection exceptions thrown
100       | No exception

The exception looks like this:

emsg:Task com.bytearch.fast.cloud.TestThreadPool$1@733aa9d8 rejected from java.util.concurrent.ThreadPoolExecutor@6615435c[Running, pool size = 800, active threads = 32, queued tasks = 9, completed tasks = 89755]
all done!

Clearly, under relatively high concurrency, a bounded LinkedBlockingQueue with a small capacity makes the thread pool misbehave.

After changing queues to 0 and redeploying, everything returned to normal.

As for the deeper cause, interested readers are welcome to dig into it, or to discuss it with me through the WeChat official account.

VI. Summary

This article covered the basics of ThreadPoolExecutor, how to calculate a thread pool's size, and the easily overlooked configuration issue with the @Async annotation.

It also described a puzzling problem we ran into with a thread pool: a single parameter can lead to unexpected consequences.

© Copyright belongs to the author. For reprints or content collaboration, please contact the author.