Scala Reactive Extensions
由來:
異步編程(Future)避免了blocking,并且提供簡單的方法組合多個future的值。但是,future只能處理單個結(jié)果,對于http請求或者異步計算,有時候它的能力就夠了,比如我們需要對一個計算中發(fā)生的多種事件,或者狀態(tài)做出反應(yīng),舉例:
- 跟蹤使用Future來下載文件的下載進(jìn)度(其實也可以吧,使用10個Future代表十個點,注冊回調(diào),沒達(dá)成一個就增加一個值)
但是,事件流就很適合這個場景,因為他們可以產(chǎn)生任意數(shù)量的事件值。另外,類似Future,event streams也是第一公民,可以使用函數(shù)式的組合子進(jìn)行組合轉(zhuǎn)換。
事件驅(qū)動(event-driven programming)
是一種編程范式,風(fēng)格,程序的流程是由事件決定的??赡苁峭獠枯斎?,用戶的行為,或者來自其他的計算結(jié)果。
反應(yīng)式編程(reactive programming)
也是一種編程范式,處理變化的傳播和數(shù)據(jù)的流動。和事件驅(qū)動是很相近的概念。相比傳統(tǒng)的范式,它可以表達(dá)不同的數(shù)據(jù)約束。由于分布式,并發(fā),大數(shù)據(jù)量應(yīng)用的驅(qū)動,反應(yīng)式越來越流行了。
因為,傳統(tǒng)的回調(diào)和指令式API有以下問題:
- 模糊了程序的流程,大量的回調(diào)讓業(yè)務(wù)邏輯難以閱讀,call-back hell
- 搭配上并發(fā)問題,以及依賴可變的變量,加上復(fù)雜的環(huán)境(網(wǎng)絡(luò),分布式,CPU執(zhí)行一定程度的隨機(jī)性)讓bug難以被察覺
反應(yīng)式流抓住了回調(diào)的pattern(共有模式),讓使用者更容易清晰表達(dá)邏輯,是一種更結(jié)構(gòu)化的方式來構(gòu)建事件驅(qū)動系統(tǒng)。
Reactive extensions
一種編程框架,使用event streams(事件流),組合異步和事件驅(qū)動的編程框架
Rx中的一些核心思想:將事件和數(shù)據(jù)同等看待。
組件:
- Observable 產(chǎn)生事件的對象,事件源
- Observer 觀察事件,需要對事件做出最終反應(yīng)
- Event 事件,事件流中的事件可以具有不同類型:正常的,錯誤的,完成事件通知,訂閱事件通知
- Subscription 事件源和觀察者之間的訂閱關(guān)系
為了將反應(yīng)式流應(yīng)用到項目中,我們需要完成以下課題:
- 如何創(chuàng)建observable,什么樣的輸入,事件,可以作為observable?它的事件應(yīng)該是怎么樣的?
- observable和observer之間聯(lián)系起來
- 使用訂閱關(guān)系取消關(guān)系
- 使用組合子組合observables
- 使用Rx調(diào)度來控制并發(fā)(事件也是數(shù)據(jù))
- 如何設(shè)計更大的應(yīng)用
// https://mvnrepository.com/artifact/io.reactivex/rxscala
libraryDependencies += "io.reactivex" %% "rxscala" % "0.27.0"
//https://github.com/ReactiveX/RxScala
Notice:
這個scala的reactive extension項目已經(jīng)結(jié)束了,不再維護(hù)和迭代,不能用在生產(chǎn)中,只是用來學(xué)習(xí)而已。不過替代品有:
- RxJava (Java, similar API as RxScala)
- Reactor (Java)
- Akka Streams (Scala & Java)
- monix (Scala)
- cats-effect (Scala)
- fs2 (Scala)
- ZIO (Scala)
創(chuàng)建Observable
直接的方式
使用api創(chuàng)建,但是我們一般不會這么做,因為事件常常是由其他產(chǎn)生的, Observable.from
同步訂閱的observable
val o = Observable.items("pa","python","java")
o.subscribe(_ => print)
o.subscribe(_ => println)
subscribe方法立刻執(zhí)行,不會等待,且跑在當(dāng)前線程上。
異步訂閱的observable
Observable.timer(1 second)
subscribe方法會等到一秒后事件產(chǎn)生,且不會阻塞當(dāng)前線程,它泡在Rx的內(nèi)部實現(xiàn)的線程里。
根據(jù)實際自定義
Observable.create或者apply方法`
@deprecated("Use [[Observable.apply]] instead", "0.26.2")
def create[T](f: Observer[T] => Subscription): Observable[T] = {
Observable(
(subscriber: Subscriber[T]) => {
val s = f(subscriber)
if(s != null && s != subscriber) {
subscriber.add(s)
}
}
)
}
一個Observable的核心有:
- 如何產(chǎn)生數(shù)據(jù)
- 如何把數(shù)據(jù)通知到Observer
- 如何建立和Observer的訂閱關(guān)系
create方法接受的f函數(shù)便是做2,3。
而1 數(shù)據(jù)如何產(chǎn)生,可以有多種方式,從已有的集合,或者Future等。在函數(shù)f的實際調(diào)用中,將Future的值,增加回調(diào)送給Observer的onNext,onError,如下面的示例:
object ObservablesCreateFuture extends App {
val f = Future { "Back to the Future(s)" }
val o = Observable.create[String] { obs =>
f foreach { case s => obs.onNext(s); obs.onCompleted() }
f.failed foreach { case t => obs.onError(t) }
Subscription()
}
o.subscribe(log _)
}
那如何取消呢?因為一旦注冊Future的回調(diào),是沒有直接的語法進(jìn)行Future回調(diào)取消的。其實不用取消,觀察代碼,對于future f,因為寫法的問題,它并不是Observable產(chǎn)生的數(shù)據(jù),但實際對應(yīng)的效果是已經(jīng)產(chǎn)生的事件,而取消操作,只需要正對未產(chǎn)生的事件。
訂閱關(guān)系
針對普通的訂閱關(guān)系,如:
def hotModified(dir:String):Observable[String] = {
Observable.create {ob =>
val fileMonitor = new FileAlterationMonitor(1000)
val fileObs = new FileAlterationObserver(dir)
val fileLis = new FileAlterationListenerAdaptor() {
override def onFileChange(file: File): Unit = {
ob.onNext(file.getName)
}
}
fileObs.addListener(fileLis)
fileMonitor.addObserver(fileObs)
fileMonitor.start()
Subscription { fileMonitor.stop() }
}
}
val fileMonitor = new FileAlterationMonitor(1000)
fileMonitor.start()
def hotModified2(directory: String): Observable[String] = {
val fileObs = new FileAlterationObserver(directory)
fileMonitor.addObserver(fileObs)
Observable.create { observer =>
val fileLis = new FileAlterationListenerAdaptor {
override def onFileChange(file: java.io.File) {
observer.onNext(file.getName)
}
}
fileObs.addListener(fileLis)
Subscription { fileObs.removeListener(fileLis) }
}
}
可以直接切斷事件源和觀察者的聯(lián)系。
另外對于上述兩種代碼,一種是 cold observables 另一種是hot observables
前者等有觀察者來了,才開始放出事件,后者是一直在放出事件
組合
略
嵌套的Observables — high-order event stream
nested observable, also called high-order event stream, is an observable object that emits events that are observables objects.
def fetchQuoteObservable(): Observable[String] = {
Observable.from(fetchQuote()) // fetchQuote returns Future[String]
}
def quotes: Observable[Observable[String]] =
Observable.interval(0.5 seconds).take(4).map {
n => fetchQuoteObservable().map(txt => s"$n) $txt")
}
Observable[Observable[String]]很類似Seq[Seq[String]],調(diào)用類似的方法即可,區(qū)別在于,前者是有時間順序的,后者沒有。
concat,將他們按照第一層的時間序排列。
flatten,按照第二層的完成時間排列
錯誤處理
observable一遇到錯誤就會停下,不會繼續(xù)。
- 停止
- 重試
- 給出不同的事件:
obversable[T].onErrorReturn(partialFunction[Any,T])obversable[T].onErrorResumeNext(Function[Throwable, Observable[T]])
但是可以利用這個特性來重試:
object Solution extends App{
def randomQuote = Observable.create[String] { obs =>
val url = "http://www.iheartquotes.com/api/v1/random?" +
"show_permalink=false&show_source=false"
obs.onNext(Source.fromURL(url).getLines.mkString)
obs.onCompleted()
Subscription()
}
import Observable._
def errorMessage = Observable.from("Retrying...") ++ error(new Exception)
def quoteMessage = for {
text <- randomQuote
message <- if (text.length < 100) Observable.from(text) else errorMessage
} yield message
quoteMessage.retry(5) subscribe println _
Thread.sleep(2500)
}
除此之外還可以重復(fù)repeat, is used to repeat events from completed Observable objects,相當(dāng)于時間重播。
調(diào)度
除了在當(dāng)前線程,Rx框架自己的線程,以及由Future來的元素從ExecutionContext的某個線程 來emit事件,我們從一個Observable對象創(chuàng)建另一個Observable對象的時候指定一個線程,這就是調(diào)度scheduler
略
利用subject來做top-bottom programming
bottom-up style: created a new observable from an existing observable with map function etc.
top-down style: declare an observable object, and define its dependencies later.
subject is both an observable and an obverser
Use Subject instances when you need to create an Observable object whose inputs are
not available when the Observable object is created
另外它還可以緩存所有已經(jīng)產(chǎn)生的事件,這樣在其他的組件注冊進(jìn)來后,可以對他們重放events
其實不過是中間加一層抽象罷了,果然軟件的世界就是取名字加抽象層。
- [1] learning-concurrent programming in scala 2nd