Project Reactor 之 publishOn 與 subscribeOn

一、概述

在 Spring Reactor 項目中,有兩個出鏡較少的方法:publishOnsubscribeOn。這兩個方法的作用是指定執(zhí)行 Reactive Streaming 的 Scheduler(可理解為線程池)。

為何需要指定執(zhí)行 Scheduler 呢?一個顯而易見的原因是:組成一個反應(yīng)式流的代碼有快有慢,例如 NIO、BIO。如果將這些功能都放在一個線程里執(zhí)行,快的就會被慢的影響,所以需要相互隔離。這是這兩個方法應(yīng)用的最典型的場景。

二、Scheduler

在介紹 publishOnsubscribeOn 方法之前,需要先介紹 Scheduler 這個概念。在 Reactor 中,Scheduler 用來定義執(zhí)行調(diào)度任務(wù)的抽象??梢院唵卫斫鉃榫€程池,但其實際作用要更多。先簡單介紹 Scheduler 的實現(xiàn):

  • Schedulers.elastic(): 調(diào)度器會動態(tài)創(chuàng)建工作線程,線程數(shù)無上界,類似于 Execturos.newCachedThreadPool()
  • Schedulers.parallel(): 創(chuàng)建固定線程數(shù)的調(diào)度器,默認線程數(shù)等于 CPU 核心數(shù)。

關(guān)于 Scheduler 的更多作用留在以后介紹。

三、publishOn 與 subscribeOn

接下來進入正題。先看兩個例子(來自 https://github.com/reactor/lite-rx-api-hands-on

publishOn 的例子

Mono<Void> fluxToBlockingRepository(Flux<User> flux, 
                                    BlockingRepository<User> repository) {
    return flux
            .publishOn(Schedulers.elastic())
            .doOnNext(repository::save)
            .then();
}

subscribeOn 的例子

Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository)  {
    return Flux.defer(() -> Flux.fromIterable(repository.findAll()))
               .subscribeOn(Schedulers.elastic());
}

這里的 repository 的類型是 BlockingRepository,指的是會導(dǎo)致線程阻塞的數(shù)據(jù)庫操作的集合,例如 JPA、MyBatis 等基于 JDBC 技術(shù)實現(xiàn)的 DAO。

在第一個例子中,在執(zhí)行了 publishOn(Schedulers.elastic()) 之后,repository::save 就會被 Schedulers.elastic() 定義的線程池所執(zhí)行。

而在第二個例子中,subscribeOn(Schedulers.elastic()) 的作用類似。它使得 repository.findAll()(也包括 Flux.fromIterable)的執(zhí)行發(fā)生在 Schedulers.elastic() 所定義的線程池中。

從上面的描述看,publishOnsubscribeOn 的作用類似,那兩者的區(qū)別又是什么?

兩者的區(qū)別

簡單說,兩者的區(qū)別在于影響范圍。publishOn 影響在其之后的 operator 執(zhí)行的線程池,而 subscribeOn 則會從源頭影響整個執(zhí)行過程。所以,publishOn 的影響范圍和它的位置有關(guān),而 subscribeOn 的影響范圍則和位置無關(guān)。

看個 publishOnsubscribeOn 同時使用的例子

Flux.just("tom")
        .map(s -> {
            System.out.println("[map] Thread name: " + Thread.currentThread().getName());
            return s.concat("@mail.com");
        })
        .publishOn(Schedulers.newElastic("thread-publishOn"))
        .filter(s -> {
            System.out.println("[filter] Thread name: " + Thread.currentThread().getName());
            return s.startsWith("t");
        })
        .subscribeOn(Schedulers.newElastic("thread-subscribeOn"))
        .subscribe(s -> {
            System.out.println("[subscribe] Thread name: " + Thread.currentThread().getName());
            System.out.println(s);
        });

輸出結(jié)果如下:

[map] Thread name: thread-subscribeOn-3
[filter] Thread name: thread-publishOn-4
[subscribe] Thread name: thread-publishOn-4
tom@mail.com

從上面的例子可以看出,subscribeOn 定義在 publishOn 之后,但是卻從源頭開始生效。而在 publishOn 執(zhí)行之后,線程池變更為 publishOn 所定義的。

實際用途

這里介紹 publishOnsubscribeOn 的一種實際用途,那就是反應(yīng)式編程和傳統(tǒng)的,會導(dǎo)致線程阻塞的編程技術(shù)混用的場景。其實開頭兩個例子已經(jīng)解釋了這個場景。

在第一個 publishOn 的例子中,repository::save 會導(dǎo)致線程阻塞,為了避免造成對其它反應(yīng)式操作的影響,便使用 publishOn 改變其執(zhí)行線程。

在第二個 subscribeOn 的例子中,repository.findAll() 會導(dǎo)致線程阻塞。但是其是源頭的 publisher,因此不能使用 publishOn 改變其 執(zhí)行線程。這時就需要使用 subscribeOn,在源頭上修改其執(zhí)行線程。

這樣,通過 publishOnsubscribeOn 就在反應(yīng)式編程中實現(xiàn)了線程池隔離的目的,一定程度上避免了會導(dǎo)致線程阻塞的程序執(zhí)行影響到反應(yīng)式編程的程序執(zhí)行效率。

局限性

使用 publishOnsubscribeOn 只能在一定程度上避免反應(yīng)式編程代碼執(zhí)行的效率被影響。因為用來隔離的線程池資源終歸是有限的,比如當(dāng)出現(xiàn)數(shù)據(jù)庫資源不足、慢查詢等問題時,對應(yīng)的線程池資源如果被耗盡,還是會使整個反應(yīng)式編程的執(zhí)行效率受到影響。

目前,Redis、Mongo、Couchbase 等非關(guān)系型數(shù)據(jù)庫均有相應(yīng)的反應(yīng)式編程的解決方案,但是關(guān)系型數(shù)據(jù)庫卻沒有理想的方案。一個重要原因是 JDBC 本身就是一個阻塞式的 API,根本不可能讓其適應(yīng)反應(yīng)式編程。因此需要一個新的方案。目前 Oracle 正在推動 ADBA (Asynchronous Database Access API),使得關(guān)系型數(shù)據(jù)庫可以滿足異步編程的需要。但是,因為是 Oracle 主導(dǎo),大家都懂的,所以目前前景還不是很明朗。另外一個技術(shù)方案是 Spring 推動的 R2DBC,從名字上來看就很像是 JDBC 在反應(yīng)式編程領(lǐng)域的對應(yīng)的解決方案。目前可以支持 PostgreSQL,支持 MySQL 目前還尚需時日。

后續(xù)

接下來關(guān)于 Project Reactor 的文章我打算向大家介紹一下 Hot 和 Cold Publisher 的概念以及 Project Reactor 的源碼實現(xiàn)。

我的技術(shù)公眾號“編走編想”
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容