Spring 5 中引入了Reactive理念,下文主要介紹Reactive模式的基礎。
工程地址,分支為reactive-operations。
Reactive概念
Reactive是函數(shù)式編程(Functional),管道流(pipeline, stream), 異步非阻塞的,事件驅動的。
org.reactivestreams包中主要有4個接口
- 發(fā)布者
Publisher
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
- 訂閱者
Subscriber
當接收到Publisher的數(shù)據(jù)時,會調用響應的回調方法。注冊完成時,首先會調用onSubscribe方法,參數(shù)Subscription s包含了注冊信息。
public interface Subscriber<T> {
// 注冊完成后,首先被調用
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
- 訂閱
Subscription
- 通過訂閱,訂閱者
Subscriber可以請求數(shù)據(jù)request,或者取消訂閱cancel。 - 在請求數(shù)據(jù)時,參數(shù)
long n表示希望接收的數(shù)據(jù)量,防止發(fā)布者Publisher發(fā)送過多的數(shù)據(jù)。 - 一旦開始請求,數(shù)據(jù)就會在流
stream中傳輸。每接收一個,就會調用onNext(T t);發(fā)生錯誤時,onError(Throwable t)被調用;傳輸完成后,onComplete()被調用。
public interface Subscription {
// 請求數(shù)據(jù),參數(shù)n為請求的數(shù)據(jù)量,不是超時時間
public void request(long n);
// 取消訂閱
public void cancel();
}
Processor
可以看出,Processor接口繼承了Subscriber和Publisher,是流的中間環(huán)節(jié)。
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
Reactive Stream中數(shù)據(jù)從Publisher開始,經(jīng)過若干個Processor,最終到達Subcriber,即完整的Pipeline。
Project Reactor
依賴
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
Mono和Flux
- 抽象類
Mono和Flux實現(xiàn)了Publisher接口,他們是發(fā)布者。 -
Mono表示少于等于1個數(shù)據(jù)(即0個, 或1個數(shù)據(jù))或錯誤;Flux表示一連串多個數(shù)據(jù)。
操作
- 創(chuàng)建
Flux或Mono,調用subscribe()后,數(shù)據(jù)開始流動。
主要方法有:just, fromArray, fromStream, fromIterable, range
@Test
public void create() {
//just方法
String[] arr = new String[]{"hello", "world"};
Flux<String> flux1 = Flux.just(arr);
flux1.subscribe(System.out::println);
Mono<String> mono = Mono.just("hi world");
mono.subscribe(System.out::println);
//fromArray方法
List<String> list = Arrays.asList("hello", "world");
Flux<String> flux2 = Flux.fromIterable(list);
//fromIterable方法
List<String> fruitList = new ArrayList<>();
fruitList.add("Apple");
fruitList.add("Orange");
fruitList.add("Grape");
fruitList.add("Banana");
fruitList.add("Strawberry");
Flux<String> flux3 = Flux.fromIterable(fruitList);
//fromStream方法
Stream<String> stream = Stream.of("hi", "hello");
Flux<String> flux4 = Flux.fromStream(stream);
//range方法
Flux<Integer> range = Flux.range(0, 5);
//interval方法, take方法限制個數(shù)為5個
Flux<Long> longFlux = Flux.interval(Duration.ofSeconds(1)).take(5);
}
- 合并
mergeWith
@Test
public void mergeFlux() {
Flux<String> source1 = Flux.just("hello", "world");
Flux<String> source2 = Flux.just("hi", "ted");
Flux<String> merge = source1.mergeWith(source2);
merge.subscribe(System.out::println);
}
- 結合為
Tuple2元組類型zipWith
@Test
public void zipFlux() {
Flux<String> source1 = Flux.just("hello", "world");
Flux<String> source2 = Flux.just("hi", "ted");
Flux<Tuple2<String, String>> zip = source1.zipWith(source2);
zip.subscribe(tuple -> {
System.out.println(tuple.getT1() + " -> " + tuple.getT2());
});
}
- 轉換和過濾
skip: 略過2個
@Test
public void skipFlux() {
Flux<String> source1 = Flux.just("hello", "world", "hi", "ted");
Flux<String> skip = source1.skip(2);
skip.subscribe(System.out::println);
}
take:只取前2個
@Test
public void takeFlux() {
Flux<String> source1 = Flux.just("hello", "world", "hi", "ted");
Flux<String> skip = source1.take(2);
skip.subscribe(System.out::println);
}
filter: 接收Predicate
@Test
public void filterFlux() {
Flux<String> source1 = Flux.just("hello", "world", "hi", "ted");
Flux<String> skip = source1.filter(s -> s.startsWith("h"));
skip.subscribe(System.out::println);
}
distinct: 去重
@Test
public void distinctFlux() {
Flux<String> source1 = Flux.just("hello", "hello", "world", "hi", "ted");
Flux<String> skip = source1.filter(s -> s.startsWith("h")).distinct();
skip.subscribe(System.out::println);
}
map: 接收Function
@Test
public void mapFlux() {
Flux<String> source1 = Flux.just("hello", "world", "hi", "ted");
Flux<String> skip = source1.map(s -> s + " is mapped");
skip.subscribe(System.out::println);
}
flatMap: 根據(jù)Flux中的元素先生成Mono, 再對Mono中的元素進行map轉換。
@Test
public void flatMapFlux() {
Flux<String> source1 = Flux.just("hello world", "hi ted");
Flux<String> flatMap = source1.flatMap(s -> Mono.just(s).map(s1 -> {
String[] strings = s1.split("\\s");
return new String(strings[0] + " - " + strings[1]);
}));
flatMap.subscribe(System.out::println);
}
buffer: 將stream中的數(shù)據(jù)按照固定大小分配,新的Flux中的List的元素個數(shù)是2
@Test
public void bufferFlux() {
Flux<String> source1 = Flux.just("hello", "world", "hi", "ted");
Flux<List<String>> buffer = source1.buffer(2);
buffer.subscribe(strings -> System.out.println(strings.size()));
}
collectList: 將Flux中的元素收集到一個List中
@Test
public void collectListFlux() {
Flux<String> source1 = Flux.just("hello", "world", "hi", "ted");
Mono<List<String>> mono = source1.collectList();
mono.subscribe(System.out::println)
}
collectMap: 將Flux中的元素提取為一個Map,Map的key根據(jù)Function生成
@Test
public void collectMapFlux() {
Flux<String> source1 = Flux.just("hello", "world", "ted");
Mono<Map<Character, String>> map = source1.collectMap(s -> s.charAt(0));
map.subscribe(characterStringMap -> System.out.println(characterStringMap.get('t')));
}
- 邏輯運算
all: 判斷Flux中元素是否都滿足Predicate條件
@Test
public void allFlux() {
Flux<String> source1 = Flux.just("hello", "world", "ted");
Mono<Boolean> mono = source1.all(s -> s.contains("e"));
mono.subscribe(System.out::println);
}
any: 判斷Flux中元素是否至少有1個滿足Predicate條件
@Test
public void anyFlux() {
Flux<String> source1 = Flux.just("hello", "world", "ted");
Mono<Boolean> mono = source1.any(s -> s.contains("e"));
mono.subscribe(System.out::println);
}