多線程(五)——線程池原理剖析&鎖的深度化

什么是線程池

Java中的線程池是運用場景最多的并發(fā)框架,幾乎所有需要異步或并發(fā)執(zhí)行任務(wù)的程序都可以使用線程池。在開發(fā)過程中,合理地使用線程池能夠帶來3個好處。
第一:降低資源消耗。通過重復利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
第二:提高響應(yīng)速度。當任務(wù)到達時,任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。
第三:提高線程的可管理性。線程是稀缺資源,如果無限制地創(chuàng)建,不僅會消耗系統(tǒng)資源,
還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控。但是,要做到合理利用
線程池,必須對其實現(xiàn)原理了如指掌。

線程池的作用

線程池是為突然大量爆發(fā)的線程設(shè)計的,通過有限的幾個固定線程為大量的操作服務(wù),減少了創(chuàng)建和銷毀線程所需的時間,從而提高效率。
如果一個線程的時間非常長,就沒必要用線程池了(不是不能作長時間操作,而是不宜。),況且我們還不能控制線程池中線程的開始、掛起、和中止。

線程池的分類

ThreadPoolExecutor
Java是天生就支持并發(fā)的語言,支持并發(fā)意味著多線程,線程的頻繁創(chuàng)建在高并發(fā)及大數(shù)據(jù)量是非常消耗資源的,因為java提供了線程池,在jdk1.5以前的版本中,線程池的使用是及其簡陋的,但是在JDK1.5后,有了很大的改善。JDK1.5之后加入了java.util.concurrent包,java.util.concurrent包的加入給予開發(fā)人員開發(fā)并發(fā)程序以及解決并發(fā)問題很大的幫助。
這里主要介紹下并發(fā)包下的Executor接口,Executor接口雖然作為一個非常舊的接口(JDK1.5 2004年發(fā)布),但是很多程序員對于其中的一些原理還是不熟悉。
Executor框架的最頂層實現(xiàn)是ThreadPoolExecutor類,Executors工廠類中提供的newScheduledThreadPool、newFixedThreadPool、newCachedThreadPool方法其實也只是ThreadPoolExecutor的構(gòu)造函數(shù)參數(shù)不同而已。通過傳入不同的參數(shù),就可以構(gòu)造出適用于不同應(yīng)用場景下的線程池。

傳入?yún)?shù):
corePoolSize: 核心池的大小。 當有任務(wù)來之后,就會創(chuàng)建一個線程去執(zhí)行任務(wù),當線程池中的線程數(shù)目達到corePoolSize后,就會把到達的任務(wù)放到緩存隊列當中。
maximumPoolSize: 線程池最大線程數(shù),它表示在線程池中最多能創(chuàng)建多少個線程。
keepAliveTime: 表示線程沒有任務(wù)執(zhí)行時最多保持多久時間會終止。
unit: 參數(shù)keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態(tài)屬性

線程池四種創(chuàng)建方式

Java通過Executors(jdk1.5并發(fā)包)提供四種線程池,分別為:
newCachedThreadPool:創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
newFixedThreadPool :創(chuàng)建一個定長線程池,可控制線程最大并發(fā)數(shù),超出的線程會在隊列中等待。
newScheduledThreadPool :創(chuàng)建一個定時線程池,支持定時及周期性任務(wù)執(zhí)行。
newSingleThreadExecutor: 創(chuàng)建一個單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行。

newCachedThreadPool

創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。

  • 示例代碼如下
public class Demo01 {
  public static void main(String[] args) {
    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    
    for (int i = 1; i <= 13; i++) {
        final int  temp = i;
        cachedThreadPool.execute(new Runnable() {               
            public void run() {
                System.out.println("ThreadName:"+Thread.currentThread().getName()+",i:"+temp);                  
            }
        });     
      }
  }
}
  • 運行結(jié)果
ThreadName:pool-1-thread-2,i:2
ThreadName:pool-1-thread-3,i:3
ThreadName:pool-1-thread-1,i:1
ThreadName:pool-1-thread-4,i:4
ThreadName:pool-1-thread-1,i:13
ThreadName:pool-1-thread-3,i:12
ThreadName:pool-1-thread-5,i:5
ThreadName:pool-1-thread-6,i:6
ThreadName:pool-1-thread-7,i:7
ThreadName:pool-1-thread-8,i:8
ThreadName:pool-1-thread-9,i:9
ThreadName:pool-1-thread-10,i:10
ThreadName:pool-1-thread-11,i:11

可見,并沒有創(chuàng)建13個線程,而是重復利用了緩存。
總結(jié): 線程池為無限大,當執(zhí)行第二個任務(wù)時第一個任務(wù)已經(jīng)完成,會復用執(zhí)行第一個任務(wù)的線程,而不用每次新建線程。

newFixedThreadPool

創(chuàng)建一個定長線程池,可控制線程最大并發(fā)數(shù),超出的線程會在隊列中等待。

  • 示例代碼如下:
public class Demo01 {
public static void main(String[] args) {
    ExecutorService threadPool = Executors.newFixedThreadPool(3);
    for (int i = 1; i <= 13; i++) {
        final int  temp = i;
        threadPool.execute(new Runnable() {             
            public void run() {
                System.out.println("ThreadName:"+Thread.currentThread().getName()+",i:"+temp);                  
            }
        });     
      }
  }
}
  • 運行結(jié)果
ThreadName:pool-1-thread-1,i:1
ThreadName:pool-1-thread-1,i:4
ThreadName:pool-1-thread-2,i:2
ThreadName:pool-1-thread-1,i:5
ThreadName:pool-1-thread-2,i:6
ThreadName:pool-1-thread-1,i:7
ThreadName:pool-1-thread-2,i:8
ThreadName:pool-1-thread-2,i:10
ThreadName:pool-1-thread-2,i:11
ThreadName:pool-1-thread-2,i:12
ThreadName:pool-1-thread-2,i:13
ThreadName:pool-1-thread-1,i:9
ThreadName:pool-1-thread-3,i:3

總結(jié):因為線程池大小為3,每個任務(wù)輸出index后sleep 2秒,所以每兩秒打印3個數(shù)字。定長線程池的大小最好根據(jù)系統(tǒng)資源進行設(shè)置。如Runtime.getRuntime().availableProcessors();

newScheduledThreadPool

創(chuàng)建一個定長線程池,支持定時及周期性任務(wù)執(zhí)行。

  • 延遲執(zhí)行示例代碼如下:
public class Demo01 {
  public static void main(String[] args)        
    ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(3);
    for (int i = 1; i <= 10; i++) {
        final int  temp = i;
        threadPool.schedule(new Runnable() {                
            public void run() {
                System.out.println("ThreadName:"+Thread.currentThread().getName()+",i:"+temp);                  
            }
        },3,TimeUnit.SECONDS);      
    }
   }
}
  • 運行結(jié)果
 ThreadName:pool-1-thread-3,i:2
 ThreadName:pool-1-thread-1,i:1
 ThreadName:pool-1-thread-3,i:4
 ThreadName:pool-1-thread-2,i:3
 ThreadName:pool-1-thread-3,i:6
 ThreadName:pool-1-thread-1,i:5
 ThreadName:pool-1-thread-3,i:8
 ThreadName:pool-1-thread-3,i:10
 ThreadName:pool-1-thread-1,i:9
 ThreadName:pool-1-thread-2,i:7

表示延遲3秒執(zhí)行

newSingleThreadExecutor

創(chuàng)建一個單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行。

  • 示例代碼如下:
public class Demo01 {
  public static void main(String[] args)        
    ExecutorService threadExecutor = Executors.newSingleThreadExecutor();
    for (int i = 1; i <= 10; i++) {
        final int  temp = i;
        threadExecutor.execute(new Runnable() {             
            public void run() {
                System.out.println("ThreadName:"+Thread.currentThread().getName()+",i:"+temp);                  
            }
        });     
      }
   }
}
  • 運行結(jié)果
ThreadName:pool-1-thread-1,i:1
ThreadName:pool-1-thread-1,i:2
ThreadName:pool-1-thread-1,i:3
ThreadName:pool-1-thread-1,i:4
ThreadName:pool-1-thread-1,i:5
ThreadName:pool-1-thread-1,i:6
ThreadName:pool-1-thread-1,i:7
ThreadName:pool-1-thread-1,i:8
ThreadName:pool-1-thread-1,i:9
ThreadName:pool-1-thread-1,i:10

注意: 結(jié)果依次輸出,相當于順序執(zhí)行各個任務(wù)。

線程池原理剖析

提交一個任務(wù)到線程池中,線程池的處理流程如下:
1、判斷線程池里的核心線程是否都在執(zhí)行任務(wù),如果不是(核心線程空閑或者還有核心線程沒有被創(chuàng)建)則創(chuàng)建一個新的工作線程來執(zhí)行任務(wù)。如果核心線程都在執(zhí)行任務(wù),則進入下個流程。
2、線程池判斷工作隊列是否已滿,如果工作隊列沒有滿,則將新提交的任務(wù)存儲在這個工作隊列里。如果工作隊列滿了,則進入下個流程。
3、判斷線程池里的線程是否都處于工作狀態(tài),如果沒有,則創(chuàng)建一個新的工作線程來執(zhí)行任務(wù)。如果已經(jīng)滿了,則交給飽和策略來處理這個任務(wù)。


合理配置線程池

要想合理的配置線程池,就必須首先分析任務(wù)特性,可以從以下幾個角度來進行分析:
任務(wù)的性質(zhì):CPU密集型任務(wù),IO密集型任務(wù)和混合型任務(wù)。
任務(wù)的優(yōu)先級:高,中和低。
任務(wù)的執(zhí)行時間:長,中和短。
任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接。

任務(wù)性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理。CPU密集型任務(wù)配置盡可能少的線程數(shù)量,如配置Ncpu+1個線程的線程池。IO密集型任務(wù)則由于需要等待IO操作,線程并不是一直在執(zhí)行任務(wù),則配置盡可能多的線程,如2*Ncpu?;旌闲偷娜蝿?wù),如果可以拆分,則將其拆分成一個CPU密集型任務(wù)和一個IO密集型任務(wù),只要這兩個任務(wù)執(zhí)行的時間相差不是太大,那么分解后執(zhí)行的吞吐率要高于串行執(zhí)行的吞吐率,如果這兩個任務(wù)執(zhí)行時間相差太大,則沒必要進行分解。我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當前設(shè)備的CPU個數(shù)。
優(yōu)先級不同的任務(wù)可以使用優(yōu)先級隊列PriorityBlockingQueue來處理。它可以讓優(yōu)先級高的任務(wù)先得到執(zhí)行,需要注意的是如果一直有優(yōu)先級高的任務(wù)提交到隊列里,那么優(yōu)先級低的任務(wù)可能永遠不能執(zhí)行。
執(zhí)行時間不同的任務(wù)可以交給不同規(guī)模的線程池來處理,或者也可以使用優(yōu)先級隊列,讓執(zhí)行時間短的任務(wù)先執(zhí)行。
依賴數(shù)據(jù)庫連接池的任務(wù),因為線程提交SQL后需要等待數(shù)據(jù)庫返回結(jié)果,如果等待的時間越長CPU空閑時間就越長,那么線程數(shù)應(yīng)該設(shè)置越大,這樣才能更好的利用CPU。

總結(jié)
CPU密集型時,任務(wù)可以少配置線程數(shù),大概和機器的cpu核數(shù)相當,這樣可以使得每個線程都在執(zhí)行任務(wù)
IO密集型時,大部分線程都阻塞,故需要多配置線程數(shù),2*cpu核數(shù)
操作系統(tǒng)之名稱解釋:
某些進程花費了絕大多數(shù)時間在計算上,而其他則在等待I/O上花費了大多是時間,
前者稱為計算密集型(CPU密集型)computer-bound,后者稱為I/O密集型,I/O-bound。

悲觀鎖、樂觀鎖、排他鎖

場景
當多個請求同時操作數(shù)據(jù)庫時,首先將訂單狀態(tài)改為已支付,在金額加上200,在同時并發(fā)場景查詢條件下,會造成重復通知。
SQL:


悲觀鎖與樂觀鎖

悲觀鎖:
悲觀鎖悲觀的認為每一次操作都會造成更新丟失問題,在每次查詢時加上排他鎖。
每次去拿數(shù)據(jù)的時候都認為別人會修改,所以每次在拿數(shù)據(jù)的時候都會上鎖,這樣別人想拿這個數(shù)據(jù)就會block直到它拿到鎖。傳統(tǒng)的關(guān)系型數(shù)據(jù)庫里邊就用到了很多這種鎖機制,比如行鎖,表鎖等,讀鎖,寫鎖等,都是在做操作之前先上鎖。
核心SQL代碼
select * from xxx for update;
例如:

樂觀鎖:
總是認為不會產(chǎn)生并發(fā)問題,每次去取數(shù)據(jù)的時候總認為不會有其他線程對數(shù)據(jù)進行修改,因此不會上鎖,但是在更新時會判斷其他線程在這之前有沒有對數(shù)據(jù)進行修改,一般會使用版本號機制或CAS操作實現(xiàn)。

  • version方式:
    一般是在數(shù)據(jù)表中加上一個數(shù)據(jù)版本號version字段,表示數(shù)據(jù)被修改的次數(shù),當數(shù)據(jù)被修改時,version值會加一。當線程A要更新數(shù)據(jù)值時,在讀取數(shù)據(jù)的同時也會讀取version值,在提交更新時,若剛才讀取到的version值為當前數(shù)據(jù)庫中的version值相等時才更新,否則重試更新操作,直到更新成功。
    核心SQL代碼
    update table set x=x+1, version=version+1 where id=#{id} and version=#{version};
  • CAS操作方式:
    即compare and swap 或者 compare and set,涉及到三個操作數(shù),數(shù)據(jù)所在的內(nèi)存值,預期值,新值。當需要更新時,判斷當前內(nèi)存值與之前取到的值是否相等,若相等,則用新值更新,若失敗則重試,一般情況下是一個自旋操作,即不斷的重試。

可重入鎖

鎖作為并發(fā)共享數(shù)據(jù),保證一致性的工具,在JAVA平臺有多種實現(xiàn)(如 synchronized 和 ReentrantLock等等 ) 。這些已經(jīng)寫好提供的鎖為我們開發(fā)提供了便利。
重入鎖,也叫做遞歸鎖,指的是同一線程外層函數(shù)獲得鎖之后,內(nèi)層遞歸函數(shù)仍然有獲取該鎖的代碼,但不受影響。
在JAVA環(huán)境下ReentrantLock和synchronized都是可重入鎖。

  • 代碼示例
public  synchronized void get() {
    System.out.println("name:" + Thread.currentThread().getName() + " get();");
    set();
}

public synchronized  void set() {
    System.out.println("name:" + Thread.currentThread().getName() + " set();");
}

讀寫鎖

相比Java中的鎖(Locks in Java)里Lock實現(xiàn),讀寫鎖更復雜一些。假設(shè)你的程序中涉及到對一些共享資源的讀和寫操作,且寫操作沒有讀操作那么頻繁。在沒有寫操作的時候,兩個線程同時讀一個資源沒有任何問題,所以應(yīng)該允許多個線程能在同時讀取共享資源。但是如果有一個線程想去寫這些共享資源,就不應(yīng)該再有其它線程對該資源進行讀或?qū)懀ㄗg者注:也就是說:讀-讀能共存,讀-寫不能共存,寫-寫不能共存)。這就需要一個讀/寫鎖來解決這個問題。Java5在java.util.concurrent包中已經(jīng)包含了讀寫鎖。盡管如此,我們還是應(yīng)該了解其實現(xiàn)背后的原理。

public class Cache {
  static Map<String, Object> map = new HashMap<String, Object>();
  static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
  static Lock r = rwl.readLock();
  static Lock w = rwl.writeLock();

  // 獲取一個key對應(yīng)的value
  public static final Object get(String key) {
    r.lock();
    try {
        System.out.println("正在做讀的操作,key:" + key + " 開始");
        Thread.sleep(100);
        Object object = map.get(key);
        System.out.println("正在做讀的操作,key:" + key + " 結(jié)束");
        System.out.println();
        return object;
    } catch (InterruptedException e) {

    } finally {
        r.unlock();
    }
    return key;
  }

  // 設(shè)置key對應(yīng)的value,并返回舊有的value
  public static final Object put(String key, Object value) {
    w.lock();
    try {
        System.out.println("正在做寫的操作,key:" + key + ",value:" + value + "開始.");
        Thread.sleep(100);
        Object object = map.put(key, value);
        System.out.println("正在做寫的操作,key:" + key + ",value:" + value + "結(jié)束.");
        System.out.println();
        return object;
    } catch (InterruptedException e) {

    } finally {
        w.unlock();
    }
    return value;
  }

  // 清空所有的內(nèi)容
  public static final void clear() {
    w.lock();
    try {
        map.clear();
    } finally {
        w.unlock();
    }
  }

  public static void main(String[] args) {
    new Thread(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                Cache.put(i + "", i + "");
            }

        }
    }).start();
    new Thread(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                Cache.get(i + "");
            }

        }
    }).start();
  }
}

CAS無鎖機制

(1)與鎖相比,使用比較交換(下文簡稱CAS)會使程序看起來更加復雜一些。但由于其非阻塞性,它對死鎖問題天生免疫,并且,線程間的相互影響也遠遠比基于鎖的方式要小。更為重要的是,使用無鎖的方式完全沒有鎖競爭帶來的系統(tǒng)開銷,也沒有線程間頻繁調(diào)度帶來的開銷,因此,它要比基于鎖的方式擁有更優(yōu)越的性能。
(2)無鎖的好處:
第一,在高并發(fā)的情況下,它比有鎖的程序擁有更好的性能;
第二,它天生就是死鎖免疫的。
就憑借這兩個優(yōu)勢,就值得我們冒險嘗試使用無鎖的并發(fā)。
(3)CAS算法的過程是這樣:它包含三個參數(shù)CAS(V,E,N): V表示要更新的變量,E表示預期值,N表示新值。僅當V值等于E值時,才會將V的值設(shè)為N,如果V值和E值不同,則說明已經(jīng)有其他線程做了更新,則當前線程什么都不做。最后,CAS返回當前V的真實值。
(4)CAS操作是抱著樂觀的態(tài)度進行的,它總是認為自己可以成功完成操作。當多個線程同時使用CAS操作一個變量時,只有一個會勝出,并成功更新,其余均會失敗。失敗的線程不會被掛起,僅是被告知失敗,并且允許再次嘗試,當然也允許失敗的線程放棄操作?;谶@樣的原理,CAS操作即使沒有鎖,也可以發(fā)現(xiàn)其他線程對當前線程的干擾,并進行恰當?shù)奶幚怼?br> (5)簡單地說,CAS需要你額外給出一個期望值,也就是你認為這個變量現(xiàn)在應(yīng)該是什么樣子的。如果變量不是你想象的那樣,那說明它已經(jīng)被別人修改過了。你就重新讀取,再次嘗試修改就好了。
(6)在硬件層面,大部分的現(xiàn)代處理器都已經(jīng)支持原子化的CAS指令。在JDK 5.0以后,虛擬機便可以使用這個指令來實現(xiàn)并發(fā)操作和并發(fā)數(shù)據(jù)結(jié)構(gòu),并且,這種操作在虛擬機中可以說是無處不在。

  • 原子類


  /** 
 * Atomically increments by one the current value. 
 * 
 * @return the updated value 
 */  
public final int incrementAndGet() {  
    for (;;) {  
        //獲取當前值  
        int current = get();  
        //設(shè)置期望值  
        int next = current + 1;  
        //調(diào)用Native方法compareAndSet,執(zhí)行CAS操作  
        if (compareAndSet(current, next))  
            //成功后才會返回期望值,否則無線循環(huán)  
            return next;  
    }  
}  

自旋鎖

自旋鎖是采用讓當前線程不停地的在循環(huán)體內(nèi)執(zhí)行實現(xiàn)的,當循環(huán)的條件被其他線程改變時 才能進入臨界區(qū)。

  • 如下
  private AtomicReference<Thread> sign =new AtomicReference<>();
  public void lock() {
    Thread current = Thread.currentThread();
    while (!sign.compareAndSet(null, current)) { }
  }
  public void unlock() {
    Thread current = Thread.currentThread();
    sign.compareAndSet(current, null);
  }
  • 示例代碼
public class Test implements Runnable {
  static int sum;
  private SpinLock lock;

  public Test(SpinLock lock) {
    this.lock = lock;
  }

  /**
   * @param args
   *  @throws InterruptedException
   */
  public static void main(String[] args) throws InterruptedException {
    SpinLock lock = new SpinLock();
    for (int i = 0; i < 100; i++) {
        Test test = new Test(lock);
        Thread t = new Thread(test);
        t.start();
    }

    Thread.currentThread().sleep(1000);
    System.out.println(sum);
  }

  @Override
  public void run() {
    this.lock.lock();
    this.lock.lock();
    sum++;
    this.lock.unlock();
    this.lock.unlock();
  }
}

當一個線程 調(diào)用這個不可重入的自旋鎖去加鎖的時候沒問題,當再次調(diào)用lock()的時候,因為自旋鎖的持有引用已經(jīng)不為空了,該線程對象會誤認為是別人的線程持有了自旋鎖使用了CAS原子操作,lock函數(shù)將owner設(shè)置為當前線程,并且預測原來的值為空。unlock函數(shù)將owner設(shè)置為null,并且預測值為當前線程。
當有第二個線程調(diào)用lock操作時由于owner值不為空,導致循環(huán)一直被執(zhí)行,直至第一個線程調(diào)用unlock函數(shù)將owner設(shè)置為null,第二個線程才能進入臨界區(qū)。
由于自旋鎖只是將當前線程不停地執(zhí)行循環(huán)體,不進行線程狀態(tài)的改變,所以響應(yīng)速度更快。但當線程數(shù)不停增加時,性能下降明顯,因為每個線程都需要執(zhí)行,占用CPU時間。如果線程競爭不激烈,并且保持鎖的時間段。適合使用自旋鎖。

分布式鎖

如果想在不同的jvm中保證數(shù)據(jù)同步,使用分布式鎖技術(shù)。
有數(shù)據(jù)庫實現(xiàn)、緩存實現(xiàn)、Zookeeper分布式鎖

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容