原文鏈接: https://medium.com/@benlesh/on-the-subject-of-subjects-in-rxjs-2b08b7198b93
本文為 RxJS 中文社區(qū) 翻譯文章,如需轉(zhuǎn)載,請(qǐng)注明出處,謝謝合作!
如果你也想和我們一起,翻譯更多優(yōu)質(zhì)的 RxJS 文章以奉獻(xiàn)給大家,請(qǐng)點(diǎn)擊【這里】

RxJS 中的 Subjects 經(jīng)常被誤解。因?yàn)樗鼈冊(cè)试S你命令式地向 Observable 流中推送值,當(dāng)人們不太清楚如何將某個(gè)東西變成 Observable 時(shí),他們傾向于濫用 Subjects 。此模式看起來有點(diǎn)像這樣...
// 當(dāng)人們發(fā)現(xiàn)不太清楚如何做時(shí),通常會(huì)首先使用 Subjects
// (不要這樣做)
const subject = new Subject();
button.addEventListener('click', () => subject.next('click');
subject.subscribe(x => console.log(x));
雖然這對(duì)于 RxJS 新手 (對(duì)于這個(gè)階段來說太正常不過了) 很有幫助,但這不是以 “Rx 的方式”在處理問題。理想的是在 Observable 中包裝事件注冊(cè),既可以監(jiān)聽事件,又可以取消事件監(jiān)聽。看起來像這樣:
// 這樣好些了,但請(qǐng)使用 Observable.fromEvent(button, 'click') 來替代
const clicks = new Observable(observer => {
const handler = (e) => observer.next(e);
button.addEventListener('click', handler);
return () => button.removeEventListener('click', handler);
});
為什么展示這個(gè)跟 Subjects 沒半點(diǎn)關(guān)系的示例?好吧,一點(diǎn)是它展示了為什么不總是需要使用 Subject,另外一點(diǎn)這有個(gè)隱藏的 subject... (某種程度上可以說是 subject )。這里要注意的一點(diǎn)是,Observable 是通過 addEventListener 來包裝按鈕的處理函數(shù)的注冊(cè),而 addEventListener 本身就是一個(gè) subject ?!辽俑鶕?jù) “Gang Of Four” 的觀察者模式來說是這樣的。
觀察者模式
你可能知道,RxJS 主要是關(guān)于 Observables 和 Observers 的,但它也是 Subjects 相關(guān)的。然而在 Gof 的 設(shè)計(jì)模式中是找不到 observables 的,Subjects 和 Observers 是 觀察者模式中的根本。

模式本身很簡(jiǎn)單。Observers 是具有通知方法的類,Subject 也是類,它具有向內(nèi)部觀察者列表添加或刪除觀察者的方法和通知觀察者列表的方法。
RxJS 中的 Subjects 并沒有太差區(qū)別。當(dāng)使用 observer 對(duì) Rx Subject 調(diào)用 subscribe 時(shí),Subject 會(huì)將該 observer 添加到內(nèi)部的觀察者列表中。同樣的,如果使用一到三個(gè)函數(shù)來調(diào)用 subscribe,Subject 會(huì)將它們包裝成一個(gè) observer,然后添加到觀察者列表中。當(dāng)調(diào)用 Subject 的 next(value) 時(shí),它會(huì)遍歷觀察者列表并將 value 傳遞給 next 方法。對(duì)于 error 和 complete 也是同樣的。要想從 subject 的觀察者列表中移除 observer,只需簡(jiǎn)單調(diào)用 subscription 的 unsubscribe 方法即可,subscription 是將 observer 添加到觀察者列表中時(shí)返回的。
const subject = new Subject();
// 將 observer1 添加到觀察者列表
const sub1 = subject.subscribe(observer1);
// 將 observer2 添加到觀察者列表
const sub2 = subject.subscribe(observer2);
// 使用 "hi there" 來通知列表中的所有觀察者
subject.next('hi there');
// 將 observer1 從觀察者列表中移除
sub1.unsubscribe();
Subject 相比較于 Observable
實(shí)際上,RxJS 中的 Subjects 不同于 GoF 觀察者模式中的 Subjects,但它們的 API 是 Observable 的鴨子類型。實(shí)際上,在 RxJS 中 Subjects 更甚,它們繼承自 Observable 。優(yōu)點(diǎn)是所有 Subjects 都具有與 Observable 相同的操作符和方法。
大概 Subject 和 Observable 之間一個(gè)很重要的區(qū)別就是 Subject 是有狀態(tài)的,它維護(hù)觀察者列表。另一方面,Observable 真的只是一個(gè)函數(shù),它建立了觀察本身。
雖然 Subjects 是 Observables,但 Subjects 還實(shí)現(xiàn)了 Observer 接口。也就是說,它們擁有 next、error 和 complete 方法。這些方法用來通知 subject 內(nèi)部觀察者列表中的 observers 。這意味著 subject 可以用作訂閱任何 observable 的 observer 。
// 為了使兩個(gè)觀察者 observer1 和 observer2 “共享” tick$,
// 我們可以通過 Subject 來傳輸所有通知,像這樣
const tick$ = Observable.interval(1000);
const subject = new Subject();
subject.subscribe(observer1);
subject.subscribe(observer2);
tick$.subscribe(subject);
上面的示例是將 observable tick$ “多播” 給兩個(gè)觀察者: observer1 和 observer2 。這其實(shí)就是 RxJS 中大多數(shù)多播操作符內(nèi)部所做的事情。例如 publish、publishReplay、multicast、share,等等。真的,這才是 RxJS 中 Subjects 的主要用法。
Subjects 是不可重用的
在 RxJS 中,Subjects 不能重用。也就是說,當(dāng)一個(gè) Subject 完成或報(bào)錯(cuò)時(shí),便不可再使用了。如果你嘗試在已關(guān)閉的 Subject (調(diào)用過 complete 或 error 方法)上調(diào)用 next,它會(huì)默認(rèn)忽略通知。如果想 Subject 在完成后調(diào)用 next 時(shí)進(jìn)行顯示地報(bào)錯(cuò),你可以在 subject 實(shí)例上直接調(diào)用 unsubscribe 。
// Subject 之死
const subject = new Subject();
subject.subscribe(x => console.log(x));
subject.next(1); // 1
subject.next(2); // 2
subject.complete();
subject.next(3); // 悄悄地忽略
subject.unsubscribe();
subject.next(4); // Unhandled ObjectUnsubscribedError
RxJS 中的陷阱
但是在 RxJS 的當(dāng)前版本中,會(huì)帶來了一些令人困惑的痛點(diǎn)。因?yàn)?Rx observables 不會(huì)“捕獲”錯(cuò)誤,我們會(huì)遭遇一些奇怪的行為。我曾經(jīng)嘲笑過 Promises 實(shí)現(xiàn)了錯(cuò)誤“捕獲”,但在多播場(chǎng)景中它或許是正確的。我的意思是當(dāng)我說 Rx observable 不“捕獲”錯(cuò)誤時(shí),是表示當(dāng)錯(cuò)誤滲透到觀察者鏈末端并的末端時(shí),如果錯(cuò)誤未被處理,它會(huì)被重新拋出。
// 演示缺少錯(cuò)誤處理時(shí)會(huì)進(jìn)行重新拋出
const badObservable = Observable.throw(new Error('haha'));
try {
badObservable.subscribe({
next: x => console.log(x),
error: null,
complete: () => console.log('done')
});
} catch (err) {
console.error(err); // 輸出自定義錯(cuò)誤: "haha"
}
現(xiàn)在我們來想想,當(dāng)你循環(huán)觀察者列表并通知它們時(shí)會(huì)發(fā)生什么(正如 subject 所做的)。
for (let observer of observers) {
observer.next('notify'); // 如果在這里調(diào)用 throw 會(huì)發(fā)生什么?
}
// 提示: 會(huì)報(bào)錯(cuò)會(huì)打破循環(huán)
// 注意: 好吧,這不僅僅是個(gè)提示
假設(shè)一些操作符是同步處理的(map、filter、scan 等等),如果其中一個(gè)或任何其它同步操作符報(bào)錯(cuò)了,你會(huì)在多播(使用 Subject 來循環(huán)觀察者列表并通知它們)的下游得到一些詭異的行為:
// 會(huì)發(fā)生奇怪的行為
const source$ = Observable.interval(1000).share();
const mapped$ = source$.map(x => {
if (x === 1) {
throw new Error('oops');
}
return x;
});
source$.subscribe(x => console.log('A', x));
mapped$.subscribe(x => console.log('B', x));
source$.subscribe(x => console.log('C', x));
// "A" 0
// "B" 0
// "C" 0
// "A" 1
// Uncaught Error: "oops"
在上面的示例中,大多數(shù)用戶會(huì)期望 A 和 C 能繼續(xù)通知。輸出 B 的 observable 死了是可以理解的,它報(bào)錯(cuò)了,但其他流和源流也死了令人想當(dāng)困惑。任意的第三方都可以殺掉共享的 observable 流以及未知數(shù)量的兄弟流,不應(yīng)該是這樣的。這是一個(gè)脆弱的抽象,我們需要在 RxJS 接下來的版本中修復(fù)它。
臨時(shí)解決上述場(chǎng)景中的問題很簡(jiǎn)單,感謝調(diào)度器( schedulers )。你可以在多播后使用 observeOn,這樣就可以解決此問題,因?yàn)殄e(cuò)誤不再是同步拋出的。
const source$ = Observable.interval(1000)
.share()
.observeOn(Rx.Scheduler.asap); // 點(diǎn)睛之筆
const mapped$ = source$.map(x => {
if (x === 1) {
throw new Error('oops');
}
return x;
});
source$.subscribe(x => console.log('A', x));
mapped$.subscribe(x => console.log('B', x));
source$.subscribe(x => console.log('C', x));
// "A" 0
// "B" 0
// "C" 0
// "A" 1
// Uncaught Error: "oops"
// "C" 1
// "A" 2
// "C" 2
// "A" 3
// "C" 3
// ... 等等
還有另一種臨時(shí)解決方案,如果你可以管理它的話,它的性能會(huì)更好一些,方法是只需為所有的 subscriptions 添加錯(cuò)誤處理方法。
const source$ = Observable.interval(1000)
.share()
.observeOn(Rx.Scheduler.asap); // 點(diǎn)睛之筆
const mapped$ = source$.map(x => {
if (x === 1) {
throw new Error('oops');
}
return x;
});
source$.subscribe(x => console.log('A', x));
mapped$.subscribe(
x => console.log('B', x),
err => console.log('Error handled: ' + err.message)
);
source$.subscribe(x => console.log('C', x));
// "A" 0
// "B" 0
// "C" 0
// "A" 1
// "Error handled: oops"
// "C" 1
// "A" 2
// "C" 2
// "A" 3
// "C" 3
// ... 等等
Observable 的未來
TC39 Observable 提議 新的化身,不包括 CancelToken 業(yè)務(wù),提議本身就是一整篇文章,它可能會(huì)在沒有錯(cuò)誤處理方法的情況下通過“捕獲”錯(cuò)誤來解決這個(gè)問題。也就是說,它不再會(huì)到達(dá)觀察者鏈末端并重新拋出錯(cuò)誤。在 RxJS 未來的版本中,我想我們也會(huì)做同樣的事情,因?yàn)檫@才是正確的。當(dāng)然,這個(gè)問題是公開討論的,但我個(gè)人認(rèn)為這不會(huì)有太多阻力。
總結(jié)
- Subjects 既是 observer,又是 observable
- Subjects 對(duì)內(nèi)部的觀察者列表進(jìn)行“多播”
- Observables 只是建立觀察的函數(shù)
- Observables 當(dāng)前不會(huì)捕獲錯(cuò)誤,但它們應(yīng)該捕獲
- 向 Subject 的下游同步拋出錯(cuò)誤會(huì)殺掉此 Subject
- 你可以使用錯(cuò)誤處理方法或 observeOn 來解決 #4 中的問題
- 關(guān)于 Promise 的錯(cuò)誤捕獲,我錯(cuò)了。這是個(gè)好主意,因?yàn)?promises 是多播的 *
- 未來版本的 RxJS 很可能會(huì)捕獲錯(cuò)誤
- 雖然可能不是完全必要的,因?yàn)?promises 永遠(yuǎn)是異步的。 (聳肩臉)