? ?上一篇講了webflux的簡單使用,但如果深入點想就會有很多疑問。webflux是如何同netty協(xié)作的?響應(yīng)式的線程是如何調(diào)度的?一個請求是怎么來到我們定義的RequestMapping方法的?本篇通過對webflux源碼的閱讀,簡要分析這幾點。
? ?spring webflux基于reactor,默認的容器為netty,所以想學(xué)習(xí)spring webflux的源碼,必須要有這兩個技術(shù)的知識。這里先簡要介紹一下。
Netty
? ?Netty是一個高性能、異步事件驅(qū)動的NIO框架,提供了對TCP、UDP和文件傳輸?shù)闹С?,作為一個異步NIO框架,Netty的所有IO操作都是異步非阻塞的。
? ?上圖是netty的基本工作流程,簡單來說:
· 一個EventLoopGroup包含一個或多個EventLoop。
· 一個EventLoop在生命中周期綁定到一個Thread上。
· EventLoop使用其對應(yīng)的Thread處理IO事件。
· 一個Channel使用EventLoop進行注冊。
· 一個EventLoop可被分配至一個或多個Channel。
? ?EventLoop除了要負責(zé)處理綁定的Channel所有io操作,由于其繼承了Executor接口,還可以執(zhí)行提交的任務(wù)。需要注意的是,提交給EventLoop的任務(wù)必須是非阻塞的,否則將使io處理沒有資源,導(dǎo)致整個應(yīng)用吞吐量下降。
? ?netty處理數(shù)據(jù)流程如下:

? ?數(shù)據(jù)從客戶端傳入服務(wù)端稱為出站,反之稱為入站。一個socket鏈接為一個channel,一個channel有一個channelpipeline,pipeline是入站處理器和出站處理器的鏈式集合。當一個請求msg從某個channel入站時,將從該channel綁定的pipeline的head開始,經(jīng)過一個個ChannelInboundHandler入站處理器的處理。反之,當一個響應(yīng)msg從channel出站時,將從該channel綁定的pipeline的tail開始,經(jīng)過一個個ChannelOutboundHandler入站處理器的處理。
? ?當某個ChannelHandler被添加到ChannelPipeline中時,會為其創(chuàng)建一個ChannelHandlerContext,ChannelHandler可以訪問其綁定的ChannelHandlerContext,從而和pipeline交互。有點類似攔截器鏈,ChannelHandler通過ChannelHandlerContext將數(shù)據(jù)交給pipe中的下一個處理器處理。
Reactor
? ?Reactor的核心是Mono和Flux兩個類,他們都繼承了Publisher接口,代表一個數(shù)據(jù)流的發(fā)布者。其中,一個Flux代表一個0~N個元素的序列發(fā)射源,而Mono代表只有0或1個元素的發(fā)射源??梢酝ㄟ^subscribe()方法訂閱發(fā)射源,類似java中的stream操作,在執(zhí)行subscribe()之前,Mono和Flux并不會開始發(fā)射數(shù)據(jù)。
? ?Mono和Flux提供了豐富的api可以進行鏈式調(diào)用,并且可以通過subscribeOn()或者publishOn()指定Mono中某一步操作的執(zhí)行線程。如果不指定Mono或Flux的執(zhí)行線程,那么默認會在調(diào)用subscribe()的線程上運行,這一點也和Stream相似。
public Mono<String> say(String name) {
return Mono.just(name)
.publishOn(Schedulers.elastic())
.map(Try.of(this::hello));
}
private String hello(String name) throws InterruptedException {
Thread.sleep(10000);
String result = String.format("hello %s, current-thread is [%s]", name, Thread.currentThread().getName());
System.out.println(result);
return result;
}
上述代碼將打?。?br>
main thread
hello nihao, current-thread is [elastic-2]
但如果將
.publishOn(Schedulers.elastic())注釋那么結(jié)果是:
hello nihao, current-thread is [main]
main thread
從這里可以看出Reactor的優(yōu)點,她極大的簡化了異步編程中線程切換處理的難度。
Spring Webflux
? ?回到Webflux,他是如何工作的?可以從@EnableWebFlux注解開始看,這里略去復(fù)雜的過程直接說結(jié)果。首先,WebFlux的核心仍然和MVC一樣是Dispatcher,另外簡單來說,可以將整個過程描述為兩步:
一、向Netty注冊ChannelHandler
二、Netty調(diào)用ChannelHandler
先說向Netty注冊ChannelHandler。
- 容器啟動,@EnableWebFlux注解引入的DelegatingWebFluxConfiguration配置類向容器注冊基礎(chǔ)bean,包括DispatcherHandler、WebFluxResponseStatusExceptionHandler、RequestMappingHandlerMapping等。
- 向WebHttpHandlerBuilder傳入ApplicationContext,WebHttpHandlerBuilder利用context獲得WebHandler(Dispatcher)、List<WebFilter>、List<WebExceptionHandler>等bean,并構(gòu)造HttpHandler(HttpWebHandlerAdapter)。HttpHandler接口的方法:Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response)。很明顯,她是Webflux功能的集合和對外接口。
- 由于使用Netty,使用ReactorHttpHandlerAdapter是包裝HttpHandler,ReactorHttpHandlerAdapter作為WebFlux上層到底層容器Netty的橋梁,類似的還有ServletHttpHandlerAdapter,鏈接Servlet容器。
- 使用httpServer.newHandler(adapter)注冊ReactorHttpHandlerAdapter。 httpServer下一層是tcpServer,tcpServer將ReactorHttpHandlerAdapter包裝包裝成ContextHandler,ContextHandler實現(xiàn)了ChannelInitializer接口,可以用來向Netty注冊ChannelHandler
@Override
public final Mono<? extends NettyContext> newHandler(BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler) {
Objects.requireNonNull(handler, "handler");
return Mono.create(sink -> {
ServerBootstrap b = options.get();
SocketAddress local = options.getAddress();
b.localAddress(local);
ContextHandler<Channel> contextHandler = doHandler(handler, sink);
b.childHandler(contextHandler);
if(log.isDebugEnabled()){
b.handler(loggingHandler());
}
contextHandler.setFuture(b.bind());
});
}
- ContextHandler在initChannel()->accept()方法中向pipeline中注冊了ChannelOperationsHandler,ChannelOperationsHandler實現(xiàn)了Netty的ChannelDuplexHandler接口,可以通過ChannelHandlerContext處理入站和出站數(shù)據(jù)。
channel.pipeline()
.addLast(NettyPipeline.ReactiveBridge,
new ChannelOperationsHandler(this));
- ChannelOperationsHandler通過channelActive()在channel每次激活時調(diào)用了ContextHandler的createOperations()方法創(chuàng)建ChannelOperations。這一步很關(guān)鍵,首先通過channelActive()說明ChannelOperations是與一次請求對應(yīng)的。另外看createOperations()的代碼,有兩處:
ChannelOperations<?, ?> op =
channelOpFactory.create((CHANNEL) channel, this, msg);
channel.eventLoop().execute(op::onHandlerStart);
這說明了Spring Webflux處理一次請求的方式:
- 一個request到Netty時,Netty傳遞給ChannelOperationsHandler
- ChannelOperationsHandler將數(shù)據(jù)和對數(shù)據(jù)的操作封裝成ChannelOperations
- 將ChannelOperations作為一個任務(wù)提交給Netty的eventLoop
- eventLoop在對一個channel的pipeline調(diào)用完成后,將執(zhí)行提交的任務(wù),此時將進入處理Spring WebFlux的操作。
來看看ChannelOperations做了什么,首先明確幾點:ChannelOperations中封裝了this(ContextHandler),而ContextHandler中封裝了ReactorHttpHandlerAdapter,ReactorHttpHandlerAdapter中則封裝了HttpHandler,HttpHandler中則是核心組件WebHandler(Dispatcher)、List<WebFilter>、List<WebExceptionHandler>等的集合。
protected final void applyHandler() {
// channel.pipeline()
// .fireUserEventTriggered(NettyPipeline.handlerStartedEvent());
if (log.isDebugEnabled()) {
log.debug("[{}] {} handler is being applied: {}", formatName(), channel
(), handler);
}
try {
Mono.fromDirect(handler.apply((INBOUND) this, (OUTBOUND) this))
.subscribe(this);
}
catch (Throwable t) {
log.error("", t);
channel.close();
}
}
關(guān)鍵點又來了,提交給eventLoop 的任務(wù)做了什么:
Mono.fromDirect(handler.apply((INBOUND) this, (OUTBOUND) this))
.subscribe(this);
handler就是包裝在ChannelOperations中的ReactorHttpHandlerAdapter,可以看出這里的Mono并沒有做線程切換,所以會由當前線程(eventLoop)執(zhí)行ReactorHttpHandlerAdapter的apply()方法,ReactorHttpHandlerAdapter在其apply()方法里調(diào)用了她包裝的httpHandler的handle(), 而httpHandler的handle()又調(diào)用了Dispatcher的handler():
從中我們可以得到使用Spring WebFlux的核心法則:
絕對不要阻塞Controller的方法。
因為這將導(dǎo)致eventLoop線程的阻塞,而eventLoop線程數(shù)量一般只有cpu核心數(shù)*2個,如果阻塞了eventLoop線程將導(dǎo)致真?zhèn)€服務(wù)不可用。
下面做個小實驗
package com.xinan.demo.rest;
import com.xinan.demo.util.Try;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author xinan
* @date 2018/8/3
*/
@RestController
@RequestMapping("hello")
@Slf4j
public class HelloController {
@GetMapping
public Mono<String> say(String name) {
return Mono.just(name)
.map(Try.of(this::hello));
}
private String hello(String name) throws InterruptedException {
Thread.sleep(10000);
String result = String.format("hello %s, current-thread is [%s]", name, Thread.currentThread().getName());
System.out.println(result);
return result;
}
@GetMapping("nob")
public Mono<String> nob() {
return Mono.just(Thread.currentThread().getName());
}
}
通過webbench say()同時發(fā)送20個請求(大于eventLoop線程數(shù))
webbench -c 20 -t 30 http://localhost:8080/hello/say
再立即通過瀏覽器訪問 http://localhost:8080/hello/nob,可以看到http://localhost:8080/hello/nob接口直到10秒之后才返回結(jié)果。這就是阻塞了eventLoop線程的結(jié)果,耗盡了線程資源,導(dǎo)致服務(wù)不可用。
? ?本文簡要分析了Spring WebFlux的主要工作流程,可以看到WebFlux在線程調(diào)用方面和Spring MVC還是又很大不同的,在使用中一定要注意WebFlux的特點,避免錯誤使用導(dǎo)致性能遠低于預(yù)期。
? ?如有錯誤,懇請批評指正!