Java 9 Reactive Streams

Java 9?Reactive Streams允許我們實現(xiàn)非阻塞異步流處理。這是將響應式編程模型應用于核心java編程的重要一步。

如果您對響應式編程不熟悉,請閱讀Reactive Manifesto并閱讀Reactive Streams的簡短說明。RxJava和Akka Streams一直是十分優(yōu)秀的響應流實現(xiàn)庫?,F(xiàn)在java 9已經(jīng)通過java.util.concurrent.Flow?API 引入了響應流支持。

Java 9 Reactive Streams

Reactive Streams是關于流的異步處理,因此應該有一個發(fā)布者(Publisher)和一個訂閱者(Subscriber)。發(fā)布者發(fā)布數(shù)據(jù)流,訂閱者使用數(shù)據(jù)。

有時我們必須在Publisher和Subscriber之間轉換數(shù)據(jù)。處理器(Processor)是位于最終發(fā)布者和訂閱者之間的實體,用于轉換從發(fā)布者接收的數(shù)據(jù),以便訂閱者能理解它。我們可以擁有一系列(chain?)處理器。

  從上面的圖中可以清楚地看出,Processor既可以作為訂閱者也可以作為發(fā)布者。

Java 9 Flow API

Java 9 Flow API實現(xiàn)了Reactive Streams規(guī)范。Flow API是IteratorObserver模式的組合。Iterator在pull模型上工作,用于應用程序從源中拉取項目;而Observer在push模型上工作,并在item從源推送到應用程序時作出反應。

Java 9 Flow API訂閱者可以在訂閱發(fā)布者時請求N個項目。然后將項目從發(fā)布者推送到訂閱者,直到推送玩所有項目或遇到某些錯誤。?


Java 9 Flow API類和接口

讓我們快速瀏覽一下Flow API類和接口。

java.util.concurrent.Flow:這是Flow API的主要類。該類封裝了Flow API的所有重要接口。這是一個final類,我們不能擴展它。

java.util.concurrent.Flow.Publisher:這是一個功能接口,每個發(fā)布者都必須實現(xiàn)它的subscribe方法,并添加相關的訂閱者以接收消息。

java.util.concurrent.Flow.Subscriber:每個訂閱者都必須實現(xiàn)此接口。訂閱者中的方法以嚴格的順序進行調(diào)用。此接口有四種方法:?

onSubscribe:這是訂閱者訂閱了發(fā)布者后接收消息時調(diào)用的第一個方法。通常我們調(diào)用subscription.request開始從處理器(Processor)接收項目。

onNext:當從發(fā)布者收到項目時調(diào)用此方法,這是我們實現(xiàn)業(yè)務邏輯以處理流,然后從發(fā)布者請求更多數(shù)據(jù)的方法。

onError:當發(fā)生不可恢復的錯誤時調(diào)用此方法,我們可以在此方法中執(zhí)行清理操作,例如關閉數(shù)據(jù)庫連接。

onComplete:這就像finally方法,并且在發(fā)布者沒有發(fā)布其他項目發(fā)布者關閉時調(diào)用。我們可以用它來發(fā)送流成功處理的通知。

java.util.concurrent.Flow.Subscription:這用于在發(fā)布者和訂閱者之間創(chuàng)建異步非阻塞鏈接。訂閱者調(diào)用其request方法來向發(fā)布者請求項目。它還有cancel取消訂閱的方法,即關閉發(fā)布者和訂閱者之間的鏈接。

java.util.concurrent.Flow.Processor:此接口同時擴展了Publisher和Subscriber接口,用于在發(fā)布者和訂閱者之間轉換消息。

java.util.concurrent.SubmissionPublisher:一個Publisher實現(xiàn),它將提交的項目異步發(fā)送給當前訂閱者,直到它關閉為止。它使用Executor框架,我們將在響應流示例中使用該類來添加訂閱者,然后向其提交項目。

Java 9響應流示例

  讓我們從一個簡單的例子開始,我們將實現(xiàn)Flow API Subscriber接口并使用SubmissionPublisher來創(chuàng)建發(fā)布者和發(fā)送消息。

Stream Data

  假設我們有一個Employee類,用于創(chuàng)建從發(fā)布者發(fā)送到訂閱者的流消息。

packagecom.journaldev.reactive.beans;publicclassEmployee{privateintid;privateString name;publicintgetId() {returnid;? ? }publicvoidsetId(intid) {this.id = id;? ? }publicStringgetName() {returnname;? ? }publicvoidsetName(String name) {this.name = name;? ? }publicEmployee(inti, String s) {this.id = i;this.name = s;? ? }publicEmployee() {? ? }@OverridepublicStringtoString() {return"[id="+id+",name="+name+"]";? ? }}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

  我們還有一個實用程序類來為我們的示例創(chuàng)建一個員工列表。

packagecom.journaldev.reactive_streams;importjava.util.ArrayList;importjava.util.List;importcom.journaldev.reactive.beans.Employee;publicclassEmpHelper{publicstaticListgetEmps() {? ? ? ? Employee e1 =newEmployee(1,"Pankaj");? ? ? ? Employee e2 =newEmployee(2,"David");? ? ? ? Employee e3 =newEmployee(3,"Lisa");? ? ? ? Employee e4 =newEmployee(4,"Ram");? ? ? ? Employee e5 =newEmployee(5,"Anupam");? ? ? ? List emps =newArrayList<>();? ? ? ? emps.add(e1);? ? ? ? emps.add(e2);? ? ? ? emps.add(e3);? ? ? ? emps.add(e4);? ? ? ? emps.add(e5);returnemps;? ? }}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

Subscriber

packagecom.journaldev.reactive_streams;importjava.util.concurrent.Flow.Subscriber;importjava.util.concurrent.Flow.Subscription;importcom.journaldev.reactive.beans.Employee;publicclassMySubscriberimplementsSubscriber {privateSubscription subscription;privateintcounter =0;@OverridepublicvoidonSubscribe(Subscription subscription) {? ? ? ? System.out.println("Subscribed");this.subscription = subscription;this.subscription.request(1);//requesting data from publisherSystem.out.println("onSubscribe requested 1 item");? ? }@OverridepublicvoidonNext(Employee item) {? ? ? ? System.out.println("Processing Employee "+item);? ? ? ? counter++;this.subscription.request(1);? ? }@OverridepublicvoidonError(Throwable e) {? ? ? ? System.out.println("Some error happened");? ? ? ? e.printStackTrace();? ? }@OverridepublicvoidonComplete() {? ? ? ? System.out.println("All Processing Done");? ? }publicintgetCounter() {returncounter;? ? }}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

Subscription變量以保持引用,以便可以在onNext方法中進行請求。

counter變量以記錄處理的項目數(shù),請注意它的值在onNext方法中增加。這將在我們的main方法中用于在結束主線程之前等待執(zhí)行完成。

在onSubscribe方法中調(diào)用訂閱請求以開始處理。另請注意,onNext在處理項目后再次調(diào)用方法,要求對下一個從發(fā)布者發(fā)布的項目進行處理。

onError和onComplete在例子中沒有太多邏輯,但在實際情況中,它們應該用于在發(fā)生錯誤時執(zhí)行糾正措施或在處理成功完成時清理資源。

響應流測試程序

我們將使用SubmissionPublisherPublisher作為示例,讓我們看一下響應流實現(xiàn)的測試程序。

packagecom.journaldev.reactive_streams;importjava.util.List;importjava.util.concurrent.SubmissionPublisher;importcom.journaldev.reactive.beans.Employee;publicclassMyReactiveApp{publicstaticvoidmain(String args[])throwsInterruptedException {// Create PublisherSubmissionPublisher publisher =newSubmissionPublisher<>();// Register SubscriberMySubscriber subs =newMySubscriber();? ? ? ? publisher.subscribe(subs);? ? ? ? List emps = EmpHelper.getEmps();// Publish itemsSystem.out.println("Publishing Items to Subscriber");? ? ? ? emps.stream().forEach(i -> publisher.submit(i));// logic to wait till processing of all messages are overwhile(emps.size() != subs.getCounter()) {? ? ? ? ? ? Thread.sleep(10);? ? ? ? }// close the Publisherpublisher.close();? ? ? ? System.out.println("Exiting the app");? ? }}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

在上述代碼中,最重要的部分是發(fā)布者subscribe和submit方法的調(diào)用。我們應該始終關閉發(fā)布者以避免任何內(nèi)存泄漏。

  執(zhí)行上述程序時,我們將得到以下輸出。

SubscribedPublishing ItemstoSubscriberonSubscribe requested1itemProcessing Employee [id=1,name=Pankaj]Processing Employee [id=2,name=David]Processing Employee [id=3,name=Lisa]Processing Employee [id=4,name=Ram]Processing Employee [id=5,name=Anupam]ExitingtheappAll Processing Done

1

2

3

4

5

6

7

8

9

10

消息轉換示例

處理器用于在發(fā)布者和訂閱者之間轉換消息。假設我們有另一個用戶希望處理不同類型的消息。假設這個新的消息類型是Freelancer。

packagecom.journaldev.reactive.beans;publicclassFreelancerextendsEmployee{privateintfid;publicintgetFid() {returnfid;? ? }publicvoidsetFid(intfid) {this.fid = fid;? ? }publicFreelancer(intid,intfid, String name) {super(id, name);this.fid = fid;? ? }@OverridepublicStringtoString() {return"[id="+super.getId()+",name="+super.getName()+",fid="+fid+"]";? ? }}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

  我們有一個新訂閱者使用Freelancer流數(shù)據(jù)。

packagecom.journaldev.reactive_streams;importjava.util.concurrent.Flow.Subscriber;importjava.util.concurrent.Flow.Subscription;importcom.journaldev.reactive.beans.Freelancer;publicclassMyFreelancerSubscriberimplementsSubscriber {privateSubscription subscription;privateintcounter =0;@OverridepublicvoidonSubscribe(Subscription subscription) {? ? ? ? System.out.println("Subscribed for Freelancer");this.subscription = subscription;this.subscription.request(1);//requesting data from publisherSystem.out.println("onSubscribe requested 1 item for Freelancer");? ? }@OverridepublicvoidonNext(Freelancer item) {? ? ? ? System.out.println("Processing Freelancer "+item);? ? ? ? counter++;this.subscription.request(1);? ? }@OverridepublicvoidonError(Throwable e) {? ? ? ? System.out.println("Some error happened in MyFreelancerSubscriber");? ? ? ? e.printStackTrace();? ? }@OverridepublicvoidonComplete() {? ? ? ? System.out.println("All Processing Done for MyFreelancerSubscriber");? ? }publicintgetCounter() {returncounter;? ? }}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

processor

代碼重要的部分是實現(xiàn)Processor接口。由于我們想要使用它SubmissionPublisher,我們會擴展它并在適合的地方使用它。

packagecom.journaldev.reactive_streams;importjava.util.concurrent.Flow.Processor;importjava.util.concurrent.Flow.Subscription;importjava.util.concurrent.SubmissionPublisher;importjava.util.function.Function;importcom.journaldev.reactive.beans.Employee;importcom.journaldev.reactive.beans.Freelancer;publicclassMyProcessorextendsSubmissionPublisherimplementsProcessor {privateSubscription subscription;privateFunction function;publicMyProcessor(Function function) {super();this.function = function;? ? ? ? }@OverridepublicvoidonSubscribe(Subscription subscription) {this.subscription = subscription;? ? ? ? subscription.request(1);? ? }@OverridepublicvoidonNext(Employee emp) {? ? ? ? submit((Freelancer) function.apply(emp));? ? ? ? ? subscription.request(1);? ? ? }@OverridepublicvoidonError(Throwable e) {? ? ? ? e.printStackTrace();? ? }@OverridepublicvoidonComplete() {? ? ? ? System.out.println("Done");? ? }}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

Function?將用于將Employee對象轉換為Freelancer對象。

我們將傳入的Employee消息轉換為onNext方法中的Freelancer消息,然后使用SubmissionPublisher?submit方法將其發(fā)送給訂閱者。

由于Processor既是訂閱者又是發(fā)布者,我們可以在終端發(fā)布者和訂閱者之間創(chuàng)建一系列處理器。

消息轉換測試

packagecom.journaldev.reactive_streams;importjava.util.List;importjava.util.concurrent.SubmissionPublisher;importcom.journaldev.reactive.beans.Employee;importcom.journaldev.reactive.beans.Freelancer;publicclassMyReactiveAppWithProcessor{publicstaticvoidmain(String[] args)throwsInterruptedException {// Create End PublisherSubmissionPublisher publisher =newSubmissionPublisher<>();// Create ProcessorMyProcessor transformProcessor =newMyProcessor(s -> {returnnewFreelancer(s.getId(), s.getId() +100, s.getName());? ? ? ? });//Create End SubscriberMyFreelancerSubscriber subs =newMyFreelancerSubscriber();//Create chain of publisher, processor and subscriberpublisher.subscribe(transformProcessor);// publisher to processortransformProcessor.subscribe(subs);// processor to subscriberList emps = EmpHelper.getEmps();// Publish itemsSystem.out.println("Publishing Items to Subscriber");? ? ? ? emps.stream().forEach(i -> publisher.submit(i));// Logic to wait for messages processing to finishwhile(emps.size() != subs.getCounter()) {? ? ? ? ? ? Thread.sleep(10);? ? ? ? }// Closing publisherspublisher.close();? ? ? ? transformProcessor.close();? ? ? ? System.out.println("Exiting the app");? ? }}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

閱讀程序中的注釋以正確理解它,最重要的變化是發(fā)布者 - 處理器 - 訂閱者鏈的創(chuàng)建。執(zhí)行上述程序時,我們將得到以下輸出。

SubscribedforFreelancerPublishing ItemstoSubscriberonSubscribe requested1itemforFreelancerProcessing Freelancer [id=1,name=Pankaj,fid=101]Processing Freelancer [id=2,name=David,fid=102]Processing Freelancer [id=3,name=Lisa,fid=103]Processing Freelancer [id=4,name=Ram,fid=104]Processing Freelancer [id=5,name=Anupam,fid=105]ExitingtheappAll Processing DoneforMyFreelancerSubscriberDone

1

2

3

4

5

6

7

8

9

10

11

取消訂閱

我們可以使用Subscription?cancel方法停止在訂閱者中接收消息。

  以下是一個示例代碼,其中訂閱者只消費3條消息,然后取消訂閱。

@OverridepublicvoidonNext(Employee item) {? ? System.out.println("Processing Employee "+item);? ? counter++;if(counter==3) {this.subscription.cancel();return;? ? }this.subscription.request(1);}

1

2

3

4

5

6

7

8

9

10

請注意,在這種情況下,我們在處理所有消息之前停止主線程的邏輯將進入無限循環(huán)。我們可以為此場景添加一些額外的邏輯,如果訂閱者已停止處理或取消訂閱,就使用一些全局變量來標志該狀態(tài)。

Back Pressure

當發(fā)布者以比訂閱者消費更快的速度生成消息時,會產(chǎn)生背壓。Flow API不提供任何關于背壓或處理它的信號的機制。但我們可以設計自己的策略來處理它,例如微調(diào)用戶或降低信息產(chǎn)生率。您可以閱讀RxJava deals with Back Pressure。

總結

  Java 9 Flow API是響應式編程和創(chuàng)建異步非阻塞應用程序的良好舉措。但是,只有在所有系統(tǒng)API都支持它時,才能創(chuàng)建真正的響應式應用程序。

原文地址:Java 9 Reactive Streams?written by Pankaj?

完整代碼:Github

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容