大家應(yīng)該都吃過轉(zhuǎn)盤小火鍋吧,情形是這樣的的:好多個(gè)人坐在一塊,圍著一條傳送帶,每個(gè)人的位置上都會有一個(gè)小火鍋,廚師將菜品放到傳送帶上,這些菜品會隨著傳送帶經(jīng)過每個(gè)人的位置,如果看到你想吃的菜品,則直接拿著放到自己的小火鍋里;如果沒碰到想吃的,則直接濾過,但傳送帶會繼續(xù)將他們傳遞到下一個(gè)人的身邊。
上述情況我們需要注意這樣幾個(gè)問題:
1. 我們只有坐在這條傳送帶旁邊,才能吃到上面的菜。
2. 我們想吃的菜品不是立馬就能出現(xiàn)到你面前的。如果你想吃肥牛,前提是需要廚師在傳送帶上面放一盤肥牛,肥牛才會被傳送到你旁邊,但是你并不知道肥牛什么時(shí)候才能到你身邊。
3. 傳送帶上的菜品都是按照廚師放置的順序依次被傳送到你身邊的。
Stream
那么什么是 Stream 呢?上面的傳送帶就是 Stream,傳送帶上面可以傳遞任何菜品,相應(yīng)的 Stream 也就可以傳遞任何數(shù)據(jù)類型。
為了方便的控制 Stream ,我們通常使用 StreamController,StreamController 中提供了兩個(gè)屬性,一個(gè)是用來往 Stream 中添加數(shù)據(jù)的 sink,一個(gè)是用于接收數(shù)據(jù)的 stream。
上面說了,只有坐在傳送帶旁邊才能吃到傳送帶上的菜品,這里,我們也只需要使用streamController.stream.listen(...) 就可以收到通知 (當(dāng) Stream上有數(shù)據(jù)的時(shí)候)。
只要我們坐在傳送帶前,我們就成了消費(fèi)者,對應(yīng)的,當(dāng)我們 listen 了一個(gè) Stream 的時(shí)候,我們就成了一個(gè) StreamSubscription (訂閱者)。
最后,當(dāng)我們不需要這個(gè) Stream 的時(shí)候,我們需要將其 close 掉,使用streamController.close()
下面我們用一個(gè)例子來演示下:
import 'dart:async';
void main() {
// 聲明一個(gè) StreamController
StreamController controller = StreamController();
// 監(jiān)聽此 Stream
StreamSubscription subscription1 =
controller.stream.listen((value) => print('$value'));
// 往 Stream 中添加數(shù)據(jù)
controller.sink.add(0);
controller.sink.add('a, b, c, d');
controller.sink.add(3.14);
// 關(guān)閉 StreamController
controller.close();
}
輸出:
0
a, b, c, d
3.14
上述代碼中,我們聲明了一個(gè) StreamController,然后往往里面放置了三種不同的數(shù)據(jù)類型,當(dāng)然,我們也可以使用泛型的方式來限制里面的數(shù)據(jù)類型:
StreamController<int> controller = StreamController<int>();
接下來我們看看 stream.listne(...) 這個(gè)方法,源碼如下:
StreamSubscription<T> listen(void onData(T event),
{Function onError, void onDone(), bool cancelOnError});
可以看出 listen(...) 方法接受一個(gè)必選參數(shù)和三個(gè)可選參數(shù),我們上一個(gè)例子中只是傳遞了必選參數(shù) onData(...),其他額三個(gè)參數(shù)并沒有傳遞,下面我們來舉例說明 listen(...) 方法中各個(gè)參數(shù)的用途。
import 'dart:async';
void main() {
StreamController<int> controller = StreamController<int>();
controller.stream
.listen(onData, onError: onError, onDone: onDone, cancelOnError: true);
controller.sink.add(0);
controller.sink.add(1);
// 發(fā)送一個(gè) Error
controller.sink.addError(-1);
controller.sink.add(2);
controller.close();
}
void onData(int data) {
print('The value is $data');
}
void onError(err) {
print('The err is $err');
}
void onDone() {
print('The stream is done !');
}
輸出:
The value is 0
The value is 1
The err is -1
從上述代碼中,我們可以看到,stream.listen(...) ,接受 4 個(gè)參數(shù),這 4 個(gè)參數(shù)的作用分別如下:
onData(T data) : 用來接收 Stream 中的每一個(gè)事件。
onError(...) : 注釋上是這樣說的 The [onError] callback must be of type void onError(error),也就是所他需要一個(gè)接受一個(gè)參數(shù)的方法。但它還支持接受兩個(gè)參數(shù)的方法 void onError(error, StackTrace stackTrace) 。
onDone() : 當(dāng)一個(gè) Stream 關(guān)閉了,也就是執(zhí)行了 stream.close() 方法并且發(fā)送了 done 事件,這個(gè)方法會被調(diào)用。
cancelOnError : 這是一個(gè) bool 類型的值,意思也很簡單,就是當(dāng) Stream 碰到 Error 事件的時(shí)候,是否關(guān)閉這個(gè) Stream。
我們上述代碼中 cancelOnError 參數(shù)傳遞的是 true ,也就是說當(dāng)Stream遇到 Error 的時(shí)候,Stream 就關(guān)閉了,下面的事件就不會再發(fā)送出去了。我們把上面代碼中的 cancelOnError 參數(shù)改為 false 其他的代碼不變,輸出如下:
The value is 0
The value is 1
The err is -1
The value is 2
The stream is done !
可以看出,這個(gè)情況下,即使 Stream 中遇到了 Error ,下面的事件依然會接著發(fā)送,并且最后的 done 事件也執(zhí)行了。
Stream 類型
Stream 類型分為兩種,分別為 Single-subscription Streams 和 Broadcast Streams
Single-subscription Streams
這種類型的 Stream 只允許一個(gè)訂閱者,也就是只能 listen 一次。我們上一小節(jié)中的例子就是一個(gè) Single-subscription Streams。接下來我們看看如果我們對這種類型的 Stream 訂閱兩次會發(fā)生什么情況。
import 'dart:async';
void main() {
// Single-subscription Streams
StreamController<int> controller = StreamController<int>();
// 第一個(gè)訂閱者
StreamSubscription subscription1 =
controller.stream.listen((value) => print('subscription1 $value'));
// 第二個(gè)訂閱者
StreamSubscription subscription2 =
controller.stream.listen((value) => print('subscription2 $value'));
controller.sink.add(0);
controller.sink.add(1);
controller.close();
}
輸出:
Unhandled exception:
Bad state: Stream has already been listened to.
...
Broadcast Streams
這種類型的 Stream 允許任意數(shù)量的訂閱者,只是新的訂閱者只能從它開始訂閱的時(shí)候接收事件。也就是訂閱之前 Stream 中的事件是接受不到的。
Broadcast Streams 的聲明方式如下:
StreamController<int> controller = StreamController<int>.broadcast();
接下來我們寫個(gè)示例看看:
import 'dart:async';
void main() {
// Broadcast Streams
StreamController<int> controller = StreamController<int>.broadcast();
StreamSubscription sub1 = controller.stream.listen((value) => print('sub1 value is $value'));
controller.sink.add(0);
controller.sink.add(1);
StreamSubscription sub2 = controller.stream.listen((value) => print('sub2 value is $value'));
controller.sink.add(2);
controller.sink.add(3);
controller.close();
}
輸出:
sub1 value is 0
sub2 value is 2
sub1 value is 1
sub2 value is 3
sub1 value is 2
sub1 value is 3
可以看出 sub1 可以接收到 Stream 中所有的數(shù)據(jù),而 sub2 只能接收到從訂閱這個(gè) Stream 之后發(fā)送的數(shù)據(jù)。
StreamTransformer
當(dāng)數(shù)據(jù)通過 Stream 傳遞的時(shí)候,我們可以按需來轉(zhuǎn)換里面的數(shù)據(jù),Dart 中給我們提供了 StreamTransformer 來對數(shù)據(jù)做出一些特定轉(zhuǎn)換。
我們可以通過三種方式來實(shí)現(xiàn)數(shù)據(jù)轉(zhuǎn)換:
-
Stream自帶的方法,如map,where等 - 通過
StreamTransformer.fromHandlers(...)來轉(zhuǎn)換 - 直接實(shí)現(xiàn)一個(gè)
StreamTransformer來定義一個(gè)轉(zhuǎn)換器
map、where ...
import 'dart:async';
void main() {
StreamController<int> controller = StreamController<int>();
controller.stream
.where((value) => value % 2 == 0) // where
.map((value) => 'The value is $value') // map
.listen((value) => print(value));
controller.sink.add(0);
controller.sink.add(1);
controller.sink.add(2);
controller.sink.add(3);
controller.sink.add(4);
controller.close();
}
輸出:
The value is 0
The value is 2
The value is 4
上述代碼中,我們使用了 where 和 map 轉(zhuǎn)換符。where 將滿足條件的值過濾出來,然后 map 將整型的數(shù)字轉(zhuǎn)換成字符串類型的值。
StreamTransformer.fromHandlers(...)
import 'dart:async';
// 轉(zhuǎn)換方法
void handleData(data, EventSink sink) {
if (data % 2 == 0) {
sink.add(data);
}
}
void main() {
StreamController<int> controller = StreamController<int>();
controller.stream
.transform(StreamTransformer.fromHandlers(handleData: handleData))
.listen((value) => print(value));
controller.sink.add(0);
controller.sink.add(1);
controller.sink.add(2);
controller.sink.add(3);
controller.sink.add(4);
controller.close();
}
輸出:
0
2
4
自定義 StreamTransformer
在自定義我們自己的 Transformer 之前,我們先來看看 stream.transform(...) 做了什么事情,畢竟我們是通過 transform(...) 方法傳入的 Transformer, transform(...) 源碼如下:
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) {
return streamTransformer.bind(this);
}
可以看出 transform 方法返回的依然回一個(gè) Stream ,只不過這個(gè) Stream 是經(jīng)過轉(zhuǎn)換后的Stream。 類似于 Java8 Stream 中的中間流,就是不停的返回 Stream 的那種。然后該方法是一個(gè)泛型方法,泛型類型分別為 T,和 S,可以這樣理解,T 為該方法的入?yún)㈩愋停琒 為該方法的出參類型,類似于 Java8 中的 Function,我們可以看下 Java 中的 Function 接口的部分源碼:
/**
* ...
* @param <T> the type of the input to the function
* @param <R> the type of the result of the function
* * @since 1.8
*/
@FunctionalInterface
public interface Function<T, R> {
R apply(T t);
...
}
最后,transform(...) 方法調(diào)用了 streamTransformer.bind(this); 該方法返回的是一個(gè)新的 Stream,也就是轉(zhuǎn)換后的 Stream,當(dāng)然,bind() 方法也是我們自定義 StreamTransformer 時(shí)需要實(shí)現(xiàn)的方法。
我們在之前的例子中聲明一個(gè) StreamController 的時(shí)候,都是沒有傳遞參數(shù)的,其實(shí),StreamController 的構(gòu)造方法是這樣的:
factory StreamController(
{void onListen(),
void onPause(),
void onResume(),
onCancel(),
bool sync: false}) {
return sync
? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
: new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
}
可以看出,StreamController 是有參數(shù)的,并且都是可選參數(shù),在這些參數(shù)中,我們重點(diǎn)實(shí)現(xiàn) onListen(),關(guān)于 onListen 的源碼解釋如下:
A Stream should be inert until a subscriber starts listening on it (using
the [onListen] callback to start producing events). Streams should not
leak resources (like websockets) when no user ever listens on the stream.
重點(diǎn)就是 onListen 方法是用來生產(chǎn)事件的。
接下來,來實(shí)現(xiàn)一個(gè)自定義的 StreamTransformer:
/// 自定義一個(gè) StreamTransformer ,
/// 泛型類型 S 為入?yún)㈩愋停琓 為出參類型
/// 這些類型都是 Stream 中傳遞的數(shù)據(jù)類型
class MyTransformer<S, T> implements StreamTransformer<S, T> {
// 用來生成一個(gè)新的 Stream 并且控制符合條件的數(shù)據(jù)
StreamController _controller;
StreamSubscription _subscription;
bool cancelOrError;
// 轉(zhuǎn)換之前的 Stream
Stream<S> _stream;
MyTransformer({bool sync: false, this.cancelOrError}) {
_controller = new StreamController<T>(
onListen: _onListen,
onCancel: _onCancel,
onPause: () {
_subscription.pause();
},
onResume: () {
_subscription.resume();
},
sync: sync);
}
MyTransformer.broadcast({bool sync: false, bool this.cancelOrError}) {
// 定義一個(gè) StreamController,注意泛型類型為 T,也就是出參類型,因?yàn)? // 我們是使用該 _controller 生成一個(gè)用來返回的新的 Stream<T>
_controller = new StreamController<T>.broadcast(
onListen: _onListen, onCancel: _onCancel, sync: sync);
}
void _onListen() {
// _stream 為轉(zhuǎn)換之前的 Stream<S>
_subscription = _stream.listen(onData,
onError: _controller.addError,
onDone: _controller.close,
cancelOnError: cancelOrError);
}
void _onCancel() {
_subscription.cancel();
_subscription = null;
}
// 數(shù)據(jù)轉(zhuǎn)換
void onData(S data) {
if ((data as int) % 2 == 0) {
// 將符合條件的數(shù)據(jù)添加到新的 Stream 中
_controller.sink.add(data);
}
}
// 參數(shù)為轉(zhuǎn)換之前的 Stream<S>
// 返回的是一個(gè)新的 Stream<T> (轉(zhuǎn)換之后的 Stream)
@override
Stream<T> bind(Stream<S> stream) {
this._stream = stream;
return _controller.stream;
}
@override
StreamTransformer<RS, RT> cast<RS, RT>() {
// TODO: implement cast
return null;
}
}
使用如下:
void main() {
StreamController<int> controller = StreamController<int>();
controller.stream
.transform(new MyTransformer()) // 自定義的 StreamTransformer
.listen((value) => print('$value'));
controller.sink.add(0);
controller.sink.add(1);
controller.sink.add(2);
controller.sink.add(3);
controller.sink.add(4);
controller.close();
}
輸出:
0
2
4
如有錯誤,還請指出。謝謝?。。?/p>