注
文中用到multicasting這個(gè)術(shù)語(yǔ),這個(gè)詞用得很準(zhǔn)確。在網(wǎng)絡(luò)通訊中mulitcasting這個(gè)詞表示把數(shù)據(jù)發(fā)給一組特定的目標(biāo)節(jié)點(diǎn),一般譯為多播或者組播。Broadcasting是廣播,表示把數(shù)據(jù)發(fā)給網(wǎng)中所有節(jié)點(diǎn)。為了保持準(zhǔn)確性,下文把multicasting都翻譯為多播,如果你覺(jué)得用中文讀起來(lái)有點(diǎn)別扭,可以腦補(bǔ)為廣播,可能理解起來(lái)更方便。
正文
多播是減少RxJava重復(fù)執(zhí)行的重要方法。
多播一個(gè)事件是指,你發(fā)送同一個(gè)事件給所有下游operators/subscribers。例如做網(wǎng)絡(luò)請(qǐng)求這種耗時(shí)操作時(shí)經(jīng)常這樣用,但是我們不希望為每一個(gè)subscriber重復(fù)執(zhí)行相同的網(wǎng)絡(luò)請(qǐng)求,最好只請(qǐng)求一次,然后多播請(qǐng)求結(jié)果。
有兩個(gè)辦法實(shí)現(xiàn)多播:
- 使用ConnectableObservable (via publish() or replay())
- 使用Subject
任何在ConnectableObservable或者Subject之前的動(dòng)作都只會(huì)被執(zhí)行一次。然后執(zhí)行結(jié)果被多播給所有下游Subscribers。
這里有一個(gè)必須認(rèn)識(shí)到的微妙的地方:多播只在ConnectableObservable或Subject時(shí)才發(fā)生。同時(shí),多播之后發(fā)生的任何動(dòng)作是為每個(gè)Subscriber執(zhí)行一次。
看個(gè)例子:
Observable<String> observable = Observable.just("Event")
.publish()
.autoConnect(2)
.map(s -> {
System.out.println("Expensive operation for " + s);
return s;
});
observable.subscribe(s -> System.out.println("Sub1 got: " + s));
observable.subscribe(s -> System.out.println("Sub2 got: " + s));
// Output:
// Expensive operation for Event
// Sub1 got: Event
// Expensive operation for Event
// Sub2 got: Event
代碼中有ConnectableObservable,一個(gè)耗時(shí)的map()操作和兩個(gè)訂閱者。
這里有個(gè)問(wèn)題,即使我們添加了publish()來(lái)避免重復(fù)執(zhí)行,那個(gè)耗時(shí)的map()還是被執(zhí)行了兩次,這是我們不期望的。具體流程如下圖:

如果希望map()只執(zhí)行一次,應(yīng)該把它放在publish()之前調(diào)用:
Observable<String> observable = Observable.just("Event")
.map(s -> {
System.out.println("Expensive operation for " + s);
return s;
}) .publish()
.autoConnect(2);
observable.subscribe(s -> System.out.println("Sub1 got: " + s));
observable.subscribe(s -> System.out.println("Sub2 got: " + s));
// Output:
// Expensive operation for Event
// Sub1 received: Event
// Sub2 received: Event
更新后的流程如下:

這告訴我們什么呢?如果你想用多播減少不必要的開(kāi)銷,一定要在正確的位置做多播。
不論好壞,很多人都在使用Subject。它的一大優(yōu)點(diǎn)是多播,但是你必須要清楚,它們只在他們發(fā)出的時(shí)候多播。如果你有一批耗時(shí)操作要發(fā)布給下游的Subject,那你就需要考慮給下游的什么地方添加一個(gè)publish()。
.