基于netty 實現(xiàn) ws協(xié)議的 im 組件(一)

期望達成目標:

1.消息穩(wěn)定可靠

2.支持點對點消息

3.支持一對多消息

4.支持消息廣播

5.支持節(jié)點擴容

6.支持服務注冊發(fā)現(xiàn)

針對目標的思考:

1.消息穩(wěn)定可靠方面:

采用netty為網(wǎng)絡框架,實現(xiàn)websocket協(xié)議(長連接),如需要持久化消息,可將消息寫入數(shù)據(jù)庫,接收端進行消息確認。

2.點對點消息的支持

通過給 channel 綁定身份標識,消息體指定消息類型

3.支持一對多消息

通過拿到用戶所屬的組 ,將channel 加入ChannelGroup

4.支持廣播消息

單機模式,通過 channel 的 map,獲取所有的 channel 進行廣播

多節(jié)點模式,先將消息發(fā)送的直接發(fā)布訂閱的中間件,每臺服務收到中間件的廣播,將消息發(fā)送到當前節(jié)點的所有channel

5.節(jié)點擴容

 采用消息中間件的發(fā)布訂閱模式,將收到的消息先投遞到消息中間件,服務節(jié)點通過消費中間件消息,進行當前節(jié)點消息的轉(zhuǎn)發(fā)

6.支持服務注冊發(fā)現(xiàn)

 采用 springcloud 進行服務的注冊與發(fā)現(xiàn)(即通過springcloud 服務注冊相關(guān)的實現(xiàn),Nacos,eurake等)

具體實現(xiàn):

1.基于消息code進行業(yè)務處理的事件機制,服務啟動時會從spring上下文中拿到 CmdProcess 實現(xiàn),通過 code 進行 事件的分發(fā)

public interface CmdProcess {

    /**

     * 消息接收處理

     * @param message 消息體

     * @param channel 上下文

     * @return 響應消息體(無響應,則返回null)

     */

    Message handler(Message message, Channel channel);

    /**

     * 設置命令碼(此處的命令碼,需要和消息包對應上)

     * @return 命令碼

     */

    Byte getCmdCode();

}
image.gif

2.連接的安全認證機制(第一次握手執(zhí)行的事件),提供了簡單的實現(xiàn),通過具體業(yè)務定義自己的認證實現(xiàn)

public abstract class AuthProcess {

    /**

     * 登錄事件

     * @param username 用戶名

     * @param password 用戶密碼

     * @return IM用戶對象

     */

    public abstract ImSession login(String username, String password);

}

image.gif

3.channel 建立連接生命周期的執(zhí)行的事件(客戶端連接成功的建立,客戶端連接的斷開)

public interface LifeCycleEvent {

    /**

     * 綁定通道上下文

     * @param login

     * @param ctx

     */

    void bindContext(ImSession login, ChannelHandlerContext ctx);

    /**

     * 解綁通道上下文

     * @param channel

     */

    void cleanContext(Channel channel);

}
image.gif

4.多節(jié)點部署消息分發(fā)接口的預留(通過參數(shù)設置,是否采用多節(jié)點模式)

public interface ImClusterTopic {

    /**

     * 發(fā)布消息

     */

    void publish(ClusterMessage message);

    /**

     * 訂閱消息(采用廣播模式)

     */

    void consumer();

}

image.gif

5.消息分發(fā)的工具類

package com.awy.common.ws.netty.toolkit;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.json.JSONUtil;
import com.awy.common.message.api.packets.Message;
import com.awy.common.ws.netty.cluster.ImClusterTopic;
import com.awy.common.ws.netty.context.GlobalContent;
import com.awy.common.ws.netty.context.SessionContext;
import com.awy.common.ws.netty.config.ImConfig;
import com.awy.common.ws.netty.model.ClusterMessage;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.Locale;
import java.util.Map;

@Slf4j
public class ImSendUtil {

    /**
     * 發(fā)送給指定用戶
     * @param userId 用戶id
     * @param message 消息體
     */
    public static void sendUser(String userId, Message message){
        if(isCluster()){
            getImClusterTopic().publish(ClusterMessage.chatMessage(userId,message));
        }else {
            sendCurrentNodeUser(userId,message);
        }
    }

    /**
     * 發(fā)送給指定用戶列表
     * @param userIds 用戶id列表
     * @param message 消息體
     */
    public static void sendUsers(List<String> userIds,Message message){
        if(isCluster()){
            getImClusterTopic().publish(ClusterMessage.chatsMessage(userIds,message));
        }else {
            sendCurrentNodeUsers(userIds,message);
        }
    }

    /**
     * 發(fā)送指定群組
     * @param groupId 群組id
     * @param message 消息體
     */
    public static void sendGroup(String groupId,Message message){
        if(isCluster()){
            getImClusterTopic().publish(ClusterMessage.groupMessage(groupId,message));
        }else {
            sendCurrentNodeGroup(groupId,message);
        }
    }

    /**
     * 發(fā)送指定群組列表
     * @param groupIds 群組列表
     * @param message 消息體
     */
    public static void sendGroups(List<String> groupIds,Message message){
        if(isCluster()){
            getImClusterTopic().publish(ClusterMessage.groupsMessage(groupIds,message));
        }else {
            sendCurrentNodeGroups(groupIds,message);
        }
    }

    /**
     * 發(fā)送給全部用戶
     * @param message
     */
    public static void sendAll(Message message){
        //if cluster
        if(isCluster()){
            getImClusterTopic().publish(ClusterMessage.noStateMessage(message));
        }else {
            //if standalone
            sendCurrentNodeAllChannel(message);
        }
    }

    /**
     * 發(fā)送給當前節(jié)點指定用戶
     * @param userId 用戶id
     * @param message 消息
     */
    public static void sendCurrentNodeUser(String userId,Message message){
        Channel channel = SessionContext.getChannel(userId);
        if(channel != null){
            channel.writeAndFlush(getMessage(message));
        }
    }

    /**
     * 發(fā)送給當前節(jié)點指定用戶列表
     * @param userIds 用戶id 列表
     * @param message 消息體
     */
    public static void sendCurrentNodeUsers(List<String> userIds,Message message){
        if(CollUtil.isNotEmpty(userIds)){
            if(userIds.size() == 1){
                sendCurrentNodeUser(userIds.get(0),message);
            }else {
                for (String userId : userIds) {
                    sendCurrentNodeUser(userId,message);
                }
            }
        }
    }

    /**
     * 發(fā)送給當前節(jié)點指定群組
     * @param groupId
     * @param message
     */
    public static void sendCurrentNodeGroup(String groupId,Message message){
        ChannelGroup channelGroup = SessionContext.getChannelGroup(groupId);
        if(channelGroup != null){
            channelGroup.writeAndFlush(getMessage(message));
        }
    }

    /**
     * 發(fā)送給當前節(jié)點指定群組列表
     * @param groupIds 群組id列表
     * @param message 消息
     */
    public static void sendCurrentNodeGroups(List<String> groupIds, Message message){
        if(CollUtil.isNotEmpty(groupIds)){
            if(groupIds.size() == 1){
                sendCurrentNodeGroup(groupIds.get(0),message);
            }else {
                for (String groupId : groupIds) {
                    sendCurrentNodeGroup(groupId,message);
                }
            }
        }
    }

    /**
     * 發(fā)送給當前節(jié)點所有用戶
     * @param message 消息
     */
    public static void sendCurrentNodeAllChannel(Message message){
        for (Map.Entry<String, Channel> entry : SessionContext.getAllChannel().entrySet()){
            entry.getValue().writeAndFlush(getMessage(message));
        }
    }

    /**
     * 是否多節(jié)點
     * @return
     */
    private static boolean isCluster(){
        return ImConfig.getImConfig().getPropertiesConfig().isCluster();
    }

    /**
     * 獲取節(jié)點推送主題
     * @return
     */
    private static ImClusterTopic getImClusterTopic(){
        return GlobalContent.getInstance().getImClusterTopic();
    }

    /**
     * 獲取消息體
     * @param message 消息體
     * @return webSocket 消息體
     */
    public static TextWebSocketFrame getMessage(Message message){
        if(message == null){
            log.error(">>>>>>>>>>>> message can not be empty ");
            return null;
        }
        String result = JSONUtil.toJsonStr(message);
        return new TextWebSocketFrame(result);
    }

}
image.gif

6.webSocket服務器的實現(xiàn)

public class WebSocketServer {

    /**

     * 是否啟用ssl

     */

    private boolean ssl = false;

    //監(jiān)聽端口

    private int port;

    //ws 前綴

    private String websocketPath;

    private ServerBootstrap serverBootstrap;

    private NioEventLoopGroup boss;

    private NioEventLoopGroup work;

    private WebSocketServer(){}

    public WebSocketServer(int port, String websocketPath, boolean ssl, AuthProcess authProcess, LifeCycleEvent lifeCycleEvent, ImClusterTopic imClusterTopic){

        this.ssl = ssl;

        this.port = port;

        this.websocketPath = websocketPath;

        //設置全局上下文

        if(authProcess == null){

            authProcess = new SimpleAuthProcess();

        }

        if( lifeCycleEvent == null){

            lifeCycleEvent = new SimpleLifeCycleEvent();

        }

        GlobalContent globalContent = GlobalContent.getInstance();

        globalContent.setAuthProcess(authProcess);

        globalContent.setImClusterTopic(imClusterTopic);

        globalContent.setLifeCycleEvent(lifeCycleEvent);

        //主從 react 模型

        serverBootstrap = new ServerBootstrap();

        boss = new NioEventLoopGroup(1);

        work = new NioEventLoopGroup();

        serverBootstrap.group(boss,work)

                .channel(NioServerSocketChannel.class)

                .childHandler(new WebSocketServerInitializer(getSslContext(),websocketPath));

    }

    private SslContext getSslContext(){

        SslContext sslCtx = null;

        try{

            if (this.ssl) {

                SelfSignedCertificate ssc = new SelfSignedCertificate();

                sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();

            }

        }catch (Exception e){

            e.printStackTrace();

        }

        return sslCtx;

    }

    public void start(){

        try {

            Channel ch = serverBootstrap.bind(port).sync().channel();

            log.info("Open your web browser and navigate to " +

                    (ssl ? "https" : "http") + "://127.0.0.1:" + port + "" + websocketPath);

        } catch (Exception e){

            e.printStackTrace();

        }

    }

    public void stop(){

        boss.shutdownGracefully();

        work.shutdownGracefully();

    }

}
image.gif

7.spring引導類

public class NettyWebSocketStarter {

    private Integer port;

    private String websocketPath;

    /**
     * 是否啟用ssl
     */
    private boolean ssl = false;

    private WebSocketServer server;

    /**
     * 認證處理器
     */
    private AuthProcess authProcess;

    private LifeCycleEvent lifeCycleEvent;

    private ImClusterTopic imClusterTopic;

    private NettyWebSocketStarter(){}

    public NettyWebSocketStarter(AuthProcess authProcess){
        this(authProcess,null,null);
    }

    public NettyWebSocketStarter(AuthProcess authProcess, LifeCycleEvent lifeCycleEvent, ImClusterTopic imClusterTopic){
        this.authProcess = authProcess;
        this.lifeCycleEvent = lifeCycleEvent;
        this.imClusterTopic = imClusterTopic;
    }

    @PostConstruct
    public void init(){
        setAttributes(ImConfig.getImConfig().getPropertiesConfig());
        initCmdProcess();
        server = new WebSocketServer(port,websocketPath,ssl,authProcess,lifeCycleEvent,imClusterTopic);
        server.start();
        registerDiscovery(ImConfig.getImConfig().getPropertiesConfig());
    }

    private void setAttributes(ImPropertiesConfig propertiesConfig){
        String packetScanPath = "";
        if(propertiesConfig == null){
            log.error(">>>>>>>> im config prefix: im.ws ");
            log.error(">>>>>>>> im config attributes can not by empty ");
            System.exit(0);
        }

        if(propertiesConfig.isCluster() && this.imClusterTopic == null){
            log.error(">>>>>>>> not allowed  when Cluster model imClusterTopic is null");
            System.exit(0);
        }

        this.port = propertiesConfig.getPort();
        if(this.port == null){
            this.port = getPort();
        }
        this.websocketPath = propertiesConfig.getWebsocketPath();
        if(this.websocketPath == null || this.websocketPath.isEmpty()){
            this.websocketPath = ImCommonConstant.DEFAULT_WEBSOCKET_PATH;
        }

        this.ssl = propertiesConfig.isSsl();
        packetScanPath = propertiesConfig.getPacketScan();
        if(packetScanPath == null || packetScanPath.isEmpty()){
            log.error(">>>>>>>> im.ws.packetScan can not by empty ");
            System.exit(0);
        }
        initPacket(packetScanPath);

    }

    private void registerDiscovery(ImPropertiesConfig propertiesConfig){
        if(propertiesConfig.isRegister()){
            Map<String, AbstractAutoServiceRegistration> serviceRegistrationMap = getApplicationContext().getBeansOfType(AbstractAutoServiceRegistration.class);
            for (Map.Entry<String,AbstractAutoServiceRegistration>  registrationEntry : serviceRegistrationMap.entrySet()){
                registrationEntry.getValue().start();
            }

        }
    }

    private int getPort(){
        int defaultPort = 8888;
        return getPort(defaultPort);
    }

    private int getPort(int port){
        ServerSocket socket = null;
        try{
            socket = new ServerSocket(port);
        }catch (IOException e){
            ++port;
            return getPort(port);
        }finally {
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    log.error("");
                }
            }
        }
        return port;
    }

    private void initCmdProcess(){
        try{
            List<CmdProcess> list = new ArrayList<>();

            String[] beanDefinitionNames = getApplicationContext().getBeanDefinitionNames();
            Stream.of(beanDefinitionNames).forEach(beanName -> {
                Object bean = getApplicationContext().getBean(beanName);
                if(bean instanceof CmdProcess){
                    list.add((CmdProcess)bean);
                }
            });

            ProcessManager.getInstance().addCmdProcessList(list);
            log.info("init im process repository success ! count [" + list.size() + "]");
        }catch (Exception e){
            log.error("nit im process repository error",e);
            System.exit(0);
        }
    }

    private  ApplicationContext getApplicationContext(){
        return ImConfig.getImConfig().getApplicationContext();
    }

    private void initPacket(String packetScanPath){
        try{
            Set<Class<?>> set = ClassUtil.scanPackage(packetScanPath);
            Object obj;
            List<Message> list = new ArrayList<>();
            if(CollectionUtil.isNotEmpty(set)){
                for (Class clazz : set) {
                    obj = ReflectUtil.newInstance(clazz);
                    if(obj instanceof Message){
                        list.add((Message) obj);
                    }
                }
            }

            MessageManager.getInstance().addMessages(list);
            log.info("init IM packet repository success ! count [" + list.size() + "]");
        }catch (Exception e){
            log.error("init packet repository error",e);
            System.exit(0);
        }
    }

    @PreDestroy
    public void stop(){
        server.stop();
    }
}
image.gif

說明:當前實現(xiàn)需要依賴spring,有好的建議歡迎大家提出,指正,最后貼出代碼地址

github地址: https://github.com/awyFamily/awy-common-all/tree/master/common-ws-netty

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容