一.概述
1.JUC是JDK1.5中提供的一套并發(fā)包及其子包。包含以下:
java.util.concurrent,
java.util.concurrent.atmoic,
java.util.concurrent.lock
2.JUC中包含了5套接口:BlockingQueue、ConcurrentMap、ExecutorService,Lock和Automic
二.BlockingQueue-阻塞式隊列
1.特征:阻塞、FIFO(先進先出)
2.BlockingQueue不同于之前學習的Queue,不能進行擴容。即BlockingQueue在使用的時候指定的容量是多少就是多少
3.當隊列已滿時,試圖放入元素的線程會被阻塞;當隊列為空時,似乎獲取元素的線程會被阻塞。
阻塞式隊列不允許元素為空。
4.重要方法:
| 特征 | 拋出異常 | 返回值 | 阻塞 | 定時阻塞 |
|---|---|---|---|---|
| 添加元素 | add- java.lang.IllegalStateException | offer-false | put | offer |
| 移除元素 | remove-java.util.NoSuchElementException | poll-null | take | poll |
5.常見的實現(xiàn)類:
- ArrayBlockingQueue-阻塞式順序隊列
a.底層依靠數(shù)組來存儲數(shù)據(jù)
b.使用的時候需要指定容量
package blockingqueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
//構建隊列
ArrayBlockingQueue<String> queue=new ArrayBlockingQueue<>(5);
queue.add("5");
queue.add("5");
queue.add("5");
queue.add("5");
queue.add("5");
//添加元素
//隊列已滿
//拋出異常,java.lang.IllegalStateException
//queue.add("a");
//返回false
boolean r = queue.offer("b");
System.out.println(r);
//產(chǎn)生阻塞
//queue.put("c");
//定時阻塞
boolean re = queue.offer("d", 5, TimeUnit.SECONDS);
System.out.println(re);
System.out.println(queue);
}
}
- LinkedBlockingQueue-阻塞式鏈式隊列
a.底層依靠單向節(jié)點來存儲數(shù)據(jù)
b.在使用的時候可以指定容量也可以不指定,如果指定了容量,則容量不可變。如果沒有指定容量,則容量為Integer.Max_VALUE,即2^31-1;此時因為這個容量相對較大,一般認為隊列是無限的。
package blockingqueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class LinkeBlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
LinkedBlockingQueue<String> queue=new LinkedBlockingQueue<>();
//隊列為空
//拋出異常java.util.NoSuchElementException
//System.out.println(queue.remove());
//返回null
//System.out.println(queue.poll());
//產(chǎn)生阻塞
//System.out.println(queue.take());
//定時阻塞
System.out.println(queue.poll(5, TimeUnit.SECONDS));
}
}
- PriorityBlockingQueue-具有優(yōu)先級的阻塞式隊列
a.在使用的時候可以不指定容量。如果不指定,則默認初始容量為11-在容量不夠,會進行擴容
b.底層依靠數(shù)組存儲元素
c.PriorityBlockingQueue會對放入其中的元素進行排序,要求元素對應的類必須實現(xiàn)Comparable接口,覆蓋compareTo方法
d.如果需要給隊列單獨指定比較規(guī)則,那么可以傳入Comparator對象
e.迭代遍歷不保證排序
package blockingqueue;
import java.util.concurrent.PriorityBlockingQueue;
public class PriorityBlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
PriorityBlockingQueue<String> queue=new PriorityBlockingQueue<>();
queue.put("Amy");
queue.put("Maray");
queue.put("Peter");
queue.put("Bob");
for(int i=0;i<4;i++){
System.out.println(queue.take());
}
PriorityBlockingQueue<Studnet> studnets=new PriorityBlockingQueue<>();
studnets.put(new Studnet("Amy",98,18));
studnets.put(new Studnet("Bob",48,17));
studnets.put(new Studnet("Cindy",85,20));
studnets.put(new Studnet("Lue",56,22));
studnets.put(new Studnet("Maray",100,18));
for(int i=0;i<5;i++){
System.out.println(studnets.take());
}
System.out.println("-----------------------------------------------");
//需要給隊列單獨指定比較規(guī)則
PriorityBlockingQueue<Studnet> studnet2=new PriorityBlockingQueue<>(
5,(s1,s2)->s1.getAge()-s2.getAge());
studnet2.put(new Studnet("Amy",98,18));
studnet2.put(new Studnet("Bob",48,17));
studnet2.put(new Studnet("Cindy",85,20));
studnet2.put(new Studnet("Lue",56,22));
studnet2.put(new Studnet("Maray",100,18));
for(int i=0;i<5;i++){
System.out.println(studnet2.take());
}
}
}
class Studnet implements Comparable<Studnet>{
private String name;
private int age;
private int score;
public Studnet(String name, int score,int age) {
this.name = name;
this.score = score;
this.age=age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getScore() {
return score;
}
public void setScore(int score) {
this.score = score;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "Studnet{" +
"name='" + name + '\'' +
", age=" + age +
", score=" + score +
'}';
}
//按照分數(shù)進行排序
//在這個方法指定比較規(guī)則
//升序 this-o
//降序 o-this
@Override
public int compareTo(Studnet o) {
return o.score-this.score;
}
}
- SynchronousQueue-同步隊列
a.不需要指定容量,容量默認為1且只能為1 - 擴展:BlockingDeque-阻塞式雙向隊列-允許兩端存放兩端拿
三.ConcurrentMap-并發(fā)映射
1.ConcurrentMap是JDK1.5提供的一套用于應對高并發(fā)以及保證數(shù)據(jù)安全的映射機制。
2.ConcurrentMap包含了ConcurrentHashMap和ConcurrentNavigableMap
3.ConcurrentHashMap-并發(fā)哈希映射
1)底層是基于數(shù)組加鏈表來實現(xiàn)的。數(shù)組的每一個位置稱之為桶,每一個桶中維系一個鏈表。
2)如果不指定,默認情況下,初始容量為16,默認加載因子是0.75,擴容的時候是在原來的基礎上增加一倍。
3)ConcurrentHashMap的最大容量(最大桶數(shù))是2^30。
4)無論指定初始容量是多少,那么經(jīng)過計算,最終容量一定是2^n的形式。
5)從JDK1.8開始,ConcurrentHashMap引入了紅黑樹的機制。當ConcurrentHashMap桶中的元素數(shù)量達到8個,會將這個桶中的鏈表扭轉成為一個紅黑樹;如果桶中的元素數(shù)量不足7個的時候,會將這個桶中的紅黑樹再扭轉會鏈表。在ConcurrentHashMap中,使用紅黑樹的前提是容量>=64
6)轉化紅黑樹的前提是容量為>=64的原因如下:
假設現(xiàn)在容量為16,當其中桶中的鏈表數(shù)量達到了8個,這時需要將其轉化為紅黑樹,但是這時候concurrentHashMap進行了插入操作,這時同樣也達到了擴容的條件,這時需要同步進行紅黑樹的轉化以及rehash操作,產(chǎn)生了資源沖突。所以給定前提為容量>=64,這時擴容概率不會很大。
7)紅黑樹(Red-Black Tree)
- 本質(zhì)上是一種自平衡二叉查找樹
- 二叉查找樹的特征:
a.左子樹小于根,右子樹大于根
b.沒有相等的節(jié)點 - 特征:
a.所有節(jié)點非紅即黑
b.根節(jié)點必須是黑節(jié)點
c.紅節(jié)點的子節(jié)點必須是黑色的
d.最底層的葉子節(jié)點必須是黑色的空節(jié)點
e.從根節(jié)點到任意一個葉子節(jié)點經(jīng)過的路徑的黑色節(jié)點個數(shù)一致,即黑節(jié)點高度相同
f.新添的節(jié)點顏色必須是紅色的 - 紅黑樹的修正-前提:父子節(jié)點為紅
a.叔父節(jié)點為紅,那么將父節(jié)點和叔父節(jié)點涂黑,祖父節(jié)點涂紅
b.叔父節(jié)點為黑,且當前節(jié)點為右子葉,則以當前節(jié)點為軸進行左旋
c.叔父節(jié)點為黑,且當前節(jié)點為左子葉,則以當前節(jié)點為軸進行右旋 - 在紅黑樹中,每添加一個元素,都需要考慮這棵樹是否需要修正
-
紅黑樹的查詢時間復雜度為O(log n)
8)ConcurrentHashMap是一個異步線程安全的映射-支持并發(fā)。不同于Hashtable,ConcurrentHashMap采用了分段/桶鎖機制來保證線程安全。----宏觀同步,微觀異步(映射異步,桶同步)
分段鎖.png
HashMap:異步線程不安全
Hashtable:同步線程安全-凡是對外提供的方法都是同步方法,靜態(tài)方法鎖對象是當前類的字節(jié)碼對象,非靜態(tài)方法的鎖對象是this
9)線程在使用鎖的時候,會產(chǎn)生非常大的開銷(線程狀態(tài)切換、線程的上下文調(diào)度、CPU資源的切換等)
鎖的資源消耗.png
因此在JDK1.8中,引入了一套無鎖算法CAS(Compare And Swap 比較和交換)-CAS過程中涉及到線程的重新調(diào)度問題,所以CAS需要結合具體的CPU內(nèi)核架構實現(xiàn)。目前市面上幾乎所有的CPU內(nèi)核都是支持CAS的。Java中的CAS底層是依靠C語言實現(xiàn)的。
CAS.png
10)ConcurrentHashMap的用法是和HashMap一致。
4.ConcurrentNavigableMap-并發(fā)導航映射
1)ConcurrentNavigableMap提供了用于截取子映射的方法--headMap、tailMap、subMap
package concurrentmap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
public class ConcurrentNavigableMapDemo {
public static void main(String[] args) {
//實現(xiàn)類ConcurrentSkipListMap-并發(fā)跳躍表映射
ConcurrentNavigableMap<String,Integer> map=new ConcurrentSkipListMap<>();
map.put("Amy",98);
map.put("Bob",78);
map.put("Jack",75);
map.put("Rose",88);
map.put("Tony",45);
System.out.println(map);
//從頭開始截取到指定位置
System.out.println(map.headMap("Jack"));
//從指定位置截取到尾部
System.out.println(map.tailMap("Bob"));
//截取指定范圍的數(shù)據(jù)
System.out.println(map.subMap("Bob","Rose"));
}
}
2)ConcurrentNavigableMap本身是一個接口,在JDK中提供了唯一的實現(xiàn)類ConcurrentSkipListMap-并發(fā)跳躍表映射--底層是基于跳躍表實現(xiàn)的
3)跳躍表:
原理參考連接:https://blog.csdn.net/qpzkobe/article/details/80056807
- 針對有序列表來使用
- 適合于讀多寫少的場景
- 跳躍表可以進行多層提取,但是最后一層的元素個數(shù)不能少于2個
- 典型的“以空間換時間”的產(chǎn)物
- 當新增元素的時候,這個元素是否要提取到上層跳躍表中遵循“拋硬幣”原則
- 跳躍表的時間復雜度為O(log n),空間復雜度為O(n)
四.ExecutorService-執(zhí)行器服務
1.本質(zhì)上是一個線程池。意義:減少線程的創(chuàng)建和銷毀,減少服務器資源的浪費,做到線程的復用
2.線程池在剛定義的時候是空的,沒有任何線程。
3.如果接收到一個請求,線程池中就會創(chuàng)建一個線程(core-thread -核心線程)用于處理這個請求
4.核心線程用完之后不會銷毀而是會去等待下一個請求。
5.在定義線程池的時候需要去給定核心線程的數(shù)量。
6.在核心線程達到指定數(shù)量之前,每次來的請求都會觸發(fā)創(chuàng)建一個新的核心線程。
7.如果核心線程被全部占用,那么后來的線程將會放到工作隊列(work queue)中臨時存儲。工作隊列本質(zhì)上是一個阻塞式隊列
8.如果工作隊列被全部占用,那么后來的請求會被交給一個臨時線程(temproary thread)來處理
9.在定義線程池的時候需要給定臨時線程的數(shù)量
10.臨時線程在處理完請求之后,會存活指定的一段時間。如果在這段時間內(nèi)接受到新的請求,那么臨時線程會繼續(xù)處理新的請求而暫時不會被銷毀;如果超過這段時間臨時線程沒有接收到新的請求,那么這個臨時線程就會被銷毀。
11.如果臨時線程被全部占用,那么后來的請求會被交給拒絕執(zhí)行處理器(RejectedExecutionHandler)來進行拒絕處理。
12.代碼:
submit和execute的區(qū)別:
execute()用于提交Runnable線程
submit()既可以提交Runnable線程也可以提交Callable線程
package executorservice;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ExecutorServiceDemo {
public static void main(String[] args) {
//構建一個線程池
/**
*int corePoolSize---核心線程數(shù)量
*int maximumPoolSize---最大線程數(shù)量=核心線程數(shù)+臨時線程數(shù)
*long keepAliveTime---臨時線程存活時間
*TimeUnit unit---時間單位
*BlockingQueue<Runnable> workQueue---工作隊列
*RejectedExecutionHandler handler---拒絕執(zhí)行處理器--如果有具體的拒絕流程,需要覆蓋這個接口
*/
ExecutorService es=new ThreadPoolExecutor(5,
10,
5,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5),
//實際過程中,會有一套明確的拒絕流程
(r,e)-> System.out.println("拒絕執(zhí)行線程")
);
//new Thread(new ExecutorThread()).start();
//可以通過線程池來執(zhí)行這個線程
/**
* submit和execute的區(qū)別:
* execute()用于提交Runnable線程
* submit()既可以提交Runnable線程也可以提交Callable線程
*/
//es.execute(new ExecutorThread());
es.submit(new Thread());
//關閉線程池
es.shutdown();
}
}
class ExecutorThread implements Runnable{
@Override
public void run() {
System.out.println("hello");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
13.Callable<T>
1)Callable是JDK1.5中提供的一套用于定義線程的方式,通過泛型來定義返回值類型
2)創(chuàng)建Callable線程的兩種方式:
package executorservice;
import java.util.concurrent.*;
public class CallableDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//方式一:將Callable包裝成Runnable,通過Thread來啟動
//Callable->FutureTask->RunnableFuture->Runnable
FutureTask<String> f=new FutureTask<>(new CallableThread());
new Thread(f).start();
//獲取指定結果
System.out.println(f.get());
//方式二:通過線程池來啟動Callable線程
ExecutorService es=new ThreadPoolExecutor(5,10,5,TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5));
Future<String> f1 = es.submit(new CallableThread());
System.out.println(f1.get());
es.shutdown();
}
}
//泛型定義的是返回值類型
class CallableThread implements Callable<String>{
@Override
public String call() throws Exception {
return "SUCCESS";
}
}
3)Runnable和Callable比較:
| 比較 | Runnable | Callable |
|---|---|---|
| 返回值 | 沒有返回值 | 通過泛型來定義返回值 |
| 啟動方式 | 1.通過Thread直接啟動 2.通過線程池的execute或者submit來啟動 |
1.包裝成Runnable之后通過Thread來啟動 2.通過線程池的submit方法來啟動 |
| 異常機制 | 不允許拋出異常,一旦出現(xiàn)異常需要立即捕獲處理,就沒有辦法利用全局機制來進行處理 | 允許拋出異常,意味著可以選擇用全局機制(例如Spring中的異常通知)來統(tǒng)一處理異常 |
14.預定義的線程池
package executorservice;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExecutorServiceDemo2 {
public static void main(String[] args) {
//預定義的線程池
/**
* newCachedThreadPool特點:
* 1.沒有核心線程,全部都是臨時線程
* 2.臨時線程的數(shù)量為Integer.MAX_VALUE,即2^31-1
* 一臺服務器能夠承載的線程數(shù)量遠低于這個值
* 所以此時認為這個線程池能夠處理無限多的請求
* 3.臨時線程的存活時間是一分鐘
* 4.工作隊列是一個同步隊列(容量為1)
*/
//大池子小隊列
//適合于高并發(fā)的短任務場景,例如即時通信
//不適合于長任務場景
ExecutorService es= Executors.newCachedThreadPool();
/**
* newFixedThreadPool特點:
* 1.沒有臨時線程,全部都是核心線程
* 2.工作隊列是一個阻塞式鏈式隊列,且容量為Integer.MAX_VALUE,
* 此時認為這個線程池能夠處理無限多的請求
*/
//小池子大隊列
//適合于并發(fā)低的長任務場景,例如文件下載
//不適合高并發(fā)的短任務的場景
ExecutorService es1=Executors.newFixedThreadPool(5);
}
}
15.ScheduledExecutorService--定時調(diào)度執(zhí)行器任務。能夠起到定時調(diào)度的效果
package executorservice;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledExecutorServiceDemo {
public static void main(String[] args) {
ScheduledExecutorService ses= Executors.newScheduledThreadPool(5);
//延時執(zhí)行
//ses.schedule(new ScheduleThread(),5, TimeUnit.SECONDS);
//每隔5秒執(zhí)行一次
//從上次的開始來計算下一次的啟動時間
//實際間隔時間=max(指定時間,線程執(zhí)行時間)
//ses.scheduleAtFixedRate(new ScheduleThread(),0,5,TimeUnit.SECONDS);
//每隔5秒執(zhí)行一次
//從下一次的結束來計算下一次啟動時間
//實際間隔時間=指定時間+線程執(zhí)行時間
ses.scheduleWithFixedDelay(new ScheduleThread(),0,5,TimeUnit.SECONDS);
}
}
class ScheduleThread implements Runnable{
@Override
public void run() {
System.out.println("hello");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
16.ForkJoinPool分叉合并池
- 分叉:將一個大的任務拆分成多個小的任務交給多個線程來執(zhí)行
- 合并:將拆分出去的小的任務的計算結果來進行匯總
- 求1-100000000000L的和
package executorservice;
import java.util.concurrent.*;
public class ForkJoinPoolDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
long start=System.currentTimeMillis();
//求1-100000000000L的和
//主函數(shù)所在的類默認是一個線程類-主線程
//一個線程只能落到一個CPU核上
//運行時間:39780
/*long sum=0;
for(long i=1;i<=100000000000L;i++){
sum+=i;
}
System.out.println(sum);*/
//運行時間:23613
ForkJoinPool pool=new ForkJoinPool();
Future<Long> f = pool.submit(new Sum(1, 100000000000L));
System.out.println(f.get());
pool.shutdown();
long end=System.currentTimeMillis();
System.out.println(end-start);
}
}
class Sum extends RecursiveTask<Long>{
private long start;
private long end;
public Sum(long start, long end) {
this.start = start;
this.end = end;
}
//分叉合并的邏輯就是覆蓋在這個方法中
@Override
protected Long compute() {
//拆分,如果拆分出去的范圍較大,那么繼續(xù)拆分
//如果拆分出去的范圍較小,那么將這個小的范圍的數(shù)字進行求和
if(end-start<=10000){
long sum=0;
for(long i=start;i<=end;i++){
sum+=i;
}
return sum;
}else {
long mid=(start+end)/2;
Sum left=new Sum(start,mid);
Sum right=new Sum(mid+1,end);
//分叉
left.fork();
right.fork();
//合并
return left.join()+right.join();
}
}
}
- 在數(shù)據(jù)量比較小的時候,使用循環(huán)的效率反而比較高,數(shù)據(jù)量越大,分叉合并的效率越高
- 分叉合并通過大量的線程搶占CPU,從而能夠有效地提高CPU的利用率,可能就會導致其他線程被擠占。因此在實際生產(chǎn)過程中,慎用分叉合并。如果需要使用分叉合并,放在相對空閑的時間來執(zhí)行
- 在分叉合并中,當一個核上的任務執(zhí)行完畢之后,這個和不會空閑下來,而是隨機掃描一個核,從這個被掃描的核的任務隊列尾端來“偷取”一個任務回來執(zhí)行--“work-stealing”(工作竊取)策略
五.Lock-鎖
1.Lock是JDK1.5提供的一套鎖機制,在實際生產(chǎn)過程中更推薦使用Lock代替synchronized---相對而言,Lock比synchronized更加靈活。
package Lock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockDemo {
static int i=0;
public static void main(String[] args) throws InterruptedException {
Lock lock=new ReentrantLock();
new Thread(new Add(lock)).start();
new Thread(new Add(lock)).start();
//main所在的類是一個線程類-主線程
//這個線程在執(zhí)行過程中需要啟動兩個Add的線程
//這兩個Add線程在啟動過程中,主線程會搶占CPU繼續(xù)執(zhí)行
//考慮:主線程即使搶占到CPU也需要阻塞
Thread.sleep(3000);
System.out.println(i);
}
}
class Add implements Runnable{
private Lock lock;
public Add(Lock lock) {
this.lock = lock;
}
@Override
public void run() {
//加鎖
lock.lock();
for (int i = 0; i < 10000; i++) {
LockDemo.i++;
}
//解鎖
lock.unlock();
}
}
2.ReentrantLock-重入鎖
a.重入鎖:當鎖資源被釋放之后,這個鎖資源可以再次被線程占用
b.非重入鎖:當鎖資源被釋放之后,不能被再次使用--非重入鎖更多的是在校驗中使用
3.大部分排他鎖和自旋鎖
- 自旋鎖也是排他鎖
- 對于其他的陪他鎖而言,當一個線程占用鎖對象之后,其他的線程會陷入阻塞狀態(tài),持續(xù)等待。當鎖資源被釋放之后,被阻塞的線程需要被喚醒之后才能搶占,這個過程中就涉及到了線程的狀態(tài)的變化
- 自旋鎖的特點在于,當發(fā)現(xiàn)鎖資源被占用之后,線程不會陷入阻塞,而是持續(xù)判斷鎖資源是否被釋放
- 自旋鎖因為沒有狀態(tài)的轉化,所以效率相對要高一些;但是相對而言,自旋鎖會持續(xù)占用CPU資源
4.ReadWriteLock-讀寫鎖
- 讀鎖:允許多個人同時讀,不允許寫入--本質(zhì)上是共享鎖
- 寫鎖:只允許一個人寫,不允許讀--本質(zhì)上是排他鎖
//獲取寫鎖
ReadWriteLock rw=new ReentrantReadWriteLock();
Lock lock=rw.writeLock();
5.公平策略和非公平策略
- 在資源有限的情況下,雖然理論上各個線程搶占的幾率相等,但是實際上各個線程的搶占次數(shù)并不相等,這種現(xiàn)象稱之為非公平策略
-
在公平策略的前提下,各個線程并不能直接搶占資源,而是需要搶占入隊順序。此時,各個線程的執(zhí)行次數(shù)大致是相等的
公平策略.png - 公平策略需要涉及到大量線程調(diào)度的問題,所以相對而言,非公平策略的效率更高。
- synchronized、Lock默認是非公平的
//非公平的
ReadWriteLock rw=new ReentrantReadWriteLock(false);
//公平的
ReadWriteLock rw=new ReentrantReadWriteLock(true);
6.其他
- CountDownLatch:閉鎖/線程遞減鎖。對線程進行計數(shù),在計數(shù)歸零之前,線程會陷入阻塞;直到計數(shù)歸零之后,會自動放開阻塞-上一組線程結束需要開啟下一組線程
package Lock;
import java.util.concurrent.CountDownLatch;
/**
* 案例:考試
* 考官和考生到達考場之后,開始考試
*/
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch cdl=new CountDownLatch(7);
new Thread(new Student(cdl)).start();
new Thread(new Student(cdl)).start();
new Thread(new Student(cdl)).start();
new Thread(new Student(cdl)).start();
new Thread(new Student(cdl)).start();
new Thread(new Teacher(cdl)).start();
new Thread(new Teacher(cdl)).start();
//需要等上面的線程執(zhí)行完畢之后才能繼續(xù)執(zhí)行下面的邏輯
//在上面的線程執(zhí)行完畢之前,當前主線程需要阻塞
cdl.await();
System.out.println("開始考試?。?!");
}
}
class Teacher implements Runnable{
private CountDownLatch cdl;
public Teacher(CountDownLatch cdl) {
this.cdl = cdl;
}
@Override
public void run() {
//模擬:考官走到考場時間
try {
Thread.sleep((long)(Math.random()*10000));
System.out.println("考官到達考場~");
cdl.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Student implements Runnable{
private CountDownLatch cdl;
public Student(CountDownLatch cdl) {
this.cdl = cdl;
}
@Override
public void run() {
//模擬:考生走到考場時間
try {
Thread.sleep((long)(Math.random()*10000));
System.out.println("考生到達考場~");
cdl.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- CyclicBarrier:柵欄。對線程進行計數(shù)。在計數(shù)歸零之前,線程會陷入阻塞。直到線程計數(shù)歸零,會自動放開阻塞。-所有線程到達同一個點之后再分別繼續(xù)執(zhí)行
package Lock;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* 案例:跑步比賽
* 運動員先到起跑線,人齊之后,聽到槍響之后在跑出去
*/
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cb=new CyclicBarrier(6);
new Thread(new Runner(cb),"1號").start();
new Thread(new Runner(cb),"2號").start();
new Thread(new Runner(cb),"3號").start();
new Thread(new Runner(cb),"4號").start();
new Thread(new Runner(cb),"5號").start();
new Thread(new Runner(cb),"6號").start();
}
}
class Runner implements Runnable{
private CyclicBarrier cb;
public Runner(CyclicBarrier cb) {
this.cb = cb;
}
@Override
public void run() {
try {
//模擬運動員走到起跑線
Thread.sleep((long) (Math.random()*10000));
String name=Thread.currentThread().getName();
System.out.println(name+"運動員走到了起跑線");
//先到起跑線的人需要等待,直到人齊了,聽到槍響之后再跑
//阻塞,減少計數(shù)--計數(shù)歸零會自動放開阻塞
cb.await();
System.out.println(name+"跑了出去~");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
- Exchanger<V>:交換機,用于交換兩個線程之間的信息。
package Lock;
import java.util.concurrent.Exchanger;
/**
* 案例:購物
* 一手交錢一手交貨
*/
public class ExchangerDemo {
public static void main(String[] args) {
Exchanger<String> ex=new Exchanger<>();
new Thread(new Seller(ex)).start();
new Thread(new Consumer(ex)).start();
}
}
class Consumer implements Runnable{
private final Exchanger<String> ex;
public Consumer(Exchanger<String> ex) {
this.ex = ex;
}
@Override
public void run() {
String info="錢";
//挑好東西之后,需要付款,商家需要將商品交換給消費者
String msg= null;
try {
msg = ex.exchange(info);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消費者收到商家給的:"+msg);
}
}
class Seller implements Runnable{
private final Exchanger<String> ex;
public Seller(Exchanger<String> ex) {
this.ex = ex;
}
@Override
public void run() {
String info="商品";
//商家將商品交付給消費者之后,需要收到消費者的付款
try {
String msg = ex.exchange(info);
System.out.println("商家收到消費者給的:"+msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- Semaphore:信號量。在執(zhí)行指定邏輯之前,線程需要先獲取信號。當信號被全部獲取完,那么后來的線程就會被阻塞;直到有信號被釋放,那么被阻塞的線程才會獲取信號執(zhí)行邏輯-在實際生產(chǎn)過程中,信號量適用于限流的
package Lock;
import java.util.concurrent.Semaphore;
/**
* 案例:去餐館吃飯
* 餐館中的桌子的數(shù)量有限。如果所有的桌子被占用,那么后來的人就會被阻塞
*/
public class SemaphoreDemo {
public static void main(String[] args) {
//6個信號->6張桌子
Semaphore s=new Semaphore(6);
for (int i = 0; i < 10; i++) {
new Thread(new Eater(s)).start();
}
}
}
class Eater implements Runnable{
private Semaphore s;
public Eater(Semaphore s) {
this.s = s;
}
@Override
public void run() {
//占用一張桌子
//桌子->信號
try {
//獲取一個信號
s.acquire();
System.out.println("來了一波客人,占用了一張桌子");
//模擬用餐時間
Thread.sleep((long) (Math.random()*10000));
System.out.println("客人用餐完畢離開,空出來一張桌子");
//一張桌子就空出來相當于一個信號被釋放
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
六.Atomic操作-原子性操作
1.原子性操作實際上針對屬性提供了大量的線程安全的方法。在jdk1.8中,采用了CAS+volatile機制來保證屬性的線程安全。
2.volatile是Java中的關鍵字之一,是Java提供的一種輕量級的線程間的通信機制
- 保證線程的可見性。當共享資源發(fā)生變化的時候,其他線程能夠立即感知到這種變化并且做出對應的操作,這個過程稱之為可見性
- 不保證線程的原 子性。原子性指的是線程的執(zhí)行過程不可分割。換而言之,就是線程的執(zhí)行過程不會被打斷不會被搶占。加鎖實際上保證的線程的原子性。
- 禁止指令重排。指令重排指的是預先定義的順序和指令的實際執(zhí)行順序執(zhí)行不一致。 指令重排可能發(fā)生在每一步(java-class-System-CPU),但是注意,每一步過程中,發(fā)生指令重排的概率不足百萬分之一。指令重排不能違背happen-before(先發(fā)生)原則-使用的變量必須先產(chǎn)生。在多線程的情況下,執(zhí)行完全相同的代碼可能會因為指令重排獲取到不同的結果,這種現(xiàn)象稱之為結果的二相性。



