runInLoop相關(guān)
在之前得文章中提到了EventLoop::runInLoop(),該函數(shù)用于在EventLoop的IO線程執(zhí)行某個用戶的任務(wù)回調(diào),源碼如下:
void EventLoop::runInLoop(const Functor& cb)
{
if (isInLoopThread()) { //判斷是否在當(dāng)前IO線程
cb(); //同步調(diào)用
} else {
queueInLoop(cb); //加入隊列
}
}
若用戶在其他線程調(diào)用runInLoop(),則執(zhí)行queueInLoop(),其實現(xiàn)如下:
void EventLoop::queueInLoop(const Functor& cb)
{
{
MutexLockGuard lock(mutex_);
pendingFunctors_.push_back(cb);
}
if (!isInLoopThread() || callingPendingFunctors_) {
wakeup();
}
}
該函數(shù)將cb添加到隊列pendingFunctors_中,并喚醒IO線程。
1. 隊列中的回調(diào)是如何觸發(fā)的
在EventLoop::loop()的事件循環(huán)中,通過doPendingFunctors()來執(zhí)行隊列中的任務(wù)回調(diào),實現(xiàn)如下:
void EventLoop::loop()
{
//...
while (!quit_) {
//...
doPendingFunctors();
}
//...
}
這里可能會有疑問,IO線程會阻塞在事件循環(huán)EventLoop::loop()的poll調(diào)用中,因此通過loop()來觸發(fā)用戶回調(diào),如果EventLoop中一直沒有事件觸發(fā),那poll會一直阻塞,從而導(dǎo)致用戶回調(diào)一直無法執(zhí)行。
所以為了及時觸發(fā)用戶回調(diào),我們需要去喚醒IO線程。
2. 喚醒的實現(xiàn)
書中提到,傳統(tǒng)的做法是使用pipe(2),讓IO線程監(jiān)視此管道的可讀事件。在需要喚醒時,往管道寫入一個字節(jié)來觸發(fā)喚醒。
muduo中使用了eventfd(2)來實現(xiàn)IO線程的喚醒。其不必管理緩沖區(qū),可以更高效地喚醒。
在EventLoop構(gòu)造時,創(chuàng)建eventfd并注冊可讀事件,將事件分發(fā)至EventLoop::handleRead()。
和傳統(tǒng)做法一樣,wakeup()的實現(xiàn)就是對eventfd進(jìn)行寫操作,從而觸發(fā)可讀事件達(dá)到喚醒IO線程的目的。
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
//...
}
3. doPendingFunctors的實現(xiàn)
上文提到在EventLoop::loop()事件循環(huán)最后,觸發(fā)隊列中的任務(wù)回調(diào),其實現(xiàn)如下:
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}
for (const Functor& functor : functors)
{
functor();
}
callingPendingFunctors_ = false;
}
這段代碼有兩處需要注意的:
鎖的范圍
doPendingFunctors()沒有直接在臨界區(qū)內(nèi)依次調(diào)用任務(wù)回調(diào),而且sawp()到局部變量中(減小了臨界區(qū)的長度)。否則,鎖會一直等到所有回調(diào)函數(shù)處理完才釋放,阻塞其他線程調(diào)用queueInLoop()。callingPendingFunctors_
代碼中使用callingPendingFunctors_來標(biāo)記是否在執(zhí)行doPendingFunctors()過程中,目的是為了queueInLoop()來判斷喚醒的時機。
4. 喚醒的時機
在queueInLoop()中,是否需要weakup()進(jìn)行了這樣的判斷:
if (!isInLoopThread() || callingPendingFunctors_) {
wakeup();
}
即,當(dāng)調(diào)用queueInLoop()的線程不是IO線程;以及在IO線程調(diào)用queueInLoop(),但此時正在執(zhí)行doPendingFunctors()過程中,才需要喚醒。
因為,當(dāng)queueInLoop()在IO線程中調(diào)用,doPendingFunctors()就會在EventLoop::loop()事件循環(huán)的最后被調(diào)用,所以此時無須喚醒。
TcpServer
創(chuàng)建TcpServer對象,在其構(gòu)造時通過Acceptor來獲得新連接的fd。在Acceptor的構(gòu)造函數(shù)中調(diào)用了[1]socket()和[2]bind()。
啟動函數(shù)TcpServer::start()通過Acceptor::listen()(以runInLoop的方式)調(diào)用[3]listen(),并注冊了可讀事件。
當(dāng)新連接請求時,觸發(fā)可讀事件,在其回調(diào)函數(shù)Acceptor::handleRead()中調(diào)用了[4]accept()并回調(diào)了TcpServer::newConnection(),其實現(xiàn)如下:
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
loop_->assertInLoopThread();
EventLoop* ioLoop = threadPool_->getNextLoop();
char buf[64];
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;
string connName = name_ + buf;
InetAddress localAddr(sockets::getLocalAddr(sockfd));
TcpConnectionPtr conn(
new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));
connections_[connName] = conn;
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1));
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
函數(shù)中創(chuàng)建了TcpConnection對象,把它加入ConnectionMap中管理并設(shè)置了相關(guān)回調(diào)。
注:以上的 [1] [2] [3] [4] 完成了一次完整的連接。
TcpConnection
TcpConnection是muduo中唯一默認(rèn)使用智能指針管理的類,也是唯一繼承enable_shared_from_this的類。有關(guān)enable_shared_from_this可以參考share_ptr相關(guān)。
TcpConnection作用就是使用Channel來獲得socket上的IO事件,執(zhí)行各種回調(diào)。相關(guān)事件的處理,后面的文章再具體介紹。