本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog/camile
| 版本 | 日期 | 備注 |
|---|---|---|
| 1.0 | 2019.4.27 | 文章首發(fā) |
| 1.1 | 2021.5.21 | 修改標題:從一段代碼談起——淺談JavaIO接口-> 談談代碼:Java IO業(yè)務代碼優(yōu)化之路
|
| 1.1 | 2022.2.14 | 部分章節(jié)調(diào)整 |
1.前言
前陣子休息天日常在尋找項目里不好的代碼,看到了這樣的一段代碼:
private Result sshSameExec(Session session, String cmd) {
if (log.isDebugEnabled()) {
log.debug("shell command: {}", cmd);
}
UserInfo ui = getUserInfo();
session.setUserInfo(ui);
int exitStatus = 0;
StringBuilder builder = new StringBuilder();
ChannelExec channel;
InputStream in;
InputStream err;
try {
session.connect(connectTimeout);
channel = (ChannelExec) session.openChannel("exec");
channel.setCommand(cmd);
in = channel.getInputStream();
err = channel.getErrStream();
channel.connect();
} catch (Exception e) {
throw new CloudRuntimeException(e);
}
try {
long lastRead = Long.MAX_VALUE;
byte[] tmp = new byte[1024];
while (true) {
while (in.available() > 0 || err.available() > 0) {
int i = 0;
if (in.available() > 0) {
i = in.read(tmp, 0, 1024);
} else if (err.available() > 0) {
i = err.read(tmp, 0, 1024);
}
if (i < 0) {
break;
}
lastRead = System.currentTimeMillis();
builder.append(new String(tmp, 0, i));
}
if (channel.isClosed()) {
if (in.available() > 0) {
continue;
}
exitStatus = channel.getExitStatus();
break;
}
if (System.currentTimeMillis() - lastRead > exeTimeout) {
break;
}
}
} catch (IOException e) {
throw new CloudRuntimeException(e);
} finally {
channel.disconnect();
session.disconnect();
}
if (0 != exitStatus) {
return Result.createByError(ErrorData.builder()
.errorCode(ResultCode.EXECUTE_SSH_FAIL.getCode())
.detail(builder.toString())
.title(ResultCode.EXECUTE_SSH_FAIL.toString())
.build());
} else {
return Result.createBySuccess(builder.toString());
}
}
簡單解釋一下這段代碼——即通過ssh到一臺機器上,然后執(zhí)行一些命令.對命令輸出的東西,開了一個循環(huán),每一次讀一定的位置,然后以字節(jié)流的形式讀回來.
這段代碼有點丑,于是我聞到了學習的味道.

首先是對兩個Stream的消費,很顯然,在多核環(huán)境下,我們同時也只能夠消費其中一個Stream.其次,這代碼太挫了,自己定義一個tmp,然后1024、1024這樣的去取出來.
在改良之前,我們先來回顧一下JavaIO的接口定義.
2.JavaIO 接口知識回顧
2.1 低級抽象接口:InputStream 和 OutputStream
這里有同學可能問了,為啥叫它低抽象接口呢?因為它離底層太近了,計算機本來就是處理二進制的,而這兩個接口正是用來處理二進制數(shù)據(jù)流的.
先簡單看一眼這兩個接口:
- InputStream
**
* This abstract class is the superclass of all classes representing
* an input stream of bytes.
*
* <p> Applications that need to define a subclass of <code>InputStream</code>
* must always provide a method that returns the next byte of input.
*
* @author Arthur van Hoff
* @see java.io.BufferedInputStream
* @see java.io.ByteArrayInputStream
* @see java.io.DataInputStream
* @see java.io.FilterInputStream
* @see java.io.InputStream#read()
* @see java.io.OutputStream
* @see java.io.PushbackInputStream
* @since JDK1.0
*/
public abstract class InputStream implements Closeable {.....}
- OutputStream
/**
* This abstract class is the superclass of all classes representing
* an output stream of bytes. An output stream accepts output bytes
* and sends them to some sink.
* <p>
* Applications that need to define a subclass of
* <code>OutputStream</code> must always provide at least a method
* that writes one byte of output.
*
* @author Arthur van Hoff
* @see java.io.BufferedOutputStream
* @see java.io.ByteArrayOutputStream
* @see java.io.DataOutputStream
* @see java.io.FilterOutputStream
* @see java.io.InputStream
* @see java.io.OutputStream#write(int)
* @since JDK1.0
*/
public abstract class OutputStream implements Closeable, Flushable {...}
我們可以發(fā)現(xiàn),它們都實現(xiàn)了Closeable的接口.因此大家在使用這些原生類時,要注意在結(jié)束時調(diào)用Close方法哦.
這兩個接口的常用實現(xiàn)類有:
-
FileInputStream和FileOutputStream -
DataInputStream和DataOutputStream -
ObjectInputStream和ObjectOutputStream
2.2 高級抽象接口——Writer和Reader
為啥說它是高級抽象接口呢?我們先來看看它們的注釋:
- Writer
/**
* Abstract class for writing to character streams. The only methods that a
* subclass must implement are write(char[], int, int), flush(), and close().
* Most subclasses, however, will override some of the methods defined here in
* order to provide higher efficiency, additional functionality, or both.
*
* @see Writer
* @see BufferedWriter
* @see CharArrayWriter
* @see FilterWriter
* @see OutputStreamWriter
* @see FileWriter
* @see PipedWriter
* @see PrintWriter
* @see StringWriter
* @see Reader
*
* @author Mark Reinhold
* @since JDK1.1
*/
public abstract class Writer implements Appendable, Closeable, Flushable {
- Reader
/**
* Abstract class for reading character streams. The only methods that a
* subclass must implement are read(char[], int, int) and close(). Most
* subclasses, however, will override some of the methods defined here in order
* to provide higher efficiency, additional functionality, or both.
*
*
* @see BufferedReader
* @see LineNumberReader
* @see CharArrayReader
* @see InputStreamReader
* @see FileReader
* @see FilterReader
* @see PushbackReader
* @see PipedReader
* @see StringReader
* @see Writer
*
* @author Mark Reinhold
* @since JDK1.1
*/
public abstract class Reader implements Readable, Closeable {
我們可以看到,這個抽象類是用來面向character的,也就是字符.字符的抽象等級必然比字節(jié)高,因為字符靠近上層,即人類.
2.3 優(yōu)化輸入和輸出——Buffered
如果我們直接使用上述實現(xiàn)類去打開一個文件(如FileWriter、FileReader、FileInputStream、FileOutputStream),對其對象調(diào)用read、write、readLine等,每個請求都是由基礎OS直接處理的,這會使一個程序效率低得多——因為它們都會引發(fā)磁盤訪問or網(wǎng)絡請求等.
為了減少這種開銷,Java 平臺實現(xiàn)緩沖 I/O 流。緩沖輸入流從被稱為緩沖區(qū)(buffer)的存儲器區(qū)域讀出數(shù)據(jù);僅當緩沖區(qū)是空時,本地輸入 API 才被調(diào)用。同樣,緩沖輸出流,將數(shù)據(jù)寫入到緩存區(qū),只有當緩沖區(qū)已滿才調(diào)用本機輸出 API。
用于包裝非緩存流的緩沖流類有4個:BufferedInputStream和BufferedOutputStream·用于創(chuàng)建字節(jié)緩沖字節(jié)流,BufferedReader和BufferedWriter`用于創(chuàng)建字符緩沖字節(jié)流.
3. 著手優(yōu)化
之前,我們提到了這段代碼寫得搓的地方:
- 首先是對兩個Stream的消費,很顯然,在多核環(huán)境下,我們同時也只能夠消費其中一個Stream.
- 其次,這代碼太挫了,自己定義一個tmp,然后1024、1024這樣的去取出來.
故此,我們可以考慮對每個Stream都進行包裝,支持用線程去消費,其次我們可以用高級抽象分接口去適配Byte,然后去裝飾成Buffer.
接下來,我們來看一段ZStack里的工具類ShellUtils,為了節(jié)省篇幅,我們僅僅截出它在IDE里的Structure:

run方法的核心:

我們可以看到StreamConsumer這個類,我們來看一下它的代碼:
private static class StreamConsumer extends Thread {
final InputStream in;
final PrintWriter out;
final boolean flush;
StreamConsumer(InputStream in, PrintWriter out, boolean flushEveryWrite) {
this.in = in;
this.out = out;
flush = flushEveryWrite;
}
@Override
public void run() {
BufferedReader br = null;
try {
br = new BufferedReader(new InputStreamReader(in));
String line;
while ( (line = br.readLine()) != null) {
out.println(line);
if (flush) {
out.flush();
}
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
} finally {
try {
if (br != null) {
br.close();
}
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
這段代碼已經(jīng)達到了我們的理想狀態(tài):線程消費,高級抽象.
3.1 使用Kotlin
3.1.1 Kotlin IO
閑話不多說,先貼代碼為敬:
import java.io.InputStream
import java.io.InputStreamReader
class StreamGobbler(private val inputStream: InputStream, private var result: StringBuilder) : Runnable {
override fun run() {
InputStreamReader(inputStream).buffered().use {
it.lines().forEach { r -> result.append(r) }
}
}
}
還是一樣熟悉的配方,我們逐行來解讀:
- 定義一個類,并且要求構(gòu)造函數(shù)必須傳入InputStream和一個StringBuilder.且實現(xiàn)了Runnable接口,這意味著它可以被線程消費.
- 覆寫run方法.我們可以看到InputStream被適配成了
InputStreamReader,這意味著它可以輸出字符流了,然后我們使用了Kotlin的接口將其裝飾成了Buffer. - 讀每一行buffer,并appned到result這個StringBuilder里去.
- 讀完就可以告辭了,close.(use會將其關閉)
3.1.2 Kotlin Coroutine

先看一下上面的圖,我們都知道內(nèi)核態(tài)線程是由OS調(diào)度的,但當一個線程拿到時間片時,卻調(diào)到了阻塞IO,那么只能等在那邊,浪費時間.
而協(xié)程則可以解決這個問題,當一個Jobhang住的時候,可以去做別的事情,繞開阻塞.更好的利用時間片.