拆包粘包問題解決

Netty 使用 TCP/IP 協議傳輸數據。而 TCP/IP 協議是類似水流一樣的數據傳輸方式(字節流本身沒有消息邊界)。多次訪問的時候有可能出現數據粘包的問題,解決這種問題的方式如下:

定長數據流

客戶端和服務器提前協調好,每個消息長度固定(如:長度10)。如果客戶端或服務器寫出的數據不足10,則使用空白字符補足(如:使用空格)。

/**
 * 1. 雙線程組
 * 2. Bootstrap配置啟動信息
 * 3. 注冊業務處理Handler
 * 4. 綁定服務監聽端口并啟動服務
 */
package com.bjsxt.socket.netty.fixedlength;

import java.nio.charset.Charset;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * Netty server for the fixed-length framing demo.
 * <p>
 * Incoming bytes are cut into frames of {@code FRAME_LENGTH} bytes, decoded as
 * UTF-8 strings and passed to {@link Server4FixedLengthHandler}.
 */
public class Server4FixedLength {
    // Length in bytes of every frame expected from a client.
    private static final int FRAME_LENGTH = 5;

    // Acceptor thread group: accepts incoming client connections.
    private EventLoopGroup acceptorGroup = null;
    // Worker thread group: handles the data exchange with connected clients.
    private EventLoopGroup clientGroup = null;
    // Server bootstrap holding the startup configuration.
    private ServerBootstrap bootstrap = null;

    /** Creates the event loop groups and configures the bootstrap. */
    public Server4FixedLength(){
        init();
    }

    /** Builds both thread groups and applies channel type and socket options. */
    private void init(){
        acceptorGroup = new NioEventLoopGroup();
        clientGroup = new NioEventLoopGroup();
        bootstrap = new ServerBootstrap();
        // Bind the two thread groups.
        bootstrap.group(acceptorGroup, clientGroup);
        // Use NIO for the listening channel.
        bootstrap.channel(NioServerSocketChannel.class);
        // SO_BACKLOG is the length of the pending-connection queue of the listening socket
        // (it is not a buffer size).
        bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
        // Send buffer, receive buffer and keep-alive are per-connection settings, so they are
        // registered with childOption(): option() only configures the listening channel,
        // childOption() configures each accepted channel.
        bootstrap.childOption(ChannelOption.SO_SNDBUF, 16*1024)
            .childOption(ChannelOption.SO_RCVBUF, 16*1024)
            .childOption(ChannelOption.SO_KEEPALIVE, true);
    }

    /**
     * Installs the per-connection pipeline and binds the server to the given port.
     *
     * @param port TCP port to listen on
     * @return the future of the completed bind operation
     * @throws InterruptedException if the thread is interrupted while waiting for the bind
     */
    public ChannelFuture doAccept(int port) throws InterruptedException{
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelHandler[] acceptorHandlers = new ChannelHandler[3];
                // Fixed-length frame decoder; the constructor argument is the frame length in bytes.
                // Shorter messages are expected to be padded by the sender.
                acceptorHandlers[0] = new FixedLengthFrameDecoder(FRAME_LENGTH);
                // Converts each ByteBuf frame into a String before it reaches channelRead.
                acceptorHandlers[1] = new StringDecoder(Charset.forName("UTF-8"));
                acceptorHandlers[2] = new Server4FixedLengthHandler();
                ch.pipeline().addLast(acceptorHandlers);
            }
        });
        ChannelFuture future = bootstrap.bind(port).sync();
        return future;
    }

    /** Requests a graceful shutdown of both event loop groups. */
    public void release(){
        this.acceptorGroup.shutdownGracefully();
        this.clientGroup.shutdownGracefully();
    }

    /**
     * Starts the server on port 9999 and blocks until the listening channel closes
     * or the main thread is interrupted.
     */
    public static void main(String[] args){
        ChannelFuture future = null;
        Server4FixedLength server = null;
        try{
            server = new Server4FixedLength();

            future = server.doAccept(9999);
            System.out.println("server started.");
            // Block until the listening channel is closed.
            future.channel().closeFuture().sync();
        }catch(InterruptedException e){
            // Restore the interrupt flag so the stop request is not lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }finally{
            if(null != future){
                // Close the listening channel instead of waiting on closeFuture() a second time,
                // which would block again after an interrupt.
                future.channel().close();
            }

            if(null != server){
                server.release();
            }
        }
    }

}

package com.bjsxt.socket.netty.fixedlength;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * Server-side business handler of the fixed-length demo: prints each received
 * frame and answers with the 3-byte reply {@code "ok "}.
 */
public class Server4FixedLengthHandler extends ChannelHandlerAdapter {

    /**
     * Prints the received message with its padding trimmed and writes the reply.
     *
     * @param ctx channel context used to write the reply
     * @param msg the inbound message; only its {@code toString()} value is used
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String message = msg.toString();
        // trim() removes the padding appended by the sender to fill the frame.
        System.out.println("from client : " + message.trim());
        // Three bytes in UTF-8, including the trailing space.
        String line = "ok ";
        ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
    }


    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx   channel context of the failing connection
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("server exceptionCaught method run...");
        // Print the cause so that failures can be diagnosed instead of being silently dropped.
        cause.printStackTrace();
        ctx.close();
    }

}

/**
 * 1. 單線程組
 * 2. Bootstrap配置啟動信息
 * 3. 注冊業務處理Handler
 * 4. connect連接服務,并發起請求
 */
package com.bjsxt.socket.netty.fixedlength;

import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * Netty client for the fixed-length framing demo.
 * <p>
 * Every console line is sent as one or more zero-padded frames of
 * {@code REQUEST_FRAME_LENGTH} bytes; replies are read as frames of
 * {@code RESPONSE_FRAME_LENGTH} bytes.
 */
public class Client4FixedLength {

    // Frame length in bytes of the messages written to the server.
    private static final int REQUEST_FRAME_LENGTH = 5;
    // Frame length in bytes of the replies read from the server.
    private static final int RESPONSE_FRAME_LENGTH = 3;
    private static final String PROMPT = "enter message send to server > ";

    // Thread group that sends requests and handles the server responses.
    private EventLoopGroup group = null;
    // Client bootstrap holding the startup configuration.
    private Bootstrap bootstrap = null;

    /** Creates the event loop group and configures the bootstrap. */
    public Client4FixedLength(){
        init();
    }

    /** Builds the thread group and selects the NIO socket channel. */
    private void init(){
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        // Bind the thread group.
        bootstrap.group(group);
        // Use NIO for the client channel.
        bootstrap.channel(NioSocketChannel.class);
    }

    /**
     * Installs the client pipeline and connects to the server.
     *
     * @param host server host name
     * @param port server TCP port
     * @return the future of the completed connect operation
     * @throws InterruptedException if the thread is interrupted while connecting
     */
    public ChannelFuture doRequest(String host, int port) throws InterruptedException{
        this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelHandler[] handlers = new ChannelHandler[3];
                handlers[0] = new FixedLengthFrameDecoder(RESPONSE_FRAME_LENGTH);
                // Converts each ByteBuf frame into a String before it reaches channelRead.
                handlers[1] = new StringDecoder(Charset.forName("UTF-8"));
                handlers[2] = new Client4FixedLengthHandler();

                ch.pipeline().addLast(handlers);
            }
        });
        ChannelFuture future = this.bootstrap.connect(host, port).sync();
        return future;
    }

    /** Requests a graceful shutdown of the event loop group. */
    public void release(){
        this.group.shutdownGracefully();
    }

    /**
     * Copies {@code data} into a zero-padded array whose length is a multiple of
     * {@code REQUEST_FRAME_LENGTH}. Empty input still yields one all-zero frame.
     * A multi-byte UTF-8 character may end up split across two frames.
     */
    private static byte[] padToFrames(byte[] data){
        int frames = Math.max(1, (data.length + REQUEST_FRAME_LENGTH - 1) / REQUEST_FRAME_LENGTH);
        byte[] padded = new byte[frames * REQUEST_FRAME_LENGTH];
        System.arraycopy(data, 0, padded, 0, data.length);
        return padded;
    }

    /**
     * Connects to localhost:9999 and sends every console line to the server until
     * standard input is exhausted or the thread is interrupted.
     */
    public static void main(String[] args) {
        Client4FixedLength client = null;
        ChannelFuture future = null;
        try{
            client = new Client4FixedLength();

            future = client.doRequest("localhost", 9999);

            // One Scanner for the whole session; a new one per line could lose buffered input.
            Scanner s = new Scanner(System.in);
            System.out.print(PROMPT);
            while(s.hasNextLine()){
                String line = s.nextLine();
                // Input longer than one frame is spread over several frames instead of being dropped.
                byte[] bs = padToFrames(line.getBytes("UTF-8"));
                future.channel().writeAndFlush(Unpooled.copiedBuffer(bs));
                TimeUnit.SECONDS.sleep(1);
                System.out.print(PROMPT);
            }
        }catch(InterruptedException e){
            // Restore the interrupt flag so the stop request is not lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            if(null != future){
                // Close the connection ourselves; waiting on closeFuture() here would block
                // until the server happens to close it.
                future.channel().close();
            }
            if(null != client){
                client.release();
            }
        }
    }

}

package com.bjsxt.socket.netty.fixedlength;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

/**
 * Client-side handler of the fixed-length demo: prints every reply frame
 * received from the server.
 */
public class Client4FixedLengthHandler extends ChannelHandlerAdapter {

    /**
     * Prints the received message.
     *
     * @param ctx channel context (unused)
     * @param msg the inbound message; only its {@code toString()} value is used
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try{
            String message = msg.toString();
            System.out.println("from server : " + message);
        }finally{
            // Releases msg when it is reference-counted, which guards against buffer leaks.
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx   channel context of the failing connection
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client exceptionCaught method run...");
        // Print the cause so that failures can be diagnosed instead of being silently dropped.
        cause.printStackTrace();
        ctx.close();
    }

}

特殊結束符

客戶端和服務器協商定義一個特殊的分隔符號,分隔符號長度自定義。如:‘#’、‘_’、‘AA@’。在通訊的時候,只要沒有發送分隔符號,則代表一條數據沒有結束。

/**
 * 1. 雙線程組
 * 2. Bootstrap配置啟動信息
 * 3. 注冊業務處理Handler
 * 4. 綁定服務監聽端口并啟動服務
 */
package com.bjsxt.socket.netty.delimiter;

import java.nio.charset.Charset;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * Netty server for the delimiter-based framing demo.
 * <p>
 * Incoming bytes are split on the {@code "$E$"} delimiter, decoded as UTF-8
 * strings and passed to {@link Server4DelimiterHandler}.
 */
public class Server4Delimiter {
    // Charset used for the delimiter bytes and for decoding frames.
    private static final Charset UTF_8 = Charset.forName("UTF-8");
    // Marker that terminates every message.
    private static final String DELIMITER = "$E$";
    // Maximum frame length in bytes handed to the frame decoder.
    private static final int MAX_FRAME_LENGTH = 1024;

    // Acceptor thread group: accepts incoming client connections.
    private EventLoopGroup acceptorGroup = null;
    // Worker thread group: handles the data exchange with connected clients.
    private EventLoopGroup clientGroup = null;
    // Server bootstrap holding the startup configuration.
    private ServerBootstrap bootstrap = null;

    /** Creates the event loop groups and configures the bootstrap. */
    public Server4Delimiter(){
        init();
    }

    /** Builds both thread groups and applies channel type and socket options. */
    private void init(){
        acceptorGroup = new NioEventLoopGroup();
        clientGroup = new NioEventLoopGroup();
        bootstrap = new ServerBootstrap();
        // Bind the two thread groups.
        bootstrap.group(acceptorGroup, clientGroup);
        // Use NIO for the listening channel.
        bootstrap.channel(NioServerSocketChannel.class);
        // SO_BACKLOG is the length of the pending-connection queue of the listening socket
        // (it is not a buffer size).
        bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
        // Send buffer, receive buffer and keep-alive are per-connection settings, so they are
        // registered with childOption(): option() only configures the listening channel,
        // childOption() configures each accepted channel.
        bootstrap.childOption(ChannelOption.SO_SNDBUF, 16*1024)
            .childOption(ChannelOption.SO_RCVBUF, 16*1024)
            .childOption(ChannelOption.SO_KEEPALIVE, true);
    }

    /**
     * Installs the per-connection pipeline and binds the server to the given port.
     *
     * @param port TCP port to listen on
     * @return the future of the completed bind operation
     * @throws InterruptedException if the thread is interrupted while waiting for the bind
     */
    public ChannelFuture doAccept(int port) throws InterruptedException{
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // The delimiter has to be supplied as a ByteBuf. The charset is given explicitly
                // so the bytes do not depend on the platform default.
                ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER.getBytes(UTF_8));
                ChannelHandler[] acceptorHandlers = new ChannelHandler[3];
                // The frame decoder is not @Sharable, so a new instance is created for every channel.
                // A maximum frame length has to be given in addition to the delimiter.
                acceptorHandlers[0] = new DelimiterBasedFrameDecoder(MAX_FRAME_LENGTH, delimiter);
                // Converts each ByteBuf frame into a String before it reaches channelRead.
                acceptorHandlers[1] = new StringDecoder(UTF_8);
                acceptorHandlers[2] = new Server4DelimiterHandler();
                ch.pipeline().addLast(acceptorHandlers);
            }
        });
        ChannelFuture future = bootstrap.bind(port).sync();
        return future;
    }

    /** Requests a graceful shutdown of both event loop groups. */
    public void release(){
        this.acceptorGroup.shutdownGracefully();
        this.clientGroup.shutdownGracefully();
    }

    /**
     * Starts the server on port 9999 and blocks until the listening channel closes
     * or the main thread is interrupted.
     */
    public static void main(String[] args){
        ChannelFuture future = null;
        Server4Delimiter server = null;
        try{
            server = new Server4Delimiter();

            future = server.doAccept(9999);
            System.out.println("server started.");
            // Block until the listening channel is closed.
            future.channel().closeFuture().sync();
        }catch(InterruptedException e){
            // Restore the interrupt flag so the stop request is not lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }finally{
            if(null != future){
                // Close the listening channel instead of waiting on closeFuture() a second time,
                // which would block again after an interrupt.
                future.channel().close();
            }

            if(null != server){
                server.release();
            }
        }
    }

}

package com.bjsxt.socket.netty.delimiter;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * Server-side business handler of the delimiter demo: prints each received
 * frame and answers with a reply that contains three {@code "$E$"} delimiters.
 */
public class Server4DelimiterHandler extends ChannelHandlerAdapter {

    /**
     * Prints the received message and writes the delimiter-separated reply.
     *
     * @param ctx channel context used to write the reply
     * @param msg the inbound message; only its {@code toString()} value is used
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String message = msg.toString();
        System.out.println("from client : " + message);
        // A single write that carries three delimiter-terminated segments.
        String line = "server message $E$ test delimiter handler!! $E$ second message $E$";
        ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
    }


    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx   channel context of the failing connection
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("server exceptionCaught method run...");
        // Print the cause so that failures can be diagnosed instead of being silently dropped.
        cause.printStackTrace();
        ctx.close();
    }

}

/**
 * 1. 單線程組
 * 2. Bootstrap配置啟動信息
 * 3. 注冊業務處理Handler
 * 4. connect連接服務,并發起請求
 */
package com.bjsxt.socket.netty.delimiter;

import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * Netty client for the delimiter-based framing demo.
 * <p>
 * Console lines are sent verbatim, with no delimiter appended by this class.
 * Replies are split on the {@code "$E$"} delimiter.
 */
public class Client4Delimiter {

    // Charset used for the delimiter bytes and for decoding frames.
    private static final Charset UTF_8 = Charset.forName("UTF-8");
    // Marker that terminates every message.
    private static final String DELIMITER = "$E$";
    // Maximum frame length in bytes handed to the frame decoder.
    private static final int MAX_FRAME_LENGTH = 1024;
    private static final String PROMPT = "enter message send to server > ";

    // Thread group that sends requests and handles the server responses.
    private EventLoopGroup group = null;
    // Client bootstrap holding the startup configuration.
    private Bootstrap bootstrap = null;

    /** Creates the event loop group and configures the bootstrap. */
    public Client4Delimiter(){
        init();
    }

    /** Builds the thread group and selects the NIO socket channel. */
    private void init(){
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        // Bind the thread group.
        bootstrap.group(group);
        // Use NIO for the client channel.
        bootstrap.channel(NioSocketChannel.class);
    }

    /**
     * Installs the client pipeline and connects to the server.
     *
     * @param host server host name
     * @param port server TCP port
     * @return the future of the completed connect operation
     * @throws InterruptedException if the thread is interrupted while connecting
     */
    public ChannelFuture doRequest(String host, int port) throws InterruptedException{
        this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // Message delimiter; the charset is given explicitly so the bytes do not
                // depend on the platform default.
                ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER.getBytes(UTF_8));
                ChannelHandler[] handlers = new ChannelHandler[3];
                handlers[0] = new DelimiterBasedFrameDecoder(MAX_FRAME_LENGTH, delimiter);
                // Converts each ByteBuf frame into a String before it reaches channelRead.
                handlers[1] = new StringDecoder(UTF_8);
                handlers[2] = new Client4DelimiterHandler();

                ch.pipeline().addLast(handlers);
            }
        });
        ChannelFuture future = this.bootstrap.connect(host, port).sync();
        return future;
    }

    /** Requests a graceful shutdown of the event loop group. */
    public void release(){
        this.group.shutdownGracefully();
    }

    /**
     * Connects to localhost:9999 and sends every console line to the server until
     * standard input is exhausted or the thread is interrupted.
     */
    public static void main(String[] args) {
        Client4Delimiter client = null;
        ChannelFuture future = null;
        try{
            client = new Client4Delimiter();

            future = client.doRequest("localhost", 9999);

            // One Scanner for the whole session; a new one per line could lose buffered input.
            Scanner s = new Scanner(System.in);
            System.out.print(PROMPT);
            while(s.hasNextLine()){
                String line = s.nextLine();
                // The line is written as typed; the user has to include the delimiter.
                future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
                TimeUnit.SECONDS.sleep(1);
                System.out.print(PROMPT);
            }
        }catch(InterruptedException e){
            // Restore the interrupt flag so the stop request is not lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            if(null != future){
                // Close the connection ourselves; waiting on closeFuture() here would block
                // until the server happens to close it.
                future.channel().close();
            }
            if(null != client){
                client.release();
            }
        }
    }

}

package com.bjsxt.socket.netty.delimiter;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

/**
 * Client-side handler of the delimiter demo: prints every frame received
 * from the server.
 */
public class Client4DelimiterHandler extends ChannelHandlerAdapter {

    /**
     * Prints the received message.
     *
     * @param ctx channel context (unused)
     * @param msg the inbound message; only its {@code toString()} value is used
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try{
            String message = msg.toString();
            System.out.println("from server : " + message);
        }finally{
            // Releases msg when it is reference-counted, which guards against buffer leaks.
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx   channel context of the failing connection
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client exceptionCaught method run...");
        // Print the cause so that failures can be diagnosed instead of being silently dropped.
        cause.printStackTrace();
        ctx.close();
    }

}

協議

相對最成熟的數據傳遞方式。由服務器的開發者提供一個固定格式的協議標準。客戶端和服務器發送數據和接收數據的時候,都依據協議制定和解析消息。

/**
 * 1. 雙線程組
 * 2. Bootstrap配置啟動信息
 * 3. 注冊業務處理Handler
 * 4. 綁定服務監聽端口并啟動服務
 */
package com.bjsxt.socket.netty.protocol;

import java.nio.charset.Charset;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

/**
 * Netty server for the custom-protocol demo.
 * <p>
 * Inbound bytes are decoded as UTF-8 strings and passed to the handlers
 * supplied by the caller.
 * <p>
 * NOTE(review): no frame decoder is installed in the pipeline, so a single read
 * may carry a partial message or several messages - confirm whether that is
 * acceptable outside of this demo.
 */
public class Server4Protocol {
    // Acceptor thread group: accepts incoming client connections.
    private EventLoopGroup acceptorGroup = null;
    // Worker thread group: handles the data exchange with connected clients.
    private EventLoopGroup clientGroup = null;
    // Server bootstrap holding the startup configuration.
    private ServerBootstrap bootstrap = null;

    /** Creates the event loop groups and configures the bootstrap. */
    public Server4Protocol(){
        init();
    }

    /** Builds both thread groups and applies channel type and socket options. */
    private void init(){
        acceptorGroup = new NioEventLoopGroup();
        clientGroup = new NioEventLoopGroup();
        bootstrap = new ServerBootstrap();
        // Bind the two thread groups.
        bootstrap.group(acceptorGroup, clientGroup);
        // Use NIO for the listening channel.
        bootstrap.channel(NioServerSocketChannel.class);
        // SO_BACKLOG is the length of the pending-connection queue of the listening socket
        // (it is not a buffer size).
        bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
        // Send buffer, receive buffer and keep-alive are per-connection settings, so they are
        // registered with childOption(): option() only configures the listening channel,
        // childOption() configures each accepted channel.
        bootstrap.childOption(ChannelOption.SO_SNDBUF, 16*1024)
            .childOption(ChannelOption.SO_RCVBUF, 16*1024)
            .childOption(ChannelOption.SO_KEEPALIVE, true);
    }

    /**
     * Installs the per-connection pipeline and binds the server to the given port.
     *
     * @param port             TCP port to listen on
     * @param acceptorHandlers handlers appended after the string decoder; the same instances
     *                         are added to every accepted channel, so they have to be sharable
     * @return the future of the completed bind operation
     * @throws InterruptedException if the thread is interrupted while waiting for the bind
     */
    public ChannelFuture doAccept(int port, final ChannelHandler... acceptorHandlers) throws InterruptedException{
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // Converts inbound ByteBuf data into a String before it reaches channelRead.
                ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                ch.pipeline().addLast(acceptorHandlers);
            }
        });
        ChannelFuture future = bootstrap.bind(port).sync();
        return future;
    }

    /** Requests a graceful shutdown of both event loop groups. */
    public void release(){
        this.acceptorGroup.shutdownGracefully();
        this.clientGroup.shutdownGracefully();
    }

    /**
     * Starts the server on port 9999 and blocks until the listening channel closes
     * or the main thread is interrupted.
     */
    public static void main(String[] args){
        ChannelFuture future = null;
        Server4Protocol server = null;
        try{
            server = new Server4Protocol();
            future = server.doAccept(9999,new Server4ProtocolHandler());
            System.out.println("server started.");

            // Block until the listening channel is closed.
            future.channel().closeFuture().sync();
        }catch(InterruptedException e){
            // Restore the interrupt flag so the stop request is not lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }finally{
            if(null != future){
                // Close the listening channel instead of waiting on closeFuture() a second time,
                // which would block again after an interrupt.
                future.channel().close();
            }

            if(null != server){
                server.release();
            }
        }
    }

}

/**
 * @Sharable注解 - 
 *  代表當前Handler是一個可以分享的處理器。也就意味著,服務器注冊此Handler后,可以分享給多個客戶端同時使用。
 *  如果不使用注解描述類型,則每次客戶端請求時,必須為客戶端重新創建一個新的Handler對象。
 *  
 */
package com.bjsxt.socket.netty.protocol;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

@Sharable
/**
 * Server-side business handler of the custom-protocol demo.
 * <p>
 * Each inbound message is expected in the form
 * {@code HEADcontent-length:<n>HEADBODY<body>BODY}; the reply is encoded the same way.
 */
public class Server4ProtocolHandler extends ChannelHandlerAdapter {
    
    /**
     * Parses the request, prints it and answers with an encoded reply.
     * Nothing is written back when the request cannot be parsed.
     *
     * @param ctx channel context used to write the reply
     * @param msg the inbound message; only its {@code toString()} value is used
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String message = msg.toString();
        System.out.println("server receive protocol content : " + message);
        message = ProtocolParser.parse(message);
        if(null == message){
            System.out.println("error request from client");
            return ;
        }
        System.out.println("from client : " + message);
        String line = "server message";
        line = ProtocolParser.transferTo(line);
        System.out.println("server send protocol content : " + line);
        ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
    }

    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx   channel context of the failing connection
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("server exceptionCaught method run...");
        cause.printStackTrace();
        ctx.close();
    }
    
    /** Encoder and decoder for the {@code HEAD...HEADBODY...BODY} message format. */
    static class ProtocolParser{
        private static final String HEAD = "HEAD";
        private static final String BODY = "BODY";
        // The header's closing marker immediately followed by the body's opening marker.
        private static final String SEPARATOR = HEAD + BODY;

        /**
         * Extracts the body from a protocol message.
         *
         * @param message the raw protocol text, may be {@code null}
         * @return the body, or {@code null} when the message is malformed or the declared
         *         length does not match the body length (counted in characters)
         */
        public static String parse(String message){
            if(null == message || !message.startsWith(HEAD) || !message.endsWith(BODY)){
                return null;
            }
            // Split on the first separator only, so a body that itself contains "HEADBODY" survives.
            int separatorIndex = message.indexOf(SEPARATOR, HEAD.length());
            int bodyStart = separatorIndex + SEPARATOR.length();
            int bodyEnd = message.length() - BODY.length();
            if(separatorIndex < 0 || bodyEnd < bodyStart){
                return null;
            }
            String header = message.substring(HEAD.length(), separatorIndex);
            String body = message.substring(bodyStart, bodyEnd);
            int length;
            try{
                // The declared length is whatever follows the first ':' of the header.
                length = Integer.parseInt(header.substring(header.indexOf(":")+1));
            }catch(NumberFormatException ignored){
                // A non-numeric length is a malformed message, reported like any other mismatch.
                return null;
            }
            if(length != body.length()){
                return null;
            }
            return body;
        }

        /**
         * Wraps a body into the protocol format.
         *
         * @param message the body text
         * @return the encoded message, with content-length set to the character count of the body
         */
        public static String transferTo(String message){
            message = "HEADcontent-length:" + message.length() + "HEADBODY" + message + "BODY";
            return message;
        }
    }

}

/**
 * 1. 單線程組
 * 2. Bootstrap配置啟動信息
 * 3. 注冊業務處理Handler
 * 4. connect連接服務,并發起請求
 */
package com.bjsxt.socket.netty.protocol;

import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

/**
 * Netty client for the custom-protocol demo.
 * <p>
 * Every console line is wrapped into the protocol format by
 * {@code Client4ProtocolHandler.ProtocolParser.transferTo} before it is sent.
 */
public class Client4Protocol {

    private static final String PROMPT = "enter message send to server > ";

    // Thread group that sends requests and handles the server responses.
    private EventLoopGroup group = null;
    // Client bootstrap holding the startup configuration.
    private Bootstrap bootstrap = null;

    /** Creates the event loop group and configures the bootstrap. */
    public Client4Protocol(){
        init();
    }

    /** Builds the thread group and selects the NIO socket channel. */
    private void init(){
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        // Bind the thread group.
        bootstrap.group(group);
        // Use NIO for the client channel.
        bootstrap.channel(NioSocketChannel.class);
    }

    /**
     * Installs the client pipeline and connects to the server.
     *
     * @param host     server host name
     * @param port     server TCP port
     * @param handlers handlers appended after the string decoder
     * @return the future of the completed connect operation
     * @throws InterruptedException if the thread is interrupted while connecting
     */
    public ChannelFuture doRequest(String host, int port, final ChannelHandler... handlers) throws InterruptedException{
        this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // Converts inbound ByteBuf data into a String before it reaches channelRead.
                ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                ch.pipeline().addLast(handlers);
            }
        });
        ChannelFuture future = this.bootstrap.connect(host, port).sync();
        return future;
    }

    /** Requests a graceful shutdown of the event loop group. */
    public void release(){
        this.group.shutdownGracefully();
    }

    /**
     * Connects to localhost:9999 and sends every console line to the server until
     * standard input is exhausted or the thread is interrupted.
     */
    public static void main(String[] args) {
        Client4Protocol client = null;
        ChannelFuture future = null;
        try{
            client = new Client4Protocol();
            future = client.doRequest("localhost", 9999, new Client4ProtocolHandler());

            // One Scanner for the whole session; a new one per line could lose buffered input.
            Scanner s = new Scanner(System.in);
            System.out.print(PROMPT);
            while(s.hasNextLine()){
                String line = s.nextLine();
                line = Client4ProtocolHandler.ProtocolParser.transferTo(line);
                System.out.println("client send protocol content : " + line);
                future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
                TimeUnit.SECONDS.sleep(1);
                System.out.print(PROMPT);
            }
        }catch(InterruptedException e){
            // Restore the interrupt flag so the stop request is not lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            if(null != future){
                // Close the connection ourselves; waiting on closeFuture() here would block
                // until the server happens to close it.
                future.channel().close();
            }
            if(null != client){
                client.release();
            }
        }
    }

}

package com.bjsxt.socket.netty.protocol;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

/**
 * Client-side handler of the custom-protocol demo.
 * <p>
 * Each inbound message is expected in the form
 * {@code HEADcontent-length:<n>HEADBODY<body>BODY}.
 */
public class Client4ProtocolHandler extends ChannelHandlerAdapter {

    /**
     * Parses the server response and prints its body, or an error line when the
     * response cannot be parsed.
     *
     * @param ctx channel context (unused)
     * @param msg the inbound message; only its {@code toString()} value is used
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try{
            String message = msg.toString();
            System.out.println("client receive protocol content : " + message);
            message = ProtocolParser.parse(message);
            if(null == message){
                System.out.println("error response from server");
                return ;
            }
            System.out.println("from server : " + message);
        }finally{
            // Releases msg when it is reference-counted, which guards against buffer leaks.
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx   channel context of the failing connection
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client exceptionCaught method run...");
        // Print the cause so that failures can be diagnosed instead of being silently dropped.
        cause.printStackTrace();
        ctx.close();
    }

    /** Encoder and decoder for the {@code HEAD...HEADBODY...BODY} message format. */
    static class ProtocolParser{
        private static final String HEAD = "HEAD";
        private static final String BODY = "BODY";
        // The header's closing marker immediately followed by the body's opening marker.
        private static final String SEPARATOR = HEAD + BODY;

        /**
         * Extracts the body from a protocol message.
         *
         * @param message the raw protocol text, may be {@code null}
         * @return the body, or {@code null} when the message is malformed or the declared
         *         length does not match the body length (counted in characters)
         */
        public static String parse(String message){
            if(null == message || !message.startsWith(HEAD) || !message.endsWith(BODY)){
                return null;
            }
            // Split on the first separator only, so a body that itself contains "HEADBODY" survives.
            int separatorIndex = message.indexOf(SEPARATOR, HEAD.length());
            int bodyStart = separatorIndex + SEPARATOR.length();
            int bodyEnd = message.length() - BODY.length();
            if(separatorIndex < 0 || bodyEnd < bodyStart){
                return null;
            }
            String header = message.substring(HEAD.length(), separatorIndex);
            String body = message.substring(bodyStart, bodyEnd);
            int length;
            try{
                // The declared length is whatever follows the first ':' of the header.
                length = Integer.parseInt(header.substring(header.indexOf(":")+1));
            }catch(NumberFormatException ignored){
                // A non-numeric length is a malformed message, reported like any other mismatch.
                return null;
            }
            if(length != body.length()){
                return null;
            }
            return body;
        }

        /**
         * Wraps a body into the protocol format.
         *
         * @param message the body text
         * @return the encoded message, with content-length set to the character count of the body
         */
        public static String transferTo(String message){
            message = "HEADcontent-length:" + message.length() + "HEADBODY" + message + "BODY";
            return message;
        }
    }

}

©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

  • 1.這篇文章不是本人原創(chuàng)的,只是個(gè)人為了對(duì)這部分知識(shí)做一個(gè)整理和系統(tǒng)的輸出而編輯成的,在此鄭重地向本文所引用文章的...
    SOMCENT閱讀 13,383評(píng)論 6 174
  • 計(jì)算機(jī)網(wǎng)絡(luò)概述 網(wǎng)絡(luò)編程的實(shí)質(zhì)就是兩個(gè)(或多個(gè))設(shè)備(例如計(jì)算機(jī))之間的數(shù)據(jù)傳輸。 按照計(jì)算機(jī)網(wǎng)絡(luò)的定義,通過一定...
    蛋炒飯_By閱讀 1,373評(píng)論 0 10
  • 網(wǎng)絡(luò)編程 網(wǎng)絡(luò)編程對(duì)于很多的初學(xué)者來說,都是很向往的一種編程技能,但是很多的初學(xué)者卻因?yàn)楹荛L(zhǎng)一段時(shí)間無法進(jìn)入網(wǎng)絡(luò)編...
    程序員歐陽(yáng)閱讀 2,113評(píng)論 1 37
  • 個(gè)人認(rèn)為,Goodboy1881先生的TCP /IP 協(xié)議詳解學(xué)習(xí)博客系列博客是一部非常精彩的學(xué)習(xí)筆記,這雖然只是...
    貳零壹柒_fc10閱讀 5,200評(píng)論 0 8
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,688評(píng)論 19 139

友情鏈接更多精彩內(nèi)容