Goals:
1. Stable, reliable message delivery
2. Point-to-point messages
3. One-to-many messages
4. Broadcast messages
5. Node scale-out
6. Service registration and discovery
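Thoughts on each goal:
1. Stable, reliable delivery
Use Netty as the network framework and implement the WebSocket protocol (long-lived connections). If messages need to be persisted, write them to a database and have the receiver acknowledge each message.
2. Point-to-point messages
Bind an identity to each channel, and specify the message type in the message body.
3. One-to-many messages
Look up the groups a user belongs to and add the user's channel to the corresponding ChannelGroup.
4. Broadcast messages
Standalone mode: iterate over the channel map and send the message to every channel.
Multi-node mode: first publish the message to publish/subscribe middleware; every server receives the broadcast from the middleware and sends the message to all channels on its own node.
5. Node scale-out
Use the middleware's publish/subscribe mode: incoming messages are first delivered to the middleware, and each service node consumes them and forwards them to the connections on that node.
6. Service registration and discovery
Use Spring Cloud for service registration and discovery (that is, any Spring Cloud registry implementation, such as Nacos or Eureka).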
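The persist-then-acknowledge idea from item 1 can be sketched roughly as follows. This is only an illustration, not code from the project: `PendingStore` and its methods are hypothetical names, and an in-memory map stands in for the database.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for a database table of undelivered messages.
class PendingStore {
    // messageId -> payload, kept until the receiver confirms delivery
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    // Persist before sending, so the message survives a dropped connection.
    void save(String messageId, String payload) {
        pending.put(messageId, payload);
    }

    // Called when the receiver's acknowledgement arrives.
    // Returns false for an unknown or duplicate acknowledgement.
    boolean ack(String messageId) {
        return pending.remove(messageId) != null;
    }

    // Ids of messages still waiting for an acknowledgement,
    // for example to resend them after the client reconnects.
    List<String> unacknowledged() {
        return new ArrayList<>(pending.keySet());
    }
}
```

A sender would call `save` before writing to the channel and `ack` when the client's confirmation message comes back; anything left in `unacknowledged()` is a candidate for redelivery.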
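Implementation:
1. An event mechanism that handles business logic by message code. On startup, the service collects the CmdProcess implementations from the Spring context and dispatches events to them by code.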
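public interface CmdProcess {
    /**
     * Handle a received message
     * @param message message body
     * @param channel channel the message arrived on
     * @return response message body (null if there is no response)
     */
    Message handler(Message message, Channel channel);

    /**
     * Command code of this processor (must match the code carried in the message packet)
     * @return command code
     */
    Byte getCmdCode();
}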
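To make the dispatch-by-code idea concrete, here is a small dependency-free sketch. It is not the project's `ProcessManager`: `CodeHandler` and `CodeDispatcher` are hypothetical names, the payload is a plain String, and there is no Channel.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for CmdProcess: String payload, no Channel.
interface CodeHandler {
    String handle(String payload);
    Byte getCmdCode();
}

// Hypothetical registry that maps each command code to its handler.
class CodeDispatcher {
    private final Map<Byte, CodeHandler> handlers = new HashMap<>();

    CodeDispatcher(List<CodeHandler> found) {
        for (CodeHandler handler : found) {
            // Fail fast on duplicate codes instead of silently overwriting.
            if (handlers.putIfAbsent(handler.getCmdCode(), handler) != null) {
                throw new IllegalStateException("duplicate command code " + handler.getCmdCode());
            }
        }
    }

    // Returns the handler's response, or null when no handler is registered for the code.
    String dispatch(byte code, String payload) {
        CodeHandler handler = handlers.get(code);
        return handler == null ? null : handler.handle(payload);
    }
}
```

Rejecting duplicate codes at startup is a design choice of this sketch; whether the real registry does the same is not shown in this post.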

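2. A security authentication mechanism for connections (the event executed on the first handshake). A simple implementation is provided; define your own authentication implementation to suit your business.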
public abstract class AuthProcess {
    /**
     * Login event
     * @param username user name
     * @param password user password
     * @return IM user object
     */
    public abstract ImSession login(String username, String password);
}
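A custom implementation might look roughly like the sketch below. Everything here is hypothetical: `DemoSession` is a reduced stand-in for ImSession, `DemoAuthProcess` mirrors the shape of AuthProcess, the credential map replaces a real user store, and returning null for a rejected login is an assumption rather than documented behavior.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Reduced stand-in for ImSession: only carries the user id.
class DemoSession {
    final String userId;
    DemoSession(String userId) { this.userId = userId; }
}

// Mirrors the shape of AuthProcess, using the reduced session type.
abstract class DemoAuthProcess {
    abstract DemoSession login(String username, String password);
}

// Hypothetical implementation backed by an in-memory credential map.
// A real one would look up a password hash in a user store instead.
class InMemoryAuthProcess extends DemoAuthProcess {
    private final Map<String, String> credentials = new ConcurrentHashMap<>();

    void register(String username, String password) {
        credentials.put(username, password);
    }

    @Override
    DemoSession login(String username, String password) {
        if (username == null || password == null) {
            return null;
        }
        String expected = credentials.get(username);
        if (expected == null) {
            return null;
        }
        // Compare the bytes with MessageDigest.isEqual rather than String.equals.
        boolean matches = MessageDigest.isEqual(
                expected.getBytes(StandardCharsets.UTF_8),
                password.getBytes(StandardCharsets.UTF_8));
        return matches ? new DemoSession(username) : null;
    }
}
```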

3. Events executed during the channel connection lifecycle (client connection established, client connection closed)
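public interface LifeCycleEvent {
    /**
     * Bind the channel context
     * @param login session of the user who just logged in
     * @param ctx channel handler context of the connection
     */
    void bindContext(ImSession login, ChannelHandlerContext ctx);

    /**
     * Unbind the channel context
     * @param channel channel being closed
     */
    void cleanContext(Channel channel);
}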
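What bind and clean typically have to maintain is a two-way mapping between users and connections. The sketch below shows one way to do that; it is an assumption about the general approach, not the project's SessionContext. `SessionRegistry` is a hypothetical name, and the type parameter `C` stands in for Netty's Channel so the example has no dependencies.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry; C stands in for Netty's Channel.
class SessionRegistry<C> {
    private final Map<String, C> byUser = new ConcurrentHashMap<>();
    private final Map<C, String> byChannel = new ConcurrentHashMap<>();

    // Bind: remember which user owns the connection.
    // Returns the user's previous connection (or null) so the caller can close it.
    C bind(String userId, C channel) {
        C previous = byUser.put(userId, channel);
        if (previous != null) {
            byChannel.remove(previous);
        }
        byChannel.put(channel, userId);
        return previous;
    }

    // Clean: drop both mappings when the connection closes.
    void clean(C channel) {
        String userId = byChannel.remove(channel);
        if (userId != null) {
            // Remove only if the user has not already re-bound to a newer connection.
            byUser.remove(userId, channel);
        }
    }

    C channelOf(String userId) {
        return byUser.get(userId);
    }
}
```

The conditional `remove(userId, channel)` matters when a user reconnects before the old connection's close event fires: the late close must not evict the new connection.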

4. A reserved interface for message distribution in multi-node deployments (a configuration parameter controls whether multi-node mode is used)
public interface ImClusterTopic {
    /**
     * Publish a message
     */
    void publish(ClusterMessage message);

    /**
     * Subscribe to messages (broadcast mode)
     */
    void consumer();
}
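The broadcast semantics this interface expects from the middleware can be illustrated with an in-process stand-in. This is only a sketch: `InMemoryTopic` is a hypothetical class that replaces a real broker, and its `subscribe` method plays the role that `consumer()` would play against real middleware.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-process topic standing in for real publish/subscribe middleware.
class InMemoryTopic<M> {
    private final List<Consumer<M>> subscribers = new CopyOnWriteArrayList<>();

    // Each server node registers one subscriber.
    void subscribe(Consumer<M> node) {
        subscribers.add(node);
    }

    // Broadcast mode: every subscribed node receives every message.
    void publish(M message) {
        for (Consumer<M> node : subscribers) {
            node.accept(message);
        }
    }
}
```

The key property is that every node sees every message, as opposed to a work queue where each message goes to only one consumer. With a real broker, that usually means each node needs its own subscription or consumer group.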

5. Utility class for message distribution
package com.awy.common.ws.netty.toolkit;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.json.JSONUtil;
import com.awy.common.message.api.packets.Message;
import com.awy.common.ws.netty.cluster.ImClusterTopic;
import com.awy.common.ws.netty.context.GlobalContent;
import com.awy.common.ws.netty.context.SessionContext;
import com.awy.common.ws.netty.config.ImConfig;
import com.awy.common.ws.netty.model.ClusterMessage;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@Slf4j
public class ImSendUtil {
/**
* Send to a specific user
* @param userId user id
* @param message message body
*/
public static void sendUser(String userId, Message message){
if(isCluster()){
getImClusterTopic().publish(ClusterMessage.chatMessage(userId,message));
}else {
sendCurrentNodeUser(userId,message);
}
}
/**
* Send to a list of users
* @param userIds list of user ids
* @param message message body
*/
public static void sendUsers(List<String> userIds,Message message){
if(isCluster()){
getImClusterTopic().publish(ClusterMessage.chatsMessage(userIds,message));
}else {
sendCurrentNodeUsers(userIds,message);
}
}
/**
* Send to a specific group
* @param groupId group id
* @param message message body
*/
public static void sendGroup(String groupId,Message message){
if(isCluster()){
getImClusterTopic().publish(ClusterMessage.groupMessage(groupId,message));
}else {
sendCurrentNodeGroup(groupId,message);
}
}
/**
* Send to a list of groups
* @param groupIds list of group ids
* @param message message body
*/
public static void sendGroups(List<String> groupIds,Message message){
if(isCluster()){
getImClusterTopic().publish(ClusterMessage.groupsMessage(groupIds,message));
}else {
sendCurrentNodeGroups(groupIds,message);
}
}
/**
* Send to all users
* @param message message body
*/
public static void sendAll(Message message){
//if cluster
if(isCluster()){
getImClusterTopic().publish(ClusterMessage.noStateMessage(message));
}else {
//if standalone
sendCurrentNodeAllChannel(message);
}
}
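/**
* Send to a specific user on the current node
* @param userId user id
* @param message message body
*/
public static void sendCurrentNodeUser(String userId,Message message){
Channel channel = SessionContext.getChannel(userId);
if(channel == null){
return;
}
// getMessage returns null for a null message; never pass null to writeAndFlush
TextWebSocketFrame frame = getMessage(message);
if(frame != null){
channel.writeAndFlush(frame);
}
}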
/**
* Send to a list of users on the current node
* @param userIds list of user ids
* @param message message body
*/
public static void sendCurrentNodeUsers(List<String> userIds,Message message){
if(CollUtil.isNotEmpty(userIds)){
for (String userId : userIds) {
sendCurrentNodeUser(userId,message);
}
}
}
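/**
* Send to a specific group on the current node
* @param groupId group id
* @param message message body
*/
public static void sendCurrentNodeGroup(String groupId,Message message){
ChannelGroup channelGroup = SessionContext.getChannelGroup(groupId);
if(channelGroup == null){
return;
}
// getMessage returns null for a null message; never pass null to writeAndFlush
TextWebSocketFrame frame = getMessage(message);
if(frame != null){
channelGroup.writeAndFlush(frame);
}
}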
/**
* Send to a list of groups on the current node
* @param groupIds list of group ids
* @param message message body
*/
public static void sendCurrentNodeGroups(List<String> groupIds, Message message){
if(CollUtil.isNotEmpty(groupIds)){
for (String groupId : groupIds) {
sendCurrentNodeGroup(groupId,message);
}
}
}
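/**
* Send to all users on the current node
* @param message message body
*/
public static void sendCurrentNodeAllChannel(Message message){
if(message == null){
log.error(">>>>>>>>>>>> message can not be empty ");
return;
}
// Build a fresh frame per channel: each write releases the frame it is given
for (Channel channel : SessionContext.getAllChannel().values()){
channel.writeAndFlush(getMessage(message));
}
}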
/**
* Whether multi-node (cluster) mode is enabled
* @return true in cluster mode
*/
private static boolean isCluster(){
return ImConfig.getImConfig().getPropertiesConfig().isCluster();
}
/**
* Get the topic used to push messages between nodes
* @return cluster topic
*/
private static ImClusterTopic getImClusterTopic(){
return GlobalContent.getInstance().getImClusterTopic();
}
/**
* Build the WebSocket frame for a message
* @param message message body
* @return WebSocket text frame, or null if the message is null
*/
public static TextWebSocketFrame getMessage(Message message){
if(message == null){
log.error(">>>>>>>>>>>> message can not be empty ");
return null;
}
String result = JSONUtil.toJsonStr(message);
return new TextWebSocketFrame(result);
}
}
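In cluster mode the send methods above only publish; the actual delivery happens when each node consumes the broadcast and checks its own connections. The sketch below illustrates that consumer side under simplifying assumptions: `DemoNode` is a hypothetical class, "delivery" appends to an in-memory inbox instead of writing to a Channel, and a null target stands for "all users".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical node: delivering appends to a per-user inbox instead of writing to a Channel.
class DemoNode {
    // userId -> inbox, only for users connected to THIS node
    final Map<String, List<String>> localInbox = new ConcurrentHashMap<>();

    void connect(String userId) {
        localInbox.put(userId, new ArrayList<>());
    }

    // Called for every message broadcast by the middleware.
    // targetUserId == null means "all users"; otherwise deliver only if the user is local.
    // Returns the number of local deliveries.
    int onClusterMessage(String targetUserId, String payload) {
        if (targetUserId == null) {
            localInbox.values().forEach(inbox -> inbox.add(payload));
            return localInbox.size();
        }
        List<String> inbox = localInbox.get(targetUserId);
        if (inbox == null) {
            return 0; // the user is connected to another node: ignore
        }
        inbox.add(payload);
        return 1;
    }
}
```

This is why the sender never needs to know which node holds a given user: every node sees the message, and only the node with a matching local connection delivers it.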

6. WebSocket server implementation
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class WebSocketServer {
    /**
     * Whether SSL is enabled
     */
    private boolean ssl = false;
    // Listening port
    private int port;
    // WebSocket path prefix
    private String websocketPath;
    private ServerBootstrap serverBootstrap;
    private NioEventLoopGroup boss;
    private NioEventLoopGroup work;

    private WebSocketServer(){}

    public WebSocketServer(int port, String websocketPath, boolean ssl, AuthProcess authProcess, LifeCycleEvent lifeCycleEvent, ImClusterTopic imClusterTopic){
        this.ssl = ssl;
        this.port = port;
        this.websocketPath = websocketPath;
        // Fall back to the simple default implementations
        if(authProcess == null){
            authProcess = new SimpleAuthProcess();
        }
        if(lifeCycleEvent == null){
            lifeCycleEvent = new SimpleLifeCycleEvent();
        }
        // Populate the global context
        GlobalContent globalContent = GlobalContent.getInstance();
        globalContent.setAuthProcess(authProcess);
        globalContent.setImClusterTopic(imClusterTopic);
        globalContent.setLifeCycleEvent(lifeCycleEvent);
        // Boss/worker (main/sub) Reactor model
        serverBootstrap = new ServerBootstrap();
        boss = new NioEventLoopGroup(1);
        work = new NioEventLoopGroup();
        serverBootstrap.group(boss,work)
                .channel(NioServerSocketChannel.class)
                .childHandler(new WebSocketServerInitializer(getSslContext(),websocketPath));
    }

    // Returns null when SSL is disabled or the context cannot be created
    private SslContext getSslContext(){
        SslContext sslCtx = null;
        try{
            if (this.ssl) {
                // Self-signed certificate: suitable for testing only
                SelfSignedCertificate ssc = new SelfSignedCertificate();
                sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
            }
        }catch (Exception e){
            log.error("failed to create SSL context", e);
        }
        return sslCtx;
    }

    public void start(){
        try {
            serverBootstrap.bind(port).sync();
            log.info("WebSocket server listening at " +
                    (ssl ? "wss" : "ws") + "://127.0.0.1:" + port + websocketPath);
        } catch (Exception e){
            log.error("failed to start WebSocket server on port " + port, e);
        }
    }

    public void stop(){
        boss.shutdownGracefully();
        work.shutdownGracefully();
    }
}

7. Spring bootstrap class
public class NettyWebSocketStarter {
private Integer port;
private String websocketPath;
/**
* Whether SSL is enabled
*/
private boolean ssl = false;
private WebSocketServer server;
/**
* Authentication processor
*/
private AuthProcess authProcess;
private LifeCycleEvent lifeCycleEvent;
private ImClusterTopic imClusterTopic;
private NettyWebSocketStarter(){}
public NettyWebSocketStarter(AuthProcess authProcess){
this(authProcess,null,null);
}
public NettyWebSocketStarter(AuthProcess authProcess, LifeCycleEvent lifeCycleEvent, ImClusterTopic imClusterTopic){
this.authProcess = authProcess;
this.lifeCycleEvent = lifeCycleEvent;
this.imClusterTopic = imClusterTopic;
}
@PostConstruct
public void init(){
setAttributes(ImConfig.getImConfig().getPropertiesConfig());
initCmdProcess();
server = new WebSocketServer(port,websocketPath,ssl,authProcess,lifeCycleEvent,imClusterTopic);
server.start();
registerDiscovery(ImConfig.getImConfig().getPropertiesConfig());
}
private void setAttributes(ImPropertiesConfig propertiesConfig){
String packetScanPath = "";
if(propertiesConfig == null){
log.error(">>>>>>>> im config prefix: im.ws ");
log.error(">>>>>>>> im config attributes must not be empty ");
// Exit with a non-zero status so the failure is visible to the caller
System.exit(1);
}
if(propertiesConfig.isCluster() && this.imClusterTopic == null){
log.error(">>>>>>>> imClusterTopic must not be null in cluster mode");
System.exit(1);
}
this.port = propertiesConfig.getPort();
if(this.port == null){
this.port = getPort();
}
this.websocketPath = propertiesConfig.getWebsocketPath();
if(this.websocketPath == null || this.websocketPath.isEmpty()){
this.websocketPath = ImCommonConstant.DEFAULT_WEBSOCKET_PATH;
}
this.ssl = propertiesConfig.isSsl();
packetScanPath = propertiesConfig.getPacketScan();
if(packetScanPath == null || packetScanPath.isEmpty()){
log.error(">>>>>>>> im.ws.packetScan must not be empty ");
System.exit(1);
}
initPacket(packetScanPath);
}
private void registerDiscovery(ImPropertiesConfig propertiesConfig){
if(propertiesConfig.isRegister()){
Map<String, AbstractAutoServiceRegistration> serviceRegistrationMap = getApplicationContext().getBeansOfType(AbstractAutoServiceRegistration.class);
for (Map.Entry<String,AbstractAutoServiceRegistration> registrationEntry : serviceRegistrationMap.entrySet()){
registrationEntry.getValue().start();
}
}
}
private int getPort(){
int defaultPort = 8888;
return getPort(defaultPort);
}
// Probe upward from the given port until one can be bound
private int getPort(int port){
while (port <= 65535) {
try (ServerSocket socket = new ServerSocket(port)) {
return port;
} catch (IOException e) {
// Port is in use: try the next one
port++;
}
}
throw new IllegalStateException("no free port available");
}
private void initCmdProcess(){
try{
// Collect every CmdProcess bean registered in the Spring context
List<CmdProcess> list = new ArrayList<>(getApplicationContext().getBeansOfType(CmdProcess.class).values());
ProcessManager.getInstance().addCmdProcessList(list);
log.info("init im process repository success ! count [" + list.size() + "]");
}catch (Exception e){
log.error("init im process repository error",e);
System.exit(1);
}
}
private ApplicationContext getApplicationContext(){
return ImConfig.getImConfig().getApplicationContext();
}
private void initPacket(String packetScanPath){
try{
Set<Class<?>> set = ClassUtil.scanPackage(packetScanPath);
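List<Message> list = new ArrayList<>();
if(CollectionUtil.isNotEmpty(set)){
for (Class<?> clazz : set) {
// Only instantiate concrete Message implementations
if(Message.class.isAssignableFrom(clazz) && !clazz.isInterface()
&& !java.lang.reflect.Modifier.isAbstract(clazz.getModifiers())){
list.add((Message) ReflectUtil.newInstance(clazz));
}
}
}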
MessageManager.getInstance().addMessages(list);
log.info("init IM packet repository success ! count [" + list.size() + "]");
}catch (Exception e){
log.error("init packet repository error",e);
System.exit(1);
}
}
@PreDestroy
public void stop(){
server.stop();
}
}
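The fallback used when no port is configured (start at 8888 and take the first port that can be bound) can be tried in isolation with the sketch below. `PortProbe` and `firstFreePort` are hypothetical names used only for this illustration.

```java
import java.io.IOException;
import java.net.ServerSocket;

class PortProbe {
    // Returns the first port in [start, 65535] that can be bound, or -1 if none can.
    static int firstFreePort(int start) {
        for (int port = start; port <= 65535; port++) {
            // try-with-resources closes the probe socket before returning
            try (ServerSocket socket = new ServerSocket(port)) {
                return port;
            } catch (IOException busy) {
                // Port is in use or not permitted: try the next one
            }
        }
        return -1;
    }
}
```

Keep in mind that the probe only shows the port was free at the moment of the check. Another process can take it before the server binds, so the later bind still needs its own error handling.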

Note: the current implementation depends on Spring. Suggestions and corrections are welcome. The code is available at the address below.
GitHub: https://github.com/awyFamily/awy-common-all/tree/master/common-ws-netty