聲明:
一、subscribe方法
注意:所有的操作只有在訂閱的那一刻才開始進行?。?!
subscribe方法有兩種常用的形式:
-
Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer)
這四個參數(shù)的意義如下:
1)consumer:收到一個數(shù)據(jù)時的回調(diào)
2)errorConsumer:上游報告出現(xiàn)錯誤信號時的回調(diào)
3)completeConsumer:上游報告完成信號時的回調(diào)
4)subscriptionConsumer:提供一個Subscription類型的對象,你可以使用他對上游流量進行反饋控制
Disposable返回類型:使用Disposable#dispose方法可以取消訂閱行為,在Flux和Mono中,取消意味著告知數(shù)據(jù)源不再生產(chǎn)數(shù)據(jù),但這種取消行為并不一定是及時的,也許數(shù)據(jù)源產(chǎn)生數(shù)據(jù)非???,在接收到取消信號之前就完成所有數(shù)據(jù)的生成。
Disposable有一個工具類Disposables,它主要提供了Disposable.Swap和Disposable.Composite兩種包裝類的工廠方法,前者允許你對一個Disposable進行替換(不取消容器中當(dāng)前的Disposable,僅作替換)以及更新(取消容器中當(dāng)前的Disposable,并替換為新的Dsiposable)操作,后者允許你一次性控制多個Disposable -
void subscribe(Subscriber<? super T> actual)
Subscriber是一個訂閱器,里面包含了onSubscribe、onNext、onError、onError四種方法,使用它們可以更加方便的控制訂閱的操作。當(dāng)然實際中使用的還是BaseSubscriber,這個類繼承了Subscriber,但是它是抽象的,無法直接實例化。使用時直接使用匿名類進行實例化就好,這樣可以很好避免一個BaseSubscriber實例同時作為兩個不同訂閱操作的訂閱器所帶來的異常,因為根據(jù)Reactive Stream規(guī)則來說,一個訂閱器中的onNext方法只能被時序調(diào)用,而不同同時調(diào)用。
你可以看下面這個例子:
public class CustomSubsriber<T> extends BaseSubscriber<T> {
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("subscribed");
request(1);
}
@Override
protected void hookOnNext(T value) {
System.out.println("get value:" + value);
LockSupport.parkNanos(Duration.ofSeconds(1).toNanos());
request(1);
}
@Override
protected void hookOnComplete() {
System.out.println("completed");
}
}
二、背壓——流控
在Reactor中,下游想要控制上游的流速,是通過request來實現(xiàn)的,request的總數(shù)代表著下游當(dāng)前的需求量,比如subscriber.request(1)就代表下游需要一個數(shù)據(jù),多了不要。如果你把request的值設(shè)為Long.MAX_VALUE,則它意味著下游可以接收無限制的數(shù)據(jù),比如Mono#block、Flux#blockFirst、Flux#blockLast就默認(rèn)無限制接收數(shù)據(jù)。對了,這三個方法也是一種訂閱操作,使用它們會激活整個訂閱過程,Mono#block和Flux#blockFirst均表示接收流中第一個數(shù)據(jù)并返回,如果在等待中接收到了完成信號則返回null,同理Flux#blockLast表示只接收最后一個數(shù)據(jù),其會一直等待完成信號的到來。
值得注意的是:request并不一定恒定的,它可能會被整個上游中的某個操作修改,比如stringFlux.buffer(3).subscriber(null,null,null,sub -> {sub.request(2)})中,buffer(3)會將request進一步修改為6,因為sub.request(2)的2個請求是請求buffer(3)的輸出結(jié)果的,而一個buffer(3)結(jié)果需要其上游的3個數(shù)據(jù),故request被更改為了6
-
limitRequest也是一種流控操作,它接收一個參數(shù)作為“最大請求量”,如果下游總的請求量并沒有到達上限,那么一切照常,否則,在到達那一刻時,改操作將會給上游發(fā)出取消信號,并給下游發(fā)出完成信號 -
limitRate將請求分組,比如下游請求100個request(100),那么limitRate(10)將會把請求分為10個request(10),每完成一個request(10)則自動發(fā)出下一個request(10)
三、異步
在Reactor中,如果不特別指定異步操作的話,那么整個流的發(fā)生到訂閱過程全部會執(zhí)行在subscribe那個線程中。所以最簡單的異步使用Reactor的方法就是新建一個線程,并在其中執(zhí)行subscribe,比如:
public static void main(String[] args) throws InterruptedException {
final Mono<String> mono = Mono.just("hello ");
Thread t = new Thread(() -> mono
.map(msg -> msg + "thread ")
.subscribe(v ->
System.out.println(v + Thread.currentThread().getName())
)
)
t.start();
t.join();
}
其結(jié)果為:
hello thread Thread-0
當(dāng)然Reactor其實提供更加簡便的異步操作方式,其中比較常用的就是publishOn和SubscribeOn兩個方法了,這兩個方法都需要一個Scheduler類型的參數(shù),它控制著操作的執(zhí)行模式以及執(zhí)行執(zhí)行位置,單從表現(xiàn)上來看,倒是有點像ExecutorService。創(chuàng)建Scheduler你需要使用到Schedulers工廠類,里面定義了許多不同類型的Scheduler:
1)Schedulers.immediate():直接在當(dāng)前線程中立刻執(zhí)行
2)Schedulers.single()/newSingle():提供一個單線程線程池以供操作,前者是一個固定的定義好的單線程線程池,后者你可以使用它來創(chuàng)建新的單線程線程池
3)Schedulers.elastic():相當(dāng)于提供了一個CachedThreadPool
4)Schedulers.parallel():相當(dāng)于提供了一個和Cpu核心數(shù)一樣多的核心線程數(shù)的線程池
5)Schedulers.fromExecutorService():從現(xiàn)有的ExecutorService中引入
-
subscribeOn
subscribeOn會將整個流(包括數(shù)據(jù)源生成)放置在指定線程上執(zhí)行(具體由哪個線程執(zhí)行由前面所說的Scheduler來控制),注意,無論subscribeOn放在哪,它都將影響整個流,可以有多個subscribeOn,但是只有第一個會生效 -
publishOn
publishOn會將其后的操作搬移到指定線程上執(zhí)行,注意,publishOn優(yōu)先級比subscribeOn高,而且多個publishOn都各自對其后的操作會有影響。
看個例子:
final Flux<String> flux = Flux
.range(1, 2)
.map(i -> {
System.out.println("map1:"+Thread.currentThread().getName());
LockSupport.parkNanos(Duration.ofSeconds(1).toNanos());
return 10 + i;
})
.subscribeOn(s)
.map(i -> {
System.out.println("map2:"+Thread.currentThread().getName());
LockSupport.parkNanos(Duration.ofSeconds(2).toNanos());
return "value " + i;
}).publishOn(Schedulers.single())
.map(i -> {
System.out.println("map3:"+Thread.currentThread().getName());
return "mtk:"+i;
});
new Thread(() -> flux.subscribe(System.out::println)).start();
結(jié)果如下:
map1:parallel-scheduler-1
map2:parallel-scheduler-1
map1:parallel-scheduler-1
map3:single-1
mtk:value 11
map2:parallel-scheduler-1
map3:single-1
mtk:value 12
從結(jié)果中我們可以發(fā)現(xiàn)整個流在沒有運行在
subscribe方法調(diào)用時所在的線程中,因為有subscribeOn的緣故,整個流運行在parallel-scheduler-1線程中,但是在map3操作前,有個publishOn,其使得其后的操作運行在了single-1線程中
參考文檔:
[1] Reactor api doc
[2] Reactor reference doc