閱讀對(duì)象:本文適合SpringBoot 初學(xué)者及對(duì)SpringBoot感興趣的童鞋閱讀。
背景介紹:在企業(yè)級(jí) WEB 應(yīng)用開(kāi)發(fā)中,為了更好的用戶體驗(yàn)&提升響應(yīng)速度,往往會(huì)將一些耗時(shí)費(fèi)力的請(qǐng)求 (Excel導(dǎo)入or導(dǎo)出,復(fù)雜計(jì)算, etc.) 進(jìn)行異步化處理。 由此帶來(lái)的一個(gè)重要的問(wèn)題是如何通知用戶任務(wù)狀態(tài),常見(jiàn)的方法大致分為2類4種:
HTTP Pollingclient pullHTTP Long-Pollingclient pullServer-Sent Events (SSE)server pushWebSocketserver push
1. Polling 短輪詢
是一種非常簡(jiǎn)單的實(shí)現(xiàn)方式。就是client通過(guò)定時(shí)任務(wù)不斷得重復(fù)請(qǐng)求服務(wù)器,從而獲取新消息,而server按時(shí)間順序提供自上次請(qǐng)求以后發(fā)生的單個(gè)或多個(gè)消息。

短輪詢的優(yōu)點(diǎn)非常明顯,就是實(shí)現(xiàn)簡(jiǎn)單。當(dāng)兩個(gè)方向上的數(shù)據(jù)都非常少,并且請(qǐng)求間隔不是非常密集時(shí),這種方法就會(huì)非常有效。例如,新聞評(píng)論信息可以每半分鐘更新一次,這對(duì)用戶來(lái)說(shuō)是可以的。
它得缺點(diǎn)也是非常明顯,一旦我們對(duì)數(shù)據(jù)實(shí)時(shí)性要求非常高時(shí),為了保證消息的及時(shí)送達(dá),請(qǐng)求間隔必須縮短,在這種情況下,會(huì)加劇服務(wù)器資源的浪費(fèi),降低服務(wù)的可用性。另一個(gè)缺點(diǎn)就是在消息的數(shù)量較少時(shí),將會(huì)有大量的 request做無(wú)用功,進(jìn)而也導(dǎo)致服務(wù)器資源的浪費(fèi)。
2. Long-Polling 長(zhǎng)輪詢
長(zhǎng)輪詢的官方定義是:
The server attempts to "hold open" (notimmediately reply to) each HTTP request, responding only when there are events to deliver. In this way, there is always a pending request to which the server can reply for the purpose of delivering events as they occur, thereby minimizing the latency in message delivery.
如果與Polling的方式相比,會(huì)發(fā)現(xiàn)Long-Polling的優(yōu)點(diǎn)是通過(guò)hold open HTTP request 從而減少了無(wú)用的請(qǐng)求。
大致步驟為:
- client向server請(qǐng)求并等待響應(yīng)。
- 服務(wù)端將請(qǐng)求阻塞,并不斷檢查是否有新消息。如果在這個(gè)期間有新消息產(chǎn)生時(shí)就立即返回。否則一直等待至
請(qǐng)求超時(shí)。 - 當(dāng)client
獲取到新消息或請(qǐng)求超時(shí),進(jìn)行消息處理并發(fā)起下一次請(qǐng)求。

Long-Polling的缺點(diǎn)之一也是服務(wù)器資源的浪費(fèi),因?yàn)樗?code>Polling的一樣都屬于被動(dòng)獲取,都需要不斷的向服務(wù)器請(qǐng)求。在并發(fā)很高的情況下,對(duì)服務(wù)器性能是個(gè)嚴(yán)峻的考驗(yàn)。
Note:因?yàn)橐陨?兩種方式的實(shí)現(xiàn)都比較簡(jiǎn)單,所以我們這里就不做代碼演示了。接下來(lái)我們重點(diǎn)介紹一下
Server-Sent Events及WebSocket。
3. Demo概要
下面我們將通過(guò)一個(gè)下載文件的案例進(jìn)行演示SSE和WebSocket的消息推送,在這之前,我們先簡(jiǎn)單說(shuō)一下我們項(xiàng)目的結(jié)構(gòu),整個(gè)項(xiàng)目基于SpringBoot 構(gòu)建。
首先我們定義一個(gè)供前端訪問(wèn)的APIDownloadController
@RestController
public class DownloadController {
private static final Logger log = getLogger(DownloadController.class);
@Autowired
private MockDownloadComponent downloadComponent;
@GetMapping("/api/download/{type}")
public String download(@PathVariable String type, HttpServletRequest request) { // (A)
HttpSession session = request.getSession();
String sessionid = session.getId();
log.info("sessionid=[{}]", sessionid);
downloadComponent.mockDownload(type, sessionid); // (B)
return "success"; // (C)
}
}
- (A)
type參數(shù)用于區(qū)分使用哪種推送方式,這里為sse,ws,stomp這三種類型。 - (B)
MockDownloadComponent用于異步模擬下載文件的過(guò)程。 - (C) 因?yàn)橄螺d過(guò)程為異步化,所以該方法不會(huì)被阻塞并立即向客戶端返回
success,用于表明下載開(kāi)始。
在DownloadController中我們調(diào)用MockDownloadComponent的mockDownload()的方法進(jìn)行模擬真正的下載邏輯。
@Component
public class MockDownloadComponent {
private static final Logger log = LoggerFactory.getLogger(DownloadController.class);
@Async // (A)
public void mockDownload(String type, String sessionid) {
for (int i = 0; i < 100; i++) {
try {
TimeUnit.MILLISECONDS.sleep(100); // (B)
int percent = i + 1;
String content = String.format("{\"username\":\"%s\",\"percent\":%d}", sessionid, percent); // (C)
log.info("username={}'s file has been finished [{}]% ", sessionid, percent);
switch (type) { // (D)
case "sse":
SseNotificationController.usesSsePush(sessionid, content);
break;
case "ws":
WebSocketNotificationHandler.usesWSPush(sessionid, content);
break;
case "stomp":
this.usesStompPush(sessionid, content);
break;
default:
throw new UnsupportedOperationException("");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
- (A) 我們使用
@Async讓使其異步化。 - (B) 模擬下載耗時(shí)。
- (C) 消息的格式為
{"username":"abc","percent":1}。 - (D) 根據(jù)不同的
type選擇消息推送方式。
4. Server-Sent Events
SSE 是W3C定義的一組API規(guī)范,這使服務(wù)器能夠通過(guò)HTTP將數(shù)據(jù)推送到Web頁(yè)面,它具有如下特點(diǎn):
- 單向半雙工:只能由server向client推送消息
- 基于http:數(shù)據(jù)被編碼為“text/event-stream”內(nèi)容并使用HTTP流機(jī)制進(jìn)行傳輸
-
數(shù)據(jù)格式無(wú)限制:消息只是遵循規(guī)范定義的一組
key-value格式&UTF-8編碼的文本數(shù)據(jù)流,我們可以在消息payload中可以使用JSON或者XML或自定義數(shù)據(jù)格式。 - http 長(zhǎng)連接: 消息的實(shí)際傳遞是通過(guò)一個(gè)長(zhǎng)期存在的HTTP連接完成的,消耗資源更少
- 簡(jiǎn)單易用的API

瀏覽器支持情況:

Note:IE 瀏覽器可通過(guò)第三方JS庫(kù)進(jìn)行支持SSE
4.1 SpringBoot 中使用SSE
從Spring 4.2開(kāi)始支持SSE規(guī)范,我們只需要在Controller中返回SseEmitter對(duì)象即可。
Note:Spring 5 中提供了Spring Webflux 可以更加方便的使用SSE,但是為更貼近我們的實(shí)際項(xiàng)目,所以文本僅演示使用Spring MVC SSE。
我們?cè)?strong>服務(wù)器端定義一個(gè)SseNotificationController用于和客戶端處理和保存SSE連接. 其endpoint為/api/sse-notification。
@RestController
public class SseNotificationController {
public static final Map<String, SseEmitter> SSE_HOLDER = new ConcurrentHashMap<>(); // (A)
@GetMapping("/api/sse-notification")
public SseEmitter files(HttpServletRequest request) {
long millis = TimeUnit.SECONDS.toMillis(60);
SseEmitter sseEmitter = new SseEmitter(millis); // (B)
HttpSession session = request.getSession();
String sessionid = session.getId();
SSE_HOLDER.put(sessionid, sseEmitter);
return sseEmitter;
}
/**
* 通過(guò)sessionId獲取對(duì)應(yīng)的客戶端進(jìn)行推送消息
*/
public static void usesSsePush(String sessionid, String content) { // (C)
SseEmitter emitter = SseNotificationController.SSE_HOLDER.get(sessionid);
if (Objects.nonNull(emitter)) {
try {
emitter.send(content);
} catch (IOException | IllegalStateException e) {
log.warn("sse send error", e);
SseNotificationController.SSE_HOLDER.remove(sessionid);
}
}
}
}
- (A)
SSE_HOLDER保存了所有客戶端的SseEmitter,用于后續(xù)通知對(duì)應(yīng)客戶端。 - (B) 根據(jù)指定超時(shí)時(shí)間創(chuàng)建一個(gè)
SseEmitter對(duì)象, 它是SpringMVC提供用于操作SSE的類。 - (C)
usesSsePush()提供根據(jù)sessionId向?qū)?yīng)客戶端發(fā)送消息。發(fā)送只需要調(diào)用SseEmitter的send()方法即可。
至此服務(wù)端已經(jīng)完成,我們使用Vue編寫客戶端Download.html進(jìn)行測(cè)試。核心代碼如下:
usesSSENotification: function () {
var tt = this;
var url = "/api/sse-notification";
var sseClient = new EventSource(url); // (A)
sseClient.onopen = function () {...}; // (B)
sseClient.onmessage = function (msg) { // (C)
var jsonStr = msg.data;
console.log('message', jsonStr);
var obj = JSON.parse(jsonStr);
var percent = obj.percent;
tt.sseMsg += 'SSE 通知您:已下載完成' + percent + "%\r\n";
if (percent === 100) {
sseClient.close(); // (D)
}
};
sseClient.onerror = function () {
console.log("EventSource failed.");
};
}
- (A) 開(kāi)啟一個(gè)新的 SSE connection 并訪問(wèn)
/api/sse-notification。 - (B) 當(dāng)連接成功時(shí)的callback。
- (C) 當(dāng)有新消息時(shí)的callback。
- (D) 當(dāng)下載進(jìn)度為100%時(shí),關(guān)閉連接。
效果演示:

4. WebSocket
WebSocket 類似于標(biāo)準(zhǔn)的TCP連接,它是IETF(RFC 6455)定義的通過(guò)TCP進(jìn)行實(shí)時(shí)全雙工通信一種通信方式,這意味這它的功能更強(qiáng)大,常用于如股票報(bào)價(jià)器,聊天應(yīng)用。
相比于SSE,它不僅可以雙向通信,而且甚至還能處理音頻/視頻等二進(jìn)制內(nèi)容。
Note:使用
WebSocket,在高并發(fā)情況下,服務(wù)器將擁有許多長(zhǎng)連接。這對(duì)網(wǎng)絡(luò)代理層組件及WebSocket服務(wù)器都是一個(gè)不小的性能挑戰(zhàn),我們需要考慮其負(fù)載均衡方案。同時(shí)連接安全等問(wèn)題也不容忽視。
4.1 Spring WebSocket (低級(jí)API)
Spring 4提供了一個(gè)新的Spring-WebSocket模塊,用于適應(yīng)各種WebSocket引擎,它與Java WebSocket API標(biāo)準(zhǔn)(JSR-356)兼容,并且提供了額外的增強(qiáng)功能。
Note: 對(duì)于應(yīng)用程序來(lái)說(shuō),直接使用WebSocket API會(huì)大大增加開(kāi)發(fā)難度,所以Spring為我們提供了 STOMP over WebSocket 更高級(jí)別的API使用WebSocket。在本文中將會(huì)分別演示通過(guò)low level API及higher level API進(jìn)行演示。
如果想在SpringBoot中使用WebSocket,首先需要引入spring-boot-starter-websocket依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
然后就可以配置相關(guān)信息,我們先通過(guò)low level API進(jìn)行演示。
首先需要自定義一個(gè)WebSocketNotificationHandler用于處理WebSocket 的連接及消息處理。我們只需要實(shí)現(xiàn)WebSocketHandler或子類TextWebSocketHandler BinaryWebSocketHandler。
public class WebSocketNotificationHandler extends TextWebSocketHandler {
private static final Logger log = getLogger(WebSocketNotificationHandler.class);
public static final Map<String, WebSocketSession> WS_HOLDER= new ConcurrentHashMap<>(); // (A)
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception { // (B)
String httpSessionId = (String) session.getAttributes().get(HttpSessionHandshakeInterceptor.HTTP_SESSION_ID_ATTR_NAME);
WS_HOLDER.put(httpSessionId, session);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
log.info("handleTextMessage={}", message.getPayload());
}
public static void usesWSPush(String sessionid, String content) { // (C)
WebSocketSession wssession = WebSocketNotificationHandler.WS_HOLDER.get(sessionid);
if (Objects.nonNull(wssession)) {
TextMessage textMessage = new TextMessage(content);
try {
wssession.sendMessage(textMessage);
} catch (IOException | IllegalStateException e) {
WebSocketNotificationHandler.SESSIONS.remove(sessionid);
}
}
}
}
- (A)
WS_HOLDER用于保存客戶端的WebSocket Session - (B) 重寫
afterConnectionEstablished()方法,當(dāng)連接建立之后,按sessionId將WebSocket Session保存至WS_HOLDER,用于后續(xù)向client推送消息。 - (C) 根據(jù)
sessionId獲取對(duì)應(yīng)WebSocket Session,并調(diào)用WebSocket Session的sendMessage(textMessage)方法向client發(fā)送消息。
使用@EnableWebSocket開(kāi)啟WebSocket,并實(shí)現(xiàn)WebSocketConfigurer進(jìn)行配置。
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
WebSocketNotificationHandler notificationHandler = new WebSocketNotificationHandler();
registry.addHandler(notificationHandler, "/ws-notification") // (A)
.addInterceptors(new HttpSessionHandshakeInterceptor()) // (B)
.withSockJS(); // (C)
}
}
- (A) 將我們自定義的
WebSocketNotificationHandler注冊(cè)至WebSocketHandlerRegistry. - (B)
HttpSessionHandshakeInterceptor是一個(gè)內(nèi)置的攔截器,用于傳遞HTTP會(huì)話屬性到WebSocket會(huì)話。當(dāng)然你也可以通過(guò)HandshakeInterceptor接口實(shí)現(xiàn)自己的攔截器。 - (C) 開(kāi)啟SockJS的支持,SockJS的目標(biāo)是讓應(yīng)用程序使用WebSocket API時(shí),當(dāng)發(fā)現(xiàn)瀏覽器不支持時(shí),無(wú)需要更改任何代碼,即可使用非WebSocket替代方案,盡可能的模擬WebSocket。關(guān)于SockJS的更多資料,可參考https://github.com/sockjs/sockjs-client
server端至此就基本大功告成,接下來(lái)我們來(lái)完善一下client端Download.html,其核心方法如下:
usesWSNotification: function () {
var tt = this;
var url = "http://localhost:8080/ws-notification";
var sock = new SockJS(url); // (A)
sock.onopen = function () {
console.log('open');
sock.send('test');
};
sock.onmessage = function (msg) { // (B)
var jsonStr = msg.data;
console.log('message', jsonStr);
var obj = JSON.parse(jsonStr);
var percent = obj.percent;
tt.wsMsg += 'WS 通知您:已下載完成' + percent + "%\r\n";
if (percent === 100) {
sock.close();
}
};
sock.onclose = function () {
console.log('ws close');
};
}
- (A) 首先需要在項(xiàng)目中引入SockJS Client , 并根據(jù)指定URL創(chuàng)建一個(gè)SockJS對(duì)象。
- (B) 當(dāng)有新消息時(shí)的
callback,我們可以在該方法中處理我們的消息。
效果演示:

4.2 STOMP over WebSocket (高級(jí)API)
WebSocket雖然定義了兩種類型的消息,文本和二進(jìn)制,但是針對(duì)消息的內(nèi)容沒(méi)有定義,為了更方便的處理消息,我們希望Client和Server都需要就某種協(xié)議達(dá)成一致,以幫助處理消息。那么,有沒(méi)有已經(jīng)造好的輪子呢?答案肯定是有的。這就是STOMP。
STOMP是一種簡(jiǎn)單的面向文本的消息傳遞協(xié)議,它其實(shí)是消息隊(duì)列的一種協(xié)議, 和AMQP,JMS是平級(jí)的。 只不過(guò)由于它的簡(jiǎn)單性恰巧可以用于定義WS的消息體格式。雖然STOMP是面向文本的協(xié)議,但消息的內(nèi)容也可以是二進(jìn)制數(shù)據(jù)。同時(shí)STOMP 可已使用任何可靠的雙向流網(wǎng)絡(luò)協(xié)議,如TCP和WebSocket,目前很多服務(wù)端消息隊(duì)列都已經(jīng)支持了STOMP, 比如RabbitMQ, ActiveMQ等。
它結(jié)構(gòu)是一種基于幀的協(xié)議,一幀由一個(gè)命令,一組可選的Header和一個(gè)可選的Body組成。
COMMAND
header1:value1
header2:value2
Body^@
客戶端可以使用SEND或SUBSCRIBE命令發(fā)送或訂閱消息。 通過(guò)destination標(biāo)記述消息應(yīng)由誰(shuí)來(lái)接收處理,形成了類似于MQ的發(fā)布訂閱機(jī)制。

STOMP的優(yōu)勢(shì)也非常明顯,即:
- 不需要?jiǎng)?chuàng)建自定義消息格式
- 我們可以使用現(xiàn)有的stomp.js客戶端
- 可以實(shí)現(xiàn)消息路由及廣播
- 可以使用第三方成熟的消息代理中間件,如RabbitMQ, ActiveMQ等
最重要的是,Spring STOMP 為我們提供了能夠像Spring MVC一樣的編程模型,減少了我們的學(xué)習(xí)成本。
下面將我們的DEMO稍作調(diào)整,使用Spring STOMP來(lái)實(shí)現(xiàn)消息推送,在本例中我們使用SimpleBroker模式,我們的應(yīng)用將會(huì)內(nèi)置一個(gè)STOMP Broker,將所有信息保存至內(nèi)存中。

具體代碼如下:
@Configuration
@EnableWebSocketMessageBroker // (A)
public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws-stomp-notification")
.addInterceptors(httpSessionHandshakeInterceptor()) // (B)
.setHandshakeHandler(httpSessionHandshakeHandler()) // (C)
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app") // (D)
.enableSimpleBroker("/topic", "/queue"); // (E)
}
@Bean
public HttpSessionHandshakeInterceptor httpSessionHandshakeInterceptor() {
return new HttpSessionHandshakeInterceptor();
}
@Bean
public HttpSessionHandshakeHandler httpSessionHandshakeHandler() {
return new HttpSessionHandshakeHandler();
}
}
- (A) 使用
@EnableWebSocketMessageBroker注解開(kāi)啟支持STOMP - (B) 創(chuàng)建一個(gè)攔截器,用于傳遞HTTP會(huì)話屬性到WebSocket會(huì)話。
- (C) 配置一個(gè)自定義的
HttpSessionHandshakeHandler,其主要作用是按sessionId標(biāo)記識(shí)別連接。 - (D) 設(shè)置消息處理器路由前綴,當(dāng)消息的
destination已/app開(kāi)頭時(shí),將會(huì)把該消息路由到server端的對(duì)應(yīng)的消息處理方法中。(在本例中無(wú)實(shí)際意義) - (E) 設(shè)置客戶端訂閱消息的路徑前綴
HttpSessionHandshakeHandler代碼如下:
public class HttpSessionHandshakeHandler extends DefaultHandshakeHandler {
@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
String sessionId = (String) attributes.get(HttpSessionHandshakeInterceptor.HTTP_SESSION_ID_ATTR_NAME);
return new HttpSessionPrincipal(sessionId);
}
}
當(dāng)我們需要向client發(fā)送消息時(shí),只需要注入SimpMessagingTemplate對(duì)象即可,是不是感覺(jué)非常熟悉?! 沒(méi)錯(cuò),這種Template模式和我們?nèi)粘J褂玫?code>RestTemplate JDBCTemplate是一樣的。
我們只需要調(diào)用SimpMessagingTemplate的convertAndSendToUser()方法即可向?qū)?yīng)用戶發(fā)送消息了。
private void usesStompPush(String sessionid, String content) {
String destination = "/queue/download-notification";
messagingTemplate.convertAndSendToUser(sessionid, destination, content);
}
在瀏覽器端,client可以使用stomp.js和sockjs-client進(jìn)行如下連接:
usesStompNotification: function () {
var tt = this;
var url = "http://localhost:8080/ws-stomp-notification";
// 公共topic
// var notificationTopic = "/topic/download-notification";
// 點(diǎn)對(duì)點(diǎn)廣播
var notificationTopic = "/user/queue/download-notification"; // (A)
var socket = new SockJS(url);
var stompClient = Stomp.over(socket);
stompClient.connect({}, function (frame) {
console.log("STOMP connection successful");
stompClient.subscribe(notificationTopic, function (msg) { // (B)
var jsonStr = msg.body;
var obj = JSON.parse(jsonStr);
var percent = obj.percent;
tt.stompMsg += 'STOMP 通知您:已下載完成' + percent + "%\r\n";
if (percent === 100) {
stompClient.disconnect()
}
});
}, function (error) {
console.log("STOMP protocol error " + error)
})
}
- (A) 如果想針對(duì)特定用戶接收消息,我們需要以
/user/為前綴,Spring STOMP會(huì)把以/user/為前綴的消息交給UserDestinationMessageHandler進(jìn)行處理并發(fā)給特定的用戶,當(dāng)然這個(gè)/user/是可以通過(guò)WebSocketBrokerConfig進(jìn)行個(gè)性化配置的,為了簡(jiǎn)單起見(jiàn),我們這里就使用默認(rèn)配置,所以我們的topic url就是/user/queue/download-notification。 - (B) 設(shè)置
stompClient消息處理callback進(jìn)行消息處理。
效果演示:

5 總結(jié)
在文中為大家簡(jiǎn)單講解了幾種常用的消息推送方案,并通過(guò)一個(gè)下載案例重點(diǎn)演示了SSE及WebSocket這兩種server push模式的消息推送。當(dāng)然還有很多細(xì)節(jié)并沒(méi)有在文中說(shuō)明,建議大家下載源碼對(duì)照參考。

相比較這幾種模式,小編認(rèn)為如果我們的需求僅僅是向客戶端推送消息,那么使用SSE的性價(jià)比更高一些,Long-Polling次之。使用WebSocket有一種殺雞用牛刀的感覺(jué),并且給我們系統(tǒng)也帶來(lái)了更多的復(fù)雜性,得不償失,所以不太推薦。而Polling雖然實(shí)現(xiàn)方式最簡(jiǎn)單且兼容性最強(qiáng),但是其效率過(guò)低,所以不建議使用。當(dāng)然如果您有其他見(jiàn)解,歡迎留言討論交流。
文中示例源碼:https://github.com/leven-space/SpringBootNotification.git
如果您覺(jué)得這篇文章有用,請(qǐng)留下您的小????,我是一枚Java小學(xué)生,歡迎大家吐槽留言。