之前文章介紹的例子都是處理一個流中的事件。然而在實際的業(yè)務(wù)中我們往往會遇到同時處理兩個流的需求。比如我們需要從兩個不同的 api 獲取數(shù)據(jù),然后合并數(shù)據(jù)在前端顯示等等。
首先為我們之前的例子添加一個文本輸入框 input,并獲取它的輸入事件流:
const input$ = fromEvent(inputRef.current, "input");
然而我們把輸入流中的事件變換為輸入值(默認是輸入事件對象),同時把之前的代碼做下整理:
const input$ = fromEvent(inputRef.current, "input").pipe(
map(e => e.target.value),
);
const timer$ = time$.pipe(
switchMap(addOneOrReset),
startWith({ count: 0 }),
scan((acc, current) => current(acc)),
map(obj => obj.count)
tap(v => setTxt(v))
);
tap:它的作用就是對流過的數(shù)據(jù)進行處理,然后原封不動的再把原數(shù)據(jù)傳遞給接下來的操作符。我們一般用它來進行產(chǎn)生負效果的操作(之前的負效果代碼是寫在 subscribe 函數(shù)中的),比如寫日志啊,更新頁面等等。這里其實遵循的是某一種 Rx 編程模型最佳實踐。也就是在 subscribe 函數(shù)中不做任何操作,有點兒類似函數(shù)式編程中 IO Monad。當(dāng)然,現(xiàn)在我們關(guān)注的重點是使用操作符完成功能。
準(zhǔn)備工作做好了,現(xiàn)在我們要做的是如何同時使用輸入流(input$)和定時器流(timer$)中的數(shù)據(jù)呢?
combineLatest:這個操作符有很多方法重載,我們這里用到的是接收多個流作為參數(shù)的方法。這里先不講,直接看效果。
combineLatest(timer$, input$).pipe(
tap(console.log)
).subscribe();

我們觀察控制臺,發(fā)現(xiàn)一開始什么輸出都沒有,按理說定時器流中的 startWith 操作符應(yīng)該會流出事件啊。當(dāng)我們在 input 輸入框輸入數(shù)據(jù)時,控制臺終于有了輸出。再點擊各種按鈕試試,發(fā)現(xiàn)規(guī)律了嗎?
combineLatest 是符如其名,組合流中最后的事件。意思是(以這里的例子為例):
- 首先文本輸入流和定時器流都得有事件流出。
- combineLatest 捕獲是兩個事件流中的最新值,如果文本輸入流有新值,那么將輸出 [定時器流最后一個值,文本輸入流新值];如果定時器流有新值,將輸出 [定時器流新值,文本輸入流最后一個值]。因此,只要任意一個流有新值產(chǎn)生,combineLatest 就會有輸出。
因此,一開始我們的定時器流中有值,但文本流沒有值,所以沒有輸出,這符合第一點。然后,當(dāng)我們開始在文本框輸入時,有值輸出;當(dāng)我們點擊定時器按鈕開始計時時,控制臺將會以定時器的頻率持續(xù)輸出,并且輸入肯定是兩個流中的最新值,或者說是最后那個值。這符合第二點。
我們看到 combineLatest 操作符以數(shù)組的方式組合了各個流中的數(shù)據(jù),一般來說我們肯定要對這些數(shù)據(jù)進行加工產(chǎn)生新的數(shù)據(jù)類型,比如對象啊,文本啊,可以在接下來使用 map 操作符進行數(shù)據(jù)變換。其實 combineLatest 的重載為我們提供了更方便的變換數(shù)據(jù)的方式,傳入額外的函數(shù)參數(shù),這個函數(shù)接收各個流中的值作為輸入?yún)?shù),返回值作為下一個操作符操作流中的值。使用方式如下:
combineLatest(
timer$,
input$,
(timeValue, inputValue) => ({count: timeValue, input: inputValue}) // 下一個操作符操作的值就為一個對象,包含兩個屬性
)
完整代碼如下:
import React, { useRef, useEffect, useState } from "react";
import { fromEvent, interval, merge, combineLatest } from "rxjs";
import {
takeUntil,
switchMap,
scan,
startWith,
mapTo,
tap,
map
} from "rxjs/operators";
export default function App() {
const [txt, setTxt] = useState("");
const pauseBtnRef = useRef(null);
const startBtnRef = useRef(null);
const resetBtnRef = useRef(null);
const halfBtnRef = useRef(null);
const quarterBtnRef = useRef(null);
const inputRef = useRef(null);
interface Count {
count: number;
}
const addOne = (acc: Count) => ({ count: acc.count + 1 });
const reset = (acc: Count) => ({ count: 0 });
useEffect(() => {
const pauseBtnClick$ = fromEvent(pauseBtnRef.current, "click");
const startBtnClick$ = fromEvent(startBtnRef.current, "click");
const resetBtnClick$ = fromEvent(resetBtnRef.current, "click");
const halfBtnClick$ = fromEvent(halfBtnRef.current, "click");
const quarterBtnClick$ = fromEvent(quarterBtnRef.current, "click");
const addOneOrReset = (time = 1000) =>
merge(
interval(time).pipe(
takeUntil(pauseBtnClick$),
mapTo(addOne)
),
resetBtnClick$.pipe(mapTo(reset))
);
const time$ = merge(
startBtnClick$.pipe(mapTo(1000)),
halfBtnClick$.pipe(mapTo(500)),
quarterBtnClick$.pipe(mapTo(250))
);
const input$ = fromEvent(inputRef.current, "input").pipe(
map(e => e.target.value)
);
const timer$ = time$.pipe(
switchMap(addOneOrReset),
startWith({ count: 0 }),
scan((acc, current) => current(acc)),
map(obj => obj.count),
tap(v => setTxt(v))
);
const subscription = combineLatest(
timer$,
input$,
(timeValue, inputValue) => ({count: timeValue, input: inputValue})
)
.pipe(tap(console.log))
.subscribe();
return () => {
subscription.unsubscribe();
};
}, []);
return (
<div className="App">
<div style={{ fontSize: "30px" }}>{txt}</div>
<button ref={startBtnRef}>開始</button>
<button ref={pauseBtnRef}>暫停</button>
<button ref={resetBtnRef}>重置</button>
<button ref={halfBtnRef}>1/2秒</button>
<button ref={quarterBtnRef}>1/4秒</button>
<div>
<input type="text" ref={inputRef} />
</div>
</div>
);
}
如有任何問題,請?zhí)砑游⑿殴娞枴白x一讀我”。