操作系統(tǒng)知識(shí)點(diǎn)總結(jié)--eventfd實(shí)現(xiàn)線程事件通知機(jī)制

什么是eventfd

eventfd是Linux 2.6提供的一種系統(tǒng)調(diào)用,它可以用來(lái)實(shí)現(xiàn)事件通知。eventfd包含一個(gè)由內(nèi)核維護(hù)的64位無(wú)符號(hào)整型計(jì)數(shù)器,創(chuàng)建eventfd時(shí)會(huì)返回一個(gè)文件描述符,進(jìn)程/線程可以通過(guò)對(duì)這個(gè)文件描述符進(jìn)行read/write來(lái)讀取/改變計(jì)數(shù)器的值,從而實(shí)現(xiàn)進(jìn)程/線程間通信。

eventfd

eventfd - 事件通知文件描述符
eventfd系統(tǒng)函數(shù) - 創(chuàng)建一個(gè)能被用戶應(yīng)用程序用于時(shí)間等待喚醒機(jī)制的eventfd對(duì)象。
eventfd 單純的使用文件描述符實(shí)現(xiàn)的線程間的通知機(jī)制,可以很好的融入select、poll、epoll的I/O復(fù)用機(jī)制中。

創(chuàng)建eventfd

#include <sys/eventfd.h>
int eventfd(unsigned int initval ,int flags );
參數(shù)意義:
initval:

創(chuàng)建eventfd時(shí)它所對(duì)應(yīng)的64位計(jì)數(shù)器的初始值;

flags:

eventfd文件描述符的標(biāo)志,可由三種選項(xiàng)組成:EFD_CLOEXEC、EFD_NONBLOCK和EFD_SEMAPHORE。

  1. EFD_CLOEXEC:表示返回的eventfd文件描述符在fork后exec其他程序時(shí)會(huì)自動(dòng)關(guān)閉這個(gè)文件描述符;
  2. EFD_NONBLOCK:設(shè)置返回的eventfd非阻塞;
  3. EFD_SEMAPHORE: 表示將eventfd作為一個(gè)信號(hào)量來(lái)使用。

讀eventfd read(2)

1.read函數(shù)會(huì)從eventfd對(duì)應(yīng)的64位計(jì)數(shù)器中讀取一個(gè)8字節(jié)的整型變量;
2.read函數(shù)設(shè)置的接收buf的大小不能低于8個(gè)字節(jié),否則read函數(shù)會(huì)出錯(cuò),errno為EINVAL;
3.read函數(shù)返回的值是按小端字節(jié)序的;
4.如果eventfd設(shè)置了EFD_SEMAPHORE,那么每次read就會(huì)返回1,并且讓eventfd對(duì)應(yīng)的計(jì)數(shù)器減一;如果eventfd沒(méi)有設(shè)置EFD_SEMAPHORE,那么每次read就會(huì)直接返回計(jì)數(shù)器中的數(shù)值,read之后計(jì)數(shù)器就會(huì)置0。不管是哪一種,當(dāng)計(jì)數(shù)器為0時(shí),如果繼續(xù)read,那么read就會(huì)阻塞(如果eventfd沒(méi)有設(shè)置EFD_NONBLOCK)或者返回EAGAIN錯(cuò)誤(如果eventfd設(shè)置了EFD_NONBLOCK)。

寫(xiě)eventfd write(2)

1.在沒(méi)有設(shè)置EFD_SEMAPHORE的情況下,write函數(shù)會(huì)將發(fā)送buf中的數(shù)據(jù)寫(xiě)入到eventfd對(duì)應(yīng)的計(jì)數(shù)器中,最大只能寫(xiě)入0xffffffffffffffff,否則返回EINVAL錯(cuò)誤;
2.在設(shè)置了EFD_SEMAPHORE的情況下,write函數(shù)相當(dāng)于是向計(jì)數(shù)器中進(jìn)行“添加”,比如說(shuō)計(jì)數(shù)器中的值原本是2,如果write了一個(gè)3,那么計(jì)數(shù)器中的值就變成了5。如果某一次write后,計(jì)數(shù)器中的值超過(guò)了0xfffffffffffffffe(64位最大值-1),那么write就會(huì)阻塞直到另一個(gè)進(jìn)程/線程從eventfd中進(jìn)行了read(如果write沒(méi)有設(shè)置EFD_NONBLOCK),或者返回EAGAIN錯(cuò)誤(如果write設(shè)置了EFD_NONBLOCK)。

除此之外,eventfd還支持select和poll,與一般的讀寫(xiě)描述符相類(lèi)似。

EventLoop對(duì)eventfd的封裝

所增加的接口及成員:

typedef std::function<void()> Functor;
    //如果用戶在當(dāng)前IO線程調(diào)用這個(gè)函數(shù), 回調(diào)會(huì)同步進(jìn)行; 如果用戶在其他線程調(diào)用runInLoop(),cb會(huì)被加入隊(duì)列, IO線程會(huì)被喚醒來(lái)調(diào)用這個(gè)Functor。
    void runInLoop(const Functor& cb);

    //寫(xiě)已注冊(cè)到poll的eventfd 通知poll 處理讀事件。
    void wakeup(); //是寫(xiě)m_wakeupFd 通知poll 處理讀事件。

    //會(huì)將回調(diào)添加到容器,同時(shí)通過(guò)wakeup()喚醒poll()調(diào)用容器內(nèi)的回調(diào)。
    void queueInLoop(const Functor& cb);

private:
    //used to waked up

    //poll回調(diào)讀事件,處理eventfd
    void handleRead();

    //處理掛起的事件
    void doPendingFunctors();


    int m_wakeupFd;
    std::unique_ptr<Channel> p_wakeupChannel;
    mutable MutexLock m_mutex;
    bool m_callingPendingFunctors; /* atomic */
    std::vector<Functor> m_pendingFunctors; // @GuardedBy mutex_

工作時(shí)序:
(runInLoop() -> quueInLoop())/queueInLoop() -> wakeup() -> poll() -> handleRead() -> doPendingFunctors()

runInLoop()
void EventLoop::runInLoop(const Functor&  cb)
{
  if(isInloopThread())
    cb();
  else
    queueInLoop(cb);
}
  1. EventLoop大多數(shù)時(shí)間都在運(yùn)行l(wèi)oop中的循環(huán),也就是說(shuō)想要在本線程內(nèi)執(zhí)行runInLoop只有一種可能,那就是在調(diào)用handleEvents的時(shí)候,對(duì)應(yīng)的文件描述符調(diào)用了runInLoop,此時(shí)處于LoopThread內(nèi),于是直接執(zhí)行cb。
  2. 再來(lái)看由其他線程調(diào)用runInLoop的情況,首先會(huì)直接進(jìn)入queueInLoop分支,把cb加入到待處理的回調(diào)函數(shù)數(shù)組里。因?yàn)槲覀円孖O線程及時(shí)處理我們的cb,我們勢(shì)必要wakeupIO線程,讓他從poll中出來(lái),所以我們執(zhí)行wakeup。
    至于另一種情況,我們先來(lái)假設(shè)在IO線程中也會(huì)調(diào)用queueInLoop,那么這種調(diào)用會(huì)在兩個(gè)地方出現(xiàn),第一是handleEvent,我們此時(shí)無(wú)需wakeup,因?yàn)镮O線程處理完所有的channel之后便會(huì)執(zhí)行doPendingFunctors,處理我們加入的cb。第二種情況是在執(zhí)行doPendingFunctors的時(shí)候,執(zhí)行functorsi時(shí)執(zhí)行到了queueInLoop,那我們?yōu)榱俗孖O線程在執(zhí)行完所有的functors后立即處理我們這個(gè)時(shí)候加入的cb,還要喚醒他,也就是說(shuō)提前喚醒,等到下次IO運(yùn)行到poll直接就返回了,如果不這樣我們的cb可能會(huì)被晚處理一段時(shí)間。
void EventLoop::runInLoop(Functor&&cb)
{
    if(isInLoopThread())
        cb();
    else
        queueInLoop(std::move(cb));
}
 
void EventLoop::queueInLoop(Functor&&cb)
{
    {
        MutexLockGuard lock(mutex_);
        pendingFunctors_.emplace_back(std::move(cb));
    }
    if(!isInLoopThread()||callingPendingFunctors_)
        wakeup();
}
void EventLoop::loop()
{
    assert(!looping_);
    assert(isInLoopThread());
    looping_=true;
    quit_=false;
    std::vector<SP_Channel>ret;
    while(!quit_)
    {
        ret.clear();
        ret=poller_->poll();
        eventHandlding_=true;
        for(auto & it :ret)
            it->handleEvents();
        eventHandling_=false;
        doPendingFunctors();
        poller_->handleExpired();
    }
    looping_=false;
}
void EventLoop::doPendingFunctors()
{
    std::vector<Functor>functors;
    callingPendingFunctors_=true;
 
    {
        MutexLockGuard lock(mutex_);
        functors.swap(pendingFunctors_);
    }
    for(size_t i=0;i<functors.size();++i)
        functors[i]();
    callingPendingFunctors_=false;
}

參考鏈接:
Linux進(jìn)程間通信——eventfd
eventfd實(shí)現(xiàn)線程事件通知機(jī)制
關(guān)于muduo庫(kù)中EventLoop的runInLoop功能
muduo庫(kù)發(fā)送數(shù)據(jù)簡(jiǎn)述
muduo網(wǎng)絡(luò)庫(kù)學(xué)習(xí)(四)事件驅(qū)動(dòng)循環(huán)EventLoop
muduo net庫(kù)學(xué)習(xí)筆記4——事件驅(qū)動(dòng)循環(huán)EventLoop、runInLoop和queueInLoop及對(duì)應(yīng)喚醒

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容