Dart | 什么是Stream

??前言

StreamFuture都是Dart:async庫的核心API,對異步提供了非常好的支持。

我思考了很久,究竟應該如何向大家介紹Stream(流)。因為Stream非常有用,它是為處理異步事件而生的。而在應用中有大量的場景需要使用異步事件,例如請求網絡,和用戶交互等等,它們都無法同步完成。Stream能夠極大的幫助我們處理這些問題。??

但是對于剛接觸的新手來說,流確實足夠抽象,以至于大家需要花費非常多的時間來理解它。

所以我將會盡我所能向大家介紹Stream。

?????Stream

??什么是Stream

Stream非常有特點但卻不太好理解,我與其按照字面意思把它看作流,更愿意把它看成一個工廠或者是機器。

image

我們來看看這個機器它有什么特點:

  • 它有一個入口,可以放東西/指令(anything)
  • 這個機器不知道入口什么時候會放東西進來
  • 中間的機器能夠生產或者加工,這應該會耗費一些時間
  • 他有一個出口,應該會有產品從那出來
  • 我們也不知道到底什么時候產品會從出口出來

整個過程,時間都是一個不確定因素,我們隨時都可以向這個機器的入口放東西進去,放進去了以后機器進行處理,但是我們并不知道它多久處理完。所以出口是需要專門派人盯著的,等待機器流出東西來。整個過程都是以異步的眼光來看的。

??我們將機器模型轉化成Stream

image

  • 這個大機器就是StreamController,它是創(chuàng)建流的方式之一。
  • StreamController有一個入口,叫做sink
  • sink可以使用add方法放東西進來,放進去以后就不再關心了。
  • 當有東西從sink進來以后,我們的機器就開始工作啦,空空空。
  • StreamController有一個出口,叫做stream
  • 機器處理完畢后就會把產品從出口丟出來,但是我們并不知道什么時候會出來,所以我們需要使用listen方法一直監(jiān)聽這個出口。
  • 而且當多個物品被放進來了之后,它不會打亂順序,而是先入先出。

通過這個例子,相信大家對流應該都有了基礎印象,那么要解釋后面的東西就不難了。

??如何使用Stream

獲得Stream的方法:

  • 通過構造函數
  • 使用StreamController
  • IO Stream

stream有三個構造方法:

  • Stream.fromFuture:從Future創(chuàng)建新的單訂閱流,當future完成時將觸發(fā)一個data或者error,然后使用Down事件關閉這個流。

  • Stream.fromFutures:從一組Future創(chuàng)建一個單訂閱流,每個future都有自己的data或者error事件,當整個Futures完成后,流將會關閉。如果Futures為空,流將會立刻關閉。

  • Stream.fromIterable:創(chuàng)建從一個集合中獲取其數據的單訂閱流。


Stream.fromIntreable([1,2,3]);

??監(jiān)聽Stream的方法

監(jiān)聽一個流最常見的方法就是listen。當有事件發(fā)出時,流將會通知listener。Listen方法提供了這幾種觸發(fā)事件:

  • onData(必填):收到數據時觸發(fā)
  • onError:收到Error時觸發(fā)
  • onDone:結束時觸發(fā)
  • unsubscribeOnError:遇到第一個Error時是否取消訂閱,默認為false

?? StreamController

如果你想創(chuàng)建一條新的流的話,非常簡單!?? 使用StreamController,它為你提供了非常豐富的功能,你能夠在streamController上發(fā)送數據,處理錯誤,并獲得結果!

//任意類型的流
StreamController controller = StreamController();
controller.sink.add(123);
controller.sink.add("xyz");
controller.sink.add(Anything);

//創(chuàng)建一條處理int類型的流
StreamController<int> numController = StreamController();
numController.sink.add(123);

泛型定義了我們能向流上推送什么類型的數據。它可以是任何類型!

我們再來看看如何獲取最后的結果。

StreamController controller = StreamController();

//監(jiān)聽這個流的出口,當有data流出時,打印這個data
StreamSubscription subscription =
controller.stream.listen((data)=>print("data"));

controller.sink.add(123);

輸出: 123

你需要將一個方法交給stream的listen函數,這個方法入參(data)是我們的StreamController處理完畢后產生的結果,我們監(jiān)聽出口,并獲得了這個結果(data)。這里可以使用lambda表達式,也可以是其他任何函數。

(這里我為了方便區(qū)分,把listen說成函數,(data)=>print(data)說成方法,其實是一個東西。)

??Transforming an existing stream

假如你已經有了一個流,你可以通過它轉化成為一條新的流。非常簡單!流提供了map(),where(),expand(),和take()方法,能夠輕松將已有的流轉化為新的流。

where

如果你想要篩選掉一些不想要的事件。例如一個猜數游戲,用戶可以輸入數字,當輸入正確的時候,我們做出一定反應。而我們必須篩選掉所有錯誤的答案,這個時候我們可以使用where篩選掉不需要的數字。

stream.where((event){...})

where函數接收一個事件,每當這個流有東西流到where函數的時候,這就是那個事件。我們或許根本不需要這個事件,但是必須作為參數傳入。

take

如果你想要控制這個流最多能傳多少個東西。比如輸入密碼,我們可能想讓用戶最多輸四次,那么我們可以使用take來限制。

stream.take(4);

take函數接收一個int,代表最多能經過take函數的事件次數。當傳輸次數達到這個數字時,這個流將會關閉,無法再傳輸。

transform

如果你需要更多的控制轉換,那么請使用transform()方法。他需要配合StreamTransformer進行使用。我們先來看下面一段猜數游戲,然后我會向你解釋。

StreamController<int> controller = StreamController<int>();

final transformer = StreamTransformer<int,String>.fromHandlers(
    handleData:(value, sink){
    if(value==100){
      sink.add("你猜對了");
    }
    else{ sink.addError('還沒猜中,再試一次吧');
    }
  });
  
  controller.stream
            .transform(transformer)
            .listen(
                (data) => print(data),
                onError:(err) => print(err));
    
    controller.sink.add(23);
    //controller.sink.add(100);

輸出: 還沒猜中,再試一次吧

StreamTransformer<S,T>是我們stream的檢查員,他負責接收stream通過的信息,然后進行處理返回一條新的流。

  • S代表之前的流的輸入類型,我們這里是輸入一個數字,所以是int。
  • T代表轉化后流的輸入類型,我們這里add進去的是一串字符串,所以是String。
  • handleData接收一個value并創(chuàng)建一條新的流并暴露sink,我們可以在這里對流進行轉化。
  • 我們還可以addError進去告訴后面有問題。

然后我們監(jiān)聽transform之后的流,當轉換好的event流出時,我們打印這個event,這個event就是我們剛才add進sink的數據。onError能夠捕捉到我們add進去的err。

??Stream的種類

流有兩種

  • "Single-subscription" streams 單訂閱流
  • "broadcast" streams 多訂閱流

"Single-subscription" streams

單個訂閱流在流的整個生命周期內僅允許有一個listener。它在有收聽者之前不會生成事件,并且在取消收聽時它會停止發(fā)送事件,即使你仍然在Sink.add更多事件。

即使在第一個訂閱被取消后,也不允許在單個訂閱流上進行兩次偵聽。

單訂閱流通常用于流式傳輸更大的連續(xù)數據塊,如文件I / O.

StreamController controller = StreamController();

controller.stream.listen((data)=> print(data));
controller.stream.listen((data)=> print(data));

controller.sink.add(123);

輸出: Bad state: Stream has already been listened to.
單訂閱流不能有多個收聽者。

"Broadcast" streams

廣播流允許任意數量的收聽者,且無論是否有收聽者,他都能產生事件。所以中途進來的收聽者將不會收到之前的消息。

如果多個收聽者想要收聽單個訂閱流,請使用asBroadcastStream在非廣播流之上創(chuàng)建廣播流。

如果在觸發(fā)事件時將收聽者添加到廣播流,則該偵聽器將不會接收當前正在觸發(fā)的事件。如果取消收聽,收聽者會立即停止接收事件。

一般的流都是單訂閱流。從Stream繼承的廣播流必須重寫isBroadcast 才能返回true。

StreamController controller = StreamController();
//將單訂閱流轉化為廣播流
Stream stream = controller.stream.asBroadcastStream();

stream.listen((data)=> print(data));
stream.listen((data)=> print(data));

controller.sink.add(123);

輸出:
123
123

?? 寫在最后

以上就是關于Dart中Stream的簡單介紹,如果你還有任何疑問或者建議,歡迎在下方評論區(qū)或者郵箱告訴我!我會在24小時內盡快聯(lián)系您??

下一篇文章我將向大家介紹一種非常棒的狀態(tài)管理方式,它是Google團隊力推的狀態(tài)管理方法,我認為它真的很酷!

所以,準備好迎接BLoC了嗎??

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容