轉(zhuǎn)載自:http://www.cnblogs.com/rainy-shurun/p/5213086.html
Netty中的那些坑(上篇)
最近開(kāi)發(fā)了一個(gè)純異步的redis客戶(hù)端,算是比較深入的使用了一把netty。在使用過(guò)程中一邊優(yōu)化,一邊解決各種坑。兒這些坑大部分基本上是Netty4對(duì)Netty3的改進(jìn)部分引起的。
注:這里說(shuō)的坑不是說(shuō)netty不好,只是如果這些地方不注意,或者不去看netty的代碼,就有可能掉進(jìn)去了。
Netty 4的線(xiàn)程模型轉(zhuǎn)變
在Netty 3的時(shí)候,upstream是在IO線(xiàn)程里執(zhí)行的,而downstream是在業(yè)務(wù)線(xiàn)程里執(zhí)行的。比如netty從網(wǎng)絡(luò)讀取一個(gè)包傳遞給你的handler的時(shí)候,你的handler部分的代碼是執(zhí)行在IO線(xiàn)程里,而你的業(yè)務(wù)線(xiàn)程調(diào)用write向網(wǎng)絡(luò)寫(xiě)出一些東西的時(shí)候,你的handler是執(zhí)行在業(yè)務(wù)線(xiàn)程里。而Netty 4修改了這一模型。在Netty 4里inbound(upstream)和outbound(downstream)都是執(zhí)行在EventLoop(IO線(xiàn)程)里。也就是你如果在業(yè)務(wù)線(xiàn)程里通過(guò)channel.write向網(wǎng)絡(luò)寫(xiě)出一些東西的時(shí)候,在某一點(diǎn),netty 4會(huì)往這個(gè)channel的EventLoop里提交一個(gè)寫(xiě)出的任務(wù)。那也就是業(yè)務(wù)線(xiàn)程和IO線(xiàn)程是異步執(zhí)行的。
這有什么問(wèn)題呢?一般我們?cè)诰W(wǎng)絡(luò)通信里,業(yè)務(wù)層寫(xiě)出的都是對(duì)象。然后經(jīng)過(guò)序列化等手段轉(zhuǎn)換成字節(jié)流到網(wǎng)絡(luò),而Netty給我們提供了很好的編碼解碼的模型,一般我們也會(huì)將序列化和反序列化放到一個(gè)handler里處理,而在Netty 4里這些handler都是在EventLoop里執(zhí)行,那么就意味著在Netty 4里下面的代碼可能會(huì)導(dǎo)致一些微妙的結(jié)果:
User user = new User();
user.setName(“admin”);
channel.write(user);
user.setName(“guest”);
因?yàn)樾蛄谢蜆I(yè)務(wù)線(xiàn)程異步執(zhí)行,那么在write執(zhí)行后并不表示user對(duì)象已經(jīng)序列化了,如果這個(gè)時(shí)候修改了user對(duì)象那么傳遞到peer的對(duì)象可能就不再是你期望的那個(gè)user了。所以在Netty 4里如果還是使用handler實(shí)現(xiàn)序列化就一定要小心了。你要么在調(diào)用channel.write寫(xiě)出之前將對(duì)象進(jìn)行深度拷貝,要么就不在handler里進(jìn)行序列化了,直接將序列化好的東西傳遞給channel。
在不同的線(xiàn)程里使用PooledByteBufAllocator分配和回收
這個(gè)問(wèn)題其實(shí)是上面一個(gè)問(wèn)題的續(xù)集。在碰到之前一個(gè)問(wèn)題后,我們就決定不再在handler里做序列化了,而是直接在業(yè)務(wù)線(xiàn)程里做。但是為了減少內(nèi)存的拷貝,我們就期望在序列化的時(shí)候直接將字節(jié)流序列化到DirectByteBuf里,這樣通過(guò)socket寫(xiě)出的時(shí)候就不進(jìn)行拷貝了。而DirectByteBuf的分配成本比HeapByteBuf的成本要高,為此Netty 4借鑒jemalloc的思路實(shí)現(xiàn)了一個(gè)PooledByteBufAllocator。顧名思義,就是將DirectByteBuf池化起來(lái),回收的時(shí)候不真正回收,分配的時(shí)候從池里取一個(gè)空閑的。這對(duì)于大多數(shù)應(yīng)用來(lái)說(shuō)優(yōu)化效果還是很明顯的,比如在一些RPC場(chǎng)景中,我們所傳遞的對(duì)象的大小往往是差不多的,這可以充分利用池化的效果。
但是我們?cè)谑褂妙?lèi)似下面的偽代碼的時(shí)候內(nèi)存占用不斷飆高,然后瘋狂Full GC,并且有的時(shí)候還會(huì)出現(xiàn)OOM。這好像是內(nèi)存泄漏的跡象:
//業(yè)務(wù)線(xiàn)程
PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
ByteBuf buffer = allocator.buffer();
User user = new User();
//將對(duì)象直接序列化到ByteBuf
serialization.serialize(buffer, user);
//進(jìn)入EventLoop
channel.writeAndFlush(buffer);
上面的代碼表面看沒(méi)什么問(wèn)題。但實(shí)際上,PooledByteBufAllocator為了減少鎖競(jìng)爭(zhēng),池是通過(guò)thread local來(lái)實(shí)現(xiàn)的。也就是分配的時(shí)候會(huì)從本線(xiàn)程(這里就是業(yè)務(wù)線(xiàn)程)的thread local里取。而channel.writeAndFlush調(diào)用后,在將buffer寫(xiě)到socket后,這個(gè)buffer將被回收到池里?;厥盏臅r(shí)候也是通過(guò)thread local找到對(duì)應(yīng)的池,回收掉。這樣就有一個(gè)問(wèn)題,分配的時(shí)候是在業(yè)務(wù)線(xiàn)程,也就是說(shuō)從業(yè)務(wù)線(xiàn)程的thread local對(duì)應(yīng)的池里分配的,而回收的時(shí)候是在IO線(xiàn)程。這兩個(gè)是不同的線(xiàn)程。池的作用完全喪失了,一個(gè)線(xiàn)程不斷地去分配,不斷地轉(zhuǎn)移到另外一個(gè)池。
ByteBuf擴(kuò)展引起的問(wèn)題
其實(shí)這個(gè)問(wèn)題和上面一個(gè)問(wèn)題是一樣的。但是比之前的問(wèn)題更加隱晦,就在你彈冠相慶的時(shí)候給你致命一擊。在碰到上面一個(gè)問(wèn)題后我們就在想,既然分配和回收都得在同一個(gè)線(xiàn)程里執(zhí)行,那我們是不是可以啟動(dòng)一個(gè)專(zhuān)門(mén)的線(xiàn)程來(lái)負(fù)責(zé)分配和回收呢?于是就有了下面的代碼:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import qunar.tc.qclient.redis.exception.RedisRuntimeException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class Allocator {
public static final ByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
private static final BlockingQueue bufferQueue = new ArrayBlockingQueue(100);
private static final BlockingQueue toCleanQueue = new LinkedBlockingQueue();
private static final int TO_CLEAN_SIZE = 50;
private static final long CLEAN_PERIOD = 100;
private static class AllocThread implements Runnable {
@Override
public void run() {
long lastCleanTime = System.currentTimeMillis();
while (!Thread.currentThread().isInterrupted()) {
try {
ByteBuf buffer = allocator.buffer();
//確保是本線(xiàn)程釋放
buffer.retain();
bufferQueue.put(buffer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (toCleanQueue.size() > TO_CLEAN_SIZE || System.currentTimeMillis() - lastCleanTime > CLEAN_PERIOD) {
final List toClean = new ArrayList(toCleanQueue.size());
toCleanQueue.drainTo(toClean);
for (ByteBuf buffer : toClean) {
ReferenceCountUtil.release(buffer);
}
lastCleanTime = System.currentTimeMillis();
}
}
}
}
static {
Thread thread = new Thread(new AllocThread(), “qclient-redis-allocator”);
thread.setDaemon(true);
thread.start();
}
public static ByteBuf alloc() {
try {
return bufferQueue.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RedisRuntimeException(“alloc interrupt”);
}
}
public static void release(ByteBuf buf) {
toCleanQueue.add(buf);
}
}
在業(yè)務(wù)線(xiàn)程里調(diào)用alloc,從queue里拿到專(zhuān)用的線(xiàn)程分配好的buffer。在將buffer寫(xiě)出到socket之后再調(diào)用release回收:
//業(yè)務(wù)線(xiàn)程
ByteBuf buffer = Allocator.alloc();
//序列化
……..
//寫(xiě)出
ChannelPromise promise = channel.newPromise();
promise.addListener(new GenericFutureListener
連接超時(shí)
在網(wǎng)絡(luò)應(yīng)用中,超時(shí)往往是最后一道防線(xiàn),或是最后一根稻草。我們不怕干脆利索的宕機(jī),怕就怕要死不活。當(dāng)碰到要死不活的應(yīng)用的時(shí)候往往就是依靠超時(shí)了。
在使用Netty編寫(xiě)客戶(hù)端的時(shí)候,我們一般會(huì)有類(lèi)似這樣的代碼:
bootstrap.connect(address).await(1000, TimeUnit.MILLISECONDS)
向?qū)Χ税l(fā)起一個(gè)連接,超時(shí)等待1秒鐘。如果1秒鐘沒(méi)有連接上則重連或者做其他處理。而其實(shí)在bootstrap的選項(xiàng)里,還有這樣的一項(xiàng):
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
如果這兩個(gè)值設(shè)置的不一致,在await的時(shí)候較短,而option里設(shè)置的較長(zhǎng)就出問(wèn)題了。這個(gè)時(shí)候你會(huì)發(fā)現(xiàn)connect里已經(jīng)超時(shí)了,你以為連接失敗了,但實(shí)際上await超時(shí)Netty并不會(huì)幫你取消正在連接的鏈接。這個(gè)時(shí)候如果第2秒的時(shí)候連上了對(duì)端服務(wù)器,那么你剛才的判斷就失誤了。如果你根據(jù)connect(address).await(1000, TimeUnit.MILLISECONDS)來(lái)決定是否重連,很有可能你就建立了兩個(gè)連接,而且很有可能你的handler就在這兩個(gè)channel里共享起來(lái)了,這就有可能讓你產(chǎn)生:哎呀,Netty的handler不是在單線(xiàn)程里執(zhí)行的這樣的假象。所以我的建議是,不要在await上設(shè)置超時(shí),而總是使用option上的選項(xiàng)來(lái)設(shè)置。這個(gè)更準(zhǔn)確些,超時(shí)了就是真的表示沒(méi)有連上。
異步處理,流控先行
這個(gè)坑其實(shí)也不算坑,只是因?yàn)閼校撟龅氖虑闆](méi)做。一般來(lái)講我們的業(yè)務(wù)如果比較小的時(shí)候我們用同步處理,等業(yè)務(wù)到一定規(guī)模的時(shí)候,一個(gè)優(yōu)化手段就是異步化。異步化是提高吞吐量的一個(gè)很好的手段。但是,與異步相比,同步有天然的負(fù)反饋機(jī)制,也就是如果后端慢了,前面也會(huì)跟著慢起來(lái),可以自動(dòng)的調(diào)節(jié)。但是異步就不同了,異步就像決堤的大壩一樣,洪水是暢通無(wú)阻。如果這個(gè)時(shí)候沒(méi)有進(jìn)行有效的限流措施就很容易把后端沖垮。如果一下子把后端沖垮倒也不是最壞的情況,就怕把后端沖的要死不活。這個(gè)時(shí)候,后端就會(huì)變得特別緩慢,如果這個(gè)時(shí)候前面的應(yīng)用使用了一些無(wú)界的資源等,就有可能把自己弄死。那么現(xiàn)在要介紹的這個(gè)坑就是關(guān)于Netty里的ChannelOutboundBuffer這個(gè)東西的。這個(gè)buffer是用在netty向channel write數(shù)據(jù)的時(shí)候,有個(gè)buffer緩沖,這樣可以提高網(wǎng)絡(luò)的吞吐量(每個(gè)channel有一個(gè)這樣的buffer)。初始大小是32(32個(gè)元素,不是指字節(jié)),但是如果超過(guò)32就會(huì)翻倍,一直增長(zhǎng)。大部分時(shí)候是沒(méi)有什么問(wèn)題的,但是在碰到對(duì)端非常慢(對(duì)端慢指的是對(duì)端處理TCP包的速度變慢,比如對(duì)端負(fù)載特別高的時(shí)候就有可能是這個(gè)情況)的時(shí)候就有問(wèn)題了,這個(gè)時(shí)候如果還是不斷地寫(xiě)數(shù)據(jù),這個(gè)buffer就會(huì)不斷地增長(zhǎng),最后就有可能出問(wèn)題了(我們的情況是開(kāi)始吃swap,最后進(jìn)程被linux killer干掉了)。
為什么說(shuō)這個(gè)地方是坑呢,因?yàn)榇蟛糠謺r(shí)候我們往一個(gè)channel寫(xiě)數(shù)據(jù)會(huì)判斷channel是否active,但是往往忽略了這種慢的情況。
那這個(gè)問(wèn)題怎么解決呢?其實(shí)ChannelOutboundBuffer雖然無(wú)界,但是可以給它配置一個(gè)高水位線(xiàn)和低水位線(xiàn),當(dāng)buffer的大小超過(guò)高水位線(xiàn)的時(shí)候?qū)?yīng)channel的isWritable就會(huì)變成false,當(dāng)buffer的大小低于低水位線(xiàn)的時(shí)候,isWritable就會(huì)變成true。所以應(yīng)用應(yīng)該判斷isWritable,如果是false就不要再寫(xiě)數(shù)據(jù)了。高水位線(xiàn)和低水位線(xiàn)是字節(jié)數(shù),默認(rèn)高水位是64K,低水位是32K,我們可以根據(jù)我們的應(yīng)用需要支持多少連接數(shù)和系統(tǒng)資源進(jìn)行合理規(guī)劃。
.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 64 * 1024)
.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024)
在使用一些開(kāi)源的框架上還真是要熟悉人家的實(shí)現(xiàn)機(jī)制,然后才可以大膽的使用啊,不然被坑死都覺(jué)得自己很冤枉
Netty中的坑(下篇)
其實(shí)這篇應(yīng)該叫Netty實(shí)踐,但是為了與前一篇名字保持一致,所以還是用一下坑這個(gè)名字吧。
Netty是高性能Java NIO網(wǎng)絡(luò)框架,在很多開(kāi)源系統(tǒng)里都有她的身影,而在絕大多數(shù)互聯(lián)網(wǎng)公司所實(shí)施的服務(wù)化,以及最近流行的MicroService中,她都作為基礎(chǔ)中的基礎(chǔ)出現(xiàn)。
Netty的出現(xiàn)讓我們可以簡(jiǎn)單容易地就可以使用NIO帶來(lái)的高性能網(wǎng)絡(luò)編程的潛力。她用一種統(tǒng)一的流水線(xiàn)方式組織我們的業(yè)務(wù)代碼,將底層網(wǎng)絡(luò)繁雜的細(xì)節(jié)隱藏起來(lái),讓我們只需要關(guān)注業(yè)務(wù)代碼即可。并且用這種機(jī)制將不同的業(yè)務(wù)劃分到不同的handler里,比如將編碼,連接管理,業(yè)務(wù)邏輯處理進(jìn)行分開(kāi)。Netty也力所能及的屏蔽了一些NIO bug,比如著名的epoll cpu 100% bug。而且,還提供了很多優(yōu)化支持,比如使用buffer來(lái)提高網(wǎng)絡(luò)吞吐量。
但是,和所有的框架一樣,框架為我們屏蔽了底層細(xì)節(jié),讓我們可以很快上手。但是,并不表示我們不需要對(duì)框架所屏蔽的那一層進(jìn)行了解。本文所涉及的幾個(gè)地方就是Netty與底層網(wǎng)絡(luò)結(jié)合的幾個(gè)地方,看看我們使用的時(shí)候應(yīng)該怎么處理,以及為什么要這么處理。
autoread
在Netty 4里我覺(jué)得一個(gè)很有用的功能是autoread。autoread是一個(gè)開(kāi)關(guān),如果打開(kāi)的時(shí)候Netty就會(huì)幫我們注冊(cè)讀事件(這個(gè)需要對(duì)NIO有些基本的了解)。當(dāng)注冊(cè)了讀事件后,如果網(wǎng)絡(luò)可讀,則Netty就會(huì)從channel讀取數(shù)據(jù),然后我們的pipeline就會(huì)開(kāi)始流動(dòng)起來(lái)。那如果autoread關(guān)掉后,則Netty會(huì)不注冊(cè)讀事件,這樣即使是對(duì)端發(fā)送數(shù)據(jù)過(guò)來(lái)了也不會(huì)觸發(fā)讀時(shí)間,從而也不會(huì)從channel讀取到數(shù)據(jù)。那么這樣一個(gè)功能到底有什么作用呢?
它的作用就是更精確的速率控制。那么這句話(huà)是什么意思呢?比如我們現(xiàn)在在使用Netty開(kāi)發(fā)一個(gè)應(yīng)用,這個(gè)應(yīng)用從網(wǎng)絡(luò)上發(fā)送過(guò)來(lái)的數(shù)據(jù)量非常大,大到有時(shí)我們都有點(diǎn)處理不過(guò)來(lái)了。而我們使用Netty開(kāi)發(fā)應(yīng)用往往是這樣的安排方式:Netty的Worker線(xiàn)程處理網(wǎng)絡(luò)事件,比如讀取和寫(xiě)入,然后將讀取后的數(shù)據(jù)交給pipeline處理,比如經(jīng)過(guò)反序列化等最后到業(yè)務(wù)層。到業(yè)務(wù)層的時(shí)候如果業(yè)務(wù)層有阻塞操作,比如數(shù)據(jù)庫(kù)IO等,可能還要將收到的數(shù)據(jù)交給另外一個(gè)線(xiàn)程池處理。因?yàn)槲覀兘^對(duì)不能阻塞Worker線(xiàn)程,一旦阻塞就會(huì)影響網(wǎng)絡(luò)處理效率,因?yàn)檫@些Worker是所有網(wǎng)絡(luò)處理共享的,如果這里阻塞了,可能影響很多channel的網(wǎng)絡(luò)處理。
但是,如果把接到的數(shù)據(jù)交給另外一個(gè)線(xiàn)程池處理就又涉及另外一個(gè)問(wèn)題:速率匹配。
比如現(xiàn)在網(wǎng)絡(luò)實(shí)在太忙了,接收到很多數(shù)據(jù)交給線(xiàn)程池。然后就出現(xiàn)兩種情況:
- 由于開(kāi)發(fā)的時(shí)候沒(méi)有考慮到,這個(gè)線(xiàn)程池使用了某些無(wú)界資源。比如很多人對(duì)ThreadPoolExecutor的幾個(gè)參數(shù)不是特別熟悉,就有可能用錯(cuò),最后導(dǎo)致資源無(wú)節(jié)制使用,整個(gè)系統(tǒng)crash掉。
//比如開(kāi)始的時(shí)候沒(méi)有考慮到會(huì)有這么大量//這種方式線(xiàn)程數(shù)是無(wú)界的,那么有可能創(chuàng)建大量的線(xiàn)程對(duì)系統(tǒng)穩(wěn)定性造成影響Executor executor = Executors.newCachedTheadPool(); executor.execute(requestWorker);//或者使用這個(gè)//這種queue是無(wú)界的,有可能會(huì)消耗太多內(nèi)存,對(duì)系統(tǒng)穩(wěn)定性造成影響Executor executor = Executors.newFixedThreadPool(8); executor.execute(requestWorker);
- 第二種情況就是限制了資源使用,所以只好把最老的或最新的數(shù)據(jù)丟棄。
//線(xiàn)程池滿(mǎn)后,將最老的數(shù)據(jù)丟棄Executor executor = new ThreadPoolExecutor(8, 8, 1, TimeUnit.MINUTES, new ArrayBlockingQueue(1000), namedFactory, new ThreadPoolExecutor.DiscardOldestPolicy());
其實(shí)上面兩種情況,不管哪一種都不是太合理。不過(guò)在Netty 4里我們就有了更好的解決辦法了。如果我們的線(xiàn)程池暫時(shí)處理不過(guò)來(lái),那么我們可以將autoread關(guān)閉,這樣Netty就不再?gòu)腸hannel上讀取數(shù)據(jù)了。那么這樣造成的影響是什么呢?這樣socket在內(nèi)核那一層的read buffer就會(huì)滿(mǎn)了。因?yàn)門(mén)CP默認(rèn)就是帶flow control的,read buffer變小之后,向?qū)Χ税l(fā)送ACK的時(shí)候,就會(huì)降低窗口大小,直至變成0,這樣對(duì)端就會(huì)自動(dòng)的降低發(fā)送數(shù)據(jù)的速率了。等到我們又可以處理數(shù)據(jù)了,我們就可以將autoread又打開(kāi)這樣數(shù)據(jù)又源源不斷的到來(lái)了。
這樣整個(gè)系統(tǒng)就通過(guò)TCP的這個(gè)負(fù)反饋機(jī)制,和諧的運(yùn)行著。那么autoread涉及的網(wǎng)絡(luò)知識(shí)就是,發(fā)送端會(huì)根據(jù)對(duì)端ACK時(shí)候所攜帶的advertises window來(lái)調(diào)整自己發(fā)送的數(shù)據(jù)量。而ACK里的這個(gè)window的大小又跟接收端的read buffer有關(guān)系。而不注冊(cè)讀事件后,read buffer里的數(shù)據(jù)沒(méi)有被消費(fèi)掉,就會(huì)達(dá)到控制發(fā)送端速度的目的。
不過(guò)設(shè)計(jì)關(guān)閉和打開(kāi)autoread的策略也要注意,不要設(shè)計(jì)成我們不能處理任何數(shù)據(jù)了就立即關(guān)閉autoread,而我們開(kāi)始能處理了就立即打開(kāi)autoread。這個(gè)地方應(yīng)該留一個(gè)緩沖地帶。也就是如果現(xiàn)在排隊(duì)的數(shù)據(jù)達(dá)到我們預(yù)設(shè)置的一個(gè)高水位線(xiàn)的時(shí)候我們關(guān)閉autoread,而低于一個(gè)低水位線(xiàn)的時(shí)候才打開(kāi)autoread。不這么弄的話(huà),有可能就會(huì)導(dǎo)致我們的autoread頻繁打開(kāi)和關(guān)閉。autoread的每次調(diào)整都會(huì)涉及系統(tǒng)調(diào)用,對(duì)性能是有影響的。類(lèi)似下面這樣一個(gè)代碼,在將任務(wù)提交到線(xiàn)程池之前,判斷一下現(xiàn)在的排隊(duì)量(注:本文的所有數(shù)字純?yōu)檠菔咀饔?,所有線(xiàn)程池,隊(duì)列等大小數(shù)據(jù)要根據(jù)實(shí)際業(yè)務(wù)場(chǎng)景仔細(xì)設(shè)計(jì)和考量)。
int highReadWaterMarker = 900;int lowReadWaterMarker = 600; ThreadPoolExecutor executor = new ThreadPoolExecutor(8, 8, 1, TimeUnit.MINUTES, new ArrayBlockingQueue(1000), namedFactory, new ThreadPoolExecutor.DiscardOldestPolicy());int queued = executor.getQueue().size();if(queued > highReadWaterMarker){ channel.config().setAutoRead(false); }if(queued < lowReadWaterMarker){ channel.config().setAutoRead(true); }
但是使用autoread也要注意一件事情。autoread如果關(guān)閉后,對(duì)端發(fā)送FIN的時(shí)候,接收端應(yīng)用層也是感知不到的。這樣帶來(lái)一個(gè)后果就是對(duì)端發(fā)送了FIN,然后內(nèi)核將這個(gè)socket的狀態(tài)變成CLOSE_WAIT。但是因?yàn)閼?yīng)用層感知不到,所以應(yīng)用層一直沒(méi)有調(diào)用close。這樣的socket就會(huì)長(zhǎng)期處于CLOSE_WAIT狀態(tài)。特別是一些使用連接池的應(yīng)用,如果將連接歸還給連接池后,一定要記著autoread一定是打開(kāi)的。不然就會(huì)有大量的連接處于CLOSE_WAIT狀態(tài)。
其實(shí)所有異步的場(chǎng)合都存在速率匹配的問(wèn)題,而同步往往不存在這樣的問(wèn)題,因?yàn)橥奖旧砭褪菐ж?fù)反饋的。
isWritable
isWritable其實(shí)在上一篇文章已經(jīng)介紹了一點(diǎn),不過(guò)這里我想結(jié)合網(wǎng)絡(luò)層再啰嗦一下。上面我們講的autoread一般是接收端的事情,而發(fā)送端也有速率控制的問(wèn)題。Netty為了提高網(wǎng)絡(luò)的吞吐量,在業(yè)務(wù)層與socket之間又增加了一個(gè)ChannelOutboundBuffer。在我們調(diào)用channel.write的時(shí)候,所有寫(xiě)出的數(shù)據(jù)其實(shí)并沒(méi)有寫(xiě)到socket,而是先寫(xiě)到ChannelOutboundBuffer。當(dāng)調(diào)用channel.flush的時(shí)候才真正的向socket寫(xiě)出。因?yàn)檫@中間有一個(gè)buffer,就存在速率匹配了,而且這個(gè)buffer還是無(wú)界的。也就是你如果沒(méi)有控制channel.write的速度,會(huì)有大量的數(shù)據(jù)在這個(gè)buffer里堆積,而且如果碰到socket又『寫(xiě)不出』數(shù)據(jù)的時(shí)候,很有可能的結(jié)果就是資源耗盡。而且這里讓這個(gè)事情更嚴(yán)重的是ChannelOutboundBuffer很多時(shí)候我們放到里面的是DirectByteBuffer,什么意思呢,意思是這些內(nèi)存是放在GC Heap之外。如果我們僅僅是監(jiān)控GC的話(huà)還監(jiān)控不出來(lái)這個(gè)隱患。
那么說(shuō)到這里,socket什么時(shí)候會(huì)寫(xiě)不出數(shù)據(jù)呢?在上一節(jié)我們了解到接收端有一個(gè)read buffer,其實(shí)發(fā)送端也有一個(gè)send buffer。我們調(diào)用socket的write的時(shí)候其實(shí)是向這個(gè)send buffer寫(xiě)數(shù)據(jù),如果寫(xiě)進(jìn)去了就表示成功了(所以這里千萬(wàn)不能將socket.write調(diào)用成功理解成數(shù)據(jù)已經(jīng)到達(dá)接收端了),如果send buffer滿(mǎn)了,對(duì)于同步socket來(lái)講,write就會(huì)阻塞直到超時(shí)或者send buffer又有空間(這么一看,其實(shí)我們可以將同步的socket.write理解為半同步嘛)。對(duì)于異步來(lái)講這里是立即返回的。
那么進(jìn)入send buffer的數(shù)據(jù)什么時(shí)候會(huì)減少呢?是發(fā)送到網(wǎng)絡(luò)的數(shù)據(jù)就會(huì)從send buffer里去掉么?也不是這個(gè)樣子的。還記得TCP有重傳機(jī)制么,如果發(fā)送到網(wǎng)絡(luò)的數(shù)據(jù)都從send buffer刪除了,那么這個(gè)數(shù)據(jù)沒(méi)有得到確認(rèn)TCP怎么重傳呢?所以send buffer的數(shù)據(jù)是等到接收端回復(fù)ACK確認(rèn)后才刪除。那么,如果接收端非常慢,比如CPU占用已經(jīng)到100%了,而load也非常高的時(shí)候,很有可能來(lái)不及處理網(wǎng)絡(luò)事件,這個(gè)時(shí)候send buffer就有可能會(huì)堆滿(mǎn)。這就導(dǎo)致socket寫(xiě)不出數(shù)據(jù)了。而發(fā)送端的應(yīng)用層在發(fā)送數(shù)據(jù)的時(shí)候往往判斷socket是不是有效的(是否已經(jīng)斷開(kāi)),而忽略了是否可寫(xiě),這個(gè)時(shí)候有可能就還一個(gè)勁的寫(xiě)數(shù)據(jù),最后導(dǎo)致ChannelOutboundBuffer膨脹,造成系統(tǒng)不穩(wěn)定。
所以,Netty已經(jīng)為我們考慮了這點(diǎn)。channel有一個(gè)isWritable屬性,可以來(lái)控制ChannelOutboundBuffer,不讓其無(wú)限制膨脹。至于isWritable的實(shí)現(xiàn)機(jī)制可以參考前一篇。
序列化
所有講TCP的書(shū)都會(huì)有這么一個(gè)介紹:TCP provides a connection-oriented, reliable, byte stream service。前面兩個(gè)這里就不關(guān)心了,那么這個(gè)byte stream到底是什么意思呢?我們?cè)诎l(fā)送端發(fā)送數(shù)據(jù)的時(shí)候,對(duì)于應(yīng)用層來(lái)說(shuō)我們發(fā)送的是一個(gè)個(gè)對(duì)象,然后序列化成一個(gè)個(gè)字節(jié)數(shù)組,但無(wú)論怎樣,我們發(fā)送的是一個(gè)個(gè)『包』。每個(gè)都是獨(dú)立的。那么接收端是不是也像發(fā)送端一樣,接收到一個(gè)個(gè)獨(dú)立的『包』呢?很遺憾,不是的。這就是byte stream的意思。接收端沒(méi)有『包』的概念了。
這對(duì)于應(yīng)用層編碼的人員來(lái)說(shuō)可能有點(diǎn)困惑。比如我使用Netty開(kāi)發(fā),我的handler的channelRead這次明明傳遞給我的是一個(gè)ByteBuf啊,是一個(gè)『獨(dú)立』的包啊,如果是byte stream的話(huà)難道不應(yīng)該傳遞我一個(gè)Stream么。但是這個(gè)ByteBuf和發(fā)送端的ByteBuf一點(diǎn)關(guān)系都沒(méi)有。比如:
public class Decorder extends ChannelInboundHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //這里的msg和發(fā)送端channel.write(msg)時(shí)候的msg沒(méi)有任何關(guān)系 } }
這個(gè)ByteBuf可能包含發(fā)送端多個(gè)ByteBuf,也可能只包含發(fā)送端半個(gè)ByteBuf。但是別擔(dān)心,TCP的可靠性會(huì)確保接收端的順序和發(fā)送端的順序是一致的。這樣的byte stream協(xié)議對(duì)我們的反序列化工作就帶來(lái)了一些挑戰(zhàn)。在反序列化的時(shí)候我們要時(shí)刻記著這一點(diǎn)。對(duì)于半個(gè)ByteBuf我們按照設(shè)計(jì)的協(xié)議如果解不出一個(gè)完整對(duì)象,我們要留著,和下次收到的ByteBuf拼湊在一起再次解析,而收到的多個(gè)ByteBuf我們要根據(jù)協(xié)議解析出多個(gè)完整對(duì)象,而很有可能最后一個(gè)也是不完整的。不過(guò)幸運(yùn)的是,我們有了Netty。Netty為我們已經(jīng)提供了很多種協(xié)議解析的方式,并且對(duì)于這種半包粘包也已經(jīng)有考慮,我們可以參考ByteToMessageDecoder以及它的一連串子類(lèi)來(lái)實(shí)現(xiàn)自己的反序列化機(jī)制。而在反序列化的時(shí)候我們可能經(jīng)常要取ByteBuf中的一個(gè)片段,這個(gè)時(shí)候建議使用ByteBuf的readSlice方法而不是使用copy。
另外,Netty還提供了兩個(gè)ByteBuf的流封裝:ByteBufInputStream, ByteBufOutputStream。比如我們?cè)谑褂靡恍┬蛄谢ぞ?,比如Hessian之類(lèi)的時(shí)候,我們往往需要傳遞一個(gè)InputStream(反序列化),OutputStream(序列化)到這些工具。而很多協(xié)議的實(shí)現(xiàn)都涉及大量的內(nèi)存copy。比如對(duì)于反序列化,先將ByteBuf里的數(shù)據(jù)讀取到byte[],然后包裝成ByteArrayInputStream,而序列化的時(shí)候是先將對(duì)象序列化成ByteArrayOutputStream再copy到ByteBuf。而使用ByteBufInputStream和ByteBufOutputStream就不再有這樣的內(nèi)存拷貝了,大大節(jié)約了內(nèi)存開(kāi)銷(xiāo)。
另外,因?yàn)閟ocket.write和socket.read都需要一個(gè)direct byte buffer(即使你傳入的是一個(gè)heap byte buffer,socket內(nèi)部也會(huì)將內(nèi)容copy到direct byte buffer)。如果我們直接使用ByteBufInputStream和ByteBufOutputStream封裝的direct byte buffer再加上Netty 4的內(nèi)存池,那么內(nèi)存將更有效的使用。這里提一個(gè)問(wèn)題:為什么socket.read和socket.write都需要direct byte buffer呢?heap byte buffer不行么?
總結(jié)起來(lái),對(duì)于序列化和反序列化來(lái)講就是兩條:1 減少內(nèi)存拷貝 2 處理好TCP的粘包和半包問(wèn)題
后記
作為一個(gè)應(yīng)用層程序員,往往是幸福的。因?yàn)槲覀冇胸S富的框架和工具為我們屏蔽下層的細(xì)節(jié),這樣我們可以更容易的解決很多業(yè)務(wù)問(wèn)題。但是目前程序設(shè)計(jì)并沒(méi)有發(fā)展到不需要了解所有下層的知識(shí)就可以寫(xiě)出更有效率的程序,所以我們?cè)谑褂靡粋€(gè)框架的時(shí)候最好要對(duì)它所屏蔽和所依賴(lài)的知識(shí)進(jìn)行一些了解,這樣在碰到一些問(wèn)題的時(shí)候我們可以根據(jù)這些理論知識(shí)去分析原因。這就是理論和實(shí)踐的相結(jié)合