Spring 5 WebFlux

作者: 一字馬胡
轉(zhuǎn)載標(biāo)志 【2017-11-26】

更新日志

日期 更新內(nèi)容 備注
2017-11-26 新建文章 Spring 5 WebFlux demo

Reactor

Spring 5的一大亮點(diǎn)是對響應(yīng)式編程的支持,下面的圖片展示了傳統(tǒng)Spring Web MVC結(jié)構(gòu)以及Spring 5中新增加的基于Reactive Streams的Spring WebFlux框架,可以使用webFlux模塊來構(gòu)建異步的、非堵塞的、事件驅(qū)動(dòng)的服務(wù),在伸縮性方面表現(xiàn)非常好。

從上面的結(jié)構(gòu)圖中可以看出,WebFlux模塊從上到下依次是Router Functions,WebFlux,Reactive Streams三個(gè)新組件,WebFlux模塊需要運(yùn)行在實(shí)現(xiàn)了Servlet 3.1+規(guī)范的容器之上,Servlet 3.1規(guī)范中新增了對異步處理的支持,在新的Servlet規(guī)范中,Servlet線程不需要一直阻塞等待直到業(yè)務(wù)處理完成,也就是說,Servlet線程將不需要等待業(yè)務(wù)處理完成再進(jìn)行結(jié)果輸出,然后再結(jié)束Servlet線程,而是在接到新的請求之后,Servlet線程可以將這個(gè)請求委托給另外一個(gè)線程(業(yè)務(wù)線程)來完成,Servlet線程將委托完成之后變返回到容器中去接收新的請求,Servlet 3.1 規(guī)范特別適用于那種業(yè)務(wù)處理非常耗時(shí)的場景之下,可以減少服務(wù)器資源的占用,并且提高并發(fā)處理速度,而對于那些能快速響應(yīng)的場景收益并不大。下面介紹上圖中webFlux各個(gè)模塊:

  • Router Functions: 對標(biāo)@Controller,@RequestMapping等標(biāo)準(zhǔn)的Spring MVC注解,提供一套函數(shù)式風(fēng)格的API,用于創(chuàng)建Router,Handler和Filter。
  • WebFlux: 核心組件,協(xié)調(diào)上下游各個(gè)組件提供響應(yīng)式編程支持。
  • Reactive Streams: 一種支持背壓(Backpressure)的異步數(shù)據(jù)流處理標(biāo)準(zhǔn),主流實(shí)現(xiàn)有RxJava和Reactor,Spring WebFlux默認(rèn)集成的是Reactor。

上面提到WebFlux默認(rèn)集成的Reactive Streams組件是Reactor,Reactor類似于RxJava 2.0,同屬于第四代響應(yīng)式框架,下面主要介紹一下Reactor中的兩個(gè)關(guān)鍵概念,F(xiàn)lux以及Mono。

Flux

如果去查看源代碼的話,可以發(fā)現(xiàn),F(xiàn)lux和Mono都實(shí)現(xiàn)了Reactor的Publisher接口,從這里可以看出,F(xiàn)lux和Mono屬于事件發(fā)布者,類似與生產(chǎn)者,對消費(fèi)者提供訂閱接口,當(dāng)有事件發(fā)生的時(shí)候,F(xiàn)lux或者M(jìn)ono會(huì)通過回調(diào)消費(fèi)者的相應(yīng)的方法來通知消費(fèi)者相應(yīng)的事件,這也就是所謂的相應(yīng)式編程模型,生產(chǎn)者和消費(fèi)者減耦,它們之間通過實(shí)現(xiàn)一個(gè)共同的方法組來實(shí)現(xiàn)相互聯(lián)系(生產(chǎn)者通知事件是通過回調(diào)消費(fèi)者的方法,而實(shí)現(xiàn)通知很多時(shí)候是通過代理)。

下面這張圖是Flux的工作流程圖:

可以從這張圖中很明顯的看出來Flux的工作模式,可以看出Flux可以emit很多item,并且這些item可以經(jīng)過若干Operators然后才被subscrib,下面是使用Flux的一個(gè)小例子:


Flux.fromIterable(getSomeLongList())
    .mergeWith(Flux.interval(100))
    .doOnNext(serviceA::someObserver)
    .map(d -> d * 2)
    .take(3)
    .onErrorResumeWith(errorHandler::fallback)
    .doAfterTerminate(serviceM::incrementTerminate)
    .subscribe(System.out::println);

Mono

下面的圖片展示了Mono的處理流程,可以很直觀的看出來Mono和Flux的區(qū)別:

Mono只能emit最多只能emit一個(gè)item,下面是使用Mono的一個(gè)小例子:


Mono.fromCallable(System::currentTimeMillis)
    .flatMap(time -> Mono.first(serviceA.findRecent(time), serviceB.findRecent(time)))
    .timeout(Duration.ofSeconds(3), errorHandler::fallback)
    .doOnSuccess(r -> serviceM.incrementSuccess())
    .subscribe(System.out::println);

WebFlux實(shí)戰(zhàn)

上文中簡單介紹了Reactor的兩個(gè)重要組件Flux和Mono,本文將介紹如何使用Spring 5的新組件WebFlux來進(jìn)行應(yīng)用開發(fā),對于WebFlux底層的實(shí)現(xiàn)細(xì)節(jié)不在本文的分析范圍之內(nèi),當(dāng)然本文也不會(huì)分析總結(jié)Spring 5的新特性,這些內(nèi)容將在其他的文章中進(jìn)行分析總結(jié),下面將完整的描述一個(gè)使用WebFlux的步驟。

首先需要新建一個(gè)Spring項(xiàng)目,然后添加Spring 5的依賴,下面是添加的maven依賴:


    <properties>
        <spring.version>5.0.0.RELEASE</spring.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.reactivestreams</groupId>
            <artifactId>reactive-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor.ipc</groupId>
            <artifactId>reactor-netty</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.tomcat.embed</groupId>
            <artifactId>tomcat-embed-core</artifactId>
            <version>8.5.4</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webflux</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.9.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

然后定義ViewModel類,下面是本文例子涉及的model類定義:


/**
 * Created by hujian06 on 2017/11/23.
 *
 * the result model
 */
public class ResultModel {

    private int id;
    private String content;

    public ResultModel() {

    }

    /**
     * read property from json string
     * @param id id
     * @param content data
     */
    public ResultModel(@JsonProperty("id") int id,
                       @JsonProperty("context") String content) {
        this.id = id;
        this.content = content;
    }
}

public class ResultViewModel {

    private int code;
    private String message;
    private ResultModel data;
}
    

上面的ResultViewModel類是最后將要返回的Vo類,包含了code、message以及data這三個(gè)標(biāo)準(zhǔn)返回內(nèi)容,響應(yīng)內(nèi)容將以json格式返回。下面介紹Service的實(shí)現(xiàn)細(xì)節(jié),可以從上面Vo類中的ResultModel中看出返回內(nèi)容很簡單,就是id和Content,下面首先mock幾個(gè)數(shù)據(jù):


    //*************mock data**************//
    private static List<ResultModel> resultModelList = new ArrayList<>();

    static {
        ResultModel model = new ResultModel();
        model.setId(1);
        model.setContent("This is first model");
        resultModelList.add(model);

        model = new ResultModel();
        model.setId(2);
        model.setContent("This is second model");
        resultModelList.add(model);
    }

在本例中要實(shí)現(xiàn)的接口包括查詢單個(gè)內(nèi)容(根據(jù)id)、查詢所有內(nèi)容、插入數(shù)據(jù)。下面分別介紹每一個(gè)接口的山西愛你細(xì)節(jié),首先是根據(jù)id查詢單個(gè)內(nèi)容的實(shí)現(xiàn):


    /**
     * get the result by the pathVar {"id"}
     * @param serverRequest the request
     * @return the result model
     */
    public Mono<ResultViewModel> extraResult(ServerRequest serverRequest) {
        int id = Integer.parseInt(serverRequest.pathVariable("id"));
        ResultModel model = null;
        ResultViewModel resultViewModel;

        for (ResultModel m : resultModelList) {
            if (m.getId() == id) {
                model = m;
                break;
            }
        }

        if (model != null) {
            resultViewModel = new ResultViewModel(200, "ok", model);
        } else {
            resultViewModel = ResultViewModel.EMPTY_RESULT;
        }

        //return the result.
        return Mono.just(resultViewModel);
    }


需要注意的是,和傳統(tǒng)的MVC Controller不同,Reactive Controller操作的是非阻塞的ServerRequest和ServerResponse,而不再是Spring MVC里的HttpServletRequest和HttpServletResponse。上面的方法中最為關(guān)鍵的一點(diǎn)是最后的return語句,返回了一個(gè)Mono,并且這個(gè)Mono包含了查詢的結(jié)果。下面是查詢所有內(nèi)容的方法細(xì)節(jié):


    /**
     * return total result view
     * @param serverRequest the request
     * @return flux of total result model view
     */
    public Flux<ResultViewModel> flowAllResult(ServerRequest serverRequest) {
        List<ResultViewModel> result = new ArrayList<>();
        for (ResultModel model : resultModelList) {
            result.add(new ResultViewModel(200, "ok", model));
        }

        return Flux.fromIterable(result);
    }

這個(gè)方法的實(shí)現(xiàn)就非常簡潔了,最后返回的內(nèi)容是一個(gè)Flux,意味著這個(gè)方法會(huì)返回多個(gè)item,方法中使用了Flux的fromIterable靜態(tài)方法來構(gòu)造Flux,還有很多其他的靜態(tài)方法來構(gòu)造Flux,具體的內(nèi)容可以參考源代碼。最后是插入一條內(nèi)容的方法實(shí)現(xiàn):


    /**
     * the "write" api
     * @param serverRequest the request
     * @return the write object
     */
    public Mono<ResultViewModel> putItem(ServerRequest serverRequest) {

        //get the object and put to list
        Mono<ResultModel> model = serverRequest.bodyToMono(ResultModel.class);
        final ResultModel[] data = new ResultModel[1];

        model.doOnNext(new Consumer<ResultModel>() {
            @Override
            public void accept(ResultModel model) {

                //check if we can put this data
                boolean check = true;
                for (ResultModel r : resultModelList) {
                    if (r.getId() == model.getId()) {
                        check= false;
                        break;
                    }
                }

                if (check) {
                    data[0] = model;
                    //put it!
                    resultModelList.add(model);
                } else {
                    data[0] = null; //error
                }
            }
        }).thenEmpty(Mono.empty());

        ResultViewModel resultViewModel;
        if (data[0] == null) { //error
            resultViewModel = new ResultViewModel(200, "ok", data[0]);
        } else { //success
            resultViewModel = ResultViewModel.EMPTY_RESULT;
        }

        //return the result
        return Mono.just(resultViewModel);
    }

這個(gè)方法看起來優(yōu)點(diǎn)費(fèi)解,首先通過ServerRequest的body構(gòu)造除了一個(gè)Mono(通過bodyToMono方法),然后通過調(diào)用這個(gè)Mono的doOnNext方法來進(jìn)行具體的插入邏輯處理。這個(gè)時(shí)候就需要看Reactor的另外一個(gè)重要的角色Subscriber了,也就是所謂的訂閱者,或者消費(fèi)者,下面是Subscriber提供的幾個(gè)方法:


    /**
     * Invoked after calling {@link Publisher#subscribe(Subscriber)}.
     * <p>
     * No data will start flowing until {@link Subscription#request(long)} is invoked.
     * <p>
     * It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(long)} whenever more data is wanted.
     * <p>
     * The {@link Publisher} will send notifications only in response to {@link Subscription#request(long)}.
     * 
     * @param s
     *            {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
     */
    public void onSubscribe(Subscription s);

    /**
     * Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#request(long)}.
     * 
     * @param t the element signaled
     */
    public void onNext(T t);

    /**
     * Failed terminal state.
     * <p>
     * No further events will be sent even if {@link Subscription#request(long)} is invoked again.
     *
     * @param t the throwable signaled
     */
    public void onError(Throwable t);

    /**
     * Successful terminal state.
     * <p>
     * No further events will be sent even if {@link Subscription#request(long)} is invoked again.
     */
    public void onComplete();

結(jié)合所謂的響應(yīng)式編程模型,publisher在做一件subscriber委托的事情的關(guān)鍵節(jié)點(diǎn)的時(shí)候需要通知subscribe,比如開始做、出錯(cuò)、完成。關(guān)于響應(yīng)式編程模型的具體分析總結(jié),等完成了RxJava 2.0的相關(guān)分析總結(jié)之后再來補(bǔ)充。到此為止本例的Service已經(jīng)編寫完成了,下面來編寫handler,handler其實(shí)是對Service的一層包裝,將返回類型包裝成ServerResponse,因?yàn)槭前b,所以只展示根據(jù)id查詢內(nèi)容的接口的包裝細(xì)節(jié):


    /**
     * get the result from service first, then trans the result to {@code ServerResponse}
     * @param serverRequest the req
     * @return the ServerResponse
     */
    public Mono<ServerResponse> extraResult(ServerRequest serverRequest) {
        //get the result from service
        //todo : do some check here.

        Mono<ResultViewModel> resultViewModelMono = resultService.extraResult(serverRequest);

        Mono<ServerResponse> notFound = ServerResponse.notFound().build();

        //trans to ServerResponse and return.
        //todo : too many code

        return resultViewModelMono.flatMap(new Function<ResultViewModel, Mono<ServerResponse>>() {
            @Override
            public Mono<ServerResponse> apply(ResultViewModel resultViewModel) {
                return ServerResponse
                        .ok()
                        .contentType(APPLICATION_JSON)
                        .body(fromObject(resultViewModel));
            }
        }).switchIfEmpty(notFound);
    }

ServerResponse提供了豐富的靜態(tài)方法來支持將Reactor類型的結(jié)果轉(zhuǎn)換為ServerResponse,到目前為止,業(yè)務(wù)層面已經(jīng)編寫完成,現(xiàn)在可以開始來進(jìn)行router的編程了,router就和他的意義一樣就是用來路由的,將url路由給具體的handler來實(shí)現(xiàn)處理,WebFlux需要返回一個(gè)RouterFunction來進(jìn)行設(shè)置路由信息,下面是本例子中使用到的RouterFunction細(xì)節(jié):


    /**
     * build the router
     * @return the router
     */
    public RouterFunction<ServerResponse> buildResultRouter() {
        return RouterFunctions
                .route(RequestPredicates.GET("/s5/get/{id}")
                        .and(RequestPredicates
                                .accept(MediaType.APPLICATION_JSON_UTF8)), requestHandler::extraResult)
                .andRoute(RequestPredicates.GET("/s5/list")
                        .and(RequestPredicates
                                .accept(MediaType.APPLICATION_JSON_UTF8)), requestHandler::listResult)
                .andRoute(RequestPredicates.POST("/s5/put/")
                        .and(RequestPredicates
                                .accept(MediaType.APPLICATION_JSON_UTF8)), requestHandler::createView);
    }

可以發(fā)現(xiàn),其實(shí)就是將一個(gè)url和一個(gè)handler的具體方法綁定在一起來實(shí)現(xiàn)將一個(gè)url路由給一個(gè)handler方法進(jìn)行處理,RequestPredicates提供了大量有用的靜態(tài)方法進(jìn)行該部分的工作,具體的內(nèi)容可以參考RequestPredicates的源碼以及在項(xiàng)目中多實(shí)踐積累。到目前為止,一個(gè)url請求可以路由到一個(gè)handler進(jìn)行處理了,下面將使用Netty或者Tomcat來將這個(gè)例子運(yùn)行起來,并且進(jìn)行測試,文章開頭提到,WebFlux需要運(yùn)行在實(shí)現(xiàn)了Servlet 3.1規(guī)范的容器中,而包括Tomcat、Jetty、Netty等都有實(shí)現(xiàn),但是推薦使用Netty來運(yùn)行WebFlux應(yīng)用,因?yàn)镹etty是非阻塞異步的,和WebFlux搭配效果更佳。所以下面的代碼展示了如何使用Netty來啟動(dòng)例子:


    public void nettyServer() {

        RouterFunction<ServerResponse> router = buildResultRouter();

        HttpHandler httpHandler = RouterFunctions.toHttpHandler(router);

        ReactorHttpHandlerAdapter httpHandlerAdapter = new ReactorHttpHandlerAdapter(httpHandler);

        //create the netty server
        HttpServer httpServer = HttpServer.create("localhost", 8600);

        //start the netty http server
        httpServer.newHandler(httpHandlerAdapter).block();

        //block
        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

如何想使用Tomcate來啟動(dòng)例子,則可以參考下面的例子:


    public void tomcatServer() {

        RouterFunction<?> route = buildResultRouter();
        HttpHandler httpHandler = toHttpHandler(route);

        Tomcat tomcatServer = new Tomcat();
        tomcatServer.setHostname("localhost");
        tomcatServer.setPort(8600);
        Context rootContext = tomcatServer.addContext("", System.getProperty("java.io.tmpdir"));
        ServletHttpHandlerAdapter servlet = new ServletHttpHandlerAdapter(httpHandler);
        Tomcat.addServlet(rootContext, "httpHandlerServlet", servlet);
        rootContext.addServletMapping("/", "httpHandlerServlet");
        try {
            tomcatServer.start();
        } catch (LifecycleException e) {
            e.printStackTrace();
        }

        //block
        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

運(yùn)行項(xiàng)目之后,就可以測試是否成功了,下面是一個(gè)測試:


curl http://127.0.0.1:8600/s5/get/1
{
  "code":200,
  "message":"ok",
  "data": {
       "id":1,
       "content":"This is first model"
       }
}

curl http://127.0.0.1:8600/s5/list
[
  {
    "code":200,
    "message":"ok",
    "data": { 
         "id":1,
         "content":"This is first model"
         }
  }, 
  {
     "code":200,
     "message":"ok",
     "data": { 
           "id":2,
           "content":"This is second model"
           }
  }
]


?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容