Reactor 3-響應式編程-WHAT

轉(zhuǎn)載請注明出處
作者:@宮三千

是什么?

響應式編程是一種編程的思維模式,區(qū)別于聲明式編程,響應式編程更注重數(shù)據(jù)流轉(zhuǎn),每一段程序都是圍圍繞著數(shù)據(jù)來設計的,原始數(shù)據(jù),在哪,經(jīng)過那些處理之后,最終變成什么樣子。

像這樣圍繞著數(shù)據(jù)流轉(zhuǎn)的程序,我們可以簡稱為執(zhí)行序列。應用響應式編程最直接的方式就是從以往的聲明式程序中找出核心數(shù)據(jù),或者說是抽象出核心數(shù)據(jù),然后把程序的執(zhí)行過程轉(zhuǎn)變?yōu)閷诵臄?shù)據(jù)的執(zhí)行序列,最終,如果是查詢數(shù)據(jù)的程序,就返回當前的執(zhí)行序列如果是執(zhí)行命令的程序,則可以返回當前序列的后續(xù)(then())。

為什么不直接返回生成好的數(shù)據(jù),而要返回但前的執(zhí)行序列呢?原因有如下幾點(可能還有其他的,但是暫時沒想到)

  • 首先,你不知道該程序要在哪執(zhí)行,或者說在哪個線程來執(zhí)行
  • 再者,這是響應式編程的特性,當且僅當調(diào)用訂閱方法( Flux#subscribe() )的時候,序列才會真正的執(zhí)行
  • 最后,從設計的角度講,響應式編程的數(shù)據(jù)返回值,實際上返回的是一個數(shù)據(jù)的發(fā)布者( Publisher),調(diào)用方相當于是數(shù)據(jù)的訂閱者( Subscriber )(這里還存在冷熱序列,和推拉模式的區(qū)別,后續(xù)再說),只有當訂閱者去訂閱數(shù)據(jù)的時候,發(fā)布者提供的數(shù)據(jù)以及相應的執(zhí)行序列才會真正被執(zhí)行,有另外一種叫法叫做 Lazy Compute ,延遲計算。
  • 最后的最后,統(tǒng)一的設計模式,有助于思維貫通,以響應式編程的方式去思考你的所有實現(xiàn)。

給你的API分門別類

有個微服務的最佳實踐模式命令查詢職責隔離模式(CQRS),在該模式當中,將客戶端的請求分為兩類:

  • 命令(Command)
  • 查詢(Query)

在響應式編程設計中,我們也可以使用相同的概念。

該模式還有一個概念,叫做事件溯源(Event Sourcing),服務和服務之間除了命令和查詢之外,還有事件,和響應式編程的模式非常契合,我也是因為之前學習過這個模式,才被啟發(fā)想到需要給API分門別類的。

響應式編程的事件類型是固定的,通過調(diào)用接口去傳遞事件(next事件,error事件,subscribe事件,complete事件,事件也可以稱之為狀態(tài),其實都是一回事兒,只不過抽象方法以及場景不一樣,所以有不同的叫法)

命令

簡單一點說,所有返回類型為 void 的接口都是命令式接口。

一般這樣的接口,使用響應式變編程的思維,需要返回一個內(nèi)部執(zhí)行序列的后續(xù),所以說,所有void方法的返回值可以統(tǒng)統(tǒng)是Mono<Void> ,該對象代表了該接口提供的針對于傳入?yún)?shù)的后續(xù)執(zhí)行序列。

//聲明式編程
public void runSomething(){doSomthing();}

//響應式編程
public Mono<Void> runSomthing(){return Mono.fromRunnable(()-> doSomthing());}

查詢

簡單一點說,所有返回類型不是 void 的接口都是查詢接口。

一般這樣的接口,需要區(qū)分一下返回的是單個數(shù)據(jù),還是一組數(shù)據(jù),如果是單個數(shù)據(jù),可以使用Mono<T>來表示,如果是一組數(shù)據(jù),可以使用Flux<T> 來表示,這兩個對象代表了該接口提供的針對傳入?yún)?shù)的數(shù)據(jù)查詢操作的執(zhí)行序列。

其實查詢操作使用數(shù)據(jù)發(fā)布者的角色來考慮更好理解:該接口返回了針對于傳入?yún)?shù)的數(shù)據(jù)發(fā)布者。調(diào)用者可以通過訂閱該發(fā)布者去獲取數(shù)據(jù)。

//聲明式編程
public <T> T getOne(){return t}

//響應式編程
public <T> Mono<T> getOne(){return Mono.fromSupplier(()-> t);}

拼接

簡單一點說,所有不直接傳遞參數(shù)值,而是提供一個參數(shù)的容器,或者把參數(shù)封裝成輸入,并且返回響應的容器或者輸出的接口,都是拼接接口(和map操作有點像)。

使用Java8的StreamAPI也可以使用這樣的方式去設計API,避免重復聲明邏輯一樣的流定義。

public <R,T> Mono<R> mapSomthing(Mono<T> monoT){return monoT.map(()->v);}

訂閱者還是發(fā)布者?

響應式編程更注重數(shù)據(jù)的流轉(zhuǎn),那么,誰才是數(shù)據(jù)的發(fā)布者,誰又是數(shù)據(jù)的訂閱者呢?當然這個問題會隨著業(yè)務邏輯的不同而有所變化。發(fā)布者也可以是其他發(fā)布者的訂閱者,訂閱者也有可能是其他訂閱者的發(fā)布者,這樣就會形成一個鏈條,將不同的組件鏈接起來,實現(xiàn)某些業(yè)務。

通常的,我認為,每個獨立的組件或者模塊,不論大小,在響應式編程的設計思路下,都需要抽象出內(nèi)部的數(shù)據(jù)發(fā)布者,和數(shù)據(jù)訂閱者,其實就是輸入和輸出(Input and Output),嗯,這是應該是很正常的事情。發(fā)布者和訂閱者也是輸入和輸出的其中一種抽象方式。

這里想再多說一點關于輸入和輸出的話題,個人感覺,任何程序,不論粒度的大?。ùa塊,函數(shù),類,模塊,組件,服務,中間件,框架等等)在進行設計的時候都需要考慮這段程序的的輸入是什么,輸出是什么,以及輸入和輸出的方式,函數(shù)自然不用說,就是傳參和返回值,想要改變這種簡單的輸入輸出模式,只能通過使用某些模式來抽象參數(shù)和返回值的方式來擴展輸入輸出的功能。這里說到輸入,響應式編程,還有另外一種思考方式,上下游 ,傳遞參數(shù)的時候,直接給一個明確的值,當然也可以傳遞某些執(zhí)行序列的上游信息,然后方法內(nèi)部實現(xiàn)是提供針對某些執(zhí)行序列的下游執(zhí)行序列,然后返回一個新的執(zhí)行序列。Reactor3中的執(zhí)行序列,當你在添加下游執(zhí)行單元的時候,會返回一個新的執(zhí)行序列對象,其中包裹著上游執(zhí)行序列對象,內(nèi)部稱之為source,有點像是裝飾者模式,沒有太深入看源碼,所以不太確定。

所以除了命令查詢兩種方法之外,還有另一種,應該叫做 拼接 方法(我自己起的名字)。拼接式接口就像上邊所說的,傳一個上游序列,拼接新的執(zhí)行單元,然后返回新的執(zhí)行序列,方便復用。

在使用Spring WebFlux 實現(xiàn)websocket的時候有個WebSocketHandler就和這個拼接方法類似,不過那個是熱序列的拼接,比較不好理解。

    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession) {
        webSocketSession.getHandshakeInfo();
        Mono<Void> in = webSocketSession
                .receive()
                .doOnNext(webSocketMessage -> {
                    String text = webSocketMessage.getPayloadAsText();
                    LOGGER.info("WebSocket msg:{}", text);
                })
                .then();

        Mono<Void> out = webSocketSession.send(webSocketMessagePublisher(webSocketSession));
        return Mono.zip(in, out).then();
    }

handle方法傳進來的是個WebSocketSession對象,調(diào)用receive()方法返回的是一個Flux<WebSocketMessage>對象,我們該如何理解這個Flux對象呢?

可以理解為一個執(zhí)行序列的上游信息,我們可以在其基礎之上添加相應的執(zhí)行單元,因為這是個WebSocketSession,所以其實是個熱序列(會不斷生成新數(shù)據(jù),直到斷開鏈接),所以我們定義的下游執(zhí)行單元需要指定,當收到新增數(shù)據(jù)的時候,我們應該執(zhí)行什么操作,這也是個典型的push模式,這里調(diào)用doOnNext()方法傳遞一個Consumer對象其實是在定義Subscriber的onNext()方法在收到新數(shù)據(jù)的時候需要執(zhí)行的操作,用發(fā)布訂閱的模式去理解這個過程就是,參數(shù)傳遞進來的是個數(shù)據(jù)發(fā)布者,方法內(nèi)部的程序是數(shù)據(jù)訂閱者,訂閱者要向發(fā)布者訂閱數(shù)據(jù),但是不是真正的訂閱,而只是提供一個訂閱之后的執(zhí)行邏輯,因為從整體來說的話,該方法只是其中的一個執(zhí)行單元,如果在這里調(diào)用subscribe方法,那后續(xù)再添加任何執(zhí)行單元就都沒有什么用了。所以這里其實是有個原則的。

只有可以確認當前序列不會再有下游信息的時候,才能最終調(diào)用Flux#subscribe()方法去執(zhí)行序列??赡懿惶美斫?,不過慢慢的就會理解了。(對于冪等的序列,可以不用考慮這個原則,例如通過 Mono.just() 或 Flux.just() 等方法創(chuàng)建的序列)**

發(fā)布訂閱模式(PubSub)

企業(yè)微信截圖_8cc99873-29b0-4ffb-b698-646fc7ace807.png

序列的開始,是由訂閱者調(diào)用發(fā)布者的subscribe方法,首先publishier會創(chuàng)建一個subscription對象,并回調(diào)訂閱者的onSubscribe方法,將subscription注入到訂閱者的體內(nèi),當訂閱者需要從發(fā)布者那里獲取數(shù)據(jù)的時候,訂閱者可以調(diào)用Subscription對象的request方法,發(fā)布者生成指定數(shù)量的數(shù)據(jù)之后,循環(huán)調(diào)用subscriber對象的onNext方法用來傳遞數(shù)據(jù),當發(fā)布的過程產(chǎn)生錯誤,發(fā)布者會調(diào)用訂閱者的onError方法來傳遞錯誤,當發(fā)布者發(fā)布完成,會調(diào)用訂閱者onComplete方法代表發(fā)布完成了,之后,發(fā)布者將不會再次發(fā)布數(shù)據(jù),這個流程就結(jié)束了。

以上是標準的冷序列的調(diào)用直徑流程。熱序列和這個流程有點不一樣并且不太容易理解,所以我們到后續(xù)的章節(jié)再單獨進行說明。

發(fā)布訂閱模式可能更多的是應用于進程之間或者線程之間的交互,其中通過阻塞隊列或相應的消息中間件去實現(xiàn)消息的容器,但是在響應式編程當中,該模式的主要作用,就是進行連接,將數(shù)據(jù)從一個組件傳遞到另外一個組件,而且默認是沒有像阻塞隊列那樣的數(shù)據(jù)容器,除非手動指定buffer模式,后邊介紹各種Operator的時候會說到buffer模式。

以發(fā)布訂閱模式的出發(fā)點去思考的話,應對理解響應式編程有幫助。

一個數(shù)據(jù)提供者對象,都能提供哪些數(shù)據(jù),這些數(shù)據(jù)通過什么方式傳遞?

一個可執(zhí)行命令的對象,都能提供哪些命令的執(zhí)行?這些命令什么時候去執(zhí)行,我能否先獲得所有需要執(zhí)行的命令,再根據(jù)一些規(guī)則,去定制他們的執(zhí)行順序?

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容