1.發(fā)送數(shù)據(jù)
首先, 消息的發(fā)送動作(write)一定是在IO線程中進行的。為什么這樣做呢?
這是為了保證應(yīng)用層消息發(fā)送的順序性。
當非IO線程想使用套接字發(fā)送消息時,會把消息傳送給IO線程(sendInLoop)。為什么不會在epoll循環(huán)中一直保持注冊的寫事件?
muduo的epoll使用的是LT觸發(fā)模式。在LT模式EPOLLOUT只要在寫緩沖區(qū)有空間下,就會觸發(fā),因此會造成busy loop。muduo的發(fā)送數(shù)據(jù)(TcpConnection::send)是
(1)先嘗試直接發(fā)送數(shù)據(jù)。當然為了保證順序性,應(yīng)當在應(yīng)用層緩沖outputBuffer_中沒有數(shù)據(jù)的情況下,才能直接write。
(2)如果一次write沒有發(fā)送完全,也就是內(nèi)核寫緩沖區(qū)可能塞滿,會注冊EPOLLOUT等待緩沖區(qū)可寫后, 通過回調(diào)(TcpConnection::handleWrite)執(zhí)行寫。
每次寫只調(diào)用一次write,原因是如果第一次write沒發(fā)完,那么第二次幾乎肯定會EAGAIN。
(3)當outputBuffer_沒數(shù)據(jù)后,會在epoll中停止觀察可寫事件,避免觸發(fā)EPOLLOUT引起busy loop。由于muduo只接受被動關(guān)閉(保證收發(fā)數(shù)據(jù)完整性)。當我們想在服務(wù)端寫關(guān)閉的時候,需要先把outputBuffer_都發(fā)送完,才可調(diào)用shutdown。因此,當kDisconnecting狀態(tài)(寫關(guān)閉)時,需要在TcpConnection中,主動調(diào)用shutdownInLoop來完成TCP協(xié)議棧的shutdown write。
muduo在寫事件中,不處理錯誤而是放在讀事件處理。一旦錯誤,read會讀到0字節(jié),繼而關(guān)閉連接。
void TcpConnection::sendInLoop(const void* data, size_t len)
{
loop_->assertInLoopThread();
ssize_t nwrote = 0;
size_t remaining = len;
bool faultError = false;
if (state_ == kDisconnected)
{
LOG_WARN << "disconnected, give up writing";
return;
}
// if no thing in output queue, try writing directly
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{
nwrote = sockets::write(channel_->fd(), data, len);
if (nwrote >= 0)
{
remaining = len - nwrote;
if (remaining == 0 && writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
}
else // nwrote < 0
{
nwrote = 0;
if (errno != EWOULDBLOCK)
{
LOG_SYSERR << "TcpConnection::sendInLoop";
if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?
{
faultError = true;
}
}
}
}
assert(remaining <= len);
if (!faultError && remaining > 0)
{
size_t oldLen = outputBuffer_.readableBytes();
if (oldLen + remaining >= highWaterMark_
&& oldLen < highWaterMark_
&& highWaterMarkCallback_)
{
loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
}
outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
if (!channel_->isWriting())
{
channel_->enableWriting();
}
}
}
void TcpConnection::handleWrite()
{
loop_->assertInLoopThread();
if (channel_->isWriting())
{
ssize_t n = sockets::write(channel_->fd(),
outputBuffer_.peek(),
outputBuffer_.readableBytes());
if (n > 0)
{
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes() == 0)
{
channel_->disableWriting();
if (writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
if (state_ == kDisconnecting)
{
shutdownInLoop();
}
}
}
else
{
LOG_SYSERR << "TcpConnection::handleWrite";
// if (state_ == kDisconnecting)
// {
// shutdownInLoop();
// }
}
}
else
{
LOG_TRACE << "Connection fd = " << channel_->fd()
<< " is down, no more writing";
}
}
2.接收數(shù)據(jù)
- 接收數(shù)據(jù)沒有特殊的邏輯處理。對于read返回0或者error會關(guān)閉這個套接字。
- readFd只調(diào)用一次read,沒有反復read直至EAGAIN,這樣做書中解釋是為了兼顧多個連接的公平性,降低延遲,不會因為某個連接數(shù)據(jù)過大而使得該IO線程其他Fd得不到處理。
【總結(jié)自】 Linux多線程服務(wù)端編程:使用muduo C++網(wǎng)絡(luò)庫(陳碩)
文中有些是自己的理解,如有錯誤請留言。