muduo源碼學(xué)習(xí)(四) 實現(xiàn)TCP網(wǎng)絡(luò)庫(中)

runInLoop相關(guān)

在之前得文章中提到了EventLoop::runInLoop(),該函數(shù)用于在EventLoop的IO線程執(zhí)行某個用戶的任務(wù)回調(diào),源碼如下:

void EventLoop::runInLoop(const Functor& cb)
{
    if (isInLoopThread()) { //判斷是否在當(dāng)前IO線程
        cb(); //同步調(diào)用
    } else {
        queueInLoop(cb); //加入隊列
    }
}

若用戶在其他線程調(diào)用runInLoop(),則執(zhí)行queueInLoop(),其實現(xiàn)如下:

void EventLoop::queueInLoop(const Functor& cb)
{
    {
    MutexLockGuard lock(mutex_);
    pendingFunctors_.push_back(cb);
    }

    if (!isInLoopThread() || callingPendingFunctors_) {
        wakeup();
    }
}

該函數(shù)將cb添加到隊列pendingFunctors_中,并喚醒IO線程。

1. 隊列中的回調(diào)是如何觸發(fā)的
EventLoop::loop()的事件循環(huán)中,通過doPendingFunctors()來執(zhí)行隊列中的任務(wù)回調(diào),實現(xiàn)如下:

void EventLoop::loop()
{
    //...
    while (!quit_) {
        //...
        doPendingFunctors();
    }
    //...
}

這里可能會有疑問,IO線程會阻塞在事件循環(huán)EventLoop::loop()poll調(diào)用中,因此通過loop()來觸發(fā)用戶回調(diào),如果EventLoop中一直沒有事件觸發(fā),那poll會一直阻塞,從而導(dǎo)致用戶回調(diào)一直無法執(zhí)行。
所以為了及時觸發(fā)用戶回調(diào),我們需要去喚醒IO線程。

2. 喚醒的實現(xiàn)
書中提到,傳統(tǒng)的做法是使用pipe(2),讓IO線程監(jiān)視此管道的可讀事件。在需要喚醒時,往管道寫入一個字節(jié)來觸發(fā)喚醒。
muduo中使用了eventfd(2)來實現(xiàn)IO線程的喚醒。其不必管理緩沖區(qū),可以更高效地喚醒。
EventLoop構(gòu)造時,創(chuàng)建eventfd并注冊可讀事件,將事件分發(fā)至EventLoop::handleRead()。
和傳統(tǒng)做法一樣,wakeup()的實現(xiàn)就是對eventfd進(jìn)行寫操作,從而觸發(fā)可讀事件達(dá)到喚醒IO線程的目的。

void EventLoop::wakeup()
{
    uint64_t one = 1;
    ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
    //...
}

3. doPendingFunctors的實現(xiàn)
上文提到在EventLoop::loop()事件循環(huán)最后,觸發(fā)隊列中的任務(wù)回調(diào),其實現(xiàn)如下:

void EventLoop::doPendingFunctors()
{
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;

    {
    MutexLockGuard lock(mutex_);
    functors.swap(pendingFunctors_);
    }

    for (const Functor& functor : functors)
    {
        functor();
    }
    callingPendingFunctors_ = false;
}

這段代碼有兩處需要注意的:

  • 鎖的范圍
    doPendingFunctors()沒有直接在臨界區(qū)內(nèi)依次調(diào)用任務(wù)回調(diào),而且sawp()到局部變量中(減小了臨界區(qū)的長度)。否則,鎖會一直等到所有回調(diào)函數(shù)處理完才釋放,阻塞其他線程調(diào)用queueInLoop()。

  • callingPendingFunctors_
    代碼中使用callingPendingFunctors_來標(biāo)記是否在執(zhí)行doPendingFunctors()過程中,目的是為了queueInLoop()來判斷喚醒的時機。

4. 喚醒的時機
queueInLoop()中,是否需要weakup()進(jìn)行了這樣的判斷:

if (!isInLoopThread() || callingPendingFunctors_) {
    wakeup();
}

即,當(dāng)調(diào)用queueInLoop()的線程不是IO線程;以及在IO線程調(diào)用queueInLoop(),但此時正在執(zhí)行doPendingFunctors()過程中,才需要喚醒。
因為,當(dāng)queueInLoop()在IO線程中調(diào)用,doPendingFunctors()就會在EventLoop::loop()事件循環(huán)的最后被調(diào)用,所以此時無須喚醒。

TcpServer

創(chuàng)建TcpServer對象,在其構(gòu)造時通過Acceptor來獲得新連接的fd。在Acceptor的構(gòu)造函數(shù)中調(diào)用了[1]socket()[2]bind()
啟動函數(shù)TcpServer::start()通過Acceptor::listen()(以runInLoop的方式)調(diào)用[3]listen(),并注冊了可讀事件。
當(dāng)新連接請求時,觸發(fā)可讀事件,在其回調(diào)函數(shù)Acceptor::handleRead()中調(diào)用了[4]accept()并回調(diào)了TcpServer::newConnection(),其實現(xiàn)如下:

void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
    loop_->assertInLoopThread();
    EventLoop* ioLoop = threadPool_->getNextLoop();
    char buf[64];
    snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
    ++nextConnId_;
    string connName = name_ + buf;

    InetAddress localAddr(sockets::getLocalAddr(sockfd));
    TcpConnectionPtr conn(
        new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));
    connections_[connName] = conn;
    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    conn->setWriteCompleteCallback(writeCompleteCallback_);
    conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1));
    ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}

函數(shù)中創(chuàng)建了TcpConnection對象,把它加入ConnectionMap中管理并設(shè)置了相關(guān)回調(diào)。
注:以上的 [1] [2] [3] [4] 完成了一次完整的連接。

TcpConnection

TcpConnection是muduo中唯一默認(rèn)使用智能指針管理的類,也是唯一繼承enable_shared_from_this的類。有關(guān)enable_shared_from_this可以參考share_ptr相關(guān)。
TcpConnection作用就是使用Channel來獲得socket上的IO事件,執(zhí)行各種回調(diào)。相關(guān)事件的處理,后面的文章再具體介紹。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容