Dart簡介5--Stream

說明

Stream 也是用于接收異步事件數(shù)據(jù),和 Future 不同的是,它可以接收多個異步操作的結(jié)果(成功或失?。?。 也就是說,在執(zhí)行異步任務(wù)時,可以通過多次觸發(fā)成功或失敗事件來傳遞結(jié)果數(shù)據(jù)或錯誤異常。簡單理解,其實就是一個異步數(shù)據(jù)隊列而已。我們知道隊列的特點是先進先出的,Stream也正是如此。 Stream 常用于會多次讀取數(shù)據(jù)的異步任務(wù)場景,如網(wǎng)絡(luò)內(nèi)容下載、文件讀寫等。在Dart語言中,Stream有兩種類型,一種是點對點的單訂閱流(Single-subscription),另一種則是廣播流。

stream幾個構(gòu)造方法:

  • Stream.periodic:構(gòu)造一個周期流,有2個參數(shù)第一個參數(shù)為間隔時間,第二個參數(shù)為回調(diào)函數(shù)。
  • Stream.fromFuture:從Future創(chuàng)建新的單訂閱流,當future完成時將觸發(fā)一個data或者error,然后使用Down事件關(guān)閉這個流。
  • Stream.fromFutures:從一組Future創(chuàng)建一個單訂閱流,每個future都有自己的data或者error事件,當整個Futures完成后,流將會關(guān)閉。如果Futures為空,流將會立刻關(guān)閉。
  • Stream.fromIterable:創(chuàng)建從一個集合中獲取其數(shù)據(jù)的單訂閱流。
  • Stream.value:用于從單個值創(chuàng)建Stream。
void main(){
  test();
}

test() async{
  // 1,使用 periodic 創(chuàng)建流,第一個參數(shù)為間隔時間,第二個參數(shù)為回調(diào)函數(shù)
  Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), callback);

 // 2,從Future創(chuàng)建Stream
  Future<String> fut = Future((){
      return "async task";
  });
  Stream<String> stream = Stream<String>.fromFuture(fut);

  //3, 將多個Future放入一個列表中,將該列表傳入
  Future<String> fut1 = Future((){
      // 模擬耗時5秒
      sleep(Duration(seconds:5));
      return "async task1";
  });
  Future<String> fut2 = Future((){
      return "async task2";
  });
  Stream<String> stream = Stream<String>.fromFutures([fut1,fut2]);

  // 4,從一個列表創(chuàng)建`Stream`
  Stream<int> stream = Stream<int>.fromIterable([1,2,3]);

  // 5,value
  Stream<bool> stream = Stream<bool>.value(false);

  // await for循環(huán)從流中讀取
  await for(var i in stream){
    print(i);
  }
}

// 可以在回調(diào)函數(shù)中對值進行處理,這里直接返回了
int callback(int value){
  return value;
}

監(jiān)聽 Stream

監(jiān)聽Stream,并從中獲取數(shù)據(jù)也有三種方式:

  • 一種就是我們上文中使用的await for循環(huán),這也是官方推薦的方式,看起來更簡潔友好;
  • 使用forEach方法;
  • 使用listen方法StreamSubscription<T> listen(void onData(T event), {Function onError, void onDone(), bool cancelOnError})。
    onData(必填):收到數(shù)據(jù)時觸發(fā);onError:收到Error時觸發(fā);onDone:結(jié)束時觸發(fā);unsubscribeOnError:遇到第一個Error時是否取消訂閱,默認為false。
test() async{
  Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), callback);

 //1, await for循環(huán)從流中讀取
  await for(var i in stream){
    print(i);
  }

  //2, 使用forEach,傳入一個函數(shù)進去獲取并處理數(shù)據(jù)
  stream.forEach((int x){
    print(x);
  });

  //3,listen
  stream.listen(
    (x)=>print(x),
  onError: (e)=>print(e),
  onDone: ()=>print("onDone"));
}

Stream的一些方法

take 和 takeWhile

Stream<T> take(int count) 用于限制Stream中的元素數(shù)量;
Stream<T>.takeWhile(bool test(T element)) 與 take作用相似,只是它的參數(shù)是一個函數(shù)類型,且返回值必須是一個bool值.

skip 和 skipWhile

Stream<T> skip (int count),跳過元素。請注意,該方法只是從Stream中獲取元素時跳過,被跳過的元素依然是被執(zhí)行了的,所耗費的時間依然存在,其實只是跳過了執(zhí)行完的結(jié)果而已。
Stream<T> skipWhile(bool test(T element)) 方法與takeWhile用法是相同的,傳入一個函數(shù)對結(jié)果進行判斷,表示跳過滿足條件的

toList

Future<List<T>> toList() 表示將Stream中所有數(shù)據(jù)存儲在List中

屬性 length

stream.length 等待并獲取流中所有數(shù)據(jù)的數(shù)量

Stream 2個輔助類

StreamController

Stream的一個幫助類,可用于整個 Stream 過程的控制。使用該類時,需要導(dǎo)入'dart:async',其add方法和sink.add方法是相同的,都是用于放入一個元素,addError方法用于產(chǎn)生一個錯誤,監(jiān)聽方法中的onError可獲取錯誤。

import 'dart:async';

void main() {
  test();
}

test() async{
  // 創(chuàng)建
  StreamController streamController = StreamController();
  //也可以傳入一個指定的stream sc.addStream(stream);
  // 放入事件
  streamController.add('element_1');
  streamController.addError("this is error");
  streamController.sink.add('element_2');
  streamController.stream.listen(
    print,
  onError: print,
  onDone: ()=>print("onDone"));
}

StreamController的原型,它有5個可選參數(shù):factory StreamController( {void onListen(),void onPause(),void onResume(),onCancel(),bool sync: false}),可以調(diào)用對于的方法控制流,同時回調(diào)處理對于的節(jié)點。

  • onListen 注冊監(jiān)聽時回調(diào)
  • onPause 當流暫停時回調(diào)
  • onResume 當流恢復(fù)時回調(diào)
  • onCancel 當監(jiān)聽器被取消時回調(diào)
  • sync 當值為true時表示同步控制器SynchronousStreamController,默認值為false,表示異步控制器

StreamTransformer

該類可以使我們在Stream上執(zhí)行數(shù)據(jù)轉(zhuǎn)換。然后,這些轉(zhuǎn)換被推回到流中,以便該流注冊的所有監(jiān)聽器可以接收;
構(gòu)造方法:factory StreamTransformer.fromHandlers({ void handleData(S data, EventSink<T> sink),
void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
void handleDone(EventSink<T> sink)})

  • handleData:響應(yīng)從流中發(fā)出的任何數(shù)據(jù)事件。提供的參數(shù)是來自發(fā)出事件的數(shù)據(jù),以及EventSink<T>,表示正在進行此轉(zhuǎn)換的當前流的實例
  • handleError:響應(yīng)從流中發(fā)出的任何錯誤事件
  • handleDone:當流不再有數(shù)據(jù)要處理時調(diào)用。通常在流的close()方法被調(diào)用時回調(diào)
void test() {
  StreamController sc = StreamController<int>();
  
  // 創(chuàng)建 StreamTransformer對象
  StreamTransformer stf = StreamTransformer<int, double>.fromHandlers(
    handleData: (int data, EventSink sink) {
      // 操作數(shù)據(jù)后,轉(zhuǎn)換為 double 類型
      sink.add((data * 2).toDouble());
    }, 
    handleError: (error, stacktrace, sink) {
      sink.addError('wrong: $error');
    }, 
    handleDone: (sink) {
      sink.close();
    },
  );
  
  // 調(diào)用流的transform方法,傳入轉(zhuǎn)換對象
  Stream stream = sc.stream.transform(stf);

  stream.listen(print);

  // 添加數(shù)據(jù),這里的類型是int
  sc.add(1);
  sc.add(2); 
  sc.add(3); 
  
  // 調(diào)用后,觸發(fā)handleDone回調(diào)
  // sc.close();
}

//輸出結(jié)果:
//2.0
//4.0
//6.0

廣播流

廣播流則可以允許多個監(jiān)聽器存在,就如同廣播一樣,凡是監(jiān)聽了廣播流,每個監(jiān)聽器都能獲取到數(shù)據(jù)。要注意,如果在觸發(fā)事件時將監(jiān)聽者正添加到廣播流,則該監(jiān)聽器將不會接收當前正在觸發(fā)的事件。如果取消監(jiān)聽,監(jiān)聽者會立即停止接收事件。
有兩種方式創(chuàng)建廣播流,一種直接從Stream創(chuàng)建,另一種使用StreamController創(chuàng)建。

test() async{
  // 調(diào)用 Stream 的 asBroadcastStream 方法創(chuàng)建
  Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), (e)=>e)
  .asBroadcastStream();
  stream = stream.take(5);

  stream.listen(print);
  stream.listen(print);
}

test() async{
  // 創(chuàng)建廣播流
  StreamController sc = StreamController.broadcast();

  sc.stream.listen(print);
  sc.stream.listen(print);

  sc.add("event1");
  sc.add("event2");
}

先就寫這么多,下面就得邊學習邊總結(jié),有什么不對的地方,歡迎留言討論,謝謝!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容