一 .volatile關鍵字
?volatile是java提供的輕量級的同步機制,主要有三個特性:保證可見性,禁止指令重排,不保證原子性。
JMM(Java 內存模型)
基本概念
?JMM本身是一種抽象的概念 并不真實存在,他描述的是一組定義或規(guī)范,通過這組規(guī)范規(guī)定了程序中的訪問方式
?JMM同步規(guī)定:
??1.線程解鎖前必須把共享變量的值刷回主內存
??2.線程加鎖前把主內存的值復制到自己的內存
??3.加鎖和解鎖必須是同一把鎖。
?由于 JVM 運行程序的實體是線程,而每個線程創(chuàng)建時 JVM 都會為其創(chuàng)建一個工作內存,工作內存是每個線程的私有數(shù)據(jù)區(qū)域,而 Java 內存模型中規(guī)定所有變量的儲存在主內存,主內存是共享內存區(qū)域,所有的線程都可以訪問,但線程對變量的操作(讀取賦值等)必須都工作內存進行看。
?首先要將變量從主內存拷貝的自己的工作內存空間,然后對變量進行操作,操作完成后再將變量寫回主內存,不能直接操作主內存中的變量,工作內存中存儲著主內存中的變量副本拷貝,前面說過,工作內存是每個線程的私有數(shù)據(jù)區(qū)域,因此不同的線程間無法訪問對方的工作內存,線程間的通信(傳值)必須通過主內存來完成。
1.可見性代碼示例
public class VolatileDemo {
static int v1;
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
v1 = 100;
}
}).start();
while (v1 == 0) {
}
System.out.println("v1 = " + v1);
}
}
?上面示例無法輸出結果,當主線程已經(jīng)讀到v1的值時,如果不加volatile關鍵字,另一個線程更改這個值不會去通知主線程。所以進入死循環(huán)。如果加上volatile關鍵字,則會刷新主線程的v1的值,打印之后結束主線程。
2.不保證原子性代碼示例
public class VolatileDemo {
static volatile int v1;
public static void add(){
v1++;
}
public static void main(String[] args) throws InterruptedException {
for(int i = 0;i< 20;i++) {
new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0;i < 1000; i++){
add();
}
}
}).start();
}
//當活動線程只有主線程和GC時才進行打印否則讓位其他線程
while (Thread.activeCount() > 2){
Thread.yield();
}
System.out.println("v1 = " + v1);
}
}
?i++并非原子操作,包含三個步驟:1·讀取i的值 ? 2·將i的值加一?3·將加一后的值寫回i
?發(fā)現(xiàn)上面示例打印的總是比20000小,說明volatile并不能保證原子性
3.禁止指令重排
?指令重排:一般情況下,CPU和編譯器為了提升程序執(zhí)行的效率,會按照一定的規(guī)則允許進行指令優(yōu)化,在某些情況下,這種優(yōu)化會帶來一些執(zhí)行的邏輯問題,主要的原因是代碼邏輯之間是存在一定的先后順序,在并發(fā)執(zhí)行情況下,會發(fā)生二義性,即按照不同的執(zhí)行邏輯,會得到不同的結果信息。
?volatile 實現(xiàn)禁止指令重排序的優(yōu)化,從而避免了多線程環(huán)境下程序出現(xiàn)亂序的現(xiàn)象
?先了解一個概念,內存屏障(Memory Barrier)又稱內存柵欄,是一個 CPU 指令,他的作用有兩個:
??1.保證特定操作的執(zhí)行順序
??2.保證某些變量的內存可見性(利用該特性實現(xiàn) volatile 的內存可見性)
?由于編譯器個處理器都能執(zhí)行指令重排序優(yōu)化,如果在指令間插入一條 Memory Barrier 則會告訴編譯器和 CPU,不管什么指令都不能個這條 Memory Barrier 指令重排序,也就是說通過插入內存屏障禁止在內存屏障前后執(zhí)行重排序優(yōu)化。內存屏障另一個作用是強制刷出各種 CPU 緩存數(shù)據(jù),因此任何 CPU 上的線程都能讀取到這些數(shù)據(jù)的最新版本。
volatile常見用法(雙端檢鎖單例)
public class Singleton {
private static volatile Singleton singleton;
private Singleton() {
}
public static Singleton getInstance(){
if(singleton == null){
synchronized (Singleton.class){
if(singleton == null){
singleton = new Singleton();
}
}
}
return singleton;
}
}
?如果不加volatile多線程環(huán)境下存在指令重排的風險,singleton = new Singleton(); 可以分解為三條指令
??1.分配內存地址
?? 2.初始化對象
?? 3.將內存地址指向初始化對象
由于指令重排只保證單線程下程序的執(zhí)行結果,編譯可以優(yōu)化為132的順序,這樣在getInstance方法調用時singleton == null。加上volatile可以避免這個問題。
二.CAS (CompareAndSwap)比較并交換
?CAS是一種無鎖編程,對比synchronized效率更高。CAS操作包含三個操作數(shù)——內存位置(V),預期原值(A)和新值(B)。如果內存位置的值與預期原值相匹配,那么處理器將會自動將該位置值更新為新值,否則,不做任何操作。無論哪種情況,它都會在CAS指令之前返回該位置的值。
通過以上定義我們知道CAS其實是有三個步驟的
??1.讀取內存中的值
??2.將讀取的值和預期的值比較
??3.如果比較的結果符合預期,則寫入新值
?CAS 體現(xiàn)在 JAVA 語言中就是 sun.misc.Unsafe 類中的各個方法。調用 UnSafe 類中的 CAS 方法,JVM 會幫我們實現(xiàn)出 CAS 匯編指令。這是一種完全依賴硬件的功能,通過它實現(xiàn)了原子操作。由于 CAS 是一種系統(tǒng)源語,源語屬于操作系統(tǒng)用語范疇,是由若干條指令組成,用于完成某一個功能的過程,并且原語的執(zhí)行必須是連續(xù)的,在執(zhí)行的過程中不允許被中斷,也就是說 CAS 是一條原子指令,不會造成所謂的數(shù)據(jù)不一致的問題。
UnSafe類
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
/**
* Creates a new AtomicInteger with the given initial value.
*
* @param initialValue the initial value
*/
public AtomicInteger(int initialValue) {
value = initialValue;
}
/**
* Creates a new AtomicInteger with initial value {@code 0}.
*/
public AtomicInteger() {
}
?Unsafe 是 CAS 的核心類,由于 Java 方法無法直接訪問底層系統(tǒng),而需要通過本地(native)方法來訪問, Unsafe 類相當一個后門,基于該類可以直接操作特定內存的數(shù)據(jù)。Unsafe 類存在于 sun.misc 包中,其內部方法操作可以像 C 指針一樣直接操作內存,因為 Java 中 CAS 操作執(zhí)行依賴于 Unsafe 類。
?變量 vauleOffset,表示該變量值在內存中的偏移量,因為 Unsafe 就是根據(jù)內存偏移量來獲取數(shù)據(jù)的。
?變量 value 用 volatile 修飾,保證了多線程之間的內存可見性。
AtomicInteger示例代碼
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicDemo {
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger(2019);
System.out.println(i.compareAndSet(2019, 2020)); //true
System.out.println(i); //2020
AtomicInteger n = new AtomicInteger();
System.out.println(n.compareAndSet(2019, 2020)); //false
System.out.println(n); // 0
}
}
除了JAVA提供的基本類型外還提供了AtuomicRefence 原子引用類,可以處理對象的原子類。
AtomicRefence示例代碼
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
public class AtomicDemo {
public static void main(String[] args) {
User user = new User("zhangsan", 18);
AtomicReference atomicReference = new AtomicReference(user);
atomicReference.compareAndSet(user,new User("lisi",20));
System.out.println(atomicReference.get());
}
}
class User {
private String name;
private int age;
public User(String name, int age) {
this.name = name;
this.age = age;
}
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}
CAS的缺點
?1.循環(huán)時間長開銷很大。如果 CAS 失敗,會一直嘗試,如果 CAS 長時間一直不成功,可能會給 CPU 帶來很大的開銷(比如線程數(shù)很多,每次比較都是失敗,就會一直循環(huán)),所以希望是線程數(shù)比較小的場景。
?2.只能對一個共享變量進行原子操作,多個變量的情況不可用。
?3.會出現(xiàn)ABA問題
ABA問題
?兩個線程修改共享變量,共享變量由A改為B,又由B改為A。此時另一個線程并不知道情況,以為共享變量的值沒有改變,將共享變量的值修改。
代碼示例
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicABADemo {
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger();
new Thread(new Runnable() {
@Override
public void run() {
i.compareAndSet(0,1);
System.out.println("i的值由0改為1");
i.compareAndSet(1,0);
System.out.println("i的值由1改為0");
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
i.compareAndSet(0,2019);
System.out.println("ABA問題出現(xiàn)——》i的值由0改為2019");
}
}).start();
}
}
解決方案
?java提供了一個帶有版本號的原子引用類AtomicStampedRefence,實際上就是一個樂觀鎖。
代碼示例
import java.util.concurrent.atomic.AtomicStampedReference;
public class AtomicDemo {
public static void main(String[] args) {
User user = new User("zhangsan",18);
// 傳入初始對象和版本號
AtomicStampedReference atomicStampedReference = new AtomicStampedReference(user,0);
User user1 = new User("lisi",20);
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(user,user1,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
System.out.println("版本號為" + atomicStampedReference.getStamp() + ",User對象信息為" + atomicStampedReference.getReference());
atomicStampedReference.compareAndSet(user1,user,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
System.out.println("版本號為" + atomicStampedReference.getStamp() + ",User對象信息為" + atomicStampedReference.getReference());
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
int stamp = atomicStampedReference.getStamp();
User user2 = new User("wangwu",28);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(user,user2,stamp,stamp++);
System.out.println("版本號為" + atomicStampedReference.getStamp() + ",User對象信息為" + atomicStampedReference.getReference());
}
}).start();
}
}
class User {
private String name;
private int age;
public User(String name, int age) {
this.name = name;
this.age = age;
}
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}
三 .Java中的鎖
1.公平鎖與非公平鎖
?公平鎖:是指多個線程按照申請的順序來獲取值
?非公平鎖:是值多個線程獲取值的順序并不是按照申請鎖的順序,有可能后申請的線程比先申請的線程優(yōu)先獲取鎖,在高并發(fā)的情況下,可能會造成優(yōu)先級翻轉或者饑餓現(xiàn)象
兩者區(qū)別
?公平鎖:在并發(fā)環(huán)境中,每一個線程在獲取鎖時會先查看此鎖維護的等待隊列,如果為空,或者當前線程是等待隊列的第一個就占有鎖,否者就會加入到等待隊列中,以后會按照 FIFO 的規(guī)則獲取鎖
?非公平鎖:一上來就嘗試占有鎖,如果失敗在進行排隊
代碼示例
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock的構造方法中可傳入一個boolean值,表示是否是公平鎖。默認為非公平鎖。 synchronized是一種非公平鎖。
2.可重入鎖(遞歸鎖)和不可重入鎖
?可重入鎖:指的是同一個線程外層函數(shù)獲得鎖之后,內層仍然能獲取到該鎖,在同一個線程在外層方法獲取鎖的時候,在進入內層方法或會自動獲取該鎖。
?不可重入鎖: 若當前線程執(zhí)行某個方法已經(jīng)獲取了該鎖,那么在方法中嘗試再次獲取鎖時,就會獲取不到被阻塞。
代碼示例
public class ReentrantLockDemo {
public static void main(String[] args) {
method1();
}
public static synchronized void method1(){
System.out.println("method1執(zhí)行");
method2();
}
public static synchronized void method2(){
System.out.println("method2執(zhí)行");
}
}
?在main方法中調用method1()發(fā)現(xiàn)兩個方法都執(zhí)行了,說明synchronized是可重入鎖。ReentrantLock也是可重入鎖。
3.自旋鎖 類似CAS
?嘗試獲取鎖的線程不會立即堵塞,而是采用循環(huán)的方式去嘗試獲取鎖,這樣的好處是減少線程上線文切換的消耗,缺點就是循環(huán)會消耗 CPU。
代碼示例
import java.util.concurrent.atomic.AtomicReference;
public class SpinLockTest {
public static void main(String[] args) {
SpinLock spinLock = new SpinLock();
new Thread(new Runnable() {
@Override
public void run() {
spinLock.lock();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("------------");
spinLock.unlock();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
spinLock.lock();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("++++++++++++");
spinLock.unlock();
}
}).start();
}
}
class SpinLock {
private AtomicReference<Thread> atomicReference = new AtomicReference();
public void lock() {
Thread thread = Thread.currentThread();
while (!atomicReference.compareAndSet(null, thread)) {
}
System.out.println(thread.getName() + "獲取鎖");
}
public void unlock() {
Thread thread = Thread.currentThread();
atomicReference.compareAndSet(thread, null);
System.out.println(thread.getName() + "釋放鎖");
}
}
/*
Thread-0獲取鎖
------------
Thread-0釋放鎖
Thread-1獲取鎖
++++++++++++
Thread-1釋放鎖
*/
輸出結果表示加鎖成功。獲取鎖的時候,如果原子引用為空就獲取鎖,不為空表示有人獲取了鎖,就循環(huán)等待。
4.獨占鎖與共享鎖(讀寫鎖)
?獨占鎖:指該鎖一次只能被一個線程持有
?共享鎖:該鎖可以被多個線程持有
?Java中的ReentrantLock和synchronized都是獨占鎖。ReentrantReadWriteLock中的ReadLock是共享鎖,WriteLock是獨占鎖。多個線程可以同時持有ReadLock,讀寫·寫讀·寫寫都是互斥的。
代碼示例
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReentrantReadWriteLockTest {
public static void main(String[] args) {
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
List list = new ArrayList();
for(int i = 0;i < 20 ; i++){
new Thread(new Runnable() {
@Override
public void run() {
readWriteLock.writeLock().lock();
System.out.println(Thread.currentThread() + "寫開始");
list.add(Math.round(Math.random()*100));
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + "寫結束");
readWriteLock.writeLock().unlock();
}
}).start();
}
for(int i = 0;i < 20 ; i++){
new Thread(new Runnable() {
@Override
public void run() {
readWriteLock.readLock().lock();
System.out.println(Thread.currentThread() + "讀開始");
System.out.println(list);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + "讀結束");
readWriteLock.readLock().unlock();
}
}).start();
}
}
}
5.synchronized 和 Lock
原始結構
??synchronized 是關鍵字屬于 JVM 層面,反應在字節(jié)碼上是 monitorenter 和 monitorexit,其底層是通過 monitor 對象來完成,其實 wait/notify 等方法也是依賴 monitor 對象只有在同步快或方法中才能調用 wait/notify 等方法。
??Lock 是具體類(java.util.concurrent.locks.Lock)是 api 層面的鎖。
使用方法
??synchronized 不需要用戶手動去釋放鎖,當 synchronized 代碼執(zhí)行完后系統(tǒng)會自動讓線程釋放對鎖的占用。
??ReentrantLock 則需要用戶手動的釋放鎖,若沒有主動釋放鎖,可能導致出現(xiàn)死鎖的現(xiàn)象,lock() 和 unlock() 方法需要配合 try/finally 語句來完成。
等待是否可中斷
??synchronized 不可中斷,除非拋出異?;蛘哒_\行完成。
??ReentrantLock 可中斷,設置超時方法 tryLock(long timeout, TimeUnit unit),lockInterruptibly() 放代碼塊中,調用 interrupt() 方法可中斷。
加鎖是否公平
??synchronized 非公平鎖
??ReentrantLock 默認非公平鎖,構造方法中可以傳入 boolean 值,true 為公平鎖,false 為非公平鎖。
鎖可以綁定多個 Condition
??synchronized 沒有 Condition。
??ReentrantLock 用來實現(xiàn)分組喚醒需要喚醒的線程們,可以精確喚醒,而不是像 synchronized 要么隨機喚醒一個線程要么喚醒全部線程。
| 類別 | synchronized | Lock |
|---|---|---|
| 存在層次 | Java的關鍵字,在jvm層面上 | 是一個類 |
| 鎖的釋放 | 1、以獲取鎖的線程執(zhí)行完同步代碼,釋放鎖 2、線程執(zhí)行發(fā)生異常,jvm會讓線程釋放鎖 | 在finally中必須釋放鎖,不然容易造成線程死鎖 |
| 鎖的獲取 | 假設A線程獲得鎖,B線程等待。如果A線程阻塞,B線程會一直等待 | 分情況而定,Lock有多個鎖獲取的方式,具體下面會說道,大致就是可以嘗試獲得鎖,線程可以不用一直等待 |
| 鎖狀態(tài) | 無法判斷 | 可以判斷 |
| 鎖類型 | 可重入 不可中斷 非公平 | 可重入 可判斷 可公平(兩者皆可) |
| 性能 | 少量同步 | 大量同步 |
JDK1.6以后,為了減少獲得鎖和釋放鎖所帶來的性能消耗,提高性能,引入了“輕量級鎖”和“偏向鎖”。官方更建議使用synchronized。詳情見Java中的偏向鎖,輕量級鎖, 重量級鎖解析
代碼示例:synchronized實現(xiàn)生產(chǎn)消費模型
import java.util.concurrent.TimeUnit;
public class ProdConsume_Synchronized {
private int count = 0;
public static final int FULL = 10;
private volatile boolean flag = true;
private Object lock;
public ProdConsume_Synchronized(Object lock) {
this.lock = lock;
}
public static void main(String[] args) {
Object lock = new Object();
ProdConsume_Synchronized prodConsume_synchronized = new ProdConsume_Synchronized(lock);
new Thread(new Runnable() {
@Override
public void run() {
try {
prodConsume_synchronized.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
prodConsume_synchronized.prod();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
prodConsume_synchronized.flag = false;
}
public void prod() throws InterruptedException {
while (flag) {
synchronized (lock) {
while (count == FULL) {
lock.wait();
}
count++;
long round = Math.round(Math.random() * 1000);
Thread.sleep(round);
System.out.println(round + "號商品生產(chǎn)完畢,還有" + count + "個商品");
lock.notifyAll();
}
}
}
public void consume() throws InterruptedException {
while (flag) {
synchronized (lock) {
while (count == 0) {
lock.wait();
}
count--;
Thread.sleep(500);
System.out.println("取走一個商品,還有" + count + "個商品");
lock.notifyAll();
}
}
}
}
輸出結果
643號商品生產(chǎn)完畢,還有1個商品
322號商品生產(chǎn)完畢,還有2個商品
819號商品生產(chǎn)完畢,還有3個商品
877號商品生產(chǎn)完畢,還有4個商品
112號商品生產(chǎn)完畢,還有5個商品
904號商品生產(chǎn)完畢,還有6個商品
978號商品生產(chǎn)完畢,還有7個商品
569號商品生產(chǎn)完畢,還有8個商品
949號商品生產(chǎn)完畢,還有9個商品
661號商品生產(chǎn)完畢,還有10個商品
取走一個商品,還有9個商品
取走一個商品,還有8個商品
取走一個商品,還有7個商品
取走一個商品,還有6個商品
取走一個商品,還有5個商品
取走一個商品,還有4個商品
取走一個商品,還有3個商品
124號商品生產(chǎn)完畢,還有4個商品
代碼示例:ReentrantLock實現(xiàn)生產(chǎn)消費模型
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProdConsume_ReentrantLock {
private int count = 0;
public static final int FULL = 10;
private volatile boolean flag = true;
private Lock lock = new ReentrantLock();
private Condition condition_prod = lock.newCondition();
private Condition condition_consume = lock.newCondition();
public static void main(String[] args) {
ProdConsume_ReentrantLock prodConsume_reentrantLock = new ProdConsume_ReentrantLock();
for (int i = 0; i < 3; i++) {
new Thread(new Runnable() {
@Override
public void run() {
prodConsume_reentrantLock.prod();
}
}).start();
}
for (int i = 0; i < 3; i++) {
new Thread(new Runnable() {
@Override
public void run() {
prodConsume_reentrantLock.consume();
}
}).start();
}
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
prodConsume_reentrantLock.flag = false;
}
public void prod() {
while (flag) {
try {
lock.lock();
try {
while (count == FULL) {
condition_prod.await();
}
count++;
long round = Math.round(Math.random() * 1000);
Thread.sleep(round);
System.out.println(round + "號商品生產(chǎn)完畢,還有" + count + "個商品");
condition_consume.signalAll();
} catch (Exception e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
}
public void consume() {
while (flag) {
try {
lock.lock();
try {
while (count == 0) {
condition_consume.await();
}
count--;
Thread.sleep(500);
System.out.println("取走一個商品,還有" + count + "個商品");
condition_prod.signalAll();
} catch (Exception e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
}
}
代碼示例:ReentrantLock實現(xiàn)精準喚醒
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class PrecisionWeakUp_ReentrantLock {
private final Lock lock = new ReentrantLock();
private Condition conditionA = lock.newCondition();
private Condition conditionB = lock.newCondition();
private Condition conditionC = lock.newCondition();
private int flag = 1;
// 多線程下先輸出五次A 再輸出五次B 再輸出五次C
public static void main(String[] args) {
PrecisionWeakUp_ReentrantLock precisionWeakUp_reentrantLock = new PrecisionWeakUp_ReentrantLock();
for(int i = 0;i < 3;i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
precisionWeakUp_reentrantLock.printA();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
for(int i = 0;i < 3;i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
precisionWeakUp_reentrantLock.printB();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
for(int i = 0;i < 3;i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
precisionWeakUp_reentrantLock.printC();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
public void printA() throws InterruptedException {
try {
lock.lock();
while (flag != 1) {
conditionA.await();
}
for (int i = 0; i < 5; i++) {
System.out.println("A");
}
flag = 2;
conditionB.signalAll();
}finally {
lock.unlock();
}
}
public void printB() throws InterruptedException {
try {
lock.lock();
while (flag != 2) {
conditionB.await();
}
for (int i = 0; i < 5; i++) {
System.out.println("B");
}
flag = 3;
conditionC.signalAll();
}finally {
lock.unlock();
}
}
public void printC() throws InterruptedException {
try {
lock.lock();
while (flag != 3) {
conditionC.await();
}
for (int i = 0; i < 5; i++) {
System.out.println("C");
}
flag = 1;
conditionA.signalAll();
}finally {
lock.unlock();
}
}
}
四 .JUC包的并發(fā)工具類
1.CountDownLatch
?CountDownLatch中count down是倒數(shù)的意思,latch則是門閂的含義。整體含義可以理解為倒數(shù)的門栓。在構造CountDownLatch的時候需要傳入一個整數(shù)n,在這個整數(shù)“倒數(shù)”到0之前,主線程需要等待在門口,而這個“倒數(shù)”過程則是由各個執(zhí)行線程驅動的,每個線程執(zhí)行完一個任務“倒數(shù)”一次??偨Y來說,CountDownLatch的作用就是等待其他的線程都執(zhí)行完任務,必要時可以對各個任務的執(zhí)行結果進行匯總,然后主線程才繼續(xù)往下執(zhí)行。
? CountDownLatch主要有兩個方法:countDown()和await()。countDown()方法用于使計數(shù)器減一,其一般是執(zhí)行任務的線程調用,await()方法則使調用該方法的線程處于等待狀態(tài),其一般是主線程調用。這里需要注意的是,countDown()方法并沒有規(guī)定一個線程只能調用一次,當同一個線程調用多次countDown()方法時,每次都會使計數(shù)器減一;另外,await()方法也并沒有規(guī)定只能有一個線程執(zhí)行該方法,如果多個線程同時執(zhí)行await()方法,那么這幾個線程都將處于等待狀態(tài),并且以共享模式享有同一個鎖。
代碼示例
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class CountDownLatchDemo {
public static void main(String[] args) {
List list = new ArrayList();
List synchronizedList = Collections.synchronizedList(list);
CountDownLatch countDownLatch = new CountDownLatch(5);
for(int i = 0;i < 5;i++){
new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
long round = Math.round(Math.random() * 100);
synchronizedList.add(round);
System.out.println(round + "添加進List");
countDownLatch.countDown();
}
}).start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(synchronizedList);
}
}
輸出結果
68添加進List
91添加進List
57添加進List
12添加進List
98添加進List
[68, 57, 12, 98, 91]
?CountDownLatch非常適合于對任務進行拆分,使其并行執(zhí)行,比如某個任務執(zhí)行2s,其對數(shù)據(jù)的請求可以分為五個部分,那么就可以將這個任務拆分為5個子任務,分別交由五個線程執(zhí)行,執(zhí)行完成之后再由主線程進行匯總,此時,總的執(zhí)行時間將決定于執(zhí)行最慢的任務,平均來看,還是大大減少了總的執(zhí)行時間。
2. CyclicBarrier
?CyclicBarrier同步屏障,可以讓一組線程達到一個屏障時被阻塞,直到最后一個線程達到屏障時,所以被阻塞的線程才能繼續(xù)執(zhí)行。
?CyclicBarrier好比一扇門,默認情況下關閉狀態(tài),堵住了線程執(zhí)行的道路,直到所有線程都就位,門才打開,讓所有線程一起通過。
CyclicBarrier的構造方法
?1.CyclicBarrier(int parties):創(chuàng)建一個新的 CyclicBarrier,它將在給定數(shù)量的參與者(線程)處于等待狀態(tài)時啟動,但它不會在啟動 barrier 時執(zhí)行預定義的操作。
?2.CyclicBarrier(int parties, Runnable barrierAction) :創(chuàng)建一個新的 CyclicBarrier,它將在給定數(shù)量的參與者(線程)處于等待狀態(tài)時啟動,并在啟動 barrier 時執(zhí)行給定的屏障操作,該操作由最后一個進入 barrier 的線程執(zhí)行。
代碼示例
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
//景區(qū)觀光車循環(huán)發(fā)車,每一輛車一個五個座位,坐滿發(fā)車
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("人員已到位,出發(fā)");
}
});
for(int i = 1;i <= 5;i++){
final int n = i;
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(n + "號游客上車");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}).start();
}
for(int i = 6;i <= 10;i++){
final int n = i;
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(n + "號游客上車");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
輸出結果
1號游客上車
3號游客上車
2號游客上車
5號游客上車
4號游客上車
人員已到位,出發(fā)
6號游客上車
7號游客上車
8號游客上車
9號游客上車
10號游客上車
人員已到位,出發(fā)
?每當線程執(zhí)行await,內部變量count減1,如果count!= 0,說明有線程還未到屏障處,則在鎖條件變量trip上等待。
?當count == 0時,說明所有線程都已經(jīng)到屏障處,執(zhí)行條件變量的signalAll方法喚醒等待的線程。
CountDownLatch與CyclicBarrier比較
| CountDownLatch | CyclicBarrier |
|---|---|
| 減計數(shù)方式 | 加計數(shù)方式 |
| 計算為0時釋放所有等待的線程 | 計數(shù)達到指定值時釋放所有等待線程 |
| 計數(shù)為0時,無法重置 | 計數(shù)達到指定值時,計數(shù)置為0重新開始 |
| 調用countDown()方法計數(shù)減一,調用await()方法只進行阻塞,對計數(shù)沒任何影響 | 調用await()方法計數(shù)加1,若加1后的值不等于構造方法的值,則線程阻塞 |
| 不可重復利用 | 可重復利用 |
3.Semaphore
?ReentrantLock和Synchronized一次都只允許一個線程訪問一個資源。Semaphore允許多個線程同時訪問同一個資源。
?Semaphore管理著一組許可(permit),許可的初始數(shù)量可以通過構造函數(shù)設定,操作時首先要獲取到許可,才能進行操作,操作完成后需要釋放許可。如果沒有獲取許可,則阻塞到有許可被釋放。如果初始化了一個許可為1的Semaphore,那么就相當于一個不可重入的互斥鎖。其中0、1就相當于它的狀態(tài),當=1時表示其他線程可以獲取,當=0時,排他,即其他線程必須要等待。
Semaphore的構造方法
?1.Semaphore(int permits) :創(chuàng)建具有給定的許可數(shù)和非公平的公平設置的 Semaphore,默認非公平鎖。
?2.Semaphore(int permits, boolean fair) :創(chuàng)建具有給定的許可數(shù)和給定的公平設置的 Semaphore。
代碼示例
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
public static void main(String[] args) {
// 六個車搶三個車位
Semaphore semaphore = new Semaphore(3);
for(int i = 1;i <= 6;i++){
final int n = i;
new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println(n + "號搶到車位——————————");
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println(n + "號離開車位++++++++++");
semaphore.release();
}
}
}).start();
}
}
}
輸出結果
1號搶到車位——————————
2號搶到車位——————————
3號搶到車位——————————
2號離開車位++++++++++
1號離開車位++++++++++
3號離開車位++++++++++
4號搶到車位——————————
6號搶到車位——————————
5號搶到車位——————————
6號離開車位++++++++++
4號離開車位++++++++++
5號離開車位++++++++++
?Semaphore在限制流量方面有非常多的應用,比如程序跑批高峰時幾萬個數(shù)據(jù)庫的連接同時操作,為了不影響其他用戶訪問只允許同時開放十條連接。
四.線程安全的集合類
?Java中的集合包括三大類,它們是Set、List和Map它們都處于java.util包中,Set、List和Map都是接口,它們有各自的實現(xiàn)類。
1.List(列表)
?實現(xiàn)類主要有ArrayList,LinkedList,Vector
?ArrayList,LinkedList為線程不安全的集合類,ArrayList底層是數(shù)組而LinkedList底層實現(xiàn)為鏈表。Vector和ArrayList類似,是長度可變的數(shù)組。Vector是線程安全的,它給幾乎所有的public方法都加上了synchronized關鍵字。由于加鎖導致性能降低,在不需要并發(fā)訪問同一對象時,這種強制性的同步機制就顯得多余,所以現(xiàn)在Vector已被棄用。
2.Set(集)
?實現(xiàn)類主要有HashSet,TreeSet
?HashSet是一個無序的集合,基于HashMap實現(xiàn);TreeSet是一個有序的集合,基于TreeMap實現(xiàn)。HashSet集合中允許有null元素,TreeSet集合中不允許有null元素。HashSet和TreeSet都是線程不安全的。
3.Map(映射)
?實現(xiàn)類主要有HashMap,TreeMap,HashTable
?HashTable和HashMap類似,不同點是HashTable是線程安全的,它給幾乎所有public方法都加上了synchronized關鍵字,還有一個不同點是HashTable的K,V都不能是null,但HashMap可以,它現(xiàn)在也因為性能原因被棄用。TreeMap也是線程不安全的。
4.除廢棄的集合類外還有哪些方法可以保證線程安全
?1.Collections包裝方法
??Collections工具類中提供了相應的包裝方法把它們包裝成線程安全的集合
List<E> synArrayList = Collections.synchronizedList(new ArrayList<E>());
Set<E> synHashSet = Collections.synchronizedSet(new HashSet<E>());
Map<K,V> synHashMap = Collections.synchronizedMap(new HashMap<K,V>());
?2.java.util.concurrent包中的集合
??CopyOnWriteArrayList和CopyOnWriteArraySet
?? ?CopyOnWriteArrayList 中的set、add、remove等方法,都使用了ReentrantLock的lock來加鎖, unlock來解鎖當增加元素的時候使用Arrays.copyOf()來拷貝副本,在副本上增加元素,然后改變原來引用的指向副本。讀操作不需要加鎖,因此,CopyOnWriteArrayList類是一個線程安全的List接口實現(xiàn),這對于讀操作遠遠多于寫操作的應用非常適合,特別是在并發(fā)的情況下,可以提供高性能的并發(fā)讀取,并保證讀取的內容一定是正確的,不受多線程并發(fā)問題的影響。
?? ConcurrentHashMap
? ? ? 1.8版本的ConcurrentHashMap拋棄了原有的 Segment 分段鎖,而采用了 CAS + synchronized 來保證并發(fā)安全性。詳情見HashMap? ConcurrentHashMap? 相信看完這篇沒人能難住你!
5.CopyOnWrite機制
??CopyOnWrite容器即寫是復制的容器。通俗的理解就是當我們往一個容器添加元素的時候,不直接往當前容器添加,而是先將當前容器進行Copy,復制出一個新的容器,然后新的容器里添加元素,添加完元素后,再將原容器的引用指向新的容器。這樣做的好處就是我們可以對CopyOnWrite容器進行并發(fā)的讀,而不需要加鎖,因為當前容器不會添加任何元素.所以,CopyOnWrite容器也是一種讀寫分離的思想。讀和寫不容的容器。
??ArrayList里添加元素,在添加的時候是需要加鎖的,否則多線程寫的時候會copy出多個副本出來
讀的時候不需要加鎖,如果讀的時候有多線程正在像ArrayList中添加數(shù)據(jù),還是會讀到舊的數(shù)據(jù),因為寫的時候不會鎖住舊的ArrayList
??1、使用場景:讀多寫少
??2、使用注意點:減少擴容開銷;b、使用批量添加
?缺點:
??a、內存占用問題
??b、數(shù)據(jù)一致性問題
五.BlockingQueue(阻塞隊列)
?在某些情況下對阻塞隊列的訪問可能會造成阻塞。被阻塞的情況主要有如下兩種:
??1、當阻塞隊列是空時,從隊列中獲取元素的操作將會被阻塞。
??2、當阻塞隊列是滿時,往隊列里添加元素的操作將會被阻塞。
?當一個線程對已經(jīng)滿了的阻塞隊列進行入隊操作時會阻塞,除非有另外一個線程進行了出隊操作,當一個線程對一個空的阻塞隊列進行出隊操作時也會阻塞,除非有另外一個線程進行了入隊操作。
| 操作 | 拋異常 ThrowsException | 特定值 SpecialValue | 阻塞 Blocks | 超時 TimesOut |
|---|---|---|---|---|
| 插入 | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) |
| 移除 | remove(o) | poll(o) | take(o) | poll(timeout, timeunit) |
| 檢查 | element(o) | peek(o) |
?這四類方法分別對應的是:
??1、ThrowsException :如果操作不能馬上進行,則拋出異常
??2、SpecialValue :如果操作不能馬上進行,將會返回一個特殊的值,一般是true或者false
??3、Blocks : 如果操作不能馬上進行,操作會被阻塞
??4、TimesOut : 如果操作不能馬上進行,操作會被阻塞指定的時間,如果指定時間沒執(zhí)行,則返回一個特殊值,一般是true或者false
?插入方法
??add(E e):添加成功返回true,失敗拋 IllegalStateException 異常
??offer(E e):成功返回 true,如果此隊列已滿,則返回 false
??put(E e):將元素插入此隊列的尾部,如果該隊列已滿,則一直阻塞
?刪除方法
??remove(Object o) :移除指定元素,成功返回true,失敗返回false
??poll():獲取并移除此隊列的頭元素,若隊列為空,則返回 null
??take():獲取并移除此隊列頭元素,若沒有元素則一直阻塞
?檢查方法
??element() :獲取但不移除此隊列的頭元素,沒有元素則拋異常
??peek() :獲取但不移除此隊列的頭;若隊列為空,則返回 null
BlockingQueue的七個實現(xiàn)類
??1、ArrayBlockingQueue :一個由數(shù)組結構組成的有界阻塞隊列。
??2、LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。
??3、PriorityBlockingQueue :一個支持優(yōu)先級排序的無界阻塞隊列。
??4、DelayQueue:一個使用優(yōu)先級隊列實現(xiàn)的無界阻塞隊列。
??5、SynchronousQueue:一個不存儲元素的阻塞隊列。
??6、LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
??7、LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。
??注意:第七個實現(xiàn)類的末尾為Deque
??詳細信息Java并發(fā)編程-阻塞隊列(BlockingQueue)的實現(xiàn)原理
代碼示例:BlockingQueue實現(xiàn)生產(chǎn)消費模型
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class ProdConsume_BlockingQueue {
private AtomicInteger atomicInteger = new AtomicInteger();
private BlockingQueue blockingQueue;
private volatile boolean flag = true;
public ProdConsume_BlockingQueue(BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}
public void prod() {
while (flag) {
try {
long round = Math.round(Math.random() * 1000);
Thread.sleep(round);
blockingQueue.put(round);
int i = atomicInteger.incrementAndGet();
System.out.println(round + "號商品生產(chǎn)完畢放入隊列,隊列中還有" + i + "個商品");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void consum() {
while (flag) {
try {
Thread.sleep(500);
Object take = blockingQueue.take();
atomicInteger.decrementAndGet();
System.out.println(take + "號商品被購買");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
BlockingQueue blockingQueue = new ArrayBlockingQueue(100);
ProdConsume_BlockingQueue prodConsume_blockingQueue = new ProdConsume_BlockingQueue(blockingQueue);
new Thread(new Runnable() {
@Override
public void run() {
prodConsume_blockingQueue.prod();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
prodConsume_blockingQueue.consum();
}
}).start();
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
prodConsume_blockingQueue.flag = false;
}
}
輸出結果
180號商品生產(chǎn)完畢放入隊列,隊列中還有1個商品
180號商品被購買
331號商品生產(chǎn)完畢放入隊列,隊列中還有1個商品
201號商品生產(chǎn)完畢放入隊列,隊列中還有2個商品
331號商品被購買
201號商品被購買
897號商品生產(chǎn)完畢放入隊列,隊列中還有1個商品
897號商品被購買
588號商品生產(chǎn)完畢放入隊列,隊列中還有1個商品
217號商品生產(chǎn)完畢放入隊列,隊列中還有2個商品
588號商品被購買
154號商品生產(chǎn)完畢放入隊列,隊列中還有2個商品
217號商品被購買
592號商品生產(chǎn)完畢放入隊列,隊列中還有2個商品
154號商品被購買
442號商品生產(chǎn)完畢放入隊列,隊列中還有2個商品
592號商品被購買
712號商品生產(chǎn)完畢放入隊列,隊列中還有2個商品
442號商品被購買
712號商品被購買
893號商品生產(chǎn)完畢放入隊列,隊列中還有1個商品
15號商品生產(chǎn)完畢放入隊列,隊列中還有2個商品
893號商品被購買
496號商品生產(chǎn)完畢放入隊列,隊列中還有2個商品
4號商品生產(chǎn)完畢放入隊列,隊列中還有3個商品
15號商品被購買
534號商品生產(chǎn)完畢放入隊列,隊列中還有3個商品
172號商品生產(chǎn)完畢放入隊列,隊列中還有4個商品
496號商品被購買
4號商品被購買
891號商品生產(chǎn)完畢放入隊列,隊列中還有3個商品
534號商品被購買
414號商品生產(chǎn)完畢放入隊列,隊列中還有3個商品
172號商品被購買
891號商品被購買
920號商品生產(chǎn)完畢放入隊列,隊列中還有2個商品
71號商品生產(chǎn)完畢放入隊列,隊列中還有3個商品
144號商品生產(chǎn)完畢放入隊列,隊列中還有4個商品
414號商品被購買
586號商品生產(chǎn)完畢放入隊列,隊列中還有4個商品
920號商品被購買
71號商品被購買
553號商品生產(chǎn)完畢放入隊列,隊列中還有3個商品
五.實現(xiàn)多線程的幾種方式
? 1. 繼承Thread類,重寫run方法
? ? 略
? 2. 實現(xiàn)Runnable接口,重寫run方法
? ? 略
? 3. 實現(xiàn)Callable接口,重寫call方法,通過FutureTask包裝器來創(chuàng)建Thread線程
? ? Runnable和Callable的區(qū)別:
? ??1、Callable規(guī)定的方法是call(),Runnable規(guī)定的方法是run().
? ??2、Callable的任務執(zhí)行后可返回值,而Runnable的任務是不能返回值得
? ??3、call方法可以拋出異常,run方法不可以
? ??4、運行Callable任務可以拿到一個Future對象,表示異步計算的結果。它提供了檢查計算是否完成的方法,以等待計算的完成,并檢索計算的結果。通過Future對象可以了解任務執(zhí)行情況,可取消任務的執(zhí)行,還可獲取執(zhí)行結果。
? ? Future接口
? ?? Future是一個接口,代表了一個異步計算的結果。接口中的方法用來檢查計算是否完成、等待完成和得到計算的結果。
? ??當計算完成后,只能通過get()方法得到結果,get方法會阻塞直到結果準備好了。
? ??如果想取消,那么調用cancel()方法。其他方法用于確定任務是正常完成還是取消了。一旦計算完成了,那么這個計算就不能被取消。
??FutureTask類
? ?? FutureTask類實現(xiàn)了RunnableFuture接口,而RunnnableFuture接口繼承了Runnable和Future接口,所以說FutureTask是一個提供異步計算的結果的任務。
? ??FutureTask可以用來包裝Callable或者Runnbale對象。因為FutureTask實現(xiàn)了Runnable接口,所以FutureTask也可以被提交給Executor。
??Callable兩種執(zhí)行方式
? ?1、借助FutureTask執(zhí)行
? ??FutureTask類同時實現(xiàn)了兩個接口,F(xiàn)uture和Runnable接口,所以它既可以作為Runnable被線程執(zhí)行,又可以作為Future得到Callable的返回值。
? ?代碼示例
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class CallableDemo {
public static void main(String[] args) {
FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("Callable");
Thread.sleep(1000);
return (int)(Math.random()*100);
}
});
futureTask.run();
//如果沒有執(zhí)行完一直阻塞
while (!futureTask.isDone()){
}
Integer integer = null;
try {
integer = futureTask.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println(integer);
}
}
? ?2、借助線程池來運行
? ????????↓
? 4. 線程池ThreadPoolExecuter
? ????????↓
六.線程池ThreadPoolExecutor
? 線程池主要是控制運行線程的數(shù)量,處理過程中將任務放入隊列,然后在線程創(chuàng)建后啟動這些任務,如果線程數(shù)量超過了最大數(shù)量的線程排隊等候,等其他線程執(zhí)行完畢,再從隊列中取出任務來執(zhí)行。
?主要特點是:線程復用、控制最大并發(fā)數(shù)、管理線程。
??1.降低資源消耗。通過重復利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
??2.提高響應速度。當任務到達時,任務可以不需要等到線程創(chuàng)建就能立即執(zhí)行。
??3.提高線程的可管理性。線程是稀缺資源,如果無限制的創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進行統(tǒng)一的分配,調優(yōu)和監(jiān)控。

ThreadPoolExecutor的構造方法

?7個參數(shù)的構造方法 代碼示例
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
| 序號 | 名稱 | 類型 | 含義 |
|---|---|---|---|
| 1 | corePoolSize | int | 線程池中的常駐核心線程數(shù) |
| 2 | maximumPoolSize | int | 線程池能夠容納同時執(zhí)行的最大線程數(shù) |
| 3 | keepAliveTime | long | 多余空閑線程的存活時間 |
| 4 | unit | TimeUnit | keepAliveTime的單位 |
| 5 | workQueue | BlockingQueue | 被提交但尚未被執(zhí)行的任務隊列 |
| 6 | threadFactory | ThreadFactory | 線程池中工作線程的線程工廠 |
| 7 | handler | RejectedExecutionHandler | 拒絕策略 |
??1. int corePoolSize:線程池中的常駐核心線程數(shù)
???核心線程:線程池新建線程的時候,如果當前線程總數(shù)小于corePoolSize,則新建的是核心線程,如果超過corePoolSize,則新建的是非核心線程核心線程默認情況下會一直存活在線程池中,即使這個核心線程是閑置狀態(tài)。
???如果指定ThreadPoolExecutor的allowCoreThreadTimeOut這個屬性為true,那么核心線程如果為閑置狀態(tài),超過一定時間(時長下面參數(shù)決定),就會被銷毀掉。
??2.int maximumPoolSize:線程池能夠容納同時執(zhí)行的最大線程數(shù)
???maximumPoolSize此值必須大于等于1.
???maximumPoolSize = corePoolSize + 非核心線程數(shù)(可緩沖的線程數(shù))。
??3.long keepAliveTime:多余空閑線程的存活時間
???當前線程池數(shù)量超過corePoolSize時,當空閑時間達到keepAliveTime值時,多余空閑線程會被銷毀直到只剩下corePoolSize個線程為止。
???默認情況下,只有當線程池中的線程數(shù)大于corePoolSize時keepAliveTime才會起作用,直到線程池中的線程數(shù)不大于corePoolSize。
??4.TimeUnit unit:keepAliveTime的單位
??5.BlockingQueue workQueue:被提交但尚未被執(zhí)行的任務隊列
???當所有的核心線程都在工作時,新添加的任務會被添加到這個隊列中等待處理,如果隊列滿了,則新建非核心線程執(zhí)行任務。
??6.ThreadFactory threadFactory:創(chuàng)建線程的方式。
???用于創(chuàng)建線程,一般用默認的即可。
??7.RejectedExecutionHandler handler:拒絕策略
???當提交任務數(shù)超過maxmumPoolSize+workQueue之和時,任務會交給RejectedExecutionHandler來處理。
jdk1.5提供的四種拒絕策略 :
?1.AbortPolicy(默認):直接拋出RejectedExecutionException異常阻止系統(tǒng)正常運行。
?2.CallerRunsPolicy:“調用者運行”一種調節(jié)機制,該策略既不會拋棄任務,也不會拋出異常,而是將某些任務回退到調用者,從而降低新任務的流量。
?3.DiscardOldestPolicy:拋棄隊列中等待最久的任務,然后把當前任務加入隊列中嘗試再次提交當前任務。
?4.DiscardPolicy:直接丟棄任務,不予任何處理也不拋出異常。如果允許任務丟失,這是最好的一種方案。
?以上內置拒絕策略均實現(xiàn)了RejectedExecutionHandler接口
線程池的處理流程

?線程池判斷核心線程池里是的線程是否都在執(zhí)行任務,如果不是(正在執(zhí)行的線程數(shù) 小于 corePoolSize),則創(chuàng)建一個新的工作線程來執(zhí)行任務。如果核心線程池里的線程都在執(zhí)行任務(正在執(zhí)行的線程數(shù) 大于 corePoolSize),則進入下一個流程
?線程池判斷工作隊列是否已滿。如果工作隊列沒有滿,則將新提交的任務儲存在這個工作隊列里。如果工作隊列滿了,則進入下一個流程。
?線程池判斷其內部線程是否都處于工作狀態(tài)。如果沒有正在(運行的線程數(shù)量小于maximumPoolSize),則創(chuàng)建一個新的工作線程來執(zhí)行任務。如果已滿了(運行的線程數(shù)量 大于 maximumPoolSize),則交給飽和策略來處理這個任務。
?當一個線程完成任務時,它會從隊列中取下一個任務來執(zhí)行。
?當一個線程無事可做超過一定的時間(keepAlilveTime)時,線程池會判斷:如果當前運行的線程數(shù)大于corePoolSize,那么這個線程就被停掉。線程池的所有任務完成后最終會收縮到corePoolSize的大小。
線程池執(zhí)行時的四種情況

??如果當前運行的線程少于corePoolSize,則創(chuàng)建新線程來執(zhí)行任務
??如果運行的線程等于或多于corePoolSize ,則將任務加入BlockingQueue
??如果無法將任務加入BlockingQueue(隊列已滿),則創(chuàng)建新的線程來處理任務
??如果創(chuàng)建新線程將使當前運行的線程超出maxiumPoolSize,任務將被拒絕,并調用RejectedExecutionHandler.rejectedExecution()方法。
Java提供的線程池
?Java在Executors工具類中提供了5種線程池
??1. SingleThreadExecutor 單一線程池
???它只會創(chuàng)建一條工作線程處理任務;
???采用的阻塞隊列為LinkedBlockingQueue;
??2.FixedThreadPool 定長線程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
???它是一種固定大小的線程池;
???corePoolSize和maximunPoolSize都為用戶設定的線程數(shù)量nThreads;
???keepAliveTime為0,意味著一旦有多余的空閑線程,就會被立即停止掉;但這里keepAliveTime無效;
???阻塞隊列采用了LinkedBlockingQueue,它是一個無界隊列;
???由于阻塞隊列是一個無界隊列,因此永遠不可能拒絕任務;
???由于采用了無界隊列,實際線程數(shù)量將永遠維持在nThreads,因此maximumPoolSize和keepAliveTime將無效。
??3. CachedThreadPool 可緩存線程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
???它是一個可以無限擴大的線程池;
???它比較適合處理執(zhí)行時間比較小的任務;
corePoolSize為0,maximumPoolSize為無限大,意味著線程數(shù)量可以無限大;
???keepAliveTime為60S,意味著線程空閑時間超過60S就會被殺死;
???采用SynchronousQueue裝等待的任務,這個阻塞隊列沒有存儲空間,這意味著只要有請求到來,就必須要找到一條工作線程處理他,如果當前沒有空閑的線程,那么就會再創(chuàng)建一條新的線程。
??4. ScheduledThreadPool 可調度的線程池
???它用來處理延時任務或定時任務。
???它接收SchduledFutureTask類型的任務,有兩種提交任務的方式:
????scheduledAtFixedRate
????scheduledWithFixedDelay
???SchduledFutureTask接收的參數(shù):
????time:任務開始的時間
????sequenceNumber:任務的序號
????period:任務執(zhí)行的時間間隔
???它采用DelayQueue存儲等待的任務
DelayQueue內部封裝了一個PriorityQueue,它會根據(jù)time的先后時間排序,若time相同則根據(jù)sequenceNumber排序;
???DelayQueue也是一個無界隊列;
???工作線程的執(zhí)行過程:
????工作線程會從DelayQueue取已經(jīng)到期的任務去執(zhí)行;
執(zhí)行結束后重新設置任務的到期時間,再次放回DelayQueue
??5. newWorkStealingPool Java8新增,使用可用的處理器作為它的并行級別
???待補充
生產(chǎn)上應該使用哪種線程池
?在阿里巴巴Java開發(fā)手冊并發(fā)處理章節(jié)中嚴禁使用Java提供的線程池,所以生產(chǎn)上只能使用自定義的線程池。
【強制】線程資源必須通過線程池提供,不允許在應用中自行顯式創(chuàng)建線程。 說明:使用線程池的好處是減少在創(chuàng)建和銷毀線程上所消耗的時間以及系統(tǒng)資源的開銷,解決資源不足的問題。如果不使用線程池,有可能造成系統(tǒng)創(chuàng)建大量同類線程而導致消耗完內存或者“過度切換”的問題。
【強制】線程池不允許使用Executors去創(chuàng)建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規(guī)則,規(guī)避資源耗盡的風險。說明:Executors返回的線程池對象的弊端如下:
?1)FixedThreadPool和SingleThreadPool:允許的請求隊列長度為Integer.MAX_VALUE,可能會堆積大量的請求,從而導致OOM。
?2)CachedThreadPool和ScheduledThreadPool:允許的創(chuàng)建線程數(shù)量為Integer.MAX_VALUE,可能會創(chuàng)建大量的線程,從而導致OOM。
向線程池提交任務
?1.void execute(Runnable command)
??用于提交不需要返回值的任務,所以無法判斷任務是否被線程池執(zhí)行成功
ExecutorService executorService = Executors.newFixedThreadPool(5);
executorService.execute(() -> System.out.println(Thread.currentThread().getName()));
?2.<T> Future<T> submit(Callable<T> task)
??用于提交需要返回值的任務
Future<Integer> future = executorService.submit(() -> (int) Math.random());
Integer i = future.get();
System.out.println(i);
關閉線程池
?ThreadPoolExecutor提供了兩個方法,用于線程池的關閉,分別是shutdown()和shutdownNow(),它們的原理是遍歷線程池中的工作線程,然后逐個調用線程的interrupt方法來中斷線程,但這兩種方式對于正在執(zhí)行的線程處理方式不同。
?1.shutdown()
??僅停止阻塞隊列中等待的線程,那些正在執(zhí)行的線程就會讓他們執(zhí)行結束。
?2.shutdownNow()
??不僅會停止阻塞隊列中的線程,而且會停止正在執(zhí)行的線程。
合理配置線程池
?CPU 密集型
??CPU 密集的意思是該任務需要大量的運算,而沒有阻塞,CPU 一直全速運行。
??CPU 密集型任務盡可能的少的線程數(shù)量,一般為 CPU 核數(shù) + 1 個線程的線程池。
?IO 密集型
??由于 IO 密集型任務線程并不是一直在執(zhí)行任務,可以多分配一點線程數(shù),如 CPU * 2 。
??也可以使用公式:CPU 核數(shù) / (1 - 阻塞系數(shù));其中阻塞系數(shù)在 0.8 ~ 0.9 之間。