理解Dart 異步事件流 Stream

轉(zhuǎn)載:http://blog.sina.com.cn/s/blog_12d64892f0102vtk9.html

基本概念

顧名思義,Stream 就是流的意思,表示發(fā)出的一系列的異步數(shù)據(jù)??梢院?jiǎn)單地認(rèn)為 Stream 是一個(gè)異步數(shù)據(jù)源。它是 Dart 中處理異步事件流的統(tǒng)一 API。

集合與Stream

Dart 中,集合(Iterable或Collection)表示一系列的對(duì)象。而 Stream (也就是“流”)也表示一系列的對(duì)象,但區(qū)別在于 Stream 是異步的事件流。比如文件、套接字這種 IO 數(shù)據(jù)的非阻塞輸入流(input data),或者用戶界面上用戶觸發(fā)的動(dòng)作(UI事件)。

集合可以理解為“拉”模式,比如你有一個(gè) List ,你可以主動(dòng)地通過迭代獲得其中的每個(gè)元素,想要就能拿出來(lái)。而 Stream 可以理解為“推”模式,這些異步產(chǎn)生的事件或數(shù)據(jù)會(huì)推送給你(并不是你想要就能立刻拿到)。這種模式下,你要做的是用一個(gè) listener (即callback)做好數(shù)據(jù)接收的準(zhǔn)備,數(shù)據(jù)可用時(shí)就通知你。

推和拉就是別人給你還是你自己去拿的區(qū)別。但是不管如何獲取數(shù)據(jù),二者的本質(zhì)都可以認(rèn)為是數(shù)據(jù)的集合(數(shù)據(jù)可能無(wú)限多)。所以,二者有很多相同的方法,稍后介紹。

怎么理解 Stream 中的數(shù)據(jù)?

數(shù)據(jù)(data)是個(gè)非常抽象的概念,可以認(rèn)為一切皆數(shù)據(jù)。在程序的世界里,其實(shí)只有兩種東西:數(shù)據(jù)和對(duì)數(shù)據(jù)的操作。對(duì)數(shù)據(jù)的操作就是對(duì)輸入的數(shù)據(jù)經(jīng)過一些計(jì)算,之后輸出一些新數(shù)據(jù)。事件(event,如UI上的事件)、計(jì)算結(jié)果(value,如函數(shù)/方法的返回值)以及從文件或網(wǎng)絡(luò)獲得的純數(shù)據(jù)都可以認(rèn)為是數(shù)據(jù)(data)。另外,Dart 中的所有事物都是對(duì)象,所以數(shù)據(jù)也一定是某種對(duì)象(object)。在本文中,可以認(rèn)為事件、結(jié)果、數(shù)據(jù)、對(duì)象都是一樣的,不用特意區(qū)分。

Stream 與 Future

Stream 和 Future 是 Dart 異步處理的核心 API。Future 表示稍后獲得的一個(gè)數(shù)據(jù),所有異步的操作的返回值都用 Future 來(lái)表示。但是 Future 只能表示一次異步獲得的數(shù)據(jù)。而 Stream 表示多次異步獲得的數(shù)據(jù)。比如界面上的按鈕可能會(huì)被用戶點(diǎn)擊多次,所以按鈕上的點(diǎn)擊事件(onClick)就是一個(gè) Stream 。簡(jiǎn)單地說(shuō),F(xiàn)uture將返回一個(gè)值,而Stream將返回多次值。

另外一點(diǎn), Stream 是流式處理,比如 IO 處理的時(shí)候,一般情況是每次只會(huì)讀取一部分?jǐn)?shù)據(jù)(具體取決于實(shí)現(xiàn))。和一次性讀取整個(gè)文件的內(nèi)容相比,Stream 的好處是處理過程中內(nèi)存占用較小。而 File 的 readAsString(異步讀,返回 Future)或 readAsStringSync(同步讀,返回 String)等方法都是一次性讀取整個(gè)文件的內(nèi)容進(jìn)來(lái),雖然獲得完整內(nèi)容處理起來(lái)比較方便,但是如果文件很大的話就會(huì)導(dǎo)致內(nèi)存占用過大的問題。


基本使用

獲得 Stream

Dart 中統(tǒng)一使用 Stream 處理異步事件流,所以可以獲得 Stream 的地方很多。為了方便演示,這里先介紹2種獲取 Stream 的方式。

  1. 將集合(Iterable)包裝為 Stream
    Stream 有3個(gè)工廠構(gòu)造函數(shù):fromFuture、fromIterable 和 periodic,分別可以通過一個(gè) Future、Iterable或定時(shí)觸發(fā)動(dòng)作作為 Stream 的事件源構(gòu)造 Stream。下面的代碼就是通過一個(gè) List 構(gòu)造的 Stream。
var data = [1, 2, 3, 4];
var stream = new Stream.fromIterable(data);

對(duì)集合的包裝只是簡(jiǎn)單地模擬異步,定時(shí)觸發(fā)、IO輸入、UI事件等現(xiàn)實(shí)情況才是真正的異步事件。

  1. 使用 Stream 讀文件
    讀文件的方式有多種,其中一種是使用 Stream 獲得文件內(nèi)容。File 的方法 openRead() 返回一個(gè) Stream>,List 可以理解為一個(gè) byte array,因?yàn)?Dart 中沒有 byte 類型。下面的代碼將打開當(dāng)前程序的源代碼的 Stream 輸入流。
var stream = new File(new Options().script).openRead();

訂閱 Stream

當(dāng)你有了一個(gè) Stream 時(shí),最常用的功能就是通過 listen() 方法訂閱 Stream 上發(fā)出的數(shù)據(jù)(即事件)。有事件發(fā)出時(shí)就會(huì)通知訂閱者。如果在發(fā)出事件的同時(shí)添加訂閱者,那么要在訂閱者在該事件發(fā)出后才會(huì)生效。如果訂閱者取消了訂閱,那么它會(huì)立即停止接收事件。

我們?cè)诮邮找粋€(gè)輸入流的時(shí)候要面臨幾種不同的情況和狀態(tài),最基本的是處理收到數(shù)據(jù),此外上游還可能出現(xiàn)錯(cuò)誤,以及出現(xiàn)錯(cuò)誤時(shí)是否繼續(xù)后續(xù)數(shù)據(jù)的處理,最后在輸入完成的時(shí)候還有一個(gè)結(jié)束狀態(tài)。所以 listen 方法的幾個(gè)參數(shù)分別對(duì)應(yīng)這些情況和狀態(tài):

onData,處理收到的數(shù)據(jù)的 callback
onError,處理遇到錯(cuò)誤時(shí)的 callback
onDone,結(jié)束時(shí)的通知 callback
unsubscribeOnError,遇到第一個(gè)錯(cuò)誤時(shí)是否停止(也就是取消訂閱),默認(rèn)為false

onData 是唯一必填參數(shù),也是用的最多的,后面3個(gè)是可選的命名參數(shù)。

下面我們訂閱一個(gè) Stream 的數(shù)據(jù),收到數(shù)據(jù)時(shí)只是簡(jiǎn)單地打印出來(lái):

var data = [1, 2, 3, 4];
var stream = new Stream.fromIterable(data);

stream.listen((e)=>print(e), onDone: () => print('Done'));
// => 1, 2, 3, 4
// => Done

上面的代碼會(huì)先打印出從 Stream 收到的每個(gè)數(shù)字,最后打印一個(gè)‘Done’。

當(dāng) Stream 中的所有數(shù)據(jù)發(fā)送完時(shí),就會(huì)觸發(fā) onDone 的調(diào)用,但提前取消訂閱不會(huì)觸發(fā) onDone 。在結(jié)束的同時(shí)(收到 onDone 事件之前),所有的訂閱者都被取消了訂閱,此時(shí) Stream 上便沒有訂閱者了。允許對(duì)一個(gè)已經(jīng)結(jié)束了的 Stream 再添加訂閱者(盡管沒什么意義),此時(shí)只會(huì)立刻收到一個(gè) onDone 事件。

stream.listen(print, onDone: () {
print('first done');
//listen again
stream.listen(print, onDone:() => print('second done'));
});
// => data: 1,2,3,4,
// => first done
// => no data, because stream is done
// => second done

上面的代碼中,首先我們?cè)?onDone 的回調(diào)中打印了 ‘first done’ 表示第一次結(jié)束。此時(shí) stream 上已經(jīng)沒有訂閱者了,但接著我們又再次訂閱了這個(gè) stream。這一次沒有再收到數(shù)據(jù),而是馬上打印出了 ‘second done’ 表示第二次訂閱的結(jié)束。

高級(jí)訂閱管理

前面的示例代碼會(huì)處理 Stream 發(fā)出的所有數(shù)據(jù),直到 Stream 結(jié)束。如果想提前取消處理怎么辦?listen() 方法會(huì)返回一個(gè) StreamSubscription 對(duì)象,用于提供對(duì)訂閱的管理控制。onData、onError和onDone 這3個(gè)方法分別用于設(shè)置(如果listen方法中的參數(shù)為null)或覆蓋對(duì)應(yīng)的 callback。cancel、pause和resume分別用于取消訂閱、暫停和繼續(xù)。比如,可以在 listen 方法中參數(shù)置為 null,接著通過 subscription 對(duì)象設(shè)置 callback 。此外,cancel 方法也重要,要么一直處理數(shù)據(jù)直到 stream 結(jié)束,要么提前取消訂閱結(jié)束處理。比如使用 Stream 讀文件,為了使資源得到釋放,要么讀完整個(gè)文件,要么使用 subscription 的 cancel 方法取消訂閱(即終止后續(xù)數(shù)據(jù)的讀?。?梢钥闯?,這里的 cancel 相當(dāng)于傳統(tǒng)意義上的 close 方法。最后,pause和resume方法是嘗試向數(shù)據(jù)源發(fā)出暫停和繼續(xù)的請(qǐng)求,其意義取決于實(shí)際情況,并且不保證一定能生效。比如數(shù)據(jù)源能夠支持,或者是帶緩沖實(shí)現(xiàn)的 stream 才能做到暫停。

var sub = stream.listen(null);
sub.onData(print);
sub.onError((e)=>print('error $e'));
sub.onDone(()=>print('done'));
// => 1, 2, 3, 4, done

上面的代碼與前面的 listen 示例代碼作用相同。

var sub = stream.listen(null);
sub.onData((e){
if(e > 2)
sub.cancel();
else
print(e);
});
sub.onDone(()=>print('done'));
// => 1, 2
// no 'done', because stream is cancel.

上面的代碼最后會(huì)打印出1和2,但不會(huì)打印出‘done’ 。首先,listen 中的參數(shù)為 null,也就是沒有訂閱者。然后,通過 listen 的返回者 subscription 對(duì)象設(shè)置了 onData 和 onDone 的處理,這時(shí)才有了訂閱者。在 onData 中,如果收到的數(shù)字大于2就取消后續(xù)處理,因此到數(shù)字 3 的時(shí)候就沒有打印 3,而是立即結(jié)束了處理,這樣后面的 4 也不會(huì)出現(xiàn)了。既然是提前退出,所以 onDone 也是不會(huì)觸發(fā)的。

Stream 兩種訂閱模式

Stream有兩種訂閱模式:?jiǎn)斡嗛?single)和多訂閱(broadcast)。單訂閱就是只能有一個(gè)訂閱者,而廣播是可以有多個(gè)訂閱者。這就有點(diǎn)類似于消息服務(wù)(Message Service)的處理模式。單訂閱類似于點(diǎn)對(duì)點(diǎn),在訂閱者出現(xiàn)之前會(huì)持有數(shù)據(jù),在訂閱者出現(xiàn)之后就才轉(zhuǎn)交給它。而廣播類似于發(fā)布訂閱模式,可以同時(shí)有多個(gè)訂閱者,當(dāng)有數(shù)據(jù)時(shí)就會(huì)傳遞給所有的訂閱者,而不管當(dāng)前是否已有訂閱者存在。

Stream 默認(rèn)處于單訂閱模式,所以同一個(gè) stream 上的 listen 和其它大多數(shù)方法只能調(diào)用一次,調(diào)用第二次就會(huì)報(bào)錯(cuò)。但 Stream 可以通過 transform() 方法(返回另一個(gè) Stream)進(jìn)行連續(xù)調(diào)用。通過 Stream.asBroadcastStream() 可以將一個(gè)單訂閱模式的 Stream 轉(zhuǎn)換成一個(gè)多訂閱模式的 Stream,isBroadcast 屬性可以判斷當(dāng)前 Stream 所處的模式。

assert(stream.isBroadcast == false);
stream.first.then(print);
stream.last.then(print);// Bad state: Stream already has subscriber.

上的代碼需要分別打印出 stream 的第一個(gè)數(shù)據(jù)和最后一個(gè)數(shù)據(jù),但是單模式 Stream 只能訂閱一次,所以直接出錯(cuò)了。當(dāng)然,Stream 是異步的,所以 first 也沒有打印出來(lái)。

var bs = stream.asBroadcastStream();
assert(bs.isBroadcast == true);
bs.first.then(print);
bs.last.then(print);
// OK => 1, 4

上面的代碼,我們把單模式 Stream 轉(zhuǎn)成了多訂閱的 Stream,所以可以 first 和 last 都打印出來(lái)了。

按前面說(shuō)的,單訂閱模式會(huì)持有數(shù)據(jù),多訂閱模式如果沒有及時(shí)添加訂閱者則可能丟數(shù)據(jù)。不過具體取決于 stream 的實(shí)現(xiàn)。

new Timer(new Duration(seconds:5), ()=>stream.listen(print));
// after 5 second, it output 1,2,3,4

上面的代碼利用 Timer 延遲了5秒才訂閱 stream,但仍然輸出了數(shù)據(jù)。因?yàn)槲覀冞@里的這個(gè) stream 是單訂閱模式,它在有訂閱者后才會(huì)發(fā)出事件。那么多訂閱模式就一定會(huì)漏掉數(shù)據(jù)嗎?

var bs = stream.asBroadcastStream();
new Timer(new Duration(seconds:5), ()=>bs.listen(print));
// after 5 second, it also output 1,2,3,4
// because asBroadcastStream() is a simple wrap,
// it don't change the source stream's feature

上面我們把原始的單訂閱模式轉(zhuǎn)成了多訂閱模式的 Stream,此時(shí)可以添加多個(gè)訂閱者。我們5秒后才在 broadcast stream 上添加了訂閱者,但它依然輸出了 1,2,3,4 ,并沒有漏掉數(shù)據(jù)。這其實(shí)是因?yàn)?asBroadcastStream() 只是對(duì)原始 stream 的封裝,并不改變?cè)?stream 的實(shí)現(xiàn)特性。所以這個(gè) broadcast stream 同樣在等待有訂閱者之后才發(fā)出數(shù)據(jù)。但是如果一旦有了第一個(gè)訂閱者,然后再延遲添加第二個(gè)訂閱者就會(huì)漏數(shù)據(jù)了。

var bs = stream.asBroadcastStream();
// add first listener
new Timer(new Duration(seconds:5), ()=>bs.listen(print));
// after 5 second, it output 1,2,3,4

// add second listener
new Timer(new Duration(seconds:10), ()=>bs.listen(print));
// after 10 second, nothing output, because stream is done

再來(lái)看另外一個(gè)例子,我們自己來(lái)創(chuàng)建一個(gè) Stream。StreamController 用于創(chuàng)建 Stream,它有兩個(gè)構(gòu)造函數(shù),分別用于創(chuàng)建單訂閱模式 Stream 和 多訂閱模式 Stream。然后可以利用 add()、addError() 和 close() 方法發(fā)送事件、發(fā)送錯(cuò)誤和結(jié)束,這三個(gè)方法來(lái)自 EventSink,是各種 Sink 上的通用方法。

// build single stream
//var controller = new StreamController();

// build broadcast stream
var controller = new StreamController.broadcast();
//send event
controller..add(1)
..add(2)
..add(3)
..add(4);
//send done
controller.close();

var myStream = controller.stream;
new Timer(new Duration(seconds:5), ()=>myStream.listen(print));
//if myStream is single stream, it output 1,2,3,4
//if myStream is broadcast stream, it output nothing, because stream is done.

Stream 的集合特性

前面說(shuō)過,Stream 和一般的集合類似,都是一組數(shù)據(jù),只不過一個(gè)是異步推送,一個(gè)是同步拉取。所以他們都很多共同的方法。例如:

stream.any((e) => e > 2).then(print);// stream.any()
print([1,2,3,4].any((e) => e > 2));// iterable.any()
// => true, true

比如 Stream 和 集合 都有 any() 方法,集合是同步的(但是惰性執(zhí)行,這里因?yàn)橛?print 調(diào)用,所以立刻執(zhí)行了)并直接返回結(jié)果, Stream 上的 any() 方法是異步的,返回的是 Future 。方法本身的含義都是一樣的。上面的代碼雖然 stream 的 any 方法在前,但因?yàn)槭钱惒降?,所以的輸出在后?/p>

在列舉其它 Stream 和 Iterable 通用的方法:

//常見集合方法
stream.first.then(print);
stream.firstWhere((e)=>e>3, defaultValue:()=>0).then(print);
stream.last.then(print);
stream.lastWhere((e)=>e>3, defaultValue:()=>0).then(print);
stream.length.then(print);
stream.isEmpty.then(print);

stream.any((e) => e > 2).then(print);
stream.every((e) => e > 2).then(print);
stream.contains(3).then(print);
stream.elementAt(2).then(print);
stream.where((e) => e >2).listen(print);

stream.skip(2).listen(print);
stream.skipWhile((e) => e < 2).listen(print);
stream.take(2).listen(print);
stream.takeWhile((e)=>e<3).listen(print);

stream.map((e) => e*2).listen(print);
stream.reduce(0, (p, c) => p + c).then(print);
stream.expand((e) => [e, e]).listen(print);

stream.toList().then(print);
stream.toSet().then(print);

注意以上方法同時(shí)只能使用一次,因?yàn)槭菃斡嗛喣J?。此外,如果方法只有一個(gè)返回值,即數(shù)據(jù)收斂類型的方法,那么返回就是一個(gè) Future。如果是只是數(shù)據(jù)轉(zhuǎn)換的方法,如 map ,返回的還是一個(gè) Stream,只是數(shù)據(jù)數(shù)據(jù)的類型和數(shù)量變了。看到這么多 Stream 與 Iterable 相同的方法,大家應(yīng)該更清楚 Stream 其實(shí)也是個(gè)數(shù)據(jù)集合。

通用數(shù)據(jù)收斂方法

集合中有很多方法只返回一個(gè)值,多個(gè)數(shù)據(jù)作為輸入、一個(gè)數(shù)據(jù)作為輸出的方法就是數(shù)據(jù)收斂的方法。Stream 有一個(gè)更通用的收斂方法 pipe() 。pipe() 方法的參數(shù)要求是一個(gè) StreamConsumer 接口的實(shí)現(xiàn),該接口只有一個(gè)方法: Future consume(Stream stream)

class DataConsumer implements StreamConsumer{
Future consume(Stream stream){
return stream.reduce(0, (c,p)=>c+p);
}
}

stream.pipe(new DataConsumer()).then(print);
// => 10

// equivalent below
stream.reduce(0, (p, c) => p + c).then(print);

上面我們自己實(shí)現(xiàn)了一個(gè) StreamConsumer ,它只是對(duì) Stream 的數(shù)據(jù)求和,并返回該結(jié)果。這個(gè)簡(jiǎn)單的例子實(shí)際意義不大。但這里只是為了演示這個(gè)通用 pipe() 方法和 StreamConsumer 接口的意義。

通用數(shù)據(jù)轉(zhuǎn)換方法

除了數(shù)據(jù)收斂方法,Stream 也有自己通用的數(shù)據(jù)轉(zhuǎn)換方法 transform() 。類似于 Future 的連續(xù)調(diào)用,Stream 也可以連續(xù)調(diào)用。 transform 方法就是把一個(gè) Stream 作為輸入,然后經(jīng)過計(jì)算或數(shù)據(jù)轉(zhuǎn)換,輸出為另一個(gè) Stream。另一個(gè) Stream 中的數(shù)據(jù)類型可以不同于原類型,數(shù)據(jù)多少也可以不同(比如實(shí)現(xiàn)一個(gè)數(shù)據(jù)的 buffer )。

transform 的方法簽名是:
Stream transform(StreamTransformer streamTransformer)

下面我們構(gòu)造一個(gè) StreamTransformer ,然后使用 Stream 的 transform() 進(jìn)行轉(zhuǎn)換:

var transformer = new StreamTransformer(
handleData: (e, sink){
sink.add(e*2);
}
);
stream.transform(transformer).listen(print);

// equivalent below
stream.map((e) => e*2).listen(print);

class MyTransformer extends StreamEventTransformer {
handleData(e, sink){
sink.add(e*2);
}
}

stream.transform(new MyTransformer()).listen(print);

使用 StreamTransformer 接口的工廠構(gòu)造函數(shù) 或者 繼承 StreamEventTransformer 都可以構(gòu)造一個(gè) transformer 。其本質(zhì)和我們處理一個(gè) Stream 是一樣的,就要要處理 handleData、handleError 和 handleDone 這三件事。上面的 transform 和 map 方法類似,但是 transform 方法比 map 方法更靈活。map 只能做1對(duì)1的轉(zhuǎn)換,而 transform 并沒有這個(gè)要求,因?yàn)樗抢?sink 來(lái)添加數(shù)據(jù),而不是返回轉(zhuǎn)換結(jié)果。transform 方法和 StreamTransformer 接口是一種更通用的設(shè)計(jì)。

舉個(gè)更實(shí)用點(diǎn)例子,Dart 中的 StringDecoder 和 StringEncoder 就是一個(gè) StreamTransformer,負(fù)責(zé)實(shí)現(xiàn) byte stream 和 String stream 之間的轉(zhuǎn)換。LineTransformer 是切分行的 transformer。比如,使用 Stream 讀文件需要先將字節(jié)轉(zhuǎn)換為字符,然后還可以按行讀取。

file.openRead()
.transform(new StringDecoder())
.transform(new LineTransformer()) .listen(your_process);

注意,不管是 Stream.map() 還是 Stream.transform() ,他們都是在做轉(zhuǎn)換,而非訂閱。對(duì)于單模式 Stream ,如果沒有添加訂閱者,那么轉(zhuǎn)換方法根本不會(huì)執(zhí)行(可能是由于是惰性執(zhí)行的緣故)。

stream.map((e){
print(e);
return e*2;
});
// nothing output, because lazy evaluate

class MyTransformer extends StreamEventTransformer {
handleData(e, sink){
print(e);
sink.add(e*2);
}
}
stream.transform(new MyTransformer());
// nothing output, because no subscription

上面的示例中,都在轉(zhuǎn)換過程中做了輸出,但實(shí)際不會(huì)輸出內(nèi)容,因?yàn)闆]有用 listen 添加訂閱者。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容