flutter開發(fā) 異步編程以及eventbus(五)

isolate機制

? Dart是基于單線程模型的語言。但是在開發(fā)當中我們經(jīng)常會進行耗時操作比如網(wǎng)絡(luò)請求,這種耗時操作會堵塞我們的代碼,所以在Dart也有并發(fā)機制,名叫isolate。APP的啟動入口main函數(shù)就是一個類似Android主線程的一個主isolate。和Java的Thread不同的是,Dart中的isolate無法共享內(nèi)存。

isolate.png
import 'dart:isolate';

int i;

void main() {
  i = 10;
  //創(chuàng)建一個消息接收器
  ReceivePort receivePort = new ReceivePort();
  //創(chuàng)建isolate
  Isolate.spawn(isolateMain, receivePort.sendPort);

  //接收其他isolate發(fā)過來的消息
  receivePort.listen((message) {
    //發(fā)過來sendPort,則主isolate也可以向創(chuàng)建的isolate發(fā)送消息
    if (message is SendPort) {
      message.send("好呀好呀!");
    } else {
      print("接到子isolate消息:" + message);
    }
  });
}

/// 新isolate的入口函數(shù)
void isolateMain(SendPort sendPort) {
  // isolate是內(nèi)存隔離的,i的值是在主isolate定義的所以這里獲得null
  print(i);

  ReceivePort receivePort = new ReceivePort();
  sendPort.send(receivePort.sendPort);


  // 向主isolate發(fā)送消息
  sendPort.send("發(fā)送信息");


  receivePort.listen((message) {
    print("接到主isolate消息:" + message);
  });
}

event-loop

? 可以看到代碼中,我們接收消息使用了listene函數(shù)來監(jiān)聽消息。假設(shè)我們現(xiàn)在在main方法最后加入sleep休眠,會不會影響listene回調(diào)的時機?

import 'dart:io';
import 'dart:isolate';

int i;

void main() {
  i = 10;
  //創(chuàng)建一個消息接收器
  ReceivePort receivePort = new ReceivePort();
  //創(chuàng)建isolate
  Isolate.spawn(isolateMain, receivePort.sendPort);

  //接收其他isolate發(fā)過來的消息
  receivePort.listen((message) {
    //發(fā)過來sendPort,則主isolate也可以向創(chuàng)建的isolate發(fā)送消息
    if (message is SendPort) {
      message.send("好呀好呀!");
    } else {
      print("接到子isolate消息:" + message);
    }
  });

  //增加休眠,是否會影響listen的時機?
  sleep(Duration(seconds: 2));
  print("休眠完成");
}

/// 新isolate的入口函數(shù)
void isolateMain(SendPort sendPort) {
  // isolate是內(nèi)存隔離的,i的值是在主isolate定義的所以這里獲得null
  print(i);

  ReceivePort receivePort = new ReceivePort();
  sendPort.send(receivePort.sendPort);
  // 向主isolate發(fā)送消息
  sendPort.send("發(fā)送消息");


  receivePort.listen((message) {
    print("接到主isolate消息:" + message);
  });
}

? 結(jié)果是大概2s后,我們的listene才打印出其他isolate發(fā)過來的消息。同Android Handler類似,在Dart運行環(huán)境中也是靠事件驅(qū)動的,通過event loop不停的從隊列中獲取消息或者事件來驅(qū)動整個應(yīng)用的運行,isolate發(fā)過來的消息就是通過loop處理。但是不同的是在Android中每個線程只有一個Looper所對應(yīng)的MessageQueue,而Dart中有兩個隊列,一個叫做event queue(事件隊列),另一個叫做microtask queue(微任務(wù)隊列)。

消息機制.png

? Dart在執(zhí)行完main函數(shù)后,就會由Loop開始執(zhí)行兩個任務(wù)隊列中的Event。首先Loop檢查微服務(wù)隊列,依次執(zhí)行Event,當微服務(wù)隊列執(zhí)行完后,就檢查Event queue隊列依次執(zhí)行,在執(zhí)行Event queue的過程中,沒執(zhí)行完一個Event就再檢查一次微服務(wù)隊列。所以微服務(wù)隊列優(yōu)先級高,可以利用微服務(wù)進行插隊。

我們先來看個例子:


import 'dart:io';

void main(){
  new File("/Users/enjoy/a.txt").readAsString().then((content){
      print(content);
  });
  while(true){}
}

文件內(nèi)容永遠也無法打印出來,因為main函數(shù)還沒執(zhí)行完。而then方法是由Loop檢查Event queue執(zhí)行的。

如果需要往微服務(wù)中插入Event進行插隊:


import 'dart:async';
import 'dart:io';
//結(jié)果是限制性了microtask然后執(zhí)行then方法。
void main(){
  new File("/Users/enjoy/a.txt").readAsString().then((content){
      print(content);
  });
  //future內(nèi)部就是調(diào)用了 scheduleMicrotask
  Future.microtask((){
    print("future: excute microtask");

  });
//  scheduleMicrotask((){
//    print("");
//  });

}

Future

? 在 Dart 庫中隨處可見 Future 對象,通常異步函數(shù)返回的對象就是一個 Future。 當一個 future 執(zhí)行完后,他里面的值 就可以使用了,可以使用 then() 來在 future 完成的時候執(zhí)行其他代碼。Future對象其實就代表了在事件隊列中的一個事件的結(jié)果。

異常

//當給到一個不存在的文件地址時會發(fā)生異常,這時候可以利用catchError捕獲此異常。
//then().catchError() 模式就是異步的 try-catch。
new File("/Users/enjoy/a1.txt").readAsString().then((content) {
    print(content);
  }).catchError((e, s) {
    print(s);
  });

組合

then()的返回值同樣是一個future對象,可以利用隊列的原理進行組合異步任務(wù)

  new File("/Users/enjoy/a.txt").readAsString().then((content) {
    print(content);
    return 1; //1被轉(zhuǎn)化為 Future<int>類型 返回
  }).then((i){
    print(i);
  }).catchError((e, s) {
    print(s);
  });

上面的方式是等待執(zhí)行完成讀取文件之后,再執(zhí)行一個新的future。如果我們需要等待一組任務(wù)都執(zhí)行完成再統(tǒng)一處理一些事情,可以通過wait()完成。

 Future readDone = new File("/Users/enjoy/a.txt").readAsString();
  //延遲3s
  Future delayedDone = Future.delayed(Duration(seconds: 3));

  Future.wait([readDone, delayedDone]).then((values) {
     print(values[0]);//第一個future的結(jié)果
     print(values[1]);//第二個future的結(jié)果
  });

Stream

? Stream(流) 在 Dart API 中也經(jīng)常出現(xiàn),表示發(fā)出的一系列的異步數(shù)據(jù)。 Stream 是一個異步數(shù)據(jù)源,它是 Dart 中處理異步事件流的統(tǒng)一 API。

? Future 表示稍后獲得的一個數(shù)據(jù),所有異步的操作的返回值都用 Future 來表示。但是 Future 只能表示一次異步獲得的數(shù)據(jù)。而 Stream 表示多次異步獲得的數(shù)據(jù)。比如 IO 處理的時候,每次只會讀取一部分數(shù)據(jù)和一次性讀取整個文件的內(nèi)容相比,Stream 的好處是處理過程中內(nèi)存占用較小。而 File 的 readAsString()是一次性讀取整個文件的內(nèi)容進來,雖然獲得完整內(nèi)容處理起來比較方便,但是如果文件很大的話就會導(dǎo)致內(nèi)存占用過大的問題。

 new File("/Users/enjoy/app-release.apk").openRead().listen((List<int> bytes) {
    print("stream執(zhí)行"); //執(zhí)行多次
  });

  new File("/Users/enjoy/app-release.apk").readAsBytes().then((_){
    print("future執(zhí)行"); //執(zhí)行1次
  });

listen()其實就是訂閱這個Stream,它會返回一個StreamSubscription訂閱者。訂閱者肯定就提供了取消訂閱的cancel(),去掉后我們的listen中就接不到任何信息了。除了cancel()取消方法之外,我們還可以使用onData()重置listene方法,onDone監(jiān)聽完成等等操作。

  StreamSubscription<List<int>> listen = new File("/Users/enjoy/app-release.apk").openRead().listen((List<int> bytes) {
    print("stream執(zhí)行");
  });
  listen.onData((_){
    print("替代listene");
  });
  listen.onDone((){
    print("結(jié)束");
  });
  listen.onError((e,s){
    print("異常");
  });
  //暫停,如果沒有繼續(xù)則會退出程序
  listen.pause();
  //繼續(xù)
  listen.resume();

廣播模式

? Stream有兩種訂閱模式:單訂閱和多訂閱。單訂閱就是只能有一個訂閱者,上面的使用我們都是單訂閱模式,而廣播是可以有多個訂閱者。通過 Stream.asBroadcastStream() 可以將一個單訂閱模式的 Stream 轉(zhuǎn)換成一個多訂閱模式的 Stream,isBroadcast 屬性可以判斷當前 Stream 所處的模式。

var stream = new File("/Users/enjoy/app-release.apk").openRead();
  stream.listen((List<int> bytes) {
  });
  //錯誤 單訂閱只能有一個訂閱者
//  stream.listen((_){
//    print("stream執(zhí)行");
//  });

  var broadcastStream = new File("/Users/enjoy/app-release.apk").openRead().asBroadcastStream();
  broadcastStream.listen((_){
    print("訂閱者1");
  });
  broadcastStream.listen((_){
    print("訂閱者2");
  });

需要注意的是,多訂閱模式如果沒有及時添加訂閱者則可能丟數(shù)據(jù)。

//默認是單訂閱
  var stream = Stream.fromIterable([1, 2, 3]);
  //3s后添加訂閱者 不會丟失數(shù)據(jù)
  new Timer(new Duration(seconds: 3), () => stream.listen(print));

  //創(chuàng)建一個流管理器 對一個stream進行管理
  var streamController = StreamController.broadcast();
  //添加
  streamController.add(1);
  //先發(fā)出事件再訂閱 無法接到通知
  streamController.stream.listen((i){
    print("broadcast:$i");
  });
  //記得關(guān)閉
  streamController.close();


  //這里沒有丟失,因為stream通過asBroadcastStream轉(zhuǎn)為了多訂閱,但是本質(zhì)是單訂閱流,并不改變原始 stream 的實現(xiàn)特性
  var broadcastStream = Stream.fromIterable([1, 2, 3]).asBroadcastStream();
  new Timer(new Duration(seconds: 3), () => broadcastStream.listen(print));

我們也可以通過廣播實現(xiàn)一個eventbus

class EventBus {
  static EventBus _instanse;
  StreamController _streamController;

  factory EventBus.getDefault() {
    return _instanse ??= EventBus._instanse;
  }

  EventBus._internal() {
    _streamController = StreamController.broadcast();
  }

  StreamSubscription<T> regist<T>(void onData(T event)) {
    if (T is dynamic) {
      return _streamController.stream.listen(onData);
    } else {
      Stream<T> stream =
          _streamController.stream.where((type) => type is T).cast<T>();
     return stream.listen(onData);
    }
  }

  void post(event) {
    _streamController.add(event);
  }

  void destory() {
    _streamController.close();
  }
}

async/await

? 使用asyncawait的代碼是異步的,但是看起來很像同步代碼。當我們需要獲得A的結(jié)果,再執(zhí)行B,時,你需要then()->then(),但是利用asyncawait能夠非常好的解決回調(diào)地獄的問題:

void readFile(void callback(s)) {
  String result;
  new File("/Users/xiang/enjoy/a.txt").readAsString().then((s) {
    result += s;
    return new File("/Users/xiang/enjoy/a.txt").readAsString();
  }).then((s) {
    result += s;
  }).whenComplete((){
    print(result);
    callback(result);
  });
}
//async 表示這是一個異步方法,await必須再async方法中使用
//異步方法只能返回 void和Future
Future<String> readFile() async {
  //await 等待future執(zhí)行完成再執(zhí)行后續(xù)代碼
  String content = await new File("/Users/xiang/enjoy/a.txt").readAsString();
  String content2 = await new File("/Users/xiang/enjoy/a.txt").readAsString();
  //自動轉(zhuǎn)換為 future
  return content;
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容