springboot整合netty之簡單使用

項目介紹
  1. 簡單介紹: 我在實際的工作之中做過的項目闡述這個項目。
    netty client,netty server,other server主要圍繞這三個服務(wù)之間的邏輯處理。
    邏輯圖:


    設(shè)備工作流程.png
  2. 主要以netty server為例講解
    項目中主要解決了一下問題:
  • 在netty server的核心業(yè)務(wù)處理類中處理無法注入bean的問題
  • 提供netty連接通道channel在分布式下的共享服務(wù)思路·后續(xù)更新具體實現(xiàn)方案
netty和springboot的整合
  1. pom.xml 文件

在SpringBoot項目里添加netty的依賴,注意不要用netty5的依賴,因為已經(jīng)廢棄了

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.netty.vr</groupId>
    <artifactId>centre</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>centre</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.42.Final</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.61</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>net.bytebuddy</groupId>
            <artifactId>byte-buddy</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.20.1</version>
                <configuration>
                    <!-- 不指定單元測試 -->
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
  1. yml配置文件

將端口和IP寫入application.yml文件里,本機測試,用127.0.0.1就ok。即使在服務(wù)器上部署也建議127.0.0.1,然后用nginx配置域名代理

netty:
  port: 8888
  url: 127.0.0.1
  1. netty服務(wù)啟動引導(dǎo)類
package com.daoyin.vr.centre.component;

import com.daoyin.vr.centre.netty.MyWebSocketChannelHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
/**
 * netty程序的入口,負責啟動應(yīng)用
 * @author Mr培
 */
@Component
public class NettyServer {
    public void start(InetSocketAddress address){
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap server = new ServerBootstrap();
            server.group(bossGroup, workGroup);
            server.channel(NioServerSocketChannel.class);
            server.childHandler(new MyWebSocketChannelHandler());
            System.out.println("服務(wù)端開啟等待客戶端連接....");
            Channel ch = server.bind(address).sync().channel();
            ch.closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            //優(yōu)雅的退出程序
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}
  1. 啟動main方法,CommandLineRunner #run()
    這里主要是通過CommandLineRunner 接口的run方法,實現(xiàn)在項目啟動后執(zhí)行的功能,SpringBoot提供的一種簡單的實現(xiàn)方案就是添加一個model并實現(xiàn)CommandLineRunner接口,實現(xiàn)功能的代碼放在實現(xiàn)的run方法中。
    當然還有其他的啟動方法,比如
  • 利用 ApplicationListener 上下文監(jiān)聽器
  • 注解@PostConstruct
  • 利用監(jiān)聽器啟動
    這里以CommandLineRunner為例
package com.daoyin.vr.centre;

import com.daoyin.vr.centre.component.NettyServer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.net.InetSocketAddress;

/**
 * 實現(xiàn) CommandLineRunner 執(zhí)行額外任務(wù)
 * @author Mr培
 */
@SpringBootApplication
public class CentreApplication implements CommandLineRunner {

    @Value("${netty.port}")
    private int port;

    @Value("${netty.url}")
    private String url;

    /**
     * final修飾 + 構(gòu)造器 ≌ @Autowired 注解
     * */
    private final NettyServer server;

    public CentreApplication(NettyServer server) {
        this.server = server;
    }
    public static void main(String[] args) {
        SpringApplication.run(CentreApplication.class, args);
    }
    /**
     * 服務(wù)開始則啟動netty server
     * */
    @Override
    public void run(String... args){
        InetSocketAddress address = new InetSocketAddress(url,port);
        server.start(address);
    }
}
  1. 存儲整個工程的全局配置

可根據(jù)個人的不同業(yè)務(wù)邏輯自行實現(xiàn)業(yè)務(wù)方法

package com.daoyin.vr.centre.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.concurrent.ConcurrentHashMap;

/**
* 存儲整個工程的全局配置
* @author rp
*
*/
public class NettyConfig {
   
   /**
    * 存儲每一個客戶端接入進來時的channel對象
    */
   public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
   /**
    * 儲存每一個客戶端接入進來時的channel鍵值對 對象
    * */
   public static ConcurrentHashMap<String, ChannelId> cannelMap = new ConcurrentHashMap<>();
   /**
    * vr設(shè)備狀態(tài)儲存
    * 0 未占用  1 占用
    * */
   public static ConcurrentHashMap<String, Integer> vrStatusMap = new ConcurrentHashMap<>();

   /**
    * 添加設(shè)備和狀態(tài)
    * */
   public static void addStatus(String key,Integer status){
       vrStatusMap.put(key,status);
   }

   /**
    * 查詢設(shè)備狀態(tài)
    * */
   public static Integer findStatus(String key){
       return vrStatusMap.get(key);
   }

   /**
    * 儲存netty客戶端channel管道
    * */
   public static void addChannel(String key,Channel channel){
       group.add(channel);
       cannelMap.put(key,channel.id());
   }
   /**
   * 查詢客戶端channel管道
   * */
   public static Channel findChannel(String key){
       return group.find(cannelMap.get(key));
   }

   /**
    * 移除客戶端channel管道
    * */
   public static void removeChannel(Channel channel){
       NettyConfig.group.remove(channel);
   }

   /**
    * 根據(jù)設(shè)備標識找到客戶端channel并發(fā)送消息
    * */
   public static void send(String key,String value){
       findChannel(key).writeAndFlush(new TextWebSocketFrame(value));
   }

   /**
    * 根據(jù)客戶端channel直接發(fā)送消息
    * */
   public static void send(Channel channel,String value){
       channel.writeAndFlush(new TextWebSocketFrame(value));
   }

   /**
    * 根據(jù)設(shè)備標識移除客戶端channel
    * */
   public static void removeChannel(String key,Channel channel){
       group.remove(channel);
       cannelMap.remove(key);
   }

   /**
    * 群發(fā)消息
    * */
   public static void sendAll(String value){
       group.writeAndFlush(new TextWebSocketFrame(value));
   }
}
  1. 初始化連接時候的各個組件
package com.daoyin.vr.centre.netty;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * 初始化連接時候的各個組件
 * @author Mr培
 *
 */
public class MyWebSocketChannelHandler extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel){
        channel.pipeline().addLast("http-codec", new HttpServerCodec());
        channel.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
        channel.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
        channel.pipeline().addLast("handler", new MyWebSocketHandler());
    }

}
  1. 接收/處理/響應(yīng)客戶端websocket請求的核心業(yè)務(wù)處理類
    服務(wù)端業(yè)務(wù)處理handler
package com.daoyin.vr.centre.netty;

import com.daoyin.vr.centre.service.VrChannelService;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.*;

/**
 * 接收/處理/響應(yīng)客戶端websocket請求的核心業(yè)務(wù)處理類
 * @author rp
 */
@Component
public class MyWebSocketHandler extends SimpleChannelInboundHandler<Object> {

    private WebSocketServerHandshaker handShaker;

    private static MyWebSocketHandler myWebSocketHandler;

    @Autowired
    private VrChannelService vrChannelService;

    /**
     * 解決無法注入問題
     * */
    @PostConstruct
    public void init() {
        myWebSocketHandler = this;
    }

    private static final String WEB_SOCKET_URL = "ws://localhost:8888/websocket";

    /**
     * 客戶端與服務(wù)端創(chuàng)建連接的時候調(diào)用
     * 使用單線程池在第一次連接成功發(fā)送消息到客戶端·不要問為什么要用線程發(fā)送消息
     * */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(()->NettyConfig.send(ctx.channel(),"OK"));
        executorService.shutdown();
        executorService.awaitTermination(0L, TimeUnit.MILLISECONDS);
        System.out.println("客戶端與服務(wù)端連接開啟...");
    }

    /**
     * 客戶端與服務(wù)端斷開連接的時候調(diào)用
     * */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        NettyConfig.removeChannel(ctx.channel());
        myWebSocketHandler.vrChannelService.updateStatusByChannelId(2,ctx.channel().id().toString());
        System.out.println("客戶端與服務(wù)端連接關(guān)閉...");
    }

    /**
     * 服務(wù)端接收客戶端發(fā)送過來的數(shù)據(jù)結(jié)束之后調(diào)用
     * */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
    }

    /**
     * 工程出現(xiàn)異常的時候調(diào)用
     * */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        NettyConfig.send(ctx.channel(),"EXCEPTION");
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * 服務(wù)端處理客戶端websocket請求的核心方法
     * */
    @Override
    protected void channelRead0(ChannelHandlerContext context, Object msg) throws Exception {
        //處理客戶端向服務(wù)端發(fā)起http握手請求的業(yè)務(wù)
        if (msg instanceof FullHttpRequest) {
            handHttpRequest(context,  (FullHttpRequest)msg);
        }else if (msg instanceof WebSocketFrame) {
            //處理websocket連接業(yè)務(wù)
            handWebsocketFrame(context, (WebSocketFrame)msg);
        }
    }

    /**
     * 處理客戶端與服務(wù)端之前的websocket業(yè)務(wù)
     * @param ctx
     * @param frame
     */
    private void handWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){
        //判斷是否是關(guān)閉websocket的指令
        if (frame instanceof CloseWebSocketFrame) {
            handShaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain());
        }
        //判斷是否是ping消息
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        //判斷是否是二進制消息,如果是二進制消息,拋出異常
        if( ! (frame instanceof TextWebSocketFrame) ){
            System.out.println("目前我們不支持二進制消息·可能客戶端斷開連接");
            return;
        }
        //獲取客戶端向服務(wù)端發(fā)送的消息
        String vrId = ((TextWebSocketFrame) frame).text();
        System.out.println("接收到客戶端消息---->> " + vrId);
        if (vrId.contains(":0")){
            vrId = vrId.split(":")[0];
        }else{
            NettyConfig.addChannel(vrId,ctx.channel());
            NettyConfig.send(ctx.channel(),"OK");
        }
        NettyConfig.addStatus(vrId,0);
        //將信息存儲mysql數(shù)據(jù)庫
myWebSocketHandler.vrChannelService.insertVrMysql(vrId,0,ctx.channel().id().toString());
    }

    /**
     * 處理客戶端向服務(wù)端發(fā)起http握手請求的業(yè)務(wù)
     * @param ctx
     * @param req
     */
    private void handHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req){
        if (!req.decoderResult().isSuccess() || !("websocket".equals(req.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                WEB_SOCKET_URL, null, false);
        handShaker = wsFactory.newHandshaker(req);
        if (handShaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        }else{
            handShaker.handshake(ctx.channel(), req);
        }
    }
    
    /**
     * 服務(wù)端向客戶端響應(yīng)消息
     * @param ctx
     * @param req
     * @param res
     */
    private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res){
        if (res.status().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
        }
        //服務(wù)端向客戶端發(fā)送數(shù)據(jù)
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (res.status().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

/**
 * netty 5 的方法
 *  @Override
 *  protected void messageReceived(ChannelHandlerContext context, Object msg) throws Exception {
 *      //處理客戶端向服務(wù)端發(fā)起http握手請求的業(yè)務(wù)
 *      if (msg instanceof FullHttpRequest) {
 *          handHttpRequest(context,  (FullHttpRequest)msg);
 *      }else if (msg instanceof WebSocketFrame) {
 *          //處理websocket連接業(yè)務(wù)
 *          handWebsocketFrame(context, (WebSocketFrame)msg);
 *      }
 *  }
 */
}
netty連接通道channel在分布式下的共享服務(wù)思路

首先來說channel是無法被序列化儲存的,所以對于一些想要把channel序列化存儲后再取出來,給客戶端發(fā)送消息是不現(xiàn)實的,即使將channel序列化存儲后取出來也不再是原來的channel了。

  • 思路一: 用 rabbitmq 來通知每個netty server

以兩個netty server為例,把每個netty server 當作rabbitmq的服務(wù)中心。
例如: 當client1連接到server1,client2連接server2,這時有其他的服務(wù)連接到server1,需要sercer2通知client2時,就可以通過server1發(fā)送通知告訴server2,server2再發(fā)送消息給client2。這樣就實現(xiàn)了netty的分布式的部署。

  1. 思路二: netty注冊中心服務(wù)

這種方案其實跟第一種沒多大區(qū)別,思路都是一樣的。只是說將rabbitmq換成了netty server來進行消息的互通,發(fā)送消息還是通過當前的server發(fā)送到連接的client。

其他
  1. 若想了解其他的啟動方式可以參考
    springboot整合netty的多種方式
  2. 了解其他的優(yōu)秀文章
    Netty服務(wù)端Channel的創(chuàng)建與初始化
    深入理解 Netty-Channel架構(gòu)體系
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容