最近看了Java的IO包源碼,對(duì)BIO有了較深入的理解。Socket編程其實(shí)也是基于IO流操作,并且其流操作都是阻塞的,就想著寫(xiě)一個(gè)Socket程序并對(duì)其一步一步優(yōu)化,來(lái)加深對(duì)IO的理解。本文主要從簡(jiǎn)單的Socket連接開(kāi)始,一步一步優(yōu)化,最后使用線程池等技術(shù)提高并發(fā)。Socket源碼本篇未涉及,等有時(shí)間我再研究一番。
一. 基本概念
Socket編程的基本流程如下圖(圖片來(lái)自網(wǎng)絡(luò)),一個(gè)IP地址和一個(gè)端口號(hào)稱為一個(gè)套接字(socket)。

Socket編程是BIO的,對(duì)于服務(wù)端,accept()、read()、write()都會(huì)堵塞。
- accept是阻塞的,只有新連接來(lái)了,accept才會(huì)返回,主線程才能繼續(xù)
- read是阻塞的,只有請(qǐng)求消息來(lái)了,read才能返回,子線程才能繼續(xù)處理
- write是阻塞的,只有客戶端把消息收了,write才能返回,子線程才能繼續(xù)讀取下一個(gè)請(qǐng)求
Socket開(kāi)發(fā)Java提供了兩個(gè)類,Socket用于BIO連接和信息收發(fā),ServerSocket用于構(gòu)建一個(gè)服務(wù)端,其accept()方法獲得一個(gè)Socket對(duì)象,最終客戶端服務(wù)器都是使用Socket進(jìn)行通信。
二. 最基本的Socket
如下,最基本的客戶端發(fā)送消息,服務(wù)端接收消息輸入。需要注意的是,由于中文的utf8編碼是3個(gè)字節(jié),如果使用buffer來(lái)分段接收字節(jié)流,可能導(dǎo)致亂碼。另外,read()是堵塞的,如果不判斷read() == -1來(lái)表示結(jié)束,那么read()方法會(huì)一直堵塞。
package me.zebin.demo.javaio;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.*;
@RunWith(SpringRunner.class)
@SpringBootTest
public class JavaioApplicationTests {
@Test
public void server() throws Exception {
// 指定端口
ServerSocket ss = new ServerSocket(9999);
System.out.println("server starting...");
// 等待連接
Socket s = ss.accept();
// 獲取輸入流,接收客戶端的消息
InputStream is = s.getInputStream();
// 緩存buffer,utf8編碼中文是3個(gè)字節(jié),這里也可是使用BufferedReader解碼
byte[] buffer = new byte[5];
while(true){
int cnt = is.read(buffer);
// 如果不判斷流結(jié)束,上面的read()讀不到數(shù)據(jù)會(huì)一直堵塞
if(cnt == -1){
break;
}
String str = new String(buffer, 0, cnt, "utf8");
System.out.println(str);
}
s.close();
ss.close();
}
@Test
public void client() throws Exception{
// 指定端口
Socket s = new Socket("127.0.0.1", 9999);
// 獲取輸出流,向服務(wù)端發(fā)消息
OutputStream os = s.getOutputStream();
// 發(fā)送消息,utf8編碼中文是3個(gè)字節(jié),服務(wù)端使用buffer可能導(dǎo)致亂碼
String str = "我是客戶端";
os.write(str.getBytes("utf8"));
s.close();
}
}
以上程序,如果buffer設(shè)置為5,運(yùn)行結(jié)果如下,出現(xiàn)亂碼。

當(dāng)然,解決方案可以整行讀取,將InputStream轉(zhuǎn)為Reader再轉(zhuǎn)BufferedReader即可讀取一行。也可使用Scanner來(lái)解決,服務(wù)端代碼改為如下:
@Test
public void server() throws Exception {
// 指定端口
ServerSocket ss = new ServerSocket(9999);
System.out.println("server starting...");
// 等待連接
Socket s = ss.accept();
// 獲取輸入流,接收客戶端的消息
InputStream is = s.getInputStream();
// 輸入字節(jié)流封裝為Scanner,讀取整行
Scanner sc = new Scanner(is, "utf8");
while (sc.hasNextLine()){
System.out.println(sc.nextLine());
}
s.close();
ss.close();
}
運(yùn)行結(jié)果如下,沒(méi)有亂碼了。

服務(wù)端判斷流關(guān)閉,一般使用兩種方法。
- 使用特殊符號(hào):既然上面可以獲取到行,服務(wù)端客戶端就可以約定相關(guān)的結(jié)束符,如接收到一個(gè)空行就結(jié)束,服務(wù)端進(jìn)行判斷關(guān)閉流即可。
- 使用長(zhǎng)度界定:類似http協(xié)議就有content-length界定結(jié)束符,我們也可以在客戶端發(fā)送byte[]數(shù)組前,在byte[]數(shù)據(jù)前兩個(gè)字節(jié)標(biāo)識(shí)消息長(zhǎng)度。當(dāng)然,兩個(gè)字節(jié)能表示的消息長(zhǎng)度就只有2^16-1,即大小是2^16字節(jié),即64k大小。
三. 多線程版本
上面的版本有一個(gè)弊端,就是一個(gè)服務(wù)器只能提供給一個(gè)客戶端進(jìn)行連接,如果將連接的用線程處理,服務(wù)器可以處理更多的客戶端連接,代碼如下:
@Test
public void server() throws Exception {
// 指定端口
ServerSocket ss = new ServerSocket(9998);
System.out.println("server starting...");
while(true){
// 等待連接
Socket s = ss.accept();
System.out.println("獲得連接");
Thread t = new Thread(new ServerThread(s));
t.start();
}
}
class ServerThread implements Runnable{
private Socket s;
ServerThread(Socket s){
this.s = s;
}
@Override
public void run(){
// 獲取輸入流,接收客戶端的消息
InputStream is = null;
try {
is = s.getInputStream();
// 使用Scanner封裝
Scanner sc = new Scanner(is, "utf8");
while (sc.hasNextLine()){
System.out.println(sc.nextLine());
}
s.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
四. 線程池版本
以上多線程版本我們使用了多線程來(lái)處理并發(fā),不過(guò)線程的創(chuàng)建和銷毀都會(huì)消耗大量的資源和時(shí)間,同時(shí),高并發(fā)下會(huì)創(chuàng)建非常多的線程,且不說(shuō)操作系統(tǒng)能開(kāi)啟的線程數(shù)有限,操作系統(tǒng)維護(hù)和切換大量的線程也會(huì)非常耗時(shí)。所以使用線程池,只用4個(gè)線程,用隊(duì)列將未執(zhí)行到的線程排隊(duì)處理,減少了線程數(shù)量,同時(shí)也避免了創(chuàng)建和銷毀線程帶來(lái)的性能問(wèn)題。
@Test
public void server() throws Exception {
// 指定端口
ServerSocket ss = new ServerSocket(9998);
System.out.println("server starting...");
// 創(chuàng)建線程隊(duì)列
BlockingQueue bq = new ArrayBlockingQueue(100);
// 拒絕策略
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
Executor executor = new ThreadPoolExecutor(4, 8, 1, TimeUnit.MINUTES, bq, handler);
while(true){
// 等待連接
Socket s = ss.accept();
System.out.println("獲得連接");
Thread t = new Thread(new ServerThread(s));
executor.execute(t);
}
}
以上,本篇結(jié)束。
參考資料
- 通俗地講,Netty 能做什么? - 知乎用戶的回答 - 知乎
https://www.zhihu.com/question/24322387/answer/282001188 - https://juejin.im/post/5ad9dd61518825671c0e1d71