iOS即時通訊進階 - CocoaAsyncSocket源碼解析(Read篇終)

前言:

本文為CocoaAsyncSocket Read篇終,將重點涉及該框架是如何利用緩沖區(qū)對數據進行讀取、以及各種情況下的數據包處理,其中還包括普通的、和基于TLS的不同讀取操作等等。
注:由于該框架源碼篇幅過大,且有大部分相對抽象的數據操作邏輯,盡管樓主竭力想要簡單的去陳述相關內容,但是閱讀起來仍會有一定的難度。如果不是誠心想學習IM相關知識,在這里就可以離場了...

本文系列第一篇:Connect篇已經完結,感興趣可以看看:
iOS即時通訊進階 - CocoaAsyncSocket源碼解析(Connect篇)
iOS即時通訊進階 - CocoaAsyncSocket源碼解析(Connect篇終)

Read第一篇
iOS即時通訊進階 - CocoaAsyncSocket源碼解析(Read篇)

注:文中涉及代碼比較多,建議大家結合源碼一起閱讀比較容易能加深理解。這里有樓主標注好注釋的源碼,有需要的可以作為參照:CocoaAsyncSocket源碼注釋
如果對該框架用法不熟悉的話,可以參考樓主之前文章:
iOS即時通訊,從入門到“放棄”?
即時通訊下數據粘包、斷包處理實例(基于CocoaAsyncSocket)
或者自行查閱。

目錄:
  • 1.淺析Read讀取,并闡述數據從socket到用戶手中的流程。?
  • 2.講講兩種TLS建立連接的過程。?
  • 3.深入講解Read的核心方法---doReadData的實現。?
正文:

前文講完了兩次TLS建立連接的流程,接著就是本篇的重頭戲了:doReadData方法。在這里我不準備直接把這個整個方法列出來,因為就光這一個方法,加上注釋有1200行,整個貼過來也無法展開描述,所以在這里我打算對它分段進行講解:

注:以下代碼整個包括在doReadData大括號中:

//讀取數據
- (void)doReadData
{
  ....
}

Part1.無法正常讀取數據時的前置處理:
//如果當前讀取的包為空,或者flag為讀取停止,這兩種情況是不能去讀取數據的
if ((currentRead == nil) || (flags & kReadsPaused))
{
     LogVerbose(@"No currentRead or kReadsPaused");
     
     // Unable to read at this time
     //如果是安全的通信,通過TLS/SSL
     if (flags & kSocketSecure)
     {
       //刷新SSLBuffer,把數據從鏈路上移到prebuffer中 (當前不讀取數據的時候做)
          [self flushSSLBuffers];
     }
     
   //判斷是否用的是 CFStream的TLS
     if ([self usingCFStreamForTLS])
     {

     }
     else
     {
       //掛起source
          if (socketFDBytesAvailable > 0)
          {
               [self suspendReadSource];
          }
     }
     return;
}

當我們當前讀取的包是空或者標記為讀停止狀態(tài)的時候,則不會去讀取數據。
前者不難理解,因為我們要讀取的數據最終是要傳給currentRead中去的,所以如果currentRead為空,我們去讀數據也沒有意義。
后者kReadsPaused標記是從哪里加上的呢?我們全局搜索一下,發(fā)現它才read超時的時候被添加。
講到這我們順便來看這個讀取超時的一個邏輯,我們每次做讀取任務傳進來的超時,都會調用這么一個方法:

Part2.讀取超時處理:
[self setupReadTimerWithTimeout:currentRead->timeout];
//初始化讀的超時
- (void)setupReadTimerWithTimeout:(NSTimeInterval)timeout
{
    if (timeout >= 0.0)
    {
        //生成一個定時器source
        readTimer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, socketQueue);
        
        __weak GCDAsyncSocket *weakSelf = self;
        
        //句柄
        dispatch_source_set_event_handler(readTimer, ^{ @autoreleasepool {
        #pragma clang diagnostic push
        #pragma clang diagnostic warning "-Wimplicit-retain-self"
            
            __strong GCDAsyncSocket *strongSelf = weakSelf;
            if (strongSelf == nil) return_from_block;
            
            //執(zhí)行超時操作
            [strongSelf doReadTimeout];
            
        #pragma clang diagnostic pop
        }});
        
        #if !OS_OBJECT_USE_OBJC
        dispatch_source_t theReadTimer = readTimer;
        
        //取消的句柄
        dispatch_source_set_cancel_handler(readTimer, ^{
        #pragma clang diagnostic push
        #pragma clang diagnostic warning "-Wimplicit-retain-self"
            
            LogVerbose(@"dispatch_release(readTimer)");
            dispatch_release(theReadTimer);
            
        #pragma clang diagnostic pop
        });
        #endif
        
        //定時器延時 timeout時間執(zhí)行
        dispatch_time_t tt = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(timeout * NSEC_PER_SEC));
        //間隔為永遠,即只執(zhí)行一次
        dispatch_source_set_timer(readTimer, tt, DISPATCH_TIME_FOREVER, 0);
        dispatch_resume(readTimer);
    }
}

這個方法定義了一個GCD定時器,這個定時器只執(zhí)行一次,間隔就是我們的超時,很顯然這是一個延時執(zhí)行,那小伙伴要問了,這里為什么我們不用NSTimer或者下面這種方式:

[self performSelector:<#(nonnull SEL)#> withObject:<#(nullable id)#> afterDelay:<#(NSTimeInterval)#>

原因很簡單,performSelector是基于runloop才能使用的,它本質是轉化成runloop基于非端口的源source0。很顯然我們所在的socketQueue開辟出來的線程,并沒有添加一個runloop。而NSTimer也是一樣。

所以這里我們用GCD Timer,因為它是基于XNU內核來實現的,并不需要借助于runloop

這里當超時時間間隔到達時,我們會執(zhí)行超時操作:

[strongSelf doReadTimeout];
//執(zhí)行超時操作
- (void)doReadTimeout
{
    // This is a little bit tricky.
    // Ideally we'd like to synchronously query the delegate about a timeout extension.
    // But if we do so synchronously we risk a possible deadlock.
    // So instead we have to do so asynchronously, and callback to ourselves from within the delegate block.
    
    //因為這里用同步容易死鎖,所以用異步從代理中回調
    
    //標記讀暫停
    flags |= kReadsPaused;
    
    __strong id theDelegate = delegate;

    //判斷是否實現了延時  補時的代理
    if (delegateQueue && [theDelegate respondsToSelector:@selector(socket:shouldTimeoutReadWithTag:elapsed:bytesDone:)])
    {
        //拿到當前讀的包
        GCDAsyncReadPacket *theRead = currentRead;
        
        //代理queue中回調
        dispatch_async(delegateQueue, ^{ @autoreleasepool {
            
            NSTimeInterval timeoutExtension = 0.0;
            
            //調用代理方法,拿到續(xù)的時長
            timeoutExtension = [theDelegate socket:self shouldTimeoutReadWithTag:theRead->tag
                                                                         elapsed:theRead->timeout
                                                                       bytesDone:theRead->bytesDone];
            
            //socketQueue中,做延時
            dispatch_async(socketQueue, ^{ @autoreleasepool {
                
                [self doReadTimeoutWithExtension:timeoutExtension];
            }});
        }});
    }
    else
    {
        [self doReadTimeoutWithExtension:0.0];
    }
}
//做讀取數據延時
- (void)doReadTimeoutWithExtension:(NSTimeInterval)timeoutExtension
{
    if (currentRead)
    {
        if (timeoutExtension > 0.0)
        {
            //把超時加上
            currentRead->timeout += timeoutExtension;
            
            // Reschedule the timer
            //重新生成時間
            dispatch_time_t tt = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(timeoutExtension * NSEC_PER_SEC));
            //重置timer時間
            dispatch_source_set_timer(readTimer, tt, DISPATCH_TIME_FOREVER, 0);
            
            // Unpause reads, and continue
            //在把paused標記移除
            flags &= ~kReadsPaused;
            //繼續(xù)去讀取數據
            [self doReadData];
        }
        else
        {
            //輸出讀取超時,并斷開連接
            LogVerbose(@"ReadTimeout");
            
            [self closeWithError:[self readTimeoutError]];
        }
    }
}

這里調用了續(xù)時代理,如果我們實現了這個代理,則可以增加這個超時時間,然后重新生成超時定時器,移除讀取停止的標記kReadsPaused。繼續(xù)去讀取數據。
否則我們就斷開socket
注意:這個定時器會被取消,如果當前數據包被讀取完成,這樣就不會走到定時器超時的時間,則不會斷開socket。講到這是不是大家就有印象了?這個就是之前在樓主:
iOS即時通訊,從入門到“放棄”?中講過的可以被用來做PingPong機制的原理。

我們接著回到doReadData中,我們講到如果當前讀取包為空或者狀態(tài)為kReadsPaused,我們就去執(zhí)行一些非讀取數據的處理。
這里我們第一步去判斷當前連接是否為kSocketSecure,也就是安全通道的TLS。如果是我們則調用:

if (flags & kSocketSecure)
{
     //刷新,把TLS加密型的數據從鏈路上移到prebuffer中 (當前暫停的時候做)
     [self flushSSLBuffers];
}

按理說,我們有當前讀取包的時候,在去從prebuffer、socket中去讀取,但是這里為什么要提前去讀呢?
我們來看看這個框架作者的解釋:

// Here's the situation:
// We have an established secure connection.
// There may not be a currentRead, but there might be encrypted data sitting around for us.
// When the user does get around to issuing a read, that encrypted data will need to be decrypted.
// So why make the user wait?
// We might as well get a head start on decrypting some data now.
// The other reason we do this has to do with detecting a socket disconnection.
// The SSL/TLS protocol has it's own disconnection handshake.
// So when a secure socket is closed, a "goodbye" packet comes across the wire.
// We want to make sure we read the "goodbye" packet so we can properly detect the TCP disconnection.

簡單來講,就是我們用TLS類型的Socket,讀取數據的時候需要解密的過程,而這個過程是費時的,我們沒必要讓用戶在讀取數據的時候去等待這個解密的過程,我們可以提前在數據一到達,就去讀取解密。
而且這種方式,還能時刻根據TLSgoodbye包來準確的檢測到TCP斷開連接。

在我們來看flushSSLBuffers方法之前,我們先來看看這個一直提到的全局緩沖區(qū)prebuffer的定義,它其實就是下面這么一個類的實例:

Part3.GCDAsyncSocketPreBuffer的定義
@interface GCDAsyncSocketPreBuffer : NSObject
{
    //unsigned char
    //提前的指針,指向這塊提前的緩沖區(qū)
    uint8_t *preBuffer;
    //size_t 它是一個與機器相關的unsigned類型,其大小足以保證存儲內存中對象的大小。
    //它可以存儲在理論上是可能的任何類型的數組的最大大小
    size_t preBufferSize;
    //讀的指針
    uint8_t *readPointer;
    //寫的指針
    uint8_t *writePointer;
}

里面存了3個指針,包括preBuffer起點指針、當前讀寫所處位置指針、以及一個preBufferSize,這個sizepreBuffer所指向的位置,在內存中分配的空間大小。

我們來看看它的幾個方法:

//初始化
- (id)initWithCapacity:(size_t)numBytes
{
    if ((self = [super init]))
    {
        //設置size
        preBufferSize = numBytes;
        //申請size大小的內存給preBuffer
        preBuffer = malloc(preBufferSize);
        
        //為同一個值
        readPointer = preBuffer;
        writePointer = preBuffer;
    }
    return self;
}

包括一個初始化方法,去初始化preBufferSize大小的一塊內存空間。然后3個指針都指向這個空間。

- (void)dealloc
{
    if (preBuffer)
        free(preBuffer);
}

銷毀的方法:釋放preBuffer。

//確認讀的大小
- (void)ensureCapacityForWrite:(size_t)numBytes
{
    //拿到當前可用的空間大小
    size_t availableSpace = [self availableSpace];
    
    //如果申請的大小大于可用的大小
    if (numBytes > availableSpace)
    {
        //需要多出來的大小
        size_t additionalBytes = numBytes - availableSpace;
        //新的總大小
        size_t newPreBufferSize = preBufferSize + additionalBytes;
        //重新去分配preBuffer
        uint8_t *newPreBuffer = realloc(preBuffer, newPreBufferSize);
        
        //讀的指針偏移量(已讀大?。?        size_t readPointerOffset = readPointer - preBuffer;
        //寫的指針偏移量(已寫大?。?        size_t writePointerOffset = writePointer - preBuffer;
        //提前的Buffer重新復制
        preBuffer = newPreBuffer;
        //大小重新賦值
        preBufferSize = newPreBufferSize;
        
        //讀寫指針重新賦值 + 上偏移量
        readPointer = preBuffer + readPointerOffset;
        writePointer = preBuffer + writePointerOffset;
    }
}

確保prebuffer可用空間的方法:這個方法會重新分配preBuffer,直到可用大小等于傳遞進來的numBytes,已用大小不會變。

//仍然可讀的數據,過程是先寫后讀,只有寫的大于讀的,才能讓你繼續(xù)去讀,不然沒數據可讀了
- (size_t)availableBytes
{
    return writePointer - readPointer;
}

- (uint8_t *)readBuffer
{
    return readPointer;
}

- (void)getReadBuffer:(uint8_t **)bufferPtr availableBytes:(size_t *)availableBytesPtr
{
    if (bufferPtr) *bufferPtr = readPointer;
    if (availableBytesPtr) *availableBytesPtr = [self availableBytes];
}

//讀數據的指針
- (void)didRead:(size_t)bytesRead
{
    readPointer += bytesRead;
    //如果讀了這么多,指針和寫的指針還相同的話,說明已經讀完,重置指針到最初的位置
    if (readPointer == writePointer)
    {
        // The prebuffer has been drained. Reset pointers.
        readPointer  = preBuffer;
        writePointer = preBuffer;
    }
}
//prebuffer的剩余空間  = preBufferSize(總大?。?- (寫的頭指針 - preBuffer一開的指針,即已被寫的大?。?
- (size_t)availableSpace
{
    return preBufferSize - (writePointer - preBuffer);
}

- (uint8_t *)writeBuffer
{
    return writePointer;
}

- (void)getWriteBuffer:(uint8_t **)bufferPtr availableSpace:(size_t *)availableSpacePtr
{
    if (bufferPtr) *bufferPtr = writePointer;
    if (availableSpacePtr) *availableSpacePtr = [self availableSpace];
}

- (void)didWrite:(size_t)bytesWritten
{
    writePointer += bytesWritten;
}

- (void)reset
{
    readPointer  = preBuffer;
    writePointer = preBuffer;
}

然后就是對讀寫指針進行處理的方法,如果讀了多少數據readPointer就后移多少,寫也是一樣。
而獲取當前未讀數據,則是用已寫指針-已讀指針,得到的差值,當已讀=已寫的時候,說明prebuffer數據讀完,則重置讀寫指針的位置,還是指向初始化位置。

講完全局緩沖區(qū)對于指針的處理,我們接著往下說
Part4.flushSSLBuffers方法:
//緩沖ssl數據
- (void)flushSSLBuffers
{
     LogTrace();
     //斷言為安全Socket
     NSAssert((flags & kSocketSecure), @"Cannot flush ssl buffers on non-secure socket");
     //如果preBuffer有數據可讀,直接返回
     if ([preBuffer availableBytes] > 0)
     {
          return;
     }
     
     #if TARGET_OS_IPHONE
     //如果用的CFStream的TLS,把數據用CFStream的方式搬運到preBuffer中
     if ([self usingCFStreamForTLS])
     {
        //如果flag為kSecureSocketHasBytesAvailable,而且readStream有數據可讀
          if ((flags & kSecureSocketHasBytesAvailable) && CFReadStreamHasBytesAvailable(readStream))
          {
               LogVerbose(@"%@ - Flushing ssl buffers into prebuffer...", THIS_METHOD);
               
            //默認一次讀的大小為4KB??
               CFIndex defaultBytesToRead = (1024 * 4);
               
            //用來確保有這么大的提前buffer緩沖空間
               [preBuffer ensureCapacityForWrite:defaultBytesToRead];
               //拿到寫的buffer
               uint8_t *buffer = [preBuffer writeBuffer];
               
            //從readStream中去讀, 一次就讀4KB,讀到數據后,把數據寫到writeBuffer中去   如果讀的大小小于readStream中數據流大小,則會不停的觸發(fā)callback,直到把數據讀完為止。
               CFIndex result = CFReadStreamRead(readStream, buffer, defaultBytesToRead);
            //打印結果
               LogVerbose(@"%@ - CFReadStreamRead(): result = %i", THIS_METHOD, (int)result);
               
            //大于0,說明讀寫成功
               if (result > 0)
               {
                //把寫的buffer頭指針,移動result個偏移量
                    [preBuffer didWrite:result];
               }
               
            //把kSecureSocketHasBytesAvailable 仍然可讀的標記移除
               flags &= ~kSecureSocketHasBytesAvailable;
          }
          
          return;
     }
     
     #endif
     
    //不用CFStream的處理方法
    
    //先設置一個預估可用的大小
     __block NSUInteger estimatedBytesAvailable = 0;
     //更新預估可用的Block
     dispatch_block_t updateEstimatedBytesAvailable = ^{
          
        //預估大小 = 未讀的大小 + SSL的可讀大小
          estimatedBytesAvailable = socketFDBytesAvailable + [sslPreBuffer availableBytes];
          

          size_t sslInternalBufSize = 0;
        //獲取到ssl上下文的大小,從sslContext中
          SSLGetBufferedReadSize(sslContext, &sslInternalBufSize);
          //再加上下文的大小
          estimatedBytesAvailable += sslInternalBufSize;
     };
     
    //調用這個Block
     updateEstimatedBytesAvailable();
     
    //如果大于0,說明有數據可讀
     if (estimatedBytesAvailable > 0)
     {
        
          LogVerbose(@"%@ - Flushing ssl buffers into prebuffer...", THIS_METHOD);
          
        //標志,循環(huán)是否結束,SSL的方式是會阻塞的,直到讀的數據有estimatedBytesAvailable大小為止,或者出錯
          BOOL done = NO;
          do
          {
               LogVerbose(@"%@ - estimatedBytesAvailable = %lu", THIS_METHOD, (unsigned long)estimatedBytesAvailable);
               
               // Make sure there's enough room in the prebuffer
               //確保有足夠的空間給prebuffer
               [preBuffer ensureCapacityForWrite:estimatedBytesAvailable];
               
               // Read data into prebuffer
               //拿到寫的buffer
               uint8_t *buffer = [preBuffer writeBuffer];
               size_t bytesRead = 0;
               //用SSLRead函數去讀,讀到后,把數據寫到buffer中,estimatedBytesAvailable為需要讀的大小,bytesRead這一次實際讀到字節(jié)大小,為sslContext上下文
               OSStatus result = SSLRead(sslContext, buffer, (size_t)estimatedBytesAvailable, &bytesRead);
               LogVerbose(@"%@ - read from secure socket = %u", THIS_METHOD, (unsigned)bytesRead);
               
            //把寫指針后移bytesRead大小
               if (bytesRead > 0)
               {
                    [preBuffer didWrite:bytesRead];
               }
               
               LogVerbose(@"%@ - prebuffer.length = %zu", THIS_METHOD, [preBuffer availableBytes]);
               
            //如果讀數據出現錯誤
               if (result != noErr)
               {
                    done = YES;
               }
               else
               {
                //在更新一下可讀的數據大小
                    updateEstimatedBytesAvailable();
               }
               
          }
        //只有done為NO,而且 estimatedBytesAvailable大于0才繼續(xù)循環(huán)
        while (!done && estimatedBytesAvailable > 0);
     }
}

這個方法有點略長,包含了兩種SSL的數據處理:

  1. CFStream類型:我們會調用下面這個函數去從stream并且讀取數據并解密:
CFIndex result = CFReadStreamRead(readStream, buffer, defaultBytesToRead);

數據被讀取到后,直接轉移到了prebuffer中,并且調用:

[preBuffer didWrite:result];

讓寫指針后移讀取到的數據大小。
這里有兩個關于CFReadStreamRead方法,需要注意的問題:
1)就是我們調用它去讀取4KB數據,并不僅僅是只讀這么多,而是因為這個方法是會遞歸調用的,它每次只讀4KB,直到把stream中的數據讀完。
2)我們之前設置的CFStream函數的回調,在數據來了之后只會被觸發(fā)一次,以后數據再來都不會觸發(fā)。直到我們調用這個方法,把stream中的數據讀完,下次再來數據才會觸發(fā)函數回調。這也是我們在使用CFStream的時候,不需要擔心像source那樣,有數據會不斷的被觸發(fā)回調,而需要掛起像source那樣掛起stream(實際也沒有這樣的方法)。

  1. SSL安全通道類型:這里我們主要是循環(huán)去調用下面這個函數去讀取數據:
OSStatus result = SSLRead(sslContext, buffer, (size_t)estimatedBytesAvailable, &bytesRead);

其他的基本和CFStream一致

這里需要注意的是SSLRead這個方法,并不是直接從我們的socket中獲取到的數據,而是從我們一開始綁定的SSL回調函數中,得到數據。而回調函數本身,也需要調用read函數從socket中獲取到加密的數據。然后再經由SSLRead這個方法,數據被解密,并且傳遞給buffer。

至于SSLRead綁定的回調函數,是怎么處理數據讀取的,因為它處理數據的流程,和我們doReadData后續(xù)數據讀取處理基本相似,所以現在暫時不提。

我們繞了一圈,講完了這個包為空或者當前暫停狀態(tài)下的前置處理,總結一下:
  1. 就是如果是SSL類型的數據,那么先解密了,緩沖到prebuffer中去。
  2. 判斷當前socket可讀數據大于0,非CFStreamSSL類型,則掛起source,防止反復觸發(fā)。
Part5.接著我們開始doReadData正常數據處理流程:

首先它大的方向,依然是分為3種類型的數據處理:
1.SSL安全通道; 2.CFStream類型SSL; 3.普通數據傳輸。
因為這3種類型的代碼,重復部分較大,處理流程基本類似,只不過調用讀取方法所有區(qū)別:

//1.
OSStatus result = SSLRead(sslContext, buffer, (size_t)estimatedBytesAvailable, &bytesRead);
//2.
CFIndex result = CFReadStreamRead(readStream, buffer, defaultBytesToRead);
//3.
ssize_t result = read(socketFD, buffer, (size_t)bytesToRead);

SSLRead回調函數內部,也調用了第3種read讀取,這個我們后面會說。
現在這里我們將跳過前兩種(方法部分調用可以見上面的flushSSLBuffers方法),只講第3種普通數據的讀取操作,而SSL的讀取操作,基本一致。

先來看看當前數據包任務是否完成,是如何定義的:

由于框架提供的對外read接口:

- (void)readDataWithTimeout:(NSTimeInterval)timeout tag:(long)tag;
- (void)readDataToLength:(NSUInteger)length withTimeout:(NSTimeInterval)timeout tag:(long)tag;
- (void)readDataToData:(NSData *)data withTimeout:(NSTimeInterval)timeout tag:(long)tag;

將數據讀取是否完成的操作,大致分為這3個類型:
1.全讀;2讀取一定的長度;3讀取到某個標記符為止。

當且僅當上面3種類型對應的操作完成,才視作當前包任務完成,才會回調我們在類中聲明的讀取消息的代理:

- (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag

否則就等待著,直到當前數據包任務完成。

然后我們讀取數據的流程大致如下:

先從prebuffer中去讀取,如果讀完了,當前數據包任務仍未完成,那么再從socket中去讀取。
而判斷包是否讀完,都是用我們上面的3種類型,來對應處理的。

講了半天理論,想必大家看的有點不耐煩了,接下來看看代碼實際是如何處理的吧:

step1:從prebuffer中讀取數據:
//先從提前緩沖區(qū)去讀,如果緩沖區(qū)可讀大小大于0
if ([preBuffer availableBytes] > 0)
{
     // There are 3 types of read packets:
     // 
     // 1) Read all available data.
     // 2) Read a specific length of data.
     // 3) Read up to a particular terminator.
     //3種類型的讀法,1、全讀、2、讀取特定長度、3、讀取到一個明確的界限
   
     NSUInteger bytesToCopy;
     
   //如果當前讀的數據界限不為空
     if (currentRead->term != nil)
     {
          // Read type #3 - read up to a terminator
          //直接讀到界限
          bytesToCopy = [currentRead readLengthForTermWithPreBuffer:preBuffer found:&done];
     }
     else
     {
          // Read type #1 or #2
          //讀取數據,讀到指定長度或者數據包的長度為止
          bytesToCopy = [currentRead readLengthForNonTermWithHint:[preBuffer availableBytes]];
     }
     
     // Make sure we have enough room in the buffer for our read.
     //從上兩步拿到我們需要讀的長度,去看看有沒有空間去存儲
     [currentRead ensureCapacityForAdditionalDataOfLength:bytesToCopy];
     
     // Copy bytes from prebuffer into packet buffer

   //拿到我們需要追加數據的指針位置
#pragma mark - 不明白
   //當前讀的數據 + 開始偏移 + 已經讀完的??
     uint8_t *buffer = (uint8_t *)[currentRead->buffer mutableBytes] + currentRead->startOffset +
                                                                       currentRead->bytesDone;
     //從prebuffer處復制過來數據,bytesToCopy長度
     memcpy(buffer, [preBuffer readBuffer], bytesToCopy);
     
     // Remove the copied bytes from the preBuffer
   //從preBuffer移除掉已經復制的數據
     [preBuffer didRead:bytesToCopy];
     
   
     LogVerbose(@"copied(%lu) preBufferLength(%zu)", (unsigned long)bytesToCopy, [preBuffer availableBytes]);
     
     // Update totals
     
   //已讀的數據加上
     currentRead->bytesDone += bytesToCopy;
   //當前已讀的數據加上
     totalBytesReadForCurrentRead += bytesToCopy;
     
     // Check to see if the read operation is done
     //判斷是不是讀完了
     if (currentRead->readLength > 0)
     {
          // Read type #2 - read a specific length of data
          //如果已讀 == 需要讀的長度,說明已經讀完
          done = (currentRead->bytesDone == currentRead->readLength);
     }
   //判斷界限標記
     else if (currentRead->term != nil)
     {
          // Read type #3 - read up to a terminator
          
          // Our 'done' variable was updated via the readLengthForTermWithPreBuffer:found: method
          //如果沒做完,且讀的最大長度大于0,去判斷是否溢出
          if (!done && currentRead->maxLength > 0)
          {
               // We're not done and there's a set maxLength.
               // Have we reached that maxLength yet?
               
           //如果已讀的大小大于最大的大小,則報溢出錯誤
               if (currentRead->bytesDone >= currentRead->maxLength)
               {
                    error = [self readMaxedOutError];
               }
          }
     }
     else
     {
          // Read type #1 - read all available data
          // 
          // We're done as soon as
          // - we've read all available data (in prebuffer and socket)
          // - we've read the maxLength of read packet.
          //判斷已讀大小和最大大小是否相同,相同則讀完
          done = ((currentRead->maxLength > 0) && (currentRead->bytesDone == currentRead->maxLength));
     }
     
}

這個方法就是利用我們之前提到的3種類型,來判斷數據包需要讀取的長度,然后調用:

memcpy(buffer, [preBuffer readBuffer], bytesToCopy);

把數據從preBuffer中,移到了currentRead數據包中。

step2:從socket中讀取數據:
// 從socket中去讀取

//是否讀到EOFException ,這個錯誤指的是文件結尾了還在繼續(xù)讀,就會導致這個錯誤被拋出
BOOL socketEOF = (flags & kSocketHasReadEOF) ? YES : NO;  // Nothing more to read via socket (end of file)

//如果沒完成,且沒錯,沒讀到結尾,且沒有可讀數據了
BOOL waiting   = !done && !error && !socketEOF && !hasBytesAvailable; // Ran out of data, waiting for more

//如果沒完成,且沒錯,沒讀到結尾,有可讀數據
if (!done && !error && !socketEOF && hasBytesAvailable)
{
   //斷言,有可讀數據
     NSAssert(([preBuffer availableBytes] == 0), @"Invalid logic");
   //是否讀到preBuffer中去
   BOOL readIntoPreBuffer = NO;
     uint8_t *buffer = NULL;
     size_t bytesRead = 0;
     
   //如果flag標記為安全socket
     if (flags & kSocketSecure)
     {
       //...類似flushSSLBuffer的一系列操作
     }
     else
     {
          // Normal socket operation
          //普通的socket 操作
       
          NSUInteger bytesToRead;
          
          // There are 3 types of read packets:
          //
          // 1) Read all available data.
          // 2) Read a specific length of data.
          // 3) Read up to a particular terminator.
          
       //和上面類似,讀取到邊界標記??不是吧
          if (currentRead->term != nil)
          {
               // Read type #3 - read up to a terminator
               
           //讀這個長度,如果到maxlength,就用maxlength??慈绻捎每臻g大于需要讀的空間,則不用prebuffer
               bytesToRead = [currentRead readLengthForTermWithHint:estimatedBytesAvailable
                                                    shouldPreBuffer:&readIntoPreBuffer];
          }
       
          else
          {
               // Read type #1 or #2
               //直接讀這個長度,如果到maxlength,就用maxlength
               bytesToRead = [currentRead readLengthForNonTermWithHint:estimatedBytesAvailable];
          }
          
       //大于最大值,則先讀最大值
          if (bytesToRead > SIZE_MAX) { // NSUInteger may be bigger than size_t (read param 3)
               bytesToRead = SIZE_MAX;
          }
          
          // Make sure we have enough room in the buffer for our read.
          //
          // We are either reading directly into the currentRead->buffer,
          // or we're reading into the temporary preBuffer.
          
          if (readIntoPreBuffer)
          {
               [preBuffer ensureCapacityForWrite:bytesToRead];
               
               buffer = [preBuffer writeBuffer];
          }
          else
          {
               [currentRead ensureCapacityForAdditionalDataOfLength:bytesToRead];
               
               buffer = (uint8_t *)[currentRead->buffer mutableBytes]
                      + currentRead->startOffset
                      + currentRead->bytesDone;
          }
          
          // Read data into buffer
          
          int socketFD = (socket4FD != SOCKET_NULL) ? socket4FD : (socket6FD != SOCKET_NULL) ? socket6FD : socketUN;
#pragma mark - 開始讀取數據,最普通的形式 read
       
       //讀數據
          ssize_t result = read(socketFD, buffer, (size_t)bytesToRead);
          LogVerbose(@"read from socket = %i", (int)result);
       //讀取錯誤
          if (result < 0)
          {
           //EWOULDBLOCK IO阻塞
               if (errno == EWOULDBLOCK)
               //先等待
                    waiting = YES;
               else
               //得到錯誤
                    error = [self errnoErrorWithReason:@"Error in read() function"];
               //把可讀取的長度設置為0
               socketFDBytesAvailable = 0;
          }
       //讀到邊界了
          else if (result == 0)
          {
               socketEOF = YES;
               socketFDBytesAvailable = 0;
          }
       //正常
          else
          {
           //設置讀到的數據長度
               bytesRead = result;
               
           //如果讀到的數據小于應該讀的長度,說明這個包沒讀完
               if (bytesRead < bytesToRead)
               {
                    // The read returned less data than requested.
                    // This means socketFDBytesAvailable was a bit off due to timing,
                    // because we read from the socket right when the readSource event was firing.
                    socketFDBytesAvailable = 0;
               }
           //正常
               else
               {
               //如果 socketFDBytesAvailable比讀了的數據小的話,直接置為0
                    if (socketFDBytesAvailable <= bytesRead)
                         socketFDBytesAvailable = 0;
               //減去已讀大小
                    else
                         socketFDBytesAvailable -= bytesRead;
               }
               //如果 socketFDBytesAvailable 可讀數量為0,把讀的狀態(tài)切換為等待
               if (socketFDBytesAvailable == 0)
               {
                    waiting = YES;
               }
          }
     }

本來想講點什么。。發(fā)現確實沒什么好講的,無非就是判斷應該讀取的長度,然后調用:

ssize_t result = read(socketFD, buffer, (size_t)bytesToRead);

socket中得到讀取的實際長度。

唯一需要講一下的可能是數據流向的問題,這里調用:`

bytesToRead = [currentRead readLengthForTermWithHint:estimatedBytesAvailable shouldPreBuffer:&readIntoPreBuffer];

來判斷數據是否先流向prebuffer,還是直接流向currentRead,而SSL的讀取中也有類似方法:

- (NSUInteger)optimalReadLengthWithDefault:(NSUInteger)defaultValue shouldPreBuffer:(BOOL *)shouldPreBufferPtr

這個方法核心的思路就是,如果當前讀取包,長度給明了,則直接流向currentRead,如果數據長度不清楚,那么則去判斷這一次讀取的長度,和currentRead可用空間長度去對比,如果長度比currentRead可用空間小,則流向currentRead,否則先用prebuffer來緩沖。

至于細節(jié)方面,大家對著github中的源碼注釋看看吧,這么大篇幅的業(yè)務代碼,一行行講確實沒什么意義。

走完這兩步讀取,接著就是第三步:

step3:判斷數據包完成程度:

這里有3種情況:
1.數據包剛好讀完;2.數據粘包;3.數據斷包;
注:這里判斷粘包斷包的長度,都是我們一開始調用read方法給的長度或者分界符得出的。

很顯然,第一種就什么都不用處理,完美匹配。
第二種情況,我們把需要的長度放到currentRead,多余的長度放到prebuffer中去。
第三種情況,數據還沒讀完,我們暫時為未讀完。

這里就不貼代碼了。

就這樣普通讀取數據的整個流程就走完了,而SSL的兩種模式,和上述基本一致。

我們接著根據之前讀取的結果,來判斷數據是否讀完:

//檢查是否讀完
if (done)
{
    //完成這次數據的讀取
    [self completeCurrentRead];
    //如果沒出錯,沒有到邊界,prebuffer中還有可讀數據
    if (!error && (!socketEOF || [preBuffer availableBytes] > 0))
    {
        //讓讀操作離隊,繼續(xù)進行下一次讀取
        [self maybeDequeueRead];
    }
}

如果讀完,則去做讀完的操作,并且進行下一次讀取。

我們來看看讀完的操作:
//完成了這次的讀數據
- (void)completeCurrentRead
{
    LogTrace();
    //斷言currentRead
    NSAssert(currentRead, @"Trying to complete current read when there is no current read.");
    
    //結果數據
    NSData *result = nil;
    
    //如果是我們自己創(chuàng)建的Buffer
    if (currentRead->bufferOwner)
    {
        // We created the buffer on behalf of the user.
        // Trim our buffer to be the proper size.
        //修剪buffer到合適的大小
        //把大小設置到我們讀取到的大小
        [currentRead->buffer setLength:currentRead->bytesDone];
        //賦值給result
        result = currentRead->buffer;
    }
    else
    {
        // We did NOT create the buffer.
        // The buffer is owned by the caller.
        // Only trim the buffer if we had to increase its size.
        //這是調用者的data,我們只會去加大尺寸
        if ([currentRead->buffer length] > currentRead->originalBufferLength)
        {
            //拿到的讀的size
            NSUInteger readSize = currentRead->startOffset + currentRead->bytesDone;
            //拿到原始尺寸
            NSUInteger origSize = currentRead->originalBufferLength;
            
            //取得最大的
            NSUInteger buffSize = MAX(readSize, origSize);
            //把buffer設置為較大的尺寸
            [currentRead->buffer setLength:buffSize];
        }
        //拿到數據的頭指針
        uint8_t *buffer = (uint8_t *)[currentRead->buffer mutableBytes] + currentRead->startOffset;
        
        //reslut為,從頭指針開始到長度為寫的長度 freeWhenDone為YES,創(chuàng)建完就釋放buffer
        result = [NSData dataWithBytesNoCopy:buffer length:currentRead->bytesDone freeWhenDone:NO];
    }
    
    __strong id theDelegate = delegate;

#pragma mark -總算到調用代理方法,接受到數據了
    if (delegateQueue && [theDelegate respondsToSelector:@selector(socket:didReadData:withTag:)])
    {
        //拿到當前的數據包
        GCDAsyncReadPacket *theRead = currentRead; // Ensure currentRead retained since result may not own buffer
        
        dispatch_async(delegateQueue, ^{ @autoreleasepool {
            //把result在代理queue中回調出去。
            [theDelegate socket:self didReadData:result withTag:theRead->tag];
        }});
    }
    //取消掉讀取超時
    [self endCurrentRead];
}

這里對currentReaddata做了個長度的設置。然后調用代理把最終包給回調出去。最后關掉我們之前提到的讀取超時。

還是回到doReadData,就剩下最后一點處理了:

//如果這次讀的數量大于0
else if (totalBytesReadForCurrentRead > 0)
{
    // We're not done read type #2 or #3 yet, but we have read in some bytes

    __strong id theDelegate = delegate;
    
    //如果響應讀數據進度的代理
    if (delegateQueue && [theDelegate respondsToSelector:@selector(socket:didReadPartialDataOfLength:tag:)])
    {
        long theReadTag = currentRead->tag;
        
        //代理queue中回調出去
        dispatch_async(delegateQueue, ^{ @autoreleasepool {
            
            [theDelegate socket:self didReadPartialDataOfLength:totalBytesReadForCurrentRead tag:theReadTag];
        }});
    }
}

這里未完成,如果這次讀取大于0,如果響應讀取進度的代理,則把當前進度回調出去。

最后檢查錯誤:
//檢查錯誤
if (error)
{
    //如果有錯直接報錯斷開連接
    [self closeWithError:error];
}
//如果是讀到邊界錯誤
else if (socketEOF)
{
    [self doReadEOF];
}

//如果是等待
else if (waiting)
{
    //如果用的是CFStream,則讀取數據和source無關
    //非CFStream形式
    if (![self usingCFStreamForTLS])
    {
        // Monitor the socket for readability (if we're not already doing so)
        //重新恢復source
        [self resumeReadSource];
    }
}

如果有錯,直接斷開socket,如果是邊界錯誤,調用邊界錯誤處理,如果是等待,說明當前包還沒讀完,如果非CFStreamTLS,則恢復source,等待下一次數據到達的觸發(fā)。

關于這個讀取邊界錯誤EOF,這里我簡單的提下,其實它就是服務端發(fā)出一個邊界錯誤,說明不會再有數據發(fā)送給我們了。我們講無法再接收到數據,但是我們其實還是可以寫數據,發(fā)送給服務端的。

doReadEOF這個方法的處理,就是做了這么一件事。判斷我們是否需要這種不可讀,只能寫的連接。

我們來簡單看看這個方法:
Part6.讀取邊界錯誤處理:
//讀到EOFException,邊界錯誤
- (void)doReadEOF
{
    LogTrace();
   //這個方法可能被調用很多次,如果讀到EOF的時候,還有數據在prebuffer中,在調用doReadData之后?? 這個方法可能被持續(xù)的調用
    
    //標記為讀EOF
    flags |= kSocketHasReadEOF;
    
    //如果是安全socket
    if (flags & kSocketSecure)
    {
        //去刷新sslbuffer中的數據
        [self flushSSLBuffers];
    }
    
    //標記是否應該斷開連接
    BOOL shouldDisconnect = NO;
    NSError *error = nil;
    
    //如果狀態(tài)為開始讀寫TLS
    if ((flags & kStartingReadTLS) || (flags & kStartingWriteTLS))
    {
        //我們得到EOF在開啟TLS之前,這個TLS握手是不可能的,因此這是不可恢復的錯誤
        
        //標記斷開連接
        shouldDisconnect = YES;
        //如果是安全的TLS,賦值錯誤
        if ([self usingSecureTransportForTLS])
        {
            error = [self sslError:errSSLClosedAbort];
        }
    }
    //如果是讀流關閉狀態(tài)
    else if (flags & kReadStreamClosed)
    {
        
        //不應該被關閉
        shouldDisconnect = NO;
    }
    else if ([preBuffer availableBytes] > 0)
    {
        //仍然有數據可讀的時候不關閉
        shouldDisconnect = NO;
    }
    else if (config & kAllowHalfDuplexConnection)
    {
        
        //拿到socket
        int socketFD = (socket4FD != SOCKET_NULL) ? socket4FD : (socket6FD != SOCKET_NULL) ? socket6FD : socketUN;
        
        //輪詢用的結構體
        
        /*
         struct pollfd {
         int fd;        //文件描述符
         short events;  //要求查詢的事件掩碼  監(jiān)聽的
         short revents; //返回的事件掩碼   實際發(fā)生的
         };
         */
        
        struct pollfd pfd[1];
        pfd[0].fd = socketFD;
        //寫數據不會導致阻塞。
        pfd[0].events = POLLOUT;
        //這個為當前實際發(fā)生的事情
        pfd[0].revents = 0;
        
        /*
         poll函數使用pollfd類型的結構來監(jiān)控一組文件句柄,ufds是要監(jiān)控的文件句柄集合,nfds是監(jiān)控的文件句柄數量,timeout是等待的毫秒數,這段時間內無論I/O是否準備好,poll都會返回。timeout為負數表示無線等待,timeout為0表示調用后立即返回。執(zhí)行結果:為0表示超時前沒有任何事件發(fā)生;-1表示失?。怀晒t返回結構體中revents不為0的文件描述符個數。pollfd結構監(jiān)控的事件類型如下:
         int poll(struct pollfd *ufds, unsigned int nfds, int timeout);
         */
        //阻塞的,但是timeout為0,則不阻塞,直接返回
        poll(pfd, 1, 0);
        
        //如果被觸發(fā)的事件是寫數據
        if (pfd[0].revents & POLLOUT)
        {
            // Socket appears to still be writeable
            
            //則標記為不關閉
            shouldDisconnect = NO;
            //標記為讀流關閉
            flags |= kReadStreamClosed;
            
            // Notify the delegate that we're going half-duplex
            //通知代理,我們開始半雙工
            __strong id theDelegate = delegate;

            //調用已經關閉讀流的代理方法
            if (delegateQueue && [theDelegate respondsToSelector:@selector(socketDidCloseReadStream:)])
            {
                dispatch_async(delegateQueue, ^{ @autoreleasepool {
                    
                    [theDelegate socketDidCloseReadStream:self];
                }});
            }
        }
        else
        {
            //標記為斷開
            shouldDisconnect = YES;
        }
    }
    else
    {
        shouldDisconnect = YES;
    }
    
    //如果應該斷開
    if (shouldDisconnect)
    {
        if (error == nil)
        {
            //判斷是否是安全TLS傳輸
            if ([self usingSecureTransportForTLS])
            {
                ///標記錯誤信息
                if (sslErrCode != noErr && sslErrCode != errSSLClosedGraceful)
                {
                    error = [self sslError:sslErrCode];
                }
                else
                {
                    error = [self connectionClosedError];
                }
            }
            else
            {
                error = [self connectionClosedError];
            }
        }
        //關閉socket
        [self closeWithError:error];
    }
    //不斷開
    else
    {
        //如果不是用CFStream流
        if (![self usingCFStreamForTLS])
        {
            // Suspend the read source (if needed)
            //掛起讀source
            [self suspendReadSource];
        }
    }
}

簡單說一下,這個方法主要是對socket是否需要主動關閉進行了判斷:這里僅僅以下3種情況,不會關閉socket

  1. 讀流已經是關閉狀態(tài)(如果加了這個標記,說明為半雙工連接狀態(tài))。
  • preBuffer中還有可讀數據,我們需要等數據讀完才能關閉連接。
  • 配置標記為kAllowHalfDuplexConnection,我們則要開始半雙工處理。我們調用了:
poll(pfd, 1, 0);

函數,如果觸發(fā)了寫事件POLLOUT,說明我們半雙工連接成功,則我們可以在讀流關閉的狀態(tài)下,仍然可以向服務器寫數據。

其他情況下,一律直接關閉socket
而不關閉的情況下,我們會掛起source。這樣我們就只能可寫不可讀了。

最后還是提下SSL的回調方法,數據解密的地方。兩種模式的回調;

Part7.兩種SSL數據解密位置:

1.CFStream:當我們調用:

CFIndex result = CFReadStreamRead(readStream, buffer, defaultBytesToRead);

數據就會被解密。
2.SSL安全通道:當我們調用:

OSStatus result = SSLRead(sslContext, buffer, (size_t)estimatedBytesAvailable, &bytesRead);

會觸發(fā)SSL綁定的函數回調:

//讀函數
static OSStatus SSLReadFunction(SSLConnectionRef connection, void *data, size_t *dataLength)
{
    //拿到socket
    GCDAsyncSocket *asyncSocket = (__bridge GCDAsyncSocket *)connection;
    
    //斷言當前為socketQueue
    NSCAssert(dispatch_get_specific(asyncSocket->IsOnSocketQueueOrTargetQueueKey), @"What the deuce?");
    
    //讀取數據,并且返回狀態(tài)碼
    return [asyncSocket sslReadWithBuffer:data length:dataLength];
}

接著我們在下面的方法進行了數據讀?。?/p>

//SSL讀取數據最終方法
- (OSStatus)sslReadWithBuffer:(void *)buffer length:(size_t *)bufferLength
{
    //...
    ssize_t result = read(socketFD, buf, bytesToRead);
    //....
}

其實read這一步,數據是沒有被解密的,然后傳遞回SSLReadFunction,在傳遞到SSLRead內部,數據被解密。

尾聲:

這個系列就剩下最后一篇Write了。由于內容相對比較簡單,預計就一篇寫完了。
如果一直看到這里的朋友,會發(fā)現,相對之前有些內容,講解沒那么詳細了。其實原因主要有兩點,一是代碼數量龐大,確實無法詳細。二是樓主對這個系列寫的有點不耐煩,想要盡快結束了..
不過至少整篇的源碼注釋在github上是有的,我覺得大家自己去對著源碼去閱讀理解同樣重要,如果一直逐字逐行的去講,那就真的沒什么意義了。

畢竟授之以魚,不如授之以漁。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容