開線程的方式

首先何為線程(進程/線程)?
進程操作系統(tǒng)動態(tài)執(zhí)行基本單元,系統(tǒng)為每個進程分配內(nèi)存,包括一般情況下,包括文本區(qū)域(text region/指令)、數(shù)據(jù)區(qū)域(data region/變量)和堆棧(stack region/對象)。

我們的程序雖然可以做到多進程,但是,多進程需要切換上下文,什么是上下文?
當程序執(zhí)行到多進程的指令,那么會把當前的運行環(huán)境-堆棧等,復制一份,到另一塊內(nèi)存區(qū)域,而又因為cpu是輪尋機制,關(guān)于CPU運行原理,百度上有篇文章寫的很好,我這邊引用一下。
頻繁切換上下文(大概一次20微秒),屬于沒必要的消耗。另外就是進程間通信需要通過管道機制,比較復雜。

所以通常需要性能的場景通常合理使用線程來提高效率,線程的定義是,一個線程有且至少有一個線程,線程利用進程的資源,且本身不擁有系統(tǒng)資源,所以對線程的調(diào)度的開銷就會小很多(相對進程)。

因為這篇文章我定義到Java的分類下面,所以還是要通過Java來描述
其實我認為要真的好好深入學習線程進程,cpu調(diào)度這塊,還是要通過C來學
日后有時間,我會用C語言來模擬實現(xiàn)一遍

既然了解了什么是線程,看下Java怎么實現(xiàn)多線程:
Thread,Runnable,Future
至于網(wǎng)上有些說4種的,其實就是用ExecutorService來管理了一下。
那么從頭聊一聊。

Thread 其實是Runnable的實現(xiàn)類,類聲明如下

public class Thread implements Runnable

看下最核心的一個方法
首先判定現(xiàn)成的狀態(tài),0狀態(tài)表示該線程是新創(chuàng)建的,一切不是新建狀態(tài)的線程,都視為非法
第二部添加到線程組,線程組默認初始長度為4,如果滿了就闊為2倍。
之后可以看到,調(diào)用了一個本地方法start0,如果成功,則更改started標簽量
最后一個判定,啟動失敗,從線程組中移除當前線程。

public synchronized void start() {
    /**
     * This method is not invoked for the main method thread or "system"
     * group threads created/set up by the VM. Any new functionality added
     * to this method in the future may have to also be added to the VM.
     *
     * A zero status value corresponds to state "NEW".
     */
    if (threadStatus != 0)
        throw new IllegalThreadStateException();

    /* Notify the group that this thread is about to be started
     * so that it can be added to the group's list of threads
     * and the group's unstarted count can be decremented. */
    group.add(this);

    boolean started = false;
    try {
        start0();
        started = true;
    } finally {
        try {
            if (!started) {
                group.threadStartFailed(this);
            }
        } catch (Throwable ignore) {
            /* do nothing. If start0 threw a Throwable then
              it will be passed up the call stack */
        }
    }
}

Thread中出現(xiàn)的Runnable,作為一個接口,只有一個方法,就是run。

@FunctionalInterface
public interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();
}

之后來看Future,
Future其實提供了和Runnable能力并列的接口,簡單解釋一下為什么這么說,Runnable接口提供了run,也就是可以放在線程中執(zhí)行的能力,Future其實是賦予了線程執(zhí)行后可以返回的能力,run的聲明是void,所以沒有返回值。

兩者結(jié)合,簡單易懂的一個類RunnableFuture的接口就出來了。
那么相當于Thread的實現(xiàn)類,FutureTask就出現(xiàn)了,它就是集大成者。

這么說可能有點跳躍,先看下下面的實現(xiàn),一看,誒,怎么沒有Future?
Future本身是一個接口,跟Runnable是相同的級別,但區(qū)別通俗來講在于他沒有run的能力,這個能力來自于Runnable。
追溯一下FutureTask,發(fā)現(xiàn)它繼承了RunnableFuture,誒,這個單詞起的就有意思了,包含了Runnable,和Future。
點進去看下

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

Future就在這,關(guān)于Future里面有什么,大家可以點進去看看,里面最關(guān)鍵的就是get() throws InterruptedException, ExecutionException;這個方法,就這這個方法,讓我們通過調(diào)用api,拿到線程里面的值。

如果想使用這個東西,開啟線程,這個時候不能用new Thread(future)這種方式了,因為Thread沒有這種能力,只實現(xiàn)了一個Runnable接口,
這個時候,一個新的類出現(xiàn)了,源碼如下

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

看下Callable的聲明,是有返回值的,并且可以拋出異常的。
這個返回值就很關(guān)鍵了,通過這個返回值,你可以把任何你想通過線程拿到的結(jié)果拿回來。
而拿結(jié)果的方法就是FutureTask的get()方法,之前我們看源碼時又看到,這個get方法來自于Future接口的V get()方法

簡單看下如何使用一個有返回值的多線程操作

public class CallableTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Callable<Integer> callable = new Task();
        FutureTask task = new FutureTask(callable);

        Thread oneThread = new Thread(task);
        oneThread.start();

        System.out.println(">>>  工作結(jié)果 " + task.get().toString());
    }

}

class Task implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        System.out.println(">>>  線程開始工作");
        Thread.sleep(1000);
        System.out.println(">>>  結(jié)束工作開始返回");
        return 10;
    }

}

可以看到FutureTask依然調(diào)用的是Thread,走的是本地方法start0。
Runbale就沒什么好說的了,實現(xiàn)一個接口,放到Thread里面去執(zhí)行,基本沒什么東西,能力與Thread差不多,區(qū)別是實現(xiàn)Runnable接口的類必須依托于Thread類才能啟動,

//使用這個構(gòu)造方法
public Thread(Runnable target) {
    this(null, target, "Thread-" + nextThreadNum(), 0);
}

然后用Thread的start方法,需要注意的是,千萬不要調(diào)run方法, 要用start。

最后看下ExecutorService這個類,ExecutorService級別很高,他的爸爸直接就是Executor。
他的兒子,是AbstractExecutorService,這里實現(xiàn)了submit,doInvokeAny等方法。
而我們調(diào)用Executors.newFixedThreadPool(poolSize);返回的是ThreadPoolExecutor


注:一般不建議使用Executors.newFixedThreadPool(poolSize);,什么東西全是默認,建議如下方式:

ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("test-%d").build();
ExecutorService service = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS,
         new LinkedBlockingDeque<>(1024), factory, new ThreadPoolExecutor.AbortPolicy());

具體參數(shù)含義就自行百度吧,很多人講的。

ThreadPoolExecutor這個類,又是AbstractExecutorService的兒子,所以這個關(guān)系就很明顯了
ThreadPoolExecutor -> AbstractExecutorService -> ExecutorService -> Executor

看到一堆submit方法,然而沒什么用,真正關(guān)鍵的方法,是execute方法,在ThreadPoolExecutor中實現(xiàn)。

這個類有點意思,一上來就給我一個下馬威
private final AtomicInteger ctl = new AtlmicInteger(ctlOf(RUNNING, 0));

這個ctl到底是什么?
查了很久,在cnblogs里找到一個大神的描述 -- Okevin

這個變量使用來干嘛的呢?它的作用有點類似我們在《7.ReadWriteLock接口及其實現(xiàn)ReentrantReadWriteLock》中提到的讀寫鎖有讀、寫兩個同步狀態(tài),而AQS則只提供了state一個int型變量,此時將state高16位表示為讀狀態(tài),低16位表示為寫狀態(tài)。這里的clt同樣也是,它表示了兩個概念:

workerCount:當前有效的線程數(shù)
runState:當前線程池的五種狀態(tài),Running、Shutdown、Stop、Tidying、Terminate。
  int型變量一共有32位,線程池的五種狀態(tài)runState至少需要3位來表示,故workCount只能有29位,所以代碼中規(guī)定線程池的有效線程數(shù)最多為2^29-1。

看到這先來聊一下線程提交任務的規(guī)則,--《java并發(fā)編程藝術(shù)》

  1. 首先會判斷核心線程池里是否有線程可執(zhí)行,有空閑線程則創(chuàng)建一個線程來執(zhí)行任務。
  2. 當核心線程池里已經(jīng)沒有線程可執(zhí)行的時候,此時將任務丟到任務隊列中去。
  3. 如果任務隊列(有界)也已經(jīng)滿了的話,但運行的線程數(shù)小于最大線程池的數(shù)量的時候,此時將會新建一個線程用于執(zhí)行任務,但如果運行的線程數(shù)已經(jīng)達到最大線程池的數(shù)量的時候,此時將無法創(chuàng)建線程執(zhí)行任務。
      所以實際上對于線程池不僅是單純地將任務丟到線程池,線程池中有線程就執(zhí)行任務,沒線程就等待。

最后附上大神對execute的注解

/**
  * corePoolSize:核心線程池的線程數(shù)量
  * 
  * maximumPoolSize:最大的線程池線程數(shù)量
  * 
  * keepAliveTime:線程活動保持時間,線程池的工作線程空閑后,保持存活的時間。
  * 
  * unit:線程活動保持時間的單位。
  * 
  * workQueue:指定任務隊列所使用的阻塞隊列
*/
//ThreadPoolExecutor#execute
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
   //由它可以獲取到當前有效的線程數(shù)和線程池的狀態(tài)
   /*1.獲取當前正在運行線程數(shù)是否小于核心線程池,是則新創(chuàng)建一個線程執(zhí)行任務,否則將任務放到任務隊列中*/
   int c = ctl.get();
    if (workerCountOf(c) < corePoolSize){
        if (addWorker(command, tre))     //在addWorker中創(chuàng)建工作線程執(zhí)行任務
            return ;
        c = ctl.get();
    }
    /*2.當前核心線程池中全部線程都在運行workerCountOf(c) >= corePoolSize,所以此時將線程放到任務隊列中*/
    //線程池是否處于運行狀態(tài),且是否任務插入任務隊列成功
    if (isRunning(c) && workQueue.offer(command))    {
        int recheck = ctl.get();
     if (!isRunning(recheck) && remove(command))//線程池是否處于運行狀態(tài),如果不是則使剛剛的任務出隊
       reject(command);//拋出RejectedExceptionException異常
     else if (workerCountOf(recheck) == 0)
       addWorker(null, false);
  }
    /*3.插入隊列不成功,且當前線程數(shù)數(shù)量小于最大線程池數(shù)量,此時則創(chuàng)建新線程執(zhí)行任務,創(chuàng)建失敗拋出異常*/
  else if (!addWorker(command, false)){
    reject(command);    //拋出RejectedExceptionException異常
  }
}
//ThreadPoolExecutor#addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
/*首先會再次檢查線程池是否處于運行狀態(tài),核心線程池中是否還有空閑線程,都滿足條件過后則會調(diào)用compareAndIncrementWorkerCount先將正在運行的線程數(shù)+1,數(shù)量自增成功則跳出循環(huán),自增失敗則繼續(xù)從頭繼續(xù)循環(huán)*/
  ...
  if (compareAndIncrementWorkerCount(c))
    break retry;
  ...
  /*正在運行的線程數(shù)自增成功后則將線程封裝成工作線程Worker*/
  boolean workerStarted = false;
  boolean workerAdded = false;
  Worker w = null;
  try {
    final ReentrantLock mainLock = this.mainLock;//全局鎖
    w = new Woker(firstTask);//將線程封裝為Worker工作線程
    final Thread t = w.thread;
    if (t != null) {
      //獲取全局鎖
      mainLock.lock();
      /*當持有了全局鎖的時候,還需要再次檢查線程池的運行狀態(tài)等*/
      try {
        int c = clt.get();
        int rs = runStateOf(c);        //線程池運行狀態(tài)
        if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)){          //線程池處于運行狀態(tài),或者線程池關(guān)閉且任務線程為空
          if (t.isAlive())    //線程處于活躍狀態(tài),即線程已經(jīng)開始執(zhí)行或者還未死亡,正確的應線程在這里應該是還未開始執(zhí)行的
            throw new IllegalThreadStateException();
          //private final HashSet<Worker> wokers = new HashSet<Worker>();
          //包含線程池中所有的工作線程,只有在獲取了全局的時候才能訪問它。將新構(gòu)造的工作線程加入到工作線程集合中
          workers.add(w);
          int s = worker.size();    //工作線程數(shù)量
          if (s > largestPoolSize)
            largestPoolSize = s;
          workerAdded = true;    //新構(gòu)造的工作線程加入成功
        }
      } finally {
        mainLock.unlock();
      }
      if (workerAdded) {
        //在被構(gòu)造為Worker工作線程,且被加入到工作線程集合中后,執(zhí)行線程任務
        //注意這里的start實際上執(zhí)行Worker中run方法,所以接下來分析Worker的run方法
        t.start();
        workerStarted = true;
      }
    }
  } finally {
    if (!workerStarted)   //未能成功創(chuàng)建執(zhí)行工作線程
      //在啟動工作線程失敗后,將工作線程從集合中移除
      addWorkerFailed(w);    
  }
  return workerStarted;
}
//ThreadPoolExecutor$Worker,它繼承了AQS,同時實現(xiàn)了Runnable,所以它具備了這兩者的所有特性
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  final Thread thread;
  Runnable firstTask;
  public Worker(Runnable firstTask) {
    //設(shè)置AQS的同步狀態(tài)為-1,禁止中斷,直到調(diào)用runWorker
    setState(-1);    
    this.firstTask = firstTask;
    //通過線程工廠來創(chuàng)建一個線程,將自身作為Runnable傳遞傳遞
    this.thread = getThreadFactory().newThread(this);    
  }
  public void run() {
    runWorker(this);    //運行工作線程
  }
}

Okevin博客

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容