Java BIO編程#
BIO - 阻塞IO。 即Java的遠程IO
java學(xué)習(xí)交流群:737251827 進群可領(lǐng)取學(xué)習(xí)資源及對十年開發(fā)經(jīng)驗大佬提問,免費解答!
IO模型#
BIO線程模型:#
NIO模型(簡單描述):#
IO模型應(yīng)用場景#
Java BIO基本介紹#
Java BIO 工作機制#
Java BIO 應(yīng)用案例#
// 代碼示例:
public class BIOService {
? ? public static void main(String[] args) throws IOException {
? ? ? ? // 功能需求:
? ? ? ? // 使用BIO模型編寫一個服務(wù)器,監(jiān)聽6666窗口,當(dāng)有客戶端連接時,就啟動一個客戶端線程與之通信.
? ? ? ? // 要求使用線程連接機制,可以連接多個客戶端.
? ? ? ? // 服務(wù)器端可以接受客戶端發(fā)送的數(shù)據(jù)(telnet方式即可)
? ? ? ? //1. 首先建立一個線程池.
? ? ? ? ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
? ? ? ? //2. 建立一個監(jiān)聽服務(wù),來監(jiān)聽客戶端連接
? ? ? ? ServerSocket serverSocket = new ServerSocket(6666);
? ? ? ? System.out.println("服務(wù)器啟動成功");
? ? ? ? while (true) {
? ? ? ? ? ? // 監(jiān)聽,等待客戶端連接
? ? ? ? ? ? final Socket socket = serverSocket.accept();
? ? ? ? ? ? System.out.println("客戶端連接了.");
? ? ? ? ? ? //連接了之后,給這個用戶創(chuàng)建一個線程用于通信.
? ? ? ? ? ? newCachedThreadPool.execute(new Runnable() {
? ? ? ? ? ? ? ? public void run() {
? ? ? ? ? ? ? ? ? ? //從寫run方法. 接受客戶端發(fā)送的消息.打印到控制臺.
? ? ? ? ? ? ? ? ? ? handler(socket);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? });
? ? ? ? }
? ? }
? ? private static void handler(Socket socket) {
? ? ? ? byte[] bytes = new byte[1024];
? ? ? ? try (InputStream inputStream = socket.getInputStream()) {
? ? ? ? ? ? while (true) { //通過socket獲取到輸入流
? ? ? ? ? ? ? ? int read = inputStream.read(bytes);
? ? ? ? ? ? ? ? if (read != -1) { // 如果在讀的過程中,打印出字節(jié).
? ? ? ? ? ? ? ? ? ? System.out.println(Arrays.toString(bytes));
? ? ? ? ? ? ? ? } else {//讀完之后,退出循環(huán)
? ? ? ? ? ? ? ? ? ? break;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? } catch (IOException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? } finally {
? ? ? ? ? ? // 我試試會報錯不會.不關(guān)閉流,但是實用的try- which - resource
? ? ? ? ? ? System.out.println("關(guān)閉連接");
? ? ? ? }
? ? }
}
Java BIO問題分析#
Java NIO編程#
JavaNIO基本介紹#
NIO中的Channel 相當(dāng)于 BIO當(dāng)中的serverSocket。 非阻塞 是通過Buffer實現(xiàn)的。
NIO Buffer的基本使用 案例介紹:
? public class BasicBuffer {
? ? public static void main(String[] args) {
? ? ? ? IntBuffer intBuffer = IntBuffer.allocate(5);
? ? ? ? intBuffer.put(1);
? ? ? ? intBuffer.put(2);
? ? ? ? intBuffer.put(3);
? ? ? ? intBuffer.put(4);
? ? ? ? intBuffer.put(5);
? ? ? ? intBuffer.flip();? // 轉(zhuǎn)換讀寫操作.
? ? ? ? while (intBuffer.hasRemaining()) {
? ? ? ? ? ? int i = intBuffer.get();
? ? ? ? ? ? System.out.println(i);
? ? ? ? }
? ? }
}
NIO和BIO的比較#
NIO三大核心原理示意圖#
Selector 、 Channel 和Buffer的關(guān)系圖的說明
每個channel都會對應(yīng)一個Buffer
Selector會對應(yīng)一個線程。一個線程對應(yīng)多個channel(連接)
該圖反應(yīng)了有三個channel注冊到了該selector。
程序切換到哪個channel,是由事件決定的。Event是一個重要的概念。(后續(xù)會學(xué)習(xí)都有哪些事件)
selector會根據(jù)不同的事件,在各個通道上切換。
Buffer就是一個內(nèi)存塊,底層是有一個數(shù)組
數(shù)據(jù)的讀取寫入是通過Buffer,這個和BIO是有本質(zhì)不同的。BIO中對于一個流而言,要么是輸入流或者是輸出流,不會是雙向流動的。但是NIO的BUffer是可以讀,也可以寫的。但是需要使用flip()切換。
Channel也是雙向的??梢苑磻?yīng)底層操作系統(tǒng)的情況。比如說Linux,底層的操作系統(tǒng)通到就是雙向的。
NIO三大核心之—Buffer#
Buffer基本介紹#
Buffer類及其子類 API#
Buffer API#
ByteBuffer API#
NIO三大核心之—Channel#
基本介紹#
ServerSocketChannel 類似ServerSocket
ServerChannel類似Server
舉例:FileChannel類#
實現(xiàn)流程示意圖:
1.應(yīng)用實例:?
本地文件寫數(shù)據(jù)。 代碼實現(xiàn):
public class NIOFileBuffer {
? ? public static void main(String[] args) throws IOException {
? ? ? ? //將"hello,二娃"寫入到hello.txt文件中
? ? ? ? String str = "hello,二娃";
? ? ? ? // 首先要創(chuàng)建一個輸出流:
? ? ? ? FileOutputStream fileOutputStream = new FileOutputStream("hello.txt");
? ? ? ? //創(chuàng)建一個fileChannel通道
? ? ? ? FileChannel fileOutputStreamChannel = fileOutputStream.getChannel();
? ? ? ? //創(chuàng)建一個ByteBuffer,將字符串寫入到Buffer中
? ? ? ? ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
? ? ? ? byteBuffer.put(str.getBytes());
? ? ? ? //要對byteBuffer進行一個翻轉(zhuǎn)
? ? ? ? byteBuffer.flip();
? ? ? ? //將byteBuffer寫入到fileChannel中
? ? ? ? fileOutputStreamChannel.write(byteBuffer);
? ? ? ? //關(guān)閉流
? ? ? ? fileOutputStream.close();
? ? }
}
2. 本地文件讀數(shù)據(jù):?
? //創(chuàng)建一個輸入流,讀取文件內(nèi)容
? ? ? ? File file = new File("hello.txt");
? ? ? ? FileInputStream fileInputStream = new FileInputStream(file);
? ? ? ? //獲取到輸入流通到
? ? ? ? FileChannel fileInputStreamChannel = fileInputStream.getChannel();
? ? ? ? //準(zhǔn)備一個byteBuffer
? ? ? ? ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());
? ? ? ? //將管道中的數(shù)據(jù)放入到byteBuffer中
? ? ? ? fileInputStreamChannel.read(byteBuffer);
? ? ? ? //輸出內(nèi)容
? ? ? ? System.out.println(new String(byteBuffer.array()));
? ? ? ? fileInputStream.close();
3.使用一個Buffer完成文件的讀取。? 把文件A中的內(nèi)容讀取到,寫入到文件B中。 示意圖如上.代碼如下:
//用一個Buffer完成文件的讀寫
try (
? ? ? FileInputStream fileInputStream = new FileInputStream(new File("hello.txt"));
? ? ? FileChannel fileInputStreamChannel = fileInputStream.getChannel();
? ? ? FileOutputStream fileOutputStream = new FileOutputStream(new File("hello2.txt"));
? ? ? FileChannel fileOutputStreamChannel = fileOutputStream.getChannel();
) {
? ? ? ? ? ? ByteBuffer byteBuffer = ByteBuffer.allocate(512);
? ? ? ? ? ? while (true) {
? ? ? ? ? ? ? ? byteBuffer.clear();
? ? ? ? ? ? ? ? int read = fileInputStreamChannel.read(byteBuffer);
? ? ? ? ? ? ? ? if (read == -1) {
? ? ? ? ? ? ? ? ? ? break;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? byteBuffer.flip();
? ? ? ? ? ? ? ? fileOutputStreamChannel.write(byteBuffer);
? ? ? ? ? ? }
? ? ? ? }
4.拷貝文件。使用transferFrom方法try(
? ? ? ? // 使用拷貝方法,拷貝一個圖片
? ? ? ? FileInputStream fileInputStream = new FileInputStream(new File("hello.txt"));
? ? ? ? FileChannel fileInputStreamChannel = fileInputStream.getChannel();
? ? ? ? FileOutputStream fileOutputStream = new FileOutputStream(new File("hello2.txt"));
? ? ? ? FileChannel fileOutputStreamChannel = fileOutputStream.getChannel();
? ? ? ? ){
? ? ? ? ? fileOutputStreamChannel.transferFrom(fileInputStreamChannel,0,fileInputStreamChannel.size());
? ? ? ? }
關(guān)于Buffer和Channel的注意事項和細節(jié)#
注意事項要注意。
1.Buffer支持類型化。 put的什么類型,讀取的時候就要get相應(yīng)的類型。 舉例說明:public static void main(String[] args) {
? ? ? ? ByteBuffer byteBuffer = ByteBuffer.allocate(64);
? ? ? ? byteBuffer.putInt(123);
? ? ? ? byteBuffer.putChar('a');
? ? ? ? byteBuffer.putLong(10L);
? ? ? ? byteBuffer.putShort((short)234);
? ? ? ? byteBuffer.flip();
? ? ? ? System.out.println(byteBuffer.getInt());
? ? ? ? System.out.println(byteBuffer.getChar());
? ? ? ? System.out.println(byteBuffer.getLong());
? ? ? ? System.out.println(byteBuffer.getShort());?
? //順序如果不同,可能會導(dǎo)致程序拋出異常。java.nio.BufferUnderflowException
}
2. 可以將一個普通Buffer轉(zhuǎn)成只讀Buffer。只讀Buffer只能讀。寫操作時會拋 ReadOnlyBufferException
? 舉例說明:
? public static void main(String[] args) {
? ? ? ? ByteBuffer byteBuffer = ByteBuffer.allocate(32);
? ? ? ? for (int i = 0; i < byteBuffer.capacity(); i++) {
? ? ? ? ? ? byteBuffer.put((byte) i);
? ? ? ? }
? ? ? ? byteBuffer.flip();
? ? ? ? ByteBuffer asReadOnlyBuffer = byteBuffer.asReadOnlyBuffer();
? ? ? ? while (asReadOnlyBuffer.hasRemaining()) {
? ? ? ? ? ? System.out.print(asReadOnlyBuffer.get()+ " ");
? ? ? ? }
? ? ? ? asReadOnlyBuffer.put((byte) 12); //已經(jīng)轉(zhuǎn)換成readBuffer。此時pur會拋異常ReadOnlyBufferException
? ? }
3.MappedByteBuffer? 作用: 可讓文件直接在內(nèi)部(堆外內(nèi)存)修改,操作系統(tǒng)不需要拷貝一次。
// 參數(shù)1. FileChannel.MapMode.READ_WRITE 使用的讀寫模式
? // 參數(shù)2 : 0 可以直接修改的起始位置
? // 參數(shù)3 : 5 是映射到內(nèi)存的大小(不是索引位置)。即將1.txt的多少個字節(jié)映射到內(nèi)存
? //可以直接修改的范圍就是0-5
? // MappedByteBuffer 的實際類型是 DirectByteBuffer
? public static void main(String[] args) throws Exception {
? ? ? ? try(
? ? ? ? // 獲取到一個文件, rw為可以讀寫的模式
? ? ? ? RandomAccessFile randomAccessFile = new RandomAccessFile("hello.txt","rw");
? ? ? ? FileChannel fileChannel = randomAccessFile.getChannel();
? ? ? ? ) {
? ? ? ? ? ? MappedByteBuffer map = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
? ? ? ? ? ? map.put(1, (byte) 'H');
? ? ? ? ? ? map.put(2, (byte) 'E');
? ? ? ? ? ? map.put(3, (byte) 'E');
? ? ? ? }
? ? }
4. Scattering 和 Gathering ; 分散和聚合。
? 之前我們都是使用一個Buffer來操作的。NIO還支持多個Buffer(即Buffer數(shù)組)來完成讀寫操作。即 分散和聚合。
//Scattering 將數(shù)據(jù)寫入到Buffer時,可以采用Buffer數(shù)組,依次寫入。[分散]
//Gathering? 從Buffer讀取數(shù)據(jù)時,可以采用Buffer數(shù)組,依次讀【聚合】
//這次使用 ServerSocketChannel 和 SocketChannel 網(wǎng)絡(luò) 來操作。
? public static void main(String[] args) throws IOException {
? ? ? ? ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
? ? ? ? InetSocketAddress inetSocketAddress = new InetSocketAddress(7000);
? ? ? ? // 綁定端口到socket ,并啟動
? ? ? ? serverSocketChannel.socket().bind(inetSocketAddress);
? ? ? ? // 創(chuàng)建一個Buffer數(shù)組
? ? ? ? ByteBuffer[] byteBuffers = new ByteBuffer[2];
? ? ? ? byteBuffers[0] = ByteBuffer.allocate(5);
? ? ? ? byteBuffers[1] = ByteBuffer.allocate(3);
? ? ? ? //等待客戶端連接(使用telnet)
? ? ? ? SocketChannel socketChannel = serverSocketChannel.accept();
? ? ? ? System.out.println("連接成功");
? ? ? ? long messageLength = 8;
? ? ? ? //連接成功,循環(huán)讀取
? ? ? ? while (true) {
? ? ? ? ? ? int byteRead = 0;
? ? ? ? ? ? while (byteRead < messageLength) {
? ? ? ? ? ? ? ? long l = socketChannel.read(byteBuffers);
? ? ? ? ? ? ? ? byteRead += l;
? ? ? ? ? ? ? ? System.out.println("當(dāng)前的byteRead: " + byteRead);
? ? ? ? ? ? ? ? //使用流打印,打印出當(dāng)前的Buffer中的? limit , position
? ? ? ? ? ? ? ? Arrays.stream(byteBuffers).map(byteBuffer -> "position" + byteBuffer.position() + ", limit "
? ? ? ? ? ? ? ? ? ? ? ? + byteBuffer.limit()).forEach(System.out::println);
? ? ? ? ? ? }
? ? ? ? ? ? //將所有的Buffer進行flip
? ? ? ? ? ? Arrays.stream(byteBuffers).map(ByteBuffer::flip);
? ? ? ? ? ? //將數(shù)據(jù)讀出返回給客戶端
? ? ? ? ? ? long byteWrite = 0;
? ? ? ? ? ? while (byteWrite < messageLength) {
? ? ? ? ? ? ? ? long write = socketChannel.write(byteBuffers);
? ? ? ? ? ? ? ? byteWrite += write;
? ? ? ? ? ? }
? ? ? ? ? ? //將所有的BUffer進行clean
? ? ? ? ? ? Arrays.stream(byteBuffers).map(ByteBuffer::clear);
? ? ? ? ? ? System.out.println("readLength " + byteRead + "writeLength " + byteWrite);
? ? ? ? }
? ? }
NIO三大核心之—Selector#
Selector基本介紹#
selector API#
selector類中實現(xiàn)的方法及其方法功能的說明。列出來功能,更能方便的使用。
重點記著- open方法,返回一個selector。
NIO 非阻塞網(wǎng)絡(luò)編程原理分析圖
對下圖的說明:
當(dāng)客戶端連接時,會通過serverSocketChannel得到一個對應(yīng)的SocketChannel
Selector進行監(jiān)聽(使用Select方法),返回有事件發(fā)生的通道的個數(shù)。
將socketChannel注冊到selector上。一個selector上可以注冊多個socketChannel。(
SelectableChannel.register(Selectoe sel, int ops))。ops參數(shù)的說明:有4個狀態(tài)。
注冊后返回一個SelectionKey,會和該selector關(guān)聯(lián)(集合的方式關(guān)聯(lián))。
進一步得到各個SelectionKey(有事件發(fā)生的的SelectionKey)
再通過SelectionKey反向獲取注冊的socketChannel。(使用SelectionKey.channel()方法)
可以得到channel,完成業(yè)務(wù)處理。
實例代碼案例演示:? NIO非阻塞網(wǎng)絡(luò)編程通訊?
服務(wù)器端:
public static void main(String[] args) throws IOException {? ? ? ? // NIO非阻塞網(wǎng)絡(luò)編程通訊? -- 服務(wù)器端
//? ? ? ? 1. 創(chuàng)建serverSocketChannel
? ? ? ? ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//? ? ? ? 2. 得到一個Selector對象
? ? ? ? Selector selector = Selector.open();
//? ? ? ? 3. 綁定一個端口6666, 在服務(wù)器端監(jiān)聽
? ? ? ? serverSocketChannel.socket().bind(new InetSocketAddress(6666));
//? ? ? ? 4. 設(shè)置為非阻塞
? ? ? ? serverSocketChannel.configureBlocking(false);
//? ? ? ? 5. 把serverSocketChannel注冊到Selector,關(guān)心事件op_accept
? ? ? ? serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//? ? ? ? 6. 循環(huán)等待客戶端連接
? ? ? ? while (true) {
? ? ? ? ? ? // 等待一秒鐘,如果沒有客戶端事件發(fā)生,不等待了。
? ? ? ? ? ? if ((selector.select(1000) == 0)) {
? ? ? ? ? ? ? ? //沒有事件發(fā)生
? ? ? ? ? ? ? ? System.out.println("服務(wù)器上一秒中,沒有客戶端連接");
? ? ? ? ? ? ? ? continue;
? ? ? ? ? ? }
? ? ? ? ? ? // 如果返回的>0 ,就獲取到相關(guān)的 selectionKeys集合。
? ? ? ? ? ? Set<SelectionKey> selectionKeys = selector.selectedKeys();
? ? ? ? ? ? Iterator<SelectionKey> selectionKeyIterator = selectionKeys.iterator();
? ? ? ? ? ? // 通過selectionKeys反向獲取通道,處理業(yè)務(wù)
? ? ? ? ? ? while (selectionKeyIterator.hasNext()) {
? ? ? ? ? ? ? ? // 獲取selectionKey
? ? ? ? ? ? ? ? SelectionKey selectionKey = selectionKeyIterator.next();
? ? ? ? ? ? ? ? // 根據(jù)key對應(yīng)的通道事件,做相應(yīng)的處理
? ? ? ? ? ? ? ? if (selectionKey.isAcceptable()) {
? ? ? ? ? ? ? ? ? ? //給此客戶端分配一個socketChannel
? ? ? ? ? ? ? ? ? ? SocketChannel socketChannel = serverSocketChannel.accept();
? ? ? ? ? ? ? ? ? ? System.out.println("客戶端連接了, " + selectionKey.hashCode());
? ? ? ? ? ? ? ? ? ? socketChannel.configureBlocking(false);
? ? ? ? ? ? ? ? ? ? //將此channel注冊到 selector上, 關(guān)注read事件
? ? ? ? ? ? ? ? ? ? socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? if (selectionKey.isReadable()) { //發(fā)生了 read事件
? ? ? ? ? ? ? ? ? ? //通過key,反向獲取到對應(yīng)的channel
? ? ? ? ? ? ? ? ? ? SocketChannel channel = (SocketChannel) selectionKey.channel();
? ? ? ? ? ? ? ? ? ? //獲取到該key的buffer
? ? ? ? ? ? ? ? ? ? ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment();
? ? ? ? ? ? ? ? ? ? channel.read(byteBuffer);
? ? ? ? ? ? ? ? ? ? System.out.println("from 客戶端 : " + new String(byteBuffer.array()));
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? //手動移除key
? ? ? ? ? ? ? ? selectionKeyIterator.remove();
? ? ? ? ? ? }
? ? ? ? }
? ? }
客戶端:
public static void main(String[] args) throws IOException {
//? ? ? ? 1. 得到一個網(wǎng)絡(luò)通道
? ? ? ? SocketChannel socketChannel = SocketChannel.open();
//? ? ? ? 2. 提供非阻塞
? ? ? ? socketChannel.configureBlocking(false);
//? ? ? ? 3. 提供服務(wù)器端的IP和端口
? ? ? ? InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
//? ? ? ? 4. 連接服務(wù)器
? ? ? ? if (!socketChannel.connect(inetSocketAddress)) {
? ? ? ? ? ? //? ? ? ? 連接不成功, 打印一句話,代表這時候不阻塞,可以去做別的事情
? ? ? ? ? ? while (!socketChannel.finishConnect()) {
? ? ? ? ? ? ? ? System.out.println("客戶端連接未成功,先去干別的事情了");
? ? ? ? ? ? }
? ? ? ? }
//? ? ? ? 5. 如果連接成功,發(fā)送數(shù)據(jù)。 通過ByteBuffer.wrap (根據(jù)字節(jié)的大小自動放入到Buffer中。)
? ? ? ? String str = "hello,二娃";
? ? ? ? ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes());
//? ? ? ? 6. 發(fā)送數(shù)據(jù)。將Buffer數(shù)據(jù)寫入channel。
? ? ? ? socketChannel.write(byteBuffer);
? ? ? ? System.in.read();
? ? }
SelectionKey API
每注冊一個客戶端,會出現(xiàn)一個新的channel ,selectionkey.keys()就會增加1
selectionKeys.size() ; 活動的channel的個數(shù)。
selectionkeys.keys(); 總的channel的個數(shù)。
注意,這時候我看了一下源碼, selector真正的實現(xiàn)方法已經(jīng)和視頻中老師的不一樣了。
下圖是老師視頻中的 和 我自己的方法對比。 原因是 老師的電腦是Windows,我的是Mac
ServerSocketChannel API#
SocketChannel API#
NIO網(wǎng)絡(luò)編程應(yīng)用實例-群聊系統(tǒng)#
完成這個群聊系統(tǒng)的代碼案例
開發(fā)流程:
1. 先編寫服務(wù)器端
? 1.1 服務(wù)器啟動并監(jiān)聽6667
? 1.2 服務(wù)器接受客戶端信息,并實現(xiàn)轉(zhuǎn)發(fā)【處理上線和離線】
2.編寫客戶端
? 2.1 連接服務(wù)器
? 2.2 發(fā)送消息
? 2.3 接受服務(wù)器的消息
? 1.初始化構(gòu)造器,
? 2. 監(jiān)聽
服務(wù)器端代碼:
/**
* weChat服務(wù)器端
* 1. 先編寫服務(wù)器端
*? 1.1 服務(wù)器啟動并監(jiān)聽6667
*? 1.2 服務(wù)器接受客戶端信息,并實現(xiàn)轉(zhuǎn)發(fā)【處理上線和離線】
*/
public class weCharServer {
? ? private ServerSocketChannel listenSocketChannel ;
? ? private Selector selector;
? ? private static? final? int PORT = 6666;
? ? public weCharServer() throws IOException {
? ? ? ? //1. 得到選擇器
? ? ? ? selector = Selector.open();
? ? ? ? //2. 得到 serverSocketChannel
? ? ? ? listenSocketChannel = ServerSocketChannel.open();
? ? ? ? //3. 綁定端口
? ? ? ? listenSocketChannel.socket().bind(new InetSocketAddress(PORT));
? ? ? ? //4. 設(shè)置非阻塞
? ? ? ? listenSocketChannel.configureBlocking(false);
? ? ? ? //5. 注冊
? ? ? ? listenSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
? ? }
? ? /**
? ? * 監(jiān)聽
? ? */
? ? public void listen(){
? ? ? ? try {
? ? ? ? while (true) {
? ? ? ? ? ? ? ? int count = selector.select(2000);
? ? ? ? ? ? if (count > 0) {
? ? ? ? ? ? ? ? //有事件處理
? ? ? ? ? ? ? ? //遍歷得到selectionKeys集合
? ? ? ? ? ? ? ? Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
? ? ? ? ? ? ? ? while (iterator.hasNext()) {
? ? ? ? ? ? ? ? ? ? //取出selectionKey
? ? ? ? ? ? ? ? ? ? SelectionKey key = iterator.next();
? ? ? ? ? ? ? ? ? ? //監(jiān)聽到accept
? ? ? ? ? ? ? ? ? ? if (key.isAcceptable()) {
? ? ? ? ? ? ? ? ? ? ? ? SocketChannel sc = listenSocketChannel.accept();
? ? ? ? ? ? ? ? ? ? ? ? //將 該 SocketChannel注冊到 selector 上
? ? ? ? ? ? ? ? ? ? ? ? sc.configureBlocking(false);
? ? ? ? ? ? ? ? ? ? ? ? sc.register(selector, SelectionKey.OP_READ);
? ? ? ? ? ? ? ? ? ? ? ? //提示上線
? ? ? ? ? ? ? ? ? ? ? ? System.out.println(sc.getRemoteAddress() + "上線了");
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? if (key.isReadable()) {
? ? ? ? ? ? ? ? ? ? ? ? //通道發(fā)送read事件,即通道是刻度的狀態(tài)
? ? ? ? ? ? ? ? ? ? ? ? keyRead(key);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? iterator.remove();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? } catch (IOException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? }
? ? private void keyRead(SelectionKey key) {
? ? ? ? SocketChannel channel = null;
? ? ? ? try {
? ? ? ? ? ? //根據(jù)key得到channel
? ? ? ? ? ? channel = (SocketChannel) key.channel();
? ? ? ? ? ? //創(chuàng)建Buffer
? ? ? ? ? ? ByteBuffer buffer = ByteBuffer.allocate(1024);
? ? ? ? ? ? int read = channel.read(buffer);
? ? ? ? ? ? //根據(jù)read只,做處理
? ? ? ? ? ? if (read > 0) {
? ? ? ? ? ? ? ? //把緩存區(qū)的數(shù)據(jù)轉(zhuǎn)成字符串
? ? ? ? ? ? ? ? String msg = new String(buffer.array());
? ? ? ? ? ? ? ? System.out.println("from 客戶端 : " + msg);
? ? ? ? ? ? ? ? //向其他客戶轉(zhuǎn)發(fā)消息
? ? ? ? ? ? ? ? sendInfoToOtherClient(msg,channel);
? ? ? ? ? ? }
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? System.out.println(channel.getRemoteAddress() + " 離線了");
? ? ? ? ? ? } catch (IOException ex) {
? ? ? ? ? ? ? ? ex.printStackTrace();
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? private void sendInfoToOtherClient(String msg, SocketChannel self) throws IOException {
? ? ? ? System.out.println("服務(wù)器轉(zhuǎn)發(fā)消息中...");
? ? ? ? //遍歷所有注冊到selector上的socketChannel,并排除self
? ? ? ? for (SelectionKey key : selector.keys()) {
? ? ? ? ? ? //通過key取出對應(yīng)的socketChannel
? ? ? ? ? ? SelectableChannel targetChannel = key.channel();
? ? ? ? ? ? //排除自己
? ? ? ? ? ? if (targetChannel instanceof SocketChannel && targetChannel != self) {
? ? ? ? ? ? ? ? //將Buffer中的數(shù)據(jù)寫入通道
? ? ? ? ? ? ? ? ((SocketChannel) targetChannel).write(ByteBuffer.wrap(msg.getBytes()));
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? public static void main(String[] args) throws IOException {
? ? ? ? weCharServer weCharServer = new weCharServer();
? ? ? ? weCharServer.listen();
? ? }
}
客戶端代碼:
public class weChatClient {
? ? private SocketChannel socketChannel;
? ? private String username;
? ? private Selector selector;
? ? public weChatClient() throws IOException {
? ? ? ? selector = Selector.open();
? ? ? ? socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 6666));
? ? ? ? //設(shè)置為非阻塞
? ? ? ? socketChannel.configureBlocking(false);
? ? ? ? //注冊
? ? ? ? socketChannel.register(selector, SelectionKey.OP_READ);
? ? ? ? username = socketChannel.getLocalAddress().toString().substring(1);
? ? ? ? System.out.println("username : " + username);
? ? }
? ? //向服務(wù)器發(fā)送消息
? ? public void senInfo(String info) {
? ? ? ? info = username + " 說 : " + info;
? ? ? ? try {
? ? ? ? ? ? socketChannel.write(ByteBuffer.wrap(info.getBytes()));
? ? ? ? } catch (IOException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? }
? ? //從服務(wù)器讀取消息
? ? public? void readInfo(){
? ? ? ? try {
? ? ? ? ? ? int readChannels = selector.select();
? ? ? ? ? ? if (readChannels > 0) {
? ? ? ? ? ? ? ? //有可用的通道
? ? ? ? ? ? ? ? Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
? ? ? ? ? ? ? ? while (iterator.hasNext()) {
? ? ? ? ? ? ? ? ? ? SelectionKey key = iterator.next();
? ? ? ? ? ? ? ? ? ? if (key.isReadable()) { //讀事件
? ? ? ? ? ? ? ? ? ? ? ? //得到相關(guān)的通道
? ? ? ? ? ? ? ? ? ? ? ? SocketChannel sc = (SocketChannel) key.channel();
? ? ? ? ? ? ? ? ? ? ? ? //得到一個緩沖區(qū)
? ? ? ? ? ? ? ? ? ? ? ? ByteBuffer allocate = ByteBuffer.allocate(1024);
? ? ? ? ? ? ? ? ? ? ? ? sc.read(allocate);
? ? ? ? ? ? ? ? ? ? ? ? //把讀取的數(shù)據(jù)轉(zhuǎn)換成字符換
? ? ? ? ? ? ? ? ? ? ? ? String msg = new String(allocate.array());
? ? ? ? ? ? ? ? ? ? ? ? System.out.println(msg.trim());
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? }
? ? public static void main(String[] args) throws IOException {
? ? ? ? //啟動一個客戶端
? ? ? ? weChatClient chatClient = new weChatClient();
? ? ? ? //啟動一個線程,每三秒讀取從服務(wù)器發(fā)送的數(shù)據(jù)
? ? ? ? new Thread(() -> {
? ? ? ? ? ? while (true) {
? ? ? ? ? ? ? ? chatClient.readInfo();
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? Thread.currentThread().sleep(3000);
? ? ? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }).start();
? ? ? ? //發(fā)送消息給服務(wù)器端
? ? ? ? Scanner scanner = new Scanner(System.in);
? ? ? ? while (scanner.hasNextLine()) {
? ? ? ? ? ? chatClient.senInfo(scanner.nextLine());
? ? ? ? }
? ? }
}
NIO與零拷貝#
零拷貝,是指從操作系統(tǒng)看的,不經(jīng)過CPU拷貝。
什么是DMA(direct memory access)? 直接內(nèi)存拷貝(不適用CPU)。
傳統(tǒng)IO數(shù)據(jù)讀寫#
什么是DMA(direct memory access)? 直接內(nèi)存拷貝(不適用CPU)
傳統(tǒng)的IO:使用了4次拷貝,3次狀態(tài)的轉(zhuǎn)換。
mmap優(yōu)化#
mmap優(yōu)化:使用了3次拷貝,3次狀態(tài)切換。
sendFile優(yōu)化#
sendFile 優(yōu)化: 使用3次拷貝,2次狀態(tài)切換。
sendFile 進一步優(yōu)化: 使用2次拷貝,2次上下文狀態(tài)切換。
這里還是有一次CPU拷貝的。 從kernel buffer -> socket buffer . 但是拷貝的信息很少。比如 length ,offet ,消耗低,可以忽略。
mmap 和 sendFile的區(qū)別#
NIO零拷貝案例#
transferTo注意事項 :? 1. 在Linux下,一個transferTo方法就可以傳輸完、? 2. 在Windows下一次調(diào)用transferTo只能傳輸8M,而且要注意傳輸時的位置。? ? 使用方法:fileChannel.transferTo(0,fileChannel.size(),socketChannel); 從0開始傳,傳多少個。