如何使用響應(yīng)式編程(一)
該篇主要是針對(duì)于響應(yīng)式編程的實(shí)踐,在一些實(shí)際的開發(fā)當(dāng)中,利用具體的例子和應(yīng)用場(chǎng)景,來(lái)加深對(duì)響應(yīng)式編程的理解。
其中可能會(huì)涉及到一些思考過程,和方案的選擇,以及最佳實(shí)踐的討論,希望可以讓讀者更快的上手并且可以自己開利用Reactor3框架進(jìn)行實(shí)際的開發(fā)工作。
在這之前,我們可以先說一下冷熱序列的問題。
冷序列與熱序列
冷熱序列是相對(duì)于發(fā)布者(Publisher)來(lái)說的。一般普通的默認(rèn)序列我們都認(rèn)為是冷序列,這些序列在調(diào)用subscribe方法之前是不會(huì)有任何行動(dòng)的,那么熱序列又是什么呢,別著急,我們慢慢來(lái)說,先從一個(gè)應(yīng)用場(chǎng)景來(lái)說明什么是熱序列。
熱序列的使用場(chǎng)景
想象有這樣一個(gè)需求,需要將后臺(tái)服務(wù)打印的日志,實(shí)時(shí)的展示到頁(yè)面上,那么我們應(yīng)該要怎么來(lái)做這個(gè)事情呢
這個(gè)示例是一個(gè)完整的示例,雖然在實(shí)際的工作中沒有什么特別大的用處,但是重點(diǎn)是要熟悉一下Reactor3的熱序列的應(yīng)用。
該示例運(yùn)行在2.5.2的springboot上,使用的是webFlux
我們這兒選擇使用websocket來(lái)實(shí)現(xiàn)這個(gè)這個(gè)功能,先來(lái)說一下使用webflux配置WebSocket的方法;
- 首先我們應(yīng)該在WebConfig當(dāng)中創(chuàng)建一個(gè)
SimpleUrlHandlerMapping
@Bean
public HandlerMapping handlerMapping(LogWebSocketHandler logWebSocketHandler,
FlowWebSocketHandler flowWebSocketHandler) {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/ws/log", logWebSocketHandler); //如果一個(gè)handler不夠可以配置多個(gè)
int order = -1; // before annotated controllers
return new SimpleUrlHandlerMapping(map, order);
}
LogWebSocketHandler 實(shí)現(xiàn)了 WebSocketHandler 接口,如下
public interface WebSocketHandler {
default List<String> getSubProtocols() {
return Collections.emptyList();
}
Mono<Void> handle(WebSocketSession var1);
}
通過 handle 方法我們可以拿到一個(gè)WebSocketSession對(duì)象,如下是該方法的實(shí)現(xiàn)
@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
webSocketSession.getHandshakeInfo();
Mono<Void> in = webSocketSession //1
.receive()
.doOnNext(webSocketMessage -> {
String text = webSocketMessage.getPayloadAsText();
LOGGER.info("WebSocket msg:{}", text);
})
.then();
Mono<Void> out = webSocketSession.send(webSocketMessagePublisher(webSocketSession)); //2
return Mono.zip(in, out).then(); //3
}
- 用來(lái)接受客戶端發(fā)來(lái)的數(shù)據(jù)(輸入)
- 用來(lái)給客戶端發(fā)送數(shù)據(jù)(輸出)
- 這里盡量保證輸入和輸出分開來(lái)處理,因?yàn)槲覀冞@里不是為了要實(shí)現(xiàn)一個(gè)請(qǐng)求響應(yīng)的模式(這說法有點(diǎn)牽強(qiáng),等找到更好的表達(dá)方式的時(shí)候,我會(huì)回來(lái)改掉);這里 zip 我感覺還是挺有意思的,想象一下我們有兩個(gè)等長(zhǎng)的集合或者數(shù)組,將這兩個(gè)集合或者數(shù)組 zip 一下的結(jié)果應(yīng)該是什么呢,在一些實(shí)際項(xiàng)目開發(fā)的場(chǎng)景當(dāng)中,我門可能會(huì)遇到這樣的場(chǎng)景,Java JDK當(dāng)中是沒有提供 元組(Tuple) 這個(gè)數(shù)據(jù)結(jié)構(gòu)的,當(dāng)有些場(chǎng)景需要使用元組的時(shí)候,我門只能自己實(shí)現(xiàn)一個(gè),那么如何生成元組呢,這時(shí),我門可以實(shí)現(xiàn)一個(gè) zip 算法來(lái)生成元組。
我門再來(lái)看下WebSocketSession的 send 方法的定義
Mono<Void> send(Publisher<WebSocketMessage> var1);
來(lái)看,這里需要傳入的是一個(gè) WebSocketMessage 的 Publisher,而返回值是一個(gè)Mono<Void>,這里是不是長(zhǎng)得有點(diǎn)像我們之前說的拼接模式呢,入?yún)⑹且粋€(gè)上游的序列,而返回的是該上游序列拼接了內(nèi)部執(zhí)行邏輯的下游序列之后的后繼,聽起來(lái)有點(diǎn)繞口對(duì)不對(duì),沒關(guān)系,這里不是重點(diǎn),因?yàn)槲覀兪遣恍枰P(guān)心send方法內(nèi)部到底拼接了哪些邏輯的,可能是關(guān)于一些網(wǎng)絡(luò)請(qǐng)求,或者網(wǎng)絡(luò)協(xié)議之類的東西,這些都無(wú)所謂,我們最需要關(guān)注的是這里的 Publisher(這里斜體加粗體你就知道有多重要了)。
到這里,WebSocket的配置已經(jīng)結(jié)束,下面的重點(diǎn)是如何來(lái)實(shí)現(xiàn)這個(gè)Publisher。
兩種實(shí)現(xiàn)方案
第一個(gè)方案
當(dāng)?shù)谝淮斡龅竭@個(gè)問題的時(shí)候,我的第一反應(yīng)是維護(hù)一組 Session ,每個(gè)Session代表一個(gè)客戶端鏈接,想要給哪個(gè)人發(fā)消息,就找到相應(yīng)的 Session對(duì)象并調(diào)用發(fā)送消息的接口(之前使用SrpingMVC的時(shí)候就是這么干的)。很顯然這個(gè)方案是不可行的,因?yàn)檫@里的 WebSocketSession沒什么實(shí)際的用處(除了用于獲取 HandShakeInfo),我門看send方法,并沒有給誰(shuí)發(fā)消息,而是返回了一個(gè) Mono<Voide> ,我們能拿這個(gè)返回值怎么辦呢,什么都干不了。
第一反應(yīng)不行,緊接著是第二反應(yīng):百度搜索。查詢了很久,查到一個(gè)解決方案:
List<FluxSink<WebSocketMessage>> fluxSinkList = new ArrayList<>();
Flux<WebSocketMessage> sessionFlux = Flux.create(fluxSinkList::add);
是的,我們現(xiàn)在得到了一個(gè) SessionFlux 這個(gè) Flux 可以作為一個(gè)數(shù)據(jù)發(fā)布者,返回給我們的WebSocket框架,然后用暴露出來(lái)的 FluxSink#next() 發(fā)布新的數(shù)據(jù)。
這個(gè)看起來(lái)貌似是個(gè)不錯(cuò)的解決方案,但是總覺得有一些別扭,將一個(gè)內(nèi)部對(duì)象暴露出來(lái)存在別的容器當(dāng)中,估計(jì)還需要考慮內(nèi)存泄漏的問題。
第二個(gè)方案
第二個(gè)方案是使用官方文檔推薦的方式,使用 Sinks , 先貼一段代碼
private final Sinks.Many<String> many = Sinks.many().multicast().onBackpressureBuffer(1000, true);
private final Flux<String> logFlux = many.asFlux();
...
public void outputLog(String log) {
many.tryEmitNext(log);
}
...
public Flux<String> getLogFlux() {
return logFlux;
}
Reactor3 API 提供的這種方式用起來(lái)還是挺方便的,省了很多事情,在這之前,我還嘗試了一種方式,那就是 繼承一個(gè) Flux ,將數(shù)據(jù)發(fā)布的邏輯放到實(shí)現(xiàn)的內(nèi)部,也就是說用Flux把業(yè)務(wù)模塊包起來(lái),做成一個(gè)大的數(shù)據(jù)發(fā)布者,這種方法很麻煩,需要考慮的事情太多了,不推薦使用,當(dāng)然如果是需要擴(kuò)展 Flux 的功能的話,那就無(wú)所謂了,不過在日常開發(fā)中,熱序列的應(yīng)用場(chǎng)景其實(shí)是比較少的,一般一個(gè)系統(tǒng)也不會(huì)出現(xiàn)幾個(gè)。
進(jìn)一步說明冷序列與熱序列(重要的事情總是要多說幾遍)
之前我們說的執(zhí)行序列說的是以數(shù)據(jù)為核心,聲明對(duì)某些數(shù)據(jù)執(zhí)行順序的封裝,也就是說,一個(gè)執(zhí)行序列封裝了數(shù)據(jù)的執(zhí)行步驟(第一步干什么,第二部干什么,最終轉(zhuǎn)化成什么樣子,類似生產(chǎn)線),再來(lái)說冷熱序列,冷序列是被動(dòng)執(zhí)行的,熱序列是主動(dòng)執(zhí)行的,被動(dòng)執(zhí)行的意思就是它的執(zhí)行是由訂閱者來(lái)觸發(fā),當(dāng)Subscriber調(diào)用subscription的request方法的時(shí)候,傳遞一個(gè)數(shù)值類型的參數(shù),該參數(shù)代表需要請(qǐng)求數(shù)據(jù)的數(shù)量,當(dāng)publisher收到請(qǐng)求之后就會(huì)產(chǎn)出指定數(shù)量的數(shù)據(jù),然后調(diào)用 onNext 方法來(lái)發(fā)部新數(shù)據(jù);主動(dòng)執(zhí)行的意思就是在發(fā)布者創(chuàng)建對(duì)象的那一刻開始,就就已經(jīng)開始生產(chǎn)新數(shù)據(jù)了,不管是否有訂閱者請(qǐng)求數(shù)據(jù),而且訂閱者請(qǐng)求數(shù)據(jù)過程,發(fā)布者也不會(huì)按照訂閱者請(qǐng)求的數(shù)量返還相應(yīng)的數(shù)據(jù),而是有數(shù)據(jù)生成的時(shí)候主動(dòng)推送給訂閱者,直到訂閱者取消訂閱或者發(fā)布者完成發(fā)布的操作(onComplete)。
上邊介紹的推送日志的功能,就是一個(gè)典型熱序列,我們創(chuàng)建了一個(gè)用于推送日志的發(fā)布者(Flux),然后將這個(gè)發(fā)布者當(dāng)作Websocket請(qǐng)求鏈接的返回值返回給框架,這個(gè)時(shí)候,框架當(dāng)中Websocket請(qǐng)求對(duì)象成為了我們的訂閱者,當(dāng)有新日志生成的時(shí)候,日志發(fā)布者就會(huì)將日志發(fā)布給WebSocket請(qǐng)求鏈接,請(qǐng)求鏈接再發(fā)送給客戶端,netty里邊具體怎么做的沒有太細(xì)研究,應(yīng)該是一個(gè)Channel等待就緒狀態(tài),當(dāng)有戲數(shù)據(jù)的時(shí)候就變?yōu)榫途w狀態(tài),select的時(shí)候直接拿到并發(fā)送到操作系統(tǒng)的IO當(dāng)中(這是nio的執(zhí)行邏輯,有可能是使用的epoll或者kqueue,具體看你使用的是什么操作系統(tǒng),這倆是netty自己實(shí)現(xiàn)的,沒太細(xì)看)。