Dart之異步流Stream

說明

Stream 也是用于接收異步事件數(shù)據(jù),和 Future 不同的是,它可以接收多個異步操作的結果(成功或失?。?也就是說,在執(zhí)行異步任務時,可以通過多次觸發(fā)成功或失敗事件來傳遞結果數(shù)據(jù)或錯誤異常。簡單理解,其實就是一個異步數(shù)據(jù)隊列而已。我們知道隊列的特點是先進先出的,Stream也正是如此。 Stream 常用于會多次讀取數(shù)據(jù)的異步任務場景,如網(wǎng)絡內(nèi)容下載、文件讀寫等。在Dart語言中,Stream有兩種類型,一種是點對點的單訂閱流(Single-subscription),另一種則是廣播流。

stream幾個構造方法:

Stream.periodic:構造一個周期流,有2個參數(shù)第一個參數(shù)為間隔時間,第二個參數(shù)為回調函數(shù)。
Stream.fromFuture:從Future創(chuàng)建新的單訂閱流,當future完成時將觸發(fā)一個data或者error,然后使用Down事件關閉這個流。
Stream.fromFutures:從一組Future創(chuàng)建一個單訂閱流,每個future都有自己的data或者error事件,當整個Futures完成后,流將會關閉。如果Futures為空,流將會立刻關閉。
Stream.fromIterable:創(chuàng)建從一個集合中獲取其數(shù)據(jù)的單訂閱流。
Stream.value:用于從單個值創(chuàng)建Stream。

void main(){
  test();
}

test() async{
  // 1,使用 periodic 創(chuàng)建流,第一個參數(shù)為間隔時間,第二個參數(shù)為回調函數(shù)
  Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), callback);

 // 2,從Future創(chuàng)建Stream
  Future<String> fut = Future((){
      return "async task";
  });
  Stream<String> stream = Stream<String>.fromFuture(fut);

  //3, 將多個Future放入一個列表中,將該列表傳入
  Future<String> fut1 = Future((){
      // 模擬耗時5秒
      sleep(Duration(seconds:5));
      return "async task1";
  });
  Future<String> fut2 = Future((){
      return "async task2";
  });
  Stream<String> stream = Stream<String>.fromFutures([fut1,fut2]);

  // 4,從一個列表創(chuàng)建`Stream`
  Stream<int> stream = Stream<int>.fromIterable([1,2,3]);

  // 5,value
  Stream<bool> stream = Stream<bool>.value(false);

  // await for循環(huán)從流中讀取
  await for(var i in stream){
    print(i);
  }
}

// 可以在回調函數(shù)中對值進行處理,這里直接返回了
int callback(int value){
  return value;
}
監(jiān)聽 Stream

監(jiān)聽Stream,并從中獲取數(shù)據(jù)也有三種方式:

一種就是我們上文中使用的await for循環(huán),這也是官方推薦的方式,看起來更簡潔友好;
使用forEach方法;
使用listen方法StreamSubscription<T> listen(void onData(T event), {Function onError, void onDone(), bool cancelOnError})。
onData(必填):收到數(shù)據(jù)時觸發(fā);onError:收到Error時觸發(fā);onDone:結束時觸發(fā);unsubscribeOnError:遇到第一個Error時是否取消訂閱,默認為false。

test() async{
  Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), callback);

 //1, await for循環(huán)從流中讀取
  await for(var i in stream){
    print(i);
  }

  //2, 使用forEach,傳入一個函數(shù)進去獲取并處理數(shù)據(jù)
  stream.forEach((int x){
    print(x);
  });

  //3,listen
  stream.listen(
    (x)=>print(x),
  onError: (e)=>print(e),
  onDone: ()=>print("onDone"));
}
Stream的一些方法

take 和 takeWhile

Stream<T> take(int count) 用于限制Stream中的元素數(shù)量;
Stream<T>.takeWhile(bool test(T element)) 與 take作用相似,只是它的參數(shù)是一個函數(shù)類型,且返回值必須是一個bool值.

skip 和 skipWhile

Stream<T> skip (int count),跳過元素。請注意,該方法只是從Stream中獲取元素時跳過,被跳過的元素依然是被執(zhí)行了的,所耗費的時間依然存在,其實只是跳過了執(zhí)行完的結果而已。
Stream<T> skipWhile(bool test(T element)) 方法與takeWhile用法是相同的,傳入一個函數(shù)對結果進行判斷,表示跳過滿足條件的

toList

Future<List<T>> toList() 表示將Stream中所有數(shù)據(jù)存儲在List中

屬性 length

stream.length 等待并獲取流中所有數(shù)據(jù)的數(shù)量

Stream 2個輔助類

StreamController

Stream的一個幫助類,可用于整個 Stream 過程的控制。使用該類時,需要導入'dart:async',其add方法和sink.add方法是相同的,都是用于放入一個元素,addError方法用于產(chǎn)生一個錯誤,監(jiān)聽方法中的onError可獲取錯誤。

import 'dart:async';

void main() {
  test();
}

test() async{
  // 創(chuàng)建
  StreamController streamController = StreamController();
  //也可以傳入一個指定的stream sc.addStream(stream);
  // 放入事件
  streamController.add('element_1');
  streamController.addError("this is error");
  streamController.sink.add('element_2');
  streamController.stream.listen(
    print,
  onError: print,
  onDone: ()=>print("onDone"));
}

StreamController的原型,它有5個可選參數(shù):factory StreamController( {void onListen(),void onPause(),void onResume(),onCancel(),bool sync: false}),可以調用對于的方法控制流,同時回調處理對于的節(jié)點。
onListen 注冊監(jiān)聽時回調
onPause 當流暫停時回調
onResume 當流恢復時回調
onCancel 當監(jiān)聽器被取消時回調
sync 當值為true時表示同步控制器SynchronousStreamController,默認值為false,表示異步控制器

StreamTransformer

該類可以使我們在Stream上執(zhí)行數(shù)據(jù)轉換。然后,這些轉換被推回到流中,以便該流注冊的所有監(jiān)聽器可以接收;
構造方法:factory StreamTransformer.fromHandlers({ void handleData(S data, EventSink<T> sink),
void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
void handleDone(EventSink<T> sink)})
handleData:響應從流中發(fā)出的任何數(shù)據(jù)事件。提供的參數(shù)是來自發(fā)出事件的數(shù)據(jù),以及EventSink<T>,表示正在進行此轉換的當前流的實例
handleError:響應從流中發(fā)出的任何錯誤事件
handleDone:當流不再有數(shù)據(jù)要處理時調用。通常在流的close()方法被調用時回調

void test() {
  StreamController sc = StreamController<int>();
  
  // 創(chuàng)建 StreamTransformer對象
  StreamTransformer stf = StreamTransformer<int, double>.fromHandlers(
    handleData: (int data, EventSink sink) {
      // 操作數(shù)據(jù)后,轉換為 double 類型
      sink.add((data * 2).toDouble());
    }, 
    handleError: (error, stacktrace, sink) {
      sink.addError('wrong: $error');
    }, 
    handleDone: (sink) {
      sink.close();
    },
  );
  
  // 調用流的transform方法,傳入轉換對象
  Stream stream = sc.stream.transform(stf);

  stream.listen(print);

  // 添加數(shù)據(jù),這里的類型是int
  sc.add(1);
  sc.add(2); 
  sc.add(3); 
  
  // 調用后,觸發(fā)handleDone回調
  // sc.close();
}

//輸出結果:
//2.0
//4.0
//6.0
廣播流

廣播流則可以允許多個監(jiān)聽器存在,就如同廣播一樣,凡是監(jiān)聽了廣播流,每個監(jiān)聽器都能獲取到數(shù)據(jù)。要注意,如果在觸發(fā)事件時將監(jiān)聽者正添加到廣播流,則該監(jiān)聽器將不會接收當前正在觸發(fā)的事件。如果取消監(jiān)聽,監(jiān)聽者會立即停止接收事件。
有兩種方式創(chuàng)建廣播流,一種直接從Stream創(chuàng)建,另一種使用StreamController創(chuàng)建。

test() async{
  // 調用 Stream 的 asBroadcastStream 方法創(chuàng)建
  Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), (e)=>e)
  .asBroadcastStream();
  stream = stream.take(5);

  stream.listen(print);
  stream.listen(print);
}

test() async{
  // 創(chuàng)建廣播流
  StreamController sc = StreamController.broadcast();

  sc.stream.listen(print);
  sc.stream.listen(print);

  sc.add("event1");
  sc.add("event2");
}

作者:逃離_102
鏈接:http://www.itdecent.cn/p/03e6477c1548
來源:簡書
著作權歸作者所有。商業(yè)轉載請聯(lián)系作者獲得授權,非商業(yè)轉載請注明出處。

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容