響應(yīng)式思維 | Ben Lesh

響應(yīng)式思維 (Thinking Reactively) | Ben Lesh

Ben Lesh 是 RxJS 庫的領(lǐng)導(dǎo)者和布道者,提倡使用響應(yīng)式思維來抽象邏輯和編寫程序,現(xiàn)就職于 Google 。而本文則是對他的一篇研報(bào)的記錄,該研報(bào)是在 AngularConnect 會(huì)議中匯報(bào)的。研報(bào)首先從一個(gè)實(shí)例開始談起:

實(shí)例: Drag & Drop

內(nèi)容

每次 在目標(biāo)上按下鼠標(biāo)(mousedown) ,開始監(jiān)聽 頁面上鼠標(biāo)移動(dòng)(mousemove) 直到 鼠標(biāo)彈起(mouseup)

相關(guān)概念(基礎(chǔ)函數(shù))定義:

const target = document.querySelect('#target')

const targetMouseDown$ = Observable.fromEvent(target, 'mousedown')

const docMouseMove$ = Observable.fromEvent(target, 'mouseover')

const docMouseUp$ = Observable.fromEvent(target, 'mouseup')
  1. 變量后 $ 表示該變量是 Observable 。
  2. Observable 本質(zhì)是一個(gè)函數(shù),后面 Ben 會(huì)解釋。

分析

  1. 頁面上鼠標(biāo)移動(dòng)(mousemove)直到鼠標(biāo)彈起(mouseup) =>
docMouseMove$.takeUntil(docMouseUp$)

操作符 takeUntil 使得 docMouseMove$ 持續(xù)推送數(shù)據(jù),直到 docMouseUp$ 推送一個(gè)通知(數(shù)據(jù))后停止。附上 takeUntil 彈珠圖:

takeUntil
  1. 每次在目標(biāo)上按下鼠標(biāo)(mousedown),開始監(jiān)聽 =>
const dragDrop$ = targetMouseDown$.switchMap(() =>
  docMouseMove$.takeUntil(docMouseUp$)
)

操作符 switchMaptargetMouseDown$ 推送的值傳入進(jìn)內(nèi)部函數(shù)(該例不傳入推送值,推送僅僅作為通知使用),然后 切換docMouseMove$ 并壓平輸出(例中 targtMouseDown$ 僅僅推送一次,內(nèi)部函數(shù)僅執(zhí)行一次,因此無須壓平)。附上 switchMap 彈珠圖:

switchMap

小結(jié)

解決問題的思路應(yīng)該是從后向前推導(dǎo),確認(rèn)好每個(gè)事件流,根據(jù)問題組織事件流。

進(jìn)一步了解響應(yīng)式思維

流變量和非流變量 (自己臆想的,慎看)

在系統(tǒng)中流變量都是 Observable。

變量顧名思義是變化的,根據(jù)廣義、狹義相對論可知,變化是針對參照物來說的(提高點(diǎn) X 格),而非流變量的參照物是整個(gè)程序和時(shí)間軸。但是如果將參照物改為某一行代碼,非流變量隨時(shí)間是不變的,只是一個(gè)占位符。

var c = a + b // 站在這里,發(fā)現(xiàn) c 一直不變。
doSometing(c) // 對非流變量的操作,僅調(diào)用一次

流變量則是一種指向流( stream )的標(biāo)識符。

var c$ = a$.combineLatest(b$, (a, b) => a + b) // 站在這里感受下涓涓細(xì)流
c$.subscribe(doSomething) // 對流變量的操作,回調(diào)多次

操作符 combineLatest 對任一個(gè) Observable 推送的值,都與其他 Observable 最后值融合。具體融合方法以函數(shù)形式給出。慣例附上 combineLatest 彈珠圖:

combineLatest

沒有了操作符,Observable 就是 。。。

// 名字就是標(biāo)識符,可以改成 asy.ok(...)
promise.then(successFn, errorFn)

// 名字就是標(biāo)識符,可以改成 asy.ok(...)
observable.subscribe(nextFn, errorFn, completeFn)

大概 60+ 的操作符,請查看官網(wǎng)

重點(diǎn):解密 Observable

Observable 內(nèi)部是什么?

  • 攪亂腦汁的復(fù)雜異步
  • 應(yīng)用于航空科技的算法
  • 黑魔法
  • 獨(dú)角獸

以上是 Ben 總結(jié)的。

Observable 僅僅是一個(gè)函數(shù)

  1. Observable 有一個(gè)名為 observer 的參數(shù):
const myObservable = observer => {}
  1. observer 對象會(huì)有幾個(gè)方法:
const myObservable = observer => {
  let i = 0
  const id = setInterval(() => {
    observer.next(i++) // next 方法
    if (i === 10) observer.complete() // complete 方法
  }, 200)
}
  1. Observable 會(huì)返回銷毀邏輯:
const myObservable = observer => {
  let i = 0
  const id = setInterval(() => {
    observer.next(i++) // next 方法
    if (i === 10) observer.complete() // complete 方法
  }, 200)
  return () => clearInterval(id) // 用于終止訂閱
}
  1. 調(diào)用 Observable 函數(shù)的同時(shí)訂閱了你的 observer
const myObservable = (observer) => {
  let i = 0
  const id = setInterval(() => {
    observer.next(i++) // next 方法
    if (i === 10) observer.complete() // complete 方法
  }, 200)
  return () => clearInterval(id) // 用于終止訂閱
}

// 訂閱你的 observer
const teardown = myObservable({
  next(x) {console.log(x)},
  error(err) {console.error(err)},
  complete() {console.info('done)}
})

// 1 秒后取消訂閱
setTimeout(() => {
  teardown()
}, 1000)

操作符也是一個(gè)函數(shù)

操作符函數(shù)吃進(jìn)一個(gè) Observable 吐出一個(gè) Observable :

const operator = InputObservable => OutputObservable
  1. 將 OutputObservable 變量展開成函數(shù)形式:
const operator = (InputObservable) => {
  return (OutObserver) => {...}
}
  1. 操作符是一個(gè)函數(shù),她的參數(shù)是 Observable 并且輸出也是 Observable ,也就是通過 InputObservable 構(gòu)建 OutputObservable :
const operator = InputObservable => {
  return OutObserver => {
    return InputObservable(InObserver)
  }
}
  1. Observable 就是一個(gè)擁有 observer 參數(shù)的函數(shù),而 observer 對象的形式是約定好的:
const observer = {
  next: (data) => {...},
  error: (err) => {...},
  complete: () => {...}
}

也可以短方法聲明:

const observer = {
  next(data) {...},
  error(err) {...},
  complete() {...}
}
  1. 構(gòu)建 InObserver 和 OutObserver 之間的映射關(guān)系(操作符是 Observable 和 Observer 之間的操作,而此時(shí)還沒有給出映射函數(shù),所以 InObserver 和 OutObserver 其實(shí)現(xiàn)在還沒有變化 ):
const operator = InputObservable => {
  return OutObserver => {
    return InputObservable({
      next(data) {
        OutObserver.next(data)
      },
      error(err) {
        OutObserver.error(err)
      },
      complete() {
        OutObserver.complete()
      }
    })
  }
}

不難看出 next(data) { OutObserver.next(data)} 等同于 next = OutObserver.next, errorcomplete 類似,意味著此時(shí) InObserver 等于 OutObserver。

  1. 最后添加操作推送數(shù)據(jù)的映射函數(shù):
const operator = (InputObservable, mapFn) => {
  return OutObserver => {
    return InputObservable({
      next(data) {
        OutObserver.next(mapFn(data))
      },
      error(err) {
        OutObserver.error(err)
      },
      complete() {
        OutObserver.complete()
      }
    })
  }
}

單獨(dú)分析 next 方法來觀察 InObserverOutObserver 的關(guān)系:

function InObserver.next(data) {
  let newData = mapFn(data)
  OutObserver.next(newData)
}
  1. 驗(yàn)證?,F(xiàn)在把之前創(chuàng)建的 myObservableoperator 應(yīng)用到程序中:
const source = operator(myObservable, x => x + '!')

const teardown = source({
  next(data) {
    console.log(data)
  },
  error(err) {
    consol.log(err)
  },
  complete() {
    console.log('done')
  }
})

// 4 秒后取消訂閱
setTimeout(() => {
  teardown()
}, 4000)

輸出的結(jié)果:

0!
1!
2!
...

這里的結(jié)果有問題,因?yàn)槭盏?complete 推送后,并沒有取消訂閱,因此上面代碼設(shè)置了顯式的取消訂閱過程。Ben 在另外的研報(bào)中詳細(xì)介紹了使用 safeObserver 解決上述問題。

  1. 酷,來幾個(gè)串行操作。
const source = operator(operator(myObservable, x => x + '!'), x => x + '?')

const teardown = source({
  next(data) {
    console.log(data)
  },
  error(err) {
    consol.log(err)
  },
  complete() {
    console.log('done')
  }
})

// 4 秒后取消訂閱
setTimeout(() => {
  teardown()
}, 4000)

太繁瑣了,想想就頭痛:

const source = operator(operator(opserator(observable, mapFn), mapFn), mapFn)
  1. 把 Observable 函數(shù)用類來包裹(注意僅僅是把 Observable 函數(shù)打包進(jìn)類里,并不是把 Observable 函數(shù)轉(zhuǎn)化成類),操作符作為類的方法,這樣便可以使用鏈?zhǔn)綄懛ㄕ{(diào)用操作符了:
class Observable {
  constructor(observableFn) {
    this.subscribe = observableFn // 好記憶的標(biāo)識 subscribe
  }
}

const myObservable = new Observable((observer) => {...})

const teardown = myObservable.subscribe({
  next(data) { console.log(data) },
  error(err) { consol.log(err) },
  complete() { console.log('done') }
})
  1. 添加 map 操作符到類中:
class Observable {
  constructor(observableFn) {
    this.subscribe = observableFn // 好記憶的標(biāo)識 subscribe
  }

  map(mapFn) {
    return new Observable(observer => {
      return this.subscribe({
        next(data) {
          observer.next(mapFn(data))
        },
        error(err) {
          observer.error(err)
        },
        complete() {
          observer.complete()
        }
      })
    })
  }
}

現(xiàn)在使用鏈?zhǔn)綄懖僮鞣囋嚕?/p>

myObservable
  .map(x => x + '!')
  .map(x => x + '?')
  .map(x => x + '.')
  .subscribe({
    next(data) {
      console.log(data)
    }
  })

小結(jié)

  • Observable 就是函數(shù)。
  • 因?yàn)楹瘮?shù)僅在調(diào)用時(shí)執(zhí)行,所以 Observable 是惰性的。
  • 操作符也是函數(shù),輸入是 Observable,輸出也是 Observable 。
  • 鏈?zhǔn)讲僮骶褪沁B接每個(gè)操作的 observer 。

圖示

正常推送數(shù)據(jù)

下面將 Observable 的運(yùn)行過程可視化,先給出 Observable 的訂閱實(shí)例:

Observable.interval(1000) // like setInterval
  .filter(x => x % 2 === 0)
  .map(x => x + x)
  .subscribe(next, error, complete)
  1. 開始是數(shù)據(jù)的產(chǎn)生者
producer
  1. 流程的最后是你的回調(diào)處理,也就是數(shù)據(jù)的消費(fèi)者
consumer
  1. 最初推送的是 0 ,每一步的圖示:
推送數(shù)值 0
filter
map
consumer
  1. 然后是推送 1 :
推送數(shù)值 1
filter

然后就沒有然后了, 1 被 filter 過濾掉了。

重點(diǎn):異常處理

異常處理過程可能會(huì)給我們帶來一些疑惑,主要是因?yàn)橐韵聨讉€(gè)事件影響:

  • error() 被調(diào)用
  • complete() 被調(diào)用
  • 取消訂閱

這些事件發(fā)生后, Observable 將不再推送數(shù)據(jù)。繼續(xù)給出實(shí)例:

Observable.interval(1000)
  .map(x => {
    if (x === 1) {
      throw new Error('haha')
    }
    return x
  })
  .subscribe(next, error, complete)
  1. 處理推送 0 的過程略過。

  2. 生產(chǎn)者推送 1 是,拋出了異常:

map計(jì)算
得到異常
取消訂閱
處理異常

當(dāng)拋出異常后,Observable 不會(huì)繼續(xù)推送數(shù)據(jù)(取消訂閱),而消費(fèi)者將會(huì)使用 error() 處理異常。

這個(gè)還有個(gè)問題,Ben 在他的專欄里提到過,多播場景的錯(cuò)誤捕獲。

  1. 當(dāng)異常拋出后,Observable 就掛掉了,如果還想繼續(xù)推送如何實(shí)現(xiàn)?答案是:創(chuàng)建 observer 分支。
Observable.interval(10000)
  .switchMap(() => this.http.get(url).catch(err => Observable.empty()))
  .subscribe(data => render(data))

先來學(xué)習(xí)操作符 catch ,它會(huì)捕獲和處理 Observable 推送的異常,并返回一個(gè)新的 Observable 或者繼續(xù)拋出異常。附上彈珠圖:

catch

圖示具體流程:

生產(chǎn)者
switchMap

網(wǎng)絡(luò)不好,查詢過程超時(shí)。

Ajax超時(shí)

取消 Ajax Observable 的訂閱

catch

轉(zhuǎn)化異常到新的 Observable

取消分支訂閱

取消 Observable.empty 的訂閱

小結(jié):

  • 創(chuàng)建另外一個(gè) observer 鏈
  • 使用 catch 增強(qiáng)這個(gè)鏈的魯棒性
  • 守護(hù)了原始的 observer 鏈

響應(yīng)式思維的適用場景

Ben 在研報(bào)的最后分析了響應(yīng)式思維的使用場景,這里簡單的將 PPT 頁翻譯,具體的實(shí)際應(yīng)用還是需要在編程中發(fā)覺和選擇的。

  • 將多個(gè)事件融合在一起
  • 添加延時(shí)
  • 客戶端限制流量
  • 協(xié)調(diào)異步任務(wù)
  • 需要注銷機(jī)制

總結(jié)

最后將該文的具體內(nèi)容概述為以下 6 個(gè)方面。

  • 逆向思維
  • 任何的變量都可以被觀察
  • Observable 是函數(shù)
  • Observer 鏈處理計(jì)算
  • 調(diào)用 error() 會(huì)終止 observer 鏈
  • 盡情使用 Rx 。

本文是在我學(xué)習(xí) RxJS 過程中為了加強(qiáng)記憶和便于理解而記錄的,里面添加了大量的個(gè)人學(xué)習(xí)傾向,局限于個(gè)人知識面有限,難免有不當(dāng)和錯(cuò)誤之處,歡迎大家批評指導(dǎo)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容