JDK9中的Flow API對應(yīng)響應(yīng)式流規(guī)范,響應(yīng)式流規(guī)范是一種事實(shí)標(biāo)準(zhǔn)。JEP 266包含了一組最小接口集合,這組接口能捕獲核心的異步發(fā)布與訂閱。希望在未來第三方能夠?qū)崿F(xiàn)這些接口,并且能共享其方式。
java.util.concurrent.Flow包含以下4個(gè)接口:
- Flow.Processor(處理器)
- Flow.Publisher(發(fā)布者)
- Flow.Subscriber(訂閱者)
- Flow.Subscription(訂閱管理器)
交互流程如下:

很類似mq中的發(fā)布-訂閱模式:發(fā)布者發(fā)布數(shù)據(jù),訂閱者接收數(shù)據(jù)。
使用示例
public class FlowDemo {
public static void main(String[] args) throws Exception {
// 1. 定義發(fā)布者, 發(fā)布的數(shù)據(jù)類型是 Integer
// 直接使用~jdk~自帶的SubmissionPublisher, 它實(shí)現(xiàn)了 Publisher 接口
SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
// 2. 定義訂閱者
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 保存訂閱關(guān)系, 需要用它來給發(fā)布者響應(yīng)
this.subscription = subscription;
// 請求一個(gè)數(shù)據(jù)
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
// 接受到一個(gè)數(shù)據(jù), 處理
System.out.println("接受到數(shù)據(jù): " + item);
TimeUnit.SECONDS.sleep(3);
// 處理完調(diào)用request再請求一個(gè)數(shù)據(jù)
this.subscription.request(1);
// 或者 已經(jīng)達(dá)到了目標(biāo), 調(diào)用cancel告訴發(fā)布者不再接受數(shù)據(jù)了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出現(xiàn)了異常(例如處理數(shù)據(jù)的時(shí)候產(chǎn)生了異常)
throwable.printStackTrace();
// 我們可以告訴發(fā)布者, 后面不接受數(shù)據(jù)了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部數(shù)據(jù)處理完了(發(fā)布者關(guān)閉了)
System.out.println("處理完了!");
}
};
// 3. 發(fā)布者和訂閱者 建立訂閱關(guān)系
publiser.subscribe(subscriber);
// 4. 生產(chǎn)數(shù)據(jù), 并發(fā)布
// 這里忽略數(shù)據(jù)生產(chǎn)過程
for (int i = 0; i < 1000; i++) {
System.out.println("生成數(shù)據(jù):" + i);
// submit是個(gè)block方法
publiser.submit(i);
}
// 5. 結(jié)束后 關(guān)閉發(fā)布者
// 正式環(huán)境 應(yīng)該放 finally 或者使用 try-~resouce~ 確保關(guān)閉
publiser.close();
// 主線程延遲停止, 否則數(shù)據(jù)沒有消費(fèi)就退出
Thread.currentThread().join(1000);
// debug的時(shí)候, 下面這行需要有斷點(diǎn)
// 否則主線程結(jié)束無法debug
System.out.println();
}
}
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
/**
* 帶 process 的 flow demo
*/
/**
* Processor, 需要繼承SubmissionPublisher并實(shí)現(xiàn)Processor接口
*
* 輸入源數(shù)據(jù) integer, 過濾掉小于0的, 然后轉(zhuǎn)換成字符串發(fā)布出去
*/
class MyProcessor extends SubmissionPublisher<String>
implements Processor<Integer, String> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 保存訂閱關(guān)系, 需要用它來給發(fā)布者響應(yīng)
this.subscription = subscription;
// 請求一個(gè)數(shù)據(jù)
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
// 接受到一個(gè)數(shù)據(jù), 處理
System.out.println("處理器接受到數(shù)據(jù): " + item);
// 過濾掉小于0的, 然后發(fā)布出去
if (item > 0) {
this.submit("轉(zhuǎn)換后的數(shù)據(jù):" + item);
}
// 處理完調(diào)用request再請求一個(gè)數(shù)據(jù)
this.subscription.request(1);
// 或者 已經(jīng)達(dá)到了目標(biāo), 調(diào)用cancel告訴發(fā)布者不再接受數(shù)據(jù)了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出現(xiàn)了異常(例如處理數(shù)據(jù)的時(shí)候產(chǎn)生了異常)
throwable.printStackTrace();
// 我們可以告訴發(fā)布者, 后面不接受數(shù)據(jù)了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部數(shù)據(jù)處理完了(發(fā)布者關(guān)閉了)
System.out.println("處理器處理完了!");
// 關(guān)閉發(fā)布者
this.close();
}
}
public class FlowDemoTest {
public static void main(String[] args) throws Exception {
// 1. 定義發(fā)布者, 發(fā)布的數(shù)據(jù)類型是 Integer
// 直接使用jdk自帶的SubmissionPublisher
SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
// 2. 定義處理器, 對數(shù)據(jù)進(jìn)行過濾, 并轉(zhuǎn)換為String類型
MyProcessor processor = new MyProcessor();
// 3. 發(fā)布者 和 處理器 建立訂閱關(guān)系
publiser.subscribe(processor);
// 4. 定義最終訂閱者, 消費(fèi) String 類型數(shù)據(jù)
Subscriber<String> subscriber = new Subscriber<String>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 保存訂閱關(guān)系, 需要用它來給發(fā)布者響應(yīng)
this.subscription = subscription;
// 請求一個(gè)數(shù)據(jù)
this.subscription.request(1);
}
@Override
public void onNext(String item) {
// 接受到一個(gè)數(shù)據(jù), 處理
System.out.println("接受到數(shù)據(jù): " + item);
// 處理完調(diào)用request再請求一個(gè)數(shù)據(jù)
this.subscription.request(1);
// 或者 已經(jīng)達(dá)到了目標(biāo), 調(diào)用cancel告訴發(fā)布者不再接受數(shù)據(jù)了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出現(xiàn)了異常(例如處理數(shù)據(jù)的時(shí)候產(chǎn)生了異常)
throwable.printStackTrace();
// 我們可以告訴發(fā)布者, 后面不接受數(shù)據(jù)了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部數(shù)據(jù)處理完了(發(fā)布者關(guān)閉了)
System.out.println("處理完了!");
}
};
// 5. 處理器 和 最終訂閱者 建立訂閱關(guān)系
processor.subscribe(subscriber);
// 6. 生產(chǎn)數(shù)據(jù), 并發(fā)布
// 這里忽略數(shù)據(jù)生產(chǎn)過程
//是阻塞方法 默認(rèn)緩沖356個(gè)數(shù)據(jù)
publiser.submit(-111);
publiser.submit(111);
// 7. 結(jié)束后 關(guān)閉發(fā)布者
// 正式環(huán)境 應(yīng)該放 finally 或者使用 try-resouce 確保關(guān)閉
publiser.close();
// 主線程延遲停止, 否則數(shù)據(jù)沒有消費(fèi)就退出
Thread.currentThread().join(1000);
}
}
背壓依我的理解來說,是指訂閱者能和發(fā)布者交互(通過代碼里面的調(diào)用request和cancel方法交互),可以調(diào)節(jié)發(fā)布者發(fā)布數(shù)據(jù)的速率,解決把訂閱者壓垮的問題。關(guān)鍵在于上面例子里面的訂閱關(guān)系Subscription這個(gè)接口,他有request和cancel 2個(gè)方法,用于通知發(fā)布者需要數(shù)據(jù)和通知發(fā)布者不再接受數(shù)據(jù)。
我們重點(diǎn)理解背壓在jdk9里面是如何實(shí)現(xiàn)的。關(guān)鍵在于發(fā)布者Publisher的實(shí)現(xiàn)類SubmissionPublisher的submit方法是block方法。訂閱者會(huì)有一個(gè)緩沖池,默認(rèn)為Flow.defaultBufferSize() = 256。當(dāng)訂閱者的緩沖池滿了之后,發(fā)布者調(diào)用submit方法發(fā)布數(shù)據(jù)就會(huì)被阻塞,發(fā)布者就會(huì)停(慢)下來;訂閱者消費(fèi)了數(shù)據(jù)之后(調(diào)用Subscription.request方法),緩沖池有位置了,submit方法就會(huì)繼續(xù)執(zhí)行下去,就是通過這樣的機(jī)制,實(shí)現(xiàn)了調(diào)節(jié)發(fā)布者發(fā)布數(shù)據(jù)的速率,消費(fèi)得快,生成就快,消費(fèi)得慢,發(fā)布者就會(huì)被阻塞,當(dāng)然就會(huì)慢下來了。
Flow類包含defaultBufferSize()靜態(tài)方法,它返回發(fā)布者和訂閱者使用的緩沖區(qū)的默認(rèn)大小。 目前,它返回256。
參考:
Reactive Programming with JDK 9 Flow API | Oracle Community
Java 9 揭秘(17. Reactive Streams) - 林本托 - 博客園
Java 9新特點(diǎn): 響應(yīng)式流Reactive Streams
http://www.reactive-streams.org/