更多關(guān)于Java方面的文章,歡迎訪問燕歸來https://www.zhoutao123.com
編寫一個網(wǎng)絡(luò)應(yīng)用程序需要實現(xiàn)某種編解碼器,編解碼器的作用就是講原始字節(jié)數(shù)據(jù)與自定義的消息對象進(jìn)行互轉(zhuǎn)。網(wǎng)絡(luò)中都是以字節(jié)碼的數(shù)據(jù)形式來傳輸數(shù)據(jù)的,服務(wù)器編碼數(shù)據(jù)后發(fā)送到客戶端,客戶端需要對數(shù)據(jù)進(jìn)行解碼,因為編解碼器由兩部分組成:
- Decoder(解碼器)
- Encoder(編碼器)
解碼器負(fù)責(zé)處理“入站”數(shù)據(jù),編碼器負(fù)責(zé)處理“出站”數(shù)據(jù)。編碼器和解碼器的結(jié)構(gòu)很簡單,消息被編碼后解碼后會自動通過ReferenceCountUtil.release(message)釋放。
需要補(bǔ)充說明的是,Netty中有兩個方向的數(shù)據(jù)流
入站(ChannelInboundHandler):從遠(yuǎn)程主機(jī)到用戶應(yīng)用程序則是“入站(inbound)”
出站(ChannelOutboundHandler):從用戶應(yīng)用程序到遠(yuǎn)程主機(jī)則是“出站(outbound)”
今天我們主要學(xué)習(xí)編碼器,也就是Encoder
實現(xiàn)邏輯
完成一個編碼器的編寫主要是實現(xiàn)一個抽象類MessageToMessageEncoder,其中我們需要重寫方法是
/**
* Encode from one message to an other. This method will be called for each written message that can be handled
* by this encoder.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link MessageToMessageEncoder} belongs to
* @param msg the message to encode to an other one
* @param out the {@link List} into which the encoded msg should be added
* needs to do some kind of aggragation
* @throws Exception is thrown if an error accour
*/
protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
其中泛型參數(shù)I表示我們需要接收的參數(shù)類型,如你需要將ByteBuf類型轉(zhuǎn)換為Date類型,那么泛型I就是ByteBuf(事實上當(dāng)實現(xiàn)ByteBuf編碼為其他類型的時候是不需要使用MessageToMessageEncoder,Netty提供了ByteToMessageCodec,其本質(zhì)也是實現(xiàn)了MessageToMessageEncoder)
代碼編寫
需求說明
客戶端發(fā)過來一個數(shù)字(ByteBuf),我們將此類型轉(zhuǎn)換為數(shù)字,獲取當(dāng)前時間加上此數(shù)字的時間后返回客戶端,具體邏輯如下:
<div align="center">
<img src="https://www.zhoutao123.com/wp-content/uploads/2018/06/18261b926b654e57fd48e19d3022c215.png"/>
</div>
編碼器的編寫
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.CharsetUtil;
import java.time.LocalDateTime;
import java.util.List;
public class TimeEncoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
//將ByteBuf轉(zhuǎn)換為String,注意此處,我是Mac OS,數(shù)據(jù)結(jié)尾是\r\n,如果是其他類型的OS,此處可能需要調(diào)整
String dataStr = msg.toString(CharsetUtil.UTF_8).replace("\r\n","");
//將String轉(zhuǎn)換為Integer
Integer dataInteger = Integer.valueOf(dataStr);
//獲取當(dāng)前時間N小時后的數(shù)據(jù)
LocalDateTime now = LocalDateTime.now();
LocalDateTime dataLocalDatetime = now.plusHours(dataInteger);
out.add(dataLocalDatetime);
}
}
服務(wù)端處理代碼
此處的服務(wù)端HandleAdapter和前面兩個章節(jié)的HandleAdapter有所區(qū)別的是:其繼承了SimpleChannelInboundHandler<I> 并且傳遞了一個泛型參數(shù),這里需要說明一下,SimpleChannelInboundHandler是ChannelInboundHandler一個子類,他能夠自動幫我們處理一些數(shù)據(jù),在ChannelInboundHandler中,我們使用channel方法來接收數(shù)據(jù),那么在SimpleChannelInboundHandler中我們使用protected abstract void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception;來接收客戶端的參數(shù),可以看到的是,其參數(shù)中自動的實現(xiàn)了我們需要處理的泛型I msg,另外看一下SimpleChannelInboundHandler中channelRead方法的實現(xiàn)代碼
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
//acceptInboundMessage() 檢查參數(shù)的類型是否和設(shè)定的泛型是否匹配
//可以看到匹配的話,會進(jìn)行強(qiáng)制類型轉(zhuǎn)換并調(diào)用messageReceived方法
//否則的話,不執(zhí)行,也就是說,這里的泛型一定要和編碼器轉(zhuǎn)換的結(jié)果類型一致,否則將接收不到參數(shù)
//當(dāng)前如果你需要自己轉(zhuǎn)換,那么你也可以和ChannelInBoundHandleAdapter一樣,重寫channelRead方法
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
messageReceived(ctx, imsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
那么繼續(xù)實現(xiàn)我們的HandleAdapter,代碼非常簡單,這里不再贅述。需要注意的是,我們這里沒有做解碼器,也就是說入站的時候需要ByteBuf類型的數(shù)據(jù),因此使用channel.writeAndFlush(Object)的時候,需要的就是ByteBuf類型的數(shù)據(jù)類型(當(dāng)然如果pipeline中添加了StringDecoder解碼器,那么你就可以直接使用字符串類型的數(shù)據(jù)了)
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.Charset;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
public class TimeServerChannelHandleAdapter extends SimpleChannelInboundHandler<LocalDateTime> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("添加了新的連接信息:id = " + ctx.channel().id());
}
@Override
protected void messageReceived(ChannelHandlerContext ctx, LocalDateTime msg) throws Exception {
// 轉(zhuǎn)換時間格式
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
String dateFormat = msg.format(dateTimeFormatter);
System.out.println("接收到參數(shù):" + dateFormat);
ctx.channel().writeAndFlush(Unpooled.copiedBuffer(dateFormat, Charset.defaultCharset()));
}
}
服務(wù)端啟動代碼
服務(wù)前啟動代碼和以前的代碼非常類似,只需要在pipeline添加上適配的編碼器即可,當(dāng)然需要注意順序(這個知識點以后我在仔細(xì)的闡述)
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class TimeServer {
public void start() throws Exception {
EventLoopGroup boosGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(boosGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//設(shè)置編碼
pipeline.addLast(new TimeEncoder());
//設(shè)置服務(wù)處理
pipeline.addLast(new TimeServerChannelHandleAdapter());
}
});
ChannelFuture sync = bootstrap.bind(9998).sync();
System.out.println("Netty Server start with 9998 port");
sync.channel().closeFuture().sync();
} finally {
workGroup.shutdownGracefully();
boosGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
TimeServer server = new TimeServer();
server.start();
}
}
效果展示
這里為了不寫太多的代碼,防止造成知識的不理解,迷惑,這里我們使用telnet命令來測試數(shù)據(jù),
啟動服務(wù)器端
運(yùn)行TimeServer代碼的中的main方法即可

使用Telnet發(fā)送數(shù)據(jù)
telnet的命令格式是
usage: telnet [-l user] [-a] [-s src_addr] host-name [port]
可以看到大部分參數(shù)都是可選的,只有主機(jī)名稱必填

發(fā)送數(shù)據(jù)的效果
繼續(xù)在telnet中發(fā)送一個數(shù)據(jù)5,我們分別看下服務(wù)端的打印的數(shù)據(jù)和telnet接收到的數(shù)據(jù)
服務(wù)端打印的數(shù)據(jù)如下:

telnet端打印的接收到的數(shù)據(jù)

總結(jié)
至此,一個簡單的編碼器就完成,我們總結(jié)一下步驟
- 繼承MessageToMessageDecoder抽象類,實現(xiàn)decode()方法
- 配置HandleAdapter 實現(xiàn)channelRead或者messageReceived方法
- 配置服務(wù)啟動類,配置ChannelPipeline,添加編碼器和HandleAdapter
- 編寫客戶端或者使用telnet或者其他手段測試
更多關(guān)于Java方面的文章,歡迎訪問燕歸來https://www.zhoutao123.com