看這部分的前提是大家已經(jīng)看過AbstractQueuedSynchronizer這個類,知道它是個啥了哈,如果不知道,請先看這里http://www.itdecent.cn/p/c629aebcd251。
咳咳,這個鎖嘛,就是當程序的某個部分被多個線程執(zhí)行時想讓線程挨個執(zhí)行它而不是大家一擁而上,所以就用鎖來控制它,這個鎖同時只能被一個線程占據(jù),這樣某些程序才不會因為并發(fā)而出錯。
先看怎么使用,再看實現(xiàn)原理吧。
public classReentrantTestextendsThread {
privateReentrantLockreentrantLock;
??? publicReentrantTest(ReentrantLock reentrantLock){
this.reentrantLock= reentrantLock;
???}
@Override
???public voidrun() {
//??????? reentrantLock.lock();
???????try{
for(intj =0;j <100000;j++) {
System.out.print(j);
???????????????System.out.print("\n");
???????????}
}finally{
//??????????? reentrantLock.unlock();
???????}
??? }
public static voidmain(String[] args) {
ReentrantLock reentrantLock =newReentrantLock();
???????ReentrantTest reentrantTest =newReentrantTest(reentrantLock);
???????ReentrantTest reentrantTest1 =newReentrantTest(reentrantLock);
???????reentrantTest.start();
???????reentrantTest1.start();
???}
}
這是一段簡單的示例代碼,目的是為了讓每個線程可以完整打出0到99999,不被中斷,有人會問那你干嘛用多線程…..=。=我承認我例子想得不好,反正就這樣!大家試著跑一跑就能看到效果啦,如果注釋掉上面那兩行,程序輸出的結果就是混亂的,兩個線程輸出的結果是交雜的,但是用了lock就會是一個線程執(zhí)行完才下個線程。
這里又有人問了,那這個跟在方法前加個synchronise有啥區(qū)別?
首先,這都JDK10都要出來了,synchronise在進行了一系列優(yōu)化后,性能還算OK,跟ReentrantLock差距不大了。既然性能差的不多,那就從功能上來對比,確實synchronise很方便,一切由jvm控制,但是它的粒度不如ReentrantLock細,怎么說呢,ReentrantLock是要我們自己寫,從哪里lock,所以就算在一個方法內部,它也可以控制方法的一部分是受到鎖的控制,而另一部分不受,所以粒度比較細,但要注意一定要在finally中釋放鎖,不然程序執(zhí)行出現(xiàn)異常等提前退出的現(xiàn)象可能會導致死鎖。
好,然后呢,ReentrantLock提供了一些額外的功能,比如可重入,可中斷,可限時,可公平等,下面在講它實現(xiàn)的時候會挨個講,這些都是synchronise不具備的啦。
ReentrantLock名字叫Reentrant對吧,重新進入的意思,相信大家聽說過可重入鎖,這其實跟這個ReentrantLock的實現(xiàn)機制有關,它內部定義了一個私有類Sync(這個就是實現(xiàn)鎖這個功能的關鍵):
abstractstaticclassSyncextendsAbstractQueuedSynchronizer {
//序列號
privatestaticfinallongserialVersionUID = -5179523762034025860L;
//獲取鎖
abstractvoidlock();
//非公平方式獲取
finalbooleannonfairTryAcquire(intacquires) {
//當前線程
finalThread current =Thread.currentThread();
//獲取狀態(tài)
intc =getState();
if(c == 0) {//表示沒有線程正在競爭該鎖
if(compareAndSetState(0, acquires)) {//比較并設置狀態(tài)成功,狀態(tài)0表示鎖沒有被占用
//設置當前線程獨占setExclusiveOwnerThread(current);
returntrue;//成功}
}
elseif(current == getExclusiveOwnerThread()) {//當前線程擁有該鎖
intnextc = c + acquires;//增加重入次數(shù)
if(nextc < 0)//overflow
thrownewError("Maximum lock count exceeded");
//設置狀態(tài)setState(nextc);
//成功
returntrue;
}
//失敗
returnfalse;
}
//試圖在共享模式下獲取對象狀態(tài),此方法應該查詢是否允許它在共享模式下獲取對象狀態(tài),如果允許,則獲取它
protectedfinalbooleantryRelease(intreleases) {
intc = getState() -releases;
if(Thread.currentThread() != getExclusiveOwnerThread())//當前線程不為獨占線程
thrownewIllegalMonitorStateException();//拋出異常
//釋放標識
booleanfree =false;
if(c == 0) {
free=true;
//已經(jīng)釋放,清空獨占
setExclusiveOwnerThread(null);
}
//設置標識setState(c);
returnfree;
}
//判斷資源是否被當前線程占有
protectedfinalbooleanisHeldExclusively() {
//While we must in general read state before owner,
//we don't need to do so to check if current thread is owner
returngetExclusiveOwnerThread() ==Thread.currentThread();
}
//新生一個條件
finalConditionObject newCondition() {
returnnewConditionObject();
}
//Methods relayed from outer class
//返回資源的占用線程
finalThread getOwner() {
returngetState() == 0 ?null: getExclusiveOwnerThread();
}
//返回狀態(tài)
finalintgetHoldCount() {
returnisHeldExclusively() ? getState() : 0;
}
//資源是否被占用
finalbooleanisLocked() {
returngetState() != 0;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
//自定義反序列化邏輯
privatevoidreadObject(java.io.ObjectInputStream s)
throwsjava.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0);//reset to unlocked state}
}
好的,貼了一段代碼,就是Sync的定義,注意到,它繼承自AbstractQueuedSynchronizer,我講這個是為了講ReentrantLock的實現(xiàn)機制哈,你打開源碼就知道這個ReentrantLock其實沒別的東西,主要就是靠這個Sync,所以得把這個Sync是干嘛的講通。好的繼續(xù),這個Sync里就有幾個起主要作用的方法,下面一一說明這個東西是怎么工作的。
首先,最重要的方法:lock():
abstract void lock();
第一部分?FairSync
它是一個抽象方法,具體的實現(xiàn)在它FairSync和NonfairSync兩個子類中(一個是公平鎖一個是不公平鎖
),當這個方法被調用時,意味著執(zhí)行該方法的當前線程是在試圖去獲取這個鎖,我們先看NonfairSync的lock方法:
final voidlock() {
???if(compareAndSetState(0,1))
??????? setExclusiveOwnerThread(Thread.currentThread());
??? else
???????acquire(1);
}
可以看到compareAndSetState(0, 1)如果此刻鎖的狀態(tài)是0就代表了鎖現(xiàn)在沒人占,那么我們當前線程直接占了就行,否則,執(zhí)行acquire方法:
public final voidacquire(intarg) {
???if(!tryAcquire(arg) &&
??????? acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
???????selfInterrupt();
}
這個方法在AbstractQueuedSynchronizer里面哈,讓你們去看了,因為它是基礎,然后這個方法里呢,tryAcquire則是在NonfairSync里實現(xiàn)的:
protected final booleantryAcquire(intacquires) {
???returnnonfairTryAcquire(acquires);
}
原來它也就調用的父類中的nonfairTryAcquire方法:
final booleannonfairTryAcquire(intacquires) {
???finalThread current = Thread.currentThread();
??? intc = getState();
??? if(c ==0) {
???????if(compareAndSetState(0,acquires)) {
??????????? setExclusiveOwnerThread(current);
??????????? return true;
???????}
??? }
???else if(current == getExclusiveOwnerThread()) {
???????intnextc = c + acquires;
??????? if(nextc <0)// overflow
???????????throw newError("Maximum lock count exceeded");
???????setState(nextc);
??????? return true;
???}
???return false;
}
分析一波,這個參數(shù)代表可以重復的獲取這個鎖,State這個變量,存儲的就是這個鎖被占有的狀態(tài),0就是沒被占有,1就是被占有一次,n就是被占有n次(這個次數(shù)可能描述不是特別恰當,自己體會~),然后加入狀態(tài)是0,那么把這個state設置為acquires這么大,然后占有這個鎖,假如狀態(tài)不是0,那么又會去判斷,占有這個鎖的是不是當前線程(不是多此一舉哈,因為前面說了,ReentrantLock可以重復申請,也就是可重入,那么就有可能這個鎖已經(jīng)被當前線程占有啦),那就把狀態(tài)的值再設為當前的狀態(tài)值加上acquires,如果不滿足以上條件,啥都不干,返回false(返回false后接下來的操作就會回到AbstractQueuedSynchronizer里面去,也就是排隊等待的那套流程,你們要看?。。?/p>
第二部分 NonFairSync
FairSync里面的lock方法更簡單:
final voidlock() {
??? acquire(1);
}
直接執(zhí)行acquire方法,我們上面寫出來這個方法,所以還是會調用tryAcquire方法:
???protected final booleantryAcquire(intacquires) {
???????finalThread current = Thread.currentThread();
??????? intc = getState();
??????? if(c ==0) {
???????????if(!hasQueuedPredecessors() &&
??????????????? compareAndSetState(0,acquires)) {
??????????????? setExclusiveOwnerThread(current);
??????????????? return true;
???????????}
??????? }
???????else if(current == getExclusiveOwnerThread()) {
???????????intnextc = c + acquires;
??????????? if(nextc <0)
???????????????throw newError("Maximum lock count exceeded");
???????????setState(nextc);
??????????? return true;
???????}
???????return false;
???}
}
就是當鎖狀態(tài)為0時,公平鎖會調用hasQueuedPredecessors判斷是否有其他線程比它等待更長時間,方法如下:? ?
public final booleanhasQueuedPredecessors() {
???Node t =tail;?
???Node h =head;
???Node s;
??? returnh != t &&
??????? ((s = h.next) ==null|| s.thread!= Thread.currentThread());
}
來看下它的判斷條件,?滿足下面幾個條件返回true:
首先等待隊列的頭節(jié)點不等于尾節(jié)點,其次,頭節(jié)點的下一個節(jié)點不為空,或者當前線程不是頭節(jié)點的下一個節(jié)點。
如果返回true代表當前線程節(jié)點前面還有等待的其他線程,那么為了公平當前線程就不會去占有這個鎖(就是說即使這個鎖現(xiàn)在是空著的也不會去占),不像不公平鎖,它不會去判斷是否隊列還有更靠前的線程在等待
???????else if(current == getExclusiveOwnerThread()) {
???????????intnextc = c + acquires;
??????????? if(nextc <0)
???????????????throw newError("Maximum lock count exceeded");
???????????setState(nextc);
??????????? return true;
???????}
這部分跟不公平鎖是一樣的。
對,很簡單就講完了,自然它里面還提供了一些其他方法,不過都很簡單啦,大家可以自己去看,至于unlock:
public voidunlock() {
???sync.release(1);
}
簡單哈,就是調用:
public final booleanrelease(intarg) {
???if(tryRelease(arg)) {
??????? Node h =head;
??????? if(h !=null&& h.waitStatus!=0)
??????????? unparkSuccessor(h);
??????? return true;
???}
???return false;
}
先調用tryRlease方法:
protected final booleantryRelease(intreleases) {
???intc = getState() - releases;
??? if(Thread.currentThread() != getExclusiveOwnerThread())
???????throw newIllegalMonitorStateException();
??? booleanfree =false;
??? if(c ==0) {
??????? free =true;
???????setExclusiveOwnerThread(null);
???}
??? setState(c);
??? returnfree;
}
將狀態(tài)設為當前狀態(tài)減去releases的值,如果當前獨占鎖的線程不是當前線程,那么一定有毛病,拋異常!否則,如果減去releases之后的狀態(tài)值為0了的話,代表該鎖已經(jīng)被完全釋放了,那么就把這個鎖的所有者設置為null(不再是當前線程)。
在tryRlease成功的情況下,如果等待隊列的頭既不為null并且它的等待狀態(tài)不是0,那么就要調用AQS中的unparkSuccessor方法(可能會喚醒它的下一個節(jié)點),這個等待狀態(tài)的說明在AQS那篇文章里也有講哈,這里不重復了。
整個獲取鎖和釋放鎖的過程就是這樣了。
第三部分 可限時
可限時和可中斷我們還沒講,我相信可限時很好理解,就是獲取鎖的時候等待特定的時間后就不再等待直接返回,相應的方法:
public booleantryLock(longtimeout,TimeUnit unit)
???????throwsInterruptedException {
???returnsync.tryAcquireNanos(1,unit.toNanos(timeout));
}
第四部分 可中斷
可中斷有什么用呢?就是它能夠及時地響應中斷信號(可能從別的線程發(fā)送過來的中斷信號),ReentrantLock里面的可中斷方法是:
public voidlockInterruptibly()throwsInterruptedException {
???sync.acquireInterruptibly(1);
}
走進acquireInterruptibly方法:
public final voidacquireInterruptibly(intarg)
???????throwsInterruptedException {
???if(Thread.interrupted())
???????throw newInterruptedException();
??? if(!tryAcquire(arg))
??????? doAcquireInterruptibly(arg);
}
其實就是判斷當前的Thread的中斷信號,如果是被中斷的狀態(tài)就直接拋異常,就不會繼續(xù)去tryAcquire了,否則還是會去等待獲取鎖,跟lock()一樣。
看看doAcquireInterruptibly方法:
private voiddoAcquireInterruptibly(intarg)
throwsInterruptedException {
finalNode node = addWaiter(Node.EXCLUSIVE);
??? booleanfailed =true;
??? try{
for(;;) {
finalNode p = node.predecessor();
??????????? if(p ==head&& tryAcquire(arg)) {
setHead(node);
???????????????p.next=null;// help GC
???????????????failed =false;
??????????????? return;
???????????}
if(shouldParkAfterFailedAcquire(p,node) &&
??????????????? parkAndCheckInterrupt())
throw newInterruptedException();
???????}
}finally{
if(failed)
cancelAcquire(node);
???}
}
在我那篇AbstractQueuedSynchronizer源碼解析講過acquireQueued方法:
final booleanacquireQueued(finalNode node, intarg) {
booleanfailed =true;
??? try{
booleaninterrupted =false;
??????? for(;;) {
finalNode p = node.predecessor();
??????????? if(p ==head&& tryAcquire(arg)) {
setHead(node);
???????????????p.next=null;// help GC
???????????????failed =false;
??????????????? returninterrupted;
???????????}
if(shouldParkAfterFailedAcquire(p,node) &&
??????????????? parkAndCheckInterrupt())
interrupted =true;
???????}
}finally{
if(failed)
cancelAcquire(node);
???}
}
仔細對比一下它們的區(qū)別,就只是當線程中斷標志為true的時候,acquireQueued只是保存一下當前線程的狀態(tài)(傳遞出去以免被重置了),而doAcquireInterruptibly直接就拋異常了。
那么中斷的好處就是,當我們定義線程需要被中斷的時候就直接異常,返回,不再繼續(xù)執(zhí)行下面的操作,節(jié)約了時間,對中斷不了解的請看http://www.itdecent.cn/p/5708ac61226b
ReentrantLock也就差不多了~寫的有什么不對的地方歡迎指正。