TPL之Dataflow

Dataflow是啥

Dataflow是由微軟提供的一個(gè)用于異步或者并發(fā)的庫(kù)。是runtime的一部分但不隨runtime分發(fā)。需要手動(dòng)添加nuget包·System.Threading.Tasks.Dataflow·。

Dataflow的工作方式

顧名思義,Dataflow庫(kù)的工作方式就是按數(shù)據(jù)流的工作方式工作。將你需要做的事情拆分成到各個(gè)步驟,然后把步驟連接起來(lái),就構(gòu)建好了你的數(shù)據(jù)流。Dataflow將步驟定義為各種的Block。你只需要關(guān)心如何構(gòu)建Block并進(jìn)行鏈接,其他的工作交給Dataflow來(lái)完成。你不需要去關(guān)心數(shù)據(jù)具體是怎么在各個(gè)Block之間傳遞和緩存,也不用關(guān)心如何去給各個(gè)模塊分配到線程去執(zhí)行。當(dāng)然,如何取消執(zhí)行Dataflow也幫你搞定了。

假設(shè)我們的工作需要四個(gè)Block來(lái)完成,每個(gè)Block需要一秒來(lái)完成。那么執(zhí)行一次需要4秒,順序執(zhí)行20次需要80秒。如果交給Dataflow來(lái)做,需要多久呢?PS:每個(gè)Block默認(rèn)同時(shí)只能存在一個(gè)實(shí)例運(yùn)行,你可以自行更改。屬性是MaxDegreeOfParallelism

執(zhí)行示意圖

如圖所示,在第四秒開始,四個(gè)Block會(huì)同時(shí)執(zhí)行,也就是說(shuō),理論上只要24秒就可以完成。當(dāng)然中間會(huì)有額外的開銷,比如調(diào)度啊,數(shù)據(jù)傳遞啊。你可能也注意到,時(shí)間減少的程度和模塊的數(shù)量有關(guān)。是的,這是因?yàn)槟J(rèn)MaxDegreeOfParallelism = 1。更改這個(gè)屬性或者添加更多的模塊會(huì)使執(zhí)行速度更快,但可能會(huì)引入其他的問(wèn)題。
這里也只是舉了個(gè)簡(jiǎn)單的例子。實(shí)際使用中,Dataflow可以構(gòu)建更加復(fù)雜的數(shù)據(jù)流,形成一個(gè)網(wǎng)絡(luò)。在業(yè)務(wù)更加復(fù)雜的情況下,Dataflow 的優(yōu)勢(shì)更能體現(xiàn),即在保證基本性能的情況下,提升開發(fā)效率。

Dataflow僅適用于特定情況

以我目前對(duì)Dataflow 的了解,我認(rèn)為Dataflow適用于對(duì)性能有一定的追求,但不追求極致,同時(shí)對(duì)靈活性有一定的要求,開發(fā)人員的水平又有限的情況。同時(shí)任務(wù)又需要有一定的復(fù)雜度。
我這里舉一個(gè)簡(jiǎn)單例子:

using System.Diagnostics;
using System.Threading.Tasks.Dataflow;

Console.WriteLine("Hello, World!");

var bufferBlock = new BufferBlock<int[]>();

var transferBlock = new TransformBlock<int[], double[]>(i => { Thread.Sleep(1000); return i.Select(ii => (double)ii).ToArray(); });

var transformBlock1 = new TransformBlock<double[], int>(d => { Thread.Sleep(1000); return (int)d.Sum(); });

var actionBlock = new ActionBlock<int>(i => { Thread.Sleep(1000); Console.WriteLine(i); });

bufferBlock.LinkTo(transferBlock);
bufferBlock.Completion.ContinueWith(delegate { transferBlock.Complete(); });

transferBlock.LinkTo(transformBlock1);
transferBlock.Completion.ContinueWith(delegate { transformBlock1.Complete(); });

transformBlock1.LinkTo(actionBlock);
transformBlock1.Completion.ContinueWith(delegate { actionBlock.Complete(); });

Stopwatch watch = Stopwatch.StartNew();
for (var i = 0; i < 20; ++ i)
{
    bufferBlock.Post(new int[1024 * 100]);
}

bufferBlock.Complete();

await actionBlock.Completion;
watch.Stop();
Console.WriteLine($"Dataflow costs {watch.ElapsedMilliseconds}");
// Output: Dataflow costs 22498

watch.Restart();
Parallel.For(0, 20, i =>
{
    var data = new int[1024 * 100];
    Thread.Sleep(1000);
    var tempData = data.Select(i=>(double)i).ToArray();
    Thread.Sleep(1000);
    var sum = tempData.Sum();
    Thread.Sleep(1000);
    Console.WriteLine(sum);
});
watch.Stop();
Console.WriteLine($"Parallel costs {watch.ElapsedMilliseconds}");
// Output: Parallel costs 9247

watch.Restart();
for(int i = 0; i < 20; ++ i)
{
    var data = new int[1024 * 100];
    Thread.Sleep(1000);
    var tempData = data.Select(i=>(double)i).ToArray();
    Thread.Sleep(1000);
    var sum = tempData.Sum();
    Thread.Sleep(1000);
    Console.WriteLine(sum);
}
watch.Stop();
Console.WriteLine($"Sync costs {watch.ElapsedMilliseconds}");
// Output: Sync costs 61329

可以看到,在不更改默認(rèn)配置的情況下,我們使用Parallel會(huì)更快。當(dāng)然你可以跟我較真說(shuō),只要稍微修改Dataflow就可以達(dá)到同樣的性能。但不理解Dataflow的工作模式,去更改設(shè)置可能會(huì)導(dǎo)致很多多線程的bug出現(xiàn)。理解的人呢,可能又有更好的解決方案,不需要用Dataflow。我猜,這大概就是為啥微軟不把這個(gè)庫(kù)隨runtime發(fā)布的原因吧。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容