Project Reactor:OptimizableOperator 原理

前言

通常來說在響應式編程中 Publisher 的創(chuàng)建到真正的訂閱者中間會經(jīng)過許多的響應式操作符,而大部分的操作符其實都是 OptimizableOperator 的實現(xiàn)。

隨便舉幾個例子,例如:map,flatMap,filter,doOnNext 等等。基本上所有對上游數(shù)據(jù)做處理的函數(shù)都實現(xiàn)了 OptimizableOperator

準備工作

你需要對 Reactor 或者響應式編程有一定了解

推薦閱讀:
「響應式編程入門之 Project Reactor」

OptimizableOperator

OptimizableOperator

首先 OptimizableOperator 繼承了 CorePublisher,這沒什么可說的,因為不管是使用 Mono 還是 Flux 執(zhí)行完任何操作之后返回的依舊是一個 Publisher

下面我們來看下 OptimizableOperator 的核心方法,先來簡單說下,后邊會結(jié)合源碼來詳細理解

OptimizableOperator.subscribeOrReturn

該方法有兩種方式實現(xiàn)

  1. 返回一個 CoreSubscriber,返回的訂閱者包裝了下游的實際訂閱者
  1. 返回 null
    當選擇這種實現(xiàn)方式時,當前 OptimizableOperator 自行消費真正的 Publisher。將自己作為下游 Subscriber 的發(fā)布者

這兩種實現(xiàn)方式有什么區(qū)別呢?第一種方式僅僅是作為一個消費者訂閱。而第二種方式,Operator 相當于翻身農(nóng)奴把歌唱了,自己當上了 Publisher,能做的事情比一種方式更多

OptimizableOperator.source

返回當前 OptimizableOperator 的上游(上游可能還是一個 OptimizableOperator)

OptimizableOperator.nextOptimizableSource

返回鏈中的下一個 OptimizableOperator(如果上游是 OptimizableOperator 則返回,否則返回 null)

InternalMonoOperator

我們來通過 InternalMonoOperator 來理解一下上述的幾個方法

InternalMonoOperator

InternalMonoOperator 構(gòu)造方法

InternalMonoOperator 是所有 Mono 操作符的父類,在執(zhí)行操作符動作之前都會先構(gòu)建操作符對象,最終會調(diào)用到 InternalMonoOperator 構(gòu)造方法。

  1. 入?yún)?source 為當前操作符的上游,
  2. super(source) 內(nèi)部實現(xiàn)是將上游賦值給成員變量 source
  3. 如果上游同樣也是操作符,還會將其賦值給 optimizableOperator

subscribe

當最后一個操作符被訂閱時會執(zhí)行如下邏輯??此坪茈y讀,其實和上面我們講的是一樣的,下面一起來分析下

public final void subscribe(CoreSubscriber<? super O> subscriber) {
    // 將當前對象賦值給 operator
    OptimizableOperator operator = this;
    try {
        while (true) {
            // 調(diào)用操作符的 subscribeOrReturn
            // 1. 返回 != null,我們認為 operator 已經(jīng)對實際訂閱者做了包裝。所以繼續(xù)調(diào)用下一個操作符(也就是當前操作符的上游)
            // 2. 如果返回 == null,則代表 operator 要自己處理上游的消費和下游的訂閱,方法結(jié)束
            subscriber = operator.subscribeOrReturn(subscriber);
            if (subscriber == null) {
                // null means "I will subscribe myself", returning...
                return;
            }
            // 返回當前 operator 的下一個操作符(也就是返回的上游的操作符)
            OptimizableOperator newSource = operator.nextOptimizableSource();
            if (newSource == null) {
                // 如果所有 operator 都執(zhí)行完了,那么可以直接向 Publisher 發(fā)起訂閱了
                // 訂閱結(jié)束后,退出當前方法
                operator.source().subscribe(subscriber);
                return;
            }
            // 存在下一個操作符,繼續(xù)執(zhí)行該邏輯
            operator = newSource;
        }
    }
    catch (Throwable e) {
        Operators.reportThrowInSubscribe(subscriber, e);
        return;
    }
}

在理解了 subscribe 的邏輯之后,我們在以后在閱讀 Reactor 操作符的源碼時,就可以清楚地知道只需要關注該操作符的 subscribeOrReturn 方法即可。

如果 subscribeOrReturn 僅是返回一個 subscriber,那么我們只需要關注其 Subscriber 的相關邏輯即可。如果返回的 null,則代表該操作符要自行消費上游然后向下游傳遞訂閱,我們需要關注他的 Subscription 相關

最后

如果覺得我的文章對你有幫助,動動小手點下關注,你的支持是對我最大的幫助

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容