目錄
1.并行與并發(fā)
2.進程與線程
---- 2.1 進程
---- 2.2 線程
---- 2.3 進程與線程的區(qū)別
---- 2.4 線程調(diào)度
3.線程的創(chuàng)建方式
---- 3.1 繼承方式
---- 3.2 實現(xiàn)方式
---- 3.3 匿名內(nèi)部類方式
---- 3.4 繼承和實現(xiàn)的區(qū)別
---- 3.5 線程資源在內(nèi)存中的分布
---- 3.6 線程池方式
---- ---- 3.6.1 什么是線程池
---- ---- 3.6.2 線程池的使用
---- ---- 3.3.3 線程池的種類
4.高并發(fā)與線程安全
---- 4.1 悲觀鎖方式解決線程安全問題
---- ---- 4.1.2 問題代碼示例
---- ---- 4.1.3 synchronized 關(guān)鍵字 (悲觀鎖)
---- ---- ---- 4.1.3.1 同步代碼塊
---- ---- ---- 4.1.3.2 同步方法
---- ---- 4.1.4 Lock
---- ---- 4.1.5 死鎖
---- 4.2 樂觀鎖的方式解決線程安全問題
---- ---- 4.2.1 不可見性
---- ---- 4.2.2 無序性
---- ---- 4.2.3 原子性
---- ---- 4.2.4 原子類
---- ---- 4.2.5 CAS (樂觀鎖)
---- ---- 4.2.6 原子操作引發(fā)的 ABA 問題
---- 4.3 樂觀鎖與悲觀鎖的區(qū)別
---- 4.4 并發(fā)工具包
---- ---- 4.4.1 并發(fā)容器
---- ---- ---- CopyOnWriteArrayList
---- ---- ---- CopyOnWriteArraySet
---- ---- ---- ConcurrentHashMap
---- ---- 4.4.2 并發(fā)工具類
---- ---- ---- CountDownLatch
---- ---- ---- CyclicBarrier
---- ---- ---- Semaphore
---- ---- ---- Exchanger
5.線程的狀態(tài)
---- 5.1 產(chǎn)者消費者案例一
---- 5.1 產(chǎn)者消費者案例二
1. 并行與并發(fā)
- 并行: 指多個事件在同一時刻發(fā)生;
- 并發(fā): 指多個事件在同一個時間段內(nèi)發(fā)生;
在操作系統(tǒng)中, 這裝了多個程序, 并發(fā)的是在一段時間內(nèi)看起來有多個程序同時運行, 實際上是由CPU調(diào)度的分時交替運行; 如果CPU的調(diào)度時間比較長, 那么這個過程就會比較清晰, 在一段時間內(nèi)只有一個線程在運行;
2. 進程與線程
2.1 進程
進程是程序的一次執(zhí)行過程,是系統(tǒng)運行程序的基本單位;系統(tǒng)運行一個程序即是一個進程,從創(chuàng)建、運行到消亡的過程;每個進程都有一個獨立的內(nèi)存空間,一個應(yīng)用程序可以同時運行多個進程;
- 每個進程都有一個獨立的內(nèi)存空間, 進程與進程之間互不影響;
- 一個應(yīng)用程序可以有多個進程;
2.1 線程
線程是進程中一段程序的不同執(zhí)行路線流程,是進程中的一個執(zhí)行單元,負責(zé)當(dāng)前進程中程序的執(zhí)行;
- 線程是擁有資源和獨立運行的最小單位;
- 每個線程都有單獨的內(nèi)存空間;
- 一個進程之后, 可以有多個線程共享進程資源;
一個 Java 程序其實就是一個進程, 而一個進程一次只能執(zhí)行一條線程, 所以 Java 只有高并發(fā);
2.3 進程與線程的區(qū)別
- 進程: 有獨立的內(nèi)存空間, 進程中的數(shù)據(jù)存放空間是獨立的(棧和堆);
- 一個 Java 程序至少有兩個線程, main線程和GC(垃圾回收)線程 ;
- 線程: 堆空間是共享的, ??臻g是獨立的, 線程通信與轉(zhuǎn)換消耗的資源比進程小很多;
2.4 線程調(diào)度
-
分時調(diào)度
所有線程輪流使用 CPU 的使用權(quán), 平均分配每個線程占用 CPU 的時間 (這個時間會非常非常的短);
-
搶占式調(diào)度
優(yōu)先讓優(yōu)先級高的線程使用 CPU, 如果線程的優(yōu)先級相同, 那么會隨機選擇一個(線程隨機性);
Java使用的為搶占式調(diào)度;
3.線程的創(chuàng)建方式
3.1 繼承方式
// Java 中 Thread 表示一個線程
// Runnable接口表示執(zhí)行體函數(shù)的回調(diào), 而Thread實現(xiàn)了Runnable接口
public class MyThread extends Thread {
// 線程執(zhí)行體
@Override
public void run() {
}
}
public class ThreadTest {
public static void main(String[] args) {
new MyThread().start();
}
}
3.2 實現(xiàn)方式
// Runnable接口的回調(diào)就是線程執(zhí)行體
public class MyRunnable implements Runnable {
// 線程執(zhí)行體
@Override
public void run() {
}
}
public class ThreadTest {
public static void main(String[] args) {
MyRunnable runnable = new MyRunnable();
new Thread(runnable).start();
}
}
3.3 匿名內(nèi)部類方式
public class ThreadTest {
public static void main(String[] args) {
Runnable runnable = new Runnable() {
@Override
public void run() {
}
};
new Thread(runnable).start();
}
}
3.4 繼承和實現(xiàn)的區(qū)別
- 實現(xiàn)的方式將線程 (Thread) 和任務(wù) (Runnable的run方法中的內(nèi)容) 分開了, 而繼承是綁在了一起
- Runnable 中的資源可以被多個線程 (Thread) 共享
- 實現(xiàn)解耦操作, 增強程序的健壯性和擴展性
- Java 中線程池只能支持實現(xiàn)的Runnable 或 Callable 類線程, 不支持繼承Thread的類
3.5 線程資源在內(nèi)存中的分布
- 一個 Java 程序至少有兩個線程, 一個的 Main 方法所在的線程, 另一個是垃圾回收線程(GC);
- 當(dāng)線程啟動時, 會在棧內(nèi)存中開辟出一塊獨立空間 (每個線程都會有一塊獨立的棧空間);
- 此空間中保存 基本數(shù)據(jù)類型的對象和引用數(shù)據(jù)類型對象的引用 (堆空間中的地址), 也就是說基本所有的 線程都共享堆空間;
- 線程中的 靜態(tài)變量會保存在方法區(qū) (也叫靜態(tài)區(qū), 包含整個程序中永遠唯一的元素, 也就是 class 和 static變量); (data segment 與 code segment , 前面存放靜態(tài)變量或字符串常量, 后者存放類中的方法 )
- 當(dāng)線程中的方法被調(diào)用時, 該 方法會入此線程所在的棧 空間;
- Main 線程是最后出棧的, 基本代表程序的結(jié)束;
3.6 線程池方式
3.6.1 什么是線程池
線程池是 Java 提供的為我們管理和使用線程對象的池, 使用線程池不必再關(guān)心線程的頻繁創(chuàng)建與回收等等;
Java 里面線程池的頂級接口是 Executor, 但是嚴格意義上講 Executor 并不是一個線程池, 而只是一個執(zhí)行線程的工具; 真正的線程池接口是 ExecutorService;
3.6.2 線程池的使用
示例一:
public class ThreadTest {
public static void main(String[] args) {
// 創(chuàng)建線程池,并指定線程池中初始化線程的個數(shù)
ExecutorService es = Executors.newFixedThreadPool(3);
// 提交無返回值的任務(wù),并執(zhí)行任務(wù)
Future<?> submit1 = es.submit(new Runnable() {
@Override
public void run() {
}
});
// 提交有返回值的任務(wù),并執(zhí)行任務(wù)
Future<String> submit = es.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "yiGeSiren";
}
});
try {
System.out.println(submit1.get());
System.out.println(submit.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
示例二:
public class ThreadTest {
public static void main(String[] args) {
// 創(chuàng)建線程池,并指定線程池中初始化線程的個數(shù)
ExecutorService es = Executors.newFixedThreadPool(3);
Callable<String> runnable = new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println(Thread.currentThread().getName()+
"開始: 實現(xiàn)Callable接口的任務(wù)...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+
"結(jié)束:實現(xiàn)Callable接口的任務(wù)...");
return "tiGeSiRen";
}
};
es.submit(runnable);
es.submit(runnable);
es.submit(runnable);
es.submit(runnable);
es.submit(runnable);
es.submit(runnable);
es.submit(runnable);
}
}
對于上述代碼線程池中的線程來說, 當(dāng)任務(wù)多于線程個數(shù)時, 任務(wù)會等待, 直到有空閑的線程從線程池中釋放出來;
3.6.2 線程池的種類
-
1. newCachedThreadPool
創(chuàng)建一個可緩存的線程池, 如果線程池長度超過處理需要, 會終止并從緩存中移除那些已有 60 秒鐘未被使用的線程;
如果線程池長度小于任務(wù)數(shù), 則會將已完成任務(wù)的線程加入緩存, 然后從緩存中取出線程去執(zhí)行新的任務(wù);
如果緩存中沒有空閑進程則會創(chuàng)建一個新的線程;public class ThreadTest { public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); Callable<String> runnable = new Callable<String>() { @Override public String call() throws Exception { System.out.println(Thread.currentThread().getName()+" 開始: 實現(xiàn)Callable接口的任務(wù)..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } // System.out.println(Thread.currentThread().getName()+" 結(jié)束:實現(xiàn)Callable接口的任務(wù)..."); return "tiGeSiRen"; } }; es.submit(runnable); es.submit(runnable); es.submit(runnable); es.submit(runnable); es.submit(runnable); es.submit(runnable); es.submit(runnable); es.shutdown(); } }打印結(jié)果
pool-1-thread-6 開始: 實現(xiàn)Callable接口的任務(wù)...
pool-1-thread-4 開始: 實現(xiàn)Callable接口的任務(wù)...
pool-1-thread-7 開始: 實現(xiàn)Callable接口的任務(wù)...
pool-1-thread-5 開始: 實現(xiàn)Callable接口的任務(wù)...
pool-1-thread-1 開始: 實現(xiàn)Callable接口的任務(wù)...
pool-1-thread-3 開始: 實現(xiàn)Callable接口的任務(wù)...
pool-1-thread-2 開始: 實現(xiàn)Callable接口的任務(wù)...從上面的打印結(jié)果可以看出創(chuàng)建了七個線程對象;
-
2. newFixedThreadPool
指定線程個數(shù)的線程池, 也就是說當(dāng)任務(wù)數(shù)大于線程個數(shù)時, 任務(wù)只能等待線程空閑出來;
-
3. newScheduledThreadPool
創(chuàng)建一個指定線程個數(shù)的線程池, 支持定時及執(zhí)行周期性任務(wù);
public class ThreadTest { public static void main(String[] args) { ScheduledExecutorService es = Executors.newScheduledThreadPool(2); Callable<String> runnable = new Callable<String>() { @Override public String call() throws Exception { System.out.println(Thread.currentThread().getName() + " 開始: 實現(xiàn)Callable接口的任務(wù)..."); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return "tiGeSiRen"; } }; es.schedule(runnable, 2000, TimeUnit.MILLISECONDS); es.schedule(runnable, 2000, TimeUnit.MILLISECONDS); es.schedule(runnable, 2000, TimeUnit.MILLISECONDS); es.schedule(runnable, 2000, TimeUnit.MILLISECONDS); es.schedule(runnable, 2000, TimeUnit.MILLISECONDS); es.shutdown(); } }pool-1-thread-1 開始: 實現(xiàn)Callable接口的任務(wù)...
pool-1-thread-2 開始: 實現(xiàn)Callable接口的任務(wù)...
pool-1-thread-1 開始: 實現(xiàn)Callable接口的任務(wù)...
pool-1-thread-2 開始: 實現(xiàn)Callable接口的任務(wù)...
pool-1-thread-2 開始: 實現(xiàn)Callable接口的任務(wù)... -
4. newSingleThreadExecutor
創(chuàng)建一個單線程的線程池; 這個線程池只有一個線程在工作, 也就是相當(dāng)于單線程串行執(zhí)行所有任務(wù);
如果這個唯一的線程因為異常結(jié)束, 那么會有一個新的線程來替代它, 此線程池保證所有任務(wù)的執(zhí)行順序按照任務(wù)的提交順序執(zhí)行;
4 高并發(fā)與線程安全
- 高并發(fā): 是指在某個時間點上, 有大量的用戶(線程)同時訪問同一資源;
- 線程安全: 在某個時間點上, 當(dāng)大量用戶(線程)訪問同一資源時, 由于多線程運行機制的原因, 可能會導(dǎo)致被訪問的資源出現(xiàn)"數(shù)據(jù)污染"的問題;
4.1 悲觀鎖方式的解決線程安全問題
4.1.2 問題代碼示例
public class ThreadTest {
public static void main(String[] args) {
MyRunnable myRunnable = new MyRunnable();
new Thread(myRunnable,"線程一").start();
new Thread(myRunnable,"線程二").start();
new Thread(myRunnable,"線程三").start();
}
}
public class MyRunnable implements Runnable {
int tickets = 100;
@Override
public void run() {
while (true) {
if (tickets < 1) {
break;
}
System.out.println(Thread.currentThread().getName() + "正在出售第" +
tickets + "張票");
tickets--;
}
}
}
上面的代碼會執(zhí)行的結(jié)果會有幾個問題:
- 多條線程出現(xiàn)賣重復(fù)票
- 會出現(xiàn)賣負數(shù)票
- 有些票沒有出售
出現(xiàn)這些問題的原因, 是因為在 Java 中線程是搶占式調(diào)度, 所以當(dāng)線程在執(zhí)行任務(wù)的時候, 會被其他線程打斷;
解決方式一: 使用 synchronized 關(guān)鍵字加鎖
解決方式二: 使用 Lock 加鎖
解決方式三: 使用 volatile 關(guān)鍵字加 原子類
4.1.3 synchronized 關(guān)鍵字 (悲觀鎖)
synchronized 可以作用在 方法中的某個區(qū)塊 或 函數(shù) 上, 表示只對這個區(qū)塊的資源或函數(shù)進行互斥訪問; 也就是說, 當(dāng)有線程訪問到這個區(qū)塊或函數(shù)時, 其它的線程只能先等待;
需要注意:
- 鎖對象可以是任意對象;
- 但是同步線程加鎖的對象要一致;
- 同步函數(shù)的鎖對象; 如果這個函數(shù)是靜態(tài)函數(shù), 那么鎖對象是這個函數(shù)所在的 類.class; 如果這個函數(shù)是非靜態(tài)函數(shù), 那么鎖對象是調(diào)用這個函數(shù)的對象, 也就是 this ;
4.1.3.1 同步代碼塊
使用同步代碼塊方式, 解決 4.1.2 代碼示例中的問題; 代碼如下:
public class ThreadTest {
public static void main(String[] args) {
MyRunnable myRunnable = new MyRunnable();
new Thread(myRunnable,"線程一").start();
new Thread(myRunnable,"線程二").start();
new Thread(myRunnable,"線程三").start();
}
}
public class MyRunnable implements Runnable {
int tickets = 50;
@Override
public void run() {
while (true) {
synchronized (this){
if (tickets < 1) {
break;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "正在出售第"
+ tickets + "張票");
tickets--;
}
}
}
}
4.1.3.2 同步方法
使用同步方法方式, 解決 4.1.2 代碼示例中的問題; 代碼如下:
public class ThreadTest {
public static void main(String[] args) {
MyRunnable myRunnable = new MyRunnable();
new Thread(myRunnable,"線程一").start();
new Thread(myRunnable,"線程二").start();
new Thread(myRunnable,"線程三").start();
}
}
public class MyRunnable implements Runnable {
int tickets = 50;
@Override
public void run() {
while (true) {
if (sellTickets()) {
break;
}
}
}
public synchronized boolean sellTickets() {
if (tickets < 1) {
return true;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "正在出售第" + tickets
+ "張票");
tickets--;
return false;
}
}
4.1.4 Lock
Lock 也稱同步鎖, 是一個接口, 其實現(xiàn)類為 ReentrantLock ;
使用 Lock 鎖解決 4.1.2 代碼示例中的問題; 代碼如下:
public class MyRunnable implements Runnable {
Lock lock = new ReentrantLock();
int tickets = 50;
@Override
public void run() {
while (true) {
// 加鎖
lock.lock();
if (tickets < 1) {
// 循環(huán)退出 也要解鎖
lock.unlock();
break;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "正在出售第" +
tickets + "張票");
tickets--;
// 解鎖
lock.unlock();
}
}
}
4.1.5 死鎖
- 什么是死鎖
多線程程序中, 使用了多把鎖, 造成線程之間相互等待;- 產(chǎn)生死鎖的條件
- 有多個線程;
- 有多把鎖
- 有同步代碼塊嵌套
- 案例:
線程A : 需要獲取A鎖, 再獲取B鎖, 才能執(zhí)行里面的代碼
線程B : 需要獲取B鎖, 再獲取A鎖, 才能執(zhí)行里面的代碼
代碼示例:
public class ThreadTest {
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
synchronized ("鎖A"){
System.out.println("線程A獲取到了鎖A,等待獲取鎖B");
synchronized ("鎖B"){
System.out.println("線程A獲取到了鎖A,鎖B 開始執(zhí)行里面的代碼");
}
}
}
},"線程A").start();
new Thread(new Runnable() {
@Override
public void run() {
synchronized ("鎖B"){
System.out.println("線程B獲取到了鎖B,等待獲取鎖A");
synchronized ("鎖A"){
System.out.println("線程B獲取到了鎖B,鎖A 開始執(zhí)行里面的代碼");
}
}
}
},"線程B").start();
}
}
打印結(jié)果:
線程A獲取到了鎖A,等待獲取鎖B
線程B獲取到了鎖B,等待獲取鎖A (程序到這里是沒有結(jié)束的)
4.2 樂觀鎖的方式解決線程安全問題
使用樂觀鎖方式解決線程安全問題, 需要解決不可見性, 無序性和非原子操作的問題;
4.2.1 不可見性
代碼示例:
public class ThreadTest {
public static void main(String[] args) {
MyThread thread = new MyThread();
// 在子線程中修改 flag 變量為True
thread.start();
while (true){
// 檢測到線程中 flag 變量為 true 之后結(jié)否死循環(huán)
if (MyThread.flag){
System.out.println("結(jié)束 Main 線程中的死循環(huán)!");
break;
}
}
}
}
public class MyThread extends Thread {
static boolean flag = false;
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
flag = true;
System.out.println("修改了flag變量為true");
}
}
期望結(jié)果:
子線程進行睡眠3秒, 主線程一直死循環(huán);
子線程醒了之后,修改flag=true,子線程結(jié)束,主線程結(jié)束;
實際結(jié)果:
子線程進入睡眠3秒,主線程一直死循環(huán);
子線程醒了之后,修改flag=true,子線程結(jié)束,但是主線程結(jié)束不了;
原因:
子線程對共享變量flag的值進行了修改,而主線程沒有看見(沒有獲取到修改后的值);
為什么主線程獲取不到子線程對共享變量flag修改后的值
- Java 內(nèi)存模型 (Java Memory Modle) JMM 描述了 Java 程序中各種變量的訪問規(guī)則 (靜態(tài)共享變量) , 以及 JVM 在將變量 讀取到內(nèi)存 和將變量 存儲到內(nèi)存 這樣的底層細節(jié);
- 線程中的 靜態(tài)變量會保存在方法區(qū) (也叫靜態(tài)區(qū), 包含整個程序中永遠唯一的元素, 也就是 class 和 static變量); (data segment 與 code segment , 前面存放靜態(tài)變量或字符串常量, 后者存放類中的方法 )
- 不同的線程會在棧區(qū)中有 各自的工作空間;
- 當(dāng)線程訪問共享靜態(tài)變量時, 線程會將這個變量 拷備一份到自己的工作區(qū) (也就是說, 在上面代碼中獲取到的變量值是在線程獨立的棧區(qū)), 進行讀寫的操作;
- 但是不會將在棧區(qū)操作后的變量值 寫回主內(nèi)存(也就是靜態(tài)區(qū));
volatile關(guān)鍵字可以解決線程中的可見性問題
public class MyThread extends Thread {
// 當(dāng)共享變量被volatile修飾,會強制讓線程每次獲取變量的值都從主內(nèi)存(方法區(qū)也就靜態(tài)區(qū))中去獲取;
volatile static boolean flag = false;
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
flag = true;
System.out.println("修改了flag變量為true");
}
}
4.2.2 無序性
無序性是指 內(nèi)存訪問順序與得到的字節(jié)碼順序 不一樣 或者 字節(jié)碼順序與實際的執(zhí)行順序 不一樣; 是由 JIT 動態(tài)編譯 (會重排序) 優(yōu)化的原因造成的, 靜態(tài)編譯是不會有這樣的問題產(chǎn)生的;
volatile 關(guān)鍵字修飾變量可以禁止變量相關(guān)的指令重排序;
http://www.itdecent.cn/p/119ffdcef55a
https://baijiahao.baidu.com/s?id=1662251623172268398&wfr=spider&for=pc
4.2.3 原子性
原子性是指在一次或多次操作中, 要么全部執(zhí)行不被中斷, 要么全部不執(zhí)行;
代碼示例如下:
public class ThreadTest {
public static void main(String[] args) {
// 子線程1000000次自增
new MyThread().start();
// main線程1000000次自增
for (int i = 0; i < 1000000; i++) {
MyThread.a++;
}
System.out.println("Main線程的1000000次自增結(jié)束了");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("a的值為" + MyThread.a);
}
}
public class MyThread extends Thread {
volatile static int a = 0;
@Override
public void run() {
for (int i = 0; i < 1000000; i++) {
a++;
}
System.out.println("子線程的1000000次自增結(jié)束了");
}
}
期望打印結(jié)果
Main線程的1000000次自增結(jié)束了
子線程的1000000次自增結(jié)束了
a的值為2000000
實際打印結(jié)果
Main線程的1000000次自增結(jié)束了
子線程的1000000次自增結(jié)束了
a的值為1803604(實際上小于2000000)
原因: 兩個線程對a的自增操作, 產(chǎn)生了覆蓋效果; 就是這樣一種情況main線程自增一次之后等于1, 而子線程自增加之后也等于1, 而實際上是等于2了;
并且Volatile不能解決原子性問題, 實際上就是兩個或兩個以上的線程同時對主內(nèi)存的變量進行了寫的操作, 正確的應(yīng)該是當(dāng)多個線程同時對主內(nèi)存的變量時行寫操作時, 只有一個線程可以進行寫操作, 在等待當(dāng)前線程寫完之后其它的線程需要重要讀取變量的值;
4.2.4 原子類
在 java.util.concurrent.atomic 包下提供了一系的具有 原子性的原子類 API ;
它們可以保證對“變量”操作的:原子性、有序性、可見性; 其工作原理是基于 CAS 機制;

以 AtomicInteger 為例驗證上面4.2.3示例代碼中的原子性問題:
public class ThreadTest {
public static void main(String[] args) {
// 子線程1000000次自增
new MyThread().start();
// main線程1000000次自增
for (int i = 0; i < 1000000; i++) {
MyThread.a.addAndGet(1);
}
System.out.println("Main線程的1000000次自增結(jié)束了");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("a的值為" + MyThread.a);
}
}
public class MyThread extends Thread {
static AtomicInteger a = new AtomicInteger(0);
@Override
public void run() {
for (int i = 0; i < 1000000; i++) {
a.addAndGet(1);
}
System.out.println("子線程的1000000次自增結(jié)束了");
}
}
Main線程的1000000次自增結(jié)束了
子線程的1000000次自增結(jié)束了
a的值為2000000
4.2.5 CAS (樂觀鎖)
CAS (Compare and Swap/Set) 比較并交換, CAS 的算法過程是: 它包含3個參數(shù)CAS(V,E,N) ;
- V 表示要更新的變量(內(nèi)存值), E表示預(yù)期值(舊的), N表示新值;
- 當(dāng)且僅當(dāng) V 值等于 E 值是, 才會將 N 值覆給 V 值;
- 如果 V 值不等于 E 值, 表示當(dāng)前有其它線程正在進行操作, 那么當(dāng)前線程則什么都不做, 返回 V 的真實值;
- 當(dāng)多個線程同時使用 CAS 操作一個變量時, 只有一個會勝出, 并成功更新, 其余均會失敗; 失敗的線程不會被掛起, 僅是被告知失敗, 并且允許再次嘗試, 當(dāng)然也允許失敗的線程放棄操作;
4.2.6 原子操作引發(fā)的 ABA 問題
4.3 樂觀鎖與悲觀鎖的區(qū)別
- 樂觀鎖: 樂觀思想, 認為讀多寫少, 遇到并發(fā)寫的可能性低; 每次去拿數(shù)據(jù)的時候都認為別人不會修改, 所以不會上鎖, 但是在更新的時候會判斷一下有么有其它線程在此期間去更新這個數(shù)據(jù), 采取在寫入數(shù)據(jù)的時候加鎖的操作, 也就是說在 寫入數(shù)據(jù) 時候, 其它線程是可以進行讀操作 的;
- 悲觀鎖: 悲觀思想, 認為寫多, 遇到并發(fā)寫的可能性高; 每次去拿數(shù)據(jù)的時候都認為別人會修改, 所以每次在讀寫數(shù)據(jù)的時候都會上鎖, 這樣別人想讀寫這個數(shù)據(jù)就會block直到拿到鎖;
4.4 并發(fā)工具包
java.util.concurrent 下提供的一系列的處理并發(fā)問題的容器和工具類
4.4.1 并發(fā)容器
CopyOnWriteArrayList
與 ArrayList 相關(guān)操作一致, 不同的是 CopyOnWriteArrayList 就線程安全的, 其內(nèi)部源碼使用了同步代碼塊的方式, 并且 new 了一個 Object 對象作為鎖;
CopyOnWriteArraySet
與 CopyOnWriteArrayList 的相關(guān)操作一致, 是依賴 CopyOnWriteArrayList 實現(xiàn)的, 額外加了兩個函數(shù) addIfAbsent() 和 addAllAbsent() 來保證元素的唯 一性, 元素已經(jīng)在列表中, 就不往里面添加了;
ConcurrentHashMap
并發(fā)基于分段鎖思想;
4.4.2 并發(fā)工具類
CountDownLatch
利用這個類可以實現(xiàn)類似計數(shù)器的功能; 比如: 線程C 只有在 線程A 和線程B 走完 之后才能執(zhí)行; 示例示碼如下:
public class ThreadTest {
public static void main(String[] args) {
// 初始計數(shù)值為 2
CountDownLatch countDownLatch = new CountDownLatch(2);
// 線程c
new Thread(new Runnable() {
@Override
public void run() {
try {
// 當(dāng)基數(shù)值減為0時, 等待的線程C 被喚醒執(zhí)行
countDownLatch.await();
Thread.sleep(3000);
System.out.println("線程C執(zhí)行完畢");
} catch (Exception e) {
e.printStackTrace();
}
}
}, "線程c").start();
// 線程 A
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程A執(zhí)行完畢");
// 每次調(diào)用 countDown 函數(shù)計數(shù)值都會減一
countDownLatch.countDown();
}
}, "線程A").start();
// 線程 B
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程B執(zhí)行完畢");
// 每次調(diào)用 countDown 函數(shù)計數(shù)值都會減一
countDownLatch.countDown();
}
}, "線程B").start();
}
}
CyclicBarrier
通過它可以實現(xiàn)讓一組線程等待至某個狀態(tài)之后再全部同時執(zhí)行, 當(dāng)所有等待線程都被釋放以后,CyclicBarrier 可以被重用;
示例:公司召集5名員工開會,等5名員工都到了,會議開始。
?---------創(chuàng)建5個員工線程,1個開會線程,幾乎同時啟動,使用 CyclicBarrier 保證5名員工線程全部執(zhí)行后,再執(zhí)行開會線程;
---------如果是再有5個員工線程, 重新進入, 也可以重新開始開會 ( CyclicBarrier 可被重復(fù)利用);
public class ThreadTest {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
// 當(dāng)達到解放出阻塞線程的解開條件時,會回調(diào)此函數(shù)
@Override
public void run() {
System.out.println("可以開始開會了,會議內(nèi)容是......");
}
});
MyRunnable myRunnable = new MyRunnable(cyclicBarrier);
new Thread(myRunnable,"員工一").start();
new Thread(myRunnable,"員工二").start();
new Thread(myRunnable,"員工三").start();
new Thread(myRunnable,"員工四").start();
new Thread(myRunnable,"員工五").start();
new Thread(myRunnable,"員工六").start();
new Thread(myRunnable,"員工七").start();
new Thread(myRunnable,"員工八").start();
new Thread(myRunnable,"員工九").start();
new Thread(myRunnable,"員工十").start();
}
}
public class MyRunnable implements Runnable {
CyclicBarrier cyclicBarrier;
public MyRunnable(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
if (cyclicBarrier != null) {
System.out.println(Thread.currentThread().getName()+":到達了會議室");
try {
// 阻塞線程繼續(xù)執(zhí)行,直到達到放開阻塞的條件
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
// 阻塞放開之后,線程繼續(xù)往下執(zhí)行
System.out.println(Thread.currentThread().getName()+":離開了會議室");
}
}
}
Semaphore
字面意思為信號量,Semaphore 可以控制 同時訪問的線程個數(shù);
案例: 教室同時只允許至多4名學(xué)生進來上課, 出去一名, 才能再進來一名; 示例代碼如下:
public class CLassRoom {
// 最大同時訪問的線程個數(shù)為4
Semaphore sp = new Semaphore(4);
public void into(){
// 獲得許可
try {
sp.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println(Thread.currentThread().getName()+"正在聽課3秒鐘");
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"聽課完成");
// 釋放許可
sp.release();
}
}
public class MyRunnable implements Runnable {
ClassRoom cr;
public MyRunnable(ClassRoom cr) {
this.cr = cr;
}
@Override
public void run() {
cr.into();// 進入教室
}
}
public class ThreadTest {
public static void main(String[] args) {
CLassRoom cLassRoom = new CLassRoom();
MyRunnable myRunnable = new MyRunnable(cLassRoom);
new Thread(myRunnable,"學(xué)生一").start();
new Thread(myRunnable,"學(xué)生二").start();
new Thread(myRunnable,"學(xué)生三").start();
new Thread(myRunnable,"學(xué)生四").start();
new Thread(myRunnable,"學(xué)生五").start();
new Thread(myRunnable,"學(xué)生六").start();
new Thread(myRunnable,"學(xué)生七").start();
new Thread(myRunnable,"學(xué)生八").start();
new Thread(myRunnable,"學(xué)生九").start();
new Thread(myRunnable,"學(xué)生十").start();
}
}
Exchanger
是一個用于線程間協(xié)作的工具類; Exchanger用于進行 線程間的數(shù)據(jù)交換;
示例代碼:
public class ThreadA extends Thread {
Exchanger<String> exchanger;
public ThreadA(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
try {
String s = exchanger.exchange("ThreadA");
System.out.println("線程A接收到了線程B交換的數(shù)據(jù): "+ s);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ThreadB extends Thread {
Exchanger<String> exchanger;
public ThreadB(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
try {
String s = exchanger.exchange("ThreadB");
System.out.println("線程B接收到了線程A交換的數(shù)據(jù): "+ s);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ThreadTest {
public static void main(String[] args) {
Exchanger<String> stringExchanger = new Exchanger<>();
new ThreadA(stringExchanger).start();
new ThreadB(stringExchanger).start();
}
}
打印結(jié)果:
線程A接收到了線程B交換的數(shù)據(jù): ThreadB
線程B接收到了線程A交換的數(shù)據(jù): ThreadA
5 線程的狀態(tài)
- 新建(new)
- 運行(runnable)
- 終止(teminated)
- 鎖阻塞(blocked)
- 無線等待(waiting)
- 限時等待(time waiting)
線程狀態(tài)之間的關(guān)系:
運行與鎖阻塞: 線程未搶到鎖對象就會阻塞, 搶到鎖對象就會進入運行狀態(tài);
運行與無線等待: 運行中的線程調(diào)用鎖對象的 wait() 函數(shù), 會進入無限等待狀態(tài)等待被喚醒; 如果無限等待中的線程被喚醒但 沒有搶到鎖, 那么會進入阻塞狀態(tài); 如果無限等待中的線程被喚醒并且 搶到了鎖, 那么會進入運行狀態(tài);
運行與限時等待第一種情況: 運行中的線程調(diào)用鎖對象的 wait(數(shù)字) 函數(shù), 會進入無限等待狀態(tài)等待被喚醒;
如果時間到了, 但沒有搶到鎖, 會進入阻塞狀態(tài);
如果時間到了, 并且搶到了鎖, 會進入運行狀態(tài);
如果時間沒到, 卻被喚醒, 搶到了鎖就會進行運行狀態(tài), 沒有搶到鎖就會進入阻塞狀態(tài);
運行與限時等待第二種情況: 運行中的線程調(diào)用 sleep() 方法, 由于調(diào)用 sleep() 方法并不像調(diào)用 wait() 方法那樣會釋放鎖, 所以線程應(yīng)該是屬于限時等待狀態(tài), 但是時間到了就會直接進行運行狀態(tài);
代碼示例
public class ThreadTest {
public static void main(String[] args) {
Object object = new Object();
new Thread(new Runnable() {
@Override
public void run() {
synchronized (object){
try {
System.out.println("線程一進入無限等待狀態(tài)");
object.wait();
System.out.println("線程一被線程二喚醒進行運行狀態(tài)");
Thread.sleep(2000);
System.out.println("線程一執(zhí)行完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
},"線程一").start();
new Thread(new Runnable() {
@Override
public void run() {
synchronized (object){
try {
System.out.println("線程二準備喚醒線程一");
Thread.sleep(2000);
object.notify();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
},"線程二").start();
}
}
5.1 生產(chǎn)者消費者案例一
public class ThreadA extends Thread {
Product product;
String name;
public ThreadA(Product product, String name) {
this.product = product;
this.name = name;
}
@Override
public void run() {
while (true) {
synchronized (product) {
if (product.flag) {
try {
System.out.println(name + "進入無限等待");
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
product.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (!product.flag) {
product.flag = true;
System.out.println(name + "正在生產(chǎn)商品");
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
product.notify();
}
}
}
}
}
public class ThreadB extends Thread {
Product product;
String name;
public ThreadB(Product product, String name) {
this.product = product;
this.name = name;
}
@Override
public void run() {
while (true) {
synchronized (product) {
if (!product.flag) {
try {
System.out.println("------" + name + "進入無限等待");
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
product.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (product.flag) {
product.flag = false;
System.out.println("------" + name + "正在消費商品");
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
product.notify();
}
}
}
}
}
public class Product {
public boolean flag = false;
}
public class ThreadTest2 {
public static void main(String[] args) {
Product product = new Product();
new ThreadA(product, "生產(chǎn)者線程1").start();
new ThreadB(product, "消費者線程1").start();
}
}
5.1 生產(chǎn)者消費者案例二
public class Product {
public int num = 0;
public boolean flag2 = false;
}
public class ThreadTest2 {
public static void main(String[] args) {
Product product = new Product();
new ThreadA(product, "生產(chǎn)者線程1").start();
new ThreadA(product, "生產(chǎn)者線程2").start();
new ThreadA(product, "生產(chǎn)者線程3").start();
new ThreadA(product, "生產(chǎn)者線程4").start();
new ThreadA(product, "生產(chǎn)者線程5").start();
new ThreadA(product, "生產(chǎn)者線程6").start();
new ThreadA(product, "生產(chǎn)者線程7").start();
new ThreadA(product, "生產(chǎn)者線程8").start();
new ThreadA(product, "生產(chǎn)者線程9").start();
new ThreadB(product, "消費者線程1").start();
new ThreadB(product, "消費者線程2").start();
new ThreadB(product, "消費者線程3").start();
new ThreadB(product, "消費者線程4").start();
new ThreadB(product, "消費者線程5").start();
new ThreadB(product, "消費者線程6").start();
new ThreadB(product, "消費者線程7").start();
new ThreadB(product, "消費者線程8").start();
new ThreadB(product, "消費者線程9").start();
}
}
public class ThreadA extends Thread {
Product product;
String name;
public ThreadA(Product product, String name) {
this.product = product;
this.name = name;
}
@Override
public void run() {
while (true) {
synchronized (product) {
if (!product.flag2) {
if (product.num < 10) {
product.num++;
System.out.println(name + "正在生產(chǎn)第" + product.num + "商品");
try {
sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (product.num == 10) {
product.flag2 = true;
product.notifyAll();
System.out.println(name + "進入無限等待");
try {
product.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
}
}
public class ThreadB extends Thread {
Product product;
String name;
public ThreadB(Product product, String name) {
this.product = product;
this.name = name;
}
@Override
public void run() {
while (true) {
synchronized (product) {
if (product.flag2) {
if (product.num <= 10) {
System.out.println("------" + name + "正在消費第" + product.num + "商品");
product.num--;
try {
sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (product.num == 0) {
product.flag2 = false;
product.notifyAll();
System.out.println("------" + name + "進入無限等待");
try {
product.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
}
}