java多線程的常用鎖

java多線程的常用鎖

synchronized 關鍵字

若是對象鎖,則每個對象都持有一把自己的獨一無二的鎖,且對象之間的鎖互不影響 。若是類鎖,所有該類的對象共用這把鎖。
一個線程獲取一把鎖,沒有得到鎖的線程只能排隊等待;
synchronized 是可重入鎖,避免很多情況下的死鎖發(fā)生。
synchronized 方法若發(fā)生異常,則JVM會自動釋放鎖。
鎖對象不能為空,否則拋出NPE(NullPointerException)
同步本身是不具備繼承性的:即父類的synchronized 方法,子類重寫該方法,分情況討論:沒有synchonized修飾,則該子類方法不是線程同步的。(PS :涉及同步繼承性的問題要分情況)
synchronized本身修飾的范圍越小越好。畢竟是同步阻塞。跑不快還占著超車道…

synchronized 底層對應的 JVM 模型為 objectMonitor,使用了3個雙向鏈表來存放被阻塞的線程:_cxq(Contention queue)、_EntryList(EntryList)、_WaitSet(WaitSet)。

當線程獲取鎖失敗進入阻塞后,首先會被加入到 _cxq 鏈表,_cxq 鏈表的節(jié)點會在某個時刻被進一步轉移到 _EntryList 鏈表。

當持有鎖的線程釋放鎖后,_EntryList 鏈表頭結點的線程會被喚醒,該線程稱為 successor(假定繼承者),然后該線程會嘗試搶占鎖。

當我們調用 wait() 時,線程會被放入 _WaitSet,直到調用了 notify()/notifyAll() 后,線程才被重新放入 _cxq 或 _EntryList,默認放入 _cxq 鏈表頭部。

objectMonitor 的整體流程如下圖:

20210605142829731.png

鎖升級的流程如下圖:
20210605173039809.png

注:圖片轉載出處(https://zhuanlan.zhihu.com/p/378429667)

ReentrantLock JDK鎖

ReentrantLock先通過CAS嘗試獲取鎖,如果獲取了就將鎖狀態(tài)state設置為1
如果此時鎖已經被占用,
被自己占用:判斷當前的鎖是否是自己占用了,如果是的話就鎖計數器會state++(可重入性)
被其他線程占用:該線程加入AQS隊列并wait()
當前驅線程的鎖被釋放,一直到state==0,掛在CLH隊列為首的線程就會被notify(),然后繼續(xù)CAS嘗試獲取鎖,此時:
非公平鎖,如果有其他線程嘗試lock(),有可能被其他剛好申請鎖的線程搶占
公平鎖,只有在CLH隊列頭的線程才可以獲取鎖,新來的線程只能插入到隊尾。

ReadAndWriteLock 讀寫鎖

每一個ReentrantLock自身維護一個AQS隊列記錄申請鎖的線程信息;

通過大量CAS保證多個線程競爭鎖的時候的并發(fā)安全;

可重入的功能是通過維護state變量來記錄重入次數實現(xiàn)的。

公平鎖需要維護隊列,通過AQS隊列的先后順序獲取鎖,缺點是會造成大量線程上下文切換;

非公平鎖可以直接搶占,所以效率更高;

CountDownLatch 門栓

1.使用方法

CountDownLatch,每執(zhí)行一次countDown() 就會將設置的值-1,減到0,.await()的方法即可往下執(zhí)行

void test() throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(3);
    for (int i = 0; i < 3; i++) {
        new Thread(()->{
            System.out.println("線程啟動");
            try {
                Thread.sleep(1000);
                countDownLatch.countDown();
                System.out.println("線程停止");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).start();
    }
    countDownLatch.await();
    System.out.println("主線程停止");
}

2.底層實現(xiàn)

底層是內部維護了一個Sync并且繼承了AQS(阻塞隊列+CAS操作)

CAS維護狀態(tài)位,并且利用CAS操作向AQS的阻塞隊列的隊尾添加元素

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    // 獲取當前狀態(tài)
    int getCount() {
        return getState();
    }

    // 當前狀態(tài)是否往下繼續(xù)運行
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    // 修改狀態(tài)值
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

CyclicBarrier 線程柵欄

1.使用方法

CyclicBarrier,每執(zhí)行一次await() 就會將設置的值+1,加到設置的值,.await()的方法即可往下執(zhí)行

@Test
void test() throws InterruptedException {
    CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
    for (int i = 0; i < 10; i++) {
        Thread.sleep(1000);
        new Thread(()->{
            System.out.println("線程啟動");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
            System.out.println("線程停止");
        }).start();
    }
    Thread.sleep(10000);
    System.out.println("主線程停止");
}

2.底層實現(xiàn)

CyclicBarrier中包含ReentrantLock和Condition

用ReentrantLock保證count值的原子性操作,Condition來喚醒等待的線程阻塞隊列

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

LongAdder 線程安全的數值類

1.使用方法

使用LongAdder來替代超高并發(fā)下的AtomicLong,使用adder()和sum()進行使用

@Test
void test() throws InterruptedException {
    LongAdder adder = new LongAdder();
    for (int i = 0; i < 1000; i++) {
        new Thread(()->{
            System.out.println("線程啟動");
            adder.increment();
            System.out.println("線程停止");
        }).start();
    }
    Thread.sleep(10000);
    System.out.println(adder.sum());
}

2.底層實現(xiàn)

LongAdder相對于有多個AtomicLong,將高并發(fā)降低為cell,每個cell內部又會用到CAS操作來實現(xiàn)原子性操作,最終使用sum()求和來獲取最終數值

LongAdder在沒有線程競爭的時候,只使用base值,此時的情況就類似與AtomicLong。但LongAdder的高明之處在于,發(fā)生線程競爭時,便會使用到Cell數組,所以該數組是惰性加載的。

abstract class Striped64 extends Number {

    @sun.misc.Contended static final class Cell {}
}
public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

Phaser 階段器

1.使用方法

與CountDownLanuch和CyclicBarrier不同的是 里面的parties是動態(tài)配置,Phaser可以動態(tài)注冊需要協(xié)調的線程,相比CountDownLatch和CyclicBarrier就會變得更加靈活。

public class Phaser001 {
    public static void main(String[] args) {
        Phaser phaser = new Phaser();
        IntStream.rangeClosed(1,10).forEach(i->new MyTask(phaser).start());

        //等待注冊的任務全部完成
        phaser.arriveAndAwaitAdvance();

        System.out.println("任務全部完成");
    }

}

class MyTask extends Thread{

    public Phaser phaser;

    public MyTask(Phaser phaser) {
        this.phaser = phaser;
        this.phaser.register();
        System.out.println("任務注冊");
    }

    @Override
    public void run() {
        System.out.println("開始執(zhí)行任務");
        System.out.println("第一階段任務執(zhí)行完成");
        //當前注冊任務已經到達
        this.phaser.arrive();
    }
}

2.底層實現(xiàn)

cas操作

private int doRegister(int registrations) {
    // adjustment to state
    long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
    final Phaser parent = this.parent;
    int phase;
    for (;;) {
        long s = (parent == null) ? state : reconcileState();
        int counts = (int)s;
        int parties = counts >>> PARTIES_SHIFT;
        int unarrived = counts & UNARRIVED_MASK;
        if (registrations > MAX_PARTIES - parties)
            throw new IllegalStateException(badRegister(s));
        phase = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            break;
        if (counts != EMPTY) {                  // not 1st registration
            if (parent == null || reconcileState() == s) {
                if (unarrived == 0)             // wait out advance
                    root.internalAwaitAdvance(phase, null);
                else if (UNSAFE.compareAndSwapLong(this, stateOffset,
                                                   s, s + adjust))
                    break;
            }
        }
        else if (parent == null) {              // 1st root registration
            long next = ((long)phase << PHASE_SHIFT) | adjust;
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
                break;
        }
        else {
            synchronized (this) {               // 1st sub registration
                if (state == s) {               // recheck under lock
                    phase = parent.doRegister(1);
                    if (phase < 0)
                        break;
                    // finish registration whenever parent registration
                    // succeeded, even when racing with termination,
                    // since these are part of the same "transaction".
                    while (!UNSAFE.compareAndSwapLong
                           (this, stateOffset, s,
                            ((long)phase << PHASE_SHIFT) | adjust)) {
                        s = state;
                        phase = (int)(root.state >>> PHASE_SHIFT);
                        // assert (int)s == EMPTY;
                    }
                    break;
                }
            }
        }
    }
    return phase;
}

Semaphore 信號量

1.使用方法

有點像隊列的感覺,定義一個總的信號量,若當前acquire線程達到信號量,則再進行acquire就會進入等待隊列,可以用release釋放,具體例子像數據庫的鏈接池

public class Semaphore001 extends Thread{

    static Semaphore semaphore;

    @Override
    public void run() {
        try {
            //開啟獲取
            this.semaphore.acquire();
            System.out.println(Thread.currentThread().getName()+"開始執(zhí)行");
            Thread.sleep(3000);
            System.out.println(Thread.currentThread().getName()+"結束執(zhí)行");
            this.semaphore.release();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

    }

    public static void main(String[] args) {
        semaphore = new Semaphore(2);
        IntStream.rangeClosed(0,20).forEach(i ->{
            new Semaphore001().start();
        });
    }
}

2.底層實現(xiàn)

AQS實現(xiàn)

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

Exchanger 交換者

1.使用方法

第一個線程先執(zhí)行exchange方法,它會一直等待第二個線程也執(zhí)行exchange,當兩個線程都到達同步點時,這兩個線程就可以交換數據,將本線程生產出來的數據傳遞給對方。

public class Exchanger001 {
    public static void main(String[] args) {
        Exchanger exchanger = new Exchanger();
        new Thread(()->{
            try {
                System.out.println(Thread.currentThread().getName()+"線程啟動");
                int a  = Integer.parseInt(exchanger.exchange(1).toString());
                System.out.println(Thread.currentThread().getName()+"獲取值為"+a);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).start();

        new Thread(()->{
            try {
                System.out.println(Thread.currentThread().getName()+"線程啟動");
                Thread.sleep(3000);
                int a  = Integer.parseInt(exchanger.exchange(2).toString());
                System.out.println(Thread.currentThread().getName()+"獲取值為"+a);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).start();
    }
}

底層實現(xiàn)

利用ThreadLocal實現(xiàn)

LockSupport

1.使用方法

利用park()和unpark()實現(xiàn)對指定線程的阻塞和喚醒

public class LockSupport001{
    static Thread t1;
    static Thread t2;
    public static void main(String[] args) {
        t1 = new Thread(()->{
            while(true) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                LockSupport.unpark(t2);
                LockSupport.park();
                System.out.println(Thread.currentThread().getName() + "線程0結束");
            }
        });

        t2 = new Thread(()->{
            while(true) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                LockSupport.park();
                System.out.println(Thread.currentThread().getName() + "線程1結束");
                LockSupport.unpark(t1);
            }
        });
        t1.start();
        t2.start();
    }
}

2.底層實現(xiàn)

public static void park() {
    UNSAFE.park(false, 0L);
}
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容