「高并發(fā)通信框架Netty4 源碼解讀(六)」NIO通道之Socket通道

socket網(wǎng)絡(luò)通信太重要了。也是本專題的重中之重,所以小編單獨寫一篇文章來介紹Socket通道。Socket 通道有與文件通道不同的特征。新的 socket 通道類可以運行非阻塞模式并且是可選擇的。這兩個性能可以激活程序(如網(wǎng)絡(luò)服務(wù)器和中間件組件)巨大的可伸縮性和靈活性。本節(jié)中我們會看到,再也沒有為每個 socket 連接使用一個線程的必要了,也避免了管理大量線程所需的上下文交換總開銷。借助新的 NIO 類,一個或幾個線程就可以管理成百上千的活動 socket 連接了并且只有很少甚至可能沒有性能損失。這一點相當相當?shù)闹匾。。?/p>


全部 socket 通道類(DatagramChannel、 SocketChannel 和ServerSocketChannel)都是由位于 java.nio.channels.spi 包中的 AbstractSelectableChannel 引申而來。這意味著我們可以用一個 Selector 對象來執(zhí)行 socket 通道的有條件的選擇。選擇器下一篇再講。

請注意 DatagramChannel 和 SocketChannel 實現(xiàn)定義讀和寫功能的接口而 ServerSocketChannel不實現(xiàn)。ServerSocketChannel 負責(zé)監(jiān)聽傳入的連接和創(chuàng)建新的 SocketChannel 對象,它本身從不傳輸數(shù)據(jù)。

在我們具體討論每一種 socket 通道前,您應(yīng)該了解 socket 和 socket 通道之間的關(guān)系。之前的博文有寫道,通道是一個連接 I/O 服務(wù)導(dǎo)管并提供與該服務(wù)交互的方法。就某個 socket 而言,它不會再次實現(xiàn)與之對應(yīng)的 socket 通道類中的 socket 協(xié)議 API,而 java.net 中已經(jīng)存在的 socket 通道都可以被大多數(shù)協(xié)議操作重復(fù)使用。

全部 socket 通道類(DatagramChannel、 SocketChannel 和 ServerSocketChannel)在被實例化時都會創(chuàng)建一個對等 socket 對象。這些是我們所熟悉的來自 java.net 的類(Socket、 ServerSocket和 DatagramSocket),它們已經(jīng)被更新以識別通道。對等 socket 可以通過調(diào)用 socket( )方法從一個通道上獲取。此外,這三個 java.net 類現(xiàn)在都有 getChannel( )方法。

雖然每個 socket 通道(在 java.nio.channels 包中)都有一個關(guān)聯(lián)的 java.net socket 對象,卻并非所有的 socket 都有一個關(guān)聯(lián)的通道。如果您用傳統(tǒng)方式(直接實例化)創(chuàng)建了一個Socket 對象,它就不會有關(guān)聯(lián)的 SocketChannel 并且它的 getChannel( )方法將總是返回 null。

Socket 通道委派協(xié)議操作給對等 socket 對象。如果在通道類中存在似乎重復(fù)的 socket 方法,那么將有某個新的或者不同的行為同通道類上的這個方法相關(guān)聯(lián)。

非阻塞模式

Socket 通道可以在非阻塞模式下運行。這個陳述雖然簡單卻有著深遠的含義。傳統(tǒng) Java socket的阻塞性質(zhì)曾經(jīng)是 Java 程序可伸縮性的最重要制約之一。非阻塞 I/O 是許多復(fù)雜的、高性能的程序構(gòu)建的基礎(chǔ)。

要把一個 socket 通道置于非阻塞模式,我們要依靠所有 socket 通道類的公有超級類:SelectableChannel。下面的方法就是關(guān)于通道的阻塞模式的:

public abstract class SelectableChannel
extends AbstractChannel
implements Channel
{
  // This is a partial API listing
  public abstract void configureBlocking (boolean block) throws IOException;
  public abstract boolean isBlocking( );
  public abstract Object blockingLock( );
}

有條件的選擇(readiness selection)是一種可以用來查詢通道的機制,該查詢可以判斷通道是否準備好執(zhí)行一個目標操作,如讀或?qū)?。非阻?I/O 和可選擇性是緊密相連的,那也正是管理阻塞模式的 API 代碼要在 SelectableChannel 超級類中定義的原因。 SelectableChannel 的剩余 API 將在下篇討論。
設(shè)置或重新設(shè)置一個通道的阻塞模式是很簡單的,只要調(diào)用 configureBlocking( )方法即可,傳遞參數(shù)值為 true 則設(shè)為阻塞模式,參數(shù)值為 false 值設(shè)為非阻塞模式。真的,就這么簡單!您可以通過調(diào)用 isBlocking( )方法來判斷某個 socket 通道當前處于哪種模式:

SocketChannel sc = SocketChannel.open( );
sc.configureBlocking (false); // nonblocking
...
if ( ! sc.isBlocking( )) {
  doSomething (cs);
}

服務(wù)器端的使用經(jīng)常會考慮到非阻塞 socket 通道,因為它們使同時管理很多 socket 通道變得更容易。但是,在客戶端使用一個或幾個非阻塞模式的 socket 通道也是有益處的,例如,借助非阻塞socket 通道, GUI 程序可以專注于用戶請求并且同時維護與一個或多個服務(wù)器的會話。在很多程序上,非阻塞模式都是有用的。

偶爾地,我們也會需要防止 socket 通道的阻塞模式被更改。 API 中有一個 blockingLock( )方法,該方法會返回一個非透明的對象引用。返回的對象是通道實現(xiàn)修改阻塞模式時內(nèi)部使用的。只有擁有此對象的鎖的線程才能更改通道的阻塞模式(對象的鎖是用同步的 Java 密碼獲取的。對于確保在執(zhí)行代碼的關(guān)鍵部分時 socket 通道的阻塞模式不會改變以及在不影響其他線程的前提下暫時改變阻塞模式來說,這個方法都是非常方便的。

Socket socket = null;
Object lockObj = serverChannel.blockingLock( );
// have a handle to the lock object, but haven't locked it yet
// may block here until lock is acquired
synchronize (lockObj)
{
// This thread now owns the lock; mode can't be changed
  boolean prevState = serverChannel.isBlocking( );
  serverChannel.configureBlocking (false);
  socket = serverChannel.accept( );
  serverChannel.configureBlocking (prevState);
}
// lock is now released, mode is allowed to change
if (socket != null) {
  doSomethingWithTheSocket (socket);
}

.

ServerSocketChannel

讓我們從最簡單的 ServerSocketChannel 來開始對 socket 通道類的討論。以下是ServerSocketChannel 的完整 API:

public abstract class ServerSocketChannel extends AbstractSelectableChannel
{
  public static ServerSocketChannel open( ) throws IOException
  public abstract ServerSocket socket( );
  public abstract ServerSocket accept( ) throws IOException;
  public final int validOps( )
}

ServerSocketChannel 是一個基于通道的 socket 監(jiān)聽器。它同我們所熟悉的 java.net.ServerSocket執(zhí)行相同的基本任務(wù),不過它增加了通道語義,因此能夠在非阻塞模式下運行。

用靜態(tài)的 open( )工廠方法創(chuàng)建一個新的 ServerSocketChannel 對象,將會返回同一個未綁定的java.net.ServerSocket 關(guān)聯(lián)的通道。該對等 ServerSocket 可以通過在返回的 ServerSocketChannel 上調(diào)用 socket( )方法來獲取。作為 ServerSocketChannel 的對等體被創(chuàng)建的 ServerSocket 對象依賴通道實現(xiàn)。這些 socket 關(guān)聯(lián)的 SocketImpl 能識別通道。通道不能被封裝在隨意的 socket 對象外面。

由于 ServerSocketChannel 沒有 bind( )方法,因此有必要取出對等的 socket 并使用它來綁定到一個端口以開始監(jiān)聽連接。我們也是使用對等 ServerSocket 的 API 來根據(jù)需要設(shè)置其他的 socket 選項。

ServerSocketChannel ssc = ServerSocketChannel.open( );
ServerSocket serverSocket = ssc.socket( );
// Listen on port 1234
serverSocket.bind (new InetSocketAddress (1234));

同它的對等體 java.net.ServerSocket 一樣, ServerSocketChannel 也有 accept( )方法。一旦您創(chuàng)建了一個 ServerSocketChannel 并用對等 socket 綁定了它,然后您就可以在其中一個上調(diào)用 accept( )。

如果您選擇在 ServerSocket 上調(diào)用 accept( )方法,那么它會同任何其他的 ServerSocket 表現(xiàn)一樣的行為:總是阻塞并返回一個 java.net.Socket 對象。如果您選擇在 ServerSocketChannel 上調(diào)用 accept( )方法則會返回 SocketChannel 類型的對象,返回的對象能夠在非阻塞模式下運行。

假設(shè)系統(tǒng)已經(jīng)有一個安全管理器(security manager),兩種形式的方法調(diào)用都執(zhí)行相同的安全檢查。如果以非阻塞模式被調(diào)用,當沒有傳入連接在等待時, ServerSocketChannel.accept( )會立即返回 null。正是這種檢查連接而不阻塞的能力實現(xiàn)了可伸縮性并降低了復(fù)雜性??蛇x擇性也因此得到實現(xiàn)。我們可以使用一個選擇器實例來注冊一個ServerSocketChannel 對象以實現(xiàn)新連接到達時自動通知的功能。下例演示了如何使用一個非阻塞的 accept( )方法。

public class ChannelAccept {
    public static final String GREETING = "Hello I must be going.\r\n";

    public static void main(String[] argv)
            throws Exception {
        int port = 1234; // default
        ByteBuffer buffer = ByteBuffer.wrap(GREETING.getBytes());
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.socket().bind(new InetSocketAddress(port));
        ssc  .configureBlocking(false);
        while (true) {
            System.out.println("Waiting for connections");
            SocketChannel sc = ssc.accept();
            if (sc == null) {
// no connections, snooze a while
                Thread.sleep(2000);
            } else {
                System.out.println("Incoming connection from: "
                        + sc.socket().getRemoteSocketAddress());
                buffer.rewind();
                sc.write(buffer);
                sc.close();
            }
        }
    }
}

SocketChannel

下面開始學(xué)習(xí) SocketChannel,它是使用最多的 socket 通道類:

public abstract class SocketChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel
{
  // This is a partial API listing
  public static SocketChannel open( ) throws IOException
  public static SocketChannel open (InetSocketAddress remote) throws IOException
  public abstract Socket socket( );
  public abstract boolean connect (SocketAddress remote) throws IOException;
  public abstract boolean isConnectionPending( );
  public abstract boolean finishConnect( ) throws IOException;
  public abstract boolean isConnected( );
  public final int validOps( )
}

Socket 和 SocketChannel 類封裝點對點、有序的網(wǎng)絡(luò)連接,類似于我們所熟知并喜愛的 TCP/IP網(wǎng)絡(luò)連接SocketChannel 扮演客戶端發(fā)起同一個監(jiān)聽服務(wù)器的連接。直到連接成功,它才能收到數(shù)據(jù)并且只會從連接到的地址接收。每個 SocketChannel 對象創(chuàng)建時都是同一個對等的 java.net.Socket 對象串聯(lián)的。靜態(tài)的 open( )方法可以創(chuàng)建一個新的 SocketChannel 對象,而在新創(chuàng)建的 SocketChannel 上調(diào)用 socket( )方法能返回它對等的 Socket 對象;在該 Socket 上調(diào)用 getChannel( )方法則能返回最初的那個 SocketChannel。

雖然每個 SocketChannel 對象都會創(chuàng)建一個對等的 Socket 對象,反過來卻不成立。直接創(chuàng)建的 Socket 對象不會關(guān)聯(lián) SocketChannel 對象,它們的getChannel( )方法只返回 null。

新創(chuàng)建的 SocketChannel 雖已打開卻是未連接的。在一個未連接的 SocketChannel 對象上嘗試一個 I/O 操作會導(dǎo)致 NotYetConnectedException 異常。我們可以通過在通道上直接調(diào)用 connect( )方法或在通道關(guān)聯(lián)的 Socket 對象上調(diào)用 connect( )來將該 socket 通道連接。一旦一個 socket 通道被連接,它將保持連接狀態(tài)直到被關(guān)閉。您可以通過調(diào)用布爾型的 isConnected( )方法來測試某個SocketChannel 當前是否已連接。

第二種帶 InetSocketAddress 參數(shù)形式的 open( )是在返回之前進行連接的便捷方法。這段代碼:

SocketChannel socketChannel = SocketChannel.open (new InetSocketAddress ("somehost", somePort));

等價于下面這段代碼:

SocketChannel socketChannel = SocketChannel.open( );
socketChannel.connect (new InetSocketAddress ("somehost", somePort));

如果您選擇使用傳統(tǒng)方式進行連接——通過在對等 Socket 對象上調(diào)用 connect( )方法,那么傳統(tǒng)的連接語義將適用于此。線程在連接建立好或超時過期之前都將保持阻塞。如果您選擇通過在通上直接調(diào)用 connect( )方法來建立連接并且通道處于阻塞模式(默認模式),那么連接過程實際上是一樣的。

在 SocketChannel 上并沒有一種 connect( )方法可以讓您指定超時(timeout)值,當 connect( )方法在非阻塞模式下被調(diào)用時 SocketChannel 提供并發(fā)連接:它發(fā)起對請求地址的連接并且立即返回值。如果返回值是 true,說明連接立即建立了(這可能是本地環(huán)回連接);如果連接不能立即建立, connect( )方法會返回 false 且并發(fā)地繼續(xù)連接建立過程。

面向流的的 socket 建立連接狀態(tài)需要一定的時間,因為兩個待連接系統(tǒng)之間必須進行包對話以建立維護流 socket 所需的狀態(tài)信息??缭介_放互聯(lián)網(wǎng)連接到遠程系統(tǒng)會特別耗時。假如某個SocketChannel 上當前正由一個并發(fā)連接,isConnectPending( )方法就會返回 true 值。調(diào)用 finishConnect( )方法來完成連接過程,該方法任何時候都可以安全地進行調(diào)用。假如在一個非阻塞模式的 SocketChannel 對象上調(diào)用 finishConnect( )方法,將可能出現(xiàn)下列情形之一:

  • connect( )方法尚未被調(diào)用。那么將產(chǎn)生 NoConnectionPendingException 異常。
  • 連接建立過程正在進行,尚未完成。那么什么都不會發(fā)生, finishConnect( )方法會立即返回false 值。
  • 在非阻塞模式下調(diào)用 connect( )方法之后, SocketChannel 又被切換回了阻塞模式。那么如果有必要的話,調(diào)用線程會阻塞直到連接建立完成, finishConnect( )方法接著就會返回 true值。
  • 在初次調(diào)用 connect( )或最后一次調(diào)用 finishConnect( )之后,連接建立過程已經(jīng)完成。那么SocketChannel 對象的內(nèi)部狀態(tài)將被更新到已連接狀態(tài), finishConnect( )方法會返回 true值,然后 SocketChannel 對象就可以被用來傳輸數(shù)據(jù)了。
  • 連接已經(jīng)建立。那么什么都不會發(fā)生, finishConnect( )方法會返回 true 值。

當通道處于中間的連接等待(connection-pending)狀態(tài)時,您只可以調(diào)用 finishConnect( )、isConnectPending( )或 isConnected( )方法。一旦連接建立過程成功完成, isConnected( )將返回 true值。

InetSocketAddress addr = new InetSocketAddress (host, port);
SocketChannel sc = SocketChannel.open( );
sc.configureBlocking (false);
sc.connect (addr);
while ( ! sc.finishConnect( )) {
  doSomethingElse( );
}
doSomethingWithChannel (sc);
sc.close( );

一段用來管理異步連接的可用代碼

public class ConnectAsync {
    public static void main(String[] argv) throws Exception {
        String host = "localhost";
        int port = 80;
        if (argv.length == 2) {
            host = argv[0];
            port = Integer.parseInt(argv[1]);
        }
        InetSocketAddress addr = new InetSocketAddress(host, port);
        SocketChannel sc = SocketChannel.open();
        sc.configureBlocking(false);
        System.out.println("initiating connection");
        sc.connect(addr);
        while (!sc.finishConnect()) {
            doSomethingUseful();
        }
        System.out.println("connection established");
// Do something with the connected socket
// The SocketChannel is still nonblocking
        sc.close();
    }

    private static void doSomethingUseful() {
        System.out.println("doing something useless");
    }
}

如果嘗試異步連接失敗,那么下次調(diào)用 finishConnect( )方法會產(chǎn)生一個適當?shù)慕?jīng)檢查的異常以指出問題的性質(zhì)。通道然后就會被關(guān)閉并將不能被連接或再次使用。與連接相關(guān)的方法使得我們可以對一個通道進行輪詢并在連接進行過程中判斷通道所處的狀態(tài)。

Socket 通道是線程安全的。并發(fā)訪問時無需特別措施來保護發(fā)起訪問的多個線程,不過任何時候都只有一個讀操作和一個寫操作在進行中。請記住, sockets 是面向流的而非包導(dǎo)向的。它們可以保證發(fā)送的字節(jié)會按照順序到達但無法承諾維持字節(jié)分組。某個發(fā)送器可能給一個 socket 寫入了20 個字節(jié)而接收器調(diào)用 read( )方法時卻只收到了其中的 3 個字節(jié)。剩下的 17 個字節(jié)還是傳輸中。由于這個原因,讓多個不配合的線程共享某個流 socket 的同一側(cè)絕非一個好的設(shè)計選擇。connect( )和 finishConnect( )方法是互相同步的,并且只要其中一個操作正在進行,任何讀或?qū)懙姆椒ㄕ{(diào)用都會阻塞,即使是在非阻塞模式下。如果此情形下您有疑問或不能承受一個讀或?qū)懖僮髟谀硞€通道上阻塞,請用 isConnected( )方法測試一下連接狀態(tài)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容