Dart 單線程模型
眾所周知,在Java中使用多線程來處理并發(fā)任務,適量并合適地使用多線程,能夠極大地提高資源的利用率和程序運行效率,但是缺點也比較明顯,比如過度開啟線程會帶來額外的資源和性能消耗或多線程共享內(nèi)存容易出現(xiàn)死鎖等。實際上,在APP的使用過程中,多數(shù)處理空閑狀態(tài),并不需要進行密集或高并發(fā)的處理,因此從某些意義上來說,多線程顯得有點多余。正是因為如此,Dart作為一種新的語言,通過引單線程模型很好地處理了并發(fā)任務對多線程的依賴。
1.1 單線程模型
Dart是一種單線程語言,因此Dart程序沒有主線程和子線程之分,而在Dart中線程并不是指Thread,而是指Isolate。因為Dart沒有線程的概念,只有Isolate,每個Isolate都是隔離的,并不會共享內(nèi)存。所有的Dart代碼都是在Isolate中運行,它就像機器上的一個小空間,具有自己的私有內(nèi)存塊和一個運行著事件循環(huán)模型的單線程。也就是說,一旦某個Dart函數(shù)開始執(zhí)行,它將執(zhí)行到這個函數(shù)的結(jié)束而不被其他Dart代碼打斷,這就是單線程的特性。
默認情況下,Dart程序只有一個Isolate(未自己創(chuàng)建的情況下),而這個Isolate就是Main Isolate。也就是說,一個Dart程序是從Main Isolate的main函數(shù)開始的,而在main函數(shù)結(jié)束后,Main isolate線程開始一個一個處理事件循環(huán)模型隊列中的每一事件(Event)。上圖描述的就是Main Isolate的消息循環(huán)模型。
1.2 事件循環(huán)模型
也許你會問,既然Dart是一種單線程語言,那么是不是就意味著Dart無法并發(fā)處理異步任務了?此言差矣。前面說到,所有的Dart程序都在Isolate中運行,每個Isolate擁有自己的私有內(nèi)存塊和一個事件循環(huán)模型,其中,事件循環(huán)模型就是用來處理各種事件,比如點輸入/輸出,點擊,定時器以及異步任務等。下圖描述了一個Isolate的事件循環(huán)模型的整個流程:

從上圖可知,Dart事件循環(huán)機制由一個消息循環(huán)(event looper)和兩個消息隊列構(gòu)成,其中,兩個消息隊列是指事件隊列(event queue)和微任務隊列(Microtask queue)。該機制運行原理為:
- 首先,Dart程序從main函數(shù)開始運行,待main函數(shù)執(zhí)行完畢后,event looper開始工作;
- 然后,event looper優(yōu)先遍歷執(zhí)行Microtask隊列所有事件,直到Microtask隊列為空;
- 接著,event looper才遍歷執(zhí)行Event隊列中的所有事件,直到Event隊列為空;
- 最后,視情況退出循環(huán)。
為了進一步理解,我們解釋下上述三個概念:
(1)消息循環(huán)(Event Looper)
顧名思義,消息循環(huán)就是指一個永不停歇且不能阻塞的循環(huán),它將不停的嘗試從微任務隊列和事件隊列中獲取事件(event)進行處理,而這些Event包括了用戶輸入,點擊,Timer,文件IO等。

(2)事件隊列(Event queue)
該隊列的事件來源于外部事件和Future,其中,外部事件主要包括I/O,手勢,繪制,計時器和isolate相互通信的message等,而Future主要是指用戶自定義的異步任務,通過創(chuàng)建Future類實例來向事件隊列添加事件。需要注意的是,當Event looper正在處理Microtask Queue時,Event queue會被阻塞,此時APP將無法進行UI繪制,響應用戶輸入和I/O等事件。下列示例演示了向Event queue中添加一個異步任務事件:
main(List<String> args) {
print('main start...')
var futureInstance = Future<String>(() => "12345");
futureInstance.then((res) {
print(res);
}).catchError((err) {
print(err);
});
print('main end...')
}
// 打印結(jié)果:
// main start...
// main end...
// 12345
(3)微任務隊列(Microtask queue)
該隊列的事件來源與當前isolate的內(nèi)部或通過scheduleMicrotask函數(shù)創(chuàng)建,Microtask一般用于非常短的內(nèi)部異步動作,并且任務量非常少,如果微任務非常多,就會造成Event queue排不上隊,會阻塞Event queue的執(zhí)行造成應用ANR,因為Microtask queue的優(yōu)先級高于Event queue。因此,大多數(shù)情況下的任務優(yōu)先考慮使用Event queue,不到萬不得已不要使用Microtask queue。下列 示例演示了兩個事件隊列執(zhí)行情況:
iimport 'dart:async';
void main(List<String> args) {
print('main start...');
/// 創(chuàng)建異步event queue
var futureInstance = Future<String>(() => "12345");
futureInstance.then((res) {
print(res);
}).catchError((err) {
print(err);
});
/// 執(zhí)行函數(shù)
stark();
/// scheduleMicrotask函數(shù)創(chuàng)建微任務MircroTask queue
scheduleMicrotask(() => print('microtask #1 of 2'));
print('main end...');
/// scheduleMicrotask函數(shù)創(chuàng)建微任務MircroTask queue
scheduleMicrotask(() => print('microtask #2 of 2'));
}
void stark() async {
print('stark start...');
/// 創(chuàng)建異步event queue
var futureInstance = Future<String>(() => "900303");
/// 異步等待
var rep = await futureInstance;
print(rep);
/// 創(chuàng)建異步event queue
var futureInstance1 = Future<String>(() => "xxxxx");
futureInstance1.then((res) {
print(res);
}).catchError((err) {
print(err);
});
print('stark end...');
}
// 執(zhí)行順序:
// main start...
// stark start...
// main end...
// microtask #1 of 2
// microtask #2 of 2
// 12345
// 900303
// stark end...
// xxxxx
1.3 Isolate
大多數(shù)計算機中,甚至在移動平臺上,都在使用多核CPU。 為了有效利用多核性能,開發(fā)者一般使用共享內(nèi)存數(shù)據(jù)來保證多線程的正確執(zhí)行。 然而多線程共享數(shù)據(jù)通常會導致很多潛在的問題,并導致代碼運行出錯。Dart作為一種新語言,為了緩解上述問題,提出了Isolate(隔離區(qū))的概念,即Dart沒有線程的概念,只有Isolate,所有的Dart代碼都是在Isolate中運行,它就像是機器上的一個小空間,具有自己的私有內(nèi)存堆和一個運行著Event Looper的單個線程。
通常,一個Dart應用對應著一個Main Isolate,且應用的入口即為該Isolate的main函數(shù)。當然,我們也可以創(chuàng)建其它的Isolate,由于Isolate的內(nèi)存堆是私有的,因此這些Isolate的內(nèi)存都不會被其它Isolate訪問。假如不同的Isolate需要通信(單向/雙向),就只能通過向?qū)Ψ降氖录h(huán)隊列里寫入任務,并且它們之間的通訊方式是通過port(端口)實現(xiàn)的,其中,Port又分為receivePort(接收端口)和sendPort(發(fā)送端口),它們是成對出現(xiàn)的。Isolate之間通信過程:
- 首先,當前Isolate創(chuàng)建一個ReceivePort對象,并獲得對應的SendPort對象;
var receivePort = ReceivePort();
var sendPort = receivePort.sendPort;
- 其次,創(chuàng)建一個新的Isolate,并實現(xiàn)新Isolate要執(zhí)行的異步任務,同時,將當前Isolate的SendPort對象傳遞給新的Isolate,以便新Isolate使用這個SendPort對象向原來的Isolate發(fā)送事件;
// 調(diào)用Isolate.spawn創(chuàng)建一個新的Isolate
// 這是一個異步操作,因此使用await等待執(zhí)行完畢
var anotherIsolate = await Isolate.spawn(otherIsolateInit, receivePort.sendPort);
// 新Isolate要執(zhí)行的異步任務
// 即調(diào)用當前Isolate的sendPort向其receivePort發(fā)送消息
void otherIsolateInit(SendPort sendPort) async {
value = "Other Thread!";
sendPort.send("BB");
}
- 第三,調(diào)用當前Isolate#receivePort的listen方法監(jiān)聽新的Isolate傳遞過來的數(shù)據(jù)。Isolate之間什么數(shù)據(jù)類型都可以傳遞,不必做任何標記。
receivePort.listen((date) {
print("Isolate 1 接受消息:data = $date");
});
- 最后,消息傳遞完畢,關閉新創(chuàng)建的Isolate。
import 'dart:isolate';
var anotherIsolate;
var value = "Now Thread!";
void startOtherIsolate() async {
value = " Isolate1 Thread!";
/// 獲取當前 Isolate 接收對象
var receivePort = ReceivePort();
receivePort.listen((message) {
if (message is String) {
print("Isolate1 接受消息:data = $message; value = $value");
} else if (message is SendPort) {
message.send("我來自 isolate1 發(fā)送 消息");
}
});
/// 創(chuàng)建 Isolate 對象
anotherIsolate = await Isolate.spawn(otherIsolateInit, receivePort.sendPort);
}
void otherIsolateInit(SendPort sendPort) async {
value = " Isolate2 Thread!";
/// 獲取當前 Isolate 接收對象
var receivePort2 = ReceivePort();
receivePort2.listen((message) {
print("Isolate2 接受消息:message = $message; value = $value ");
});
/// 發(fā)送消息
sendPort.send("加油");
sendPort.send(receivePort2.sendPort);
}
void main(List<String> args) {
startOtherIsolate();
}
==============執(zhí)行結(jié)果===========
Isolate1 接受消息:data = 加油; value = Isolate1 Thread!
Isolate2 接受消息:message = 我來自 isolate1 發(fā)送 消息; value = Isolate2 Thread!
Isolate 處理耗時操作使用場景
import 'dart:isolate';
var anotherIsolate;
var value = "Now Thread!";
void startOtherIsolate() async {
value = " Isolate1 Thread!";
int count = 1000000000;
/// 獲取當前 Isolate 接收對象
var receivePort = ReceivePort();
receivePort.listen((message) {
if (message is String) {
print("Isolate1 接收消息:message = $message; value = $value");
} else if (message is SendPort) {
message.send("我來自 isolate1 發(fā)送 消息");
} else if (message is int) {
print('Isolate1 接接int消息: $message');
}
});
/// 創(chuàng)建 Isolate 對象
anotherIsolate = await Isolate.spawn(
otherIsolateInit, {"sendPort": receivePort.sendPort, "params": count});
}
void otherIsolateInit(Map<String, dynamic> info) async {
SendPort sendPort = info['sendPort'];
int count = info['params'];
value = " Isolate2 Thread!";
print('otherIsolateInit start...');
/// 獲取當前 Isolate 接收對象
var receivePort2 = ReceivePort();
receivePort2.listen((message) {
print("Isolate2 接受消息:message = $message; value = $value ");
});
/// 發(fā)送消息
sendPort.send(receivePort2.sendPort);
/// 處理耗時任務
/// 耗時任務會阻塞該isolate、不會阻塞main isolate
int _sum = 0;
int _index = 0;
while (_index <= count) {
_sum += _index;
_index++;
}
sendPort.send(_sum);
print('otherIsolateInit end...');
}
void main(List<String> args) {
startOtherIsolate();
}
============輸入日志===============
otherIsolateInit start...
otherIsolateInit end...
Isolate1 接接int消息: 500000000500000000
Isolate2 接受消息:message = 我來自 isolate1 發(fā)送 消息; value = Isolate2 Thread!