[Dubbo]基礎(chǔ)組件之ThreadPool

介紹

ThreadPool 我們在開發(fā)過程中經(jīng)常使用,java線程池的相關(guān)知識(shí)見
線程池相關(guān)文章
dubbo也不例外會(huì)使用線程池,見dubbo線程池

看完本文章主要學(xué)習(xí):

  • dubbo的線程池是如何實(shí)現(xiàn)
  • dubbo線程如何配置
  • 我們自己是先線程池注意事項(xiàng)

使用方式

需要通過不同的派發(fā)策略和不同的線程池配置的組合來應(yīng)對不同的場景:

<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />

服務(wù)提供者

<dubbo:provider>標(biāo)簽下配置threadpool 屬性,默認(rèn)是fixed

詳細(xì)說明

源碼位置

634200dd16f92abe846572378c902729.jpg

繼承關(guān)系

屏幕快照 2018-09-10 下午9.26.44.png

Cached:緩存線程池,空閑一分鐘自動(dòng)刪除,需要時(shí)重建
Fixed:固定大小的線程池,默認(rèn)
Limited:可伸縮線程池,但池中的線程數(shù)只會(huì)增長不會(huì)收縮。只增長不收縮的目的是為了避免收縮時(shí)突然來了大流量引起的性能問題。

ThreadPool

/**
 * ThreadPool
 * 
 * @author william.liangf
 */
@SPI("fixed")
public interface ThreadPool {
    
    /**
     * 線程池
     * 
     * @param url 線程參數(shù)
     * @return 線程池
     */
    @Adaptive({Constants.THREADPOOL_KEY})
    Executor getExecutor(URL url);

}

ThreadPool$Adaptive

啟動(dòng)的時(shí)候創(chuàng)建的適配對象
注:Adaptive注解在類上和方法上面表達(dá)的意義不一樣

注解在類上:代表人工實(shí)現(xiàn),實(shí)現(xiàn)一個(gè)裝飾類(設(shè)計(jì)模式中的裝飾模式)
注解在方法上:代表自動(dòng)生成和編譯一個(gè)動(dòng)態(tài)的Adpative類,它主要是用于SPI,因?yàn)閟pi的類是不固定、未知的擴(kuò)展類,所以設(shè)計(jì)了動(dòng)態(tài)$Adaptive類.

public class ThreadPool$Adaptive implements com.alibaba.dubbo.common.threadpool.ThreadPool {
    @Override
    public java.util.concurrent.Executor getExecutor(com.alibaba.dubbo.common.URL arg0) {
        if (arg0 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg0;
        String extName = url.getParameter("threadpool", "fixed");
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.common.threadpool.ThreadPool) name from url(" + url.toString() + ") use keys([threadpool])");
        ThreadPool extension = ExtensionLoader.getExtensionLoader(ThreadPool.class).getExtension(extName);
        return extension.getExecutor(arg0);
    }
}

FixedThreadPool

/**
 * Creates a thread pool that reuses a fixed number of threads
 * 固定線程池
 *
 * @see java.util.concurrent.Executors#newFixedThreadPool(int)
 */
public class FixedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        // 線程名
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // 固定線程數(shù)
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        // 隊(duì)列大小
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        // 線程池執(zhí)行器
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

CachedThreadPool

/**
 * This thread pool is self-tuned. Thread will be recycled after idle for one minute, and new thread will be created for
 * the upcoming request.
 * 緩存線程池,空閑一分鐘自動(dòng)刪除,需要時(shí)重建
 *
 * @see java.util.concurrent.Executors#newCachedThreadPool()
 */
public class CachedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        // 線程池名稱
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // 核心線程池個(gè)數(shù)
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        // 最大線程池個(gè)數(shù)
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        //隊(duì)列個(gè)數(shù)
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        // 存活時(shí)間,默認(rèn)是毫秒數(shù)
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

LimitedThreadPool

/**
 * Creates a thread pool that creates new threads as needed until limits reaches. This thread pool will not shrink
 * automatically.
 * 可伸縮線程池,但池中的線程數(shù)只會(huì)增長不會(huì)收縮。只增長不收縮的目的是為了避免收縮時(shí)突然來了大流量引起的性能問題。
 */
public class LimitedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        // 線程名
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // 核心線程池個(gè)數(shù)
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        // 最大線程池個(gè)數(shù)
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        // 隊(duì)列大小
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        // 線程池執(zhí)行器
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

AbortPolicyWithReport

帶打印異常堆棧的拒絕策略

@Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool is EXHAUSTED!" +
                        " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                        " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
                threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
                url.getProtocol(), url.getIp(), url.getPort());
        logger.warn(msg);
        dumpJStack();
        // 將異常拋出去
        throw new RejectedExecutionException(msg);
    }

    private void dumpJStack() {
        long now = System.currentTimeMillis();

        //dump every 10 minutes 10分鐘打印一次
        if (now - lastPrintTime < 10 * 60 * 1000) {
            return;
        }

        if (!guard.tryAcquire()) {
            return;
        }

        // 異步打印
        Executors.newSingleThreadExecutor().execute(new Runnable() {
            @Override
            public void run() {
                //獲取打印的目錄
                String dumpPath = url.getParameter(Constants.DUMP_DIRECTORY, System.getProperty("user.home"));

                SimpleDateFormat sdf;
                //系統(tǒng)
                String OS = System.getProperty("os.name").toLowerCase();

                // window system don't support ":" in file name
                if(OS.contains("win")){
                    sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
                }else {
                    sdf = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss");
                }

                String dateStr = sdf.format(new Date());
                FileOutputStream jstackStream = null;
                try {
                    jstackStream = new FileOutputStream(new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr));
                    // 打印 JStack
                    JVMUtil.jstack(jstackStream);
                } catch (Throwable t) {
                    logger.error("dump jstack error", t);
                } finally {
                    guard.release();
                    if (jstackStream != null) {
                        try {
                            jstackStream.flush();
                            jstackStream.close();
                        } catch (IOException e) {
                        }
                    }
                }

                lastPrintTime = System.currentTimeMillis();
            }
        });

    }

總結(jié)

dubbo客戶端使用的是 shared類型的線程池

 public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException{
        super(url, wrapChannelHandler(url, handler));
    }
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
        url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
        url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
        return ChannelHandlers.wrap(handler, url);
    }

    public static final String DEFAULT_CLIENT_THREADPOOL = "shared";

服務(wù)提供者使用的是fixed方式的線程池

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容