前言
在源碼分析的第一部分Netty源碼(一)Netty架構解析里面提到了netty的幾個關鍵組件
-
EventLoop
EventLoop是Netty中最重要的組件,一個單線程事件循環(huán),監(jiān)聽IO事件、處理IO事件和任務隊列。 -
EventLoopGroup
EventLoopGroup顧名思義,管理多個EventLoop。 -
Channel
Channel是能夠進行IO操作組件的抽象,如讀、寫、連接和綁定。 -
ChannelHandler
ChannelHandler是我們使用Netty開發(fā)最關心的一個組件,它是對IO事件的具體處理邏輯。 -
ChannelPipeline
ChannelPipeline則是ChannelHandler的容器,本質是一個雙向鏈表,將所有的ChannelHandler按照順序連接起。
在本篇文章將會分析一個netty服務器的啟動流程,以及是如何將這個組件給串聯(lián)起來的。
服務端代碼
下面是一段服務端的啟動代碼,來自netty官網(wǎng)
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
- EventLoopGroup 之前以及詳細介紹過了,處理IO操作的多線程事件循環(huán),分為bossGroup和workGroup,分別處理連接事件和讀寫事件。可以通過構造參數(shù)指定EventLoopGroup 的線程數(shù)量,默認為當前cup數(shù)量的兩倍;
- ServerBootstrap 為了方便構建服務器的輔助類;
- channel:由于是服務器端,使用NioServerSocketChannel構建一個Channel用于接收傳入的連接;
- childHandler:添加用戶自定義的ChannelHandler,比如解碼、編碼處理器和業(yè)務處理的Handler。
- option:設置Channel的一些參數(shù),比如這里正在實現(xiàn)一個TCP/IP服務器,可以指定一些TCP參數(shù)比如tcpNoDelay和keepAlive。
- childOption:option與childOption不同在于,option指定的是傳入channel的參數(shù)如現(xiàn)在的NioServerSocketChannel,而childOption指定的是NioServerSocketChannel接受的Channel。
- 將服務綁定到指定的端口,并啟動服務。
可以看見netty構建一個服務器代碼是很簡單明了,大家如果看過用NIO構建一個服務器的代碼是非常復雜和抽象的,而netty用極短的代碼就能構建出一個服務器,可見netty框架對底層細節(jié)的抽象和對框架的設計的能力。
Channel初始化
接下來開始分析服務端的啟動流程,上面代碼構造的服務器最關鍵的一步就是調(diào)用bind(),其他的group、childHandler方法都是做一些簡單的賦值操作,這里就不分析了,而bind()是將上一步賦值的參數(shù)全部串起來,然后啟動服務。
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
跟到關鍵實現(xiàn)的地方doBind()
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
// ......
doBind0(regFuture, channel, localAddress, promise);
// ......
}
總結起來這個方法做了兩個事件:
-
initAndRegister(): 初始化傳入的Channel并且注冊到EventLoopGroup -
doBind0():綁定端口
initAndRegister()
由名字可知初始化并且注冊,但是初始化什么、注冊到哪里?雖然已經(jīng)告訴了答案,
初始化傳入的Channel并且注冊到EventLoopGroup,還是要具體分析一下是怎么初始化和注冊的。
initAndRegister()
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//初始化Channel
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
// ......
}
//注冊到EventLoopGroup
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
}
初始化
channel = channelFactory.newChannel();
看到這行代碼要channelFactory是什么,做什么作用?分析源碼的過程中很容易就迷失,找不到關鍵的地方,帶著問題去分析是最好的方法。比如這里當我們不知道這個channelFactory是什么的時候,首先看他的定義是什么
public interface ChannelFactory<T extends Channel> {
/**
* Creates a new channel.
*/
T newChannel();
}
定義了一個接口,初始化Channel,可以知道channelFactory的職責是初始化一個Channel。那這里channelFactory的具體實現(xiàn)是什么呢?
最簡單的方法就是使用debug的方式找到channelFactory的實現(xiàn)類,或者使用編譯器看這個變量這哪里被調(diào)用,做的復制操作。通過這些方法我們定位到了代碼
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
上訴代碼可以知道我們在調(diào)用.channel(NioServerSocketChannel.class)的時候就對channelFactory做了初始化,默認為ReflectiveChannelFactory,顧名思義通過反射初始化Channel,下面代碼就展示了ReflectiveChannelFactory的實現(xiàn)。通過反射拿到Channel的構造器,然后調(diào)用初始化方法。
ReflectiveChannelFactory
//.....
this.constructor = clazz.getConstructor();
//.....
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
所以分析到這里,我們就可以知道channel = channelFactory.newChannel();
這里的初始化的Channel就是第一步構造ServerBootstrap傳入的NioServerSocketChannel。
接下來分析第二步init(channel)
abstract void init(Channel channel) throws Exception;
可以看見是一個抽象方法,找到服務端的具體實現(xiàn)
void init(Channel channel) {
//設置屬性
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline();
// ......
p.addLast(new ChannelInitializer<Channel>() {
// 將啟動類傳入的ChannelHandler 加入pipeline
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// acceptor角色
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
這段代碼比較長,個人認為有兩個點需要重點分析
- ChannelPipeline p = channel.pipeline();
- ServerBootstrapAcceptor
ChannelPipeline netty中的一個關鍵的組件在這里出現(xiàn)了,ChannelPipeline是ChannelHandler 的容器,這里是調(diào)用channel的pipeline()返回的ChannelPipeline對象。所以需要找一下ChannelPipeline 是什么、在什么時候初始化的,按照之前的方法很輕松的在AbstractChannel找到代碼
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
可見是在Channel初始化的時候創(chuàng)建的ChannelPipeline默認為DefaultChannelPipeline,所以在這里也可以知道每個Channel都對應著一個ChannelPipeline。
接下來分析很關鍵的代碼
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
在pipeline中添加了名為ServerBootstrapAcceptor的處理器,在第一篇文章中我們提到過Reactor模式,里面就有一個acceptor角色用于將mainReactor角色介紹的連接轉給subReactor,這里看一下ServerBootstrapAcceptor是不是也在做這個事情。
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter
繼承自ChannelInboundHandlerAdapter,可見是一個ChannelHandler而且處理的是入站事件,下面分析它是怎么處理的入站事件。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 父Channel監(jiān)聽的是連接事件,所有直接轉為Channel
final Channel child = (Channel) msg;
//添加 啟動類傳入的childHandler
child.pipeline().addLast(childHandler);
//設置啟動類傳入的屬性
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
// 將boosWorup獲得的連接注冊到childGroup上
childGroup.register(child).addListener(new ChannelFutureListener() {
// ......
});
} catch (Throwable t) {
// ......
}
}
- 將父Channel(NioServerSocketChannel)讀取的信息強轉為Channel
- 添加 啟動類傳入的childHandler
- 設置啟動類傳入的屬性
- 將boosWorup獲得的連接注冊到childGroup上
分析到這里可以知道netty是通過ChannelHandler的形式將bossGroup與workGroup聯(lián)系起來,當bossGroup接受到新的連接就將其注冊到childGroup上。
最后再回到initAndRegister()這個方法的最后一步
ChannelFuture regFuture = config().group().register(channel);
這個方法就是將我們實體化的NioServerSocketChannel注冊到bossGroup上。
分析到這里對initAndRegister()做一個簡單的總結
- 通過channelFactory使用反射的形式將啟動類傳入的Channel(NioServerSocketChannel)實例化;
- 實例化Channel的時候會創(chuàng)建一個ChannelPipeline對象,以Channel一一對應;
- 通過ServerBootstrapAcceptor將bossGroup接入的連接轉給workGroup;
- 將實例化的Channel(NioServerSocketChannel)注冊到boosGroup上。
所以initAndRegister()實例化的是啟動類傳入的Channel對象并注冊在boosGroup上。
綁定地址
這里分析服務啟動第二步doBind0綁定端口地址
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
跟進去channel.bind(SocketAddress localAddress, ChannelPromise promise)找到AbstractChannel的bind方法
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
調(diào)用pipeline的bind方法,再跟進去
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
之前說過ChannelPipeline是管理ChannelHandler的容器,DefaultChannelPipeline的實現(xiàn)就是用一個雙向鏈表,初始化的是一個有一個頭節(jié)點HeadContext和尾節(jié)點TailContext,這里不具體分析。
這里分析需要打上斷點一步一步調(diào)用,最好找到實現(xiàn)在AbstractUnsafe中的bind方法
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
//......
boolean wasActive = isActive();
try {
//綁定端口地址
doBind(localAddress);
} catch (Throwable t) {
//......
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
//觸發(fā)channelActive事件
pipeline.fireChannelActive();
}
});
}
//綁定成功
safeSetSuccess(promise);
}
簡化之后做三件事情調(diào)用jdk底層綁定端口,觸發(fā)channelActive事件,將promise設置為成功。
doBind 調(diào)用jdk底層綁定端口
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
就此服務端啟動的流程結束,當然這里分析不是所有細節(jié)都涉及到了,只是對netty服務端啟動流程的一個簡要分析。
總結
- 通過ServerBootstrap將主要的組件先添加進去,比如NioEventLoopGroup、Channel、ChannelHandler、option、childOption等;
- 通過反射創(chuàng)建出Channel,并且在創(chuàng)建Channel的時候也會初始化ChannelPipeline;
- 將ChannelHandler添加到Channel所對應的ChannelPipeline中;
- 通過ServerBootstrapAcceptor將Channel所接受的連接轉交給workGroup處理;
- 將第2步初始化Channel注冊到bossGroup上;
- 調(diào)用jdk底層綁定端口地址;
- 觸發(fā)fireChannelActive事件;
netty服務端的啟動流程主要就是上面的流程,越看netty的源碼就越感覺netty設計的精妙。不管是底層細節(jié)、還是整體框架的設計,比如創(chuàng)建一個netty的客戶端如下
public static void main(String[] args) throws Exception {
String host = args[0];
int port = Integer.parseInt(args[1]);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
對比服務器只有少數(shù)不同的地方?;蛘卟皇褂肗ioServerSocketChannel作為服務器的底層連接將NioServerSocketChannel替換就可以了比如EpollServerSocketChannel就可以,不需要調(diào)整其他地方。本篇的源碼分析就到此為止。