聲明:差不多就是Transforming an Existing Sequence的翻譯
一、轉(zhuǎn)換當(dāng)前的序列
- 一對一轉(zhuǎn)換(比如將當(dāng)前字符串的值轉(zhuǎn)換為其長度):map
Flux.just("str1" , "str").map(item -> {return item.length();}).subscribe(System.out::println);//4 3- 僅作類型轉(zhuǎn)換:cast
Mono.just("str1").cast(Object.class); - 將元素轉(zhuǎn)換為帶序號的封裝元素,序號來自
index操作所接收到的順序,從0開始Flux.just("str1","str2").index();//3.1.0版本似乎沒有這個(gè)方法 - 將元素轉(zhuǎn)換為帶有記錄間隔時(shí)間的封裝元素,間隔時(shí)間是
elapsed操作接收到上一個(gè)數(shù)據(jù)到當(dāng)前數(shù)據(jù)的時(shí)間間隔Flux.interval(Duration.ofSeconds(1)).elapsed().subscribe(System.out::println,System.out::println,null,sub -> sub.request(3)); //----------------------- [1007,0] [1001,1] [999,2]
- 僅作類型轉(zhuǎn)換:cast
- 一對多轉(zhuǎn)換(比如將字符串轉(zhuǎn)換為組成它們的字符): flatMap
注意:flatMap會將其內(nèi)部多個(gè)Publisher合成為一個(gè)(按照時(shí)間順序),所以最后展現(xiàn)的還是一個(gè)序列Flux.just("str1","str2").flatMap(item -> { return Flux.fromStream(item.chars().mapToObj(c -> Character.valueOf((char) c))); } , 2).subscribe(System.out::println); //----------------(結(jié)果去掉換行) s t r 1 s t r 2- 如果想忽略掉
flatMap中某些數(shù)據(jù)可以使用Mono.empty() - 如果想使得
flatMap的數(shù)據(jù)按原數(shù)據(jù)順序排列(比如轉(zhuǎn)換公式為:str => s t r ,原數(shù)據(jù)如果為str1 str2 那么轉(zhuǎn)換后的排序順序必為 s t r 1 s t r 2,即只要上一個(gè)元素內(nèi)部轉(zhuǎn)換流沒有走完,下一個(gè)轉(zhuǎn)換內(nèi)容的永遠(yuǎn)不會被輸出,但會被記錄),可以試試Flux#flatMapSequential -
Mono的一對多轉(zhuǎn)換(轉(zhuǎn)換為Flux):Mono#flatMapMany
- 如果想忽略掉
二、在已有序列中增添新的元素
- 在序列之前 :Flux#startWith
Flux.just(1,2,3).startWith(0).subscribe(System.out::println); //------------------ 0 1 2 3 - 在序列之后 :Flux#concatWith
三、對Flux進(jìn)行歸并
-
歸并為List:Flux#collectList , Flux#collectSortedList
等待接收完成,并將所有數(shù)據(jù)歸并成一個(gè)Mono<List>進(jìn)行返回,后者比前者多個(gè)排序功能Flux.just(3,5,2,1).collectSortedList((left , right) -> { if(left > right){ return 1; }else if(left < right){ return -1; }else{ return 0; } }).subscribe(System.out::println); //--------------- [1, 2, 3, 5] -
歸并為Map:Flux#collectMap , Flux#collectMultiMap
等待接收完成,將所有數(shù)據(jù)歸并為一個(gè)Mono<Map>進(jìn)行返回,后者返回的是Mono<Map<key,Collection>>Flux.just(3,5,2,1).collectMap(item -> item.toString()).subscribe(System.out::println); //------------------------ //{1=1, 2=2, 3=3, 5=5} -
由Collector來完成歸并:Flux#collect
Flux.just(3,5,2,1).collect(Collectors.counting()).subscribe(System.out::println); //-------------------- //4 -
計(jì)算序列中元素?cái)?shù)量:Flux#count
Flux.just(3,5,2,1).count().subscribe(System.out::println); //----------------------------------- //4 -
對序列中的每一個(gè)元素應(yīng)用一個(gè)回調(diào),該回調(diào)的結(jié)果會帶入下一次回調(diào),直到所有元素被轉(zhuǎn)換為一個(gè)最終回調(diào)結(jié)果:Flux#reduce
Flux.just(3,5,2,1).reduce("number:" , (lastResult,item) -> lastResult + item.toString()).subscribe(System.out::println); //---------------- //number:3521- 在Flux#reduce的基礎(chǔ)上,增加輸出每次轉(zhuǎn)換結(jié)果的功能:Flux#scan
Flux.just(3,5,2,1).scan("number:" , (lastResult,item) -> lastResult + item.toString()).subscribe(System.out::println); //------------------------- number: number:3 number:35 number:352 number:3521
- 在Flux#reduce的基礎(chǔ)上,增加輸出每次轉(zhuǎn)換結(jié)果的功能:Flux#scan
-
歸并成布爾值
- 指定判斷式,是否符合所有元素:Flux#all
Flux.just(3,5,2,1).all(item -> item < 10).subscribe(System.out::println); //--------------- //true注意,如果有一個(gè)不符合,將會立刻向上游發(fā)送
cancel信號,并向下游發(fā)出元素false- 指定判斷式,是否至少有一個(gè)符合:Flux#any
注意,如果有一個(gè)符合,將會將會立刻向上游發(fā)送
cancel信號,并向下游發(fā)出元素true,下面兩個(gè)也同理,達(dá)到要求便立刻中止- 檢驗(yàn)序列中是否有元素:Flux#hasElements()
Flux.empty().hasElements().subscribe(System.out::println); //--------------- //false - 檢驗(yàn)序列中是否有特定元素:Flux#hasElement(T value)
四、合并多個(gè)生產(chǎn)者
- 以序列的順序進(jìn)行合并:Flux#concatWith(other)
Flux.just(1,2,3).concatWith(Flux.just(4,5,6)).subscribe(System.out::println); //--------------------------------- //1 2 3 4 5 6 - 以發(fā)出的元素順序進(jìn)行合并,先發(fā)出的元素在前:Flux#mergeWith(other)
Flux.interval(Duration.ofSeconds(1)).mergeWith(Flux.interval(Duration.ofSeconds(1))).subscribe(System.out::println); //--------------- //0 0 1 1 2 2 3 3 ....- 在
mergeWith基礎(chǔ)上,增加了兼容不同類型生產(chǎn)者的功能和合并元素的功能,如[1,2,3]和[a,b,c,d]會合并成[1a,2b,3c]:Flux#zipWithFlux.just(1,2,3).zipWith(Flux.just("a" , "b" , "c")).subscribe(System.out::println); //------ //[1,a] [2,b] [3,c] Flux.interval(Duration.ofSeconds(1)).zipWith(Flux.just("a","b","c") , (f,s) -> f.toString().concat(s)).subscribe(System.out::println); //-------- //0a 1b 2c
- 在
- 等待另一個(gè)序列結(jié)束,然后丟出一個(gè)Mono<Void>:Mono#and
Mono.just("e").and(Flux.interval(Duration.ofSeconds(1)).take(3).doOnNext(System.out::println)).subscribe(null , null ,() -> System.out.println("ok")); //----------- //0 1 2 ok - 等待指定所有序列結(jié)束,然后丟出一個(gè)Mono<Void>:Mono#when
Mono.when(Flux.just(1,2,3) , Flux.interval(Duration.ofSeconds(1)).take(3).doOnNext(System.out::println)).subscribe(null , null ,() -> System.out.println("ok")); //------------- //0 1 2 ok - 與
zipWith相似,但是每次合并的元素都取自其他序列發(fā)出的最近一個(gè)值,而不是一直等待其他序列發(fā)出下一個(gè)值:Flux#combineLatestFlux.combineLatest(objects -> objects[0].toString().concat(objects[1].toString()) ,Flux.interval(Duration.ofSeconds(1)).take(3) , Flux.just("a","b","c")).subscribe(System.out::println); // 0c 1c 2c - 有一個(gè)生產(chǎn)者序列集合,以誰先發(fā)出第一個(gè)元素來判斷選擇哪個(gè)序列進(jìn)行輸出
Flux.first(Flux.interval(Duration.ofSeconds(1)).take(3) , Flux.just(4,5,6)).subscribe(System.out::println); //---------- // 4 5 6 - 與
flatMap相似,但是當(dāng)轉(zhuǎn)換的序列還沒結(jié)束,但是主序列的下一個(gè)元素已經(jīng)到來額時(shí)候,會直接取消掉轉(zhuǎn)換后的序列,也就是說同時(shí)只能存在一個(gè)轉(zhuǎn)換的序列被執(zhí)行:switchMap
五、重復(fù)
- 完成后,重新訂閱該序列,重復(fù)輸出,永不停止:repeat
- 上述功能,再加上時(shí)間間隔:Flux.interval(duration).flatMap(tick → myExistingPublisher)
六、只對完成信號感興趣,即忽略元素
- 忽略所有的元素,如果上游發(fā)出完成信號,則完成:ignoreElements
- 忽略所有的元素,只響應(yīng)錯(cuò)誤信號和完成信號,完成返回
Mono<Void>: then- 變體--響應(yīng)完成時(shí)不再返回
Mono<Void>,而是正常執(zhí)行參數(shù)中的Mono,并將其返回值作為返回值:then(Mono<T> other)
- 變體--響應(yīng)完成時(shí)不再返回
- 完成后,還要再完成一個(gè)提供的空任務(wù)后才返回: Mono<Void> thenEmpty(Publisher<Void> other)
- 完成后,返回提供的值:Mono<T> Mono#thenReturn(T)
- 完成后,執(zhí)行提供的Flux,其元素會正常輸出:thenMany
七、有一個(gè)需要延遲完成的Mono
- 需要等待該Mono中元素所生成的序列完成后再向下游發(fā)出該元素:Mono#delayUntil(Function)
Mono.just("complete!").delayUntil(item -> Flux.interval(Duration.ofSeconds(1)).take(3).doOnNext(System.out::println)).subscribe(System.out::println); //------------------------------------------- //0 1 2 complete!
八、如果需要對流中元素進(jìn)行遞歸操作
- 以分支為先展開: Flux#expand(Function)
即先對所有元素進(jìn)行一遍遞歸,然后再對各個(gè)遞歸后序列中的序列進(jìn)行遞歸Flux.just(1,2).expand(item -> item > 6 ? Mono.empty() : Flux.just(item *2)).subscribe(System.out::println); //-------------- //1 2 2 4 4 8 8 - 以深度為先展開:Flux#expandDeep(Function)
Flux.just(1,2).expandDeep(item -> item > 6 ? Mono.empty() : Flux.just(item *2)).subscribe(System.out::println); //----------------- //1 2 4 8 2 4 8
九、序列為空時(shí)轉(zhuǎn)換
- 如果序列為空,則輸出提供的預(yù)定值: defaultIfEmpty
- 如果序列為空,則訂閱提供的預(yù)定序列::switchIfEmpty
參考文檔:
[1] Reactor api doc
[2] Reactor reference doc