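Contents:
This series is planned to include:
- 1 - Introduction
- 2 - Connecting, and a walkthrough of the connection process
- 3 - Publishing and subscribing, and the sending process
- 4 - A breakdown of the individual classes
I kept forgetting about this series... honestly, I was just being lazy.
I. Connecting (updated 2019-02-23)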
- (void)connectTo:(NSString *)host
port:(NSInteger)port
tls:(BOOL)tls
keepalive:(NSInteger)keepalive
clean:(BOOL)clean
auth:(BOOL)auth
user:(NSString *)user
pass:(NSString *)pass
willTopic:(NSString *)willTopic
will:(NSData *)will
willQos:(MQTTQosLevel)willQos
willRetainFlag:(BOOL)willRetainFlag
withClientId:(NSString *)clientId
securityPolicy:(MQTTSSLSecurityPolicy *)securityPolicy
certificates:(NSArray *)certificates
protocolLevel:(MQTTProtocolVersion)protocolLevel
connectHandler:(MQTTConnectHandler)connectHandler;
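The first thing you notice is the long parameter list. Don't let it scare you; let's go through the parameters one by one.
host: broker address
port: broker port
tls: whether to encrypt the connection with SSL/TLS
keepalive: keep-alive interval, in seconds
clean: clean-session flag (whether the session state is discarded when the client disconnects)
auth: whether to authenticate with a user name and password
user: user name
pass: password
willTopic: topic the will (last will) message is published to
will: payload of the will message, which the broker publishes on willTopic if the client disconnects unexpectedly
willQos: QoS level of the will message (QoS levels were introduced in the first article)
MQTTQosLevelAtMostOnce = 0, at most once
MQTTQosLevelAtLeastOnce = 1, at least once
MQTTQosLevelExactlyOnce = 2, exactly once
willRetainFlag: whether the broker retains the will message
clientId: client ID; if you leave it empty, a random one is generated
securityPolicy: security policy object
certificates: array of certificates
protocolLevel: protocol version
MQTTProtocolVersion0 = 0,
MQTTProtocolVersion31 = 3,
MQTTProtocolVersion311 = 4,
MQTTProtocolVersion50 = 5
connectHandler: callback invoked with the result of the connection attempt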
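Calling this method creates the Session. You need to set a delegate to observe the connection state; published and subscribed messages are also delivered through the delegate.
If you step into the source, you will see that a Session is created internally.
If you don't use the manager the library provides, you have to create the session yourself, and remember to set its delegate. Also note that the Session's Transport can differ: if you connect over WebSocket, set the Transport to MQTTWebsocketTransport; otherwise use MQTTCFSocketTransport.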
self.session = [[MQTTSession alloc] initWithClientId:clientId
userName:auth ? user : nil
password:auth ? pass : nil
keepAlive:keepalive
cleanSession:clean
will:will
willTopic:willTopic
willMsg:willMsg
willQoS:willQos
willRetainFlag:willRetainFlag
protocolLevel:protocolLevel
runLoop:[NSRunLoop currentRunLoop]
forMode:NSDefaultRunLoopMode
securityPolicy:securityPolicy
certificates:certificates
transportType:MQTTTransportTypeWebSocket];
MQTTCoreDataPersistence *persistence = [[MQTTCoreDataPersistence alloc] init];
persistence.persistent = self.persistent;
persistence.maxWindowSize = self.maxWindowSize;
persistence.maxSize = self.maxSize;
persistence.maxMessages = self.maxMessages;
self.session.persistence = persistence;
Now let's look at the delegate methods. I'll only cover the important ones:
1. This callback fires when a message arrives on a topic you subscribed to
- (void)newMessage:(MQTTSession *)session
data:(NSData *)data
onTopic:(NSString *)topic
qos:(MQTTQosLevel)qos
retained:(BOOL)retained
mid:(unsigned int)mid;
2. The MQTT connection state
- (void)handleEvent:(MQTTSession *)session event:(MQTTSessionEvent)eventCode error:(NSError *)error;
3. The connection-related delegate methods
/** gets called when a connection has been successfully established
@param session the MQTTSession reporting the connect
*/
- (void)connected:(MQTTSession *)session;
/** gets called when a connection has been successfully established
@param session the MQTTSession reporting the connect
@param sessionPresent represents the Session Present flag sent by the broker
*/
- (void)connected:(MQTTSession *)session sessionPresent:(BOOL)sessionPresent;
/** gets called when a connection has been refused
@param session the MQTTSession reporting the refusal
@param error an optional additional error object with additional information
*/
- (void)connectionRefused:(MQTTSession *)session error:(NSError *)error;
/** gets called when a connection has been closed
@param session the MQTTSession reporting the close
*/
- (void)connectionClosed:(MQTTSession *)session;
/** gets called when a connection error happened
@param session the MQTTSession reporting the connect error
@param error an optional additional error object with additional information
*/
- (void)connectionError:(MQTTSession *)session error:(NSError *)error;
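Usually, once the connection succeeds, we can publish messages and subscribe to topics.
II. The connection process
First, here is a diagram I put together myself

Let's start from the left side.
1. Calling the connectAndWaitTimeout method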
- (BOOL)connectAndWaitTimeout:(NSTimeInterval)timeout {
NSDate *started = [NSDate date];
self.synchronConnect = TRUE;
[self connect];
[[NSRunLoop currentRunLoop] addPort:[NSMachPort port] forMode:NSRunLoopCommonModes];
while (self.synchronConnect && (timeout == 0 || started.timeIntervalSince1970 + timeout > [NSDate date].timeIntervalSince1970)) {
DDLogVerbose(@"[MQTTSessionSynchron] waiting for connect");
[[NSRunLoop currentRunLoop] runUntilDate:[NSDate dateWithTimeIntervalSinceNow:.1]];
}
DDLogVerbose(@"[MQTTSessionSynchron] end connect");
return (self.status == MQTTSessionStatusConnected);
}
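1. Record the current time as the start time
2. Set the synchronConnect flag to true
3. Start connecting
4. Add a port to the current run loop, so the run loop has a source and keeps running
5. Wait until the connection result arrives or the timeout expires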
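The waiting pattern in those five steps can be pulled out into a small standalone sketch. This is only an illustration under my own assumptions: `MQTTExampleWaitUntil` and its `isDone` block are hypothetical names, not part of MQTTClient, and the block stands in for the `synchronConnect` flag.

```objc
#import <Foundation/Foundation.h>

// Hypothetical helper (not part of MQTTClient): spins the current run loop in
// 0.1 s slices until `isDone` returns YES or `timeout` seconds have elapsed.
// A timeout of 0 means "wait indefinitely", as in connectAndWaitTimeout:.
static BOOL MQTTExampleWaitUntil(BOOL (^isDone)(void), NSTimeInterval timeout) {
    NSDate *started = [NSDate date];
    // Give the run loop a source so it does not return immediately.
    [[NSRunLoop currentRunLoop] addPort:[NSMachPort port] forMode:NSRunLoopCommonModes];
    while (!isDone() &&
           (timeout == 0 || [[NSDate date] timeIntervalSinceDate:started] < timeout)) {
        [[NSRunLoop currentRunLoop] runUntilDate:[NSDate dateWithTimeIntervalSinceNow:0.1]];
    }
    // Report whether the condition was met, not merely that the loop ended.
    return isDone();
}
```

Because the run loop keeps turning while we wait, timers and stream callbacks scheduled on it still fire, which is what lets the connection result arrive during the wait.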
2. Calling the connect method
- (void)connect {
if (MQTTStrict.strict &&
self.clientId && self.clientId.length < 1 &&
!self.cleanSessionFlag) {
NSException* myException = [NSException
exceptionWithName:@"clientId must be at least 1 character long if cleanSessionFlag is off"
reason:[NSString stringWithFormat:@"clientId length = %lu", [self.clientId dataUsingEncoding:NSUTF8StringEncoding].length]
userInfo:nil];
@throw myException;
}
if (MQTTStrict.strict &&
!self.clientId) {
NSException* myException = [NSException
exceptionWithName:@"clientId must not be nil"
reason:[NSString stringWithFormat:@"clientId length = %lu", [self.clientId dataUsingEncoding:NSUTF8StringEncoding].length]
userInfo:nil];
@throw myException;
}
if (MQTTStrict.strict &&
[self.clientId dataUsingEncoding:NSUTF8StringEncoding].length > 65535L) {
NSException* myException = [NSException
exceptionWithName:@"clientId may not be longer than 65535 bytes in UTF8 representation"
reason:[NSString stringWithFormat:@"clientId length = %lu", [self.clientId dataUsingEncoding:NSUTF8StringEncoding].length]
userInfo:nil];
@throw myException;
}
if (MQTTStrict.strict &&
![self.clientId dataUsingEncoding:NSUTF8StringEncoding]) {
NSException* myException = [NSException
exceptionWithName:@"clientId must not contain non-UTF8 characters"
reason:[NSString stringWithFormat:@"clientId = %@", self.clientId]
userInfo:nil];
@throw myException;
}
if (MQTTStrict.strict &&
[self.userName dataUsingEncoding:NSUTF8StringEncoding].length > 65535L) {
NSException* myException = [NSException
exceptionWithName:@"userName may not be longer than 65535 bytes in UTF8 representation"
reason:[NSString stringWithFormat:@"userName length = %lu", [self.userName dataUsingEncoding:NSUTF8StringEncoding].length]
userInfo:nil];
@throw myException;
}
if (MQTTStrict.strict &&
self.userName &&
![self.userName dataUsingEncoding:NSUTF8StringEncoding]) {
NSException* myException = [NSException
exceptionWithName:@"userName must not contain non-UTF8 characters"
reason:[NSString stringWithFormat:@"userName = %@", self.userName]
userInfo:nil];
@throw myException;
}
if (MQTTStrict.strict &&
!self.userName &&
self.password) {
NSException* myException = [NSException
exceptionWithName:@"password specified without userName"
reason:[NSString stringWithFormat:@"password = %@", self.password]
userInfo:nil];
@throw myException;
}
if (MQTTStrict.strict &&
self.protocolLevel != MQTTProtocolVersion31 &&
self.protocolLevel != MQTTProtocolVersion311 &&
self.protocolLevel != MQTTProtocolVersion50) {
NSException* myException = [NSException
exceptionWithName:@"Illegal protocolLevel"
reason:[NSString stringWithFormat:@"%d is not 3, 4, or 5", self.protocolLevel]
userInfo:nil];
@throw myException;
}
if (MQTTStrict.strict &&
!self.willFlag &&
self.willTopic) {
NSException* myException = [NSException
exceptionWithName:@"Will topic must be nil if willFlag is false"
reason:[NSString stringWithFormat:@"%@", self.willTopic]
userInfo:nil];
@throw myException;
}
if (MQTTStrict.strict &&
!self.willFlag &&
self.willMsg) {
NSException* myException = [NSException
exceptionWithName:@"Will message must be nil if willFlag is false"
reason:[NSString stringWithFormat:@"%@", self.willMsg]
userInfo:nil];
@throw myException;
}
if (MQTTStrict.strict &&
!self.willFlag &&
self.willRetainFlag) {
NSException* myException = [NSException
exceptionWithName:@"Will retain must be false if willFlag is false"
reason:[NSString stringWithFormat:@"%d", self.willRetainFlag]
userInfo:nil];
@throw myException;
}
if (MQTTStrict.strict &&
!self.willFlag &&
self.willQoS != MQTTQosLevelAtMostOnce) {
NSException* myException = [NSException
exceptionWithName:@"Will QoS Level must be 0 if willFlag is false"
reason:[NSString stringWithFormat:@"%d", self.willQoS]
userInfo:nil];
@throw myException;
}
if (MQTTStrict.strict &&
self.willQoS != MQTTQosLevelAtMostOnce &&
self.willQoS != MQTTQosLevelAtLeastOnce &&
self.willQoS != MQTTQosLevelExactlyOnce) {
NSException* myException = [NSException
exceptionWithName:@"Illegal will QoS level"
reason:[NSString stringWithFormat:@"%d is not 0, 1, or 2", self.willQoS]
userInfo:nil];
@throw myException;
}
if (MQTTStrict.strict &&
self.willFlag &&
!self.willTopic) {
NSException* myException = [NSException
exceptionWithName:@"Will topic must not be nil if willFlag is true"
reason:[NSString stringWithFormat:@"%@", self.willTopic]
userInfo:nil];
@throw myException;
}
if (MQTTStrict.strict &&
self.willTopic &&
self.willTopic.length < 1) {
NSException* myException = [NSException
exceptionWithName:@"Will topic must be at least 1 character long"
reason:[NSString stringWithFormat:@"%@", self.willTopic]
userInfo:nil];
@throw myException;
}
if (MQTTStrict.strict &&
self.willTopic &&
[self.willTopic dataUsingEncoding:NSUTF8StringEncoding].length > 65535L) {
NSException* myException = [NSException
exceptionWithName:@"willTopic may not be longer than 65535 bytes in UTF8 representation"
reason:[NSString stringWithFormat:@"willTopic length = %lu", [self.willTopic dataUsingEncoding:NSUTF8StringEncoding].length]
userInfo:nil];
@throw myException;
}
if (MQTTStrict.strict &&
self.willTopic &&
![self.willTopic dataUsingEncoding:NSUTF8StringEncoding]) {
NSException* myException = [NSException
exceptionWithName:@"willTopic must not contain non-UTF8 characters"
reason:[NSString stringWithFormat:@"willTopic = %@", self.willTopic]
userInfo:nil];
@throw myException;
}
if (MQTTStrict.strict &&
self.willTopic &&
([self.willTopic containsString:@"+"] ||
[self.willTopic containsString:@"#"])
) {
NSException* myException = [NSException
exceptionWithName:@"willTopic must not contain wildcards"
reason:[NSString stringWithFormat:@"willTopic = %@", self.willTopic]
userInfo:nil];
@throw myException;
}
if (MQTTStrict.strict &&
self.willFlag &&
!self.willMsg) {
NSException* myException = [NSException
exceptionWithName:@"Will message must not be nil if willFlag is true"
reason:[NSString stringWithFormat:@"%@", self.willMsg]
userInfo:nil];
@throw myException;
}
DDLogVerbose(@"[MQTTSession] connecting");
if (self.cleanSessionFlag) {
[self.persistence deleteAllFlowsForClientId:self.clientId];
[self.subscribeHandlers removeAllObjects];
[self.unsubscribeHandlers removeAllObjects];
[self.publishHandlers removeAllObjects];
}
[self tell];
self.status = MQTTSessionStatusConnecting;
self.decoder = [[MQTTDecoder alloc] init];
self.decoder.runLoop = self.runLoop;
self.decoder.runLoopMode = self.runLoopMode;
self.decoder.delegate = self;
[self.decoder open];
self.transport.delegate = self;
[self.transport open];
}
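0. Run a series of sanity checks first (they only apply when MQTTStrict.strict is on); if any of them fails, an exception is thrown
1. If cleanSessionFlag is set, delete the persisted flows and clear the pending subscribe, unsubscribe and publish handlers
2. Call [self tell]
3. Set the current status to MQTTSessionStatusConnecting
4. Create an MQTTDecoder and set its run loop, run loop mode and delegate
5. Call the decoder's open method
6. Set the transport's delegate
7. Call the transport's open method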
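To make step 0 more concrete, here is a minimal sketch of the will-topic checks as a function that returns a description of the problem instead of throwing. `MQTTExampleWillTopicProblem` is a hypothetical helper of mine, not something the library provides; the rules themselves are the ones visible in the connect listing above.

```objc
#import <Foundation/Foundation.h>

// Hypothetical helper (not part of MQTTClient): returns nil when the will
// topic passes the checks from connect, otherwise a short problem description.
static NSString *MQTTExampleWillTopicProblem(NSString *willTopic) {
    // A nil topic has length 0, so it is rejected here as well.
    if (willTopic.length < 1) {
        return @"will topic must be at least 1 character long";
    }
    NSData *utf8 = [willTopic dataUsingEncoding:NSUTF8StringEncoding];
    if (!utf8) {
        return @"will topic must be representable as UTF-8";
    }
    // MQTT strings carry a 16-bit length prefix, hence the 65535-byte limit.
    if (utf8.length > 65535) {
        return @"will topic may not be longer than 65535 bytes in UTF-8";
    }
    // Wildcards only make sense in subscriptions, never in a publish topic.
    if ([willTopic containsString:@"+"] || [willTopic containsString:@"#"]) {
        return @"will topic must not contain wildcards";
    }
    return nil;
}
```

Returning a string rather than throwing is just my choice for the sketch; it makes it easy to run the same checks before calling connect, so strict mode never gets the chance to raise.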
3. What the tell method does
- (void)tell {
NSUInteger incoming = [self.persistence allFlowsforClientId:self.clientId
incomingFlag:YES].count;
NSUInteger outflowing = [self.persistence allFlowsforClientId:self.clientId
incomingFlag:NO].count;
if ([self.delegate respondsToSelector:@selector(buffered:flowingIn:flowingOut:)]) {
[self.delegate buffered:self
flowingIn:incoming
flowingOut:outflowing];
}
if ([self.delegate respondsToSelector:@selector(buffered:queued:flowingIn:flowingOut:)]) {
[self.delegate buffered:self
queued:0
flowingIn:incoming
flowingOut:outflowing];
}
}
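It queries the persistence store for the number of incoming and outgoing flows that are still in flight,
then checks whether the delegate implements the corresponding callbacks and, if it does, reports those counts through them.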
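The "only call the delegate if it implements the method" part is an ordinary optional-protocol pattern. A rough sketch with made-up names (`ExampleBufferObserver`, `ExampleObserver` and `ExampleTell` are all hypothetical and only mimic the shape of tell):

```objc
#import <Foundation/Foundation.h>

// Hypothetical protocol with an optional callback, loosely modelled on the
// buffered:flowingIn:flowingOut: delegate method.
@protocol ExampleBufferObserver <NSObject>
@optional
- (void)bufferedFlowingIn:(NSUInteger)incoming flowingOut:(NSUInteger)outgoing;
@end

// An observer that implements the optional callback and remembers the counts.
@interface ExampleObserver : NSObject <ExampleBufferObserver>
@property (nonatomic) NSUInteger lastIn;
@property (nonatomic) NSUInteger lastOut;
@end

@implementation ExampleObserver
- (void)bufferedFlowingIn:(NSUInteger)incoming flowingOut:(NSUInteger)outgoing {
    self.lastIn = incoming;
    self.lastOut = outgoing;
}
@end

// Forwards the counts only when the delegate implements the callback.
// Returns YES if the callback was invoked.
static BOOL ExampleTell(id<ExampleBufferObserver> delegate,
                        NSUInteger incoming, NSUInteger outgoing) {
    if ([delegate respondsToSelector:@selector(bufferedFlowingIn:flowingOut:)]) {
        [delegate bufferedFlowingIn:incoming flowingOut:outgoing];
        return YES;
    }
    return NO;
}
```

The respondsToSelector: guard is what makes it safe for a delegate to implement only the callbacks it cares about.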
4. Next, the decoder's open method
- (void)open {
self.state = MQTTDecoderStateDecodingHeader;
}
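It simply sets the decoder state to MQTTDecoderStateDecodingHeader.
5. The transport's open method
This one is a bit more involved. The transport is chosen when the Session is set up, and there are three transport types: MQTTCFSocketTransport, MQTTWebsocketTransport and MQTTSSLSecurityPolicyTransport. Each one behaves differently.
Let's first look at what MQTTSSLSecurityPolicyTransport does: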
- (void)open {
DDLogVerbose(@"[MQTTSSLSecurityPolicyTransport] open");
self.state = MQTTTransportOpening;
NSError* connectError;
CFReadStreamRef readStream;
CFWriteStreamRef writeStream;
CFStreamCreatePairWithSocketToHost(NULL, (__bridge CFStringRef)self.host, self.port, &readStream, &writeStream);
CFReadStreamSetProperty(readStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanTrue);
CFWriteStreamSetProperty(writeStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanTrue);
if (self.tls) {
NSMutableDictionary *sslOptions = [[NSMutableDictionary alloc] init];
// delegate certificates verify operation to our secure policy.
// by disabling chain validation, it becomes our responsibility to verify that the host at the other end can be trusted.
// the server's certificates will be verified during MQTT encoder/decoder processing.
sslOptions[(NSString*)kCFStreamSSLLevel] = (NSString *)kCFStreamSocketSecurityLevelNegotiatedSSL;
sslOptions[(NSString *)kCFStreamSSLValidatesCertificateChain] = @NO;
if (self.certificates) {
sslOptions[(NSString *)kCFStreamSSLCertificates] = self.certificates;
}
if(!CFReadStreamSetProperty(readStream, kCFStreamPropertySSLSettings, (__bridge CFDictionaryRef)(sslOptions))){
connectError = [NSError errorWithDomain:@"MQTT"
code:errSSLInternal
userInfo:@{NSLocalizedDescriptionKey : @"Fail to init ssl input stream!"}];
}
if(!CFWriteStreamSetProperty(writeStream, kCFStreamPropertySSLSettings, (__bridge CFDictionaryRef)(sslOptions))){
connectError = [NSError errorWithDomain:@"MQTT"
code:errSSLInternal
userInfo:@{NSLocalizedDescriptionKey : @"Fail to init ssl output stream!"}];
}
}
if(!connectError){
self.encoder = [[MQTTSSLSecurityPolicyEncoder alloc] init];
self.encoder.stream = CFBridgingRelease(writeStream);
self.encoder.securityPolicy = self.tls ? self.securityPolicy : nil;
self.encoder.securityDomain = self.tls ? self.host : nil;
self.encoder.runLoop = self.runLoop;
self.encoder.runLoopMode = self.runLoopMode;
self.encoder.delegate = self;
if (self.voip) {
[self.encoder.stream setProperty:NSStreamNetworkServiceTypeVoIP forKey:NSStreamNetworkServiceType];
}
[self.encoder open];
self.decoder = [[MQTTSSLSecurityPolicyDecoder alloc] init];
self.decoder.stream = CFBridgingRelease(readStream);
self.decoder.securityPolicy = self.tls ? self.securityPolicy : nil;
self.decoder.securityDomain = self.tls ? self.host : nil;
self.decoder.runLoop = self.runLoop;
self.decoder.runLoopMode = self.runLoopMode;
self.decoder.delegate = self;
if (self.voip) {
[self.decoder.stream setProperty:NSStreamNetworkServiceTypeVoIP forKey:NSStreamNetworkServiceType];
}
[self.decoder open];
} else {
[self close];
}
}
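1. Set the MQTTTransportState to MQTTTransportOpening
2. Create a read stream and a write stream, plus an NSError variable
3. If TLS is enabled, configure the SSL settings on both streams (if setting them fails, the NSError declared above is populated)
4. Check for an error: if there is one, close; otherwise carry on
5. Create two objects, an encoder and a decoder, for encoding and decoding
6. Call open on the encoder and then on the decoder, and wait for incoming messages
Next, let's look at what MQTTWebsocketTransport does: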
- (void)open {
DDLogVerbose(@"[MQTTWebsocketTransport] open");
self.state = MQTTTransportOpening;
NSMutableURLRequest *urlRequest = [NSMutableURLRequest requestWithURL:[self endpointURL]];
urlRequest.SR_SSLPinnedCertificates = self.pinnedCertificates;
NSArray <NSString *> *protocols = @[@"mqtt"];
self.websocket = [[SRWebSocket alloc] initWithURLRequest:urlRequest
protocols:protocols
allowsUntrustedSSLCertificates:self.allowUntrustedCertificates];
self.websocket.delegate = self;
[self.websocket open];
}
1. Set the MQTTTransportState to MQTTTransportOpening
2. Create a URL request from [self endpointURL]
3. Set the pinned certificates on the request
4. Create the protocol array containing the MQTT subprotocol, @[@"mqtt"]
5. Create the SRWebSocket object, websocket
6. Set its delegate: websocket.delegate = self
7. Open the connection with [self.websocket open]; in the end this is similar to the transport above, because it also opens a stream
One more method deserves a mention here: endpointURL
- (NSURL*) endpointURL {
NSString *protocol = (self.tls) ? @"wss" : @"ws";
NSString *portString = (self.port == 0) ? @"" : [NSString stringWithFormat:@":%d",(unsigned int)self.port];
NSString *path = self.path;
NSString *urlString = [NSString stringWithFormat:@"%@://%@%@%@",
protocol,
self.host,
portString,
path];
NSURL *url = [NSURL URLWithString:urlString];
return url;
}
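It assembles a URL string:
1. Pick @"wss" or @"ws" depending on self.tls
2. If the port is 0, leave the port out; otherwise build ":port"
3. Take the path, which defaults to @"/mqtt"
4. Combine them as scheme://host[:port]path
5. Create the NSURL
6. Return it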
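The same steps work as a free function, which makes it easy to see what URL a given configuration produces. `MQTTExampleEndpointURL` is a hypothetical name and the host below is a placeholder; the logic simply mirrors endpointURL.

```objc
#import <Foundation/Foundation.h>

// Hypothetical helper mirroring endpointURL; not part of MQTTClient.
static NSURL *MQTTExampleEndpointURL(NSString *host, unsigned int port,
                                     BOOL tls, NSString *path) {
    NSString *scheme = tls ? @"wss" : @"ws";
    // Port 0 means "omit the port and let the scheme's default apply".
    NSString *portString = (port == 0) ? @"" : [NSString stringWithFormat:@":%u", port];
    NSString *urlString = [NSString stringWithFormat:@"%@://%@%@%@",
                           scheme, host, portString, path ?: @""];
    return [NSURL URLWithString:urlString];
}
```

For example, `MQTTExampleEndpointURL(@"broker.example.com", 8083, NO, @"/mqtt")` gives `ws://broker.example.com:8083/mqtt`, and passing port 0 with tls set to YES gives `wss://broker.example.com/mqtt`.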
The last one is
MQTTCFSocketTransport
- (void)open {
DDLogVerbose(@"[MQTTCFSocketTransport] open");
self.state = MQTTTransportOpening;
NSError* connectError;
CFReadStreamRef readStream;
CFWriteStreamRef writeStream;
CFStreamCreatePairWithSocketToHost(NULL, (__bridge CFStringRef)self.host, self.port, &readStream, &writeStream);
CFReadStreamSetProperty(readStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanTrue);
CFWriteStreamSetProperty(writeStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanTrue);
if (self.tls) {
NSMutableDictionary *sslOptions = [[NSMutableDictionary alloc] init];
sslOptions[(NSString*)kCFStreamSSLLevel] = (NSString *)kCFStreamSocketSecurityLevelNegotiatedSSL;
if (self.certificates) {
sslOptions[(NSString *)kCFStreamSSLCertificates] = self.certificates;
}
if(!CFReadStreamSetProperty(readStream, kCFStreamPropertySSLSettings, (__bridge CFDictionaryRef)(sslOptions))){
connectError = [NSError errorWithDomain:@"MQTT"
code:errSSLInternal
userInfo:@{NSLocalizedDescriptionKey : @"Fail to init ssl input stream!"}];
}
if(!CFWriteStreamSetProperty(writeStream, kCFStreamPropertySSLSettings, (__bridge CFDictionaryRef)(sslOptions))){
connectError = [NSError errorWithDomain:@"MQTT"
code:errSSLInternal
userInfo:@{NSLocalizedDescriptionKey : @"Fail to init ssl output stream!"}];
}
}
if(!connectError){
self.encoder.delegate = nil;
self.encoder = [[MQTTCFSocketEncoder alloc] init];
self.encoder.stream = CFBridgingRelease(writeStream);
self.encoder.runLoop = self.runLoop;
self.encoder.runLoopMode = self.runLoopMode;
self.encoder.delegate = self;
if (self.voip) {
[self.encoder.stream setProperty:NSStreamNetworkServiceTypeVoIP forKey:NSStreamNetworkServiceType];
}
[self.encoder open];
self.decoder.delegate = nil;
self.decoder = [[MQTTCFSocketDecoder alloc] init];
self.decoder.stream = CFBridgingRelease(readStream);
self.decoder.runLoop = self.runLoop;
self.decoder.runLoopMode = self.runLoopMode;
self.decoder.delegate = self;
if (self.voip) {
[self.decoder.stream setProperty:NSStreamNetworkServiceTypeVoIP forKey:NSStreamNetworkServiceType];
}
[self.decoder open];
} else {
[self close];
}
}
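1. Set the MQTTTransportState to MQTTTransportOpening
2. Create a read stream and a write stream, plus an NSError variable
3. If TLS is enabled, configure the SSL settings on both streams (if setting them fails, the NSError declared above is populated)
4. Check for an error: if there is one, close; otherwise carry on
5. Create two objects, an encoder and a decoder, for encoding and decoding
6. Call open on the encoder and then on the decoder, and wait for incoming messages
The overall flow is the same as in MQTTSSLSecurityPolicyTransport.
Both enable TLS when tls is set. The difference is that MQTTSSLSecurityPolicyTransport turns off the system's certificate chain validation and checks the server certificate against its securityPolicy instead, which is why the two use different encoder and decoder classes.
At this point the connection part is done, but so far we have only covered the left side of the diagram. Now let's see how the right side differs. The right side is the path you take when you go through the MQTTSessionManager class.
After connect... has been called from the outside: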
if (shouldReconnect) {
DDLogVerbose(@"[MQTTSessionManager] reconnecting");
[self disconnect];
[self reconnect];
} else {
DDLogVerbose(@"[MQTTSessionManager] connecting");
[self connectToInternal];
}
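If shouldReconnect is set, the manager disconnects and then reconnects; otherwise it connects directly through connectToInternal.
That method looks like this: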
- (void)connectToInternal {
if (self.session && self.state == MQTTSessionManagerStateStarting) {
[self updateState:MQTTSessionManagerStateConnecting];
[self.session connectToHost:self.host
port:self.port
usingSSL:self.tls];
}
}
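It checks that a session exists and that the manager is still in the Starting state; if so, it marks the state as Connecting and starts the connection.
The connect methods it calls: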
- (void)connectToHost:(NSString*)host port:(UInt32)port usingSSL:(BOOL)usingSSL {
[self connectToHost:host port:port usingSSL:usingSSL connectHandler:nil];
}
- (void)connectToHost:(NSString *)host
port:(UInt32)port
usingSSL:(BOOL)usingSSL
connectHandler:(MQTTConnectHandler)connectHandler {
DDLogVerbose(@"MQTTSessionLegacy connectToHost:%@ port:%d usingSSL:%d connectHandler:%p",
host, (unsigned int)port, usingSSL, connectHandler);
if (self.transportType == MQTTTransportTypeWebSocket) {
MQTTWebsocketTransport *transport = [[MQTTWebsocketTransport alloc] init];
transport.host = host;
transport.port = port;
transport.tls = usingSSL;
transport.runLoopMode = self.runLoopMode;
transport.runLoop = self.runLoop;
self.transport = transport;
}else if (self.securityPolicy) {
MQTTSSLSecurityPolicyTransport *transport = [[MQTTSSLSecurityPolicyTransport alloc] init];
transport.host = host;
transport.port = port;
transport.tls = usingSSL;
transport.securityPolicy = self.securityPolicy;
transport.certificates = self.certificates;
transport.voip = self.voip;
transport.runLoop = self.runLoop;
transport.runLoopMode = self.runLoopMode;
self.transport = transport;
} else {
MQTTCFSocketTransport *transport = [[MQTTCFSocketTransport alloc] init];
transport.host = host;
transport.port = port;
transport.tls = usingSSL;
transport.certificates = self.certificates;
transport.voip = self.voip;
transport.runLoop = self.runLoop;
transport.runLoopMode = self.runLoopMode;
self.transport = transport;
}
[self connectWithConnectHandler:connectHandler];
}
1. Check the connection type to decide which transport is needed, then create it and set its properties
2. Start connecting, passing the callback along
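The order of the checks in step 1 matters, so here it is reduced to a tiny decision function. The enum and function names are hypothetical stand-ins of mine; the real code uses the library's own MQTTTransportType and transport classes.

```objc
#import <Foundation/Foundation.h>

// Hypothetical stand-ins for illustration only.
typedef NS_ENUM(NSInteger, ExampleTransportKind) {
    ExampleTransportKindCFSocket,
    ExampleTransportKindSSLSecurityPolicy,
    ExampleTransportKindWebSocket,
};

// Mirrors the if / else-if / else chain in connectToHost:port:usingSSL:connectHandler:.
static ExampleTransportKind ExampleChooseTransport(BOOL wantsWebSocket,
                                                   BOOL hasSecurityPolicy) {
    if (wantsWebSocket) {
        return ExampleTransportKindWebSocket;          // checked first
    }
    if (hasSecurityPolicy) {
        return ExampleTransportKindSSLSecurityPolicy;  // custom certificate checks
    }
    return ExampleTransportKindCFSocket;               // default fallback
}
```

One consequence worth noticing: because the WebSocket check comes first, a security policy set on the session is not consulted in that branch of the listing above.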
Then the connectWithConnectHandler method is called
- (void)connectWithConnectHandler:(MQTTConnectHandler)connectHandler {
DDLogVerbose(@"[MQTTSession] connectWithConnectHandler:%p", connectHandler);
self.connectHandler = connectHandler;
[self connect];
}
1. Store the callback
2. Start connecting
Calling connect here brings us back to the connect method from the left side of the diagram.