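Why non-intrinsic locks exist
The synchronized keyword gives Java a complete built-in (intrinsic) lock implementation. It is simple to use, the lock scope is controlled by a block statement, performance is good, and it hides complex concepts such as biased, lightweight, and heavyweight locks, which makes it very programmer-friendly.
So why does a separate lock implementation exist, independent of the intrinsic lock? Here we mean the locks under java.util.concurrent.locks.
- First, the scope of an intrinsic lock is confined to a block statement; scopes cannot partially overlap or span different blocks (methods).
What does overlapping mean? Imagine a device that connects to a series of base stations as it moves along a route. To connect, it must hold the lock of one of the station's channels, and the connection must never drop: it first acquires the lock of the next station's channel and, only after connecting successfully, releases the lock of the previous channel. The two lock scopes must intersect without coinciding.
- Second, acquiring an intrinsic lock cannot be interrupted: a thread waiting for the lock regains control of its code only after it has actually acquired the lock.
- Third, an intrinsic lock cannot be tried: once execution reaches a synchronized block there are only two outcomes, either the lock is obtained or the thread waits.
Try-locking is useful because contention does not always have to be won. In parallel computation, for example, if there are more threads than pending work items, it is more efficient for a thread to sleep or do something else than to block and contend repeatedly.
- Finally, each intrinsic lock has exactly one condition queue (the counterpart of ConditionObject), namely the object itself, and wait and notify can only be called inside that object's own synchronized block.
Doug Lea's lock implementation provides a solution for each of these scenarios while remaining highly extensible.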
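The base-station scenario above can be sketched with explicit locks. This is only an illustration under assumed names (HandoverDemo, roam, and the log strings are hypothetical): each channel is a ReentrantLock, and the next lock is always acquired before the previous one is released, something a nested synchronized block cannot express.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of the roaming device: one lock per channel,
// with overlapping (not nested) lock scopes during each handover.
final class HandoverDemo {
    // Walks the route, holding the next channel before letting go of the previous one.
    static List<String> roam(List<ReentrantLock> channels) {
        List<String> log = new ArrayList<>();
        ReentrantLock current = null;
        try {
            for (int i = 0; i < channels.size(); i++) {
                ReentrantLock next = channels.get(i);
                next.lock();                      // acquire the next channel first
                ReentrantLock previous = current;
                current = next;
                log.add("acquired " + i);
                if (previous != null) {
                    previous.unlock();            // only now release the previous channel
                    log.add("released " + (i - 1));
                }
            }
        } finally {
            if (current != null) {
                current.unlock();                 // release the last channel still held
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<ReentrantLock> channels = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            channels.add(new ReentrantLock());
        }
        System.out.println(roam(channels));
    }
}
```

Between "acquired 1" and "released 0" the thread holds two locks at once, which is exactly the intersecting scope described above.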
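Building a framework for reentrant locks
To extend the intrinsic lock, the new locks must first support reentrancy. The concept is:
The same thread is allowed to acquire the same lock several times in a row.
For example, the spin-lock implementations in the previous article, 《形形色色的锁》 (All Kinds of Locks), are all non-reentrant, whereas the intrinsic lock and the locks from the locks package discussed here are reentrant.
To implement a lock, we first need to define its basic primitives clearly.
Acquire and release
Making acquire and release two separate operations allows them to occur in different code blocks or even different functions (although this is not recommended).
Acquire (interruptible)
Interruptible acquisition uses the thread's interrupt flag directly to make the lock interruptible: if the thread is already interrupted before acquiring, the acquisition is abandoned; if an interrupt is detected while acquiring, it likewise aborts and exits.
Try acquire
Try-acquire provides non-blocking acquisition: it either gets the lock immediately or fails and returns immediately, leaving the caller free to poll or do something else.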
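A small sketch of try-acquire, using ReentrantLock.tryLock(). The class and method names (TryLockDemo, processIfFree, demo) are made up for illustration. The helper thread finds the lock busy and returns at once instead of blocking.

```java
import java.util.concurrent.locks.ReentrantLock;

final class TryLockDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private int processed;

    // Returns true if the work ran, false if the lock was busy and we backed off.
    boolean processIfFree() {
        if (!lock.tryLock()) {
            return false;              // busy: the caller decides whether to retry or move on
        }
        try {
            processed++;
            return true;
        } finally {
            lock.unlock();
        }
    }

    // result[0]: attempt made while another thread holds the lock
    // result[1]: attempt made after the lock has been released
    static boolean[] demo() throws InterruptedException {
        TryLockDemo d = new TryLockDemo();
        boolean[] results = new boolean[2];
        d.lock.lock();                 // the calling thread holds the lock
        try {
            Thread helper = new Thread(() -> results[0] = d.processIfFree());
            helper.start();
            helper.join();             // the helper returned without blocking
        } finally {
            d.lock.unlock();
        }
        results[1] = d.processIfFree();
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = demo();
        System.out.println("while held: " + r[0] + ", after release: " + r[1]);
    }
}
```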
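Create condition
Once creating a condition is an operation of its own, we are no longer tied to the wait and notify mechanism of Object. Any number of condition objects can be created and all controlled by one lock, which makes one-to-many event dispatch efficient.
The Lock interface in the source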
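As an illustration of several conditions sharing one lock, here is a minimal bounded buffer in the style of the classic Condition example. BoundedBuffer and its field names are chosen for this sketch; the point is that producers and consumers wait on different condition queues of the same lock.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class BoundedBuffer<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();   // producers wait here
    private final Condition notEmpty = lock.newCondition();  // consumers wait here
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await();       // releases the lock while waiting
            }
            items.addLast(item);
            notEmpty.signal();         // wake one consumer only
        } finally {
            lock.unlock();
        }
    }

    T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            T item = items.removeFirst();
            notFull.signal();          // wake one producer only
            return item;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(1);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 3; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        for (int i = 0; i < 3; i++) {
            System.out.println(buffer.take());
        }
        producer.join();
    }
}
```

With a single intrinsic monitor, both kinds of waiters would share one wait set and notifyAll would be needed to avoid waking the wrong side.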
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
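The Condition interface (apart from additional await variants: timed, deadline-based, and uninterruptible, it is functionally no different from Object's wait and notify)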
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
AbstractQueuedSynchronizer
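The concurrent library contains many synchronizers. Some implement the Lock interface, such as ReentrantLock and the read and write locks of ReentrantReadWriteLock; others, such as Semaphore and CountDownLatch, form a family of their own. Reading the source shows that they do not each reinvent the wheel, because their basic behaviors have a great deal in common: wait queues, exclusive acquisition, shared acquisition, and so on.
That abstraction is AbstractQueuedSynchronizer, abbreviated AQS below.
AQS has the following characteristics:
- Blocking wait queue
- Shared / exclusive
- Fair / non-fair
- Reentrant
- Interruptible
To understand AQS fully, we first need to introduce a supporting utility class, LockSupport. I will spend a fair amount of space looking, in depth and by comparison, at how the Java virtual machine implements it.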
public class LockSupport {
...
public static void unpark(Thread thread) {
if (thread != null)
unsafe.unpark(thread);
}
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
unsafe.park(false, 0L);
setBlocker(t, null);
}
...
}
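LockSupport exists to give lock operations a mechanism for blocking and waiting. It contains little code, but it is very important. Blocking and resuming seem like things Java has always had: Object.wait() and, in earlier versions, Thread.suspend() can both do this. So why go to the trouble of implementing LockSupport?
Thread's suspend and resume were deprecated long ago because they can cause deadlock. Could the mechanism behind the old suspend() be reused in place of park? No, because park behaves like a semaphore: if unpark is called first, the next park returns immediately. LockSupport can be thought of as a semaphore with an initial value of 0 and a maximum of 1. But changing the semaphore and blocking the thread are two operations, and combining them into one requires a lock. The whole point here is to provide, at the Java level, a lock that is different from synchronized, so it can hardly depend on synchronized itself. Merging the two steps into one atomic operation in the native layer is therefore the better fit.
For the same reason, Object.wait() must be called inside the object's own synchronized block, which again depends on the intrinsic lock, so park/unpark has to implement a complete blocking mechanism of its own. Let us explore how these three blocking mechanisms differ:
- Thread.suspend() and Thread.resume()
- Object.wait() and Object.notify()
- LockSupport.park() and LockSupport.unpark()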
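Before diving into the native code, the semaphore-like permit of park/unpark can be observed from plain Java. The sketch below (PermitDemo and run are names invented here) calls unpark before park on the current thread. The timings are only printed, not relied upon, because the LockSupport documentation allows park to return spuriously.

```java
import java.util.concurrent.locks.LockSupport;

final class PermitDemo {
    // Returns { nanos spent in the first park, nanos spent in the second (timed) park }.
    static long[] run() {
        Thread self = Thread.currentThread();
        LockSupport.unpark(self);              // permit: 0 -> 1
        LockSupport.unpark(self);              // still 1, permits do not accumulate

        long t0 = System.nanoTime();
        LockSupport.park();                    // consumes the permit and returns without blocking
        long first = System.nanoTime() - t0;

        long t1 = System.nanoTime();
        LockSupport.parkNanos(50_000_000L);    // no permit left: normally waits about 50 ms
        long second = System.nanoTime() - t1;

        return new long[] { first, second };
    }

    public static void main(String[] args) {
        long[] t = run();
        System.out.println("first park:  " + t[0] / 1_000 + " us");
        System.out.println("second park: " + t[1] / 1_000_000 + " ms");
    }
}
```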
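Before reading the source, let us look at how threads are implemented in the Java virtual machine.
The logic behind each Java thread API is dispatched downward layer by layer. The call chain looks long, but a little study shows that most JVM features follow this pattern: the logic is decomposed and dispatched downward until the platform-specific operations are handed to the platform implementation layer. For example, a thread is created with the Win32 API CreateThread() on Windows and with pthread_create() on Linux.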

Class hierarchy of threads in the native layer:

Thread.suspend()
openjdk/jdk/src/share/native/java/lang/Thread.c
...
static JNINativeMethod methods[] = {
...
{"suspend0", "()V", (void *)&JVM_SuspendThread},
{"resume0", "()V", (void *)&JVM_ResumeThread},
...
};
JNIEXPORT void JNICALL
Java_java_lang_Thread_registerNatives(JNIEnv *env, jclass cls)
{
(*env)->RegisterNatives(env, cls, methods, ARRAY_LENGTH(methods));
}
...
openjdk/hotspot/src/share/vm/prims/jvm.cpp
...
JVM_ENTRY(void, JVM_SuspendThread(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_SuspendThread");
oop java_thread = JNIHandles::resolve_non_null(jthread);
JavaThread* receiver = java_lang_Thread::thread(java_thread);
if (receiver != NULL) {
{
MutexLockerEx ml(receiver->SR_lock(), Mutex::_no_safepoint_check_flag);
if (receiver->is_external_suspend()) {
return;
}
if (receiver->is_exiting()) { // thread is in the process of exiting
return;
}
receiver->set_external_suspend();
}
receiver->java_suspend();
}
JVM_END
...
openjdk/hotspot/src/share/vm/runtime/thread.cpp
void JavaThread::java_suspend() {
{ MutexLocker mu(Threads_lock);
if (!Threads::includes(this) || is_exiting() || this->threadObj() == NULL) {
return;
}
}
{ MutexLockerEx ml(SR_lock(), Mutex::_no_safepoint_check_flag);
if (!is_external_suspend()) {
// a racing resume has cancelled us; bail out now
return;
}
// suspend is done
uint32_t debug_bits = 0;
if (is_ext_suspend_completed(false /* !called_by_wait */,
SuspendRetryDelay, &debug_bits) ) {
return;
}
}
VM_ForceSafepoint vm_suspend;
VMThread::execute(&vm_suspend);
}
// Part II of external suspension.
// A JavaThread self suspends when it detects a pending external suspend
// request. This is usually on transitions. It is also done in places
// where continuing to the next transition would surprise the caller,
// e.g., monitor entry.
//
// Returns the number of times that the thread self-suspended.
//
// Note: DO NOT call java_suspend_self() when you just want to block current
// thread. java_suspend_self() is the second stage of cooperative
// suspension for external suspend requests and should only be used
// to complete an external suspend request.
//
int JavaThread::java_suspend_self() {
...
while (is_external_suspend()) {
ret++;
this->set_ext_suspended();
// _ext_suspended flag is cleared by java_resume()
while (is_ext_suspended()) {
this->SR_lock()->wait(Mutex::_no_safepoint_check_flag);
}
}
return ret;
}
openjdk/hotspot/src/share/vm/runtime/mutex.cpp
bool Monitor::wait(bool no_safepoint_check, long timeout, bool as_suspend_equivalent) {
Thread * const Self = Thread::current() ;
...
int Monitor::IWait (Thread * Self, jlong timo) {
...
for (;;) {
if (ESelf->Notified) break ;
int err = ParkCommon (ESelf, timo) ;
if (err == OS_TIMEOUT || (NativeMonitorFlags & 1)) break ;
}
...
return WasOnWaitSet != 0 ; // return true IFF timeout
}
static int ParkCommon (ParkEvent * ev, jlong timo) {
// Diagnostic support - periodically unwedge blocked threads
intx nmt = NativeMonitorTimeout ;
if (nmt > 0 && (nmt < timo || timo <= 0)) {
timo = nmt ;
}
int err = OS_OK ;
if (0 == timo) {
ev->park() ;
} else {
err = ev->park(timo) ;
}
return err ;
}
openjdk/hotspot/src/os/linux/vm/os_linux.cpp
void os::PlatformEvent::park() { // AKA "down()"
...
if (v == 0) {
...
while (_Event < 0) {
status = pthread_cond_wait(_cond, _mutex);
// for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
// Treat this the same as if the wait was interrupted
if (status == ETIME) { status = EINTR; }
assert_status(status == 0 || status == EINTR, status, "cond_wait");
}
...
}
}
The conclusion is clear: the call chain finally reaches pthread_cond_wait, that is, it blocks on a pthread condition variable on Linux (other platforms have their own implementations). A condition variable ultimately blocks through a system call, after which the thread is taken off the scheduler. But how does the flow get from java_suspend() in the first step into java_suspend_self()?
Notice that set_external_suspend() is already called in JVM_SuspendThread, so the flag is set before java_suspend() is called; after that it only has to be checked. Here is how the source comments explain it:
openjdk/hotspot/src/share/vm/runtime/thread.hpp
// The external_suspend
// flag is checked by has_special_runtime_exit_condition() and java thread
// will self-suspend when handle_special_runtime_exit_condition() is
// called. Most uses of the _thread_blocked state in JavaThreads are
// considered the same as being externally suspended; if the blocking
// condition lifts, the JavaThread will self-suspend. Other places
// where VM checks for external_suspend include:
// + mutex granting (do not enter monitors when thread is suspended)
// + state transitions from _thread_in_native
//
// In general, java_suspend() does not wait for an external suspend
// request to complete. When it returns, the only guarantee is that
// the _external_suspend field is true.
To summarize, the flag is checked on three occasions:
- When has_special_runtime_exit_condition() is called
- On monitor entry: if the thread is already suspended, it does not enter the monitor
- When a JavaThread transitions from the native state back to the Java state
Of these, the first and the third call java_suspend_self().
openjdk/hotspot/src/share/vm/runtime/thread.hpp
bool has_special_runtime_exit_condition() {
return (_special_runtime_exit_condition != _no_async_condition) ||
is_external_suspend() || is_deopt_suspend();
}
openjdk/hotspot/src/share/vm/runtime/thread.cpp
void JavaThread::handle_special_runtime_exit_condition(bool check_asyncs) {
bool do_self_suspend = is_external_suspend_with_lock();
if (do_self_suspend && (!AllowJNIEnvProxy || this == JavaThread::current())) {
frame_anchor()->make_walkable(this);
java_suspend_self();
}
if (check_asyncs) {
check_and_handle_async_exceptions();
}
}
void JavaThread::check_safepoint_and_suspend_for_native_trans(JavaThread *thread) {
JavaThread *curJT = JavaThread::current();
bool do_self_suspend = thread->is_external_suspend();
...
if (do_self_suspend && (!AllowJNIEnvProxy || curJT == thread)) {
JavaThreadState state = thread->thread_state();
thread->set_thread_state(_thread_blocked);
thread->java_suspend_self();
thread->set_thread_state(state);
...
}
}
if (SafepointSynchronize::do_call_back()) {
// If we are safepointing, then block the caller which may not be
// the same as the target thread (see above).
SafepointSynchronize::block(curJT);
}
...
}
So where is handle_special_runtime_exit_condition() called? For example, in SafepointSynchronize::block, in JavaCallWrapper::JavaCallWrapper before the wrapped Java method is invoked, or in the destructors of the classes that manage JavaThread state transitions. check_safepoint_and_suspend_for_native_trans(), in turn, is called from SharedRuntime::generate_native_wrapper and InterpreterGenerator::generate_native_entry. In short, whether a Java method or a native method is being called, there is always an opportunity to check the _external_suspend flag and run the second step, java_suspend_self(). Since this article focuses on locks and threads, the details of Java/native method invocation are left for a separate article.
SafePoint
Notice this fragment in java_suspend():
openjdk/hotspot/src/share/vm/runtime/thread.cpp
void JavaThread::java_suspend() {
...
VM_ForceSafepoint vm_suspend;
VMThread::execute(&vm_suspend);
...
}
This introduces the concept of a SafePoint. Quoting the official OpenJDK explanation:
A point during program execution at which all GC roots are known and all heap object contents are consistent. From a global point of view, all threads must block at a safepoint before the GC can run.
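Put simply, safepoints were originally special operations inserted into bytecode, or into compiled machine code, to make GC easier:
- For interpreted code, the inserted operation checks whether a safepoint synchronization is in progress and, if so, blocks the current thread.
- For JIT-compiled code, such as the parts optimized by HotSpot, the operation accesses a special memory location, which raises SIGSEGV when a safepoint is requested. Because the JVM installs its own handler for this signal, it gets the chance to check whether the fault was caused by a safepoint and then block the thread.
Safepoints mainly appear at:
- The end of a loop (so that a long-running loop does not keep other threads waiting for it to reach a safepoint)
- Before a method returns
- After the call instruction of a method invocation
- Places where exceptions are thrown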
openjdk/hotspot/src/share/vm/runtime/vm_operations.hpp
// dummy vm op, evaluated just to force a safepoint
class VM_ForceSafepoint: public VM_Operation {
public:
VM_ForceSafepoint() {}
void doit() {}
VMOp_Type type() const { return VMOp_ForceSafepoint; }
};
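I could not find any code in vmThread.cpp, or anywhere else in the OpenJDK source, that handles this ForceSafepoint operation specifically, and its doit() function has no other implementation. Taking the comment literally, the operation exists only to force a safepoint: the empty doit() suggests that the side effect of reaching a safepoint is the whole purpose, so that the target thread runs into one of the checks described above. In other words, suspension cannot happen at an arbitrary moment; only at a safepoint is it guaranteed not to disturb other activities such as memory allocation or GC.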
Object.wait()
openjdk/jdk/src/share/native/java/lang/Object.c
static JNINativeMethod methods[] = {
{"hashCode", "()I", (void *)&JVM_IHashCode},
{"wait", "(J)V", (void *)&JVM_MonitorWait},
{"notify", "()V", (void *)&JVM_MonitorNotify},
{"notifyAll", "()V", (void *)&JVM_MonitorNotifyAll},
{"clone", "()Ljava/lang/Object;", (void *)&JVM_Clone},
};
JNIEXPORT void JNICALL
Java_java_lang_Object_registerNatives(JNIEnv *env, jclass cls)
{
(*env)->RegisterNatives(env, cls,
methods, sizeof(methods)/sizeof(methods[0]));
}
openjdk/hotspot/src/share/vm/prims/jvm.cpp
JVM_ENTRY(void, JVM_MonitorWait(JNIEnv* env, jobject handle, jlong ms))
JVMWrapper("JVM_MonitorWait");
Handle obj(THREAD, JNIHandles::resolve_non_null(handle));
JavaThreadInObjectWaitState jtiows(thread, ms != 0);
if (JvmtiExport::should_post_monitor_wait()) {
JvmtiExport::post_monitor_wait((JavaThread *)THREAD, (oop)obj(), ms);
}
ObjectSynchronizer::wait(obj, ms, CHECK);
JVM_END
openjdk/hotspot/src/share/vm/runtime/synchronizer.cpp
void ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) {
...
ObjectMonitor* monitor = ObjectSynchronizer::inflate(THREAD, obj());
DTRACE_MONITOR_WAIT_PROBE(monitor, obj(), THREAD, millis);
monitor->wait(millis, true, THREAD);
...
}
openjdk/hotspot/src/share/vm/runtime/objectMonitor.cpp
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
...
int ret = OS_OK ;
int WasNotified = 0 ;
{ // State transition wrappers
OSThread* osthread = Self->osthread();
OSThreadWaitState osts(osthread, true);
{
ThreadBlockInVM tbivm(jt);
// Thread is in thread_blocked state and oop access is unsafe.
jt->set_suspend_equivalent();
if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
// Intentionally empty
} else
if (node._notified == 0) {
if (millis <= 0) {
Self->_ParkEvent->park () ;
} else {
ret = Self->_ParkEvent->park (millis) ;
}
}
// were we externally suspended while we were waiting?
if (ExitSuspendEquivalent (jt)) {
// TODO-FIXME: add -- if succ == Self then succ = null.
jt->java_suspend_self();
}
}
...
}
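At this point the picture is clear: the call again ends up in the park() method in os_linux.cpp. Why is Object.wait() so much simpler than Thread.suspend()? Because a thread that is able to call wait has already entered the synchronized critical section, so much of the work can be skipped.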
LockSupport.park()
Finally, the implementation of LockSupport.park():
openjdk/jdk/src/share/classes/sun/misc/Unsafe.java
public final class Unsafe {
...
public native void unpark(Object thread);
public native void park(boolean isAbsolute, long time);
...
}
openjdk/hotspot/src/share/vm/prims/unsafe.cpp
static JNINativeMethod methods_18[] = {
...
{CC"park", CC"(ZJ)V", FN_PTR(Unsafe_Park)},
{CC"unpark", CC"("OBJ")V", FN_PTR(Unsafe_Unpark)}
...
}
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
UnsafeWrapper("Unsafe_Park");
EventThreadPark event;
#ifndef USDT2
HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time);
#else /* USDT2 */
HOTSPOT_THREAD_PARK_BEGIN(
(uintptr_t) thread->parker(), (int) isAbsolute, time);
#endif /* USDT2 */
JavaThreadParkedState jtps(thread, time != 0);
thread->parker()->park(isAbsolute != 0, time);
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker());
#else /* USDT2 */
HOTSPOT_THREAD_PARK_END(
(uintptr_t) thread->parker());
#endif /* USDT2 */
if (event.should_commit()) {
oop obj = thread->current_park_blocker();
event.set_klass((obj != NULL) ? obj->klass() : NULL);
event.set_timeout(time);
event.set_address((obj != NULL) ? (TYPE_ADDRESS) cast_from_oop<uintptr_t>(obj) : 0);
event.commit();
}
UNSAFE_END
openjdk/hotspot/src/share/vm/runtime/park.hpp
class Parker : public os::PlatformParker {
...
public:
void park(bool isAbsolute, jlong time);
void unpark();
...
}
openjdk/hotspot/src/os/linux/vm/os_linux.cpp
void Parker::park(bool isAbsolute, jlong time) {
if (Atomic::xchg(0, &_counter) > 0) return;
Thread* thread = Thread::current();
JavaThread *jt = (JavaThread *)thread;
if (Thread::is_interrupted(thread, false)) {
return;
}
// Next, demultiplex/decode time arguments
timespec absTime;
if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
return;
}
if (time > 0) {
unpackTime(&absTime, isAbsolute, time);
}
ThreadBlockInVM tbivm(jt);
if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
return;
}
int status ;
if (_counter > 0) { // no wait needed
_counter = 0;
status = pthread_mutex_unlock(_mutex);
OrderAccess::fence();
return;
}
...
if (time == 0) {
_cur_index = REL_INDEX; // arbitrary choice when not timed
status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
} else {
_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
if (status != 0 && WorkAroundNPTLTimedWaitHang) {
pthread_cond_destroy (&_cond[_cur_index]) ;
pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
}
}
...
// If externally suspended while waiting, re-suspend
if (jt->handle_special_suspend_equivalent_condition()) {
jt->java_suspend_self();
}
}
void Parker::unpark() {
int s, status ;
status = pthread_mutex_lock(_mutex);
assert (status == 0, "invariant") ;
s = _counter;
_counter = 1;
if (s < 1) {
// thread might be parked
if (_cur_index != -1) {
// thread is definitely parked
if (WorkAroundNPTLTimedWaitHang) {
status = pthread_cond_signal (&_cond[_cur_index]);
status = pthread_mutex_unlock(_mutex);
} else {
status = pthread_mutex_unlock(_mutex);
status = pthread_cond_signal (&_cond[_cur_index]);
}
} else {
pthread_mutex_unlock(_mutex);
}
} else {
pthread_mutex_unlock(_mutex);
}
}
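Unsurprisingly, this also ends in pthread_cond_wait(). On closer inspection, however, the pthread primitives are used here to wrap a resource capped at 1, namely _counter. Together they form a condition lock, and this is the cornerstone on which a lock implementation distinct from the intrinsic lock is built in the Java world.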
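The shape of Parker (a mutex, a condition, and a counter capped at 1) can be modelled in a few lines of Java. This is a teaching model only: BinaryPermit is a hypothetical class, and it deliberately uses the intrinsic monitor in place of the pthread mutex and condition variable, which the real LockSupport must avoid for the reasons given earlier.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical Java model of Parker: the monitor stands in for _mutex/_cond,
// and the boolean stands in for _counter, which never exceeds 1.
final class BinaryPermit {
    private boolean permit;

    // Analogous to unpark(): make the single permit available and wake a waiter.
    synchronized void release() {
        permit = true;
        notify();
    }

    // Analogous to a timed park(): returns true if a permit was consumed,
    // false if the timeout elapsed first.
    synchronized boolean acquire(long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!permit) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false;
            }
            TimeUnit.NANOSECONDS.timedWait(this, remaining);
        }
        permit = false;                // consume the permit, like _counter = 0
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        BinaryPermit p = new BinaryPermit();
        p.release();
        p.release();                          // second release is absorbed: the cap is 1
        System.out.println(p.acquire(100));   // a permit is available
        System.out.println(p.acquire(50));    // nothing left, so this times out
    }
}
```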
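Having compared the three, we return to the main topic and continue with AQS.
Recall the elements of AQS:
- Blocking wait queue
- Shared / exclusive
- Fair / non-fair
- Reentrant
- Interruptible
We now have a condition lock implemented independently from the ground up. It supports blocking waits and can be interrupted, so the main remaining work appears to be building the wait queue.
Blocking wait queue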

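Let us be clear about the role AQS plays at the Java level. It is essentially a Java-level condition lock, but unlike the basic semantics provided by LockSupport it can derive any number of conditions, each with its own wait queue, and the lock itself has a wait queue as well. It can therefore be used to build an ordinary queued lock or a queued lock with multiple conditions. Fairness shows up in the queue: if barging is allowed the lock is unfair; if not, it is fair.
One caveat: even a fair queue only guarantees that threads already in the queue acquire the lock in order. It does not guarantee that, of two threads entering the queue at the same time, the one that requested first is placed first. Likewise, when the queue is empty and two threads try to acquire the lock simultaneously, which one wins is not guaranteed and depends only on CPU scheduling at that moment.
Presumably for performance reasons, the queue operations are lock-free: enqueueing and dequeueing are based on CAS and spin-retry. Here is how AQS defines lock acquisition.
Lock acquisition flow:

Flow for entering the wait queue: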

java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
Dequeueing is done by the release operation:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
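Release is comparatively simple, but note that s.waitStatus may be CANCELLED = 1, in which case the next node in the queue that has not been cancelled must be found and woken. The position of the unpark call also shows that it may well run concurrently with the acquirer's park, so the order is not determined: the current lock holder may call unpark after a contender has been enqueued but before it has parked. As analyzed earlier, this is harmless because park carries a semaphore-like permit: even if unpark comes first, park returns immediately and the thread rejoins the contention right away.
But something seems to be missing. Setting the queue head appears to have been left out: shouldn't the awakened thread be set as head at this point?
Actually, no. Look at the model again: the linked list clearly has a head node, which is just an empty placeholder indicating where the first waiting node is, and dequeueing is accomplished by passing the head role along. The thread that currently holds the lock (shown in purple in the queue diagram at the top) is not itself in the wait queue, but it can be the head node. Confusing? Consider what happens once release() returns: has the first awakened thread acquired the lock yet? No, because it has not yet had the chance to call tryAcquire(). In concrete implementations, a thread normally only truly holds the lock once it has recorded itself with setExclusiveOwnerThread(Thread), a method inherited from the superclass of AQS. So picking up from where this thread was parked makes things clear: in the next loop iteration after waking, it sets itself as the head node: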
final boolean acquireQueued(final Node node, int arg) {
...
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
...
}
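To see acquire, the wait queue, and release working together, here is a minimal non-reentrant mutex built on AQS, loosely following the pattern shown in the AQS class documentation. SimpleMutex and countWithThreads are names made up for this sketch; only tryAcquire and tryRelease are supplied, and AQS provides the queueing and parking.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

final class SimpleMutex {
    // state == 0: unlocked, state == 1: locked
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;              // AQS will enqueue and park the caller
        }

        @Override
        protected boolean tryRelease(int unused) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);               // volatile write publishes the critical section
            return true;               // tells AQS to unpark the successor
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }
    boolean tryLock() { return sync.tryAcquire(1); }
    void unlock() { sync.release(1); }

    // Increments a shared counter from several threads under the mutex.
    static int countWithThreads(int threads, int perThread) throws InterruptedException {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = { 0 };
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return counter[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countWithThreads(4, 10_000));
    }
}
```

Because tryAcquire does not check the owner thread, a second tryLock() from the holder fails; this is the difference from the reentrant implementations discussed later.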
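Shared / exclusive
The acquire and release operations discussed so far apply to mutual-exclusion locks. What is a shared lock? Take ReentrantReadWriteLock: it lets multiple reader threads share a lock while excluding writer threads. It is like a book: once written, any number of people can gather round and read it, but when the author needs to revise or rewrite it, the book is put away after everyone has finished reading and only the author may access it. (If there is more than one author, they too must take turns writing and cannot hold access at the same time.)
How is a shared lock implemented? Again, look at the AQS source: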
java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
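- First, a separate primitive for shared acquisition, tryAcquireShared, is defined to distinguish it from exclusive locks
- addWaiter is passed Node.SHARED instead of Node.EXCLUSIVE when enqueueing
- When the wait ends, the thread not only sets itself as the head node but also checks whether the next node is a shared acquirer and, if so, wakes it
The third point serves the subsequent threads requesting the shared lock, and the second point paves the way for the third, so only the first point is really decisive for the difference between shared and exclusive acquisition. Let us see how ReentrantReadWriteLock implements tryAcquireShared():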
java.util.concurrent.locks.ReentrantReadWriteLock
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
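Two concepts are introduced here, exclusiveCount and sharedCount, the exclusive and shared hold counts, which together represent the current lock state. With these two values, a shared or exclusive acquirer can tell whether it should wait or can take the lock immediately.
Exclusive acquisition: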
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
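By splitting the state field bitwise, the read-write lock can record whether it is currently in the shared-read or the exclusive-write state. Why must the state field be reused rather than adding a new field? For the same reason as in the LockSupport analysis: if the resource were represented by several fields, a single CAS could not update them all. At least two would be needed, and a lock would then be required to protect the pair of operations, yet this code is itself part of building a lock. A chicken-and-egg problem.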
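The bit-splitting idea can be tried in isolation. The sketch below reuses the constants from the ReentrantReadWriteLock excerpt but is otherwise a simplified assumption: PackedCounts, tryAddReader, and tryAddWriter are hypothetical, there is no owner tracking or reentrancy, and an AtomicInteger stands in for the AQS state field.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Readers live in the high 16 bits, writers in the low 16 bits of one int,
// so a single CAS can check one count and update the other atomically.
final class PackedCounts {
    static final int SHARED_SHIFT = 16;
    static final int SHARED_UNIT = 1 << SHARED_SHIFT;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    private final AtomicInteger state = new AtomicInteger();

    // Succeeds only if no writer is present at the moment of the CAS.
    boolean tryAddReader() {
        int c = state.get();
        return exclusiveCount(c) == 0 && state.compareAndSet(c, c + SHARED_UNIT);
    }

    // Succeeds only if there are neither readers nor writers.
    boolean tryAddWriter() {
        return state.compareAndSet(0, 1);
    }

    int current() { return state.get(); }

    public static void main(String[] args) {
        PackedCounts rw = new PackedCounts();
        rw.tryAddReader();
        rw.tryAddReader();
        int c = rw.current();
        System.out.println("readers=" + sharedCount(c) + " writers=" + exclusiveCount(c));
        System.out.println("writer admitted: " + rw.tryAddWriter());
    }
}
```

If the two counts were separate fields, the check "no writer" and the increment of the reader count could be interleaved with a writer's update, which is the situation the single CAS rules out.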
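Fair / non-fair
In code, fairness depends mainly on the concrete implementation of the tryAcquire method that AQS leaves to subclasses. As analyzed above, fairness here only guarantees that, when threads are already waiting, those nearer the front of the queue acquire the lock first. It does not guarantee that, of two threads competing at the same time, the one that arrived first gets the lock first or is queued first (a consequence of Java code not being atomic).
Take ReentrantLock as an example: it derives from AQS to define the two behaviors, fair and non-fair.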

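The key difference is that a thread calling a fair lock does not simply grab the lock. It first politely checks whether anyone is queueing and, if so, joins the end of the queue. Much like people in Hong Kong, right? :)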
java.util.concurrent.locks.ReentrantLock
abstract static class Sync extends AbstractQueuedSynchronizer {
...
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
...
}
static final class NonfairSync extends Sync {
...
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
static final class FairSync extends Sync {
...
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
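From the caller's side, the choice between the two Sync subclasses is made by the ReentrantLock constructor argument. A tiny sketch (FairnessDemo and describe are illustrative names):

```java
import java.util.concurrent.locks.ReentrantLock;

final class FairnessDemo {
    static String describe(ReentrantLock lock) {
        return lock.isFair() ? "fair" : "nonfair";
    }

    public static void main(String[] args) {
        ReentrantLock byDefault = new ReentrantLock();     // non-fair unless asked otherwise
        ReentrantLock fair = new ReentrantLock(true);      // consults the queue before acquiring
        System.out.println(describe(byDefault) + " / " + describe(fair));
    }
}
```

One detail worth knowing from the ReentrantLock documentation: the untimed tryLock() barges even on a fair lock, while tryLock(0, TimeUnit.SECONDS) honors the fairness setting.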
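Reentrant & interruptible
These last two features are simpler. The key to reentrancy is that setExclusiveOwnerThread(Thread) is called when the lock is acquired, so the lock knows which thread currently holds it; a later acquire by the same thread only has to increase state. (Do not get confused: in AQS the resource is represented by the state field, not by this Thread; otherwise only 0 and 1 could be represented and the shared locks described above could not be supported.)
Supporting interruption is not complicated either. As we saw, park itself responds to interruption, so all that is needed is to react appropriately after park has been interrupted: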
java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// Note: instead of recording a flag, throw
// InterruptedException here to break out of the loop
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
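The effect of acquireInterruptibly can be observed through ReentrantLock.lockInterruptibly(). In this sketch (InterruptibleDemo and waiterWasInterrupted are hypothetical names) the calling thread keeps the lock, so the waiter can only leave the queue by being interrupted.

```java
import java.util.concurrent.locks.ReentrantLock;

final class InterruptibleDemo {
    // Returns true if the queued waiter left lockInterruptibly() via InterruptedException.
    static boolean waiterWasInterrupted() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        boolean[] interrupted = { false };
        lock.lock();                               // held throughout, so the waiter must queue
        try {
            Thread waiter = new Thread(() -> {
                try {
                    lock.lockInterruptibly();
                    lock.unlock();                 // not reached in this demo
                } catch (InterruptedException e) {
                    interrupted[0] = true;         // acquisition was abandoned
                }
            });
            waiter.start();
            while (!lock.hasQueuedThreads()) {
                Thread.yield();                    // wait until the waiter is in the AQS queue
            }
            waiter.interrupt();
            waiter.join();
        } finally {
            lock.unlock();
        }
        return interrupted[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("waiter interrupted: " + waiterWasInterrupted());
    }
}
```

With plain lock() the waiter would stay queued until the lock was released and would only see its interrupt flag afterwards.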
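Summary
1. The biggest differences between the reentrant lock framework and the intrinsic lock:
- Lock scopes can overlap
- Lock acquisition can be interrupted and can fail
- Multiple condition queues are supported
- It can be extended into flexible forms such as read-write locks, CountDownLatch, and Semaphore
2. LockSupport
From the native layer it provides a complete, semaphore-like blocking mechanism that is independent of the intrinsic lock. This is a prerequisite for implementing a set of locks distinct from the intrinsic lock.
3. AQS
AQS uses LockSupport's blocking mechanism and builds a series of operations on top of it: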
acquire()
acquireInterruptibly()
acquireShared()
acquireSharedInterruptibly()
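together with the corresponding release methods and other operations.
From an abstract point of view, these operations define the semantics for the five properties discussed earlier, and those semantics are the real foundation for implementing all kinds of locks. This also fits the idea of layered design: each concrete lock implementation does not have to work out how to perform these actions carefully and correctly.
Addendum
1. Understanding atomicity
We have repeatedly said that true first-come, first-served cannot be guaranteed, and the reason is atomicity. A single line of Java code compiles into several bytecode instructions, and one bytecode instruction may in turn be carried out by several machine instructions (some pseudo-instructions comprise more than one instruction). From this perspective, first-come, first-served would require that the thread which first executes the first machine instruction of the lock method gets the lock first, which is impossible at least at the Java level.
Since acquiring a lock involves so many preliminary steps, what if we use a CAS operation directly instead of a lock? That has pitfalls too and does not suit every scenario. The well-known ABA problem is one example. Consider this case:
Suppose a stack is implemented as a singly linked list with A on top. Thread T1 already knows that A.next is B and wants to use CAS to replace the top with B: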
head.compareAndSet(A,B);
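Before T1 executes the instruction above, thread T2 steps in, pops A and B, and then pushes D, C, and A. The stack is now A, C, D from top to bottom, and object B is detached.
Now T1 performs its CAS. It finds that the top is still A, so the CAS succeeds and the top becomes B. But B.next is actually null.
The stack therefore contains only B; the list formed by C and D is no longer part of it. C and D have been lost for no reason, which is a serious logic error.
How can this be solved? There are many ways: see how ObjectMonitor limits the number of dequeues, or the implementation of AtomicStampedReference. We will not go into them here.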
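As a small illustration of the AtomicStampedReference approach mentioned above, the sketch below replays the A to B to A sequence in a single thread (the interleaving of T1 and T2 is simulated sequentially, and AbaDemo is a made-up name). A plain reference CAS cannot tell that anything happened, while the stamped CAS fails because the stamp has moved on.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;

final class AbaDemo {
    // Returns { result of the plain CAS, result of the stamped CAS }.
    static boolean[] run() {
        String a = "A";
        String b = "B";
        AtomicReference<String> plain = new AtomicReference<>(a);
        AtomicStampedReference<String> stamped = new AtomicStampedReference<>(a, 0);

        // "T1" reads the current top, and for the stamped version also the stamp.
        String seenPlain = plain.get();
        int[] stampHolder = new int[1];
        String seenStamped = stamped.get(stampHolder);
        int seenStamp = stampHolder[0];

        // "T2" changes A -> B -> A in between, bumping the stamp on every change.
        plain.set(b);
        plain.set(a);
        stamped.set(b, 1);
        stamped.set(a, 2);

        // "T1" resumes with its stale view.
        boolean plainCas = plain.compareAndSet(seenPlain, b);
        boolean stampedCas = stamped.compareAndSet(seenStamped, b, seenStamp, seenStamp + 1);
        return new boolean[] { plainCas, stampedCas };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("plain CAS succeeded:   " + r[0]);
        System.out.println("stamped CAS succeeded: " + r[1]);
    }
}
```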
2. Performance of reentrant locks
Test scenario: macOS + Java 1.8 + 10 million assignments
Test results:
Single thread, no contention
| Lock type | Time (ms) | Relative increase (vs. fastest) |
|---|---|---|
| synchronized | 166 | +0% |
| writelock | 190 | +14% |
| readlock | 216 | +30% |
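Without contention the intrinsic lock is the fastest, because the biased-locking mechanism is more efficient. For how this is achieved, see the references listed at the end.
The write lock is slightly faster than the read lock, perhaps because of the amount of code executed in the uncontended case. This small difference certainly does not mean the write lock should be preferred; the choice should still depend on how often reads and writes occur.
Two threads, with contention
| Lock type | Time (ms) | Relative increase (vs. fastest) |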
|---|---|---|
| synchronized | 448 | +0% |
| 1 readlock + 1 writelock | 1038 | +132% |
| 2 writelock | 1137 | +154% |
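3. Memory barriers
If you followed the LockSupport analysis closely, you will have noticed the statement OrderAccess::fence();
Why is it called after certain important operations? It is a memory barrier, also called a memory fence. First, it guarantees that, as seen from another CPU, the instructions on either side of the barrier take effect in program order, preserving the externally visible program order. Second, it provides visibility of memory data, ensuring that writes are propagated through the CPU cache subsystem. Both compilers and CPUs (through out-of-order execution, OoO) may reorder instructions to make better use of caches, and in Java the same can happen to the code produced from bytecode. Before Java 1.5, volatile provided only visibility and no ordering guarantee, so a singleton implemented with double-checked locking was still unsafe (a thread could, for example, observe an instance that was not yet fully constructed). In later JDK versions volatile covers both visibility and ordering: when a volatile variable is assigned, a memory barrier is inserted automatically after the write to prevent reordering. So volatile is not only for solving visibility problems.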
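For reference, this is the usual shape of a double-checked singleton on Java 5 and later, where the volatile field supplies the ordering guarantee discussed above. LazySingleton and the construction counter are illustrative additions, not code from the article.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class LazySingleton {
    // volatile: the write that publishes the instance cannot be reordered
    // before the writes performed inside the constructor.
    private static volatile LazySingleton instance;
    private static final AtomicInteger constructed = new AtomicInteger();

    private LazySingleton() {
        constructed.incrementAndGet();
    }

    static LazySingleton get() {
        LazySingleton local = instance;        // first check, without locking
        if (local == null) {
            synchronized (LazySingleton.class) {
                local = instance;              // second check, under the lock
                if (local == null) {
                    local = new LazySingleton();
                    instance = local;
                }
            }
        }
        return local;
    }

    static int constructedCount() {
        return constructed.get();
    }

    public static void main(String[] args) {
        System.out.println(get() == get());
        System.out.println("constructions: " + constructedCount());
    }
}
```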
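Memory barrier vs. CAS
The two features appear to overlap to some extent, because a CAS itself guarantees that the assignment has completed once it returns. It is therefore worth comparing their cost. Roughly speaking, the former costs a handful of CPU clock cycles, whereas a hardware CAS can cost hundreds of cycles, and a lock costs more still; the exact figures depend on the architecture and on contention. What we need to do is choose the operation that meets our needs at the lowest cost.
More details on Java memory and cache coherence, and on volatile, will be discussed in the next article, 《形形色色的锁3》 (All Kinds of Locks 3).
References (not all of them were necessarily used in writing this article)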
openjdk 1.8 source
Biased locking
Synchronization Public2
CPU caches


