Connector class
Connector is only responsible for establishing the socket connection. It does not create the TcpConnection object, and its newConnectionCallback receives the socket file descriptor as its argument. The Connector in muduo reconnects automatically, with the retry interval growing gradually up to a maximum of 30 s. Compared with the Acceptor class used for passive connections, it has no acceptSocket_ member, because Connector actively initiates the connection: each attempt creates a new sockfd and calls connect on it.
The call sequence is Connector::start->Connector::startInLoop->Connector::connect.
The Connector::connect() function:
void Connector::connect()
{
int sockfd = sockets::createNonblockingOrDie(); // create a non-blocking socket
int ret = sockets::connect(sockfd, serverAddr_.getSockAddrInet());
int savedErrno = (ret == 0) ? 0 : errno;
switch (savedErrno)
{
case 0:
case EINPROGRESS: // non-blocking socket: the connect has not completed yet and is still in progress
case EINTR:
case EISCONN: // already connected
connecting(sockfd);
break;
case EAGAIN:
case EADDRINUSE:
case EADDRNOTAVAIL:
case ECONNREFUSED:
case ENETUNREACH:
retry(sockfd); // reconnect later
break;
case EACCES:
case EPERM:
case EAFNOSUPPORT:
case EALREADY:
case EBADF:
case EFAULT:
case ENOTSOCK:
LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno;
sockets::close(sockfd); // cannot retry, so close sockfd
break;
default:
LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno;
sockets::close(sockfd);
// connectErrorCallback_();
break;
}
}
The value of savedErrno selects what happens next: connecting(), retry(), or, for unrecoverable errors, closing the socket.
The connecting() function:
void Connector::connecting(int sockfd)
{
setState(kConnecting);
assert(!channel_);
// associate a Channel with sockfd
channel_.reset(new Channel(loop_, sockfd));
// set the write callback; if the socket has no error, sockfd becomes writable once the connection is established
channel_->setWriteCallback(
boost::bind(&Connector::handleWrite, this)); // FIXME: unsafe
// set the error callback
channel_->setErrorCallback(
boost::bind(&Connector::handleError, this)); // FIXME: unsafe
channel_->enableWriting(); // have the Poller watch for writable events
}
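connecting() makes the Poller watch sockfd for writable events. Once the connect attempt completes, sockfd becomes writable (its kernel send buffer is not full) and the Connector::handleWrite() callback fires. handleWrite() calls removeAndResetChannel() because, with the attempt finished, there is no need to keep watching the channel for writability; the channel is eventually destroyed by channel_.reset(). If the connection succeeded, newConnectionCallback_(sockfd) is called.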
void Connector::handleWrite()
{
LOG_TRACE << "Connector::handleWrite " << state_;
if (state_ == kConnecting)
{
int sockfd = removeAndResetChannel(); // stop watching the channel in the poller and reset it
// a writable socket does not necessarily mean the connection succeeded;
// confirm with getsockopt(sockfd, SOL_SOCKET, SO_ERROR, ...)
int err = sockets::getSocketError(sockfd);
......
else // connected successfully
{
setState(kConnected);
if (connect_)
{
newConnectionCallback_(sockfd); // 回調(diào)
}
}
}
}
Test case
#include "Connector.h"
#include "EventLoop.h"
#include <stdio.h>
muduo::EventLoop* g_loop;
void connectCallback(int sockfd)
{
printf("connected.\n");
g_loop->quit();
}
int main(int argc, char* argv[])
{
muduo::EventLoop loop;
g_loop = &loop;
muduo::InetAddress addr("127.0.0.1", 9981);
muduo::ConnectorPtr connector(new muduo::Connector(&loop, addr));
connector->setNewConnectionCallback(connectCallback);
connector->start();
loop.loop();
}
Output:
20191013 11:29:40.428847Z 14273 DEBUG Connector ctor[0x1e76f40] - Connector.cc:31
-1
115
20191013 11:29:40.429139Z 14273 WARN Channel::handle_event() POLLHUP - Channel.cc:50
20191013 11:29:40.429157Z 14273 ERROR Connector::handleError - Connector.cc:190
20191013 11:29:40.429223Z 14273 INFO Connector::retry - Retry connecting to 127.0.0.1:9981 in 500 milliseconds. - Connector.cc:205
If a connection attempt fails, Connector keeps retrying until the connection succeeds.
In practice, Connector is rarely used on its own; it is usually a member of TcpClient. TcpClient resembles TcpServer (both have the member functions newConnection and removeConnection), but each TcpClient manages only one TcpConnection, and with Connector it can reconnect after its TcpConnection is disconnected. Another difference is that TcpServer can use multiple Reactors, i.e. the mainReactor + thread pool (subReactors) model, whereas a TcpClient has only one EventLoop, and therefore only one Reactor, which handles the events on its TcpConnection. Several TcpClient objects can of course be bound to the same EventLoop object, giving multiple TcpConnections.
TcpClient class
Data members:
typedef boost::shared_ptr<Connector> ConnectorPtr;
ConnectorPtr connector_; // used to initiate the connection
TcpConnectionPtr connection_; // the TcpConnection obtained once Connector has connected; only one is managed
In the constructor:
// set the callback for a successful connection
connector_->setNewConnectionCallback(
boost::bind(&TcpClient::newConnection, this, _1));
Once the actively initiated connection succeeds, TcpClient::newConnection() runs.
void TcpClient::newConnection(int sockfd)
{
loop_->assertInLoopThread();
InetAddress peerAddr(sockets::getPeerAddr(sockfd));
char buf[32];
snprintf(buf, sizeof buf, ":%s#%d", peerAddr.toIpPort().c_str(), nextConnId_);
++nextConnId_;
string connName = name_ + buf;
InetAddress localAddr(sockets::getLocalAddr(sockfd));
TcpConnectionPtr conn(new TcpConnection(loop_,
connName,
sockfd,
localAddr,
peerAddr));
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(
boost::bind(&TcpClient::removeConnection, this, _1)); // FIXME: unsafe
{
MutexLockGuard lock(mutex_);
connection_ = conn; // save the TcpConnection
}
conn->connectEstablished(); // connectionCallback_ is invoked here
}
One more point: the callbacks registered on a TcpServer or TcpClient object through the setXxxCallback() functions end up stored in the xxxCallback_ members of the TcpConnection object, and they are invoked when the corresponding events occur, for example onConnection() and onMessage().
Test case
Server: echo server
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <boost/bind.hpp>
#include <stdio.h>
#include <unistd.h> // getpid()
using namespace muduo;
using namespace muduo::net;
class TestServer
{
public:
TestServer(EventLoop* loop,
const InetAddress& listenAddr)
: loop_(loop),
server_(loop, listenAddr, "TestServer")
{
server_.setConnectionCallback(
boost::bind(&TestServer::onConnection, this, _1));
server_.setMessageCallback(
boost::bind(&TestServer::onMessage, this, _1, _2, _3));
}
void start()
{
server_.start();
}
private:
void onConnection(const TcpConnectionPtr& conn)
{
if (conn->connected())
{
printf("onConnection(): new connection [%s] from %s\n",
conn->name().c_str(),
conn->peerAddress().toIpPort().c_str());
}
else
{
printf("onConnection(): connection [%s] is down\n",
conn->name().c_str());
}
}
void onMessage(const TcpConnectionPtr& conn,
Buffer* buf,
Timestamp receiveTime)
{
string msg1 = buf->retrieveAllAsString();
string msg(msg1);
printf("onMessage(): received %zu bytes from connection [%s] at %s\n",
msg.size(),
conn->name().c_str(),
receiveTime.toFormattedString().c_str());
conn->send(msg);
}
EventLoop* loop_;
TcpServer server_;
};
int main()
{
printf("main(): pid = %d\n", getpid());
InetAddress listenAddr(8888);
EventLoop loop;
TestServer server(&loop, listenAddr);
server.start();
loop.loop();
}
Client program:
#include <muduo/net/Channel.h>
#include <muduo/net/TcpClient.h>
#include <muduo/base/Logging.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <boost/bind.hpp>
#include <stdio.h>
#include <string.h> // string functions used in handleRead()
#include <unistd.h> // getpid()
using namespace muduo;
using namespace muduo::net;
class TestClient
{
public:
TestClient(EventLoop* loop, const InetAddress& listenAddr)
: loop_(loop),
client_(loop, listenAddr, "TestClient"),
stdinChannel_(loop, 0)
{
client_.setConnectionCallback(
boost::bind(&TestClient::onConnection, this, _1));
client_.setMessageCallback(
boost::bind(&TestClient::onMessage, this, _1, _2, _3));
//client_.enableRetry();
// when standard input has data to read, TestClient::handleRead is called
stdinChannel_.setReadCallback(boost::bind(&TestClient::handleRead, this));
stdinChannel_.enableReading(); // watch for readable events
}
void connect()
{
client_.connect();
}
private:
void onConnection(const TcpConnectionPtr& conn)
{
if (conn->connected())
{
printf("onConnection(): new connection [%s] from %s\n",
conn->name().c_str(),
conn->peerAddress().toIpPort().c_str());
}
else
{
printf("onConnection(): connection [%s] is down\n",
conn->name().c_str());
}
}
void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp time)
{
string msg1 = buf->retrieveAllAsString();
string msg(msg1);
printf("onMessage(): recv a message [%s]\n", msg.c_str());
LOG_TRACE << conn->name() << " recv " << msg.size() << " bytes at " << time.toFormattedString();
}
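// called when standard input has data to read
void handleRead()
{
char buf[1024] = {0};
if (fgets(buf, sizeof buf, stdin) == NULL) // EOF or read error
{
stdinChannel_.disableAll(); // stop watching stdin so a closed input does not spin the loop
return;
}
buf[strcspn(buf, "\n")] = '\0'; // strip the trailing \n, if present
TcpConnectionPtr conn = client_.connection();
if (conn) // empty before the connection is established or after it goes down
{
conn->send(buf);
}
}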
EventLoop* loop_;
TcpClient client_;
Channel stdinChannel_; // Channel for standard input
};
int main(int argc, char* argv[])
{
LOG_INFO << "pid = " << getpid() << ", tid = " << CurrentThread::tid();
EventLoop loop;
InetAddress serverAddr("127.0.0.1", 8888);
TestClient client(&loop, serverAddr);
client.connect();
loop.loop();
}
Run the two programs and type "aaaa" in the client; it is echoed back. Output:
Server output:
20191013 12:03:25.884639Z 19542 TRACE IgnoreSigPipe Ignore SIGPIPE - EventLoop.cc:51
main(): pid = 19542
20191013 12:03:25.884858Z 19542 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104
20191013 12:03:25.884905Z 19542 TRACE EventLoop EventLoop created 0x7FFEBA471BD0 in thread 19542 - EventLoop.cc:76
20191013 12:03:25.884924Z 19542 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104
20191013 12:03:25.885039Z 19542 TRACE updateChannel fd = 6 events = 3 - EPollPoller.cc:104
20191013 12:03:25.885059Z 19542 TRACE loop EventLoop 0x7FFEBA471BD0 start looping - EventLoop.cc:108
20191013 12:03:28.221151Z 19542 TRACE poll 1 events happended - EPollPoller.cc:65
20191013 12:03:28.221476Z 19542 TRACE printActiveChannels {6: IN } - EventLoop.cc:271
20191013 12:03:28.221549Z 19542 INFO TcpServer::newConnection [TestServer] - new connection [TestServer:0.0.0.0:8888#1] from 127.0.0.1:53952 - TcpServer.cc:93
20191013 12:03:28.221599Z 19542 DEBUG TcpConnection TcpConnection::ctor[TestServer:0.0.0.0:8888#1] at 0x12E2680 fd=8 - TcpConnection.cc:65
20191013 12:03:28.221617Z 19542 TRACE newConnection [1] usecount=1 - TcpServer.cc:111
20191013 12:03:28.221646Z 19542 TRACE newConnection [2] usecount=2 - TcpServer.cc:113
20191013 12:03:28.221664Z 19542 TRACE connectEstablished [3] usecount=6 - TcpConnection.cc:238
20191013 12:03:28.221673Z 19542 TRACE updateChannel fd = 8 events = 3 - EPollPoller.cc:104
onConnection(): new connection [TestServer:0.0.0.0:8888#1] from 127.0.0.1:53952
20191013 12:03:28.221717Z 19542 TRACE connectEstablished [4] usecount=6 - TcpConnection.cc:243
20191013 12:03:28.221729Z 19542 TRACE newConnection [5] usecount=2 - TcpServer.cc:123
20191013 12:03:29.640547Z 19542 TRACE poll 1 events happended - EPollPoller.cc:65
20191013 12:03:29.640608Z 19542 TRACE printActiveChannels{8: IN } - EventLoop.cc:271
20191013 12:03:29.640623Z 19542 TRACE handleEvent [6] usecount=2 - Channel.cc:67
onMessage(): received 4 bytes from connection [TestServer:0.0.0.0:8888#1] at 20191013 12:03:29.640545
20191013 12:03:29.640733Z 19542 TRACE handleEvent [12] usecount=2 - Channel.cc:69
^C
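In the output, fd = 6 is the listening socket and fd=8 is the connected socket returned by accept. onConnection() is called when the connection is established. Each line the client sends produces one readable event on fd=8 and one call to onMessage(); the excerpt above shows one such call.
Client output: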
20191013 12:03:28.220602Z 19543 TRACE IgnoreSigPipe Ignore SIGPIPE - EventLoop.cc:51
20191013 12:03:28.220775Z 19543 INFO pid = 19543, tid = 19543 - TcpClient_test.cc:77
20191013 12:03:28.220821Z 19543 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104
20191013 12:03:28.220869Z 19543 TRACE EventLoop EventLoop created 0x7FFD9CF15DB0 in thread 19543 - EventLoop.cc:76
20191013 12:03:28.220893Z 19543 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104
20191013 12:03:28.220924Z 19543 DEBUG Connector ctor[0xFCE4A0] - Connector.cc:33
20191013 12:03:28.220942Z 19543 INFO TcpClient::TcpClient[TestClient] - connector 0xFCE4A0 - TcpClient.cc:72
20191013 12:03:28.220957Z 19543 TRACE updateChannel fd = 0 events = 3 - EPollPoller.cc:104
20191013 12:03:28.220980Z 19543 INFO TcpClient::connect[TestClient] - connecting to 127.0.0.1:8888 - TcpClient.cc:106
20191013 12:03:28.221114Z 19543 TRACE updateChannel fd = 6 events = 4 - EPollPoller.cc:104
20191013 12:03:28.221146Z 19543 TRACE loop EventLoop 0x7FFD9CF15DB0 start looping - EventLoop.cc:108
20191013 12:03:28.221165Z 19543 TRACE poll 1 events happended - EPollPoller.cc:65
20191013 12:03:28.221423Z 19543 TRACE printActiveChannels {6: OUT } - EventLoop.cc:271
20191013 12:03:28.221441Z 19543 TRACE handleWrite Connector::handleWrite 1 - Connector.cc:169
20191013 12:03:28.221458Z 19543 TRACE updateChannel fd = 6 events = 0 - EPollPoller.cc:104
20191013 12:03:28.221479Z 19543 TRACE removeChannel fd = 6 - EPollPoller.cc:147
20191013 12:03:28.221553Z 19543 DEBUG TcpConnection TcpConnection::ctor[TestClient:127.0.0.1:8888#1] at 0xFCE6F0 fd=6 - TcpConnection.cc:65
20191013 12:03:28.221585Z 19543 TRACE connectEstablished [3] usecount=3 - TcpConnection.cc:238
20191013 12:03:28.221594Z 19543 TRACE updateChannel fd = 6 events = 3 - EPollPoller.cc:104
onConnection(): new connection [TestClient:127.0.0.1:8888#1] from 127.0.0.1:8888
20191013 12:03:28.221653Z 19543 TRACE connectEstablished [4] usecount=3 - TcpConnection.cc:243
aaaa
20191013 12:03:29.640345Z 19543 TRACE poll 1 events happended - EPollPoller.cc:65
20191013 12:03:29.640415Z 19543 TRACE printActiveChannels {0: IN } - EventLoop.cc:271
20191013 12:03:29.640769Z 19543 TRACE poll 1 events happended - EPollPoller.cc:65
20191013 12:03:29.640819Z 19543 TRACE printActiveChannels {6: IN } - EventLoop.cc:271
20191013 12:03:29.640834Z 19543 TRACE handleEvent [6] usecount=2 - Channel.cc:67
onMessage(): recv a message [aaaa]
20191013 12:03:29.640894Z 19543 TRACE onMessage TestClient:127.0.0.1:8888#1 recv 4 bytes at 20191013 12:03:29.640768 - TcpClient_test.cc:58
20191013 12:03:29.640914Z 19543 TRACE handleEvent [12] usecount=2 - Channel.cc:69
20191013 12:03:32.560680Z 19543 TRACE poll 1 events happended - EPollPoller.cc:65
20191013 12:03:32.560750Z 19543 TRACE printActiveChannels {6: IN } - EventLoop.cc:271
20191013 12:03:32.560768Z 19543 TRACE handleEvent [6] usecount=2 - Channel.cc:67
20191013 12:03:32.560795Z 19543 TRACE handleClose fd = 6 state = 2 - TcpConnection.cc:369
20191013 12:03:32.560805Z 19543 TRACE updateChannel fd = 6 events = 0 - EPollPoller.cc:104
onConnection(): connection [TestClient:127.0.0.1:8888#1] is down
20191013 12:03:32.560843Z 19543 TRACE handleClose [7] usecount=3 - TcpConnection.cc:377
20191013 12:03:32.560865Z 19543 TRACE handleClose [11] usecount=3 - TcpConnection.cc:380
20191013 12:03:32.560874Z 19543 TRACE handleEvent [12] usecount=2 - Channel.cc:69
20191013 12:03:32.560883Z 19543 TRACE removeChannel fd = 6 - EPollPoller.cc:147
20191013 12:03:32.560897Z 19543 DEBUG ~TcpConnection TcpConnection::dtor[TestClient:127.0.0.1:8888#1] at 0xFCE6F0 fd=6 - TcpConnection.cc:72
20191013 12:03:42.561752Z 19543 TRACE poll nothing happended - EPollPoller.cc:74
20191013 12:03:52.571864Z 19543 TRACE poll nothing happended - EPollPoller.cc:74
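As analyzed earlier, a process opens file descriptors 0 (standard input), 1 (standard output), and 2 (standard error) by default; in this example 0 is standard input. In the mainReactor the descriptors are epollfd_ = 3; timerfd_ = 4; wakeupFd_ = 5; sockfd_ = 6; idleFd_ = 7 (idleFd_ belongs to the Acceptor, so it exists only on the server side). On the client, fd=6 is the connecting socket. When the connection succeeds, a writable event fires on fd=6, but the Connector's channel is immediately removed from the poller and destroyed, and a TcpConnection is constructed on the same descriptor. Typing a line on the command line triggers a readable event on standard input; when the server echoes it back, a readable event fires on fd=6 and onMessage() is called. When the server is then killed with Ctrl+C, the client detects that the connection is down and destroys the TcpConnection, which also closes the socket. The event loop keeps running because, as noted earlier, several TcpClient objects may be bound to the same EventLoop.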