Java之RPC高性能框架概述

1. RPC

RPC定義


自定義RPC框架的設(shè)計(jì)思路

這個(gè)框架需要的知識(shí)點(diǎn):socket、zookeeper、動(dòng)態(tài)代理、反射、spring

這個(gè)框架的socketServer部分,之前學(xué)習(xí)的demo程序只是一個(gè)基礎(chǔ),這里需要更高級(jí)的netty nio


2. NIO-New IO

2.1. 定義

nio是New IO的簡(jiǎn)稱(chēng),從jdk1.4開(kāi)始提供的新的api包。特性:為所有的原始類(lèi)型提供buffer緩存支持,字符集編碼解碼解決方案。

channel:一個(gè)新的原始I/O抽象。

支持鎖和內(nèi)存映射文件的文件訪問(wèn)接口。提供多路non-blocking非阻塞式的高伸縮性網(wǎng)絡(luò)IO。


2.2. socket nio原理

2.2.1. 傳統(tǒng)的I/O

傳統(tǒng)I/O程序讀取文件內(nèi)容,寫(xiě)到另一個(gè)文件或socket:

File.read(fileDesc, buf, len);

Socket.send(socket, buf, len);

以上是傳統(tǒng)IO做法,會(huì)有較大性能開(kāi)銷(xiāo),主要表現(xiàn)在兩個(gè)方面:

1. 上下文切換(context switch), 此處有4次用戶(hù)態(tài)和內(nèi)核態(tài)的切換

2. Buffer內(nèi)存開(kāi)銷(xiāo),一個(gè)是應(yīng)用程序buffer,另一個(gè)是系統(tǒng)讀取buffer以及socket buffer其運(yùn)行示意圖如下

1)先將文件內(nèi)容從磁盤(pán)中拷貝到操作系統(tǒng)buffer

2)再?gòu)腛S buffer拷貝到程序應(yīng)用buffer

3)從程序buffer拷貝到socket buffer

4)從socket buffer拷貝到協(xié)議引擎


2.2.2. NIO

NIO技術(shù)相比傳統(tǒng)IO技術(shù),省去了上面步驟2)、3),直接將read buffer拷貝到socket buffer。FileChannel.transferTo() 方法就是這樣的實(shí)現(xiàn),這個(gè)實(shí)現(xiàn)是依賴(lài)于OS底層的sendFile()實(shí)現(xiàn)的。

如下圖:


2.2.3. 傳統(tǒng)IO和NIO服務(wù)器端對(duì)比

傳統(tǒng)IO服務(wù)器端如果有多個(gè)客戶(hù)端連接,服務(wù)器每accept一個(gè)客戶(hù)端,都會(huì)創(chuàng)建一個(gè)Thread去跟客戶(hù)端通信。這樣看起來(lái)服務(wù)器端是沒(méi)有阻塞的,實(shí)際上服務(wù)器端是阻塞的,是一個(gè)偽異步方式的IO,阻塞在accept。如下圖:


NIO是使用select方式,接收l(shuí)inux kernel的消息通知模式來(lái)處理多客戶(hù)端的連接和消息收發(fā)。如下圖:

原始NIO demo代碼結(jié)構(gòu)如下:

服務(wù)器端:

服務(wù)器端主程序

public class MultiplexerTimeServer implements Runnable{

private Selector selector;

private ServerSocketChannel servChannel;

private volatile boolean stop;

public MultiplexerTimeServer(int port){

try {

selector = Selector.open();

servChannel = ServerSocketChannel.open();

servChannel.configureBlocking(false);

servChannel.socket().bind(new InetSocketAddress(port), 1024);

servChannel.register(selector,? SelectionKey.OP_ACCEPT);

} catch (IOException e) {

e.printStackTrace();

}

out.println("The time server is start in port : " + port);

}

public void stop(){

this.stop = true;

}

@Override

public void run() {

while(!stop){

try {

selector.select(1000);

Set<SelectionKey> selectedKeys = selector.selectedKeys();

Iterator<SelectionKey> it = selectedKeys.iterator();

SelectionKey key = null;

while(it.hasNext()){

key = it.next();

it.remove();

handleInput(key);

if(key != null){

key.cancel();

if (key.channel() != null){

key.channel().close();

}

}

}

} catch (IOException e) {

e.printStackTrace();

}

}

if(selector != null){

try {

selector.close();

} catch (IOException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

private void handleInput(SelectionKey key) throws IOException{

if(key.isValid()){

//process the new connection

if(key.isAcceptable()){

//accept the new connection

ServerSocketChannel ssc = (ServerSocketChannel) key.channel();

SocketChannel sc = ssc.accept();

sc.configureBlocking(false);

//add the new connection to the selector

sc.register(selector, SelectionKey.OP_READ);

}

if(key.isReadable()){

//read the data

SocketChannel sc = (SocketChannel) key.channel();

ByteBuffer readBuffer = ByteBuffer.allocate(1024);

int readBytes = sc.read(readBuffer);

if (readBytes > 0){

readBuffer.flip();

byte[] bytes = new byte[readBuffer.remaining()];

readBuffer.get(bytes);

String body = new String(bytes, "UTF-8");

out.println("The time server receive order : " + body);

String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER";

doWrite(sc, currentTime);

}else if(readBytes < 0){

//client is disconnect

key.cancel();

sc.close();

}else{

//read 0 byte do nothing

;

}

}

}

}

private void doWrite(SocketChannel channel, String response) throws IOException{

if(response != null && response.trim().length() > 0){

byte[] bytes = response.getBytes();

ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);

writeBuffer.put(bytes);

writeBuffer.flip();

channel.write(writeBuffer);

}

}

}


客戶(hù)端:

客戶(hù)端主程序

public class TimeClientHandle implements Runnable{

private String host;

private int port;

private Selector selector;

private SocketChannel socketChannel;

private volatile boolean stop;

public TimeClientHandle(String host, int port) {

this.host = host == null ? "127.0.0.1" : host;

this.port = port;

try {

selector = Selector.open();

socketChannel = SocketChannel.open();

socketChannel.configureBlocking(false);

} catch (IOException e) {

e.printStackTrace();

System.exit(1);

}

}

@Override

public void run() {

try {

doConnect();

} catch (IOException e2) {

// TODO Auto-generated catch block

e2.printStackTrace();

}

while(!stop){

try {

selector.select(1000);

Set<SelectionKey> selectedKeys = selector.selectedKeys();

Iterator<SelectionKey> it = selectedKeys.iterator();

SelectionKey key = null;

while(it.hasNext()){

key = it.next();

it.remove();

try{

handleInput(key);

}catch(Exception e){

if(key != null){

key.cancel();

if(key.channel() != null){

key.channel().close();

}

}

}

}

} catch (IOException e1) {

e1.printStackTrace();

System.exit(1);

}

}

if(selector != null){

try {

selector.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

private void doConnect() throws IOException{

if(socketChannel.connect(new InetSocketAddress(host, port))){

socketChannel.register(selector,? SelectionKey.OP_READ);

doWrite(socketChannel);

}else{

socketChannel.register(selector,? SelectionKey.OP_CONNECT);

}

}

private void doWrite(SocketChannel sc) throws IOException{

byte[] req = "Query Time Order".getBytes();

ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);

writeBuffer.put(req);

writeBuffer.flip();

sc.write(writeBuffer);

if(!writeBuffer.hasRemaining()){

out.println("Send order 2 server succeed.");

}

}

private void handleInput(SelectionKey key) throws IOException{

if (key.isValid()){

//check if connet succ

SocketChannel sc = (SocketChannel) key.channel();

if(key.isConnectable()){

if(sc.finishConnect()){

sc.register(selector, SelectionKey.OP_READ);

doWrite(sc);

}else{

System.exit(1); //connect error

}

}

if(key.isReadable()){

ByteBuffer readBuffer = ByteBuffer.allocate(1024);

int readBytes = sc.read(readBuffer);

if (readBytes > 0){

readBuffer.flip();

byte[] bytes = new byte[readBuffer.remaining()];

readBuffer.get(bytes);

String body = new String(bytes, "UTF-8");

out.println("Now is : " + body);

this.stop = true;

}else if(readBytes < 0){

key.cancel();

sc.close();

}else{

;

}

}

}

}

}


2.3. 高性能NIO框架netty

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • RPC的實(shí)現(xiàn) 1. RPC客戶(hù)端 2. RPC服務(wù)端 RPC客戶(hù)端的實(shí)現(xiàn) RPC客戶(hù)端和RPC服務(wù)器端需要一個(gè)相同...
    架構(gòu)師springboot閱讀 802評(píng)論 0 11
  • kiss-rpc特性: 輕量級(jí),簡(jiǎn)單易用。支持idl和手動(dòng)編寫(xiě)協(xié)議兩種方式。模擬函數(shù)式調(diào)用方式,更加符合rpc遠(yuǎn)程...
    jasonsalex閱讀 2,179評(píng)論 0 4
  • 在 RPC Benchmark Round 1 中,Turbo 性能炸裂表現(xiàn)強(qiáng)悍,并且在 listUser 這一項(xiàng)...
    魯小憨閱讀 10,138評(píng)論 4 26
  • 上一篇文章我分享了如何進(jìn)行分析閱讀?,F(xiàn)在就現(xiàn)炒現(xiàn)賣(mài),用分析閱讀的方法來(lái)解剖一期“羅輯思維”——第160期《鄙視鏈?zhǔn)?..
    蘇沉的世界閱讀 608評(píng)論 0 0
  • 本文系半撇私塾新媒體創(chuàng)意寫(xiě)作項(xiàng)目里程碑作品一。 一個(gè)在情感上比較慢半拍的人,當(dāng)事情發(fā)生的時(shí)候沒(méi)有多大感覺(jué),但在事后...
    準(zhǔn)基控閱讀 266評(píng)論 0 0

友情鏈接更多精彩內(nèi)容