響應(yīng)式編程
什么是響應(yīng)式編程
響應(yīng)式編程(reactive programming)是一種基于數(shù)據(jù)流(data stream)和變化傳遞(propagation of change)的聲明式(declarative)的編程范式。
響應(yīng)式編程的好處
響應(yīng)式編程是異步非阻塞的,能夠有效的利用服務(wù)器資源,提高性能。它提升的并不是單個API的響應(yīng)時間,而是提升整體服務(wù)并發(fā)處理量,默認(rèn)Tomcat有200個線程同時只能處理200個請求,而基于數(shù)據(jù)流和事件循環(huán)的響應(yīng)式框架能更好的利用CPU和內(nèi)存資源。

Spring Reactor
設(shè)計原理
觀察者模式的延伸,Push模型
核心接口
Publisher
public interface Publisher<T> {
// 傳入訂閱者
public void subscribe(Subscriber<? super T> s);
}
Subscriber
public interface Subscriber<T> {
// 注冊完成后,首先被調(diào)用
public void onSubscribe(Subscription s);
// 執(zhí)行消費函數(shù)
public void onNext(T t);
// Error時調(diào)用
public void onError(Throwable t);
// 完成訂閱后執(zhí)行
public void onComplete();
}
Subscription
public interface Subscription {
// 請求數(shù)據(jù),參數(shù)n為請求的數(shù)據(jù)量,不是超時時間
public void request(long n);
// 取消訂閱
public void cancel();
}
Processor
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
BackPressure
通過Subscription接口實現(xiàn)BackPressure處理,調(diào)用request方法調(diào)整上游傳遞的數(shù)據(jù)量,默認(rèn)是Long.MAX_VALUE。

核心接口關(guān)系
- Publisher調(diào)用subscribe方法,接受Subscriber對象參數(shù)。
- 在subscribe方法中,Publisher調(diào)用Subscriber對象的onSubscribe方法,傳入Subscription對象。
- 在Subscription由Subscribe和Publisher中的數(shù)據(jù)組成(以ArraySubscription實現(xiàn)類為例)。
- 在Subscription中根據(jù)request的值對Publisher中的數(shù)據(jù)進行循環(huán)調(diào)用Subscribe的onNext方法。
@Test
void fluxTest() {
Flux<Integer> range = Flux.range(1, 5)
.log();
range.subscribe();
}
17:49:40.157 [Test worker] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
17:49:40.159 [Test worker] INFO reactor.Flux.Range.1 - | request(unbounded)
17:49:40.160 [Test worker] INFO reactor.Flux.Range.1 - | onNext(1)
17:49:40.160 [Test worker] INFO reactor.Flux.Range.1 - | onNext(2)
17:49:40.160 [Test worker] INFO reactor.Flux.Range.1 - | onNext(3)
17:49:40.160 [Test worker] INFO reactor.Flux.Range.1 - | onNext(4)
17:49:40.160 [Test worker] INFO reactor.Flux.Range.1 - | onNext(5)
17:49:40.160 [Test worker] INFO reactor.Flux.Range.1 - | onComplete()
@Test
void fluxBaseSubscriber() {
Flux<Integer> ints = Flux.range(1, 4)
.log();
ints.subscribe(new BaseSubscriber<>() {
int count = 0;
public void hookOnSubscribe(Subscription subscription) {
System.out.println("Subscribed");
request(2);
}
public void hookOnNext(Integer value) {
System.out.println(value);
if (++count == 2) {
request(2);
}
}
});
}
17:51:50.405 [Test worker] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
Subscribed
17:51:50.408 [Test worker] INFO reactor.Flux.Range.1 - | request(2)
17:51:50.408 [Test worker] INFO reactor.Flux.Range.1 - | onNext(1)
1
17:51:50.408 [Test worker] INFO reactor.Flux.Range.1 - | onNext(2)
2
17:51:50.408 [Test worker] INFO reactor.Flux.Range.1 - | request(2)
17:51:50.409 [Test worker] INFO reactor.Flux.Range.1 - | onNext(3)
3
17:51:50.409 [Test worker] INFO reactor.Flux.Range.1 - | onNext(4)
4
17:51:50.409 [Test worker] INFO reactor.Flux.Range.1 - | onComplete()
Hot and Cold
Cold
訂閱前什么都不會發(fā)生,發(fā)布者進行了兩次訂閱,每次訂閱都導(dǎo)致它把數(shù)據(jù)流從新發(fā)一遍
@Test
public void testColdPublisher() {
Flux<String> source = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
.map(String::toUpperCase);
source.subscribe(d -> System.out.println("Subscriber 1: "+d));
System.out.println();
source.subscribe(d -> System.out.println("Subscriber 2: "+d));
}
Hot
手動觸發(fā)數(shù)據(jù)流
@Test
public void testConnectableFlux() throws InterruptedException {
Flux<Integer> source = Flux.range(1, 3)
.doOnSubscribe(s -> System.out.println("上游收到訂閱"));
ConnectableFlux<Integer> co = source.publish();
co.subscribe(System.out::println, e -> {}, () -> {});
co.subscribe(System.out::println, e -> {}, () -> {});
System.out.println("訂閱者完成訂閱操作");
Thread.sleep(500);
System.out.println("還沒有連接上");
co.connect();
}
調(diào)度器和線程模型
前面介紹了響應(yīng)式流和在其上可以進行的各種操作,Scheduler可以指定這些操作執(zhí)行的方式和所在的線程。
調(diào)度器
- publishOn調(diào)整之后的操作的運行線程
- subscribeOn設(shè)置數(shù)據(jù)源的操作的運行線程
@Test
void testSubscribeOn() {
Flux<Integer> fluxMap = Flux.range(1, 4)
.map(integer -> {
System.out.println("Map1 number : " + integer +
" Thread is : " + Thread.currentThread().getName());
return integer;
})
.subscribeOn(Schedulers.single())
.map(integer -> {
System.out.println("Map2 number : " + integer +
" Thread is : " + Thread.currentThread().getName());
return integer;
});
StepVerifier.create(fluxMap)
.expectNext(1, 2, 3, 4)
.verifyComplete();
}
@Test
void testPublishOn() {
Flux<Integer> fluxMap = Flux.range(1, 4)
.map(integer -> {
System.out.println("Map1 number : " + integer +
" Thread is : " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer;
})
.publishOn(Schedulers.single())
.map(integer -> {
System.out.println("Map2 number : " + integer +
" Thread is : " + Thread.currentThread().getName());
return integer;
});
StepVerifier.create(fluxMap)
.expectNext(1, 2, 3, 4)
.verifyComplete();
}
線程模型
- Schedulers.immediate() 當(dāng)前線程
- Schedulers.single() 單一的可復(fù)用的線程
- Schedulers.elastic() 彈性大小緩存線程池,線程池中的線程是可以復(fù)用的。當(dāng)所需要時,新的線程會被創(chuàng)建。如果一個線程閑置太長時間,則會被銷毀。該調(diào)度器適用于 I/O 操作相關(guān)的流的處理。
- Schedulers.parallel() 并行操作優(yōu)化的線程池,通過 Schedulers.parallel()方法來創(chuàng)建。其中的線程數(shù)量取決于 CPU 的核的數(shù)量。該調(diào)度器適用于計算密集型的流的處理。
@Test
// 執(zhí)行時間大概8秒,創(chuàng)建新線程
void testNewSingle() {
System.out.println(LocalDateTime.now());
Flux.just("tom", "jack", "allen")
.publishOn(Schedulers.newSingle("1")).map(this::doSomething)
.publishOn(Schedulers.newSingle("2")).map(this::doSomething)
.publishOn(Schedulers.newSingle("3")).map(this::doSomething)
.publishOn(Schedulers.newSingle("4")).map(this::doSomething)
.publishOn(Schedulers.newSingle("5")).map(this::doSomething)
.subscribeOn(Schedulers.newSingle("0")).blockLast();
System.out.println(LocalDateTime.now());
}
@Test
// 執(zhí)行時間15秒,使用相同線程
void testSingle() {
System.out.println(LocalDateTime.now());
Flux.just("tom", "jack", "allen")
.publishOn(Schedulers.single()).map(this::doSomething)
.publishOn(Schedulers.single()).map(this::doSomething)
.publishOn(Schedulers.single()).map(this::doSomething)
.publishOn(Schedulers.single()).map(this::doSomething)
.publishOn(Schedulers.single()).map(this::doSomething)
.subscribeOn(Schedulers.single()).blockLast();
System.out.println(LocalDateTime.now());
}
private String doSomething(String s) {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s;
}
Spring WebFlux
以Reactor框架為基礎(chǔ)的,響應(yīng)式Web框架
傳統(tǒng)的Spring MVC基于Servlet,是阻塞式的,每次請求由一個線程處理;而Spring WebFlux通過事件循環(huán)Event Loop的方式,由單個線程非阻塞的處理事件。當(dāng)需要處理耗時任務(wù)時,Event Loop綁定的線程會新啟線程來執(zhí)行,完成后通知Event Loop。
