說明
Stream 也是用于接收異步事件數(shù)據(jù),和 Future 不同的是,它可以接收多個異步操作的結果(成功或失?。?也就是說,在執(zhí)行異步任務時,可以通過多次觸發(fā)成功或失敗事件來傳遞結果數(shù)據(jù)或錯誤異常。簡單理解,其實就是一個異步數(shù)據(jù)隊列而已。我們知道隊列的特點是先進先出的,Stream也正是如此。 Stream 常用于會多次讀取數(shù)據(jù)的異步任務場景,如網(wǎng)絡內(nèi)容下載、文件讀寫等。在Dart語言中,Stream有兩種類型,一種是點對點的單訂閱流(Single-subscription),另一種則是廣播流。
stream幾個構造方法:
Stream.periodic:構造一個周期流,有2個參數(shù)第一個參數(shù)為間隔時間,第二個參數(shù)為回調函數(shù)。
Stream.fromFuture:從Future創(chuàng)建新的單訂閱流,當future完成時將觸發(fā)一個data或者error,然后使用Down事件關閉這個流。
Stream.fromFutures:從一組Future創(chuàng)建一個單訂閱流,每個future都有自己的data或者error事件,當整個Futures完成后,流將會關閉。如果Futures為空,流將會立刻關閉。
Stream.fromIterable:創(chuàng)建從一個集合中獲取其數(shù)據(jù)的單訂閱流。
Stream.value:用于從單個值創(chuàng)建Stream。
void main(){
test();
}
test() async{
// 1,使用 periodic 創(chuàng)建流,第一個參數(shù)為間隔時間,第二個參數(shù)為回調函數(shù)
Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), callback);
// 2,從Future創(chuàng)建Stream
Future<String> fut = Future((){
return "async task";
});
Stream<String> stream = Stream<String>.fromFuture(fut);
//3, 將多個Future放入一個列表中,將該列表傳入
Future<String> fut1 = Future((){
// 模擬耗時5秒
sleep(Duration(seconds:5));
return "async task1";
});
Future<String> fut2 = Future((){
return "async task2";
});
Stream<String> stream = Stream<String>.fromFutures([fut1,fut2]);
// 4,從一個列表創(chuàng)建`Stream`
Stream<int> stream = Stream<int>.fromIterable([1,2,3]);
// 5,value
Stream<bool> stream = Stream<bool>.value(false);
// await for循環(huán)從流中讀取
await for(var i in stream){
print(i);
}
}
// 可以在回調函數(shù)中對值進行處理,這里直接返回了
int callback(int value){
return value;
}
監(jiān)聽 Stream
監(jiān)聽Stream,并從中獲取數(shù)據(jù)也有三種方式:
一種就是我們上文中使用的await for循環(huán),這也是官方推薦的方式,看起來更簡潔友好;
使用forEach方法;
使用listen方法StreamSubscription<T> listen(void onData(T event), {Function onError, void onDone(), bool cancelOnError})。
onData(必填):收到數(shù)據(jù)時觸發(fā);onError:收到Error時觸發(fā);onDone:結束時觸發(fā);unsubscribeOnError:遇到第一個Error時是否取消訂閱,默認為false。
test() async{
Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), callback);
//1, await for循環(huán)從流中讀取
await for(var i in stream){
print(i);
}
//2, 使用forEach,傳入一個函數(shù)進去獲取并處理數(shù)據(jù)
stream.forEach((int x){
print(x);
});
//3,listen
stream.listen(
(x)=>print(x),
onError: (e)=>print(e),
onDone: ()=>print("onDone"));
}
Stream的一些方法
take 和 takeWhile
Stream<T> take(int count) 用于限制Stream中的元素數(shù)量;
Stream<T>.takeWhile(bool test(T element)) 與 take作用相似,只是它的參數(shù)是一個函數(shù)類型,且返回值必須是一個bool值.
skip 和 skipWhile
Stream<T> skip (int count),跳過元素。請注意,該方法只是從Stream中獲取元素時跳過,被跳過的元素依然是被執(zhí)行了的,所耗費的時間依然存在,其實只是跳過了執(zhí)行完的結果而已。
Stream<T> skipWhile(bool test(T element)) 方法與takeWhile用法是相同的,傳入一個函數(shù)對結果進行判斷,表示跳過滿足條件的
toList
Future<List<T>> toList() 表示將Stream中所有數(shù)據(jù)存儲在List中
屬性 length
stream.length 等待并獲取流中所有數(shù)據(jù)的數(shù)量
Stream 2個輔助類
StreamController
Stream的一個幫助類,可用于整個 Stream 過程的控制。使用該類時,需要導入'dart:async',其add方法和sink.add方法是相同的,都是用于放入一個元素,addError方法用于產(chǎn)生一個錯誤,監(jiān)聽方法中的onError可獲取錯誤。
import 'dart:async';
void main() {
test();
}
test() async{
// 創(chuàng)建
StreamController streamController = StreamController();
//也可以傳入一個指定的stream sc.addStream(stream);
// 放入事件
streamController.add('element_1');
streamController.addError("this is error");
streamController.sink.add('element_2');
streamController.stream.listen(
print,
onError: print,
onDone: ()=>print("onDone"));
}
StreamController的原型,它有5個可選參數(shù):factory StreamController( {void onListen(),void onPause(),void onResume(),onCancel(),bool sync: false}),可以調用對于的方法控制流,同時回調處理對于的節(jié)點。
onListen 注冊監(jiān)聽時回調
onPause 當流暫停時回調
onResume 當流恢復時回調
onCancel 當監(jiān)聽器被取消時回調
sync 當值為true時表示同步控制器SynchronousStreamController,默認值為false,表示異步控制器
StreamTransformer
該類可以使我們在Stream上執(zhí)行數(shù)據(jù)轉換。然后,這些轉換被推回到流中,以便該流注冊的所有監(jiān)聽器可以接收;
構造方法:factory StreamTransformer.fromHandlers({ void handleData(S data, EventSink<T> sink),
void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
void handleDone(EventSink<T> sink)})
handleData:響應從流中發(fā)出的任何數(shù)據(jù)事件。提供的參數(shù)是來自發(fā)出事件的數(shù)據(jù),以及EventSink<T>,表示正在進行此轉換的當前流的實例
handleError:響應從流中發(fā)出的任何錯誤事件
handleDone:當流不再有數(shù)據(jù)要處理時調用。通常在流的close()方法被調用時回調
void test() {
StreamController sc = StreamController<int>();
// 創(chuàng)建 StreamTransformer對象
StreamTransformer stf = StreamTransformer<int, double>.fromHandlers(
handleData: (int data, EventSink sink) {
// 操作數(shù)據(jù)后,轉換為 double 類型
sink.add((data * 2).toDouble());
},
handleError: (error, stacktrace, sink) {
sink.addError('wrong: $error');
},
handleDone: (sink) {
sink.close();
},
);
// 調用流的transform方法,傳入轉換對象
Stream stream = sc.stream.transform(stf);
stream.listen(print);
// 添加數(shù)據(jù),這里的類型是int
sc.add(1);
sc.add(2);
sc.add(3);
// 調用后,觸發(fā)handleDone回調
// sc.close();
}
//輸出結果:
//2.0
//4.0
//6.0
廣播流
廣播流則可以允許多個監(jiān)聽器存在,就如同廣播一樣,凡是監(jiān)聽了廣播流,每個監(jiān)聽器都能獲取到數(shù)據(jù)。要注意,如果在觸發(fā)事件時將監(jiān)聽者正添加到廣播流,則該監(jiān)聽器將不會接收當前正在觸發(fā)的事件。如果取消監(jiān)聽,監(jiān)聽者會立即停止接收事件。
有兩種方式創(chuàng)建廣播流,一種直接從Stream創(chuàng)建,另一種使用StreamController創(chuàng)建。
test() async{
// 調用 Stream 的 asBroadcastStream 方法創(chuàng)建
Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), (e)=>e)
.asBroadcastStream();
stream = stream.take(5);
stream.listen(print);
stream.listen(print);
}
test() async{
// 創(chuàng)建廣播流
StreamController sc = StreamController.broadcast();
sc.stream.listen(print);
sc.stream.listen(print);
sc.add("event1");
sc.add("event2");
}
作者:逃離_102
鏈接:http://www.itdecent.cn/p/03e6477c1548
來源:簡書
著作權歸作者所有。商業(yè)轉載請聯(lián)系作者獲得授權,非商業(yè)轉載請注明出處。