分享工程算法在業(yè)務(wù)開發(fā)中的應(yīng)用(二)并發(fā)編程中的數(shù)據(jù)結(jié)構(gòu)

之前寫過(guò)篇分享,結(jié)合工作內(nèi)容聊了聊寫業(yè)務(wù)代碼時(shí)會(huì)用到的工程算法;這次繼續(xù)本系列的分享,結(jié)合今年新寫的代碼,聊一聊并發(fā)編程中會(huì)用到的數(shù)據(jù)結(jié)構(gòu)。

一、雖然無(wú)鎖但是并發(fā)安全的數(shù)據(jù)結(jié)構(gòu)

寫java時(shí),常用的并發(fā)容器大部分都是基于鎖的,最基礎(chǔ)的是使用粗粒度鎖,如Collections.synchronizedCollection(Collection<T> c),如ArrayBlockingQueue,優(yōu)點(diǎn)是簡(jiǎn)單好實(shí)現(xiàn),但缺點(diǎn)是粒度粗導(dǎo)致并發(fā)吞吐量不高;優(yōu)化后的是基于細(xì)粒度鎖的容器,如ConcurrentHashMap,里面塞滿了各種優(yōu)化(結(jié)合只用一個(gè)字母命名的變量、層層大括號(hào)嵌套,簡(jiǎn)直讓人沒(méi)法點(diǎn)進(jìn)去看)
基于鎖的并發(fā)容器易于實(shí)現(xiàn),但也有些缺點(diǎn),包括:
線程調(diào)度帶來(lái)的性能損失;
活躍度風(fēng)險(xiǎn),如死鎖,如持有鎖的線程掛起導(dǎo)致其他線程不能繼續(xù)處理,優(yōu)先級(jí)倒置等。
相比之下,一些雖然無(wú)鎖但是并發(fā)安全的數(shù)據(jù)結(jié)構(gòu)在性能上更好,同時(shí)也能避免死鎖等活躍度問(wèn)題。

1.1. 無(wú)鎖并發(fā)讀

雖然無(wú)鎖但是并發(fā)讀安全的數(shù)據(jù)結(jié)構(gòu)有:

1.1.1. 不可變數(shù)據(jù)結(jié)構(gòu)

如果不可變數(shù)據(jù)結(jié)構(gòu)的讀操作不改變內(nèi)部狀態(tài),那么稍加改造就能拿來(lái)當(dāng)并發(fā)容器。這里說(shuō)稍加改造,是因?yàn)樾枰紤]安全發(fā)布,如果不可變數(shù)據(jù)結(jié)構(gòu)內(nèi)部的變量沒(méi)有加volatile、final關(guān)鍵字,很可能初始化時(shí)候?qū)戇M(jìn)去的內(nèi)容沒(méi)法被其他線程看見,因此需要我們?nèi)藶榧由蟜inal、volatile,保證安全發(fā)布。

典型例子是項(xiàng)目中很多配置數(shù)據(jù)放在guava的ImmutableMap里,允許并發(fā)讀;
再一個(gè)例子是,上游傳來(lái)的數(shù)據(jù)有A,B,C三個(gè)字段,其中C可能為空(只給A、B),或者B、C都為空(只給A),下游需要靠這3個(gè)字段唯一標(biāo)識(shí)一個(gè)業(yè)務(wù)類型。
這個(gè)場(chǎng)景用Trie寫比較優(yōu)雅,于是實(shí)現(xiàn)了一個(gè)不可變Trie

1.1.2. Copy-on-Write數(shù)據(jù)結(jié)構(gòu)

不可變數(shù)據(jù)結(jié)構(gòu)雖好,但有很多情況不適用,比如確實(shí)有很小的可能改動(dòng),比如限于框架的生命周期,不好用不可變數(shù)據(jù)結(jié)構(gòu)。例如項(xiàng)目中開發(fā)了給其他系統(tǒng)用的sdk,sdk中提供了回調(diào)鉤子,業(yè)務(wù)系統(tǒng)只需實(shí)現(xiàn)回調(diào)鉤子,當(dāng)sdk收到消息后即可回調(diào)通知業(yè)務(wù)系統(tǒng)。在這個(gè)場(chǎng)景中,系統(tǒng)啟動(dòng)后由spring把實(shí)現(xiàn)類注入到sdk的容器中,此時(shí)容器得等spring注入完了才“不可變”,在注入之前是可變的,因此不能用不可變?nèi)萜鳌_@種場(chǎng)景用Copy-on-write數(shù)據(jù)結(jié)構(gòu)即可。
另外值得一提的是,jdk里的copy-on-write數(shù)據(jù)結(jié)構(gòu)在copy時(shí)比較粗暴、直接把所有數(shù)據(jù)全復(fù)制了一遍,更好的實(shí)現(xiàn)是用可持久化數(shù)據(jù)結(jié)構(gòu),每次寫操作只復(fù)制一小部分?jǐn)?shù)據(jù)、沒(méi)必要復(fù)制全家。我在輪自己的copy-on-write可持久化數(shù)據(jù)結(jié)構(gòu)庫(kù),沒(méi)寫完就不貼了。

1.2. 無(wú)鎖并發(fā)讀寫

既然能做到無(wú)鎖并發(fā)讀,我們自然會(huì)想到,能否做到無(wú)鎖并發(fā)讀寫,并且并發(fā)安全?
理論上管這類算法叫非阻塞(non-blocking)算法,意思是我們嫌棄基于鎖的算法隔離性太差,當(dāng)持有鎖的線程死鎖/掛起等情況發(fā)生時(shí)所有等待鎖的線程都會(huì)阻塞住沒(méi)法前進(jìn),于是我們就想:能不能搞出一類隔離性好的算法,一個(gè)線程出問(wèn)題了不會(huì)導(dǎo)致別的線程阻塞?

An algorithm is called non‐blocking if
failure or suspension of any thread cannot cause failure or suspension of another thread

按照提供的保證由弱到強(qiáng),非阻塞算法可以分為以下幾類,感興趣可以看看https://www.youtube.com/watch?v=DdAV7891-OAhttps://www.baeldung.com/lock-free-programming 這里不細(xì)展開。

  • Obstruction-Free

a thread is guaranteed to proceed if all other threads are suspended.

  • Lock-Free
    無(wú)論何時(shí),無(wú)論出現(xiàn)了啥情況,總有一個(gè)線程能繼續(xù)處理。
    有了Lock-Free級(jí)別的保證,我們不用擔(dān)心一個(gè)死鎖導(dǎo)致全家阻塞的情況,但是還是有可能出現(xiàn)饑餓的情況:比如某線程一直在等待cpu調(diào)度執(zhí)行,但cpu偏心,一直調(diào)度別的線程不調(diào)度它。

  • Wait-Free
    每個(gè)線程都能得到保證,保證在有限步驟后能夠繼續(xù)執(zhí)行。因此也避免了饑餓情況

我們?cè)谌粘i_發(fā)時(shí)常用的是Lock-Free級(jí)別的容器,它們往往通過(guò)cas等原語(yǔ)實(shí)現(xiàn),比如juc包下面的ConcurrentLinkedQueue,ConcurrentLinkedDeque,比如ConcurrentSkipListMap里也有用到(我沒(méi)細(xì)看),又比如Disruptor等。相關(guān)討論見https://www.quora.com/Are-there-any-non-blocking-lock-free-concurrent-data-structure-libraries-for-Java
基于cas實(shí)現(xiàn)的lock-free數(shù)據(jù)結(jié)構(gòu)好處是在并發(fā)度不高的時(shí)候吞吐量高、線程隔離性好,但缺點(diǎn)是并發(fā)度非常高的時(shí)候,自旋cas會(huì)浪費(fèi)CPU,吞吐量反而會(huì)降低。也因此,jdk雖然已經(jīng)提供了一個(gè)基于cas的Random類,后來(lái)又加了個(gè)ThreadLocalRandom,在并發(fā)度高的情況下減少overhead。

1.3. 避免共享:ThreadLocal

沒(méi)有共享就沒(méi)有并發(fā)安全問(wèn)題,ThreadLocal很基礎(chǔ)這里就不細(xì)講了。
一個(gè)例子是寫業(yè)務(wù)代碼時(shí)候經(jīng)常用到SimpleDateFormat,可以用ThreadLocal實(shí)現(xiàn)一個(gè)對(duì)象池,免得頻繁創(chuàng)建對(duì)象。

二、能夠?qū)崿F(xiàn)延時(shí)任務(wù)/定時(shí)任務(wù)的數(shù)據(jù)結(jié)構(gòu)

項(xiàng)目中遇到了一個(gè)問(wèn)題:日志量太大,磁盤很快就滿了。于是想針對(duì)日志做一些優(yōu)化,其中一個(gè)優(yōu)化點(diǎn)是對(duì)于性能監(jiān)控的日志,原先每次調(diào)用打一條,可以優(yōu)化成按秒/按分鐘聚合日志,每秒或每分鐘打一條,記錄一分鐘調(diào)用次數(shù)、平均耗時(shí)。那么如何實(shí)現(xiàn)這個(gè)功能呢?

2.1. 延遲隊(duì)列DelayQueue

剛好在我的RPC框架福報(bào)RPC里用延遲隊(duì)列實(shí)現(xiàn)過(guò)這個(gè)功能,原理就是每次要打某個(gè)key的性能監(jiān)控日志時(shí),就生成當(dāng)前key當(dāng)前秒的任務(wù),把任務(wù)塞到延遲隊(duì)列里等下一秒打印;同時(shí)把任務(wù)塞到hashMap里作為索引,map的key為"key@時(shí)間戳"(比如"key123@2020-09-09 00:00:00"),當(dāng)同一秒內(nèi)再打同一個(gè)key的日志時(shí),根據(jù)key從索引中找出來(lái)任務(wù),更改任務(wù)里的調(diào)用次數(shù)、調(diào)用時(shí)長(zhǎng)。
詳細(xì)代碼見這里

2.2. ScheduledExecutorService

ScheduledExecutorService可以看成worker thread線程池+延遲隊(duì)列。普通ExecutorService線程池里的線程是不斷從阻塞隊(duì)列里取任務(wù),而ScheduledExecutorService里的線程是不斷從DelayQueue里取任務(wù)。
用ScheduledExecutorService實(shí)現(xiàn)這個(gè)功能和直接用延遲隊(duì)列實(shí)現(xiàn)本質(zhì)上沒(méi)區(qū)別。

2.3. 時(shí)間輪

時(shí)間輪可以拿來(lái)執(zhí)行定時(shí)任務(wù),具體介紹可以看看這篇文章
時(shí)間輪可以看成支持按槽“批處理”的DelayQueue,相比DelayQueue有如下優(yōu)勢(shì):

  • O(logN) vs O(1)

  • delayQueue每take一個(gè)任務(wù)后,都要走await/signal重新線程調(diào)度;而分槽后就能支持按槽批處理,槽內(nèi)元素一口氣批處理掉,無(wú)需每次完成一個(gè)任務(wù)就重新線程調(diào)度

  • 鎖消耗:DelayQueue每個(gè)操作都得用一把大鎖;

    [圖片上傳中...(image-2c0039-1601365646856-0)]

    而時(shí)間輪無(wú)鎖(單個(gè)消費(fèi)者線程,而生產(chǎn)者寫入時(shí)單獨(dú)寫一個(gè)MPSQueue里,該隊(duì)列無(wú)鎖讀寫,消費(fèi)者將任務(wù)從queue轉(zhuǎn)移到wheel里)見https://albenw.github.io/posts/ec8df8c/

通過(guò)時(shí)間輪實(shí)現(xiàn)日志聚合場(chǎng)景的話,順帶還保證了容錯(cuò):消費(fèi)者出問(wèn)題的話也不至于堵隊(duì)列堵生產(chǎn)者,因?yàn)榫退阆M(fèi)者堵了,生產(chǎn)者生產(chǎn)數(shù)據(jù)的時(shí)候直接把時(shí)間輪中過(guò)于舊的槽覆蓋掉即可,隊(duì)列永遠(yuǎn)不會(huì)堵。

另外值得一提的是,如果是想在分布式的環(huán)境中使用時(shí)間輪,可以基于中心化存儲(chǔ)實(shí)現(xiàn)時(shí)間輪,每個(gè)槽對(duì)應(yīng)一條key-value數(shù)據(jù),由server輪詢中心化存儲(chǔ),方案細(xì)節(jié)見解密“達(dá)達(dá)-京東到家”的訂單即時(shí)派發(fā)技術(shù)原理和實(shí)踐。不過(guò)粗看了一下,文中這種方案存在可用性問(wèn)題。

三、圖

3.1. 任務(wù)集合可以抽象成圖

并發(fā)編程中,經(jīng)常面對(duì)一些可以并發(fā)執(zhí)行的任務(wù)集合,我們可以把這些任務(wù)集合抽象成圖,以便簡(jiǎn)化編程。
例如某pipeline中,任務(wù)之間沒(méi)有互相先后順序依賴,可以抽象成以下的任務(wù)關(guān)系圖,圖的點(diǎn)代表任務(wù),邊代表依賴關(guān)系。對(duì)于這種任務(wù)之間沒(méi)有任何依賴關(guān)系的圖,直接把所有任務(wù)放到線程池ExecutorService執(zhí)行即可。


image.png

java的線程池ExecutorService適合提交互相之間沒(méi)有依賴的任務(wù),如果任務(wù)之間有依賴,就不能簡(jiǎn)單的調(diào)用ExecutorService.submit()了,可能出現(xiàn)死鎖、饑餓等情況。例如下圖線性pipeline,圖中有4個(gè)任務(wù),彼此之間互相依賴,如果把4個(gè)任務(wù)扔進(jìn)一個(gè)只有3個(gè)線程的ExecutorService,那么可能出現(xiàn)任務(wù)2、任務(wù)3、任務(wù)4占用了3個(gè)線程,但都需要等待前面的任務(wù)完成;而任務(wù)1還在ExecutorService的任務(wù)隊(duì)列里放著,永遠(yuǎn)沒(méi)有空閑線程能把他取出來(lái)執(zhí)行,因此整個(gè)線程池就會(huì)永遠(yuǎn)阻塞、無(wú)法前進(jìn)。


image.png

如果任務(wù)之間的依賴很簡(jiǎn)單,靠CompletionService+future.get()可以搞定,比如下圖的非線性pipeline;但是寫起來(lái)很麻煩,每一個(gè)階段需要拿到前面階段的future對(duì)象,感興趣可以嘗試寫一下,個(gè)人感覺(jué)不好寫。


image.png

如果是分治任務(wù),可以使用Fork/join,任務(wù)圖如下(網(wǎng)上抄的圖);


image.png

而如果任務(wù)之間的依賴圖不規(guī)則、有點(diǎn)復(fù)雜,比如下圖,該如何寫任務(wù)編排呢?

image.png

有很多編程模式可以選擇,包括以下(詳細(xì)可以看并發(fā)編程模型并發(fā)編程時(shí),如何寫復(fù)雜的任務(wù)編排?,這里不細(xì)聊)
A. 水平切分:Parallel Workers
每個(gè)線程串行跑全套任務(wù),不管任務(wù)圖多復(fù)雜,我們就老老實(shí)實(shí)的單線程按順序一個(gè)一個(gè)執(zhí)行,通過(guò)添加更多線程來(lái)提高并發(fā)度
B. 垂直切分:pipeline模式
類似于CPU流水線,每個(gè)任務(wù)由獨(dú)立的線程(池)執(zhí)行,任務(wù)與任務(wù)之間靠queue異步通信,個(gè)人理解go中的csp模型就是這種方案
C. 生產(chǎn)消費(fèi)模式
D. Actor模式
E. 函數(shù)式并行

辦法很多,但寫起來(lái)還是有些麻煩的,有沒(méi)有更好的辦法?

3.2. 圖線程池

為了簡(jiǎn)化上述任務(wù)圖的并發(fā)編程,我寫了個(gè)圖線程池,見https://github.com/seeflood/Advanced-Concurrent/blob/master/src/test/java/io/github/seeflood/advanced/concurrent/executor/dag/DAGTaskExecutorImplTest.java
假設(shè)有一組做菜任務(wù),任務(wù)圖如下,其中燒水/洗菜/聽歌可以并行,燒水/洗菜/聽歌都做好了才能切菜,切菜完成了才能炒菜

image.png

那么使用圖線程池執(zhí)行這些任務(wù),只需要構(gòu)造好DAG,扔到線程池里執(zhí)行即可,代碼如下:

        DAGTaskGroup<String> dag = new DAGTaskGroup<>();
        Callable<String> one_one = () -> handle("燒水");
        Callable<String> one_two = () -> handle("洗菜");
        Callable<String> one_three = () -> handle("聽歌");
        Callable<String> two_one = () -> handle("切菜");
        Callable<String> three_one = () -> handle("炒菜");
        dag.link(one_one, two_one);
        dag.link(one_two, two_one);
        dag.link(one_three, two_one);
        dag.link(two_one, three_one);
        dag.link(one_three, three_one);
        DAGTaskExecutorImpl executor = new DAGTaskExecutorImpl(Executors.newFixedThreadPool(3));
        Map<Callable, String> submit = executor.submit(dag);
        submit.forEach((k, v) -> System.out.println("result map value:" + v));

(這個(gè)框架只是個(gè)周末靈機(jī)一動(dòng)的demo,沒(méi)有加超時(shí)、容錯(cuò)之類的功能,還不好用于生產(chǎn))

后續(xù)

這個(gè)系列的分享我已經(jīng)構(gòu)思好了幾個(gè)主題,可以一個(gè)季度寫一篇,嘻嘻

時(shí)隔一星期之后發(fā)現(xiàn)傻屌簡(jiǎn)書丟了部分?jǐn)?shù)據(jù),簡(jiǎn)單修改了一下,懶得補(bǔ)救了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容