Spring-Reactor 學(xué)習(xí)

響應(yīng)式編程

什么是響應(yīng)式編程

響應(yīng)式編程(reactive programming)是一種基于數(shù)據(jù)流(data stream)和變化傳遞(propagation of change)的聲明式(declarative)的編程范式。

響應(yīng)式編程的好處

響應(yīng)式編程是異步非阻塞的,能夠有效的利用服務(wù)器資源,提高性能。它提升的并不是單個API的響應(yīng)時間,而是提升整體服務(wù)并發(fā)處理量,默認(rèn)Tomcat200個線程同時只能處理200個請求,而基于數(shù)據(jù)流和事件循環(huán)的響應(yīng)式框架能更好的利用CPU和內(nèi)存資源。

Screen Shot 2020-10-22 at 5.35.25 PM.png

Spring Reactor

設(shè)計原理

觀察者模式的延伸,Push模型

核心接口

Publisher
public interface Publisher<T> {

    // 傳入訂閱者
    public void subscribe(Subscriber<? super T> s);
}
Subscriber
public interface Subscriber<T> {
    
    // 注冊完成后,首先被調(diào)用
    public void onSubscribe(Subscription s);
    // 執(zhí)行消費函數(shù)
    public void onNext(T t);
    // Error時調(diào)用
    public void onError(Throwable t);
    // 完成訂閱后執(zhí)行
    public void onComplete();
}
Subscription
public interface Subscription {
    
    // 請求數(shù)據(jù),參數(shù)n為請求的數(shù)據(jù)量,不是超時時間
    public void request(long n);
    // 取消訂閱
    public void cancel();
}
Processor
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {

}

BackPressure

通過Subscription接口實現(xiàn)BackPressure處理,調(diào)用request方法調(diào)整上游傳遞的數(shù)據(jù)量,默認(rèn)是Long.MAX_VALUE。

Screen Shot 2020-10-22 at 5.47.19 PM.png

核心接口關(guān)系

  • Publisher調(diào)用subscribe方法,接受Subscriber對象參數(shù)。
  • subscribe方法中,Publisher調(diào)用Subscriber對象的onSubscribe方法,傳入Subscription對象。
  • SubscriptionSubscribePublisher中的數(shù)據(jù)組成(以ArraySubscription實現(xiàn)類為例)。
  • Subscription中根據(jù)request的值對Publisher中的數(shù)據(jù)進行循環(huán)調(diào)用SubscribeonNext方法。
    @Test
    void fluxTest() {
        Flux<Integer> range = Flux.range(1, 5)
                .log();

        range.subscribe();
    }
17:49:40.157 [Test worker] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
17:49:40.159 [Test worker] INFO reactor.Flux.Range.1 - | request(unbounded)
17:49:40.160 [Test worker] INFO reactor.Flux.Range.1 - | onNext(1)
17:49:40.160 [Test worker] INFO reactor.Flux.Range.1 - | onNext(2)
17:49:40.160 [Test worker] INFO reactor.Flux.Range.1 - | onNext(3)
17:49:40.160 [Test worker] INFO reactor.Flux.Range.1 - | onNext(4)
17:49:40.160 [Test worker] INFO reactor.Flux.Range.1 - | onNext(5)
17:49:40.160 [Test worker] INFO reactor.Flux.Range.1 - | onComplete()

    @Test
    void fluxBaseSubscriber() {
        Flux<Integer> ints = Flux.range(1, 4)
                .log();
        ints.subscribe(new BaseSubscriber<>() {
            int count = 0;

            public void hookOnSubscribe(Subscription subscription) {
                System.out.println("Subscribed");
                request(2);
            }

            public void hookOnNext(Integer value) {
                System.out.println(value);
                if (++count == 2) {
                    request(2);
                }
            }
        });
    }
17:51:50.405 [Test worker] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
Subscribed
17:51:50.408 [Test worker] INFO reactor.Flux.Range.1 - | request(2)
17:51:50.408 [Test worker] INFO reactor.Flux.Range.1 - | onNext(1)
1
17:51:50.408 [Test worker] INFO reactor.Flux.Range.1 - | onNext(2)
2
17:51:50.408 [Test worker] INFO reactor.Flux.Range.1 - | request(2)
17:51:50.409 [Test worker] INFO reactor.Flux.Range.1 - | onNext(3)
3
17:51:50.409 [Test worker] INFO reactor.Flux.Range.1 - | onNext(4)
4
17:51:50.409 [Test worker] INFO reactor.Flux.Range.1 - | onComplete()

Hot and Cold

Cold

訂閱前什么都不會發(fā)生,發(fā)布者進行了兩次訂閱,每次訂閱都導(dǎo)致它把數(shù)據(jù)流從新發(fā)一遍

    @Test
    public void testColdPublisher() {
        Flux<String> source = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
                .map(String::toUpperCase);

        source.subscribe(d -> System.out.println("Subscriber 1: "+d));
        System.out.println();
        source.subscribe(d -> System.out.println("Subscriber 2: "+d));
    }
Hot

手動觸發(fā)數(shù)據(jù)流

    @Test
    public void testConnectableFlux() throws InterruptedException {
        Flux<Integer> source = Flux.range(1, 3)
                .doOnSubscribe(s -> System.out.println("上游收到訂閱"));

        ConnectableFlux<Integer> co = source.publish();

        co.subscribe(System.out::println, e -> {}, () -> {});
        co.subscribe(System.out::println, e -> {}, () -> {});

        System.out.println("訂閱者完成訂閱操作");
        Thread.sleep(500);
        System.out.println("還沒有連接上");

        co.connect();
    }

調(diào)度器和線程模型

前面介紹了響應(yīng)式流和在其上可以進行的各種操作,Scheduler可以指定這些操作執(zhí)行的方式和所在的線程。

調(diào)度器
  • publishOn調(diào)整之后的操作的運行線程
  • subscribeOn設(shè)置數(shù)據(jù)源的操作的運行線程
    @Test
    void testSubscribeOn() {
        Flux<Integer> fluxMap = Flux.range(1, 4)
                .map(integer -> {
                    System.out.println("Map1 number : " + integer +
                            " Thread is : " + Thread.currentThread().getName());
                    return integer;
                })
                .subscribeOn(Schedulers.single())
                .map(integer -> {
                    System.out.println("Map2 number : " + integer +
                            " Thread is : " + Thread.currentThread().getName());
                    return integer;
                });

        StepVerifier.create(fluxMap)
                .expectNext(1, 2, 3, 4)
                .verifyComplete();
    }

    @Test
    void testPublishOn() {
        Flux<Integer> fluxMap = Flux.range(1, 4)
                .map(integer -> {
                    System.out.println("Map1 number : " + integer +
                            " Thread is : " + Thread.currentThread().getName());
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return integer;
                })
                .publishOn(Schedulers.single())
                .map(integer -> {
                    System.out.println("Map2 number : " + integer +
                            " Thread is : " + Thread.currentThread().getName());
                    return integer;
                });

        StepVerifier.create(fluxMap)
                .expectNext(1, 2, 3, 4)
                .verifyComplete();
    }
線程模型
  • Schedulers.immediate() 當(dāng)前線程
  • Schedulers.single() 單一的可復(fù)用的線程
  • Schedulers.elastic() 彈性大小緩存線程池,線程池中的線程是可以復(fù)用的。當(dāng)所需要時,新的線程會被創(chuàng)建。如果一個線程閑置太長時間,則會被銷毀。該調(diào)度器適用于 I/O 操作相關(guān)的流的處理。
  • Schedulers.parallel() 并行操作優(yōu)化的線程池,通過 Schedulers.parallel()方法來創(chuàng)建。其中的線程數(shù)量取決于 CPU 的核的數(shù)量。該調(diào)度器適用于計算密集型的流的處理。
    @Test
    // 執(zhí)行時間大概8秒,創(chuàng)建新線程
    void testNewSingle() {
        System.out.println(LocalDateTime.now());
        Flux.just("tom", "jack", "allen")
                .publishOn(Schedulers.newSingle("1")).map(this::doSomething)
                .publishOn(Schedulers.newSingle("2")).map(this::doSomething)
                .publishOn(Schedulers.newSingle("3")).map(this::doSomething)
                .publishOn(Schedulers.newSingle("4")).map(this::doSomething)
                .publishOn(Schedulers.newSingle("5")).map(this::doSomething)
                .subscribeOn(Schedulers.newSingle("0")).blockLast();
        System.out.println(LocalDateTime.now());
    }

    @Test
    // 執(zhí)行時間15秒,使用相同線程
    void testSingle() {
        System.out.println(LocalDateTime.now());
        Flux.just("tom", "jack", "allen")
                .publishOn(Schedulers.single()).map(this::doSomething)
                .publishOn(Schedulers.single()).map(this::doSomething)
                .publishOn(Schedulers.single()).map(this::doSomething)
                .publishOn(Schedulers.single()).map(this::doSomething)
                .publishOn(Schedulers.single()).map(this::doSomething)
                .subscribeOn(Schedulers.single()).blockLast();
        System.out.println(LocalDateTime.now());

    }

    private String doSomething(String s) {
        System.out.println(Thread.currentThread().getName());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return s;
    }

Spring WebFlux

Reactor框架為基礎(chǔ)的,響應(yīng)式Web框架
傳統(tǒng)的Spring MVC基于Servlet,是阻塞式的,每次請求由一個線程處理;而Spring WebFlux通過事件循環(huán)Event Loop的方式,由單個線程非阻塞的處理事件。當(dāng)需要處理耗時任務(wù)時,Event Loop綁定的線程會新啟線程來執(zhí)行,完成后通知Event Loop。

Screen Shot 2020-09-23 at 11.46.52 PM.png

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容