Stream in Dart

大家應(yīng)該都吃過轉(zhuǎn)盤小火鍋吧,情形是這樣的的:好多個(gè)人坐在一塊,圍著一條傳送帶,每個(gè)人的位置上都會有一個(gè)小火鍋,廚師將菜品放到傳送帶上,這些菜品會隨著傳送帶經(jīng)過每個(gè)人的位置,如果看到你想吃的菜品,則直接拿著放到自己的小火鍋里;如果沒碰到想吃的,則直接濾過,但傳送帶會繼續(xù)將他們傳遞到下一個(gè)人的身邊。

上述情況我們需要注意這樣幾個(gè)問題:

1. 我們只有坐在這條傳送帶旁邊,才能吃到上面的菜。

2. 我們想吃的菜品不是立馬就能出現(xiàn)到你面前的。如果你想吃肥牛,前提是需要廚師在傳送帶上面放一盤肥牛,肥牛才會被傳送到你旁邊,但是你并不知道肥牛什么時(shí)候才能到你身邊。

3. 傳送帶上的菜品都是按照廚師放置的順序依次被傳送到你身邊的。

Stream

那么什么是 Stream 呢?上面的傳送帶就是 Stream,傳送帶上面可以傳遞任何菜品,相應(yīng)的 Stream 也就可以傳遞任何數(shù)據(jù)類型。

為了方便的控制 Stream ,我們通常使用 StreamControllerStreamController 中提供了兩個(gè)屬性,一個(gè)是用來往 Stream 中添加數(shù)據(jù)的 sink,一個(gè)是用于接收數(shù)據(jù)的 stream。

上面說了,只有坐在傳送帶旁邊才能吃到傳送帶上的菜品,這里,我們也只需要使用streamController.stream.listen(...) 就可以收到通知 (當(dāng) Stream上有數(shù)據(jù)的時(shí)候)。

只要我們坐在傳送帶前,我們就成了消費(fèi)者,對應(yīng)的,當(dāng)我們 listen 了一個(gè) Stream 的時(shí)候,我們就成了一個(gè) StreamSubscription (訂閱者)。

最后,當(dāng)我們不需要這個(gè) Stream 的時(shí)候,我們需要將其 close 掉,使用streamController.close()

下面我們用一個(gè)例子來演示下:

import 'dart:async';  
  
void main() {  
  // 聲明一個(gè) StreamController  
  StreamController controller = StreamController();  
  
  // 監(jiān)聽此 Stream  
  StreamSubscription subscription1 =  
      controller.stream.listen((value) => print('$value'));  
  
  // 往 Stream 中添加數(shù)據(jù)  
  controller.sink.add(0);  
  controller.sink.add('a, b, c, d');  
  controller.sink.add(3.14);  
  
  // 關(guān)閉 StreamController  
  controller.close();  
}

輸出:

0
a, b, c, d
3.14

上述代碼中,我們聲明了一個(gè) StreamController,然后往往里面放置了三種不同的數(shù)據(jù)類型,當(dāng)然,我們也可以使用泛型的方式來限制里面的數(shù)據(jù)類型:

StreamController<int> controller = StreamController<int>();

接下來我們看看 stream.listne(...) 這個(gè)方法,源碼如下:

StreamSubscription<T> listen(void onData(T event),  
  {Function onError, void onDone(), bool cancelOnError});

可以看出 listen(...) 方法接受一個(gè)必選參數(shù)和三個(gè)可選參數(shù),我們上一個(gè)例子中只是傳遞了必選參數(shù) onData(...),其他額三個(gè)參數(shù)并沒有傳遞,下面我們來舉例說明 listen(...) 方法中各個(gè)參數(shù)的用途。

import 'dart:async';  
  
void main() {  
  StreamController<int> controller = StreamController<int>();  
  
  controller.stream  
  .listen(onData, onError: onError, onDone: onDone, cancelOnError: true);  
  
  controller.sink.add(0);  
  controller.sink.add(1);  
  // 發(fā)送一個(gè) Error  
  controller.sink.addError(-1);  
  controller.sink.add(2);  
  
  controller.close();  
}  
  
void onData(int data) {  
  print('The value is $data');  
}  
  
void onError(err) {  
  print('The err is $err');  
}  
  
void onDone() {  
  print('The stream is done !');  
}

輸出:

The value is 0
The value is 1
The err is -1

從上述代碼中,我們可以看到,stream.listen(...) ,接受 4 個(gè)參數(shù),這 4 個(gè)參數(shù)的作用分別如下:

onData(T data) : 用來接收 Stream 中的每一個(gè)事件。

onError(...) : 注釋上是這樣說的 The [onError] callback must be of type void onError(error),也就是所他需要一個(gè)接受一個(gè)參數(shù)的方法。但它還支持接受兩個(gè)參數(shù)的方法 void onError(error, StackTrace stackTrace) 。

onDone() : 當(dāng)一個(gè) Stream 關(guān)閉了,也就是執(zhí)行了 stream.close() 方法并且發(fā)送了 done 事件,這個(gè)方法會被調(diào)用。

cancelOnError : 這是一個(gè) bool 類型的值,意思也很簡單,就是當(dāng) Stream 碰到 Error 事件的時(shí)候,是否關(guān)閉這個(gè) Stream。

我們上述代碼中 cancelOnError 參數(shù)傳遞的是 true ,也就是說當(dāng)Stream遇到 Error 的時(shí)候,Stream 就關(guān)閉了,下面的事件就不會再發(fā)送出去了。我們把上面代碼中的 cancelOnError 參數(shù)改為 false 其他的代碼不變,輸出如下:

The value is 0
The value is 1
The err is -1
The value is 2
The stream is done !

可以看出,這個(gè)情況下,即使 Stream 中遇到了 Error ,下面的事件依然會接著發(fā)送,并且最后的 done 事件也執(zhí)行了。

Stream 類型

Stream 類型分為兩種,分別為 Single-subscription StreamsBroadcast Streams

Single-subscription Streams

這種類型的 Stream 只允許一個(gè)訂閱者,也就是只能 listen 一次。我們上一小節(jié)中的例子就是一個(gè) Single-subscription Streams。接下來我們看看如果我們對這種類型的 Stream 訂閱兩次會發(fā)生什么情況。

import 'dart:async';  
  
void main() {  
  // Single-subscription Streams  
  StreamController<int> controller = StreamController<int>();  
  
  // 第一個(gè)訂閱者  
  StreamSubscription subscription1 =  
      controller.stream.listen((value) => print('subscription1 $value'));  
  
  // 第二個(gè)訂閱者  
  StreamSubscription subscription2 =  
      controller.stream.listen((value) => print('subscription2 $value'));  
  
  controller.sink.add(0);  
  controller.sink.add(1);  
  
  controller.close();  
}

輸出:

Unhandled exception:
Bad state: Stream has already been listened to.
...

Broadcast Streams

這種類型的 Stream 允許任意數(shù)量的訂閱者,只是新的訂閱者只能從它開始訂閱的時(shí)候接收事件。也就是訂閱之前 Stream 中的事件是接受不到的。

Broadcast Streams 的聲明方式如下:

StreamController<int> controller = StreamController<int>.broadcast();

接下來我們寫個(gè)示例看看:

import 'dart:async';  
  
void main() {  
  // Broadcast Streams  
  StreamController<int> controller = StreamController<int>.broadcast();  
  
  StreamSubscription sub1 =  controller.stream.listen((value) => print('sub1 value is $value'));  
  controller.sink.add(0);  
  controller.sink.add(1);  
  
  StreamSubscription sub2 = controller.stream.listen((value) => print('sub2 value is $value'));  
  controller.sink.add(2);  
  controller.sink.add(3);  
  
  controller.close();  
}

輸出:

sub1 value is 0
sub2 value is 2
sub1 value is 1
sub2 value is 3
sub1 value is 2
sub1 value is 3

可以看出 sub1 可以接收到 Stream 中所有的數(shù)據(jù),而 sub2 只能接收到從訂閱這個(gè) Stream 之后發(fā)送的數(shù)據(jù)。

StreamTransformer

當(dāng)數(shù)據(jù)通過 Stream 傳遞的時(shí)候,我們可以按需來轉(zhuǎn)換里面的數(shù)據(jù),Dart 中給我們提供了 StreamTransformer 來對數(shù)據(jù)做出一些特定轉(zhuǎn)換。

我們可以通過三種方式來實(shí)現(xiàn)數(shù)據(jù)轉(zhuǎn)換:

  • Stream 自帶的方法,如 mapwhere
  • 通過 StreamTransformer.fromHandlers(...) 來轉(zhuǎn)換
  • 直接實(shí)現(xiàn)一個(gè) StreamTransformer 來定義一個(gè)轉(zhuǎn)換器

map、where ...

import 'dart:async';  
  
void main() {  
  StreamController<int> controller = StreamController<int>();  
  
  controller.stream  
  .where((value) => value % 2 == 0) // where  
  .map((value) => 'The value is $value') // map  
  .listen((value) => print(value));  
  
  controller.sink.add(0);  
  controller.sink.add(1);  
  controller.sink.add(2);  
  controller.sink.add(3);  
  controller.sink.add(4);  
  
  controller.close();  
}

輸出:

The value is 0
The value is 2
The value is 4

上述代碼中,我們使用了 wheremap 轉(zhuǎn)換符。where 將滿足條件的值過濾出來,然后 map 將整型的數(shù)字轉(zhuǎn)換成字符串類型的值。

StreamTransformer.fromHandlers(...)

import 'dart:async';  
  
// 轉(zhuǎn)換方法  
void handleData(data, EventSink sink) {  
  if (data % 2 == 0) {  
    sink.add(data);  
  }  
}  
  
void main() {  
  StreamController<int> controller = StreamController<int>();  
  
  controller.stream  
  .transform(StreamTransformer.fromHandlers(handleData: handleData))  
      .listen((value) => print(value));  
  
  controller.sink.add(0);  
  controller.sink.add(1);  
  controller.sink.add(2);  
  controller.sink.add(3);  
  controller.sink.add(4);  
  
  controller.close();  
}

輸出:

0
2
4

自定義 StreamTransformer

在自定義我們自己的 Transformer 之前,我們先來看看 stream.transform(...) 做了什么事情,畢竟我們是通過 transform(...) 方法傳入的 Transformer, transform(...) 源碼如下:

Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) {  
  return streamTransformer.bind(this);  
}

可以看出 transform 方法返回的依然回一個(gè) Stream ,只不過這個(gè) Stream 是經(jīng)過轉(zhuǎn)換后的Stream。 類似于 Java8 Stream 中的中間流,就是不停的返回 Stream 的那種。然后該方法是一個(gè)泛型方法,泛型類型分別為 T,和 S,可以這樣理解,T 為該方法的入?yún)㈩愋停琒 為該方法的出參類型,類似于 Java8 中的 Function,我們可以看下 Java 中的 Function 接口的部分源碼:

/**  
 * ...
 * @param <T> the type of the input to the function  
 * @param <R> the type of the result of the function  
 * * @since 1.8  
 */
@FunctionalInterface  
public interface Function<T, R> {  
 R apply(T t);
 ...
 }

最后,transform(...) 方法調(diào)用了 streamTransformer.bind(this); 該方法返回的是一個(gè)新的 Stream,也就是轉(zhuǎn)換后的 Stream,當(dāng)然,bind() 方法也是我們自定義 StreamTransformer 時(shí)需要實(shí)現(xiàn)的方法。

我們在之前的例子中聲明一個(gè) StreamController 的時(shí)候,都是沒有傳遞參數(shù)的,其實(shí),StreamController 的構(gòu)造方法是這樣的:

factory StreamController(  
    {void onListen(),  
  void onPause(),  
  void onResume(),  
  onCancel(),  
  bool sync: false}) {  
  return sync  
      ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)  
      : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);  
}

可以看出,StreamController 是有參數(shù)的,并且都是可選參數(shù),在這些參數(shù)中,我們重點(diǎn)實(shí)現(xiàn) onListen(),關(guān)于 onListen 的源碼解釋如下:

A Stream should be inert until a subscriber starts listening on it (using
the [onListen] callback to start producing events). Streams should not
leak resources (like websockets) when no user ever listens on the stream.

重點(diǎn)就是 onListen 方法是用來生產(chǎn)事件的。

接下來,來實(shí)現(xiàn)一個(gè)自定義的 StreamTransformer:

/// 自定義一個(gè) StreamTransformer ,
/// 泛型類型 S 為入?yún)㈩愋停琓 為出參類型
/// 這些類型都是 Stream 中傳遞的數(shù)據(jù)類型
class MyTransformer<S, T> implements StreamTransformer<S, T> {

  // 用來生成一個(gè)新的 Stream 并且控制符合條件的數(shù)據(jù)
  StreamController _controller;

  StreamSubscription _subscription;

  bool cancelOrError;

  // 轉(zhuǎn)換之前的 Stream
  Stream<S> _stream;

  MyTransformer({bool sync: false, this.cancelOrError}) {
    _controller = new StreamController<T>(
        onListen: _onListen,
        onCancel: _onCancel,
        onPause: () {
          _subscription.pause();
        },
        onResume: () {
          _subscription.resume();
        },
        sync: sync);
  }

  MyTransformer.broadcast({bool sync: false, bool this.cancelOrError}) {
    // 定義一個(gè) StreamController,注意泛型類型為 T,也就是出參類型,因?yàn)?    // 我們是使用該 _controller 生成一個(gè)用來返回的新的 Stream<T>
    _controller = new StreamController<T>.broadcast(
        onListen: _onListen, onCancel: _onCancel, sync: sync);
  }

  void _onListen() {
    // _stream 為轉(zhuǎn)換之前的 Stream<S>
    _subscription = _stream.listen(onData,
        onError: _controller.addError,
        onDone: _controller.close,
        cancelOnError: cancelOrError);
  }

  void _onCancel() {
    _subscription.cancel();
    _subscription = null;
  }

  // 數(shù)據(jù)轉(zhuǎn)換
  void onData(S data) {
    if ((data as int) % 2 == 0) {
      // 將符合條件的數(shù)據(jù)添加到新的 Stream 中
      _controller.sink.add(data);
    }
  }

  // 參數(shù)為轉(zhuǎn)換之前的 Stream<S>
  // 返回的是一個(gè)新的 Stream<T> (轉(zhuǎn)換之后的 Stream)
  @override
  Stream<T> bind(Stream<S> stream) {
    this._stream = stream;
    return _controller.stream;
  }

  @override
  StreamTransformer<RS, RT> cast<RS, RT>() {
    // TODO: implement cast
    return null;
  }
}

使用如下:

void main() {
  StreamController<int> controller = StreamController<int>();

  controller.stream
      .transform(new MyTransformer()) // 自定義的 StreamTransformer
      .listen((value) => print('$value'));

  controller.sink.add(0);
  controller.sink.add(1);
  controller.sink.add(2);
  controller.sink.add(3);
  controller.sink.add(4);

  controller.close();
}

輸出:

0
2
4

如有錯誤,還請指出。謝謝?。。?/p>

參考鏈接

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容