實(shí)現(xiàn)雙端隊(duì)列和阻塞隊(duì)列

一、隊(duì)列的概念

定義 特點(diǎn)
隊(duì)列 一端刪除(頭)另一端添加(尾) First In First Out
雙端隊(duì)列 兩端都可以刪除、添加
優(yōu)先級(jí)隊(duì)列 優(yōu)先級(jí)高者先出隊(duì)
延時(shí)隊(duì)列 根據(jù)延時(shí)時(shí)間確定優(yōu)先級(jí)
并發(fā)非阻塞隊(duì)列 隊(duì)列空或滿(mǎn)時(shí)不阻塞
并發(fā)阻塞隊(duì)列 隊(duì)列空時(shí)刪除阻塞、隊(duì)列滿(mǎn)時(shí)添加阻塞

二、實(shí)現(xiàn)雙端隊(duì)列

定義接口

public interface Deque<E> {

    /**
     * 隊(duì)首添加元素
     * @param e
     * @return
     */
    boolean offerFirst(E e);

    /**
     * 隊(duì)尾添加元素
     * @param e
     * @return
     */
    boolean offerLast(E e);

    /**
     * 隊(duì)首移除元素并返回
     * @return
     */
    E pollFirst();

    /**
     * 隊(duì)尾移除元素并返回
     * @return
     */
    E pollLast();

    /**
     * 獲取隊(duì)首元素
     * @return
     */
    E peekFirst();

    /**
     * 獲取隊(duì)尾元素
     * @return
     */
    E peekLast();

    /**
     * 判斷隊(duì)列是否為空
     * @return
     */
    boolean isEmpty();

    /**
     * 判斷隊(duì)列是否已滿(mǎn)
     * @return
     */
    boolean isFull();

}

2.1 基于數(shù)組實(shí)現(xiàn):

package com.hcx.algorithm.queue;

import java.util.Iterator;

/**
 * @Title: CircularArrayDeque.java
 * @Package com.hcx.algorithm.queue
 * @Description: 基于循環(huán)數(shù)組實(shí)現(xiàn)雙端隊(duì)列
 * tail指向的位置不存儲(chǔ)元素,每次都指向待存儲(chǔ)的位置
 * head操作:先往前走一個(gè),然后設(shè)置元素
 * tail操作:先設(shè)置值,然后往后走一個(gè)
 * @Author: hongcaixia
 * @Date: 2025/1/12 15:04
 * @Version V1.0
 */
public class CircularArrayDeque<E> implements Deque<E>,Iterable<E> {

    E[] array;

    int head;

    int tail;

    public CircularArrayDeque(int capacity) {
        array = (E[]) new Object[capacity + 1];
    }

    @Override
    public boolean offerFirst(E e) {
        if (isFull()) {
            return false;
        }
        // head指針先往前走一個(gè)位置,然后設(shè)置值
        head = head - 1;
        if (head < 0) {
            head = array.length - 1;
        }
        array[head] = e;
        return true;
    }

    @Override
    public boolean offerLast(E e) {
        if (isFull()) {
            return false;
        }
        array[tail] = e;
        tail = tail + 1;
        if (tail >= array.length) {
            tail = 0;
        }
        return true;
    }

    @Override
    public E pollFirst() {
        if (isEmpty()) {
            return null;
        }
        E e = array[head];
        // help gc
        array[head] = null;
        head = head + 1;
        if (head >= array.length) {
            head = 0;
        }
        return e;
    }

    @Override
    public E pollLast() {
        if (isEmpty()) {
            return null;
        }
        tail = tail - 1;
        if (tail < 0) {
            tail = array.length - 1;
        }
        E e = array[tail];
        // help gc
        array[tail] = null;
        return e;
    }

    @Override
    public E peekFirst() {
        if (isEmpty()) {
            return null;
        }
        return array[head];
    }

    @Override
    public E peekLast() {
        if (isEmpty()) {
            return null;
        }
        tail = tail - 1;
        if (tail < 0) {
            tail = array.length - 1;
        }
        return array[tail];
    }

    @Override
    public boolean isEmpty() {
        return head == tail;
    }

    @Override
    public boolean isFull() {
        if (tail > head) {
            return tail - head == array.length - 1;
        } else if (tail < head) {
            return head - tail == 1;
        } else {
            return false;
        }
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            int pointer = head;

            @Override
            public boolean hasNext() {
                return pointer != tail;
            }

            @Override
            public E next() {
                E e = array[pointer];
                pointer = pointer + 1;
                if (pointer >= array.length) {
                    pointer = 0;
                }
                return e;
            }

            @Override
            public void remove() {

            }
        };
    }
}

2.2 基于鏈表實(shí)現(xiàn)

package com.hcx.algorithm.queue;

/**
 * @Title: CircularLinkedListQueue.java
 * @Package com.hcx.algorithm.queue
 * @Description: Leetcode622.設(shè)計(jì)循環(huán)隊(duì)列
 * @Author: hongcaixia
 * @Date: 2025/1/11 14:57
 * @Version V1.0
 */
public class CircularLinkedListQueue {

    private static class Node {
        int value;
        Node next;
        Node(int value, Node next) {
            this.value = value;
            this.next = next;
        }
    }
    private final Node head = new Node(-1, null);
    private Node tail = head;
    private int size = 0;
    private int capacity = 0;

    {
        tail.next = head;
    }

    public CircularLinkedListQueue(int capacity) {
        this.capacity = capacity;
    }

    public boolean enQueue(int value) {
        if(isFull()) {
            return false;
        }
        Node added = new Node(value, head);
        tail.next = added;
        tail = added;
        size++;
        return true;
    }

    public boolean deQueue() {
        if(isEmpty()) {
            return false;
        }
        Node first = head.next;
        head.next = first.next;
        if (first == tail) {
            tail = head;
        }
        size--;
        return true;
    }

    public int Front() {
        if(isEmpty()) {
            return -1;
        }
        return head.next.value;
    }

    public int Rear() {
        if(isEmpty()) {
            return -1;
        }
        return tail.value;
    }

    public boolean isEmpty() {
        return head == tail;
    }

    public boolean isFull() {
        return size == capacity;
    }
}

三、實(shí)現(xiàn)阻塞隊(duì)列

大部分場(chǎng)景要求分離生產(chǎn)者(向隊(duì)列放入)和消費(fèi)者(從隊(duì)列拿出)兩個(gè)角色、它們由不同的線(xiàn)程來(lái)承擔(dān),前面的實(shí)現(xiàn)沒(méi)有考慮線(xiàn)程安全問(wèn)題。

隊(duì)列為空,前面的實(shí)現(xiàn)里會(huì)返回 null,如果就是要拿到一個(gè)元素只能不斷循環(huán)嘗試。
隊(duì)列滿(mǎn),前面的實(shí)現(xiàn)里會(huì)返回 false,如果就是要放入一個(gè)元素只能不斷循環(huán)嘗試。

實(shí)現(xiàn)方案:

  1. 用鎖保證線(xiàn)程安全
  2. 用條件變量讓等待非空線(xiàn)程等待不滿(mǎn)線(xiàn)程進(jìn)入等待狀態(tài),而不是不斷循環(huán)嘗試,讓 CPU 空轉(zhuǎn)。

1.阻塞隊(duì)列接口

package com.hcx.algorithm.queue;

/**
 * @Title: BlockingQueue.java
 * @Package com.hcx.algorithm.queue
 * @Description: 阻塞隊(duì)列接口
 * @Author: hongcaixia
 * @Date: 2025/1/13 15:16
 * @Version V1.0
 */
public interface BlockingQueue<E> {

    /**
     * 阻塞入隊(duì)元素
     * 當(dāng)隊(duì)列滿(mǎn)時(shí),阻塞,直到隊(duì)列不滿(mǎn)時(shí),把元素入隊(duì)
     *
     * @param e
     * @throws InterruptedException
     */
    void offer(E e) throws InterruptedException;

    /**
     * 超時(shí)阻塞入隊(duì)元素
     * 當(dāng)隊(duì)列滿(mǎn)時(shí),阻塞,直到隊(duì)列不滿(mǎn)時(shí),把元素入隊(duì),如果中間超時(shí)時(shí)間達(dá)到,則放棄入隊(duì),返回false
     *
     * @param e
     * @param timeout
     * @return
     * @throws InterruptedException
     */
    void offer(E e, long timeout) throws InterruptedException;

    /**
     * 元素出隊(duì)
     *
     * @return
     * @throws InterruptedException
     */
    E poll() throws InterruptedException;

    /**
     * 判斷隊(duì)列是否為空
     *
     * @return
     */
    boolean isEmpty();

    /**
     * 判斷隊(duì)列是否已滿(mǎn)
     *
     * @return
     */
    boolean isFull();
}

2.單鎖實(shí)現(xiàn)

package com.hcx.algorithm.queue;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @Title: SingleLockBlockingQueue.java
 * @Package com.hcx.algorithm.queue
 * @Description: 單鎖實(shí)現(xiàn)阻塞隊(duì)列
 * @Author: hongcaixia
 * @Date: 2025/1/13 15:23
 * @Version V1.0
 */
public class SingleLockBlockingQueue<E> implements BlockingQueue<E> {

    E[] array;

    // 隊(duì)列大小
    int size = 0;

    // 頭指針
    int head = 0;

    // 尾指針
    int tail = 0;

    // 保證線(xiàn)程安全操作隊(duì)列時(shí)需要加鎖
    ReentrantLock lock = new ReentrantLock();

    // 當(dāng)隊(duì)列滿(mǎn)時(shí),線(xiàn)程先進(jìn)入等待隊(duì)列阻塞(此時(shí)會(huì)釋放鎖),等到有元素出隊(duì)之后,再被其他線(xiàn)程喚醒,然后重新執(zhí)行元素入隊(duì)操作。
    Condition tailWait = lock.newCondition();

    // 當(dāng)隊(duì)列空時(shí),線(xiàn)程需要進(jìn)入等待隊(duì)列(此時(shí)會(huì)釋放鎖),需要等到有元素入隊(duì)之后,再被其他線(xiàn)程喚醒,然后執(zhí)行元素出隊(duì)操作。
    Condition headWait = lock.newCondition();

    public SingleLockBlockingQueue(int capacity) {
        array = (E[]) new Object[capacity];
    }


    @Override
    public void offer(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            // tailWait中的線(xiàn)程被喚醒之后,會(huì)與其他線(xiàn)程一起爭(zhēng)搶鎖,如果搶不到,那么就還需要再次進(jìn)入等待隊(duì)列,所以需要使用while循環(huán)判斷,確保隊(duì)列不是滿(mǎn)的才往下執(zhí)行
            // 喚醒后應(yīng)該重新檢查條件
            while (isFull()) {
                tailWait.await();
            }
            array[tail] = e;
            tail++;
            if (tail == array.length) {
                tail = 0;
            }
            size++;
            // 此時(shí)入隊(duì)成功 喚醒在headWait中阻塞的線(xiàn)程(headWait中的線(xiàn)程是因?yàn)殛?duì)列沒(méi)有元素所以阻塞,此時(shí)隊(duì)列中成功入隊(duì)了一個(gè)元素,喚醒線(xiàn)程去取隊(duì)列的元素)
            headWait.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void offer(E e, long timeout) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            long time = TimeUnit.MILLISECONDS.toNanos(timeout);
            // 喚醒后應(yīng)該重新檢查條件
            while (isFull()) {
                if (time <= 0) {
                    return;
                }
                // 重新更新等待時(shí)間
                time = tailWait.awaitNanos(time);
            }
            array[tail] = e;
            tail++;
            if (tail == array.length) {
                tail = 0;
            }
            size++;
            // 此時(shí)入隊(duì)成功 喚醒在headWait中阻塞的線(xiàn)程(headWait中的線(xiàn)程是因?yàn)殛?duì)列沒(méi)有元素所以阻塞,此時(shí)隊(duì)列中成功入隊(duì)了一個(gè)元素,喚醒線(xiàn)程去取隊(duì)列的元素)
            headWait.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public E poll() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            // 喚醒后應(yīng)該重新檢查條件
            while (isEmpty()) {
                headWait.await();
            }
            E e = array[head];
            array[head] = null;
            head++;
            if (head == array.length) {
                head = 0;
            }
            size--;
            // 此時(shí)隊(duì)列中已經(jīng)有元素出隊(duì),那么隊(duì)列中已經(jīng)有空間可以再入隊(duì)了,喚醒在入隊(duì)時(shí)因?yàn)闆](méi)空間阻塞的等待隊(duì)列tailWait
            tailWait.signal();
            return e;
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean isEmpty() {
        return size == 0;
    }

    @Override
    public boolean isFull() {
        return size == array.length;
    }
}

2.1 lock.lockInterruptibly();
加鎖的過(guò)程中在阻塞時(shí)可以打斷(如果是lock方法就不會(huì)被打斷,會(huì)一直等下去),提前喚醒,不需要在阻塞下去了,但此時(shí)也不一定能獲取到鎖,除非持鎖線(xiàn)程已經(jīng)解鎖了。

2.2 隊(duì)列滿(mǎn)時(shí)的操作
讓offer線(xiàn)程進(jìn)入阻塞狀態(tài)。不繼續(xù)執(zhí)行添加操作。
使用lock的條件變量存儲(chǔ)阻塞的offer線(xiàn)程。
tailWait.await(); 將當(dāng)前offer線(xiàn)程加入tailWait集合中,當(dāng)前線(xiàn)程阻塞住。

await()方法會(huì)釋放鎖。

2.3 隊(duì)列不滿(mǎn)時(shí)的操作
喚醒阻塞的offer線(xiàn)程,讓阻塞的線(xiàn)程繼續(xù)往后執(zhí)行,完成入隊(duì)操作。
tailWait.signal();

調(diào)用signal()方法之前需要先獲取鎖。

2.4 處理虛假喚醒

            // 喚醒后應(yīng)該重新檢查條件
            while (isFull()) {
                if (time <= 0) {
                    return;
                }
                // 重新更新等待時(shí)間
                time = tailWait.awaitNanos(time);
            }

這里判斷隊(duì)列是否是滿(mǎn)的要使用while,不能只使用if:
當(dāng)多線(xiàn)程并發(fā)的情況下,隊(duì)列只有一個(gè)空位時(shí),喚醒了阻塞的線(xiàn)程,可能這個(gè)時(shí)候恰好又有一個(gè)線(xiàn)程也要操作入隊(duì),恰好比他這個(gè)剛被喚醒的線(xiàn)程搶先一步拿到了鎖,發(fā)現(xiàn)隊(duì)列沒(méi)有滿(mǎn),就添加元素,隊(duì)列又滿(mǎn)了,執(zhí)行完釋放鎖;此時(shí)對(duì)于剛被喚醒的線(xiàn)程來(lái)說(shuō),再拿到了鎖,繼續(xù)執(zhí)行,那么就會(huì)覆蓋掉隊(duì)列的元素了,所以需要再判斷一次隊(duì)列是否已滿(mǎn)。

這里注意被喚醒的阻塞線(xiàn)程,也是要重新?lián)屾i的(await方法會(huì)釋放鎖的)沒(méi)搶到,就還是被阻塞著。

2.5 處理帶超時(shí)時(shí)間的虛假喚醒

            long time = TimeUnit.MILLISECONDS.toNanos(timeout);
            // 喚醒后應(yīng)該重新檢查條件
            while (isFull()) {
                if (time <= 0) {
                    return;
                }
                // 重新更新等待時(shí)間 返回的就是剩余等待時(shí)間。
                time = tailWait.awaitNanos(time);
            }

假設(shè)offer線(xiàn)程超時(shí)等待的時(shí)間為5s,時(shí)間過(guò)去了2s時(shí),poll線(xiàn)程取走了一個(gè)元素,喚醒了在等待的offer線(xiàn)程;此時(shí)又來(lái)了一個(gè)offer線(xiàn)程,搶到了鎖,又把隊(duì)列放滿(mǎn)了,那么被喚醒的線(xiàn)程此次為虛假喚醒,但是不能再繼續(xù)等待5s了,已經(jīng)等待了2s,只需要再等待3s;所以需要重新更新等待時(shí)間。

3.雙鎖實(shí)現(xiàn)

針對(duì)單鎖的實(shí)現(xiàn),出隊(duì)和入隊(duì)都使用了一把鎖,實(shí)際出隊(duì)和入隊(duì)是不沖突的,所以為了提高性能,這里出隊(duì)使用一把鎖,入隊(duì)使用另一把鎖。針對(duì)共享變量size的操作,使用原子類(lèi)來(lái)保證安全。

  • 入隊(duì)使用tailLock鎖住隊(duì)尾,阻塞時(shí)加入tailWait等待
  • 出隊(duì)使用headLock鎖住隊(duì)頭,阻塞時(shí)加入headWait等待

入隊(duì)offer邏輯

  • 1.加tailLock鎖
  • 2.如果隊(duì)列滿(mǎn),進(jìn)入tailWait等待
  • 3.隊(duì)列不滿(mǎn),執(zhí)行入隊(duì)操作,同時(shí)喚醒在headWait等待的出隊(duì)線(xiàn)程(因?yàn)橛性厝腙?duì)了,就代表可以有元素出隊(duì),之間因?yàn)闆](méi)有元素而阻塞的等待出隊(duì)的線(xiàn)程就可以喚醒來(lái)執(zhí)行了)

出隊(duì)poll邏輯

  • 1.加headLock鎖
  • 2.如果隊(duì)列空,進(jìn)入headWait等待;
  • 3.隊(duì)列不空,執(zhí)行出隊(duì)操作,同時(shí)喚醒在tailWait等待的入隊(duì)線(xiàn)程(因?yàn)橛性爻鲫?duì)了,就代表有位置給元素入隊(duì),之間因?yàn)闆](méi)有位置而阻塞的等待入隊(duì)的線(xiàn)程就可以喚醒來(lái)執(zhí)行了)

signal和await使用

  • 必須配合鎖來(lái)使用,調(diào)用前加鎖,調(diào)用后解鎖
  • 必須配對(duì)使用:
    • headWait調(diào)用await和signal方法,必須加的是headLock鎖。(headWait是由headLock創(chuàng)建的)
    • tailWait調(diào)用await和signal方法,必須加的是tailLock鎖。(tailWait是由tailLock創(chuàng)建的)

基礎(chǔ)版

/**
 * @Title: SingleLockBlockingQueue.java
 * @Package com.hcx.algorithm.queue
 * @Description: 雙鎖實(shí)現(xiàn)阻塞隊(duì)列
 * 一把鎖保護(hù) tail,一把鎖保護(hù) head,提升性能
 * @Author: hongcaixia
 * @Date: 2025/1/13 15:23
 * @Version V1.0
 */
public class DoubleLockBlockingQueue<E> implements BlockingQueue<E> {

    E[] array;

    // 隊(duì)列大小
    AtomicInteger size = new AtomicInteger(0);

    // 頭指針
    int head = 0;

    // 尾指針
    int tail = 0;

    // 出隊(duì)鎖
    ReentrantLock headLock = new ReentrantLock();
    // 入隊(duì)鎖
    ReentrantLock tailLock = new ReentrantLock();

    // 當(dāng)隊(duì)列滿(mǎn)時(shí),線(xiàn)程先進(jìn)入等待隊(duì)列阻塞(此時(shí)會(huì)釋放鎖),等到有元素出隊(duì)之后,再被其他線(xiàn)程喚醒,然后重新執(zhí)行元素入隊(duì)操作。
    Condition tailWait = tailLock.newCondition();

    // 當(dāng)隊(duì)列空時(shí),線(xiàn)程需要進(jìn)入等待隊(duì)列(此時(shí)會(huì)釋放鎖),需要等到有元素入隊(duì)之后,再被其他線(xiàn)程喚醒,然后執(zhí)行元素出隊(duì)操作。
    Condition headWait = headLock.newCondition();

    public DoubleLockBlockingQueue(int capacity) {
        array = (E[]) new Object[capacity];
    }


    @Override
    public void offer(E e) throws InterruptedException {
        tailLock.lockInterruptibly();
        try {
            // tailWait中的線(xiàn)程被喚醒之后,會(huì)與其他線(xiàn)程一起爭(zhēng)搶鎖,如果搶不到,那么就還需要再次進(jìn)入等待隊(duì)列,所以需要使用while循環(huán)判斷,確保隊(duì)列不是滿(mǎn)的才往下執(zhí)行
            // 喚醒后應(yīng)該重新檢查條件
            while (isFull()) {
                tailWait.await();
            }
            array[tail] = e;
            tail++;
            if (tail == array.length) {
                tail = 0;
            }
            size.getAndIncrement();

            // 隊(duì)列從空到非空,喚醒等待出隊(duì)的poll線(xiàn)程
            headLock.lock();
            try {
                headWait.signal();
            }finally {
                headLock.unlock();
            }
        } finally {
            tailLock.unlock();
        }
    }

    @Override
    public E poll() throws InterruptedException {
        E e;
        headLock.lockInterruptibly();
        try {
            // 隊(duì)列空等待
            while (isEmpty()) {
                headWait.await();
            }
            // 不空則出隊(duì)
            e = array[head];
            if (++head == array.length) {
                head = 0;
            }
            // 修改 size
            size.getAndDecrement();

            // 隊(duì)列從滿(mǎn)到不滿(mǎn),喚醒等待入隊(duì)的offer線(xiàn)程
            tailLock.lock();
            try {
                tailWait.signal();
            } finally {
                tailLock.unlock();
            }
        } finally {
            headLock.unlock();
        }
        return e;
    }

    @Override
    public boolean isEmpty() {
        return size.get() == 0;
    }

    @Override
    public boolean isFull() {
        return size.get() == array.length;
    }
}

上述代碼,兩把鎖嵌套使用,非常容易出現(xiàn)死鎖:

  • poll線(xiàn)程獲取了head鎖,準(zhǔn)備去獲取tail鎖
  • offer線(xiàn)程獲取了tail鎖,準(zhǔn)備去獲取head鎖

兩個(gè)線(xiàn)程各自持有著一把鎖,都準(zhǔn)備去獲取對(duì)方的鎖,誰(shuí)也不讓誰(shuí),就出現(xiàn)了死鎖。

改進(jìn):不嵌套獲取,把喚醒線(xiàn)程的操作放在上一個(gè)鎖解鎖之后:

@Override
    public void offer(E e) throws InterruptedException {
        tailLock.lockInterruptibly();
        try {
            // tailWait中的線(xiàn)程被喚醒之后,會(huì)與其他線(xiàn)程一起爭(zhēng)搶鎖,如果搶不到,那么就還需要再次進(jìn)入等待隊(duì)列,所以需要使用while循環(huán)判斷,確保隊(duì)列不是滿(mǎn)的才往下執(zhí)行
            // 喚醒后應(yīng)該重新檢查條件
            while (isFull()) {
                tailWait.await();
            }
            array[tail] = e;
            tail++;
            if (tail == array.length) {
                tail = 0;
            }
            size.getAndIncrement();
        } finally {
            tailLock.unlock();
        }
        // 隊(duì)列從空到非空,喚醒等待出隊(duì)的poll線(xiàn)程
        headLock.lock();
        try {
            headWait.signal();
        } finally {
            headLock.unlock();
        }
    }

    @Override
    public E poll() throws InterruptedException {
        E e;
        headLock.lockInterruptibly();
        try {
            // 隊(duì)列空等待
            while (isEmpty()) {
                headWait.await();
            }
            // 不空則出隊(duì)
            e = array[head];
            if (++head == array.length) {
                head = 0;
            }
            // 修改 size
            size.getAndDecrement();
        } finally {
            headLock.unlock();
        }
        // 隊(duì)列從滿(mǎn)到不滿(mǎn),喚醒等待入隊(duì)的offer線(xiàn)程
        tailLock.lock();
        try {
            tailWait.signal();
        } finally {
            tailLock.unlock();
        }
        return e;
    }

優(yōu)化版:使用級(jí)聯(lián)通知完成喚醒操作
因?yàn)樵诓僮鱫ffer和poll時(shí),不僅要拿到當(dāng)前要操作的鎖,還要拿對(duì)方的鎖來(lái)喚醒對(duì)方。所以效率降低了。
針對(duì)offer線(xiàn)程來(lái)說(shuō),入隊(duì)用的是tailLock鎖,喚醒用的是headLock鎖,要盡可能減少去獲取headLock鎖的次數(shù)。

場(chǎng)景
剛開(kāi)始隊(duì)列為空:
來(lái)了3個(gè)poll線(xiàn)程操作,因?yàn)殛?duì)列沒(méi)有元素,所以都進(jìn)入了headWait中等待。

來(lái)了3個(gè)offer線(xiàn)程,分別往隊(duì)伍加入了3個(gè)元素,現(xiàn)在的代碼,對(duì)于每一個(gè)offer線(xiàn)程,他都會(huì)去獲取鎖執(zhí)行喚醒操作;為了減少offer線(xiàn)程取獲取headLock鎖,只讓其中一個(gè)offer線(xiàn)程去獲取鎖執(zhí)行喚醒操作,而對(duì)于headWait中剩下2個(gè)poll線(xiàn)程,則由已經(jīng)被offer線(xiàn)程喚醒的那一個(gè)poll線(xiàn)程來(lái)喚醒。(condition里的結(jié)構(gòu)是一個(gè)鏈表結(jié)構(gòu))

具體實(shí)現(xiàn)
offer操作的headWait的喚醒邏輯:隊(duì)列空時(shí)在headWait等待出隊(duì)的tail線(xiàn)程

  • 針對(duì)offer線(xiàn)程:只讓剛?cè)腙?duì)第一個(gè)元素的offer線(xiàn)程(此時(shí)隊(duì)列大小size為0)去執(zhí)行喚醒操作,其他喚醒操作交給被喚醒的poll線(xiàn)程來(lái)實(shí)現(xiàn)級(jí)聯(lián)喚醒。
  • 針對(duì)poll線(xiàn)程:當(dāng)headWait中的線(xiàn)程被喚醒之后,判斷隊(duì)列中是不是還有多的元素,如果有超過(guò)1個(gè),那么就自己來(lái)喚醒在headWait中等待出隊(duì)的線(xiàn)程。

poll操作的tailWait的喚醒邏輯:隊(duì)列滿(mǎn)時(shí)在tailWait中等待入隊(duì)的offer線(xiàn)程

  • 針對(duì)poll線(xiàn)程:取走了一個(gè)元素,那么隊(duì)列就有空位了,可以喚醒在tailWait中等待的線(xiàn)程。不是所有的poll都執(zhí)行,只在隊(duì)列剛從滿(mǎn)變成不滿(mǎn)的時(shí)候(取走元素之前隊(duì)列size和數(shù)組長(zhǎng)度一樣)由poll來(lái)喚醒,其他喚醒交給被喚醒的offer線(xiàn)程來(lái)實(shí)現(xiàn)級(jí)聯(lián)喚醒。
  • 針對(duì)offer線(xiàn)程:當(dāng)tailWait中的offer線(xiàn)程被喚醒之后,判斷隊(duì)列是不是還有多的空位,如果空位超過(guò)1個(gè),那么就自己來(lái)喚醒在tailWait中等待入隊(duì)的線(xiàn)程。

完整代碼:

package com.hcx.algorithm.queue;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @Title: SingleLockBlockingQueue.java
 * @Package com.hcx.algorithm.queue
 * @Description: 雙鎖實(shí)現(xiàn)阻塞隊(duì)列
 * 一把鎖保護(hù) tail,一把鎖保護(hù) head,提升性能
 * 在喚醒的時(shí)候進(jìn)行了優(yōu)化,只有當(dāng)隊(duì)列處于臨界條件的時(shí)候才由另一個(gè)線(xiàn)程喚醒,否則自己?jiǎn)拘眩?jí)聯(lián)通知)
 * @Author: hongcaixia
 * @Date: 2025/1/13 15:23
 * @Version V1.0
 */
public class DoubleLockBlockingQueue<E> implements BlockingQueue<E> {

    E[] array;

    // 隊(duì)列大小
    AtomicInteger size = new AtomicInteger(0);

    // 頭指針
    int head = 0;

    // 尾指針
    int tail = 0;

    // 出隊(duì)鎖
    ReentrantLock headLock = new ReentrantLock();
    // 入隊(duì)鎖
    ReentrantLock tailLock = new ReentrantLock();

    // 當(dāng)隊(duì)列滿(mǎn)時(shí),線(xiàn)程先進(jìn)入等待隊(duì)列阻塞(此時(shí)會(huì)釋放鎖),等到有元素出隊(duì)之后,再被其他線(xiàn)程喚醒,然后重新執(zhí)行元素入隊(duì)操作。
    Condition tailWait = tailLock.newCondition();

    // 當(dāng)隊(duì)列空時(shí),線(xiàn)程需要進(jìn)入等待隊(duì)列(此時(shí)會(huì)釋放鎖),需要等到有元素入隊(duì)之后,再被其他線(xiàn)程喚醒,然后執(zhí)行元素出隊(duì)操作。
    Condition headWait = headLock.newCondition();

    public DoubleLockBlockingQueue(int capacity) {
        array = (E[]) new Object[capacity];
    }


    @Override
    public void offer(E e) throws InterruptedException {

        // 隊(duì)列當(dāng)前大小
        int beforeAdd;

        tailLock.lockInterruptibly();
        try {
            // tailWait中的線(xiàn)程被喚醒之后,會(huì)與其他線(xiàn)程一起爭(zhēng)搶鎖,如果搶不到,那么就還需要再次進(jìn)入等待隊(duì)列,所以需要使用while循環(huán)判斷,確保隊(duì)列不是滿(mǎn)的才往下執(zhí)行
            // 喚醒后應(yīng)該重新檢查條件
            while (isFull()) {
                tailWait.await();
            }
            array[tail] = e;
            tail++;
            if (tail == array.length) {
                tail = 0;
            }
            // 返回增加之前的值
            beforeAdd = size.getAndIncrement();
            if (beforeAdd + 1 < array.length) {
                // 說(shuō)明隊(duì)列還沒(méi)有滿(mǎn),喚醒阻塞的offer線(xiàn)程
                tailWait.signal();
            }
        } finally {
            tailLock.unlock();
        }

        // 再offer這里加了鎖去喚醒非空的poll線(xiàn)程,會(huì)導(dǎo)致正常的poll線(xiàn)程也阻塞,搶不到鎖,所以需要減少喚醒時(shí)的加鎖次數(shù)
        /**
         * 當(dāng)隊(duì)列原本為空,剛好入隊(duì)了一個(gè)元素時(shí),此時(shí)執(zhí)行喚醒操作,對(duì)于剩余的其他阻塞了的poll線(xiàn)程,交給poll線(xiàn)程自己來(lái)喚醒
         */
        if (beforeAdd == 0) {
            headLock.lock();
            try{
                // 隊(duì)列從空變化到不空,會(huì)喚醒一個(gè)等待的 poll 線(xiàn)程,為了避免死鎖,需要等tailLock釋放了之后才開(kāi)始加(條件變量的 await(), signal() 等方法需要先獲得與之關(guān)聯(lián)的鎖)
                headWait.signal();
            }finally {
                headLock.unlock();
            }
        }
    }

    @Override
    public void offer(E e, long timeout) throws InterruptedException {
        int beforeAdd;
        tailLock.lockInterruptibly();
        try {
            long time = TimeUnit.MILLISECONDS.toNanos(timeout);
            // 喚醒后應(yīng)該重新檢查條件
            while (isFull()) {
                if (time <= 0) {
                    return;
                }
                // 重新更新等待時(shí)間
                time = tailWait.awaitNanos(time);
            }
            array[tail] = e;
            tail++;
            if (tail == array.length) {
                tail = 0;
            }
            beforeAdd = size.getAndIncrement();
            if (beforeAdd + 1 < array.length) {
                // 如果入隊(duì)之前+1 還有剩余空間,說(shuō)明隊(duì)列中元素還有剩余,可以喚醒阻塞的polll線(xiàn)程
                tailWait.signal();
            }
        } finally {
            tailLock.unlock();
        }

        // 如果入隊(duì)之前隊(duì)列時(shí)空的,那么此時(shí)只有一個(gè)元素,就喚醒阻塞的poll線(xiàn)程
        if (beforeAdd == 0) {
            headLock.lock();
            try {
                // 此時(shí)入隊(duì)成功 喚醒在headWait中阻塞的線(xiàn)程(headWait中的線(xiàn)程是因?yàn)殛?duì)列沒(méi)有元素所以阻塞,此時(shí)隊(duì)列中成功入隊(duì)了一個(gè)元素,喚醒線(xiàn)程去取隊(duì)列的元素)
                headWait.signal();
            } finally {
                headLock.unlock();
            }
        }
    }

    @Override
    public E poll() throws InterruptedException {
        E e;
        // size減之前的值
        int beforeDecrement;

        headLock.lockInterruptibly();
        try {
            // 喚醒后應(yīng)該重新檢查條件
            while (isEmpty()) {
                headWait.await();
            }
            e = array[head];
            array[head] = null;
            head++;
            if (head == array.length) {
                head = 0;
            }
            beforeDecrement = size.getAndDecrement();
            if (beforeDecrement > 1) {
                // 說(shuō)明隊(duì)列中還有剩余元素(并非當(dāng)前poll完了之后就為空了),所以還可以喚醒阻塞的poll線(xiàn)程
                headWait.signal();
            }
        } finally {
            headLock.unlock();
        }

        // 隊(duì)列剛從滿(mǎn)變成不滿(mǎn)的時(shí)候,去喚醒等待隊(duì)列中的offer線(xiàn)程,剩余阻塞的offer線(xiàn)程由offer線(xiàn)程自己?jiǎn)拘?        if (beforeDecrement == array.length) {
            tailLock.lock();
            try {
                // 隊(duì)列從滿(mǎn)變化到不滿(mǎn),喚醒等待隊(duì)列中的offer線(xiàn)程
                tailWait.signal();
            } finally {
                tailLock.unlock();
            }
        }
        return e;
    }

    @Override
    public boolean isEmpty() {
        return size.get() == 0;
    }

    @Override
    public boolean isFull() {
        return size.get() == array.length;
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容