最近做的項目有需求跟硬件通信,使用tcp實現(xiàn)長連接,協(xié)議自己規(guī)定,于是后端決定選用netty來作為tcp服務(wù)器,這里簡單說一下netty的工作流程。外部的數(shù)據(jù)傳入netty服務(wù)器中,netty首先通過解碼器對數(shù)據(jù)進(jìn)行一次預(yù)處理(比如把字節(jié)轉(zhuǎn)為字符串或?qū)ο髞矸奖悴僮鳎?,接著把預(yù)處理后的數(shù)據(jù)轉(zhuǎn)發(fā)給處理器,在處理器中執(zhí)行業(yè)務(wù)邏輯,最后如果有必要返回數(shù)據(jù)給連接者,可以通過netty提供的channel發(fā)送。
- netty—>decode—>handler
首先是啟動一個tcp服務(wù)器
package server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @author lanni
* @date 2020/8/19 23:05
* @description
**/
public class TCPServer {
public void run(int port) throws Exception {
//創(chuàng)建線程組
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//創(chuàng)建啟動類
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ServerInitializer())
.option(ChannelOption.SO_BACKLOG, 256)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 綁定端口,開始接收進(jìn)來的連接
ChannelFuture f = b.bind(port).sync();
// 等待服務(wù)器 socket 關(guān)閉 。
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
try {
System.out.println("tcp服務(wù)器啟動...");
new TCPServer().run(8998);
} catch (Exception e) {
e.printStackTrace();
}
}
}
初始化解碼器、處理器
package server;
import handler.CustomDecode;
import handler.TCPServerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
/**
* @author lanni
* @date 2020/8/22 11:58
* @description
**/
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().
addLast(new CustomDecode()). //自定義解碼器
addLast(new TCPServerHandler()) //自定義處理器
;
}
}
解碼器中解決tcp粘包問題,關(guān)于什么是粘包、拆包我就不做解釋了,我這里直接上解決方案,這里我簡單說一下我做的項目數(shù)據(jù)傳輸,規(guī)定數(shù)據(jù)格式:
固定頭部(2字節(jié))+數(shù)據(jù)長度(4字節(jié))+其它(17字節(jié))+數(shù)據(jù)(可變長度)+crc校驗碼(2字節(jié))+固定結(jié)尾(2字節(jié))
所以每次收到的數(shù)據(jù)包中包含了數(shù)據(jù)的長度,就以此長度來組裝數(shù)據(jù)包傳遞給handler,這里注意看我的注釋部分。
import util.StringUtil;
import java.util.List;
/**
* @Author lanni
* @Date 2020/8/23 9:30
* @Description
**/
public class CustomDecode extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
int len = in.readableBytes(); //這里得到可讀取的字節(jié)長度
in.markReaderIndex(); //包頭做標(biāo)記位,后面可以重新回到數(shù)據(jù)包頭開始讀數(shù)據(jù)
//有數(shù)據(jù)時開始讀數(shù)據(jù)包
if (len > 0) {
byte[] src = new byte[len];
in.readBytes(src); //把數(shù)據(jù)讀到字節(jié)數(shù)組中(讀取完之后指針會到最后一個數(shù)據(jù))
in.resetReaderIndex(); //重置當(dāng)前指針到標(biāo)記位(包頭)
//驗證首部為A5 5A,只接收首部正確的數(shù)據(jù)包,如果包頭錯誤可以直接丟棄或關(guān)閉連接
if ((src[0] & 0x000000ff) == 0xA5 && (src[1] & 0x000000ff) == 0x5A) {
//計算報文長度
byte[] data = {src[3],src[2]};
String hexLen = StringUtil.byteArrayToHexString(data);
//這里計算出來的是數(shù)據(jù)長度的報文長度,需要加27個固定長度
int pLen = Integer.parseInt(hexLen, 16) + 27;
if (len < pLen) {
//當(dāng)數(shù)據(jù)包的長度不夠時直接return,netty在緩沖區(qū)有數(shù)據(jù)時會一直調(diào)用decode方法,所以我們只需要等待下一個數(shù)據(jù)包傳輸過來一起解析
return;
}
byte[] packet = new byte[pLen];
in.readBytes(packet,0,pLen);
out.add(packet);
}else {
channelHandlerContext.close();
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("連接異常:"+cause);
// ctx.close();
}
然后就是處理器,用于處理得到的數(shù)據(jù)包,這個大家可以自己編寫邏輯。
package handler;
import cn.hutool.core.util.StrUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import util.StringUtil;
/**
* @Author lanni
* @Date 2020/8/19 23:07
* @Description
**/
public class TCPServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//這里msg就是從解碼器中傳來的數(shù)據(jù),解碼器傳輸過來是什么格式,這里直接轉(zhuǎn)成對應(yīng)的格式就可以
byte[] src = (byte[]) msg;
try {
//這里做自己的業(yè)務(wù)邏輯
//獲取鏈接實例
Channel channel = ctx.channel();
//響應(yīng)消息一定要這樣去發(fā)送,只能使用字節(jié)傳輸
//netty中發(fā)送數(shù)據(jù)需要把待發(fā)送的字節(jié)數(shù)組包裝一下成為ByteBuf來發(fā)送
byte[] dest = null;
ByteBuf buf = Unpooled.copiedBuffer(dest);
//數(shù)據(jù)沖刷
ChannelFuture cf = channel.writeAndFlush(buf);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 當(dāng)出現(xiàn)異常就關(guān)閉連接
cause.printStackTrace();
ctx.close();
}
}
netty中當(dāng)然還涉及到服務(wù)器主動發(fā)送消息給客戶端,但是需要注意的是如果是主動發(fā)消息,有一個先決條件是需要知道客戶端的唯一標(biāo)識(id或其它標(biāo)識),我們需要用一個map來保存好channel和這個標(biāo)識的對應(yīng)關(guān)系。我所做的項目是服務(wù)器來維護(hù)設(shè)備id和連接通道channel的對應(yīng)關(guān)系。
首先需要一個統(tǒng)一管理channel的類,這里有CHANNEL_POOL和KEY_POOL兩個map,是為了讓id和channel能夠互相對應(yīng)起來,可能有人會想著只需要維護(hù)id—>channel的關(guān)系就可以了,但是可以看見上面在發(fā)生異常時所使用的處理方法exceptionCaught(ChannelHandlerContext ctx, Throwable cause)時,只能拿到channel,所以需要通過channel找到id來做出相應(yīng)的操作。
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author lanni
* @date 2020/9/11 20:21
*
**/
@Slf4j
public class NettyChannelManager {
/**
* 保存連接 Channel 的地方
*/
private static Map<String, Channel> CHANNEL_POOL = new ConcurrentHashMap<>();
private static Map<Channel, String> KEY_POOL = new ConcurrentHashMap<>();
/**
* 添加 Channel
*
* @param key
*/
public static void add(String key, Channel channel) {
CHANNEL_POOL.put(key, channel);
KEY_POOL.put(channel, key);
}
/**
* 刪除 Channel
*
* @param key
*/
public static void remove(String key) {
Channel channel = CHANNEL_POOL.get(key);
if (channel == null) {
return;
}
CHANNEL_POOL.remove(key);
KEY_POOL.remove(channel);
}
/**
* 刪除并同步關(guān)閉連接
*
* @param key
*/
public static void removeAndClose(String key) {
Channel channel = CHANNEL_POOL.get(key);
remove(key);
if (channel != null) {
// 關(guān)閉連接
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void removeAndClose(Channel channel) {
String key = KEY_POOL.get(channel);
removeAndClose(key);
}
/**
* 獲得 Channel
*
* @param key
* @return String
*/
public static Channel getChannel(String key) {
return CHANNEL_POOL.get(key);
}
/**
* 獲得 key
*
* @param channel
* @return Channel
*/
public static String getKey(Channel channel) {
return KEY_POOL.get(channel);
}
/**
* 判斷是否存在key
* @author lanni
* @date 2020/9/16 10:10
* @param key
* @return boolean
**/
public static boolean hasKey(String key) {
return CHANNEL_POOL.containsKey(key);
}
/**
* 判斷是否存在channel
* @author lanni
* @date 2020/10/12 9:34
* @param channel
* @return boolean
**/
public static boolean hasChannel(Channel channel) {
return KEY_POOL.containsKey(channel);
}
}
我這里是在處理器中獲取到設(shè)備的id,然后交給NettyChannelManager管理,當(dāng)發(fā)生異常時關(guān)閉channel并移除對應(yīng)的連接信息。
package handler;
import cn.hutool.core.util.StrUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import util.StringUtil;
/**
* @Author lanni
* @Date 2020/8/19 23:07
* @Description
**/
public class TCPServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//這里msg就是從解碼器中傳來的數(shù)據(jù),解碼器傳輸過來是什么格式,這里直接轉(zhuǎn)成對應(yīng)的格式就可以
byte[] src = (byte[]) msg;
try {
// 從數(shù)據(jù)包中拿到設(shè)備id
byte[] deviceId = new byte[17];
System.arraycopy(src, 4, deviceId, 0, 17);
String devId = StrUtil.str(deviceId, CharsetUtil.UTF_8);
// 保存channel,key
// deviceId為空時表示設(shè)備斷線重連
if (!NettyChannelManager.hasKey(devId)) {
NettyChannelManager.add(devId, ctx.channel());
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 當(dāng)出現(xiàn)異常就關(guān)閉連接
cause.printStackTrace();
log.error("發(fā)生異常:" + cause.getMessage());
String devId = NettyChannelManager.getKey(ctx.channel());
if (devId == null || "".equals(devId)) {
return;
}
// 刪除鏈接信息并關(guān)閉鏈接
NettyChannelManager.removeAndClose(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String devId = NettyChannelManager.getKey(ctx.channel());
if (devId == null || "".equals(devId)) {
return;
}
// 刪除鏈接信息并關(guān)閉鏈接
NettyChannelManager.removeAndClose(ctx.channel());
}
}
現(xiàn)在有了這樣一個對應(yīng)關(guān)系之后,如果我們想給客戶端主動發(fā)送消息,那么我們只需要通過客戶端的id拿到對應(yīng)的channel就可以在任意位置發(fā)送數(shù)據(jù)。
// 先準(zhǔn)備好需要發(fā)送的數(shù)據(jù)
byte[] pkg =
// 通過id獲取netty連接通道channel
Channel channel = NettyChannelManager.getChannel(deviceId);
// 封裝數(shù)據(jù)
ByteBuf buf = Unpooled.copiedBuffer(pkg);
// 把數(shù)據(jù)寫入通道并發(fā)送
channel.writeAndFlush(buf);
結(jié)語:以上所說都是在單機(jī)環(huán)境下,如果說是分布式環(huán)境的話那么關(guān)于id-channel的維護(hù)就需要修改。我們可以使用spring session來代替這里的
NettyChannelManager,只需要幾個配置就能解決分布式的問題,當(dāng)然也可以有其它的方案,我在這里就不列舉了。