【細談Java并發(fā)】談?wù)凩ockSupport

1、簡介

LockSupport 和 CAS 是Java并發(fā)包中很多并發(fā)工具控制機制的基礎(chǔ),它們底層其實都是依賴Unsafe實現(xiàn)。

LockSupport是用來創(chuàng)建鎖和其他同步類的基本線程阻塞原語。LockSupport 提供park()和unpark()方法實現(xiàn)阻塞線程和解除線程阻塞,LockSupport和每個使用它的線程都與一個許可(permit)關(guān)聯(lián)。permit相當(dāng)于1,0的開關(guān),默認是0,調(diào)用一次unpark就加1變成1,調(diào)用一次park會消費permit, 也就是將1變成0,同時park立即返回。再次調(diào)用park會變成block(因為permit為0了,會阻塞在這里,直到permit變?yōu)?), 這時調(diào)用unpark會把permit置為1。每個線程都有一個相關(guān)的permit, permit最多只有一個,重復(fù)調(diào)用unpark也不會積累。

park()和unpark()不會有 Thread.suspendThread.resume 所可能引發(fā)的死鎖問題,由于許可的存在,調(diào)用 park 的線程和另一個試圖將其 unpark 的線程之間的競爭將保持活性。

如果調(diào)用線程被中斷,則park方法會返回。同時park也擁有可以設(shè)置超時時間的版本。

三種形式的 park 還各自支持一個 blocker 對象參數(shù)。此對象在線程受阻塞時被記錄,以允許監(jiān)視工具和診斷工具確定線程受阻塞的原因。(這樣的工具可以使用方法 getBlocker(java.lang.Thread) 訪問 blocker。)建議最好使用這些形式,而不是不帶此參數(shù)的原始形式。在鎖實現(xiàn)中提供的作為 blocker 的普通參數(shù)是 this。
看下線程dump的結(jié)果來理解blocker的作用。

image

從線程dump結(jié)果可以看出:
有blocker的可以傳遞給開發(fā)人員更多的現(xiàn)場信息,通過jstack命令可以非常方便的監(jiān)控具體的阻塞對象,方便定位問題。所以java6新增加帶blocker入?yún)⒌南盗衟ark方法,替代原有的park方法。

看一個Java docs中的示例用法:一個先進先出非重入鎖類的框架

class FIFOMutex {
    private final AtomicBoolean locked = new AtomicBoolean(false);
    private final Queue<Thread> waiters
      = new ConcurrentLinkedQueue<Thread>();
 
    public void lock() {
      boolean wasInterrupted = false;
      Thread current = Thread.currentThread();
      waiters.add(current);
 
      // Block while not first in queue or cannot acquire lock
      while (waiters.peek() != current ||
             !locked.compareAndSet(false, true)) {
        LockSupport.park(this);
        if (Thread.interrupted()) // ignore interrupts while waiting
          wasInterrupted = true;
      }

      waiters.remove();
      if (wasInterrupted)          // reassert interrupt status on exit
        current.interrupt();
    }
 
    public void unlock() {
      locked.set(false);
      LockSupport.unpark(waiters.peek());
    }
  }

2、Unsafe的park和unpark

LockSupport類是Java6(JSR166-JUC)引入的一個類,提供了基本的線程同步原語。LockSupport實際上是調(diào)用了Unsafe類里的函數(shù),歸結(jié)到Unsafe里,只有兩個函數(shù):

/**
 * 為指定線程提供“許可(permit)”
 */
public native void unpark(Thread jthread);

/**
 * 阻塞指定時間等待“許可”。
 * @param isAbsolute: 時間是絕對的,還是相對的
 * @param time:等待許可的時間
 */
public native void park(boolean isAbsolute, long time);  

上面的這個“許可”是不能疊加的,“許可”是一次性的。

比如線程B連續(xù)調(diào)用了三次unpark函數(shù),當(dāng)線程A調(diào)用park函數(shù)就使用掉這個“許可”,如果線程A再次調(diào)用park,則進入等待狀態(tài)。

注意,unpark函數(shù)可以先于park調(diào)用。比如線程B調(diào)用unpark函數(shù),給線程A發(fā)了一個“許可”,那么當(dāng)線程A調(diào)用park時,它發(fā)現(xiàn)已經(jīng)有“許可”了,那么它會馬上再繼續(xù)運行。

可能有些朋友還是不理解“許可”這個概念,我們深入HotSpot的源碼來看看。

每個java線程都有一個Parker實例,Parker類是這樣定義的:

class Parker : public os::PlatformParker {  
private:  
  volatile int _counter ;  
  ...  
public:  
  void park(bool isAbsolute, jlong time);  
  void unpark();  
  ...  
}  
class PlatformParker : public CHeapObj<mtInternal> {  
  protected:  
    pthread_mutex_t _mutex [1] ;  
    pthread_cond_t  _cond  [1] ;  
    ...  
}  

可以看到Parker類實際上用Posix的mutex,condition來實現(xiàn)的。在Parker類里的_counter字段,就是用來記錄所謂的“許可”的。

當(dāng)調(diào)用park時,先嘗試直接能否直接拿到“許可”,即_counter>0時,如果成功,則把_counter設(shè)置為0,并返回:

void Parker::park(bool isAbsolute, jlong time) {  
  // Ideally we'd do something useful while spinning, such  
  // as calling unpackTime().  
  
  
  // Optional fast-path check:  
  // Return immediately if a permit is available.  
  // We depend on Atomic::xchg() having full barrier semantics  
  // since we are doing a lock-free update to _counter.  
  if (Atomic::xchg(0, &_counter) > 0) return;  

如果不成功,則構(gòu)造一個ThreadBlockInVM,然后檢查_counter是不是>0,如果是,則把_counter設(shè)置為0,unlock mutex并返回:

ThreadBlockInVM tbivm(jt);  
if (_counter > 0)  { // no wait needed  
  _counter = 0;  
  status = pthread_mutex_unlock(_mutex);  

否則,再判斷等待的時間,然后再調(diào)用pthread_cond_wait函數(shù)等待,如果等待返回,則把_counter設(shè)置為0,unlock mutex并返回:

if (time == 0) {  
  status = pthread_cond_wait (_cond, _mutex) ;  
}  
_counter = 0 ;  
status = pthread_mutex_unlock(_mutex) ;  
assert_status(status == 0, status, "invariant") ;  
OrderAccess::fence();  

當(dāng)unpark時,則簡單多了,直接設(shè)置_counter為1,再unlock mutext返回。如果_counter之前的值是0,則還要調(diào)用pthread_cond_signal喚醒在park中等待的線程:

void Parker::unpark() {  
  int s, status ;  
  status = pthread_mutex_lock(_mutex);  
  assert (status == 0, "invariant") ;  
  s = _counter;  
  _counter = 1;  
  if (s < 1) {  
     if (WorkAroundNPTLTimedWaitHang) {  
        status = pthread_cond_signal (_cond) ;  
        assert (status == 0, "invariant") ;  
        status = pthread_mutex_unlock(_mutex);  
        assert (status == 0, "invariant") ;  
     } else {  
        status = pthread_mutex_unlock(_mutex);  
        assert (status == 0, "invariant") ;  
        status = pthread_cond_signal (_cond) ;  
        assert (status == 0, "invariant") ;  
     }  
  } else {  
    pthread_mutex_unlock(_mutex);  
    assert (status == 0, "invariant") ;  
  }  
}  

簡而言之,是用mutex和condition保護了一個_counter的變量,當(dāng)park時,這個變量置為了0,當(dāng)unpark時,這個變量置為1。

值得注意的是在park函數(shù)里,調(diào)用pthread_cond_wait時,并沒有用while來判斷,所以posix condition里的"Spurious wakeup"一樣會傳遞到上層Java的代碼里。關(guān)于"Spurious wakeup",可以參考:并行編程之條件變量(posix condition variables)

3、LockSupport源碼分析

解釋完Unsafe的park和unpark的實現(xiàn)原理,我們再來看LockSupport的源碼時就會異常清晰,因為不復(fù)雜,所以直接看注釋吧。

public class LockSupport {
    private LockSupport() {} // Cannot be instantiated.

    private static void setBlocker(Thread t, Object arg) {
        UNSAFE.putObject(t, parkBlockerOffset, arg);
    }
    
    /**
     * 返回提供給最近一次尚未解除阻塞的 park 方法調(diào)用的 blocker 對象。
     * 如果該調(diào)用不受阻塞,則返回 null。
     * 返回的值只是一個瞬間快照,即由于未解除阻塞或者在不同的 blocker 對象上受阻而具有的線程。
     */
    public static Object getBlocker(Thread t) {
        if (t == null)
            throw new NullPointerException();
        return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
    }
    
    /**
     * 如果給定線程的許可尚不可用,則使其可用。
     * 如果線程在 park 上受阻塞,則它將解除其阻塞狀態(tài)。
     * 否則,保證下一次調(diào)用 park 不會受阻塞。
     * 如果給定線程尚未啟動,則無法保證此操作有任何效果。 
     * @param thread: 要執(zhí)行 unpark 操作的線程;該參數(shù)為 null 表示此操作沒有任何效果。
     */
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

    /**
     * 為了線程調(diào)度,在許可可用之前阻塞當(dāng)前線程。 
     * 如果許可可用,則使用該許可,并且該調(diào)用立即返回;
     * 否則,為線程調(diào)度禁用當(dāng)前線程,并在發(fā)生以下三種情況之一以前,使其處于休眠狀態(tài):
     *  1. 其他某個線程將當(dāng)前線程作為目標調(diào)用 unpark
     *  2. 其他某個線程中斷當(dāng)前線程
     *  3. 該調(diào)用不合邏輯地(即毫無理由地)返回
     */
    public static void park() {
        UNSAFE.park(false, 0L);
    }

    /**
     * 和park()方法類似,不過增加了等待的相對時間
     */
    public static void parkNanos(long nanos) {
        if (nanos > 0)
            UNSAFE.park(false, nanos);
    }

    /**
     * 和park()方法類似,不過增加了等待的絕對時間
     */
    public static void parkUntil(long deadline) {
        UNSAFE.park(true, deadline);
    }
    
    /**
     * 和park()方法類似,只不過增加了暫停的同步對象
     * @param blocker 導(dǎo)致此線程暫停的同步對象
     * @since 1.6
     */
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }
    
    /**
     * parkNanos(long nanos)方法類似,只不過增加了暫停的同步對象
     * @param blocker 導(dǎo)致此線程暫停的同步對象
     * @since 1.6
     */
    public static void parkNanos(Object blocker, long nanos) {
        if (nanos > 0) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, nanos);
            setBlocker(t, null);
        }
    }
    
    /**
     * parkUntil(long deadline)方法類似,只不過增加了暫停的同步對象
     * @param blocker 導(dǎo)致此線程暫停的同步對象
     * @since 1.6
     */
    public static void parkUntil(Object blocker, long deadline) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(true, deadline);
        setBlocker(t, null);
    }

    static final int nextSecondarySeed() {
        int r;
        Thread t = Thread.currentThread();
        if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) {
            r ^= r << 13;   // xorshift
            r ^= r >>> 17;
            r ^= r << 5;
        }
        else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0)
            r = 1; // avoid zero
        UNSAFE.putInt(t, SECONDARY, r);
        return r;
    }

    // Hotspot implementation via intrinsics API
    private static final sun.misc.Unsafe UNSAFE;
    private static final long parkBlockerOffset;
    private static final long SEED;
    private static final long PROBE;
    private static final long SECONDARY;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> tk = Thread.class;
            parkBlockerOffset = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            SEED = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomSeed"));
            PROBE = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomProbe"));
            SECONDARY = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomSecondarySeed"));
        } catch (Exception ex) { throw new Error(ex); }
    }
}

4、例子

看完LockSupport的源碼,我們來動手寫幾個例子來驗證一下猜想是否正確。

4.1、先park再unpark

public class LockSupportTest {

    public static void main(String[] args) throws InterruptedException {
        String a = new String("A");
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("睡覺");
                LockSupport.park(a);
                System.out.println("起床");
            }
        });
        t.setName("A-Name");
        t.start();
        Thread.sleep(300000);
        System.out.println("媽媽喊我起床");
        LockSupport.unpark(t);
    }
}

輸出結(jié)果:

睡覺
媽媽喊我起床
起床

不過在等待的過程中,我們可以用jstack查看是否能夠打印出檢測的對象A,找到A-Name這個線程確實看到了等待一個String對象

~ jps
5589 LockSupportTest

~ jstack 5589
"A-Name" #11 prio=5 os_prio=31 tid=0x00007fc143009800 nid=0xa803 waiting on condition [0x000070000c233000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000076adf4d30> (a java.lang.String)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at com.github.locksupport.LockSupportTest$1.run(LockSupportTest.java:18)
        at java.lang.Thread.run(Thread.java:745)

驗證完unpark,接著我們來驗證一下interrupt。

4.2、先interrupt再park

public class LockSupportTest {

    public static void main(String[] args) throws InterruptedException {
        String a = new String("A");
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("睡覺");
                LockSupport.park(a);
                System.out.println("起床");
                System.out.println("是否中斷:" + Thread.currentThread().isInterrupted());
            }
        });
        t.setName("A-Name");
        t.start();
        t.interrupt();
        System.out.println("突然肚子很疼");
    }
}

可以看到中斷后執(zhí)行park會直接執(zhí)行下面的方法,并不會拋出InterruptedException,輸出結(jié)果如下:

突然肚子很疼
睡覺
起床
是否中斷:true

4.3、先unpark再park

public class LockSupportTest {

    public static void main(String[] args) throws InterruptedException {
        String a = new String("A");
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("睡覺");
                LockSupport.park(a);
                System.out.println("7點到,起床");
            }
        });
        t.setName("A-Name");
        t.start();
        LockSupport.unpark(t);
        System.out.println("提前上好鬧鐘7點起床");
    }
}

按照上面說過的,先設(shè)置好許可(unpark)再獲取許可的時候不會進行等待,正如我們說的那樣輸出如下:

提前上好鬧鐘7點起床
睡覺
7點到,起床

4、思考一個問題

看完源碼后,是不是覺得LockSupport.park()和unpark()和object.wait()和notify()很相似,那么它們有什么區(qū)別呢?

  1. 面向的主體不一樣。LockSuport主要是針對Thread進進行阻塞處理,可以指定阻塞隊列的目標對象,每次可以指定具體的線程喚醒。Object.wait()是以對象為緯度,阻塞當(dāng)前的線程和喚醒單個(隨機)或者所有線程。
  2. 實現(xiàn)機制不同。雖然LockSuport可以指定monitor的object對象,但和object.wait(),兩者的阻塞隊列并不交叉??梢钥聪聹y試例子。object.notifyAll()不能喚醒LockSupport的阻塞Thread.

4、參考

Java的LockSupport.park()實現(xiàn)分析

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • LockSupport,構(gòu)建同步組件的基礎(chǔ)工具,幫AQS完成相應(yīng)線程的阻塞或者喚醒的工作。 LockSupport...
    miaoLoveCode閱讀 15,560評論 3 23
  • 一個簡單的單例示例 單例模式可能是大家經(jīng)常接觸和使用的一個設(shè)計模式,你可能會這么寫 publicclassUnsa...
    Martin說閱讀 2,396評論 0 6
  • 在店鋪梳理毛毛蟲鞋預(yù)定的時候,我的本子上清晰的寫下了兩個人的名字,一個是我小侄女,另一個就是墩墩。第一個先鏈接了我...
    馬駿輝閱讀 1,540評論 1 1
  • 方的冬季比較寒冷,荒人村的村民這個時候都進入農(nóng)閑時分,男人們有些會在村頭曬著太陽,有些三五成群聚在一起也會打打撲克...
    棟華閱讀 684評論 0 2

友情鏈接更多精彩內(nèi)容