- 多線程出現(xiàn)目的
- 如何使用多線程
- 線程狀態(tài)(6種)
- 線程開啟/停止
- 線程安全
- Volilate
- Sychronized
- 機制
- 如何實現(xiàn)鎖
- 為什么任何一個對象都可以成為鎖
- 鎖的優(yōu)化
- 機制
- Lock與Synchronized區(qū)別
- CAS
- AQS(AbstractQueuedSychronizer)
- ReentrantLock
- Lock()加鎖分析
- unlock()釋放鎖分析
- CountDownLatch
- 是什么
- 如何使用
- 分析
- ReentrantLock
多線程出現(xiàn)目的
場景:
- 當一個進程處理過程中,遇到網(wǎng)絡(luò)與IO操作都會進入阻塞狀態(tài),不再處理任何東西,浪費系統(tǒng)資源。
- 一個函數(shù)的處理非常耗時,其實其中多個邏輯可以并行處理。
多線程的面世就是要解決以上問題。
如何使用多線程
- extends Thread
public class ThreadDemo extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ":" + "ThreadDemo Running");
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new ThreadDemo().start();
}
}
}
- implements Runnable
public class RunnableDemo implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ":" + "RunnableDemo Running");
}
public static void main(String[] args) {
RunnableDemo runnableDemo = new RunnableDemo();
for (int i = 0; i < 10; i++) {
new Thread(runnableDemo).start();
}
}
}
- ExecutorService
- Executors.newFixedThreadPool
- Executors.newCachedThreadPool
- Executors.newSingleThreadPool
- Executors.newScheduledThreadPool
public class ExecutorServiceDemo {
private static ThreadPoolExecutor threadPool;
private static ThreadFactory factory = new ThreadFactory() {
private final AtomicInteger integer = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
int threadName = integer.getAndIncrement();
System.out.println("Created Thread:" + threadName);
return new Thread(r, "ThreadPool Thread:" + threadName);
}
};
private static BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
threadPool = new ThreadPoolExecutor(10, 15, 1000L,
TimeUnit.SECONDS,
workQueue,
factory);
//execute()與submit()的區(qū)別在于submit有一個Future類型的返回,
// 實際submit是把Callable入?yún)b成RunnableFuture類型后再調(diào)用execute();
for (int i = 0; i < 15; i++) {
System.out.println("threadPool.execute");
threadPool.execute(new RunnableDemo());
}
for (int i = 0; i < 15; i++) {
System.out.println("threadPool.submit");
Future<?> future = threadPool.submit(new CallableDemo());
System.out.println(future.get());
}
}
}
- implements Callable<>
public class CallableDemo implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println(Thread.currentThread().getName() + ":" +"CallableDemo Running");
return "Callable Result";
}
public static void main(String[] args) throws Exception {
CallableDemo callableDemo = new CallableDemo();
String callableReturn = callableDemo.call();
System.out.println("callableReturn :" + callableReturn);
}
}
Callable與Runable區(qū)別:
Re:
- Callable任務(wù)線程能返回執(zhí)行結(jié)果,而Runnable任務(wù)線程不能返回結(jié)果
- Callable能向上拋出異常,而Runnable接口異常只能內(nèi)部消化
為什么提供extends Thread又提供implements Runnable
Re:因為JAVA不支持多繼承
線程狀態(tài)(6種)

- NEW 調(diào)用Start方法前
- RUNNABLE 運行
- BLOCKED 阻塞
- 等待阻塞 wait
- 同步阻塞 synchronized
- 其它阻塞 sleep/join
- WAITING 等待
- TIMED_WAITING 超時等待
- TERMINATED 終止
狀態(tài)變更圖示:

線程開啟/停止
開始:start()
停止:interrupt()
通過設(shè)置標志位的方式終止線程,使其能有機會去清理資源,而非暴力的方式直接kill掉,這種方式更新安全。
public class demo4 {
private static int num;
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
num++;
System.out.println("Num:" + num);
}
});
thread.start();
TimeUnit.SECONDS.sleep(1);
thread.interrupt();
}
}
線程安全
- 可見性
- 原子性
- 有序性
Volilate
public class VolatileDemo {
private volatile static boolean stop = false;
public static void main(String[] args) {
Thread thread = new Thread(() -> {
int i = 0;
while (!stop) {
i++;
System.out.println("i: " + i);
}
});
try {
long startTime = System.currentTimeMillis();
thread.start();
System.out.println("Thread Start");
TimeUnit.SECONDS.sleep(1);
stop = true;
long endTime = System.currentTimeMillis();
System.out.println("Runtime: " + (endTime - startTime) / 1000 + " Second");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
因為每個線程有自己私有的內(nèi)存空間,修改變量需要同步回主存才能對其它線程可見,而volatile就會有哪下作用
- 修改volatile修飾的變量時,會強制將修改的值回寫到主存。
- 讀取volatile修飾的變量時,會強制到主存獲取數(shù)據(jù),不再到緩存讀取
- volatile會使被volatile修飾的語句禁止指令重排序
指令重排序?qū)嵗?/p>
int a = 1;
int b = 2;
int c = 3;
//以上例子,可能是int c = 3優(yōu)先于int a = 1和int b = 2執(zhí)行
int a = 1;
volatile int b = 2;
int c = 3;
//以上例子則volatile int b = 2一定是在int a = 1和int c = 3之間執(zhí)行。
問題:為什么需要編譯器指令重排?
Re:優(yōu)化執(zhí)行效率。
問題:什么是CPU亂序執(zhí)行?

問題:為什么要禁止編譯器指令重排呢?
Re:因為多線程下指令重排可能會導致處理出錯,例如:
Thread-1:
int b = 10;
int c = b;
boolean flag = true;
Thread-2:
while(flag){
System.out.println(b);
}
如果編譯器把Thread-1的第3條指令重排到第一行,那Thread-2就有可能出錯,因為B還沒有初始化。
問題:volilate為什么不能保證原子性
Re:因為volilate對變量的操作在字節(jié)碼層面是由多條指令組成,非原子性操作,所以它只保證了可見性,不保證原子性。

Volilate因為只保證了Read and Load即從主存加載到工作內(nèi)存時加載的值是最新的,例如:
線程1和線程2在執(zhí)行Read and Load的時候,發(fā)現(xiàn)主存里的值都是5,雙方都加載了這個最新值,然后雙方都對該值加1,再把值放回主存,事實主存值結(jié)果為6,此操作有線程安全問題。
小結(jié)
聲明了volatile的變量進行寫操作,JVM就會向處理器發(fā)送一條Lock前綴的指令,把這個變量所在的緩存行的數(shù)據(jù)寫回到系統(tǒng)內(nèi)存,再根據(jù)我們前面提到過的MESI的緩存一致性協(xié)議,來保證多CPU下的各個高速緩存中的數(shù)據(jù)的一致性。
Sychronized
機制
- 使用方法
- 修飾實例方法
- 修飾靜態(tài)方法
- 修飾代碼塊
- 使用Sychronized后,會通過字節(jié)碼生成以下指令:
-
修飾方法時:ACC_SYNCHRONIZED
image.png -
修飾代碼塊:monitorenter monitorexit
- 獲取鎖情況
- 修飾實例方法
進行同步代碼前,需要獲取當前實例的鎖 - 修飾靜態(tài)方法
進行同步代碼前,需要獲取當前類對象的鎖 - 修飾代碼塊
進行同步代碼前,需要獲取給定對象的鎖
如何實現(xiàn)鎖
本質(zhì):對象監(jiān)視器的獲?。í氄兼i)
為什么任何一個對象都可以成為鎖
因為對象在內(nèi)存中分為三塊區(qū)域:對象頭、實例數(shù)據(jù)、對齊填充

對象頭:

而Synchroned使用的鎖存在每一個對象的對象頭里,其中鎖標志位指向的是monitor對象(也稱為管程或監(jiān)視器鎖)的起始地址。
鎖的優(yōu)化
鎖的狀態(tài):
- 無向鎖
- 偏向鎖
大多數(shù)情況下,鎖不僅不存在多線程競爭,而且總是由同一線程多次獲得。使用傳統(tǒng)的重量級鎖會有頻繁鎖操作,為了讓線程獲得鎖的代價更低,引入了偏向鎖, - 輕量鎖
當多線程竟然偏向鎖時會升級為輕量級鎖 - 重量鎖
基于monitor的鎖實現(xiàn)。
注意:鎖只能從輕到重的方向發(fā)展,不可逆。
Lock與Synchronized區(qū)別
- Lock是一個接口
- synchronized是JVM層的一個實現(xiàn)
- synchronized是被動的觸發(fā)鎖機制,而Lock是可以靈活的控制,鎖的創(chuàng)建和釋放都需要人為控制,特別是異常發(fā)生的時候要注意釋放鎖。
- Lock相對來講控制粒度更小,例如還可以分別控制讀寫鎖
- Lock支持公平、非公平鎖,而synchronized只支持非公平鎖
CAS
CAS是JDK提供的Unsafe類里的一系列操作,這一系列操作由JDK來保證原子性。
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

Atomic一系列的對象是根據(jù)CAS的封裝來實例原子性。
AQS(AbstractQueuedSychronizer)
AQS的關(guān)鍵數(shù)據(jù)結(jié)構(gòu):

鏈表的操作通過CAS原子操作來保證多線程下的原子性:
- compareAndSetTail
/**
* CAS tail field. Used only by enq.
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
- compareAndSetHead
/**
* CAS head field. Used only by enq.
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
通過compareAndSwapObject這個native方法來保證鏈表操作的線程安全性
ReentrantLock
Lock()加鎖分析

非公平鎖邏輯流程圖

公平鎖與非公平鎖的差異
- 公平鎖獲取鎖的過程
final void lock() {
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
- 非公平鎖獲取鎖的過程
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
以上可以看出,非公平鎖在Lock()方法被調(diào)用時是首先嘗試當前線程是否能直接獲得鎖,然后tryAcquire()方法的時候公平鎖是需要檢查AQS隊列里是否有等待的節(jié)點,有的話是當前線程獲取鎖不成功,而非公平鎖是直接CAS當前鎖的狀態(tài),若通過就把鎖給當前線程了。同時也可以看出雙方在獲取不到鎖的時候,進行AQS隊列方式是一樣的,都是加在隊尾。在加入隊列后,還需要根據(jù)當前節(jié)點的前驅(qū)節(jié)點的waitStatus若是Node.SIGNAL狀態(tài)判斷是否需要把當前線程掛起,以省系統(tǒng)資源,
unlock()釋放鎖分析
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
可以看出,每個unlock()操作都是一個State - 1操作,直到State == 0的時候,把ExclusiveOwnerThread即當前獲得鎖的線程設(shè)置為null來釋放鎖。
小結(jié)
在獲取鎖的時候,會維護一個雙向鏈表,用于存放獲取鎖失敗的的線程到隊列中進行自旋來獲取鎖,
CountDownLatch
是什么
CountDownLatch是JUC中提供的一個同步工具,使用調(diào)用await()它可以使一個或者多個線程進行等待,直到其它線程執(zhí)行CountDown()方法把倒數(shù)器減至0后,等待的線程才會啟動。
如何使用
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(3);
new Thread(() -> {
System.out.println(Thread.currentThread() + "執(zhí)行完畢");
countDownLatch.countDown();
}, "Thread-1").start();
new Thread(() -> {
System.out.println(Thread.currentThread() + "執(zhí)行完畢");
countDownLatch.countDown();
}, "Thread-2").start();
new Thread(() -> {
System.out.println(Thread.currentThread() + "執(zhí)行完畢");
countDownLatch.countDown();
}, "Thread-3").start();
countDownLatch.await();
System.out.println("全部線程執(zhí)行完畢");
}
}
分析
- await()
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) // 若線程中端,直接拋異常
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// 計數(shù)為0時,表示獲取鎖成功
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 阻塞,并入隊
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 入隊
boolean failed = true;
try {
for (;;) {
// 獲取前驅(qū)節(jié)點
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 獲取鎖成功,設(shè)置隊列頭為node節(jié)點
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) // 線程掛起
&& parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 檢查計數(shù)器是否為0,為0直接返回
- 計數(shù)器大于0,即當前線程需要阻塞并等待計數(shù)器變?yōu)?
- 當前線程需要被封裝成Node對象并添加到AQS雙向鏈表里去
- 最后自旋嘗試獲取鎖,即檢查計數(shù)器是否為0,獲取成功即出隊,然后放行當前線程
- countDonw()
// 計數(shù)-1
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 首先嘗試釋放鎖
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0) //如果計數(shù)已經(jīng)為0,則返回失敗
return false;
int nextc = c-1;
// 原子操作實現(xiàn)計數(shù)-1
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
// 喚醒被阻塞的線程
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) { // 隊列非空,表示有線程被阻塞
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 頭結(jié)點如果為SIGNAL,則喚醒頭結(jié)點下個節(jié)點上關(guān)聯(lián)的線程,并出隊
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // 沒有線程被阻塞,直接跳出
break;
}
}
- 嘗試釋放鎖,即將計數(shù)器-1,并判斷state是否為0,若為0即表示當前沒有鎖,可以開始喚醒鏈表中阻塞中的線程
- 如果鏈表里為空,即沒有阻塞的線程,直接退出
- 如果頭節(jié)點waitStatus為SIGNAL,就依次喚醒下個節(jié)點的線程,并出隊

