前言
上文簡單介紹了響應式編程和Reactor的使用,今天開始深入了解一下響應式編程的規(guī)范,為開始學習Reactor源碼做準備
Reactive streams
上文也簡單提到過,響應式的擴展庫有很多比如RxJava、Reactor,這就給上層應用帶來困擾,比如Spring中如果想支持響應式,那么到底是基于那個擴展庫開發(fā)吶?如果選擇了Reactor,今后想切換Reactor怎么辦?
導致這種問題的根本原因在于沒有同一的接口規(guī)范,就像jdk中有數(shù)據(jù)庫驅動的接口 Driver,各種各樣的數(shù)據(jù)庫只要去實現(xiàn)它我們的程序就可以自由切換,寫代碼時也不用關心底層實現(xiàn)
所以一些業(yè)界大佬共同定制了一個響應式規(guī)范即 Reactive Stream,maven可以通過以下方式引入
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.3</version>
</dependency>
在java9中,響應式規(guī)范已被加入JDK中
接下來就結合Reactive streams接口代碼看看到底規(guī)范了什么內容
概念
從概念上講,Reactive streams所描述的響應式編程就是“發(fā)布者發(fā)布數(shù)據(jù),訂閱者根據(jù)發(fā)布結果作出對應響應”
所以響應式編程的兩個角色:發(fā)布者 and 訂閱者
當然發(fā)布者和訂閱者之間可以編排一些中間處理流程,這些中間過程對于上游來說是訂閱者,對于下游來說是發(fā)布者,所以既是發(fā)布者又是訂閱者
最后是背壓的支持,所謂背壓,并沒有多么深奧,簡單來說就是訂閱者能控制發(fā)布者的發(fā)布速度,此時發(fā)布的主動權在訂閱者手中,訂閱者要多少,發(fā)布者就發(fā)布多少

這張圖比較貼切:
- 圖1中,訂閱者是人,發(fā)布者是水瓶,訂閱者通過回給發(fā)布者壓力讓其發(fā)布(出水),再做出響應(喝水),發(fā)布的主動權在訂閱者,這就是
Backpressure(背壓) - 圖2中,訂閱者是人,發(fā)布者是噴頭,噴頭完全忽略訂閱者的響應快慢,一個勁的噴水,導致訂閱者響應不過來,發(fā)布的主動權在發(fā)布者
接口
在看Reactive streams源碼,發(fā)布者抽象: Publisher
public interface Publisher<T> {
// 訂閱
public void subscribe(Subscriber<? super T> s);
}
沒錯,作為一個發(fā)布者,最終要的方法是可以接受訂閱,至于如何發(fā)布等留給實現(xiàn)者自己去實現(xiàn)
再看一下訂閱者: Subscriber
public interface Subscriber<T> {
// 訂閱成功事件
public void onSubscribe(Subscription s);
// 接收到新消息事件
public void onNext(T t);
// 異常處理
public void onError(Throwable t);
// 訂閱完成、結束事件
public void onComplete();
}
定義了訂閱者要處理的四種事件,其實也間接的限定了訂閱發(fā)布的模式
- 訂閱成功時,會執(zhí)行onSubscribe回調
- 發(fā)布新消息時,會執(zhí)行onNext回調
- 發(fā)布出錯時,會執(zhí)行onError回調
- 訂閱結束時,會執(zhí)行onComplete回調
再看一下訂閱成功回調的返回:Subscription,代表“本次訂閱”,相當于一次成功訂閱的訂單,通過它訂閱者可以向發(fā)布者請求n個數(shù)據(jù)或主動取消訂閱,這就是對“背壓”的支持
public interface Subscription {
// 請求n個數(shù)據(jù)
public void request(long n);
// 取消訂閱
public void cancel();
}
再看一下中間處理,上文說到它即是訂閱者又是發(fā)布者:Processor
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
其實就是繼承了訂閱者和發(fā)布者
Reactor
回到Reactor,它是Reactive streams的一個實現(xiàn),看看它是如何實現(xiàn)響應式規(guī)范的
Publisher
首先Reactor的Flux和Mono其實就是Reactive streams中的Publisher,只不過一個會發(fā)布0-N個,一個會發(fā)布0-1個

那么我們完全可以實現(xiàn)一個Reactive streams中的訂閱者去訂閱數(shù)據(jù)
Flux.just("a", "b", "c").subscribe(new Subscriber<String>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
subscription.request(1); // 請求1個
}
@Override
public void onNext(String s) {
System.out.println(s); // 響應
subscription.request(1); // 再請求1個
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("completed"); // 完成
}
});
最終輸出
a
b
c
completed
Consumer
再看一下下面這種一般訂閱寫法
Flux.just("a", "b", "c", "d").subscribe(System.out::println);
此時subscribe訂閱的訂閱者是Reactor自己封裝的訂閱者: Consumer,在subscribe方法中最終會被轉換為LambdaSubscriber

而LambdaSubscriber就是Reactive streams中的Subscriber

之所以使用Consumer不需要去執(zhí)行request(n)是因為LambdaSubscriber在訂閱成功時就request(Long.MAX_VALUE)(一次性訂閱所有)

所以說Reactor可以支持背壓,但大部分常規(guī)寫法是不考慮背壓的,這主要因為一般場景真用不到
Processor
最后看一下reactor中的中間操作
Flux.just("a", "b", "c", "d").take(3).subscribe(System.out::println);
Reactor的中間操作如take、map、flatMap等并沒有實現(xiàn)Processor來做中間操作,而是自己定義了Operator中間操作
take:

map:

可能是Processor這個確實實現(xiàn)起來比較麻煩,而且其實主要符合發(fā)布訂閱規(guī)范就基本可以了,Reactor還是支持使用Processor的,但最新版本的Reactor中Processor已被無情棄用
最后
下一篇開始研究Reactor是如何實現(xiàn)Reactive streams規(guī)范的并提供響應式編程支持的,從源碼角度分析,并嘗試自己寫一個Reactive streams實現(xiàn)來對照Reactor源碼