dubbo源碼分析21 -- 遠(yuǎn)程通信 netty

dubbo 做為 RPC 框架,需要進(jìn)行跨 JVM 通信,要保證高性、穩(wěn)定的進(jìn)行遠(yuǎn)程通信。dubbo 底層通信選擇了 netty 這個 nio 框架做為默認(rèn)的網(wǎng)絡(luò)通信框架并且通過自定義協(xié)議進(jìn)行通信。dubbo 支持以下網(wǎng)絡(luò)通信框架:

  • Netty(默認(rèn))
  • Mina
  • Grizzly

1、 netty 簡介

在網(wǎng)絡(luò)編碼領(lǐng)悟,Netty 是 Java 的卓越框架。它封裝了 Java NIO 操作的復(fù)雜性,使得開發(fā)者能夠使用其提供的簡易 API 就能夠開發(fā)出高效的網(wǎng)絡(luò)程序。

首先我們來看一下 netty 里面的關(guān)鍵特性:

分類 Netty的特性
設(shè)計 統(tǒng)一的 API ,支持多種傳輸類型,阻塞和非阻塞的簡單
簡單而強(qiáng)大的線程模型
真正的無連接數(shù)據(jù)報套接字支持
鏈接邏輯組件以支持復(fù)用
易于使用 詳實(shí)的 javadoc 和大量的示例集
不需要超過 JDK 1.6 的依賴
性能 擁有比 Java 核心 API 更高的吞吐量以及更低的延遲
得益于池化和復(fù)用,擁有更低的資源消耗
最少的內(nèi)存復(fù)制
健壯性 不會因?yàn)槁?、快速或者超載的連接而導(dǎo)致 OutOfMemoryError
消除在高速網(wǎng)絡(luò)中 NIO 應(yīng)用程序常見的不公平讀/寫比率
安全性 完整的 SSL/TLS 以及 StartTLS 支持
可用于受限環(huán)境下,如 Applet 和 OSGI
社區(qū)驅(qū)動 發(fā)布快速而頻繁

很多公司包含:Apple、Twitter、Facebook、Google 都在使用 Netty,并且很多流行的開源項(xiàng)目也在使用 Netty如:Vert.x 、Apache Cassandra 和 Elasticsearch,它們所有的核心代碼都是利用了 Netty 強(qiáng)大的網(wǎng)絡(luò)抽象。

2、netty 的核心組件

在 netty 中主要包括以下主要的核心構(gòu)件塊:

  • Channel
  • EventLoop
  • ChannelHandler
  • ChannelFuture

2.1 Channel

Channel 是 Java NIO 里面的一個基本構(gòu)造。可以把它看做是傳入或傳出數(shù)據(jù)的載體,然后通過 NIO 里面的 Buffer 來進(jìn)行數(shù)據(jù)傳遞。

overview-channels-buffers.png

而在 Netty 中自己也定義了一個 Channel,它的主要作用是:

連接到網(wǎng)絡(luò)套接字或能夠進(jìn)行輸入輸出的組件的連接操作,如讀、寫、連接和綁定。

通道提供給用戶以下功能:

  • 通道的當(dāng)前狀態(tài)(例如,open?connected?)
  • 通道的 ChannelConfig 配置參數(shù)(例如,接收緩沖區(qū)大?。?/li>
  • 通道支持的輸入/輸出操作(如讀、寫、連接和綁定)
  • 通道處理與通道相關(guān)聯(lián)的所有輸入/輸出事件和請求

具體的方法可以參看 Netty 的 javadoc 中的 Channel

2.2 EventLoop

EventLoop 是 Netty 的核心概念, netty 里面的網(wǎng)絡(luò)編程處理都是基于事件回調(diào)。下面的圖說明了 Channel、EventLoop、Thread 以及 EventLoopGroup 之間的關(guān)系

event.png

它們的關(guān)系是:

  • 一個 EventLoopGroup 包含一個或者多個 EventLoop;
  • 一個 EventLoop 在它的生命周期內(nèi)只和一個 Thread 綁定;
  • 所有的 EventLoop 處理的 I/O 事件都將在它專門的 Thread 被處理;
  • 一個 Channel 在它的生命周期內(nèi)只注冊于一個 EventLoop;
  • 一個 EventLoop 可能會被分配一個或者多個 Channel;

netty 這樣的設(shè)計,一個操作 I/O 的 Channel 都會由相同的 Thread 執(zhí)行的,這樣就能夠消除線程之間的同步。所以 netty 內(nèi)部是通過回調(diào)來處理事件。當(dāng)一個回調(diào)被觸發(fā)時相關(guān)的事件。

2.3 ChannelHandler

ChannelHandler 在 Netty 里面的地位,大家可以理解成 Bean 在 Spring 里面的地址。Spring 把需要操作的 POJO 抽象成 Bean,然后對于對象的管理都是可以通過 spring bean 的生命周期來進(jìn)行管理。同樣的,我們在操作 Netty 的時候,其實(shí)就是操作的就是 ChannelHandler。

在上一個小節(jié)闡述了 EventLoop 在一個線程里面管理 I/O 操作 Channel 的生命周期。而 netty 的內(nèi)部是通過回調(diào)來處理相關(guān)的事件。理解 ChannelHandler 其實(shí)可以理解成 Java Servlet 里面的 Filter.

filter.png

而 Netty 在處理 Channel 的時候也是類似的。只不過它區(qū)別得更加明確, 入站的時候使用是 ChannelHandler 的子接口 ChannelInboundHandler,而出站使用的 ChannelHandler 的子接口 ChannelOutboundHandler,然后 ChannelPipeline 提供了 ChannelHandler 鏈的容器。

pipeline.png

具體使用 Netty 的時候可以注冊哪些回調(diào)事件,大家可以參看以下接口對應(yīng)的 API:

2.4 ChannelFuture

因?yàn)?Netty 里面的操作都是異步的,所以一個操作可能不會立即返回。這樣我們就需要一種用于在之后的某個時間點(diǎn)確定其結(jié)果的方法。因此, Netty 提供了 ChannelFuture 接口。它提供了 addListener() 方法來注冊一個 ChannelFutureListener,以便在某個操作完成時(無論是否成功) 得到通知。

可以將 ChannelFuture 看做是將來要執(zhí)行的操作的結(jié)果的占位符。它究竟什么時候被執(zhí)行則可能取決于若干因素,因此不可能準(zhǔn)確的預(yù)測,但是可以肯定的是它將會被執(zhí)行。并且,所有屬于同一個 Channel 的操作都被保證其將以它們被調(diào)用的順序被執(zhí)行

3、Netty Demo

在上面的章節(jié)介紹了 Netty 里面的核心概念,下面我們就來編寫一個 Hello World 來加深一下對上面概念的理解。

3.1 Netty Server

編寫 Server 端業(yè)務(wù)處理類,因?yàn)樾枰憫?yīng)客戶端傳入的信息,所以需要實(shí)現(xiàn) ChannelInboundHandler 接口。作為一個簡單的 hello world 程序,我們可以繼承 ChannelInboundHandlerAdapter 它提供了 ChannelInboundHandler 的默認(rèn)實(shí)現(xiàn),我們只需要重寫需要關(guān)注的方法就行了。

EchoServerHandler

@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 將接收到的消息寫給發(fā)送者,而不沖刷出站消息
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Server received : " + in.toString(CharsetUtil.UTF_8));
        ctx.write(in);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 將未決消息沖刷到遠(yuǎn)程節(jié)點(diǎn),并且關(guān)閉該 Channel
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 打印異常堆棧
        cause.printStackTrace();
        // 關(guān)閉該 Channel
        ctx.close();
    }
}

下面就需要創(chuàng)建引導(dǎo)服務(wù)器,綁定服務(wù)器并且監(jiān)聽并且接收傳入連接請求的端口,配置 Channel ,用來將傳入的入站消息通知給 EchoServerHandler 實(shí)例。

EchoServer

public class EchoServer {
    
    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public static void main(String[] args) throws Exception {
        new EchoServer(8080).start();
    }

    public void start() throws Exception {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker)
                        .channel(NioServerSocketChannel.class)
                        .localAddress(new InetSocketAddress(port))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(serverHandler);
                            }
                        });
            ChannelFuture future = bootstrap.bind().sync();
            future.channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully().sync();
            worker.shutdownGracefully().sync();
        }
    }
}

上面的服務(wù)主要做了以下幾件事:

  • main() 方法引導(dǎo)了服務(wù)器,引導(dǎo)過程中所需要以下步驟
  • 創(chuàng)建一個 ServerBootstrap 的實(shí)例用于引導(dǎo)和綁定服務(wù)
  • 創(chuàng)建兩個 EventLoopGroup 實(shí)例,boss 用于接收請求,而 worker 用于真正的請求處理
  • 指定服務(wù)器綁定到本地的 InetSocketAddress
  • 使用一個 EventLoopGroup 的實(shí)例初始化每一個新的 Channel
  • 調(diào)用 ServerBootstrap.bind() 方法綁定服務(wù)器

這個時候服務(wù)器已經(jīng)初始化好了,可以 接收客戶端發(fā)送過來的請求了。

3.2 Netty Client

首先編碼客戶端處理類,客戶端處理類需要發(fā)送一個或者多個消息到服務(wù)器。并且在發(fā)送消息之后接收服務(wù)器發(fā)回的響應(yīng),最后關(guān)閉連接。

EchoClientHandler

@ChannelHandler.Sharable
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 當(dāng)被通知 Channel 活躍的時候,發(fā)送消息
        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rock ! ", CharsetUtil.UTF_8));
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 打印服務(wù)端響應(yīng)的信息
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Client received : " + in.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 打印異常,并關(guān)閉 Channel
        cause.printStackTrace();
        ctx.close();
    }
}

下面我們就需要編寫客戶端引導(dǎo)類,用于連接服務(wù)端并且與服務(wù)端進(jìn)行通信。

EchoClient

public class EchoClient {
    
    private final String host;
    
    private final int prot;

    public EchoClient(String host, int prot) {
        this.host = host;
        this.prot = prot;
    }

    public static void main(String[] args) throws Exception {
        new EchoClient("localhost", 8080).start();
    }
    
    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                        .channel(NioSocketChannel.class)
                        .remoteAddress(new InetSocketAddress(host, prot))
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new EchoClientHandler());
                            }
                        });
            ChannelFuture future = bootstrap.connect().sync();
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

}

以上代碼做的事如下:

  • 創(chuàng)建一個 Bootstrap 實(shí)例以初始化客戶端
  • 創(chuàng)建一個 EventLoopGroup 實(shí)例處理創(chuàng)建新的連接以及處理入站和出站數(shù)據(jù)
  • 為連接服務(wù)器創(chuàng)建一個 InetSocketAddress 實(shí)例
  • 當(dāng)連接被建立時,一個 EchoClientHandler 實(shí)例會被安裝到 ChannelPipeline 中
  • 一切設(shè)置完成后,調(diào)用 Bootstrap.connect() 方法連接到遠(yuǎn)程節(jié)點(diǎn)

下面我們就可以測試程序的正確性了。

3.3 Test

運(yùn)行服務(wù)端引導(dǎo)類 EchoServer,然后運(yùn)行客戶端引導(dǎo)類 EchoClient。此時在服務(wù)端的控制臺打印以下結(jié)果:

server.png

接著在客戶端的控制臺打印以下結(jié)果:

client.png

是不是很簡單 :)

參考資料:

  • Netty In Action
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容