響應(yīng)式編程(Reactive programming) 是使用異步數(shù)據(jù)流(asynchronous data streams)進(jìn)行編程。
特性:
- 異步編程: 提供了合適的異步編程模型,能夠挖掘多核CPU的能力、提高效率、降低延遲和阻塞等。
- 數(shù)據(jù)流: 基于數(shù)據(jù)流模型,響應(yīng)式編程提供一套統(tǒng)一的Stream風(fēng)格的數(shù)據(jù)處理接口。和Java 8中的Stream相比,響應(yīng)式編程除了支持靜態(tài)數(shù)據(jù)流,還支持動態(tài)數(shù)據(jù)流,并且允許復(fù)用和同時接入多個訂閱者。
- 變化傳播: 簡單來說就是以一個數(shù)據(jù)流為輸入,經(jīng)過一連串操作轉(zhuǎn)化為另一個數(shù)據(jù)流,然后分發(fā)給各個訂閱者的過程。這就有點像函數(shù)式編程中的組合函數(shù),將多個函數(shù)串聯(lián)起來,把一組輸入數(shù)據(jù)轉(zhuǎn)化為格式迥異的輸出數(shù)據(jù)。
其他概念:背壓
背壓: Backpressure 這個概念源自工程概念中的 Backpressure:在管道運輸中,氣流或液流由于管道突然變細(xì)、急彎等原因?qū)е掠赡程幊霈F(xiàn)了下游向上游的逆向壓力,這種情況稱作「back pressure」。這是一個很直觀的詞:向后的、往回的壓力。
程序中解釋: 在數(shù)據(jù)流從上游生產(chǎn)者向下游消費者傳輸?shù)倪^程中,上游生產(chǎn)速度大于下游消費速度,導(dǎo)致下游的 Buffer 溢出,這種現(xiàn)象就叫做 Backpressure 出現(xiàn)。需要強(qiáng)調(diào)的是:這句話的重點不在于「上游生產(chǎn)速度大于下游消費速度」,而在于「Buffer 溢出」。
在reactor3 中的體現(xiàn):
Flux.interval(Duration.ofMillis(1)) // 每秒發(fā)送一條數(shù)據(jù)
.log() // 打印日志
//.limitRate(100) // 告訴上游我只能緩存100個 超過100個就會拋異常,下游每次消費100個后會告訴上游繼續(xù)發(fā)送數(shù)據(jù)
.onBackpressureBuffer(2000) // 設(shè)置背壓緩存策略并設(shè)置緩存大小 當(dāng)緩存超過2000 個就拋異常 這個方法會覆蓋掉 limitRate
.concatMap(x -> {
System.out.println(x);
return Mono.delay(Duration.ofMillis(100));}) // concatMap 表示將上游的數(shù)據(jù)組裝成一個flux
.blockLast(); // 無限阻塞 知道執(zhí)行完最后一條數(shù)據(jù)
上面代碼回拋出異常如下:
19:57:58.204 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2015)
18
19:57:58.219 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2016)
19:57:58.219 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2017)
19:57:58.219 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2018)
19:57:58.219 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2019)
19:57:58.222 [parallel-1] INFO reactor.Flux.Interval.1 - cancel()
Exception in thread "main" reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:221)
at reactor.core.publisher.FluxOnBackpressureBuffer$BackpressureBufferSubscriber.onNext(FluxOnBackpressureBuffer.java:170)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192)
at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:123)
at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
at reactor.core.publisher.Flux.blockLast(Flux.java:2497)
at FluxCreateTest.main(FluxCreateTest.java:17)
從打印的日志可以看出 concatMap 這個方法已經(jīng)執(zhí)行了 18條數(shù)據(jù) ,背壓緩存里緩存了2000 條數(shù)據(jù)。在2019 這里超出限制 2019-18 = 2001>2000 所以拋出異常
下面簡單介紹下 reactor3 相關(guān)方法
Flux & Mono 這兩個類
Flux 和 Mono 都是數(shù)據(jù)流的發(fā)布者,使用 Flux 和 Mono 都可以發(fā)出三種數(shù)據(jù)信號:元素值,錯誤信號,完成信號;錯誤信號和完成信號都代表終止信號,終止信號用于告訴訂閱者數(shù)據(jù)流結(jié)束了,錯誤信號終止數(shù)據(jù)流同時把錯誤信息傳遞給訂閱者。
三種信號的特點:
- 錯誤信號和完成信號都是終止信號,不能共存;
- 如果沒有發(fā)送任何元素值,而是直接發(fā)送錯誤或者完成信號,表示是空數(shù)據(jù)流;
- 如果沒有錯誤信號,也沒有完成信號,表示是無限數(shù)據(jù)流;
Flux<T> 是一個標(biāo)準(zhǔn)的 Publisher<T>,表示為發(fā)出0到N個元素的異步序列;
Mono<T> 是一個特定的 Publisher<T>,最多可以發(fā)出一個元素,可以被 onComplete 或 onError 信號選擇性終止;
Mono.create(monoSink -> {
monoSink.success(11); // 因為Mono 表示單數(shù)據(jù) 因此包含了complete() 方法
monoSink.error(new RuntimeException("這是個異常")); // 這條數(shù)據(jù)不會執(zhí)行
})
Flux.create(fluxSink -> {
fluxSink.next("value"); // 元素
fluxSink.complete(); // 完成 //個人認(rèn)為 這里相當(dāng)于 monoSink.Success() = next()+complete();
fluxSink.error( new RuntimeException()); // 異常
});
上面是數(shù)據(jù)的創(chuàng)建者下面還有寫常用的數(shù)據(jù)創(chuàng)建者
// 將list 作為數(shù)據(jù)源添加到flux
Flux.fromIterable(List.of(1,2,3))
// 將流作為數(shù)據(jù)源
Flux.fromStream(List.of(1,2,3).stream());
// 立即創(chuàng)建一個數(shù)據(jù)源 餓漢模式
Flux.just(List.of());
// 對應(yīng)的就有懶漢模式 這里每次調(diào)用就會返回一個
Flux.defer(()->Flux.just(1));
// 如果我想定時的創(chuàng)造數(shù)據(jù)流怎么辦
Flux.interval(Duration.ofMillis(1)) // 每秒發(fā)送一條數(shù)據(jù)
// 當(dāng)我的數(shù)據(jù)生產(chǎn)者不止一個怎么做
Flux.create();
Mono.
訂閱者
// 阻塞的
Mono.just(1).block();
// 非阻塞的
Mono.just(1).subscribe();
// flux 非阻塞的
Flux.just(1).subscribe();
// flux 阻塞 第一條數(shù)據(jù)
Flux.just(1).blockFirst();
// 阻塞最后一條數(shù)據(jù)
Flux.just(1).blockLast();
使用flux 實現(xiàn)文件的讀寫
Flux.fromStream(Files.lines(Paths.get("17336.txt")))
.log()
.subscribe(new BaseSubscriber<String>() {
BufferedWriter bufferedWriter = Files.newBufferedWriter(Paths.get("凡人修仙傳.txt"));
@Override
@SneakyThrows
protected void hookOnNext(String value) {
super.hookOnNext(value);
bufferedWriter.write(value+"\n");
if(value.contains("------------")){ // 每寫入一個章節(jié)就將數(shù)據(jù)寫入文件并刷新緩存
bufferedWriter.flush();
}
}
@Override
@SneakyThrows
protected void hookOnComplete() {
super.hookOnComplete();
bufferedWriter.close();
}
});
冷數(shù)據(jù)發(fā)布者VS熱數(shù)據(jù)發(fā)布者
首先來看定義:
冷數(shù)據(jù)發(fā)布者: 在向訂閱者發(fā)布數(shù)據(jù)的時候都會從起始位置開始,如果沒有訂閱者就不會做任何事情。
熱數(shù)據(jù)發(fā)布者:冷數(shù)據(jù)和熱數(shù)據(jù)相反,即當(dāng)一個新的訂閱者來訂閱流的時候會在最新的位置開始發(fā)送數(shù)據(jù)。
熱數(shù)據(jù)發(fā)布示例
Flux coldFlux = Flux.interval(Duration.ofSeconds(2)).log();
ConnectableFlux hotFlux = coldFlux.publish();
hotFlux.subscribe((s)->{
System.out.println(s);
});
hotFlux.connect();
Thread.sleep(6000);
hotFlux.subscribe(s->{
System.out.println(s);
});
Thread.sleep(1000000);
執(zhí)行結(jié)果
16:53:10.474 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
16:53:10.495 [main] INFO reactor.Flux.Interval.1 - onSubscribe(FluxInterval.IntervalRunnable)
16:53:10.498 [main] INFO reactor.Flux.Interval.1 - request(256)
16:53:12.509 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(0)
熱數(shù)據(jù)第一個訂閱者:0
16:53:12.519 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:14.504 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(1)
熱數(shù)據(jù)第一個訂閱者:1
16:53:14.504 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:16.500 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2)
熱數(shù)據(jù)第一個訂閱者:2
16:53:16.500 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:18.509 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(3)
熱數(shù)據(jù)第一個訂閱者:3
16:53:18.509 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:20.508 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(4)
熱數(shù)據(jù)第一個訂閱者:4
熱數(shù)據(jù)第二個發(fā)布者:4
16:53:20.509 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:22.508 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(5)
熱數(shù)據(jù)第一個訂閱者:5
熱數(shù)據(jù)第二個發(fā)布者:5
冷數(shù)據(jù)發(fā)布示例
Flux coldFlux = Flux.interval(Duration.ofSeconds(2)).log();
coldFlux.subscribe((s)->{
System.out.println("第一個訂閱者:"+s);
});
Thread.sleep(6000);
coldFlux.subscribe(s->{
System.out.println("第二個訂閱者:"+s);
});
Thread.sleep(1000000);
執(zhí)行結(jié)果
16:36:52.788 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
16:36:52.806 [main] INFO reactor.Flux.Interval.1 - onSubscribe(FluxInterval.IntervalRunnable)
16:36:52.807 [main] INFO reactor.Flux.Interval.1 - request(unbounded)
16:36:54.813 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(0)
第一個訂閱者:0
16:36:56.813 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(1)
第一個訂閱者:1
16:36:58.812 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2)
第一個訂閱者:2
16:36:58.813 [main] INFO reactor.Flux.Interval.1 - onSubscribe(FluxInterval.IntervalRunnable)
16:36:58.813 [main] INFO reactor.Flux.Interval.1 - request(unbounded)
16:37:00.819 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(3)
16:37:00.819 [parallel-2] INFO reactor.Flux.Interval.1 - onNext(0)
第一個訂閱者:3
第二個訂閱者:0
16:37:02.811 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(4)
第一個訂閱者:4
16:37:02.826 [parallel-2] INFO reactor.Flux.Interval.1 - onNext(1)
第二個訂閱者:1
16:37:04.812 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(5)
第一個訂閱者:5
16:37:04.827 [parallel-2] INFO reactor.Flux.Interval.1 - onNext(2)
第二個訂閱者:2
異步與并行
講到這里目前看到當(dāng)前調(diào)用的方法基本上都是同步的,除了Flux.Interval()。但是interavl 這個方法局限性太大,那么有沒有讓整個流異步的方法呢?
先看代碼:
List<Integer> facebookAccountList = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 161, 17, 18, 19, 20, 21, 22);
Flux.fromStream(facebookAccountList.stream())
.parallel(2)
.runOn(Schedulers.parallel()) // 執(zhí)行
.log()
.subscribe(System.out::println);
//Thread.sleep(1_000_000);
執(zhí)行結(jié)果
17:32:52.962 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
17:32:52.990 [main] INFO reactor.Parallel.RunOn.1 - onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
17:32:52.991 [main] INFO reactor.Parallel.RunOn.1 - request(unbounded)
17:32:52.993 [main] INFO reactor.Parallel.RunOn.1 - onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
17:32:52.993 [main] INFO reactor.Parallel.RunOn.1 - request(unbounded)
Process finished with exit code 0
看到執(zhí)行結(jié)果可以判斷這段代碼是異步非阻塞的,相關(guān)的日志并沒有被打印出來
下面將線程休眠注釋關(guān)掉 執(zhí)行結(jié)果
List<Integer> facebookAccountList = List.of(1, 2, 3, 4, 5, 6, 7, 8);
Flux.fromStream(facebookAccountList.stream())
.parallel(8)
.runOn(Schedulers.parallel())
.log()
.subscribe(System.out::println);
Thread.sleep(1_000_000);
執(zhí)行結(jié)果:
16:36:22.105 [parallel-1] INFO reactor.Parallel.RunOn.1 - onNext(1)
1
16:36:22.105 [parallel-2] INFO reactor.Parallel.RunOn.1 - onNext(2)
2
16:36:22.105 [parallel-3] INFO reactor.Parallel.RunOn.1 - onNext(3)
3
16:36:22.105 [parallel-4] INFO reactor.Parallel.RunOn.1 - onNext(4)
4
16:36:22.105 [parallel-5] INFO reactor.Parallel.RunOn.1 - onNext(5)
5
16:36:22.105 [parallel-7] INFO reactor.Parallel.RunOn.1 - onNext(7)
7
16:36:22.105 [parallel-6] INFO reactor.Parallel.RunOn.1 - onNext(6)
6
16:36:22.105 [parallel-8] INFO reactor.Parallel.RunOn.1 - onNext(8)
8
16:36:22.105 [parallel-6] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-5] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-8] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-1] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-7] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-2] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-4] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-3] INFO reactor.Parallel.RunOn.1 - onComplete()
上述代碼設(shè)計到兩個方法 parallel(),runOn() ;其中parallel() 這個方法 我在看國內(nèi)的文章里說這個是開啟異步的方法,但是我在看國外的文章與相關(guān)的文檔的時候并沒有說他是開啟異步的方法。而是開啟并行的方法。那這里就有歧義,那這里就需要驗證下;看如下代碼:
Flux.range(1,10).parallel(3).log().subscribe(System.out::println);
執(zhí)行結(jié)果:
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onSubscribe(ParallelSource.ParallelSourceMain.ParallelSourceInner)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - request(unbounded)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onSubscribe(ParallelSource.ParallelSourceMain.ParallelSourceInner)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - request(unbounded)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onSubscribe(ParallelSource.ParallelSourceMain.ParallelSourceInner)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - request(unbounded)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(1)
1
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(2)
2
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(3)
3
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(4)
4
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(5)
5
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(6)
6
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(7)
7
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(8)
8
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(9)
9
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(10)
10
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onComplete()
10:04:16.789 [main] INFO reactor.Parallel.Source.1 - onComplete()
10:04:16.789 [main] INFO reactor.Parallel.Source.1 - onComplete()
執(zhí)行結(jié)果發(fā)現(xiàn)并不是異步的,那這個方法到底是干什么的?
其實文檔里說的很明白了,就是并行,他的作用就是拓寬通道,本來只有一個通道的,在我使用parallel(3)方法的時候通道變?yōu)? 個, 之后訂閱者訂閱這三個通道。

publishOn VS subscribeOn 后續(xù)有機(jī)會再說
相關(guān)文檔:
背壓解釋
官方文檔
publishOn 和 subscribeOn