一、概述
Reactive Streams 規(guī)范描述了開(kāi)發(fā)一個(gè)響應(yīng)式編程框架需要遵循的接口規(guī)范和相關(guān)原則。reactive-streams-jvm 項(xiàng)目中包含了 Reactive Streams 規(guī)范設(shè)計(jì)的四個(gè)主要接口和一些接口實(shí)現(xiàn)的 example。為了更好的理解響應(yīng)式編程的核心理念,故對(duì) reactive-streams-jvm 源碼下的 examples 實(shí)現(xiàn)代碼進(jìn)行分析。
這里重點(diǎn)分析 examples 下的 Publisher、Subscriber、Subscription、Processor 接口實(shí)現(xiàn)之間的交互流程,不會(huì)對(duì)背壓、異步以及響應(yīng)式等核心的概念進(jìn)行深入分析。
reactive-streams-jvm 源碼:Github 地址
發(fā)布 - 訂閱模型
Reactive Streams 核心接口類圖
二、AsyncIterablePublisher
簡(jiǎn)單描述一下
AsyncIterablePublisher 是 exmples 中的其中一個(gè)實(shí)現(xiàn),正如其名,它是一個(gè)使用了 Iterable 的異步序列的發(fā)布者對(duì)象,其構(gòu)造方法接收以下三個(gè)參數(shù):
- Iterable 作為數(shù)據(jù)元素的數(shù)據(jù)源。
- Executor 用作異步執(zhí)行線程的調(diào)度器。
- BatchSize 可選參數(shù),標(biāo)明數(shù)據(jù)元素的的上限,默認(rèn)值是1024。
// AsyncIterablePublisher.java
public AsyncIterablePublisher(final Iterable<T> elements, final Executor executor) {
this(elements, DEFAULT_BATCHSIZE, executor);
}
public AsyncIterablePublisher(final Iterable<T> elements, final int batchSize, final Executor executor) {
...
}
當(dāng) AsyncIterablePublisher 的 subscribe 方法被調(diào)用的時(shí)候,AsyncIterablePublisher 的內(nèi)部類 SubscriptionImpl 的對(duì)象會(huì)被創(chuàng)建并初始化。SubscriptionImpl 對(duì)象初始化的方法中,會(huì)回調(diào) Subscriber 的 onSubscribe 方法。
// AsyncIterablePublisher.java
@Override
public void subscribe(final Subscriber<? super T> s) {
new SubscriptionImpl(s).init();
}
這里涉及到例外一個(gè)關(guān)鍵的地方是 Signal 接口,Signal 接口的對(duì)象標(biāo)識(shí)異步信號(hào),SubscriptionImpl 類中有一個(gè) inboundSignals 字段,用于接受 Signal 對(duì)象,當(dāng) run 方法被驅(qū)動(dòng)執(zhí)行的時(shí)候,會(huì)從 inboundSignals 隊(duì)列中取出 Signal 對(duì)象并處理。
// AsyncIterablePublisher.java
static interface Signal {};
enum Cancel implements Signal { Instance; };
enum Subscribe implements Signal { Instance; };
enum Send implements Signal { Instance; };
static final class Request implements Signal {
final long n;
Request(final long n) {
this.n = n;
}
};
核心類圖
主要流程
SubscriptionImpl 事件輪詢處理邏輯
從上面的類圖可以看到,SubscriptionImple 類同時(shí)實(shí)現(xiàn)了 Runnable 對(duì)象,是可以被 Executor 調(diào)度執(zhí)行的,因此我們可以猜測(cè)核心的事件處理機(jī)制就在這里。
// SubscriptionImpl 的 run 方法實(shí)現(xiàn)
@Override public final void run() {
if(on.get()) {
try {
final Signal s = inboundSignals.poll();
if (!cancelled) {
// 根據(jù)類型處理消息
...
}
} finally {
on.set(false);
if(!inboundSignals.isEmpty())
tryScheduleToExecute();
}
}
}
private final void tryScheduleToExecute() {
if(on.compareAndSet(false, true)) {
try {
executor.execute(this);
} catch(Throwable t) {
if (!cancelled) {
doCancel();
try {
terminateDueTo(new IllegalStateException("Publisher terminated due to unavailable Executor.", t));
} finally {
inboundSignals.clear();
on.set(false);
}
}
}
}
}
上述代碼省略了處理具體信號(hào)的邏輯
- on 是一個(gè) AtomicBoolean 類型的對(duì)象,初始值為 flase,在這里做同步鎖的作用,保證同時(shí)只有一個(gè)線程可以執(zhí)行 run 方法。
- tryScheduleToExecute 判斷是否正在執(zhí)行信號(hào)處理任務(wù),如果沒(méi)有,則使用 Executor 調(diào)度 this 對(duì)象啟動(dòng)信號(hào)處理。
- tryScheduleToExecute 是一個(gè)關(guān)鍵方法,每當(dāng)在任何地方收到信號(hào)后,都會(huì)調(diào)用此方法嘗試進(jìn)行信號(hào)處理操作。
具體的業(yè)務(wù)流程圖:
subscribe 事件處理
Subscriber 調(diào)用 AsyncIterablePublisher 的 subscribe 方法的時(shí)候,SubscriptionImpl 對(duì)象就會(huì)被初始化并進(jìn)入 SubscriptionImpl 對(duì)象的 init 方法。init 方法會(huì)將 subscribe 信號(hào)添加到信號(hào)隊(duì)列,并調(diào)用 tryScheduleToExecute 方法請(qǐng)求調(diào)度。
當(dāng) subscribe 信號(hào)被處理的時(shí)候,會(huì)調(diào)用 doSubscribe 方法:
private void doSubscribe() {
// 前面是處理 iterator 對(duì)象的邏輯
...
if (!cancelled) {
try {
subscriber.onSubscribe(this);
} catch(final Throwable t) {
terminateDueTo(new IllegalStateException("XXX", t));
}
// Deal with already complete iterators promptly
boolean hasElements = false;
try {
hasElements = iterator.hasNext();
} catch(final Throwable t) {
terminateDueTo(t);
}
if (!hasElements) {
try {
doCancel();
subscriber.onComplete();
} catch(final Throwable t) {
(new IllegalStateException(subscriber + " xxx",t))
.printStackTrace(System.err);
}
}
}
}
- 省略前面處理 iterator 對(duì)象的邏輯,當(dāng)進(jìn)入此方法的時(shí)候,首先回調(diào) subscriber 的 onSubscribe 方法。
- 如果數(shù)據(jù)源中沒(méi)有數(shù)據(jù),會(huì)取消并通知 subscriber 數(shù)據(jù)已發(fā)送完畢。
request 事件處理
當(dāng) SubscriptionImpl 的 request 方法被調(diào)用的時(shí)候,request 方法會(huì)發(fā)送 Request 信號(hào)到信號(hào)隊(duì)列,并調(diào)用 tryScheduleToExecute 嘗試請(qǐng)求調(diào)度器處理信號(hào)。
當(dāng) request 信號(hào)被調(diào)度的時(shí)候,doRequest 方法會(huì)被調(diào)用,此方法的主要作用主要是用于更新當(dāng)前的需求變量(demand),然后調(diào)用 doSend() 方法發(fā)送數(shù)據(jù)給 Subscriber。
private void doRequest(final long n) {
if (n < 1)
terminateDueTo(new IllegalArgumentException(subscriber + "..."));
else if (demand + n < 1) {
demand = Long.MAX_VALUE;
doSend();
} else {
demand += n;
doSend();
}
}
doSend 方法主要是用來(lái)給 Subscriber 發(fā)送數(shù)據(jù)的。其主要邏輯是執(zhí)行背壓需求判斷,如果符合背壓需求,并且還有數(shù)據(jù)就調(diào)用 Subscriber 的 onNext 方法將數(shù)據(jù)發(fā)送給下游。
這里如果不滿足背壓需求,會(huì)發(fā)出 Send 信號(hào),等待下次被調(diào)度。
如果數(shù)據(jù)已經(jīng)發(fā)送完成,則調(diào)用 Subscriber 的 onComplete 方法通知下游。
private void doSend() {
try {
int leftInBatch = batchSize;
do {
T next;
boolean hasNext;
try {
next = iterator.next();
hasNext = iterator.hasNext();
} catch (final Throwable t) {
terminateDueTo(t);
return;
}
subscriber.onNext(next);
if (!hasNext) {
doCancel();
subscriber.onComplete();
}
} while (!cancelled
&& --leftInBatch > 0
&& --demand > 0);
if (!cancelled && demand > 0)
signal(Send.Instance);
} catch(final Throwable t) {
doCancel();
(new IllegalStateException(subscriber + " ...", t))
.printStackTrace(System.err);
}
}
cancel 事件處理
當(dāng) SubscriptionImpl 的 cancel 方法被調(diào)用的時(shí)候,會(huì)添加 cancel 信號(hào)到信號(hào)隊(duì)列,并艙室請(qǐng)求處理信號(hào)消息,當(dāng) cancel 信號(hào)被調(diào)度到的時(shí)候 doCancel 方法會(huì)被調(diào)用。
private void doCancel() {
cancelled = true;
}