JUC并發(fā)集合總結(jié)

ConcurrentLinkedQueue

線程安全的支持高并發(fā)的隊(duì)列,使用鏈表實(shí)現(xiàn)。非阻塞,無鎖,無界。該隊(duì)列也不允許空元素,而且size方法并不是常量,其需要遍歷鏈表,此時(shí)并發(fā)修改鏈表會(huì)造成統(tǒng)計(jì)size不正確。同樣,bulk操作和equal以及toArray方法不保證原子性。

代碼實(shí)現(xiàn):

public class ConcurrentLinkedQueueTest {

    public static void main(String[] args) {

        final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

        // 往隊(duì)列中執(zhí)行添加的任務(wù)
        Runnable offerTask = () -> {
            String threadName = Thread.currentThread().getName();
            for (int i = 0; i < 10000; i++) {
                queue.offer(threadName + i);
            }
        };

        // 往隊(duì)列中執(zhí)行移除的任務(wù)
        Runnable pollTask = () -> {
            for (int i = 0; i < 10000; i++) {
                queue.poll();
            }
        };

        Thread[] threads = new Thread[100];

        // 100個(gè)offerTask線程
        for (int i = 0; i < 100; i++) {
            threads[i] = new Thread(offerTask);
            threads[i].start();
        }

        // 主線程等待生產(chǎn)線程執(zhí)行完畢
        for (int i = 0; i < 100; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println("當(dāng)前隊(duì)列size:" + queue.size());
        Assert.assertEquals(10000 * 100, queue.size());

        // 100個(gè)pollTask線程
        for (int i = 0; i < 100; i++) {
            threads[i] = new Thread(pollTask);
            threads[i].start();
        }

        // 主線程等待消費(fèi)線程執(zhí)行完畢
        for (int i = 0; i < 100; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println("當(dāng)前隊(duì)列size:" + queue.size());
        Assert.assertEquals(0, queue.size());
    }
}

ConcurrentLinkedDeque

并發(fā)隊(duì)列ConcurrentLinkedDeque,這是一個(gè)非阻塞,無鎖,無界,線程安全雙端操作的隊(duì)列。簡(jiǎn)單說就是ConcurrentLinkedQueue的升級(jí)版,在JDK7之后才提供。該隊(duì)列也不允許空元素,而且size方法并不是常量,其需要遍歷鏈表,此時(shí)并發(fā)修改鏈表會(huì)造成統(tǒng)計(jì)size不正確。同樣,bulk操作和equal以及toArray方法不保證原子性。

主要API介紹

  1. 不拋異常,會(huì)移除
    pollFirst()pollLast():返回并移除隊(duì)列中第一個(gè)元素和最后一個(gè)元素,如果隊(duì)列為空,返回null
  2. 不拋異常,不會(huì)移除
    peek()、peekFirst()peekLast():這些方法將分別返回列表的第一個(gè)和最后一個(gè)元素。它們不會(huì)從列表刪除返回的元素。如果列表為空,這些方法將返回null值。
  3. 拋異常,會(huì)移除
    remove()、removeFirst()removeLast():這些方法將分別返回列表的第一個(gè)和最后一個(gè)元素。它們將從列表刪除返回的元素。如果列表為空,這些方法將拋出NoSuchElementExcpetion異常。
  4. 拋異常,不會(huì)移除
    getFirst()getLast():這些方法將分別返回列表的第一個(gè)和最后一個(gè)元素。它們不會(huì)從列表刪除返回的元素。如果列表為空,這些方法將拋出NoSuchElementExcpetion異常。

代碼實(shí)現(xiàn)

public class AddTask implements Runnable {

    private ConcurrentLinkedDeque<String> deque;

    public AddTask(ConcurrentLinkedDeque<String> deque) {
        this.deque = deque;
    }

    // 在列表中存儲(chǔ)10000個(gè)正在執(zhí)行任務(wù)的線程的名稱和一個(gè)數(shù)字的字符串。
    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        for(int i = 0; i < 10000; i++) {
            deque.add(name + ": Element " + i);
        }
    }
}
public class PollTask implements Runnable {
    private ConcurrentLinkedDeque<String> deque;

    public PollTask(ConcurrentLinkedDeque<String> deque) {
        this.deque = deque;
    }

    // 從列表中取出10000個(gè)元素(在一個(gè)循環(huán)5000次的循環(huán)中,每次取出2個(gè)元素)。
    @Override
    public void run() {
        for(int i = 0; i < 5000; i++) {
            // 返回并移除隊(duì)列中第一個(gè)元素,如果隊(duì)列為空,返回null
            deque.pollFirst();
            // 返回并隊(duì)列中最后一個(gè)元素,如果隊(duì)列為空,返回null
            deque.pollLast();
        }
    }
}
public class Main {

    public static void main(String[] args) {

        final ConcurrentLinkedDeque<String> deque = new ConcurrentLinkedDeque<>();
        Thread[] threads = new Thread[100];

        // 創(chuàng)建100個(gè)AddTask,并啟動(dòng)線程,每一個(gè)task都會(huì)在list中加入10000個(gè)元素,即最終應(yīng)該是1000000個(gè)元素
        for(int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(new AddTask(deque));
            threads[i].start();
        }
        System.out.printf("Main: %d AddTask threads have been launched\n", threads.length);

        // 讓Main線程等待所有AddTask線程執(zhí)行完畢后才能繼續(xù)執(zhí)行
        for(int i = 0; i < threads.length; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.printf("Main: Size of the List: %d\n", deque.size());

        // 創(chuàng)建100個(gè)PollTask,并啟動(dòng)線程,每一個(gè)task會(huì)從list中取出10000個(gè)元素,即list最終應(yīng)剩余0個(gè)元素
        for(int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(new PollTask(deque));
            threads[i].start();
        }
        System.out.printf("Main: %d PollTask threads have been launched\n", threads.length);

        // 讓Main線程等待所有PollTask線程執(zhí)行完畢后才能繼續(xù)執(zhí)行
        for(int i = 0; i < threads.length; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.printf("Main: Size of the List: %d\n", deque.size());
    }
}

DelayQueue

此隊(duì)列中的元素必須實(shí)現(xiàn)Delayed接口,其實(shí)也是一個(gè)阻塞隊(duì)列。

代碼實(shí)現(xiàn)

// 事件類
public class Event implements Delayed {
    // 到期時(shí)間
    private Date startDate;

    public Event(Date startDate) {
        this.startDate = startDate;
    }

    // 此方法返回此延遲對(duì)象剩余多少時(shí)間到期,根據(jù)給定的TmeUnit表示
    @Override
    public long getDelay(TimeUnit unit) {
        Date now = new Date();
        long diff = startDate.getTime() - now.getTime();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    // 可以理解為,延遲隊(duì)列根據(jù)此方法的返回值,排列延遲對(duì)象在延遲隊(duì)列中的位置
    // 這里表達(dá)的意思是: 距離到期時(shí)間越遠(yuǎn)的延遲對(duì)象,放到延遲隊(duì)列的越后面
    @Override
    public int compareTo(Delayed o) {
        long result = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
        if (result < 0) {
            return -1;
        } else if (result > 0) {
            return 1;
        } else {
            return 0;
        }
    }
}
// 任務(wù)類
public class Task implements Runnable {

    private int id;
    private DelayQueue<Event> queue;

    public Task(int id, DelayQueue<Event> queue) {
        this.id = id;
        this.queue = queue;
    }

    @Override
    public void run() {
        Date now = new Date();
        Date delay = new Date();
        delay.setTime(now.getTime() + (id * 4000));
        System.out.printf("Thread %s: %s\n", id, delay);

        for (int i = 0; i < 100; i++) {
            Event event = new Event(delay);
            queue.add(event);
        }
    }
}
// 主類
/*
    DelayedQueue類是Java API提供的一種有趣的數(shù)據(jù)結(jié)構(gòu),并且你可以用在并發(fā)應(yīng)用程序中。
    在這個(gè)類中,你可以存儲(chǔ)帶有激活日期的元素。方法返回或抽取隊(duì)列的元素將忽略未到期的數(shù)據(jù)元素。它們對(duì)這些方法來說是看不見的。
    為了獲取這種行為,你想要存儲(chǔ)到DelayedQueue類中的元素必須實(shí)現(xiàn)Delayed接口。這個(gè)接口允許你處理延遲對(duì)象,
    所以你將實(shí)現(xiàn)存儲(chǔ)在DelayedQueue對(duì)象的激活日期,這個(gè)激活時(shí)期將作為對(duì)象的剩余時(shí)間,直到激活日期到來。
    這個(gè)接口強(qiáng)制實(shí)現(xiàn)以下兩種方法:
    1.compareTo(Delayed o):Delayed接口繼承Comparable接口。
        如果執(zhí)行這個(gè)方法的對(duì)象的延期小于作為參數(shù)傳入的對(duì)象時(shí),該方法返回一個(gè)小于0的值。
        如果執(zhí)行這個(gè)方法的對(duì)象的延期大于作為參數(shù)傳入的對(duì)象時(shí),該方法返回一個(gè)大于0的值。
        如果這兩個(gè)對(duì)象有相同的延期,該方法返回0。
    2.getDelay(TimeUnit unit):該方法返回與此對(duì)象相關(guān)的剩余延遲時(shí)間,以給定的時(shí)間單位表示。
*/
public class Main {

    public static void main(String[] args) throws InterruptedException {

        // 一個(gè)共享延遲隊(duì)列
        DelayQueue<Event> queue = new DelayQueue<>();
        Thread threads[] = new Thread[5];

        // 5個(gè)線程
        for (int i = 0; i < threads.length; i++) {
            Task task = new Task(i + 1, queue);
            threads[i] = new Thread(task);
        }

        // 啟動(dòng)這5個(gè)線程,每個(gè)線程往延遲隊(duì)列中放入100個(gè)Event對(duì)象,并指定Event對(duì)象的StartDate
        for (int i = 0; i < threads.length; i++) {
            threads[i].start();
        }

        // 等待線程執(zhí)行完畢
        for (int i = 0; i < threads.length; i++) {
            threads[i].join();
        }
        System.out.println(queue.size());
        // 當(dāng)延遲隊(duì)列中有數(shù)據(jù)時(shí),每500毫秒從延遲隊(duì)列中
        do {
            int counter = 0;
            Event event;
            do {
                // 只有當(dāng)延遲對(duì)象到期后才能從queue中取到
                // 注意: 這里如果使用take方法獲取,如果獲取不到會(huì)阻塞.而poll方法獲取不到會(huì)返回空
                event = queue.poll();
                if (event != null) counter++;
            } while (event != null);
            System.out.printf("At %s you have read %d events\n", new Date(), counter);
            TimeUnit.MILLISECONDS.sleep(500);
        } while (queue.size() > 0);
    }
}

ArrayBlockingQueue

有界阻塞隊(duì)列,數(shù)組實(shí)現(xiàn)。

代碼實(shí)現(xiàn)

public class ArrayBlockingQueueTest {

    public static void main(String[] args) {

        // 用于記錄生產(chǎn)總量
        AtomicInteger putCount = new AtomicInteger(0);
        // 用于記錄消費(fèi)總量
        AtomicInteger takeCount = new AtomicInteger(0);

        final ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

        // putTask負(fù)責(zé)生產(chǎn)數(shù)據(jù)放入queue
        Runnable putTask = () -> {
            String threadName = Thread.currentThread().getName();
            for(int i = 0; i < 10000; i++) {
                try {
                    // 向queue中放入數(shù)據(jù)。當(dāng)queue中的元素?cái)?shù)量已滿,則阻塞等待,等到有人消費(fèi)掉,再向queue中放入數(shù)據(jù)
                    queue.put(threadName + i);
                    putCount.incrementAndGet();
                    System.out.println(threadName + ":" + queue.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        // takeTask負(fù)責(zé)從queue消費(fèi)數(shù)據(jù)
        Runnable takeTask = () -> {
            String threadName = Thread.currentThread().getName();
            for(int i = 0; i < 10000; i++) {
                try {
                    // 從queue中取出數(shù)據(jù)。當(dāng)queue中沒有元素時(shí),則阻塞等待,等到有人生產(chǎn)出數(shù)據(jù)放入queue時(shí)再進(jìn)行消費(fèi)
                    queue.take();
                    takeCount.incrementAndGet();
                    System.out.println(threadName + ":" + queue.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        Thread[] putThreads = new Thread[10];
        Thread[] takeThreads = new Thread[10];

        // 啟動(dòng)10個(gè)生產(chǎn)者線程和10個(gè)消費(fèi)者線程
        for(int i = 0; i < 10; i++) {
            Thread putThread = new Thread(putTask, "producer" + i);
            putThreads[i] = putThread;
            putThread.start();
            Thread takeThread = new Thread(takeTask, "consumer" + i);
            takeThreads[i] = takeThread;
            takeThread.start();
        }

        // 等待生產(chǎn)者線程和消費(fèi)者線程執(zhí)行完畢
        for(int i = 0; i < 10; i++) {
            try {
                putThreads[i].join();
                takeThreads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // 輸出生產(chǎn)總量和消費(fèi)總量
        System.out.println(putCount.get());
        System.out.println(takeCount.get());
    }
}

LinkedBlockingQueue

無界阻塞隊(duì)列(也可以指定有界),鏈表實(shí)現(xiàn)。和LinkedBlockingDeque差不多,一個(gè)單向、一個(gè)雙向。

LinkedBlockingDeque

無界阻塞隊(duì)列(也可以指定有界),鏈表實(shí)現(xiàn)。和LinkedBlockingQueue差不多,一個(gè)單向、一個(gè)雙向。

主要API介紹

  1. 獲?。韩@取到并移除/獲取不到就阻塞
    takeFirst()takeLast():這些方法分別返回隊(duì)列的第一個(gè)和最后一個(gè)元素。它們從隊(duì)列刪除返回的元素。如果隊(duì)列為空,這些方法將阻塞線程,直到隊(duì)列有元素。

  2. 獲?。韩@取到并移除/獲取不到返回null
    poll()、pollFirst()pollLast():這些方法分別返回隊(duì)列的第一個(gè)和最后一個(gè)元素。它們從隊(duì)列刪除返回的元素。如果隊(duì)列為空,這些方法將返回null值。

  3. 獲?。韩@取到但不移除/獲取不到拋異常
    getFirst()getLast():這些方法分別返回隊(duì)列的第一個(gè)和最后一個(gè)元素。它們不會(huì)從隊(duì)列刪除返回的元素。如果隊(duì)列為空,這些方法將拋出NoSuchElementExcpetion異常。

  4. 獲?。韩@取到但不移除/獲取不到返回null
    peek()、peekFirst(),和peekLast():這些方法分別返回隊(duì)列的第一個(gè)和最后一個(gè)元素。它們不會(huì)從隊(duì)列刪除返回的元素。如果隊(duì)列為空,這些方法將返回null值。

  5. 添加:超過容量的添加會(huì)拋出異常
    add()、 addFirst()、 addLast():這些方法分別在第一個(gè)位置和最后一個(gè)位置上添加元素(add是在最后添加)。如果隊(duì)列已滿(你已使用固定大小創(chuàng)建它),這些方法將拋出IllegalStateException異常。

代碼實(shí)現(xiàn)

public class Client implements Runnable {

    private LinkedBlockingDeque<String> deque;

    public Client(LinkedBlockingDeque<String> deque) {
        this.deque = deque;
    }

    @Override
    public void run() {
        // 每2秒往隊(duì)列中放入5個(gè)字符串,重復(fù)3次
        for (int i = 0; i < 3; i++) {
            for (int j = 0; j < 5; j++) {
                StringBuilder request = new StringBuilder();
                request.append(i);
                request.append(":");
                request.append(j);
                try {
                    deque.put(request.toString());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.printf("Client: %s at %s.\n", request, new Date());
            }
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.printf("Client: End.\n");
    }
}
public class Main {

    /*
        Client類使用put()方法添加字符串到隊(duì)列中。
        如果隊(duì)列已滿(因?yàn)槟阋咽褂霉潭ù笮韯?chuàng)建它),這個(gè)方法阻塞線程的執(zhí)行,直到隊(duì)列有可用空間。
        
        Main類使用take()方法從隊(duì)列中獲取字符串,
        如果隊(duì)列為空,這個(gè)方法將阻塞線程的執(zhí)行,直到隊(duì)列中有元素。

        在這個(gè)例子中,使用LinkedBlockingDeque類的這兩個(gè)方法,
        如果它們?cè)谧枞麜r(shí)被中斷,將拋出InterruptedException異常。
        所以,你必須包含必要的代碼來捕捉這個(gè)異常。
    */
    public static void main(String[] args) throws InterruptedException {

        // 創(chuàng)建阻塞雙端隊(duì)列
        LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>(3);
        // Client每2秒會(huì)往這個(gè)雙端隊(duì)列中放入5個(gè)字符串,重復(fù)3次
        Thread thread = new Thread(new Client(deque));
        // 啟動(dòng)線程
        thread.start();

        // 每300毫秒從隊(duì)列中取走3個(gè)元素,重復(fù)5次
        for (int i = 0; i < 5; i++) {
            for (int j = 0; j < 3; j++) {
                String request = deque.take();
                System.out.printf("Main: Request: %s at %s. Size:%d\n", request, new Date(), deque.size());
            }
            TimeUnit.MILLISECONDS.sleep(300);
        }

        System.out.printf("Main: End of the program.\n");
    }
}

PriorityBlockingQueue

優(yōu)先級(jí)隊(duì)列,所有存儲(chǔ)在PriorityBlockingQueue的元素必須實(shí)現(xiàn)Comparable接口。在使用add方法加入元素時(shí),并不會(huì)進(jìn)行排序,而是在第一次調(diào)用take或poll等方法時(shí)才會(huì)將隊(duì)列元素排序(根據(jù)compareTo方法作升序排列),然后取出優(yōu)先級(jí)最高的元素。

代碼實(shí)現(xiàn)

public class Event implements Comparable<Event> {

    // 用來存儲(chǔ)已創(chuàng)建事件的線程數(shù)。
    private int thread;

    // 用來存儲(chǔ)事件的優(yōu)先級(jí)。
    private int priority;

    public Event(int thread, int priority) {
        this.thread = thread;
        this.priority = priority;
    }

    public int getThread() {
        return thread;
    }

    public int getPriority() {
        return priority;
    }

    // 實(shí)現(xiàn)compareTo()方法。它接收Event作為參數(shù),并且比較當(dāng)前事件與參數(shù)的優(yōu)先級(jí)。
    // 如果當(dāng)前事件的優(yōu)先級(jí)更大,則返回-1,如果這兩個(gè)優(yōu)先級(jí)相等,則返回0,如果當(dāng)前事件的優(yōu)先級(jí)更小,則返回1。
    // 注意,這與大多數(shù)Comparator.compareTo()的實(shí)現(xiàn)是相反的。
    // 即當(dāng)前對(duì)象的優(yōu)先級(jí)越高,越應(yīng)該排在隊(duì)列的最前方,可以理解為隊(duì)列中的排序是根據(jù)此方法作升序排列的
    @Override
    public int compareTo(Event e) {
        if (this.priority > e.getPriority()) {
            return -1;
        } else if (this.priority < e.getPriority()) {
            return 1;
        } else {
            return 0;
        }
    }
}

SynchronousQueue

同步阻塞隊(duì)列,直接隊(duì)列中add會(huì)拋異常,除非此時(shí)有一個(gè)線程正調(diào)用take向此隊(duì)列索取元素,那么隊(duì)列會(huì)將add進(jìn)來的元素直接交給正在take的線程。

代碼實(shí)現(xiàn)

public class SynchronousQueueTest {

    public static void main(String[] args) {

        // 可以理解為這個(gè)阻塞隊(duì)列的大小為0,生產(chǎn)者想要往這個(gè)隊(duì)列中放數(shù)據(jù)是放不進(jìn)的,
        // 除非此時(shí)有線程在從此隊(duì)列中取數(shù)據(jù),那么此隊(duì)列會(huì)將生產(chǎn)者會(huì)生產(chǎn)出的數(shù)據(jù)直接交給消費(fèi)者

        // 當(dāng)沒有消費(fèi)者取數(shù)據(jù)時(shí),各類添加操作的行為:
        //     put      阻塞
        //     offer    返回false
        //     add      拋出異常

        final SynchronousQueue<Integer> queue = new SynchronousQueue<>();

        // 生產(chǎn)者任務(wù)
        new Thread(() -> {
            String threadName = Thread.currentThread().getName();
            for (int i = 0; i < 100; i++) {
                try {
                    queue.put(i);
                    // 永遠(yuǎn)為0
                    System.out.println(threadName + ":" + queue.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // 消費(fèi)者任務(wù)
        new Thread(() -> {
            String threadName = Thread.currentThread().getName();
            for (int i = 0; i < 100; i++) {
                try {
                    queue.take();
                    // 永遠(yuǎn)為0
                    System.out.println(threadName + ":" + queue.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

LinkedTransferQueue

TransferQueue也具有SynchronousQueue的所有功能,但是TransferQueue的功能更強(qiáng)大。

主要API介紹:

  1. transfer(E e)
    若當(dāng)前存在一個(gè)正在等待獲取的消費(fèi)者線程,即立刻將e移交之;否則將元素e插入到隊(duì)列尾部,并且當(dāng)前線程進(jìn)入阻塞狀態(tài),直到有消費(fèi)者線程取走該元素。
  2. tryTransfer(E e)
    若當(dāng)前存在一個(gè)正在等待獲取的消費(fèi)者線程,則該方法會(huì)即刻轉(zhuǎn)移e,并返回true;若不存在則返回false,但是并不會(huì)將e插入到隊(duì)列中。這個(gè)方法不會(huì)阻塞當(dāng)前線程,要么快速返回true,要么快速返回false。
  3. hasWaitingConsumer()getWaitingConsumerCount()
    用來判斷當(dāng)前正在等待消費(fèi)的消費(fèi)者線程個(gè)數(shù)。
  4. tryTransfer(E e, long timeout, TimeUnit unit)
    若當(dāng)前存在一個(gè)正在等待獲取的消費(fèi)者線程,會(huì)立即傳輸給它; 否則將元素e插入到隊(duì)列尾部,并且等待被消費(fèi)者線程獲取消費(fèi)掉。若在指定的時(shí)間內(nèi)元素e無法被消費(fèi)者線程獲取,則返回false,同時(shí)該元素從隊(duì)列中移除。

代碼實(shí)現(xiàn)

public class TransferQueueTest1 {

    public static void main(String[] args) throws InterruptedException {
        final TransferQueue<String> queue = new LinkedTransferQueue<>();

        // 1
        new Thread(() -> {
            try {
                // 向空隊(duì)列中獲取數(shù)據(jù),這里會(huì)阻塞
                System.out.println(Thread.currentThread().getName() + " : queue.take() = " + queue.take());
                System.out.println(Thread.currentThread().getName() + " : queue.size() = " + queue.size());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        TimeUnit.SECONDS.sleep(1);

        // 2
        new Thread(() -> {
            try {
                // transfer(E e):
                // 若當(dāng)前存在一個(gè)正在等待獲取的消費(fèi)者線程,即立刻將元素移交給消費(fèi)者線程,
                // 否則將元素插入到隊(duì)列尾部,并且當(dāng)前線程進(jìn)入阻塞狀態(tài),直到有消費(fèi)者線程取走該元素

                // A元素直接移交成功,因?yàn)樵诖酥坝幸粋€(gè)消費(fèi)者線程正在等待獲取
                queue.transfer("A");

                // B元素會(huì)移交不成功,所以會(huì)添加到隊(duì)列尾部
                queue.transfer("B");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        TimeUnit.SECONDS.sleep(3);

        // 這里會(huì)獲取到B元素,B元素在之前無法直接移交,會(huì)放到隊(duì)列中,這里使用take方法直接獲取到
        System.out.println(Thread.currentThread().getName() + " : queue.take() = " + queue.take());

        // 此時(shí)隊(duì)列容量為0,隊(duì)列中所有元素均被取走
        System.out.println(Thread.currentThread().getName() + " : queue.size() = " + queue.size());
    }
}
public class TransferQueueTest2 {

    public static void main(String[] args) throws InterruptedException {
        final TransferQueue<String> queue = new LinkedTransferQueue<>();

        // 1
        new Thread(() -> {
            try {
                // 向空隊(duì)列中獲取數(shù)據(jù),這里會(huì)阻塞
                System.out.println(Thread.currentThread().getName()
                        + " : queue.take() = " + queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        TimeUnit.SECONDS.sleep(1);

        // 2
        new Thread(() -> {
            // tryTransfer(E e):
            // 若當(dāng)前存在一個(gè)正在等待獲取的消費(fèi)者線程,則該方法會(huì)即刻轉(zhuǎn)移e,并返回true;
            // 若不存在則返回false,但是并不會(huì)將e插入到隊(duì)列中。
            // 這個(gè)方法不會(huì)阻塞當(dāng)前線程,要么快速返回true,要么快速返回false。

            // A元素直接移交成功,因?yàn)樵诖酥坝幸粋€(gè)消費(fèi)者線程正在等待獲取
            System.out.println(Thread.currentThread().getName() +
                    " : queue.tryTransfer(\"A\") = " + queue.tryTransfer("A"));

            // B元素會(huì)移交不成功,直接返回false,也不會(huì)將B放入隊(duì)列中
            System.out.println(Thread.currentThread().getName() +
                    " : queue.tryTransfer(\"B\") = " + queue.tryTransfer("B"));
        }).start();

        TimeUnit.SECONDS.sleep(1);

        // 此時(shí)隊(duì)列大小為0,因?yàn)閠ryTransfer不會(huì)將未成功移交的元素放入隊(duì)列
        System.out.println(Thread.currentThread().getName() + " : queue.size() = " + queue.size());
    }
}

ConcurrentSkipListMap

ConcurrentSkipListMap是ConcurrentNavigableMap的實(shí)現(xiàn)類,在內(nèi)部實(shí)現(xiàn)中,它使用Skip List來存儲(chǔ)數(shù)據(jù)。Skip List是基于并行列表的數(shù)據(jù)結(jié)構(gòu),它允許我們獲取類似二叉樹的效率。使用它,你可以得到一個(gè)排序的數(shù)據(jù)結(jié)構(gòu),這比排序數(shù)列使用更短的訪問時(shí)間來插入、搜索和刪除元素。

主要API介紹

  1. headMap(K toKey):K是參數(shù)化ConcurrentSkipListMap對(duì)象的Key值的類。返回此映射的部分視圖,其鍵值小于 toKey。
  2. tailMap(K fromKey):K是參數(shù)化ConcurrentSkipListMap對(duì)象的Key值的類。返回此映射的部分視圖,其鍵大于等于 fromKey。
  3. putIfAbsent(K key, V Value):如果key不存在map中,則這個(gè)方法插入指定的key和value。
  4. pollLastEntry():這個(gè)方法返回并刪除map中最后一個(gè)元素的Map.Entry對(duì)象。
  5. replace(K key, V Value):如果這個(gè)key存在map中,則這個(gè)方法將指定key的value替換成新的value。

代碼實(shí)現(xiàn)

public class Main {
    public static void main(String[] args) throws InterruptedException {

        ConcurrentSkipListMap<String, Contact> map = new ConcurrentSkipListMap<>();

        Thread threads[] = new Thread[25];
        int counter = 0;

        // 創(chuàng)建并啟動(dòng)25個(gè)線程,每個(gè)線程的任務(wù)就是往map中放入1000條數(shù)據(jù)
        // key:A1001  value:new Contact(A, 1001)
        for (char i = 'A'; i < 'Z'; i++) {
            // Task (ConcurrentSkipListMap<String, Contact> map, String id)
            Task task = new Task(map, String.valueOf(i));
            threads[counter] = new Thread(task);
            threads[counter].start();
            counter++;
        }

        // 等待線程創(chuàng)建完畢
        for (int i = 0; i < 25; i++) {
            threads[i].join();
        }

        // 輸出map當(dāng)前容量,即25000
        System.out.printf("Main: Size of the map: %d\n", map.size());
        Map.Entry<String, Contact> element;
        Contact contact;

        // 獲取第一個(gè)Entry
        element = map.firstEntry();
        contact = element.getValue();
        System.out.printf("Main: First Entry: %s: %s\n", contact.getName(), contact.getPhone());

        // 獲取最后一個(gè)Entry
        element = map.lastEntry();
        contact = element.getValue();
        System.out.printf("Main: Last Entry: %s: %s\n", contact.getName(), contact.getPhone());

        // 獲取key為A1996-B1002之間的數(shù)據(jù)作為子map,實(shí)際上取出的是A1996-B1001,即包左不包右的原則
        System.out.printf("Main: Submap from A1996 to B1002: \n");
        ConcurrentNavigableMap<String, Contact> submap = map.subMap("A1996", "B1002");

        do {
            // 獲取并移除第一個(gè)Entry
            element = submap.pollFirstEntry();
            if (element != null) {
                contact = element.getValue();
                System.out.printf("%s: %s\n", contact.getName(), contact.getPhone());
            }
        } while (element != null);

        // 輸出map當(dāng)前容量,即24994
        System.out.printf("Main: Size of the map: %d\n", map.size());
    }

}

ConcurrentHashMap

以下內(nèi)容的原文地址:ConcurrentHashMap總結(jié)
并發(fā)編程實(shí)踐中,ConcurrentHashMap是一個(gè)經(jīng)常被使用的數(shù)據(jù)結(jié)構(gòu),相比于Hashtable以及Collections.synchronizedMap(),ConcurrentHashMap在線程安全的基礎(chǔ)上提供了更好的寫并發(fā)能力,但同時(shí)降低了對(duì)讀一致性的要求。ConcurrentHashMap的設(shè)計(jì)與實(shí)現(xiàn)非常精巧,大量的利用了volatile,final,CAS等lock-free技術(shù)來減少鎖競(jìng)爭(zhēng)對(duì)于性能的影響。ConcurrentHashMap在JDK6,7,8中實(shí)現(xiàn)都不同。

JDK6與JDK7中的實(shí)現(xiàn)

ConcurrentHashMap采用了分段鎖的設(shè)計(jì),只有在同一個(gè)分段內(nèi)才存在競(jìng)態(tài)關(guān)系,不同的分段鎖之間沒有鎖競(jìng)爭(zhēng)。相比于對(duì)整個(gè)Map加鎖的設(shè)計(jì),分段鎖大大的提高了高并發(fā)環(huán)境下的處理能力。但同時(shí),由于不是對(duì)整個(gè)Map加鎖,導(dǎo)致一些需要掃描整個(gè)Map的方法(如size(), containsValue())需要使用特殊的實(shí)現(xiàn),另外一些方法(如clear())甚至放棄了對(duì)一致性的要求(ConcurrentHashMap是弱一致性的,具體請(qǐng)查看ConcurrentHashMap能完全替代HashTable嗎?)。

JDK8中的實(shí)現(xiàn)

ConcurrentHashMap在JDK8中進(jìn)行了巨大改動(dòng),很需要通過源碼來再次學(xué)習(xí)下Doug Lea的實(shí)現(xiàn)方法。
它摒棄了Segment(鎖段)的概念,而是啟用了一種全新的方式實(shí)現(xiàn),利用CAS算法。它沿用了與它同時(shí)期的HashMap版本的思想,底層依然由“數(shù)組”+鏈表+紅黑樹的方式思想(JDK7與JDK8中HashMap的實(shí)現(xiàn)),但是為了做到并發(fā),又增加了很多輔助的類,例如TreeBin,Traverser等對(duì)象內(nèi)部類。

代碼實(shí)現(xiàn)

public class ConcurrentHashMapTest {

    public static long execute(Map<String, String> map) {
        Random random = new Random();
        Thread[] threads = new Thread[100];

        long start = System.currentTimeMillis();

        for (int i = 0; i < threads.length; i++) {
            final int x = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    map.put(String.valueOf(random.nextInt(100000)), x + "" + j);
                }
            });
        }

        for (int i = 0; i < 100; i++) {
            threads[i].start();
        }

        for (int i = 0; i < 100; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        return System.currentTimeMillis() - start;
    }

    public static void main(String[] args) {

        Map<String, String> hashtable = new Hashtable<>();
        Map<String, String> concurrentHashMap = new ConcurrentHashMap<>();
        Map<String, String> concurrentSkipListMap = new ConcurrentSkipListMap<>();

        long total = 0;
        for (int i = 0; i < 10; i++) {
            total += execute(hashtable);
        }
        System.out.println("hashtable: " + total); // 3942

        total = 0;
        for (int i = 0; i < 10; i++) {
            total += execute(concurrentHashMap);
        }
        System.out.println("concurrentHashMap: " + total); // 1258

        total = 0;
        for (int i = 0; i < 10; i++) {
            total += execute(concurrentSkipListMap);
        }
        System.out.println("concurrentSkipListMap: " + total); // 2525
    }
}

CopyOnWrite

以下內(nèi)容的原文地址:聊聊并發(fā)-Java中的Copy-On-Write容器

Copy-On-Write簡(jiǎn)稱COW,是一種用于程序設(shè)計(jì)中的優(yōu)化策略。其基本思路是,從一開始大家都在共享同一個(gè)內(nèi)容,當(dāng)某個(gè)人想要修改這個(gè)內(nèi)容的時(shí)候,才會(huì)真正把內(nèi)容Copy出去形成一個(gè)新的內(nèi)容然后再改,這是一種延時(shí)懶惰策略。從JDK1.5開始Java并發(fā)包里提供了兩個(gè)使用CopyOnWrite機(jī)制實(shí)現(xiàn)的并發(fā)容器,它們是CopyOnWriteArrayList和CopyOnWriteArraySet。CopyOnWrite容器非常有用,可以在非常多的并發(fā)場(chǎng)景中使用到。

什么是CopyOnWrite容器?

CopyOnWrite容器即寫時(shí)復(fù)制的容器。通俗的理解是當(dāng)我們往一個(gè)容器添加元素的時(shí)候,不直接往當(dāng)前容器添加,而是先將當(dāng)前容器進(jìn)行Copy,復(fù)制出一個(gè)新的容器,然后新的容器里添加元素,添加完元素之后,再將原容器的引用指向新的容器。這樣做的好處是我們可以對(duì)CopyOnWrite容器進(jìn)行并發(fā)的讀,而不需要加鎖,因?yàn)楫?dāng)前容器不會(huì)添加任何元素。所以CopyOnWrite容器也是一種讀寫分離的思想,讀和寫不同的容器。

實(shí)現(xiàn)原理

以下是CopyOnWriteArrayList的源碼,可以發(fā)現(xiàn)在添加的時(shí)候是需要加鎖的,否則多線程寫的時(shí)候會(huì)Copy出N個(gè)副本出來。讀的時(shí)候不需要加鎖,如果讀的時(shí)候有多個(gè)線程正在向list添加數(shù)據(jù),讀還是會(huì)讀到舊的數(shù)據(jù),因?yàn)閷懙臅r(shí)候不會(huì)鎖住舊的list。

public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

public E get(int index) {
    return get(getArray(), index);
}

代碼實(shí)現(xiàn)

public class CopyOnWriteArrayListTest {

    public static void main(String[] args) {
        final List<String> list =
//                Collections.synchronizedList(new ArrayList<>());
//                new Vector<>();
                // 并發(fā)寫時(shí),CopyOnWriteArrayList性能遠(yuǎn)不如synchronizedList和Vector
                new CopyOnWriteArrayList<>();
        Random random = new Random();

        // 創(chuàng)建100個(gè)線程,每個(gè)線程的任務(wù)就是往list中放入1000條數(shù)據(jù)
        Thread[] threads = new Thread[100];
        for (int i = 0; i < threads.length; i++) {
            Runnable task = () -> {
                for (int j = 0; j < 1000; j++) {
                    list.add("a" + random.nextInt(10000));
                }
            };
            threads[i] = new Thread(task);
        }

        long start = System.currentTimeMillis();
        // 啟動(dòng)所有線程
        for (Thread thread : Arrays.asList(threads)) {
            thread.start();
        }
        // 等待這些線程執(zhí)行完畢
        for (Thread thread : Arrays.asList(threads)) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(System.currentTimeMillis() - start);

        System.out.println(list.size());
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容