什么是eventfd
eventfd是Linux 2.6提供的一種系統(tǒng)調(diào)用,它可以用來(lái)實(shí)現(xiàn)事件通知。eventfd包含一個(gè)由內(nèi)核維護(hù)的64位無(wú)符號(hào)整型計(jì)數(shù)器,創(chuàng)建eventfd時(shí)會(huì)返回一個(gè)文件描述符,進(jìn)程/線程可以通過(guò)對(duì)這個(gè)文件描述符進(jìn)行read/write來(lái)讀取/改變計(jì)數(shù)器的值,從而實(shí)現(xiàn)進(jìn)程/線程間通信。
eventfd
eventfd - 事件通知文件描述符
eventfd系統(tǒng)函數(shù) - 創(chuàng)建一個(gè)能被用戶應(yīng)用程序用于時(shí)間等待喚醒機(jī)制的eventfd對(duì)象。
eventfd 單純的使用文件描述符實(shí)現(xiàn)的線程間的通知機(jī)制,可以很好的融入select、poll、epoll的I/O復(fù)用機(jī)制中。
創(chuàng)建eventfd
#include <sys/eventfd.h>
int eventfd(unsigned int initval ,int flags );
參數(shù)意義:
initval:
創(chuàng)建eventfd時(shí)它所對(duì)應(yīng)的64位計(jì)數(shù)器的初始值;
flags:
eventfd文件描述符的標(biāo)志,可由三種選項(xiàng)組成:EFD_CLOEXEC、EFD_NONBLOCK和EFD_SEMAPHORE。
- EFD_CLOEXEC:表示返回的eventfd文件描述符在fork后exec其他程序時(shí)會(huì)自動(dòng)關(guān)閉這個(gè)文件描述符;
- EFD_NONBLOCK:設(shè)置返回的eventfd非阻塞;
- EFD_SEMAPHORE: 表示將eventfd作為一個(gè)信號(hào)量來(lái)使用。
讀eventfd read(2)
1.read函數(shù)會(huì)從eventfd對(duì)應(yīng)的64位計(jì)數(shù)器中讀取一個(gè)8字節(jié)的整型變量;
2.read函數(shù)設(shè)置的接收buf的大小不能低于8個(gè)字節(jié),否則read函數(shù)會(huì)出錯(cuò),errno為EINVAL;
3.read函數(shù)返回的值是按小端字節(jié)序的;
4.如果eventfd設(shè)置了EFD_SEMAPHORE,那么每次read就會(huì)返回1,并且讓eventfd對(duì)應(yīng)的計(jì)數(shù)器減一;如果eventfd沒(méi)有設(shè)置EFD_SEMAPHORE,那么每次read就會(huì)直接返回計(jì)數(shù)器中的數(shù)值,read之后計(jì)數(shù)器就會(huì)置0。不管是哪一種,當(dāng)計(jì)數(shù)器為0時(shí),如果繼續(xù)read,那么read就會(huì)阻塞(如果eventfd沒(méi)有設(shè)置EFD_NONBLOCK)或者返回EAGAIN錯(cuò)誤(如果eventfd設(shè)置了EFD_NONBLOCK)。
寫(xiě)eventfd write(2)
1.在沒(méi)有設(shè)置EFD_SEMAPHORE的情況下,write函數(shù)會(huì)將發(fā)送buf中的數(shù)據(jù)寫(xiě)入到eventfd對(duì)應(yīng)的計(jì)數(shù)器中,最大只能寫(xiě)入0xffffffffffffffff,否則返回EINVAL錯(cuò)誤;
2.在設(shè)置了EFD_SEMAPHORE的情況下,write函數(shù)相當(dāng)于是向計(jì)數(shù)器中進(jìn)行“添加”,比如說(shuō)計(jì)數(shù)器中的值原本是2,如果write了一個(gè)3,那么計(jì)數(shù)器中的值就變成了5。如果某一次write后,計(jì)數(shù)器中的值超過(guò)了0xfffffffffffffffe(64位最大值-1),那么write就會(huì)阻塞直到另一個(gè)進(jìn)程/線程從eventfd中進(jìn)行了read(如果write沒(méi)有設(shè)置EFD_NONBLOCK),或者返回EAGAIN錯(cuò)誤(如果write設(shè)置了EFD_NONBLOCK)。
除此之外,eventfd還支持select和poll,與一般的讀寫(xiě)描述符相類(lèi)似。
EventLoop對(duì)eventfd的封裝
所增加的接口及成員:
typedef std::function<void()> Functor;
//如果用戶在當(dāng)前IO線程調(diào)用這個(gè)函數(shù), 回調(diào)會(huì)同步進(jìn)行; 如果用戶在其他線程調(diào)用runInLoop(),cb會(huì)被加入隊(duì)列, IO線程會(huì)被喚醒來(lái)調(diào)用這個(gè)Functor。
void runInLoop(const Functor& cb);
//寫(xiě)已注冊(cè)到poll的eventfd 通知poll 處理讀事件。
void wakeup(); //是寫(xiě)m_wakeupFd 通知poll 處理讀事件。
//會(huì)將回調(diào)添加到容器,同時(shí)通過(guò)wakeup()喚醒poll()調(diào)用容器內(nèi)的回調(diào)。
void queueInLoop(const Functor& cb);
private:
//used to waked up
//poll回調(diào)讀事件,處理eventfd
void handleRead();
//處理掛起的事件
void doPendingFunctors();
int m_wakeupFd;
std::unique_ptr<Channel> p_wakeupChannel;
mutable MutexLock m_mutex;
bool m_callingPendingFunctors; /* atomic */
std::vector<Functor> m_pendingFunctors; // @GuardedBy mutex_
工作時(shí)序:
(runInLoop() -> quueInLoop())/queueInLoop() -> wakeup() -> poll() -> handleRead() -> doPendingFunctors()
runInLoop()
void EventLoop::runInLoop(const Functor& cb)
{
if(isInloopThread())
cb();
else
queueInLoop(cb);
}
- EventLoop大多數(shù)時(shí)間都在運(yùn)行l(wèi)oop中的循環(huán),也就是說(shuō)想要在本線程內(nèi)執(zhí)行runInLoop只有一種可能,那就是在調(diào)用handleEvents的時(shí)候,對(duì)應(yīng)的文件描述符調(diào)用了runInLoop,此時(shí)處于LoopThread內(nèi),于是直接執(zhí)行cb。
- 再來(lái)看由其他線程調(diào)用runInLoop的情況,首先會(huì)直接進(jìn)入queueInLoop分支,把cb加入到待處理的回調(diào)函數(shù)數(shù)組里。因?yàn)槲覀円孖O線程及時(shí)處理我們的cb,我們勢(shì)必要wakeupIO線程,讓他從poll中出來(lái),所以我們執(zhí)行wakeup。
至于另一種情況,我們先來(lái)假設(shè)在IO線程中也會(huì)調(diào)用queueInLoop,那么這種調(diào)用會(huì)在兩個(gè)地方出現(xiàn),第一是handleEvent,我們此時(shí)無(wú)需wakeup,因?yàn)镮O線程處理完所有的channel之后便會(huì)執(zhí)行doPendingFunctors,處理我們加入的cb。第二種情況是在執(zhí)行doPendingFunctors的時(shí)候,執(zhí)行functorsi時(shí)執(zhí)行到了queueInLoop,那我們?yōu)榱俗孖O線程在執(zhí)行完所有的functors后立即處理我們這個(gè)時(shí)候加入的cb,還要喚醒他,也就是說(shuō)提前喚醒,等到下次IO運(yùn)行到poll直接就返回了,如果不這樣我們的cb可能會(huì)被晚處理一段時(shí)間。
void EventLoop::runInLoop(Functor&&cb)
{
if(isInLoopThread())
cb();
else
queueInLoop(std::move(cb));
}
void EventLoop::queueInLoop(Functor&&cb)
{
{
MutexLockGuard lock(mutex_);
pendingFunctors_.emplace_back(std::move(cb));
}
if(!isInLoopThread()||callingPendingFunctors_)
wakeup();
}
void EventLoop::loop()
{
assert(!looping_);
assert(isInLoopThread());
looping_=true;
quit_=false;
std::vector<SP_Channel>ret;
while(!quit_)
{
ret.clear();
ret=poller_->poll();
eventHandlding_=true;
for(auto & it :ret)
it->handleEvents();
eventHandling_=false;
doPendingFunctors();
poller_->handleExpired();
}
looping_=false;
}
void EventLoop::doPendingFunctors()
{
std::vector<Functor>functors;
callingPendingFunctors_=true;
{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}
for(size_t i=0;i<functors.size();++i)
functors[i]();
callingPendingFunctors_=false;
}
參考鏈接:
Linux進(jìn)程間通信——eventfd
eventfd實(shí)現(xiàn)線程事件通知機(jī)制
關(guān)于muduo庫(kù)中EventLoop的runInLoop功能
muduo庫(kù)發(fā)送數(shù)據(jù)簡(jiǎn)述
muduo網(wǎng)絡(luò)庫(kù)學(xué)習(xí)(四)事件驅(qū)動(dòng)循環(huán)EventLoop
muduo net庫(kù)學(xué)習(xí)筆記4——事件驅(qū)動(dòng)循環(huán)EventLoop、runInLoop和queueInLoop及對(duì)應(yīng)喚醒