Java 程序員眼里的 Linux 內(nèi)核 —— wait_event 源碼分析

看 Linux 的 wait_event 源碼時,聯(lián)想到我們平時經(jīng)常用得比較多的 wait/notify、double-check 和 volatile,突然意識 wait_event 簡簡單單幾行代碼的背后,涉及的知識點其實非常豐富。本篇文章我們就一起了來探索它背后的知識,然后嘗試著和我們的日常開發(fā)關(guān)聯(lián)起來。

wait_event

這里使用 Linux-2.6.24 版本的源碼

背景

在某些情況下,我們會需要等待某個事件,在這個事件發(fā)生前,把進(jìn)程投入睡眠。比方說,同步寫 IO;在發(fā)出寫磁盤命令后,進(jìn)程要進(jìn)入休眠,等等磁盤完成。為了支持這一類場景,Linux 引入了 wait queue;wait queue 從概念上跟我們應(yīng)用層使用的 condition queue 是一樣的。

實現(xiàn)

這里我們著重講 wait_event 的實現(xiàn),一些相關(guān)的知識讀者可以參考《深入理解LINUX內(nèi)核》。

下面我們開始看代碼:

// ${linux_source}/include/linux/wait.h

/**
 * wait_event - sleep until a condition gets true
 * @wq: the waitqueue to wait on
 * @condition: a C expression for the event to wait for
 *
 * The process is put to sleep (TASK_UNINTERRUPTIBLE) until the
 * @condition evaluates to true. The @condition is checked each time
 * the waitqueue @wq is woken up.
 *
 * wake_up() has to be called after changing any variable that could
 * change the result of the wait condition.
 */
#define wait_event(wq, condition)       \
do {                                    \
    if (condition)                      \
        break;                          \
    __wait_event(wq, condition);        \
} while (0)

這里只是先檢測一遍條件,然后直接又調(diào)用 __wait_event

// ${linux_source}/include/linux/wait.h
#define __wait_event(wq, condition)                             \
do {                                                            \
    DEFINE_WAIT(__wait);                                        \
                                                                \
    for (;;) {                                                  \
        prepare_to_wait(&wq, &__wait, TASK_UNINTERRUPTIBLE);    \
        if (condition)                                          \
            break;                                              \
        // schedule 使用調(diào)度器調(diào)度另一個線程去執(zhí)行。當(dāng)前線程被重新      \
        // 調(diào)度時,schedule 函數(shù)才會返回                            \
        schedule();                                             \
    }                                                           \
    finish_wait(&wq, &__wait);                                  \
} while (0)

DEFINE_WAIT 宏用于定義局部變量 __wait

// ${linux_source}/include/linux/wait.h
#define DEFINE_WAIT(name)                                   \
    wait_queue_t name = {                                   \
        .private    = current,                              \
        .func       = autoremove_wake_function,             \
        .task_list  = LIST_HEAD_INIT((name).task_list),     \
    }

prepare_to_waitfinish_wait 源碼如下:

// ${linux_source}/kernel/wait.c
/*
 * Note: we use "set_current_state()" _after_ the wait-queue add,
 * because we need a memory barrier there on SMP, so that any
 * wake-function that tests for the wait-queue being active
 * will be guaranteed to see waitqueue addition _or_ subsequent
 * tests in this thread will see the wakeup having taken place.
 *
 * The spin_unlock() itself is semi-permeable and only protects
 * one way (it only protects stuff inside the critical region and
 * stops them from bleeding out - it would still allow subsequent
 * loads to move into the critical region).
 */
void fastcall
prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int state)
{
    unsigned long flags;

    // 非獨占等待(可以同時喚醒多個進(jìn)程)
    wait->flags &= ~WQ_FLAG_EXCLUSIVE;
    // 加鎖
    spin_lock_irqsave(&q->lock, flags);
    // wait 不存在于某個等待隊列時,才把它加入 q
    // wait 是我們新定義的,list_empty 將會返回 true
    if (list_empty(&wait->task_list))
        __add_wait_queue(q, wait);
    /*
     * don't alter the task state if this is just going to
     * queue an async wait queue callback
     */
    // 根據(jù) wait 的定義,is_sync_wait() 這里會返回 true
    if (is_sync_wait(wait))
        // 前面注釋說使用 set_current_state() 作為屏障,對此不理解的讀者可以暫時忽略,
        // 后面我們會舉例說明相關(guān)的用法
        set_current_state(state);
    // 解鎖
    spin_unlock_irqrestore(&q->lock, flags);
}

/*
 * Used to distinguish between sync and async io wait context:
 * sync i/o typically specifies a NULL wait queue entry or a wait
 * queue entry bound to a task (current task) to wake up.
 * aio specifies a wait queue entry with an async notification
 * callback routine, not associated with any task.
 */
#define is_sync_wait(wait)    (!(wait) || ((wait)->private))

void fastcall finish_wait(wait_queue_head_t *q, wait_queue_t *wait)
{
    unsigned long flags;

    __set_current_state(TASK_RUNNING);
    /*
     * We can check for list emptiness outside the lock
     * IFF:
     *  - we use the "careful" check that verifies both
     *    the next and prev pointers, so that there cannot
     *    be any half-pending updates in progress on other
     *    CPU's that we haven't seen yet (and that might
     *    still change the stack area.
     * and
     *  - all other users take the lock (ie we can only
     *    have _one_ other CPU that looks at or modifies
     *    the list).
     */
    if (!list_empty_careful(&wait->task_list)) {
        spin_lock_irqsave(&q->lock, flags);
        list_del_init(&wait->task_list);
        spin_unlock_irqrestore(&q->lock, flags);
    }
}

概括來講,prepare_to_wait 把自己加入等待隊列,finish_wait 則把自己從隊列里移除。但由于 prepare_to_wait 可能會被調(diào)用多次,如果判斷 wait 已經(jīng)處于某個隊列中,則不會重復(fù)添加。

條件、條件隊列和鎖

對于像我一樣平時使用 Java 比較多的讀者,對下面這一段代碼一定不會覺得陌生:

synchronized (this) {
    while (!condition) {
        wait();
    }
    // do your stuff
}

這里我們不禁要問,應(yīng)用層的代碼可以這樣簡潔,為什么內(nèi)核的就不行?這里我們先來大概了解一下條件隊列,然后再回答這個問題。

所謂的條件隊列/等待隊列,一般由 3 個成分組成:

  1. 一個隊列,用于存放等待條件/事件的線程。在應(yīng)用層,一般我們叫他條件隊列(condition queue),LINUX 內(nèi)核叫他 wait queue
  2. 一個鎖,用于保護(hù)這個隊列
  3. 一個謂詞(它的計算結(jié)果為 bool 值)用作條件,即前面示例代碼的 condition

Java 程序員們在這里需要特別注意的是,我說的鎖的作用是保護(hù)條件隊列?;仡櫸覀兂懙?Java 代碼,一般這個鎖也用來保護(hù)謂詞,但這個不是必須的。Java 要求我們在調(diào)用 wait 的時候必須持有鎖的原因之一是,wait 的內(nèi)部會把當(dāng)前線程加入條件隊列;修改列表必須持有鎖(另一個原因是,wait 的語義之一便是執(zhí)行后會釋放鎖,如果都不持有,何來的釋放呢)。

但在另一面,喚醒條件隊列上的線程卻不一定需要持有鎖,雖然 Java 要求我們必須持有鎖才能調(diào)用 notify。持有鎖調(diào)用 notify 的好處在于,notify 后條件不會改變。同時,如果持有鎖的話,這個操作里也可以把相關(guān)線程從條件隊列里刪除。不好的地方在于,調(diào)用 notify 的線程在執(zhí)行喚醒操作的時候還持有鎖,被喚醒線程這個時候如果被內(nèi)核調(diào)度,他的獲取鎖的操作將失敗(會導(dǎo)致該線程又進(jìn)入睡眠狀態(tài))。這種實現(xiàn)方式性能上可能差一點,但代碼更安全。

不要求調(diào)用 notify 時持有鎖的一個例子是 pthread。這種方式的問題在于,在 notify 還沒執(zhí)行完的時候,條件可能就發(fā)生了變化??赡艿膶崿F(xiàn)是,只設(shè)置線程為可執(zhí)行狀態(tài),等線程獲得鎖后自己把自己從隊列里面移除。

了解了相關(guān)的數(shù)據(jù)結(jié)構(gòu)后,不難猜想 Java 里 wait 的實現(xiàn)??紤]一種應(yīng)用層 wait 的實現(xiàn)如下:

void wait() {
    add_to_condition_queue();
    unlock();
    schedule();
}

wait 方法做一下內(nèi)聯(lián)(inlining)處理,可以得到:

lock();
while (!condition) {
    add_to_condition_queue();
    unlock();
    schedule();
}
// do your stuff

對比一下內(nèi)核的 wait_event

void our_wait_event() {
    if (condition) return;

    for (;;) {
        // 如果你喜歡,換成 condition_queue 也可以
        add_to_wait_queue_if_not_added_yet();
        if (condition)
            break;
        schedule();
    }
    remove_from_wait_queue();
}

可以看到,內(nèi)核把代碼寫得更復(fù)雜的好處在于,它在切換進(jìn)程前可以再檢查一次條件,如果條件已經(jīng)滿足,就不需要執(zhí)行 schedule 了。切換進(jìn)程需要保存當(dāng)前進(jìn)程的上下文,同時會導(dǎo)致 TLB、Cache 等一系列緩存時效,因此內(nèi)核總是盡量避免不必要的線程切換,而代價就是更復(fù)雜的代碼。

double-check

首先,如果你也和我一樣覺得 our_wait_event 里面兩個 if 有點難看,我們不妨試著來給他改一改:

void our_wait_event2() {
    while (!condition) {
        add_to_wait_queue_if_not_added_yet();
        schedule();
    }
    remove_from_wait_queue();
}

咋一看好像沒什么問題,都是一樣的檢測條件,在條件不滿足的情況下加入等待隊列,調(diào)用 schedule。重要的是,上面這段代碼更簡潔,更易讀。那么,他正確嗎?

不消說,肯定是有問題的,不然那班內(nèi)核程序員不會不知道該這么寫。那問題究竟出在哪里呢?

考慮下面兩個執(zhí)行流:

thread1                              thread2
-----------------------           --------------
check condition => false
add_to_wait_queue()

                                  alter_condition()
                                  notify_all()

state = TASK_UNINTERRUPTABLE
schedule()

thread1 在把自己加入等待隊列后,schedule 前,thread2 就更改了條件并且調(diào)用 notify。在這種情況下,如果沒有其他線程再次調(diào)用 notify,thread1 將會永遠(yuǎn)休眠(而 thread2 認(rèn)為自己已經(jīng) noitfy 過 thread1 了)。

為了防止發(fā)生這種情況,在添加到等待隊列后,thread1 還應(yīng)該再檢查一次條件,如果條件滿足,直接把自己從隊列里移除就可以了。

為了方便讀者把 wait_event 和 double-check 聯(lián)系起來,下面我們看一段使用 double-check 實現(xiàn)的 Java 的單例的例子:

public static SomeClass getInstance() {
    if (sInstance == null) {
        synchronized (SomeClass.class) {
            if (sInstance == null) {
                sInstance = new SomeClass();
            }
        }
    }
    return sInstance;
}

兩者的共同點都是先檢測一遍條件是否成立,然后設(shè)置一個“安全閥”。在持有這個安全閥時,再一次檢測條件是否滿足。double-check 的多線程安全性都源于這個安全閥。就 wait_event 來說,當(dāng)我們把自己加入等待隊列后,就可以保證不會丟失另一個線程的 notify。而創(chuàng)建單例時,加鎖保證了第二次判斷后不會有另一個線程同時創(chuàng)建對象。(可能說得有點抽象,如果讀者不明白,直接跳過就好。只要讀者能夠完成下面的小測驗,那就是懂得 double-check 的)。

double-check 小測驗

假設(shè)有這樣一個方法,他可以用來下載文件:

interface DownloadCallback {
    void onSuccess(File file);
}

public void download(String url, DownloadCallback callback) {
    // ...
}

我們又假設(shè),可能同時有多個客戶會調(diào)用這個接口下載同一個文件。為了避免同時下載同一個文件,我們可以在下載時判斷一下當(dāng)前是否已經(jīng)有任務(wù)在下載:


interface DownloadCallback {
    void onSuccess(File file);
}

private class DownloadTask {
    // guarded by itself
    private final List<DownloadCallback> mCallbacks = new ArrayList<>();

    public DownloadTask(String url, DownloadCallback callback) {
        mCallbacks.add(callback);
    }

    public void download() {
        // downloading ...
        File file = new File("downloaded-file");

        // bonus: 為什么需要拷貝 callback 列表?
        List<DownloadCallback> copy;
        synchronized (mCallbacks) {
            copy = new ArrayList<>(mCallbacks);
            mCallbacks.clear();
        }
        for (DownloadCallback callback : copy) {
            callback.onSuccess(file);
        }
    }

    public void addCallback(DownloadCallback callback) {
        synchronized (mCallbacks) {
            mCallbacks.add(callback);
        }
    }
}


private final ConcurrentMap<String, DownloadTask> mTasks = new ConcurrentHashMap<>();

public void download(String url, DownloadCallback callback) {
    File file = new File(getDestFilePath(url));
    if (file.exists()) {
        // 已經(jīng)存在則不需要下載了
        callback.onSuccess(file);
        return;
    }
    DownloadTask task = new DownloadTask(url, callback);
    DownloadTask downloadingTask = mTasks.putIfAbsent(url, task);
    if (downloadingTask == null) {
        // 沒有正在下載的任務(wù)時才需要下載
        task.download();
        return;
    }
    // 加入正在下載的任務(wù)的 callback 列表,
    downloadingTask.addCallback(callback);
}

private String getDestFilePath(String url) {
    return "url-to-file-path...";
}

為了檢驗讀者是否真正理解 wait_event,你可以嘗試著解決上面代碼里存在的競爭條件。如果一時沒能發(fā)現(xiàn)其中的問題,建議讀者再從頭讀一遍文章。為了鼓勵讀者獨立思考、與他人交流,這里我就順勢偷個懶不公布答案了。畢竟,在實際工作中,可不是總會有人告訴你你的代碼寫得是否正確。

內(nèi)存屏障一瞥

所謂的內(nèi)存屏障,主要分為 3 種:

  1. read memory barrier(rmb),保證屏障前的讀發(fā)生在屏障后的讀操作之前
  2. write memory barrier(wmb),保證屏障前的寫發(fā)生在屏障后的寫操作之前
  3. full memory barrier(mb),保證屏障前的讀寫操作發(fā)生在屏障后的讀寫操作之前

前面 prepare_to_wait 有這么一段注釋:

/*
 * Note: we use "set_current_state()" _after_ the wait-queue add,
 * because we need a memory barrier there on SMP, so that any
 * wake-function that tests for the wait-queue being active
 * will be guaranteed to see waitqueue addition _or_ subsequent
 * tests in this thread will see the wakeup having taken place.
 *
 * The spin_unlock() itself is semi-permeable and only protects
 * one way (it only protects stuff inside the critical region and
 * stops them from bleeding out - it would still allow subsequent
 * loads to move into the critical region).
 */

這段注釋一開始我也是看得云里霧里,直到我找到了他們解決一個內(nèi)核 bug 的郵件(Google 大法好)。

這里面的 tests for the wait-queue being active 可以根據(jù) waitqueue_active 來理解,其實指的就是等待隊列不為空。spin-lock 雖然可以防止數(shù)據(jù)競爭,但如果別人在檢查的時候不去獲取鎖呢?(waitqueue_active 就沒有加鎖)。當(dāng)然,不加鎖可以獲得更好的性能。
j

static inline int waitqueue_active(wait_queue_head_t *q)
{
    return !list_empty(&q->task_list);
}

set_current_state 使用 set_mb 來設(shè)置當(dāng)前進(jìn)程的狀態(tài)。

/*
 * set_current_state() includes a barrier so that the write of current->state
 * is correctly serialised wrt the caller's subsequent test of whether to
 * actually sleep:
 *
 *    set_current_state(TASK_UNINTERRUPTIBLE);
 *    if (do_i_need_to_sleep())
 *        schedule();
 *
 * If the caller does not need such serialisation then use __set_current_state()
 */
#define __set_current_state(state_value)                \
    do { current->state = (state_value); } while (0)
#define set_current_state(state_value)        \
    set_mb(current->state, (state_value))

下面是文檔對 set_mb 的描述:

 (*) set_mb(var, value)

     This assigns the value to the variable and then inserts a full memory
     barrier after it, depending on the function.  It isn't guaranteed to
     insert anything more than a compiler barrier in a UP compilation.

set_current_state 在設(shè)置當(dāng)前進(jìn)程的狀態(tài)后,會插入一個 mb。前面我們了解到,這將禁止 CPU 將 set_current_state 后面的 load/store 提前。

為了理解完全理解這里 set_mb 的使用,我們還需要再參考一下 wake_up 函數(shù)。但由于篇幅關(guān)系,這里我只是簡單介紹它的實現(xiàn):

for_each_process_in_wait_queue_without_lock:
    if process.state is sleeping:
        wake it up
        remove it from wait-queue

假設(shè)在 prepare_to_wait 里面我使用的是平凡的 __set_current_state,那么 CPU 就可以把 prepare_to_wait 函數(shù)返回后我們所執(zhí)行的對條件的判斷提前到設(shè)置進(jìn)程狀態(tài)前。這種情況下,如果發(fā)生以下的執(zhí)行序列,CPU2 將會丟失一個 wake-up,他有可能會永遠(yuǎn)休眠。

CPU0                    CPU1
----------------        ------------------
                        check_condition() => false
condition = true
wake_up()
                        __set_current_state()
                        schedule()

解決辦法就是引入一個 mb。在下面的例子中,如果 __set_current_state()wake_up() 后執(zhí)行,CPU1 上的這個線程將不會被喚醒,但隨后的 check_condition() 會正確返回 true。反過來,如果 __set_current_state()wake_up() 前執(zhí)行,check_condition() 可能返回 true 也可能返回 false,但無論如何,他都不會丟失隨后的 wake_up()。

CPU0                    CPU1
----------------        ------------------
condition = true
wake_up()
                        __set_current_state()
                        mb();
                        check_condition() => true

還記得嗎,set_current_state 就是在設(shè)置進(jìn)程狀態(tài)后插入一個內(nèi)存屏障,所以 prepare_to_wait 直接使用 set_current_state 就可以了。

現(xiàn)在,我們終于可以說自己完全理解 wait_event 了。也許讀者是第一次接觸內(nèi)存屏障,但我敢保證,很多 Java 程序員在不知不覺中使用過一定形式上的屏障。下面我們看一個例子:

private int mSomeData;          // guarded by mDataSet
private int mSomeOtherData;     // guarded by mDataSet
private volatile boolean mDataSet;

public void foo() {
    if (mDataSet) {
        // you can now use mSomeData, mSomeOtherData
    }
}

public void bar() {
    mSomeData = 1;
    mSomeOtherData = 2;
    mDataSet = true;
}

有一定開發(fā)經(jīng)驗的讀者很可能看過類似的代碼。雖然我們沒有在 mSomeDatamSomeOtherData 的讀寫上做顯式的同步,但只要仔細(xì)編寫代碼,利用一個 volatile 變量 mDataSet,這段代碼也可以是線程安全的。

為了避免引入內(nèi)存屏障這個比較復(fù)雜的概念(并且提供更好的移植性),Java 使用一個 happens-before 來描述相關(guān)的概念。關(guān)于 volatile 有這么一條描述:

A write to a volatile field happens-before every subsequent read of that field.

另外,對于同一個線程,有:

If x and y are actions of the same thread and x comes before y in program order, then x happen before y.

同時,happens-before 具有傳遞性(x -> y, y -> z, x -> z),所以就有了下面這個結(jié)論:

mSomeData, mSomeOtherData 的寫操作在 mDataSet = true 之前;mDataSet = true 在隨后另一個線程對他的讀操作之前;所以 mSomeData, mSomeOtherData 的寫操作在隨后對 mDataSet 的讀操作之前。

直白一點說,只要一個線程看到 mDataSettrue,那他就一定能夠正確讀取到 mSomeData, mSomeOtherData 的值。

如果顯式使用內(nèi)存屏障,上面的代碼就相當(dāng)于:

private int mSomeData;
private int mSomeOtherData;
private boolean mDataSet;

public void foo() {
    if (mDataSet) {
        // 這個讀內(nèi)存屏障保證我們讀到 mDataSet 后,也能讀到 mSomeData/mSomeOtherData
        // 的最新值
        rmb();
        // you can now use mSomeData, mSomeOtherData
    }
}

public void bar() {
    mSomeData = 1;
    mSomeOtherData = 2;
    // 寫內(nèi)存屏障保證對 mSomeData/mSomeOtherData 的寫在 mDataSet = true 之前執(zhí)行
    wmb();
    mDataSet = true;
}

最后,墻裂推薦一本并行編程的神書《Is Parallel Programming Hard, And, If So, What Can You Do About It?》(可以免費獲取),書里有關(guān)于內(nèi)存屏障的最好的描述。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容