netty使用tcp/ip協議傳輸數據。而tcp/ip協議是類似水流一樣的數據傳輸方式。多次訪問的時候有可能出現數據粘包的問題,解決這種問題的方式如下:
定長數據流
客戶端和服務器,提前協調好,每個消息長度固定。(如:長度10)。如果客戶端或服務器寫出的數據不足10,則使用空白字符補足(如:使用空格)。
/**
* 1. 雙線程組
* 2. Bootstrap配置啟動信息
* 3. 注冊業務處理Handler
* 4. 綁定服務監聽端口并啟動服務
*/
package com.bjsxt.socket.netty.fixedlength;
import java.nio.charset.Charset;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
 * Netty server for the fixed-length framing demo.
 *
 * <p>Each accepted connection gets the pipeline
 * {@link FixedLengthFrameDecoder} (5-byte frames) -> {@link StringDecoder} (UTF-8)
 * -> {@code Server4FixedLengthHandler}.
 */
public class Server4FixedLength {
    // Event-loop group registered as the parent (acceptor) group of the bootstrap.
    private EventLoopGroup acceptorGroup = null;
    // Event-loop group registered as the child group, i.e. for accepted client connections.
    private EventLoopGroup clientGroup = null;
    // Holds the server start-up configuration.
    private ServerBootstrap bootstrap = null;

    /** Creates the event-loop groups and configures the bootstrap; nothing is bound yet. */
    public Server4FixedLength(){
        init();
    }

    /** Builds both event-loop groups and applies the channel type and socket options. */
    private void init(){
        acceptorGroup = new NioEventLoopGroup();
        clientGroup = new NioEventLoopGroup();
        bootstrap = new ServerBootstrap();
        // Register acceptor and client groups.
        bootstrap.group(acceptorGroup, clientGroup);
        // Use the NIO server channel implementation.
        bootstrap.channel(NioServerSocketChannel.class);
        // SO_BACKLOG is the length of the pending-connection queue of the listening socket
        // (not a buffer size), so it belongs on the server channel.
        bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
        // Send buffer, receive buffer and keep-alive are per-connection settings.
        // option(...) only configures the listening channel, so they must be applied with
        // childOption(...) to reach the accepted client channels.
        bootstrap.childOption(ChannelOption.SO_SNDBUF, 16*1024)
            .childOption(ChannelOption.SO_RCVBUF, 16*1024)
            .childOption(ChannelOption.SO_KEEPALIVE, true);
    }

    /**
     * Installs the per-connection pipeline and binds the server.
     *
     * @param port TCP port to listen on
     * @return the completed bind future; its channel is the listening channel
     * @throws InterruptedException if the thread is interrupted while waiting for the bind
     */
    public ChannelFuture doAccept(int port) throws InterruptedException{
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelHandler[] acceptorHandlers = new ChannelHandler[3];
                // Fixed-length framing: the constructor argument is the frame length in bytes.
                // Senders pad shorter messages (e.g. with spaces) up to this length.
                acceptorHandlers[0] = new FixedLengthFrameDecoder(5);
                // Converts each ByteBuf frame into a String before it reaches channelRead.
                acceptorHandlers[1] = new StringDecoder(Charset.forName("UTF-8"));
                acceptorHandlers[2] = new Server4FixedLengthHandler();
                ch.pipeline().addLast(acceptorHandlers);
            }
        });
        return bootstrap.bind(port).sync();
    }

    /** Starts a graceful shutdown of both event-loop groups. */
    public void release(){
        this.acceptorGroup.shutdownGracefully();
        this.clientGroup.shutdownGracefully();
    }

    /** Starts the server on port 9999 and blocks until the listening channel is closed. */
    public static void main(String[] args){
        ChannelFuture future = null;
        Server4FixedLength server = null;
        try{
            server = new Server4FixedLength();
            future = server.doAccept(9999);
            System.out.println("server started.");
            future.channel().closeFuture().sync();
        }catch(InterruptedException e){
            e.printStackTrace();
            // Keep the interrupt visible to callers instead of silently clearing it.
            Thread.currentThread().interrupt();
        }finally{
            if(null != future){
                // Close rather than wait again: after an interrupt, a second
                // closeFuture().sync() would block until someone else closes the channel.
                future.channel().close();
            }
            if(null != server){
                server.release();
            }
        }
    }
}
package com.bjsxt.socket.netty.fixedlength;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * Server-side business handler of the fixed-length demo: logs each received message and
 * answers with the three-byte reply {@code "ok "}.
 */
public class Server4FixedLengthHandler extends ChannelHandlerAdapter {

    /**
     * Prints the trimmed text form of {@code msg} and writes the UTF-8 bytes of
     * {@code "ok "} back to the peer.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        final String received = msg.toString();
        // trim() removes the padding that fills a frame up to its fixed length.
        System.out.println("from client : " + received.trim());
        final byte[] replyBytes = "ok ".getBytes("UTF-8");
        final ByteBuf reply = Unpooled.copiedBuffer(replyBytes);
        ctx.writeAndFlush(reply);
    }

    /**
     * Reports that an error occurred and closes the channel. The cause itself is not printed.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("server exceptionCaught method run...");
        ctx.close();
    }
}
/**
* 1. 單線程組
* 2. Bootstrap配置啟動信息
* 3. 注冊業務處理Handler
* 4. connect連接服務,并發起請求
*/
package com.bjsxt.socket.netty.fixedlength;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
 * Console client for the fixed-length framing demo.
 *
 * <p>Requests are sent as space-padded frames of {@link #REQUEST_FRAME_LENGTH} bytes;
 * responses are decoded as 3-byte frames and then as UTF-8 text.
 */
public class Client4FixedLength {
    // Size in bytes of every request frame written by main().
    private static final int REQUEST_FRAME_LENGTH = 5;

    // Event-loop group that serves the outgoing connection.
    private EventLoopGroup group = null;
    // Holds the client start-up configuration.
    private Bootstrap bootstrap = null;

    /** Creates the event-loop group and configures the bootstrap; nothing is connected yet. */
    public Client4FixedLength(){
        init();
    }

    /** Builds the event-loop group and selects the NIO socket channel type. */
    private void init(){
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        // Register the event-loop group.
        bootstrap.group(group);
        // Use the NIO client channel implementation.
        bootstrap.channel(NioSocketChannel.class);
    }

    /**
     * Installs the response pipeline and connects to the server.
     *
     * @param host server host name
     * @param port server TCP port
     * @return the completed connect future
     * @throws InterruptedException if the thread is interrupted while waiting for the connect
     */
    public ChannelFuture doRequest(String host, int port) throws InterruptedException{
        this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelHandler[] handlers = new ChannelHandler[3];
                // Responses are cut into frames of exactly 3 bytes.
                handlers[0] = new FixedLengthFrameDecoder(3);
                // Converts each ByteBuf frame into a String before it reaches channelRead.
                handlers[1] = new StringDecoder(Charset.forName("UTF-8"));
                handlers[2] = new Client4FixedLengthHandler();
                ch.pipeline().addLast(handlers);
            }
        });
        return this.bootstrap.connect(host, port).sync();
    }

    /** Starts a graceful shutdown of the event-loop group. */
    public void release(){
        this.group.shutdownGracefully();
    }

    /**
     * Reads lines from standard input and sends each one as a single padded frame to
     * localhost:9999. Lines whose UTF-8 encoding exceeds the frame length are rejected
     * with a console message. The loop ends when standard input is exhausted.
     */
    public static void main(String[] args) {
        Client4FixedLength client = null;
        ChannelFuture future = null;
        try{
            client = new Client4FixedLength();
            future = client.doRequest("localhost", 9999);
            // One Scanner for the whole session: a fresh Scanner per line can lose input
            // that an earlier instance already buffered. It is deliberately not closed,
            // because closing it would close System.in.
            Scanner s = new Scanner(System.in);
            while(true){
                System.out.print("enter message send to server > ");
                if(!s.hasNextLine()){
                    // End of input: leave the loop instead of failing inside nextLine().
                    break;
                }
                String line = s.nextLine();
                byte[] temp = line.getBytes("UTF-8");
                if(temp.length > REQUEST_FRAME_LENGTH){
                    // Previously such input was silently replaced by an all-zero frame.
                    System.out.println("message is " + temp.length + " bytes, at most "
                            + REQUEST_FRAME_LENGTH + " bytes fit into one frame; not sent.");
                    continue;
                }
                byte[] bs = new byte[REQUEST_FRAME_LENGTH];
                // Pad with spaces up to the fixed frame length.
                java.util.Arrays.fill(bs, (byte) ' ');
                System.arraycopy(temp, 0, bs, 0, temp.length);
                future.channel().writeAndFlush(Unpooled.copiedBuffer(bs));
                TimeUnit.SECONDS.sleep(1);
            }
        }catch(InterruptedException e){
            e.printStackTrace();
            // Keep the interrupt visible to callers instead of silently clearing it.
            Thread.currentThread().interrupt();
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            if(null != future){
                // Actively close the connection; waiting on closeFuture() here would
                // block until the server happened to drop the connection.
                future.channel().close();
            }
            if(null != client){
                client.release();
            }
        }
    }
}
package com.bjsxt.socket.netty.fixedlength;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
/**
 * Client-side handler of the fixed-length demo: prints every message received from the server.
 */
public class Client4FixedLengthHandler extends ChannelHandlerAdapter {

    /**
     * Prints the text form of {@code msg} and then hands it to
     * {@link ReferenceCountUtil#release(Object)}.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try{
            System.out.println("from server : " + msg.toString());
        }finally{
            // Release the message so that a reference-counted buffer is not leaked.
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * Reports that an error occurred and closes the channel. The cause itself is not printed.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client exceptionCaught method run...");
        ctx.close();
    }
}
特殊結束符
客戶端和服務器,協商定義一個特殊的分隔符號,分隔符號長度自定義。如:‘#’、‘$E$’、‘AA@’。在通訊的時候,只要沒有發送分隔符號,則代表一條數據沒有結束。
/**
* 1. 雙線程組
* 2. Bootstrap配置啟動信息
* 3. 注冊業務處理Handler
* 4. 綁定服務監聽端口并啟動服務
*/
package com.bjsxt.socket.netty.delimiter;
import java.nio.charset.Charset;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
 * Netty server for the delimiter-based framing demo.
 *
 * <p>Each accepted connection gets the pipeline
 * {@link DelimiterBasedFrameDecoder} (delimiter {@code "$E$"}, max frame 1024)
 * -> {@link StringDecoder} (UTF-8) -> {@code Server4DelimiterHandler}.
 */
public class Server4Delimiter {
    // Event-loop group registered as the parent (acceptor) group of the bootstrap.
    private EventLoopGroup acceptorGroup = null;
    // Event-loop group registered as the child group, i.e. for accepted client connections.
    private EventLoopGroup clientGroup = null;
    // Holds the server start-up configuration.
    private ServerBootstrap bootstrap = null;

    /** Creates the event-loop groups and configures the bootstrap; nothing is bound yet. */
    public Server4Delimiter(){
        init();
    }

    /** Builds both event-loop groups and applies the channel type and socket options. */
    private void init(){
        acceptorGroup = new NioEventLoopGroup();
        clientGroup = new NioEventLoopGroup();
        bootstrap = new ServerBootstrap();
        // Register acceptor and client groups.
        bootstrap.group(acceptorGroup, clientGroup);
        // Use the NIO server channel implementation.
        bootstrap.channel(NioServerSocketChannel.class);
        // SO_BACKLOG is the length of the pending-connection queue of the listening socket
        // (not a buffer size), so it belongs on the server channel.
        bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
        // Send buffer, receive buffer and keep-alive are per-connection settings.
        // option(...) only configures the listening channel, so they must be applied with
        // childOption(...) to reach the accepted client channels.
        bootstrap.childOption(ChannelOption.SO_SNDBUF, 16*1024)
            .childOption(ChannelOption.SO_RCVBUF, 16*1024)
            .childOption(ChannelOption.SO_KEEPALIVE, true);
    }

    /**
     * Installs the per-connection pipeline and binds the server.
     *
     * @param port TCP port to listen on
     * @return the completed bind future; its channel is the listening channel
     * @throws InterruptedException if the thread is interrupted while waiting for the bind
     */
    public ChannelFuture doAccept(int port) throws InterruptedException{
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // The frame delimiter has to be supplied as a ByteBuf. Encode it with an
                // explicit charset so the bytes do not depend on the platform default.
                ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes(Charset.forName("UTF-8")));
                ChannelHandler[] acceptorHandlers = new ChannelHandler[3];
                // The decoder is not @Sharable, so a new instance is created for every channel.
                // The first argument caps the frame length (1024 bytes); a maximum is
                // required even when frames are delimiter-terminated.
                acceptorHandlers[0] = new DelimiterBasedFrameDecoder(1024, delimiter);
                // Converts each ByteBuf frame into a String before it reaches channelRead.
                acceptorHandlers[1] = new StringDecoder(Charset.forName("UTF-8"));
                acceptorHandlers[2] = new Server4DelimiterHandler();
                ch.pipeline().addLast(acceptorHandlers);
            }
        });
        return bootstrap.bind(port).sync();
    }

    /** Starts a graceful shutdown of both event-loop groups. */
    public void release(){
        this.acceptorGroup.shutdownGracefully();
        this.clientGroup.shutdownGracefully();
    }

    /** Starts the server on port 9999 and blocks until the listening channel is closed. */
    public static void main(String[] args){
        ChannelFuture future = null;
        Server4Delimiter server = null;
        try{
            server = new Server4Delimiter();
            future = server.doAccept(9999);
            System.out.println("server started.");
            future.channel().closeFuture().sync();
        }catch(InterruptedException e){
            e.printStackTrace();
            // Keep the interrupt visible to callers instead of silently clearing it.
            Thread.currentThread().interrupt();
        }finally{
            if(null != future){
                // Close rather than wait again: after an interrupt, a second
                // closeFuture().sync() would block until someone else closes the channel.
                future.channel().close();
            }
            if(null != server){
                server.release();
            }
        }
    }
}
package com.bjsxt.socket.netty.delimiter;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * Server-side business handler of the delimiter demo: logs each received message and
 * answers with a fixed text that contains the {@code $E$} marker three times.
 */
public class Server4DelimiterHandler extends ChannelHandlerAdapter {

    /**
     * Prints the text form of {@code msg} and writes the UTF-8 bytes of the fixed reply
     * back to the peer.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        final String received = msg.toString();
        System.out.println("from client : " + received);
        final byte[] replyBytes =
                "server message $E$ test delimiter handler!! $E$ second message $E$".getBytes("UTF-8");
        ctx.writeAndFlush(Unpooled.copiedBuffer(replyBytes));
    }

    /**
     * Reports that an error occurred and closes the channel. The cause itself is not printed.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("server exceptionCaught method run...");
        ctx.close();
    }
}
/**
* 1. 單線程組
* 2. Bootstrap配置啟動信息
* 3. 注冊業務處理Handler
* 4. connect連接服務,并發起請求
*/
package com.bjsxt.socket.netty.delimiter;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
 * Console client for the delimiter-based framing demo.
 *
 * <p>Responses are split on the delimiter {@code "$E$"} (max frame 1024) and decoded as
 * UTF-8 text. Requests are sent exactly as typed; no delimiter is appended by this class.
 */
public class Client4Delimiter {
    // Event-loop group that serves the outgoing connection.
    private EventLoopGroup group = null;
    // Holds the client start-up configuration.
    private Bootstrap bootstrap = null;

    /** Creates the event-loop group and configures the bootstrap; nothing is connected yet. */
    public Client4Delimiter(){
        init();
    }

    /** Builds the event-loop group and selects the NIO socket channel type. */
    private void init(){
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        // Register the event-loop group.
        bootstrap.group(group);
        // Use the NIO client channel implementation.
        bootstrap.channel(NioSocketChannel.class);
    }

    /**
     * Installs the response pipeline and connects to the server.
     *
     * @param host server host name
     * @param port server TCP port
     * @return the completed connect future
     * @throws InterruptedException if the thread is interrupted while waiting for the connect
     */
    public ChannelFuture doRequest(String host, int port) throws InterruptedException{
        this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // Frame delimiter, encoded with an explicit charset so the bytes do not
                // depend on the platform default.
                ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes(Charset.forName("UTF-8")));
                ChannelHandler[] handlers = new ChannelHandler[3];
                handlers[0] = new DelimiterBasedFrameDecoder(1024, delimiter);
                // Converts each ByteBuf frame into a String before it reaches channelRead.
                handlers[1] = new StringDecoder(Charset.forName("UTF-8"));
                handlers[2] = new Client4DelimiterHandler();
                ch.pipeline().addLast(handlers);
            }
        });
        return this.bootstrap.connect(host, port).sync();
    }

    /** Starts a graceful shutdown of the event-loop group. */
    public void release(){
        this.group.shutdownGracefully();
    }

    /**
     * Reads lines from standard input and sends each one, unmodified, to localhost:9999.
     * The loop ends when standard input is exhausted.
     */
    public static void main(String[] args) {
        Client4Delimiter client = null;
        ChannelFuture future = null;
        try{
            client = new Client4Delimiter();
            future = client.doRequest("localhost", 9999);
            // One Scanner for the whole session: a fresh Scanner per line can lose input
            // that an earlier instance already buffered. It is deliberately not closed,
            // because closing it would close System.in.
            Scanner s = new Scanner(System.in);
            while(true){
                System.out.print("enter message send to server > ");
                if(!s.hasNextLine()){
                    // End of input: leave the loop instead of failing inside nextLine().
                    break;
                }
                String line = s.nextLine();
                // The text is sent as typed, so a message only ends when the user includes
                // the delimiter in the input.
                future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
                TimeUnit.SECONDS.sleep(1);
            }
        }catch(InterruptedException e){
            e.printStackTrace();
            // Keep the interrupt visible to callers instead of silently clearing it.
            Thread.currentThread().interrupt();
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            if(null != future){
                // Actively close the connection; waiting on closeFuture() here would
                // block until the server happened to drop the connection.
                future.channel().close();
            }
            if(null != client){
                client.release();
            }
        }
    }
}
package com.bjsxt.socket.netty.delimiter;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
/**
 * Client-side handler of the delimiter demo: prints every message received from the server.
 */
public class Client4DelimiterHandler extends ChannelHandlerAdapter {

    /**
     * Prints the text form of {@code msg} and then hands it to
     * {@link ReferenceCountUtil#release(Object)}.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try{
            System.out.println("from server : " + msg.toString());
        }finally{
            // Release the message so that a reference-counted buffer is not leaked.
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * Reports that an error occurred and closes the channel. The cause itself is not printed.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client exceptionCaught method run...");
        ctx.close();
    }
}
協議
相對最成熟的數據傳遞方式。由服務器的開發者提供一個固定格式的協議標準。客戶端和服務器發送數據和接收數據的時候,都依據協議制定和解析消息。
/**
* 1. 雙線程組
* 2. Bootstrap配置啟動信息
* 3. 注冊業務處理Handler
* 4. 綁定服務監聽端口并啟動服務
*/
package com.bjsxt.socket.netty.protocol;
import java.nio.charset.Charset;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
/**
 * Netty server for the custom-protocol demo.
 *
 * <p>Each accepted connection gets a UTF-8 {@link StringDecoder} followed by the handlers
 * passed to {@link #doAccept(int, ChannelHandler...)}.
 */
public class Server4Protocol {
    // Event-loop group registered as the parent (acceptor) group of the bootstrap.
    private EventLoopGroup acceptorGroup = null;
    // Event-loop group registered as the child group, i.e. for accepted client connections.
    private EventLoopGroup clientGroup = null;
    // Holds the server start-up configuration.
    private ServerBootstrap bootstrap = null;

    /** Creates the event-loop groups and configures the bootstrap; nothing is bound yet. */
    public Server4Protocol(){
        init();
    }

    /** Builds both event-loop groups and applies the channel type and socket options. */
    private void init(){
        acceptorGroup = new NioEventLoopGroup();
        clientGroup = new NioEventLoopGroup();
        bootstrap = new ServerBootstrap();
        // Register acceptor and client groups.
        bootstrap.group(acceptorGroup, clientGroup);
        // Use the NIO server channel implementation.
        bootstrap.channel(NioServerSocketChannel.class);
        // SO_BACKLOG is the length of the pending-connection queue of the listening socket
        // (not a buffer size), so it belongs on the server channel.
        bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
        // Send buffer, receive buffer and keep-alive are per-connection settings.
        // option(...) only configures the listening channel, so they must be applied with
        // childOption(...) to reach the accepted client channels.
        bootstrap.childOption(ChannelOption.SO_SNDBUF, 16*1024)
            .childOption(ChannelOption.SO_RCVBUF, 16*1024)
            .childOption(ChannelOption.SO_KEEPALIVE, true);
    }

    /**
     * Installs the per-connection pipeline and binds the server.
     *
     * @param port TCP port to listen on
     * @param acceptorHandlers handlers appended after the string decoder; the same instances
     *        are added to the pipeline of every accepted connection, so they are expected
     *        to be annotated {@code @Sharable}
     * @return the completed bind future; its channel is the listening channel
     * @throws InterruptedException if the thread is interrupted while waiting for the bind
     */
    public ChannelFuture doAccept(int port, final ChannelHandler... acceptorHandlers) throws InterruptedException{
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // Converts inbound ByteBuf data into a String before it reaches channelRead.
                ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                ch.pipeline().addLast(acceptorHandlers);
            }
        });
        return bootstrap.bind(port).sync();
    }

    /** Starts a graceful shutdown of both event-loop groups. */
    public void release(){
        this.acceptorGroup.shutdownGracefully();
        this.clientGroup.shutdownGracefully();
    }

    /** Starts the server on port 9999 and blocks until the listening channel is closed. */
    public static void main(String[] args){
        ChannelFuture future = null;
        Server4Protocol server = null;
        try{
            server = new Server4Protocol();
            future = server.doAccept(9999,new Server4ProtocolHandler());
            System.out.println("server started.");
            future.channel().closeFuture().sync();
        }catch(InterruptedException e){
            e.printStackTrace();
            // Keep the interrupt visible to callers instead of silently clearing it.
            Thread.currentThread().interrupt();
        }finally{
            if(null != future){
                // Close rather than wait again: after an interrupt, a second
                // closeFuture().sync() would block until someone else closes the channel.
                future.channel().close();
            }
            if(null != server){
                server.release();
            }
        }
    }
}
/**
* @Sharable注解 -
* 代表當前Handler是一個可以分享的處理器。也就意味著,服務器注冊此Handler后,可以分享給多個客戶端同時使用。
* 如果不使用注解描述類型,則每次客戶端請求時,必須為客戶端重新創建一個新的Handler對象。
*
*/
package com.bjsxt.socket.netty.protocol;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * Server-side business handler of the custom-protocol demo.
 *
 * <p>Marked {@code @Sharable} so one instance can be added to the pipelines of several
 * connections. Messages use the text layout
 * {@code HEADcontent-length:<n>HEADBODY<body>BODY}, where {@code <n>} is the number of
 * characters in {@code <body>}.
 */
@Sharable
public class Server4ProtocolHandler extends ChannelHandlerAdapter {

    /**
     * Parses the incoming protocol message, logs it, and answers with the protocol-wrapped
     * text {@code "server message"}. A message that does not match the protocol is reported
     * on the console and dropped without a reply.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String message = msg.toString();
        System.out.println("server receive protocol content : " + message);
        message = ProtocolParser.parse(message);
        if(null == message){
            System.out.println("error request from client");
            return ;
        }
        System.out.println("from client : " + message);
        String line = "server message";
        line = ProtocolParser.transferTo(line);
        System.out.println("server send protocol content : " + line);
        ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
    }

    /** Reports the error, prints its stack trace and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("server exceptionCaught method run...");
        cause.printStackTrace();
        ctx.close();
    }

    /** Encoder/decoder for the {@code HEAD...HEADBODY...BODY} text protocol. */
    static class ProtocolParser{
        private static final String HEAD_MARK = "HEAD";
        private static final String BODY_MARK = "BODY";
        // The header is closed by "HEAD" and the body opened by "BODY", back to back.
        private static final String SEPARATOR = HEAD_MARK + BODY_MARK;

        /**
         * Extracts the body of a protocol message.
         *
         * @param message text expected to hold exactly one complete protocol message
         * @return the body, or {@code null} when the text is {@code null}, does not match
         *         the protocol layout, carries a non-numeric length, or the declared length
         *         differs from the actual body length
         */
        public static String parse(String message){
            if(null == message || !message.startsWith(HEAD_MARK) || !message.endsWith(BODY_MARK)){
                return null;
            }
            // Use the first separator after the leading mark so that a body which itself
            // contains "HEADBODY" is still returned intact.
            int separatorAt = message.indexOf(SEPARATOR, HEAD_MARK.length());
            if(separatorAt < 0){
                return null;
            }
            int bodyStart = separatorAt + SEPARATOR.length();
            int bodyEnd = message.length() - BODY_MARK.length();
            if(bodyStart > bodyEnd){
                // The closing "BODY" is missing; only the separator's own "BODY" was seen.
                return null;
            }
            String header = message.substring(HEAD_MARK.length(), separatorAt);
            String body = message.substring(bodyStart, bodyEnd);
            int length;
            try{
                // The declared length is whatever follows the first ':' of the header.
                length = Integer.parseInt(header.substring(header.indexOf(":")+1));
            }catch(NumberFormatException e){
                return null;
            }
            if(length != body.length()){
                return null;
            }
            return body;
        }

        /**
         * Wraps {@code message} into the protocol layout.
         *
         * @param message body text to send
         * @return {@code HEADcontent-length:<n>HEADBODY<message>BODY} with {@code <n>} set
         *         to {@code message.length()}
         */
        public static String transferTo(String message){
            message = "HEADcontent-length:" + message.length() + "HEADBODY" + message + "BODY";
            return message;
        }
    }
}
/**
* 1. 單線程組
* 2. Bootstrap配置啟動信息
* 3. 注冊業務處理Handler
* 4. connect連接服務,并發起請求
*/
package com.bjsxt.socket.netty.protocol;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
/**
 * Console client for the custom-protocol demo.
 *
 * <p>Each typed line is wrapped by {@code Client4ProtocolHandler.ProtocolParser.transferTo}
 * before it is sent; responses pass through a UTF-8 {@link StringDecoder} and then the
 * handlers given to {@link #doRequest(String, int, ChannelHandler...)}.
 */
public class Client4Protocol {
    // Event-loop group that serves the outgoing connection.
    private EventLoopGroup group = null;
    // Holds the client start-up configuration.
    private Bootstrap bootstrap = null;

    /** Creates the event-loop group and configures the bootstrap; nothing is connected yet. */
    public Client4Protocol(){
        init();
    }

    /** Builds the event-loop group and selects the NIO socket channel type. */
    private void init(){
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        // Register the event-loop group.
        bootstrap.group(group);
        // Use the NIO client channel implementation.
        bootstrap.channel(NioSocketChannel.class);
    }

    /**
     * Installs the response pipeline and connects to the server.
     *
     * @param host server host name
     * @param port server TCP port
     * @param handlers handlers appended after the string decoder
     * @return the completed connect future
     * @throws InterruptedException if the thread is interrupted while waiting for the connect
     */
    public ChannelFuture doRequest(String host, int port, final ChannelHandler... handlers) throws InterruptedException{
        this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // Converts inbound ByteBuf data into a String before it reaches channelRead.
                ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                ch.pipeline().addLast(handlers);
            }
        });
        return this.bootstrap.connect(host, port).sync();
    }

    /** Starts a graceful shutdown of the event-loop group. */
    public void release(){
        this.group.shutdownGracefully();
    }

    /**
     * Reads lines from standard input, wraps each one into the protocol layout and sends it
     * to localhost:9999. The loop ends when standard input is exhausted.
     */
    public static void main(String[] args) {
        Client4Protocol client = null;
        ChannelFuture future = null;
        try{
            client = new Client4Protocol();
            future = client.doRequest("localhost", 9999, new Client4ProtocolHandler());
            // One Scanner for the whole session: a fresh Scanner per line can lose input
            // that an earlier instance already buffered. It is deliberately not closed,
            // because closing it would close System.in.
            Scanner s = new Scanner(System.in);
            while(true){
                System.out.print("enter message send to server > ");
                if(!s.hasNextLine()){
                    // End of input: leave the loop instead of failing inside nextLine().
                    break;
                }
                String line = s.nextLine();
                line = Client4ProtocolHandler.ProtocolParser.transferTo(line);
                System.out.println("client send protocol content : " + line);
                future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
                TimeUnit.SECONDS.sleep(1);
            }
        }catch(InterruptedException e){
            e.printStackTrace();
            // Keep the interrupt visible to callers instead of silently clearing it.
            Thread.currentThread().interrupt();
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            if(null != future){
                // Actively close the connection; waiting on closeFuture() here would
                // block until the server happened to drop the connection.
                future.channel().close();
            }
            if(null != client){
                client.release();
            }
        }
    }
}
package com.bjsxt.socket.netty.protocol;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
/**
 * Client-side handler of the custom-protocol demo: parses and prints server responses.
 *
 * <p>Messages use the text layout {@code HEADcontent-length:<n>HEADBODY<body>BODY}, where
 * {@code <n>} is the number of characters in {@code <body>}.
 */
public class Client4ProtocolHandler extends ChannelHandlerAdapter {

    /**
     * Prints the raw response and, when it matches the protocol, its body. A response
     * that does not match the protocol is reported on the console. The message is always
     * handed to {@link ReferenceCountUtil#release(Object)} afterwards.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try{
            String message = msg.toString();
            System.out.println("client receive protocol content : " + message);
            message = ProtocolParser.parse(message);
            if(null == message){
                System.out.println("error response from server");
                return ;
            }
            System.out.println("from server : " + message);
        }finally{
            // Release the message so that a reference-counted buffer is not leaked.
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * Reports that an error occurred and closes the channel. The cause itself is not printed.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client exceptionCaught method run...");
        ctx.close();
    }

    /** Encoder/decoder for the {@code HEAD...HEADBODY...BODY} text protocol. */
    static class ProtocolParser{
        private static final String HEAD_MARK = "HEAD";
        private static final String BODY_MARK = "BODY";
        // The header is closed by "HEAD" and the body opened by "BODY", back to back.
        private static final String SEPARATOR = HEAD_MARK + BODY_MARK;

        /**
         * Extracts the body of a protocol message.
         *
         * @param message text expected to hold exactly one complete protocol message
         * @return the body, or {@code null} when the text is {@code null}, does not match
         *         the protocol layout, carries a non-numeric length, or the declared length
         *         differs from the actual body length
         */
        public static String parse(String message){
            if(null == message || !message.startsWith(HEAD_MARK) || !message.endsWith(BODY_MARK)){
                return null;
            }
            // Use the first separator after the leading mark so that a body which itself
            // contains "HEADBODY" is still returned intact.
            int separatorAt = message.indexOf(SEPARATOR, HEAD_MARK.length());
            if(separatorAt < 0){
                return null;
            }
            int bodyStart = separatorAt + SEPARATOR.length();
            int bodyEnd = message.length() - BODY_MARK.length();
            if(bodyStart > bodyEnd){
                // The closing "BODY" is missing; only the separator's own "BODY" was seen.
                return null;
            }
            String header = message.substring(HEAD_MARK.length(), separatorAt);
            String body = message.substring(bodyStart, bodyEnd);
            int length;
            try{
                // The declared length is whatever follows the first ':' of the header.
                length = Integer.parseInt(header.substring(header.indexOf(":")+1));
            }catch(NumberFormatException e){
                return null;
            }
            if(length != body.length()){
                return null;
            }
            return body;
        }

        /**
         * Wraps {@code message} into the protocol layout.
         *
         * @param message body text to send
         * @return {@code HEADcontent-length:<n>HEADBODY<message>BODY} with {@code <n>} set
         *         to {@code message.length()}
         */
        public static String transferTo(String message){
            message = "HEADcontent-length:" + message.length() + "HEADBODY" + message + "BODY";
            return message;
        }
    }
}