Noseparte說(shuō):網(wǎng)絡(luò)游戲中的網(wǎng)關(guān)線程池是如何創(chuàng)建的

Java 線程池的正確使用姿勢(shì)

線程池 ThreadPool

1. 線程池的定義:

(摘自職Q)在面向?qū)ο缶幊讨?,?chuàng)建和銷(xiāo)毀對(duì)象是很費(fèi)時(shí)間的,因?yàn)閯?chuàng)建一個(gè)對(duì)象要獲取內(nèi)存資源或者其它更多資源。在Java中更是如此,虛擬機(jī)將試圖跟蹤每一個(gè)對(duì)象,以便能夠在對(duì)象銷(xiāo)毀后進(jìn)行垃圾回收。所以提高服務(wù)程序效率的一個(gè)手段就是盡可能減少創(chuàng)建和銷(xiāo)毀對(duì)象的次數(shù),特別是一些很耗資源的對(duì)象創(chuàng)建和銷(xiāo)毀,這就是"池化資源"技術(shù)產(chǎn)生的原因。線程池顧名思義就是事先創(chuàng)建若干個(gè)可執(zhí)行的線程放入一個(gè)池(容器)中,需要的時(shí)候從池中獲取線程不用自行創(chuàng)建,使用完畢不需要銷(xiāo)毀線程而是放回池中,從而減少創(chuàng)建和銷(xiāo)毀線程對(duì)象的開(kāi)銷(xiāo)。

2. 如何創(chuàng)建線程池:

  • 使用 ThreadPoolExecutor:
    ThreadPoolExecutor是一個(gè)靈活的、穩(wěn)定的線程池,允許進(jìn)行定制。
  • 使用 Executors:
    Executors中的靜態(tài)工廠方法之一來(lái)創(chuàng)建線程池:
    newSingleThreadExecutor: 是一個(gè)單線程的Executor,它創(chuàng)建單個(gè)工作者線程來(lái)執(zhí)行任務(wù),如果這個(gè)線程異常結(jié)束,會(huì)創(chuàng)建另一個(gè)線程來(lái)替代。newSingleThreadExecutor能確保依照任務(wù)在隊(duì)列中的順序來(lái)串行執(zhí)行(例如 FIFO、LIFO、優(yōu)先級(jí))。
    newFixedThreadPool: 將創(chuàng)建一個(gè)固定長(zhǎng)度的線程池,每當(dāng)提交一個(gè)任務(wù)時(shí)就創(chuàng)建一個(gè)線程,直到達(dá)到線程池的最大數(shù)量,這時(shí)線程池的規(guī)模將不再發(fā)生變化(如果某個(gè)線程由于發(fā)生了未預(yù)期的Exception而結(jié)束,那么線程池會(huì)補(bǔ)充一個(gè)新的線程)。
    newCachedThreadPool: 將創(chuàng)建一個(gè)可緩存的線程池,如果線程池的當(dāng)前規(guī)模超過(guò)了處理需求時(shí),那么將回收空閑的線程,而當(dāng)需求增加時(shí),則可以添加新的線程,線程池規(guī)模不存在任何限制。
    newScheduledThreadExecutor: 創(chuàng)建了一個(gè)固定長(zhǎng)度的線程池,而且以延遲或定時(shí)的方式來(lái)執(zhí)行任務(wù),類(lèi)似于Timer。

配置 ThreadPoolExecutor

public class ThreadPoolExecutor {
    
    // 線程池維護(hù)的最小線程數(shù)
    private volatile int corePoolSize;
    // 線程池可容納線程數(shù)的最大值
    private volatile int maximumPoolSize;
    // 線程池達(dá)到閾值后,新的線程需要等待的時(shí)間
    private volatile long keepAliveTime;
    // 以工廠模式創(chuàng)建新的線程
    private volatile ThreadFactory threadFactory;
    // 上下文
    private final AccessControlContext acc;
    // 阻塞隊(duì)列
    private final BlockingQueue<Runnable> workQueue;
    // 拒絕策略
    private volatile RejectedExecutionHandler handler;
    
    /**
     * ThreadPoolExecutor的核心構(gòu)造器
     */
    public ThreadPoolExecutor(int corePoolSize, 
                                       int maximumPoolSize, 
                                       long keepAliveTime,
                                       TimeUnit unit,  
                                       BlockingQueue<Runnable> workQueue,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
           if (corePoolSize < 0 ||    
              maximumPoolSize <= 0 ||   
              maximumPoolSize < corePoolSize ||    
              keepAliveTime < 0)    throw new IllegalArgumentException();
           if (workQueue == null || threadFactory == null || handler == null)    
              throw new NullPointerException();
           this.acc = System.getSecurityManager() == null ?     
                    null :        
                    AccessController.getContext();
           this.corePoolSize = corePoolSize;
           this.maximumPoolSize = maximumPoolSize;
           this.workQueue = workQueue;
           this.keepAliveTime = unit.toNanos(keepAliveTime);
           this.threadFactory = threadFactory;
           this.handler = handler;
    }
    
}

管理任務(wù)隊(duì)列 BlockingQueue

ThreadPoolExecutor允許提供一個(gè)BlockingQueue來(lái)保存等待執(zhí)行的任務(wù)?;镜娜蝿?wù)排隊(duì)方法有3種:無(wú)界隊(duì)列、有界隊(duì)列和同步移交(Synchronous Handoff)。

  • 無(wú)界隊(duì)列: 隊(duì)列大小無(wú)限制,常用的為無(wú)界的LinkedBlockingQueue,使用該隊(duì)列做為阻塞隊(duì)列時(shí)要尤其當(dāng)心,當(dāng)任務(wù)耗時(shí)較長(zhǎng)時(shí)可能會(huì)導(dǎo)致大量新任務(wù)在隊(duì)列中堆積最終導(dǎo)致OOM。
    閱讀代碼發(fā)現(xiàn),Executors.newFixedThreadPool 采用就是 LinkedBlockingQueue,而樓主踩到的就是這個(gè)坑,當(dāng)QPS很高,發(fā)送數(shù)據(jù)很大,大量的任務(wù)被添加到這個(gè)無(wú)界LinkedBlockingQueue 中,導(dǎo)致cpu和內(nèi)存飆升服務(wù)器掛掉。
  • 有界隊(duì)列: 常用的有兩類(lèi),
    一類(lèi)是遵循FIFO原則的隊(duì)列如ArrayBlockingQueue與有界的LinkedBlockingQueue,
    另一類(lèi)是優(yōu)先級(jí)隊(duì)列如PriorityBlockingQueue。PriorityBlockingQueue中的優(yōu)先級(jí)由任務(wù)的Comparator決定。
    使用有界隊(duì)列時(shí)隊(duì)列大小需和線程池大小互相配合,線程池較小有界隊(duì)列較大時(shí)可減少內(nèi)存消耗,降低cpu使用率和上下文切換,但是可能會(huì)限制系統(tǒng)吞吐量。在我們的修復(fù)方案中,選擇的就是這個(gè)類(lèi)型的隊(duì)列,雖然會(huì)有部分任務(wù)被丟失,但是我們線上是排序日志搜集任務(wù),所以對(duì)部分對(duì)丟失是可以容忍的。
  • 同步移交隊(duì)列: 如果不希望任務(wù)在隊(duì)列中等待而是希望將任務(wù)直接移交給工作線程,可使用SynchronousQueue作為等待隊(duì)列。SynchronousQueue不是一個(gè)真正的隊(duì)列,而是一種線程之間移交的機(jī)制。要將一個(gè)元素放入SynchronousQueue中,必須有另一個(gè)線程正在等待接收這個(gè)元素。只有在使用無(wú)界線程池或者有飽和策略時(shí)才建議使用該隊(duì)列。

飽和策略 RejectedExecutionHandler

ThreadPoolExecutor提供如下4種飽和策略:

  • CallerRunsPolicy 由調(diào)用線程(提交任務(wù)的線程)處理該任務(wù)
  • AbortPolicy 丟棄任務(wù)并直接拋出RejectedExecutionException異常(默認(rèn)的線程池拒絕策略)
  • DiscardPolicy 僅丟棄任務(wù)并不拋出異常
  • DiscardOldestPolicy 丟棄隊(duì)列最前面的任務(wù),然后重新提交被拒絕的任務(wù)

自定義飽和策略,只需實(shí)現(xiàn)RejectedExecutionHandler接口并重寫(xiě)void rejectedExecution(Runnable r, ThreadPoolExecutor executor) 方法

public class ThreadPoolExecutor{

    /** 
     *  默認(rèn)的線程池拒絕策略 AbortPolicy
     */
    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

    /* ThreadPoolExecutor提供如下4中拒絕策略: */
    /**
     * 由調(diào)用線程(提交任務(wù)的線程)處理該任務(wù)
     */
   public static class CallerRunsPolicy implements RejectedExecutionHandler {}
 
    /** 
     *  丟棄任務(wù)并直接拋出RejectedExecutionException異常
     */
   public static class AbortPolicy implements RejectedExecutionHandler {}
 
   /** 
    * 僅丟棄任務(wù)并不拋出異常
    */
   public static class DiscardPolicy implements RejectedExecutionHandler {}
   
   /** 
    * 丟棄隊(duì)列最前面的任務(wù),然后重新提交被拒絕的任務(wù)
    */
   public static class DiscardOldestPolicy implements RejectedExecutionHandler {}

}

Executors(不推薦)

在阿里巴巴Java開(kāi)發(fā)手冊(cè)中提到,使用Executors創(chuàng)建線程池可能會(huì)導(dǎo)致OOM(OutOfMemory ,內(nèi)存溢出)

BlockingQueue致使OOM示意圖

ExecutorService

public interface ExecutorService extends Executor {  
     void shutdown();    
     List<Runnable> shutdownNow();    
     boolean isShutdown();    
     boolean isTerminated();    
     boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;  
     // .......其他用于任務(wù)提交的方法   
}    

為了解決執(zhí)行服務(wù)的生命周期問(wèn)題,
ExecutorService拓展了Executor接口,添加了一些用于生命周期管理的方法。
ExecutorService的生命周期有3種狀態(tài):運(yùn)行、關(guān)閉和已終止。
ExecutorService在初始創(chuàng)建時(shí)處于運(yùn)行狀態(tài)。
shutdown方法將執(zhí)行平緩的關(guān)閉過(guò)程:不再接受新的任務(wù),同時(shí)等待已經(jīng)提交的任務(wù)執(zhí)行完成——包括那些還未開(kāi)始執(zhí)行的任務(wù)。
shutdownNow方法將執(zhí)行粗暴的關(guān)閉過(guò)程:它將嘗試取消所有運(yùn)行中的任務(wù),并且不再啟動(dòng)隊(duì)列中尚未開(kāi)始執(zhí)行的任務(wù)。

ThreadFactory

DefaultThreadFactory


/** * The default thread factory */
static class DefaultThreadFactory implements ThreadFactory {  

        private static final AtomicInteger poolNumber = new AtomicInteger(1);    
        private final ThreadGroup group;    
        private final AtomicInteger threadNumber = new AtomicInteger(1);    
        private final String namePrefix;    
    
        DefaultThreadFactory() {        
            SecurityManager s = System.getSecurityManager();        
            group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();        
            namePrefix = "pool-" +                      
                              poolNumber.getAndIncrement() +                     
                              "-thread-";    
        }    
    
        public Thread newThread(Runnable r) {       
            Thread t = new Thread(group, r,                              
                            namePrefix + threadNumber.getAndIncrement(),
                            0);
            if (t.isDaemon())            
                t.setDaemon(false);       
            if (t.getPriority() != Thread.NORM_PRIORITY)      
                t.setPriority(Thread.NORM_PRIORITY);       
            return t;   
        }
}

PrivilegedThreadFactory

/**
 * 權(quán)限訪問(wèn)與類(lèi)加載
 */
static class PrivilegedThreadFactory extends DefaultThreadFactory {    

    private final AccessControlContext acc;    
    private final ClassLoader ccl;    
    PrivilegedThreadFactory() {        
        super();        
        SecurityManager sm = System.getSecurityManager();        
        if (sm != null) {            
            // Calls to getContextClassLoader from this class           
            // never trigger a security check, but we check            
            // whether our callers have this permission anyways.            
            sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);            
            // Fail fast            
            sm.checkPermission(new RuntimePermission("setContextClassLoader"));        
        }        
        this.acc = AccessController.getContext();        
        this.ccl = Thread.currentThread().getContextClassLoader();    
    }    
    
    public Thread newThread(final Runnable r) {        
        return super.newThread(new Runnable() {            
            public void run() {                
                AccessController.doPrivileged(new PrivilegedAction<Void>() {                    
                    public Void run() {   
                        Thread.currentThread().setContextClassLoader(ccl);                        
                        r.run();                        
                         return null;                   
                    }                
                }, acc);            
            }        
        });    
    }
}

使用guava的 ThreadFactoryBuilder


public class ThreadFactoryBuilder{

    private static ThreadFactory doBuild(ThreadFactoryBuilder builder) {  
        final String nameFormat = builder.nameFormat;  
        final Boolean daemon = builder.daemon;  
        final Integer priority = builder.priority;  
        final UncaughtExceptionHandler uncaughtExceptionHandler = builder.uncaughtExceptionHandler;  
        final ThreadFactory backingThreadFactory =      
             (builder.backingThreadFactory != null)          
                ? builder.backingThreadFactory         
                : Executors.defaultThreadFactory();  
        final AtomicLong count = (nameFormat != null) ? new AtomicLong(0) : null;  
        return new ThreadFactory() {    
            @Override    
            public Thread newThread(Runnable runnable) {      
                Thread thread = backingThreadFactory.newThread(runnable);      
                if (nameFormat != null) {        
                    thread.setName(format(nameFormat, count.getAndIncrement()));      
                }     
                if (daemon != null) {// 守護(hù)線程        
                    thread.setDaemon(daemon);     
                }     
                if (priority != null) {// 優(yōu)先級(jí)        
                    thread.setPriority(priority);      
                }      
                if (uncaughtExceptionHandler != null) {     
                    thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);     
                }      
                return thread;   
            } 
        };
    }
}

創(chuàng)建線程池的正確姿勢(shì)

/** 
 * @Auther: Noseparte * @Date: 2019/11/27 10:35 
 * @Description: 
 * 
 *          <p>定制協(xié)議網(wǎng)關(guān)線程池</p> 
 */
public class ThreadPool {    

    protected final static Logger _LOG = LogManager.getLogger(ThreadPool.class);    
    private List<ExecutorService> workers = new ArrayList<>();    
    private int threadCount;    
    private ThreadFactory threadFactory;
    
    public ThreadPool(int threadCount) {        
        this(threadCount, new UserThreadFactory("網(wǎng)關(guān)游戲邏輯協(xié)議線程池"));    
    }    
    
    public ThreadPool(int threadCount, ThreadFactory threadFactory) {        
        this.threadCount = threadCount;        
        this.threadFactory = threadFactory;        
        if (threadCount <= 0 || null == threadFactory)            
            throw new IllegalArgumentException();        
            for (int i = 0; i < threadCount; i++) {            
                workers.add(new ThreadPoolExecutor(threadCount, 200,                    
                    0L, TimeUnit.MILLISECONDS,                    
                    new LinkedBlockingQueue<Runnable>(1024),
                    threadFactory, 
                    new ThreadPoolExecutor.AbortPolicy()));        
            }    
    }    
    
    public Future execute(Runnable task, int mold) {        
        int index = Math.abs(mold) % threadCount;        
        ExecutorService executor = workers.get(index);        
        if (null == executor) {           
            _LOG.error("sid=" + mold + ", tid=" + index);            
            return null;       
        }        
        return executor.submit(task);    
    }    
    
    public void shutdown() {       
        int count = 0;        
        for (ExecutorService worker : workers) {            
            _LOG.error("close thread{}.", ++count);            
            worker.shutdown();        
        }   
    }    
    
    static class UserThreadFactory implements ThreadFactory {        
        private static final AtomicInteger poolNumber = new AtomicInteger(1);        
        private final ThreadGroup group;        
        private final AtomicInteger threadNumber = new AtomicInteger(1);        
        private final String namePrefix;        
        
        UserThreadFactory(String poolName) {            
            SecurityManager s = System.getSecurityManager();            
            group = (s != null) ? s.getThreadGroup() :  
                    Thread.currentThread().getThreadGroup();            
            namePrefix = poolName + "-" +                    
                    poolNumber.getAndIncrement() +                    
                    "-thread-";        
        }     
        
        public Thread newThread(Runnable r) {            
            Thread t = new Thread(group, r,                    
                namePrefix + threadNumber.getAndIncrement(),                    
                0);            
            if (t.isDaemon())                
                t.setDaemon(false);            
            if (t.getPriority() != Thread.NORM_PRIORITY)   
                t.setPriority(Thread.NORM_PRIORITY);            
            return t;        
        }   
        
    }
}

總結(jié)

創(chuàng)建線程池的注意事項(xiàng):

  1. 根據(jù)業(yè)務(wù)場(chǎng)景定制ThreadFactory、飽和策略、任務(wù)隊(duì)列、ThreadPoolExecutor
  2. 注意BlockingQueue中任務(wù)阻塞數(shù)量越來(lái)越多會(huì)導(dǎo)致內(nèi)存耗盡(OOM), 要設(shè)置隊(duì)列的上限值

源碼地址:
Almost-Famous: 游戲中的網(wǎng)關(guān)線程池是如何創(chuàng)建的

相關(guān)博文:友情鏈接
一次Java線程池誤用引發(fā)的血案和總結(jié)
Java中線程池,你真的會(huì)用嗎?

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容