1.Linux網(wǎng)絡(luò)IO模型
在linux系統(tǒng)中所有的外部設(shè)備的操作都可以看作是一個(gè)文件操作,linux對文件操作的外部設(shè)備返回一個(gè)文件描述符fd(file descriptor)。對于socket的訪問也有一個(gè)描述符表示,稱為socketfd描述符,它表示一個(gè)數(shù)字,指向內(nèi)核系統(tǒng)中的文件路徑或者數(shù)據(jù)區(qū)等機(jī)構(gòu)體。
在實(shí)際應(yīng)用開發(fā)中可以將數(shù)據(jù)區(qū)域分為兩個(gè)區(qū)域,一個(gè)是用戶進(jìn)程區(qū)域,一個(gè)內(nèi)核區(qū)域,程序的數(shù)據(jù)操作都是基于用戶區(qū)域的,用戶區(qū)域和內(nèi)核區(qū)域的數(shù)據(jù)交換稱為io操作(這個(gè)定義比較狹隘,只是想對用戶程序和內(nèi)核之間交換的io進(jìn)行總結(jié),更詳細(xì)的內(nèi)容可以參考《Unix網(wǎng)絡(luò)編程》)。
以Java的網(wǎng)絡(luò)傳輸過程為例,在實(shí)際開發(fā)中為了提高傳輸效率,linux開發(fā)者使用了很多的緩存用來傳輸數(shù)據(jù),同樣的在應(yīng)用進(jìn)程中也是內(nèi)存的處理效率高,在用戶進(jìn)程合內(nèi)核之間的交互過程是對緩存數(shù)據(jù)之間的交換。
數(shù)據(jù)從一個(gè)用戶進(jìn)程通過網(wǎng)卡傳輸?shù)絻?nèi)核空間,另一個(gè)用戶進(jìn)程訪問內(nèi)核空間獲得這些數(shù)據(jù)。
1.1 阻塞IO
Linux系統(tǒng)中,缺省的情況下,所有的文件操作都是阻塞的。進(jìn)程空間在進(jìn)行系統(tǒng)調(diào)用的時(shí)候從開始調(diào)用到數(shù)據(jù)復(fù)制完成這一過程都是阻塞的,數(shù)據(jù)準(zhǔn)備階段、數(shù)據(jù)拷貝階段整段過程都是阻塞的,因此稱為阻塞IO。以socket調(diào)度用為例,如圖1所示,阻塞IO調(diào)用全過程,用戶程序在調(diào)用讀取操作指導(dǎo)數(shù)據(jù)復(fù)制完成這一整段過程結(jié)束的時(shí)候都是阻塞的,因此被稱為阻塞io。

1.2 同步非阻塞IO
同步非阻塞IO是在數(shù)據(jù)準(zhǔn)備階段直接同步返回結(jié)果,如果沒有數(shù)據(jù)就會直接返回,如果有數(shù)據(jù)就進(jìn)行復(fù)制操作指導(dǎo)操作完成返回。內(nèi)核緩沖區(qū)沒有數(shù)據(jù)的時(shí)候就會直接返回一個(gè)報(bào)錯(cuò),應(yīng)用程序通過循環(huán)調(diào)用來判斷數(shù)據(jù)是否準(zhǔn)備完成。
圖2為調(diào)用過程,假設(shè)前面的調(diào)用內(nèi)核緩沖區(qū)都是沒有數(shù)據(jù)的,就會直接返回用戶進(jìn)程,直達(dá)有數(shù)據(jù)之后才會阻塞進(jìn)行數(shù)據(jù)復(fù)制。
如圖所示同步非阻塞IO流程如下:
(1) 發(fā)生一次數(shù)據(jù)讀取操作,如果內(nèi)核沒有數(shù)據(jù),系統(tǒng)調(diào)用直接返回失敗的標(biāo)志;
(2) 如果內(nèi)核緩沖區(qū)有數(shù)據(jù),那么操作成功,并且一直阻塞到數(shù)據(jù)復(fù)制完成返回。
以上流程中可以分析出同步非阻塞IO的特點(diǎn):首先同步非阻塞IO很好的避免了阻塞IO需要等待內(nèi)核緩沖區(qū)數(shù)據(jù)準(zhǔn)備的過程,用戶程序不必阻塞,可以做其他的事情;與此同時(shí)也增加了輪訓(xùn)查詢訪問內(nèi)核的操作,使得CPU等資源消耗增加,并且增加了輪訓(xùn)的操作,在實(shí)際開發(fā)中也存在一定的局限性。
1.3 IO多路復(fù)用
Linux的IO模型中增加了一種select/epoll模型,將文件描述符傳遞給select/epoll系統(tǒng)調(diào)用,當(dāng)文件描述符準(zhǔn)備就緒之后一般是內(nèi)核可讀寫之后通知到用戶進(jìn)程進(jìn)行相應(yīng)的IO操作。
這樣單個(gè)線程就可以處理多個(gè)IO事件輪訓(xùn)操作,當(dāng)內(nèi)核中的有準(zhǔn)備就緒的IO事件之后,事件回調(diào)通知到select事件準(zhǔn)備就緒。值得注意的是select/poll模型中是采用的還是輪訓(xùn)的方式來處理select事件監(jiān)控,epoll采用得到是事件驅(qū)動的方式來代替順序掃描的方式,如果有fd準(zhǔn)備就緒的時(shí)候,立刻調(diào)用回調(diào)函數(shù)。
我們以Java NIO流程來說明IO多路復(fù)用的流程:
(1) 首先我們需要創(chuàng)建一個(gè)selector選擇器,將所有的IO操作都注冊到選擇器中去;
(2) 選擇器選擇準(zhǔn)備就緒的IO操作,將準(zhǔn)備就緒的IO放到準(zhǔn)備就緒集合中;
(3) 應(yīng)用程序?qū)⒅鱾鋵?zhǔn)備就緒的集合進(jìn)行相關(guān)處理,其實(shí)就是進(jìn)行IO操作;
(4) 發(fā)生IO操作,將數(shù)據(jù)從內(nèi)核緩沖區(qū)到用戶緩沖區(qū)中,這個(gè)過程是阻塞的。
如圖3所示為IO多路復(fù)用調(diào)用圖:
1.4 信號驅(qū)動IO
我的理解信號驅(qū)動和epoll模型的最大區(qū)別是信號驅(qū)動一開始會開啟套接字信號驅(qū)動IO功能,應(yīng)用程序通過系統(tǒng)執(zhí)行一個(gè)信號處理程序。當(dāng)內(nèi)核數(shù)據(jù)準(zhǔn)備就緒的時(shí)候就通過回調(diào)函數(shù)通知用戶進(jìn)程進(jìn)行下一步IO操作。
而epoll采用事件驅(qū)動方式有一種異曲同工的地方(后續(xù)如果有深入研究linux調(diào)用流程之后再更新結(jié)果)。
如圖4所示為信號驅(qū)動模型的調(diào)用圖:
1.5 異步IO
異步IO的整個(gè)IO都是異步的,簡單來說就是用戶線程通知系統(tǒng)進(jìn)行一個(gè)IO操作,在IO操作過程中,包括數(shù)據(jù)準(zhǔn)備,數(shù)據(jù)復(fù)制到用戶空間整個(gè)過程都是內(nèi)核自己完成,用戶只需要處理接下來的業(yè)務(wù)就可以了。
如圖5為異步IO模型處理過程:
2. Reactor設(shè)計(jì)模式
2.1 傳統(tǒng)OIO模式
首先為什么要使用Reactor設(shè)計(jì)模式呢?在回答這個(gè)問題之前先看下Java傳統(tǒng)的IO模式OIO。在Java NIO出現(xiàn)之前,Java一直采用的是阻塞IO模型進(jìn)行編程,那么阻塞操作會出現(xiàn)在哪些地方呢,或者說我們會更加關(guān)心哪里會出現(xiàn)阻塞呢。
如圖2.1所示為傳統(tǒng)IO模式處理示意圖:
圖中所示一般是一個(gè)請求一個(gè)單獨(dú)的處理線程。接下來看下具體的編程實(shí)現(xiàn)是怎么樣的。如下面的一段代碼,采用的就是阻塞操作:
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
public class IOServer{
private ServerSocket serverSocket = null;
private static Socket socket = null;
private static InputStream inputStream = null;
private static OutputStream outputStream = null;
public IOServer() throws IOException {
serverSocket = new ServerSocket(18010);
}
public void startServer() {
try {
//阻塞操作
socket = serverSocket.accept();
handler(socket);
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
stop();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public void handler(Socket socket) throws IOException{
inputStream = socket.getInputStream();
outputStream = socket.getOutputStream();
byte[] bytes = new byte[1024];
//阻塞操作
int len = inputStream.read(bytes);
System.out.println(new String(bytes,0,len,"utf-8"));
outputStream.write("this is server".getBytes("utf-8"));
}
public void stop() throws Exception {
if(null != outputStream){
outputStream.close();
outputStream = null;
}
if(null != inputStream){
inputStream.close();
inputStream = null;
}
if(null != socket){
socket.close();
socket = null;
}
if(null != serverSocket){
serverSocket.close();
serverSocket = null;
}
}
public static void main(String[] args) throws IOException {
new IOServer().startServer();
}
}
看下面的代碼,accpet操作是阻塞的業(yè)務(wù)處理中的,handler中的讀寫請求也是阻塞的,那么這樣的一種IO模式將會導(dǎo)致一個(gè)線程的請求沒有處理完成無法處理下一個(gè)請求,這樣就大大降低了吞吐量,這將是一個(gè)嚴(yán)重的問題。
//阻塞操作
socket = serverSocket.accept();
int len = inputStream.read(bytes);
為了解決這種問題就出現(xiàn)了一個(gè)經(jīng)典的模式——Connection Per Thread即一個(gè)線程處理一個(gè)請求。
如下面這一段代碼就是采用得到了多線程的方式來處理請求:
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
public class IOServerMuti implements Runnable {
private ServerSocket serverSocket = null;
private static Socket socket = null;
private static InputStream inputStream = null;
private static OutputStream outputStream = null;
public IOServerMuti() throws IOException {
serverSocket = new ServerSocket(18010);
}
@Override
public void run() {
// 一個(gè)請求一個(gè)線程處理
while (!Thread.interrupted())
startServer();
}
public void startServer() {
try {
// 阻塞操作
socket = serverSocket.accept();
handler(socket);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
stop();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public void handler(Socket socket) throws IOException {
inputStream = socket.getInputStream();
outputStream = socket.getOutputStream();
byte[] bytes = new byte[1024];
// 阻塞操作
int len = inputStream.read(bytes);
System.out.println(new String(bytes, 0, len, "utf-8"));
outputStream.write("this is server".getBytes("utf-8"));
}
public void stop() throws Exception {
if (null != outputStream) {
outputStream.close();
outputStream = null;
}
if (null != inputStream) {
inputStream.close();
inputStream = null;
}
if (null != socket) {
socket.close();
socket = null;
}
if (null != serverSocket) {
serverSocket.close();
serverSocket = null;
}
}
}
如下面這段代碼:
// 一個(gè)請求一個(gè)線程處理
while (!Thread.interrupted())
startServer();
對于每一個(gè)新的請求都會分配一個(gè)新的線程來處理,這樣的好處就是每個(gè)socket的請求相互之間不受影響,每個(gè)請求的業(yè)務(wù)邏輯相互之間也不影響。任何socket的讀寫操作都不會影響到后面的請求。
那么這樣做的后果是什么呢?很顯然每個(gè)鏈接都分配一個(gè)線程來處理,但不是每個(gè)鏈接都有請求發(fā)生,這樣就浪費(fèi)了很多的線程資源。這種情況下能不能使用一個(gè)線程來處理請求能,答案是不可以的。前面講到的阻塞IO模型的IO讀寫都是阻塞的,無法做到并行處理。
這個(gè)時(shí)候可以采用多路復(fù)用IO模型的方式來處理IO事件,使用Reactor將響應(yīng)IO事件和業(yè)務(wù)處理分開,一個(gè)或多個(gè)線程來處理IO事件,然后將就緒得到事件分發(fā)到業(yè)務(wù)處理handlers線程去異步非阻塞處理。
2.2 Reactor模式
2.3 單線程Reactor模式
什么是單線程Reactor模式,單線程模式采用一個(gè)Reactor線程來處理套接字,新連接的創(chuàng)建,并且將接收到的請求分發(fā)到處理器Handler中。
如圖2.2為簡單的單線程Reactor模式示意圖,Reactor線程和處理器線程在一個(gè)線程里,圖2.2參考doug lea論文《Scalable IO in Java》論文。
下面用Java NIO為例說明在實(shí)際開發(fā)中用到的Reactor模式:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class ReactorDemo implements Runnable {
private final Selector selector;
private final ServerSocketChannel serverSocketChannel;
public ReactorDemo() throws IOException {
// 創(chuàng)建選擇器
selector = Selector.open();
// 創(chuàng)建nio服務(wù)端
serverSocketChannel = ServerSocketChannel.open();
// 綁定端口
serverSocketChannel.bind(new InetSocketAddress(18010));
// 將接收事件注冊到選擇器上,OP_ACCEPT表示注冊接收事件
SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 設(shè)置回調(diào)函數(shù)
sk.attach(new AcceptorDemo());
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
// 事件準(zhǔn)備就緒,分發(fā)到對應(yīng)的handler進(jìn)行處理
dispatch((SelectionKey) (it.next()));
}
selected.clear();
}
} catch (IOException ex) {
}
}
public void dispatch(SelectionKey sk) {
// 調(diào)用之間注冊得到對象,域之前的attach對應(yīng)
Runnable r = (Runnable) (sk.attachment());
// 調(diào)用之前注冊的callback對象
if (r != null) {
r.run();
}
}
class AcceptorDemo implements Runnable {
@Override
public void run() {
// 創(chuàng)建連接
SocketChannel channel;
try {
channel = serverSocketChannel.accept();
if (channel != null)
new Handler(selector, channel);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
class Handler {
private SelectionKey selector;
private SocketChannel channel;
public Handler(Selector selector, SocketChannel channle) {
channel.configureBlocking(false);
sk = channel.register(selector, 0);
// 將Handler作為callback對象
sk.attach(this);
// 第二步,注冊Read就緒事件
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
public void run() {
read();
send();
}
/************* 業(yè)務(wù)處理 ****************/
}
}
上面演示代碼即為單線程Reactor模式,與之前的OIO編程有幾個(gè)變動的地方,
首先看下面的一段代碼,在這種模式下面增加了一個(gè)selector選擇器,并且將接收事件注冊到選擇器中,設(shè)置回調(diào)函數(shù)attach,當(dāng)事件準(zhǔn)備就緒得到時(shí)候直接執(zhí)行AcceptorDemo處理類。
SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
sk.attach(new AcceptorDemo());
另外看下面一段代碼,這段代碼有兩個(gè)功能,其一是檢測事件是否準(zhǔn)備就緒,其二是將準(zhǔn)備就緒的事件分發(fā)到對應(yīng)的處理線程中去,dispatch對應(yīng)前面的attach。
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()){
//事件準(zhǔn)備就緒,分發(fā)到對應(yīng)的handler進(jìn)行處理
dispatch((SelectionKey) (it.next()));
}
另外在業(yè)務(wù)處理的時(shí)候可以將IO讀寫事件傳入到同一個(gè)Reactor中處理,如下代碼所示,這段代碼將讀寫操作注冊到了同一個(gè)reactor,這樣reactor和handler就在同一個(gè)線程中執(zhí)行,并且將本身對象傳輸?shù)絘ttach作為回調(diào)對象,這樣在回調(diào)的時(shí)候就能執(zhí)行到自己的讀寫方法。
channel.configureBlocking(false);
sk = channel.register(selector, 0);
//將Handler作為callback對象
sk.attach(this);
//第二步,注冊Read就緒事件
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
以上就是單線程Reactor的特點(diǎn)和實(shí)現(xiàn)實(shí)例,這種實(shí)現(xiàn)方式有存在著缺點(diǎn),從實(shí)例代碼中可以看出,handler的執(zhí)行是串行的,如果其中一個(gè)handler處理線程阻塞將導(dǎo)致其他的業(yè)務(wù)處理阻塞。由于handler和reactor在同一個(gè)線程中的執(zhí)行,這也將導(dǎo)致新的無法接收新的請求。為了解決這種問題,有人提出使用多線程的方式來處理業(yè)務(wù),也就是在業(yè)務(wù)處理的地方加入線程池異步處理,將reactor和handler在不同的線程來執(zhí)行。
2.4 多線程Reactor模式
多線程reactor模式的設(shè)計(jì)思想就是將handler線程放入到線程次中,在多核的情況下也可以考慮多個(gè)Selector選擇器來處理事件,如圖2.3為簡單的多線程Reactor示意圖;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ReactorDemo{
private final Selector[] selectors = new Selector[2];
private final ServerSocketChannel serverSocketChannel;
Reactor[] reactors = new Reactor[2];
SelectionKey sk;
public ReactorDemo() throws IOException {
//創(chuàng)建兩個(gè)選擇器
selectors[0] = Selector.open();
selectors[1] = Selector.open();
//創(chuàng)建nio服務(wù)端
serverSocketChannel = ServerSocketChannel.open();
//綁定端口
serverSocketChannel.bind(new InetSocketAddress(18010));
//將接收事件注冊到選擇器上,OP_ACCEPT表示注冊接收事件,一個(gè)選擇器處理請求事件,另一個(gè)選擇器處理IO讀寫事件
sk = serverSocketChannel.register(selectors[0], SelectionKey.OP_ACCEPT);
//設(shè)置回調(diào)函數(shù)
sk.attach(new AcceptorDemo());
//初始化反應(yīng)器
reactors[0] = new Reactor(selectors[0]);
reactors[1] = new Reactor(selectors[1]);
}
class Reactor implements Runnable{
//每個(gè)線程負(fù)責(zé)一個(gè)selector
private final Selector selector;
public Reactor( Selector selector) {
this.selector = selector;
}
@Override
public void run() {
try
{
while (!Thread.interrupted())
{
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()){
//事件準(zhǔn)備就緒,分發(fā)到對應(yīng)的handler進(jìn)行處理
dispatch((SelectionKey) (it.next()));
}
selected.clear();
}
} catch (IOException ex){}
}
}
public void dispatch(SelectionKey sk) {
//調(diào)用之間注冊得到對象,域之前的attach對應(yīng)
Runnable r = (Runnable) (sk.attachment());
//調(diào)用之前注冊的callback對象
if (r != null)
{
r.run();
}
}
class AcceptorDemo implements Runnable{
@Override
public void run() {
//創(chuàng)建連接
SocketChannel channel;
try {
channel = serverSocketChannel.accept();
if (channel != null)
new Handler(selectors[0], channel);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
class Handler implements Runnable{
ExecutorService pool = new ThreadPoolExecutor(5, 10,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy());
public Handler(Selector selector,SocketChannel channel) throws IOException {
channel.configureBlocking(false);
sk = channel.register(selectors[1], 0);
//將Handler作為callback對象
sk.attach(this);
//第二步,注冊Read就緒事件
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
public void handler() {
read();
write();
}
public void read(){};
public void write() {}
@Override
public void run() {
pool.execute(new Task());
}
class Task implements Runnable{
@Override
public void run() {
Handler.this.handler();
}
}
}
}
如上面程序所示,采用了兩個(gè)選擇器和兩個(gè)Reactor處理器來處理IO事件,并且在handler處理線程中使用多線程,是的handler處理線程和Reactor線程分離。在Netty實(shí)現(xiàn)中也是將接收請求處理事件和IO讀寫事件分別用不同的反應(yīng)器實(shí)現(xiàn)的。
Reactor有很多的好處,首先是響應(yīng)快,線程之間不是阻塞的,reactor處理線程復(fù)用性高,一個(gè)線程可以處理多個(gè)事件??傊畆eactor好處多多。
當(dāng)然reactor的復(fù)用模式需要操作系統(tǒng)的支持,如果是靠自己實(shí)現(xiàn)就沒有那么高效了,并且reactor也一定情況下增加了編程的復(fù)雜度,不過這些都不足以讓我們拒絕這樣一個(gè)優(yōu)秀的設(shè)計(jì)模式。
對于IO編程也是初窺門入,后面也將對netty源碼進(jìn)行深入的研究,希望能夠有一些更加深刻的體會。