服務(wù)器發(fā)送的事件(SSE)是一個用于將通知從 HTTP 服務(wù)器推送到客戶端輕量級、標(biāo)準(zhǔn)化協(xié)議。與提供雙向通信的 WebSocket 相比, SSE 只允許從服務(wù)器到客戶端的單向通信。如果這是你所需要的, SSE 的優(yōu)點是要簡單得多, 只能依賴于 HTTP, 并提供瀏覽器中斷的連接的重試語義。
根據(jù) SSE 規(guī)范, 客戶端可以通過 HTTP 從服務(wù)器請求事件流。服務(wù)器使用具有固定字符編碼 UTF-8 的媒體類型text/event-stream進(jìn)行響應(yīng), 并保持響應(yīng)打開, 以便在可用時將事件發(fā)送到客戶端。事件是文本結(jié)構(gòu), 它持有字段并以空行終止, 例如
data: { "username": "John Doe" }
event: added
id: 42
data: another event
重新連接后,客戶端可以選擇發(fā)送Last-Event-ID(標(biāo)識最后一個已接受事件)頭部給服務(wù)器。
模型
Akka HTTP 將事件流表示為Source[ServerSentEvent, NotUsed], 其中 ServerSentEvent 是具有以下只讀屬性的樣例類:
-
data: String– 實際有效載荷, 可能跨越多行 -
eventType: Option[String] – 可選限定符, 例如. “added”, “removed”, 等等. -
id: Option[String]– 可選標(biāo)識符 -
retry: Option[Int] – 可選的重新連接延遲 (毫秒)
根據(jù) SSE 規(guī)范Akka HTTP 還提供了Last-Event-ID頭部和text/event-stream媒體類型。
服務(wù)器端用法: 編組
為了響應(yīng)帶有事件流的 HTTP 請求, 必須將 EventStreamMarshalling 定義的 ToResponseMarshaller[Source[ServerSentEvent, Any]] 隱式引入到各自路由定義的范圍中:
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.http.scaladsl.Http
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.model.sse.ServerSentEvent
import scala.concurrent.duration._
import java.time.LocalTime
import java.time.format.DateTimeFormatter.ISO_LOCAL_TIME
def route: Route = {
import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
path("events") {
get {
complete {
Source
.tick(2.seconds, 2.seconds, NotUsed)
.map(_ => LocalTime.now())
.map(time => ServerSentEvent(ISO_LOCAL_TIME.format(time)))
.keepAlive(1.second, () => ServerSentEvent.heartbeat)
}
}
}
}
客戶端用法:解組
為了解組作為Source[ServerSentEvent, NotUsed]的事件流, 必須將 EventStreamUnmarshalling 定義的 FromEntityUnmarshaller[Source[ServerSentEvent, NotUsed]] 隱式引入到范圍中:
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.http.scaladsl.Http
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.model.sse.ServerSentEvent
import scala.concurrent.duration._
import java.time.LocalTime
import java.time.format.DateTimeFormatter.ISO_LOCAL_TIME
def route: Route = {
import akka.http.scaladsl.marshalling.sse.EventStreamUnmarshalling._
path("events") {
get {
complete {
Source
.tick(2.seconds, 2.seconds, NotUsed)
.map(_ => LocalTime.now())
.map(time => ServerSentEvent(ISO_LOCAL_TIME.format(time)))
.keepAlive(1.second, () => ServerSentEvent.heartbeat)
}
}
}
}
請注意, 如果您正在尋找一種能夠永久訂閱事件流的彈性方法, Alpakka 提供的 EventSource 連接器可以使用上次收到的事件 id 自動重新連接。