Introduction
In a multithreaded (or multiprocess) concurrency model, several threads operating on the same data at the same time cause race conditions and, with them, thread-safety problems. Locks are the main tool for restoring thread safety; the kernel's arsenal includes atomic operations (atomic), spinlocks (spin_lock), semaphores (semaphore), mutexes (mutex), read/write locks (rw_lock), and more.
Critical sections
A critical section is a piece of code that accesses a shared resource (for example, a shared device or shared memory) which must not be accessed by more than one user at a time.
If one process is interrupted partway through such code and another process happens to enter the same region and operate on the same data, the data ends up inconsistent and the processes misbehave. The simplest countermeasure is to disable interrupts, but that is error-prone, and on a multicore system disabling interrupts on one CPU does not stop other CPUs from running the same code. We therefore need mechanisms that guarantee at most one process is inside the critical section at a time; this property is called mutual exclusion.
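As a concrete illustration: `count++` is really a load, an add, and a store, and two threads interleaving those three steps lose updates. Here is a minimal user-space sketch with pthreads (the names are mine, not kernel code) that makes the load-add-store sequence a critical section:

```c
#include <pthread.h>

/* Two threads increment a shared counter; the mutex turns the
 * load-add-store sequence into a critical section (illustrative sketch). */
static long counter;
static pthread_mutex_t counter_lock = PTHREAD_MUTEX_INITIALIZER;

static void *worker(void *arg)
{
    for (int i = 0; i < 100000; i++) {
        pthread_mutex_lock(&counter_lock);   /* enter the critical section */
        counter++;                           /* load, add, store */
        pthread_mutex_unlock(&counter_lock); /* leave the critical section */
    }
    return NULL;
}

static long run_two_workers(void)
{
    pthread_t a, b;

    counter = 0;
    pthread_create(&a, NULL, worker, NULL);
    pthread_create(&b, NULL, worker, NULL);
    pthread_join(a, NULL);
    pthread_join(b, NULL);
    return counter;  /* always 200000 with the lock; unpredictable without it */
}
```

Remove the two mutex calls and the final value becomes unpredictable, usually well below 200000.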
Atomic operations
atomic is the simplest locking primitive. It guarantees that trivial operations, such as incrementing a counter by 1, execute without interruption. On a single CPU this is easy to achieve and easy to understand: disabling interrupts suffices. On an SMP system, a special lock mechanism is needed to keep the operation exclusive across CPUs, which is somewhat harder to follow; those lock instructions sit close to the hardware, so we will not dig into them here. Let's briefly look at the atomic data structure and its int operations. (The operations are architecture-specific, because different architectures provide different lock instructions.)
typedef struct {
    int counter;
} atomic_t;

// x86
static inline void atomic_inc(atomic_t *v)
{
    asm volatile(LOCK_PREFIX "incl %0"
                 : "+m" (v->counter));
}
#ifdef CONFIG_SMP
#define LOCK_PREFIX_HERE \
".pushsection .smp_locks,\"a\"\n" \
".balign 4\n" \
".long 671f - .\n" /* offset */ \
".popsection\n" \
"671:"
#define LOCK_PREFIX LOCK_PREFIX_HERE "\n\tlock; "
#else /* ! CONFIG_SMP */
#define LOCK_PREFIX_HERE ""
#define LOCK_PREFIX ""
#endif
static __always_inline int arch_atomic_read(const atomic_t *v)
{
    /*
     * Note for KASAN: we deliberately don't use READ_ONCE_NOCHECK() here,
     * it's non-inlined function that increases binary size and stack usage.
     */
    return READ_ONCE((v)->counter);
}
#define __READ_ONCE_SIZE \
({ \
switch (size) { \
case 1: *(__u8 *)res = *(volatile __u8 *)p; break; \
case 2: *(__u16 *)res = *(volatile __u16 *)p; break; \
case 4: *(__u32 *)res = *(volatile __u32 *)p; break; \
case 8: *(__u64 *)res = *(volatile __u64 *)p; break; \
default: \
barrier(); \
__builtin_memcpy((void *)res, (const void *)p, size); \
barrier(); \
} \
})
atomic's data structure is really just an int. The read operation is a single volatile load (see the memory-barrier notes; since only a single access is involved, nothing more is needed), while inc is implemented by combining the lock prefix with the inc instruction. Together these give an increment that stays atomic even on SMP.
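The same pair of operations can be sketched in user space with C11 atomics. This is an analogy with names of my own choosing, not the kernel code: on x86, `atomic_fetch_add` compiles down to a lock-prefixed instruction, playing the role of LOCK_PREFIX "incl" above.

```c
#include <stdatomic.h>

/* User-space analogue of the kernel's atomic_t (a sketch, not kernel code). */
typedef struct {
    atomic_int counter;
} my_atomic_t;

static inline void my_atomic_inc(my_atomic_t *v)
{
    /* On x86 this compiles to a lock-prefixed add, like LOCK_PREFIX "incl". */
    atomic_fetch_add_explicit(&v->counter, 1, memory_order_relaxed);
}

static inline int my_atomic_read(my_atomic_t *v)
{
    /* Equivalent in spirit to READ_ONCE(): one single, untorn load. */
    return atomic_load_explicit(&v->counter, memory_order_relaxed);
}
```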
Spinlocks
A spinlock protects critical sections that execute quickly, so the wait for the lock is short; rather than giving up the CPU, a waiter busy-loops until it acquires the lock and then runs the critical section. Code protected by a spinlock must run to completion once the lock is held (it must not be interrupted by another process, i.e. it behaves atomically); if the critical section were cut off halfway, the protected state would be left inconsistent. The spinlock therefore disables preemption to avoid being interrupted. The caller must also ensure the critical section never sleeps voluntarily (sleeping would let someone else run and break the atomicity), and spinlocks are not reentrant.
The relevant data structures and code (hard-to-read debug code omitted):
typedef struct spinlock {
    union {
        struct raw_spinlock rlock;
        ... // debug-related fields
    };
} spinlock_t;

typedef struct raw_spinlock {
    arch_spinlock_t raw_lock;
    ... // debug-related fields
} raw_spinlock_t;
// arm
typedef struct {
    union {
        u32 slock;
        struct __raw_tickets {
#ifdef __ARMEB__
            u16 next;
            u16 owner;
#else
            u16 owner;
            u16 next;
#endif
        } tickets;
    };
} arch_spinlock_t;
spin_lock => raw_spin_lock => _raw_spin_lock => __raw_spin_lock => do_raw_spin_lock => arch_spin_lock
static inline void arch_spin_lock(arch_spinlock_t *lock)
{
    unsigned long tmp;
    u32 newval;
    arch_spinlock_t lockval;

    prefetchw(&lock->slock);
    __asm__ __volatile__(       // this asm atomically performs lockval = lock->tickets.next++
"1: ldrex   %0, [%3]\n"         // mark lock->slock for exclusive access, lockval = lock->slock
"   add     %1, %0, %4\n"       // newval = lockval + (1 << 16), i.e. next++
"   strex   %2, %1, [%3]\n"     // lock->slock = newval; on success tmp = 0 and the exclusive mark is cleared
"   teq     %2, #0\n"           // tmp == 0 ?
"   bne     1b"                 // tmp != 0: the strex failed, spin and retry
    : "=&r" (lockval), "=&r" (newval), "=&r" (tmp)
    : "r" (&lock->slock), "I" (1 << TICKET_SHIFT)
    : "cc");

    while (lockval.tickets.next != lockval.tickets.owner) { // the lock is ours once our ticket (lockval.tickets.next) equals lock->tickets.owner
        wfe();
        lockval.tickets.owner = READ_ONCE(lock->tickets.owner);
    }
    smp_mb();
}
static inline void arch_spin_unlock(arch_spinlock_t *lock)
{
    smp_mb();
    lock->tickets.owner++;  // release the lock
    dsb_sev();
}
arm's spinlock logic is fairly simple, so we use it to walk through the spinlock implementation. arm's arch_spinlock_t maintains two extra fields, owner and next, to decide whose turn it is: owner records the ticket currently holding the lock, and next is the queue number handed out to the next arrival. On lock, a CPU atomically increments next and keeps the old value as its own ticket, call it localnext; on unlock, owner is incremented. While owner != localnext, the lock is held by another CPU and we wait for it to be released; once owner equals localnext, this CPU is the lock's owner.
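The ticket scheme translates directly into C11 atomics. This is an illustrative user-space sketch with names of my own (plain spinning stands in for the kernel's wfe), not the arm assembly above:

```c
#include <stdatomic.h>

/* User-space sketch of a ticket spinlock (illustrative, not kernel code). */
typedef struct {
    atomic_ushort next;   /* next ticket to hand out */
    atomic_ushort owner;  /* ticket currently being served */
} ticket_lock_t;

static void ticket_lock(ticket_lock_t *l)
{
    /* Atomically take a ticket: me = next++ (the ldrex/strex loop above). */
    unsigned short me = atomic_fetch_add(&l->next, 1);

    /* Spin (the kernel adds wfe here) until our ticket is being served. */
    while (atomic_load_explicit(&l->owner, memory_order_acquire) != me)
        ;
}

static void ticket_unlock(ticket_lock_t *l)
{
    /* owner++ hands the lock to the holder of the next ticket. */
    atomic_fetch_add_explicit(&l->owner, 1, memory_order_release);
}
```

Because tickets are served in order, this variant is fair: CPUs acquire the lock in arrival order, unlike a plain test-and-set spinlock.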
Semaphores
A semaphore is essentially just a protected counter; in the textbook formulation it may go negative, and for mutual exclusion it is initialized to 1. Two standard operations are defined on it, up and down. On entering the critical section a process executes down, which decrements the value; if no count is available, the process is appended to a wait queue, marked TASK_UNINTERRUPTIBLE or TASK_INTERRUPTIBLE, and blocks until it is woken and rescheduled. When a process holding the semaphore leaves the critical section it executes up, which increments the value and picks one process from the wait queue to wake.
The data structures and code:
struct semaphore {
    raw_spinlock_t lock;         // spinlock protecting updates to count and wait_list
    unsigned int count;          // the semaphore value
    struct list_head wait_list;  // wait queue
};

void down(struct semaphore *sem)
{
    unsigned long flags;

    raw_spin_lock_irqsave(&sem->lock, flags);
    if (likely(sem->count > 0))  // the common case: a count is available
        sem->count--;
    else
        __down(sem);             // no count left: block the current process
    raw_spin_unlock_irqrestore(&sem->lock, flags);
}
static noinline void __sched __down(struct semaphore *sem)
{
    __down_common(sem, TASK_UNINTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
}

static inline int __sched __down_common(struct semaphore *sem, long state,
                                        long timeout)
{
    struct semaphore_waiter waiter;

    list_add_tail(&waiter.list, &sem->wait_list);  // join the wait queue
    waiter.task = current;
    waiter.up = false;

    for (;;) {
        if (signal_pending_state(state, current))  // woken by a signal
            goto interrupted;
        if (unlikely(timeout <= 0))
            goto timed_out;
        __set_current_state(state);                // e.g. TASK_UNINTERRUPTIBLE
        raw_spin_unlock_irq(&sem->lock);
        timeout = schedule_timeout(timeout);       // arm a timeout timer and call schedule() to give up the CPU; we run again when woken or when the timer fires (I haven't dug into the timer code yet)
        raw_spin_lock_irq(&sem->lock);
        if (waiter.up)
            return 0;
    }

 timed_out:
    list_del(&waiter.list);
    return -ETIME;

 interrupted:
    list_del(&waiter.list);
    return -EINTR;
}
signed long __sched schedule_timeout(signed long timeout)
{
    struct process_timer timer;
    unsigned long expire;

    switch (timeout)
    {
    case MAX_SCHEDULE_TIMEOUT:
        /*
         * These two special cases are useful to be comfortable
         * in the caller. Nothing more. We could take
         * MAX_SCHEDULE_TIMEOUT from one of the negative value
         * but I'd like to return a valid offset (>=0) to allow
         * the caller to do everything it want with the retval.
         */
        schedule();
        goto out;
    default:
        /*
         * Another bit of PARANOID. Note that the retval will be
         * 0 since no piece of kernel is supposed to do a check
         * for a negative retval of schedule_timeout() (since it
         * should never happens anyway). You just have the printk()
         * that will tell you if something is gone wrong and where.
         */
        if (timeout < 0) {
            printk(KERN_ERR "schedule_timeout: wrong timeout "
                   "value %lx\n", timeout);
            dump_stack();
            current->state = TASK_RUNNING;
            goto out;
        }
    }

    expire = timeout + jiffies;

    timer.task = current;
    timer_setup_on_stack(&timer.timer, process_timeout, 0);  // set up an on-stack timer; when it fires, process_timeout wakes this task
    __mod_timer(&timer.timer, expire, 0);
    schedule();  // give up the CPU and run the next entity; see the core-scheduler notes
    del_singleshot_timer_sync(&timer.timer);

    /* Remove the timer from the object tracker */
    destroy_timer_on_stack(&timer.timer);

    timeout = expire - jiffies;

 out:
    return timeout < 0 ? 0 : timeout;
}
static void process_timeout(struct timer_list *t)
{
    struct process_timer *timeout = from_timer(timeout, t, timer);

    wake_up_process(timeout->task);  // wake the task
}
void up(struct semaphore *sem)
{
    unsigned long flags;

    raw_spin_lock_irqsave(&sem->lock, flags);
    if (likely(list_empty(&sem->wait_list)))  // the common case: nobody is waiting for the lock
        sem->count++;
    else
        __up(sem);                            // hand the count to a waiter and wake it
    raw_spin_unlock_irqrestore(&sem->lock, flags);
}

static noinline void __sched __up(struct semaphore *sem)
{
    struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,
                                                       struct semaphore_waiter, list);
    list_del(&waiter->list);
    waiter->up = true;              // mark the waiter as granted
    wake_up_process(waiter->task);  // wake the corresponding task
}
With a rough understanding of CPU scheduling, the code above is not hard to follow. The function to focus on is schedule(), whose semantics here are simply "pick the next task for this CPU"; it does not put the current task back on the runqueue rq.
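The down/up logic can be mirrored in user space with pthreads. In this hedged sketch (my names, not kernel code), the condition variable plays the role of wait_list plus wake_up_process, and the pthread mutex stands in for sem->lock:

```c
#include <pthread.h>

/* Sketch of the semaphore's down/up logic using pthreads (illustrative). */
struct my_sem {
    pthread_mutex_t lock;  /* plays the role of sem->lock */
    pthread_cond_t  wait;  /* plays the role of wait_list + wake_up_process */
    unsigned int    count;
};

static void my_down(struct my_sem *s)
{
    pthread_mutex_lock(&s->lock);
    while (s->count == 0)              /* no count available: block (the __down path) */
        pthread_cond_wait(&s->wait, &s->lock);
    s->count--;
    pthread_mutex_unlock(&s->lock);
}

static void my_up(struct my_sem *s)
{
    pthread_mutex_lock(&s->lock);
    s->count++;
    pthread_cond_signal(&s->wait);     /* wake one waiter, like __up() */
    pthread_mutex_unlock(&s->lock);
}
```

Note how pthread_cond_wait atomically drops the lock and sleeps, just as __down_common unlocks sem->lock before calling schedule_timeout and reacquires it afterward.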
wake_up_process deserves a closer look on its own. Later I will also use Java's locks to examine how user-space locks cooperate with the kernel.
int wake_up_process(struct task_struct *p)
{
return try_to_wake_up(p, TASK_NORMAL, 0);
}
static int
try_to_wake_up(struct task_struct *p, unsigned int state, int wake_flags)
{
unsigned long flags;
int cpu, success = 0;
/*
* If we are going to wake up a thread waiting for CONDITION we
* need to ensure that CONDITION=1 done by the caller can not be
* reordered with p->state check below. This pairs with mb() in
* set_current_state() the waiting thread does.
*/
raw_spin_lock_irqsave(&p->pi_lock, flags);
smp_mb__after_spinlock();
if (!(p->state & state))
goto out;
trace_sched_waking(p);
/* We're going to change ->state: */
success = 1;
cpu = task_cpu(p);
/*
* Ensure we load p->on_rq _after_ p->state, otherwise it would
* be possible to, falsely, observe p->on_rq == 0 and get stuck
* in smp_cond_load_acquire() below.
*
* sched_ttwu_pending() try_to_wake_up()
* STORE p->on_rq = 1 LOAD p->state
* UNLOCK rq->lock
*
* __schedule() (switch to task 'p')
* LOCK rq->lock smp_rmb();
* smp_mb__after_spinlock();
* UNLOCK rq->lock
*
* [task p]
* STORE p->state = UNINTERRUPTIBLE LOAD p->on_rq
*
* Pairs with the LOCK+smp_mb__after_spinlock() on rq->lock in
* __schedule(). See the comment for smp_mb__after_spinlock().
*/
smp_rmb();
if (p->on_rq && ttwu_remote(p, wake_flags))
goto stat;
#ifdef CONFIG_SMP
/*
* Ensure we load p->on_cpu _after_ p->on_rq, otherwise it would be
* possible to, falsely, observe p->on_cpu == 0.
*
* One must be running (->on_cpu == 1) in order to remove oneself
* from the runqueue.
*
* __schedule() (switch to task 'p') try_to_wake_up()
* STORE p->on_cpu = 1 LOAD p->on_rq
* UNLOCK rq->lock
*
* __schedule() (put 'p' to sleep)
* LOCK rq->lock smp_rmb();
* smp_mb__after_spinlock();
* STORE p->on_rq = 0 LOAD p->on_cpu
*
* Pairs with the LOCK+smp_mb__after_spinlock() on rq->lock in
* __schedule(). See the comment for smp_mb__after_spinlock().
*/
smp_rmb();
/*
* If the owning (remote) CPU is still in the middle of schedule() with
* this task as prev, wait until its done referencing the task.
*
* Pairs with the smp_store_release() in finish_task().
*
* This ensures that tasks getting woken will be fully ordered against
* their previous state and preserve Program Order.
*/
smp_cond_load_acquire(&p->on_cpu, !VAL);
p->sched_contributes_to_load = !!task_contributes_to_load(p);
p->state = TASK_WAKING;
if (p->in_iowait) {
delayacct_blkio_end(p);
atomic_dec(&task_rq(p)->nr_iowait);
}
cpu = select_task_rq(p, p->wake_cpu, SD_BALANCE_WAKE, wake_flags);
if (task_cpu(p) != cpu) {
wake_flags |= WF_MIGRATED;
psi_ttwu_dequeue(p);
set_task_cpu(p, cpu);
}
#else /* CONFIG_SMP */
if (p->in_iowait) {
delayacct_blkio_end(p);
atomic_dec(&task_rq(p)->nr_iowait);
}
#endif /* CONFIG_SMP */
ttwu_queue(p, cpu, wake_flags); // put the task on the wake queue
stat:
ttwu_stat(p, cpu, wake_flags);
out:
raw_spin_unlock_irqrestore(&p->pi_lock, flags);
return success;
}
ttwu_queue is the main entry point for waking a task; the call relationships are as follows:

Mutex
A mutex is essentially not that different from a semaphore, and the basic implementation is much the same; the distinguishing restriction is that a mutex has only the two states locked and unlocked (at most one holder), which opens the door to special optimizations for performance, and that is its reason to exist. Older kernels always context-switched while waiting for a mutex, whereas recent versions spin-wait first.
The operations involved are mutex_lock and mutex_unlock.
The data structures and code:
struct mutex {
    atomic_long_t owner;        // which task currently owns the mutex
    spinlock_t wait_lock;       // protects owner and wait_list
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
    struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
    struct list_head wait_list;
};
mutex_lock => __mutex_lock_slowpath => __mutex_lock => __mutex_lock_common
static __always_inline int __sched
__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
                    struct lockdep_map *nest_lock, unsigned long ip,
                    struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
    struct mutex_waiter waiter;
    bool first = false;
    struct ww_mutex *ww;
    int ret;

    might_sleep();  // documents that this function may sleep: never call it while holding a spinlock

    ...

    preempt_disable();
    mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);

    if (__mutex_trylock(lock) ||  // try once more to take the lock quickly
        mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, NULL)) {  // then spin for it optimistically, for performance; the spin bails out if 1. we get preempted, or 2. the first waiter differs from the last argument (NULL here, i.e. it only spins if nobody was already waiting)
        /* got the lock, yay! */
        lock_acquired(&lock->dep_map, ip);
        if (use_ww_ctx && ww_ctx)
            ww_mutex_set_context_fastpath(ww, ww_ctx);
        preempt_enable();
        return 0;
    }

    spin_lock(&lock->wait_lock);
    /*
     * After waiting to acquire the wait_lock, try again.
     */
    if (__mutex_trylock(lock)) {
        if (use_ww_ctx && ww_ctx)
            __ww_mutex_check_waiters(lock, ww_ctx);
        goto skip_wait;
    }

    ...

    if (!use_ww_ctx) {
        /* add waiting tasks to the end of the waitqueue (FIFO): */
        __mutex_add_waiter(lock, &waiter, &lock->wait_list);  // FIFO: append to the tail of the wait queue
#ifdef CONFIG_DEBUG_MUTEXES
        waiter.ww_ctx = MUTEX_POISON_WW_CTX;
#endif
    } else {
        /*
         * Add in stamp order, waking up waiters that must kill
         * themselves.
         */
        ret = __ww_mutex_add_waiter(&waiter, lock, ww_ctx);  // enqueue (in stamp order) and do some extra bookkeeping
        if (ret)
            goto err_early_kill;

        waiter.ww_ctx = ww_ctx;
    }

    waiter.task = current;

    set_current_state(state);
    for (;;) {
        /*
         * Once we hold wait_lock, we're serialized against
         * mutex_unlock() handing the lock off to us, do a trylock
         * before testing the error conditions to make sure we pick up
         * the handoff.
         */
        if (__mutex_trylock(lock))  // acquired
            goto acquired;

        /*
         * Check for signals and kill conditions while holding
         * wait_lock. This ensures the lock cancellation is ordered
         * against mutex_unlock() and wake-ups do not go missing.
         */
        if (signal_pending_state(state, current)) {  // interrupted by a signal
            ret = -EINTR;
            goto err;
        }

        if (use_ww_ctx && ww_ctx) {
            ret = __ww_mutex_check_kill(lock, &waiter, ww_ctx);
            if (ret)  // told to back off and die (wound/die protocol)
                goto err;
        }

        spin_unlock(&lock->wait_lock);
        schedule_preempt_disabled();

        /*
         * ww_mutex needs to always recheck its position since its waiter
         * list is not FIFO ordered.
         */
        if ((use_ww_ctx && ww_ctx) || !first) {
            first = __mutex_waiter_is_first(lock, &waiter);
            if (first)
                __mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);
        }

        set_current_state(state);
        /*
         * Here we order against unlock; we must either see it change
         * state back to RUNNING and fall through the next schedule(),
         * or we must see its unlock and acquire.
         */
        if (__mutex_trylock(lock) ||
            (first && mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, &waiter)))  // spin-wait
            break;

        spin_lock(&lock->wait_lock);
    }
    spin_lock(&lock->wait_lock);
acquired:  // lock acquired: set our state and clean up the queue bookkeeping
    __set_current_state(TASK_RUNNING);

    if (use_ww_ctx && ww_ctx) {
        /*
         * Wound-Wait; we stole the lock (!first_waiter), check the
         * waiters as anyone might want to wound us.
         */
        if (!ww_ctx->is_wait_die &&
            !__mutex_waiter_is_first(lock, &waiter))
            __ww_mutex_check_waiters(lock, ww_ctx);
    }

    mutex_remove_waiter(lock, &waiter, current);
    if (likely(list_empty(&lock->wait_list)))
        __mutex_clear_flag(lock, MUTEX_FLAGS);

    debug_mutex_free_waiter(&waiter);

skip_wait:  // fast acquisition succeeded without touching the queue or task state
    /* got the lock - cleanup and rejoice! */
    lock_acquired(&lock->dep_map, ip);

    if (use_ww_ctx && ww_ctx)
        ww_mutex_lock_acquired(ww, ww_ctx);

    spin_unlock(&lock->wait_lock);
    preempt_enable();
    return 0;

err:  // interrupted or killed
    __set_current_state(TASK_RUNNING);
    mutex_remove_waiter(lock, &waiter, current);
err_early_kill:  // killed before we finished queueing
    spin_unlock(&lock->wait_lock);
    debug_mutex_free_waiter(&waiter);
    mutex_release(&lock->dep_map, 1, ip);
    preempt_enable();
    return ret;
}
/*
 * Release the lock, slowpath:
 */
static noinline void __sched __mutex_unlock_slowpath(struct mutex *lock, unsigned long ip)
{
    struct task_struct *next = NULL;
    DEFINE_WAKE_Q(wake_q);
    unsigned long owner;

    mutex_release(&lock->dep_map, 1, ip);

    /*
     * Release the lock before (potentially) taking the spinlock such that
     * other contenders can get on with things ASAP.
     *
     * Except when HANDOFF, in that case we must not clear the owner field,
     * but instead set it to the top waiter.
     */
    owner = atomic_long_read(&lock->owner);  // read the current owner
    for (;;) {
        unsigned long old;

#ifdef CONFIG_DEBUG_MUTEXES
        DEBUG_LOCKS_WARN_ON(__owner_task(owner) != current);
        DEBUG_LOCKS_WARN_ON(owner & MUTEX_FLAG_PICKUP);
#endif

        if (owner & MUTEX_FLAG_HANDOFF)
            break;

        old = atomic_long_cmpxchg_release(&lock->owner, owner,
                                          __owner_flags(owner));  // clear the task bits: the lock is released
        if (old == owner) {
            if (owner & MUTEX_FLAG_WAITERS)
                break;

            return;
        }

        owner = old;
    }

    spin_lock(&lock->wait_lock);
    debug_mutex_unlock(lock);
    if (!list_empty(&lock->wait_list)) {
        /* get the first entry from the wait-list: */
        struct mutex_waiter *waiter =
            list_first_entry(&lock->wait_list,
                             struct mutex_waiter, list);

        next = waiter->task;

        debug_mutex_wake_waiter(lock, waiter);
        wake_q_add(&wake_q, next);  // add it to the wake queue
    }

    if (owner & MUTEX_FLAG_HANDOFF)
        __mutex_handoff(lock, next);  // hand ownership directly to the next task

    spin_unlock(&lock->wait_lock);

    wake_up_q(&wake_q);  // wake the queued task
}
mutex_lock first tries __mutex_trylock_fast: when the lock is free, a cas sets owner to the current task. __mutex_lock_slowpath is the entry point for the queueing logic and ultimately delegates to __mutex_lock_common. In the version I read (5.0.8) waiters spin for the lock, whereas in 2.6.37 they still waited via a context switch; I did not track down exactly which version changed this. Taking the lock means atomically cas-ing owner to the current task_struct's address; if that succeeds, or owner already holds our own value, the lock is ours, otherwise we keep spin-waiting.
mutex_unlock, like mutex_lock, has both a fast and a slow path. It clears owner; if the MUTEX_FLAG_HANDOFF flag is set, it instead writes the next task directly into owner, and if any task is waiting on the mutex, it wakes one.
RCU
Read-Copy-Update is a synchronization mechanism that trades space for time. It performs very well, but its use comes with restrictions:
- The shared resource is read most of the time and written rarely.
- Code inside an RCU-protected region must not sleep, even in the kernel.
- The protected resource must be reached through a pointer.
The principle is simple: readers access the live copy directly, while an updater creates a copy, modifies the copy, and, once all readers have finished with the old version, swings the pointer over to the new one. RCU is used mainly in the kernel's list operations. The implementation details are fiddly while the idea is simple, so we will not go through them in depth.
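The core idea, readers follow a pointer while an updater publishes a fresh copy, can be sketched with an atomic pointer swap. This sketch (my names) deliberately glosses over the hard part of real RCU, namely tracking the grace period before the old copy may be freed:

```c
#include <stdatomic.h>
#include <stdlib.h>
#include <string.h>

/* Minimal sketch of RCU-style update: readers dereference a shared pointer;
 * an updater copies, modifies the copy, then publishes it atomically.
 * Grace-period tracking (when the old copy may be freed) is omitted. */
struct config { int value; };

static _Atomic(struct config *) current_cfg;

static int read_value(void)
{
    /* Read side: one atomic pointer load, then plain reads of the snapshot. */
    struct config *c = atomic_load_explicit(&current_cfg, memory_order_acquire);
    return c->value;
}

static struct config *update_value(int v)
{
    /* Write side: copy, update the copy, publish with a release store. */
    struct config *old = atomic_load(&current_cfg);
    struct config *fresh = malloc(sizeof(*fresh));

    memcpy(fresh, old, sizeof(*fresh));
    fresh->value = v;
    atomic_store_explicit(&current_cfg, fresh, memory_order_release);
    return old;  /* caller frees it once no reader can still be using it */
}
```

Readers never block and never write shared state, which is why the read side is so cheap; all the cost lands on the updater.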
Read/write locks
None of the mechanisms above distinguishes read access from write access. In many workloads reads vastly outnumber writes (and in a few, the reverse), so a more flexible mechanism can improve lock performance; hence the read/write lock. The kernel provides reader/writer versions of both the semaphore and the spinlock, called reader/writer semaphores and reader/writer spinlocks. Their implementations are architecture-specific but broadly similar in spirit; we will read the arm32 source.
Read/write spinlocks
The arm32 reader/writer spinlock code:
typedef struct {
    u32 lock;
} arch_rwlock_t;

// roughly equivalent to: while (!atomic_cas(rw->lock, 0, 0x80000000)) ;
static inline void arch_write_lock(arch_rwlock_t *rw)
{
    unsigned long tmp;

    prefetchw(&rw->lock);
    __asm__ __volatile__(
"1: ldrex   %0, [%1]\n"       // mark rw->lock for exclusive access, tmp = rw->lock
"   teq     %0, #0\n"         // is tmp zero?
    WFE("ne")                 // tmp != 0: readers or a writer hold the lock, enter WFE
"   strexeq %0, %2, [%1]\n"   // if rw->lock == 0, do rw->lock = 0x80000000; tmp = 0 on success
"   teq     %0, #0\n"
"   bne     1b"               // tmp != 0: we failed to take the lock, spin and retry
    : "=&r" (tmp)
    : "r" (&rw->lock), "r" (0x80000000)
    : "cc");

    smp_mb();
}

static inline void arch_write_unlock(arch_rwlock_t *rw)
{
    smp_mb();
    __asm__ __volatile__(
    "str %1, [%0]\n"          // rw->lock = 0
    :
    : "r" (&rw->lock), "r" (0)
    : "cc");

    dsb_sev();                // wake the CPUs sitting in WFE
}
// roughly equivalent to: while (atomic_load(rw->lock) + 1 < 0) ; then rw->lock++
static inline void arch_read_lock(arch_rwlock_t *rw)
{
    unsigned long tmp, tmp2;

    prefetchw(&rw->lock);
    __asm__ __volatile__(
"1: ldrex   %0, [%2]\n"       // tmp = rw->lock
"   adds    %0, %0, #1\n"     // tmp++
"   strexpl %1, %0, [%2]\n"   // if tmp is non-negative, rw->lock = tmp
    WFE("mi")                 // tmp is negative (a writer holds the lock): wait for event
"   rsbpls  %0, %1, #0\n"     // check whether strexpl succeeded (tmp non-negative)
"   bmi     1b"               // strexpl failed, spin and retry
    : "=&r" (tmp), "=&r" (tmp2)
    : "r" (&rw->lock)
    : "cc");

    smp_mb();
}

// roughly equivalent to: atomic decrement of rw->lock, waking writers at zero
static inline void arch_read_unlock(arch_rwlock_t *rw)
{
    unsigned long tmp, tmp2;

    smp_mb();
    prefetchw(&rw->lock);
    __asm__ __volatile__(
"1: ldrex   %0, [%2]\n"       // mark rw->lock for exclusive access, tmp = rw->lock
"   sub     %0, %0, #1\n"     // tmp--
"   strex   %1, %0, [%2]\n"   // rw->lock = tmp, clearing the exclusive mark
"   teq     %1, #0\n"
"   bne     1b"               // tmp2 != 0: the strex failed, retry from 1
    : "=&r" (tmp), "=&r" (tmp2)
    : "r" (&rw->lock)
    : "cc");

    if (tmp == 0)
        dsb_sev();            // we were the last reader to leave the critical section: sev wakes the cores in WFE
}
arm's reader/writer spinlock is close to the plain spinlock. The basic idea:
A single int, lock, records the state. A positive value means only readers hold it, so a read lock need not spin; a negative value (the 0x80000000 bit) means a writer holds it, so read lockers must enter the WFE spin state.
- read lock: atomically increment lock; if the value is positive, return immediately, otherwise spin-wait.
- read unlock: atomically decrement lock; if it reaches 0, issue one WFE wakeup.
- write lock: if cas(lock, 0, 0x80000000) succeeds, the lock was free, return; otherwise other readers or a writer hold it, enter the WFE spin state.
- write unlock: set lock = 0, meaning unowned.
Clearly this is an unfair lock that puts writers at a disadvantage: as long as lock is nonzero, a writer can never get in, even when a write request arrived before a read request, whenever readers already held the lock at the time. So next we use a fair lock to introduce the reader/writer semaphore (unfair variants exist too, implemented much like the above).
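The counter protocol above maps onto a small C11 sketch; plain spinning replaces WFE, and the names are mine (illustrative only, not the arm code):

```c
#include <stdatomic.h>

#define WRITER_BIT 0x80000000u

/* Sketch of the arm rwlock protocol: a positive value is the reader count,
 * WRITER_BIT means a writer holds the lock (illustrative, no WFE). */
typedef struct {
    atomic_uint lock;
} my_rwlock_t;

static void my_read_lock(my_rwlock_t *rw)
{
    for (;;) {
        unsigned v = atomic_load(&rw->lock);

        if (v & WRITER_BIT)   /* a writer holds it ("negative" value): spin */
            continue;
        if (atomic_compare_exchange_weak(&rw->lock, &v, v + 1))
            return;           /* reader count incremented */
    }
}

static void my_read_unlock(my_rwlock_t *rw)
{
    atomic_fetch_sub_explicit(&rw->lock, 1, memory_order_release);
}

static void my_write_lock(my_rwlock_t *rw)
{
    unsigned expected = 0;

    /* cas(lock, 0, WRITER_BIT): only succeeds when the lock is free. */
    while (!atomic_compare_exchange_weak(&rw->lock, &expected, WRITER_BIT))
        expected = 0;
}

static void my_write_unlock(my_rwlock_t *rw)
{
    atomic_store_explicit(&rw->lock, 0, memory_order_release);
}
```

The unfairness is visible right in my_write_lock: its cas only ever succeeds against 0, so a steady stream of readers starves it indefinitely.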
Read/write semaphores
Unlike the reader/writer spinlock, a reader/writer semaphore suspends the task and waits to be woken when the lock is unavailable. It, too, comes in fair and unfair flavors; the unfair one is simple and, apart from the schedule-based context switch, close to the reader/writer spinlock above, so below we only cover the fair implementation.
Straight to the code:
source: include/linux/rwsem-spinlock.h && kernel/locking/rwsem-spinlock.c
struct rw_semaphore {
    __s32 count;                 // number of active readers, or -1 while a writer holds it
    raw_spinlock_t wait_lock;    // protects modifications of count and wait_list
    struct list_head wait_list;  // queue of blocked waiters
#ifdef CONFIG_DEBUG_LOCK_ALLOC
    struct lockdep_map dep_map;
#endif
};
down_write => __down_write => __down_write_common
/*
 * get a write lock on the semaphore
 */
int __sched __down_write_common(struct rw_semaphore *sem, int state)
{
    struct rwsem_waiter waiter;
    unsigned long flags;
    int ret = 0;

    raw_spin_lock_irqsave(&sem->wait_lock, flags);

    /* set up my own style of waitqueue */
    waiter.task = current;
    waiter.type = RWSEM_WAITING_FOR_WRITE;
    list_add_tail(&waiter.list, &sem->wait_list);  // join the wait queue

    /* wait for someone to release the lock */
    for (;;) {
        /*
         * That is the key to support write lock stealing: allows the
         * task already on CPU to get the lock soon rather than put
         * itself into sleep and waiting for system woke it or someone
         * else in the head of the wait list up.
         */
        if (sem->count == 0)  // the lock is free: take it
            break;
        if (signal_pending_state(state, current))
            goto out_nolock;

        set_current_state(state);
        raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
        schedule();  // suspend this task and switch to the next one
        raw_spin_lock_irqsave(&sem->wait_lock, flags);
    }
    /* got the lock */
    sem->count = -1;
    list_del(&waiter.list);

    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);

    return ret;

out_nolock:
    list_del(&waiter.list);
    if (!list_empty(&sem->wait_list) && sem->count >= 0)
        __rwsem_do_wake(sem, 0);
    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);

    return -EINTR;
}
up_write => __up_write
void __up_write(struct rw_semaphore *sem)
{
    unsigned long flags;

    raw_spin_lock_irqsave(&sem->wait_lock, flags);
    sem->count = 0;
    if (!list_empty(&sem->wait_list))
        sem = __rwsem_do_wake(sem, 1);  // pick one task off sem's wait queue and put it on the wake queue
    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
}
down_read => __down_read => __down_read_common
int __sched __down_read_common(struct rw_semaphore *sem, int state)
{
    struct rwsem_waiter waiter;
    unsigned long flags;

    raw_spin_lock_irqsave(&sem->wait_lock, flags);

    if (sem->count >= 0 && list_empty(&sem->wait_list)) {  // a read lock is granted only when count is non-negative AND wait_list is empty
        /* granted */
        sem->count++;
        raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
        goto out;
    }

    /* set up my own style of waitqueue */
    waiter.task = current;
    waiter.type = RWSEM_WAITING_FOR_READ;
    get_task_struct(current);

    list_add_tail(&waiter.list, &sem->wait_list);  // join the wait queue

    /* wait to be given the lock */
    for (;;) {
        if (!waiter.task)  // lock granted (only the writer-release path reaches this logic; __rwsem_do_wake clears this pointer)
            break;
        if (signal_pending_state(state, current))  // interrupted by a signal
            goto out_nolock;
        set_current_state(state);
        raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
        schedule();
        raw_spin_lock_irqsave(&sem->wait_lock, flags);
    }

    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
out:
    return 0;

out_nolock:
    /*
     * We didn't take the lock, so that there is a writer, which
     * is owner or the first waiter of the sem. If it's a waiter,
     * it will be woken by current owner. Not need to wake anybody.
     */
    list_del(&waiter.list);
    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
    return -EINTR;
}
up_read => __up_read
/*
 * release a read lock on the semaphore
 */
void __up_read(struct rw_semaphore *sem)
{
    unsigned long flags;

    raw_spin_lock_irqsave(&sem->wait_lock, flags);
    if (--sem->count == 0 && !list_empty(&sem->wait_list))
        sem = __rwsem_wake_one_writer(sem);  // we were the last reader: wake one task waiting for the write lock
    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
}
The code shows that the fair variant mainly adds a check on wait_list, and this is the neat part of the kernel's implementation: a reader may take the lock only if count is non-negative AND wait_list is empty. A non-empty wait_list means a writer is already queued, so later readers must line up behind that write request; in the unfair variant those readers would jump ahead of it. Summarizing the four operations:
- write lock: if count != 0, join wait_list, give up the CPU, and wait to be woken; otherwise take the lock.
- write unlock: set count to 0 and wake the first task in the wait queue.
- read lock: if count is non-negative and wait_list is empty, take the lock and increment count; otherwise queue up and wait to be woken.
- read unlock: decrement count; if it reaches 0, wake one task waiting for the write lock (if any).
Summary
That more or less wraps up kernel locking. But what is the relationship between user-space locks and kernel locks? For example, do Java's ReentrantLock and ReadWriteLock have anything to do with kernel locks?
User-space locks
The kernel can park and wake tasks neatly through schedule, but user space cannot drive scheduling directly. So how does a user-space lock suspend a thread while waiting? Let's explore with that question in mind.
pthreads should be familiar (see the link for details). The standard defines a large set of thread-related interfaces covering threads, locks, priorities, signals, and other resources, and user-space locks talk to the kernel through this standard.
The lock-related functions mainly include:
pthread_spin_lock, spinlock
pthread_mutex_lock/unlock, mutex
pthread_rwlock_rdlock, read lock
pthread_rwlock_timedrdlock, read lock with timeout
pthread_rwlock_wrlock, write lock
pthread_rwlock_timedwrlock, write lock with timeout
pthread_rwlock_unlock, release a read/write lock
pthread_cond_wait, wait on a condition variable
pthread_cond_signal, signal a condition variable
pthread_cond_broadcast, signal every thread waiting on a condition variable
and so on. On Linux these are all implemented in glibc; the curious can browse https://github.com/bminor/glibc
Among them:
pthread_spin_lock is a while loop around a cas operation.
pthread_mutex_lock and the pthread_rwlock family are actually built on the futex system call (see http://man7.org/linux/man-pages/man2/futex.2.html). The basic idea: first try to take the lock in user space with a cas; on success return immediately, on failure enter the kernel and run mutex_lock-style logic. The read/write-lock logic is implemented in glibc itself, along much the same lines as the kernel's.
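That two-phase idea can be sketched directly. The following is a simplified version of the lock from Drepper's "Futexes Are Tricky", Linux-only and with my own names; it is an illustration of the principle, not glibc's actual implementation. States: 0 = free, 1 = locked, 2 = locked with (possible) waiters.

```c
#include <linux/futex.h>
#include <stdatomic.h>
#include <sys/syscall.h>
#include <unistd.h>

/* Sketch of the futex idea behind pthread_mutex_lock (simplified from
 * Drepper's "Futexes Are Tricky"; illustrative, not the glibc code).
 * Lock word: 0 = free, 1 = locked, 2 = locked and contended. */
static atomic_int lockword;

static void futex_lock(atomic_int *w)
{
    int c = 0;

    /* Fast path: uncontended cas in user space, no syscall at all. */
    if (atomic_compare_exchange_strong(w, &c, 1))
        return;

    /* Slow path: mark the lock contended and sleep in the kernel. */
    if (c != 2)
        c = atomic_exchange(w, 2);
    while (c != 0) {
        /* Sleeps only while *w is still 2; returns when woken. */
        syscall(SYS_futex, w, FUTEX_WAIT, 2, NULL, NULL, 0);
        c = atomic_exchange(w, 2);
    }
}

static void futex_unlock(atomic_int *w)
{
    /* Only enter the kernel if someone may actually be sleeping. */
    if (atomic_exchange(w, 0) == 2)
        syscall(SYS_futex, w, FUTEX_WAKE, 1, NULL, NULL, 0);
}
```

The key property matches the description above: an uncontended lock/unlock pair is pure user-space atomics, and the futex syscall is paid only under contention.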
pthread_cond_wait/signal combine the signaling and locking machinery, maintaining a queue of threads inside pthread_cond_t for the waiting and notification. I have not worked through exactly how; it is rather involved, and compared with the kernel code it is genuinely hard to read 0.0. If you want to, start from pthread_cond_wait and pthread_cond_signal.
Java's locks are in turn a JNI layer wrapped around pthreads. For example:
Object wait/notify/notifyAll correspond to pthread_cond_wait/pthread_cond_signal.
LockSupport park and unpark are a safer mechanism than Object wait/notify, and their foundation is also pthread_cond_wait/pthread_cond_signal.
The C++ implementations of both Object and LockSupport rest on the OS-specific os::PlatformEvent::park() and os::PlatformEvent::unpark(), which on Linux are completed with the two operations above, pthread_cond_wait and pthread_cond_signal.
ReentrantLock and ReadWriteLock correspond to the pthread_mutex and pthread_rwlock operations.
For more, see java 8 hotspot park.