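I. Why use WebSocket?
Suppose the client needs to be aware of changes in server-side state (for example real-time stock quotes or the number of remaining train tickets). With the traditional HTTP approach, the client has to ask the server at regular intervals whether there is new data, and the server returns any changed data in the response.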
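The earliest approach was polling (short polling): start a timer and send an HTTP request to the server at a fixed interval (say every 3 s). This has two drawbacks. First, it performs poorly: the server state may change at a single moment and then stay unchanged for a while, so most requests sent during that period are wasted (they return no data). Second, it is not real-time enough: with a 3 s polling interval, a change on the server may go unnoticed for up to 3 s instead of being seen immediately (you can shorten the interval, but that costs even more resources).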
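Later, polling was improved into long polling. The difference is this: with short polling, the server returns null immediately when there is no data; with long polling, the server does not respond right away when there is no data but holds the request open. If data becomes available while it waits, the data is returned; otherwise the response times out and the request ends. Compared with polling, this reduces the number of requests, but drawbacks remain. First, every update still needs a full round trip (the client sends a request and the server responds), so it is still not real-time enough. Second, when each update carries little data, network utilization is low, because the HTTP headers (General Headers + Request Headers) take up too large a share of each packet.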
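To make the difference concrete, here is a minimal in-memory sketch of the two styles. It does not involve real HTTP; the class name PollingDemo and its methods are made up for illustration, and a BlockingQueue stands in for "data the server has not yet delivered".

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy model of short polling versus long polling (hypothetical names, no real HTTP). */
final class PollingDemo {
    // Updates produced on the "server" side that the client has not fetched yet.
    private final BlockingQueue<String> updates = new LinkedBlockingQueue<>();

    /** Server-side state change: make a new value available. */
    void publish(String update) {
        updates.add(update);
    }

    /** Short polling: answer immediately; null means "nothing new". */
    String shortPoll() {
        return updates.poll();
    }

    /** Long polling: hold the request until data arrives or the timeout expires (then null). */
    String longPoll(long timeoutMillis) {
        try {
            return updates.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }
}
```

With short polling the client keeps calling shortPoll() on a timer and mostly gets null back; with long polling a single longPoll(30000) call returns as soon as publish() is invoked, but the client still has to issue a new request after every answer.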
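Because of these problems, the WebSocket protocol was later introduced. It runs on top of TCP and enables two-way communication between server and client. The handshake uses HTTP, after which the connection is upgraded to the WebSocket protocol. The figure below shows a WebSocket connection being established, transferring data, and closing, as captured with Wireshark.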
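One detail of the HTTP upgrade handshake is easy to show in code: per RFC 6455, the server proves it understood the upgrade by hashing the client's Sec-WebSocket-Key together with a fixed GUID and returning the result in Sec-WebSocket-Accept. The sketch below is illustrative only; the class and method names are made up, and in a Spring Boot project the framework does this for you.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Computes the Sec-WebSocket-Accept value for a given Sec-WebSocket-Key (hypothetical helper). */
final class HandshakeKey {
    // Fixed GUID defined by RFC 6455 for the opening handshake.
    private static final String WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    static String acceptFor(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            // accept = base64( SHA-1( key + GUID ) )
            byte[] digest = sha1.digest((secWebSocketKey + WS_GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 is not available", e);
        }
    }
}
```

For the sample key used in RFC 6455, HandshakeKey.acceptFor("dGhlIHNhbXBsZSBub25jZQ==") yields "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=".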


WebSocket uses HTTP for the handshake; once the connection is established, data is exchanged in data frames with the following format:
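As a rough illustration of the frame layout defined in RFC 6455 (FIN bit and opcode in the first byte, mask bit and payload length in the second, optional extended length, optional 4-byte masking key, then the payload), here is a small decoder sketch. The names FrameDecoder and Frame are made up for this example; it handles a single complete frame and ignores fragmentation and the reserved bits.

```java
import java.nio.charset.StandardCharsets;

/** Decodes one complete WebSocket frame from a byte array (illustrative sketch only). */
final class FrameDecoder {

    /** Decoded view of a frame; the payload is already unmasked. */
    static final class Frame {
        final boolean fin;
        final int opcode;
        final byte[] payload;

        Frame(boolean fin, int opcode, byte[] payload) {
            this.fin = fin;
            this.opcode = opcode;
            this.payload = payload;
        }

        String text() {
            return new String(payload, StandardCharsets.UTF_8);
        }
    }

    static Frame decode(byte[] raw) {
        boolean fin = (raw[0] & 0x80) != 0;    // final fragment flag
        int opcode = raw[0] & 0x0F;            // 0x1 text, 0x2 binary, 0x8 close, 0x9 ping, 0xA pong
        boolean masked = (raw[1] & 0x80) != 0; // client-to-server frames are masked
        long length = raw[1] & 0x7F;
        int offset = 2;
        if (length == 126) {                   // 16-bit extended payload length
            length = ((raw[2] & 0xFF) << 8) | (raw[3] & 0xFF);
            offset = 4;
        } else if (length == 127) {            // 64-bit extended payload length
            length = 0;
            for (int i = 2; i < 10; i++) {
                length = (length << 8) | (raw[i] & 0xFF);
            }
            offset = 10;
        }
        byte[] mask = new byte[4];
        if (masked) {
            System.arraycopy(raw, offset, mask, 0, 4);
            offset += 4;
        }
        if (length < 0 || length > raw.length - offset) {
            throw new IllegalArgumentException("truncated or oversized frame");
        }
        byte[] payload = new byte[(int) length];
        for (int i = 0; i < payload.length; i++) {
            byte b = raw[offset + i];
            payload[i] = masked ? (byte) (b ^ mask[i % 4]) : b; // unmask with the rotating key
        }
        return new Frame(fin, opcode, payload);
    }
}
```

Feeding it the masked text frame from the RFC examples (0x81 0x85 0x37 0xfa 0x21 0x3d 0x7f 0x9f 0x4d 0x51 0x58) gives a final text frame whose payload is "Hello".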

II. Using WebSocket in a Spring Boot project
1. Maven dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2. Create the handshake interceptor
package com.leaf.app.user.service.websocket;

import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;

import com.leaf.app.common.passport.dto.TokenDTO;
import com.leaf.app.user.service.passport.service.TokenService;

@Component
public class HandShake implements HandshakeInterceptor {

    @Autowired
    private TokenService tokenService;

    private static final Logger logger = LoggerFactory.getLogger(HandShake.class);

    // Authenticate the token before the connection is established
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
            Map<String, Object> attributes) throws Exception {
        logger.debug("begin handshake, url: " + request.getURI());
        Map<String, String> parameterMap = parseParameterMap(request.getURI().getQuery());
        String token = parameterMap.get("token");
        // Reject the handshake outright when no token was supplied
        if (token == null || token.isEmpty()) {
            logger.debug("handshake auth failed (missing token), url: " + request.getURI());
            return false;
        }
        // Authenticate the token
        TokenDTO tokenDTO = tokenService.resolveToken(token);
        if (tokenService.checkToken(tokenDTO)) {
            // Authentication passed: store the current uid in the session attributes
            attributes.put("uid", tokenDTO.getUid());
            return true;
        } else {
            logger.debug("handshake auth failed, url: " + request.getURI());
            return false;
        }
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
            Exception exception) {
        logger.debug("**********afterHandshake************");
    }

    private Map<String, String> parseParameterMap(String queryString) {
        Map<String, String> parameterMap = new HashMap<>();
        // getQuery() returns null when the URL has no query string
        if (queryString == null) {
            return parameterMap;
        }
        String[] parameters = queryString.split("&");
        for (String parameter : parameters) {
            // Split on the first '=' only, so values that contain '=' are not dropped or truncated
            String[] paramPair = parameter.split("=", 2);
            if (paramPair.length == 2) {
                parameterMap.put(paramPair[0], paramPair[1]);
            }
        }
        return parameterMap;
    }
}
Note: the handshake arrives as an HTTP request, so the token is parsed from the URL query parameters and then authenticated.
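Since the token travels in the URL, the query parsing deserves a little care: the query string can be absent, a value can contain '=' (Base64 padding, for instance), and values may be percent-encoded. The following standalone parser is a sketch under those assumptions; the class name QueryParams is hypothetical, and it expects the raw (still encoded) query string, for example the result of URI.getRawQuery().

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.LinkedHashMap;
import java.util.Map;

/** Parses a raw URL query string into a map (hypothetical helper, not part of the project). */
final class QueryParams {

    static Map<String, String> parse(String rawQuery) {
        Map<String, String> params = new LinkedHashMap<>();
        if (rawQuery == null || rawQuery.isEmpty()) {
            return params; // no query string at all
        }
        for (String pair : rawQuery.split("&")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                continue; // skip entries without a name or without '='
            }
            // Split on the first '=' only, then percent-decode both sides
            params.put(decode(pair.substring(0, eq)), decode(pair.substring(eq + 1)));
        }
        return params;
    }

    private static String decode(String value) {
        try {
            return URLDecoder.decode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }
}
```

Note that URLDecoder follows form-encoding rules and turns '+' into a space, so a client sending a Base64 token should percent-encode it first.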
3. WebSocket configuration class
package com.leaf.app.user.service.websocket.config;

import javax.annotation.Resource;

import com.leaf.app.user.service.websocket.HandShake;
import com.leaf.app.user.service.websocket.MyWebSocketHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@EnableWebSocket
@Configuration
public class WebSocketConfig extends WebMvcConfigurerAdapter implements WebSocketConfigurer {

    @Resource
    MyWebSocketHandler handler;

    @Autowired
    HandShake handShake;

    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(handler, "/v1/ws").addInterceptors(handShake).setAllowedOrigins("*");
        registry.addHandler(handler, "/v1/ws/sockjs").addInterceptors(handShake).setAllowedOrigins("*").withSockJS();
    }
}
Note: setAllowedOrigins is there mainly to fix the 403 error on WebSocket connections.
4. WebSocket handler class
package com.leaf.app.user.service.websocket;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.PingMessage;
import org.springframework.web.socket.PongMessage;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;

import com.leaf.app.common.constant.VersionConstant;
import com.leaf.app.common.user.activity.UserActivityMessage;
import com.leaf.app.user.service.activity.service.ChatService;
import com.leaf.app.user.service.sync.dto.UnreadReminderDTO;
import com.leaf.app.user.service.sync.dto.UnreadReminderListDTO;
import com.leaf.app.user.service.sync.service.MessageService;
import com.leaf.app.user.service.utils.MixUtils;
import com.leaf.app.user.service.websocket.constant.MessageType;
import com.leaf.app.user.service.websocket.entity.ChatMessage;
import com.leaf.app.user.service.websocket.entity.Message;
import com.leaf.app.user.service.websocket.util.SpringContextUtil;
import com.leaf.shared.util.JSONUtils;

@Component
public class MyWebSocketHandler implements WebSocketHandler {

    @Autowired
    private SpringContextUtil springContextUtil;

    private static final Logger logger = LoggerFactory.getLogger(MyWebSocketHandler.class);

    public static final Map<Long, WebSocketSession> userSocketSessionMap;

    static {
        userSocketSessionMap = new ConcurrentHashMap<Long, WebSocketSession>();
    }

    public static Map<Long, WebSocketSession> getUsersocketsessionmap() {
        return userSocketSessionMap;
    }

    /**
     * Called after the connection has been established.
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        Long uid = (Long) session.getAttributes().get("uid");
        logger.debug("user id: " + uid + " established websocket session.");
        // Always register the newest session, so a reconnect replaces a stale entry
        // instead of leaving the user mapped to the old (possibly dead) session.
        userSocketSessionMap.put(uid, session);
        // Push unread messages once the connection is established
        MessageService messageService = springContextUtil.getBean(MessageService.class);
        UnreadReminderListDTO unreadReminderListDTO = messageService.getUserUnreadReminders(uid);
        for (UnreadReminderDTO unreadReminderDTO : unreadReminderListDTO.getData()) {
            Message message = new Message();
            message.setMessageId(unreadReminderDTO.getId());
            message.setData(JSONUtils.toJSONString(unreadReminderDTO));
            message.setTimestamp(unreadReminderDTO.getTimestamp());
            message.setType(MessageType.UNREAD_REMINDER.getCode() + "");
            message.setVersion(VersionConstant.V1);
            // Pass the String directly; getBytes() would use the platform default charset
            session.sendMessage(new TextMessage(JSONUtils.toJSONString(message)));
        }
    }

    /**
     * Message handling: messages sent by the client through the WebSocket API arrive here
     * and are processed according to their type.
     */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        Long uid = (Long) session.getAttributes().get("uid");
        // If this is a ping frame, reply with a pong frame
        if (message instanceof PingMessage) {
            logger.debug("user :" + uid + " is keep alive");
            ByteBuffer byteBuffer = ByteBuffer.wrap("OK".getBytes());
            session.sendMessage(new PongMessage(byteBuffer));
        } else {
            String clientMessage = message.getPayload().toString();
            if (clientMessage.length() == 0) {
                return;
            } else {
                Message bizMessage = JSONUtils.parseObject(clientMessage, Message.class);
                logger.debug("receive client data:" + clientMessage + ", message type:"
                        + MessageType.getCHName(Integer.parseInt(bizMessage.getType())));
                // TODO handle business logic according to the message type
                if (bizMessage.getType().equals(MessageType.CLIENT_ACK.getCode() + "")) {
                    // The client acknowledged a pushed message: clear the unread reminder
                    MessageService messageService = springContextUtil.getBean(MessageService.class);
                    messageService.clearUnreadReminder(bizMessage.getMessageId());
                }
                if (bizMessage.getType().equals(MessageType.CHAT_MESSAGE.getCode() + "")) {
                    ChatMessage chatMessage = JSONUtils.parseObject(bizMessage.getData(), ChatMessage.class);
                    ChatService chatService = springContextUtil.getBean(ChatService.class);
                    UserActivityMessage userActivityMessage = new UserActivityMessage();
                    userActivityMessage.setFromUserId(Long.parseLong(chatMessage.getFromUserId()));
                    userActivityMessage.setToUserId(Long.parseLong(chatMessage.getToUserId()));
                    userActivityMessage.setOpType(10);
                    chatService.sendChatAPNSMessage(userActivityMessage);
                    // Send the server acknowledgment packet back to the client
                    Message ackMessage = new Message();
                    ackMessage.setMessageId(bizMessage.getMessageId());
                    ackMessage.setType(MessageType.SERVER_ACK.getCode() + "");
                    Map<String, String> dataMap = new HashMap<>();
                    dataMap.put("originType", bizMessage.getType());
                    ackMessage.setData(JSONUtils.toJSONString(dataMap));
                    session.sendMessage(new TextMessage(JSONUtils.toJSONString(ackMessage)));
                }
                // Send a "received" ACK packet
                // session.sendMessage(new TextMessage(("server received data:" + clientMessage).getBytes()));
            }
        }
    }

    /**
     * Handles message transport errors.
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        // Keep the boxed type: casting to a primitive long would throw if "uid" is missing
        Long userId = (Long) session.getAttributes().get("uid");
        if (session.isOpen()) {
            session.close();
        }
        Iterator<Entry<Long, WebSocketSession>> it = userSocketSessionMap.entrySet().iterator();
        // Remove the socket session
        while (it.hasNext()) {
            Entry<Long, WebSocketSession> entry = it.next();
            if (entry.getValue().getId().equals(session.getId())) {
                userSocketSessionMap.remove(entry.getKey());
                logger.debug("user : " + userId + " has close websocket!");
                break;
            }
        }
    }

    /**
     * Called after the connection has been closed.
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        Iterator<Entry<Long, WebSocketSession>> it = userSocketSessionMap.entrySet().iterator();
        // Remove the socket session
        while (it.hasNext()) {
            Entry<Long, WebSocketSession> entry = it.next();
            if (entry.getValue().getId().equals(session.getId())) {
                userSocketSessionMap.remove(entry.getKey());
                break;
            }
        }
    }

    @Override
    public boolean supportsPartialMessages() {
        return false;
    }

    /**
     * Sends a message to all online users.
     *
     * @param message
     * @throws IOException
     */
    public void broadcast(final TextMessage message) throws IOException {
        Iterator<Entry<Long, WebSocketSession>> it = userSocketSessionMap.entrySet().iterator();
        // Send to each session on its own thread
        while (it.hasNext()) {
            final Entry<Long, WebSocketSession> entry = it.next();
            if (entry.getValue().isOpen()) {
                // entry.getValue().sendMessage(message);
                new Thread(new Runnable() {
                    public void run() {
                        try {
                            if (entry.getValue().isOpen()) {
                                entry.getValue().sendMessage(message);
                            }
                        } catch (IOException e) {
                            logger.error("broadcast to user " + entry.getKey() + " failed", e);
                        }
                    }
                }).start();
            }
        }
    }

    /**
     * Sends a message to a single user.
     *
     * @param uid
     * @param message
     * @throws IOException
     */
    public void sendMessageToUser(Long uid, Message message) throws IOException {
        WebSocketSession session = userSocketSessionMap.get(uid);
        String hostName = MixUtils.getServerInfo().getHostName();
        if (session != null && session.isOpen()) {
            logger.debug("Found user :" + uid + " websocket session, server info:" + hostName + ", message:" + message.toString());
            TextMessage textMessage = new TextMessage(JSONUtils.toJSONString(message));
            session.sendMessage(textMessage);
        } else {
            logger.debug("Not found user :" + uid + " websocket session, server info:" + hostName + ", message:" + message.toString());
        }
    }
}
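The handler above keeps one session per uid in a static map. One thing worth guarding against is a reconnect race: a user opens a new connection before the old one has been reported as closed, and the late close event then removes the new session. The sketch below shows one way to avoid that with ConcurrentMap.remove(key, value); SessionRegistry is a hypothetical, framework-free class written only for illustration, with the session type left generic.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Per-node registry mapping a user id to that user's current session (illustrative sketch). */
final class SessionRegistry<S> {
    private final ConcurrentMap<Long, S> sessions = new ConcurrentHashMap<>();

    /** Registers the newest session and returns the one it replaced, or null. */
    S register(Long uid, S session) {
        return sessions.put(uid, session);
    }

    /** Removes the mapping only if it still points at the given session. */
    boolean unregister(Long uid, S session) {
        return sessions.remove(uid, session);
    }

    /** Returns the session held on this node, or null if the user is connected elsewhere or offline. */
    S find(Long uid) {
        return sessions.get(uid);
    }
}
```

With this shape, a close callback for the old session calls unregister(uid, oldSession), which is a no-op once the newer session has been registered; the caller can also close the replaced session returned by register.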
package com.leaf.app.user.service.websocket.util;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * Holds the Spring container so that other beans defined in it can be looked up.
 */
@Component
public class SpringContextUtil implements ApplicationContextAware {

    // Spring application context, set through the ApplicationContextAware callback below
    private ApplicationContext applicationContext;

    /**
     * Callback of the ApplicationContextAware interface; stores the application context.
     *
     * @param applicationContext
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    /**
     * @return ApplicationContext
     */
    public ApplicationContext getApplicationContext() {
        return this.applicationContext;
    }

    /**
     * Looks up a bean by name.
     *
     * @param name
     * @return Object the bean instance registered under the given name
     * @throws BeansException
     */
    public Object getBean(String name) throws BeansException {
        return applicationContext.getBean(name);
    }

    /**
     * Looks up a bean by type.
     */
    public <T> T getBean(Class<T> clazz) {
        return applicationContext.getBean(clazz);
    }
}
Note: beans are fetched through SpringContextUtil mainly to work around a circular dependency.
5. Message push consumer
// Copyright 2017 www.chinaleaf.net All rights reserved.
package com.leaf.app.user.service.websocket.consumer;

import java.io.IOException;
import java.util.Objects;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.leaf.app.api.common.Constant;
import com.leaf.app.common.constant.VersionConstant;
import com.leaf.app.common.mq.MQReceiveMessage;
import com.leaf.app.common.mq.consumer.AbstractONSConsumer;
import com.leaf.app.common.mq.consumer.ConsumerContext;
import com.leaf.app.common.user.activity.UserActivityMessage;
import com.leaf.app.user.service.sync.dto.UnreadReminderDTO;
import com.leaf.app.user.service.sync.service.MessageService;
import com.leaf.app.user.service.websocket.MyWebSocketHandler;
import com.leaf.app.user.service.websocket.constant.MessageType;
import com.leaf.app.user.service.websocket.entity.Message;
import com.leaf.shared.util.JSONUtils;

/**
 * @author xiao.xianming
 * 2017-11-14 16:11:55
 */
@Component
public class WebSocketMessagePushConsumer extends AbstractONSConsumer {

    @Value("${ons.consumer.cid.websocket.push}")
    private String consumerId;

    @Value("${ons.consumer.subscribe.topic.websocket.push}")
    private String consumeTopic;

    @Value("${ons.consumer.subscribe.topic.tags.websocket.push}")
    private String consumeTopicTags;

    @Autowired
    private MyWebSocketHandler webSocketHandler;

    @Autowired
    private MessageService messageService;

    @Override
    protected boolean consumeMessage(MQReceiveMessage msg) {
        UserActivityMessage userActivityMessage = JSONUtils.parseObject(msg.getMessageBody(),
                UserActivityMessage.class);
        // Do not push the user's own activity (for example liking their own content).
        // Objects.equals compares by value, which stays correct even if the ids are boxed Long objects.
        if (Objects.equals(userActivityMessage.getFromUserId(), userActivityMessage.getToUserId())) {
            return true;
        }
        // Push the message over WebSocket
        UnreadReminderDTO unreadReminderDTO = messageService.convertUserActivityMessage2UnreadReminder(userActivityMessage);
        if (unreadReminderDTO != null) {
            Message message = new Message();
            message.setMessageId(unreadReminderDTO.getId());
            message.setType(MessageType.UNREAD_REMINDER.getCode() + "");
            message.setTimestamp(unreadReminderDTO.getTimestamp());
            message.setVersion(VersionConstant.V1);
            message.setData(JSONUtils.toJSONString(unreadReminderDTO));
            try {
                this.webSocketHandler.sendMessageToUser(Long.valueOf(userActivityMessage.getToUserId()), message);
            } catch (IOException e) {
                // Log with the stack trace instead of printing it to stderr
                logger.error(e.getMessage(), e);
            }
        }
        return true;
    }

    @Override
    protected void initConsumerContext() {
        // Broadcast consumption: every application server receives every message
        boolean flBroadcastConsume = true;
        this.consumeContext = new ConsumerContext(consumerId, consumeTopic, consumeTopicTags, flBroadcastConsume);
    }
}
6. nginx configuration
#user nobody;
worker_processes 1;
#error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info;
#pid logs/nginx.pid;
events {
    worker_connections 1024;
}
http {
    include mime.types;
    # default_type accepts a single MIME type
    default_type application/octet-stream;
    #log_format main '$remote_addr - $remote_user [$time_local] "$request" '
    # '$status $body_bytes_sent "$http_referer" '
    # '"$http_user_agent" "$http_x_forwarded_for"';
    #access_log logs/access.log main;
    sendfile on;
    #tcp_nopush on;
    #keepalive_timeout 0;
    keepalive_timeout 65;
    upstream app_servers {
        server 116.62.206.5:8083;
        server 118.31.15.121:8083;
    }
    upstream websocket_servers {
        server 116.62.206.5:5555;
        server 118.31.15.121:5555;
    }
    # Send "Connection: upgrade" upstream only when the client asked for an upgrade
    map $http_upgrade $connection_upgrade {
        default upgrade;
        '' close;
    }
    #gzip on;
    server {
        listen 80;
        listen 443 ssl;
        server_name www.chinaleaf.net;
        ssl_certificate /usr/local/nginx/ssl/s.crt;
        ssl_certificate_key /usr/local/nginx/ssl/nginx.key;
        #access_log logs/host.access.log main;
        location / {
            root html;
            index index.html index.htm;
        }
        location /v1 {
            proxy_pass http://app_servers;
            #proxy_http_version 1.1;
            #proxy_set_header Upgrade $http_upgrade;
            #proxy_set_header Connection $connection_upgrade;
        }
        # WebSocket traffic: HTTP/1.1 plus the Upgrade/Connection headers are required
        location /v1/ws {
            proxy_pass http://websocket_servers;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection $connection_upgrade;
        }
        #error_page 404 /404.html;
        # redirect server error pages to the static page /50x.html
        #
        error_page 500 502 503 504 /50x.html;
        location = /50x.html {
            root html;
        }
        # proxy the PHP scripts to Apache listening on 127.0.0.1:80
        #
        #location ~ \.php$ {
        #    proxy_pass http://127.0.0.1;
        #}
        # pass the PHP scripts to FastCGI server listening on 127.0.0.1:9000
        #
        #location ~ \.php$ {
        #    root html;
        #    fastcgi_pass 127.0.0.1:9000;
        #    fastcgi_index index.php;
        #    fastcgi_param SCRIPT_FILENAME /scripts$fastcgi_script_name;
        #    include fastcgi_params;
        #}
        # deny access to .htaccess files, if Apache's document root
        # concurs with nginx's one
        #
        #location ~ /\.ht {
        #    deny all;
        #}
    }
    # another virtual host using mix of IP-, name-, and port-based configuration
    #
    #server {
    #    listen 8000;
    #    listen somename:8080;
    #    server_name somename alias another.alias;
    #    location / {
    #        root html;
    #        index index.html index.htm;
    #    }
    #}
    # HTTPS server
    #
    #server {
    #    listen 443 ssl;
    #    server_name localhost;
    #    ssl_certificate cert.pem;
    #    ssl_certificate_key cert.key;
    #    ssl_session_cache shared:SSL:1m;
    #    ssl_session_timeout 5m;
    #    ssl_ciphers HIGH:!aNULL:!MD5;
    #    ssl_prefer_server_ciphers on;
    #    location / {
    #        root html;
    #        index index.html index.htm;
    #    }
    #}
}
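Appendix: notes on front-end/back-end interaction
1. Message acknowledgment and deduplication: the client must reply with an ACK for every message the server pushes. Messages that have not been ACKed are pushed again each time a WebSocket connection is established, and the client deduplicates them by messageId.
2. Reconnection and keep-alive: the client sends a ping frame every 20 s and the server replies with a pong frame. The client decides from the result whether it needs to reconnect (it also reconnects whenever the network comes back after an outage).
3. WebSocket authentication: the server authenticates the token carried by the WebSocket request during the handshake.
4. WebSocket clustering: nginx serves as the proxy entry point for clients.
5. Active message push (likes, follows, and similar events): events are sent through ONS to every application server, and each server consumes them in broadcast mode. If the user's WebSocket session lives on the local machine, that server pushes the message; otherwise it does nothing.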
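The acknowledgment and deduplication rule in item 1 can be sketched in a few lines. Everything here is hypothetical and in-memory: UnackedStore stands in for the server-side unread reminders, ClientInbox for the client, and messageId is assumed to be a String (the real project may use another type and a persistent store).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Server side: keeps pushed messages until the client acknowledges them (illustrative sketch). */
final class UnackedStore {
    // messageId -> payload, in push order
    private final Map<String, String> pending = new LinkedHashMap<>();

    synchronized void push(String messageId, String payload) {
        pending.put(messageId, payload);
    }

    /** Called when the client's ACK for this messageId arrives. */
    synchronized void ack(String messageId) {
        pending.remove(messageId);
    }

    /** Ids of everything that must be pushed again when the client (re)connects. */
    synchronized List<String> pendingIds() {
        return new ArrayList<>(pending.keySet());
    }
}

/** Client side: drops messages whose messageId has already been seen. */
final class ClientInbox {
    private final Set<String> seen = new HashSet<>();

    /** Returns true if the message is new and should be shown to the user. */
    boolean accept(String messageId) {
        return seen.add(messageId);
    }
}
```

If the client receives m1 and m2 but only its ACK for m1 reaches the server, the next connection replays m2, and ClientInbox.accept("m2") returns false so the user does not see it twice. A real client would also bound or expire the seen set.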