java多線程的常用鎖
synchronized 關鍵字
若是對象鎖,則每個對象都持有一把自己的獨一無二的鎖,且對象之間的鎖互不影響 。若是類鎖,所有該類的對象共用這把鎖。
一個線程獲取一把鎖,沒有得到鎖的線程只能排隊等待;
synchronized 是可重入鎖,避免很多情況下的死鎖發(fā)生。
synchronized 方法若發(fā)生異常,則JVM會自動釋放鎖。
鎖對象不能為空,否則拋出NPE(NullPointerException)
同步本身是不具備繼承性的:即父類的synchronized 方法,子類重寫該方法,分情況討論:沒有synchonized修飾,則該子類方法不是線程同步的。(PS :涉及同步繼承性的問題要分情況)
synchronized本身修飾的范圍越小越好。畢竟是同步阻塞。跑不快還占著超車道…
synchronized 底層對應的 JVM 模型為 objectMonitor,使用了3個雙向鏈表來存放被阻塞的線程:_cxq(Contention queue)、_EntryList(EntryList)、_WaitSet(WaitSet)。
當線程獲取鎖失敗進入阻塞后,首先會被加入到 _cxq 鏈表,_cxq 鏈表的節(jié)點會在某個時刻被進一步轉移到 _EntryList 鏈表。
當持有鎖的線程釋放鎖后,_EntryList 鏈表頭結點的線程會被喚醒,該線程稱為 successor(假定繼承者),然后該線程會嘗試搶占鎖。
當我們調用 wait() 時,線程會被放入 _WaitSet,直到調用了 notify()/notifyAll() 后,線程才被重新放入 _cxq 或 _EntryList,默認放入 _cxq 鏈表頭部。
objectMonitor 的整體流程如下圖:

鎖升級的流程如下圖:

注:圖片轉載出處(https://zhuanlan.zhihu.com/p/378429667)
ReentrantLock JDK鎖
ReentrantLock先通過CAS嘗試獲取鎖,如果獲取了就將鎖狀態(tài)state設置為1
如果此時鎖已經被占用,
被自己占用:判斷當前的鎖是否是自己占用了,如果是的話就鎖計數器會state++(可重入性)
被其他線程占用:該線程加入AQS隊列并wait()
當前驅線程的鎖被釋放,一直到state==0,掛在CLH隊列為首的線程就會被notify(),然后繼續(xù)CAS嘗試獲取鎖,此時:
非公平鎖,如果有其他線程嘗試lock(),有可能被其他剛好申請鎖的線程搶占
公平鎖,只有在CLH隊列頭的線程才可以獲取鎖,新來的線程只能插入到隊尾。
ReadAndWriteLock 讀寫鎖
每一個ReentrantLock自身維護一個AQS隊列記錄申請鎖的線程信息;
通過大量CAS保證多個線程競爭鎖的時候的并發(fā)安全;
可重入的功能是通過維護state變量來記錄重入次數實現(xiàn)的。
公平鎖需要維護隊列,通過AQS隊列的先后順序獲取鎖,缺點是會造成大量線程上下文切換;
非公平鎖可以直接搶占,所以效率更高;
CountDownLatch 門栓
1.使用方法
CountDownLatch,每執(zhí)行一次countDown() 就會將設置的值-1,減到0,.await()的方法即可往下執(zhí)行
void test() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
new Thread(()->{
System.out.println("線程啟動");
try {
Thread.sleep(1000);
countDownLatch.countDown();
System.out.println("線程停止");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
}
countDownLatch.await();
System.out.println("主線程停止");
}
2.底層實現(xiàn)
底層是內部維護了一個Sync并且繼承了AQS(阻塞隊列+CAS操作)
CAS維護狀態(tài)位,并且利用CAS操作向AQS的阻塞隊列的隊尾添加元素
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
// 獲取當前狀態(tài)
int getCount() {
return getState();
}
// 當前狀態(tài)是否往下繼續(xù)運行
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 修改狀態(tài)值
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
CyclicBarrier 線程柵欄
1.使用方法
CyclicBarrier,每執(zhí)行一次await() 就會將設置的值+1,加到設置的值,.await()的方法即可往下執(zhí)行
@Test
void test() throws InterruptedException {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
new Thread(()->{
System.out.println("線程啟動");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
System.out.println("線程停止");
}).start();
}
Thread.sleep(10000);
System.out.println("主線程停止");
}
2.底層實現(xiàn)
CyclicBarrier中包含ReentrantLock和Condition
用ReentrantLock保證count值的原子性操作,Condition來喚醒等待的線程阻塞隊列
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
LongAdder 線程安全的數值類
1.使用方法
使用LongAdder來替代超高并發(fā)下的AtomicLong,使用adder()和sum()進行使用
@Test
void test() throws InterruptedException {
LongAdder adder = new LongAdder();
for (int i = 0; i < 1000; i++) {
new Thread(()->{
System.out.println("線程啟動");
adder.increment();
System.out.println("線程停止");
}).start();
}
Thread.sleep(10000);
System.out.println(adder.sum());
}
2.底層實現(xiàn)
LongAdder相對于有多個AtomicLong,將高并發(fā)降低為cell,每個cell內部又會用到CAS操作來實現(xiàn)原子性操作,最終使用sum()求和來獲取最終數值
LongAdder在沒有線程競爭的時候,只使用base值,此時的情況就類似與AtomicLong。但LongAdder的高明之處在于,發(fā)生線程競爭時,便會使用到Cell數組,所以該數組是惰性加載的。
abstract class Striped64 extends Number {
@sun.misc.Contended static final class Cell {}
}
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
Phaser 階段器
1.使用方法
與CountDownLanuch和CyclicBarrier不同的是 里面的parties是動態(tài)配置,Phaser可以動態(tài)注冊需要協(xié)調的線程,相比CountDownLatch和CyclicBarrier就會變得更加靈活。
public class Phaser001 {
public static void main(String[] args) {
Phaser phaser = new Phaser();
IntStream.rangeClosed(1,10).forEach(i->new MyTask(phaser).start());
//等待注冊的任務全部完成
phaser.arriveAndAwaitAdvance();
System.out.println("任務全部完成");
}
}
class MyTask extends Thread{
public Phaser phaser;
public MyTask(Phaser phaser) {
this.phaser = phaser;
this.phaser.register();
System.out.println("任務注冊");
}
@Override
public void run() {
System.out.println("開始執(zhí)行任務");
System.out.println("第一階段任務執(zhí)行完成");
//當前注冊任務已經到達
this.phaser.arrive();
}
}
2.底層實現(xiàn)
cas操作
private int doRegister(int registrations) {
// adjustment to state
long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
final Phaser parent = this.parent;
int phase;
for (;;) {
long s = (parent == null) ? state : reconcileState();
int counts = (int)s;
int parties = counts >>> PARTIES_SHIFT;
int unarrived = counts & UNARRIVED_MASK;
if (registrations > MAX_PARTIES - parties)
throw new IllegalStateException(badRegister(s));
phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
break;
if (counts != EMPTY) { // not 1st registration
if (parent == null || reconcileState() == s) {
if (unarrived == 0) // wait out advance
root.internalAwaitAdvance(phase, null);
else if (UNSAFE.compareAndSwapLong(this, stateOffset,
s, s + adjust))
break;
}
}
else if (parent == null) { // 1st root registration
long next = ((long)phase << PHASE_SHIFT) | adjust;
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
break;
}
else {
synchronized (this) { // 1st sub registration
if (state == s) { // recheck under lock
phase = parent.doRegister(1);
if (phase < 0)
break;
// finish registration whenever parent registration
// succeeded, even when racing with termination,
// since these are part of the same "transaction".
while (!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
((long)phase << PHASE_SHIFT) | adjust)) {
s = state;
phase = (int)(root.state >>> PHASE_SHIFT);
// assert (int)s == EMPTY;
}
break;
}
}
}
}
return phase;
}
Semaphore 信號量
1.使用方法
有點像隊列的感覺,定義一個總的信號量,若當前acquire線程達到信號量,則再進行acquire就會進入等待隊列,可以用release釋放,具體例子像數據庫的鏈接池
public class Semaphore001 extends Thread{
static Semaphore semaphore;
@Override
public void run() {
try {
//開啟獲取
this.semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"開始執(zhí)行");
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName()+"結束執(zhí)行");
this.semaphore.release();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) {
semaphore = new Semaphore(2);
IntStream.rangeClosed(0,20).forEach(i ->{
new Semaphore001().start();
});
}
}
2.底層實現(xiàn)
AQS實現(xiàn)
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
Exchanger 交換者
1.使用方法
第一個線程先執(zhí)行exchange方法,它會一直等待第二個線程也執(zhí)行exchange,當兩個線程都到達同步點時,這兩個線程就可以交換數據,將本線程生產出來的數據傳遞給對方。
public class Exchanger001 {
public static void main(String[] args) {
Exchanger exchanger = new Exchanger();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"線程啟動");
int a = Integer.parseInt(exchanger.exchange(1).toString());
System.out.println(Thread.currentThread().getName()+"獲取值為"+a);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"線程啟動");
Thread.sleep(3000);
int a = Integer.parseInt(exchanger.exchange(2).toString());
System.out.println(Thread.currentThread().getName()+"獲取值為"+a);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
}
}
底層實現(xiàn)
利用ThreadLocal實現(xiàn)
LockSupport
1.使用方法
利用park()和unpark()實現(xiàn)對指定線程的阻塞和喚醒
public class LockSupport001{
static Thread t1;
static Thread t2;
public static void main(String[] args) {
t1 = new Thread(()->{
while(true) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
LockSupport.unpark(t2);
LockSupport.park();
System.out.println(Thread.currentThread().getName() + "線程0結束");
}
});
t2 = new Thread(()->{
while(true) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
LockSupport.park();
System.out.println(Thread.currentThread().getName() + "線程1結束");
LockSupport.unpark(t1);
}
});
t1.start();
t2.start();
}
}
2.底層實現(xiàn)
public static void park() {
UNSAFE.park(false, 0L);
}