JUC

一.概述

1.JUC是JDK1.5中提供的一套并發(fā)包及其子包。包含以下:
java.util.concurrent,
java.util.concurrent.atmoic,
java.util.concurrent.lock
2.JUC中包含了5套接口:BlockingQueue、ConcurrentMap、ExecutorService,Lock和Automic

二.BlockingQueue-阻塞式隊列

1.特征:阻塞、FIFO(先進先出)
2.BlockingQueue不同于之前學習的Queue,不能進行擴容。即BlockingQueue在使用的時候指定的容量是多少就是多少
3.當隊列已滿時,試圖放入元素的線程會被阻塞;當隊列為空時,似乎獲取元素的線程會被阻塞。
阻塞式隊列不允許元素為空。
4.重要方法:

特征 拋出異常 返回值 阻塞 定時阻塞
添加元素 add- java.lang.IllegalStateException offer-false put offer
移除元素 remove-java.util.NoSuchElementException poll-null take poll

5.常見的實現(xiàn)類:

  • ArrayBlockingQueue-阻塞式順序隊列
    a.底層依靠數(shù)組來存儲數(shù)據(jù)
    b.使用的時候需要指定容量
package blockingqueue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        //構建隊列
        ArrayBlockingQueue<String> queue=new ArrayBlockingQueue<>(5);
        queue.add("5");
        queue.add("5");
        queue.add("5");
        queue.add("5");
        queue.add("5");
        //添加元素
        //隊列已滿
        //拋出異常,java.lang.IllegalStateException
        //queue.add("a");
        //返回false
        boolean r = queue.offer("b");
        System.out.println(r);
        //產(chǎn)生阻塞
        //queue.put("c");
        //定時阻塞
        boolean re = queue.offer("d", 5, TimeUnit.SECONDS);
        System.out.println(re);
        System.out.println(queue);
    }
}
  • LinkedBlockingQueue-阻塞式鏈式隊列
    a.底層依靠單向節(jié)點來存儲數(shù)據(jù)
    b.在使用的時候可以指定容量也可以不指定,如果指定了容量,則容量不可變。如果沒有指定容量,則容量為Integer.Max_VALUE,即2^31-1;此時因為這個容量相對較大,一般認為隊列是無限的。
package blockingqueue;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class LinkeBlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> queue=new LinkedBlockingQueue<>();
        //隊列為空
        //拋出異常java.util.NoSuchElementException
        //System.out.println(queue.remove());
        //返回null
        //System.out.println(queue.poll());
        //產(chǎn)生阻塞
        //System.out.println(queue.take());
        //定時阻塞
        System.out.println(queue.poll(5, TimeUnit.SECONDS));
    }
}
  • PriorityBlockingQueue-具有優(yōu)先級的阻塞式隊列
    a.在使用的時候可以不指定容量。如果不指定,則默認初始容量為11-在容量不夠,會進行擴容
    b.底層依靠數(shù)組存儲元素
    c.PriorityBlockingQueue會對放入其中的元素進行排序,要求元素對應的類必須實現(xiàn)Comparable接口,覆蓋compareTo方法
    d.如果需要給隊列單獨指定比較規(guī)則,那么可以傳入Comparator對象
    e.迭代遍歷不保證排序
package blockingqueue;

import java.util.concurrent.PriorityBlockingQueue;

public class PriorityBlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<String> queue=new PriorityBlockingQueue<>();
        queue.put("Amy");
        queue.put("Maray");
        queue.put("Peter");
        queue.put("Bob");
        for(int i=0;i<4;i++){
            System.out.println(queue.take());
        }

        PriorityBlockingQueue<Studnet> studnets=new PriorityBlockingQueue<>();
        studnets.put(new Studnet("Amy",98,18));
        studnets.put(new Studnet("Bob",48,17));
        studnets.put(new Studnet("Cindy",85,20));
        studnets.put(new Studnet("Lue",56,22));
        studnets.put(new Studnet("Maray",100,18));
        for(int i=0;i<5;i++){
            System.out.println(studnets.take());
        }

        System.out.println("-----------------------------------------------");
        //需要給隊列單獨指定比較規(guī)則
        PriorityBlockingQueue<Studnet> studnet2=new PriorityBlockingQueue<>(
                5,(s1,s2)->s1.getAge()-s2.getAge());
        studnet2.put(new Studnet("Amy",98,18));
        studnet2.put(new Studnet("Bob",48,17));
        studnet2.put(new Studnet("Cindy",85,20));
        studnet2.put(new Studnet("Lue",56,22));
        studnet2.put(new Studnet("Maray",100,18));
        for(int i=0;i<5;i++){
            System.out.println(studnet2.take());
        }
    }
}
class Studnet implements Comparable<Studnet>{
    private String name;
    private int age;
    private int score;

    public Studnet(String name, int score,int age) {
        this.name = name;
        this.score = score;
        this.age=age;
    }
    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
    public int getScore() {
        return score;
    }

    public void setScore(int score) {
        this.score = score;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Studnet{" +
                "name='" + name + '\'' +
                ", age=" + age +
                ", score=" + score +
                '}';
    }

    //按照分數(shù)進行排序
    //在這個方法指定比較規(guī)則
    //升序 this-o
    //降序 o-this
    @Override
    public int compareTo(Studnet o) {
        return o.score-this.score;
    }
}
  • SynchronousQueue-同步隊列
    a.不需要指定容量,容量默認為1且只能為1
  • 擴展:BlockingDeque-阻塞式雙向隊列-允許兩端存放兩端拿

三.ConcurrentMap-并發(fā)映射

1.ConcurrentMap是JDK1.5提供的一套用于應對高并發(fā)以及保證數(shù)據(jù)安全的映射機制。
2.ConcurrentMap包含了ConcurrentHashMap和ConcurrentNavigableMap
3.ConcurrentHashMap-并發(fā)哈希映射
1)底層是基于數(shù)組加鏈表來實現(xiàn)的。數(shù)組的每一個位置稱之為桶,每一個桶中維系一個鏈表。
2)如果不指定,默認情況下,初始容量為16,默認加載因子是0.75,擴容的時候是在原來的基礎上增加一倍。
3)ConcurrentHashMap的最大容量(最大桶數(shù))是2^30。
4)無論指定初始容量是多少,那么經(jīng)過計算,最終容量一定是2^n的形式。
5)從JDK1.8開始,ConcurrentHashMap引入了紅黑樹的機制。當ConcurrentHashMap桶中的元素數(shù)量達到8個,會將這個桶中的鏈表扭轉成為一個紅黑樹;如果桶中的元素數(shù)量不足7個的時候,會將這個桶中的紅黑樹再扭轉會鏈表。在ConcurrentHashMap中,使用紅黑樹的前提是容量>=64
6)轉化紅黑樹的前提是容量為>=64的原因如下:
假設現(xiàn)在容量為16,當其中桶中的鏈表數(shù)量達到了8個,這時需要將其轉化為紅黑樹,但是這時候concurrentHashMap進行了插入操作,這時同樣也達到了擴容的條件,這時需要同步進行紅黑樹的轉化以及rehash操作,產(chǎn)生了資源沖突。所以給定前提為容量>=64,這時擴容概率不會很大。
7)紅黑樹(Red-Black Tree)

  • 本質(zhì)上是一種自平衡二叉查找樹
  • 二叉查找樹的特征:
    a.左子樹小于根,右子樹大于根
    b.沒有相等的節(jié)點
  • 特征:
    a.所有節(jié)點非紅即黑
    b.根節(jié)點必須是黑節(jié)點
    c.紅節(jié)點的子節(jié)點必須是黑色的
    d.最底層的葉子節(jié)點必須是黑色的空節(jié)點
    e.從根節(jié)點到任意一個葉子節(jié)點經(jīng)過的路徑的黑色節(jié)點個數(shù)一致,即黑節(jié)點高度相同
    f.新添的節(jié)點顏色必須是紅色的
  • 紅黑樹的修正-前提:父子節(jié)點為紅
    a.叔父節(jié)點為紅,那么將父節(jié)點和叔父節(jié)點涂黑,祖父節(jié)點涂紅
    b.叔父節(jié)點為黑,且當前節(jié)點為右子葉,則以當前節(jié)點為軸進行左旋
    c.叔父節(jié)點為黑,且當前節(jié)點為左子葉,則以當前節(jié)點為軸進行右旋
  • 在紅黑樹中,每添加一個元素,都需要考慮這棵樹是否需要修正
  • 紅黑樹的查詢時間復雜度為O(log n)
    8)ConcurrentHashMap是一個異步線程安全的映射-支持并發(fā)。不同于Hashtable,ConcurrentHashMap采用了分段/桶鎖機制來保證線程安全。----宏觀同步,微觀異步(映射異步,桶同步)


    分段鎖.png

    HashMap:異步線程不安全
    Hashtable:同步線程安全-凡是對外提供的方法都是同步方法,靜態(tài)方法鎖對象是當前類的字節(jié)碼對象,非靜態(tài)方法的鎖對象是this
    9)線程在使用鎖的時候,會產(chǎn)生非常大的開銷(線程狀態(tài)切換、線程的上下文調(diào)度、CPU資源的切換等)


    鎖的資源消耗.png

    因此在JDK1.8中,引入了一套無鎖算法CAS(Compare And Swap 比較和交換)-CAS過程中涉及到線程的重新調(diào)度問題,所以CAS需要結合具體的CPU內(nèi)核架構實現(xiàn)。目前市面上幾乎所有的CPU內(nèi)核都是支持CAS的。Java中的CAS底層是依靠C語言實現(xiàn)的。
    CAS.png

    10)ConcurrentHashMap的用法是和HashMap一致。

    4.ConcurrentNavigableMap-并發(fā)導航映射
    1)ConcurrentNavigableMap提供了用于截取子映射的方法--headMap、tailMap、subMap

package concurrentmap;

import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class ConcurrentNavigableMapDemo {
    public static void main(String[] args) {
        //實現(xiàn)類ConcurrentSkipListMap-并發(fā)跳躍表映射
        ConcurrentNavigableMap<String,Integer> map=new ConcurrentSkipListMap<>();
        map.put("Amy",98);
        map.put("Bob",78);
        map.put("Jack",75);
        map.put("Rose",88);
        map.put("Tony",45);
        System.out.println(map);
        //從頭開始截取到指定位置
        System.out.println(map.headMap("Jack"));
        //從指定位置截取到尾部
        System.out.println(map.tailMap("Bob"));
        //截取指定范圍的數(shù)據(jù)
        System.out.println(map.subMap("Bob","Rose"));
    }
}

2)ConcurrentNavigableMap本身是一個接口,在JDK中提供了唯一的實現(xiàn)類ConcurrentSkipListMap-并發(fā)跳躍表映射--底層是基于跳躍表實現(xiàn)的
3)跳躍表:
原理參考連接:https://blog.csdn.net/qpzkobe/article/details/80056807

  • 針對有序列表來使用
  • 適合于讀多寫少的場景
  • 跳躍表可以進行多層提取,但是最后一層的元素個數(shù)不能少于2個
  • 典型的“以空間換時間”的產(chǎn)物
  • 當新增元素的時候,這個元素是否要提取到上層跳躍表中遵循“拋硬幣”原則
  • 跳躍表的時間復雜度為O(log n),空間復雜度為O(n)

四.ExecutorService-執(zhí)行器服務

1.本質(zhì)上是一個線程池。意義:減少線程的創(chuàng)建和銷毀,減少服務器資源的浪費,做到線程的復用
2.線程池在剛定義的時候是空的,沒有任何線程。
3.如果接收到一個請求,線程池中就會創(chuàng)建一個線程(core-thread -核心線程)用于處理這個請求
4.核心線程用完之后不會銷毀而是會去等待下一個請求。
5.在定義線程池的時候需要去給定核心線程的數(shù)量。
6.在核心線程達到指定數(shù)量之前,每次來的請求都會觸發(fā)創(chuàng)建一個新的核心線程。
7.如果核心線程被全部占用,那么后來的線程將會放到工作隊列(work queue)中臨時存儲。工作隊列本質(zhì)上是一個阻塞式隊列
8.如果工作隊列被全部占用,那么后來的請求會被交給一個臨時線程(temproary thread)來處理
9.在定義線程池的時候需要給定臨時線程的數(shù)量
10.臨時線程在處理完請求之后,會存活指定的一段時間。如果在這段時間內(nèi)接受到新的請求,那么臨時線程會繼續(xù)處理新的請求而暫時不會被銷毀;如果超過這段時間臨時線程沒有接收到新的請求,那么這個臨時線程就會被銷毀。
11.如果臨時線程被全部占用,那么后來的請求會被交給拒絕執(zhí)行處理器(RejectedExecutionHandler)來進行拒絕處理。
12.代碼:
submit和execute的區(qū)別:
execute()用于提交Runnable線程
submit()既可以提交Runnable線程也可以提交Callable線程

package executorservice;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorServiceDemo {
    public static void main(String[] args) {
        //構建一個線程池
        /**
         *int corePoolSize---核心線程數(shù)量
         *int maximumPoolSize---最大線程數(shù)量=核心線程數(shù)+臨時線程數(shù)
         *long keepAliveTime---臨時線程存活時間
         *TimeUnit unit---時間單位
         *BlockingQueue<Runnable> workQueue---工作隊列
         *RejectedExecutionHandler handler---拒絕執(zhí)行處理器--如果有具體的拒絕流程,需要覆蓋這個接口
         */
        ExecutorService es=new ThreadPoolExecutor(5,
                10,
                5,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(5),
                //實際過程中,會有一套明確的拒絕流程
                (r,e)-> System.out.println("拒絕執(zhí)行線程")
                );
        //new Thread(new ExecutorThread()).start();
        //可以通過線程池來執(zhí)行這個線程
        /**
         * submit和execute的區(qū)別:
         * execute()用于提交Runnable線程
         * submit()既可以提交Runnable線程也可以提交Callable線程
         */
        //es.execute(new ExecutorThread());
        es.submit(new Thread());
        //關閉線程池
        es.shutdown();
    }
}
class ExecutorThread implements Runnable{
    @Override
    public void run() {
        System.out.println("hello");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

13.Callable<T>
1)Callable是JDK1.5中提供的一套用于定義線程的方式,通過泛型來定義返回值類型
2)創(chuàng)建Callable線程的兩種方式:

package executorservice;

import java.util.concurrent.*;

public class CallableDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //方式一:將Callable包裝成Runnable,通過Thread來啟動
        //Callable->FutureTask->RunnableFuture->Runnable
        FutureTask<String> f=new FutureTask<>(new CallableThread());
        new Thread(f).start();
        //獲取指定結果
        System.out.println(f.get());
        //方式二:通過線程池來啟動Callable線程
        ExecutorService es=new ThreadPoolExecutor(5,10,5,TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(5));
        Future<String> f1 = es.submit(new CallableThread());
        System.out.println(f1.get());
        es.shutdown();
    }
}
//泛型定義的是返回值類型
class CallableThread implements Callable<String>{

    @Override
    public String call() throws Exception {
        return "SUCCESS";
    }
}

3)Runnable和Callable比較:

比較 Runnable Callable
返回值 沒有返回值 通過泛型來定義返回值
啟動方式 1.通過Thread直接啟動
2.通過線程池的execute或者submit來啟動
1.包裝成Runnable之后通過Thread來啟動
2.通過線程池的submit方法來啟動
異常機制 不允許拋出異常,一旦出現(xiàn)異常需要立即捕獲處理,就沒有辦法利用全局機制來進行處理 允許拋出異常,意味著可以選擇用全局機制(例如Spring中的異常通知)來統(tǒng)一處理異常

14.預定義的線程池

package executorservice;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorServiceDemo2 {
    public static void main(String[] args) {
        //預定義的線程池
        /**
         * newCachedThreadPool特點:
         * 1.沒有核心線程,全部都是臨時線程
         * 2.臨時線程的數(shù)量為Integer.MAX_VALUE,即2^31-1
         *  一臺服務器能夠承載的線程數(shù)量遠低于這個值
         *  所以此時認為這個線程池能夠處理無限多的請求
         * 3.臨時線程的存活時間是一分鐘
         * 4.工作隊列是一個同步隊列(容量為1)
         */
        //大池子小隊列
        //適合于高并發(fā)的短任務場景,例如即時通信
        //不適合于長任務場景
        ExecutorService es= Executors.newCachedThreadPool();
        /**
         * newFixedThreadPool特點:
         * 1.沒有臨時線程,全部都是核心線程
         * 2.工作隊列是一個阻塞式鏈式隊列,且容量為Integer.MAX_VALUE,
         *     此時認為這個線程池能夠處理無限多的請求
         */
        //小池子大隊列
        //適合于并發(fā)低的長任務場景,例如文件下載
        //不適合高并發(fā)的短任務的場景
        ExecutorService es1=Executors.newFixedThreadPool(5);
    }
}

15.ScheduledExecutorService--定時調(diào)度執(zhí)行器任務。能夠起到定時調(diào)度的效果

package executorservice;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledExecutorServiceDemo {
    public static void main(String[] args) {
        ScheduledExecutorService ses= Executors.newScheduledThreadPool(5);
        //延時執(zhí)行
        //ses.schedule(new ScheduleThread(),5, TimeUnit.SECONDS);
        //每隔5秒執(zhí)行一次
        //從上次的開始來計算下一次的啟動時間
        //實際間隔時間=max(指定時間,線程執(zhí)行時間)
        //ses.scheduleAtFixedRate(new ScheduleThread(),0,5,TimeUnit.SECONDS);
        //每隔5秒執(zhí)行一次
        //從下一次的結束來計算下一次啟動時間
        //實際間隔時間=指定時間+線程執(zhí)行時間
        ses.scheduleWithFixedDelay(new ScheduleThread(),0,5,TimeUnit.SECONDS);
    }
}
class ScheduleThread implements Runnable{

    @Override
    public void run() {
        System.out.println("hello");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

16.ForkJoinPool分叉合并池

  • 分叉:將一個大的任務拆分成多個小的任務交給多個線程來執(zhí)行
  • 合并:將拆分出去的小的任務的計算結果來進行匯總
  • 求1-100000000000L的和
package executorservice;

import java.util.concurrent.*;

public class ForkJoinPoolDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        long start=System.currentTimeMillis();
        //求1-100000000000L的和
        //主函數(shù)所在的類默認是一個線程類-主線程
        //一個線程只能落到一個CPU核上
        //運行時間:39780
        /*long sum=0;
        for(long i=1;i<=100000000000L;i++){
            sum+=i;
        }
        System.out.println(sum);*/
        //運行時間:23613
        ForkJoinPool pool=new ForkJoinPool();
        Future<Long> f = pool.submit(new Sum(1, 100000000000L));
        System.out.println(f.get());
        pool.shutdown();

        long end=System.currentTimeMillis();
        System.out.println(end-start);
    }
}
class Sum extends RecursiveTask<Long>{
    private long start;
    private long end;

    public Sum(long start, long end) {
        this.start = start;
        this.end = end;
    }

    //分叉合并的邏輯就是覆蓋在這個方法中
    @Override
    protected Long compute() {
        //拆分,如果拆分出去的范圍較大,那么繼續(xù)拆分
        //如果拆分出去的范圍較小,那么將這個小的范圍的數(shù)字進行求和
        if(end-start<=10000){
            long sum=0;
            for(long i=start;i<=end;i++){
                sum+=i;
            }
            return sum;
        }else {
            long mid=(start+end)/2;
            Sum left=new Sum(start,mid);
            Sum right=new Sum(mid+1,end);
            //分叉
            left.fork();
            right.fork();
            //合并
            return left.join()+right.join();
        }
    }
}
  • 在數(shù)據(jù)量比較小的時候,使用循環(huán)的效率反而比較高,數(shù)據(jù)量越大,分叉合并的效率越高
  • 分叉合并通過大量的線程搶占CPU,從而能夠有效地提高CPU的利用率,可能就會導致其他線程被擠占。因此在實際生產(chǎn)過程中,慎用分叉合并。如果需要使用分叉合并,放在相對空閑的時間來執(zhí)行
  • 在分叉合并中,當一個核上的任務執(zhí)行完畢之后,這個和不會空閑下來,而是隨機掃描一個核,從這個被掃描的核的任務隊列尾端來“偷取”一個任務回來執(zhí)行--“work-stealing”(工作竊取)策略

五.Lock-鎖

1.Lock是JDK1.5提供的一套鎖機制,在實際生產(chǎn)過程中更推薦使用Lock代替synchronized---相對而言,Lock比synchronized更加靈活。

package Lock;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockDemo {
    static int i=0;

    public static void main(String[] args) throws InterruptedException {
        Lock lock=new ReentrantLock();
        new Thread(new Add(lock)).start();
        new Thread(new Add(lock)).start();
        //main所在的類是一個線程類-主線程
        //這個線程在執(zhí)行過程中需要啟動兩個Add的線程
        //這兩個Add線程在啟動過程中,主線程會搶占CPU繼續(xù)執(zhí)行
        //考慮:主線程即使搶占到CPU也需要阻塞
        Thread.sleep(3000);
        System.out.println(i);
    }
}
class Add implements Runnable{
    private Lock lock;

    public Add(Lock lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        //加鎖
        lock.lock();
        for (int i = 0; i < 10000; i++) {
            LockDemo.i++;
        }
        //解鎖
        lock.unlock();
    }
}

2.ReentrantLock-重入鎖
a.重入鎖:當鎖資源被釋放之后,這個鎖資源可以再次被線程占用
b.非重入鎖:當鎖資源被釋放之后,不能被再次使用--非重入鎖更多的是在校驗中使用
3.大部分排他鎖和自旋鎖

  • 自旋鎖也是排他鎖
  • 對于其他的陪他鎖而言,當一個線程占用鎖對象之后,其他的線程會陷入阻塞狀態(tài),持續(xù)等待。當鎖資源被釋放之后,被阻塞的線程需要被喚醒之后才能搶占,這個過程中就涉及到了線程的狀態(tài)的變化
  • 自旋鎖的特點在于,當發(fā)現(xiàn)鎖資源被占用之后,線程不會陷入阻塞,而是持續(xù)判斷鎖資源是否被釋放
  • 自旋鎖因為沒有狀態(tài)的轉化,所以效率相對要高一些;但是相對而言,自旋鎖會持續(xù)占用CPU資源

4.ReadWriteLock-讀寫鎖

  • 讀鎖:允許多個人同時讀,不允許寫入--本質(zhì)上是共享鎖
  • 寫鎖:只允許一個人寫,不允許讀--本質(zhì)上是排他鎖
         //獲取寫鎖
        ReadWriteLock rw=new ReentrantReadWriteLock();
        Lock lock=rw.writeLock();

5.公平策略和非公平策略

  • 在資源有限的情況下,雖然理論上各個線程搶占的幾率相等,但是實際上各個線程的搶占次數(shù)并不相等,這種現(xiàn)象稱之為非公平策略
  • 在公平策略的前提下,各個線程并不能直接搶占資源,而是需要搶占入隊順序。此時,各個線程的執(zhí)行次數(shù)大致是相等的


    公平策略.png
  • 公平策略需要涉及到大量線程調(diào)度的問題,所以相對而言,非公平策略的效率更高。
  • synchronized、Lock默認是非公平的
         //非公平的
        ReadWriteLock rw=new ReentrantReadWriteLock(false);
         //公平的
        ReadWriteLock rw=new ReentrantReadWriteLock(true);

6.其他

  • CountDownLatch:閉鎖/線程遞減鎖。對線程進行計數(shù),在計數(shù)歸零之前,線程會陷入阻塞;直到計數(shù)歸零之后,會自動放開阻塞-上一組線程結束需要開啟下一組線程
package Lock;

import java.util.concurrent.CountDownLatch;

/**
 * 案例:考試
 * 考官和考生到達考場之后,開始考試
 */
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch cdl=new CountDownLatch(7);
        new Thread(new Student(cdl)).start();
        new Thread(new Student(cdl)).start();
        new Thread(new Student(cdl)).start();
        new Thread(new Student(cdl)).start();
        new Thread(new Student(cdl)).start();
        new Thread(new Teacher(cdl)).start();
        new Thread(new Teacher(cdl)).start();
        //需要等上面的線程執(zhí)行完畢之后才能繼續(xù)執(zhí)行下面的邏輯
        //在上面的線程執(zhí)行完畢之前,當前主線程需要阻塞
        cdl.await();
        System.out.println("開始考試?。?!");
    }
}
class Teacher implements Runnable{
    private CountDownLatch cdl;

    public Teacher(CountDownLatch cdl) {
        this.cdl = cdl;
    }

    @Override
    public void run() {
        //模擬:考官走到考場時間
        try {
            Thread.sleep((long)(Math.random()*10000));
            System.out.println("考官到達考場~");
            cdl.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
class Student implements Runnable{
    private CountDownLatch cdl;

    public Student(CountDownLatch cdl) {
        this.cdl = cdl;
    }

    @Override
    public void run() {
        //模擬:考生走到考場時間
        try {
            Thread.sleep((long)(Math.random()*10000));
            System.out.println("考生到達考場~");
            cdl.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • CyclicBarrier:柵欄。對線程進行計數(shù)。在計數(shù)歸零之前,線程會陷入阻塞。直到線程計數(shù)歸零,會自動放開阻塞。-所有線程到達同一個點之后再分別繼續(xù)執(zhí)行
package Lock;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * 案例:跑步比賽
 * 運動員先到起跑線,人齊之后,聽到槍響之后在跑出去
 */
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        CyclicBarrier cb=new CyclicBarrier(6);
        new Thread(new Runner(cb),"1號").start();
        new Thread(new Runner(cb),"2號").start();
        new Thread(new Runner(cb),"3號").start();
        new Thread(new Runner(cb),"4號").start();
        new Thread(new Runner(cb),"5號").start();
        new Thread(new Runner(cb),"6號").start();
    }
}
class Runner implements Runnable{

    private CyclicBarrier cb;

    public Runner(CyclicBarrier cb) {
        this.cb = cb;
    }

    @Override
    public void run() {
        try {
            //模擬運動員走到起跑線
            Thread.sleep((long) (Math.random()*10000));
            String name=Thread.currentThread().getName();
            System.out.println(name+"運動員走到了起跑線");
            //先到起跑線的人需要等待,直到人齊了,聽到槍響之后再跑
            //阻塞,減少計數(shù)--計數(shù)歸零會自動放開阻塞
            cb.await();
            System.out.println(name+"跑了出去~");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}
  • Exchanger<V>:交換機,用于交換兩個線程之間的信息。
package Lock;

import java.util.concurrent.Exchanger;

/**
 * 案例:購物
 * 一手交錢一手交貨
 */
public class ExchangerDemo {
    public static void main(String[] args) {
        Exchanger<String> ex=new Exchanger<>();
        new Thread(new Seller(ex)).start();
        new Thread(new Consumer(ex)).start();
    }
}
class Consumer implements Runnable{

    private final Exchanger<String> ex;

    public Consumer(Exchanger<String> ex) {
        this.ex = ex;
    }

    @Override
    public void run() {
        String info="錢";
        //挑好東西之后,需要付款,商家需要將商品交換給消費者
        String msg= null;
        try {
            msg = ex.exchange(info);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("消費者收到商家給的:"+msg);
    }
}
class Seller implements Runnable{
    private final Exchanger<String> ex;

    public Seller(Exchanger<String> ex) {
        this.ex = ex;
    }

    @Override
    public void run() {
        String info="商品";
        //商家將商品交付給消費者之后,需要收到消費者的付款
        try {
            String msg = ex.exchange(info);
            System.out.println("商家收到消費者給的:"+msg);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • Semaphore:信號量。在執(zhí)行指定邏輯之前,線程需要先獲取信號。當信號被全部獲取完,那么后來的線程就會被阻塞;直到有信號被釋放,那么被阻塞的線程才會獲取信號執(zhí)行邏輯-在實際生產(chǎn)過程中,信號量適用于限流的
package Lock;

import java.util.concurrent.Semaphore;

/**
 * 案例:去餐館吃飯
 * 餐館中的桌子的數(shù)量有限。如果所有的桌子被占用,那么后來的人就會被阻塞
 */
public class SemaphoreDemo {
    public static void main(String[] args) {
        //6個信號->6張桌子
        Semaphore s=new Semaphore(6);
        for (int i = 0; i < 10; i++) {
            new Thread(new Eater(s)).start();
        }
    }
}
class Eater implements Runnable{

    private Semaphore s;

    public Eater(Semaphore s) {
        this.s = s;
    }

    @Override
    public void run() {
        //占用一張桌子
        //桌子->信號
        try {
            //獲取一個信號
            s.acquire();
            System.out.println("來了一波客人,占用了一張桌子");
            //模擬用餐時間
            Thread.sleep((long) (Math.random()*10000));
            System.out.println("客人用餐完畢離開,空出來一張桌子");
            //一張桌子就空出來相當于一個信號被釋放
            s.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

六.Atomic操作-原子性操作

1.原子性操作實際上針對屬性提供了大量的線程安全的方法。在jdk1.8中,采用了CAS+volatile機制來保證屬性的線程安全。
2.volatile是Java中的關鍵字之一,是Java提供的一種輕量級的線程間的通信機制

  • 保證線程的可見性。當共享資源發(fā)生變化的時候,其他線程能夠立即感知到這種變化并且做出對應的操作,這個過程稱之為可見性
  • 不保證線程的原 子性。原子性指的是線程的執(zhí)行過程不可分割。換而言之,就是線程的執(zhí)行過程不會被打斷不會被搶占。加鎖實際上保證的線程的原子性。
  • 禁止指令重排。指令重排指的是預先定義的順序和指令的實際執(zhí)行順序執(zhí)行不一致。 指令重排可能發(fā)生在每一步(java-class-System-CPU),但是注意,每一步過程中,發(fā)生指令重排的概率不足百萬分之一。指令重排不能違背happen-before(先發(fā)生)原則-使用的變量必須先產(chǎn)生。在多線程的情況下,執(zhí)行完全相同的代碼可能會因為指令重排獲取到不同的結果,這種現(xiàn)象稱之為結果的二相性。
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容