Netty源碼學(xué)習(xí)(1)--java nio

? ? Java??

? ? NIO 由以下幾個(gè)核心部分組成:

? ? Channels、Buffers、Selectors

? ? 雖然Java NIO 中除此之外還有很多類和組件,但在我看來,Channel,Buffer 和Selector 構(gòu)成了核心的API。其它組件,如Pipe和FileLock,只不過是與三個(gè)核心組件共同使用的工具類。


Channel 和Buffer

? ? ? ? 基本上,所有的 IO 在NIO中都從一個(gè)Channel 開始。Channel 有點(diǎn)象流。數(shù)據(jù)可以從Channel讀到Buffer中,也可以從Buffer 寫到Channel中。

? ? ? ? Channel和Buffer有好幾種類型。下面是JAVA NIO中的一些主要Channel的實(shí)現(xiàn):

? ? ? ? FileChannel、DatagramChannel、SocketChannel、ServerSocketChannel

? ? ? ? 正如你所看到的,這些通道涵蓋了UDP 和 TCP 網(wǎng)絡(luò)IO,以及文件IO。與這些類一起的有一些有趣的接口,但為簡(jiǎn)單起見,我盡量在概述中不提到它們。本教程其它章節(jié)與它們相關(guān)的地方我會(huì)進(jìn)行解釋。以下是Java NIO里關(guān)鍵的Buffer實(shí)現(xiàn):

? ? ? ?ByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、ShortBuffer

? ? ? 這些Buffer覆蓋了你能通過IO發(fā)送的基本數(shù)據(jù)類型:byte, short, int, long, float,double 和 char。Java NIO 還有個(gè) MappedByteBuffer,用于表示內(nèi)存映射文件。

? ? ? ?Selector

? ? ? ? Selector允許單線程處理多個(gè) Channel。如果你的應(yīng)用打開了多個(gè)連接(通道),但每個(gè)連接的流量都很低,使用Selector就會(huì)很方便。例如,在一個(gè)聊天服務(wù)器中。這是在一個(gè)單線程中使用一個(gè)Selector處理3個(gè)Channel的圖示:


? ? ? ? 要使用Selector,得向Selector注冊(cè)Channel,然后調(diào)用它的select()方法。這個(gè)方法會(huì)一直阻塞到某個(gè)注冊(cè)的通道有事件就緒。一旦這個(gè)方法返回,線程就可以處理這些事件,事件的例子有如新連接進(jìn)來,數(shù)據(jù)接收等。

Buffer的使用


? ? ? ?總結(jié)出使用Buffer一般遵循下面幾個(gè)步驟:

? ? ? ? 1. 分配空間(ByteBuffer buf = ByteBuffer.allocate(1024); 還有一種allocateDirector后面再陳述)

? ? ? ? 2. 寫入數(shù)據(jù)到Buffer(int bytesRead = fileChannel.read(buf);)

? ? ? ? 3. 調(diào)用filp()方法( buf.flip();)

? ? ? ? 4. 從Buffer中讀取數(shù)據(jù)(System.out.print((char)buf.get());)

? ? ? ? 5. 調(diào)用clear()方法或者compact()方法

? ? ? ?Buffer顧名思義:緩沖區(qū),實(shí)際上是一個(gè)容器,一個(gè)連續(xù)數(shù)組。Channel提供從文件、網(wǎng)絡(luò)讀取數(shù)據(jù)的渠道,但是讀寫的數(shù)據(jù)都必須經(jīng)過Buffer。如圖:


向Buffer中寫數(shù)據(jù):

? ? ? ?從Channel寫到Buffer (fileChannel.read(buf))

? ? ? ?通過Buffer的put()方法 (buf.put(…))

從Buffer中讀取數(shù)據(jù):

? ? ? ?從Buffer讀取到Channel (channel.write(buf))

? ? ? ?使用get()方法從Buffer中讀取數(shù)據(jù) (buf.get())

? ? ? ? 可以把Buffer簡(jiǎn)單地理解為一組基本數(shù)據(jù)類型的元素列表,它通過幾個(gè)變量來保存這個(gè)數(shù)據(jù)的當(dāng)前位置狀態(tài):capacity, position, limit, mark: 具體說明如圖表所示:

? ? ? ? capacity

? ? ? ? 作為一個(gè)內(nèi)存塊,Buffer有一個(gè)固定的大小值,也叫“capacity”.你只能往里寫capacity個(gè)byte、long,char等類型。一旦Buffer滿了,需要將其清空(通過讀數(shù)據(jù)或者清除數(shù)據(jù))才能繼續(xù)寫數(shù)據(jù)往里寫數(shù)據(jù)。

? ? ? ? position

? ? ? ? 當(dāng)你寫數(shù)據(jù)到Buffer中時(shí),position表示當(dāng)前的位置。初始的position值為0.當(dāng)一個(gè)byte、long等數(shù)據(jù)寫到Buffer后,position會(huì)向前移動(dòng)到下一個(gè)可插入數(shù)據(jù)的Buffer單元。position最大可為capacity – 1.當(dāng)讀取數(shù)據(jù)時(shí),也是從某個(gè)特定位置讀。當(dāng)將Buffer從寫模式切換到讀模式,position會(huì)被重置為0. 當(dāng)從Buffer的position處讀取數(shù)據(jù)時(shí),position向前移動(dòng)到下一個(gè)可讀的位置。

? ? ? ? ?limit

? ? ? ? 在寫模式下,Buffer的limit表示你最多能往Buffer里寫多少數(shù)據(jù)。寫模式下,limit等于Buffer的capacity。當(dāng)切換Buffer到讀模式時(shí), limit表示你最多能讀到多少數(shù)據(jù)。因此,當(dāng)切換Buffer到讀模式時(shí),limit會(huì)被設(shè)置成寫模式下的position值。換句話說,你能讀到之前寫入的所有數(shù)據(jù)(limit被設(shè)置成已寫數(shù)據(jù)的數(shù)量,這個(gè)值在寫模式下就是position)

? ? ? ?flip()方法

? ? ? ?flip方法將Buffer從寫模式切換到讀模式。調(diào)用flip()方法會(huì)將position設(shè)回0,并將limit設(shè)置成之前position的值。換句話說,position現(xiàn)在用于標(biāo)記讀的位置,limit表示之前寫進(jìn)了多少個(gè)byte、char等 —— 現(xiàn)在能讀取多少個(gè)byte、char等。

? ? ? ? Scatter/Gather

? ? ? ?Java NIO開始支持scatter/gather,scatter/gather用于描述從Channel(譯者注:Channel在中文經(jīng)常翻譯為通道)中讀取或者寫入到Channel的操作。

分散(scatter)從Channel中讀取是指在讀操作時(shí)將讀取的數(shù)據(jù)寫入多個(gè)buffer中。因此,Channel將從Channel中讀取的數(shù)據(jù)“分散(scatter)”到多個(gè)Buffer中。

聚集(gather)寫入Channel是指在寫操作時(shí)將多個(gè)buffer的數(shù)據(jù)寫入同一個(gè)Channel,因此,Channel 將多個(gè)Buffer中的數(shù)據(jù)“聚集(gather)”后發(fā)送到Channel。

? ? ? ?scatter/ gather經(jīng)常用于需要將傳輸?shù)臄?shù)據(jù)分開處理的場(chǎng)合,例如傳輸一個(gè)由消息頭和消息體組成的消息,你可能會(huì)將消息體和消息頭分散到不同的buffer中,這樣你可以方便的處理消息頭和消息體。

代碼示例如下:


注意buffer首先被插入到數(shù)組,然后再將數(shù)組作為channel.read() 的輸入?yún)?shù)。read()方法按照buffer在數(shù)組中的順序?qū)腸hannel中讀取的數(shù)據(jù)寫入到buffer,當(dāng)一個(gè)buffer被寫滿后,channel緊接著向另一個(gè)buffer中寫。

Scattering

Reads在移動(dòng)下一個(gè)buffer前,必須填滿當(dāng)前的buffer,這也意味著它不適用于動(dòng)態(tài)消息(譯者注:消息大小不固定)。換句話說,如果存在消息頭和消息體,消息頭必須完成填充(例如 128byte),Scattering Reads才能正常工作。

GatheringWrites

Gathering

Writes是指數(shù)據(jù)從多個(gè)buffer寫入到同一個(gè)channel。

代碼示例如下:

buffers數(shù)組是write()方法的入?yún)ⅲ瑆rite()方法會(huì)按照buffer在數(shù)組中的順序,將數(shù)據(jù)寫入到channel,注意只有position和limit之間的數(shù)據(jù)才會(huì)被寫入。因此,如果一個(gè)buffer的容量為128byte,但是僅僅包含58byte的數(shù)據(jù),那么這58byte的數(shù)據(jù)將被寫入到channel中。因此與Scattering Reads相反,Gathering Writes能較好的處理動(dòng)態(tài)消息。

? ? ? ?舉例:我們通過ByteBuffer.allocate(11)方法創(chuàng)建了一個(gè)11個(gè)byte的數(shù)組的緩沖區(qū),初始狀態(tài)如圖,position的位置為0,capacity和limit默認(rèn)都是數(shù)組長度。


當(dāng)我們寫入5個(gè)字節(jié)時(shí),變化如下圖:


? ? ? ? 這時(shí)我們需要將緩沖區(qū)中的5個(gè)字節(jié)數(shù)據(jù)寫入Channel的通信信道,所以我們調(diào)用ByteBuffer.flip()方法,變化如下圖所示(position設(shè)回0,并將limit設(shè)成之前的position的值):


這時(shí)底層操作系統(tǒng)就可以從緩沖區(qū)中正確讀取這個(gè)5個(gè)字節(jié)數(shù)據(jù)并發(fā)送出去了。在下一次寫數(shù)據(jù)之前我們?cè)僬{(diào)用clear()方法,緩沖區(qū)的索引位置又回到了初始位置。

調(diào)用clear()方法:

? ? ? ? position將被設(shè)回0,limit設(shè)置成capacity,換句話說,Buffer被清空了,其實(shí)Buffer中的數(shù)據(jù)并未被清楚,只是這些標(biāo)記告訴我們可以從哪里開始往Buffer里寫數(shù)據(jù)。如果Buffer中有一些未讀的數(shù)據(jù),調(diào)用clear()方法,數(shù)據(jù)將“被遺忘”,意味著不再有任何標(biāo)記會(huì)告訴你哪些數(shù)據(jù)被讀過,哪些還沒有。如果Buffer中仍有未讀的數(shù)據(jù),且后續(xù)還需要這些數(shù)據(jù),但是此時(shí)想要先先寫些數(shù)據(jù),那么使用compact()方法。

compact()方法:

? ? ? ? 將所有未讀的數(shù)據(jù)拷貝到Buffer起始處。然后將position設(shè)到最后一個(gè)未讀元素正后面。limit屬性依然像clear()方法一樣,設(shè)置成capacity?,F(xiàn)在Buffer準(zhǔn)備好寫數(shù)據(jù)了,但是不會(huì)覆蓋未讀的數(shù)據(jù)。

通過調(diào)用Buffer.mark()方法:

? ? ? ? 可以標(biāo)記Buffer中的一個(gè)特定的position,之后可以通過調(diào)用Buffer.reset()方法恢復(fù)到這個(gè)position。Buffer.rewind()方法將position設(shè)回0,所以你可以重讀Buffer中的所有數(shù)據(jù)。limit保持不變,仍然表示能從Buffer中讀取多少個(gè)元素。

Selector

Selector運(yùn)行單線程處理多個(gè)Channel,如果你的應(yīng)用打開了多個(gè)通道,但每個(gè)連接的流量都很低,使用Selector就會(huì)很方便。例如在一個(gè)聊天服務(wù)器中。要使用Selector, 得向Selector注冊(cè)Channel,然后調(diào)用它的select()方法。這個(gè)方法會(huì)一直阻塞到某個(gè)注冊(cè)的通道有事件就緒。一旦這個(gè)方法返回,線程就可以處理這些事件,事件的例子有如新的連接進(jìn)來、數(shù)據(jù)接收等。

? ? ? ? Selector的創(chuàng)建

? ? ? ? 通過調(diào)用Selector.open()方法創(chuàng)建一個(gè)Selector,如下:

? ? ? ? ? ? ? ? ? ? ? ? ? ?Selectorselector = Selector.open();

? ? ? ? 向Selector注冊(cè)通道

? ? ? ? 為了將Channel和Selector配合使用,必須將channel注冊(cè)到selector上。通過SelectableChannel.register()方法來實(shí)現(xiàn),如下:


? ? ? ?與Selector一起使用時(shí),Channel必須處于非阻塞模式下。這意味著不能將FileChannel與Selector一起使用,因?yàn)镕ileChannel不能切換到非阻塞模式。而套接字通道都可以。

注意register()方法的第二個(gè)參數(shù)。這是一個(gè)“interest集合”,意思是在通過Selector監(jiān)聽Channel時(shí)對(duì)什么事件感興趣??梢员O(jiān)聽四種不同類型的事件:

? ? ? ?Connect、Accept、Read、Write

? ? ? ? 通道觸發(fā)了一個(gè)事件意思是該事件已經(jīng)就緒。所以,某個(gè)channel成功連接到另一個(gè)服務(wù)器稱為“連接就緒”。一個(gè)server socket

? ? ? ? channel準(zhǔn)備好接收新進(jìn)入的連接稱為“接收就緒”。一個(gè)有數(shù)據(jù)可讀的通道可以說是“讀就緒”。等待寫數(shù)據(jù)的通道可以說是“寫就緒”。

這四種事件用SelectionKey的四個(gè)常量來表示:

SelectionKey.OP_CONNECT

SelectionKey.OP_ACCEPT

SelectionKey.OP_READ

SelectionKey.OP_WRITE

如果你對(duì)不止一種事件感興趣,那么可以用“位或”操作符將常量連接起來.


Netty架構(gòu)按照Reactor模式設(shè)計(jì)和實(shí)現(xiàn),它的服務(wù)端通信序列圖如下


? ? ?Netty的IO線程N(yùn)ioEventLoop由于聚合了多路復(fù)用器Selector,可以同時(shí)并發(fā)處理成百上千個(gè)客戶端Channel,由于讀寫操作都是非阻塞的,這就可以充分提升IO線程的運(yùn)行效率,避免由于頻繁IO阻塞導(dǎo)致的線程掛起。另外,由于Netty采用了異步通信模式,一個(gè)IO線程可以并發(fā)處理N個(gè)客戶端連接和讀寫操作,這從根本上解決了傳統(tǒng)同步阻塞IO一連接一線程模型,架構(gòu)的性能、彈性伸縮能力和可靠性都得到了極大的提升。

注:參考部分其他網(wǎng)站內(nèi)容:

http://www.importnew.com/19816.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容