Professional Linux Kernel Architecture: Kernel Locks

Introduction

In a multi-threaded (or multi-process) concurrency model, several threads operating on the same data at the same time create race conditions and, with them, thread-safety problems. Locks are the main tool for dealing with them; the kernel's principal variants are atomic operations (atomic), spinlocks (spin_lock), semaphores (semaphore), mutexes (mutex), and reader/writer locks (rw_lock).

Critical sections

A critical section is a piece of code that accesses a shared resource (a shared device or shared memory, say) that must not be used by several parties at once.
If a process is interrupted in the middle of such code and another process enters the same code path and manipulates the same data, the data ends up wrong and the processes misbehave. The simplest remedy is to disable interrupts, but that is fragile, and on a multi-core system it does not stop other CPUs from running the same code anyway. So some mechanism must guarantee that only one process at a time is inside the critical section; this is called mutual exclusion.

Atomic operations

atomic is the simplest locking primitive: it guarantees that an elementary operation, such as incrementing a counter, executes without interruption. On a single CPU this is easy to implement and to understand by disabling interrupts; on an SMP system a special lock instruction is required to make the read-modify-write exclusive across CPUs, which is harder to follow and sits too close to the hardware to explore here. Let's look briefly at the atomic data structure and the int operations (they are architecture-specific, because lock instructions differ between architectures).

typedef struct {
    int counter;
} atomic_t;
// x86
static inline void atomic_inc(atomic_t *v)
{
    asm volatile(LOCK_PREFIX "incl %0"
             : "+m" (v->counter));
}
#ifdef CONFIG_SMP
#define LOCK_PREFIX_HERE \
        ".pushsection .smp_locks,\"a\"\n"   \
        ".balign 4\n"               \
        ".long 671f - .\n" /* offset */     \
        ".popsection\n"             \
        "671:"

#define LOCK_PREFIX LOCK_PREFIX_HERE "\n\tlock; "

#else /* ! CONFIG_SMP */
#define LOCK_PREFIX_HERE ""
#define LOCK_PREFIX ""
#endif

static __always_inline int arch_atomic_read(const atomic_t *v)
{
    /*
     * Note for KASAN: we deliberately don't use READ_ONCE_NOCHECK() here,
     * it's non-inlined function that increases binary size and stack usage.
     */
    return READ_ONCE((v)->counter);
}
#define __READ_ONCE_SIZE                        \
({                                  \
    switch (size) {                         \
    case 1: *(__u8 *)res = *(volatile __u8 *)p; break;      \
    case 2: *(__u16 *)res = *(volatile __u16 *)p; break;        \
    case 4: *(__u32 *)res = *(volatile __u32 *)p; break;        \
    case 8: *(__u64 *)res = *(volatile __u64 *)p; break;        \
    default:                            \
        barrier();                      \
        __builtin_memcpy((void *)res, (const void *)p, size);   \
        barrier();                      \
    }                               \
})

The atomic type is nothing but a wrapped int. A read is a single volatile-style load (see the memory-barrier notes; a single load needs no further guarantee), while atomic_inc combines the lock prefix with the incl instruction. Together they make the increment atomic even on SMP.

Spinlocks

A spinlock protects critical sections that execute quickly, so the wait for the lock stays short: rather than give up the CPU, the waiter busy-loops until it obtains the lock and then runs the critical section. Code protected by a spinlock must run to completion while the lock is held (it must not be preempted by another process; it behaves atomically), because a spinlock-protected critical section interrupted halfway would leave inconsistent state. The spinlock disables preemption to guarantee this; the caller in turn must make sure the critical section never sleeps voluntarily (sleeping invites preemption and breaks the atomicity). Spinlocks are not reentrant.
The relevant data structures and code (hard-to-read debug code omitted):

typedef struct spinlock {
    union {
        struct raw_spinlock rlock;
                ... // debug related struct
    };
} spinlock_t;
typedef struct raw_spinlock {
    arch_spinlock_t raw_lock;
        ... // debug related struct
} raw_spinlock_t;

// arm
typedef struct {
    union {
        u32 slock;
        struct __raw_tickets {
#ifdef __ARMEB__
            u16 next;
            u16 owner;
#else
            u16 owner;
            u16 next;
#endif
        } tickets;
    };
} arch_spinlock_t;

spin_lock => raw_spin_lock => _raw_spin_lock => __raw_spin_lock => do_raw_spin_lock => arch_spin_lock 

static inline void arch_spin_lock(arch_spinlock_t *lock)
{
    unsigned long tmp;
    u32 newval;
    arch_spinlock_t lockval;
    prefetchw(&lock->slock);
    __asm__ __volatile__( // this assembly atomically performs lockval = lock->tickets.next++
"1: ldrex   %0, [%3]\n" // mark lock->slock for exclusive access, lockval = lock->slock
"   add %1, %0, %4\n"   // newval = lockval + (1 << 16)  =>  next++
"   strex   %2, %1, [%3]\n" // lock->slock = newval; on success tmp = 0 and the exclusive marker is cleared
"   teq %2, #0\n" // did tmp == 0 ?
"   bne 1b" // tmp != 0: the strex failed, spin and retry
    : "=&r" (lockval), "=&r" (newval), "=&r" (tmp)
    : "r" (&lock->slock), "I" (1 << TICKET_SHIFT)
    : "cc");
    while (lockval.tickets.next != lockval.tickets.owner) { // the lock belongs to this CPU once owner catches up with our ticket (lockval.tickets.next)
        wfe();
        lockval.tickets.owner = READ_ONCE(lock->tickets.owner);
    }
    smp_mb();
}

static inline void arch_spin_unlock(arch_spinlock_t *lock)
{
    smp_mb();
    lock->tickets.owner++; // release the lock: hand it to the next ticket
    dsb_sev();
}

The arm spinlock logic is comparatively simple, so arm serves as the example here. arm's arch_spinlock_t carries two extra fields, owner and next, to decide who holds the lock: owner records the ticket of the current holder, next the next ticket to hand out. Each lock operation atomically increments next and keeps the previous value as its own ticket, call it localnext; each unlock increments owner. While owner != localnext the lock belongs to some other CPU and the caller keeps waiting for it to be released; the moment owner == localnext, this CPU is the lock holder.

Semaphores

In essence a semaphore is just a protected counter, commonly initialized to 1, with two standard operations defined on it, up and down. On entering the critical section a process executes down: if a count is available it is decremented; otherwise the process is appended to the wait list, marked TASK_UNINTERRUPTIBLE or TASK_INTERRUPTIBLE, and blocks until it is woken and rescheduled. When a holder leaves the critical section it executes up, which either increments the count or picks one process off the wait list and wakes it.
The relevant data structures and code:

struct semaphore {
    raw_spinlock_t      lock; // spinlock protecting count and wait_list
    unsigned int        count;      // the semaphore value
    struct list_head    wait_list;  // wait queue
};
void down(struct semaphore *sem)
{
    unsigned long flags;
    raw_spin_lock_irqsave(&sem->lock, flags);
    if (likely(sem->count > 0)) // the common case: a count is available
        sem->count--;
    else
        __down(sem); // must block the current process
    raw_spin_unlock_irqrestore(&sem->lock, flags);
}
static noinline void __sched __down(struct semaphore *sem)
{
    __down_common(sem, TASK_UNINTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
}
static inline int __sched __down_common(struct semaphore *sem, long state,
                                long timeout)
{
    struct semaphore_waiter waiter;

    list_add_tail(&waiter.list, &sem->wait_list); // enqueue on the wait list
    waiter.task = current;
    waiter.up = false;
    for (;;) {
        if (signal_pending_state(state, current)) // woken by a signal
            goto interrupted;
        if (unlikely(timeout <= 0))
            goto timed_out;
        __set_current_state(state);  // TASK_UNINTERRUPTIBLE
        raw_spin_unlock_irq(&sem->lock);
        timeout = schedule_timeout(timeout); // arm a timeout timer and call schedule() to give up the CPU; the task is woken when the timer fires (timer internals not covered here)
        raw_spin_lock_irq(&sem->lock);
        if (waiter.up)
            return 0;
    }

 timed_out:
    list_del(&waiter.list);
    return -ETIME;

 interrupted:
    list_del(&waiter.list);
    return -EINTR;
}

signed long __sched schedule_timeout(signed long timeout)
{
    struct process_timer timer;
    unsigned long expire;

    switch (timeout)
    {
    case MAX_SCHEDULE_TIMEOUT:
        /*
         * These two special cases are useful to be comfortable
         * in the caller. Nothing more. We could take
         * MAX_SCHEDULE_TIMEOUT from one of the negative value
         * but I' d like to return a valid offset (>=0) to allow
         * the caller to do everything it want with the retval.
         */
        schedule();
        goto out;
    default:
        /*
         * Another bit of PARANOID. Note that the retval will be
         * 0 since no piece of kernel is supposed to do a check
         * for a negative retval of schedule_timeout() (since it
         * should never happens anyway). You just have the printk()
         * that will tell you if something is gone wrong and where.
         */
        if (timeout < 0) {
            printk(KERN_ERR "schedule_timeout: wrong timeout "
                "value %lx\n", timeout);
            dump_stack();
            current->state = TASK_RUNNING;
            goto out;
        }
    }

    expire = timeout + jiffies;

    timer.task = current;
    timer_setup_on_stack(&timer.timer, process_timeout, 0); // set up an on-stack timer; on expiry process_timeout wakes this task
    __mod_timer(&timer.timer, expire, 0);
    schedule(); // give up the CPU and schedule the next entity; see the core-scheduler notes
    del_singleshot_timer_sync(&timer.timer);
    /* Remove the timer from the object tracker */
    destroy_timer_on_stack(&timer.timer); 
    timeout = expire - jiffies;
 out:
    return timeout < 0 ? 0 : timeout;
}
static void process_timeout(struct timer_list *t)
{
    struct process_timer *timeout = from_timer(timeout, t, timer);
    wake_up_process(timeout->task);  // wake the task
}

void up(struct semaphore *sem)
{
    unsigned long flags;
    raw_spin_lock_irqsave(&sem->lock, flags);
    if (likely(list_empty(&sem->wait_list))) // the common case: nobody is waiting for the lock
        sem->count++;
    else
        __up(sem); // wake a waiting task
    raw_spin_unlock_irqrestore(&sem->lock, flags);
}

static noinline void __sched __up(struct semaphore *sem)
{
    struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,
                        struct semaphore_waiter, list);
    list_del(&waiter->list);
    waiter->up = true; // flip the waiter's state
    wake_up_process(waiter->task); // wake the corresponding task
}

With a rough understanding of CPU scheduling the code above is not hard to follow. The function worth focusing on is schedule(): its approximate meaning here is "schedule the next task on this CPU", without putting the current task back on the run queue rq.

wake_up_process deserves a closer look of its own. Later I will also use Java's locks to analyze how user-space locks cooperate with the kernel.

int wake_up_process(struct task_struct *p)
{
    return try_to_wake_up(p, TASK_NORMAL, 0);
}
static int
try_to_wake_up(struct task_struct *p, unsigned int state, int wake_flags)
{
    unsigned long flags;
    int cpu, success = 0;
    /*
     * If we are going to wake up a thread waiting for CONDITION we
     * need to ensure that CONDITION=1 done by the caller can not be
     * reordered with p->state check below. This pairs with mb() in
     * set_current_state() the waiting thread does.
     */
    raw_spin_lock_irqsave(&p->pi_lock, flags);
    smp_mb__after_spinlock();
    if (!(p->state & state))
        goto out;
    trace_sched_waking(p);
    /* We're going to change ->state: */
    success = 1;
    cpu = task_cpu(p);

    /*
     * Ensure we load p->on_rq _after_ p->state, otherwise it would
     * be possible to, falsely, observe p->on_rq == 0 and get stuck
     * in smp_cond_load_acquire() below.
     *
     * sched_ttwu_pending()         try_to_wake_up()
     *   STORE p->on_rq = 1           LOAD p->state
     *   UNLOCK rq->lock
     *
     * __schedule() (switch to task 'p')
     *   LOCK rq->lock            smp_rmb();
     *   smp_mb__after_spinlock();
     *   UNLOCK rq->lock
     *
     * [task p]
     *   STORE p->state = UNINTERRUPTIBLE     LOAD p->on_rq
     *
     * Pairs with the LOCK+smp_mb__after_spinlock() on rq->lock in
     * __schedule().  See the comment for smp_mb__after_spinlock().
     */
    smp_rmb();
    if (p->on_rq && ttwu_remote(p, wake_flags))
        goto stat;

#ifdef CONFIG_SMP
    /*
     * Ensure we load p->on_cpu _after_ p->on_rq, otherwise it would be
     * possible to, falsely, observe p->on_cpu == 0.
     *
     * One must be running (->on_cpu == 1) in order to remove oneself
     * from the runqueue.
     *
     * __schedule() (switch to task 'p')    try_to_wake_up()
     *   STORE p->on_cpu = 1          LOAD p->on_rq
     *   UNLOCK rq->lock
     *
     * __schedule() (put 'p' to sleep)
     *   LOCK rq->lock            smp_rmb();
     *   smp_mb__after_spinlock();
     *   STORE p->on_rq = 0           LOAD p->on_cpu
     *
     * Pairs with the LOCK+smp_mb__after_spinlock() on rq->lock in
     * __schedule().  See the comment for smp_mb__after_spinlock().
     */
    smp_rmb();

    /*
     * If the owning (remote) CPU is still in the middle of schedule() with
     * this task as prev, wait until its done referencing the task.
     *
     * Pairs with the smp_store_release() in finish_task().
     *
     * This ensures that tasks getting woken will be fully ordered against
     * their previous state and preserve Program Order.
     */
    smp_cond_load_acquire(&p->on_cpu, !VAL);

    p->sched_contributes_to_load = !!task_contributes_to_load(p);
    p->state = TASK_WAKING;

    if (p->in_iowait) {
        delayacct_blkio_end(p);
        atomic_dec(&task_rq(p)->nr_iowait);
    }

    cpu = select_task_rq(p, p->wake_cpu, SD_BALANCE_WAKE, wake_flags);
    if (task_cpu(p) != cpu) {
        wake_flags |= WF_MIGRATED;
        psi_ttwu_dequeue(p);
        set_task_cpu(p, cpu);
    }

#else /* CONFIG_SMP */

    if (p->in_iowait) {
        delayacct_blkio_end(p);
        atomic_dec(&task_rq(p)->nr_iowait);
    }

#endif /* CONFIG_SMP */

    ttwu_queue(p, cpu, wake_flags); // put the task on the wakeup queue
stat:
    ttwu_stat(p, cpu, wake_flags);
out:
    raw_spin_unlock_irqrestore(&p->pi_lock, flags);
    return success;
}

ttwu_queue is the main entry point for actually waking the task; its call graph:

(figure: ttwu_queue call graph)

Mutexes (mutex)

A mutex is in essence not very different from a semaphore, and its basic implementation is similar. The defining restriction is that at most one task can hold it at a time (historically expressed as a count that only ever took the values 1, 0 or negative), which permits special optimizations that improve performance; that is its reason to exist. Older kernels always paid a context switch on contention, whereas recent versions first spin-wait optimistically.
The operations involved are mutex_lock and mutex_unlock.
The relevant data structures and code:

struct mutex {
    atomic_long_t       owner; // which task currently holds the mutex, plus flag bits in the low bits
    spinlock_t      wait_lock; // protects wait_list
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
    struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
    struct list_head    wait_list;
};

mutex_lock => __mutex_lock_slowpath => __mutex_lock => __mutex_lock_common
static __always_inline int __sched
__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
            struct lockdep_map *nest_lock, unsigned long ip,
            struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
    struct mutex_waiter waiter;
    bool first = false;
    struct ww_mutex *ww;
    int ret;

    might_sleep(); // documents that this function may sleep; never call it while holding a spinlock
        ...
    preempt_disable();
    mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);
    if (__mutex_trylock(lock) || // one more quick attempt at the lock
        mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, NULL)) {  // optimistic spinning for performance; it bails out if 1. we get preempted or 2. the first waiter differs from the waiter argument (NULL here, i.e. nobody was already queued)
        /* got the lock, yay! */
        lock_acquired(&lock->dep_map, ip);
        if (use_ww_ctx && ww_ctx)
            ww_mutex_set_context_fastpath(ww, ww_ctx);
        preempt_enable();
        return 0;
    }

    spin_lock(&lock->wait_lock);
    /*
     * After waiting to acquire the wait_lock, try again.
     */
    if (__mutex_trylock(lock)) {
        if (use_ww_ctx && ww_ctx)
            __ww_mutex_check_waiters(lock, ww_ctx);
        goto skip_wait;
    }
    ...
    if (!use_ww_ctx) {
        /* add waiting tasks to the end of the waitqueue (FIFO): */
        __mutex_add_waiter(lock, &waiter, &lock->wait_list); // FIFO: append to the tail of the wait queue
#ifdef CONFIG_DEBUG_MUTEXES
        waiter.ww_ctx = MUTEX_POISON_WW_CTX;
#endif
    } else {
        /*
         * Add in stamp order, waking up waiters that must kill
         * themselves.
         */
        ret = __ww_mutex_add_waiter(&waiter, lock, ww_ctx); // append to the wait queue with the extra ww bookkeeping
        if (ret)
            goto err_early_kill;

        waiter.ww_ctx = ww_ctx;
    }

    waiter.task = current;

    set_current_state(state);
    for (;;) {
        /*
         * Once we hold wait_lock, we're serialized against
         * mutex_unlock() handing the lock off to us, do a trylock
         * before testing the error conditions to make sure we pick up
         * the handoff.
         */
        if (__mutex_trylock(lock)) // acquired
            goto acquired;

        /*
         * Check for signals and kill conditions while holding
         * wait_lock. This ensures the lock cancellation is ordered
         * against mutex_unlock() and wake-ups do not go missing.
         */
        if (signal_pending_state(state, current)) { // interrupted by a signal
            ret = -EINTR;
            goto err;
        }
        if (use_ww_ctx && ww_ctx) {
            ret = __ww_mutex_check_kill(lock, &waiter, ww_ctx);
            if (ret) // told to die by the wound/wait protocol
                goto err;
        }
        spin_unlock(&lock->wait_lock);
        schedule_preempt_disabled();
        /*
         * ww_mutex needs to always recheck its position since its waiter
         * list is not FIFO ordered.
         */
        if ((use_ww_ctx && ww_ctx) || !first) {
            first = __mutex_waiter_is_first(lock, &waiter);
            if (first)
                __mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);
        }
        set_current_state(state);
        /*
         * Here we order against unlock; we must either see it change
         * state back to RUNNING and fall through the next schedule(),
         * or we must see its unlock and acquire.
         */
        if (__mutex_trylock(lock) ||
            (first && mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, &waiter))) // spin-wait
            break;

        spin_lock(&lock->wait_lock);
    }
    spin_lock(&lock->wait_lock);
acquired: // lock acquired: restore the task state and drop ourselves from the queue
    __set_current_state(TASK_RUNNING);
    if (use_ww_ctx && ww_ctx) {
        /*
         * Wound-Wait; we stole the lock (!first_waiter), check the
         * waiters as anyone might want to wound us.
         */
        if (!ww_ctx->is_wait_die &&
            !__mutex_waiter_is_first(lock, &waiter))
            __ww_mutex_check_waiters(lock, ww_ctx);
    }
    mutex_remove_waiter(lock, &waiter, current);
    if (likely(list_empty(&lock->wait_list)))
        __mutex_clear_flag(lock, MUTEX_FLAGS);
    debug_mutex_free_waiter(&waiter);
skip_wait: // fast acquisition: the queue and task state were never touched
    /* got the lock - cleanup and rejoice! */
    lock_acquired(&lock->dep_map, ip);
    if (use_ww_ctx && ww_ctx)
        ww_mutex_lock_acquired(ww, ww_ctx);
    spin_unlock(&lock->wait_lock);
    preempt_enable();
    return 0;
err: // interrupted by a signal or similar
    __set_current_state(TASK_RUNNING);
    mutex_remove_waiter(lock, &waiter, current);
err_early_kill: // killed while queueing
    spin_unlock(&lock->wait_lock);
    debug_mutex_free_waiter(&waiter);
    mutex_release(&lock->dep_map, 1, ip);
    preempt_enable();
    return ret;
}

/*
 * Release the lock, slowpath:
 */
static noinline void __sched __mutex_unlock_slowpath(struct mutex *lock, unsigned long ip)
{
    struct task_struct *next = NULL;
    DEFINE_WAKE_Q(wake_q);
    unsigned long owner;
    mutex_release(&lock->dep_map, 1, ip);
    /*
     * Release the lock before (potentially) taking the spinlock such that
     * other contenders can get on with things ASAP.
     *
     * Except when HANDOFF, in that case we must not clear the owner field,
     * but instead set it to the top waiter.
     */
    owner = atomic_long_read(&lock->owner); // read the current owner
    for (;;) {
        unsigned long old;
#ifdef CONFIG_DEBUG_MUTEXES
        DEBUG_LOCKS_WARN_ON(__owner_task(owner) != current);
        DEBUG_LOCKS_WARN_ON(owner & MUTEX_FLAG_PICKUP);
#endif
        if (owner & MUTEX_FLAG_HANDOFF)
            break;
        old = atomic_long_cmpxchg_release(&lock->owner, owner,
                          __owner_flags(owner)); // release the lock, keeping only the flag bits
        if (old == owner) {
            if (owner & MUTEX_FLAG_WAITERS)
                break;
            return;
        }
        owner = old;
    }
    spin_lock(&lock->wait_lock);
    debug_mutex_unlock(lock);
    if (!list_empty(&lock->wait_list)) {
        /* get the first entry from the wait-list: */
        struct mutex_waiter *waiter =
            list_first_entry(&lock->wait_list,
                     struct mutex_waiter, list);
        next = waiter->task;
        debug_mutex_wake_waiter(lock, waiter);
        wake_q_add(&wake_q, next); // add it to the wake queue
    }
    if (owner & MUTEX_FLAG_HANDOFF)
        __mutex_handoff(lock, next); // hand the owner field directly to the next task
    spin_unlock(&lock->wait_lock);
    wake_up_q(&wake_q); // wake the queued task
}

mutex_lock first attempts __mutex_trylock_fast: while the lock is unowned, a single cas installs the current task as owner. __mutex_lock_slowpath is the entry point for the queue handling and ultimately delegates to __mutex_lock_common. In the version I read (5.0.8) contended waiters spin optimistically first, whereas in 2.6.37 they still waited via a context switch; I have not pinned down the version where this changed. The lock operation, then, is an atomic cas that writes the current task_struct address into owner; if the cas succeeds, or owner already holds our own task (a handoff), the lock is ours, otherwise we keep spinning and eventually sleep.

mutex_unlock, like mutex_lock, has a fast and a slow path. It clears the owner field; if the MUTEX_FLAG_HANDOFF flag is set it instead writes the next task directly into owner, and if any task is waiting on the mutex, the first one is woken.

RCU

Read-copy-update trades space for time. It performs very well, but its use comes with restrictions:

  1. The shared resource is mostly read; writes are rare.
  2. Code inside an RCU read-side critical section must not sleep.
  3. The protected resource must be accessed through a pointer.

The principle is simple: readers access the live structure directly, while a writer creates a copy, applies its change to the copy, and then replaces the pointer with the new one; the old copy is reclaimed only once every reader that could still see it has finished with it. RCU is used above all for the kernel's list operations. The implementation details are fiddly while the idea is simple, so I won't walk through the code.

Reader/writer locks

None of the mechanisms above distinguish read access from write access. In many workloads reads far outnumber writes (and in a few, the reverse), so a more flexible mechanism can improve lock performance: the reader/writer lock. The kernel provides reader/writer variants of both the semaphore and the spinlock, called the reader/writer semaphore and the reader/writer spinlock. Their implementations are architecture-specific but similar in spirit, so we read the arm32 sources.

Reader/writer spinlocks

The arm32 reader/writer spinlock code:

typedef struct {
    u32 lock;
} arch_rwlock_t;

// roughly: while (!atomic_cas(&rw->lock, 0, 0x80000000)) ;
static inline void arch_write_lock(arch_rwlock_t *rw)
{
    unsigned long tmp;
    prefetchw(&rw->lock);
    __asm__ __volatile__(
"1: ldrex   %0, [%1]\n" // mark rw->lock for exclusive access, tmp = rw->lock
"   teq %0, #0\n"  // test whether tmp == 0
    WFE("ne") // tmp != 0: readers or a writer hold the lock, enter the WFE state
"   strexeq %0, %2, [%1]\n" // if rw->lock == 0, store rw->lock = 0x80000000; tmp = 0 on success
"   teq %0, #0\n"
"   bne 1b" // tmp != 0: acquisition failed, spin and retry
    : "=&r" (tmp)
    : "r" (&rw->lock), "r" (0x80000000)
    : "cc");
    smp_mb();
}

static inline void arch_write_unlock(arch_rwlock_t *rw)
{
    smp_mb();

    __asm__ __volatile__(
    "str    %1, [%0]\n" // rw->lock = 0
    :
    : "r" (&rw->lock), "r" (0)
    : "cc");
    dsb_sev(); // wake the CPUs waiting in WFE
}
// roughly: spin until atomically incrementing rw->lock yields a non-negative value
static inline void arch_read_lock(arch_rwlock_t *rw)
{
    unsigned long tmp, tmp2;
    prefetchw(&rw->lock);
    __asm__ __volatile__(
"1: ldrex   %0, [%2]\n" // tmp = rw->lock
"   adds    %0, %0, #1\n" // tmp++
"   strexpl %1, %0, [%2]\n" // if tmp is non-negative, store rw->lock = tmp
    WFE("mi") // tmp is negative (write-locked): enter the wait-for-event state
"   rsbpls  %0, %1, #0\n" // check whether the strexpl (tmp non-negative) succeeded
"   bmi 1b" // the strexpl did not succeed, spin and retry
    : "=&r" (tmp), "=&r" (tmp2)
    : "r" (&rw->lock)
    : "cc");
    smp_mb();
}
// roughly: atomically decrement rw->lock; if it drops to 0, wake the WFE waiters
static inline void arch_read_unlock(arch_rwlock_t *rw)
{
    unsigned long tmp, tmp2;
    smp_mb();
    prefetchw(&rw->lock);
    __asm__ __volatile__(
"1: ldrex   %0, [%2]\n" // tmp = rw->lock, marking rw->lock for exclusive access
"   sub %0, %0, #1\n" // tmp--;
"   strex   %1, %0, [%2]\n" // store tmp back, clearing the exclusive marker
"   teq %1, #0\n"
"   bne 1b"  // tmp2 != 0: the strex failed, spin and retry from 1
    : "=&r" (tmp), "=&r" (tmp2)
    : "r" (&rw->lock)
    : "cc");
    if (tmp == 0)
        dsb_sev(); // we were the last reader to leave the critical section; sev wakes the CPU cores in WFE
}

The arm reader/writer spinlock is not far from the plain spinlock. The basic idea:
a single int, lock, encodes the state. Positive means only readers hold it, so a read lock needs no spinning; negative means a writer holds it, so readers must spin in the WFE state.

  1. read lock: atomically increment lock; if the result is positive, return immediately, otherwise spin.
  2. read unlock: atomically decrement lock; when it reaches 0, issue a WFE wakeup (sev).
  3. write lock: if cas(lock, 0, 0x80000000) succeeds the lock was free, return; otherwise other readers or a writer hold it, spin in the WFE state.
  4. write unlock: store lock = 0 to mark the lock free.

Clearly this is an unfair lock and writers are at a disadvantage: as long as lock is nonzero a writer can never get in, even when its request arrived before a reader's, provided readers already held the lock at the time. So next we use a fair lock to introduce the reader/writer semaphore (unfair variants exist too, implemented much like the above).

Reader/writer semaphores

Unlike the reader/writer spinlock, the reader/writer semaphore suspends the waiting task and has it woken later when the lock is unavailable. It too comes in fair and unfair flavors; the unfair one is simple and resembles the reader/writer spinlock above, except that it context-switches via schedule(), so only the fair implementation is covered below.
Straight to the code:

source: include/linux/rwsem-spinlock.h and kernel/locking/rwsem-spinlock.c

struct rw_semaphore {
    __s32           count; // reader count (-1 when write-locked)
    raw_spinlock_t      wait_lock; // protects updates to count and wait_list
    struct list_head    wait_list; // wait queue
#ifdef CONFIG_DEBUG_LOCK_ALLOC
    struct lockdep_map dep_map;
#endif
};

down_write => __down_write => __down_write_common
/*
 * get a write lock on the semaphore
 */
int __sched __down_write_common(struct rw_semaphore *sem, int state)
{
    struct rwsem_waiter waiter;
    unsigned long flags;
    int ret = 0;
    raw_spin_lock_irqsave(&sem->wait_lock, flags);
    /* set up my own style of waitqueue */
    waiter.task = current;
    waiter.type = RWSEM_WAITING_FOR_WRITE;
    list_add_tail(&waiter.list, &sem->wait_list); // enqueue on the wait list
    /* wait for someone to release the lock */
    for (;;) {
        /*
         * That is the key to support write lock stealing: allows the
         * task already on CPU to get the lock soon rather than put
         * itself into sleep and waiting for system woke it or someone
         * else in the head of the wait list up.
         */
        if (sem->count == 0) // lock acquired
            break;
        if (signal_pending_state(state, current))
            goto out_nolock;

        set_current_state(state);
        raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
        schedule(); // suspend the current task and switch to the next
        raw_spin_lock_irqsave(&sem->wait_lock, flags);
    }
    /* got the lock */
    sem->count = -1;
    list_del(&waiter.list);

    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);

    return ret;

out_nolock:
    list_del(&waiter.list);
    if (!list_empty(&sem->wait_list) && sem->count >= 0)
        __rwsem_do_wake(sem, 0);
    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);

    return -EINTR;
}

up_write => __up_write
void __up_write(struct rw_semaphore *sem)
{
    unsigned long flags;

    raw_spin_lock_irqsave(&sem->wait_lock, flags);

    sem->count = 0;
    if (!list_empty(&sem->wait_list))
        sem = __rwsem_do_wake(sem, 1); // take one task off the wait list and wake it
    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
}

down_read => __down_read => __down_read_common
int __sched __down_read_common(struct rw_semaphore *sem, int state)
{
    struct rwsem_waiter waiter;
    unsigned long flags;
    raw_spin_lock_irqsave(&sem->wait_lock, flags);
    if (sem->count >= 0 && list_empty(&sem->wait_list)) { // a read lock is granted only when count is non-negative AND the wait list is empty
        /* granted */
        sem->count++;
        raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
        goto out;
    }
    /* set up my own style of waitqueue */
    waiter.task = current;
    waiter.type = RWSEM_WAITING_FOR_READ;
    get_task_struct(current);
    list_add_tail(&waiter.list, &sem->wait_list); // enqueue on the wait list
    /* wait to be given the lock */
    for (;;) {
        if (!waiter.task) // lock granted (only the write-unlock path reaches this logic; __rwsem_do_wake clears this pointer)
            break;
        if (signal_pending_state(state, current)) // interrupted by a signal
            goto out_nolock;
        set_current_state(state);
        raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
        schedule();
        raw_spin_lock_irqsave(&sem->wait_lock, flags);
    }

    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
 out:
    return 0;

out_nolock:
    /*
     * We didn't take the lock, so that there is a writer, which
     * is owner or the first waiter of the sem. If it's a waiter,
     * it will be woken by current owner. Not need to wake anybody.
     */
    list_del(&waiter.list);
    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
    return -EINTR;
}

up_read => __up_read
/*
 * release a read lock on the semaphore
 */
void __up_read(struct rw_semaphore *sem)
{
    unsigned long flags;
    raw_spin_lock_irqsave(&sem->wait_lock, flags);
    if (--sem->count == 0 && !list_empty(&sem->wait_list))
        sem = __rwsem_wake_one_writer(sem); // we were the last reader: wake one task waiting to write
    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
}

The code shows that the fair implementation differs mainly in the added check on wait_list, and that check is the elegant bit: a reader may take the lock only when count is non-negative AND wait_list is empty. A non-empty wait_list means a writer is already queued, so later readers must line up behind it; in the unfair lock those readers would overtake the writer. In summary:

  1. write lock: if count != 0, join wait_list and give up the CPU until woken; otherwise take the lock (count = -1).
  2. write unlock: set count back to 0 and wake the first task on wait_list.
  3. read lock: if count is non-negative and wait_list is empty, increment count and return; otherwise join wait_list and wait to be woken.
  4. read unlock: decrement count; if it reaches 0, try to wake one waiting writer (if any exists).

Summary

That about covers kernel locking. But how do user-space locks relate to kernel-space locks? Do the implementations of Java's ReentrantLock and ReadWriteLock, for instance, have any connection to the kernel locks?

User-space locks

The kernel can suspend and wake tasks neatly through schedule(), but user space cannot drive scheduling directly. So how does a user-space lock suspend a thread while it waits? Let's explore with that question in mind.
pthread should be familiar: the POSIX threads standard defines a large set of thread-related interfaces covering threads, locks, priorities, signals and other resources, and user-space locks interact with the kernel through this layer.
The lock-related functions include:
pthread_spin_lock, spinlock
pthread_mutex_lock/unlock, mutex
pthread_rwlock_rdlock, read lock
pthread_rwlock_timedrdlock, read lock with timeout
pthread_rwlock_wrlock, write lock
pthread_rwlock_timedwrlock, write lock with timeout
pthread_rwlock_unlock, release a read/write lock
pthread_cond_wait, wait on a condition variable
pthread_cond_signal, signal a condition variable
pthread_cond_broadcast, signal every waiting thread
and more. On Linux these are implemented in glibc; the curious can browse https://github.com/bminor/glibc

Among these:
pthread_spin_lock is essentially a while loop around a cas operation.

pthread_mutex_lock and the pthread_rwlock functions are implemented on top of the futex system call (see http://man7.org/linux/man-pages/man2/futex.2.html). The basic idea: first try to take the lock in user space with a cas; on success return immediately, on failure enter the kernel via futex and run mutex_lock-style wait logic there. The read/write-lock logic is glibc's own, along much the same lines as the kernel's.

pthread_cond_wait/signal combine the signal and locking machinery: a thread queue maintained inside pthread_cond_t handles the communication and waiting. I haven't worked through the details; it is rather involved and, compared with the kernel code, genuinely hard to read. The interested can look at pthread_cond_wait and pthread_cond_signal.

Java's locks are a JNI layer wrapped over pthread. For example:
Object wait/notify/notifyAll correspond to pthread_cond_wait/pthread_cond_signal.
LockSupport park and unpark are a safer mechanism than Object wait/notify; underneath they are also implemented with pthread_cond_wait/pthread_cond_signal.
The C++ implementations behind both Object and LockSupport rest on the OS-specific os::PlatformEvent::park() and os::PlatformEvent::unpark(), and on Linux those two functions are realized with the same two calls, pthread_cond_wait and pthread_cond_signal.

ReentrantLock and ReadWriteLock correspond to the pthread_mutex and pthread_rwlock families of operations.

Related reading: the java 8 hotspot park implementation and the java8 hotspot monitor implementation.
