前 言
上一篇文章介紹了連接的創(chuàng)建,引出了TcpConnection類。其作用就是處理socket上的IO事件,執(zhí)行各種回調(diào)。本文介紹TcpConnection對斷開連接、讀取數(shù)據(jù)、發(fā)送數(shù)據(jù)的處理。
斷開連接
連接的關(guān)閉分為主動斷開和被動斷開,兩者的處理方式基本一致。muduo采用的連接關(guān)閉方式:被動斷開,其核心函數(shù)為TcpConnection::handleClose()。書中提到,如果需要主動斷開,添加一個接口調(diào)用handleClose()即可。
對于遠(yuǎn)端連接斷開的感知:在可讀事件處理函數(shù)handleRead()中,當(dāng)read返回值為0時,即遠(yuǎn)端斷開了連接,調(diào)用TcpConnection::handleClose()。此時處理如下:
1. 取消所有關(guān)注的IO事件
2. 調(diào)用用戶注冊回調(diào)ConnectionCallback
3. 調(diào)用closeCallback_(),此回調(diào)綁定到TcpServer::removeConnection()
在removeConnection()中處理如下:
4. 將對應(yīng)的TcpConnection對象從TcpServer中移除
5. 調(diào)用TcpConnection::connectDestroyed(),并通過std::bind()將TcpConnection對象的生命周期延長到執(zhí)行完成connectDestroyed()
6. 將連接對應(yīng)的Channel從EventLoop中移除
7. TcpConnection析構(gòu),成員socket_引用計數(shù)為0,其析構(gòu)時會調(diào)用close(),關(guān)閉連接的fd
讀取數(shù)據(jù)
新連接建立時,通過TcpConnection::connectEstablished()注冊可讀事件,當(dāng)觸發(fā)可讀事件時調(diào)用回調(diào),即TcpConnection::handleRead(),其主要內(nèi)容如下:
void TcpConnection::handleRead(Timestamp receiveTime) {
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0) {
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
} else if (n == 0) {
handleClose();
} else {
handleError();
}
}
這里主要進(jìn)行了兩個處理:
1. 讀取數(shù)據(jù)到inputBuffer_中,其使用Buffer::readFd()來實現(xiàn),具體如下。
2. 調(diào)用用戶回調(diào)messageCallback_,此函數(shù)是在建立新連接時,將TcpServer的成員函數(shù)MessageCallback設(shè)置為回調(diào),其由用戶提供。
Buffer::readFd()實現(xiàn)(非源碼)如下:
ssize_t Buffer::readFd(int fd, int & savedErrno) {
// 申請棧上空間
char extrabuf[65536];
struct iovec vec[2];
const size_t writable = writableBytes();
// 兩塊iovec分別指向內(nèi)部buffer的可寫空間和棧上空間
vec[0].iov_base = begin() + m_writerIndex;
vec[0].iov_len = writable;
vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof extrabuf;
// 判斷內(nèi)部buffer的可寫空間是否足夠
const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;
const ssize_t n = sockets::readv(fd, vec, iovcnt);
if (n < 0) {
savedErrno = errno;
} else if (static_cast<size_t>(n) <= writable) {
// 內(nèi)部空間足夠,直接寫入,移動可寫索引
m_writerIndex += n;
} else {
// 內(nèi)部空間不足,先寫入棧上空間,再將棧上數(shù)據(jù)append到內(nèi)部空間
m_writerIndex = m_buffer.size();
append(extrabuf, n - writable);
}
return n;
}
書中提到,此實現(xiàn)一是使用了scatter/gather IO(分離/聚散IO),配合內(nèi)部棧空間使用;二是muduo采用level trigger(LT)模式,只需要調(diào)用一次read(2)且不會丟失數(shù)據(jù)。從而兼顧了內(nèi)存使用量和效率。
發(fā)送數(shù)據(jù)
數(shù)據(jù)的發(fā)送通過TcpConnection::send()實現(xiàn),代碼如下:
void TcpConnection::send(const StringPiece& message)
{
if (state_ == kConnected) {
if (loop_->isInLoopThread()) {
sendInLoop(message);
} else {
loop_->runInLoop(std::bind(&TcpConnection::sendInLoop, this, message.as_string()));
}
}
}
在確保是連接狀態(tài)的情況下,如果在當(dāng)前IO線程觸發(fā)就調(diào)用TcpConnection::sendInLoop(),反之則使用runInLoop將該任務(wù)拋給IO線程執(zhí)行。有關(guān)runInLoop的內(nèi)容在上一篇已經(jīng)介紹過,這里不再贅述。
在TcpConnection::sendInLoop()中,處理如下:
1. 若outputBuffer_為空,直接發(fā)送數(shù)據(jù)
2. 若發(fā)送數(shù)據(jù)沒有寫完,統(tǒng)計剩余的字節(jié)數(shù),將剩余數(shù)據(jù)寫入outputBuffer_
3. 注冊可寫事件
當(dāng)socket可寫時,調(diào)用TcpConnection::handleWrite(),繼續(xù)發(fā)送outputBuffer_中的數(shù)據(jù),一旦發(fā)送完成,立刻將可寫事件移除。
此流程需要注意的是可寫事件觀察的范圍,可以看出只有在outputBuffer_中有數(shù)據(jù)時,才會注冊觀察可寫事件,因為當(dāng)outputBuffer_中沒數(shù)據(jù)時,此時socket一直是處于可寫狀態(tài)的, 這將會導(dǎo)致一直觸發(fā)TcpConnection::handleWrite(),而我們并沒有數(shù)據(jù)需要發(fā)送。所以此觸發(fā)沒有意義,不需要去關(guān)注。
此外,數(shù)據(jù)發(fā)送的流程中:
當(dāng)outputBuffer_中的舊數(shù)據(jù)字節(jié)和剩余數(shù)據(jù)字節(jié)之和大于highWaterMark_時,會將highWaterMarkCallback_放入待執(zhí)行隊列中
當(dāng)數(shù)據(jù)發(fā)送完畢時,會調(diào)用writeCompleteCallback_
兩者配合使用,可以起到限流的作用。
更多內(nèi)容,詳見github NetLib
參考:
《Linux多線程服務(wù)端編程》陳碩 著
muduo源碼