Akka HTTP之服務(wù)器發(fā)送的事件支持

服務(wù)器發(fā)送的事件(SSE)是一個用于將通知從 HTTP 服務(wù)器推送到客戶端輕量級、標(biāo)準(zhǔn)化協(xié)議。與提供雙向通信的 WebSocket 相比, SSE 只允許從服務(wù)器到客戶端的單向通信。如果這是你所需要的, SSE 的優(yōu)點是要簡單得多, 只能依賴于 HTTP, 并提供瀏覽器中斷的連接的重試語義。

根據(jù) SSE 規(guī)范, 客戶端可以通過 HTTP 從服務(wù)器請求事件流。服務(wù)器使用具有固定字符編碼 UTF-8 的媒體類型text/event-stream進(jìn)行響應(yīng), 并保持響應(yīng)打開, 以便在可用時將事件發(fā)送到客戶端。事件是文本結(jié)構(gòu), 它持有字段并以空行終止, 例如

data: { "username": "John Doe" }
event: added
id: 42

data: another event

重新連接后,客戶端可以選擇發(fā)送Last-Event-ID(標(biāo)識最后一個已接受事件)頭部給服務(wù)器。

模型

Akka HTTP 將事件流表示為Source[ServerSentEvent, NotUsed], 其中 ServerSentEvent 是具有以下只讀屬性的樣例類:

  • data: String – 實際有效載荷, 可能跨越多行
  • eventType: Option[String] – 可選限定符, 例如. “added”, “removed”, 等等.
  • id: Option[String] – 可選標(biāo)識符
  • retry: Option[Int] – 可選的重新連接延遲 (毫秒)

根據(jù) SSE 規(guī)范Akka HTTP 還提供了Last-Event-ID頭部和text/event-stream媒體類型。

服務(wù)器端用法: 編組

為了響應(yīng)帶有事件流的 HTTP 請求, 必須將 EventStreamMarshalling 定義的 ToResponseMarshaller[Source[ServerSentEvent, Any]] 隱式引入到各自路由定義的范圍中:

import akka.NotUsed
import akka.stream.scaladsl.Source

import akka.http.scaladsl.Http
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.model.sse.ServerSentEvent
import scala.concurrent.duration._

import java.time.LocalTime
import java.time.format.DateTimeFormatter.ISO_LOCAL_TIME

def route: Route = {
  import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

  path("events") {
    get {
      complete {
        Source
          .tick(2.seconds, 2.seconds, NotUsed)
          .map(_ => LocalTime.now())
          .map(time => ServerSentEvent(ISO_LOCAL_TIME.format(time)))
          .keepAlive(1.second, () => ServerSentEvent.heartbeat)
      }
    }
  }
}

客戶端用法:解組

為了解組作為Source[ServerSentEvent, NotUsed]的事件流, 必須將 EventStreamUnmarshalling 定義的 FromEntityUnmarshaller[Source[ServerSentEvent, NotUsed]] 隱式引入到范圍中:

import akka.NotUsed
import akka.stream.scaladsl.Source

import akka.http.scaladsl.Http
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.model.sse.ServerSentEvent
import scala.concurrent.duration._

import java.time.LocalTime
import java.time.format.DateTimeFormatter.ISO_LOCAL_TIME

def route: Route = {
  import akka.http.scaladsl.marshalling.sse.EventStreamUnmarshalling._

  path("events") {
    get {
      complete {
        Source
          .tick(2.seconds, 2.seconds, NotUsed)
          .map(_ => LocalTime.now())
          .map(time => ServerSentEvent(ISO_LOCAL_TIME.format(time)))
          .keepAlive(1.second, () => ServerSentEvent.heartbeat)
      }
    }
  }
}

請注意, 如果您正在尋找一種能夠永久訂閱事件流的彈性方法, Alpakka 提供的 EventSource 連接器可以使用上次收到的事件 id 自動重新連接。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,554評論 19 139
  • SSE概述 傳統(tǒng)的網(wǎng)頁都是瀏覽器向服務(wù)器“查詢”數(shù)據(jù),但是很多場合,最有效的方式是服務(wù)器向瀏覽器“發(fā)送”數(shù)據(jù)。這要...
    wavesnow閱讀 3,991評論 3 5
  • 國家電網(wǎng)公司企業(yè)標(biāo)準(zhǔn)(Q/GDW)- 面向?qū)ο蟮挠秒娦畔?shù)據(jù)交換協(xié)議 - 報批稿:20170802 前言: 排版 ...
    庭說閱讀 12,398評論 6 13
  • 外面飄起了雨滴 看這時間也只能各奔東西 習(xí)慣了并肩而行 因為你有迷人的魅力 雨水模糊了眼鏡 習(xí)慣了孤單落寂 習(xí)慣了...
    小糊童Mr閱讀 229評論 0 0
  • 一大清早,秦奎安就帶雷杰回到了雷厲的家中,面對著滿布雞血沒有清理的場景,再加上秦奎安昨晚的那一番話,讓雷杰更加的害...
    GARBIE閱讀 268評論 2 1

友情鏈接更多精彩內(nèi)容