翻譯上面的原文,因?yàn)樽罱谟肦eactor, 所以想知道什么時(shí)候用什么,why? 可能會(huì)有多篇和實(shí)例,這是第一篇
多個(gè)Reactive 框架Reactor /Rxjava (1/2), 還有java8的stream, 來(lái)比較下,從8個(gè)方面
1. Composable
2. Lazy
3. Reuseable
4.Asynchronous
5.Cacheable
6.Pull or Push
7.Backpressure
8.Operator fusion
參與比較的工具有:
CompletableFucture, Stream, Optional, Observable(Rxjava1), Observable(Rxjava2), Flowable(Rxjava2), Flux(Reactor)
1. Composable: 組合性, 能否function編程
CompletableFucture, 有一組then*()方法,可以組成pipeline,傳遞你nothing或者single value 從stage到stage
Stream,有一組operators來(lái)傳遞N個(gè)value
Optional, 只有map,flatMap,filter少數(shù)幾個(gè)中間方法
Observable, Flux, Flowable, 這點(diǎn)跟stream一樣
2. Lazy, 是否懶加載,推遲執(zhí)行
CompletableFuture, 在對(duì)象創(chuàng)建的時(shí)候被賦予的是已經(jīng)啟動(dòng)的Async的線程,只能通過(guò)它得到結(jié)果,不lazy
Stream, 所有中間操作(返回Stream的)都是lazy,所以trigger操作都是不lazy
Optional, 不lazy, 立即發(fā)生
Observable, Flux, Flowable, lazy, 直到subscriber出現(xiàn)
3. Reusable, 復(fù)用
CompletableFucture, 可以復(fù)用,畢竟只是個(gè)wrapper對(duì)象
Stream,不可以, javadoc:?
一個(gè)stream只能在(一個(gè)中間或終結(jié)操作)執(zhí)行一次,復(fù)用會(huì)拋出IllegalStateException,但是一些stream操作會(huì)返回receiver而不是一個(gè)新的stream對(duì)象,所以很難發(fā)現(xiàn)
Optional, 可以,是immutable, 立即發(fā)生
Observable, Flux, Flowable, 可以從設(shè)計(jì)上復(fù)用,都是從開(kāi)始調(diào)用直到subscriber
4. Asynchronous 異步
CompletableFuture, 本身為了Async而出現(xiàn),代表了一個(gè)work,內(nèi)部通過(guò)ForkJoinPool來(lái)操作Executor,默認(rèn)創(chuàng)建cpu core數(shù)量的線城池,或者double(超頻),可以通過(guò)jvm參數(shù): -Djava.util.concurrent.ForkJoinPool.common.parallelism=? 設(shè)置數(shù)量
Stream,本身不Async, 可以通過(guò)parrallelStream來(lái)多線程
Optional, 不行
Observable, Flux, Flowable, 基于Observer模式的,肯定是Async, 還能通過(guò)Subscirber接口方法onNext onCompleted來(lái)控制, 然后subscribeOn 和observeOn 可以控制 subcription和reception ,類似:

返回:

5. Cacheable , 緩存
reuseable和cacheable的區(qū)別: 假設(shè)pipeline A , re-use 2次, 創(chuàng)建pipeline B = A + [] , C = A + []
如果B 和C 成功,就是可reusable,
如果B 和 C成功, A的每個(gè)stage只被調(diào)用1次,就是cacheable, 必須是reusable 才能cacheable
CompletableFuture,? 跟reusable的一樣,
Stream, 不能cache中間結(jié)果
Optional, 可以,因?yàn)榻Y(jié)果已經(jīng)得到
Observable, Flowable, Flux, 默認(rèn)不是, 可以.cache()方法來(lái)啟用

6. Push, Pull , 是push模式還是pull模式
Stream , Optional是pull的,通過(guò)collect(), get()等方法,pull模式意味著阻塞,同步
CompletableFuture, Flux, Observable, Flowable, 是push的, 意味著非阻塞,異步
7.Backpressure, 背壓(極差的翻譯)
主要是push模式才有, 上游的Push數(shù)量超過(guò)了下游subscriber的處理能力
CompletableFuture, 不需要,只有一個(gè)結(jié)果
Observable(Rxjava1), Flux, Flowable, 處理策略:
Bufferring, 緩存,來(lái)不及處理的放入buffer, onNext()方法(其中Flux默認(rèn)LONG_MAX 大小)
Drop, 扔
use latest, 用最新的
None, 不管
Exception, 扔異常
Observable(Rxjava2), 不解決這個(gè)問(wèn)題,必須使用Flowable來(lái)處理
8. Operator Fusion, 操作符熔斷
是一種多中間階段的代碼優(yōu)化,只有Reactor 和Rxjava2 支持, 舉例:
marco-fusion:

不是很清楚更具體從其他文章分析
總結(jié)

Stream,CompletableFuture and Optional都是為了解決特別的問(wèn)題而出現(xiàn),不用濫用
Rxjava 和Reactor 也同樣, 在大并發(fā)的情況下,需要測(cè)試使用