1. RPC


This framework draws on the following topics: sockets, ZooKeeper, dynamic proxies, reflection, and Spring.
For the socketServer part of this framework, the demo program studied earlier is only a foundation; here we need the more advanced Netty NIO.
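2. NIO (New IO)
2.1. Definition
NIO is short for New IO, a new API package available since JDK 1.4. Its features: buffers for the primitive types (all except boolean), and character-set encoders and decoders.
Channel: a new primitive I/O abstraction.
A file access interface that supports locks and memory-mapped files. Multiplexed, non-blocking I/O for highly scalable network servers.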
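The three building blocks above (buffers, charsets, channels) can be seen together in a minimal sketch. The class name NioBasicsSketch and the method roundTrip are made up for illustration; only standard java.nio calls are used.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical illustration class, not part of the framework.
class NioBasicsSketch {

    /** Writes text to a file through a channel, then reads it back and decodes it. */
    static String roundTrip(Path file, String text) throws IOException {
        // Charset support: encode the string into a ByteBuffer ready for reading.
        ByteBuffer out = StandardCharsets.UTF_8.encode(text);
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.WRITE)) {
            while (out.hasRemaining()) {
                ch.write(out); // a channel always reads from / writes to a buffer
            }
        }

        ByteBuffer in = ByteBuffer.allocate((int) Files.size(file));
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            while (in.hasRemaining() && ch.read(in) != -1) {
                // keep reading until the buffer is full or end of file
            }
        }
        in.flip(); // switch the buffer from "being filled" to "being drained"
        return StandardCharsets.UTF_8.decode(in).toString();
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("nio-basics", ".txt");
        try {
            System.out.println(roundTrip(tmp, "hello NIO"));
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

The flip() call is the step most often forgotten: without it the decoder would start at the write position and see no data.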
2.2. Socket NIO principles
2.2.1. Traditional I/O
A traditional I/O program reads the contents of a file and writes them to another file or to a socket:
File.read(fileDesc, buf, len);
Socket.send(socket, buf, len);
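This is the traditional I/O approach. It carries a significant performance cost in two respects:
1. Context switches: this sequence involves 4 switches between user mode and kernel mode
2. Buffer memory overhead: one buffer in the application, plus the system read buffer and the socket buffer. The accompanying diagram shows the data flow, which proceeds as follows: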

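1) The file contents are first copied from disk into the operating system buffer
2) They are then copied from the OS buffer into the application buffer
3) From the application buffer they are copied into the socket buffer
4) From the socket buffer they are copied to the protocol engine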
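Steps 2) and 3) correspond to the read/write loop that a traditional stream-based program runs in user space. The following is a minimal sketch of that loop; the class name TraditionalCopySketch is made up, and the in-memory streams in main are only stand-ins for a FileInputStream and a socket's OutputStream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical illustration class, not part of the framework.
class TraditionalCopySketch {

    /** Copies everything from in to out through an application buffer. */
    static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] buf = new byte[8192]; // the application buffer
        long total = 0;
        int n;
        // With a real file and socket, each read() and each write() is a
        // system call, i.e. a round trip between user mode and kernel mode.
        while ((n = in.read(buf)) != -1) { // step 2: OS buffer -> application buffer
            out.write(buf, 0, n);          // step 3: application buffer -> socket buffer
            total += n;
        }
        out.flush();
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "some file contents".getBytes("UTF-8");
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long copied = copy(new ByteArrayInputStream(data), sink);
        System.out.println("copied bytes: " + copied);
    }
}
```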
2.2.2. NIO
Compared with traditional I/O, NIO skips steps 2) and 3) above and copies the read buffer directly into the socket buffer. The FileChannel.transferTo() method works this way, relying on the operating system's underlying sendfile() support.

The accompanying figure illustrates this data path.
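A minimal sketch of calling FileChannel.transferTo() follows. The class name TransferToSketch is made up, and the target here is a second file so the example runs on its own; in a server the target would typically be a SocketChannel, which is also a WritableByteChannel. Whether the JVM actually uses sendfile() depends on the platform.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical illustration class, not part of the framework.
class TransferToSketch {

    /** Transfers the whole source file to the target without an application buffer. */
    static long transfer(Path source, Path target) throws IOException {
        try (FileChannel src = FileChannel.open(source, StandardOpenOption.READ);
             FileChannel dst = FileChannel.open(target,
                     StandardOpenOption.CREATE,
                     StandardOpenOption.WRITE,
                     StandardOpenOption.TRUNCATE_EXISTING)) {
            long size = src.size();
            long position = 0;
            // transferTo may move fewer bytes than requested, so loop until done.
            while (position < size) {
                position += src.transferTo(position, size - position, dst);
            }
            return position;
        }
    }

    public static void main(String[] args) throws IOException {
        Path source = Files.createTempFile("transfer-src", ".bin");
        Path target = Files.createTempFile("transfer-dst", ".bin");
        try {
            Files.write(source, "zero-copy payload".getBytes(StandardCharsets.UTF_8));
            System.out.println("transferred bytes: " + transfer(source, target));
        } finally {
            Files.deleteIfExists(source);
            Files.deleteIfExists(target);
        }
    }
}
```

Note that the application never allocates a byte[] or ByteBuffer here; the data does not pass through user-space code.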

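2.2.3. Traditional IO and NIO servers compared
With traditional IO, when several clients connect, the server creates a Thread for each client it accepts and uses that thread to talk to the client. The server appears not to block, but it actually does: this is pseudo-asynchronous IO that blocks in accept. The accompanying figure illustrates this model.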
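The thread-per-connection model can be sketched as below. This is an illustrative echo server, not the framework's code: the class name BlockingEchoServerSketch, the "echo: " reply format, and the request helper are all assumptions made for the example.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class, not part of the framework.
class BlockingEchoServerSketch implements Runnable {
    private final ServerSocket serverSocket;

    BlockingEchoServerSketch(ServerSocket serverSocket) {
        this.serverSocket = serverSocket;
    }

    @Override
    public void run() {
        try {
            while (!serverSocket.isClosed()) {
                Socket client = serverSocket.accept();    // blocks until a client connects
                new Thread(() -> handle(client)).start(); // one thread per client
            }
        } catch (IOException e) {
            // accept() throws once the server socket is closed; treat that as shutdown.
        }
    }

    private static void handle(Socket client) {
        try (Socket s = client;
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true)) {
            String line;
            while ((line = in.readLine()) != null) { // blocks this thread while the client is idle
                out.println("echo: " + line);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Small blocking client used to exercise the server. */
    static String request(int port, String message) throws IOException {
        try (Socket s = new Socket(InetAddress.getLoopbackAddress(), port);
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8))) {
            s.setSoTimeout(5000);
            out.println(message);
            return in.readLine();
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket ss = new ServerSocket(0)) { // port 0: let the OS pick a free port
            new Thread(new BlockingEchoServerSketch(ss)).start();
            System.out.println(request(ss.getLocalPort(), "ping"));
        }
    }
}
```

Every connected client occupies a thread even when it sends nothing, which is the scaling limit that the selector-based approach avoids.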

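NIO uses select-style multiplexing: it receives readiness notifications from the Linux kernel and uses them to handle connections and message exchange for many clients. The accompanying figure illustrates this model.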

The raw NIO demo code is structured as follows:

Server side:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

public class MultiplexerTimeServer implements Runnable {
    private Selector selector;
    private ServerSocketChannel servChannel;
    private volatile boolean stop;

    public MultiplexerTimeServer(int port) {
        try {
            selector = Selector.open();
            servChannel = ServerSocketChannel.open();
            servChannel.configureBlocking(false);
            servChannel.socket().bind(new InetSocketAddress(port), 1024);
            // register the listening channel for accept events
            servChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
            // without a selector and a bound channel the server cannot run
            System.exit(1);
        }
        System.out.println("The time server is start in port : " + port);
    }

    public void stop() {
        this.stop = true;
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                // wait up to one second for ready channels
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        // close only the channel whose handling failed;
                        // closing after every event would also shut the listening channel
                        key.cancel();
                        if (key.channel() != null) {
                            key.channel().close();
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void handleInput(SelectionKey key) throws IOException {
        if (key.isValid()) {
            // process the new connection
            if (key.isAcceptable()) {
                // accept the new connection
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel sc = ssc.accept();
                // accept() returns null in non-blocking mode if no connection is pending
                if (sc != null) {
                    sc.configureBlocking(false);
                    // add the new connection to the selector
                    sc.register(selector, SelectionKey.OP_READ);
                }
            }
            if (key.isReadable()) {
                // read the data
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, StandardCharsets.UTF_8);
                    System.out.println("The time server receive order : " + body);
                    String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
                            ? new java.util.Date(System.currentTimeMillis()).toString()
                            : "BAD ORDER";
                    doWrite(sc, currentTime);
                } else if (readBytes < 0) {
                    // the client has disconnected
                    key.cancel();
                    sc.close();
                }
                // readBytes == 0: nothing was read, do nothing
            }
        }
    }

    private void doWrite(SocketChannel channel, String response) throws IOException {
        if (response != null && response.trim().length() > 0) {
            byte[] bytes = response.getBytes(StandardCharsets.UTF_8);
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            // note: a non-blocking write may be partial; this demo does not handle that case
            channel.write(writeBuffer);
        }
    }
}
Client side:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

public class TimeClientHandle implements Runnable {
    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean stop;

    public TimeClientHandle(String host, int port) {
        this.host = host == null ? "127.0.0.1" : host;
        this.port = port;
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Override
    public void run() {
        try {
            doConnect();
        } catch (IOException e2) {
            // without a connection attempt there is nothing to select on
            e2.printStackTrace();
            System.exit(1);
        }
        while (!stop) {
            try {
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        // close the channel whose handling failed
                        key.cancel();
                        if (key.channel() != null) {
                            key.channel().close();
                        }
                    }
                }
            } catch (IOException e1) {
                e1.printStackTrace();
                System.exit(1);
            }
        }
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void doConnect() throws IOException {
        if (socketChannel.connect(new InetSocketAddress(host, port))) {
            // connected immediately: wait for the reply and send the request
            socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel);
        } else {
            // connection still in progress: wait for the connect event
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }

    private void doWrite(SocketChannel sc) throws IOException {
        byte[] req = "Query Time Order".getBytes(StandardCharsets.UTF_8);
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        sc.write(writeBuffer);
        if (!writeBuffer.hasRemaining()) {
            System.out.println("Send order 2 server succeed.");
        }
    }

    private void handleInput(SelectionKey key) throws IOException {
        if (key.isValid()) {
            // check whether the connection succeeded
            SocketChannel sc = (SocketChannel) key.channel();
            if (key.isConnectable()) {
                if (sc.finishConnect()) {
                    sc.register(selector, SelectionKey.OP_READ);
                    doWrite(sc);
                } else {
                    System.exit(1); // connect error
                }
            }
            if (key.isReadable()) {
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, StandardCharsets.UTF_8);
                    System.out.println("Now is : " + body);
                    // one reply is all this demo needs, so end the loop
                    this.stop = true;
                } else if (readBytes < 0) {
                    // the server has closed the connection
                    key.cancel();
                    sc.close();
                }
                // readBytes == 0: nothing was read, do nothing
            }
        }
    }
}