簡介
主要對stream這個概念做一個形象的描述和理解,同時介紹一下比較常用的API。主要參考了Node.js的官方文檔。
stream的種類
-
Readable - streams from which data can be read (for example
fs.createReadStream()). -
Writable - streams to which data can be written (for example
fs.createWriteStream()). -
Duplex - streams that are both Readable and Writable (for example
net.Socket). -
Transform - Duplex streams that can modify or transform the data as it is written and read (for example
zlib.createDeflate()).
stream解決的問題
設計 stream API 的一個關鍵目標是將數(shù)據(jù)緩沖限制到可接受的級別,從而使得不同傳輸速率的源可以進行數(shù)據(jù)的傳輸,同時不會占用過量的內存。比如,文件的讀取。系統(tǒng)從硬盤中讀取文件的速度和我們程序處理文件內容的速度是不相匹配的,而且讀取的文件可能是很大的。如果不用流來讀取文件,那么我們首先就需要把整個文件讀取到內存中,然后程序從內存中讀取文件內容來進行后續(xù)的業(yè)務處理。這會極大的消耗系統(tǒng)的內存,并且降低處理的效率(要先讀取整個文件,再處理數(shù)據(jù))。
stream這個概念是很形象的,就像是水流,可以通過管道,從一處流向另一處。比如從文件輸入,最終由程序接收,進行后續(xù)的處理。而 stream.pipe()就是流中最關鍵的一個管道方法。
// 此處代碼實現(xiàn)了從file.txt讀取數(shù)據(jù),然后壓縮數(shù)據(jù),然后寫入file.txt.gz的過程
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
緩存機制
Writeable 和 Readable stream 都將數(shù)據(jù)存儲在一個自身內部的buffer中。分別writable.writableBuffer or readable.readableBuffer 可以得到buffer的數(shù)據(jù)。有一個參數(shù)highWaterMark 用來限制這個buffer的最大容量。從而使得流之間的數(shù)據(jù)傳輸可以被限制在一定的內存占用下,并且擁有較高的效率。
Writable Streams
幾個典型的可寫流:
- HTTP requests, on the client
- HTTP responses, on the server
- fs write streams
- zlib streams
- crypto streams
- TCP sockets
- child process stdin
-
process.stdout,process.stderr
// 示例
const myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data'); // 關閉流,不可再寫入。
觸發(fā)的事件:
- close
-
drain
If a call tostream.write(chunk)returnsfalse, the'drain'event will be emitted when it is appropriate to resume writing data to the stream.
不能無限制的寫入,當writeable stream的內部緩存數(shù)據(jù)超過highWaterMark的閾值,stream.write會返回false,這是應該停止寫入,等到觸發(fā)drain事件再繼續(xù)寫入。 -
error
注意,error事件觸發(fā)的時候,stream不會自動被關閉,需要手動處理關閉。 - finish
-
pipe
對應于readable.pipe(),有一個readable stream和該stream連通的時候觸發(fā) - unpipe
對應于readable.unpipe(),有一個readable stream和該stream管道斷開的時候觸發(fā)
Readable Streams
幾個典型的可讀流:
- HTTP responses, on the client
- HTTP requests, on the server
- fs read streams
- zlib streams
- crypto streams
- TCP sockets
- child process stdout and stderr
process.stdin
兩種模式: flowing and paused
readable stream 創(chuàng)建時都為paused模式,但是可以通過以下幾個方法變?yōu)閒lowing:
- Adding a
'data'event handler. - Calling the
stream.resume()method. - Calling the
stream.pipe()method to send the data to a Writable.
最常用的其實就是stream.pipe()了。
觸發(fā)的事件:
- close
- data
- end
- readable
// 示例
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
console.log('There will be no more data.');
});
readable.setEncoding(encoding)
調用readable.setEncoding('utf8')可以使得chunk的類型由buffer變?yōu)閟tring。
const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
assert.equal(typeof chunk, 'string');
console.log('got %d characters of string data', chunk.length);
});
HTTP通信中的應用
看代碼和注釋應該就能懂了
const http = require('http');
const server = http.createServer((req, res) => {
// req is an http.IncomingMessage, which is a Readable Stream
// res is an http.ServerResponse, which is a Writable Stream
let body = '';
// Get the data as utf8 strings.
// If an encoding is not set, Buffer objects will be received.
req.setEncoding('utf8');
// Readable streams emit 'data' events once a listener is added
req.on('data', (chunk) => {
body += chunk;
});
// the end event indicates that the entire body has been received
req.on('end', () => {
try {
const data = JSON.parse(body);
// write back something interesting to the user:
res.write(typeof data);
res.end();
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end(`error: ${er.message}`);
}
});
});
server.listen(1337);
// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token o in JSON at position 1