響應(yīng)式框架reactor3的 使用其一

響應(yīng)式編程(Reactive programming) 是使用異步數(shù)據(jù)流(asynchronous data streams)進(jìn)行編程。
特性:

  • 異步編程: 提供了合適的異步編程模型,能夠挖掘多核CPU的能力、提高效率、降低延遲和阻塞等。
  • 數(shù)據(jù)流: 基于數(shù)據(jù)流模型,響應(yīng)式編程提供一套統(tǒng)一的Stream風(fēng)格的數(shù)據(jù)處理接口。和Java 8中的Stream相比,響應(yīng)式編程除了支持靜態(tài)數(shù)據(jù)流,還支持動態(tài)數(shù)據(jù)流,并且允許復(fù)用和同時接入多個訂閱者。
  • 變化傳播: 簡單來說就是以一個數(shù)據(jù)流為輸入,經(jīng)過一連串操作轉(zhuǎn)化為另一個數(shù)據(jù)流,然后分發(fā)給各個訂閱者的過程。這就有點像函數(shù)式編程中的組合函數(shù),將多個函數(shù)串聯(lián)起來,把一組輸入數(shù)據(jù)轉(zhuǎn)化為格式迥異的輸出數(shù)據(jù)。
    其他概念:背壓
    背壓: Backpressure 這個概念源自工程概念中的 Backpressure:在管道運輸中,氣流或液流由于管道突然變細(xì)、急彎等原因?qū)е掠赡程幊霈F(xiàn)了下游向上游的逆向壓力,這種情況稱作「back pressure」。這是一個很直觀的詞:向后的、往回的壓力。
    程序中解釋: 在數(shù)據(jù)流從上游生產(chǎn)者向下游消費者傳輸?shù)倪^程中,上游生產(chǎn)速度大于下游消費速度,導(dǎo)致下游的 Buffer 溢出,這種現(xiàn)象就叫做 Backpressure 出現(xiàn)。需要強(qiáng)調(diào)的是:這句話的重點不在于「上游生產(chǎn)速度大于下游消費速度」,而在于「Buffer 溢出」。
    在reactor3 中的體現(xiàn):
        Flux.interval(Duration.ofMillis(1)) // 每秒發(fā)送一條數(shù)據(jù)
                .log() // 打印日志
                //.limitRate(100) // 告訴上游我只能緩存100個 超過100個就會拋異常,下游每次消費100個后會告訴上游繼續(xù)發(fā)送數(shù)據(jù)
                .onBackpressureBuffer(2000) // 設(shè)置背壓緩存策略并設(shè)置緩存大小 當(dāng)緩存超過2000 個就拋異常 這個方法會覆蓋掉 limitRate
                .concatMap(x -> {
                    System.out.println(x);
                 return   Mono.delay(Duration.ofMillis(100));}) // concatMap 表示將上游的數(shù)據(jù)組裝成一個flux
                .blockLast(); // 無限阻塞 知道執(zhí)行完最后一條數(shù)據(jù)

上面代碼回拋出異常如下:

19:57:58.204 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2015)
18
19:57:58.219 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2016)
19:57:58.219 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2017)
19:57:58.219 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2018)
19:57:58.219 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2019)
19:57:58.222 [parallel-1] INFO reactor.Flux.Interval.1 - cancel()
Exception in thread "main" reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:221)
    at reactor.core.publisher.FluxOnBackpressureBuffer$BackpressureBufferSubscriber.onNext(FluxOnBackpressureBuffer.java:170)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192)
    at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:123)
    at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
    at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
        at reactor.core.publisher.Flux.blockLast(Flux.java:2497)
        at FluxCreateTest.main(FluxCreateTest.java:17)

從打印的日志可以看出 concatMap 這個方法已經(jīng)執(zhí)行了 18條數(shù)據(jù) ,背壓緩存里緩存了2000 條數(shù)據(jù)。在2019 這里超出限制 2019-18 = 2001>2000 所以拋出異常

下面簡單介紹下 reactor3 相關(guān)方法
Flux & Mono 這兩個類

Flux 和 Mono 都是數(shù)據(jù)流的發(fā)布者,使用 Flux 和 Mono 都可以發(fā)出三種數(shù)據(jù)信號:元素值,錯誤信號,完成信號;錯誤信號和完成信號都代表終止信號,終止信號用于告訴訂閱者數(shù)據(jù)流結(jié)束了,錯誤信號終止數(shù)據(jù)流同時把錯誤信息傳遞給訂閱者。

三種信號的特點:
  • 錯誤信號和完成信號都是終止信號,不能共存;
  • 如果沒有發(fā)送任何元素值,而是直接發(fā)送錯誤或者完成信號,表示是空數(shù)據(jù)流;
  • 如果沒有錯誤信號,也沒有完成信號,表示是無限數(shù)據(jù)流;

Flux<T> 是一個標(biāo)準(zhǔn)的 Publisher<T>,表示為發(fā)出0到N個元素的異步序列;
Mono<T> 是一個特定的 Publisher<T>,最多可以發(fā)出一個元素,可以被 onComplete 或 onError 信號選擇性終止;

        Mono.create(monoSink -> {
            monoSink.success(11); // 因為Mono 表示單數(shù)據(jù) 因此包含了complete() 方法
           monoSink.error(new RuntimeException("這是個異常")); // 這條數(shù)據(jù)不會執(zhí)行
        })
        Flux.create(fluxSink -> {
           fluxSink.next("value"); // 元素
           fluxSink.complete();  // 完成         //個人認(rèn)為 這里相當(dāng)于 monoSink.Success() = next()+complete(); 
           fluxSink.error( new RuntimeException()); // 異常
        });

上面是數(shù)據(jù)的創(chuàng)建者下面還有寫常用的數(shù)據(jù)創(chuàng)建者

// 將list 作為數(shù)據(jù)源添加到flux
Flux.fromIterable(List.of(1,2,3))
// 將流作為數(shù)據(jù)源
Flux.fromStream(List.of(1,2,3).stream());
// 立即創(chuàng)建一個數(shù)據(jù)源 餓漢模式
Flux.just(List.of());
// 對應(yīng)的就有懶漢模式 這里每次調(diào)用就會返回一個
Flux.defer(()->Flux.just(1));

// 如果我想定時的創(chuàng)造數(shù)據(jù)流怎么辦
Flux.interval(Duration.ofMillis(1)) // 每秒發(fā)送一條數(shù)據(jù)
// 當(dāng)我的數(shù)據(jù)生產(chǎn)者不止一個怎么做
Flux.create();

Mono.

訂閱者

// 阻塞的
Mono.just(1).block();
// 非阻塞的
Mono.just(1).subscribe();
// flux 非阻塞的
Flux.just(1).subscribe();
// flux 阻塞 第一條數(shù)據(jù)
Flux.just(1).blockFirst();
// 阻塞最后一條數(shù)據(jù)
Flux.just(1).blockLast();



使用flux 實現(xiàn)文件的讀寫

        Flux.fromStream(Files.lines(Paths.get("17336.txt")))
                .log()
                .subscribe(new BaseSubscriber<String>() {
                    BufferedWriter bufferedWriter = Files.newBufferedWriter(Paths.get("凡人修仙傳.txt"));
                    @Override
                    @SneakyThrows
                    protected void hookOnNext(String value) {
                        super.hookOnNext(value);
                        bufferedWriter.write(value+"\n");
                        if(value.contains("------------")){ // 每寫入一個章節(jié)就將數(shù)據(jù)寫入文件并刷新緩存
                            bufferedWriter.flush();
                        }
                    }

                    @Override
                    @SneakyThrows
                    protected void hookOnComplete() {
                        super.hookOnComplete();
                        bufferedWriter.close();
                    }
                });

冷數(shù)據(jù)發(fā)布者VS熱數(shù)據(jù)發(fā)布者

首先來看定義:
冷數(shù)據(jù)發(fā)布者: 在向訂閱者發(fā)布數(shù)據(jù)的時候都會從起始位置開始,如果沒有訂閱者就不會做任何事情。
熱數(shù)據(jù)發(fā)布者:冷數(shù)據(jù)和熱數(shù)據(jù)相反,即當(dāng)一個新的訂閱者來訂閱流的時候會在最新的位置開始發(fā)送數(shù)據(jù)。

熱數(shù)據(jù)發(fā)布示例
        Flux coldFlux = Flux.interval(Duration.ofSeconds(2)).log();
        ConnectableFlux hotFlux = coldFlux.publish();
        hotFlux.subscribe((s)->{
            System.out.println(s);
        });
        hotFlux.connect();
        Thread.sleep(6000);
        hotFlux.subscribe(s->{
            System.out.println(s);
        });
        Thread.sleep(1000000);

執(zhí)行結(jié)果

16:53:10.474 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
16:53:10.495 [main] INFO reactor.Flux.Interval.1 - onSubscribe(FluxInterval.IntervalRunnable)
16:53:10.498 [main] INFO reactor.Flux.Interval.1 - request(256)
16:53:12.509 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(0)
熱數(shù)據(jù)第一個訂閱者:0
16:53:12.519 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:14.504 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(1)
熱數(shù)據(jù)第一個訂閱者:1
16:53:14.504 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:16.500 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2)
熱數(shù)據(jù)第一個訂閱者:2
16:53:16.500 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:18.509 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(3)
熱數(shù)據(jù)第一個訂閱者:3
16:53:18.509 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:20.508 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(4)
熱數(shù)據(jù)第一個訂閱者:4
熱數(shù)據(jù)第二個發(fā)布者:4
16:53:20.509 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:22.508 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(5)
熱數(shù)據(jù)第一個訂閱者:5
熱數(shù)據(jù)第二個發(fā)布者:5
冷數(shù)據(jù)發(fā)布示例
        Flux coldFlux = Flux.interval(Duration.ofSeconds(2)).log();
        coldFlux.subscribe((s)->{
            System.out.println("第一個訂閱者:"+s);
        });
        Thread.sleep(6000);
        coldFlux.subscribe(s->{
            System.out.println("第二個訂閱者:"+s);
        });
        Thread.sleep(1000000);

執(zhí)行結(jié)果

16:36:52.788 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
16:36:52.806 [main] INFO reactor.Flux.Interval.1 - onSubscribe(FluxInterval.IntervalRunnable)
16:36:52.807 [main] INFO reactor.Flux.Interval.1 - request(unbounded)
16:36:54.813 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(0)
第一個訂閱者:0
16:36:56.813 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(1)
第一個訂閱者:1
16:36:58.812 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2)
第一個訂閱者:2
16:36:58.813 [main] INFO reactor.Flux.Interval.1 - onSubscribe(FluxInterval.IntervalRunnable)
16:36:58.813 [main] INFO reactor.Flux.Interval.1 - request(unbounded)
16:37:00.819 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(3)
16:37:00.819 [parallel-2] INFO reactor.Flux.Interval.1 - onNext(0)
第一個訂閱者:3
第二個訂閱者:0
16:37:02.811 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(4)
第一個訂閱者:4
16:37:02.826 [parallel-2] INFO reactor.Flux.Interval.1 - onNext(1)
第二個訂閱者:1
16:37:04.812 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(5)
第一個訂閱者:5
16:37:04.827 [parallel-2] INFO reactor.Flux.Interval.1 - onNext(2)
第二個訂閱者:2

異步與并行

講到這里目前看到當(dāng)前調(diào)用的方法基本上都是同步的,除了Flux.Interval()。但是interavl 這個方法局限性太大,那么有沒有讓整個流異步的方法呢?
先看代碼:

        List<Integer> facebookAccountList = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 161, 17, 18, 19, 20, 21, 22);
        Flux.fromStream(facebookAccountList.stream())
                .parallel(2)
                .runOn(Schedulers.parallel()) // 執(zhí)行
                .log()
                .subscribe(System.out::println);
        //Thread.sleep(1_000_000);

執(zhí)行結(jié)果

17:32:52.962 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
17:32:52.990 [main] INFO reactor.Parallel.RunOn.1 - onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
17:32:52.991 [main] INFO reactor.Parallel.RunOn.1 - request(unbounded)
17:32:52.993 [main] INFO reactor.Parallel.RunOn.1 - onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
17:32:52.993 [main] INFO reactor.Parallel.RunOn.1 - request(unbounded)

Process finished with exit code 0

看到執(zhí)行結(jié)果可以判斷這段代碼是異步非阻塞的,相關(guān)的日志并沒有被打印出來
下面將線程休眠注釋關(guān)掉 執(zhí)行結(jié)果

        List<Integer> facebookAccountList = List.of(1, 2, 3, 4, 5, 6, 7, 8);
        Flux.fromStream(facebookAccountList.stream())
                .parallel(8)
                .runOn(Schedulers.parallel())
                .log()
                .subscribe(System.out::println);
        Thread.sleep(1_000_000);

執(zhí)行結(jié)果:

16:36:22.105 [parallel-1] INFO reactor.Parallel.RunOn.1 - onNext(1)
1
16:36:22.105 [parallel-2] INFO reactor.Parallel.RunOn.1 - onNext(2)
2
16:36:22.105 [parallel-3] INFO reactor.Parallel.RunOn.1 - onNext(3)
3
16:36:22.105 [parallel-4] INFO reactor.Parallel.RunOn.1 - onNext(4)
4
16:36:22.105 [parallel-5] INFO reactor.Parallel.RunOn.1 - onNext(5)
5
16:36:22.105 [parallel-7] INFO reactor.Parallel.RunOn.1 - onNext(7)
7
16:36:22.105 [parallel-6] INFO reactor.Parallel.RunOn.1 - onNext(6)
6
16:36:22.105 [parallel-8] INFO reactor.Parallel.RunOn.1 - onNext(8)
8
16:36:22.105 [parallel-6] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-5] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-8] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-1] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-7] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-2] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-4] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-3] INFO reactor.Parallel.RunOn.1 - onComplete()

上述代碼設(shè)計到兩個方法 parallel(),runOn() ;其中parallel() 這個方法 我在看國內(nèi)的文章里說這個是開啟異步的方法,但是我在看國外的文章與相關(guān)的文檔的時候并沒有說他是開啟異步的方法。而是開啟并行的方法。那這里就有歧義,那這里就需要驗證下;看如下代碼:

        Flux.range(1,10).parallel(3).log().subscribe(System.out::println);

執(zhí)行結(jié)果:

10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onSubscribe(ParallelSource.ParallelSourceMain.ParallelSourceInner)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - request(unbounded)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onSubscribe(ParallelSource.ParallelSourceMain.ParallelSourceInner)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - request(unbounded)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onSubscribe(ParallelSource.ParallelSourceMain.ParallelSourceInner)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - request(unbounded)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(1)
1
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(2)
2
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(3)
3
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(4)
4
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(5)
5
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(6)
6
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(7)
7
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(8)
8
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(9)
9
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(10)
10
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onComplete()
10:04:16.789 [main] INFO reactor.Parallel.Source.1 - onComplete()
10:04:16.789 [main] INFO reactor.Parallel.Source.1 - onComplete()

執(zhí)行結(jié)果發(fā)現(xiàn)并不是異步的,那這個方法到底是干什么的?
其實文檔里說的很明白了,就是并行,他的作用就是拓寬通道,本來只有一個通道的,在我使用parallel(3)方法的時候通道變?yōu)? 個, 之后訂閱者訂閱這三個通道。


image.png

publishOn VS subscribeOn 后續(xù)有機(jī)會再說

相關(guān)文檔:
背壓解釋
官方文檔
publishOn 和 subscribeOn

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容