muduo源碼學(xué)習(xí)(五) 實現(xiàn)TCP網(wǎng)絡(luò)庫(下)

前 言

上一篇文章介紹了連接的創(chuàng)建,引出了TcpConnection類。其作用就是處理socket上的IO事件,執(zhí)行各種回調(diào)。本文介紹TcpConnection對斷開連接、讀取數(shù)據(jù)、發(fā)送數(shù)據(jù)的處理。

斷開連接

連接的關(guān)閉分為主動斷開和被動斷開,兩者的處理方式基本一致。muduo采用的連接關(guān)閉方式:被動斷開,其核心函數(shù)為TcpConnection::handleClose()。書中提到,如果需要主動斷開,添加一個接口調(diào)用handleClose()即可。
對于遠(yuǎn)端連接斷開的感知:在可讀事件處理函數(shù)handleRead()中,當(dāng)read返回值為0時,即遠(yuǎn)端斷開了連接,調(diào)用TcpConnection::handleClose()。此時處理如下:
1. 取消所有關(guān)注的IO事件
2. 調(diào)用用戶注冊回調(diào)ConnectionCallback
3. 調(diào)用closeCallback_(),此回調(diào)綁定到TcpServer::removeConnection()
removeConnection()中處理如下:
4. 將對應(yīng)的TcpConnection對象從TcpServer中移除
5. 調(diào)用TcpConnection::connectDestroyed(),并通過std::bind()TcpConnection對象的生命周期延長到執(zhí)行完成connectDestroyed()
6. 將連接對應(yīng)的ChannelEventLoop中移除
7. TcpConnection析構(gòu),成員socket_引用計數(shù)為0,其析構(gòu)時會調(diào)用close(),關(guān)閉連接的fd

讀取數(shù)據(jù)

新連接建立時,通過TcpConnection::connectEstablished()注冊可讀事件,當(dāng)觸發(fā)可讀事件時調(diào)用回調(diào),即TcpConnection::handleRead(),其主要內(nèi)容如下:

void TcpConnection::handleRead(Timestamp receiveTime) {
    ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
    if (n > 0) {
        messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
    } else if (n == 0) {
        handleClose();
    } else {
        handleError();
    }
}

這里主要進(jìn)行了兩個處理:
1. 讀取數(shù)據(jù)到inputBuffer_中,其使用Buffer::readFd()來實現(xiàn),具體如下。
2. 調(diào)用用戶回調(diào)messageCallback_,此函數(shù)是在建立新連接時,將TcpServer的成員函數(shù)MessageCallback設(shè)置為回調(diào),其由用戶提供。

Buffer::readFd()實現(xiàn)(非源碼)如下:

ssize_t Buffer::readFd(int fd, int & savedErrno) {
    // 申請棧上空間
    char extrabuf[65536];
    struct iovec vec[2];
    const size_t writable = writableBytes();

    // 兩塊iovec分別指向內(nèi)部buffer的可寫空間和棧上空間
    vec[0].iov_base = begin() + m_writerIndex;
    vec[0].iov_len = writable;
    vec[1].iov_base = extrabuf;
    vec[1].iov_len = sizeof extrabuf;

    // 判斷內(nèi)部buffer的可寫空間是否足夠
    const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;
    const ssize_t n = sockets::readv(fd, vec, iovcnt);
    if (n < 0) {
        savedErrno = errno;
    } else if (static_cast<size_t>(n) <= writable) {
        // 內(nèi)部空間足夠,直接寫入,移動可寫索引
        m_writerIndex += n;
    } else {
        // 內(nèi)部空間不足,先寫入棧上空間,再將棧上數(shù)據(jù)append到內(nèi)部空間
        m_writerIndex = m_buffer.size();
        append(extrabuf, n - writable);
    }
    return n;
}

書中提到,此實現(xiàn)一是使用了scatter/gather IO(分離/聚散IO),配合內(nèi)部棧空間使用;二是muduo采用level trigger(LT)模式,只需要調(diào)用一次read(2)且不會丟失數(shù)據(jù)。從而兼顧了內(nèi)存使用量和效率。

發(fā)送數(shù)據(jù)

數(shù)據(jù)的發(fā)送通過TcpConnection::send()實現(xiàn),代碼如下:

void TcpConnection::send(const StringPiece& message)
{
    if (state_ == kConnected) {
        if (loop_->isInLoopThread()) {
            sendInLoop(message);
        } else {
            loop_->runInLoop(std::bind(&TcpConnection::sendInLoop, this, message.as_string()));
        }
    }
}

在確保是連接狀態(tài)的情況下,如果在當(dāng)前IO線程觸發(fā)就調(diào)用TcpConnection::sendInLoop(),反之則使用runInLoop將該任務(wù)拋給IO線程執(zhí)行。有關(guān)runInLoop的內(nèi)容在上一篇已經(jīng)介紹過,這里不再贅述。
TcpConnection::sendInLoop()中,處理如下:
1.outputBuffer_為空,直接發(fā)送數(shù)據(jù)
2. 若發(fā)送數(shù)據(jù)沒有寫完,統(tǒng)計剩余的字節(jié)數(shù),將剩余數(shù)據(jù)寫入outputBuffer_
3. 注冊可寫事件
當(dāng)socket可寫時,調(diào)用TcpConnection::handleWrite(),繼續(xù)發(fā)送outputBuffer_中的數(shù)據(jù),一旦發(fā)送完成,立刻將可寫事件移除。
此流程需要注意的是可寫事件觀察的范圍,可以看出只有在outputBuffer_中有數(shù)據(jù)時,才會注冊觀察可寫事件,因為當(dāng)outputBuffer_中沒數(shù)據(jù)時,此時socket一直是處于可寫狀態(tài)的, 這將會導(dǎo)致一直觸發(fā)TcpConnection::handleWrite(),而我們并沒有數(shù)據(jù)需要發(fā)送。所以此觸發(fā)沒有意義,不需要去關(guān)注。

此外,數(shù)據(jù)發(fā)送的流程中:
當(dāng)outputBuffer_中的舊數(shù)據(jù)字節(jié)和剩余數(shù)據(jù)字節(jié)之和大于highWaterMark_時,會將highWaterMarkCallback_放入待執(zhí)行隊列中
當(dāng)數(shù)據(jù)發(fā)送完畢時,會調(diào)用writeCompleteCallback_
兩者配合使用,可以起到限流的作用。

更多內(nèi)容,詳見github NetLib
參考:
《Linux多線程服務(wù)端編程》陳碩 著
muduo源碼

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容