Reactor學(xué)習(xí):三、訂閱

聲明:


一、subscribe方法

注意:所有的操作只有在訂閱的那一刻才開始進行?。?!
subscribe方法有兩種常用的形式:

  • Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer)
    這四個參數(shù)的意義如下:
    1)consumer:收到一個數(shù)據(jù)時的回調(diào)
    2)errorConsumer:上游報告出現(xiàn)錯誤信號時的回調(diào)
    3)completeConsumer:上游報告完成信號時的回調(diào)
    4)subscriptionConsumer:提供一個Subscription類型的對象,你可以使用他對上游流量進行反饋控制
    Disposable返回類型:使用Disposable#dispose方法可以取消訂閱行為,在FluxMono中,取消意味著告知數(shù)據(jù)源不再生產(chǎn)數(shù)據(jù),但這種取消行為并不一定是及時的,也許數(shù)據(jù)源產(chǎn)生數(shù)據(jù)非???,在接收到取消信號之前就完成所有數(shù)據(jù)的生成。
    Disposable有一個工具類Disposables,它主要提供了Disposable.SwapDisposable.Composite兩種包裝類的工廠方法,前者允許你對一個Disposable進行替換(不取消容器中當(dāng)前的Disposable,僅作替換)以及更新(取消容器中當(dāng)前的Disposable,并替換為新的Dsiposable)操作,后者允許你一次性控制多個Disposable
  • void subscribe(Subscriber<? super T> actual)
    Subscriber是一個訂閱器,里面包含了onSubscribe、onNext、onErroronError四種方法,使用它們可以更加方便的控制訂閱的操作。當(dāng)然實際中使用的還是BaseSubscriber,這個類繼承了Subscriber,但是它是抽象的,無法直接實例化。使用時直接使用匿名類進行實例化就好,這樣可以很好避免一個BaseSubscriber實例同時作為兩個不同訂閱操作的訂閱器所帶來的異常,因為根據(jù)Reactive Stream規(guī)則來說,一個訂閱器中的onNext方法只能被時序調(diào)用,而不同同時調(diào)用。
    你可以看下面這個例子:
public class CustomSubsriber<T> extends BaseSubscriber<T> {
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        System.out.println("subscribed");
        request(1);
    }

    @Override
    protected void hookOnNext(T value) {
        System.out.println("get value:" + value);
        LockSupport.parkNanos(Duration.ofSeconds(1).toNanos());
        request(1);
    }

    @Override
    protected void hookOnComplete() {
        System.out.println("completed");
    }
}

二、背壓——流控

Reactor中,下游想要控制上游的流速,是通過request來實現(xiàn)的,request的總數(shù)代表著下游當(dāng)前的需求量,比如subscriber.request(1)就代表下游需要一個數(shù)據(jù),多了不要。如果你把request的值設(shè)為Long.MAX_VALUE,則它意味著下游可以接收無限制的數(shù)據(jù),比如Mono#block、Flux#blockFirstFlux#blockLast就默認(rèn)無限制接收數(shù)據(jù)。對了,這三個方法也是一種訂閱操作,使用它們會激活整個訂閱過程,Mono#blockFlux#blockFirst均表示接收流中第一個數(shù)據(jù)并返回,如果在等待中接收到了完成信號則返回null,同理Flux#blockLast表示只接收最后一個數(shù)據(jù),其會一直等待完成信號的到來。
值得注意的是:request并不一定恒定的,它可能會被整個上游中的某個操作修改,比如stringFlux.buffer(3).subscriber(null,null,null,sub -> {sub.request(2)})中,buffer(3)會將request進一步修改為6,因為sub.request(2)的2個請求是請求buffer(3)的輸出結(jié)果的,而一個buffer(3)結(jié)果需要其上游的3個數(shù)據(jù),故request被更改為了6

  • limitRequest也是一種流控操作,它接收一個參數(shù)作為“最大請求量”,如果下游總的請求量并沒有到達上限,那么一切照常,否則,在到達那一刻時,改操作將會給上游發(fā)出取消信號,并給下游發(fā)出完成信號
  • limitRate將請求分組,比如下游請求100個request(100),那么limitRate(10)將會把請求分為10個request(10),每完成一個request(10)則自動發(fā)出下一個request(10)

三、異步

Reactor中,如果不特別指定異步操作的話,那么整個流的發(fā)生到訂閱過程全部會執(zhí)行在subscribe那個線程中。所以最簡單的異步使用Reactor的方法就是新建一個線程,并在其中執(zhí)行subscribe,比如:

public static void main(String[] args) throws InterruptedException {
  final Mono<String> mono = Mono.just("hello "); 

  Thread t = new Thread(() -> mono
      .map(msg -> msg + "thread ")
      .subscribe(v -> 
          System.out.println(v + Thread.currentThread().getName()) 
      )
  )
  t.start();
  t.join();
}

其結(jié)果為:

hello thread Thread-0

當(dāng)然Reactor其實提供更加簡便的異步操作方式,其中比較常用的就是publishOnSubscribeOn兩個方法了,這兩個方法都需要一個Scheduler類型的參數(shù),它控制著操作的執(zhí)行模式以及執(zhí)行執(zhí)行位置,單從表現(xiàn)上來看,倒是有點像ExecutorService。創(chuàng)建Scheduler你需要使用到Schedulers工廠類,里面定義了許多不同類型的Scheduler
1)Schedulers.immediate():直接在當(dāng)前線程中立刻執(zhí)行
2)Schedulers.single()/newSingle():提供一個單線程線程池以供操作,前者是一個固定的定義好的單線程線程池,后者你可以使用它來創(chuàng)建新的單線程線程池
3)Schedulers.elastic():相當(dāng)于提供了一個CachedThreadPool
4)Schedulers.parallel():相當(dāng)于提供了一個和Cpu核心數(shù)一樣多的核心線程數(shù)的線程池
5)Schedulers.fromExecutorService():從現(xiàn)有的ExecutorService中引入

  • subscribeOn
    subscribeOn會將整個流(包括數(shù)據(jù)源生成)放置在指定線程上執(zhí)行(具體由哪個線程執(zhí)行由前面所說的Scheduler來控制),注意,無論subscribeOn放在哪,它都將影響整個流,可以有多個subscribeOn,但是只有第一個會生效
  • publishOn
    publishOn會將其后的操作搬移到指定線程上執(zhí)行,注意,publishOn優(yōu)先級比subscribeOn高,而且多個publishOn都各自對其后的操作會有影響。

看個例子:

final Flux<String> flux = Flux
        .range(1, 2)
        .map(i -> {
            System.out.println("map1:"+Thread.currentThread().getName());
            LockSupport.parkNanos(Duration.ofSeconds(1).toNanos());
            return 10 + i;
        })
        .subscribeOn(s)
        .map(i -> {
            System.out.println("map2:"+Thread.currentThread().getName());
            LockSupport.parkNanos(Duration.ofSeconds(2).toNanos());
            return "value " + i;
        }).publishOn(Schedulers.single())
        .map(i -> {
            System.out.println("map3:"+Thread.currentThread().getName());
            return "mtk:"+i;
        });

new Thread(() -> flux.subscribe(System.out::println)).start();

結(jié)果如下:

map1:parallel-scheduler-1
map2:parallel-scheduler-1
map1:parallel-scheduler-1
map3:single-1
mtk:value 11
map2:parallel-scheduler-1
map3:single-1
mtk:value 12

從結(jié)果中我們可以發(fā)現(xiàn)整個流在沒有運行在subscribe方法調(diào)用時所在的線程中,因為有subscribeOn的緣故,整個流運行在parallel-scheduler-1線程中,但是在map3操作前,有個publishOn,其使得其后的操作運行在了single-1線程中


參考文檔:
[1] Reactor api doc
[2] Reactor reference doc

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容