IOS源碼解析:GCDAsySocket

原創(chuàng):知識進階型文章
創(chuàng)作不易,請珍惜,之后會持續(xù)更新,不斷完善
個人比較喜歡做筆記和寫總結(jié),畢竟好記性不如爛筆頭哈哈,這些文章記錄了我的IOS成長歷程,希望能與大家一起進步
溫馨提示:由于簡書不支持目錄跳轉(zhuǎn),大家可通過command + F 輸入目錄標(biāo)題后迅速尋找到你所需要的內(nèi)容

目錄

  • 一、GCDAsyncSocket的成員變量
  • 二、創(chuàng)建socket
    • 1、提供給外界調(diào)用創(chuàng)建socket的接口方法
    • 2、層級調(diào)用
    • 3、最終調(diào)用的初始化方法
    • 4、在懶加載方法中進行一系列配置
  • 三、連接socket
    • 1、連接socket方法的具體實現(xiàn)
    • 2、包裹在自動釋放池中的GCDBlock中的具體內(nèi)容
    • 3、創(chuàng)建全局隊列并異步執(zhí)行代碼塊中的內(nèi)容
    • 4、實現(xiàn)在連接之前的接口檢查方法
    • 5、根據(jù)interface 得到IPV4和IPV6地址
  • 四、連接socket最終調(diào)用的方法
    • 1、lookup方法的內(nèi)部實現(xiàn)
    • 2、connectWithAddress4方法的內(nèi)部實現(xiàn)
    • 3、創(chuàng)建Socket
    • 4、連接Socket的究極方法
    • 5、連接成功后的回調(diào):設(shè)置一些連接成功的狀態(tài)
    • 6、注冊Stream的回調(diào)和讀stream的回調(diào)
    • 7、stream與runloop
  • 五、連接到服務(wù)器后開始讀取數(shù)據(jù)
    • 1、用偏移量 maxLength 讀取數(shù)據(jù)
    • 2、讓讀任務(wù)離隊,開始執(zhí)行這條讀任務(wù)
    • 3、可能開啟TLS
    • 4、讀取數(shù)據(jù)
  • Demo
  • 參考文獻

一、GCDAsyncSocket的成員變量

@implementation GCDAsyncSocket
{
    ...
}
標(biāo)識當(dāng)前socket的狀態(tài)
uint32_t flags;

enum GCDAsyncSocketFlags
{
    kSocketStarted                 = 1 <<  0,  // If set, socket has been started (accepting/connecting)
    kConnected        
        ...
}
ipv4和ipv6的配置
uint16_t config;

enum GCDAsyncSocketConfig
{
    kIPv4Disabled              = 1 << 0,  // If set, IPv4 is disabled
    kIPv6Disabled              = 1 << 1,  // If set, IPv6 is disabled
    kPreferIPv6                = 1 << 2,  // If set, IPv6 is preferred over IPv4
    kAllowHalfDuplexConnection = 1 << 3,  // If set, the socket will stay open even if the read stream closes
};
GCDAsyncSocketDelegate屬性
__weak id<GCDAsyncSocketDelegate> delegate;// 當(dāng)連接socket、讀寫數(shù)據(jù)、關(guān)閉socket的時候進行回調(diào)代理屬性
dispatch_queue_t delegateQueue;// 代理回調(diào)的queue
三種Socket類型對應(yīng)三種地址
int socket4FD;// 本地IPV4Socket
int socket6FD;// 本地IPV6Socket
int socketUN;// unix域的套接字,用于進程之間通訊
NSURL *socketUrl;// 服務(wù)端url
int stateIndex;// 狀態(tài)Index
NSData * connectInterface4;// 本機的IPV4地址
NSData * connectInterface6;// 本機的IPV6地址
NSData * connectInterfaceUN;// 本機unix域地址
這個類對Socket的操作都在這個串行queue中
dispatch_queue_t socketQueue;
接收源數(shù)據(jù)的回調(diào)事件
dispatch_source_t accept4Source;
dispatch_source_t accept6Source;
dispatch_source_t acceptUNSource;
鏈接、讀寫timer用的是GCD定時器
dispatch_source_t connectTimer;
dispatch_source_t readSource;
dispatch_source_t writeSource;
dispatch_source_t readTimer;
dispatch_source_t writeTimer;

connectTimer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, socketQueue);
dispatch_source_set_event_handler(connectTimer, ^{ @autoreleasepool {
    [strongSelf doConnectTimeout];
}});
讀寫數(shù)據(jù)包數(shù)組:類似queue,先讀進去的先拿出來(FIFO)最大限制為5個包
NSMutableArray *readQueue;
NSMutableArray *writeQueue;
當(dāng)前正在讀寫數(shù)據(jù)包
GCDAsyncReadPacket *currentRead;
GCDAsyncWritePacket *currentWrite;
當(dāng)前socket未獲取完的數(shù)據(jù)大小
unsigned long socketFDBytesAvailable;
全局公用的提前緩沖區(qū):讀取的數(shù)據(jù)會填充到Buffer中,填充滿了后就會取出Buffer中的數(shù)據(jù)進行代理回調(diào)
GCDAsyncSocketPreBuffer *preBuffer;
讀寫數(shù)據(jù)流
CFStreamClientContext streamContext;
CFReadStreamRef readStream;// 讀的數(shù)據(jù)流
CFWriteStreamRef writeStream;// 寫的數(shù)據(jù)流
SSL認證
SSLContextRef sslContext;// SSL上下文,用來做SSL認證
GCDAsyncSocketPreBuffer *sslPreBuffer;// 全局公用的SSL的提前緩沖區(qū)
size_t sslWriteCachedLength;
OSStatus sslErrCode;// 記錄SSL讀取數(shù)據(jù)錯誤
OSStatus lastSSLHandshakeError;// 記錄SSL握手的錯誤
socket隊列的標(biāo)識key
void *IsOnSocketQueueOrTargetQueueKey;

// 將socketQueue作上標(biāo)記方便取出
dispatch_queue_set_specific(socketQueue, IsOnSocketQueueOrTargetQueueKey, nonNullUnusedPointer, NULL);

// 當(dāng)前線程根據(jù)這個標(biāo)識判斷是不是在這個隊列
if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))
{
    return delegate;
}

if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))
{
    return delegateQueue;
}
連接備選服務(wù)端地址的延時 (另一個IPV4或IPV6)
NSTimeInterval alternateAddressDelay;

alternateAddressDelay = 0.3;// 默認0.3秒

// 用socket和address去連接服務(wù)器
[self connectSocket:socketFD address:address stateIndex:aStateIndex];

// 如果有備選地址
if (alternateAddress)
{
    // 延遲去連接備選的地址
    dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(alternateAddressDelay * NSEC_PER_SEC)), socketQueue, ^{
        [self connectSocket:alternateSocketFD address:alternateAddress stateIndex:aStateIndex];
    });
}

二、創(chuàng)建socket

self.socket = [[GCDAsyncSocket alloc] initWithDelegate:self delegateQueue:dispatch_get_global_queue(0, 0)];

1、提供給外界調(diào)用創(chuàng)建socket的接口方法

- (instancetype)init;
- (instancetype)initWithSocketQueue:(nullable dispatch_queue_t)sq;
- (instancetype)initWithDelegate:(nullable id<GCDAsyncSocketDelegate>)aDelegate delegateQueue:(nullable dispatch_queue_t)dq;
- (instancetype)initWithDelegate:(nullable id<GCDAsyncSocketDelegate>)aDelegate delegateQueue:(nullable dispatch_queue_t)dq socketQueue:(nullable dispatch_queue_t)sq;

2、層級調(diào)用

- (id)init
{
    return [self initWithDelegate:nil delegateQueue:NULL socketQueue:NULL];
}
- (id)initWithSocketQueue:(dispatch_queue_t)sq
{
    return [self initWithDelegate:nil delegateQueue:NULL socketQueue:sq];
}
- (id)initWithDelegate:(id)aDelegate delegateQueue:(dispatch_queue_t)dq
{
    return [self initWithDelegate:aDelegate delegateQueue:dq socketQueue:NULL];
}

3、最終調(diào)用的初始化方法

- (id)initWithDelegate:(id)aDelegate delegateQueue:(dispatch_queue_t)dq socketQueue:(dispatch_queue_t)sq
{
    if((self = [super init]))
    {
            ...
    }
    return self;
}
? 創(chuàng)建socket,屬性值先都置為 -1
socket4FD = SOCKET_NULL;// 本機的ipv4
socket6FD = SOCKET_NULL;
socketUN = SOCKET_NULL;
socketUrl = nil;
stateIndex = 0;// 狀態(tài)Index
? 如果scoketQueue存在卻是global的則報錯,沒有的話創(chuàng)建一個scoketQueue(名字為:GCDAsyncSocket,NULL表示是串行隊列)
if (sq)
{
    // 斷言必須要一個非并行queue
    NSAssert(sq != dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
             @"The given socketQueue parameter must not be a concurrent queue.");
    
    socketQueue = sq;
}
else
{
    socketQueue = dispatch_queue_create([GCDAsyncSocketQueueName UTF8String], NULL);
}
? 給當(dāng)前隊里加一個標(biāo)識
dispatch_queue_set_specific(socketQueue, IsOnSocketQueueOrTargetQueueKey, nonNullUnusedPointer, NULL);
? 初始化讀寫數(shù)組,限制大小為5
readQueue = [[NSMutableArray alloc] initWithCapacity:5];
currentRead = nil;

writeQueue = [[NSMutableArray alloc] initWithCapacity:5];
currentWrite = nil;
? 設(shè)置緩沖區(qū)大小為4kb
preBuffer = [[GCDAsyncSocketPreBuffer alloc] initWithCapacity:(1024 * 4)];
? 設(shè)置連接備選服務(wù)端地址的延時為0.3秒
alternateAddressDelay = 0.3;

4、在懶加載方法中進行一系列配置

delegate
- (id)delegate
{
    if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))// 當(dāng)前隊列已經(jīng)是同步串行
    {
        return delegate;
    }
    else// 否則讓其處于同步串行
    {
        __block id result;
        
        // 同步串行隊列:保證socket順序安全,因為需要先建立鏈接才能讀寫數(shù)據(jù),存在順序性
        dispatch_sync(socketQueue, ^{
            result = self->delegate;
        });
        
        return result;
    }
}
delegateQueue
- (dispatch_queue_t)delegateQueue
{
    if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))// 當(dāng)前代理隊列已經(jīng)是同步串行
    {
        return delegateQueue;
    }
    else// 否則讓其處于同步串行
    {
        __block dispatch_queue_t result;
        
        // 保證代理數(shù)據(jù)回調(diào)順序安全
        dispatch_sync(socketQueue, ^{
            result = self->delegateQueue;
        });
        
        return result;
    }
}

三、連接socket

[self.socket connectToHost:@"127.0.0.1" onPort:8090 withTimeout:-1 error:&error];

1、連接socket方法的具體實現(xiàn)

- (BOOL)connectToHost:(NSString *)inHost
               onPort:(uint16_t)port
         viaInterface:(NSString *)inInterface
          withTimeout:(NSTimeInterval)timeout
                error:(NSError **)errPtr
{
    ...
}
? 拿到host和interface,通過copy防止外界傳入值被修改

很可能給我們的服務(wù)端的參數(shù)是一個可變字符串,所以我們需要copy,在Block里同步執(zhí)行,這樣就不需要擔(dān)心它被改變。

NSString *host = [inHost copy];
NSString *interface = [inInterface copy];
? 包裹在自動釋放池中的GCDBlock
  • 大量臨時變量(connect : 重連)
  • 自定義線程管理 (NSOperation)
dispatch_block_t block = ^{ @autoreleasepool {
    ...
}
? 在socketQueue中執(zhí)行這個Block,否則同步的調(diào)起這個queue去執(zhí)行
if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))
    block();// 在socketQueue中執(zhí)行這個Block
else
    dispatch_sync(socketQueue, block);// 否則同步的調(diào)起這個queue去執(zhí)行
? 如果發(fā)生了錯誤,則將錯誤進行賦值
__block NSError *preConnectErr = nil;// 錯誤信息
...
f (errPtr) *errPtr = preConnectErr;
? 把連接是否成功的result返回
__block BOOL result = NO;// 返回結(jié)果
...
return result;

2、包裹在自動釋放池中的GCDBlock中的具體內(nèi)容

dispatch_block_t block = ^{ @autoreleasepool {
    ...
}
? 對host服務(wù)器地址有效性進行校驗

return_from_block這個宏其實就是return。host長度為0本來應(yīng)該直接返回 但是目前在GCDBlock中具有異步性即編譯器還沒有識別到return的時候就直接跳過去了順序執(zhí)行下面內(nèi)容,所以采用在預(yù)編譯期執(zhí)行的宏定義這種方式防止此類錯誤的發(fā)生。其實這種情況很少發(fā)生,因為系統(tǒng)對return已經(jīng)進行過優(yōu)化防止此類情況發(fā)生。

#define return_from_block  return

if ([host length] == 0)
{
    NSString *msg = @"Invalid host parameter (nil or \"\"). Should be a domain name or IP address string.";
    preConnectErr = [self badParamError:msg];
    
    return_from_block;
}
? 在連接之前的接口檢查,一般我們傳nil
// interface表示本機的IP端口
if (![self preConnectWithInterface:interface error:&preConnectErr])
{
    return_from_block;
}
? flags做或等運算:將flags標(biāo)識為開始Socket連接
self->flags |= kSocketStarted;
? 創(chuàng)建全局隊列并異步執(zhí)行
dispatch_queue_t globalConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
dispatch_async(...);
? 啟連接超時并返回YES
[self startConnectTimeout:timeout];

result = YES;

3、創(chuàng)建全局隊列并異步執(zhí)行代碼塊中的內(nèi)容

dispatch_async(globalConcurrentQueue, ^{ @autoreleasepool {
}});
? 獲取server地址數(shù)組(包含IPV4 IPV6的地址)
NSError *lookupErr = nil;// 查找錯誤
// lookupHost方法用來獲取服務(wù)器地址,內(nèi)部實現(xiàn)和獲取本地地址大同小異,不再贅述
NSMutableArray *addresses = [[self class] lookupHost:hostCpy port:port error:&lookupErr];
? 如果有錯
if (lookupErr)// 如果有錯
{
    // 使用scocketQueue
    dispatch_async(strongSelf->socketQueue, ^{ @autoreleasepool {
        // 一些錯誤處理,比如清空一些數(shù)據(jù)等等
        [strongSelf lookup:aStateIndex didFail:lookupErr];
    }});
}
? 沒有發(fā)生錯誤的話則遍歷地址數(shù)組得到IPV4和IPV6地址
NSData *address4 = nil;
NSData *address6 = nil;

for (NSData *address in addresses)// 遍歷地址數(shù)組
{
    // 判斷是否address4為空且address為IPV4
    if (!address4 && [[self class] isIPv4Address:address])
    {
        address4 = address;
    }
    // 判斷是否address6為空且address為IPV6
    else if (!address6 && [[self class] isIPv6Address:address])
    {
        address6 = address;
    }
}
? 在socketQueue同步隊列中異步去發(fā)起連接以保證順序執(zhí)行且不堵塞線程
dispatch_async(strongSelf->socketQueue, ^{ @autoreleasepool {
    [strongSelf lookup:aStateIndex didSucceedWithAddress4:address4 address6:address6];
}});

4、實現(xiàn)在連接之前的接口檢查方法

- (BOOL)preConnectWithInterface:(NSString *)interface error:(NSError **)errPtr
{
    ...
    return YES
}
? 先斷言,如果當(dāng)前的queue不是初始化的quueue,直接報錯
NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
? 進行一系列有效性 判斷

&表示位與運算,因為枚舉是用左位移<<運算定義的,所以可以用&來判斷config包不包含某個枚舉。因為一個值可能包含好幾個枚舉值,所以這時候不能用==來判斷,只能用&來判斷。

if (delegate == nil)// 無代理
if (delegateQueue == NULL) // 沒有代理queue
if (![self isDisconnected])// 當(dāng)前不是非連接狀態(tài)(即已經(jīng)連接了所以沒必要再進行連接)

// 是否支持IPV4 IPV6
BOOL isIPv4Disabled = (config & kIPv4Disabled) ? YES : NO;
BOOL isIPv6Disabled = (config & kIPv6Disabled) ? YES : NO;

if (isIPv4Disabled && isIPv6Disabled) // 是否都不支持
{
    if (errPtr)
    {
        NSString *msg = @"Both IPv4 and IPv6 have been disabled. Must enable at least one protocol first.";
        *errPtr = [self badConfigError:msg];
    }
    return NO;
}
? 如果有interface本機地址(一般傳入的是nil)
if (interface)
{
    NSMutableData *interface4 = nil;
    NSMutableData *interface6 = nil;
    
    // 得到本機的IPV4和IPV6地址
    [self getInterfaceAddress4:&interface4 address6:&interface6 fromDescription:interface port:0];
    
    // 如果兩者都為nil
    if ((interface4 == nil) && (interface6 == nil))
    if (isIPv4Disabled && (interface6 == nil))
    if (isIPv6Disabled && (interface4 == nil))
    
    // 如果都沒問題則賦值到成員變量中
    connectInterface4 = interface4;
    connectInterface6 = interface6;
}

5、根據(jù)interface 得到IPV4和IPV6地址

- (void)getInterfaceAddress4:(NSMutableData **)interfaceAddr4Ptr
                    address6:(NSMutableData **)interfaceAddr6Ptr
             fromDescription:(NSString *)interfaceDescription
                        port:(uint16_t)port
{
    NSMutableData *addr4 = nil;
    NSMutableData *addr6 = nil;
    NSString *interface = nil;
    ...
}
? 先用:分割(比如https://www.127.0.0.1
NSArray *components = [interfaceDescription componentsSeparatedByString:@":"];
if ([components count] > 0)
{
    NSString *temp = [components objectAtIndex:0];
    if ([temp length] > 0)
    {
        interface = temp;
    }
}
? 拿到port端口
if ([components count] > 1 && port == 0)
{
    // 將一個字符串根據(jù)base參數(shù)轉(zhuǎn)成長整型,如base值為10則采用10進制
    NSString *temp = [components objectAtIndex:1];
    long portL = strtol([temp UTF8String], NULL, 10);
    
    // UINT16_MAX:65535最大端口號
    if (portL > 0 && portL <= UINT16_MAX)
    {
        port = (uint16_t)portL;
    }
}
? interface為空則自己創(chuàng)建一個interface
if (interface == nil)
{
    // 生成一個地址結(jié)構(gòu)體
    struct sockaddr_in sockaddr4;
    // memset的作用是在一段內(nèi)存塊中填充某個給定的值,它是對較大的結(jié)構(gòu)體或數(shù)組進行清零操作的一種最快方法
    memset(&sockaddr4, 0, sizeof(sockaddr4));
    
    sockaddr4.sin_len         = sizeof(sockaddr4);// 結(jié)構(gòu)體長度
    sockaddr4.sin_family      = AF_INET;
    sockaddr4.sin_port        = htons(port);// 端口號
    sockaddr4.sin_addr.s_addr = htonl(INADDR_ANY);// 0.0.0.0表示不確定地址或者任意地址
    
    // sockaddr6同上
    ...
    
    // 把這兩個結(jié)構(gòu)體轉(zhuǎn)成data
    addr4 = [NSMutableData dataWithBytes:&sockaddr4 length:sizeof(sockaddr4)];
    addr6 = [NSMutableData dataWithBytes:&sockaddr6 length:sizeof(sockaddr6)];
}
? 如果是localhost、loopback(回環(huán)地址)就賦值為127.0.0.1
else if ([interface isEqualToString:@"localhost"] || [interface isEqualToString:@"loopback"])
{
    sockaddr4.sin_addr.s_addr = htonl(INADDR_LOOPBACK);// 127.0.0.1
    // sockaddr6同上
    ...
    addr4 = [NSMutableData dataWithBytes:&sockaddr4 length:sizeof(sockaddr4)];
    addr6 = [NSMutableData dataWithBytes:&sockaddr6 length:sizeof(sockaddr6)];
}
? 非localhost、loopback則去獲取本機IP,看是否和傳進來Interface是同名或者同IP,相同才給賦端口號,把數(shù)據(jù)封裝進Data,否則為nil
else
{
    const char *iface = [interface UTF8String];// 轉(zhuǎn)成cString
    
    struct ifaddrs *addrs;// 定義結(jié)構(gòu)體指針,這個指針是本地IP
    const struct ifaddrs *cursor;
    
    // 獲取到本機IP,為0說明成功了
    if ((getifaddrs(&addrs) == 0))
    {
        cursor = addrs;// 賦值
        while (cursor != NULL)// 如果IP不為空,則循環(huán)鏈表去設(shè)置
        {
            // 如果 addr4 IPV4地址為空,而且地址類型為IPV4
            if ((addr4 == nil) && (cursor->ifa_addr->sa_family == AF_INET))
            {
                ...
            }
            else if ((addr6 == nil) && (cursor->ifa_addr->sa_family == AF_INET6))// IPV6同上
            {
                ...
            }
            // 指向鏈表下一個addr
            cursor = cursor->ifa_next;
        }
    }
}
? 如果 addr4 IPV4地址為空,而且地址類型為IPV4
// IPv4
struct sockaddr_in nativeAddr4;
// memcpy內(nèi)存copy函數(shù),把src開始到size的字節(jié)數(shù)copy到dest中
memcpy(&nativeAddr4, cursor->ifa_addr, sizeof(nativeAddr4));

// 比較兩個字符串是否相同:本機的IP名和接口interface是否相同
if (strcmp(cursor->ifa_name, iface) == 0)
{
    // 相同則賦值 port
    nativeAddr4.sin_port = htons(port);
    // 用data封號IPV4地址
    addr4 = [NSMutableData dataWithBytes:&nativeAddr4 length:sizeof(nativeAddr4)];
}
else// 本機IP名和interface不相同
{
    // 聲明一個IP 16位的數(shù)組
    char ip[INET_ADDRSTRLEN];
    
    // 這里是轉(zhuǎn)成了10進制(因為獲取到的是二進制IP)
    const char *conversion = inet_ntop(AF_INET, &nativeAddr4.sin_addr, ip, sizeof(ip));
    
    // 如果conversion不為空說明轉(zhuǎn)換成功,比較轉(zhuǎn)換后的IP和interface是否相同
    if ((conversion != NULL) && (strcmp(ip, iface) == 0))
    {
        // 相同則賦值 port
        nativeAddr4.sin_port = htons(port);
        
        addr4 = [NSMutableData dataWithBytes:&nativeAddr4 length:sizeof(nativeAddr4)];
    }
}
? 如果這兩個二級指針存在,則取成一級指針,把addr4賦值給它
if (interfaceAddr4Ptr) *interfaceAddr4Ptr = addr4;
if (interfaceAddr6Ptr) *interfaceAddr6Ptr = addr6;

四、連接socket最終調(diào)用的方法

dispatch_async(strongSelf->socketQueue, ^{ @autoreleasepool {
    [strongSelf lookup:aStateIndex didSucceedWithAddress4:address4 address6:address6];
}});

1、lookup方法的內(nèi)部實現(xiàn)

- (void)lookup:(int)aStateIndex didSucceedWithAddress4:(NSData *)address4 address6:(NSData *)address6
{
    NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
    // 至少有一個server服務(wù)器地址
    NSAssert(address4 || address6, @"Expected at least one valid address");
    
    // 如果狀態(tài)不一致,說明斷開連接
    if (aStateIndex != stateIndex)
    {
        LogInfo(@"Ignoring lookupDidSucceed, already disconnected");
        return;
    }
    
    // 有效性檢查
    if (isIPv4Disabled && (address6 == nil))
    if (isIPv6Disabled && (address4 == nil))

    // 調(diào)用連接方法,如果失敗,則返回錯誤信息
    NSError *err = nil;
    if (![self connectWithAddress4:address4 address6:address6 error:&err])
    {
        [self closeWithError:err];
    }
}

2、connectWithAddress4方法的內(nèi)部實現(xiàn)

- (BOOL)connectWithAddress4:(NSData *)address4 address6:(NSData *)address6 error:(NSError **)errPtr
{
    ...
    return YES;
}
? 決定socket類型
// 判斷是否傾向于IPV6
BOOL preferIPv6 = (config & kPreferIPv6) ? YES : NO;

// 如果有IPV4地址,創(chuàng)建IPV4 Socket
if (address4)
{
    socket4FD = [self createSocket:AF_INET connectInterface:connectInterface4 errPtr:errPtr];
}

// 如果有IPV6地址,創(chuàng)建IPV6 Socket
if (address6)
{
    socket6FD = [self createSocket:AF_INET6 connectInterface:connectInterface6 errPtr:errPtr];
}

// 如果都為空,直接返回
if (socket4FD == SOCKET_NULL && socket6FD == SOCKET_NULL)
{
    return NO;
}
? 主選與備選
int socketFD, alternateSocketFD;// 主選socketFD,備選alternateSocketFD
NSData *address, *alternateAddress;// 主選地址和備選地址

// 主選IPV6
if ((preferIPv6 && socket6FD != SOCKET_NULL) || socket4FD == SOCKET_NULL)
{
    socketFD = socket6FD;
    alternateSocketFD = socket4FD;
    address = address6;
    alternateAddress = address4;
}
// 主選IPV4
else
{
    socketFD = socket4FD;
    alternateSocketFD = socket6FD;
    address = address4;
    alternateAddress = address6;
}
// 拿到當(dāng)前狀態(tài)
int aStateIndex = stateIndex;
? 用socket和address去連接服務(wù)器
[self connectSocket:socketFD address:address stateIndex:aStateIndex];
? 如果有備選地址則延遲去連接備選的地址
if (alternateAddress)
{
    // 延遲去連接備選的地址
    dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(alternateAddressDelay * NSEC_PER_SEC)), socketQueue, ^{
        [self connectSocket:alternateSocketFD address:alternateAddress stateIndex:aStateIndex];
    });
}

3、創(chuàng)建Socket

- (int)createSocket:(int)family connectInterface:(NSData *)connectInterface errPtr:(NSError **)errPtr
{
    // 用SOCK_STREAM TCP流創(chuàng)建socket
    int socketFD = socket(family, SOCK_STREAM, 0);
    
    // 如果創(chuàng)建失敗
    if (socketFD == SOCKET_NULL)
    {
        if (errPtr)
            *errPtr = [self errorWithErrno:errno reason:@"Error in socket() function"];
        
        return socketFD;
    }
    
    // 和connectInterface綁定
    if (![self bindSocket:socketFD toInterface:connectInterface error:errPtr])
    {
        // 綁定失敗,直接關(guān)閉返回
        [self closeSocket:socketFD];
        
        return SOCKET_NULL;
    }

    return socketFD;
}

4、連接Socket的究極方法

- (void)connectSocket:(int)socketFD address:(NSData *)address stateIndex:(int)aStateIndex
{
    ...
}
? 發(fā)現(xiàn)已連接則關(guān)閉連接直接返回
if (self.isConnected)
{
    [self closeSocket:socketFD];
    return;
}
? 在全局Queue中開啟新線程進行連接
dispatch_queue_t globalConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
dispatch_async(globalConcurrentQueue, ^{
    ...
});
? 調(diào)用connect方法,該函數(shù)會阻塞線程,所以要通過異步方式執(zhí)行并開啟新線程

客戶端向特定網(wǎng)絡(luò)地址的服務(wù)器發(fā)送連接請求,連接成功返回0,失敗返回 -1。

int result = connect(socketFD, (const struct sockaddr *)[address bytes], (socklen_t)[address length]);
? 連接成功
// 在socketQueue中開辟線程,即在同步隊列中異步執(zhí)行
dispatch_async(strongSelf->socketQueue, ^{ @autoreleasepool {
    if (result == 0)// 連接成功
    {
        // 關(guān)閉掉另一個沒用的socket
        [self closeUnusedSocket:socketFD];
        
        // 調(diào)用didConnect:生成stream并改變狀態(tài)
        [strongSelf didConnect:aStateIndex];
    }
    else// 連接失敗
    {
        ...
    }
}});
? 連接失敗
// 關(guān)閉當(dāng)前socket
[strongSelf closeSocket:socketFD];

// 返回連接錯誤的error
if (strongSelf.socket4FD == SOCKET_NULL && strongSelf.socket6FD == SOCKET_NULL)
{
    NSError *error = [strongSelf errorWithErrno:err reason:@"Error in connect() function"];
    [strongSelf didNotConnect:aStateIndex error:error];
}

5、連接成功后的回調(diào):設(shè)置一些連接成功的狀態(tài)

- (void)didConnect:(int)aStateIndex
{
    ...
}
? 調(diào)用回調(diào)前的準(zhǔn)備工作
// 狀態(tài)不同
if (aStateIndex != stateIndex)

// 將kConnected合并到當(dāng)前flag中
flags |= kConnected;

// 停止超時連接
[self endConnectTimeout];

// 創(chuàng)建個Block來初始化Stream
dispatch_block_t SetupStreamsPart1 = ^{
    // 創(chuàng)建讀寫stream失敗,則關(guān)閉并報對應(yīng)錯誤
    if (![self createReadAndWriteStream])
    
    // 參數(shù)NO表示有可讀bytes的時候不會調(diào)用回調(diào)函數(shù)
    if (![self registerForStreamCallbacksIncludingReadWrite:NO])
};

// 創(chuàng)建個Block來設(shè)置stream
dispatch_block_t SetupStreamsPart2 = ^{
    if (![self addStreamsToRunLoop])// 如果加到runloop上失敗
    if (![self openStreams])// 打開讀寫stream
};

// 拿到server端的host和port
NSString *host = [self connectedHost];
uint16_t port = [self connectedPort];
__strong id<GCDAsyncSocketDelegate> theDelegate = delegate;// 拿到代理
? 調(diào)用回調(diào)函數(shù)
// 代理隊列和Host不為nil且響應(yīng)didConnectToHost代理方法
if (delegateQueue && host != nil && [theDelegate respondsToSelector:@selector(socket:didConnectToHost:port:)])
{
    // 調(diào)用初始化stream
    SetupStreamsPart1();
    
    dispatch_async(delegateQueue, ^{ @autoreleasepool {
        // 到代理隊列調(diào)用連接成功的代理方法
        [theDelegate socket:self didConnectToHost:host port:port];
        
        dispatch_async(self->socketQueue, ^{ @autoreleasepool {
            // 然后回到socketQueue中去執(zhí)行設(shè)置stream
            SetupStreamsPart2();
        }});
    }});
}
? 初始化讀寫source并開始下一個任務(wù)
// 初始化讀寫source
[self setupReadAndWriteSourcesForNewlyConnectedSocket:socketFD];

// 開始下一個任務(wù)
[self maybeDequeueRead];
[self maybeDequeueWrite];

6、注冊Stream的回調(diào)和讀stream的回調(diào)

a、注冊Stream的回調(diào)
- (BOOL)registerForStreamCallbacksIncludingReadWrite:(BOOL)includeReadWrite
{
    // 設(shè)置一個CF的flag:一種是錯誤發(fā)生的時候,一種是stream事件結(jié)束
    CFOptionFlags readStreamEvents = kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered;
    if (includeReadWrite)// 如果包含讀寫
        readStreamEvents |= kCFStreamEventHasBytesAvailable;// 仍然有Bytes要讀
    
    // 給讀stream設(shè)置客戶端,會在之前設(shè)置的那些標(biāo)記下回調(diào)函數(shù)CFReadStreamCallback
    if (!CFReadStreamSetClient(readStream, readStreamEvents, &CFReadStreamCallback, &streamContext))// 客戶端stream上下文對象
    {
        return NO;
    }
    
    // 寫的flag同上
}
b、讀stream的回調(diào)
static void CFReadStreamCallback (CFReadStreamRef stream, CFStreamEventType type, void *pInfo)
{
    // 得到觸發(fā)回調(diào)的sokcet
    GCDAsyncSocket *asyncSocket = (__bridge GCDAsyncSocket *)pInfo;
    
    switch(type)
    {
        case kCFStreamEventHasBytesAvailable:// 如果是可讀數(shù)據(jù)的回調(diào)
        {
            // 在socketQueue中調(diào)用
            dispatch_async(asyncSocket->socketQueue, ^{ @autoreleasepool {
                asyncSocket->flags |= kSecureSocketHasBytesAvailable;
                [asyncSocket doReadData];// 去讀取數(shù)據(jù)
            }});
            
            break;
        }
        ...
    }
}

7、客戶端的runloop綁定stream

stream隨著runloop不停轉(zhuǎn)動,只要stream流一發(fā)生改變就能立刻更新數(shù)據(jù)。

- (void)didConnect:(int)aStateIndex
{
    // 創(chuàng)建個Block來設(shè)置stream
    dispatch_block_t SetupStreamsPart2 = ^{
        if (![self addStreamsToRunLoop])// 如果加到runloop上失敗
}
a、把stream添加到runloop上
- (BOOL)addStreamsToRunLoop
{
    // 判斷flag里是否包含kAddedStreamsToRunLoop,沒添加過則添加
    if (!(flags & kAddedStreamsToRunLoop))
    {
        [[self class] startCFStreamThreadIfNeeded];
        
        // 在開啟的線程中去執(zhí)行(阻塞式的)
        dispatch_sync(cfstreamThreadSetupQueue, ^{
            [[self class] performSelector:@selector(scheduleCFStreams:)
                                 onThread:cfstreamThread
                               withObject:self
                            waitUntilDone:YES];
        });
        // 添加標(biāo)識
        flags |= kAddedStreamsToRunLoop;
    }
    
    return YES;
}
b、注冊CFStream
+ (void)scheduleCFStreams:(GCDAsyncSocket *)asyncSocket
{
    // 獲取到runloop
    CFRunLoopRef runLoop = CFRunLoopGetCurrent();
    
    // 如果有readStream
    if (asyncSocket->readStream)
        // 注冊readStream在runloop的kCFRunLoopDefaultMode上
        CFReadStreamScheduleWithRunLoop(asyncSocket->readStream, runLoop, kCFRunLoopDefaultMode);
    // 同上
    if (asyncSocket->writeStream)
        CFWriteStreamScheduleWithRunLoop(asyncSocket->writeStream, runLoop, kCFRunLoopDefaultMode);
}

五、連接到服務(wù)器后開始讀取數(shù)據(jù)

// 已經(jīng)連接到服務(wù)器
- (void)socket:(GCDAsyncSocket *)sock didConnectToHost:(nonnull NSString *)host port:(uint16_t)port
{
    NSLog(@"連接成功,主機:%@,端口:%d",host,port);
    [self.socket readDataWithTimeout:-1 tag:10086];
}

1、用偏移量 maxLength 讀取數(shù)據(jù)

- (void)readDataWithTimeout:(NSTimeInterval)timeout tag:(long)tag
{
    [self readDataWithTimeout:timeout buffer:nil bufferOffset:0 maxLength:0 tag:tag];
}
- (void)readDataWithTimeout:(NSTimeInterval)timeout
                     buffer:(NSMutableData *)buffer
               bufferOffset:(NSUInteger)offset
                        tag:(long)tag
{
    [self readDataWithTimeout:timeout buffer:buffer bufferOffset:offset maxLength:0 tag:tag];
}
- (void)readDataWithTimeout:(NSTimeInterval)timeout
                     buffer:(NSMutableData *)buffer
               bufferOffset:(NSUInteger)offset
                  maxLength:(NSUInteger)length
                        tag:(long)tag
{
    // 初始化packet(讀取的數(shù)據(jù)都是一個個的包)
    GCDAsyncReadPacket *packet = [[GCDAsyncReadPacket alloc] initWithData:buffer
                                  
    // 往讀的隊列添加任務(wù),任務(wù)是包的形式
    [self->readQueue addObject:packet];
                                  
    // 讓讀任務(wù)離隊,開始執(zhí)行這條讀任務(wù)
    [self maybeDequeueRead];
}

2、讓讀任務(wù)離隊,開始執(zhí)行這條讀任務(wù)

- (void)maybeDequeueRead
{
    ...
}
a、如果當(dāng)前讀的包為空,而且flag為已連接
if ((currentRead == nil) && (flags & kConnected))
? 從readQueue中拿到第一個寫入的數(shù)據(jù)
// 如果讀的queue大于0 (里面裝的是我們封裝的GCDAsyncReadPacket數(shù)據(jù)包)
if ([readQueue count] > 0)
{
    // 從readQueue中拿到第一個寫入的數(shù)據(jù)
    currentRead = [readQueue objectAtIndex:0];
    // 移除第一個寫入的數(shù)據(jù)
    [readQueue removeObjectAtIndex:0];
    ...
}
? GCDAsyncSpecialPacket這種類型會做TLS認證
// 我們的數(shù)據(jù)包是否是GCDAsyncSpecialPacket這種類型,這個包里裝了TLS的一些設(shè)置
if ([currentRead isKindOfClass:[GCDAsyncSpecialPacket class]])
{
    // 如果是這種類型的數(shù)據(jù),那么我們就標(biāo)記flag為正在讀取TLS
    flags |= kStartingReadTLS;
    
    // 只有讀寫都開啟了TLS才會做TLS認證
    [self maybeStartTLS];
}
? 不是特殊包類型則直接讀取數(shù)據(jù)
else
{
    // 設(shè)置讀的任務(wù)超時,每次延時的時候還會調(diào)用 [self doReadData];
    [self setupReadTimerWithTimeout:currentRead->timeout];
    
    // 讀取數(shù)據(jù)
    [self doReadData];
}

b、讀的隊列沒有數(shù)據(jù),標(biāo)記flag為讀了沒有數(shù)據(jù)則斷開連接狀態(tài)
else if (flags & kDisconnectAfterReads)
{
    // 如果標(biāo)記為寫然后斷開連接
    if (flags & kDisconnectAfterWrites)
    {
        // 如果寫的隊列為0,而且寫為空
        if (([writeQueue count] == 0) && (currentWrite == nil))
        {
            [self closeWithError:nil];// 斷開連接
        }
    }
    else
    {
        [self closeWithError:nil];// 斷開連接
    }
}

c、如果有安全socket
else if (flags & kSocketSecure)
{
    // 把加密數(shù)據(jù)從進程緩存區(qū)中讀取到prebuffer里
    [self flushSSLBuffers];
    
    // 如果可讀字節(jié)數(shù)為0
    if ([preBuffer availableBytes] == 0)
    {
        // 重新恢復(fù)讀的source。因為每次開始讀數(shù)據(jù)的時候,都會掛起讀的source
        [self resumeReadSource];
    }
}

3、可能開啟TLS

- (void)maybeStartTLS
{
    // 只有讀和寫TLS都開啟
    if ((flags & kStartingReadTLS) && (flags & kStartingWriteTLS))
    {
            ...
    }
}
a、如果是用CFStream的,則安全傳輸為NO
// 是否需要安全傳輸
BOOL useSecureTransport = YES;

// 拿到當(dāng)前讀的數(shù)據(jù)
GCDAsyncSpecialPacket *tlsPacket = (GCDAsyncSpecialPacket *)currentRead;

// 得到設(shè)置字典
NSDictionary *tlsSettings = @{};
if (tlsPacket)
{
    tlsSettings = tlsPacket->tlsSettings;
}

// 拿到Key為CFStreamTLS的value
NSNumber *value = [tlsSettings objectForKey:GCDAsyncSocketUseCFStreamForTLS];
// 如果是用CFStream的,則安全傳輸為NO
if (value && [value boolValue])
    useSecureTransport = NO;
b、如果使用安全通道則開啟TLS
// 如果使用安全通道
if (useSecureTransport)
{
    // 開啟TLS
    [self ssl_startTLS];
}
c、CFStream形式的Tls
else
{
    [self cf_startTLS];
}

4、讀取數(shù)據(jù)

static void CFReadStreamCallback (CFReadStreamRef stream, CFStreamEventType type, void *pInfo)
{
    [asyncSocket doReadData];// 去讀取數(shù)據(jù)
}
- (void)doReadData
{
    ...
}
a、STEP 1 - READ FROM PREBUFFER
// 當(dāng)前總讀的數(shù)據(jù)量
NSUInteger totalBytesReadForCurrentRead = 0;

// 讀取數(shù)據(jù),直接讀到界限
bytesToCopy = [currentRead readLengthForTermWithPreBuffer:preBuffer found:&done];
// 或者讀到指定長度或者數(shù)據(jù)包的長度為止
bytesToCopy = [currentRead readLengthForNonTermWithHint:[preBuffer availableBytes]];

// 從上兩步拿到我們需要讀的長度,去看看有沒有空間去存儲
[currentRead ensureCapacityForAdditionalDataOfLength:bytesToCopy];

// 拿到我們需要追加數(shù)據(jù)的指針位置
uint8_t *buffer = (uint8_t *)[currentRead->buffer mutableBytes] + currentRead->startOffset +
                                                                  currentRead->bytesDone;

// 從prebuffer處復(fù)制過來數(shù)據(jù),bytesToCopy長度
memcpy(buffer, [preBuffer readBuffer], bytesToCopy);

// 從preBuffer移除掉已經(jīng)復(fù)制的數(shù)據(jù)
[preBuffer didRead:bytesToCopy];

// 已讀的數(shù)據(jù)加上
currentRead->bytesDone += bytesToCopy;
// 當(dāng)前已讀的數(shù)據(jù)加上
totalBytesReadForCurrentRead += bytesToCopy;

// 如果已讀 == 需要讀的長度,說明已經(jīng)讀完
done = (currentRead->bytesDone == currentRead->readLength);
b、STEP 2 - READ FROM SOCKET(從socket中去讀?。?/h5>
// 循環(huán)去讀
do
{
    // 用ssl方式去讀取數(shù)據(jù)
    result = SSLRead(sslContext, loop_buffer, loop_bytesToRead, &loop_bytesRead);
    
    // 讀了的大小加進度
    bytesRead += loop_bytesRead;
}
// 如果沒出錯,且讀的大小小于需要讀的大小,就一直循環(huán)
while ((result == noErr) && (bytesRead < bytesToRead));
c、開始讀取數(shù)據(jù),最普通的形式 read
ssize_t result = read(socketFD, buffer, (size_t)bytesToRead);

// 加上讀的數(shù)量
currentRead->bytesDone += bytesRead;
// 把這一次讀的數(shù)量加上來
totalBytesReadForCurrentRead += bytesRead;
// 判斷是否已讀完
done = (currentRead->bytesDone == currentRead->readLength);

//檢查是否讀完
if (done)
{
    //完成這次數(shù)據(jù)的讀取
    [self completeCurrentRead];
}

//如果響應(yīng)讀數(shù)據(jù)進度的代理
if (delegateQueue && [theDelegate respondsToSelector:@selector(socket:didReadPartialDataOfLength:tag:)])
{
    //代理queue中回調(diào)出去
    dispatch_async(delegateQueue, ^{ @autoreleasepool {
        [theDelegate socket:self didReadPartialDataOfLength:totalBytesReadForCurrentRead tag:theReadTag];
    }});
}

Demo

Demo在我的Github上,歡迎下載。
SourceCodeAnalysisDemo

參考文獻

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請通過簡信或評論聯(lián)系作者。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容