Reactive、Reactor和webflux

開題

Reactor顧名思義核反應堆,光聽名字就知道它有多強了,首先Reactor是異步非阻塞的,基于netty,而tomcat不是,一個請求一個線程(除了Servlet3.1以上),使用Reactor就是整個代碼在執(zhí)行鏈上也更清晰,做過前端的同學應該很有感悟,不管是jquery還是vue都是一級一級往下點,那種感覺真的很爽,當然java8也有類似體驗。所以誕生之初,這個東西就不是為了java而生的,java是重語言,強調穩(wěn)定性,直到jdk1.8也不愿意為reactor妥協(xié)。最終Spring按奈不住了,率先把reactor集成至自己函數(shù)庫中,所以Spring的版本至少是Spring5,而jdk至少是1.7,(因為Spring5實現(xiàn)了很多關于響應式編程的東西),然后webflux坑很深,完全看上去像另一門語言(重點),所以學習成本相對陡峭,我更多從使用者方向去思考。
國內使用這個技術的公司好像沒幾家,除了我上家公司以外(用的也不是很好),首先我知道的有阿里,當然也只是一部分技術部分,也是我同學告訴我的,然后我在這家公司剛做技術選型時注冊中心還在糾結eureka還是nacos,因為我來這之前根本不會nacos,只是聽過,用eureka倒是很熟,但是看到nacos支持響應式編程,我還是很開心的,加上其他一些因素還是選定了nacos。
所以大家暫時不用擔心,這個技術暫時還不會取代java命令式編程,因為兼容其他中間件還需要時間,但是也是一個警告。

命令式編程和響應式編程區(qū)別

命令式編程一行一個代碼,我們很明確就能知道,下一行代碼跟上一行代碼關系,因為是按步驟一步一步往下走的,最終返回的那個結果是上面一行一行代碼組合最終的呈現(xiàn)結果。

而響應式編程不一樣,它不會再描述每一步我們要進行的步驟,它只描述你要構建數(shù)據(jù)將要流經的管道,當數(shù)據(jù)流經管道時,可以對它們進行某種形式的修改或者使用。這樣做的好處是我們不再關注每一行代碼是做什么的(想象有100行代碼),只需要關注管道最終返回的結果是什么,然后依據(jù)上一個管代的結果,流到我們這個管道需要做什么。每個管道都是異步非阻塞的。

主要原因是Servlet是阻塞和多線程的,每個連接都會使用一個線程。在請求處理的時候,會在線程池中拉取一個worker線程來對請求進行處理。同時,請求線程是阻塞的,直到worker線程提示它完成為止。這也帶來的后果就是阻塞式Web框架在大量請求無法有效地擴展。緩慢的worker線程所帶來的延遲會使情況變得更糟,因為它將花費更長的時間才能將worker線程送回池中,準備處理另一個請求。在某些場景中,這種設計完全可以接受。事實上,這種方式也是這十年來Web應用程序的開發(fā)方式,但是時代在改變。這種方式適合以前偶爾瀏覽網站的人們,而現(xiàn)在人們會頻繁消費HTTPAPI,他們會持續(xù)地和Web API交換數(shù)據(jù)。

事件輪詢請求

數(shù)據(jù)庫支持

  1. 2.3執(zhí)行時間和普通比。
  2. Spring Data Reactive不支持 MySQL,進一步也不支持 MySQL 事務。所以用了 Reactive 原來的 spring 事務管理就不好用了。jdbc jpa 的事務是基于阻塞 IO 模型的,如果 Spring Data Reactive 沒有升級 IO 模型去支持 JDBC,生產上的應用只能使用不強依賴事務的。

Reactor的主要類

在Reactor中,經常使用的類并不多,主要有以下兩個:

  • Mono 實現(xiàn)了 org.reactivestreams.Publisher 接口,代表0到1個元素的發(fā)布者(Publisher)
  • Flux 同樣實現(xiàn)了 org.reactivestreams.Publisher 接口,代表0到N個元素的發(fā)布者

Publisher

Mono和Flux都是Publisher,發(fā)布者起到發(fā)送流數(shù)據(jù)作用。

Subscriber

1.Subscriber,因為一次只請求一個元素會導致本身效率低下。
2.為了驗證是不是一次請求一個元素,fromInter 或 range。
onComplete因為是多線程,為了防止發(fā)布者和訂閱者結束后有個通知,否則會造成周期競爭。
onComplete或onError都會觸發(fā)終止訂閱

Subscription 和 Processor

發(fā)布者、訂閱者關系流程

Backpressure

指在異步場景中,被觀察者發(fā)送事件速度遠快于觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發(fā)送速度的策略,可能最終導致溢出。subscription

doOnSubscribe 和 doOnNext

  1. doOnSubscribe是事件被訂閱之前(也就是事件源發(fā)起之前)會調用的方法, 它一般執(zhí)行在 subscribe() 發(fā)生的線程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的話,它將執(zhí)行在離它最近的 subscribeOn() 所指定的線程。
  2. doOnNext是觀察者被通知之前(也就是回調之前)會調用的方法,說白了就是最終回調之前的前一個回調方法,這個方法一般做的事件類似于觀察者做的事情,只是自己不是最終的回調者。(觀察者即最終回調者)

Mono和Flux

開發(fā)者應只關注Publisher ,如果開發(fā)中間件,redis、dubbo,甚至nacos, Web Flux 則會自動幫我們實現(xiàn) Subscriber

Flux類的靜態(tài)方法

  • just():可以指定序列中包含的全部元素。
  • fromArray(),fromIterable()和 fromStream():可以從一個數(shù)組、Iterable 對象或 Stream 對象中創(chuàng)建 Flux 對象。
  • empty():創(chuàng)建一個不包含任何元素,只發(fā)布結束消息的序列。
  • error(Throwable error):創(chuàng)建一個只包含錯誤消息的序列。
  • range(int start, int count):創(chuàng)建包含從 start 起始的 count 個數(shù)量的 Integer 對象的序列。
  • interval(Duration period):創(chuàng)建一個包含了從 0 開始遞增的 Long 對象的序列。其中包含的元素按照指定的間隔來發(fā)布。除了間隔時間之外,還可以指定起始元素發(fā)布之前的延遲時間。
  • concat,類似于Mono的zip(但是不一樣,這個不會返回tuple)。
  • concatWith,類似于Mono的zipWith(但是不一樣,這個不會返回tuple)。
  • concatMapIterable(Arrays.asList),會在Flux.just(1,2,3)每個元素中逐個穿插指定集合元素,或對當前Flux進行數(shù)據(jù)操作(比如逐個元素加2)。
  • defer():一種懶創(chuàng)建方式,對比just。
  • MathFlux.sumInt(Flux.range),這是新出的一種封裝,MathFlux有多種函數(shù)計算實現(xiàn),可以根據(jù)不同場景選型。替換可.as()。也可替換reduce也能實現(xiàn)上述功能,但是reduce功能更加強大,不僅可以對數(shù)字甚至任何類型都可以。
  • generate() 方法同步和逐一的方式來產生 Flux 序列,next()方法只能最多被調用一次,不調用 complete()方法,所產生的是一個無限序列。
  • create() 跟上面類似,只是它允許有多個元素。
  • buffer(int) 和 bufferUntil(), 這兩個操作符的作用是把當前流中的元素收集到集合中,并把集合對象作為流中的新元素。在進行收集時可以指定不同的條件。
  • concatWith(Flux)、onErrorResume()、onErrorReturn、doFinally 和 retry() 異常處理對比java。
  • collectList() 和 collect(Collectors.toList()) ,Mono轉Flux。

Mono類的靜態(tài)方法

  • zipWith():不需要上一個Mono的結果(類型可以不一樣)。
  • zipWhen():需要上一個Mono的結果(類型可以不一樣)。
  • zip():組裝多個Mono(類型可以不一樣)。
  • flatMapMany():Mono轉Flux。
  • delayElement,類似于Thread.sleep,可以結合map(同步)、flatMap(異步)。

Flux和Mono共有方法

  • transform():抽出公共部分組裝。
  • defer():同F(xiàn)lux。
  • publishOn(Schedulers) 和 subscribeOn(Schedulers),可以動態(tài)切換線程,可以結合buffer、log使用。

Schedulers 類有如下幾種對上下文操作的靜態(tài)方法:

  • immediate():無執(zhí)行上下文,提交的Runnable將直接在原線程上執(zhí)行,可以理解沒有調度
  • single():可重用單線程,使用一個線程處理所有請求
  • elastic(): 沒有邊界的彈性線程池
  • boundedElastic():有邊界彈性線程池,設置線程限制,默認為cpu核心數(shù)*10。達到上限后最多可以提交10萬個任務。是阻塞線程的方法
  • parallel(): 固定線程數(shù)量的并行線程池,線程數(shù)量和cpu內核一樣多

WebFlux

RouterFunction 類似 Spring Web 的 @RequestMapping 。RouterFunction 用來定義 Spring 5 應用的路由信息。RouterFunctions 助手類包含一些有用的方法,例如 route 定義路由并構建 RouterFunction 對象。RequestPredicates 包含大量有用的方法如 GET, POST, path, queryParam ,accept, headers, contentType 等等,可用來定義路由和構建 RouterFunction。每個 Route 映射到一個處理方法,當接收到 HttpRequest 請求的時候就會調用。

Mono<ServerResponse> 是在配置控制器方法中返回的,而不是controller。

RouterFunction 為應用程序提供了 DSL 風格的路由功能。此時,Spring 并不支持兩種風格混合使用。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容