2020-02-01-Java線程池

Java線程池基本用法

Java提供了一些通用接口來創(chuàng)建線程池:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

但是通常不推薦使用這些簡易接口,因為這些接口可能會使用無界的任務(wù)隊列,理論上可以無限添加任務(wù)到線程池,并且對核心線程數(shù)和最大線程數(shù)的設(shè)置也可能不合理,導(dǎo)致對系統(tǒng)資源的消耗很大。
通常建議自己創(chuàng)建一個線程池對象,以ThreadPoolExecutor為例,看看如何構(gòu)造一個線程池

線程池的創(chuàng)建

構(gòu)造函數(shù)有幾個參數(shù)比較重要:

  1. corePoolSize 核心線程數(shù),線程池一般情況下不會回收的線程數(shù)量;
  2. maximumPoolSize 最大線程數(shù),超過這個數(shù)量后不允許再創(chuàng)建線程;
  3. keepAliveTime 非核心線程處于空閑狀態(tài)的最大時間,超過這個時間就會被回收;
  4. unit 時間單位,跟keepAliveTime結(jié)合使用;
  5. workQueue 任務(wù)隊列,通常有SynchronousQueue,LinkedBlockingQueue幾種類型;
  6. threadFactory 指定創(chuàng)建線程的工廠方法;
  7. handler 拒絕策略,當(dāng)任務(wù)隊列已滿或線程數(shù)達到最大時執(zhí)行。
/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

線程池的生命周期

以ThreadPoolExecutor為例,簡單看下代碼結(jié)構(gòu)和生命周期


Java線程池 (1).png

線程池有5個狀態(tài):

  1. RUNNING : 接受新的任務(wù)并處理隊列中的任務(wù);
  2. SHUTDOWN : 不再接受新任務(wù),但會繼續(xù)處理隊列中未完成任務(wù);
  3. STOP : 不再接受新任務(wù),也不再處理隊列中的任務(wù),正在執(zhí)行的任務(wù)被中斷;
  4. TIDYING : 所有的任務(wù)處理完成,有效的線程數(shù)是0,下一步會執(zhí)行terminated()進入TERMINATED;
  5. TERMINATED : terminated()方法執(zhí)行完畢,線程池結(jié)束。

新任務(wù)的執(zhí)行 execute()

一個新任務(wù)的執(zhí)行分4步:

  1. 如果當(dāng)前線程數(shù)小于核心線程數(shù)corePoolSize,就創(chuàng)建新的線程;
  2. 當(dāng)前線程數(shù)大于核心線程數(shù),且任務(wù)隊列未滿,將任務(wù)放入任務(wù)隊列;
  3. 當(dāng)前任務(wù)隊列已滿,且線程數(shù)小于最大線程數(shù),就創(chuàng)建線程;
  4. 當(dāng)前線程數(shù)大于最大線程數(shù)maximumPoolSize,拒絕執(zhí)行任務(wù)。
/**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

拒絕策略

Java提供了4種拒絕任務(wù)的策略,都是RejectedExecutionHandler的實現(xiàn)。

  1. AbortPolicy:拋出RejectedExecutionException;
  2. DiscardPolicy:什么也不做,直接忽略;
  3. DiscardOldestPolicy:丟棄執(zhí)行隊列中最老的任務(wù),嘗試為當(dāng)前提交的任務(wù)騰出位置;
  4. CallerRunsPolicy:直接由提交任務(wù)者執(zhí)行這個任務(wù)

任務(wù)隊列

比較常見的隊列類型有SynchronousQueue和LinkedBlockingQueue。

  1. SynchronousQueue
    這種隊列沒有緩存,生產(chǎn)者和消費者兩個線程必須交叉運行。隊列中已經(jīng)有一個任務(wù)時,生產(chǎn)者線程阻塞,直到消費者取出線程,才能繼續(xù)運行。反過來,隊列為空時,消費者線程阻塞。
    比如下面兩個線程會形成死鎖,都進入WAITING狀態(tài)。
private Thread putThread = new Thread(new Runnable() {
        
        @Override
        public void run() {
            try {
                queue.put("arg0");
                System.out.println("put arg0 to queue");
                takThread.join();
                queue.put("arg1");
                System.out.println("put arg1 to queue");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "putThread");
    
    private Thread takThread = new Thread(new Runnable() {
        
        @Override
        public void run() {
            try {
                String arg0 = queue.take();
                System.out.println("take arg from queue:" + arg0);
                putThread.join();
                String arg1 = queue.take();
                System.out.println("take arg from queue:" + arg1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "takThread");

SynchronousQueue有兩種公平和非公平模式,實際上是兩種FIFO和LIFO數(shù)據(jù)結(jié)構(gòu)。
公平模式使用隊列,先入隊的任務(wù)先執(zhí)行;
非公平模式使用棧,后入隊的任務(wù)先執(zhí)行。
比如下面的代碼,默認情況下是先執(zhí)行putThread2,再執(zhí)行putThread1。因為默認fair參數(shù)是false。

private Thread putThread1 = new Thread(new Runnable() {
        
        @Override
        public void run() {
            try {
                queue.put("putThread1");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "putThread1");
    
    private Thread putThread2 = new Thread(new Runnable() {
        
        @Override
        public void run() {
            try {
                Thread.sleep(500);
                queue.put("putThread2");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "putThread2");
    
    private Thread takThread = new Thread(new Runnable() {
        
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                System.out.println("take arg from queue:" + queue.take());
                System.out.println("take arg from queue:" + queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "takThread");
  1. LinkedBlockingQueue
    LinkedBlockingQueue也是生產(chǎn)者-消費者模式,但是生產(chǎn)者線程和消費者線程使用不同的鎖來進行同步,理論上生產(chǎn)者可以一直添加任務(wù)進隊列。
    LinkedBlockingQueue是FIFO的隊列,相當(dāng)于公平模式,保證先入隊的任務(wù)先執(zhí)行。
    LinkedBlockingQueue可以指定容量,當(dāng)隊列已滿時,再執(zhí)行put()方法,生產(chǎn)者線程會阻塞,進入WAITING狀態(tài)?;驁?zhí)行offer()方法會返回false。
    參考:

https://www.cnblogs.com/ants/p/11343657.html
https://blog.csdn.net/yanyan19880509/article/details/52562039

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容