1、java9

JDK9中的Flow API對應(yīng)響應(yīng)式流規(guī)范,響應(yīng)式流規(guī)范是一種事實(shí)標(biāo)準(zhǔn)。JEP 266包含了一組最小接口集合,這組接口能捕獲核心的異步發(fā)布與訂閱。希望在未來第三方能夠?qū)崿F(xiàn)這些接口,并且能共享其方式。

java.util.concurrent.Flow包含以下4個(gè)接口:

  • Flow.Processor(處理器)
  • Flow.Publisher(發(fā)布者)
  • Flow.Subscriber(訂閱者)
  • Flow.Subscription(訂閱管理器)

交互流程如下:


Publisher、Subscriber交互流程.png

很類似mq中的發(fā)布-訂閱模式:發(fā)布者發(fā)布數(shù)據(jù),訂閱者接收數(shù)據(jù)。

使用示例

public class FlowDemo {
    public static void main(String[] args) throws Exception {
        // 1. 定義發(fā)布者, 發(fā)布的數(shù)據(jù)類型是 Integer
        // 直接使用~jdk~自帶的SubmissionPublisher, 它實(shí)現(xiàn)了 Publisher 接口
        SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();

        // 2. 定義訂閱者
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            private Subscription subscription;
            @Override
            public void onSubscribe(Subscription subscription) {
                // 保存訂閱關(guān)系, 需要用它來給發(fā)布者響應(yīng)
                this.subscription = subscription;
                // 請求一個(gè)數(shù)據(jù)
                this.subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                // 接受到一個(gè)數(shù)據(jù), 處理
                System.out.println("接受到數(shù)據(jù): " + item);
                    TimeUnit.SECONDS.sleep(3);
                // 處理完調(diào)用request再請求一個(gè)數(shù)據(jù)
                this.subscription.request(1);

                // 或者 已經(jīng)達(dá)到了目標(biāo), 調(diào)用cancel告訴發(fā)布者不再接受數(shù)據(jù)了
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出現(xiàn)了異常(例如處理數(shù)據(jù)的時(shí)候產(chǎn)生了異常)
                throwable.printStackTrace();

                // 我們可以告訴發(fā)布者, 后面不接受數(shù)據(jù)了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部數(shù)據(jù)處理完了(發(fā)布者關(guān)閉了)
                System.out.println("處理完了!");
            }

        };
        // 3. 發(fā)布者和訂閱者 建立訂閱關(guān)系
        publiser.subscribe(subscriber);

        // 4. 生產(chǎn)數(shù)據(jù), 并發(fā)布
        // 這里忽略數(shù)據(jù)生產(chǎn)過程
        for (int i = 0; i < 1000; i++) {
            System.out.println("生成數(shù)據(jù):" + i);
            // submit是個(gè)block方法
            publiser.submit(i);
        }

        // 5. 結(jié)束后 關(guān)閉發(fā)布者
        // 正式環(huán)境 應(yīng)該放 finally 或者使用 try-~resouce~ 確保關(guān)閉
        publiser.close();

        // 主線程延遲停止, 否則數(shù)據(jù)沒有消費(fèi)就退出
        Thread.currentThread().join(1000);
        // debug的時(shí)候, 下面這行需要有斷點(diǎn)
        // 否則主線程結(jié)束無法debug
        System.out.println();
    }
}

import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

/**
 * 帶 process 的 flow demo
 */

/**
 * Processor, 需要繼承SubmissionPublisher并實(shí)現(xiàn)Processor接口
 * 
 * 輸入源數(shù)據(jù) integer, 過濾掉小于0的, 然后轉(zhuǎn)換成字符串發(fā)布出去
 */
class MyProcessor extends SubmissionPublisher<String>
        implements Processor<Integer, String> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        // 保存訂閱關(guān)系, 需要用它來給發(fā)布者響應(yīng)
        this.subscription = subscription;

        // 請求一個(gè)數(shù)據(jù)
        this.subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        // 接受到一個(gè)數(shù)據(jù), 處理
        System.out.println("處理器接受到數(shù)據(jù): " + item);

        // 過濾掉小于0的, 然后發(fā)布出去
        if (item > 0) {
            this.submit("轉(zhuǎn)換后的數(shù)據(jù):" + item);
        }

        // 處理完調(diào)用request再請求一個(gè)數(shù)據(jù)
        this.subscription.request(1);

        // 或者 已經(jīng)達(dá)到了目標(biāo), 調(diào)用cancel告訴發(fā)布者不再接受數(shù)據(jù)了
        // this.subscription.cancel();
    }

    @Override
    public void onError(Throwable throwable) {
        // 出現(xiàn)了異常(例如處理數(shù)據(jù)的時(shí)候產(chǎn)生了異常)
        throwable.printStackTrace();

        // 我們可以告訴發(fā)布者, 后面不接受數(shù)據(jù)了
        this.subscription.cancel();
    }

    @Override
    public void onComplete() {
        // 全部數(shù)據(jù)處理完了(發(fā)布者關(guān)閉了)
        System.out.println("處理器處理完了!");
        // 關(guān)閉發(fā)布者
        this.close();
    }

}

public class FlowDemoTest {

    public static void main(String[] args) throws Exception {
        // 1. 定義發(fā)布者, 發(fā)布的數(shù)據(jù)類型是 Integer
        // 直接使用jdk自帶的SubmissionPublisher
        SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();

        // 2. 定義處理器, 對數(shù)據(jù)進(jìn)行過濾, 并轉(zhuǎn)換為String類型
        MyProcessor processor = new MyProcessor();

        // 3. 發(fā)布者 和 處理器 建立訂閱關(guān)系
        publiser.subscribe(processor);

        // 4. 定義最終訂閱者, 消費(fèi) String 類型數(shù)據(jù)
        Subscriber<String> subscriber = new Subscriber<String>() {
            private Subscription subscription;
            @Override
            public void onSubscribe(Subscription subscription) {
                // 保存訂閱關(guān)系, 需要用它來給發(fā)布者響應(yīng)
                this.subscription = subscription;
                // 請求一個(gè)數(shù)據(jù)
                this.subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                // 接受到一個(gè)數(shù)據(jù), 處理
                System.out.println("接受到數(shù)據(jù): " + item);

                // 處理完調(diào)用request再請求一個(gè)數(shù)據(jù)
                this.subscription.request(1);

                // 或者 已經(jīng)達(dá)到了目標(biāo), 調(diào)用cancel告訴發(fā)布者不再接受數(shù)據(jù)了
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出現(xiàn)了異常(例如處理數(shù)據(jù)的時(shí)候產(chǎn)生了異常)
                throwable.printStackTrace();

                // 我們可以告訴發(fā)布者, 后面不接受數(shù)據(jù)了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部數(shù)據(jù)處理完了(發(fā)布者關(guān)閉了)
                System.out.println("處理完了!");
            }
        };

        // 5. 處理器 和 最終訂閱者 建立訂閱關(guān)系
        processor.subscribe(subscriber);

        // 6. 生產(chǎn)數(shù)據(jù), 并發(fā)布
        // 這里忽略數(shù)據(jù)生產(chǎn)過程
        //是阻塞方法 默認(rèn)緩沖356個(gè)數(shù)據(jù)
        publiser.submit(-111);
        publiser.submit(111);

        // 7. 結(jié)束后 關(guān)閉發(fā)布者
        // 正式環(huán)境 應(yīng)該放 finally 或者使用 try-resouce 確保關(guān)閉
        publiser.close();

        // 主線程延遲停止, 否則數(shù)據(jù)沒有消費(fèi)就退出
        Thread.currentThread().join(1000);
    }
}

背壓依我的理解來說,是指訂閱者能和發(fā)布者交互(通過代碼里面的調(diào)用request和cancel方法交互),可以調(diào)節(jié)發(fā)布者發(fā)布數(shù)據(jù)的速率,解決把訂閱者壓垮的問題。關(guān)鍵在于上面例子里面的訂閱關(guān)系Subscription這個(gè)接口,他有request和cancel 2個(gè)方法,用于通知發(fā)布者需要數(shù)據(jù)和通知發(fā)布者不再接受數(shù)據(jù)。

我們重點(diǎn)理解背壓在jdk9里面是如何實(shí)現(xiàn)的。關(guān)鍵在于發(fā)布者Publisher的實(shí)現(xiàn)類SubmissionPublisher的submit方法是block方法。訂閱者會(huì)有一個(gè)緩沖池,默認(rèn)為Flow.defaultBufferSize() = 256。當(dāng)訂閱者的緩沖池滿了之后,發(fā)布者調(diào)用submit方法發(fā)布數(shù)據(jù)就會(huì)被阻塞,發(fā)布者就會(huì)停(慢)下來;訂閱者消費(fèi)了數(shù)據(jù)之后(調(diào)用Subscription.request方法),緩沖池有位置了,submit方法就會(huì)繼續(xù)執(zhí)行下去,就是通過這樣的機(jī)制,實(shí)現(xiàn)了調(diào)節(jié)發(fā)布者發(fā)布數(shù)據(jù)的速率,消費(fèi)得快,生成就快,消費(fèi)得慢,發(fā)布者就會(huì)被阻塞,當(dāng)然就會(huì)慢下來了。

Flow類包含defaultBufferSize()靜態(tài)方法,它返回發(fā)布者和訂閱者使用的緩沖區(qū)的默認(rèn)大小。 目前,它返回256。


參考:

Reactive Programming with JDK 9 Flow API | Oracle Community
Java 9 揭秘(17. Reactive Streams) - 林本托 - 博客園
Java 9新特點(diǎn): 響應(yīng)式流Reactive Streams
http://www.reactive-streams.org/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容