Stream和 Future都是Dart:async庫的核心API,對異步提供了非常好的支持。
我思考了很久,究竟應(yīng)該如何向大家介紹Stream(流)。因?yàn)镾tream非常有用,它是為處理異步事件而生的。而在應(yīng)用中有大量的場景需要使用異步事件,例如請求網(wǎng)絡(luò),和用戶交互等等,它們都無法同步完成。Stream能夠極大的幫助我們處理這些問題。??
但是對于剛接觸的新手來說,流確實(shí)足夠抽象,以至于大家需要花費(fèi)非常多的時(shí)間來理解它。
所以我將會(huì)盡我所能向大家介紹Stream。
?????Stream
??什么是Stream
Stream非常有特點(diǎn)但卻不太好理解,我與其按照字面意思把它看作流,更愿意把它看成一個(gè)工廠或者是機(jī)器。
<figcaption></figcaption>
我們來看看這個(gè)機(jī)器它有什么特點(diǎn):
- 它有一個(gè)入口,可以放東西/指令(anything)
- 這個(gè)機(jī)器不知道入口什么時(shí)候會(huì)放東西進(jìn)來
- 中間的機(jī)器能夠生產(chǎn)或者加工,這應(yīng)該會(huì)耗費(fèi)一些時(shí)間
- 他有一個(gè)出口,應(yīng)該會(huì)有產(chǎn)品從那出來
- 我們也不知道到底什么時(shí)候產(chǎn)品會(huì)從出口出來
整個(gè)過程,時(shí)間都是一個(gè)不確定因素,我們隨時(shí)都可以向這個(gè)機(jī)器的入口放東西進(jìn)去,放進(jìn)去了以后機(jī)器進(jìn)行處理,但是我們并不知道它多久處理完。所以出口是需要專門派人盯著的,等待機(jī)器流出東西來。整個(gè)過程都是以異步的眼光來看的。
??我們將機(jī)器模型轉(zhuǎn)化成Stream
[圖片上傳失敗...(image-c01deb-1561013840446)]
<figcaption></figcaption>
- 這個(gè)大機(jī)器就是StreamController,它是創(chuàng)建流的方式之一。
- StreamController有一個(gè)入口,叫做sink
- sink可以使用add方法放東西進(jìn)來,放進(jìn)去以后就不再關(guān)心了。
- 當(dāng)有東西從sink進(jìn)來以后,我們的機(jī)器就開始工作啦,空空空。
- StreamController有一個(gè)出口,叫做stream
- 機(jī)器處理完畢后就會(huì)把產(chǎn)品從出口丟出來,但是我們并不知道什么時(shí)候會(huì)出來,所以我們需要使用listen方法一直監(jiān)聽這個(gè)出口。
- 而且當(dāng)多個(gè)物品被放進(jìn)來了之后,它不會(huì)打亂順序,而是先入先出。
通過這個(gè)例子,相信大家對流應(yīng)該都有了基礎(chǔ)印象,那么要解釋后面的東西就不難了。
??如何使用Stream
獲得Stream的方法:
- 通過構(gòu)造函數(shù)
- 使用StreamController
- IO Stream
stream有三個(gè)構(gòu)造方法:
Stream.fromFuture:從Future創(chuàng)建新的單訂閱流,當(dāng)future完成時(shí)將觸發(fā)一個(gè)data或者error,然后使用Down事件關(guān)閉這個(gè)流。
Stream.fromFutures:從一組Future創(chuàng)建一個(gè)單訂閱流,每個(gè)future都有自己的data或者error事件,當(dāng)整個(gè)Futures完成后,流將會(huì)關(guān)閉。如果Futures為空,流將會(huì)立刻關(guān)閉。
Stream.fromIterable:創(chuàng)建從一個(gè)集合中獲取其數(shù)據(jù)的單訂閱流。
Stream.fromIntreable([1,2,3]);
復(fù)制代碼
??監(jiān)聽Stream的方法
監(jiān)聽一個(gè)流最常見的方法就是listen。當(dāng)有事件發(fā)出時(shí),流將會(huì)通知listener。Listen方法提供了這幾種觸發(fā)事件:
- onData(必填):收到數(shù)據(jù)時(shí)觸發(fā)
- onError:收到Error時(shí)觸發(fā)
- onDone:結(jié)束時(shí)觸發(fā)
- unsubscribeOnError:遇到第一個(gè)Error時(shí)是否取消訂閱,默認(rèn)為false
?? StreamController
如果你想創(chuàng)建一條新的流的話,非常簡單!?? 使用StreamController,它為你提供了非常豐富的功能,你能夠在streamController上發(fā)送數(shù)據(jù),處理錯(cuò)誤,并獲得結(jié)果!
//任意類型的流
StreamController controller = StreamController();
controller.sink.add(123);
controller.sink.add("xyz");
controller.sink.add(Anything);
//創(chuàng)建一條處理int類型的流
StreamController<int> numController = StreamController();
numController.sink.add(123);
復(fù)制代碼
泛型定義了我們能向流上推送什么類型的數(shù)據(jù)。它可以是任何類型!
我們再來看看如何獲取最后的結(jié)果。
StreamController controller = StreamController();
//監(jiān)聽這個(gè)流的出口,當(dāng)有data流出時(shí),打印這個(gè)data
StreamSubscription subscription =
controller.stream.listen((data)=>print("$data"));
controller.sink.add(123);
復(fù)制代碼
輸出: 123
你需要將一個(gè)方法交給stream的listen函數(shù),這個(gè)方法入?yún)?data)是我們的StreamController處理完畢后產(chǎn)生的結(jié)果,我們監(jiān)聽出口,并獲得了這個(gè)結(jié)果(data)。這里可以使用lambda表達(dá)式,也可以是其他任何函數(shù)。
(這里我為了方便區(qū)分,把listen說成函數(shù),(data)=>print(data)說成方法,其實(shí)是一個(gè)東西。)
??Transforming an existing stream
假如你已經(jīng)有了一個(gè)流,你可以通過它轉(zhuǎn)化成為一條新的流。非常簡單!流提供了map(),where(),expand(),和take()方法,能夠輕松將已有的流轉(zhuǎn)化為新的流。
where
如果你想要篩選掉一些不想要的事件。例如一個(gè)猜數(shù)游戲,用戶可以輸入數(shù)字,當(dāng)輸入正確的時(shí)候,我們做出一定反應(yīng)。而我們必須篩選掉所有錯(cuò)誤的答案,這個(gè)時(shí)候我們可以使用where篩選掉不需要的數(shù)字。
stream.where((event){...})
復(fù)制代碼
where函數(shù)接收一個(gè)事件,每當(dāng)這個(gè)流有東西流到where函數(shù)的時(shí)候,這就是那個(gè)事件。我們或許根本不需要這個(gè)事件,但是必須作為參數(shù)傳入。
take
如果你想要控制這個(gè)流最多能傳多少個(gè)東西。比如輸入密碼,我們可能想讓用戶最多輸四次,那么我們可以使用take來限制。
stream.take(4);
復(fù)制代碼
take函數(shù)接收一個(gè)int,代表最多能經(jīng)過take函數(shù)的事件次數(shù)。當(dāng)傳輸次數(shù)達(dá)到這個(gè)數(shù)字時(shí),這個(gè)流將會(huì)關(guān)閉,無法再傳輸。
transform
如果你需要更多的控制轉(zhuǎn)換,那么請使用transform()方法。他需要配合StreamTransformer進(jìn)行使用。我們先來看下面一段猜數(shù)游戲,然后我會(huì)向你解釋。
StreamController<int> controller = StreamController<int>();
final transformer = StreamTransformer<int,String>.fromHandlers(
handleData:(value, sink){
if(value==100){
sink.add("你猜對了");
}
else{ sink.addError('還沒猜中,再試一次吧');
}
});
controller.stream
.transform(transformer)
.listen(
(data) => print(data),
onError:(err) => print(err));
controller.sink.add(23);
//controller.sink.add(100);
復(fù)制代碼
輸出: 還沒猜中,再試一次吧
StreamTransformer<S,T>是我們stream的檢查員,他負(fù)責(zé)接收stream通過的信息,然后進(jìn)行處理返回一條新的流。
- S代表之前的流的輸入類型,我們這里是輸入一個(gè)數(shù)字,所以是int。
- T代表轉(zhuǎn)化后流的輸入類型,我們這里add進(jìn)去的是一串字符串,所以是String。
- handleData接收一個(gè)value并創(chuàng)建一條新的流并暴露sink,我們可以在這里對流進(jìn)行轉(zhuǎn)化。
- 我們還可以addError進(jìn)去告訴后面有問題。
然后我們監(jiān)聽transform之后的流,當(dāng)轉(zhuǎn)換好的event流出時(shí),我們打印這個(gè)event,這個(gè)event就是我們剛才add進(jìn)sink的數(shù)據(jù)。onError能夠捕捉到我們add進(jìn)去的err。
??Stream的種類
流有兩種
- "Single-subscription" streams 單訂閱流
- "broadcast" streams 多訂閱流
"Single-subscription" streams
單個(gè)訂閱流在流的整個(gè)生命周期內(nèi)僅允許有一個(gè)listener。它在有收聽者之前不會(huì)生成事件,并且在取消收聽時(shí)它會(huì)停止發(fā)送事件,即使你仍然在Sink.add更多事件。
即使在第一個(gè)訂閱被取消后,也不允許在單個(gè)訂閱流上進(jìn)行兩次偵聽。
單訂閱流通常用于流式傳輸更大的連續(xù)數(shù)據(jù)塊,如文件I / O.
StreamController controller = StreamController();
controller.stream.listen((data)=> print(data));
controller.stream.listen((data)=> print(data));
controller.sink.add(123);
復(fù)制代碼
輸出: Bad state: Stream has already been listened to. 單訂閱流不能有多個(gè)收聽者。
"Broadcast" streams
廣播流允許任意數(shù)量的收聽者,且無論是否有收聽者,他都能產(chǎn)生事件。所以中途進(jìn)來的收聽者將不會(huì)收到之前的消息。
如果多個(gè)收聽者想要收聽單個(gè)訂閱流,請使用asBroadcastStream在非廣播流之上創(chuàng)建廣播流。
如果在觸發(fā)事件時(shí)將收聽者添加到廣播流,則該偵聽器將不會(huì)接收當(dāng)前正在觸發(fā)的事件。如果取消收聽,收聽者會(huì)立即停止接收事件。
一般的流都是單訂閱流。從Stream繼承的廣播流必須重寫isBroadcast 才能返回true。
StreamController controller = StreamController();
//將單訂閱流轉(zhuǎn)化為廣播流
Stream stream = controller.stream.asBroadcastStream();
stream.listen((data)=> print(data));
stream.listen((data)=> print(data));
controller.sink.add(123);
復(fù)制代碼
輸出: 123 123