手寫RPC框架
1、手寫一個(gè)RPC框架,看看100個(gè)線程同時(shí)調(diào)用效果如何
2、手寫RPC框架(2)-引入zookeeper做服務(wù)治理
3、手寫RPC框架(3)-引入Hessian序列化工具
4、手寫RPC框架(4)-重寫服務(wù)治理,開啟1000個(gè)線程看看netty的執(zhí)行調(diào)用情況
Netty是基于NIO的的服務(wù)框架,屏蔽了使用Java原生NIO網(wǎng)絡(luò)模型的各種問題,對(duì)外提供靈活的Reactor模型配置,也提供了插拔式的Handler處理器,便于支持各種網(wǎng)絡(luò)協(xié)議和特定業(yè)務(wù)等操作,也是異步事件驅(qū)動(dòng),使得性能能夠更高。此前RPC中關(guān)于Netty的代碼邏輯存在些問題,對(duì)Netty的一些概念也沒有理解到位,所以這次就一起再學(xué)習(xí)Netty,先寫一個(gè)demo有大致的了解和印象,隨后通過問題介紹各個(gè)組件的功能和特點(diǎn),其原因是什么。
- 粘包、拆包是什么情況,為什么會(huì)發(fā)生這種情況?
- pipeline 和 handler是什么關(guān)系?
- pipeline.addLast的順序是如何執(zhí)行的?
- handler中的各個(gè)fireXXX執(zhí)行順序是怎樣的?
- 為什么server是2個(gè)EventLoopGroup,而client卻只有1個(gè)EventLoopGroup?
Demo
運(yùn)行效果如下圖

服務(wù)端
public class Server {
public void run(int port) throws InterruptedException {
EventLoopGroup workGroup = new NioEventLoopGroup();
EventLoopGroup bossGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeServerHandler());
}
});
ChannelFuture cf = serverBootstrap.bind(port).sync();
cf.channel().closeFuture().sync();
} finally {
workGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new Server().run(10002);
}
}
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered ...");
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelUnregistered ...");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive ...");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive ...");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("Client:[" + body + "]");
String cur = ("Hello, My name is jwfy".equalsIgnoreCase(body) ? "OK" : "ERROR") + System.getProperty("line.separator");
ctx.writeAndFlush(Unpooled.copiedBuffer(cur.getBytes()));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelReadComplete ...");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客戶端
public class Client {
public void connection(String host, int port) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap
.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeClientHandler());
}
});
ChannelFuture cf = bootstrap.connect(host, port).sync();
cf.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new Client().connection("127.0.0.1", 10002);
}
}
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered ...");
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelUnregistered ...");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive ...");
for(int i = 0; i < 1; i++ ) {
ctx.writeAndFlush(Unpooled.copiedBuffer(("Hello, My name is jwfy" + System.getProperty("line.separator")).getBytes()));
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive ...");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("Server:[" + body + "]");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
EventLoopGroup
EventLoopGroup 是一種Reactor多線程模型的抽象,具體實(shí)現(xiàn)一般都是NioEventLoopGroup。而Reactor模型又有單線程、多線程、以及主從多線程模型,他們有什么區(qū)別呢?
單線程模型

1個(gè)NIO線程原則上可以負(fù)責(zé)所有IO相關(guān)的請(qǐng)求操作,通過acceptor接收客戶端發(fā)生的TCP請(qǐng)求,當(dāng)鏈接建立成功之后,通過Dispatch將對(duì)于的請(qǐng)求數(shù)據(jù)包裝成bytebuf指派給相關(guān)的handler處理。但是這在某些場(chǎng)景下也不太合適。
- 一個(gè)NIO線程同時(shí)管理成百上千的客戶端鏈接,會(huì)嚴(yán)重影響性能
- 當(dāng)NIO線程的負(fù)載很高時(shí),導(dǎo)致處理速度變慢,同時(shí)還可能因?yàn)槟骋徽?qǐng)求影響整個(gè)NIO線程的工作,進(jìn)而影響其他端口的處理請(qǐng)求。故有了多線程模型。
多線程模型

同樣是一個(gè)NIO線程接收客戶端的請(qǐng)求調(diào)用,當(dāng)鏈接完成后請(qǐng)求會(huì)分配給一個(gè)NIO線程池,具體的消息序列化反序列化、數(shù)據(jù)處理等任務(wù)可有NIO線程池中的線程完成。
同樣的基本情況下是沒有問題的,但是多個(gè)客戶端連接依舊可能出現(xiàn)性能問題,故有了主從多線程模型
主從多線程模型
主從情況就是從一個(gè)NIO線程變成了一個(gè)NIO線程池,可同時(shí)由多個(gè)NIO線程處理客戶端的請(qǐng)求連接操作,減少因?yàn)樾阅懿蛔銓?dǎo)致的問題,這也是netty推薦的使用方法。
EventLoopGroup 則也是一個(gè)NIO線程池,即可用于客戶端的TCP請(qǐng)連接求,也可用于數(shù)據(jù)的IO處理,所以在上述代碼中觀察發(fā)現(xiàn)服務(wù)端和客戶端的EventLoopGroup個(gè)數(shù)不一樣也是這個(gè)道理,服務(wù)端一個(gè)線程池用來接收客戶端連接,另一個(gè)則用來進(jìn)行讀寫IO操作。
粘包、拆包
眾所周知,網(wǎng)絡(luò)上的傳輸?shù)亩际亲止?jié)流,從TCP/IP協(xié)議角度出發(fā)無法知道具體的業(yè)務(wù)數(shù)據(jù)組裝情況,所以實(shí)際場(chǎng)景中一個(gè)請(qǐng)求可能被分批次傳輸,也有可能因?yàn)檎?qǐng)求數(shù)據(jù)太少故打包多個(gè)請(qǐng)求統(tǒng)一傳輸

如上圖,正常的情況是分別有D1和D2兩個(gè)數(shù)據(jù)包發(fā)送到服務(wù)端,但是因?yàn)榫W(wǎng)絡(luò)擁塞比較嚴(yán)重,滑動(dòng)窗口自適應(yīng)的縮小,使得1個(gè)緩沖區(qū)的大小無法裝滿整個(gè)請(qǐng)求體,就會(huì)出現(xiàn)拆包的情況;又例如請(qǐng)求體內(nèi)容較少,無法填充完整緩沖區(qū),那么就會(huì)等待多個(gè)請(qǐng)求把緩沖區(qū)填滿再發(fā)送出去,就會(huì)出現(xiàn)粘包的情況,如下距離:
- D2和D1 同時(shí)發(fā)送到服務(wù)端,那么服務(wù)端則需要正確的進(jìn)行拆分處理,否則反序列化會(huì)失敗
- D1和D2 的一部分D2_1 同時(shí)發(fā)送到服務(wù)端,服務(wù)端除了需要把D1拆出來,還需要等待D2_2的到來才能開始處理D2數(shù)據(jù)
必須首先處理好拆包和粘包問題,才能保證收到正常的完整的消息,而netty則幫我們解決了大部分問題了,例如根據(jù)長(zhǎng)度拆分(FixedLengthFrameDecoder),根據(jù)換行符拆分(LineBasedFrameDecoder),又或者分割符拆分(DelimiterBasedFrameDecoder),只是在本Demo中使用的是換行符切分的LineBasedFrameDecoder
ChannelPipeline 和 ChannelHandler

ChannelPipeline 是一個(gè)擁有頭(Head)和尾(Tail)的雙向鏈?zhǔn)饺萜?,可自由添加不同的handler處理器以滿足不同的業(yè)務(wù)需求。同時(shí)因?yàn)橛袕耐饨缱x取數(shù)據(jù)和發(fā)送數(shù)據(jù)兩種場(chǎng)景,所以有inbound和outbound兩種情況。

ChannelHandler 則是具體的處理器,可通過addLast方式添加到pipeline管道鏈路上,如粘包說的LineBasedFrameDecoder也是一種具體的handler處理器。demo中提到的添加自定義handler代碼塊如下所示
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeServerHandler());
通過這種添加方式形成了下面的鏈路
HEAD TAIL
LineBasedFrameDecoder -> StringDecoder -> TimeServerHandler
- netty讀取的規(guī)則是從head開始的,先進(jìn)行拆分、粘包的處理,再反序列化,后面交由具體的業(yè)務(wù)處理器,是inbound類處理器
- netty寫出的規(guī)則是從tail開始的,先進(jìn)行數(shù)據(jù)的序列號(hào),再發(fā)送出去,是outbound類處理器
這整個(gè)鏈路是比較清晰完整的,如果把StringDecoder和LineBasedFrameDecoder的處理器順序換一下,則會(huì)發(fā)現(xiàn)出現(xiàn)錯(cuò)誤,如下圖

圈住的地方換行符就是我們代碼中添加的 System.getProperty("line.separator") 換行導(dǎo)致,因?yàn)檫@個(gè)就1次調(diào)用,所以會(huì)發(fā)現(xiàn)只進(jìn)行了字符串的轉(zhuǎn)換,并沒有進(jìn)行拆包處理,再次把請(qǐng)求的數(shù)據(jù)量加大些,再測(cè)試看看

會(huì)發(fā)現(xiàn)服務(wù)端接收到的數(shù)據(jù)全部錯(cuò)誤了,沒有一個(gè)正確,切記不要把handler處理器順序搞錯(cuò),如下圖是netty源碼中關(guān)于順序的說明情況。

Handler 生命周期
handler在處理的時(shí)候是有著一定的順序,例如服務(wù)端先接收請(qǐng)求的注冊(cè),等到TCP/IP三次握手完成后,相當(dāng)于channel激活完成,開始接受客戶端正常的請(qǐng)求調(diào)用,然后返回響應(yīng)結(jié)果等,客戶端關(guān)閉后,服務(wù)端也需要進(jìn)行取消激活,關(guān)閉注冊(cè)的操作,以放棄該channel的管理操作。通過對(duì)其各個(gè)步驟的生命周期的管理,可以實(shí)現(xiàn)自定義的各種管理和控制。fireXXX又被包裝成類似于channelRegistered的名字,如下圖的調(diào)用過程

如本demo的運(yùn)行結(jié)果也可以很明顯的看出其執(zhí)行鏈路。

結(jié)束
到此netty的學(xué)習(xí)就結(jié)束了,并沒有介紹的太深入,也只是把常用的組件知識(shí)梳理了一遍,以便于我們?cè)谑褂胣etty的時(shí)候注意到這些問題,以發(fā)揮netty的最大功效,文中很多內(nèi)容都參考自《Netty權(quán)威指南》,大家如果有興趣的話可以自行閱讀學(xué)習(xí)和加強(qiáng)理解,下一期將會(huì)進(jìn)行RPC代碼中的netty部門的改造。
如代碼存在的問題歡迎提出~