iOS MQTT - 2. Connecting and a walkthrough of the connection process

I had completely forgotten about this series for a while... honestly, it was just a bad case of laziness.

I. Connecting (updated 2019-02-23)

- (void)connectTo:(NSString *)host
             port:(NSInteger)port
              tls:(BOOL)tls
        keepalive:(NSInteger)keepalive
            clean:(BOOL)clean
             auth:(BOOL)auth
             user:(NSString *)user
             pass:(NSString *)pass
        willTopic:(NSString *)willTopic
             will:(NSData *)will
          willQos:(MQTTQosLevel)willQos
   willRetainFlag:(BOOL)willRetainFlag
     withClientId:(NSString *)clientId
   securityPolicy:(MQTTSSLSecurityPolicy *)securityPolicy
     certificates:(NSArray *)certificates
    protocolLevel:(MQTTProtocolVersion)protocolLevel
   connectHandler:(MQTTConnectHandler)connectHandler;

The first thing you see is a long list of parameters. Don't let it scare you; let's go through them one by one.

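host: broker address
port: broker port
tls: whether to encrypt the connection with TLS/SSL
keepalive: keep-alive interval, in seconds
clean: clean-session flag, i.e. whether the session state is cleared when the client disconnects
auth: whether to authenticate with user and pass
user: user name
pass: password
willTopic: topic that the will (Last Will) message is published to
will: payload of the will message, which the broker publishes on willTopic if the client disconnects unexpectedly
willQos: QoS level of the will message (QoS levels were covered in the first article)
    MQTTQosLevelAtMostOnce = 0, at most once
    MQTTQosLevelAtLeastOnce = 1, at least once
    MQTTQosLevelExactlyOnce = 2, exactly once
willRetainFlag: whether the broker retains the will message
clientId: client ID; if you leave it out, a random one is generated
securityPolicy: security policy object
certificates: array of certificates
protocolLevel: protocol version
    MQTTProtocolVersion0 = 0,
    MQTTProtocolVersion31 = 3,
    MQTTProtocolVersion311 = 4,
    MQTTProtocolVersion50 = 5
connectHandler: callback invoked with the result of the connection attempt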
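
To make these options a little more concrete, here is a rough sketch of how several of them map onto the flags byte of the MQTT CONNECT packet. The bit layout follows the MQTT 3.1.1 specification; the function name connect_flags and its parameters are made up for illustration, and this is plain C rather than the library's own encoder.

```c
#include <stdbool.h>
#include <stdint.h>

/* Bit positions of the CONNECT flags byte as defined by MQTT 3.1.1. */
enum {
    FLAG_CLEAN_SESSION = 1u << 1,
    FLAG_WILL          = 1u << 2,
    FLAG_WILL_RETAIN   = 1u << 5,
    FLAG_PASSWORD      = 1u << 6,
    FLAG_USER_NAME     = 1u << 7
};

/* Hypothetical helper: packs the connection options into the flags byte.
   will_qos is 0, 1 or 2 and occupies bits 3-4. The will QoS and will
   retain bits are only set when a will is present. */
uint8_t connect_flags(bool clean, bool has_will, unsigned will_qos,
                      bool will_retain, bool has_user, bool has_pass) {
    uint8_t flags = 0;
    if (clean) flags |= FLAG_CLEAN_SESSION;
    if (has_will) {
        flags |= FLAG_WILL;
        flags |= (uint8_t)((will_qos & 0x3u) << 3);
        if (will_retain) flags |= FLAG_WILL_RETAIN;
    }
    if (has_user) flags |= FLAG_USER_NAME;
    if (has_pass) flags |= FLAG_PASSWORD;
    return flags;
}
```

For example, a clean session with no will and no credentials gives 0x02, which is why the will-related parameters only matter when a will is actually configured.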



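Calling this method creates a Session. You need to set a delegate to observe the connection state; incoming messages and the results of publishing and subscribing are delivered through the delegate as well.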

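If you step into the source you will see that a Session is created internally.
If you don't use the manager that the library provides, you have to create the session yourself, and remember to set its delegate. Also note that the Session's transport can differ: if you connect over WebSocket, the transport has to be MQTTWebsocketTransport; otherwise use MQTTCFSocketTransport.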

self.session = [[MQTTSession alloc] initWithClientId:clientId
                                            userName:auth ? user : nil
                                            password:auth ? pass : nil
                                           keepAlive:keepalive
                                        cleanSession:clean
                                                will:will
                                           willTopic:willTopic
                                             willMsg:willMsg
                                             willQoS:willQos
                                      willRetainFlag:willRetainFlag
                                       protocolLevel:protocolLevel
                                             runLoop:[NSRunLoop currentRunLoop]
                                             forMode:NSDefaultRunLoopMode
                                      securityPolicy:securityPolicy
                                        certificates:certificates
                                       transportType:MQTTTransportTypeWebSocket];
MQTTCoreDataPersistence *persistence = [[MQTTCoreDataPersistence alloc] init];
persistence.persistent = self.persistent;
persistence.maxWindowSize = self.maxWindowSize;
persistence.maxSize = self.maxSize;
persistence.maxMessages = self.maxMessages;
self.session.persistence = persistence;



Now let's look at the delegate methods. I'll only cover the important ones:

1. This callback fires whenever a message arrives on a subscribed topic

- (void)newMessage:(MQTTSession *)session
              data:(NSData *)data
           onTopic:(NSString *)topic
               qos:(MQTTQosLevel)qos
          retained:(BOOL)retained
               mid:(unsigned int)mid;

2. MQTT connection state

- (void)handleEvent:(MQTTSession *)session event:(MQTTSessionEvent)eventCode error:(NSError *)error;

3. Connection-related delegate methods

/** gets called when a connection has been successfully established
 @param session the MQTTSession reporting the connect
 
 */
- (void)connected:(MQTTSession *)session;

/** gets called when a connection has been successfully established
 @param session the MQTTSession reporting the connect
 @param sessionPresent represents the Session Present flag sent by the broker
 
 */
- (void)connected:(MQTTSession *)session sessionPresent:(BOOL)sessionPresent;

/** gets called when a connection has been refused
 @param session the MQTTSession reporting the refusal
 @param error an optional additional error object with additional information
 */
- (void)connectionRefused:(MQTTSession *)session error:(NSError *)error;

/** gets called when a connection has been closed
 @param session the MQTTSession reporting the close

 */
- (void)connectionClosed:(MQTTSession *)session;

/** gets called when a connection error happened
 @param session the MQTTSession reporting the connect error
 @param error an optional additional error object with additional information
 */
- (void)connectionError:(MQTTSession *)session error:(NSError *)error;



Usually, once the connection has succeeded, we can publish messages and subscribe to topics.

II. The connection process

First, a diagram I put together myself:

[Figure: MQTT connection process]


Let's start with the left side of the diagram.
1. Call the connectAndWaitTimeout: method
- (BOOL)connectAndWaitTimeout:(NSTimeInterval)timeout {
    NSDate *started = [NSDate date];
    self.synchronConnect = TRUE;
    
    [self connect];
    
    [[NSRunLoop currentRunLoop] addPort:[NSMachPort port] forMode:NSRunLoopCommonModes];
    
    while (self.synchronConnect && (timeout == 0 || started.timeIntervalSince1970 + timeout > [NSDate date].timeIntervalSince1970)) {
        DDLogVerbose(@"[MQTTSessionSynchron] waiting for connect");
        [[NSRunLoop currentRunLoop] runUntilDate:[NSDate dateWithTimeIntervalSinceNow:.1]];
    }
    
    DDLogVerbose(@"[MQTTSessionSynchron] end connect");
    
    return (self.status == MQTTSessionStatusConnected);
}
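1. Take the current time as the start time
2. Set the synchronConnect flag to true
3. Start connecting
4. Add a port to the current run loop
5. Wait for the connection result or for the timeout (a timeout of 0 means wait indefinitely)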
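
The waiting loop in step 5 can be sketched on its own in plain C. This is only an illustration under stated assumptions: the clock and the "run the loop for a short slice" step are injected as callbacks (now_fn and run_slice_fn are hypothetical stand-ins for NSDate and NSRunLoop), and the fake_loop helpers exist purely so the sketch can run without a real event loop. Unlike the original, which returns whether the session status is connected, this version simply reports whether the pending flag was cleared in time.

```c
#include <stdbool.h>

/* Hypothetical stand-ins for NSDate and NSRunLoop, injected as callbacks. */
typedef double (*now_fn)(void *ctx);      /* current time in seconds */
typedef void (*run_slice_fn)(void *ctx);  /* let the event loop work briefly */

/* Spins the event loop until *pending is cleared or the deadline passes.
   timeout == 0 means "no deadline", as in the listing above. */
bool wait_while_pending(const bool *pending, double timeout,
                        now_fn now, run_slice_fn run_slice, void *ctx) {
    double started = now(ctx);
    while (*pending && (timeout == 0 || started + timeout > now(ctx))) {
        run_slice(ctx);
    }
    return !*pending;
}

/* Fake event loop for demonstration: each slice advances the clock by
   0.1 s, and the "connection" completes after slices_left slices. */
struct fake_loop {
    double clock;
    int slices_left;
    bool pending;
};

double fake_now(void *ctx) {
    return ((struct fake_loop *)ctx)->clock;
}

void fake_run_slice(void *ctx) {
    struct fake_loop *loop = ctx;
    loop->clock += 0.1;
    if (--loop->slices_left == 0) loop->pending = false;
}
```

With a fake loop that finishes after three slices and a one-second timeout the call returns true; if the loop needs 100 slices and the timeout is 0.5 seconds, it gives up and returns false.
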
2. Call the connect method

- (void)connect {

    if (MQTTStrict.strict &&
        self.clientId && self.clientId.length < 1 &&
        !self.cleanSessionFlag) {
        NSException* myException = [NSException
                                    exceptionWithName:@"clientId must be at least 1 character long if cleanSessionFlag is off"
                                    reason:[NSString stringWithFormat:@"clientId length = %lu", [self.clientId dataUsingEncoding:NSUTF8StringEncoding].length]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        !self.clientId) {
        NSException* myException = [NSException
                                    exceptionWithName:@"clientId must not be nil"
                                    reason:[NSString stringWithFormat:@"clientId length = %lu", [self.clientId dataUsingEncoding:NSUTF8StringEncoding].length]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        [self.clientId dataUsingEncoding:NSUTF8StringEncoding].length > 65535L) {
        NSException* myException = [NSException
                                    exceptionWithName:@"clientId may not be longer than 65535 bytes in UTF8 representation"
                                    reason:[NSString stringWithFormat:@"clientId length = %lu", [self.clientId dataUsingEncoding:NSUTF8StringEncoding].length]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        ![self.clientId dataUsingEncoding:NSUTF8StringEncoding]) {
        NSException* myException = [NSException
                                    exceptionWithName:@"clientId must not contain non-UTF8 characters"
                                    reason:[NSString stringWithFormat:@"clientId = %@", self.clientId]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        [self.userName dataUsingEncoding:NSUTF8StringEncoding].length > 65535L) {
        NSException* myException = [NSException
                                    exceptionWithName:@"userName may not be longer than 65535 bytes in UTF8 representation"
                                    reason:[NSString stringWithFormat:@"userName length = %lu", [self.userName dataUsingEncoding:NSUTF8StringEncoding].length]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        ![self.userName dataUsingEncoding:NSUTF8StringEncoding]) {
        NSException* myException = [NSException
                                    exceptionWithName:@"userName must not contain non-UTF8 characters"
                                    reason:[NSString stringWithFormat:@"userName = %@", self.userName]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        !self.userName &&
        self.password) {
        NSException* myException = [NSException
                                    exceptionWithName:@"password specified without userName"
                                    reason:[NSString stringWithFormat:@"password = %@", self.password]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        self.protocolLevel != MQTTProtocolVersion31 &&
        self.protocolLevel != MQTTProtocolVersion311 &&
        self.protocolLevel != MQTTProtocolVersion50) {
        NSException* myException = [NSException
                                    exceptionWithName:@"Illegal protocolLevel"
                                    reason:[NSString stringWithFormat:@"%d is not 3, 4, or 5", self.protocolLevel]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        !self.willFlag &&
        self.willTopic) {
        NSException* myException = [NSException
                                    exceptionWithName:@"Will topic must be nil if willFlag is false"
                                    reason:[NSString stringWithFormat:@"%@", self.willTopic]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        !self.willFlag &&
        self.willMsg) {
        NSException* myException = [NSException
                                    exceptionWithName:@"Will message must be nil if willFlag is false"
                                    reason:[NSString stringWithFormat:@"%@", self.willMsg]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        !self.willFlag &&
        self.willRetainFlag) {
        NSException* myException = [NSException
                                    exceptionWithName:@"Will retain must be false if willFlag is false"
                                    reason:[NSString stringWithFormat:@"%d", self.willRetainFlag]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        !self.willFlag &&
        self.willQoS != MQTTQosLevelAtMostOnce) {
        NSException* myException = [NSException
                                    exceptionWithName:@"Will QoS Level must be 0 if willFlag is false"
                                    reason:[NSString stringWithFormat:@"%d", self.willQoS]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        self.willQoS != MQTTQosLevelAtMostOnce &&
        self.willQoS != MQTTQosLevelAtLeastOnce &&
        self.willQoS != MQTTQosLevelExactlyOnce) {
        NSException* myException = [NSException
                                    exceptionWithName:@"Illegal will QoS level"
                                    reason:[NSString stringWithFormat:@"%d is not 0, 1, or 2", self.willQoS]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        self.willFlag &&
        !self.willTopic) {
        NSException* myException = [NSException
                                    exceptionWithName:@"Will topic must not be nil if willFlag is true"
                                    reason:[NSString stringWithFormat:@"%@", self.willTopic]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        self.willTopic &&
        self.willTopic.length < 1) {
        NSException* myException = [NSException
                                    exceptionWithName:@"Will topic must be at least 1 character long"
                                    reason:[NSString stringWithFormat:@"%@", self.willTopic]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        self.willTopic &&
        [self.willTopic dataUsingEncoding:NSUTF8StringEncoding].length > 65535L) {
        NSException* myException = [NSException
                                    exceptionWithName:@"willTopic may not be longer than 65535 bytes in UTF8 representation"
                                    reason:[NSString stringWithFormat:@"willTopic length = %lu", [self.willTopic dataUsingEncoding:NSUTF8StringEncoding].length]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        self.willTopic &&
        ![self.willTopic dataUsingEncoding:NSUTF8StringEncoding]) {
        NSException* myException = [NSException
                                    exceptionWithName:@"willTopic must not contain non-UTF8 characters"
                                    reason:[NSString stringWithFormat:@"willTopic = %@", self.willTopic]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        self.willTopic &&
        ([self.willTopic containsString:@"+"] ||
         [self.willTopic containsString:@"#"])
        ) {
        NSException* myException = [NSException
                                    exceptionWithName:@"willTopic must not contain wildcards"
                                    reason:[NSString stringWithFormat:@"willTopic = %@", self.willTopic]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        self.willFlag &&
        !self.willMsg) {
        NSException* myException = [NSException
                                    exceptionWithName:@"Will message must not be nil if willFlag is true"
                                    reason:[NSString stringWithFormat:@"%@", self.willMsg]
                                    userInfo:nil];
        @throw myException;
    }

    DDLogVerbose(@"[MQTTSession] connecting");
    if (self.cleanSessionFlag) {
        [self.persistence deleteAllFlowsForClientId:self.clientId];
        [self.subscribeHandlers removeAllObjects];
        [self.unsubscribeHandlers removeAllObjects];
        [self.publishHandlers removeAllObjects];
    }
    [self tell];

    self.status = MQTTSessionStatusConnecting;

    self.decoder = [[MQTTDecoder alloc] init];
    self.decoder.runLoop = self.runLoop;
    self.decoder.runLoopMode = self.runLoopMode;
    self.decoder.delegate = self;
    [self.decoder open];

    self.transport.delegate = self;
    [self.transport open];
}
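0. Run a series of checks first (they only apply when MQTTStrict.strict is on) and throw an exception if any of them fails
1. If cleanSessionFlag is set, delete the persisted flows for this clientId and clear the handler collections
2. Call [self tell]
3. Set the status to MQTTSessionStatusConnecting
4. Create an MQTTDecoder and configure it
5. Call the decoder's open method
6. Set the transport's delegate
7. Call the transport's open method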
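
The strict-mode checks in step 0 all follow the same pattern: test one rule, and fail with a descriptive name if it is violated. A minimal sketch of two of them in plain C is below. The function names are hypothetical, the rules and their order are copied from the listing above, and instead of throwing an exception each function returns the message of the first failed rule (or NULL if everything passes). strlen counts bytes, which matches the UTF-8 byte limit; the UTF-8 validity check is left out.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define MQTT_MAX_STRING_BYTES 65535

/* Returns NULL when the client ID passes, otherwise the first failed rule. */
const char *check_client_id(const char *client_id, bool clean_session) {
    if (client_id && strlen(client_id) < 1 && !clean_session)
        return "clientId must be at least 1 character long if cleanSessionFlag is off";
    if (!client_id)
        return "clientId must not be nil";
    if (strlen(client_id) > MQTT_MAX_STRING_BYTES)
        return "clientId may not be longer than 65535 bytes in UTF8 representation";
    return NULL;
}

/* Returns NULL when the will topic is consistent with the will flag. */
const char *check_will_topic(bool will_flag, const char *will_topic) {
    if (!will_flag && will_topic)
        return "Will topic must be nil if willFlag is false";
    if (will_flag && !will_topic)
        return "Will topic must not be nil if willFlag is true";
    if (will_topic && strlen(will_topic) < 1)
        return "Will topic must be at least 1 character long";
    if (will_topic && strlen(will_topic) > MQTT_MAX_STRING_BYTES)
        return "willTopic may not be longer than 65535 bytes in UTF8 representation";
    if (will_topic && (strchr(will_topic, '+') || strchr(will_topic, '#')))
        return "willTopic must not contain wildcards";
    return NULL;
}
```

Note that an empty client ID is accepted when the clean-session flag is on, exactly as in the first check of the listing.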


3. What the tell method does
- (void)tell {
    NSUInteger incoming = [self.persistence allFlowsforClientId:self.clientId
                                                   incomingFlag:YES].count;
    NSUInteger outflowing = [self.persistence allFlowsforClientId:self.clientId
                                                     incomingFlag:NO].count;
    if ([self.delegate respondsToSelector:@selector(buffered:flowingIn:flowingOut:)]) {
        [self.delegate buffered:self
                      flowingIn:incoming
                     flowingOut:outflowing];
    }
    if ([self.delegate respondsToSelector:@selector(buffered:queued:flowingIn:flowingOut:)]) {
        [self.delegate buffered:self
                         queued:0
                      flowingIn:incoming
                     flowingOut:outflowing];
    }
}
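It asks the persistence store how many incoming and outgoing flows are still in flight for this clientId,
then checks whether the delegate implements the corresponding methods and, if so, reports those counts to it.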
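
The "call it only if the delegate implements it" pattern can be sketched in plain C with function pointers, where a NULL pointer plays the role of a selector the delegate does not respond to. Everything here (session_delegate, tell, record_flows) is a hypothetical illustration, not the library's API; the return value just counts how many callbacks were invoked.

```c
#include <stddef.h>

/* Hypothetical delegate: a NULL function pointer means
   "this optional method is not implemented". */
struct session_delegate {
    void (*buffered)(void *owner, size_t flowing_in, size_t flowing_out);
    void (*buffered_queued)(void *owner, size_t queued,
                            size_t flowing_in, size_t flowing_out);
    void *owner;
};

/* Reports the in-flight counts to whichever callbacks are implemented. */
int tell(const struct session_delegate *delegate,
         size_t flowing_in, size_t flowing_out) {
    int calls = 0;
    if (delegate == NULL) return calls;
    if (delegate->buffered) {
        delegate->buffered(delegate->owner, flowing_in, flowing_out);
        calls++;
    }
    if (delegate->buffered_queued) {
        /* queued is always reported as 0, as in the listing above */
        delegate->buffered_queued(delegate->owner, 0, flowing_in, flowing_out);
        calls++;
    }
    return calls;
}

/* Demo receiver that remembers the last reported counts. */
struct flow_log {
    size_t in;
    size_t out;
};

void record_flows(void *owner, size_t flowing_in, size_t flowing_out) {
    struct flow_log *log = owner;
    log->in = flowing_in;
    log->out = flowing_out;
}
```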


4. Next, the decoder's open method
- (void)open {
    self.state = MQTTDecoderStateDecodingHeader;
}
It only sets the state flag to MQTTDecoderStateDecodingHeader.


5. The transport's open method

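Things get a bit more complicated here. The transport is specified when the Session is set up, and there are three transport types: MQTTCFSocketTransport, MQTTWebsocketTransport and MQTTSSLSecurityPolicyTransport. Each one opens differently.

First, let's see what MQTTSSLSecurityPolicyTransport does: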

- (void)open {
    DDLogVerbose(@"[MQTTSSLSecurityPolicyTransport] open");
    self.state = MQTTTransportOpening;

    NSError* connectError;

    CFReadStreamRef readStream;
    CFWriteStreamRef writeStream;

    CFStreamCreatePairWithSocketToHost(NULL, (__bridge CFStringRef)self.host, self.port, &readStream, &writeStream);

    CFReadStreamSetProperty(readStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanTrue);
    CFWriteStreamSetProperty(writeStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanTrue);

    if (self.tls) {
        NSMutableDictionary *sslOptions = [[NSMutableDictionary alloc] init];
        
        // delegate certificates verify operation to our secure policy.
        // by disabling chain validation, it becomes our responsibility to verify that the host at the other end can be trusted.
        // the server's certificates will be verified during MQTT encoder/decoder processing.
        sslOptions[(NSString*)kCFStreamSSLLevel] = (NSString *)kCFStreamSocketSecurityLevelNegotiatedSSL;
        sslOptions[(NSString *)kCFStreamSSLValidatesCertificateChain] = @NO;
        
        if (self.certificates) {
            sslOptions[(NSString *)kCFStreamSSLCertificates] = self.certificates;
        }
        
        if(!CFReadStreamSetProperty(readStream, kCFStreamPropertySSLSettings, (__bridge CFDictionaryRef)(sslOptions))){
            connectError = [NSError errorWithDomain:@"MQTT"
                                               code:errSSLInternal
                                           userInfo:@{NSLocalizedDescriptionKey : @"Fail to init ssl input stream!"}];
        }
        if(!CFWriteStreamSetProperty(writeStream, kCFStreamPropertySSLSettings, (__bridge CFDictionaryRef)(sslOptions))){
            connectError = [NSError errorWithDomain:@"MQTT"
                                               code:errSSLInternal
                                           userInfo:@{NSLocalizedDescriptionKey : @"Fail to init ssl output stream!"}];
        }
    }
    
    if(!connectError){
        self.encoder = [[MQTTSSLSecurityPolicyEncoder alloc] init];
        self.encoder.stream = CFBridgingRelease(writeStream);
        self.encoder.securityPolicy = self.tls ? self.securityPolicy : nil;
        self.encoder.securityDomain = self.tls ? self.host : nil;
        self.encoder.runLoop = self.runLoop;
        self.encoder.runLoopMode = self.runLoopMode;
        self.encoder.delegate = self;
        if (self.voip) {
            [self.encoder.stream setProperty:NSStreamNetworkServiceTypeVoIP forKey:NSStreamNetworkServiceType];
        }
        [self.encoder open];
        
        self.decoder = [[MQTTSSLSecurityPolicyDecoder alloc] init];
        self.decoder.stream =  CFBridgingRelease(readStream);
        self.decoder.securityPolicy = self.tls ? self.securityPolicy : nil;
        self.decoder.securityDomain = self.tls ? self.host : nil;
        self.decoder.runLoop = self.runLoop;
        self.decoder.runLoopMode = self.runLoopMode;
        self.decoder.delegate = self;
        if (self.voip) {
            [self.decoder.stream setProperty:NSStreamNetworkServiceTypeVoIP forKey:NSStreamNetworkServiceType];
        }
        [self.decoder open];
        
    } else {
        [self close];
    }
}
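1. Set the MQTTTransportState to MQTTTransportOpening
2. Create a read stream ref and a write stream ref, plus an NSError variable
3. If TLS is enabled, apply the SSL settings to both streams (if either call fails, the NSError declared above is populated)
4. If there is an error, close the transport; otherwise continue
5. Create two objects, an encoder and a decoder, for encoding and decoding
6. Call open on the encoder and then on the decoder, and wait for messages to process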


Next, let's see what MQTTWebsocketTransport does:

- (void)open {
    DDLogVerbose(@"[MQTTWebsocketTransport] open");
    self.state = MQTTTransportOpening;
    
    NSMutableURLRequest *urlRequest = [NSMutableURLRequest requestWithURL:[self endpointURL]];
    urlRequest.SR_SSLPinnedCertificates = self.pinnedCertificates;
    NSArray <NSString *> *protocols = @[@"mqtt"];
    
    self.websocket = [[SRWebSocket alloc] initWithURLRequest:urlRequest
                                                   protocols:protocols
                              allowsUntrustedSSLCertificates:self.allowUntrustedCertificates];
    
    self.websocket.delegate = self;
    [self.websocket open];
}
1. Set the MQTTTransportState to MQTTTransportOpening
2. Create a URL request from [self endpointURL]
3. Set the pinned certificates on the URL request
4. Create the protocol array containing the MQTT protocol, @[@"mqtt"]
5. Create the SRWebSocket object, websocket
6. Set its delegate: websocket.delegate = self
7. Open the connection with [self.websocket open]; in the end this is similar to the transport above, since it also opens a stream channel

One more method deserves a look here: endpointURL

- (NSURL*) endpointURL {
    NSString *protocol = (self.tls) ? @"wss" : @"ws";
    NSString *portString = (self.port == 0) ? @"" : [NSString stringWithFormat:@":%d",(unsigned int)self.port];
    NSString *path = self.path;
    NSString *urlString = [NSString stringWithFormat:@"%@://%@%@%@",
                           protocol,
                           self.host,
                           portString,
                           path];
    NSURL *url = [NSURL URLWithString:urlString];
    return url;
}
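It assembles a URL string:
1. Pick the scheme from self.tls: @"wss" or @"ws"
2. Build the port part, which is empty when the port is 0
3. Take the path, which defaults to @"/mqtt"
4. Combine them as scheme://host, followed by the port part and the path
5. Create the URL
6. Return the URL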
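
The same assembly can be sketched in plain C with snprintf. This is an illustration only: endpoint_url is a hypothetical name, and the checks for a missing host or a too-small buffer are additions of this sketch that the Objective-C method does not perform.

```c
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Writes "<scheme>://<host>[:<port>]<path>" into out.
   Returns the URL length, or -1 if the host or path is missing
   or the buffer is too small. */
int endpoint_url(char *out, size_t out_size, bool tls,
                 const char *host, unsigned port, const char *path) {
    if (host == NULL || strlen(host) == 0 || path == NULL) return -1;

    /* The port part stays empty when the port is 0. */
    char port_part[16] = "";
    if (port != 0) snprintf(port_part, sizeof port_part, ":%u", port);

    int n = snprintf(out, out_size, "%s://%s%s%s",
                     tls ? "wss" : "ws", host, port_part, path);
    return (n < 0 || (size_t)n >= out_size) ? -1 : n;
}
```

With tls on, host broker.example.com (a placeholder), port 8084 and path /mqtt, the buffer ends up holding wss://broker.example.com:8084/mqtt; with tls off and port 0 on localhost it holds ws://localhost/mqtt.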

The last one is MQTTCFSocketTransport:

- (void)open {
    DDLogVerbose(@"[MQTTCFSocketTransport] open");
    self.state = MQTTTransportOpening;

    NSError* connectError;

    CFReadStreamRef readStream;
    CFWriteStreamRef writeStream;

    CFStreamCreatePairWithSocketToHost(NULL, (__bridge CFStringRef)self.host, self.port, &readStream, &writeStream);

    CFReadStreamSetProperty(readStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanTrue);
    CFWriteStreamSetProperty(writeStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanTrue);
    
    if (self.tls) {
        NSMutableDictionary *sslOptions = [[NSMutableDictionary alloc] init];
        
        sslOptions[(NSString*)kCFStreamSSLLevel] = (NSString *)kCFStreamSocketSecurityLevelNegotiatedSSL;
        
        if (self.certificates) {
            sslOptions[(NSString *)kCFStreamSSLCertificates] = self.certificates;
        }
        
        if(!CFReadStreamSetProperty(readStream, kCFStreamPropertySSLSettings, (__bridge CFDictionaryRef)(sslOptions))){
            connectError = [NSError errorWithDomain:@"MQTT"
                                               code:errSSLInternal
                                           userInfo:@{NSLocalizedDescriptionKey : @"Fail to init ssl input stream!"}];
        }
        if(!CFWriteStreamSetProperty(writeStream, kCFStreamPropertySSLSettings, (__bridge CFDictionaryRef)(sslOptions))){
            connectError = [NSError errorWithDomain:@"MQTT"
                                               code:errSSLInternal
                                           userInfo:@{NSLocalizedDescriptionKey : @"Fail to init ssl output stream!"}];
        }
    }
    
    if(!connectError){
        self.encoder.delegate = nil;
        self.encoder = [[MQTTCFSocketEncoder alloc] init];
        self.encoder.stream = CFBridgingRelease(writeStream);
        self.encoder.runLoop = self.runLoop;
        self.encoder.runLoopMode = self.runLoopMode;
        self.encoder.delegate = self;
        if (self.voip) {
            [self.encoder.stream setProperty:NSStreamNetworkServiceTypeVoIP forKey:NSStreamNetworkServiceType];
        }
        [self.encoder open];
        
        self.decoder.delegate = nil;
        self.decoder = [[MQTTCFSocketDecoder alloc] init];
        self.decoder.stream =  CFBridgingRelease(readStream);
        self.decoder.runLoop = self.runLoop;
        self.decoder.runLoopMode = self.runLoopMode;
        self.decoder.delegate = self;
        if (self.voip) {
            [self.decoder.stream setProperty:NSStreamNetworkServiceTypeVoIP forKey:NSStreamNetworkServiceType];
        }
        [self.decoder open];
        
    } else {
        [self close];
    }
}
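1. Set the MQTTTransportState to MQTTTransportOpening
2. Create a read stream ref and a write stream ref, plus an NSError variable
3. If TLS is enabled, apply the SSL settings to both streams (if either call fails, the NSError declared above is populated)
4. If there is an error, close the transport; otherwise continue
5. Create two objects, an encoder and a decoder, for encoding and decoding
6. Call open on the encoder and then on the decoder, and wait for messages to process

The overall flow is the same as in MQTTSSLSecurityPolicyTransport.
Both can use TLS. The difference is that the security-policy variant turns off the system's certificate chain validation and verifies the server against its securityPolicy in its own encoder and decoder, which is why the two transports use different encoder and decoder classes.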



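At this point the connection part is complete. But everything so far was the left side of the diagram; now let's see what is different on the right side. The right side is the path you enter through the MQTTSessionManager class.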


After the connect.... method has been called from outside:

    if (shouldReconnect) {
        DDLogVerbose(@"[MQTTSessionManager] reconnecting");
        [self disconnect];
        [self reconnect];
    } else {
        DDLogVerbose(@"[MQTTSessionManager] connecting");
        [self connectToInternal];
    }
If a reconnect is needed, disconnect and reconnect; otherwise nothing is connected yet, so connect directly.



This then calls

- (void)connectToInternal {
    if (self.session && self.state == MQTTSessionManagerStateStarting) {
        [self updateState:MQTTSessionManagerStateConnecting];
        [self.session connectToHost:self.host
                               port:self.port
                           usingSSL:self.tls];
    }
}
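If a session exists and the manager is still in the starting state (that is, no connection attempt is in progress yet), mark the state as connecting and start the connection.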
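
The effect of this guard is that a second call while a connection attempt is already under way does nothing. A tiny plain-C sketch of that behavior follows; the struct, the three state names and the connect_calls counter are hypothetical and only model the check shown above.

```c
#include <stdbool.h>

/* Hypothetical subset of the manager states. */
enum manager_state { MANAGER_STARTING, MANAGER_CONNECTING, MANAGER_CONNECTED };

struct manager {
    bool has_session;
    enum manager_state state;
    int connect_calls;  /* how often the session was asked to connect */
};

/* Starts a connection only from the starting state.
   Returns true if a connection attempt was actually started. */
bool connect_to_internal(struct manager *m) {
    if (m->has_session && m->state == MANAGER_STARTING) {
        m->state = MANAGER_CONNECTING;
        m->connect_calls++;
        return true;
    }
    return false;
}
```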



Call the connect method

- (void)connectToHost:(NSString*)host port:(UInt32)port usingSSL:(BOOL)usingSSL {
    [self connectToHost:host port:port usingSSL:usingSSL connectHandler:nil];
}

- (void)connectToHost:(NSString *)host
                 port:(UInt32)port
             usingSSL:(BOOL)usingSSL
       connectHandler:(MQTTConnectHandler)connectHandler {
    DDLogVerbose(@"MQTTSessionLegacy connectToHost:%@ port:%d usingSSL:%d connectHandler:%p",
                 host, (unsigned int)port, usingSSL, connectHandler);
    
    
    if (self.transportType == MQTTTransportTypeWebSocket) {
        MQTTWebsocketTransport *transport = [[MQTTWebsocketTransport alloc] init];
        transport.host = host;
        transport.port = port;
        transport.tls = usingSSL;
        transport.runLoopMode = self.runLoopMode;
        transport.runLoop = self.runLoop;
        self.transport = transport;
    }else if (self.securityPolicy) {
        MQTTSSLSecurityPolicyTransport *transport = [[MQTTSSLSecurityPolicyTransport alloc] init];
        transport.host = host;
        transport.port = port;
        transport.tls = usingSSL;
        transport.securityPolicy = self.securityPolicy;
        transport.certificates = self.certificates;
        transport.voip = self.voip;
        transport.runLoop = self.runLoop;
        transport.runLoopMode = self.runLoopMode;
        self.transport = transport;
        
    } else {
        MQTTCFSocketTransport *transport = [[MQTTCFSocketTransport alloc] init];
        transport.host = host;
        transport.port = port;
        transport.tls = usingSSL;
        transport.certificates = self.certificates;
        transport.voip = self.voip;
        transport.runLoop = self.runLoop;
        transport.runLoopMode = self.runLoopMode;
        self.transport = transport;
    }
    
    [self connectWithConnectHandler:connectHandler];
}

1. Check the connection type to decide which transport is needed, then create it and set its properties
2. Start connecting, passing the callback along
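
The order of the branches in step 1 matters: WebSocket is checked first, then the security policy, and the plain CFSocket transport is the fallback. A minimal plain-C sketch of that precedence, with hypothetical names:

```c
#include <stdbool.h>

enum transport_kind {
    TRANSPORT_WEBSOCKET,
    TRANSPORT_SSL_SECURITY_POLICY,
    TRANSPORT_CFSOCKET
};

/* Mirrors the if / else-if / else chain in the listing above. */
enum transport_kind choose_transport(bool wants_websocket,
                                     bool has_security_policy) {
    if (wants_websocket) return TRANSPORT_WEBSOCKET;
    if (has_security_policy) return TRANSPORT_SSL_SECURITY_POLICY;
    return TRANSPORT_CFSOCKET;
}
```

One consequence visible in the listing: when the transport type is WebSocket, that branch wins even if a security policy is set, and the securityPolicy and certificates are not handed to the WebSocket transport there.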

Call the connectWithConnectHandler: method

- (void)connectWithConnectHandler:(MQTTConnectHandler)connectHandler {
    DDLogVerbose(@"[MQTTSession] connectWithConnectHandler:%p", connectHandler);
    self.connectHandler = connectHandler;
    [self connect];
}
1. Store the callback
2. Start connecting

The connect call here takes us back to the connect method from the left side of the diagram.
