java.util.concurrent----多線程基礎(chǔ)

創(chuàng)建線程的方法

創(chuàng)建線程的方式一共有四種,1.繼承Thread,2.實現(xiàn)Runnable接口,3.實現(xiàn)Callable接口,4.線程池
首先介紹前兩種方式:

public class TestThread {

    public static void main(String[] args) {
        ThreadDemo1 td1 = new ThreadDemo1();
        ThreadDemo2 td2 = new ThreadDemo2();
        td1.start();
        new Thread(td2).start();
    }
}

class ThreadDemo1 extends Thread {

    @Override
    public void run() {
        System.out.println("thread");
    }
}

class ThreadDemo2 implements Runnable {

    @Override
    public void run() {
        System.out.println("runnable");
    }
}

Thread本身就繼承了Runnable接口,所以自然也就重寫了run()方法,所以在使用繼承Thread和實現(xiàn)Runnable都需要重寫run方法。
我們平時不會去采用繼承Thread的方式(單繼承,多實現(xiàn))。

都會采用start方法來開啟線程,與run相比,start方法會創(chuàng)建一個線程,并把線程至于可運行狀態(tài)(CPU可以進(jìn)行調(diào)度)。然后主線程會跳過這段代碼繼續(xù)向下執(zhí)行。當(dāng)執(zhí)行到該線程的時候,會調(diào)用run()方法,執(zhí)行線程內(nèi)部的邏輯。如果直接使用run方法是不會開啟一條線程的,相當(dāng)于仍然是單線程在執(zhí)行。

線程安全問題

線程安全問題存在于多個線程對共享變量的訪問。線程安全主要有兩個問題,一個是原子性,一個是可見性。這兩個問題都可以加鎖的方式解決,但在某些場合,加鎖的代價很重,因此不太適合。下面會介紹不通過加鎖解決線程安全的方法。

  1. 可見性:使用volatile關(guān)鍵字。每一個線程都會有自己的一塊內(nèi)存,當(dāng)它們對共享數(shù)據(jù)進(jìn)行修改的時候,只會更改自己線程內(nèi)存中的數(shù)據(jù),刷新到主內(nèi)存中需要一定的時間,其它的線程是無法快速感知的,這樣的話就會造成數(shù)據(jù)錯誤。volatile關(guān)鍵字的作用是,使用該關(guān)鍵子修飾的共享變量在修改的時候,會將數(shù)據(jù)同步到主內(nèi)存,變量在讀取的時候不從自己線程的緩存中讀取,而是去主內(nèi)存中讀取,我們可以理解為使用了volatile后,修改和讀取都是在主內(nèi)存中進(jìn)行的,雖然會有一定的開銷,但對于加鎖操作來說,開銷小太多了。除此之外volatile還有禁止指令重排的功能(屏障)。
//一般用于開關(guān)使用
public class TestVolatile {

    public static void main(String[] args) {
        ThreadDemo td = new ThreadDemo();
        new Thread(td).start();

        while (true) {
            //可以加同步鎖保證可見性,但是太重了(通過Happen-before原則來實現(xiàn)可見性)
            if(td.isFlag()) {
                System.out.println("--------------------");
                break;
            }

        }
    }

}

class ThreadDemo implements Runnable {

    //去掉volatile以后,主方法sout不會執(zhí)行
    private volatile boolean flag = false;

    @Override
    public void run() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        flag = true;
        System.out.println("flag = " + isFlag());
    }

    public boolean isFlag() {
        return flag;
    }

}
  1. 原子性:一個經(jīng)典的例子i++,它是需要有兩步操作的,所以在多線程的環(huán)境下會有安全問題,volatile只能保證可見性,但是并不能保證原子性。
public class TestAtomic {

    public static void main(String[] args) {
        ThreadAtomicDemo td = new ThreadAtomicDemo();
        for(int i = 0; i < 10; i++) {
            new Thread(td).start();
        }
    }

}

class ThreadAtomicDemo implements Runnable {

    //無法保證原子性,有可能會導(dǎo)致有相同的值出現(xiàn)
    //private int num = 0;

    //使用juc中Atomic包下的類,自帶volatile,原子性通過CAS保證
    AtomicInteger num = new AtomicInteger(0);

    @Override
    public void run() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(getNum());
    }

    public int getNum() {
        //return num++;
        return num.getAndIncrement();
    }
}

每一個數(shù)據(jù)類型都對應(yīng)著其線程安全的數(shù)據(jù)類型,例如Integer對應(yīng)著AtomicInteger,其可見性通過volatile關(guān)鍵字來保證,原子性則是通過CAS來保證的。
CAS:硬件對于原子性的支持,舉個例子,假設(shè)一個線程要對共享數(shù)據(jù)i進(jìn)行加1的操作,首先它要去主內(nèi)存中去取i的值,發(fā)現(xiàn)i為2,接下來它對i進(jìn)行加1操作,同時再去主內(nèi)存中取i的值,如果此時i的值仍然為2,那么就把3寫回主內(nèi)存,如果它發(fā)現(xiàn)此時i的值不為2,那么從頭開始,重新嘗試對i進(jìn)行加1操作。

線程安全的集合類

我們會經(jīng)常使用集合,在多線程的情況又如何來保證安全問題呢?最簡單粗暴的方式就是對修改集合中數(shù)據(jù)的方法加同步鎖。第二種方式是使用Collections.synchronizedList(new ArrayList<>()),同樣也有set和map的方法,缺點是,在對集合中的數(shù)據(jù)進(jìn)行修改的時候會將整張表鎖住,所以在集合遍歷的時候,進(jìn)行修改的話,會有并發(fā)修改異常。接下來介紹一些常用的線程安全的集合類。

  1. ConcurrentHashMap:與HashTable不同的是,HashTable鎖住了整張表,而ConcurrentHashMap則是使用分段鎖(Segment)來表示不同的部分,每個段其實就是一個小的hashTable,它們都有自己的鎖(Lock),所以當(dāng)多個修改發(fā)生在不同段上的時候,就可以并發(fā)的進(jìn)行。當(dāng)然它也會有上述所說的異常。
  2. CopyOnWriteArrayList/CopyOnWriteArraySet:CopyOnWrite容器是寫的時候復(fù)制的容器,就是我們在往集合里面寫東西的時候,不是直接寫而是先copy這個容器,我們在copy的容器中添加元素,之后將指針指向新容器,因此可以并發(fā)的讀,不需要加鎖,十分適合讀多寫少的并發(fā)場景。當(dāng)然它的缺點一個是內(nèi)存占用的問題,一個是它只能保證最終一致性。

具體選擇用哪種線程安全的集合類看業(yè)務(wù)場景而定。

public class TestCopyOnWriteArrayList {

    public static void main(String[] args) {

        ThreadCopyOnWriteDemo td = new ThreadCopyOnWriteDemo();
        for(int i = 0; i < 10; i++) {
            new Thread(td).start();
        }

    }

}

class ThreadCopyOnWriteDemo implements Runnable {

    /**
     * 會有并發(fā)修改異常出現(xiàn)
     */
    private static List<String> list = Collections.synchronizedList(new ArrayList<>());
    /**
     * 底層通過每次修改集合都會復(fù)制出一個新的集合來保證線程安全
     * 適用于讀多寫少的多線程環(huán)境
     */
    //private static CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();

    static {
        list.add("AA");
        list.add("BB");
        list.add("CC");
    }

    @Override
    public void run() {

        Iterator<String> it = list.iterator();

        while (it.hasNext()) {
            System.out.println(it.next());
            list.add("AA");
        }

    }
}

線程之間的通訊

舉一個例子說,如果我希望一個線程要等其他的線程運行完之后再去運行。

  1. CountDownLatch:
  • 允許一個或者多個線程等待其他線程完成之后再執(zhí)行,比如人齊了一起吃飯。

  • CountDownLatch(int count): 構(gòu)造方法,初始化計數(shù)器。

  • await(): 是當(dāng)前線程在計數(shù)器為0之前一直等待,除非線程被中斷。

  • countDown(): 計數(shù)器減1。

  • getCount(): 返回當(dāng)前計數(shù)。

  • CountDownLatch內(nèi)部有一個線程數(shù)量計數(shù)器,當(dāng)一個(或多個)線程執(zhí)行await方法后等待,其他的線程完成任務(wù)后,計數(shù)器減1。如果此時計數(shù)器仍然大于0,那么等待的線程繼續(xù)等待。如果為0,表示其他線程任務(wù)執(zhí)行完成之后,等待的線程會被喚醒。

public class TestCountDown {

    public static void main(String[] args) throws InterruptedException {
        //指定計數(shù)器5,每當(dāng)有線程執(zhí)行完后就減1,當(dāng)減到0的時候,主線程才會執(zhí)行
        final CountDownLatch latch = new CountDownLatch(5);
        TestCountDownDemo td = new TestCountDownDemo(latch);
        long start = System.currentTimeMillis();
        for(int i = 0; i < 5; i++) {
            new Thread(td).start();
        }
        //主線程等待,只有l(wèi)atch的計數(shù)器到0的時候才會放行
        latch.await();
        /**
         * 當(dāng)其它線程全部執(zhí)行完畢后才會執(zhí)行
         * 用于總計算,那些需要其它線程執(zhí)行完結(jié)果再去執(zhí)行的場景
         */
        long end = System.currentTimeMillis();
        System.out.println("耗費時間為:" + (end - start));
    }

}

class TestCountDownDemo implements Runnable {

    private CountDownLatch latch;

    public TestCountDownDemo(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {

        synchronized (this) {
            try {
                for(int i = 0; i < 10; i++) {
                    if(i % 2 ==0) {
                        System.out.println(i);
                    }
                }
            } finally {
                //當(dāng)線程執(zhí)行完畢后減1
                latch.countDown();
            }
        }
    }
}
  1. 通過實現(xiàn)Callable接口來創(chuàng)建線程,實現(xiàn)Callable接口,可以將線程的結(jié)果進(jìn)行返回,需要FutureTask實現(xiàn)類的支持,用于接收結(jié)果。簡單來說就是線程能拋異常,能又返回值,返回值存在Future中。Callable實例當(dāng)作參數(shù),生成一個FutureTask的對象,然后把這個對象當(dāng)作一個Runnable,作為參數(shù)令起線程。
public class TestCallable {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadCallableDemo td = new ThreadCallableDemo();
        FutureTask<Integer> future = new FutureTask<>(td);
        new Thread(future).start();
        //接收線程運算后的結(jié)果
        Integer result = future.get();
        System.out.println(result);
        System.out.println("-----------------------");

    }

}

class ThreadCallableDemo implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {

        int sum = 0;
        for(int i = 0; i <= 100; i++) {
            sum += i;
        }

        return sum;
    }
}

Future的核心思想:一個方法f,計算過程可能非常耗時,等待f返回,顯然不明智??梢栽谡{(diào)用f的時候,立刻返回一個Future,可以通過Future這個數(shù)據(jù)結(jié)構(gòu)去控制方法f的計算過程。

  • get:獲取計算結(jié)果(如果還沒計算完,也是必須等待的)
  • cancel:還沒計算完,可以取消計算過程
  • isDone:判斷是否計算完
  • isCancelled:判斷計算是否別取消

實際上,F(xiàn)utureTask也是一個Runnable,其中的run方法的邏輯就是運行Callable的call方法,然后保存結(jié)果或者異常。

讀寫鎖

讀寫鎖希望在共享數(shù)據(jù)的訪問上,讀讀不互斥,讀寫互斥,寫寫互斥,而且希望讀必須是最新的數(shù)據(jù),因此需要加讀寫鎖。

public class TestReadAndWrite {

    public static void main(String[] args) {
        ReadAndWrite rw = new ReadAndWrite();

        for(int i = 0; i < 1000; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    rw.get();
                }
            }).start();
        }

        new Thread(new Runnable() {
            @Override
            public void run() {
                rw.save();
            }
        }, "write").start();
    }
}


class ReadAndWrite {

    private int number = 0;

    private ReadWriteLock lock = new ReentrantReadWriteLock();

    //讀
    public void get() {

        lock.readLock().lock();

        try {
            System.out.println(Thread.currentThread().getName() + " : " + number);
        } finally {
            lock.readLock().unlock();
        }

    }

    //寫
    public void save() {

        lock.writeLock().lock();

        try {
            //Thread.sleep(20);
            System.out.println(Thread.currentThread().getName());
            this.number = new Random().nextInt(100);

        } catch (Exception e) {

        } finally {
            lock.writeLock().unlock();
        }

    }
}

線程池

創(chuàng)建線程的第四種方式

為什么要使用線程池
  1. 減少過于頻繁的創(chuàng)建、銷毀線程,增加處理效率。
  2. 線程并發(fā)數(shù)量過多,搶占系統(tǒng)資源從而導(dǎo)致堵塞。
  3. 對線程進(jìn)行簡單的處理。
線程池中的參數(shù)
  • corePoolSize:線程池中核心線程的最大值。(線程池里面分為核心線程和非核心線程) PS:核心線程默認(rèn)會一直存活,即使這個核心線程啥事都不干。
  • maximumPoolSize:線程總數(shù)最大值。(線程總數(shù) = 核心線程 + 非核心線程)
  • keepAliveTime:非核心線程,閑置最長時長。
  • TimeUnit:keepAliveTime的單位。
  • BlockingQueue:線程池中的任務(wù)隊列,核心線程沒滿,新添加的線程直接進(jìn)核心線程,核心線程滿了,加入任務(wù)隊列,若隊列滿了,新建非核心線程,若超出之前設(shè)置的線程總數(shù)最大值,就會報錯。
Executors提供的線程池創(chuàng)建配置
  1. newCachedThreadPool(): 用于處理大量短時間工作任務(wù)的線程池。
  • 試圖緩存線程并重用,當(dāng)無緩存線程可用時,會創(chuàng)建新的線程。
  • 如果線程閑置時間超過60s,則被終止并移出緩存。
  • 內(nèi)部使用SynchronousQueue作為工作隊列。
(corePoolSize: 0, maximumPoolSize: Integer.MAX_VALUE, keepAliveTime: 60L, TimeUnit: SECONDS, BlockingQueue: SynchronousQueue)
  1. newFixThreadPool(int nThreads):
  • 重用指定數(shù)目的線程。(nThreads)
  • 使用無界的工作隊列。
  • 任務(wù)數(shù)量超過活動隊列的數(shù)目,將在工作隊列中等待空閑線程出現(xiàn)。如果有工作線程退出,將會有新的工作線程被創(chuàng)建,以補足指定數(shù)目的nThreads。
(corePoolSize: nThreads, maximumPoolSize: nThreads, keepAliveTime: 0L, TimeUnit: SECONDS, BlockingQueue: LinkedBlockingQueue)
  1. newSingleThreadExecutor(): 創(chuàng)建的是ScheduledExecutorService,也就是可以進(jìn)行定時或周期性的工作調(diào)度。
  • 工作線程數(shù)目限制為1,保證所有任務(wù)都是被順序執(zhí)行。
  • 最多會有一個任務(wù)處于活動狀態(tài)。
  • 不允許使用者更改線程池實例,可以避免其改變線程數(shù)目。
(corePoolSize: 1, maximumPoolSize: 1, keepAliveTime: 0L, TimeUnit: SECONDS, BlockingQueue: LinkedBlockingQueue)
  1. newScheduledThreadPool(int corePoolSize): 同樣是ScheduledExecutorService
  • 會保持corePoolSize個工作線程。
  • 可以設(shè)置延遲多少秒執(zhí)行。
(corePoolSize: corePoolSize, maximumPoolSize: Integer.MAX_VALUE, BlockingQueue: DelayedQueue)
public class TestThreadPool {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        SumRunnable sr = new SumRunnable();
        SumCallable sc = new SumCallable();

        /*for(int i = 0; i < 100; i++) {
            executorService.submit(sr);
        }*/

        List<Future<Integer>> list = new ArrayList<>();
        //List<Integer> list1 = new ArrayList<>();
        for(int i = 0; i < 100; i++) {
            Future<Integer> future = executorService.submit(sc);
            list.add(future);
            //list1.add(future.get());
        }
        executorService.shutdown();

        for(Future<Integer> future: list) {
            System.out.println(future.get());
        }

        /*for(Integer i: list1) {
            System.out.println(i);
        }*/
    }



}

class SumRunnable implements Runnable {

    private int number = 0;

    @Override
    public void run() {

        System.out.println(Thread.currentThread().getName());
        for(int i = 0; i <= 100; i++) {
            number += i;
        }

    }
}

class SumCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {

        int number = 0;
        Thread.sleep(100);
        for(int i = 0; i <= 100; i++) {
            number += i;
        }
        return number;
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容